File Coverage

blib/lib/MCE/Channel/SimpleFast.pm
Criterion Covered Total %
statement 135 142 95.0
branch 70 124 56.4
condition 14 27 51.8
subroutine 15 15 100.0
pod 11 11 100.0
total 245 319 76.8


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Channel tuned for one producer and one consumer involving no locking.
4             ##
5             ###############################################################################
6              
7             package MCE::Channel::SimpleFast;
8              
9 1     1   1154 use strict;
  1         3  
  1         30  
10 1     1   4 use warnings;
  1         2  
  1         26  
11              
12 1     1   5 no warnings qw( uninitialized once );
  1         2  
  1         42  
13              
14             our $VERSION = '1.888';
15              
16 1     1   11 use base 'MCE::Channel';
  1         2  
  1         1898  
17              
18             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
19             my $is_MSWin32 = ( $^O eq 'MSWin32' ) ? 1 : 0;
20              
21             sub new {
22 1     1 1 6 my ( $class, %obj ) = ( @_, impl => 'SimpleFast' );
23              
24 1         6 $obj{init_pid} = MCE::Channel::_pid();
25 1         6 MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' );
26              
27 1         6 return bless \%obj, $class;
28             }
29              
30             ###############################################################################
31             ## ----------------------------------------------------------------------------
32             ## Queue-like methods.
33             ##
34             ###############################################################################
35              
36             sub end {
37 1     1 1 45 my ( $self ) = @_;
38              
39 1 50       5 local $\ = undef if (defined $\);
40 1 50       5 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
41 1         2 print { $self->{p_sock} } pack('i', -1);
  1         9  
42              
43 1         8 $self->{ended} = 1;
44             }
45              
46             sub enqueue {
47 13     13 1 1670 my $self = shift;
48 13 100       42 return MCE::Channel::_ended('enqueue') if $self->{ended};
49              
50 12 50       31 local $\ = undef if (defined $\);
51 12 50       26 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
52              
53 12         25 while ( @_ ) {
54 22         65 my $data = ''.shift;
55 22         31 print { $self->{p_sock} } pack('i', length $data) . $data;
  22         295  
56             }
57              
58 12         41 return 1;
59             }
60              
61             sub dequeue {
62 8     8 1 22 my ( $self, $count ) = @_;
63 8 100 66     27 $count = 1 if ( !$count || $count < 1 );
64              
65 8 50       24 local $/ = $LF if ( $/ ne $LF );
66              
67 8 100       17 if ( $count == 1 ) {
68 7         12 my ( $plen, $data );
69 7 50       12 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
70              
71             $is_MSWin32
72             ? sysread( $self->{c_sock}, $plen, 4 )
73 7 50       63 : read( $self->{c_sock}, $plen, 4 );
74              
75 7         26 my $len = unpack('i', $plen);
76 7 50       22 if ( $len < 0 ) {
77 0         0 $self->end;
78 0 0       0 return wantarray ? () : undef;
79             }
80              
81 7 100       26 return '' unless $len;
82             $is_MSWin32
83             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
84 5 50       15 : read( $self->{c_sock}, $data, $len );
85              
86 5         27 $data;
87             }
88             else {
89 1         3 my ( $plen, @ret );
90 1 50       3 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
91              
92 1         4 while ( $count-- ) {
93 3         5 my $data;
94              
95             $is_MSWin32
96             ? sysread( $self->{c_sock}, $plen, 4 )
97 3 50       19 : read( $self->{c_sock}, $plen, 4 );
98              
99 3         9 my $len = unpack('i', $plen);
100 3 50       8 if ( $len < 0 ) {
101 0         0 $self->end;
102 0         0 last;
103             }
104              
105 3 50       7 push(@ret, ''), next unless $len;
106             $is_MSWin32
107             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
108 3 50       7 : read( $self->{c_sock}, $data, $len );
109              
110 3         10 push @ret, $data;
111             }
112              
113 1 50       10 wantarray ? @ret : $ret[-1];
114             }
115             }
116              
117             sub dequeue_nb {
118 10     10 1 34 my ( $self, $count ) = @_;
119 10 100 66     37 $count = 1 if ( !$count || $count < 1 );
120              
121 10         16 my ( $plen, @ret );
122 10 50       28 local $/ = $LF if ( $/ ne $LF );
123              
124 10         22 while ( $count-- ) {
125 12         18 my $data;
126 12         50 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
127              
128             $is_MSWin32
129             ? sysread( $self->{c_sock}, $plen, 4 )
130 12 50       99 : read( $self->{c_sock}, $plen, 4 );
131              
132 12         42 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
133              
134 12 50       21 my $len; $len = unpack('i', $plen) if $plen;
  12         44  
135 12 100 66     49 if ( !$len || $len < 0 ) {
136 2 50 33     13 $self->end if defined $len && $len < 0;
137 2 50 33     12 push @ret, '' if defined $len && $len == 0;
138 2         5 last;
139             }
140              
141             $is_MSWin32
142             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
143 10 50       38 : read( $self->{c_sock}, $data, $len );
144              
145 10         30 push @ret, $data;
146             }
147              
148 10 100       78 wantarray ? @ret : $ret[-1];
149             }
150              
151             ###############################################################################
152             ## ----------------------------------------------------------------------------
153             ## Methods for two-way communication; producer(s) to consumers.
154             ##
155             ###############################################################################
156              
157             sub send {
158 7     7 1 566 my $self = shift;
159 7 100       36 return MCE::Channel::_ended('send') if $self->{ended};
160              
161 6         15 my $data = ''.shift;
162              
163 6 50       16 local $\ = undef if (defined $\);
164 6 50       14 MCE::Util::_sock_ready_w( $self->{p_sock} ) if $is_MSWin32;
165 6         10 print { $self->{p_sock} } pack('i', length $data) . $data;
  6         120  
166              
167 6         26 return 1;
168             }
169              
170             sub recv {
171 3     3 1 7 my ( $self ) = @_;
172 3         5 my ( $plen, $data );
173              
174 3 50       10 local $/ = $LF if ( $/ ne $LF );
175 3 50       8 MCE::Util::_sock_ready( $self->{c_sock} ) if $is_MSWin32;
176              
177             $is_MSWin32
178             ? sysread( $self->{c_sock}, $plen, 4 )
179 3 50       57 : read( $self->{c_sock}, $plen, 4 );
180              
181 3         15 my $len = unpack('i', $plen);
182 3 50       8 if ( $len < 0 ) {
183 0         0 $self->end;
184 0 0       0 return wantarray ? () : undef;
185             }
186              
187 3 100       17 return '' unless $len;
188              
189             $is_MSWin32
190             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
191 1 50       6 : read( $self->{c_sock}, $data, $len );
192              
193 1         6 $data;
194             }
195              
196             sub recv_nb {
197 3     3 1 11 my ( $self ) = @_;
198 3         6 my ( $plen, $data );
199              
200 3 50       11 local $/ = $LF if ( $/ ne $LF );
201 3         13 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
202              
203             $is_MSWin32
204             ? sysread( $self->{c_sock}, $plen, 4 )
205 3 50       35 : read( $self->{c_sock}, $plen, 4 );
206              
207 3         16 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
208              
209 3 50       4 my $len; $len = unpack('i', $plen) if $plen;
  3         47  
210 3 100 66     35 if ( !$len || $len < 0 ) {
211 2 50 33     13 $self->end if defined $len && $len < 0;
212 2 50 33     16 return '' if defined $len && $len == 0;
213 0 0       0 return wantarray ? () : undef;
214             }
215              
216             $is_MSWin32
217             ? MCE::Channel::_read( $self->{c_sock}, $data, $len )
218 1 50       9 : read( $self->{c_sock}, $data, $len );
219              
220 1         6 $data;
221             }
222              
223             ###############################################################################
224             ## ----------------------------------------------------------------------------
225             ## Methods for two-way communication; consumers to producer(s).
226             ##
227             ###############################################################################
228              
229             sub send2 {
230 6     6 1 1084 my $self = shift;
231 6         14 my $data = ''.shift;
232              
233 6 50       17 local $\ = undef if (defined $\);
234 6         10 local $MCE::Signal::SIG;
235              
236             {
237 6         9 local $MCE::Signal::IPC = 1;
  6         8  
238              
239 6 50       13 MCE::Util::_sock_ready_w( $self->{c_sock} ) if $is_MSWin32;
240 6         8 print { $self->{c_sock} } pack('i', length $data) . $data;
  6         101  
241             }
242              
243 6 50       22 CORE::kill($MCE::Signal::SIG, $$) if $MCE::Signal::SIG;
244              
245 6         21 return 1;
246             }
247              
248             sub recv2 {
249 3     3 1 11 my ( $self ) = @_;
250 3         5 my ( $plen, $data );
251              
252 3 50       8 local $/ = $LF if ( $/ ne $LF );
253 3 50       7 MCE::Util::_sock_ready( $self->{p_sock} ) if $is_MSWin32;
254              
255             $is_MSWin32
256             ? sysread( $self->{p_sock}, $plen, 4 )
257 3 50       52 : read( $self->{p_sock}, $plen, 4 );
258              
259 3         13 my $len = unpack('i', $plen);
260              
261 3 100       18 return '' unless $len;
262              
263             $is_MSWin32
264             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
265 1 50       4 : read( $self->{p_sock}, $data, $len );
266              
267 1         8 $data;
268             }
269              
270             sub recv2_nb {
271 3     3 1 12 my ( $self ) = @_;
272 3         6 my ( $plen, $data );
273              
274 3 50       11 local $/ = $LF if ( $/ ne $LF );
275 3         12 MCE::Util::_nonblocking( $self->{p_sock}, 1 );
276              
277             $is_MSWin32
278             ? sysread( $self->{p_sock}, $plen, 4 )
279 3 50       41 : read( $self->{p_sock}, $plen, 4 );
280              
281 3         14 MCE::Util::_nonblocking( $self->{p_sock}, 0 );
282              
283 3 50       5 my $len; $len = unpack('i', $plen) if $plen;
  3         15  
284              
285 3 100 66     25 return '' if defined $len && $len == 0;
286 1 0       4 return wantarray ? () : undef unless $len;
    50          
287              
288             $is_MSWin32
289             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
290 1 50       5 : read( $self->{p_sock}, $data, $len );
291              
292 1         6 $data;
293             }
294              
295             1;
296              
297             __END__