File Coverage

lib/Grid/Request.pm
Criterion Covered Total %
statement 16 18 88.8
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 22 24 91.6


line stmt bran cond sub pod time code
1             package Grid::Request;
2              
3             =head1 NAME
4              
5             Grid::Request - An API for submitting jobs to a computational grid such as SGE or Condor.
6              
7             =head1 DESCRIPTION
8              
9             An API for submitting work to a Distributed Resource Management (DRM) system
10             such as Sun Grid Engine (SGE) or Condor.
11              
12             =head1 SYNOPSIS
13              
14             use Grid::Request;
15             my $request = Grid::Request->new();
16              
17             # Optionally set the job's project, if the configured scheduler requires it.
18             $request->project("MyProject");
19              
20             $request->times(2);
21             $request->command("/path/to/executable");
22             $request->initialdir("/path/to/initial/directory");
23             $request->error("/path/to/dir/stderr.err");
24              
25             # Note, most of the methods in this module may also be called
26             # with get_ and set_ prefixes. For example, the above code would
27             # also have worked if coded like so:
28              
29             $request->set_times(2);
30             $request->set_command("/path/to/executable");
31             $request->set_initialdir("/path/to/initial/directory");
32             $request->set_error("/path/to/dir/stderr.err");
33              
34             # When retrieving information (accessor behavior), you can call
35             # such methods with no arguments to return the information, or
36             # the "get_" may be prepended. For example:
37              
38             my $times = $request->times();
39             my $times_another_way = $request->get_times();
40             # Please note that calling the get version of a method and
41             # providing arguments does not make sense and will likely not work.
42              
43             # WRONG
44             my $times_wrong_way = $request->get_times(3);
45              
46             # Finally, submit the request...
47             my @job_ids = $request->submit();
48             print "The first ID for this request is $job_ids[0].\n";
49              
50             # ...and wait for the results. This step is not necessary, only
51             # if you wish to block, or wait for the request to complete before
52             # moving on to other tasks.
53             $request->wait_for_request();
54              
55             # Or, you could simply submit and block:
56             $request->submit_and_wait();
57              
58             exit;
59              
60             =head1 CONSTRUCTOR AND INITIALIZATION
61              
62             Grid::Request->new(%args);
63              
64             B This is the object constructor. Parameters are passed to
65             the constructor in the form of a hash. Examples:
66              
67             my $req = Grid::Request->new();
68              
69             or
70              
71             my $req = Grid::Request->new( project => "SomeProject" );
72              
73             or
74              
75             my $req = Grid::Request->new( project => "SomeProject",
76             opsys => "Linux",
77             initialdir => "/path/to/initialdir",
78             output => "/path/to/output",
79             times => 5,
80             );
81             Users may also add a "debug" flag to the constructor call for increased
82             reporting:
83              
84             my $req = Grid::Request->new( debug => 1 );
85              
86             B A hash of request configuration options.
87              
88             B $obj, a Grid::Request object.
89              
90             =head1 CONFIGURATION
91              
92             By default, the configuration file that is used to determine what grid engine
93             type to use and where to store temporary files is located in the invoking
94             user's home directory under ~/.grid_request.conf. The file needs to have
95             a [request] header and entries for the 'tempdir' and 'drm' parameters. In addition,
96             the file may also specify the path to a Log::Log4perl configuration file with the
97             'log4perl-conf' entry name. The following is an example:
98              
99             [request]
100             drm=SGE
101             tempdir=/path/to/grid/accessible/tmp/directory
102             log4perl-conf=/path/to/custom-log4perl.conf
103              
104             The 'tempdir' directory must point to a directory that is accessible
105             to the grid execution machines, for instance, over NFS...
106              
107             Another way of specifying an alternate configuration is to define
108             the GRID_CONFIG environment variable.
109              
110             =head1 CLASS AND OBJECT METHODS
111              
112             =over 4
113              
114             =cut
115              
116              
117 29     29   2695426 use strict;
  29         76  
  29         1138  
118 29     29   36563 use Config::IniFiles;
  29         1215689  
  29         1047  
119 29     29   338 use Carp;
  29         59  
  29         1800  
120 29     29   7378 use Log::Log4perl qw(get_logger);
  29         303828  
  29         326  
121 29     29   29659 use POSIX qw(ceil);
  29         257111  
  29         222  
122 29     29   85580 use Schedule::DRMAAc qw(:all);
  0            
  0            
123             use Grid::Request::HTC;
124             use Grid::Request::Command;
125             use Grid::Request::Exceptions;
126              
# Package global through which Perl passes the fully qualified name of the
# method that triggered AUTOLOAD. "our" replaces the obsolete "use vars"
# pragma; this file already relies on "our" for $VERSION.
our $AUTOLOAD;

# Lookup table of method names that AUTOLOAD forwards to the current
# Grid::Request::Command object. Populated in the block further down.
my %comm_meths;

# File-scoped state shared by every Grid::Request object:
#   $debug          - debug flag, (re)assigned by every call to new().
#   $default_config - path of the default configuration file, as reported by
#                     Grid::Request::HTC->config().
#   $logger         - Log4perl logger, assigned by _init_config_logger().
my ($debug, $default_config, $logger);
$default_config = Grid::Request::HTC->config();

# The [section] heading in the configuration file.
my $section = $Grid::Request::HTC::config_section;

my $WORKER = $Grid::Request::HTC::WORKER;

# Nonzero while a DRMAA session is considered open. DESTROY closes the
# session and resets the flag. Presumably set by _init_drmaa(), which is
# not shown here -- confirm.
my $DRMAA_INITIALIZED = 0;
our $VERSION = '0.11';

# DRMAA session name derived from the package name ("grid_request").
my $SESSION_NAME = lc(__PACKAGE__);
$SESSION_NAME =~ s/:+/_/g;

# This is deliberately an ordinary bare block, not a compile-time BEGIN
# block. It runs at module load time, in file order, after %comm_meths has
# been declared above. (A "BEGIN:" prefix would only be a statement label.)
{
    require 5.006_00; # Make sure we're not running some old Perl.

    # The methods that AUTOLOAD routes to the Command object.
    my @command_meths = qw(account add_param block_size class cmd_type command
                           email end_time getenv project error hosts initialdir
                           input length memory name opsys output
                           priority start_time state times runtime evictable
                           max_time pass_through params
                          );

    # Create the hash lookups for the methods so we know how to route later.
    %comm_meths = map { ($_ => 1) } @command_meths;
}
164              
165              
166             # The constructor.
# The constructor: Grid::Request->new(%args)
#
# %args may contain:
#   - any Command method name listed in %comm_meths (project, times,
#     command, ...). These are forwarded to the first
#     Grid::Request::Command object when their value is defined.
#   - config: path to an alternative configuration file (defaults to the
#     file reported by Grid::Request::HTC->config()).
#   - debug:  sets the file-wide debug flag (defaults to 0).
#
# Returns the new Grid::Request object. Any exception raised while
# initializing (configuration, DRM loading, ...) propagates to the caller.
sub new {
    my ($class, %args) = @_;

    # ref() must be tried first so that $obj->new(...) blesses into the
    # object's class. With "$class || ref($class)" an object is always true,
    # so bless would be handed a reference as the class name and die.
    my $self = bless {}, ref($class) || $class;

    # Pick out the constructor arguments that belong to the Command object,
    # skipping any whose value is undefined.
    my %command_args =
        map  { ($_ => $args{$_}) }
        grep { defined $args{$_} }
        sort keys %comm_meths;

    # $config holds the location of the configuration file for the module,
    # which also names the logger configuration, etc. The user may specify
    # it with a "config" parameter in the constructor.
    my $config = $args{config} || $default_config;
    $debug = $args{debug} || 0;

    $self->_init(\%command_args, $config);
    return $self;
}
195              
# _init($command_args_ref, $config)
#
# Private initializer called from new(); do not call directly.
#   $command_args_ref - hash reference of Command attributes.
#   $config           - path of the configuration file to use.
# Parses the configuration (which also sets up logging), creates the first
# Grid::Request::Command object, loads the DRM back end named in the
# configuration, seeds the object's internal state and finally calls
# _init_drmaa(), whose result is returned. Dies if extra arguments are given.
sub _init {
    my $self = shift;
    my ($command_args_ref, $config, @extra) = @_;
    if (@extra) {
        die "Initialization failed. Too many arguments, stopped";
    }

    # Must run first: it builds the Config::IniFiles object (importing the
    # default configuration underneath a user-supplied one) and assigns
    # the file-scoped $logger used below.
    my $cfg = _init_config_logger($config);

    $logger->info("Creating the first Command object.");
    # _cmd_ele holds the list of Command objects; it starts with one.
    my $first_command = Grid::Request::Command->new(%{ $command_args_ref });
    $self->{_cmd_ele}->[0] = $first_command;

    $self->{_drm}       = _load_drm($cfg);
    $self->{_simulate}  = 0;      # Simulation mode off.
    $self->{_submitted} = 0;      # Not yet submitted.
    $self->{_config}    = $cfg;   # The parsed configuration object.
    $self->{_env}       = [];     # Environment cache, filled lazily.
    $self->{_session}   = "";

    return _init_drmaa();
}
224              
# _init_config_logger($config)
#
# Private helper: parses the configuration and configures logging.
# If $config names the default configuration file, that file is used as-is.
# Otherwise $config is parsed with the default configuration imported
# underneath it (Config::IniFiles "-import"), so values missing from the
# user's file fall back to the defaults. If the result has a "log4perl-conf"
# entry in the [$section] section that names a readable file, Log4perl is
# initialized from it via init_once(); otherwise a warning is emitted when
# the entry is present but unusable. Always assigns the file-scoped $logger.
#
# Returns the Config::IniFiles object. Throws Grid::Request::Exception if
# the configuration cannot be parsed.
sub _init_config_logger {
    my $config = shift;
    # TODO: Since Grid::Request::HTC already parsed the default config file, use
    # an accessor to get that config object rather than reparsing it
    # here. This will involve adding an additional method in Grid::Request::HTC.
    my $default_cfg_obj = Config::IniFiles->new( -file => $default_config );

    my $cfg;
    if (defined $config && ($config eq $default_config)) {
        $cfg = $default_cfg_obj;
    } else {
        $cfg = Config::IniFiles->new(-file => $config,
                                     -import => $default_cfg_obj);
    }

    # Config::IniFiles->new returns undef on failure and records the reasons
    # in @Config::IniFiles::errors. Report that now, instead of dying below
    # with "Can't call method "val" on an undefined value".
    if (! defined $cfg) {
        my $file = defined $config ? $config : '(undefined)';
        Grid::Request::Exception->throw(
            "Unable to parse the configuration file $file: " .
            join(" ", @Config::IniFiles::errors)
        );
    }

    # Parse the location of the logger configuration and initialize
    # Log4perl, if it has not already been initialized.
    my $logger_conf = $cfg->val($section, "log4perl-conf");
    if (defined $logger_conf) {
        if (-f $logger_conf && -r $logger_conf) {
            Log::Log4perl->init_once($logger_conf);
        } else {
            warn "Unable to configure logging with $logger_conf.\n";
        }
    }

    $logger = get_logger(__PACKAGE__);

    return $cfg;
}
256              
# Private accessors for the object's internal state.

