File Coverage

blib/lib/MCE.pm
Criterion Covered Total %
statement 602 1075 56.0
branch 231 760 30.3
condition 87 369 23.5
subroutine 53 75 70.6
pod 0 20 0.0
total 973 2299 42.3


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## MCE - Many-Core Engine for Perl providing parallel processing capabilities.
4             ##
5             ###############################################################################
6              
7             package MCE;
8              
9 85     85   1791464 use strict;
  85         327  
  85         2204  
10 85     85   332 use warnings;
  85         119  
  85         2204  
11              
12 85     85   329 no warnings qw( threads recursion uninitialized );
  85         123  
  85         2910  
13              
14             our $VERSION = '1.887';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 85     85   512 use Carp ();
  85         139  
  85         18398  
21              
22             my ($_has_threads, $_freeze, $_thaw, $_tid, $_oid);
23              
24             BEGIN {
25 85     85   244 local $@;
26              
27 85 50 33     920 if ( $^O eq 'MSWin32' && ! $INC{'threads.pm'} ) {
    50 33        
28 0         0 eval 'use threads; use threads::shared;';
29             }
30             elsif ( $INC{'threads.pm'} && ! $INC{'threads/shared.pm'} ) {
31 0         0 eval 'use threads::shared;';
32             }
33              
34 85 50       391 $_has_threads = $INC{'threads.pm'} ? 1 : 0;
35 85 50       300 $_tid = $_has_threads ? threads->tid() : 0;
36 85         311 $_oid = "$$.$_tid";
37              
38 85 50 33     622 if ( $] ge '5.008008' && ! $INC{'PDL.pm'} ) {
39 85     85   6686 eval 'use Sereal::Encoder 3.015; use Sereal::Decoder 3.015;';
  85     85   496  
  85         2343  
  85         3639  
  85         462  
  85         1545  
  85         2352  
40 85 50       363 if ( ! $@ ) {
41 85         779 my $_encoder_ver = int( Sereal::Encoder->VERSION() );
42 85         545 my $_decoder_ver = int( Sereal::Decoder->VERSION() );
43 85 50       303 if ( $_encoder_ver - $_decoder_ver == 0 ) {
44 85         161 $_freeze = \&Sereal::Encoder::encode_sereal;
45 85         146 $_thaw = \&Sereal::Decoder::decode_sereal;
46             }
47             }
48             }
49              
50 85 50       1882 if ( ! defined $_freeze ) {
51 0         0 require Storable;
52 0         0 $_freeze = \&Storable::freeze;
53 0         0 $_thaw = \&Storable::thaw;
54             }
55             }
56              
57 85     85   39789 use IO::Handle ();
  85         433875  
  85         1970  
58 85     85   452 use Scalar::Util qw( looks_like_number refaddr reftype weaken );
  85         138  
  85         4288  
59 85     85   39591 use Socket qw( SOL_SOCKET SO_RCVBUF );
  85         252858  
  85         11877  
60 85     85   521 use Time::HiRes qw( sleep time );
  85         157  
  85         637  
61              
62 85     85   40400 use MCE::Util qw( $LF );
  85         199  
  85         7094  
63 85     85   31767 use MCE::Signal ();
  85         197  
  85         1801  
64 85     85   30381 use MCE::Mutex ();
  85         163  
  85         19143  
65              
66             our ($MCE, $RLA, $_que_template, $_que_read_size);
67             our (%_valid_fields_new);
68              
69             my ($TOP_HDLR, $_is_MSWin32, $_is_winenv, $_prev_mce);
70             my (%_valid_fields_task, %_params_allowed_args);
71              
72             BEGIN {
73             ## Configure pack/unpack template for writing to and from the queue.
74             ## Each entry contains 2 positive numbers: chunk_id & msg_id.
75             ## Check for >= 64-bit, otherwize fall back to machine's word length.
76              
77 85     85   278 $_que_template = ( ( log(~0+1) / log(2) ) >= 64 ) ? 'Q2' : 'I2';
78 85         357 $_que_read_size = length pack($_que_template, 0, 0);
79              
80             ## Attributes used internally.
81             ## _abort_msg _caller _chn _com_lock _dat_lock _mgr_live _rla_data _seed
82             ## _chunk_id _pids _run_mode _single_dim _thrs _tids _task_wid _wid _wuf
83             ## _exiting _exit_pid _last_sref _total_exited _total_running _total_workers
84             ## _send_cnt _sess_dir _spawned _state _status _task _task_id _wrk_status
85             ## _init_pid _init_total_workers _pids_t _pids_w _pids_c _relayed
86             ##
87             ## _bsb_r_sock _bsb_w_sock _com_r_sock _com_w_sock _dat_r_sock _dat_w_sock
88             ## _que_r_sock _que_w_sock _rla_r_sock _rla_w_sock _data_channels
89             ## _lock_chn _mutex_n
90              
91 85         168 %_valid_fields_new = map { $_ => 1 } qw(
  3230         5527  
92             max_workers tmp_dir use_threads user_tasks task_end task_name freeze thaw
93             chunk_size input_data sequence job_delay spawn_delay submit_delay RS
94             flush_file flush_stderr flush_stdout stderr_file stdout_file use_slurpio
95             interval user_args user_begin user_end user_func user_error user_output
96             bounds_only gather init_relay on_post_exit on_post_run parallel_io
97             loop_timeout max_retries progress posix_exit
98             );
99 85         311 %_params_allowed_args = map { $_ => 1 } qw(
  2465         2977  
100             chunk_size input_data sequence job_delay spawn_delay submit_delay RS
101             flush_file flush_stderr flush_stdout stderr_file stdout_file use_slurpio
102             interval user_args user_begin user_end user_func user_error user_output
103             bounds_only gather init_relay on_post_exit on_post_run parallel_io
104             loop_timeout max_retries progress
105             );
106 85         266 %_valid_fields_task = map { $_ => 1 } qw(
  1530         2058  
107             max_workers chunk_size input_data interval sequence task_end task_name
108             bounds_only gather init_relay user_args user_begin user_end user_func
109             RS parallel_io use_slurpio use_threads
110             );
111              
112 85 50       554 $_is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
113 85 50       473 $_is_winenv = ( $^O =~ /mswin|mingw|msys|cygwin/i ) ? 1 : 0;
114              
115             ## Create accessor functions.
116 85     85   543 no strict 'refs'; no warnings 'redefine';
  85     85   102  
  85         3351  
  85         432  
  85         192  
  85         20721  
117              
118 85         1994 for my $_p (qw( chunk_size max_retries max_workers task_name user_args )) {
119 425         1663 *{ $_p } = sub () {
120 99 100   99   352 my $self = shift; $self = $MCE unless ref($self);
  99         170  
121 99         440 return $self->{$_p};
122 425         1423 };
123             }
124 85         153 for my $_p (qw( chunk_id task_id task_wid wid )) {
125 340         911 *{ $_p } = sub () {
126 63 100   63   916 my $self = shift; $self = $MCE unless ref($self);
  63         330  
127 63         1277 return $self->{"_${_p}"};
128 340         841 };
129             }
130 85         148 for my $_p (qw( freeze thaw )) {
131 170         709 *{ $_p } = sub () {
132 208 100   208   355 my $self = shift; $self = $MCE unless ref($self);
  208         714  
133 208         2496 return $self->{$_p}(@_);
134 170         457 };
135             }
136              
137 85         155 $RLA = {};
138              
139 85         2885 return;
140             }
141              
142             ###############################################################################
143             ## ----------------------------------------------------------------------------
144             ## Import routine.
145             ##
146             ###############################################################################
147              
148 85     85   665 use constant { SELF => 0, CHUNK => 1, CID => 2 };
  85         199  
  85         8507  
149              
150 85     85   40002 our $_MCE_LOCK : shared = 1;
  85         85757  
  85         407  
151 85     85   5078 our $_WIN_LOCK : shared = 1;
  85         121  
  85         235  
152              
153             my ($_def, $_imported) = ({});
154              
155             sub import {
156 98     98   649 my ($_class, $_pkg) = (shift, caller);
157 98         907 my $_p = $_def->{$_pkg} = {};
158              
159             ## Process module arguments.
160 98         465 while ( my $_argument = shift ) {
161 0         0 my $_arg = lc $_argument;
162              
163 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
164 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
165 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
166 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
167 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
168 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
169 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
170              
171 0 0 0     0 if ( $_arg eq 'export_const' || $_arg eq 'const' ) {
172 0 0       0 if ( shift eq '1' ) {
173 85     85   11864 no strict 'refs'; no warnings 'redefine';
  85     85   247  
  85         2405  
  85         377  
  85         122  
  85         25820  
174 0         0 *{ $_pkg.'::SELF' } = \&SELF;
  0         0  
175 0         0 *{ $_pkg.'::CHUNK' } = \&CHUNK;
  0         0  
176 0         0 *{ $_pkg.'::CID' } = \&CID;
  0         0  
177             }
178 0         0 next;
179             }
180              
181             ## Sereal, if available, is used automatically by MCE 1.800 onwards.
182 0 0       0 if ( $_arg eq 'sereal' ) {
183 0 0       0 if ( shift eq '0' ) {
184 0         0 require Storable;
185 0         0 $_p->{FREEZE} = \&Storable::freeze;
186 0         0 $_p->{THAW} = \&Storable::thaw;
187             }
188 0         0 next;
189             }
190              
191 0         0 _croak("Error: ($_argument) invalid module option");
192             }
193              
194 98 100       1969 return if $_imported++;
195              
196             ## Instantiate a module-level instance.
197 85         389 $MCE = MCE->new( _module_instance => 1, max_workers => 0 );
198              
199 85         7682 return;
200             }
201              
202             ###############################################################################
203             ## ----------------------------------------------------------------------------
204             ## Define constants & variables.
205             ##
206             ###############################################################################
207              
208             use constant {
209              
210             # Max data channels. This cannot be greater than 8 on MSWin32.
211 85         77274 DATA_CHANNELS => 8,
212              
213             # Max GC size. Undef variable when exceeding size.
214             MAX_GC_SIZE => 1024 * 1024 * 64,
215              
216             MAX_RECS_SIZE => 8192, # Reads number of records if N <= value
217             # Reads number of bytes if N > value
218              
219             OUTPUT_W_ABT => 'W~ABT', # Worker has aborted
220             OUTPUT_W_DNE => 'W~DNE', # Worker has completed
221             OUTPUT_W_RLA => 'W~RLA', # Worker has relayed
222             OUTPUT_W_EXT => 'W~EXT', # Worker has exited
223             OUTPUT_A_REF => 'A~REF', # Input << Array ref
224             OUTPUT_G_REF => 'G~REF', # Input << Glob ref
225             OUTPUT_H_REF => 'H~REF', # Input << Hash ref
226             OUTPUT_I_REF => 'I~REF', # Input << Iter ref
227             OUTPUT_A_CBK => 'A~CBK', # Callback w/ multiple args
228             OUTPUT_N_CBK => 'N~CBK', # Callback w/ no args
229             OUTPUT_A_GTR => 'A~GTR', # Gather data
230             OUTPUT_O_SND => 'O~SND', # Send >> STDOUT
231             OUTPUT_E_SND => 'E~SND', # Send >> STDERR
232             OUTPUT_F_SND => 'F~SND', # Send >> File
233             OUTPUT_D_SND => 'D~SND', # Send >> File descriptor
234             OUTPUT_B_SYN => 'B~SYN', # Barrier sync - begin
235             OUTPUT_E_SYN => 'E~SYN', # Barrier sync - end
236             OUTPUT_S_IPC => 'S~IPC', # Change to win32 IPC
237             OUTPUT_C_NFY => 'C~NFY', # Chunk ID notification
238             OUTPUT_P_NFY => 'P~NFY', # Progress notification
239             OUTPUT_R_NFY => 'R~NFY', # Relay notification
240             OUTPUT_S_DIR => 'S~DIR', # Make/get sess_dir
241             OUTPUT_T_DIR => 'T~DIR', # Make/get tmp_dir
242             OUTPUT_I_DLY => 'I~DLY', # Interval delay
243              
244             READ_FILE => 0, # Worker reads file handle
245             READ_MEMORY => 1, # Worker reads memory handle
246              
247             REQUEST_ARRAY => 0, # Worker requests next array chunk
248             REQUEST_GLOB => 1, # Worker requests next glob chunk
249             REQUEST_HASH => 2, # Worker requests next hash chunk
250              
251             SENDTO_FILEV1 => 0, # Worker sends to 'file', $a, '/path'
252             SENDTO_FILEV2 => 1, # Worker sends to 'file:/path', $a
253             SENDTO_STDOUT => 2, # Worker sends to STDOUT
254             SENDTO_STDERR => 3, # Worker sends to STDERR
255             SENDTO_FD => 4, # Worker sends to file descriptor
256              
257             WANTS_UNDEF => 0, # Callee wants nothing
258             WANTS_ARRAY => 1, # Callee wants list
259             WANTS_SCALAR => 2, # Callee wants scalar
260 85     85   537 };
  85         134  
