line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package IPC::ConcurrencyLimit::WithLatestStandby; |
2
|
35
|
|
|
35
|
|
577010
|
use 5.008001; |
|
35
|
|
|
|
|
105
|
|
3
|
35
|
|
|
35
|
|
105
|
use strict; |
|
35
|
|
|
|
|
35
|
|
|
35
|
|
|
|
|
490
|
|
4
|
35
|
|
|
35
|
|
105
|
use warnings; |
|
35
|
|
|
|
|
35
|
|
|
35
|
|
|
|
|
1155
|
|
5
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
our $VERSION = '0.16'; |
7
|
|
|
|
|
|
|
|
8
|
35
|
|
|
35
|
|
105
|
use Carp qw(croak); |
|
35
|
|
|
|
|
35
|
|
|
35
|
|
|
|
|
1295
|
|
9
|
35
|
|
|
35
|
|
16835
|
use Time::HiRes qw(sleep time); |
|
35
|
|
|
|
|
36330
|
|
|
35
|
|
|
|
|
140
|
|
10
|
35
|
|
|
35
|
|
16065
|
use IPC::ConcurrencyLimit; |
|
35
|
|
|
|
|
70
|
|
|
35
|
|
|
|
|
26215
|
|
11
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
sub new { |
13
|
69
|
|
|
69
|
0
|
311626632
|
my $class = shift; |
14
|
69
|
|
|
|
|
1015
|
my %params = @_; |
15
|
69
|
|
|
|
|
318
|
my $type = delete $params{type}; |
16
|
69
|
50
|
|
|
|
649
|
$type = 'Flock' if not defined $type; |
17
|
69
|
50
|
|
|
|
433
|
croak( __PACKAGE__ . " only supports 'Flock' for now") |
18
|
|
|
|
|
|
|
if $type ne 'Flock'; |
19
|
|
|
|
|
|
|
|
20
|
69
|
50
|
33
|
|
|
386
|
if (defined $params{max_procs} and $params{max_procs}!=1) { |
21
|
0
|
|
|
|
|
0
|
croak( __PACKAGE__ . " does not support max_procs!=1, use multiple objects instead."); |
22
|
|
|
|
|
|
|
} |
23
|
|
|
|
|
|
|
|
24
|
69
|
|
50
|
|
|
925
|
my $process_name_change= $params{process_name_change} || 0; |
25
|
69
|
|
50
|
|
|
276
|
my $path= $params{path} || die __PACKAGE__ . '->new: missing mandatory parameter `path`'; |
26
|
69
|
|
50
|
|
|
339
|
my $file_prefix= $params{file_prefix} || ""; |
27
|
69
|
|
0
|
|
|
219
|
my $poll_time= $params{poll_time} || $params{interval} || 1; # seconds to poll (may be fraction) |
28
|
69
|
|
50
|
|
|
677
|
my $retries= $params{retries} || undef; |
29
|
69
|
|
50
|
|
|
410
|
my $timeout= $params{timeout} || undef; |
30
|
69
|
|
50
|
|
|
160
|
my $debug= $params{debug} || 0; # show debug? |
31
|
69
|
|
50
|
|
|
197
|
my $debug_sub= $params{debug_sub} || undef; |
32
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
my $retry_sub= (ref $retries) ? $retries : |
34
|
|
|
|
|
|
|
(defined $retries && |
35
|
0
|
0
|
|
0
|
|
0
|
defined $timeout) ? sub { $_[0] <= $retries && $_[2] <= $timeout } : |
36
|
0
|
|
|
0
|
|
0
|
(defined $retries) ? sub { $_[0] <= $retries } : |
37
|
0
|
|
|
0
|
|
0
|
(defined $timeout) ? sub { $_[2] <= $timeout } : |
38
|
69
|
50
|
33
|
229
|
|
1982
|
sub { 1 }; |
|
229
|
50
|
|
|
|
681
|
|
|
|
50
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
39
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
# primary is replace by standby1, is replaced by |
41
|
|
|
|
|
|
|
# standby2, is replaced by standby3. However, standby1 |
42
|
|
|
|
|
|
|
# will exit when standby2's lock is held by another process. |
43
|
69
|
50
|
|
|
|
362
|
my @names= map { ($file_prefix ? "$file_prefix.$_" : $_) } |
|
276
|
|
|
|
|
914
|
|
44
|
|
|
|
|
|
|
qw(primary standby1 standby2 standby3); |
45
|
|
|
|
|
|
|
my @lockers= map { |
46
|
69
|
50
|
|
|
|
322
|
IPC::ConcurrencyLimit->new( |
|
276
|
|
|
|
|
1788
|
|
47
|
|
|
|
|
|
|
type => $type, |
48
|
|
|
|
|
|
|
max_procs => 1, |
49
|
|
|
|
|
|
|
# future proofing |
50
|
|
|
|
|
|
|
$type eq "Flock" ? ( |
51
|
|
|
|
|
|
|
file_prefix => $_, |
52
|
|
|
|
|
|
|
path => $path, |
53
|
|
|
|
|
|
|
) : (), |
54
|
|
|
|
|
|
|
) |
55
|
|
|
|
|
|
|
} @names; |
56
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
return bless { |
58
|
|
|
|
|
|
|
poll_time => $poll_time, |
59
|
|
|
|
|
|
|
timeout => $timeout, # FYI |
60
|
|
|
|
|
|
|
retries => $retries, # FYI |
61
|
|
|
|
|
|
|
lock_name => \@names, |
62
|
|
|
|
|
|
|
locker => \@lockers, |
63
|
|
|
|
|
|
|
debug => $debug, |
64
|
0
|
|
|
0
|
|
0
|
debug_sub => $debug_sub || sub { warn @_,"\n" }, |
65
|
69
|
|
50
|
|
|
1292
|
retry_sub => $retry_sub, |
66
|
|
|
|
|
|
|
process_name_change => $process_name_change, |
67
|
|
|
|
|
|
|
}, $class; |
68
|
|
|
|
|
|
|
} |
69
|
|
|
|
|
|
|
|
70
|
|
|
|
|
|
|
sub _diag { |
71
|
321
|
|
|
321
|
|
513
|
my ($self, $fmt, @args)= @_; |
72
|
321
|
50
|
|
|
|
665
|
if (!@args) { |
73
|
321
|
|
|
|
|
1226
|
$self->{debug_sub}->($fmt); |
74
|
|
|
|
|
|
|
} else { |
75
|
0
|
|
|
|
|
0
|
$self->{debug_sub}->(sprintf $fmt, @args); |
76
|
|
|
|
|
|
|
} |
77
|
|
|
|
|
|
|
} |
78
|
|
|
|
|
|
|
|
79
|
|
|
|
|
|
|
|
80
|
|
|
|
|
|
|
sub get_lock { |
81
|
69
|
|
|
69
|
0
|
26015
|
my ($self) = @_; |
82
|
|
|
|
|
|
|
|
83
|
69
|
|
|
|
|
244
|
my $locker= $self->{locker}; |
84
|
69
|
|
|
|
|
139
|
my $names= $self->{lock_name}; |
85
|
|
|
|
|
|
|
|
86
|
69
|
|
|
|
|
389
|
my $old_oh= $0; |
87
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
$0 = "$old_oh - acquire" |
89
|
69
|
50
|
|
|
|
193
|
if $self->{process_name_change}; |
90
|
|
|
|
|
|
|
|
91
|
|
|
|
|
|
|
# try to get the rightmost lock (standby3) if we don't get it |
92
|
|
|
|
|
|
|
# then we exit out. this shouldn't really happen if other things |
93
|
|
|
|
|
|
|
# are sane, for instance when $poll_time is much smaller than |
94
|
|
|
|
|
|
|
# the rate we allocate new workers. |
95
|
69
|
|
|
|
|
265
|
my $locker_id= $#$locker; |
96
|
69
|
50
|
|
|
|
252
|
if ( $locker->[$locker_id]->get_lock() ) { |
97
|
69
|
|
|
|
|
248
|
$self->_diag( "Got a $names->[$locker_id] lock"); |
98
|
|
|
|
|
|
|
} else { |
99
|
|
|
|
|
|
|
$self->_diag( "Failed to get a $names->[$locker_id] lock, entry lock is held by another process" ) |
100
|
0
|
0
|
|
|
|
0
|
if $self->{debug}; |
101
|
|
|
|
|
|
|
$0 = "$old_oh - no-lock-acquired" |
102
|
0
|
0
|
|
|
|
0
|
if $self->{process_name_change}; |
103
|
0
|
|
|
|
|
0
|
return; |
104
|
|
|
|
|
|
|
} |
105
|
|
|
|
|
|
|
|
106
|
|
|
|
|
|
|
# Each worker tries to acquire the lock to its left. If it does |
107
|
|
|
|
|
|
|
# then it abandons its old lock. If that means the worker ends up |
108
|
|
|
|
|
|
|
# on locker_id 0 then they are done, and can do work. |
109
|
|
|
|
|
|
|
# The first standby worker also looks to its right to see if there |
110
|
|
|
|
|
|
|
# is a replacement process for it, if there is it exits, leaving |
111
|
|
|
|
|
|
|
# a gap and letting the replacements shuffle left. |
112
|
69
|
|
|
|
|
6841
|
my $tries= 0; |
113
|
69
|
|
|
|
|
74
|
my $lock_tries= 0; |
114
|
69
|
|
|
|
|
389
|
my $standby_start= time(); |
115
|
69
|
|
|
|
|
128
|
my $lock_start= time(); |
116
|
69
|
|
|
|
|
109
|
my $poll_time= $self->{poll_time}; |
117
|
|
|
|
|
|
|
|
118
|
69
|
|
|
|
|
206
|
while ( $locker_id > 0 ) { |
119
|
|
|
|
|
|
|
$0 = "$old_oh - $names->[$locker_id]" |
120
|
412
|
50
|
|
|
|
1765
|
if $self->{process_name_change}; |
121
|
|
|
|
|
|
|
|
122
|
|
|
|
|
|
|
# can we shuffle our lock left? |
123
|
412
|
100
|
|
|
|
3223
|
if ( $locker->[$locker_id - 1]->get_lock() ) { |
124
|
183
|
|
|
|
|
912
|
$self->_diag( "Got a $names->[$locker_id -1] lock, dropping old $names->[$locker_id] lock"); |
125
|
|
|
|
|
|
|
# yep, we got the lock to the left, so drop our old lock, |
126
|
|
|
|
|
|
|
# and move the pointer left at the same time. |
127
|
183
|
|
|
|
|
16729
|
$locker->[ $locker_id-- ]->release_lock(); |
128
|
183
|
|
|
|
|
225
|
$lock_tries= 0; |
129
|
183
|
|
|
|
|
389
|
$lock_start= time(); |
130
|
183
|
|
|
|
|
478
|
next; |
131
|
|
|
|
|
|
|
} |
132
|
|
|
|
|
|
|
|
133
|
229
|
50
|
|
|
|
2086
|
unless ($self->{retry_sub}->(++$tries, ++$lock_tries, time - $standby_start, time - $lock_start)) { |
134
|
|
|
|
|
|
|
$0 = "$old_oh - no-lock-timeout" |
135
|
0
|
0
|
|
|
|
0
|
if $self->{process_name_change}; |
136
|
0
|
|
|
|
|
0
|
return; |
137
|
|
|
|
|
|
|
} |
138
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
# check if we are the first standby worker. |
140
|
229
|
100
|
|
|
|
1001
|
if ( $locker_id == 1 ) { |
141
|
|
|
|
|
|
|
# yep - we are the first standby worker, |
142
|
|
|
|
|
|
|
# so check if the lock to our right is being held: |
143
|
176
|
100
|
|
|
|
798
|
if ( $locker->[$locker_id + 1]->get_lock() ) { |
144
|
|
|
|
|
|
|
# we got the lock, which means nothing else |
145
|
|
|
|
|
|
|
# holds it. so we release the lock and move on. |
146
|
152
|
|
|
|
|
589
|
$locker->[$locker_id + 1]->release_lock(); |
147
|
|
|
|
|
|
|
} else { |
148
|
|
|
|
|
|
|
$self->_diag( |
149
|
|
|
|
|
|
|
"A newer worker is holding the $names->[$locker_id+1] lock, will exit to let it take over" |
150
|
24
|
50
|
|
|
|
709
|
) if $self->{debug}; |
151
|
|
|
|
|
|
|
# we failed to get the lock, which means there is a newer |
152
|
|
|
|
|
|
|
# process that can replace us so return/exit - this frees up |
153
|
|
|
|
|
|
|
# our lock and lets the newer process to move into our position. |
154
|
|
|
|
|
|
|
$0 = "$old_oh - no-lock-retired" |
155
|
24
|
50
|
|
|
|
2237
|
if $self->{process_name_change}; |
156
|
24
|
|
|
|
|
110
|
return; |
157
|
|
|
|
|
|
|
} |
158
|
|
|
|
|
|
|
} |
159
|
|
|
|
|
|
|
|
160
|
|
|
|
|
|
|
# nope - the lock to our left is being held so sleep a while before |
161
|
|
|
|
|
|
|
# we try again. We use the rand and the formula so that items to the |
162
|
|
|
|
|
|
|
# right poll faster than items to the left, and to reduce the chance |
163
|
|
|
|
|
|
|
# that lock holder 1 and lock holder 3 poll lock 2 at the same time |
164
|
|
|
|
|
|
|
# forever. The formula guarantees that items to the left poll faster, |
165
|
|
|
|
|
|
|
# and the rand ensures there is jitter. |
166
|
205
|
|
|
|
|
17377622
|
sleep rand(($poll_time / $locker_id)*2); |
167
|
|
|
|
|
|
|
} |
168
|
|
|
|
|
|
|
|
169
|
|
|
|
|
|
|
# assert that $locker_id is 0 at this point. |
170
|
45
|
50
|
|
|
|
136
|
die "panic: We should not reach this point with \$locker_id larger than 0, got $locker_id" |
171
|
|
|
|
|
|
|
if $locker_id; |
172
|
|
|
|
|
|
|
|
173
|
|
|
|
|
|
|
$self->_diag("Got $names->[$locker_id] lock, we are allowed to do work.") |
174
|
45
|
50
|
|
|
|
263
|
if $self->{debug}; |
175
|
|
|
|
|
|
|
|
176
|
|
|
|
|
|
|
# at this point we should be $locker_id == 0 and we can do work. |
177
|
45
|
50
|
|
|
|
2136
|
if ($self->{process_name_change}) { |
178
|
0
|
0
|
|
|
|
0
|
if ($self->{process_name_change} > 1) { |
179
|
0
|
|
|
|
|
0
|
$0 = $old_oh; |
180
|
|
|
|
|
|
|
} else { |
181
|
0
|
|
|
|
|
0
|
$0 = "$old_oh - $names->[$locker_id]" |
182
|
|
|
|
|
|
|
} |
183
|
|
|
|
|
|
|
} |
184
|
45
|
|
|
|
|
99
|
return 1; |
185
|
|
|
|
|
|
|
} |
186
|
|
|
|
|
|
|
|
187
|
|
|
|
|
|
|
|
188
|
|
|
|
|
|
|
sub is_locked { |
189
|
0
|
|
|
0
|
0
|
0
|
my $self = shift; |
190
|
0
|
|
|
|
|
0
|
return $self->{locker}[0]->is_locked(@_); |
191
|
|
|
|
|
|
|
} |
192
|
|
|
|
|
|
|
|
193
|
|
|
|
|
|
|
sub release_lock { |
194
|
31
|
|
|
31
|
0
|
62320075
|
my $self = shift; |
195
|
31
|
|
|
|
|
527
|
return $self->{locker}[0]->release_lock(@_); |
196
|
|
|
|
|
|
|
} |
197
|
|
|
|
|
|
|
|
198
|
|
|
|
|
|
|
sub lock_id { |
199
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
200
|
0
|
|
|
|
|
|
return $self->{locker}[0]->lock_id(@_); |
201
|
|
|
|
|
|
|
} |
202
|
|
|
|
|
|
|
|
203
|
|
|
|
|
|
|
sub heartbeat { |
204
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
205
|
0
|
|
|
|
|
|
return $self->{locker}[0]->heartbeat; |
206
|
|
|
|
|
|
|
} |
207
|
|
|
|
|
|
|
|
208
|
|
|
|
|
|
|
|
209
|
|
|
|
|
|
|
1; |
210
|
|
|
|
|
|
|
|
211
|
|
|
|
|
|
|
__END__ |