File Coverage

blib/lib/MCE/Loop.pm
Criterion Covered Total %
statement 135 173 78.0
branch 69 134 51.4
condition 22 53 41.5
subroutine 13 16 81.2
pod 5 5 100.0
total 244 381 64.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## MCE model for building parallel loops.
4             ##
5             ###############################################################################
6              
7             package MCE::Loop;
8              
9 5     5   537716 use strict;
  5         12  
  5         343  
10 5     5   39 use warnings;
  5         39  
  5         378  
11              
12 5     5   34 no warnings qw( threads recursion uninitialized );
  5         10  
  5         446  
13              
14             our $VERSION = '1.902';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 5     5   37 use Scalar::Util qw( looks_like_number );
  5         19  
  5         345  
21 5     5   3197 use MCE;
  5         19  
  5         38  
22              
23             our @CARP_NOT = qw( MCE );
24              
25             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
26              
27             sub CLONE {
28 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
29             }
30              
31             ###############################################################################
32             ## ----------------------------------------------------------------------------
33             ## Import routine.
34             ##
35             ###############################################################################
36              
37             my ($_MCE, $_def, $_params, $_prev_c, $_tag) = ({}, {}, {}, {}, 'MCE::Loop');
38              
39             sub import {
40 5     5   99 my ($_class, $_pkg) = (shift, caller);
41              
42 5         28 my $_p = $_def->{$_pkg} = {
43             MAX_WORKERS => 'auto',
44             CHUNK_SIZE => 'auto',
45             };
46              
47             ## Import functions.
48 5 50       26 if ($_pkg !~ /^MCE::/) {
49 5     5   49 no strict 'refs'; no warnings 'redefine';
  5     5   14  
  5         247  
  5         44  
  5         63  
  5         17118  
50 5         11 *{ $_pkg.'::mce_loop_f' } = \&run_file;
  5         62  
51 5         17 *{ $_pkg.'::mce_loop_s' } = \&run_seq;
  5         27  
52 5         10 *{ $_pkg.'::mce_loop' } = \&run;
  5         21  
53             }
54              
55             ## Process module arguments.
56 5         23 while ( my $_argument = shift ) {
57 0         0 my $_arg = lc $_argument;
58              
59 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
60 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
61 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
62 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
63 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
64 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
65 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
66              
67             ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
68 0 0       0 if ( $_arg eq 'sereal' ) {
69 0 0       0 if ( shift eq '0' ) {
70 0         0 require Storable;
71 0         0 $_p->{FREEZE} = \&Storable::freeze;
72 0         0 $_p->{THAW} = \&Storable::thaw;
73             }
74 0         0 next;
75             }
76              
77 0         0 _croak("Error: ($_argument) invalid module option");
78             }
79              
80 5         34 $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});
81              
82 5         31 MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
83             MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
84 5 50       16 unless ($_p->{CHUNK_SIZE} eq 'auto');
85              
86 5         125 return;
87             }
88              
89             ###############################################################################
90             ## ----------------------------------------------------------------------------
91             ## Init and finish routines.
92             ##
93             ###############################################################################
94              
95             sub MCE::Loop::_guard::DESTROY {
96 0     0   0 my ($_pkg, $_id) = @{ $_[0] };
  0         0  
97              
98 0 0 0     0 if (defined $_pkg && $_id eq "$$.$_tid") {
99 0         0 @{ $_[0] } = ();
  0         0  
100 0         0 MCE::Loop->finish($_pkg);
101             }
102              
103 0         0 return;
104             }
105              
106             sub init (@) {
107              
108 6 50 33 6 1 1244 shift if (defined $_[0] && $_[0] eq 'MCE::Loop');
109 6         68 my $_pkg = "$$.$_tid.".caller();
110              
111 6 50       104 $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };
112              
113 6         18 @_ = ();
114              
115             defined wantarray
116 6 50       126 ? bless([$_pkg, "$$.$_tid"], MCE::Loop::_guard::)
117             : ();
118             }
119              
120             sub finish (@) {
121              
122 11 50 33 11 1 4509 shift if (defined $_[0] && $_[0] eq 'MCE::Loop');
123 11 100       82 my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();
124              
125 11 100 66     162 if ( $_pkg eq 'MCE' ) {
    100          
126 5         13 for my $_k ( keys %{ $_MCE } ) { MCE::Loop->finish($_k, 1); }
  5         122  
  3         102  
127             }
128             elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
129 3 50       108 $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};
130              
131 3         32 delete $_prev_c->{$_pkg};
132 3         42 delete $_MCE->{$_pkg};
133             }
134              
135 11         54 @_ = ();
136              
137 11         34 return;
138             }
139              
140             ###############################################################################
141             ## ----------------------------------------------------------------------------
142             ## Parallel loop with MCE -- file.
143             ##
144             ###############################################################################
145              
146             sub run_file (&@) {
147              
148 4 50 33 4 1 4898 shift if (defined $_[0] && $_[0] eq 'MCE::Loop');
149              
150 4         10 my $_code = shift; my $_file = shift;
  4         10  
151 4         44 my $_pid = "$$.$_tid.".caller();
152              
153 4 50       16 if (defined (my $_p = $_params->{$_pid})) {
154 4 50       14 delete $_p->{input_data} if (exists $_p->{input_data});
155 4 50       18 delete $_p->{sequence} if (exists $_p->{sequence});
156             }
157             else {
158 0         0 $_params->{$_pid} = {};
159             }
160              
161 4 100 66     116 if (defined $_file && ref $_file eq '' && $_file ne '') {
    50 66        
      33        
162 2 50       88 _croak("$_tag: ($_file) does not exist") unless (-e $_file);
163 2 50       66 _croak("$_tag: ($_file) is not readable") unless (-r $_file);
164 2 50       20 _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
165 2         74 $_params->{$_pid}{_file} = $_file;
166             }
167             elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
168 2         10 $_params->{$_pid}{_file} = $_file;
169             }
170             else {
171 0         0 _croak("$_tag: (file) is not specified or valid");
172             }
173              
174 4         12 @_ = ();
175              
176 4         18 return run($_code);
177             }
178              
179             ###############################################################################
180             ## ----------------------------------------------------------------------------
181             ## Parallel loop with MCE -- sequence.
182             ##
183             ###############################################################################
184              
185             sub run_seq (&@) {
186              
187 2 50 33 2 1 1426 shift if (defined $_[0] && $_[0] eq 'MCE::Loop');
188              
189 2         4 my $_code = shift;
190 2         12 my $_pid = "$$.$_tid.".caller();
191              
192 2 50       8 if (defined (my $_p = $_params->{$_pid})) {
193 2 50       6 delete $_p->{input_data} if (exists $_p->{input_data});
194 2 50       6 delete $_p->{_file} if (exists $_p->{_file});
195             }
196             else {
197 0         0 $_params->{$_pid} = {};
198             }
199              
200 2         6 my ($_begin, $_end);
201              
202 2 50 33     10 if (ref $_[0] eq 'HASH') {
    50          
    50          
203 0         0 $_begin = $_[0]->{begin}, $_end = $_[0]->{end};
204 0         0 $_params->{$_pid}{sequence} = $_[0];
205             }
206             elsif (ref $_[0] eq 'ARRAY') {
207 0 0 0     0 if (@{ $_[0] } > 3 && $_[0]->[3] =~ /\d$/) {
  0         0  
208 0         0 $_begin = $_[0]->[0], $_end = $_[0]->[-1];
209 0         0 $_params->{$_pid}{sequence} = [ $_[0]->[0], $_[0]->[-1] ];
210             }
211             else {
212 0         0 $_begin = $_[0]->[0], $_end = $_[0]->[1];
213 0         0 $_params->{$_pid}{sequence} = $_[0];
214             }
215             }
216             elsif (ref $_[0] eq '' || ref($_[0]) =~ /^Math::/) {
217 2 50 33     8 if (@_ > 3 && $_[3] =~ /\d$/) {
218 0         0 $_begin = $_[0], $_end = $_[-1];
219 0         0 $_params->{$_pid}{sequence} = [ $_[0], $_[-1] ];
220             }
221             else {
222 2         2 $_begin = $_[0], $_end = $_[1];
223 2         20 $_params->{$_pid}{sequence} = [ @_ ];
224             }
225             }
226             else {
227 0         0 _croak("$_tag: (sequence) is not specified or valid");
228             }
229              
230 2 50       6 _croak("$_tag: (begin) is not specified for sequence")
231             unless (defined $_begin);
232 2 50       4 _croak("$_tag: (end) is not specified for sequence")
233             unless (defined $_end);
234              
235 2         4 $_params->{$_pid}{sequence_run} = undef;
236              
237 2         4 @_ = ();
238              
239 2         6 return run($_code);
240             }
241              
242             ###############################################################################
243             ## ----------------------------------------------------------------------------
244             ## Parallel loop with MCE.
245             ##
246             ###############################################################################
247              
248             sub run (&@) {
249              
250 14 50 33 14 1 9202 shift if (defined $_[0] && $_[0] eq 'MCE::Loop');
251              
252 14         32 my $_code = shift;
253 14 100       132 my $_pkg = caller() eq 'MCE::Loop' ? caller(1) : caller();
254 14         106 my $_pid = "$$.$_tid.$_pkg";
255              
256 14         28 my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
  14         72  
257 14         34 my $_r = ref $_[0];
258              
259 14 100 66     224 if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|CODE|GLOB|FileHandle|IO::|Iterator::)/) {
260 4         12 $_input_data = shift;
261             }
262              
263 14 50       60 if (defined (my $_p = $_params->{$_pid})) {
264             $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
265 14 50       160 if (exists $_p->{max_workers});
266              
267 14 100 100     126 delete $_p->{sequence} if (defined $_input_data || scalar @_);
268 14 50       46 delete $_p->{user_func} if (exists $_p->{user_func});
269 14 50       38 delete $_p->{user_tasks} if (exists $_p->{user_tasks});
270             }
271              
272             my $_chunk_size = MCE::_parse_chunk_size(
273 14         98 $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
274             $_input_data, scalar @_
275             );
276              
277 14 50       54 if (defined (my $_p = $_params->{$_pid})) {
278 14 100       60 if (exists $_p->{_file}) {
279 4         16 $_input_data = delete $_p->{_file};
280             } else {
281 10 50       32 $_input_data = $_p->{input_data} if exists $_p->{input_data};
282             }
283             }
284              
285             ## -------------------------------------------------------------------------
286              
287 14         78 MCE::_save_state($_MCE->{$_pid});
288              
289 14 100 66     118 if (!defined $_prev_c->{$_pid} || $_prev_c->{$_pid} != $_code) {
290 6 50       16 $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
291 6         26 $_prev_c->{$_pid} = $_code;
292              
293 6         34 my %_opts = (
294             max_workers => $_max_workers, task_name => $_tag,
295             user_func => $_code,
296             );
297              
298 6 50       24 if (defined (my $_p = $_params->{$_pid})) {
299 6         10 for my $_k (keys %{ $_p }) {
  6         26  
300 10 50       24 next if ($_k eq 'sequence_run');
301 10 50       26 next if ($_k eq 'input_data');
302 10 50       20 next if ($_k eq 'chunk_size');
303              
304             _croak("$_tag: ($_k) is not a valid constructor argument")
305 10 50       94 unless (exists $MCE::_valid_fields_new{$_k});
306              
307 10         30 $_opts{$_k} = $_p->{$_k};
308             }
309             }
310              
311 6         20 for my $_k (qw/ tmp_dir freeze thaw init_relay use_threads /) {
312             $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
313 30 50 33     88 if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
314             }
315              
316 6         72 $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
317             }
318              
319             ## -------------------------------------------------------------------------
320              
321 14 100       76 my @_a; my $_wa = wantarray; $_MCE->{$_pid}{gather} = \@_a if (defined $_wa);
  14         26  
  14         44  
