File Coverage

blib/lib/MogileFS/Client/Async.pm
Criterion Covered Total %
statement 39 196 19.9
branch 1 58 1.7
condition 0 23 0.0
subroutine 13 27 48.1
pod 5 6 83.3
total 58 310 18.7


line stmt bran cond sub pod time code
1             package MogileFS::Client::Async;
2 3     3   71240 use strict;
  3         17  
  3         84  
3 3     3   23 use warnings;
  3         11  
  3         77  
4 3     3   2875 use AnyEvent;
  3         16271  
  3         105  
5 3     3   1895 use AnyEvent::HTTP;
  3         80404  
  3         295  
6 3     3   2028 use AnyEvent::Socket;
  3         45949  
  3         368  
7 3     3   1111 use URI;
  3         11590  
  3         105  
8 3     3   20 use Carp qw/confess/;
  3         7  
  3         163  
9 3     3   1033 use POSIX qw( EAGAIN );
  3         12570  
  3         35  
10 3     3   2963 use Socket qw/ IPPROTO_TCP /;
  3         9  
  3         177  
11              
12 3     3   19 use base qw/ MogileFS::Client /;
  3         6  
  3         1859  
13              
14 3     3   175468 use IO::AIO qw/ fadvise /;
  3         12749  
  3         313  
15              
16 3 50   3   24 use constant TCP_CORK => ($^O eq "linux" ? 3 : 0); # XXX
  3         7  
  3         293  
17              
18             our $VERSION = '0.031';
19              
20             # ABSTRACT: A non-blocking MogileFS client
21              
22             =head1 NAME
23              
24             MogileFS:Client::Async
25              
26             =head1 SYNOPSIS
27              
28             my $mogfs = MogileFS::Client::Async->new( ... )
29              
30             $mogfs->read_to_file($key, $filename);
31              
32             $mogfs->store_file($key, $class, $filename, \%opts );
33              
34             $mogfs->store_content($key, $class, \$content, \%opts );
35              
36             =head1 DESCRIPTION
37              
38             This package provides replacement implementations of some methods in
39             L to allow for non-blocking IO under L and the
40             ability to read and write files stored in MogileFS without needing to store
41             the entire file in memory.
42              
43             =head1 SEE ALSO
44              
45             =over
46              
47             =item *
48             L
49              
50             =item *
51             L
52              
53             =item *
54             L
55              
56             =back
57              
58             =cut
59              
60 3     3   1438 use namespace::clean;
  3         39249  
  3         19  
