File Coverage

blib/lib/MCE/Hobo.pm
Criterion Covered Total %
statement 341 510 66.8
branch 162 412 39.3
condition 51 161 31.6
subroutine 56 75 74.6
pod 23 23 100.0
total 633 1181 53.6


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## A threads-like parallelization module.
4             ##
5             ###############################################################################
6              
7 18     18   1320000 use strict;
  18         103  
  18         536  
8 18     18   73 use warnings;
  18         124  
  18         446  
9              
10 18     18   567 use 5.010001;
  18         55  
11              
12 18     18   146 no warnings qw( threads recursion uninitialized once redefine );
  18         30  
  18         1408  
13              
14             package MCE::Hobo;
15              
16             our $VERSION = '1.881';
17              
18             ## no critic (BuiltinFunctions::ProhibitStringyEval)
19             ## no critic (Subroutines::ProhibitExplicitReturnUndef)
20             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
21             ## no critic (TestingAndDebugging::ProhibitNoStrict)
22              
23 18     18   9866 use MCE::Signal ();
  18         78988  
  18         467  
24 18     18   8359 use MCE::Mutex ();
  18         7897  
  18         349  
25 18     18   8820 use MCE::Channel ();
  18         351592  
  18         453  
26 18     18   113 use Time::HiRes 'sleep';
  18         37  
  18         92  
27              
28             use overload (
29             q(==) => \&equal,
30 0     0   0 q(!=) => sub { !equal(@_) },
31 18         244 fallback => 1
32 18     18   25724 );
  18         18090  
33              
34             sub import {
35 18 50   18   213 if (caller !~ /^MCE::/) {
36 18     18   2264 no strict 'refs'; no warnings 'redefine';
  18     18   58  
  18         690  
  18         101  
  18         37  
  18         1888  
37 18         37 *{ caller().'::mce_async' } = \&mce_async;
  18         91  
38             }
39 18         202 return;
40             }
41              
42             ## The POSIX module has many symbols. Try not loading it simply
43             ## to have WNOHANG. The following covers most platforms.
44              
45             use constant {
46 18 50       89511 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
47             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
48 18     18   126 };
  18         69  
49              
50             my ( $_MNGD, $_DATA, $_DELY, $_LIST ) = ( {}, {}, {}, {} );
51              
52             my $_freeze = MCE::Channel::_get_freeze();
53             my $_thaw = MCE::Channel::_get_thaw();
54              
55             my $_is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
56             my $_tid = ( $INC{'threads.pm'} ) ? threads->tid() : 0;
57              
58             sub CLONE {
59 0 0   0   0 $_tid = threads->tid(), &_clear() if $INC{'threads.pm'};
60             }
61              
62             sub _clear {
63 0     0   0 %{ $_LIST } = ();
  0         0  
64             }
65              
66             sub _max_workers {
67 10     10   32 my ( $cpus ) = @_;
68 10 100       57 if ( $cpus eq 'auto' ) {
    100          
69 1         16 $cpus = MCE::Util::get_ncpu();
70             }
71             elsif ( $cpus =~ /^([0-9.]+)%$/ ) {
72 6         30 my ( $percent, $ncpu ) = ( $1 / 100, MCE::Util::get_ncpu() );
73 6         27 $cpus = $ncpu * $percent + 0.5;
74             }
75 10 100 66     130 $cpus = 1 if $cpus !~ /^[\d\.]+$/ || $cpus < 1;
76 10         36 return int($cpus);
77             }
78              
79             ###############################################################################
80             ## ----------------------------------------------------------------------------
81             ## Init routine.
82             ##
83             ###############################################################################
84              
85             bless my $_SELF = { MGR_ID => "$$.$_tid", WRK_ID => $$ }, __PACKAGE__;
86              
87             sub init {
88 25 100 66 25 1 3721 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
89              
90             # -- options ----------------------------------------------------------
91             # max_workers hobo_timeout posix_exit on_start on_finish void_context
92             # ---------------------------------------------------------------------
93              
94 25 100       269 my $pkg = "$$.$_tid.".( caller eq __PACKAGE__ ? caller(1) : caller );
95 25 50       342 my $mngd = $_MNGD->{$pkg} = ( ref $_[0] eq 'HASH' ) ? shift : { @_ };
96              
97 25         69 @_ = ();
98              
99             $mngd->{MGR_ID} = "$$.$_tid", $mngd->{PKG} = $pkg,
100 25         194 $mngd->{WRK_ID} = $$;
101              
102 25 100       206 &_force_reap($pkg), $_DATA->{$pkg}->clear() if ( exists $_LIST->{$pkg} );
103              
104 25 100       90 if ( !exists $_LIST->{$pkg} ) {
105 18 0 33     54 $MCE::_GMUTEX->lock() if ( $_tid && $MCE::_GMUTEX );
106 18 50       78 sleep 0.015 if $_tid;
107              
108             # Start the shared-manager process if not running.
109 18 50       445 MCE::Shared->start() if $INC{'MCE/Shared.pm'};
110              
111 18         1204 my $chnl = MCE::Channel->new( impl => 'Mutex' );
112 18         106944 $_LIST->{ $pkg } = MCE::Hobo::_ordhash->new();
113 18         700 $_DELY->{ $pkg } = MCE::Hobo::_delay->new( $chnl );
114 18         292 $_DATA->{ $pkg } = MCE::Hobo::_hash->new();
115 18         857 $_DATA->{"$pkg:seed"} = int(rand() * 1e9);
116 18         89 $_DATA->{"$pkg:id" } = 0;
117              
118 18 0 33     83 $MCE::_GMUTEX->unlock() if ( $_tid && $MCE::_GMUTEX );
119             }
120              
121 25 50       169 if ( !exists $mngd->{posix_exit} ) {
122             $mngd->{posix_exit} = 1 if (
123             $^S || $_tid || $INC{'Mojo/IOLoop.pm'} ||
124             $INC{'Coro.pm'} || $INC{'LWP/UserAgent.pm'} || $INC{'stfl.pm'} ||
125             $INC{'Curses.pm'} || $INC{'CGI.pm'} || $INC{'FCGI.pm'} ||
126             $INC{'Tk.pm'} || $INC{'Wx.pm'} || $INC{'Win32/GUI.pm'} ||
127 25 50 33     1773 $INC{'Gearman/Util.pm'} || $INC{'Gearman/XS.pm'}
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
128             );
129             }
130              
131 25 100       119 if ( defined $mngd->{max_workers} ) {
132 3         26 $mngd->{max_workers} = _max_workers($mngd->{max_workers});
133             }
134              
135 25 50 33     116 if ( $INC{'LWP/UserAgent.pm'} && !$INC{'Net/HTTP.pm'} ) {
136 0         0 local $@; eval 'require Net::HTTP; require Net::HTTPS';
  0         0  
137             }
138              
139             require POSIX
140 25 50 66     5759 if ( $mngd->{on_finish} && !$INC{'POSIX.pm'} && !$_is_MSWin32 );
      66        
141              
142 25         40742 return;
143             }
144              
145             ###############################################################################
146             ## ----------------------------------------------------------------------------
147             ## 'new', 'mce_async', and 'create' for threads-like similarity.
148             ##
149             ###############################################################################
150              
151             ## 'new' and 'tid' are aliases for 'create' and 'pid' respectively.
152              
153             *new = \&create, *tid = \&pid;
154              
155             ## Use "goto" trick to avoid pad problems from 5.8.1 (fixed in 5.8.2)
156             ## Tip found in threads::async.
157              
158             sub mce_async (&;@) {
159 0     0 1 0 goto &create;
160             }
161              
162             sub create {
163 85   66 85 1 156704 my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
164             # construct mngd internally on first use unless defined
165             init(); $_MNGD->{ "$$.$_tid.".caller() };
166             };
167              
168 85 50       619 shift if ( $_[0] eq __PACKAGE__ );
169              
170             # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
171              
172 85 50       1014 my $self = bless ref $_[0] eq 'HASH' ? { %{ shift() } } : { }, __PACKAGE__;
  0         0  
