File Coverage

blib/lib/MCE/Hobo.pm
Criterion Covered Total %
statement 344 533 64.5
branch 164 430 38.1
condition 58 179 32.4
subroutine 56 78 71.7
pod 24 24 100.0
total 646 1244 51.9


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## A threads-like parallelization module.
4             ##
5             ###############################################################################
6              
7 12     12   1335449 use strict;
  12         32  
  12         626  
8 12     12   80 use warnings;
  12         19  
  12         795  
9              
10 12     12   261 use 5.010001;
  12         37  
11              
12 12     12   86 no warnings qw( threads recursion uninitialized once redefine );
  12         34  
  12         1562  
13              
14             package MCE::Hobo;
15              
16             our $VERSION = '1.893';
17              
18             ## no critic (BuiltinFunctions::ProhibitStringyEval)
19             ## no critic (Subroutines::ProhibitExplicitReturnUndef)
20             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
21             ## no critic (TestingAndDebugging::ProhibitNoStrict)
22              
23 12     12   9830 use MCE::Signal ();
  12         63103  
  12         676  
24 12     12   7487 use MCE::Mutex ();
  12         8797  
  12         345  
25 12     12   8533 use MCE::Channel ();
  12         353577  
  12         8909  
26 12     12   160 use Time::HiRes 'sleep';
  12         37  
  12         88  
27              
28             use overload (
29             q(==) => \&equal,
30 0     0   0 q(!=) => sub { !equal(@_) },
31 12         163 fallback => 1
32 12     12   10828 );
  12         28604  
33              
34             sub import {
35 12 50   12   165 if (caller !~ /^MCE::/) {
36 12     12   2034 no strict 'refs'; no warnings 'redefine';
  12     12   24  
  12         540  
  12         81  
  12         15  
  12         1950  
37 12         24 *{ caller().'::mce_async' } = \&mce_async;
  12         91  
38             }
39 12         159 return;
40             }
41              
42             ## The POSIX module has many symbols. Try not loading it simply
43             ## to have WNOHANG. The following covers most platforms.
44              
45             use constant {
46 12 50       98146 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
47             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
48 12     12   87 };
  12         24  
