File Coverage

blib/lib/MCE/Channel/Simple.pm
Criterion Covered Total %
statement 127 138 92.0
branch 63 118 53.3
condition 6 18 33.3
subroutine 15 15 100.0
pod 11 11 100.0
total 222 300 74.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Channel tuned for one producer and one consumer involving no locking.
4             ##
5             ###############################################################################
6              
7             package MCE::Channel::Simple;
8              
9 1     1   1717 use strict;
  1         2  
  1         31  
10 1     1   5 use warnings;
  1         2  
  1         28  
11              
12 1     1   5 no warnings qw( uninitialized once );
  1         2  
  1         71  
13              
14             our $VERSION = '1.889';
15              
16 1     1   6 use base 'MCE::Channel';
  1         2  
  1         1913  
17              
18             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
19             my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
20             my $freeze = MCE::Channel::_get_freeze();
21             my $thaw = MCE::Channel::_get_thaw();
22              
23             sub new {
24 1     1 1 6 my ( $class, %obj ) = ( @_, impl => 'Simple' );
25              
26 1         4 $obj{init_pid} = MCE::Channel::_pid();
27 1         5 MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' );
28              
29 1         8 return bless \%obj, $class;
30             }
31              
32             ###############################################################################
33             ## ----------------------------------------------------------------------------
34             ## Queue-like methods.
35             ##
36             ###############################################################################
37              
38             sub end {
39 1     1 1 6 my ( $self ) = @_;
40              
41 1 50       5 local $\ = undef if (defined $\);
42 1 50       3 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
43 1         2 print { $self->{p_sock} } pack('i', -1);
  1         8  
44              
45 1         5 $self->{ended} = 1;
46             }
47              
48             sub enqueue {
49 15     15 1 2043 my $self = shift;
50 15 100       49 return MCE::Channel::_ended('enqueue') if $self->{ended};
51              
52 14 50       35 local $\ = undef if (defined $\);
53 14 50       27 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
54              
55 14         31 while ( @_ ) {
56 24         165 my $data = $freeze->([ shift ]);
57 24         62 print { $self->{p_sock} } pack('i', length $data) . $data;
  24         331  
58             }
59              
60 14         45 return 1;
61             }
62              
63             sub dequeue {
64 9     9 1 29 my ( $self, $count ) = @_;
65 9 100 66     30 $count = 1 if ( !$count || $count < 1 );
66              
67 9 50       24 local $/ = $LF if ( $/ ne $LF );
68              
69 9 100       37 if ( $count == 1 ) {
70 8         14 my ( $plen, $data );
71 8 50       16 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
72              
73             $is_MSWin32
74             ? sysread( $self->{c_sock}, $plen, 4 )
75 8 50       69 : read( $self->{c_sock}, $plen, 4 );
76              
77 8         29 my $len = unpack('i', $plen);
78 8 50       18 if ( $len < 0 ) {
79 0         0 $self->end;
80 0 0       0 return wantarray ? () : undef;
81             }
82              
83             $is_MSWin32
84             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
85 8 50       26 : read( $self->{c_sock}, $data, $len );
86              
87 8 50       137 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  0         0  
88             }
89             else {
90 1         3 my ( $plen, @ret );
91 1 50       3 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
92              
93 1         4 while ( $count-- ) {
94 3         45 my $data;
95              
96             $is_MSWin32
97             ? sysread( $self->{c_sock}, $plen, 4 )
98 3 50       23 : read( $self->{c_sock}, $plen, 4 );
99              
100 3         8 my $len = unpack('i', $plen);
101 3 50       10 if ( $len < 0 ) {
102 0         0 $self->end;
103 0         0 last;
104             }
105              
106             $is_MSWin32
107             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
108 3 50       9 : read( $self->{c_sock}, $data, $len );
109              
110 3         5 push @ret, @{ $thaw->($data) };
  3         22  
111             }
112              
113 1 50       9 wantarray ? @ret : $ret[-1];
114             }
115             }
116              
117             sub dequeue_nb {
118 11     11 1 35 my ( $self, $count ) = @_;
119 11 100 66     34 $count = 1 if ( !$count || $count < 1 );
120              
121 11         19 my ( $plen, @ret );
122 11 50       30 local $/ = $LF if ( $/ ne $LF );
123              
124 11         26 while ( $count-- ) {
125 13         16 my $data;
126 13         40 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
127              
128             $is_MSWin32
129             ? sysread( $self->{c_sock}, $plen, 4 )
130 13 50       105 : read( $self->{c_sock}, $plen, 4 );
131              
132 13         45 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
133              
134 13 50       14 my $len; $len = unpack('i', $plen) if $plen;
  13         47  
135 13 50 33     62 if ( !$len || $len < 0 ) {
136 0 0 0     0 $self->end if defined $len && $len < 0;
137 0         0 last;
138             }
139              
140             $is_MSWin32
141             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
142 13 50       43 : read( $self->{c_sock}, $data, $len );
143              
144 13         19 push @ret, @{ $thaw->($data) };
  13         140  
145             }
146              
147 11 100       70 wantarray ? @ret : $ret[-1];
148             }
149              
150             ###############################################################################
151             ## ----------------------------------------------------------------------------
152             ## Methods for two-way communication; producer(s) to consumers.
153             ##
154             ###############################################################################
155              
156             sub send {
157 11     11 1 702 my $self = shift;
158 11 100       35 return MCE::Channel::_ended('send') if $self->{ended};
159              
160 10         103 my $data = $freeze->([ @_ ]);
161              
162 10 50       35 local $\ = undef if (defined $\);
163 10 50       21 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
164 10         12 print { $self->{p_sock} } pack('i', length $data) . $data;
  10         181  
165              
166 10         47 return 1;
167             }
168              
169             sub recv {
170 5     5 1 14 my ( $self ) = @_;
171 5         8 my ( $plen, $data );
172              
173 5 50       19 local $/ = $LF if ( $/ ne $LF );
174 5 50       11 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
175              
176             $is_MSWin32
177             ? sysread( $self->{c_sock}, $plen, 4 )
178 5 50       78 : read( $self->{c_sock}, $plen, 4 );
179              
180 5         23 my $len = unpack('i', $plen);
181 5 50       16 if ( $len < 0 ) {
182 0         0 $self->end;
183 0 0       0 return wantarray ? () : undef;
184             }
185              
186             $is_MSWin32
187             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
188 5 50       15 : read( $self->{c_sock}, $data, $len );
189              
190 5 100       75 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  1         27  
191             }
192              
193             sub recv_nb {
194 5     5 1 17 my ( $self ) = @_;
195 5         9 my ( $plen, $data );
196              
197 5 50       34 local $/ = $LF if ( $/ ne $LF );
198 5         25 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
199              
200             $is_MSWin32
201             ? sysread( $self->{c_sock}, $plen, 4 )
202 5 50       61 : read( $self->{c_sock}, $plen, 4 );
203              
204 5         21 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
205              
206 5 50       7 my $len; $len = unpack('i', $plen) if $plen;
  5         20  
207 5 50 33     25 if ( !$len || $len < 0 ) {
208 0 0 0     0 $self->end if defined $len && $len < 0;
209 0 0       0 return wantarray ? () : undef;
210             }
211              
212             $is_MSWin32
213             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
214 5 50       19 : read( $self->{c_sock}, $data, $len );
215              
216 5 100       69 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  1         25  
217             }
218              
219             ###############################################################################
220             ## ----------------------------------------------------------------------------
221             ## Methods for two-way communication; consumers to producer(s).
222             ##
223             ###############################################################################
224              
225             sub send2 {
226 10     10 1 1336 my $self = shift;
227 10         74 my $data = $freeze->([ @_ ]);
228              
229 10 50       38 local $\ = undef if (defined $\);
230 10 50       19 MCE::Util::_sock_ready_w( $self->{c_sock} ) if $is_MSWin32;
231 10         13 print { $self->{c_sock} } pack('i', length $data) . $data;
  10         180  
232              
233 10         46 return 1;
234             }
235              
236             sub recv2 {
237 5     5 1 16 my ( $self ) = @_;
238 5         9 my ( $plen, $data );
239              
240 5 50       17 local $/ = $LF if ( $/ ne $LF );
241 5 50       10 MCE::Util::_sock_ready( $self->{p_sock} ) if $is_MSWin32;
242              
243             $is_MSWin32
244             ? sysread( $self->{p_sock}, $plen, 4 )
245 5 50       77 : read( $self->{p_sock}, $plen, 4 );
246              
247 5         20 my $len = unpack('i', $plen);
248              
249             $is_MSWin32
250             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
251 5 50       20 : read( $self->{p_sock}, $data, $len );
252              
253 5 100       77 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  1         18  
254             }
255              
256             sub recv2_nb {
257 5     5 1 16 my ( $self ) = @_;
258 5         7 my ( $plen, $data );
259              
260 5 50       16 local $/ = $LF if ( $/ ne $LF );
261 5         18 MCE::Util::_nonblocking( $self->{p_sock}, 1 );
262              
263             $is_MSWin32
264             ? sysread( $self->{p_sock}, $plen, 4 )
265 5 50       57 : read( $self->{p_sock}, $plen, 4 );
266              
267 5         21 MCE::Util::_nonblocking( $self->{p_sock}, 0 );
268              
269 5 50       9 my $len; $len = unpack('i', $plen) if $plen;
  5         19  
270 5 0       12 return wantarray ? () : undef unless $len;
    50          
271              
272             $is_MSWin32
273             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
274 5 50       19 : read( $self->{p_sock}, $data, $len );
275              
276 5 100       67 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  1         17  
277             }
278              
279             1;
280              
281             __END__