File Coverage

lib/Parallel/WorkUnit.pm
Criterion Covered Total %
statement 342 471 72.6
branch 126 216 58.3
condition 11 21 52.3
subroutine 49 54 90.7
pod 11 11 100.0
total 539 773 69.7


line stmt bran cond sub pod time code
1             #
2             # Copyright (C) 2015-2023 Joelle Maslak
3             # All Rights Reserved - See License
4             #
5              
6             package Parallel::WorkUnit;
7             $Parallel::WorkUnit::VERSION = '2.232180';
8 228     228   49368468 use v5.8;
  228         3075  
9              
10             # ABSTRACT: Provide multi-paradigm forking with ability to pass back data
11              
12 228     228   1324 use strict;
  228         894  
  228         5055  
13 228     228   1053 use warnings;
  228         700  
  228         5866  
14 228     228   1916 use autodie;
  228         15755  
  228         2583  
15              
16 228     228   1303583 use Carp;
  228         747  
  228         15896  
17              
18 228     228   1679 use overload;
  228         460  
  228         1728  
19 228     228   155466 use IO::Handle;
  228         1495109  
  228         11636  
20 228     228   123718 use IO::Pipely qw(pipely);
  228         4243747  
  228         13727  
21 228     228   111769 use IO::Select;
  228         393329  
  228         12403  
22 228     228   3009 use POSIX ':sys_wait_h';
  228         725  
  228         3784  
23 228     228   414668 use Scalar::Util qw(blessed reftype weaken);
  228         546  
  228         13368  
24 228     228   160318 use Storable;
  228         754754  
  228         13460  
25 228     228   120790 use Try::Tiny;
  228         503091  
  228         1377181  
