File Coverage

blib/lib/MCE/Stream.pm
Criterion Covered Total %
statement 280 361 77.5
branch 125 240 52.0
condition 52 100 52.0
subroutine 19 22 86.3
pod 5 5 100.0
total 481 728 66.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Parallel stream model for chaining multiple maps and greps.
4             ##
5             ###############################################################################
6              
7             package MCE::Stream;
8              
9 10     10   643214 use strict;
  10         101  
  10         292  
10 10     10   61 use warnings;
  10         119  
  10         407  
11              
12 10     10   59 no warnings qw( threads recursion uninitialized );
  10         20  
  10         687  
13              
14             our $VERSION = '1.888';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 10     10   59 use Scalar::Util qw( looks_like_number );
  10         29  
  10         620  
21              
22 10     10   6485 use MCE;
  10         29  
  10         59  
23 10     10   6433 use MCE::Queue;
  10         29  
  10         70  
24              
25             our @CARP_NOT = qw( MCE );
26              
27             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
28              
29             sub CLONE {
30 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
31             }
32              
33             ###############################################################################
34             ## ----------------------------------------------------------------------------
35             ## Import routine.
36             ##
37             ###############################################################################
38              
39             my ($_MCE, $_def, $_params, $_tag) = ({}, {}, {}, 'MCE::Stream');
40             my ($_prev_c, $_prev_m, $_prev_n, $_prev_w) = ({}, {}, {}, {});
41             my ($_user_tasks, $_queue) = ({}, {});
42              
43             sub import {
44 10     10   158 my ($_class, $_pkg) = (shift, caller);
45              
46 10         40 my $_p = $_def->{$_pkg} = {
47             MAX_WORKERS => 'auto',
48             CHUNK_SIZE => 'auto',
49             DEFAULT_MODE => 'map',
50             };
51              
52             ## Import functions.
53 10 50       40 if ($_pkg !~ /^MCE::/) {
54 10     10   79 no strict 'refs'; no warnings 'redefine';
  10     10   20  
  10         308  
  10         87  
  10         20  
  10         47928  
55 10         21 *{ $_pkg.'::mce_stream_f' } = \&run_file;
  10         88  
56 10         74 *{ $_pkg.'::mce_stream_s' } = \&run_seq;
  10         40  
57 10         20 *{ $_pkg.'::mce_stream' } = \&run;
  10         42  
58             }
59              
60             ## Process module arguments.
61 10         77 while ( my $_argument = shift ) {
62 0         0 my $_arg = lc $_argument;
63              
64 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
65 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
66 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
67 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
68 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
69 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
70 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
71 0 0       0 $_p->{DEFAULT_MODE} = shift, next if ( $_arg eq 'default_mode' );
72              
73 0 0       0 shift, next if ( $_arg eq 'fast' ); # ignored
74              
75             ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
76 0 0       0 if ( $_arg eq 'sereal' ) {
77 0 0       0 if ( shift eq '0' ) {
78 0         0 require Storable;
79 0         0 $_p->{FREEZE} = \&Storable::freeze;
80 0         0 $_p->{THAW} = \&Storable::thaw;
81             }
82 0         0 next;
83             }
84              
85 0         0 _croak("Error: ($_argument) invalid module option");
86             }
87              
88             _croak("Error: (DEFAULT_MODE) is not valid")
89 10 50 33     71 if ($_p->{DEFAULT_MODE} ne 'grep' && $_p->{DEFAULT_MODE} ne 'map');
90              
91 10         43 $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});
92              
93 10         47 MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
94             MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
95 10 50       31 unless ($_p->{CHUNK_SIZE} eq 'auto');
96              
97 10         218 return;
98             }
99              
100             ###############################################################################
101             ## ----------------------------------------------------------------------------
102             ## Gather callback to ensure chunk order is preserved during gathering.
103             ## Also, the task end callback for when a task completes.
104             ##
105             ###############################################################################
106              
107             my ($_gather_ref, $_order_id, %_tmp);
108              
109             sub _preserve_order {
110              
111 379     379   43434 $_tmp{$_[1]} = $_[0];
112              
113 379 100       897 if (defined $_gather_ref) {
114 185         265 while (1) {
115 370 100       945 last unless exists $_tmp{$_order_id};
116 185         285 push @{ $_gather_ref }, @{ delete $_tmp{$_order_id++} };
  185         345  
  185         745  
117             }
118             }
119             else {
120 194         324 $_order_id++;
121             }
122              
123 379         1326 return;
124             }
125              
126             sub _task_end {
127              
128 102     102   241 my ($_mce, $_task_id, $_task_name) = @_;
129 102         360 my $_pid = $_mce->{_init_pid}.'.'.$_mce->{_caller};
130              
131 102 100       292 if (defined $_mce->{user_tasks}->[$_task_id + 1]) {
132 51         148 my $n_workers = $_mce->{user_tasks}->[$_task_id + 1]->{max_workers};
133 51         87 my $_id = @{ $_queue->{$_pid} } - $_task_id - 1;
  51         140  
134              
135 51         305 $_queue->{$_pid}[$_id]->enqueue((undef) x $n_workers);
136             }
137              
138             $_params->{task_end}->($_mce, $_task_id, $_task_name)
139 102 50 33     450 if (exists $_params->{task_end} && ref $_params->{task_end} eq 'CODE');
140              
141 102         320 return;
142             }
143              
144             ###############################################################################
145             ## ----------------------------------------------------------------------------
146             ## Init and finish routines.
147             ##
148             ###############################################################################
149              
150             sub init (@) {
151              
152 9 50 33 9 1 1548 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
153 9         63 my $_pkg = "$$.$_tid.".caller();
154              
155 9 50       72 $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };
156              
157             _croak("$_tag: (HASH) not allowed as input by this MCE model")
158 9 50       36 if ( ref $_params->{$_pkg}{input_data} eq 'HASH' );
159              
160 9         27 @_ = ();
161              
162 9         18 return;
163             }
164              
165             sub finish (@) {
166              
167 24 50 33 24 1 7413 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
168 24 100       197 my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();
169              
170 24 100 66     497 if ( $_pkg eq 'MCE' ) {
    100          
171 10         25 for my $_k ( keys %{ $_MCE } ) { MCE::Stream->finish($_k, 1); }
  10         207  
  8         225  
172             }
173             elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
174 6 50       210 $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};
175 6         48 $_gather_ref = $_order_id = undef, undef %_tmp;
176              
177 6         35 delete $_user_tasks->{$_pkg};
178 6         46 delete $_prev_c->{$_pkg};
179 6         48 delete $_prev_m->{$_pkg};
180 6         50 delete $_prev_n->{$_pkg};
181 6         33 delete $_prev_w->{$_pkg};
182 6         179 delete $_MCE->{$_pkg};
183              
184 6 50       118 if (defined $_queue->{$_pkg}) {
185 6         28 local $_;
186 6         22 $_->DESTROY() for (@{ $_queue->{$_pkg} });
  6         185  
187 6         99 delete $_queue->{$_pkg};
188             }
189             }
190              
191 24         129 @_ = ();
192              
193 24         89 return;
194             }
195              
196             ###############################################################################
197             ## ----------------------------------------------------------------------------
198             ## Parallel stream with MCE -- file.
199             ##
200             ###############################################################################
201              
202             sub run_file (@) {
203              
204 20 50 33 20 1 14785 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
205              
206 20 50       45 my ($_file, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  20         70  
207 20         100 my $_pid = "$$.$_tid.".caller();
208              
209 20 50       65 if (defined (my $_p = $_params->{$_pid})) {
210 20 50       55 delete $_p->{input_data} if (exists $_p->{input_data});
211 20 50       50 delete $_p->{sequence} if (exists $_p->{sequence});
212             }
213             else {
214 0         0 $_params->{$_pid} = {};
215             }
216              
217 20         155 for my $_i ($_start_pos .. @_ - 1) {
218 50         115 my $_r = ref $_[$_i];
219 50 100 66     335 if ($_r eq '' || $_r eq 'SCALAR' || $_r =~ /^(?:GLOB|FileHandle|IO::)/) {
      100        
220 20         35 $_file = $_[$_i]; $_pos = $_i;
  20         65  
221 20         60 last;
222             }
223             }
224              
225 20 100 66     400 if (defined $_file && ref $_file eq '' && $_file ne '') {
    50 66        
      33        
226 10 50       330 _croak("$_tag: ($_file) does not exist") unless (-e $_file);
227 10 50       140 _croak("$_tag: ($_file) is not readable") unless (-r $_file);
228 10 50       115 _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
229 10         30 $_params->{$_pid}{_file} = $_file;
230             }
231             elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
232 10         30 $_params->{$_pid}{_file} = $_file;
233             }
234             else {
235 0         0 _croak("$_tag: (file) is not specified or valid");
236             }
237              
238 20 50       80 if (defined $_pos) {
239 20         135 pop @_ for ($_pos .. @_ - 1);
240             }
241              
242 20         65 return run(@_);
243             }
244              
245             ###############################################################################
246             ## ----------------------------------------------------------------------------
247             ## Parallel stream with MCE -- sequence.
248             ##
249             ###############################################################################
250              
251             sub run_seq (@) {
252              
253 10 50 33 10 1 8225 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
254              
255 10 50       35 my ($_begin, $_end, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  10         145  
256 10         70 my $_pid = "$$.$_tid.".caller();
257              
258 10 50       45 if (defined (my $_p = $_params->{$_pid})) {
259 10 50       35 delete $_p->{sequence} if (exists $_p->{sequence});
260 10 50       45 delete $_p->{input_data} if (exists $_p->{input_data});
261 10 50       35 delete $_p->{_file} if (exists $_p->{_file});
262             }
263             else {
264 0         0 $_params->{$_pid} = {};
265             }
266              
267 10         50 for my $_i ($_start_pos .. @_ - 1) {
268 25         55 my $_r = ref $_[$_i];
269              
270 25 50 66     270 if ($_r eq '' || $_r =~ /^Math::/ || $_r eq 'HASH' || $_r eq 'ARRAY') {
      66        
      33        
271 10         40 $_pos = $_i;
272              
273 10 50 33     45 if ($_r eq '' || $_r =~ /^Math::/) {
    0          
    0          
274 10         40 $_begin = $_[$_pos], $_end = $_[$_pos + 1];
275             $_params->{$_pid}{sequence} = [
276 10         50 $_[$_pos], $_[$_pos + 1], $_[$_pos + 2], $_[$_pos + 3]
277             ];
278             }
279             elsif ($_r eq 'HASH') {
280 0         0 $_begin = $_[$_pos]->{begin}, $_end = $_[$_pos]->{end};
281 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
282             }
283             elsif ($_r eq 'ARRAY') {
284 0         0 $_begin = $_[$_pos]->[0], $_end = $_[$_pos]->[1];
285 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
286             }
287              
288 10         35 last;
289             }
290             }
291              
292             _croak("$_tag: (sequence) is not specified or valid")
293 10 50       30 unless (exists $_params->{$_pid}{sequence});
294 10 50       50 _croak("$_tag: (begin) is not specified for sequence")
295             unless (defined $_begin);
296 10 50       30 _croak("$_tag: (end) is not specified for sequence")
297             unless (defined $_end);
298              
299 10         30 $_params->{$_pid}{sequence_run} = undef;
300              
301 10 50       25 if (defined $_pos) {
302 10         50 pop @_ for ($_pos .. @_ - 1);
303             }
304              
305 10         35 return run(@_);
306             }
307              
308             ###############################################################################
309             ## ----------------------------------------------------------------------------
310             ## Parallel stream with MCE.
311             ##
312             ###############################################################################
313              
314             sub run (@) {
315              
316 59 50 33 59 1 17201 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
317              
318 59 100       311 my $_pkg = caller() eq 'MCE::Stream' ? caller(1) : caller();
319 59         232 my $_pid = "$$.$_tid.$_pkg";
320              
321 59 50 66     469 if (ref $_[0] eq 'HASH' && !exists $_[0]->{code}) {
322 0 0       0 $_params->{$_pid} = {} unless defined $_params->{$_pid};
323 0         0 for my $_p (keys %{ $_[0] }) {
  0         0  
324 0         0 $_params->{$_pid}{$_p} = $_[0]->{$_p};
325             }
326              
327 0         0 shift;
328             }
329              
330 59 100       89 my $_aref; $_aref = shift if (ref $_[0] eq 'ARRAY');
  59         265  
331              
332 59         539 $_order_id = 1; undef %_tmp;
  59         214  
333              
334 59 100       152 if (defined $_aref) {
335 25         45 $_gather_ref = $_aref; @{ $_aref } = ();
  25         115  
  25         75  
336             } else {
337 34         73 $_gather_ref = undef;
338             }
339              
340             ## -------------------------------------------------------------------------
341              
342 59         108 my (@_code, @_mode, @_name, @_wrks); my $_init_mce = 0; my $_pos = 0;
  59         123  
  59         107  
343 59         152 my $_default_mode = $_def->{$_pkg}{DEFAULT_MODE};
344              
345 59   100     270 while (ref $_[0] eq 'CODE' || ref $_[0] eq 'HASH') {
346 118 100       340 if (ref $_[0] eq 'CODE') {
347 108         182 push @_code, $_[0];
348 108         172 push @_mode, $_default_mode;
349             }
350             else {
351 10 0 33     105 last if (!exists $_[0]->{code} && !exists $_[0]->{mode});
352              
353 10 50       120 push @_code, exists $_[0]->{code} ? $_[0]->{code} : undef;
354 10 50       55 push @_mode, exists $_[0]->{mode} ? $_[0]->{mode} : $_default_mode;
355              
356 10 50       60 unless (ref $_code[-1] eq 'CODE') {
357 0         0 @_ = (); _croak("$_tag: (code) is not valid");
  0         0  
358             }
359 10 50 66     570 if ($_mode[-1] ne 'grep' && $_mode[-1] ne 'map') {
360 0         0 @_ = (); _croak("$_tag: (mode) is not valid");
  0         0  
361             }
362             }
363              
364 118 50       350 if (defined (my $_p = $_params->{$_pid})) {
365             push @_name, (ref $_p->{task_name} eq 'ARRAY')
366 118 50       440 ? $_p->{task_name}->[$_pos] : undef;
367             push @_wrks, (ref $_p->{max_workers} eq 'ARRAY')
368 118 50       337 ? $_p->{max_workers}->[$_pos] : undef;
369             }
370              
371             $_init_mce = 1 if (
372             !defined $_prev_c->{$_pid}[$_pos] ||
373 118 100 66     562 $_prev_c->{$_pid}[$_pos] != $_code[$_pos]
374             );
375             $_init_mce = 1 if (
376             !defined $_prev_m->{$_pid}[$_pos] ||
377 118 100 66     498 $_prev_m->{$_pid}[$_pos] ne $_mode[$_pos]
378             );
379              
380 118 100       294 $_init_mce = 1 if ($_prev_n->{$_pid}[$_pos] ne $_name[$_pos]);
381 118 100       353 $_init_mce = 1 if ($_prev_w->{$_pid}[$_pos] ne $_wrks[$_pos]);
382              
383 118         252 $_prev_c->{$_pid}[$_pos] = $_code[$_pos];
384 118         293 $_prev_m->{$_pid}[$_pos] = $_mode[$_pos];
385 118         232 $_prev_n->{$_pid}[$_pos] = $_name[$_pos];
386 118         258 $_prev_w->{$_pid}[$_pos] = $_wrks[$_pos];
387              
388 118         216 shift; $_pos++;
  118         601  
389             }
390              
391 59 50       268 if (defined $_prev_c->{$_pid}[$_pos]) {
392 0         0 pop @{ $_prev_c->{$_pid} } for ($_pos .. $#{ $_prev_c->{$_pid } });
  0         0  
  0         0  
393 0         0 pop @{ $_prev_m->{$_pid} } for ($_pos .. $#{ $_prev_m->{$_pid } });
  0         0  
  0         0  
394 0         0 pop @{ $_prev_n->{$_pid} } for ($_pos .. $#{ $_prev_n->{$_pid } });
  0         0  
  0         0  
395 0         0 pop @{ $_prev_w->{$_pid} } for ($_pos .. $#{ $_prev_w->{$_pid } });
  0         0  
  0         0  
396              
397 0         0 $_init_mce = 1;
398             }
399              
400 59 50       192 return unless (scalar @_code);
401              
402             ## -------------------------------------------------------------------------
403              
404 59         111 my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
  59         137  
405 59         117 my $_r = ref $_[0];
406              
407 59 100 66     356 if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|GLOB|FileHandle|IO::)/) {
408 10 50       40 _croak("$_tag: (HASH) not allowed as input by this MCE model")
409             if $_r eq 'HASH';
410 10         15 $_input_data = shift;
411             }
412              
413 59 50       181 if (defined (my $_p = $_params->{$_pid})) {
414             $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
415 59 50 33     294 if (exists $_p->{max_workers} && ref $_p->{max_workers} ne 'ARRAY');
416              
417 59 100 100     446 delete $_p->{sequence} if (defined $_input_data || scalar @_);
418 59 50       158 delete $_p->{user_func} if (exists $_p->{user_func});
419 59 50       123 delete $_p->{user_tasks} if (exists $_p->{user_tasks});
420 59 50       132 delete $_p->{use_slurpio} if (exists $_p->{use_slurpio});
421 59 50       113 delete $_p->{bounds_only} if (exists $_p->{bounds_only});
422 59 50       137 delete $_p->{gather} if (exists $_p->{gather});
423             }
424              
425 59 50 33     271 if (@_code > 1 && $_max_workers > 1) {
426 59         231 $_max_workers = int($_max_workers / @_code + 0.5) + 1;
427             }
428              
429 59         118 my $_chunk_size = do {
430 59   50     256 my $_p = $_params->{$_pid} || {};
431             (defined $_p->{init_relay} || defined $_def->{$_pkg}{INIT_RELAY}) ? 1 :
432             MCE::_parse_chunk_size(
433 59 50 33     666 $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
434             $_input_data, scalar @_
435             );
436             };
437              
438 59 50       201 if (defined (my $_p = $_params->{$_pid})) {
439 59 100       118 if (exists $_p->{_file}) {
440 20         55 $_input_data = delete $_p->{_file};
441             } else {
442 39 50       106 $_input_data = $_p->{input_data} if exists $_p->{input_data};
443             }
444             }
445              
446             ## -------------------------------------------------------------------------
447              
448 59         360 MCE::_save_state($_MCE->{$_pid});
449              
450 59 100 66     331 if ($_init_mce || !exists $_queue->{$_pid}) {
451 14 50       57 $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
452 14 50       67 $_queue->{$_pid} = [] if (!defined $_queue->{$_pid});
453              
454 14         42 my $_Q = $_queue->{$_pid};
455 14         33 pop(@{ $_Q })->DESTROY for (@_code .. @{ $_Q });
  14         97  
  0         0  
456              
457 14         202 push @{ $_Q }, MCE::Queue->new()
458 14         38 for (@{ $_Q } .. @_code - 2);
  14         133  
459              
460             ## must clear arrays for nested session to work with Perl < v5.14
461 14         119 _gen_user_tasks($_pid, $_Q, [@_code], [@_mode], [@_name], [@_wrks]);
462              
463 14         81 @_code = @_mode = @_name = @_wrks = ();
464              
465             my %_opts = (
466             max_workers => $_max_workers, task_name => $_tag,
467 14         128 user_tasks => $_user_tasks->{$_pid}, task_end => \&_task_end,
468             use_slurpio => 0,
469             );
470              
471 14 50       76 if (defined (my $_p = $_params->{$_pid})) {
472 14         143 local $_;
473              
474 14         43 for (keys %{ $_p }) {
  14         80  
475 28 50       140 next if ($_ eq 'sequence_run');
476 28 100 66     316 next if ($_ eq 'max_workers' && ref $_p->{max_workers} eq 'ARRAY');
477 14 50 33     237 next if ($_ eq 'task_name' && ref $_p->{task_name} eq 'ARRAY');
478 0 0       0 next if ($_ eq 'input_data');
479 0 0       0 next if ($_ eq 'chunk_size');
480 0 0       0 next if ($_ eq 'task_end');
481              
482             _croak("$_tag: ($_) is not a valid constructor argument")
483 0 0       0 unless (exists $MCE::_valid_fields_new{$_});
484              
485 0         0 $_opts{$_} = $_p->{$_};
486             }
487             }
488              
489 14         105 for my $_k (qw/ tmp_dir freeze thaw init_relay use_threads /) {
490             $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
491 70 50 33     366 if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
492             }
493              
494 14         246 $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
495             }
496             else {
497             ## Workers may persist after running. Thus, updating the MCE instance.
498             ## These options do not require respawning.
499 45 50       120 if (defined (my $_p = $_params->{$_pid})) {
500 45         145 for my $_k (qw(
501             RS interval stderr_file stdout_file user_error user_output
502             job_delay submit_delay on_post_exit on_post_run user_args
503             flush_file flush_stderr flush_stdout max_retries
504             )) {
505 675 50       1340 $_MCE->{$_pid}{$_k} = $_p->{$_k} if (exists $_p->{$_k});
506             }
507             }
508             }
509              
510             ## -------------------------------------------------------------------------
511              
512 59 100       301 if (defined $_input_data) {
    100          
513 30         55 @_ = ();
514 30         205 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
515 30         160 delete $_MCE->{$_pid}{input_data};
516             }
517             elsif (scalar @_) {
518 19         177 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
519 11         100 delete $_MCE->{$_pid}{input_data};
520             }
521             else {
522 10 50 33     210 if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
523             $_MCE->{$_pid}->run({
524             chunk_size => $_chunk_size,
525             sequence => $_params->{$_pid}{sequence}
526 10         90 }, 0);
527 10 50       110 if (exists $_params->{$_pid}{sequence_run}) {
528 10         85 delete $_params->{$_pid}{sequence_run};
529 10         25 delete $_params->{$_pid}{sequence};
530             }
531 10         25 delete $_MCE->{$_pid}{sequence};
532             }
533             }
534              
535 51         344 MCE::_restore_state();
536              
537             # destroy queue(s) if MCE::run requested workers to shutdown
538 51 50       171 if (!$_MCE->{$_pid}{_spawned}) {
539 0         0 $_->DESTROY() for @{ $_queue->{$_pid} };
  0         0  
540 0         0 delete $_queue->{$_pid};
541             }
542              
543 51 100       262 return map { @{ $_ } } delete @_tmp{ 1 .. $_order_id - 1 }
  194         220  
  194         820  
544             unless (defined $_aref);
545              
546 25         60 $_gather_ref = undef;
547              
548 25         185 return;
549             }
550              
551             ###############################################################################
552             ## ----------------------------------------------------------------------------
553             ## Private methods.
554             ##
555             ###############################################################################
556              
557             sub _croak {
558              
559 0     0   0 goto &MCE::_croak;
560             }
561              
562             sub _gen_user_tasks {
563              
564 14     14   70 my ($_pid, $_queue_ref, $_code_ref, $_mode_ref, $_name_ref, $_wrks_ref) = @_;
565              
566 14         47 @{ $_user_tasks->{$_pid} } = ();
  14         61  
567              
568             ## For the code block farthest to the right.
569              
570 14         52 push @{ $_user_tasks->{$_pid} }, {
571             task_name => $_name_ref->[-1],
572             max_workers => $_wrks_ref->[-1],
573              
574 14         221 gather => (@{ $_code_ref } > 1)
575             ? $_queue_ref->[-1] : \&_preserve_order,
576              
577             user_func => sub {
578 83     83   188 my ($_mce, $_chunk_ref, $_chunk_id) = @_;
579 83         146 my @_a; my $_code = $_code_ref->[-1];
  83         233  
580              
581 83 100       256 if (ref $_chunk_ref) {
582             push @_a, ($_mode_ref->[-1] eq 'map')
583 72         168 ? map { &{ $_code } } @{ $_chunk_ref }
  72         236  
  56         169  
584 65 100       203 : grep { &{ $_code } } @{ $_chunk_ref };
  9         14  
  9         40  
  9         21  
585             }
586             else {
587             push @_a, ($_mode_ref->[-1] eq 'map')
588 18         71 ? map { &{ $_code } } $_chunk_ref
  18         61  
589 18 50       101 : grep { &{ $_code } } $_chunk_ref;
  0         0  
  0         0  
590             }
591              
592 83 50       11413 MCE->gather( (@{ $_code_ref } > 1)
  83         673  
593             ? MCE->freeze([ \@_a, $_chunk_id ])
594             : (\@_a, $_chunk_id)
595             );
596             }
597 14 50       33 };
598              
599             ## For in-between code blocks (processed from right to left).
600              
601 14         42 for (my $_i = @{ $_code_ref } - 2; $_i > 0; $_i--) {
  14         71  
602 0         0 my $_pos = $_i;
603              
604 0         0 push @{ $_user_tasks->{$_pid} }, {
605             task_name => $_name_ref->[$_pos],
606             max_workers => $_wrks_ref->[$_pos],
607             gather => $_queue_ref->[$_pos - 1],
608              
609             user_func => sub {
610 0     0   0 my $_q = $_queue_ref->[$_pos];
611              
612 0         0 while (1) {
613 0         0 my $_chunk = $_q->dequeue;
614 0 0       0 last unless (defined $_chunk);
615              
616 0         0 my @_a; my $_code = $_code_ref->[$_pos];
  0         0  
617 0         0 $_chunk = MCE->thaw($_chunk);
618              
619             push @_a, ($_mode_ref->[$_pos] eq 'map')
620 0         0 ? map { &{ $_code } } @{ $_chunk->[0] }
  0         0  
  0         0  
621 0 0       0 : grep { &{ $_code } } @{ $_chunk->[0] };
  0         0  
  0         0  
  0         0  
622              
623 0         0 MCE->gather(MCE->freeze([ \@_a, $_chunk->[1] ]));
624             }
625              
626 0         0 return;
627             }
628 0         0 };
629             }
630              
631             ## For the left-most code block.
632              
633 14 50       33 if (@{ $_code_ref } > 1) {
  14         70  
634              
635 14         146 push @{ $_user_tasks->{$_pid} }, {
636             task_name => $_name_ref->[0],
637             max_workers => $_wrks_ref->[0],
638             gather => \&_preserve_order,
639              
640             user_func => sub {
641 22     22   124 my $_q = $_queue_ref->[0];
642              
643 22         46 while (1) {
644 105         592 my $_chunk = $_q->dequeue;
645 105 100       527 last unless (defined $_chunk);
646              
647 83         175 my @_a; my $_code = $_code_ref->[0];
  83         270  
648 83         656 $_chunk = MCE->thaw($_chunk);
649              
650             push @_a, ($_mode_ref->[0] eq 'map')
651 93         221 ? map { &{ $_code } } @{ $_chunk->[0] }
  93         350  
  83         306  
652 83 50       472 : grep { &{ $_code } } @{ $_chunk->[0] };
  0         0  
  0         0  
  0         0  
653              
654 83         976 MCE->gather(\@_a, $_chunk->[1]);
655             }
656              
657 22         116 return;
658             }
659 14         24 };
660             }
661              
662 14         52 return;
663             }
664              
665             1;
666              
667             __END__