File Coverage

blib/lib/MCE/Channel/Mutex.pm
Criterion Covered Total %
statement 148 161 91.9
branch 72 100 72.0
condition 6 18 33.3
subroutine 18 19 94.7
pod 11 11 100.0
total 255 309 82.5


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Channel for producer(s) and many consumers supporting processes and threads.
4             ##
5             ###############################################################################
6              
7             package MCE::Channel::Mutex;
8              
9 11     11   3402 use strict;
  11         27  
  11         405  
10 11     11   53 use warnings;
  11         14  
  11         874  
11              
12 11     11   51 no warnings qw( uninitialized once );
  11         14  
  11         760  
13              
14             our $VERSION = '1.902';
15              
16 11     11   63 use base 'MCE::Channel';
  11         16  
  11         1545  
17 11     11   1163 use MCE::Mutex ();
  11         25  
  11         20192  
18              
19             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
20             my $freeze = MCE::Channel::_get_freeze();
21             my $thaw = MCE::Channel::_get_thaw();
22              
23             sub new {
24 11     11 1 65 my ( $class, %obj ) = ( @_, impl => 'Mutex' );
25              
26 11         63 $obj{init_pid} = MCE::Channel::_pid();
27 11         78 MCE::Util::_sock_pair( \%obj, 'p_sock', 'c_sock' );
28              
29             # locking for the consumer side of the channel
30 11         81 $obj{c_mutex} = MCE::Mutex->new( impl => 'Channel2' );
31              
32             # optionally, support many-producers writing and reading
33 11 100       44 $obj{p_mutex} = MCE::Mutex->new( impl => 'Channel2' ) if $obj{mp};
34              
35 11         47 bless \%obj, $class;
36              
37 11         48 MCE::Mutex::Channel::_save_for_global_cleanup($obj{c_mutex});
38 11 100       68 MCE::Mutex::Channel::_save_for_global_cleanup($obj{p_mutex}) if $obj{mp};
39              
40 11         59 return \%obj;
41             }
42              
43             END {
44 11 100   11   5336 MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
45             }
46              
47             ###############################################################################
48             ## ----------------------------------------------------------------------------
49             ## Queue-like methods.
50             ##
51             ###############################################################################
52              
53             sub end {
54 2     2 1 16 my ( $self ) = @_;
55              
56 2 50       11 local $\ = undef if (defined $\);
57 2         4 print { $self->{p_sock} } pack('i', -1);
  2         20  
58              
59 2         11 $self->{ended} = 1;
60             }
61              
62             sub enqueue {
63 30     30 1 5966 my $self = shift;
64 30 100       119 return MCE::Channel::_ended('enqueue') if $self->{ended};
65              
66 28 50       92 local $\ = undef if (defined $\);
67 28         61 my $p_mutex = $self->{p_mutex};
68 28 100       98 $p_mutex->lock2 if $p_mutex;
69              
70 28         77 while ( @_ ) {
71 48         347 my $data = $freeze->([ shift ]);
72 48         95 print { $self->{p_sock} } pack('i', length $data), $data;
  48         742  
73             }
74              
75 28 100       106 $p_mutex->unlock2 if $p_mutex;
76              
77 28         80 return 1;
78             }
79              
80             sub dequeue {
81 18     18 1 56 my ( $self, $count ) = @_;
82 18 100 66     78 $count = 1 if ( !$count || $count < 1 );
83              
84 18 100       64 if ( $count == 1 ) {
85 16         108 ( my $c_mutex = $self->{c_mutex} )->lock;
86 16         49 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
87              
88 16         49 my $len = unpack('i', $plen);
89 16 50       61 if ( $len < 0 ) {
90 0         0 $self->end, $c_mutex->unlock;
91 0 0       0 return wantarray ? () : undef;
92             }
93              
94 16         82 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
95 16         58 $c_mutex->unlock;
96              
97 16 50       367 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  0         0  
98             }
99             else {
100 2         6 my ( $plen, @ret );
101              
102 2         14 ( my $c_mutex = $self->{c_mutex} )->lock;
103              
104 2         9 while ( $count-- ) {
105 6         23 MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
106              
107 6         17 my $len = unpack('i', $plen);
108 6 50       17 if ( $len < 0 ) {
109 0         0 $self->end;
110 0         0 last;
111             }
112              
113 6         22 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
114 6         11 push @ret, @{ $thaw->($data) };
  6         54  
115             }
116              
117 2         23 $c_mutex->unlock;
118              
119 2 50       22 wantarray ? @ret : $ret[-1];
120             }
121             }
122              
123             sub dequeue_nb {
124 22     22 1 75 my ( $self, $count ) = @_;
125 22 100 66     98 $count = 1 if ( !$count || $count < 1 );
126              
127 22         41 my ( $plen, @ret );
128 22         108 ( my $c_mutex = $self->{c_mutex} )->lock;
129              
130 22         60 while ( $count-- ) {
131 26         93 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
132 26         79 MCE::Util::_sysread( $self->{c_sock}, $plen, 4 );
133 26         78 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
134              
135 26 50       45 my $len; $len = unpack('i', $plen) if $plen;
  26         98  
136 26 50 33     130 if ( !$len || $len < 0 ) {
137 0 0 0     0 $self->end if defined $len && $len < 0;
138 0         0 last;
139             }
140              
141 26         99 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
142 26         46 push @ret, @{ $thaw->($data) };
  26         539  
143             }
144              
145 22         92 $c_mutex->unlock;
146              
147 22 100       183 wantarray ? @ret : $ret[-1];
148             }
149              
150             ###############################################################################
151             ## ----------------------------------------------------------------------------
152             ## Methods for two-way communication; producer to consumer.
153             ##
154             ###############################################################################
155              
156             sub send {
157 31     31 1 2406 my $self = shift;
158 31 100       177 return MCE::Channel::_ended('send') if $self->{ended};
159              
160 29         422 my $data = $freeze->([ @_ ]);
161              
162 29 50       116 local $\ = undef if (defined $\);
163 29         71 my $p_mutex = $self->{p_mutex};
164 29 100       110 $p_mutex->lock2 if $p_mutex;
165              
166 29         44 print { $self->{p_sock} } pack('i', length $data), $data;
  29         628  
167 29 100       129 $p_mutex->unlock2 if $p_mutex;
168              
169 29         120 return 1;
170             }
171              
172             sub recv {
173 10     10 1 24 my ( $self ) = @_;
174              
175 10         58 ( my $c_mutex = $self->{c_mutex} )->lock;
176 10         45 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
177              
178 10         34 my $len = unpack('i', $plen);
179 10 50       30 if ( $len < 0 ) {
180 0         0 $self->end, $c_mutex->unlock;
181 0 0       0 return wantarray ? () : undef;
182             }
183              
184 10         42 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
185 10         110 $c_mutex->unlock;
186              
187 10 100       209 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  2         41  
188             }
189              
190             sub recv_nb {
191 10     10 1 50 my ( $self ) = @_;
192              
193 10         48 ( my $c_mutex = $self->{c_mutex} )->lock;
194 10         102 MCE::Util::_nonblocking( $self->{c_sock}, 1 );
195 10         33 MCE::Util::_sysread( $self->{c_sock}, my($plen), 4 );
196 10         129 MCE::Util::_nonblocking( $self->{c_sock}, 0 );
197              
198 10 50       17 my $len; $len = unpack('i', $plen) if $plen;
  10         45  
199 10 50 33     61 if ( !$len || $len < 0 ) {
200 0 0 0     0 $self->end if defined $len && $len < 0;
201 0         0 $c_mutex->unlock;
202 0 0       0 return wantarray ? () : undef;
203             }
204              
205 10         128 MCE::Channel::_read( $self->{c_sock}, my($data), $len );
206 10         41 $c_mutex->unlock;
207              
208 10 100       243 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  2         40  
209             }
210              
211             ###############################################################################
212             ## ----------------------------------------------------------------------------
213             ## Methods for two-way communication; consumer to producer.
214             ##
215             ###############################################################################
216              
217             sub send2 {
218 34     34 1 3986 my $self = shift;
219 34         769 my $data = $freeze->([ @_ ]);
220 34         242 my $sig;
221              
222             {
223 34         121 local $SIG{ABRT} = local $SIG{HUP} =
224             local $SIG{QUIT} = local $SIG{INT} =
225             local $SIG{TERM} = sub {
226 0     0   0 $sig = $_[0];
227 34         1909 };
228              
229 34 50       404 local $\ = undef if (defined $\);
230             $self->{c_mutex}->synchronize2( sub {
231 34     34   142 print { $self->{c_sock} } pack('i', length $data), $data;
  34         893  
232 34         946 });
233             }
234              
235 34 50       212 CORE::kill($sig, $$) if $sig;
236              
237 34         185 return 1;
238             }
239              
240             sub recv2 {
241 10     10 1 30 my ( $self ) = @_;
242 10         15 my ( $plen, $data );
243              
244 10 50       77 local $/ = $LF if ( $/ ne $LF );
245 10         22 my $p_mutex = $self->{p_mutex};
246 10 100       38 $p_mutex->lock if $p_mutex;
247              
248             ( $p_mutex )
249             ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
250 10 100       105 : read( $self->{p_sock}, $plen, 4 );
251              
252 10         35 my $len = unpack('i', $plen);
253              
254             ( $p_mutex )
255             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
256 10 100       64 : read( $self->{p_sock}, $data, $len );
257              
258 10 100       32 $p_mutex->unlock if $p_mutex;
259              
260 10 100       224 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  2         40  
261             }
262              
263             sub recv2_nb {
264 995     995 1 2639 my ( $self ) = @_;
265 995         1629 my ( $plen, $data );
266              
267 995 50       5602 local $/ = $LF if ( $/ ne $LF );
268 995         2803 my $p_mutex = $self->{p_mutex};
269 995 100       2592 $p_mutex->lock if $p_mutex;
270              
271 995         7316 MCE::Util::_nonblocking( $self->{p_sock}, 1 );
272              
273             ( $p_mutex )
274             ? MCE::Util::_sysread( $self->{p_sock}, $plen, 4 )
275 995 100       13498 : read( $self->{p_sock}, $plen, 4 );
276              
277 995         3213 MCE::Util::_nonblocking( $self->{p_sock}, 0 );
278              
279 995 100       1389 my $len; $len = unpack('i', $plen) if $plen;
  995         3811  
280 995 100       2545 if ( !$len ) {
281 908 50       2399 $p_mutex->unlock if $p_mutex;
282 908 50       5681 return wantarray ? () : undef;
283             }
284              
285             ( $p_mutex )
286             ? MCE::Channel::_read( $self->{p_sock}, $data, $len )
287 87 100       739 : read( $self->{p_sock}, $data, $len );
288              
289 87 100       438 $p_mutex->unlock if $p_mutex;
290              
291 87 100       2713 wantarray ? @{ $thaw->($data) } : ( $thaw->($data) )->[-1];
  2         54  
292             }
293              
294             1;
295              
296             __END__
297              
298             ###############################################################################
299             ## ----------------------------------------------------------------------------
300             ## Module usage.
301             ##
302             ###############################################################################
303              
304             =head1 NAME
305              
306             MCE::Channel::Mutex - Channel for producer(s) and many consumers
307              
308             =head1 VERSION
309              
310             This document describes MCE::Channel::Mutex version 1.902
311              
312             =head1 DESCRIPTION
313              
314             A channel class providing queue-like and two-way communication
315             for processes and threads. Locking is handled using MCE::Mutex.
316              
317             The API is described in L<MCE::Channel>.
318              
319             =over 3
320              
321             =item new
322              
323             use MCE::Channel;
324              
325             # The default is tuned for one producer and many consumers.
326             my $chnl_a = MCE::Channel->new( impl => 'Mutex' );
327              
328             # Specify the 'mp' option for safe use by two or more producers
329             # sending or receiving on the left side of the channel (i.e.
330             # ->enqueue/->send or ->recv2/->recv2_nb).
331              
332             my $chnl_b = MCE::Channel->new( impl => 'Mutex', mp => 1 );
333              
334             =back
335              
336             =head1 QUEUE-LIKE BEHAVIOR
337              
338             =over 3
339              
340             =item enqueue
341              
342             =item dequeue
343              
344             =item dequeue_nb
345              
346             =item end
347              
348             =back
349              
350             =head1 TWO-WAY IPC - PRODUCER TO CONSUMER
351              
352             =over 3
353              
354             =item send
355              
356             =item recv
357              
358             =item recv_nb
359              
360             =back
361              
362             =head1 TWO-WAY IPC - CONSUMER TO PRODUCER
363              
364             =over 3
365              
366             =item send2
367              
368             =item recv2
369              
370             =item recv2_nb
371              
372             =back
373              
374             =head1 AUTHOR
375              
376             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
377              
378             =cut
379