File Coverage

blib/lib/MCE/Mutex/Channel.pm
Criterion Covered Total %
statement 50 78 64.1
branch 17 64 26.5
condition 4 12 33.3
subroutine 14 19 73.6
pod 5 5 100.0
total 90 178 50.5


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## MCE::Mutex::Channel - Mutex locking via a pipe or socket.
4             ##
5             ###############################################################################
6              
7             package MCE::Mutex::Channel;
8              
9 102     102   2928 use strict;
  102         257  
  102         3697  
10 102     102   555 use warnings;
  102         189  
  102         3143  
11              
12 102     102   471 no warnings qw( threads recursion uninitialized once );
  102         254  
  102         7330  
13              
14             our $VERSION = '1.888';
15              
16 102     102   58946 use if $^O eq 'MSWin32', 'threads';
  102         1364  
  102         679  
17 102     102   5638 use if $^O eq 'MSWin32', 'threads::shared';
  102         208  
  102         436  
18              
19 102     102   3261 use base 'MCE::Mutex';
  102         183  
  102         10351  
20 102     102   1646 use MCE::Util ();
  102         247  
  102         2111  
21 102     102   461 use Scalar::Util qw(looks_like_number weaken);
  102         212  
  102         9694  
22 102     102   809 use Time::HiRes 'alarm';
  102         224  
  102         844  
23              
24             my $is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
25             my $use_pipe = ($^O !~ /mswin|mingw|msys|cygwin/i && $] gt '5.010000');
26             my $tid = $INC{'threads.pm'} ? threads->tid : 0;
27              
28             sub CLONE {
29 0 0   0   0 $tid = threads->tid if $INC{'threads.pm'};
30             }
31              
32             sub DESTROY {
33 256 50   256   4562 my ($pid, $obj) = ($tid ? $$ .'.'. $tid : $$, @_);
34              
35 256 50       2947 CORE::syswrite($obj->{_w_sock}, '0'), $obj->{$pid } = 0 if $obj->{$pid };
36 256 50       1329 CORE::syswrite($obj->{_r_sock}, '0'), $obj->{$pid.'b'} = 0 if $obj->{$pid.'b'};
37              
38 256 100       1730 if ( $obj->{_init_pid} eq $pid ) {
39 210 100 66     3031 (!$use_pipe || $obj->{impl} eq 'Channel2')
40             ? MCE::Util::_destroy_socks($obj, qw(_w_sock _r_sock))
41             : MCE::Util::_destroy_pipes($obj, qw(_w_sock _r_sock));
42             }
43              
44 256         13615 return;
45             }
46              
47             my @mutex;
48              
49             sub _destroy {
50 0 0   0   0 my $pid = $tid ? $$ .'.'. $tid : $$;
51              
52             # Called by { MCE, MCE::Child, and MCE::Hobo }::_exit
53 0         0 for my $i ( 0 .. @mutex - 1 ) {
54             CORE::syswrite($mutex[$i]->{_w_sock}, '0'), $mutex[$i]->{$pid} = 0
55 0 0       0 if ( $mutex[$i]->{$pid} );
56             CORE::syswrite($mutex[$i]->{_r_sock}, '0'), $mutex[$i]->{$pid.'b'} = 0
57 0 0       0 if ( $mutex[$i]->{$pid.'b'} );
58             }
59             }
60              
61             sub _save_for_global_cleanup {
62 32     32   230 push(@mutex, $_[0]), weaken($mutex[-1]);
63             }
64              
65             ###############################################################################
66             ## ----------------------------------------------------------------------------
67             ## Public methods.
68             ##
69             ###############################################################################
70              
71             sub new {
72 431     431 1 2002 my ($class, %obj) = (@_, impl => 'Channel');
73 431 50       2085 $obj{_init_pid} = $tid ? $$ .'.'. $tid : $$;
74 431 50       1223 $obj{_t_lock} = threads::shared::share( my $t_lock ) if $is_MSWin32;
75              
76 431 50       2539 $use_pipe
77             ? MCE::Util::_pipe_pair(\%obj, qw(_r_sock _w_sock))
78             : MCE::Util::_sock_pair(\%obj, qw(_r_sock _w_sock));
79              
80 431         7484 CORE::syswrite($obj{_w_sock}, '0');
81 431         2724 bless \%obj, $class;
82              
83 431 100 66     5263 if ( caller !~ /^MCE:?/ || caller(1) !~ /^MCE:?/ ) {
84 2         6 MCE::Mutex::Channel::_save_for_global_cleanup(\%obj);
85             }
86              
87 431         4476 return \%obj;
88             }
89              
90             sub lock {
91 355 50   355 1 23504 my ($pid, $obj) = ($tid ? $$ .'.'. $tid : $$, shift);
92              
93             CORE::lock($obj->{_t_lock}), MCE::Util::_sock_ready($obj->{_r_sock})
94 355 50       1278 if $is_MSWin32;
95             MCE::Util::_sysread($obj->{_r_sock}, my($b), 1), $obj->{ $pid } = 1
96 355 50       8394 unless $obj->{ $pid };
97              
98 355         2816 return;
99             }
100              
101             *lock_exclusive = \&lock;
102             *lock_shared = \&lock;
103              
104             sub unlock {
105 355 50   355 1 2636 my ($pid, $obj) = ($tid ? $$ .'.'. $tid : $$, shift);
106              
107             CORE::syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0
108 355 50       12264 if $obj->{ $pid };
109              
110 355         2266 return;
111             }
112              
113             sub synchronize {
114 0 0   0 1   my ($pid, $obj, $code) = ($tid ? $$ .'.'. $tid : $$, shift, shift);
115 0           my (@ret, $b);
116              
117 0 0         return unless ref($code) eq 'CODE';
118              
119             # lock, run, unlock - inlined for performance
120             CORE::lock($obj->{_t_lock}), MCE::Util::_sock_ready($obj->{_r_sock})
121 0 0         if $is_MSWin32;
122             MCE::Util::_sysread($obj->{_r_sock}, $b, 1), $obj->{ $pid } = 1
123 0 0         unless $obj->{ $pid };
124              
125             (defined wantarray)
126 0 0         ? @ret = wantarray ? $code->(@_) : scalar $code->(@_)
    0          
127             : $code->(@_);
128              
129 0           CORE::syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0;
130              
131 0 0         return wantarray ? @ret : $ret[-1];
132             }
133              
134             *enter = \&synchronize;
135              
136             sub timedwait {
137 0     0 1   my ($obj, $timeout) = @_;
138              
139 0 0         $timeout = 1 unless defined $timeout;
140 0 0 0       Carp::croak('MCE::Mutex::Channel: timedwait (timeout) is not valid')
141             if (!looks_like_number($timeout) || $timeout < 0);
142              
143 0 0         $timeout = 0.0003 if $timeout < 0.0003;
144 0           local $@; my $ret = '';
  0            
145              
146 0           eval {
147 0     0     local $SIG{ALRM} = sub { alarm 0; die "alarm clock restart\n" };
  0            
  0            
148 0 0         alarm $timeout unless $is_MSWin32;
149              
150             die "alarm clock restart\n"
151 0 0 0       if $is_MSWin32 && MCE::Util::_sock_ready($obj->{_r_sock}, $timeout);
152              
153 0 0         (!$is_MSWin32)
154             ? ($obj->lock_exclusive, $ret = 1, alarm(0))
155             : ($obj->lock_exclusive, $ret = 1);
156             };
157              
158 0 0         alarm 0 unless $is_MSWin32;
159              
160 0           $ret;
161             }
162              
163             1;
164              
165             __END__