File Coverage

blib/lib/MCE/Child.pm
Criterion Covered Total %
statement 380 535 71.0
branch 178 422 42.1
condition 56 173 32.3
subroutine 56 77 72.7
pod 24 24 100.0
total 694 1231 56.3


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## A threads-like parallelization module compatible with Perl 5.8.
4             ##
5             ###############################################################################
6              
7 9     9   886029 use strict;
  9         9  
  9         302  
8 9     9   35 use warnings;
  9         45  
  9         505  
9              
10 9     9   43 no warnings qw( threads recursion uninitialized once redefine );
  9         9  
  9         562  
11              
12             package MCE::Child;
13              
14             our $VERSION = '1.902';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitExplicitReturnUndef)
18             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
19             ## no critic (TestingAndDebugging::ProhibitNoStrict)
20              
21 9     9   3644 use MCE::Signal ();
  9         26  
  9         191  
22 9     9   3892 use MCE::Mutex ();
  9         137  
  9         184  
23 9     9   4351 use MCE::Channel ();
  9         26  
  9         230  
24 9     9   36 use Time::HiRes 'sleep';
  9         9  
  9         69  
25              
26             use overload (
27             q(==) => \&equal,
28 0     0   0 q(!=) => sub { !equal(@_) },
29 9         80 fallback => 1
30 9     9   6592 );
  9         12636  
31              
32             sub import {
33 9 50   9   245 if (caller !~ /^MCE::/) {
34 9     9   850 no strict 'refs'; no warnings 'redefine';
  9     9   18  
  9         449  
  9         56  
  9         19  
  9         1221  
35 9         27 *{ caller().'::mce_child' } = \&mce_child;
  9         88  
36             }
37 9         213 return;
38             }
39              
40             ## The POSIX module has many symbols. Try not loading it simply
41             ## to have WNOHANG. The following covers most platforms.
42              
43             use constant {
44 9 50       48628 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
45             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
46 9     9   53 };
  9         17  
