File Coverage

blib/lib/MCE/Core/Manager.pm
Criterion Covered Total %
statement 214 519 41.2
branch 89 288 30.9
condition 33 116 28.4
subroutine 20 38 52.6
pod n/a
total 356 961 37.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Core methods for the manager process.
4             ##
5             ## This package provides the loop and relevant methods used internally by the
6             ## manager process.
7             ##
8             ## There is no public API.
9             ##
10             ###############################################################################
11              
12             package MCE::Core::Manager;
13              
14 94     94   608 use strict;
  94         231  
  94         3685  
15 94     94   577 use warnings;
  94         191  
  94         8390  
16              
17             our $VERSION = '1.902';
18              
19             ## no critic (BuiltinFunctions::ProhibitStringyEval)
20             ## no critic (TestingAndDebugging::ProhibitNoStrict)
21              
22             ## Items below are folded into MCE.
23              
24             package # hide from rpm
25             MCE;
26              
27 94     94   668 no warnings qw( threads recursion uninitialized );
  94         180  
  94         8969  
28              
29             ## The POSIX module has many symbols. Try not loading it simply
30             ## to have WNOHANG. The following covers most platforms.
31              
32             use constant {
33 94 50       64437 _WNOHANG => ( $INC{'POSIX.pm'} )
    50          
34             ? &POSIX::WNOHANG : ( $^O eq 'solaris' ) ? 64 : 1
35 94     94   867 };
  94         216  
