File Coverage

blib/lib/AnyEvent/Beanstalk/Worker.pm
Criterion Covered Total %
statement 21 173 12.1
branch 0 44 0.0
condition 0 32 0.0
subroutine 7 26 26.9
pod 12 12 100.0
total 40 287 13.9


line stmt bran cond sub pod time code
1             package AnyEvent::Beanstalk::Worker;
2              
3 1     1   29190 use 5.016001;
  1         4  
  1         25  
4 1     1   5 use strict;
  1         3  
  1         37  
5 1     1   4 use warnings;
  1         6  
  1         32  
6 1     1   11 use feature 'current_sub';
  1         2  
  1         94  
7 1     1   4 use AnyEvent;
  1         2  
  1         20  
8 1     1   883 use AnyEvent::Log;
  1         12794  
  1         47  
9 1     1   1111 use AnyEvent::Beanstalk;
  1         77135  
  1         24  
10              
11             our $VERSION = '0.03';
12              
13             sub new {
14 0     0 1   my $class = shift;
15 0           my $self = {};
16 0           bless $self => $class;
17              
18 0           my %args = @_;
19              
20 0           $self->{_cb} = {};
21 0           $self->{_event} = {};
22 0           $self->{_jobs} = {};
23 0           $self->{_events} = []; ## event queue
24 0           $self->{_handled_jobs} = 0; ## simple job counter
25              
26 0           $self->{_running} = 0;
27 0           $self->{_stop_tries} = 0;
28 0   0       $self->{_max_stop_tries} = $args{max_stop_tries} // 3;
29 0   0       $self->{_max_jobs} = $args{max_jobs} || 0;
30 0   0       $self->{_concurrency} = $args{concurrency} || 1;
31 0   0       $self->{_log_level} = $args{log_level} // 4;
32              
33 0   0       $self->{_reserve_timeout} = $args{reserve_timeout} || 1;
34 0           $self->{_reserve_base} = $self->{_reserve_timeout};
35 0           $self->{_reserve_timeout_factor} = 1.1;
36 0           $self->{_reserve_timeout_max} = 4;
37 0   0       $self->{_release_delay} = $args{release_delay} || 3;
38              
39 0           $self->{_initial_state} = $args{initial_state};
40              
41 0           $self->{_log_ctx} = AnyEvent::Log::ctx;
42 0           $self->{_log_ctx}->title(__PACKAGE__);
43              
44 0           $self->{_log} = {};
45 0           $self->{_log}->{trace} = $self->{_log_ctx}->logger("trace");
46 0           $self->{_log}->{debug} = $self->{_log_ctx}->logger("debug");
47 0           $self->{_log}->{info} = $self->{_log_ctx}->logger("info");
48 0           $self->{_log}->{note} = $self->{_log_ctx}->logger("note");
49              
50 0           $self->{_signal} = {};
51             $self->{_signal}->{TERM} = AnyEvent->signal(
52             signal => "TERM",
53             cb =>
54 0     0     sub { $self->{_log_ctx}->log( warn => "TERM received" ); $self->stop }
  0            
55 0           );
56             $self->{_signal}->{INT} = AnyEvent->signal(
57             signal => "INT",
58             cb =>
59 0     0     sub { $self->{_log_ctx}->log( warn => "INT received" ); $self->stop }
  0            
60 0           );
61             $self->{_signal}->{USR2} = AnyEvent->signal(
62             signal => "USR2",
63             cb => sub {
64 0 0   0     $self->{_log_level} =
65             ( $self->{_log_level} >= 9 ? 2 : $self->{_log_level} + 1 );
66             }
67 0           );
68              
69 0   0       $args{beanstalk_host} ||= 'localhost';
70 0   0       $args{beanstalk_port} ||= 11300;
71              
72 0 0         unless ($args{beanstalk_watch}) {
73 0           die "beanstalk_watch argument required\n";
74             }
75              
76             $self->beanstalk(
77 0           server => $args{beanstalk_host} . ':' . $args{beanstalk_port},
78             decoder => $args{beanstalk_decoder}
79             );
80              
81 0           $self->beanstalk->watch( $args{beanstalk_watch} )->recv;
82              
83             $self->on(
84             start => sub {
85 0     0     my $self = shift;
86 0   0       my $reason = shift || '(unknown)';
87              
88 0           $self->{_log}->{trace}->("in start: $reason");
89              
90 0 0         unless ( $self->{_running} ) {
91 0           $self->{_log}->{trace}->("worker is not running");
92 0           return;
93             }
94              
95 0 0         unless ( $self->job_count < $self->concurrency ) {
96 0           $self->{_log}->{trace}->( "worker running "
97             . $self->job_count
98             . " jobs; will not accept more jobs until others finish"
99             );
100 0           return;
101             }
102              
103 0 0 0       if ( $self->max_jobs and $self->handled_jobs >= $self->max_jobs ) {
104 0           $self->{_log}->{info}->("Handled " . $self->handled_jobs . "; will not accept more jobs");
105 0           return;
106             }
107              
108 0 0 0       if ( ! $self->job_count and $self->{_stop_tries} ) {
109 0           $self->{_log}->{info}->("No jobs left; stopping as requested");
110 0           return $self->stop;
111             }
112              
113             $self->beanstalk->reserve(
114             $self->{_reserve_timeout},
115             sub {
116 0           my ( $qjob, $qresp ) = @_;
117 0   0       $qresp //= '';
118              
119 0 0         if ( $qresp =~ /timed_out/i ) {
120 0 0         $self->{_reserve_timeout} *=
121             $self->{_reserve_timeout_factor}
122             unless $self->{_reserve_timeout} >=
123             $self->{_reserve_timeout_max};
124 0           $self->{_log}->{trace}
125             ->("beanstalk reservation timed out");
126 0           return $self->emit( start => ($qresp) );
127             }
128              
129 0 0         unless ( $qresp =~ /reserved/i ) {
130 0 0         $self->{_log}->{note}->("beanstalk returned: $qresp")
131             unless $qresp =~ /deadline_soon/i;
132 0           return $self->emit( start => ($qresp) );
133             }
134              
135 0           $self->{_reserve_timeout} = $self->{_reserve_base};
136              
137 0 0         if ( $self->{_jobs}->{ $qjob->id } ) {
138 0           $self->{_log_ctx}->log( warn => "Already have "
139             . $qjob->id
140             . " reserved (must have expired)" );
141 0           return $self->emit( start => ("already reserved") );
142             }
143              
144 0           $self->{_jobs}->{ $qjob->id } = 1;
145 0           $self->{_handled_jobs}++;
146              
147 0           $self->{_log}->{info}->( "added job "
148             . $qjob->id
149             . "; outstanding jobs: "
150             . $self->job_count );
151              
152 0           $self->{_log}->{trace}->( "reserved job " . $qjob->id );
153              
154 0 0         if ($self->{_initial_state}) {
155 0           $self->emit( $self->{_initial_state} => @_ );
156             }
157              
158             else {
159 0           $self->finish(
160             release => $qjob->id,
161             { delay => $self->{_release_delay} }
162             );
163             }
164              
165 0           $self->emit( start => ('reserved') );
166             }
167 0           );
168             }
169 0           );
170              
171             ## FIXME: thinking about when to touch jobs, how to respond to
172             ## FIXME: NOT_FOUND, etc. after timeouts
173              
174             ## FIXME: think about logging for clarity; figure out how to
175             ## FIXME: filter 'note' level messages (for example)
176              
177 0           $self->init(@_);
178              
179 0           return $self;
180             }
181              
182 0     0 1   sub init { }
183              
184             sub start {
185 0     0 1   my $self = shift;
186 0           $self->{_running} = 1;
187 0           $self->{_stop_tries} = 0;
188 0           $self->{_log}->{trace}->("starting worker");
189 0           $self->emit( start => ('start sub') );
190             }
191              
192             sub finish {
193 0     0 1   my $self = shift;
194 0           my $action = shift;
195 0           my $job_id = shift;
196 0           my $cb = pop;
197 0           my $args = shift;
198              
199             ## FIXME: find a clean way to execute our code *and* the callback
200 0 0         if ( ref($cb) ne 'CODE' ) {
201 0           $args = $cb;
202 0     0     $cb = sub { };
  0            
203             }
204              
205             my $internal = sub {
206 0     0     delete $self->{_jobs}->{$job_id}; ## IMPORTANT
207              
208 0 0         if ( $self->job_count == $self->concurrency - 1 ) {
209             ## we've been waiting for a slot to free up
210 0           $self->emit( start => ('finish sub') );
211             }
212              
213 0           $self->{_log}->{info}
214             ->( "finished with $job_id ($action); outstanding jobs: "
215             . $self->job_count );
216              
217             # $cb->($job_id);
218              
219             ## we're done
220 0 0 0       if ( $self->max_jobs
      0        
221             and $self->handled_jobs >= $self->max_jobs
222             and ! $self->job_count ) {
223 0           $self->{_log}->{info}->("Handled " . $self->handled_jobs . "; quitting");
224 0           return $self->stop;
225             }
226 0           };
227              
228 0           eval {
229 0 0         $self->beanstalk->$action( $job_id, ( $args ? $args : () ), $internal );
230             };
231              
232 0 0         $self->{_log_ctx}->log(
233             error => "first argument to finish() must be a beanstalk command: $@" )
234             if $@;
235             }
236              
237             sub stop {
238 0     0 1   my $self = shift;
239 0           $self->{_stop_tries}++;
240              
241 0 0         if ( $self->{_stop_tries} >= $self->{_max_stop_tries} ) {
242 0           $self->{_log_ctx}->log(
243             warn => "stop requested; impatiently quitting outstanding jobs" );
244 0           exit;
245             }
246              
247 0 0         if ( $self->job_count ) {
248 0           $self->{_log_ctx}
249             ->log( warn => "stop requested; waiting for outstanding jobs" );
250 0           return;
251             }
252              
253 0           $self->{_log_ctx}->log( fatal => "exiting" );
254 0           exit;
255             }
256              
257             sub on {
258 0     0 1   my ( $self, $event, $cb ) = @_;
259              
260 0           $self->{_cb}->{$event} = $cb;
261              
262             $self->{_event}->{$event} = sub {
263 0     0     my $evt = shift;
264             AnyEvent->condvar(
265             cb => sub {
266 0 0         if ( ref( $self->{_cb}->{$evt} ) eq 'CODE' ) {
267 0           $self->{_log}->{trace}->("event: $evt");
268 0           my @data = $_[0]->recv;
269 0           $self->{_log}->{debug}->(
270 0           "shift event ($evt): " . shift @{ $self->{_events} } );
271 0           $self->{_log}->{debug}->(
272 0           "EVENTS (s): " . join( ' ' => @{ $self->{_events} } ) );
273 0           $self->{_cb}->{$evt}->(@data);
274             }
275              
276 0           $self->{_event}->{$evt} = AnyEvent->condvar( cb => __SUB__ );
277             }
278 0           );
279             }
280 0           ->($event);
281             }
282              
283             sub emit {
284 0     0 1   my $self = shift;
285 0           my $event = shift;
286 0           $self->{_log}->{debug}->("push event ($event)");
287 0           push @{ $self->{_events} }, $event;
  0            
288 0           $self->{_log}->{debug}
289 0           ->( "EVENTS (p): " . join( ' ' => @{ $self->{_events} } ) );
290 0           $self->{_event}->{$event}->send( $self, @_ );
291             }
292              
293             sub beanstalk {
294 0     0 1   my $self = shift;
295 0 0         $self->{_beanstalk} = AnyEvent::Beanstalk->new(@_) if @_;
296 0           return $self->{_beanstalk};
297             }
298              
299 0     0 1   sub job_count { scalar keys %{ $_[0]->{_jobs} } }
  0            
300              
301 0     0 1   sub handled_jobs { $_[0]->{_handled_jobs} }
302              
303 0     0 1   sub max_jobs { $_[0]->{_max_jobs} }
304              
305             sub concurrency {
306 0     0 1   my $self = shift;
307              
308 0 0         if (@_) {
309 0           $self->{_concurrency} = shift;
310             }
311 0           return $self->{_concurrency};
312             }
313              
314             1;
315             __END__