File Coverage

blib/lib/Minion/Job.pm
Criterion Covered Total %
statement 15 65 23.0
branch 0 24 0.0
condition 0 7 0.0
subroutine 5 22 22.7
pod 16 16 100.0
total 36 134 26.8


line stmt bran cond sub pod time code
1             package Minion::Job;
2 2     2   12 use Mojo::Base 'Mojo::EventEmitter';
  2         5  
  2         15  
3              
4 2     2   556 use Carp qw(croak);
  2         4  
  2         160  
5 2     2   683 use Mojo::Collection;
  2         215032  
  2         78  
6 2     2   1175 use Mojo::IOLoop;
  2         401163  
  2         29  
7 2     2   135 use POSIX qw(WNOHANG);
  2         3  
  2         25  
8              
9             has [qw(args id minion retries task)];
10              
11 0     0 1   sub app { shift->minion->app }
12              
13             sub execute {
14 0     0 1   my $self = shift;
15 0 0         return eval {
16 0           my $task = $self->minion->tasks->{$self->emit('start')->task};
17 0 0         ref $task ? $self->$task(@{$self->args}) : $self->run(@{$self->args});
  0            
  0            
18 0           !!$self->emit('finish');
19             } ? undef : $@;
20             }
21              
22             sub fail {
23 0   0 0 1   my ($self, $err) = (shift, shift // 'Unknown error');
24 0           my $ok = $self->minion->backend->fail_job($self->id, $self->retries, $err);
25 0 0         return $ok ? !!$self->emit(failed => $err) : undef;
26             }
27              
28             sub finish {
29 0     0 1   my ($self, $result) = @_;
30 0           my $ok = $self->minion->backend->finish_job($self->id, $self->retries, $result);
31 0 0         return $ok ? !!$self->emit(finished => $result) : undef;
32             }
33              
34 0     0 1   sub info { $_[0]->minion->backend->list_jobs(0, 1, {ids => [$_[0]->id]})->{jobs}[0] }
35              
36             sub is_finished {
37 0     0 1   my $self = shift;
38 0 0         return undef unless waitpid($self->{pid}, WNOHANG) == $self->{pid};
39 0 0         $self->_reap($? ? (1, $? >> 8, $? & 127) : ());
40 0           return 1;
41             }
42              
43 0     0 1   sub kill { CORE::kill($_[1], $_[0]->{pid}) }
44              
45             sub note {
46 0     0 1   my $self = shift;
47 0           return $self->minion->backend->note($self->id, {@_});
48             }
49              
50             sub parents {
51 0     0 1   my $self = shift;
52 0           my $minion = $self->minion;
53 0 0 0       return Mojo::Collection->new(map { $minion->job($_) // () } @{($self->info || {})->{parents} || []});
  0   0        
  0            
54             }
55              
56             sub perform {
57 0     0 1   my $self = shift;
58 0           waitpid $self->start->pid, 0;
59 0 0         $self->_reap($? ? (1, $? >> 8, $? & 127) : ());
60             }
61              
62 0     0 1   sub pid { shift->{pid} }
63              
64 0     0 1   sub remove { $_[0]->minion->backend->remove_job($_[0]->id) }
65              
66             sub retry {
67 0     0 1   my $self = shift;
68 0           return $self->minion->backend->retry_job($self->id, $self->retries, @_);
69             }
70              
71 0     0 1   sub run { croak 'Method "run" not implemented by subclass' }
72              
73             sub start {
74 0     0 1   my $self = shift;
75              
76             # Parent
77 0 0         die "Can't fork: $!" unless defined(my $pid = fork);
78 0 0         return $self->emit(spawn => $pid) if $self->{pid} = $pid;
79              
80             # Reset event loop
81 0           Mojo::IOLoop->reset;
82 0           local $SIG{CHLD} = local $SIG{INT} = local $SIG{TERM} = local $SIG{QUIT} = 'DEFAULT';
83 0           local $SIG{USR1} = local $SIG{USR2} = 'IGNORE';
84 0           srand;
85              
86             # Child
87 0 0         if (defined(my $err = $self->execute)) { $self->fail($err) }
  0            
88 0           $self->emit('cleanup');
89 0           POSIX::_exit(0);
90             }
91              
92 0     0 1   sub stop { shift->kill('KILL') }
93              
94             sub _reap {
95 0     0     my ($self, $term, $exit, $sig) = @_;
96 0           $self->emit(reap => $self->{pid});
97 0 0         $term ? $self->fail("Job terminated unexpectedly (exit code: $exit, signal: $sig)") : $self->finish;
98             }
99              
100             1;
101              
102             =encoding utf8
103              
104             =head1 NAME
105              
106             Minion::Job - Minion job
107              
108             =head1 SYNOPSIS
109              
110             package MyApp::Task::Foo;
111             use Mojo::Base 'Minion::Job', -signatures;
112              
113             sub run ($self, @args) {
114              
115             # Magic here! :)
116             }
117              
118             =head1 DESCRIPTION
119              
120             L<Minion::Job> is a container for L<Minion> jobs.
121              
122             =head1 EVENTS
123              
124             L<Minion::Job> inherits all events from L<Mojo::EventEmitter> and can emit the following new ones.
125              
126             =head2 cleanup
127              
128             $job->on(cleanup => sub ($job) {
129             ...
130             });
131              
132             Emitted in the process performing this job right before the process will exit.
133              
134             $job->on(cleanup => sub ($job) {
135             $job->app->log->debug("Process $$ is about to exit");
136             });
137              
138             =head2 failed
139              
140             $job->on(failed => sub ($job, $err) {
141             ...
142             });
143              
144             Emitted in the worker process managing this job or the process performing it, after it has transitioned to the
145             C<failed> state.
146              
147             $job->on(failed => sub ($job, $err) {
148             say "Something went wrong: $err";
149             });
150              
151             =head2 finish
152              
153             $job->on(finish => sub ($job) {
154             ...
155             });
156              
157             Emitted in the process performing this job if the task was successful.
158              
159             $job->on(finish => sub ($job) {
160             my $id = $job->id;
161             my $task = $job->task;
162             $job->app->log->debug(qq{Job "$id" was performed with task "$task"});
163             });
164              
165             =head2 finished
166              
167             $job->on(finished => sub ($job, $result) {
168             ...
169             });
170              
171             Emitted in the worker process managing this job or the process performing it, after it has transitioned to the
172             C<finished> state.
173              
174             $job->on(finished => sub ($job, $result) {
175             my $id = $job->id;
176             say "Job $id is finished.";
177             });
178              
179             =head2 reap
180              
181             $job->on(reap => sub ($job, $pid) {
182             ...
183             });
184              
185             Emitted in the worker process managing this job, after the process performing it has exited.
186              
187             $job->on(reap => sub ($job, $pid) {
188             my $id = $job->id;
189             say "Job $id ran in process $pid";
190             });
191              
192             =head2 spawn
193              
194             $job->on(spawn => sub ($job, $pid) {
195             ...
196             });
197              
198             Emitted in the worker process managing this job, after a new process has been spawned for processing.
199              
200             $job->on(spawn => sub ($job, $pid) {
201             my $id = $job->id;
202             say "Job $id running in process $pid";
203             });
204              
205             =head2 start
206              
207             $job->on(start => sub ($job) {
208             ...
209             });
210              
211             Emitted in the process performing this job, after it has been spawned.
212              
213             $job->on(start => sub ($job) {
214             $0 = $job->id;
215             });
216              
217             =head1 ATTRIBUTES
218              
219             L<Minion::Job> implements the following attributes.
220              
221             =head2 args
222              
223             my $args = $job->args;
224             $job = $job->args([]);
225              
226             Arguments passed to task.
227              
228             =head2 id
229              
230             my $id = $job->id;
231             $job = $job->id($id);
232              
233             Job id.
234              
235             =head2 minion
236              
237             my $minion = $job->minion;
238             $job = $job->minion(Minion->new);
239              
240             L<Minion> object this job belongs to.
241              
242             =head2 retries
243              
244             my $retries = $job->retries;
245             $job = $job->retries(5);
246              
247             Number of times job has been retried.
248              
249             =head2 task
250              
251             my $task = $job->task;
252             $job = $job->task('foo');
253              
254             Task name.
255              
256             =head1 METHODS
257              
258             L<Minion::Job> inherits all methods from L<Mojo::EventEmitter> and implements the following new ones.
259              
260             =head2 app
261              
262             my $app = $job->app;
263              
264             Get application from L<Minion/"app">.
265              
266             # Longer version
267             my $app = $job->minion->app;
268              
269             =head2 execute
270              
271             my $err = $job->execute;
272              
273             Perform job in this process and return C<undef> if the task was successful or an exception otherwise. Note that this
274             method should only be used to implement custom workers.
275              
276             # Perform job in foreground
277             if (my $err = $job->execute) { $job->fail($err) }
278             else { $job->finish }
279              
280             =head2 fail
281              
282             my $bool = $job->fail;
283             my $bool = $job->fail('Something went wrong!');
284             my $bool = $job->fail({whatever => 'Something went wrong!'});
285              
286             Transition from C<active> to C<failed> state with or without a result, and if there are attempts remaining, transition
287             back to C<inactive> with a delay based on L<Minion/"backoff">.
288              
289             =head2 finish
290              
291             my $bool = $job->finish;
292             my $bool = $job->finish('All went well!');
293             my $bool = $job->finish({whatever => 'All went well!'});
294              
295             Transition from C<active> to C<finished> state with or without a result.
296              
297             =head2 info
298              
299             my $info = $job->info;
300              
301             Get job information.
302              
303             # Check job state
304             my $state = $job->info->{state};
305              
306             # Get job metadata
307             my $progress = $job->info->{notes}{progress};
308              
309             # Get job result
310             my $result = $job->info->{result};
311              
312             These fields are currently available:
313              
314             =over 2
315              
316             =item args
317              
318             args => ['foo', 'bar']
319              
320             Job arguments.
321              
322             =item attempts
323              
324             attempts => 25
325              
326             Number of times performing this job will be attempted.
327              
328             =item children
329              
330             children => ['10026', '10027', '10028']
331              
332             Jobs depending on this job.
333              
334             =item created
335              
336             created => 784111777
337              
338             Epoch time job was created.
339              
340             =item delayed
341              
342             delayed => 784111777
343              
344             Epoch time job was delayed to.
345              
346             =item expires
347              
348             expires => 784111777
349              
350             Epoch time job is valid until before it expires.
351              
352             =item finished
353              
354             finished => 784111777
355              
356             Epoch time job was finished.
357              
358             =item lax
359              
360             lax => 0
361              
362             Existing jobs this job depends on may also have failed to allow for it to be processed.
363              
364             =item notes
365              
366             notes => {foo => 'bar', baz => [1, 2, 3]}
367              
368             Hash reference with arbitrary metadata for this job.
369              
370             =item parents
371              
372             parents => ['10023', '10024', '10025']
373              
374             Jobs this job depends on.
375              
376             =item priority
377              
378             priority => 3
379              
380             Job priority.
381              
382             =item queue
383              
384             queue => 'important'
385              
386             Queue name.
387              
388             =item result
389              
390             result => 'All went well!'
391              
392             Job result.
393              
394             =item retried
395              
396             retried => 784111777
397              
398             Epoch time job has been retried.
399              
400             =item retries
401              
402             retries => 3
403              
404             Number of times job has been retried.
405              
406             =item started
407              
408             started => 784111777
409              
410             Epoch time job was started.
411              
412             =item state
413              
414             state => 'inactive'
415              
416             Current job state, usually C<active>, C<failed>, C<finished> or C<inactive>.
417              
418             =item task
419              
420             task => 'foo'
421              
422             Task name.
423              
424             =item time
425              
426             time => 784111777
427              
428             Server time.
429              
430             =item worker
431              
432             worker => '154'
433              
434             Id of worker that is processing the job.
435              
436             =back
437              
438             =head2 is_finished
439              
440             my $bool = $job->is_finished;
441              
442             Check if job performed with L</"start"> is finished. Note that this method should only be used to implement custom
443             workers.
444              
445             =head2 kill
446              
447             $job->kill('INT');
448              
449             Send a signal to job performed with L</"start">. Note that this method should only be used to implement custom workers.
450              
451             =head2 note
452              
453             my $bool = $job->note(mojo => 'rocks', minion => 'too');
454              
455             Change one or more metadata fields for this job. Setting a value to C<undef> will remove the field. The new values will
456             get serialized by L<Minion/"backend"> (often with L<Mojo::JSON>), so you shouldn't send objects and be careful with
457             binary data, nested data structures with hash and array references are fine though.
458              
459             # Share progress information
460             $job->note(progress => 95);
461              
462             # Share stats
463             $job->note(stats => {utime => '0.012628', stime => '0.002429'});
464              
465             =head2 parents
466              
467             my $parents = $job->parents;
468              
469             Return a L<Mojo::Collection> object containing all jobs this job depends on as L<Minion::Job> objects.
470              
471             # Check parent state
472             for my $parent ($job->parents->each) {
473             my $info = $parent->info;
474             say "$info->{id}: $info->{state}";
475             }
476              
477             =head2 perform
478              
479             $job->perform;
480              
481             Perform job in new process and wait for it to finish. Note that this method should only be used to implement custom
482             workers.
483              
484             =head2 pid
485              
486             my $pid = $job->pid;
487              
488             Process id of the process spawned by L</"start"> if available. Note that this method should only be used to implement
489             custom workers.
490              
491             =head2 remove
492              
493             my $bool = $job->remove;
494              
495             Remove C<failed>, C<finished> or C<inactive> job from queue.
496              
497             =head2 retry
498              
499             my $bool = $job->retry;
500             my $bool = $job->retry({delay => 10});
501              
502             Transition job back to C<inactive> state, already C<inactive> jobs may also be retried to change options.
503              
504             These options are currently available:
505              
506             =over 2
507              
508             =item attempts
509              
510             attempts => 25
511              
512             Number of times performing this job will be attempted.
513              
514             =item delay
515              
516             delay => 10
517              
518             Delay job for this many seconds (from now), defaults to C<0>.
519              
520             =item expire
521              
522             expire => 300
523              
524             Job is valid for this many seconds (from now) before it expires.
525              
526             =item lax
527              
528             lax => 1
529              
530             Existing jobs this job depends on may also have transitioned to the C<failed> state to allow for it to be processed,
531             defaults to C<false>.
532              
533             =item parents
534              
535             parents => [$id1, $id2, $id3]
536              
537             Jobs this job depends on.
538              
539             =item priority
540              
541             priority => 5
542              
543             Job priority.
544              
545             =item queue
546              
547             queue => 'important'
548              
549             Queue to put job in.
550              
551             =back
552              
553             =head2 run
554              
555             $job->run(@args);
556              
557             Task to perform by this job. Meant to be overloaded in a subclass to create a custom task class. Note that this method
558             is B<EXPERIMENTAL> and might change without warning!
559              
560             =head2 start
561              
562             $job = $job->start;
563              
564             Perform job in new process, but do not wait for it to finish. Note that this method should only be used to implement
565             custom workers.
566              
567             # Perform two jobs concurrently
568             $job1->start;
569             $job2->start;
570             my ($first, $second);
571             sleep 1
572             until $first ||= $job1->is_finished and $second ||= $job2->is_finished;
573              
574             =head2 stop
575              
576             $job->stop;
577              
578             Stop job performed with L</"start"> immediately. Note that this method should only be used to implement custom workers.
579              
580             =head1 SEE ALSO
581              
582             L<Minion>, L<Minion::Guide>, L<https://minion.pm>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
583              
584             =cut