File Coverage

blib/lib/MogileFS/Client/CallbackFile.pm
Criterion Covered Total %
statement 91 187 48.6
branch 13 88 14.7
condition 6 28 21.4
subroutine 18 19 94.7
pod 1 2 50.0
total 129 324 39.8


line stmt bran cond sub pod time code
1             package MogileFS::Client::CallbackFile;
2 1     1   36365 use strict;
  1         2  
  1         34  
3 1     1   5 use warnings;
  1         2  
  1         21  
4 1     1   825 use URI;
  1         8087  
  1         29  
5 1     1   9 use Carp;
  1         1  
  1         87  
6 1     1   1001 use IO::Socket::INET;
  1         28720  
  1         9  
7 1     1   2301 use File::Slurp qw/ slurp /;
  1         13815  
  1         130  
8 1     1   1008 use Try::Tiny;
  1         1479  
  1         58  
9 1     1   4 use Socket qw/ SO_SNDBUF SOL_SOCKET IPPROTO_TCP /;
  1         2  
  1         264  
10 1     1   997 use Time::HiRes qw/ gettimeofday tv_interval /;
  1         2021  
  1         5  
11 1     1   1120 use Linux::PipeMagic qw/ syssendfile /;
  1         5256  
  1         82  
12 1     1   1375 use IO::AIO qw/ fadvise /;
  1         7229  
  1         135  
13 1     1   939 use LWP::Simple qw/ head /;
  1         91720  
  1         11  
14              
15 1     1   243 use base qw/ MogileFS::Client::Async /;
  1         3  
  1         789  
16              
17 1 50   1   12 use constant TCP_CORK => ($^O eq "linux" ? 3 : 0); # XXX
  1         3  
  1         67  
18              
19 1     1   7 use namespace::clean;
  1         2  
  1         5  
