File Coverage

lib/Parallel/WorkUnit.pm
Criterion Covered Total %
statement 342 471 72.6
branch 126 216 58.3
condition 11 21 52.3
subroutine 49 54 90.7
pod 11 11 100.0
total 539 773 69.7


line stmt bran cond sub pod time code
1             #
2             # Copyright (C) 2015-2023 Joelle Maslak
3             # All Rights Reserved - See License
4             #
5              
6             package Parallel::WorkUnit;
7             $Parallel::WorkUnit::VERSION = '2.232041';
8 228     228   49225983 use v5.8;
  228         2168  
9              
10             # ABSTRACT: Provide multi-paradigm forking with ability to pass back data
11              
12 228     228   1287 use strict;
  228         529  
  228         4562  
13 228     228   1023 use warnings;
  228         826  
  228         5697  
14 228     228   1912 use autodie;
  228         15758  
  228         1698  
15              
16 228     228   1273683 use Carp;
  228         735  
  228         16352  
17              
18 228     228   1510 use overload;
  228         457  
  228         1816  
19 228     228   158693 use IO::Handle;
  228         1498831  
  228         10919  
20 228     228   124847 use IO::Pipely qw(pipely);
  228         4251418  
  228         13697  
21 228     228   113446 use IO::Select;
  228         391784  
  228         12830  
22 228     228   1740 use POSIX ':sys_wait_h';
  228         991  
  228         3286  
23 228     228   412150 use Scalar::Util qw(blessed reftype weaken);
  228         463  
  228         14946  
24 228     228   164969 use Storable;
  228         741333  
  228         14103  
25 228     228   124567 use Try::Tiny;
  228         489903  
  228         1380764  