49              
50             my ( $_MNGD, $_DATA, $_DELY, $_LIST ) = ( {}, {}, {}, {} );
51              
52             my $_freeze = MCE::Channel::_get_freeze();
53             my $_thaw = MCE::Channel::_get_thaw();
54              
55             my $_is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
56             my $_tid = ( $INC{'threads.pm'} ) ? threads->tid() : 0;
57             my $_yield_secs = ( $^O =~ /mswin|mingw|msys|cygwin/i ) ? 0.015 : 0.008;
58              
59             sub CLONE {
60 0 0   0   0 $_tid = threads->tid(), &_clear() if $INC{'threads.pm'};
61             }
62              
63             sub _clear {
64 9     9   24 %{ $_LIST } = ();
  9         33  
65             }
66              
67             sub _max_workers {
68 10     10   25 my ( $cpus ) = @_;
69 10 100       72 if ( $cpus eq 'auto' ) {
    100          
70 1         34 $cpus = MCE::Util::get_ncpu();
71             }
72             elsif ( $cpus =~ /^([0-9.]+)%$/ ) {
73 6         41 my ( $percent, $ncpu ) = ( $1 / 100, MCE::Util::get_ncpu() );
74 6         36 $cpus = $ncpu * $percent + 0.5;
75             }
76 10 100 66     139 $cpus = 1 if $cpus !~ /^[\d\.]+$/ || $cpus < 1;
77 10         33 return int($cpus);
78             }
79              
80             ###############################################################################
81             ## ----------------------------------------------------------------------------
82             ## Init routine.
83             ##
84             ###############################################################################
85              
86             bless my $_SELF = { MGR_ID => "$$.$_tid", WRK_ID => $$ }, __PACKAGE__;
87              
88             sub MCE::Hobo::_guard::DESTROY {
89 0     0   0 my ($pkg, $id) = @{ $_[0] };
  0         0  
90              
91 0 0 0     0 if (defined $pkg && $id eq "$$.$_tid") {
92 0         0 @{ $_[0] } = ();
  0         0  
93 0         0 MCE::Hobo->finish($pkg);
94             }
95              
96 0         0 return;
97             }
98              
99             sub init {
100 19 100 66 19 1 274951 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
101              
102             # -- options ----------------------------------------------------------
103             # max_workers hobo_timeout posix_exit on_start on_finish void_context
104             # ---------------------------------------------------------------------
105              
106 19 50       265 my $opt = ( ref $_[0] eq 'HASH' ) ? shift : { @_ };
107 19   66     508 my $pkg = "$$.$_tid.".( delete $opt->{caller} || caller() );
108 19         175 my $mngd = $_MNGD->{$pkg} = $opt;
109              
110 19         61 @_ = ();
111              
112             $mngd->{MGR_ID} = "$$.$_tid", $mngd->{PKG} = $pkg,
113 19         269 $mngd->{WRK_ID} = $$;
114              
115 19 100       248 &_force_reap($pkg), $_DATA->{$pkg}->clear() if ( defined $_LIST->{$pkg} );
116              
117 19 100       76 if ( !defined $_LIST->{$pkg} ) {
118 12 0 33     48 $MCE::_GMUTEX->lock() if ( $_tid && $MCE::_GMUTEX );
119 12 50       40 sleep 0.015 if $_tid;
120              
121             # Start the shared-manager process if not running.
122 12 50       528 MCE::Shared->start() if $INC{'MCE/Shared.pm'};
123              
124 12         2848 my $chnl = MCE::Channel->new( impl => 'Mutex' );
125 12         125621 $_LIST->{ $pkg } = MCE::Hobo::_ordhash->new();
126 12         928 $_DELY->{ $pkg } = MCE::Hobo::_delay->new( $chnl );
127 12         438 $_DATA->{ $pkg } = MCE::Hobo::_hash->new();
128 12         108 $_DATA->{"$pkg:id"} = 0;
129              
130 12         1088 $_DATA->{"$pkg:seed"} = int(CORE::rand() * 1e9);
131              
132 12 0 33     124 $MCE::_GMUTEX->unlock() if ( $_tid && $MCE::_GMUTEX );
133             }
134              
135 19 50       127 if ( !exists $mngd->{posix_exit} ) {
136             $mngd->{posix_exit} = 1 if (
137             $^S || $_tid || $INC{'Mojo/IOLoop.pm'} ||
138             $INC{'Coro.pm'} || $INC{'LWP/UserAgent.pm'} || $INC{'stfl.pm'} ||
139             $INC{'Curses.pm'} || $INC{'CGI.pm'} || $INC{'FCGI.pm'} ||
140             $INC{'Tk.pm'} || $INC{'Wx.pm'} || $INC{'Win32/GUI.pm'} ||
141 19 50 33     1360 $INC{'Gearman/Util.pm'} || $INC{'Gearman/XS.pm'}
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
142             );
143             }
144              
145 19 100       86 if ( defined $mngd->{max_workers} ) {
146 3         41 $mngd->{max_workers} = _max_workers($mngd->{max_workers});
147             }
148              
149 19 50 33     130 if ( $INC{'LWP/UserAgent.pm'} && !$INC{'Net/HTTP.pm'} ) {
150 0         0 local $@; eval 'require Net::HTTP; require Net::HTTPS';
  0         0  
151             }
152              
153             require POSIX
154 19 50 66     6701 if ( $mngd->{on_finish} && !$INC{'POSIX.pm'} && !$_is_MSWin32 );
      66        
155              
156             defined wantarray
157 19 50       60634 ? bless([$pkg, "$$.$_tid"], MCE::Hobo::_guard::)
158             : ();
159             }
160              
161             ###############################################################################
162             ## ----------------------------------------------------------------------------
163             ## 'new', 'mce_async', and 'create' for threads-like similarity.
164             ##
165             ###############################################################################
166              
167             ## 'new' and 'tid' are aliases for 'create' and 'pid' respectively.
168              
169             *new = \&create, *tid = \&pid;
170              
171             ## Use "goto" trick to avoid pad problems from 5.8.1 (fixed in 5.8.2)
172             ## Tip found in threads::async.
173              
174             sub mce_async (&;@) {
175 0     0 1 0 goto &create;
176             }
177              
178             sub create {
179 55     55 1 2190072 my $caller = caller();
180              
181 55   66     2736 my $mngd = $_MNGD->{ "$$.$_tid.$caller" } || do {
182             # construct mngd internally on first use unless defined
183             init( caller => $caller ); $_MNGD->{ "$$.$_tid.$caller" };
184             };
185              
186 55 50       502 shift if ( $_[0] eq __PACKAGE__ );
187              
188             # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
189              
190 55 50       1115 my $self = bless ref $_[0] eq 'HASH' ? { %{ shift() } } : { }, __PACKAGE__;
  0         0  
191              
192 55 50       325 $self->{IGNORE} = 1 if $SIG{CHLD} eq 'IGNORE';
193 55         1835 $self->{MGR_ID} = $mngd->{MGR_ID}, $self->{PKG} = $mngd->{PKG};
194 55 50 33     623 $self->{ident } = shift if ( !ref $_[0] && ref $_[1] eq 'CODE' );
195              
196 55 0 33     135 my $func = shift; $func = $caller.'::'.$func
  55   33     186  
197             if ( !ref $func && length $func && index($func,':') < 0 );
198              
199 55 50       234 if ( !defined $func ) {
200 0         0 local $\; print {*STDERR} "code function is not specified or valid\n";
  0         0  
  0         0  
201 0         0 return undef;
202             }
203              
204             my ( $list, $max_workers, $pkg ) = (
205             $_LIST->{ $mngd->{PKG} }, $mngd->{max_workers}, $mngd->{PKG}
206 55         271 );
207              
208 55 50       789 $_DATA->{"$pkg:id"} = 10000 if ( ( my $id = ++$_DATA->{"$pkg:id"} ) >= 2e9 );
209              
210             # Reap completed hobo processes.
211 55 50 33     1164 if ( $self->{IGNORE} || ($max_workers && $list->len() >= $max_workers) ) {
      33        
212 0         0 local ($SIG{CHLD}, $!, $?, $_);
213             map {
214 0         0 $_ = substr($_, 1); # strip leading 'R'
215 0         0 my $hobo = $list->del($_);
216 0 0       0 if ( ! $hobo->{REAPED} ) {
217 0         0 waitpid($hobo->{WRK_ID}, 0);
218 0         0 _reap_hobo($hobo, 0);
219             }
220 0         0 ();
221             }
222 0         0 $_DATA->{$pkg}->get_done();
223             }
224              
225             # Wait for a slot if saturated.
226 55 50 33     254 _wait_one($pkg) if ( $max_workers && $list->len() >= $max_workers );
227              
228             # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
229              
230 55 0 33     158 $MCE::_GMUTEX->lock() if ( $_tid && $MCE::_GMUTEX );
231              
232 55         225 my @args = @_; @_ = (); # To avoid (Scalars leaked: N) messages
  55         130  
233 55         109 my ( $killed, $pid );
234              
235             {
236 55     0   102 local $SIG{TERM} = local $SIG{INT} = sub { $killed = $_[0] }
  0         0  
237 55 50 33     3599 if ( !$_is_MSWin32 && $] ge '5.010001' );
238              
239             local $SIG{TTIN}, local $SIG{TTOU}, local $SIG{WINCH}
240 55 50       1576 if ( !$_is_MSWin32 );
241              
242 55         247417 $pid = fork();
243              
244 55 50       6613 if ( !defined $pid ) { # error
    100          
245 0         0 local $\; print {*STDERR} "fork error: $!\n";
  0         0  
  0         0  
246             }
247             elsif ( $pid ) { # parent
248 46         2476 $self->{WRK_ID} = $pid;
249 46         3500 $list->set($pid, $self);
250 46 100       11249 $mngd->{on_start}->($pid, $self->{ident}) if $mngd->{on_start};
251             }
252             else { # child
253 9         646 %{ $_LIST } = (), $_SELF = $self;
  9         2205  
254              
255             local $SIG{TERM} = local $SIG{INT} = local $SIG{ABRT} = \&_trap,
256             local $SIG{SEGV} = local $SIG{HUP} = \&_trap,
257 9         1827 local $SIG{QUIT} = \&_quit;
258 9         385 local $SIG{CHLD};
259              
260 9 50       1462 MCE::Shared::init() if $INC{'MCE/Shared.pm'};
261 9 50       576 $_DATA->{ $_SELF->{PKG} }->set('S'.$$, '') unless $self->{IGNORE};
262 9 50       111 CORE::kill($killed, $$) if $killed;
263              
264 9 50       60 MCE::Child->_clear() if $INC{'MCE/Child.pm'};
265 9 50       258 MCE::Hobo->_clear() if $INC{'MCE/Hobo.pm'};
266              
267             # Set the seed of the base generator uniquely between workers.
268             # The new seed is computed using the current seed and ID value.
269             # One may set the seed at the application level for predictable
270             # results. Ditto for PDL, Math::Prime::Util, Math::Random, and
271             # Math::Random::MT::Auto.
272              
273             {
274 9         22 my $seed = abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560;
  9         73  
275              
276 9         94 CORE::srand($seed);
277 9 50 33     108 PDL::srand($seed) if $INC{'PDL.pm'} && PDL->can('srand'); # PDL 2.062 ~ 2.089
278 9 50 33     129 PDL::srandom($seed) if $INC{'PDL.pm'} && PDL->can('srandom'); # PDL 2.089_01+
279 9 50       90 Math::Prime::Util::srand($seed) if $INC{'Math/Prime/Util.pm'};
280             }
281              
282 9 50       66 if ( $INC{'Math/Random.pm'} ) {
283 0         0 my $cur_seed = Math::Random::random_get_seed();
284 0 0       0 my $new_seed = ($cur_seed < 1073741781)
285             ? $cur_seed + ((abs($id) * 10000) % 1073741780)
286             : $cur_seed - ((abs($id) * 10000) % 1073741780);
287              
288 0         0 Math::Random::random_set_seed($new_seed, $new_seed);
289             }
290              
291 9 50       75 if ( $INC{'Math/Random/MT/Auto.pm'} ) {
292 0         0 my $cur_seed = Math::Random::MT::Auto::get_seed()->[0];
293 0 0       0 my $new_seed = ($cur_seed < 1073741781)
294             ? $cur_seed + ((abs($id) * 10000) % 1073741780)
295             : $cur_seed - ((abs($id) * 10000) % 1073741780);
296              
297 0         0 Math::Random::MT::Auto::set_seed($new_seed);
298             }
299              
300 9         442 _dispatch($mngd, $func, \@args);
301             }
302             }
303              
304 46 0 33     3618 $MCE::_GMUTEX->unlock() if ( $_tid && $MCE::_GMUTEX );
305              
306 46 50       296 CORE::kill($killed, $$) if $killed;
307              
308 46 50       9843 return $pid ? $self : undef;
309             }
310              
311             ###############################################################################
312             ## ----------------------------------------------------------------------------
313             ## Public methods.
314             ##
315             ###############################################################################
316              
317             sub equal {
318 0 0 0 0 1 0 return 0 unless ( ref $_[0] && ref $_[1] );
319 0 0       0 $_[0]->{WRK_ID} == $_[1]->{WRK_ID} ? 1 : 0;
320             }
321              
322             sub error {
323 22 50   22 1 224 _croak('Usage: $hobo->error()') unless ref( my $self = $_[0] );
324 22 50       117 $self->join() unless $self->{REAPED};
325 22 50       622 $self->{ERROR} || undef;
326             }
327              
328             sub exit {
329 10 50 33 10 1 895 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
330              
331 10 50       575 my ( $self ) = ( ref $_[0] ? shift : $_SELF );
332 10         220 my ( $pkg, $wrk_id ) = ( $self->{PKG}, $self->{WRK_ID} );
333              
334 10 50 33     760 if ( $wrk_id == $$ && $self->{MGR_ID} eq "$$.$_tid" ) {
    50          
335 0         0 MCE::Hobo->finish('MCE'); CORE::exit(@_);
  0         0  
336             }
337             elsif ( $wrk_id == $$ ) {
338 0   0     0 alarm 0; my ( $exit_status, @res ) = @_; $? = $exit_status || 0;
  0         0  
  0         0  
339             $_DATA->{$pkg}->set('R'.$wrk_id, @res ? $_freeze->(\@res) : '')
340 0 0       0 unless $self->{IGNORE};
    0          
341 0         0 die "Hobo exited ($?)\n";
342 0         0 _exit($?); # not reached
343             }
344              
345 10 50       220 return $self if $self->{REAPED};
346              
347 10 50       115 if ( defined $_DATA->{$pkg} ) {
348 10         390 sleep $_yield_secs until $_DATA->{$pkg}->exists('S'.$wrk_id);
349             } else {
350 0         0 sleep 0.030;
351             }
352              
353 10 50       125 if ($_is_MSWin32) {
354 0 0       0 CORE::kill('KILL', $wrk_id) if CORE::kill('ZERO', $wrk_id);
355             } else {
356 10 50       460 CORE::kill('QUIT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
357             }
358              
359 10         55 $self;
360             }
361              
362             sub finish {
363 6 50   6 1 4013 _croak('Usage: MCE::Hobo->finish()') if ref($_[0]);
364 6 50 33     83 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
365              
366 6 100       73 my $pkg = defined($_[0]) ? shift : "$$.$_tid.".caller();
367              
368 6 100       41 if ( $pkg eq 'MCE' ) {
    50          
369 3         9 for my $key ( keys %{ $_LIST } ) { MCE::Hobo->finish($key); }
  3         32  
  2         50  
370             }
371             elsif ( defined $_LIST->{$pkg} ) {
372 3 50       16 return if $MCE::Signal::KILLED;
373              
374 3 50       13 if ( exists $_DELY->{$pkg} ) {
375 3         40 &_force_reap($pkg);
376             delete($_DELY->{$pkg}), delete($_DATA->{"$pkg:seed"}),
377             delete($_LIST->{$pkg}), delete($_DATA->{"$pkg:id"}),
378 3         241 delete($_MNGD->{$pkg}), delete($_DATA->{ $pkg });
379             }
380             }
381              
382 6         20 @_ = ();
383              
384 6         26 return;
385             }
386              
387             sub is_joinable {
388 15 50   15 1 230 _croak('Usage: $hobo->is_joinable()') unless ref( my $self = $_[0] );
389 15         60 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
390              
391 15 50       320 if ( $wrk_id == $$ ) {
    50          
392 0         0 '';
393             }
394             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
395 15 50       55 return '' if $self->{REAPED};
396 15         65 local $!;
397 15 50       205 ( waitpid($wrk_id, _WNOHANG) == 0 ) ? '' : do {
398 0 0       0 _reap_hobo($self, 0) unless $self->{REAPED};
399 0         0 1;
400             };
401             }
402             else {
403             _croak('Error: $hobo->is_joinable() not called by managed process')
404 0 0       0 if ( $self->{IGNORE} );
405              
406 0 0       0 return '' if $self->{REAPED};
407 0 0       0 $_DATA->{$pkg}->exists('R'.$wrk_id) ? 1 : '';
408             }
409             }
410              
411             sub is_running {
412 15 50   15 1 9660 _croak('Usage: $hobo->is_running()') unless ref( my $self = $_[0] );
413 15         65 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
414              
415 15 50       175 if ( $wrk_id == $$ ) {
    50          
416 0         0 1;
417             }
418             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
419 15 50       60 return '' if $self->{REAPED};
420 15         50 local $!;
421 15 50       200 ( waitpid($wrk_id, _WNOHANG) == 0 ) ? 1 : do {
422 0 0       0 _reap_hobo($self, 0) unless $self->{REAPED};
423 0         0 '';
424             };
425             }
426             else {
427             _croak('Error: $hobo->is_running() not called by managed process')
428 0 0       0 if ( $self->{IGNORE} );
429              
430 0 0       0 return '' if $self->{REAPED};
431 0 0       0 $_DATA->{$pkg}->exists('R'.$wrk_id) ? '' : 1;
432             }
433             }
434              
435             sub join {
436 36 50   36 1 26582 _croak('Usage: $hobo->join()') unless ref( my $self = $_[0] );
437 36         179 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
438              
439 36 50       137 if ( $self->{REAPED} ) {
440 0 0       0 _croak('Hobo already joined') unless exists( $self->{RESULT} );
441 0 0       0 $_LIST->{$pkg}->del($wrk_id) if ( defined $_LIST->{$pkg} );
442              
443             return ( defined wantarray )
444 0 0       0 ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
  0 0       0  
445             : ();
446             }
447              
448 36 50       787 if ( $wrk_id == $$ ) {
    50          
449 0         0 _croak('Cannot join self');
450             }
451             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
452             # remove from list after reaping
453 36 50       102 if ( $_tid ) {
454 0         0 local $SIG{CHLD};
455 0         0 _reap_hobo($self, 1);
456 0         0 $_LIST->{$pkg}->del($wrk_id);
457             }
458             else {
459 36         1853 local ($SIG{CHLD}, $!);
460 36         28635364 waitpid($wrk_id, 0);
461 36         645 _reap_hobo($self, 0);
462 36         220 $_LIST->{$pkg}->del($wrk_id);
463             }
464             }
465             else {
466             _croak('Error: $hobo->join() not called by managed process')
467 0 0       0 if ( $self->{IGNORE} );
468              
469 0         0 sleep 0.3 until ( $_DATA->{$pkg}->exists('R'.$wrk_id) );
470 0         0 _reap_hobo($self, 0);
471             }
472              
473 36 50       152 return unless ( exists $self->{RESULT} );
474              
475             ( defined wantarray )
476 36 50       480 ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
  0 100       0  
