File Coverage

blib/lib/MCE/Mutex/Channel.pm
Criterion Covered Total %
statement 51 86 59.3
branch 17 68 25.0
condition 4 12 33.3
subroutine 14 21 66.6
pod 6 6 100.0
total 92 193 47.6


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## MCE::Mutex::Channel - Mutex locking via a pipe or socket.
4             ##
5             ###############################################################################
6              
7             package MCE::Mutex::Channel;
8              
9 102     102   2821 use strict;
  102         209  
  102         3150  
10 102     102   532 use warnings;
  102         176  
  102         2829  
11              
12 102     102   524 no warnings qw( threads recursion uninitialized once );
  102         177  
  102         6534  
13              
14             our $VERSION = '1.889';
15              
16 102     102   54199 use if $^O eq 'MSWin32', 'threads';
  102         1175  
  102         650  
17 102     102   4897 use if $^O eq 'MSWin32', 'threads::shared';
  102         220  
  102         419  
18              
19 102     102   3064 use base 'MCE::Mutex';
  102         172  
  102         10096  
20 102     102   1549 use MCE::Util ();
  102         211  
  102         2046  
21 102     102   486 use Scalar::Util qw(looks_like_number weaken);
  102         249  
  102         5552  
22 102     102   589 use Time::HiRes 'alarm';
  102         185  
  102         730  
