File Coverage

blib/lib/MCE.pm
Criterion Covered Total %
statement 602 1076 55.9
branch 231 760 30.3
condition 86 366 23.5
subroutine 53 75 70.6
pod 0 20 0.0
total 972 2297 42.3


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## MCE - Many-Core Engine for Perl providing parallel processing capabilities.
4             ##
5             ###############################################################################
6              
7             package MCE;
8              
9 85     85   1509841 use strict;
  85         378  
  85         2396  
10 85     85   394 use warnings;
  85         146  
  85         2381  
11              
12 85     85   396 no warnings qw( threads recursion uninitialized );
  85         305  
  85         4014  
13              
14             our $VERSION = '1.889';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 85     85   639 use Carp ();
  85         180  
  85         22004  
21              
22             my ($_has_threads, $_freeze, $_thaw, $_tid, $_oid);
23              
24             BEGIN {
25 85     85   307 local $@;
26              
27 85 50 33     1370 if ( $^O eq 'MSWin32' && ! $INC{'threads.pm'} ) {
    50 33        
28 0         0 eval 'use threads; use threads::shared;';
29             }
30             elsif ( $INC{'threads.pm'} && ! $INC{'threads/shared.pm'} ) {
31 0         0 eval 'use threads::shared;';
32             }
33              
34 85 50       491 $_has_threads = $INC{'threads.pm'} ? 1 : 0;
35 85 50       330 $_tid = $_has_threads ? threads->tid() : 0;
36 85         531 $_oid = "$$.$_tid";
37              
38 85 50 33     777 if ( $] ge '5.008008' && ! $INC{'PDL.pm'} ) {
39 85     85   6534 eval 'use Sereal::Encoder 3.015; use Sereal::Decoder 3.015;';
  85     85   587  
  85         2339  
  85         4343  
  85         560  
  85         1604  
  85         2702  
40 85 50       433 if ( ! $@ ) {
41 85         1008 my $_encoder_ver = int( Sereal::Encoder->VERSION() );
42 85         644 my $_decoder_ver = int( Sereal::Decoder->VERSION() );
43 85 50       344 if ( $_encoder_ver - $_decoder_ver == 0 ) {
44 85         197 $_freeze = \&Sereal::Encoder::encode_sereal;
45 85         180 $_thaw = \&Sereal::Decoder::decode_sereal;
46             }
47             }
48             }
49              
50 85 50       2249 if ( ! defined $_freeze ) {
51 0         0 require Storable;
52 0         0 $_freeze = \&Storable::freeze;
53 0         0 $_thaw = \&Storable::thaw;
54             }
55             }
56              
57 85     85   47426 use IO::Handle ();
  85         529044  
  85         2244  
58 85     85   485 use Scalar::Util qw( looks_like_number refaddr reftype weaken );
  85         161  
  85         4737  
59 85     85   46964 use Socket qw( SOL_SOCKET SO_RCVBUF );
  85         304233  
  85         13448  
60 85     85   37926 use Time::HiRes qw( sleep time );
  85         96948  
  85         346  
61              
62 85     85   52724 use MCE::Util qw( $LF );
  85         187  
  85         8415  
63 85     85   42897 use MCE::Signal ();
  85         227  
  85         2233  
64 85     85   35005 use MCE::Mutex ();
  85         205  
  85         23594  
65              
66             our ($MCE, $RLA, $_que_template, $_que_read_size);
67             our (%_valid_fields_new);
68              
69             my ($TOP_HDLR, $_is_MSWin32, $_is_winenv, $_prev_mce);
70             my (%_valid_fields_task, %_params_allowed_args);
71              
72             BEGIN {
73             ## Configure pack/unpack template for writing to and from the queue.
74             ## Each entry contains 2 positive numbers: chunk_id & msg_id.
75             ## Check for >= 64-bit, otherwize fall back to machine's word length.
76              
77 85     85   421 $_que_template = ( ( log(~0+1) / log(2) ) >= 64 ) ? 'Q2' : 'I2';
78 85         628 $_que_read_size = length pack($_que_template, 0, 0);
79              
80             ## Attributes used internally.
81             ## _abort_msg _caller _chn _com_lock _dat_lock _mgr_live _rla_data _seed
82             ## _chunk_id _pids _run_mode _single_dim _thrs _tids _task_wid _wid _wuf
83             ## _exiting _exit_pid _last_sref _total_exited _total_running _total_workers
84             ## _send_cnt _sess_dir _spawned _state _status _task _task_id _wrk_status
85             ## _init_pid _init_total_workers _pids_t _pids_w _pids_c _relayed
86             ##
87             ## _bsb_r_sock _bsb_w_sock _com_r_sock _com_w_sock _dat_r_sock _dat_w_sock
88             ## _que_r_sock _que_w_sock _rla_r_sock _rla_w_sock _data_channels
89             ## _lock_chn _mutex_n
90              
91 85         224 %_valid_fields_new = map { $_ => 1 } qw(
  3230         6777  
92             max_workers tmp_dir use_threads user_tasks task_end task_name freeze thaw
93             chunk_size input_data sequence job_delay spawn_delay submit_delay RS
94             flush_file flush_stderr flush_stdout stderr_file stdout_file use_slurpio
95             interval user_args user_begin user_end user_func user_error user_output
96             bounds_only gather init_relay on_post_exit on_post_run parallel_io
97             loop_timeout max_retries progress posix_exit
98             );
99 85         383 %_params_allowed_args = map { $_ => 1 } qw(
  2465         3509  
100             chunk_size input_data sequence job_delay spawn_delay submit_delay RS
101             flush_file flush_stderr flush_stdout stderr_file stdout_file use_slurpio
102             interval user_args user_begin user_end user_func user_error user_output
103             bounds_only gather init_relay on_post_exit on_post_run parallel_io
104             loop_timeout max_retries progress
105             );
106 85         468 %_valid_fields_task = map { $_ => 1 } qw(
  1530         2446  
107             max_workers chunk_size input_data interval sequence task_end task_name
108             bounds_only gather init_relay user_args user_begin user_end user_func
109             RS parallel_io use_slurpio use_threads
110             );
111              
112 85 50       726 $_is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
113 85 50       650 $_is_winenv = ( $^O =~ /mswin|mingw|msys|cygwin/i ) ? 1 : 0;
114              
115             ## Create accessor functions.
116 85     85   639 no strict 'refs'; no warnings 'redefine';
  85     85   145  
  85         2765  
  85         505  
  85         214  
  85         25226  
117              
118 85         2181 for my $_p (qw( chunk_size max_retries max_workers task_name user_args )) {
119 425         1856 *{ $_p } = sub () {
120 99 100   99   479 my $self = shift; $self = $MCE unless ref($self);
  99         220  
121 99         492 return $self->{$_p};
122 425         1667 };
123             }
124 85         182 for my $_p (qw( chunk_id task_id task_wid wid )) {
125 340         1249 *{ $_p } = sub () {
126 63 100   63   1059 my $self = shift; $self = $MCE unless ref($self);
  63         306  
127 63         1003 return $self->{"_${_p}"};
128 340         1327 };
129             }
130 85         233 for my $_p (qw( freeze thaw )) {
131 170         740 *{ $_p } = sub () {
132 208 100   208   8240 my $self = shift; $self = $MCE unless ref($self);
  208         15297  
133 208         2107 return $self->{$_p}(@_);
134 170         491 };
135             }
136              
137 85         193 $RLA = {};
138              
139 85         3231 return;
140             }
141              
142             ###############################################################################
143             ## ----------------------------------------------------------------------------
144             ## Import routine.
145             ##
146             ###############################################################################
147              
148 85     85   561 use constant { SELF => 0, CHUNK => 1, CID => 2 };
  85         294  
  85         10669  
149              
150 85     85   46147 our $_MCE_LOCK : shared = 1;
  85         102911  
  85         452  
151 85     85   6012 our $_WIN_LOCK : shared = 1;
  85         179  
  85         272  
152              
153             my ($_def, $_imported) = ({});
154              
155             sub import {
156 98     98   676 my ($_class, $_pkg) = (shift, caller);
157 98         355 my $_p = $_def->{$_pkg} = {};
158              
159             ## Process module arguments.
160 98         501 while ( my $_argument = shift ) {
161 0         0 my $_arg = lc $_argument;
162              
163 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
164 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
165 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
166 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
167 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
168 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
169 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
170              
171 0 0 0     0 if ( $_arg eq 'export_const' || $_arg eq 'const' ) {
172 0 0       0 if ( shift eq '1' ) {
173 85     85   12699 no strict 'refs'; no warnings 'redefine';
  85     85   195  
  85         2785  
  85         460  
  85         179  
  85         30811  
174 0         0 *{ $_pkg.'::SELF' } = \&SELF;
  0         0  
175 0         0 *{ $_pkg.'::CHUNK' } = \&CHUNK;
  0         0  
176 0         0 *{ $_pkg.'::CID' } = \&CID;
  0         0  
177             }
178 0         0 next;
179             }
180              
181             ## Sereal, if available, is used automatically by MCE 1.800 onwards.
182 0 0       0 if ( $_arg eq 'sereal' ) {
183 0 0       0 if ( shift eq '0' ) {
184 0         0 require Storable;
185 0         0 $_p->{FREEZE} = \&Storable::freeze;
186 0         0 $_p->{THAW} = \&Storable::thaw;
187             }
188 0         0 next;
189             }
190              
191 0         0 _croak("Error: ($_argument) invalid module option");
192             }
193              
194 98 100       2581 return if $_imported++;
195              
196             ## Instantiate a module-level instance.
197 85         295 $MCE = MCE->new( _module_instance => 1, max_workers => 0 );
198              
199 85         9283 return;
200             }
201              
202             ###############################################################################
203             ## ----------------------------------------------------------------------------
204             ## Define constants & variables.
205             ##
206             ###############################################################################
207              
208             use constant {
209              
210             # Max data channels. This cannot be greater than 8 on MSWin32.
211 85         92738 DATA_CHANNELS => 8,
212              
213             # Max GC size. Undef variable when exceeding size.
214             MAX_GC_SIZE => 1024 * 1024 * 64,
215              
216             MAX_RECS_SIZE => 8192, # Reads number of records if N <= value
217             # Reads number of bytes if N > value
218              
219             OUTPUT_W_ABT => 'W~ABT', # Worker has aborted
220             OUTPUT_W_DNE => 'W~DNE', # Worker has completed
221             OUTPUT_W_RLA => 'W~RLA', # Worker has relayed
222             OUTPUT_W_EXT => 'W~EXT', # Worker has exited
223             OUTPUT_A_REF => 'A~REF', # Input << Array ref
224             OUTPUT_G_REF => 'G~REF', # Input << Glob ref
225             OUTPUT_H_REF => 'H~REF', # Input << Hash ref
226             OUTPUT_I_REF => 'I~REF', # Input << Iter ref
227             OUTPUT_A_CBK => 'A~CBK', # Callback w/ multiple args
228             OUTPUT_N_CBK => 'N~CBK', # Callback w/ no args
229             OUTPUT_A_GTR => 'A~GTR', # Gather data
230             OUTPUT_O_SND => 'O~SND', # Send >> STDOUT
231             OUTPUT_E_SND => 'E~SND', # Send >> STDERR
232             OUTPUT_F_SND => 'F~SND', # Send >> File
233             OUTPUT_D_SND => 'D~SND', # Send >> File descriptor
234             OUTPUT_B_SYN => 'B~SYN', # Barrier sync - begin
235             OUTPUT_E_SYN => 'E~SYN', # Barrier sync - end
236             OUTPUT_S_IPC => 'S~IPC', # Change to win32 IPC
237             OUTPUT_C_NFY => 'C~NFY', # Chunk ID notification
238             OUTPUT_P_NFY => 'P~NFY', # Progress notification
239             OUTPUT_R_NFY => 'R~NFY', # Relay notification
240             OUTPUT_S_DIR => 'S~DIR', # Make/get sess_dir
241             OUTPUT_T_DIR => 'T~DIR', # Make/get tmp_dir
242             OUTPUT_I_DLY => 'I~DLY', # Interval delay
243              
244             READ_FILE => 0, # Worker reads file handle
245             READ_MEMORY => 1, # Worker reads memory handle
246              
247             REQUEST_ARRAY => 0, # Worker requests next array chunk
248             REQUEST_GLOB => 1, # Worker requests next glob chunk
249             REQUEST_HASH => 2, # Worker requests next hash chunk
250              
251             SENDTO_FILEV1 => 0, # Worker sends to 'file', $a, '/path'
252             SENDTO_FILEV2 => 1, # Worker sends to 'file:/path', $a
253             SENDTO_STDOUT => 2, # Worker sends to STDOUT
254             SENDTO_STDERR => 3, # Worker sends to STDERR
255             SENDTO_FD => 4, # Worker sends to file descriptor
256              
257             WANTS_UNDEF => 0, # Callee wants nothing
258             WANTS_ARRAY => 1, # Callee wants list
259             WANTS_SCALAR => 2, # Callee wants scalar
260 85     85   632 };
  85         168  
