File Coverage

blib/lib/MCE/Relay.pm
Criterion Covered Total %
statement 153 186 82.2
branch 62 112 55.3
condition 7 12 58.3
subroutine 13 13 100.0
pod 0 3 0.0
total 235 326 72.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Extends Many-Core Engine with relay capabilities.
4             ##
5             ###############################################################################
6              
7             package MCE::Relay;
8              
9 10     10   1165 use strict;
  10         11  
  10         318  
10 10     10   50 use warnings;
  10         20  
  10         301  
11              
12 10     10   124 no warnings qw( threads recursion uninitialized numeric );
  10         20  
  10         1130  
13              
14             our $VERSION = '1.888';
15              
16             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
17              
18             use constant {
19 10         8676 OUTPUT_W_RLA => 'W~RLA', # Worker has relayed
20             OUTPUT_R_NFY => 'R~NFY', # Relay notification
21 10     10   79 };
  10         10  
22              
23             ###############################################################################
24             ## ----------------------------------------------------------------------------
25             ## Import routine.
26             ##
27             ###############################################################################
28              
29             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
30             my $_imported;
31              
32             sub import {
33              
34 11 100   11   1289 return if ($_imported++);
35              
36 10 50       39 if ($INC{'MCE.pm'}) {
37 10         29 _mce_m_init();
38             }
39             else {
40 0         0 $\ = undef; require Carp;
  0         0  
41 0         0 Carp::croak(
42             "MCE::Relay cannot be used directly. Please consult the MCE::Relay\n".
43             "documentation for more information.\n\n"
44             );
45             }
46              
47 10         29 return;
48             }
49              
50             ###############################################################################
51             ## ----------------------------------------------------------------------------
52             ## Output routines for the manager process.
53             ##
54             ###############################################################################
55              
56             {
57             my ($_MCE, $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_rla_nextid, $_max_workers);
58              
59             my %_output_function = (
60              
61             OUTPUT_W_RLA.$LF => sub { # Worker has relayed
62              
63             $_rla_nextid = 0 if ( ++$_rla_nextid == $_max_workers );
64              
65             return;
66             },
67              
68             OUTPUT_R_NFY.$LF => sub { # Relay notification
69              
70             $_MCE->{_relayed}++;
71              
72             return;
73             },
74              
75             );
76              
77             sub _mce_m_loop_begin {
78              
79 16     16   112 ($_MCE, $_DAU_R_SOCK_REF) = @_;
80              
81 16         184 my $_caller = $_MCE->{_caller};
82              
83             $_max_workers = (exists $_MCE->{user_tasks})
84             ? $_MCE->{user_tasks}[0]{max_workers}
85 16 50       76 : $_MCE->{max_workers};
86              
87             ## Write initial relay data.
88 16 50       50 if (defined $_MCE->{init_relay}) {
89 16         46 my $_ref = ref $_MCE->{init_relay};
90              
91 16 50 100     239 MCE::_croak("MCE::Relay: (init_relay) is not valid")
      66        
92             if ($_ref ne '' && $_ref ne 'HASH' && $_ref ne 'ARRAY');
93              
94 16         40 my $_RLA_W_SOCK = $_MCE->{_rla_w_sock}->[0];
95 16         109 my $_init_relay;
96              
97 16         52 $_MCE->{_relayed} = 0;
98              
99 16 100       132 if (ref $_MCE->{init_relay} eq '') {
    100          
    50          
100 4         119 $_init_relay = $_MCE->{freeze}(\$_MCE->{init_relay}) . '0';
101             }
102             elsif (ref $_MCE->{init_relay} eq 'HASH') {
103 5         145 $_init_relay = $_MCE->{freeze}($_MCE->{init_relay}) . '1';
104             }
105             elsif (ref $_MCE->{init_relay} eq 'ARRAY') {
106 7         126 $_init_relay = $_MCE->{freeze}($_MCE->{init_relay}) . '2';
107             }
108              
109 16         70 print {$_RLA_W_SOCK} length($_init_relay) . $LF . $_init_relay;
  16         722  
110              
111 16         71 $_rla_nextid = 0;
112             }
113              
114 16         42 delete $MCE::RLA->{$_caller};
115              
116 16         80 return;
117             }
118              
119             sub _mce_m_loop_end {
120              
121             ## Obtain final relay data.
122 16 50   16   51 if (defined $_MCE->{init_relay}) {
123 16         52 my $_RLA_R_SOCK = $_MCE->{_rla_r_sock}->[$_rla_nextid];
124 16         56 my ($_caller, $_len, $_ret) = ($_MCE->{_caller});
125              
126 16         48 delete $_MCE->{_relayed};
127              
128 16 50       97 MCE::Util::_sock_ready($_RLA_R_SOCK, -1) if $^O eq 'MSWin32';
129 16         377 chomp($_len = <$_RLA_R_SOCK>);
130 16         187 read $_RLA_R_SOCK, $_ret, $_len;
131              
132 16 100       80 if (chop $_ret) {
133 12         260 $MCE::RLA->{$_caller} = $_MCE->{thaw}($_ret);
134             } else {
135 4         33 $MCE::RLA->{$_caller} = ${ $_MCE->{thaw}($_ret) };
  4         88  
136             }
137             }
138              
139             ## Clear variables.
140 16         65 $_MCE = $_DAU_R_SOCK_REF = $_DAU_R_SOCK = undef;
141 16         40 $_rla_nextid = $_max_workers = undef;
142              
143 16         65 return;
144             }
145              
146             sub _mce_m_init {
147              
148 10     10   58 MCE::_attach_plugin(
149             \%_output_function, \&_mce_m_loop_begin, \&_mce_m_loop_end
150             );
151              
152 10         20 return;
153             }
154             }
155              
156             ###############################################################################
157             ## ----------------------------------------------------------------------------
158             ## Relay methods.
159             ##
160             ###############################################################################
161              
162             ## Items below are folded into MCE.
163              
164             package # hide from rpm
165             MCE;
166              
167 10     10   79 no warnings qw( threads recursion uninitialized redefine );
  10         20  
  10         383  
