File Coverage

blib/lib/MCE/Step.pm
Criterion Covered Total %
statement 256 355 72.1
branch 117 242 48.3
condition 45 100 45.0
subroutine 21 27 77.7
pod 5 9 55.5
total 444 733 60.5


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Parallel step model for building creative steps.
4             ##
5             ###############################################################################
6              
7             package MCE::Step;
8              
9 12     12   1462627 use strict;
  12         35  
  12         514  
10 12     12   69 use warnings;
  12         90  
  12         802  
11              
12 12     12   84 no warnings qw( threads recursion uninitialized );
  12         29  
  12         843  
13              
14             our $VERSION = '1.902';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 12     12   80 use Scalar::Util qw( looks_like_number );
  12         136  
  12         1025  
21              
22 12     12   8482 use MCE;
  12         33  
  12         75  
23 12     12   7341 use MCE::Queue;
  12         39  
  12         68  
24              
25             our @CARP_NOT = qw( MCE );
26              
27             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
28              
29             sub CLONE {
30 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
31             }
32              
33             ###############################################################################
34             ## ----------------------------------------------------------------------------
35             ## Import routine.
36             ##
37             ###############################################################################
38              
39             my ($_MCE, $_def, $_params, $_tag) = ({}, {}, {}, 'MCE::Step');
40             my ($_prev_c, $_prev_n, $_prev_t, $_prev_w) = ({}, {}, {}, {});
41             my ($_user_tasks, $_queue, $_last_task_id, $_lkup) = ({}, {}, {}, {});
42              
43             sub import {
44 12     12   205 my ($_class, $_pkg) = (shift, caller);
45              
46 12         77 my $_p = $_def->{$_pkg} = {
47             MAX_WORKERS => 'auto',
48             CHUNK_SIZE => 'auto',
49             };
50              
51             ## Import functions.
52 12 50       49 if ($_pkg !~ /^MCE::/) {
53 12     12   89 no strict 'refs'; no warnings 'redefine';
  12     12   23  
  12         490  
  12         66  
  12         18  
  12         6989  
54 12         38 *{ $_pkg.'::mce_step_f' } = \&run_file;
  12         203  
55 12         31 *{ $_pkg.'::mce_step_s' } = \&run_seq;
  12         38  
56 12         20 *{ $_pkg.'::mce_step' } = \&run;
  12         40  
57             }
58              
59             ## Process module arguments.
60 12         44 while ( my $_argument = shift ) {
61 0         0 my $_arg = lc $_argument;
62              
63 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
64 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
65 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
66 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
67 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
68 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
69 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
70              
71 0 0       0 shift, next if ( $_arg eq 'fast' ); # ignored
72              
73             ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
74 0 0       0 if ( $_arg eq 'sereal' ) {
75 0 0       0 if ( shift eq '0' ) {
76 0         0 require Storable;
77 0         0 $_p->{FREEZE} = \&Storable::freeze;
78 0         0 $_p->{THAW} = \&Storable::thaw;
79             }
80 0         0 next;
81             }
82              
83 0         0 _croak("Error: ($_argument) invalid module option");
84             }
85              
86 12         78 $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});
87              
88 12         65 MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
89             MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
90 12 50       39 unless ($_p->{CHUNK_SIZE} eq 'auto');
91              
92 12         246 return;
93             }
94              
95             ###############################################################################
96             ## ----------------------------------------------------------------------------
97             ## The task end callback for when a task completes.
98             ##
99             ###############################################################################
100              
101             sub _task_end {
102              
103 22     22   70 my ($_mce, $_task_id, $_task_name) = @_;
104 22         88 my $_pid = $_mce->{_init_pid}.'.'.$_mce->{_caller};
105              
106 22 100       74 if (defined $_mce->{user_tasks}->[$_task_id + 1]) {
107 10         30 my $n_workers = $_mce->{user_tasks}->[$_task_id + 1]->{max_workers};
108 10         230 $_queue->{$_pid}[$_task_id]->enqueue((undef) x $n_workers);
109             }
110              
111             $_params->{task_end}->($_mce, $_task_id, $_task_name)
112 22 50 33     84 if (exists $_params->{task_end} && ref $_params->{task_end} eq 'CODE');
113              
114 22         64 return;
115             }
116              
117             ###############################################################################
118             ## ----------------------------------------------------------------------------
119             ## Methods for MCE; step, enq, enqp, await.
120             ##
121             ###############################################################################
122              
123             {
124 12     12   84 no warnings 'redefine';
  12         24  
  12         58474  
125              
126             sub MCE::step {
127              
128 21 50   21 0 6657 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  21         70  
129 21         69 my $_pid = $self->{_init_pid}.'.'.$self->{_caller};
130              
131             _croak('MCE::step: method is not allowed by the manager process')
132 21 50       113 unless ($self->{_wid});
133              
134 21         136 my $_task_id = $self->{_task_id};
135              
136 21 50       71 if ($_task_id < $_last_task_id->{$_pid}) {
137 21         196 $_queue->{$_pid}[$_task_id]->enqueue($self->freeze([ @_ ]));
138             }
139             else {
140 0         0 _croak('MCE::step: method is not allowed by the last task');
141             }
142              
143 21         102 return;
144             }
145              
146             ############################################################################
147              
148             sub MCE::enq {
149              
150 0 0   0 0 0 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  0         0  
151 0         0 my $_pid = $self->{_init_pid}.'.'.$self->{_caller};
152 0         0 my $_name = shift;
153              
154             _croak('MCE::enq: method is not allowed by the manager process')
155 0 0       0 unless ($self->{_wid});
156             _croak('MCE::enq: (task_name) is not specified or valid')
157 0 0 0     0 if (!defined $_name || !exists $_lkup->{$_pid}{$_name});
158             _croak('MCE::enq: stepping to same task or backwards is not allowed')
159 0 0       0 if ($_lkup->{$_pid}{$_name} <= $self->{_task_id});
160              
161 0         0 my $_task_id = $_lkup->{$_pid}{$_name} - 1;
162              
163 0 0       0 if ($_task_id < $_last_task_id->{$_pid}) {
164 0 0       0 if (scalar @_ > 1) {
165 0         0 my @_items = map { $self->freeze([ $_ ]) } @_;
  0         0  
166 0         0 $_queue->{$_pid}[$_task_id]->enqueue(@_items);
167             }
168             else {
169 0         0 $_queue->{$_pid}[$_task_id]->enqueue($self->freeze([ @_ ]));
170             }
171             }
172             else {
173 0         0 _croak('MCE::enq: method is not allowed by the last task');
174             }
175              
176 0         0 return;
177             }
178              
179             ############################################################################
180              
181             sub MCE::enqp {
182              
183 0 0   0 0 0 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  0         0  
184 0         0 my $_pid = $self->{_init_pid}.'.'.$self->{_caller};
185 0         0 my ($_name, $_p) = (shift, shift);
186              
187             _croak('MCE::enqp: method is not allowed by the manager process')
188 0 0       0 unless ($self->{_wid});
189             _croak('MCE::enqp: (task_name) is not specified or valid')
190 0 0 0     0 if (!defined $_name || !exists $_lkup->{$_pid}{$_name});
191             _croak('MCE::enqp: stepping to same task or backwards is not allowed')
192 0 0       0 if ($_lkup->{$_pid}{$_name} <= $self->{_task_id});
193 0 0 0     0 _croak('MCE::enqp: (priority) is not an integer')
194             if (!looks_like_number($_p) || int($_p) != $_p);
195              
196 0         0 my $_task_id = $_lkup->{$_pid}{$_name} - 1;
197              
198 0 0       0 if ($_task_id < $_last_task_id->{$_pid}) {
199 0 0       0 if (scalar @_ > 1) {
200 0         0 my @_items = map { $self->freeze([ $_ ]) } @_;
  0         0  
201 0         0 $_queue->{$_pid}[$_task_id]->enqueuep($_p, @_items);
202             }
203             else {
204 0         0 $_queue->{$_pid}[$_task_id]->enqueuep($_p, $self->freeze([ @_ ]));
205             }
206             }
207             else {
208 0         0 _croak('MCE::enqp: method is not allowed by the last task');
209             }
210              
211 0         0 return;
212             }
213              
214             ############################################################################
215              
216             sub MCE::await {
217              
218 0 0   0 0 0 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  0         0  
219 0         0 my $_pid = $self->{_init_pid}.'.'.$self->{_caller};
220 0         0 my $_name = shift;
221              
222             _croak('MCE::await: method is not allowed by the manager process')
223 0 0       0 unless ($self->{_wid});
224             _croak('MCE::await: (task_name) is not specified or valid')
225 0 0 0     0 if (!defined $_name || !exists $_lkup->{$_pid}{$_name});
226             _croak('MCE::await: awaiting from same task or backwards is not allowed')
227 0 0       0 if ($_lkup->{$_pid}{$_name} <= $self->{_task_id});
228              
229 0   0     0 my $_task_id = $_lkup->{$_pid}{$_name} - 1; my $_t = shift || 0;
  0         0  
230              
231 0 0 0     0 _croak('MCE::await: (threshold) is not an integer')
232             if (!looks_like_number($_t) || int($_t) != $_t);
233              
234 0 0       0 if ($_task_id < $_last_task_id->{$_pid}) {
235 0         0 $_queue->{$_pid}[$_task_id]->await($_t);
236             } else {
237 0         0 _croak('MCE::await: method is not allowed by the last task');
238             }
239              
240 0         0 return;
241             }
242              
243             }
244              
245             ###############################################################################
246             ## ----------------------------------------------------------------------------
247             ## Init and finish routines.
248             ##
249             ###############################################################################
250              
251             sub MCE::Step::_guard::DESTROY {
252 0     0   0 my ($_pkg, $_id) = @{ $_[0] };
  0         0  
253              
254 0 0 0     0 if (defined $_pkg && $_id eq "$$.$_tid") {
255 0         0 @{ $_[0] } = ();
  0         0  
256 0         0 MCE::Step->finish($_pkg);
257             }
258              
259 0         0 return;
260             }
261              
262             sub init (@) {
263              
264 13 50 33 13 1 1036519 shift if (defined $_[0] && $_[0] eq 'MCE::Step');
265 13         247 my $_pkg = "$$.$_tid.".caller();
266              
267 13 50       186 $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };
268              
269 13         39 @_ = ();
270              
271             defined wantarray
272 13 50       68 ? bless([$_pkg, "$$.$_tid"], MCE::Step::_guard::)
273             : ();
274             }
275              
276             sub finish (@) {
277              
278 25 50 33 25 1 14546 shift if (defined $_[0] && $_[0] eq 'MCE::Step');
279 25 100       2622 my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();
280              
281 25 100 66     436 if ( $_pkg eq 'MCE' ) {
    100          
282 12         54 for my $_k ( keys %{ $_MCE } ) { MCE::Step->finish($_k, 1); }
  12         380  
  9         230  
283             }
284             elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
285 4 50       196 $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};
286              
287 4         32 delete $_lkup->{$_pkg};
288 4         17 delete $_last_task_id->{$_pkg};
289              
290 4         22 delete $_user_tasks->{$_pkg};
291 4         24 delete $_prev_c->{$_pkg};
292 4         19 delete $_prev_n->{$_pkg};
293 4         23 delete $_prev_t->{$_pkg};
294 4         17 delete $_prev_w->{$_pkg};
295 4         43 delete $_MCE->{$_pkg};
296              
297 4 50       20 if (defined $_queue->{$_pkg}) {
298 4         7 local $_;
299 4         8 $_->DESTROY() for (@{ $_queue->{$_pkg} });
  4         22  
300 4         43 delete $_queue->{$_pkg};
301             }
302             }
303              
304 25         72 @_ = ();
305              
306 25         126 return;
307             }
308              
309             ###############################################################################
310             ## ----------------------------------------------------------------------------
311             ## Parallel step with MCE -- file.
312             ##
313             ###############################################################################
314              
315             sub run_file (@) {
316              
317 4 50 33 4 1 4540 shift if (defined $_[0] && $_[0] eq 'MCE::Step');
318              
319 4 50       10 my ($_file, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  4         48  
320 4         40 my $_pid = "$$.$_tid.".caller();
321              
322 4 50       20 if (defined (my $_p = $_params->{$_pid})) {
323 4 50       20 delete $_p->{input_data} if (exists $_p->{input_data});
324 4 50       18 delete $_p->{sequence} if (exists $_p->{sequence});
325             }
326             else {
327 0         0 $_params->{$_pid} = {};
328             }
329              
330 4         46 for my $_i ($_start_pos .. @_ - 1) {
331 8         18 my $_r = ref $_[$_i];
332 8 100 66     78 if ($_r eq '' || $_r eq 'SCALAR' || $_r =~ /^(?:GLOB|FileHandle|IO::)/) {
      100        
333 4         8 $_file = $_[$_i]; $_pos = $_i;
  4         4  
334 4         10 last;
335             }
336             }
337              
338 4 100 66     72 if (defined $_file && ref $_file eq '' && $_file ne '') {
    50 66        
      33        
339 2 50       90 _croak("$_tag: ($_file) does not exist") unless (-e $_file);
340 2 50       18 _croak("$_tag: ($_file) is not readable") unless (-r $_file);
341 2 50       14 _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
342 2         78 $_params->{$_pid}{_file} = $_file;
343             }
344             elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
345 2         18 $_params->{$_pid}{_file} = $_file;
346             }
347             else {
348 0         0 _croak("$_tag: (file) is not specified or valid");
349             }
350              
351 4 50       20 if (defined $_pos) {
352 4         24 pop @_ for ($_pos .. @_ - 1);
353             }
354              
355 4         20 return run(@_);
356             }
357              
358             ###############################################################################
359             ## ----------------------------------------------------------------------------
360             ## Parallel step with MCE -- sequence.
361             ##
362             ###############################################################################
363              
364             sub run_seq (@) {
365              
366 2 50 33 2 1 1716 shift if (defined $_[0] && $_[0] eq 'MCE::Step');
367              
368 2 50       6 my ($_begin, $_end, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  2         8  
369 2         14 my $_pid = "$$.$_tid.".caller();
370              
371 2 50       306 if (defined (my $_p = $_params->{$_pid})) {
372 2 50       12 delete $_p->{sequence} if (exists $_p->{sequence});
373 2 50       8 delete $_p->{input_data} if (exists $_p->{input_data});
374 2 50       6 delete $_p->{_file} if (exists $_p->{_file});
375             }
376             else {
377 0         0 $_params->{$_pid} = {};
378             }
379              
380 2         30 for my $_i ($_start_pos .. @_ - 1) {
381 4         10 my $_r = ref $_[$_i];
382              
383 4 50 66     62 if ($_r eq '' || $_r =~ /^Math::/ || $_r eq 'HASH' || $_r eq 'ARRAY') {
      66        
      33        
384 2         4 $_pos = $_i;
385              
386 2 50 33     10 if ($_r eq '' || $_r =~ /^Math::/) {
    0          
    0          
387 2         6 $_begin = $_[$_pos], $_end = $_[$_pos + 1];
388             $_params->{$_pid}{sequence} = [
389 2         36 $_[$_pos], $_[$_pos + 1], $_[$_pos + 2], $_[$_pos + 3]
390             ];
391             }
392             elsif ($_r eq 'HASH') {
393 0         0 $_begin = $_[$_pos]->{begin}, $_end = $_[$_pos]->{end};
394 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
395             }
396             elsif ($_r eq 'ARRAY') {
397 0         0 $_begin = $_[$_pos]->[0], $_end = $_[$_pos]->[1];
398 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
399             }
400              
401 2         14 last;
402             }
403             }
404              
405             _croak("$_tag: (sequence) is not specified or valid")
406 2 50       12 unless (exists $_params->{$_pid}{sequence});
407 2 50       8 _croak("$_tag: (begin) is not specified for sequence")
408             unless (defined $_begin);
409 2 50       8 _croak("$_tag: (end) is not specified for sequence")
410             unless (defined $_end);
411              
412 2         88 $_params->{$_pid}{sequence_run} = undef;
413              
414 2 50       10 if (defined $_pos) {
415 2         14 pop @_ for ($_pos .. @_ - 1);
416             }
417              
418 2         10 return run(@_);
419             }
420              
421             ###############################################################################
422             ## ----------------------------------------------------------------------------
423             ## Parallel step with MCE.
424             ##
425             ###############################################################################
426              
427             sub run (@) {
428              
429 21 50 33 21 1 12837 shift if (defined $_[0] && $_[0] eq 'MCE::Step');
430              
431 21 100       111 my $_pkg = caller() eq 'MCE::Step' ? caller(1) : caller();
432 21         170 my $_pid = "$$.$_tid.$_pkg";
433              
434 21 100       227 if (ref $_[0] eq 'HASH') {
435 14 50       60 $_params->{$_pid} = {} unless defined $_params->{$_pid};
436 14         38 for my $_p (keys %{ $_[0] }) {
  14         62  
437 14         76 $_params->{$_pid}{$_p} = $_[0]->{$_p};
438             }
439              
440 14         26 shift;
441             }
442              
443             ## -------------------------------------------------------------------------
444              
445 21         52 my (@_code, @_name, @_thrs, @_wrks); my $_init_mce = 0; my $_pos = 0;
  21         36  
  21         46  
446              
447 21         44 %{ $_lkup->{$_pid} } = ();
  21         142  
448              
449 21         102 while (ref $_[0] eq 'CODE') {
450 35         77 push @_code, $_[0];
451              
452 35 50       112 if (defined (my $_p = $_params->{$_pid})) {
453             push @_name, (ref $_p->{task_name} eq 'ARRAY')
454 35 100       261 ? $_p->{task_name}->[$_pos] : undef;
455             push @_thrs, (ref $_p->{use_threads} eq 'ARRAY')
456 35 50       121 ? $_p->{use_threads}->[$_pos] : undef;
457             push @_wrks, (ref $_p->{max_workers} eq 'ARRAY')
458 35 100       166 ? $_p->{max_workers}->[$_pos] : undef;
459             }
460              
461 35 100       143 $_lkup->{$_pid}{ $_name[ $_pos ] } = $_pos if (defined $_name[ $_pos ]);
462              
463             $_init_mce = 1 if (
464             !defined $_prev_c->{$_pid}[$_pos] ||
465 35 100 66     276 $_prev_c->{$_pid}[$_pos] != $_code[$_pos]
466             );
467              
468 35 100       107 $_init_mce = 1 if ($_prev_n->{$_pid}[$_pos] ne $_name[$_pos]);
469 35 50       116 $_init_mce = 1 if ($_prev_t->{$_pid}[$_pos] ne $_thrs[$_pos]);
470 35 100       133 $_init_mce = 1 if ($_prev_w->{$_pid}[$_pos] ne $_wrks[$_pos]);
471              
472 35         73 $_prev_c->{$_pid}[$_pos] = $_code[$_pos];
473 35         95 $_prev_n->{$_pid}[$_pos] = $_name[$_pos];
474 35         67 $_prev_t->{$_pid}[$_pos] = $_thrs[$_pos];
475 35         135 $_prev_w->{$_pid}[$_pos] = $_wrks[$_pos];
476              
477 35         52 shift; $_pos++;
  35         104  
478             }
479              
480 21 50       72 if (defined $_prev_c->{$_pid}[$_pos]) {
481 0         0 pop @{ $_prev_c->{$_pid} } for ($_pos .. $#{ $_prev_c->{$_pid } });
  0         0  
  0         0  
482 0         0 pop @{ $_prev_n->{$_pid} } for ($_pos .. $#{ $_prev_n->{$_pid } });
  0         0  
  0         0  
483 0         0 pop @{ $_prev_t->{$_pid} } for ($_pos .. $#{ $_prev_t->{$_pid } });
  0         0  
  0         0  
484 0         0 pop @{ $_prev_w->{$_pid} } for ($_pos .. $#{ $_prev_w->{$_pid } });
  0         0  
  0         0  
485              
486 0         0 $_init_mce = 1;
487             }
488              
489 21 50       86 return unless (scalar @_code);
490              
491             ## -------------------------------------------------------------------------
492              
493 21         364 my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
  21         102  
494 21         57 my $_r = ref $_[0];
495              
496 21 100 66     364 if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|GLOB|FileHandle|IO::|Iterator::)/) {
497 4         14 $_input_data = shift;
498             }
499              
500 21 50       114 if (defined (my $_p = $_params->{$_pid})) {
501             $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
502 21 100 66     284 if (exists $_p->{max_workers} && ref $_p->{max_workers} ne 'ARRAY');
503              
504 21 100 100     224 delete $_p->{sequence} if (defined $_input_data || scalar @_);
505 21 50       93 delete $_p->{user_func} if (exists $_p->{user_func});
506 21 50       76 delete $_p->{user_tasks} if (exists $_p->{user_tasks});
507             }
508              
509 21 100 66     223 if (@_code > 1 && $_max_workers > 1) {
510 14         60 $_max_workers = int($_max_workers / @_code + 0.5) + 1;
511             }
512              
513             my $_chunk_size = MCE::_parse_chunk_size(
514 21         216 $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
515             $_input_data, scalar @_
516             );
517              
518 21 50       79 if (defined (my $_p = $_params->{$_pid})) {
519 21 100       118 if (exists $_p->{_file}) {
520 4         10 $_input_data = delete $_p->{_file};
521             } else {
522 17 50       59 $_input_data = $_p->{input_data} if exists $_p->{input_data};
523             }
524             }
525              
526             ## -------------------------------------------------------------------------
527              
528 21         215 MCE::_save_state($_MCE->{$_pid});
529              
530 21 100 66     124 if ($_init_mce || !exists $_queue->{$_pid}) {
531 13 50       44 $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
532 13 50       66 $_queue->{$_pid} = [] if (!defined $_queue->{$_pid});
533              
534 13         27 my $_Q = $_queue->{$_pid};
535 13         32 pop(@{ $_Q })->DESTROY for (@_code .. @{ $_Q });
  13         64  
  0         0  
536              
537 6         78 push @{ $_Q }, MCE::Queue->new(await => 1)
538 13         93 for (@{ $_Q } .. @_code - 2);
  13         58  
539              
540 13         46 $_last_task_id->{$_pid} = @_code - 1;
541              
542             ## must clear arrays for nested session to work with Perl < v5.14
543 13         172 _gen_user_tasks($_pid,$_Q, [@_code],[@_name],[@_thrs],[@_wrks], $_chunk_size);
544              
545 13         66 @_code = @_name = @_thrs = @_wrks = ();
546              
547             my %_opts = (
548             max_workers => $_max_workers, task_name => $_tag,
549 13         96 user_tasks => $_user_tasks->{$_pid}, task_end => \&_task_end,
550             );
551              
552 13 50       59 if (defined (my $_p = $_params->{$_pid})) {
553 13         26 local $_;
554              
555 13         26 for (keys %{ $_p }) {
  13         47  
556 25 100 100     154 next if ($_ eq 'max_workers' && ref $_p->{max_workers} eq 'ARRAY');
557 19 100 66     209 next if ($_ eq 'task_name' && ref $_p->{task_name} eq 'ARRAY');
558 13 50 33     56 next if ($_ eq 'use_threads' && ref $_p->{use_threads} eq 'ARRAY');
559              
560 13 50       41 next if ($_ eq 'chunk_size');
561 13 50       45 next if ($_ eq 'input_data');
562 13 50       32 next if ($_ eq 'sequence_run');
563 13 50       42 next if ($_ eq 'task_end');
564              
565             _croak("$_tag: ($_) is not a valid constructor argument")
566 13 50       52 unless (exists $MCE::_valid_fields_new{$_});
567              
568 13         41 $_opts{$_} = $_p->{$_};
569             }
570             }
571              
572 13         45 for my $_k (qw/ tmp_dir freeze thaw init_relay use_threads /) {
573             $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
574 65 50 33     280 if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
575             }
576              
577 13         185 $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
578             }
579             else {
580             ## Workers may persist after running. Thus, updating the MCE instance.
581             ## These options do not require respawning.
582 8 50       50 if (defined (my $_p = $_params->{$_pid})) {
583 8         20 for my $_k (qw(
584             RS interval stderr_file stdout_file user_error user_output
585             job_delay submit_delay on_post_exit on_post_run user_args
586             flush_file flush_stderr flush_stdout gather max_retries
587             )) {
588 128 100       338 $_MCE->{$_pid}{$_k} = $_p->{$_k} if (exists $_p->{$_k});
589             }
590             }
591             }
592              
593             ## -------------------------------------------------------------------------
594              
595 21 100       56 my @_a; my $_wa = wantarray; $_MCE->{$_pid}{gather} = \@_a if (defined $_wa);
  21         80  
  21         74  
596              
597 21 100       74 if (defined $_input_data) {
    100          
598 8         24 @_ = ();
599 8         6484 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
600 7         55 delete $_MCE->{$_pid}{input_data};
601             }
602             elsif (scalar @_) {
603 6         60 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
604 2         30 delete $_MCE->{$_pid}{input_data};
605             }
606             else {
607 7 100 66     138 if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
608             $_MCE->{$_pid}->run({
609             chunk_size => $_chunk_size,
610             sequence => $_params->{$_pid}{sequence}
611 2         14 }, 0);
612 2 50       22 if (exists $_params->{$_pid}{sequence_run}) {
613 2         6 delete $_params->{$_pid}{sequence_run};
614 2         4 delete $_params->{$_pid}{sequence};
615             }
616 2         10 delete $_MCE->{$_pid}{sequence};
617             }
618             else {
619 5         45 $_MCE->{$_pid}->run({ chunk_size => $_chunk_size }, 0);
620             }
621             }
622              
623 12         195 MCE::_restore_state();
624              
625             # destroy queue(s) if MCE::run requested workers to shutdown
626 12 50       55 if (!$_MCE->{$_pid}{_spawned}) {
627 0         0 $_->DESTROY() for @{ $_queue->{$_pid} };
  0         0  
628 0         0 delete $_queue->{$_pid};
629             }
630              
631 12 100       65 delete $_MCE->{$_pid}{gather} if (defined $_wa);
632              
633 12 100       618 return ((defined $_wa) ? @_a : ());
634             }
635              
636             ###############################################################################
637             ## ----------------------------------------------------------------------------
638             ## Private methods.
639             ##
640             ###############################################################################
641              
642             sub _croak {
643              
644 0     0   0 goto &MCE::_croak;
645             }
646              
647             sub _gen_user_func {
648              
649 6     6   24 my ($_qref, $_cref, $_chunk_size, $_pos) = @_;
650              
651 6         18 my $_q_in = $_qref->[$_pos - 1];
652 6         12 my $_code = $_cref->[$_pos];
653              
654             return sub {
655 10     10   23 my ($_mce) = @_;
656              
657 10         229 $_mce->{_next_jmp} = sub { goto _MCE_STEP__NEXT; };
  0         0  
658 10         63 $_mce->{_last_jmp} = sub { goto _MCE_STEP__LAST; };
  0         0  
659              
660             _MCE_STEP__NEXT:
661              
662 10         96 while (defined (local $_ = $_q_in->dequeue())) {
663 21         160 my $_args = $_mce->thaw($_); $_ = $_args->[0];
  21         45  
664 21         37 $_code->($_mce, @{ $_args });
  21         253  
665             }
666              
667             _MCE_STEP__LAST:
668              
669 10         45 return;
670 6         78 };
671             }
672              
673             sub _gen_user_tasks {
674              
675 13     13   58 my ($_pid, $_qref, $_cref, $_nref, $_tref, $_wref, $_chunk_size) = @_;
676              
677 13         28 @{ $_user_tasks->{$_pid} } = ();
  13         42  
678              
679 13         463 push @{ $_user_tasks->{$_pid} }, {
680             task_name => $_nref->[0],
681             use_threads => $_tref->[0],
682             max_workers => $_wref->[0],
683 30     30   407 user_func => sub { $_cref->[0]->(@_); return; }
  30         104  
684 13         26 };
685              
686 13         41 for my $_pos (1 .. @{ $_cref } - 1) {
  13         69  
687 6         12 push @{ $_user_tasks->{$_pid} }, {
  6         36  
688             task_name => $_nref->[$_pos],
689             use_threads => $_tref->[$_pos],
690             max_workers => $_wref->[$_pos],
691             user_func => _gen_user_func(
692             $_qref, $_cref, $_chunk_size, $_pos
693             )
694             };
695             }
696              
697 13         29 return;
698             }
699              
700             1;
701              
702             __END__
703              
704             ###############################################################################
705             ## ----------------------------------------------------------------------------
706             ## Module usage.
707             ##
708             ###############################################################################
709              
710             =head1 NAME
711              
712             MCE::Step - Parallel step model for building creative steps
713              
714             =head1 VERSION
715              
716             This document describes MCE::Step version 1.902
717              
718             =head1 DESCRIPTION
719              
720             MCE::Step is similar to L<MCE::Flow> for writing custom apps. The main
721             difference comes from the transparent use of queues between sub-tasks.
722             MCE 1.7 adds mce_enq, mce_enqp, and mce_await methods described under
723             QUEUE-LIKE FEATURES below.
724              
725             It is trivial to parallelize with mce_stream shown below.
726              
727             ## Native map function
728             my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;
729              
730             ## Same as with MCE::Stream (processing from right to left)
731             @a = mce_stream
732             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
733              
734             ## Pass an array reference to have writes occur simultaneously
735             mce_stream \@a,
736             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
737              
738             However, let's have MCE::Step compute the same in parallel. Unlike the example
739             in L<MCE::Flow>, the use of MCE::Queue is totally transparent. MCE::Candy is
740             used to preserve output order.
741              
742             use MCE::Step;
743             use MCE::Candy;
744              
745             Next are the 3 sub-tasks. Compare these 3 sub-tasks with the same ones described
746             in L<MCE::Flow>. The call to MCE->step simplifies passing data to the
747             subsequent sub-task.
748              
749             sub task_a {
750             my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
751             push @ans, map { $_ * 2 } @{ $chunk_ref };
752             MCE->step(\@ans, $chunk_id);
753             }
754              
755             sub task_b {
756             my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
757             push @ans, map { $_ * 3 } @{ $chunk_ref };
758             MCE->step(\@ans, $chunk_id);
759             }
760              
761             sub task_c {
762             my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
763             push @ans, map { $_ * 4 } @{ $chunk_ref };
764             MCE->gather($chunk_id, \@ans);
765             }
766              
767             In summary, MCE::Step builds an MCE instance behind the scenes and starts
768             running it. The task_name (shown), max_workers, and use_threads options can take
769             an anonymous array for specifying a value uniquely for each sub-task.
770              
771             The task_name option is required to use ->enq, ->enqp, and ->await.
772              
773             my @a;
774              
775             mce_step {
776             task_name => [ 'a', 'b', 'c' ],
777             gather => MCE::Candy::out_iter_array(\@a)
778              
779             }, \&task_a, \&task_b, \&task_c, 1..10000;
780              
781             print "@a\n";
782              
783             =head1 STEP DEMO
784              
785             In the demonstration below, one may call ->gather or ->step any number of times
786             although ->step is not allowed in the last sub-block. Data is gathered into @arr,
787             which will likely be out of order. Gathering data is optional. All sub-blocks
788             receive $mce as the first argument.
789              
790             First, defining 3 sub-tasks.
791              
792             use MCE::Step;
793              
794             sub task_a {
795             my ($mce, $chunk_ref, $chunk_id) = @_;
796              
797             if ($_ % 2 == 0) {
798             MCE->gather($_);
799             # MCE->gather($_ * 4); ## Ok to gather multiple times
800             }
801             else {
802             MCE->print("a step: $_, $_ * $_\n");
803             MCE->step($_, $_ * $_);
804             # MCE->step($_, $_ * 4 ); ## Ok to step multiple times
805             }
806             }
807              
808             sub task_b {
809             my ($mce, $arg1, $arg2) = @_;
810              
811             MCE->print("b args: $arg1, $arg2\n");
812              
813             if ($_ % 3 == 0) { ## $_ is the same as $arg1
814             MCE->gather($_);
815             }
816             else {
817             MCE->print("b step: $_ * $_\n");
818             MCE->step($_ * $_);
819             }
820             }
821              
822             sub task_c {
823             my ($mce, $arg1) = @_;
824              
825             MCE->print("c: $_\n");
826             MCE->gather($_);
827             }
828              
829             Next, pass MCE options, using chunk_size 1, and run all 3 tasks in parallel.
830             Notice how max_workers and use_threads can take an anonymous array, similarly
831             to task_name.
832              
833             my @arr = mce_step {
834             task_name => [ 'a', 'b', 'c' ],
835             max_workers => [ 2, 2, 2 ],
836             use_threads => [ 0, 0, 0 ],
837             chunk_size => 1
838              
839             }, \&task_a, \&task_b, \&task_c, 1..10;
840              
841             Finally, sort the array and display its contents.
842              
843             @arr = sort { $a <=> $b } @arr;
844              
845             print "\n@arr\n\n";
846              
847             -- Output
848              
849             a step: 1, 1 * 1
850             a step: 3, 3 * 3
851             a step: 5, 5 * 5
852             a step: 7, 7 * 7
853             a step: 9, 9 * 9
854             b args: 1, 1
855             b step: 1 * 1
856             b args: 3, 9
857             b args: 7, 49
858             b step: 7 * 7
859             b args: 5, 25
860             b step: 5 * 5
861             b args: 9, 81
862             c: 1
863             c: 49
864             c: 25
865              
866             1 2 3 4 6 8 9 10 25 49
867              
868             =head1 SYNOPSIS when CHUNK_SIZE EQUALS 1
869              
870             Although L<MCE::Loop> may be preferred for running using a single code block,
871             the text below also applies to this module, particularly for the first block.
872              
873             All models in MCE default to 'auto' for chunk_size. The arguments for the block
874             are the same as writing a user_func block using the Core API.
875              
876             Beginning with MCE 1.5, the next input item is placed into the input scalar
877             variable $_ when chunk_size equals 1. Otherwise, $_ points to $chunk_ref
878             containing many items. Basically, line 2 below may be omitted from your code
879             when using $_. One can call MCE->chunk_id to obtain the current chunk id.
880              
881             line 1: user_func => sub {
882             line 2: my ($mce, $chunk_ref, $chunk_id) = @_;
883             line 3:
884             line 4: $_ points to $chunk_ref->[0]
885             line 5: in MCE 1.5 when chunk_size == 1
886             line 6:
887             line 7: $_ points to $chunk_ref
888             line 8: in MCE 1.5 when chunk_size > 1
889             line 9: }
890              
891             Follow this synopsis when chunk_size equals one. Looping is not required from
892             inside the first block. Hence, the block is called once per item.
893              
894             ## Exports mce_step, mce_step_f, and mce_step_s
895             use MCE::Step;
896              
897             MCE::Step->init(
898             chunk_size => 1
899             );
900              
901             ## Array or array_ref
902             mce_step sub { do_work($_) }, 1..10000;
903             mce_step sub { do_work($_) }, \@list;
904              
905             ## Important; pass an array_ref for deeply nested input data
906             mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
907             mce_step sub { do_work($_) }, \@deeply_list;
908              
909             ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
910             ## Workers read directly and do not involve the manager process
911             mce_step_f sub { chomp; do_work($_) }, "/path/to/file"; # efficient
912              
913             ## Involves the manager process, therefore slower
914             mce_step_f sub { chomp; do_work($_) }, $file_handle;
915             mce_step_f sub { chomp; do_work($_) }, $io;
916             mce_step_f sub { chomp; do_work($_) }, \$scalar;
917              
918             ## Sequence of numbers (begin, end [, step, format])
919             mce_step_s sub { do_work($_) }, 1, 10000, 5;
920             mce_step_s sub { do_work($_) }, [ 1, 10000, 5 ];
921              
922             mce_step_s sub { do_work($_) }, {
923             begin => 1, end => 10000, step => 5, format => undef
924             };
925              
926             =head1 SYNOPSIS when CHUNK_SIZE is GREATER THAN 1
927              
928             Follow this synopsis when chunk_size equals 'auto' or greater than 1.
929             This means having to loop through the chunk from inside the first block.
930              
931             use MCE::Step;
932              
933             MCE::Step->init( ## Chunk_size defaults to 'auto' when
934             chunk_size => 'auto' ## not specified. Therefore, the init
935             ); ## function may be omitted.
936              
937             ## Syntax is shown for mce_step for demonstration purposes.
938             ## Looping inside the block is the same for mce_step_f and
939             ## mce_step_s.
940              
941             ## Array or array_ref
942             mce_step sub { do_work($_) for (@{ $_ }) }, 1..10000;
943             mce_step sub { do_work($_) for (@{ $_ }) }, \@list;
944              
945             ## Important; pass an array_ref for deeply nested input data
946             mce_step sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
947             mce_step sub { do_work($_) for (@{ $_ }) }, \@deeply_list;
948              
949             ## Resembles code using the core MCE API
950             mce_step sub {
951             my ($mce, $chunk_ref, $chunk_id) = @_;
952              
953             for (@{ $chunk_ref }) {
954             do_work($_);
955             }
956              
957             }, 1..10000;
958              
959             Chunking reduces the number of IPC calls behind the scenes. Think in terms of
960             chunks whenever processing a large amount of data. For relatively small data,
961             choosing 1 for chunk_size is fine.
962              
963             =head1 OVERRIDING DEFAULTS
964              
965             The following lists options that may be overridden when loading the module.
966             The fast option is obsolete in 1.867 onwards; ignored if specified.
967              
968             use Sereal qw( encode_sereal decode_sereal );
969             use CBOR::XS qw( encode_cbor decode_cbor );
970             use JSON::XS qw( encode_json decode_json );
971              
972             use MCE::Step
973             max_workers => 8, # Default 'auto'
974             chunk_size => 500, # Default 'auto'
975             tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
976             freeze => \&encode_sereal, # \&Storable::freeze
977             thaw => \&decode_sereal, # \&Storable::thaw
978             init_relay => 0, # Default undef; MCE 1.882+
979             use_threads => 0, # Default undef; MCE 1.882+
980             ;
981              
982             From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if available.
983             Specify C<< Sereal => 0 >> to use Storable instead.
984              
985             use MCE::Step Sereal => 0;
986              
987             =head1 CUSTOMIZING MCE
988              
989             =over 3
990              
991             =item MCE::Step->init ( options )
992              
993             =item MCE::Step::init { options }
994              
995             =back
996              
997             The init function accepts a hash of MCE options. Unlike with MCE::Stream,
998             both gather and bounds_only options may be specified (not shown below).
999              
1000             In scalar context (API available since 1.897), init returns a guard that calls
1001             C<< MCE::Step->finish >> automatically upon leaving the scope or program.
1002              
1003             use MCE::Step;
1004              
1005             my $guard = MCE::Step->init(
1006             chunk_size => 1, max_workers => 4,
1007              
1008             user_begin => sub {
1009             print "## ", MCE->wid, " started\n";
1010             },
1011              
1012             user_end => sub {
1013             print "## ", MCE->wid, " completed\n";
1014             }
1015             );
1016              
1017             my %a = mce_step sub { MCE->gather($_, $_ * $_) }, 1..100;
1018              
1019             print "\n", "@a{1..100}", "\n";
1020              
1021             -- Output
1022              
1023             ## 3 started
1024             ## 1 started
1025             ## 4 started
1026             ## 2 started
1027             ## 3 completed
1028             ## 4 completed
1029             ## 1 completed
1030             ## 2 completed
1031              
1032             1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
1033             400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
1034             1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
1035             2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
1036             3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
1037             5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
1038             7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
1039             10000
1040              
1041             Like with MCE::Step->init above, MCE options may be specified using an
1042             anonymous hash for the first argument. Notice how task_name, max_workers,
1043             and use_threads can take an anonymous array for setting values uniquely for
1044             each code block.
1045              
1046             Unlike MCE::Stream which processes from right-to-left, MCE::Step begins
1047             with the first code block, thus processing from left-to-right.
1048              
1049             The following takes 9 seconds to complete. The 9 seconds is from having
1050             only 2 workers assigned for the last sub-task and waiting 1 or 2 seconds
1051             initially before calling MCE->step.
1052              
1053             Removing both calls to MCE->step will cause the script to complete in just
1054             1 second. The reason is due to the 2nd and subsequent sub-tasks awaiting
1055             data from an internal queue. Workers terminate upon receiving an undef.
1056              
1057             use threads;
1058             use MCE::Step;
1059              
1060             my @a = mce_step {
1061             task_name => [ 'a', 'b', 'c' ],
1062             max_workers => [ 3, 4, 2, ],
1063             use_threads => [ 1, 0, 0, ],
1064              
1065             user_end => sub {
1066             my ($mce, $task_id, $task_name) = @_;
1067             MCE->print("$task_id - $task_name completed\n");
1068             },
1069              
1070             task_end => sub {
1071             my ($mce, $task_id, $task_name) = @_;
1072             MCE->print("$task_id - $task_name ended\n");
1073             }
1074             },
1075             sub { sleep 1; MCE->step(""); }, ## 3 workers, named a
1076             sub { sleep 2; MCE->step(""); }, ## 4 workers, named b
1077             sub { sleep 3; }; ## 2 workers, named c
1078              
1079             -- Output
1080              
1081             0 - a completed
1082             0 - a completed
1083             0 - a completed
1084             0 - a ended
1085             1 - b completed
1086             1 - b completed
1087             1 - b completed
1088             1 - b completed
1089             1 - b ended
1090             2 - c completed
1091             2 - c completed
1092             2 - c ended
1093              
1094             =head1 API DOCUMENTATION
1095              
1096             Although input data is optional for MCE::Step, the following assumes chunk_size
1097             equals 1 in order to demonstrate all the possibilities for providing input data.
1098              
1099             =over 3
1100              
1101             =item MCE::Step->run ( sub { code }, list )
1102              
1103             =item mce_step sub { code }, list
1104              
1105             =back
1106              
1107             Input data may be defined using a list, an array ref, or a hash ref.
1108              
1109             Unlike MCE::Loop, Map, and Grep which take a block as C<{ ... }>, Step takes a
1110             C<sub { ... }> or a code reference. The other difference is that the comma is
1111             needed after the block.
1112              
1113             # $_ contains the item when chunk_size => 1
1114              
1115             mce_step sub { do_work($_) }, 1..1000;
1116             mce_step sub { do_work($_) }, \@list;
1117              
1118             # Important; pass an array_ref for deeply nested input data
1119              
1120             mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
1121             mce_step sub { do_work($_) }, \@deeply_list;
1122              
1123             # Chunking; any chunk_size => 1 or greater
1124              
1125             my %res = mce_step sub {
1126             my ($mce, $chunk_ref, $chunk_id) = @_;
1127             my %ret;
1128             for my $item (@{ $chunk_ref }) {
1129             $ret{$item} = $item * 2;
1130             }
1131             MCE->gather(%ret);
1132             },
1133             \@list;
1134              
1135             # Input hash; current API available since 1.828
1136              
1137             my %res = mce_step sub {
1138             my ($mce, $chunk_ref, $chunk_id) = @_;
1139             my %ret;
1140             for my $key (keys %{ $chunk_ref }) {
1141             $ret{$key} = $chunk_ref->{$key} * 2;
1142             }
1143             MCE->gather(%ret);
1144             },
1145             \%hash;
1146              
1147             # Unlike MCE::Loop, MCE::Step doesn't need input to run
1148              
1149             mce_step { max_workers => 4 }, sub {
1150             MCE->say( MCE->wid );
1151             };
1152              
1153             # ... and can run multiple tasks
1154              
1155             mce_step {
1156             max_workers => [ 1, 3 ],
1157             task_name => [ 'p', 'c' ]
1158             },
1159             sub {
1160             # 1 producer
1161             MCE->say( "producer: ", MCE->wid );
1162             },
1163             sub {
1164             # 3 consumers
1165             MCE->say( "consumer: ", MCE->wid );
1166             };
1167              
1168             # Here, options are specified via init
1169              
1170             MCE::Step->init(
1171             max_workers => [ 1, 3 ],
1172             task_name => [ 'p', 'c' ]
1173             );
1174              
1175             mce_step \&producer, \&consumers;
1176              
1177             =over 3
1178              
1179             =item MCE::Step->run_file ( sub { code }, file )
1180              
1181             =item mce_step_f sub { code }, file
1182              
1183             =back
1184              
1185             The fastest of these is the /path/to/file. Workers communicate the next offset
1186             position among themselves with zero interaction by the manager process.
1187              
1188             C<IO::All> { File, Pipe, STDIO } is supported since MCE 1.845.
1189              
1190             # $_ contains the line when chunk_size => 1
1191              
1192             mce_step_f sub { $_ }, "/path/to/file"; # faster
1193             mce_step_f sub { $_ }, $file_handle;
1194             mce_step_f sub { $_ }, $io; # IO::All
1195             mce_step_f sub { $_ }, \$scalar;
1196              
1197             # chunking, any chunk_size => 1 or greater
1198              
1199             my %res = mce_step_f sub {
1200             my ($mce, $chunk_ref, $chunk_id) = @_;
1201             my $buf = '';
1202             for my $line (@{ $chunk_ref }) {
1203             $buf .= $line;
1204             }
1205             MCE->gather($chunk_id, $buf);
1206             },
1207             "/path/to/file";
1208              
1209             =over 3
1210              
1211             =item MCE::Step->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
1212              
1213             =item mce_step_s sub { code }, $beg, $end [, $step, $fmt ]
1214              
1215             =back
1216              
1217             Sequence may be defined as a list, an array reference, or a hash reference.
1218             The functions require both begin and end values to run. Step and format are
1219             optional. The format is passed to sprintf (% may be omitted below).
1220              
1221             my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
1222              
1223             # $_ contains the sequence number when chunk_size => 1
1224              
1225             mce_step_s sub { $_ }, $beg, $end, $step, $fmt;
1226             mce_step_s sub { $_ }, [ $beg, $end, $step, $fmt ];
1227              
1228             mce_step_s sub { $_ }, {
1229             begin => $beg, end => $end,
1230             step => $step, format => $fmt
1231             };
1232              
1233             # chunking, any chunk_size => 1 or greater
1234              
1235             my %res = mce_step_s sub {
1236             my ($mce, $chunk_ref, $chunk_id) = @_;
1237             my $buf = '';
1238             for my $seq (@{ $chunk_ref }) {
1239             $buf .= "$seq\n";
1240             }
1241             MCE->gather($chunk_id, $buf);
1242             },
1243             [ $beg, $end ];
1244              
1245             The sequence engine can compute 'begin' and 'end' items only, for the chunk,
1246             and not the items in between (hence boundaries only). This option applies
1247             to sequence only and has no effect when chunk_size equals 1.
1248              
1249             The time to run is 0.006s below. This becomes 0.827s without the bounds_only
1250             option due to computing all items in between, thus creating a very large
1251             array. Basically, specify bounds_only => 1 when boundaries are all you need
1252             for looping inside the block; e.g. Monte Carlo simulations.
1253              
1254             Time was measured using 1 worker to emphasize the difference.
1255              
1256             use MCE::Step;
1257              
1258             MCE::Step->init(
1259             max_workers => 1, chunk_size => 1_250_000,
1260             bounds_only => 1
1261             );
1262              
1263             # Typically, the input scalar $_ contains the sequence number
1264             # when chunk_size => 1, unless the bounds_only option is set
1265             # which is the case here. Thus, $_ points to $chunk_ref.
1266              
1267             mce_step_s sub {
1268             my ($mce, $chunk_ref, $chunk_id) = @_;
1269              
1270             # $chunk_ref contains 2 items, not 1_250_000
1271             # my ( $begin, $end ) = ( $_->[0], $_->[1] );
1272              
1273             my $begin = $chunk_ref->[0];
1274             my $end = $chunk_ref->[1];
1275              
1276             # for my $seq ( $begin .. $end ) {
1277             # ...
1278             # }
1279              
1280             MCE->printf("%7d .. %8d\n", $begin, $end);
1281             },
1282             [ 1, 10_000_000 ];
1283              
1284             -- Output
1285              
1286             1 .. 1250000
1287             1250001 .. 2500000
1288             2500001 .. 3750000
1289             3750001 .. 5000000
1290             5000001 .. 6250000
1291             6250001 .. 7500000
1292             7500001 .. 8750000
1293             8750001 .. 10000000
1294              
1295             =over 3
1296              
1297             =item MCE::Step->run ( { input_data => iterator }, sub { code } )
1298              
1299             =item mce_step { input_data => iterator }, sub { code }
1300              
1301             =back
1302              
1303             An iterator reference may be specified for input_data, either in the options
1304             hash as shown or via MCE::Step->init. This prevents MCE::Step from treating
1305             the iterator reference as another user task, which would not work.
1306              
1307             Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
1308              
1309             MCE::Step->init(
1310             input_data => iterator
1311             );
1312              
1313             mce_step sub { $_ };
1314              
1315             =head1 QUEUE-LIKE FEATURES
1316              
1317             =over 3
1318              
1319             =item MCE->step ( item )
1320              
1321             =item MCE->step ( arg1, arg2, argN )
1322              
1323             =back
1324              
1325             The ->step method is the simplest form for passing elements into the next
1326             sub-task.
1327              
1328             use MCE::Step;
1329              
1330             sub provider {
1331             MCE->step( $_, rand ) for 10 .. 19;
1332             }
1333              
1334             sub consumer {
1335             my ( $mce, @args ) = @_;
1336             MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args[0], $args[1] );
1337             }
1338              
1339             MCE::Step->init(
1340             task_name => [ 'p', 'c' ],
1341             max_workers => [ 1 , 4 ]
1342             );
1343              
1344             mce_step \&provider, \&consumer;
1345              
1346             -- Output
1347              
1348             2: 10, 0.583551
1349             4: 11, 0.175319
1350             3: 12, 0.843662
1351             4: 15, 0.748302
1352             2: 14, 0.591752
1353             3: 16, 0.357858
1354             5: 13, 0.953528
1355             4: 17, 0.698907
1356             2: 18, 0.985448
1357             3: 19, 0.146548
1358              
1359             =over 3
1360              
1361             =item MCE->enq ( task_name, item )
1362              
1363             =item MCE->enq ( task_name, [ arg1, arg2, argN ] )
1364              
1365             =item MCE->enq ( task_name, [ arg1, arg2 ], [ arg1, arg2 ] )
1366              
1367             =item MCE->enqp ( task_name, priority, item )
1368              
1369             =item MCE->enqp ( task_name, priority, [ arg1, arg2, argN ] )
1370              
1371             =item MCE->enqp ( task_name, priority, [ arg1, arg2 ], [ arg1, arg2 ] )
1372              
1373             =back
1374              
1375             The MCE 1.7 release enables finer control. Unlike ->step, which takes multiple
1376             arguments, the ->enq and ->enqp methods push items at the end of the array
1377             internally. Passing multiple arguments is possible by enclosing the arguments
1378             inside an anonymous array.
1379              
1380             The direction of flow is forward only. Thus, stepping to itself or backwards
1381             will cause an error.
1382              
1383             use MCE::Step;
1384              
1385             sub provider {
1386             if ( MCE->wid % 2 == 0 ) {
1387             MCE->enq( 'c', [ $_, rand ] ) for 10 .. 19;
1388             } else {
1389             MCE->enq( 'd', [ $_, rand ] ) for 20 .. 29;
1390             }
1391             }
1392              
1393             sub consumer_c {
1394             my ( $mce, $args ) = @_;
1395             MCE->printf( "C%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
1396             }
1397              
1398             sub consumer_d {
1399             my ( $mce, $args ) = @_;
1400             MCE->printf( "D%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
1401             }
1402              
1403             MCE::Step->init(
1404             task_name => [ 'p', 'c', 'd' ],
1405             max_workers => [ 2 , 3 , 3 ]
1406             );
1407              
1408             mce_step \&provider, \&consumer_c, \&consumer_d;
1409              
1410             -- Output
1411              
1412             C4: 10, 0.527531
1413             D6: 20, 0.420108
1414             C5: 11, 0.839770
1415             D8: 21, 0.386414
1416             C3: 12, 0.834645
1417             C4: 13, 0.191014
1418             D6: 23, 0.924027
1419             C5: 14, 0.899357
1420             D8: 24, 0.706186
1421             C4: 15, 0.083823
1422             D7: 22, 0.479708
1423             D6: 25, 0.073882
1424             C3: 16, 0.207446
1425             D8: 26, 0.560755
1426             C5: 17, 0.198157
1427             D7: 27, 0.324909
1428             C4: 18, 0.147505
1429             C5: 19, 0.318371
1430             D6: 28, 0.220465
1431             D8: 29, 0.630111
1432              
1433             =over 3
1434              
1435             =item MCE->await ( task_name, pending_threshold )
1436              
1437             =back
1438              
1439             Providers may sometimes run faster than consumers, thus increasing memory
1440             consumption. MCE 1.7 adds the ->await method for pausing momentarily until
1441             the receiving sub-task reaches the minimum threshold for the number of
1442             items pending in its queue.
1443              
1444             use MCE::Step;
1445             use Time::HiRes 'sleep';
1446              
1447             sub provider {
1448             for ( 10 .. 29 ) {
1449             # wait until 10 or less items pending
1450             MCE->await( 'c', 10 );
1451             # forward item to a later sub-task ( 'c' comes after 'p' )
1452             MCE->enq( 'c', [ $_, rand ] );
1453             }
1454             }
1455              
1456             sub consumer {
1457             my ($mce, $args) = @_;
1458             MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
1459             sleep 0.05;
1460             }
1461              
1462             MCE::Step->init(
1463             task_name => [ 'p', 'c' ],
1464             max_workers => [ 1 , 4 ]
1465             );
1466              
1467             mce_step \&provider, \&consumer;
1468              
1469             -- Output
1470              
1471             3: 10, 0.527307
1472             2: 11, 0.036193
1473             5: 12, 0.987168
1474             4: 13, 0.998140
1475             5: 14, 0.219526
1476             4: 15, 0.061609
1477             2: 16, 0.557664
1478             3: 17, 0.658684
1479             4: 18, 0.240932
1480             3: 19, 0.241042
1481             5: 20, 0.884830
1482             2: 21, 0.902223
1483             4: 22, 0.699223
1484             3: 23, 0.208270
1485             5: 24, 0.438919
1486             2: 25, 0.268854
1487             4: 26, 0.596425
1488             5: 27, 0.979818
1489             2: 28, 0.918173
1490             3: 29, 0.358266
1491              
1492             =head1 GATHERING DATA
1493              
1494             Unlike MCE::Map, where gathering and output order are handled automatically,
1495             the gather method is used to have results sent back to the manager process.
1496              
1497             use MCE::Step chunk_size => 1;
1498              
1499             ## Output order is not guaranteed.
1500             my @a = mce_step sub { MCE->gather($_ * 2) }, 1..100;
1501             print "@a\n\n";
1502              
1503             ## Outputs to a hash instead (key, value).
1504             my %h1 = mce_step sub { MCE->gather($_, $_ * 2) }, 1..100;
1505             print "@h1{1..100}\n\n";
1506              
1507             ## This does the same thing due to chunk_id starting at one.
1508             my %h2 = mce_step sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
1509             print "@h2{1..100}\n\n";
1510              
1511             The gather method may be called multiple times within the block, unlike return,
1512             which would leave the block. Therefore, think of gather as yielding results
1513             immediately to the manager process without actually leaving the block.
1514              
1515             use MCE::Step chunk_size => 1, max_workers => 3;
1516              
1517             my @hosts = qw(
1518             hosta hostb hostc hostd hoste
1519             );
1520              
1521             my %h3 = mce_step sub {
1522             my ($output, $error, $status); my $host = $_;
1523              
1524             ## Do something with $host;
1525             $output = "Worker ". MCE->wid .": Hello from $host";
1526              
1527             if (MCE->chunk_id % 3 == 0) {
1528             ## Simulating an error condition
1529             local $? = 1; $status = $?;
1530             $error = "Error from $host"
1531             }
1532             else {
1533             $status = 0;
1534             }
1535              
1536             ## Ensure unique keys (key, value) when gathering to
1537             ## a hash.
1538             MCE->gather("$host.out", $output);
1539             MCE->gather("$host.err", $error) if (defined $error);
1540             MCE->gather("$host.sta", $status);
1541              
1542             }, @hosts;
1543              
1544             foreach my $host (@hosts) {
1545             print $h3{"$host.out"}, "\n";
1546             print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
1547             print "Exit status: ", $h3{"$host.sta"}, "\n\n";
1548             }
1549              
1550             -- Output
1551              
1552             Worker 3: Hello from hosta
1553             Exit status: 0
1554              
1555             Worker 2: Hello from hostb
1556             Exit status: 0
1557              
1558             Worker 1: Hello from hostc
1559             Error from hostc
1560             Exit status: 1
1561              
1562             Worker 3: Hello from hostd
1563             Exit status: 0
1564              
1565             Worker 2: Hello from hoste
1566             Exit status: 0
1567              
1568             The following uses an anonymous array containing 3 elements when gathering
1569             data. Serialization is automatic behind the scenes.
1570              
1571             my %h3 = mce_step sub {
1572             ...
1573              
1574             MCE->gather($host, [$output, $error, $status]);
1575              
1576             }, @hosts;
1577              
1578             foreach my $host (@hosts) {
1579             print $h3{$host}->[0], "\n";
1580             print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
1581             print "Exit status: ", $h3{$host}->[2], "\n\n";
1582             }
1583              
1584             Although MCE::Map comes to mind, one may want additional control when
1585             gathering data such as retaining output order.
1586              
1587             use MCE::Step;
1588              
1589             sub preserve_order {
1590             my %tmp; my $order_id = 1; my $gather_ref = $_[0];
1591              
1592             return sub {
1593             $tmp{ (shift) } = \@_;
1594              
1595             while (1) {
1596             last unless exists $tmp{$order_id};
1597             push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
1598             }
1599              
1600             return;
1601             };
1602             }
1603              
1604             ## Workers persist for the most part after running, though this is not
1605             ## always the case and depends on Perl. Pass a reference to a subroutine if
1606             ## workers must persist; e.g. mce_step { ... }, \&foo, 1..100000.
1607              
1608             MCE::Step->init(
1609             chunk_size => 'auto', max_workers => 'auto'
1610             );
1611              
1612             for (1..2) {
1613             my @m2;
1614              
1615             mce_step {
1616             gather => preserve_order(\@m2)
1617             },
1618             sub {
1619             my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
1620              
1621             ## Compute the entire chunk data at once.
1622             push @a, map { $_ * 2 } @{ $chunk_ref };
1623              
1624             ## Afterwards, invoke the gather feature, which
1625             ## will direct the data to the callback function.
1626             MCE->gather(MCE->chunk_id, @a);
1627              
1628             }, 1..100000;
1629              
1630             print scalar @m2, "\n";
1631             }
1632              
1633             MCE::Step->finish;
1634              
1635             All 6 models support 'auto' for chunk_size unlike the Core API. Think of the
1636             models as the basis for providing JIT for MCE. They create the instance, tune
1637             max_workers, and tune chunk_size automatically regardless of the hardware.
1638              
1639             The following does the same thing using the Core API. Workers persist after
1640             running.
1641              
1642             use MCE;
1643              
1644             sub preserve_order {
1645             ...
1646             }
1647              
1648             my $mce = MCE->new(
1649             max_workers => 'auto', chunk_size => 8000,
1650              
1651             user_func => sub {
1652             my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
1653              
1654             ## Compute the entire chunk data at once.
1655             push @a, map { $_ * 2 } @{ $chunk_ref };
1656              
1657             ## Afterwards, invoke the gather feature, which
1658             ## will direct the data to the callback function.
1659             MCE->gather(MCE->chunk_id, @a);
1660             }
1661             );
1662              
1663             for (1..2) {
1664             my @m2;
1665              
1666             $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);
1667              
1668             print scalar @m2, "\n";
1669             }
1670              
1671             $mce->shutdown;
1672              
1673             =head1 MANUAL SHUTDOWN
1674              
1675             =over 3
1676              
1677             =item MCE::Step->finish
1678              
1679             =item MCE::Step::finish
1680              
1681             =back
1682              
1683             Workers remain persistent as much as possible after running. Shutdown occurs
1684             automatically when the script terminates. Call finish when workers are no
1685             longer needed.
1686              
1687             use MCE::Step;
1688              
1689             MCE::Step->init(
1690             chunk_size => 20, max_workers => 'auto'
1691             );
1692              
1693             mce_step sub { ... }, 1..100;
1694              
1695             MCE::Step->finish;
1696              
1697             =head1 INDEX
1698              
1699             L<MCE|MCE>, L<MCE::Core>
1700              
1701             =head1 AUTHOR
1702              
1703             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
1704              
1705             =cut
1706