File Coverage

blib/lib/Helios/Job.pm
Criterion Covered Total %
statement 16 18 88.8
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 22 24 91.6


line stmt bran cond sub pod time code
1             package Helios::Job;
2            
3 1     1   14 use 5.008000;
  1         2  
  1         30  
4 1     1   4 use strict;
  1         1  
  1         22  
5 1     1   3 use warnings;
  1         2  
  1         22  
6            
7 1     1   1415 use DBI;
  1         13521  
  1         85  
8 1     1   9 use Error qw(:try);
  1         1  
  1         8  
9             # [LH] [2013-10-18] Replaced Helios::TheSchwartz with Helios::TS
10 1     1   621 use Helios::TS;
  0            
  0            
11             use Helios::TS::Job;
12             use Helios::ObjectDriver::DBI;
13            
14             require XML::Simple;
15            
16             use Helios::Error;
17             use Helios::JobHistory;
18            
19             our $VERSION = '2.81';
20            
21             our $D_OD_RETRIES = 3;
22             our $D_OD_RETRY_INTERVAL = 5;
23            
24             # 2011-12-15: Removed setting $XML::Simple::PREFERRED_PARSER.
25             # 2012-01-01: Changed failed() and failedNoRetry() methods to truncate error
26             # string at 256 chars. That's the max length of the matching field in the
27             # ERROR table. Updated copyright info.
28             # 2012-03-27: Documented accessor methods and greatly expanded and updated
29             # JOB SUBMISSION documentation.
30             # 2012-04-01: Added setDriver() and initDriver() methods. Refactored old
31             # getDriver() into initDriver(), and changed getDriver() to call initDriver().
32             # 2012-04-25: Added deferred() method.
33             # [LH] 2012-07-11: submit(): changed to use Helios::TheSchwartz instead of
34             # base TheSchwartz to implement database connection caching.
35             # [LH] [2013-09-07] new(): changed to check to see if TheSchwartz::Job object
36             # to new() has an array in arg(), and throw an exception if it doesn't.
37             # (It always should, but [RT79690] is preventing that in a tiny number of cases.)
38             # [LH] [2013-10-18] Replaced calls to Helios::TheSchwartz and TheSchwartz::Job
39             # with Helios::TS and Helios::TS::Job.
40             # [LH] [2013-10-28] Added set/getArgString(), set/getJobType(),
41             # set/getJobtypeid() methods; set/getArgXML(), set/getFuncname(),
42             # set/getFuncid() will be deprecated in Helios 3.x. Changed POD to document
43             # the new functions.
44             # [LH] [2014-08-10] Added get/setPriority() methods.
45            
46            
47             =head1 NAME
48            
49             Helios::Job - base class for jobs in the Helios job processing system
50            
51             =head1 DESCRIPTION
52            
53             Helios::Job is the standard representation of jobs in the Helios framework. It handles tasks
54             related to the underlying TheSchwartz::Job objects, and provides its own methods for manipulating
55             jobs in the Helios system.
56            
57             =head1 ACCESSOR METHODS
58            
59             These accessors allow access to information about an instantiated Helios::Job
60             object:
61            
62             debug() whether Debug Mode is enabled or not
63             get/setConfig() Helios configuration passed by the system to the job object
64             get/setArgs() hashref of the job's arguments (interpreted from the arg string)
65             get/setArgString() the raw XML of the job arguments
66            
67             Several accessors are pass-through accessors to access values in the
68             underlying TheSchwartz::Job object
69            
70             get/setJobid() jobid of the job in the job queue
71             get/setFailures() number of previous failures of the job before current run
72             get/setJobtypeid() jobtypeid value of the job
73             get/setJobType() jobtype name of the job
74             get/setUniqkey() uniqkey value of the job (see TheSchwartz documentation)
75             get/setRunAfter() current run_after value of the job
76             get/setGrabbedUntil() current grabbed_until value of the job
77             get/setCoalesce() coalesce value of the job (see TheSchwartz documentation)
78            
79             When running a job, your service class need not access any of these values
80             directly, though the information is available if you need it (for example,
81             to log how many failures your job has encountered before the current run).
82             When submitting a job, several of the set* accessors are needed to set up the
83             job before submission; see the section on the submit() method for more
84             information.
85            
86             =cut
87            
# setConfig($config) / getConfig()
# Store and fetch the Helios configuration kept in this object's
# 'config' slot.  The setter returns the value it stored.
sub setConfig {
    my ($self, $config) = @_;
    return $self->{config} = $config;
}

