File Coverage

lib/Parallel/WorkUnit.pm
Criterion Covered Total %
statement 353 483 73.0
branch 129 220 58.6
condition 11 21 52.3
subroutine 52 57 91.2
pod 12 12 100.0
total 557 793 70.2


line stmt bran cond sub pod time code
1             #
2             # Copyright (C) 2015-2024 Joelle Maslak
3             # All Rights Reserved - See License
4             #
5              
6             package Parallel::WorkUnit;
7             $Parallel::WorkUnit::VERSION = '2.243480';
8 251     251   46516492 use v5.8;
  251         999  
9              
10             # ABSTRACT: Provide multi-paradigm forking with ability to pass back data
11              
12 251     251   1248 use strict;
  251         669  
  251         5473  
13 251     251   921 use warnings;
  251         1017  
  251         10661  
14 251     251   3087 use autodie;
  251         20352  
  251         3091  
15              
16 251     251   1331022 use Carp;
  251         1710  
  251         22019  
17              
18 251     251   2256 use overload;
  251         490  
  251         2166  
19 251     251   173298 use IO::Handle;
  251         1726496  
  251         18635  
20 251     251   132104 use IO::Pipely qw(pipely);
  251         5059203  
  251         18771  
21 251     251   129560 use IO::Select;
  251         505921  
  251         16758  
22 251     251   149986 use POSIX ':sys_wait_h';
  251         1619445  
  251         2505  
23 251     251   510301 use Scalar::Util qw(blessed reftype weaken);
  251         600  
  251         17090  
24 251     251   182835 use Storable;
  251         1161694  
  251         20058  
25 251     251   146313 use Try::Tiny;
  251         642974  
  251         1837283  