322              
323 14 100       46 if (defined $_input_data) {
    100          
324 8         20 @_ = ();
325 8         94 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
326 7         75 delete $_MCE->{$_pid}{input_data};
327             }
328             elsif (scalar @_) {
329 4         44 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
330 2         62 delete $_MCE->{$_pid}{input_data};
331             }
332             else {
333 2 50 33     38 if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
334             $_MCE->{$_pid}->run({
335             chunk_size => $_chunk_size,
336             sequence => $_params->{$_pid}{sequence}
337 2         14 }, 0);
338 2 50       20 if (exists $_params->{$_pid}{sequence_run}) {
339 2         6 delete $_params->{$_pid}{sequence_run};
340 2         6 delete $_params->{$_pid}{sequence};
341             }
342 2         6 delete $_MCE->{$_pid}{sequence};
343             }
344             }
345              
346 11         126 MCE::_restore_state();
347              
348 11 100       37 delete $_MCE->{$_pid}{gather} if (defined $_wa);
349              
350 11 100       258 return ((defined $_wa) ? @_a : ());
351             }
352              
353             ###############################################################################
354             ## ----------------------------------------------------------------------------
355             ## Private methods.
356             ##
357             ###############################################################################
358              
359             sub _croak {
360              
361 0     0     goto &MCE::_croak;
362             }
363              
364             1;
365              
366             __END__
367              
368             ###############################################################################
369             ## ----------------------------------------------------------------------------
370             ## Module usage.
371             ##
372             ###############################################################################
373              
374             =head1 NAME
375              
376             MCE::Loop - MCE model for building parallel loops
377              
378             =head1 VERSION
379              
380             This document describes MCE::Loop version 1.902
381              
382             =head1 DESCRIPTION
383              
384             This module provides a parallel loop implementation through Many-Core Engine.
385             MCE::Loop is not MCE::Map but more along the lines of an easy way to spin up a
386             MCE instance and have user_func pointing to your code block. If you want
387             something similar to map, then see L<MCE::Map>.
388              
389             ## Construction when chunking is not desired
390              
391             use MCE::Loop;
392              
393             MCE::Loop->init(
394             max_workers => 5, chunk_size => 1
395             );
396              
397             mce_loop {
398             my ($mce, $chunk_ref, $chunk_id) = @_;
399             MCE->say("$chunk_id: $_");
400             } 40 .. 48;
401              
402             -- Output
403              
404             3: 42
405             1: 40
406             2: 41
407             4: 43
408             5: 44
409             6: 45
410             7: 46
411             8: 47
412             9: 48
413              
414             ## Construction for 'auto' or greater than 1
415              
416             use MCE::Loop;
417              
418             MCE::Loop->init(
419             max_workers => 5, chunk_size => 'auto'
420             );
421              
422             mce_loop {
423             my ($mce, $chunk_ref, $chunk_id) = @_;
424             for (@{ $chunk_ref }) {
425             MCE->say("$chunk_id: $_");
426             }
427             } 40 .. 48;
428              
429             -- Output
430              
431             1: 40
432             2: 42
433             1: 41
434             4: 46
435             2: 43
436             5: 48
437             3: 44
438             4: 47
439             3: 45
440              
441             =head1 SYNOPSIS when CHUNK_SIZE EQUALS 1
442              
443             All models in MCE default to 'auto' for chunk_size. The arguments for the block
444             are the same as writing a user_func block using the Core API.
445              
446             Beginning with MCE 1.5, the next input item is placed into the input scalar
447             variable $_ when chunk_size equals 1. Otherwise, $_ points to $chunk_ref
448             containing many items. Basically, line 2 below may be omitted from your code
449             when using $_. One can call MCE->chunk_id to obtain the current chunk id.
450              
451             line 1: user_func => sub {
452             line 2: my ($mce, $chunk_ref, $chunk_id) = @_;
453             line 3:
454             line 4: $_ points to $chunk_ref->[0]
455             line 5: in MCE 1.5 when chunk_size == 1
456             line 6:
457             line 7: $_ points to $chunk_ref
458             line 8: in MCE 1.5 when chunk_size > 1
459             line 9: }
460              
461             Follow this synopsis when chunk_size equals one. Looping is not required from
462             inside the block. Hence, the block is called once per each item.
463              
464             ## Exports mce_loop, mce_loop_f, and mce_loop_s
465             use MCE::Loop;
466              
467             MCE::Loop->init(
468             chunk_size => 1
469             );
470              
471             ## Array or array_ref
472             mce_loop { do_work($_) } 1..10000;
473             mce_loop { do_work($_) } \@list;
474              
475             ## Important; pass an array_ref for deeply input data
476             mce_loop { do_work($_) } [ [ 0, 1 ], [ 0, 2 ], ... ];
477             mce_loop { do_work($_) } \@deeply_list;
478              
479             ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
480             ## Workers read directly and not involve the manager process
481             mce_loop_f { chomp; do_work($_) } "/path/to/file"; # efficient
482              
483             ## Involves the manager process, therefore slower
484             mce_loop_f { chomp; do_work($_) } $file_handle;
485             mce_loop_f { chomp; do_work($_) } $io;
486             mce_loop_f { chomp; do_work($_) } \$scalar;
487              
488             ## Sequence of numbers (begin, end [, step, format])
489             mce_loop_s { do_work($_) } 1, 10000, 5;
490             mce_loop_s { do_work($_) } [ 1, 10000, 5 ];
491              
492             mce_loop_s { do_work($_) } {
493             begin => 1, end => 10000, step => 5, format => undef
494             };
495              
496             =head1 SYNOPSIS when CHUNK_SIZE is GREATER THAN 1
497              
498             Follow this synopsis when chunk_size equals 'auto' or greater than 1.
499             This means having to loop through the chunk from inside the block.
500              
501             use MCE::Loop;
502              
503             MCE::Loop->init( ## Chunk_size defaults to 'auto' when
504             chunk_size => 'auto' ## not specified. Therefore, the init
505             ); ## function may be omitted.
506              
507             ## Syntax is shown for mce_loop for demonstration purposes.
508             ## Looping inside the block is the same for mce_loop_f and
509             ## mce_loop_s.
510              
511             ## Array or array_ref
512             mce_loop { do_work($_) for (@{ $_ }) } 1..10000;
513             mce_loop { do_work($_) for (@{ $_ }) } \@list;
514              
515             ## Important; pass an array_ref for deeply input data
516             mce_loop { do_work($_) for (@{ $_ }) } [ [ 0, 1 ], [ 0, 2 ], ... ];
517             mce_loop { do_work($_) for (@{ $_ }) } \@deeply_list;
518              
519             ## Resembles code using the core MCE API
520             mce_loop {
521             my ($mce, $chunk_ref, $chunk_id) = @_;
522              
523             for (@{ $chunk_ref }) {
524             do_work($_);
525             }
526              
527             } 1..10000;
528              
529             Chunking reduces the number of IPC calls behind the scene. Think in terms of
530             chunks whenever processing a large amount of data. For relatively small data,
531             choosing 1 for chunk_size is fine.
532              
533             =head1 OVERRIDING DEFAULTS
534              
535             The following list options which may be overridden when loading the module.
536              
537             use Sereal qw( encode_sereal decode_sereal );
538             use CBOR::XS qw( encode_cbor decode_cbor );
539             use JSON::XS qw( encode_json decode_json );
540              
541             use MCE::Loop
542             max_workers => 4, # Default 'auto'
543             chunk_size => 100, # Default 'auto'
544             tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
545             freeze => \&encode_sereal, # \&Storable::freeze
546             thaw => \&decode_sereal, # \&Storable::thaw
547             init_relay => 0, # Default undef; MCE 1.882+
548             use_threads => 0, # Default undef; MCE 1.882+
549             ;
550              
551             From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if available.
552             Specify C<< Sereal => 0 >> to use Storable instead.
553              
554             use MCE::Loop Sereal => 0;
555              
556             =head1 CUSTOMIZING MCE
557              
558             =over 3
559              
560             =item MCE::Loop->init ( options )
561              
562             =item MCE::Loop::init { options }
563              
564             =back
565              
566             The init function accepts a hash of MCE options.
567              
568             In scalar context (API available since 1.897), call C<MCE::Loop->finish>
569             automatically upon leaving the scope or program.
570              
571             use MCE::Loop;
572              
573             my $guard = MCE::Loop->init(
574             chunk_size => 1, max_workers => 4,
575              
576             user_begin => sub {
577             print "## ", MCE->wid, " started\n";
578             },
579              
580             user_end => sub {
581             print "## ", MCE->wid, " completed\n";
582             }
583             );
584              
585             my %a = mce_loop { MCE->gather($_, $_ * $_) } 1..100;
586              
587             print "\n", "@a{1..100}", "\n";
588              
589             -- Output
590              
591             ## 3 started
592             ## 1 started
593             ## 2 started
594             ## 4 started
595             ## 1 completed
596             ## 2 completed
597             ## 3 completed
598             ## 4 completed
599              
600             1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
601             400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
602             1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
603             2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
604             3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
605             5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
606             7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
607             10000
608              
609             =head1 API DOCUMENTATION
610              
611             The following assumes chunk_size equals 1 in order to demonstrate all the
612             possibilities for providing input data.
613              
614             =over 3
615              
616             =item MCE::Loop->run ( sub { code }, list )
617              
618             =item mce_loop { code } list
619              
620             =back
621              
622             Input data may be defined using a list, an array ref, or a hash ref.
623              
624             # $_ contains the item when chunk_size => 1
625              
626             mce_loop { do_work($_) } 1..1000;
627             mce_loop { do_work($_) } \@list;
628              
629             # Important; pass an array_ref for deeply input data
630              
631             mce_loop { do_work($_) } [ [ 0, 1 ], [ 0, 2 ], ... ];
632             mce_loop { do_work($_) } \@deeply_list;
633              
634             # Chunking; any chunk_size => 1 or greater
635              
636             my %res = mce_loop {
637             my ($mce, $chunk_ref, $chunk_id) = @_;
638             my %ret;
639             for my $item (@{ $chunk_ref }) {
640             $ret{$item} = $item * 2;
641             }
642             MCE->gather(%ret);
643             }
644             \@list;
645              
646             # Input hash; current API available since 1.828
647              
648             my %res = mce_loop {
649             my ($mce, $chunk_ref, $chunk_id) = @_;
650             my %ret;
651             for my $key (keys %{ $chunk_ref }) {
652             $ret{$key} = $chunk_ref->{$key} * 2;
653             }
654             MCE->gather(%ret);
655             }
656             \%hash;
657              
658             =over 3
659              
660             =item MCE::Loop->run_file ( sub { code }, file )
661              
662             =item mce_loop_f { code } file
663              
664             =back
665              
666             The fastest of these is the /path/to/file. Workers communicate the next offset
667             position among themselves with zero interaction by the manager process.
668              
669             C<IO::All> { File, Pipe, STDIO } is supported since MCE 1.845.
670              
671             # $_ contains the line when chunk_size => 1
672              
673             mce_loop_f { $_ } "/path/to/file"; # faster
674             mce_loop_f { $_ } $file_handle;
675             mce_loop_f { $_ } $io; # IO::All
676             mce_loop_f { $_ } \$scalar;
677              
678             # chunking, any chunk_size => 1 or greater
679              
680             my %res = mce_loop_f {
681             my ($mce, $chunk_ref, $chunk_id) = @_;
682             my $buf = '';
683             for my $line (@{ $chunk_ref }) {
684             $buf .= $line;
685             }
686             MCE->gather($chunk_id, $buf);
687             }
688             "/path/to/file";
689              
690             =over 3
691              
692             =item MCE::Loop->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
693              
694             =item mce_loop_s { code } $beg, $end [, $step, $fmt ]
695              
696             =back
697              
698             Sequence may be defined as a list, an array reference, or a hash reference.
699             The functions require both begin and end values to run. Step and format are
700             optional. The format is passed to sprintf (% may be omitted below).
701              
702             my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
703              
704             # $_ contains the sequence number when chunk_size => 1
705              
706             mce_loop_s { $_ } $beg, $end, $step, $fmt;
707             mce_loop_s { $_ } [ $beg, $end, $step, $fmt ];
708              
709             mce_loop_s { $_ } {
710             begin => $beg, end => $end,
711             step => $step, format => $fmt
712             };
713              
714             # chunking, any chunk_size => 1 or greater
715              
716             my %res = mce_loop_s {
717             my ($mce, $chunk_ref, $chunk_id) = @_;
718             my $buf = '';
719             for my $seq (@{ $chunk_ref }) {
720             $buf .= "$seq\n";
721             }
722             MCE->gather($chunk_id, $buf);
723             }
724             [ $beg, $end ];
725              
726             The sequence engine can compute 'begin' and 'end' items only, for the chunk,
727             and not the items in between (hence boundaries only). This option applies
728             to sequence only and has no effect when chunk_size equals 1.
729              
730             The time to run is 0.006s below. This becomes 0.827s without the bounds_only
731             option due to computing all items in between, thus creating a very large
732             array. Basically, specify bounds_only => 1 when boundaries is all you need
733             for looping inside the block; e.g. Monte Carlo simulations.
734              
735             Time was measured using 1 worker to emphasize the difference.
736              
737             use MCE::Loop;
738              
739             MCE::Loop->init(
740             max_workers => 1, chunk_size => 1_250_000,
741             bounds_only => 1
742             );
743              
744             # Typically, the input scalar $_ contains the sequence number
745             # when chunk_size => 1, unless the bounds_only option is set
746             # which is the case here. Thus, $_ points to $chunk_ref.
747              
748             mce_loop_s {
749             my ($mce, $chunk_ref, $chunk_id) = @_;
750              
751             # $chunk_ref contains 2 items, not 1_250_000
752             # my ( $begin, $end ) = ( $_->[0], $_->[1] );
753              
754             my $begin = $chunk_ref->[0];
755             my $end = $chunk_ref->[1];
756              
757             # for my $seq ( $begin .. $end ) {
758             # ...
759             # }
760              
761             MCE->printf("%7d .. %8d\n", $begin, $end);
762             }
763             [ 1, 10_000_000 ];
764              
765             -- Output
766              
767             1 .. 1250000
768             1250001 .. 2500000
769             2500001 .. 3750000
770             3750001 .. 5000000
771             5000001 .. 6250000
772             6250001 .. 7500000
773             7500001 .. 8750000
774             8750001 .. 10000000
775              
776             =over 3
777              
778             =item MCE::Loop->run ( sub { code }, iterator )
779              
780             =item mce_loop { code } iterator
781              
782             =back
783              
784             An iterator reference may be specified for input_data. Iterators are described
785             under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
786              
787             mce_loop { $_ } make_iterator(10, 30, 2);
788              
789             =head1 GATHERING DATA
790              
791             Unlike MCE::Map where gather and output order are done for you automatically,
792             the gather method is used to have results sent back to the manager process.
793              
794             use MCE::Loop chunk_size => 1;
795              
796             ## Output order is not guaranteed.
797             my @a1 = mce_loop { MCE->gather($_ * 2) } 1..100;
798             print "@a1\n\n";
799              
800             ## Outputs to a hash instead (key, value).
801             my %h1 = mce_loop { MCE->gather($_, $_ * 2) } 1..100;
802             print "@h1{1..100}\n\n";
803              
804             ## This does the same thing due to chunk_id starting at one.
805             my %h2 = mce_loop { MCE->gather(MCE->chunk_id, $_ * 2) } 1..100;
806             print "@h2{1..100}\n\n";
807              
808             The gather method may be called multiple times within the block unlike return
809             which would leave the block. Therefore, think of gather as yielding results
810             immediately to the manager process without actually leaving the block.
811              
812             use MCE::Loop chunk_size => 1, max_workers => 3;
813              
814             my @hosts = qw(
815             hosta hostb hostc hostd hoste
816             );
817              
818             my %h3 = mce_loop {
819             my ($output, $error, $status); my $host = $_;
820              
821             ## Do something with $host;
822             $output = "Worker ". MCE->wid .": Hello from $host";
823              
824             if (MCE->chunk_id % 3 == 0) {
825             ## Simulating an error condition
826             local $? = 1; $status = $?;
827             $error = "Error from $host"
828             }
829             else {
830             $status = 0;
831             }
832              
833             ## Ensure unique keys (key, value) when gathering to
834             ## a hash.
835             MCE->gather("$host.out", $output);
836             MCE->gather("$host.err", $error) if (defined $error);
837             MCE->gather("$host.sta", $status);
838              
839             } @hosts;
840              
841             foreach my $host (@hosts) {
842             print $h3{"$host.out"}, "\n";
843             print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
844             print "Exit status: ", $h3{"$host.sta"}, "\n\n";
845             }
846              
847             -- Output
848              
849             Worker 2: Hello from hosta
850             Exit status: 0
851              
852             Worker 1: Hello from hostb
853             Exit status: 0
854              
855             Worker 3: Hello from hostc
856             Error from hostc
857             Exit status: 1
858              
859             Worker 2: Hello from hostd
860             Exit status: 0
861              
862             Worker 1: Hello from hoste
863             Exit status: 0
864              
865             The following uses an anonymous array containing 3 elements when gathering
866             data. Serialization is automatic behind the scene.
867              
868             my %h3 = mce_loop {
869             ...
870              
871             MCE->gather($host, [$output, $error, $status]);
872              
873             } @hosts;
874              
875             foreach my $host (@hosts) {
876             print $h3{$host}->[0], "\n";
877             print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
878             print "Exit status: ", $h3{$host}->[2], "\n\n";
879             }
880              
881             Although MCE::Map comes to mind, one may want additional control when
882             gathering data such as retaining output order.
883              
884             use MCE::Loop;
885              
886             sub preserve_order {
887             my %tmp; my $order_id = 1; my $gather_ref = $_[0];
888              
889             return sub {
890             $tmp{ (shift) } = \@_;
891              
892             while (1) {
893             last unless exists $tmp{$order_id};
894             push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
895             }
896              
897             return;
898             };
899             }
900              
901             my @m2;
902              
903             MCE::Loop->init(
904             chunk_size => 'auto', max_workers => 'auto',
905             gather => preserve_order(\@m2)
906             );
907              
908             mce_loop {
909             my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
910              
911             ## Compute the entire chunk data at once.
912             push @a, map { $_ * 2 } @{ $chunk_ref };
913              
914             ## Afterwards, invoke the gather feature, which
915             ## will direct the data to the callback function.
916             MCE->gather(MCE->chunk_id, @a);
917              
918             } 1..100000;
919              
920             MCE::Loop->finish;
921              
922             print scalar @m2, "\n";
923              
924             All 6 models support 'auto' for chunk_size unlike the Core API. Think of the
925             models as the basis for providing JIT for MCE. They create the instance, tune
926             max_workers, and tune chunk_size automatically regardless of the hardware.
927              
928             The following does the same thing using the Core API.
929              
930             use MCE;
931              
932             sub preserve_order {
933             ...
934             }
935              
936             my $mce = MCE->new(
937             max_workers => 'auto', chunk_size => 8000,
938              
939             user_func => sub {
940             my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
941              
942             ## Compute the entire chunk data at once.
943             push @a, map { $_ * 2 } @{ $chunk_ref };
944              
945             ## Afterwards, invoke the gather feature, which
946             ## will direct the data to the callback function.
947             MCE->gather(MCE->chunk_id, @a);
948             }
949             );
950              
951             my @m2;
952              
953             $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);
954             $mce->shutdown;
955              
956             print scalar @m2, "\n";
957              
958             =head1 MANUAL SHUTDOWN
959              
960             =over 3
961              
962             =item MCE::Loop->finish
963              
964             =item MCE::Loop::finish
965              
966             =back
967              
968             Workers remain persistent as much as possible after running. Shutdown occurs
969             automatically when the script terminates. Call finish when workers are no
970             longer needed.
971              
972             use MCE::Loop;
973              
974             MCE::Loop->init(
975             chunk_size => 20, max_workers => 'auto'
976             );
977              
978             mce_loop { ... } 1..100;
979              
980             MCE::Loop->finish;
981              
982             =head1 INDEX
983              
984             L<MCE|MCE>, L<MCE::Core>
985              
986             =head1 AUTHOR
987              
988             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
989              
990             =cut
991