168              
169 10     10   52 use Scalar::Util qw( weaken );
  10         19  
  10         14874  
170              
171             sub relay_final {
172              
173 16 50   16 0 301 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  16         171  
174              
175             _croak('MCE::relay_final: method is not allowed by the worker process')
176 16 50       96 if ($self->{_wid});
177              
178 16         88 my $_caller = caller;
179              
180 16 50       86 if (exists $MCE::RLA->{$_caller}) {
181 16 100       799 if (ref $MCE::RLA->{$_caller} eq '') {
    100          
    50          
182 4         59 return delete $MCE::RLA->{$_caller};
183             }
184             elsif (ref $MCE::RLA->{$_caller} eq 'HASH') {
185 5         30 return %{ delete $MCE::RLA->{$_caller} };
  5         100  
186             }
187             elsif (ref $MCE::RLA->{$_caller} eq 'ARRAY') {
188 7         35 return @{ delete $MCE::RLA->{$_caller} };
  7         56  
189             }
190              
191             # should not reach the following line
192 0         0 delete $MCE::RLA->{$_caller};
193             }
194              
195 0         0 return;
196             }
197              
198             sub relay_recv {
199              
200 24 50   24 0 302 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  24         70  
201              
202             _croak('MCE::relay_recv: (init_relay) is not defined')
203 24 50       74 unless (defined $self->{init_relay});
204             _croak('MCE::relay_recv: method is not allowed by the manager process')
205 24 50       57 unless ($self->{_wid});
206             _croak('MCE::relay_recv: method is not allowed by task_id > 0')
207 24 50       64 if ($self->{_task_id} > 0);
208              
209 24         51 my ($_chn, $_nxt, $_rdr, $_len, $_ref); local $_;
  24         38  
210              
211 24 50       77 local $\ = undef if (defined $\);
212 24 50       75 local $/ = $LF if ($/ ne $LF );
213              
214 24   33     152 $_chn = $self->{_chunk_id} || $self->{_wid};
215 24         56 $_chn = ($_chn - 1) % $self->{max_workers};
216 24         37 $_nxt = $_chn + 1;
217 24 100       52 $_nxt = 0 if ($_nxt == $self->{max_workers});
218 24         53 $_rdr = $self->{_rla_r_sock}->[$_chn];
219              
220 24         35 print {$self->{_dat_w_sock}->[0]} OUTPUT_W_RLA.$LF . '0'.$LF;
  24         797  
221              
222 24 50       140 MCE::Util::_sock_ready($_rdr, -1) if $^O eq 'MSWin32';
223 24         10625 chomp($_len = <$_rdr>);
224 24         219 read $_rdr, $_, $_len;
225 24         78 $_ref = chop $_;
226              
227 24 100       222 if ($_ref == 0) { ## scalar value
    100          
    50          
228 8         16 $self->{_rla_data} = ${ $self->{thaw}($_) };
  8         130  
229 8 50       44 return unless defined wantarray;
230 8         43 return $self->{_rla_data};
231             }
232             elsif ($_ref == 1) { ## hash reference
233 8         166 $self->{_rla_data} = $self->{thaw}($_);
234 8 50       35 return unless defined wantarray;
235 8         15 return %{ $self->{_rla_data} };
  8         84  
236             }
237             elsif ($_ref == 2) { ## array reference
238 8         184 $self->{_rla_data} = $self->{thaw}($_);
239 8 50       33 return unless defined wantarray;
240 8         13 return @{ $self->{_rla_data} };
  8         62  
241             }
242              
243 0         0 return;
244             }
245              
246             sub relay (;&) {
247              
248 26     26 0 494 my ($self, $_code);
249              
250 26 50       100 if (ref $_[0] eq 'CODE') {
251 0         0 ($self, $_code) = ($MCE::MCE, shift);
252             } else {
253 26 50       66 my $x = shift; $self = ref($x) ? $x : $MCE::MCE;
  26         82  
254 26         54 $_code = shift;
255             }
256              
257             _croak('MCE::relay: (init_relay) is not defined')
258 26 50       117 unless (defined $self->{init_relay});
259             _croak('MCE::relay: method is not allowed by the manager process')
260 26 50       70 unless ($self->{_wid});
261             _croak('MCE::relay: method is not allowed by task_id > 0')
262 26 50       73 if ($self->{_task_id} > 0);
263              
264 26 50       98 if (ref $_code ne 'CODE') {
265 0 0       0 _croak('MCE::relay: argument is not a code block') if (defined $_code);
266             } else {
267 26         95 weaken $_code;
268             }
269              
270 26         93 my ($_chn, $_cid, $_nxt, $_rdr, $_wtr);
271              
272 26 50       78 local $\ = undef if (defined $\);
273 26 50       79 local $/ = $LF if ($/ ne $LF );
274              
275 26   33     197 $_chn = $_cid = $self->{_chunk_id} || $self->{_wid};
276 26         72 $_chn = ($_chn - 1) % $self->{max_workers};
277 26         36 $_nxt = $_chn + 1;
278 26 100       63 $_nxt = 0 if ($_nxt == $self->{max_workers});
279 26         57 $_rdr = $self->{_rla_r_sock}->[$_chn];
280 26         62 $_wtr = $self->{_rla_w_sock}->[$_nxt];
281              
282 26 100       69 if (exists $self->{_rla_data}) {
283 24         32 my $_tmp; local $_ = delete $self->{_rla_data};
  24         62  
284 24 50       118 $_code->() if (ref $_code eq 'CODE');
285              
286 24 100       160 if (ref $_ eq '') { ## scalar value
    100          
    50          
287 8         58 $_tmp = $self->{freeze}(\$_) . '0';
288             }
289             elsif (ref $_ eq 'HASH') { ## hash reference
290 8         91 $_tmp = $self->{freeze}($_) . '1';
291             }
292             elsif (ref $_ eq 'ARRAY') { ## array reference
293 8         53 $_tmp = $self->{freeze}($_) . '2';
294             }
295              
296 24         50 print {$_wtr} length($_tmp) . $LF . $_tmp;
  24         937  
297 24         89 print {$self->{_dat_w_sock}->[0]} OUTPUT_R_NFY.$LF . '0'.$LF;
  24         727  
298 24         155 $self->{_relayed} = $_cid;
299             }
300             else {
301 2         3 my ($_len, $_ref); local $_;
  2         6  
302 2         3 print {$self->{_dat_w_sock}->[0]} OUTPUT_W_RLA.$LF . '0'.$LF;
  2         46  
303              
304 2 50       15 MCE::Util::_sock_ready($_rdr, -1) if $^O eq 'MSWin32';
305 2         2583 chomp($_len = <$_rdr>);
306 2         20 read $_rdr, $_, $_len;
307 2         10 $_ref = chop $_;
308              
309 2 50       14 if ($_ref == 0) { ## scalar value
    0          
    0          
310 2         4 my $_ret = ${ $self->{thaw}($_) };
  2         22  
311 2 50       6 local $_ = $_ret; $_code->() if (ref $_code eq 'CODE');
  2         19  
312 2         28 my $_tmp = $self->{freeze}(\$_) . '0';
313              
314 2         5 print {$_wtr} length($_tmp) . $LF . $_tmp;
  2         85  
315 2         9 print {$self->{_dat_w_sock}->[0]} OUTPUT_R_NFY.$LF . '0'.$LF;
  2         27  
316 2         10 $self->{_relayed} = $_cid;
317              
318 2 50       19 return unless defined wantarray;
319 0         0 return $_ret;
320             }
321             elsif ($_ref == 1) { ## hash reference
322 0         0 my %_ret = %{ $self->{thaw}($_) };
  0         0  
323 0 0       0 local $_ = { %_ret }; $_code->() if (ref $_code eq 'CODE');
  0         0  
324 0         0 my $_tmp = $self->{freeze}($_) . '1';
325              
326 0         0 print {$_wtr} length($_tmp) . $LF . $_tmp;
  0         0  
327 0         0 print {$self->{_dat_w_sock}->[0]} OUTPUT_R_NFY.$LF . '0'.$LF;
  0         0  
328 0         0 $self->{_relayed} = $_cid;
329              
330 0 0       0 return unless defined wantarray;
331 0         0 return %_ret;
332             }
333             elsif ($_ref == 2) { ## array reference
334 0         0 my @_ret = @{ $self->{thaw}($_) };
  0         0  
335 0 0       0 local $_ = [ @_ret ]; $_code->() if (ref $_code eq 'CODE');
  0         0  
336 0         0 my $_tmp = $self->{freeze}($_) . '2';
337              
338 0         0 print {$_wtr} length($_tmp) . $LF . $_tmp;
  0         0  
339 0         0 print {$self->{_dat_w_sock}->[0]} OUTPUT_R_NFY.$LF . '0'.$LF;
  0         0  
340 0         0 $self->{_relayed} = $_cid;
341              
342 0 0       0 return unless defined wantarray;
343 0         0 return @_ret;
344             }
345             }
346              
347 24         307 return;
348             }
349              
350             ## Aliases.
351              
352             *relay_lock = \&relay_recv;
353             *relay_unlock = \&relay;
354              
355             1;
356              
357             __END__