26              
27              
28             my @ALL_WU; # Holds all active work units so child processes can't
29             # mess with parent work units
30             # Note it holds a reference (strong) to a reference
31             # (weak).
32              
33              
34             sub use_anyevent {
35 4437 50   4437 1 26236 if ( $#_ == 0 ) {
    0          
36 4437         26834 return shift->{use_anyevent};
37             } elsif ( $#_ == 1 ) {
38 0         0 my ( $self, $val ) = @_;
39              
40 0         0 my ($old_val) = $self->{use_anyevent};
41 0         0 $self->{use_anyevent} = $val;
42              
43             # Trigger
44 0         0 $self->_set_anyevent( $val, $old_val );
45              
46 0         0 return $val;
47             } else {
48 0         0 confess("Invalid call");
49             }
50             }
51              
52              
53             # XXX: Add validation that _cv is a Maybe[AnyEvent::CondVar]
54             sub _cv {
55 3198 100   3198   15675 if ( $#_ == 0 ) {
    50          
56 2980         11352 return shift->{_cv};
57             } elsif ( $#_ == 1 ) {
58 218         3310 my ( $self, $val ) = @_;
59 218         6100 $self->{_cv} = $val;
60 218         2030 return $val;
61             } else {
62 0         0 confess("Invalid call");
63             }
64             }
65              
66             # XXX: Add validation that _last_error is a Maybe[Str]
67             sub _last_error {
68 10303 100   10303   32824 if ( $#_ == 0 ) {
    50          
69 10085         38701 return shift->{_last_error};
70             } elsif ( $#_ == 1 ) {
71 218         2363 my ( $self, $val ) = @_;
72 218         4527 $self->{_last_error} = $val;
73 218         2797 return $val;
74             } else {
75 0         0 confess("Invalid call");
76             }
77             }
78              
79             # XXX: Add validation that _ordered_count is a non-negative integer
80             sub _ordered_count {
81 5579 100   5579   18982 if ( $#_ == 0 ) {
    50          
82 2487         4372 my $self = shift;
83              
84             # Initialize
85 2487 100       12056 if ( !exists( $self->{_ordered_count} ) ) { $self->{_ordered_count} = 0; }
  136         657  
86              
87 2487         7176 return $self->{_ordered_count};
88             } elsif ( $#_ == 1 ) {
89 3092         9160 my ( $self, $val ) = @_;
90 3092         8107 $self->{_ordered_count} = $val;
91 3092         6235 return $val;
92             } else {
93 0         0 confess("Invalid call");
94             }
95             }
96              
97             # XXX: Add validation that _ordered_responses is an ArrayRef
98             sub _ordered_responses {
99 2923 100   2923   12319 if ( $#_ == 0 ) {
    50          
100 2318         4737 my $self = shift;
101              
102             # Initialize
103 2318 100       8339 if ( !exists( $self->{_ordered_responses} ) ) { $self->{_ordered_responses} = []; }
  164         1907  
104              
105 2318         14354 return $self->{_ordered_responses};
106             } elsif ( $#_ == 1 ) {
107 605         4166 my ( $self, $val ) = @_;
108 605         4786 $self->{_ordered_responses} = $val;
109 605         2091 return $val;
110             } else {
111 0         0 confess("Invalid call");
112             }
113             }
114              
115              
116             sub max_children {
117 8418 100   8418 1 127860 if ( $#_ == 0 ) {
    50          
118 7792         13282 my $self = shift;
119              
120 7792 100       19353 if ( !exists( $self->{max_children} ) ) { $self->{max_children} = 5; }
  61         225  
121              
122 7792         76297 return $self->{max_children};
123             } elsif ( $#_ == 1 ) {
124 626         1876 my ( $self, $val ) = @_;
125              
126             # Validate
127 626 100       2351 if ( defined($val) ) {
128 554 100       7133 if ( $val !~ m/^[0-9]+$/s ) {
129 92         13616 confess("max_children must be set to a positive integer");
130             }
131 462 100       1985 if ( $val <= 0 ) {
132 46         4600 confess("max_children must be set to a positive integer");
133             }
134             }
135              
136 488         1474 $self->{max_children} = $val;
137              
138             # Trigger
139 488         2074 $self->_start_queued_children();
140              
141 474         1948 return $val;
142             } else {
143 0         0 confess("Invalid call");
144             }
145             }
146              
147             # XXX: Add validation that _subprocs is a HashRef
148             sub _subprocs {
149 31944 100   31944   141388 if ( $#_ == 0 ) {
    50          
150 31726         90915 my $self = shift;
151              
152             # Initial value
153 31726 100       122527 if ( !exists( $self->{_subprocs} ) ) { $self->{_subprocs} = {}; }
  313         1504  
154              
155 31726         185825 return $self->{_subprocs};
156             } elsif ( $#_ == 1 ) {
157 218         2230 my ( $self, $val ) = @_;
158 218         21555 $self->{_subprocs} = $val;
159 218         4645 return $val;
160             } else {
161 0         0 confess("Invalid call");
162             }
163             }
164              
165             # XXX: Add validation that _count is a positive integer
166             # This only gets used on Win32.
167             sub _count {
168 218 50   218   5125 if ( $#_ == 0 ) {
    50          
169 0         0 my $self = shift;
170              
171             # Initial value
172 0 0       0 if ( !exists( $self->{_count} ) ) { $self->{_count} = 1; }
  0         0  
173              
174 0         0 return $self->{_count};
175             } elsif ( $#_ == 1 ) {
176 218         1979 my ( $self, $val ) = @_;
177 218         4150 $self->{_count} = $val;
178 218         2150 return $val;
179             } else {
180 0         0 confess("Invalid call");
181             }
182             }
183              
184             # XXX: Add validation that _parent_pid is a positive integer
185             # We also need to initialize always in the parent process.
186             sub _parent_pid {
187 325 50   325   2058 if ( $#_ == 0 ) {
    50          
188 0         0 return shift->{_parent_pid};
189             } elsif ( $#_ == 1 ) {
190 325         2213 my ( $self, $val ) = @_;
191 325         8052 $self->{_parent_pid} = $val;
192 325         1109 return $val;
193             } else {
194 0         0 confess("Invalid call");
195             }
196             }
197              
198             # Children queued
199             # XXX: Add validation that _queued_cildren is an
200             # ArrayRef[ArrayRef[CodeRef]]
201             sub _queued_children {
202 16064 100   16064   46997 if ( $#_ == 0 ) {
    50          
203 15846         35995 my $self = shift;
204              
205 15846 100       41032 if ( !exists( $self->{_queued_children} ) ) { $self->{_queued_children} = []; }
  275         2466  
206              
207 15846         66580 return $self->{_queued_children};
208             } elsif ( $#_ == 1 ) {
209 218         1873 my ( $self, $val ) = @_;
210 218         4261 $self->{_queued_children} = $val;
211 218         1925 return $val;
212             } else {
213 0         0 confess("Invalid call");
214             }
215             }
216              
217              
218             sub new {
219 325     325 1 63371 my $class = shift;
220 325         845 my $self = {};
221 325         918 bless $self, $class;
222              
223             # Initialize parent PID
224 325         1715 $self->_parent_pid($$);
225              
226             # Make a weak reference and shove it into the ALL_WU array
227 325         670 my $weakself = $self;
228 325         2315 weaken $weakself;
229 325         959 push @ALL_WU, \$weakself;
230              
231             # Do some housekeeping on @ALL_WU, so it is somewhat bounded
232 325         1102 @ALL_WU = grep { defined $$_ } @ALL_WU;
  471         1715  
233              
234             # Do we have any arguments?
235 325 100       1742 if (scalar(@_) > 0) {
236 92 100       920 my %args = (scalar(@_) == 1) ? %{shift()} : @_;
  46         230  
237              
238 92 50       368 if (exists $args{use_anyevent}) {
239 0         0 $self->use_anyevent($args{use_anyevent});
240             }
241 92 50       322 if (exists $args{max_children}) {
242 92         322 $self->max_children($args{max_children});
243             }
244             }
245              
246 325         1588 return $self;
247             }
248              
249              
250             sub async {
251 4250 50   4250 1 54854788 if ( $#_ < 1 ) { confess 'invalid call'; }
  0         0  
252 4250         13634 my $self = shift;
253 4250         7352 my $sub = shift;
254              
255             # Test $sub to make sure it is a code ref or a sub ref
256 4250 100       41791 if ( !_codelike($sub) ) {
257 7         707 confess("Parameter to async() is not a code (or codelike) reference");
258             }
259              
260 4243         8588 my $callback;
261 4243 100       16735 if ( scalar(@_) == 0 ) {
    50          
262             # No callback provided
263              
264 2487         10060 my $cbnum = $self->_ordered_count;
265 2487         8889 $self->_ordered_count( $cbnum + 1 );
266              
267             # We create a callback that populates the ordered responses
268 2487         4731 my $selfref = $self;
269 2487         15107 weaken $selfref;
270             $callback = sub {
271 1931 50   1931   7243 if ( defined $selfref ) { # In case this went away
272 1931         3319 @{ $selfref->_ordered_responses }[$cbnum] = shift;
  1931         5432  
273             }
274 2487         58869 };
275             } elsif ( scalar(@_) == 1 ) {
276             # Callback provided
277 1756         3782 $callback = shift;
278             } else {
279 0         0 confess 'invalid call';
280             }
281              
282             # If there are pending errors, throw that.
283 4243 50       16618 if ( defined( $self->_last_error ) ) { die( $self->_last_error ); }
  0         0  
284              
285 4243         80801 my $pipe = [ pipely() ];
286              
287 4243         1301880 my $pid = fork();
288              
289 4243 100       6812721 if ($pid) {
290             # We are in the parent process
291              
292 4031         377316 $pipe = $pipe->[0];
293              
294 4031         466883 $self->_subprocs()->{$pid} = {
295             fh => $pipe,
296             watcher => undef,
297             callback => $callback,
298             caller => [ caller() ],
299             };
300              
301             # Set up anyevent listener if appropriate
302 4031 50       107132 if ( $self->use_anyevent() ) {
303 0         0 $self->_add_anyevent_watcher($pid);
304             }
305              
306 4031         169178 return $pid;
307              
308             } else {
309             # We are in the child process
310 212         28516 $pipe = $pipe->[1];
311              
312 212         13561 return $self->_child( $sub, $pipe );
313             }
314             }
315              
316              
317             sub asyncs {
318 79 50   79 1 499 if ( $#_ < 2 ) { confess 'invalid call'; }
  0         0  
319 79         265 my $self = shift;
320 79         237 my $children = shift;
321 79         237 my $sub = shift;
322 79 50       342 if ( scalar(@_) > 1 ) { confess("invalid call"); }
  0         0  
323              
324 79 50       1735 if ( $children !~ m/^[1-9][0-9]*$/s ) {
325 0         0 confess("Number of children must be a numeric value > 0");
326             }
327              
328 79         422 for ( my $i = 0; $i < $children; $i++ ) {
329 655     30   8319 $self->async( sub { return $sub->($i); }, @_ );
  30         380  
330             }
331              
332 49         1633 return $children;
333             }
334              
335             sub _child {
336 212 50   212   13292 if ( scalar(@_) != 3 ) { confess 'invalid call'; }
  0         0  
337 212         4613 my ( $self, $sub, $pipe ) = @_;
338              
339             # Cleanup ALL_WU
340 212         9063 @ALL_WU = grep { defined $$_ } @ALL_WU;
  263         8713  
341 212         3359 foreach my $wu ( map { $$_ } @ALL_WU ) {
  218         5066  
342 218         10133 $wu->_clear_all();
343             }
344              
345             try {
346 212     212   52998 my $result = $sub->();
347 205         3292674 $self->_send_result( $pipe, $result );
348             } catch {
349 7     7   2702 $self->_send_error( $pipe, $_ );
350 212         12551 };
351              
352 210         126835 exit();
353             }
354              
355              
356             sub waitall {
357 3367 50   3367 1 41135 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
358 3367         11279 my ($self) = @_;
359              
360             # No subprocs? Just return.
361 3367 100       5324 if ( scalar( keys %{ $self->_subprocs } ) == 0 ) {
  3367         10813  
362 387 50       5124 if ( $self->use_anyevent ) {
363 0         0 $self->_cv( AnyEvent->condvar );
364             }
365              
366 387         3108 return $self->_get_and_reset_ordered_responses();
367             }
368              
369             # Using cv?
370 2980 50       16792 if ( defined( $self->_cv ) ) {
371 0         0 $self->_cv->recv();
372 0 0       0 if ( defined( $self->_last_error ) ) {
373 0         0 my $err = $self->_last_error;
374 0         0 $self->_last_error(undef);
375              
376 0         0 die($err);
377             }
378              
379 0         0 return $self->_get_and_reset_ordered_responses();
380             }
381              
382             # Tail recursion
383 2980 50       9863 if ( $self->_waitone() ) { goto &waitall }
  2947         22871  
384              
385 0         0 return @{ $self->_get_and_reset_ordered_responses() };
  0         0  
386             }
387              
388             # Gets the _ordered_responses and returns the reference. Also
389             # resets the _ordered_responses and the _ordered_counts to an
390             # empty list and zero respectively.
391             sub _get_and_reset_ordered_responses {
392 387 50   387   3305 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
393 387         1863 my $self = shift;
394              
395 387         1637 my (@r) = @{ $self->_ordered_responses() };
  387         2478  
396              
397 387         2933 $self->_ordered_responses( [] );
398 387         2713 $self->_ordered_count(0);
399              
400 387         6856 return @r;
401             }
402              
403              
404             sub waitone {
405 372 50   372 1 203886 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
406 372         2052 my ($self) = @_;
407              
408 372         2513 my $rv = $self->_waitone();
409              
410             # Using AnyEvent?
411 368 50       7850 if ( defined( $self->_last_error ) ) {
412 0         0 my $err = $self->_last_error;
413 0         0 $self->_last_error(undef);
414              
415 0         0 die($err);
416             }
417              
418 368         4896 return $rv;
419             }
420              
421             # Meat of waitone (but doesn't handle returning an exception when using
422             # anyevent)
423             sub _waitone {
424 3352 50   3352   11356 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
425 3352         7945 my ($self) = @_;
426              
427 3352         8150 my $sp = $self->_subprocs();
428 3352         13394 weaken $sp; # To avoid some Windows warnings
429 3352 100       16045 if ( !keys(%$sp) ) { return; }
  8         91  
430              
431             # On everything but Windows
432             #
433 3344         68569 my $s = IO::Select->new();
434 3344         79215 foreach ( keys(%$sp) ) { $s->add( $sp->{$_}{fh} ); }
  14564         500781  
435              
436 3344         141320 my @ready = $s->can_read();
437              
438 3344         2596280 foreach my $fh (@ready) {
439 3344         16812 foreach my $child ( keys(%$sp) ) {
440 8814 50       86128 if ( defined( $fh->fileno() ) ) {
441 8814 100       55635 if ( $fh->fileno() == $sp->{$child}{fh}->fileno() ) {
442 3344         43819 $self->_read_result($child);
443              
444 3325         656075508 waitpid( $child, 0 );
445              
446             # Start queued children, if needed
447 3325         45468 $self->_start_queued_children();
448              
449 3307         83326 return 1; # We don't want to read more than one!
450             }
451             }
452             }
453             }
454              
455             # We should never get here
456 0         0 return;
457             }
458              
459              
460             ## no critic ('Subroutines::ProhibitBuiltinHomonyms')
461             sub wait {
462 8 50   8 1 2124 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
463 8         121 my ( $self, $pid ) = @_;
464              
465 8         235 my $rv = $self->_wait($pid);
466              
467 8 50       120 if ( defined( $self->_last_error ) ) {
468 0         0 my $err = $self->_last_error;
469 0         0 $self->_last_error(undef);
470              
471 0         0 die($err);
472             }
473              
474 8         104 return $rv;
475             }
476              
477             # Internal version that doesn't check for AnyEvent die needs
478             sub _wait {
479 8 50   8   174 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
480 8         140 my ( $self, $pid ) = @_;
481              
482 8 100       136 if ( !exists( $self->_subprocs()->{$pid} ) ) {
483              
484             # We don't warn/die because it's possible that there is
485             # a race between callback and here, in the main thread.
486 4         71 return;
487             }
488              
489 4         193 my $result = $self->_read_result($pid);
490              
491 4         2950618 waitpid( $pid, 0 );
492              
493 4         121 return $result;
494             }
495             ## use critic
496              
497              
498             sub count {
499 7292 50   7292 1 3804946 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
500 7292         39587 my ($self) = @_;
501              
502 7292         23033 my $sp = $self->_subprocs();
503 7292         124114 return scalar( keys %$sp );
504             }
505              
506              
507             sub queue {
508 2302 50   2302 1 57694330 if ( $#_ < 1 ) { confess 'invalid call'; }
  0         0  
509 2302         5271 my $self = shift;
510 2302         4053 my $sub = shift;
511              
512             # Test $sub to make sure it is a code ref or a sub ref
513 2302 100       13243 if ( !_codelike($sub) ) {
514 7         1715 confess("Parameter to queue() is not a code (or codelike) reference");
515             }
516              
517 2295         4141 my $callback;
518 2295 100       7462 if ( scalar(@_) == 0 ) {
    50          
519             # We're okay, don't need to do anything - no callback
520             } elsif ( scalar(@_) == 1 ) {
521             # We have a callback
522 1136         2117 $callback = shift;
523             } else {
524 0         0 confess 'invalid call';
525             }
526              
527             # If there are pending errors, throw that.
528 2295 50       10488 if ( defined( $self->_last_error ) ) { die( $self->_last_error ); }
  0         0  
529              
530 2295         3950 push @{ $self->_queued_children }, [ $sub, $callback ];
  2295         5175  
531 2295         7656 return $self->_start_queued_children();
532             }
533              
534             sub _send_result {
535 205 50   205   3054 if ( $#_ != 2 ) { confess 'invalid call'; }
  0         0  
536 205         2509 my ( $self, $fh, $msg ) = @_;
537              
538 205         2842 return $self->_send( $fh, 'RESULT', $msg );
539             }
540              
541             sub _send_error {
542 7 50   7   357 if ( $#_ != 2 ) { confess 'invalid call'; }
  0         0  
543 7         175 my ( $self, $fh, $err ) = @_;
544              
545 7         286 return $self->_send( $fh, 'ERROR', $err );
546             }
547              
548             sub _send {
549 212 50   212   2443 if ( $#_ != 3 ) { confess 'invalid call'; }
  0         0  
550 212         3704 my ( $self, $fh, $type, $data ) = @_;
551              
552 212         1415 my $msg;
553 212 100 100     6572 if ( blessed($data) && ($data->can('FREEZE')) && ($data->can('THAW')) ) {
      66        
554 1         46 $msg = ref($data) . "!::!" . $data->FREEZE();
555             } else {
556 211         8020 $msg = "!::!" . Storable::freeze( \$data );
557             }
558              
559 210 50       94744 if ( !defined($msg) ) {
560 0         0 die 'freeze() returned undef for child return value';
561             }
562              
563 210         18480 $fh->write($type);
564 210         18281 $fh->write("\n");
565              
566 210         6715 $fh->write( length($msg) );
567 210         6267 $fh->write("\n");
568              
569 210         9665 binmode( $fh, ':raw' );
570              
571 210         143519 $fh->write($msg);
572              
573 210         47435 $fh->close();
574 210         7717 return;
575             }
576              
577             sub _read_result {
578 3348 50   3348   12284 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
579 3348         8922 my ( $self, $child ) = @_;
580              
581 3348         8788 my $cinfo = $self->_subprocs()->{$child};
582 3348 50       11069 if (defined($cinfo->{rawbuff})) {
583 0         0 return $self->_read_result_from_buffer($child);
584             } else {
585 3348         18837 return $self->_read_result_from_fh($child);
586             }
587             }
588              
589             sub _read_result_from_buffer {
590 0 0   0   0 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
591 0         0 my ( $self, $child ) = @_;
592              
593 0         0 my $cinfo = $self->_subprocs()->{$child};
594 0         0 $cinfo->{fh}->close();
595              
596 0         0 my ($type, $size, $buffer) = split /\n/, $cinfo->{rawbuff}, 3;
597 0         0 delete $cinfo->{rawbuff};
598              
599 0 0       0 if ( !defined($type) ) { die 'Could not read child data'; }
  0         0  
600 0 0       0 if ( !defined($size) ) { die 'Could not read child data'; }
  0         0  
601              
602 0         0 my ($class, $frozen) = split("!::!", $buffer, 2);
603 0         0 my $data;
604 0 0       0 if ($class eq "") {
605 0         0 $data = ${ Storable::thaw($frozen) };
  0         0  
606             } else {
607 0         0 $data = $class->THAW($frozen);
608             }
609              
610              
611 0         0 my $caller = $self->_subprocs()->{$child}{caller};
612 0         0 delete $self->_subprocs()->{$child};
613              
614 0 0       0 if ( $type eq 'RESULT' ) {
615 0         0 $cinfo->{callback}->($data);
616             } else {
617 0         0 my $err =
618             "Child (created at "
619             . $caller->[1]
620             . " line "
621             . $caller->[2]
622             . ") died with error: $data";
623              
624 0 0       0 if ( $self->use_anyevent ) {
625             # Can't throw events with anyevent
626 0         0 $self->_last_error($err);
627             } else {
628             # Otherwise we do throw it
629 0         0 die($err);
630             }
631             }
632              
633 0         0 return;
634             }
635              
636             sub _read_result_from_fh {
637 3348 50   3348   12991 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
638 3348         8745 my ( $self, $child ) = @_;
639              
640 3348         7304 my $cinfo = $self->_subprocs()->{$child};
641 3348         6682 my $fh = $cinfo->{fh};
642              
643 3348         119391 my $type = <$fh>;
644 3348 50       18954 if ( !defined($type) ) { die 'Could not read child data'; }
  0         0  
645 3348         15491 chomp($type);
646              
647 3348         15390 my $size = <$fh>;
648 3348         10903 chomp($size);
649              
650 3348         35618 binmode($fh);
651              
652 3348         653920 my $result = '';
653              
654 3348         6877 my $ret = 1;
655 3348   66     51313 while ( defined($ret) && ( length($result) < $size ) ) {
656 3348         13601 my $s = $size - length($result);
657              
658 3348         9479 my $part = '';
659 3348         23573 $ret = $fh->read( $part, $s );
660 3348 50       231919 if ( defined($ret) ) { $result .= $part; }
  3348         85770  
661             }
662              
663 3348         87098 my ($class, $frozen) = split("!::!", $result, 2);
664 3348         8407 my $data;
665 3348 100       11308 if ($class eq "") {
666 3347         5490 $data = ${ Storable::thaw($frozen) };
  3347         22238  
667             } else {
668 1         33 $data = $class->THAW($frozen);
669             }
670              
671 3348         479647 my $caller = $self->_subprocs()->{$child}{caller};
672 3348         7257 delete $self->_subprocs()->{$child};
673 3348         21338 $fh->close();
674              
675 3348 100       87621 if ( $type eq 'RESULT' ) {
676 3329         17312 $cinfo->{callback}->($data);
677             } else {
678 19         681 my $err =
679             "Child (created at "
680             . $caller->[1]
681             . " line "
682             . $caller->[2]
683             . ") died with error: $data";
684              
685 19 50       581 if ( $self->use_anyevent ) {
686             # Can't throw events with anyevent
687 0         0 $self->_last_error($err);
688             } else {
689             # Otherwise we do throw it
690 19         1207 die($err);
691             }
692             }
693              
694 3329         59142 return;
695             }
696              
697             # Start queued children, if possible.
698             # Returns 1 if children were started, undef otherwise
699             sub _start_queued_children {
700 6108 50   6108   26060 if ( $#_ != 0 ) { confess 'invalid call' }
  0         0  
701 6108         18714 my ($self) = @_;
702              
703 6108 100       10546 if ( !( @{ $self->_queued_children } ) ) { return; }
  6108         18237  
  2937         6460  
704 3171 50       10877 if ( defined( $self->_last_error ) ) { return; } # Do not queue if there are errors
  0         0  
705              
706             # Can we start a queued process?
707 3171         6633 while ( scalar @{ $self->_queued_children } ) {
  5260         60613  
708 4011 100 100     18234 if ( ( !defined( $self->max_children ) ) || ( $self->count < $self->max_children ) ) {
709             # Start queued child
710 2183         5092 my $ele = shift @{ $self->_queued_children };
  2183         5739  
711 2183 100       8941 if ( !defined( $ele->[1] ) ) {
712 1103         4145 $self->async( $ele->[0] );
713             } else {
714 1080         8378 $self->async( $ele->[0], $ele->[1] );
715             }
716             } else {
717             # Can't unqueue
718 1828         13588 return;
719             }
720             }
721              
722             # We started at least one process
723 1249         35388 return 1;
724             }
725              
726             # Sets up AnyEvent or tears it down as needed
727             sub _set_anyevent {
728 0 0   0   0 if ( $#_ < 1 ) { confess 'invalid call' }
  0         0  
729 0 0       0 if ( $#_ > 2 ) { confess 'invalid call' }
  0         0  
730 0         0 my ( $self, $new, $old ) = @_;
731              
732 0 0 0     0 if ( ( !$old ) && $new ) {
    0 0        
733             # We are setting up AnyEvent
734 0         0 require AnyEvent;
735              
736 0 0       0 if ( defined( $self->_subprocs() ) ) {
737 0         0 foreach my $pid ( keys %{ $self->_subprocs() } ) {
  0         0  
738 0         0 $self->_add_anyevent_watcher($pid);
739             }
740             }
741              
742 0         0 $self->_cv( AnyEvent->condvar );
743              
744             } elsif ( $old && ( !$new ) ) {
745             # We are tearing down AnyEvent
746              
747 0 0       0 if ( defined( $self->_subprocs() ) ) {
748 0         0 foreach my $pid ( keys %{ $self->_subprocs() } ) {
  0         0  
749 0         0 my $proc = $self->_subprocs()->{$pid};
750              
751 0         0 $proc->{watcher} = undef;
752             }
753             }
754              
755 0         0 $self->_cv(undef);
756             }
757 0         0 return;
758             }
759              
760             # Sets up the listener for AnyEvent
761             sub _add_anyevent_watcher {
762 0 0   0   0 if ( $#_ != 1 ) { confess 'invalid call' }
  0         0  
763 0         0 my ( $self, $pid ) = @_;
764              
765 0         0 my $proc = $self->_subprocs()->{$pid};
766              
767             $proc->{watcher} = AnyEvent->io(
768             fh => $proc->{fh},
769             poll => 'r',
770             cb => sub {
771 0     0   0 $self->_wait($pid);
772 0 0       0 if ( scalar( keys %{ $self->_subprocs() } ) == 0 ) {
  0         0  
773 0         0 my $oldcv = $self->_cv;
774 0         0 $self->_cv( AnyEvent->condvar );
775 0         0 $oldcv->send();
776             }
777              
778             # Start queued children, if needed
779 0         0 $self->_start_queued_children();
780             },
781 0         0 );
782              
783 0         0 return;
784             }
785              
786             # Used to clear all sub-processes, etc, in child process.
787             sub _clear_all {
788 218 50   218   5362 if ( $#_ != 0 ) { confess 'invalid call' }
  0         0  
789 218         3108 my ( $self ) = @_;
790              
791 218         8731 $self->_cv(undef);
792 218         4248 $self->_last_error(undef);
793 218         4257 $self->_ordered_count(0);
794 218         5005 $self->_ordered_responses( [] );
795 218         3626 $self->_count(1);
796 218         4001 $self->_queued_children( [] );
797              
798 218         919 do {
799             # Don't warn on AnyEvent in child threads being DESTROYed
800 218     0   17597 local $SIG{__WARN__} = sub { };
801 218         5649 $self->_subprocs( {} );
802             };
803              
804 218         1728 return;
805             }
806              
807              
808             sub start {
809 5 50   5 1 24011 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
810 5         18 my $self = shift;
811 5         10 my $sub = shift;
812              
813             # Test $sub to make sure it is a code ref or a sub ref
814 5 50       22 if ( !_codelike($sub) ) {
815 0         0 confess("Parameter to start() is not a code (or codelike) reference");
816             }
817              
818 5         39 my $pid = fork();
819              
820 5 100       8604 if ( !$pid ) {
821             # We are in the child process.
822 2         178 $sub->();
823 2         921 exit();
824             }
825              
826 3         148 return;
827             }
828              
829             # Tests to see if something is codelike
830             #
831             # Borrowed from Params::Util (written by Adam Kennedy)
832             sub _codelike {
833 6557 50   6557   26008 if ( scalar(@_) != 1 ) { confess 'invalid call' }
  0         0  
834 6557         12648 my $thing = shift;
835              
836 6557 100       52620 if ( reftype($thing) ) { return 1; }
  6543         30660  
837 14 50 33     56 if ( blessed($thing) && overload::Method( $thing, '()' ) ) { return 1; }
  0         0  
838              
839 14         56 return;
840             }
841              
842             # Destructor emits warning if sub processes are running
843             sub DESTROY {
844 284     284   38484 my $self = shift;
845              
846 284 100       1181 if ( scalar( keys %{ $self->_subprocs } ) ) {
  284         1779  
847 1         109 warn "Warning: Subprocesses running when Parallel::WorkUnit object destroyed\n";
848             }
849              
850 284         53942 return;
851             }
852              
853             1;
854              
855             __END__