File Coverage

blib/lib/MCE/Relay.pm
Criterion Covered Total %
statement 153 186 82.2
branch 62 112 55.3
condition 7 12 58.3
subroutine 13 13 100.0
pod 0 3 0.0
total 235 326 72.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Extends Many-Core Engine with relay capabilities.
4             ##
5             ###############################################################################
6              
7             package MCE::Relay;
8              
9 10     10   1545 use strict;
  10         21  
  10         275  
10 10     10   31 use warnings;
  10         20  
  10         254  
11              
12 10     10   31 no warnings qw( threads recursion uninitialized numeric );
  10         19  
  10         453  
13              
14             our $VERSION = '1.887';
15              
16             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
17              
18             use constant {
19 10         5987 OUTPUT_W_RLA => 'W~RLA', # Worker has relayed
20             OUTPUT_R_NFY => 'R~NFY', # Relay notification
21 10     10   41 };
  10         20  
22              
23             ###############################################################################
24             ## ----------------------------------------------------------------------------
25             ## Import routine.
26             ##
27             ###############################################################################
28              
29             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
30             my $_imported;
31              
32             sub import {
33              
34 11 100   11   1032 return if ($_imported++);
35              
36 10 50       40 if ($INC{'MCE.pm'}) {
37 10         20 _mce_m_init();
38             }
39             else {
40 0         0 $\ = undef; require Carp;
  0         0  
41 0         0 Carp::croak(
42             "MCE::Relay cannot be used directly. Please consult the MCE::Relay\n".
43             "documentation for more information.\n\n"
44             );
45             }
46              
47 10         28 return;
48             }
49              
50             ###############################################################################
51             ## ----------------------------------------------------------------------------
52             ## Output routines for the manager process.
53             ##
54             ###############################################################################
55              
56             {
57             my ($_MCE, $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_rla_nextid, $_max_workers);
58              
59             my %_output_function = (
60              
61             OUTPUT_W_RLA.$LF => sub { # Worker has relayed
62              
63             $_rla_nextid = 0 if ( ++$_rla_nextid == $_max_workers );
64              
65             return;
66             },
67              
68             OUTPUT_R_NFY.$LF => sub { # Relay notification
69              
70             $_MCE->{_relayed}++;
71              
72             return;
73             },
74              
75             );
76              
77             sub _mce_m_loop_begin {
78              
79 16     16   49 ($_MCE, $_DAU_R_SOCK_REF) = @_;
80              
81 16         143 my $_caller = $_MCE->{_caller};
82              
83             $_max_workers = (exists $_MCE->{user_tasks})
84             ? $_MCE->{user_tasks}[0]{max_workers}
85 16 50       64 : $_MCE->{max_workers};
86              
87             ## Write initial relay data.
88 16 50       48 if (defined $_MCE->{init_relay}) {
89 16         35 my $_ref = ref $_MCE->{init_relay};
90              
91 16 50 100     163 MCE::_croak("MCE::Relay: (init_relay) is not valid")
      66        
92             if ($_ref ne '' && $_ref ne 'HASH' && $_ref ne 'ARRAY');
93              
94 16         36 my $_RLA_W_SOCK = $_MCE->{_rla_w_sock}->[0];
95 16         16 my $_init_relay;
96              
97 16         33 $_MCE->{_relayed} = 0;
98              
99 16 100       63 if (ref $_MCE->{init_relay} eq '') {
    100          
    50          
100 4         88 $_init_relay = $_MCE->{freeze}(\$_MCE->{init_relay}) . '0';
101             }
102             elsif (ref $_MCE->{init_relay} eq 'HASH') {
103 5         135 $_init_relay = $_MCE->{freeze}($_MCE->{init_relay}) . '1';
104             }
105             elsif (ref $_MCE->{init_relay} eq 'ARRAY') {
106 7         56 $_init_relay = $_MCE->{freeze}($_MCE->{init_relay}) . '2';
107             }
108              
109 16         36 print {$_RLA_W_SOCK} length($_init_relay) . $LF . $_init_relay;
  16         712  
110              
111 16         59 $_rla_nextid = 0;
112             }
113              
114 16         40 delete $MCE::RLA->{$_caller};
115              
116 16         106 return;
117             }
118              
119             sub _mce_m_loop_end {
120              
121             ## Obtain final relay data.
122 16 50   16   59 if (defined $_MCE->{init_relay}) {
123 16         35 my $_RLA_R_SOCK = $_MCE->{_rla_r_sock}->[$_rla_nextid];
124 16         47 my ($_caller, $_len, $_ret) = ($_MCE->{_caller});
125              
126 16         40 delete $_MCE->{_relayed};
127              
128 16 50       134 MCE::Util::_sock_ready($_RLA_R_SOCK, -1) if $^O eq 'MSWin32';
129 16         439 chomp($_len = <$_RLA_R_SOCK>);
130 16         76 read $_RLA_R_SOCK, $_ret, $_len;
131              
132 16 100       58 if (chop $_ret) {
133 12         203 $MCE::RLA->{$_caller} = $_MCE->{thaw}($_ret);
134             } else {
135 4         28 $MCE::RLA->{$_caller} = ${ $_MCE->{thaw}($_ret) };
  4         59  
136             }
137             }
138              
139             ## Clear variables.
140 16         55 $_MCE = $_DAU_R_SOCK_REF = $_DAU_R_SOCK = undef;
141 16         32 $_rla_nextid = $_max_workers = undef;
142              
143 16         50 return;
144             }
145              
146             sub _mce_m_init {
147              
148 10     10   42 MCE::_attach_plugin(
149             \%_output_function, \&_mce_m_loop_begin, \&_mce_m_loop_end
150             );
151              
152 10         12 return;
153             }
154             }
155              
156             ###############################################################################
157             ## ----------------------------------------------------------------------------
158             ## Relay methods.
159             ##
160             ###############################################################################
161              
162             ## Items below are folded into MCE.
163              
164             package # hide from rpm
165             MCE;
166              
167 10     10   69 no warnings qw( threads recursion uninitialized redefine );
  10         10  
  10         444  