173              
174 85 50       465 $self->{IGNORE} = 1 if $SIG{CHLD} eq 'IGNORE';
175 85         1372 $self->{MGR_ID} = $mngd->{MGR_ID}, $self->{PKG} = $mngd->{PKG};
176 85 50 33     539 $self->{ident } = shift if ( !ref $_[0] && ref $_[1] eq 'CODE' );
177              
178 85 0 33     218 my $func = shift; $func = caller().'::'.$func
  85   33     517  
179             if ( !ref $func && length $func && index($func,':') < 0 );
180              
181 85 50       259 if ( !defined $func ) {
182 0         0 local $\; print {*STDERR} "code function is not specified or valid\n";
  0         0  
  0         0  
183 0         0 return undef;
184             }
185              
186             my ( $list, $max_workers, $pkg ) = (
187             $_LIST->{ $mngd->{PKG} }, $mngd->{max_workers}, $mngd->{PKG}
188 85         367 );
189              
190 85 50       1097 $_DATA->{"$pkg:id"} = 10000 if ( ( my $id = ++$_DATA->{"$pkg:id"} ) >= 2e9 );
191              
192 85 50 33     1212 if ( $max_workers || $self->{IGNORE} ) {
193 0         0 my $wrk_id; local $!;
  0         0  
194              
195             # Reap completed hobo processes.
196 0         0 for my $hobo ( $list->vals() ) {
197 0         0 $wrk_id = $hobo->{WRK_ID};
198 0 0       0 $list->del($wrk_id), next if $hobo->{REAPED};
199 0 0       0 waitpid($wrk_id, _WNOHANG) or next;
200 0         0 _reap_hobo($list->del($wrk_id), 0);
201             }
202              
203             # Wait for a slot if saturated.
204 0 0 0     0 if ( $max_workers && $list->len() >= $max_workers ) {
205 0         0 my $count = $list->len() - $max_workers + 1;
206 0         0 _wait_one($pkg) for 1 .. $count;
207             }
208             }
209              
210             # ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~ ~~~
211              
212 85 0 33     284 $MCE::_GMUTEX->lock() if ( $_tid && $MCE::_GMUTEX );
213              
214 85         214 my @args = @_; @_ = (); # To avoid (Scalars leaked: N) messages
  85         155  