sub getConfig {
    my ($self) = @_;
    return $self->{config};
}

# setArgs($args) / getArgs()
# Store and fetch the parsed job arguments kept in this object's
# 'args' slot.  The setter returns the value it stored.
sub setArgs {
    my ($self, $args) = @_;
    return $self->{args} = $args;
}

sub getArgs {
    my ($self) = @_;
    return $self->{args};
}
93            
# setJobid($jobid) / getJobid()
# Pass-through accessors for the jobid of the underlying job object
# (the one returned by job()).
#
# setJobid() used to assign its argument to $self->{args}, which
# overwrote the parsed job arguments and left the jobid reported by
# getJobid() unchanged.  It now forwards to the job object's jobid()
# accessor, matching the other pass-through setters in this class.
sub setJobid {
    my ($self, $jobid) = @_;
    return $self->job()->jobid($jobid);
}

sub getJobid {
    my ($self) = @_;
    return $self->job()->jobid;
}
96            
# The accessors below are pass-throughs: each one forwards to the
# same-purpose accessor on the underlying job object returned by job().
# Setters hand their argument to the job accessor and return whatever
# that accessor returns.

# funcid
sub setFuncid {
    my ($self, $funcid) = @_;
    return $self->job()->funcid($funcid);
}

sub getFuncid {
    my ($self) = @_;
    return $self->job()->funcid;
}

# failures
sub setFailures {
    my ($self, $failures) = @_;
    return $self->job()->failures($failures);
}

sub getFailures {
    my ($self) = @_;
    return $self->job()->failures;
}

# funcname
sub setFuncname {
    my ($self, $funcname) = @_;
    return $self->job()->funcname($funcname);
}

sub getFuncname {
    my ($self) = @_;
    return $self->job()->funcname;
}

# uniqkey
sub setUniqkey {
    my ($self, $uniqkey) = @_;
    return $self->job()->uniqkey($uniqkey);
}

sub getUniqkey {
    my ($self) = @_;
    return $self->job()->uniqkey;
}

# run_after
sub setRunAfter {
    my ($self, $run_after) = @_;
    return $self->job()->run_after($run_after);
}

sub getRunAfter {
    my ($self) = @_;
    return $self->job()->run_after;
}

# grabbed_until
sub setGrabbedUntil {
    my ($self, $grabbed_until) = @_;
    return $self->job()->grabbed_until($grabbed_until);
}

sub getGrabbedUntil {
    my ($self) = @_;
    return $self->job()->grabbed_until;
}

# coalesce
sub setCoalesce {
    my ($self, $coalesce) = @_;
    return $self->job()->coalesce($coalesce);
}

sub getCoalesce {
    my ($self) = @_;
    return $self->job()->coalesce;
}
117            
118             # BEGIN CODE Copyright (C) 2012 by Andrew Johnson.
# setDriver($driver): remember the given driver object in the 'driver' slot.
sub setDriver {
    my ($self, $driver) = @_;
    return $self->{driver} = $driver;
}