26              
27              
28             my @ALL_WU; # Holds all active work units so child processes can't
29             # mess with parent work units
30             # Note it holds a reference (strong) to a reference
31             # (weak).
32              
33              
34             sub use_anyevent {
35 4437 50   4437 1 34033 if ( $#_ == 0 ) {
    0          
36 4437         25739 return shift->{use_anyevent};
37             } elsif ( $#_ == 1 ) {
38 0         0 my ( $self, $val ) = @_;
39              
40 0         0 my ($old_val) = $self->{use_anyevent};
41 0         0 $self->{use_anyevent} = $val;
42              
43             # Trigger
44 0         0 $self->_set_anyevent( $val, $old_val );
45              
46 0         0 return $val;
47             } else {
48 0         0 confess("Invalid call");
49             }
50             }
51              
52              
53             # XXX: Add validation that _cv is a Maybe[AnyEvent::CondVar]
54             sub _cv {
55 3198 100   3198   16940 if ( $#_ == 0 ) {
    50          
56 2980         12016 return shift->{_cv};
57             } elsif ( $#_ == 1 ) {
58 218         3519 my ( $self, $val ) = @_;
59 218         6753 $self->{_cv} = $val;
60 218         2933 return $val;
61             } else {
62 0         0 confess("Invalid call");
63             }
64             }
65              
66             # XXX: Add validation that _last_error is a Maybe[Str]
67             sub _last_error {
68 10303 100   10303   32891 if ( $#_ == 0 ) {
    50          
69 10085         37921 return shift->{_last_error};
70             } elsif ( $#_ == 1 ) {
71 218         2788 my ( $self, $val ) = @_;
72 218         5342 $self->{_last_error} = $val;
73 218         2816 return $val;
74             } else {
75 0         0 confess("Invalid call");
76             }
77             }
78              
79             # XXX: Add validation that _ordered_count is a non-negative integer
80             sub _ordered_count {
81 5579 100   5579   21644 if ( $#_ == 0 ) {
    50          
82 2487         5048 my $self = shift;
83              
84             # Initialize
85 2487 100       13604 if ( !exists( $self->{_ordered_count} ) ) { $self->{_ordered_count} = 0; }
  136         465  
86              
87 2487         8215 return $self->{_ordered_count};
88             } elsif ( $#_ == 1 ) {
89 3092         8382 my ( $self, $val ) = @_;
90 3092         7635 $self->{_ordered_count} = $val;
91 3092         10451 return $val;
92             } else {
93 0         0 confess("Invalid call");
94             }
95             }
96              
97             # XXX: Add validation that _ordered_responses is an ArrayRef
98             sub _ordered_responses {
99 2923 100   2923   13476 if ( $#_ == 0 ) {
    50          
100 2318         5539 my $self = shift;
101              
102             # Initialize
103 2318 100       8702 if ( !exists( $self->{_ordered_responses} ) ) { $self->{_ordered_responses} = []; }
  164         2559  
104              
105 2318         18218 return $self->{_ordered_responses};
106             } elsif ( $#_ == 1 ) {
107 605         4818 my ( $self, $val ) = @_;
108 605         4981 $self->{_ordered_responses} = $val;
109 605         2615 return $val;
110             } else {
111 0         0 confess("Invalid call");
112             }
113             }
114              
115              
116             sub max_children {
117 8418 100   8418 1 109334 if ( $#_ == 0 ) {
    50          
118 7792         13491 my $self = shift;
119              
120 7792 100       18972 if ( !exists( $self->{max_children} ) ) { $self->{max_children} = 5; }
  61         305  
121              
122 7792         69227 return $self->{max_children};
123             } elsif ( $#_ == 1 ) {
124 626         1917 my ( $self, $val ) = @_;
125              
126             # Validate
127 626 100       2647 if ( defined($val) ) {
128 554 100       7651 if ( $val !~ m/^[0-9]+$/s ) {
129 92         14536 confess("max_children must be set to a positive integer");
130             }
131 462 100       1842 if ( $val <= 0 ) {
132 46         4508 confess("max_children must be set to a positive integer");
133             }
134             }
135              
136 488         1788 $self->{max_children} = $val;
137              
138             # Trigger
139 488         1997 $self->_start_queued_children();
140              
141 474         1613 return $val;
142             } else {
143 0         0 confess("Invalid call");
144             }
145             }
146              
147             # XXX: Add validation that _subprocs is a HashRef
148             sub _subprocs {
149 31944 100   31944   212747 if ( $#_ == 0 ) {
    50          
150 31726         99279 my $self = shift;
151              
152             # Initial value
153 31726 100       142102 if ( !exists( $self->{_subprocs} ) ) { $self->{_subprocs} = {}; }
  313         1424  
154              
155 31726         201114 return $self->{_subprocs};
156             } elsif ( $#_ == 1 ) {
157 218         2876 my ( $self, $val ) = @_;
158 218         21596 $self->{_subprocs} = $val;
159 218         4510 return $val;
160             } else {
161 0         0 confess("Invalid call");
162             }
163             }
164              
165             # XXX: Add validation that _count is a positive integer
166             # This only gets used on Win32.
167             sub _count {
168 218 50   218   6329 if ( $#_ == 0 ) {
    50          
169 0         0 my $self = shift;
170              
171             # Initial value
172 0 0       0 if ( !exists( $self->{_count} ) ) { $self->{_count} = 1; }
  0         0  
173              
174 0         0 return $self->{_count};
175             } elsif ( $#_ == 1 ) {
176 218         2445 my ( $self, $val ) = @_;
177 218         4113 $self->{_count} = $val;
178 218         2817 return $val;
179             } else {
180 0         0 confess("Invalid call");
181             }
182             }
183              
184             # XXX: Add validation that _parent_pid is a positive integer
185             # We also need to initialize always in the parent process.
186             sub _parent_pid {
187 325 50   325   2320 if ( $#_ == 0 ) {
    50          
188 0         0 return shift->{_parent_pid};
189             } elsif ( $#_ == 1 ) {
190 325         2306 my ( $self, $val ) = @_;
191 325         7731 $self->{_parent_pid} = $val;
192 325         848 return $val;
193             } else {
194 0         0 confess("Invalid call");
195             }
196             }
197              
198             # Children queued
199             # XXX: Add validation that _queued_cildren is an
200             # ArrayRef[ArrayRef[CodeRef]]
201             sub _queued_children {
202 16064 100   16064   58441 if ( $#_ == 0 ) {
    50          
203 15846         29384 my $self = shift;
204              
205 15846 100       42158 if ( !exists( $self->{_queued_children} ) ) { $self->{_queued_children} = []; }
  275         1334  
206              
207 15846         73705 return $self->{_queued_children};
208             } elsif ( $#_ == 1 ) {
209 218         2226 my ( $self, $val ) = @_;
210 218         5013 $self->{_queued_children} = $val;
211 218         2730 return $val;
212             } else {
213 0         0 confess("Invalid call");
214             }
215             }
216              
217              
218             sub new {
219 325     325 1 60445 my $class = shift;
220 325         887 my $self = {};
221 325         882 bless $self, $class;
222              
223             # Initialize parent PID
224 325         3640 $self->_parent_pid($$);
225              
226             # Make a weak reference and shove it into the ALL_WU array
227 325         599 my $weakself = $self;
228 325         2559 weaken $weakself;
229 325         922 push @ALL_WU, \$weakself;
230              
231             # Do some housekeeping on @ALL_WU, so it is somewhat bounded
232 325         832 @ALL_WU = grep { defined $$_ } @ALL_WU;
  471         1667  
233              
234             # Do we have any arguments?
235 325 100       1863 if (scalar(@_) > 0) {
236 92 100       644 my %args = (scalar(@_) == 1) ? %{shift()} : @_;
  46         184  
237              
238 92 50       276 if (exists $args{use_anyevent}) {
239 0         0 $self->use_anyevent($args{use_anyevent});
240             }
241 92 50       322 if (exists $args{max_children}) {
242 92         322 $self->max_children($args{max_children});
243             }
244             }
245              
246 325         1382 return $self;
247             }
248              
249              
250             sub async {
251 4250 50   4250 1 60453858 if ( $#_ < 1 ) { confess 'invalid call'; }
  0         0  
252 4250         10878 my $self = shift;
253 4250         7730 my $sub = shift;
254              
255             # Test $sub to make sure it is a code ref or a sub ref
256 4250 100       51940 if ( !_codelike($sub) ) {
257 7         728 confess("Parameter to async() is not a code (or codelike) reference");
258             }
259              
260 4243         13151 my $callback;
261 4243 100       14627 if ( scalar(@_) == 0 ) {
    50          
262             # No callback provided
263              
264 2487         8206 my $cbnum = $self->_ordered_count;
265 2487         9081 $self->_ordered_count( $cbnum + 1 );
266              
267             # We create a callback that populates the ordered responses
268 2487         4578 my $selfref = $self;
269 2487         11952 weaken $selfref;
270             $callback = sub {
271 1931 50   1931   5862 if ( defined $selfref ) { # In case this went away
272 1931         4843 @{ $selfref->_ordered_responses }[$cbnum] = shift;
  1931         6466  
273             }
274 2487         51299 };
275             } elsif ( scalar(@_) == 1 ) {
276             # Callback provided
277 1756         4330 $callback = shift;
278             } else {
279 0         0 confess 'invalid call';
280             }
281              
282             # If there are pending errors, throw that.
283 4243 50       21905 if ( defined( $self->_last_error ) ) { die( $self->_last_error ); }
  0         0  
284              
285 4243         94874 my $pipe = [ pipely() ];
286              
287 4243         989796 my $pid = fork();
288              
289 4243 100       7033971 if ($pid) {
290             # We are in the parent process
291              
292 4031         387820 $pipe = $pipe->[0];
293              
294 4031         478292 $self->_subprocs()->{$pid} = {
295             fh => $pipe,
296             watcher => undef,
297             callback => $callback,
298             caller => [ caller() ],
299             };
300              
301             # Set up anyevent listener if appropriate
302 4031 50       114410 if ( $self->use_anyevent() ) {
303 0         0 $self->_add_anyevent_watcher($pid);
304             }
305              
306 4031         171488 return $pid;
307              
308             } else {
309             # We are in the child process
310 212         29133 $pipe = $pipe->[1];
311              
312 212         14207 return $self->_child( $sub, $pipe );
313             }
314             }
315              
316              
317             sub asyncs {
318 79 50   79 1 957 if ( $#_ < 2 ) { confess 'invalid call'; }
  0         0  
319 79         264 my $self = shift;
320 79         210 my $children = shift;
321 79         237 my $sub = shift;
322 79 50       369 if ( scalar(@_) > 1 ) { confess("invalid call"); }
  0         0  
323              
324 79 50       1790 if ( $children !~ m/^[1-9][0-9]*$/s ) {
325 0         0 confess("Number of children must be a numeric value > 0");
326             }
327              
328 79         632 for ( my $i = 0; $i < $children; $i++ ) {
329 655     30   10806 $self->async( sub { return $sub->($i); }, @_ );
  30         279  
330             }
331              
332 49         1472 return $children;
333             }
334              
335             sub _child {
336 212 50   212   9758 if ( scalar(@_) != 3 ) { confess 'invalid call'; }
  0         0  
337 212         4143 my ( $self, $sub, $pipe ) = @_;
338              
339             # Cleanup ALL_WU
340 212         8248 @ALL_WU = grep { defined $$_ } @ALL_WU;
  263         9064  
341 212         2955 foreach my $wu ( map { $$_ } @ALL_WU ) {
  218         4787  
342 218         10425 $wu->_clear_all();
343             }
344              
345             try {
346 212     212   54981 my $result = $sub->();
347 205         3307043 $self->_send_result( $pipe, $result );
348             } catch {
349 7     7   3675 $self->_send_error( $pipe, $_ );
350 212         12682 };
351              
352 210         133635 exit();
353             }
354              
355              
356             sub waitall {
357 3367 50   3367 1 44421 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
358 3367         12860 my ($self) = @_;
359              
360             # No subprocs? Just return.
361 3367 100       6983 if ( scalar( keys %{ $self->_subprocs } ) == 0 ) {
  3367         15338  
362 387 50       6041 if ( $self->use_anyevent ) {
363 0         0 $self->_cv( AnyEvent->condvar );
364             }
365              
366 387         4683 return $self->_get_and_reset_ordered_responses();
367             }
368              
369             # Using cv?
370 2980 50       18896 if ( defined( $self->_cv ) ) {
371 0         0 $self->_cv->recv();
372 0 0       0 if ( defined( $self->_last_error ) ) {
373 0         0 my $err = $self->_last_error;
374 0         0 $self->_last_error(undef);
375              
376 0         0 die($err);
377             }
378              
379 0         0 return $self->_get_and_reset_ordered_responses();
380             }
381              
382             # Tail recursion
383 2980 50       11801 if ( $self->_waitone() ) { goto &waitall }
  2947         28816  
384              
385 0         0 return @{ $self->_get_and_reset_ordered_responses() };
  0         0  
386             }
387              
388             # Gets the _ordered_responses and returns the reference. Also
389             # resets the _ordered_responses and the _ordered_counts to an
390             # empty list and zero respectively.
391             sub _get_and_reset_ordered_responses {
392 387 50   387   3432 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
393 387         1511 my $self = shift;
394              
395 387         2688 my (@r) = @{ $self->_ordered_responses() };
  387         2861  
396              
397 387         3826 $self->_ordered_responses( [] );
398 387         3087 $self->_ordered_count(0);
399              
400 387         9895 return @r;
401             }
402              
403              
404             sub waitone {
405 372 50   372 1 179253 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
406 372         2560 my ($self) = @_;
407              
408 372         2768 my $rv = $self->_waitone();
409              
410             # Using AnyEvent?
411 368 50       8327 if ( defined( $self->_last_error ) ) {
412 0         0 my $err = $self->_last_error;
413 0         0 $self->_last_error(undef);
414              
415 0         0 die($err);
416             }
417              
418 368         5836 return $rv;
419             }
420              
421             # Meat of waitone (but doesn't handle returning an exception when using
422             # anyevent)
423             sub _waitone {
424 3352 50   3352   13500 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
425 3352         10777 my ($self) = @_;
426              
427 3352         9564 my $sp = $self->_subprocs();
428 3352         17130 weaken $sp; # To avoid some Windows warnings
429 3352 100       15570 if ( !keys(%$sp) ) { return; }
  8         124  
430              
431             # On everything but Windows
432             #
433 3344         72100 my $s = IO::Select->new();
434 3344         71103 foreach ( keys(%$sp) ) { $s->add( $sp->{$_}{fh} ); }
  14564         564435  
435              
436 3344         161695 my @ready = $s->can_read();
437              
438 3344         2772581 foreach my $fh (@ready) {
439 3344         16552 foreach my $child ( keys(%$sp) ) {
440 9036 50       105052 if ( defined( $fh->fileno() ) ) {
441 9036 100       64428 if ( $fh->fileno() == $sp->{$child}{fh}->fileno() ) {
442 3344         49820 $self->_read_result($child);
443              
444 3325         671383628 waitpid( $child, 0 );
445              
446             # Start queued children, if needed
447 3325         51495 $self->_start_queued_children();
448              
449 3307         100782 return 1; # We don't want to read more than one!
450             }
451             }
452             }
453             }
454              
455             # We should never get here
456 0         0 return;
457             }
458              
459              
460             ## no critic ('Subroutines::ProhibitBuiltinHomonyms')
461             sub wait {
462 8 50   8 1 3236 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
463 8         124 my ( $self, $pid ) = @_;
464              
465 8         221 my $rv = $self->_wait($pid);
466              
467 8 50       161 if ( defined( $self->_last_error ) ) {
468 0         0 my $err = $self->_last_error;
469 0         0 $self->_last_error(undef);
470              
471 0         0 die($err);
472             }
473              
474 8         95 return $rv;
475             }
476              
477             # Internal version that doesn't check for AnyEvent die needs
478             sub _wait {
479 8 50   8   209 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
480 8         136 my ( $self, $pid ) = @_;
481              
482 8 100       178 if ( !exists( $self->_subprocs()->{$pid} ) ) {
483              
484             # We don't warn/die because it's possible that there is
485             # a race between callback and here, in the main thread.
486 4         86 return;
487             }
488              
489 4         205 my $result = $self->_read_result($pid);
490              
491 4         3129661 waitpid( $pid, 0 );
492              
493 4         142 return $result;
494             }
495             ## use critic
496              
497              
498             sub count {
499 7292 50   7292 1 3699598 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
500 7292         26789 my ($self) = @_;
501              
502 7292         21697 my $sp = $self->_subprocs();
503 7292         118782 return scalar( keys %$sp );
504             }
505              
506              
507             sub queue {
508 2302 50   2302 1 60623541 if ( $#_ < 1 ) { confess 'invalid call'; }
  0         0  
509 2302         5440 my $self = shift;
510 2302         5045 my $sub = shift;
511              
512             # Test $sub to make sure it is a code ref or a sub ref
513 2302 100       17499 if ( !_codelike($sub) ) {
514 7         1365 confess("Parameter to queue() is not a code (or codelike) reference");
515             }
516              
517 2295         3954 my $callback;
518 2295 100       7498 if ( scalar(@_) == 0 ) {
    50          
519             # We're okay, don't need to do anything - no callback
520             } elsif ( scalar(@_) == 1 ) {
521             # We have a callback
522 1136         1754 $callback = shift;
523             } else {
524 0         0 confess 'invalid call';
525             }
526              
527             # If there are pending errors, throw that.
528 2295 50       7277 if ( defined( $self->_last_error ) ) { die( $self->_last_error ); }
  0         0  
529              
530 2295         4744 push @{ $self->_queued_children }, [ $sub, $callback ];
  2295         5176  
531 2295         7561 return $self->_start_queued_children();
532             }
533              
534             sub _send_result {
535 205 50   205   3124 if ( $#_ != 2 ) { confess 'invalid call'; }
  0         0  
536 205         1908 my ( $self, $fh, $msg ) = @_;
537              
538 205         3675 return $self->_send( $fh, 'RESULT', $msg );
539             }
540              
541             sub _send_error {
542 7 50   7   586 if ( $#_ != 2 ) { confess 'invalid call'; }
  0         0  
543 7         214 my ( $self, $fh, $err ) = @_;
544              
545 7         408 return $self->_send( $fh, 'ERROR', $err );
546             }
547              
548             sub _send {
549 212 50   212   4078 if ( $#_ != 3 ) { confess 'invalid call'; }
  0         0  
550 212         3549 my ( $self, $fh, $type, $data ) = @_;
551              
552 212         1581 my $msg;
553 212 100 100     6592 if ( blessed($data) && ($data->can('FREEZE')) && ($data->can('THAW')) ) {
      66        
554 1         29 $msg = ref($data) . "!::!" . $data->FREEZE();
555             } else {
556 211         7834 $msg = "!::!" . Storable::freeze( \$data );
557             }
558              
559 210 50       100851 if ( !defined($msg) ) {
560 0         0 die 'freeze() returned undef for child return value';
561             }
562              
563 210         18431 $fh->write($type);
564 210         18890 $fh->write("\n");
565              
566 210         7143 $fh->write( length($msg) );
567 210         6478 $fh->write("\n");
568              
569 210         10103 binmode( $fh, ':raw' );
570              
571 210         139409 $fh->write($msg);
572              
573 210         48059 $fh->close();
574 210         8898 return;
575             }
576              
577             sub _read_result {
578 3348 50   3348   13813 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
579 3348         11937 my ( $self, $child ) = @_;
580              
581 3348         10818 my $cinfo = $self->_subprocs()->{$child};
582 3348 50       12430 if (defined($cinfo->{rawbuff})) {
583 0         0 return $self->_read_result_from_buffer($child);
584             } else {
585 3348         19081 return $self->_read_result_from_fh($child);
586             }
587             }
588              
589             sub _read_result_from_buffer {
590 0 0   0   0 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
591 0         0 my ( $self, $child ) = @_;
592              
593 0         0 my $cinfo = $self->_subprocs()->{$child};
594 0         0 $cinfo->{fh}->close();
595              
596 0         0 my ($type, $size, $buffer) = split /\n/, $cinfo->{rawbuff}, 3;
597 0         0 delete $cinfo->{rawbuff};
598              
599 0 0       0 if ( !defined($type) ) { die 'Could not read child data'; }
  0         0  
600 0 0       0 if ( !defined($size) ) { die 'Could not read child data'; }
  0         0  
601              
602 0         0 my ($class, $frozen) = split("!::!", $buffer, 2);
603 0         0 my $data;
604 0 0       0 if ($class eq "") {
605 0         0 $data = ${ Storable::thaw($frozen) };
  0         0  
606             } else {
607 0         0 $data = $class->THAW($frozen);
608             }
609              
610              
611 0         0 my $caller = $self->_subprocs()->{$child}{caller};
612 0         0 delete $self->_subprocs()->{$child};
613              
614 0 0       0 if ( $type eq 'RESULT' ) {
615 0         0 $cinfo->{callback}->($data);
616             } else {
617 0         0 my $err =
618             "Child (created at "
619             . $caller->[1]
620             . " line "
621             . $caller->[2]
622             . ") died with error: $data";
623              
624 0 0       0 if ( $self->use_anyevent ) {
625             # Can't throw events with anyevent
626 0         0 $self->_last_error($err);
627             } else {
628             # Otherwise we do throw it
629 0         0 die($err);
630             }
631             }
632              
633 0         0 return;
634             }
635              
636             sub _read_result_from_fh {
637 3348 50   3348   13776 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
638 3348         9141 my ( $self, $child ) = @_;
639              
640 3348         8313 my $cinfo = $self->_subprocs()->{$child};
641 3348         8299 my $fh = $cinfo->{fh};
642              
643 3348         125474 my $type = <$fh>;
644 3348 50       27739 if ( !defined($type) ) { die 'Could not read child data'; }
  0         0  
645 3348         19363 chomp($type);
646              
647 3348         12507 my $size = <$fh>;
648 3348         12412 chomp($size);
649              
650 3348         31958 binmode($fh);
651              
652 3348         675540 my $result = '';
653              
654 3348         11200 my $ret = 1;
655 3348   66     55022 while ( defined($ret) && ( length($result) < $size ) ) {
656 3348         17079 my $s = $size - length($result);
657              
658 3348         9906 my $part = '';
659 3348         26002 $ret = $fh->read( $part, $s );
660 3348 50       238864 if ( defined($ret) ) { $result .= $part; }
  3348         91782  
661             }
662              
663 3348         91492 my ($class, $frozen) = split("!::!", $result, 2);
664 3348         9070 my $data;
665 3348 100       12146 if ($class eq "") {
666 3347         7044 $data = ${ Storable::thaw($frozen) };
  3347         27109  
667             } else {
668 1         34 $data = $class->THAW($frozen);
669             }
670              
671 3348         558277 my $caller = $self->_subprocs()->{$child}{caller};
672 3348         9456 delete $self->_subprocs()->{$child};
673 3348         23599 $fh->close();
674              
675 3348 100       103326 if ( $type eq 'RESULT' ) {
676 3329         18282 $cinfo->{callback}->($data);
677             } else {
678 19         918 my $err =
679             "Child (created at "
680             . $caller->[1]
681             . " line "
682             . $caller->[2]
683             . ") died with error: $data";
684              
685 19 50       707 if ( $self->use_anyevent ) {
686             # Can't throw events with anyevent
687 0         0 $self->_last_error($err);
688             } else {
689             # Otherwise we do throw it
690 19         1329 die($err);
691             }
692             }
693              
694 3329         66980 return;
695             }
696              
697             # Start queued children, if possible.
698             # Returns 1 if children were started, undef otherwise
699             sub _start_queued_children {
700 6108 50   6108   40519 if ( $#_ != 0 ) { confess 'invalid call' }
  0         0  
701 6108         19746 my ($self) = @_;
702              
703 6108 100       18742 if ( !( @{ $self->_queued_children } ) ) { return; }
  6108         22253  
  2937         7267  
704 3171 50       12480 if ( defined( $self->_last_error ) ) { return; } # Do not queue if there are errors
  0         0  
705              
706             # Can we start a queued process?
707 3171         6408 while ( scalar @{ $self->_queued_children } ) {
  5260         49188  
708 4011 100 100     15514 if ( ( !defined( $self->max_children ) ) || ( $self->count < $self->max_children ) ) {
709             # Start queued child
710 2183         4861 my $ele = shift @{ $self->_queued_children };
  2183         6169  
711 2183 100       8552 if ( !defined( $ele->[1] ) ) {
712 1103         4719 $self->async( $ele->[0] );
713             } else {
714 1080         5986 $self->async( $ele->[0], $ele->[1] );
715             }
716             } else {
717             # Can't unqueue
718 1828         12751 return;
719             }
720             }
721              
722             # We started at least one process
723 1249         43639 return 1;
724             }
725              
726             # Sets up AnyEvent or tears it down as needed
727             sub _set_anyevent {
728 0 0   0   0 if ( $#_ < 1 ) { confess 'invalid call' }
  0         0  
729 0 0       0 if ( $#_ > 2 ) { confess 'invalid call' }
  0         0  
730 0         0 my ( $self, $new, $old ) = @_;
731              
732 0 0 0     0 if ( ( !$old ) && $new ) {
    0 0        
733             # We are setting up AnyEvent
734 0         0 require AnyEvent;
735              
736 0 0       0 if ( defined( $self->_subprocs() ) ) {
737 0         0 foreach my $pid ( keys %{ $self->_subprocs() } ) {
  0         0  
738 0         0 $self->_add_anyevent_watcher($pid);
739             }
740             }
741              
742 0         0 $self->_cv( AnyEvent->condvar );
743              
744             } elsif ( $old && ( !$new ) ) {
745             # We are tearing down AnyEvent
746              
747 0 0       0 if ( defined( $self->_subprocs() ) ) {
748 0         0 foreach my $pid ( keys %{ $self->_subprocs() } ) {
  0         0  
749 0         0 my $proc = $self->_subprocs()->{$pid};
750              
751 0         0 $proc->{watcher} = undef;
752             }
753             }
754              
755 0         0 $self->_cv(undef);
756             }
757 0         0 return;
758             }
759              
760             # Sets up the listener for AnyEvent
761             sub _add_anyevent_watcher {
762 0 0   0   0 if ( $#_ != 1 ) { confess 'invalid call' }
  0         0  
763 0         0 my ( $self, $pid ) = @_;
764              
765 0         0 my $proc = $self->_subprocs()->{$pid};
766              
767             $proc->{watcher} = AnyEvent->io(
768             fh => $proc->{fh},
769             poll => 'r',
770             cb => sub {
771 0     0   0 $self->_wait($pid);
772 0 0       0 if ( scalar( keys %{ $self->_subprocs() } ) == 0 ) {
  0         0  
773 0         0 my $oldcv = $self->_cv;
774 0         0 $self->_cv( AnyEvent->condvar );
775 0         0 $oldcv->send();
776             }
777              
778             # Start queued children, if needed
779 0         0 $self->_start_queued_children();
780             },
781 0         0 );
782              
783 0         0 return;
784             }
785              
786             # Used to clear all sub-processes, etc, in child process.
787             sub _clear_all {
788 218 50   218   4945 if ( $#_ != 0 ) { confess 'invalid call' }
  0         0  
789 218         3018 my ( $self ) = @_;
790              
791 218         7471 $self->_cv(undef);
792 218         5442 $self->_last_error(undef);
793 218         4981 $self->_ordered_count(0);
794 218         4756 $self->_ordered_responses( [] );
795 218         3250 $self->_count(1);
796 218         4357 $self->_queued_children( [] );
797              
798 218         1160 do {
799             # Don't warn on AnyEvent in child threads being DESTROYed
800 218     0   17212 local $SIG{__WARN__} = sub { };
801 218         4730 $self->_subprocs( {} );
802             };
803              
804 218         2015 return;
805             }
806              
807              
808             sub start {
809 5 50   5 1 23716 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
810 5         20 my $self = shift;
811 5         85 my $sub = shift;
812              
813             # Test $sub to make sure it is a code ref or a sub ref
814 5 50       15 if ( !_codelike($sub) ) {
815 0         0 confess("Parameter to start() is not a code (or codelike) reference");
816             }
817              
818 5         39 my $pid = fork();
819              
820 5 100       11442 if ( !$pid ) {
821             # We are in the child process.
822 2         252 $sub->();
823 2         1572 exit();
824             }
825              
826 3         134 return;
827             }
828              
829             # Tests to see if something is codelike
830             #
831             # Borrowed from Params::Util (written by Adam Kennedy)
832             sub _codelike {
833 6557 50   6557   27679 if ( scalar(@_) != 1 ) { confess 'invalid call' }
  0         0  
834 6557         17848 my $thing = shift;
835              
836 6557 100       43932 if ( reftype($thing) ) { return 1; }
  6543         28368  
837 14 50 33     56 if ( blessed($thing) && overload::Method( $thing, '()' ) ) { return 1; }
  0         0  
838              
839 14         49 return;
840             }
841              
842             # Destructor emits warning if sub processes are running
843             sub DESTROY {
844 284     284   47374 my $self = shift;
845              
846 284 100       3756 if ( scalar( keys %{ $self->_subprocs } ) ) {
  284         1988  
847 1         111 warn "Warning: Subprocesses running when Parallel::WorkUnit object destroyed\n";
848             }
849              
850 284         56880 return;
851             }
852              
853             1;
854              
855             __END__