261              
262             my $_mce_count = 0;
263              
264             sub CLONE {
265 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
266             }
267              
268             sub DESTROY {
269 290 0 33 290   818 CORE::kill('KILL', $$)
270             if ( $_is_MSWin32 && $MCE::Signal::KILLED );
271              
272             $_[0]->shutdown(1)
273 290 0 33     2160 if ( $_[0] && $_[0]->{_spawned} && $_[0]->{_init_pid} eq "$$.$_tid" &&
      33        
      33        
274             !$MCE::Signal::KILLED );
275              
276 290         9556 return;
277             }
278              
279             END {
280 85 100   85   70675 return unless ( defined $MCE );
281              
282 20 50       199 my $_pid = $MCE->{_is_thread} ? $$ .'.'. threads->tid() : $$;
283 20 50 33     137 $MCE->exit if ( exists $MCE->{_wuf} && $MCE->{_pid} eq $_pid );
284              
285 20         86 _end();
286             }
287              
288             sub _end {
289 85 100   85   1530 MCE::Flow->finish ( 'MCE' ) if $INC{'MCE/Flow.pm'};
290 85 100       482 MCE::Grep->finish ( 'MCE' ) if $INC{'MCE/Grep.pm'};
291 85 100       631 MCE::Loop->finish ( 'MCE' ) if $INC{'MCE/Loop.pm'};
292 85 100       596 MCE::Map->finish ( 'MCE' ) if $INC{'MCE/Map.pm'};
293 85 100       710 MCE::Step->finish ( 'MCE' ) if $INC{'MCE/Step.pm'};
294 85 100       618 MCE::Stream->finish ( 'MCE' ) if $INC{'MCE/Stream.pm'};
295              
296 85         467 $MCE = $TOP_HDLR = undef;
297             }
298              
299             ###############################################################################
300             ## ----------------------------------------------------------------------------
301             ## Plugin interface for external modules plugging into MCE, e.g. MCE::Queue.
302             ##
303             ###############################################################################
304              
305             my (%_plugin_function, @_plugin_loop_begin, @_plugin_loop_end);
306             my (%_plugin_list, @_plugin_worker_init);
307              
308             sub _attach_plugin {
309 45     45   97 my $_ext_module = caller;
310              
311 45 50       203 unless (exists $_plugin_list{$_ext_module}) {
312 45         113 $_plugin_list{$_ext_module} = undef;
313              
314 45         98 my $_ext_output_function = $_[0];
315 45         60 my $_ext_output_loop_begin = $_[1];
316 45         74 my $_ext_output_loop_end = $_[2];
317 45         69 my $_ext_worker_init = $_[3];
318              
319 45 50       151 if (ref $_ext_output_function eq 'HASH') {
320 45         244 for my $_p (keys %{ $_ext_output_function }) {
  45         216  
321             $_plugin_function{$_p} = $_ext_output_function->{$_p}
322 545 50       919 unless (exists $_plugin_function{$_p});
323             }
324             }
325              
326 45 50       286 push @_plugin_loop_begin, $_ext_output_loop_begin
327             if (ref $_ext_output_loop_begin eq 'CODE');
328 45 50       150 push @_plugin_loop_end, $_ext_output_loop_end
329             if (ref $_ext_output_loop_end eq 'CODE');
330 45 100       164 push @_plugin_worker_init, $_ext_worker_init
331             if (ref $_ext_worker_init eq 'CODE');
332             }
333              
334 45         98 @_ = ();
335              
336 45         95 return;
337             }
338              
339             ## Functions for saving and restoring $MCE.
340             ## Called by MCE::{ Flow, Grep, Loop, Map, Step, and Stream }.
341              
342             sub _save_state {
343 182     182   336 $_prev_mce = $MCE; $MCE = $_[0];
  182         374  
344 182         307 return;
345             }
346             sub _restore_state {
347 129     129   1096 $_prev_mce->{_wrk_status} = $MCE->{_wrk_status};
348 129         258 $MCE = $_prev_mce; $_prev_mce = undef;
  129         495  
349 129         279 return;
350             }
351              
352             ###############################################################################
353             ## ----------------------------------------------------------------------------
354             ## New instance instantiation.
355             ##
356             ###############################################################################
357              
358             sub _croak {
359 0 0 0 0   0 if (MCE->wid == 0 || ! $^S) {
360 0         0 $SIG{__DIE__} = \&MCE::Signal::_die_handler;
361 0         0 $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
362             }
363 0         0 $\ = undef; goto &Carp::croak;
  0         0  
364             }
365              
366             sub _relay (;&) {
367 0     0   0 goto &MCE::relay;
368             }
369              
370 85     85   34024 use MCE::Core::Validation ();
  85         169  
  85         1617  
371 85     85   35958 use MCE::Core::Manager ();
  85         203  
  85         1870  
372 85     85   34206 use MCE::Core::Worker ();
  85         212  
  85         180581  
