File Coverage

blib/lib/Thread/Barrier.pm
Criterion Covered Total %
statement 15 90 16.6
branch 0 38 0.0
condition 0 14 0.0
subroutine 5 15 33.3
pod 5 6 83.3
total 25 163 15.3


line stmt bran cond sub pod time code
1             ###########################################################################
2             #
3             # See the README file included with the
4             # distribution for license information.
5             #
6             ###########################################################################
7              
8             package Thread::Barrier;
9              
10 1     1   18849 use 5.008;
  1         3  
  1         31  
11 1     1   4 use strict;
  1         2  
  1         25  
12 1     1   5 use warnings;
  1         2  
  1         27  
13              
14 1     1   8959 use threads::shared;
  1         1163  
  1         6  
15              
16             our $VERSION = '0.300_02';
17             $VERSION = eval $VERSION;
18              
19             ###########################################################################
20             # Public Methods
21             ###########################################################################
22              
23             #
24             # new - creates a new Thread::Barrier object
25             #
26             # Arguments:
27             #
28             # threshold
29             # Specifies the required number of threads that
30             # must block on the barrier before it is released.
31             # opt => val ...
32             # Optional arguments to new()
33             #
34             # Returns a Thread::Barrier object on success, dies on failure.
35             #
36             sub new {
37 0     0 1   my ($class, $threshold, %opts) = @_;
38 0 0         $opts{RaiseError} = 1 unless exists $opts{RaiseError};
39              
40             # threads::shared 1.43 (perl 5.18.0) does not yet support shared
41             # CODE refs, which would be obvious/ideal for our 'Action' support.
42             # So, our Thread::Barrier object isn't itself shared, but one of
43             # its members is.
44             #
45             # Object structure (ARRAY ref):
46             #
47             # [ {barrier implementation}, <-- shared
48             # \&optional_coderef ] <-- non-shared
49             #
50              
51 0           my $self = bless [], $class;
52              
53 0           $self->[0] = &share({});
54 0           %{$self->[0]} = (
  0            
55             threshold => 0, # threads required to release barrier
56             count => 0, # number of threads blocking on barrier
57             generation => 0, # incremented when barrier is released
58             #broken => 0, # true if broken
59             #first_released_$gen => 1, # if present, $gen was just released
60             );
61              
62 0 0         $self->set_threshold($threshold) if $threshold; # may die
63 0           while (my ($opt, $val) = each(%opts)) {
64 0 0 0       if ($opt eq 'Action' and defined($val)) {
65 0 0         _confess("Invalid Action parameter to $class->new")
66             unless ref($val) eq 'CODE';
67 0           $self->[1] = $val;
68 0           next;
69             }
70 0 0         if ($opt eq 'RaiseError') {
71 0           $self->[0]->{RaiseError} = !!$val;
72 0           next;
73             }
74 0           _confess("Unrecognized parameter '$opt' to $class->new");
75             }
76              
77 0           return $self;
78             }
79              
80              
81             #
82             # init - set the threshold value for the barrier
83             #
84             # *** DEPRECATED ***
85             #
86             # Arguments:
87             #
88             # threshold
89             # Specifies the required number of threads that
90             # must block on the barrier before it is released.
91             #
92             # Returns the passed argument.
93             #
94             sub init {
95 0     0 0   my($self, $threshold) = @_;
96 0           $self->set_threshold($threshold);
97 0           return $threshold;
98             }
99              
100              
101             #
102             # wait - block until a sufficient number of threads have reached the barrier
103             #
104             # Arguments:
105             #
106             # none
107             #
108             # Returns true to one of threads released upon barrier reset, false to
109             # all others.
110             #
111             sub wait {
112 0     0 1   my ($self, $timeo) = @_;
113 0           my ($bar, $act) = @$self; # Unwrap our actual barrier and Action (if any)
114 0           my ($gen, $i);
115              
116 0 0         $timeo = $self->_normalize_timeout($timeo)
117             if defined($timeo);
118              
119 0           lock $bar;
120              
121 0           $gen = $bar->{generation};
122 0           $i = $bar->{count}++;
123              
124 0 0         if (! $self->_try_release) {
125 0 0         unless (defined $timeo) {
126             # block
127 0   0       while ($bar->{generation} == $gen and not $bar->{broken}) {
128 0           cond_wait($bar);
129             }
130             } else {
131 0   0       while ($bar->{generation} == $gen and not $bar->{broken}) {
132 0 0         last if !cond_timedwait($bar, $timeo);
133             }
134 0 0         $bar->{broken} = 1 if $bar->{generation} == $gen;
135             }
136             }
137              
138             # Are we the first awake from our generation? Run Action if any
139 0 0 0       if (delete $bar->{"first_released_${gen}"} and $act) {
    0          
140 0           my $ok = eval { $act->(); 1; };
  0            
  0            
141 0 0         if (! $ok) {
142 0           $bar->{broken} = 1;
143 0   0       die($@ || "Barrier action failed");
144             }
145             } elsif ($bar->{broken}) {
146 0 0         _croak("Barrier broken") if $bar->{RaiseError};
147 0           return undef;
148             }
149              
150             # In our implementation, the first one to arrive gets the serial
151             # indicator
152 0           return ($i == 0);
153             }
154              
155              
156             #
157             # set_threshold - adjust the barrier's threshold, possibly releasing it
158             # if enough threads are blocking.
159             #
160             # Arguments:
161             #
162             # threshold
163             # Specifies the required number of threads that
164             # must block on the barrier before it is released.
165             #
166             # Returns true if barrier is released, false otherwise.
167             #
168             sub set_threshold {
169 0     0 1   my($self, $threshold) = @_;
170 0           my $err;
171              
172             # validate threshold
173 0           for ($threshold) {
174 0 0         $err = "no argument supplied", last unless defined $_;
175 0 0         $err = "invalid argument supplied", last if /[^0-9]/;
176             }
177 0 0         if ($err) {
178 1     1   697 no warnings 'once';
  1         2  
  1         319  
179 0           local $Carp::CarpLevel = 1;
180 0           _confess($err);
181             }
182              
183             # apply new threshold, possibly releasing barrier
184 0           lock $self->[0];
185 0           $self->[0]->{threshold} = $threshold;
186              
187             # check for release condition
188 0           $self->_try_release;
189             }
190              
191              
192             #
193             # threshold - accessor for debugging purposes
194             #
195             sub threshold {
196 0     0 1   my $bar = shift->[0];
197 0           lock $bar;
198 0           return $bar->{threshold};
199             }
200              
201              
202             #
203             # count - accessor for debugging purposes
204             #
205             sub count {
206 0     0 1   my $bar = shift->[0];
207 0           lock $bar;
208 0           return $bar->{count};
209             }
210              
211              
212             ###########################################################################
213             # Private Methods
214             ###########################################################################
215              
216             sub _confess {
217 0     0     require Carp;
218 0           goto &Carp::confess;
219             }
220              
221             sub _croak {
222 0     0     require Carp;
223 0           goto &Carp::croak;
224             }
225              
226             #
227             # _try_release - release the barrier if a sufficient number of threads
228             # have reached the barrier.
229             # N.B.: Assumes the barrier is locked
230             #
231             # Arguments:
232             #
233             # none
234             #
235             # Returns true if barrier is released, false otherwise.
236             #
237             sub _try_release {
238 0     0     my $bar = shift->[0];
239              
240 0 0         return undef if $bar->{count} < $bar->{threshold};
241              
242             # reset barrier and release
243 0           my $gen = $bar->{generation}++;
244 0           $bar->{"first_released_${gen}"} = 1;
245 0           $bar->{count} = 0;
246              
247 0           cond_broadcast($bar);
248 0           return 1;
249             }
250              
251             sub _normalize_timeout {
252 0     0     my ($self, $timeo) = @_;
253 0 0         $timeo =~ /^\d+(?:\.\d+)*$/
254             or _croak("Invalid timeout specification ($timeo)");
255 0           $timeo += time();
256             }
257              
258             1;
259             __END__