File Coverage

blib/lib/Minion/Worker.pm
Criterion Covered Total %
statement 12 112 10.7
branch 0 38 0.0
condition 0 61 0.0
subroutine 4 21 19.0
pod 8 8 100.0
total 24 240 10.0


line stmt bran cond sub pod time code
1             package Minion::Worker;
2 2     2   11 use Mojo::Base 'Mojo::EventEmitter';
  2         3  
  2         12  
3              
4 2     2   511 use Carp qw(croak);
  2         6  
  2         121  
5 2     2   810 use Minion::Util qw(desired_tasks);
  2         4  
  2         140  
6 2     2   13 use Mojo::Util qw(steady_time);
  2         2  
  2         3423  
7              
8             has [qw(commands status)] => sub { {} };
9             has [qw(id minion)];
10              
11 0 0   0 1   sub add_command { $_[0]->commands->{$_[1]} = $_[2] and return $_[0] }
12              
13             sub dequeue {
14 0     0 1   my ($self, $wait, $options) = @_;
15              
16             # Worker not registered
17 0 0         return undef unless my $id = $self->id;
18              
19 0           my $minion = $self->minion;
20 0 0         return undef unless my $job = $minion->backend->dequeue($id, $wait, $options);
21             $job = $minion->class_for_task($job->{task})
22 0           ->new(args => $job->{args}, id => $job->{id}, minion => $minion, retries => $job->{retries}, task => $job->{task});
23 0           $self->emit(dequeue => $job);
24 0           return $job;
25             }
26              
27 0     0 1   sub info { $_[0]->minion->backend->list_workers(0, 1, {ids => [$_[0]->id]})->{workers}[0] }
28              
29             sub new {
30 0     0 1   my $self = shift->SUPER::new(@_);
31 0     0     $self->on(busy => sub { sleep 1 });
  0            
32 0           return $self;
33             }
34              
35             sub process_commands {
36 0     0 1   my $self = shift;
37              
38 0           my $called = 0;
39 0           for my $command (@{$self->minion->backend->receive($self->id)}) {
  0            
40 0 0         next unless my $cb = $self->commands->{shift @$command};
41 0           $self->$cb(@$command);
42 0           $called++;
43             }
44 0 0         $self->register if $called;
45              
46 0           return $self;
47             }
48              
49             sub register {
50 0     0 1   my $self = shift;
51 0           my $status = {status => $self->status};
52 0           return $self->id($self->minion->backend->register_worker($self->id, $status));
53             }
54              
55             sub run {
56 0     0 1   my $self = shift;
57              
58 0           my $status = $self->status;
59 0   0       $status->{command_interval} //= 10;
60 0   0       $status->{dequeue_timeout} //= 5;
61 0   0       $status->{dispatch_interval} //= 30;
62 0   0       $status->{heartbeat_interval} //= 300;
63 0   0       $status->{jobs} //= 4;
64 0   0       $status->{limits} //= {};
65 0   0       $status->{queues} ||= ['default'];
66 0   0       $status->{performed} //= 0;
67 0   0       $status->{repair_interval} //= 21600;
68 0           $status->{repair_interval} -= int rand $status->{repair_interval} / 2;
69 0   0       $status->{spare} //= 1;
70 0   0       $status->{spare_min_priority} //= 1;
71 0   0       $status->{type} //= 'Perl';
72              
73             # Reset event loop
74 0           Mojo::IOLoop->reset;
75 0     0     local $SIG{CHLD} = sub { };
76 0     0     local $SIG{INT} = local $SIG{TERM} = sub { $self->{finished}++ };
  0            
77             local $SIG{QUIT} = sub {
78 0 0   0     ++$self->{finished} and kill 'KILL', map { $_->pid } @{$self->{jobs}};
  0            
  0            
79 0           };
80              
81             # Remote control commands need to validate arguments carefully
82 0           my $commands = $self->commands;
83 0 0 0 0     local $commands->{jobs} = sub { $status->{jobs} = $_[1] if ($_[1] // '') =~ /^\d+$/ };
  0            
84 0           local $commands->{kill} = \&_kill;
85 0 0 0 0     local $commands->{spare} = sub { $status->{spare} = $_[1] if ($_[1] // '') =~ /^\d+$/ };
  0            
86 0     0     local $commands->{stop} = sub { $self->_kill('KILL', $_[1]) };
  0            
87              
88 0   0       eval { $self->_work until $self->{finished} && !@{$self->{jobs}} };
  0            
  0            
89 0           my $err = $@;
90 0           $self->unregister;
91 0 0         croak $err if $err;
92             }
93              
94             sub unregister {
95 0     0 1   my $self = shift;
96 0           $self->minion->backend->unregister_worker(delete $self->{id});
97 0           return $self;
98             }
99              
100             sub _kill {
101 0   0 0     my ($self, $signal, $id) = (shift, shift // '', shift // '');
      0        
102 0 0         return unless grep { $signal eq $_ } qw(INT TERM KILL USR1 USR2);
  0            
103 0           $_->kill($signal) for grep { $_->id eq $id } @{$self->{jobs}};
  0            
  0            
104             }
105              
106             sub _work {
107 0     0     my $self = shift;
108              
109             # Send heartbeats in regular intervals
110 0           my $status = $self->status;
111 0   0       $self->{last_heartbeat} ||= -$status->{heartbeat_interval};
112             $self->register and $self->{last_heartbeat} = steady_time
113 0 0 0       if ($self->{last_heartbeat} + $status->{heartbeat_interval}) < steady_time;
114              
115             # Process worker remote control commands in regular intervals
116 0   0       $self->{last_command} ||= 0;
117             $self->process_commands and $self->{last_command} = steady_time
118 0 0 0       if ($self->{last_command} + $status->{command_interval}) < steady_time;
119              
120             # Repair in regular intervals (randomize to avoid congestion)
121 0   0       $self->{last_repair} ||= 0;
122 0 0         if (($self->{last_repair} + $status->{repair_interval}) < steady_time) {
123 0           $self->minion->repair;
124 0           $self->{last_repair} = steady_time;
125             }
126              
127             # Dispatch schedules in regular intervals (database advisory lock prevents duplicates)
128 0   0       $self->{last_dispatch} ||= 0;
129 0 0 0       if ($status->{dispatch_interval} && ($self->{last_dispatch} + $status->{dispatch_interval}) < steady_time) {
130 0           $self->minion->dispatch_schedules;
131 0           $self->{last_dispatch} = steady_time;
132             }
133              
134             # Check if jobs are finished
135 0   0       my $jobs = $self->{jobs} ||= [];
136 0 0 0       @$jobs = map { $_->is_finished && ++$status->{performed} ? () : $_ } @$jobs;
  0            
137              
138             # Job limit has been reached or worker is stopping
139 0           my @extra;
140 0 0 0       if ($self->{finished} || ($status->{jobs} + $status->{spare}) <= @$jobs) { return $self->emit('busy') }
  0 0          
141 0           elsif ($status->{jobs} <= @$jobs) { @extra = (min_priority => $status->{spare_min_priority}) }
142              
143             # Try to get more jobs
144 0           my $tasks = desired_tasks($status->{limits}, [keys %{$self->minion->tasks}], [map { $_->task } @$jobs]);
  0            
  0            
145 0 0         return unless @$tasks;
146 0           my ($max, $queues) = @{$status}{qw(dequeue_timeout queues)};
  0            
147 0           my $job = $self->emit('wait')->dequeue($max => {queues => $queues, tasks => $tasks, @extra});
148 0 0         push @$jobs, $job->start if $job;
149             }
150              
151             1;
152              
153             =encoding utf8
154              
155             =head1 NAME
156              
157             Minion::Worker - Minion worker
158              
159             =head1 SYNOPSIS
160              
161             use Minion::Worker;
162              
163             my $worker = Minion::Worker->new(minion => $minion);
164              
165             =head1 DESCRIPTION
166              
167              L<Minion::Worker> performs jobs for L<Minion>.
168              
169             =head1 WORKER SIGNALS
170              
171              The L<Minion::Worker> process can be controlled at runtime with the following signals.
172              
173             =head2 INT, TERM
174              
175             Stop gracefully after finishing the current jobs.
176              
177             =head2 QUIT
178              
179             Stop immediately without finishing the current jobs.
180              
181             =head1 JOB SIGNALS
182              
183              The job processes spawned by the L<Minion::Worker> process can be controlled at runtime with the following signals.
184              
185             =head2 INT, TERM
186              
187              These signals start out with the operating system default and allow for jobs to install a custom signal handler to stop
188             gracefully.
189              
190             =head2 USR1, USR2
191              
192             These signals start out being ignored and allow for jobs to install custom signal handlers.
193              
194             =head1 EVENTS
195              
196              L<Minion::Worker> inherits all events from L<Mojo::EventEmitter> and can emit the following new ones.
197              
198             =head2 busy
199              
200             $worker->on(busy => sub ($worker) {
201             ...
202             });
203              
204             Emitted in the worker process when it is performing the maximum number of jobs in parallel.
205              
206             $worker->on(busy => sub ($worker) {
207             my $max = $worker->status->{jobs};
208             say "Performing $max jobs.";
209             });
210              
211             =head2 dequeue
212              
213             $worker->on(dequeue => sub ($worker, $job) {
214             ...
215             });
216              
217             Emitted in the worker process after a job has been dequeued.
218              
219             $worker->on(dequeue => sub ($worker, $job) {
220             my $id = $job->id;
221             say "Job $id has been dequeued.";
222             });
223              
224             =head2 wait
225              
226             $worker->on(wait => sub ($worker) {
227             ...
228             });
229              
230             Emitted in the worker process before it tries to dequeue a job.
231              
232             $worker->on(wait => sub ($worker) {
233             my $max = $worker->status->{dequeue_timeout};
234             say "Waiting up to $max seconds for a new job.";
235             });
236              
237             =head1 ATTRIBUTES
238              
239              L<Minion::Worker> implements the following attributes.
240              
241             =head2 commands
242              
243             my $commands = $worker->commands;
244             $worker = $worker->commands({jobs => sub {...}});
245              
246             Registered worker remote control commands.
247              
248             =head2 id
249              
250             my $id = $worker->id;
251             $worker = $worker->id($id);
252              
253             Worker id.
254              
255             =head2 minion
256              
257             my $minion = $worker->minion;
258             $worker = $worker->minion(Minion->new);
259              
260              L<Minion> object this worker belongs to.
261              
262             =head2 status
263              
264             my $status = $worker->status;
265              $worker = $worker->status({queues => ['default', 'important']});
266              
267              Status information to configure workers started with L</"run"> and to share every time L</"register"> is called.
268              
269             =head1 METHODS
270              
271              L<Minion::Worker> inherits all methods from L<Mojo::EventEmitter> and implements the following new ones.
272              
273             =head2 add_command
274              
275             $worker = $worker->add_command(jobs => sub {...});
276              
277             Register a worker remote control command.
278              
279             $worker->add_command(foo => sub ($worker, @args) {
280             ...
281             });
282              
283             =head2 dequeue
284              
285             my $job = $worker->dequeue(0.5);
286             my $job = $worker->dequeue(0.5 => {queues => ['important']});
287              
288              Wait a given amount of time in seconds for a job, dequeue L<Minion::Job> object and transition from C<inactive> to
289              C<active> state, or return C<undef> if queues were empty.
290              
291             These options are currently available:
292              
293             =over 2
294              
295             =item id
296              
297             id => '10023'
298              
299             Dequeue a specific job.
300              
301             =item min_priority
302              
303             min_priority => 3
304              
305             Do not dequeue jobs with a lower priority.
306              
307             =item queues
308              
309             queues => ['important']
310              
311              One or more queues to dequeue jobs from, defaults to C<default>.
312              
313             =item tasks
314              
315             tasks => ['foo', 'bar']
316              
317             One or more tasks to dequeue jobs from, defaults to all tasks.
318              
319             =back
320              
321             =head2 info
322              
323             my $info = $worker->info;
324              
325             Get worker information.
326              
327             # Check worker host
328             my $host = $worker->info->{host};
329              
330             These fields are currently available:
331              
332             =over 2
333              
334             =item host
335              
336             host => 'localhost'
337              
338             Worker host.
339              
340             =item jobs
341              
342             jobs => ['10023', '10024', '10025', '10029']
343              
344             Ids of jobs the worker is currently processing.
345              
346             =item notified
347              
348             notified => 784111777
349              
350             Epoch time worker sent the last heartbeat.
351              
352             =item pid
353              
354             pid => 12345
355              
356             Process id of worker.
357              
358             =item started
359              
360             started => 784111777
361              
362             Epoch time worker was started.
363              
364             =item status
365              
366             status => {queues => ['default', 'important']}
367              
368             Hash reference with whatever status information the worker would like to share.
369              
370             =back
371              
372             =head2 new
373              
374             my $worker = Minion::Worker->new;
375             my $worker = Minion::Worker->new(status => {foo => 'bar'});
376             my $worker = Minion::Worker->new({status => {foo => 'bar'}});
377              
378              Construct a new L<Minion::Worker> object and subscribe to L</"busy"> event with default handler that sleeps for one
379             second.
380              
381             =head2 process_commands
382              
383             $worker = $worker->process_commands;
384              
385             Process worker remote control commands.
386              
387             =head2 register
388              
389             $worker = $worker->register;
390              
391             Register worker or send heartbeat to show that this worker is still alive.
392              
393             =head2 run
394              
395             $worker->run;
396              
397              Run worker and wait for L</"WORKER SIGNALS">.
398              
399             # Start a worker for a special named queue
400             my $worker = $minion->worker;
401             $worker->status->{queues} = ['important'];
402             $worker->run;
403              
404              These L</"status"> options are currently available:
405              
406             =over 2
407              
408             =item command_interval
409              
410             command_interval => 20
411              
412             Worker remote control command interval, defaults to C<10>.
413              
414             =item dequeue_timeout
415              
416             dequeue_timeout => 5
417              
418              Maximum amount of time in seconds to wait for a job, defaults to C<5>.
419              
420             =item dispatch_interval
421              
422             dispatch_interval => 60
423              
424              Interval in seconds for dispatching due scheduled jobs, defaults to C<30>. Set to C<0> to disable schedule
425             dispatching on this worker (for example when a dedicated scheduler worker is used).
426              
427             =item heartbeat_interval
428              
429             heartbeat_interval => 60
430              
431             Heartbeat interval, defaults to C<300>.
432              
433             =item jobs
434              
435             jobs => 12
436              
437              Maximum number of jobs to perform in parallel in forked worker processes (not including spare processes), defaults to C<4>.
438              
439             =item queues
440              
441             queues => ['test']
442              
443              One or more queues to get jobs from, defaults to C<default>.
444              
445             =item repair_interval
446              
447             repair_interval => 3600
448              
449             Repair interval, up to half of this value can be subtracted randomly to make sure not all workers repair at the same
450             time, defaults to C<21600> (6 hours).
451              
452             =item spare
453              
454             spare => 2
455              
456             Number of spare worker processes to reserve for high priority jobs, defaults to C<1>.
457              
458             =item spare_min_priority
459              
460             spare_min_priority => 7
461              
462             Minimum priority of jobs to use spare worker processes for, defaults to C<1>.
463              
464             =back
465              
466              These remote control L</"commands"> are currently available:
467              
468             =over 2
469              
470             =item jobs
471              
472             $minion->broadcast('jobs', [10]);
473             $minion->broadcast('jobs', [10], [$worker_id]);
474              
475             Instruct one or more workers to change the number of jobs to perform concurrently. Setting this value to C<0> will
476             effectively pause the worker. That means all current jobs will be finished, but no new ones accepted, until the number
477             is increased again.
478              
479             =item kill
480              
481             $minion->broadcast('kill', ['INT', 10025]);
482             $minion->broadcast('kill', ['INT', 10025], [$worker_id]);
483              
484             Instruct one or more workers to send a signal to a job that is currently being performed. This command will be ignored
485             by workers that do not have a job matching the id. That means it is safe to broadcast this command to all workers.
486              
487             =item stop
488              
489             $minion->broadcast('stop', [10025]);
490             $minion->broadcast('stop', [10025], [$worker_id]);
491              
492             Instruct one or more workers to stop a job that is currently being performed immediately. This command will be ignored
493             by workers that do not have a job matching the id. That means it is safe to broadcast this command to all workers.
494              
495             =back
496              
497             =head2 unregister
498              
499             $worker = $worker->unregister;
500              
501             Unregister worker.
502              
503             =head1 SEE ALSO
504              
505              L<Minion>, L<Minion::Guide>, L<https://minion.pm>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
506              
507             =cut