line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package AnyEvent::Beanstalk::Worker; |
2
|
|
|
|
|
|
|
|
3
|
1
|
|
|
1
|
|
22305
|
use 5.016001; |
|
1
|
|
|
|
|
6
|
|
4
|
1
|
|
|
1
|
|
10
|
use strict; |
|
1
|
|
|
|
|
4
|
|
|
1
|
|
|
|
|
36
|
|
5
|
1
|
|
|
1
|
|
9
|
use warnings; |
|
1
|
|
|
|
|
14
|
|
|
1
|
|
|
|
|
42
|
|
6
|
1
|
|
|
1
|
|
6
|
use feature 'current_sub'; |
|
1
|
|
|
|
|
6
|
|
|
1
|
|
|
|
|
93
|
|
7
|
1
|
|
|
1
|
|
6
|
use AnyEvent; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
21
|
|
8
|
1
|
|
|
1
|
|
520
|
use AnyEvent::Log; |
|
1
|
|
|
|
|
10218
|
|
|
1
|
|
|
|
|
54
|
|
9
|
1
|
|
|
1
|
|
653
|
use AnyEvent::Beanstalk; |
|
1
|
|
|
|
|
47087
|
|
|
1
|
|
|
|
|
16
|
|
10
|
|
|
|
|
|
|
|
11
|
|
|
|
|
|
|
our $VERSION = '0.05'; |
12
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
sub new { |
14
|
0
|
|
|
0
|
1
|
|
my $class = shift; |
15
|
0
|
|
|
|
|
|
my $self = {}; |
16
|
0
|
|
|
|
|
|
bless $self => $class; |
17
|
|
|
|
|
|
|
|
18
|
0
|
|
|
|
|
|
my %args = @_; |
19
|
|
|
|
|
|
|
|
20
|
0
|
|
|
|
|
|
$self->{_cb} = {}; |
21
|
0
|
|
|
|
|
|
$self->{_event} = {}; |
22
|
0
|
|
|
|
|
|
$self->{_jobs} = {}; |
23
|
0
|
|
|
|
|
|
$self->{_events} = []; ## event queue |
24
|
0
|
|
|
|
|
|
$self->{_handled_jobs} = 0; ## simple job counter |
25
|
|
|
|
|
|
|
|
26
|
0
|
|
|
|
|
|
$self->{_running} = 0; |
27
|
0
|
|
|
|
|
|
$self->{_stop_tries} = 0; |
28
|
0
|
|
0
|
|
|
|
$self->{_max_stop_tries} = $args{max_stop_tries} // 3; |
29
|
0
|
|
0
|
|
|
|
$self->{_max_jobs} = $args{max_jobs} || 0; |
30
|
0
|
|
0
|
|
|
|
$self->{_concurrency} = $args{concurrency} || 1; |
31
|
0
|
|
0
|
|
|
|
$self->{_log_level} = $args{log_level} // 4; |
32
|
|
|
|
|
|
|
|
33
|
0
|
|
0
|
|
|
|
$self->{_reserve_timeout} = $args{reserve_timeout} || 1; |
34
|
0
|
|
|
|
|
|
$self->{_reserve_base} = $self->{_reserve_timeout}; |
35
|
0
|
|
|
|
|
|
$self->{_reserve_timeout_factor} = 1.1; |
36
|
0
|
|
|
|
|
|
$self->{_reserve_timeout_max} = 4; |
37
|
0
|
|
0
|
|
|
|
$self->{_release_delay} = $args{release_delay} || 3; |
38
|
|
|
|
|
|
|
|
39
|
0
|
|
|
|
|
|
$self->{_initial_state} = $args{initial_state}; |
40
|
|
|
|
|
|
|
|
41
|
0
|
|
|
|
|
|
$self->{_log_ctx} = AnyEvent::Log::ctx; |
42
|
0
|
|
|
|
|
|
$self->{_log_ctx}->title(__PACKAGE__); |
43
|
0
|
|
|
|
|
|
$self->{_log_ctx}->level($self->{_log_level}); |
44
|
|
|
|
|
|
|
|
45
|
0
|
|
|
|
|
|
$self->{_log} = {}; |
46
|
0
|
|
|
|
|
|
$self->{_log}->{trace} = $self->{_log_ctx}->logger("trace"); |
47
|
0
|
|
|
|
|
|
$self->{_log}->{debug} = $self->{_log_ctx}->logger("debug"); |
48
|
0
|
|
|
|
|
|
$self->{_log}->{info} = $self->{_log_ctx}->logger("info"); |
49
|
0
|
|
|
|
|
|
$self->{_log}->{note} = $self->{_log_ctx}->logger("note"); |
50
|
|
|
|
|
|
|
|
51
|
0
|
|
|
|
|
|
$self->{_signal} = {}; |
52
|
|
|
|
|
|
|
$self->{_signal}->{TERM} = AnyEvent->signal( |
53
|
|
|
|
|
|
|
signal => "TERM", |
54
|
|
|
|
|
|
|
cb => |
55
|
0
|
|
|
0
|
|
|
sub { $self->{_log_ctx}->log( warn => "TERM received" ); $self->stop } |
|
0
|
|
|
|
|
|
|
56
|
0
|
|
|
|
|
|
); |
57
|
|
|
|
|
|
|
$self->{_signal}->{INT} = AnyEvent->signal( |
58
|
|
|
|
|
|
|
signal => "INT", |
59
|
|
|
|
|
|
|
cb => |
60
|
0
|
|
|
0
|
|
|
sub { $self->{_log_ctx}->log( warn => "INT received" ); $self->stop } |
|
0
|
|
|
|
|
|
|
61
|
0
|
|
|
|
|
|
); |
62
|
|
|
|
|
|
|
$self->{_signal}->{USR2} = AnyEvent->signal( |
63
|
|
|
|
|
|
|
signal => "USR2", |
64
|
|
|
|
|
|
|
cb => sub { |
65
|
|
|
|
|
|
|
$self->{_log_level} = |
66
|
0
|
0
|
|
0
|
|
|
( $self->{_log_level} >= 9 ? 2 : $self->{_log_level} + 1 ); |
67
|
0
|
|
|
|
|
|
$self->{_log_ctx}->level($self->{_log_level}); |
68
|
|
|
|
|
|
|
} |
69
|
0
|
|
|
|
|
|
); |
70
|
|
|
|
|
|
|
|
71
|
0
|
|
0
|
|
|
|
$args{beanstalk_host} ||= 'localhost'; |
72
|
0
|
|
0
|
|
|
|
$args{beanstalk_port} ||= 11300; |
73
|
|
|
|
|
|
|
|
74
|
0
|
0
|
|
|
|
|
unless ($args{beanstalk_watch}) { |
75
|
0
|
|
|
|
|
|
die "beanstalk_watch argument required\n"; |
76
|
|
|
|
|
|
|
} |
77
|
|
|
|
|
|
|
|
78
|
|
|
|
|
|
|
$self->beanstalk( |
79
|
|
|
|
|
|
|
server => $args{beanstalk_host} . ':' . $args{beanstalk_port}, |
80
|
|
|
|
|
|
|
decoder => $args{beanstalk_decoder} |
81
|
0
|
|
|
|
|
|
); |
82
|
|
|
|
|
|
|
|
83
|
0
|
|
|
|
|
|
$self->beanstalk->watch( $args{beanstalk_watch} )->recv; |
84
|
|
|
|
|
|
|
|
85
|
|
|
|
|
|
|
$self->on( |
86
|
|
|
|
|
|
|
start => sub { |
87
|
0
|
|
|
0
|
|
|
my $self = shift; |
88
|
0
|
|
0
|
|
|
|
my $reason = shift || '(unknown)'; |
89
|
|
|
|
|
|
|
|
90
|
0
|
|
|
|
|
|
$self->{_log}->{trace}->("in start: $reason"); |
91
|
|
|
|
|
|
|
|
92
|
0
|
0
|
|
|
|
|
unless ( $self->{_running} ) { |
93
|
0
|
|
|
|
|
|
$self->{_log}->{trace}->("worker is not running"); |
94
|
0
|
|
|
|
|
|
return; |
95
|
|
|
|
|
|
|
} |
96
|
|
|
|
|
|
|
|
97
|
0
|
0
|
|
|
|
|
unless ( $self->job_count < $self->concurrency ) { |
98
|
0
|
|
|
|
|
|
$self->{_log}->{trace}->( "worker running " |
99
|
|
|
|
|
|
|
. $self->job_count |
100
|
|
|
|
|
|
|
. " jobs; will not accept more jobs until others finish" |
101
|
|
|
|
|
|
|
); |
102
|
0
|
|
|
|
|
|
return; |
103
|
|
|
|
|
|
|
} |
104
|
|
|
|
|
|
|
|
105
|
0
|
0
|
0
|
|
|
|
if ( $self->max_jobs and $self->handled_jobs >= $self->max_jobs ) { |
106
|
0
|
|
|
|
|
|
$self->{_log}->{info}->("Handled " . $self->handled_jobs . "; will not accept more jobs"); |
107
|
0
|
|
|
|
|
|
return; |
108
|
|
|
|
|
|
|
} |
109
|
|
|
|
|
|
|
|
110
|
0
|
0
|
0
|
|
|
|
if ( ! $self->job_count and $self->{_stop_tries} ) { |
111
|
0
|
|
|
|
|
|
$self->{_log}->{info}->("No jobs left; stopping as requested"); |
112
|
0
|
|
|
|
|
|
return $self->stop; |
113
|
|
|
|
|
|
|
} |
114
|
|
|
|
|
|
|
|
115
|
|
|
|
|
|
|
$self->beanstalk->reserve( |
116
|
|
|
|
|
|
|
$self->{_reserve_timeout}, |
117
|
|
|
|
|
|
|
sub { |
118
|
0
|
|
|
|
|
|
my ( $qjob, $qresp ) = @_; |
119
|
0
|
|
0
|
|
|
|
$qresp //= ''; |
120
|
|
|
|
|
|
|
|
121
|
0
|
0
|
|
|
|
|
if ( $qresp =~ /timed_out/i ) { |
122
|
|
|
|
|
|
|
$self->{_reserve_timeout} *= |
123
|
|
|
|
|
|
|
$self->{_reserve_timeout_factor} |
124
|
|
|
|
|
|
|
unless $self->{_reserve_timeout} >= |
125
|
0
|
0
|
|
|
|
|
$self->{_reserve_timeout_max}; |
126
|
|
|
|
|
|
|
$self->{_log}->{trace} |
127
|
0
|
|
|
|
|
|
->("beanstalk reservation timed out"); |
128
|
0
|
|
|
|
|
|
return $self->emit( start => ($qresp) ); |
129
|
|
|
|
|
|
|
} |
130
|
|
|
|
|
|
|
|
131
|
0
|
0
|
|
|
|
|
unless ( $qresp =~ /reserved/i ) { |
132
|
0
|
0
|
|
|
|
|
$self->{_log}->{note}->("beanstalk returned: $qresp") |
133
|
|
|
|
|
|
|
unless $qresp =~ /deadline_soon/i; |
134
|
0
|
|
|
|
|
|
return $self->emit( start => ($qresp) ); |
135
|
|
|
|
|
|
|
} |
136
|
|
|
|
|
|
|
|
137
|
0
|
|
|
|
|
|
$self->{_reserve_timeout} = $self->{_reserve_base}; |
138
|
|
|
|
|
|
|
|
139
|
0
|
0
|
|
|
|
|
if ( $self->{_jobs}->{ $qjob->id } ) { |
140
|
0
|
|
|
|
|
|
$self->{_log_ctx}->log( warn => "Already have " |
141
|
|
|
|
|
|
|
. $qjob->id |
142
|
|
|
|
|
|
|
. " reserved (must have expired)" ); |
143
|
0
|
|
|
|
|
|
return $self->emit( start => ("already reserved") ); |
144
|
|
|
|
|
|
|
} |
145
|
|
|
|
|
|
|
|
146
|
0
|
|
|
|
|
|
$self->{_jobs}->{ $qjob->id } = 1; |
147
|
0
|
|
|
|
|
|
$self->{_handled_jobs}++; |
148
|
|
|
|
|
|
|
|
149
|
0
|
|
|
|
|
|
$self->{_log}->{info}->( "added job " |
150
|
|
|
|
|
|
|
. $qjob->id |
151
|
|
|
|
|
|
|
. "; outstanding jobs: " |
152
|
|
|
|
|
|
|
. $self->job_count ); |
153
|
|
|
|
|
|
|
|
154
|
0
|
|
|
|
|
|
$self->{_log}->{trace}->( "reserved job " . $qjob->id ); |
155
|
|
|
|
|
|
|
|
156
|
0
|
0
|
|
|
|
|
if ($self->{_initial_state}) { |
157
|
0
|
|
|
|
|
|
$self->emit( $self->{_initial_state} => @_ ); |
158
|
|
|
|
|
|
|
} |
159
|
|
|
|
|
|
|
|
160
|
|
|
|
|
|
|
else { |
161
|
|
|
|
|
|
|
$self->finish( |
162
|
|
|
|
|
|
|
release => $qjob->id, |
163
|
|
|
|
|
|
|
{ delay => $self->{_release_delay} } |
164
|
0
|
|
|
|
|
|
); |
165
|
|
|
|
|
|
|
} |
166
|
|
|
|
|
|
|
|
167
|
0
|
|
|
|
|
|
$self->emit( start => ('reserved') ); |
168
|
|
|
|
|
|
|
} |
169
|
0
|
|
|
|
|
|
); |
170
|
|
|
|
|
|
|
} |
171
|
0
|
|
|
|
|
|
); |
172
|
|
|
|
|
|
|
|
173
|
|
|
|
|
|
|
## FIXME: thinking about when to touch jobs, how to respond to |
174
|
|
|
|
|
|
|
## FIXME: NOT_FOUND, etc. after timeouts |
175
|
|
|
|
|
|
|
|
176
|
|
|
|
|
|
|
## FIXME: think about logging for clarity; figure out how to |
177
|
|
|
|
|
|
|
## FIXME: filter 'note' level messages (for example) |
178
|
|
|
|
|
|
|
|
179
|
0
|
|
|
|
|
|
$self->init(@_); |
180
|
|
|
|
|
|
|
|
181
|
0
|
|
|
|
|
|
return $self; |
182
|
|
|
|
|
|
|
} |
183
|
|
|
|
|
|
|
|
184
|
|
|
|
0
|
1
|
|
sub init { } |
185
|
|
|
|
|
|
|
|
186
|
|
|
|
|
|
|
sub start { |
187
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
188
|
0
|
|
|
|
|
|
$self->{_running} = 1; |
189
|
0
|
|
|
|
|
|
$self->{_stop_tries} = 0; |
190
|
0
|
|
|
|
|
|
$self->{_log}->{trace}->("starting worker"); |
191
|
0
|
|
|
|
|
|
$self->emit( start => ('start sub') ); |
192
|
|
|
|
|
|
|
} |
193
|
|
|
|
|
|
|
|
194
|
|
|
|
|
|
|
sub finish { |
195
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
196
|
0
|
|
|
|
|
|
my $action = shift; |
197
|
0
|
|
|
|
|
|
my $job_id = shift; |
198
|
0
|
|
|
|
|
|
my $cb = pop; |
199
|
0
|
|
|
|
|
|
my $args = shift; |
200
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
## FIXME: find a clean way to execute our code *and* the callback |
202
|
0
|
0
|
|
|
|
|
if ( ref($cb) ne 'CODE' ) { |
203
|
0
|
|
|
|
|
|
$args = $cb; |
204
|
0
|
|
|
0
|
|
|
$cb = sub { }; |
205
|
|
|
|
|
|
|
} |
206
|
|
|
|
|
|
|
|
207
|
|
|
|
|
|
|
my $internal = sub { |
208
|
0
|
|
|
0
|
|
|
delete $self->{_jobs}->{$job_id}; ## IMPORTANT |
209
|
|
|
|
|
|
|
|
210
|
0
|
0
|
|
|
|
|
if ( $self->job_count == $self->concurrency - 1 ) { |
211
|
|
|
|
|
|
|
## we've been waiting for a slot to free up |
212
|
0
|
|
|
|
|
|
$self->emit( start => ('finish sub') ); |
213
|
|
|
|
|
|
|
} |
214
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
$self->{_log}->{info} |
216
|
0
|
|
|
|
|
|
->( "finished with $job_id ($action); outstanding jobs: " |
217
|
|
|
|
|
|
|
. $self->job_count ); |
218
|
|
|
|
|
|
|
|
219
|
|
|
|
|
|
|
# $cb->($job_id); |
220
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
## we're done |
222
|
0
|
0
|
0
|
|
|
|
if ( $self->max_jobs |
|
|
|
0
|
|
|
|
|
223
|
|
|
|
|
|
|
and $self->handled_jobs >= $self->max_jobs |
224
|
|
|
|
|
|
|
and ! $self->job_count ) { |
225
|
0
|
|
|
|
|
|
$self->{_log}->{info}->("Handled " . $self->handled_jobs . "; quitting"); |
226
|
0
|
|
|
|
|
|
return $self->stop; |
227
|
|
|
|
|
|
|
} |
228
|
0
|
|
|
|
|
|
}; |
229
|
|
|
|
|
|
|
|
230
|
0
|
|
|
|
|
|
eval { |
231
|
0
|
0
|
|
|
|
|
$self->beanstalk->$action( $job_id, ( $args ? $args : () ), $internal ); |
232
|
|
|
|
|
|
|
}; |
233
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
$self->{_log_ctx}->log( |
235
|
0
|
0
|
|
|
|
|
error => "first argument to finish() must be a beanstalk command: $@" ) |
236
|
|
|
|
|
|
|
if $@; |
237
|
|
|
|
|
|
|
} |
238
|
|
|
|
|
|
|
|
239
|
|
|
|
|
|
|
sub stop { |
240
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
241
|
0
|
|
|
|
|
|
$self->{_stop_tries}++; |
242
|
|
|
|
|
|
|
|
243
|
0
|
0
|
|
|
|
|
if ( $self->{_stop_tries} >= $self->{_max_stop_tries} ) { |
244
|
|
|
|
|
|
|
$self->{_log_ctx}->log( |
245
|
0
|
|
|
|
|
|
warn => "stop requested; impatiently quitting outstanding jobs" ); |
246
|
0
|
|
|
|
|
|
exit; |
247
|
|
|
|
|
|
|
} |
248
|
|
|
|
|
|
|
|
249
|
0
|
0
|
|
|
|
|
if ( $self->job_count ) { |
250
|
|
|
|
|
|
|
$self->{_log_ctx} |
251
|
0
|
|
|
|
|
|
->log( warn => "stop requested; waiting for outstanding jobs" ); |
252
|
0
|
|
|
|
|
|
return; |
253
|
|
|
|
|
|
|
} |
254
|
|
|
|
|
|
|
|
255
|
0
|
|
|
|
|
|
$self->{_log_ctx}->log( fatal => "exiting" ); |
256
|
0
|
|
|
|
|
|
exit; |
257
|
|
|
|
|
|
|
} |
258
|
|
|
|
|
|
|
|
259
|
|
|
|
|
|
|
sub on { |
260
|
0
|
|
|
0
|
1
|
|
my ( $self, $event, $cb ) = @_; |
261
|
|
|
|
|
|
|
|
262
|
0
|
|
|
|
|
|
$self->{_cb}->{$event} = $cb; |
263
|
|
|
|
|
|
|
|
264
|
|
|
|
|
|
|
$self->{_event}->{$event} = sub { |
265
|
0
|
|
|
0
|
|
|
my $evt = shift; |
266
|
|
|
|
|
|
|
AnyEvent->condvar( |
267
|
|
|
|
|
|
|
cb => sub { |
268
|
0
|
0
|
|
|
|
|
if ( ref( $self->{_cb}->{$evt} ) eq 'CODE' ) { |
269
|
0
|
|
|
|
|
|
$self->{_log}->{trace}->("event: $evt"); |
270
|
0
|
|
|
|
|
|
my @data = $_[0]->recv; |
271
|
|
|
|
|
|
|
$self->{_log}->{debug}->( |
272
|
0
|
|
|
|
|
|
"shift event ($evt): " . shift @{ $self->{_events} } ); |
|
0
|
|
|
|
|
|
|
273
|
|
|
|
|
|
|
$self->{_log}->{debug}->( |
274
|
0
|
|
|
|
|
|
"EVENTS (s): " . join( ' ' => @{ $self->{_events} } ) ); |
|
0
|
|
|
|
|
|
|
275
|
0
|
|
|
|
|
|
$self->{_cb}->{$evt}->(@data); |
276
|
|
|
|
|
|
|
} |
277
|
|
|
|
|
|
|
|
278
|
0
|
|
|
|
|
|
$self->{_event}->{$evt} = AnyEvent->condvar( cb => __SUB__ ); |
279
|
|
|
|
|
|
|
} |
280
|
0
|
|
|
|
|
|
); |
281
|
|
|
|
|
|
|
} |
282
|
0
|
|
|
|
|
|
->($event); |
283
|
|
|
|
|
|
|
} |
284
|
|
|
|
|
|
|
|
285
|
|
|
|
|
|
|
sub emit { |
286
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
287
|
0
|
|
|
|
|
|
my $event = shift; |
288
|
0
|
|
|
|
|
|
$self->{_log}->{debug}->("push event ($event)"); |
289
|
0
|
|
|
|
|
|
push @{ $self->{_events} }, $event; |
|
0
|
|
|
|
|
|
|
290
|
|
|
|
|
|
|
$self->{_log}->{debug} |
291
|
0
|
|
|
|
|
|
->( "EVENTS (p): " . join( ' ' => @{ $self->{_events} } ) ); |
|
0
|
|
|
|
|
|
|
292
|
0
|
|
|
|
|
|
$self->{_event}->{$event}->send( $self, @_ ); |
293
|
|
|
|
|
|
|
} |
294
|
|
|
|
|
|
|
|
295
|
|
|
|
|
|
|
sub beanstalk { |
296
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
297
|
0
|
0
|
|
|
|
|
$self->{_beanstalk} = AnyEvent::Beanstalk->new(@_) if @_; |
298
|
0
|
|
|
|
|
|
return $self->{_beanstalk}; |
299
|
|
|
|
|
|
|
} |
300
|
|
|
|
|
|
|
|
301
|
0
|
|
|
0
|
1
|
|
sub job_count { scalar keys %{ $_[0]->{_jobs} } } |
|
0
|
|
|
|
|
|
|
302
|
|
|
|
|
|
|
|
303
|
0
|
|
|
0
|
1
|
|
sub handled_jobs { $_[0]->{_handled_jobs} } |
304
|
|
|
|
|
|
|
|
305
|
0
|
|
|
0
|
1
|
|
sub max_jobs { $_[0]->{_max_jobs} } |
306
|
|
|
|
|
|
|
|
307
|
|
|
|
|
|
|
sub concurrency { |
308
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
309
|
|
|
|
|
|
|
|
310
|
0
|
0
|
|
|
|
|
if (@_) { |
311
|
0
|
|
|
|
|
|
$self->{_concurrency} = shift; |
312
|
|
|
|
|
|
|
} |
313
|
0
|
|
|
|
|
|
return $self->{_concurrency}; |
314
|
|
|
|
|
|
|
} |
315
|
|
|
|
|
|
|
|
316
|
|
|
|
|
|
|
1; |
317
|
|
|
|
|
|
|
__END__ |