File Coverage

blib/lib/MCE/Channel/MutexFast.pm
Criterion Covered Total %
statement 150 159 94.3
branch 82 110 74.5
condition 13 27 48.1
subroutine 18 19 94.7
pod 11 11 100.0
total 274 326 84.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Channel for producer(s) and many consumers supporting processes and threads.
4             ##
5             ###############################################################################
6              
7             package MCE::Channel::MutexFast;
8              
9 10     10   97597 use strict;
  10         255  
  10         1476  
10 10     10   90 use warnings;
  10         28  
  10         11357  
11              
12 10     10   116 no warnings qw( uninitialized once );
  10         43  
  10         1543  
13              
14             our $VERSION = '1.902';
15              
16 10     10   91 use base 'MCE::Channel';
  10         36  
  10         9678  
17 10     10   1053 use MCE::Mutex ();
  10         29  
  10         19056  
18              
19             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
20              
21             sub new {
22 10     10 1 100 my ( $class, %obj ) = ( @_, impl => 'MutexFast' );
23              
24 10         42 $obj{init_pid} = MCE::Channel::_pid();
25 10         143 MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' );
26              
27             # locking for the consumer side of the channel
28 10         71 $obj{c_mutex} = MCE::Mutex->new( impl => 'Channel2' );
29              
30             # optionally, support many-producers writing and reading
31 10 100       40 $obj{p_mutex} = MCE::Mutex->new( impl => 'Channel2' ) if $obj{mp};
32              
33 10         66 bless \%obj, $class;
34              
35 10         57 MCE::Mutex::Channel::_save_for_global_cleanup($obj{c_mutex});
36 10 100       33 MCE::Mutex::Channel::_save_for_global_cleanup($obj{p_mutex}) if $obj{mp};
37              
38 10         64 return \%obj;
39             }
40              
41             END {
42 10 100   10   1283 MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
43             }
44              
45             ###############################################################################
46             ## ----------------------------------------------------------------------------
47             ## Queue-like methods.
48             ##
49             ###############################################################################
50              
51             sub end {
52 2     2 1 19 my ( $self ) = @_;
53              
54 2 50       12 local $\ = undef if (defined $\);
55 2         5 print { $self->{p_sock} } pack('i', -1);
  2         18  
56              
57 2         8 $self->{ended} = 1;
58             }
59              
60             sub enqueue {
61 26     26 1 5496 my $self = shift;
62 26 100       102 return MCE::Channel::_ended('enqueue') if $self->{ended};
63              
64 24 50       73 local $\ = undef if (defined $\);
65 24         49 my $p_mutex = $self->{p_mutex};
66 24 100       121 $p_mutex->lock2 if $p_mutex;
67              
68 24         60 while ( @_ ) {
69 44         78 my $data = ''.shift;
70 44         85 print { $self->{p_sock} } pack('i', length $data), $data;
  44         581  
71             }
72              
73 24 100       86 $p_mutex->unlock2 if $p_mutex;
74              
75 24         128 return 1;
76             }
77              
78             sub dequeue {
79 16     16 1 55 my ( $self, $count ) = @_;
80 16 100 66     68 $count = 1 if ( !$count || $count < 1 );
81              
82 16 100       40 if ( $count == 1 ) {
83 14         84 ( my $c_mutex = $self->{c_mutex} )->lock;
84 14         47 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
85              
86 14         51 my $len = unpack('i', $plen);
87 14 50       40 if ( $len < 0 ) {
88 0         0 $self->end, $c_mutex->unlock;
89 0 0       0 return wantarray ? () : undef;
90             }
91              
92 14 100       72 MCE::Channel::_read( $self->{c_sock}, my($data), $len ) if $len;
93 14         53 $c_mutex->unlock;
94              
95 14 100       112 $len ? $data : '';
96             }
97             else {
98 2         10 my ( $plen, @ret );
99              
100 2         10 ( my $c_mutex = $self->{c_mutex} )->lock;
101              
102 2         10 while ( $count-- ) {
103 6         18 MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
104              
105 6         12 my $len = unpack('i', $plen);
106 6 50       15 if ( $len < 0 ) {
107 0         0 $self->end;
108 0         0 last;
109             }
110              
111 6 50       13 push(@ret, ''), next unless $len;
112 6         19 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
113 6         18 push @ret, $data;
114             }
115              
116 2         9 $c_mutex->unlock;
117              
118 2 50       20 wantarray ? @ret : $ret[-1];
119             }
120             }
121              
122             sub dequeue_nb {
123 20     20 1 69 my ( $self, $count ) = @_;
124 20 100 66     74 $count = 1 if ( !$count || $count < 1 );
125              
126 20         36 my ( $plen, @ret );
127 20         93 ( my $c_mutex = $self->{c_mutex} )->lock;
128              
129 20         47 while ( $count-- ) {
130 24         77 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
131 24         66 MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
132 24         64 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
133              
134 24 50       32 my $len; $len = unpack('i', $plen) if $plen;
  24         86  
135 24 100 66     121 if ( !$len || $len < 0 ) {
136 4 50 33     51 $self->end if defined $len && $len < 0;
137 4 50 33     25 push @ret, '' if defined $len && $len == 0;
138 4         13 last;
139             }
140              
141 20         76 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
142 20         68 push @ret, $data;
143             }
144              
145 20         67 $c_mutex->unlock;
146              
147 20 100       151 wantarray ? @ret : $ret[-1];
148             }
149              
150             ###############################################################################
151             ## ----------------------------------------------------------------------------
152             ## Methods for two-way communication; producer to consumer.
153             ##
154             ###############################################################################
155              
156             sub send {
157 29     29 1 4818 my $self = shift;
158 29 100       133 return MCE::Channel::_ended('send') if $self->{ended};
159              
160 27         48 my $data = ''.shift;
161              
162 27 50       68 local $\ = undef if (defined $\);
163 27         99 my $p_mutex = $self->{p_mutex};
164 27 100       86 $p_mutex->lock2 if $p_mutex;
165              
166 27         39 print { $self->{p_sock} } pack('i', length $data), $data;
  27         544  
167 27 100       86 $p_mutex->unlock2 if $p_mutex;
168              
169 27         58 return 1;
170             }
171              
172             sub recv {
173 9     9 1 124 my ( $self ) = @_;
174              
175 9         263 ( my $c_mutex = $self->{c_mutex} )->lock;
176 9         103 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
177              
178 9         47 my $len = unpack('i', $plen);
179 9 50       57 if ( $len < 0 ) {
180 0         0 $self->end, $c_mutex->unlock;
181 0 0       0 return wantarray ? () : undef;
182             }
183              
184 9 100       42 MCE::Channel::_read( $self->{c_sock}, my($data), $len ) if $len;
185 9         152 $c_mutex->unlock;
186              
187 9 100       91 $len ? $data : '';
188             }
189              
190             sub recv_nb {
191 6     6 1 28 my ( $self ) = @_;
192              
193 6         33 ( my $c_mutex = $self->{c_mutex} )->lock;
194 6         26 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
195 6         20 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
196 6         21 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
197              
198 6 50       9 my $len; $len = unpack('i', $plen) if $plen;
  6         30  
199 6 100 66     65 if ( !$len || $len < 0 ) {
200 4 50 33     28 $self->end if defined $len && $len < 0;
201 4         19 $c_mutex->unlock;
202 4 50 33     55 return '' if defined $len && $len == 0;
203 0 0       0 return wantarray ? () : undef;
204             }
205              
206 2         14 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
207 2         8 $c_mutex->unlock;
208              
209 2         9 $data;
210             }
211              
212             ###############################################################################
213             ## ----------------------------------------------------------------------------
214             ## Methods for two-way communication; consumer to producer.
215             ##
216             ###############################################################################
217              
218             sub send2 {
219 12     12 1 4337 my $self = shift;
220 12         32 my $data = ''.shift;
221 12         22 my $sig;
222              
223             {
224 12         18 local $SIG{ABRT} = local $SIG{HUP} =
225             local $SIG{QUIT} = local $SIG{INT} =
226             local $SIG{TERM} = sub {
227 0     0   0 $sig = $_[0];
228 12         517 };
229              
230 12 50       65 local $\ = undef if (defined $\);
231             $self->{c_mutex}->synchronize2( sub {
232 12     12   23 print { $self->{c_sock} } pack('i', length $data), $data;
  12         254  
233 12         157 });
234             }
235              
236 12 50       61 CORE::kill($sig, $$) if $sig;
237              
238 12         40 return 1;
239             }
240              
241             sub recv2 {
242 6     6 1 99 my ( $self ) = @_;
243 6         12 my ( $plen, $data );
244              
245 6 50       27 local $/ = $LF if ( $/ ne $LF );
246 6         58 my $p_mutex = $self->{p_mutex};
247 6 100       29 $p_mutex->lock if $p_mutex;
248              
249             ( $p_mutex )
250             ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
251 6 100       82 : read( $self->{p_sock}, $plen, 4 );
252              
253 6         22 my $len = unpack('i', $plen);
254              
255 6 100       18 if ( $len ) {
256             ( $p_mutex )
257             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
258 2 100       11 : read( $self->{p_sock}, $data, $len );
259             }
260              
261 6 100       23 $p_mutex->unlock if $p_mutex;
262              
263 6 100       83 $len ? $data : '';
264             }
265              
266             sub recv2_nb {
267 6     6 1 100 my ( $self ) = @_;
268 6         16 my ( $plen, $data );
269              
270 6 50       27 local $/ = $LF if ( $/ ne $LF );
271 6         15 my $p_mutex = $self->{p_mutex};
272 6 100       25 $p_mutex->lock if $p_mutex;
273              
274 6         27 MCE::Util::_nonblocking( $self->{p_sock}, 1 );
275              
276             ( $p_mutex )
277             ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
278 6 100       57 : read( $self->{p_sock}, $plen, 4 );
279              
280 6         22 MCE::Util::_nonblocking( $self->{p_sock}, 0 );
281              
282 6 50       11 my $len; $len = unpack('i', $plen) if $plen;
  6         30  
283 6 100       17 if ( !$len ) {
284 4 100       20 $p_mutex->unlock if $p_mutex;
285 4 50 33     43 return '' if defined $len && $len == 0;
286 0 0       0 return wantarray ? () : undef;
287             }
288              
289             ( $p_mutex )
290             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
291 2 100       14 : read( $self->{p_sock}, $data, $len );
292              
293 2 100       11 $p_mutex->unlock if $p_mutex;
294              
295 2         14 $data;
296             }
297              
298             1;
299              
300             __END__
301              
302             ###############################################################################
303             ## ----------------------------------------------------------------------------
304             ## Module usage.
305             ##
306             ###############################################################################
307              
308             =head1 NAME
309              
310             MCE::Channel::MutexFast - Fast channel for producer(s) and many consumers
311              
312             =head1 VERSION
313              
314             This document describes MCE::Channel::MutexFast version 1.902
315              
316             =head1 DESCRIPTION
317              
318             A channel class providing queue-like and two-way communication
319             for processes and threads. Locking is handled using MCE::Mutex.
320              
321             This is similar to L<MCE::Channel::Mutex> but optimized for
322             non-Unicode strings only. The main difference is that this module
323             lacks freeze-thaw serialization. Non-string arguments become
324             stringified; i.e. numbers and undef.
325              
326             The API is described in L<MCE::Channel> with the sole difference
327             being C<send> and C<send2> handle one argument.
328              
329             Current module available since MCE 1.877.
330              
331             =over 3
332              
333             =item new
334              
335             use MCE::Channel;
336              
337             # The default is tuned for one producer and many consumers.
338             my $chnl_a = MCE::Channel->new( impl => 'MutexFast' );
339              
340             # Specify the 'mp' option for safe use by two or more producers
341             # sending or receiving on the left side of the channel (i.e.
342             # ->enqueue/->send or ->recv2/->recv2_nb).
343              
344             my $chnl_b = MCE::Channel->new( impl => 'MutexFast', mp => 1 );
345              
346             =back
347              
348             =head1 QUEUE-LIKE BEHAVIOR
349              
350             =over 3
351              
352             =item enqueue
353              
354             =item dequeue
355              
356             =item dequeue_nb
357              
358             =item end
359              
360             =back
361              
362             =head1 TWO-WAY IPC - PRODUCER TO CONSUMER
363              
364             =over 3
365              
366             =item send
367              
368             =item recv
369              
370             =item recv_nb
371              
372             =back
373              
374             =head1 TWO-WAY IPC - CONSUMER TO PRODUCER
375              
376             =over 3
377              
378             =item send2
379              
380             =item recv2
381              
382             =item recv2_nb
383              
384             =back
385              
386             =head1 AUTHOR
387              
388             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
389              
390             =cut
391