File Coverage

lib/MediaCloud/JobManager/Job.pm
Criterion Covered Total %
statement 39 142 27.4
branch 0 36 0.0
condition 0 6 0.0
subroutine 13 26 50.0
pod 9 12 75.0
total 61 222 27.4


line stmt bran cond sub pod time code
1              
2             =head1 NAME
3            
4             C<MediaCloud::JobManager::Job> - An abstract class for a "function".
5            
6            
7             =head1 LINGO
8            
9             =over 4
10            
11             =item * function
12            
13             A function to be run either locally or remotely, e.g. C<add_default_feeds>.
14            
15             =item * job
16            
17             An instance of the function doing the actual job with specific parameters.
18            
19             =back
20            
21             =cut
22              
23             package MediaCloud::JobManager::Job;
24              
25 1     1   4 use strict;
  1         2  
  1         28  
26 1     1   4 use warnings;
  1         1  
  1         33  
27 1     1   3 use Modern::Perl "2012";
  1         1  
  1         7  
28 1     1   118 use feature qw(switch);
  1         1  
  1         26  
29              
30 1     1   3 use Moose::Role 2.1005;
  1         27  
  1         5  
31              
32 1     1   3743 use MediaCloud::JobManager; # helper subroutines
  1         2  
  1         37  
33 1     1   7 use MediaCloud::JobManager::Configuration;
  1         1  
  1         16  
34              
35 1     1   4 use Time::HiRes;
  1         1  
  1         9  
36 1     1   92 use Data::Dumper;
  1         3  
  1         68  
37 1     1   783 use DateTime;
  1         60884  
  1         35  
38 1     1   8 use Readonly;
  1         2  
  1         58  
39              
40             # used for capturing STDOUT and STDERR output of each job and timestamping it;
41             # initialized before each job
42 1     1   4 use Log::Log4perl qw(:easy);
  1         1  
  1         11  
