File Coverage

lib/Schedule/SGELK.pm
Criterion Covered Total %
statement 138 285 48.4
branch 29 124 23.3
condition 9 33 27.2
subroutine 20 28 71.4
pod 15 19 78.9
total 211 489 43.1


line stmt bran cond sub pod time code
1             #! /usr/bin/env perl
2              
3             =pod
4              
5             =head1 NAME
6              
7             Schedule::SGELK
8              
9             =head1 SYNOPSIS
10              
11             A module for submitting jobs to an SGE queue.
12              
13             use Schedule::SGELK;
14             my $sge=Schedule::SGELK->new(verbose=>1,numnodes=>5,numcpus=>8,workingdir=>"SGE/",waitForEachJobToStart=>1);
15             $sge->set("jobname","thisisaname");
16             # run a series of jobs and wait for them all to finish
17             my $job=$sge->pleaseExecute("sleep 60");
18             my $job2=$sge->pleaseExecute("sleep 60");
19             $sge->wrapItUp();
20             # or you can specify which jobs to wait for
21             $sge->waitOnJobs([$job,$job2],1); # 1 means wait for all jobs to finish; 0 to wait for any free node
22             # or in one step
23             $sge->pleaseExecute_andWait("sleep 60");
24              
25             A quick test for this module is the following one-liner
26              
27             perl -MSchedule::SGELK -e '$sge=Schedule::SGELK->new(numnodes=>5); for(1..3){$sge->pleaseExecute("sleep 3");}$sge->wrapItUp();'
28              
29             Another quick test is to use the test() method, if you want to see standardized text output (see test() below)
30              
31             perl -MSchedule::SGELK -e '$sge=Schedule::SGELK->new(-numnodes=>2,-numcpus=>8); $sge->test(\%tmpSettings);'
32            
33              
34             =head1 DESCRIPTION
35              
36             A module for submitting jobs to an SGE queue. Monitoring is
37             performed using a combination of monitoring files
38             written to the hard drive and qstat.
39             Submitting is performed internally by making a perl script.
40              
41             =head1 AUTHOR
42              
43             Author: Lee Katz
44              
45             =cut
46              
47             package Schedule::SGELK;
48 1     1   274225 use strict;
  1         1  
  1         38  
49 1     1   5 use warnings;
  1         2  
  1         53  
50 1     1   5 use Data::Dumper;
  1         1  
  1         58  
51 1     1   4 use File::Basename qw/basename/;
  1         1  
  1         78  
52 1     1   4 use File::Spec;
  1         2  
  1         23  
53 1     1   532 use File::Slurp qw/read_file write_file/;
  1         25972  
  1         85  
54 1     1   9 use File::Temp qw/tempdir/;
  1         2  
  1         44  
55 1     1   489 use String::Escape qw/escape/;
  1         4873  
  1         82  
56 1     1   7 use version 0.77;
  1         15  
  1         7  
57              
58             our $VERSION = version->declare("v1.6.0");
59              
60             my $has_threads=eval{
61             return 0; # this isn't working yet
62             require threads;
63             return 1;
64             };
65              
66             # some global variables
67             my @jobsToClean=();
68             my @jobsToMonitor=();
69             my $numSlots=0; # number of slots that are being used now
70              
71 4     4 0 500 sub logmsg {local $0=basename $0;my $FH = *STDOUT; print $FH "$0: ".(caller(1))[3].": @_\n";}
  4         56  
  4         155  