# getDriver(): return the remembered driver; when none is defined yet,
# return whatever initDriver() produces instead.
sub getDriver {
    my ($self) = @_;
    return $self->{driver} if defined $self->{driver};
    return $self->initDriver();
}
127             # END CODE Copyright (C) 2012 by Andrew Johnson.
128            
# debug([$value]): combined getter/setter for the 'debug' slot.
# With an argument, stores it and returns it; without, returns the
# current value.
sub debug {
    my ($self, @rest) = @_;
    return $self->{debug} unless @rest;
    return $self->{debug} = $rest[0];
}
130            
131             # these are for direct access to the underlying TheSchwartz::Job object
# job([$job]): combined getter/setter for the 'job' slot, which holds the
# underlying job object.  With an argument, stores it and returns it;
# without, returns the current value.
sub job {
    my ($self, @rest) = @_;
    return $self->{job} unless @rest;
    return $self->{job} = $rest[0];
}
133            
# setArgXML($xml) / getArgXML()
# Store and fetch the raw argument string kept in the 'argxml' slot.
sub setArgXML {
    my ($self, $xml) = @_;
    return $self->{argxml} = $xml;
}

sub getArgXML {
    my ($self) = @_;
    return $self->{argxml};
}
136            
137             # BEGIN CODE Copyright (C) 2013 by Logical Helion, LLC.
# Newer names for older accessors.  Each sub hands its whole argument
# list to the older function of this package and returns its result.

# ArgString -> ArgXML
sub setArgString {
    my @call_args = @_;
    return setArgXML(@call_args);
}

sub getArgString {
    my @call_args = @_;
    return getArgXML(@call_args);
}

# JobType -> Funcname
sub setJobType {
    my @call_args = @_;
    return setFuncname(@call_args);
}

sub getJobType {
    my @call_args = @_;
    return getFuncname(@call_args);
}

# Jobtypeid -> Funcid
sub setJobtypeid {
    my @call_args = @_;
    return setFuncid(@call_args);
}

sub getJobtypeid {
    my @call_args = @_;
    return getFuncid(@call_args);
}
146             # END CODE Copyright (C) 2013 by Logical Helion, LLC.
147            
148             # BEGIN CODE Copyright (C) 2014 by Logical Helion, LLC.
# setPriority($priority): hand the value to the underlying job object's
# priority() accessor and return that accessor's result.
sub setPriority {
    my ($self, $priority) = @_;
    return $self->job()->priority($priority);
}