261              
262             my $_mce_count = 0;
263              
264             sub CLONE {
265 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
266             }
267              
268             sub DESTROY {
269 290 0 33 290   957 CORE::kill('KILL', $$)
270             if ( $_is_MSWin32 && $MCE::Signal::KILLED );
271              
272             $_[0]->shutdown(1)
273 290 0 33     1802 if ( $_[0] && $_[0]->{_spawned} && $_[0]->{_init_pid} eq "$$.$_tid" &&
      33        
      33        
274             !$MCE::Signal::KILLED );
275              
276 290         7970 return;
277             }
278              
279             END {
280 85 100   85   44734 return unless ( defined $MCE );
281              
282 20 50       176 my $_pid = $MCE->{_is_thread} ? $$ .'.'. threads->tid() : $$;
283 20 50 33     140 $MCE->exit if ( exists $MCE->{_wuf} && $MCE->{_pid} eq $_pid );
284              
285 20         133 _end();
286             }
287              
288             sub _end {
289 85 100   85   1647 MCE::Flow->finish ( 'MCE' ) if $INC{'MCE/Flow.pm'};
290 85 100       526 MCE::Grep->finish ( 'MCE' ) if $INC{'MCE/Grep.pm'};
291 85 100       543 MCE::Loop->finish ( 'MCE' ) if $INC{'MCE/Loop.pm'};
292 85 100       521 MCE::Map->finish ( 'MCE' ) if $INC{'MCE/Map.pm'};
293 85 100       678 MCE::Step->finish ( 'MCE' ) if $INC{'MCE/Step.pm'};
294 85 100       656 MCE::Stream->finish ( 'MCE' ) if $INC{'MCE/Stream.pm'};
295              
296 85         580 $MCE = $TOP_HDLR = undef;
297             }
298              
299             ###############################################################################
300             ## ----------------------------------------------------------------------------
301             ## Plugin interface for external modules plugging into MCE, e.g. MCE::Queue.
302             ##
303             ###############################################################################
304              
305             my (%_plugin_function, @_plugin_loop_begin, @_plugin_loop_end);
306             my (%_plugin_list, @_plugin_worker_init);
307              
308             sub _attach_plugin {
309 45     45   109 my $_ext_module = caller;
310              
311 45 50       142 unless (exists $_plugin_list{$_ext_module}) {
312 45         129 $_plugin_list{$_ext_module} = undef;
313              
314 45         86 my $_ext_output_function = $_[0];
315 45         89 my $_ext_output_loop_begin = $_[1];
316 45         73 my $_ext_output_loop_end = $_[2];
317 45         79 my $_ext_worker_init = $_[3];
318              
319 45 50       202 if (ref $_ext_output_function eq 'HASH') {
320 45         101 for my $_p (keys %{ $_ext_output_function }) {
  45         226  
321             $_plugin_function{$_p} = $_ext_output_function->{$_p}
322 545 50       1038 unless (exists $_plugin_function{$_p});
323             }
324             }
325              
326 45 50       261 push @_plugin_loop_begin, $_ext_output_loop_begin
327             if (ref $_ext_output_loop_begin eq 'CODE');
328 45 50       271 push @_plugin_loop_end, $_ext_output_loop_end
329             if (ref $_ext_output_loop_end eq 'CODE');
330 45 100       186 push @_plugin_worker_init, $_ext_worker_init
331             if (ref $_ext_worker_init eq 'CODE');
332             }
333              
334 45         162 @_ = ();
335              
336 45         148 return;
337             }
338              
339             ## Functions for saving and restoring $MCE.
340             ## Called by MCE::{ Flow, Grep, Loop, Map, Step, and Stream }.
341              
342             sub _save_state {
343 182     182   434 $_prev_mce = $MCE; $MCE = $_[0];
  182         466  
344 182         383 return;
345             }
346             sub _restore_state {
347 129     129   626 $_prev_mce->{_wrk_status} = $MCE->{_wrk_status};
348 129         306 $MCE = $_prev_mce; $_prev_mce = undef;
  129         461  
349 129         336 return;
350             }
351              
352             ###############################################################################
353             ## ----------------------------------------------------------------------------
354             ## New instance instantiation.
355             ##
356             ###############################################################################
357              
358             sub _croak {
359 0 0 0 0   0 if (MCE->wid == 0 || ! $^S) {
360 0         0 $SIG{__DIE__} = \&MCE::Signal::_die_handler;
361 0         0 $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
362             }
363 0         0 $\ = undef; goto &Carp::croak;
  0         0  
364             }
365              
366             sub _relay (;&) {
367 0     0   0 goto &MCE::relay;
368             }
369              
370 85     85   39048 use MCE::Core::Validation ();
  85         201  
  85         1946  
371 85     85   42846 use MCE::Core::Manager ();
  85         195  
  85         2082  
372 85     85   42059 use MCE::Core::Worker ();
  85         227  
  85         221869  
