File Coverage

blib/lib/MCE/Shared/Condvar.pm
Criterion Covered Total %
statement 76 126 60.3
branch 10 52 19.2
condition 2 21 9.5
subroutine 21 43 48.8
pod 18 18 100.0
total 127 260 48.8


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Condvar helper class.
4             ##
5             ###############################################################################
6              
7             package MCE::Shared::Condvar;
8              
9 3     3   2967 use strict;
  3         6  
  3         87  
10 3     3   18 use warnings;
  3         6  
  3         69  
11              
12 3     3   51 use 5.010001;
  3         9  
13              
14 3     3   15 no warnings qw( threads recursion uninitialized numeric );
  3         6  
  3         174  
15              
16             our $VERSION = '1.886';
17              
18 3     3   21 use MCE::Shared::Base ();
  3         3  
  3         63  
19 3     3   18 use MCE::Util ();
  3         3  
  3         42  
20 3     3   9 use MCE::Mutex ();
  3         6  
  3         153  
21              
22             use overload (
23 3         24 q("") => \&MCE::Shared::Base::_stringify,
24             q(0+) => \&MCE::Shared::Base::_numify,
25             fallback => 1
26 3     3   18 );
  3         27  
27              
28             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
29             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
30             my $_reset_flg = 1;
31              
32             sub CLONE {
33 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
34             }
35              
36             sub DESTROY {
37 3     3   12 my ($_cv) = @_;
38 3 50       15 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
39              
40 3 50       18 if ($_cv->{_init_pid} eq $_pid) {
41 3         15 MCE::Util::_destroy_socks($_cv, qw(_cw_sock _cr_sock));
42             }
43              
44 3         333 return;
45             }
46              
47             ###############################################################################
48             ## ----------------------------------------------------------------------------
49             ## Public methods.
50             ##
51             ###############################################################################
52              
53             sub new {
54 3     3 1 12 my ($_class, $_cv) = (shift, {});
55              
56 3 50       18 $_cv->{_init_pid} = $_tid ? $$ .'.'. $_tid : $$;
57 3   50     18 $_cv->{_value} = shift || 0;
58 3         6 $_cv->{_count} = 0;
59              
60 3         18 MCE::Util::_sock_pair($_cv, qw(_cr_sock _cw_sock), undef, 1);
61              
62             MCE::Shared::Object::_reset(), $_reset_flg = ''
63 3 50 33     789 if $_reset_flg && $INC{'MCE/Shared/Server.pm'};
64              
65 3         21 bless $_cv, $_class;
66             }
67              
68 0     0 1 0 sub get { $_[0]->{_value} }
69 0     0 1 0 sub set { $_[0]->{_value} = $_[1] }
70              
71             # The following methods applies to shared-context only and handled by
72             # MCE::Shared::Object.
73              
74       0 1   sub lock { }
75       0 1   sub unlock { }
76              
77       0 1   sub broadcast { }
78       0 1   sub signal { }
79       0 1   sub timedwait { }
80       0 1   sub wait { }
81              
82             ###############################################################################
83             ## ----------------------------------------------------------------------------
84             ## Sugar API, mostly resembles https://redis.io/commands#string primitives.
85             ##
86             ###############################################################################
87              
88             # append ( string )
89              
90             sub append {
91 0   0 0 1 0 length( $_[0]->{_value} .= $_[1] // '' );
92             }
93              
94             # decr
95             # decrby ( number )
96             # incr
97             # incrby ( number )
98             # getdecr
99             # getincr
100              
101 0     0 1 0 sub decr { --$_[0]->{_value} }
102 0   0 0 1 0 sub decrby { $_[0]->{_value} -= $_[1] || 0 }
103 0     0 1 0 sub incr { ++$_[0]->{_value} }
104 0   0 0 1 0 sub incrby { $_[0]->{_value} += $_[1] || 0 }
105 0   0 0 1 0 sub getdecr { $_[0]->{_value}-- // 0 }
106 0   0 0 1 0 sub getincr { $_[0]->{_value}++ // 0 }
107              
108             # getset ( value )
109              
110             sub getset {
111 0     0 1 0 my $old = $_[0]->{_value};
112 0         0 $_[0]->{_value} = $_[1];
113              
114 0         0 $old;
115             }
116              
117             # len ( )
118              
119             sub len {
120 0     0 1 0 length $_[0]->{_value};
121             }
122              
123             ###############################################################################
124             ## ----------------------------------------------------------------------------
125             ## Server functions.
126             ##
127             ###############################################################################
128              
129             {
130 3     3   2406 use bytes;
  3         6  
  3         15  
131              
132             use constant {
133 3         1743 SHR_O_CVB => 'O~CVB', # Condvar broadcast
134             SHR_O_CVS => 'O~CVS', # Condvar signal
135             SHR_O_CVT => 'O~CVT', # Condvar timedwait
136             SHR_O_CVW => 'O~CVW', # Condvar wait
137 3     3   171 };
  3         6  
138              
139             my ( $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_obj, $_id );
140              
141             my %_output_function = (
142              
143             SHR_O_CVB.$LF => sub { # Condvar broadcast
144             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
145             chomp($_id = <$_DAU_R_SOCK>);
146              
147             my $_var = $_obj->{ $_id } || do {
148             print {$_DAU_R_SOCK} $LF;
149             return;
150             };
151             for my $_i (1 .. $_var->{_count}) {
152             syswrite($_var->{_cw_sock}, $LF);
153             }
154              
155             $_var->{_count} = 0;
156             print {$_DAU_R_SOCK} $LF;
157              
158             return;
159             },
160              
161             SHR_O_CVS.$LF => sub { # Condvar signal
162             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
163             chomp($_id = <$_DAU_R_SOCK>);
164              
165             my $_var = $_obj->{ $_id } || do {
166             print {$_DAU_R_SOCK} $LF;
167             return;
168             };
169             if ( $_var->{_count} >= 0 ) {
170             syswrite($_var->{_cw_sock}, $LF);
171             $_var->{_count} -= 1;
172             }
173              
174             print {$_DAU_R_SOCK} $LF;
175              
176             return;
177             },
178              
179             SHR_O_CVT.$LF => sub { # Condvar timedwait
180             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
181             chomp($_id = <$_DAU_R_SOCK>);
182              
183             my $_var = $_obj->{ $_id } || do {
184             print {$_DAU_R_SOCK} $LF;
185             return;
186             };
187              
188             $_var->{_count} -= 1;
189             print {$_DAU_R_SOCK} $LF;
190              
191             return;
192             },
193              
194             SHR_O_CVW.$LF => sub { # Condvar wait
195             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
196             chomp($_id = <$_DAU_R_SOCK>);
197              
198             my $_var = $_obj->{ $_id } || do {
199             print {$_DAU_R_SOCK} $LF;
200             return;
201             };
202              
203             $_var->{_count} += 1;
204             print {$_DAU_R_SOCK} $LF;
205              
206             return;
207             },
208              
209             );
210              
211             sub _init_mgr {
212 0     0   0 my $_function;
213 0         0 ( $_DAU_R_SOCK_REF, $_obj, $_function ) = @_;
214              
215 0         0 for my $key ( keys %_output_function ) {
216 0 0       0 last if exists($_function->{$key});
217 0         0 $_function->{$key} = $_output_function{$key};
218             }
219              
220 0         0 return;
221             }
222             }
223              
224             ###############################################################################
225             ## ----------------------------------------------------------------------------
226             ## Object package.
227             ##
228             ###############################################################################
229              
230             ## Items below are folded into MCE::Shared::Object.
231              
232             package # hide from rpm
233             MCE::Shared::Object;
234              
235 3     3   24 use strict;
  3         6  
  3         90  
236 3     3   18 use warnings;
  3         6  
  3         90  
237              
238 3     3   12 no warnings qw( threads recursion uninitialized numeric once );
  3         6  
  3         144  
239              
240 3     3   15 use Time::HiRes qw( alarm sleep );
  3         6  
  3         15  
241 3     3   333 use bytes;
  3         9  
  3         12  
242              
243 3     3   99 no overloading;
  3         6  
  3         2409  
244              
245             my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
246              
247             my ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, $_obj);
248              
249             sub _init_condvar {
250 8     8   50 ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, $_obj) = @_;
251              
252 8         39 return;
253             }
254              
255             # broadcast ( floating_seconds )
256             # broadcast ( )
257              
258             sub broadcast {
259 0     0   0 my $_id = $_[0]->[0];
260 0 0       0 return unless ( my $_CV = $_obj->{ $_id } );
261 0 0       0 return unless ( exists $_CV->{_cr_sock} );
262              
263 0 0       0 sleep($_[1]) if defined $_[1];
264              
265 0         0 _req1('O~CVB', $_id.$LF);
266 0         0 $_[0]->[6]->unlock();
267              
268 0         0 sleep(0);
269             }
270              
271             # signal ( floating_seconds )
272             # signal ( )
273              
274             sub signal {
275 1     1   97 my $_id = $_[0]->[0];
276 1 50       27 return unless ( my $_CV = $_obj->{ $_id } );
277 1 50       8 return unless ( exists $_CV->{_cr_sock} );
278              
279 1 50       5 sleep($_[1]) if defined $_[1];
280              
281 1         44 _req1('O~CVS', $_id.$LF);
282 1         18 $_[0]->[6]->unlock();
283              
284 1         152 sleep(0);
285             }
286              
287             # timedwait ( floating_seconds )
288              
289             sub timedwait {
290 0     0   0 my $_id = $_[0]->[0];
291 0         0 my $_timeout = $_[1];
292              
293 0 0       0 return unless ( my $_CV = $_obj->{ $_id } );
294 0 0       0 return unless ( exists $_CV->{_cr_sock} );
295 0 0       0 return $_[0]->wait() unless $_timeout;
296              
297 0 0 0     0 _croak('Condvar: timedwait (timeout) is not valid')
298             if (!looks_like_number($_timeout) || $_timeout < 0);
299              
300 0         0 _req1('O~CVW', $_id.$LF);
301 0         0 $_[0]->[6]->unlock();
302              
303 0 0       0 $_timeout = 0.0003 if $_timeout < 0.0003;
304              
305 0         0 local $@; eval {
  0         0  
306 0     0   0 local $SIG{ALRM} = sub { alarm 0; die "alarm clock restart\n" };
  0         0  
  0         0  
307 0 0       0 alarm $_timeout unless $_is_MSWin32;
308              
309             die "alarm clock restart\n"
310 0 0 0     0 if $_is_MSWin32 && MCE::Util::_sock_ready($_CV->{_cr_sock}, $_timeout);
311              
312             (!$_is_MSWin32)
313             ? (MCE::Util::_sysread($_CV->{_cr_sock}, my($_b1), 1), alarm(0))
314 0 0       0 : (MCE::Util::_sysread($_CV->{_cr_sock}, my($_b2), 1));
315             };
316              
317 0 0       0 alarm 0 unless $_is_MSWin32;
318              
319 0 0       0 if ($@) {
320 0 0       0 chomp($@), _croak($@) unless $@ eq "alarm clock restart\n";
321 0         0 _req1('O~CVT', $_id.$LF);
322              
323 0         0 return '';
324             }
325              
326 0         0 return 1;
327             }
328              
329             # wait ( )
330              
331             sub wait {
332 2     2   274 my $_id = $_[0]->[0];
333 2 50       54 return unless ( my $_CV = $_obj->{ $_id } );
334 2 50       8 return unless ( exists $_CV->{_cr_sock} );
335              
336 2         58 _req1('O~CVW', $_id.$LF);
337 2         38 $_[0]->[6]->unlock();
338              
339 2 50       64 MCE::Util::_sock_ready($_CV->{_cr_sock}) if $_is_MSWin32;
340 2         30 MCE::Util::_sysread($_CV->{_cr_sock}, my($_b), 1);
341              
342 2         4004442 return 1;
343             }
344              
345             1;
346              
347             __END__