File Coverage

blib/lib/Event/ExecFlow/Job.pm
Criterion Covered Total %
statement 117 283 41.3
branch 22 82 26.8
condition 11 49 22.4
subroutine 36 85 42.3
pod 0 82 0.0
total 186 581 32.0


line stmt bran cond sub pod time code
1             package Event::ExecFlow::Job;
2              
3 2     2   11 use strict;
  2         2  
  2         53  
4 2     2   8 use Carp;
  2         4  
  2         96  
5              
6 2     2   1667 use Locale::TextDomain $Event::ExecFlow::locale_textdomain;
  2         71311  
  2         19  
7              
8 0     0 0 0 sub get_id { shift->{id} }
9 0     0 0 0 sub get_title { shift->{title} }
10 8     8 0 63 sub get_name { shift->{name} }
11 27     27 0 115 sub get_depends_on { shift->{depends_on} }
12 485     485 0 4774 sub get_state { shift->{state} }
13 74     74 0 540 sub get_cancelled { shift->{cancelled} }
14 122     122 0 773 sub get_error_message { shift->{error_message} }
15 28     28 0 1195 sub get_warning_message { shift->{warning_message} }
16 36     36 0 152 sub get_progress_max { shift->{progress_max} }
17 70     70 0 629 sub get_progress_cnt { shift->{progress_cnt} }
18 0     0 0 0 sub get_progress_start_time { shift->{progress_start_time} }
19 0     0 0 0 sub get_progress_end_time { shift->{progress_end_time} }
20 0     0 0 0 sub get_progress_ips { shift->{progress_ips} }
21 0     0 0 0 sub get_no_progress { shift->{no_progress} }
22 36     36 0 227 sub get_last_progress { shift->{last_progress} }
23 0     0 0 0 sub get_last_percent_logged { shift->{last_percent_logged} }
24 14     14 0 99 sub get_pre_callbacks { shift->{pre_callbacks} }
25 28     28 0 453 sub get_post_callbacks { shift->{post_callbacks} }
26 0     0 0 0 sub get_error_callbacks { shift->{error_callbacks} }
27 0     0 0 0 sub get_warning_callbacks { shift->{warning_callbacks} }
28 126     126 0 794 sub get_frontend { shift->{frontend} }
29 60     60 0 294 sub get_group { shift->{group} }
30 0     0 0 0 sub get_diskspace_consumed { shift->{diskspace_consumed} }
31 0     0 0 0 sub get_diskspace_freed { shift->{diskspace_freed} }
32 0     0 0 0 sub get_stash { shift->{stash} }
33 0     0 0 0 sub get_paused { shift->{paused} }
34 0     0 0 0 sub get_paused_seconds { shift->{paused_seconds} }
35 0     0 0 0 sub get_paused_start_time { shift->{paused_start_time} }
36 14     14 0 38 sub get_skipped { shift->{skipped} }
37              
38 0     0 0 0 sub set_title { shift->{title} = $_[1] }
39 0     0 0 0 sub set_name { shift->{name} = $_[1] }
40 56     56 0 396 sub set_state { shift->{state} = $_[1] }
41 14     14 0 35 sub set_error_message { shift->{error_message} = $_[1] }
42 0     0 0 0 sub set_warning_message { shift->{warning_message} = $_[1] }
43 3     3 0 9 sub set_progress_max { shift->{progress_max} = $_[1] }
44 55     55 0 231 sub set_progress_cnt { shift->{progress_cnt} = $_[1] }
45 14     14 0 87 sub set_progress_start_time { shift->{progress_start_time} = $_[1] }
46 28     28 0 106 sub set_progress_end_time { shift->{progress_end_time} = $_[1] }
47 0     0 0 0 sub set_progress_ips { shift->{progress_ips} = $_[1] }
48 0     0 0 0 sub set_no_progress { shift->{no_progress} = $_[1] }
49 40     40 0 173 sub set_last_progress { shift->{last_progress} = $_[1] }
50 14     14 0 36 sub set_last_percent_logged { shift->{last_percent_logged} = $_[1] }
51 0     0 0 0 sub set_pre_callbacks { shift->{pre_callbacks} = $_[1] }
52 0     0 0 0 sub set_post_callbacks { shift->{post_callbacks} = $_[1] }
53 0     0 0 0 sub set_error_callbacks { shift->{error_callbacks} = $_[1] }
54 0     0 0 0 sub set_warning_callbacks { shift->{warning_callbacks} = $_[1] }
55 14     14 0 56 sub set_frontend { shift->{frontend} = $_[1] }
56 13     13 0 41 sub set_group { shift->{group} = $_[1] }
57 0     0 0 0 sub set_diskspace_consumed { shift->{diskspace_consumed} = $_[1] }
58 0     0 0 0 sub set_diskspace_freed { shift->{diskspace_freed} = $_[1] }
59 0     0 0 0 sub set_stash { shift->{stash} = $_[1] }
60 0     0 0 0 sub set_paused { shift->{paused} = $_[1] }
61 0     0 0 0 sub set_paused_seconds { shift->{paused_seconds} = $_[1] }
62 0     0 0 0 sub set_paused_start_time { shift->{paused_start_time} = $_[1] }
63 0     0 0 0 sub set_skipped { shift->{skipped} = $_[1] }
64              
65             sub set_depends_on {
66 14     14 0 17 my $self = shift;
67 14         18 my ($jobs_lref) = @_;
68            
69 14 50       16 my @job_names = map { ref $_ ? $_->get_name : $_ } @{$jobs_lref};
  1         6  
  14         22  
70 14         38 $self->{depends_on} = \@job_names;
71            
72 14         25 return \@job_names;
73             }
74              
75             sub set_cancelled {
76 14     14 0 20 my $self = shift;
77 14         25 my ($cancelled) = @_;
78 14         31 $self->{cancelled} = $cancelled;
79 14 50       62 $self->set_state($cancelled ? "cancelled":"waiting");
80 14         21 return $cancelled;
81             }
82              
83             sub finished_ok {
84 0     0 0 0 my $self = shift;
85 0   0     0 return !$self->get_cancelled &&
86             !$self->get_error_message;
87             }
88              
89             my $JOB_ID = (time - 1140691085) * 1_000_000;
90              
91             sub new {
92 14     14 0 23 my $class = shift;
93 14         61 my %par = @_;
94 14         28 my ($title, $name, $depends_on, $pre_callbacks) =
95             @par{'title','name','depends_on','pre_callbacks'};
96 14         25 my ($post_callbacks, $error_callbacks, $warning_callbacks) =
97             @par{'post_callbacks','error_callbacks','warning_callbacks'};
98 14         22 my ($progress_cnt, $progress_max, $progress_ips, $no_progress) =
99             @par{'progress_cnt','progress_max','progress_ips','no_progress'};
100 14         24 my ($diskspace_consumed, $diskspace_freed, $stash, $frontend) =
101             @par{'diskspace_consumed','diskspace_freed','stash','frontend'};
102              
103 14         19 my $id = ++$JOB_ID;
104              
105 14   100     51 $depends_on ||= [];
106 14   50     61 $stash ||= {};
107 14   33     26 $name ||= '~'.$id;
108              
109 1         4 croak "Job '$name' depends on itself"
110 14 50       16 if grep { $_ eq $name } @{$depends_on};
  14         34  
111              
112 14         25 for my $cb ( $pre_callbacks, $post_callbacks,
113             $error_callbacks, $warning_callbacks ) {
114 56   66     192 $cb ||= Event::ExecFlow::Callbacks->new;
115 56 100       198 $cb = Event::ExecFlow::Callbacks->new($cb) if ref $cb eq 'CODE';
116             }
117              
118 14         197 my $self = bless {
119             id => $id,
120             title => $title,
121             name => $name,
122             depends_on => $depends_on,
123             state => 'waiting',
124             diskspace_consumed => $diskspace_consumed,
125             diskspace_freed => $diskspace_freed,
126             progress_cnt => $progress_cnt,
127             progress_max => $progress_max,
128             progress_ips => $progress_ips,
129             no_progress => $no_progress,
130             pre_callbacks => $pre_callbacks,
131             post_callbacks => $post_callbacks,
132             error_callbacks => $error_callbacks,
133             warning_callbacks => $warning_callbacks,
134             stash => $stash,
135             frontend => $frontend,
136             paused_seconds => 0,
137             last_percent_logged => 0,
138             group => undef,
139             }, $class;
140            
141 14         43 $self->set_depends_on($depends_on);
142            
143 14         62 return $self;
144             }
145              
146             sub init {
147 14     14 0 24 my $self = shift;
148            
149 14 50 33     63 return if $self->get_state ne 'waiting' &&
150             $self->get_state ne 'running';
151            
152 14         91 $self->set_state("waiting");
153 14         74 $self->set_progress_start_time(time);
154 14         66 $self->set_progress_end_time();
155 14         92 $self->set_cancelled();
156 14         74 $self->set_error_message();
157 14         272 $self->set_last_percent_logged(0);
158 14         50 $self->set_last_progress();
159 14         55 $self->set_progress_cnt(0);
160              
161 14         42 1;
162             }
163              
164             sub start {
165 14     14 0 32 my $self = shift;
166            
167 14 50       45 $Event::ExecFlow::DEBUG && print "Job->start(".$self->get_info.")\n";
168              
169 14 50       56 if ( !$self->get_frontend ) {
170 0         0 require Event::ExecFlow::Frontend;
171 0         0 $self->set_frontend(Event::ExecFlow::Frontend->new);
172             }
173            
174 14         195 $self->init;
175 14         34 $self->set_state("running");
176              
177 14         31 $self->get_frontend->report_job_start($self);
178            
179 14         77 $self->get_pre_callbacks->execute($self);
180            
181 14 50       72 if ( $self->get_error_message ) {
182 0         0 $self->execution_finished;
183 0         0 return 0;
184             }
185            
186 14 50       62 if ( $self->get_warning_message ) {
187 0         0 $self->get_warning_callbacks->execute($self);
188 0         0 $self->get_frontend->report_job_warning($self);
189             }
190              
191 14 50       85 if ( $self->get_skipped ) { # may be set by pre_callbacks
192 0         0 $self->execution_finished;
193 0         0 return 0;
194             }
195              
196 14         60 $self->execute;
197            
198 14         131 1;
199             }
200              
201             sub reset {
202 0     0 0 0 my $self = shift;
203            
204 0 0 0     0 return if $self->get_state eq 'running' or
205             $self->get_state eq 'waiting';
206            
207 0         0 $self->set_state("waiting");
208 0         0 $self->set_progress_start_time();
209 0         0 $self->set_progress_end_time();
210 0         0 $self->set_cancelled();
211 0         0 $self->set_error_message();
212 0         0 $self->set_last_percent_logged(0);
213 0         0 $self->set_last_progress();
214 0         0 $self->set_progress_cnt(0);
215            
216 0         0 $self->get_frontend->report_job_progress($self);
217              
218 0         0 1;
219             }
220              
221             sub cancel {
222 0     0 0 0 die "Missing implementation for method cancel() of object ".shift;
223             }
224              
225             sub execute {
226 0     0 0 0 die "Missing implementation for method execute() of object ".shift;
227             }
228              
229             sub pause {
230 0     0 0 0 my $self = shift;
231            
232 0         0 $self->set_paused(!$self->get_paused);
233 0         0 $self->pause_job;
234              
235 0 0       0 if ( $self->get_paused ) {
236 0         0 $self->set_paused_start_time(time);
237             }
238             else {
239 0         0 my $start_time = $self->get_paused_start_time;
240 0         0 my $duration = time - $start_time;
241 0         0 $self->set_paused_seconds($duration + $self->get_paused_seconds);
242 0         0 $self->set_paused_start_time();
243             }
244              
245 0         0 1;
246             }
247              
248             sub execution_finished {
249 14     14 0 36 my $self = shift;
250              
251 14 50       45 $Event::ExecFlow::DEBUG && print "Job->execution_finished(".$self->get_info.")\n";
252              
253 14         76 $self->set_progress_end_time(time);
254 14         46 $self->get_frontend->report_job_progress($self);
255              
256 14 50       65 if ( !$self->get_cancelled ) {
257 14 50       39 if ( $self->get_error_message ) {
258 0         0 $self->set_state("error");
259             }
260             else {
261 14         48 $self->set_state("finished");
262             }
263             }
264              
265 14         168 $self->get_post_callbacks->execute($self);
266              
267 14 50       64 $self->set_state("error") if $self->get_error_message;
268              
269 14         87 $self->get_frontend->report_job_finished($self);
270              
271 14 50       42 if ( !$self->get_cancelled ) {
272 14 50       37 if ( $self->get_error_message ) {
273 0         0 $self->get_error_callbacks->execute($self);
274 0         0 $self->get_frontend->report_job_error($self);
275             }
276              
277 14 50       53 if ( $self->get_warning_message ) {
278 0         0 $self->get_warning_callbacks->execute($self);
279 0         0 $self->get_frontend->report_job_warning($self);
280             }
281             }
282              
283 14 100 66     71 if ( $self->get_type ne 'group' and $self->get_state eq 'finished' ) {
284 11         25 my $parent = $self;
285 11         412 while ( $parent = $parent->get_group ) {
286 21         75 $parent->set_progress_cnt($parent->get_progress_cnt+1);
287 21         68 $self->get_frontend->report_job_progress($parent);
288             }
289             }
290              
291 14         37 1;
292             }
293              
294             sub emit_warning_message {
295 0     0 0 0 my $self = shift;
296 0         0 my ($warning) = @_;
297            
298 0         0 $self->get_frontend->report_job_warning($self, $warning);
299            
300 0         0 1;
301             }
302              
303 21     21 0 49 sub get_job_cnt { 1 }
304              
305             sub get_info {
306 0     0 0 0 my $self = shift;
307 0   0     0 return $self->get_title || $self->get_name || "Unnamed";
308             }
309              
310             sub get_progress_fraction {
311 0     0 0 0 my $self = shift;
312 0   0     0 my $max = $self->get_progress_max || 0;
313 0   0     0 my $cnt = $self->get_progress_cnt || 0;
314 0 0       0 return $max == 0 ? 0 : $cnt / $max;
315             }
316              
317             sub get_progress_percent {
318 0     0 0 0 my $self = shift;
319 0         0 return sprintf("%.2f", 100 * $self->get_progress_fraction);
320             }
321              
322             sub get_progress_text {
323 0     0 0 0 my $self = shift;
324 0         0 return $self->get_info.": ".$self->get_progress_stats;
325             }
326              
327             sub get_progress_stats {
328 0     0 0 0 my $self = shift;
329              
330 0 0       0 my $cancelled = $self->get_cancelled ? "[".__("Cancelled")."]" : "";
331 0 0 0     0 $cancelled ||= $self->get_error_message ? "[".__("Error")."]" : "";
332 0 0 0     0 $cancelled ||= $self->get_skipped ? "[".__("Skipped")."]" : "";
333              
334 0 0       0 return __("Waiting")." ".$cancelled if $self->get_state eq 'waiting';
335              
336 0         0 my $cnt = $self->get_progress_cnt;
337 0   0     0 my $max = $self->get_progress_max || 1;
338 0         0 my $time = ( time - $self->get_progress_start_time
339             - $self->get_paused_seconds );
340 0         0 my $ips_label = $self->get_progress_ips;
341 0         0 my $ips = "";
342              
343 0 0       0 if ( $self->get_progress_end_time ) {
344 0         0 $time = $self->get_progress_end_time
345             - $self->get_progress_start_time
346             - $self->get_paused_seconds;
347 0         0 my $text = __x( "Duration: {time}", time => $self->format_time($time) );
348 0 0       0 if ( $ips_label ) {
349 0   0     0 $time ||= 1;
350 0         0 $text .= ", $ips_label: ".sprintf( "%2.1f", $cnt / $time );
351             }
352 0         0 return $text." ".$cancelled;
353             }
354              
355 0 0       0 return $cancelled if $self->get_no_progress;
356 0 0       0 return __("Initializing")." ".$cancelled if ! defined $cnt;
357              
358 0 0 0     0 $ips = sprintf( ", %2.1f $ips_label", $cnt / $time )
359             if $ips_label && $time;
360              
361 0         0 my $elapsed = "";
362 0 0       0 $elapsed = ", "
363             . __x( "elapsed {time}", time => $self->format_time($time) )
364             if $self->get_type ne 'group';
365              
366 0         0 my $percent = $self->get_progress_percent.'%';
367 0 0       0 $percent .= __" finished" if $self->get_type eq 'group';
368              
369 0         0 my $eta = "";
370 0 0 0     0 $eta = ", ETA: "
      0        
371             . $self->format_time( int( $time * $max / $cnt ) - $time + 1 )
372             if $time > 5 && $cnt != 0 && $self->get_type ne 'group';
373              
374 0         0 my $int_percent = int( $cnt / $max * 100 );
375              
376 0 0       0 if ( $int_percent > $self->get_last_percent_logged + 10 ) {
377 0         0 $int_percent = int( $int_percent / 10 ) * 10;
378 0         0 $self->set_last_percent_logged($int_percent);
379 0         0 my $line = $self->get_info . ": "
380             . __x( "{percent}PERCENT done.",
381             percent => $int_percent );
382 0         0 $line =~ s/PERCENT/%/;
383 0         0 $self->log($line);
384             }
385              
386 0 0       0 $cancelled = " ".$cancelled if $cancelled;
387              
388 0         0 return "$percent$ips$elapsed$eta$cancelled";
389             }
390              
391             sub format_time {
392 0     0 0 0 my $self = shift;
393 0         0 my ($time) = @_;
394              
395 0         0 my ($h, $m, $s);
396 0         0 $h = int($time/3600);
397 0         0 $m = int(($time-$h*3600)/60);
398 0         0 $s = $time % 60;
399              
400 0 0       0 return sprintf ("%02d:%02d", $m, $s) if $h == 0;
401 0         0 return sprintf ("%02d:%02d:%02d", $h, $m, $s);
402             }
403              
404             sub log {
405 10     10 0 2199 my $self = shift;
406 10         34 $self->get_frontend->log(@_);
407 10         30 1;
408             }
409              
410             sub progress_has_changed {
411 36     36 0 129 my $self = shift;
412              
413 36   100     134 my $last_progress = $self->get_last_progress||"";
414 36         271 my $curr_progress = $self->get_progress_cnt."/".$self->get_progress_max;
415              
416 36 100       125 if ( $last_progress ne $curr_progress ) {
417 26         106 $self->set_last_progress($curr_progress);
418 26         202 return 1;
419             }
420             else {
421 10         53 return 0;
422             }
423              
424             }
425              
426             sub frontend_signal {
427 0     0 0   my $self = shift;
428 0           my ($signal, @args) = @_;
429            
430 0           my $method = "signal_$signal";
431 0           $self->get_frontend->$method(@args);
432            
433 0           1;
434             }
435              
436             sub get_max_diskspace_consumed {
437 0     0 0   my $self = shift;
438 0           my ($currently_consumed, $max_consumed) = @_;
439              
440 0           $currently_consumed += $self->get_diskspace_consumed;
441              
442 0 0         if ( $currently_consumed > $max_consumed ) {
443 0           $max_consumed = $currently_consumed;
444             }
445              
446 0           $currently_consumed -= $self->get_diskspace_freed;
447            
448 0           return ($currently_consumed, $max_consumed);
449             }
450              
451             sub backup_state {
452 0     0 0   my $self = shift;
453            
454 0           my %data = %{$self};
  0            
455            
456             delete @data{
457 0           qw(
458             pre_callbacks
459             post_callbacks
460             error_callbacks
461             warning_callbacks
462             frontend
463             group
464             _post_callbacks_added
465             )
466             };
467              
468 0           $data{type} = $self->get_type;
469              
470 0           return \%data;
471             }
472              
473             sub restore_state {
474 0     0 0   my $self = shift;
475 0           my ($data_href) = @_;
476            
477 0 0         if ( $data_href->{type} ne $self->get_type ) {
478 0           die "Can't restore job state due to data type mismatch: ".
479             "Job type=".$self->get_type.", ".
480             "Data type=".$data_href->{type};
481             }
482              
483 0           foreach my $key ( keys %{$data_href} ) {
  0            
484 0           $self->{$key} = $data_href->{$key};
485             }
486              
487 0           delete $self->{type};
488              
489 0 0         $self->set_state("waiting")
490             if $self->get_state eq 'running';
491            
492 0           1;
493             }
494              
495             sub add_stash {
496 0     0 0   my $self = shift;
497 0           my ($add_stash) = @_;
498            
499 0           my $stash = $self->get_stash;
500            
501 0           while ( my ($k, $v) = each %{$add_stash} ) {
  0            
502 0           $stash->{$k} = $v;
503             }
504            
505 0           1;
506             }
507              
508             sub get_job_with_id {
509 0     0 0   my $self = shift;
510 0           my ($job_id) = @_;
511 0 0         return $self if $job_id eq $self->get_id;
512 0           return;
513             }
514              
515             1;
516              
517             __END__