File Coverage

blib/lib/MCE/Relay.pm
Criterion Covered Total %
statement 153 186 82.2
branch 62 112 55.3
condition 7 12 58.3
subroutine 13 13 100.0
pod 0 3 0.0
total 235 326 72.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Extends Many-Core Engine with relay capabilities.
4             ##
5             ###############################################################################
6              
7             package MCE::Relay;
8              
9 10     10   1257 use strict;
  10         20  
  10         338  
10 10     10   49 use warnings;
  10         20  
  10         290  
11              
12 10     10   49 no warnings qw( threads recursion uninitialized numeric );
  10         20  
  10         685  
13              
14             our $VERSION = '1.889';
15              
16             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
17              
18             use constant {
19 10         8803 OUTPUT_W_RLA => 'W~RLA', # Worker has relayed
20             OUTPUT_R_NFY => 'R~NFY', # Relay notification
21 10     10   59 };
  10         20  
22              
23             ###############################################################################
24             ## ----------------------------------------------------------------------------
25             ## Import routine.
26             ##
27             ###############################################################################
28              
29             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
30             my $_imported;
31              
32             sub import {
33              
34 11 100   11   1262 return if ($_imported++);
35              
36 10 50       40 if ($INC{'MCE.pm'}) {
37 10         20 _mce_m_init();
38             }
39             else {
40 0         0 $\ = undef; require Carp;
  0         0  
41 0         0 Carp::croak(
42             "MCE::Relay cannot be used directly. Please consult the MCE::Relay\n".
43             "documentation for more information.\n\n"
44             );
45             }
46              
47 10         37 return;
48             }
49              
50             ###############################################################################
51             ## ----------------------------------------------------------------------------
52             ## Output routines for the manager process.
53             ##
54             ###############################################################################
55              
56             {
57             my ($_MCE, $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_rla_nextid, $_max_workers);
58              
59             my %_output_function = (
60              
61             OUTPUT_W_RLA.$LF => sub { # Worker has relayed
62              
63             $_rla_nextid = 0 if ( ++$_rla_nextid == $_max_workers );
64              
65             return;
66             },
67              
68             OUTPUT_R_NFY.$LF => sub { # Relay notification
69              
70             $_MCE->{_relayed}++;
71              
72             return;
73             },
74              
75             );
76              
77             sub _mce_m_loop_begin {
78              
79 16     16   57 ($_MCE, $_DAU_R_SOCK_REF) = @_;
80              
81 16         153 my $_caller = $_MCE->{_caller};
82              
83             $_max_workers = (exists $_MCE->{user_tasks})
84             ? $_MCE->{user_tasks}[0]{max_workers}
85 16 50       78 : $_MCE->{max_workers};
86              
87             ## Write initial relay data.
88 16 50       51 if (defined $_MCE->{init_relay}) {
89 16         76 my $_ref = ref $_MCE->{init_relay};
90              
91 16 50 100     199 MCE::_croak("MCE::Relay: (init_relay) is not valid")
      66        
92             if ($_ref ne '' && $_ref ne 'HASH' && $_ref ne 'ARRAY');
93              
94 16         58 my $_RLA_W_SOCK = $_MCE->{_rla_w_sock}->[0];
95 16         17 my $_init_relay;
96              
97 16         49 $_MCE->{_relayed} = 0;
98              
99 16 100       74 if (ref $_MCE->{init_relay} eq '') {
    100          
    50          
100 4         121 $_init_relay = $_MCE->{freeze}(\$_MCE->{init_relay}) . '0';
101             }
102             elsif (ref $_MCE->{init_relay} eq 'HASH') {
103 5         170 $_init_relay = $_MCE->{freeze}($_MCE->{init_relay}) . '1';
104             }
105             elsif (ref $_MCE->{init_relay} eq 'ARRAY') {
106 7         70 $_init_relay = $_MCE->{freeze}($_MCE->{init_relay}) . '2';
107             }
108              
109 16         54 print {$_RLA_W_SOCK} length($_init_relay) . $LF . $_init_relay;
  16         645  
110              
111 16         71 $_rla_nextid = 0;
112             }
113              
114 16         45 delete $MCE::RLA->{$_caller};
115              
116 16         64 return;
117             }
118              
119             sub _mce_m_loop_end {
120              
121             ## Obtain final relay data.
122 16 50   16   68 if (defined $_MCE->{init_relay}) {
123 16         43 my $_RLA_R_SOCK = $_MCE->{_rla_r_sock}->[$_rla_nextid];
124 16         61 my ($_caller, $_len, $_ret) = ($_MCE->{_caller});
125              
126 16         43 delete $_MCE->{_relayed};
127              
128 16 50       152 MCE::Util::_sock_ready($_RLA_R_SOCK, -1) if $^O eq 'MSWin32';
129 16         414 chomp($_len = <$_RLA_R_SOCK>);
130 16         87 read $_RLA_R_SOCK, $_ret, $_len;
131              
132 16 100       57 if (chop $_ret) {
133 12         284 $MCE::RLA->{$_caller} = $_MCE->{thaw}($_ret);
134             } else {
135 4         26 $MCE::RLA->{$_caller} = ${ $_MCE->{thaw}($_ret) };
  4         40  
136             }
137             }
138              
139             ## Clear variables.
140 16         66 $_MCE = $_DAU_R_SOCK_REF = $_DAU_R_SOCK = undef;
141 16         39 $_rla_nextid = $_max_workers = undef;
142              
143 16         81 return;
144             }
145              
146             sub _mce_m_init {
147              
148 10     10   50 MCE::_attach_plugin(
149             \%_output_function, \&_mce_m_loop_begin, \&_mce_m_loop_end
150             );
151              
152 10         11 return;
153             }
154             }
155              
156             ###############################################################################
157             ## ----------------------------------------------------------------------------
158             ## Relay methods.
159             ##
160             ###############################################################################
161              
162             ## Items below are folded into MCE.
163              
164             package # hide from rpm
165             MCE;
166              
167 10     10   79 no warnings qw( threads recursion uninitialized redefine );
  10         11  
  10         495  
