File Coverage

blib/lib/MCE/Flow.pm
Criterion Covered Total %
statement 198 242 81.8
branch 107 182 58.7
condition 43 77 55.8
subroutine 14 17 82.3
pod 5 5 100.0
total 367 523 70.1


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Parallel flow model for building creative applications.
4             ##
5             ###############################################################################
6              
7             package MCE::Flow;
8              
9 46     46   3409283 use strict;
  46         94  
  46         2034  
10 46     46   244 use warnings;
  46         250  
  46         4647  
11              
12 46     46   409 no warnings qw( threads recursion uninitialized );
  46         807  
  46         3687  
13              
14             our $VERSION = '1.902';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 46     46   395 use Scalar::Util qw( looks_like_number );
  46         109  
  46         3643  
21 46     46   20073 use MCE;
  46         157  
  46         388  
22              
23             our @CARP_NOT = qw( MCE );
24              
25             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
26              
27             sub CLONE {
28 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
29             }
30              
31             ###############################################################################
32             ## ----------------------------------------------------------------------------
33             ## Import routine.
34             ##
35             ###############################################################################
36              
37             my ($_MCE, $_def, $_params, $_tag) = ({}, {}, {}, 'MCE::Flow');
38             my ($_prev_c, $_prev_n, $_prev_t, $_prev_w) = ({}, {}, {}, {});
39             my ($_user_tasks) = ({});
40              
41             sub import {
42 46     46   959 my ($_class, $_pkg) = (shift, caller);
43              
44 46         267 my $_p = $_def->{$_pkg} = {
45             MAX_WORKERS => 'auto',
46             CHUNK_SIZE => 'auto',
47             };
48              
49             ## Import functions.
50 46 50       250 if ($_pkg !~ /^MCE::/) {
51 46     46   519 no strict 'refs'; no warnings 'redefine';
  46     46   82  
  46         2746  
  46         271  
  46         90  
  46         199481  
52 46         152 *{ $_pkg.'::mce_flow_f' } = \&run_file;
  46         381  
53 46         108 *{ $_pkg.'::mce_flow_s' } = \&run_seq;
  46         170  
54 46         101 *{ $_pkg.'::mce_flow' } = \&run;
  46         230  
55             }
56              
57             ## Process module arguments.
58 46         228 while ( my $_argument = shift ) {
59 0         0 my $_arg = lc $_argument;
60              
61 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
62 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
63 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
64 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
65 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
66 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
67 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
68              
69             ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
70 0 0       0 if ( $_arg eq 'sereal' ) {
71 0 0       0 if ( shift eq '0' ) {
72 0         0 require Storable;
73 0         0 $_p->{FREEZE} = \&Storable::freeze;
74 0         0 $_p->{THAW} = \&Storable::thaw;
75             }
76 0         0 next;
77             }
78              
79 0         0 _croak("Error: ($_argument) invalid module option");
80             }
81              
82 46         343 $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});
83              
84 46         251 MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
85             MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
86 46 50       175 unless ($_p->{CHUNK_SIZE} eq 'auto');
87              
88 46         1083 return;
89             }
90              
91             ###############################################################################
92             ## ----------------------------------------------------------------------------
93             ## Init and finish routines.
94             ##
95             ###############################################################################
96              
97             sub MCE::Flow::_guard::DESTROY {
98 0     0   0 my ($_pkg, $_id) = @{ $_[0] };
  0         0  
99              
100 0 0 0     0 if (defined $_pkg && $_id eq "$$.$_tid") {
101 0         0 @{ $_[0] } = ();
  0         0  
102 0         0 MCE::Flow->finish($_pkg);
103             }
104              
105 0         0 return;
106             }
107              
108             sub init (@) {
109              
110 29 100 66 29 1 3569928 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
111 29         483 my $_pkg = "$$.$_tid.".caller();
112              
113 29 50       344 $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };
114              
115 29         90 @_ = ();
116              
117             defined wantarray
118 29 50       119 ? bless([$_pkg, "$$.$_tid"], MCE::Flow::_guard::)
119             : ();
120             }
121              
122             sub finish (@) {
123              
124 115 50 33 115 1 13890 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
125 115 100       1584 my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();
126              
127 115 100 66     2230 if ( $_pkg eq 'MCE' ) {
    100          
128 46         158 for my $_k ( keys %{ $_MCE } ) { MCE::Flow->finish($_k, 1); }
  46         1357  
  33         1292  
129             }
130             elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
131 36 50       1376 $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};
132              
133 36         286 delete $_user_tasks->{$_pkg};
134 36         436 delete $_prev_c->{$_pkg};
135 36         308 delete $_prev_n->{$_pkg};
136 36         243 delete $_prev_t->{$_pkg};
137 36         141 delete $_prev_w->{$_pkg};
138 36         422 delete $_MCE->{$_pkg};
139             }
140              
141 115         413 @_ = ();
142              
143 115         1077 return;
144             }
145              
146             ###############################################################################
147             ## ----------------------------------------------------------------------------
148             ## Parallel flow with MCE -- file.
149             ##
150             ###############################################################################
151              
152             sub run_file (@) {
153              
154 4 50 33 4 1 3802 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
155              
156 4 50       24 my ($_file, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  4         12  
157 4         32 my $_pid = "$$.$_tid.".caller();
158              
159 4 50       14 if (defined (my $_p = $_params->{$_pid})) {
160 4 50       10 delete $_p->{input_data} if (exists $_p->{input_data});
161 4 50       12 delete $_p->{sequence} if (exists $_p->{sequence});
162             }
163             else {
164 0         0 $_params->{$_pid} = {};
165             }
166              
167 4         28 for my $_i ($_start_pos .. @_ - 1) {
168 8         20 my $_r = ref $_[$_i];
169 8 100 66     50 if ($_r eq '' || $_r eq 'SCALAR' || $_r =~ /^(?:GLOB|FileHandle|IO::)/) {
      100        
170 4         8 $_file = $_[$_i]; $_pos = $_i;
  4         4  
171 4         12 last;
172             }
173             }
174              
175 4 100 66     92 if (defined $_file && ref $_file eq '' && $_file ne '') {
    50 66        
      33        
176 2 50       62 _croak("$_tag: ($_file) does not exist") unless (-e $_file);
177 2 50       18 _croak("$_tag: ($_file) is not readable") unless (-r $_file);
178 2 50       12 _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
179 2         24 $_params->{$_pid}{_file} = $_file;
180             }
181             elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
182 2         48 $_params->{$_pid}{_file} = $_file;
183             }
184             else {
185 0         0 _croak("$_tag: (file) is not specified or valid");
186             }
187              
188 4 50       10 if (defined $_pos) {
189 4         16 pop @_ for ($_pos .. @_ - 1);
190             }
191              
192 4         12 return run(@_);
193             }
194              
195             ###############################################################################
196             ## ----------------------------------------------------------------------------
197             ## Parallel flow with MCE -- sequence.
198             ##
199             ###############################################################################
200              
201             sub run_seq (@) {
202              
203 2 50 33 2 1 2174 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
204              
205 2 50       6 my ($_begin, $_end, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  2         12  
206 2         24 my $_pid = "$$.$_tid.".caller();
207              
208 2 50       12 if (defined (my $_p = $_params->{$_pid})) {
209 2 50       10 delete $_p->{sequence} if (exists $_p->{sequence});
210 2 50       8 delete $_p->{input_data} if (exists $_p->{input_data});
211 2 50       8 delete $_p->{_file} if (exists $_p->{_file});
212             }
213             else {
214 0         0 $_params->{$_pid} = {};
215             }
216              
217 2         10 for my $_i ($_start_pos .. @_ - 1) {
218 4         12 my $_r = ref $_[$_i];
219              
220 4 50 66     72 if ($_r eq '' || $_r =~ /^Math::/ || $_r eq 'HASH' || $_r eq 'ARRAY') {
      66        
      33        
221 2         4 $_pos = $_i;
222              
223 2 50 33     10 if ($_r eq '' || $_r =~ /^Math::/) {
    0          
    0          
224 2         6 $_begin = $_[$_pos], $_end = $_[$_pos + 1];
225             $_params->{$_pid}{sequence} = [
226 2         24 $_[$_pos], $_[$_pos + 1], $_[$_pos + 2], $_[$_pos + 3]
227             ];
228             }
229             elsif ($_r eq 'HASH') {
230 0         0 $_begin = $_[$_pos]->{begin}, $_end = $_[$_pos]->{end};
231 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
232             }
233             elsif ($_r eq 'ARRAY') {
234 0         0 $_begin = $_[$_pos]->[0], $_end = $_[$_pos]->[1];
235 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
236             }
237              
238 2         6 last;
239             }
240             }
241              
242             _croak("$_tag: (sequence) is not specified or valid")
243 2 50       10 unless (exists $_params->{$_pid}{sequence});
244 2 50       6 _croak("$_tag: (begin) is not specified for sequence")
245             unless (defined $_begin);
246 2 50       8 _croak("$_tag: (end) is not specified for sequence")
247             unless (defined $_end);
248              
249 2         10 $_params->{$_pid}{sequence_run} = undef;
250              
251 2 50       6 if (defined $_pos) {
252 2         12 pop @_ for ($_pos .. @_ - 1);
253             }
254              
255 2         8 return run(@_);
256             }
257              
258             ###############################################################################
259             ## ----------------------------------------------------------------------------
260             ## Parallel flow with MCE.
261             ##
262             ###############################################################################
263              
264             sub run (@) {
265              
266 79 50 33 79 1 2802959 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
267              
268 79 100       418 my $_pkg = caller() eq 'MCE::Flow' ? caller(1) : caller();
269 79         773 my $_pid = "$$.$_tid.$_pkg";
270              
271 79 100       496 if (ref $_[0] eq 'HASH') {
272 53 100       368 $_params->{$_pid} = {} unless defined $_params->{$_pid};
273 53         131 for my $_p (keys %{ $_[0] }) {
  53         772  
274 82         484 $_params->{$_pid}{$_p} = $_[0]->{$_p};
275             }
276              
277 53         113 shift;
278             }
279              
280             ## -------------------------------------------------------------------------
281              
282 79         254 my (@_code, @_name, @_thrs, @_wrks); my $_init_mce = 0; my $_pos = 0;
  79         153  
  79         506  
283              
284 79         517 while (ref $_[0] eq 'CODE') {
285 99         336 push @_code, $_[0];
286              
287 99 50       378 if (defined (my $_p = $_params->{$_pid})) {
288             push @_name, (ref $_p->{task_name} eq 'ARRAY')
289 99 100       637 ? $_p->{task_name}->[$_pos] : undef;
290             push @_thrs, (ref $_p->{use_threads} eq 'ARRAY')
291 99 50       342 ? $_p->{use_threads}->[$_pos] : undef;
292             push @_wrks, (ref $_p->{max_workers} eq 'ARRAY')
293 99 100       509 ? $_p->{max_workers}->[$_pos] : undef;
294             }
295              
296             $_init_mce = 1 if (
297             !defined $_prev_c->{$_pid}[$_pos] ||
298 99 100 66     777 $_prev_c->{$_pid}[$_pos] != $_code[$_pos]
299             );
300              
301 99 100       510 $_init_mce = 1 if ($_prev_n->{$_pid}[$_pos] ne $_name[$_pos]);
302 99 50       332 $_init_mce = 1 if ($_prev_t->{$_pid}[$_pos] ne $_thrs[$_pos]);
303 99 100       438 $_init_mce = 1 if ($_prev_w->{$_pid}[$_pos] ne $_wrks[$_pos]);
304              
305 99         239 $_prev_c->{$_pid}[$_pos] = $_code[$_pos];
306 99         217 $_prev_n->{$_pid}[$_pos] = $_name[$_pos];
307 99         184 $_prev_t->{$_pid}[$_pos] = $_thrs[$_pos];
308 99         225 $_prev_w->{$_pid}[$_pos] = $_wrks[$_pos];
309              
310 99         152 shift; $_pos++;
  99         314  
311             }
312              
313 79 50       287 if (defined $_prev_c->{$_pid}[$_pos]) {
314 0         0 pop @{ $_prev_c->{$_pid} } for ($_pos .. $#{ $_prev_c->{$_pid } });
  0         0  
  0         0  
315 0         0 pop @{ $_prev_n->{$_pid} } for ($_pos .. $#{ $_prev_n->{$_pid } });
  0         0  
  0         0  
316 0         0 pop @{ $_prev_t->{$_pid} } for ($_pos .. $#{ $_prev_t->{$_pid } });
  0         0  
  0         0  
317 0         0 pop @{ $_prev_w->{$_pid} } for ($_pos .. $#{ $_prev_w->{$_pid } });
  0         0  
  0         0  
318              
319 0         0 $_init_mce = 1;
320             }
321              
322 79 50       240 return unless (scalar @_code);
323              
324             ## -------------------------------------------------------------------------
325              
326 79         410 my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
  79         678  
327 79         214 my $_r = ref $_[0];
328              
329 79 100 66     358 if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|GLOB|FileHandle|IO::|Iterator::)/) {
330 4         8 $_input_data = shift;
331             }
332              
333 79 50       257 if (defined (my $_p = $_params->{$_pid})) {
334             $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
335 79 100 66     1560 if (exists $_p->{max_workers} && ref $_p->{max_workers} ne 'ARRAY');
336              
337 79 100 100     551 delete $_p->{sequence} if (defined $_input_data || scalar @_);
338 79 50       267 delete $_p->{user_func} if (exists $_p->{user_func});
339 79 50       425 delete $_p->{user_tasks} if (exists $_p->{user_tasks});
340             }
341              
342 79 100 66     389 if (@_code > 1 && $_max_workers > 1) {
343 20         78 $_max_workers = int($_max_workers / @_code + 0.5) + 1;
344             }
345              
346             my $_chunk_size = MCE::_parse_chunk_size(
347 79         3473 $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
348             $_input_data, scalar @_
349             );
350              
351 79 50       457 if (defined (my $_p = $_params->{$_pid})) {
352 79 100       316 if (exists $_p->{_file}) {
353 4         10 $_input_data = delete $_p->{_file};
354             } else {
355 75 50       225 $_input_data = $_p->{input_data} if exists $_p->{input_data};
356             }
357             }
358              
359             ## -------------------------------------------------------------------------
360              
361 79         623 MCE::_save_state($_MCE->{$_pid});
362              
363 79 100       272 if ($_init_mce) {
364 69 50       255 $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
365              
366             ## must clear arrays for nested session to work with Perl < v5.14
367 69         601 _gen_user_tasks($_pid, [@_code], [@_name], [@_thrs], [@_wrks]);
368              
369 69         321 @_code = @_name = @_thrs = @_wrks = ();
370              
371             my %_opts = (
372             max_workers => $_max_workers, task_name => $_tag,
373 69         452 user_tasks => $_user_tasks->{$_pid},
374             );
375              
376 69 50       248 if (defined (my $_p = $_params->{$_pid})) {
377 69         283 local $_;
378              
379 69         164 for (keys %{ $_p }) {
  69         275  
380 124 100 100     782 next if ($_ eq 'max_workers' && ref $_p->{max_workers} eq 'ARRAY');
381 112 100 66     358 next if ($_ eq 'task_name' && ref $_p->{task_name} eq 'ARRAY');
382 106 50 33     313 next if ($_ eq 'use_threads' && ref $_p->{use_threads} eq 'ARRAY');
383              
384 106 50       424 next if ($_ eq 'chunk_size');
385 106 50       308 next if ($_ eq 'input_data');
386 106 50       260 next if ($_ eq 'sequence_run');
387              
388             _croak("$_tag: ($_) is not a valid constructor argument")
389 106 50       444 unless (exists $MCE::_valid_fields_new{$_});
390              
391 106         456 $_opts{$_} = $_p->{$_};
392             }
393             }
394              
395 69         250 for my $_k (qw/ tmp_dir freeze thaw init_relay use_threads /) {
396             $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
397 345 50 33     1169 if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
398             }
399              
400 69         1159 $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
401             }
402             else {
403             ## Workers may persist after running. Thus, updating the MCE instance.
404             ## These options do not require respawning.
405 10 50       35 if (defined (my $_p = $_params->{$_pid})) {
406 10         55 for my $_k (qw(
407             RS interval stderr_file stdout_file user_error user_output
408             job_delay submit_delay on_post_exit on_post_run user_args
409             flush_file flush_stderr flush_stdout gather max_retries
410             )) {
411 160 100       384 $_MCE->{$_pid}{$_k} = $_p->{$_k} if (exists $_p->{$_k});
412             }
413             }
414             }
415              
416             ## -------------------------------------------------------------------------
417              
418 79 100       184 my @_a; my $_wa = wantarray; $_MCE->{$_pid}{gather} = \@_a if (defined $_wa);
  79         139  
  79         325  
419              
420 79 100       465 if (defined $_input_data) {
    100          
421 8         14 @_ = ();
422 8         48 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
423 7         48 delete $_MCE->{$_pid}{input_data};
424             }
425             elsif (scalar @_) {
426 6         42 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
427 2         64 delete $_MCE->{$_pid}{input_data};
428             }
429             else {
430 65 100 66     854 if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
431             $_MCE->{$_pid}->run({
432             chunk_size => $_chunk_size,
433             sequence => $_params->{$_pid}{sequence}
434 2         22 }, 0);
435 2 50       14 if (exists $_params->{$_pid}{sequence_run}) {
436 2         8 delete $_params->{$_pid}{sequence_run};
437 2         4 delete $_params->{$_pid}{sequence};
438             }
439 2         6 delete $_MCE->{$_pid}{sequence};
440             }
441             else {
442 63         660 $_MCE->{$_pid}->run({ chunk_size => $_chunk_size }, 0);
443             }
444             }
445              
446 46         1421 MCE::_restore_state();
447              
448 46 100       477 delete $_MCE->{$_pid}{gather} if (defined $_wa);
449              
450 46 100       2969 return ((defined $_wa) ? @_a : ());
451             }
452              
453             ###############################################################################
454             ## ----------------------------------------------------------------------------
455             ## Private methods.
456             ##
457             ###############################################################################
458              
459             sub _croak {
460              
461 0     0   0 goto &MCE::_croak;
462             }
463              
464             sub _gen_user_tasks {
465              
466 69     69   257 my ($_pid, $_code_ref, $_name_ref, $_thrs_ref, $_wrks_ref) = @_;
467              
468 69         136 @{ $_user_tasks->{$_pid} } = ();
  69         529  
469              
470 69         210 for (my $_i = 0; $_i < @{ $_code_ref }; $_i++) {
  150         580  
471 81         148 push @{ $_user_tasks->{$_pid} }, {
  81         1012  
472             task_name => $_name_ref->[$_i],
473             use_threads => $_thrs_ref->[$_i],
474             max_workers => $_wrks_ref->[$_i],
475             user_func => $_code_ref->[$_i]
476             }
477             }
478              
479 69         183 return;
480             }
481              
482             1;
483              
484             __END__
485              
486             ###############################################################################
487             ## ----------------------------------------------------------------------------
488             ## Module usage.
489             ##
490             ###############################################################################
491              
492             =head1 NAME
493              
494             MCE::Flow - Parallel flow model for building creative applications
495              
496             =head1 VERSION
497              
498             This document describes MCE::Flow version 1.902
499              
500             =head1 DESCRIPTION
501              
502             MCE::Flow is great for writing custom apps to maximize on all available cores.
503             This module was created to help one harness user_tasks within MCE.
504              
505             It is trivial to parallelize with mce_stream shown below.
506              
507             ## Native map function
508             my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;
509              
510             ## Same as with MCE::Stream (processing from right to left)
511             @a = mce_stream
512             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
513              
514             ## Pass an array reference to have writes occur simultaneously
515             mce_stream \@a,
516             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
517              
518             However, let's have MCE::Flow compute the same in parallel. MCE::Queue
519             will be used for data flow among the sub-tasks.
520              
521             use MCE::Flow;
522             use MCE::Queue;
523              
524             This calls for preserving output order.
525              
526             sub preserve_order {
527             my %tmp; my $order_id = 1; my $gather_ref = $_[0];
528             @{ $gather_ref } = (); ## clear the array (optional)
529              
530             return sub {
531             my ($data_ref, $chunk_id) = @_;
532             $tmp{$chunk_id} = $data_ref;
533              
534             while (1) {
535             last unless exists $tmp{$order_id};
536             push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
537             }
538              
539             return;
540             };
541             }
542              
543             Two queues are needed for data flow between the 3 sub-tasks. Notice task_end
544             and how the value from $task_name is used for determining which task has ended.
545              
546             my $b = MCE::Queue->new;
547             my $c = MCE::Queue->new;
548              
549             sub task_end {
550             my ($mce, $task_id, $task_name) = @_;
551              
552             if (defined $mce->{user_tasks}->[$task_id + 1]) {
553             my $n_workers = $mce->{user_tasks}->[$task_id + 1]->{max_workers};
554              
555             if ($task_name eq 'a') {
556             $b->enqueue((undef) x $n_workers);
557             }
558             elsif ($task_name eq 'b') {
559             $c->enqueue((undef) x $n_workers);
560             }
561             }
562              
563             return;
564             }
565              
566             Next are the 3 sub-tasks. The first one reads input and begins the flow.
567             The 2nd task dequeues, performs the calculation, and enqueues into the next.
568             Finally, the last task calls the gather method.
569              
570             Although serialization is done for you automatically, it is done here to save
571             from double serialization. This is the fastest approach for passing data
572             between sub-tasks. Thus, the least overhead.
573              
574             sub task_a {
575             my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
576              
577             push @ans, map { $_ * 2 } @{ $chunk_ref };
578             $b->enqueue(MCE->freeze([ \@ans, $chunk_id ]));
579              
580             return;
581             }
582              
583             sub task_b {
584             my ($mce) = @_;
585              
586             while (1) {
587             my @ans; my $chunk = $b->dequeue;
588             last unless defined $chunk;
589              
590             $chunk = MCE->thaw($chunk);
591             push @ans, map { $_ * 3 } @{ $chunk->[0] };
592             $c->enqueue(MCE->freeze([ \@ans, $chunk->[1] ]));
593             }
594              
595             return;
596             }
597              
598             sub task_c {
599             my ($mce) = @_;
600              
601             while (1) {
602             my @ans; my $chunk = $c->dequeue;
603             last unless defined $chunk;
604              
605             $chunk = MCE->thaw($chunk);
606             push @ans, map { $_ * 4 } @{ $chunk->[0] };
607             MCE->gather(\@ans, $chunk->[1]);
608             }
609              
610             return;
611             }
612              
613             In summary, MCE::Flow builds out a MCE instance behind the scene and starts
614             running. The task_name (shown), max_workers, and use_threads options can take
615             an anonymous array for specifying the values uniquely per each sub-task.
616              
617             my @a;
618              
619             mce_flow {
620             task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
621             gather => preserve_order(\@a)
622              
623             }, \&task_a, \&task_b, \&task_c, 1..10000;
624              
625             print "@a\n";
626              
627             If speed is not a concern and wanting to rid of all the MCE->freeze and
628             MCE->thaw statements, simply enqueue and dequeue 2 items at a time.
629             Or better yet, see L<MCE::Step> introduced in MCE 1.506.
630              
631             First, task_end must be updated. The number of undef(s) must match the number
632             of workers times the dequeue count. Otherwise, the script will stall.
633              
634             sub task_end {
635             ...
636             if ($task_name eq 'a') {
637             # $b->enqueue((undef) x $n_workers);
638             $b->enqueue((undef) x ($n_workers * 2));
639             }
640             elsif ($task_name eq 'b') {
641             # $c->enqueue((undef) x $n_workers);
642             $c->enqueue((undef) x ($n_workers * 2));
643             }
644             ...
645             }
646              
647             Next, the 3 sub-tasks enqueuing and dequeuing 2 elements at a time.
648              
649             sub task_a {
650             my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
651              
652             push @ans, map { $_ * 2 } @{ $chunk_ref };
653             $b->enqueue(\@ans, $chunk_id);
654              
655             return;
656             }
657              
658             sub task_b {
659             my ($mce) = @_;
660              
661             while (1) {
662             my @ans; my ($chunk_ref, $chunk_id) = $b->dequeue(2);
663             last unless defined $chunk_ref;
664              
665             push @ans, map { $_ * 3 } @{ $chunk_ref };
666             $c->enqueue(\@ans, $chunk_id);
667             }
668              
669             return;
670             }
671              
672             sub task_c {
673             my ($mce) = @_;
674              
675             while (1) {
676             my @ans; my ($chunk_ref, $chunk_id) = $c->dequeue(2);
677             last unless defined $chunk_ref;
678              
679             push @ans, map { $_ * 4 } @{ $chunk_ref };
680             MCE->gather(\@ans, $chunk_id);
681             }
682              
683             return;
684             }
685              
686             Finally, run as usual.
687              
688             my @a;
689              
690             mce_flow {
691             task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
692             gather => preserve_order(\@a)
693              
694             }, \&task_a, \&task_b, \&task_c, 1..10000;
695              
696             print "@a\n";
697              
698             =head1 SYNOPSIS when CHUNK_SIZE EQUALS 1
699              
700             Although L<MCE::Loop> may be preferred for running using a single code block,
701             the text below also applies to this module, particularly for the first block.
702              
703             All models in MCE default to 'auto' for chunk_size. The arguments for the block
704             are the same as writing a user_func block using the Core API.
705              
706             Beginning with MCE 1.5, the next input item is placed into the input scalar
707             variable $_ when chunk_size equals 1. Otherwise, $_ points to $chunk_ref
708             containing many items. Basically, line 2 below may be omitted from your code
709             when using $_. One can call MCE->chunk_id to obtain the current chunk id.
710              
711             line 1: user_func => sub {
712             line 2: my ($mce, $chunk_ref, $chunk_id) = @_;
713             line 3:
714             line 4: $_ points to $chunk_ref->[0]
715             line 5: in MCE 1.5 when chunk_size == 1
716             line 6:
717             line 7: $_ points to $chunk_ref
718             line 8: in MCE 1.5 when chunk_size > 1
719             line 9: }
720              
721             Follow this synopsis when chunk_size equals one. Looping is not required from
722             inside the first block. Hence, the block is called once per each item.
723              
724             ## Exports mce_flow, mce_flow_f, and mce_flow_s
725             use MCE::Flow;
726              
727             MCE::Flow->init(
728             chunk_size => 1
729             );
730              
731             ## Array or array_ref
732             mce_flow sub { do_work($_) }, 1..10000;
733             mce_flow sub { do_work($_) }, \@list;
734              
735             ## Important; pass an array_ref for deeply input data
736             mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
737             mce_flow sub { do_work($_) }, \@deeply_list;
738              
739             ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
740             ## Workers read directly and not involve the manager process
741             mce_flow_f sub { chomp; do_work($_) }, "/path/to/file"; # efficient
742              
743             ## Involves the manager process, therefore slower
744             mce_flow_f sub { chomp; do_work($_) }, $file_handle;
745             mce_flow_f sub { chomp; do_work($_) }, $io;
746             mce_flow_f sub { chomp; do_work($_) }, \$scalar;
747              
748             ## Sequence of numbers (begin, end [, step, format])
749             mce_flow_s sub { do_work($_) }, 1, 10000, 5;
750             mce_flow_s sub { do_work($_) }, [ 1, 10000, 5 ];
751              
752             mce_flow_s sub { do_work($_) }, {
753             begin => 1, end => 10000, step => 5, format => undef
754             };
755              
756             =head1 SYNOPSIS when CHUNK_SIZE is GREATER THAN 1
757              
758             Follow this synopsis when chunk_size equals 'auto' or greater than 1.
759             This means having to loop through the chunk from inside the first block.
760              
761             use MCE::Flow;
762              
763             MCE::Flow->init( ## Chunk_size defaults to 'auto' when
764             chunk_size => 'auto' ## not specified. Therefore, the init
765             ); ## function may be omitted.
766              
767             ## Syntax is shown for mce_flow for demonstration purposes.
768             ## Looping inside the block is the same for mce_flow_f and
769             ## mce_flow_s.
770              
771             ## Array or array_ref
772             mce_flow sub { do_work($_) for (@{ $_ }) }, 1..10000;
773             mce_flow sub { do_work($_) for (@{ $_ }) }, \@list;
774              
775             ## Important; pass an array_ref for deeply input data
776             mce_flow sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
777             mce_flow sub { do_work($_) for (@{ $_ }) }, \@deeply_list;
778              
779             ## Resembles code using the core MCE API
780             mce_flow sub {
781             my ($mce, $chunk_ref, $chunk_id) = @_;
782              
783             for (@{ $chunk_ref }) {
784             do_work($_);
785             }
786              
787             }, 1..10000;
788              
789             Chunking reduces the number of IPC calls behind the scene. Think in terms of
790             chunks whenever processing a large amount of data. For relatively small data,
791             choosing 1 for chunk_size is fine.
792              
793             =head1 OVERRIDING DEFAULTS
794              
795             The following list options which may be overridden when loading the module.
796              
797             use Sereal qw( encode_sereal decode_sereal );
798             use CBOR::XS qw( encode_cbor decode_cbor );
799             use JSON::XS qw( encode_json decode_json );
800              
801             use MCE::Flow
802             max_workers => 8, # Default 'auto'
803             chunk_size => 500, # Default 'auto'
804             tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
805             freeze => \&encode_sereal, # \&Storable::freeze
806             thaw => \&decode_sereal, # \&Storable::thaw
807             init_relay => 0, # Default undef; MCE 1.882+
808             use_threads => 0, # Default undef; MCE 1.882+
809             ;
810              
811             From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if available.
812             Specify C<< Sereal => 0 >> to use Storable instead.
813              
814             use MCE::Flow Sereal => 0;
815              
816             =head1 CUSTOMIZING MCE
817              
818             =over 3
819              
820             =item MCE::Flow->init ( options )
821              
822             =item MCE::Flow::init { options }
823              
824             =back
825              
826             The init function accepts a hash of MCE options. Unlike with MCE::Stream,
827             both gather and bounds_only options may be specified (not shown below).
828              
829             In scalar context (API available since 1.897), call C<MCE::Flow->finish>
830             automatically upon leaving the scope or program.
831              
832             use MCE::Flow;
833              
834             my $guard = MCE::Flow->init(
835             chunk_size => 1, max_workers => 4,
836              
837             user_begin => sub {
838             print "## ", MCE->wid, " started\n";
839             },
840              
841             user_end => sub {
842             print "## ", MCE->wid, " completed\n";
843             }
844             );
845              
846             my %a = mce_flow sub { MCE->gather($_, $_ * $_) }, 1..100;
847              
848             print "\n", "@a{1..100}", "\n";
849              
850             -- Output
851              
852             ## 3 started
853             ## 2 started
854             ## 4 started
855             ## 1 started
856             ## 2 completed
857             ## 4 completed
858             ## 3 completed
859             ## 1 completed
860              
861             1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
862             400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
863             1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
864             2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
865             3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
866             5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
867             7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
868             10000
869              
870             Like with MCE::Flow->init above, MCE options may be specified using an
871             anonymous hash for the first argument. Notice how task_name, max_workers,
872             and use_threads can take an anonymous array for setting uniquely per
873             each code block.
874              
875             Unlike MCE::Stream which processes from right-to-left, MCE::Flow begins
876             with the first code block, thus processing from left-to-right.
877              
878             use threads;
879             use MCE::Flow;
880              
881             my @a = mce_flow {
882             task_name => [ 'a', 'b', 'c' ],
883             max_workers => [ 3, 4, 2, ],
884             use_threads => [ 1, 0, 0, ],
885              
886             user_end => sub {
887             my ($mce, $task_id, $task_name) = @_;
888             MCE->print("$task_id - $task_name completed\n");
889             },
890              
891             task_end => sub {
892             my ($mce, $task_id, $task_name) = @_;
893             MCE->print("$task_id - $task_name ended\n");
894             }
895             },
896             sub { sleep 1; }, ## 3 workers, named a
897             sub { sleep 2; }, ## 4 workers, named b
898             sub { sleep 3; }; ## 2 workers, named c
899              
900             -- Output
901              
902             0 - a completed
903             0 - a completed
904             0 - a completed
905             0 - a ended
906             1 - b completed
907             1 - b completed
908             1 - b completed
909             1 - b completed
910             1 - b ended
911             2 - c completed
912             2 - c completed
913             2 - c ended
914              
915             =head1 API DOCUMENTATION
916              
917             Although input data is optional for MCE::Flow, the following assumes chunk_size
918             equals 1 in order to demonstrate all the possibilities for providing input data.
919              
920             =over 3
921              
922             =item MCE::Flow->run ( sub { code }, list )
923              
924             =item mce_flow sub { code }, list
925              
926             =back
927              
928             Input data may be defined using a list, an array ref, or a hash ref.
929              
930             Unlike MCE::Loop, Map, and Grep which take a block as C<{ ... }>, Flow takes a
931             C<sub { ... }> or a code reference. The other difference is that the comma is
932             needed after the block.
933              
934             # $_ contains the item when chunk_size => 1
935              
936             mce_flow sub { do_work($_) }, 1..1000;
937             mce_flow sub { do_work($_) }, \@list;
938              
939             # Important; pass an array_ref for deeply input data
940              
941             mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
942             mce_flow sub { do_work($_) }, \@deeply_list;
943              
944             # Chunking; any chunk_size => 1 or greater
945              
946             my %res = mce_flow sub {
947             my ($mce, $chunk_ref, $chunk_id) = @_;
948             my %ret;
949             for my $item (@{ $chunk_ref }) {
950             $ret{$item} = $item * 2;
951             }
952             MCE->gather(%ret);
953             },
954             \@list;
955              
956             # Input hash; current API available since 1.828
957              
958             my %res = mce_flow sub {
959             my ($mce, $chunk_ref, $chunk_id) = @_;
960             my %ret;
961             for my $key (keys %{ $chunk_ref }) {
962             $ret{$key} = $chunk_ref->{$key} * 2;
963             }
964             MCE->gather(%ret);
965             },
966             \%hash;
967              
968             # Unlike MCE::Loop, MCE::Flow doesn't need input to run
969              
970             mce_flow { max_workers => 4 }, sub {
971             MCE->say( MCE->wid );
972             };
973              
974             # ... and can run multiple tasks
975              
976             mce_flow {
977             max_workers => [ 1, 3 ],
978             task_name => [ 'p', 'c' ]
979             },
980             sub {
981             # 1 producer
982             MCE->say( "producer: ", MCE->wid );
983             },
984             sub {
985             # 3 consumers
986             MCE->say( "consumer: ", MCE->wid );
987             };
988              
989             # Here, options are specified via init
990              
991             MCE::Flow->init(
992             max_workers => [ 1, 3 ],
993             task_name => [ 'p', 'c' ]
994             );
995              
996             mce_flow \&producer, \&consumers;
997              
998             =over 3
999              
1000             =item MCE::Flow->run_file ( sub { code }, file )
1001              
1002             =item mce_flow_f sub { code }, file
1003              
1004             =back
1005              
1006             The fastest of these is the /path/to/file. Workers communicate the next offset
1007             position among themselves with zero interaction by the manager process.
1008              
1009             C<IO::All> { File, Pipe, STDIO } is supported since MCE 1.845.
1010              
1011             # $_ contains the line when chunk_size => 1
1012              
1013             mce_flow_f sub { $_ }, "/path/to/file"; # faster
1014             mce_flow_f sub { $_ }, $file_handle;
1015             mce_flow_f sub { $_ }, $io; # IO::All
1016             mce_flow_f sub { $_ }, \$scalar;
1017              
1018             # chunking, any chunk_size => 1 or greater
1019              
1020             my %res = mce_flow_f sub {
1021             my ($mce, $chunk_ref, $chunk_id) = @_;
1022             my $buf = '';
1023             for my $line (@{ $chunk_ref }) {
1024             $buf .= $line;
1025             }
1026             MCE->gather($chunk_id, $buf);
1027             },
1028             "/path/to/file";
1029              
1030             =over 3
1031              
1032             =item MCE::Flow->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
1033              
1034             =item mce_flow_s sub { code }, $beg, $end [, $step, $fmt ]
1035              
1036             =back
1037              
1038             Sequence may be defined as a list, an array reference, or a hash reference.
1039             The functions require both begin and end values to run. Step and format are
1040             optional. The format is passed to sprintf (% may be omitted below).
1041              
1042             my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
1043              
1044             # $_ contains the sequence number when chunk_size => 1
1045              
1046             mce_flow_s sub { $_ }, $beg, $end, $step, $fmt;
1047             mce_flow_s sub { $_ }, [ $beg, $end, $step, $fmt ];
1048              
1049             mce_flow_s sub { $_ }, {
1050             begin => $beg, end => $end,
1051             step => $step, format => $fmt
1052             };
1053              
1054             # chunking, any chunk_size => 1 or greater
1055              
1056             my %res = mce_flow_s sub {
1057             my ($mce, $chunk_ref, $chunk_id) = @_;
1058             my $buf = '';
1059             for my $seq (@{ $chunk_ref }) {
1060             $buf .= "$seq\n";
1061             }
1062             MCE->gather($chunk_id, $buf);
1063             },
1064             [ $beg, $end ];
1065              
1066             The sequence engine can compute 'begin' and 'end' items only, for the chunk,
1067             and not the items in between (hence boundaries only). This option applies
1068             to sequence only and has no effect when chunk_size equals 1.
1069              
1070             The time to run is 0.006s below. This becomes 0.827s without the bounds_only
1071             option due to computing all items in between, thus creating a very large
1072             array. Basically, specify bounds_only => 1 when boundaries is all you need
1073             for looping inside the block; e.g. Monte Carlo simulations.
1074              
1075             Time was measured using 1 worker to emphasize the difference.
1076              
1077             use MCE::Flow;
1078              
1079             MCE::Flow->init(
1080             max_workers => 1, chunk_size => 1_250_000,
1081             bounds_only => 1
1082             );
1083              
1084             # Typically, the input scalar $_ contains the sequence number
1085             # when chunk_size => 1, unless the bounds_only option is set
1086             # which is the case here. Thus, $_ points to $chunk_ref.
1087              
1088             mce_flow_s sub {
1089             my ($mce, $chunk_ref, $chunk_id) = @_;
1090              
1091             # $chunk_ref contains 2 items, not 1_250_000
1092             # my ( $begin, $end ) = ( $_->[0], $_->[1] );
1093              
1094             my $begin = $chunk_ref->[0];
1095             my $end = $chunk_ref->[1];
1096              
1097             # for my $seq ( $begin .. $end ) {
1098             # ...
1099             # }
1100              
1101             MCE->printf("%7d .. %8d\n", $begin, $end);
1102             },
1103             [ 1, 10_000_000 ];
1104              
1105             -- Output
1106              
1107             1 .. 1250000
1108             1250001 .. 2500000
1109             2500001 .. 3750000
1110             3750001 .. 5000000
1111             5000001 .. 6250000
1112             6250001 .. 7500000
1113             7500001 .. 8750000
1114             8750001 .. 10000000
1115              
1116             =over 3
1117              
1118             =item MCE::Flow->run ( { input_data => iterator }, sub { code } )
1119              
1120             =item mce_flow { input_data => iterator }, sub { code }
1121              
1122             =back
1123              
1124             An iterator reference may be specified for input_data. The only other way
1125             is to specify input_data via MCE::Flow->init. This prevents MCE::Flow from
1126             configuring the iterator reference as another user task which will not work.
1127              
1128             Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
1129              
1130             MCE::Flow->init(
1131             input_data => iterator
1132             );
1133              
1134             mce_flow sub { $_ };
1135              
1136             =head1 GATHERING DATA
1137              
1138             Unlike MCE::Map where gather and output order are done for you automatically,
1139             the gather method is used to have results sent back to the manager process.
1140              
1141             use MCE::Flow chunk_size => 1;
1142              
1143             ## Output order is not guaranteed.
1144             my @a1 = mce_flow sub { MCE->gather($_ * 2) }, 1..100;
1145             print "@a1\n\n";
1146              
1147             ## Outputs to a hash instead (key, value).
1148             my %h1 = mce_flow sub { MCE->gather($_, $_ * 2) }, 1..100;
1149             print "@h1{1..100}\n\n";
1150              
1151             ## This does the same thing due to chunk_id starting at one.
1152             my %h2 = mce_flow sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
1153             print "@h2{1..100}\n\n";
1154              
1155             The gather method may be called multiple times within the block unlike return
1156             which would leave the block. Therefore, think of gather as yielding results
1157             immediately to the manager process without actually leaving the block.
1158              
1159             use MCE::Flow chunk_size => 1, max_workers => 3;
1160              
1161             my @hosts = qw(
1162             hosta hostb hostc hostd hoste
1163             );
1164              
1165             my %h3 = mce_flow sub {
1166             my ($output, $error, $status); my $host = $_;
1167              
1168             ## Do something with $host;
1169             $output = "Worker ". MCE->wid .": Hello from $host";
1170              
1171             if (MCE->chunk_id % 3 == 0) {
1172             ## Simulating an error condition
1173             local $? = 1; $status = $?;
1174             $error = "Error from $host"
1175             }
1176             else {
1177             $status = 0;
1178             }
1179              
1180             ## Ensure unique keys (key, value) when gathering to
1181             ## a hash.
1182             MCE->gather("$host.out", $output);
1183             MCE->gather("$host.err", $error) if (defined $error);
1184             MCE->gather("$host.sta", $status);
1185              
1186             }, @hosts;
1187              
1188             foreach my $host (@hosts) {
1189             print $h3{"$host.out"}, "\n";
1190             print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
1191             print "Exit status: ", $h3{"$host.sta"}, "\n\n";
1192             }
1193              
1194             -- Output
1195              
1196             Worker 3: Hello from hosta
1197             Exit status: 0
1198              
1199             Worker 2: Hello from hostb
1200             Exit status: 0
1201              
1202             Worker 1: Hello from hostc
1203             Error from hostc
1204             Exit status: 1
1205              
1206             Worker 3: Hello from hostd
1207             Exit status: 0
1208              
1209             Worker 2: Hello from hoste
1210             Exit status: 0
1211              
1212             The following uses an anonymous array containing 3 elements when gathering
1213             data. Serialization is automatic behind the scene.
1214              
1215             my %h3 = mce_flow sub {
1216             ...
1217              
1218             MCE->gather($host, [$output, $error, $status]);
1219              
1220             }, @hosts;
1221              
1222             foreach my $host (@hosts) {
1223             print $h3{$host}->[0], "\n";
1224             print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
1225             print "Exit status: ", $h3{$host}->[2], "\n\n";
1226             }
1227              
1228             Although MCE::Map comes to mind, one may want additional control when
1229             gathering data such as retaining output order.
1230              
1231             use MCE::Flow;
1232              
1233             sub preserve_order {
1234             my %tmp; my $order_id = 1; my $gather_ref = $_[0];
1235              
1236             return sub {
1237             $tmp{ (shift) } = \@_;
1238              
1239             while (1) {
1240             last unless exists $tmp{$order_id};
1241             push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
1242             }
1243              
1244             return;
1245             };
1246             }
1247              
1248             ## Workers persist for the most part after running. Though, not always
1249             ## the case and depends on Perl. Pass a reference to a subroutine if
1250             ## workers must persist; e.g. mce_flow { ... }, \&foo, 1..100000.
1251              
1252             MCE::Flow->init(
1253             chunk_size => 'auto', max_workers => 'auto'
1254             );
1255              
1256             for (1..2) {
1257             my @m2;
1258              
1259             mce_flow {
1260             gather => preserve_order(\@m2)
1261             },
1262             sub {
1263             my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
1264              
1265             ## Compute the entire chunk data at once.
1266             push @a, map { $_ * 2 } @{ $chunk_ref };
1267              
1268             ## Afterwards, invoke the gather feature, which
1269             ## will direct the data to the callback function.
1270             MCE->gather(MCE->chunk_id, @a);
1271              
1272             }, 1..100000;
1273              
1274             print scalar @m2, "\n";
1275             }
1276              
1277             MCE::Flow->finish;
1278              
1279             All 6 models support 'auto' for chunk_size unlike the Core API. Think of the
1280             models as the basis for providing JIT for MCE. They create the instance, tune
1281             max_workers, and tune chunk_size automatically regardless of the hardware.
1282              
1283             The following does the same thing using the Core API. Workers persist after
1284             running.
1285              
1286             use MCE;
1287              
1288             sub preserve_order {
1289             ...
1290             }
1291              
1292             my $mce = MCE->new(
1293             max_workers => 'auto', chunk_size => 8000,
1294              
1295             user_func => sub {
1296             my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
1297              
1298             ## Compute the entire chunk data at once.
1299             push @a, map { $_ * 2 } @{ $chunk_ref };
1300              
1301             ## Afterwards, invoke the gather feature, which
1302             ## will direct the data to the callback function.
1303             MCE->gather(MCE->chunk_id, @a);
1304             }
1305             );
1306              
1307             for (1..2) {
1308             my @m2;
1309              
1310             $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);
1311              
1312             print scalar @m2, "\n";
1313             }
1314              
1315             $mce->shutdown;
1316              
1317             =head1 MANUAL SHUTDOWN
1318              
1319             =over 3
1320              
1321             =item MCE::Flow->finish
1322              
1323             =item MCE::Flow::finish
1324              
1325             =back
1326              
1327             Workers remain persistent as much as possible after running. Shutdown occurs
1328             automatically when the script terminates. Call finish when workers are no
1329             longer needed.
1330              
1331             use MCE::Flow;
1332              
1333             MCE::Flow->init(
1334             chunk_size => 20, max_workers => 'auto'
1335             );
1336              
1337             mce_flow sub { ... }, 1..100;
1338              
1339             MCE::Flow->finish;
1340              
1341             =head1 INDEX
1342              
1343             L<MCE|MCE>, L<MCE::Core>
1344              
1345             =head1 AUTHOR
1346              
1347             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
1348              
1349             =cut
1350