File Coverage

blib/lib/MCE/Step.pm
Criterion Covered Total %
statement 256 348 73.5
branch 116 238 48.7
condition 45 97 46.3
subroutine 21 26 80.7
pod 5 9 55.5
total 443 718 61.7


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Parallel step model for building creative steps.
4             ##
5             ###############################################################################
6              
7             package MCE::Step;
8              
9 12     12   624250 use strict;
  12         106  
  12         321  
10 12     12   54 use warnings;
  12         19  
  12         423  
11              
12 12     12   60 no warnings qw( threads recursion uninitialized );
  12         25  
  12         607  
13              
14             our $VERSION = '1.889';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 12     12   71 use Scalar::Util qw( looks_like_number );
  12         13  
  12         524  
21              
22 12     12   5749 use MCE;
  12         36  
  12         59  
23 12     12   5975 use MCE::Queue;
  12         24  
  12         46  
24              
25             our @CARP_NOT = qw( MCE );
26              
27             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
28              
29             sub CLONE {
30 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
31             }
32              
33             ###############################################################################
34             ## ----------------------------------------------------------------------------
35             ## Import routine.
36             ##
37             ###############################################################################
38              
39             my ($_MCE, $_def, $_params, $_tag) = ({}, {}, {}, 'MCE::Step');
40             my ($_prev_c, $_prev_n, $_prev_t, $_prev_w) = ({}, {}, {}, {});
41             my ($_user_tasks, $_queue, $_last_task_id, $_lkup) = ({}, {}, {}, {});
42              
43             sub import {
44 12     12   162 my ($_class, $_pkg) = (shift, caller);
45              
46 12         42 my $_p = $_def->{$_pkg} = {
47             MAX_WORKERS => 'auto',
48             CHUNK_SIZE => 'auto',
49             };
50              
51             ## Import functions.
52 12 50       37 if ($_pkg !~ /^MCE::/) {
53 12     12   90 no strict 'refs'; no warnings 'redefine';
  12     12   18  
  12         337  
  12         54  
  12         30  
  12         5050  
54 12         24 *{ $_pkg.'::mce_step_f' } = \&run_file;
  12         66  
55 12         90 *{ $_pkg.'::mce_step_s' } = \&run_seq;
  12         125  
56 12         25 *{ $_pkg.'::mce_step' } = \&run;
  12         37  
57             }
58              
59             ## Process module arguments.
60 12         141 while ( my $_argument = shift ) {
61 0         0 my $_arg = lc $_argument;
62              
63 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
64 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
65 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
66 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
67 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
68 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
69 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
70              
71 0 0       0 shift, next if ( $_arg eq 'fast' ); # ignored
72              
73             ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
74 0 0       0 if ( $_arg eq 'sereal' ) {
75 0 0       0 if ( shift eq '0' ) {
76 0         0 require Storable;
77 0         0 $_p->{FREEZE} = \&Storable::freeze;
78 0         0 $_p->{THAW} = \&Storable::thaw;
79             }
80 0         0 next;
81             }
82              
83 0         0 _croak("Error: ($_argument) invalid module option");
84             }
85              
86 12         58 $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});
87              
88 12         42 MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
89             MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
90 12 50       36 unless ($_p->{CHUNK_SIZE} eq 'auto');
91              
92 12         166 return;
93             }
94              
95             ###############################################################################
96             ## ----------------------------------------------------------------------------
97             ## The task end callback for when a task completes.
98             ##
99             ###############################################################################
100              
101             sub _task_end {
102              
103 22     22   48 my ($_mce, $_task_id, $_task_name) = @_;
104 22         77 my $_pid = $_mce->{_init_pid}.'.'.$_mce->{_caller};
105              
106 22 100       61 if (defined $_mce->{user_tasks}->[$_task_id + 1]) {
107 10         16 my $n_workers = $_mce->{user_tasks}->[$_task_id + 1]->{max_workers};
108 10         84 $_queue->{$_pid}[$_task_id]->enqueue((undef) x $n_workers);
109             }
110              
111             $_params->{task_end}->($_mce, $_task_id, $_task_name)
112 22 50 33     50 if (exists $_params->{task_end} && ref $_params->{task_end} eq 'CODE');
113              
114 22         62 return;
115             }
116              
117             ###############################################################################
118             ## ----------------------------------------------------------------------------
119             ## Methods for MCE; step, enq, enqp, await.
120             ##
121             ###############################################################################
122              
123             {
124 12     12   80 no warnings 'redefine';
  12         18  
  12         40880  
125              
126             sub MCE::step {
127              
128 21 50   21 0 427 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  21         46  
129 21         59 my $_pid = $self->{_init_pid}.'.'.$self->{_caller};
130              
131             _croak('MCE::step: method is not allowed by the manager process')
132 21 50       50 unless ($self->{_wid});
133              
134 21         28 my $_task_id = $self->{_task_id};
135              
136 21 50       73 if ($_task_id < $_last_task_id->{$_pid}) {
137 21         122 $_queue->{$_pid}[$_task_id]->enqueue($self->freeze([ @_ ]));
138             }
139             else {
140 0         0 _croak('MCE::step: method is not allowed by the last task');
141             }
142              
143 21         81 return;
144             }
145              
146             ############################################################################
147              
148             sub MCE::enq {
149              
150 0 0   0 0 0 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  0         0  
151 0         0 my $_pid = $self->{_init_pid}.'.'.$self->{_caller};
152 0         0 my $_name = shift;
153              
154             _croak('MCE::enq: method is not allowed by the manager process')
155 0 0       0 unless ($self->{_wid});
156             _croak('MCE::enq: (task_name) is not specified or valid')
157 0 0 0     0 if (!defined $_name || !exists $_lkup->{$_pid}{$_name});
158             _croak('MCE::enq: stepping to same task or backwards is not allowed')
159 0 0       0 if ($_lkup->{$_pid}{$_name} <= $self->{_task_id});
160              
161 0         0 my $_task_id = $_lkup->{$_pid}{$_name} - 1;
162              
163 0 0       0 if ($_task_id < $_last_task_id->{$_pid}) {
164 0 0       0 if (scalar @_ > 1) {
165 0         0 my @_items = map { $self->freeze([ $_ ]) } @_;
  0         0  
166 0         0 $_queue->{$_pid}[$_task_id]->enqueue(@_items);
167             }
168             else {
169 0         0 $_queue->{$_pid}[$_task_id]->enqueue($self->freeze([ @_ ]));
170             }
171             }
172             else {
173 0         0 _croak('MCE::enq: method is not allowed by the last task');
174             }
175              
176 0         0 return;
177             }
178              
179             ############################################################################
180              
181             sub MCE::enqp {
182              
183 0 0   0 0 0 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  0         0  
184 0         0 my $_pid = $self->{_init_pid}.'.'.$self->{_caller};
185 0         0 my ($_name, $_p) = (shift, shift);
186              
187             _croak('MCE::enqp: method is not allowed by the manager process')
188 0 0       0 unless ($self->{_wid});
189             _croak('MCE::enqp: (task_name) is not specified or valid')
190 0 0 0     0 if (!defined $_name || !exists $_lkup->{$_pid}{$_name});
191             _croak('MCE::enqp: stepping to same task or backwards is not allowed')
192 0 0       0 if ($_lkup->{$_pid}{$_name} <= $self->{_task_id});
193 0 0 0     0 _croak('MCE::enqp: (priority) is not an integer')
194             if (!looks_like_number($_p) || int($_p) != $_p);
195              
196 0         0 my $_task_id = $_lkup->{$_pid}{$_name} - 1;
197              
198 0 0       0 if ($_task_id < $_last_task_id->{$_pid}) {
199 0 0       0 if (scalar @_ > 1) {
200 0         0 my @_items = map { $self->freeze([ $_ ]) } @_;
  0         0  
201 0         0 $_queue->{$_pid}[$_task_id]->enqueuep($_p, @_items);
202             }
203             else {
204 0         0 $_queue->{$_pid}[$_task_id]->enqueuep($_p, $self->freeze([ @_ ]));
205             }
206             }
207             else {
208 0         0 _croak('MCE::enqp: method is not allowed by the last task');
209             }
210              
211 0         0 return;
212             }
213              
214             ############################################################################
215              
216             sub MCE::await {
217              
218 0 0   0 0 0 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  0         0  
219 0         0 my $_pid = $self->{_init_pid}.'.'.$self->{_caller};
220 0         0 my $_name = shift;
221              
222             _croak('MCE::await: method is not allowed by the manager process')
223 0 0       0 unless ($self->{_wid});
224             _croak('MCE::await: (task_name) is not specified or valid')
225 0 0 0     0 if (!defined $_name || !exists $_lkup->{$_pid}{$_name});
226             _croak('MCE::await: awaiting from same task or backwards is not allowed')
227 0 0       0 if ($_lkup->{$_pid}{$_name} <= $self->{_task_id});
228              
229 0   0     0 my $_task_id = $_lkup->{$_pid}{$_name} - 1; my $_t = shift || 0;
  0         0  
230              
231 0 0 0     0 _croak('MCE::await: (threshold) is not an integer')
232             if (!looks_like_number($_t) || int($_t) != $_t);
233              
234 0 0       0 if ($_task_id < $_last_task_id->{$_pid}) {
235 0         0 $_queue->{$_pid}[$_task_id]->await($_t);
236             } else {
237 0         0 _croak('MCE::await: method is not allowed by the last task');
238             }
239              
240 0         0 return;
241             }
242              
243             }
244              
245             ###############################################################################
246             ## ----------------------------------------------------------------------------
247             ## Init and finish routines.
248             ##
249             ###############################################################################
250              
251             sub init (@) {
252              
253 13 50 33 13 1 1250 shift if (defined $_[0] && $_[0] eq 'MCE::Step');
254 13         90 my $_pkg = "$$.$_tid.".caller();
255              
256 13 50       149 $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };
257              
258 13         43 @_ = ();
259              
260 13         30 return;
261             }
262              
263             sub finish (@) {
264              
265 25 50 33 25 1 6555 shift if (defined $_[0] && $_[0] eq 'MCE::Step');
266 25 100       156 my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();
267              
268 25 100 66     304 if ( $_pkg eq 'MCE' ) {
    100          
269 12         54 for my $_k ( keys %{ $_MCE } ) { MCE::Step->finish($_k, 1); }
  12         266  
  9         189  
270             }
271             elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
272 4 50       66 $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};
273              
274 4         60 delete $_lkup->{$_pkg};
275 4         27 delete $_last_task_id->{$_pkg};
276              
277 4         28 delete $_user_tasks->{$_pkg};
278 4         36 delete $_prev_c->{$_pkg};
279 4         26 delete $_prev_n->{$_pkg};
280 4         18 delete $_prev_t->{$_pkg};
281 4         21 delete $_prev_w->{$_pkg};
282 4         60 delete $_MCE->{$_pkg};
283              
284 4 50       35 if (defined $_queue->{$_pkg}) {
285 4         4395 local $_;
286 4         14 $_->DESTROY() for (@{ $_queue->{$_pkg} });
  4         27  
287 4         23 delete $_queue->{$_pkg};
288             }
289             }
290              
291 25         73 @_ = ();
292              
293 25         69 return;
294             }
295              
296             ###############################################################################
297             ## ----------------------------------------------------------------------------
298             ## Parallel step with MCE -- file.
299             ##
300             ###############################################################################
301              
302             sub run_file (@) {
303              
304 4 50 33 4 1 2650 shift if (defined $_[0] && $_[0] eq 'MCE::Step');
305              
306 4 50       6 my ($_file, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  4         16  
307 4         18 my $_pid = "$$.$_tid.".caller();
308              
309 4 50       14 if (defined (my $_p = $_params->{$_pid})) {
310 4 50       10 delete $_p->{input_data} if (exists $_p->{input_data});
311 4 50       8 delete $_p->{sequence} if (exists $_p->{sequence});
312             }
313             else {
314 0         0 $_params->{$_pid} = {};
315             }
316              
317 4         24 for my $_i ($_start_pos .. @_ - 1) {
318 8         16 my $_r = ref $_[$_i];
319 8 100 66     72 if ($_r eq '' || $_r eq 'SCALAR' || $_r =~ /^(?:GLOB|FileHandle|IO::)/) {
      100        
320 4         6 $_file = $_[$_i]; $_pos = $_i;
  4         6  
321 4         6 last;
322             }
323             }
324              
325 4 100 66     54 if (defined $_file && ref $_file eq '' && $_file ne '') {
    50 66        
      33        
326 2 50       74 _croak("$_tag: ($_file) does not exist") unless (-e $_file);
327 2 50       26 _croak("$_tag: ($_file) is not readable") unless (-r $_file);
328 2 50       18 _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
329 2         38 $_params->{$_pid}{_file} = $_file;
330             }
331             elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
332 2         4 $_params->{$_pid}{_file} = $_file;
333             }
334             else {
335 0         0 _croak("$_tag: (file) is not specified or valid");
336             }
337              
338 4 50       8 if (defined $_pos) {
339 4         26 pop @_ for ($_pos .. @_ - 1);
340             }
341              
342 4         12 return run(@_);
343             }
344              
345             ###############################################################################
346             ## ----------------------------------------------------------------------------
347             ## Parallel step with MCE -- sequence.
348             ##
349             ###############################################################################
350              
351             sub run_seq (@) {
352              
353 2 50 33 2 1 1306 shift if (defined $_[0] && $_[0] eq 'MCE::Step');
354              
355 2 50       4 my ($_begin, $_end, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  2         10  
356 2         8 my $_pid = "$$.$_tid.".caller();
357              
358 2 50       6 if (defined (my $_p = $_params->{$_pid})) {
359 2 50       6 delete $_p->{sequence} if (exists $_p->{sequence});
360 2 50       6 delete $_p->{input_data} if (exists $_p->{input_data});
361 2 50       8 delete $_p->{_file} if (exists $_p->{_file});
362             }
363             else {
364 0         0 $_params->{$_pid} = {};
365             }
366              
367 2         6 for my $_i ($_start_pos .. @_ - 1) {
368 4         8 my $_r = ref $_[$_i];
369              
370 4 50 66     44 if ($_r eq '' || $_r =~ /^Math::/ || $_r eq 'HASH' || $_r eq 'ARRAY') {
      66        
      33        
371 2         4 $_pos = $_i;
372              
373 2 50 33     8 if ($_r eq '' || $_r =~ /^Math::/) {
    0          
    0          
374 2         4 $_begin = $_[$_pos], $_end = $_[$_pos + 1];
375             $_params->{$_pid}{sequence} = [
376 2         12 $_[$_pos], $_[$_pos + 1], $_[$_pos + 2], $_[$_pos + 3]
377             ];
378             }
379             elsif ($_r eq 'HASH') {
380 0         0 $_begin = $_[$_pos]->{begin}, $_end = $_[$_pos]->{end};
381 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
382             }
383             elsif ($_r eq 'ARRAY') {
384 0         0 $_begin = $_[$_pos]->[0], $_end = $_[$_pos]->[1];
385 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
386             }
387              
388 2         4 last;
389             }
390             }
391              
392             _croak("$_tag: (sequence) is not specified or valid")
393 2 50       6 unless (exists $_params->{$_pid}{sequence});
394 2 50       8 _croak("$_tag: (begin) is not specified for sequence")
395             unless (defined $_begin);
396 2 50       4 _croak("$_tag: (end) is not specified for sequence")
397             unless (defined $_end);
398              
399 2         4 $_params->{$_pid}{sequence_run} = undef;
400              
401 2 50       6 if (defined $_pos) {
402 2         6 pop @_ for ($_pos .. @_ - 1);
403             }
404              
405 2         6 return run(@_);
406             }
407              
408             ###############################################################################
409             ## ----------------------------------------------------------------------------
410             ## Parallel step with MCE.
411             ##
412             ###############################################################################
413              
414             sub run (@) {
415              
416 21 50 33 21 1 3753 shift if (defined $_[0] && $_[0] eq 'MCE::Step');
417              
418 21 100       86 my $_pkg = caller() eq 'MCE::Step' ? caller(1) : caller();
419 21         88 my $_pid = "$$.$_tid.$_pkg";
420              
421 21 100       64 if (ref $_[0] eq 'HASH') {
422 14 50       50 $_params->{$_pid} = {} unless defined $_params->{$_pid};
423 14         16 for my $_p (keys %{ $_[0] }) {
  14         94  
424 14         32 $_params->{$_pid}{$_p} = $_[0]->{$_p};
425             }
426              
427 14         24 shift;
428             }
429              
430             ## -------------------------------------------------------------------------
431              
432 21         34 my (@_code, @_name, @_thrs, @_wrks); my $_init_mce = 0; my $_pos = 0;
  21         36  
  21         28  
433              
434 21         40 %{ $_lkup->{$_pid} } = ();
  21         91  
435              
436 21         69 while (ref $_[0] eq 'CODE') {
437 35         61 push @_code, $_[0];
438              
439 35 50       91 if (defined (my $_p = $_params->{$_pid})) {
440             push @_name, (ref $_p->{task_name} eq 'ARRAY')
441 35 100       103 ? $_p->{task_name}->[$_pos] : undef;
442             push @_thrs, (ref $_p->{use_threads} eq 'ARRAY')
443 35 50       80 ? $_p->{use_threads}->[$_pos] : undef;
444             push @_wrks, (ref $_p->{max_workers} eq 'ARRAY')
445 35 100       86 ? $_p->{max_workers}->[$_pos] : undef;
446             }
447              
448 35 100       90 $_lkup->{$_pid}{ $_name[ $_pos ] } = $_pos if (defined $_name[ $_pos ]);
449              
450             $_init_mce = 1 if (
451             !defined $_prev_c->{$_pid}[$_pos] ||
452 35 100 66     137 $_prev_c->{$_pid}[$_pos] != $_code[$_pos]
453             );
454              
455 35 100       113 $_init_mce = 1 if ($_prev_n->{$_pid}[$_pos] ne $_name[$_pos]);
456 35 50       86 $_init_mce = 1 if ($_prev_t->{$_pid}[$_pos] ne $_thrs[$_pos]);
457 35 100       91 $_init_mce = 1 if ($_prev_w->{$_pid}[$_pos] ne $_wrks[$_pos]);
458              
459 35         57 $_prev_c->{$_pid}[$_pos] = $_code[$_pos];
460 35         50 $_prev_n->{$_pid}[$_pos] = $_name[$_pos];
461 35         53 $_prev_t->{$_pid}[$_pos] = $_thrs[$_pos];
462 35         78 $_prev_w->{$_pid}[$_pos] = $_wrks[$_pos];
463              
464 35         42 shift; $_pos++;
  35         75  
465             }
466              
467 21 50       50 if (defined $_prev_c->{$_pid}[$_pos]) {
468 0         0 pop @{ $_prev_c->{$_pid} } for ($_pos .. $#{ $_prev_c->{$_pid } });
  0         0  
  0         0  
469 0         0 pop @{ $_prev_n->{$_pid} } for ($_pos .. $#{ $_prev_n->{$_pid } });
  0         0  
  0         0  
470 0         0 pop @{ $_prev_t->{$_pid} } for ($_pos .. $#{ $_prev_t->{$_pid } });
  0         0  
  0         0  
471 0         0 pop @{ $_prev_w->{$_pid} } for ($_pos .. $#{ $_prev_w->{$_pid } });
  0         0  
  0         0  
472              
473 0         0 $_init_mce = 1;
474             }
475              
476 21 50       53 return unless (scalar @_code);
477              
478             ## -------------------------------------------------------------------------
479              
480 21         25 my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
  21         57  
481 21         38 my $_r = ref $_[0];
482              
483 21 100 66     126 if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|GLOB|FileHandle|IO::)/) {
484 4         16 $_input_data = shift;
485             }
486              
487 21 50       47 if (defined (my $_p = $_params->{$_pid})) {
488             $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
489 21 100 66     139 if (exists $_p->{max_workers} && ref $_p->{max_workers} ne 'ARRAY');
490              
491 21 100 100     108 delete $_p->{sequence} if (defined $_input_data || scalar @_);
492 21 50       53 delete $_p->{user_func} if (exists $_p->{user_func});
493 21 50       55 delete $_p->{user_tasks} if (exists $_p->{user_tasks});
494             }
495              
496 21 100 66     86 if (@_code > 1 && $_max_workers > 1) {
497 14         38 $_max_workers = int($_max_workers / @_code + 0.5) + 1;
498             }
499              
500             my $_chunk_size = MCE::_parse_chunk_size(
501 21         112 $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
502             $_input_data, scalar @_
503             );
504              
505 21 50       83 if (defined (my $_p = $_params->{$_pid})) {
506 21 100       54 if (exists $_p->{_file}) {
507 4         8 $_input_data = delete $_p->{_file};
508             } else {
509 17 50       50 $_input_data = $_p->{input_data} if exists $_p->{input_data};
510             }
511             }
512              
513             ## -------------------------------------------------------------------------
514              
515 21         97 MCE::_save_state($_MCE->{$_pid});
516              
517 21 100 66     128 if ($_init_mce || !exists $_queue->{$_pid}) {
518 13 50       49 $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
519 13 50       94 $_queue->{$_pid} = [] if (!defined $_queue->{$_pid});
520              
521 13         34 my $_Q = $_queue->{$_pid};
522 13         28 pop(@{ $_Q })->DESTROY for (@_code .. @{ $_Q });
  13         53  
  0         0  
523              
524 6         36 push @{ $_Q }, MCE::Queue->new(await => 1)
525 13         26 for (@{ $_Q } .. @_code - 2);
  13         86  
526              
527 13         45 $_last_task_id->{$_pid} = @_code - 1;
528              
529             ## must clear arrays for nested session to work with Perl < v5.14
530 13         154 _gen_user_tasks($_pid,$_Q, [@_code],[@_name],[@_thrs],[@_wrks], $_chunk_size);
531              
532 13         60 @_code = @_name = @_thrs = @_wrks = ();
533              
534             my %_opts = (
535             max_workers => $_max_workers, task_name => $_tag,
536 13         80 user_tasks => $_user_tasks->{$_pid}, task_end => \&_task_end,
537             );
538              
539 13 50       53 if (defined (my $_p = $_params->{$_pid})) {
540 13         26 local $_;
541              
542 13         20 for (keys %{ $_p }) {
  13         49  
543 25 100 100     118 next if ($_ eq 'max_workers' && ref $_p->{max_workers} eq 'ARRAY');
544 19 100 66     88 next if ($_ eq 'task_name' && ref $_p->{task_name} eq 'ARRAY');
545 13 50 33     69 next if ($_ eq 'use_threads' && ref $_p->{use_threads} eq 'ARRAY');
546              
547 13 50       113 next if ($_ eq 'chunk_size');
548 13 50       101 next if ($_ eq 'input_data');
549 13 50       57 next if ($_ eq 'sequence_run');
550 13 50       33 next if ($_ eq 'task_end');
551              
552             _croak("$_tag: ($_) is not a valid constructor argument")
553 13 50       74 unless (exists $MCE::_valid_fields_new{$_});
554              
555 13         46 $_opts{$_} = $_p->{$_};
556             }
557             }
558              
559 13         47 for my $_k (qw/ tmp_dir freeze thaw init_relay use_threads /) {
560             $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
561 65 50 33     158 if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
562             }
563              
564 13         148 $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
565             }
566             else {
567             ## Workers may persist after running. Thus, updating the MCE instance.
568             ## These options do not require respawning.
569 8 50       20 if (defined (my $_p = $_params->{$_pid})) {
570 8         16 for my $_k (qw(
571             RS interval stderr_file stdout_file user_error user_output
572             job_delay submit_delay on_post_exit on_post_run user_args
573             flush_file flush_stderr flush_stdout gather max_retries
574             )) {
575 128 100       244 $_MCE->{$_pid}{$_k} = $_p->{$_k} if (exists $_p->{$_k});
576             }
577             }
578             }
579              
580             ## -------------------------------------------------------------------------
581              
582 21 100       36 my @_a; my $_wa = wantarray; $_MCE->{$_pid}{gather} = \@_a if (defined $_wa);
  21         34  
  21         56  
583              
584 21 100       62 if (defined $_input_data) {
    100          
585 8         14 @_ = ();
586 8         44 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
587 7         42 delete $_MCE->{$_pid}{input_data};
588             }
589             elsif (scalar @_) {
590 6         78 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
591 2         14 delete $_MCE->{$_pid}{input_data};
592             }
593             else {
594 7 100 66     48 if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
595             $_MCE->{$_pid}->run({
596             chunk_size => $_chunk_size,
597             sequence => $_params->{$_pid}{sequence}
598 2         16 }, 0);
599 2 50       10 if (exists $_params->{$_pid}{sequence_run}) {
600 2         4 delete $_params->{$_pid}{sequence_run};
601 2         4 delete $_params->{$_pid}{sequence};
602             }
603 2         4 delete $_MCE->{$_pid}{sequence};
604             }
605             else {
606 5         30 $_MCE->{$_pid}->run({ chunk_size => $_chunk_size }, 0);
607             }
608             }
609              
610 12         150 MCE::_restore_state();
611              
612             # destroy queue(s) if MCE::run requested workers to shutdown
613 12 50       38 if (!$_MCE->{$_pid}{_spawned}) {
614 0         0 $_->DESTROY() for @{ $_queue->{$_pid} };
  0         0  
615 0         0 delete $_queue->{$_pid};
616             }
617              
618 12 100       40 delete $_MCE->{$_pid}{gather} if (defined $_wa);
619              
620 12 100       246 return ((defined $_wa) ? @_a : ());
621             }
622              
623             ###############################################################################
624             ## ----------------------------------------------------------------------------
625             ## Private methods.
626             ##
627             ###############################################################################
628              
629             sub _croak {
630              
631 0     0   0 goto &MCE::_croak;
632             }
633              
634             sub _gen_user_func {
635              
636 6     6   18 my ($_qref, $_cref, $_chunk_size, $_pos) = @_;
637              
638 6         12 my $_q_in = $_qref->[$_pos - 1];
639 6         6 my $_code = $_cref->[$_pos];
640              
641             return sub {
642 10     10   23 my ($_mce) = @_;
643              
644 10         111 $_mce->{_next_jmp} = sub { goto _MCE_STEP__NEXT; };
  0         0  
645 10         53 $_mce->{_last_jmp} = sub { goto _MCE_STEP__LAST; };
  0         0  
646              
647             _MCE_STEP__NEXT:
648              
649 10         61 while (defined (local $_ = $_q_in->dequeue())) {
650 21         144 my $_args = $_mce->thaw($_); $_ = $_args->[0];
  21         50  
651 21         30 $_code->($_mce, @{ $_args });
  21         133  
652             }
653              
654             _MCE_STEP__LAST:
655              
656 10         40 return;
657 6         54 };
658             }
659              
660             sub _gen_user_tasks {
661              
662 13     13   65 my ($_pid, $_qref, $_cref, $_nref, $_tref, $_wref, $_chunk_size) = @_;
663              
664 13         21 @{ $_user_tasks->{$_pid} } = ();
  13         45  
665              
666 13         124 push @{ $_user_tasks->{$_pid} }, {
667             task_name => $_nref->[0],
668             use_threads => $_tref->[0],
669             max_workers => $_wref->[0],
670 30     30   9119 user_func => sub { $_cref->[0]->(@_); return; }
  30         81  
671 13         20 };
672              
673 13         33 for my $_pos (1 .. @{ $_cref } - 1) {
  13         47  
674 6         12 push @{ $_user_tasks->{$_pid} }, {
  6         30  
675             task_name => $_nref->[$_pos],
676             use_threads => $_tref->[$_pos],
677             max_workers => $_wref->[$_pos],
678             user_func => _gen_user_func(
679             $_qref, $_cref, $_chunk_size, $_pos
680             )
681             };
682             }
683              
684 13         26 return;
685             }
686              
687             1;
688              
689             __END__