File Coverage

lib/AnyEvent/Worker.pm
Criterion Covered Total %
statement 133 213 62.4
branch 29 94 30.8
condition 9 38 23.6
subroutine 20 27 74.0
pod 4 7 57.1
total 195 379 51.4


line stmt bran cond sub pod time code
1             package AnyEvent::Worker;
2              
3 3     3   196901 use 5.006;
  3         11  
  3         122  
4 3     3   2641 use common::sense 2;m{
  3         107  
  3         19  
5             use warnings;
6             use strict;
7             }x;
8              
9             =head1 NAME
10              
11             AnyEvent::Worker - Manage blocking task in external process
12              
13             =head1 SYNOPSIS
14              
15             use AnyEvent 5;
16             use AnyEvent::Worker;
17            
18             my $worker1 = AnyEvent::Worker->new( [ 'Actual::Worker::Class' => @init_args ] );
19             my $worker2 = AnyEvent::Worker->new( sub { return "Cb 1 @_"; } );
20             my $worker3 = AnyEvent::Worker->new( {
21             class => 'Actual::Worker::Class2',
22             new => 'create', # alternative constructor
23             args => [qw(arg1 arg2)],
24             } );
25            
26             # Invoke method `test' on Actual::Worker::Class with arguments @args
27             $worker1->do( test => @args , sub {
28             return warn "Request died: $@" if $@;
29             warn "Received response: @_";
30             });
31            
32             # Just call callback, passed to worker2 with arguments @args
33             $worker2->do( @args , sub {
34             return warn "Request died: $@" if $@;
35             warn "Received response: @_";
36             });
37              
38             =head1 CONSTRUCTOR
39              
40             =head2 new $cb->($req), %args
41              
42             Simple stateless worker. On any C a sub sill be invoked with C arguments
43              
44             =head2 new [ Class => @new_args ], %args
45              
46             Stateful, object-based worker. After fork, Class will we Cd, then instantiated with new(@new_args).
47              
48             First argument to C will be interpreted as object method, rest -- as method arguments.
49              
50             =head2 new { class => 'Class', args => \@new_args, new => 'constructor_method' }, %args
51              
52             Same as previous, but allow to pass optional constructor name in C arg
53              
54             =head2 $args{on_error} = $cb->($worker,$error,$fatal,$file,$line)
55              
56             When an unexpected error occurs (for ex: child process exited or killed) C callback will be invoked
57              
58             =head1 METHODS
59              
60             =head2 do @args, $cb->($res)
61              
62             Only for stateless worker.
63              
64             =head2 do method => @args, $cb->($res)
65              
66             Only for stateful worker.
67              
68             =cut
69              
70 3     3   274 use Carp;
  3         10  
  3         183  
71 3     3   934 use Socket ();
  3         3850  
  3         74  
72 3     3   17 use Scalar::Util ();
  3         5  
  3         46  
73 3     3   3317 use Storable ();
  3         11390  
  3         77  
74              
75 3     3   1738 use AnyEvent ();
  3         6264  
  3         60  
76 3     3   1005 use AnyEvent::Util ();
  3         8797  
  3         54  
77              
78 3     3   18 use Errno ();
  3         8  
  3         9016  
79 3     3   35 use Fcntl ();
  3         5  
  3         47  
80 3     3   1053 use POSIX ();
  3         9081  
  3         11739  
