File Coverage

blib/lib/MogileFS/Connection/HTTP.pm
Criterion Covered Total %
statement 118 131 90.0
branch 28 52 53.8
condition 11 28 39.2
subroutine 18 18 100.0
pod 3 6 50.0
total 178 235 75.7


line stmt bran cond sub pod time code
1             package MogileFS::Connection::HTTP;
2 21     21   112 use strict;
  21         34  
  21         469  
3 21     21   82 use warnings;
  21         32  
  21         395  
4 21     21   7014 use MogileFS::Connection::Poolable;
  21         51  
  21         453  
5 21     21   8073 use HTTP::Response;
  21         440402  
  21         641  
6 21     21   125 use base qw(MogileFS::Connection::Poolable);
  21         41  
  21         2729  
7 21     21   128 use MogileFS::Util qw/debug/;
  21         37  
  21         968  
8              
9             use fields (
10 21         99 'read_size_hint', # bytes to read for body
11             'http_response', # HTTP::Response object
12             'http_req', # HTTP request ("GET $URL")
13             'http_res_cb', # called on HTTP::Response (_after_ body is read)
14             'http_res_body_read',# number of bytes read in the response body
15             'http_res_content_cb' # filter for the response body (success-only)
16 21     21   102 );
  21         38  
17 21     21   9117 use Net::HTTP::NB;
  21         219596  
  21         182  
