File Coverage

blib/lib/MCE/Stream.pm
Criterion Covered Total %
statement 280 361 77.5
branch 125 240 52.0
condition 52 100 52.0
subroutine 19 22 86.3
pod 5 5 100.0
total 481 728 66.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Parallel stream model for chaining multiple maps and greps.
4             ##
5             ###############################################################################
6              
7             package MCE::Stream;
8              
9 10     10   623607 use strict;
  10         120  
  10         329  
10 10     10   50 use warnings;
  10         20  
  10         432  
11              
12 10     10   67 no warnings qw( threads recursion uninitialized );
  10         11  
  10         703  
13              
14             our $VERSION = '1.889';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 10     10   69 use Scalar::Util qw( looks_like_number );
  10         21  
  10         660  
21              
22 10     10   6019 use MCE;
  10         30  
  10         86  
23 10     10   6040 use MCE::Queue;
  10         30  
  10         42  
24              
25             our @CARP_NOT = qw( MCE );
26              
27             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
28              
29             sub CLONE {
30 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
31             }
32              
33             ###############################################################################
34             ## ----------------------------------------------------------------------------
35             ## Import routine.
36             ##
37             ###############################################################################
38              
39             my ($_MCE, $_def, $_params, $_tag) = ({}, {}, {}, 'MCE::Stream');
40             my ($_prev_c, $_prev_m, $_prev_n, $_prev_w) = ({}, {}, {}, {});
41             my ($_user_tasks, $_queue) = ({}, {});
42              
43             sub import {
44 10     10   149 my ($_class, $_pkg) = (shift, caller);
45              
46 10         40 my $_p = $_def->{$_pkg} = {
47             MAX_WORKERS => 'auto',
48             CHUNK_SIZE => 'auto',
49             DEFAULT_MODE => 'map',
50             };
51              
52             ## Import functions.
53 10 50       42 if ($_pkg !~ /^MCE::/) {
54 10     10   80 no strict 'refs'; no warnings 'redefine';
  10     10   19  
  10         309  
  10         58  
  10         23  
  10         45912  
55 10         29 *{ $_pkg.'::mce_stream_f' } = \&run_file;
  10         60  
56 10         29 *{ $_pkg.'::mce_stream_s' } = \&run_seq;
  10         39  
57 10         21 *{ $_pkg.'::mce_stream' } = \&run;
  10         48  
58             }
59              
60             ## Process module arguments.
61 10         51 while ( my $_argument = shift ) {
62 0         0 my $_arg = lc $_argument;
63              
64 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
65 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
66 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
67 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
68 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
69 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
70 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
71 0 0       0 $_p->{DEFAULT_MODE} = shift, next if ( $_arg eq 'default_mode' );
72              
73 0 0       0 shift, next if ( $_arg eq 'fast' ); # ignored
74              
75             ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
76 0 0       0 if ( $_arg eq 'sereal' ) {
77 0 0       0 if ( shift eq '0' ) {
78 0         0 require Storable;
79 0         0 $_p->{FREEZE} = \&Storable::freeze;
80 0         0 $_p->{THAW} = \&Storable::thaw;
81             }
82 0         0 next;
83             }
84              
85 0         0 _croak("Error: ($_argument) invalid module option");
86             }
87              
88             _croak("Error: (DEFAULT_MODE) is not valid")
89 10 50 33     88 if ($_p->{DEFAULT_MODE} ne 'grep' && $_p->{DEFAULT_MODE} ne 'map');
90              
91 10         59 $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});
92              
93 10         57 MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
94             MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
95 10 50       30 unless ($_p->{CHUNK_SIZE} eq 'auto');
96              
97 10         245 return;
98             }
99              
100             ###############################################################################
101             ## ----------------------------------------------------------------------------
102             ## Gather callback to ensure chunk order is preserved during gathering.
103             ## Also, the task end callback for when a task completes.
104             ##
105             ###############################################################################
106              
107             my ($_gather_ref, $_order_id, %_tmp);
108              
109             sub _preserve_order {
110              
111 379     379   1120 $_tmp{$_[1]} = $_[0];
112              
113 379 100       753 if (defined $_gather_ref) {
114 185         275 while (1) {
115 370 100       775 last unless exists $_tmp{$_order_id};
116 185         230 push @{ $_gather_ref }, @{ delete $_tmp{$_order_id++} };
  185         260  
  185         540  
117             }
118             }
119             else {
120 194         251 $_order_id++;
121             }
122              
123 379         772 return;
124             }
125              
126             sub _task_end {
127              
128 102     102   246 my ($_mce, $_task_id, $_task_name) = @_;
129 102         286 my $_pid = $_mce->{_init_pid}.'.'.$_mce->{_caller};
130              
131 102 100       253 if (defined $_mce->{user_tasks}->[$_task_id + 1]) {
132 51         113 my $n_workers = $_mce->{user_tasks}->[$_task_id + 1]->{max_workers};
133 51         62 my $_id = @{ $_queue->{$_pid} } - $_task_id - 1;
  51         134  
134              
135 51         220 $_queue->{$_pid}[$_id]->enqueue((undef) x $n_workers);
136             }
137              
138             $_params->{task_end}->($_mce, $_task_id, $_task_name)
139 102 50 33     255 if (exists $_params->{task_end} && ref $_params->{task_end} eq 'CODE');
140              
141 102         256 return;
142             }
143              
144             ###############################################################################
145             ## ----------------------------------------------------------------------------
146             ## Init and finish routines.
147             ##
148             ###############################################################################
149              
150             sub init (@) {
151              
152 9 50 33 9 1 1566 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
153 9         72 my $_pkg = "$$.$_tid.".caller();
154              
155 9 50       81 $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };
156              
157             _croak("$_tag: (HASH) not allowed as input by this MCE model")
158 9 50       45 if ( ref $_params->{$_pkg}{input_data} eq 'HASH' );
159              
160 9         27 @_ = ();
161              
162 9         27 return;
163             }
164              
165             sub finish (@) {
166              
167 24 50 33 24 1 6194 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
168 24 100       161 my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();
169              
170 24 100 66     391 if ( $_pkg eq 'MCE' ) {
    100          
171 10         22 for my $_k ( keys %{ $_MCE } ) { MCE::Stream->finish($_k, 1); }
  10         197  
  8         82  
172             }
173             elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
174 6 50       146 $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};
175 6         33 $_gather_ref = $_order_id = undef, undef %_tmp;
176              
177 6         27 delete $_user_tasks->{$_pkg};
178 6         44 delete $_prev_c->{$_pkg};
179 6         37 delete $_prev_m->{$_pkg};
180 6         32 delete $_prev_n->{$_pkg};
181 6         19 delete $_prev_w->{$_pkg};
182 6         42 delete $_MCE->{$_pkg};
183              
184 6 50       41 if (defined $_queue->{$_pkg}) {
185 6         17 local $_;
186 6         13 $_->DESTROY() for (@{ $_queue->{$_pkg} });
  6         57  
187 6         44 delete $_queue->{$_pkg};
188             }
189             }
190              
191 24         131 @_ = ();
192              
193 24         85 return;
194             }
195              
196             ###############################################################################
197             ## ----------------------------------------------------------------------------
198             ## Parallel stream with MCE -- file.
199             ##
200             ###############################################################################
201              
202             sub run_file (@) {
203              
204 20 50 33 20 1 15195 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
205              
206 20 50       45 my ($_file, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  20         60  
207 20         85 my $_pid = "$$.$_tid.".caller();
208              
209 20 50       70 if (defined (my $_p = $_params->{$_pid})) {
210 20 50       45 delete $_p->{input_data} if (exists $_p->{input_data});
211 20 50       205 delete $_p->{sequence} if (exists $_p->{sequence});
212             }
213             else {
214 0         0 $_params->{$_pid} = {};
215             }
216              
217 20         115 for my $_i ($_start_pos .. @_ - 1) {
218 50         125 my $_r = ref $_[$_i];
219 50 100 66     320 if ($_r eq '' || $_r eq 'SCALAR' || $_r =~ /^(?:GLOB|FileHandle|IO::)/) {
      100        
220 20         40 $_file = $_[$_i]; $_pos = $_i;
  20         25  
221 20         45 last;
222             }
223             }
224              
225 20 100 66     250 if (defined $_file && ref $_file eq '' && $_file ne '') {
    50 66        
      33        
226 10 50       240 _croak("$_tag: ($_file) does not exist") unless (-e $_file);
227 10 50       155 _croak("$_tag: ($_file) is not readable") unless (-r $_file);
228 10 50       110 _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
229 10         40 $_params->{$_pid}{_file} = $_file;
230             }
231             elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
232 10         30 $_params->{$_pid}{_file} = $_file;
233             }
234             else {
235 0         0 _croak("$_tag: (file) is not specified or valid");
236             }
237              
238 20 50       55 if (defined $_pos) {
239 20         80 pop @_ for ($_pos .. @_ - 1);
240             }
241              
242 20         55 return run(@_);
243             }
244              
245             ###############################################################################
246             ## ----------------------------------------------------------------------------
247             ## Parallel stream with MCE -- sequence.
248             ##
249             ###############################################################################
250              
251             sub run_seq (@) {
252              
253 10 50 33 10 1 7615 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
254              
255 10 50       30 my ($_begin, $_end, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  10         40  
256 10         50 my $_pid = "$$.$_tid.".caller();
257              
258 10 50       40 if (defined (my $_p = $_params->{$_pid})) {
259 10 50       40 delete $_p->{sequence} if (exists $_p->{sequence});
260 10 50       25 delete $_p->{input_data} if (exists $_p->{input_data});
261 10 50       40 delete $_p->{_file} if (exists $_p->{_file});
262             }
263             else {
264 0         0 $_params->{$_pid} = {};
265             }
266              
267 10         110 for my $_i ($_start_pos .. @_ - 1) {
268 25         70 my $_r = ref $_[$_i];
269              
270 25 50 66     235 if ($_r eq '' || $_r =~ /^Math::/ || $_r eq 'HASH' || $_r eq 'ARRAY') {
      66        
      33        
271 10         20 $_pos = $_i;
272              
273 10 50 33     45 if ($_r eq '' || $_r =~ /^Math::/) {
    0          
    0          
274 10         30 $_begin = $_[$_pos], $_end = $_[$_pos + 1];
275             $_params->{$_pid}{sequence} = [
276 10         35 $_[$_pos], $_[$_pos + 1], $_[$_pos + 2], $_[$_pos + 3]
277             ];
278             }
279             elsif ($_r eq 'HASH') {
280 0         0 $_begin = $_[$_pos]->{begin}, $_end = $_[$_pos]->{end};
281 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
282             }
283             elsif ($_r eq 'ARRAY') {
284 0         0 $_begin = $_[$_pos]->[0], $_end = $_[$_pos]->[1];
285 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
286             }
287              
288 10         30 last;
289             }
290             }
291              
292             _croak("$_tag: (sequence) is not specified or valid")
293 10 50       40 unless (exists $_params->{$_pid}{sequence});
294 10 50       25 _croak("$_tag: (begin) is not specified for sequence")
295             unless (defined $_begin);
296 10 50       30 _croak("$_tag: (end) is not specified for sequence")
297             unless (defined $_end);
298              
299 10         20 $_params->{$_pid}{sequence_run} = undef;
300              
301 10 50       25 if (defined $_pos) {
302 10         50 pop @_ for ($_pos .. @_ - 1);
303             }
304              
305 10         25 return run(@_);
306             }
307              
308             ###############################################################################
309             ## ----------------------------------------------------------------------------
310             ## Parallel stream with MCE.
311             ##
312             ###############################################################################
313              
314             sub run (@) {
315              
316 59 50 33 59 1 18321 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
317              
318 59 100       250 my $_pkg = caller() eq 'MCE::Stream' ? caller(1) : caller();
319 59         227 my $_pid = "$$.$_tid.$_pkg";
320              
321 59 50 66     230 if (ref $_[0] eq 'HASH' && !exists $_[0]->{code}) {
322 0 0       0 $_params->{$_pid} = {} unless defined $_params->{$_pid};
323 0         0 for my $_p (keys %{ $_[0] }) {
  0         0  
324 0         0 $_params->{$_pid}{$_p} = $_[0]->{$_p};
325             }
326              
327 0         0 shift;
328             }
329              
330 59 100       117 my $_aref; $_aref = shift if (ref $_[0] eq 'ARRAY');
  59         162  
331              
332 59         107 $_order_id = 1; undef %_tmp;
  59         138  
333              
334 59 100       161 if (defined $_aref) {
335 25         35 $_gather_ref = $_aref; @{ $_aref } = ();
  25         40  
  25         70  
336             } else {
337 34         44 $_gather_ref = undef;
338             }
339              
340             ## -------------------------------------------------------------------------
341              
342 59         122 my (@_code, @_mode, @_name, @_wrks); my $_init_mce = 0; my $_pos = 0;
  59         97  
  59         89  
343 59         160 my $_default_mode = $_def->{$_pkg}{DEFAULT_MODE};
344              
345 59   100     210 while (ref $_[0] eq 'CODE' || ref $_[0] eq 'HASH') {
346 118 100       234 if (ref $_[0] eq 'CODE') {
347 108         147 push @_code, $_[0];
348 108         190 push @_mode, $_default_mode;
349             }
350             else {
351 10 0 33     30 last if (!exists $_[0]->{code} && !exists $_[0]->{mode});
352              
353 10 50       50 push @_code, exists $_[0]->{code} ? $_[0]->{code} : undef;
354 10 50       35 push @_mode, exists $_[0]->{mode} ? $_[0]->{mode} : $_default_mode;
355              
356 10 50       35 unless (ref $_code[-1] eq 'CODE') {
357 0         0 @_ = (); _croak("$_tag: (code) is not valid");
  0         0  
358             }
359 10 50 66     70 if ($_mode[-1] ne 'grep' && $_mode[-1] ne 'map') {
360 0         0 @_ = (); _croak("$_tag: (mode) is not valid");
  0         0  
361             }
362             }
363              
364 118 50       284 if (defined (my $_p = $_params->{$_pid})) {
365             push @_name, (ref $_p->{task_name} eq 'ARRAY')
366 118 50       337 ? $_p->{task_name}->[$_pos] : undef;
367             push @_wrks, (ref $_p->{max_workers} eq 'ARRAY')
368 118 50       288 ? $_p->{max_workers}->[$_pos] : undef;
369             }
370              
371             $_init_mce = 1 if (
372             !defined $_prev_c->{$_pid}[$_pos] ||
373 118 100 66     542 $_prev_c->{$_pid}[$_pos] != $_code[$_pos]
374             );
375             $_init_mce = 1 if (
376             !defined $_prev_m->{$_pid}[$_pos] ||
377 118 100 66     473 $_prev_m->{$_pid}[$_pos] ne $_mode[$_pos]
378             );
379              
380 118 100       296 $_init_mce = 1 if ($_prev_n->{$_pid}[$_pos] ne $_name[$_pos]);
381 118 100       289 $_init_mce = 1 if ($_prev_w->{$_pid}[$_pos] ne $_wrks[$_pos]);
382              
383 118         235 $_prev_c->{$_pid}[$_pos] = $_code[$_pos];
384 118         196 $_prev_m->{$_pid}[$_pos] = $_mode[$_pos];
385 118         201 $_prev_n->{$_pid}[$_pos] = $_name[$_pos];
386 118         185 $_prev_w->{$_pid}[$_pos] = $_wrks[$_pos];
387              
388 118         147 shift; $_pos++;
  118         427  
389             }
390              
391 59 50       216 if (defined $_prev_c->{$_pid}[$_pos]) {
392 0         0 pop @{ $_prev_c->{$_pid} } for ($_pos .. $#{ $_prev_c->{$_pid } });
  0         0  
  0         0  
393 0         0 pop @{ $_prev_m->{$_pid} } for ($_pos .. $#{ $_prev_m->{$_pid } });
  0         0  
  0         0  
394 0         0 pop @{ $_prev_n->{$_pid} } for ($_pos .. $#{ $_prev_n->{$_pid } });
  0         0  
  0         0  
395 0         0 pop @{ $_prev_w->{$_pid} } for ($_pos .. $#{ $_prev_w->{$_pid } });
  0         0  
  0         0  
396              
397 0         0 $_init_mce = 1;
398             }
399              
400 59 50       141 return unless (scalar @_code);
401              
402             ## -------------------------------------------------------------------------
403              
404 59         88 my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
  59         126  
405 59         112 my $_r = ref $_[0];
406              
407 59 100 66     426 if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|GLOB|FileHandle|IO::)/) {
408 10 50       35 _croak("$_tag: (HASH) not allowed as input by this MCE model")
409             if $_r eq 'HASH';
410 10         20 $_input_data = shift;
411             }
412              
413 59 50       170 if (defined (my $_p = $_params->{$_pid})) {
414             $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
415 59 50 33     317 if (exists $_p->{max_workers} && ref $_p->{max_workers} ne 'ARRAY');
416              
417 59 100 100     279 delete $_p->{sequence} if (defined $_input_data || scalar @_);
418 59 50       137 delete $_p->{user_func} if (exists $_p->{user_func});
419 59 50       176 delete $_p->{user_tasks} if (exists $_p->{user_tasks});
420 59 50       222 delete $_p->{use_slurpio} if (exists $_p->{use_slurpio});
421 59 50       145 delete $_p->{bounds_only} if (exists $_p->{bounds_only});
422 59 50       112 delete $_p->{gather} if (exists $_p->{gather});
423             }
424              
425 59 50 33     278 if (@_code > 1 && $_max_workers > 1) {
426 59         220 $_max_workers = int($_max_workers / @_code + 0.5) + 1;
427             }
428              
429 59         107 my $_chunk_size = do {
430 59   50     166 my $_p = $_params->{$_pid} || {};
431             (defined $_p->{init_relay} || defined $_def->{$_pkg}{INIT_RELAY}) ? 1 :
432             MCE::_parse_chunk_size(
433 59 50 33     562 $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
434             $_input_data, scalar @_
435             );
436             };
437              
438 59 50       205 if (defined (my $_p = $_params->{$_pid})) {
439 59 100       132 if (exists $_p->{_file}) {
440 20         55 $_input_data = delete $_p->{_file};
441             } else {
442 39 50       96 $_input_data = $_p->{input_data} if exists $_p->{input_data};
443             }
444             }
445              
446             ## -------------------------------------------------------------------------
447              
448 59         234 MCE::_save_state($_MCE->{$_pid});
449              
450 59 100 66     240 if ($_init_mce || !exists $_queue->{$_pid}) {
451 14 50       42 $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
452 14 50       137 $_queue->{$_pid} = [] if (!defined $_queue->{$_pid});
453              
454 14         42 my $_Q = $_queue->{$_pid};
455 14         37 pop(@{ $_Q })->DESTROY for (@_code .. @{ $_Q });
  14         51  
  0         0  
456              
457 14         144 push @{ $_Q }, MCE::Queue->new()
458 14         37 for (@{ $_Q } .. @_code - 2);
  14         46  
459              
460             ## must clear arrays for nested session to work with Perl < v5.14
461 14         107 _gen_user_tasks($_pid, $_Q, [@_code], [@_mode], [@_name], [@_wrks]);
462              
463 14         61 @_code = @_mode = @_name = @_wrks = ();
464              
465             my %_opts = (
466             max_workers => $_max_workers, task_name => $_tag,
467 14         103 user_tasks => $_user_tasks->{$_pid}, task_end => \&_task_end,
468             use_slurpio => 0,
469             );
470              
471 14 50       56 if (defined (my $_p = $_params->{$_pid})) {
472 14         37 local $_;
473              
474 14         28 for (keys %{ $_p }) {
  14         340  
475 28 50       120 next if ($_ eq 'sequence_run');
476 28 100 66     188 next if ($_ eq 'max_workers' && ref $_p->{max_workers} eq 'ARRAY');
477 14 50 33     223 next if ($_ eq 'task_name' && ref $_p->{task_name} eq 'ARRAY');
478 0 0       0 next if ($_ eq 'input_data');
479 0 0       0 next if ($_ eq 'chunk_size');
480 0 0       0 next if ($_ eq 'task_end');
481              
482             _croak("$_tag: ($_) is not a valid constructor argument")
483 0 0       0 unless (exists $MCE::_valid_fields_new{$_});
484              
485 0         0 $_opts{$_} = $_p->{$_};
486             }
487             }
488              
489 14         128 for my $_k (qw/ tmp_dir freeze thaw init_relay use_threads /) {
490             $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
491 70 50 33     306 if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
492             }
493              
494 14         154 $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
495             }
496             else {
497             ## Workers may persist after running. Thus, updating the MCE instance.
498             ## These options do not require respawning.
499 45 50       125 if (defined (my $_p = $_params->{$_pid})) {
500 45         145 for my $_k (qw(
501             RS interval stderr_file stdout_file user_error user_output
502             job_delay submit_delay on_post_exit on_post_run user_args
503             flush_file flush_stderr flush_stdout max_retries
504             )) {
505 675 50       1200 $_MCE->{$_pid}{$_k} = $_p->{$_k} if (exists $_p->{$_k});
506             }
507             }
508             }
509              
510             ## -------------------------------------------------------------------------
511              
512 59 100       266 if (defined $_input_data) {
    100          
513 30         60 @_ = ();
514 30         225 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
515 30         130 delete $_MCE->{$_pid}{input_data};
516             }
517             elsif (scalar @_) {
518 19         161 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
519 11         67 delete $_MCE->{$_pid}{input_data};
520             }
521             else {
522 10 50 33     275 if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
523             $_MCE->{$_pid}->run({
524             chunk_size => $_chunk_size,
525             sequence => $_params->{$_pid}{sequence}
526 10         100 }, 0);
527 10 50       65 if (exists $_params->{$_pid}{sequence_run}) {
528 10         25 delete $_params->{$_pid}{sequence_run};
529 10         20 delete $_params->{$_pid}{sequence};
530             }
531 10         25 delete $_MCE->{$_pid}{sequence};
532             }
533             }
534              
535 51         249 MCE::_restore_state();
536              
537             # destroy queue(s) if MCE::run requested workers to shutdown
538 51 50       125 if (!$_MCE->{$_pid}{_spawned}) {
539 0         0 $_->DESTROY() for @{ $_queue->{$_pid} };
  0         0  
540 0         0 delete $_queue->{$_pid};
541             }
542              
543 51 100       300 return map { @{ $_ } } delete @_tmp{ 1 .. $_order_id - 1 }
  194         226  
  194         694  
544             unless (defined $_aref);
545              
546 25         45 $_gather_ref = undef;
547              
548 25         115 return;
549             }
550              
551             ###############################################################################
552             ## ----------------------------------------------------------------------------
553             ## Private methods.
554             ##
555             ###############################################################################
556              
557             sub _croak {
558              
559 0     0   0 goto &MCE::_croak;
560             }
561              
562             sub _gen_user_tasks {
563              
564 14     14   74 my ($_pid, $_queue_ref, $_code_ref, $_mode_ref, $_name_ref, $_wrks_ref) = @_;
565              
566 14         37 @{ $_user_tasks->{$_pid} } = ();
  14         56  
567              
568             ## For the code block farthest to the right.
569              
570 14         56 push @{ $_user_tasks->{$_pid} }, {
571             task_name => $_name_ref->[-1],
572             max_workers => $_wrks_ref->[-1],
573              
574 14         154 gather => (@{ $_code_ref } > 1)
575             ? $_queue_ref->[-1] : \&_preserve_order,
576              
577             user_func => sub {
578 83     83   163 my ($_mce, $_chunk_ref, $_chunk_id) = @_;
579 83         133 my @_a; my $_code = $_code_ref->[-1];
  83         166  
580              
581 83 100       184 if (ref $_chunk_ref) {
582             push @_a, ($_mode_ref->[-1] eq 'map')
583 72         132 ? map { &{ $_code } } @{ $_chunk_ref }
  72         198  
  56         9456  
584 65 100       177 : grep { &{ $_code } } @{ $_chunk_ref };
  9         13  
  9         22  
  9         21  
585             }
586             else {
587             push @_a, ($_mode_ref->[-1] eq 'map')
588 18         26 ? map { &{ $_code } } $_chunk_ref
  18         55  
589 18 50       54 : grep { &{ $_code } } $_chunk_ref;
  0         0  
  0         0  
590             }
591              
592 83 50       596 MCE->gather( (@{ $_code_ref } > 1)
  83         570  
593             ? MCE->freeze([ \@_a, $_chunk_id ])
594             : (\@_a, $_chunk_id)
595             );
596             }
597 14 50       28 };
598              
599             ## For in-between code blocks (processed from right to left).
600              
601 14         42 for (my $_i = @{ $_code_ref } - 2; $_i > 0; $_i--) {
  14         70  
602 0         0 my $_pos = $_i;
603              
604 0         0 push @{ $_user_tasks->{$_pid} }, {
605             task_name => $_name_ref->[$_pos],
606             max_workers => $_wrks_ref->[$_pos],
607             gather => $_queue_ref->[$_pos - 1],
608              
609             user_func => sub {
610 0     0   0 my $_q = $_queue_ref->[$_pos];
611              
612 0         0 while (1) {
613 0         0 my $_chunk = $_q->dequeue;
614 0 0       0 last unless (defined $_chunk);
615              
616 0         0 my @_a; my $_code = $_code_ref->[$_pos];
  0         0  
617 0         0 $_chunk = MCE->thaw($_chunk);
618              
619             push @_a, ($_mode_ref->[$_pos] eq 'map')
620 0         0 ? map { &{ $_code } } @{ $_chunk->[0] }
  0         0  
  0         0  
621 0 0       0 : grep { &{ $_code } } @{ $_chunk->[0] };
  0         0  
  0         0  
  0         0  
622              
623 0         0 MCE->gather(MCE->freeze([ \@_a, $_chunk->[1] ]));
624             }
625              
626 0         0 return;
627             }
628 0         0 };
629             }
630              
631             ## For the left-most code block.
632              
633 14 50       23 if (@{ $_code_ref } > 1) {
  14         51  
634              
635 14         135 push @{ $_user_tasks->{$_pid} }, {
636             task_name => $_name_ref->[0],
637             max_workers => $_wrks_ref->[0],
638             gather => \&_preserve_order,
639              
640             user_func => sub {
641 22     22   52 my $_q = $_queue_ref->[0];
642              
643 22         29 while (1) {
644 105         409 my $_chunk = $_q->dequeue;
645 105 100       339 last unless (defined $_chunk);
646              
647 83         128 my @_a; my $_code = $_code_ref->[0];
  83         187  
648 83         694 $_chunk = MCE->thaw($_chunk);
649              
650             push @_a, ($_mode_ref->[0] eq 'map')
651 93         143 ? map { &{ $_code } } @{ $_chunk->[0] }
  93         269  
  83         240  
652 83 50       315 : grep { &{ $_code } } @{ $_chunk->[0] };
  0         0  
  0         0  
  0         0  
653              
654 83         662 MCE->gather(\@_a, $_chunk->[1]);
655             }
656              
657 22         64 return;
658             }
659 14         28 };
660             }
661              
662 14         51 return;
663             }
664              
665             1;
666              
667             __END__