File Coverage

blib/lib/MCE/Channel/Simple.pm
Criterion Covered Total %
statement 127 138 92.0
branch 63 118 53.3
condition 6 18 33.3
subroutine 15 15 100.0
pod 11 11 100.0
total 222 300 74.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Channel tuned for one producer and one consumer involving no locking.
4             ##
5             ###############################################################################
6              
7             package MCE::Channel::Simple;
8              
9 1     1   1625 use strict;
  1         2  
  1         69  
10 1     1   8 use warnings;
  1         2  
  1         156  
11              
12 1     1   8 no warnings qw( uninitialized once );
  1         2  
  1         107  
13              
14             our $VERSION = '1.902';
15              
16 1     1   8 use base 'MCE::Channel';
  1         2  
  1         2014  
17              
18             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
19             my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
20             my $freeze = MCE::Channel::_get_freeze();
21             my $thaw = MCE::Channel::_get_thaw();
22              
23             sub new {
24 1     1 1 4 my ( $class, %obj ) = ( @_, impl => 'Simple' );
25              
26 1         4 $obj{init_pid} = MCE::Channel::_pid();
27 1         7 MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' );
28              
29 1         5 return bless \%obj, $class;
30             }
31              
32             ###############################################################################
33             ## ----------------------------------------------------------------------------
34             ## Queue-like methods.
35             ##
36             ###############################################################################
37              
38             sub end {
39 1     1 1 5 my ( $self ) = @_;
40              
41 1 50       4 local $\ = undef if (defined $\);
42 1 50       36 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
43 1         2 print { $self->{p_sock} } pack('i', -1);
  1         7  
44              
45 1         3 $self->{ended} = 1;
46             }
47              
48             sub enqueue {
49 15     15 1 1746 my $self = shift;
50 15 100       37 return MCE::Channel::_ended('enqueue') if $self->{ended};
51              
52 14 50       24 local $\ = undef if (defined $\);
53 14 50       21 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
54              
55 14         21 while ( @_ ) {
56 24         104 my $data = $freeze->([ shift ]);
57 24         30 print { $self->{p_sock} } pack('i', length $data) . $data;
  24         215  
58             }
59              
60 14         20 return 1;
61             }
62              
63             sub dequeue {
64 9     9 1 17 my ( $self, $count ) = @_;
65 9 100 66     22 $count = 1 if ( !$count || $count < 1 );
66              
67 9 50       22 local $/ = $LF if ( $/ ne $LF );
68              
69 9 100       17 if ( $count == 1 ) {
70 8         9 my ( $plen, $data );
71 8 50       26 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
72              
73             $is_MSWin32
74             ? sysread( $self->{c_sock}, $plen, 4 )
75 8 50       97 : read( $self->{c_sock}, $plen, 4 );
76              
77 8         19 my $len = unpack('i', $plen);
78 8 50       12 if ( $len < 0 ) {
79 0         0 $self->end;
80 0 0       0 return wantarray ? () : undef;
81             }
82              
83             $is_MSWin32
84             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
85 8 50       14 : read( $self->{c_sock}, $data, $len );
86              
87 8 50       91 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  0         0  
88             }
89             else {
90 1         2 my ( $plen, @ret );
91 1 50       2 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
92              
93 1         4 while ( $count-- ) {
94 3         2 my $data;
95              
96             $is_MSWin32
97             ? sysread( $self->{c_sock}, $plen, 4 )
98 3 50       16 : read( $self->{c_sock}, $plen, 4 );
99              
100 3         5 my $len = unpack('i', $plen);
101 3 50       6 if ( $len < 0 ) {
102 0         0 $self->end;
103 0         0 last;
104             }
105              
106             $is_MSWin32
107             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
108 3 50       5 : read( $self->{c_sock}, $data, $len );
109              
110 3         3 push @ret, @{ $thaw->($data) };
  3         14  
111             }
112              
113 1 50       7 wantarray ? @ret : $ret[-1];
114             }
115             }
116              
117             sub dequeue_nb {
118 11     11 1 23 my ( $self, $count ) = @_;
119 11 100 66     27 $count = 1 if ( !$count || $count < 1 );
120              
121 11         14 my ( $plen, @ret );
122 11 50       24 local $/ = $LF if ( $/ ne $LF );
123              
124 11         17 while ( $count-- ) {
125 13         13 my $data;
126 13         37 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
127              
128             $is_MSWin32
129             ? sysread( $self->{c_sock}, $plen, 4 )
130 13 50       93 : read( $self->{c_sock}, $plen, 4 );
131              
132 13         24 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
133              
134 13 50       10 my $len; $len = unpack('i', $plen) if $plen;
  13         31  
135 13 50 33     39 if ( !$len || $len < 0 ) {
136 0 0 0     0 $self->end if defined $len && $len < 0;
137 0         0 last;
138             }
139              
140             $is_MSWin32
141             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
142 13 50       21 : read( $self->{c_sock}, $data, $len );
143              
144 13         11 push @ret, @{ $thaw->($data) };
  13         121  
145             }
146              
147 11 100       53 wantarray ? @ret : $ret[-1];
148             }
149              
150             ###############################################################################
151             ## ----------------------------------------------------------------------------
152             ## Methods for two-way communication; producer(s) to consumers.
153             ##
154             ###############################################################################
155              
156             sub send {
157 11     11 1 575 my $self = shift;
158 11 100       30 return MCE::Channel::_ended('send') if $self->{ended};
159              
160 10         70 my $data = $freeze->([ @_ ]);
161              
162 10 50       24 local $\ = undef if (defined $\);
163 10 50       14 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
164 10         11 print { $self->{p_sock} } pack('i', length $data) . $data;
  10         151  
165              
166 10         20 return 1;
167             }
168              
169             sub recv {
170 5     5 1 7 my ( $self ) = @_;
171 5         6 my ( $plen, $data );
172              
173 5 50       29 local $/ = $LF if ( $/ ne $LF );
174 5 50       11 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
175              
176             $is_MSWin32
177             ? sysread( $self->{c_sock}, $plen, 4 )
178 5 50       64 : read( $self->{c_sock}, $plen, 4 );
179              
180 5         13 my $len = unpack('i', $plen);
181 5 50       12 if ( $len < 0 ) {
182 0         0 $self->end;
183 0 0       0 return wantarray ? () : undef;
184             }
185              
186             $is_MSWin32
187             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
188 5 50       12 : read( $self->{c_sock}, $data, $len );
189              
190 5 100       69 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  1         13  
191             }
192              
193             sub recv_nb {
194 5     5 1 10 my ( $self ) = @_;
195 5         6 my ( $plen, $data );
196              
197 5 50       12 local $/ = $LF if ( $/ ne $LF );
198 5         16 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
199              
200             $is_MSWin32
201             ? sysread( $self->{c_sock}, $plen, 4 )
202 5 50       41 : read( $self->{c_sock}, $plen, 4 );
203              
204 5         8 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
205              
206 5 50       5 my $len; $len = unpack('i', $plen) if $plen;
  5         14  
207 5 50 33     17 if ( !$len || $len < 0 ) {
208 0 0 0     0 $self->end if defined $len && $len < 0;
209 0 0       0 return wantarray ? () : undef;
210             }
211              
212             $is_MSWin32
213             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
214 5 50       33 : read( $self->{c_sock}, $data, $len );
215              
216 5 100       62 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  1         13  
217             }
218              
219             ###############################################################################
220             ## ----------------------------------------------------------------------------
221             ## Methods for two-way communication; consumers to producer(s).
222             ##
223             ###############################################################################
224              
225             sub send2 {
226 10     10 1 1075 my $self = shift;
227 10         66 my $data = $freeze->([ @_ ]);
228              
229 10 50       45 local $\ = undef if (defined $\);
230 10 50       19 MCE::Util::_sock_ready_w( $self->{c_sock} ) if $is_MSWin32;
231 10         15 print { $self->{c_sock} } pack('i', length $data) . $data;
  10         173  
232              
233 10         23 return 1;
234             }
235              
236             sub recv2 {
237 5     5 1 10 my ( $self ) = @_;
238 5         6 my ( $plen, $data );
239              
240 5 50       14 local $/ = $LF if ( $/ ne $LF );
241 5 50       9 MCE::Util::_sock_ready( $self->{p_sock} ) if $is_MSWin32;
242              
243             $is_MSWin32
244             ? sysread( $self->{p_sock}, $plen, 4 )
245 5 50       63 : read( $self->{p_sock}, $plen, 4 );
246              
247 5         11 my $len = unpack('i', $plen);
248              
249             $is_MSWin32
250             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
251 5 50       13 : read( $self->{p_sock}, $data, $len );
252              
253 5 100       73 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  1         14  
254             }
255              
256             sub recv2_nb {
257 5     5 1 15 my ( $self ) = @_;
258 5         5 my ( $plen, $data );
259              
260 5 50       14 local $/ = $LF if ( $/ ne $LF );
261 5         23 MCE::Util::_nonblocking( $self->{p_sock}, 1 );
262              
263             $is_MSWin32
264             ? sysread( $self->{p_sock}, $plen, 4 )
265 5 50       44 : read( $self->{p_sock}, $plen, 4 );
266              
267 5         11 MCE::Util::_nonblocking( $self->{p_sock}, 0 );
268              
269 5 50       5 my $len; $len = unpack('i', $plen) if $plen;
  5         14  
270 5 0       8 return wantarray ? () : undef unless $len;
    50          
271              
272             $is_MSWin32
273             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
274 5 50       11 : read( $self->{p_sock}, $data, $len );
275              
276 5 100       56 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  1         14  
277             }
278              
279             1;
280              
281             __END__
282              
283             ###############################################################################
284             ## ----------------------------------------------------------------------------
285             ## Module usage.
286             ##
287             ###############################################################################
288              
289             =head1 NAME
290              
291             MCE::Channel::Simple - Channel tuned for one producer and one consumer
292              
293             =head1 VERSION
294              
295             This document describes MCE::Channel::Simple version 1.902
296              
297             =head1 DESCRIPTION
298              
299             A channel class providing queue-like and two-way communication
300             for one process or thread on either end; no locking needed.
301              
302             The API is described in L<MCE::Channel>.
303              
304             =over 3
305              
306             =item new
307              
308             use MCE::Channel;
309              
310             my $chnl = MCE::Channel->new( impl => 'Simple' );
311              
312             =back
313              
314             =head1 QUEUE-LIKE BEHAVIOR
315              
316             =over 3
317              
318             =item enqueue
319              
320             =item dequeue
321              
322             =item dequeue_nb
323              
324             =item end
325              
326             =back
327              
328             =head1 TWO-WAY IPC - PRODUCER TO CONSUMER
329              
330             =over 3
331              
332             =item send
333              
334             =item recv
335              
336             =item recv_nb
337              
338             =back
339              
340             =head1 TWO-WAY IPC - CONSUMER TO PRODUCER
341              
342             =over 3
343              
344             =item send2
345              
346             =item recv2
347              
348             =item recv2_nb
349              
350             =back
351              
352             =head1 AUTHOR
353              
354             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
355              
356             =cut
357