373              
374             sub new {
375 319     319 0 37227 my ($class, %self) = @_;
376 319 100       1507 my $_pkg = exists $self{pkg} ? delete $self{pkg} : caller;
377              
378 319         676 @_ = ();
379              
380 319   33     1962 bless(\%self, ref($class) || $class);
381              
382 319   100     1660 $self{task_name} ||= 'MCE';
383 319   50     1556 $self{max_workers} ||= $_def->{$_pkg}{MAX_WORKERS} || 1;
      66        
384 319   50     3256 $self{chunk_size} ||= $_def->{$_pkg}{CHUNK_SIZE} || 1;
      33        
385 319   66     4172 $self{tmp_dir} ||= $_def->{$_pkg}{TMP_DIR} || $MCE::Signal::tmp_dir;
      66        
386 319   33     2863 $self{freeze} ||= $_def->{$_pkg}{FREEZE} || $_freeze;
      33        
387 319   33     2299 $self{thaw} ||= $_def->{$_pkg}{THAW} || $_thaw;
      33        
388              
389             $self{init_relay} = $_def->{$_pkg}{INIT_RELAY}
390 319 50       838 if (exists $_def->{$_pkg}{INIT_RELAY});
391              
392             $self{use_threads} = $_def->{$_pkg}{USE_THREADS}
393 319 50       814 if (exists $_def->{$_pkg}{USE_THREADS});
394              
395 319 100       882 if (exists $self{_module_instance}) {
396 85         229 $self{_init_total_workers} = $self{max_workers};
397 85         250 $self{_chunk_id} = $self{_task_wid} = $self{_wrk_status} = 0;
398 85         212 $self{_spawned} = $self{_task_id} = $self{_wid} = 0;
399 85         423 $self{_init_pid} = "$$.$_tid";
400              
401 85         423 return \%self;
402             }
403              
404 234         1291 _sendto_fhs_close();
405              
406 234         1004 for my $_p (keys %self) {
407             _croak("MCE::new: ($_p) is not a valid constructor argument")
408 1637 50       3272 unless (exists $_valid_fields_new{$_p});
409             }
410              
411 234         1775 $self{_caller} = $_pkg, $self{_init_pid} = "$$.$_tid";
412              
413 234 50       833 if (defined $self{use_threads}) {
414 0 0 0     0 if (!$_has_threads && $self{use_threads}) {
415 0         0 my $_msg = "\n";
416 0         0 $_msg .= "## Please include threads support prior to loading MCE\n";
417 0         0 $_msg .= "## when specifying use_threads => $self{use_threads}\n";
418 0         0 $_msg .= "\n";
419              
420 0         0 _croak($_msg);
421             }
422             }
423             else {
424 234 50       796 $self{use_threads} = ($_has_threads) ? 1 : 0;
425             }
426              
427 234 50       644 if (!exists $self{posix_exit}) {
428             $self{posix_exit} = 1 if (
429             $^S || $_tid || $INC{'Mojo/IOLoop.pm'} ||
430             $INC{'Coro.pm'} || $INC{'LWP/UserAgent.pm'} || $INC{'stfl.pm'} ||
431             $INC{'Curses.pm'} || $INC{'CGI.pm'} || $INC{'FCGI.pm'} ||
432             $INC{'Tk.pm'} || $INC{'Wx.pm'} || $INC{'Win32/GUI.pm'} ||
433 234 50 33     7841 $INC{'Gearman/Util.pm'} || $INC{'Gearman/XS.pm'}
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
434             );
435             }
436              
437             ## -------------------------------------------------------------------------
438             ## Validation.
439              
440 234 100       952 if (defined $self{tmp_dir}) {
441             _croak("MCE::new: ($self{tmp_dir}) is not a directory or does not exist")
442 42 50       868 unless (-d $self{tmp_dir});
443             _croak("MCE::new: ($self{tmp_dir}) is not writeable")
444 42 50       725 unless (-w $self{tmp_dir});
445             }
446              
447 234 100       1121 if (defined $self{user_tasks}) {
448             _croak('MCE::new: (user_tasks) is not an ARRAY reference')
449 103 50       519 unless (ref $self{user_tasks} eq 'ARRAY');
450              
451 103         473 $self{max_workers} = _parse_max_workers($self{max_workers});
452             $self{init_relay} = $self{user_tasks}->[0]->{init_relay}
453 103 50       589 if ($self{user_tasks}->[0]->{init_relay});
454              
455 103         219 for my $_task (@{ $self{user_tasks} }) {
  103         372  
456 159         289 for my $_p (keys %{ $_task }) {
  159         575  
457             _croak("MCE::new: ($_p) is not a valid task constructor argument")
458 528 50       1142 unless (exists $_valid_fields_task{$_p});
459             }
460 159 50       323 $_task->{max_workers} = 0 unless scalar(keys %{ $_task });
  159         1153  
461              
462             $_task->{max_workers} = $self{max_workers}
463 159 100       482 unless (defined $_task->{max_workers});
464             $_task->{use_threads} = $self{use_threads}
465 159 50       536 unless (defined $_task->{use_threads});
466              
467 159   50     648 bless($_task, ref(\%self) || \%self);
468             }
469             }
470              
471 234         1503 _validate_args(\%self);
472              
473             ## -------------------------------------------------------------------------
474             ## Private options. Limit chunk_size.
475              
476 234         356 my $_run_lock;
477              
478 234         645 $self{_chunk_id} = 0; # Chunk ID
479 234         469 $self{_send_cnt} = 0; # Number of times data was sent via send
480 234         428 $self{_spawned} = 0; # Have workers been spawned
481 234         517 $self{_task_id} = 0; # Task ID, starts at 0 (array index)
482 234         618 $self{_task_wid} = 0; # Task Worker ID, starts at 1 per task
483 234         585 $self{_wid} = 0; # Worker ID, starts at 1 per MCE instance
484 234         649 $self{_wrk_status} = 0; # For saving exit status when worker exits
485              
486 234 50       768 $self{_run_lock} = threads::shared::share($_run_lock) if $_is_MSWin32;
487              
488             $self{_last_sref} = (ref $self{input_data} eq 'SCALAR')
489 234 50       924 ? refaddr($self{input_data}) : 0;
490              
491             my $_data_channels = ("$$.$_tid" eq $_oid)
492 234 50       1397 ? ( $INC{'MCE/Channel.pm'} ? 4 : DATA_CHANNELS )
    50          
493             : 2;
494              
495 234         425 my $_total_workers = 0;
496              
497 234 100       554 if (defined $self{user_tasks}) {
498 103         232 $_total_workers += $_->{max_workers} for @{ $self{user_tasks} };
  103         456  
499             } else {
500 131         223 $_total_workers = $self{max_workers};
501             }
502              
503 234         656 $self{_init_total_workers} = $_total_workers;
504              
505 234 100       764 $self{_data_channels} = ($_total_workers < $_data_channels)
506             ? $_total_workers : $_data_channels;
507              
508 234 100       690 $self{_lock_chn} = ($_total_workers > $self{_data_channels}) ? 1 : 0;
509 234 50 33     1472 $self{_lock_chn} = 1 if $INC{'MCE/Child.pm'} || $INC{'MCE/Hobo.pm'};
510              
511 234 50       1152 $MCE = \%self if ($MCE->{_wid} == 0);
512              
513 234         1367 return \%self;
514             }
515              
516             ###############################################################################
517             ## ----------------------------------------------------------------------------
518             ## Spawn method.
519             ##
520             ###############################################################################
521              
522             sub spawn {
523 126 50   126 0 303 my $self = shift; $self = $MCE unless ref($self);
  126         604  
524              
525 126         324 local $_; @_ = ();
  126         398  
526              
527             _croak('MCE::spawn: method is not allowed by the worker process')
528 126 50       490 if ($self->{_wid});
529              
530             ## Return if workers have already been spawned or if module instance.
531 126 50 33     1369 return $self if ($self->{_spawned} || exists $self->{_module_instance});
532              
533 126 50       530 lock $_WIN_LOCK if $_is_MSWin32; # Obtain locks
534 126 50 33     549 lock $_MCE_LOCK if $_has_threads && $_is_winenv;
535              
536 126 0 33     641 $MCE::_GMUTEX->lock() if ($_tid && $MCE::_GMUTEX);
537 126 50       444 sleep 0.015 if $_tid;
538              
539 126         618 _sendto_fhs_close();
540              
541 126 50       485 if ($INC{'PDL.pm'}) { local $@;
  0         0  
542             # PDL::IO::Storable is required for serializing piddles.
543 0 0       0 eval 'use PDL::IO::Storable' unless $INC{'PDL/IO/Storable.pm'};
544             # PDL data should not be naively copied in new threads.
545 0         0 eval 'no warnings; sub PDL::CLONE_SKIP { 1 }';
546             # Disable PDL auto-threading.
547 0         0 eval q{ PDL::set_autopthread_targ(1) };
548             }
549 126 50 33     948 if ( $INC{'LWP/UserAgent.pm'} && !$INC{'Net/HTTP.pm'} ) {
550 0         0 local $@; eval 'require Net::HTTP; require Net::HTTPS';
  0         0  
551             }
552              
553             ## Start the shared-manager process if not running.
554 126 50       515 MCE::Shared->start() if $INC{'MCE/Shared.pm'};
555              
556             ## Load input module.
557 126 50       608 if (defined $self->{sequence}) {
    100          
558             require MCE::Core::Input::Sequence
559 0 0       0 unless $INC{'MCE/Core/Input/Sequence.pm'};
560             }
561             elsif (defined $self->{input_data}) {
562 45         202 my $_ref = ref $self->{input_data};
563 45 50       376 if ($_ref =~ /^(?:ARRAY|HASH|GLOB|FileHandle|IO::)/) {
    0          
564             require MCE::Core::Input::Request
565 45 100       16616 unless $INC{'MCE/Core/Input/Request.pm'};
566             }
567             elsif ($_ref eq 'CODE') {
568             require MCE::Core::Input::Iterator
569 0 0       0 unless $INC{'MCE/Core/Input/Iterator.pm'};
570             }
571             else {
572             require MCE::Core::Input::Handle
573 0 0       0 unless $INC{'MCE/Core/Input/Handle.pm'};
574             }
575             }
576              
577 126         525 my $_die_handler = $SIG{__DIE__};
578 126         321 my $_warn_handler = $SIG{__WARN__};
579              
580 126         636 $SIG{__DIE__} = \&MCE::Signal::_die_handler;
581 126         500 $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
582              
583 126 50 33     1219 if (!defined $TOP_HDLR || (!$TOP_HDLR->{_mgr_live} && !$TOP_HDLR->{_wid})) {
    0          
584             ## On Windows, must shutdown the last idle MCE session.
585 126 0 33     525 if ($_is_MSWin32 && defined $TOP_HDLR && $TOP_HDLR->{_spawned}) {
      33        
586 0         0 $TOP_HDLR->shutdown(1);
587             }
588 126         324 $TOP_HDLR = $self;
589             }
590             elsif (refaddr($self) != refaddr($TOP_HDLR)) {
591             ## Reduce the maximum number of channels for nested sessions.
592 0 0       0 $self->{_data_channels} = 4 if ($self->{_data_channels} > 4);
593 0 0       0 $self->{_lock_chn} = 1 if ($self->{_init_total_workers} > 4);
594              
595             ## On Windows, instruct the manager process to enable win32 IPC.
596 0 0 0     0 if ($_is_MSWin32 && $ENV{'PERL_MCE_IPC'} ne 'win32') {
597 0         0 $ENV{'PERL_MCE_IPC'} = 'win32'; local $\ = undef;
  0         0  
598 0         0 my $_DAT_W_SOCK = $TOP_HDLR->{_dat_w_sock}->[0];
599 0         0 print {$_DAT_W_SOCK} OUTPUT_S_IPC.$LF . '0'.$LF;
  0         0  
600              
601 0         0 MCE::Util::_sock_ready($_DAT_W_SOCK, -1);
602 0         0 MCE::Util::_sysread($_DAT_W_SOCK, my($_buf), 1);
603             }
604             }
605              
606             ## -------------------------------------------------------------------------
607              
608 126         926 my $_data_channels = $self->{_data_channels};
609 126         949 my $_max_workers = _get_max_workers($self);
610 126         322 my $_use_threads = $self->{use_threads};
611              
612             ## Create [ 0 including 1 up to 8 ] locks for data channels (max 9).
613 126         1237 $self->{'_mutex_0'} = MCE::Mutex->new( impl => 'Channel' );
614              
615 126 50       773 if ($self->{_lock_chn}) {
616             $self->{'_mutex_'.$_} = MCE::Mutex->new( impl => 'Channel' )
617 0         0 for (1 .. $_data_channels);
618             }
619              
620             ## Create two locks for use by MCE::Core::Input::{ Handle or Sequence }.
621 126         1060 $self->{'_mutex_'.$_} = MCE::Mutex->new( impl => 'Channel' ) for (10 .. 11);
622              
623             ## Create sockets for IPC. sync, comm, input, data
624 126         915 MCE::Util::_sock_pair($self, qw(_bsb_r_sock _bsb_w_sock), undef, 1);
625 126         595 MCE::Util::_sock_pair($self, qw(_com_r_sock _com_w_sock), undef, 1);
626 126         588 MCE::Util::_sock_pair($self, qw(_que_r_sock _que_w_sock), undef, 1);
627 126         694 MCE::Util::_sock_pair($self, qw(_dat_r_sock _dat_w_sock), 0);
628              
629             MCE::Util::_sock_pair($self, qw(_dat_r_sock _dat_w_sock), $_, 1)
630 126         892 for (1 .. $_data_channels);
631              
632 126 50       1476 if ($^O !~ /linux|android|aix/) {
633 0         0 setsockopt($self->{_dat_r_sock}->[0], SOL_SOCKET, SO_RCVBUF, pack('i', 4096));
634             }
635              
636 126 100       636 if (defined $self->{init_relay}) { # relay
637 24 100       99 unless ($INC{'MCE/Relay.pm'}) {
638 9         5103 require MCE::Relay; MCE::Relay->import();
  9         54  
639             }
640             MCE::Util::_sock_pair($self, qw(_rla_r_sock _rla_w_sock), $_, 1)
641 24         169 for (0 .. $_max_workers - 1);
642             }
643              
644 126         3195 $self->{_seed} = int(rand() * 1e9);
645              
646             ## -------------------------------------------------------------------------
647              
648             ## Spawn workers.
649 126         986 $self->{_pids} = [], $self->{_thrs} = [], $self->{_tids} = [];
650 126         585 $self->{_status} = [], $self->{_state} = [], $self->{_task} = [];
651              
652 126 50 33     550 if ($self->{loop_timeout} && !$_is_MSWin32) {
653 0         0 $self->{_pids_t} = {}, $self->{_pids_w} = {};
654             }
655              
656 126 50       6285 local $SIG{TTIN}, local $SIG{TTOU}, local $SIG{WINCH} unless $_is_MSWin32;
657              
658 126 100       799 if (!defined $self->{user_tasks}) {
659 35         130 $self->{_total_workers} = $_max_workers;
660              
661 35 50 33     257 if (defined $_use_threads && $_use_threads == 1) {
662 0         0 _dispatch_thread($self, $_) for (1 .. $_max_workers);
663             } else {
664 35         217 _dispatch_child($self, $_) for (1 .. $_max_workers);
665             }
666              
667 16         1054 $self->{_task}->[0] = { _total_workers => $_max_workers };
668              
669 16         511 for my $_i (1 .. $_max_workers) {
670 45         1079 $self->{_state}->[$_i] = {
671             _task => undef, _task_id => undef, _task_wid => undef,
672             _params => undef, _chn => $_i % $_data_channels + 1
673             }
674             }
675             }
676             else {
677 91         295 my ($_task_id, $_wid);
678              
679 91         251 $self->{_total_workers} = 0;
680 91         178 $self->{_total_workers} += $_->{max_workers} for @{ $self->{user_tasks} };
  91         536  
681              
682             # Must spawn processes first for extra stability on BSD/Darwin.
683 91         214 $_task_id = $_wid = 0;
684              
685 91         145 for my $_task (@{ $self->{user_tasks} }) {
  91         266  
686 114         364 my $_tsk_use_threads = $_task->{use_threads};
687              
688 114 50 33     1105 if (defined $_tsk_use_threads && $_tsk_use_threads == 1) {
689 0         0 $_wid += $_task->{max_workers};
690             } else {
691             _dispatch_child($self, ++$_wid, $_task, $_task_id, $_)
692 114         1296 for (1 .. $_task->{max_workers});
693             }
694              
695 68         2351 $_task_id++;
696             }
697              
698             # Then, spawn threads last.
699 45         841 $_task_id = $_wid = 0;
700              
701 45         421 for my $_task (@{ $self->{user_tasks} }) {
  45         390  
702 56         704 my $_tsk_use_threads = $_task->{use_threads};
703              
704 56 50 33     2163 if (defined $_tsk_use_threads && $_tsk_use_threads == 1) {
705             _dispatch_thread($self, ++$_wid, $_task, $_task_id, $_)
706 0         0 for (1 .. $_task->{max_workers});
707             } else {
708 56         412 $_wid += $_task->{max_workers};
709             }
710              
711 56         320 $_task_id++;
712             }
713              
714             # Save state.
715 45         205 $_task_id = $_wid = 0;
716              
717 45         177 for my $_task (@{ $self->{user_tasks} }) {
  45         233  
718             $self->{_task}->[$_task_id] = {
719             _total_running => 0, _total_workers => $_task->{max_workers}
720 56         2134 };
721 56         405 for my $_i (1 .. $_task->{max_workers}) {
722 101         306 $_wid += 1;
723 101         2666 $self->{_state}->[$_wid] = {
724             _task => $_task, _task_id => $_task_id, _task_wid => $_i,
725             _params => undef, _chn => $_wid % $_data_channels + 1
726             }
727             }
728              
729 56         234 $_task_id++;
730             }
731             }
732              
733             ## -------------------------------------------------------------------------
734              
735 61         837 $self->{_send_cnt} = 0, $self->{_spawned} = 1;
736              
737 61         1170 $SIG{__DIE__} = $_die_handler;
738 61         900 $SIG{__WARN__} = $_warn_handler;
739              
740 61 50       1388 $MCE = $self if ($MCE->{_wid} == 0);
741              
742 61 0 33     1000 $MCE::_GMUTEX->unlock() if ($_tid && $MCE::_GMUTEX);
743              
744 61         9356 return $self;
745             }
746              
747             ###############################################################################
748             ## ----------------------------------------------------------------------------
749             ## Process method, relay stubs, and AUTOLOAD for methods not used often.
750             ##
751             ###############################################################################
752              
753             sub process {
754 106 50   106 0 2981 my $self = shift; $self = $MCE unless ref($self);
  106         326  
755              
756 106         426 _validate_runstate($self, 'MCE::process');
757              
758 106         184 my ($_params_ref, $_input_data);
759              
760 106 100 100     842 if (ref $_[0] eq 'HASH' && ref $_[1] eq 'HASH') {
    100          
761 6         26 $_params_ref = $_[0], $_input_data = $_[1];
762             } elsif (ref $_[0] eq 'HASH') {
763 95         201 $_params_ref = $_[0], $_input_data = $_[1];
764             } else {
765 5         11 $_params_ref = $_[1], $_input_data = $_[0];
766             }
767              
768 106         363 @_ = ();
769              
770             ## Set input data.
771 106 50 0     411 if (defined $_input_data) {
    0          
772 106         449 $_params_ref->{input_data} = $_input_data;
773             }
774             elsif ( !defined $_params_ref->{input_data} &&
775             !defined $_params_ref->{sequence} ) {
776 0         0 _croak('MCE::process: (input_data or sequence) is not specified');
777             }
778              
779             ## Pass 0 to "not" auto-shutdown after processing.
780 106         409 $self->run(0, $_params_ref);
781              
782 79         277 return $self;
783             }
784              
785             sub relay (;&) {
786             _croak('MCE::relay: (init_relay) is not defined')
787 5 0   5 0 39 unless (defined $MCE->{init_relay});
788             }
789              
790             {
791 85     85   767 no warnings 'once';
  85         182  
  85         446430  
792             *relay_unlock = \&relay;
793             }
794              
795             sub AUTOLOAD {
796             # $AUTOLOAD = MCE::
797              
798 0     0   0 my $_fcn = substr($MCE::AUTOLOAD, 5);
799 0 0       0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
800              
801             # "for" sugar methods
802              
803 0 0       0 if ($_fcn eq 'forchunk') {
    0          
    0          
804 0 0       0 require MCE::Candy unless $INC{'MCE/Candy.pm'};
805 0         0 return MCE::Candy::forchunk($self, @_);
806             }
807             elsif ($_fcn eq 'foreach') {
808 0 0       0 require MCE::Candy unless $INC{'MCE/Candy.pm'};
809 0         0 return MCE::Candy::foreach($self, @_);
810             }
811             elsif ($_fcn eq 'forseq') {
812 0 0       0 require MCE::Candy unless $INC{'MCE/Candy.pm'};
813 0         0 return MCE::Candy::forseq($self, @_);
814             }
815              
816             # relay stubs for MCE::Relay
817              
818 0 0 0     0 if ($_fcn eq 'relay_lock' || $_fcn eq 'relay_recv') {
    0          
819             _croak('MCE::relay: (init_relay) is not defined')
820 0 0       0 unless (defined $MCE->{init_relay});
821             }
822             elsif ($_fcn eq 'relay_final') {
823 0         0 return;
824             }
825              
826             # worker immediately exits the chunking loop
827              
828 0 0       0 if ($_fcn eq 'last') {
    0          
    0          
    0          
829             _croak('MCE::last: method is not allowed by the manager process')
830 0 0       0 unless ($self->{_wid});
831              
832 0 0       0 $self->{_last_jmp}() if (defined $self->{_last_jmp});
833              
834 0         0 return;
835             }
836              
837             # worker starts the next iteration of the chunking loop
838              
839             elsif ($_fcn eq 'next') {
840             _croak('MCE::next: method is not allowed by the manager process')
841 0 0       0 unless ($self->{_wid});
842              
843 0 0       0 $self->{_next_jmp}() if (defined $self->{_next_jmp});
844              
845 0         0 return;
846             }
847              
848             # return the process ID, include thread ID for threads
849              
850             elsif ($_fcn eq 'pid') {
851 0 0 0     0 if (defined $self->{_pid}) {
    0          
852 0         0 return $self->{_pid};
853             } elsif ($_has_threads && $self->{use_threads}) {
854 0         0 return $$ .'.'. threads->tid();
855             }
856 0         0 return $$;
857             }
858              
859             # return the exit status
860             # _wrk_status holds the greatest exit status among workers exiting
861              
862             elsif ($_fcn eq 'status') {
863             _croak('MCE::status: method is not allowed by the worker process')
864 0 0       0 if ($self->{_wid});
865              
866 0 0       0 return (defined $self->{_wrk_status}) ? $self->{_wrk_status} : 0;
867             }
868              
869 0         0 _croak("Can't locate object method \"$_fcn\" via package \"MCE\"");
870             }
871              
872             ###############################################################################
873             ## ----------------------------------------------------------------------------
874             ## Restart worker method.
875             ##
876             ###############################################################################
877              
878             sub restart_worker {
879 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
880              
881 0         0 @_ = ();
882              
883             _croak('MCE::restart_worker: method is not allowed by the worker process')
884 0 0       0 if ($self->{_wid});
885              
886 0         0 my $_wid = $self->{_exited_wid};
887              
888 0         0 my $_params = $self->{_state}->[$_wid]->{_params};
889 0         0 my $_task_wid = $self->{_state}->[$_wid]->{_task_wid};
890 0         0 my $_task_id = $self->{_state}->[$_wid]->{_task_id};
891 0         0 my $_task = $self->{_state}->[$_wid]->{_task};
892 0         0 my $_chn = $self->{_state}->[$_wid]->{_chn};
893              
894 0         0 $_params->{_chn} = $_chn;
895              
896             my $_use_threads = (defined $_task_id)
897 0 0       0 ? $_task->{use_threads} : $self->{use_threads};
898              
899 0 0       0 $self->{_task}->[$_task_id]->{_total_running} += 1 if (defined $_task_id);
900 0 0       0 $self->{_task}->[$_task_id]->{_total_workers} += 1 if (defined $_task_id);
901              
902 0         0 $self->{_total_running} += 1;
903 0         0 $self->{_total_workers} += 1;
904              
905 0 0 0     0 if (defined $_use_threads && $_use_threads == 1) {
906 0         0 _dispatch_thread($self, $_wid, $_task, $_task_id, $_task_wid, $_params);
907             } else {
908 0         0 _dispatch_child($self, $_wid, $_task, $_task_id, $_task_wid, $_params);
909             }
910              
911 0         0 delete $self->{_retry_cnt};
912              
913 0 0 0     0 if (defined $self->{spawn_delay} && $self->{spawn_delay} > 0.0) {
    0 0        
914 0         0 sleep $self->{spawn_delay};
915             } elsif ($_tid || $_is_MSWin32) {
916 0         0 sleep 0.045;
917             }
918              
919 0         0 return;
920             }
921              
922             ###############################################################################
923             ## ----------------------------------------------------------------------------
924             ## Run method.
925             ##
926             ###############################################################################
927              
928             sub run {
929 207 50   207 0 720 my $self = shift; $self = $MCE unless ref($self);
  207         789  
930              
931             _croak('MCE::run: method is not allowed by the worker process')
932 207 50       634 if ($self->{_wid});
933              
934 207         337 my ($_auto_shutdown, $_params_ref);
935              
936 207 100       612 if (ref $_[0] eq 'HASH') {
937 81 50       291 $_auto_shutdown = (defined $_[1]) ? $_[1] : 1;
938 81         156 $_params_ref = $_[0];
939             } else {
940 126 100       288 $_auto_shutdown = (defined $_[0]) ? $_[0] : 1;
941 126         284 $_params_ref = $_[1];
942             }
943              
944 207         445 @_ = ();
945              
946 207 100       758 my $_has_user_tasks = (defined $self->{user_tasks}) ? 1 : 0;
947 207         434 my $_requires_shutdown = 0;
948              
949             ## Unset params if workers have already been sent user_data via send.
950             ## Set user_func to NOOP if not specified.
951              
952 207 50       677 $_params_ref = undef if ($self->{_send_cnt});
953              
954 207 50 66     1132 if (!defined $self->{user_func} && !defined $_params_ref->{user_func}) {
955 91         552 $self->{user_func} = \&MCE::Signal::_NOOP;
956             }
957              
958             ## Set user specified params if specified.
959             ## Shutdown workers if determined by _sync_params or if processing a
960             ## scalar reference. Workers need to be restarted in order to pick up
961             ## on the new code or scalar reference.
962              
963 207 100 66     1444 if (defined $_params_ref && ref $_params_ref eq 'HASH') {
964 187         888 $_requires_shutdown = _sync_params($self, $_params_ref);
965 187         898 _validate_args($self);
966             }
967 207 100       824 if ($_has_user_tasks) {
968             $self->{input_data} = $self->{user_tasks}->[0]->{input_data}
969 154 50       732 if ($self->{user_tasks}->[0]->{input_data});
970             $self->{use_slurpio} = $self->{user_tasks}->[0]->{use_slurpio}
971 154 50       524 if ($self->{user_tasks}->[0]->{use_slurpio});
972             $self->{parallel_io} = $self->{user_tasks}->[0]->{parallel_io}
973 154 50       430 if ($self->{user_tasks}->[0]->{parallel_io});
974             $self->{RS} = $self->{user_tasks}->[0]->{RS}
975 154 50       453 if ($self->{user_tasks}->[0]->{RS});
976             }
977 207 50       1047 if (ref $self->{input_data} eq 'SCALAR') {
978 0 0       0 if (refaddr($self->{input_data}) != $self->{_last_sref}) {
979 0         0 $_requires_shutdown = 1;
980             }
981 0         0 $self->{_last_sref} = refaddr($self->{input_data});
982             }
983              
984 207 50       1152 $self->shutdown() if ($_requires_shutdown);
985              
986             ## -------------------------------------------------------------------------
987              
988 207         444 $self->{_wrk_status} = 0;
989              
990             ## Spawn workers.
991 207 100       1615 $self->spawn() unless ($self->{_spawned});
992 142 50       992 return $self unless ($self->{_total_workers});
993              
994 142         1665 local $SIG{__DIE__} = \&MCE::Signal::_die_handler;
995 142         1232 local $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
996              
997 142 50       939 $MCE = $self if ($MCE->{_wid} == 0);
998              
999 142         915 my ($_input_data, $_input_file, $_input_glob, $_seq);
1000 142         0 my ($_abort_msg, $_first_msg, $_run_mode, $_single_dim);
1001 142         447 my $_chunk_size = $self->{chunk_size};
1002              
1003             $_seq = ($_has_user_tasks && $self->{user_tasks}->[0]->{sequence})
1004             ? $self->{user_tasks}->[0]->{sequence}
1005 142 50 66     1569 : $self->{sequence};
1006              
1007             ## Determine run mode for workers.
1008 142 100       960 if (defined $_seq) {
    100          
1009             my ($_begin, $_end, $_step) = (ref $_seq eq 'ARRAY')
1010 18 50       109 ? @{ $_seq } : ($_seq->{begin}, $_seq->{end}, $_seq->{step});
  18         61  
1011              
1012             $_chunk_size = $self->{user_tasks}->[0]->{chunk_size}
1013 18 50 66     165 if ($_has_user_tasks && $self->{user_tasks}->[0]->{chunk_size});
1014              
1015 18         53 $_run_mode = 'sequence';
1016 18         58 $_abort_msg = int(($_end - $_begin) / $_step / $_chunk_size); # + 1;
1017              
1018             # Previously + 1 above. Below, support for large numbers, 1e16 and beyond.
1019             # E.g. sequence => [ 1, 1e16 ], chunk_size => 1e11
1020             #
1021             # Perl: int((1e15 - 1) / 1 / 1e11) = 9999
1022             # Perl: int((1e16 - 1) / 1 / 1e11) = 100000 wrong, due to precision limit
1023             # Calc: int((1e16 - 1) / 1 / 1e11) = 99999
1024              
1025 18 50       59 if ( $_step > 0 ) {
1026 18 50       72 $_abort_msg++
1027             if ($_abort_msg * $_chunk_size * abs($_step) + $_begin <= $_end);
1028             } else {
1029 0 0       0 $_abort_msg++
1030             if ($_abort_msg * $_chunk_size * abs($_step) + $_end <= $_begin);
1031             }
1032              
1033 18         31 $_first_msg = 0;
1034             }
1035             elsif (defined $self->{input_data}) {
1036 79         342 my $_ref = ref $self->{input_data};
1037              
1038 79 100       528 if ($_ref eq '') { # File mode
    100          
    100          
    50          
    0          
    0          
1039 18         79 $_run_mode = 'file';
1040 18         41 $_input_file = $self->{input_data};
1041 18         37 $_input_data = $_input_glob = undef;
1042 18         300 $_abort_msg = (-s $_input_file) + 1;
1043 18         54 $_first_msg = 0; ## Begin at offset position
1044              
1045 18 50       212 if ((-s $_input_file) == 0) {
1046 0 0       0 $self->shutdown() if ($_auto_shutdown == 1);
1047 0         0 return $self;
1048             }
1049             }
1050             elsif ($_ref eq 'ARRAY') { # Array mode
1051 40         205 $_run_mode = 'array';
1052 40         235 $_input_data = $self->{input_data};
1053 40         100 $_input_file = $_input_glob = undef;
1054 40 50       158 $_single_dim = 1 if (ref $_input_data->[0] eq '');
1055 40         106 $_abort_msg = 0; ## Flag: Has Data: No
1056 40         83 $_first_msg = 1; ## Flag: Has Data: Yes
1057              
1058 40 50       61 if (@{ $_input_data } == 0) {
  40         239  
1059 0 0       0 $self->shutdown() if ($_auto_shutdown == 1);
1060 0         0 return $self;
1061             }
1062             }
1063             elsif ($_ref eq 'HASH') { # Hash mode
1064 3         90 $_run_mode = 'hash';
1065 3         80 $_input_data = $self->{input_data};
1066 3         43 $_input_file = $_input_glob = undef;
1067 3         30 $_abort_msg = 0; ## Flag: Has Data: No
1068 3         26 $_first_msg = 1; ## Flag: Has Data: Yes
1069              
1070 3 50       49 if (scalar( keys %{ $_input_data } ) == 0) {
  3         117  
1071 0 0       0 $self->shutdown() if ($_auto_shutdown == 1);
1072 0         0 return $self;
1073             }
1074             }
1075             elsif ($_ref =~ /^(?:GLOB|FileHandle|IO::)/) { # Glob mode
1076 18         55 $_run_mode = 'glob';
1077 18         35 $_input_glob = $self->{input_data};
1078 18         46 $_input_data = $_input_file = undef;
1079 18         30 $_abort_msg = 0; ## Flag: Has Data: No
1080 18         43 $_first_msg = 1; ## Flag: Has Data: Yes
1081             }
1082             elsif ($_ref eq 'CODE') { # Iterator mode
1083 0         0 $_run_mode = 'iterator';
1084 0         0 $_input_data = $self->{input_data};
1085 0         0 $_input_file = $_input_glob = undef;
1086 0         0 $_abort_msg = 0; ## Flag: Has Data: No
1087 0         0 $_first_msg = 1; ## Flag: Has Data: Yes
1088             }
1089             elsif ($_ref eq 'SCALAR') { # Memory mode
1090 0         0 $_run_mode = 'memory';
1091 0         0 $_input_data = $_input_file = $_input_glob = undef;
1092 0         0 $_abort_msg = length(${ $self->{input_data} }) + 1;
  0         0  
1093 0         0 $_first_msg = 0; ## Begin at offset position
1094              
1095 0 0       0 if (length(${ $self->{input_data} }) == 0) {
  0         0  
1096 0 0       0 return $self->shutdown() if ($_auto_shutdown == 1);
1097             }
1098             }
1099             else {
1100 0         0 _croak('MCE::run: (input_data) is not valid');
1101             }
1102             }
1103             else { # Nodata mode
1104 45         570 $_abort_msg = undef, $_run_mode = 'nodata';
1105             }
1106              
1107             ## -------------------------------------------------------------------------
1108              
1109 142         766 my $_total_workers = $self->{_total_workers};
1110 142         415 my $_send_cnt = $self->{_send_cnt};
1111              
1112 142 50       846 if ($_send_cnt) {
1113 0         0 $self->{_total_running} = $_send_cnt;
1114 0         0 $self->{_task}->[0]->{_total_running} = $_send_cnt;
1115             }
1116             else {
1117 142         704 $self->{_total_running} = $_total_workers;
1118              
1119 142         448 my ($_frozen_nodata, $_wid, %_params_nodata, %_task0_wids);
1120 142         475 my $_COM_R_SOCK = $self->{_com_r_sock};
1121 142         315 my $_submit_delay = $self->{submit_delay};
1122              
1123             my %_params = (
1124             '_abort_msg' => $_abort_msg, '_chunk_size' => $_chunk_size,
1125             '_input_file' => $_input_file, '_run_mode' => $_run_mode,
1126             '_bounds_only' => $self->{bounds_only},
1127             '_max_retries' => $self->{max_retries},
1128             '_parallel_io' => $self->{parallel_io},
1129             '_progress' => $self->{progress} ? 1 : 0,
1130             '_sequence' => $self->{sequence},
1131             '_user_args' => $self->{user_args},
1132             '_use_slurpio' => $self->{use_slurpio},
1133             '_RS' => $self->{RS}
1134 142 50       4250 );
1135              
1136 142         4333 my $_frozen_params = $self->{freeze}(\%_params);
1137 142         920 $_frozen_params = length($_frozen_params).$LF . $_frozen_params;
1138              
1139 142 100       490 if ($_has_user_tasks) {
1140 108         1447 %_params_nodata = ( %_params,
1141             '_abort_msg' => undef, '_run_mode' => 'nodata'
1142             );
1143 108         1243 $_frozen_nodata = $self->{freeze}(\%_params_nodata);
1144 108         514 $_frozen_nodata = length($_frozen_nodata).$LF . $_frozen_nodata;
1145              
1146 108         181 for my $_t (@{ $self->{_task} }) {
  108         626  
1147 180         496 $_t->{_total_running} = $_t->{_total_workers};
1148             }
1149 108         260 for my $_i (1 .. @{ $self->{_state} } - 1) {
  108         400  
1150 347 100       2168 $_task0_wids{$_i} = undef unless ($self->{_state}[$_i]{_task_id});
1151             }
1152             }
1153              
1154 142         1287 local $\ = undef; local $/ = $LF;
  142         2194  
1155              
1156             ## Insert the first message into the queue if defined.
1157 142 100       503 if (defined $_first_msg) {
1158 97         1878 syswrite($self->{_que_w_sock}, pack($_que_template, 0, $_first_msg));
1159             }
1160              
1161             ## Submit params data to workers.
1162 142         722 for my $_i (1 .. $_total_workers) {
1163 428         597 print({$_COM_R_SOCK} $_i.$LF), chomp($_wid = <$_COM_R_SOCK>);
  428         157180  
1164              
1165 428 100 100     3994 if (!$_has_user_tasks || exists $_task0_wids{$_wid}) {
1166 282         481 print({$_COM_R_SOCK} $_frozen_params), <$_COM_R_SOCK>;
  282         43142  
1167 282         2675 $self->{_state}[$_wid]{_params} = \%_params;
1168             } else {
1169 146         209 print({$_COM_R_SOCK} $_frozen_nodata), <$_COM_R_SOCK>;
  146         28079  
1170 146         1152 $self->{_state}[$_wid]{_params} = \%_params_nodata;
1171             }
1172              
1173 428 50 33     2619 sleep $_submit_delay
1174             if defined($_submit_delay) && $_submit_delay > 0.0;
1175             }
1176             }
1177              
1178             ## -------------------------------------------------------------------------
1179              
1180 142         572 $self->{_total_exited} = 0;
1181              
1182             ## Call the output function.
1183 142 50       674 if ($self->{_total_running} > 0) {
1184 142         826 $self->{_mgr_live} = 1;
1185 142         346 $self->{_abort_msg} = $_abort_msg;
1186 142         899 $self->{_single_dim} = $_single_dim;
1187              
1188 142 50       451 lock $self->{_run_lock} if $_is_MSWin32;
1189              
1190 142 50       592 if (!$_send_cnt) {
1191             ## Notify workers to commence processing.
1192 142 50       470 if ($_is_MSWin32) {
1193 0         0 my $_buf = _sprintf("%${_total_workers}s", "");
1194 0         0 syswrite($self->{_bsb_r_sock}, $_buf);
1195             } else {
1196 142         550 my $_BSB_R_SOCK = $self->{_bsb_r_sock};
1197 142         455 for my $_i (1 .. $_total_workers) {
1198 428         11843 syswrite($_BSB_R_SOCK, $LF);
1199             }
1200             }
1201             }
1202              
1203 142         3759 _output_loop( $self, $_input_data, $_input_glob,
1204             \%_plugin_function, \@_plugin_loop_begin, \@_plugin_loop_end
1205             );
1206              
1207 142         723 $self->{_mgr_live} = $self->{_abort_msg} = $self->{_single_dim} = undef;
1208             }
1209              
1210             ## Remove the last message from the queue.
1211 142 100 66     1588 if (!$_send_cnt && $_run_mode ne 'nodata') {
1212             MCE::Util::_sysread($self->{_que_r_sock}, my($_buf), $_que_read_size)
1213 97 50       949 if ( defined $self->{_que_r_sock} );
1214             }
1215              
1216 142         523 $self->{_send_cnt} = 0;
1217              
1218             ## Shutdown workers.
1219 142 100 66     2350 if ($_auto_shutdown || $self->{_total_exited}) {
    50 33        
    50          
1220 10         191 $self->shutdown();
1221             }
1222             elsif ($INC{'MCE/Simple.pm'}) {
1223 0         0 $self->shutdown();
1224             }
1225             elsif ($^S || $ENV{'PERL_IPERL_RUNNING'}) {
1226 0 0 0     0 if (
      0        
      0        
      0        
      0        
1227             !$INC{'Mojo/IOLoop.pm'} && !$INC{'Win32/GUI.pm'} &&
1228             !$INC{'Gearman/XS.pm'} && !$INC{'Gearman/Util.pm'} &&
1229             !$INC{'Tk.pm'} && !$INC{'Wx.pm'}
1230             ) {
1231             # running inside eval or IPerl, check stack trace
1232 0         0 my $_t = Carp::longmess(); $_t =~ s/\teval [^\n]+\n$//;
  0         0  
1233              
1234 0 0 0     0 if ( $_t =~ /^(?:[^\n]+\n){1,7}\teval / ||
      0        
      0        
      0        
1235             $_t =~ /\n\teval [^\n]+\n\t(?:eval|Try)/ ||
1236             $_t =~ /\n\tMCE::_dispatch\(\) [^\n]+ thread \d+\n$/ ||
1237             ( $_tid && !$self->{use_threads} ) )
1238             {
1239 0         0 $self->shutdown();
1240             }
1241             }
1242             }
1243              
1244 142         1305 return $self;
1245             }
1246              
1247             ###############################################################################
1248             ## ----------------------------------------------------------------------------
1249             ## Send method.
1250             ##
1251             ###############################################################################
1252              
1253             sub send {
1254 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1255              
1256             _croak('MCE::send: method is not allowed by the worker process')
1257 0 0       0 if ($self->{_wid});
1258             _croak('MCE::send: method is not allowed while running')
1259 0 0       0 if ($self->{_total_running});
1260              
1261             _croak('MCE::send: method cannot be used with input_data or sequence')
1262 0 0 0     0 if (defined $self->{input_data} || defined $self->{sequence});
1263             _croak('MCE::send: method cannot be used with user_tasks')
1264 0 0       0 if (defined $self->{user_tasks});
1265              
1266 0         0 my $_data_ref;
1267              
1268 0 0 0     0 if (ref $_[0] eq 'ARRAY' || ref $_[0] eq 'HASH' || ref $_[0] eq 'PDL') {
      0        
1269 0         0 $_data_ref = $_[0];
1270             } else {
1271 0         0 _croak('MCE::send: ARRAY, HASH, or a PDL reference is not specified');
1272             }
1273              
1274 0         0 @_ = ();
1275              
1276 0 0       0 $self->{_send_cnt} = 0 unless (defined $self->{_send_cnt});
1277              
1278             ## -------------------------------------------------------------------------
1279              
1280             ## Spawn workers.
1281 0 0       0 $self->spawn() unless ($self->{_spawned});
1282              
1283             _croak('MCE::send: Sending greater than # of workers is not allowed')
1284 0 0       0 if ($self->{_send_cnt} >= $self->{_task}->[0]->{_total_workers});
1285              
1286 0         0 local $SIG{__DIE__} = \&MCE::Signal::_die_handler;
1287 0         0 local $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
1288              
1289             ## Begin data submission.
1290 0         0 local $\ = undef; local $/ = $LF;
  0         0  
1291              
1292 0         0 my $_COM_R_SOCK = $self->{_com_r_sock};
1293 0         0 my $_submit_delay = $self->{submit_delay};
1294 0         0 my $_frozen_data = $self->{freeze}($_data_ref);
1295 0         0 my $_len = length $_frozen_data;
1296              
1297             ## Submit data to worker.
1298 0         0 print({$_COM_R_SOCK} '_data'.$LF), <$_COM_R_SOCK>;
  0         0  
1299 0         0 print({$_COM_R_SOCK} $_len.$LF, $_frozen_data), <$_COM_R_SOCK>;
  0         0  
1300              
1301 0         0 $self->{_send_cnt} += 1;
1302              
1303 0 0 0     0 sleep $_submit_delay
1304             if defined($_submit_delay) && $_submit_delay > 0.0;
1305              
1306 0         0 return $self;
1307             }
1308              
1309             ###############################################################################
1310             ## ----------------------------------------------------------------------------
1311             ## Shutdown method.
1312             ##
1313             ###############################################################################
1314              
1315             sub shutdown {
1316 61 50   61 0 845 my $self = shift; $self = $MCE unless ref($self);
  61         284  
1317 61   50     882 my $_no_lock = shift || 0;
1318              
1319 61         234 @_ = ();
1320              
1321             ## Return unless spawned or already shutdown.
1322 61 50       316 return unless $self->{_spawned};
1323              
1324             ## Return if signaled.
1325 61 50       226 if ($MCE::Signal::KILLED) {
1326 0 0       0 if (defined $self->{_sess_dir}) {
1327 0         0 my $_sess_dir = delete $self->{_sess_dir};
1328 0 0       0 rmdir $_sess_dir if -d $_sess_dir;
1329             }
1330 0         0 return;
1331             }
1332              
1333 61         1013 _validate_runstate($self, 'MCE::shutdown');
1334              
1335             ## Complete processing before shutting down.
1336 61 50       240 $self->run(0) if ($self->{_send_cnt});
1337              
1338 61         436 local $SIG{__DIE__} = \&MCE::Signal::_die_handler;
1339 61         265 local $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
1340              
1341 61         172 my $_COM_R_SOCK = $self->{_com_r_sock};
1342 61         142 my $_data_channels = $self->{_data_channels};
1343 61         166 my $_total_workers = $self->{_total_workers};
1344 61         167 my $_sess_dir = $self->{_sess_dir};
1345              
1346 61 50 33     1192 if (defined $TOP_HDLR && refaddr($self) == refaddr($TOP_HDLR)) {
1347 61         241 $TOP_HDLR = undef;
1348             }
1349              
1350             ## -------------------------------------------------------------------------
1351              
1352 61 0 33     216 lock $_MCE_LOCK if ($_has_threads && $_is_winenv && !$_no_lock);
      33        
1353              
1354             ## Notify workers to exit loop.
1355 61         315 local ($!, $?, $_); local $\ = undef; local $/ = $LF;
  61         202  
  61         241  
1356              
1357 61         321 for (1 .. $_total_workers) {
1358 146         448 print({$_COM_R_SOCK} '_exit'.$LF), <$_COM_R_SOCK>;
  146         25948  
1359             }
1360              
1361             ## Reap children and/or threads.
1362 61 50       291 if (@{ $self->{_pids} } > 0) {
  61         369  
1363 61         175 my $_list = $self->{_pids};
1364 61         395 for my $i (0 .. @{ $_list }) {
  61         852  
1365 207 100       157927298 waitpid($_list->[$i], 0) if $_list->[$i];
1366             }
1367             }
1368 61 50       419 if (@{ $self->{_thrs} } > 0) {
  61         604  
1369 0         0 my $_list = $self->{_thrs};
1370 0         0 for my $i (0 .. @{ $_list }) {
  0         0  
1371 0 0       0 $_list->[$i]->join() if $_list->[$i];
1372             }
1373             }
1374              
1375             ## Close sockets.
1376 61         369 $_COM_R_SOCK = undef;
1377              
1378 61         1591 MCE::Util::_destroy_socks($self, qw(
1379             _bsb_w_sock _bsb_r_sock _com_w_sock _com_r_sock _que_w_sock _que_r_sock
1380             _dat_w_sock _dat_r_sock _rla_w_sock _rla_r_sock
1381             ));
1382              
1383             ## -------------------------------------------------------------------------
1384              
1385             ## Destroy mutexes.
1386 61         338 for my $_i (0 .. $_data_channels) { delete $self->{'_mutex_'.$_i}; }
  207         2207  
1387 61         272 for my $_j (10 .. 11) { delete $self->{'_mutex_'.$_j}; } # input mutexes
  122         862  
1388              
1389             ## Remove session directory.
1390 61 50 33     576 rmdir $_sess_dir if (defined $_sess_dir && -d $_sess_dir);
1391              
1392             ## Reset instance.
1393 61         161 undef @{$self->{_pids}}; undef @{$self->{_thrs}}; undef @{$self->{_tids}};
  61         281  
  61         152  
  61         164  
  61         130  
  61         203  
1394 61         167 undef @{$self->{_state}}; undef @{$self->{_status}}; undef @{$self->{_task}};
  61         815  
  61         152  
  61         215  
  61         133  
  61         264  
1395              
1396 61         292 $self->{_chunk_id} = $self->{_send_cnt} = $self->{_spawned} = 0;
1397 61         184 $self->{_total_running} = $self->{_total_exited} = 0;
1398 61         153 $self->{_total_workers} = 0;
1399 61         220 $self->{_sess_dir} = undef;
1400              
1401 61 50       262 if ($self->{loop_timeout}) {
1402 0         0 delete $self->{_pids_t};
1403 0         0 delete $self->{_pids_w};
1404             }
1405              
1406 61         895 return;
1407             }
1408              
1409             ###############################################################################
1410             ## ----------------------------------------------------------------------------
1411             ## Barrier sync and yield methods.
1412             ##
1413             ###############################################################################
1414              
1415             sub sync {
1416 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1417              
1418 0 0       0 return unless ($self->{_wid});
1419              
1420             ## Barrier synchronization is supported for task 0 at this time.
1421             ## Note: Workers are assigned task_id 0 when omitting user_tasks.
1422              
1423 0 0       0 return if ($self->{_task_id} > 0);
1424              
1425 0         0 my $_chn = $self->{_chn};
1426 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1427 0         0 my $_BSB_R_SOCK = $self->{_bsb_r_sock};
1428 0         0 my $_BSB_W_SOCK = $self->{_bsb_w_sock};
1429 0         0 my $_buf;
1430              
1431 0 0       0 local $\ = undef if (defined $\);
1432              
1433             ## Notify the manager process (barrier begin).
1434 0         0 print {$_DAT_W_SOCK} OUTPUT_B_SYN.$LF . $_chn.$LF;
  0         0  
1435              
1436             ## Wait until all workers from (task_id 0) have synced.
1437 0 0       0 MCE::Util::_sock_ready($_BSB_R_SOCK, -1) if $_is_MSWin32;
1438 0         0 MCE::Util::_sysread($_BSB_R_SOCK, $_buf, 1);
1439              
1440             ## Notify the manager process (barrier end).
1441 0         0 print {$_DAT_W_SOCK} OUTPUT_E_SYN.$LF . $_chn.$LF;
  0         0  
1442              
1443             ## Wait until all workers from (task_id 0) have un-synced.
1444 0 0       0 MCE::Util::_sock_ready($_BSB_W_SOCK, -1) if $_is_MSWin32;
1445 0         0 MCE::Util::_sysread($_BSB_W_SOCK, $_buf, 1);
1446              
1447 0         0 return;
1448             }
1449              
1450             sub yield {
1451 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1452              
1453 0 0       0 return unless ($self->{_wid});
1454              
1455 0         0 my $_chn = $self->{_chn};
1456 0         0 my $_DAT_LOCK = $self->{_dat_lock};
1457 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1458 0         0 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
1459 0         0 my $_lock_chn = $self->{_lock_chn};
1460 0         0 my $_delay;
1461              
1462 0 0       0 local $\ = undef if (defined $\);
1463 0 0 0     0 local $/ = $LF if (!$/ || $/ ne $LF);
1464              
1465 0 0       0 $_DAT_LOCK->lock() if $_lock_chn;
1466 0         0 print({$_DAT_W_SOCK} OUTPUT_I_DLY.$LF . $_chn.$LF),
1467 0         0 print({$_DAU_W_SOCK} $self->{_task_id}.$LF);
  0         0  
1468 0         0 chomp($_delay = <$_DAU_W_SOCK>);
1469 0 0       0 $_DAT_LOCK->unlock() if $_lock_chn;
1470              
1471 0         0 MCE::Util::_sleep( $_delay );
1472             }
1473              
1474             ###############################################################################
1475             ## ----------------------------------------------------------------------------
1476             ## Miscellaneous methods: abort exit sess_dir tmp_dir.
1477             ##
1478             ###############################################################################
1479              
1480             ## Abort current job.
1481              
1482             sub abort {
1483 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1484              
1485 0         0 my $_QUE_R_SOCK = $self->{_que_r_sock};
1486 0         0 my $_QUE_W_SOCK = $self->{_que_w_sock};
1487 0         0 my $_abort_msg = $self->{_abort_msg};
1488              
1489 0 0       0 if (defined $_abort_msg) {
1490 0         0 local $\ = undef;
1491              
1492 0 0       0 if ($_abort_msg > 0) {
1493 0         0 MCE::Util::_sysread($_QUE_R_SOCK, my($_next), $_que_read_size);
1494 0         0 syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_abort_msg));
1495             }
1496              
1497 0 0       0 if ($self->{_wid} > 0) {
1498 0         0 my $_chn = $self->{_chn};
1499 0         0 my $_DAT_LOCK = $self->{_dat_lock};
1500 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1501 0         0 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
1502 0         0 my $_lock_chn = $self->{_lock_chn};
1503              
1504 0 0       0 $_DAT_LOCK->lock() if $_lock_chn;
1505 0         0 print {$_DAT_W_SOCK} OUTPUT_W_ABT.$LF . $_chn.$LF;
  0         0  
1506 0 0       0 $_DAT_LOCK->unlock() if $_lock_chn;
1507             }
1508             }
1509              
1510 0         0 return;
1511             }
1512              
1513             ## Worker exits from MCE.
1514              
1515             sub exit {
1516 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1517              
1518 0 0       0 my $_exit_status = (defined $_[0]) ? $_[0] : $?;
1519 0 0       0 my $_exit_msg = (defined $_[1]) ? $_[1] : '';
1520 0 0       0 my $_exit_id = (defined $_[2]) ? $_[2] : $self->chunk_id;
1521              
1522 0         0 @_ = ();
1523              
1524             _croak('MCE::exit: method is not allowed by the manager process')
1525 0 0       0 unless ($self->{_wid});
1526              
1527 0         0 my $_chn = $self->{_chn};
1528 0         0 my $_DAT_LOCK = $self->{_dat_lock};
1529 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1530 0         0 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
1531 0         0 my $_lock_chn = $self->{_lock_chn};
1532 0         0 my $_task_id = $self->{_task_id};
1533              
1534 0 0       0 unless ( $self->{_exiting} ) {
1535 0         0 $self->{_exiting} = 1;
1536              
1537 0 0       0 my $_pid = $self->{_is_thread} ? $$ .'.'. threads->tid() : $$;
1538 0         0 my $_max_retries = $self->{max_retries};
1539 0         0 my $_chunk_id = $self->{_chunk_id};
1540              
1541 0 0 0     0 if ( defined $self->{init_relay} && !$self->{_relayed} && !$_task_id &&
      0        
      0        
      0        
1542             exists $self->{_wuf} && $self->{_pid} eq $_pid ) {
1543              
1544 0 0       0 $self->{_retry_cnt} = -1 unless defined( $self->{_retry_cnt} );
1545              
1546 0 0 0     0 if ( !$_max_retries || ++$self->{_retry_cnt} == $_max_retries ) {
1547 0 0   0   0 MCE::relay { warn "Error: chunk $_chunk_id failed\n" if $_chunk_id };
  0         0  
1548             }
1549             }
1550              
1551             ## Check for nested workers not yet joined.
1552 0 0       0 MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
1553              
1554             MCE::Hobo->finish('MCE')
1555 0 0 0     0 if ( $INC{'MCE/Hobo.pm'} && MCE::Hobo->can('_clear') );
1556              
1557 0 0       0 local $\ = undef if (defined $\);
1558 0         0 my $_len = length $_exit_msg;
1559              
1560 0         0 $_exit_id =~ s/[\r\n][\r\n]*/ /mg;
1561 0 0       0 $_DAT_LOCK->lock() if $_lock_chn;
1562              
1563 0 0 0     0 if ($self->{_retry} && $self->{_retry}->[2]--) {
1564 0         0 $_exit_status = 0; my $_buf = $self->{freeze}($self->{_retry});
  0         0  
1565 0         0 print({$_DAT_W_SOCK} OUTPUT_W_EXT.$LF . $_chn.$LF),
1566 0         0 print({$_DAU_W_SOCK}
1567 0         0 $_task_id.$LF . $self->{_wid}.$LF . $self->{_exit_pid}.$LF .
1568             $_exit_status.$LF . $_exit_id.$LF . $_len.$LF . $_exit_msg .
1569             length($_buf).$LF, $_buf
1570             );
1571             }
1572             else {
1573 0         0 print({$_DAT_W_SOCK} OUTPUT_W_EXT.$LF . $_chn.$LF),
1574 0         0 print({$_DAU_W_SOCK}
1575 0         0 $_task_id.$LF . $self->{_wid}.$LF . $self->{_exit_pid}.$LF .
1576             $_exit_status.$LF . $_exit_id.$LF . $_len.$LF . $_exit_msg .
1577             '0'.$LF
1578             );
1579             }
1580              
1581 0 0       0 $_DAT_LOCK->unlock() if $_lock_chn;
1582             }
1583              
1584 0         0 _exit($self);
1585             }
1586              
1587             ## Return the session dir, made on demand.
1588              
1589             sub sess_dir {
1590 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1591 0 0       0 return $self->{_sess_dir} if defined $self->{_sess_dir};
1592              
1593 0 0       0 if ($self->{_wid} == 0) {
1594             $self->{_sess_dir} = $self->{_spawned}
1595 0 0       0 ? _make_sessdir($self) : undef;
1596             }
1597             else {
1598 0         0 my $_chn = $self->{_chn};
1599 0         0 my $_DAT_LOCK = $self->{_dat_lock};
1600 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1601 0         0 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
1602 0         0 my $_lock_chn = $self->{_lock_chn};
1603 0         0 my $_sess_dir;
1604              
1605 0 0       0 local $\ = undef if (defined $\);
1606 0 0 0     0 local $/ = $LF if (!$/ || $/ ne $LF);
1607              
1608 0 0       0 $_DAT_LOCK->lock() if $_lock_chn;
1609 0         0 print({$_DAT_W_SOCK} OUTPUT_S_DIR.$LF . $_chn.$LF);
  0         0  
1610 0         0 chomp($_sess_dir = <$_DAU_W_SOCK>);
1611 0 0       0 $_DAT_LOCK->unlock() if $_lock_chn;
1612              
1613 0         0 $self->{_sess_dir} = $_sess_dir;
1614             }
1615             }
1616              
1617             ## Return the temp dir, made on demand.
1618              
1619             sub tmp_dir {
1620 31 50   31 0 2830 my $self = shift; $self = $MCE unless ref($self);
  31         158  
1621 31 50       257 return $self->{tmp_dir} if defined $self->{tmp_dir};
1622              
1623 31 50       131 if ($self->{_wid} == 0) {
1624 31         136 $self->{tmp_dir} = MCE::Signal::_make_tmpdir();
1625             }
1626             else {
1627 0         0 my $_chn = $self->{_chn};
1628 0         0 my $_DAT_LOCK = $self->{_dat_lock};
1629 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1630 0         0 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
1631 0         0 my $_lock_chn = $self->{_lock_chn};
1632 0         0 my $_tmp_dir;
1633              
1634 0 0       0 local $\ = undef if (defined $\);
1635 0 0 0     0 local $/ = $LF if (!$/ || $/ ne $LF);
1636              
1637 0 0       0 $_DAT_LOCK->lock() if $_lock_chn;
1638 0         0 print({$_DAT_W_SOCK} OUTPUT_T_DIR.$LF . $_chn.$LF);
  0         0  
1639 0         0 chomp($_tmp_dir = <$_DAU_W_SOCK>);
1640 0 0       0 $_DAT_LOCK->unlock() if $_lock_chn;
1641              
1642 0         0 $self->{tmp_dir} = $_tmp_dir;
1643             }
1644             }
1645              
1646             ###############################################################################
1647             ## ----------------------------------------------------------------------------
1648             ## Methods for serializing data from workers to the main process.
1649             ##
1650             ###############################################################################
1651              
1652             ## Do method. Additional arguments are optional.
1653              
1654             sub do {
1655 133 100   133 0 1170 my $self = shift; $self = $MCE unless ref($self);
  133         414  
1656 133 50       614 my $_pkg = caller() eq 'MCE' ? caller(1) : caller();
1657              
1658 133 50       359 _croak('MCE::do: (code ref) is not supported')
1659             if (ref $_[0] eq 'CODE');
1660 133 50       557 _croak('MCE::do: (callback) is not specified')
1661             unless (defined ( my $_func = shift ));
1662              
1663 133 50       646 $_func = $_pkg.'::'.$_func if (index($_func, ':') < 0);
1664              
1665 133 50       369 if ($self->{_wid}) {
1666 133         1307 return _do_callback($self, $_func, [ @_ ]);
1667             }
1668             else {
1669 85     85   868 no strict 'refs';
  85         333  
  85         275971  
1670 0         0 return $_func->(@_);
1671             }
1672             }
1673              
1674             ## Gather method.
1675              
1676             sub gather {
1677 357 50   357 0 2434 my $self = shift; $self = $MCE unless ref($self);
  357         941  
1678              
1679             _croak('MCE::gather: method is not allowed by the manager process')
1680 357 50       855 unless ($self->{_wid});
1681              
1682 357         1632 return _do_gather($self, [ @_ ]);
1683             }
1684              
1685             ## Sendto method.
1686              
1687             {
1688             my %_sendto_lkup = (
1689             'file' => SENDTO_FILEV1, 'stderr' => SENDTO_STDERR,
1690             'file:' => SENDTO_FILEV2, 'stdout' => SENDTO_STDOUT,
1691             'fd:' => SENDTO_FD,
1692             );
1693              
1694             my $_v2_regx = qr/^([^:]+:)(.+)/;
1695              
1696             sub sendto {
1697              
1698 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1699 0         0 my $_to = shift;
1700              
1701             _croak('MCE::sendto: method is not allowed by the manager process')
1702 0 0       0 unless ($self->{_wid});
1703              
1704 0 0       0 return unless (defined $_[0]);
1705              
1706             my $_dest = exists $_sendto_lkup{ lc($_to) }
1707 0 0       0 ? $_sendto_lkup{ lc($_to) } : undef;
1708 0         0 my $_value;
1709              
1710 0 0       0 if (!defined $_dest) {
1711 0         0 my $_fd;
1712              
1713 0 0 0     0 if (ref($_to) && ( defined ($_fd = fileno($_to)) ||
    0 0        
1714             defined ($_fd = eval { $_to->fileno }) )) {
1715              
1716 0 0       0 if (my $_ob = tied *{ $_to }) {
  0         0  
1717 0 0       0 if (ref $_ob eq 'IO::TieCombine::Handle') {
1718 0 0       0 $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout');
1719 0 0       0 $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr');
1720             }
1721             }
1722              
1723 0 0       0 my $_data_ref = (scalar @_ == 1) ? \(''.$_[0]) : \join('', @_);
1724 0         0 return _do_send_glob($self, $_to, $_fd, $_data_ref);
1725             }
1726             elsif (reftype($_to) eq 'GLOB') {
1727 0         0 return _croak('Cannot write to filehandle');
1728             }
1729              
1730 0 0 0     0 if (defined $_to && $_to =~ /$_v2_regx/o) {
1731             $_dest = exists $_sendto_lkup{ lc($1) }
1732 0 0       0 ? $_sendto_lkup{ lc($1) } : undef;
1733 0         0 $_value = $2;
1734             }
1735              
1736 0 0 0     0 if (!defined $_dest || ( !defined $_value && (
      0        
      0        
1737             $_dest == SENDTO_FILEV2 || $_dest == SENDTO_FD
1738             ))) {
1739 0         0 my $_msg = "\n";
1740 0         0 $_msg .= "MCE::sendto: improper use of method\n";
1741 0         0 $_msg .= "\n";
1742 0         0 $_msg .= "## usage:\n";
1743 0         0 $_msg .= "## ->sendto(\"stderr\", ...);\n";
1744 0         0 $_msg .= "## ->sendto(\"stdout\", ...);\n";
1745 0         0 $_msg .= "## ->sendto(\"file:/path/to/file\", ...);\n";
1746 0         0 $_msg .= "## ->sendto(\"fd:2\", ...);\n";
1747 0         0 $_msg .= "\n";
1748              
1749 0         0 _croak($_msg);
1750             }
1751             }
1752              
1753 0 0       0 if ($_dest == SENDTO_FILEV1) { # sendto 'file', $a, $path
1754 0 0 0     0 return if (!defined $_[1] || @_ > 2); # Please switch to using V2
1755 0         0 $_value = $_[1]; delete $_[1]; # sendto 'file:/path', $a
  0         0  
1756 0         0 $_dest = SENDTO_FILEV2;
1757             }
1758              
1759 0         0 return _do_send($self, $_dest, $_value, @_);
1760             }
1761             }
1762              
1763             ###############################################################################
1764             ## ----------------------------------------------------------------------------
1765             ## Functions for serializing print, printf and say statements.
1766             ##
1767             ###############################################################################
1768              
1769             sub print {
1770 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1771 0         0 my ($_fd, $_glob, $_data);
1772              
1773 0 0 0     0 if (ref($_[0]) && ( defined ($_fd = fileno($_[0])) ||
    0 0        
1774             defined ($_fd = eval { $_[0]->fileno }) )) {
1775              
1776 0 0       0 if (my $_ob = tied *{ $_[0] }) {
  0         0  
1777 0 0       0 if (ref $_ob eq 'IO::TieCombine::Handle') {
1778 0 0       0 $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout');
1779 0 0       0 $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr');
1780             }
1781             }
1782              
1783 0         0 $_glob = shift;
1784             }
1785             elsif (reftype($_[0]) eq 'GLOB') {
1786 0         0 return _croak('Cannot write to filehandle');
1787             }
1788              
1789 0 0       0 $_data = join('', scalar @_ ? @_ : $_);
1790              
1791 0 0       0 return _do_send_glob($self, $_glob, $_fd, \$_data) if $_fd;
1792 0 0       0 return _do_send($self, SENDTO_STDOUT, undef, \$_data) if $self->{_wid};
1793 0         0 return _do_send_glob($self, \*STDOUT, 1, \$_data);
1794             }
1795              
1796             sub printf {
1797 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1798 0         0 my ($_fd, $_glob, $_fmt, $_data);
1799              
1800 0 0 0     0 if (ref($_[0]) && ( defined ($_fd = fileno($_[0])) ||
    0 0        
1801             defined ($_fd = eval { $_[0]->fileno }) )) {
1802              
1803 0 0       0 if (my $_ob = tied *{ $_[0] }) {
  0         0  
1804 0 0       0 if (ref $_ob eq 'IO::TieCombine::Handle') {
1805 0 0       0 $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout');
1806 0 0       0 $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr');
1807             }
1808             }
1809              
1810 0         0 $_glob = shift;
1811             }
1812             elsif (reftype($_[0]) eq 'GLOB') {
1813 0         0 return _croak('Cannot write to filehandle');
1814             }
1815              
1816 0   0     0 $_fmt = shift || '%s';
1817 0 0       0 $_data = _sprintf($_fmt, scalar @_ ? @_ : $_);
1818              
1819 0 0       0 return _do_send_glob($self, $_glob, $_fd, \$_data) if $_fd;
1820 0 0       0 return _do_send($self, SENDTO_STDOUT, undef, \$_data) if $self->{_wid};
1821 0         0 return _do_send_glob($self, \*STDOUT, 1, \$_data);
1822             }
1823              
1824             sub say {
1825 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1826 0         0 my ($_fd, $_glob, $_data);
1827              
1828 0 0 0     0 if (ref($_[0]) && ( defined ($_fd = fileno($_[0])) ||
    0 0        
1829             defined ($_fd = eval { $_[0]->fileno }) )) {
1830              
1831 0 0       0 if (my $_ob = tied *{ $_[0] }) {
  0         0  
1832 0 0       0 if (ref $_ob eq 'IO::TieCombine::Handle') {
1833 0 0       0 $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout');
1834 0 0       0 $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr');
1835             }
1836             }
1837              
1838 0         0 $_glob = shift;
1839             }
1840             elsif (reftype($_[0]) eq 'GLOB') {
1841 0         0 return _croak('Cannot write to filehandle');
1842             }
1843              
1844 0 0       0 $_data = join('', scalar @_ ? @_ : $_) . "\n";
1845              
1846 0 0       0 return _do_send_glob($self, $_glob, $_fd, \$_data) if $_fd;
1847 0 0       0 return _do_send($self, SENDTO_STDOUT, undef, \$_data) if $self->{_wid};
1848 0         0 return _do_send_glob($self, \*STDOUT, 1, \$_data);
1849             }
1850              
1851             ###############################################################################
1852             ## ----------------------------------------------------------------------------
1853             ## Private methods.
1854             ##
1855             ###############################################################################
1856              
1857             sub _exit {
1858 65     65   412 my $self = shift;
1859 65 50 33     993 my $_has_guard = (exists $self->{_guard} && $self->{_guard}->[0]) ? 1 : 0;
1860              
1861 65 50       330 @{ $self->{_guard} } = () if $_has_guard;
  0         0  
1862 65         252 delete $self->{_wuf}; _end();
  65         1113  
1863              
1864             ## Exit thread/child process.
1865 65 50   0   1486 $SIG{__DIE__} = sub {} unless $_tid;
1866 65     0   837 $SIG{__WARN__} = sub {};
1867              
1868 65 50       416 threads->exit(0) if $self->{use_threads};
1869              
1870 65 50       304 if (! $_tid) {
1871             $SIG{HUP} = $SIG{INT} = $SIG{QUIT} = $SIG{TERM} = sub {
1872 0     0   0 $SIG{$_[0]} = $SIG{INT} = $SIG{TERM} = sub {};
1873              
1874 0 0 0     0 CORE::kill($_[0], getppid())
      0        
1875             if (($_[0] eq 'INT' || $_[0] eq 'TERM') && $^O ne 'MSWin32');
1876              
1877 0         0 CORE::kill('KILL', $$);
1878 65         3642 };
1879             }
1880              
1881 65 0 33     504 if ($self->{posix_exit} && !$_has_guard && !$_is_MSWin32) {
      33        
1882 0         0 eval { MCE::Mutex::Channel::_destroy() };
  0         0  
1883 0 0       0 POSIX::_exit(0) if $INC{'POSIX.pm'};
1884 0         0 CORE::kill('KILL', $$);
1885             }
1886              
1887 65         24175 CORE::exit(0);
1888             }
1889              
1890             sub _get_max_workers {
1891 126 50   126   325 my $self = shift; $self = $MCE unless ref($self);
  126         513  
1892              
1893 126 100       469 if (defined $self->{user_tasks}) {
1894 91 50       423 if (defined $self->{user_tasks}->[0]->{max_workers}) {
1895 91         911 return $self->{user_tasks}->[0]->{max_workers};
1896             }
1897             }
1898              
1899 35         88 return $self->{max_workers};
1900             }
1901              
1902             sub _make_sessdir {
1903 0 0   0   0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1904              
1905 0         0 my $_sess_dir = $self->{_sess_dir};
1906              
1907 0 0       0 unless (defined $_sess_dir) {
1908             $self->{tmp_dir} = MCE::Signal::_make_tmpdir()
1909 0 0       0 unless defined $self->{tmp_dir};
1910              
1911 0 0       0 my $_mce_tid = $INC{'threads.pm'} ? threads->tid() : '';
1912 0 0       0 $_mce_tid = '' unless defined $self->{_mce_tid};
1913              
1914 0         0 my $_mce_sid = $$ .'.'. $_mce_tid .'.'. (++$_mce_count);
1915 0         0 my $_tmp_dir = $self->{tmp_dir};
1916              
1917 0 0 0     0 _croak("MCE::sess_dir: (tmp_dir) is not defined")
1918             if (!defined $_tmp_dir || $_tmp_dir eq '');
1919 0 0       0 _croak("MCE::sess_dir: ($_tmp_dir) is not a directory or does not exist")
1920             unless (-d $_tmp_dir);
1921 0 0       0 _croak("MCE::sess_dir: ($_tmp_dir) is not writeable")
1922             unless (-w $_tmp_dir);
1923              
1924 0         0 my $_cnt = 0; $_sess_dir = "$_tmp_dir/$_mce_sid";
  0         0  
1925              
1926 0         0 $_sess_dir = "$_tmp_dir/$_mce_sid." . (++$_cnt)
1927             while ( !(mkdir $_sess_dir, 0770) );
1928             }
1929              
1930 0         0 return $_sess_dir;
1931             }
1932              
1933             sub _sprintf {
1934 0     0   0 my ($_fmt, $_arg) = @_;
1935             # remove tainted'ness
1936 0         0 ($_fmt) = $_fmt =~ /(.*)/s;
1937              
1938 0         0 return sprintf("$_fmt", $_arg);
1939             }
1940              
1941             sub _sync_buffer_to_array {
1942 5     5   27 my ($_buffer_ref, $_array_ref, $_chop_str) = @_;
1943              
1944 5         10 local $_; my $_cnt = 0;
  5         9  
1945              
1946 5         245 open my $_MEM_FH, '<', $_buffer_ref;
  5         9  
  5         159  
1947 5         5483 binmode $_MEM_FH, ':raw';
1948              
1949 5 50       26 unless (length $_chop_str) {
1950 5         87 $_array_ref->[$_cnt++] = $_ while (<$_MEM_FH>);
1951             }
1952             else {
1953 0         0 $_array_ref->[$_cnt++] = <$_MEM_FH>;
1954 0         0 while (<$_MEM_FH>) {
1955 0         0 $_array_ref->[$_cnt ] = $_chop_str;
1956 0         0 $_array_ref->[$_cnt++] .= $_;
1957             }
1958             }
1959              
1960 5         16 close $_MEM_FH;
1961 5         35 weaken $_MEM_FH;
1962              
1963 5         16 return;
1964             }
1965              
1966             sub _sync_params {
1967 187     187   512 my ($self, $_params_ref) = @_;
1968 187         329 my $_requires_shutdown = 0;
1969              
1970 187 50 33     603 if (defined $_params_ref->{init_relay} && !defined $self->{init_relay}) {
1971 0         0 $_requires_shutdown = 1;
1972             }
1973 187         630 for my $_p (qw( user_begin user_func user_end )) {
1974 561 50       1421 if (defined $_params_ref->{$_p}) {
1975 0         0 $self->{$_p} = delete $_params_ref->{$_p};
1976 0         0 $_requires_shutdown = 1;
1977             }
1978             }
1979 187         298 for my $_p (keys %{ $_params_ref }) {
  187         722  
1980             _croak("MCE::_sync_params: ($_p) is not a valid params argument")
1981 311 50       927 unless (exists $_params_allowed_args{$_p});
1982              
1983 311         632 $self->{$_p} = $_params_ref->{$_p};
1984             }
1985              
1986 187 100       719 return ($self->{_spawned}) ? $_requires_shutdown : 0;
1987             }
1988              
1989             ###############################################################################
1990             ## ----------------------------------------------------------------------------
1991             ## Dispatch methods.
1992             ##
1993             ###############################################################################
1994              
1995             sub _dispatch {
1996 65     65   2616 my @_args = @_; my $_is_thread = shift @_args;
  65         1343  
1997 65         1796 my $self = $MCE = $_args[0];
1998              
1999             ## To avoid (Scalars leaked: N) messages; fixed in Perl 5.12.x
2000 65         1111 @_ = ();
2001              
2002             $ENV{'PERL_MCE_IPC'} = 'win32' if ( $_is_MSWin32 && (
2003             defined($self->{max_retries}) ||
2004             $INC{'MCE/Child.pm'} ||
2005 65 0 0     2335 $INC{'MCE/Hobo.pm'}
      33        
2006             ));
2007              
2008 65         1180 delete $self->{_relayed};
2009              
2010 65         1782 $self->{_is_thread} = $_is_thread;
2011 65 50       3816 $self->{_pid} = $_is_thread ? $$ .'.'. threads->tid() : $$;
2012              
2013             ## Sets the seed of the base generator uniquely between workers.
2014             ## The new seed is computed using the current seed and $_wid value.
2015             ## One may set the seed at the application level for predictable
2016             ## results (non-thread workers only). Ditto for Math::Prime::Util,
2017             ## Math::Random, and Math::Random::MT::Auto.
2018              
2019             {
2020 65         1157 my ($_wid, $_seed) = ($_args[1], $self->{_seed});
  65         953  
2021 65         2064 srand(abs($_seed - ($_wid * 100000)) % 2147483560);
2022              
2023 65 50       1281 if (!$self->{use_threads}) {
2024             Math::Prime::Util::srand(abs($_seed - ($_wid * 100000)) % 2147483560)
2025 65 50       1205 if ( $INC{'Math/Prime/Util.pm'} );
2026              
2027             MCE::Hobo->_clear()
2028 65 50 33     2324 if ( $INC{'MCE/Hobo.pm'} && MCE::Hobo->can('_clear') );
2029              
2030 65 50       1332 MCE::Child->_clear() if $INC{'MCE/Child.pm'};
2031             }
2032             }
2033              
2034 65 50 33     2912 if (!$self->{use_threads} && $INC{'Math/Random.pm'}) {
2035 0         0 my ($_wid, $_cur_seed) = ($_args[1], Math::Random::random_get_seed());
2036              
2037 0 0       0 my $_new_seed = ($_cur_seed < 1073741781)
2038             ? $_cur_seed + (($_wid * 100000) % 1073741780)
2039             : $_cur_seed - (($_wid * 100000) % 1073741780);
2040              
2041 0         0 Math::Random::random_set_seed($_new_seed, $_new_seed);
2042             }
2043              
2044 65 50 33     1957 if (!$self->{use_threads} && $INC{'Math/Random/MT/Auto.pm'}) {
2045 0         0 my ($_wid, $_cur_seed) = (
2046             $_args[1], Math::Random::MT::Auto::get_seed()->[0]
2047             );
2048 0 0       0 my $_new_seed = ($_cur_seed < 1073741781)
2049             ? $_cur_seed + (($_wid * 100000) % 1073741780)
2050             : $_cur_seed - (($_wid * 100000) % 1073741780);
2051              
2052 0         0 Math::Random::MT::Auto::set_seed($_new_seed);
2053             }
2054              
2055             ## Run.
2056              
2057 65         4587 _worker_main(@_args, \@_plugin_worker_init);
2058              
2059 65         1455 _exit($self);
2060             }
2061              
2062             sub _dispatch_thread {
2063 0     0   0 my ($self, $_wid, $_task, $_task_id, $_task_wid, $_params) = @_;
2064              
2065 0         0 @_ = (); local $_;
  0         0  
2066              
2067 0         0 my $_thr = threads->create( \&_dispatch,
2068             1, $self, $_wid, $_task, $_task_id, $_task_wid, $_params
2069             );
2070              
2071 0 0       0 _croak("MCE::_dispatch_thread: Failed to spawn worker $_wid: $!")
2072             if (!defined $_thr);
2073              
2074             ## Store into an available slot (restart), otherwise append to arrays.
2075 0 0       0 if (defined $_params) { for my $_i (0 .. @{ $self->{_tids} } - 1) {
  0         0  
  0         0  
2076 0 0       0 unless (defined $self->{_tids}->[$_i]) {
2077 0         0 $self->{_thrs}->[$_i] = $_thr;
2078 0         0 $self->{_tids}->[$_i] = $_thr->tid();
2079 0         0 return;
2080             }
2081             }}
2082              
2083 0         0 push @{ $self->{_thrs} }, $_thr;
  0         0  
2084 0         0 push @{ $self->{_tids} }, $_thr->tid();
  0         0  
2085              
2086             sleep $self->{spawn_delay}
2087 0 0 0     0 if defined($self->{spawn_delay}) && $self->{spawn_delay} > 0.0;
2088              
2089 0         0 return;
2090             }
2091              
2092             sub _dispatch_child {
2093 277     277   1018 my ($self, $_wid, $_task, $_task_id, $_task_wid, $_params) = @_;
2094              
2095 277         562 @_ = (); local $_;
  277         428  
2096 277         312465 my $_pid = fork();
2097              
2098 277 50       11334 _croak("MCE::_dispatch_child: Failed to spawn worker $_wid: $!")
2099             if (!defined $_pid);
2100              
2101 277 100       8581 _dispatch(0, $self, $_wid, $_task, $_task_id, $_task_wid, $_params)
2102             if ($_pid == 0);
2103              
2104             ## Store into an available slot (restart), otherwise append to array.
2105 212 50       2023 if (defined $_params) { for my $_i (0 .. @{ $self->{_pids} } - 1) {
  0         0  
  0         0  
2106 0 0       0 unless (defined $self->{_pids}->[$_i]) {
2107 0         0 $self->{_pids}->[$_i] = $_pid;
2108 0         0 return;
2109             }
2110             }}
2111              
2112 212         5496 push @{ $self->{_pids} }, $_pid;
  212         5390  
2113              
2114 212 50 33     2633 if ($self->{loop_timeout} && !$_is_MSWin32) {
2115 0         0 $self->{_pids_t}{$_pid} = $_task_id;
2116 0         0 $self->{_pids_w}{$_pid} = $_wid;
2117             }
2118              
2119             sleep $self->{spawn_delay}
2120 212 50 33     2340 if defined($self->{spawn_delay}) && $self->{spawn_delay} > 0.0;
2121              
2122 212         10338 return;
2123             }
2124              
2125             1;
2126