477             : ();
478             }
479              
480             sub kill {
481 5 50   5 1 365 _croak('Usage: $hobo->kill()') unless ref( my $self = $_[0] );
482 5         110 my ( $wrk_id, $pkg, $signal ) = ( $self->{WRK_ID}, $self->{PKG}, $_[1] );
483              
484 5 50       60 if ( $wrk_id == $$ ) {
485 0   0     0 CORE::kill($signal || 'INT', $$);
486 0         0 return $self;
487             }
488 5 50       80 if ( $self->{MGR_ID} eq "$$.$_tid" ) {
489 5 50       15 return $self if $self->{REAPED};
490 5 50       35 if ( defined $_DATA->{$pkg} ) {
491 5         70 sleep $_yield_secs until $_DATA->{$pkg}->exists('S'.$wrk_id);
492             } else {
493 0         0 sleep 0.030;
494             }
495             }
496              
497 5 50 50     210 CORE::kill($signal || 'INT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
498              
499 5         30 $self;
500             }
501              
502             sub list {
503 5 50   5 1 4995 _croak('Usage: MCE::Hobo->list()') if ref($_[0]);
504 5         50 my $pkg = "$$.$_tid.".caller();
505              
506 5 50       40 ( defined $_LIST->{$pkg} ) ? $_LIST->{$pkg}->vals() : ();
507             }
508              
509             sub list_pids {
510 5 50   5 1 465 _croak('Usage: MCE::Hobo->list_pids()') if ref($_[0]);
511 5         430 my $pkg = "$$.$_tid.".caller(); local $_;
  5         140  
512              
513 5 50       210 ( defined $_LIST->{$pkg} ) ? map { $_->pid } $_LIST->{$pkg}->vals() : ();
  15         255  
514             }
515              
516             sub list_joinable {
517 5 50   5 1 5460 _croak('Usage: MCE::Hobo->list_joinable()') if ref($_[0]);
518 5         50 my $pkg = "$$.$_tid.".caller();
519              
520 5 50       35 return () unless ( my $list = $_LIST->{$pkg} );
521 5         25 local ($!, $?, $_);
522              
523             map {
524 5 50       40 ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? () : do {
  15         250  
525 0 0       0 _reap_hobo($_, 0) unless $_->{REAPED};
526 0         0 $_;
527             };
528             }
529             $list->vals();
530             }
531              
532             sub list_running {
533 5 50   5 1 19025 _croak('Usage: MCE::Hobo->list_running()') if ref($_[0]);
534 5         65 my $pkg = "$$.$_tid.".caller();
535              
536 5 50       50 return () unless ( my $list = $_LIST->{$pkg} );
537 5         125 local ($!, $?, $_);
538              
539             map {
540 5 50       245 ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? $_ : do {
  15         160  
541 0 0       0 _reap_hobo($_, 0) unless $_->{REAPED};
542 0         0 ();
543             };
544             }
545             $list->vals();
546             }
547              
548             sub max_workers {
549 17 50   17 1 115 _croak('Usage: MCE::Hobo->max_workers()') if ref($_[0]);
550 17   33     192 my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
551             # construct mngd internally on first use unless defined
552             init(); $_MNGD->{ "$$.$_tid.".caller() };
553             };
554 17 50       49 shift if ( $_[0] eq __PACKAGE__ );
555              
556 17 100       80 $mngd->{max_workers} = _max_workers(shift) if @_;
557 17         121 $mngd->{max_workers};
558             }
559              
560             sub pending {
561 5 50   5 1 4955 _croak('Usage: MCE::Hobo->pending()') if ref($_[0]);
562 5         50 my $pkg = "$$.$_tid.".caller();
563              
564 5 50       50 ( defined $_LIST->{$pkg} ) ? $_LIST->{$pkg}->len() : 0;
565             }
566              
567             sub pid {
568 22 50   22 1 197 ref($_[0]) ? $_[0]->{WRK_ID} : $_SELF->{WRK_ID};
569             }
570              
571             sub result {
572 7 50   7 1 267 _croak('Usage: $hobo->result()') unless ref( my $self = $_[0] );
573 7 50       48 return $self->join() unless $self->{REAPED};
574              
575 7 50       29 _croak('Hobo already joined') unless exists( $self->{RESULT} );
576 7 50       50 wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1];
  0         0  
