File Coverage

blib/lib/MCE/Core/Manager.pm
Criterion Covered Total %
statement 214 519 41.2
branch 89 286 31.1
condition 33 116 28.4
subroutine 20 38 52.6
pod n/a
total 356 959 37.1


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Core methods for the manager process.
4             ##
5             ## This package provides the loop and relevant methods used internally by the
6             ## manager process.
7             ##
8             ## There is no public API.
9             ##
10             ###############################################################################
11              
12             package MCE::Core::Manager;
13              
14 85     85   612 use strict;
  85         177  
  85         2619  
15 85     85   417 use warnings;
  85         207  
  85         4417  
16              
17             our $VERSION = '1.889';
18              
19             ## no critic (BuiltinFunctions::ProhibitStringyEval)
20             ## no critic (TestingAndDebugging::ProhibitNoStrict)
21              
22             ## Items below are folded into MCE.
23              
24             package # hide from rpm
25             MCE;
26              
27 85     85   513 no warnings qw( threads recursion uninitialized );
  85         154  
  85         5346  
28              
29             ## The POSIX module has many symbols. Try not loading it simply
30             ## to have WNOHANG. The following covers most platforms.
31              
32             use constant {
33 85 50       45341 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
34             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
35 85     85   496 };
  85         173  
36              
37             ###############################################################################
38             ## ----------------------------------------------------------------------------
39             ## Call on task_end after task completion.
40             ##
41             ###############################################################################
42              
43             sub _task_end {
44              
45 214     214   650 my ($self, $_task_id) = @_;
46              
47 214         440 @_ = ();
48              
49 214 100       668 if (defined $self->{user_tasks}) {
    50          
50             my $_task_end = (exists $self->{user_tasks}->[$_task_id]->{task_end})
51             ? $self->{user_tasks}->[$_task_id]->{task_end}
52 180 50       652 : $self->{task_end};
53              
54 180 100       461 if (defined $_task_end) {
55             my $_task_name = (exists $self->{user_tasks}->[$_task_id]->{task_name})
56             ? $self->{user_tasks}->[$_task_id]->{task_name}
57 144 50       477 : $self->{task_name};
58              
59 144         499 $_task_end->($self, $_task_id, $_task_name);
60             }
61             }
62             elsif (defined $self->{task_end}) {
63 0         0 $self->{task_end}->($self, 0, $self->{task_name});
64             }
65              
66 214         741 return;
67             }
68              
69             ###############################################################################
70             ## ----------------------------------------------------------------------------
71             ## Process output.
72             ##
73             ## Awaits and processes events from workers. The sendto/do methods tag the
74             ## output accordingly. The hash structure below is key-driven.
75             ##
76             ###############################################################################
77              
78             my %_sendto_fhs;
79              
80             sub _sendto_fhs_close {
81 502     502   2627 for my $_p (keys %_sendto_fhs) {
82 0         0 close $_sendto_fhs{$_p};
83 0         0 delete $_sendto_fhs{$_p};
84             }
85             }
86              
87             sub _sendto_fhs_get {
88 0     0   0 my ($self, $_fd) = @_;
89              
90 0 0       0 $_sendto_fhs{$_fd} || do {
91              
92 0         0 $_sendto_fhs{$_fd} = IO::Handle->new();
93 0 0       0 $_sendto_fhs{$_fd}->fdopen($_fd, 'w')
94             or _croak "Cannot open file descriptor ($_fd): $!";
95              
96 0         0 binmode $_sendto_fhs{$_fd};
97              
98 0 0 0     0 if (!exists $self->{flush_file} || $self->{flush_file}) {
99 0         0 local $!;
100 0         0 $_sendto_fhs{$_fd}->autoflush(1)
101             }
102              
103 0         0 $_sendto_fhs{$_fd};
104             };
105             }
106              
107             sub _output_loop {
108              
109 142     142   749 my ( $self, $_input_data, $_input_glob, $_plugin_function,
110             $_plugin_loop_begin, $_plugin_loop_end ) = @_;
111              
112 142         406 @_ = ();
113              
114             my (
115 142         827 $_aborted, $_eof_flag, $_max_retries, $_syn_flag, $_win32_ipc,
116             $_cb, $_chunk_id, $_chunk_size, $_file, $_size_completed, $_wa,
117             @_is_c_ref, @_is_h_ref, @_is_q_ref, $_on_post_exit, $_on_post_run,
118             $_has_user_tasks, $_sess_dir, $_task_id, $_user_error, $_user_output,
119             $_input_size, $_offset_pos, $_single_dim, @_gather, $_cs_one_flag,
120             $_exit_id, $_exit_pid, $_exit_status, $_exit_wid, $_len, $_sync_cnt,
121             $_BSB_W_SOCK, $_BSB_R_SOCK, $_DAT_R_SOCK, $_DAU_R_SOCK, $_MCE_STDERR,
122             $_I_FLG, $_O_FLG, $_I_SEP, $_O_SEP, $_RS, $_RS_FLG, $_MCE_STDOUT,
123             @_delay_wid
124             );
125              
126             ## -------------------------------------------------------------------------
127             ## Callback return.
128              
129             my $_cb_reply = sub {
130 285 50   285   766 local $\ = $_O_SEP if ($_O_FLG);
131 285 50       542 local $/ = $_I_SEP if ($_I_FLG);
132              
133 85     85   749 no strict 'refs';
  85         163  
  85         206016  
134              
135 285 100       729 if ( $_wa == WANTS_UNDEF ) {
    100          
136 274         2179 $_cb->(@_);
137 274         185742 return;
138             }
139             elsif ( $_wa == WANTS_ARRAY ) {
140 2         50 my @_ret = $_cb->(@_);
141 2         149 my $_buf = $self->{freeze}(\@_ret);
142              
143 2         6 return print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  2         120  
144             }
145              
146 9         293 my $_ret = $_cb->(@_);
147 9         139 my $_buf = $self->{freeze}([ $_ret ]);
148              
149 9         30 return print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  9         456  
150 142         2646 };
151              
152             ## -------------------------------------------------------------------------
153             ## Create hash structure containing various output functions.
154              
155             my %_core_output_function = (
156              
157             OUTPUT_W_ABT.$LF => sub { # Worker has aborted
158 0     0   0 $_aborted = 1;
159 0         0 return;
160             },
161              
162             OUTPUT_W_DNE.$LF => sub { # Worker has completed
163 428     428   36581 chomp($_task_id = <$_DAU_R_SOCK>);
164 856         1181 $self->{_total_running} -= 1;
165              
166 856 100 66     1968 if ($_has_user_tasks && $_task_id >= 0) {
167 347         924 $self->{_task}->[$_task_id]->{_total_running} -= 1;
168             }
169              
170             my $_total_running = ($_has_user_tasks)
171             ? $self->{_task}->[$_task_id]->{_total_running}
172 856 100       1044 : $self->{_total_running};
173              
174 856 50 66     1632 if ($_task_id == 0 && defined $_syn_flag && $_sync_cnt) {
      33        
175 0 0       0 if ($_sync_cnt == $_total_running) {
176 0         0 for my $_i (1 .. $_total_running) {
177 0         0 syswrite($_BSB_W_SOCK, $LF);
178             }
179 0         0 undef $_syn_flag;
180             }
181             }
182              
183 856 100       16171 _task_end($self, $_task_id) unless $_total_running;
184              
185 856         1188 return;
186             },
187              
188             ## ----------------------------------------------------------------------
189              
190             OUTPUT_W_EXT.$LF => sub { # Worker has exited
191 0     0   0 chomp($_task_id = <$_DAU_R_SOCK>);
192              
193 0         0 $self->{_total_exited} += 1;
194 0         0 $self->{_total_running} -= 1;
195 0         0 $self->{_total_workers} -= 1;
196              
197 0 0 0     0 if ($_has_user_tasks && $_task_id >= 0) {
198 0         0 $self->{_task}->[$_task_id]->{_total_running} -= 1;
199 0         0 $self->{_task}->[$_task_id]->{_total_workers} -= 1;
200             }
201              
202             my $_total_running = ($_has_user_tasks)
203             ? $self->{_task}->[$_task_id]->{_total_running}
204 0 0       0 : $self->{_total_running};
205              
206 0 0 0     0 if ($_task_id == 0 && defined $_syn_flag && $_sync_cnt) {
      0        
207 0 0       0 if ($_sync_cnt == $_total_running) {
208 0         0 for my $_i (1 .. $_total_running) {
209 0         0 syswrite($_BSB_W_SOCK, $LF);
210             }
211 0         0 undef $_syn_flag;
212             }
213             }
214              
215 0         0 my ($_exit_msg, $_retry_buf) = ('', '');
216              
217 0         0 chomp($_exit_wid = <$_DAU_R_SOCK>),
218             chomp($_exit_pid = <$_DAU_R_SOCK>),
219             chomp($_exit_status = <$_DAU_R_SOCK>),
220             chomp($_exit_id = <$_DAU_R_SOCK>),
221             chomp($_len = <$_DAU_R_SOCK>);
222              
223 0 0       0 read($_DAU_R_SOCK, $_exit_msg, $_len) if ($_len);
224              
225 0         0 chomp($_len = <$_DAU_R_SOCK>);
226              
227 0 0       0 read($_DAU_R_SOCK, $_retry_buf, $_len) if ($_len);
228              
229 0 0       0 if (abs($_exit_status) > abs($self->{_wrk_status})) {
230 0         0 $self->{_wrk_status} = $_exit_status;
231             }
232              
233             ## Reap child/thread. Note: Win32 uses negative PIDs.
234              
235 0         0 local ($!, $?);
236              
237 0 0       0 if ($_exit_pid =~ /^PID_(-?\d+)/) {
    0          
238 0         0 my $_pid = $1; my $_list = $self->{_pids};
  0         0  
239 0         0 for my $i (0 .. @{ $_list }) {
  0         0  
240 0 0 0     0 if ($_list->[$i] && $_list->[$i] == $_pid) {
241 0         0 waitpid $_pid, 0;
242 0         0 $self->{_pids}->[$i] = undef;
243 0         0 last;
244             }
245             }
246             }
247             elsif ($_exit_pid =~ /^TID_(\d+)/) {
248 0         0 my $_tid = $1; my $_list = $self->{_tids};
  0         0  
249 0         0 for my $i (0 .. @{ $_list }) {
  0         0  
250 0 0 0     0 if ($_list->[$i] && $_list->[$i] == $_tid) {
251 0         0 eval { $self->{_thrs}->[$i]->join() };
  0         0  
252 0         0 $self->{_thrs}->[$i] = undef;
253 0         0 $self->{_tids}->[$i] = undef;
254 0         0 last;
255             }
256             }
257             }
258              
259             ## Call on_post_exit callback if defined. Otherwise, append status
260             ## information if on_post_run is defined for later retrieval.
261              
262 0 0       0 if (defined $_on_post_exit) {
    0          
263 0         0 $self->{_exited_wid} = $_exit_wid;
264              
265 0 0       0 if (length($_retry_buf)) {
266 0         0 $self->{_retry} = $self->{thaw}($_retry_buf);
267 0         0 $self->{_retry_cnt} = $_max_retries - $self->{_retry}[2] - 1;
268              
269             $_on_post_exit->($self, {
270             wid => $_exit_wid, pid => $_exit_pid, status => $_exit_status,
271             msg => $_exit_msg, id => $_exit_id
272 0         0 }, $self->{_retry_cnt});
273              
274 0         0 delete $self->{_retry};
275             }
276             else {
277 0   0     0 $_on_post_exit->($self, {
278             wid => $_exit_wid, pid => $_exit_pid, status => $_exit_status,
279             msg => $_exit_msg, id => $_exit_id
280             }, $_max_retries || 0 );
281             }
282              
283 0         0 delete $self->{_exited_wid};
284             }
285             elsif (defined $_on_post_run) {
286 0         0 push @{ $self->{_status} }, {
  0         0  
287             wid => $_exit_wid, pid => $_exit_pid, status => $_exit_status,
288             msg => $_exit_msg, id => $_exit_id
289             };
290             }
291              
292 0 0       0 _task_end($self, $_task_id) unless $_total_running;
293              
294 0         0 return;
295             },
296              
297             ## ----------------------------------------------------------------------
298              
299             OUTPUT_A_REF.$LF => sub { # Input << Array ref
300 376     376   482 my $_buf;
301              
302 376 100 66     1335 if ($_offset_pos >= $_input_size || $_aborted) {
303 80 50       357 local $\ = undef if (defined $\);
304 80         112 print {$_DAU_R_SOCK} '0'.$LF;
  80         2701  
305              
306 80         503 return;
307             }
308              
309 296 100 66     882 if ($_single_dim && $_cs_one_flag) {
310 229         1712 $_buf = $self->{freeze}( [ $_input_data->[$_offset_pos] ] );
311             }
312             else {
313 67 100       142 if ($_offset_pos + $_chunk_size - 1 < $_input_size) {
314 54         131 $_buf = $self->{freeze}( [ @{ $_input_data }[
  54         454  
315             $_offset_pos .. $_offset_pos + $_chunk_size - 1
316             ] ] );
317             }
318             else {
319 13         37 $_buf = $self->{freeze}( [ @{ $_input_data }[
  13         88  
320             $_offset_pos .. $_input_size - 1
321             ] ] );
322             }
323             }
324              
325 296 50       578 $_len = length $_buf; local $\ = undef if (defined $\);
  296         785  
326 296         390 print {$_DAU_R_SOCK} $_len.$LF . (++$_chunk_id).$LF, $_buf;
  296         10702  
327 296         1092 $_offset_pos += $_chunk_size;
328              
329 296         1084 return;
330             },
331              
332             OUTPUT_G_REF.$LF => sub { # Input << Glob ref
333 54     54   195 my $_buf = '';
334              
335             ## The logic below honors ('Ctrl/Z' in Windows, 'Ctrl/D' in Unix)
336             ## when reading from standard input. No output will be lost as
337             ## far as what was previously read into the buffer.
338              
339 54 100 66     254 if ($_eof_flag || $_aborted) {
340 36 50       130 local $\ = undef if (defined $\);
341 36         98 print {$_DAU_R_SOCK} '0'.$LF;
  36         1155  
342              
343 36         222 return;
344             }
345              
346             {
347 18 50       34 local $/ = $_RS if ($_RS_FLG);
  18         55  
348              
349 18 50       61 if ($_chunk_size <= MAX_RECS_SIZE) {
350 0 0       0 if ($_chunk_size == 1) {
351 0 0       0 $_buf .= $_input_glob->can('getline')
352             ? $_input_glob->getline : <$_input_glob>;
353 0 0       0 $_eof_flag = 1 unless (length $_buf);
354             }
355             else {
356 0         0 my $_last_len = 0;
357 0         0 for (1 .. $_chunk_size) {
358 0 0       0 $_buf .= $_input_glob->can('getline')
359             ? $_input_glob->getline : <$_input_glob>;
360 0         0 $_len = length $_buf;
361 0 0       0 if ($_len == $_last_len) {
362 0         0 $_eof_flag = 1;
363 0         0 last;
364             }
365 0         0 $_last_len = $_len;
366             }
367             }
368             }
369             else {
370 18 50 33     962 if ($_input_glob->can('getline') && $_input_glob->can('read')) {
371 18 50       125 if ($_input_glob->read($_buf, $_chunk_size) == $_chunk_size) {
372 0         0 $_buf .= $_input_glob->getline;
373 0 0       0 $_eof_flag = 1 if (length $_buf == $_chunk_size);
374             } else {
375 18         948 $_eof_flag = 1;
376             }
377             }
378             else {
379 0 0       0 if (read($_input_glob, $_buf, $_chunk_size) == $_chunk_size) {
380 0         0 $_buf .= <$_input_glob>;
381 0 0       0 $_eof_flag = 1 if (length $_buf == $_chunk_size);
382             } else {
383 0         0 $_eof_flag = 1;
384             }
385             }
386             }
387             }
388              
389 18 50       38 $_len = length $_buf; local $\ = undef if (defined $\);
  18         71  
390              
391 18 50       47 if ($_len) {
392 18         171 my $_tmp = $self->{freeze}(\$_buf);
393 18         37 print {$_DAU_R_SOCK} length($_tmp).$LF . (++$_chunk_id).$LF, $_tmp;
  18         668  
394             }
395             else {
396 0         0 print {$_DAU_R_SOCK} '0'.$LF;
  0         0  
397             }
398              
399 18         115 return;
400             },
401              
402             OUTPUT_H_REF.$LF => sub { # Input << Hash ref
403 18     18   39 my @_pairs;
404              
405 18 100 66     154 if ($_offset_pos >= $_input_size || $_aborted) {
406 3 50       17 local $\ = undef if (defined $\);
407 3         11 print {$_DAU_R_SOCK} '0'.$LF;
  3         152  
408              
409 3         32 return;
410             }
411              
412 15 100       118 if ($_offset_pos + $_chunk_size - 1 < $_input_size) {
413 12         69 for my $_i ($_offset_pos .. $_offset_pos + $_chunk_size - 1) {
414 24         53 push @_pairs, each %{ $_input_data };
  24         192  
415             }
416             }
417             else {
418 3         28 for my $_i ($_offset_pos .. $_input_size - 1) {
419 3         17 push @_pairs, each %{ $_input_data };
  3         59  
420             }
421             }
422              
423 15         176 my $_buf = $self->{freeze}(\@_pairs);
424              
425 15 50       40 $_len = length $_buf; local $\ = undef if (defined $\);
  15         63  
426 15         37 print {$_DAU_R_SOCK} $_len.$LF . (++$_chunk_id).$LF, $_buf;
  15         811  
427 15         77 $_offset_pos += $_chunk_size;
428              
429 15         83 return;
430             },
431              
432             OUTPUT_I_REF.$LF => sub { # Input << Iter ref
433 0     0   0 my $_buf;
434              
435 0 0       0 if ($_aborted) {
436 0 0       0 local $\ = undef if (defined $\);
437 0         0 print {$_DAU_R_SOCK} '-1'.$LF;
  0         0  
438              
439 0         0 return;
440             }
441              
442 0         0 my @_ret_a = $_input_data->($_chunk_size);
443              
444 0 0 0     0 if (@_ret_a > 1 || defined $_ret_a[0]) {
445 0         0 $_buf = $self->{freeze}([ @_ret_a ]);
446 0 0       0 $_len = length $_buf; local $\ = undef if (defined $\);
  0         0  
447 0         0 print {$_DAU_R_SOCK} $_len.$LF . (++$_chunk_id).$LF, $_buf;
  0         0  
448              
449 0         0 return;
450             }
451              
452 0 0       0 local $\ = undef if (defined $\);
453 0         0 print {$_DAU_R_SOCK} '-1'.$LF;
  0         0  
454 0         0 $_aborted = 1;
455              
456 0         0 return;
457             },
458              
459             ## ----------------------------------------------------------------------
460              
461             OUTPUT_A_CBK.$LF => sub { # Callback w/ args
462 283     283   4022 chomp($_wa = <$_DAU_R_SOCK>),
463             chomp($_cb = <$_DAU_R_SOCK>),
464             chomp($_len = <$_DAU_R_SOCK>);
465              
466 283         1456 read $_DAU_R_SOCK, my($_buf), $_len;
467              
468 283         3542 my $_aref = $self->{thaw}($_buf);
469 283         671 undef $_buf;
470              
471 283         464 return $_cb_reply->(@{ $_aref });
  283         781  
472             },
473              
474             OUTPUT_N_CBK.$LF => sub { # Callback w/ no args
475 2     2   47 chomp($_wa = <$_DAU_R_SOCK>),
476             chomp($_cb = <$_DAU_R_SOCK>);
477              
478 2         15 return $_cb_reply->();
479             },
480              
481             OUTPUT_A_GTR.$LF => sub { # Gather data
482 1108     1108   13315 chomp($_task_id = <$_DAU_R_SOCK>),
483             chomp($_len = <$_DAU_R_SOCK>);
484              
485 1108         4594 read $_DAU_R_SOCK, my($_buf), $_len;
486              
487 1108 100       3286 if ($_is_c_ref[$_task_id]) {
    50          
    100          
488 579         5459 local $_ = $self->{thaw}($_buf);
489 579         1040 $_gather[$_task_id]->(@{ $_ });
  579         2121  
490             }
491             elsif ($_is_h_ref[$_task_id]) {
492 0         0 local $_ = $self->{thaw}($_buf);
493 0         0 while (1) {
494 0         0 my $_key = shift @{ $_ }; my $_val = shift @{ $_ };
  0         0  
  0         0  
  0         0  
495 0         0 $_gather[$_task_id]->{$_key} = $_val;
496 0 0       0 last unless (@{ $_ });
  0         0  
497             }
498             }
499             elsif ($_is_q_ref[$_task_id]) {
500 379         546 $_gather[$_task_id]->enqueue(@{ $self->{thaw}($_buf) });
  379         4607  
501             }
502             else {
503 150         214 push @{ $_gather[$_task_id] }, @{ $self->{thaw}($_buf) };
  150         325  
  150         2261  
504             }
505              
506 1108         5141 return;
507             },
508              
509             ## ----------------------------------------------------------------------
510              
511             OUTPUT_O_SND.$LF => sub { # Send >> STDOUT
512 0     0   0 chomp($_len = <$_DAU_R_SOCK>);
513              
514 0         0 read $_DAU_R_SOCK, my($_buf), $_len;
515 0         0 $_buf = ${ $self->{thaw}($_buf) };
  0         0  
516              
517 0 0       0 if (defined $_user_output) {
518 0         0 $_user_output->($_buf);
519             }
520             else {
521 85     85   50736 use bytes;
  85         1302  
  85         583  
522 0         0 print {$_MCE_STDOUT} $_buf;
  0         0  
523             }
524              
525 0         0 return;
526             },
527              
528             OUTPUT_E_SND.$LF => sub { # Send >> STDERR
529 0     0   0 chomp($_len = <$_DAU_R_SOCK>);
530              
531 0         0 read $_DAU_R_SOCK, my($_buf), $_len;
532 0         0 $_buf = ${ $self->{thaw}($_buf) };
  0         0  
533              
534 0 0       0 if (defined $_user_error) {
535 0         0 $_user_error->($_buf);
536             }
537             else {
538 85     85   10842 use bytes;
  85         200  
  85         304  
539 0         0 print {$_MCE_STDERR} $_buf;
  0         0  
540             }
541              
542 0         0 return;
543             },
544              
545             OUTPUT_F_SND.$LF => sub { # Send >> File
546 0     0   0 my ($_buf, $_OUT_FILE);
547              
548 0         0 chomp($_len = <$_DAU_R_SOCK>);
549 0         0 read $_DAU_R_SOCK, $_buf, $_len;
550              
551 0         0 $_buf = $self->{thaw}($_buf);
552 0         0 $_file = $_buf->[0];
553              
554 0 0       0 unless (exists $_sendto_fhs{$_file}) {
555 0 0       0 open $_sendto_fhs{$_file}, ">>", "$_file"
556             or _croak "Cannot open file for writing ($_file): $!";
557              
558 0         0 binmode $_sendto_fhs{$_file};
559              
560 0 0 0     0 if (!exists $self->{flush_file} || $self->{flush_file}) {
561 0         0 local $!;
562 0         0 $_sendto_fhs{$_file}->autoflush(1);
563             }
564             }
565              
566             {
567 85     85   18244 use bytes;
  85         266  
  85         485  
  0         0  
568 0         0 $_OUT_FILE = $_sendto_fhs{$_file};
569 0         0 print {$_OUT_FILE} $_buf->[1];
  0         0  
570             }
571              
572 0         0 return;
573             },
574              
575             OUTPUT_D_SND.$LF => sub { # Send >> File descriptor
576 0     0   0 my ($_buf, $_OUT_FILE);
577              
578 0         0 chomp($_len = <$_DAU_R_SOCK>);
579 0         0 read $_DAU_R_SOCK, $_buf, $_len;
580              
581 0         0 $_buf = $self->{thaw}($_buf);
582              
583             {
584 85     85   11395 use bytes;
  85         170  
  85         370  
  0         0  
585 0         0 $_OUT_FILE = _sendto_fhs_get($self, $_buf->[0]);
586 0         0 print {$_OUT_FILE} $_buf->[1];
  0         0  
587             }
588              
589 0         0 return;
590             },
591              
592             ## ----------------------------------------------------------------------
593              
594             OUTPUT_B_SYN.$LF => sub { # Barrier sync - begin
595 0 0 0 0   0 if (!defined $_sync_cnt || $_sync_cnt == 0) {
596 0         0 $_syn_flag = 1, $_sync_cnt = 0;
597             }
598              
599             my $_total_running = ($_has_user_tasks)
600             ? $self->{_task}->[0]->{_total_running}
601 0 0       0 : $self->{_total_running};
602              
603 0 0       0 if (++$_sync_cnt == $_total_running) {
604 0         0 for my $_i (1 .. $_total_running) {
605 0         0 syswrite($_BSB_W_SOCK, $LF);
606             }
607 0         0 undef $_syn_flag;
608             }
609              
610 0         0 return;
611             },
612              
613             OUTPUT_E_SYN.$LF => sub { # Barrier sync - end
614 0 0   0   0 if (--$_sync_cnt == 0) {
615             my $_total_running = ($_has_user_tasks)
616             ? $self->{_task}->[0]->{_total_running}
617 0 0       0 : $self->{_total_running};
618              
619 0         0 for my $_i (1 .. $_total_running) {
620 0         0 syswrite($_BSB_R_SOCK, $LF);
621             }
622             }
623              
624 0         0 return;
625             },
626              
627             OUTPUT_S_IPC.$LF => sub { # Change to win32 IPC
628 0     0   0 syswrite($_DAT_R_SOCK, $LF);
629              
630 0 0       0 $_win32_ipc = 1, goto _LOOP unless $_win32_ipc;
631              
632 0         0 return;
633             },
634              
635             OUTPUT_C_NFY.$LF => sub { # Chunk ID notification
636 0     0   0 chomp($_len = <$_DAU_R_SOCK>);
637              
638 0         0 my ($_pid, $_chunk_id) = split /:/, $_len;
639 0         0 $self->{_pids_c}{$_pid} = $_chunk_id;
640              
641 0         0 return;
642             },
643              
644             OUTPUT_P_NFY.$LF => sub { # Progress notification
645 0     0   0 chomp($_len = <$_DAU_R_SOCK>);
646              
647 0         0 $self->{progress}->( $_size_completed += $_len );
648              
649 0         0 return;
650             },
651              
652             OUTPUT_S_DIR.$LF => sub { # Make/get sess_dir
653 0     0   0 print {$_DAU_R_SOCK} $self->sess_dir().$LF;
  0         0  
654              
655 0         0 return;
656             },
657              
658             OUTPUT_T_DIR.$LF => sub { # Make/get tmp_dir
659 0     0   0 print {$_DAU_R_SOCK} $self->tmp_dir().$LF;
  0         0  
660              
661 0         0 return;
662             },
663              
664             OUTPUT_I_DLY.$LF => sub { # Interval delay
665 0 0   0   0 my $_tasks = $_has_user_tasks ? $self->{user_tasks} : undef;
666              
667 0         0 chomp($_task_id = <$_DAU_R_SOCK>);
668              
669             my $_interval = ($_tasks && exists $_tasks->[$_task_id]{interval})
670             ? $_tasks->[$_task_id]{interval}
671 0 0 0     0 : $self->{interval};
672              
673 0 0       0 if (!$_interval) {
    0          
674 0         0 print {$_DAU_R_SOCK} '0'.$LF;
  0         0  
675             }
676             elsif ($_interval->{max_nodes} == 1) {
677 0         0 my $_delay = $_interval->{delay};
678 0         0 my $_lapse = $_interval->{_lapse};
679 0         0 my $_time = MCE::Util::_time();
680              
681 0 0 0     0 if (!$_delay || !defined $_lapse) {
    0          
682 0         0 $_lapse = $_time;
683             }
684             elsif ($_lapse + $_delay - $_time < 0) {
685 0         0 $_lapse += int( abs($_time - $_lapse) / $_delay + 0.5 ) * $_delay;
686             }
687              
688 0         0 $_interval->{_lapse} = ($_lapse += $_delay);
689 0         0 print {$_DAU_R_SOCK} ($_lapse - $_time).$LF
  0         0  
690             }
691             else {
692             my $_max_workers = ($_tasks)
693             ? $_tasks->[$_task_id]{max_workers}
694 0 0       0 : $self->{max_workers};
695              
696 0 0       0 if (++$_delay_wid[$_task_id] > $_max_workers) {
697 0         0 $_delay_wid[$_task_id] = 1;
698             }
699              
700 0         0 my $_nodes = $_interval->{max_nodes};
701 0         0 my $_id = $_interval->{node_id};
702 0         0 my $_delay = $_interval->{delay} * $_nodes;
703              
704 0         0 my $_app_tb = $_delay * $_max_workers;
705 0         0 my $_app_st = $_interval->{_time} + ($_delay / $_nodes * $_id);
706 0         0 my $_wrk_st = ($_delay_wid[$_task_id] - 1) * $_delay + $_app_st;
707              
708 0         0 $_delay = $_wrk_st - MCE::Util::_time();
709              
710 0 0 0     0 if ($_delay < 0.0 && $_app_tb) {
711 0         0 my $_count = int($_delay * -1 / $_app_tb + 0.5) + 1;
712 0         0 $_delay += $_app_tb * $_count;
713 0 0       0 $_interval->{_time} = MCE::Util::_time() if ($_count > 2e9);
714             }
715              
716             ($_delay > 0.0)
717 0         0 ? print {$_DAU_R_SOCK} $_delay.$LF
718 0 0       0 : print {$_DAU_R_SOCK} '0'.$LF;
  0         0  
719             }
720              
721 0         0 return;
722             },
723              
724 142         28562 );
725              
726             ## -------------------------------------------------------------------------
727              
728 142         2881 local ($!, $?, $_);
729              
730 142         396 $_aborted = $_chunk_id = $_eof_flag = $_size_completed = 0;
731 142 100       568 $_has_user_tasks = (defined $self->{user_tasks}) ? 1 : 0;
732 142 100       453 $_cs_one_flag = ($self->{chunk_size} == 1) ? 1 : 0;
733              
734 142         357 $_max_retries = $self->{max_retries};
735 142         276 $_on_post_exit = $self->{on_post_exit};
736 142         287 $_on_post_run = $self->{on_post_run};
737 142         243 $_chunk_size = $self->{chunk_size};
738 142         251 $_user_output = $self->{user_output};
739 142         236 $_user_error = $self->{user_error};
740 142         271 $_single_dim = $self->{_single_dim};
741 142         225 $_sess_dir = $self->{_sess_dir};
742              
743 142 50 33     645 if (defined $_max_retries && !$_on_post_exit) {
744             $_on_post_exit = sub {
745 0     0   0 my ($self, $_e, $_retry_cnt) = @_;
746              
747 0 0       0 if ($_e->{id}) {
748 0         0 my $_cnt = $_retry_cnt + 1;
749 0         0 my $_msg = "Error: chunk $_e->{id} failed";
750              
751 0 0       0 if (defined $self->{init_relay}) {
752 0 0       0 print {*STDERR} "$_msg, retrying chunk attempt # $_cnt\n"
  0         0  
753             if ($_retry_cnt < $_max_retries);
754             }
755             else {
756             ($_retry_cnt < $_max_retries)
757 0         0 ? print {*STDERR} "$_msg, retrying chunk attempt # $_cnt\n"
758 0 0       0 : print {*STDERR} "$_msg\n";
  0         0  
759             }
760              
761 0         0 $self->restart_worker;
762             }
763 0         0 };
764             }
765              
766 142 50 66     1253 if ($_has_user_tasks && $self->{user_tasks}->[0]->{chunk_size}) {
767 0         0 $_chunk_size = $self->{user_tasks}->[0]->{chunk_size};
768             }
769              
770 142 100       420 if ($_has_user_tasks) {
771 108         157 for my $_i (0 .. @{ $self->{user_tasks} } - 1) {
  108         998  
772             $_gather[$_i] = (defined $self->{user_tasks}->[$_i]->{gather})
773 180 100       1038 ? $self->{user_tasks}->[$_i]->{gather} : $self->{gather};
774              
775 180 100       765 $_is_c_ref[$_i] = ( ref $_gather[$_i] eq 'CODE' ) ? 1 : 0;
776 180 50       1663 $_is_h_ref[$_i] = ( ref $_gather[$_i] eq 'HASH' ) ? 1 : 0;
777              
778 180 100 66     1118 $_is_q_ref[$_i] = (
779             ref $_gather[$_i] eq 'MCE::Queue' ||
780             ref $_gather[$_i] eq 'Thread::Queue' ) ? 1 : 0;
781             }
782             }
783              
784 142 100       550 if (defined $self->{gather}) {
785 62         1089 $_gather[0] = $self->{gather};
786              
787 62 100       292 $_is_c_ref[0] = ( ref $_gather[0] eq 'CODE' ) ? 1 : 0;
788 62 50       289 $_is_h_ref[0] = ( ref $_gather[0] eq 'HASH' ) ? 1 : 0;
789              
790 62 50 33     576 $_is_q_ref[0] = (
791             ref $_gather[0] eq 'MCE::Queue' ||
792             ref $_gather[0] eq 'Thread::Queue' ) ? 1 : 0;
793             }
794              
795 142 100 100     1045 if (defined $_input_data && ref $_input_data eq 'ARRAY') {
    100 66        
796 40         69 $_input_size = @{ $_input_data };
  40         95  
797 40         104 $_offset_pos = 0;
798             }
799             elsif (defined $_input_data && ref $_input_data eq 'HASH') {
800 3         16 $_input_size = scalar( keys %{ $_input_data } );
  3         22  
801 3         25 $_offset_pos = 0;
802             }
803             else {
804 99         180 $_input_size = $_offset_pos = 0;
805             }
806              
807             ## Set STDOUT/STDERR to user parameters.
808              
809 142 50       446 if (defined $self->{stdout_file}) {
810             open $_MCE_STDOUT, '>>', $self->{stdout_file}
811 0 0       0 or die $self->{stdout_file} . ": $!\n";
812 0         0 binmode $_MCE_STDOUT;
813             }
814             else {
815 142         584 $_MCE_STDOUT = \*STDOUT;
816             }
817              
818 142 50       395 if (defined $self->{stderr_file}) {
819             open $_MCE_STDERR, '>>', $self->{stderr_file}
820 0 0       0 or die $self->{stderr_file} . ": $!\n";
821 0         0 binmode $_MCE_STDERR;
822             }
823             else {
824 142         266 $_MCE_STDERR = \*STDERR;
825             }
826              
827             ## Autoflush STDERR-STDOUT handles if not specified or requested.
828              
829             {
830 142         210 local $!;
  142         521  
831              
832             $_MCE_STDERR->autoflush(1)
833 142 50 33     1741 if ( !exists $self->{flush_stderr} || $self->{flush_stderr} );
834              
835             $_MCE_STDOUT->autoflush(1)
836 142 50 33     11016 if ( !exists $self->{flush_stdout} || $self->{flush_stdout} );
837             }
838              
839             ## -------------------------------------------------------------------------
840              
841             ## Output event loop.
842              
843 142         4496 my $_channels = $self->{_dat_r_sock};
844 142         280 my $_func;
845              
846             $_win32_ipc = (
847             $ENV{'PERL_MCE_IPC'} eq 'win32' ||
848             defined($self->{max_retries}) ||
849             $INC{'MCE/Child.pm'} ||
850 142   33     2299 $INC{'MCE/Hobo.pm'}
851             );
852              
853 142         350 $_BSB_W_SOCK = $self->{_bsb_w_sock};
854 142         257 $_BSB_R_SOCK = $self->{_bsb_r_sock};
855 142         241 $_DAT_R_SOCK = $self->{_dat_r_sock}->[0];
856              
857 142   33     834 $_RS = $self->{RS} || $/;
858 142         538 $_O_SEP = $\; local $\ = undef;
  142         380  
859 142         333 $_I_SEP = $/; local $/ = $LF;
  142         550  
860              
861 142 50 33     1000 $_RS_FLG = (!$_RS || $_RS ne $LF) ? 1 : 0;
862 142 50       346 $_O_FLG = (defined $_O_SEP) ? 1 : 0;
863 142 50 33     826 $_I_FLG = (!$_I_SEP || $_I_SEP ne $LF) ? 1 : 0;
864              
865             ## Call module's loop_begin routine for modules plugged into MCE.
866              
867 142         327 for my $_p (@{ $_plugin_loop_begin }) {
  142         959  
868 103         943 $_p->($self, \$_DAU_R_SOCK);
869             }
870              
871             ## Wait on requests *with* timeout capability. Exit loop when all workers
872             ## have completed processing or exited prematurely.
873              
874             _LOOP:
875              
876 142 50 33     1522 if ($self->{loop_timeout} && @{ $self->{_tids} } == 0 && $^O ne 'MSWin32') {
  0 50 33     0  
877 0         0 my ($_list, $_timeout) = ($self->{_pids}, $self->{loop_timeout});
878 0         0 my ($_DAT_W_SOCK, $_pid) = ($self->{_dat_w_sock}->[0]);
879              
880 0         0 $self->{_pids_c} = {}; # Chunk ID notification
881              
882 0 0       0 $_timeout = 5 if $_timeout < 5;
883              
884             local $SIG{ALRM} = sub {
885 0     0   0 alarm 0; local ($!, $?);
  0         0  
886              
887 0         0 for my $i (0 .. @{ $_list }) {
  0         0  
888 0 0       0 if ($_pid = $_list->[$i]) {
889 0 0       0 if (waitpid($_pid, _WNOHANG)) {
890              
891 0         0 $_list->[$i] = undef;
892              
893 0 0       0 if ($? > abs($self->{_wrk_status})) {
894 0         0 $self->{_wrk_status} = $?;
895             }
896              
897 0         0 my $_task_id = $self->{_pids_t}{$_pid};
898 0         0 my $_wid = $self->{_pids_w}{$_pid};
899              
900 0         0 $self->{_total_exited} += 1;
901 0         0 $self->{_total_running} -= 1;
902 0         0 $self->{_total_workers} -= 1;
903              
904 0 0 0     0 if ($_has_user_tasks && $_task_id >= 0) {
905 0         0 $self->{_task}->[$_task_id]->{_total_running} -= 1;
906 0         0 $self->{_task}->[$_task_id]->{_total_workers} -= 1;
907             }
908              
909             my $_total_running = ($_has_user_tasks)
910             ? $self->{_task}->[$_task_id]->{_total_running}
911 0 0       0 : $self->{_total_running};
912              
913 0 0 0     0 if ($_task_id == 0 && defined $_syn_flag && $_sync_cnt) {
      0        
914 0 0       0 if ($_sync_cnt == $_total_running) {
915 0         0 for my $_i (1 .. $_total_running) {
916 0         0 syswrite($_BSB_W_SOCK, $LF);
917             }
918 0         0 undef $_syn_flag;
919             }
920             }
921              
922 0 0       0 _task_end($self, $_task_id) unless $_total_running;
923              
924 0 0       0 if (my $_cid = $self->{_pids_c}{$_pid}) {
925 0         0 warn "Error: process $_pid has ended prematurely\n",
926             "Error: chunk $_cid failed\n";
927              
928 0 0       0 if ($_cid > $self->{_relayed}) {
929 0         0 local $SIG{CHLD} = 'IGNORE';
930 0         0 my $_pid = fork;
931              
932 0 0 0     0 if (defined $_pid && $_pid == 0) {
933 0         0 delete $self->{max_retries};
934              
935 0         0 $self->{_chunk_id} = $_cid;
936 0         0 $self->{_task_id} = $_task_id;
937 0         0 $self->{_wid} = $_wid;
938              
939 0         0 eval 'MCE::relay';
940              
941 0         0 CORE::kill('KILL', $$);
942 0         0 CORE::exit(0);
943             }
944             }
945             }
946              
947 0         0 delete $self->{_pids_c}{$_pid};
948 0         0 delete $self->{_pids_t}{$_pid};
949 0         0 delete $self->{_pids_w}{$_pid};
950             }
951             }
952             }
953              
954 0         0 print {$_DAT_W_SOCK} 'NOOP'.$LF . '0'.$LF;
  0         0  
955 0         0 };
956              
957 0         0 while ( $self->{_total_running} ) {
958 0         0 alarm($_timeout); $_func = <$_DAT_R_SOCK>, alarm(0);
  0         0  
959 0         0 $_DAU_R_SOCK = $_channels->[ <$_DAT_R_SOCK> ];
960              
961 0 0       0 if (exists $_core_output_function{$_func}) {
    0          
962 0         0 $_core_output_function{$_func}();
963             } elsif (exists $_plugin_function->{$_func}) {
964 0         0 $_plugin_function->{$_func}();
965             }
966             }
967              
968 0         0 delete $self->{_pids_c};
969             }
970              
971             ## Wait on requests *without* timeout capability.
972              
973             elsif ($^O eq 'MSWin32') {
974 0 0       0 MCE::Util::_nonblocking($_DAT_R_SOCK, 1) if $_win32_ipc;
975              
976 0         0 while ($self->{_total_running}) {
977 0         0 MCE::Util::_sysread2($_DAT_R_SOCK, $_func, 8);
978 0 0       0 last() unless length($_func) == 8;
979 0         0 $_DAU_R_SOCK = $_channels->[ substr($_func, -2, 2, '') ];
980              
981 0 0       0 if (exists $_core_output_function{$_func}) {
    0          
982 0         0 $_core_output_function{$_func}();
983             } elsif (exists $_plugin_function->{$_func}) {
984 0         0 $_plugin_function->{$_func}();
985             }
986             }
987              
988 0 0       0 MCE::Util::_nonblocking($_DAT_R_SOCK, 0) if $_win32_ipc;
989             }
990             else {
991 142         428 while ($self->{_total_running}) {
992 3760         467094 $_func = <$_DAT_R_SOCK>;
993 3760 50       12639 last() unless length($_func) == 6;
994 3760         9843 $_DAU_R_SOCK = $_channels->[ <$_DAT_R_SOCK> ];
995              
996 3760 100       9786 if (exists $_core_output_function{$_func}) {
    50          
997 2269         4978 $_core_output_function{$_func}();
998             } elsif (exists $_plugin_function->{$_func}) {
999 1491         5167 $_plugin_function->{$_func}();
1000             }
1001             }
1002             }
1003              
1004             ## Call module's loop_end routine for modules plugged into MCE.
1005              
1006 142         299 for my $_p (@{ $_plugin_loop_end }) {
  142         526  
1007 103         377 $_p->($self);
1008             }
1009              
1010             ## Call on_post_run callback.
1011              
1012 142 50       594 $_on_post_run->($self, $self->{_status}) if (defined $_on_post_run);
1013              
1014             ## Close opened sendto file handles.
1015              
1016 142         438 _sendto_fhs_close();
1017              
1018             ## Close MCE STDOUT/STDERR handles.
1019              
1020 142         22691 eval q{
1021             close $_MCE_STDOUT if (fileno $_MCE_STDOUT > 2);
1022             close $_MCE_STDERR if (fileno $_MCE_STDERR > 2);
1023             };
1024              
1025 142         17009 return;
1026             }
1027              
1028             1;
1029              
1030             __END__