72             local $SIG{'__DIE__'} = sub { my $e = $_[0]; $e =~ s/(at [^\s]+? line \d+\.$)/\nStopped $1/; die("$0: ".(caller(1))[3].": ".$e); };
73             local $SIG{INT} = sub{ cleanAllJobs(); };
74              
75             # to be called when the script exits
76             sub cleanAllJobs{
77 1 50   1 0 10 return if(!@jobsToClean);
78 1         7 logmsg "Cleaning all jobs";
79 1         4 for (@jobsToClean){
80 2         10 cleanAJob($_);
81             }
82             }
83             END{
84             cleanAllJobs();
85             }
86              
87             =pod
88              
89             =head2 METHODS
90              
91             =over
92              
93             =item sub new
94              
95             create a new instance of a scheduler.
96             Arguments and their defaults:
97              
98             numnodes=>50 maximum nodes to use
99             numcpus=>128 maximum cpus that will be used per node in a script
100             maxslots=>9999 maximum slots that you can use. Useful if you want to be limited by total slots instead of nodes or CPUs. E.g. {numnodes=>100,numcpus=>1,maxslots=>20}
101             verbose=>0
102             workingdir=>... a directory that all nodes can access to read/write jobs and log files. If it is not set, a new temporary directory is created under ./.SGELK
103             waitForEachJobToStart=>0 Allow each job to start as it's run (0), or to wait until the qstat sees the job before continuing (1)
104             jobname=>... This is the name given to the job when you view it with qstat. By default, it will be named after the script that calls this module.
105             warn_on_error=>1 This will make the script give a warning instead of exiting
106             qsubxopts=>... These are extra options to pass to qsub. E.g., {qsubxopts=>"-V"} Options are overwritten by appending them to the within-script options. Therefore this is not the best way to choose a different queue but it is a way to change a job name or the number of processors.
107             noqsub=>1 Force performing a system call instead of using qsub
108             queue=>all.q Choose the queue to use for a new job. Default: all.q
109              
110             Examples:
111             {numnodes=>100,numcpus=>1,maxslots=>50} # for many small jobs
112             {numnodes=>5,numcpus=>8,maxslots=>40} # for a few larger jobs (note: maxslots should be >= numnodes * numcpus)
113              
114             =back
115              
116             =cut
117              
118             sub new{
119 1     1 1 2206 my($class,%args)=@_;
120 1         2 my $self=bless{},$class;
121              
122             # just start with verbosity = 0. This avoids an error in the >= checks
123 1         5 $self->{'settings'}={};
124 1         2 $self->{'error'}="";
125 1         2 $self->{'exit_code'}=0;
126              
127             # load up if we know what we have
128 1         5 foreach my $key (keys %args) {
129 6         6 my $nodash=$key;
130 6         6 $nodash =~ s/^\-//;
131 6         13 $self->set($nodash,$args{$key});
132             }
133              
134             # set defaults if they are not set
135 1         5 my %default=(numnodes=>50,numcpus=>128,verbose=>0,waitForEachJobToStart=>0,maxslots=>9999,queue=>"all.q",scheduler=>"SGE");
136 1         3 while(my($key,$value)=each(%default)){
137 7 100       10 $self->settings($key,$value) if(!defined($self->settings($key)));
138             }
139 1 50       2 if(!$self->get("workingdir")){
140 0         0 $self->set("workingdir",$self->mktempdir());
141 0         0 logmsg "Working directory not set. Using ".$self->get("workingdir");
142             }
143              
144             # executables
145 1         1 for(qw(qsub qstat qdel)){
146             # See if it exists
147 3         20520 my $exec=`which $_ 2>/dev/null`;
148 3 50       49 $exec="" if $?;
149 3         10 chomp($exec);
150 3         51 $self->set($_,$exec);
151              
152 3 50       12 $self->set("scheduler","") if(!$exec);
153             }
154              
155             # See if SGE is present
156 1 50 33     27 if($ENV{SGE_ROOT} && -e $ENV{SGE_ROOT}){
157 0         0 $self->set("scheduler","SGE");
158             } else{
159 1         13 logmsg "Env variable \$SGE_ROOT is not set. I will not use SGE";
160 1         18 $self->set("scheduler","");
161 1         2 $self->set("qsub","");
162             }
163              
164             # Remove the scheduler option if the user explicitly
165             # chooses not to use it.
166 1 50       8 $self->set("scheduler","") if($self->get("noqsub"));
167              
168 1         33 return $self;
169             }
170              
171             =pod
172              
173             =over
174              
175             =item sub error($msg,$exit_code) or error()
176              
177             Get or set the error. Can set the error code too, if provided.
178              
179             =back
180              
181             =cut
182              
183             # Sets the error and returns the previous error message.
184             # Or, simply returns the current error message.
185             sub error{
186 0     0 1 0 my($self,$msg,$exit_code)=@_;
187 0 0       0 return $self->{error} if(!defined($msg));
188 0         0 my $oldmsg=$self->{error};
189 0         0 $self->{error}=$msg;
190 0 0       0 $self->{exit_code}=$exit_code if(defined($exit_code));
191 0         0 return $oldmsg;
192             }
193             =pod
194              
195             =over
196              
197             =item sub set() get() settings()
198              
199             Get or set a setting. All settings are listed under sub new().
200             If a setting is provided without a value, then nothing new will be set,
201             and only the value of the specified setting will be returned.
202              
203             =back
204              
205             =cut
206              
207             sub set{
208 37     37 1 86 my($self,$key,$value)=@_;
209 37 100 100     151 if(defined($key) && defined($value)){
    100          
210 18         54 $self->{'settings'}{$key}=$value;
211             } elsif (defined($key)){
212 17         128 return $self->{'settings'}{$key};
213             }
214 20 50       36 return %{$self->{'settings'}} if wantarray;
  0         0  
215 20         80 return $self->{'settings'};
216             }
217             # renaming of sub set()
218             sub get{
219 8     8 1 65 my($self,@args)=@_;
220 8         24 return $self->set(@args);
221             }
222             # renaming of sub set()
223             sub settings{
224 14     14 1 46 my($self,@args)=@_;
225 14         27 return $self->set(@args);
226             }
227              
228             =pod
229              
230             =over
231              
232             =item pleaseExecute()
233              
234             This is the main method. It will submit a command to the cluster.
235              
236             $sge->set("jobname","a_nu_start");
237             $sge->pleaseExecute("someCommand with parameters");
238              
239             If you are already occupying more than numnodes, then it will pause before
240             executing the command. It will also create many files under workingdir, so
241             be sure to specify it. If workingdir is not set, a temporary directory is created under ./.SGELK.
242              
243             You can also specify temporary settings for this one command with a referenced hash.
244              
245             $sge->pleaseExecute("someCommand with parameters",{jobname=>"a_nu_start",numcpus=>2});
246              
247             =back
248              
249             =cut
250              
251             sub pleaseExecute{
252 2     2 1 27 my($self,$cmd,$tmpSettings)=@_;
253 2         208 local $0=basename $0;
254 2         10 my %settings=%{ $self->settings };
  2         9  
255              
256             # read in any temporary settings for this command
257 2 50       12 $tmpSettings={} if(!defined($tmpSettings));
258 2         9 $settings{$_}=$$tmpSettings{$_} for(keys(%$tmpSettings));
259              
260             # default settings for undefined settings
261 2         6 my $jobid=-1; # -1 is an error state
262 2   33     36 $settings{jobname}||="sgelk$0";
263 2   33     33 $settings{logfile}||="$0.log";
264 2   50     7 $settings{numcpus}||=1;
265 2   50     14 $settings{timeout}||=60; # how long we will wait for qsub to start
266 2 50       28 return 0 if($cmd=~/^\s*$/); # if there's no command, then no worries
267              
268 2         18 $self->waitOnJobs(\@jobsToMonitor,0); # wait until the job can be submitted
269              
270 2         43 my $rand=int(rand(999999));
271 2         4 my $tempdir=$settings{workingdir};
272             # create a perl script with the literal command in it
273 2         6 my $script="$tempdir/qsub.$rand.pl";
274              
275 2 50       8 my $prefix=($0 eq '-e')?"STDIN":$0;
276 2         6 $prefix="$settings{workingdir}/$prefix.$rand";
277 2         10 my($submitted,$running,$finished,$died,$output)=("$prefix.submitted", "$prefix.running", "$prefix.finished","$prefix.died","$prefix.log");
278            
279 2         13952 my $perl=`which perl`; chomp($perl);
  2         25  
280 2 50       362 open(SCRIPT,">",$script) or die "Could not write to temporary script $script: $!";
281 2         21 print SCRIPT "#! $perl\n\n";
282             # It has SGE params in it.
283 2         16 print SCRIPT "#\$ -N $settings{jobname}\n";
284 2         12 print SCRIPT "#\$ -S $perl\n";
285 2         7 print SCRIPT "#\$ -V\n";
286 2         12 print SCRIPT "#\$ -wd $ENV{PWD}\n";
287 2         21 print SCRIPT "#\$ -pe smp $settings{numcpus}\n";
288 2         24 print SCRIPT "#\$ -o $output\n";
289 2         14 print SCRIPT "#\$ -e $output\n";
290 2         9 print SCRIPT "#\$ -q $settings{queue}\n";
291             # qsubxopts get to be in here first but will be overwritten by later qsubopts below
292 2 50       24 if(my $opts=$settings{qsubxopts}){
293 0         0 print SCRIPT "# options specified by qsubxopts are in the next line:\n";
294 0         0 print SCRIPT "#\$ $opts\n";
295             }
296 2         5 print SCRIPT "use strict;\nuse warnings;\n";
297 2         19 print SCRIPT "use File::Slurp qw/read_file write_file/;\n";
298              
299             # announces that it was submitted
300 2         68 my $sanitized=escape('qqbackslash',$cmd);
301 2         336 print SCRIPT "write_file('$submitted',$sanitized);\n";
302             # it runs the command
303 2         15 print SCRIPT "write_file('$running',$sanitized);\n";
304 2         11 print SCRIPT "system($sanitized);\n";
305             # let the script try one more time if it fails
306             #print SCRIPT "system($sanitized) if \$?;\n";
307             # print a parsable error if the script dies. This error will be in $output and in file $died
308 2         25 print SCRIPT <<END;
309             if(\$?){
310             my \$error=\"QSUB ERROR\\n\$?\\n\$!\";
311             write_file('$died',$sanitized);
312             write_file('$died',{append=>1},"\\n\$error\\n");
313             die \$error;
314             }
315             END
316             # announces when it is finished
317 2         15 print SCRIPT "write_file('$finished',$sanitized);\n";
318 2         184 close SCRIPT;
319 2 50       10835 system("touch $script"); die if $?; # make the system close the script. Why isn't Perl closing it?
  2         63  
320             #system("cat $script");sleep 60;die;
321              
322             # now run the script and get the jobid
323 2         100 my %return=(submitted=>$submitted,running=>$running,finished=>$finished,died=>$died,tempdir=>$tempdir,output=>$output,cmd=>$cmd,script=>$script,jobname=>$settings{jobname},numcpus=>$settings{numcpus});
324 2         33 my $qsub=$self->get("qsub");
325 2 50       45 if(!$settings{scheduler}){
326 2         25 my $job=command("$perl $script",$has_threads,\%settings);
327 2 50       25 $return{thread}=$job if($has_threads);
328 2 50       18 $return{jobid}=$job->tid if($has_threads);
329 2 50       65 push(@jobsToClean,\%return) if(!$self->settings("keep"));
330 2         5 push(@jobsToMonitor,\%return);
331 2         11 $numSlots+=$settings{numcpus}; # claim these cpus
332 2 50       17 return %return if wantarray;
333 2         200 return \%return;
334             }
335              
336             # At this point, qsub is on this computer. Submit the job.
337 0         0 my $out=`$qsub $script`; chomp($out);
  0         0  
338 0 0       0 if($out=~/Your job (\d+)/){
339 0         0 $jobid=$1;
340 0         0 $out.=" from $script";
341 0 0       0 logmsg $out if($settings{verbose});
342             } else {
343 0         0 logmsg "WARNING: the last job submitted did not have an obvious jobid. It can't be tracked!";
344             }
345              
346             # monitor for the script to be running before moving on
347 0         0 my $started=time;
348 0         0 while(!-e $submitted){
349 0 0       0 last if(!$self->settings("waitForEachJobToStart"));
350 0         0 sleep 1;
351 0 0       0 die "Command timed out!\n $cmd" if((time-$started)>$settings{timeout});
352 0 0       0 die "Command resulted in an error. qstat -j $jobid for more info\n $cmd" if($self->jobStatus($jobid) eq 'Eqw');
353             }
354              
355             # TODO create a link from the jobid to the random id
356            
357 0         0 $return{jobid}=$jobid;
358 0 0       0 push(@jobsToClean,\%return) if(!$self->settings("keep"));
359 0         0 push(@jobsToMonitor,\%return);
360 0         0 $numSlots+=$settings{numcpus}; # claim these cpus
361 0 0       0 return %return if wantarray;
362 0         0 return \%return;
363             }
364              
365             =pod
366              
367             =over
368              
369             =item pleaseExecute_andWait()
370              
371             Exact same as pleaseExecute(), except it will wait for the command to finish
372             before continuing. Internally calls pleaseExecute() and then waitOnJobs().
373             However one key difference between pleaseExecute() and this sub is that you can
374             give a list of commands.
375              
376             # this will take 100 seconds because all commands have to finish.
377             $sge->pleaseExecute_andWait(["sleep 60","sleep 100","sleep 3"]);
378              
379             =back
380              
381             =cut
382              
383             sub pleaseExecute_andWait{
384 0     0 1 0 my($self,$cmd)=@_;
385 0         0 my %settings=$self->settings;
386 0         0 my $mustfinish=$self->settings("mustfinish"); # should be restored later
387 0         0 $self->set("mustfinish",0);
388 0 0       0 $cmd=[$cmd] if(ref($cmd) eq ""); # let cmd be a string but turn it into a list internally
389 0         0 my(@jobid);
390 0         0 for(@$cmd){
391 0         0 my $jobid=$self->pleaseExecute($_);
392 0         0 push(@jobid,$jobid);
393 0         0 $self->waitOnJobs(\@jobid);
394             }
395 0         0 $self->waitOnJobs(\@jobid,1);
396             }
397              
398             =pod
399              
400             =over
401              
402             =item checkJob($jobHash)
403              
404             Checks the status of a given job. The job variable is obtained from pleaseExecute().
405             $self->error can be set if there is an error in the job. Return values:
406             1 for finished; 0 for still running or hasn't started; -1 for error.
407              
408             =back
409              
410             =cut
411              
412             sub checkJob{
413 0     0 1 0 my($self,$job)=@_;
414             # See what the job status is {jobid} for fast checking
415 0         0 my $status=$self->jobStatus($$job{jobid});
416 0 0       0 if($status eq 'qw'){ # queued but not running
    0          
    0          
417 0         0 return 0;
418             } elsif($status eq 'Eqw'){ # error
419 0         0 $self->error("Command resulted in an error. qstat -j $$job{jobid} for more info\n $$job{cmd}");
420 0         0 return -1;
421             } elsif($status=~/[rt]/){ # running or is delayed
422 0         0 return 0;
423             }
424              
425             # look at files to check on the job status, for slower checking.
426             # see if the job has even started: {submitted}
427 0 0       0 return 0 if(!-e $$job{submitted});
428             # if the job finished, then great! {finished}
429 0 0       0 return 1 if(-e $$job{finished});
430 0 0       0 return 1 if(!keys(%$job)); # sometimes a job is blank... why?
431             # if the job died
432 0 0       0 if(-e $$job{died}){
433 0         0 my @content=read_file($$job{output});
434 0         0 chomp(@content);
435 0         0 $self->error(join("\n",@content[-3..-1]));
436 0         0 return -1;
437             }
438             # It's running if the die-file isn't there and if the running file is there
439 0 0       0 return 0 if(-e $$job{running});
440 0         0 logmsg "ERROR: Could not understand what the status is of job $$job{jobid}!\n".Dumper($job);
441 0         0 return -1;
442             }
443              
444             =pod
445              
446             =over
447              
448             =item jobStatus(jobid)
449              
450             Given an SGE job id, it returns its qstat status
451              
452             =back
453              
454             =cut
455              
456             sub jobStatus{
457 0     0 1 0 my($self,$jobid)=@_;
458 0         0 my $state=0;
459 0   0     0 $jobid||=0;
460 0         0 my $qstat=$self->qstat;
461 0         0 for(split(/\n/,$qstat)){
462 0         0 my @F=split /\s+/;
463 0 0       0 if($F[0] eq $jobid){
464 0         0 $state=$F[4];
465             }
466             }
467 0         0 close QSTAT;
468 0         0 return $state;
469             }
470              
471             =pod
472              
473             =over
474              
475             =item qstat
476              
477             Runs qstat and caches the result for one second. Or, returns the cached result of qstat
478              
479             =back
480              
481             =cut
482              
483             sub qstat{
484 0     0 1 0 my($self)=@_;
485             # return the cached value if it was just accessed a second ago
486             #return $self->get("qstat") if(defined($self->get("qstat")) && $self->get("qstat_timestamp") <= time - 1);
487              
488 0         0 my $content="";
489 0 0       0 open(QSTAT,"qstat|") or die "ERROR: could not execute qstat! $!";
490 0         0 while(my $line=<QSTAT>){
491 0         0 $line=~s/^\s+|\s+$//g;
492 0         0 $content.="$line\n";
493             }
494 0         0 close QSTAT;
495 0         0 $self->set("qstat",$content);
496 0         0 $self->set("qstat_timestamp",time);
497 0         0 return $self->get("qstat");
498             }
499              
500             =pod
501              
502             =over
503              
504             =item wrapItUp()
505              
506             Waits for all jobs to finish before letting the program continue.
507             Calls waitOnJobs or joinAllThreads internally. Does not take any parameters.
508              
509             =back
510              
511             =cut
512              
513             # Wait on all jobs to finish and clear out the queue.
514             sub wrapItUp{
515 1     1 1 26 my($self)=@_;
516 1 50       15 if($self->get("scheduler")){
    50          
517 0         0 $self->waitOnJobs(\@jobsToMonitor,1);
518             } elsif($has_threads){
519 0         0 $self->joinAllThreads(\@jobsToMonitor,1);
520             }
521 1         2 return 1;
522             }
523              
524             =pod
525              
526             =over
527              
528             =item joinAllThreads($jobList)
529              
530             Joins all threads. This is if you have ithreads and if the scheduler is not set.
531             For example, if you specify noqsub or if qsub executable is not found.
532              
533             =back
534              
535             =cut
536              
537             sub joinAllThreads{
538 0     0 1 0 my($self,$job)=@_;
539              
540             JOINALLTHREADS:
541 0         0 for my $j(@$job){
542 0 0       0 next if(!$$j{thread});
543 0 0 0     0 next if($$j{thread} && !$$j{thread}->is_joinable);
544 0         0 logmsg "Joining TID".$$j{jobid};
545 0         0 $$j{thread}->join;
546 0         0 $$j{thread}=0;
547             }
548              
549             # clean out the joined jobs
550 0         0 my @newjob;
551 0         0 for my $j(@$job){
552 0 0       0 push(@newjob,$j) if($$j{thread});
553             }
554 0         0 $job=\@newjob;
555              
556             # if there is still something in @$job, then go for another round
557 0 0       0 if(@$job){
558 0         0 logmsg "Waiting for ".scalar(@$job)." more jobs to finish...";
559 0         0 sleep 1;
560 0         0 goto JOINALLTHREADS;
561             }
562              
563             }
564              
565              
566             =pod
567              
568             =over
569              
570             =item waitOnJobs($jobList,[$mustFinish])
571              
572             Waits on all given jobs to finish. The job list is an array reference of jobs as returned by pleaseExecute().
573             If $mustFinish evaluates to true, then the program will pause until
574             all jobs are finished.
575             Calls on checkJob() internally. Will die with an error message if a job dies.
576              
577             =back
578              
579             =cut
580              
581             # Wait on enough jobs to finish before returning.
582             # If a job finishes, splice it from the job array.
583             sub waitOnJobs{
584 3     3 1 20 my($self,$job,$mustfinish)=@_;
585            
586             # if there is no qsub, then every job is only going one at a time
587 3         15 my $qsub=$self->get("qsub");
588 3 50       12 return @$job if(!$qsub);
589              
590 0         0 my %settings=$self->settings;
591 0 0       0 $settings{mustfinish}=$mustfinish if(defined($mustfinish));
592 0 0       0 if($settings{verbose}){
593 0 0       0 logmsg "We have reached node capacity ($settings{numnodes})! Waiting for a job to finish." if(@$job >= $settings{numnodes});
594 0 0       0 logmsg "We have reached slot capacity ($settings{maxslots})! Waiting for a job to finish." if($numSlots >= $settings{maxslots});
595             }
596 0         0 while(@$job > 0){
597 0         0 for(my $i=0;$i<@$job;$i++){
598 0   0     0 $$job[$i]{jobid}||=0;
599 0         0 my $state=$self->checkJob($$job[$i]);
600 0 0       0 if($state==1){
    0          
601 0 0       0 logmsg "A job finished: $$job[$i]{jobname} ($$job[$i]{jobid})" if($settings{verbose});
602 0         0 $numSlots = $numSlots - $$job[$i]{numcpus}; # not using these slots anymore
603 0         0 splice(@$job,$i,1);
604 0         0 last;
605             } elsif($state==-1){
606 0         0 my $msg="A job failed ($$job[$i]{jobname} [$$job[$i]{jobid}]! Look at $$job[$i]{output} for more details.\nError message was ".$self->error()."\n".Dumper($$job[$i]);
607 0 0       0 die $msg if(!$settings{warn_on_error});
608             # just print the warning if the script didn't die and forget about this dead job
609 0         0 logmsg $msg;
610 0         0 $numSlots = $numSlots - $$job[$i]{numcpus}; # not using these slots anymore
611 0         0 $self->error($msg);
612 0         0 splice(@$job,$i,1);
613 0         0 last;
614             }
615             }
616 0         0 sleep 1;
617             # break out if you don't have to finish yet but you can still add in another job
618 0 0 0     0 last if(!$settings{mustfinish} && @$job<$settings{numnodes} && $numSlots<$settings{maxslots});
      0        
619             }
620 0         0 return @$job;
621             }
622              
623             =pod
624              
625             =over
626              
627             =item cleanAJob
628              
629             This is internally used for cleaning up files after a job is done.
630             Do not use externally.
631              
632             =back
633              
634             =cut
635              
636             sub cleanAJob{
637 2     2 1 6 my($job)=@_;
638 2   50     44 my $jobid=$$job{jobid} || return 0;
639 0         0 logmsg $jobid;
640 0         0 for (qw(running submitted finished output script died)){
641 0         0 unlink $$job{$_};
642             }
643              
644 0         0 system("qdel $$job{jobid} 2>/dev/null | grep -v 'does not exist'");
645             #die "Internal error" if $?;
646 0         0 return 1;
647             }
648              
649             sub mktempdir{
650 0     0 0 0 my ($self,$settings) = @_;
651 0   0     0 $settings||={};
652             # SGELK.22623.XXXXX
653             #my $tempdir_path = File::Spec->join(File::Spec->tmpdir(), (split("::",(caller(1))[3]))[1].".$$.XXXXX");
654 0 0       0 mkdir "./.SGELK" if(!-d "./.SGELK");
655 0 0       0 die if $?;
656 0         0 my $tempdir_path = File::Spec->join("./.SGELK",(split("::",(caller(1))[3]))[1].".$$.XXXXX");
657 0         0 my $tempdir = tempdir($tempdir_path, CLEANUP => !($$settings{keep}));
658 0         0 return $tempdir;
659             }
660              
661             sub command{
662 2     2 0 8 my($cmd,$use_threads,$settings)=@_;
663 2         4 my $job=0;
664 2 50       9 if($use_threads){
665 0         0 $job=threads->new(\&command,$cmd,0,$settings);
666             } else {
667 2         13 logmsg "Running $cmd";
668 2         1245441 system($cmd);
669 2 50       88 die "ERROR with command: $!\n $cmd" if $?;
670             }
671              
672 2         39 return $job;
673             }
674              
675             =pod
676              
677             =over
678              
679             =item test
680              
681             Use this method to perform a test. The test submits one job
682             per node (numnodes jobs in total), each printing debugging information.
683              
684             You can give an optional hash argument to send other settings as described in new().
685              
686             perl -MSchedule::SGELK -e '$sge=Schedule::SGELK->new(-numnodes=>2,-numcpus=>8); $sge->test(\%tmpSettings);'
687              
688             =back
689              
690             =cut
691              
692             sub test{
693 0     0 1   my($self,$tmpSettings)=@_;
694              
695             # get settings
696 0           my %settings=%{ $self->settings };
  0            
697             # read in any temporary settings for this command
698 0 0         $tmpSettings={} if(!defined($tmpSettings));
699 0           $$tmpSettings{verbose}=1; # make sure it's verbose for debugging
700 0           $settings{$_}=$$tmpSettings{$_} for(keys(%$tmpSettings));
701              
702             # execute the jobs
703 0           for(1..$self->get("numnodes")){
704 0           logmsg "Job $_ is being submitted";
705 0           my $text="Job count\t$_\n";
706 0           $text.="$_\t$settings{$_}\n" for(keys(%settings));
707 0           $self->pleaseExecute("echo '$text'|column -t",$tmpSettings);
708             }
709 0           $self->wrapItUp();
710 0           return 1;
711             }
712              
713             1;