File Coverage

blib/lib/MCE/Channel/SimpleFast.pm
Criterion Covered Total %
statement 131 138 94.9
branch 69 122 56.5
condition 14 27 51.8
subroutine 15 15 100.0
pod 11 11 100.0
total 240 313 76.6


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Channel tuned for one producer and one consumer involving no locking.
4             ##
5             ###############################################################################
6              
7             package MCE::Channel::SimpleFast;
8              
9 1     1   1788 use strict;
  1         3  
  1         31  
10 1     1   5 use warnings;
  1         2  
  1         30  
11              
12 1     1   5 no warnings qw( uninitialized once );
  1         2  
  1         45  
13              
14             our $VERSION = '1.889';
15              
16 1     1   5 use base 'MCE::Channel';
  1         2  
  1         1845  
17              
18             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
19             my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
20              
21             sub new {
22 1     1 1 7 my ( $class, %obj ) = ( @_, impl => 'SimpleFast' );
23              
24 1         3 $obj{init_pid} = MCE::Channel::_pid();
25 1         7 MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' );
26              
27 1         6 return bless \%obj, $class;
28             }
29              
30             ###############################################################################
31             ## ----------------------------------------------------------------------------
32             ## Queue-like methods.
33             ##
34             ###############################################################################
35              
36             sub end {
37 1     1 1 17 my ( $self ) = @_;
38              
39 1 50       5 local $\ = undef if (defined $\);
40 1 50       4 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
41 1         3 print { $self->{p_sock} } pack('i', -1);
  1         9  
42              
43 1         8 $self->{ended} = 1;
44             }
45              
46             sub enqueue {
47 13     13 1 2522 my $self = shift;
48 13 100       40 return MCE::Channel::_ended('enqueue') if $self->{ended};
49              
50 12 50       30 local $\ = undef if (defined $\);
51 12 50       25 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
52              
53 12         29 while ( @_ ) {
54 22         53 my $data = ''.shift;
55 22         30 print { $self->{p_sock} } pack('i', length $data) . $data;
  22         316  
56             }
57              
58 12         42 return 1;
59             }
60              
61             sub dequeue {
62 8     8 1 28 my ( $self, $count ) = @_;
63 8 100 66     29 $count = 1 if ( !$count || $count < 1 );
64              
65 8 50       24 local $/ = $LF if ( $/ ne $LF );
66              
67 8 100       19 if ( $count == 1 ) {
68 7         9 my ( $plen, $data );
69 7 50       15 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
70              
71             $is_MSWin32
72             ? sysread( $self->{c_sock}, $plen, 4 )
73 7 50       59 : read( $self->{c_sock}, $plen, 4 );
74              
75 7         26 my $len = unpack('i', $plen);
76 7 50       16 if ( $len < 0 ) {
77 0         0 $self->end;
78 0 0       0 return wantarray ? () : undef;
79             }
80              
81 7 100       25 return '' unless $len;
82             $is_MSWin32
83             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
84 5 50       16 : read( $self->{c_sock}, $data, $len );
85              
86 5         30 $data;
87             }
88             else {
89 1         3 my ( $plen, @ret );
90 1 50       8 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
91              
92 1         5 while ( $count-- ) {
93 3         4 my $data;
94              
95             $is_MSWin32
96             ? sysread( $self->{c_sock}, $plen, 4 )
97 3 50       20 : read( $self->{c_sock}, $plen, 4 );
98              
99 3         9 my $len = unpack('i', $plen);
100 3 50       7 if ( $len < 0 ) {
101 0         0 $self->end;
102 0         0 last;
103             }
104              
105 3 50       6 push(@ret, ''), next unless $len;
106             $is_MSWin32
107             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
108 3 50       10 : read( $self->{c_sock}, $data, $len );
109              
110 3         8 push @ret, $data;
111             }
112              
113 1 50       8 wantarray ? @ret : $ret[-1];
114             }
115             }
116              
117             sub dequeue_nb {
118 10     10 1 32 my ( $self, $count ) = @_;
119 10 100 66     35 $count = 1 if ( !$count || $count < 1 );
120              
121 10         15 my ( $plen, @ret );
122 10 50       28 local $/ = $LF if ( $/ ne $LF );
123              
124 10         23 while ( $count-- ) {
125 12         15 my $data;
126 12         37 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
127              
128             $is_MSWin32
129             ? sysread( $self->{c_sock}, $plen, 4 )
130 12 50       94 : read( $self->{c_sock}, $plen, 4 );
131              
132 12         41 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
133              
134 12 50       16 my $len; $len = unpack('i', $plen) if $plen;
  12         43  
135 12 100 66     53 if ( !$len || $len < 0 ) {
136 2 50 33     15 $self->end if defined $len && $len < 0;
137 2 50 33     12 push @ret, '' if defined $len && $len == 0;
138 2         3 last;
139             }
140              
141             $is_MSWin32
142             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
143 10 50       57 : read( $self->{c_sock}, $data, $len );
144              
145 10         35 push @ret, $data;
146             }
147              
148 10 100       86 wantarray ? @ret : $ret[-1];
149             }
150              
151             ###############################################################################
152             ## ----------------------------------------------------------------------------
153             ## Methods for two-way communication; producer(s) to consumers.
154             ##
155             ###############################################################################
156              
157             sub send {
158 7     7 1 714 my $self = shift;
159 7 100       32 return MCE::Channel::_ended('send') if $self->{ended};
160              
161 6         53 my $data = ''.shift;
162              
163 6 50       22 local $\ = undef if (defined $\);
164 6 50       12 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
165 6         8 print { $self->{p_sock} } pack('i', length $data) . $data;
  6         112  
166              
167 6         28 return 1;
168             }
169              
170             sub recv {
171 3     3 1 8 my ( $self ) = @_;
172 3         6 my ( $plen, $data );
173              
174 3 50       14 local $/ = $LF if ( $/ ne $LF );
175 3 50       7 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
176              
177             $is_MSWin32
178             ? sysread( $self->{c_sock}, $plen, 4 )
179 3 50       63 : read( $self->{c_sock}, $plen, 4 );
180              
181 3         17 my $len = unpack('i', $plen);
182 3 50       9 if ( $len < 0 ) {
183 0         0 $self->end;
184 0 0       0 return wantarray ? () : undef;
185             }
186              
187 3 100       22 return '' unless $len;
188              
189             $is_MSWin32
190             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
191 1 50       7 : read( $self->{c_sock}, $data, $len );
192              
193 1         7 $data;
194             }
195              
196             sub recv_nb {
197 3     3 1 27 my ( $self ) = @_;
198 3         7 my ( $plen, $data );
199              
200 3 50       12 local $/ = $LF if ( $/ ne $LF );
201 3         14 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
202              
203             $is_MSWin32
204             ? sysread( $self->{c_sock}, $plen, 4 )
205 3 50       37 : read( $self->{c_sock}, $plen, 4 );
206              
207 3         14 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
208              
209 3 50       9 my $len; $len = unpack('i', $plen) if $plen;
  3         34  
210 3 100 66     33 if ( !$len || $len < 0 ) {
211 2 50 33     12 $self->end if defined $len && $len < 0;
212 2 50 33     17 return '' if defined $len && $len == 0;
213 0 0       0 return wantarray ? () : undef;
214             }
215              
216             $is_MSWin32
217             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
218 1 50       10 : read( $self->{c_sock}, $data, $len );
219              
220 1         7 $data;
221             }
222              
223             ###############################################################################
224             ## ----------------------------------------------------------------------------
225             ## Methods for two-way communication; consumers to producer(s).
226             ##
227             ###############################################################################
228              
229             sub send2 {
230 6     6 1 1363 my $self = shift;
231 6         14 my $data = ''.shift;
232              
233 6 50       18 local $\ = undef if (defined $\);
234 6 50       14 MCE::Util::_sock_ready_w( $self->{c_sock} ) if $is_MSWin32;
235 6         9 print { $self->{c_sock} } pack('i', length $data) . $data;
  6         99  
236              
237 6         28 return 1;
238             }
239              
240             sub recv2 {
241 3     3 1 9 my ( $self ) = @_;
242 3         5 my ( $plen, $data );
243              
244 3 50       10 local $/ = $LF if ( $/ ne $LF );
245 3 50       8 MCE::Util::_sock_ready( $self->{p_sock} ) if $is_MSWin32;
246              
247             $is_MSWin32
248             ? sysread( $self->{p_sock}, $plen, 4 )
249 3 50       61 : read( $self->{p_sock}, $plen, 4 );
250              
251 3         12 my $len = unpack('i', $plen);
252              
253 3 100       17 return '' unless $len;
254              
255             $is_MSWin32
256             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
257 1 50       5 : read( $self->{p_sock}, $data, $len );
258              
259 1         6 $data;
260             }
261              
262             sub recv2_nb {
263 3     3 1 19 my ( $self ) = @_;
264 3         6 my ( $plen, $data );
265              
266 3 50       11 local $/ = $LF if ( $/ ne $LF );
267 3         11 MCE::Util::_nonblocking( $self->{p_sock}, 1 );
268              
269             $is_MSWin32
270             ? sysread( $self->{p_sock}, $plen, 4 )
271 3 50       40 : read( $self->{p_sock}, $plen, 4 );
272              
273 3         20 MCE::Util::_nonblocking( $self->{p_sock}, 0 );
274              
275 3 50       5 my $len; $len = unpack('i', $plen) if $plen;
  3         13  
276              
277 3 100 66     28 return '' if defined $len && $len == 0;
278 1 0       4 return wantarray ? () : undef unless $len;
    50          
279              
280             $is_MSWin32
281             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
282 1 50       5 : read( $self->{p_sock}, $data, $len );
283              
284 1         6 $data;
285             }
286              
287             1;
288              
289             __END__