File Coverage

blib/lib/MCE/Stream.pm
Criterion Covered Total %
statement 280 368 76.0
branch 126 244 51.6
condition 52 103 50.4
subroutine 19 23 82.6
pod 5 5 100.0
total 482 743 64.8


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Parallel stream model for chaining multiple maps and greps.
4             ##
5             ###############################################################################
6              
7             package MCE::Stream;
8              
9 10     10   1067974 use strict;
  10         20  
  10         381  
10 10     10   34 use warnings;
  10         57  
  10         570  
11              
12 10     10   52 no warnings qw( threads recursion uninitialized );
  10         20  
  10         603  
13              
14             our $VERSION = '1.902';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 10     10   44 use Scalar::Util qw( looks_like_number );
  10         12  
  10         504  
21              
22 10     10   5652 use MCE;
  10         31  
  10         88  
23 10     10   6640 use MCE::Queue;
  10         38  
  10         67  
24              
25             our @CARP_NOT = qw( MCE );
26              
27             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
28              
29             sub CLONE {
30 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
31             }
32              
33             ###############################################################################
34             ## ----------------------------------------------------------------------------
35             ## Import routine.
36             ##
37             ###############################################################################
38              
39             my ($_MCE, $_def, $_params, $_tag) = ({}, {}, {}, 'MCE::Stream');
40             my ($_prev_c, $_prev_m, $_prev_n, $_prev_w) = ({}, {}, {}, {});
41             my ($_user_tasks, $_queue) = ({}, {});
42              
43             sub import {
44 10     10   200 my ($_class, $_pkg) = (shift, caller);
45              
46 10         54 my $_p = $_def->{$_pkg} = {
47             MAX_WORKERS => 'auto',
48             CHUNK_SIZE => 'auto',
49             DEFAULT_MODE => 'map',
50             };
51              
52             ## Import functions.
53 10 50       32 if ($_pkg !~ /^MCE::/) {
54 10     10   71 no strict 'refs'; no warnings 'redefine';
  10     10   12  
  10         377  
  10         51  
  10         20  
  10         46806  
55 10         22 *{ $_pkg.'::mce_stream_f' } = \&run_file;
  10         65  
56 10         22 *{ $_pkg.'::mce_stream_s' } = \&run_seq;
  10         69  
57 10         12 *{ $_pkg.'::mce_stream' } = \&run;
  10         42  
58             }
59              
60             ## Process module arguments.
61 10         42 while ( my $_argument = shift ) {
62 0         0 my $_arg = lc $_argument;
63              
64 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
65 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
66 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
67 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
68 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
69 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
70 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
71 0 0       0 $_p->{DEFAULT_MODE} = shift, next if ( $_arg eq 'default_mode' );
72              
73 0 0       0 shift, next if ( $_arg eq 'fast' ); # ignored
74              
75             ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
76 0 0       0 if ( $_arg eq 'sereal' ) {
77 0 0       0 if ( shift eq '0' ) {
78 0         0 require Storable;
79 0         0 $_p->{FREEZE} = \&Storable::freeze;
80 0         0 $_p->{THAW} = \&Storable::thaw;
81             }
82 0         0 next;
83             }
84              
85 0         0 _croak("Error: ($_argument) invalid module option");
86             }
87              
88             _croak("Error: (DEFAULT_MODE) is not valid")
89 10 50 33     65 if ($_p->{DEFAULT_MODE} ne 'grep' && $_p->{DEFAULT_MODE} ne 'map');
90              
91 10         66 $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});
92              
93 10         54 MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
94             MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
95 10 50       39 unless ($_p->{CHUNK_SIZE} eq 'auto');
96              
97 10         216 return;
98             }
99              
100             ###############################################################################
101             ## ----------------------------------------------------------------------------
102             ## Gather callback to ensure chunk order is preserved during gathering.
103             ## Also, the task end callback for when a task completes.
104             ##
105             ###############################################################################
106              
107             my ($_gather_ref, $_order_id, %_tmp);
108              
109             sub _preserve_order {
110              
111 379     379   1434 $_tmp{$_[1]} = $_[0];
112              
113 379 100       660 if (defined $_gather_ref) {
114 185         190 while (1) {
115 370 100       660 last unless exists $_tmp{$_order_id};
116 185         210 push @{ $_gather_ref }, @{ delete $_tmp{$_order_id++} };
  185         270  
  185         380  
117             }
118             }
119             else {
120 194         345 $_order_id++;
121             }
122              
123 379         665 return;
124             }
125              
126             sub _task_end {
127              
128 102     102   239 my ($_mce, $_task_id, $_task_name) = @_;
129 102         317 my $_pid = $_mce->{_init_pid}.'.'.$_mce->{_caller};
130              
131 102 100       396 if (defined $_mce->{user_tasks}->[$_task_id + 1]) {
132 51         203 my $n_workers = $_mce->{user_tasks}->[$_task_id + 1]->{max_workers};
133 51         86 my $_id = @{ $_queue->{$_pid} } - $_task_id - 1;
  51         253  
134              
135 51         252 $_queue->{$_pid}[$_id]->enqueue((undef) x $n_workers);
136             }
137              
138             $_params->{task_end}->($_mce, $_task_id, $_task_name)
139 102 50 33     305 if (exists $_params->{task_end} && ref $_params->{task_end} eq 'CODE');
140              
141 102         274 return;
142             }
143              
144             ###############################################################################
145             ## ----------------------------------------------------------------------------
146             ## Init and finish routines.
147             ##
148             ###############################################################################
149              
150             sub MCE::Stream::_guard::DESTROY {
151 0     0   0 my ($_pkg, $_id) = @{ $_[0] };
  0         0  
152              
153 0 0 0     0 if (defined $_pkg && $_id eq "$$.$_tid") {
154 0         0 @{ $_[0] } = ();
  0         0  
155 0         0 MCE::Stream->finish($_pkg);
156             }
157              
158 0         0 return;
159             }
160              
161             sub init (@) {
162              
163 9 50 33 9 1 2592 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
164 9         90 my $_pkg = "$$.$_tid.".caller();
165              
166 9 50       90 $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };
167              
168             _croak("$_tag: (HASH) not allowed as input by this MCE model")
169 9 50       36 if ( ref $_params->{$_pkg}{input_data} eq 'HASH' );
170              
171 9         27 @_ = ();
172              
173             defined wantarray
174 9 50       45 ? bless([$_pkg, "$$.$_tid"], MCE::Stream::_guard::)
175             : ();
176             }
177              
178             sub finish (@) {
179              
180 24 50 33 24 1 6648 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
181 24 100       247 my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();
182              
183 24 100 66     590 if ( $_pkg eq 'MCE' ) {
    100          
184 10         17 for my $_k ( keys %{ $_MCE } ) { MCE::Stream->finish($_k, 1); }
  10         298  
  8         392  
185             }
186             elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
187 6 50       191 $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};
188 6         19 $_gather_ref = $_order_id = undef, undef %_tmp;
189              
190 6         39 delete $_user_tasks->{$_pkg};
191 6         30 delete $_prev_c->{$_pkg};
192 6         33 delete $_prev_m->{$_pkg};
193 6         34 delete $_prev_n->{$_pkg};
194 6         23 delete $_prev_w->{$_pkg};
195 6         51 delete $_MCE->{$_pkg};
196              
197 6 50       18 if (defined $_queue->{$_pkg}) {
198 6         14 local $_;
199 6         7 $_->DESTROY() for (@{ $_queue->{$_pkg} });
  6         48  
200 6         54 delete $_queue->{$_pkg};
201             }
202             }
203              
204 24         103 @_ = ();
205              
206 24         112 return;
207             }
208              
209             ###############################################################################
210             ## ----------------------------------------------------------------------------
211             ## Parallel stream with MCE -- file.
212             ##
213             ###############################################################################
214              
215             sub run_file (@) {
216              
217 20 50 33 20 1 29420 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
218              
219 20 50       105 my ($_file, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  20         80  
220 20         235 my $_pid = "$$.$_tid.".caller();
221              
222 20 50       95 if (defined (my $_p = $_params->{$_pid})) {
223 20 50       65 delete $_p->{input_data} if (exists $_p->{input_data});
224 20 50       155 delete $_p->{sequence} if (exists $_p->{sequence});
225             }
226             else {
227 0         0 $_params->{$_pid} = {};
228             }
229              
230 20         75 for my $_i ($_start_pos .. @_ - 1) {
231 50         110 my $_r = ref $_[$_i];
232 50 100 66     435 if ($_r eq '' || $_r eq 'SCALAR' || $_r =~ /^(?:GLOB|FileHandle|IO::)/) {
      100        
233 20         30 $_file = $_[$_i]; $_pos = $_i;
  20         50  
234 20         50 last;
235             }
236             }
237              
238 20 100 66     275 if (defined $_file && ref $_file eq '' && $_file ne '') {
    50 66        
      33        
239 10 50       485 _croak("$_tag: ($_file) does not exist") unless (-e $_file);
240 10 50       125 _croak("$_tag: ($_file) is not readable") unless (-r $_file);
241 10 50       90 _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
242 10         255 $_params->{$_pid}{_file} = $_file;
243             }
244             elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
245 10         35 $_params->{$_pid}{_file} = $_file;
246             }
247             else {
248 0         0 _croak("$_tag: (file) is not specified or valid");
249             }
250              
251 20 50       90 if (defined $_pos) {
252 20         110 pop @_ for ($_pos .. @_ - 1);
253             }
254              
255 20         90 return run(@_);
256             }
257              
258             ###############################################################################
259             ## ----------------------------------------------------------------------------
260             ## Parallel stream with MCE -- sequence.
261             ##
262             ###############################################################################
263              
264             sub run_seq (@) {
265              
266 10 50 33 10 1 7740 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
267              
268 10 50       20 my ($_begin, $_end, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  10         35  
269 10         75 my $_pid = "$$.$_tid.".caller();
270              
271 10 50       120 if (defined (my $_p = $_params->{$_pid})) {
272 10 50       30 delete $_p->{sequence} if (exists $_p->{sequence});
273 10 50       25 delete $_p->{input_data} if (exists $_p->{input_data});
274 10 50       25 delete $_p->{_file} if (exists $_p->{_file});
275             }
276             else {
277 0         0 $_params->{$_pid} = {};
278             }
279              
280 10         35 for my $_i ($_start_pos .. @_ - 1) {
281 25         50 my $_r = ref $_[$_i];
282              
283 25 50 66     205 if ($_r eq '' || $_r =~ /^Math::/ || $_r eq 'HASH' || $_r eq 'ARRAY') {
      66        
      33        
284 10         15 $_pos = $_i;
285              
286 10 50 33     35 if ($_r eq '' || $_r =~ /^Math::/) {
    0          
    0          
287 10         30 $_begin = $_[$_pos], $_end = $_[$_pos + 1];
288             $_params->{$_pid}{sequence} = [
289 10         45 $_[$_pos], $_[$_pos + 1], $_[$_pos + 2], $_[$_pos + 3]
290             ];
291             }
292             elsif ($_r eq 'HASH') {
293 0         0 $_begin = $_[$_pos]->{begin}, $_end = $_[$_pos]->{end};
294 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
295             }
296             elsif ($_r eq 'ARRAY') {
297 0         0 $_begin = $_[$_pos]->[0], $_end = $_[$_pos]->[1];
298 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
299             }
300              
301 10         30 last;
302             }
303             }
304              
305             _croak("$_tag: (sequence) is not specified or valid")
306 10 50       35 unless (exists $_params->{$_pid}{sequence});
307 10 50       25 _croak("$_tag: (begin) is not specified for sequence")
308             unless (defined $_begin);
309 10 50       25 _croak("$_tag: (end) is not specified for sequence")
310             unless (defined $_end);
311              
312 10         65 $_params->{$_pid}{sequence_run} = undef;
313              
314 10 50       775 if (defined $_pos) {
315 10         55 pop @_ for ($_pos .. @_ - 1);
316             }
317              
318 10         30 return run(@_);
319             }
320              
321             ###############################################################################
322             ## ----------------------------------------------------------------------------
323             ## Parallel stream with MCE.
324             ##
325             ###############################################################################
326              
327             sub run (@) {
328              
329 59 50 33 59 1 28933 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
330              
331 59 100       399 my $_pkg = caller() eq 'MCE::Stream' ? caller(1) : caller();
332 59         438 my $_pid = "$$.$_tid.$_pkg";
333              
334 59 50 66     420 if (ref $_[0] eq 'HASH' && !exists $_[0]->{code}) {
335 0 0       0 $_params->{$_pid} = {} unless defined $_params->{$_pid};
336 0         0 for my $_p (keys %{ $_[0] }) {
  0         0  
337 0         0 $_params->{$_pid}{$_p} = $_[0]->{$_p};
338             }
339              
340 0         0 shift;
341             }
342              
343 59 100       141 my $_aref; $_aref = shift if (ref $_[0] eq 'ARRAY');
  59         345  
344              
345 59         108 $_order_id = 1; undef %_tmp;
  59         141  
346              
347 59 100       155 if (defined $_aref) {
348 25         50 $_gather_ref = $_aref; @{ $_aref } = ();
  25         45  
  25         145  
349             } else {
350 34         63 $_gather_ref = undef;
351             }
352              
353             ## -------------------------------------------------------------------------
354              
355 59         118 my (@_code, @_mode, @_name, @_wrks); my $_init_mce = 0; my $_pos = 0;
  59         93  
  59         107  
356 59         225 my $_default_mode = $_def->{$_pkg}{DEFAULT_MODE};
357              
358 59   100     332 while (ref $_[0] eq 'CODE' || ref $_[0] eq 'HASH') {
359 118 100       304 if (ref $_[0] eq 'CODE') {
360 108         234 push @_code, $_[0];
361 108         196 push @_mode, $_default_mode;
362             }
363             else {
364 10 0 33     25 last if (!exists $_[0]->{code} && !exists $_[0]->{mode});
365              
366 10 50       30 push @_code, exists $_[0]->{code} ? $_[0]->{code} : undef;
367 10 50       25 push @_mode, exists $_[0]->{mode} ? $_[0]->{mode} : $_default_mode;
368              
369 10 50       30 unless (ref $_code[-1] eq 'CODE') {
370 0         0 @_ = (); _croak("$_tag: (code) is not valid");
  0         0  
371             }
372 10 50 66     60 if ($_mode[-1] ne 'grep' && $_mode[-1] ne 'map') {
373 0         0 @_ = (); _croak("$_tag: (mode) is not valid");
  0         0  
374             }
375             }
376              
377 118 50       355 if (defined (my $_p = $_params->{$_pid})) {
378             push @_name, (ref $_p->{task_name} eq 'ARRAY')
379 118 50       500 ? $_p->{task_name}->[$_pos] : undef;
380             push @_wrks, (ref $_p->{max_workers} eq 'ARRAY')
381 118 50       402 ? $_p->{max_workers}->[$_pos] : undef;
382             }
383              
384             $_init_mce = 1 if (
385             !defined $_prev_c->{$_pid}[$_pos] ||
386 118 100 66     650 $_prev_c->{$_pid}[$_pos] != $_code[$_pos]
387             );
388             $_init_mce = 1 if (
389             !defined $_prev_m->{$_pid}[$_pos] ||
390 118 100 66     512 $_prev_m->{$_pid}[$_pos] ne $_mode[$_pos]
391             );
392              
393 118 100       338 $_init_mce = 1 if ($_prev_n->{$_pid}[$_pos] ne $_name[$_pos]);
394 118 100       382 $_init_mce = 1 if ($_prev_w->{$_pid}[$_pos] ne $_wrks[$_pos]);
395              
396 118         231 $_prev_c->{$_pid}[$_pos] = $_code[$_pos];
397 118         209 $_prev_m->{$_pid}[$_pos] = $_mode[$_pos];
398 118         185 $_prev_n->{$_pid}[$_pos] = $_name[$_pos];
399 118         295 $_prev_w->{$_pid}[$_pos] = $_wrks[$_pos];
400              
401 118         195 shift; $_pos++;
  118         747  
402             }
403              
404 59 50       314 if (defined $_prev_c->{$_pid}[$_pos]) {
405 0         0 pop @{ $_prev_c->{$_pid} } for ($_pos .. $#{ $_prev_c->{$_pid } });
  0         0  
  0         0  
406 0         0 pop @{ $_prev_m->{$_pid} } for ($_pos .. $#{ $_prev_m->{$_pid } });
  0         0  
  0         0  
407 0         0 pop @{ $_prev_n->{$_pid} } for ($_pos .. $#{ $_prev_n->{$_pid } });
  0         0  
  0         0  
408 0         0 pop @{ $_prev_w->{$_pid} } for ($_pos .. $#{ $_prev_w->{$_pid } });
  0         0  
  0         0  
409              
410 0         0 $_init_mce = 1;
411             }
412              
413 59 50       175 return unless (scalar @_code);
414              
415             ## -------------------------------------------------------------------------
416              
417 59         113 my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
  59         315  
418 59         131 my $_r = ref $_[0];
419              
420 59 100 66     590 if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|GLOB|FileHandle|IO::|Iterator::)/) {
421 10 50       45 _croak("$_tag: (HASH) not allowed as input by this MCE model")
422             if $_r eq 'HASH';
423 10         30 $_input_data = shift;
424             }
425              
426 59 50       393 if (defined (my $_p = $_params->{$_pid})) {
427             $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
428 59 50 33     549 if (exists $_p->{max_workers} && ref $_p->{max_workers} ne 'ARRAY');
429              
430 59 100 100     402 delete $_p->{sequence} if (defined $_input_data || scalar @_);
431 59 50       238 delete $_p->{user_func} if (exists $_p->{user_func});
432 59 50       201 delete $_p->{user_tasks} if (exists $_p->{user_tasks});
433 59 50       516 delete $_p->{use_slurpio} if (exists $_p->{use_slurpio});
434 59 50       191 delete $_p->{bounds_only} if (exists $_p->{bounds_only});
435 59 50       172 delete $_p->{gather} if (exists $_p->{gather});
436             }
437              
438 59 50 33     369 if (@_code > 1 && $_max_workers > 1) {
439 59         362 $_max_workers = int($_max_workers / @_code + 0.5) + 1;
440             }
441              
442 59         187 my $_chunk_size = do {
443 59   50     162 my $_p = $_params->{$_pid} || {};
444             (defined $_p->{init_relay} || defined $_def->{$_pkg}{INIT_RELAY}) ? 1 :
445             MCE::_parse_chunk_size(
446 59 50 33     786 $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
447             $_input_data, scalar @_
448             );
449             };
450              
451 59 50       211 if (defined (my $_p = $_params->{$_pid})) {
452 59 100       167 if (exists $_p->{_file}) {
453 20         65 $_input_data = delete $_p->{_file};
454             } else {
455 39 50       122 $_input_data = $_p->{input_data} if exists $_p->{input_data};
456             }
457             }
458              
459             ## -------------------------------------------------------------------------
460              
461 59         517 MCE::_save_state($_MCE->{$_pid});
462              
463 59 100 66     545 if ($_init_mce || !exists $_queue->{$_pid}) {
464 14 50       46 $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
465 14 50       110 $_queue->{$_pid} = [] if (!defined $_queue->{$_pid});
466              
467 14         23 my $_Q = $_queue->{$_pid};
468 14         28 pop(@{ $_Q })->DESTROY for (@_code .. @{ $_Q });
  14         51  
  0         0  
469              
470 14         232 push @{ $_Q }, MCE::Queue->new()
471 14         37 for (@{ $_Q } .. @_code - 2);
  14         37  
472              
473             ## must clear arrays for nested session to work with Perl < v5.14
474 14         111 _gen_user_tasks($_pid, $_Q, [@_code], [@_mode], [@_name], [@_wrks]);
475              
476 14         65 @_code = @_mode = @_name = @_wrks = ();
477              
478             my %_opts = (
479             max_workers => $_max_workers, task_name => $_tag,
480 14         98 user_tasks => $_user_tasks->{$_pid}, task_end => \&_task_end,
481             use_slurpio => 0,
482             );
483              
484 14 50       42 if (defined (my $_p = $_params->{$_pid})) {
485 14         19 local $_;
486              
487 14         23 for (keys %{ $_p }) {
  14         42  
488 28 50       74 next if ($_ eq 'sequence_run');
489 28 100 66     107 next if ($_ eq 'max_workers' && ref $_p->{max_workers} eq 'ARRAY');
490 14 50 33     113 next if ($_ eq 'task_name' && ref $_p->{task_name} eq 'ARRAY');
491 0 0       0 next if ($_ eq 'input_data');
492 0 0       0 next if ($_ eq 'chunk_size');
493 0 0       0 next if ($_ eq 'task_end');
494              
495             _croak("$_tag: ($_) is not a valid constructor argument")
496 0 0       0 unless (exists $MCE::_valid_fields_new{$_});
497              
498 0         0 $_opts{$_} = $_p->{$_};
499             }
500             }
501              
502 14         37 for my $_k (qw/ tmp_dir freeze thaw init_relay use_threads /) {
503             $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
504 70 50 33     266 if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
505             }
506              
507 14         252 $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
508             }
509             else {
510             ## Workers may persist after running. Thus, updating the MCE instance.
511             ## These options do not require respawning.
512 45 50       245 if (defined (my $_p = $_params->{$_pid})) {
513 45         145 for my $_k (qw(
514             RS interval stderr_file stdout_file user_error user_output
515             job_delay submit_delay on_post_exit on_post_run user_args
516             flush_file flush_stderr flush_stdout max_retries
517             )) {
518 675 50       1175 $_MCE->{$_pid}{$_k} = $_p->{$_k} if (exists $_p->{$_k});
519             }
520             }
521             }
522              
523             ## -------------------------------------------------------------------------
524              
525 59 100       200 if (defined $_input_data) {
    100          
526 30         140 @_ = ();
527 30         245 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
528 30         1395 delete $_MCE->{$_pid}{input_data};
529             }
530             elsif (scalar @_) {
531 19         162 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
532 11         280 delete $_MCE->{$_pid}{input_data};
533             }
534             else {
535 10 50 33     85 if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
536             $_MCE->{$_pid}->run({
537             chunk_size => $_chunk_size,
538             sequence => $_params->{$_pid}{sequence}
539 10         100 }, 0);
540 10 50       85 if (exists $_params->{$_pid}{sequence_run}) {
541 10         40 delete $_params->{$_pid}{sequence_run};
542 10         30 delete $_params->{$_pid}{sequence};
543             }
544 10         40 delete $_MCE->{$_pid}{sequence};
545             }
546             }
547              
548 51         502 MCE::_restore_state();
549              
550             # destroy queue(s) if MCE::run requested workers to shutdown
551 51 50       194 if (!$_MCE->{$_pid}{_spawned}) {
552 0         0 $_->DESTROY() for @{ $_queue->{$_pid} };
  0         0  
553 0         0 delete $_queue->{$_pid};
554             }
555              
556 51 100       500 return map { @{ $_ } } delete @_tmp{ 1 .. $_order_id - 1 }
  194         443  
  194         1578  
557             unless (defined $_aref);
558              
559 25         50 $_gather_ref = undef;
560              
561 25         215 return;
562             }
563              
564             ###############################################################################
565             ## ----------------------------------------------------------------------------
566             ## Private methods.
567             ##
568             ###############################################################################
569              
570             sub _croak {
571              
572 0     0   0 goto &MCE::_croak;
573             }
574              
575             sub _gen_user_tasks {
576              
577 14     14   51 my ($_pid, $_queue_ref, $_code_ref, $_mode_ref, $_name_ref, $_wrks_ref) = @_;
578              
579 14         23 @{ $_user_tasks->{$_pid} } = ();
  14         42  
580              
581             ## For the code block farthest to the right.
582              
583 14         37 push @{ $_user_tasks->{$_pid} }, {
584             task_name => $_name_ref->[-1],
585             max_workers => $_wrks_ref->[-1],
586              
587 14         158 gather => (@{ $_code_ref } > 1)
588             ? $_queue_ref->[-1] : \&_preserve_order,
589              
590             user_func => sub {
591 83     83   210 my ($_mce, $_chunk_ref, $_chunk_id) = @_;
592 83         113 my @_a; my $_code = $_code_ref->[-1];
  83         224  
593              
594 83 100       174 if (ref $_chunk_ref) {
595             push @_a, ($_mode_ref->[-1] eq 'map')
596 72         624 ? map { &{ $_code } } @{ $_chunk_ref }
  72         190  
  56         125  
597 65 100       159 : grep { &{ $_code } } @{ $_chunk_ref };
  9         16  
  9         44  
  9         32  
598             }
599             else {
600             push @_a, ($_mode_ref->[-1] eq 'map')
601 18         21 ? map { &{ $_code } } $_chunk_ref
  18         56  
602 18 50       57 : grep { &{ $_code } } $_chunk_ref;
  0         0  
  0         0  
603             }
604              
605 83 50       633 MCE->gather( (@{ $_code_ref } > 1)
  83         828  
606             ? MCE->freeze([ \@_a, $_chunk_id ])
607             : (\@_a, $_chunk_id)
608             );
609             }
610 14 50       23 };
611              
612             ## For in-between code blocks (processed from right to left).
613              
614 14         19 for (my $_i = @{ $_code_ref } - 2; $_i > 0; $_i--) {
  14         60  
615 0         0 my $_pos = $_i;
616              
617 0         0 push @{ $_user_tasks->{$_pid} }, {
618             task_name => $_name_ref->[$_pos],
619             max_workers => $_wrks_ref->[$_pos],
620             gather => $_queue_ref->[$_pos - 1],
621              
622             user_func => sub {
623 0     0   0 my $_q = $_queue_ref->[$_pos];
624              
625 0         0 while (1) {
626 0         0 my $_chunk = $_q->dequeue;
627 0 0       0 last unless (defined $_chunk);
628              
629 0         0 my @_a; my $_code = $_code_ref->[$_pos];
  0         0  
630 0         0 $_chunk = MCE->thaw($_chunk);
631              
632             push @_a, ($_mode_ref->[$_pos] eq 'map')
633 0         0 ? map { &{ $_code } } @{ $_chunk->[0] }
  0         0  
  0         0  
634 0 0       0 : grep { &{ $_code } } @{ $_chunk->[0] };
  0         0  
  0         0  
  0         0  
635              
636 0         0 MCE->gather(MCE->freeze([ \@_a, $_chunk->[1] ]));
637             }
638              
639 0         0 return;
640             }
641 0         0 };
642             }
643              
644             ## For the left-most code block.
645              
646 14 50       14 if (@{ $_code_ref } > 1) {
  14         28  
647              
648 14         107 push @{ $_user_tasks->{$_pid} }, {
649             task_name => $_name_ref->[0],
650             max_workers => $_wrks_ref->[0],
651             gather => \&_preserve_order,
652              
653             user_func => sub {
654 22     22   75 my $_q = $_queue_ref->[0];
655              
656 22         37 while (1) {
657 105         439 my $_chunk = $_q->dequeue;
658 105 100       367 last unless (defined $_chunk);
659              
660 83         148 my @_a; my $_code = $_code_ref->[0];
  83         287  
661 83         803 $_chunk = MCE->thaw($_chunk);
662              
663             push @_a, ($_mode_ref->[0] eq 'map')
664 93         127 ? map { &{ $_code } } @{ $_chunk->[0] }
  93         278  
  83         219  
665 83 50       290 : grep { &{ $_code } } @{ $_chunk->[0] };
  0         0  
  0         0  
  0         0  
666              
667 83         770 MCE->gather(\@_a, $_chunk->[1]);
668             }
669              
670 22         104 return;
671             }
672 14         23 };
673             }
674              
675 14         43 return;
676             }
677              
678             1;
679              
680             __END__
681              
682             ###############################################################################
683             ## ----------------------------------------------------------------------------
684             ## Module usage.
685             ##
686             ###############################################################################
687              
688             =head1 NAME
689              
690             MCE::Stream - Parallel stream model for chaining multiple maps and greps
691              
692             =head1 VERSION
693              
694             This document describes MCE::Stream version 1.902
695              
696             =head1 SYNOPSIS
697              
698             ## Exports mce_stream, mce_stream_f, mce_stream_s
699             use MCE::Stream;
700              
701             my (@m1, @m2, @m3);
702              
703             ## Default mode is map and processed from right-to-left
704             @m1 = mce_stream sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
705             mce_stream \@m2, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
706              
707             ## Native Perl
708             @m3 = map { $_ * $_ } grep { $_ % 5 == 0 } 1..10000;
709              
710             ## Streaming grep and map in parallel
711             mce_stream \@m3,
712             { mode => 'map', code => sub { $_ * $_ } },
713             { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
714              
715             ## Array or array_ref
716             my @a = mce_stream sub { $_ * $_ }, 1..10000;
717             my @b = mce_stream sub { $_ * $_ }, \@list;
718              
719             ## Important; pass an array_ref for deeply input data
720             my @c = mce_stream sub { $_->[1] *= 2; $_ }, [ [ 0, 1 ], [ 0, 2 ], ... ];
721             my @d = mce_stream sub { $_->[1] *= 2; $_ }, \@deeply_list;
722              
723             ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
724             ## Workers read directly and not involve the manager process
725             my @e = mce_stream_f sub { chomp; $_ }, "/path/to/file"; # efficient
726              
727             ## Involves the manager process, therefore slower
728             my @f = mce_stream_f sub { chomp; $_ }, $file_handle;
729             my @g = mce_stream_f sub { chomp; $_ }, $io;
730             my @h = mce_stream_f sub { chomp; $_ }, \$scalar;
731              
732             ## Sequence of numbers (begin, end [, step, format])
733             my @i = mce_stream_s sub { $_ * $_ }, 1, 10000, 5;
734             my @j = mce_stream_s sub { $_ * $_ }, [ 1, 10000, 5 ];
735              
736             my @k = mce_stream_s sub { $_ * $_ }, {
737             begin => 1, end => 10000, step => 5, format => undef
738             };
739              
740             =head1 DESCRIPTION
741              
742             This module allows one to stream multiple map and/or grep operations in
743             parallel. Code blocks run simultaneously from right-to-left. The results
744             are appended immediately when providing a reference to an array.
745              
746             ## Appends are serialized, even out-of-order ok, but immediately.
747             ## Out-of-order chunks are held temporarily until ordered chunks
748             ## arrive.
749              
750             mce_stream \@a, sub { $_ }, sub { $_ }, sub { $_ }, 1..10000;
751              
752             ## input
753             ## chunk1 input
754             ## chunk3 chunk2 input
755             ## chunk2 chunk2 chunk3 input
756             ## append1 chunk3 chunk1 chunk4 input
757             ## append2 chunk1 chunk5 chunk5 input
758             ## append3 chunk5 chunk4 chunk6 ...
759             ## append4 chunk4 chunk6 ...
760             ## append5 chunk6 ...
761             ## append6 ...
762             ## ...
763             ##
764              
765             MCE incurs a small overhead due to passing of data. A fast code block will
766             run faster natively when chaining multiple map functions. However, the
767             overhead will likely diminish as the complexity increases for the code.
768              
769             ## 0.334 secs -- baseline using the native map function
770             my @m1 = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..1000000;
771              
772             ## 0.427 secs -- this is quite amazing considering data passing
773             my @m2 = mce_stream
774             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..1000000;
775              
776             ## 0.355 secs -- appends to @m3 immediately, not after running
777             my @m3; mce_stream \@m3,
778             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..1000000;
779              
780             Even faster is mce_stream_s; useful when input data is a range of numbers.
781             Workers generate sequences mathematically among themselves without any
782             interaction from the manager process. Two arguments are required for
783             mce_stream_s (begin, end). Step defaults to 1 if begin is smaller than end,
784             otherwise -1.
785              
786             ## 0.278 secs -- numbers are generated mathematically via sequence
787             my @m4; mce_stream_s \@m4,
788             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1, 1000000;
789              
790             =head1 OVERRIDING DEFAULTS
791              
792             The following list options which may be overridden when loading the module.
793             The fast option is obsolete in 1.867 onwards; ignored if specified.
794              
795             use Sereal qw( encode_sereal decode_sereal );
796             use CBOR::XS qw( encode_cbor decode_cbor );
797             use JSON::XS qw( encode_json decode_json );
798              
799             use MCE::Stream
800             max_workers => 8, # Default 'auto'
801             chunk_size => 500, # Default 'auto'
802             tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
803             freeze => \&encode_sereal, # \&Storable::freeze
804             thaw => \&decode_sereal, # \&Storable::thaw
805             init_relay => 0, # Default undef; MCE 1.882+
806             use_threads => 0, # Default undef; MCE 1.882+
807             default_mode => 'grep', # Default 'map'
808             ;
809              
810             From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if available.
811             Specify C<< Sereal => 0 >> to use Storable instead.
812              
813             use MCE::Stream Sereal => 0;
814              
815             =head1 CUSTOMIZING MCE
816              
817             =over 3
818              
819             =item MCE::Stream->init ( options )
820              
821             =item MCE::Stream::init { options }
822              
823             =back
824              
825             The init function accepts a hash of MCE options. The gather and bounds_only
826             options, if specified, are ignored due to being used internally by the
827             module (not shown below).
828              
829             In scalar context (API available since 1.897), call C<MCE::Stream->finish>
830             automatically upon leaving the scope or program.
831              
832             use MCE::Stream;
833              
834             my $guard = MCE::Stream->init(
835             chunk_size => 1, max_workers => 4,
836              
837             user_begin => sub {
838             print "## ", MCE->wid, " started\n";
839             },
840              
841             user_end => sub {
842             print "## ", MCE->wid, " completed\n";
843             }
844             );
845              
846             my @a = mce_stream sub { $_ * $_ }, 1..100;
847              
848             print "\n", "@a", "\n";
849              
850             -- Output
851              
852             ## 1 started
853             ## 2 started
854             ## 3 started
855             ## 4 started
856             ## 3 completed
857             ## 1 completed
858             ## 2 completed
859             ## 4 completed
860              
861             1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
862             400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
863             1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
864             2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
865             3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
866             5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
867             7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
868             10000
869              
870             Like with MCE::Stream->init above, MCE options may be specified using an
871             anonymous hash for the first argument. Notice how both max_workers and
872             task_name can take an anonymous array for setting values uniquely
873             per each code block.
874              
875             Remember that MCE::Stream processes from right-to-left when setting the
876             individual values.
877              
878             use MCE::Stream;
879              
880             my @a = mce_stream {
881             task_name => [ 'c', 'b', 'a' ],
882             max_workers => [ 2, 4, 3, ],
883              
884             user_end => sub {
885             my ($mce, $task_id, $task_name) = @_;
886             print "$task_id - $task_name completed\n";
887             },
888              
889             task_end => sub {
890             my ($mce, $task_id, $task_name) = @_;
891             MCE->print("$task_id - $task_name ended\n");
892             }
893             },
894             sub { $_ * 4 }, ## 2 workers, named c
895             sub { $_ * 3 }, ## 4 workers, named b
896             sub { $_ * 2 }, 1..10000; ## 3 workers, named a
897              
898             -- Output
899              
900             0 - a completed
901             0 - a completed
902             0 - a completed
903             0 - a ended
904             1 - b completed
905             1 - b completed
906             1 - b completed
907             1 - b completed
908             1 - b ended
909             2 - c completed
910             2 - c completed
911             2 - c ended
912              
913             Note that the anonymous hash, for specifying options, also comes first when
914             passing an array reference.
915              
916             my @a; mce_stream {
917             ...
918             }, \@a, sub { ... }, sub { ... }, 1..10000;
919              
920             =head1 API DOCUMENTATION
921              
922             Scripts using MCE::Stream can be written using the long or short form.
923             The long form becomes relevant when mixing modes. Again, processing
924             occurs from right-to-left.
925              
926             my @m3 = mce_stream
927             { mode => 'map', code => sub { $_ * $_ } },
928             { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
929              
930             my @m4; mce_stream \@m4,
931             { mode => 'map', code => sub { $_ * $_ } },
932             { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
933              
934             For multiple grep blocks, the short form can be used. Simply specify the
935             default mode for the module. The two valid values for default_mode is 'grep'
936             and 'map'.
937              
938             use MCE::Stream default_mode => 'grep';
939              
940             my @f = mce_stream_f sub { /ending$/ }, sub { /^starting/ }, $file;
941              
942             The following assumes 'map' for default_mode in order to demonstrate all the
943             possibilities for providing input data.
944              
945             =over 3
946              
947             =item MCE::Stream->run ( sub { code }, list )
948              
949             =item mce_stream sub { code }, list
950              
951             =back
952              
953             Input data may be defined using a list or an array reference. Unlike MCE::Loop,
954             Flow, and Step, specifying a hash reference as input data isn't allowed.
955              
956             ## Array or array_ref
957             my @a = mce_stream sub { $_ * 2 }, 1..1000;
958             my @b = mce_stream sub { $_ * 2 }, \@list;
959              
960             ## Important; pass an array_ref for deeply input data
961             my @c = mce_stream sub { $_->[1] *= 2; $_ }, [ [ 0, 1 ], [ 0, 2 ], ... ];
962             my @d = mce_stream sub { $_->[1] *= 2; $_ }, \@deeply_list;
963              
964             ## Not supported
965             my @z = mce_stream sub { ... }, \%hash;
966              
967             =over 3
968              
969             =item MCE::Stream->run_file ( sub { code }, file )
970              
971             =item mce_stream_f sub { code }, file
972              
973             =back
974              
975             The fastest of these is the /path/to/file. Workers communicate the next offset
976             position among themselves with zero interaction by the manager process.
977              
978             C<IO::All> { File, Pipe, STDIO } is supported since MCE 1.845.
979              
980             my @c = mce_stream_f sub { chomp; $_ . "\r\n" }, "/path/to/file"; # faster
981             my @d = mce_stream_f sub { chomp; $_ . "\r\n" }, $file_handle;
982             my @e = mce_stream_f sub { chomp; $_ . "\r\n" }, $io; # IO::All
983             my @f = mce_stream_f sub { chomp; $_ . "\r\n" }, \$scalar;
984              
985             =over 3
986              
987             =item MCE::Stream->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
988              
989             =item mce_stream_s sub { code }, $beg, $end [, $step, $fmt ]
990              
991             =back
992              
993             Sequence may be defined as a list, an array reference, or a hash reference.
994             The functions require both begin and end values to run. Step and format are
995             optional. The format is passed to sprintf (% may be omitted below).
996              
997             my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
998              
999             my @f = mce_stream_s sub { $_ }, $beg, $end, $step, $fmt;
1000             my @g = mce_stream_s sub { $_ }, [ $beg, $end, $step, $fmt ];
1001              
1002             my @h = mce_stream_s sub { $_ }, {
1003             begin => $beg, end => $end, step => $step, format => $fmt
1004             };
1005              
1006             =over 3
1007              
1008             =item MCE::Stream->run ( { input_data => iterator }, sub { code } )
1009              
1010             =item mce_stream { input_data => iterator }, sub { code }
1011              
1012             =back
1013              
1014             An iterator reference may be specified for input_data. The only other way
1015             is to specify input_data via MCE::Stream->init. This prevents MCE::Stream
1016             from configuring the iterator reference as another user task which will
1017             not work.
1018              
1019             Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
1020              
1021             MCE::Stream->init(
1022             input_data => iterator
1023             );
1024              
1025             my @a = mce_stream sub { $_ * 3 }, sub { $_ * 2 };
1026              
1027             =head1 MANUAL SHUTDOWN
1028              
1029             =over 3
1030              
1031             =item MCE::Stream->finish
1032              
1033             =item MCE::Stream::finish
1034              
1035             =back
1036              
1037             Workers remain persistent as much as possible after running. Shutdown occurs
1038             automatically when the script terminates. Call finish when workers are no
1039             longer needed.
1040              
1041             use MCE::Stream;
1042              
1043             MCE::Stream->init(
1044             chunk_size => 20, max_workers => 'auto'
1045             );
1046              
1047             my @a = mce_stream { ... } 1..100;
1048              
1049             MCE::Stream->finish;
1050              
1051             =head1 INDEX
1052              
1053             L<MCE|MCE>, L<MCE::Core>
1054              
1055             =head1 AUTHOR
1056              
1057             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
1058              
1059             =cut
1060