File Coverage

blib/lib/AnyEvent/Beanstalk/Worker.pm
Criterion Covered Total %
statement 20 172 11.6
branch 0 44 0.0
condition 0 32 0.0
subroutine 7 26 26.9
pod 12 12 100.0
total 39 286 13.6


line stmt bran cond sub pod time code
1             package AnyEvent::Beanstalk::Worker;
2              
3 1     1   22305 use 5.016001;
  1         6  
4 1     1   10 use strict;
  1         4  
  1         36  
5 1     1   9 use warnings;
  1         14  
  1         42  
6 1     1   6 use feature 'current_sub';
  1         6  
  1         93  
7 1     1   6 use AnyEvent;
  1         2  
  1         21  
8 1     1   520 use AnyEvent::Log;
  1         10218  
  1         54  
9 1     1   653 use AnyEvent::Beanstalk;
  1         47087  
  1         16  
10              
11             our $VERSION = '0.05';
12              
## Constructor.
##
## Accepts a flat key/value list. Recognised keys: max_stop_tries,
## max_jobs, concurrency, log_level, reserve_timeout, release_delay,
## initial_state, beanstalk_host, beanstalk_port, beanstalk_watch
## (required) and beanstalk_decoder.
##
## Sets up per-object state, a logging context, TERM/INT/USR2 signal
## watchers, the beanstalk client (watching the requested tube), and the
## built-in "start" event handler. Finally calls init() with the same
## argument list and returns the blessed object.
##
## Dies with "beanstalk_watch argument required\n" when no tube is given.
sub new {
    my $class = shift;
    my %args  = @_;

    my $reserve_timeout = $args{reserve_timeout} || 1;

    my $self = bless {
        _cb           => {},
        _event        => {},
        _jobs         => {},
        _events       => [],    ## event queue
        _handled_jobs => 0,     ## simple job counter

        _running        => 0,
        _stop_tries     => 0,
        _max_stop_tries => $args{max_stop_tries} // 3,
        _max_jobs       => $args{max_jobs} || 0,
        _concurrency    => $args{concurrency} || 1,
        _log_level      => $args{log_level} // 4,

        ## _reserve_base remembers the configured timeout so it can be
        ## restored after the timeout has been stretched by empty polls
        _reserve_timeout        => $reserve_timeout,
        _reserve_base           => $reserve_timeout,
        _reserve_timeout_factor => 1.1,
        _reserve_timeout_max    => 4,
        _release_delay          => $args{release_delay} || 3,

        _initial_state => $args{initial_state},
    }, $class;

    ## logging: one context plus a cached logger closure per level
    my $ctx = AnyEvent::Log::ctx();
    $ctx->title(__PACKAGE__);
    $ctx->level( $self->{_log_level} );
    $self->{_log_ctx} = $ctx;

    $self->{_log} = {};
    for my $level (qw(trace debug info note)) {
        $self->{_log}->{$level} = $ctx->logger($level);
    }

    ## signals: TERM and INT both request a stop
    $self->{_signal} = {};
    for my $name (qw(TERM INT)) {
        $self->{_signal}->{$name} = AnyEvent->signal(
            signal => $name,
            cb     => sub {
                $self->{_log_ctx}->log( warn => "$name received" );
                $self->stop;
            },
        );
    }

    ## USR2 bumps the log level by one, wrapping back to 2 once it is 9+
    $self->{_signal}->{USR2} = AnyEvent->signal(
        signal => "USR2",
        cb     => sub {
            my $current = $self->{_log_level};
            $self->{_log_level} = $current >= 9 ? 2 : $current + 1;
            $self->{_log_ctx}->level( $self->{_log_level} );
        },
    );

    ## beanstalk connection
    my $host = $args{beanstalk_host} || 'localhost';
    my $port = $args{beanstalk_port} || 11300;

    die "beanstalk_watch argument required\n"
      unless $args{beanstalk_watch};

    $self->beanstalk(
        server  => $host . ':' . $port,
        decoder => $args{beanstalk_decoder},
    );

    $self->beanstalk->watch( $args{beanstalk_watch} )->recv;

    ## Built-in "start" handler: decides whether another job may be
    ## reserved and, if so, issues the reserve request.
    my $on_start = sub {
        my $worker = shift;
        my $reason = shift || '(unknown)';
        my $log    = $worker->{_log};

        $log->{trace}->("in start: $reason");

        if ( !$worker->{_running} ) {
            $log->{trace}->("worker is not running");
            return;
        }

        if ( $worker->job_count >= $worker->concurrency ) {
            $log->{trace}->( "worker running "
                  . $worker->job_count
                  . " jobs; will not accept more jobs until others finish" );
            return;
        }

        if ( $worker->max_jobs && $worker->handled_jobs >= $worker->max_jobs ) {
            $log->{info}->( "Handled "
                  . $worker->handled_jobs
                  . "; will not accept more jobs" );
            return;
        }

        if ( !$worker->job_count && $worker->{_stop_tries} ) {
            $log->{info}->("No jobs left; stopping as requested");
            return $worker->stop;
        }

        ## Runs when the reserve request completes (job, timeout or error)
        my $on_reserve = sub {
            my ( $qjob, $qresp ) = @_;
            $qresp //= '';

            if ( $qresp =~ /timed_out/i ) {
                ## stretch the timeout a little after every empty poll
                if ( $worker->{_reserve_timeout} <
                    $worker->{_reserve_timeout_max} )
                {
                    $worker->{_reserve_timeout} *=
                      $worker->{_reserve_timeout_factor};
                }
                $log->{trace}->("beanstalk reservation timed out");
                return $worker->emit( start => $qresp );
            }

            if ( $qresp !~ /reserved/i ) {
                $log->{note}->("beanstalk returned: $qresp")
                  if $qresp !~ /deadline_soon/i;
                return $worker->emit( start => $qresp );
            }

            ## got a job: go back to the configured timeout
            $worker->{_reserve_timeout} = $worker->{_reserve_base};

            my $job_id = $qjob->id;

            if ( $worker->{_jobs}->{$job_id} ) {
                $worker->{_log_ctx}->log( warn => "Already have "
                      . $job_id
                      . " reserved (must have expired)" );
                return $worker->emit( start => "already reserved" );
            }

            $worker->{_jobs}->{$job_id} = 1;
            $worker->{_handled_jobs}++;

            $log->{info}->( "added job "
                  . $job_id
                  . "; outstanding jobs: "
                  . $worker->job_count );

            $log->{trace}->( "reserved job " . $job_id );

            if ( my $state = $worker->{_initial_state} ) {
                ## hand the untouched reserve arguments to the first state
                $worker->emit( $state => @_ );
            }
            else {
                ## nobody to handle the job: put it back after a delay
                $worker->finish(
                    release => $job_id,
                    { delay => $worker->{_release_delay} }
                );
            }

            $worker->emit( start => 'reserved' );
        };

        $worker->beanstalk->reserve( $worker->{_reserve_timeout},
            $on_reserve );
    };

    $self->on( start => $on_start );

    ## FIXME: thinking about when to touch jobs, how to respond to
    ## FIXME: NOT_FOUND, etc. after timeouts

    ## FIXME: think about logging for clarity; figure out how to
    ## FIXME: filter 'note' level messages (for example)

    $self->init(@_);

    return $self;
}
183              
## Hook called at the end of new() with the constructor's arguments.
## The base implementation does nothing and returns nothing.
sub init {
    return;
}
185              
## Mark the worker as running, clear any pending stop requests and emit
## the first "start" event. Returns whatever emit() returns.
sub start {
    my ($self) = @_;

    @{$self}{qw(_running _stop_tries)} = ( 1, 0 );

    $self->{_log}->{trace}->("starting worker");

    return $self->emit( start => 'start sub' );
}
193              
## Finish a job by running a beanstalk command on it.
##
##   $self->finish( $action, $job_id );
##   $self->finish( $action, $job_id, \%args );
##   $self->finish( $action, $job_id, $cb );
##   $self->finish( $action, $job_id, \%args, $cb );
##
## $action is the name of a method on the beanstalk client; it is called
## as $client->$action( $job_id, [\%args,] $callback ). When that
## callback fires, the job is removed from the outstanding set, a "start"
## event is emitted if a slot has just been freed, $cb (if any) is
## invoked with the job id, and the worker stops once max_jobs have been
## handled and nothing is outstanding.
##
## Errors raised while issuing the command, or by $cb, are logged at
## "error" level rather than propagated.
sub finish {
    my $self   = shift;
    my $action = shift;
    my $job_id = shift;
    my $cb     = pop;
    my $args   = shift;

    ## the trailing argument is either the callback or the options hash
    if ( ref($cb) ne 'CODE' ) {
        $args = $cb;
        $cb   = sub { };
    }

    my $internal = sub {
        delete $self->{_jobs}->{$job_id};    ## IMPORTANT

        if ( $self->job_count == $self->concurrency - 1 ) {
            ## we've been waiting for a slot to free up
            $self->emit( start => ('finish sub') );
        }

        $self->{_log}->{info}
          ->( "finished with $job_id ($action); outstanding jobs: "
              . $self->job_count );

        ## Run the caller's callback after our own bookkeeping. It used to
        ## be accepted and silently dropped. A dying callback must not
        ## prevent the max_jobs check below, so it is guarded.
        unless ( eval { $cb->($job_id); 1 } ) {
            $self->{_log_ctx}
              ->log( error => "finish() callback for $job_id failed: $@" );
        }

        ## we're done
        if (    $self->max_jobs
            and $self->handled_jobs >= $self->max_jobs
            and !$self->job_count )
        {
            $self->{_log}->{info}
              ->( "Handled " . $self->handled_jobs . "; quitting" );
            return $self->stop;
        }

        return;
    };

    ## "; 1" makes success explicit instead of relying on $@ being empty
    my $issued = eval {
        $self->beanstalk->$action( $job_id, ( $args ? $args : () ),
            $internal );
        1;
    };

    $self->{_log_ctx}->log(
        error => "first argument to finish() must be a beanstalk command: $@" )
      unless $issued;

    return;
}
238              
## Request that the worker stop.
##
## Every call counts as one stop attempt. Once the number of attempts
## reaches _max_stop_tries the process exits even if jobs are still
## outstanding. Otherwise the process exits only when no jobs are
## outstanding; with jobs in flight the call just logs and returns.
sub stop {
    my ($self) = @_;
    my $ctx = $self->{_log_ctx};

    my $attempts = ++$self->{_stop_tries};

    if ( $attempts >= $self->{_max_stop_tries} ) {
        $ctx->log(
            warn => "stop requested; impatiently quitting outstanding jobs" );
        exit;
    }

    unless ( $self->job_count ) {
        $ctx->log( fatal => "exiting" );
        exit;
    }

    $ctx->log( warn => "stop requested; waiting for outstanding jobs" );
    return;
}
258              
## Register $cb as the handler for $event.
##
## The handler is stored in _cb and a condvar is armed in _event for the
## event. When emit() sends on that condvar, the condvar callback fetches
## the sent data, drops the oldest entry from the _events queue, runs the
## handler with the data (only if the stored handler is a code ref), and
## then re-arms a fresh condvar with the same callback so the event can
## fire again. Returns the newly armed condvar.
sub on {
    my ( $self, $event, $cb ) = @_;

    $self->{_cb}->{$event} = $cb;

    my $dispatch = sub {
        my ($fired) = @_;
        my $handler = $self->{_cb}->{$event};

        if ( ref($handler) eq 'CODE' ) {
            my $log = $self->{_log};

            $log->{trace}->("event: $event");
            my @data = $fired->recv;

            my $shifted = shift @{ $self->{_events} };
            $log->{debug}->( "shift event ($event): " . $shifted );
            $log->{debug}
              ->( "EVENTS (s): " . join( ' ' => @{ $self->{_events} } ) );

            $self->{_cb}->{$event}->(@data);
        }

        ## a condvar fires only once: arm a new one with this same callback
        $self->{_event}->{$event} = AnyEvent->condvar( cb => __SUB__ );
    };

    $self->{_event}->{$event} = AnyEvent->condvar( cb => $dispatch );
}
284              
## Fire $event: append it to the _events queue (logging the queue at
## debug level) and send the worker plus any extra arguments on the
## condvar armed for that event by on(). Returns the result of send().
sub emit {
    my ( $self, $event, @payload ) = @_;
    my $debug = $self->{_log}->{debug};

    $debug->("push event ($event)");
    push @{ $self->{_events} }, $event;
    $debug->( "EVENTS (p): " . join( ' ', @{ $self->{_events} } ) );

    return $self->{_event}->{$event}->send( $self, @payload );
}
294              
## Accessor for the beanstalk client. When called with arguments, a new
## AnyEvent::Beanstalk object is built from them and stored first.
## Always returns the currently stored client.
sub beanstalk {
    my ( $self, @client_args ) = @_;

    if (@client_args) {
        $self->{_beanstalk} = AnyEvent::Beanstalk->new(@client_args);
    }

    return $self->{_beanstalk};
}
300              
## Number of jobs currently reserved by this worker (keys of _jobs).
sub job_count {
    my ($self) = @_;
    return scalar keys %{ $self->{_jobs} };
}
302              
## Total number of jobs this worker has reserved so far.
sub handled_jobs {
    my ($self) = @_;
    return $self->{_handled_jobs};
}
304              
## Configured job limit; 0 is the default when none was given.
sub max_jobs {
    my ($self) = @_;
    return $self->{_max_jobs};
}
306              
## Get or set the maximum number of jobs reserved at the same time.
## With an argument the value is stored first; the current value is
## always returned.
sub concurrency {
    my ( $self, @new_value ) = @_;

    $self->{_concurrency} = $new_value[0] if @new_value;

    return $self->{_concurrency};
}
315              
316             1;
317             __END__