File Coverage

blib/lib/Minion/Job.pm
Criterion Covered Total %
statement 15 64 23.4
branch 0 24 0.0
condition 0 7 0.0
subroutine 5 22 22.7
pod 16 16 100.0
total 36 133 27.0


line stmt bran cond sub pod time code
1             package Minion::Job;
2 2     2   11 use Mojo::Base 'Mojo::EventEmitter';
  2         3  
  2         10  
3              
4 2     2   282 use Carp qw(croak);
  2         3  
  2         70  
5 2     2   391 use Mojo::Collection;
  2         3746  
  2         60  
6 2     2   891 use Mojo::IOLoop;
  2         248002  
  2         8  
7 2     2   80 use POSIX qw(WNOHANG);
  2         3  
  2         14  
8              
9             has [qw(args id minion retries task)];
10              
11 0     0 1   sub app { shift->minion->app }
12              
13             sub execute {
14 0     0 1   my $self = shift;
15 0 0         return eval {
16 0           my $task = $self->minion->tasks->{$self->emit('start')->task};
17 0 0         ref $task ? $self->$task(@{$self->args}) : $self->run(@{$self->args});
  0            
  0            
18 0           !!$self->emit('finish');
19             } ? undef : $@;
20             }
21              
22             sub fail {
23 0   0 0 1   my ($self, $err) = (shift, shift // 'Unknown error');
24 0           my $ok = $self->minion->backend->fail_job($self->id, $self->retries, $err);
25 0 0         return $ok ? !!$self->emit(failed => $err) : undef;
26             }
27              
28             sub finish {
29 0     0 1   my ($self, $result) = @_;
30 0           my $ok = $self->minion->backend->finish_job($self->id, $self->retries, $result);
31 0 0         return $ok ? !!$self->emit(finished => $result) : undef;
32             }
33              
34 0     0 1   sub info { $_[0]->minion->backend->list_jobs(0, 1, {ids => [$_[0]->id]})->{jobs}[0] }
35              
36             sub is_finished {
37 0     0 1   my $self = shift;
38 0 0         return undef unless waitpid($self->{pid}, WNOHANG) == $self->{pid};
39 0 0         $self->_reap($? ? (1, $? >> 8, $? & 127) : ());
40 0           return 1;
41             }
42              
43 0     0 1   sub kill { CORE::kill($_[1], $_[0]->{pid}) }
44              
45             sub note {
46 0     0 1   my $self = shift;
47 0           return $self->minion->backend->note($self->id, {@_});
48             }
49              
50             sub parents {
51 0     0 1   my $self = shift;
52 0           my $minion = $self->minion;
53 0 0 0       return Mojo::Collection->new(map { $minion->job($_) // () } @{($self->info || {})->{parents} || []});
  0   0        
  0            
54             }
55              
56             sub perform {
57 0     0 1   my $self = shift;
58 0           waitpid $self->start->pid, 0;
59 0 0         $self->_reap($? ? (1, $? >> 8, $? & 127) : ());
60             }
61              
62 0     0 1   sub pid { shift->{pid} }
63              
64 0     0 1   sub remove { $_[0]->minion->backend->remove_job($_[0]->id) }
65              
66             sub retry {
67 0     0 1   my $self = shift;
68 0           return $self->minion->backend->retry_job($self->id, $self->retries, @_);
69             }
70              
71 0     0 1   sub run { croak 'Method "run" not implemented by subclass' }
72              
73             sub start {
74 0     0 1   my $self = shift;
75              
76             # Parent
77 0 0         die "Can't fork: $!" unless defined(my $pid = fork);
78 0 0         return $self->emit(spawn => $pid) if $self->{pid} = $pid;
79              
80             # Reset event loop
81 0           Mojo::IOLoop->reset;
82 0           local $SIG{CHLD} = local $SIG{INT} = local $SIG{TERM} = local $SIG{QUIT} = 'DEFAULT';
83 0           local $SIG{USR1} = local $SIG{USR2} = 'IGNORE';
84              
85             # Child
86 0 0         if (defined(my $err = $self->execute)) { $self->fail($err) }
  0            
87 0           $self->emit('cleanup');
88 0           POSIX::_exit(0);
89             }
90              
91 0     0 1   sub stop { shift->kill('KILL') }
92              
93             sub _reap {
94 0     0     my ($self, $term, $exit, $sig) = @_;
95 0           $self->emit(reap => $self->{pid});
96 0 0         $term ? $self->fail("Job terminated unexpectedly (exit code: $exit, signal: $sig)") : $self->finish;
97             }
98              
99             1;
100              
101             =encoding utf8
102              
103             =head1 NAME
104              
105             Minion::Job - Minion job
106              
107             =head1 SYNOPSIS
108              
109             package MyApp::Task::Foo;
110             use Mojo::Base 'Minion::Job', -signatures;
111              
112             sub run ($self, @args) {
113              
114             # Magic here! :)
115             }
116              
117             =head1 DESCRIPTION
118              
119             L<Minion::Job> is a container for L<Minion> jobs.
120              
121             =head1 EVENTS
122              
123             L<Minion::Job> inherits all events from L<Mojo::EventEmitter> and can emit the following new ones.
124              
125             =head2 cleanup
126              
127             $job->on(cleanup => sub ($job) {
128             ...
129             });
130              
131             Emitted in the process performing this job right before the process will exit.
132              
133             $job->on(cleanup => sub ($job) {
134             $job->app->log->debug("Process $$ is about to exit");
135             });
136              
137             =head2 failed
138              
139             $job->on(failed => sub ($job, $err) {
140             ...
141             });
142              
143             Emitted in the worker process managing this job or the process performing it, after it has transitioned to the
144             C<failed> state.
145              
146             $job->on(failed => sub ($job, $err) {
147             say "Something went wrong: $err";
148             });
149              
150             =head2 finish
151              
152             $job->on(finish => sub ($job) {
153             ...
154             });
155              
156             Emitted in the process performing this job if the task was successful.
157              
158             $job->on(finish => sub ($job) {
159             my $id = $job->id;
160             my $task = $job->task;
161             $job->app->log->debug(qq{Job "$id" was performed with task "$task"});
162             });
163              
164             =head2 finished
165              
166             $job->on(finished => sub ($job, $result) {
167             ...
168             });
169              
170             Emitted in the worker process managing this job or the process performing it, after it has transitioned to the
171             C<finished> state.
172              
173             $job->on(finished => sub ($job, $result) {
174             my $id = $job->id;
175             say "Job $id is finished.";
176             });
177              
178             =head2 reap
179              
180             $job->on(reap => sub ($job, $pid) {
181             ...
182             });
183              
184             Emitted in the worker process managing this job, after the process performing it has exited.
185              
186             $job->on(reap => sub ($job, $pid) {
187             my $id = $job->id;
188             say "Job $id ran in process $pid";
189             });
190              
191             =head2 spawn
192              
193             $job->on(spawn => sub ($job, $pid) {
194             ...
195             });
196              
197             Emitted in the worker process managing this job, after a new process has been spawned for processing.
198              
199             $job->on(spawn => sub ($job, $pid) {
200             my $id = $job->id;
201             say "Job $id running in process $pid";
202             });
203              
204             =head2 start
205              
206             $job->on(start => sub ($job) {
207             ...
208             });
209              
210             Emitted in the process performing this job, after it has been spawned.
211              
212             $job->on(start => sub ($job) {
213             $0 = $job->id;
214             });
215              
216             =head1 ATTRIBUTES
217              
218             L<Minion::Job> implements the following attributes.
219              
220             =head2 args
221              
222             my $args = $job->args;
223             $job = $job->args([]);
224              
225             Arguments passed to task.
226              
227             =head2 id
228              
229             my $id = $job->id;
230             $job = $job->id($id);
231              
232             Job id.
233              
234             =head2 minion
235              
236             my $minion = $job->minion;
237             $job = $job->minion(Minion->new);
238              
239             L<Minion> object this job belongs to.
240              
241             =head2 retries
242              
243             my $retries = $job->retries;
244             $job = $job->retries(5);
245              
246             Number of times job has been retried.
247              
248             =head2 task
249              
250             my $task = $job->task;
251             $job = $job->task('foo');
252              
253             Task name.
254              
255             =head1 METHODS
256              
257             L<Minion::Job> inherits all methods from L<Mojo::EventEmitter> and implements the following new ones.
258              
259             =head2 app
260              
261             my $app = $job->app;
262              
263             Get application from L<Minion/"app">.
264              
265             # Longer version
266             my $app = $job->minion->app;
267              
268             =head2 execute
269              
270             my $err = $job->execute;
271              
272             Perform job in this process and return C<undef> if the task was successful or an exception otherwise. Note that this
273             method should only be used to implement custom workers.
274              
275             # Perform job in foreground
276             if (my $err = $job->execute) { $job->fail($err) }
277             else { $job->finish }
278              
279             =head2 fail
280              
281             my $bool = $job->fail;
282             my $bool = $job->fail('Something went wrong!');
283             my $bool = $job->fail({whatever => 'Something went wrong!'});
284              
285             Transition from C<active> to C<failed> state with or without a result, and if there are attempts remaining, transition
286             back to C<inactive> with a delay based on L<Minion/"backoff">.
287              
288             =head2 finish
289              
290             my $bool = $job->finish;
291             my $bool = $job->finish('All went well!');
292             my $bool = $job->finish({whatever => 'All went well!'});
293              
294             Transition from C<active> to C<finished> state with or without a result.
295              
296             =head2 info
297              
298             my $info = $job->info;
299              
300             Get job information.
301              
302             # Check job state
303             my $state = $job->info->{state};
304              
305             # Get job metadata
306             my $progress = $job->info->{notes}{progress};
307              
308             # Get job result
309             my $result = $job->info->{result};
310              
311             These fields are currently available:
312              
313             =over 2
314              
315             =item args
316              
317             args => ['foo', 'bar']
318              
319             Job arguments.
320              
321             =item attempts
322              
323             attempts => 25
324              
325             Number of times performing this job will be attempted.
326              
327             =item children
328              
329             children => ['10026', '10027', '10028']
330              
331             Jobs depending on this job.
332              
333             =item created
334              
335             created => 784111777
336              
337             Epoch time job was created.
338              
339             =item delayed
340              
341             delayed => 784111777
342              
343             Epoch time job was delayed to.
344              
345             =item expires
346              
347             expires => 784111777
348              
349             Epoch time job is valid until before it expires.
350              
351             =item finished
352              
353             finished => 784111777
354              
355             Epoch time job was finished.
356              
357             =item lax
358              
359             lax => 0
360              
361             Existing jobs this job depends on may also have failed to allow for it to be processed.
362              
363             =item notes
364              
365             notes => {foo => 'bar', baz => [1, 2, 3]}
366              
367             Hash reference with arbitrary metadata for this job.
368              
369             =item parents
370              
371             parents => ['10023', '10024', '10025']
372              
373             Jobs this job depends on.
374              
375             =item priority
376              
377             priority => 3
378              
379             Job priority.
380              
381             =item queue
382              
383             queue => 'important'
384              
385             Queue name.
386              
387             =item result
388              
389             result => 'All went well!'
390              
391             Job result.
392              
393             =item retried
394              
395             retried => 784111777
396              
397             Epoch time job has been retried.
398              
399             =item retries
400              
401             retries => 3
402              
403             Number of times job has been retried.
404              
405             =item started
406              
407             started => 784111777
408              
409             Epoch time job was started.
410              
411             =item state
412              
413             state => 'inactive'
414              
415             Current job state, usually C<active>, C<failed>, C<finished> or C<inactive>.
416              
417             =item task
418              
419             task => 'foo'
420              
421             Task name.
422              
423             =item time
424              
425             time => 784111777
426              
427             Server time.
428              
429             =item worker
430              
431             worker => '154'
432              
433             Id of worker that is processing the job.
434              
435             =back
436              
437             =head2 is_finished
438              
439             my $bool = $job->is_finished;
440              
441             Check if job performed with L</"start"> is finished. Note that this method should only be used to implement custom
442             workers.
443              
444             =head2 kill
445              
446             $job->kill('INT');
447              
448             Send a signal to job performed with L</"start">. Note that this method should only be used to implement custom workers.
449              
450             =head2 note
451              
452             my $bool = $job->note(mojo => 'rocks', minion => 'too');
453              
454             Change one or more metadata fields for this job. Setting a value to C<undef> will remove the field. The new values will
455             get serialized by L<Minion/"backend"> (often with L<Mojo::JSON>), so you shouldn't send objects and be careful with
456             binary data, nested data structures with hash and array references are fine though.
457              
458             # Share progress information
459             $job->note(progress => 95);
460              
461             # Share stats
462             $job->note(stats => {utime => '0.012628', stime => '0.002429'});
463              
464             =head2 parents
465              
466             my $parents = $job->parents;
467              
468             Return a L<Mojo::Collection> object containing all jobs this job depends on as L<Minion::Job> objects.
469              
470             # Check parent state
471             for my $parent ($job->parents->each) {
472             my $info = $parent->info;
473             say "$info->{id}: $info->{state}";
474             }
475              
476             =head2 perform
477              
478             $job->perform;
479              
480             Perform job in new process and wait for it to finish. Note that this method should only be used to implement custom
481             workers.
482              
483             =head2 pid
484              
485             my $pid = $job->pid;
486              
487             Process id of the process spawned by L</"start"> if available. Note that this method should only be used to implement
488             custom workers.
489              
490             =head2 remove
491              
492             my $bool = $job->remove;
493              
494             Remove C<failed>, C<finished> or C<inactive> job from queue.
495              
496             =head2 retry
497              
498             my $bool = $job->retry;
499             my $bool = $job->retry({delay => 10});
500              
501             Transition job back to C<inactive> state, already C<inactive> jobs may also be retried to change options.
502              
503             These options are currently available:
504              
505             =over 2
506              
507             =item attempts
508              
509             attempts => 25
510              
511             Number of times performing this job will be attempted.
512              
513             =item delay
514              
515             delay => 10
516              
517             Delay job for this many seconds (from now), defaults to C<0>.
518              
519             =item expire
520              
521             expire => 300
522              
523             Job is valid for this many seconds (from now) before it expires.
524              
525             =item lax
526              
527             lax => 1
528              
529             Existing jobs this job depends on may also have transitioned to the C<failed> state to allow for it to be processed,
530             defaults to C<false>. Note that this option is B<EXPERIMENTAL> and might change without warning!
531              
532             =item parents
533              
534             parents => [$id1, $id2, $id3]
535              
536             Jobs this job depends on.
537              
538             =item priority
539              
540             priority => 5
541              
542             Job priority.
543              
544             =item queue
545              
546             queue => 'important'
547              
548             Queue to put job in.
549              
550             =back
551              
552             =head2 run
553              
554             $job->run(@args);
555              
556             Task to perform by this job. Meant to be overloaded in a subclass to create a custom task class. Note that this method
557             is B<EXPERIMENTAL> and might change without warning!
558              
559             =head2 start
560              
561             $job = $job->start;
562              
563             Perform job in new process, but do not wait for it to finish. Note that this method should only be used to implement
564             custom workers.
565              
566             # Perform two jobs concurrently
567             $job1->start;
568             $job2->start;
569             my ($first, $second);
570             sleep 1
571             until $first ||= $job1->is_finished and $second ||= $job2->is_finished;
572              
573             =head2 stop
574              
575             $job->stop;
576              
577             Stop job performed with L</"start"> immediately. Note that this method should only be used to implement custom workers.
578              
579             =head1 SEE ALSO
580              
581             L<Minion>, L<Minion::Guide>, L<https://minion.pm>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
582              
583             =cut