# getPriority(): return the underlying job object's priority().
sub getPriority {
    my ($self) = @_;
    return $self->job()->priority();
}
158             # END CODE Copyright (C) 2014 by Logical Helion, LLC.
159            
160             =head1 METHODS
161            
162             =head2 new($job)
163            
164             =cut
165            
# new($job) / new(%params)
# Build a Helios::Job.  When the first argument is a Helios::TS::Job
# object it is wrapped as-is and its first arg() element becomes the raw
# argument string; otherwise all arguments are passed to
# Helios::TS::Job->new() and the resulting object is wrapped.
# Throws Helios::Error::DatabaseError when a supplied job object's arg()
# is not an array reference.
sub new {
    my ($invocant, @params) = @_;
    my $class = ref($invocant) || $invocant;
    my $self  = bless {}, $class;

    my $candidate = $params[0];
    my $wraps_existing_job =
           defined($candidate)
        && ref($candidate)
        && $candidate->isa('Helios::TS::Job');

    # [LH] [2013-10-18] Replaced Helios::TheSchwartz with Helios::TS
    if ( !$wraps_existing_job ) {
        my $schwartz_job = Helios::TS::Job->new(@params);
        $self->job($schwartz_job);
        return $self;
    }

    $self->job($candidate);

    # BEGIN CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.
    # [LH] [2013-09-07] A job object handed to new() should always carry
    # an array in arg(); [RT79690] prevents that in a tiny number of
    # cases, so refuse such a job instead of carrying on without arguments.
    if ( ref($candidate->arg()) ne 'ARRAY' ) {
        Helios::Error::DatabaseError->throw("Received job without actual job arguments, probably due to transient database problem [RT79690].");
    }
    my $arg_str = $candidate->arg()->[0];
    $self->setArgXML($arg_str);
    # END CODE COPYRIGHT (C) 2013 LOGICAL HELION, LLC.

    return $self;
}
196            
197            
198             =head1 ARGUMENT PROCESSING METHODS
199            
200             =head2 parseArgXML($xml)
201            
202             Given a string of XML, parse it into a mixed hash/arrayref structure. This uses XML::Simple.
203            
204             =cut
205            
# parseArgXML($xml)
# Parse a string of job-argument XML with XML::Simple (KeepRoot on,
# 'job' elements always forced into arrays) and return the resulting
# data structure.
#
# Throws Helios::Error::InvalidArg when XMLin() fails.  The exception now
# carries the text of the caught parse error; it used to carry $!, the
# OS-level errno string, which has nothing to do with why the XML was
# rejected and hid the real cause.
sub parseArgXML {
    my ($self, $xml) = @_;
    my $xs = XML::Simple->new(SuppressEmpty => undef, KeepRoot => 1, ForceArray => ['job']);
    my $args;
    try {
        $args = $xs->XMLin($xml);
    } otherwise {
        # the caught exception object is the handler's first argument
        my $e = shift;
        Helios::Error::InvalidArg->throw($e->text);
    };
    return $args;
}
218            
219            
220            
221             =head2 parseArgs()
222            
223             Call parseArgs() to pick the Helios job arguments (the first element of the job->arg() array)
224             from the Schwartz job object, parse the XML into a Perl data structure (via XML::Simple) and
225             return the structure to the calling routine.
226            
227             This is really a convenience method created because
228            
229             $args = $self->parseArgXML( $job->arg()->[0] );
230            
231             looks nastier than it really needs to be.
232            
233             =cut
234            
# parseArgs()
# Take the first element of the underlying job's arg() array, parse it
# with parseArgXML(), pick the argument section out of the parsed
# structure, remember it via setArgs(), and return it.
sub parseArgs {
    my ($self) = @_;
    my $raw_xml = $self->job()->arg()->[0];
    my $tree    = $self->parseArgXML($raw_xml);

    my $args;
    if ( defined $tree->{metajob} ) {
        # metajob: always written with the full XML syntax; flag it
        $args = $tree->{metajob};
        $args->{metajob} = 1;
    }
    elsif ( defined $tree->{job} ) {
        # ordinary job with the full XML syntax; 'job' is parsed as an
        # array (needed for metajobs), so the params sit under element 0
        $args = $tree->{job}->[0]->{params};
    }
    else {
        # old-style argument string without the enclosing <job> section
        $args = $tree->{params};
    }

    $self->setArgs($args);
    return $args;
}
258            
259            
260             =head2 isaMetaJob()
261            
262             Returns a true value if the job is a metajob and a false value otherwise.
263            
264             =cut
265            
# isaMetaJob()
# Returns 1 when the job's arguments carry a 'metajob' entry equal to 1,
# and 0 otherwise.  Uses the arguments from getArgs() when that returns a
# true value, and the result of parseArgs() when it does not.
sub isaMetaJob {
    my ($self) = @_;
    my $args = $self->getArgs();
    $args = $self->parseArgs() unless $args;
    return ( defined($args->{metajob}) && $args->{metajob} == 1 ) ? 1 : 0;
}
272            
273            
274             =head1 JOB SUCCESS/FAILURE METHODS
275            
276             Use these methods to mark jobs as either successful or failed.
277            
278             Helios follows the *nix concept of exitstatus: 0 is successful, nonzero is failure. If you don't
279             specify an exitstatus when you call failed() or failedNoRetry(), 1 will be recorded as the
280             exitstatus.
281            
282             The completed(), failed(), and failedNoRetry() methods actually return the exitstatus of the job,
283             so completed() always returns 0 and the failed methods return the exitstatus you specified (or 1
284             if you didn't specify one). This is to facilitate ending of service class run() methods; the
285             caller of a run() method will cause the worker process to exit if a nonzero value is returned. If
286             you make sure your completed() or failed()/failedNoRetry() call is the last thing you do in your
287             run() method, everything should work fine.
288            
289             =head2 completed()
290            
291             Marks the job as completed successfully.
292            
293             Successful jobs are marked with exitstatus of zero in Helios job history.
294            
295             =cut
296            
# completed()
# Records a job-history entry with exitstatus 0 for the underlying job,
# then marks the job completed in the queue.  Always returns 0.
#
# The history insert is attempted once and, on failure, retried up to
# $D_OD_RETRIES more times with $D_OD_RETRY_INTERVAL seconds between
# attempts.  When every attempt fails, Helios::Error::DatabaseError is
# thrown with the text of the last error.
#
# The earlier implementation used "next RETRY" on a bare block.  "next"
# leaves a bare block rather than restarting it, so the insert was never
# retried and a failed insert was silently dropped.  It also left the
# exception handler sub through loop control.  A real loop fixes both.
sub completed {
    my ($self) = @_;
    my $job = $self->job();

    my $retries = 0;
    ATTEMPT: while (1) {
        my $db_error;
        try {
            my $driver = $self->getDriver();
            my $jobhistory = Helios::JobHistory->new(
                jobid         => $job->jobid,
                funcid        => $job->funcid,
                arg           => $job->arg()->[0],
                uniqkey       => $job->uniqkey,
                insert_time   => $job->insert_time,
                run_after     => $job->run_after,
                grabbed_until => $job->grabbed_until,
                priority      => $job->priority,
                coalesce      => $job->coalesce,
                complete_time => time(),
                exitstatus    => 0
            );
            $driver->insert($jobhistory);
        } otherwise {
            # only remember the error here; loop control happens outside
            # the handler
            $db_error = shift;
        };
        last ATTEMPT unless defined $db_error;

        if ( $retries >= $D_OD_RETRIES ) {
            Helios::Error::DatabaseError->throw($db_error->text);
        }
        $retries++;
        sleep $D_OD_RETRY_INTERVAL;
    }

    $job->completed();
    return 0;
}
333            
334            
335             =head2 failed([$error][, $exitstatus])
336            
337             Marks the job as failed. Allows job to be retried if the job's service class supports it.
338             Returns the exitstatus recorded for the job (if it wasn't given, it defaults to 1).
339            
340             =cut
341            
# failed([$error][, $exitstatus])
# Records a job-history entry with a nonzero exitstatus for the
# underlying job, then marks the job failed in the queue via the job
# object's failed() method.  Returns the exitstatus that was recorded.
#
#   $error      - optional error message; an undefined message is passed
#                 on as an empty string, and the message is truncated to
#                 254 characters before being handed to the job object
#   $exitstatus - optional; undefined or 0 is replaced by 1
#
# The history insert is attempted once and, on failure, retried up to 3
# more times with 10 seconds between attempts.  When every attempt
# fails, the job is still marked failed and Helios::Error::DatabaseError
# is thrown with the text of the last error.
#
# The earlier implementation used "next RETRY" on a bare block, which
# leaves the block instead of restarting it, so the insert was never
# retried.  It also ran substr() on an undefined $error and passed an
# untruncated message on the give-up path.
sub failed {
    my ($self, $error, $exitstatus) = @_;
    my $job = $self->job();

    # this job failed; that means a nonzero exitstatus
    if ( !defined($exitstatus) || $exitstatus == 0 ) {
        $exitstatus = 1;
    }

    my $message = defined($error) ? substr($error, 0, 254) : '';

    my $retries        = 0;
    my $retry_limit    = 3;
    my $retry_interval = 10;
    ATTEMPT: while (1) {
        my $db_error;
        try {
            my $driver = $self->getDriver();
            my $jobhistory = Helios::JobHistory->new(
                jobid         => $job->jobid,
                funcid        => $job->funcid,
                arg           => $job->arg()->[0],
                uniqkey       => $job->uniqkey,
                insert_time   => $job->insert_time,
                run_after     => $job->run_after,
                grabbed_until => $job->grabbed_until,
                priority      => $job->priority,
                coalesce      => $job->coalesce,
                complete_time => time(),
                exitstatus    => $exitstatus
            );
            $driver->insert($jobhistory);
        } otherwise {
            # only remember the error here; loop control happens outside
            # the handler
            $db_error = shift;
        };
        last ATTEMPT unless defined $db_error;

        if ( $retries >= $retry_limit ) {
            # history could not be written; still fail the job in the queue
            $job->failed($message, $exitstatus);
            Helios::Error::DatabaseError->throw($db_error->text);
        }
        $retries++;
        sleep $retry_interval;
    }

    $job->failed($message, $exitstatus);
    return $exitstatus;
}
388            
389            
390             =head2 failedNoRetry([$error][, $exitstatus])
391            
392             Marks the job as permanently failed (no more retries allowed).
393            
394             If not specified, exitstatus defaults to 1.
395            
396             =cut
397            
# failedNoRetry([$error][, $exitstatus])
# Records a job-history entry with a nonzero exitstatus for the
# underlying job, then marks the job permanently failed in the queue via
# the job object's permanent_failure() method.  Returns the exitstatus
# that was recorded.
#
#   $error      - optional error message; an undefined message is passed
#                 on as an empty string, and the message is truncated to
#                 254 characters before being handed to the job object
#   $exitstatus - optional; undefined or 0 is replaced by 1
#
# The history insert is attempted once and, on failure, retried up to 3
# more times with 10 seconds between attempts.  When every attempt
# fails, the job is still marked permanently failed and
# Helios::Error::DatabaseError is thrown with the text of the last error.
#
# The earlier implementation used "next RETRY" on a bare block, which
# leaves the block instead of restarting it, so the insert was never
# retried.  It also ran substr() on an undefined $error and passed an
# untruncated message on the give-up path.
sub failedNoRetry {
    my ($self, $error, $exitstatus) = @_;
    my $job = $self->job();

    # this job failed; that means a nonzero exitstatus
    if ( !defined($exitstatus) || $exitstatus == 0 ) {
        $exitstatus = 1;
    }

    my $message = defined($error) ? substr($error, 0, 254) : '';

    my $retries        = 0;
    my $retry_limit    = 3;
    my $retry_interval = 10;
    ATTEMPT: while (1) {
        my $db_error;
        try {
            my $driver = $self->getDriver();
            my $jobhistory = Helios::JobHistory->new(
                jobid         => $job->jobid,
                funcid        => $job->funcid,
                arg           => $job->arg()->[0],
                uniqkey       => $job->uniqkey,
                insert_time   => $job->insert_time,
                run_after     => $job->run_after,
                grabbed_until => $job->grabbed_until,
                priority      => $job->priority,
                coalesce      => $job->coalesce,
                complete_time => time(),
                exitstatus    => $exitstatus
            );
            $driver->insert($jobhistory);
        } otherwise {
            # only remember the error here; loop control happens outside
            # the handler
            $db_error = shift;
        };
        last ATTEMPT unless defined $db_error;

        if ( $retries >= $retry_limit ) {
            # history could not be written; still fail the job permanently
            $job->permanent_failure($message, $exitstatus);
            Helios::Error::DatabaseError->throw($db_error->text);
        }
        $retries++;
        sleep $retry_interval;
    }

    $job->permanent_failure($message, $exitstatus);
    return $exitstatus;
}
445            
446             =head2 deferred()
447            
448             Defers processing of a job even though it was available for processing in the
449             queue. The job will be seen as available for processing again when the
450             grabbed_until time has expired (the default is 60 minutes). If your service
451             employs the job retry API, a declined job run does not count against the job's
452             retry count.
453            
454             Unlike the completed() and failed*() methods above, deferred() is actually
455             only a wrapper around TheSchwartz 1.10's TheSchwartz::Job->declined() method
456             for now. No job history is recorded in the HELIOS_JOB_HISTORY_TB in the
457             collective database. This may change in the future.
458            
459             =cut
460            
# deferred()
# Calls declined() on the underlying job object and returns 0.  No job
# history entry is written here.
sub deferred {
    my ($self) = @_;
    $self->job()->declined();
    return 0;
}
468            
469            
470             =head1 JOB SUBMISSION
471            
472             =head2 submit()
473            
474             Submits a job to the Helios collective for processing. Returns the jobid if successful, throws an
475             error if it fails.
476            
477             Before a job can be successfully submitted, the following must be set first:
478            
479             $job->setConfig($configHash);
480             $job->setArgString($xmlstring);
481             $job->setJobType($servicename);
482            
483             So, for example, to submit a Helios::TestService to the Helios system, you need
484             to do the following:
485            
486             # you need Helios::Service and Helios::Job
487             use Helios::Service;
488             use Helios::Job;
489            
490             # these are the job arguments we want to pass to Helios::TestService
491             my $jobxml = "<job><params><arg1>This is a test</arg1></params></job>";
492            
493             # first, use Helios::Service to get the Helios configuration
494             my $srv = Helios::Service->new();
495             $srv->prep();
496             my $config = $srv->getConfig();
497            
498             # once you have the config, you can set up the Helios::Job
499             my $job = Helios::Job->new();
500             $job->setConfig($config);
501             $job->setJobType('Helios::TestService');
502             $job->setArgString($jobxml);
503            
504             # then submit the job (this will throw an exception if something goes wrong)
505             my $jobid = $job->submit();
506             print "Submitted job $jobid to Helios\n";
507            
508             Both Helios::Service->prep() and Helios::Job->submit() will throw exceptions
509             if they encounter errors, so a safer example would catch them:
510            
511             use Helios::Service;
512             use Helios::Job;
513            
514             my $jobxml = "<job><params><arg1>This is a test</arg1></params></job>";
515            
516             my $srv = Helios::Service->new();
517             eval {
518             $srv->prep();
519             1;
520             } or do {
521             my $E = $@;
522             print "Error encountered prepping Helios service: $E\n";
523             exit(1);
524             };
525             my $config = $srv->getConfig();
526            
527             # once you have the config, you can set up the Helios::Job
528             my $job = Helios::Job->new();
529             $job->setConfig($config);
530             $job->setJobType('Helios::TestService');
531             $job->setArgString($jobxml);
532            
533             # then submit the job (this will throw an exception if something goes wrong)
534             my $jobid;
535             eval {
536             $jobid = $job->submit();
537             1;
538             } or do {
539             my $E = $@;
540             print "Error encountered attempting job submission: $E\n";
541             };
542             print "Submitted job $jobid to Helios\n";
543            
544             Of course, the Try::Tiny module (available on CPAN) would work just as well as an
545             eval{} block, and has much prettier syntax.
546            
547             =cut
548            
# submit()
# Inserts this job into the job queue through a Helios::TS client built
# from the 'dsn', 'user' and 'password' entries of the job's config.
# The job type comes from getFuncname() and the single job argument is
# the raw argument string from getArgXML().  Stores the new jobid with
# setJobid() and returns it.
sub submit {
    my ($self) = @_;
    my $config     = $self->getConfig();
    my $arg_string = $self->getArgXML();
    my $jobtype    = $self->getFuncname;

    my %database = (
        dsn  => $config->{dsn},
        user => $config->{user},
        pass => $config->{password},
    );

    # [LH] [2013-10-18] Replaced Helios::TheSchwartz with Helios::TS
    my Helios::TS $client = Helios::TS->new( databases => [ \%database ], verbose => 1 );
    my $handle = $client->insert($jobtype, [ $arg_string ]);

    $self->setJobid($handle->jobid);
    return $handle->jobid;
}
570            
571            
572             =head1 JOB BURSTING
573            
574             Metajobs are jobs that specify multiple jobs. These metajobs will be burst apart by Helios into
575             the constituent jobs, which will be available for processing by any of the workers of the
576             appropriate class in the Helios collective. Metajobs provide a faster means to submit jobs in
577             bulk to Helios; rather than submit a thousand jobs, your application can submit 1 metajob that
578             will be burst apart by Helios into the thousand constituent jobs, which other workers will process
579             as if they were submitted individually.
580            
581             Normally, the Helios::Service base class determines whether a job is a metajob or not and can
582             handle the bursting process without intervention from your service subclass. If you need metajobs
583             to be burst in a way different than from the default, you may need to override
584             Helios::Service->burstJob() in your service class (and possibly create a Helios::Job subclass with
585             an overridden burst() method as well).
586            
587             =head2 burst()
588            
589             Bursts a metajob into smaller jobs. Returns the number of jobs burst if successful.
590            
591             =cut
592            
# burst()
# Splits a metajob into its constituent jobs.  Each entry under
# {jobs}{job} in the parsed arguments is serialized back to XML and
# becomes the single argument of a new TheSchwartz::Job; the underlying
# job is then replaced with the new jobs and marked completed.
# The new jobs use the 'class' argument as their funcname when it is
# defined, and the funcname of the underlying job otherwise.
# On any error the job is marked failed and Helios::Error::Fatal is
# thrown.  Returns the number of jobs created.
sub burst {
    my ($self) = @_;
    my $job  = $self->job();
    my $args = $self->getArgs();
    my $xs   = XML::Simple->new(SuppressEmpty => undef, ForceArray => [ 'job' ]);

    # burst jobs keep this job's class unless another one was specified
    my $classname = defined($args->{class}) ? $args->{class} : $job->funcname;

    my @newjobs;
    try {
        for my $job_arg ( @{ $args->{jobs}->{job} } ) {
            my $newxml = $xs->XMLout($job_arg, NoAttr => 1, NoIndent => 1, RootName => undef);
            push @newjobs, TheSchwartz::Job->new(
                funcname => $classname,
                arg      => [ $newxml ]
            );
        }
        $job->replace_with(@newjobs);
    } otherwise {
        my $e = shift;
        $self->failed($e->text);
        throw Helios::Error::Fatal($e->text);
    };
    $self->completed();

    # in debug mode, list the jobids of the jobs that were created
    if ( $self->debug ) {
        print "JOBID: ",$_->jobid,"\n" for @newjobs;
    }
    return scalar(@newjobs);
}
637            
638            
639             =head1 OTHER METHODS
640            
641             =head2 initDriver()
642            
643             Returns a Data::ObjectDriver object for use with Helios layer database updates.
644            
645             =cut
646            
647             # BEGIN CODE Copyright (C) 2012 by Andrew Johnson.
648            
# initDriver()
# Creates a Helios::ObjectDriver::DBI object from the 'dsn', 'user' and
# 'password' entries of the job's config, stores it with setDriver(),
# and returns it.
#
# In debug mode the connection settings and the new driver are printed.
# The password is deliberately left out: the earlier code printed it in
# cleartext, which leaks the database credential into whatever captures
# the debug output.
sub initDriver {
    my ($self) = @_;
    my $config = $self->getConfig();
    if ( $self->debug ) {
        print 'Job->initDriver() DSN: ', $config->{dsn},
              ' USER: ', $config->{user}, "\n";
    }
    my $driver = Helios::ObjectDriver::DBI->new(
        dsn      => $config->{dsn},
        username => $config->{user},
        password => $config->{password}
    );
    if ( $self->debug ) { print 'Job->initDriver() DRIVER: ',$driver,"\n"; }
    $self->setDriver($driver);
    return $driver;
}
662             # END CODE Copyright (C) 2012 by Andrew Johnson.
663            
664            
665             1;
666             __END__