215 85         157 my ( $killed, $pid );
216              
217             {
218 85     0   171 local $SIG{TERM} = local $SIG{INT} = sub { $killed = $_[0] }
  0         0  
219 85 50 33     4166 if ( !$_is_MSWin32 && $] ge '5.010001' );
220              
221             local $SIG{TTIN}, local $SIG{TTOU}, local $SIG{WINCH}
222 85 50       3050 if ( !$_is_MSWin32 );
223              
224 85         87241 $pid = fork();
225              
226 85 50       5270 if ( !defined $pid ) { # error
    100          
227 0         0 local $\; print {*STDERR} "fork error: $!\n";
  0         0  
  0         0  
228             }
229             elsif ( $pid ) { # parent
230 70         2075 $self->{WRK_ID} = $pid;
231 70         3635 $list->set($pid, $self);
232 70 100       9900 $mngd->{on_start}->($pid, $self->{ident}) if $mngd->{on_start};
233             }
234             else { # child
235 15         1169 %{ $_LIST } = (), $_SELF = $self;
  15         2149  
236              
237             local $SIG{TERM} = local $SIG{INT} = local $SIG{ABRT} = \&_trap,
238             local $SIG{SEGV} = local $SIG{HUP} = \&_trap,
239 15         2637 local $SIG{QUIT} = \&_quit;
240 15         431 local $SIG{CHLD};
241              
242 15 50       1853 MCE::Shared::init() if $INC{'MCE/Shared.pm'};
243 15 50       667 $_DATA->{ $_SELF->{PKG} }->set('S'.$$, '') unless $self->{IGNORE};
244 15 50       120 CORE::kill($killed, $$) if $killed;
245              
246             # Sets the seed of the base generator uniquely between workers.
247             # The new seed is computed using the current seed and ID value.
248             # One may set the seed at the application level for predictable
249             # results. Ditto for Math::Prime::Util, Math::Random, and
250             # Math::Random::MT::Auto.
251              
252 15         192 srand( abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560 );
253              
254 15 50       83 if ( $INC{'Math/Prime/Util.pm'} ) {
255             Math::Prime::Util::srand(
256 0         0 abs($_DATA->{"$pkg:seed"} - ($id * 100000)) % 2147483560
257             );
258             }
259              
260 15 50       76 if ( $INC{'Math/Random.pm'} ) {
261 0         0 my $cur_seed = Math::Random::random_get_seed();
262 0 0       0 my $new_seed = ($cur_seed < 1073741781)
263             ? $cur_seed + ((abs($id) * 10000) % 1073741780)
264             : $cur_seed - ((abs($id) * 10000) % 1073741780);
265              
266 0         0 Math::Random::random_set_seed($new_seed, $new_seed);
267             }
268              
269 15 50       71 if ( $INC{'Math/Random/MT/Auto.pm'} ) {
270 0         0 my $cur_seed = Math::Random::MT::Auto::get_seed()->[0];
271 0 0       0 my $new_seed = ($cur_seed < 1073741781)
272             ? $cur_seed + ((abs($id) * 10000) % 1073741780)
273             : $cur_seed - ((abs($id) * 10000) % 1073741780);
274              
275 0         0 Math::Random::MT::Auto::set_seed($new_seed);
276             }
277              
278 15         579 _dispatch($mngd, $func, \@args);
279             }
280             }
281              
282 70 0 33     2447 $MCE::_GMUTEX->unlock() if ( $_tid && $MCE::_GMUTEX );
283              
284 70 50       364 CORE::kill($killed, $$) if $killed;
285              
286 70 50       6506 return $pid ? $self : undef;
287             }
288              
289             ###############################################################################
290             ## ----------------------------------------------------------------------------
291             ## Public methods.
292             ##
293             ###############################################################################
294              
295             sub equal {
296 0 0 0 0 1 0 return 0 unless ( ref $_[0] && ref $_[1] );
297 0 0       0 $_[0]->{WRK_ID} == $_[1]->{WRK_ID} ? 1 : 0;
298             }
299              
300             sub error {
301 31 50   31 1 485 _croak('Usage: $hobo->error()') unless ref( my $self = $_[0] );
302 31 50       124 $self->join() unless $self->{REAPED};
303 31 50       455 $self->{ERROR} || undef;
304             }
305              
306             sub exit {
307 10 50 33 10 1 445 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
308              
309 10 50       235 my ( $self ) = ( ref $_[0] ? shift : $_SELF );
310 10         120 my ( $pkg, $wrk_id ) = ( $self->{PKG}, $self->{WRK_ID} );
311              
312 10 50 33     350 if ( $wrk_id == $$ && $self->{MGR_ID} eq "$$.$_tid" ) {
    50          
313 0         0 MCE::Hobo->finish('MCE'); CORE::exit(@_);
  0         0  
314             }
315             elsif ( $wrk_id == $$ ) {
316 0   0     0 alarm 0; my ( $exit_status, @res ) = @_; $? = $exit_status || 0;
  0         0  
  0         0  
317 0 0       0 $_DATA->{$pkg}->set('R'.$wrk_id, @res ? $_freeze->(\@res) : '');
318 0         0 die "Hobo exited ($?)\n";
319 0         0 _exit($?); # not reached
320             }
321              
322 10 50       55 return $self if $self->{REAPED};
323              
324 10 50       105 if ( exists $_DATA->{$pkg} ) {
325 10         170 sleep 0.015 until $_DATA->{$pkg}->exists('S'.$wrk_id);
326             } else {
327 0         0 sleep 0.030;
328             }
329              
330 10 50       35 if ($_is_MSWin32) {
331 0 0       0 CORE::kill('KILL', $wrk_id) if CORE::kill('ZERO', $wrk_id);
332             } else {
333 10 50       1235 CORE::kill('QUIT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
334             }
335              
336 10         55 $self;
337             }
338              
339             sub finish {
340 7 50   7 1 4528 _croak('Usage: MCE::Hobo->finish()') if ref($_[0]);
341 7 50 33     125 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
342              
343 7 100       49 my $pkg = defined($_[0]) ? $_[0] : caller();
344              
345 7 100       97 if ( $pkg eq 'MCE' ) {
    100          
346 3         10 for my $key ( keys %{ $_LIST } ) { MCE::Hobo->finish($key); }
  3         56  
  3         82  
347             }
348             elsif ( exists $_LIST->{$pkg} ) {
349 3 50       33 return if $MCE::Signal::KILLED;
350              
351 3 50       18 if ( exists $_DELY->{$pkg} ) {
352 3         56 &_force_reap($pkg);
353             delete($_DELY->{$pkg}), delete($_DATA->{"$pkg:seed"}),
354             delete($_LIST->{$pkg}), delete($_DATA->{"$pkg:id"}),
355 3         212 delete($_MNGD->{$pkg}), delete($_DATA->{ $pkg });
356             }
357             }
358              
359 7         39 @_ = ();
360              
361 7         39 return;
362             }
363              
364             sub is_joinable {
365 24 50   24 1 176 _croak('Usage: $hobo->is_joinable()') unless ref( my $self = $_[0] );
366 24         80 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
367              
368 24 50       152 if ( $wrk_id == $$ ) {
    50          
369 0         0 '';
370             }
371             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
372 24 50       72 return '' if $self->{REAPED};
373 24         72 local $!;
374 24 50       352 ( waitpid($wrk_id, _WNOHANG) == 0 ) ? '' : do {
375 0 0       0 _reap_hobo($self, 0) unless $self->{REAPED};
376 0         0 1;
377             };
378             }
379             else {
380             _croak('Error: $hobo->is_joinable() not called by managed process')
381 0 0       0 if ( $self->{IGNORE} );
382              
383 0 0       0 return '' if $self->{REAPED};
384 0 0       0 $_DATA->{$pkg}->exists('R'.$wrk_id) ? 1 : '';
385             }
386             }
387              
388             sub is_running {
389 24 50   24 1 8832 _croak('Usage: $hobo->is_running()') unless ref( my $self = $_[0] );
390 24         64 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
391              
392 24 50       136 if ( $wrk_id == $$ ) {
    50          
393 0         0 1;
394             }
395             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
396 24 50       64 return '' if $self->{REAPED};
397 24         72 local $!;
398 24 50       456 ( waitpid($wrk_id, _WNOHANG) == 0 ) ? 1 : do {
399 0 0       0 _reap_hobo($self, 0) unless $self->{REAPED};
400 0         0 '';
401             };
402             }
403             else {
404             _croak('Error: $hobo->is_running() not called by managed process')
405 0 0       0 if ( $self->{IGNORE} );
406              
407 0 0       0 return '' if $self->{REAPED};
408 0 0       0 $_DATA->{$pkg}->exists('R'.$wrk_id) ? '' : 1;
409             }
410             }
411              
412             sub join {
413 54 50   54 1 40452 _croak('Usage: $hobo->join()') unless ref( my $self = $_[0] );
414 54         1077 my ( $wrk_id, $pkg ) = ( $self->{WRK_ID}, $self->{PKG} );
415              
416 54 50       251 if ( $self->{REAPED} ) {
417 0 0       0 _croak('Hobo already joined') unless exists( $self->{RESULT} );
418 0 0       0 $_LIST->{$pkg}->del($wrk_id) if ( exists $_LIST->{$pkg} );
419              
420             return ( defined wantarray )
421 0 0       0 ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
  0 0       0  
422             : ();
423             }
424              
425 54 50       686 if ( $wrk_id == $$ ) {
    50          
426 0         0 _croak('Cannot join self');
427             }
428             elsif ( $self->{MGR_ID} eq "$$.$_tid" ) {
429             # remove from list after reaping
430 54 50       174 if ( $_tid ) {
431 0         0 local $SIG{CHLD};
432 0         0 _reap_hobo($self, 1);
433 0         0 $_LIST->{$pkg}->del($wrk_id);
434             }
435             else {
436 54         1667 local ($SIG{CHLD}, $!);
437 54         43850396 waitpid($wrk_id, 0);
438 54         1274 _reap_hobo($self, 0);
439 54         475 $_LIST->{$pkg}->del($wrk_id);
440             }
441             }
442             else {
443             _croak('Error: $hobo->join() not called by managed process')
444 0 0       0 if ( $self->{IGNORE} );
445              
446 0         0 sleep 0.3 until ( $_DATA->{$pkg}->exists('R'.$wrk_id) );
447 0         0 _reap_hobo($self, 0);
448             }
449              
450 54 50       291 return unless ( exists $self->{RESULT} );
451              
452             ( defined wantarray )
453 54 50       795 ? wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1]
  0 100       0  