47              
48             my ( $_MNGD, $_DATA, $_DELY, $_LIST ) = ( {}, {}, {}, {} );
49              
50             my $_is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
51             my $_tid = ( $INC{'threads.pm'} ) ? threads->tid() : 0;
52             my $_yield_secs = ( $^O =~ /mswin|mingw|msys|cygwin/i ) ? 0.015 : 0.008;
53              
54             sub CLONE {
55 0 0   0   0 $_tid = threads->tid(), &_clear() if $INC{'threads.pm'};
56             }
57              
58             sub _clear {
59 7     7   23 %{ $_LIST } = ();
  7         77  
60             }
61              
62             sub _max_workers {
63 10     10   23 my ( $cpus ) = @_;
64 10 100       68 if ( $cpus eq 'auto' ) {
    100          
65 1         6 $cpus = MCE::Util::get_ncpu();
66             }
67             elsif ( $cpus =~ /^([0-9.]+)%$/ ) {
68 6         35 my ( $percent, $ncpu ) = ( $1 / 100, MCE::Util::get_ncpu() );
69 6         61 $cpus = $ncpu * $percent + 0.5;
70             }
71 10 100 66     115 $cpus = 1 if $cpus !~ /^[\d\.]+$/ || $cpus < 1;
72 10         39 return int($cpus);
73             }
74              
75             ###############################################################################
76             ## ----------------------------------------------------------------------------
77             ## Init routine.
78             ##
79             ###############################################################################
80              
81             bless my $_SELF = { MGR_ID => "$$.$_tid", WRK_ID => $$ }, __PACKAGE__;
82              
83             sub MCE::Child::_guard::DESTROY {
84 0     0   0 my ($pkg, $id) = @{ $_[0] };
  0         0  
85              
86 0 0 0     0 if (defined $pkg && $id eq "$$.$_tid") {
87 0         0 @{ $_[0] } = ();
  0         0  
88 0         0 MCE::Child->finish($pkg);
89             }
90              
91 0         0 return;
92             }
93              
94             sub init {
95 16 100 66 16 1 280594 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
96              
97             # -- options ----------------------------------------------------------
98             # max_workers child_timeout posix_exit on_start on_finish void_context
99             # ---------------------------------------------------------------------
100              
101 16 50       103 my $opt = ( ref $_[0] eq 'HASH' ) ? shift : { @_ };
102 16   66     178 my $pkg = "$$.$_tid.".( delete $opt->{caller} || caller() );
103 16         158 my $mngd = $_MNGD->{$pkg} = $opt;
104              
105 16         48 @_ = ();
106              
107             $mngd->{MGR_ID} = "$$.$_tid", $mngd->{PKG} = $pkg,
108 16         166 $mngd->{WRK_ID} = $$;
109              
110 16 100       129 &_force_reap($pkg), $_DATA->{$pkg}->clear() if ( defined $_LIST->{$pkg} );
111              
112 16 100       81 if ( !defined $_LIST->{$pkg} ) {
113 9 0 33     27 $MCE::_GMUTEX->lock() if ( $_tid && $MCE::_GMUTEX );
114 9 50       20 sleep 0.015 if $_tid;
115              
116             # Start the shared-manager process if not running.
117 9 50       36 MCE::Shared->start() if $INC{'MCE/Shared.pm'};
118              
119 9         128 my $chnl = MCE::Channel->new( impl => 'Mutex' );
120 9         89 $_LIST->{ $pkg } = MCE::Child::_ordhash->new();
121 9         128 $_DELY->{ $pkg } = MCE::Child::_delay->new( $chnl );
122 9         55 $_DATA->{ $pkg } = MCE::Child::_hash->new( $chnl );
123 9         28 $_DATA->{"$pkg:id"} = 0;
124              
125 9         372 $_DATA->{"$pkg:seed"} = int(CORE::rand() * 1e9);
126              
127 9 0 33     85 $MCE::_GMUTEX->unlock() if ( $_tid && $MCE::_GMUTEX );
128             }
129              
130 16 50       50 if ( !exists $mngd->{posix_exit} ) {
131             $mngd->{posix_exit} = 1 if (
132             $^S || $_tid || $INC{'Mojo/IOLoop.pm'} ||
133             $INC{'Coro.pm'} || $INC{'LWP/UserAgent.pm'} || $INC{'stfl.pm'} ||
134             $INC{'Curses.pm'} || $INC{'CGI.pm'} || $INC{'FCGI.pm'} ||
135             $INC{'Tk.pm'} || $INC{'Wx.pm'} || $INC{'Win32/GUI.pm'} ||
136 16 50 33     619 $INC{'Gearman/Util.pm'} || $INC{'Gearman/XS.pm'}
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
137             );
138             }
139              
140 16 100       55 if ( defined $mngd->{max_workers} ) {
141 3         12 $mngd->{max_workers} = _max_workers($mngd->{max_workers});
142             }
143              
144 16 50 33     78 if ( $INC{'LWP/UserAgent.pm'} && !$INC{'Net/HTTP.pm'} ) {
145 0         0 local $@; eval 'require Net::HTTP; require Net::HTTPS';
  0         0  
146             }
147              
148             require POSIX
149 16 50 66     5625 if ( $mngd->{on_finish} && !$INC{'POSIX.pm'} && !$_is_MSWin32 );
      66        
150              
151             defined wantarray
152 16 50       37273 ? bless([$pkg, "$$.$_tid"], MCE::Child::_guard::)
153             : ();
154             }
155              
156             ###############################################################################
157             ## ----------------------------------------------------------------------------
158             ## 'new', 'mce_child', and 'create' for threads-like similarity.
159             ##
160             ###############################################################################
161              
162             ## 'new' and 'tid' are aliases for 'create' and 'pid' respectively.
163              
164             *new = \&create, *tid = \&pid;
165              
166             ## Use "goto" trick to avoid pad problems from 5.8.1 (fixed in 5.8.2)
167             ## Tip found in threads::async.
168              
169             sub mce_child (&;@) {
170 0     0 1 0 goto &create;
171             }
172              
173             sub create {
174 50     50 1 16074 my $caller = caller();
175              
176 50   66     1907 my $mngd = $_MNGD->{ "$$.$_tid.$caller" } || do {
177             # construct mngd internally on first use unless defined
178             init( caller => $caller ); $_MNGD->{ "$$.$_tid.$caller" };
179             };
180              
181 50 50       296 shift if ( $_[0] eq __PACKAGE__ );
182              
183             # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
184              
185 50 50       754 my $self = bless ref $_[0] eq 'HASH' ? { %{ shift() } } : { }, __PACKAGE__;
  0         0  
186              
187 50 50       219 $self->{IGNORE} = 1 if $SIG{CHLD} eq 'IGNORE';
188 50         1052 $self->{MGR_ID} = $mngd->{MGR_ID}, $self->{PKG} = $mngd->{PKG};
189 50 50 33     189 $self->{ident } = shift if ( !ref $_[0] && ref $_[1] eq 'CODE' );
190              
191 50 0 33     90 my $func = shift; $func = $caller.'::'.$func
  50   33     169  
192             if ( !ref $func && length $func && index($func,':') < 0 );
193              
194 50 50       113 if ( !defined $func ) {
195 0         0 local $\; print {*STDERR} "code function is not specified or valid\n";
  0         0  
  0         0  
196 0         0 return undef;
197             }
198              
199             my ( $list, $max_workers, $pkg ) = (
200             $_LIST->{ $mngd->{PKG} }, $mngd->{max_workers}, $mngd->{PKG}
201 50         231 );
202              
203 50 50       635 $_DATA->{"$pkg:id"} = 10000 if ( ( my $id = ++$_DATA->{"$pkg:id"} ) >= 2e9 );
204              
205             # Reap completed child processes.
206             {
207 50         81 local ($SIG{CHLD}, $!, $?, $_);
  50         2597  
208             map {
209 2         68 $_ = substr($_, 1); # strip leading 'R'
210 2         36 my $child = $list->del($_);
211 2 50       42 if ( ! $child->{REAPED} ) {
212 2         3609952 waitpid($child->{WRK_ID}, 0);
213 2         60 _reap_child($child, 0);
214             }
215 2         46 ();
216             }
217 50         999 $_DATA->{$pkg}->get_done();
218             }
219              
220             # Wait for a slot if saturated.
221 50 50 33     177 _wait_one($pkg) if ( $max_workers && $list->len() >= $max_workers );
222              
223             # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
224              
225 50 0 33     114 $MCE::_GMUTEX->lock() if ( $_tid && $MCE::_GMUTEX );
226              
227 50         91 my @args = @_; @_ = (); # To avoid (Scalars leaked: N) messages
  50         230  
228 50         66 my ( $killed, $pid );
229              
230             {
231 50     0   171 local $SIG{TERM} = local $SIG{INT} = sub { $killed = $_[0] }
  0         0  
232 50 50 33     2032 if ( !$_is_MSWin32 && $] ge '5.010001' );
233              
234             local $SIG{TTIN}, local $SIG{TTOU}, local $SIG{WINCH}
235 50 50       868 if ( !$_is_MSWin32 );
236              
237 50         95803 $pid = fork();
238              
239 50 50       3269 if ( !defined $pid ) { # error
    100          
240 0         0 local $\; print {*STDERR} "fork error: $!\n";
  0         0  
  0         0  
241             }
242             elsif ( $pid ) { # parent
243 43         2165 $self->{WRK_ID} = $pid;
244 43         3122 $list->set($pid, $self);
245 43 100       8949 $mngd->{on_start}->($pid, $self->{ident}) if $mngd->{on_start};
246             }
247             else { # child
248 7         485 %{ $_LIST } = (), $_SELF = $self;
  7         1968  
249              
250             local $SIG{TERM} = local $SIG{INT} = local $SIG{ABRT} = \&_trap,
251             local $SIG{SEGV} = local $SIG{HUP} = \&_trap,
252 7         1197 local $SIG{QUIT} = \&_quit;
253 7         822 local $SIG{CHLD};
254              
255 7 50       325 MCE::Shared::init() if $INC{'MCE/Shared.pm'};
256 7 50       932 $_DATA->{ $_SELF->{PKG} }->set('S'.$$, '') unless $self->{IGNORE};
257 7 50       83 CORE::kill($killed, $$) if $killed;
258              
259 7 50       189 MCE::Child->_clear() if $INC{'MCE/Child.pm'};
260 7 50       58 MCE::Hobo->_clear() if $INC{'MCE/Hobo.pm'};
261              
262             # Set the seed of the base generator uniquely between workers.
263             # The new seed is computed using the current seed and ID value.
264             # One may set the seed at the application level for predictable
265             # results. Ditto for PDL, Math::Prime::Util, Math::Random, and
266             # Math::Random::MT::Auto.
267              
268             {
269 7         19 my $seed = abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560;
  7         115  
270              
271 7         74 CORE::srand($seed);
272 7 50 33     124 PDL::srand($seed) if $INC{'PDL.pm'} && PDL->can('srand'); # PDL 2.062 ~ 2.089
273 7 50 33     56 PDL::srandom($seed) if $INC{'PDL.pm'} && PDL->can('srandom'); # PDL 2.089_01+
274 7 50       67 Math::Prime::Util::srand($seed) if $INC{'Math/Prime/Util.pm'};
275             }
276              
277 7 50       51 if ( $INC{'Math/Random.pm'} ) {
278 0         0 my $cur_seed = Math::Random::random_get_seed();
279 0 0       0 my $new_seed = ($cur_seed < 1073741781)
280             ? $cur_seed + ((abs($id) * 10000) % 1073741780)
281             : $cur_seed - ((abs($id) * 10000) % 1073741780);
282              
283 0         0 Math::Random::random_set_seed($new_seed, $new_seed);
284             }
285              
286 7 50       63 if ( $INC{'Math/Random/MT/Auto.pm'} ) {
287 0         0 my $cur_seed = Math::Random::MT::Auto::get_seed()->[0];
288 0 0       0 my $new_seed = ($cur_seed < 1073741781)
289             ? $cur_seed + ((abs($id) * 10000) % 1073741780)
290             : $cur_seed - ((abs($id) * 10000) % 1073741780);
291              
292 0         0 Math::Random::MT::Auto::set_seed($new_seed);
293             }
294              
295 7         288 _dispatch($mngd, $func, \@args);
296             }
297             }
298              
299 43 0 33     7030 $MCE::_GMUTEX->unlock() if ( $_tid && $MCE::_GMUTEX );
300              
301 43 50       353 CORE::kill($killed, $$) if $killed;
302              
303 43 50       5667 return $pid ? $self : undef;
304             }
305              
306             ###############################################################################
307             ## ----------------------------------------------------------------------------
308             ## Public methods.
309             ##
310             ###############################################################################
311              
312             sub equal {
313 0 0 0 0 1 0 return 0 unless ( ref $_[0] && ref $_[1] );
314 0 0       0 $_[0]->{WRK_ID} == $_[1]->{WRK_ID} ? 1 : 0;
315             }
316              
317             sub error {
318 22 50   22 1 230 _croak('Usage: $child->error()') unless ref( my $self = $_[0] );
319 22 50       65 $self->join() unless $self->{REAPED};
320 22 50       194 $self->{ERROR} || undef;
321             }
322              
323             sub exit {
324 10 50 33 10 1 580 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
325              
326 10 50       195 my ( $self ) = ( ref $_[0] ? shift : $_SELF );
327 10         100 my ( $pkg, $wrk_id ) = ( $self->{PKG}, $self->{WRK_ID} );
328              
329 10 50 33     460 if ( $wrk_id == $$ && $self->{MGR_ID} eq "$$.$_tid" ) {
    50          
330 0         0 MCE::Child->finish('MCE'); CORE::exit(@_);
  0         0  
331             }
332             elsif ( $wrk_id == $$ ) {
333 0   0     0 alarm 0; my ( $exit_status, @res ) = @_; $? = $exit_status || 0;
  0         0  
  0         0  
334             $_DATA->{$pkg}->set('R'.$wrk_id, @res ? \@res : '')
335 0 0       0 unless $self->{IGNORE};
    0          
336 0         0 die "Child exited ($?)\n";
337 0         0 _exit($?); # not reached
338             }
339              
340 10 50       100 return $self if $self->{REAPED};
341              
342 10 50       140 if ( defined $_DATA->{$pkg} ) {
343 10         220 sleep $_yield_secs until $_DATA->{$pkg}->exists('S'.$wrk_id);
344             } else {
345 0         0 sleep 0.030;
346             }
347              
348 10 50       45 if ($_is_MSWin32) {
349 0 0       0 CORE::kill('KILL', $wrk_id) if CORE::kill('ZERO', $wrk_id);
350             } else {
351 10 50       315 CORE::kill('QUIT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
352             }
353              
354 10         90 $self;
355             }
356              
357             sub finish {
358 19 50   19 1 4213 _croak('Usage: MCE::Child->finish()') if ref($_[0]);
359 19 50 33     231 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
360              
361 19 100       161 my $pkg = defined($_[0]) ? shift : "$$.$_tid.".caller();
362              
363 19 100       93 if ( $pkg eq 'MCE' ) {
    50          
364 17         34 for my $key ( keys %{ $_LIST } ) { MCE::Child->finish($key); }
  17         212  
  1         7  
365             }
366             elsif ( defined $_LIST->{$pkg} ) {
367 2 50       10 return if $MCE::Signal::KILLED;
368              
369 2 50       10 if ( exists $_DELY->{$pkg} ) {
370 2         21 &_force_reap($pkg);
371             delete($_DELY->{$pkg}), delete($_DATA->{"$pkg:seed"}),
372             delete($_LIST->{$pkg}), delete($_DATA->{"$pkg:id"}),
373 2         199 delete($_MNGD->{$pkg}), delete($_DATA->{ $pkg });
374             }
375             }
376              
377 19         60 @_ = ();
378              
379 19         1945 return;
380             }
381              
382             sub is_joinable {
383 15 50   15 1 40 _croak('Usage: $child->is_joinable()') unless ref( my $self = $_[0] );
384 15         35 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
385              
386 15 50       220 if ( $wrk_id == $$ ) {
    50          
387 0         0 '';
388             }
389             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
390 15 50       35 return '' if $self->{REAPED};
391 15         40 local $!; $_DATA->{$pkg}->reap_data;
  15         35  
392 15 50       200 ( waitpid($wrk_id, _WNOHANG) == 0 ) ? '' : do {
393 0 0       0 _reap_child($self, 0) unless $self->{REAPED};
394 0         0 1;
395             };
396             }
397             else {
398             # limitation for MCE::Child only; allowed for MCE::Hobo
399 0         0 _croak('Error: $child->is_joinable() not called by managed process');
400             }
401             }
402              
403             sub is_running {
404 15 50   15 1 5985 _croak('Usage: $child->is_running()') unless ref( my $self = $_[0] );
405 15         50 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
406              
407 15 50       285 if ( $wrk_id == $$ ) {
    50          
408 0         0 1;
409             }
410             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
411 15 50       145 return '' if $self->{REAPED};
412 15         40 local $!; $_DATA->{$pkg}->reap_data;
  15         45  
413 15 50       150 ( waitpid($wrk_id, _WNOHANG) == 0 ) ? 1 : do {
414 0 0       0 _reap_child($self, 0) unless $self->{REAPED};
415 0         0 '';
416             };
417             }
418             else {
419             # limitation for MCE::Child only; allowed for MCE::Hobo
420 0         0 _croak('Error: $child->is_running() not called by managed process');
421             }
422             }
423              
424             sub join {
425 32 50   32 1 17915 _croak('Usage: $child->join()') unless ref( my $self = $_[0] );
426 32         184 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
427              
428 32 50       102 if ( $self->{REAPED} ) {
429 0 0       0 _croak('Child already joined') unless exists( $self->{RESULT} );
430 0 0       0 $_LIST->{$pkg}->del($wrk_id) if ( defined $_LIST->{$pkg} );
431              
432             return ( defined wantarray )
433 0 0       0 ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
  0 0       0  
434             : ();
435             }
436              
437 32 50       340 if ( $wrk_id == $$ ) {
    50          
438 0         0 _croak('Cannot join self');
439             }
440             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
441             # remove from list after reaping
442 32         320 local $SIG{CHLD};
443 32         364 _reap_child($self, 1);
444 32         213 $_LIST->{$pkg}->del($wrk_id);
445             }
446             else {
447             # limitation for MCE::Child only; allowed for MCE::Hobo
448 0         0 _croak('Error: $child->join() not called by managed process');
449             }
450              
451 32 50       73 return unless ( exists $self->{RESULT} );
452              
453             ( defined wantarray )
454 32 50       800 ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
  0 100       0  