# Returns the DRM back-end object stored under _drm.
sub _drm {
    my $self = shift;
    return $self->{_drm};
}

# Returns the Command objects stored under _cmd_ele: the list itself in
# list context, otherwise the underlying array reference.
sub _com_obj_list {
    my $self = shift;
    return @{ $self->{_cmd_ele} } if wantarray;
    return $self->{_cmd_ele};
}

# Returns the last (most recently added) Command object.
sub _com_obj {
    my $self = shift;
    return $self->{_cmd_ele}->[-1];
}

# Returns the configuration object stored under _config.
sub _config {
    my $self = shift;
    return $self->{_config};
}
262              
# _load_drm($cfg)
#
# Private helper. Reads the DRM name from the [$section] section of the
# Config::IniFiles object $cfg. The entry name is
# $Grid::Request::HTC::drm_param. The helper then loads and imports
# Grid::Request::DRM::<NAME> (NAME upper-cased) and returns a new instance
# of that class.
#
# Throws Grid::Request::Exception when the parameter is missing, empty or
# malformed, or when the module cannot be loaded or instantiated.
sub _load_drm {
    $logger->debug("In _load_drm.");
    my $cfg = shift;
    my $param = $Grid::Request::HTC::drm_param;

    # Check definedness BEFORE upper-casing: uc(undef) returns "" (with a
    # warning), so a check after uc() could never detect a missing entry.
    my $raw_drm = $cfg->val($section, $param);
    if (! defined $raw_drm || $raw_drm eq "") {
        Grid::Request::Exception->throw("No $param parameter specified in the config file " . $cfg->GetFileName());
    }
    my $drm = uc($raw_drm);

    # The name becomes part of a module path, so only accept a plain
    # (optionally ::-separated) identifier such as "SGE" or "CONDOR".
    if ($drm !~ m/\A\w+(?:::\w+)*\z/) {
        my $msg = qq|Invalid $param parameter "$raw_drm" in the config file | . $cfg->GetFileName();
        $logger->fatal($msg);
        Grid::Request::Exception->throw($msg);
    }

    my $package = __PACKAGE__ . "::DRM::$drm";

    # Load via a block eval on the file name (no string eval). "require"
    # only guarantees a true return value, not 1, so test "; 1" instead.
    (my $module_file = "$package.pm") =~ s{::}{/}g;
    if ( eval { require $module_file; 1 } ) {
        $package->import();
        $logger->info(qq|Loaded and imported "$package".|);
    } else {
        my $msg = qq|Could not load "$package".|;
        $logger->fatal("$msg $@");
        Grid::Request::Exception->throw($msg);
    }

    my $drm_obj;
    eval {
        $drm_obj = $package->new();
    };
    if ($@ or ! defined $drm_obj) {
        Grid::Request::Exception->throw(qq|Could not instantiate "$package".|);
    }
    $logger->info("Returning a " . ref($drm_obj) . " object.");
    return $drm_obj;
}
292              
# _get_env_list()
#
# Private helper. On first use, builds "KEY=VALUE" strings from %ENV and
# caches them in $self->{_env}. Any variable whose name or value contains
# a ';' is left out (presumably because ';' would break the environment
# string handed to the DRM -- confirm). Later calls reuse the cache.
# Returns the list in list context, otherwise the array reference.
sub _get_env_list {
    $logger->debug("In get_env_list");
    my $self = shift;

    unless ( @{ $self->{_env} } ) {
        my @exportable =
            map  { "$_=$ENV{$_}" }
            grep { index($_, ";") == -1 and index($ENV{$_}, ";") == -1 }
            keys %ENV;
        $self->{_env} = \@exportable;
    }

    my $env = $self->{_env};
    return wantarray ? @{ $env } : $env;
}
313              
# _validate()
#
# Invoked before submit and submit_and_wait. Checks that the 'project' and
# 'account' attributes, when set, contain no white space.
# Returns 1 on success; throws Grid::Request::Exception otherwise.
sub _validate {
    my $self = shift;
    $logger->debug("In _validate.");
    my $rv = 1;

    # Each accessor is dispatched through AUTOLOAD (which logs every call),
    # so fetch each value once. A value containing white space is
    # necessarily defined and non-empty, so one "defined && =~ /\s/" guard
    # serves both attributes consistently.
    my $project = $self->project();
    if (defined $project && $project =~ m/\s/) {
        Grid::Request::Exception->throw("White space is not allowed for the 'project' attribute.");
    }

    my $account = $self->account();
    if (defined $account && $account =~ m/\s/) {
        Grid::Request::Exception->throw("White space is not allowed for the 'account' attribute.");
    }

    $logger->debug("Returning $rv.");
    return $rv;
}
330              
331              
# AUTOLOAD
#
# Dispatches calls to the methods listed in %comm_meths to the current
# Grid::Request::Command object (_com_obj).
#   - A call is a setter when the name starts with "set_", or when
#     arguments are supplied and the name does not start with "get_".
#     Otherwise it is a getter.
#   - A leading "set_"/"get_" is stripped before dispatch.
#   - Setters are refused (logcroak) once the request has been submitted.
#   - Unknown method names also logcroak.
# Returns whatever the Command method returns.
sub AUTOLOAD {
    my ($self, @args) = @_;

    # Keep only the unqualified method name, e.g. "set_times".
    my $method = $AUTOLOAD;
    $method =~ s/.*:://;

    my $is_setter =
        ( $method =~ m/^set_/ || ( @args && $method !~ m/^get_/ ) ) ? 1 : 0;
    $method =~ s/^[sg]et_//;

    $logger->logcroak("No such method: $AUTOLOAD.") unless $comm_meths{$method};
    $logger->debug("Received a Command method: $method.");

    # Getter: forward without arguments.
    return $self->_com_obj->$method unless $is_setter;

    # Setter: only allowed before the request is submitted.
    $logger->logcroak("Cannot change a Command object after submission.")
        if $self->is_submitted();
    return $self->_com_obj->$method(@args);
}
359              
360              
# DESTROY
#
# Required because this class uses AUTOLOAD: without it, destruction would
# be routed to AUTOLOAD, which croaks for names not in %comm_meths. Don't
# modify or remove unless you know what you are doing. This is a private
# method that is not to be invoked directly.
#
# Closes the DRMAA session if the file-scoped $DRMAA_INITIALIZED flag is
# set, and resets the flag on success.
# NOTE(review): the flag is shared by every Grid::Request object, so
# destroying any one object closes the session for all of them -- confirm
# this is intended.
sub DESTROY {
    # Destructors may run while an exception is propagating or while the
    # program exits. Don't let the calls below clobber the caller's $@, $!
    # or exit status ($?).
    local ($@, $!, $?);

    # Close the DRMAA Session (if necessary).
    if ( $DRMAA_INITIALIZED != 0 ) {
        # $logger may already be undef here (e.g. during global destruction).
        $logger->debug("Closing the DRMAA session.") if defined $logger;
        my ($error, $diagnosis) = drmaa_exit();
        if ($error) {
            my @msg = ("Error closing the DRMAA session: ",
                       drmaa_strerror($error), $diagnosis);
            if (defined $logger) {
                $logger->error(@msg);
            } else {
                warn join("", map { defined $_ ? $_ : "" } @msg), "\n";
            }
        } else {
            $DRMAA_INITIALIZED = 0;
        }
    }
    return;
}
378              
379              
380             =item $obj->account([account]);
381              
382             B The account attribute is used to affiliate a grid job with
383             a particular account. Grid engines differ in their treatment of the account
384             attribute.
385              
386             B To use as a setter, the first parameter will be used to
387             set (or reset) the account attribute for the command.
388              
389             B The currently set account (if called with no parameters).
390              
391              
392             =item $obj->add_param($scalar | @list | %hash );
393              
394             B Add a command line argument to the executable when it is
395             executed on the grid. Since many executables associate meaning with the order
396             that command line arguments are given, Grid::Request also honors the
397             order in which parameters are added. They are reassembled at runtime on the grid in
398             the same order that they were added...
399              
400             B If the number of arguments is 1, then it will be considered to
401             be a simple, "anonymous" parameter. When called with a single scalar
402             argument, no logic is attempted to interpret the string provided. The module
403             simply adds the specified string verbatim to the list of parameters when
404             building the command line to invoke on the grid. If 3 parameters are passed,
405             then they are read as "key", "value", "type". The parameter 'type' can be
406             either "ARRAY", "DIR", "PARAM", or "FILE" (the default is "PARAM"). Any other
407             number of parameters passed will yield an error.
408              
409             The 'type' is used in the following way to aid in the parallelization of
410             processes: If ARRAY is used, the job will be iterated over the elements of the
411             array, with the value of the parameter being changed to the next element of the
412             array each time. The array must be an array of simple strings passed in as an
413             array reference to 'value'. Newlines will be stripped. Note: Nested data
414             structures will not be respected.
415              
416             If "DIR" is specified as the 'type', the file contents of the directory
417             specified by the 'value' will be iterated over. If the directory contains 25
418             files, then there will be at least 25 invocations of the executable on the grid
419             (one per file) with the name of each file substituted for the '$(Name)' token
420             each time. Note that hidden files and directories are not counted and the
421             directory is NOT scanned recursively.
422              
423             If "FILE" is specified, then the 'value' specified in the method call
424             will be interpreted as the path to a file containing entries to iterate over.
425             The file may contain hundreds of entries (1 per line) to generate a
426             corresponding number of jobs.
427              
428             If greater clarity and flexibility is desired, one may wish to pass named
429             parameters in a hash reference instead:
430              
431             $obj->add_param( { key => '--someparam=$(Name)',
432             value => "/path/to/directory",
433             type => "DIR",
434             });
435              
436             The 3 supported keys are case insensitive, so "KEY", "Value" and "tYpE" are
437             also valid. Unrecognized keys will generate warnings.
438              
439             If more than 3 arguments are passed to the method, an error occurs.
440              
441             For each parameter that is added, the 'key' is what dictates how the parameter
442             should be processed as a command line argument and how the values from the
443             iterable directory, array or file are to be dropped into the final command line
444             invocation. Parameter keys can make use of two tokens: $(Index) and $(Name).
445             The $(Index) token is replaced at runtime with the actual sequence number of
446             the job on the grid. The '$(Name)' token is replaced with the string taken from
447             the iterable file, directory or array. In the case of parameters of type
448              
449             FILE -> $(Name) is repeatedly replaced with each line in the file
450             DIR -> $(Name) is repeatedly replaced with the name of each file in the directory
451             ARRAY -> $(Name) is repeatedly replaced with each scalar value of the element of the array
452              
453             Examples:
454              
455             FILE
456             $request->add_param({ type => "FILE",
457             key => '--string=$(Name)',
458             value => "/path/to/some/file.txt",
459             });
460              
461             DIR
462             $request->add_param({ type => "DIR",
463             key => '--filepath=$(Name)',
464             value => "/path/to/some/directory",
465             });
466             ARRAY
467             $request->add_param({ type => "ARRAY",
468             key => '--element=$(Name)',
469             value => \@array,
470             });
471              
472             B None.
473              
474             =cut
475              
# add_param(@args)
#
# Adds a parameter to the current Command object. Accepts either a single
# hash reference (keys "key", "value", "type", in any letter case) or
# (key, value, type). When the type is "ARRAY" (any case), the configured
# "tempdir" is first set on the Command object. Returns whatever the
# Command object's add_param returns. Throws Grid::Request::Exception if an
# ARRAY parameter is added but no tempdir is configured.
sub add_param {
    $logger->debug("In add_param.");
    my ($self, @args) = @_;

    # Sets the temporary directory on the Command object from the "tempdir"
    # configuration entry. Needed for parameters of type "ARRAY": the temp
    # dir is where a file containing each element of the array is created.
    my $tempdir_setter = sub {
        $logger->debug("Getting the configuration object.");
        my $cfg = $self->_config();
        $logger->debug("Setting the temporary directory ",
                       "on the command object.");
        my $tempdir = $cfg->val($Grid::Request::HTC::config_section, "tempdir");
        if (defined $tempdir) {
            $self->_com_obj->tempdir($tempdir);
        } else {
            Grid::Request::Exception->throw("tempdir has not been configured in " . $cfg->GetFileName());
        }
    };

    # Work out the parameter type from either calling convention.
    my $type;
    if ( (@args == 1) && (ref($args[0]) eq "HASH") ) {
        # Hash keys are documented as case insensitive, so read the value
        # under whichever spelling of "type" the caller actually used.
        foreach my $key ( keys %{ $args[0] } ) {
            $type = $args[0]->{$key} if uc($key) eq "TYPE";
        }
    } elsif (@args == 3) {
        $type = $args[2];
    }

    # Compare case-insensitively in both forms, as the hash form always did.
    if ( defined($type) && (uc($type) eq "ARRAY") ) {
        $tempdir_setter->();
    }

    my $return = $self->_com_obj->add_param(@args);
    return $return;
}
509              
510             =item $obj->block_size( [ $scalar | $code_ref ] );
511              
512             B By default, Master/Worker (mw) jobs have a default block size
513             of 100. That is to say, that each worker on the grid will process 100 elements
514             of the overall pool of job invocations. However, this isn't always appropriate.
515             The user may override the default block size by calling this method and setting
516             the block size to an alternate value (a positive integer). The user may also
517             provide an anonymous subroutine (code reference) so that the block size can be
518             computed dynamically. If choosing to pass a subroutine, the code reference
519             will be passed two arguments: the Grid::Request::Command object that will be
520             invoked, and the number of elements that will be iterated over, in that order.
521             The subroutine can then use these pieces of information to compute the block
522             size. The subroutine MUST return a positive integer scalar or an exception will
523             be thrown.
524              
525             Examples:
526             # simple scalar block size
527             $request->block_size(1000);
528              
529             # Passing a code ref, to make the block size dependent on the
530             # executable...
531             $request->block_size(
532             sub {
533             my $com_obj = shift;
534             my $count = shift;
535              
536             my $exe = $com_obj->command();
537              
538             my $block_size = 50;
539             if ($exe =~ m/sort/i) {
540             $block_size = ($count > 100000) ? 10000 : 1000;
541             }
542             return $block_size;
543             }
544             );
545              
546             B A positive integer scalar, or an anonymous subroutine/code
547             reference.
548              
549             B The block size scalar or code reference if called as an accessor
550             (no-arguments). If the block size has not been explicitly set, then the default
551             block size is returned. No return if called as a mutator.
552              
553             =item $obj->class([$class]);
554              
555             B This method is used to set and retrieve the request's class
556             attribute. A request's class describes its general purpose or what it will
557             be used for. For example, a command can be marked as a request for "engineering"
558             or "marketing". Ad hoc requests will generally not use a class setting. If in
559             doubt, leave the class attribute unset.
560              
561             B With no parameters, this method functions as a getter. With one
562             parameter, the method sets the request's class. No validation is
563             performed on the class passed in.
564              
565             B The currently set class (when called with no arguments).
566              
567              
568             =item $obj->command([$command]);
569              
570             B This method is used to set or retrieve the executable that
571             will be called for the request.
572              
573             B With no parameters, this method functions as a getter. With one
574             parameter, the method sets the executable. Currently, this module does not
575             attempt to verify whether the executable is actually present or whether
576             permissions on the executable allow it to be invoked by the user on grid
577             machines.
578              
579             B The currently set executable (when called with no arguments).
580              
581              
582             =item $obj->email([$email_address]);
583              
584             B This method is used to set or retrieve the email of the user
585             submitting the request. The email is important for notifications and for
586             tracking purposes in case something goes wrong.
587              
588             B With no parameters, this method functions as a getter and
589             returns the currently configured email address. If the request has not yet been
590             submitted, the user may set or reset the email address by providing an
591             argument. The address is not currently validated for RFC compliance.
592              
593             B The email address currently set, or undef if unset (when called
594             with no arguments).
595              
596              
597             =item $obj->end_time()
598              
599             B Retrieve the finish time of the request.
600              
601             B None.
602              
603             B The ending time of the request (the time the grid finished
604             processing the request), or undef if the ending time has not yet been
605             determined.
606              
607              
608             =item $obj->error([errorfile])
609              
610             B This method allows the user to set, or if the request has not
611             yet been submitted, to reset the error file. The error file will be the place
612             where all STDERR from the invocation of the executable will be written to. This
613             file should be in a globally accessible location on the filesystem such that
614             grid execution machines may create the files. The attribute may not be changed
615             with this method once the request has been submitted.
616              
617             B To set the error file, call this method with one parameter,
618             which should be the path to the file where STDERR is to be written. Note that
619             when submitting array jobs (with the use of the times() method or with
620             Master/Worker parameters through add_param()), one can also use the $(Index)
621             token when specifying the error path. The token will be replaced with the
622             grid's task ID number. For example, if a request generated 100 grid jobs, then
623             an error path containing '/path/to/directory/job_$(Index).err' will result in
624             STDERR files numbered job_1.err, job_2.err, ..., job_100.err in
625             /path/to/directory.
626              
627             B When called with no arguments, this method returns the currently
628             set error file, or undef if not yet set.
629              
630              
631             =item $obj->getenv([1]);
632              
633             B The getenv method is used to set whether the user's environment
634             should be replicated to the grid or not. To replicate your environment, call
635             this method with an argument that evaluates to true. Calling it with a 0
636             argument, or an expression that evaluates to false, will turn off environment
637             replication. The default is NOT to replicate the user environment across the
638             grid.
639              
640             B This method behaves as a getter when called with no arguments.
641             If called with 1, or more arguments, the first will be used to set the
642             attribute to either 1 or 0.
643              
644             B The current setting for getenv (if called with no arguments).
645              
646              
647             =item $obj->ids();
648              
649             B This method functions only as a getter, but returns
650             the DRM ids associated with the overall request after it has been
651             submitted.
652              
653             B None.
654              
655             B Returns an array in list context. In scalar context, returns a
656             reference to an array.
657              
658             =cut
659              
# Collect the DRM job ids of every command held by this request, in command
# order. Only meaningful once the request has been submitted. Returns a list
# in list context and an array reference in scalar context.
sub ids {
    $logger->debug("In ids.");
    my ($self, @args) = @_;

    # ids() is a read-only accessor: complain (but carry on) if a caller
    # tries to use it as a setter.
    if (@args) {
        my $msg = "The ids method is a getter only and takes no arguments.";
        $logger->logwarn($msg);
    }

    my $total = $self->command_count();
    my @ids;
    for my $index (0 .. $total - 1) {
        $logger->debug("Getting ids from command " . ($index + 1) . "/$total.");
        my $cmd_obj = $self->_com_obj_list->[$index];
        push @ids, $cmd_obj->ids();
    }
    return wantarray ? @ids : \@ids;
}
680              
681              
682             =item $obj->is_submitted();
683              
684             B Returns whether a request object has been submitted.
685              
686             B None.
687              
688             B 1 if the request has been submitted and 0 if it has not.
689              
690             =cut
691              
# Get or set the "submitted" flag of the request.
# As a setter, any defined value is normalised to 1 (true) or 0 (false) and
# the stored value is returned. As a getter it always returns 1 or 0, as the
# POD promises, even if the flag has never been set.
sub is_submitted {
    my ($self, $submitted) = @_;
    if (defined $submitted) {
        return $self->{_submitted} = $submitted ? 1 : 0;
    }
    return $self->{_submitted} ? 1 : 0;
}

