File Coverage

blib/lib/MCE.pm
Criterion Covered Total %
statement 605 1080 56.0
branch 236 770 30.6
condition 89 375 23.7
subroutine 52 75 69.3
pod 0 20 0.0
total 982 2320 42.3


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## MCE - Many-Core Engine for Perl providing parallel processing capabilities.
4             ##
5             ###############################################################################
6              
7             package MCE;
8              
9 94     94   3648871 use strict;
  94         201  
  94         3874  
10 94     94   466 use warnings;
  94         528  
  94         5553  
11              
12 94     94   622 no warnings qw( threads recursion uninitialized );
  94         155  
  94         6673  
13              
14             our $VERSION = '1.902';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 94     94   598 use Carp ();
  94         200  
  94         29516  
21              
22             my ($_has_threads, $_freeze, $_thaw, $_tid, $_oid);
23              
24             BEGIN {
25 94     94   350 local $@;
26              
27 94 50 33     1297 if ( $^O eq 'MSWin32' && ! $INC{'threads.pm'} ) {
    50 33        
28 0         0 eval 'use threads; use threads::shared;';
29             }
30             elsif ( $INC{'threads.pm'} && ! $INC{'threads/shared.pm'} ) {
31 0         0 eval 'use threads::shared;';
32             }
33              
34 94 50       527 $_has_threads = $INC{'threads.pm'} ? 1 : 0;
35 94 50       380 $_tid = $_has_threads ? threads->tid() : 0;
36 94         814 $_oid = "$$.$_tid";
37              
38 94 50 33     825 if ( $] ge '5.008008' && ! $INC{'PDL.pm'} ) {
39 94     94   8657 eval 'use Sereal::Encoder 3.015; use Sereal::Decoder 3.015;';
  94     94   672  
  94         2751  
  94         6390  
  94         692  
  94         2615  
  94         3458  
40 94 50       541 if ( ! $@ ) {
41 94         1609 my $_encoder_ver = int( Sereal::Encoder->VERSION() );
42 94         1008 my $_decoder_ver = int( Sereal::Decoder->VERSION() );
43 94 50       456 if ( $_encoder_ver - $_decoder_ver == 0 ) {
44 94         380 $_freeze = \&Sereal::Encoder::encode_sereal;
45 94         460 $_thaw = \&Sereal::Decoder::decode_sereal;
46             }
47             }
48             }
49              
50 94 50       2778 if ( ! defined $_freeze ) {
51 0         0 require Storable;
52 0         0 $_freeze = \&Storable::freeze;
53 0         0 $_thaw = \&Storable::thaw;
54             }
55             }
56              
57 94     94   64387 use IO::Handle ();
  94         713269  
  94         3958  
58 94     94   740 use Scalar::Util qw( looks_like_number refaddr reftype weaken );
  94         166  
  94         7818  
59 94     94   57831 use Socket qw( SOL_SOCKET SO_RCVBUF );
  94         652156  
  94         23518  
60 94     94   842 use Time::HiRes qw( sleep time );
  94         273  
  94         833  
61              
62 94     94   57260 use MCE::Util qw( $LF );
  94         297  
  94         14342  
63 94     94   68409 use MCE::Signal ();
  94         372  
  94         3080  
64 94     94   49161 use MCE::Mutex ();
  94         314  
  94         33453  
65              
66             our ($MCE, $RLA, $_que_template, $_que_read_size);
67             our (%_valid_fields_new);
68              
69             my ($TOP_HDLR, $_is_MSWin32, $_is_winenv, $_prev_mce);
70             my (%_valid_fields_task, %_params_allowed_args);
71              
72             BEGIN {
73             ## Configure pack/unpack template for writing to and from the queue.
74             ## Each entry contains 2 positive numbers: chunk_id & msg_id.
75             ## Check for >= 64-bit, otherwize fall back to machine's word length.
76              
77 94     94   815 $_que_template = ( ( log(~0+1) / log(2) ) >= 64 ) ? 'Q2' : 'I2';
78 94         1135 $_que_read_size = length pack($_que_template, 0, 0);
79              
80             ## Attributes used internally.
81             ## _abort_msg _caller _chn _com_lock _dat_lock _mgr_live _rla_data _seed
82             ## _chunk_id _pids _run_mode _single_dim _thrs _tids _task_wid _wid _wuf
83             ## _exiting _exit_pid _last_sref _total_exited _total_running _total_workers
84             ## _send_cnt _sess_dir _spawned _state _status _task _task_id _wrk_status
85             ## _init_pid _init_total_workers _pids_t _pids_w _pids_c _relayed
86             ##
87             ## _bsb_r_sock _bsb_w_sock _com_r_sock _com_w_sock _dat_r_sock _dat_w_sock
88             ## _que_r_sock _que_w_sock _rla_r_sock _rla_w_sock _data_channels
89             ## _lock_chn _mutex_n
90              
91 94         287 %_valid_fields_new = map { $_ => 1 } qw(
  3572         7604  
92             max_workers tmp_dir use_threads user_tasks task_end task_name freeze thaw
93             chunk_size input_data sequence job_delay spawn_delay submit_delay RS
94             flush_file flush_stderr flush_stdout stderr_file stdout_file use_slurpio
95             interval user_args user_begin user_end user_func user_error user_output
96             bounds_only gather init_relay on_post_exit on_post_run parallel_io
97             loop_timeout max_retries progress posix_exit
98             );
99 94         416 %_params_allowed_args = map { $_ => 1 } qw(
  2726         7795  
100             chunk_size input_data sequence job_delay spawn_delay submit_delay RS
101             flush_file flush_stderr flush_stdout stderr_file stdout_file use_slurpio
102             interval user_args user_begin user_end user_func user_error user_output
103             bounds_only gather init_relay on_post_exit on_post_run parallel_io
104             loop_timeout max_retries progress
105             );
106 94         405 %_valid_fields_task = map { $_ => 1 } qw(
  1692         2815  
107             max_workers chunk_size input_data interval sequence task_end task_name
108             bounds_only gather init_relay user_args user_begin user_end user_func
109             RS parallel_io use_slurpio use_threads
110             );
111              
112 94 50       646 $_is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
113 94 50       694 $_is_winenv = ( $^O =~ /mswin|mingw|msys|cygwin/i ) ? 1 : 0;
114              
115             ## Create accessor functions.
116 94     94   686 no strict 'refs'; no warnings 'redefine';
  94     94   162  
  94         24094  
  94         470  
  94         141  
  94         33818  
117              
118 94         370 for my $_p (qw( chunk_size max_retries max_workers task_name user_args )) {
119 470         2386 *{ $_p } = sub () {
120 99 100   99   738 my $self = shift; $self = $MCE unless ref($self);
  99         294  
121 99         623 return $self->{$_p};
122 470         2092 };
123             }
124 94         253 for my $_p (qw( chunk_id seed task_id task_wid wid )) {
125 470         1656 *{ $_p } = sub () {
126 71 100   71   1973 my $self = shift; $self = $MCE unless ref($self);
  71         364  
127 71         1744 return $self->{"_${_p}"};
128 470         2186 };
129             }
130 94         291 for my $_p (qw( freeze thaw )) {
131 188         590 *{ $_p } = sub () {
132 208 100   208   410 my $self = shift; $self = $MCE unless ref($self);
  208         594  
133 208         2232 return $self->{$_p}(@_);
134 188         578 };
135             }
136              
137 94         209 $RLA = {};
138              
139 94         4181 return;
140             }
141              
142             ###############################################################################
143             ## ----------------------------------------------------------------------------
144             ## Import routine.
145             ##
146             ###############################################################################
147              
148 94     94   655 use constant { SELF => 0, CHUNK => 1, CID => 2 };
  94         211  
  94         13031  
149              
150 94     94   60648 our $_MCE_LOCK : shared = 1;
  94         150132  
  94         600  
151 94     94   8498 our $_WIN_LOCK : shared = 1;
  94         530  
  94         496  
152              
153             my ($_def, $_imported) = ({});
154              
155             sub import {
156 116     116   1195 my ($_class, $_pkg) = (shift, caller);
157 116         565 my $_p = $_def->{$_pkg} = {};
158              
159             ## Process module arguments.
160 116         763 while ( my $_argument = shift ) {
161 0         0 my $_arg = lc $_argument;
162              
163 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
164 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
165 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
166 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
167 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
168 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
169 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
170              
171 0 0 0     0 if ( $_arg eq 'export_const' || $_arg eq 'const' ) {
172 0 0       0 if ( shift eq '1' ) {
173 94     94   21402 no strict 'refs'; no warnings 'redefine';
  94     94   192  
  94         4081  
  94         488  
  94         184  
  94         42692  
174 0         0 *{ $_pkg.'::SELF' } = \&SELF;
  0         0  
175 0         0 *{ $_pkg.'::CHUNK' } = \&CHUNK;
  0         0  
176 0         0 *{ $_pkg.'::CID' } = \&CID;
  0         0  
177             }
178 0         0 next;
179             }
180              
181             ## Sereal, if available, is used automatically by MCE 1.800 onwards.
182 0 0       0 if ( $_arg eq 'sereal' ) {
183 0 0       0 if ( shift eq '0' ) {
184 0         0 require Storable;
185 0         0 $_p->{FREEZE} = \&Storable::freeze;
186 0         0 $_p->{THAW} = \&Storable::thaw;
187             }
188 0         0 next;
189             }
190              
191 0         0 _croak("Error: ($_argument) invalid module option");
192             }
193              
194 116 100       7185 return if $_imported++;
195              
196             ## Instantiate a module-level instance.
197 94         517 $MCE = MCE->new( _module_instance => 1, max_workers => 0 );
198              
199 94         20639 return;
200             }
201              
202             ###############################################################################
203             ## ----------------------------------------------------------------------------
204             ## Define constants & variables.
205             ##
206             ###############################################################################
207              
208             use constant {
209              
210             # Max data channels. This cannot be greater than 8 on MSWin32.
211 94         124413 DATA_CHANNELS => 8,
212              
213             # Max GC size. Undef variable when exceeding size.
214             MAX_GC_SIZE => 1024 * 1024 * 64,
215              
216             MAX_RECS_SIZE => 8192, # Reads number of records if N <= value
217             # Reads number of bytes if N > value
218              
219             OUTPUT_W_ABT => 'W~ABT', # Worker has aborted
220             OUTPUT_W_DNE => 'W~DNE', # Worker has completed
221             OUTPUT_W_RLA => 'W~RLA', # Worker has relayed
222             OUTPUT_W_EXT => 'W~EXT', # Worker has exited
223             OUTPUT_A_REF => 'A~REF', # Input << Array ref
224             OUTPUT_G_REF => 'G~REF', # Input << Glob ref
225             OUTPUT_H_REF => 'H~REF', # Input << Hash ref
226             OUTPUT_I_REF => 'I~REF', # Input << Iter ref
227             OUTPUT_A_CBK => 'A~CBK', # Callback w/ multiple args
228             OUTPUT_N_CBK => 'N~CBK', # Callback w/ no args
229             OUTPUT_A_GTR => 'A~GTR', # Gather data
230             OUTPUT_O_SND => 'O~SND', # Send >> STDOUT
231             OUTPUT_E_SND => 'E~SND', # Send >> STDERR
232             OUTPUT_F_SND => 'F~SND', # Send >> File
233             OUTPUT_D_SND => 'D~SND', # Send >> File descriptor
234             OUTPUT_B_SYN => 'B~SYN', # Barrier sync - begin
235             OUTPUT_E_SYN => 'E~SYN', # Barrier sync - end
236             OUTPUT_S_IPC => 'S~IPC', # Change to win32 IPC
237             OUTPUT_C_NFY => 'C~NFY', # Chunk ID notification
238             OUTPUT_P_NFY => 'P~NFY', # Progress notification
239             OUTPUT_R_NFY => 'R~NFY', # Relay notification
240             OUTPUT_S_DIR => 'S~DIR', # Make/get sess_dir
241             OUTPUT_T_DIR => 'T~DIR', # Make/get tmp_dir
242             OUTPUT_I_DLY => 'I~DLY', # Interval delay
243              
244             READ_FILE => 0, # Worker reads file handle
245             READ_MEMORY => 1, # Worker reads memory handle
246              
247             REQUEST_ARRAY => 0, # Worker requests next array chunk
248             REQUEST_GLOB => 1, # Worker requests next glob chunk
249             REQUEST_HASH => 2, # Worker requests next hash chunk
250              
251             SENDTO_FILEV1 => 0, # Worker sends to 'file', $a, '/path'
252             SENDTO_FILEV2 => 1, # Worker sends to 'file:/path', $a
253             SENDTO_STDOUT => 2, # Worker sends to STDOUT
254             SENDTO_STDERR => 3, # Worker sends to STDERR
255             SENDTO_FD => 4, # Worker sends to file descriptor
256              
257             WANTS_UNDEF => 0, # Callee wants nothing
258             WANTS_ARRAY => 1, # Callee wants list
259             WANTS_SCALAR => 2, # Callee wants scalar
260 94     94   747 };
  94         181  