168              
169 10     10   50 use Scalar::Util qw( weaken );
  10         10  
  10         11917  
170              
171             sub relay_final {
172              
173 16 50   16 0 263 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  16         68  
174              
175             _croak('MCE::relay_final: method is not allowed by the worker process')
176 16 50       66 if ($self->{_wid});
177              
178 16         96 my $_caller = caller;
179              
180 16 50       117 if (exists $MCE::RLA->{$_caller}) {
181 16 100       127 if (ref $MCE::RLA->{$_caller} eq '') {
    100          
    50          
182 4         35 return delete $MCE::RLA->{$_caller};
183             }
184             elsif (ref $MCE::RLA->{$_caller} eq 'HASH') {
185 5         15 return %{ delete $MCE::RLA->{$_caller} };
  5         100  
186             }
187             elsif (ref $MCE::RLA->{$_caller} eq 'ARRAY') {
188 7         14 return @{ delete $MCE::RLA->{$_caller} };
  7         42  
189             }
190              
191             # should not reach the following line
192 0         0 delete $MCE::RLA->{$_caller};
193             }
194              
195 0         0 return;
196             }
197              
198             sub relay_recv {
199              
200 24 50   24 0 255 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  24         57  
201              
202             _croak('MCE::relay_recv: (init_relay) is not defined')
203 24 50       60 unless (defined $self->{init_relay});
204             _croak('MCE::relay_recv: method is not allowed by the manager process')
205 24 50       67 unless ($self->{_wid});
206             _croak('MCE::relay_recv: method is not allowed by task_id > 0')
207 24 50       54 if ($self->{_task_id} > 0);
208              
209 24         34 my ($_chn, $_nxt, $_rdr, $_len, $_ref); local $_;
  24         30  
210              
211 24 50       72 local $\ = undef if (defined $\);
212 24 50       65 local $/ = $LF if ($/ ne $LF );
213              
214 24   33     141 $_chn = $self->{_chunk_id} || $self->{_wid};
215 24         51 $_chn = ($_chn - 1) % $self->{max_workers};
216 24         30 $_nxt = $_chn + 1;
217 24 100       48 $_nxt = 0 if ($_nxt == $self->{max_workers});
218 24         40 $_rdr = $self->{_rla_r_sock}->[$_chn];
219              
220 24         28 print {$self->{_dat_w_sock}->[0]} OUTPUT_W_RLA.$LF . '0'.$LF;
  24         479  
221              
222 24 50       119 MCE::Util::_sock_ready($_rdr, -1) if $^O eq 'MSWin32';
223 24         11782 chomp($_len = <$_rdr>);
224 24         150 read $_rdr, $_, $_len;
225 24         73 $_ref = chop $_;
226              
227 24 100       173 if ($_ref == 0) { ## scalar value
    100          
    50          
228 8         11 $self->{_rla_data} = ${ $self->{thaw}($_) };
  8         106  
229 8 50       27 return unless defined wantarray;
230 8         32 return $self->{_rla_data};
231             }
232             elsif ($_ref == 1) { ## hash reference
233 8         132 $self->{_rla_data} = $self->{thaw}($_);
234 8 50       26 return unless defined wantarray;
235 8         9 return %{ $self->{_rla_data} };
  8         56  
236             }
237             elsif ($_ref == 2) { ## array reference
238 8         110 $self->{_rla_data} = $self->{thaw}($_);
239 8 50       25 return unless defined wantarray;
240 8         10 return @{ $self->{_rla_data} };
  8         43  
241             }
242              
243 0         0 return;
244             }
245              
246             sub relay (;&) {
247              
248 26     26 0 490 my ($self, $_code);
249              
250 26 50       111 if (ref $_[0] eq 'CODE') {
251 0         0 ($self, $_code) = ($MCE::MCE, shift);
252             } else {
253 26 50       51 my $x = shift; $self = ref($x) ? $x : $MCE::MCE;
  26         51  
254 26         59 $_code = shift;
255             }
256              
257             _croak('MCE::relay: (init_relay) is not defined')
258 26 50       97 unless (defined $self->{init_relay});
259             _croak('MCE::relay: method is not allowed by the manager process')
260 26 50       64 unless ($self->{_wid});
261             _croak('MCE::relay: method is not allowed by task_id > 0')
262 26 50       59 if ($self->{_task_id} > 0);
263              
264 26 50       172 if (ref $_code ne 'CODE') {
265 0 0       0 _croak('MCE::relay: argument is not a code block') if (defined $_code);
266             } else {
267 26         87 weaken $_code;
268             }
269              
270 26         42 my ($_chn, $_cid, $_nxt, $_rdr, $_wtr);
271              
272 26 50       73 local $\ = undef if (defined $\);
273 26 50       88 local $/ = $LF if ($/ ne $LF );
274              
275 26   33     168 $_chn = $_cid = $self->{_chunk_id} || $self->{_wid};
276 26         53 $_chn = ($_chn - 1) % $self->{max_workers};
277 26         40 $_nxt = $_chn + 1;
278 26 100       72 $_nxt = 0 if ($_nxt == $self->{max_workers});
279 26         39 $_rdr = $self->{_rla_r_sock}->[$_chn];
280 26         44 $_wtr = $self->{_rla_w_sock}->[$_nxt];
281              
282 26 100       94 if (exists $self->{_rla_data}) {
283 24         29 my $_tmp; local $_ = delete $self->{_rla_data};
  24         45  
284 24 50       87 $_code->() if (ref $_code eq 'CODE');
285              
286 24 100       149 if (ref $_ eq '') { ## scalar value
    100          
    50          
287 8         43 $_tmp = $self->{freeze}(\$_) . '0';
288             }
289             elsif (ref $_ eq 'HASH') { ## hash reference
290 8         78 $_tmp = $self->{freeze}($_) . '1';
291             }
292             elsif (ref $_ eq 'ARRAY') { ## array reference
293 8         41 $_tmp = $self->{freeze}($_) . '2';
294             }
295              
296 24         41 print {$_wtr} length($_tmp) . $LF . $_tmp;
  24         1981  
297 24         89 print {$self->{_dat_w_sock}->[0]} OUTPUT_R_NFY.$LF . '0'.$LF;
  24         537  
298 24         152 $self->{_relayed} = $_cid;
299             }
300             else {
301 2         5 my ($_len, $_ref); local $_;
  2         5  
302 2         3 print {$self->{_dat_w_sock}->[0]} OUTPUT_W_RLA.$LF . '0'.$LF;
  2         54  
303              
304 2 50       19 MCE::Util::_sock_ready($_rdr, -1) if $^O eq 'MSWin32';
305 2         1478 chomp($_len = <$_rdr>);
306 2         17 read $_rdr, $_, $_len;
307 2         6 $_ref = chop $_;
308              
309 2 50       10 if ($_ref == 0) { ## scalar value
    0          
    0          
310 2         4 my $_ret = ${ $self->{thaw}($_) };
  2         94  
311 2 50       8 local $_ = $_ret; $_code->() if (ref $_code eq 'CODE');
  2         19  
312 2         30 my $_tmp = $self->{freeze}(\$_) . '0';
313              
314 2         9 print {$_wtr} length($_tmp) . $LF . $_tmp;
  2         59  
315 2         7 print {$self->{_dat_w_sock}->[0]} OUTPUT_R_NFY.$LF . '0'.$LF;
  2         22  
316 2         17 $self->{_relayed} = $_cid;
317              
318 2 50       19 return unless defined wantarray;
319 0         0 return $_ret;
320             }
321             elsif ($_ref == 1) { ## hash reference
322 0         0 my %_ret = %{ $self->{thaw}($_) };
  0         0  
323 0 0       0 local $_ = { %_ret }; $_code->() if (ref $_code eq 'CODE');
  0         0  
324 0         0 my $_tmp = $self->{freeze}($_) . '1';
325              
326 0         0 print {$_wtr} length($_tmp) . $LF . $_tmp;
  0         0  
327 0         0 print {$self->{_dat_w_sock}->[0]} OUTPUT_R_NFY.$LF . '0'.$LF;
  0         0  
328 0         0 $self->{_relayed} = $_cid;
329              
330 0 0       0 return unless defined wantarray;
331 0         0 return %_ret;
332             }
333             elsif ($_ref == 2) { ## array reference
334 0         0 my @_ret = @{ $self->{thaw}($_) };
  0         0  
335 0 0       0 local $_ = [ @_ret ]; $_code->() if (ref $_code eq 'CODE');
  0         0  
336 0         0 my $_tmp = $self->{freeze}($_) . '2';
337              
338 0         0 print {$_wtr} length($_tmp) . $LF . $_tmp;
  0         0  
339 0         0 print {$self->{_dat_w_sock}->[0]} OUTPUT_R_NFY.$LF . '0'.$LF;
  0         0  
340 0         0 $self->{_relayed} = $_cid;
341              
342 0 0       0 return unless defined wantarray;
343 0         0 return @_ret;
344             }
345             }
346              
347 24         221 return;
348             }
349              
350             ## Aliases.
351              
352             *relay_lock = \&relay_recv;
353             *relay_unlock = \&relay;
354              
355             1;
356              
357             __END__
358              
359             ###############################################################################
360             ## ----------------------------------------------------------------------------
361             ## Module usage.
362             ##
363             ###############################################################################
364              
365             =head1 NAME
366              
367             MCE::Relay - Extends Many-Core Engine with relay capabilities
368              
369             =head1 VERSION
370              
371             This document describes MCE::Relay version 1.887
372              
373             =head1 SYNOPSIS
374              
375             use MCE::Flow;
376              
377             my $file = shift || \*STDIN;
378              
379             ## Line Count #######################################
380              
381             mce_flow_f {
382             max_workers => 4,
383             use_slurpio => 1,
384             init_relay => 0,
385             },
386             sub {
387             my ($mce, $slurp_ref, $chunk_id) = @_;
388             my $line_count = ($$slurp_ref =~ tr/\n//);
389              
390             ## Receive and pass on updated information.
391             my $lines_read = MCE::relay { $_ += $line_count };
392              
393             }, $file;
394              
395             my $total_lines = MCE->relay_final;
396              
397             print {*STDERR} "$total_lines\n";
398              
399             ## Orderly Action ###################################
400              
401             $| = 1; # Important, must flush output immediately.
402              
403             mce_flow_f {
404             max_workers => 2,
405             use_slurpio => 1,
406             init_relay => 0,
407             },
408             sub {
409             my ($mce, $slurp_ref, $chunk_id) = @_;
410              
411             ## The relay value is relayed and remains 0.
412             ## Writes to STDOUT orderly.
413              
414             MCE->relay_lock;
415             print $$slurp_ref;
416             MCE->relay_unlock;
417              
418             }, $file;
419              
420             =head1 DESCRIPTION
421              
422             This module enables workers to receive and pass on information orderly with
423             zero involvement by the manager process while running. The module is loaded
424             automatically when MCE option C<init_relay> is specified.
425              
426             All workers (belonging to task_id 0) must participate when relaying data.
427              
428             Relaying is not meant for passing big data. The last worker will stall if
429             exceeding the buffer size for the socket. Not exceeding 16 KiB - 7 is safe
430             across all platforms.
431              
432             =head1 API DOCUMENTATION
433              
434             =over 3
435              
436             =item MCE::relay { code }
437              
438             =item mce_relay { code } since 1.882
439              
440             =item MCE->relay ( sub { code } )
441              
442             =item $mce->relay ( sub { code } )
443              
444             =back
445              
446             Relay is enabled by defining the init_relay option which takes a hash or array
447             reference, or a scalar value. Relaying is orderly and driven by chunk_id when
448             processing data, otherwise task_wid. Omitting the code block (e.g. MCE::relay)
449             relays forward.
450              
451             Below, relaying multiple values via a HASH reference.
452              
453             use MCE::Flow max_workers => 4;
454              
455             mce_flow {
456             init_relay => { p => 0, e => 0 },
457             },
458             sub {
459             my $wid = MCE->wid;
460             my $pass = $wid % 3; # simulate work
461             my $errs = $wid % 2;
462              
463             ## relay (include the trailing semicolon)
464              
465             my %last_rpt = MCE::relay { $_->{p} += $pass; $_->{e} += $errs };
466              
467             MCE->print("$wid: passed $pass, errors $errs\n");
468              
469             return;
470             };
471              
472             my %results = MCE->relay_final;
473              
474             print " passed $results{p}, errors $results{e} final\n\n";
475              
476             -- Output
477              
478             1: passed 1, errors 1
479             2: passed 2, errors 0
480             3: passed 0, errors 1
481             4: passed 1, errors 0
482             passed 4, errors 2 final
483              
484             Or multiple values via an ARRAY reference.
485              
486             use MCE::Flow max_workers => 4;
487              
488             mce_flow {
489             init_relay => [ 0, 0 ],
490             },
491             sub {
492             my $wid = MCE->wid;
493              
494             ## do work
495             my $pass = $wid % 3;
496             my $errs = $wid % 2;
497              
498             ## relay
499             my @last_rpt = MCE::relay { $_->[0] += $pass; $_->[1] += $errs };
500              
501             MCE->print("$wid: passed $pass, errors $errs\n");
502              
503             return;
504             };
505              
506             my ($pass, $errs) = MCE->relay_final;
507              
508             print " passed $pass, errors $errs final\n\n";
509              
510             -- Output
511              
512             1: passed 1, errors 1
513             2: passed 2, errors 0
514             3: passed 0, errors 1
515             4: passed 1, errors 0
516             passed 4, errors 2 final
517              
518             Or simply a scalar value.
519              
520             use MCE::Flow max_workers => 4;
521              
522             mce_flow {
523             init_relay => 0,
524             },
525             sub {
526             my $wid = MCE->wid;
527              
528             ## do work
529             my $bytes_read = 1000 + ((MCE->wid % 3) * 3);
530              
531             ## relay
532             my $last_offset = MCE::relay { $_ += $bytes_read };
533              
534             ## output
535             MCE->print("$wid: $bytes_read\n");
536              
537             return;
538             };
539              
540             my $total = MCE->relay_final;
541              
542             print " $total size\n\n";
543              
544             -- Output
545              
546             1: 1003
547             2: 1006
548             3: 1000
549             4: 1003
550             4012 size
551              
552             =over 3
553              
554             =item MCE->relay_final ( void )
555              
556             =item $mce->relay_final ( void )
557              
558             =back
559              
560             Call this method to obtain the final relay value(s) after running. See included
561             example findnull.pl for another use case.
562              
563             use MCE max_workers => 4;
564              
565             my $mce = MCE->new(
566             init_relay => [ 0, 100 ], ## initial values (two counters)
567              
568             user_func => sub {
569             my ($mce) = @_;
570              
571             ## do work
572             my ($acc1, $acc2) = (10, 20);
573              
574             ## relay to next worker
575             MCE::relay { $_->[0] += $acc1; $_->[1] += $acc2 };
576              
577             return;
578             }
579             )->run;
580              
581             my ($cnt1, $cnt2) = $mce->relay_final;
582              
583             print "$cnt1 : $cnt2\n";
584              
585             -- Output
586              
587             40 : 180
588              
589             =over 3
590              
591             =item MCE->relay_recv ( void )
592              
593             =item $mce->relay_recv ( void )
594              
595             =back
596              
597             Call this method to obtain the next relay value before relaying. This allows
598             serial-code to be processed orderly between workers. The following is a parallel
599             demonstration for the fasta-benchmark on the web.
600              
601             # perl fasta.pl 25000000
602              
603             # The Computer Language Benchmarks game
604             # https://benchmarksgame-team.pages.debian.net/benchmarksgame/
605             #
606             # contributed by Barry Walsh
607             # port of fasta.rb #6
608             #
609             # MCE::Flow version by Mario Roy
610             # requires MCE 1.807+
611             # requires MCE::Shared 1.806+
612              
613             use strict;
614             use warnings;
615             use feature 'say';
616              
617             use MCE::Flow;
618             use MCE::Shared;
619             use MCE::Candy;
620              
621             use constant IM => 139968;
622             use constant IA => 3877;
623             use constant IC => 29573;
624              
625             my $LAST = MCE::Shared->scalar( 42 );
626              
627             my $alu =
628             'GGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTTGG' .
629             'GAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTCGAGA' .
630             'CCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACTAAAAAT' .
631             'ACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTGTAATCCCA' .
632             'GCTACTCGGGAGGCTGAGGCAGGAGAATCGCTTGAACCCGGG' .
633             'AGGCGGAGGTTGCAGTGAGCCGAGATCGCGCCACTGCACTCC' .
634             'AGCCTGGGCGACAGAGCGAGACTCCGTCTCAAAAA';
635              
636             my $iub = [
637             [ 'a', 0.27 ], [ 'c', 0.12 ], [ 'g', 0.12 ],
638             [ 't', 0.27 ], [ 'B', 0.02 ], [ 'D', 0.02 ],
639             [ 'H', 0.02 ], [ 'K', 0.02 ], [ 'M', 0.02 ],
640             [ 'N', 0.02 ], [ 'R', 0.02 ], [ 'S', 0.02 ],
641             [ 'V', 0.02 ], [ 'W', 0.02 ], [ 'Y', 0.02 ]
642             ];
643              
644             my $homosapiens = [
645             [ 'a', 0.3029549426680 ],
646             [ 'c', 0.1979883004921 ],
647             [ 'g', 0.1975473066391 ],
648             [ 't', 0.3015094502008 ]
649             ];
650              
651             sub make_repeat_fasta {
652             my ( $src, $n ) = @_;
653             my $width = qr/(.{1,60})/;
654             my $l = length $src;
655             my $s = $src x ( ($n / $l) + 1 );
656             substr( $s, $n, $l ) = '';
657              
658             while ( $s =~ m/$width/g ) { say $1 }
659             }
660              
661             sub make_random_fasta {
662             my ( $table, $n ) = @_;
663             my $rand = undef;
664             my $width = 60;
665             my $prob = 0.0;
666             my $output = '';
667             my ( $c1, $c2, $last );
668              
669             $_->[1] = ( $prob += $_->[1] ) for @$table;
670              
671             $c1 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
672             $c1 .= "\$output .= '$_->[0]', next if $_->[1] > \$rand;\n" for @$table;
673              
674             my $seq = MCE::Shared->sequence(
675             { chunk_size => 2000, bounds_only => 1 },
676             1, $n / $width
677             );
678              
679             my $code1 = q{
680             while ( 1 ) {
681             # --------------------------------------------
682             # Process code orderly between workers.
683             # --------------------------------------------
684              
685             my $chunk_id = MCE->relay_recv;
686             my ( $begin, $end ) = $seq->next;
687              
688             MCE->relay, last if ( !defined $begin );
689              
690             my $last = $LAST->get;
691             my $temp = $last;
692              
693             # Pre-compute $LAST value for the next worker
694             for ( 1 .. ( $end - $begin + 1 ) * $width ) {
695             $temp = ( $temp * IA + IC ) % IM;
696             }
697              
698             $LAST->set( $temp );
699              
700             # Increment chunk_id value
701             MCE->relay( sub { $_ += 1 } );
702              
703             # --------------------------------------------
704             # Also run code in parallel between workers.
705             # --------------------------------------------
706              
707             for ( $begin .. $end ) {
708             for ( 1 .. $width ) { !C! }
709             $output .= "\n";
710             }
711              
712             # --------------------------------------------
713             # Display orderly.
714             # --------------------------------------------
715              
716             MCE->gather( $chunk_id, $output );
717              
718             $output = '';
719             }
720             };
721              
722             $code1 =~ s/!C!/$c1/g;
723              
724             MCE::Flow->init(
725             max_workers => 4, ## MCE::Util->get_ncpu || 4,
726             gather => MCE::Candy::out_iter_fh( \*STDOUT ),
727             init_relay => 1,
728             use_threads => 0,
729             );
730              
731             MCE::Flow->run( sub { eval $code1 } );
732             MCE::Flow->finish;
733              
734             $last = $LAST->get;
735              
736             $c2 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
737             $c2 .= "print('$_->[0]'), next if $_->[1] > \$rand;\n" for @$table;
738              
739             my $code2 = q{
740             if ( $n % $width != 0 ) {
741             for ( 1 .. $n % $width ) { !C! }
742             print "\n";
743             }
744             };
745              
746             $code2 =~ s/!C!/$c2/g;
747             eval $code2;
748              
749             $LAST->set( $last );
750             }
751              
752             my $n = $ARGV[0] || 27;
753              
754             say ">ONE Homo sapiens alu";
755             make_repeat_fasta( $alu, $n * 2 );
756              
757             say ">TWO IUB ambiguity codes";
758             make_random_fasta( $iub, $n * 3 );
759              
760             say ">THREE Homo sapiens frequency";
761             make_random_fasta( $homosapiens, $n * 5 );
762              
763             =over 3
764              
765             =item MCE->relay_lock ( void )
766              
767             =item MCE->relay_unlock ( void )
768              
769             =item $mce->relay_lock ( void )
770              
771             =item $mce->relay_unlock ( void )
772              
773             =back
774              
775             The C<relay_lock> and C<relay_unlock> methods, added to MCE 1.807, are
776             aliases for C<relay_recv> and C<relay> respectively. Together, they allow
777             one to perform an exclusive action prior to actual relaying of data.
778              
779             Relaying is driven by C<chunk_id> or C<task_wid> when not processing input,
780             as seen here.
781              
782             MCE->new(
783             max_workers => 8,
784             init_relay => 0,
785             user_func => sub {
786             MCE->relay_lock;
787             MCE->say("wid: ", MCE->task_wid);
788             MCE->relay_unlock( sub {
789             $_ += 2;
790             });
791             }
792             )->run;
793              
794             MCE->say("sum: ", MCE->relay_final);
795              
796             __END__
797              
798             wid: 1
799             wid: 2
800             wid: 3
801             wid: 4
802             wid: 5
803             wid: 6
804             wid: 7
805             wid: 8
806             sum: 16
807              
808             Described above, C<relay> takes a code block and combines C<relay_lock> and
809             C<relay_unlock> into a single call. To make this more interesting, I define
810             C<init_relay> to a hash containing two key-value pairs.
811              
812             MCE->new(
813             max_workers => 8,
814             init_relay => { count => 0, total => 0 },
815             user_func => sub {
816             MCE->relay_lock;
817             MCE->say("wid: ", MCE->task_wid);
818             MCE->relay_unlock( sub {
819             $_->{count} += 1;
820             $_->{total} += 2;
821             });
822             }
823             )->run;
824              
825             my %results = MCE->relay_final;
826              
827             MCE->say("count: ", $results{count});
828             MCE->say("total: ", $results{total});
829              
830             __END__
831              
832             wid: 1
833             wid: 2
834             wid: 3
835             wid: 4
836             wid: 5
837             wid: 6
838             wid: 7
839             wid: 8
840             count: 8
841             total: 16
842              
843             Below, C<user_func> is taken from the C<cat.pl> MCE example. Incrementing
844             the count is done only when the C<-n> switch is passed to the script.
845             Otherwise, output is displaced orderly and not necessary to update the
846             C<$_> value if exclusive locking is all you need.
847              
848             user_func => sub {
849             my ($mce, $chunk_ref, $chunk_id) = @_;
850              
851             if ($n_flag) {
852             ## Relays the total lines read.
853              
854             my $output = ''; my $line_count = ($$chunk_ref =~ tr/\n//);
855             my $lines_read = MCE::relay { $_ += $line_count };
856              
857             open my $fh, '<', $chunk_ref;
858             $output .= sprintf "%6d\t%s", ++$lines_read, $_ while (<$fh>);
859             close $fh;
860              
861             $output .= ":$chunk_id";
862             MCE->do('display_chunk', $output);
863             }
864             else {
865             ## The following is another way to have ordered output. Workers
866             ## write directly to STDOUT exclusively without any involvement
867             ## from the manager process. The statement(s) between relay_lock
868             ## and relay_unlock run serially and most important orderly.
869              
870             MCE->relay_lock; # alias for MCE->relay_recv
871             print $$chunk_ref; # ensure $| = 1 in script
872             MCE->relay_unlock; # alias for MCE->relay
873             }
874              
875             return;
876             }
877              
878             The following is a variant of the fasta-benchmark demonstration shown above.
879             Here, workers write exclusively and orderly to C<STDOUT>.
880              
881             # perl fasta.pl 25000000
882              
883             # The Computer Language Benchmarks game
884             # https://benchmarksgame-team.pages.debian.net/benchmarksgame/
885             #
886             # contributed by Barry Walsh
887             # port of fasta.rb #6
888             #
889             # MCE::Flow version by Mario Roy
890             # requires MCE 1.807+
891             # requires MCE::Shared 1.806+
892              
893             use strict;
894             use warnings;
895             use feature 'say';
896              
897             use MCE::Flow;
898             use MCE::Shared;
899              
900             use constant IM => 139968;
901             use constant IA => 3877;
902             use constant IC => 29573;
903              
904             my $LAST = MCE::Shared->scalar( 42 );
905              
906             my $alu =
907             'GGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTTGG' .
908             'GAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTCGAGA' .
909             'CCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACTAAAAAT' .
910             'ACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTGTAATCCCA' .
911             'GCTACTCGGGAGGCTGAGGCAGGAGAATCGCTTGAACCCGGG' .
912             'AGGCGGAGGTTGCAGTGAGCCGAGATCGCGCCACTGCACTCC' .
913             'AGCCTGGGCGACAGAGCGAGACTCCGTCTCAAAAA';
914              
915             my $iub = [
916             [ 'a', 0.27 ], [ 'c', 0.12 ], [ 'g', 0.12 ],
917             [ 't', 0.27 ], [ 'B', 0.02 ], [ 'D', 0.02 ],
918             [ 'H', 0.02 ], [ 'K', 0.02 ], [ 'M', 0.02 ],
919             [ 'N', 0.02 ], [ 'R', 0.02 ], [ 'S', 0.02 ],
920             [ 'V', 0.02 ], [ 'W', 0.02 ], [ 'Y', 0.02 ]
921             ];
922              
923             my $homosapiens = [
924             [ 'a', 0.3029549426680 ],
925             [ 'c', 0.1979883004921 ],
926             [ 'g', 0.1975473066391 ],
927             [ 't', 0.3015094502008 ]
928             ];
929              
930             sub make_repeat_fasta {
931             my ( $src, $n ) = @_;
932             my $width = qr/(.{1,60})/;
933             my $l = length $src;
934             my $s = $src x ( ($n / $l) + 1 );
935             substr( $s, $n, $l ) = '';
936              
937             while ( $s =~ m/$width/g ) { say $1 }
938             }
939              
940             sub make_random_fasta {
941             my ( $table, $n ) = @_;
942             my $rand = undef;
943             my $width = 60;
944             my $prob = 0.0;
945             my $output = '';
946             my ( $c1, $c2, $last );
947              
948             $_->[1] = ( $prob += $_->[1] ) for @$table;
949              
950             $c1 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
951             $c1 .= "\$output .= '$_->[0]', next if $_->[1] > \$rand;\n" for @$table;
952              
953             my $seq = MCE::Shared->sequence(
954             { chunk_size => 2000, bounds_only => 1 },
955             1, $n / $width
956             );
957              
958             my $code1 = q{
959             $| = 1; # Important, must flush output immediately.
960              
961             while ( 1 ) {
962             # --------------------------------------------
963             # Process code orderly between workers.
964             # --------------------------------------------
965              
966             MCE->relay_lock;
967              
968             my ( $begin, $end ) = $seq->next;
969             print( $output ), $output = '' if ( length $output );
970              
971             MCE->relay_unlock, last if ( !defined $begin );
972              
973             my $last = $LAST->get;
974             my $temp = $last;
975              
976             # Pre-compute $LAST value for the next worker
977             for ( 1 .. ( $end - $begin + 1 ) * $width ) {
978             $temp = ( $temp * IA + IC ) % IM;
979             }
980              
981             $LAST->set( $temp );
982              
983             MCE->relay_unlock;
984              
985             # --------------------------------------------
986             # Also run code in parallel.
987             # --------------------------------------------
988              
989             for ( $begin .. $end ) {
990             for ( 1 .. $width ) { !C! }
991             $output .= "\n";
992             }
993             }
994             };
995              
996             $code1 =~ s/!C!/$c1/g;
997              
998             MCE::Flow->init(
999             max_workers => 4, ## MCE::Util->get_ncpu || 4,
1000             init_relay => 0,
1001             use_threads => 0,
1002             );
1003              
1004             MCE::Flow->run( sub { eval $code1 } );
1005             MCE::Flow->finish;
1006              
1007             $last = $LAST->get;
1008              
1009             $c2 = '$rand = ( $last = ( $last * IA + IC ) % IM ) / IM;';
1010             $c2 .= "print('$_->[0]'), next if $_->[1] > \$rand;\n" for @$table;
1011              
1012             my $code2 = q{
1013             if ( $n % $width != 0 ) {
1014             for ( 1 .. $n % $width ) { !C! }
1015             print "\n";
1016             }
1017             };
1018              
1019             $code2 =~ s/!C!/$c2/g;
1020             eval $code2;
1021              
1022             $LAST->set( $last );
1023             }
1024              
1025             my $n = $ARGV[0] || 27;
1026              
1027             say ">ONE Homo sapiens alu";
1028             make_repeat_fasta( $alu, $n * 2 );
1029              
1030             say ">TWO IUB ambiguity codes";
1031             make_random_fasta( $iub, $n * 3 );
1032              
1033             say ">THREE Homo sapiens frequency";
1034             make_random_fasta( $homosapiens, $n * 5 );
1035              
1036             =head1 GATHER AND RELAY DEMONSTRATIONS
1037              
1038             I received a request from John Martel to process a large flat file and expand
1039             each record to many records based on splitting out items in field 4 delimited
1040             by semicolons. Each row in the output is given a unique ID starting with one
1041             while preserving output order.
1042              
1043             =over 3
1044              
1045             =item Input File, possibly larger than 500 GiB in size
1046              
1047             foo|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
1048             bar|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
1049             baz|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
1050             ...
1051              
1052             =item Output File
1053              
1054             000000000000001|item1|foo|field2|field3|field5|field6|field7
1055             000000000000002|item2|foo|field2|field3|field5|field6|field7
1056             000000000000003|item3|foo|field2|field3|field5|field6|field7
1057             000000000000004|item4|foo|field2|field3|field5|field6|field7
1058             000000000000005|itemN|foo|field2|field3|field5|field6|field7
1059             000000000000006|item1|bar|field2|field3|field5|field6|field7
1060             000000000000007|item2|bar|field2|field3|field5|field6|field7
1061             000000000000008|item3|bar|field2|field3|field5|field6|field7
1062             000000000000009|item4|bar|field2|field3|field5|field6|field7
1063             000000000000010|itemN|bar|field2|field3|field5|field6|field7
1064             000000000000011|item1|baz|field2|field3|field5|field6|field7
1065             000000000000012|item2|baz|field2|field3|field5|field6|field7
1066             000000000000013|item3|baz|field2|field3|field5|field6|field7
1067             000000000000014|item4|baz|field2|field3|field5|field6|field7
1068             000000000000015|itemN|baz|field2|field3|field5|field6|field7
1069             ...
1070              
1071             =item Example One
1072              
1073             =back
1074              
1075             This example configures a custom function for preserving output order.
1076             Unfortunately, the sprintf function alone involves extra CPU time causing
1077             the manager process to fall behind. Thus, workers may idle while waiting
1078             for the manager process to respond to the gather request.
1079              
1080             use strict;
1081             use warnings;
1082              
1083             use MCE::Loop;
1084              
1085             my $infile = shift or die "Usage: $0 infile\n";
1086             my $newfile = 'output.dat';
1087              
1088             open my $fh_out, '>', $newfile or die "open error $newfile: $!\n";
1089              
1090             sub preserve_order {
1091             my ($fh) = @_;
1092             my ($order_id, $start_idx, $idx, %tmp) = (1, 1);
1093              
1094             return sub {
1095             my ($chunk_id, $aref) = @_;
1096             $tmp{ $chunk_id } = $aref;
1097              
1098             while ( my $aref = delete $tmp{ $order_id } ) {
1099             foreach my $line ( @{ $aref } ) {
1100             $idx = sprintf "%015d", $start_idx++;
1101             print $fh $idx, $line;
1102             }
1103             $order_id++;
1104             }
1105             }
1106             }
1107              
1108             MCE::Loop->init(
1109             chunk_size => 'auto', max_workers => 3,
1110             gather => preserve_order($fh_out)
1111             );
1112              
1113             mce_loop_f {
1114             my ($mce, $chunk_ref, $chunk_id) = @_;
1115             my @buf;
1116              
1117             foreach my $line (@{ $chunk_ref }) {
1118             $line =~ s/\r//g; chomp $line;
1119              
1120             my ($f1,$f2,$f3,$items,$f5,$f6,$f7) = split /\|/, $line;
1121             my @items_array = split /;/, $items;
1122              
1123             foreach my $item (@items_array) {
1124             push @buf, "|$item|$f1|$f2|$f3|$f5|$f6|$f7\n";
1125             }
1126             }
1127              
1128             MCE->gather($chunk_id, \@buf);
1129              
1130             } $infile;
1131              
1132             MCE::Loop->finish();
1133             close $fh_out;
1134              
1135             =over 3
1136              
1137             =item Example Two
1138              
1139             =back
1140              
1141             In this example, workers obtain the current ID value and increment/relay for
1142             the next worker, ordered by chunk ID behind the scene. Workers call sprintf
1143             in parallel, allowing the manager process (out_iter_fh) to accommodate up to
1144             32 workers and not fall behind.
1145              
1146             Relay accounts for the worker handling the next chunk_id value. Therefore, do
1147             not call relay more than once per chunk. Doing so will cause IPC to stall.
1148              
1149             use strict;
1150             use warnings;
1151              
1152             use MCE::Loop;
1153             use MCE::Candy;
1154              
1155             my $infile = shift or die "Usage: $0 infile\n";
1156             my $newfile = 'output.dat';
1157              
1158             open my $fh_out, '>', $newfile or die "open error $newfile: $!\n";
1159              
1160             MCE::Loop->init(
1161             chunk_size => 'auto', max_workers => 8,
1162             gather => MCE::Candy::out_iter_fh($fh_out),
1163             init_relay => 1
1164             );
1165              
1166             mce_loop_f {
1167             my ($mce, $chunk_ref, $chunk_id) = @_;
1168             my @lines;
1169              
1170             foreach my $line (@{ $chunk_ref }) {
1171             $line =~ s/\r//g; chomp $line;
1172              
1173             my ($f1,$f2,$f3,$items,$f5,$f6,$f7) = split /\|/, $line;
1174             my @items_array = split /;/, $items;
1175              
1176             foreach my $item (@items_array) {
1177             push @lines, "$item|$f1|$f2|$f3|$f5|$f6|$f7\n";
1178             }
1179             }
1180              
1181             my $idx = MCE::relay { $_ += scalar @lines };
1182             my $buf = '';
1183              
1184             foreach my $line ( @lines ) {
1185             $buf .= sprintf "%015d|%s", $idx++, $line
1186             }
1187              
1188             MCE->gather($chunk_id, $buf);
1189              
1190             } $infile;
1191              
1192             MCE::Loop->finish();
1193             close $fh_out;
1194              
1195             =head1 INDEX
1196              
1197             L<MCE|MCE>, L<MCE::Core>
1198              
1199             =head1 AUTHOR
1200              
1201             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
1202              
1203             =cut
1204