26              
27              
28             my @ALL_WU; # Holds all active work units so child processes can't
29             # mess with parent work units
30             # Note it holds a reference (strong) to a reference
31             # (weak).
32              
33              
34             sub use_anyevent {
35 5781 50   5781 1 37967 if ( $#_ == 0 ) {
    0          
36 5781         35993 return shift->{use_anyevent};
37             } elsif ( $#_ == 1 ) {
38 0         0 my ( $self, $val ) = @_;
39              
40 0         0 my ($old_val) = $self->{use_anyevent};
41 0         0 $self->{use_anyevent} = $val;
42              
43             # Trigger
44 0         0 $self->_set_anyevent( $val, $old_val );
45              
46 0         0 return $val;
47             } else {
48 0         0 confess("Invalid call");
49             }
50             }
51              
52              
53             # XXX: Add validation that _cv is a Maybe[AnyEvent::CondVar]
54             sub _cv {
55 4266 100   4266   22284 if ( $#_ == 0 ) {
    50          
56 4025         20309 return shift->{_cv};
57             } elsif ( $#_ == 1 ) {
58 241         3683 my ( $self, $val ) = @_;
59 241         10675 $self->{_cv} = $val;
60 241         4202 return $val;
61             } else {
62 0         0 confess("Invalid call");
63             }
64             }
65              
66             # XXX: Add validation that _last_error is a Maybe[Str]
67             sub _last_error {
68 14233 100   14233   122373 if ( $#_ == 0 ) {
    50          
69 13992         286056 return shift->{_last_error};
70             } elsif ( $#_ == 1 ) {
71 241         8102 my ( $self, $val ) = @_;
72 241         5914 $self->{_last_error} = $val;
73 241         2977 return $val;
74             } else {
75 0         0 confess("Invalid call");
76             }
77             }
78              
79             # XXX: Add validation that _ordered_count is a non-negative integer
80             sub _ordered_count {
81 6186 100   6186   24510 if ( $#_ == 0 ) {
    50          
82 2719         6373 my $self = shift;
83              
84             # Initialize
85 2719 100       10773 if ( !exists( $self->{_ordered_count} ) ) { $self->{_ordered_count} = 0; }
  139         420  
86              
87 2719         10005 return $self->{_ordered_count};
88             } elsif ( $#_ == 1 ) {
89 3467         11423 my ( $self, $val ) = @_;
90 3467         11470 $self->{_ordered_count} = $val;
91 3467         8619 return $val;
92             } else {
93 0         0 confess("Invalid call");
94             }
95             }
96              
97             # XXX: Add validation that _ordered_responses is an ArrayRef
98             sub _ordered_responses {
99 3357 100   3357   14236 if ( $#_ == 0 ) {
    50          
100 2609         5468 my $self = shift;
101              
102             # Initialize
103 2609 100       8740 if ( !exists( $self->{_ordered_responses} ) ) { $self->{_ordered_responses} = []; }
  187         1403  
104              
105 2609         15284 return $self->{_ordered_responses};
106             } elsif ( $#_ == 1 ) {
107 748         3186 my ( $self, $val ) = @_;
108 748         4122 $self->{_ordered_responses} = $val;
109 748         3389 return $val;
110             } else {
111 0         0 confess("Invalid call");
112             }
113             }
114              
115              
116             sub max_children {
117 11992 100   11992 1 229176 if ( $#_ == 0 ) {
    50          
118 11126         19925 my $self = shift;
119              
120 11126 100       29645 if ( !exists( $self->{max_children} ) ) { $self->{max_children} = 5; }
  84         362  
121              
122 11126         206953 return $self->{max_children};
123             } elsif ( $#_ == 1 ) {
124 866         3465 my ( $self, $val ) = @_;
125              
126             # Validate
127 866 100       3059 if ( defined($val) ) {
128 774 100       11358 if ( $val !~ m/^[0-9]+$/s ) {
129 132         42834 confess("max_children must be set to a positive integer");
130             }
131 642 100       89378 if ( $val <= 0 ) {
132 66         18678 confess("max_children must be set to a positive integer");
133             }
134             }
135              
136 668         2230 $self->{max_children} = $val;
137              
138             # Trigger
139 668         2715 $self->_start_queued_children();
140              
141 654         5342 return $val;
142             } else {
143 0         0 confess("Invalid call");
144             }
145             }
146              
147             # XXX: Add validation that _subprocs is a HashRef
148             sub _subprocs {
149 42436 100   42436   226528 if ( $#_ == 0 ) {
    50          
150 42195         697504 my $self = shift;
151              
152             # Initial value
153 42195 100       365977 if ( !exists( $self->{_subprocs} ) ) { $self->{_subprocs} = {}; }
  376         1522  
154              
155 42195         434235 return $self->{_subprocs};
156             } elsif ( $#_ == 1 ) {
157 241         3927 my ( $self, $val ) = @_;
158 241         40743 $self->{_subprocs} = $val;
159 241         3359 return $val;
160             } else {
161 0         0 confess("Invalid call");
162             }
163             }
164              
165             # XXX: Add validation that _count is a positive integer
166             # This only gets used on Win32.
167             sub _count {
168 241 50   241   8293 if ( $#_ == 0 ) {
    50          
169 0         0 my $self = shift;
170              
171             # Initial value
172 0 0       0 if ( !exists( $self->{_count} ) ) { $self->{_count} = 1; }
  0         0  
173              
174 0         0 return $self->{_count};
175             } elsif ( $#_ == 1 ) {
176 241         5896 my ( $self, $val ) = @_;
177 241         8315 $self->{_count} = $val;
178 241         1748 return $val;
179             } else {
180 0         0 confess("Invalid call");
181             }
182             }
183              
184             # XXX: Add validation that _parent_pid is a positive integer
185             # We also need to initialize always in the parent process.
186             sub _parent_pid {
187 388 50   388   3350 if ( $#_ == 0 ) {
    50          
188 0         0 return shift->{_parent_pid};
189             } elsif ( $#_ == 1 ) {
190 388         3457 my ( $self, $val ) = @_;
191 388         8930 $self->{_parent_pid} = $val;
192 388         1207 return $val;
193             } else {
194 0         0 confess("Invalid call");
195             }
196             }
197              
198             # Children queued
199             # XXX: Add validation that _queued_cildren is an
200             # ArrayRef[ArrayRef[CodeRef]]
201             sub _queued_children {
202 23420 100   23420   236039 if ( $#_ == 0 ) {
    50          
203 23179         55792 my $self = shift;
204              
205 23179 100       67632 if ( !exists( $self->{_queued_children} ) ) { $self->{_queued_children} = []; }
  338         2075  
206              
207 23179         209169 return $self->{_queued_children};
208             } elsif ( $#_ == 1 ) {
209 241         4461 my ( $self, $val ) = @_;
210 241         4797 $self->{_queued_children} = $val;
211 241         3006 return $val;
212             } else {
213 0         0 confess("Invalid call");
214             }
215             }
216              
217              
218             sub new {
219 388     388 1 56731106 my $class = shift;
220 388         1515 my $self = {};
221 388         1307 bless $self, $class;
222              
223             # Initialize parent PID
224 388         2507 $self->_parent_pid($$);
225              
226             # Make a weak reference and shove it into the ALL_WU array
227 388         711 my $weakself = $self;
228 388         1027 weaken $weakself;
229 388         1742 push @ALL_WU, \$weakself;
230              
231             # Do some housekeeping on @ALL_WU, so it is somewhat bounded
232 388         1501 @ALL_WU = grep { defined $$_ } @ALL_WU;
  594         2219  
233              
234             # Do we have any arguments?
235 388 100       2520 if (scalar(@_) > 0) {
236 132 100       660 my %args = (scalar(@_) == 1) ? %{shift()} : @_;
  66         330  
237              
238 132 50       528 if (exists $args{use_anyevent}) {
239 0         0 $self->use_anyevent($args{use_anyevent});
240             }
241 132 50       462 if (exists $args{max_children}) {
242 132         462 $self->max_children($args{max_children});
243             }
244             }
245              
246 388         1875 return $self;
247             }
248              
249              
250             sub async {
251 5497 50   5497 1 64565181 if ( $#_ < 1 ) { confess 'invalid call'; }
  0         0  
252 5497         21105 my $self = shift;
253 5497         16782 my $sub = shift;
254              
255             # Test $sub to make sure it is a code ref or a sub ref
256 5497 100       71507 if ( !_codelike($sub) ) {
257 7         1904 confess("Parameter to async() is not a code (or codelike) reference");
258             }
259              
260 5490         10512 my $callback;
261 5490 100       26639 if ( scalar(@_) == 0 ) {
    50          
262             # No callback provided
263              
264 2719         24905 my $cbnum = $self->_ordered_count;
265 2719         9738 $self->_ordered_count( $cbnum + 1 );
266              
267             # We create a callback that populates the ordered responses
268 2719         4828 my $selfref = $self;
269 2719         12440 weaken $selfref;
270             $callback = sub {
271 2102 50   2102   6876 if ( defined $selfref ) { # In case this went away
272 2102         4229 @{ $selfref->_ordered_responses }[$cbnum] = shift;
  2102         7048  
273             }
274 2719         315186 };
275             } elsif ( scalar(@_) == 1 ) {
276             # Callback provided
277 2771         5624 $callback = shift;
278             } else {
279 0         0 confess 'invalid call';
280             }
281              
282             # If there are pending errors, throw that.
283 5490 50       43961 if ( defined( $self->_last_error ) ) { die( $self->_last_error ); }
  0         0  
284              
285 5490         151127 my $pipe = [ pipely() ];
286              
287 5490         2692411 my $pid = fork();
288              
289 5490 100       34098740 if ($pid) {
290             # We are in the parent process
291              
292 5255         1280687 $pipe = $pipe->[0];
293              
294 5255         1164962 $self->_subprocs()->{$pid} = {
295             fh => $pipe,
296             watcher => undef,
297             callback => $callback,
298             caller => [ caller() ],
299             };
300              
301             # Set up anyevent listener if appropriate
302 5255 50       365641 if ( $self->use_anyevent() ) {
303 0         0 $self->_add_anyevent_watcher($pid);
304             }
305              
306 5255         305535 return $pid;
307              
308             } else {
309             # We are in the child process
310 235         53078 $pipe = $pipe->[1];
311              
312 235         49506 return $self->_child( $sub, $pipe );
313             }
314             }
315              
316              
317             sub asyncs {
318 82 50   82 1 444 if ( $#_ < 2 ) { confess 'invalid call'; }
  0         0  
319 82         194 my $self = shift;
320 82         220 my $children = shift;
321 82         138 my $sub = shift;
322 82 50       388 if ( scalar(@_) > 1 ) { confess("invalid call"); }
  0         0  
323              
324 82 50       1194 if ( $children !~ m/^[1-9][0-9]*$/s ) {
325 0         0 confess("Number of children must be a numeric value > 0");
326             }
327              
328 82         358 for ( my $i = 0; $i < $children; $i++ ) {
329 685     30   16555 $self->async( sub { return $sub->($i); }, @_ );
  30         554  
330             }
331              
332 52         2936 return $children;
333             }
334              
335             sub _child {
336 235 50   235   63656 if ( scalar(@_) != 3 ) { confess 'invalid call'; }
  0         0  
337 235         6498 my ( $self, $sub, $pipe ) = @_;
338              
339             # Cleanup ALL_WU
340 235         13225 @ALL_WU = grep { defined $$_ } @ALL_WU;
  306         13144  
341 235         3909 foreach my $wu ( map { $$_ } @ALL_WU ) {
  241         13548  
342 241         14012 $wu->_clear_all();
343             }
344              
345             try {
346 235     235   68265 my $result = $sub->();
347 228         3392240 $self->_send_result( $pipe, $result );
348             } catch {
349 7     7   3508 $self->_send_error( $pipe, $_ );
350 235         18800 };
351              
352 233         298716 exit();
353             }
354              
355              
356             sub waitall {
357 4532 50   4532 1 52697 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
358 4532         12214 my ($self) = @_;
359              
360             # No subprocs? Just return.
361 4532 100       9050 if ( scalar( keys %{ $self->_subprocs } ) == 0 ) {
  4532         18143  
362 507 50       3571 if ( $self->use_anyevent ) {
363 0         0 $self->_cv( AnyEvent->condvar );
364             }
365              
366 507         4227 return $self->_get_and_reset_ordered_responses();
367             }
368              
369             # Using cv?
370 4025 50       18665 if ( defined( $self->_cv ) ) {
371 0         0 $self->_cv->recv();
372 0 0       0 if ( defined( $self->_last_error ) ) {
373 0         0 my $err = $self->_last_error;
374 0         0 $self->_last_error(undef);
375              
376 0         0 die($err);
377             }
378              
379 0         0 return $self->_get_and_reset_ordered_responses();
380             }
381              
382             # Tail recursion
383 4025 50       16125 if ( $self->_waitone() ) { goto &waitall }
  3992         32451  
384              
385 0         0 return @{ $self->_get_and_reset_ordered_responses() };
  0         0  
386             }
387              
388             # Gets the _ordered_responses and returns the reference. Also
389             # resets the _ordered_responses and the _ordered_counts to an
390             # empty list and zero respectively.
391             sub _get_and_reset_ordered_responses {
392 507 50   507   3795 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
393 507         1725 my $self = shift;
394              
395 507         1446 my (@r) = @{ $self->_ordered_responses() };
  507         3028  
396              
397 507         2629 $self->_ordered_responses( [] );
398 507         2859 $self->_ordered_count(0);
399              
400 507         11292 return @r;
401             }
402              
403              
404             sub waitone {
405 458 50   458 1 424827 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
406 458         3770 my ($self) = @_;
407              
408 458         6096 my $rv = $self->_waitone();
409              
410             # Using AnyEvent?
411 454 50       9772 if ( defined( $self->_last_error ) ) {
412 0         0 my $err = $self->_last_error;
413 0         0 $self->_last_error(undef);
414              
415 0         0 die($err);
416             }
417              
418 454         5040 return $rv;
419             }
420              
421             # Meat of waitone (but doesn't handle returning an exception when using
422             # anyevent)
423             sub _waitone {
424 4483 50   4483   17369 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
425 4483         11634 my ($self) = @_;
426              
427 4483         12553 my $sp = $self->_subprocs();
428 4483         10241 weaken $sp; # To avoid some Windows warnings
429 4483 100       14744 if ( !keys(%$sp) ) { return; }
  8         595  
430              
431             # On everything but Windows
432             #
433 4475         129484 my $s = IO::Select->new();
434 4475         102760 foreach ( keys(%$sp) ) { $s->add( $sp->{$_}{fh} ); }
  19688         1455439  
435              
436 4475         448988 my @ready = $s->can_read();
437              
438 4475         3687048 foreach my $fh (@ready) {
439 4475         16679 foreach my $child ( keys(%$sp) ) {
440 13366 50       469812 if ( defined( $fh->fileno() ) ) {
441 13366 100       609469 if ( $fh->fileno() == $sp->{$child}{fh}->fileno() ) {
442 4475         68023 $self->_read_result($child);
443              
444 4456         1606663706 waitpid( $child, 0 );
445              
446             # Start queued children, if needed
447 4456         116213 $self->_start_queued_children();
448              
449 4438         140080 return 1; # We don't want to read more than one!
450             }
451             }
452             }
453             }
454              
455             # We should never get here
456 0         0 return;
457             }
458              
459              
460             ## no critic ('Subroutines::ProhibitBuiltinHomonyms')
461             sub wait {
462 8 50   8 1 2874 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
463 8         100 my ( $self, $pid ) = @_;
464              
465 8         182 my $rv = $self->_wait($pid);
466              
467 8 50       147 if ( defined( $self->_last_error ) ) {
468 0         0 my $err = $self->_last_error;
469 0         0 $self->_last_error(undef);
470              
471 0         0 die($err);
472             }
473              
474 8         74 return $rv;
475             }
476              
477             # Internal version that doesn't check for AnyEvent die needs
478             sub _wait {
479 8 50   8   262 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
480 8         176 my ( $self, $pid ) = @_;
481              
482 8 100       164 if ( !exists( $self->_subprocs()->{$pid} ) ) {
483              
484             # We don't warn/die because it's possible that there is
485             # a race between callback and here, in the main thread.
486 4         48 return;
487             }
488              
489 4         196 my $result = $self->_read_result($pid);
490              
491 4         3394711 waitpid( $pid, 0 );
492              
493 4         99 return $result;
494             }
495             ## use critic
496              
497              
498             sub count {
499 9657 50   9657 1 7770560 if ( $#_ != 0 ) { confess 'invalid call'; }
  0         0  
500 9657         48559 my ($self) = @_;
501              
502 9657         39778 my $sp = $self->_subprocs();
503 9657         226468 return scalar( keys %$sp );
504             }
505              
506              
507             sub queue {
508 3459 50   3459 1 73930866 if ( $#_ < 1 ) { confess 'invalid call'; }
  0         0  
509 3459         8165 my $self = shift;
510 3459         9345 my $sub = shift;
511              
512             # Test $sub to make sure it is a code ref or a sub ref
513 3459 100       18994 if ( !_codelike($sub) ) {
514 7         2401 confess("Parameter to queue() is not a code (or codelike) reference");
515             }
516              
517 3452         232321 my $callback;
518 3452 100       14978 if ( scalar(@_) == 0 ) {
    50          
519             # We're okay, don't need to do anything - no callback
520             } elsif ( scalar(@_) == 1 ) {
521             # We have a callback
522 2151         4562 $callback = shift;
523             } else {
524 0         0 confess 'invalid call';
525             }
526              
527             # If there are pending errors, throw that.
528 3452 50       25533 if ( defined( $self->_last_error ) ) { die( $self->_last_error ); }
  0         0  
529              
530 3452         87511 push @{ $self->_queued_children }, [ $sub, $callback ];
  3452         10719  
531 3452         13199 return $self->_start_queued_children();
532             }
533              
534              
535             sub queueall {
536 51 50   51 1 162068 if ( $#_ < 2 ) { confess 'invalid call'; }
  0         0  
537 51         1332 my $self = shift;
538 51         172 my $data = shift;
539 51         128 my $sub = shift;
540              
541 51         282 for my $ele (@$data) {
542 354 100       1982 if (scalar(@_) > 0) {
543 215     10   98707 $self->queue(sub { $sub->($ele) }, @_);
  10         190  
544             } else {
545 139     13   2491 $self->queue(sub { $sub->($ele) });
  13         279  
546             }
547             }
548 28         1662 return 1;
549             }
550              
551             sub _send_result {
552 228 50   228   3444 if ( $#_ != 2 ) { confess 'invalid call'; }
  0         0  
553 228         1907 my ( $self, $fh, $msg ) = @_;
554              
555 228         5898 return $self->_send( $fh, 'RESULT', $msg );
556             }
557              
558             sub _send_error {
559 7 50   7   391 if ( $#_ != 2 ) { confess 'invalid call'; }
  0         0  
560 7         273 my ( $self, $fh, $err ) = @_;
561              
562 7         377 return $self->_send( $fh, 'ERROR', $err );
563             }
564              
565             sub _send {
566 235 50   235   4341 if ( $#_ != 3 ) { confess 'invalid call'; }
  0         0  
567 235         7357 my ( $self, $fh, $type, $data ) = @_;
568              
569 235         1698 my $msg;
570 235 100 100     5737 if ( blessed($data) && ($data->can('FREEZE')) && ($data->can('THAW')) ) {
      66        
571 1         9 $msg = ref($data) . "!::!" . $data->FREEZE();
572             } else {
573 234         14494 $msg = "!::!" . Storable::freeze( \$data );
574             }
575              
576 233 50       190022 if ( !defined($msg) ) {
577 0         0 die 'freeze() returned undef for child return value';
578             }
579              
580 233         62130 $fh->write($type);
581 233         80104 $fh->write("\n");
582              
583 233         20935 $fh->write( length($msg) );
584 233         14470 $fh->write("\n");
585              
586 233         53660 binmode( $fh, ':raw' );
587              
588 233         723378 $fh->write($msg);
589              
590 233         69628 $fh->close();
591 233         47420 return;
592             }
593              
594             sub _read_result {
595 4479 50   4479   18428 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
596 4479         31935 my ( $self, $child ) = @_;
597              
598 4479         16361 my $cinfo = $self->_subprocs()->{$child};
599 4479 50       15360 if (defined($cinfo->{rawbuff})) {
600 0         0 return $self->_read_result_from_buffer($child);
601             } else {
602 4479         92184 return $self->_read_result_from_fh($child);
603             }
604             }
605              
606             sub _read_result_from_buffer {
607 0 0   0   0 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
608 0         0 my ( $self, $child ) = @_;
609              
610 0         0 my $cinfo = $self->_subprocs()->{$child};
611 0         0 $cinfo->{fh}->close();
612              
613 0         0 my ($type, $size, $buffer) = split /\n/, $cinfo->{rawbuff}, 3;
614 0         0 delete $cinfo->{rawbuff};
615              
616 0 0       0 if ( !defined($type) ) { die 'Could not read child data'; }
  0         0  
617 0 0       0 if ( !defined($size) ) { die 'Could not read child data'; }
  0         0  
618              
619 0         0 my ($class, $frozen) = split("!::!", $buffer, 2);
620 0         0 my $data;
621 0 0       0 if ($class eq "") {
622 0         0 $data = ${ Storable::thaw($frozen) };
  0         0  
623             } else {
624 0         0 $data = $class->THAW($frozen);
625             }
626              
627              
628 0         0 my $caller = $self->_subprocs()->{$child}{caller};
629 0         0 delete $self->_subprocs()->{$child};
630              
631 0 0       0 if ( $type eq 'RESULT' ) {
632 0         0 $cinfo->{callback}->($data);
633             } else {
634 0         0 my $err =
635             "Child (created at "
636             . $caller->[1]
637             . " line "
638             . $caller->[2]
639             . ") died with error: $data";
640              
641 0 0       0 if ( $self->use_anyevent ) {
642             # Can't throw events with anyevent
643 0         0 $self->_last_error($err);
644             } else {
645             # Otherwise we do throw it
646 0         0 die($err);
647             }
648             }
649              
650 0         0 return;
651             }
652              
653             sub _read_result_from_fh {
654 4479 50   4479   13966 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
655 4479         12048 my ( $self, $child ) = @_;
656              
657 4479         11889 my $cinfo = $self->_subprocs()->{$child};
658 4479         10210 my $fh = $cinfo->{fh};
659              
660 4479         199103 my $type = <$fh>;
661 4479 50       25177 if ( !defined($type) ) { die 'Could not read child data'; }
  0         0  
662 4479         14939 chomp($type);
663              
664 4479         35005 my $size = <$fh>;
665 4479         12686 chomp($size);
666              
667 4479         44825 binmode($fh);
668              
669 4479         1614004 my $result = '';
670              
671 4479         11780 my $ret = 1;
672 4479   66     85061 while ( defined($ret) && ( length($result) < $size ) ) {
673 4479         22531 my $s = $size - length($result);
674              
675 4479         13629 my $part = '';
676 4479         40947 $ret = $fh->read( $part, $s );
677 4479 50       290128 if ( defined($ret) ) { $result .= $part; }
  4479         151233  
678             }
679              
680 4479         136355 my ($class, $frozen) = split("!::!", $result, 2);
681 4479         14789 my $data;
682 4479 100       16597 if ($class eq "") {
683 4478         8874 $data = ${ Storable::thaw($frozen) };
  4478         132694  
684             } else {
685 1         56 $data = $class->THAW($frozen);
686             }
687              
688 4479         869105 my $caller = $self->_subprocs()->{$child}{caller};
689 4479         10405 delete $self->_subprocs()->{$child};
690 4479         21579 $fh->close();
691              
692 4479 100       189556 if ( $type eq 'RESULT' ) {
693 4460         26877 $cinfo->{callback}->($data);
694             } else {
695 19         616 my $err =
696             "Child (created at "
697             . $caller->[1]
698             . " line "
699             . $caller->[2]
700             . ") died with error: $data";
701              
702 19 50       498 if ( $self->use_anyevent ) {
703             # Can't throw events with anyevent
704 0         0 $self->_last_error($err);
705             } else {
706             # Otherwise we do throw it
707 19         1242 die($err);
708             }
709             }
710              
711 4460         105678 return;
712             }
713              
714             # Start queued children, if possible.
715             # Returns 1 if children were started, undef otherwise
716             sub _start_queued_children {
717 8576 50   8576   46044 if ( $#_ != 0 ) { confess 'invalid call' }
  0         0  
718 8576         28669 my ($self) = @_;
719              
720 8576 100       16557 if ( !( @{ $self->_queued_children } ) ) { return; }
  8576         97617  
  3988         9861  
721 4588 50       21285 if ( defined( $self->_last_error ) ) { return; } # Do not queue if there are errors
  0         0  
722              
723             # Can we start a queued process?
724 4588         9007 while ( scalar @{ $self->_queued_children } ) {
  7811         59340  
725 5708 100 100     39246 if ( ( !defined( $self->max_children ) ) || ( $self->count < $self->max_children ) ) {
726             # Start queued child
727 3340         6316 my $ele = shift @{ $self->_queued_children };
  3340         174975  
728 3340 100       11656 if ( !defined( $ele->[1] ) ) {
729 1245         8407 $self->async( $ele->[0] );
730             } else {
731 2095         16311 $self->async( $ele->[0], $ele->[1] );
732             }
733             } else {
734             # Can't unqueue
735 2368         15751 return;
736             }
737             }
738              
739             # We started at least one process
740 2103         136555 return 1;
741             }
742              
743             # Sets up AnyEvent or tears it down as needed
744             sub _set_anyevent {
745 0 0   0   0 if ( $#_ < 1 ) { confess 'invalid call' }
  0         0  
746 0 0       0 if ( $#_ > 2 ) { confess 'invalid call' }
  0         0  
747 0         0 my ( $self, $new, $old ) = @_;
748              
749 0 0 0     0 if ( ( !$old ) && $new ) {
    0 0        
750             # We are setting up AnyEvent
751 0         0 require AnyEvent;
752              
753 0 0       0 if ( defined( $self->_subprocs() ) ) {
754 0         0 foreach my $pid ( keys %{ $self->_subprocs() } ) {
  0         0  
755 0         0 $self->_add_anyevent_watcher($pid);
756             }
757             }
758              
759 0         0 $self->_cv( AnyEvent->condvar );
760              
761             } elsif ( $old && ( !$new ) ) {
762             # We are tearing down AnyEvent
763              
764 0 0       0 if ( defined( $self->_subprocs() ) ) {
765 0         0 foreach my $pid ( keys %{ $self->_subprocs() } ) {
  0         0  
766 0         0 my $proc = $self->_subprocs()->{$pid};
767              
768 0         0 $proc->{watcher} = undef;
769             }
770             }
771              
772 0         0 $self->_cv(undef);
773             }
774 0         0 return;
775             }
776              
777             # Sets up the listener for AnyEvent
778             sub _add_anyevent_watcher {
779 0 0   0   0 if ( $#_ != 1 ) { confess 'invalid call' }
  0         0  
780 0         0 my ( $self, $pid ) = @_;
781              
782 0         0 my $proc = $self->_subprocs()->{$pid};
783              
784             $proc->{watcher} = AnyEvent->io(
785             fh => $proc->{fh},
786             poll => 'r',
787             cb => sub {
788 0     0   0 $self->_wait($pid);
789 0 0       0 if ( scalar( keys %{ $self->_subprocs() } ) == 0 ) {
  0         0  
790 0         0 my $oldcv = $self->_cv;
791 0         0 $self->_cv( AnyEvent->condvar );
792 0         0 $oldcv->send();
793             }
794              
795             # Start queued children, if needed
796 0         0 $self->_start_queued_children();
797             },
798 0         0 );
799              
800 0         0 return;
801             }
802              
803             # Used to clear all sub-processes, etc, in child process.
804             sub _clear_all {
805 241 50   241   7655 if ( $#_ != 0 ) { confess 'invalid call' }
  0         0  
806 241         5202 my ( $self ) = @_;
807              
808 241         9650 $self->_cv(undef);
809 241         7564 $self->_last_error(undef);
810 241         4175 $self->_ordered_count(0);
811 241         7826 $self->_ordered_responses( [] );
812 241         4824 $self->_count(1);
813 241         4757 $self->_queued_children( [] );
814              
815 241         1191 do {
816             # Don't warn on AnyEvent in child threads being DESTROYed
817 241     0   25963 local $SIG{__WARN__} = sub { };
818 241         6767 $self->_subprocs( {} );
819             };
820              
821 241         2674 return;
822             }
823              
824              
825             sub start {
826 5 50   5 1 28789 if ( $#_ != 1 ) { confess 'invalid call'; }
  0         0  
827 5         20 my $self = shift;
828 5         13 my $sub = shift;
829              
830             # Test $sub to make sure it is a code ref or a sub ref
831 5 50       25 if ( !_codelike($sub) ) {
832 0         0 confess("Parameter to start() is not a code (or codelike) reference");
833             }
834              
835 5         58 my $pid = fork();
836              
837 5 100       18947 if ( !$pid ) {
838             # We are in the child process.
839 2         416 $sub->();
840 2         1458 exit();
841             }
842              
843 3         275 return;
844             }
845              
846             # Tests to see if something is codelike
847             #
848             # Borrowed from Params::Util (written by Adam Kennedy)
849             sub _codelike {
850 8961 50   8961   46005 if ( scalar(@_) != 1 ) { confess 'invalid call' }
  0         0  
851 8961         20429 my $thing = shift;
852              
853 8961 100       81376 if ( reftype($thing) ) { return 1; }
  8947         33847  
854 14 50 33     63 if ( blessed($thing) && overload::Method( $thing, '()' ) ) { return 1; }
  0         0  
855              
856 14         70 return;
857             }
858              
859             # Destructor emits warning if sub processes are running
860             sub DESTROY {
861 344     344   43789 my $self = shift;
862              
863 344 100       1496 if ( scalar( keys %{ $self->_subprocs } ) ) {
  344         31618  
864 1         138 warn "Warning: Subprocesses running when Parallel::WorkUnit object destroyed\n";
865             }
866              
867 344         439404 return;
868             }
869              
870             1;
871              
872             __END__