File Coverage

blib/lib/Minion/Job.pm
Criterion Covered Total %
statement 15 67 22.3
branch 0 24 0.0
condition 0 7 0.0
subroutine 5 22 22.7
pod 16 16 100.0
total 36 136 26.4


line stmt bran cond sub pod time code
1             package Minion::Job;
2 2     2   11 use Mojo::Base 'Mojo::EventEmitter';
  2         2  
  2         10  
3              
4 2     2   394 use Carp qw(croak);
  2         4  
  2         104  
5 2     2   365 use Mojo::Collection;
  2         166963  
  2         72  
6 2     2   898 use Mojo::IOLoop;
  2         291920  
  2         40  
7 2     2   103 use POSIX qw(WNOHANG);
  2         6  
  2         17  
8              
9             has [qw(args id minion retries task)];
10              
11 0     0 1   sub app { shift->minion->app }
12              
13             sub execute {
14 0     0 1   my $self = shift;
15 0 0         return eval {
16 0           my $task = $self->minion->tasks->{$self->emit('start')->task};
17 0 0         ref $task ? $self->$task(@{$self->args}) : $self->run(@{$self->args});
  0            
  0            
18 0           !!$self->emit('finish');
19             } ? undef : $@;
20             }
21              
22             sub fail {
23 0   0 0 1   my ($self, $err) = (shift, shift // 'Unknown error');
24 0           my $ok = $self->minion->backend->fail_job($self->id, $self->retries, $err);
25 0 0         return $ok ? !!$self->emit(failed => $err) : undef;
26             }
27              
28             sub finish {
29 0     0 1   my ($self, $result) = @_;
30 0           my $ok = $self->minion->backend->finish_job($self->id, $self->retries, $result);
31 0 0         return $ok ? !!$self->emit(finished => $result) : undef;
32             }
33              
34 0     0 1   sub info { $_[0]->minion->backend->list_jobs(0, 1, {ids => [$_[0]->id]})->{jobs}[0] }
35              
36             sub is_finished {
37 0     0 1   my $self = shift;
38 0 0         return undef unless waitpid($self->{pid}, WNOHANG) == $self->{pid};
39 0 0         $self->_reap($? ? (1, $? >> 8, $? & 127) : ());
40 0           return 1;
41             }
42              
43 0     0 1   sub kill { CORE::kill($_[1], $_[0]->{pid}) }
44              
45             sub note {
46 0     0 1   my $self = shift;
47 0           return $self->minion->backend->note($self->id, {@_});
48             }
49              
50             sub parents {
51 0     0 1   my $self = shift;
52 0           my $minion = $self->minion;
53 0 0 0       return Mojo::Collection->new(map { $minion->job($_) // () } @{($self->info || {})->{parents} || []});
  0   0        
  0            
54             }
55              
56             sub perform {
57 0     0 1   my $self = shift;
58 0           waitpid $self->start->pid, 0;
59 0 0         $self->_reap($? ? (1, $? >> 8, $? & 127) : ());
60             }
61              
62 0     0 1   sub pid { shift->{pid} }
63              
64 0     0 1   sub remove { $_[0]->minion->backend->remove_job($_[0]->id) }
65              
66             sub retry {
67 0     0 1   my $self = shift;
68 0           return $self->minion->backend->retry_job($self->id, $self->retries, @_);
69             }
70              
71 0     0 1   sub run { croak 'Method "run" not implemented by subclass' }
72              
73             sub start {
74 0     0 1   my $self = shift;
75              
76             # Parent
77 0 0         die "Can't fork: $!" unless defined(my $pid = fork);
78 0 0         if ($self->{pid} = $pid) {
79 0           $self->note(minion_pid => $pid);
80 0           return $self->emit(spawn => $pid);
81             }
82              
83             # Reset event loop
84 0           Mojo::IOLoop->reset;
85 0           local $SIG{CHLD} = local $SIG{INT} = local $SIG{TERM} = local $SIG{QUIT} = 'DEFAULT';
86 0           local $SIG{USR1} = local $SIG{USR2} = 'IGNORE';
87 0           srand;
88              
89             # Child
90 0 0         if (defined(my $err = $self->execute)) { $self->fail($err) }
  0            
91 0           $self->emit('cleanup');
92 0           POSIX::_exit(0);
93             }
94              
95 0     0 1   sub stop { shift->kill('KILL') }
96              
97             sub _reap {
98 0     0     my ($self, $term, $exit, $sig) = @_;
99 0           $self->emit(reap => $self->{pid});
100 0 0         $term ? $self->fail("Job terminated unexpectedly (exit code: $exit, signal: $sig)") : $self->finish;
101             }
102              
103             1;
104              
105             =encoding utf8
106              
107             =head1 NAME
108              
109             Minion::Job - Minion job
110              
111             =head1 SYNOPSIS
112              
113             package MyApp::Task::Foo;
114             use Mojo::Base 'Minion::Job', -signatures;
115              
116             sub run ($self, @args) {
117              
118             # Magic here! :)
119             }
120              
121             =head1 DESCRIPTION
122              
123             L<Minion::Job> is a container for L<Minion> jobs.
124              
125             =head1 EVENTS
126              
127             L<Minion::Job> inherits all events from L<Mojo::EventEmitter> and can emit the following new ones.
128              
129             =head2 cleanup
130              
131             $job->on(cleanup => sub ($job) {
132             ...
133             });
134              
135             Emitted in the process performing this job right before the process will exit.
136              
137             $job->on(cleanup => sub ($job) {
138             $job->app->log->debug("Process $$ is about to exit");
139             });
140              
141             =head2 failed
142              
143             $job->on(failed => sub ($job, $err) {
144             ...
145             });
146              
147             Emitted in the worker process managing this job or the process performing it, after it has transitioned to the
148             C<failed> state.
149              
150             $job->on(failed => sub ($job, $err) {
151             say "Something went wrong: $err";
152             });
153              
154             =head2 finish
155              
156             $job->on(finish => sub ($job) {
157             ...
158             });
159              
160             Emitted in the process performing this job if the task was successful.
161              
162             $job->on(finish => sub ($job) {
163             my $id = $job->id;
164             my $task = $job->task;
165             $job->app->log->debug(qq{Job "$id" was performed with task "$task"});
166             });
167              
168             =head2 finished
169              
170             $job->on(finished => sub ($job, $result) {
171             ...
172             });
173              
174             Emitted in the worker process managing this job or the process performing it, after it has transitioned to the
175             C<finished> state.
176              
177             $job->on(finished => sub ($job, $result) {
178             my $id = $job->id;
179             say "Job $id is finished.";
180             });
181              
182             =head2 reap
183              
184             $job->on(reap => sub ($job, $pid) {
185             ...
186             });
187              
188             Emitted in the worker process managing this job, after the process performing it has exited.
189              
190             $job->on(reap => sub ($job, $pid) {
191             my $id = $job->id;
192             say "Job $id ran in process $pid";
193             });
194              
195             =head2 spawn
196              
197             $job->on(spawn => sub ($job, $pid) {
198             ...
199             });
200              
201             Emitted in the worker process managing this job, after a new process has been spawned for processing.
202              
203             $job->on(spawn => sub ($job, $pid) {
204             my $id = $job->id;
205             say "Job $id running in process $pid";
206             });
207              
208             =head2 start
209              
210             $job->on(start => sub ($job) {
211             ...
212             });
213              
214             Emitted in the process performing this job, after it has been spawned.
215              
216             $job->on(start => sub ($job) {
217             $0 = $job->id;
218             });
219              
220             =head1 ATTRIBUTES
221              
222             L<Minion::Job> implements the following attributes.
223              
224             =head2 args
225              
226             my $args = $job->args;
227             $job = $job->args([]);
228              
229             Arguments passed to task.
230              
231             =head2 id
232              
233             my $id = $job->id;
234             $job = $job->id($id);
235              
236             Job id.
237              
238             =head2 minion
239              
240             my $minion = $job->minion;
241             $job = $job->minion(Minion->new);
242              
243             L<Minion> object this job belongs to.
244              
245             =head2 retries
246              
247             my $retries = $job->retries;
248             $job = $job->retries(5);
249              
250             Number of times job has been retried.
251              
252             =head2 task
253              
254             my $task = $job->task;
255             $job = $job->task('foo');
256              
257             Task name.
258              
259             =head1 METHODS
260              
261             L<Minion::Job> inherits all methods from L<Mojo::EventEmitter> and implements the following new ones.
262              
263             =head2 app
264              
265             my $app = $job->app;
266              
267             Get application from L<Minion/"app">.
268              
269             # Longer version
270             my $app = $job->minion->app;
271              
272             =head2 execute
273              
274             my $err = $job->execute;
275              
276             Perform job in this process and return C<undef> if the task was successful or an exception otherwise. Note that this
277             method should only be used to implement custom workers.
278              
279             # Perform job in foreground
280             if (my $err = $job->execute) { $job->fail($err) }
281             else { $job->finish }
282              
283             =head2 fail
284              
285             my $bool = $job->fail;
286             my $bool = $job->fail('Something went wrong!');
287             my $bool = $job->fail({whatever => 'Something went wrong!'});
288              
289             Transition from C<active> to C<failed> state with or without a result, and if there are attempts remaining, transition
290             back to C<inactive> with a delay based on L<Minion/"backoff">.
291              
292             =head2 finish
293              
294             my $bool = $job->finish;
295             my $bool = $job->finish('All went well!');
296             my $bool = $job->finish({whatever => 'All went well!'});
297              
298             Transition from C<active> to C<finished> state with or without a result.
299              
300             =head2 info
301              
302             my $info = $job->info;
303              
304             Get job information.
305              
306             # Check job state
307             my $state = $job->info->{state};
308              
309             # Get job metadata
310             my $progress = $job->info->{notes}{progress};
311              
312             # Get job result
313             my $result = $job->info->{result};
314              
315             These fields are currently available:
316              
317             =over 2
318              
319             =item args
320              
321             args => ['foo', 'bar']
322              
323             Job arguments.
324              
325             =item attempts
326              
327             attempts => 25
328              
329             Number of times performing this job will be attempted.
330              
331             =item children
332              
333             children => ['10026', '10027', '10028']
334              
335             Jobs depending on this job.
336              
337             =item created
338              
339             created => 784111777
340              
341             Epoch time job was created.
342              
343             =item delayed
344              
345             delayed => 784111777
346              
347             Epoch time job was delayed to.
348              
349             =item expires
350              
351             expires => 784111777
352              
353             Epoch time job is valid until before it expires.
354              
355             =item finished
356              
357             finished => 784111777
358              
359             Epoch time job was finished.
360              
361             =item lax
362              
363             lax => 0
364              
365             Existing jobs this job depends on may also have failed to allow for it to be processed.
366              
367             =item notes
368              
369             notes => {foo => 'bar', baz => [1, 2, 3]}
370              
371             Hash reference with arbitrary metadata for this job.
372              
373             =item parents
374              
375             parents => ['10023', '10024', '10025']
376              
377             Jobs this job depends on.
378              
379             =item priority
380              
381             priority => 3
382              
383             Job priority.
384              
385             =item queue
386              
387             queue => 'important'
388              
389             Queue name.
390              
391             =item result
392              
393             result => 'All went well!'
394              
395             Job result.
396              
397             =item retried
398              
399             retried => 784111777
400              
401             Epoch time job has been retried.
402              
403             =item retries
404              
405             retries => 3
406              
407             Number of times job has been retried.
408              
409             =item started
410              
411             started => 784111777
412              
413             Epoch time job was started.
414              
415             =item state
416              
417             state => 'inactive'
418              
419             Current job state, usually C<active>, C<failed>, C<finished> or C<inactive>.
420              
421             =item task
422              
423             task => 'foo'
424              
425             Task name.
426              
427             =item time
428              
429             time => 784111777
430              
431             Server time.
432              
433             =item worker
434              
435             worker => '154'
436              
437             Id of worker that is processing the job.
438              
439             =back
440              
441             =head2 is_finished
442              
443             my $bool = $job->is_finished;
444              
445             Check if job performed with L</"start"> is finished. Note that this method should only be used to implement custom
446             workers.
447              
448             =head2 kill
449              
450             $job->kill('INT');
451              
452             Send a signal to job performed with L</"start">. Note that this method should only be used to implement custom workers.
453              
454             =head2 note
455              
456             my $bool = $job->note(mojo => 'rocks', minion => 'too');
457              
458             Change one or more metadata fields for this job. Setting a value to C<undef> will remove the field. The new values will
459             get serialized by L<Minion/"backend"> (often with L<Mojo::JSON>), so you shouldn't send objects and be careful with
460             binary data, nested data structures with hash and array references are fine though.
461              
462             # Share progress information
463             $job->note(progress => 95);
464              
465             # Share stats
466             $job->note(stats => {utime => '0.012628', stime => '0.002429'});
467              
468             =head2 parents
469              
470             my $parents = $job->parents;
471              
472             Return a L<Mojo::Collection> object containing all jobs this job depends on as L<Minion::Job> objects.
473              
474             # Check parent state
475             for my $parent ($job->parents->each) {
476             my $info = $parent->info;
477             say "$info->{id}: $info->{state}";
478             }
479              
480             =head2 perform
481              
482             $job->perform;
483              
484             Perform job in new process and wait for it to finish. Note that this method should only be used to implement custom
485             workers.
486              
487             =head2 pid
488              
489             my $pid = $job->pid;
490              
491             Process id of the process spawned by L</"start"> if available. Note that this method should only be used to implement
492             custom workers.
493              
494             =head2 remove
495              
496             my $bool = $job->remove;
497              
498             Remove C<failed>, C<finished> or C<inactive> job from queue.
499              
500             =head2 retry
501              
502             my $bool = $job->retry;
503             my $bool = $job->retry({delay => 10});
504              
505             Transition job back to C<inactive> state, already C<inactive> jobs may also be retried to change options.
506              
507             These options are currently available:
508              
509             =over 2
510              
511             =item attempts
512              
513             attempts => 25
514              
515             Number of times performing this job will be attempted.
516              
517             =item delay
518              
519             delay => 10
520              
521             Delay job for this many seconds (from now), defaults to C<0>.
522              
523             =item expire
524              
525             expire => 300
526              
527             Job is valid for this many seconds (from now) before it expires.
528              
529             =item lax
530              
531             lax => 1
532              
533             Existing jobs this job depends on may also have transitioned to the C<failed> state to allow for it to be processed,
534             defaults to C<false>.
535              
536             =item parents
537              
538             parents => [$id1, $id2, $id3]
539              
540             Jobs this job depends on.
541              
542             =item priority
543              
544             priority => 5
545              
546             Job priority.
547              
548             =item queue
549              
550             queue => 'important'
551              
552             Queue to put job in.
553              
554             =back
555              
556             =head2 run
557              
558             $job->run(@args);
559              
560             Task to perform by this job. Meant to be overloaded in a subclass to create a custom task class. Note that this method
561             is B<EXPERIMENTAL> and might change without warning!
562              
563             =head2 start
564              
565             $job = $job->start;
566              
567             Perform job in new process, but do not wait for it to finish. Note that this method should only be used to implement
568             custom workers.
569              
570             # Perform two jobs concurrently
571             $job1->start;
572             $job2->start;
573             my ($first, $second);
574             sleep 1
575             until $first ||= $job1->is_finished and $second ||= $job2->is_finished;
576              
577             =head2 stop
578              
579             $job->stop;
580              
581             Stop job performed with L</"start"> immediately. Note that this method should only be used to implement custom workers.
582              
583             =head1 SEE ALSO
584              
585             L<Minion>, L<Minion::Guide>, L<https://minion.pm>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
586              
587             =cut