line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package AnyEvent::Beanstalk::Worker; |
2
|
|
|
|
|
|
|
|
3
|
1
|
|
|
1
|
|
29190
|
use 5.016001; |
|
1
|
|
|
|
|
4
|
|
|
1
|
|
|
|
|
25
|
|
4
|
1
|
|
|
1
|
|
5
|
use strict; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
37
|
|
5
|
1
|
|
|
1
|
|
4
|
use warnings; |
|
1
|
|
|
|
|
6
|
|
|
1
|
|
|
|
|
32
|
|
6
|
1
|
|
|
1
|
|
11
|
use feature 'current_sub'; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
94
|
|
7
|
1
|
|
|
1
|
|
4
|
use AnyEvent; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
20
|
|
8
|
1
|
|
|
1
|
|
883
|
use AnyEvent::Log; |
|
1
|
|
|
|
|
12794
|
|
|
1
|
|
|
|
|
47
|
|
9
|
1
|
|
|
1
|
|
1111
|
use AnyEvent::Beanstalk; |
|
1
|
|
|
|
|
77135
|
|
|
1
|
|
|
|
|
24
|
|
10
|
|
|
|
|
|
|
|
11
|
|
|
|
|
|
|
our $VERSION = '0.03'; |
12
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
## Construct a worker, connect it to beanstalkd and register the built-in
## "start" event handler that drives the reserve loop.
##
## Arguments (key/value list):
##   beanstalk_watch   - tube to watch (required; dies without it)
##   beanstalk_host    - server host (default 'localhost')
##   beanstalk_port    - server port (default 11300)
##   beanstalk_decoder - passed through as the client's "decoder" option
##   concurrency       - maximum number of jobs held at once (default 1)
##   max_jobs          - stop reserving after this many jobs; 0 disables
##                       the limit (default 0)
##   max_stop_tries    - number of stop() calls after which the process
##                       exits even with jobs outstanding (default 3)
##   log_level         - stored in _log_level (default 4)
##   reserve_timeout   - initial reserve timeout (default 1)
##   release_delay     - delay used when a reserved job is released because
##                       no initial_state was configured (default 3)
##   initial_state     - event emitted with the reserve callback's arguments
##                       for every reserved job
##
## The same argument list is handed to init() once the object is set up.
## Returns the new worker object.
sub new {
    my $class = shift;
    my %args  = @_;

    ## Fail fast: check the one required argument before any signal
    ## watcher or server connection is created, so a failed constructor
    ## leaves nothing behind.
    unless ( $args{beanstalk_watch} ) {
        die "beanstalk_watch argument required\n";
    }

    my $self = bless {}, $class;

    $self->{_cb}           = {};
    $self->{_event}        = {};
    $self->{_jobs}         = {};
    $self->{_events}       = [];    ## event queue
    $self->{_handled_jobs} = 0;     ## simple job counter

    $self->{_running}        = 0;
    $self->{_stop_tries}     = 0;
    $self->{_max_stop_tries} = $args{max_stop_tries} // 3;
    $self->{_max_jobs}       = $args{max_jobs} || 0;
    $self->{_concurrency}    = $args{concurrency} || 1;
    $self->{_log_level}      = $args{log_level} // 4;

    $self->{_reserve_timeout}        = $args{reserve_timeout} || 1;
    $self->{_reserve_base}           = $self->{_reserve_timeout};
    $self->{_reserve_timeout_factor} = 1.1;
    $self->{_reserve_timeout_max}    = 4;
    $self->{_release_delay}          = $args{release_delay} || 3;

    $self->{_initial_state} = $args{initial_state};

    $self->{_log_ctx} = AnyEvent::Log::ctx();
    $self->{_log_ctx}->title(__PACKAGE__);

    ## one pre-built logger callback per level used in this module
    $self->{_log} = {};
    for my $level (qw(trace debug info note)) {
        $self->{_log}->{$level} = $self->{_log_ctx}->logger($level);
    }

    ## TERM and INT both request a stop
    $self->{_signal} = {};
    for my $name (qw(TERM INT)) {
        $self->{_signal}->{$name} = AnyEvent->signal(
            signal => $name,
            cb     => sub {
                $self->{_log_ctx}->log( warn => "$name received" );
                $self->stop;
            }
        );
    }

    ## USR2 cycles the stored log level: +1 per signal, wrapping from 9
    ## back to 2.
    ## NOTE(review): nothing visible in this file applies _log_level to
    ## the log context -- confirm whether that is intended.
    $self->{_signal}->{USR2} = AnyEvent->signal(
        signal => "USR2",
        cb     => sub {
            $self->{_log_level} =
              ( $self->{_log_level} >= 9 ? 2 : $self->{_log_level} + 1 );
        }
    );

    $args{beanstalk_host} ||= 'localhost';
    $args{beanstalk_port} ||= 11300;

    $self->beanstalk(
        server  => $args{beanstalk_host} . ':' . $args{beanstalk_port},
        decoder => $args{beanstalk_decoder}
    );

    ## block until the watch command has completed
    $self->beanstalk->watch( $args{beanstalk_watch} )->recv;

    ## The "start" event is the heart of the worker: each time it fires
    ## we try to reserve one more job, unless a guard says otherwise.
    $self->on(
        start => sub {
            my $self   = shift;
            my $reason = shift || '(unknown)';

            $self->{_log}->{trace}->("in start: $reason");

            unless ( $self->{_running} ) {
                $self->{_log}->{trace}->("worker is not running");
                return;
            }

            unless ( $self->job_count < $self->concurrency ) {
                $self->{_log}->{trace}->( "worker running "
                      . $self->job_count
                      . " jobs; will not accept more jobs until others finish"
                );
                return;
            }

            if ( $self->max_jobs and $self->handled_jobs >= $self->max_jobs ) {
                $self->{_log}->{info}->( "Handled "
                      . $self->handled_jobs
                      . "; will not accept more jobs" );
                return;
            }

            ## a stop was requested earlier and the last job is now gone
            if ( !$self->job_count and $self->{_stop_tries} ) {
                $self->{_log}->{info}->("No jobs left; stopping as requested");
                return $self->stop;
            }

            $self->beanstalk->reserve(
                $self->{_reserve_timeout},
                sub {
                    my ( $qjob, $qresp ) = @_;
                    $qresp //= '';

                    if ( $qresp =~ /timed_out/i ) {
                        ## back off while idle, but never grow past the
                        ## configured maximum
                        if ( $self->{_reserve_timeout} <
                            $self->{_reserve_timeout_max} )
                        {
                            $self->{_reserve_timeout} *=
                              $self->{_reserve_timeout_factor};
                            $self->{_reserve_timeout} =
                              $self->{_reserve_timeout_max}
                              if $self->{_reserve_timeout} >
                              $self->{_reserve_timeout_max};
                        }
                        $self->{_log}->{trace}
                          ->("beanstalk reservation timed out");
                        return $self->emit( start => ($qresp) );
                    }

                    unless ( $qresp =~ /reserved/i ) {
                        $self->{_log}->{note}->("beanstalk returned: $qresp")
                          unless $qresp =~ /deadline_soon/i;
                        return $self->emit( start => ($qresp) );
                    }

                    ## got a job: go back to the base timeout
                    $self->{_reserve_timeout} = $self->{_reserve_base};

                    if ( $self->{_jobs}->{ $qjob->id } ) {
                        $self->{_log_ctx}->log( warn => "Already have "
                              . $qjob->id
                              . " reserved (must have expired)" );
                        return $self->emit( start => ("already reserved") );
                    }

                    $self->{_jobs}->{ $qjob->id } = 1;
                    $self->{_handled_jobs}++;

                    $self->{_log}->{info}->( "added job "
                          . $qjob->id
                          . "; outstanding jobs: "
                          . $self->job_count );

                    $self->{_log}->{trace}->( "reserved job " . $qjob->id );

                    if ( $self->{_initial_state} ) {
                        $self->emit( $self->{_initial_state} => @_ );
                    }
                    else {
                        ## nobody to hand the job to: give it back
                        $self->finish(
                            release => $qjob->id,
                            { delay => $self->{_release_delay} }
                        );
                    }

                    ## immediately try for another job
                    $self->emit( start => ('reserved') );
                }
            );
        }
    );

    ## FIXME: thinking about when to touch jobs, how to respond to
    ## FIXME: NOT_FOUND, etc. after timeouts

    ## FIXME: think about logging for clarity; figure out how to
    ## FIXME: filter 'note' level messages (for example)

    $self->init(@_);

    return $self;
}
181
|
|
|
|
|
|
|
|
182
|
0
|
|
|
0
|
1
|
|
## Hook called at the end of new() with the constructor arguments.
## Does nothing here.
sub init {
    return;
}
183
|
|
|
|
|
|
|
|
184
|
|
|
|
|
|
|
## Mark the worker as running, reset the stop-request counter and emit
## the "start" event. Returns whatever emit() returns.
sub start {
    my ($self) = @_;

    @{$self}{qw(_running _stop_tries)} = ( 1, 0 );

    my $trace = $self->{_log}->{trace};
    $trace->("starting worker");

    return $self->emit( start => 'start sub' );
}
191
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
## Finish a reserved job: run a beanstalk command on it and, once the
## command's callback fires, free the job's slot in this worker.
##
##   $self->finish( $action, $job_id );
##   $self->finish( $action, $job_id, $args );
##   $self->finish( $action, $job_id, $cb );
##   $self->finish( $action, $job_id, $args, $cb );
##
## $action - name of the method to call on the beanstalk client
## $job_id - id of the job being finished
## $args   - optional extra argument passed to the client method
## $cb     - optional code ref, called as $cb->($job_id) after the job has
##           been removed from the worker's bookkeeping
##
## A failure to dispatch the command is logged at "error" level rather
## than thrown. Returns nothing.
sub finish {
    my $self   = shift;
    my $action = shift;
    my $job_id = shift;
    my $cb     = pop;
    my $args   = shift;

    ## the trailing argument only counts as a callback when it is a code
    ## ref; anything else is the optional command argument
    if ( ref($cb) ne 'CODE' ) {
        $args = $cb;
        $cb   = sub { };
    }

    my $internal = sub {
        delete $self->{_jobs}->{$job_id};    ## IMPORTANT

        if ( $self->job_count == $self->concurrency - 1 ) {
            ## we've been waiting for a slot to free up
            $self->emit( start => ('finish sub') );
        }

        $self->{_log}->{info}
          ->( "finished with $job_id ($action); outstanding jobs: "
              . $self->job_count );

        ## run the caller's callback before the shutdown check below,
        ## because stop() may exit the process
        $cb->($job_id);

        ## we're done
        if (    $self->max_jobs
            and $self->handled_jobs >= $self->max_jobs
            and !$self->job_count )
        {
            $self->{_log}->{info}
              ->( "Handled " . $self->handled_jobs . "; quitting" );
            return $self->stop;
        }
    };

    ## the trailing 1 makes success explicit, so the error branch does not
    ## depend on $@ surviving until it is inspected
    eval {
        $self->beanstalk->$action( $job_id, ( $args ? $args : () ),
            $internal );
        1;
    } or $self->{_log_ctx}->log(
        error => "first argument to finish() must be a beanstalk command: $@" );

    return;
}
236
|
|
|
|
|
|
|
|
237
|
|
|
|
|
|
|
## Request a shutdown. Every call counts as one stop attempt:
##   - once the attempt count reaches _max_stop_tries, warn and exit;
##   - otherwise, if jobs are still outstanding, warn and return so they
##     can finish;
##   - otherwise log "exiting" at fatal level and exit.
sub stop {
    my ($self) = @_;

    my $ctx   = $self->{_log_ctx};
    my $tries = ++$self->{_stop_tries};

    if ( $tries < $self->{_max_stop_tries} ) {
        if ( $self->job_count ) {
            $ctx->log( warn => "stop requested; waiting for outstanding jobs" );
            return;
        }

        $ctx->log( fatal => "exiting" );
        exit;
    }

    ## asked too many times: give up on the outstanding jobs
    $ctx->log(
        warn => "stop requested; impatiently quitting outstanding jobs" );
    exit;
}
256
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
## Register $cb as the handler for $event.
##
## The handler is stored in _cb, and a condition variable is stored in
## _event under the same name. Sending on that condvar (which is what
## emit() does) runs the dispatcher below, which calls the handler with
## the values that were sent.
##
## The condvar is replaced with a fresh one *before* the handler runs.
## That way a handler may emit its own event again straight away, and an
## exception inside a handler does not leave the event without a condvar.
##
## Returns the condvar that was installed for $event.
sub on {
    my ( $self, $event, $cb ) = @_;

    $self->{_cb}->{$event} = $cb;

    my $dispatch = sub {
        my $fired = shift;

        ## re-arm first (see above); __SUB__ is this dispatcher
        $self->{_event}->{$event} = AnyEvent->condvar( cb => __SUB__ );

        ## the handler is looked up at dispatch time, so a later on()
        ## call for the same event takes effect immediately
        return unless ref( $self->{_cb}->{$event} ) eq 'CODE';

        $self->{_log}->{trace}->("event: $event");
        my @data = $fired->recv;
        $self->{_log}->{debug}
          ->( "shift event ($event): " . shift @{ $self->{_events} } );
        $self->{_log}->{debug}
          ->( "EVENTS (s): " . join( ' ' => @{ $self->{_events} } ) );
        $self->{_cb}->{$event}->(@data);
    };

    return $self->{_event}->{$event} = AnyEvent->condvar( cb => $dispatch );
}
282
|
|
|
|
|
|
|
|
283
|
|
|
|
|
|
|
## Emit $event: record it on the _events queue and send ($self, @args) on
## the event's condition variable, which runs the handler registered with
## on().
##
##   $self->emit( $event, @args );
##
## Dies, before anything is queued, if no handler was ever registered for
## $event. Returns whatever the condvar's send() returns.
sub emit {
    my $self  = shift;
    my $event = shift;

    ## check up front so an unknown event gives a readable error and does
    ## not leave a stray entry in the queue
    my $cv = $self->{_event}->{$event}
      or die "no handler registered for event '$event'";

    $self->{_log}->{debug}->("push event ($event)");
    push @{ $self->{_events} }, $event;
    $self->{_log}->{debug}
      ->( "EVENTS (p): " . join( ' ' => @{ $self->{_events} } ) );

    return $cv->send( $self, @_ );
}
292
|
|
|
|
|
|
|
|
293
|
|
|
|
|
|
|
## Get (and optionally create) the beanstalk client.
##
## Called with arguments, builds a new AnyEvent::Beanstalk object from
## them and stores it. In every case the stored client is returned.
sub beanstalk {
    my ( $self, @client_args ) = @_;

    if (@client_args) {
        $self->{_beanstalk} = AnyEvent::Beanstalk->new(@client_args);
    }

    return $self->{_beanstalk};
}
298
|
|
|
|
|
|
|
|
299
|
0
|
|
|
0
|
1
|
|
## Number of entries currently in the worker's _jobs table.
sub job_count {
    my ($self) = @_;
    return scalar keys %{ $self->{_jobs} };
}
|
0
|
|
|
|
|
|
|
300
|
|
|
|
|
|
|
|
301
|
0
|
|
|
0
|
1
|
|
## Value of the worker's _handled_jobs counter.
sub handled_jobs {
    my ($self) = @_;
    return $self->{_handled_jobs};
}
302
|
|
|
|
|
|
|
|
303
|
0
|
|
|
0
|
1
|
|
## Value of the worker's _max_jobs setting.
sub max_jobs {
    my ($self) = @_;
    return $self->{_max_jobs};
}
304
|
|
|
|
|
|
|
|
305
|
|
|
|
|
|
|
## Get or set the worker's _concurrency value.
##
## With an argument, stores it as the new value. Always returns the
## current value.
sub concurrency {
    my ( $self, @new_value ) = @_;

    $self->{_concurrency} = $new_value[0] if @new_value;

    return $self->{_concurrency};
}
313
|
|
|
|
|
|
|
|
314
|
|
|
|
|
|
|
1; |
315
|
|
|
|
|
|
|
__END__ |