261              
262             my $_mce_count = 0;
263              
264             sub CLONE {
265 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
266             }
267              
268             sub DESTROY {
269 330 0 33 330   48023 CORE::kill('KILL', $$)
270             if ( $_is_MSWin32 && $MCE::Signal::KILLED );
271              
272             $_[0]->shutdown(1)
273 330 0 33     2374 if ( $_[0] && $_[0]->{_spawned} && $_[0]->{_init_pid} eq "$$.$_tid" &&
      33        
      33        
274             !$MCE::Signal::KILLED );
275              
276 330         10896 return;
277             }
278              
279             END {
280 94 100   94   1056496 return unless ( defined $MCE );
281              
282 18 50       208 my $_pid = $MCE->{_is_thread} ? $$ .'.'. threads->tid() : $$;
283 18 50 33     168 $MCE->exit if ( exists $MCE->{_wuf} && $MCE->{_pid} eq $_pid );
284              
285 18         124 _end();
286             }
287              
288             sub _end {
289 91 100   91   2436 MCE::Flow->finish ( 'MCE' ) if $INC{'MCE/Flow.pm'};
290 91 100       590 MCE::Grep->finish ( 'MCE' ) if $INC{'MCE/Grep.pm'};
291 91 100       587 MCE::Loop->finish ( 'MCE' ) if $INC{'MCE/Loop.pm'};
292 91 100       630 MCE::Map->finish ( 'MCE' ) if $INC{'MCE/Map.pm'};
293 91 100       1372 MCE::Step->finish ( 'MCE' ) if $INC{'MCE/Step.pm'};
294 91 100       2531 MCE::Stream->finish ( 'MCE' ) if $INC{'MCE/Stream.pm'};
295              
296 91         443 $MCE = $TOP_HDLR = undef;
297             }
298              
299             ###############################################################################
300             ## ----------------------------------------------------------------------------
301             ## Plugin interface for external modules plugging into MCE, e.g. MCE::Queue.
302             ##
303             ###############################################################################
304              
305             my (%_plugin_function, @_plugin_loop_begin, @_plugin_loop_end);
306             my (%_plugin_list, @_plugin_worker_init);
307              
308             sub _attach_plugin {
309 45     45   167 my $_ext_module = caller;
310              
311 45 50       191 unless (exists $_plugin_list{$_ext_module}) {
312 45         128 $_plugin_list{$_ext_module} = undef;
313              
314 45         101 my $_ext_output_function = $_[0];
315 45         94 my $_ext_output_loop_begin = $_[1];
316 45         80 my $_ext_output_loop_end = $_[2];
317 45         93 my $_ext_worker_init = $_[3];
318              
319 45 50       196 if (ref $_ext_output_function eq 'HASH') {
320 45         82 for my $_p (keys %{ $_ext_output_function }) {
  45         244  
321             $_plugin_function{$_p} = $_ext_output_function->{$_p}
322 545 50       1602 unless (exists $_plugin_function{$_p});
323             }
324             }
325              
326 45 50       269 push @_plugin_loop_begin, $_ext_output_loop_begin
327             if (ref $_ext_output_loop_begin eq 'CODE');
328 45 50       218 push @_plugin_loop_end, $_ext_output_loop_end
329             if (ref $_ext_output_loop_end eq 'CODE');
330 45 100       398 push @_plugin_worker_init, $_ext_worker_init
331             if (ref $_ext_worker_init eq 'CODE');
332             }
333              
334 45         106 @_ = ();
335              
336 45         135 return;
337             }
338              
339             ## Functions for saving and restoring $MCE.
340             ## Called by MCE::{ Flow, Grep, Loop, Map, Step, and Stream }.
341              
342             sub _save_state {
343 187     187   426 $_prev_mce = $MCE; $MCE = $_[0];
  187         506  
344 187         497 return;
345             }
346             sub _restore_state {
347 130     130   692 $_prev_mce->{_wrk_status} = $MCE->{_wrk_status};
348 130         481 $MCE = $_prev_mce; $_prev_mce = undef;
  130         267  
349 130         372 return;
350             }
351              
352             ###############################################################################
353             ## ----------------------------------------------------------------------------
354             ## New instance instantiation.
355             ##
356             ###############################################################################
357              
358             sub _croak {
359 0 0 0 0   0 if (MCE->wid == 0 || ! $^S) {
360 0         0 $SIG{__DIE__} = \&MCE::Signal::_die_handler;
361 0         0 $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
362             }
363 0         0 $\ = undef; goto &Carp::croak;
  0         0  
364             }
365              
366             sub _relay (;&) {
367 0     0   0 goto &MCE::relay;
368             }
369              
370 94     94   54200 use MCE::Core::Validation ();
  94         359  
  94         2995  
371 94     94   58516 use MCE::Core::Manager ();
  94         355  
  94         3257  
372 94     94   64151 use MCE::Core::Worker ();
  94         432  
  94         336765  