18              
19             sub new {
20 10     10 1 38 my ($self, $ip, $port) = @_;
21 10         159 my %opts = ( Host => "$ip:$port", Blocking => 0, KeepAlive => 300 );
22 10 50       233 my $sock = Net::HTTP::NB->new(%opts) or return;
23              
24 10 50       20967 $self = fields::new($self) unless ref $self;
25 10         6689 $self->SUPER::new($sock, $ip, $port); # MogileFS::Connection::Poolable->new
26              
27 10         31 return $self;
28             }
29              
30             # starts an HTTP request, returns immediately and relies on Danga::Socket
31             # to schedule the run the callback.
32             sub start {
33 16     16 0 55 my ($self, $method, $path, $opts, $http_res_cb) = @_;
34 16   50     38 $opts ||= {};
35              
36 16         33 my $err = delete $self->{mfs_err};
37 16 100       43 return $self->err_response($err, $http_res_cb) if $err;
38              
39 15         32 $self->{http_res_cb} = $http_res_cb;
40 15         33 $self->{http_res_content_cb} = $opts->{content_cb};
41 15   50     80 $self->{read_size_hint} = $opts->{read_size_hint} || 4096;
42              
43 15   50     109 my $h = $opts->{headers} || {};
44 15         80 $h->{'User-Agent'} = ref($self) . "/$MogileFS::Server::VERSION";
45 15         32 my $content = $opts->{content};
46 15 50       66 if (defined $content) {
47             # Net::HTTP::NB->format_request will set Content-Length for us
48 0         0 $h->{'Content-Type'} = 'application/octet-stream'
49             } else {
50 15         66 $content = "";
51             }
52              
53             # support full URLs for LWP compatibility
54             # some HTTP daemons don't support Absolute-URIs, so we only give
55             # them the HTTP/1.0-compatible path
56 15 50       51 if ($path =~ m{\Ahttps?://[^/]+(/.*)\z}) {
57 0         0 $path = $1;
58             }
59              
60 15         65 $self->set_timeout("node_timeout");
61              
62             # Force HTTP/1.0 to avoid potential chunked responses and force server
63             # to set Content-Length: instead. In practice, we'll never get chunked
64             # responses anyways as all known DAV servers will set Content-Length
65             # for static files...
66 15 50       45 $self->sock->http_version($method eq "GET" ? "1.0" : "1.1");
67 15         330 $h->{Connection} = "keep-alive";
68              
69             # format the request here since it sets the reader up to read
70 15         42 my $req = $self->sock->format_request($method, $path, %$h, $content);
71 15         2070 $self->{http_req} = "$method http://" . $self->key . $path;
72              
73             # we'll start watching for writes here since it's unlikely the
74             # 3-way handshake for new TCP connections is done at this point
75 15         65 $self->write($req);
76              
77             # start reading once we're done writing
78             $self->write(sub {
79             # we're connected after writing $req is successful, so
80             # change the timeout and wait for readability
81 14     14   621 $self->set_timeout("node_timeout");
82 14         140 $self->watch_read(1);
83 15         640 });
84             }
85              
86             # called by Danga::Socket upon readability
87             sub event_read {
88 30     30 1 1647916 my ($self) = @_;
89              
90 30         94 my $content_cb = $self->{http_res_content_cb};
91 30         118 my Net::HTTP::NB $sock = $self->sock;
92 30         163 my $res = $self->{http_response};
93              
94             # read and cache HTTP response headers
95 30 100       165 unless ($res) {
96 29         64 my ($code, $mess, @headers) = eval { $sock->read_response_headers };
  29         204  
97              
98             # wait for readability on EAGAIN
99 29 100       6664 unless (defined $code) {
100 17         52 my $err = $@;
101 17 100       52 if ($err) {
102 1         36 $err =~ s/ at .*\z//s; # do not expose source file location
103 1         16 $err =~ s/\r?\n/\\n/g; # just in case
104 1         37 return $self->err("read_response_headers: $err");
105             }
106              
107             # assume EAGAIN, though $! gets clobbered by Net::HTTP::*
108 16         40 return;
109             }
110              
111             # hold onto response object until the response body is processed
112 12         280 $res = HTTP::Response->new($code, $mess, \@headers, "");
113 12         2618 $res->protocol("HTTP/" . $sock->peer_http_version);
114 12         411 $self->{http_response} = $res;
115 12 50       60 $self->{http_res_body_read} = $content_cb ? 0 : undef;
116             }
117              
118             my $body_read = sub {
119 2 50   2   34 $content_cb ? $self->{http_res_body_read} : length($res->content);
120 13         186 };
121              
122             # continue reading the response body if we have a header
123 13         44 my $rsize = $self->{read_size_hint};
124 13         31 my $buf;
125              
126 13         152 my $clen = $res->header("Content-Length");
127 13         885 while (1) {
128 87         1950 my $n = $sock->read_entity_body($buf, $rsize);
129 87 100       6916 if (!defined $n) {
130 2 50       93 if ($!{EAGAIN}) {
131             # workaround a bug in Net::HTTP::NB
132             # ref: https://rt.cpan.org/Ticket/Display.html?id=78233
133 2 50 33     140 if (defined($clen) && $clen == $body_read->()) {
134 0         0 return $self->_http_done;
135             }
136              
137             # reset the timeout if we got any body bytes
138 2         56 $self->set_timeout("node_timeout");
139 2         8 return;
140             }
141 0 0       0 next if $!{EINTR};
142 0         0 return $self->err("read_entity_body: $!");
143             }
144 85 100       166 if ($n == 0) {
145             # EOF, call the response header callback
146 11         91 return $self->_http_done;
147             }
148 74 50       149 if ($n > 0) {
149 74 50 33     171 if ($content_cb && $res->is_success) {
150 0         0 $self->{http_res_body_read} += length($buf);
151              
152             # filter the buffer through content_cb, no buffering.
153             # This will be used by tracker-side checksumming
154             # replicate does NOT use this code path for performance
155             # reasons (tracker-side checksumming is already a slow path,
156             # so there's little point in optimizing).
157             # $buf may be empty on EOF (n == 0)
158 0         0 $content_cb->($buf, $self, $res);
159              
160 0 0 0     0 if (defined($clen) && $clen == $body_read->()) {
161 0         0 return $self->_http_done;
162             }
163             } else {
164             # append to existing buffer, this is only used for
165             # PUT/DELETE/HEAD and small GET responses (monitor)
166 74         216 $res->content($res->content . $buf);
167             }
168             # continue looping until EAGAIN or EOF (n == 0)
169             }
170             }
171             }
172              
173             # this does cleanup as an extra paranoid step to prevent circular refs
174             sub close {
175 9     9 1 86 my ($self, $close_reason) = @_;
176              
177 9         23 delete $self->{http_res_cb};
178 9         19 delete $self->{http_res_content_cb};
179              
180 9         94 $self->SUPER::close($close_reason); # MogileFS::Connection::Poolable->close
181             }
182              
183             # This is only called on a socket-level error (e.g. disconnect, timeout)
184             # bad server responses (500, 403) do not trigger this
185             sub err {
186 3     3 0 23 my ($self, $reason) = @_;
187              
188             # Fake an HTTP response like LWP does on errors.
189             # delete prevents http_res_cb from being invoked twice, as event_read
190             # will delete http_res_cb on success, too
191 3         31 my $http_res_cb = delete $self->{http_res_cb};
192              
193             # don't retry if we already got a response header nor if we got a timeout
194 3 50 33     80 if ($self->retryable($reason) && $http_res_cb && !$self->{http_response}) {
      33        
195             # do not call inflight_expire here, since we need inflight_cb
196             # for retrying
197              
198 0         0 $self->close(":retry"); # trigger a retry in MogileFS::ConnectionPool
199             } else {
200             # ensure we don't call new_err on close()
201 3         58 $self->inflight_expire;
202              
203             # free the FD before invoking the callback
204 3         32 $self->close($reason);
205 3 50       39 $self->err_response($reason, $http_res_cb) if $http_res_cb;
206             }
207             }
208              
209             # Fakes an HTTP response like LWP does on errors.
210             sub err_response {
211 4     4 0 16 my ($self, $err, $http_res_cb) = @_;
212              
213 4         85 my $res = HTTP::Response->new(500, $err);
214 4   50     394 $err ||= "(unspecified error)";
215 4   100     33 my $req = $self->{http_req} || "no HTTP request made";
216 4         82 Mgd::error("$err: $req");
217 4         20 $res->header("X-MFS-Error", $err);
218 4         588 $res->protocol("HTTP/1.0");
219 4         60 $http_res_cb->($res);
220             }
221              
222             # returns true if the HTTP connection is persistent/reusable, false if not.
223             sub _http_persistent {
224 11     11   33 my ($self, $res) = @_;
225              
226             # determine if this connection is reusable:
227 11         61 my $connection = $res->header("Connection");
228 11         561 my $persist;
229              
230             # Connection: header takes precedence over protocol version
231 11 50       42 if ($connection) {
232 0 0       0 if ($connection =~ /\bkeep-alive\b/i) {
    0          
233 0         0 $persist = 1;
234             } elsif ($connection =~ /\bclose\b/i) {
235 0         0 $persist = 0;
236             }
237              
238             # if we can't make sense of the Connection: header, fall through
239             # and decided based on protocol version
240             }
241              
242             # HTTP/1.1 is persistent-by-default, HTTP/1.0 is not.
243             # Will there be HTTP/1.2?
244 11 50       48 $persist = $res->protocol eq "HTTP/1.1" unless defined $persist;
245              
246             # we're not persistent if the pool is full, either
247 11   33     264 return ($persist && $self->persist);
248             }
249              
250             # Called on successfully read HTTP response (it could be a server-side
251             # error (404,403,500...), but not a socket error between client<->server).
252             sub _http_done {
253 11     11   33 my ($self) = @_;
254              
255             # delete ensures we only fire the callback once
256 11         32 my $http_res_cb = delete $self->{http_res_cb};
257 11         26 my $res = delete $self->{http_response};
258 11         27 delete $self->{http_req};
259              
260             # ensure we don't call new_err on eventual close()
261 11         71 $self->inflight_expire;
262              
263             # free up the FD if possible
264 11 50       92 $self->close('http_close') unless $self->_http_persistent($res);
265              
266             # finally, invoke the user-supplied callback
267 11         50 $http_res_cb->($res);
268             }
269              
270             1;