454             : ();
455             }
456              
457             sub kill {
458 5 50   5 1 185 _croak('Usage: $hobo->kill()') unless ref( my $self = $_[0] );
459 5         80 my ( $wrk_id, $pkg, $signal ) = ( $self->{WRK_ID}, $self->{PKG}, $_[1] );
460              
461 5 50       35 if ( $wrk_id == $$ ) {
462 0   0     0 CORE::kill($signal || 'INT', $$);
463 0         0 return $self;
464             }
465 5 50       55 if ( $self->{MGR_ID} eq "$$.$_tid" ) {
466 5 50       20 return $self if $self->{REAPED};
467 5 50       20 if ( exists $_DATA->{$pkg} ) {
468 5         30 sleep 0.015 until $_DATA->{$pkg}->exists('S'.$wrk_id);
469             } else {
470 0         0 sleep 0.030;
471             }
472             }
473              
474 5 50 50     215 CORE::kill($signal || 'INT', $wrk_id) if CORE::kill('ZERO', $wrk_id);
475              
476 5         80 $self;
477             }
478              
479             sub list {
480 8 50   8 1 4736 _croak('Usage: MCE::Hobo->list()') if ref($_[0]);
481 8         56 my $pkg = "$$.$_tid.".caller();
482              
483 8 50       72 ( exists $_LIST->{$pkg} ) ? $_LIST->{$pkg}->vals() : ();
484             }
485              
486             sub list_pids {
487 8 50   8 1 576 _croak('Usage: MCE::Hobo->list_pids()') if ref($_[0]);
488 8         304 my $pkg = "$$.$_tid.".caller(); local $_;
  8         88  
489              
490 8 50       320 ( exists $_LIST->{$pkg} ) ? map { $_->pid } $_LIST->{$pkg}->vals() : ();
  24         224  
491             }
492              
493             sub list_joinable {
494 8 50   8 1 4616 _croak('Usage: MCE::Hobo->list_joinable()') if ref($_[0]);
495 8         1080 my $pkg = "$$.$_tid.".caller();
496              
497 8 50       656 return () unless ( my $list = $_LIST->{$pkg} );
498 8         64 local ($!, $?, $_);
499              
500             map {
501 8 50       32 ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? () : do {
  24         280  
502 0 0       0 _reap_hobo($_, 0) unless $_->{REAPED};
503 0         0 $_;
504             };
505             }
506             $list->vals();
507             }
508              
509             sub list_running {
510 8 50   8 1 16424 _croak('Usage: MCE::Hobo->list_running()') if ref($_[0]);
511 8         72 my $pkg = "$$.$_tid.".caller();
512              
513 8 50       120 return () unless ( my $list = $_LIST->{$pkg} );
514 8         152 local ($!, $?, $_);
515              
516             map {
517 8 50       48 ( waitpid($_->{WRK_ID}, _WNOHANG) == 0 ) ? $_ : do {
  24         312  
518 0 0       0 _reap_hobo($_, 0) unless $_->{REAPED};
519 0         0 ();
520             };
521             }
522             $list->vals();
523             }
524              
525             sub max_workers {
526 17 50   17 1 76 _croak('Usage: MCE::Hobo->max_workers()') if ref($_[0]);
527 17   33     83 my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
528             # construct mngd internally on first use unless defined
529             init(); $_MNGD->{ "$$.$_tid.".caller() };
530             };
531 17 50       40 shift if ( $_[0] eq __PACKAGE__ );
532              
533 17 100       77 $mngd->{max_workers} = _max_workers(shift) if @_;
534 17         94 $mngd->{max_workers};
535             }
536              
537             sub pending {
538 8 50   8 1 4944 _croak('Usage: MCE::Hobo->pending()') if ref($_[0]);
539 8         48 my $pkg = "$$.$_tid.".caller();
540              
541 8 50       120 ( exists $_LIST->{$pkg} ) ? $_LIST->{$pkg}->len() : 0;
542             }
543              
544             sub pid {
545 31 50   31 1 242 ref($_[0]) ? $_[0]->{WRK_ID} : $_SELF->{WRK_ID};
546             }
547              
548             sub result {
549 7 50   7 1 132 _croak('Usage: $hobo->result()') unless ref( my $self = $_[0] );
550 7 50       32 return $self->join() unless $self->{REAPED};
551              
552 7 50       147 _croak('Hobo already joined') unless exists( $self->{RESULT} );
553 7 50       52 wantarray ? @{ delete $self->{RESULT} } : delete( $self->{RESULT} )->[-1];
  0         0  
554             }
555              
556             sub self {
557 0 0   0 1 0 ref($_[0]) ? $_[0] : $_SELF;
558             }
559              
560             sub wait_all {
561 1 50   1 1 70 _croak('Usage: MCE::Hobo->wait_all()') if ref($_[0]);
562 1         48 my $pkg = "$$.$_tid.".caller();
563              
564             return wantarray ? () : 0
565 1 0 33     153 if ( !exists $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
    50          
566              
567 1         10 local $_; ( wantarray )
568 0         0 ? map { $_->join(); $_ } $_LIST->{$pkg}->vals()
  0         0  
569 1 50       38 : map { $_->join(); () } $_LIST->{$pkg}->vals();
  3         32  
  3         21  
570             }
571              
572             *waitall = \&wait_all; # compatibility
573              
574             sub wait_one {
575 4 50   4 1 324 _croak('Usage: MCE::Hobo->wait_one()') if ref($_[0]);
576 4         160 my $pkg = "$$.$_tid.".caller();
577              
578             return undef
579 4 50 33     204 if ( !exists $_LIST->{$pkg} || !$_LIST->{$pkg}->len() );
580              
581 4         108 _wait_one($pkg);
582             }
583              
584             *waitone = \&wait_one; # compatibility
585              
586             sub yield {
587 0 0   0 1 0 _croak('Usage: MCE::Hobo->yield()') if ref($_[0]);
588 0 0 0     0 shift if ( defined $_[0] && $_[0] eq __PACKAGE__ );
589              
590 0   0     0 my $pkg = $_SELF->{PKG} || do {
591             my $mngd = $_MNGD->{ "$$.$_tid.".caller() } || do {
592             # construct mngd internally on first use unless defined
593             init(); $_MNGD->{ "$$.$_tid.".caller() };
594             };
595             $mngd->{PKG};
596             };
597              
598 0 0       0 return unless $_DELY->{$pkg};
599 0         0 my $seconds = $_DELY->{$pkg}->seconds(@_);
600              
601 0         0 MCE::Util::_sleep( $seconds );
602             }
603              
604             ###############################################################################
605             ## ----------------------------------------------------------------------------
606             ## Private methods.
607             ##
608             ###############################################################################
609              
610             sub _croak {
611 0 0   0   0 if ( $INC{'MCE.pm'} ) {
612 0         0 goto &MCE::_croak;
613             }
614             else {
615 0         0 $SIG{__DIE__} = \&MCE::Signal::_die_handler;
616 0         0 $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
617              
618 0         0 $\ = undef; goto &Carp::croak;
  0         0  
619             }
620             }
621              
622             sub _dispatch {
623 15     15   155 my ( $mngd, $func, $args ) = @_;
624              
625 15         440 $mngd->{WRK_ID} = $_SELF->{WRK_ID} = $$, $? = 0;
626 15 50       132 $ENV{PERL_MCE_IPC} = 'win32' if $_is_MSWin32;
627              
628             {
629 15         85 local $!;
  15         471  
630 15 50       783 (*STDERR)->autoflush(1) if defined( fileno *STDERR );
631 15 50       3579 (*STDOUT)->autoflush(1) if defined( fileno *STDOUT );
632             }
633              
634             # Run task.
635             my $hobo_timeout = ( exists $_SELF->{hobo_timeout} )
636 15 50       670 ? $_SELF->{hobo_timeout} : $mngd->{hobo_timeout};
637              
638             my $void_context = ( exists $_SELF->{void_context} )
639 15 50       277 ? $_SELF->{void_context} : $mngd->{void_context};
640              
641 15         55 my @res; my $timed_out = 0;
  15         49  
642              
643             local $SIG{'ALRM'} = sub {
644 0     0   0 alarm 0; $timed_out = 1; $SIG{__WARN__} = sub {};
  0         0  
  0         0  
645 0         0 die "Hobo timed out\n";
646 15         940 };
647              
648 15 50 33     656 if ( $void_context || $_SELF->{IGNORE} ) {
649 18     18   174 no strict 'refs';
  18         81  
  18         881  
650 0   0     0 eval { alarm($hobo_timeout || 0); $func->(@{ $args }) };
  0         0  
  0         0  
  0         0  
651             }
652             else {
653 18     18   224 no strict 'refs';
  18         36  
  18         3507  
654 14   50     129 @res = eval { alarm($hobo_timeout || 0); $func->(@{ $args }) };
  14         1078  
  14         53  
  14         193  
655             }
656              
657 12         25040 alarm 0;
658 12 50       197 $@ = "Hobo timed out" if $timed_out;
659              
660 12 50       166 if ( $@ ) {
661 0 0       0 _exit($?) if ( $@ =~ /^Hobo exited \(\S+\)$/ );
662 0         0 my $err = $@; $? = 1; $err =~ s/, <__ANONIO__> line \d+//;
  0         0  
  0         0  
663              
664 0 0       0 if ( ! $_SELF->{IGNORE} ) {
665             $_DATA->{ $_SELF->{PKG} }->set('S'.$$, $err),
666 0         0 $_DATA->{ $_SELF->{PKG} }->set('R'.$$, '');
667             }
668              
669 0 0 0     0 if ( !$timed_out && !$mngd->{on_finish} && !$INC{'MCE/Simple.pm'} ) {
      0        
670 18     18   11227 use bytes; warn "Hobo $$ terminated abnormally: reason $err\n";
  18         259  
  18         91  
  0         0  
671             }
672             }
673             else {
674 12 50       124 shift(@res) if ref($res[0]) =~ /^MCE::(?:Barrier|Semaphore)::_guard/s;
675             $_DATA->{ $_SELF->{PKG} }->set('R'.$$, @res ? $_freeze->(\@res) : '')
676 12 50       719 if ( ! $_SELF->{IGNORE} );
    50          
677             }
678              
679 12         118 _exit($?);
680             }
681              
682             sub _exit {
683 15     15   93 my ( $exit_status ) = @_;
684              
685             # Check for nested workers not yet joined.
686 15 50 66     253 MCE::Hobo->finish('MCE') if ( !$_SELF->{SIGNALED} && keys %{ $_LIST } );
  12         169  
687              
688             # Exit hobo process.
689 15 50   0   505 $SIG{__DIE__} = sub {} unless $_tid;
690 15     0   470 $SIG{__WARN__} = sub {};
691              
692 15 50 33     200 threads->exit($exit_status) if ( $INC{'threads.pm'} && $_is_MSWin32 );
693              
694             my $posix_exit = ( exists $_SELF->{posix_exit} )
695 15 50       126 ? $_SELF->{posix_exit} : $_MNGD->{ $_SELF->{PKG} }{posix_exit};
696              
697 15 0 33     152 if ( $posix_exit && !$_SELF->{SIGNALED} && !$_is_MSWin32 ) {
      33        
698 0         0 eval { MCE::Mutex::Channel::_destroy() };
  0         0  
699 0 0       0 POSIX::_exit($exit_status) if $INC{'POSIX.pm'};
700 0         0 CORE::kill('KILL', $$);
701             }
702              
703 15         6026 CORE::exit($exit_status);
704             }
705              
706             sub _force_reap {
707 10     10   47 my ( $count, $pkg ) = ( 0, @_ );
708 10 50 33     178 return unless ( exists $_LIST->{$pkg} && $_LIST->{$pkg}->len() );
709              
710 0         0 for my $hobo ( $_LIST->{$pkg}->vals() ) {
711 0 0       0 next if $hobo->{IGNORE};
712              
713 0 0       0 if ( $hobo->is_running() ) {
714 0 0       0 sleep(0.015), CORE::kill('KILL', $hobo->pid())
715             if CORE::kill('ZERO', $hobo->pid());
716 0         0 $count++;
717             }
718             }
719              
720 0         0 $_LIST->{$pkg}->clear();
721              
722 0 0 0     0 warn "Finished with active hobo processes [$pkg] ($count)\n"
723             if ( $count && !$_is_MSWin32 );
724              
725 0         0 return;
726             }
727              
728             sub _quit {
729 3 50   3   2879 return MCE::Signal::defer($_[0]) if $MCE::Signal::IPC;
730              
731 3         21 alarm 0; my ( $name ) = @_;
  3         14  
732 3         76 $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
733              
734       0     $SIG{$name} = sub {}, CORE::kill($name, -$$)
735 3 50       124 if ( exists $SIG{$name} );
736              
737 3 50       32 if ( ! $_SELF->{IGNORE} ) {
738 3         25 my ( $pkg, $wrk_id ) = ( $_SELF->{PKG}, $_SELF->{WRK_ID} );
739 3         21 $_DATA->{$pkg}->set('R'.$wrk_id, '');
740             }
741              
742 3         19 _exit(0);
743             }
744              
745             sub _reap_hobo {
746 58     58   275 my ( $hobo, $wait_flag ) = @_;
747 58 50       469 return unless $hobo;
748              
749 58         873 local @_ = $_DATA->{ $hobo->{PKG} }->get($hobo->{WRK_ID}, $wait_flag);
750              
751 58 100 50     3463 ( $hobo->{ERROR}, $hobo->{RESULT}, $hobo->{REAPED} ) =
752             ( pop || '', length $_[0] ? $_thaw->(pop) : [], 1 );
753              
754 58 50       341 return if $hobo->{IGNORE};
755              
756 58   50     637 my ( $exit, $err ) = ( $? || 0, $hobo->{ERROR} );
757 58         276 my ( $code, $sig ) = ( $exit >> 8, $exit & 0x7f );
758              
759 58 50 33     255 if ( $code > 100 && !$err ) {
760 0 0       0 $code = 2, $sig = 1, $err = 'Hobo received SIGHUP' if $code == 101;
761 0 0       0 $code = 2, $sig = 2, $err = 'Hobo received SIGINT' if $code == 102;
762 0 0       0 $code = 2, $sig = 6, $err = 'Hobo received SIGABRT' if $code == 106;
763 0 0       0 $code = 2, $sig = 11, $err = 'Hobo received SIGSEGV' if $code == 111;
764 0 0       0 $code = 2, $sig = 15, $err = 'Hobo received SIGTERM' if $code == 115;
765              
766 0         0 $hobo->{ERROR} = $err;
767             }
768              
769 58 100       318 if ( my $on_finish = $_MNGD->{ $hobo->{PKG} }{on_finish} ) {
770             $on_finish->(
771             $hobo->{WRK_ID}, $code, $hobo->{ident}, $sig, $err,
772 7         56 @{ $hobo->{RESULT} }
  7         55  
773             );
774             }
775              
776 58         335 return;
777             }
778              
779             sub _trap {
780 0 0   0   0 return MCE::Signal::defer($_[0]) if $MCE::Signal::IPC;
781              
782 0         0 alarm 0; my ( $exit_status, $name ) = ( 2, @_ );
  0         0  
783 0         0 $_SELF->{SIGNALED} = 1, $name =~ s/^SIG//;
784              
785       0     $SIG{$name} = sub {}, CORE::kill($name, -$$)
786 0 0       0 if ( exists $SIG{$name} );
787              
788 0 0       0 if ( $name eq 'HUP' ) { $exit_status = 101 }
  0 0       0  
    0          
    0          
    0          
789 0         0 elsif ( $name eq 'INT' ) { $exit_status = 102 }
790 0         0 elsif ( $name eq 'ABRT' ) { $exit_status = 106 }
791 0         0 elsif ( $name eq 'SEGV' ) { $exit_status = 111 }
792 0         0 elsif ( $name eq 'TERM' ) { $exit_status = 115 }
793              
794 0 0       0 if ( ! $_SELF->{IGNORE} ) {
795 0         0 my ( $pkg, $wrk_id ) = ( $_SELF->{PKG}, $_SELF->{WRK_ID} );
796 0         0 $_DATA->{$pkg}->set('R'.$wrk_id, '');
797             }
798              
799 0         0 _exit($exit_status);
800             }
801              
802             sub _wait_one {
803 4     4   16 my ( $pkg ) = @_;
804 4         44 my ( $list, $self, $wrk_id ) = ( $_LIST->{$pkg} ); local $!;
  4         156  
805              
806 4         40 while () {
807 236         5364 for my $hobo ( $list->vals() ) {
808 236         1040 $wrk_id = $hobo->{WRK_ID};
809 236 50       1276 return $list->del($wrk_id) if $hobo->{REAPED};
810 236 100       5848 $self = $list->del($wrk_id), last if waitpid($wrk_id, _WNOHANG);
811             }
812 236 100       1020 last if $self;
813 232         6999956 sleep 0.030;
814             }
815              
816 4         48 _reap_hobo($self, 0);
817              
818 4         48 $self;
819             }
820              
821             ###############################################################################
822             ## ----------------------------------------------------------------------------
823             ## Delay implementation suited for MCE::Hobo.
824             ##
825             ###############################################################################
826              
827             package # hide from rpm
828             MCE::Hobo::_delay;
829              
830             sub new {
831 18     18   97 my ( $class, $chnl, $delay ) = @_;
832              
833 18 50       108 if ( !defined $delay ) {
834 18 50       173 $delay = ($^O =~ /mswin|mingw|msys|cygwin/i) ? 0.015 : 0.008;
835             }
836              
837 18         268 $chnl->send(undef);
838              
839 18         1079 bless [ $delay, $chnl ], $class;
840             }
841              
842             sub seconds {
843 0     0   0 my ( $self, $how_long ) = @_;
844 0 0       0 my $delay = defined($how_long) ? $how_long : $self->[0];
845 0         0 my $lapse = $self->[1]->recv();
846 0         0 my $time = MCE::Util::_time();
847              
848 0 0 0     0 if ( !$delay || !defined $lapse ) {
    0          
849 0         0 $lapse = $time;
850             }
851             elsif ( $lapse + $delay - $time < 0 ) {
852 0         0 $lapse += int( abs($time - $lapse) / $delay + 0.5 ) * $delay;
853             }
854              
855 0         0 $self->[1]->send( $lapse += $delay );
856              
857 0         0 return $lapse - $time;
858             }
859              
860             ###############################################################################
861             ## ----------------------------------------------------------------------------
862             ## Hash and ordhash implementations suited for MCE::Hobo.
863             ##
864             ###############################################################################
865              
866             package # hide from rpm
867             MCE::Hobo::_hash;
868              
869 18     18   41793 use MCE::Shared ();
  18         53  
  18         527  
870 18     18   127 use Time::HiRes 'sleep';
  18         36  
  18         121  
871              
872             use constant {
873 18 50       15223 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
874             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
875 18     18   2163 };
  18         36  
876              
877             sub new {
878 18     18   680 bless \ MCE::Shared->share({ module => 'MCE::Shared::Hash' }), shift;
879             }
880              
881 7     7   25 sub clear { ${ $_[0] }->clear(); }
  7         197  
882 15     15   60 sub exists { ${ $_[0] }->exists($_[1]); }
  15         530  
883 30     30   325 sub set { ${ $_[0] }->set($_[1], $_[2]); }
  30         700  
884              
885             sub get {
886 58     58   240 my ( $self, $wrk_id, $wait_flag ) = @_;
887              
888 58 50       266 if ( $wait_flag ) {
889 0         0 local $!;
890 0 0       0 ( ${ $self }->exists('R'.$wrk_id) ) ? waitpid($wrk_id, 0) : do {
  0         0  
891 0         0 while () {
892 0 0       0 if ( ! ${ $self }->exists('R'.$wrk_id) ) {
  0         0  
893 0 0       0 last if waitpid($wrk_id, _WNOHANG);
894 0         0 sleep(0.030), next;
895             }
896 0         0 waitpid($wrk_id, 0), last;
897             }
898             };
899             }
900              
901 58         193 ${ $self }->_get_hobo_data($wrk_id);
  58         1000  
902             }
903              
904             package # hide from rpm
905             MCE::Hobo::_ordhash;
906              
907 18     18   258 sub new { bless [ {}, [], {}, 0 ], shift; } # data, keys, indx, gcnt
908 0     0   0 sub exists { CORE::exists $_[0]->[0]{ $_[1] }; }
909 0     0   0 sub get { $_[0]->[0]{ $_[1] }; }
910 23     23   99 sub len { scalar keys %{ $_[0]->[0] }; }
  23         270  
911              
912             sub clear {
913 0     0   0 my ( $self ) = @_;
914 0         0 %{ $self->[0] } = @{ $self->[1] } = %{ $self->[2] } = (), $self->[3] = 0;
  0         0  
  0         0  
  0         0  
915              
916 0         0 return;
917             }
918              
919             sub del {
920 58     58   347 my ( $self, $key ) = @_;
921 58 50       402 return undef unless defined( my $off = delete $self->[2]{$key} );
922              
923             # tombstone
924 58         218 $self->[1][$off] = undef;
925              
926             # GC keys and refresh index
927 58 100       215 if ( ++$self->[3] > @{ $self->[1] } * 0.667 ) {
  58         493  
928 28         109 my ( $keys, $indx ) = ( $self->[1], $self->[2] );
929 28         108 my $i; $i = $self->[3] = 0;
  28         148  
930 28         83 for my $k ( @{ $keys } ) {
  28         414  
931 58 50       302 $keys->[$i] = $k, $indx->{$k} = $i++ if defined($k);
932             }
933 28         88 splice @{ $keys }, $i;
  28         220  
934             }
935              
936 58         1708 delete $self->[0]{$key};
937             }
938              
939             sub set {
940 70     70   1986 my ( $self, $key ) = @_;
941 70 50       1679 $self->[0]{$key} = $_[2], return 1 if exists($self->[0]{$key});
942              
943 70         267 $self->[2]{$key} = @{ $self->[1] }; push @{ $self->[1] }, $key;
  70         2238  
  70         413  
  70         1207  
944 70         1071 $self->[0]{$key} = $_[2];
945              
946 70         248 return 1;
947             }
948              
949             sub vals {
950 269     269   1947 my ( $self ) = @_;
951             $self->[3]
952 0         0 ? @{ $self->[0] }{ grep defined($_), @{ $self->[1] } }
  0         0  
953 269 50       1820 : @{ $self->[0] }{ @{ $self->[1] } };
  269         2908  
  269         1098  
954             }
955              
956             1;
957              
958             __END__