373              
374             sub new {
375 342     342 0 5263460 my ($class, %self) = @_;
376 342 100       2305 my $_pkg = exists $self{pkg} ? delete $self{pkg} : caller;
377              
378 342         1052 @_ = ();
379              
380 342   33     2542 bless(\%self, ref($class) || $class);
381              
382 342   100     2004 $self{task_name} ||= 'MCE';
383 342   50     2347 $self{max_workers} ||= $_def->{$_pkg}{MAX_WORKERS} || 1;
      66        
384 342   50     3562 $self{chunk_size} ||= $_def->{$_pkg}{CHUNK_SIZE} || 1;
      33        
385 342   66     5878 $self{tmp_dir} ||= $_def->{$_pkg}{TMP_DIR} || $MCE::Signal::tmp_dir;
      66        
386 342   33     3898 $self{freeze} ||= $_def->{$_pkg}{FREEZE} || $_freeze;
      33        
387 342   33     4085 $self{thaw} ||= $_def->{$_pkg}{THAW} || $_thaw;
      33        
388              
389             $self{init_relay} = $_def->{$_pkg}{INIT_RELAY}
390 342 50       1275 if (exists $_def->{$_pkg}{INIT_RELAY});
391              
392             $self{use_threads} = $_def->{$_pkg}{USE_THREADS}
393 342 50       3839 if (exists $_def->{$_pkg}{USE_THREADS});
394              
395 342 100       1067 if (exists $self{_module_instance}) {
396 94         310 $self{_init_total_workers} = $self{max_workers};
397 94         763 $self{_chunk_id} = $self{_task_wid} = $self{_wrk_status} = 0;
398 94         361 $self{_spawned} = $self{_task_id} = $self{_wid} = 0;
399 94         1041 $self{_init_pid} = "$$.$_tid";
400              
401 94         528 return \%self;
402             }
403              
404 248         1475 _sendto_fhs_close();
405              
406 248         1113 for my $_p (keys %self) {
407             _croak("MCE::new: ($_p) is not a valid constructor argument")
408 1758 50       3939 unless (exists $_valid_fields_new{$_p});
409             }
410              
411 248         3338 $self{_caller} = $_pkg, $self{_init_pid} = "$$.$_tid";
412              
413 248 50       959 if (defined $self{use_threads}) {
414 0 0 0     0 if (!$_has_threads && $self{use_threads}) {
415 0         0 my $_msg = "\n";
416 0         0 $_msg .= "## Please include threads support prior to loading MCE\n";
417 0         0 $_msg .= "## when specifying use_threads => $self{use_threads}\n";
418 0         0 $_msg .= "\n";
419              
420 0         0 _croak($_msg);
421             }
422             }
423             else {
424 248 50       1327 $self{use_threads} = ($_has_threads) ? 1 : 0;
425             }
426              
427 248 50       1507 if (!exists $self{posix_exit}) {
428             $self{posix_exit} = 1 if (
429             $^S || $_tid || $INC{'Mojo/IOLoop.pm'} ||
430             $INC{'Coro.pm'} || $INC{'LWP/UserAgent.pm'} || $INC{'stfl.pm'} ||
431             $INC{'Curses.pm'} || $INC{'CGI.pm'} || $INC{'FCGI.pm'} ||
432             $INC{'Tk.pm'} || $INC{'Wx.pm'} || $INC{'Win32/GUI.pm'} ||
433 248 50 33     10301 $INC{'Gearman/Util.pm'} || $INC{'Gearman/XS.pm'}
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
      33        
434             );
435             }
436              
437             ## -------------------------------------------------------------------------
438             ## Validation.
439              
440 248 100       1083 if (defined $self{tmp_dir}) {
441             _croak("MCE::new: ($self{tmp_dir}) is not a directory or does not exist")
442 42 50       1124 unless (-d $self{tmp_dir});
443             _croak("MCE::new: ($self{tmp_dir}) is not writeable")
444 42 50       760 unless (-w $self{tmp_dir});
445             }
446              
447 248 100       1664 if (defined $self{user_tasks}) {
448             _croak('MCE::new: (user_tasks) is not an ARRAY reference')
449 108 50       517 unless (ref $self{user_tasks} eq 'ARRAY');
450              
451 108         482 $self{max_workers} = _parse_max_workers($self{max_workers});
452             $self{init_relay} = $self{user_tasks}->[0]->{init_relay}
453 108 50       890 if ($self{user_tasks}->[0]->{init_relay});
454              
455 108         203 for my $_task (@{ $self{user_tasks} }) {
  108         376  
456 164         242 for my $_p (keys %{ $_task }) {
  164         555  
457             _croak("MCE::new: ($_p) is not a valid task constructor argument")
458 548 50       1111 unless (exists $_valid_fields_task{$_p});
459             }
460 164 50       315 $_task->{max_workers} = 0 unless scalar(keys %{ $_task });
  164         442  
461              
462             $_task->{max_workers} = $self{max_workers}
463 164 100       518 unless (defined $_task->{max_workers});
464             $_task->{use_threads} = $self{use_threads}
465 164 50       743 unless (defined $_task->{use_threads});
466              
467 164   50     823 bless($_task, ref(\%self) || \%self);
468             }
469             }
470              
471 248         1438 _validate_args(\%self);
472              
473             ## -------------------------------------------------------------------------
474             ## Private options. Limit chunk_size.
475              
476 248         455 my $_run_lock;
477              
478 248         805 $self{_chunk_id} = 0; # Chunk ID
479 248         572 $self{_send_cnt} = 0; # Number of times data was sent via send
480 248         747 $self{_spawned} = 0; # Have workers been spawned
481 248         588 $self{_task_id} = 0; # Task ID, starts at 0 (array index)
482 248         526 $self{_task_wid} = 0; # Task Worker ID, starts at 1 per task
483 248         560 $self{_wid} = 0; # Worker ID, starts at 1 per MCE instance
484 248         519 $self{_wrk_status} = 0; # For saving exit status when worker exits
485              
486 248 50       722 $self{_run_lock} = threads::shared::share($_run_lock) if $_is_MSWin32;
487              
488             $self{_last_sref} = (ref $self{input_data} eq 'SCALAR')
489 248 50       949 ? refaddr($self{input_data}) : 0;
490              
491             my $_data_channels = ("$$.$_tid" eq $_oid)
492 248 50       2891 ? ( $INC{'MCE/Channel.pm'} ? 4 : DATA_CHANNELS )
    50          
493             : 2;
494              
495 248         476 my $_total_workers = 0;
496              
497 248 100       736 if (defined $self{user_tasks}) {
498 108         190 $_total_workers += $_->{max_workers} for @{ $self{user_tasks} };
  108         524  
499             } else {
500 140         386 $_total_workers = $self{max_workers};
501             }
502              
503 248         676 $self{_init_total_workers} = $_total_workers;
504              
505 248 100       1096 $self{_data_channels} = ($_total_workers < $_data_channels)
506             ? $_total_workers : $_data_channels;
507              
508 248 100       958 $self{_lock_chn} = ($_total_workers > $self{_data_channels}) ? 1 : 0;
509 248 50 33     1745 $self{_lock_chn} = 1 if $INC{'MCE/Child.pm'} || $INC{'MCE/Hobo.pm'};
510              
511 248 50       1164 if ($MCE->{_wid} == 0) {
512 248         894 $MCE = \%self;
513 248 50       994 weaken $MCE if (defined wantarray);
514             }
515              
516 248         1789 return \%self;
517             }
518              
519             ###############################################################################
520             ## ----------------------------------------------------------------------------
521             ## Spawn method.
522             ##
523             ###############################################################################
524              
525             sub spawn {
526 140 50   140 0 291 my $self = shift; $self = $MCE unless ref($self);
  140         450  
527              
528 140         283 local $_; @_ = ();
  140         418  
529              
530             _croak('MCE::spawn: method is not allowed by the worker process')
531 140 50       622 if ($self->{_wid});
532              
533             ## Return if workers have already been spawned or if module instance.
534 140 50 33     1077 return $self if ($self->{_spawned} || exists $self->{_module_instance});
535              
536 140 50       673 lock $_WIN_LOCK if $_is_MSWin32; # Obtain locks
537 140 50 33     868 lock $_MCE_LOCK if $_has_threads && $_is_winenv;
538              
539 140 0 33     383 $MCE::_GMUTEX->lock() if ($_tid && $MCE::_GMUTEX);
540 140 50       466 sleep 0.015 if $_tid;
541              
542 140         679 _sendto_fhs_close();
543              
544 140 50       608 if ($INC{'PDL.pm'}) { local $@;
  0         0  
545             # PDL::IO::Storable is required for serializing piddles.
546 0 0       0 eval 'use PDL::IO::Storable' unless $INC{'PDL/IO/Storable.pm'};
547             # PDL data should not be naively copied in new threads.
548 0         0 eval 'no warnings; sub PDL::CLONE_SKIP { 1 }';
549             # Disable PDL auto-threading.
550 0         0 eval q{ PDL::set_autopthread_targ(1) };
551             }
552 140 50 33     608 if ( $INC{'LWP/UserAgent.pm'} && !$INC{'Net/HTTP.pm'} ) {
553 0         0 local $@; eval 'require Net::HTTP; require Net::HTTPS';
  0         0  
554             }
555              
556             ## Start the shared-manager process if not running.
557 140 50       918 MCE::Shared->start() if $INC{'MCE/Shared.pm'};
558              
559             ## Load input module.
560 140 50       711 if (defined $self->{sequence}) {
    100          
561             require MCE::Core::Input::Sequence
562 0 0       0 unless $INC{'MCE/Core/Input/Sequence.pm'};
563             }
564             elsif (defined $self->{input_data}) {
565 54         147 my $_ref = ref $self->{input_data};
566 54 50       295 if ($_ref =~ /^(?:ARRAY|HASH|GLOB|FileHandle|IO::)/) {
    0          
567             require MCE::Core::Input::Request
568 54 100       26726 unless $INC{'MCE/Core/Input/Request.pm'};
569             }
570             elsif ($_ref =~ /^(?:CODE|Iterator::)/) {
571             require MCE::Core::Input::Iterator
572 0 0       0 unless $INC{'MCE/Core/Input/Iterator.pm'};
573             }
574             else {
575             require MCE::Core::Input::Handle
576 0 0       0 unless $INC{'MCE/Core/Input/Handle.pm'};
577             }
578             }
579              
580 140         792 my $_die_handler = $SIG{__DIE__};
581 140         306 my $_warn_handler = $SIG{__WARN__};
582              
583 140         1758 $SIG{__DIE__} = \&MCE::Signal::_die_handler;
584 140         1042 $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
585              
586 140 50 33     847 if (!defined $TOP_HDLR || (!$TOP_HDLR->{_mgr_live} && !$TOP_HDLR->{_wid})) {
    0          
587             ## On Windows, must shutdown the last idle MCE session.
588 140 0 33     486 if ($_is_MSWin32 && defined $TOP_HDLR && $TOP_HDLR->{_spawned}) {
      33        
589 0         0 $TOP_HDLR->shutdown(1);
590             }
591 140         393 weaken($TOP_HDLR = $self);
592             }
593             elsif (refaddr($self) != refaddr($TOP_HDLR)) {
594             ## Reduce the maximum number of channels for nested sessions.
595 0 0       0 $self->{_data_channels} = 4 if ($self->{_data_channels} > 4);
596 0 0       0 $self->{_lock_chn} = 1 if ($self->{_init_total_workers} > 4);
597              
598             ## On Windows, instruct the manager process to enable win32 IPC.
599 0 0 0     0 if ($_is_MSWin32 && $ENV{'PERL_MCE_IPC'} ne 'win32') {
600 0         0 $ENV{'PERL_MCE_IPC'} = 'win32'; local $\ = undef;
  0         0  
601 0         0 my $_DAT_W_SOCK = $TOP_HDLR->{_dat_w_sock}->[0];
602 0         0 print {$_DAT_W_SOCK} OUTPUT_S_IPC.$LF . '0'.$LF;
  0         0  
603              
604 0         0 MCE::Util::_sock_ready($_DAT_W_SOCK, -1);
605 0         0 MCE::Util::_sysread($_DAT_W_SOCK, my($_buf), 1);
606             }
607             }
608              
609             ## -------------------------------------------------------------------------
610              
611 140         583 my $_data_channels = $self->{_data_channels};
612 140         1011 my $_max_workers = _get_max_workers($self);
613 140         296 my $_use_threads = $self->{use_threads};
614              
615             ## Create [ 0 including 1 up to 8 ] locks for data channels (max 9).
616 140         1668 $self->{'_mutex_0'} = MCE::Mutex->new( impl => 'Channel' );
617              
618 140 50       678 if ($self->{_lock_chn}) {
619             $self->{'_mutex_'.$_} = MCE::Mutex->new( impl => 'Channel' )
620 0         0 for (1 .. $_data_channels);
621             }
622              
623             ## Create two locks for use by MCE::Core::Input::{ Handle or Sequence }.
624 140         1119 $self->{'_mutex_'.$_} = MCE::Mutex->new( impl => 'Channel' ) for (10 .. 11);
625              
626             ## Create sockets for IPC. sync, comm, input, data
627 140         860 MCE::Util::_sock_pair($self, qw(_bsb_r_sock _bsb_w_sock), undef, 1);
628 140         490 MCE::Util::_sock_pair($self, qw(_com_r_sock _com_w_sock), undef, 1);
629 140         556 MCE::Util::_sock_pair($self, qw(_que_r_sock _que_w_sock), undef, 1);
630 140         508 MCE::Util::_sock_pair($self, qw(_dat_r_sock _dat_w_sock), 0);
631              
632             MCE::Util::_sock_pair($self, qw(_dat_r_sock _dat_w_sock), $_, 1)
633 140         1595 for (1 .. $_data_channels);
634              
635 140 50       1560 if ($^O !~ /linux|android|aix/) {
636 0         0 setsockopt($self->{_dat_r_sock}->[0], SOL_SOCKET, SO_RCVBUF, pack('i', 4096));
637             }
638              
639 140 100       626 if (defined $self->{init_relay}) { # relay
640 24 100       80 unless ($INC{'MCE/Relay.pm'}) {
641 9         7713 require MCE::Relay; MCE::Relay->import();
  9         63  
642             }
643             MCE::Util::_sock_pair($self, qw(_rla_r_sock _rla_w_sock), $_, 1)
644 24         148 for (0 .. $_max_workers - 1);
645             }
646              
647 140         4487 $self->{_seed} = int(CORE::rand() * 1e9);
648              
649             ## -------------------------------------------------------------------------
650              
651             ## Spawn workers.
652 140         765 $self->{_pids} = [], $self->{_thrs} = [], $self->{_tids} = [];
653 140         703 $self->{_status} = [], $self->{_state} = [], $self->{_task} = [];
654              
655 140 50 33     685 if ($self->{loop_timeout} && !$_is_MSWin32) {
656 0         0 $self->{_pids_t} = {}, $self->{_pids_w} = {};
657             }
658              
659 140 50       4438 local $SIG{TTIN}, local $SIG{TTOU}, local $SIG{WINCH} unless $_is_MSWin32;
660              
661 140 100       605 if (!defined $self->{user_tasks}) {
662 44         111 $self->{_total_workers} = $_max_workers;
663              
664 44 50 33     316 if (defined $_use_threads && $_use_threads == 1) {
665 0         0 _dispatch_thread($self, $_) for (1 .. $_max_workers);
666             } else {
667 44         480 _dispatch_child($self, $_) for (1 .. $_max_workers);
668             }
669              
670 21         2226 $self->{_task}->[0] = { _total_workers => $_max_workers };
671              
672 21         673 for my $_i (1 .. $_max_workers) {
673 65         1997 $self->{_state}->[$_i] = {
674             _task => undef, _task_id => undef, _task_wid => undef,
675             _params => undef, _chn => $_i % $_data_channels + 1
676             }
677             }
678             }
679             else {
680 96         220 my ($_task_id, $_wid);
681              
682 96         401 $self->{_total_workers} = 0;
683 96         179 $self->{_total_workers} += $_->{max_workers} for @{ $self->{user_tasks} };
  96         520  
684              
685             # Must spawn processes first for extra stability on BSD/Darwin.
686 96         217 $_task_id = $_wid = 0;
687              
688 96         174 for my $_task (@{ $self->{user_tasks} }) {
  96         385  
689 119         496 my $_tsk_use_threads = $_task->{use_threads};
690              
691 119 50 33     1434 if (defined $_tsk_use_threads && $_tsk_use_threads == 1) {
692 0         0 $_wid += $_task->{max_workers};
693             } else {
694             _dispatch_child($self, ++$_wid, $_task, $_task_id, $_)
695 119         1820 for (1 .. $_task->{max_workers});
696             }
697              
698 69         6660 $_task_id++;
699             }
700              
701             # Then, spawn threads last.
702 46         1180 $_task_id = $_wid = 0;
703              
704 46         605 for my $_task (@{ $self->{user_tasks} }) {
  46         780  
705 57         1094 my $_tsk_use_threads = $_task->{use_threads};
706              
707 57 50 33     3283 if (defined $_tsk_use_threads && $_tsk_use_threads == 1) {
708             _dispatch_thread($self, ++$_wid, $_task, $_task_id, $_)
709 0         0 for (1 .. $_task->{max_workers});
710             } else {
711 57         497 $_wid += $_task->{max_workers};
712             }
713              
714 57         556 $_task_id++;
715             }
716              
717             # Save state.
718 46         340 $_task_id = $_wid = 0;
719              
720 46         443 for my $_task (@{ $self->{user_tasks} }) {
  46         563  
721             $self->{_task}->[$_task_id] = {
722             _total_running => 0, _total_workers => $_task->{max_workers}
723 57         3809 };
724 57         1026 for my $_i (1 .. $_task->{max_workers}) {
725 105         596 $_wid += 1;
726 105         4309 $self->{_state}->[$_wid] = {
727             _task => $_task, _task_id => $_task_id, _task_wid => $_i,
728             _params => undef, _chn => $_wid % $_data_channels + 1
729             }
730             }
731              
732 57         322 $_task_id++;
733             }
734             }
735              
736             ## -------------------------------------------------------------------------
737              
738 67         1135 $self->{_send_cnt} = 0, $self->{_spawned} = 1;
739              
740 67         1908 $SIG{__DIE__} = $_die_handler;
741 67         1163 $SIG{__WARN__} = $_warn_handler;
742              
743 67 50 33     4945 $MCE = $self if ($MCE->{_wid} == 0 && refaddr($MCE) != refaddr($self));
744              
745 67 0 33     844 $MCE::_GMUTEX->unlock() if ($_tid && $MCE::_GMUTEX);
746              
747 67         14500 return $self;
748             }
749              
750             ###############################################################################
751             ## ----------------------------------------------------------------------------
752             ## Process method, relay stubs, and AUTOLOAD for methods not used often.
753             ##
754             ###############################################################################
755              
756             sub process {
757 106 50   106 0 3880 my $self = shift; $self = $MCE unless ref($self);
  106         320  
758              
759 106         635 _validate_runstate($self, 'MCE::process');
760              
761 106         274 my ($_params_ref, $_input_data);
762              
763 106 100 100     1074 if (ref $_[0] eq 'HASH' && ref $_[1] eq 'HASH') {
    100          
764 6         16 $_params_ref = $_[0], $_input_data = $_[1];
765             } elsif (ref $_[0] eq 'HASH') {
766 95         327 $_params_ref = $_[0], $_input_data = $_[1];
767             } else {
768 5         9 $_params_ref = $_[1], $_input_data = $_[0];
769             }
770              
771 106         206 @_ = ();
772              
773             ## Set input data.
774 106 50 0     270 if (defined $_input_data) {
    0          
775 106         269 $_params_ref->{input_data} = $_input_data;
776             }
777             elsif ( !defined $_params_ref->{input_data} &&
778             !defined $_params_ref->{sequence} ) {
779 0         0 _croak('MCE::process: (input_data or sequence) is not specified');
780             }
781              
782             ## Pass 0 to "not" auto-shutdown after processing.
783 106         527 $self->run(0, $_params_ref);
784              
785 79         415 return $self;
786             }
787              
788             sub relay (;&) {
789             _croak('MCE::relay: (init_relay) is not defined')
790 0 0   0 0   unless (defined $MCE->{init_relay});
791             }
792              
793             {
794 94     94   1277 no warnings 'once';
  94         270  
  94         642724  
795             *relay_unlock = \&relay;
796             }
797              
798             sub AUTOLOAD {
799             # $AUTOLOAD = MCE::<method_name>
800              
801 0     0   0 my $_fcn = substr($MCE::AUTOLOAD, 5);
802 0 0       0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
803              
804             # "for" sugar methods
805              
806 0 0       0 if ($_fcn eq 'forchunk') {
    0          
    0          
807 0 0       0 require MCE::Candy unless $INC{'MCE/Candy.pm'};
808 0         0 return MCE::Candy::forchunk($self, @_);
809             }
810             elsif ($_fcn eq 'foreach') {
811 0 0       0 require MCE::Candy unless $INC{'MCE/Candy.pm'};
812 0         0 return MCE::Candy::foreach($self, @_);
813             }
814             elsif ($_fcn eq 'forseq') {
815 0 0       0 require MCE::Candy unless $INC{'MCE/Candy.pm'};
816 0         0 return MCE::Candy::forseq($self, @_);
817             }
818              
819             # relay stubs for MCE::Relay
820              
821 0 0 0     0 if ($_fcn eq 'relay_lock' || $_fcn eq 'relay_recv') {
    0          
822             _croak('MCE::relay: (init_relay) is not defined')
823 0 0       0 unless (defined $MCE->{init_relay});
824             }
825             elsif ($_fcn eq 'relay_final') {
826 0         0 return;
827             }
828              
829             # worker immediately exits the chunking loop
830              
831 0 0       0 if ($_fcn eq 'last') {
    0          
    0          
    0          
832             _croak('MCE::last: method is not allowed by the manager process')
833 0 0       0 unless ($self->{_wid});
834              
835 0 0       0 $self->{_last_jmp}() if (defined $self->{_last_jmp});
836              
837 0         0 return;
838             }
839              
840             # worker starts the next iteration of the chunking loop
841              
842             elsif ($_fcn eq 'next') {
843             _croak('MCE::next: method is not allowed by the manager process')
844 0 0       0 unless ($self->{_wid});
845              
846 0 0       0 $self->{_next_jmp}() if (defined $self->{_next_jmp});
847              
848 0         0 return;
849             }
850              
851             # return the process ID, include thread ID for threads
852              
853             elsif ($_fcn eq 'pid') {
854 0 0 0     0 if (defined $self->{_pid}) {
    0          
855 0         0 return $self->{_pid};
856             } elsif ($_has_threads && $self->{use_threads}) {
857 0         0 return $$ .'.'. threads->tid();
858             }
859 0         0 return $$;
860             }
861              
862             # return the exit status
863             # _wrk_status holds the greatest exit status among workers exiting
864              
865             elsif ($_fcn eq 'status') {
866             _croak('MCE::status: method is not allowed by the worker process')
867 0 0       0 if ($self->{_wid});
868              
869 0 0       0 return (defined $self->{_wrk_status}) ? $self->{_wrk_status} : 0;
870             }
871              
872 0         0 _croak("Can't locate object method \"$_fcn\" via package \"MCE\"");
873             }
874              
875             ###############################################################################
876             ## ----------------------------------------------------------------------------
877             ## Restart worker method.
878             ##
879             ###############################################################################
880              
881             sub restart_worker {
882 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
883              
884 0         0 @_ = ();
885              
886             _croak('MCE::restart_worker: method is not allowed by the worker process')
887 0 0       0 if ($self->{_wid});
888              
889 0         0 my $_wid = $self->{_exited_wid};
890              
891 0         0 my $_params = $self->{_state}->[$_wid]->{_params};
892 0         0 my $_task_wid = $self->{_state}->[$_wid]->{_task_wid};
893 0         0 my $_task_id = $self->{_state}->[$_wid]->{_task_id};
894 0         0 my $_task = $self->{_state}->[$_wid]->{_task};
895 0         0 my $_chn = $self->{_state}->[$_wid]->{_chn};
896              
897 0         0 $_params->{_chn} = $_chn;
898              
899             my $_use_threads = (defined $_task_id)
900 0 0       0 ? $_task->{use_threads} : $self->{use_threads};
901              
902 0 0       0 $self->{_task}->[$_task_id]->{_total_running} += 1 if (defined $_task_id);
903 0 0       0 $self->{_task}->[$_task_id]->{_total_workers} += 1 if (defined $_task_id);
904              
905 0         0 $self->{_total_running} += 1;
906 0         0 $self->{_total_workers} += 1;
907              
908 0 0 0     0 if (defined $_use_threads && $_use_threads == 1) {
909 0         0 _dispatch_thread($self, $_wid, $_task, $_task_id, $_task_wid, $_params);
910             } else {
911 0         0 _dispatch_child($self, $_wid, $_task, $_task_id, $_task_wid, $_params);
912             }
913              
914 0         0 delete $self->{_retry_cnt};
915              
916 0 0 0     0 if (defined $self->{spawn_delay} && $self->{spawn_delay} > 0.0) {
    0 0        
917 0         0 sleep $self->{spawn_delay};
918             } elsif ($_tid || $_is_MSWin32) {
919 0         0 sleep 0.045;
920             }
921              
922 0         0 return;
923             }
924              
925             ###############################################################################
926             ## ----------------------------------------------------------------------------
927             ## Run method.
928             ##
929             ###############################################################################
930              
931             sub run {
932 221 50   221 0 688 my $self = shift; $self = $MCE unless ref($self);
  221         809  
933              
934             _croak('MCE::run: method is not allowed by the worker process')
935 221 50       831 if ($self->{_wid});
936              
937 221         440 my ($_auto_shutdown, $_params_ref);
938              
939 221 100       838 if (ref $_[0] eq 'HASH') {
940 86 50       295 $_auto_shutdown = (defined $_[1]) ? $_[1] : 1;
941 86         159 $_params_ref = $_[0];
942             } else {
943 135 100       552 $_auto_shutdown = (defined $_[0]) ? $_[0] : 1;
944 135         345 $_params_ref = $_[1];
945             }
946              
947 221         460 @_ = ();
948              
949 221 100       1326 my $_has_user_tasks = (defined $self->{user_tasks}) ? 1 : 0;
950 221         441 my $_requires_shutdown = 0;
951              
952             ## Unset params if workers have already been sent user_data via send.
953             ## Set user_func to NOOP if not specified.
954              
955 221 50       1503 $_params_ref = undef if ($self->{_send_cnt});
956              
957 221 50 66     1828 if (!defined $self->{user_func} && !defined $_params_ref->{user_func}) {
958 96         670 $self->{user_func} = \&MCE::Signal::_NOOP;
959             }
960              
961             ## Set user specified params if specified.
962             ## Shutdown workers if determined by _sync_params or if processing a
963             ## scalar reference. Workers need to be restarted in order to pick up
964             ## on the new code or scalar reference.
965              
966 221 100 66     1692 if (defined $_params_ref && ref $_params_ref eq 'HASH') {
967 192         1578 $_requires_shutdown = _sync_params($self, $_params_ref);
968 192         995 _validate_args($self);
969             }
970 221 100       902 if ($_has_user_tasks) {
971             $self->{input_data} = $self->{user_tasks}->[0]->{input_data}
972 159 50       604 if ($self->{user_tasks}->[0]->{input_data});
973             $self->{use_slurpio} = $self->{user_tasks}->[0]->{use_slurpio}
974 159 50       647 if ($self->{user_tasks}->[0]->{use_slurpio});
975             $self->{parallel_io} = $self->{user_tasks}->[0]->{parallel_io}
976 159 50       708 if ($self->{user_tasks}->[0]->{parallel_io});
977             $self->{RS} = $self->{user_tasks}->[0]->{RS}
978 159 50       617 if ($self->{user_tasks}->[0]->{RS});
979             }
980 221 50       954 if (ref $self->{input_data} eq 'SCALAR') {
981 0 0       0 if (refaddr($self->{input_data}) != $self->{_last_sref}) {
982 0         0 $_requires_shutdown = 1;
983             }
984 0         0 $self->{_last_sref} = refaddr($self->{input_data});
985             }
986              
987 221 50       555 $self->shutdown() if ($_requires_shutdown);
988              
989             ## -------------------------------------------------------------------------
990              
991 221         513 $self->{_wrk_status} = 0;
992              
993             ## Spawn workers.
994 221 100       1358 $self->spawn() unless ($self->{_spawned});
995 148 50       1544 return $self unless ($self->{_total_workers});
996              
997 148         1697 local $SIG{__DIE__} = \&MCE::Signal::_die_handler;
998 148         894 local $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
999              
1000 148 50 33     12273 $MCE = $self if ($MCE->{_wid} == 0 && refaddr($MCE) != refaddr($self));
1001              
1002 148         1052 my ($_input_data, $_input_file, $_input_glob, $_seq);
1003 148         0 my ($_abort_msg, $_first_msg, $_run_mode, $_single_dim);
1004 148         825 my $_chunk_size = $self->{chunk_size};
1005              
1006             $_seq = ($_has_user_tasks && $self->{user_tasks}->[0]->{sequence})
1007             ? $self->{user_tasks}->[0]->{sequence}
1008 148 50 66     2188 : $self->{sequence};
1009              
1010             ## Determine run mode for workers.
1011 148 100       1599 if (defined $_seq) {
    100          
1012             my ($_begin, $_end, $_step) = (ref $_seq eq 'ARRAY')
1013 18 50       75 ? @{ $_seq } : ($_seq->{begin}, $_seq->{end}, $_seq->{step});
  18         83  
1014              
1015             $_chunk_size = $self->{user_tasks}->[0]->{chunk_size}
1016 18 50 66     264 if ($_has_user_tasks && $self->{user_tasks}->[0]->{chunk_size});
1017              
1018 18         41 $_run_mode = 'sequence';
1019 18         90 $_abort_msg = int(($_end - $_begin) / $_step / $_chunk_size); # + 1;
1020              
1021             # Previously + 1 above. Below, support for large numbers, 1e16 and beyond.
1022             # E.g. sequence => [ 1, 1e16 ], chunk_size => 1e11
1023             #
1024             # Perl: int((1e15 - 1) / 1 / 1e11) = 9999
1025             # Perl: int((1e16 - 1) / 1 / 1e11) = 100000 wrong, due to precision limit
1026             # Calc: int((1e16 - 1) / 1 / 1e11) = 99999
1027              
1028 18 50       42 if ( $_step > 0 ) {
1029 18 50       135 $_abort_msg++
1030             if ($_abort_msg * $_chunk_size * abs($_step) + $_begin <= $_end);
1031             } else {
1032 0 0       0 $_abort_msg++
1033             if ($_abort_msg * $_chunk_size * abs($_step) + $_end <= $_begin);
1034             }
1035              
1036 18         37 $_first_msg = 0;
1037             }
1038             elsif (defined $self->{input_data}) {
1039 84         564 my $_ref = ref $self->{input_data};
1040              
1041 84 100       917 if ($_ref eq '') { # File mode
    100          
    100          
    50          
    0          
    0          
1042 18         452 $_run_mode = 'file';
1043 18         46 $_input_file = $self->{input_data};
1044 18         40 $_input_data = $_input_glob = undef;
1045 18         391 $_abort_msg = (-s $_input_file) + 1;
1046 18         49 $_first_msg = 0; ## Begin at offset position
1047              
1048 18 50       188 if ((-s $_input_file) == 0) {
1049 0 0       0 $self->shutdown() if ($_auto_shutdown == 1);
1050 0         0 return $self;
1051             }
1052             }
1053             elsif ($_ref eq 'ARRAY') { # Array mode
1054 45         403 $_run_mode = 'array';
1055 45         316 $_input_data = $self->{input_data};
1056 45         232 $_input_file = $_input_glob = undef;
1057 45 50       257 $_single_dim = 1 if (ref $_input_data->[0] eq '');
1058 45         184 $_abort_msg = 0; ## Flag: Has Data: No
1059 45         247 $_first_msg = 1; ## Flag: Has Data: Yes
1060              
1061 45 50       123 if (@{ $_input_data } == 0) {
  45         396  
1062 0 0       0 $self->shutdown() if ($_auto_shutdown == 1);
1063 0         0 return $self;
1064             }
1065             }
1066             elsif ($_ref eq 'HASH') { # Hash mode
1067 3         101 $_run_mode = 'hash';
1068 3         106 $_input_data = $self->{input_data};
1069 3         28 $_input_file = $_input_glob = undef;
1070 3         58 $_abort_msg = 0; ## Flag: Has Data: No
1071 3         118 $_first_msg = 1; ## Flag: Has Data: Yes
1072              
1073 3 50       42 if (scalar( keys %{ $_input_data } ) == 0) {
  3         114  
1074 0 0       0 $self->shutdown() if ($_auto_shutdown == 1);
1075 0         0 return $self;
1076             }
1077             }
1078             elsif ($_ref =~ /^(?:GLOB|FileHandle|IO::)/) { # Glob mode
1079 18         224 $_run_mode = 'glob';
1080 18         52 $_input_glob = $self->{input_data};
1081 18         48 $_input_data = $_input_file = undef;
1082 18         38 $_abort_msg = 0; ## Flag: Has Data: No
1083 18         957 $_first_msg = 1; ## Flag: Has Data: Yes
1084             }
1085             elsif ($_ref =~ /^(?:CODE|Iterator::)/) { # Iterator mode
1086 0         0 $_run_mode = 'iterator';
1087 0         0 $_input_data = $self->{input_data};
1088 0         0 $_input_file = $_input_glob = undef;
1089 0         0 $_abort_msg = 0; ## Flag: Has Data: No
1090 0         0 $_first_msg = 1; ## Flag: Has Data: Yes
1091             }
1092             elsif ($_ref eq 'SCALAR') { # Memory mode
1093 0         0 $_run_mode = 'memory';
1094 0         0 $_input_data = $_input_file = $_input_glob = undef;
1095 0         0 $_abort_msg = length(${ $self->{input_data} }) + 1;
  0         0  
1096 0         0 $_first_msg = 0; ## Begin at offset position
1097              
1098 0 0       0 if (length(${ $self->{input_data} }) == 0) {
  0         0  
1099 0 0       0 return $self->shutdown() if ($_auto_shutdown == 1);
1100             }
1101             }
1102             else {
1103 0         0 _croak('MCE::run: (input_data) is not valid');
1104             }
1105             }
1106             else { # Nodata mode
1107 46         1023 $_abort_msg = undef, $_run_mode = 'nodata';
1108             }
1109              
1110             ## -------------------------------------------------------------------------
1111              
1112 148         1154 my $_total_workers = $self->{_total_workers};
1113 148         790 my $_send_cnt = $self->{_send_cnt};
1114              
1115 148 50       562 if ($_send_cnt) {
1116 0         0 $self->{_total_running} = $_send_cnt;
1117 0         0 $self->{_task}->[0]->{_total_running} = $_send_cnt;
1118             }
1119             else {
1120 148         1546 $self->{_total_running} = $_total_workers;
1121              
1122 148         416 my ($_frozen_nodata, $_wid, %_params_nodata, %_task0_wids);
1123 148         589 my $_COM_R_SOCK = $self->{_com_r_sock};
1124 148         514 my $_submit_delay = $self->{submit_delay};
1125              
1126             my %_params = (
1127             '_abort_msg' => $_abort_msg, '_chunk_size' => $_chunk_size,
1128             '_input_file' => $_input_file, '_run_mode' => $_run_mode,
1129             '_bounds_only' => $self->{bounds_only},
1130             '_max_retries' => $self->{max_retries},
1131             '_parallel_io' => $self->{parallel_io},
1132             '_progress' => $self->{progress} ? 1 : 0,
1133             '_sequence' => $self->{sequence},
1134             '_user_args' => $self->{user_args},
1135             '_use_slurpio' => $self->{use_slurpio},
1136             '_RS' => $self->{RS}
1137 148 50       8064 );
1138              
1139 148         6873 my $_frozen_params = $self->{freeze}(\%_params);
1140 148         930 $_frozen_params = length($_frozen_params).$LF . $_frozen_params;
1141              
1142 148 100       744 if ($_has_user_tasks) {
1143 109         2394 %_params_nodata = ( %_params,
1144             '_abort_msg' => undef, '_run_mode' => 'nodata'
1145             );
1146 109         1993 $_frozen_nodata = $self->{freeze}(\%_params_nodata);
1147 109         475 $_frozen_nodata = length($_frozen_nodata).$LF . $_frozen_nodata;
1148              
1149 109         291 for my $_t (@{ $self->{_task} }) {
  109         1036  
1150 181         581 $_t->{_total_running} = $_t->{_total_workers};
1151             }
1152 109         218 for my $_i (1 .. @{ $self->{_state} } - 1) {
  109         481  
1153 351 100       6320 $_task0_wids{$_i} = undef unless ($self->{_state}[$_i]{_task_id});
1154             }
1155             }
1156              
1157 148         2036 local $\ = undef; local $/ = $LF;
  148         2825  
1158              
1159             ## Insert the first message into the queue if defined.
1160 148 100       730 if (defined $_first_msg) {
1161 102         2483 syswrite($self->{_que_w_sock}, pack($_que_template, 0, $_first_msg));
1162             }
1163              
1164             ## Submit params data to workers.
1165 148         812 for my $_i (1 .. $_total_workers) {
1166 452         674 print({$_COM_R_SOCK} $_i.$LF), chomp($_wid = <$_COM_R_SOCK>);
  452         305011  
1167              
1168 452 100 100     5327 if (!$_has_user_tasks || exists $_task0_wids{$_wid}) {
1169 306         591 print({$_COM_R_SOCK} $_frozen_params), <$_COM_R_SOCK>;
  306         137903  
1170 306         3243 $self->{_state}[$_wid]{_params} = \%_params;
1171             } else {
1172 146         253 print({$_COM_R_SOCK} $_frozen_nodata), <$_COM_R_SOCK>;
  146         27620  
1173 146         1146 $self->{_state}[$_wid]{_params} = \%_params_nodata;
1174             }
1175              
1176 452 50 33     3510 sleep $_submit_delay
1177             if defined($_submit_delay) && $_submit_delay > 0.0;
1178             }
1179             }
1180              
1181             ## -------------------------------------------------------------------------
1182              
1183 148         860 $self->{_total_exited} = 0;
1184              
1185             ## Call the output function.
1186 148 50       724 if ($self->{_total_running} > 0) {
1187 148         1301 $self->{_mgr_live} = 1;
1188 148         389 $self->{_abort_msg} = $_abort_msg;
1189 148         1074 $self->{_single_dim} = $_single_dim;
1190              
1191 148 50       540 lock $self->{_run_lock} if $_is_MSWin32;
1192              
1193 148 50       603 if (!$_send_cnt) {
1194             ## Notify workers to commence processing.
1195 148 50       507 if ($_is_MSWin32) {
1196 0         0 my $_buf = _sprintf("%${_total_workers}s", "");
1197 0         0 syswrite($self->{_bsb_r_sock}, $_buf);
1198             } else {
1199 148         454 my $_BSB_R_SOCK = $self->{_bsb_r_sock};
1200 148         558 for my $_i (1 .. $_total_workers) {
1201 452         27531 syswrite($_BSB_R_SOCK, $LF);
1202             }
1203             }
1204             }
1205              
1206 148         5596 _output_loop( $self, $_input_data, $_input_glob,
1207             \%_plugin_function, \@_plugin_loop_begin, \@_plugin_loop_end
1208             );
1209              
1210 148         965 $self->{_mgr_live} = $self->{_abort_msg} = $self->{_single_dim} = undef;
1211             }
1212              
1213             ## Remove the last message from the queue.
1214 148 100 66     2254 if (!$_send_cnt && $_run_mode ne 'nodata') {
1215             MCE::Util::_sysread($self->{_que_r_sock}, my($_buf), $_que_read_size)
1216 102 50       1197 if ( defined $self->{_que_r_sock} );
1217             }
1218              
1219 148         438 $self->{_send_cnt} = 0;
1220              
1221             ## Shutdown workers.
1222 148 100 66     2671 if ($_auto_shutdown || $self->{_total_exited}) {
    50 33        
    50          
1223 15         326 $self->shutdown();
1224             }
1225             elsif ($INC{'MCE/Simple.pm'}) {
1226 0         0 $self->shutdown();
1227             }
1228             elsif ($^S || $ENV{'PERL_IPERL_RUNNING'}) {
1229 0 0 0     0 if (
      0        
      0        
      0        
      0        
1230             !$INC{'Mojo/IOLoop.pm'} && !$INC{'Win32/GUI.pm'} &&
1231             !$INC{'Gearman/XS.pm'} && !$INC{'Gearman/Util.pm'} &&
1232             !$INC{'Tk.pm'} && !$INC{'Wx.pm'}
1233             ) {
1234             # running inside eval or IPerl, check stack trace
1235 0         0 my $_t = Carp::longmess(); $_t =~ s/\teval [^\n]+\n$//;
  0         0  
1236              
1237 0 0 0     0 if ( $_t =~ /^(?:[^\n]+\n){1,7}\teval / ||
      0        
      0        
      0        
1238             $_t =~ /\n\teval [^\n]+\n\t(?:eval|Try)/ ||
1239             $_t =~ /\n\tMCE::_dispatch\(\) [^\n]+ thread \d+\n$/ ||
1240             ( $_tid && !$self->{use_threads} ) )
1241             {
1242 0         0 $self->shutdown();
1243             }
1244             }
1245             }
1246              
1247 148         2341 return $self;
1248             }
1249              
1250             ###############################################################################
1251             ## ----------------------------------------------------------------------------
1252             ## Send method.
1253             ##
1254             ###############################################################################
1255              
1256             sub send {
1257 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1258              
1259             _croak('MCE::send: method is not allowed by the worker process')
1260 0 0       0 if ($self->{_wid});
1261             _croak('MCE::send: method is not allowed while running')
1262 0 0       0 if ($self->{_total_running});
1263              
1264             _croak('MCE::send: method cannot be used with input_data or sequence')
1265 0 0 0     0 if (defined $self->{input_data} || defined $self->{sequence});
1266             _croak('MCE::send: method cannot be used with user_tasks')
1267 0 0       0 if (defined $self->{user_tasks});
1268              
1269 0         0 my $_data_ref;
1270              
1271 0 0 0     0 if (ref $_[0] eq 'ARRAY' || ref $_[0] eq 'HASH' || ref $_[0] eq 'PDL') {
      0        
1272 0         0 $_data_ref = $_[0];
1273             } else {
1274 0         0 _croak('MCE::send: ARRAY, HASH, or a PDL reference is not specified');
1275             }
1276              
1277 0         0 @_ = ();
1278              
1279 0 0       0 $self->{_send_cnt} = 0 unless (defined $self->{_send_cnt});
1280              
1281             ## -------------------------------------------------------------------------
1282              
1283             ## Spawn workers.
1284 0 0       0 $self->spawn() unless ($self->{_spawned});
1285              
1286             _croak('MCE::send: Sending greater than # of workers is not allowed')
1287 0 0       0 if ($self->{_send_cnt} >= $self->{_task}->[0]->{_total_workers});
1288              
1289 0         0 local $SIG{__DIE__} = \&MCE::Signal::_die_handler;
1290 0         0 local $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
1291              
1292             ## Begin data submission.
1293 0         0 local $\ = undef; local $/ = $LF;
  0         0  
1294              
1295 0         0 my $_COM_R_SOCK = $self->{_com_r_sock};
1296 0         0 my $_submit_delay = $self->{submit_delay};
1297 0         0 my $_frozen_data = $self->{freeze}($_data_ref);
1298 0         0 my $_len = length $_frozen_data;
1299              
1300             ## Submit data to worker.
1301 0         0 print({$_COM_R_SOCK} '_data'.$LF), <$_COM_R_SOCK>;
  0         0  
1302 0         0 print({$_COM_R_SOCK} $_len.$LF, $_frozen_data), <$_COM_R_SOCK>;
  0         0  
1303              
1304 0         0 $self->{_send_cnt} += 1;
1305              
1306 0 0 0     0 sleep $_submit_delay
1307             if defined($_submit_delay) && $_submit_delay > 0.0;
1308              
1309 0         0 return $self;
1310             }
1311              
1312             ###############################################################################
1313             ## ----------------------------------------------------------------------------
1314             ## Shutdown method.
1315             ##
1316             ###############################################################################
1317              
1318             sub shutdown {
1319 67 50   67 0 912 my $self = shift; $self = $MCE unless ref($self);
  67         282  
1320 67   50     778 my $_no_lock = shift || 0;
1321              
1322 67         179 @_ = ();
1323              
1324             ## Return unless spawned or already shutdown.
1325 67 50       267 return unless $self->{_spawned};
1326              
1327             ## Return if signaled.
1328 67 50       245 if ($MCE::Signal::KILLED) {
1329 0 0       0 if (defined $self->{_sess_dir}) {
1330 0         0 my $_sess_dir = delete $self->{_sess_dir};
1331 0 0       0 rmdir $_sess_dir if -d $_sess_dir;
1332             }
1333 0         0 return;
1334             }
1335              
1336 67         1513 _validate_runstate($self, 'MCE::shutdown');
1337              
1338             ## Complete processing before shutting down.
1339 67 50       216 $self->run(0) if ($self->{_send_cnt});
1340              
1341 67         482 local $SIG{__DIE__} = \&MCE::Signal::_die_handler;
1342 67         367 local $SIG{__WARN__} = \&MCE::Signal::_warn_handler;
1343              
1344 67         529 my $_COM_R_SOCK = $self->{_com_r_sock};
1345 67         181 my $_data_channels = $self->{_data_channels};
1346 67         197 my $_total_workers = $self->{_total_workers};
1347 67         327 my $_sess_dir = $self->{_sess_dir};
1348              
1349 67 50 33     1661 if (defined $TOP_HDLR && refaddr($self) == refaddr($TOP_HDLR)) {
1350 67         387 $TOP_HDLR = undef;
1351             }
1352              
1353             ## -------------------------------------------------------------------------
1354              
1355 67 0 33     347 lock $_MCE_LOCK if ($_has_threads && $_is_winenv && !$_no_lock);
      33        
1356              
1357             ## Notify workers to exit loop.
1358 67         482 local ($!, $?, $_); local $\ = undef; local $/ = $LF;
  67         226  
  67         449  
1359              
1360 67         285 for (1 .. $_total_workers) {
1361 170         496 print({$_COM_R_SOCK} '_exit'.$LF), <$_COM_R_SOCK>;
  170         315707  
1362             }
1363              
1364             ## Reap children and/or threads.
1365 67 50       300 if (@{ $self->{_pids} } > 0) {
  67         593  
1366 67         184 my $_list = $self->{_pids};
1367 67         168 for my $i (0 .. @{ $_list }) {
  67         1357  
1368 237 100       280582127 waitpid($_list->[$i], 0) if $_list->[$i];
1369             }
1370             }
1371 67 50       386 if (@{ $self->{_thrs} } > 0) {
  67         785  
1372 0         0 my $_list = $self->{_thrs};
1373 0         0 for my $i (0 .. @{ $_list }) {
  0         0  
1374 0 0       0 $_list->[$i]->join() if $_list->[$i];
1375             }
1376             }
1377              
1378             ## Close sockets.
1379 67         252 $_COM_R_SOCK = undef;
1380              
1381 67         1683 MCE::Util::_destroy_socks($self, qw(
1382             _bsb_w_sock _bsb_r_sock _com_w_sock _com_r_sock _que_w_sock _que_r_sock
1383             _dat_w_sock _dat_r_sock _rla_w_sock _rla_r_sock
1384             ));
1385              
1386             ## -------------------------------------------------------------------------
1387              
1388             ## Destroy mutexes.
1389 67         274 for my $_i (0 .. $_data_channels) { delete $self->{'_mutex_'.$_i}; }
  237         2325  
1390 67         234 for my $_j (10 .. 11) { delete $self->{'_mutex_'.$_j}; } # input mutexes
  134         827  
1391              
1392             ## Remove session directory.
1393 67 50 33     495 rmdir $_sess_dir if (defined $_sess_dir && -d $_sess_dir);
1394              
1395             ## Reset instance.
1396 67         790 undef @{$self->{_pids}}; undef @{$self->{_thrs}}; undef @{$self->{_tids}};
  67         282  
  67         129  
  67         166  
  67         196  
  67         182  
1397 67         149 undef @{$self->{_state}}; undef @{$self->{_status}}; undef @{$self->{_task}};
  67         952  
  67         651  
  67         190  
  67         132  
  67         323  
1398              
1399 67         352 $self->{_chunk_id} = $self->{_send_cnt} = $self->{_spawned} = 0;
1400 67         179 $self->{_total_running} = $self->{_total_exited} = 0;
1401 67         151 $self->{_total_workers} = 0;
1402 67         445 $self->{_sess_dir} = undef;
1403              
1404 67 50       298 if ($self->{loop_timeout}) {
1405 0         0 delete $self->{_pids_t};
1406 0         0 delete $self->{_pids_w};
1407             }
1408              
1409 67         1135 return;
1410             }
1411              
1412             ###############################################################################
1413             ## ----------------------------------------------------------------------------
1414             ## Barrier sync and yield methods.
1415             ##
1416             ###############################################################################
1417              
1418             sub sync {
1419 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1420              
1421 0 0       0 return unless ($self->{_wid});
1422              
1423             ## Barrier synchronization is supported for task 0 at this time.
1424             ## Note: Workers are assigned task_id 0 when omitting user_tasks.
1425              
1426 0 0       0 return if ($self->{_task_id} > 0);
1427              
1428 0         0 my $_chn = $self->{_chn};
1429 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1430 0         0 my $_BSB_R_SOCK = $self->{_bsb_r_sock};
1431 0         0 my $_BSB_W_SOCK = $self->{_bsb_w_sock};
1432 0         0 my $_buf;
1433              
1434 0 0       0 local $\ = undef if (defined $\);
1435              
1436             ## Notify the manager process (barrier begin).
1437 0         0 print {$_DAT_W_SOCK} OUTPUT_B_SYN.$LF . $_chn.$LF;
  0         0  
1438              
1439             ## Wait until all workers from (task_id 0) have synced.
1440 0 0       0 MCE::Util::_sock_ready($_BSB_R_SOCK, -1) if $_is_MSWin32;
1441 0         0 MCE::Util::_sysread($_BSB_R_SOCK, $_buf, 1);
1442              
1443             ## Notify the manager process (barrier end).
1444 0         0 print {$_DAT_W_SOCK} OUTPUT_E_SYN.$LF . $_chn.$LF;
  0         0  
1445              
1446             ## Wait until all workers from (task_id 0) have un-synced.
1447 0 0       0 MCE::Util::_sock_ready($_BSB_W_SOCK, -1) if $_is_MSWin32;
1448 0         0 MCE::Util::_sysread($_BSB_W_SOCK, $_buf, 1);
1449              
1450 0         0 return;
1451             }
1452              
1453             sub yield {
1454 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1455              
1456 0 0       0 return unless ($self->{_wid});
1457              
1458 0         0 my $_chn = $self->{_chn};
1459 0         0 my $_DAT_LOCK = $self->{_dat_lock};
1460 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1461 0         0 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
1462 0         0 my $_lock_chn = $self->{_lock_chn};
1463 0         0 my $_delay;
1464              
1465 0 0       0 local $\ = undef if (defined $\);
1466 0 0 0     0 local $/ = $LF if (!$/ || $/ ne $LF);
1467              
1468 0 0       0 $_DAT_LOCK->lock() if $_lock_chn;
1469 0         0 print({$_DAT_W_SOCK} OUTPUT_I_DLY.$LF . $_chn.$LF),
1470 0         0 print({$_DAU_W_SOCK} $self->{_task_id}.$LF);
  0         0  
1471 0         0 chomp($_delay = <$_DAU_W_SOCK>);
1472 0 0       0 $_DAT_LOCK->unlock() if $_lock_chn;
1473              
1474 0         0 MCE::Util::_sleep( $_delay );
1475             }
1476              
1477             ###############################################################################
1478             ## ----------------------------------------------------------------------------
1479             ## Miscellaneous methods: abort exit sess_dir tmp_dir.
1480             ##
1481             ###############################################################################
1482              
1483             ## Abort current job.
1484              
1485             sub abort {
1486 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1487              
1488 0         0 my $_QUE_R_SOCK = $self->{_que_r_sock};
1489 0         0 my $_QUE_W_SOCK = $self->{_que_w_sock};
1490 0         0 my $_abort_msg = $self->{_abort_msg};
1491              
1492 0 0       0 if (defined $_abort_msg) {
1493 0         0 local $\ = undef;
1494              
1495 0 0       0 if ($_abort_msg > 0) {
1496 0         0 MCE::Util::_sysread($_QUE_R_SOCK, my($_next), $_que_read_size);
1497 0         0 syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_abort_msg));
1498             }
1499              
1500 0 0       0 if ($self->{_wid} > 0) {
1501 0         0 my $_chn = $self->{_chn};
1502 0         0 my $_DAT_LOCK = $self->{_dat_lock};
1503 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1504 0         0 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
1505 0         0 my $_lock_chn = $self->{_lock_chn};
1506              
1507 0 0       0 $_DAT_LOCK->lock() if $_lock_chn;
1508 0         0 print {$_DAT_W_SOCK} OUTPUT_W_ABT.$LF . $_chn.$LF;
  0         0  
1509 0 0       0 $_DAT_LOCK->unlock() if $_lock_chn;
1510             }
1511             }
1512              
1513 0         0 return;
1514             }
1515              
1516             ## Worker exits from MCE.
1517              
1518             sub exit {
1519 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1520              
1521 0 0       0 my $_exit_status = (defined $_[0]) ? $_[0] : $?;
1522 0 0       0 my $_exit_msg = (defined $_[1]) ? $_[1] : '';
1523 0 0       0 my $_exit_id = (defined $_[2]) ? $_[2] : $self->chunk_id;
1524              
1525 0         0 @_ = ();
1526              
1527             _croak('MCE::exit: method is not allowed by the manager process')
1528 0 0       0 unless ($self->{_wid});
1529              
1530 0         0 my $_chn = $self->{_chn};
1531 0         0 my $_DAT_LOCK = $self->{_dat_lock};
1532 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1533 0         0 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
1534 0         0 my $_lock_chn = $self->{_lock_chn};
1535 0         0 my $_task_id = $self->{_task_id};
1536              
1537 0 0       0 unless ( $self->{_exiting} ) {
1538 0         0 $self->{_exiting} = 1;
1539              
1540 0 0       0 my $_pid = $self->{_is_thread} ? $$ .'.'. threads->tid() : $$;
1541 0         0 my $_max_retries = $self->{max_retries};
1542 0         0 my $_chunk_id = $self->{_chunk_id};
1543              
1544 0 0 0     0 if ( defined $self->{init_relay} && !$self->{_relayed} && !$_task_id &&
      0        
      0        
      0        
1545             exists $self->{_wuf} && $self->{_pid} eq $_pid ) {
1546              
1547 0 0       0 $self->{_retry_cnt} = -1 unless defined( $self->{_retry_cnt} );
1548              
1549 0 0 0     0 if ( !$_max_retries || ++$self->{_retry_cnt} == $_max_retries ) {
1550 0 0   0   0 MCE::relay { warn "Error: chunk $_chunk_id failed\n" if $_chunk_id };
  0         0  
1551             }
1552             }
1553              
1554             ## Check for nested workers not yet joined.
1555 0 0       0 MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
1556              
1557             MCE::Hobo->finish('MCE')
1558 0 0 0     0 if ( $INC{'MCE/Hobo.pm'} && MCE::Hobo->can('_clear') );
1559              
1560 0 0       0 local $\ = undef if (defined $\);
1561 0         0 my $_len = length $_exit_msg;
1562              
1563 0         0 $_exit_id =~ s/[\r\n][\r\n]*/ /mg;
1564 0 0       0 $_DAT_LOCK->lock() if $_lock_chn;
1565              
1566 0 0 0     0 if ($self->{_retry} && $self->{_retry}->[2]--) {
1567 0         0 $_exit_status = 0; my $_buf = $self->{freeze}($self->{_retry});
  0         0  
1568 0         0 print({$_DAT_W_SOCK} OUTPUT_W_EXT.$LF . $_chn.$LF),
1569 0         0 print({$_DAU_W_SOCK}
1570 0         0 $_task_id.$LF . $self->{_wid}.$LF . $self->{_exit_pid}.$LF .
1571             $_exit_status.$LF . $_exit_id.$LF . $_len.$LF . $_exit_msg .
1572             length($_buf).$LF, $_buf
1573             );
1574             }
1575             else {
1576 0         0 print({$_DAT_W_SOCK} OUTPUT_W_EXT.$LF . $_chn.$LF),
1577 0         0 print({$_DAU_W_SOCK}
1578 0         0 $_task_id.$LF . $self->{_wid}.$LF . $self->{_exit_pid}.$LF .
1579             $_exit_status.$LF . $_exit_id.$LF . $_len.$LF . $_exit_msg .
1580             '0'.$LF
1581             );
1582             }
1583              
1584 0 0       0 $_DAT_LOCK->unlock() if $_lock_chn;
1585             }
1586              
1587 0         0 _exit($self);
1588             }
1589              
1590             ## Return the session dir, made on demand.
1591              
1592             sub sess_dir {
1593 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1594 0 0       0 return $self->{_sess_dir} if defined $self->{_sess_dir};
1595              
1596 0 0       0 if ($self->{_wid} == 0) {
1597 0 0       0 $self->{_sess_dir} = $self->{_spawned} ? _make_sessdir($self) : undef;
1598             }
1599             else {
1600 0         0 my $_chn = $self->{_chn};
1601 0         0 my $_DAT_LOCK = $self->{_dat_lock};
1602 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1603 0         0 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
1604 0         0 my $_lock_chn = $self->{_lock_chn};
1605 0         0 my $_sess_dir;
1606              
1607 0 0       0 local $\ = undef if (defined $\);
1608 0 0 0     0 local $/ = $LF if (!$/ || $/ ne $LF);
1609              
1610 0 0       0 $_DAT_LOCK->lock() if $_lock_chn;
1611 0         0 print({$_DAT_W_SOCK} OUTPUT_S_DIR.$LF . $_chn.$LF);
  0         0  
1612 0         0 chomp($_sess_dir = <$_DAU_W_SOCK>);
1613 0 0       0 $_DAT_LOCK->unlock() if $_lock_chn;
1614              
1615 0         0 $self->{_sess_dir} = $_sess_dir;
1616             }
1617             }
1618              
1619             ## Return the temp dir, made on demand.
1620              
1621             sub tmp_dir {
1622 31 50   31 0 7748821 my $self = shift; $self = $MCE unless ref($self);
  31         191  
1623 31 50       220 return $self->{tmp_dir} if defined $self->{tmp_dir};
1624              
1625 31 50       141 if ($self->{_wid} == 0) {
1626 31         196 $self->{tmp_dir} = MCE::Signal::_make_tmpdir();
1627             }
1628             else {
1629 0         0 my $_chn = $self->{_chn};
1630 0         0 my $_DAT_LOCK = $self->{_dat_lock};
1631 0         0 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
1632 0         0 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
1633 0         0 my $_lock_chn = $self->{_lock_chn};
1634 0         0 my $_tmp_dir;
1635              
1636 0 0       0 local $\ = undef if (defined $\);
1637 0 0 0     0 local $/ = $LF if (!$/ || $/ ne $LF);
1638              
1639 0 0       0 $_DAT_LOCK->lock() if $_lock_chn;
1640 0         0 print({$_DAT_W_SOCK} OUTPUT_T_DIR.$LF . $_chn.$LF);
  0         0  
1641 0         0 chomp($_tmp_dir = <$_DAU_W_SOCK>);
1642 0 0       0 $_DAT_LOCK->unlock() if $_lock_chn;
1643              
1644 0         0 $self->{tmp_dir} = $_tmp_dir;
1645             }
1646             }
1647              
1648             ###############################################################################
1649             ## ----------------------------------------------------------------------------
1650             ## Methods for serializing data from workers to the main process.
1651             ##
1652             ###############################################################################
1653              
1654             ## Do method. Additional arguments are optional.
1655              
1656             sub do {
1657 133 100   133 0 1201 my $self = shift; $self = $MCE unless ref($self);
  133         439  
1658 133 50       619 my $_pkg = caller() eq 'MCE' ? caller(1) : caller();
1659              
1660 133 50       342 _croak('MCE::do: (code ref) is not supported')
1661             if (ref $_[0] eq 'CODE');
1662 133 50       485 _croak('MCE::do: (callback) is not specified')
1663             unless (defined ( my $_func = shift ));
1664              
1665 133 50       459 $_func = $_pkg.'::'.$_func if (index($_func, ':') < 0);
1666              
1667 133 50       475 if ($self->{_wid}) {
1668 133         1134 return _do_callback($self, $_func, [ @_ ]);
1669             }
1670             else {
1671 94     94   1404 no strict 'refs';
  94         249  
  94         381360  
1672 0         0 return $_func->(@_);
1673             }
1674             }
1675              
1676             ## Gather method.
1677              
1678             sub gather {
1679 365 50   365 0 6527 my $self = shift; $self = $MCE unless ref($self);
  365         1359  
1680              
1681             _croak('MCE::gather: method is not allowed by the manager process')
1682 365 50       1154 unless ($self->{_wid});
1683              
1684 365         2422 return _do_gather($self, [ @_ ]);
1685             }
1686              
1687             ## Sendto method.
1688              
1689             {
1690             my %_sendto_lkup = (
1691             'file' => SENDTO_FILEV1, 'stderr' => SENDTO_STDERR,
1692             'file:' => SENDTO_FILEV2, 'stdout' => SENDTO_STDOUT,
1693             'fd:' => SENDTO_FD,
1694             );
1695              
1696             my $_v2_regx = qr/^([^:]+:)(.+)/;
1697              
1698             sub sendto {
1699              
1700 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1701 0         0 my $_to = shift;
1702              
1703             _croak('MCE::sendto: method is not allowed by the manager process')
1704 0 0       0 unless ($self->{_wid});
1705              
1706 0 0       0 return unless (defined $_[0]);
1707              
1708             my $_dest = exists $_sendto_lkup{ lc($_to) }
1709 0 0       0 ? $_sendto_lkup{ lc($_to) } : undef;
1710 0         0 my $_value;
1711              
1712 0 0       0 if (!defined $_dest) {
1713 0         0 my $_fd;
1714              
1715 0 0 0     0 if (ref($_to) && ( defined ($_fd = fileno($_to)) ||
    0 0        
1716             defined ($_fd = eval { $_to->fileno }) )) {
1717              
1718 0 0       0 if (my $_ob = tied *{ $_to }) {
  0         0  
1719 0 0       0 if (ref $_ob eq 'IO::TieCombine::Handle') {
1720 0 0       0 $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout');
1721 0 0       0 $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr');
1722             }
1723             }
1724              
1725 0 0       0 my $_data_ref = (scalar @_ == 1) ? \(''.$_[0]) : \join('', @_);
1726 0         0 return _do_send_glob($self, $_to, $_fd, $_data_ref);
1727             }
1728             elsif (reftype($_to) eq 'GLOB') {
1729 0         0 return _croak('Cannot write to filehandle');
1730             }
1731              
1732 0 0 0     0 if (defined $_to && $_to =~ /$_v2_regx/o) {
1733             $_dest = exists $_sendto_lkup{ lc($1) }
1734 0 0       0 ? $_sendto_lkup{ lc($1) } : undef;
1735 0         0 $_value = $2;
1736             }
1737              
1738 0 0 0     0 if (!defined $_dest || ( !defined $_value && (
      0        
      0        
1739             $_dest == SENDTO_FILEV2 || $_dest == SENDTO_FD
1740             ))) {
1741 0         0 my $_msg = "\n";
1742 0         0 $_msg .= "MCE::sendto: improper use of method\n";
1743 0         0 $_msg .= "\n";
1744 0         0 $_msg .= "## usage:\n";
1745 0         0 $_msg .= "## ->sendto(\"stderr\", ...);\n";
1746 0         0 $_msg .= "## ->sendto(\"stdout\", ...);\n";
1747 0         0 $_msg .= "## ->sendto(\"file:/path/to/file\", ...);\n";
1748 0         0 $_msg .= "## ->sendto(\"fd:2\", ...);\n";
1749 0         0 $_msg .= "\n";
1750              
1751 0         0 _croak($_msg);
1752             }
1753             }
1754              
1755 0 0       0 if ($_dest == SENDTO_FILEV1) { # sendto 'file', $a, $path
1756 0 0 0     0 return if (!defined $_[1] || @_ > 2); # Please switch to using V2
1757 0         0 $_value = $_[1]; delete $_[1]; # sendto 'file:/path', $a
  0         0  
1758 0         0 $_dest = SENDTO_FILEV2;
1759             }
1760              
1761 0         0 return _do_send($self, $_dest, $_value, @_);
1762             }
1763             }
1764              
1765             ###############################################################################
1766             ## ----------------------------------------------------------------------------
1767             ## Functions for serializing print, printf and say statements.
1768             ##
1769             ###############################################################################
1770              
1771             sub print {
1772 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1773 0         0 my ($_fd, $_glob, $_data);
1774              
1775 0 0 0     0 if (ref($_[0]) && ( defined ($_fd = fileno($_[0])) ||
    0 0        
1776             defined ($_fd = eval { $_[0]->fileno }) )) {
1777              
1778 0 0       0 if (my $_ob = tied *{ $_[0] }) {
  0         0  
1779 0 0       0 if (ref $_ob eq 'IO::TieCombine::Handle') {
1780 0 0       0 $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout');
1781 0 0       0 $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr');
1782             }
1783             }
1784              
1785 0         0 $_glob = shift;
1786             }
1787             elsif (reftype($_[0]) eq 'GLOB') {
1788 0         0 return _croak('Cannot write to filehandle');
1789             }
1790              
1791 0 0       0 $_data = join('', scalar @_ ? @_ : $_);
1792              
1793 0 0       0 return _do_send_glob($self, $_glob, $_fd, \$_data) if $_fd;
1794 0 0       0 return _do_send($self, SENDTO_STDOUT, undef, \$_data) if $self->{_wid};
1795 0         0 return _do_send_glob($self, \*STDOUT, 1, \$_data);
1796             }
1797              
1798             sub printf {
1799 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1800 0         0 my ($_fd, $_glob, $_fmt, $_data);
1801              
1802 0 0 0     0 if (ref($_[0]) && ( defined ($_fd = fileno($_[0])) ||
    0 0        
1803             defined ($_fd = eval { $_[0]->fileno }) )) {
1804              
1805 0 0       0 if (my $_ob = tied *{ $_[0] }) {
  0         0  
1806 0 0       0 if (ref $_ob eq 'IO::TieCombine::Handle') {
1807 0 0       0 $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout');
1808 0 0       0 $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr');
1809             }
1810             }
1811              
1812 0         0 $_glob = shift;
1813             }
1814             elsif (reftype($_[0]) eq 'GLOB') {
1815 0         0 return _croak('Cannot write to filehandle');
1816             }
1817              
1818 0   0     0 $_fmt = shift || '%s';
1819 0 0       0 $_data = _sprintf($_fmt, scalar @_ ? @_ : $_);
1820              
1821 0 0       0 return _do_send_glob($self, $_glob, $_fd, \$_data) if $_fd;
1822 0 0       0 return _do_send($self, SENDTO_STDOUT, undef, \$_data) if $self->{_wid};
1823 0         0 return _do_send_glob($self, \*STDOUT, 1, \$_data);
1824             }
1825              
1826             sub say {
1827 0 0   0 0 0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1828 0         0 my ($_fd, $_glob, $_data);
1829              
1830 0 0 0     0 if (ref($_[0]) && ( defined ($_fd = fileno($_[0])) ||
    0 0        
1831             defined ($_fd = eval { $_[0]->fileno }) )) {
1832              
1833 0 0       0 if (my $_ob = tied *{ $_[0] }) {
  0         0  
1834 0 0       0 if (ref $_ob eq 'IO::TieCombine::Handle') {
1835 0 0       0 $_fd = 1 if (lc($_ob->{slot_name}) eq 'stdout');
1836 0 0       0 $_fd = 2 if (lc($_ob->{slot_name}) eq 'stderr');
1837             }
1838             }
1839              
1840 0         0 $_glob = shift;
1841             }
1842             elsif (reftype($_[0]) eq 'GLOB') {
1843 0         0 return _croak('Cannot write to filehandle');
1844             }
1845              
1846 0 0       0 $_data = join('', scalar @_ ? @_ : $_) . "\n";
1847              
1848 0 0       0 return _do_send_glob($self, $_glob, $_fd, \$_data) if $_fd;
1849 0 0       0 return _do_send($self, SENDTO_STDOUT, undef, \$_data) if $self->{_wid};
1850 0         0 return _do_send_glob($self, \*STDOUT, 1, \$_data);
1851             }
1852              
1853             ###############################################################################
1854             ## ----------------------------------------------------------------------------
1855             ## Private methods.
1856             ##
1857             ###############################################################################
1858              
1859             sub _exit {
1860 73     73   555 my $self = shift;
1861 73 50 33     2620 my $_has_guard = (exists $self->{_guard} && $self->{_guard}->[0]) ? 1 : 0;
1862              
1863 73 50       462 @{ $self->{_guard} } = () if $_has_guard;
  0         0  
1864 73         235 delete $self->{_wuf}; _end();
  73         1948  
1865              
1866             ## Exit thread/child process.
1867 73 50   0   2667 $SIG{__DIE__} = sub {} unless $_tid;
1868 73     0   716 $SIG{__WARN__} = sub {};
1869              
1870 73 50       530 threads->exit(0) if $self->{use_threads};
1871              
1872 73 50       275 if (! $_tid) {
1873             $SIG{HUP} = $SIG{INT} = $SIG{QUIT} = $SIG{TERM} = sub {
1874 0     0   0 $SIG{$_[0]} = $SIG{INT} = $SIG{TERM} = sub {};
1875              
1876 0 0 0     0 CORE::kill($_[0], getppid())
      0        
1877             if (($_[0] eq 'INT' || $_[0] eq 'TERM') && $^O ne 'MSWin32');
1878              
1879 0         0 CORE::kill('KILL', $$);
1880 73         7456 };
1881             }
1882              
1883 73 0 33     628 if ($self->{posix_exit} && !$_has_guard && !$_is_MSWin32) {
      33        
1884 0         0 eval { MCE::Mutex::Channel::_destroy() };
  0         0  
1885 0 0       0 POSIX::_exit(0) if $INC{'POSIX.pm'};
1886 0         0 CORE::kill('KILL', $$);
1887             }
1888              
1889 73         43093 CORE::exit(0);
1890             }
1891              
1892             sub _get_max_workers {
1893 140 50   140   389 my $self = shift; $self = $MCE unless ref($self);
  140         426  
1894              
1895 140 100       720 if (defined $self->{user_tasks}) {
1896 96 50       390 if (defined $self->{user_tasks}->[0]->{max_workers}) {
1897 96         408 return $self->{user_tasks}->[0]->{max_workers};
1898             }
1899             }
1900              
1901 44         112 return $self->{max_workers};
1902             }
1903              
1904             sub _make_sessdir {
1905 0 0   0   0 my $self = shift; $self = $MCE unless ref($self);
  0         0  
1906              
1907 0         0 my $_sess_dir = $self->{_sess_dir};
1908              
1909 0 0       0 unless (defined $_sess_dir) {
1910             $self->{tmp_dir} = MCE::Signal::_make_tmpdir()
1911 0 0       0 unless defined $self->{tmp_dir};
1912              
1913 0 0       0 my $_mce_tid = $INC{'threads.pm'} ? threads->tid() : '';
1914 0 0       0 $_mce_tid = '' unless defined $self->{_mce_tid};
1915              
1916 0         0 my $_mce_sid = $$ .'.'. $_mce_tid .'.'. (++$_mce_count);
1917 0         0 my $_tmp_dir = $self->{tmp_dir};
1918              
1919 0 0 0     0 _croak("MCE::sess_dir: (tmp_dir) is not defined")
1920             if (!defined $_tmp_dir || $_tmp_dir eq '');
1921 0 0       0 _croak("MCE::sess_dir: ($_tmp_dir) is not a directory or does not exist")
1922             unless (-d $_tmp_dir);
1923 0 0       0 _croak("MCE::sess_dir: ($_tmp_dir) is not writeable")
1924             unless (-w $_tmp_dir);
1925              
1926 0         0 my $_cnt = 0; $_sess_dir = "$_tmp_dir/$_mce_sid";
  0         0  
1927              
1928 0         0 $_sess_dir = "$_tmp_dir/$_mce_sid." . (++$_cnt)
1929             while ( !(mkdir $_sess_dir, 0770) );
1930             }
1931              
1932 0         0 return $_sess_dir;
1933             }
1934              
1935             sub _sprintf {
1936 0     0   0 my $_fmt = shift;
1937             # remove tainted'ness
1938 0         0 ($_fmt) = $_fmt =~ /(.*)/s;
1939              
1940 0         0 return sprintf("$_fmt", @_);
1941             }
1942              
1943             sub _sync_buffer_to_array {
1944 5     5   23 my ($_buffer_ref, $_array_ref, $_chop_str) = @_;
1945              
1946 5         319 local $_; my $_cnt = 0;
  5         17  
1947              
1948 5         148 open my $_MEM_FH, '<', $_buffer_ref;
1949 5         31 binmode $_MEM_FH, ':raw';
1950              
1951 5 50       18 unless (length $_chop_str) {
1952 5         508 $_array_ref->[$_cnt++] = $_ while (<$_MEM_FH>);
1953             }
1954             else {
1955 0         0 $_array_ref->[$_cnt++] = <$_MEM_FH>;
1956 0         0 while (<$_MEM_FH>) {
1957 0         0 $_array_ref->[$_cnt ] = $_chop_str;
1958 0         0 $_array_ref->[$_cnt++] .= $_;
1959             }
1960             }
1961              
1962 5         23 close $_MEM_FH;
1963 5         32 weaken $_MEM_FH;
1964              
1965 5         22 return;
1966             }
1967              
1968             sub _sync_params {
1969 192     192   507 my ($self, $_params_ref) = @_;
1970 192         501 my $_requires_shutdown = 0;
1971              
1972 192 50 33     1158 if (defined $_params_ref->{init_relay} && !defined $self->{init_relay}) {
1973 0         0 $_requires_shutdown = 1;
1974             }
1975 192         685 for my $_p (qw( user_begin user_func user_end )) {
1976 576 50       1576 if (defined $_params_ref->{$_p}) {
1977 0         0 $self->{$_p} = delete $_params_ref->{$_p};
1978 0         0 $_requires_shutdown = 1;
1979             }
1980             }
1981 192         446 for my $_p (keys %{ $_params_ref }) {
  192         590  
1982             _croak("MCE::_sync_params: ($_p) is not a valid params argument")
1983 316 50       958 unless (exists $_params_allowed_args{$_p});
1984              
1985 316         805 $self->{$_p} = $_params_ref->{$_p};
1986             }
1987              
1988 192 100       859 return ($self->{_spawned}) ? $_requires_shutdown : 0;
1989             }
1990              
1991             ###############################################################################
1992             ## ----------------------------------------------------------------------------
1993             ## Dispatch methods.
1994             ##
1995             ###############################################################################
1996              
1997             sub _dispatch {
1998 73     73   3924 my @_args = @_; my $_is_thread = shift @_args;
  73         3539  
1999 73         4646 my $self = $MCE = $_args[0];
2000              
2001             ## To avoid (Scalars leaked: N) messages; fixed in Perl 5.12.x
2002 73         2045 @_ = ();
2003              
2004             $ENV{'PERL_MCE_IPC'} = 'win32' if ( $_is_MSWin32 && (
2005             defined($self->{max_retries}) ||
2006             $INC{'MCE/Child.pm'} ||
2007 73 0 0     2869 $INC{'MCE/Hobo.pm'}
      33        
2008             ));
2009              
2010 73         1802 delete $self->{_relayed};
2011              
2012 73         4923 $self->{_is_thread} = $_is_thread;
2013 73 50       9961 $self->{_pid} = $_is_thread ? $$ .'.'. threads->tid() : $$;
2014              
2015 73 50       2999 if (!$self->{use_threads}) {
2016 73 50       2628 MCE::Child->_clear() if $INC{'MCE/Child.pm'};
2017 73 50       1979 MCE::Hobo->_clear() if $INC{'MCE/Hobo.pm'};
2018             }
2019              
2020             # Set the seed of the base generator uniquely between workers.
2021             # The new seed is computed using the current seed and ID value.
2022             # One may set the seed at the application level for predictable
2023             # results (non-thread workers only). Ditto for Math::Prime::Util,
2024             # Math::Random, Math::Random::MT::Auto, and PDL.
2025             #
2026             # MCE 1.892, 2024-06-08: Enable predictability running threads.
2027             # Output matches non-threads for CORE, Math::Prime::Util, and
2028             # Math::Random::MT::Auto. https://perlmonks.org/?node_id=11159834
2029              
2030             {
2031 73         1292 my $_wid = $_args[1];
  73         923  
2032 73         2176 my $_seed = abs($self->{_seed} - ($_wid * 100000)) % 2147483560;
2033              
2034 73 50 33     4726 CORE::srand($_seed) if (!$self->{use_threads} || $] ge '5.020000'); # drand48
2035 73 50       1576 Math::Prime::Util::srand($_seed) if $INC{'Math/Prime/Util.pm'};
2036              
2037             # [etj] identified a race condition in PDL running threads
2038             # https://perlmonks.org/?node_id=11159841
2039 73 50       1503 if (!$self->{use_threads}) {
2040 73 50 33     1849 PDL::srand($_seed) if $INC{'PDL.pm'} && PDL->can('srand'); # PDL 2.062 ~ 2.089
2041 73 50 33     2737 PDL::srandom($_seed) if $INC{'PDL.pm'} && PDL->can('srandom'); # PDL 2.089_01+
2042             }
2043             }
2044              
2045 73 50 33     3419 if (!$self->{use_threads} && $INC{'Math/Random.pm'}) {
2046 0         0 my ($_wid, $_cur_seed) = ($_args[1], Math::Random::random_get_seed());
2047              
2048 0 0       0 my $_new_seed = ($_cur_seed < 1073741781)
2049             ? $_cur_seed + (($_wid * 100000) % 1073741780)
2050             : $_cur_seed - (($_wid * 100000) % 1073741780);
2051              
2052 0         0 Math::Random::random_set_seed($_new_seed, $_new_seed);
2053             }
2054              
2055 73 50       1523 if ($INC{'Math/Random/MT/Auto.pm'}) {
2056 0         0 my ($_wid, $_cur_seed) = (
2057             $_args[1], Math::Random::MT::Auto::get_seed()->[0]
2058             );
2059 0 0       0 my $_new_seed = ($_cur_seed < 1073741781)
2060             ? $_cur_seed + (($_wid * 100000) % 1073741780)
2061             : $_cur_seed - (($_wid * 100000) % 1073741780);
2062              
2063 0         0 Math::Random::MT::Auto::set_seed($_new_seed);
2064             }
2065              
2066             ## Run.
2067              
2068 73         9288 _worker_main(@_args, \@_plugin_worker_init);
2069              
2070 73         2000 _exit($self);
2071             }
2072              
2073             sub _dispatch_thread {
2074 0     0   0 my ($self, $_wid, $_task, $_task_id, $_task_wid, $_params) = @_;
2075              
2076 0         0 @_ = (); local $_;
  0         0  
2077              
2078 0         0 my $_thr = threads->create( \&_dispatch,
2079             1, $self, $_wid, $_task, $_task_id, $_task_wid, $_params
2080             );
2081              
2082 0 0       0 _croak("MCE::_dispatch_thread: Failed to spawn worker $_wid: $!")
2083             if (!defined $_thr);
2084              
2085             ## Store into an available slot (restart), otherwise append to arrays.
2086 0 0       0 if (defined $_params) { for my $_i (0 .. @{ $self->{_tids} } - 1) {
  0         0  
  0         0  
2087 0 0       0 unless (defined $self->{_tids}->[$_i]) {
2088 0         0 $self->{_thrs}->[$_i] = $_thr;
2089 0         0 $self->{_tids}->[$_i] = $_thr->tid();
2090 0         0 return;
2091             }
2092             }}
2093              
2094 0         0 push @{ $self->{_thrs} }, $_thr;
  0         0  
2095 0         0 push @{ $self->{_tids} }, $_thr->tid();
  0         0  
2096              
2097             sleep $self->{spawn_delay}
2098 0 0 0     0 if defined($self->{spawn_delay}) && $self->{spawn_delay} > 0.0;
2099              
2100 0         0 return;
2101             }
2102              
2103             sub _dispatch_child {
2104 321     321   1150 my ($self, $_wid, $_task, $_task_id, $_task_wid, $_params) = @_;
2105              
2106 321         644 @_ = (); local $_;
  321         564  
2107 321         733103 my $_pid = fork();
2108              
2109 321 50       20192 _croak("MCE::_dispatch_child: Failed to spawn worker $_wid: $!")
2110             if (!defined $_pid);
2111              
2112 321 100       13156 _dispatch(0, $self, $_wid, $_task, $_task_id, $_task_wid, $_params)
2113             if ($_pid == 0);
2114              
2115             ## Store into an available slot (restart), otherwise append to array.
2116 248 50       3905 if (defined $_params) { for my $_i (0 .. @{ $self->{_pids} } - 1) {
  0         0  
  0         0  
2117 0 0       0 unless (defined $self->{_pids}->[$_i]) {
2118 0         0 $self->{_pids}->[$_i] = $_pid;
2119 0         0 return;
2120             }
2121             }}
2122              
2123 248         9207 push @{ $self->{_pids} }, $_pid;
  248         10983  
2124              
2125 248 50 33     4501 if ($self->{loop_timeout} && !$_is_MSWin32) {
2126 0         0 $self->{_pids_t}{$_pid} = $_task_id;
2127 0         0 $self->{_pids_w}{$_pid} = $_wid;
2128             }
2129              
2130             sleep $self->{spawn_delay}
2131 248 50 33     4077 if defined($self->{spawn_delay}) && $self->{spawn_delay} > 0.0;
2132              
2133 248         18049 return;
2134             }
2135              
2136             1;
2137