File Coverage

blib/lib/MCE/Shared/Condvar.pm
Criterion Covered Total %
statement 101 126 80.1
branch 23 52 44.2
condition 4 21 19.0
subroutine 23 43 53.4
pod 18 18 100.0
total 169 260 65.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Condvar helper class.
4             ##
5             ###############################################################################
6              
7             package MCE::Shared::Condvar;
8              
9 6     6   5958 use strict;
  6         12  
  6         174  
10 6     6   30 use warnings;
  6         12  
  6         204  
11              
12 6     6   174 use 5.010001;
  6         24  
13              
14 6     6   24 no warnings qw( threads recursion uninitialized numeric );
  6         12  
  6         276  
15              
16             our $VERSION = '1.881';
17              
18 6     6   102 use MCE::Shared::Base ();
  6         12  
  6         102  
19 6     6   30 use MCE::Util ();
  6         12  
  6         120  
20 6     6   66 use MCE::Mutex ();
  6         18  
  6         288  
21              
22             use overload (
23 6         36 q("") => \&MCE::Shared::Base::_stringify,
24             q(0+) => \&MCE::Shared::Base::_numify,
25             fallback => 1
26 6     6   72 );
  6         18  
27              
28             my $LF = "\012"; Internals::SvREADONLY($LF, 1);
29             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
30             my $_reset_flg = 1;
31              
32             sub CLONE {
33 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
34             }
35              
36             sub DESTROY {
37 6     6   24 my ($_cv) = @_;
38 6 50       36 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
39              
40 6 50       36 if ($_cv->{_init_pid} eq $_pid) {
41 6         66 MCE::Util::_destroy_socks($_cv, qw(_cw_sock _cr_sock));
42             }
43              
44 6         606 return;
45             }
46              
47             ###############################################################################
48             ## ----------------------------------------------------------------------------
49             ## Public methods.
50             ##
51             ###############################################################################
52              
53             sub new {
54 6     6 1 36 my ($_class, $_cv) = (shift, {});
55              
56 6 50       54 $_cv->{_init_pid} = $_tid ? $$ .'.'. $_tid : $$;
57 6   50     36 $_cv->{_value} = shift || 0;
58 6         24 $_cv->{_count} = 0;
59              
60 6         42 MCE::Util::_sock_pair($_cv, qw(_cr_sock _cw_sock), undef, 1);
61              
62             MCE::Shared::Object::_reset(), $_reset_flg = ''
63 6 50 33     1614 if $_reset_flg && $INC{'MCE/Shared/Server.pm'};
64              
65 6         48 bless $_cv, $_class;
66             }
67              
68 0     0 1 0 sub get { $_[0]->{_value} }
69 0     0 1 0 sub set { $_[0]->{_value} = $_[1] }
70              
71             # The following methods applies to shared-context only and handled by
72             # MCE::Shared::Object.
73              
74       0 1   sub lock { }
75       0 1   sub unlock { }
76              
77       0 1   sub broadcast { }
78       0 1   sub signal { }
79       0 1   sub timedwait { }
80       0 1   sub wait { }
81              
82             ###############################################################################
83             ## ----------------------------------------------------------------------------
84             ## Sugar API, mostly resembles https://redis.io/commands#string primitives.
85             ##
86             ###############################################################################
87              
88             # append ( string )
89              
90             sub append {
91 0   0 0 1 0 length( $_[0]->{_value} .= $_[1] // '' );
92             }
93              
94             # decr
95             # decrby ( number )
96             # incr
97             # incrby ( number )
98             # getdecr
99             # getincr
100              
101 0     0 1 0 sub decr { --$_[0]->{_value} }
102 0   0 0 1 0 sub decrby { $_[0]->{_value} -= $_[1] || 0 }
103 0     0 1 0 sub incr { ++$_[0]->{_value} }
104 0   0 0 1 0 sub incrby { $_[0]->{_value} += $_[1] || 0 }
105 0   0 0 1 0 sub getdecr { $_[0]->{_value}-- // 0 }
106 0   0 0 1 0 sub getincr { $_[0]->{_value}++ // 0 }
107              
108             # getset ( value )
109              
110             sub getset {
111 0     0 1 0 my $old = $_[0]->{_value};
112 0         0 $_[0]->{_value} = $_[1];
113              
114 0         0 $old;
115             }
116              
117             # len ( )
118              
119             sub len {
120 0     0 1 0 length $_[0]->{_value};
121             }
122              
123             ###############################################################################
124             ## ----------------------------------------------------------------------------
125             ## Server functions.
126             ##
127             ###############################################################################
128              
129             {
130 6     6   4446 use bytes;
  6         78  
  6         30  
131              
132             use constant {
133 6         3414 SHR_O_CVB => 'O~CVB', # Condvar broadcast
134             SHR_O_CVS => 'O~CVS', # Condvar signal
135             SHR_O_CVT => 'O~CVT', # Condvar timedwait
136             SHR_O_CVW => 'O~CVW', # Condvar wait
137 6     6   270 };
  6         12  
138              
139             my ( $_DAU_R_SOCK_REF, $_DAU_R_SOCK, $_obj, $_id );
140              
141             my %_output_function = (
142              
143             SHR_O_CVB.$LF => sub { # Condvar broadcast
144             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
145             chomp($_id = <$_DAU_R_SOCK>);
146              
147             my $_var = $_obj->{ $_id } || do {
148             print {$_DAU_R_SOCK} $LF;
149             };
150             for my $_i (1 .. $_var->{_count}) {
151             syswrite($_var->{_cw_sock}, $LF);
152             }
153              
154             $_var->{_count} = 0;
155             print {$_DAU_R_SOCK} $LF;
156              
157             return;
158             },
159              
160             SHR_O_CVS.$LF => sub { # Condvar signal
161             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
162             chomp($_id = <$_DAU_R_SOCK>);
163              
164             my $_var = $_obj->{ $_id } || do {
165             print {$_DAU_R_SOCK} $LF;
166             };
167             if ( $_var->{_count} >= 0 ) {
168             syswrite($_var->{_cw_sock}, $LF);
169             $_var->{_count} -= 1;
170             }
171              
172             print {$_DAU_R_SOCK} $LF;
173              
174             return;
175             },
176              
177             SHR_O_CVT.$LF => sub { # Condvar timedwait
178             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
179             chomp($_id = <$_DAU_R_SOCK>);
180              
181             my $_var = $_obj->{ $_id } || do {
182             print {$_DAU_R_SOCK} $LF;
183             };
184              
185             $_var->{_count} -= 1;
186             print {$_DAU_R_SOCK} $LF;
187              
188             return;
189             },
190              
191             SHR_O_CVW.$LF => sub { # Condvar wait
192             $_DAU_R_SOCK = ${ $_DAU_R_SOCK_REF };
193             chomp($_id = <$_DAU_R_SOCK>);
194              
195             my $_var = $_obj->{ $_id } || do {
196             print {$_DAU_R_SOCK} $LF;
197             };
198              
199             $_var->{_count} += 1;
200             print {$_DAU_R_SOCK} $LF;
201              
202             return;
203             },
204              
205             );
206              
207             sub _init_mgr {
208 0     0   0 my $_function;
209 0         0 ( $_DAU_R_SOCK_REF, $_obj, $_function ) = @_;
210              
211 0         0 for my $key ( keys %_output_function ) {
212 0 0       0 last if exists($_function->{$key});
213 0         0 $_function->{$key} = $_output_function{$key};
214             }
215              
216 0         0 return;
217             }
218             }
219              
220             ###############################################################################
221             ## ----------------------------------------------------------------------------
222             ## Object package.
223             ##
224             ###############################################################################
225              
226             ## Items below are folded into MCE::Shared::Object.
227              
228             package # hide from rpm
229             MCE::Shared::Object;
230              
231 6     6   48 use strict;
  6         12  
  6         120  
232 6     6   30 use warnings;
  6         6  
  6         336  
233              
234 6     6   36 no warnings qw( threads recursion uninitialized numeric once );
  6         6  
  6         288  
235              
236 6     6   36 use Time::HiRes qw( alarm sleep );
  6         24  
  6         30  
237 6     6   750 use bytes;
  6         36  
  6         18  
238              
239 6     6   198 no overloading;
  6         12  
  6         5250  
240              
241             my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
242              
243             my ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, $_obj);
244              
245             sub _init_condvar {
246 17     17   110 ($_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_dat_ex, $_dat_un, $_chn, $_obj) = @_;
247              
248 17         141 return;
249             }
250              
251             # broadcast ( floating_seconds )
252             # broadcast ( )
253              
254             sub broadcast {
255 1     1   2000719 my $_id = $_[0]->[0];
256 1 50       70 return unless ( my $_CV = $_obj->{ $_id } );
257 1 50       62 return unless ( exists $_CV->{_cr_sock} );
258              
259 1 50       43 sleep($_[1]) if defined $_[1];
260              
261 1         99 _req1('O~CVB', $_id.$LF);
262 1         71 $_[0]->[6]->unlock();
263              
264 1         195 sleep(0);
265             }
266              
267             # signal ( floating_seconds )
268             # signal ( )
269              
270             sub signal {
271 1     1   117 my $_id = $_[0]->[0];
272 1 50       20 return unless ( my $_CV = $_obj->{ $_id } );
273 1 50       7 return unless ( exists $_CV->{_cr_sock} );
274              
275 1 50       5 sleep($_[1]) if defined $_[1];
276              
277 1         22 _req1('O~CVS', $_id.$LF);
278 1         22 $_[0]->[6]->unlock();
279              
280 1         150 sleep(0);
281             }
282              
283             # timedwait ( floating_seconds )
284              
285             sub timedwait {
286 2     2   79 my $_id = $_[0]->[0];
287 2         7 my $_timeout = $_[1];
288              
289 2 50       29 return unless ( my $_CV = $_obj->{ $_id } );
290 2 50       18 return unless ( exists $_CV->{_cr_sock} );
291 2 50       9 return $_[0]->wait() unless $_timeout;
292              
293 2 50 33     76 _croak('Condvar: timedwait (timeout) is not valid')
294             if (!looks_like_number($_timeout) || $_timeout < 0);
295              
296 2         37 _req1('O~CVW', $_id.$LF);
297 2         24 $_[0]->[6]->unlock();
298              
299 2 50       56 $_timeout = 0.0003 if $_timeout < 0.0003;
300              
301 2         7 local $@; eval {
  2         12  
302 2     0   74 local $SIG{ALRM} = sub { alarm 0; die "alarm clock restart\n" };
  0         0  
  0         0  
303 2 50       57 alarm $_timeout unless $_is_MSWin32;
304              
305             die "alarm clock restart\n"
306 2 50 33     36 if $_is_MSWin32 && MCE::Util::_sock_ready($_CV->{_cr_sock}, $_timeout);
307              
308             (!$_is_MSWin32)
309             ? (MCE::Util::_sysread($_CV->{_cr_sock}, my($_b1), 1), alarm(0))
310 2 50       35 : (MCE::Util::_sysread($_CV->{_cr_sock}, my($_b2), 1));
311             };
312              
313 2 50       4002909 alarm 0 unless $_is_MSWin32;
314              
315 2 50       21 if ($@) {
316 0 0       0 chomp($@), _croak($@) unless $@ eq "alarm clock restart\n";
317 0         0 _req1('O~CVT', $_id.$LF);
318              
319 0         0 return '';
320             }
321              
322 2         21 return 1;
323             }
324              
325             # wait ( )
326              
327             sub wait {
328 6     6   698 my $_id = $_[0]->[0];
329 6 50       118 return unless ( my $_CV = $_obj->{ $_id } );
330 6 50       25 return unless ( exists $_CV->{_cr_sock} );
331              
332 6         158 _req1('O~CVW', $_id.$LF);
333 6         50 $_[0]->[6]->unlock();
334              
335 6 50       125 MCE::Util::_sock_ready($_CV->{_cr_sock}) if $_is_MSWin32;
336 6         34 MCE::Util::_sysread($_CV->{_cr_sock}, my($_b), 1);
337              
338 6         12023817 return 1;
339             }
340              
341             1;
342              
343             __END__