23              
24             my $is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
25             my $use_pipe = ($^O !~ /mswin|mingw|msys|cygwin/i && $] gt '5.010000');
26             my $tid = $INC{'threads.pm'} ? threads->tid : 0;
27              
28             sub CLONE {
29 0 0   0   0 $tid = threads->tid if $INC{'threads.pm'};
30             }
31              
32             sub MCE::Mutex::Channel::_guard::DESTROY {
33 0     0   0 my ($pid, $obj) = @{ $_[0] };
  0         0  
34 0 0       0 CORE::syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0 if $obj->{ $pid };
35              
36 0         0 return;
37             }
38              
39             sub DESTROY {
40 256 50   256   4396 my ($pid, $obj) = ($tid ? $$ .'.'. $tid : $$, @_);
41 256 50       1931 CORE::syswrite($obj->{_w_sock}, '0'), $obj->{$pid } = 0 if $obj->{$pid };
42 256 50       1008 CORE::syswrite($obj->{_r_sock}, '0'), $obj->{$pid.'b'} = 0 if $obj->{$pid.'b'};
43              
44 256 100       1349 if ( $obj->{_init_pid} eq $pid ) {
45 210 100 66     2313 (!$use_pipe || $obj->{impl} eq 'Channel2')
46             ? MCE::Util::_destroy_socks($obj, qw(_w_sock _r_sock))
47             : MCE::Util::_destroy_pipes($obj, qw(_w_sock _r_sock));
48             }
49              
50 256         11869 return;
51             }
52              
53             my @mutex;
54              
55             sub _destroy {
56 0 0   0   0 my $pid = $tid ? $$ .'.'. $tid : $$;
57              
58             # Called by { MCE, MCE::Child, and MCE::Hobo }::_exit
59 0         0 for my $i ( 0 .. @mutex - 1 ) {
60             CORE::syswrite($mutex[$i]->{_w_sock}, '0'), $mutex[$i]->{$pid} = 0
61 0 0       0 if ( $mutex[$i]->{$pid} );
62             CORE::syswrite($mutex[$i]->{_r_sock}, '0'), $mutex[$i]->{$pid.'b'} = 0
63 0 0       0 if ( $mutex[$i]->{$pid.'b'} );
64             }
65             }
66              
67             sub _save_for_global_cleanup {
68 32     32   177 push(@mutex, $_[0]), weaken($mutex[-1]);
69             }
70              
71             ###############################################################################
72             ## ----------------------------------------------------------------------------
73             ## Public methods.
74             ##
75             ###############################################################################
76              
77             sub new {
78 431     431 1 1730 my ($class, %obj) = (@_, impl => 'Channel');
79 431 50       1863 $obj{_init_pid} = $tid ? $$ .'.'. $tid : $$;
80 431 50       1142 $obj{_t_lock} = threads::shared::share( my $t_lock ) if $is_MSWin32;
81              
82 431 50       2258 $use_pipe
83             ? MCE::Util::_pipe_pair(\%obj, qw(_r_sock _w_sock))
84             : MCE::Util::_sock_pair(\%obj, qw(_r_sock _w_sock));
85              
86 431         5683 CORE::syswrite($obj{_w_sock}, '0');
87 431         2458 bless \%obj, $class;
88              
89 431 100 66     4740 if ( caller !~ /^MCE:?/ || caller(1) !~ /^MCE:?/ ) {
90 2         6 MCE::Mutex::Channel::_save_for_global_cleanup(\%obj);
91             }
92              
93 431         3959 return \%obj;
94             }
95              
96             sub lock {
97 355 50   355 1 1867 my ($pid, $obj) = ($tid ? $$ .'.'. $tid : $$, shift);
98              
99 355 50       3911 unless ($obj->{ $pid }) {
100             CORE::lock($obj->{_t_lock}), MCE::Util::_sock_ready($obj->{_r_sock})
101 355 50       968 if $is_MSWin32;
102 355         3151 MCE::Util::_sysread($obj->{_r_sock}, my($b), 1), $obj->{ $pid } = 1;
103             }
104              
105 355         1651 return;
106             }
107              
108             sub guard_lock {
109 0     0 1 0 &lock(@_);
110 0 0       0 bless([ $tid ? $$ .'.'. $tid : $$, $_[0] ], MCE::Mutex::Channel::_guard::);
111             }
112              
113             *lock_exclusive = \&lock;
114             *lock_shared = \&lock;
115              
116             sub unlock {
117 355 50   355 1 2686 my ($pid, $obj) = ($tid ? $$ .'.'. $tid : $$, shift);
118              
119             CORE::syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0
120 355 50       8898 if $obj->{ $pid };
121              
122 355         1984 return;
123             }
124              
125             sub synchronize {
126 0 0   0 1   my ($pid, $obj, $code) = ($tid ? $$ .'.'. $tid : $$, shift, shift);
127 0           my (@ret, $b);
128              
129 0 0         return unless ref($code) eq 'CODE';
130              
131             # lock, run, unlock - inlined for performance
132 0           my $guard = bless([ $pid, $obj ], MCE::Mutex::Channel::_guard::);
133 0 0         unless ($obj->{ $pid }) {
134             CORE::lock($obj->{_t_lock}), MCE::Util::_sock_ready($obj->{_r_sock})
135 0 0         if $is_MSWin32;
136 0           MCE::Util::_sysread($obj->{_r_sock}, $b, 1), $obj->{ $pid } = 1;
137             }
138             (defined wantarray)
139 0 0         ? @ret = wantarray ? $code->(@_) : scalar $code->(@_)
    0          
140             : $code->(@_);
141              
142 0 0         return wantarray ? @ret : $ret[-1];
143             }
144              
145             *enter = \&synchronize;
146              
147             sub timedwait {
148 0     0 1   my ($obj, $timeout) = @_;
149              
150 0 0         $timeout = 1 unless defined $timeout;
151 0 0 0       Carp::croak('MCE::Mutex::Channel: timedwait (timeout) is not valid')
152             if (!looks_like_number($timeout) || $timeout < 0);
153              
154 0 0         $timeout = 0.0003 if $timeout < 0.0003;
155 0           local $@; my $ret = '';
  0            
156              
157 0           eval {
158 0     0     local $SIG{ALRM} = sub { alarm 0; die "alarm clock restart\n" };
  0            
  0            
159 0 0         alarm $timeout unless $is_MSWin32;
160              
161             die "alarm clock restart\n"
162 0 0 0       if $is_MSWin32 && MCE::Util::_sock_ready($obj->{_r_sock}, $timeout);
163              
164 0 0         (!$is_MSWin32)
165             ? ($obj->lock_exclusive, $ret = 1, alarm(0))
166             : ($obj->lock_exclusive, $ret = 1);
167             };
168              
169 0 0         alarm 0 unless $is_MSWin32;
170              
171 0           $ret;
172             }
173              
174             1;
175              
176             __END__