455             : ();
456             }
457              
458             sub kill {
459 5 50   5 1 260 _croak('Usage: $child->kill()') unless ref( my $self = $_[0] );
460 5         80 my ( $wrk_id, $pkg, $signal ) = ( $self->{WRK_ID}, $self->{PKG}, $_[1] );
461              
462 5 50       25 if ( $wrk_id == $$ ) {
463 0   0     0 CORE::kill($signal || 'INT', $$);
464 0         0 return $self;
465             }
466 5 50       85 if ( $self->{MGR_ID} eq "$$.$_tid" ) {
467 5 50       25 return $self if $self->{REAPED};
468 5 50       10 if ( defined $_DATA->{$pkg} ) {
469 5         95 sleep $_yield_secs until $_DATA->{$pkg}->exists('S'.$wrk_id);
470             } else {
471 0         0 sleep 0.030;
472             }
473             }
474              
475 5 50 50     175 CORE::kill($signal || 'INT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
476              
477 5         115 $self;
478             }
479              
480             sub list {
481 5 50   5 1 3210 _croak('Usage: MCE::Child->list()') if ref($_[0]);
482 5         35 my $pkg = "$$.$_tid.".caller();
483              
484 5 50       30 ( defined $_LIST->{$pkg} ) ? $_LIST->{$pkg}->vals() : ();
485             }
486              
487             sub list_pids {
488 5 50   5 1 265 _croak('Usage: MCE::Child->list_pids()') if ref($_[0]);
489 5         260 my $pkg = "$$.$_tid.".caller(); local $_;
  5         90  
490              
491 5 50       100 ( defined $_LIST->{$pkg} ) ? map { $_->pid } $_LIST->{$pkg}->vals() : ();
  15         145  
492             }
493              
494             sub list_joinable {
495 5 50   5 1 4490 _croak('Usage: MCE::Child->list_joinable()') if ref($_[0]);
496 5         35 my $pkg = "$$.$_tid.".caller();
497              
498 5 50       40 return () unless ( my $list = $_LIST->{$pkg} );
499 5         15 local ($!, $?, $_); $_DATA->{$pkg}->reap_data;
  5         15  
500              
501             map {
502 5 50       40 ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? () : do {
  15         105  
503 0 0       0 _reap_child($_, 0) unless $_->{REAPED};
504 0         0 $_;
505             };
506             }
507             $list->vals();
508             }
509              
510             sub list_running {
511 5 50   5 1 17990 _croak('Usage: MCE::Child->list_running()') if ref($_[0]);
512 5         75 my $pkg = "$$.$_tid.".caller();
513              
514 5 50       40 return () unless ( my $list = $_LIST->{$pkg} );
515 5         80 local ($!, $?, $_); $_DATA->{$pkg}->reap_data;
  5         80  
516              
517             map {
518 5 50       25 ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? $_ : do {
  15         185  
519 0 0       0 _reap_child($_, 0) unless $_->{REAPED};
520 0         0 ();
521             };
522             }
523             $list->vals();
524             }
525              
526             sub max_workers {
527 17 50   17 1 63 _croak('Usage: MCE::Child->max_workers()') if ref($_[0]);
528 17   33     197 my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
529             # construct mngd internally on first use unless defined
530             init(); $_MNGD->{ "$$.$_tid.".caller() };
531             };
532 17 50       45 shift if ( $_[0] eq __PACKAGE__ );
533              
534 17 100       44 $mngd->{max_workers} = _max_workers(shift) if @_;
535 17         70 $mngd->{max_workers};
536             }
537              
538             sub pending {
539 5 50   5 1 2755 _croak('Usage: MCE::Child->pending()') if ref($_[0]);
540 5         40 my $pkg = "$$.$_tid.".caller();
541              
542 5 50       40 ( defined $_LIST->{$pkg} ) ? $_LIST->{$pkg}->len() : 0;
543             }
544              
545             sub pid {
546 22 50   22 1 257 ref($_[0]) ? $_[0]->{WRK_ID} : $_SELF->{WRK_ID};
547             }
548              
549             sub result {
550 7 50   7 1 107 _croak('Usage: $child->result()') unless ref( my $self = $_[0] );
551 7 50       343 return $self->join() unless $self->{REAPED};
552              
553 7 50       56 _croak('Child already joined') unless exists( $self->{RESULT} );
554 7 50       65 wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1];
  0         0  