# Configure Log4perl's "easy" mode: log everything from DEBUG upwards, encode
# output as UTF-8, and prefix every message with an ISO 8601 timestamp and
# the process ID.
Log::Log4perl->easy_init(
    {
        layout => "%d{ISO8601} [%P]: %m%n",
        level  => $DEBUG,
        utf8   => 1,
    }
);
50              
51             =head1 ABSTRACT INTERFACE
52            
53             The following subroutines must be implemented by the subclasses of this class.
54            
55             =head2 REQUIRED
56            
57             =head3 C<run($self, $args)>
58            
59             Run the job.
60            
61             Parameters:
62            
63             =over 4
64            
65             =item * C<$self>, a reference to the instance of the function class
66            
67             =item * (optional) C<$args> (hashref), arguments needed for running the
68             function
69            
70             =back
71            
72             An instance (object) of the class will be created before each run. Class
73             instance variables (e.g. C<$self-E<gt>_my_variable>) will be discarded after
74             each run.
75            
76             Returns result on success (serializable by the L<JSON> module). The result will
77             be discarded if the job is added as a background process.
78            
79             Provides progress reports when available:
80            
81             =over 4
82            
83             =item * by calling C<$self-E<gt>set_progress($numerator, $denominator)>
84            
85             =back
86            
87             C<die()>s on error.
88            
89             Writes log to C<STDOUT> or C<STDERR> (preferably the latter).
90            
91             =cut
92              
# Every class consuming this role has to provide its own run() implementation
requires qw( run );
94              
95             =head2 OPTIONAL
96            
97             =head3 (static) C<retries()>
98            
99             Return the number of retries for each job.
100            
101             Returns the number of times a failed job will be retried. For example, if the
102             number of retries is set to 3, the job will be attempted four times in total.
103            
104             Returns 0 if the job should not be retried (attempted only once).
105            
106             Default implementation of this subroutine returns 0 (no retries).
107            
108             =cut
109              
# Default retry policy: a failed job is not retried, i.e. it is attempted
# exactly once. Subclasses override this to allow retries.
sub retries()
{
    my $default_retry_count = 0;
    return $default_retry_count;
}
115              
116             =head3 (static) C<lazy_queue()>
117            
118             Return true if RabbitMQ should create a "lazy" queue for this function.
119            
120             Returns true if the job queue is expected to grow very large so RabbitMQ should
121             create a "lazy" queue (https://www.rabbitmq.com/lazy-queues.html) for this type
122             of job.
123            
124             Default implementation of this subroutine returns 0 ("default" type of queue).
125            
126             =cut
127              
# Default queue type: a regular (non-lazy) RabbitMQ queue. Subclasses whose
# queues are expected to grow very large override this to return true.
sub lazy_queue()
{
    my $wants_lazy_queue = 0;
    return $wants_lazy_queue;
}
132              
133             =head3 (static) C<publish_results()>
134            
135             Return true if worker should publish results back to a results RabbitMQ queue.
136            
137             Returns true if client that added job to the queue might be interested in the
138             results of the job (whether or not it has failed, what has run() returned) so
139             RabbitMQ should keep a result of the job and send it back to client when
140             requested.
141            
142             One might want to disable this if distinct results of many jobs aren't that
143             important and you'd like to make the job broker a little bit faster.
144            
145             This subroutine will only be used when calling add_to_queue().
146            
147             Default implementation of this subroutine returns 1 (results will be collected,
148             stored and sent back to clients if requested).
149            
150             =cut
151              
# Default result policy: job results are published back to the results queue
# so that clients can fetch them. Subclasses may opt out by returning false.
sub publish_results()
{
    my $keep_results = 1;
    return $keep_results;
}
156              
157             =head3 (static) C<configuration()>
158            
159             Return an instance of C<MediaCloud::JobManager::Configuration> (or of its subclass) to
160             be used as default configuration by both workers and clients.
161            
162             Workers and clients will still be able to override this configuration by
163             passing their own C<config> argument. This configuration will be used if no
164             such argument is present.
165            
166             Default implementation of this subroutine returns an instance of
167             C<MediaCloud::JobManager::Configuration> (default configuration).
168            
169             =cut
170              
# Default configuration: the shared instance of
# MediaCloud::JobManager::Configuration. Subclasses may return their own.
sub configuration()
{
    my $config_class = 'MediaCloud::JobManager::Configuration';
    return $config_class->instance();
}
175              
176             =head3 Priorities
177            
178             Jobs in a single queue can have different priorities ("low", "normal" or
179             "high") in order for them to be run in desirable order:
180            
181             =over 4
182            
183             =item * C<$MJM_JOB_PRIORITY_LOW>, if the job is considered of "low priority".
184            
185             =item * C<$MJM_JOB_PRIORITY_NORMAL> if the job is considered of "normal priority".
186            
187             =item * C<$MJM_JOB_PRIORITY_HIGH> if the job is considered of "high priority".
188            
189             =back
190            
191             C<run_remotely()> and C<add_to_queue()> both accept the job priority argument.
192            
193             By default, jobs are run with a "normal" priority.
194            
195             =cut
196              
# Job priorities
Readonly our $MJM_JOB_PRIORITY_LOW    => 'low';
Readonly our $MJM_JOB_PRIORITY_NORMAL => 'normal';
Readonly our $MJM_JOB_PRIORITY_HIGH   => 'high';

# Subroutines kept for backwards compatibility with callers that use the
# function form, e.g. MJM_JOB_PRIORITY_LOW(), instead of the variables
sub MJM_JOB_PRIORITY_LOW    { return $MJM_JOB_PRIORITY_LOW }
sub MJM_JOB_PRIORITY_NORMAL { return $MJM_JOB_PRIORITY_NORMAL }
sub MJM_JOB_PRIORITY_HIGH   { return $MJM_JOB_PRIORITY_HIGH }

# Set of accepted priority names (name => 1), used for validation
Readonly my %valid_priorities => map { ( $_ => 1 ) } (
    $MJM_JOB_PRIORITY_LOW,
    $MJM_JOB_PRIORITY_NORMAL,
    $MJM_JOB_PRIORITY_HIGH,
);
212              
# Return true if $priority is one of the recognized job priority names
# ("low", "normal" or "high"), false otherwise.
#
# An undefined priority is reported as invalid instead of being used as a
# hash key (which would emit an "uninitialized value" warning).
sub _priority_is_valid($)
{
    my $priority = shift;

    return 0 unless defined $priority;

    return exists $valid_priorities{ $priority };
}
218              
219             =head1 HELPER SUBROUTINES
220            
221             The following subroutines can be used by the deriving class.
222            
223             =head2 C<$self-E<gt>set_progress($numerator, $denominator)>
224            
225             Provide progress report while running the task (from C<run()>).
226            
227             Examples:
228            
229             =over 4
230            
231             =item * C<$self-E<gt>set_progress(3, 10)>
232            
233             3 out of 10 subtasks are complete.
234            
235             =item * C<$self-E<gt>set_progress(45, 100)>
236            
237             45 out of 100 subtasks are complete (or 45% complete).
238            
239             =back
240            
241             =cut
242              
# Report "$numerator out of $denominator" progress of the currently running job
# to the broker and write it to the job's log.
#
# Does nothing (apart from a debug message) when there is no job handle, i.e.
# when the job is being run locally. LOGDIE()s on a zero / false denominator.
sub set_progress($$$)
{
    my ( $self, $numerator, $denominator ) = @_;

    # No job handle means the job is running locally and is blocking the
    # caller, so there is nobody to report progress to
    my $job_handle = $self->_job;
    if ( !defined $job_handle )
    {
        DEBUG( "Job is undef" );
        return;
    }

    LOGDIE( "Denominator is 0." ) unless $denominator;

    my $config = $self->name()->configuration();
    $config->{ broker }->set_job_progress( $job_handle, $numerator, $denominator );

    # Written to job's log
    INFO( "$numerator/$denominator complete." );
}
266              
267             =head1 CLIENT SUBROUTINES
268            
269             The following subroutines can be used by clients to run a function.
270            
271             =head2 (static) C<$class-E<gt>run_locally([$args, $job])>
272            
273             Run locally and right away, blocking the parent process until the job is
274             finished.
275            
276             Parameters:
277            
278             =over 4
279            
280             =item * (optional) C<$args> (hashref), arguments required for running the
281             function (serializable by the L<JSON> module)
282            
283             =item * (optional, internal) C<$job>, job handle to be later used by C<set_progress()>
284            
285             =back
286            
287             Returns result (may be false or C<undef>) on success, C<die()>s on error.
288            
289             =cut
290              
# Run the job in the current process, blocking until it finishes, and retry it
# up to $class->retries() times if it dies.
#
# Parameters:
#   $class - function class; must be called statically (MyFunction->run_locally)
#   $args  - (optional) hashref of arguments passed to run()
#   $job   - (optional, internal) broker job handle; when set, it is used to
#            derive the job ID and is made available to set_progress()
#
# Returns whatever run() returned on the successful attempt. LOGDIE()s when
# called on an instance, when the job ID can't be determined, or when every
# attempt failed.
sub run_locally($;$$)
{
    my ( $class, $args, $job ) = @_;

    if ( ref $class )
    {
        LOGDIE( "Use this subroutine as a static method, e.g. MyFunction->run_locally()" );
    }

    my $function_name = $class->name();
    my $config        = $function_name->configuration();

    my $mjm_job_id;
    if ( $job )
    {
        # Include the broker's own job ID when the job was issued by a worker
        my $job_id = $config->{ broker }->job_id_from_handle( $job );
        $mjm_job_id = MediaCloud::JobManager::_unique_path_job_id( $function_name, $args, $job_id );
    }
    else
    {
        $mjm_job_id = MediaCloud::JobManager::_unique_path_job_id( $function_name, $args );
    }
    unless ( $mjm_job_id )
    {
        LOGDIE( "Unable to determine unique MediaCloud::JobManager job ID" );
    }

    my $result;
    my $job_ran = eval {

        my $d = Data::Dumper->new( [ $args ], [ 'args' ] );
        $d->Indent( 0 );
        $d->Sortkeys( 1 );

        my $str_arguments = $d->Dump;

        INFO( "Starting job ID \"$mjm_job_id\"..." );
        INFO( "========" );
        INFO( "Arguments: $str_arguments" );
        INFO( "========" );
        INFO( "" );

        my $start = Time::HiRes::gettimeofday();

        # Ask for the retry count once instead of on every iteration
        my $retries = $class->retries();

        my $job_succeeded = 0;
        my $last_error    = 'no attempts were made';
        for my $retry ( 0 .. $retries )
        {
            if ( $retry > 0 )
            {
                INFO( "" );
                INFO( "========" );
                INFO( "Retrying ($retry)..." );
                INFO( "========" );
                INFO( "" );
            }

            my $attempt_succeeded = eval {

                # Try to run the job
                my $instance = $class->new();

                # _job is undef when running locally, a job handle when issued from worker
                $instance->_job( $job );

                # Do the work
                $result = $instance->run( $args );

                # Unset the _job for the sake of cleanliness
                $instance->_job( undef );

                # Destroy instance
                $instance = undef;

                1;
            };

            if ( $attempt_succeeded )
            {
                $job_succeeded = 1;
                last;
            }

            # Copy the error right away: $@ is a global that any later eval
            # (e.g. one run inside the logger) may reset before the final
            # "failed" message below gets to use it
            $last_error = $@ || 'Unknown error';
            ERROR( "Job \"$mjm_job_id\" failed: $last_error" );
        }

        unless ( $job_succeeded )
        {
            ERROR( "" );
            ERROR( "========" );
            LOGDIE( "Job \"$mjm_job_id\" failed" .
                  ( $retries ? " after " . $retries . " retries" : "" ) . ": $last_error" );
        }

        my $end = Time::HiRes::gettimeofday();

        INFO( "" );
        INFO( "========" );
        INFO( "Finished job ID \"$mjm_job_id\" in " . sprintf( "%.2f", $end - $start ) . " seconds" );

        1;
    };

    unless ( $job_ran )
    {
        my $error = $@ || 'Unknown error';
        LOGDIE( "Job died: $error" );
    }

    return $result;
}
403              
404             =head2 (static) C<$class-E<gt>run_remotely([$args, $priority])>
405            
406             Run remotely, wait for the task to complete, return the result; block the
407             process until the job is complete.
408            
409             Parameters:
410            
411             =over 4
412            
413             =item * (optional) C<$args> (hashref), arguments needed for running the
414             function (serializable by the L<JSON> module)
415            
416             =back
417            
418             Returns result (may be false or C<undef>) on success, C<die()>s on error.
419            
420             =cut
421              
# Submit the job to the broker and block until its result comes back.
#
# $args is an optional hashref of job arguments; $priority is one of the
# $MJM_JOB_PRIORITY_* values and defaults to "normal". LOGDIE()s when called on
# an instance, when the function name is unknown or when the priority is not
# valid.
sub run_remotely($;$$)
{
    my ( $class, $args, $priority ) = @_;

    # Only the class name may be used as the invocant
    LOGDIE( "Use this subroutine as a static method, e.g. MyFunction->run_remotely()" ) if ref $class;

    my $function_name = $class->name;
    LOGDIE( "Unable to determine function name." ) unless $function_name;

    my $config = $function_name->configuration();

    # Jobs without an explicit priority get the "normal" one
    $priority = $MJM_JOB_PRIORITY_NORMAL unless defined $priority;
    LOGDIE( "Job priority '$priority' is not valid." ) unless _priority_is_valid( $priority );

    return $config->{ broker }->run_job_sync( $function_name, $args, $priority );
}
447              
448             =head2 (static) C<$class-E<gt>add_to_queue([$args, $priority])>
449            
450             Add to queue remotely, do not wait for the task to complete, return
451             immediately; do not block the parent process until the job is complete.
452            
453             Parameters:
454            
455             =over 4
456            
457             =item * (optional) C<$args> (hashref), arguments needed for running the
458             function (serializable by the L<JSON> module)
459            
460             =back
461            
462             Returns job ID if the job was added to queue successfully, C<die()>s on error.
463            
464             =cut
465              
# Enqueue the job with the broker and return immediately, without waiting for
# the job to run.
#
# $args is an optional hashref of job arguments; $priority is one of the
# $MJM_JOB_PRIORITY_* values and defaults to "normal". Returns whatever the
# broker's run_job_async() returns. LOGDIE()s when called on an instance, when
# the function name is unknown or when the priority is not valid.
sub add_to_queue($;$$)
{
    my ( $class, $args, $priority ) = @_;

    LOGDIE( "Use this subroutine as a static method, e.g. MyFunction->add_to_queue()" ) if ref $class;

    my $function_name = $class->name;
    LOGDIE( "Unable to determine function name." ) unless $function_name;

    my $config = $function_name->configuration();

    # Fall back to the "normal" priority when the caller did not pick one
    $priority = $MJM_JOB_PRIORITY_NORMAL unless defined $priority;
    LOGDIE( "Job priority '$priority' is not valid." ) unless _priority_is_valid( $priority );

    return $config->{ broker }->run_job_async( $function_name, $args, $priority );
}
491              
492             =head2 (static) C<name()>
493            
494             Returns function's name (e.g. C<NinetyNineBottlesOfBeer>).
495            
496             Usage:
497            
498             NinetyNineBottlesOfBeer->name();
499            
500             Parameters:
501            
502             =over 4
503            
504             =item * Class or class instance
505            
506             =back
507            
508             =cut
509              
# Return the function's name, i.e. the class name of the invocant.
#
# Accepts either a class name (static call, e.g. NinetyNineBottlesOfBeer->name())
# or an instance of a function class. LOGDIE()s when the name can't be
# determined: an undefined or empty invocant, or the abstract
# MediaCloud::JobManager::Job role itself (the legacy short name 'Job' is still
# rejected too, for backwards compatibility).
sub name($)
{
    my $self_or_class = shift;

    # Instance -> its class name; static call -> the argument is the class name
    my $function_name = ref( $self_or_class ) || $self_or_class;

    # The role's full package name is 'MediaCloud::JobManager::Job', so the
    # former check against plain 'Job' alone never matched the role itself
    if (   !defined $function_name
        || $function_name eq ''
        || $function_name eq 'Job'
        || $function_name eq 'MediaCloud::JobManager::Job' )
    {
        LOGDIE( "Unable to determine function name." );
    }

    return $function_name;
}
533              
# Worker will pass this parameter to run_locally() which, in turn, will
# temporarily place job handle to this variable so that set_progress() helper
# can later use it
has '_job' => ( is => 'rw' );

# The sugar ("has", "requires", ...) in this package was imported from
# Moose::Role, not Moose, so the matching unimport is needed to remove it;
# "no Moose" would leave it in place. Code compiled above keeps working.
no Moose::Role;

1;
542