36              
37             ###############################################################################
38             ## ----------------------------------------------------------------------------
39             ## Call on task_end after task completion.
40             ##
41             ###############################################################################
42              
43             sub _task_end {
44              
45 220     220   704 my ($self, $_task_id) = @_;
46              
47 220         475 @_ = ();
48              
49 220 100       778 if (defined $self->{user_tasks}) {
    50          
50             my $_task_end = (exists $self->{user_tasks}->[$_task_id]->{task_end})
51             ? $self->{user_tasks}->[$_task_id]->{task_end}
52 181 50       740 : $self->{task_end};
53              
54 181 100       595 if (defined $_task_end) {
55             my $_task_name = (exists $self->{user_tasks}->[$_task_id]->{task_name})
56             ? $self->{user_tasks}->[$_task_id]->{task_name}
57 144 50       820 : $self->{task_name};
58              
59 144         752 $_task_end->($self, $_task_id, $_task_name);
60             }
61             }
62             elsif (defined $self->{task_end}) {
63 0         0 $self->{task_end}->($self, 0, $self->{task_name});
64             }
65              
66 220         523 return;
67             }
68              
69             ###############################################################################
70             ## ----------------------------------------------------------------------------
71             ## Process output.
72             ##
73             ## Awaits and processes events from workers. The sendto/do methods tag the
74             ## output accordingly. The hash structure below is key-driven.
75             ##
76             ###############################################################################
77              
78             my %_sendto_fhs;
79              
80             sub _sendto_fhs_close {
81 536     536   3154 for my $_p (keys %_sendto_fhs) {
82 0         0 close $_sendto_fhs{$_p};
83 0         0 delete $_sendto_fhs{$_p};
84             }
85             }
86              
87             sub _sendto_fhs_get {
88 0     0   0 my ($self, $_fd) = @_;
89              
90 0 0       0 $_sendto_fhs{$_fd} || do {
91              
92 0         0 $_sendto_fhs{$_fd} = IO::Handle->new();
93 0 0       0 $_sendto_fhs{$_fd}->fdopen($_fd, 'w')
94             or _croak "Cannot open file descriptor ($_fd): $!";
95              
96 0         0 binmode $_sendto_fhs{$_fd};
97              
98 0 0 0     0 if (!exists $self->{flush_file} || $self->{flush_file}) {
99 0         0 local $!;
100 0         0 $_sendto_fhs{$_fd}->autoflush(1)
101             }
102              
103 0         0 $_sendto_fhs{$_fd};
104             };
105             }
106              
107             sub _output_loop {
108              
109 148     148   786 my ( $self, $_input_data, $_input_glob, $_plugin_function,
110             $_plugin_loop_begin, $_plugin_loop_end ) = @_;
111              
112 148         417 @_ = ();
113              
114             my (
115 148         704 $_aborted, $_eof_flag, $_max_retries, $_syn_flag, $_win32_ipc,
116             $_cb, $_chunk_id, $_chunk_size, $_file, $_size_completed, $_wa,
117             @_is_c_ref, @_is_h_ref, @_is_q_ref, $_on_post_exit, $_on_post_run,
118             $_has_user_tasks, $_sess_dir, $_task_id, $_user_error, $_user_output,
119             $_input_size, $_offset_pos, $_single_dim, @_gather, $_cs_one_flag,
120             $_exit_id, $_exit_pid, $_exit_status, $_exit_wid, $_len, $_sync_cnt,
121             $_BSB_W_SOCK, $_BSB_R_SOCK, $_DAT_R_SOCK, $_DAU_R_SOCK, $_MCE_STDERR,
122             $_I_FLG, $_O_FLG, $_I_SEP, $_O_SEP, $_RS, $_RS_FLG, $_MCE_STDOUT,
123             @_delay_wid
124             );
125              
126             ## -------------------------------------------------------------------------
127             ## Callback return.
128              
129             my $_cb_reply = sub {
130 285 50   285   947 local $\ = $_O_SEP if ($_O_FLG);
131 285 50       654 local $/ = $_I_SEP if ($_I_FLG);
132              
133 94     94   781 no strict 'refs';
  94         204  
  94         288707  
134              
135 285 100       813 if ( $_wa == WANTS_UNDEF ) {
    100          
136 274         3720 $_cb->(@_);
137 274         252486 return;
138             }
139             elsif ( $_wa == WANTS_ARRAY ) {
140 2         58 my @_ret = $_cb->(@_);
141 2         102 my $_buf = $self->{freeze}(\@_ret);
142              
143 2         7 return print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  2         59  
144             }
145              
146 9         279 my $_ret = $_cb->(@_);
147 9         180 my $_buf = $self->{freeze}([ $_ret ]);
148              
149 9         42 return print {$_DAU_R_SOCK} length($_buf).$LF, $_buf;
  9         376  
150 148         3837 };
151              
152             ## -------------------------------------------------------------------------
153             ## Create hash structure containing various output functions.
154              
155             my %_core_output_function = (
156              
157             OUTPUT_W_ABT.$LF => sub { # Worker has aborted
158 0     0   0 $_aborted = 1;
159 0         0 return;
160             },
161              
162             OUTPUT_W_DNE.$LF => sub { # Worker has completed
163 452     452   8682 chomp($_task_id = <$_DAU_R_SOCK>);
164 904         965 $self->{_total_running} -= 1;
165              
166 904 100 66     2339 if ($_has_user_tasks && $_task_id >= 0) {
167 351         871 $self->{_task}->[$_task_id]->{_total_running} -= 1;
168             }
169              
170             my $_total_running = ($_has_user_tasks)
171             ? $self->{_task}->[$_task_id]->{_total_running}
172 904 100       1271 : $self->{_total_running};
173              
174 904 50 66     2193 if ($_task_id == 0 && defined $_syn_flag && $_sync_cnt) {
      33        
175 0 0       0 if ($_sync_cnt == $_total_running) {
176 0         0 for my $_i (1 .. $_total_running) {
177 0         0 syswrite($_BSB_W_SOCK, $LF);
178             }
179 0         0 undef $_syn_flag;
180             }
181             }
182              
183 904 100       1911 _task_end($self, $_task_id) unless $_total_running;
184              
185 904         1285 return;
186             },
187              
188             ## ----------------------------------------------------------------------
189              
190             OUTPUT_W_EXT.$LF => sub { # Worker has exited
191 0     0   0 chomp($_task_id = <$_DAU_R_SOCK>);
192              
193 0         0 $self->{_total_exited} += 1;
194 0         0 $self->{_total_running} -= 1;
195 0         0 $self->{_total_workers} -= 1;
196              
197 0 0 0     0 if ($_has_user_tasks && $_task_id >= 0) {
198 0         0 $self->{_task}->[$_task_id]->{_total_running} -= 1;
199 0         0 $self->{_task}->[$_task_id]->{_total_workers} -= 1;
200             }
201              
202             my $_total_running = ($_has_user_tasks)
203             ? $self->{_task}->[$_task_id]->{_total_running}
204 0 0       0 : $self->{_total_running};
205              
206 0 0 0     0 if ($_task_id == 0 && defined $_syn_flag && $_sync_cnt) {
      0        
207 0 0       0 if ($_sync_cnt == $_total_running) {
208 0         0 for my $_i (1 .. $_total_running) {
209 0         0 syswrite($_BSB_W_SOCK, $LF);
210             }
211 0         0 undef $_syn_flag;
212             }
213             }
214              
215 0         0 my ($_exit_msg, $_retry_buf) = ('', '');
216              
217 0         0 chomp($_exit_wid = <$_DAU_R_SOCK>),
218             chomp($_exit_pid = <$_DAU_R_SOCK>),
219             chomp($_exit_status = <$_DAU_R_SOCK>),
220             chomp($_exit_id = <$_DAU_R_SOCK>),
221             chomp($_len = <$_DAU_R_SOCK>);
222              
223 0 0       0 read($_DAU_R_SOCK, $_exit_msg, $_len) if ($_len);
224              
225 0         0 chomp($_len = <$_DAU_R_SOCK>);
226              
227 0 0       0 read($_DAU_R_SOCK, $_retry_buf, $_len) if ($_len);
228              
229 0 0       0 if (abs($_exit_status) > abs($self->{_wrk_status})) {
230 0         0 $self->{_wrk_status} = $_exit_status;
231             }
232              
233             ## Reap child/thread. Note: Win32 uses negative PIDs.
234              
235 0         0 local ($!, $?);
236              
237 0 0       0 if ($_exit_pid =~ /^PID_(-?\d+)/) {
    0          
238 0         0 my $_pid = $1; my $_list = $self->{_pids};
  0         0  
239 0         0 for my $i (0 .. @{ $_list }) {
  0         0  
240 0 0 0     0 if ($_list->[$i] && $_list->[$i] == $_pid) {
241 0         0 waitpid $_pid, 0;
242 0         0 $self->{_pids}->[$i] = undef;
243 0         0 last;
244             }
245             }
246             }
247             elsif ($_exit_pid =~ /^TID_(\d+)/) {
248 0         0 my $_tid = $1; my $_list = $self->{_tids};
  0         0  
249 0         0 for my $i (0 .. @{ $_list }) {
  0         0  
250 0 0 0     0 if ($_list->[$i] && $_list->[$i] == $_tid) {
251 0         0 eval { $self->{_thrs}->[$i]->join() };
  0         0  
252 0         0 $self->{_thrs}->[$i] = undef;
253 0         0 $self->{_tids}->[$i] = undef;
254 0         0 last;
255             }
256             }
257             }
258              
259             ## Call on_post_exit callback if defined. Otherwise, append status
260             ## information if on_post_run is defined for later retrieval.
261              
262 0 0       0 if (defined $_on_post_exit) {
    0          
263 0         0 $self->{_exited_wid} = $_exit_wid;
264              
265 0 0       0 if (length($_retry_buf)) {
266 0         0 $self->{_retry} = $self->{thaw}($_retry_buf);
267 0         0 $self->{_retry_cnt} = $_max_retries - $self->{_retry}[2] - 1;
268              
269             $_on_post_exit->($self, {
270             wid => $_exit_wid, pid => $_exit_pid, status => $_exit_status,
271             msg => $_exit_msg, id => $_exit_id
272 0         0 }, $self->{_retry_cnt});
273              
274 0         0 delete $self->{_retry};
275             }
276             else {
277 0   0     0 $_on_post_exit->($self, {
278             wid => $_exit_wid, pid => $_exit_pid, status => $_exit_status,
279             msg => $_exit_msg, id => $_exit_id
280             }, $_max_retries || 0 );
281             }
282              
283 0         0 delete $self->{_exited_wid};
284             }
285             elsif (defined $_on_post_run) {
286 0         0 push @{ $self->{_status} }, {
  0         0  
287             wid => $_exit_wid, pid => $_exit_pid, status => $_exit_status,
288             msg => $_exit_msg, id => $_exit_id
289             };
290             }
291              
292 0 0       0 _task_end($self, $_task_id) unless $_total_running;
293              
294 0         0 return;
295             },
296              
297             ## ----------------------------------------------------------------------
298              
299             OUTPUT_A_REF.$LF => sub { # Input << Array ref
300 416     416   594 my $_buf;
301              
302 416 100 66     1645 if ($_offset_pos >= $_input_size || $_aborted) {
303 100 50       406 local $\ = undef if (defined $\);
304 100         167 print {$_DAU_R_SOCK} '0'.$LF;
  100         5941  
305              
306 100         509 return;
307             }
308              
309 316 100 66     924 if ($_single_dim && $_cs_one_flag) {
310 249         1389 $_buf = $self->{freeze}( [ $_input_data->[$_offset_pos] ] );
311             }
312             else {
313 67 100       195 if ($_offset_pos + $_chunk_size - 1 < $_input_size) {
314 54         131 $_buf = $self->{freeze}( [ @{ $_input_data }[
  54         559  
315             $_offset_pos .. $_offset_pos + $_chunk_size - 1
316             ] ] );
317             }
318             else {
319 13         39 $_buf = $self->{freeze}( [ @{ $_input_data }[
  13         120  
320             $_offset_pos .. $_input_size - 1
321             ] ] );
322             }
323             }
324              
325 316 50       541 $_len = length $_buf; local $\ = undef if (defined $\);
  316         1807  
326 316         393 print {$_DAU_R_SOCK} $_len.$LF . (++$_chunk_id).$LF, $_buf;
  316         13804  
327 316         633 $_offset_pos += $_chunk_size;
328              
329 316         1066 return;
330             },
331              
332             OUTPUT_G_REF.$LF => sub { # Input << Glob ref
333 54     54   159 my $_buf = '';
334              
335             ## The logic below honors ('Ctrl/Z' in Windows, 'Ctrl/D' in Unix)
336             ## when reading from standard input. No output will be lost as
337             ## far as what was previously read into the buffer.
338              
339 54 100 66     410 if ($_eof_flag || $_aborted) {
340 36 50       119 local $\ = undef if (defined $\);
341 36         57 print {$_DAU_R_SOCK} '0'.$LF;
  36         3755  
342              
343 36         651 return;
344             }
345              
346             {
347 18 50       36 local $/ = $_RS if ($_RS_FLG);
  18         58  
348              
349 18 50       58 if ($_chunk_size <= MAX_RECS_SIZE) {
350 0 0       0 if ($_chunk_size == 1) {
351 0 0       0 $_buf .= $_input_glob->can('getline')
352             ? $_input_glob->getline : <$_input_glob>;
353 0 0       0 $_eof_flag = 1 unless (length $_buf);
354             }
355             else {
356 0         0 my $_last_len = 0;
357 0         0 for (1 .. $_chunk_size) {
358 0 0       0 $_buf .= $_input_glob->can('getline')
359             ? $_input_glob->getline : <$_input_glob>;
360 0         0 $_len = length $_buf;
361 0 0       0 if ($_len == $_last_len) {
362 0         0 $_eof_flag = 1;
363 0         0 last;
364             }
365 0         0 $_last_len = $_len;
366             }
367             }
368             }
369             else {
370 18 50 33     1783 if ($_input_glob->can('getline') && $_input_glob->can('read')) {
371 18 50       301 if ($_input_glob->read($_buf, $_chunk_size) == $_chunk_size) {
372 0         0 $_buf .= $_input_glob->getline;
373 0 0       0 $_eof_flag = 1 if (length $_buf == $_chunk_size);
374             } else {
375 18         1518 $_eof_flag = 1;
376             }
377             }
378             else {
379 0 0       0 if (read($_input_glob, $_buf, $_chunk_size) == $_chunk_size) {
380 0         0 $_buf .= <$_input_glob>;
381 0 0       0 $_eof_flag = 1 if (length $_buf == $_chunk_size);
382             } else {
383 0         0 $_eof_flag = 1;
384             }
385             }
386             }
387             }
388              
389 18 50       45 $_len = length $_buf; local $\ = undef if (defined $\);
  18         152  
390              
391 18 50       61 if ($_len) {
392 18         180 my $_tmp = $self->{freeze}(\$_buf);
393 18         38 print {$_DAU_R_SOCK} length($_tmp).$LF . (++$_chunk_id).$LF, $_tmp;
  18         503  
394             }
395             else {
396 0         0 print {$_DAU_R_SOCK} '0'.$LF;
  0         0  
397             }
398              
399 18         158 return;
400             },
401              
402             OUTPUT_H_REF.$LF => sub { # Input << Hash ref
403 18     18   35 my @_pairs;
404              
405 18 100 66     127 if ($_offset_pos >= $_input_size || $_aborted) {
406 3 50       26 local $\ = undef if (defined $\);
407 3         9 print {$_DAU_R_SOCK} '0'.$LF;
  3         199  
408              
409 3         22 return;
410             }
411              
412 15 100       47 if ($_offset_pos + $_chunk_size - 1 < $_input_size) {
413 12         33 for my $_i ($_offset_pos .. $_offset_pos + $_chunk_size - 1) {
414 24         40 push @_pairs, each %{ $_input_data };
  24         308  
415             }
416             }
417             else {
418 3         24 for my $_i ($_offset_pos .. $_input_size - 1) {
419 3         12 push @_pairs, each %{ $_input_data };
  3         132  
420             }
421             }
422              
423 15         157 my $_buf = $self->{freeze}(\@_pairs);
424              
425 15 50       25 $_len = length $_buf; local $\ = undef if (defined $\);
  15         50  
426 15         23 print {$_DAU_R_SOCK} $_len.$LF . (++$_chunk_id).$LF, $_buf;
  15         9256  
427 15         46 $_offset_pos += $_chunk_size;
428              
429 15         94 return;
430             },
431              
432             OUTPUT_I_REF.$LF => sub { # Input << Iter ref
433 0     0   0 my $_buf;
434              
435 0 0       0 if ($_aborted) {
436 0 0       0 local $\ = undef if (defined $\);
437 0         0 print {$_DAU_R_SOCK} '-1'.$LF;
  0         0  
438              
439 0         0 return;
440             }
441              
442 0 0       0 my @_ret_a = (ref $_input_data eq 'CODE')
443             ? $_input_data->($_chunk_size)
444             : $_input_data->();
445              
446 0 0 0     0 if (@_ret_a > 1 || defined $_ret_a[0]) {
447 0         0 $_buf = $self->{freeze}([ @_ret_a ]);
448 0 0       0 $_len = length $_buf; local $\ = undef if (defined $\);
  0         0  
449 0         0 print {$_DAU_R_SOCK} $_len.$LF . (++$_chunk_id).$LF, $_buf;
  0         0  
450              
451 0         0 return;
452             }
453              
454 0 0       0 local $\ = undef if (defined $\);
455 0         0 print {$_DAU_R_SOCK} '-1'.$LF;
  0         0  
456 0         0 $_aborted = 1;
457              
458 0         0 return;
459             },
460              
461             ## ----------------------------------------------------------------------
462              
463             OUTPUT_A_CBK.$LF => sub { # Callback w/ args
464 283     283   4314 chomp($_wa = <$_DAU_R_SOCK>),
465             chomp($_cb = <$_DAU_R_SOCK>),
466             chomp($_len = <$_DAU_R_SOCK>);
467              
468 283         1483 read $_DAU_R_SOCK, my($_buf), $_len;
469              
470 283         4354 my $_aref = $self->{thaw}($_buf);
471 283         617 undef $_buf;
472              
473 283         476 return $_cb_reply->(@{ $_aref });
  283         758  
474             },
475              
476             OUTPUT_N_CBK.$LF => sub { # Callback w/ no args
477 2     2   47 chomp($_wa = <$_DAU_R_SOCK>),
478             chomp($_cb = <$_DAU_R_SOCK>);
479              
480 2         17 return $_cb_reply->();
481             },
482              
483             OUTPUT_A_GTR.$LF => sub { # Gather data
484 1132     1132   14610 chomp($_task_id = <$_DAU_R_SOCK>),
485             chomp($_len = <$_DAU_R_SOCK>);
486              
487 1132         3446 read $_DAU_R_SOCK, my($_buf), $_len;
488              
489 1132 100       2927 if ($_is_c_ref[$_task_id]) {
    50          
    100          
490 603         5666 local $_ = $self->{thaw}($_buf);
491 603         1048 $_gather[$_task_id]->(@{ $_ });
  603         3259  
492             }
493             elsif ($_is_h_ref[$_task_id]) {
494 0         0 local $_ = $self->{thaw}($_buf);
495 0         0 while (1) {
496 0         0 my $_key = shift @{ $_ }; my $_val = shift @{ $_ };
  0         0  
  0         0  
  0         0  
497 0         0 $_gather[$_task_id]->{$_key} = $_val;
498 0 0       0 last unless (@{ $_ });
  0         0  
499             }
500             }
501             elsif ($_is_q_ref[$_task_id]) {
502 379         629 $_gather[$_task_id]->enqueue(@{ $self->{thaw}($_buf) });
  379         4032  
503             }
504             else {
505 150         194 push @{ $_gather[$_task_id] }, @{ $self->{thaw}($_buf) };
  150         292  
  150         2355  
506             }
507              
508 1132         6085 return;
509             },
510              
511             ## ----------------------------------------------------------------------
512              
513             OUTPUT_O_SND.$LF => sub { # Send >> STDOUT
514 0     0   0 chomp($_len = <$_DAU_R_SOCK>);
515              
516 0         0 read $_DAU_R_SOCK, my($_buf), $_len;
517 0         0 $_buf = ${ $self->{thaw}($_buf) };
  0         0  
518              
519 0 0       0 if (defined $_user_output) {
520 0         0 $_user_output->($_buf);
521             }
522             else {
523 94     94   50092 use bytes;
  94         64361  
  94         563  
524 0         0 print {$_MCE_STDOUT} $_buf;
  0         0  
525             }
526              
527 0         0 return;
528             },
529              
530             OUTPUT_E_SND.$LF => sub { # Send >> STDERR
531 0     0   0 chomp($_len = <$_DAU_R_SOCK>);
532              
533 0         0 read $_DAU_R_SOCK, my($_buf), $_len;
534 0         0 $_buf = ${ $self->{thaw}($_buf) };
  0         0  
535              
536 0 0       0 if (defined $_user_error) {
537 0         0 $_user_error->($_buf);
538             }
539             else {
540 94     94   14856 use bytes;
  94         185  
  94         369  
541 0         0 print {$_MCE_STDERR} $_buf;
  0         0  
542             }
543              
544 0         0 return;
545             },
546              
547             OUTPUT_F_SND.$LF => sub { # Send >> File
548 0     0   0 my ($_buf, $_OUT_FILE);
549              
550 0         0 chomp($_len = <$_DAU_R_SOCK>);
551 0         0 read $_DAU_R_SOCK, $_buf, $_len;
552              
553 0         0 $_buf = $self->{thaw}($_buf);
554 0         0 $_file = $_buf->[0];
555              
556 0 0       0 unless (exists $_sendto_fhs{$_file}) {
557 0 0       0 open $_sendto_fhs{$_file}, ">>", "$_file"
558             or _croak "Cannot open file for writing ($_file): $!";
559              
560 0         0 binmode $_sendto_fhs{$_file};
561              
562 0 0 0     0 if (!exists $self->{flush_file} || $self->{flush_file}) {
563 0         0 local $!;
564 0         0 $_sendto_fhs{$_file}->autoflush(1);
565             }
566             }
567              
568             {
569 94     94   22436 use bytes;
  94         185  
  94         395  
  0         0  
570 0         0 $_OUT_FILE = $_sendto_fhs{$_file};
571 0         0 print {$_OUT_FILE} $_buf->[1];
  0         0  
572             }
573              
574 0         0 return;
575             },
576              
577             OUTPUT_D_SND.$LF => sub { # Send >> File descriptor
578 0     0   0 my ($_buf, $_OUT_FILE);
579              
580 0         0 chomp($_len = <$_DAU_R_SOCK>);
581 0         0 read $_DAU_R_SOCK, $_buf, $_len;
582              
583 0         0 $_buf = $self->{thaw}($_buf);
584              
585             {
586 94     94   17679 use bytes;
  94         263  
  94         421  
  0         0  
587 0         0 $_OUT_FILE = _sendto_fhs_get($self, $_buf->[0]);
588 0         0 print {$_OUT_FILE} $_buf->[1];
  0         0  
589             }
590              
591 0         0 return;
592             },
593              
594             ## ----------------------------------------------------------------------
595              
596             OUTPUT_B_SYN.$LF => sub { # Barrier sync - begin
597 0 0 0 0   0 if (!defined $_sync_cnt || $_sync_cnt == 0) {
598 0         0 $_syn_flag = 1, $_sync_cnt = 0;
599             }
600              
601             my $_total_running = ($_has_user_tasks)
602             ? $self->{_task}->[0]->{_total_running}
603 0 0       0 : $self->{_total_running};
604              
605 0 0       0 if (++$_sync_cnt == $_total_running) {
606 0         0 for my $_i (1 .. $_total_running) {
607 0         0 syswrite($_BSB_W_SOCK, $LF);
608             }
609 0         0 undef $_syn_flag;
610             }
611              
612 0         0 return;
613             },
614              
615             OUTPUT_E_SYN.$LF => sub { # Barrier sync - end
616 0 0   0   0 if (--$_sync_cnt == 0) {
617             my $_total_running = ($_has_user_tasks)
618             ? $self->{_task}->[0]->{_total_running}
619 0 0       0 : $self->{_total_running};
620              
621 0         0 for my $_i (1 .. $_total_running) {
622 0         0 syswrite($_BSB_R_SOCK, $LF);
623             }
624             }
625              
626 0         0 return;
627             },
628              
629             OUTPUT_S_IPC.$LF => sub { # Change to win32 IPC
630 0     0   0 syswrite($_DAT_R_SOCK, $LF);
631              
632 0 0       0 $_win32_ipc = 1, goto _LOOP unless $_win32_ipc;
633              
634 0         0 return;
635             },
636              
637             OUTPUT_C_NFY.$LF => sub { # Chunk ID notification
638 0     0   0 chomp($_len = <$_DAU_R_SOCK>);
639              
640 0         0 my ($_pid, $_chunk_id) = split /:/, $_len;
641 0         0 $self->{_pids_c}{$_pid} = $_chunk_id;
642              
643 0         0 return;
644             },
645              
646             OUTPUT_P_NFY.$LF => sub { # Progress notification
647 0     0   0 chomp($_len = <$_DAU_R_SOCK>);
648              
649 0         0 $self->{progress}->( $_size_completed += $_len );
650              
651 0         0 return;
652             },
653              
654             OUTPUT_S_DIR.$LF => sub { # Make/get sess_dir
655 0     0   0 print {$_DAU_R_SOCK} $self->sess_dir().$LF;
  0         0  
656              
657 0         0 return;
658             },
659              
660             OUTPUT_T_DIR.$LF => sub { # Make/get tmp_dir
661 0     0   0 print {$_DAU_R_SOCK} $self->tmp_dir().$LF;
  0         0  
662              
663 0         0 return;
664             },
665              
666             OUTPUT_I_DLY.$LF => sub { # Interval delay
667 0 0   0   0 my $_tasks = $_has_user_tasks ? $self->{user_tasks} : undef;
668              
669 0         0 chomp($_task_id = <$_DAU_R_SOCK>);
670              
671             my $_interval = ($_tasks && exists $_tasks->[$_task_id]{interval})
672             ? $_tasks->[$_task_id]{interval}
673 0 0 0     0 : $self->{interval};
674              
675 0 0       0 if (!$_interval) {
    0          
676 0         0 print {$_DAU_R_SOCK} '0'.$LF;
  0         0  
677             }
678             elsif ($_interval->{max_nodes} == 1) {
679 0         0 my $_delay = $_interval->{delay};
680 0         0 my $_lapse = $_interval->{_lapse};
681 0         0 my $_time = MCE::Util::_time();
682              
683 0 0 0     0 if (!$_delay || !defined $_lapse) {
    0          
684 0         0 $_lapse = $_time;
685             }
686             elsif ($_lapse + $_delay - $_time < 0) {
687 0         0 $_lapse += int( abs($_time - $_lapse) / $_delay + 0.5 ) * $_delay;
688             }
689              
690 0         0 $_interval->{_lapse} = ($_lapse += $_delay);
691 0         0 print {$_DAU_R_SOCK} ($_lapse - $_time).$LF
  0         0  
692             }
693             else {
694             my $_max_workers = ($_tasks)
695             ? $_tasks->[$_task_id]{max_workers}
696 0 0       0 : $self->{max_workers};
697              
698 0 0       0 if (++$_delay_wid[$_task_id] > $_max_workers) {
699 0         0 $_delay_wid[$_task_id] = 1;
700             }
701              
702 0         0 my $_nodes = $_interval->{max_nodes};
703 0         0 my $_id = $_interval->{node_id};
704 0         0 my $_delay = $_interval->{delay} * $_nodes;
705              
706 0         0 my $_app_tb = $_delay * $_max_workers;
707 0         0 my $_app_st = $_interval->{_time} + ($_delay / $_nodes * $_id);
708 0         0 my $_wrk_st = ($_delay_wid[$_task_id] - 1) * $_delay + $_app_st;
709              
710 0         0 $_delay = $_wrk_st - MCE::Util::_time();
711              
712 0 0 0     0 if ($_delay < 0.0 && $_app_tb) {
713 0         0 my $_count = int($_delay * -1 / $_app_tb + 0.5) + 1;
714 0         0 $_delay += $_app_tb * $_count;
715 0 0       0 $_interval->{_time} = MCE::Util::_time() if ($_count > 2e9);
716             }
717              
718             ($_delay > 0.0)
719 0         0 ? print {$_DAU_R_SOCK} $_delay.$LF
720 0 0       0 : print {$_DAU_R_SOCK} '0'.$LF;
  0         0  
721             }
722              
723 0         0 return;
724             },
725              
726 148         41585 );
727              
728             ## -------------------------------------------------------------------------
729              
730 148         3934 local ($!, $?, $_);
731              
732 148         468 $_aborted = $_chunk_id = $_eof_flag = $_size_completed = 0;
733 148 100       870 $_has_user_tasks = (defined $self->{user_tasks}) ? 1 : 0;
734 148 100       552 $_cs_one_flag = ($self->{chunk_size} == 1) ? 1 : 0;
735              
736 148         376 $_max_retries = $self->{max_retries};
737 148         602 $_on_post_exit = $self->{on_post_exit};
738 148         365 $_on_post_run = $self->{on_post_run};
739 148         358 $_chunk_size = $self->{chunk_size};
740 148         709 $_user_output = $self->{user_output};
741 148         281 $_user_error = $self->{user_error};
742 148         273 $_single_dim = $self->{_single_dim};
743 148         378 $_sess_dir = $self->{_sess_dir};
744              
745 148 50 33     989 if (defined $_max_retries && !$_on_post_exit) {
746             $_on_post_exit = sub {
747 0     0   0 my ($self, $_e, $_retry_cnt) = @_;
748              
749 0 0       0 if ($_e->{id}) {
750 0         0 my $_cnt = $_retry_cnt + 1;
751 0         0 my $_msg = "Error: chunk $_e->{id} failed";
752              
753 0 0       0 if (defined $self->{init_relay}) {
754 0 0       0 print {*STDERR} "$_msg, retrying chunk attempt # $_cnt\n"
  0         0  
755             if ($_retry_cnt < $_max_retries);
756             }
757             else {
758             ($_retry_cnt < $_max_retries)
759 0         0 ? print {*STDERR} "$_msg, retrying chunk attempt # $_cnt\n"
760 0 0       0 : print {*STDERR} "$_msg\n";
  0         0  
761             }
762              
763 0         0 $self->restart_worker;
764             }
765 0         0 };
766             }
767              
768 148 50 66     1988 if ($_has_user_tasks && $self->{user_tasks}->[0]->{chunk_size}) {
769 0         0 $_chunk_size = $self->{user_tasks}->[0]->{chunk_size};
770             }
771              
772 148 100       417 if ($_has_user_tasks) {
773 109         245 for my $_i (0 .. @{ $self->{user_tasks} } - 1) {
  109         1252  
774             $_gather[$_i] = (defined $self->{user_tasks}->[$_i]->{gather})
775 181 100       1552 ? $self->{user_tasks}->[$_i]->{gather} : $self->{gather};
776              
777 181 100       784 $_is_c_ref[$_i] = ( ref $_gather[$_i] eq 'CODE' ) ? 1 : 0;
778 181 50       756 $_is_h_ref[$_i] = ( ref $_gather[$_i] eq 'HASH' ) ? 1 : 0;
779              
780 181 100 66     1566 $_is_q_ref[$_i] = (
781             ref $_gather[$_i] eq 'MCE::Queue' ||
782             ref $_gather[$_i] eq 'Thread::Queue' ) ? 1 : 0;
783             }
784             }
785              
786 148 100       1109 if (defined $self->{gather}) {
787 68         337 $_gather[0] = $self->{gather};
788              
789 68 100       492 $_is_c_ref[0] = ( ref $_gather[0] eq 'CODE' ) ? 1 : 0;
790 68 50       205 $_is_h_ref[0] = ( ref $_gather[0] eq 'HASH' ) ? 1 : 0;
791              
792 68 50 33     774 $_is_q_ref[0] = (
793             ref $_gather[0] eq 'MCE::Queue' ||
794             ref $_gather[0] eq 'Thread::Queue' ) ? 1 : 0;
795             }
796              
797 148 100 100     1102 if (defined $_input_data && ref $_input_data eq 'ARRAY') {
    100 66        
798 45         80 $_input_size = @{ $_input_data };
  45         149  
799 45         105 $_offset_pos = 0;
800             }
801             elsif (defined $_input_data && ref $_input_data eq 'HASH') {
802 3         13 $_input_size = scalar( keys %{ $_input_data } );
  3         15  
803 3         24 $_offset_pos = 0;
804             }
805             else {
806 100         202 $_input_size = $_offset_pos = 0;
807             }
808              
809             ## Set STDOUT/STDERR to user parameters.
810              
811 148 50       950 if (defined $self->{stdout_file}) {
812             open $_MCE_STDOUT, '>>', $self->{stdout_file}
813 0 0       0 or die $self->{stdout_file} . ": $!\n";
814 0         0 binmode $_MCE_STDOUT;
815             }
816             else {
817 148         588 $_MCE_STDOUT = \*STDOUT;
818             }
819              
820 148 50       873 if (defined $self->{stderr_file}) {
821             open $_MCE_STDERR, '>>', $self->{stderr_file}
822 0 0       0 or die $self->{stderr_file} . ": $!\n";
823 0         0 binmode $_MCE_STDERR;
824             }
825             else {
826 148         304 $_MCE_STDERR = \*STDERR;
827             }
828              
829             ## Autoflush STDERR-STDOUT handles if not specified or requested.
830              
831             {
832 148         241 local $!;
  148         533  
833              
834             $_MCE_STDERR->autoflush(1)
835 148 50 33     2550 if ( !exists $self->{flush_stderr} || $self->{flush_stderr} );
836              
837             $_MCE_STDOUT->autoflush(1)
838 148 50 33     12281 if ( !exists $self->{flush_stdout} || $self->{flush_stdout} );
839             }
840              
841             ## -------------------------------------------------------------------------
842              
843             ## Output event loop.
844              
845 148         4451 my $_channels = $self->{_dat_r_sock};
846 148         445 my $_func;
847              
848             $_win32_ipc = (
849             $ENV{'PERL_MCE_IPC'} eq 'win32' ||
850             defined($self->{max_retries}) ||
851             $INC{'MCE/Child.pm'} ||
852 148   33     2665 $INC{'MCE/Hobo.pm'}
853             );
854              
855 148         614 $_BSB_W_SOCK = $self->{_bsb_w_sock};
856 148         661 $_BSB_R_SOCK = $self->{_bsb_r_sock};
857 148         332 $_DAT_R_SOCK = $self->{_dat_r_sock}->[0];
858              
859 148   33     1175 $_RS = $self->{RS} || $/;
860 148         350 $_O_SEP = $\; local $\ = undef;
  148         508  
861 148         287 $_I_SEP = $/; local $/ = $LF;
  148         610  
862              
863 148 50 33     1812 $_RS_FLG = (!$_RS || $_RS ne $LF) ? 1 : 0;
864 148 50       513 $_O_FLG = (defined $_O_SEP) ? 1 : 0;
865 148 50 33     1169 $_I_FLG = (!$_I_SEP || $_I_SEP ne $LF) ? 1 : 0;
866              
867             ## Call module's loop_begin routine for modules plugged into MCE.
868              
869 148         305 for my $_p (@{ $_plugin_loop_begin }) {
  148         1228  
870 103         939 $_p->($self, \$_DAU_R_SOCK);
871             }
872              
873             ## Wait on requests *with* timeout capability. Exit loop when all workers
874             ## have completed processing or exited prematurely.
875              
876             _LOOP:
877              
878 148 50 33     1606 if ($self->{loop_timeout} && @{ $self->{_tids} } == 0 && $^O ne 'MSWin32') {
  0 50 33     0  
879 0         0 my ($_list, $_timeout) = ($self->{_pids}, $self->{loop_timeout});
880 0         0 my ($_DAT_W_SOCK, $_pid) = ($self->{_dat_w_sock}->[0]);
881              
882 0         0 $self->{_pids_c} = {}; # Chunk ID notification
883              
884 0 0       0 $_timeout = 5 if $_timeout < 5;
885              
886             local $SIG{ALRM} = sub {
887 0     0   0 alarm 0; local ($!, $?);
  0         0  
888              
889 0         0 for my $i (0 .. @{ $_list }) {
  0         0  
890 0 0       0 if ($_pid = $_list->[$i]) {
891 0 0       0 if (waitpid($_pid, _WNOHANG)) {
892              
893 0         0 $_list->[$i] = undef;
894              
895 0 0       0 if ($? > abs($self->{_wrk_status})) {
896 0         0 $self->{_wrk_status} = $?;
897             }
898              
899 0         0 my $_task_id = $self->{_pids_t}{$_pid};
900 0         0 my $_wid = $self->{_pids_w}{$_pid};
901              
902 0         0 $self->{_total_exited} += 1;
903 0         0 $self->{_total_running} -= 1;
904 0         0 $self->{_total_workers} -= 1;
905              
906 0 0 0     0 if ($_has_user_tasks && $_task_id >= 0) {
907 0         0 $self->{_task}->[$_task_id]->{_total_running} -= 1;
908 0         0 $self->{_task}->[$_task_id]->{_total_workers} -= 1;
909             }
910              
911             my $_total_running = ($_has_user_tasks)
912             ? $self->{_task}->[$_task_id]->{_total_running}
913 0 0       0 : $self->{_total_running};
914              
915 0 0 0     0 if ($_task_id == 0 && defined $_syn_flag && $_sync_cnt) {
      0        
916 0 0       0 if ($_sync_cnt == $_total_running) {
917 0         0 for my $_i (1 .. $_total_running) {
918 0         0 syswrite($_BSB_W_SOCK, $LF);
919             }
920 0         0 undef $_syn_flag;
921             }
922             }
923              
924 0 0       0 _task_end($self, $_task_id) unless $_total_running;
925              
926 0 0       0 if (my $_cid = $self->{_pids_c}{$_pid}) {
927 0         0 warn "Error: process $_pid has ended prematurely\n",
928             "Error: chunk $_cid failed\n";
929              
930 0 0       0 if ($_cid > $self->{_relayed}) {
931 0         0 local $SIG{CHLD} = 'IGNORE';
932 0         0 my $_pid = fork;
933              
934 0 0 0     0 if (defined $_pid && $_pid == 0) {
935 0         0 delete $self->{max_retries};
936              
937 0         0 $self->{_chunk_id} = $_cid;
938 0         0 $self->{_task_id} = $_task_id;
939 0         0 $self->{_wid} = $_wid;
940              
941 0         0 eval 'MCE::relay';
942              
943 0         0 CORE::kill('KILL', $$);
944 0         0 CORE::exit(0);
945             }
946             }
947             }
948              
949 0         0 delete $self->{_pids_c}{$_pid};
950 0         0 delete $self->{_pids_t}{$_pid};
951 0         0 delete $self->{_pids_w}{$_pid};
952             }
953             }
954             }
955              
956 0         0 print {$_DAT_W_SOCK} 'NOOP'.$LF . '0'.$LF;
  0         0  
957 0         0 };
958              
959 0         0 while ( $self->{_total_running} ) {
960 0         0 alarm($_timeout); $_func = <$_DAT_R_SOCK>, alarm(0);
  0         0  
961 0         0 $_DAU_R_SOCK = $_channels->[ <$_DAT_R_SOCK> ];
962              
963 0 0       0 if (exists $_core_output_function{$_func}) {
    0          
964 0         0 $_core_output_function{$_func}();
965             } elsif (exists $_plugin_function->{$_func}) {
966 0         0 $_plugin_function->{$_func}();
967             }
968             }
969              
970 0         0 delete $self->{_pids_c};
971             }
972              
973             ## Wait on requests *without* timeout capability.
974              
975             elsif ($^O eq 'MSWin32') {
976 0 0       0 MCE::Util::_nonblocking($_DAT_R_SOCK, 1) if $_win32_ipc;
977              
978 0         0 while ($self->{_total_running}) {
979 0         0 MCE::Util::_sysread2($_DAT_R_SOCK, $_func, 8);
980 0 0       0 last() unless length($_func) == 8;
981 0         0 $_DAU_R_SOCK = $_channels->[ substr($_func, -2, 2, '') ];
982              
983 0 0       0 if (exists $_core_output_function{$_func}) {
    0          
984 0         0 $_core_output_function{$_func}();
985             } elsif (exists $_plugin_function->{$_func}) {
986 0         0 $_plugin_function->{$_func}();
987             }
988             }
989              
990 0 0       0 MCE::Util::_nonblocking($_DAT_R_SOCK, 0) if $_win32_ipc;
991             }
992             else {
993 148         601 while ($self->{_total_running}) {
994 3869         531548 $_func = <$_DAT_R_SOCK>;
995 3869 50       10988 last() unless length($_func) == 6;
996 3869         9622 $_DAU_R_SOCK = $_channels->[ <$_DAT_R_SOCK> ];
997              
998 3869 100       12252 if (exists $_core_output_function{$_func}) {
    50          
999 2357         5831 $_core_output_function{$_func}();
1000             } elsif (exists $_plugin_function->{$_func}) {
1001 1512         5379 $_plugin_function->{$_func}();
1002             }
1003             }
1004             }
1005              
1006             ## Call module's loop_end routine for modules plugged into MCE.
1007              
1008 148         293 for my $_p (@{ $_plugin_loop_end }) {
  148         590  
1009 103         708 $_p->($self);
1010             }
1011              
1012             ## Call on_post_run callback.
1013              
1014 148 50       563 $_on_post_run->($self, $self->{_status}) if (defined $_on_post_run);
1015              
1016             ## Close opened sendto file handles.
1017              
1018 148         862 _sendto_fhs_close();
1019              
1020             ## Close MCE STDOUT/STDERR handles.
1021              
1022 148         34906 eval q{
1023             close $_MCE_STDOUT if (fileno $_MCE_STDOUT > 2);
1024             close $_MCE_STDERR if (fileno $_MCE_STDERR > 2);
1025             };
1026              
1027 148         30258 return;
1028             }
1029              
1030             1;
1031              
1032             __END__
1033              
1034             ###############################################################################
1035             ## ----------------------------------------------------------------------------
1036             ## Module usage.
1037             ##
1038             ###############################################################################
1039              
1040             =head1 NAME
1041              
1042             MCE::Core::Manager - Core methods for the manager process
1043              
1044             =head1 VERSION
1045              
1046             This document describes MCE::Core::Manager version 1.902
1047              
1048             =head1 DESCRIPTION
1049              
1050             This package provides the loop and relevant methods used internally by the
1051             manager process.
1052              
1053             There is no public API.
1054              
1055             =head1 AUTHOR
1056              
1057             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
1058              
1059             =cut
1060