line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package IPC::ConcurrencyLimit::WithLatestStandby; |
2
|
4
|
|
|
4
|
|
99160
|
use 5.008001; |
|
4
|
|
|
|
|
16
|
|
3
|
4
|
|
|
4
|
|
16
|
use strict; |
|
4
|
|
|
|
|
4
|
|
|
4
|
|
|
|
|
80
|
|
4
|
4
|
|
|
4
|
|
12
|
use warnings; |
|
4
|
|
|
|
|
8
|
|
|
4
|
|
|
|
|
172
|
|
5
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
our $VERSION = '0.17'; |
7
|
|
|
|
|
|
|
|
8
|
4
|
|
|
4
|
|
16
|
use Carp qw(croak); |
|
4
|
|
|
|
|
4
|
|
|
4
|
|
|
|
|
328
|
|
9
|
4
|
|
|
4
|
|
2324
|
use Time::HiRes qw(sleep time); |
|
4
|
|
|
|
|
6900
|
|
|
4
|
|
|
|
|
20
|
|
10
|
4
|
|
|
4
|
|
2432
|
use IPC::ConcurrencyLimit; |
|
4
|
|
|
|
|
8
|
|
|
4
|
|
|
|
|
4604
|
|
11
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
sub new { |
13
|
7
|
|
|
7
|
0
|
1521614
|
my $class = shift; |
14
|
7
|
|
|
|
|
125
|
my %params = @_; |
15
|
7
|
|
|
|
|
74
|
my $type = delete $params{type}; |
16
|
7
|
50
|
|
|
|
94
|
$type = 'Flock' if not defined $type; |
17
|
7
|
50
|
|
|
|
64
|
croak( __PACKAGE__ . " only supports 'Flock' for now") |
18
|
|
|
|
|
|
|
if $type ne 'Flock'; |
19
|
|
|
|
|
|
|
|
20
|
7
|
50
|
33
|
|
|
70
|
if (defined $params{max_procs} and $params{max_procs}!=1) { |
21
|
0
|
|
|
|
|
0
|
croak( __PACKAGE__ . " does not support max_procs!=1, use multiple objects instead."); |
22
|
|
|
|
|
|
|
} |
23
|
|
|
|
|
|
|
|
24
|
7
|
|
50
|
|
|
139
|
my $process_name_change= $params{process_name_change} || 0; |
25
|
7
|
|
50
|
|
|
58
|
my $path= $params{path} || die __PACKAGE__ . '->new: missing mandatory parameter `path`'; |
26
|
7
|
|
50
|
|
|
66
|
my $file_prefix= $params{file_prefix} || ""; |
27
|
7
|
|
0
|
|
|
35
|
my $poll_time= $params{poll_time} || $params{interval} || 1; # seconds to poll (may be fraction) |
28
|
7
|
|
50
|
|
|
1258
|
my $retries= $params{retries} || undef; |
29
|
7
|
|
50
|
|
|
65
|
my $timeout= $params{timeout} || undef; |
30
|
7
|
|
50
|
|
|
34
|
my $debug= $params{debug} || 0; # show debug? |
31
|
7
|
|
50
|
|
|
28
|
my $debug_sub= $params{debug_sub} || undef; |
32
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
my $retry_sub= (ref $retries) ? $retries : |
34
|
|
|
|
|
|
|
(defined $retries && |
35
|
0
|
0
|
|
0
|
|
0
|
defined $timeout) ? sub { $_[0] <= $retries && $_[2] <= $timeout } : |
36
|
0
|
|
|
0
|
|
0
|
(defined $retries) ? sub { $_[0] <= $retries } : |
37
|
0
|
|
|
0
|
|
0
|
(defined $timeout) ? sub { $_[2] <= $timeout } : |
38
|
7
|
50
|
33
|
25
|
|
333
|
sub { 1 }; |
|
25
|
50
|
|
|
|
140
|
|
|
|
50
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
39
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
# primary is replace by standby1, is replaced by |
41
|
|
|
|
|
|
|
# standby2, is replaced by standby3. However, standby1 |
42
|
|
|
|
|
|
|
# will exit when standby2's lock is held by another process. |
43
|
7
|
50
|
|
|
|
58
|
my @names= map { ($file_prefix ? "$file_prefix.$_" : $_) } |
|
28
|
|
|
|
|
118
|
|
44
|
|
|
|
|
|
|
qw(primary standby1 standby2 standby3); |
45
|
|
|
|
|
|
|
my @lockers= map { |
46
|
7
|
50
|
|
|
|
50
|
IPC::ConcurrencyLimit->new( |
|
28
|
|
|
|
|
272
|
|
47
|
|
|
|
|
|
|
type => $type, |
48
|
|
|
|
|
|
|
max_procs => 1, |
49
|
|
|
|
|
|
|
# future proofing |
50
|
|
|
|
|
|
|
$type eq "Flock" ? ( |
51
|
|
|
|
|
|
|
file_prefix => $_, |
52
|
|
|
|
|
|
|
path => $path, |
53
|
|
|
|
|
|
|
) : (), |
54
|
|
|
|
|
|
|
) |
55
|
|
|
|
|
|
|
} @names; |
56
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
return bless { |
58
|
|
|
|
|
|
|
poll_time => $poll_time, |
59
|
|
|
|
|
|
|
timeout => $timeout, # FYI |
60
|
|
|
|
|
|
|
retries => $retries, # FYI |
61
|
|
|
|
|
|
|
lock_name => \@names, |
62
|
|
|
|
|
|
|
locker => \@lockers, |
63
|
|
|
|
|
|
|
debug => $debug, |
64
|
0
|
|
|
0
|
|
0
|
debug_sub => $debug_sub || sub { warn @_,"\n" }, |
65
|
7
|
|
50
|
|
|
253
|
retry_sub => $retry_sub, |
66
|
|
|
|
|
|
|
process_name_change => $process_name_change, |
67
|
|
|
|
|
|
|
}, $class; |
68
|
|
|
|
|
|
|
} |
69
|
|
|
|
|
|
|
|
70
|
|
|
|
|
|
|
sub _diag { |
71
|
33
|
|
|
33
|
|
67
|
my ($self, $fmt, @args)= @_; |
72
|
33
|
50
|
|
|
|
92
|
if (!@args) { |
73
|
33
|
|
|
|
|
129
|
$self->{debug_sub}->($fmt); |
74
|
|
|
|
|
|
|
} else { |
75
|
0
|
|
|
|
|
0
|
$self->{debug_sub}->(sprintf $fmt, @args); |
76
|
|
|
|
|
|
|
} |
77
|
|
|
|
|
|
|
} |
78
|
|
|
|
|
|
|
|
79
|
|
|
|
|
|
|
|
80
|
|
|
|
|
|
|
sub get_lock { |
81
|
7
|
|
|
7
|
0
|
1746
|
my ($self) = @_; |
82
|
|
|
|
|
|
|
|
83
|
7
|
|
|
|
|
24
|
my $locker= $self->{locker}; |
84
|
7
|
|
|
|
|
14
|
my $names= $self->{lock_name}; |
85
|
|
|
|
|
|
|
|
86
|
7
|
|
|
|
|
48
|
my $old_oh= $0; |
87
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
$0 = "$old_oh - acquire" |
89
|
7
|
50
|
|
|
|
43
|
if $self->{process_name_change}; |
90
|
|
|
|
|
|
|
|
91
|
|
|
|
|
|
|
# try to get the rightmost lock (standby3) if we don't get it |
92
|
|
|
|
|
|
|
# then we exit out. this shouldn't really happen if other things |
93
|
|
|
|
|
|
|
# are sane, for instance when $poll_time is much smaller than |
94
|
|
|
|
|
|
|
# the rate we allocate new workers. |
95
|
7
|
|
|
|
|
25
|
my $locker_id= $#$locker; |
96
|
7
|
50
|
|
|
|
35
|
if ( $locker->[$locker_id]->get_lock() ) { |
97
|
|
|
|
|
|
|
$self->_diag( "Got a $names->[$locker_id] lock") |
98
|
7
|
50
|
|
|
|
53
|
if $self->{debug}; |
99
|
|
|
|
|
|
|
} else { |
100
|
|
|
|
|
|
|
$self->_diag( "Failed to get a $names->[$locker_id] lock, entry lock is held by another process" ) |
101
|
0
|
0
|
|
|
|
0
|
if $self->{debug}; |
102
|
|
|
|
|
|
|
$0 = "$old_oh - no-lock-acquired" |
103
|
0
|
0
|
|
|
|
0
|
if $self->{process_name_change}; |
104
|
0
|
|
|
|
|
0
|
return; |
105
|
|
|
|
|
|
|
} |
106
|
|
|
|
|
|
|
|
107
|
|
|
|
|
|
|
# Each worker tries to acquire the lock to its left. If it does |
108
|
|
|
|
|
|
|
# then it abandons its old lock. If that means the worker ends up |
109
|
|
|
|
|
|
|
# on locker_id 0 then they are done, and can do work. |
110
|
|
|
|
|
|
|
# The first standby worker also looks to its right to see if there |
111
|
|
|
|
|
|
|
# is a replacement process for it, if there is it exits, leaving |
112
|
|
|
|
|
|
|
# a gap and letting the replacements shuffle left. |
113
|
7
|
|
|
|
|
1018
|
my $tries= 0; |
114
|
7
|
|
|
|
|
10
|
my $lock_tries= 0; |
115
|
7
|
|
|
|
|
49
|
my $standby_start= time(); |
116
|
7
|
|
|
|
|
14
|
my $lock_start= time(); |
117
|
7
|
|
|
|
|
34
|
my $poll_time= $self->{poll_time}; |
118
|
|
|
|
|
|
|
|
119
|
7
|
|
|
|
|
28
|
while ( $locker_id > 0 ) { |
120
|
|
|
|
|
|
|
$0 = "$old_oh - $names->[$locker_id]" |
121
|
44
|
50
|
|
|
|
239
|
if $self->{process_name_change}; |
122
|
|
|
|
|
|
|
|
123
|
|
|
|
|
|
|
# can we shuffle our lock left? |
124
|
44
|
100
|
|
|
|
479
|
if ( $locker->[$locker_id - 1]->get_lock() ) { |
125
|
|
|
|
|
|
|
$self->_diag( "Got a $names->[$locker_id -1] lock, dropping old $names->[$locker_id] lock") |
126
|
19
|
50
|
|
|
|
150
|
if $self->{debug}; |
127
|
|
|
|
|
|
|
# yep, we got the lock to the left, so drop our old lock, |
128
|
|
|
|
|
|
|
# and move the pointer left at the same time. |
129
|
19
|
|
|
|
|
1534
|
$locker->[ $locker_id-- ]->release_lock(); |
130
|
19
|
|
|
|
|
24
|
$lock_tries= 0; |
131
|
19
|
|
|
|
|
50
|
$lock_start= time(); |
132
|
19
|
|
|
|
|
55
|
next; |
133
|
|
|
|
|
|
|
} |
134
|
|
|
|
|
|
|
|
135
|
25
|
50
|
|
|
|
603
|
unless ($self->{retry_sub}->(++$tries, ++$lock_tries, time - $standby_start, time - $lock_start)) { |
136
|
|
|
|
|
|
|
$0 = "$old_oh - no-lock-timeout" |
137
|
0
|
0
|
|
|
|
0
|
if $self->{process_name_change}; |
138
|
0
|
|
|
|
|
0
|
return; |
139
|
|
|
|
|
|
|
} |
140
|
|
|
|
|
|
|
|
141
|
|
|
|
|
|
|
# check if we are the first standby worker. |
142
|
25
|
100
|
|
|
|
267
|
if ( $locker_id == 1 ) { |
143
|
|
|
|
|
|
|
# yep - we are the first standby worker, |
144
|
|
|
|
|
|
|
# so check if the lock to our right is being held: |
145
|
21
|
100
|
|
|
|
159
|
if ( $locker->[$locker_id + 1]->get_lock() ) { |
146
|
|
|
|
|
|
|
# we got the lock, which means nothing else |
147
|
|
|
|
|
|
|
# holds it. so we release the lock and move on. |
148
|
19
|
|
|
|
|
123
|
$locker->[$locker_id + 1]->release_lock(); |
149
|
|
|
|
|
|
|
} else { |
150
|
|
|
|
|
|
|
$self->_diag( |
151
|
|
|
|
|
|
|
"A newer worker is holding the $names->[$locker_id+1] lock, will exit to let it take over" |
152
|
2
|
50
|
|
|
|
30
|
) if $self->{debug}; |
153
|
|
|
|
|
|
|
# we failed to get the lock, which means there is a newer |
154
|
|
|
|
|
|
|
# process that can replace us so return/exit - this frees up |
155
|
|
|
|
|
|
|
# our lock and lets the newer process to move into our position. |
156
|
|
|
|
|
|
|
$0 = "$old_oh - no-lock-retired" |
157
|
2
|
50
|
|
|
|
251
|
if $self->{process_name_change}; |
158
|
2
|
|
|
|
|
10
|
return; |
159
|
|
|
|
|
|
|
} |
160
|
|
|
|
|
|
|
} |
161
|
|
|
|
|
|
|
|
162
|
|
|
|
|
|
|
# nope - the lock to our left is being held so sleep a while before |
163
|
|
|
|
|
|
|
# we try again. We use the rand and the formula so that items to the |
164
|
|
|
|
|
|
|
# right poll faster than items to the left, and to reduce the chance |
165
|
|
|
|
|
|
|
# that lock holder 1 and lock holder 3 poll lock 2 at the same time |
166
|
|
|
|
|
|
|
# forever. The formula guarantees that items to the left poll faster, |
167
|
|
|
|
|
|
|
# and the rand ensures there is jitter. |
168
|
23
|
|
|
|
|
1622397
|
sleep rand(($poll_time / $locker_id)*2); |
169
|
|
|
|
|
|
|
} |
170
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
# assert that $locker_id is 0 at this point. |
172
|
5
|
50
|
|
|
|
19
|
die "panic: We should not reach this point with \$locker_id larger than 0, got $locker_id" |
173
|
|
|
|
|
|
|
if $locker_id; |
174
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
$self->_diag("Got $names->[$locker_id] lock, we are allowed to do work.") |
176
|
5
|
50
|
|
|
|
42
|
if $self->{debug}; |
177
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
# at this point we should be $locker_id == 0 and we can do work. |
179
|
5
|
50
|
|
|
|
273
|
if ($self->{process_name_change}) { |
180
|
0
|
0
|
|
|
|
0
|
if ($self->{process_name_change} > 1) { |
181
|
0
|
|
|
|
|
0
|
$0 = $old_oh; |
182
|
|
|
|
|
|
|
} else { |
183
|
0
|
|
|
|
|
0
|
$0 = "$old_oh - $names->[$locker_id]" |
184
|
|
|
|
|
|
|
} |
185
|
|
|
|
|
|
|
} |
186
|
5
|
|
|
|
|
20
|
return 1; |
187
|
|
|
|
|
|
|
} |
188
|
|
|
|
|
|
|
|
189
|
|
|
|
|
|
|
|
190
|
|
|
|
|
|
|
sub is_locked { |
191
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
192
|
0
|
|
|
|
|
|
return $self->{locker}[0]->is_locked(@_); |
193
|
|
|
|
|
|
|
} |
194
|
|
|
|
|
|
|
|
195
|
|
|
|
|
|
|
sub release_lock { |
196
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
197
|
0
|
|
|
|
|
|
return $self->{locker}[0]->release_lock(@_); |
198
|
|
|
|
|
|
|
} |
199
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
sub lock_id { |
201
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
202
|
0
|
|
|
|
|
|
return $self->{locker}[0]->lock_id(@_); |
203
|
|
|
|
|
|
|
} |
204
|
|
|
|
|
|
|
|
205
|
|
|
|
|
|
|
sub heartbeat { |
206
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
207
|
0
|
|
|
|
|
|
return $self->{locker}[0]->heartbeat; |
208
|
|
|
|
|
|
|
} |
209
|
|
|
|
|
|
|
|
210
|
|
|
|
|
|
|
|
211
|
|
|
|
|
|
|
1; |
212
|
|
|
|
|
|
|
|
213
|
|
|
|
|
|
|
__END__ |