File Coverage

blib/lib/Minion/Worker.pm
Criterion Covered Total %
statement 12 107 11.2
branch 0 36 0.0
condition 0 54 0.0
subroutine 4 21 19.0
pod 8 8 100.0
total 24 226 10.6


line stmt bran cond sub pod time code
1             package Minion::Worker;
2 2     2   14 use Mojo::Base 'Mojo::EventEmitter';
  2         3  
  2         16  
3              
4 2     2   640 use Carp qw(croak);
  2         2  
  2         120  
5 2     2   1034 use Minion::Util qw(desired_tasks);
  2         8  
  2         159  
6 2     2   15 use Mojo::Util qw(steady_time);
  2         5  
  2         4577  
7              
8             has [qw(commands status)] => sub { {} };
9             has [qw(id minion)];
10              
11 0 0   0 1   sub add_command { $_[0]->commands->{$_[1]} = $_[2] and return $_[0] }
12              
13             sub dequeue {
14 0     0 1   my ($self, $wait, $options) = @_;
15              
16             # Worker not registered
17 0 0         return undef unless my $id = $self->id;
18              
19 0           my $minion = $self->minion;
20 0 0         return undef unless my $job = $minion->backend->dequeue($id, $wait, $options);
21             $job = $minion->class_for_task($job->{task})
22 0           ->new(args => $job->{args}, id => $job->{id}, minion => $minion, retries => $job->{retries}, task => $job->{task});
23 0           $self->emit(dequeue => $job);
24 0           return $job;
25             }
26              
27 0     0 1   sub info { $_[0]->minion->backend->list_workers(0, 1, {ids => [$_[0]->id]})->{workers}[0] }
28              
29             sub new {
30 0     0 1   my $self = shift->SUPER::new(@_);
31 0     0     $self->on(busy => sub { sleep 1 });
  0            
32 0           return $self;
33             }
34              
35             sub process_commands {
36 0     0 1   my $self = shift;
37              
38 0           my $called = 0;
39 0           for my $command (@{$self->minion->backend->receive($self->id)}) {
  0            
40 0 0         next unless my $cb = $self->commands->{shift @$command};
41 0           $self->$cb(@$command);
42 0           $called++;
43             }
44 0 0         $self->register if $called;
45              
46 0           return $self;
47             }
48              
49             sub register {
50 0     0 1   my $self = shift;
51 0           my $status = {status => $self->status};
52 0           return $self->id($self->minion->backend->register_worker($self->id, $status));
53             }
54              
55             sub run {
56 0     0 1   my $self = shift;
57              
58 0           my $status = $self->status;
59 0   0       $status->{command_interval} //= 10;
60 0   0       $status->{dequeue_timeout} //= 5;
61 0   0       $status->{heartbeat_interval} //= 300;
62 0   0       $status->{jobs} //= 4;
63 0   0       $status->{limits} //= {};
64 0   0       $status->{queues} ||= ['default'];
65 0   0       $status->{performed} //= 0;
66 0   0       $status->{repair_interval} //= 21600;
67 0           $status->{repair_interval} -= int rand $status->{repair_interval} / 2;
68 0   0       $status->{spare} //= 1;
69 0   0       $status->{spare_min_priority} //= 1;
70 0   0       $status->{type} //= 'Perl';
71              
72             # Reset event loop
73 0           Mojo::IOLoop->reset;
74 0     0     local $SIG{CHLD} = sub { };
75 0     0     local $SIG{INT} = local $SIG{TERM} = sub { $self->{finished}++ };
  0            
76             local $SIG{QUIT} = sub {
77 0 0   0     ++$self->{finished} and kill 'KILL', map { $_->pid } @{$self->{jobs}};
  0            
  0            
78 0           };
79              
80             # Remote control commands need to validate arguments carefully
81 0           my $commands = $self->commands;
82 0 0 0 0     local $commands->{jobs} = sub { $status->{jobs} = $_[1] if ($_[1] // '') =~ /^\d+$/ };
  0            
83 0           local $commands->{kill} = \&_kill;
84 0 0 0 0     local $commands->{spare} = sub { $status->{spare} = $_[1] if ($_[1] // '') =~ /^\d+$/ };
  0            
85 0     0     local $commands->{stop} = sub { $self->_kill('KILL', $_[1]) };
  0            
86              
87 0   0       eval { $self->_work until $self->{finished} && !@{$self->{jobs}} };
  0            
  0            
88 0           my $err = $@;
89 0           $self->unregister;
90 0 0         croak $err if $err;
91             }
92              
93             sub unregister {
94 0     0 1   my $self = shift;
95 0           $self->minion->backend->unregister_worker(delete $self->{id});
96 0           return $self;
97             }
98              
99             sub _kill {
100 0   0 0     my ($self, $signal, $id) = (shift, shift // '', shift // '');
      0        
101 0 0         return unless grep { $signal eq $_ } qw(INT TERM KILL USR1 USR2);
  0            
102 0           $_->kill($signal) for grep { $_->id eq $id } @{$self->{jobs}};
  0            
  0            
103             }
104              
105             sub _work {
106 0     0     my $self = shift;
107              
108             # Send heartbeats in regular intervals
109 0           my $status = $self->status;
110 0   0       $self->{last_heartbeat} ||= -$status->{heartbeat_interval};
111             $self->register and $self->{last_heartbeat} = steady_time
112 0 0 0       if ($self->{last_heartbeat} + $status->{heartbeat_interval}) < steady_time;
113              
114             # Process worker remote control commands in regular intervals
115 0   0       $self->{last_command} ||= 0;
116             $self->process_commands and $self->{last_command} = steady_time
117 0 0 0       if ($self->{last_command} + $status->{command_interval}) < steady_time;
118              
119             # Repair in regular intervals (randomize to avoid congestion)
120 0   0       $self->{last_repair} ||= 0;
121 0 0         if (($self->{last_repair} + $status->{repair_interval}) < steady_time) {
122 0           $self->minion->repair;
123 0           $self->{last_repair} = steady_time;
124             }
125              
126             # Check if jobs are finished
127 0   0       my $jobs = $self->{jobs} ||= [];
128 0 0 0       @$jobs = map { $_->is_finished && ++$status->{performed} ? () : $_ } @$jobs;
  0            
129              
130             # Job limit has been reached or worker is stopping
131 0           my @extra;
132 0 0 0       if ($self->{finished} || ($status->{jobs} + $status->{spare}) <= @$jobs) { return $self->emit('busy') }
  0 0          
133 0           elsif ($status->{jobs} <= @$jobs) { @extra = (min_priority => $status->{spare_min_priority}) }
134              
135             # Try to get more jobs
136 0           my $tasks = desired_tasks($status->{limits}, [keys %{$self->minion->tasks}], [map { $_->task } @$jobs]);
  0            
  0            
137 0 0         return unless @$tasks;
138 0           my ($max, $queues) = @{$status}{qw(dequeue_timeout queues)};
  0            
139 0           my $job = $self->emit('wait')->dequeue($max => {queues => $queues, tasks => $tasks, @extra});
140 0 0         push @$jobs, $job->start if $job;
141             }
142              
143             1;
144              
145             =encoding utf8
146              
147             =head1 NAME
148              
149             Minion::Worker - Minion worker
150              
151             =head1 SYNOPSIS
152              
153             use Minion::Worker;
154              
155             my $worker = Minion::Worker->new(minion => $minion);
156              
157             =head1 DESCRIPTION
158              
159             L<Minion::Worker> performs jobs for L<Minion>.
160              
161             =head1 WORKER SIGNALS
162              
163             The L<Minion::Worker> process can be controlled at runtime with the following signals.
164              
165             =head2 INT, TERM
166              
167             Stop gracefully after finishing the current jobs.
168              
169             =head2 QUIT
170              
171             Stop immediately without finishing the current jobs.
172              
173             =head1 JOB SIGNALS
174              
175             The job processes spawned by the L<Minion::Worker> process can be controlled at runtime with the following signals.
176              
177             =head2 INT, TERM
178              
179             This signal starts out with the operating system default and allows for jobs to install a custom signal handler to stop
180             gracefully.
181              
182             =head2 USR1, USR2
183              
184             These signals start out being ignored and allow for jobs to install custom signal handlers.
185              
186             =head1 EVENTS
187              
188             L<Minion::Worker> inherits all events from L<Mojo::EventEmitter> and can emit the following new ones.
189              
190             =head2 busy
191              
192             $worker->on(busy => sub ($worker) {
193             ...
194             });
195              
196             Emitted in the worker process when it is performing the maximum number of jobs in parallel.
197              
198             $worker->on(busy => sub ($worker) {
199             my $max = $worker->status->{jobs};
200             say "Performing $max jobs.";
201             });
202              
203             =head2 dequeue
204              
205             $worker->on(dequeue => sub ($worker, $job) {
206             ...
207             });
208              
209             Emitted in the worker process after a job has been dequeued.
210              
211             $worker->on(dequeue => sub ($worker, $job) {
212             my $id = $job->id;
213             say "Job $id has been dequeued.";
214             });
215              
216             =head2 wait
217              
218             $worker->on(wait => sub ($worker) {
219             ...
220             });
221              
222             Emitted in the worker process before it tries to dequeue a job.
223              
224             $worker->on(wait => sub ($worker) {
225             my $max = $worker->status->{dequeue_timeout};
226             say "Waiting up to $max seconds for a new job.";
227             });
228              
229             =head1 ATTRIBUTES
230              
231             L<Minion::Worker> implements the following attributes.
232              
233             =head2 commands
234              
235             my $commands = $worker->commands;
236             $worker = $worker->commands({jobs => sub {...}});
237              
238             Registered worker remote control commands.
239              
240             =head2 id
241              
242             my $id = $worker->id;
243             $worker = $worker->id($id);
244              
245             Worker id.
246              
247             =head2 minion
248              
249             my $minion = $worker->minion;
250             $worker = $worker->minion(Minion->new);
251              
252             L<Minion> object this worker belongs to.
253              
254             =head2 status
255              
256             my $status = $worker->status;
257             $worker = $worker->status({queues => ['default', 'important']});
258              
259             Status information to configure workers started with L</"run"> and to share every time L</"register"> is called.
260              
261             =head1 METHODS
262              
263             L<Minion::Worker> inherits all methods from L<Mojo::EventEmitter> and implements the following new ones.
264              
265             =head2 add_command
266              
267             $worker = $worker->add_command(jobs => sub {...});
268              
269             Register a worker remote control command.
270              
271             $worker->add_command(foo => sub ($worker, @args) {
272             ...
273             });
274              
275             =head2 dequeue
276              
277             my $job = $worker->dequeue(0.5);
278             my $job = $worker->dequeue(0.5 => {queues => ['important']});
279              
280             Wait a given amount of time in seconds for a job, dequeue L<Minion::Job> object and transition from C<inactive> to
281             C<active> state, or return C<undef> if queues were empty.
282              
283             These options are currently available:
284              
285             =over 2
286              
287             =item id
288              
289             id => '10023'
290              
291             Dequeue a specific job.
292              
293             =item min_priority
294              
295             min_priority => 3
296              
297             Do not dequeue jobs with a lower priority.
298              
299             =item queues
300              
301             queues => ['important']
302              
303             One or more queues to dequeue jobs from, defaults to C<default>.
304              
305             =item tasks
306              
307             tasks => ['foo', 'bar']
308              
309             One or more tasks to dequeue jobs from, defaults to all tasks.
310              
311             =back
312              
313             =head2 info
314              
315             my $info = $worker->info;
316              
317             Get worker information.
318              
319             # Check worker host
320             my $host = $worker->info->{host};
321              
322             These fields are currently available:
323              
324             =over 2
325              
326             =item host
327              
328             host => 'localhost'
329              
330             Worker host.
331              
332             =item jobs
333              
334             jobs => ['10023', '10024', '10025', '10029']
335              
336             Ids of jobs the worker is currently processing.
337              
338             =item notified
339              
340             notified => 784111777
341              
342             Epoch time worker sent the last heartbeat.
343              
344             =item pid
345              
346             pid => 12345
347              
348             Process id of worker.
349              
350             =item started
351              
352             started => 784111777
353              
354             Epoch time worker was started.
355              
356             =item status
357              
358             status => {queues => ['default', 'important']}
359              
360             Hash reference with whatever status information the worker would like to share.
361              
362             =back
363              
364             =head2 new
365              
366             my $worker = Minion::Worker->new;
367             my $worker = Minion::Worker->new(status => {foo => 'bar'});
368             my $worker = Minion::Worker->new({status => {foo => 'bar'}});
369              
370             Construct a new L<Minion::Worker> object and subscribe to L</"busy"> event with default handler that sleeps for one
371             second.
372              
373             =head2 process_commands
374              
375             $worker = $worker->process_commands;
376              
377             Process worker remote control commands.
378              
379             =head2 register
380              
381             $worker = $worker->register;
382              
383             Register worker or send heartbeat to show that this worker is still alive.
384              
385             =head2 run
386              
387             $worker->run;
388              
389             Run worker and wait for L</"WORKER SIGNALS">.
390              
391             # Start a worker for a special named queue
392             my $worker = $minion->worker;
393             $worker->status->{queues} = ['important'];
394             $worker->run;
395              
396             These L</"status"> options are currently available:
397              
398             =over 2
399              
400             =item command_interval
401              
402             command_interval => 20
403              
404             Worker remote control command interval, defaults to C<10>.
405              
406             =item dequeue_timeout
407              
408             dequeue_timeout => 5
409              
410             Maximum amount of time in seconds to wait for a job, defaults to C<5>.
411              
412             =item heartbeat_interval
413              
414             heartbeat_interval => 60
415              
416             Heartbeat interval, defaults to C<300>.
417              
418             =item jobs
419              
420             jobs => 12
421              
422             Maximum number of jobs to perform in parallel in forked worker processes (not including spare processes), defaults to C<4>.
423              
424             =item queues
425              
426             queues => ['test']
427              
428             One or more queues to get jobs from, defaults to C<default>.
429              
430             =item repair_interval
431              
432             repair_interval => 3600
433              
434             Repair interval, up to half of this value can be subtracted randomly to make sure not all workers repair at the same
435             time, defaults to C<21600> (6 hours).
436              
437             =item spare
438              
439             spare => 2
440              
441             Number of spare worker processes to reserve for high priority jobs, defaults to C<1>.
442              
443             =item spare_min_priority
444              
445             spare_min_priority => 7
446              
447             Minimum priority of jobs to use spare worker processes for, defaults to C<1>.
448              
449             =back
450              
451             These remote control L</"commands"> are currently available:
452              
453             =over 2
454              
455             =item jobs
456              
457             $minion->broadcast('jobs', [10]);
458             $minion->broadcast('jobs', [10], [$worker_id]);
459              
460             Instruct one or more workers to change the number of jobs to perform concurrently. Setting this value to C<0> will
461             effectively pause the worker. That means all current jobs will be finished, but no new ones accepted, until the number
462             is increased again.
463              
464             =item kill
465              
466             $minion->broadcast('kill', ['INT', 10025]);
467             $minion->broadcast('kill', ['INT', 10025], [$worker_id]);
468              
469             Instruct one or more workers to send a signal to a job that is currently being performed. This command will be ignored
470             by workers that do not have a job matching the id. That means it is safe to broadcast this command to all workers.
471              
472             =item stop
473              
474             $minion->broadcast('stop', [10025]);
475             $minion->broadcast('stop', [10025], [$worker_id]);
476              
477             Instruct one or more workers to stop a job that is currently being performed immediately. This command will be ignored
478             by workers that do not have a job matching the id. That means it is safe to broadcast this command to all workers.
479              
480             =back
481              
482             =head2 unregister
483              
484             $worker = $worker->unregister;
485              
486             Unregister worker.
487              
488             =head1 SEE ALSO
489              
490             L<Minion>, L<Minion::Guide>, L<https://minion.pm>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
491              
492             =cut