700             =item $obj->project([$project]);
701              
702             B The project attribute is used to affiliate usage of the DRM
703             with a particular administrative project. This will allow for more effective
704             control and allocation of resources, especially when high priority projects
705             must be fulfilled. The caller may change the project setting as long as the job
706             has not yet been submitted (after submission most request attributes are
707             locked).
708              
709             B The first parameter will be used to set (or reset)
710             the project attribute for the request, as long as the request has not
711             been submitted.
712              
713             B The currently set project (if called with no arguments).
714              
715              
716             =item $obj->input([path]);
717              
718             B Used to specify a file to be used as the STDIN for
719             the executable on the grid.
720              
721             B A scalar containing the globally accessible path to
722             the file to use for STDIN.
723              
724             B The currently set input file if called as a getter with no
725             arguments, or undef if not yet set.
726              
727              
728             =item $obj->initialdir([path]);
729              
730             B This method sets the directory where the grid will be
731             chdir'd to before invoking the executable. This is an optional parameter,
732             and if the user leaves it unspecified, the default will be that the grid
733             job will be chdir'd to the root directory "/" before beginning the request.
734             Use of initialdir is encouraged to promote the use of relative paths.
735              
736             B A scalar holding the path to the directory the grid should
737             chdir to before invoking the executable.
738              
739             B When called with no arguments, returns the currently set
740             initialdir, or undef if not yet set.
741              
742              
743             =item $obj->name([name]);
744              
745             B The name attribute for request objects is optional and is
746             provided as a convenience to users to name their requests.
747              
748             B A scalar name for the request.
749              
750             B When called with no arguments, returns the current name, or
751             undef if not yet set. The name cannot be changed once a request is submitted.
752              
753              
754             =item $obj->opsys([$os]);
755              
756             B The default operating system that the request will be processed
757             on is Linux. Users can choose to submit requests to other operating systems by
758             using this method. Available operating systems are "Linux", "Solaris". An
759             attempt to set the opsys attribute to anything else results in an error. Values
760             must be comma separated, so if you would like your command to run on Linux or
761             Solaris:
762              
763             $obj->opsys("Linux,Solaris");
764              
765             and for Linux only:
766              
767             $obj->opsys("Linux");
768              
769             B "Linux", "Solaris", etc, when called as a setter
770             (with one argument).
771              
772             B When called with no arguments, returns the operating system the
773             request will run on, which defaults to "Linux".
774              
775             =item $obj->hosts([hostname]);
776              
777             B Used to set the list of possible machines to run the
778             jobs on. If this value is not set then any host that matches the other
779             requirements will be used according to the grid engine in use.
780             Hostnames are passed in comma-separated form with no spaces.
781              
782             B hostname(s), example "machine1,machine2"
783              
784             B When called with no arguments, returns the hosts if set.
785              
786              
787             =item $obj->memory([megabytes]);
788              
789             B Used to set the minimum amount of physical memory needed.
790              
791             B memory in megabytes. Examples: 1000MB, 5000MB
792              
793             B When called with no arguments, returns the memory if set.
794              
795             =item $obj->new_command();
796              
797             B The module allows for requests to encapsulate multiple
798             commands. This method will start work on a new one by moving a cursor.
799             Commands are processed in the order in which they are created if they are
800             submitted synchronously, or in parallel if submitted asynchronously (the
801             default). In addition, the only attribute that the new command inherits from
802             the command that preceded it, is the project (if set). However, users are free
803             to change the project by calling the project() method...
804              
805             B None.
806              
807             B None.
808              
809             =cut
810              
# Start a new command within this request by appending a fresh
# Grid::Request::Command after the existing ones. The only setting carried
# over from the current command is the project, and only when it is
# non-empty. Throws Grid::Request::InvalidArgumentException if any argument
# is supplied. The value of the final assignment (the new Command object) is
# what the sub evaluates to.
sub new_command {
    $logger->debug("In new_command.");

    my ($self, @args) = @_;
    Grid::Request::InvalidArgumentException->throw("No arguments are valid for new_command().")
        if scalar @args;

    # Read the project first so it can be handed to the new Command object.
    my $project = $self->project();
    my @ctor_args = (defined $project && length $project) ? (project => $project) : ();

    # The new command goes in the slot just past the last existing one.
    my @existing = $self->_com_obj_list;
    my $next_slot = scalar @existing;

    $logger->debug("Creating new Command object in element $next_slot.");
    return $self->_com_obj_list->[$next_slot] = Grid::Request::Command->new(@ctor_args);
}
836              
837              
838             =item $obj->pass_through([pass_value]);
839              
840             B Used to pass strings to the underlying DRM (Distributed
841             Resource Management) system (Condor, SGE, LSF, etc...) as part of the
842             request's requirements. Such pass throughs are forwarded unchanged. This is an
843             advanced option and should only be used by those familiar with the
844             underlying DRM.
845              
846             B $string, a scalar.
847              
848             B None.
849              
850              
851             =item $obj->output([path]);
852              
853             B Sets the path for the output file, which would hold all of
854             the output directed to STDOUT by the request on the grid. This method functions
855             as a setter and getter.
856              
857             B A path to a file. The file must be globally accessible on the
858             filesystem in order to work, otherwise, the location will not be accessible to
859             compute nodes on the grid. This attribute may not be changed once a request is
860             submitted. Note that when submitting array jobs (with the use of the times()
861             method or with Master/Worker parameters through add_param()), one can also use
862             the $(Index) token when specifying the output path. The token will be replaced
863             with the grid's task ID number. For example, if a request generated 100 grid
864             jobs, then an output path containing '/path/to/directory/job_$(Index).out' will
865             result in STDOUT files numbered job_1.out, job_2.out, ..., job_100.out in
866             /path/to/directory.
867              
868             B When called with no arguments, the method returns the currently
869             set path for the output file, or undef if not yet set.
870              
871              
872             =item $obj->params();
873              
874             B Retrieve the list of currently registered parameters for the
875             request.
876              
877             B None.
878              
879             B The method returns a list of hash references.
880              
881              
882             =item $obj->priority([priority]);
883              
884             B Use this method to set the optional priority attribute on the
885             request. The priority setting is used to help allocate the appropriate
886             resources to the request. Higher priority requests may displace lower priority
887             requests.
888              
889             B Scalar priority value.
890              
891             B The current priority, or undef if unset.
892              
893             =item $obj->runtime([minutes]);
894              
895             B Cap the runtime of the job on the DRM. Most DRM systems
896             have a mechanism to limit the maximum amount of time a job can run. This
897             method accepts a value in minutes.
898              
899             B Scalar containing a positive integer number of minutes.
900              
901             B The current runtime limit if called as a getter, or undef if unset.
902              
903              
904             =item $obj->set_env_list(@vars);
905              
906             B This method is used to establish the environment that a
907             request to the grid should run under. Users may pass this method a list of
908             strings that are in "key=value" format. The keys will be converted into
909             environment variables set to "value" before execution of the command is begun.
910             Normally, a request will not copy the user's environment in this way. The only
911             time the environment is established on the grid will be if the user invokes the
912             getenv method or sets it with this method. This method allows the user to
913             override the environment with his or her own notion of what the environment
914             should be at runtime on the grid.
915              
916             B A list of strings in "key=value" format. If any string does
917             not contain the equals (=) sign, it is skipped and a warning is generated.
918              
919             B None.
920              
921             =cut
922              
# Replace the environment to be established on the grid with the given
# "key=value" strings. Entries that are undefined or do not look like an
# assignment are skipped with a warning, as the POD documents. Because
# supplying an environment only makes sense if it is used, getenv is
# switched on as well.
sub set_env_list {
    my ($self, @args) = @_;
    my @valid;
    foreach my $arg (@args) {
        # Warn and skip rather than die: the previous logcroak made the
        # "next" below unreachable and contradicted the documented contract.
        if (! defined $arg || $arg !~ /\S+=\S+/) {
            my $shown = defined $arg ? $arg : 'undef';
            $logger->logwarn("$shown is not a valid environment parameter. Skipping it.");
            next;
        }
        push(@valid, $arg);
    }

    $self->{_env} = \@valid;

    # If the user has set their own environment with set_env_list, then we
    # assume that they want getenv to be true. We do it for them here to save
    # them an extra step.
    $self->getenv(1);
}
941              
942              
943             =item $obj->show_invocations();
944              
945             B Show what will be executed on the DRM. An attempt is made
946             to escape shell-sensitive characters so that the commands can be copied
947             and pasted for test execution.
948              
949             B None.
950              
951             B None. The method prints to STDOUT.
952              
953             =cut
954              
# Print, for every command in the request, the command lines that would be
# executed on the DRM. Each word is shell-escaped by _esc_chars so the output
# can be pasted into a shell for testing. Output goes to STDOUT; there is no
# meaningful return value.
sub show_invocations {
    my $self = shift;

    # Remember, we may have multiple command objects...
    my @command_objs = $self->_com_obj_list();

    # Load the formulator on demand. A failed require used to be swallowed,
    # which then surfaced as a confusing "Can't locate object method" error;
    # report the real cause instead.
    eval { require Grid::Request::JobFormulator; 1 }
        or $logger->logcroak("Unable to load Grid::Request::JobFormulator: $@");
    my $formulator = Grid::Request::JobFormulator->new();

    my $command_count = 1;
    foreach my $com_obj (@command_objs) {
        print "Command #" . $command_count . "\n";
        my $exe = $com_obj->command();
        my $block_size = $com_obj->block_size();
        my @param_strings = map { scalar($_->to_string()) } $com_obj->params();

        # Each invocation is an array reference holding one command line.
        my @invocations = $formulator->formulate($block_size, $exe, @param_strings);
        foreach my $invocation (@invocations) {
            my @esc_cli = _esc_chars(@$invocation);
            print join(" ", @esc_cli) . "\n";
        }

        $command_count++;
    }
}
989              
# Make each command-line word safe to paste into a shell. Shell
# metacharacters are backslash-escaped (so, for example, a!!a becomes
# a\!\!a), and any word that then contains whitespace is wrapped in double
# quotes. The caller's arguments are not modified. Returns the escaped words
# (their count in scalar context).
sub _esc_chars {
    my @escaped;
    foreach my $word (@_) {
        (my $safe = $word) =~ s/([;<>\*\|`&\$!#\(\)\[\]\{\}:'"])/\\$1/g;
        $safe = '"' . $safe . '"' if $safe =~ m/\s/;
        push @escaped, $safe;
    }
    return @escaped;
}
998              
999              
1000             =item $obj->simulate([value]);
1001              
1002             B This method is used to toggle the simulate flag for the
1003             request. If this method is passed a true value, the request will not be
1004             submitted to the grid, but will appear to have been submitted. This is most
1005             useful in development and testing environments to conserve resources. When a
1006             request marked simulate is submitted, nothing is sent to the DRM and submit() returns no DRM ids (an empty list). Note
1007             that this attribute cannot be modified once a request is submitted.
1008              
1009             B A true value (such as 1) to mark the request as a simulation.
1010             A false value, or an expression that evaluates to false (such as 0), to mark the request for execution.
1011              
1012             B When called with no arguments, this method returns the current
1013             value of the simulate toggle: 1 for simulation, 0 for execution.
1014              
1015             =cut
1016              
# Get or set the simulate flag. As a setter, any defined value is normalised
# to 1 (simulate) or 0 (really execute) and the stored value is returned. As
# a getter it always returns 1 or 0, as the POD documents, even if the flag
# was never set.
sub simulate {
    $logger->debug("In simulate.");
    my ($self, $simulate) = @_;
    if (defined $simulate) {
        return $self->{_simulate} = $simulate ? 1 : 0;
    }
    return $self->{_simulate} ? 1 : 0;
}
1026              
1027              
1028             =item $obj->start_time();
1029              
1030             B Retrieve the start time when the request began processing.
1031             Any attempt to set the time will result in an error.
1032              
1033             B None.
1034              
1035             B $time, the start time (scalar) that the grid began processing
1036             the request.
1037              
1038              
1039             =item $obj->state();
1040              
1041             B Retrieve the "state" attribute of the request. This method
1042             is "read only" and an attempt to set the state will result in an error.
1043             The states are:
1044              
1045             INIT
1046             INTERRUPTED
1047             FAILURE
1048             FINISHED
1049             RUNNING
1050             SUSPENDED
1051             UNKNOWN
1052             WAITING
1053              
1054             B None.
1055              
1056             B $state, a scalar with the current state of the request.
1057              
1058              
1059             =item $obj->stop([$id]);
1060              
1061             B Stop a request that has already been submitted.
1062              
1063             B Request ID (optional)
1064              
1065             B None.
1066              
1067             =cut
1068              
# Stop a request, or a particular DRM id. The actual DRM calls are not
# implemented yet (see the TODOs). For now this only validates the call and
# resolves which id would be stopped. Returns nothing, as documented; the
# explicit returns avoid leaking incidental values such as the id.
sub stop {
    $logger->debug("In stop.");
    my ($self, $stop_id, @args) = @_;

    if (defined $stop_id) {
        $logger->warn("The stop method takes only one argument.") if @args;
        #TODO: Stop a particular ID
        return;
    }

    if (! $self->is_submitted()) {
        $logger->warn("Stop was called but the request was not submitted. Do nothing...");
        return;
    }

    $logger->debug("Stop called for self.");
    # NOTE(review): get_id is not defined in this part of the module;
    # presumably it is provided by the accessor machinery elsewhere -- confirm.
    $stop_id = $self->get_id();
    # TODO Stop all jobs associated with this Request.
    return;
}
1087              
1088              
1089             =item $obj->submit_serially();
1090              
1091             B Calling this method is the equivalent of calling
1092             submit with the serial flag set to a true value, eg. $obj->submit(1).
1093              
1094             B None.
1095              
1096             B The array of grid ids in list context, or an array reference
1097             in scalar context.
1098              
1099             =cut
1100              
# Shorthand for $obj->submit(1): submit the commands one after another.
# The submitted flag is checked here first, so a repeated call fails before
# submit() is even entered. Returns the DRM ids as a list, or as an array
# reference in scalar context.
sub submit_serially {
    $logger->debug("In submit_serially.");
    my ($self) = @_;

    Grid::Request::Exception->throw("This request has already been submitted.")
        if $self->is_submitted();

    my @ids = $self->submit(1);
    return wantarray ? @ids : \@ids;
}
1110              
1111             =item $obj->submit([$serial]);
1112              
1113             B Submit the request to the grid for execution.
1114              
1115             B An optional parameter, which if true, will cause
1116             the commands to be executed serially. The default is for asynchronous
1117             execution.
1118              
1119             B The array of DRM ids in list context, or an array reference
1120             in scalar context.
1121              
1122             =cut
1123              
# Validate the request and send it to the DRM through DRMAA. When the
# simulate flag is on, nothing is sent and no ids are collected. A request
# can only be submitted once; a second attempt, or a failed validation,
# throws Grid::Request::Exception. Returns the DRM ids as a list, or as an
# array reference in scalar context.
sub submit {
    $logger->debug("In submit.");
    my ($self, $serially, @args) = @_;

    Grid::Request::Exception->throw("This request has already been submitted.")
        if $self->is_submitted();

    $serially = 0 unless defined $serially;

    unless ($self->_validate()) {
        my $msg = "Validation failed.";
        $logger->error($msg);
        Grid::Request::Exception->throw($msg);
    }

    $logger->info("Validation process succeeded.");
    my @ids;
    if ($self->simulate()) {
        $logger->debug("Simulation is turned on, so do not really submit.");
    } else {
        @ids = $self->_drmaa_submit($serially);
    }

    # Mark the request so it can never be submitted a second time.
    $logger->debug("Setting the submitted flag.");
    $self->is_submitted(1);

    $logger->debug("Returning from submit.");
    return wantarray ? @ids : \@ids;
}
1151              
1152              
# A private function for internal use only. It logs the failure and throws a
# Grid::Request::DRMAAException carrying the message, the DRMAA error text
# for $error and the diagnosis. Some callers pass only the message (e.g. in
# _submit_mw), so an undefined diagnosis is logged as "none" instead of
# triggering an uninitialized-value warning.
sub _throw_drmaa {
    my ($msg, $error, $diagnosis) = @_;
    $logger->error($msg);
    $logger->error("Diagnosis: " . (defined $diagnosis ? $diagnosis : "none"));
    Grid::Request::DRMAAException->throw(
        error     => $msg,
        drmaa     => drmaa_strerror($error),
        diagnosis => $diagnosis
    );
}
1165              
# This is a private function for internal use only. It initializes a DRMAA
# session (contact string "session=$SESSION_NAME") unless $DRMAA_INITIALIZED
# shows one was already set up, and throws via _throw_drmaa if initialization
# fails. In debug logging mode it also logs the DRMAA contact string and the
# DRMAA version. Failures to obtain those are only logged, never fatal.
sub _init_drmaa {
    $logger->debug("In _init_drmaa.");
    if ($DRMAA_INITIALIZED) {
        $logger->debug("DRMAA session aready initialized.");
    } else {
        my ($error, $diagnosis) = drmaa_init("session=$SESSION_NAME");
        _throw_drmaa("Could not initialize DRMAA", $error, $diagnosis) if $error;
        $DRMAA_INITIALIZED = 1;

        my $contact;
        ($error, $contact, $diagnosis) = drmaa_get_contact();

        # Log basic information about the DRMAA implementation if we are in
        # debug logging mode. The contact string used to be fetched and then
        # discarded.
        if ($logger->is_debug()) {
            if ($error) {
                $logger->debug("Unable to get the DRMAA contact: " . drmaa_strerror($error));
            } else {
                $logger->debug("DRMAA contact: " . (defined $contact ? $contact : '') . ".");
            }

            my ($major, $minor);
            ($error, $major, $minor, $diagnosis) = drmaa_version();
            if ($error) {
                $logger->warn("Unable to get the DRMAA Version: " . drmaa_strerror($error));
            } else {
                $logger->debug("DRMAA Version: ${major}.${minor}.");
            }
        }
    }
}
1193              
1194              
# Submit every command of the request through DRMAA. This is a private method
# for internal use only. Submission is asynchronous by default. When
# $serially is true, _sync_ids is called on each command before the next one
# is submitted. Returns the collected DRM ids as a list, or as an array
# reference in scalar context. Any failure is logged and re-thrown.
sub _drmaa_submit {
    $logger->debug("In _drmaa_submit.");
    my ($self, $serially) = @_;
    $serially = 0 unless defined $serially;
    $logger->debug($serially ? "Submissions will occur serially."
                             : "Submissions will occur asynchronously.");

    my @ids;
    my $total = $self->command_count();

    eval {
        for my $index (0 .. $total - 1) {
            my $cmd_obj = $self->_com_obj_list->[$index];
            $logger->debug("Submitting command " . ($index + 1) . "/$total.");
            my $jt = $self->_cmd_base_drmaa($cmd_obj);
            $logger->debug("Got a good job template.") if defined $jt;

            # Master/Worker commands use a different submission path from
            # plain (htc) commands.
            my @sub_ids = ($cmd_obj->cmd_type() eq "mw")
                ? $self->_submit_mw($jt, $cmd_obj)
                : $self->_submit_htc($jt, $cmd_obj);

            # NOTE(review): _sync_ids is called as a plain function, not a
            # method; presumably intentional -- confirm against its definition.
            _sync_ids($cmd_obj) if $serially;
            push @ids, @sub_ids;
        }
    };

    my $e;
    if ($e = Exception::Class->caught("Grid::Request::DRMAAException")) {
        $logger->fatal("Unable to submit: " . $e->diagnosis() . ": " . $e->drmaa() );
        $e->rethrow();
    } elsif ( $e = Exception::Class->caught("Grid::Request::Exception") ) {
        $logger->fatal("Unable to submit: " . $e->error());
        $e->rethrow();
    } elsif ( $e = Exception::Class->caught() ) {
        # Any reference (not only Exception::Class objects) may have been
        # thrown; re-raise it untouched instead of calling ->rethrow on
        # something that may not implement it.
        die $e if ref $e;
        $logger->logcroak("Unable to submit: $e");
    }

    $logger->debug("Finished submitting.");
    return wantarray ? @ids : \@ids;
}
1247              
# Private method for internal use only. It allocates a DRMAA job template for
# one command and configures the settings common to all jobs, whether they are
# Master-Worker (mw) or simple (htc) jobs: STDIN/STDOUT/STDERR paths,
# working directory, job name, environment, notification email, and the
# DRM-native specification built by the DRM plugin. Returns the job template.
# Any DRMAA failure is raised through _throw_drmaa.
# NOTE(review): the template is not released if a later setting fails; the
# caller never receives it in that case.
sub _cmd_base_drmaa {
    $logger->debug("In _cmd_base_drmaa.");
    my ($self, $cmd) = @_;

    my ($error, $jt, $diagnosis) = drmaa_allocate_job_template();
    _throw_drmaa("Could not allocate job template", $error, $diagnosis) if $error;

    # DRMAA path attributes use the "[hostname]:path" form; the leading ':'
    # leaves the hostname empty.
    my $input = $cmd->input();
    if (defined $input) {
        ($error, $diagnosis) = drmaa_set_attribute($jt, $DRMAA_INPUT_PATH, ':' . $input);
        _throw_drmaa("Could not set input path.", $error, $diagnosis) if $error;
    }

    # To prevent users from getting an accumulation of job_name.e* and job_name.o* files
    # in their working directories (most likely their home directories), we set the error
    # and output paths to /dev/null unless they were specified...
    my $output = $cmd->output();
    my $error_path = $cmd->error();
    if (defined($output) || defined($error_path)) {
        # Translate the user-facing $(Index) token into DRMAA's array-task
        # index placeholder.
        $output =~ s/\$\(Index\)/\$drmaa_incr_ph\$/g if defined($output);
        $error_path =~ s/\$\(Index\)/\$drmaa_incr_ph\$/g if defined($error_path);

        $output ||= '/dev/null';
        $logger->debug("STDOUT will go to $output.");
        ($error, $diagnosis) = drmaa_set_attribute($jt, $DRMAA_OUTPUT_PATH, ':' . $output);
        _throw_drmaa("Could not set output path.", $error, $diagnosis) if $error;

        $error_path ||= '/dev/null';
        $logger->debug("STDERR will go to $error_path.");
        ($error, $diagnosis) = drmaa_set_attribute($jt, $DRMAA_ERROR_PATH, ':' . $error_path);
        _throw_drmaa("Could not set error path.", $error, $diagnosis) if $error;
    } else {
        $logger->info("Neither output nor error were defined. Setting both to go /dev/null.");
        ($error, $diagnosis) = drmaa_set_attribute($jt, $DRMAA_OUTPUT_PATH, ':/dev/null');
        _throw_drmaa("Could not set output path to /dev/null", $error, $diagnosis) if $error;
        # Joining merges STDERR into the output path, so both go to /dev/null.
        ($error, $diagnosis) = drmaa_set_attribute($jt, $DRMAA_JOIN_FILES, 'y');
        _throw_drmaa("Could not tell DRM to join output and error files.", $error, $diagnosis) if $error;
    }

    my $initialdir = $cmd->initialdir();
    if (defined $initialdir) {
        ($error, $diagnosis) = drmaa_set_attribute($jt, $DRMAA_WD, $initialdir);
        _throw_drmaa("Could not set the job working directory.", $error, $diagnosis) if $error;
    }

    my $name = $cmd->name();
    if (defined $name) {
        ($error, $diagnosis) = drmaa_set_attribute($jt, $DRMAA_JOB_NAME, $name);
        _throw_drmaa("Could not set command name.", $error, $diagnosis) if $error;
    }

    # Replicate the environment, if the user asked for it
    if ($cmd->getenv()) {
        $logger->info("Setting environment attributes for the job.");
        my $env_ref = $self->_get_env_list();
        ($error, $diagnosis) = drmaa_set_vector_attribute($jt, $DRMAA_V_ENV, $env_ref);
        _throw_drmaa("Unable to set the job environment.", $error, $diagnosis) if $error;
    }

    # Set the notification email address, if configured
    my $email = $cmd->email();
    if ($email) {
        $logger->info("Setting DRM to not block emails.");
        ($error, $diagnosis) = drmaa_set_attribute($jt, $DRMAA_BLOCK_EMAIL, "0");
        _throw_drmaa("Unable to unblock emails.", $error, $diagnosis) if $error;
        $logger->info("Setting the job email.");
        ($error, $diagnosis) = drmaa_set_vector_attribute($jt, $DRMAA_V_EMAIL, [$email]);
        _throw_drmaa("Unable to set the job email.", $error, $diagnosis) if $error;
    }

    # Translate each user-provided, DRM-specific setting into a native
    # specification fragment by calling the matching DRM plugin method.
    my @drm_methods = qw(account hosts opsys evictable priority memory
                         project class runtime);
    my @native_attrs;
    foreach my $method (@drm_methods) {
        my $val = $cmd->$method();
        if (! defined $val) {
            $logger->debug(qq|Nothing defined for "$method".|);
            next;
        }
        my $attr = $self->_drm->$method($val);
        if (defined $attr) {
            $logger->debug(qq|DRM plugin mapped "$val" to "$attr".|);
            push (@native_attrs, $attr);
        } else {
            $logger->debug(qq|DRM plugin returned nothing for "$method".|);
        }
    }

    # Apply the pass_through, if configured
    my $pass_through = $cmd->pass_through();
    if ($pass_through) {
        $logger->info("Adding job pass-through: $pass_through");
        push (@native_attrs, $pass_through);
    }

    if (@native_attrs) {
        my $native = join(" ", @native_attrs);
        $logger->debug(qq|Setting native attribute "$native".|);
        ($error, $diagnosis) = drmaa_set_attribute($jt, $DRMAA_NATIVE_SPECIFICATION, $native);
        _throw_drmaa("Unable to set native specification.", $error, $diagnosis) if $error;
    }

    return $jt;
}
1352              
1353              
# Private method for internal use only. This method is used to submit
# mw (Master/Worker) jobs, which are jobs that iterate over files in a directory,
# lines in a file, or elements in an array, by calling grid_request_worker.
#
# Arguments: $jt  - the DRMAA job template to submit. It is deleted once the
#                   workers have been launched.
#            $cmd - the command object describing the master/worker job.
# Returns:   the DRM job ids of the launched workers; a list in list context,
#            an array reference in scalar context. The ids are also recorded
#            on $cmd via $cmd->ids().
# Throws:    Grid::Request::InvalidArgumentException if $jt or $cmd is missing;
#            Grid::Request::Exception if there is nothing to iterate over, the
#            block size is invalid, or no workers would be launched; DRMAA
#            failures are raised through _throw_drmaa().
sub _submit_mw {
    $logger->debug("In _submit_mw.");
    my ($self, $jt, $cmd) = @_;
    unless (defined $jt && defined $cmd) {
        Grid::Request::InvalidArgumentException->throw("Job template and/or command object are not defined.");
    }

    # The remote executable is always the worker; the user's command is handed
    # to it as the first argument (see the argv construction below).
    $logger->debug("Setting the command executable.");
    my ($error, $diagnosis) = drmaa_set_attribute($jt, $DRMAA_REMOTE_COMMAND, $WORKER);
    _throw_drmaa("Could not set command executable.", $error, $diagnosis) if $error;

    # Calculate how many workers we need. First, calculate the number of
    # iterations by examining the mw (i.e. non-"PARAM") arguments.
    # NOTE(review): a count of 0 is honoured only when it comes from the first
    # mw parameter; later zero counts are skipped by the "> 0" guard, so the
    # result depends on parameter order. Confirm the intended rule.
    my $min_count;
    foreach my $param ($cmd->params()) {
        next if ($param->type() eq "PARAM");
        my $count = $param->count();
        if (! defined $min_count) {
            $min_count = $count;
        } elsif (($count > 0) && ($count < $min_count)) {
            $min_count = $count;
            $logger->debug("New minimum iteration count of $min_count.");
        }
    }

    # Without any iterable parameter there is nothing to divide among workers;
    # fail clearly rather than computing with an undefined count.
    unless (defined $min_count) {
        Grid::Request::Exception->throw("MW job has no parameters to iterate over.");
    }

    # Approach for master/worker (mw) jobs:
    #
    # 1. For each parameter, create an argument that contains the argument type, and a
    #    list of the values to iterate over.
    # 2. Calculate the minimum number of iterations from the parameters. In other words,
    #    if there is a mismatch, then you have to take the lowest number of parameters
    #    so that all parameters have defined siblings.
    # 3. Based on the number of iterations N, launch a number of workers on the grid to
    #    process these where the number is some function of N, f(N).
    # 4. Pass the path of the exe to the worker program, so that it knows what to execute.
    #    The worker will know what portion of the work to do by the task id that the DRM
    #    gives it. In SGE, this is done with an environment variable: SGE_TASK_ID.
    # 5. Worker will replace $(Index) and $(Name) placeholders with the iteration number
    #    or the value itself in the output file, error file, input file, args, etc...
    #
    # General form of the worker command line built below:
    #   /path/to/worker /path/to/user/command <block_size> \
    #       param:<value> \
    #       dir:<value>:<key> \
    #       file:<value>:<key> \
    #       array:<value>:<key>
    #
    # Example: /path/to/worker /path/to/user/command 5 \
    #              dir:/path/to/user/directory:-d $(Name) \
    #              file:/path/to/user/file:-arg $(Name) \
    #              param:-plain_arg
    #
    # We use a helper utility and method to determine how to divide the work.
    # We don't just pass the min_count, because maybe different types of jobs
    # should be split up differently. This is why we pass $cmd, so that more
    # intelligent analysis may be done if configured...

    my $block_size = $cmd->block_size();
    if (ref($block_size) eq "CODE") {
        $logger->debug("Detected a code reference for block size.");
        my $block_size_calculator = $block_size;
        $logger->debug("Invoking the code to determine the block size.");
        $block_size = $block_size_calculator->($cmd, $min_count);

        if (defined($block_size) && length($block_size) && $block_size =~ /^-?\d+$/) {
            if ($block_size > 0) {
                $logger->debug("Invocation yielded a block size of $block_size.");
            } else {
                Grid::Request::Exception->throw(
                    "Block size code reference yielded an invalid result. Must be a positive integer.");
            }
        } else {
            Grid::Request::Exception->throw(
                "Block size code reference yielded an invalid result.");
        }
    } else {
        # A plain block size is used directly as the divisor below, so reject
        # anything that is not a positive integer up front instead of dying
        # with "Illegal division by zero" (or warning on undef).
        unless (defined($block_size) && $block_size =~ /^\d+$/ && $block_size > 0) {
            Grid::Request::Exception->throw(
                "Invalid block size. Must be a positive integer.");
        }
        $logger->debug("block_size is a regular scalar: $block_size");
    }

    # Compute the number of workers to invoke based on the block size.
    my $workers = ceil($min_count / $block_size);

    my $plurality = ($workers == 1) ? "worker" : "workers";
    $logger->info("This master/worker command requires $workers $plurality.");

    # Worker argv: the user's executable, the block size, then one argument per
    # parameter: "param:VALUE" for plain parameters and "TYPE:VALUE:KEY" for
    # the iterable (dir/array/file) ones.
    my $exe = $cmd->command();
    my @params = ($exe, $block_size);

    my $delim = ':';
    foreach my $param ($cmd->params()) {
        my $arg_type;
        my $type = $param->type();
        if ($type eq "PARAM") {
            $logger->debug("Found a regular (non-MW) parameter.");
            push(@params, "param" . $delim . $param->value());
            next;
        } elsif ($type eq "DIR") {
            $arg_type = "dir";
        } elsif ($type eq "ARRAY") {
            $arg_type = "array";
        } elsif ($type eq "FILE") {
            $arg_type = "file";
        }
        my $value = $param->value();
        my $key = $param->key();
        my $arg = join($delim, ($arg_type, $value, $key));
        $logger->debug("Formulated MW worker argument: $arg");
        push(@params, $arg);
    }
    # Set these parameters
    ($error, $diagnosis) = drmaa_set_vector_attribute($jt, $DRMAA_V_ARGV, \@params);
    _throw_drmaa("Could not set command arguments.", $error, $diagnosis) if $error;

    # Get the job running: one bulk job with a task per worker.
    my @ids;
    if (defined($workers) && ($workers > 0)) {
        my $job_ids;
        ($error, $job_ids, $diagnosis) = drmaa_run_bulk_jobs($jt, 1, $workers, 1);
        _throw_drmaa("Could not run bulk jobs.", $error, $diagnosis) if $error;
        for my $i (1 .. $workers) {
            # Only an error code and the id are unpacked here, so don't report
            # the (stale) diagnosis left over from drmaa_run_bulk_jobs above.
            my ($id_error, $job_id) = drmaa_get_next_job_id($job_ids);
            _throw_drmaa("Error getting next job id.", $id_error) if $id_error;
            $logger->debug("Adding job id $job_id to the jobs array.");
            push (@ids, $job_id);
        }
    } else {
        Grid::Request::Exception->throw("MW job resulted in no workers to launch.");
    }
    # Set the job ids for the command
    $cmd->ids(@ids);

    # Delete the job template
    ($error, $diagnosis) = drmaa_delete_job_template($jt);
    _throw_drmaa("Error deleting the job template.", $error, $diagnosis) if $error;

    $logger->debug("Number of ids to return: " . scalar(@ids));
    return wantarray ? @ids : \@ids;
}
1501              
# Private method for internal use only. This method is used to submit
# non-MW (non Master/Worker) jobs. In other words, jobs that do not iterate
# over anything by calling grid_request_worker.
#
# Arguments: $jt  - the DRMAA job template to submit. It is deleted once the
#                   job(s) have been launched.
#            $cmd - the command object describing the job.
# Returns:   the DRM job ids; a list in list context, an array reference in
#            scalar context. The ids are also recorded on $cmd via $cmd->ids().
#            When $cmd->times() is greater than 1 the command is submitted as a
#            bulk job with that many tasks, otherwise as a single job.
# Throws:    Grid::Request::InvalidArgumentException if $jt, $cmd or the
#            command executable is missing; DRMAA failures are raised through
#            _throw_drmaa().
sub _submit_htc {
    $logger->debug("In _submit_htc.");
    my ($self, $jt, $cmd) = @_;
    unless (defined $jt && defined $cmd) {
        Grid::Request::InvalidArgumentException->throw(
            "Job template and/or command object are not defined.");
    }

    my $exe = $cmd->command();
    unless (defined $exe) {
        Grid::Request::InvalidArgumentException->throw("Command executable is not defined.");
    }
    $logger->debug("Setting the command executable.");
    my ($error, $diagnosis) = drmaa_set_attribute($jt, $DRMAA_REMOTE_COMMAND, $exe);
    _throw_drmaa("Could not set command executable.", $error, $diagnosis) if $error;

    # Flatten the parameters into argv: each parameter contributes its key
    # (when it has one) followed by its value.
    my @params = $cmd->params();
    my @args = ();
    $logger->debug("Parameters obtained from the command object: " . scalar(@params));
    foreach my $param (@params) {
        my $value = $param->value();
        my $key = $param->key();
        if (defined $key) {
            $logger->debug("Got parameter key: $key");
            push (@args, $key);
        }
        $logger->debug("Got parameter value: $value");
        push (@args, $value);
    }

    if (scalar(@args)) {
        $logger->debug("Setting " . scalar(@args) . " arguments to the executable.");
        ($error, $diagnosis) = drmaa_set_vector_attribute($jt, $DRMAA_V_ARGV, \@args);
        _throw_drmaa("Could not set command arguments.", $error, $diagnosis) if $error;
    } else {
        $logger->debug("No arguments to set for the command.");
    }

    # Get the job running...
    my $times = $cmd->times();
    my @ids;
    if (defined($times) && ($times > 1)) {
        my $job_ids;
        ($error, $job_ids, $diagnosis) = drmaa_run_bulk_jobs($jt, 1, $times, 1);
        _throw_drmaa("Could not run bulk jobs.", $error, $diagnosis) if $error;
        for my $i (1 .. $times) {
            # Only an error code and the id are unpacked here, so don't report
            # the (stale) diagnosis left over from drmaa_run_bulk_jobs above.
            my ($id_error, $job_id) = drmaa_get_next_job_id($job_ids);
            _throw_drmaa("Problem getting next job id.", $id_error) if $id_error;
            $logger->debug("Adding job id $job_id to the jobs array.");
            push (@ids, $job_id);
        }
    } else {
        # If here, this is a singleton type job. Only 1 execution...
        my $job_id;
        ($error, $job_id, $diagnosis) = drmaa_run_job($jt);
        _throw_drmaa("Error running job.", $error, $diagnosis) if $error;
        $logger->debug("Adding job id $job_id to the jobs array.");
        # since the return is for an array of ids, make an array containing
        # a single job id.
        @ids = ($job_id);
    }
    # Set the job ids for the command
    $cmd->ids(@ids);

    # Delete the job template
    ($error, $diagnosis) = drmaa_delete_job_template($jt);
    _throw_drmaa("Error deleting the job template.", $error, $diagnosis) if $error;

    $logger->debug("Number of ids to return: " . scalar(@ids));
    return wantarray ? @ids : \@ids;
}
1576              
=item $obj->submit_and_wait();

B<Description:> Submit the request for execution on the grid and wait for the
request to finish executing before returning control (block). The request is
validated first; a Grid::Request::Exception is thrown if validation fails.
When simulation is turned on nothing is submitted and nothing is waited for.

B<Parameters:> None.

B<Returns:> @ids, the DRM job ids of the submitted request, in list context,
or a reference to that list in scalar context. The list is empty when
simulation is turned on.

=cut

sub submit_and_wait {
    $logger->debug("In submit_and_wait.");
    my ($self, @args) = @_;
    my $validate_result = $self->_validate();
    my @ids = ();
    if (defined($validate_result) && $validate_result == 1) {
        $logger->info("Validation process succeeded.");
        if ($self->simulate()) {
            $logger->info("Simulation is turned on, so do not really submit.");
        } else {
            @ids = $self->submit();
            # The submit() method handles setting the submitted flag.
            $self->wait_for_request();
        }
    } else {
        my $msg = "Validation failed.";
        $logger->fatal($msg);
        Grid::Request::Exception->throw($msg);
    }
    return wantarray ? @ids : \@ids;
}
1609              
1610              
=item $obj->times([times]);

B<Description:> Sometimes it may be desirable to execute a command more than
one time. For instance, a user may choose to run an executable many
times, with each invocation operating on a different input file. This technique
allows for very powerful parallelization of commands. The times method
establishes how many times the executable should be invoked.

B<Parameters:> An integer number may be passed in to set the times attribute on
the request object. If no argument is passed, the method functions as a getter
and returns the currently set "times" attribute, or undef if unset. The setting
cannot be changed after the request has been submitted.

B<Returns:> $times, when called with no arguments.
1625              
1626              
=item $obj->to_xml();

B<Description:> Returns the XML representation of the entire request.

B<Parameters:> None.

B<Returns:> $xml, a scalar XML string.

=cut

sub to_xml {
    my ($self, @args) = @_;
    $logger->debug("In to_xml.");

    # Both modules are loaded at run time, only when serialization is requested.
    require IO::Scalar;
    require XML::Writer;

    my $xml = "";
    my $handle = IO::Scalar->new(\$xml);
    my $writer = XML::Writer->new(
        OUTPUT      => $handle,
        DATA_MODE   => 1,
        DATA_INDENT => 4,
    );

    $writer->xmlDecl();
    $writer->comment("Generated by " . __PACKAGE__ . ": " . localtime());
    $writer->startTag('commandSetRoot');

    # Every Command object serializes itself; its XML string is printed
    # straight into the shared buffer, between the root start and end tags.
    my $total = $self->command_count();
    my $position = 0;
    for my $command ( @{ $self->_com_obj_list() } ) {
        $position++;
        $logger->debug("Encoding command object $position/$total.");
        $handle->print( $command->to_xml() );
    }

    $writer->endTag('commandSetRoot');
    $writer->end();

    $handle->close;
    return $xml;
}
1675              
=item $obj->command_count();

B<Description:> Returns the number of currently configured commands
in the overall request.

B<Parameters:> None.

B<Returns:> $count, a scalar.

=cut

sub command_count {
    $logger->debug("In command_count.");
    my ($self) = @_;
    # The command objects live in an array reference; its length is the count.
    my $commands = $self->_com_obj_list();
    return scalar @{ $commands };
}
1693              
=item $obj->wait_for_request();

B<Description:> Once a request has been submitted, a user may choose to wait
for the request to complete before proceeding. This is called "blocking". To
block and wait for a request, submit it ( by calling submit() ) and then call
wait_for_request(). Control will return once the request has been finished
(either completed or errored). If an attempt is made to call this method before
the request has been submitted, a warning is generated.

B<Parameters:> None.

B<Returns:> None.

=cut

sub wait_for_request {
    $logger->debug("In wait_for_request.");
    my ($self, @args) = @_;

    # Nothing to wait on until the request has actually been submitted.
    return $logger->logwarn("The request must be submitted before wait_for_request ",
                            "may be called.")
        unless $self->is_submitted();

    # Block on each command in turn, in the order the commands were added.
    my $total = $self->command_count();
    for my $index (0 .. $total - 1) {
        my $ordinal = $index + 1;
        $logger->info("Waiting for command $ordinal/$total.");
        _sync_ids( $self->_com_obj_list->[$index] );
        $logger->info("Command $ordinal/$total finished executing.");
    }
}
1728              
# Private helper: block on all DRM job ids recorded on the given command
# object by calling drmaa_synchronize(). The command's max_time (in seconds)
# is used as the DRMAA timeout; without one, it waits indefinitely. A DRMAA
# error is raised through _throw_drmaa().
sub _sync_ids {
    $logger->debug("In _sync_ids.");
    my ($cmd_obj) = @_;

    my $max_time = $cmd_obj->max_time(); # In seconds
    my $wait_time = $DRMAA_TIMEOUT_WAIT_FOREVER;
    $wait_time = $max_time if defined $max_time;

    $logger->debug( $wait_time == $DRMAA_TIMEOUT_WAIT_FOREVER
                    ? "Will wait indefinitely for it."
                    : "Will wait for $wait_time seconds." );

    # The final 0 is passed as drmaa_synchronize's dispose flag.
    my ($error, $diagnosis) = drmaa_synchronize([ $cmd_obj->ids() ], $wait_time, 0);
    _throw_drmaa("Error waiting for job execution.", $error, $diagnosis) if $error;
}
1743              
=item $obj->get_tasks();

B<Description:> Retrieve the tasks for this request. If the request has not
been submitted yet, a warning is generated.

NOTE: retrieving task information is not implemented yet, so this method
currently always returns undef.

B<Parameters:> None.

B<Returns:> Currently undef (see the note above). Once implemented, the
intended result is a hash of hashes (HoH) representing the tasks for this
request. The hash is organized by the index and the value is another hashref
with the actual data. The following is an example of the intended return data
structure:

    $hashref = {
        '1' => {
            'returnValue' => 0,
            'message'     => undef,
            'state'       => 'FINISHED'
        },
        '2' => {
            'returnValue' => -1,
            'message'     => 'Failed task.',
            'state'       => 'FAILED'
        }
    }

=cut

sub get_tasks {
    $logger->debug("In get_tasks.");
    my ($self, @args) = @_;
    my $tasks;
    if ($self->is_submitted) {
        # TODO: Implement via DRMAA vice HTC. Until then $tasks stays undef.
    } else {
        $logger->logwarn("The request must be submitted before get_tasks ",
                         "may be called.");
    }
    return $tasks;
}
1782              
# Return the current DRM status of a single job. May be invoked as a class
# method or on an object; the invocant is not otherwise used.
#
# Because it may be called without an object, it first initializes the logger
# with the default configuration when none exists yet, and calls
# _init_drmaa() before querying the DRM.
#
# Arguments: $job_id - the DRM job id to query (for example one of the ids
#                      returned by submit()).
# Returns:   one of "RUNNING", "WAITING", "FINISHED", "FAILED", "SUSPENDED",
#            "HELD" or "UNKNOWN".
# Throws:    Grid::Request::Exception if no job id is given; DRMAA failures are
#            raised through _throw_drmaa().
sub get_status {
    my ($class, $job_id) = @_;
    if (! defined $logger) {
        _init_config_logger($default_config);
    }
    _init_drmaa();

    if (! defined $job_id) {
        Grid::Request::Exception->throw("No job id specified.");
    }
    $logger->debug("Getting status for id $job_id.");

    my ($error, $remoteps, $diagnosis) = drmaa_job_ps($job_id);
    _throw_drmaa("Could not get status.", $error, $diagnosis) if $error;

    # Map the DRMAA program states onto this module's status names. Any state
    # not listed here (including $DRMAA_PS_UNDETERMINED) is reported as
    # "UNKNOWN".
    my %status_for = (
        $DRMAA_PS_RUNNING               => "RUNNING",
        $DRMAA_PS_QUEUED_ACTIVE         => "WAITING",
        $DRMAA_PS_DONE                  => "FINISHED",
        $DRMAA_PS_FAILED                => "FAILED",
        $DRMAA_PS_SYSTEM_SUSPENDED      => "SUSPENDED",
        $DRMAA_PS_USER_SUSPENDED        => "SUSPENDED",
        $DRMAA_PS_USER_SYSTEM_SUSPENDED => "SUSPENDED",
        $DRMAA_PS_SYSTEM_ON_HOLD        => "HELD",
        $DRMAA_PS_USER_ON_HOLD          => "HELD",
        $DRMAA_PS_USER_SYSTEM_ON_HOLD   => "HELD",
    );

    my $status = (defined($remoteps) && exists($status_for{$remoteps}))
               ? $status_for{$remoteps}
               : "UNKNOWN";
    return $status;
}
1822              
1823             1;
1824              
1825             __END__