168              
169 10     10   68 use Scalar::Util qw( weaken );
  10         20  
  10         14379  
170              
171             sub relay_final {
172              
173 16 50   16 0 255 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  16         87  
174              
175             _croak('MCE::relay_final: method is not allowed by the worker process')
176 16 50       82 if ($self->{_wid});
177              
178 16         65 my $_caller = caller;
179              
180 16 50       81 if (exists $MCE::RLA->{$_caller}) {
181 16 100       113 if (ref $MCE::RLA->{$_caller} eq '') {
    100          
    50          
182 4         40 return delete $MCE::RLA->{$_caller};
183             }
184             elsif (ref $MCE::RLA->{$_caller} eq 'HASH') {
185 5         30 return %{ delete $MCE::RLA->{$_caller} };
  5         75  
186             }
187             elsif (ref $MCE::RLA->{$_caller} eq 'ARRAY') {
188 7         21 return @{ delete $MCE::RLA->{$_caller} };
  7         49  
189             }
190              
191             # should not reach the following line
192 0         0 delete $MCE::RLA->{$_caller};
193             }
194              
195 0         0 return;
196             }
197              
198             sub relay_recv {
199              
200 24 50   24 0 254 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  24         160  
201              
202             _croak('MCE::relay_recv: (init_relay) is not defined')
203 24 50       77 unless (defined $self->{init_relay});
204             _croak('MCE::relay_recv: method is not allowed by the manager process')
205 24 50       57 unless ($self->{_wid});
206             _croak('MCE::relay_recv: method is not allowed by task_id > 0')
207 24 50       48 if ($self->{_task_id} > 0);
208              
209 24         33 my ($_chn, $_nxt, $_rdr, $_len, $_ref); local $_;
  24         34  
210              
211 24 50       72 local $\ = undef if (defined $\);
212 24 50       59 local $/ = $LF if ($/ ne $LF );
213              
214 24   33     142 $_chn = $self->{_chunk_id} || $self->{_wid};
215 24         52 $_chn = ($_chn - 1) % $self->{max_workers};
216 24         31 $_nxt = $_chn + 1;
217 24 100       48 $_nxt = 0 if ($_nxt == $self->{max_workers});
218 24         51 $_rdr = $self->{_rla_r_sock}->[$_chn];
219              
220 24         33 print {$self->{_dat_w_sock}->[0]} OUTPUT_W_RLA.$LF . '0'.$LF;
  24         702  
221              
222 24 50       125 MCE::Util::_sock_ready($_rdr, -1) if $^O eq 'MSWin32';
223 24         13663 chomp($_len = <$_rdr>);
224 24         188 read $_rdr, $_, $_len;
225 24         117 $_ref = chop $_;
226              
227 24 100       97 if ($_ref == 0) { ## scalar value
    100          
    50          
228 8         11 $self->{_rla_data} = ${ $self->{thaw}($_) };
  8         100  
229 8 50       30 return unless defined wantarray;
230 8         35 return $self->{_rla_data};
231             }
232             elsif ($_ref == 1) { ## hash reference
233 8         106 $self->{_rla_data} = $self->{thaw}($_);
234 8 50       38 return unless defined wantarray;
235 8         10 return %{ $self->{_rla_data} };
  8         65  
236             }
237             elsif ($_ref == 2) { ## array reference
238 8         120 $self->{_rla_data} = $self->{thaw}($_);
239 8 50       28 return unless defined wantarray;
240 8         9 return @{ $self->{_rla_data} };
  8         44  
241             }
242              
243 0         0 return;
244             }
245              
246             sub relay (;&) {
247              
248 26     26 0 494 my ($self, $_code);
249              
250 26 50       128 if (ref $_[0] eq 'CODE') {
251 0         0 ($self, $_code) = ($MCE::MCE, shift);
252             } else {
253 26 50       52 my $x = shift; $self = ref($x) ? $x : $MCE::MCE;
  26         84  
254 26         42 $_code = shift;
255             }
256              
257             _croak('MCE::relay: (init_relay) is not defined')
258 26 50       70 unless (defined $self->{init_relay});
259             _croak('MCE::relay: method is not allowed by the manager process')
260 26 50       51 unless ($self->{_wid});
261             _croak('MCE::relay: method is not allowed by task_id > 0')
262 26 50       61 if ($self->{_task_id} > 0);
263              
264 26 50       64 if (ref $_code ne 'CODE') {
265 0 0       0 _croak('MCE::relay: argument is not a code block') if (defined $_code);
266             } else {
267 26         77 weaken $_code;
268             }
269              
270 26         47 my ($_chn, $_cid, $_nxt, $_rdr, $_wtr);
271              
272 26 50       63 local $\ = undef if (defined $\);
273 26 50       75 local $/ = $LF if ($/ ne $LF );
274              
275 26   33     208 $_chn = $_cid = $self->{_chunk_id} || $self->{_wid};
276 26         57 $_chn = ($_chn - 1) % $self->{max_workers};
277 26         42 $_nxt = $_chn + 1;
278 26 100       67 $_nxt = 0 if ($_nxt == $self->{max_workers});
279 26         48 $_rdr = $self->{_rla_r_sock}->[$_chn];
280 26         59 $_wtr = $self->{_rla_w_sock}->[$_nxt];
281              
282 26 100       94 if (exists $self->{_rla_data}) {
283 24         32 my $_tmp; local $_ = delete $self->{_rla_data};
  24         44  
284 24 50       90 $_code->() if (ref $_code eq 'CODE');
285              
286 24 100       132 if (ref $_ eq '') { ## scalar value
    100          
    50          
287 8         66 $_tmp = $self->{freeze}(\$_) . '0';
288             }
289             elsif (ref $_ eq 'HASH') { ## hash reference
290 8         78 $_tmp = $self->{freeze}($_) . '1';
291             }
292             elsif (ref $_ eq 'ARRAY') { ## array reference
293 8         63 $_tmp = $self->{freeze}($_) . '2';
294             }
295              
296 24         51 print {$_wtr} length($_tmp) . $LF . $_tmp;
  24         840  
297 24         89 print {$self->{_dat_w_sock}->[0]} OUTPUT_R_NFY.$LF . '0'.$LF;
  24         641  
298 24         145 $self->{_relayed} = $_cid;
299             }
300             else {
301 2         18 my ($_len, $_ref); local $_;
  2         7  
302 2         5 print {$self->{_dat_w_sock}->[0]} OUTPUT_W_RLA.$LF . '0'.$LF;
  2         62  
303              
304 2 50       32 MCE::Util::_sock_ready($_rdr, -1) if $^O eq 'MSWin32';
305 2         2336 chomp($_len = <$_rdr>);
306 2         20 read $_rdr, $_, $_len;
307 2         32 $_ref = chop $_;
308              
309 2 50       14 if ($_ref == 0) { ## scalar value
    0          
    0          
310 2         7 my $_ret = ${ $self->{thaw}($_) };
  2         24  
311 2 50       8 local $_ = $_ret; $_code->() if (ref $_code eq 'CODE');
  2         17  
312 2         22 my $_tmp = $self->{freeze}(\$_) . '0';
313              
314 2         5 print {$_wtr} length($_tmp) . $LF . $_tmp;
  2         77  
315 2         7 print {$self->{_dat_w_sock}->[0]} OUTPUT_R_NFY.$LF . '0'.$LF;
  2         26  
316 2         28 $self->{_relayed} = $_cid;
317              
318 2 50       22 return unless defined wantarray;
319 0         0 return $_ret;
320             }
321             elsif ($_ref == 1) { ## hash reference
322 0         0 my %_ret = %{ $self->{thaw}($_) };
  0         0  
323 0 0       0 local $_ = { %_ret }; $_code->() if (ref $_code eq 'CODE');
  0         0  
324 0         0 my $_tmp = $self->{freeze}($_) . '1';
325              
326 0         0 print {$_wtr} length($_tmp) . $LF . $_tmp;
  0         0  
327 0         0 print {$self->{_dat_w_sock}->[0]} OUTPUT_R_NFY.$LF . '0'.$LF;
  0         0  
328 0         0 $self->{_relayed} = $_cid;
329              
330 0 0       0 return unless defined wantarray;
331 0         0 return %_ret;
332             }
333             elsif ($_ref == 2) { ## array reference
334 0         0 my @_ret = @{ $self->{thaw}($_) };
  0         0  
335 0 0       0 local $_ = [ @_ret ]; $_code->() if (ref $_code eq 'CODE');
  0         0  
336 0         0 my $_tmp = $self->{freeze}($_) . '2';
337              
338 0         0 print {$_wtr} length($_tmp) . $LF . $_tmp;
  0         0  
339 0         0 print {$self->{_dat_w_sock}->[0]} OUTPUT_R_NFY.$LF . '0'.$LF;
  0         0  
340 0         0 $self->{_relayed} = $_cid;
341              
342 0 0       0 return unless defined wantarray;
343 0         0 return @_ret;
344             }
345             }
346              
347 24         242 return;
348             }
349              
350             ## Aliases.
351              
352             *relay_lock = \&relay_recv;
353             *relay_unlock = \&relay;
354              
355             1;
356              
357             __END__