61              
62 0     0 1   sub new_file { confess("new_file is unsupported in " . __PACKAGE__) }
63 0     0 1   sub edit_file { confess("edit_file is unsupported in " . __PACKAGE__) }
64 0     0 1   sub read_file { confess("read_file is unsupported in " . __PACKAGE__) }
65              
66             sub read_to_file {
67 0     0 0   my $self = shift;
68 0           my $key = shift;
69 0           my $fn = shift;
70              
71 0           my @paths = $self->get_paths($key);
72              
73 0 0         die("No paths for $key") unless @paths;
74              
75 0           for (1..2) {
76 0           foreach my $path (@paths) {
77 0           my ($bytes, $write) = (0, undef);
78 0 0         open $write, '>', $fn or confess("Could not open $fn to write");
79              
80 0           my $cv = AnyEvent->condvar;
81 0           my $h;
82             my $guard = http_request
83             GET => $path,
84             timeout => 120, # 2m
85             on_header => sub {
86 0     0     my ($headers) = @_;
87 0 0         return 0 if ($headers->{Status} != 200);
88 0           $h = $headers;
89 0           1;
90             },
91             on_body => sub {
92 0 0   0     syswrite $write, $_[0] or return 0;
93 0           $bytes += length($_[0]);
94 0           1;
95             },
96             sub { # On complete!
97 0     0     my (undef, $headers) = @_;
98 0           $h = $headers;
99 0           close($write);
100 0           undef $write;
101 0           $cv->send;
102 0           1;
103 0           };
104 0           $cv->recv;
105 0 0 0       return $bytes if ($bytes && !$write);
106             # Error..
107 0           $h->{Code} = 590;
108 0           $h->{Reason} = "Unknown error";
109 0           warn("HTTP error getting mogile $key: " . $h->{Reason} . "\n");
110 0           close $write;
111 0           unlink $fn;
112             }
113             }
114 0           confess("Could not read $key from mogile");
115             }
116              
117             sub store_file {
118 0     0 1   my $self = shift;
119 0 0         return undef if $self->{readonly};
120              
121 0           my ($key, $class, $file, $opts) = @_;
122 0   0       $opts ||= {};
123              
124             # Extra args to be passed along with the create_open and create_close commands.
125             # Any internally generated args of the same name will overwrite supplied ones in
126             # these hashes.
127 0   0       my $create_open_args = $opts->{create_open_args} || {};
128 0   0       my $create_close_args = $opts->{create_close_args} || {};
129              
130 0           $self->run_hook('store_file_start', $self, $key, $class, $opts);
131 0           $self->run_hook('new_file_start', $self, $key, $class, $opts);
132              
133             my $res = $self->{backend}->do_request(
134             create_open => {
135             %$create_open_args,
136             domain => $self->{domain},
137             class => $class,
138             key => $key,
139 0 0 0       fid => $opts->{fid} || 0, # fid should be specified, or pass 0 meaning to auto-generate one
140             multi_dest => 1,
141             }
142             ) or return undef;
143              
144 0           my $dests = []; # [ [devid,path], [devid,path], ... ]
145              
146             # determine old vs. new format to populate destinations
147 0 0         unless (exists $res->{dev_count}) {
148 0           push @$dests, [ $res->{devid}, $res->{path} ];
149             } else {
150 0           for my $i (1..$res->{dev_count}) {
151 0           push @$dests, [ $res->{"devid_$i"}, $res->{"path_$i"} ];
152             }
153             }
154              
155 0           my ($length, $error, $devid, $path);
156 0           my @dests = (@$dests, @$dests, @$dests); # 2 retries
157 0           my $try = 0;
158 0           foreach my $dest (@dests) {
159 0           $try++;
160 0           ($devid, $path) = @$dest;
161 0           my $uri = URI->new($path);
162 0           my $cv = AnyEvent->condvar;
163 0           my ($socket_guard, $socket_fh);
164             $socket_guard = tcp_connect $uri->host, $uri->port, sub {
165 0     0     my ($fh, $host, $port) = @_;
166 0           $error = $!;
167 0 0         if (!$fh) {
168 0           $cv->send;
169 0           return;
170             }
171 0           $socket_fh = $fh;
172 0 0         setsockopt($socket_fh, IPPROTO_TCP, TCP_CORK, 1) or warn "could not set TCP_CORK" if TCP_CORK;
173 0           $cv->send;
174 0     0     }, sub { 10 };
  0            
175 0           $cv->recv;
176 0 0         if (! $socket_fh) {
177 0   0       $error ||= 'unknown error';
178 0           warn("Connection error: $error to $path");
179 0           next;
180             }
181 0           undef $error;
182             # We are connected!
183 0 0         open my $fh_from, "<", $file or confess("Could not open $file");
184              
185             # Hint to Linux that doubling readahead will probably pay off.
186 0           fadvise($fh_from, 0, 0, IO::AIO::FADV_SEQUENTIAL());
187              
188 0           $length = -s $file;
189 0           my $buf = 'PUT ' . $uri->path . " HTTP/1.0\r\nConnection: close\r\nContent-Length: $length\r\n\r\n";
190 0           $cv = AnyEvent->condvar;
191 0           my $w;
192             my $timeout;
193             my $reset_timer = sub {
194 0     0     my ($type, $time) = @_;
195 0   0       $type ||= 'unknown';
196 0   0       $time ||= 60;
197 0           my $start = time();
198             $timeout = AnyEvent->timer(
199             after => $time,
200             cb => sub {
201 0           undef $w;
202 0           my $took = time() - $start;
203 0           $error = "Connection timed out duing data transfer of type $type (after $took seconds)";
204 0           $cv->send;
205             },
206 0           );
207 0           };
208             $w = AnyEvent->io( fh => $socket_fh, poll => 'w', cb => sub {
209 0     0     $reset_timer->('read');
210 0 0         if (!length($buf)) {
211 0           my $bytes = sysread $fh_from, $buf, '4096';
212 0           $reset_timer->('write');
213 0 0         if (!defined $bytes) { # Error, read FH blocking, no need to check EAGAIN
214 0           $error = $!;
215 0           $cv->send;
216 0           return;
217             }
218 0 0         if (0 == $bytes) { # EOF reading, and we already wrote everything
219 0           $cv->send;
220 0           return;
221             }
222             }
223 0           my $len = syswrite $socket_fh, $buf;
224 0           $reset_timer->('loop');
225 0 0 0       if ($len && $len > 0) {
226 0           $buf = substr $buf, $len;
227             }
228 0 0 0       if (!defined $len && $! != EAGAIN) { # Error, we could get EAGAIN as write sock non-blocking
229 0           $error = $!;
230 0           $cv->send;
231 0           return;
232             }
233 0           });
234 0           $reset_timer->('start PUT');
235 0           $cv->recv;
236 0 0         setsockopt($socket_fh, IPPROTO_TCP, TCP_CORK, 0) or warn "could not unset TCP_CORK" if TCP_CORK;
237 0 0         shutdown($socket_fh, 1) or warn "could not shutdown our socket: $!";
238 0           $cv = AnyEvent->condvar;
239             # FIXME - Cheat here, the response should be small, so we assume it'll allways all be
240             # readable at once, THIS MAY NOT BE TRUE!!!
241             $w = AnyEvent->io( fh => $socket_fh, poll => 'r', cb => sub {
242 0     0     undef $timeout;
243 0           undef $w;
244 0           $cv->send;
245 0           my $buf;
246 0           do {
247 0 0         if ($socket_fh->eof) {
248 0           $error = "Connection closed unexpectedly without response";
249 0           return;
250             }
251 0           my $res; $socket_fh->read($res, 4096); $buf .= $res;
  0            
  0            
252             } while (!length($buf));
253 0           my ($top, @headers) = split /\r?\n/, $buf;
254 0 0         if ($top =~ m{HTTP/1.[01]\s+2\d\d}) {
255             # Woo, 200!
256 0           undef $error;
257             }
258             else {
259 0           $error = "Got non-200 from remote server $top";
260             }
261 0           });
262 0           $reset_timer->('response', 1200); # Wait up to 20m, as lighty
263             # may have to copy the file between
264             # disks. EWWWW
265 0           $cv->recv;
266 0           undef $timeout;
267 0 0         if ($error) {
268 0           warn("Error sending data (try $try) to $uri: $error");
269 0           next; # Retry
270             }
271 0           last; # Success
272             }
273 0 0         die("Could not write to any mogile hosts, should have tried " . scalar(@$dests) . " did try $try")
274             if $error;
275              
276 0           $self->run_hook('new_file_end', $self, $key, $class, $opts);
277              
278             my $rv = $self->{backend}->do_request
279             ("create_close", {
280             fid => $res->{fid},
281             devid => $devid,
282             domain => $self->{domain},
283 0           size => $length,
284             key => $key,
285             path => $path,
286             });
287              
288 0 0         unless ($rv) {
289 0           die "$self->{backend}->{lasterr}: $self->{backend}->{lasterrstr}";
290 0           return undef;
291             }
292              
293 0           $self->run_hook('store_file_end', $self, $key, $class, $opts);
294              
295 0           return $length;
296             }
297              
298             sub store_content {
299 0     0 1   my MogileFS::Client $self = shift;
300 0 0         return undef if $self->{readonly};
301              
302 0           my($key, $class, $content, $opts) = @_;
303              
304 0           $self->run_hook('store_content_start', $self, $key, $class, $opts);
305              
306 0 0         my $fh = $self->new_file($key, $class, undef, $opts) or return;
307 0 0         $content = ref($content) eq 'SCALAR' ? $$content : $content;
308 0           $fh->print($content);
309              
310 0           $self->run_hook('store_content_end', $self, $key, $class, $opts);
311              
312 0 0         $fh->close or return;
313 0           length($content);
314             }
315              
316             1;
317