File Coverage

blib/lib/MCE/Candy.pm
Criterion Covered Total %
statement 35 129 27.1
branch 9 60 15.0
condition 3 15 20.0
subroutine 8 14 57.1
pod 6 6 100.0
total 61 224 27.2


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Sugar methods and output iterators.
4             ##
5             ###############################################################################
6              
7             package MCE::Candy;
8              
9 10     10   18075 use strict;
  10         30  
  10         679  
10 10     10   60 use warnings;
  10         21  
  10         627  
11              
12 10     10   59 no warnings qw( threads recursion uninitialized );
  10         21  
  10         18361  
13              
14             our $VERSION = '1.902';
15              
16             our @CARP_NOT = qw( MCE );
17              
18             ###############################################################################
19             ## ----------------------------------------------------------------------------
20             ## Import routine.
21             ##
22             ###############################################################################
23              
24             my $_imported;
25              
26             sub import {
27              
28 10 50   10   163 return if ($_imported++);
29              
30 10 50       50 unless ($INC{'MCE.pm'}) {
31 0         0 $\ = undef; require Carp;
  0         0  
32 0         0 Carp::croak(
33             "MCE::Candy requires MCE. Please see the MCE::Candy documentation\n".
34             "for more information.\n\n"
35             );
36             }
37              
38 10         376 return;
39             }
40              
41             ###############################################################################
42             ## ----------------------------------------------------------------------------
43             ## Forchunk, foreach, and forseq sugar methods.
44             ##
45             ###############################################################################
46              
47             sub forchunk {
48              
49 0 0   0 1 0 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  0         0  
50 0         0 my $_input_data = $_[0];
51              
52 0         0 MCE::_validate_runstate($self, 'MCE::forchunk');
53              
54 0         0 my ($_user_func, $_params_ref);
55              
56 0 0       0 if (ref $_[1] eq 'HASH') {
57 0         0 $_user_func = $_[2]; $_params_ref = $_[1];
  0         0  
58             } else {
59 0         0 $_user_func = $_[1]; $_params_ref = {};
  0         0  
60             }
61              
62 0         0 @_ = ();
63              
64 0 0       0 MCE::_croak('MCE::forchunk: (input_data) is not specified')
65             unless (defined $_input_data);
66 0 0       0 MCE::_croak('MCE::forchunk: (code_block) is not specified')
67             unless (defined $_user_func);
68              
69 0         0 $_params_ref->{input_data} = $_input_data;
70 0         0 $_params_ref->{user_func} = $_user_func;
71              
72 0         0 $self->run(1, $_params_ref);
73              
74 0         0 return $self;
75             }
76              
77             sub foreach {
78              
79 0 0   0 1 0 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  0         0  
80 0         0 my $_input_data = $_[0];
81              
82 0         0 MCE::_validate_runstate($self, 'MCE::foreach');
83              
84 0         0 my ($_user_func, $_params_ref);
85              
86 0 0       0 if (ref $_[1] eq 'HASH') {
87 0         0 $_user_func = $_[2]; $_params_ref = $_[1];
  0         0  
88             } else {
89 0         0 $_user_func = $_[1]; $_params_ref = {};
  0         0  
90             }
91              
92 0         0 @_ = ();
93              
94 0 0       0 MCE::_croak('MCE::foreach: (HASH) not allowed as input by this method')
95             if (ref $_input_data eq 'HASH');
96 0 0       0 MCE::_croak('MCE::foreach: (input_data) is not specified')
97             unless (defined $_input_data);
98 0 0       0 MCE::_croak('MCE::foreach: (code_block) is not specified')
99             unless (defined $_user_func);
100              
101 0         0 $_params_ref->{chunk_size} = 1;
102 0         0 $_params_ref->{input_data} = $_input_data;
103 0         0 $_params_ref->{user_func} = $_user_func;
104              
105 0         0 $self->run(1, $_params_ref);
106              
107 0         0 return $self;
108             }
109              
110             sub forseq {
111              
112 0 0   0 1 0 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  0         0  
113 0         0 my $_sequence = $_[0];
114              
115 0         0 MCE::_validate_runstate($self, 'MCE::forseq');
116              
117 0         0 my ($_user_func, $_params_ref);
118              
119 0 0       0 if (ref $_[1] eq 'HASH') {
120 0         0 $_user_func = $_[2]; $_params_ref = $_[1];
  0         0  
121             } else {
122 0         0 $_user_func = $_[1]; $_params_ref = {};
  0         0  
123             }
124              
125 0         0 @_ = ();
126              
127 0 0       0 MCE::_croak('MCE::forseq: (sequence) is not specified')
128             unless (defined $_sequence);
129 0 0       0 MCE::_croak('MCE::forseq: (code_block) is not specified')
130             unless (defined $_user_func);
131              
132 0         0 $_params_ref->{sequence} = $_sequence;
133 0         0 $_params_ref->{user_func} = $_user_func;
134              
135 0         0 $self->run(1, $_params_ref);
136              
137 0         0 return $self;
138             }
139              
140             ###############################################################################
141             ## ----------------------------------------------------------------------------
142             ## Output iterators for preserving output order.
143             ##
144             ###############################################################################
145              
146             sub out_iter_array {
147              
148 9     9 1 1662651 my $_aref = shift; my %_tmp; my $_order_id = 1;
  9         27  
  9         27  
149              
150 9 50       54 if (ref $_aref eq 'MCE::Shared::Object') {
151 0         0 my $_pkg = $_aref->blessed;
152 0 0       0 MCE::_croak('The argument to (out_iter_array) is not valid.')
153             unless $_pkg->can('TIEARRAY');
154             }
155             else {
156 9 50       54 MCE::_croak('The argument to (out_iter_array) is not an array ref.')
157             unless (ref $_aref eq 'ARRAY');
158             }
159              
160             return sub {
161 20     20   45 my $_chunk_id = shift;
162              
163 20 50 33     315 if ($_chunk_id == $_order_id && keys %_tmp == 0) {
164             ## already orderly
165 20         35 $_order_id++, push @{ $_aref }, @_;
  20         145  
166             }
167             else {
168             ## hold temporarily otherwise until orderly
169 0         0 @{ $_tmp{ $_chunk_id } } = @_;
  0         0  
170              
171 0         0 while (1) {
172 0 0       0 last unless exists $_tmp{ $_order_id };
173 0         0 push @{ $_aref }, @{ delete $_tmp{ $_order_id++ } };
  0         0  
  0         0  
174             }
175             }
176 9         324 };
177             }
178              
179             sub out_iter_callback {
180              
181 5     5 1 6675 my $_cref = shift; my %_tmp; my $_order_id = 1;
  5         15  
  5         10  
182              
183 5 50       20 MCE::_croak('The argument to (out_iter_callback) is not a CODE ref.')
184             unless (ref $_cref eq 'CODE');
185              
186             return sub {
187 4     4   16 my $_chunk_id = shift;
188              
189 4 50 66     58 if ($_chunk_id == $_order_id && keys %_tmp == 0) {
190             ## already orderly
191 0         0 $_order_id++, $_cref->(@_);
192             }
193             else {
194             ## hold temporarily otherwise until orderly
195 4         3 @{ $_tmp{ $_chunk_id } } = @_;
  4         29  
196              
197 4         4 while (1) {
198 8 100       33 last unless exists $_tmp{ $_order_id };
199 4         4 $_cref->(@{ delete $_tmp{ $_order_id++ } });
  4         29  
200             }
201             }
202 5         120 };
203             }
204              
205             sub out_iter_fh {
206              
207 0     0 1   my $_fh = $_[0]; my %_tmp; my $_order_id = 1;
  0            
  0            
208 0 0 0       $_fh = \$_[0] if (!ref $_fh && ref \$_[0]);
209              
210 0 0         MCE::_croak('The argument to (out_iter_fh) is not a supported file handle.')
211             unless (ref($_fh) =~ /^(?:GLOB|FileHandle|IO::)/);
212              
213 0 0         if ($_fh->can('print')) {
214             return sub {
215 0     0     my $_chunk_id = shift;
216              
217 0 0 0       if ($_chunk_id == $_order_id && keys %_tmp == 0) {
218             ## already orderly
219 0           $_order_id++, $_fh->print(@_);
220             }
221             else {
222             ## hold temporarily otherwise until orderly
223 0           @{ $_tmp{ $_chunk_id } } = @_;
  0            
224              
225 0           while (1) {
226 0 0         last unless exists $_tmp{ $_order_id };
227 0           $_fh->print(@{ delete $_tmp{ $_order_id++ } });
  0            
228             }
229             }
230 0           };
231             }
232             else {
233             return sub {
234 0     0     my $_chunk_id = shift;
235              
236 0 0 0       if ($_chunk_id == $_order_id && keys %_tmp == 0) {
237             ## already orderly
238 0           $_order_id++, print {$_fh} @_;
  0            
239             }
240             else {
241             ## hold temporarily otherwise until orderly
242 0           @{ $_tmp{ $_chunk_id } } = @_;
  0            
243              
244 0           while (1) {
245 0 0         last unless exists $_tmp{ $_order_id };
246 0           print {$_fh} @{ delete $_tmp{ $_order_id++ } };
  0            
  0            
247             }
248             }
249 0           };
250             }
251             }
252              
253             1;
254              
255             __END__
256              
257             ###############################################################################
258             ## ----------------------------------------------------------------------------
259             ## Module usage.
260             ##
261             ###############################################################################
262              
263             =head1 NAME
264              
265             MCE::Candy - Sugar methods and output iterators
266              
267             =head1 VERSION
268              
269             This document describes MCE::Candy version 1.902
270              
271             =head1 DESCRIPTION
272              
273             This module provides a collection of sugar methods and helpful output iterators
274             for preserving output order.
275              
276             =head1 "FOR" SUGAR METHODS
277              
278             The sugar methods described below were created prior to the 1.5 release which
279             added MCE Models. This module is loaded automatically upon calling a "for"
280             method.
281              
282             =head2 $mce->forchunk ( $input_data [, { options } ], sub { ... } )
283              
284             Forchunk, foreach, and forseq are sugar methods in MCE. Workers are
285             spawned automatically, the code block is executed in parallel, and shutdown
286             is called. Do not call these methods if workers must persist afterwards.
287              
288             Specifying options is optional. Valid options are the same as for the
289             process method.
290              
291             ## Declare a MCE instance.
292              
293             my $mce = MCE->new(
294             max_workers => $max_workers,
295             chunk_size => 20
296             );
297              
298             ## Arguments inside the code block are the same as passed to user_func.
299              
300             $mce->forchunk(\@input_array, sub {
301             my ($mce, $chunk_ref, $chunk_id) = @_;
302             foreach ( @{ $chunk_ref } ) {
303             MCE->print("$chunk_id: $_\n");
304             }
305             });
306              
307             ## Input hash, current API available since 1.828.
308              
309             $mce->forchunk(\%input_hash, sub {
310             my ($mce, $chunk_ref, $chunk_id) = @_;
311             for my $key ( keys %{ $chunk_ref } ) {
312             MCE->print("$chunk_id: [ $key ] ", $chunk_ref->{$key}, "\n");
313             }
314             });
315              
316             ## Passing chunk_size as an option.
317              
318             $mce->forchunk(\@input_array, { chunk_size => 30 }, sub { ... });
319             $mce->forchunk(\%input_hash, { chunk_size => 30 }, sub { ... });
320              
321             =head2 $mce->foreach ( $input_data [, { options } ], sub { ... } )
322              
323             Foreach implies chunk_size => 1 and cannot be overwritten. Thus, looping is
324             not necessary inside the block. Unlike forchunk above, a hash reference as
325             input data isn't allowed.
326              
327             my $mce = MCE->new(
328             max_workers => $max_workers
329             );
330              
331             $mce->foreach(\@input_data, sub {
332             my ($mce, $chunk_ref, $chunk_id) = @_;
333             my $row = $chunk_ref->[0];
334             MCE->print("$chunk_id: $row\n");
335             });
336              
337             =head2 $mce->forseq ( $sequence_spec [, { options } ], sub { ... } )
338              
339             Sequence may be defined using an array or hash reference.
340              
341             my $mce = MCE->new(
342             max_workers => 3
343             );
344              
345             $mce->forseq([ 20, 40 ], sub {
346             my ($mce, $n, $chunk_id) = @_;
347             my $result = `ping 192.168.1.${n}`;
348             ...
349             });
350              
351             $mce->forseq({ begin => 15, end => 10, step => -1 }, sub {
352             my ($mce, $n, $chunk_id) = @_;
353             print $n, " from ", MCE->wid, "\n";
354             });
355              
356             The $n_seq variable points to an array_ref of sequences. Chunk size defaults
357             to 1 when not specified.
358              
359             $mce->forseq([ 20, 80 ], { chunk_size => 10 }, sub {
360             my ($mce, $n_seq, $chunk_id) = @_;
361             for my $n ( @{ $n_seq } ) {
362             my $result = `ping 192.168.1.${n}`;
363             ...
364             }
365             });
366              
367             =head1 OUTPUT ITERATORS WITH INPUT
368              
369             This module provides three output iterators useful for preserving output order
370             while gathering data. The chunk_id value must be the first argument to gather.
371             Gather must be called once and not more inside the block.
372              
373             =head2 gather => MCE::Candy::out_iter_array( \@array )
374              
375             The example utilizes the Core API with chunking disabled. Basically, setting
376             chunk_size to 1.
377              
378             use MCE;
379             use MCE::Candy;
380              
381             my @results;
382              
383             my $mce = MCE->new(
384             chunk_size => 1, max_workers => 4,
385             gather => MCE::Candy::out_iter_array(\@results),
386             user_func => sub {
387             my ($mce, $chunk_ref, $chunk_id) = @_;
388             $mce->gather($chunk_id, $chunk_ref->[0] * 2);
389             }
390             );
391              
392             $mce->process([ 100 .. 109 ]);
393             $mce->shutdown();
394              
395             print "@results", "\n";
396              
397             -- Output
398              
399             200 202 204 206 208 210 212 214 216 218
400              
401             Chunking may be desired for thousands or more items. In other words, wanting
402             to reduce the overhead placed on IPC.
403              
404             use MCE;
405             use MCE::Candy;
406              
407             my @results;
408              
409             my $mce = MCE->new(
410             chunk_size => 100, max_workers => 4,
411             gather => MCE::Candy::out_iter_array(\@results),
412             user_func => sub {
413             my ($mce, $chunk_ref, $chunk_id) = @_;
414             my @output;
415             foreach my $item (@{ $chunk_ref }) {
416             push @output, $item * 2;
417             }
418             $mce->gather($chunk_id, @output);
419             }
420             );
421              
422             $mce->process([ 100_000 .. 200_000 - 1 ]);
423             $mce->shutdown();
424              
425             print scalar @results, "\n";
426              
427             -- Output
428              
429             100000
430              
431             =head2 gather => MCE::Candy::out_iter_callback( \&cb_func )
432              
433             MCE workers pass arguments for the callback function. The chunk_id argument
434             to gather is used internally for calling the callback function orderly.
435              
436             Current API available since 1.897.
437              
438             use MCE;
439             use MCE::Candy;
440              
441             my @results;
442             my $max_status = 0;
443              
444             sub upd_vars {
445             push @results, @{ $_[0] };
446             $max_status = $_[1] if ($_[1] > $max_status);
447             }
448              
449             my $mce = MCE->new(
450             chunk_size => 100, max_workers => 4,
451             gather => MCE::Candy::out_iter_callback(\&upd_vars),
452             user_func => sub {
453             my ($mce, $chunk_ref, $chunk_id) = @_;
454             my @output;
455             foreach my $item (@{ $chunk_ref }) {
456             push @output, $item * 2;
457             }
458             my $status = $mce->chunk_id == 3 ? 2 : 0;
459             $mce->gather($chunk_id, [ @output ], $status);
460             }
461             );
462              
463             $mce->process([ 100_000 .. 200_000 - 1 ]);
464             $mce->shutdown();
465              
466             print scalar @results, "\n";
467             print $max_status, "\n";
468              
469             -- Output
470              
471             100000
472             2
473              
474             =head2 gather => MCE::Candy::out_iter_fh( $fh )
475              
476             Let's change things a bit and use MCE::Flow for the next 2 examples. Chunking
477             is not desired for the first example.
478              
479             use MCE::Flow;
480             use MCE::Candy;
481              
482             open my $fh, '>', '/tmp/foo.txt';
483              
484             mce_flow {
485             chunk_size => 1, max_workers => 4,
486             gather => MCE::Candy::out_iter_fh($fh)
487             },
488             sub {
489             my ($mce, $chunk_ref, $chunk_id) = @_;
490             $mce->gather($chunk_id, $chunk_ref->[0] * 2, "\n");
491              
492             }, (100 .. 109);
493              
494             close $fh;
495              
496             -- Output sent to '/tmp/foo.txt'
497              
498             200
499             202
500             204
501             206
502             208
503             210
504             212
505             214
506             216
507             218
508              
509             =head2 gather => MCE::Candy::out_iter_fh( $io )
510              
511             Same thing, an C<IO::*> object that can C<print> is supported since MCE 1.845.
512              
513             use IO::All;
514             use MCE::Flow;
515             use MCE::Candy;
516              
517             my $io = io('/tmp/foo.txt'); # i.e. $io->can('print')
518              
519             mce_flow {
520             chunk_size => 1, max_workers => 4,
521             gather => MCE::Candy::out_iter_fh($io)
522             },
523             sub {
524             my ($mce, $chunk_ref, $chunk_id) = @_;
525             $mce->gather($chunk_id, $chunk_ref->[0] * 2, "\n");
526              
527             }, (100 .. 109);
528              
529             $io->close;
530              
531             -- Output sent to '/tmp/foo.txt'
532              
533             200
534             202
535             204
536             206
537             208
538             210
539             212
540             214
541             216
542             218
543              
544             Chunking is desired for the next example due to processing many thousands.
545              
546             use MCE::Flow;
547             use MCE::Candy;
548              
549             open my $fh, '>', '/tmp/foo.txt';
550              
551             mce_flow {
552             chunk_size => 100, max_workers => 4,
553             gather => MCE::Candy::out_iter_fh( $fh )
554             },
555             sub {
556             my ($mce, $chunk_ref, $chunk_id) = @_;
557             my @output;
558             foreach my $item (@{ $chunk_ref }) {
559             push @output, ($item * 2) . "\n";
560             }
561             $mce->gather($chunk_id, @output);
562              
563             }, (100_000 .. 200_000 - 1);
564              
565             close $fh;
566              
567             print -s '/tmp/foo.txt', "\n";
568              
569             -- Output
570              
571             700000
572              
573             =head1 OUTPUT ITERATORS WITHOUT INPUT
574              
575             Input data is not a requirement for using the output iterators. The 'chunk_id'
576             argument to gather is still needed and set uniquely, same as 'wid' when not
577             processing input data.
578              
579             =head2 gather => MCE::Candy::out_iter_array( \@array )
580              
581             =head2 gather => MCE::Candy::out_iter_callback( \&cb_func )
582              
583             use MCE::Flow;
584             use MCE::Candy;
585              
586             my @results;
587              
588             sub append_results {
589             push @results, $_[0];
590             }
591              
592             mce_flow {
593             max_workers => 'auto', ## Note that 'auto' is never greater than 8
594             gather => MCE::Candy::out_iter_array(\@results),
595             # gather => MCE::Candy::out_iter_callback(\&append_results),
596             },
597             sub {
598             my ($mce) = @_; ## This line is not necessary
599             ## Calling via module okay; e.g: MCE->method
600             ## Do work
601             ## Sending a complex data structure is allowed
602              
603             ## Output will become orderly by iterator
604             $mce->gather( $mce->wid, {
605             wid => $mce->wid, result => $mce->wid * 2
606             });
607             };
608              
609             foreach my $href (@results) {
610             print $href->{wid} .": ". $href->{result} ."\n";
611             }
612              
613             -- Output
614              
615             1: 2
616             2: 4
617             3: 6
618             4: 8
619             5: 10
620             6: 12
621             7: 14
622             8: 16
623              
624             =head2 gather => MCE::Candy::out_iter_fh( $fh )
625              
626             use MCE::Flow;
627             use MCE::Candy;
628              
629             open my $fh, '>', '/tmp/out.txt';
630              
631             mce_flow {
632             max_workers => 'auto', ## See get_ncpu in <MCE::Util|MCE::Util>
633             gather => MCE::Candy::out_iter_fh($fh)
634             },
635             sub {
636             my $output = "# Worker ID: " . MCE->wid . "\n";
637              
638             ## Append results to $output string
639             $output .= (MCE->wid * 2) . "\n\n";
640              
641             ## Output will become orderly by iterator
642             MCE->gather( MCE->wid, $output );
643             };
644              
645             close $fh;
646              
647             -- Output
648              
649             # Worker ID: 1
650             2
651              
652             # Worker ID: 2
653             4
654              
655             # Worker ID: 3
656             6
657              
658             # Worker ID: 4
659             8
660              
661             # Worker ID: 5
662             10
663              
664             # Worker ID: 6
665             12
666              
667             # Worker ID: 7
668             14
669              
670             # Worker ID: 8
671             16
672              
673             =head1 INDEX
674              
675             L<MCE|MCE>, L<MCE::Core>
676              
677             =head1 AUTHOR
678              
679             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
680              
681             =cut
682