line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package MogileFS::Connection::HTTP; |
2
|
21
|
|
|
21
|
|
120
|
use strict; |
|
21
|
|
|
|
|
54
|
|
|
21
|
|
|
|
|
828
|
|
3
|
21
|
|
|
21
|
|
119
|
use warnings; |
|
21
|
|
|
|
|
50
|
|
|
21
|
|
|
|
|
729
|
|
4
|
21
|
|
|
21
|
|
11885
|
use MogileFS::Connection::Poolable; |
|
21
|
|
|
|
|
72
|
|
|
21
|
|
|
|
|
611
|
|
5
|
21
|
|
|
21
|
|
31790
|
use HTTP::Response; |
|
21
|
|
|
|
|
943180
|
|
|
21
|
|
|
|
|
997
|
|
6
|
21
|
|
|
21
|
|
258
|
use base qw(MogileFS::Connection::Poolable); |
|
21
|
|
|
|
|
62
|
|
|
21
|
|
|
|
|
5032
|
|
7
|
21
|
|
|
21
|
|
145
|
use MogileFS::Util qw/debug/; |
|
21
|
|
|
|
|
59
|
|
|
21
|
|
|
|
|
1964
|
|
8
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
use fields ( |
10
|
21
|
|
|
|
|
227
|
'read_size_hint', # bytes to read for body |
11
|
|
|
|
|
|
|
'http_response', # HTTP::Response object |
12
|
|
|
|
|
|
|
'http_req', # HTTP request ("GET $URL") |
13
|
|
|
|
|
|
|
'http_res_cb', # called on HTTP::Response (_after_ body is read) |
14
|
|
|
|
|
|
|
'http_res_body_read',# number of bytes read in the response body |
15
|
|
|
|
|
|
|
'http_res_content_cb' # filter for the response body (success-only) |
16
|
21
|
|
|
21
|
|
133
|
); |
|
21
|
|
|
|
|
45
|
|
17
|
21
|
|
|
21
|
|
28744
|
use Net::HTTP::NB; |
|
21
|
|
|
|
|
580794
|
|
|
21
|
|
|
|
|
442
|
|
18
|
|
|
|
|
|
|
|
19
|
|
|
|
|
|
|
sub new { |
20
|
10
|
|
|
10
|
1
|
78
|
my ($self, $ip, $port) = @_; |
21
|
10
|
|
|
|
|
160
|
my %opts = ( Host => "$ip:$port", Blocking => 0, KeepAlive => 300 ); |
22
|
10
|
50
|
|
|
|
793
|
my $sock = Net::HTTP::NB->new(%opts) or return; |
23
|
|
|
|
|
|
|
|
24
|
10
|
50
|
|
|
|
37290
|
$self = fields::new($self) unless ref $self; |
25
|
10
|
|
|
|
|
15624
|
$self->SUPER::new($sock, $ip, $port); # MogileFS::Connection::Poolable->new |
26
|
|
|
|
|
|
|
|
27
|
10
|
|
|
|
|
59
|
return $self; |
28
|
|
|
|
|
|
|
} |
29
|
|
|
|
|
|
|
|
30
|
|
|
|
|
|
|
# starts an HTTP request, returns immediately and relies on Danga::Socket |
31
|
|
|
|
|
|
|
# to schedule the run the callback. |
32
|
|
|
|
|
|
|
sub start { |
33
|
16
|
|
|
16
|
0
|
46
|
my ($self, $method, $path, $opts, $http_res_cb) = @_; |
34
|
16
|
|
50
|
|
|
61
|
$opts ||= {}; |
35
|
|
|
|
|
|
|
|
36
|
16
|
|
|
|
|
49
|
my $err = delete $self->{mfs_err}; |
37
|
16
|
100
|
|
|
|
115
|
return $self->err_response($err, $http_res_cb) if $err; |
38
|
|
|
|
|
|
|
|
39
|
15
|
|
|
|
|
46
|
$self->{http_res_cb} = $http_res_cb; |
40
|
15
|
|
|
|
|
45
|
$self->{http_res_content_cb} = $opts->{content_cb}; |
41
|
15
|
|
50
|
|
|
286
|
$self->{read_size_hint} = $opts->{read_size_hint} || 4096; |
42
|
|
|
|
|
|
|
|
43
|
15
|
|
50
|
|
|
116
|
my $h = $opts->{headers} || {}; |
44
|
15
|
|
|
|
|
125
|
$h->{'User-Agent'} = ref($self) . "/$MogileFS::Server::VERSION"; |
45
|
15
|
|
|
|
|
26
|
my $content = $opts->{content}; |
46
|
15
|
50
|
|
|
|
119
|
if (defined $content) { |
47
|
|
|
|
|
|
|
# Net::HTTP::NB->format_request will set Content-Length for us |
48
|
0
|
|
|
|
|
0
|
$h->{'Content-Type'} = 'application/octet-stream' |
49
|
|
|
|
|
|
|
} else { |
50
|
15
|
|
|
|
|
52
|
$content = ""; |
51
|
|
|
|
|
|
|
} |
52
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
# support full URLs for LWP compatibility |
54
|
|
|
|
|
|
|
# some HTTP daemons don't support Absolute-URIs, so we only give |
55
|
|
|
|
|
|
|
# them the HTTP/1.0-compatible path |
56
|
15
|
50
|
|
|
|
128
|
if ($path =~ m{\Ahttps?://[^/]+(/.*)\z}) { |
57
|
0
|
|
|
|
|
0
|
$path = $1; |
58
|
|
|
|
|
|
|
} |
59
|
|
|
|
|
|
|
|
60
|
15
|
|
|
|
|
101
|
$self->set_timeout("node_timeout"); |
61
|
|
|
|
|
|
|
|
62
|
|
|
|
|
|
|
# Force HTTP/1.0 to avoid potential chunked responses and force server |
63
|
|
|
|
|
|
|
# to set Content-Length: instead. In practice, we'll never get chunked |
64
|
|
|
|
|
|
|
# responses anyways as all known DAV servers will set Content-Length |
65
|
|
|
|
|
|
|
# for static files... |
66
|
15
|
50
|
|
|
|
60
|
$self->sock->http_version($method eq "GET" ? "1.0" : "1.1"); |
67
|
15
|
|
|
|
|
575
|
$h->{Connection} = "keep-alive"; |
68
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
# format the request here since it sets the reader up to read |
70
|
15
|
|
|
|
|
56
|
my $req = $self->sock->format_request($method, $path, %$h, $content); |
71
|
15
|
|
|
|
|
2218
|
$self->{http_req} = "$method http://" . $self->key . $path; |
72
|
|
|
|
|
|
|
|
73
|
|
|
|
|
|
|
# we'll start watching for writes here since it's unlikely the |
74
|
|
|
|
|
|
|
# 3-way handshake for new TCP connections is done at this point |
75
|
15
|
|
|
|
|
124
|
$self->write($req); |
76
|
|
|
|
|
|
|
|
77
|
|
|
|
|
|
|
# start reading once we're done writing |
78
|
|
|
|
|
|
|
$self->write(sub { |
79
|
|
|
|
|
|
|
# we're connected after writing $req is successful, so |
80
|
|
|
|
|
|
|
# change the timeout and wait for readability |
81
|
14
|
|
|
14
|
|
992
|
$self->set_timeout("node_timeout"); |
82
|
14
|
|
|
|
|
88
|
$self->watch_read(1); |
83
|
15
|
|
|
|
|
23678
|
}); |
84
|
|
|
|
|
|
|
} |
85
|
|
|
|
|
|
|
|
86
|
|
|
|
|
|
|
# called by Danga::Socket upon readability |
87
|
|
|
|
|
|
|
sub event_read { |
88
|
15
|
|
|
15
|
1
|
1747526
|
my ($self) = @_; |
89
|
|
|
|
|
|
|
|
90
|
15
|
|
|
|
|
83
|
my $content_cb = $self->{http_res_content_cb}; |
91
|
15
|
|
|
|
|
266
|
my Net::HTTP::NB $sock = $self->sock; |
92
|
15
|
|
|
|
|
202
|
my $res = $self->{http_response}; |
93
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
# read and cache HTTP response headers |
95
|
15
|
100
|
|
|
|
112
|
unless ($res) { |
96
|
14
|
|
|
|
|
146
|
my ($code, $mess, @headers) = eval { $sock->read_response_headers }; |
|
14
|
|
|
|
|
333
|
|
97
|
|
|
|
|
|
|
|
98
|
|
|
|
|
|
|
# wait for readability on EAGAIN |
99
|
14
|
100
|
|
|
|
6234
|
unless (defined $code) { |
100
|
2
|
|
|
|
|
57
|
my $err = $@; |
101
|
2
|
100
|
|
|
|
47
|
if ($err) { |
102
|
1
|
|
|
|
|
88
|
$err =~ s/ at .*\z//s; # do not expose source file location |
103
|
1
|
|
|
|
|
34
|
$err =~ s/\r?\n/\\n/g; # just in case |
104
|
1
|
|
|
|
|
36
|
return $self->err("read_response_headers: $err"); |
105
|
|
|
|
|
|
|
} |
106
|
|
|
|
|
|
|
|
107
|
|
|
|
|
|
|
# assume EAGAIN, though $! gets clobbered by Net::HTTP::* |
108
|
1
|
|
|
|
|
21
|
return; |
109
|
|
|
|
|
|
|
} |
110
|
|
|
|
|
|
|
|
111
|
|
|
|
|
|
|
# hold onto response object until the response body is processed |
112
|
12
|
|
|
|
|
517
|
$res = HTTP::Response->new($code, $mess, \@headers, ""); |
113
|
12
|
|
|
|
|
4570
|
$res->protocol("HTTP/" . $sock->peer_http_version); |
114
|
12
|
|
|
|
|
463
|
$self->{http_response} = $res; |
115
|
12
|
50
|
|
|
|
78
|
$self->{http_res_body_read} = $content_cb ? 0 : undef; |
116
|
|
|
|
|
|
|
} |
117
|
|
|
|
|
|
|
|
118
|
|
|
|
|
|
|
my $body_read = sub { |
119
|
2
|
50
|
|
2
|
|
40
|
$content_cb ? $self->{http_res_body_read} : length($res->content); |
120
|
13
|
|
|
|
|
585
|
}; |
121
|
|
|
|
|
|
|
|
122
|
|
|
|
|
|
|
# continue reading the response body if we have a header |
123
|
13
|
|
|
|
|
44
|
my $rsize = $self->{read_size_hint}; |
124
|
13
|
|
|
|
|
37
|
my $buf; |
125
|
|
|
|
|
|
|
|
126
|
13
|
|
|
|
|
109
|
my $clen = $res->header("Content-Length"); |
127
|
13
|
|
|
|
|
953
|
while (1) { |
128
|
23
|
|
|
|
|
493
|
my $n = $sock->read_entity_body($buf, $rsize); |
129
|
23
|
100
|
|
|
|
1808
|
if (!defined $n) { |
130
|
2
|
50
|
|
|
|
123
|
if ($!{EAGAIN}) { |
131
|
|
|
|
|
|
|
# workaround a bug in Net::HTTP::NB |
132
|
|
|
|
|
|
|
# ref: https://rt.cpan.org/Ticket/Display.html?id=78233 |
133
|
2
|
50
|
33
|
|
|
182
|
if (defined($clen) && $clen == $body_read->()) { |
134
|
0
|
|
|
|
|
0
|
return $self->_http_done; |
135
|
|
|
|
|
|
|
} |
136
|
|
|
|
|
|
|
|
137
|
|
|
|
|
|
|
# reset the timeout if we got any body bytes |
138
|
2
|
|
|
|
|
124
|
$self->set_timeout("node_timeout"); |
139
|
2
|
|
|
|
|
12
|
return; |
140
|
|
|
|
|
|
|
} |
141
|
0
|
0
|
|
|
|
0
|
next if $!{EINTR}; |
142
|
0
|
|
|
|
|
0
|
return $self->err("read_entity_body: $!"); |
143
|
|
|
|
|
|
|
} |
144
|
21
|
100
|
|
|
|
88
|
if ($n == 0) { |
145
|
|
|
|
|
|
|
# EOF, call the response header callback |
146
|
11
|
|
|
|
|
110
|
return $self->_http_done; |
147
|
|
|
|
|
|
|
} |
148
|
10
|
50
|
|
|
|
60
|
if ($n > 0) { |
149
|
10
|
50
|
33
|
|
|
135
|
if ($content_cb && $res->is_success) { |
150
|
0
|
|
|
|
|
0
|
$self->{http_res_body_read} += length($buf); |
151
|
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
# filter the buffer through content_cb, no buffering. |
153
|
|
|
|
|
|
|
# This will be used by tracker-side checksumming |
154
|
|
|
|
|
|
|
# replicate does NOT use this code path for performance |
155
|
|
|
|
|
|
|
# reasons (tracker-side checksumming is already a slow path, |
156
|
|
|
|
|
|
|
# so there's little point in optimizing). |
157
|
|
|
|
|
|
|
# $buf may be empty on EOF (n == 0) |
158
|
0
|
|
|
|
|
0
|
$content_cb->($buf, $self, $res); |
159
|
|
|
|
|
|
|
|
160
|
0
|
0
|
0
|
|
|
0
|
if (defined($clen) && $clen == $body_read->()) { |
161
|
0
|
|
|
|
|
0
|
return $self->_http_done; |
162
|
|
|
|
|
|
|
} |
163
|
|
|
|
|
|
|
} else { |
164
|
|
|
|
|
|
|
# append to existing buffer, this is only used for |
165
|
|
|
|
|
|
|
# PUT/DELETE/HEAD and small GET responses (monitor) |
166
|
10
|
|
|
|
|
110
|
$res->content($res->content . $buf); |
167
|
|
|
|
|
|
|
} |
168
|
|
|
|
|
|
|
# continue looping until EAGAIN or EOF (n == 0) |
169
|
|
|
|
|
|
|
} |
170
|
|
|
|
|
|
|
} |
171
|
|
|
|
|
|
|
} |
172
|
|
|
|
|
|
|
|
173
|
|
|
|
|
|
|
# this does cleanup as an extra paranoid step to prevent circular refs |
174
|
|
|
|
|
|
|
sub close { |
175
|
9
|
|
|
9
|
1
|
108
|
my ($self, $close_reason) = @_; |
176
|
|
|
|
|
|
|
|
177
|
9
|
|
|
|
|
30
|
delete $self->{http_res_cb}; |
178
|
9
|
|
|
|
|
43
|
delete $self->{http_res_content_cb}; |
179
|
|
|
|
|
|
|
|
180
|
9
|
|
|
|
|
98
|
$self->SUPER::close($close_reason); # MogileFS::Connection::Poolable->close |
181
|
|
|
|
|
|
|
} |
182
|
|
|
|
|
|
|
|
183
|
|
|
|
|
|
|
# This is only called on a socket-level error (e.g. disconnect, timeout) |
184
|
|
|
|
|
|
|
# bad server responses (500, 403) do not trigger this |
185
|
|
|
|
|
|
|
sub err { |
186
|
3
|
|
|
3
|
0
|
39
|
my ($self, $reason) = @_; |
187
|
|
|
|
|
|
|
|
188
|
|
|
|
|
|
|
# Fake an HTTP response like LWP does on errors. |
189
|
|
|
|
|
|
|
# delete prevents http_res_cb from being invoked twice, as event_read |
190
|
|
|
|
|
|
|
# will delete http_res_cb on success, too |
191
|
3
|
|
|
|
|
22
|
my $http_res_cb = delete $self->{http_res_cb}; |
192
|
|
|
|
|
|
|
|
193
|
|
|
|
|
|
|
# don't retry if we already got a response header nor if we got a timeout |
194
|
3
|
50
|
33
|
|
|
109
|
if ($self->retryable($reason) && $http_res_cb && !$self->{http_response}) { |
|
|
|
33
|
|
|
|
|
195
|
|
|
|
|
|
|
# do not call inflight_expire here, since we need inflight_cb |
196
|
|
|
|
|
|
|
# for retrying |
197
|
|
|
|
|
|
|
|
198
|
0
|
|
|
|
|
0
|
$self->close(":retry"); # trigger a retry in MogileFS::ConnectionPool |
199
|
|
|
|
|
|
|
} else { |
200
|
|
|
|
|
|
|
# ensure we don't call new_err on close() |
201
|
3
|
|
|
|
|
55
|
$self->inflight_expire; |
202
|
|
|
|
|
|
|
|
203
|
|
|
|
|
|
|
# free the FD before invoking the callback |
204
|
3
|
|
|
|
|
38
|
$self->close($reason); |
205
|
3
|
50
|
|
|
|
31
|
$self->err_response($reason, $http_res_cb) if $http_res_cb; |
206
|
|
|
|
|
|
|
} |
207
|
|
|
|
|
|
|
} |
208
|
|
|
|
|
|
|
|
209
|
|
|
|
|
|
|
# Fakes an HTTP response like LWP does on errors. |
210
|
|
|
|
|
|
|
sub err_response { |
211
|
4
|
|
|
4
|
0
|
11
|
my ($self, $err, $http_res_cb) = @_; |
212
|
|
|
|
|
|
|
|
213
|
4
|
|
|
|
|
139
|
my $res = HTTP::Response->new(500, $err); |
214
|
4
|
|
50
|
|
|
631
|
$err ||= "(unspecified error)"; |
215
|
4
|
|
100
|
|
|
31
|
my $req = $self->{http_req} || "no HTTP request made"; |
216
|
4
|
|
|
|
|
102
|
Mgd::error("$err: $req"); |
217
|
4
|
|
|
|
|
27
|
$res->header("X-MFS-Error", $err); |
218
|
4
|
|
|
|
|
793
|
$res->protocol("HTTP/1.0"); |
219
|
4
|
|
|
|
|
59
|
$http_res_cb->($res); |
220
|
|
|
|
|
|
|
} |
221
|
|
|
|
|
|
|
|
222
|
|
|
|
|
|
|
# returns true if the HTTP connection is persistent/reusable, false if not. |
223
|
|
|
|
|
|
|
sub _http_persistent { |
224
|
11
|
|
|
11
|
|
27
|
my ($self, $res) = @_; |
225
|
|
|
|
|
|
|
|
226
|
|
|
|
|
|
|
# determine if this connection is reusable: |
227
|
11
|
|
|
|
|
60
|
my $connection = $res->header("Connection"); |
228
|
11
|
|
|
|
|
495
|
my $persist; |
229
|
|
|
|
|
|
|
|
230
|
|
|
|
|
|
|
# Connection: header takes precedence over protocol version |
231
|
11
|
50
|
|
|
|
41
|
if ($connection) { |
232
|
0
|
0
|
|
|
|
0
|
if ($connection =~ /\bkeep-alive\b/i) { |
|
|
0
|
|
|
|
|
|
233
|
0
|
|
|
|
|
0
|
$persist = 1; |
234
|
|
|
|
|
|
|
} elsif ($connection =~ /\bclose\b/i) { |
235
|
0
|
|
|
|
|
0
|
$persist = 0; |
236
|
|
|
|
|
|
|
} |
237
|
|
|
|
|
|
|
|
238
|
|
|
|
|
|
|
# if we can't make sense of the Connection: header, fall through |
239
|
|
|
|
|
|
|
# and decided based on protocol version |
240
|
|
|
|
|
|
|
} |
241
|
|
|
|
|
|
|
|
242
|
|
|
|
|
|
|
# HTTP/1.1 is persistent-by-default, HTTP/1.0 is not. |
243
|
|
|
|
|
|
|
# Will there be HTTP/1.2? |
244
|
11
|
50
|
|
|
|
77
|
$persist = $res->protocol eq "HTTP/1.1" unless defined $persist; |
245
|
|
|
|
|
|
|
|
246
|
|
|
|
|
|
|
# we're not persistent if the pool is full, either |
247
|
11
|
|
33
|
|
|
1478
|
return ($persist && $self->persist); |
248
|
|
|
|
|
|
|
} |
249
|
|
|
|
|
|
|
|
250
|
|
|
|
|
|
|
# Called on successfully read HTTP response (it could be a server-side |
251
|
|
|
|
|
|
|
# error (404,403,500...), but not a socket error between client<->server). |
252
|
|
|
|
|
|
|
sub _http_done { |
253
|
11
|
|
|
11
|
|
28
|
my ($self) = @_; |
254
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
# delete ensures we only fire the callback once |
256
|
11
|
|
|
|
|
53
|
my $http_res_cb = delete $self->{http_res_cb}; |
257
|
11
|
|
|
|
|
53
|
my $res = delete $self->{http_response}; |
258
|
11
|
|
|
|
|
60
|
delete $self->{http_req}; |
259
|
|
|
|
|
|
|
|
260
|
|
|
|
|
|
|
# ensure we don't call new_err on eventual close() |
261
|
11
|
|
|
|
|
171
|
$self->inflight_expire; |
262
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
# free up the FD if possible |
264
|
11
|
50
|
|
|
|
79
|
$self->close('http_close') unless $self->_http_persistent($res); |
265
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
# finally, invoke the user-supplied callback |
267
|
11
|
|
|
|
|
67
|
$http_res_cb->($res); |
268
|
|
|
|
|
|
|
} |
269
|
|
|
|
|
|
|
|
270
|
|
|
|
|
|
|
1; |