577             }
578              
579             sub seed {
580 0 0   0 1 0 _croak('Usage: MCE::Hobo->seed()') if ref($_[0]);
581 0 0       0 my $pkg = exists $_SELF->{PKG} ? $_SELF->{PKG} : "$$.$_tid.".caller();
582              
583 0         0 return $_DATA->{"$pkg:seed"};
584             }
585              
586             sub self {
587 0 0   0 1 0 ref($_[0]) ? $_[0] : $_SELF;
588             }
589              
590             sub wait_all {
591 1 50   1 1 143 _croak('Usage: MCE::Hobo->wait_all()') if ref($_[0]);
592 1         70 my $pkg = "$$.$_tid.".caller();
593              
594             return wantarray ? () : 0
595 1 0 33     103 if ( !defined $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
    50          
596              
597 1         26 local $_; ( wantarray )
598 0         0 ? map { $_->join(); $_ } $_LIST->{$pkg}->vals()
  0         0  
599 1 50       69 : map { $_->join(); () } $_LIST->{$pkg}->vals();
  3         70  
  3         23  
600             }
601              
602             *waitall = \&wait_all; # compatibility
603              
604             sub wait_one {
605 4 50   4 1 344 _croak('Usage: MCE::Hobo->wait_one()') if ref($_[0]);
606 4         232 my $pkg = "$$.$_tid.".caller();
607              
608             return undef
609 4 50 33     348 if ( !defined $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
610              
611 4         360 _wait_one($pkg);
612             }
613              
614             *waitone = \&wait_one; # compatibility
615              
616             sub yield {
617 0 0   0 1 0 _croak('Usage: MCE::Hobo->yield()') if ref($_[0]);
618 0 0 0     0 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
619              
620 0   0     0 my $pkg = $_SELF->{PKG} || do {
621             my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
622             # construct mngd internally on first use unless defined
623             init(); $_MNGD->{ "$$.$_tid.".caller() };
624             };
625             $mngd->{PKG};
626             };
627              
628 0 0       0 return unless $_DELY->{$pkg};
629 0         0 my $seconds = $_DELY->{$pkg}->seconds(@_);
630              
631 0         0 MCE::Util::_sleep( $seconds );
632             }
633              
634             ###############################################################################
635             ## ----------------------------------------------------------------------------
636             ## Private methods.
637             ##
638             ###############################################################################
639              
640             sub _croak {
641 0 0   0   0 if ( $INC{'MCE.pm'} ) {
642 0         0 goto &MCE::_croak;
643             }
644             else {
645 0         0 $SIG{__DIE__} = \&MCE::Signal::_die_handler;
646 0         0 $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
647              
648 0         0 $\ = undef; goto &Carp::croak;
  0         0  
649             }
650             }
651              
652             sub _dispatch {
653 9     9   99 my ( $mngd, $func, $args ) = @_;
654              
655 9         489 $mngd->{WRK_ID} = $_SELF->{WRK_ID} = $$, $? = 0;
656 9 50       45 $ENV{PERL_MCE_IPC} = 'win32' if $_is_MSWin32;
657              
658             {
659 9         338 local $!;
  9         326  
660 9 50       775 (*STDERR)->autoflush(1) if defined( fileno *STDERR );
661 9 50       3233 (*STDOUT)->autoflush(1) if defined( fileno *STDOUT );
662             }
663              
664             # Run task.
665             my $hobo_timeout = ( exists $_SELF->{hobo_timeout} )
666 9 50       445 ? $_SELF->{hobo_timeout} : $mngd->{hobo_timeout};
667              
668             my $void_context = ( exists $_SELF->{void_context} )
669 9 50       142 ? $_SELF->{void_context} : $mngd->{void_context};
670              
671 9         22 my @res; my $timed_out = 0;
  9         75  
672              
673             local $SIG{'ALRM'} = sub {
674 0     0   0 alarm 0; $timed_out = 1; $SIG{__WARN__} = sub {};
  0         0  
  0         0  
675 0         0 die "Hobo timed out\n";
676 9         652 };
677              
678 9 50 33     236 if ( $void_context || $_SELF->{IGNORE} ) {
679 12     12   123 no strict 'refs';
  12         22  
  12         903  
680 0   0     0 eval { alarm($hobo_timeout || 0); $func->(@{ $args }) };
  0         0  
  0         0  
  0         0  
681             }
682             else {
683 12     12   87 no strict 'refs';
  12         21  
  12         4142  
684 9   50     38 @res = eval { alarm($hobo_timeout || 0); $func->(@{ $args }) };
  9         213  
  9         35  
  9         181  
685             }
686              
687 9         60509 alarm 0;
688 9 50       76 $@ = "Hobo timed out" if $timed_out;
689              
690 9 50       365 if ( $@ ) {
691 0 0       0 _exit($?) if ( $@ =~ /^Hobo exited \(\S+\)$/ );
692 0         0 my $err = $@; $? = 1; $err =~ s/, <__ANONIO__> line \d+//;
  0         0  
  0         0  
693              
694 0 0       0 if ( ! $_SELF->{IGNORE} ) {
695             $_DATA->{ $_SELF->{PKG} }->set('S'.$$, $err),
696 0         0 $_DATA->{ $_SELF->{PKG} }->set('R'.$$, '');
697             }
698              
699 0 0 0     0 if ( !$timed_out && !$mngd->{on_finish} && !$INC{'MCE/Simple.pm'} ) {
      0        
700 12     12   7894 use bytes; warn "Hobo $$ terminated abnormally: reason $err\n";
  12         8788  
  12         84  
  0         0  
701             }
702             }
703             else {
704 9 50       93 shift(@res) if ref($res[0]) =~ /^MCE::(?:Barrier|Semaphore)::_guard/s;
705             $_DATA->{ $_SELF->{PKG} }->set('R'.$$, @res ? $_freeze->(\@res) : '')
706 9 50       568 if ( ! $_SELF->{IGNORE} );
    50          
707             }
708              
709 9         251 _exit($?);
710             }
711              
712             sub _exit {
713 9     9   43 my ( $exit_status ) = @_;
714              
715             # Check for nested workers not yet joined.
716 9 50 33     189 MCE::Hobo->finish('MCE') if ( !$_SELF->{SIGNALED} && keys %{ $_LIST } );
  9         140  
717              
718             # Exit hobo process.
719 9 50   0   404 $SIG{__DIE__} = sub {} unless $_tid;
720 9     0   548 $SIG{__WARN__} = sub {};
721              
722 9 50 33     95 threads->exit($exit_status) if ( $INC{'threads.pm'} && $_is_MSWin32 );
723 9 50 33     73 CORE::kill('KILL', $$) if ( $_SELF->{SIGNALED} && !$_is_MSWin32 );
724              
725             my $posix_exit = ( exists $_SELF->{posix_exit} )
726 9 50       101 ? $_SELF->{posix_exit} : $_MNGD->{ $_SELF->{PKG} }{posix_exit};
727              
728 9 50 33     75 if ( $posix_exit && !$_is_MSWin32 ) {
729 0         0 eval { MCE::Mutex::Channel::_destroy() };
  0         0  
730 0 0       0 POSIX::_exit($exit_status) if $INC{'POSIX.pm'};
731 0         0 CORE::kill('KILL', $$);
732             }
733              
734 9         8457 CORE::exit($exit_status);
735             }
736              
737             sub _force_reap {
738 10     10   52 my ( $count, $pkg ) = ( 0, @_ );
739 10 50 33     110 return unless ( defined $_LIST->{$pkg} && $_LIST->{$pkg}->len() );
740              
741 0         0 for my $hobo ( $_LIST->{$pkg}->vals() ) {
742 0 0       0 next if $hobo->{IGNORE};
743              
744 0 0       0 if ( $hobo->is_running() ) {
745 0 0       0 sleep($_yield_secs), CORE::kill('KILL', $hobo->pid())
746             if CORE::kill('ZERO', $hobo->pid());
747 0         0 $count++;
748             }
749             }
750              
751 0         0 $_LIST->{$pkg}->clear();
752              
753 0 0 0     0 warn "Finished with active hobo processes [$pkg] ($count)\n"
754             if ( $count && !$_is_MSWin32 );
755              
756 0         0 return;
757             }
758              
759             sub _quit {
760 0 0   0   0 return MCE::Signal::defer($_[0]) if $MCE::Signal::IPC;
761              
762 0         0 alarm 0; my ( $name ) = @_;
  0         0  
763 0         0 $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
764              
765       0     $SIG{$name} = sub {}, CORE::kill($name, -$$)
766 0 0       0 if ( exists $SIG{$name} );
767              
768 0 0       0 if ( ! $_SELF->{IGNORE} ) {
769 0         0 my ( $pkg, $wrk_id ) = ( $_SELF->{PKG}, $_SELF->{WRK_ID} );
770 0         0 $_DATA->{$pkg}->set('R'.$wrk_id, '');
771             }
772              
773 0         0 _exit(0);
774             }
775              
776             sub _reap_hobo {
777 40     40   309 my ( $hobo, $wait_flag ) = @_;
778 40 50 33     882 return if ( !$hobo || !defined $hobo->{PKG} );
779              
780 40         620 local @_ = $_DATA->{ $hobo->{PKG} }->get($hobo->{WRK_ID}, $wait_flag);
781              
782 40 100 50     2300 ( $hobo->{ERROR}, $hobo->{RESULT}, $hobo->{REAPED} ) =
783             ( pop || '', length $_[0] ? $_thaw->(pop) : [], 1 );
784              
785 40 50       215 return if $hobo->{IGNORE};
786              
787 40   100     485 my ( $exit, $err ) = ( $? || 0, $hobo->{ERROR} );
788 40         138 my ( $code, $sig ) = ( $exit >> 8, $exit & 0x7f );
789              
790 40 50 33     149 if ( $code > 100 && !$err ) {
791 0 0       0 $code = 2, $sig = 1, $err = 'Hobo received SIGHUP' if $code == 101;
792 0 0       0 $code = 2, $sig = 2, $err = 'Hobo received SIGINT' if $code == 102;
793 0 0       0 $code = 2, $sig = 6, $err = 'Hobo received SIGABRT' if $code == 106;
794 0 0       0 $code = 2, $sig = 11, $err = 'Hobo received SIGSEGV' if $code == 111;
795 0 0       0 $code = 2, $sig = 15, $err = 'Hobo received SIGTERM' if $code == 115;
796              
797 0         0 $hobo->{ERROR} = $err;
798             }
799              
800 40 100       260 if ( my $on_finish = $_MNGD->{ $hobo->{PKG} }{on_finish} ) {
801             $on_finish->(
802             $hobo->{WRK_ID}, $code, $hobo->{ident}, $sig, $err,
803 7         135 @{ $hobo->{RESULT} }
  7         90  
804             );
805             }
806              
807 40         386 return;
808             }
809              
810             sub _trap {
811 0 0   0   0 return MCE::Signal::defer($_[0]) if $MCE::Signal::IPC;
812              
813 0         0 alarm 0; my ( $exit_status, $name ) = ( 2, @_ );
  0         0  
814 0         0 $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
815              
816       0     $SIG{$name} = sub {}, CORE::kill($name, -$$)
817 0 0       0 if ( exists $SIG{$name} );
818              
819 0 0       0 if ( $name eq 'HUP' ) { $exit_status = 101 }
  0 0       0  
    0          
    0          
    0          
820 0         0 elsif ( $name eq 'INT' ) { $exit_status = 102 }
821 0         0 elsif ( $name eq 'ABRT' ) { $exit_status = 106 }
822 0         0 elsif ( $name eq 'SEGV' ) { $exit_status = 111 }
823 0         0 elsif ( $name eq 'TERM' ) { $exit_status = 115 }
824              
825 0 0       0 if ( ! $_SELF->{IGNORE} ) {
826 0         0 my ( $pkg, $wrk_id ) = ( $_SELF->{PKG}, $_SELF->{WRK_ID} );
827 0         0 $_DATA->{$pkg}->set('R'.$wrk_id, '');
828             }
829              
830 0         0 _exit($exit_status);
831             }
832              
833             sub _wait_one {
834 4     4   24 my ( $pkg ) = @_;
835 4         100 my ( $list, $self, $wrk_id ) = ( $_LIST->{$pkg} ); local $!;
  4         224  
836              
837 4         52 while () {
838 1120         13104 for my $hobo ( $list->vals() ) {
839 1120         4624 $wrk_id = $hobo->{WRK_ID};
840 1120 50       5796 return $list->del($wrk_id) if $hobo->{REAPED};
841 1120 100       25696 $self = $list->del($wrk_id), last if waitpid($wrk_id, _WNOHANG);
842             }
843 1120 100       5072 last if $self;
844 1116         9704956 sleep $_yield_secs;
845             }
846              
847 4         76 _reap_hobo($self, 0);
848              
849 4         124 $self;
850             }
851              
852             ###############################################################################
853             ## ----------------------------------------------------------------------------
854             ## Delay implementation suited for MCE::Hobo.
855             ##
856             ###############################################################################
857              
858             package # hide from rpm
859             MCE::Hobo::_delay;
860              
861             sub new {
862 12     12   55 my ( $class, $chnl, $delay ) = @_;
863              
864 12 50       76 if ( !defined $delay ) {
865 12 50       545 $delay = ($^O =~ /mswin|mingw|msys|cygwin/i) ? 0.015 : 0.008;
866             }
867              
868 12         146 $chnl->send(undef);
869              
870 12         774 bless [ $delay, $chnl ], $class;
871             }
872              
873             sub seconds {
874 0     0   0 my ( $self, $how_long ) = @_;
875 0 0       0 my $delay = defined($how_long) ? $how_long : $self->[0];
876 0         0 my $lapse = $self->[1]->recv();
877 0         0 my $time = MCE::Util::_time();
878              
879 0 0 0     0 if ( !$delay || !defined $lapse ) {
    0          
880 0         0 $lapse = $time;
881             }
882             elsif ( $lapse + $delay - $time < 0 ) {
883 0         0 $lapse += int( abs($time - $lapse) / $delay + 0.5 ) * $delay;
884             }
885              
886 0         0 $self->[1]->send( $lapse += $delay );
887              
888 0         0 return $lapse - $time;
889             }
890              
891             ###############################################################################
892             ## ----------------------------------------------------------------------------
893             ## Hash and ordhash implementations suited for MCE::Hobo.
894             ##
895             ###############################################################################
896              
897             package # hide from rpm
898             MCE::Hobo::_hash;
899              
900 12     12   43002 use MCE::Shared ();
  12         52  
  12         514  
901 12     12   90 use Time::HiRes 'sleep';
  12         22  
  12         130  
902              
903             use constant {
904 12 50       12967 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
905             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
906 12     12   1443 };
  12         21  
907              
908             sub new {
909 12     12   749 bless \ MCE::Shared->share({ module => 'MCE::Shared::Hash' }), shift;
910             }
911              
912 7     7   21 sub clear { ${ $_[0] }->clear(); }
  7         195  
913 15     15   105 sub exists { ${ $_[0] }->exists($_[1]); }
  15         1395  
914 18     18   132 sub set { ${ $_[0] }->set($_[1], $_[2]); }
  18         596  
915              
916             sub get_done {
917 0     0   0 my ( $self ) = @_;
918 0         0 my @ret;
919              
920 0         0 for my $key (${ $self }->keys) {
  0         0  
921 0 0       0 push @ret, $key if ( substr($key, 0, 1) eq 'R' );
922             }
923              
924 0         0 return @ret;
925             }
926              
927             sub get {
928 40     40   152 my ( $self, $wrk_id, $wait_flag ) = @_;
929              
930 40 50       195 if ( $wait_flag ) {
931 0         0 local $!;
932 0 0       0 ( ${ $self }->exists('R'.$wrk_id) ) ? waitpid($wrk_id, 0) : do {
  0         0  
933 0         0 while () {
934 0 0       0 if ( ! ${ $self }->exists('R'.$wrk_id) ) {
  0         0  
935 0 0       0 last if waitpid($wrk_id, _WNOHANG);
936 0         0 sleep(0.030), next;
937             }
938 0         0 waitpid($wrk_id, 0), last;
939             }
940             };
941             }
942              
943 40         103 ${ $self }->_get_hobo_data($wrk_id);
  40         839  
944             }
945              
946             package # hide from rpm
947             MCE::Hobo::_ordhash;
948              
949 12     12   297 sub new { bless [ {}, [], {}, 0 ], shift; } # data, keys, indx, gcnt
950 0     0   0 sub exists { CORE::exists $_[0]->[0]{ $_[1] }; }
951 0     0   0 sub get { $_[0]->[0]{ $_[1] }; }
952 20     20   60 sub len { scalar keys %{ $_[0]->[0] }; }
  20         499  
953              
954             sub clear {
955 0     0   0 my ( $self ) = @_;
956 0         0 %{ $self->[0] } = @{ $self->[1] } = %{ $self->[2] } = (), $self->[3] = 0;
  0         0  
  0         0  
  0         0  
957              
958 0         0 return;
959             }
960              
961             sub del {
962 40     40   197 my ( $self, $key ) = @_;
963 40 50       262 return undef unless defined( my $off = delete $self->[2]{$key} );
964              
965             # tombstone
966 40         126 $self->[1][$off] = undef;
967              
968             # GC keys and refresh index
969 40 100       134 if ( ++$self->[3] > @{ $self->[1] } * 0.667 ) {
  40         204  
970 18         72 my ( $keys, $indx ) = ( $self->[1], $self->[2] );
971 18         51 my $i; $i = $self->[3] = 0;
  18         71  
972 18         35 for my $k ( @{ $keys } ) {
  18         312  
973 40 50       127 $keys->[$i] = $k, $indx->{$k} = $i++ if defined($k);
974             }
975 18         38 splice @{ $keys }, $i;
  18         83  
976             }
977              
978 40         677 delete $self->[0]{$key};
979             }
980              
981             sub set {
982 46     46   1898 my ( $self, $key ) = @_;
983 46 50       1461 $self->[0]{$key} = $_[2], return 1 if exists($self->[0]{$key});
984              
985 46         174 $self->[2]{$key} = @{ $self->[1] }; push @{ $self->[1] }, $key;
  46         3211  
  46         146  
  46         1632  
986 46         959 $self->[0]{$key} = $_[2];
987              
988 46         290 return 1;
989             }
990              
991             sub vals {
992 1141     1141   5175 my ( $self ) = @_;
993             $self->[3]
994 0         0 ? @{ $self->[0] }{ grep defined($_), @{ $self->[1] } }
  0         0  
995 1141 50       7091 : @{ $self->[0] }{ @{ $self->[1] } };
  1141         8538  
  1141         3613  
996             }
997              
998             1;
999              
1000             __END__