555             }
556              
557             sub seed {
558 0 0   0 1 0 _croak('Usage: MCE::Child->seed()') if ref($_[0]);
559 0 0       0 my $pkg = exists $_SELF->{PKG} ? $_SELF->{PKG} : "$$.$_tid.".caller();
560              
561 0         0 return $_DATA->{"$pkg:seed"};
562             }
563              
564             sub self {
565 0 0   0 1 0 ref($_[0]) ? $_[0] : $_SELF;
566             }
567              
568             sub wait_all {
569 1 50   1 1 139 _croak('Usage: MCE::Child->wait_all()') if ref($_[0]);
570 1         25 my $pkg = "$$.$_tid.".caller();
571              
572             return wantarray ? () : 0
573 1 0 33     51 if ( !defined $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
    50          
574              
575 1         21 local $_; ( wantarray )
576 0         0 ? map { $_->join(); $_ } $_LIST->{$pkg}->vals()
  0         0  
577 1 50       27 : map { $_->join(); () } $_LIST->{$pkg}->vals();
  2         41  
  2         8  
578             }
579              
580             *waitall = \&wait_all; # compatibility
581              
582             sub wait_one {
583 4 50   4 1 332 _croak('Usage: MCE::Child->wait_one()') if ref($_[0]);
584 4         200 my $pkg = "$$.$_tid.".caller();
585              
586             return undef
587 4 50 33     512 if ( !defined $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
588              
589 4         876 _wait_one($pkg);
590             }
591              
592             *waitone = \&wait_one; # compatibility
593              
594             sub yield {
595 0 0   0 1 0 _croak('Usage: MCE::Child->yield()') if ref($_[0]);
596 0 0 0     0 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
597              
598 0   0     0 my $pkg = $_SELF->{PKG} || do {
599             my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
600             # construct mngd internally on first use unless defined
601             init(); $_MNGD->{ "$$.$_tid.".caller() };
602             };
603             $mngd->{PKG};
604             };
605              
606 0 0       0 return unless $_DELY->{$pkg};
607 0         0 my $seconds = $_DELY->{$pkg}->seconds(@_);
608              
609 0         0 MCE::Util::_sleep( $seconds );
610             }
611              
612             ###############################################################################
613             ## ----------------------------------------------------------------------------
614             ## Private methods.
615             ##
616             ###############################################################################
617              
618             sub _croak {
619 0 0   0   0 if ( $INC{'MCE.pm'} ) {
620 0         0 goto &MCE::_croak;
621             }
622             else {
623 0         0 $SIG{__DIE__} = \&MCE::Signal::_die_handler;
624 0         0 $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
625              
626 0         0 $\ = undef; goto &Carp::croak;
  0         0  
627             }
628             }
629              
630             sub _dispatch {
631 7     7   81 my ( $mngd, $func, $args ) = @_;
632              
633 7         2069 $mngd->{WRK_ID} = $_SELF->{WRK_ID} = $$, $? = 0;
634 7 50       56 $ENV{PERL_MCE_IPC} = 'win32' if $_is_MSWin32;
635              
636             {
637 7         18 local $!;
  7         138  
638 7 50       382 (*STDERR)->autoflush(1) if defined( fileno *STDERR );
639 7 50       2229 (*STDOUT)->autoflush(1) if defined( fileno *STDOUT );
640             }
641              
642             # Run task.
643             my $child_timeout = ( exists $_SELF->{child_timeout} )
644 7 50       342 ? $_SELF->{child_timeout} : $mngd->{child_timeout};
645              
646             my $void_context = ( exists $_SELF->{void_context} )
647 7 50       84 ? $_SELF->{void_context} : $mngd->{void_context};
648              
649 7         73 my @res; my $timed_out = 0;
  7         67  
650              
651             local $SIG{'ALRM'} = sub {
652 0     0   0 alarm 0; $timed_out = 1; $SIG{__WARN__} = sub {};
  0         0  
  0         0  
653 0         0 die "Child timed out\n";
654 7         364 };
655              
656 7 50 33     2427 if ( $void_context || $_SELF->{IGNORE} ) {
657 9     9   103 no strict 'refs';
  9         26  
  9         812  
658 0   0     0 eval { alarm($child_timeout || 0); $func->(@{ $args }) };
  0         0  
  0         0  
  0         0  
659             }
660             else {
661 9     9   75 no strict 'refs';
  9         17  
  9         3042  
662 7   50     110 @res = eval { alarm($child_timeout || 0); $func->(@{ $args }) };
  7         86  
  7         53  
  7         129  
663             }
664              
665 7         127 alarm 0;
666 7 50       45 $@ = "Child timed out" if $timed_out;
667              
668 7 50       109 if ( $@ ) {
669 0 0       0 _exit($?) if ( $@ =~ /^Child exited \(\S+\)$/ );
670 0         0 my $err = $@; $? = 1; $err =~ s/, <__ANONIO__> line \d+//;
  0         0  
  0         0  
671              
672 0 0       0 if ( ! $_SELF->{IGNORE} ) {
673             $_DATA->{ $_SELF->{PKG} }->set('S'.$$, $err),
674 0         0 $_DATA->{ $_SELF->{PKG} }->set('R'.$$, '');
675             }
676              
677 0 0 0     0 if ( !$timed_out && !$mngd->{on_finish} && !$INC{'MCE/Simple.pm'} ) {
      0        
678 9     9   7011 use bytes; warn "Child $$ terminated abnormally: reason $err\n";
  9         6246  
  9         60  
  0         0  
679             }
680             }
681             else {
682 7 50       47 shift(@res) if ref($res[0]) =~ /^MCE::(?:Barrier|Semaphore)::_guard/s;
683             $_DATA->{ $_SELF->{PKG} }->set('R'.$$, @res ? \@res : '')
684 7 50       173 if ( ! $_SELF->{IGNORE} );
    50          
685             }
686              
687 7         189 _exit($?);
688             }
689              
690             sub _exit {
691 7     7   34 my ( $exit_status ) = @_;
692              
693             # Check for nested workers not yet joined.
694 7 50 33     134 MCE::Child->finish('MCE') if ( !$_SELF->{SIGNALED} && keys %{ $_LIST } );
  7         79  
695              
696             # Exit child process.
697 7 50   0   179 $SIG{__DIE__} = sub {} unless $_tid;
698 7     0   293 $SIG{__WARN__} = sub {};
699              
700 7 50 33     86 threads->exit($exit_status) if ( $INC{'threads.pm'} && $_is_MSWin32 );
701 7 50 33     38 CORE::kill('KILL', $$) if ( $_SELF->{SIGNALED} && !$_is_MSWin32 );
702              
703             my $posix_exit = ( exists $_SELF->{posix_exit} )
704 7 50       50 ? $_SELF->{posix_exit} : $_MNGD->{ $_SELF->{PKG} }{posix_exit};
705              
706 7 50 33     45 if ( $posix_exit && !$_is_MSWin32 ) {
707 0         0 eval { MCE::Mutex::Channel::_destroy() };
  0         0  
708 0 0       0 POSIX::_exit($exit_status) if $INC{'POSIX.pm'};
709 0         0 CORE::kill('KILL', $$);
710             }
711              
712 7         3423 CORE::exit($exit_status);
713             }
714              
715             sub _force_reap {
716 9     9   46 my ( $count, $pkg ) = ( 0, @_ );
717 9 50 33     134 return unless ( defined $_LIST->{$pkg} && $_LIST->{$pkg}->len() );
718              
719 0         0 for my $child ( $_LIST->{$pkg}->vals() ) {
720 0 0       0 next if $child->{IGNORE};
721              
722 0 0       0 if ( $child->is_running() ) {
723 0 0       0 sleep($_yield_secs), CORE::kill('KILL', $child->pid())
724             if CORE::kill('ZERO', $child->pid());
725 0         0 $count++;
726             }
727             }
728              
729 0         0 $_LIST->{$pkg}->clear();
730              
731 0 0 0     0 warn "Finished with active child processes [$pkg] ($count)\n"
732             if ( $count && !$_is_MSWin32 );
733              
734 0         0 return;
735             }
736              
737             sub _quit {
738 0 0   0   0 return MCE::Signal::defer($_[0]) if $MCE::Signal::IPC;
739              
740 0         0 alarm 0; my ( $name ) = @_;
  0         0  
741 0         0 $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
742              
743       0     $SIG{$name} = sub {}, CORE::kill($name, -$$)
744 0 0       0 if ( exists $SIG{$name} );
745              
746 0 0       0 if ( ! $_SELF->{IGNORE} ) {
747 0         0 my ( $pkg, $wrk_id ) = ( $_SELF->{PKG}, $_SELF->{WRK_ID} );
748 0         0 $_DATA->{$pkg}->set('R'.$wrk_id, '');
749             }
750              
751 0         0 _exit(0);
752             }
753              
754             sub _reap_child {
755 38     38   93 my ( $child, $wait_flag ) = @_;
756 38 50 33     5207 return if ( !$child || !defined $child->{PKG} );
757              
758 38         342 local @_ = $_DATA->{ $child->{PKG} }->get($child->{WRK_ID}, $wait_flag);
759              
760 38 100 50     1510 ( $child->{ERROR}, $child->{RESULT}, $child->{REAPED} ) =
761             ( pop || '', length $_[0] ? pop : [], 1 );
762              
763 38 50       231 return if $child->{IGNORE};
764              
765 38   100     391 my ( $exit, $err ) = ( $? || 0, $child->{ERROR} );
766 38         151 my ( $code, $sig ) = ( $exit >> 8, $exit & 0x7f );
767              
768 38 50 33     105 if ( $code > 100 && !$err ) {
769 0 0       0 $code = 2, $sig = 1, $err = 'Child received SIGHUP' if $code == 101;
770 0 0       0 $code = 2, $sig = 2, $err = 'Child received SIGINT' if $code == 102;
771 0 0       0 $code = 2, $sig = 6, $err = 'Child received SIGABRT' if $code == 106;
772 0 0       0 $code = 2, $sig = 11, $err = 'Child received SIGSEGV' if $code == 111;
773 0 0       0 $code = 2, $sig = 15, $err = 'Child received SIGTERM' if $code == 115;
774              
775 0         0 $child->{ERROR} = $err;
776             }
777              
778 38 100       298 if ( my $on_finish = $_MNGD->{ $child->{PKG} }{on_finish} ) {
779             $on_finish->(
780             $child->{WRK_ID}, $code, $child->{ident}, $sig, $err,
781 8         55 @{ $child->{RESULT} }
  8         152  
782             );
783             }
784              
785 38         265 return;
786             }
787              
788             sub _trap {
789 0 0   0   0 return MCE::Signal::defer($_[0]) if $MCE::Signal::IPC;
790              
791 0         0 alarm 0; my ( $exit_status, $name ) = ( 2, @_ );
  0         0  
792 0         0 $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
793              
794       0     $SIG{$name} = sub {}, CORE::kill($name, -$$)
795 0 0       0 if ( exists $SIG{$name} );
796              
797 0 0       0 if ( $name eq 'HUP' ) { $exit_status = 101 }
  0 0       0  
    0          
    0          
    0          
798 0         0 elsif ( $name eq 'INT' ) { $exit_status = 102 }
799 0         0 elsif ( $name eq 'ABRT' ) { $exit_status = 106 }
800 0         0 elsif ( $name eq 'SEGV' ) { $exit_status = 111 }
801 0         0 elsif ( $name eq 'TERM' ) { $exit_status = 115 }
802              
803 0 0       0 if ( ! $_SELF->{IGNORE} ) {
804 0         0 my ( $pkg, $wrk_id ) = ( $_SELF->{PKG}, $_SELF->{WRK_ID} );
805 0         0 $_DATA->{$pkg}->set('R'.$wrk_id, '');
806             }
807              
808 0         0 _exit($exit_status);
809             }
810              
811             sub _wait_one {
812 4     4   80 my ( $pkg ) = @_;
813 4         48 my ( $list, $self, $wrk_id ) = ( $_LIST->{$pkg} ); local $!;
  4         128  
814              
815 4         36 while () {
816 768         12316 $_DATA->{$pkg}->reap_data;
817 768         5048 for my $child ( $list->vals() ) {
818 768         3064 $wrk_id = $child->{WRK_ID};
819 768 50       2452 return $list->del($wrk_id) if $child->{REAPED};
820 768 100       14116 $self = $list->del($wrk_id), last if waitpid($wrk_id, _WNOHANG);
821             }
822 768 100       2008 last if $self;
823 764         6395400 sleep $_yield_secs;
824             }
825              
826 4         72 _reap_child($self, 0);
827              
828 4         48 $self;
829             }
830              
831             ###############################################################################
832             ## ----------------------------------------------------------------------------
833             ## Delay implementation suited for MCE::Child.
834             ##
835             ###############################################################################
836              
837             package # hide from rpm
838             MCE::Child::_delay;
839              
840             sub new {
841 9     9   28 my ( $class, $chnl, $delay ) = @_;
842              
843 9 50       28 if ( !defined $delay ) {
844 9 50       177 $delay = ($^O =~ /mswin|mingw|msys|cygwin/i) ? 0.015 : 0.008;
845             }
846              
847 9         37 $chnl->send(undef);
848              
849 9         38 bless [ $delay, $chnl ], $class;
850             }
851              
852             sub seconds {
853 0     0   0 my ( $self, $how_long ) = @_;
854 0 0       0 my $delay = defined($how_long) ? $how_long : $self->[0];
855 0         0 my $lapse = $self->[1]->recv();
856 0         0 my $time = MCE::Util::_time();
857              
858 0 0 0     0 if ( !$delay || !defined $lapse ) {
    0          
859 0         0 $lapse = $time;
860             }
861             elsif ( $lapse + $delay - $time < 0 ) {
862 0         0 $lapse += int( abs($time - $lapse) / $delay + 0.5 ) * $delay;
863             }
864              
865 0         0 $self->[1]->send( $lapse += $delay );
866              
867 0         0 return $lapse - $time;
868             }
869              
870             ###############################################################################
871             ## ----------------------------------------------------------------------------
872             ## Hash and ordhash implementations suited for MCE::Child.
873             ##
874             ###############################################################################
875              
876             package # hide from rpm
877             MCE::Child::_hash;
878              
879 9     9   20671 use Time::HiRes 'sleep';
  9         26  
  9         126  
880              
881             use constant {
882 9 50       13801 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
883             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
884 9     9   1793 };
  9         27  
885              
886             sub new {
887 9     9   27 my ( $class, $chnl ) = @_;
888 9         28 bless [ {}, $chnl ], shift;
889             }
890              
891             sub clear {
892 7     7   35 my ( $self ) = @_;
893 7         42 1 while ( $self->[1]->recv2_nb() );
894 7         20 %{ $self->[0] } = ();
  7         25  
895             }
896              
897             sub exists {
898 20     20   105 my ( $self, $key ) = @_;
899 20         110 $self->reap_data;
900 20         40955 CORE::exists $self->[0]{ $key };
901             }
902              
903             sub get_done {
904 50     50   234 my ( $self ) = @_;
905 50         59 my @ret;
906              
907 50         179 $self->reap_data;
908 50         64 for my $key (keys %{ $self->[0] }) {
  50         672  
909 15 100       253 push @ret, $key if ( substr($key, 0, 1) eq 'R' );
910             }
911              
912 50         578 return @ret;
913             }
914              
915             sub get {
916 38     38   342 my ( $self, $wrk_id, $wait_flag ) = @_;
917 38 100       278 $self->reap_data if ( !CORE::exists $self->[0]{ 'R'.$wrk_id } );
918              
919 38 100       93 if ( $wait_flag ) {
920 32         298 local $!;
921 32 100       1037 ( CORE::exists $self->[0]{ 'R'.$wrk_id } ) ? waitpid($wrk_id, 0) : do {
922 6         112 while () {
923 12         86 my $data = $self->[1]->recv2_nb();
924 12 100       47 if ( !defined $data ) {
925 6 50       43 last if waitpid($wrk_id, _WNOHANG);
926 6         17242 sleep(0.0009), next;
927             }
928 6         31 $self->[0]{ $data->[0] } = $data->[1];
929 6 50       13359801 waitpid($wrk_id, 0), last if $data->[0] eq 'R'.$wrk_id;
930             }
931 6 50       336 $self->reap_data if ( !CORE::exists $self->[0]{ 'R'.$wrk_id } );
932             };
933             }
934              
935 38         423 my $result = delete $self->[0]{ 'R'.$wrk_id };
936 38         188 my $error = delete $self->[0]{ 'S'.$wrk_id };
937              
938 38 50       134 $result = '' unless ( defined $result );
939 38 50       98 $error = '' unless ( defined $error );
940              
941 38         235 return ( $result, $error );
942             }
943              
944             sub reap_data {
945 895     895   2906 my ( $self ) = @_;
946              
947 895         10817 while ( my $data = $self->[1]->recv2_nb() ) {
948 71         703 $self->[0]{ $data->[0] } = $data->[1];
949             }
950              
951 895         1612 return;
952             }
953              
954             sub set {
955 14     14   669 $_[0]->[1]->send2([ $_[1], $_[2] ]);
956             }
957              
958             package # hide from rpm
959             MCE::Child::_ordhash;
960              
961 9     9   46 sub new { bless [ {}, [], {}, 0 ], shift; } # data, keys, indx, gcnt
962 0     0   0 sub exists { CORE::exists $_[0]->[0]{ $_[1] }; }
963 0     0   0 sub get { $_[0]->[0]{ $_[1] }; }
964 19     19   50 sub len { scalar keys %{ $_[0]->[0] }; }
  19         192  
965              
966             sub clear {
967 0     0   0 my ( $self ) = @_;
968 0         0 %{ $self->[0] } = @{ $self->[1] } = %{ $self->[2] } = (), $self->[3] = 0;
  0         0  
  0         0  
  0         0  
969              
970 0         0 return;
971             }
972              
973             sub del {
974 38     38   189 my ( $self, $key ) = @_;
975 38 50       235 return undef unless defined( my $off = delete $self->[2]{$key} );
976              
977             # tombstone
978 38         73 $self->[1][$off] = undef;
979              
980             # GC keys and refresh index
981 38 100       187 if ( ++$self->[3] > @{ $self->[1] } * 0.667 ) {
  38         166  
982 15         49 my ( $keys, $indx ) = ( $self->[1], $self->[2] );
983 15         34 my $i; $i = $self->[3] = 0;
  15         44  
984 15         38 for my $k ( @{ $keys } ) {
  15         114  
985 37 50       84 $keys->[$i] = $k, $indx->{$k} = $i++ if defined($k);
986             }
987 15         25 splice @{ $keys }, $i;
  15         79  
988             }
989              
990 38         453 delete $self->[0]{$key};
991             }
992              
993             sub set {
994 43     43   1132 my ( $self, $key ) = @_;
995 43 50       900 $self->[0]{$key} = $_[2], return 1 if exists($self->[0]{$key});
996              
997 43         149 $self->[2]{$key} = @{ $self->[1] }; push @{ $self->[1] }, $key;
  43         2576  
  43         169  
  43         783  
998 43         656 $self->[0]{$key} = $_[2];
999              
1000 43         224 return 1;
1001             }
1002              
1003             sub vals {
1004 789     789   2755 my ( $self ) = @_;
1005             $self->[3]
1006 1         9 ? @{ $self->[0] }{ grep defined($_), @{ $self->[1] } }
  1         16  
1007 789 100       3058 : @{ $self->[0] }{ @{ $self->[1] } };
  788         4722  
  788         1650  
1008             }
1009              
1010             1;
1011              
1012             __END__
1013              
1014             ###############################################################################
1015             ## ----------------------------------------------------------------------------
1016             ## Module usage.
1017             ##
1018             ###############################################################################
1019              
1020             =head1 NAME
1021              
1022             MCE::Child - A threads-like parallelization module compatible with Perl 5.8
1023              
1024             =head1 VERSION
1025              
1026             This document describes MCE::Child version 1.902
1027              
1028             =head1 SYNOPSIS
1029              
1030             use MCE::Child;
1031              
1032             MCE::Child->init(
1033             max_workers => 'auto', # default undef, unlimited
1034              
1035             # Specify a percentage. MCE::Child 1.876+.
1036             max_workers => '25%', # 4 on HW with 16 lcores
1037             max_workers => '50%', # 8 on HW with 16 lcores
1038              
1039             child_timeout => 20, # default undef, no timeout
1040             posix_exit => 1, # default undef, CORE::exit
1041             void_context => 1, # default undef
1042              
1043             on_start => sub {
1044             my ( $pid, $ident ) = @_;
1045             ...
1046             },
1047             on_finish => sub {
1048             my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
1049             ...
1050             }
1051             );
1052              
1053             MCE::Child->create( sub { print "Hello from child\n" } )->join();
1054              
1055             sub parallel {
1056             my ($arg1) = @_;
1057             print "Hello again, $arg1\n" if defined($arg1);
1058             print "Hello again, $_\n"; # same thing
1059             }
1060              
1061             MCE::Child->create( \&parallel, $_ ) for 1 .. 3;
1062              
1063             my @procs = MCE::Child->list();
1064             my @pids = MCE::Child->list_pids();
1065             my @running = MCE::Child->list_running();
1066             my @joinable = MCE::Child->list_joinable();
1067             my @count = MCE::Child->pending();
1068              
1069             # Joining is orderly, e.g. child1 is joined first, child2, child3.
1070             $_->join() for @procs; # (or)
1071             $_->join() for @joinable;
1072              
1073             # Joining occurs immediately as child processes complete execution.
1074             1 while MCE::Child->wait_one();
1075              
1076             my $child = mce_child { foreach (@files) { ... } };
1077              
1078             $child->join();
1079              
1080             if ( my $err = $child->error() ) {
1081             warn "Child error: $err\n";
1082             }
1083              
1084             # Get a child's object
1085             $child = MCE::Child->self();
1086              
1087             # Get a child's ID
1088             $pid = MCE::Child->pid(); # $$
1089             $pid = $child->pid();
1090             $pid = MCE::Child->tid(); # tid is an alias for pid
1091             $pid = $child->tid();
1092              
1093             # Test child objects
1094             if ( $child1 == $child2 ) {
1095             ...
1096             }
1097              
1098             # Give other workers a chance to run
1099             MCE::Child->yield();
1100             MCE::Child->yield(0.05);
1101              
1102             # Return context, wantarray aware
1103             my ($value1, $value2) = $child->join();
1104             my $value = $child->join();
1105              
1106             # Check child's state
1107             if ( $child->is_running() ) {
1108             sleep 1;
1109             }
1110             if ( $child->is_joinable() ) {
1111             $child->join();
1112             }
1113              
1114             # Send a signal to a child
1115             $child->kill('SIGUSR1');
1116              
1117             # Exit a child
1118             MCE::Child->exit(0);
1119             MCE::Child->exit(0, @ret);
1120              
1121             =head1 DESCRIPTION
1122              
1123             L<MCE::Child> is a fork of L<MCE::Hobo> for compatibility with Perl 5.8.
1124              
1125             A child is a migratory worker inside the machine that carries the asynchronous
1126             gene. Child processes are equipped with C<threads>-like capability for running
1127             code asynchronously. Unlike threads, each child is a unique process to the
1128             underlying OS. The IPC is handled via C<MCE::Channel>, which runs on all the
1129             major platforms including Cygwin and Strawberry Perl.
1130              
1131             C<MCE::Child> may be used as a standalone or together with C<MCE> including
1132             running alongside C<threads>.
1133              
1134             use MCE::Child;
1135             use MCE::Shared;
1136              
1137             # synopsis: head -20 file.txt | perl script.pl
1138              
1139             my $ifh = MCE::Shared->handle( "<", \*STDIN ); # shared
1140             my $ofh = MCE::Shared->handle( ">", \*STDOUT );
1141             my $ary = MCE::Shared->array();
1142              
1143             sub parallel_task {
1144             my ( $id ) = @_;
1145             while ( <$ifh> ) {
1146             printf {$ofh} "[ %4d ] %s", $., $_;
1147             # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n" ); # dereferencing
1148             $ary->set( $. - 1, "[ ID $id ] read line $.\n" ); # faster via OO
1149             }
1150             }
1151              
1152             my $child1 = MCE::Child->new( "parallel_task", 1 );
1153             my $child2 = MCE::Child->new( \&parallel_task, 2 );
1154             my $child3 = MCE::Child->new( sub { parallel_task(3) } );
1155              
1156             $_->join for MCE::Child->list(); # ditto: MCE::Child->wait_all();
1157              
1158             # search array (total one round-trip via IPC)
1159             my @vals = $ary->vals( "val =~ / ID 2 /" );
1160              
1161             print {*STDERR} join("", @vals);
1162              
1163             =head1 API DOCUMENTATION
1164              
1165             =over 3
1166              
1167             =item $child = MCE::Child->create( FUNCTION, ARGS )
1168              
1169             =item $child = MCE::Child->new( FUNCTION, ARGS )
1170              
1171             This will create a new child process that will begin execution with function
1172             as the entry point, and optionally ARGS for list of parameters. It will return
1173             the corresponding MCE::Child object, or undef if child creation failed.
1174              
1175             I<FUNCTION> may either be the name of a function, an anonymous subroutine, or
1176             a code ref.
1177              
1178             my $child = MCE::Child->create( "func_name", ... );
1179             # or
1180             my $child = MCE::Child->create( sub { ... }, ... );
1181             # or
1182             my $child = MCE::Child->create( \&func, ... );
1183              
1184             =item $child = MCE::Child->create( { options }, FUNCTION, ARGS )
1185              
1186             =item $child = MCE::Child->create( IDENT, FUNCTION, ARGS )
1187              
1188             Options, excluding C<ident>, may be specified globally via the C<init> function.
1189             Otherwise, C<ident>, C<child_timeout>, C<posix_exit>, and C<void_context> may
1190             be set uniquely.
1191              
1192             The C<ident> option is used by callback functions C<on_start> and C<on_finish>
1193             for identifying the started and finished child process respectively.
1194              
1195             my $child1 = MCE::Child->create( { posix_exit => 1 }, sub {
1196             ...
1197             } );
1198              
1199             $child1->join;
1200              
1201             my $child2 = MCE::Child->create( { child_timeout => 3 }, sub {
1202             sleep 1 for ( 1 .. 9 );
1203             } );
1204              
1205             $child2->join;
1206              
1207             if ( $child2->error() eq "Child timed out\n" ) {
1208             ...
1209             }
1210              
1211             The C<new()> method is an alias for C<create()>.
1212              
1213             =item mce_child { BLOCK } ARGS;
1214              
1215             =item mce_child { BLOCK };
1216              
1217             C<mce_child> runs the block asynchronously similarly to C<< MCE::Child->create() >>.
1218             It returns the child object, or undef if child creation failed.
1219              
1220             my $child = mce_child { foreach (@files) { ... } };
1221              
1222             $child->join();
1223              
1224             if ( my $err = $child->error() ) {
1225             warn("Child error: $err\n");
1226             }
1227              
1228             =item $child->join()
1229              
1230             This will wait for the corresponding child process to complete its execution.
1231             In non-voided context, C<join()> will return the value(s) of the entry point
1232             function.
1233              
1234             The context (void, scalar or list) for the return value(s) for C<join> is
1235             determined at the time of joining and mostly C<wantarray> aware.
1236              
1237             my $child1 = MCE::Child->create( sub {
1238             my @res = qw(foo bar baz);
1239             return (@res);
1240             });
1241              
1242             my @res1 = $child1->join(); # ( foo, bar, baz )
1243             my $res1 = $child1->join(); # baz
1244              
1245             my $child2 = MCE::Child->create( sub {
1246             return 'foo';
1247             });
1248              
1249             my @res2 = $child2->join(); # ( foo )
1250             my $res2 = $child2->join(); # foo
1251              
1252             =item $child1->equal( $child2 )
1253              
1254             Tests if two child objects are the same child or not. Child comparison is based
1255             on process IDs. This is overloaded to the more natural forms.
1256              
1257             if ( $child1 == $child2 ) {
1258             print("Child objects are the same\n");
1259             }
1260             # or
1261             if ( $child1 != $child2 ) {
1262             print("Child objects differ\n");
1263             }
1264              
1265             =item $child->error()
1266              
1267             Child processes are executed in an C<eval> context. This method will return
1268             C<undef> if the child terminates I<normally>. Otherwise, it returns the value
1269             of C<$@> associated with the child's execution status in its C<eval> context.
1270              
1271             =item $child->exit()
1272              
1273             This sends C<'SIGQUIT'> to the child process, notifying the child to exit.
1274             It returns the child object to allow for method chaining. It is important to
1275             join later if not immediately to not leave a zombie or defunct process.
1276              
1277             $child->exit()->join();
1278             ...
1279              
1280             $child->join(); # later
1281              
1282             =item MCE::Child->exit( 0 )
1283              
1284             =item MCE::Child->exit( 0, @ret )
1285              
1286             A child can exit at any time by calling C<< MCE::Child->exit() >>.
1287             Otherwise, the behavior is the same as C<exit(status)> when called from
1288             the main process. The child process may optionally return data, to be
1289             sent via IPC.
1290              
1291             =item MCE::Child->finish()
1292              
1293             This class method is called automatically by C<END>, but may be called
1294             explicitly. An error is emitted via croak if there are active child
1295             processes not yet joined.
1296              
1297             MCE::Child->create( 'task1', $_ ) for 1 .. 4;
1298             $_->join for MCE::Child->list();
1299              
1300             MCE::Child->create( 'task2', $_ ) for 1 .. 4;
1301             $_->join for MCE::Child->list();
1302              
1303             MCE::Child->create( 'task3', $_ ) for 1 .. 4;
1304             $_->join for MCE::Child->list();
1305              
1306             MCE::Child->finish();
1307              
1308             =item MCE::Child->init( options )
1309              
1310             The init function accepts a list of MCE::Child options.
1311              
1312             In scalar context (API available since 1.897), call C<MCE::Child->finish>
1313             automatically upon leaving the scope or program.
1314              
1315             my $guard = MCE::Child->init(
1316             max_workers => 'auto', # default undef, unlimited
1317              
1318             # Specify a percentage. MCE::Child 1.876+.
1319             max_workers => '25%', # 4 on HW with 16 lcores
1320             max_workers => '50%', # 8 on HW with 16 lcores
1321              
1322             child_timeout => 20, # default undef, no timeout
1323             posix_exit => 1, # default undef, CORE::exit
1324             void_context => 1, # default undef
1325              
1326             on_start => sub {
1327             my ( $pid, $ident ) = @_;
1328             ...
1329             },
1330             on_finish => sub {
1331             my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
1332             ...
1333             }
1334             );
1335              
1336             # Identification given as an option or the 1st argument.
1337              
1338             for my $key ( 'aa' .. 'zz' ) {
1339             MCE::Child->create( { ident => $key }, sub { ... } );
1340             MCE::Child->create( $key, sub { ... } );
1341             }
1342              
1343             MCE::Child->wait_all;
1344              
1345             Set C<max_workers> if you want to limit the number of workers by waiting
1346             automatically for an available slot. Specify a percentage or C<auto> to
1347             obtain the number of logical cores via C<MCE::Util::get_ncpu()>.
1348              
1349             Set C<child_timeout>, in number of seconds, if you want the child process
1350             to terminate after some time. The default is C<0> for no timeout.
1351              
1352             Set C<posix_exit> to avoid all END and destructor processing. Constructing
1353             MCE::Child inside a thread implies 1 or if present CGI, FCGI, Coro, Curses,
1354             Gearman::Util, Gearman::XS, LWP::UserAgent, Mojo::IOLoop, STFL, Tk, Wx,
1355             or Win32::GUI.
1356              
1357             Set C<void_context> to create the child process in void context for the
1358             return value. Otherwise, the return context is wantarray-aware for
1359             C<join()> and C<result()> and determined when retrieving the data.
1360              
1361             The callback options C<on_start> and C<on_finish> are called in the parent
1362             process after starting the worker and later when terminated. The arguments
1363             for the subroutines were inspired by L<Parallel::ForkManager>.
1364              
1365             The parameters for C<on_start> are the following:
1366              
1367             - pid of the child process
1368             - identification (ident option or 1st arg to create)
1369              
1370             The parameters for C<on_finish> are the following:
1371              
1372             - pid of the child process
1373             - program exit code
1374             - identification (ident option or 1st arg to create)
1375             - exit signal id
1376             - error message from eval inside MCE::Child
1377             - returned data
1378              
1379             =item $child->is_running()
1380              
1381             Returns true if a child is still running.
1382              
1383             =item $child->is_joinable()
1384              
1385             Returns true if the child has finished running and not yet joined.
1386              
1387             =item $child->kill( 'SIG...' )
1388              
1389             Sends the specified signal to the child. Returns the child object to allow for
1390             method chaining. As with C<exit>, it is important to join eventually if not
1391             immediately to not leave a zombie or defunct process.
1392              
1393             $child->kill('SIG...')->join();
1394              
1395             The following is a parallel demonstration comparing C<MCE::Shared> against
1396             C<Redis> and C<Redis::Fast> on a Fedora 23 VM. Joining begins after all
1397             workers have been notified to quit.
1398              
1399             use Time::HiRes qw(time);
1400              
1401             use Redis;
1402             use Redis::Fast;
1403              
1404             use MCE::Child;
1405             use MCE::Shared;
1406              
1407             my $redis = Redis->new();
1408             my $rfast = Redis::Fast->new();
1409             my $array = MCE::Shared->array();
1410              
1411             sub parallel_redis {
1412             my ($_redis) = @_;
1413             my ($count, $quit, $len) = (0, 0);
1414              
1415             # instead, use a flag to exit loop
1416             $SIG{'QUIT'} = sub { $quit = 1 };
1417              
1418             while () {
1419             $len = $_redis->rpush('list', $count++);
1420             last if $quit;
1421             }
1422              
1423             $count;
1424             }
1425              
1426             sub parallel_array {
1427             my ($count, $quit, $len) = (0, 0);
1428              
1429             # do not exit from inside handler
1430             $SIG{'QUIT'} = sub { $quit = 1 };
1431              
1432             while () {
1433             $len = $array->push($count++);
1434             last if $quit;
1435             }
1436              
1437             $count;
1438             }
1439              
1440             sub benchmark_this {
1441             my ($desc, $num_procs, $timeout, $code, @args) = @_;
1442             my ($start, $total) = (time(), 0);
1443              
1444             MCE::Child->new($code, @args) for 1..$num_procs;
1445             sleep $timeout;
1446              
1447             # joining is not immediate; ok
1448             $_->kill('QUIT') for MCE::Child->list();
1449              
1450             # joining later; ok
1451             $total += $_->join() for MCE::Child->list();
1452              
1453             printf "$desc <> duration: %0.03f secs, count: $total\n",
1454             time() - $start;
1455              
1456             sleep 0.2;
1457             }
1458              
1459             benchmark_this('Redis ', 8, 5.0, \&parallel_redis, $redis);
1460             benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
1461             benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
1462              
1463             =item MCE::Child->list()
1464              
1465             Returns a list of all child objects not yet joined.
1466              
1467             @procs = MCE::Child->list();
1468              
1469             =item MCE::Child->list_pids()
1470              
1471             Returns a list of all child pids not yet joined (available since 1.849).
1472              
1473             @pids = MCE::Child->list_pids();
1474              
1475             $SIG{INT} = $SIG{HUP} = $SIG{TERM} = sub {
1476             # Signal workers all at once
1477             CORE::kill('KILL', MCE::Child->list_pids());
1478             exec('reset');
1479             };
1480              
1481             =item MCE::Child->list_running()
1482              
1483             Returns a list of all child objects that are still running.
1484              
1485             @procs = MCE::Child->list_running();
1486              
1487             =item MCE::Child->list_joinable()
1488              
1489             Returns a list of all child objects that have completed running.
1490             Thus, ready to be joined without blocking.
1491              
1492             @procs = MCE::Child->list_joinable();
1493              
1494             =item MCE::Child->max_workers([ N ])
1495              
1496             Getter and setter for max_workers. Specify a number or 'auto' to acquire the
1497             total number of cores via MCE::Util::get_ncpu. Specify a false value to set
1498             back to no limit.
1499              
1500             =item MCE::Child->pending()
1501              
1502             Returns a count of all child objects not yet joined.
1503              
1504             $count = MCE::Child->pending();
1505              
1506             =item $child->result()
1507              
1508             Returns the result obtained by C<join>, C<wait_one>, or C<wait_all>. If the
1509             process has not yet exited, waits for the corresponding child to complete its
1510             execution.
1511              
1512             use MCE::Child;
1513             use Time::HiRes qw(sleep);
1514              
1515             sub task {
1516             my ($id) = @_;
1517             sleep $id * 0.333;
1518             return $id;
1519             }
1520              
1521             MCE::Child->create('task', $_) for ( reverse 1 .. 3 );
1522              
1523             # 1 while MCE::Child->wait_one();
1524              
1525             while ( my $child = MCE::Child->wait_one() ) {
1526             my $err = $child->error() || 'no error';
1527             my $res = $child->result();
1528             my $pid = $child->pid();
1529              
1530             print "[$pid] $err : $res\n";
1531             }
1532              
1533             Like C<join> described above, the context (void, scalar or list) for the
1534             return value(s) is determined at the time C<result> is called and mostly
1535             C<wantarray> aware.
1536              
1537             my $child1 = MCE::Child->create( sub {
1538             my @res = qw(foo bar baz);
1539             return (@res);
1540             });
1541              
1542             my @res1 = $child1->result(); # ( foo, bar, baz )
1543             my $res1 = $child1->result(); # baz
1544              
1545             my $child2 = MCE::Child->create( sub {
1546             return 'foo';
1547             });
1548              
1549             my @res2 = $child2->result(); # ( foo )
1550             my $res2 = $child2->result(); # foo
1551              
1552             =item MCE::Child->seed()
1553              
1554             Class method that returns the internal random generated seed or undefined.
1555             The seed is generated once during init or initial create.
1556              
1557             Current API available since 1.895.
1558              
1559             =item MCE::Child->self()
1560              
1561             Class method that allows a child to obtain it's own I<MCE::Child> object.
1562              
1563             =item $child->pid()
1564              
1565             =item $child->tid()
1566              
1567             Returns the ID of the child.
1568              
1569             pid: $$ process id
1570             tid: $$ alias for pid
1571              
1572             =item MCE::Child->pid()
1573              
1574             =item MCE::Child->tid()
1575              
1576             Class methods that allows a child to obtain its own ID.
1577              
1578             pid: $$ process id
1579             tid: $$ alias for pid
1580              
1581             =item MCE::Child->wait_one()
1582              
1583             =item MCE::Child->waitone()
1584              
1585             =item MCE::Child->wait_all()
1586              
1587             =item MCE::Child->waitall()
1588              
1589             Meaningful for the manager process only, waits for one or all child processes
1590             to complete execution. Afterwards, returns the corresponding child objects.
1591             If a child doesn't exist, returns the C<undef> value or an empty list for
1592             C<wait_one> and C<wait_all> respectively.
1593              
1594             The C<waitone> and C<waitall> methods are aliases for compatibility with
1595             C<MCE::Hobo>.
1596              
1597             use MCE::Child;
1598             use Time::HiRes qw(sleep);
1599              
1600             sub task {
1601             my $id = shift;
1602             sleep $id * 0.333;
1603             return $id;
1604             }
1605              
1606             MCE::Child->create('task', $_) for ( reverse 1 .. 3 );
1607              
1608             # join, traditional use case
1609             $_->join() for MCE::Child->list();
1610              
1611             # wait_one, simplistic use case
1612             1 while MCE::Child->wait_one();
1613              
1614             # wait_one
1615             while ( my $child = MCE::Child->wait_one() ) {
1616             my $err = $child->error() || 'no error';
1617             my $res = $child->result();
1618             my $pid = $child->pid();
1619              
1620             print "[$pid] $err : $res\n";
1621             }
1622              
1623             # wait_all
1624             my @procs = MCE::Child->wait_all();
1625              
1626             for ( @procs ) {
1627             my $err = $_->error() || 'no error';
1628             my $res = $_->result();
1629             my $pid = $_->pid();
1630              
1631             print "[$pid] $err : $res\n";
1632             }
1633              
1634             =item MCE::Child->yield( [ floating_seconds ] )
1635              
1636             Give other workers a chance to run, optionally for given time. Yield behaves
1637             similarly to MCE's interval option. It throttles workers from running too fast.
1638             A demonstration is provided in the next section for fetching URLs in parallel.
1639              
1640             The default C<floating_seconds> is 0.008 and 0.015 on UNIX and Windows,
1641             respectively. Pass 0 if simply wanting to give other workers a chance to run.
1642              
1643             # total run time: 1.00 second
1644              
1645             MCE::Child->create( sub { MCE::Child->yield(0.25) } ) for 1 .. 4;
1646             MCE::Child->wait_all();
1647              
1648             =back
1649              
1650             =head1 THREADS-like DETACH CAPABILITY
1651              
1652             Threads-like detach capability was added starting with the 1.867 release.
1653              
1654             A threads example is shown first followed by the MCE::Child example. All one
1655             needs to do is set the CHLD signal handler to IGNORE. Unfortunately, this works
1656             on UNIX platforms only. The child process restores the CHLD handler to default,
1657             so is able to deeply spin workers and reap if desired.
1658              
1659             use threads;
1660              
1661             for ( 1 .. 8 ) {
1662             async {
1663             # do something
1664             }->detach;
1665             }
1666              
1667             use MCE::Child;
1668              
1669             # Have the OS reap workers automatically when exiting.
1670             # The on_finish option is ignored if specified (no-op).
1671             # Ensure not inside a thread on UNIX platforms.
1672              
1673             $SIG{CHLD} = 'IGNORE';
1674              
1675             for ( 1 .. 8 ) {
1676             mce_child {
1677             # do something
1678             };
1679             }
1680              
1681             # Optionally, wait for any remaining workers before leaving.
1682             # This is necessary if workers are consuming shared objects,
1683             # constructed via MCE::Shared.
1684              
1685             MCE::Child->wait_all;
1686              
1687             The following is another way and works on Windows.
1688             Here, the on_finish handler works as usual.
1689              
1690             use MCE::Child;
1691              
1692             MCE::Child->init(
1693             on_finish = sub {
1694             ...
1695             },
1696             );
1697              
1698             for ( 1 .. 8 ) {
1699             $_->join for MCE::Child->list_joinable;
1700             mce_child {
1701             # do something
1702             };
1703             }
1704              
1705             MCE::Child->wait_all;
1706              
1707             =head1 PARALLEL::FORKMANAGER-like DEMONSTRATION
1708              
1709             MCE::Child behaves similarly to threads for the most part. It also provides
1710             L<Parallel::ForkManager>-like capabilities. The C<Parallel::ForkManager>
1711             example is shown first followed by a version using C<MCE::Child>.
1712              
1713             =over 3
1714              
1715             =item Parallel::ForkManager
1716              
1717             use strict;
1718             use warnings;
1719              
1720             use Parallel::ForkManager;
1721             use Time::HiRes 'time';
1722              
1723             my $start = time;
1724              
1725             my $pm = Parallel::ForkManager->new(10);
1726             $pm->set_waitpid_blocking_sleep(0);
1727              
1728             $pm->run_on_finish( sub {
1729             my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp) = @_;
1730             print "child $pid completed: $ident => ", $resp->[0], "\n";
1731             });
1732              
1733             DATA_LOOP:
1734             foreach my $data ( 1..2000 ) {
1735             # forks and returns the pid for the child
1736             my $pid = $pm->start($data) and next DATA_LOOP;
1737             my $ret = [ $data * 2 ];
1738              
1739             $pm->finish(0, $ret);
1740             }
1741              
1742             $pm->wait_all_children;
1743              
1744             printf STDERR "duration: %0.03f seconds\n", time - $start;
1745              
1746             =item MCE::Child
1747              
1748             use strict;
1749             use warnings;
1750              
1751             use MCE::Child 1.843;
1752             use Time::HiRes 'time';
1753              
1754             my $start = time;
1755              
1756             MCE::Child->init(
1757             max_workers => 10,
1758             on_finish => sub {
1759             my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) = @_;
1760             print "child $pid completed: $ident => ", $resp->[0], "\n";
1761             }
1762             );
1763              
1764             foreach my $data ( 1..2000 ) {
1765             MCE::Child->create( $data, sub {
1766             [ $data * 2 ];
1767             });
1768             }
1769              
1770             MCE::Child->wait_all;
1771              
1772             printf STDERR "duration: %0.03f seconds\n", time - $start;
1773              
1774             =item Time to spin 2,000 workers and obtain results (in seconds).
1775              
1776             Results were obtained on a Macbook Pro (2.6 GHz ~ 3.6 GHz with Turbo Boost).
1777             Parallel::ForkManager 2.02 uses Moo. Therefore, I ran again with Moo loaded
1778             at the top of the script.
1779              
1780             MCE::Hobo uses MCE::Shared to retrieve data during reaping.
1781             MCE::Child uses MCE::Channel, no shared-manager.
1782              
1783             Version Cygwin Windows Linux macOS FreeBSD
1784              
1785             MCE::Child 1.843 19.099s 17.091s 0.965s 1.534s 1.229s
1786             MCE::Hobo 1.843 20.514s 19.594s 1.246s 1.629s 1.613s
1787             P::FM 1.20 19.703s 19.235s 0.875s 1.445s 1.346s
1788              
1789             MCE::Child 1.843 20.426s 18.417s 1.116s 1.632s 1.338s Moo loaded
1790             MCE::Hobo 1.843 21.809s 20.810s 1.407s 1.759s 1.722s Moo loaded
1791             P::FM 2.02 21.668s 25.927s 1.882s 2.612s 2.483s Moo used
1792              
1793             =item Set posix_exit to avoid all END and destructor processing.
1794              
1795             This is helpful for reducing overhead when workers exit. Ditto if using a Perl
1796             module not parallel safe. The option is ignored on Windows C<$^O eq 'MSWin32'>.
1797              
1798             MCE::Child->init( posix_exit => 1, ... );
1799             MCE::Hobo->init( posix_exit => 1, ... );
1800              
1801             Version Cygwin Windows Linux macOS FreeBSD
1802              
1803             MCE::Child 1.843 19.815s ignored 0.824s 1.284s 1.245s Moo loaded
1804             MCE::Hobo 1.843 21.029s ignored 0.953s 1.335s 1.439s Moo loaded
1805              
1806             =back
1807              
1808             =head1 PARALLEL HTTP GET DEMONSTRATION USING ANYEVENT
1809              
1810             This demonstration constructs two queues, two handles, starts the
1811             shared-manager process if needed, and spawns four workers.
1812             For this demonstration, am chunking 64 URLs per job. In reality,
1813             one may run with 200 workers and chunk 300 URLs on a 24-way box.
1814              
1815             # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1816             # perl demo.pl -- all output
1817             # perl demo.pl >/dev/null -- mngr/child output
1818             # perl demo.pl 2>/dev/null -- show results only
1819             #
1820             # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1821              
1822             use strict;
1823             use warnings;
1824              
1825             use AnyEvent;
1826             use AnyEvent::HTTP;
1827             use Time::HiRes qw( time );
1828              
1829             use MCE::Child;
1830             use MCE::Shared;
1831              
1832             # Construct two queues, input and return.
1833              
1834             my $que = MCE::Shared->queue();
1835             my $ret = MCE::Shared->queue();
1836              
1837             # Construct shared handles for serializing output from many workers
1838             # writing simultaneously. This prevents garbled output.
1839              
1840             mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
1841             mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
1842              
1843             # Spawn workers early for minimum memory consumption.
1844              
1845             MCE::Child->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
1846              
1847             # Obtain or generate input data for workers to process.
1848              
1849             my ( $count, @urls ) = ( 0 );
1850              
1851             push @urls, map { "http://127.0.0.$_/" } 1..254;
1852             push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
1853              
1854             while ( @urls ) {
1855             my @chunk = splice(@urls, 0, 64);
1856             $que->enqueue( { ID => ++$count, INPUT => \@chunk } );
1857             }
1858              
1859             # So that workers leave the loop after consuming the queue.
1860              
1861             $que->end();
1862              
1863             # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1864             # Loop for the manager process. The manager may do other work if
1865             # need be and periodically check $ret->pending() not shown here.
1866             #
1867             # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1868              
1869             my $start = time;
1870              
1871             printf {$ERR} "Mngr - entering loop\n";
1872              
1873             while ( $count ) {
1874             my ( $result, $failed ) = $ret->dequeue( 2 );
1875              
1876             # Remove ID from result, so not treated as a URL item.
1877              
1878             printf {$ERR} "Mngr - received job %s\n", delete $result->{ID};
1879              
1880             # Display the URL and the size captured.
1881              
1882             foreach my $url ( keys %{ $result } ) {
1883             printf {$OUT} "%s: %d\n", $url, length($result->{$url})
1884             if $result->{$url}; # url has content
1885             }
1886              
1887             # Display URLs could not reach.
1888              
1889             if ( @{ $failed } ) {
1890             foreach my $url ( @{ $failed } ) {
1891             print {$OUT} "Failed: $url\n";
1892             }
1893             }
1894              
1895             # Decrement the count.
1896              
1897             $count--;
1898             }
1899              
1900             MCE::Child->wait_all();
1901              
1902             printf {$ERR} "Mngr - exiting loop\n\n";
1903             printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
1904              
1905             exit;
1906              
1907             # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1908             # Child processes enqueue two items ( $result and $failed ) per each
1909             # job for the manager process. Likewise, the manager process dequeues
1910             # two items above. Optionally, child processes may include the ID in
1911             # the result.
1912             #
1913             # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1914              
1915             sub task {
1916             my ( $id ) = @_;
1917             printf {$ERR} "Child $id entering loop\n";
1918              
1919             while ( my $job = $que->dequeue() ) {
1920             my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
1921              
1922             # Walk URLs, provide a hash and array refs for data.
1923              
1924             printf {$ERR} "Child $id running job $job->{ID}\n";
1925             walk( $job, $result, $failed );
1926              
1927             # Send results to the manager process.
1928              
1929             $ret->enqueue( $result, $failed );
1930             }
1931              
1932             printf {$ERR} "Child $id exiting loop\n";
1933             }
1934              
1935             sub walk {
1936             my ( $job, $result, $failed ) = @_;
1937              
1938             # Yielding is critical when running an event loop in parallel.
1939             # Not doing so means that the app may reach contention points
1940             # with the firewall and likely impose unnecessary hardship at
1941             # the OS level. The idea here is not to have multiple workers
1942             # initiate HTTP requests to a batch of URLs at the same time.
1943             # Yielding behaves similarly like scatter to have the child
1944             # process run solo for a fraction of time.
1945              
1946             MCE::Child->yield( 0.03 );
1947              
1948             my $cv = AnyEvent->condvar();
1949              
1950             # Populate the hash ref for the URLs it could reach.
1951             # Do not mix AnyEvent timeout with child timeout.
1952             # Therefore, choose event timeout when available.
1953              
1954             foreach my $url ( @{ $job->{INPUT} } ) {
1955             $cv->begin();
1956             http_get $url, timeout => 2, sub {
1957             my ( $data, $headers ) = @_;
1958             $result->{$url} = $data;
1959             $cv->end();
1960             };
1961             }
1962              
1963             $cv->recv();
1964              
1965             # Populate the array ref for URLs it could not reach.
1966              
1967             foreach my $url ( @{ $job->{INPUT} } ) {
1968             push @{ $failed }, $url unless (exists $result->{ $url });
1969             }
1970              
1971             return;
1972             }
1973              
1974             __END__
1975              
1976             $ perl demo.pl
1977              
1978             Child 1 entering loop
1979             Child 2 entering loop
1980             Child 3 entering loop
1981             Mngr - entering loop
1982             Child 2 running job 2
1983             Child 3 running job 3
1984             Child 1 running job 1
1985             Child 4 entering loop
1986             Child 4 running job 4
1987             Child 2 running job 5
1988             Mngr - received job 2
1989             Child 3 running job 6
1990             Mngr - received job 3
1991             Child 1 running job 7
1992             Mngr - received job 1
1993             Child 4 running job 8
1994             Mngr - received job 4
1995             http://192.168.0.1/: 3729
1996             Child 2 exiting loop
1997             Mngr - received job 5
1998             Child 3 exiting loop
1999             Mngr - received job 6
2000             Child 1 exiting loop
2001             Mngr - received job 7
2002             Child 4 exiting loop
2003             Mngr - received job 8
2004             Mngr - exiting loop
2005              
2006             Duration: 4.131 seconds
2007              
2008             =head1 CROSS-PLATFORM TEMPLATE FOR BINARY EXECUTABLE
2009              
2010             Making an executable is possible with the L<PAR::Packer> module.
2011             On the Windows platform, threads, threads::shared, and exiting via
2012             threads are necessary for the binary to exit successfully.
2013              
2014             # https://metacpan.org/pod/PAR::Packer
2015             # https://metacpan.org/pod/pp
2016             #
2017             # pp -o demo.exe demo.pl
2018             # ./demo.exe
2019              
2020             use strict;
2021             use warnings;
2022              
2023             use if $^O eq "MSWin32", "threads";
2024             use if $^O eq "MSWin32", "threads::shared";
2025              
2026             # Include minimum dependencies for MCE::Child.
2027             # Add other modules required by your application here.
2028              
2029             use Storable ();
2030             use Time::HiRes ();
2031              
2032             # use IO::FDPass (); # optional: for condvar, handle, queue
2033             # use Sereal (); # optional: for faster serialization
2034              
2035             use MCE::Child;
2036             use MCE::Shared;
2037              
2038             # For PAR to work on the Windows platform, one must include manually
2039             # any shared modules used by the application.
2040              
2041             # use MCE::Shared::Array; # if using MCE::Shared->array
2042             # use MCE::Shared::Cache; # if using MCE::Shared->cache
2043             # use MCE::Shared::Condvar; # if using MCE::Shared->condvar
2044             # use MCE::Shared::Handle; # if using MCE::Shared->handle, mce_open
2045             # use MCE::Shared::Hash; # if using MCE::Shared->hash
2046             # use MCE::Shared::Minidb; # if using MCE::Shared->minidb
2047             # use MCE::Shared::Ordhash; # if using MCE::Shared->ordhash
2048             # use MCE::Shared::Queue; # if using MCE::Shared->queue
2049             # use MCE::Shared::Scalar; # if using MCE::Shared->scalar
2050              
2051             # Et cetera. Only load modules needed for your application.
2052              
2053             use MCE::Shared::Sequence; # if using MCE::Shared->sequence
2054              
2055             my $seq = MCE::Shared->sequence( 1, 9 );
2056              
2057             sub task {
2058             my ( $id ) = @_;
2059             while ( defined ( my $num = $seq->next() ) ) {
2060             print "$id: $num\n";
2061             sleep 1;
2062             }
2063             }
2064              
2065             sub main {
2066             MCE::Child->new( \&task, $_ ) for 1 .. 3;
2067             MCE::Child->wait_all();
2068             }
2069              
2070             # Main must run inside a thread on the Windows platform or workers
2071             # will fail duing exiting, causing the exe to crash. The reason is
2072             # that PAR or a dependency isn't multi-process safe.
2073              
2074             ( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
2075              
2076             threads->exit(0) if $INC{"threads.pm"};
2077              
2078             =head1 LIMITATION
2079              
2080             MCE::Child emits an error when C<is_joinable>, C<is_running>, and C<join> isn't
2081             called by the managed process, where the child was spawned. This is a limitation
2082             in MCE::Child only due to not involving a shared-manager process for IPC.
2083              
2084             This use-case is not typical.
2085              
2086             =head1 CREDITS
2087              
2088             The inspiration for C<MCE::Child> comes from wanting C<threads>-like behavior
2089             for processes compatible with Perl 5.8. Both can run side-by-side including
2090             safe-use by MCE workers. Likewise, the documentation resembles C<threads>.
2091              
2092             The inspiration for C<wait_all> and C<wait_one> comes from the
2093             C<Parallel::WorkUnit> module.
2094              
2095             =head1 SEE ALSO
2096              
2097             =over 3
2098              
2099             =item * L<forks>
2100              
2101             =item * L<forks::BerkeleyDB>
2102              
2103             =item * L<MCE::Hobo>
2104              
2105             =item * L<Parallel::ForkManager>
2106              
2107             =item * L<Parallel::Loops>
2108              
2109             =item * L<Parallel::Prefork>
2110              
2111             =item * L<Parallel::WorkUnit>
2112              
2113             =item * L<Proc::Fork>
2114              
2115             =item * L<Thread::Tie>
2116              
2117             =item * L<threads>
2118              
2119             =back
2120              
2121             =head1 INDEX
2122              
2123             L<MCE|MCE>, L<MCE::Channel>, L<MCE::Shared>
2124              
2125             =head1 AUTHOR
2126              
2127             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
2128              
2129             =cut
2130