373              
374             sub new {
375 319     319 0 32143 my ($class, %self) = @_;
376 319 100       1305 my $_pkg = exists $self{pkg} ? delete $self{pkg} : caller;
377              
378 319         616 @_ = ();
379              
380 319   33     1712 bless(\%self, ref($class) || $class);
381              
382 319   100     1476 $self{task_name} ||= 'MCE';
383 319   50     1549 $self{max_workers} ||= $_def->{$_pkg}{MAX_WORKERS} || 1;
      66        
384 319   50     2478 $self{chunk_size} ||= $_def->{$_pkg}{CHUNK_SIZE} || 1;
      33        
385 319   66     4255 $self{tmp_dir} ||= $_def->{$_pkg}{TMP_DIR} || $MCE::Signal::tmp_dir;
      66        
386 319   33     2313 $self{freeze} ||= $_def->{$_pkg}{FREEZE} || $_freeze;
      33        
387 319   33     2030 $self{thaw} ||= $_def->{$_pkg}{THAW} || $_thaw;
      33        
388              
389             $self{init_relay} = $_def->{$_pkg}{INIT_RELAY}
390 319 50       811 if (exists $_def->{$_pkg}{INIT_RELAY});
391              
392             $self{use_threads} = $_def->{$_pkg}{USE_THREADS}
393 319 50       680 if (exists $_def->{$_pkg}{USE_THREADS});
394              
395 319 100       784 if (exists $self{_module_instance}) {
396 85         225 $self{_init_total_workers} = $self{max_workers};
397 85         205 $self{_chunk_id} = $self{_task_wid} = $self{_wrk_status} = 0;
398 85         174 $self{_spawned} = $self{_task_id} = $self{_wid} = 0;
399 85         401 $self{_init_pid} = "$$.$_tid";
400              
401 85         261 return \%self;
402             }
403              
404 234         908 _sendto_fhs_close();
405              
406 234         960 for my $_p (keys %self) {
407             _croak("MCE::new: ($_p) is not a valid constructor argument")
408 1637 50       2907 unless (exists $_valid_fields_new{$_p});
409             }
410              
411 234         1341 $self{_caller} = $_pkg, $self{_init_pid} = "$$.$_tid";
412              
413 234 50       626 if (defined $self{use_threads}) {
414 0 0 0     0 if (!$_has_threads && $self{use_threads}) {
415 0         0 my $_msg = "\n";
416 0         0 $_msg .= "## Please include threads support prior to loading MCE\n";
417 0         0 $_msg .= "## when specifying use_threads => $self{use_threads}\n";
418 0         0 $_msg .= "\n";
419              
420 0         0 _croak($_msg);
421             }
422             }
423             else {
424 234 50       743 $self{use_threads} = ($_has_threads) ? 1 : 0;
425             }
426              
427 234 50       573 if (!exists $self{posix_exit}) {
428             $self{posix_exit} = 1 if (
429             $^S || $_tid || $INC{'Mojo/IOLoop.pm'} ||
430             $INC{'Coro.pm'} || $INC{'LWP/UserAgent.pm'} || $INC{'stfl.pm'} ||
431             $INC{'Curses.pm'} || $INC{'CGI.pm'} || $INC{'FCGI.pm'} ||
432             $INC{'Tk.pm'} || $INC{'Wx.pm'} || $INC{'Win32/GUI.pm'} ||
433 234 50 33     6610 $INC{'Gearman/Util.pm'} || $INC{'Gearman/XS.pm'}
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
434             );
435             }
436              
437             ## -------------------------------------------------------------------------
438             ## Validation.
439              
440 234 100       755 if (defined $self{tmp_dir}) {
441             _croak("MCE::new: ($self{tmp_dir}) is not a directory or does not exist")
442 42 50       714 unless (-d $self{tmp_dir});
443             _croak("MCE::new: ($self{tmp_dir}) is not writeable")
444 42 50       609 unless (-w $self{tmp_dir});
445             }
446              
447 234 100       819 if (defined $self{user_tasks}) {
448             _croak('MCE::new: (user_tasks) is not an ARRAY reference')
449 103 50       460 unless (ref $self{user_tasks} eq 'ARRAY');
450              
451 103         614 $self{max_workers} = _parse_max_workers($self{max_workers});
452             $self{init_relay} = $self{user_tasks}->[0]->{init_relay}
453 103 50       346 if ($self{user_tasks}->[0]->{init_relay});
454              
455 103         176 for my $_task (@{ $self{user_tasks} }) {
  103         294  
456 159         206 for my $_p (keys %{ $_task }) {
  159         536  
457             _croak("MCE::new: ($_p) is not a valid task constructor argument")
458 528 50       978 unless (exists $_valid_fields_task{$_p});
459             }
460 159 50       227 $_task->{max_workers} = 0 unless scalar(keys %{ $_task });
  159         374  
461              
462             $_task->{max_workers} = $self{max_workers}
463 159 100       447 unless (defined $_task->{max_workers});
464             $_task->{use_threads} = $self{use_threads}
465 159 50       499 unless (defined $_task->{use_threads});
466              
467 159   50     626 bless($_task, ref(\%self) || \%self);
468             }
469             }
470              
471 234         1004 _validate_args(\%self);
472              
473             ## -------------------------------------------------------------------------
474             ## Private options. Limit chunk_size.
475              
476 234         281 my $_run_lock;
477              
478 234         612 $self{_chunk_id} = 0; # Chunk ID
479 234         420 $self{_send_cnt} = 0; # Number of times data was sent via send
480 234         383 $self{_spawned} = 0; # Have workers been spawned
481 234         413 $self{_task_id} = 0; # Task ID, starts at 0 (array index)
482 234         441 $self{_task_wid} = 0; # Task Worker ID, starts at 1 per task
483 234         499 $self{_wid} = 0; # Worker ID, starts at 1 per MCE instance
484 234         497 $self{_wrk_status} = 0; # For saving exit status when worker exits
485              
486 234 50       1022 $self{_run_lock} = threads::shared::share($_run_lock) if $_is_MSWin32;
487              
488             $self{_last_sref} = (ref $self{input_data} eq 'SCALAR')
489 234 50       716 ? refaddr($self{input_data}) : 0;
490              
491             my $_data_channels = ("$$.$_tid" eq $_oid)
492 234 50       1373 ? ( $INC{'MCE/Channel.pm'} ? 4 : DATA_CHANNELS )
    50          
493             : 2;
494              
495 234         369 my $_total_workers = 0;
496              
497 234 100       541 if (defined $self{user_tasks}) {
498 103         158 $_total_workers += $_->{max_workers} for @{ $self{user_tasks} };
  103         363  
499             } else {
500 131         208 $_total_workers = $self{max_workers};
501             }
502              
503 234         430 $self{_init_total_workers} = $_total_workers;
504              
505 234 100       629 $self{_data_channels} = ($_total_workers < $_data_channels)
506             ? $_total_workers : $_data_channels;
507              
508 234 100       655 $self{_lock_chn} = ($_total_workers > $self{_data_channels}) ? 1 : 0;
509 234 50 33     1387 $self{_lock_chn} = 1 if $INC{'MCE/Child.pm'} || $INC{'MCE/Hobo.pm'};
510              
511 234 50       882 $MCE = \%self if ($MCE->{_wid} == 0);
512              
513 234         1230 return \%self;
514             }
515              
516             ###############################################################################
517             ## ----------------------------------------------------------------------------
518             ## Spawn method.
519             ##
520             ###############################################################################
521              
522             sub spawn {
523 126 50   126 0 222 my $self = shift; $self = $MCE unless ref($self);
  126         667  
524              
525 126         271 local $_; @_ = ();
  126         331  
526              
527             _croak('MCE::spawn: method is not allowed by the worker process')
528 126 50       441 if ($self->{_wid});
529              
530             ## Return if workers have already been spawned or if module instance.
531 126 50 33     1101 return $self if ($self->{_spawned} || exists $self->{_module_instance});
532              
533 126 50       378 lock $_WIN_LOCK if $_is_MSWin32; # Obtain locks
534 126 50 33     408 lock $_MCE_LOCK if $_has_threads && $_is_winenv;
535              
536 126 0 33     936 $MCE::_GMUTEX->lock() if ($_tid && $MCE::_GMUTEX);
537 126 50       421 sleep 0.015 if $_tid;
538              
539 126         458 _sendto_fhs_close();
540              
541 126 50       468 if ($INC{'PDL.pm'}) { local $@;
  0         0  
542             # PDL::IO::Storable is required for serializing piddles.
543 0 0       0 eval 'use PDL::IO::Storable' unless $INC{'PDL/IO/Storable.pm'};
544             # PDL data should not be naively copied in new threads.
545 0         0 eval 'no warnings; sub PDL::CLONE_SKIP { 1 }';
546             # Disable PDL auto-threading.
547 0         0 eval q{ PDL::set_autopthread_targ(1) };
548             }
549 126 50 33     656 if ( $INC{'LWP/UserAgent.pm'} && !$INC{'Net/HTTP.pm'} ) {
550 0         0 local $@; eval 'require Net::HTTP; require Net::HTTPS';
  0         0  
551             }
552              
553             ## Start the shared-manager process if not running.
554 126 50       465 MCE::Shared->start() if $INC{'MCE/Shared.pm'};
555              
556             ## Load input module.
557 126 50       611 if (defined $self->{sequence}) {
    100          
558             require MCE::Core::Input::Sequence
559 0 0       0 unless $INC{'MCE/Core/Input/Sequence.pm'};
560             }
561             elsif (defined $self->{input_data}) {
562 45         122 my $_ref = ref $self->{input_data};
563 45 50       333 if ($_ref =~ /^(?:ARRAY|HASH|GLOB|FileHandle|IO::)/) {
    0          
564             require MCE::Core::Input::Request
565 45 100       14834 unless $INC{'MCE/Core/Input/Request.pm'};
566             }
567             elsif ($_ref eq 'CODE') {
568             require MCE::Core::Input::Iterator
569 0 0       0 unless $INC{'MCE/Core/Input/Iterator.pm'};
570             }
571             else {
572             require MCE::Core::Input::Handle
573 0 0       0 unless $INC{'MCE/Core/Input/Handle.pm'};
574             }
575             }
576              
577 126         387 my $_die_handler = $SIG{__DIE__};
578 126         233 my $_warn_handler = $SIG{__WARN__};
579              
580 126         528 $SIG{__DIE__} = \&MCE::Signal::_die_handler;
581 126         417 $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
582              
583 126 50 33     1067 if (!defined $TOP_HDLR || (!$TOP_HDLR->{_mgr_live} && !$TOP_HDLR->{_wid})) {
    0          
584             ## On Windows, must shutdown the last idle MCE session.
585 126 0 33     406 if ($_is_MSWin32 && defined $TOP_HDLR && $TOP_HDLR->{_spawned}) {
      33        
586 0         0 $TOP_HDLR->shutdown(1);
587             }
588 126         188 $TOP_HDLR = $self;
589             }
590             elsif (refaddr($self) != refaddr($TOP_HDLR)) {
591             ## Reduce the maximum number of channels for nested sessions.
592 0 0       0 $self->{_data_channels} = 4 if ($self->{_data_channels} > 4);
593 0 0       0 $self->{_lock_chn} = 1 if ($self->{_init_total_workers} > 4);
594              
595             ## On Windows, instruct the manager process to enable win32 IPC.
596 0 0 0     0 if ($_is_MSWin32 && $ENV{'PERL_MCE_IPC'} ne 'win32') {
597 0         0 $ENV{'PERL_MCE_IPC'} = 'win32'; local $\ = undef;
  0         0  
598 0         0 my $_DAT_W_SOCK = $TOP_HDLR->{_dat_w_sock}->[0];
599 0         0 print {$_DAT_W_SOCK} OUTPUT_S_IPC.$LF . '0'.$LF;
  0         0  
600              
601 0         0 MCE::Util::_sock_ready($_DAT_W_SOCK, -1);
602 0         0 MCE::Util::_sysread($_DAT_W_SOCK, my($_buf), 1);
603             }
604             }
605              
606             ## -------------------------------------------------------------------------
607              
608 126         714 my $_data_channels = $self->{_data_channels};
609 126         736 my $_max_workers = _get_max_workers($self);
610 126         308 my $_use_threads = $self->{use_threads};
611              
612             ## Create [ 0 including 1 up to 8 ] locks for data channels (max 9).
613 126         1110 $self->{'_mutex_0'} = MCE::Mutex->new( impl => 'Channel' );
614              
615 126 50       482 if ($self->{_lock_chn}) {
616             $self->{'_mutex_'.$_} = MCE::Mutex->new( impl => 'Channel' )
617 0         0 for (1 .. $_data_channels);
618             }
619              
620             ## Create two locks for use by MCE::Core::Input::{ Handle or Sequence }.
621 126         850 $self->{'_mutex_'.$_} = MCE::Mutex->new( impl => 'Channel' ) for (10 .. 11);
622              
623             ## Create sockets for IPC. sync, comm, input, data
624 126         743 MCE::Util::_sock_pair($self, qw(_bsb_r_sock _bsb_w_sock), undef, 1);
625 126         394 MCE::Util::_sock_pair($self, qw(_com_r_sock _com_w_sock), undef, 1);
626 126         565 MCE::Util::_sock_pair($self, qw(_que_r_sock _que_w_sock), undef, 1);
627 126         453 MCE::Util::_sock_pair($self, qw(_dat_r_sock _dat_w_sock), 0);
628              
629             MCE::Util::_sock_pair($self, qw(_dat_r_sock _dat_w_sock), $_, 1)
630 126         672 for (1 .. $_data_channels);
631              
632 126 50 33     1053 setsockopt($self->{_dat_r_sock}->[0], SOL_SOCKET, SO_RCVBUF, pack('i', 4096))
633             if ($^O ne 'aix' && $^O ne 'linux');
634              
635 126 100       7671 if (defined $self->{init_relay}) { # relay
636 24 100       79 unless ($INC{'MCE/Relay.pm'}) {
637 9         3951 require MCE::Relay; MCE::Relay->import();
  9         36  
638             }
639             MCE::Util::_sock_pair($self, qw(_rla_r_sock _rla_w_sock), $_, 1)
640 24         120 for (0 .. $_max_workers - 1);
641             }
642              
643 126         2709 $self->{_seed} = int(rand() * 1e9);
644              
645             ## -------------------------------------------------------------------------
646              
647             ## Spawn workers.
648 126         693 $self->{_pids} = [], $self->{_thrs} = [], $self->{_tids} = [];
649 126         545 $self->{_status} = [], $self->{_state} = [], $self->{_task} = [];
650              
651 126 50 33     3019 if ($self->{loop_timeout} && !$_is_MSWin32) {
652 0         0 $self->{_pids_t} = {}, $self->{_pids_w} = {};
653             }
654              
655 126 50       7495 local $SIG{TTIN}, local $SIG{TTOU}, local $SIG{WINCH} unless $_is_MSWin32;
656              
657 126 100       652 if (!defined $self->{user_tasks}) {
658 35         92 $self->{_total_workers} = $_max_workers;
659              
660 35 50 33     243 if (defined $_use_threads && $_use_threads == 1) {
661 0         0 _dispatch_thread($self, $_) for (1 .. $_max_workers);
662             } else {
663 35         188 _dispatch_child($self, $_) for (1 .. $_max_workers);
664             }
665              
666 16         1010 $self->{_task}->[0] = { _total_workers => $_max_workers };
667              
668 16         346 for my $_i (1 .. $_max_workers) {
669 45         1059 $self->{_state}->[$_i] = {
670             _task => undef, _task_id => undef, _task_wid => undef,
671             _params => undef, _chn => $_i % $_data_channels + 1
672             }
673             }
674             }
675             else {
676 91         417 my ($_task_id, $_wid);
677              
678 91         219 $self->{_total_workers} = 0;
679 91         153 $self->{_total_workers} += $_->{max_workers} for @{ $self->{user_tasks} };
  91         420  
680              
681             # Must spawn processes first for extra stability on BSD/Darwin.
682 91         211 $_task_id = $_wid = 0;
683              
684 91         129 for my $_task (@{ $self->{user_tasks} }) {
  91         236  
685 114         369 my $_tsk_use_threads = $_task->{use_threads};
686              
687 114 50 33     5583 if (defined $_tsk_use_threads && $_tsk_use_threads == 1) {
688 0         0 $_wid += $_task->{max_workers};
689             } else {
690             _dispatch_child($self, ++$_wid, $_task, $_task_id, $_)
691 114         1053 for (1 .. $_task->{max_workers});
692             }
693              
694 68         1972 $_task_id++;
695             }
696              
697             # Then, spawn threads last.
698 45         628 $_task_id = $_wid = 0;
699              
700 45         322 for my $_task (@{ $self->{user_tasks} }) {
  45         374  
701 56         537 my $_tsk_use_threads = $_task->{use_threads};
702              
703 56 50 33     1625 if (defined $_tsk_use_threads && $_tsk_use_threads == 1) {
704             _dispatch_thread($self, ++$_wid, $_task, $_task_id, $_)
705 0         0 for (1 .. $_task->{max_workers});
706             } else {
707 56         390 $_wid += $_task->{max_workers};
708             }
709              
710 56         252 $_task_id++;
711             }
712              
713             # Save state.
714 45         366 $_task_id = $_wid = 0;
715              
716 45         265 for my $_task (@{ $self->{user_tasks} }) {
  45         268  
717             $self->{_task}->[$_task_id] = {
718             _total_running => 0, _total_workers => $_task->{max_workers}
719 56         2005 };
720 56         533 for my $_i (1 .. $_task->{max_workers}) {
721 101         770 $_wid += 1;
722 101         2309 $self->{_state}->[$_wid] = {
723             _task => $_task, _task_id => $_task_id, _task_wid => $_i,
724             _params => undef, _chn => $_wid % $_data_channels + 1
725             }
726             }
727              
728 56         327 $_task_id++;
729             }
730             }
731              
732             ## -------------------------------------------------------------------------
733              
734 61         731 $self->{_send_cnt} = 0, $self->{_spawned} = 1;
735              
736 61         1031 $SIG{__DIE__} = $_die_handler;
737 61         799 $SIG{__WARN__} = $_warn_handler;
738              
739 61 50       920 $MCE = $self if ($MCE->{_wid} == 0);
740              
741 61 0 33     940 $MCE::_GMUTEX->unlock() if ($_tid && $MCE::_GMUTEX);
742              
743 61         8355 return $self;
744             }
745              
746             ###############################################################################
747             ## ----------------------------------------------------------------------------
748             ## Process method, relay stubs, and AUTOLOAD for methods not used often.
749             ##
750             ###############################################################################
751              
752             sub process {
753 106 50   106 0 3019 my $self = shift; $self = $MCE unless ref($self);
  106         305  
754              
755 106         499 _validate_runstate($self, 'MCE::process');
756              
757 106         185 my ($_params_ref, $_input_data);
758              
759 106 100 100     798 if (ref $_[0] eq 'HASH' && ref $_[1] eq 'HASH') {
    100          
760 6         16 $_params_ref = $_[0], $_input_data = $_[1];
761             } elsif (ref $_[0] eq 'HASH') {
762 95         237 $_params_ref = $_[0], $_input_data = $_[1];
763             } else {
764 5         10 $_params_ref = $_[1], $_input_data = $_[0];
765             }
766              
767 106         174 @_ = ();
768              
769             ## Set input data.
770 106 50 0     259 if (defined $_input_data) {
    0          
771 106         346 $_params_ref->{input_data} = $_input_data;
772             }
773             elsif ( !defined $_params_ref->{input_data} &&
774             !defined $_params_ref->{sequence} ) {
775 0         0 _croak('MCE::process: (input_data or sequence) is not specified');
776             }
777              
778             ## Pass 0 to "not" auto-shutdown after processing.
779 106         426 $self->run(0, $_params_ref);
780              
781 79         320 return $self;
782             }
783              
784             sub relay (;&) {
785             _croak('MCE::relay: (init_relay) is not defined')
786 4 0   4 0 42 unless (defined $MCE->{init_relay});
787             }
788              
789             {
790 85     85   610 no warnings 'once';
  85         217  
  85         365597  
791             *relay_unlock = \&relay;
792             }
793              
794             sub AUTOLOAD {
795             # $AUTOLOAD = MCE::<method_name>
796              
797 0     0   0 my $_fcn = substr($MCE::AUTOLOAD, 5);
798 0 0       0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
799              
800             # "for" sugar methods
801              
802 0 0       0 if ($_fcn eq 'forchunk') {
    0          
    0          
803 0 0       0 require MCE::Candy unless $INC{'MCE/Candy.pm'};
804 0         0 return MCE::Candy::forchunk($self, @_);
805             }
806             elsif ($_fcn eq 'foreach') {
807 0 0       0 require MCE::Candy unless $INC{'MCE/Candy.pm'};
808 0         0 return MCE::Candy::foreach($self, @_);
809             }
810             elsif ($_fcn eq 'forseq') {
811 0 0       0 require MCE::Candy unless $INC{'MCE/Candy.pm'};
812 0         0 return MCE::Candy::forseq($self, @_);
813             }
814              
815             # relay stubs for MCE::Relay
816              
817 0 0 0     0 if ($_fcn eq 'relay_lock' || $_fcn eq 'relay_recv') {
    0          
818             _croak('MCE::relay: (init_relay) is not defined')
819 0 0       0 unless (defined $MCE->{init_relay});
820             }
821             elsif ($_fcn eq 'relay_final') {
822 0         0 return;
823             }
824              
825             # worker immediately exits the chunking loop
826              
827 0 0       0 if ($_fcn eq 'last') {
    0          
    0          
    0          
828             _croak('MCE::last: method is not allowed by the manager process')
829 0 0       0 unless ($self->{_wid});
830              
831 0 0       0 $self->{_last_jmp}() if (defined $self->{_last_jmp});
832              
833 0         0 return;
834             }
835              
836             # worker starts the next iteration of the chunking loop
837              
838             elsif ($_fcn eq 'next') {
839             _croak('MCE::next: method is not allowed by the manager process')
840 0 0       0 unless ($self->{_wid});
841              
842 0 0       0 $self->{_next_jmp}() if (defined $self->{_next_jmp});
843              
844 0         0 return;
845             }
846              
847             # return the process ID, include thread ID for threads
848              
849             elsif ($_fcn eq 'pid') {
850 0 0 0     0 if (defined $self->{_pid}) {
    0          
851 0         0 return $self->{_pid};
852             } elsif ($_has_threads && $self->{use_threads}) {
853 0         0 return $$ .'.'. threads->tid();
854             }
855 0         0 return $$;
856             }
857              
858             # return the exit status
859             # _wrk_status holds the greatest exit status among workers exiting
860              
861             elsif ($_fcn eq 'status') {
862             _croak('MCE::status: method is not allowed by the worker process')
863 0 0       0 if ($self->{_wid});
864              
865 0 0       0 return (defined $self->{_wrk_status}) ? $self->{_wrk_status} : 0;
866             }
867              
868 0         0 _croak("Can't locate object method \"$_fcn\" via package \"MCE\"");
869             }
870              
871             ###############################################################################
872             ## ----------------------------------------------------------------------------
873             ## Restart worker method.
874             ##
875             ###############################################################################
876              
877             sub restart_worker {
878 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
879              
880 0         0 @_ = ();
881              
882             _croak('MCE::restart_worker: method is not allowed by the worker process')
883 0 0       0 if ($self->{_wid});
884              
885 0         0 my $_wid = $self->{_exited_wid};
886              
887 0         0 my $_params = $self->{_state}->[$_wid]->{_params};
888 0         0 my $_task_wid = $self->{_state}->[$_wid]->{_task_wid};
889 0         0 my $_task_id = $self->{_state}->[$_wid]->{_task_id};
890 0         0 my $_task = $self->{_state}->[$_wid]->{_task};
891 0         0 my $_chn = $self->{_state}->[$_wid]->{_chn};
892              
893 0         0 $_params->{_chn} = $_chn;
894              
895             my $_use_threads = (defined $_task_id)
896 0 0       0 ? $_task->{use_threads} : $self->{use_threads};
897              
898 0 0       0 $self->{_task}->[$_task_id]->{_total_running} += 1 if (defined $_task_id);
899 0 0       0 $self->{_task}->[$_task_id]->{_total_workers} += 1 if (defined $_task_id);
900              
901 0         0 $self->{_total_running} += 1;
902 0         0 $self->{_total_workers} += 1;
903              
904 0 0 0     0 if (defined $_use_threads && $_use_threads == 1) {
905 0         0 _dispatch_thread($self, $_wid, $_task, $_task_id, $_task_wid, $_params);
906             } else {
907 0         0 _dispatch_child($self, $_wid, $_task, $_task_id, $_task_wid, $_params);
908             }
909              
910 0         0 delete $self->{_retry_cnt};
911              
912 0 0 0     0 if (defined $self->{spawn_delay} && $self->{spawn_delay} > 0.0) {
    0 0        
913 0         0 sleep $self->{spawn_delay};
914             } elsif ($_tid || $_is_MSWin32) {
915 0         0 sleep 0.045;
916             }
917              
918 0         0 return;
919             }
920              
921             ###############################################################################
922             ## ----------------------------------------------------------------------------
923             ## Run method.
924             ##
925             ###############################################################################
926              
927             sub run {
928 207 50   207 0 534 my $self = shift; $self = $MCE unless ref($self);
  207         570  
929              
930             _croak('MCE::run: method is not allowed by the worker process')
931 207 50       566 if ($self->{_wid});
932              
933 207         504 my ($_auto_shutdown, $_params_ref);
934              
935 207 100       681 if (ref $_[0] eq 'HASH') {
936 81 50       352 $_auto_shutdown = (defined $_[1]) ? $_[1] : 1;
937 81         164 $_params_ref = $_[0];
938             } else {
939 126 100       273 $_auto_shutdown = (defined $_[0]) ? $_[0] : 1;
940 126         187 $_params_ref = $_[1];
941             }
942              
943 207         349 @_ = ();
944              
945 207 100       538 my $_has_user_tasks = (defined $self->{user_tasks}) ? 1 : 0;
946 207         330 my $_requires_shutdown = 0;
947              
948             ## Unset params if workers have already been sent user_data via send.
949             ## Set user_func to NOOP if not specified.
950              
951 207 50       493 $_params_ref = undef if ($self->{_send_cnt});
952              
953 207 50 66     1000 if (!defined $self->{user_func} && !defined $_params_ref->{user_func}) {
954 91         413 $self->{user_func} = \&MCE::Signal::_NOOP;
955             }
956              
957             ## Set user specified params if specified.
958             ## Shutdown workers if determined by _sync_params or if processing a
959             ## scalar reference. Workers need to be restarted in order to pick up
960             ## on the new code or scalar reference.
961              
962 207 100 66     1251 if (defined $_params_ref && ref $_params_ref eq 'HASH') {
963 187         736 $_requires_shutdown = _sync_params($self, $_params_ref);
964 187         838 _validate_args($self);
965             }
966 207 100       502 if ($_has_user_tasks) {
967             $self->{input_data} = $self->{user_tasks}->[0]->{input_data}
968 154 50       446 if ($self->{user_tasks}->[0]->{input_data});
969             $self->{use_slurpio} = $self->{user_tasks}->[0]->{use_slurpio}
970 154 50       484 if ($self->{user_tasks}->[0]->{use_slurpio});
971             $self->{parallel_io} = $self->{user_tasks}->[0]->{parallel_io}
972 154 50       446 if ($self->{user_tasks}->[0]->{parallel_io});
973             $self->{RS} = $self->{user_tasks}->[0]->{RS}
974 154 50       434 if ($self->{user_tasks}->[0]->{RS});
975             }
976 207 50       623 if (ref $self->{input_data} eq 'SCALAR') {
977 0 0       0 if (refaddr($self->{input_data}) != $self->{_last_sref}) {
978 0         0 $_requires_shutdown = 1;
979             }
980 0         0 $self->{_last_sref} = refaddr($self->{input_data});
981             }
982              
983 207 50       1096 $self->shutdown() if ($_requires_shutdown);
984              
985             ## -------------------------------------------------------------------------
986              
987 207         514 $self->{_wrk_status} = 0;
988              
989             ## Spawn workers.
990 207 100       1261 $self->spawn() unless ($self->{_spawned});
991 142 50       1302 return $self unless ($self->{_total_workers});
992              
993 142         1940 local $SIG{__DIE__} = \&MCE::Signal::_die_handler;
994 142         1253 local $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
995              
996 142 50       959 $MCE = $self if ($MCE->{_wid} == 0);
997              
998 142         979 my ($_input_data, $_input_file, $_input_glob, $_seq);
999 142         0 my ($_abort_msg, $_first_msg, $_run_mode, $_single_dim);
1000 142         507 my $_chunk_size = $self->{chunk_size};
1001              
1002             $_seq = ($_has_user_tasks && $self->{user_tasks}->[0]->{sequence})
1003             ? $self->{user_tasks}->[0]->{sequence}
1004 142 50 66     1863 : $self->{sequence};
1005              
1006             ## Determine run mode for workers.
1007 142 100       826 if (defined $_seq) {
    100          
1008             my ($_begin, $_end, $_step) = (ref $_seq eq 'ARRAY')
1009 18 50       98 ? @{ $_seq } : ($_seq->{begin}, $_seq->{end}, $_seq->{step});
  18         51  
1010              
1011             $_chunk_size = $self->{user_tasks}->[0]->{chunk_size}
1012 18 50 66     207 if ($_has_user_tasks && $self->{user_tasks}->[0]->{chunk_size});
1013              
1014 18         38 $_run_mode = 'sequence';
1015 18         62 $_abort_msg = int(($_end - $_begin) / $_step / $_chunk_size); # + 1;
1016              
1017             # Previously + 1 above. Below, support for large numbers, 1e16 and beyond.
1018             # E.g. sequence => [ 1, 1e16 ], chunk_size => 1e11
1019             #
1020             # Perl: int((1e15 - 1) / 1 / 1e11) = 9999
1021             # Perl: int((1e16 - 1) / 1 / 1e11) = 100000 wrong, due to precision limit
1022             # Calc: int((1e16 - 1) / 1 / 1e11) = 99999
1023              
1024 18 50       43 if ( $_step > 0 ) {
1025 18 50       84 $_abort_msg++
1026             if ($_abort_msg * $_chunk_size * abs($_step) + $_begin <= $_end);
1027             } else {
1028 0 0       0 $_abort_msg++
1029             if ($_abort_msg * $_chunk_size * abs($_step) + $_end <= $_begin);
1030             }
1031              
1032 18         43 $_first_msg = 0;
1033             }
1034             elsif (defined $self->{input_data}) {
1035 79         317 my $_ref = ref $self->{input_data};
1036              
1037 79 100       974 if ($_ref eq '') { # File mode
    100          
    100          
    50          
    0          
    0          
1038 18         48 $_run_mode = 'file';
1039 18         37 $_input_file = $self->{input_data};
1040 18         43 $_input_data = $_input_glob = undef;
1041 18         214 $_abort_msg = (-s $_input_file) + 1;
1042 18         51 $_first_msg = 0; ## Begin at offset position
1043              
1044 18 50       188 if ((-s $_input_file) == 0) {
1045 0 0       0 $self->shutdown() if ($_auto_shutdown == 1);
1046 0         0 return $self;
1047             }
1048             }
1049             elsif ($_ref eq 'ARRAY') { # Array mode
1050 40         248 $_run_mode = 'array';
1051 40         206 $_input_data = $self->{input_data};
1052 40         120 $_input_file = $_input_glob = undef;
1053 40 50       159 $_single_dim = 1 if (ref $_input_data->[0] eq '');
1054 40         63 $_abort_msg = 0; ## Flag: Has Data: No
1055 40         76 $_first_msg = 1; ## Flag: Has Data: Yes
1056              
1057 40 50       89 if (@{ $_input_data } == 0) {
  40         192  
1058 0 0       0 $self->shutdown() if ($_auto_shutdown == 1);
1059 0         0 return $self;
1060             }
1061             }
1062             elsif ($_ref eq 'HASH') { # Hash mode
1063 3         88 $_run_mode = 'hash';
1064 3         58 $_input_data = $self->{input_data};
1065 3         36 $_input_file = $_input_glob = undef;
1066 3         32 $_abort_msg = 0; ## Flag: Has Data: No
1067 3         23 $_first_msg = 1; ## Flag: Has Data: Yes
1068              
1069 3 50       24 if (scalar( keys %{ $_input_data } ) == 0) {
  3         96  
1070 0 0       0 $self->shutdown() if ($_auto_shutdown == 1);
1071 0         0 return $self;
1072             }
1073             }
1074             elsif ($_ref =~ /^(?:GLOB|FileHandle|IO::)/) { # Glob mode
1075 18         56 $_run_mode = 'glob';
1076 18         41 $_input_glob = $self->{input_data};
1077 18         51 $_input_data = $_input_file = undef;
1078 18         31 $_abort_msg = 0; ## Flag: Has Data: No
1079 18         30 $_first_msg = 1; ## Flag: Has Data: Yes
1080             }
1081             elsif ($_ref eq 'CODE') { # Iterator mode
1082 0         0 $_run_mode = 'iterator';
1083 0         0 $_input_data = $self->{input_data};
1084 0         0 $_input_file = $_input_glob = undef;
1085 0         0 $_abort_msg = 0; ## Flag: Has Data: No
1086 0         0 $_first_msg = 1; ## Flag: Has Data: Yes
1087             }
1088             elsif ($_ref eq 'SCALAR') { # Memory mode
1089 0         0 $_run_mode = 'memory';
1090 0         0 $_input_data = $_input_file = $_input_glob = undef;
1091 0         0 $_abort_msg = length(${ $self->{input_data} }) + 1;
  0         0  
1092 0         0 $_first_msg = 0; ## Begin at offset position
1093              
1094 0 0       0 if (length(${ $self->{input_data} }) == 0) {
  0         0  
1095 0 0       0 return $self->shutdown() if ($_auto_shutdown == 1);
1096             }
1097             }
1098             else {
1099 0         0 _croak('MCE::run: (input_data) is not valid');
1100             }
1101             }
1102             else { # Nodata mode
1103 45         481 $_abort_msg = undef, $_run_mode = 'nodata';
1104             }
1105              
1106             ## -------------------------------------------------------------------------
1107              
1108 142         648 my $_total_workers = $self->{_total_workers};
1109 142         316 my $_send_cnt = $self->{_send_cnt};
1110              
1111 142 50       465 if ($_send_cnt) {
1112 0         0 $self->{_total_running} = $_send_cnt;
1113 0         0 $self->{_task}->[0]->{_total_running} = $_send_cnt;
1114             }
1115             else {
1116 142         687 $self->{_total_running} = $_total_workers;
1117              
1118 142         543 my ($_frozen_nodata, $_wid, %_params_nodata, %_task0_wids);
1119 142         503 my $_COM_R_SOCK = $self->{_com_r_sock};
1120 142         262 my $_submit_delay = $self->{submit_delay};
1121              
1122             my %_params = (
1123             '_abort_msg' => $_abort_msg, '_chunk_size' => $_chunk_size,
1124             '_input_file' => $_input_file, '_run_mode' => $_run_mode,
1125             '_bounds_only' => $self->{bounds_only},
1126             '_max_retries' => $self->{max_retries},
1127             '_parallel_io' => $self->{parallel_io},
1128             '_progress' => $self->{progress} ? 1 : 0,
1129             '_sequence' => $self->{sequence},
1130             '_user_args' => $self->{user_args},
1131             '_use_slurpio' => $self->{use_slurpio},
1132             '_RS' => $self->{RS}
1133 142 50       4447 );
1134              
1135 142         3991 my $_frozen_params = $self->{freeze}(\%_params);
1136 142         925 $_frozen_params = length($_frozen_params).$LF . $_frozen_params;
1137              
1138 142 100       450 if ($_has_user_tasks) {
1139 108         1650 %_params_nodata = ( %_params,
1140             '_abort_msg' => undef, '_run_mode' => 'nodata'
1141             );
1142 108         1099 $_frozen_nodata = $self->{freeze}(\%_params_nodata);
1143 108         356 $_frozen_nodata = length($_frozen_nodata).$LF . $_frozen_nodata;
1144              
1145 108         171 for my $_t (@{ $self->{_task} }) {
  108         686  
1146 180         463 $_t->{_total_running} = $_t->{_total_workers};
1147             }
1148 108         202 for my $_i (1 .. @{ $self->{_state} } - 1) {
  108         354  
1149 347 100       1578 $_task0_wids{$_i} = undef unless ($self->{_state}[$_i]{_task_id});
1150             }
1151             }
1152              
1153 142         1069 local $\ = undef; local $/ = $LF;
  142         1810  
1154              
1155             ## Insert the first message into the queue if defined.
1156 142 100       426 if (defined $_first_msg) {
1157 97         1802 syswrite($self->{_que_w_sock}, pack($_que_template, 0, $_first_msg));
1158             }
1159              
1160             ## Submit params data to workers.
1161 142         807 for my $_i (1 .. $_total_workers) {
1162 428         596 print({$_COM_R_SOCK} $_i.$LF), chomp($_wid = <$_COM_R_SOCK>);
  428         221585  
1163              
1164 428 100 100     3830 if (!$_has_user_tasks || exists $_task0_wids{$_wid}) {
1165 282         490 print({$_COM_R_SOCK} $_frozen_params), <$_COM_R_SOCK>;
  282         85738  
1166 282         2241 $self->{_state}[$_wid]{_params} = \%_params;
1167             } else {
1168 146         238 print({$_COM_R_SOCK} $_frozen_nodata), <$_COM_R_SOCK>;
  146         23433  
1169 146         943 $self->{_state}[$_wid]{_params} = \%_params_nodata;
1170             }
1171              
1172 428 50 33     3903 sleep $_submit_delay
1173             if defined($_submit_delay) && $_submit_delay > 0.0;
1174             }
1175             }
1176              
1177             ## -------------------------------------------------------------------------
1178              
1179 142         419 $self->{_total_exited} = 0;
1180              
1181             ## Call the output function.
1182 142 50       633 if ($self->{_total_running} > 0) {
1183 142         679 $self->{_mgr_live} = 1;
1184 142         322 $self->{_abort_msg} = $_abort_msg;
1185 142         811 $self->{_single_dim} = $_single_dim;
1186              
1187 142 50       441 lock $self->{_run_lock} if $_is_MSWin32;
1188              
1189 142 50       686 if (!$_send_cnt) {
1190             ## Notify workers to commence processing.
1191 142 50       358 if ($_is_MSWin32) {
1192 0         0 my $_buf = _sprintf("%${_total_workers}s", "");
1193 0         0 syswrite($self->{_bsb_r_sock}, $_buf);
1194             } else {
1195 142         513 my $_BSB_R_SOCK = $self->{_bsb_r_sock};
1196 142         548 for my $_i (1 .. $_total_workers) {
1197 428         12097 syswrite($_BSB_R_SOCK, $LF);
1198             }
1199             }
1200             }
1201              
1202 142         3318 _output_loop( $self, $_input_data, $_input_glob,
1203             \%_plugin_function, \@_plugin_loop_begin, \@_plugin_loop_end
1204             );
1205              
1206 142         817 $self->{_mgr_live} = $self->{_abort_msg} = $self->{_single_dim} = undef;
1207             }
1208              
1209             ## Remove the last message from the queue.
1210 142 100 66     1344 if (!$_send_cnt && $_run_mode ne 'nodata') {
1211             MCE::Util::_sysread($self->{_que_r_sock}, my($_buf), $_que_read_size)
1212 97 50       1132 if ( defined $self->{_que_r_sock} );
1213             }
1214              
1215 142         425 $self->{_send_cnt} = 0;
1216              
1217             ## Shutdown workers.
1218 142 100 66     2188 if ($_auto_shutdown || $self->{_total_exited}) {
    50 33        
    50          
1219 10         105 $self->shutdown();
1220             }
1221             elsif ($INC{'MCE/Simple.pm'}) {
1222 0         0 $self->shutdown();
1223             }
1224             elsif ($^S || $ENV{'PERL_IPERL_RUNNING'}) {
1225 0 0 0     0 if (
      0        
      0        
      0        
      0        
1226             !$INC{'Mojo/IOLoop.pm'} && !$INC{'Win32/GUI.pm'} &&
1227             !$INC{'Gearman/XS.pm'} && !$INC{'Gearman/Util.pm'} &&
1228             !$INC{'Tk.pm'} && !$INC{'Wx.pm'}
1229             ) {
1230             # running inside eval or IPerl, check stack trace
1231 0         0 my $_t = Carp::longmess(); $_t =~ s/\teval [^\n]+\n$//;
  0         0  
1232              
1233 0 0 0     0 if ( $_t =~ /^(?:[^\n]+\n){1,7}\teval / ||
      0        
      0        
      0        
1234             $_t =~ /\n\teval [^\n]+\n\t(?:eval|Try)/ ||
1235             $_t =~ /\n\tMCE::_dispatch\(\) [^\n]+ thread \d+\n$/ ||
1236             ( $_tid && !$self->{use_threads} ) )
1237             {
1238 0         0 $self->shutdown();
1239             }
1240             }
1241             }
1242              
1243 142         1249 return $self;
1244             }
1245              
1246             ###############################################################################
1247             ## ----------------------------------------------------------------------------
1248             ## Send method.
1249             ##
1250             ###############################################################################
1251              
1252             sub send {
1253 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1254              
1255             _croak('MCE::send: method is not allowed by the worker process')
1256 0 0       0 if ($self->{_wid});
1257             _croak('MCE::send: method is not allowed while running')
1258 0 0       0 if ($self->{_total_running});
1259              
1260             _croak('MCE::send: method cannot be used with input_data or sequence')
1261 0 0 0     0 if (defined $self->{input_data} || defined $self->{sequence});
1262             _croak('MCE::send: method cannot be used with user_tasks')
1263 0 0       0 if (defined $self->{user_tasks});
1264              
1265 0         0 my $_data_ref;
1266              
1267 0 0 0     0 if (ref $_[0] eq 'ARRAY' || ref $_[0] eq 'HASH' || ref $_[0] eq 'PDL') {
      0        
1268 0         0 $_data_ref = $_[0];
1269             } else {
1270 0         0 _croak('MCE::send: ARRAY, HASH, or a PDL reference is not specified');
1271             }
1272              
1273 0         0 @_ = ();
1274              
1275 0 0       0 $self->{_send_cnt} = 0 unless (defined $self->{_send_cnt});
1276              
1277             ## -------------------------------------------------------------------------
1278              
1279             ## Spawn workers.
1280 0 0       0 $self->spawn() unless ($self->{_spawned});
1281              
1282             _croak('MCE::send: Sending greater than # of workers is not allowed')
1283 0 0       0 if ($self->{_send_cnt} >= $self->{_task}->[0]->{_total_workers});
1284              
1285 0         0 local $SIG{__DIE__} = \&MCE::Signal::_die_handler;
1286 0         0 local $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
1287              
1288             ## Begin data submission.
1289 0         0 local $\ = undef; local $/ = $LF;
  0         0  
1290              
1291 0         0 my $_COM_R_SOCK = $self->{_com_r_sock};
1292 0         0 my $_submit_delay = $self->{submit_delay};
1293 0         0 my $_frozen_data = $self->{freeze}($_data_ref);
1294 0         0 my $_len = length $_frozen_data;
1295              
1296             ## Submit data to worker.
1297 0         0 print({$_COM_R_SOCK} '_data'.$LF), <$_COM_R_SOCK>;
  0         0  
1298 0         0 print({$_COM_R_SOCK} $_len.$LF, $_frozen_data), <$_COM_R_SOCK>;
  0         0  
1299              
1300 0         0 $self->{_send_cnt} += 1;
1301              
1302 0 0 0     0 sleep $_submit_delay
1303             if defined($_submit_delay) && $_submit_delay > 0.0;
1304              
1305 0         0 return $self;
1306             }
1307              
1308             ###############################################################################
1309             ## ----------------------------------------------------------------------------
1310             ## Shutdown method.
1311             ##
1312             ###############################################################################
1313              
1314             sub shutdown {
1315 61 50   61 0 1380 my $self = shift; $self = $MCE unless ref($self);
  61         266  
1316 61   50     511 my $_no_lock = shift || 0;
1317              
1318 61         155 @_ = ();
1319              
1320             ## Return unless spawned or already shutdown.
1321 61 50       203 return unless $self->{_spawned};
1322              
1323             ## Return if signaled.
1324 61 50       194 if ($MCE::Signal::KILLED) {
1325 0 0       0 if (defined $self->{_sess_dir}) {
1326 0         0 my $_sess_dir = delete $self->{_sess_dir};
1327 0 0       0 rmdir $_sess_dir if -d $_sess_dir;
1328             }
1329 0         0 return;
1330             }
1331              
1332 61         967 _validate_runstate($self, 'MCE::shutdown');
1333              
1334             ## Complete processing before shutting down.
1335 61 50       241 $self->run(0) if ($self->{_send_cnt});
1336              
1337 61         373 local $SIG{__DIE__} = \&MCE::Signal::_die_handler;
1338 61         279 local $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
1339              
1340 61         143 my $_COM_R_SOCK = $self->{_com_r_sock};
1341 61         128 my $_data_channels = $self->{_data_channels};
1342 61         117 my $_total_workers = $self->{_total_workers};
1343 61         113 my $_sess_dir = $self->{_sess_dir};
1344              
1345 61 50 33     1030 if (defined $TOP_HDLR && refaddr($self) == refaddr($TOP_HDLR)) {
1346 61         289 $TOP_HDLR = undef;
1347             }
1348              
1349             ## -------------------------------------------------------------------------
1350              
1351 61 0 33     268 lock $_MCE_LOCK if ($_has_threads && $_is_winenv && !$_no_lock);
      33        
1352              
1353             ## Notify workers to exit loop.
1354 61         292 local ($!, $?, $_); local $\ = undef; local $/ = $LF;
  61         137  
  61         339  
1355              
1356 61         205 for (1 .. $_total_workers) {
1357 146         432 print({$_COM_R_SOCK} '_exit'.$LF), <$_COM_R_SOCK>;
  146         293984  
1358             }
1359              
1360             ## Reap children and/or threads.
1361 61 50       312 if (@{ $self->{_pids} } > 0) {
  61         402  
1362 61         151 my $_list = $self->{_pids};
1363 61         108 for my $i (0 .. @{ $_list }) {
  61         976  
1364 207 100       138779271 waitpid($_list->[$i], 0) if $_list->[$i];
1365             }
1366             }
1367 61 50       423 if (@{ $self->{_thrs} } > 0) {
  61         609  
1368 0         0 my $_list = $self->{_thrs};
1369 0         0 for my $i (0 .. @{ $_list }) {
  0         0  
1370 0 0       0 $_list->[$i]->join() if $_list->[$i];
1371             }
1372             }
1373              
1374             ## Close sockets.
1375 61         277 $_COM_R_SOCK = undef;
1376              
1377 61         1167 MCE::Util::_destroy_socks($self, qw(
1378             _bsb_w_sock _bsb_r_sock _com_w_sock _com_r_sock _que_w_sock _que_r_sock
1379             _dat_w_sock _dat_r_sock _rla_w_sock _rla_r_sock
1380             ));
1381              
1382             ## -------------------------------------------------------------------------
1383              
1384             ## Destroy mutexes.
1385 61         224 for my $_i (0 .. $_data_channels) { delete $self->{'_mutex_'.$_i}; }
  207         1598  
1386 61         265 for my $_j (10 .. 11) { delete $self->{'_mutex_'.$_j}; } # input mutexes
  122         738  
1387              
1388             ## Remove session directory.
1389 61 50 33     357 rmdir $_sess_dir if (defined $_sess_dir && -d $_sess_dir);
1390              
1391             ## Reset instance.
1392 61         124 undef @{$self->{_pids}}; undef @{$self->{_thrs}}; undef @{$self->{_tids}};
  61         273  
  61         122  
  61         134  
  61         117  
  61         140  
1393 61         116 undef @{$self->{_state}}; undef @{$self->{_status}}; undef @{$self->{_task}};
  61         745  
  61         106  
  61         221  
  61         115  
  61         207  
1394              
1395 61         224 $self->{_chunk_id} = $self->{_send_cnt} = $self->{_spawned} = 0;
1396 61         137 $self->{_total_running} = $self->{_total_exited} = 0;
1397 61         139 $self->{_total_workers} = 0;
1398 61         143 $self->{_sess_dir} = undef;
1399              
1400 61 50       242 if ($self->{loop_timeout}) {
1401 0         0 delete $self->{_pids_t};
1402 0         0 delete $self->{_pids_w};
1403             }
1404              
1405 61         728 return;
1406             }
1407              
1408             ###############################################################################
1409             ## ----------------------------------------------------------------------------
1410             ## Barrier sync and yield methods.
1411             ##
1412             ###############################################################################
1413              
1414             sub sync {
1415 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1416              
1417 0 0       0 return unless ($self->{_wid});
1418              
1419             ## Barrier synchronization is supported for task 0 at this time.
1420             ## Note: Workers are assigned task_id 0 when omitting user_tasks.
1421              
1422 0 0       0 return if ($self->{_task_id} > 0);
1423              
1424 0         0 my $_chn = $self->{_chn};
1425 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1426 0         0 my $_BSB_R_SOCK = $self->{_bsb_r_sock};
1427 0         0 my $_BSB_W_SOCK = $self->{_bsb_w_sock};
1428 0         0 my $_buf;
1429              
1430 0 0       0 local $\ = undef if (defined $\);
1431              
1432             ## Notify the manager process (barrier begin).
1433 0         0 print {$_DAT_W_SOCK} OUTPUT_B_SYN.$LF . $_chn.$LF;
  0         0  
1434              
1435             ## Wait until all workers from (task_id 0) have synced.
1436 0 0       0 MCE::Util::_sock_ready($_BSB_R_SOCK, -1) if $_is_MSWin32;
1437 0         0 MCE::Util::_sysread($_BSB_R_SOCK, $_buf, 1);
1438              
1439             ## Notify the manager process (barrier end).
1440 0         0 print {$_DAT_W_SOCK} OUTPUT_E_SYN.$LF . $_chn.$LF;
  0         0  
1441              
1442             ## Wait until all workers from (task_id 0) have un-synced.
1443 0 0       0 MCE::Util::_sock_ready($_BSB_W_SOCK, -1) if $_is_MSWin32;
1444 0         0 MCE::Util::_sysread($_BSB_W_SOCK, $_buf, 1);
1445              
1446 0         0 return;
1447             }
1448              
1449             sub yield {
1450 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1451              
1452 0 0       0 return unless ($self->{_wid});
1453              
1454 0         0 my $_chn = $self->{_chn};
1455 0         0 my $_DAT_LOCK = $self->{_dat_lock};
1456 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1457 0         0 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
1458 0         0 my $_lock_chn = $self->{_lock_chn};
1459 0         0 my $_delay;
1460              
1461 0 0       0 local $\ = undef if (defined $\);
1462 0 0 0     0 local $/ = $LF if (!$/ || $/ ne $LF);
1463              
1464 0 0       0 $_DAT_LOCK->lock() if $_lock_chn;
1465 0         0 print({$_DAT_W_SOCK} OUTPUT_I_DLY.$LF . $_chn.$LF),
1466 0         0 print({$_DAU_W_SOCK} $self->{_task_id}.$LF);
  0         0  
1467 0         0 chomp($_delay = <$_DAU_W_SOCK>);
1468 0 0       0 $_DAT_LOCK->unlock() if $_lock_chn;
1469              
1470 0         0 MCE::Util::_sleep( $_delay );
1471             }
1472              
1473             ###############################################################################
1474             ## ----------------------------------------------------------------------------
1475             ## Miscellaneous methods: abort exit sess_dir tmp_dir.
1476             ##
1477             ###############################################################################
1478              
1479             ## Abort current job.
1480              
1481             sub abort {
1482 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1483              
1484 0         0 my $_QUE_R_SOCK = $self->{_que_r_sock};
1485 0         0 my $_QUE_W_SOCK = $self->{_que_w_sock};
1486 0         0 my $_abort_msg = $self->{_abort_msg};
1487              
1488 0 0       0 if (defined $_abort_msg) {
1489 0         0 local $\ = undef;
1490              
1491 0 0       0 if ($_abort_msg > 0) {
1492 0         0 MCE::Util::_sysread($_QUE_R_SOCK, my($_next), $_que_read_size);
1493 0         0 syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_abort_msg));
1494             }
1495              
1496 0 0       0 if ($self->{_wid} > 0) {
1497 0         0 my $_chn = $self->{_chn};
1498 0         0 my $_DAT_LOCK = $self->{_dat_lock};
1499 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1500 0         0 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
1501 0         0 my $_lock_chn = $self->{_lock_chn};
1502              
1503 0 0       0 $_DAT_LOCK->lock() if $_lock_chn;
1504 0         0 print {$_DAT_W_SOCK} OUTPUT_W_ABT.$LF . $_chn.$LF;
  0         0  
1505 0 0       0 $_DAT_LOCK->unlock() if $_lock_chn;
1506             }
1507             }
1508              
1509 0         0 return;
1510             }
1511              
1512             ## Worker exits from MCE.
1513              
1514             sub exit {
1515 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1516              
1517 0 0       0 my $_exit_status = (defined $_[0]) ? $_[0] : $?;
1518 0 0       0 my $_exit_msg = (defined $_[1]) ? $_[1] : '';
1519 0 0       0 my $_exit_id = (defined $_[2]) ? $_[2] : $self->chunk_id;
1520              
1521 0         0 @_ = ();
1522              
1523             _croak('MCE::exit: method is not allowed by the manager process')
1524 0 0       0 unless ($self->{_wid});
1525              
1526 0         0 my $_chn = $self->{_chn};
1527 0         0 my $_DAT_LOCK = $self->{_dat_lock};
1528 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1529 0         0 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
1530 0         0 my $_lock_chn = $self->{_lock_chn};
1531 0         0 my $_task_id = $self->{_task_id};
1532              
1533 0 0       0 unless ( $self->{_exiting} ) {
1534 0         0 $self->{_exiting} = 1;
1535              
1536 0 0       0 my $_pid = $self->{_is_thread} ? $$ .'.'. threads->tid() : $$;
1537 0         0 my $_max_retries = $self->{max_retries};
1538 0         0 my $_chunk_id = $self->{_chunk_id};
1539              
1540 0 0 0     0 if ( defined $self->{init_relay} && !$self->{_relayed} && !$_task_id &&
      0        
      0        
      0        
1541             exists $self->{_wuf} && $self->{_pid} eq $_pid ) {
1542              
1543 0 0       0 $self->{_retry_cnt} = -1 unless defined( $self->{_retry_cnt} );
1544              
1545 0 0 0     0 if ( !$_max_retries || ++$self->{_retry_cnt} == $_max_retries ) {
1546 0 0   0   0 MCE::relay { warn "Error: chunk $_chunk_id failed\n" if $_chunk_id };
  0         0  
1547             }
1548             }
1549              
1550             ## Check for nested workers not yet joined.
1551 0 0       0 MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
1552              
1553             MCE::Hobo->finish('MCE')
1554 0 0 0     0 if ( $INC{'MCE/Hobo.pm'} && MCE::Hobo->can('_clear') );
1555              
1556 0 0       0 local $\ = undef if (defined $\);
1557 0         0 my $_len = length $_exit_msg;
1558              
1559 0         0 $_exit_id =~ s/[\r\n][\r\n]*/ /mg;
1560 0 0       0 $_DAT_LOCK->lock() if $_lock_chn;
1561              
1562 0 0 0     0 if ($self->{_retry} && $self->{_retry}->[2]--) {
1563 0         0 $_exit_status = 0; my $_buf = $self->{freeze}($self->{_retry});
  0         0  
1564 0         0 print({$_DAT_W_SOCK} OUTPUT_W_EXT.$LF . $_chn.$LF),
1565 0         0 print({$_DAU_W_SOCK}
1566 0         0 $_task_id.$LF . $self->{_wid}.$LF . $self->{_exit_pid}.$LF .
1567             $_exit_status.$LF . $_exit_id.$LF . $_len.$LF . $_exit_msg .
1568             length($_buf).$LF, $_buf
1569             );
1570             }
1571             else {
1572 0         0 print({$_DAT_W_SOCK} OUTPUT_W_EXT.$LF . $_chn.$LF),
1573 0         0 print({$_DAU_W_SOCK}
1574 0         0 $_task_id.$LF . $self->{_wid}.$LF . $self->{_exit_pid}.$LF .
1575             $_exit_status.$LF . $_exit_id.$LF . $_len.$LF . $_exit_msg .
1576             '0'.$LF
1577             );
1578             }
1579              
1580 0 0       0 $_DAT_LOCK->unlock() if $_lock_chn;
1581             }
1582              
1583 0         0 _exit($self);
1584             }
1585              
1586             ## Return the session dir, made on demand.
1587              
1588             sub sess_dir {
1589 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1590 0 0       0 return $self->{_sess_dir} if defined $self->{_sess_dir};
1591              
1592 0 0       0 if ($self->{_wid} == 0) {
1593             $self->{_sess_dir} = $self->{_spawned}
1594 0 0       0 ? _make_sessdir($self) : undef;
1595             }
1596             else {
1597 0         0 my $_chn = $self->{_chn};
1598 0         0 my $_DAT_LOCK = $self->{_dat_lock};
1599 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1600 0         0 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
1601 0         0 my $_lock_chn = $self->{_lock_chn};
1602 0         0 my $_sess_dir;
1603              
1604 0 0       0 local $\ = undef if (defined $\);
1605 0 0 0     0 local $/ = $LF if (!$/ || $/ ne $LF);
1606              
1607 0 0       0 $_DAT_LOCK->lock() if $_lock_chn;
1608 0         0 print({$_DAT_W_SOCK} OUTPUT_S_DIR.$LF . $_chn.$LF);
  0         0  
1609 0         0 chomp($_sess_dir = <$_DAU_W_SOCK>);
1610 0 0       0 $_DAT_LOCK->unlock() if $_lock_chn;
1611              
1612 0         0 $self->{_sess_dir} = $_sess_dir;
1613             }
1614             }
1615              
1616             ## Return the temp dir, made on demand.
1617              
1618             sub tmp_dir {
1619 31 50   31 0 3397 my $self = shift; $self = $MCE unless ref($self);
  31         137  
1620 31 50       205 return $self->{tmp_dir} if defined $self->{tmp_dir};
1621              
1622 31 50       130 if ($self->{_wid} == 0) {
1623 31         142 $self->{tmp_dir} = MCE::Signal::_make_tmpdir();
1624             }
1625             else {
1626 0         0 my $_chn = $self->{_chn};
1627 0         0 my $_DAT_LOCK = $self->{_dat_lock};
1628 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1629 0         0 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
1630 0         0 my $_lock_chn = $self->{_lock_chn};
1631 0         0 my $_tmp_dir;
1632              
1633 0 0       0 local $\ = undef if (defined $\);
1634 0 0 0     0 local $/ = $LF if (!$/ || $/ ne $LF);
1635              
1636 0 0       0 $_DAT_LOCK->lock() if $_lock_chn;
1637 0         0 print({$_DAT_W_SOCK} OUTPUT_T_DIR.$LF . $_chn.$LF);
  0         0  
1638 0         0 chomp($_tmp_dir = <$_DAU_W_SOCK>);
1639 0 0       0 $_DAT_LOCK->unlock() if $_lock_chn;
1640              
1641 0         0 $self->{tmp_dir} = $_tmp_dir;
1642             }
1643             }
1644              
1645             ###############################################################################
1646             ## ----------------------------------------------------------------------------
1647             ## Methods for serializing data from workers to the main process.
1648             ##
1649             ###############################################################################
1650              
1651             ## Do method. Additional arguments are optional.
1652              
1653             sub do {
1654 133 100   133 0 856 my $self = shift; $self = $MCE unless ref($self);
  133         340  
1655 133 50       488 my $_pkg = caller() eq 'MCE' ? caller(1) : caller();
1656              
1657 133 50       304 _croak('MCE::do: (code ref) is not supported')
1658             if (ref $_[0] eq 'CODE');
1659 133 50       431 _croak('MCE::do: (callback) is not specified')
1660             unless (defined ( my $_func = shift ));
1661              
1662 133 50       491 $_func = $_pkg.'::'.$_func if (index($_func, ':') < 0);
1663              
1664 133 50       356 if ($self->{_wid}) {
1665 133         808 return _do_callback($self, $_func, [ @_ ]);
1666             }
1667             else {
1668 85     85   816 no strict 'refs';
  85         301  
  85         228489  
1669 0         0 return $_func->(@_);
1670             }
1671             }
1672              
1673             ## Gather method.
1674              
1675             sub gather {
1676 357 50   357 0 2084 my $self = shift; $self = $MCE unless ref($self);
  357         771  
1677              
1678             _croak('MCE::gather: method is not allowed by the manager process')
1679 357 50       984 unless ($self->{_wid});
1680              
1681 357         1779 return _do_gather($self, [ @_ ]);
1682             }
1683              
1684             ## Sendto method.
1685              
1686             {
1687             my %_sendto_lkup = (
1688             'file' => SENDTO_FILEV1, 'stderr' => SENDTO_STDERR,
1689             'file:' => SENDTO_FILEV2, 'stdout' => SENDTO_STDOUT,
1690             'fd:' => SENDTO_FD,
1691             );
1692              
1693             my $_v2_regx = qr/^([^:]+:)(.+)/;
1694              
1695             sub sendto {
1696              
1697 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1698 0         0 my $_to = shift;
1699              
1700             _croak('MCE::sendto: method is not allowed by the manager process')
1701 0 0       0 unless ($self->{_wid});
1702              
1703 0 0       0 return unless (defined $_[0]);
1704              
1705             my $_dest = exists $_sendto_lkup{ lc($_to) }
1706 0 0       0 ? $_sendto_lkup{ lc($_to) } : undef;
1707 0         0 my $_value;
1708              
1709 0 0       0 if (!defined $_dest) {
1710 0         0 my $_fd;
1711              
1712 0 0 0     0 if (ref($_to) && ( defined ($_fd = fileno($_to)) ||
    0 0        
1713             defined ($_fd = eval { $_to->fileno }) )) {
1714              
1715 0 0       0 if (my $_ob = tied *{ $_to }) {
  0         0  
1716 0 0       0 if (ref $_ob eq 'IO::TieCombine::Handle') {
1717 0 0       0 $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout');
1718 0 0       0 $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr');
1719             }
1720             }
1721              
1722 0 0       0 my $_data_ref = (scalar @_ == 1) ? \(''.$_[0]) : \join('', @_);
1723 0         0 return _do_send_glob($self, $_to, $_fd, $_data_ref);
1724             }
1725             elsif (reftype($_to) eq 'GLOB') {
1726 0         0 return _croak('Cannot write to filehandle');
1727             }
1728              
1729 0 0 0     0 if (defined $_to && $_to =~ /$_v2_regx/o) {
1730             $_dest = exists $_sendto_lkup{ lc($1) }
1731 0 0       0 ? $_sendto_lkup{ lc($1) } : undef;
1732 0         0 $_value = $2;
1733             }
1734              
1735 0 0 0     0 if (!defined $_dest || ( !defined $_value && (
      0        
      0        
1736             $_dest == SENDTO_FILEV2 || $_dest == SENDTO_FD
1737             ))) {
1738 0         0 my $_msg = "\n";
1739 0         0 $_msg .= "MCE::sendto: improper use of method\n";
1740 0         0 $_msg .= "\n";
1741 0         0 $_msg .= "## usage:\n";
1742 0         0 $_msg .= "## ->sendto(\"stderr\", ...);\n";
1743 0         0 $_msg .= "## ->sendto(\"stdout\", ...);\n";
1744 0         0 $_msg .= "## ->sendto(\"file:/path/to/file\", ...);\n";
1745 0         0 $_msg .= "## ->sendto(\"fd:2\", ...);\n";
1746 0         0 $_msg .= "\n";
1747              
1748 0         0 _croak($_msg);
1749             }
1750             }
1751              
1752 0 0       0 if ($_dest == SENDTO_FILEV1) { # sendto 'file', $a, $path
1753 0 0 0     0 return if (!defined $_[1] || @_ > 2); # Please switch to using V2
1754 0         0 $_value = $_[1]; delete $_[1]; # sendto 'file:/path', $a
  0         0  
1755 0         0 $_dest = SENDTO_FILEV2;
1756             }
1757              
1758 0         0 return _do_send($self, $_dest, $_value, @_);
1759             }
1760             }
1761              
1762             ###############################################################################
1763             ## ----------------------------------------------------------------------------
1764             ## Functions for serializing print, printf and say statements.
1765             ##
1766             ###############################################################################
1767              
1768             sub print {
1769 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1770 0         0 my ($_fd, $_glob, $_data);
1771              
1772 0 0 0     0 if (ref($_[0]) && ( defined ($_fd = fileno($_[0])) ||
    0 0        
1773             defined ($_fd = eval { $_[0]->fileno }) )) {
1774              
1775 0 0       0 if (my $_ob = tied *{ $_[0] }) {
  0         0  
1776 0 0       0 if (ref $_ob eq 'IO::TieCombine::Handle') {
1777 0 0       0 $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout');
1778 0 0       0 $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr');
1779             }
1780             }
1781              
1782 0         0 $_glob = shift;
1783             }
1784             elsif (reftype($_[0]) eq 'GLOB') {
1785 0         0 return _croak('Cannot write to filehandle');
1786             }
1787              
1788 0 0       0 $_data = join('', scalar @_ ? @_ : $_);
1789              
1790 0 0       0 return _do_send_glob($self, $_glob, $_fd, \$_data) if $_fd;
1791 0 0       0 return _do_send($self, SENDTO_STDOUT, undef, \$_data) if $self->{_wid};
1792 0         0 return _do_send_glob($self, \*STDOUT, 1, \$_data);
1793             }
1794              
1795             sub printf {
1796 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1797 0         0 my ($_fd, $_glob, $_fmt, $_data);
1798              
1799 0 0 0     0 if (ref($_[0]) && ( defined ($_fd = fileno($_[0])) ||
    0 0        
1800             defined ($_fd = eval { $_[0]->fileno }) )) {
1801              
1802 0 0       0 if (my $_ob = tied *{ $_[0] }) {
  0         0  
1803 0 0       0 if (ref $_ob eq 'IO::TieCombine::Handle') {
1804 0 0       0 $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout');
1805 0 0       0 $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr');
1806             }
1807             }
1808              
1809 0         0 $_glob = shift;
1810             }
1811             elsif (reftype($_[0]) eq 'GLOB') {
1812 0         0 return _croak('Cannot write to filehandle');
1813             }
1814              
1815 0   0     0 $_fmt = shift || '%s';
1816 0 0       0 $_data = _sprintf($_fmt, scalar @_ ? @_ : $_);
1817              
1818 0 0       0 return _do_send_glob($self, $_glob, $_fd, \$_data) if $_fd;
1819 0 0       0 return _do_send($self, SENDTO_STDOUT, undef, \$_data) if $self->{_wid};
1820 0         0 return _do_send_glob($self, \*STDOUT, 1, \$_data);
1821             }
1822              
1823             sub say {
1824 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1825 0         0 my ($_fd, $_glob, $_data);
1826              
1827 0 0 0     0 if (ref($_[0]) && ( defined ($_fd = fileno($_[0])) ||
    0 0        
1828             defined ($_fd = eval { $_[0]->fileno }) )) {
1829              
1830 0 0       0 if (my $_ob = tied *{ $_[0] }) {
  0         0  
1831 0 0       0 if (ref $_ob eq 'IO::TieCombine::Handle') {
1832 0 0       0 $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout');
1833 0 0       0 $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr');
1834             }
1835             }
1836              
1837 0         0 $_glob = shift;
1838             }
1839             elsif (reftype($_[0]) eq 'GLOB') {
1840 0         0 return _croak('Cannot write to filehandle');
1841             }
1842              
1843 0 0       0 $_data = join('', scalar @_ ? @_ : $_) . "\n";
1844              
1845 0 0       0 return _do_send_glob($self, $_glob, $_fd, \$_data) if $_fd;
1846 0 0       0 return _do_send($self, SENDTO_STDOUT, undef, \$_data) if $self->{_wid};
1847 0         0 return _do_send_glob($self, \*STDOUT, 1, \$_data);
1848             }
1849              
1850             ###############################################################################
1851             ## ----------------------------------------------------------------------------
1852             ## Private methods.
1853             ##
1854             ###############################################################################
1855              
1856             sub _exit {
1857 65     65   228 my $self = shift;
1858 65 50 33     519 my $_has_guard = (exists $self->{_guard} && $self->{_guard}->[0]) ? 1 : 0;
1859              
1860 65 50       324 @{ $self->{_guard} } = () if $_has_guard;
  0         0  
1861 65         268 delete $self->{_wuf}; _end();
  65         1395  
1862              
1863             ## Exit thread/child process.
1864 65 50   0   5415 $SIG{__DIE__} = sub {} unless $_tid;
1865 65     0   16695 $SIG{__WARN__} = sub {};
1866              
1867 65 50       501 threads->exit(0) if $self->{use_threads};
1868              
1869 65 50       293 if (! $_tid) {
1870             $SIG{HUP} = $SIG{INT} = $SIG{QUIT} = $SIG{TERM} = sub {
1871 0     0   0 $SIG{$_[0]} = $SIG{INT} = $SIG{TERM} = sub {};
1872              
1873 0 0 0     0 CORE::kill($_[0], getppid())
      0        
1874             if (($_[0] eq 'INT' || $_[0] eq 'TERM') && $^O ne 'MSWin32');
1875              
1876 0         0 CORE::kill('KILL', $$);
1877 65         3393 };
1878             }
1879              
1880 65 0 33     669 if ($self->{posix_exit} && !$_has_guard && !$_is_MSWin32) {
      33        
1881 0         0 eval { MCE::Mutex::Channel::_destroy() };
  0         0  
1882 0 0       0 POSIX::_exit(0) if $INC{'POSIX.pm'};
1883 0         0 CORE::kill('KILL', $$);
1884             }
1885              
1886 65         23971 CORE::exit(0);
1887             }
1888              
1889             sub _get_max_workers {
1890 126 50   126   264 my $self = shift; $self = $MCE unless ref($self);
  126         392  
1891              
1892 126 100       483 if (defined $self->{user_tasks}) {
1893 91 50       266 if (defined $self->{user_tasks}->[0]->{max_workers}) {
1894 91         1016 return $self->{user_tasks}->[0]->{max_workers};
1895             }
1896             }
1897              
1898 35         87 return $self->{max_workers};
1899             }
1900              
1901             sub _make_sessdir {
1902 0 0   0   0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1903              
1904 0         0 my $_sess_dir = $self->{_sess_dir};
1905              
1906 0 0       0 unless (defined $_sess_dir) {
1907             $self->{tmp_dir} = MCE::Signal::_make_tmpdir()
1908 0 0       0 unless defined $self->{tmp_dir};
1909              
1910 0 0       0 my $_mce_tid = $INC{'threads.pm'} ? threads->tid() : '';
1911 0 0       0 $_mce_tid = '' unless defined $self->{_mce_tid};
1912              
1913 0         0 my $_mce_sid = $$ .'.'. $_mce_tid .'.'. (++$_mce_count);
1914 0         0 my $_tmp_dir = $self->{tmp_dir};
1915              
1916 0 0 0     0 _croak("MCE::sess_dir: (tmp_dir) is not defined")
1917             if (!defined $_tmp_dir || $_tmp_dir eq '');
1918 0 0       0 _croak("MCE::sess_dir: ($_tmp_dir) is not a directory or does not exist")
1919             unless (-d $_tmp_dir);
1920 0 0       0 _croak("MCE::sess_dir: ($_tmp_dir) is not writeable")
1921             unless (-w $_tmp_dir);
1922              
1923 0         0 my $_cnt = 0; $_sess_dir = "$_tmp_dir/$_mce_sid";
  0         0  
1924              
1925 0         0 $_sess_dir = "$_tmp_dir/$_mce_sid." . (++$_cnt)
1926             while ( !(mkdir $_sess_dir, 0770) );
1927             }
1928              
1929 0         0 return $_sess_dir;
1930             }
1931              
1932             sub _sprintf {
1933 0     0   0 my ($_fmt, $_arg) = @_;
1934             # remove tainted'ness
1935 0         0 ($_fmt) = $_fmt =~ /(.*)/s;
1936              
1937 0         0 return sprintf("$_fmt", $_arg);
1938             }
1939              
1940             sub _sync_buffer_to_array {
1941 5     5   20 my ($_buffer_ref, $_array_ref, $_chop_str) = @_;
1942              
1943 5         11 local $_; my $_cnt = 0;
  5         9  
1944              
1945 5         306 open my $_MEM_FH, '<', $_buffer_ref;
  4         7  
  4         135  
1946 5         4494 binmode $_MEM_FH, ':raw';
1947              
1948 5 50       27 unless (length $_chop_str) {
1949 5         74 $_array_ref->[$_cnt++] = $_ while (<$_MEM_FH>);
1950             }
1951             else {
1952 0         0 $_array_ref->[$_cnt++] = <$_MEM_FH>;
1953 0         0 while (<$_MEM_FH>) {
1954 0         0 $_array_ref->[$_cnt ] = $_chop_str;
1955 0         0 $_array_ref->[$_cnt++] .= $_;
1956             }
1957             }
1958              
1959 5         15 close $_MEM_FH;
1960 5         31 weaken $_MEM_FH;
1961              
1962 5         15 return;
1963             }
1964              
1965             sub _sync_params {
1966 187     187   445 my ($self, $_params_ref) = @_;
1967 187         825 my $_requires_shutdown = 0;
1968              
1969 187 50 33     748 if (defined $_params_ref->{init_relay} && !defined $self->{init_relay}) {
1970 0         0 $_requires_shutdown = 1;
1971             }
1972 187         938 for my $_p (qw( user_begin user_func user_end )) {
1973 561 50       1225 if (defined $_params_ref->{$_p}) {
1974 0         0 $self->{$_p} = delete $_params_ref->{$_p};
1975 0         0 $_requires_shutdown = 1;
1976             }
1977             }
1978 187         321 for my $_p (keys %{ $_params_ref }) {
  187         947  
1979             _croak("MCE::_sync_params: ($_p) is not a valid params argument")
1980 311 50       866 unless (exists $_params_allowed_args{$_p});
1981              
1982 311         672 $self->{$_p} = $_params_ref->{$_p};
1983             }
1984              
1985 187 100       688 return ($self->{_spawned}) ? $_requires_shutdown : 0;
1986             }
1987              
1988             ###############################################################################
1989             ## ----------------------------------------------------------------------------
1990             ## Dispatch methods.
1991             ##
1992             ###############################################################################
1993              
1994             sub _dispatch {
1995 65     65   2154 my @_args = @_; my $_is_thread = shift @_args;
  65         1080  
1996 65         1546 my $self = $MCE = $_args[0];
1997              
1998             ## To avoid (Scalars leaked: N) messages; fixed in Perl 5.12.x
1999 65         937 @_ = ();
2000              
2001             $ENV{'PERL_MCE_IPC'} = 'win32' if ( $_is_MSWin32 && (
2002             defined($self->{max_retries}) ||
2003             $INC{'MCE/Child.pm'} ||
2004 65 0 0     10191 $INC{'MCE/Hobo.pm'}
      33        
2005             ));
2006              
2007 65         1066 delete $self->{_relayed};
2008              
2009 65         1778 $self->{_is_thread} = $_is_thread;
2010 65 50       3448 $self->{_pid} = $_is_thread ? $$ .'.'. threads->tid() : $$;
2011              
2012             ## Sets the seed of the base generator uniquely between workers.
2013             ## The new seed is computed using the current seed and $_wid value.
2014             ## One may set the seed at the application level for predictable
2015             ## results (non-thread workers only). Ditto for Math::Prime::Util,
2016             ## Math::Random, and Math::Random::MT::Auto.
2017              
2018             {
2019 65         1189 my ($_wid, $_seed) = ($_args[1], $self->{_seed});
  65         857  
2020 65         1866 srand(abs($_seed - ($_wid * 100000)) % 2147483560);
2021              
2022 65 50       1433 if (!$self->{use_threads}) {
2023             Math::Prime::Util::srand(abs($_seed - ($_wid * 100000)) % 2147483560)
2024 65 50       1420 if ( $INC{'Math/Prime/Util.pm'} );
2025              
2026             MCE::Hobo->_clear()
2027 65 50 33     18916 if ( $INC{'MCE/Hobo.pm'} && MCE::Hobo->can('_clear') );
2028              
2029 65 50       1054 MCE::Child->_clear() if $INC{'MCE/Child.pm'};
2030             }
2031             }
2032              
2033 65 50 33     2458 if (!$self->{use_threads} && $INC{'Math/Random.pm'}) {
2034 0         0 my ($_wid, $_cur_seed) = ($_args[1], Math::Random::random_get_seed());
2035              
2036 0 0       0 my $_new_seed = ($_cur_seed < 1073741781)
2037             ? $_cur_seed + (($_wid * 100000) % 1073741780)
2038             : $_cur_seed - (($_wid * 100000) % 1073741780);
2039              
2040 0         0 Math::Random::random_set_seed($_new_seed, $_new_seed);
2041             }
2042              
2043 65 50 33     5208 if (!$self->{use_threads} && $INC{'Math/Random/MT/Auto.pm'}) {
2044 0         0 my ($_wid, $_cur_seed) = (
2045             $_args[1], Math::Random::MT::Auto::get_seed()->[0]
2046             );
2047 0 0       0 my $_new_seed = ($_cur_seed < 1073741781)
2048             ? $_cur_seed + (($_wid * 100000) % 1073741780)
2049             : $_cur_seed - (($_wid * 100000) % 1073741780);
2050              
2051 0         0 Math::Random::MT::Auto::set_seed($_new_seed);
2052             }
2053              
2054             ## Run.
2055              
2056 65         3828 _worker_main(@_args, \@_plugin_worker_init);
2057              
2058 65         1304 _exit($self);
2059             }
2060              
2061             sub _dispatch_thread {
2062 0     0   0 my ($self, $_wid, $_task, $_task_id, $_task_wid, $_params) = @_;
2063              
2064 0         0 @_ = (); local $_;
  0         0  
2065              
2066 0         0 my $_thr = threads->create( \&_dispatch,
2067             1, $self, $_wid, $_task, $_task_id, $_task_wid, $_params
2068             );
2069              
2070 0 0       0 _croak("MCE::_dispatch_thread: Failed to spawn worker $_wid: $!")
2071             if (!defined $_thr);
2072              
2073             ## Store into an available slot (restart), otherwise append to arrays.
2074 0 0       0 if (defined $_params) { for my $_i (0 .. @{ $self->{_tids} } - 1) {
  0         0  
  0         0  
2075 0 0       0 unless (defined $self->{_tids}->[$_i]) {
2076 0         0 $self->{_thrs}->[$_i] = $_thr;
2077 0         0 $self->{_tids}->[$_i] = $_thr->tid();
2078 0         0 return;
2079             }
2080             }}
2081              
2082 0         0 push @{ $self->{_thrs} }, $_thr;
  0         0  
2083 0         0 push @{ $self->{_tids} }, $_thr->tid();
  0         0  
2084              
2085             sleep $self->{spawn_delay}
2086 0 0 0     0 if defined($self->{spawn_delay}) && $self->{spawn_delay} > 0.0;
2087              
2088 0         0 return;
2089             }
2090              
2091             sub _dispatch_child {
2092 277     277   881 my ($self, $_wid, $_task, $_task_id, $_task_wid, $_params) = @_;
2093              
2094 277         444 @_ = (); local $_;
  277         375  
2095 277         357846 my $_pid = fork();
2096              
2097 277 50       25569 _croak("MCE::_dispatch_child: Failed to spawn worker $_wid: $!")
2098             if (!defined $_pid);
2099              
2100 277 100       5996 _dispatch(0, $self, $_wid, $_task, $_task_id, $_task_wid, $_params)
2101             if ($_pid == 0);
2102              
2103             ## Store into an available slot (restart), otherwise append to array.
2104 212 50       1429 if (defined $_params) { for my $_i (0 .. @{ $self->{_pids} } - 1) {
  0         0  
  0         0  
2105 0 0       0 unless (defined $self->{_pids}->[$_i]) {
2106 0         0 $self->{_pids}->[$_i] = $_pid;
2107 0         0 return;
2108             }
2109             }}
2110              
2111 212         4802 push @{ $self->{_pids} }, $_pid;
  212         5046  
2112              
2113 212 50 33     3394 if ($self->{loop_timeout} && !$_is_MSWin32) {
2114 0         0 $self->{_pids_t}{$_pid} = $_task_id;
2115 0         0 $self->{_pids_w}{$_pid} = $_wid;
2116             }
2117              
2118             sleep $self->{spawn_delay}
2119 212 50 33     25745 if defined($self->{spawn_delay}) && $self->{spawn_delay} > 0.0;
2120              
2121 212         8436 return;
2122             }
2123              
2124             1;
2125