20              
21             =head1 NAME
22              
23             MogileFS::Client::CallbackFile
24              
25             =head1 SYNOPSIS
26              
27             my $mogfs = MogileFS::Client::CallbackFile->new( ... );
28              
29             open(my $read_fh, "<", "...") or die ...
30             my $eventual_length = -s $read_fh;
31             my $f = $mogfs->store_file_from_fh($key, $class, $read_fh, $eventual_length, \%opts);
32              
33             $f->($eventual_length, 0); # upload entire file
34              
35             $f->($eventual_length, 1); # indicate EOF
36              
37             =head1 DESCRIPTION
38              
39             This package inherits from L<MogileFS::Client::Async> and provides an additional
40             blocking API in which the data you wish to upload is read from a file when
41             commanded by a callback function. This allows other processing to take place on
42             data as you read it from disc or elsewhere.
43              
44             The trackers, and storage backends, are tried repeatedly until the file is
45             successfully stored, or an error is thrown.
46              
47             The C<$key> parameter may be a closure. In this case, it is called every time
48             before C<create_open> is called, allowing a different key to be used if an
49             upload fails, allowing for additional paranoia.
50              
51             =head1 SEE ALSO
52              
53             =over
54              
55             =item L<MogileFS::Client::Async>
56              
57             =back
58              
59             =cut
60              
# store_file_from_fh($key, $class, $read_fh, $eventual_length, \%opts)
#
# Prepares an upload of $eventual_length bytes read from $read_fh and returns
# a callback that drives it. Returns undef (doing nothing) when the client is
# read-only.
#
#   $key             - key string, or a CODE ref called before every
#                      create_open to obtain the key for that attempt.
#   $class           - MogileFS class passed to create_open.
#   $read_fh         - handle the data is sent from; it is rewound to offset 0
#                      whenever a (re)connection to a storage node is made.
#   $eventual_length - total size in bytes; sent as Content-Length and as the
#                      size in create_open/create_close.
#   \%opts           - optional hashref:
#       create_open_args / create_close_args - extra args merged into the
#           create_open / create_close requests (internal args win).
#       fid            - fid requested from create_open (0 = auto-generate).
#       on_new_attempt - called with the destination hashref before connecting.
#       on_connect     - called once the TCP connection is established.
#       on_http_done   - called after the storage node answers with a 2xx.
#       on_failure     - called with a hashref describing each failed attempt.
#
# The returned callback takes ($available_to_read, $eof, $checksum):
#   $available_to_read - number of bytes of $read_fh that may now be sent.
#   $eof               - true once all data is available; finishes the PUT
#                        and issues create_close.
#   $checksum          - optional "MD5:<hex>" string passed to create_close.
# It returns nothing for intermediate calls and $eventual_length once the
# file has been closed. It dies after five failed attempts, or if the amount
# of data sent at EOF does not match $eventual_length.
sub store_file_from_fh {
    my $self = shift;
    return undef if $self->{readonly};

    my ($_key, $class, $read_fh, $eventual_length, $opts) = @_;
    $opts ||= {};

    # Hint to Linux that doubling readahead will probably pay off.
    fadvise($read_fh, 0, 0, IO::AIO::FADV_SEQUENTIAL());

    # Extra args to be passed along with the create_open and create_close
    # commands. Internally generated args of the same name overwrite the
    # supplied ones because they are listed after them in the request hash.
    my $create_open_args  = $opts->{create_open_args}  || {};
    my $create_close_args = $opts->{create_close_args} || {};

    my @dests;    # ( {devid, path, fid}, {devid, path, fid}, ... )

    # Key used by the current attempt; re-evaluated before every create_open.
    my $key;

    # Returns the next destination hashref. When none are left, asks the
    # tracker for a new set (up to 5 times, sleeping 1s after each backend
    # failure). Dies if no destination could be obtained.
    my $get_new_dest = sub {
        return pop @dests if @dests;

        foreach (1 .. 5) {
            $key = ref($_key) eq 'CODE' ? $_key->() : $_key;

            $self->run_hook('store_file_start', $self, $key, $class, $opts);
            $self->run_hook('new_file_start',   $self, $key, $class, $opts);

            # Calls to the backend may be explodey.
            my $res;
            try {
                $res = $self->{backend}->do_request(
                    create_open => {
                        %$create_open_args,
                        domain     => $self->{domain},
                        class      => $class,
                        key        => $key,
                        # fid should be specified, or pass 0 meaning to auto-generate one
                        fid        => $opts->{fid} || 0,
                        multi_dest => 1,
                        size       => $eventual_length,    # not supported by current version
                    }
                );
            }
            catch {
                warn "Mogile backend failed: $_";
                $self->{backend}->force_disconnect()
                    if $self->{backend}->can('force_disconnect');
            };

            unless ($res) {
                # Attempting to connect to the Mogile backend completely
                # failed; sleep for a second to see if the problem clears. We
                # don't sleep for other errors as we'll arrive back here if
                # the network fails eventually.
                sleep 1;
                next;
            }

            for my $i (1 .. $res->{dev_count}) {
                push @dests, {
                    devid => $res->{"devid_$i"},
                    path  => $res->{"path_$i"},
                    fid   => $res->{fid},
                };
            }
            return pop @dests if @dests;
        }
        die "Fail to get a destination to write to.";
    };

    # When we have a hiccough in the connection, $socket is set to undef to
    # indicate that we should reconnect.
    my $socket;

    # Number of bytes of $read_fh sent over the current connection.
    my $last_written_point;

    # The destination hashref we're currently writing to.
    my $current_dest;

    # Set once a checksummed create_close failed after at least the backend
    # timeout (possibly the tracker timing out while checksumming). Later
    # attempts then verify the checksum themselves with checksumverify => 0.
    my $create_close_timed_out;

    return sub {
        my ($available_to_read, $eof, $checksum) = @_;

        my $last_error;

        # Records a failed attempt: notifies on_failure, warns, and drops the
        # connection so that the next attempt reconnects from offset 0.
        my $fail_write_attempt = sub {
            my ($msg) = @_;
            $last_error = $msg || "unknown error";

            if ($opts->{on_failure}) {
                $opts->{on_failure}->({
                    url         => $current_dest ? $current_dest->{path} : undef,
                    bytes_sent  => $last_written_point,
                    total_bytes => $eventual_length,
                    client      => 'callbackfile',
                    error       => $msg,
                });
            }

            warn $last_error;
            $socket = undef;
            $last_written_point = 0;
        };

        ATTEMPT: foreach (1 .. 5) {
            $last_error = undef;

            # Create a connection to the storage backend.
            unless ($socket) {
                sysseek($read_fh, 0, 0) or die "seek failed: $!";
                try {
                    $last_written_point = 0;
                    $current_dest = $get_new_dest->();

                    $opts->{on_new_attempt}->($current_dest) if $opts->{on_new_attempt};

                    my $uri = URI->new($current_dest->{path});
                    $socket = IO::Socket::INET->new(
                        Timeout  => 10,
                        Proto    => "tcp",
                        PeerPort => $uri->port,
                        PeerHost => $uri->host,
                    ) or die "connect to " . $current_dest->{path} . " failed: $!";

                    $opts->{on_connect}->() if $opts->{on_connect};

                    my $buf = 'PUT ' . $uri->path . " HTTP/1.0\r\nConnection: close\r\nContent-Length: $eventual_length\r\n\r\n";

                    # Make sure the send buffer is at least 64KiB.
                    my $sndbuf = getsockopt($socket, SOL_SOCKET, SO_SNDBUF);
                    if (!defined $sndbuf || unpack("I", $sndbuf) < 65536) {
                        setsockopt($socket, SOL_SOCKET, SO_SNDBUF, 65536)
                            or warn "could not enlarge socket buffer: $!";
                    }
                    if (TCP_CORK) {
                        setsockopt($socket, IPPROTO_TCP, TCP_CORK, 1)
                            or warn "could not set TCP_CORK: $!";
                    }

                    my $wrote = syswrite($socket, $buf);
                    (defined $wrote && $wrote == length($buf))
                        or die "Could not write all: $!";
                }
                catch {
                    $fail_write_attempt->($_);
                };
            }

            # Write as much data as we have.
            if ($socket) {
                my $bytes_to_write = $available_to_read - $last_written_point;
                my $block_size = $bytes_to_write;

                SENDFILE: while ($bytes_to_write > 0) {
                    # Never request more than is available to read, even
                    # after a partial send.
                    my $want = $block_size < $bytes_to_write ? $block_size : $bytes_to_write;
                    my $c = syssendfile($socket, $read_fh, $want);
                    if ($c > 0) {
                        $last_written_point += $c;
                        $bytes_to_write -= $c;
                    }
                    elsif ($c == -1 && $block_size > 1024*1024) {
                        # 32 bit kernels won't even allow you to send more than 2Gb, it seems.
                        # Retry with a smaller block size.
                        $block_size = 1024*1024;
                    }
                    else {
                        # Build the message first so $! is captured before the
                        # failure handler does any I/O of its own.
                        $fail_write_attempt->(
                            "syssendfile failed, only $c out of $bytes_to_write written: $!");
                        last SENDFILE;
                    }
                }

                die "unpossible!" if $bytes_to_write < 0;
            }

            if ($socket && $eof) {
                if (TCP_CORK) {
                    setsockopt($socket, IPPROTO_TCP, TCP_CORK, 0)
                        or warn "could not unset TCP_CORK: $!";
                }
                shutdown($socket, 1) or warn "could not shutdown socket: $!";
                die "File is longer than initially declared, is it still being written to? We are at $last_written_point, $eventual_length initially declared"
                    if $last_written_point > $eventual_length;
                die "Cannot be at eof, only $last_written_point out of $eventual_length written!"
                    unless $last_written_point == $eventual_length;

                $self->run_hook('new_file_end', $self, $key, $class, $opts);

                my $buf;
                try {
                    $buf = slurp($socket);
                }
                catch {
                    warn $_;
                };

                if (!defined $buf) {
                    $fail_write_attempt->("slurp failed");
                    next ATTEMPT;
                }

                unless (close($socket)) {
                    $fail_write_attempt->("could not close socket: $!");
                    next ATTEMPT;
                }

                my ($top) = split /\r?\n/, $buf;
                $top = '' unless defined $top;

                unless ($top =~ m{^HTTP/1\.[01]\s+2\d\d}) {
                    $fail_write_attempt->("Got non-200 from remote server $top");
                    next ATTEMPT;
                }

                # Woo, 200!
                $opts->{on_http_done}->() if $opts->{on_http_done};

                my @cs;
                if (!$checksum) {
                    # No checksum to supply, but at least check the length.
                    try {
                        # XXX - What's the timeout here.
                        my $probe_length = (head($current_dest->{path}))[1];
                        die "probe failed: "
                            . (defined $probe_length ? $probe_length : 'undef')
                            . " vs $eventual_length"
                            if !defined $probe_length || $probe_length != $eventual_length;
                    }
                    catch {
                        $fail_write_attempt->("HEAD check on newly written file failed: $_");
                    };
                }
                elsif ($create_close_timed_out) {
                    # Verify the checksum over the network ourselves, and tell
                    # the tracker not to.
                    try {
                        require Digest::MD5;
                        require HTTP::Request;
                        require LWP::UserAgent;

                        my $md5 = Digest::MD5->new();
                        my $req = HTTP::Request->new(GET => $current_dest->{path});
                        my $response = LWP::UserAgent->new->request($req, sub { $md5->add($_[0]) });
                        die "GET " . $current_dest->{path} . " failed: " . $response->status_line
                            unless $response->is_success;

                        my $hex_checked = $md5->hexdigest();
                        die "Got $hex_checked, expected $checksum" if "MD5:$hex_checked" ne $checksum;
                    }
                    catch {
                        $fail_write_attempt->("Cross network checksum failed: $_");
                    };
                    @cs = ( checksum => $checksum, checksumverify => 0 );
                }
                else {
                    @cs = ( checksum => $checksum, checksumverify => 1 );
                }

                next ATTEMPT if defined $last_error;

                my $rv;
                my $ts_sent_create_close = [gettimeofday];
                try {
                    $rv = $self->{backend}->do_request(
                        create_close => {
                            %$create_close_args,
                            fid    => $current_dest->{fid},
                            devid  => $current_dest->{devid},
                            domain => $self->{domain},
                            size   => $eventual_length,
                            key    => $key,
                            path   => $current_dest->{path},
                            @cs,
                        }
                    );
                }
                catch {
                    warn "create_close exploded: $_";
                    $self->{backend}->force_disconnect()
                        if $self->{backend}->can('force_disconnect');
                };

                # TODO we used to have a file check to query the size of the
                # file which we just uploaded to MogileFS.

                if ($rv) {
                    $self->run_hook('store_file_end', $self, $key, $class, $opts);
                    return $eventual_length;
                }

                # create_close may explode due to a bad checksum, or a network
                # error sending the acknowledgement of a successful upload. To
                # handle this, if create_close fails we always retry with a new
                # create_open to get a new FID.
                @dests = ();
                if (!$create_close_timed_out && $checksum
                    && tv_interval($ts_sent_create_close) >= $self->{backend}->{timeout}) {
                    $create_close_timed_out = 1;
                    $fail_write_attempt->("create_close failed, possibly timed out checksumming");
                }
                else {
                    $fail_write_attempt->("create_close failed");
                }
                next ATTEMPT;
            }
            elsif ($socket && $last_written_point == $available_to_read) {
                # Everything available so far has been sent on a live
                # connection; wait for the next call.
                return;
            }
        }

        die "Mogile write failed: $last_error";
    };
}
354              
# store_file($key, $class, $fn, \%opts)
#
# Uploads the file named $fn via store_file_from_fh. Its MD5 is computed by
# an external `md5sum` process while the data is being sent, and the result
# is passed along with create_close. If $fn is a reference, a warning is
# emitted and the parent class's store_file is used instead.
#
# Returns the file length in bytes on success, or undef when the client is
# read-only. Dies if the file cannot be opened, md5sum fails, or the upload
# fails.
sub store_file {
    my ($self, $key, $class, $fn, $opts) = @_;

    if (ref($fn)) {
        warn "not scalar!";
        return $self->SUPER::store_file($key, $class, $fn, $opts);
    }

    open(my $fh, "<", $fn) or die "could not open '$fn': $!";

    # -s returns a false value rather than 0 for an empty file; normalise it
    # so the Content-Length header and sizes sent to the tracker are numeric.
    my $file_length = (-s $fh) || 0;

    my $cb = $self->store_file_from_fh(
        $key, $class, $fh, $file_length, $opts
    );

    # store_file_from_fh returns undef for a read-only client.
    return undef unless $cb;

    # Start checksumming in a child process so it overlaps with the upload.
    open(my $checksum, "-|", "md5sum", "-b", "--", $fn) or die "could not fork off md5sum: $!";
    $cb->($file_length, 0);
    my $line = <$checksum>;
    unless (close($checksum)) {
        # For a piped open, close() fails with $! == 0 when the child merely
        # exited non-zero; report the exit status in that case.
        die "could not finish checksum: "
            . ($! ? $! : "md5sum exited with status " . ($? >> 8));
    }

    # GNU md5sum prefixes the line with a backslash when the file name had to
    # be escaped.
    my ($hex) = defined($line) ? ($line =~ /^\\?([0-9a-f]{32})/) : ();
    defined $hex or die "could not find checksum";

    $cb->($file_length, 1, "MD5:$hex");

    return $file_length;
}
382              
383             1;
384