81              
82             our $VERSION = '0.06';
83             our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023;
84              
85             # Almost fully derived from AnyEvent::DBI
86              
87             our $WORKER;
88              
89             sub serve_fh($$) {
90 0     0 0 0 my ($fh, $version) = @_;
91              
92 0 0       0 if ($VERSION != $version) {
93 0         0 syswrite $fh,
94             pack "L/a*",
95             Storable::freeze
96             [undef, __PACKAGE__." version mismatch ($VERSION vs. $version)"];
97 0         0 return;
98             }
99            
100 0         0 eval {
101 0         0 my $rbuf;
102 0 0       0 my $name = ref $WORKER eq 'CODE' ? __PACKAGE__ : ref $WORKER;
103 0         0 $0 .= ' - '.$name;
104 0         0 my $O = $0;
105 0         0 my $N = 0;
106 0         0 while () {
107 0 0       0 sysread $fh, $rbuf, 16384, length $rbuf
108             or last;
109            
110 0         0 while () {
111 0         0 my $len = unpack "L", $rbuf;
112            
113             # full request available?
114 0 0 0     0 last unless $len && $len + 4 <= length $rbuf;
115            
116 0         0 my $req = Storable::thaw substr $rbuf, 4;
117 0         0 substr $rbuf, 0, $len + 4, ""; # remove length + request
118 0         0 local $@;
119 0         0 my $wbuf = eval {
120 0         0 ++$N;
121 0 0       0 if (ref $WORKER eq 'CODE') {
122 0         0 local $0 = "$O : request $N";
123 0         0 pack "L/a*", Storable::freeze [ 1, $WORKER->(@$req) ];
124             } else {
125 0         0 my $method = shift @$req;
126             #warn ">> request $method";
127 0         0 local $0 = "$O : request $N : $method";
128 0         0 pack "L/a*", Storable::freeze [ 1, $WORKER->$method(@$req) ];
129             }
130             };
131             # warn if $@;
132 0         0 $0 = "$O : idle";
133 0 0       0 $wbuf = pack "L/a*", Storable::freeze [ undef, ref $@ ? $@ : "$@" ]
    0          
134             if $@;
135            
136             #warn "<< response";
137 0         0 for (my $ofs = 0; $ofs < length $wbuf; ) {
138 0         0 my $wr = syswrite $fh, $wbuf, length($wbuf), $ofs;
139 0 0 0     0 defined $wr or $!{EINTR} or die "unable to write results: $!";
140 0         0 $ofs += $wr;
141             }
142             }
143             }
144             };
145 0 0       0 warn if $@;
146             }
147              
148             sub serve_fd($$) {
149 0 0   0 0 0 open my $fh, ">>&=$_[0]"
150             or die "Couldn't open server file descriptor: $!";
151              
152 0         0 serve_fh $fh, $_[1];
153             }
154              
155             # stupid Storable autoloading, total loss-loss situation
156             Storable::thaw Storable::freeze [];
157              
158             =head1 METHODS
159              
160             =over 4
161              
162             =cut
163              
164             sub new {
165 5     5 1 496 my ($class, $cb, %arg) = @_;
166            
167 5 50       69 my ($client, $server) = AnyEvent::Util::portable_socketpair
168             or croak "unable to create Anyevent::Worker communications pipe: $!";
169 5         451 binmode $client, ':raw';
170 5         18 binmode $server, ':raw';
171            
172 5         17 my $self = bless \%arg, $class;
173 5         43 $self->{fh} = $client;
174            
175 5         65 AnyEvent::Util::fh_nonblocking $client, 1;
176            
177 5         74 my $rbuf;
178 5         64 my @caller = (caller)[1,2]; # the "default" caller
179            
180             {
181 5         8 Scalar::Util::weaken (my $self = $self);
  5         23  
182            
183             $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
184 6 50   6   3106945 return unless $self;
185            
186 6         86 $self->{last_activity} = AnyEvent->now;
187            
188 6         275 my $len = sysread $client, $rbuf, 65536, length $rbuf;
189            
190 6 100       40 if ($len > 0) {
    50          
    0          
191             # we received data, so reset the timer
192            
193 5         12 while () {
194 11         33 my $len = unpack "L", $rbuf;
195            
196             # full response available?
197 11 100 66     95 last unless $len && $len + 4 <= length $rbuf;
198 6         44 my $res = Storable::thaw substr $rbuf, 4;
199 6         151 substr $rbuf, 0, $len + 4, ""; # remove length + request
200            
201 6 50       19 last unless $self;
202 6         9 my $req = shift @{ $self->{queue} };
  6         23  
203            
204 6 100       18 if (defined $res->[0]) {
205 3         11 $res->[0] = $self;
206 3         18 $req->[0](@$res);
207             } else {
208 3         7 my $cb = shift @$req;
209             {
210 3         4 local $@ = $res->[1];
  3         7  
211 3         96 $@ =~ s{\n$}{};
212 3         14 $cb->($self);
213             }
214             }
215            
216             # no more queued requests, so become idle
217 6         75 undef $self->{last_activity}
218 6 100 66     10051 if $self && !@{ $self->{queue} };
219             }
220            
221             }
222             elsif (defined $len) {
223             # todo, caller?
224 1         13 $self->_error ("unexpected eof", @caller, 1);
225             }
226             elsif ($! != Errno::EAGAIN) {
227             # todo, caller?
228 0         0 $self->_error ("read error ".(0+$!).": $!", @caller, 1);
229             }
230 5         192 });
231            
232             $self->{tw_cb} = sub {
233 0 0 0 0   0 if ($self->{timeout} && $self->{last_activity}) {
234 0 0       0 if (AnyEvent->now > $self->{last_activity} + $self->{timeout}) {
235             # we did time out
236 0         0 my $req = $self->{queue}[0];
237 0         0 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
238             } else {
239             # we need to re-set the timeout watcher
240 0         0 $self->{tw} = AnyEvent->timer (
241             after => $self->{last_activity} + $self->{timeout} - AnyEvent->now,
242             cb => $self->{tw_cb},
243             );
244 0         0 Scalar::Util::weaken $self;
245             }
246             } else {
247             # no timeout check wanted, or idle
248 0         0 undef $self->{tw};
249             }
250 5         1349 };
251            
252             $self->{ww_cb} = sub {
253 0 0   0   0 return unless $self;
254            
255 0         0 $self->{last_activity} = AnyEvent->now;
256            
257 0 0       0 my $len = syswrite $client, $self->{wbuf}
258             or return delete $self->{ww};
259            
260 0         0 substr $self->{wbuf}, 0, $len, "";
261 5         92 };
262             }
263            
264 5         4769 my $pid = fork;
265            
266 5 50       357 if ($pid) {
    0          
267             # parent
268 5         292 close $server;
269             }
270             elsif (defined $pid) {
271             # child
272 0         0 $SIG{INT} = 'IGNORE';
273 0         0 my $serv_fno = fileno $server;
274            
275 0   0     0 ($_ != $serv_fno) && POSIX::close $_ for $^F+1..$FD_MAX;
276            
277 0 0       0 if (ref $cb eq 'CODE'){
    0          
    0          
278 0         0 $WORKER = $cb;
279             }
280             elsif ( ref $cb eq 'ARRAY') {
281 0         0 my ( $class,@args ) = @$cb;
282 0 0 0     0 eval qq{ use $class; 1 } or croak($@) unless $class->can('new');
283 0         0 $WORKER = $class->new(@args);
284             }
285             elsif ( ref $cb eq 'HASH') {
286 0 0       0 my $class = $cb->{class} or croak "You should define class to construct";
287 0   0     0 my $new = $cb->{new} || 'new';
288 0 0 0     0 eval qq{ use $class; 1 } or croak($@) unless $class->can($new);
289 0 0       0 $WORKER = $class->$new(@{ $cb->{args} || [] });
  0         0  
290             }
291             else {
292 0         0 croak "Bad argument: $cb";
293             }
294            
295 0         0 serve_fh $server, $VERSION;
296            
297             # no other way on the broken windows platform, even this leaks
298             # memory and might fail.
299 0         0 kill 9, $$ if AnyEvent::WIN32;
300            
301             # and this kills the parent process on windows
302 0         0 POSIX::_exit 0;
303             }
304             else {
305 0         0 croak "fork: $!";
306             }
307 5         660 $self->{child_pid} = $pid;
308 5         753 $self
309             }
310              
311             sub _server_pid {
312 0     0   0 shift->{child_pid}
313             }
314              
315             our %KIDW;
316             our %TERM;
317              
318             sub kill_child {
319 6     6 0 10 my $self = shift;
320 6         23 my $child_pid = delete $self->{child_pid};
321 6         10 my $GD = 0;
322             {
323 6 50   6   12 local $SIG{__WARN__} = sub { $GD = 1 if $_[0] =~ / during global destruction\.\s*$/ };
  6         77  
  6         58  
324 6         62 warn 'test';
325             }
326             #print STDERR "killing $child_pid / $GD\n";
327 6 100       218 if ($child_pid) {
328             # send SIGKILL in two seconds
329 5         24 $TERM{$child_pid}++;
330 5 50 33     247 kill 0 => $child_pid and
      33        
331             kill TERM => $child_pid or $!{ESRCH} or warn "kill $child_pid: $!";
332 5 50       18 return if $GD;
333             # MAYBE: kill timer
334             #my $murder_timer = AnyEvent->timer (
335             # after => 2,
336             # cb => sub {
337             # kill 9, $child_pid
338             # and delete $TERM{$child_pid};
339             # },
340             #);
341            
342             # reap process
343             #print STDERR "start reaper $child_pid\n";
344             $KIDW{$child_pid} = AnyEvent->child (
345             pid => $child_pid,
346             cb => sub {
347             # just hold on to this so it won't go away
348             #print STDERR "reaped $child_pid\n";
349 2     2   12740 delete $TERM{$child_pid};
350 2         10 delete $KIDW{$child_pid};
351             # cancel SIGKILL
352             #undef $murder_timer;
353             },
354 5         173 );
355            
356 5         322 close $self->{fh};
357             }
358             }
359             sub END {
360 3     3   629 my $GD = 0;
361             {
362 3 50       6 local $SIG{__WARN__} = sub { $GD = 1 if $_[0] =~ / during global destruction\.\s*$/ };
  3         27  
  3         36  
363 3         26 warn 'test';
364             }
365             #print STDERR "END $!/$? GD=$GD\n";
366 3         14 for (keys %TERM) {
367 3         1348 delete $KIDW{$_};
368             #print STDERR "END kill $_\n";
369 3 50       347 kill 0 => $_ and do {
370 3 50       82 kill KILL => $_ or warn "kill $_ failed: $!";
371             #print STDERR "END waitpid $_\n";
372 3         4926 my $wp = waitpid $_,0;
373             #print STDERR "END waitpid $_ = $wp\n";
374             };
375             #print STDERR "END $_ ($!/$?/${^CHILD_ERROR_NATIVE})\n";
376             }
377 3         12 undef $!;undef $?;
  3         89  
378             }
379              
380             sub DESTROY {
381 5     5   2357 shift->kill_child;
382             }
383              
384             sub _error {
385 1     1   8 my ($self, $error, $filename, $line, $fatal) = @_;
386 1         10 my $caller = '';
387 1         3 my @caller = ($filename,$line);
388 1 50       8 if ($fatal) {
389 1         3 delete $self->{tw};
390 1         17 delete $self->{rw};
391 1         22 delete $self->{ww};
392 1         3 delete $self->{fh};
393            
394             # for fatal errors call all enqueued callbacks with error
395 1         2 while (my $req = shift @{$self->{queue}}) {
  3         3904  
396 2 100       10 @caller = ($req->[1],$req->[2]) unless $caller;
397 2   66     20 $caller ||= " after $req->[1] line $req->[2],";
398 2         9 local $@ = "$error at $req->[1] line $req->[2].\n";
399 2         11 $req->[0]->($self);
400             }
401 1         6 $self->kill_child;
402             }
403            
404 1         3 local $@ = $error;
405            
406 1 50       7 if ($self->{on_error}) {
407 1         10 $self->{on_error}($self, $error, $fatal, @caller);
408             }
409             else {
410 0         0 my $e = "$error$caller";
411 0 0       0 if ($fatal) {
412 0         0 die "$e at $filename, line $line\n";
413             } else {
414 0         0 warn "$e at $filename, line $line\n";
415             }
416             }
417             }
418              
419             =item $worker->on_error ($cb->($worker, $filename, $line, $fatal))
420              
421             Sets (or clears, with C) the C handler.
422              
423             =cut
424              
425             sub on_error {
426 0     0 1 0 $_[0]{on_error} = $_[1];
427             }
428              
429             =item $worker->timeout ($seconds)
430              
431             Sets (or clears, with C) the database timeout. Useful to extend the
432             timeout when you are about to make a really long query.
433              
434             =cut
435              
436             sub timeout {
437 0     0 1 0 my ($self, $timeout) = @_;
438            
439 0         0 $self->{timeout} = $timeout;
440            
441             # reschedule timer if one was running
442 0         0 $self->{tw_cb}->();
443             }
444              
445             =item $worker->do ( @args, $cb->( $worker, @response ) )
446              
447             Executes worker code and execure the callback, when response is ready
448              
449             =cut
450              
451             sub do {
452 8     8 1 2252 my $self = shift;
453 8         16 my $cb = pop;
454 8         60 my ($filename,$line) = (caller)[1,2];
455            
456 8 50       116 unless ($self->{fh}) {
457 0         0 local $@ = my $err = 'no worker connection';
458 0         0 $cb->($self);
459 0         0 $self->_error ($err, $filename, $line, 1);
460 0         0 return;
461             }
462            
463 8         28 push @{ $self->{queue} }, [$cb, $filename, $line];
  8         52  
464            
465             # re-start timeout if necessary
466 8 50 33     39 if ($self->{timeout} && !$self->{tw}) {
467 0         0 $self->{last_activity} = AnyEvent->now;
468 0         0 $self->{tw_cb}->();
469             }
470            
471 8         91 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
472            
473 8 50       514 unless ($self->{ww}) {
474 8         73 my $len = syswrite $self->{fh}, $self->{wbuf};
475 8         43 substr $self->{wbuf}, 0, $len, "";
476            
477             # still any left? then install a write watcher
478 8 50       70 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
479             if length $self->{wbuf};
480             }
481             }
482              
483             =back
484              
485             =head1 AUTHOR
486              
487             Mons Anderson, C<< >>
488              
489             =head1 ACKNOWLEDGEMENTS
490              
491             This module based on Marc Lehmann's L
492              
493             Thanks to Vladimir Timofeev C<< >> for bugfixes and useful notes
494              
495             =cut
496              
497             1; # End of AnyEvent::Worker