File Coverage

blib/lib/App/Pipeline/Simple.pm
Criterion Covered Total %
statement 206 285 72.2
branch 63 128 49.2
condition 20 52 38.4
subroutine 30 32 93.7
pod 22 22 100.0
total 341 519 65.7


line stmt bran cond sub pod time code
1             #-----------------------------------------------------------------
2             # App::Pipeline::Simple
3             #
4             ## no critic
5             package App::Pipeline::Simple;
6             # ABSTRACT: Simple workflow manager
7             our $VERSION = '0.9.1'; # VERSION
8              
9 2     2   98477 use strict;
  2         4  
  2         71  
10 2     2   10 use warnings;
  2         3  
  2         52  
11 2     2   2186 use autodie;
  2         256586  
  2         15  
12             ## use critic
13              
14 2     2   19420 use Carp;
  2         8  
  2         195  
15 2     2   14 use File::Basename;
  2         4  
  2         227  
16 2     2   2828 use File::Copy;
  2         7497  
  2         162  
17 2     2   2729 use YAML::Syck;
  2         4843  
  2         166  
18 2     2   3341 use Log::Log4perl qw(get_logger :levels :no_extra_logdie_message);
  2         350568  
  2         21  
19             #use Data::Printer;
20              
21             #-----------------------------------------------------------------
22             # Global variables
23             #-----------------------------------------------------------------
24              
25             my $logger_level = {
26             '-1' => $WARN,
27             '0' => $INFO,
28             '1' => $DEBUG,
29             };
30              
31             #-----------------------------------------------------------------
32             # new
33             #-----------------------------------------------------------------
34             sub new {
35 3     3 1 1454 my ($class, @args) = @_;
36              
37             # create an object
38 3   33     29 my $self = bless {}, ref ($class) || $class;
39              
40             # set all @args into this object with 'set' values
41 3 50       25 my (%args) = (@args == 1 ? (value => $args[0]) : @args);
42              
43             # do dir() first so that we know where to write the log
44 3 100       18 $self->dir($args{'dir'}) if defined $args{'dir'};
45              
46             # start logging
47 3         15 $self->_configure_logging;
48              
49 3         12 foreach my $key (keys %args) {
50 8 100       49 next if $key eq 'config'; # this needs to be evaluated last
51 7 100       19 next if $key eq 'dir'; # done this
52             ## no critic
53 2     2   490 no strict 'refs';
  2         5  
  2         9925  
54             ## use critic
55 6         30 $self->$key($args{$key});
56             }
57             # delayed to first find out the verbosity level
58 3         14 $self->logger->info("Logging into file: [ ". $self->dir. '/pipeline.log'. " ]");
59              
60             # this argument needs to be done last
61 3 100       82 $self->config($args{'config'}) if defined $args{'config'};
62              
63             # look into dir() if config not given
64 3 50 66     17 $self->config($self->dir. '/config.yml')
      66        
65             if not $self->{config} and defined $self->dir and -e $self->dir. '/config.yml';
66              
67             # die if no config found
68 3 50 66     17 $self->logger->fatal("pipeline config file not provided or not found in pwd")
69             if not $self->{config} and not $self->debug;
70              
71             # done
72 3         59 return $self;
73             }
74              
75              
76             #-----------------------------------------------------------------
77             # Configure the logger
78             #-----------------------------------------------------------------
79              
80             sub _configure_logging {
81 3     3   6 my $self = shift;
82              
83 3         8 my $logger_config = q(
84             log4perl.category.Pipeline = INFO, Screen
85             log4perl.appender.Screen = Log::Log4perl::Appender::Screen
86             log4perl.appender.Screen.stderr = 1
87             log4perl.appender.Screen.layout = Log::Log4perl::Layout::SimpleLayout
88             );
89              
90 3         26 Log::Log4perl->init_once( \$logger_config );
91 3         10212 my $logger = Log::Log4perl->get_logger("Pipeline");
92              
93 3 100       93 if ($self->dir) {
94 1         6 my $to_file = Log::Log4perl::Appender->new
95             ("Log::Log4perl::Appender::File",
96             name => 'Log',
97             filename => $self->dir. '/pipeline.log',
98             mode => 'append');
99 1         10763 my $pattern = '%d [%r] %p %L | %m%n';
100 1         13 my $layout = Log::Log4perl::Layout::PatternLayout->new ($pattern);
101 1         2279 $to_file->layout ($layout);
102              
103 1         16 $logger->add_appender($to_file);
104             }
105              
106 3         670 $logger->level( $INFO );
107 3         1592 $self->logger($logger);
108             }
109              
110              
111              
112             #-----------------------------------------------------------------
113             #
114             #-----------------------------------------------------------------
115              
116             sub verbose {
117 4     4 1 23 my ($self, $value) = @_;
118 4 100       35 if (defined $value) {
119 3         7 $self->{_verbose} = $value;
120              
121             # verbose = -1 0 1
122             # log level = WARN INFO DEBUG
123              
124 3         9 $self->logger->level( $logger_level->{$value} );
125             }
126 4         1766 return $self->{_verbose};
127             }
128              
129             sub id {
130 29     29 1 664 my ($self, $value) = @_;
131 29 100       537 if (defined $value) {
132 7         21 $self->{_id} = $value;
133             }
134 29         460 return $self->{_id};
135             }
136              
137             sub description {
138 2     2 1 15 my ($self, $value) = @_;
139 2 50       10 if (defined $value) {
140 2         11 $self->{_description} = $value;
141             }
142 2         21 return $self->{_description};
143             }
144              
145             sub name {
146 3     3 1 10 my ($self, $value) = @_;
147 3 100       14 if (defined $value) {
148 2         5 $self->{_name} = $value;
149             }
150 3         13 return $self->{_name};
151             }
152              
153             sub path {
154 2     2 1 5 my ($self, $value) = @_;
155 2 100       9 if (defined $value) {
156 1         3 $self->{_path} = $value;
157             }
158 2         8 return $self->{_path};
159             }
160              
161             sub next_id {
162 1     1 1 3 my ($self, $value) = @_;
163 1 50       6 if (defined $value) {
164 1         4 $self->{_next_id} = $value;
165             }
166 1         6 return $self->{_next_id};
167             }
168              
169              
170             sub input {
171 0     0 1 0 my ($self, $value) = @_;
172 0 0       0 if (defined $value) {
173 0         0 $self->{_input} = $value;
174             }
175 0         0 return $self->{_input};
176             }
177              
178              
179             sub itype {
180 1     1 1 3 my ($self, $value) = @_;
181 1 50       5 if (defined $value) {
182 0         0 $self->{_itype} = $value;
183             }
184 1         11 return $self->{_itype};
185             }
186              
187             sub start {
188 1     1 1 3 my ($self, $value) = @_;
189 1 50       7 if (defined $value) {
190 0         0 $self->{_start} = $value;
191             }
192 1         8 return $self->{_start};
193             }
194              
195              
196             sub stop {
197 0     0 1 0 my ($self, $value) = @_;
198 0 0       0 if (defined $value) {
199 0         0 $self->{_stop} = $value;
200             }
201 0         0 return $self->{_stop};
202             }
203              
204              
205             sub debug {
206 4     4 1 8 my ($self, $value) = @_;
207 4 100       10 if (defined $value) {
208 2         5 $self->{_debug} = $value;
209             }
210 4         11 return $self->{_debug};
211             }
212              
213             sub logger {
214 34     34 1 136 my ($self, $value) = @_;
215 34 100       169 if (defined $value) {
216 3         9 $self->{_logger} = $value;
217             }
218 34         544 return $self->{_logger};
219             }
220              
221             sub config {
222 1     1 1 2 my ($self, $config) = @_;
223              
224 1 50       4 if ($config) {
225 1         3 $self->logger->info("Using config file: [ ". $config. " ]");
226 1         12981 my $pwd = `pwd`; chomp $pwd;
  1         26  
227 1         35 $self->logger->debug("pwd: $pwd");
228 1         30 $self->logger->debug("config: $config");
229 1 50       61 die unless -e $config;
230             # copy the pipeline config
231              
232 1 50 33     17 if ($self->dir and not -e $self->dir."/config.yml") {
233 1         5 copy $config, $self->dir."/config.yml";
234 1         603 $self->logger->info("Config file [ $config ] copied to: [ ".
235             $self->dir."/config.yml ]");
236             }
237              
238 1         19 $self->{config} = LoadFile($self->dir."/config.yml");
239              
240             # set pipeline start parameters
241 1         590 $self->id('s0');
242 1   50     29 $self->name($self->{config}->{name} || '');
243 1   50     17 $self->description($self->{config}->{description} || '');
244              
245             # go through all steps once
246 1         2 my $nexts; # hashref for finding start point(s)
247 1         3 for my $id (sort keys %{$self->{config}->{steps}}) {
  1         18  
248 5         12 my $step = $self->{config}->{steps}->{$id};
249              
250             # bless all steps into Pipeline objects
251 5         24 bless $step, ref($self);
252              
253             #print "ERROR: $id already exists\n" if defined $self->step($id);
254             # create the list of all steps to be used by each_step()
255 5         18 $step->id($id);
256 5         8 push @{$self->{steps}}, $step;
  5         12  
257              
258             # a step without a parent is a starting point, store those with children
259 5         9 foreach my $next (@{$step->{next}}) {
  5         14  
260 4         16 $nexts->{$next}++;
261             }
262             }
263              
264             # store starting points, not listed as children
265 1         11 foreach my $step ($self->each_step) {
266 5 100       18 push @{$self->{next}}, $step->id
  1         24  
267             unless $nexts->{$step->id}
268             }
269              
270             #run needs to fail if starting input values are not set!
271             # my $s = p $self->config;# here x
272             # $self->logger->info("Self-config after: [". $s. "]" );# here x
273              
274             # insert the startup value into the appropriate starting step
275             # unless we are reading old config
276 1 50 33     8 if ($self->itype and $self->input) { # only if new starting input value has been given
277 0         0 $self->logger->info("Input value: [". $self->input. "]" );
278 0         0 $self->logger->info("Input type: [". $self->itype. "]" );
279 0         0 my $real_start_id;
280 0         0 for my $step ( $self->each_step) {
281              
282             # if input type is right, insert the value
283             # note only one of the each type can be used
284 0         0 foreach my $key ( keys %{$step->{args}} ) {
  0         0  
285 0         0 my $arg = $step->{args}->{$key};
286              
287 0 0       0 next unless $key eq 'in';
288 0 0       0 next unless defined $arg->{type};
289 0 0       0 next unless $arg->{type} eq $self->itype;
290              
291 0         0 $arg->{value} = $self->input;
292 0         0 $real_start_id = $step->id;
293             }
294             }
295 0         0 $self->{next} = undef;
296 0         0 push @{$self->{next}}, $real_start_id;
  0         0  
297 0         0 $self->logger->info("Starting point: [". $real_start_id. "]" );
298              
299             # the stored config file needs to be overwritten with these modifications
300 0         0 open my $OUT, ">", $self->dir."/config.yml";
301 0         0 print $OUT Dump ($self->config);
302             }
303             }
304 1         4 return $self->{config};
305             }
306              
307             #-----------------------------------------------------------------
308             #
309             #-----------------------------------------------------------------
310             sub dir {
311 23     23 1 83 my ($self, $dir) = @_;
312 23 100       68 if ($dir) {
313 2 100 66     166 mkdir $dir unless -e $dir and -d $dir;
314 2 50 33     3928 croak "Can not create project directory $dir"
315             unless -e $dir and -d $dir;
316 2         17 $self->{_dir} = $dir;
317             }
318 23 100       370 $self->{_dir} || '';
319             }
320              
321             #-----------------------------------------------------------------
322             #
323             #-----------------------------------------------------------------
324             sub step {
325 10     10 1 56 my ($self) = shift;
326 10         30 my $id = shift;
327 10         41 return $self->{config}->{steps}->{$id};
328             }
329              
330             sub each_next {
331             #map { $_->{id} } grep { $_->{id} } @{shift->{next}};
332 17     17 1 31 @{shift->{next}};
  17         3305  
333             }
334              
335             sub each_step {
336 2     2 1 654 @{shift->{steps}};
  2         17  
337             }
338              
339              
340              
341             sub run {
342 1     1 1 4 my ($self) = shift;
343 1 50       5 unless ($self->dir) {
344 0         0 $self->logger->fatal("Need an output directory to run()");
345 0         0 croak "Need an output directory to run()";
346             }
347              
348             ###
349             # check for input file and warn if not found
350              
351 1         16 chdir $self->{_dir};
352              
353             #
354             # Determine where in the pipeline to start
355             #
356              
357 1         1745 my @steps; # array of next execution points
358              
359             # User has given a starting point id
360 1 50       13 if ($self->start) {
    50          
361 0         0 $self->logger->info("Start point: user input [". $self->start. "]" );
362 0         0 push @steps, $self->start;
363 0         0 $self->logger->info("Starting at [". $self->start. "]" );
364             }
365             # determine if and where the execution of the pipeline was interrupted
366             elsif (-e $self->dir. "/pipeline.log") {
367 1         7 $self->logger->info("Start point: consult the log [".
368             $self->dir. "/pipeline.log ]");
369 1 50       17 open my $LOG, '<', $self->dir. "/pipeline.log"
370             or $self->logger->fatal("Can't open ". $self->dir.
371             "/pipeline.log for reading: $!");
372 1         2635 my $in_execution;
373              
374             # look into only the latest run
375             my @log;
376 1         34 while (<$LOG>) {
377 0         0 push @log, $_;
378 0 0       0 @log = () if /Run started/;
379             }
380              
381 1         3 my $done;
382 1         5 for (@log) {
383 0 0       0 next unless /\[(\d+)\]/;
384 0         0 undef $in_execution; # start of a new run
385 0 0       0 next unless /\| (Running|Finished) +\[(\w+)\]/;
386 0 0       0 $in_execution->{$2}++ if $1 eq 'Running';
387 0 0       0 delete $in_execution->{$2} if $1 eq 'Finished';
388 0 0       0 $done = 1 if /DONE/;
389             }
390              
391 1         7 @steps = sort keys %$in_execution;
392 1 50 33     26 if (scalar @steps == 0 and $done) {
    50          
393 0         0 $self->logger->warn("Pipeline is already finished. ".
394             "Drop -config and define the start step to rerun" );
395 0         0 exit 0;
396             }
397             elsif (@steps) {
398 0         0 $self->logger->info("Continuing at ". $steps[0] );
399             } else {
400             # start from beginning
401 1         8 @steps = $self->each_next;
402 1         7 $self->logger->info("Starting at [". $steps[0] . "]");
403             }
404             }
405             else {
406             # start from beginning
407 0         0 $self->logger->info("Start point: start from beginning" );
408 0         0 @steps = $self->each_next;
409 0         0 $self->logger->info("Starting at [". $steps[0] . "]");
410              
411             }
412              
413             #
414             # Execute one step at a time
415             #
416              
417 1         34 $self->logger->info("Run started");
418              
419 1         12 while (my $step_id = shift @steps) {
420 5         27 $self->logger->debug("steps: [". join (", ", @steps). "]");
421 5         67 my $step = $self->step($step_id);
422 5 50       34 croak "ERROR: Step [$step_id] does not exist" unless $step;
423             # check that we got an object
424              
425             # check that the input file exists
426 5         11 foreach my $arg (@{$step->{arg}}) {
  5         101  
427 0 0       0 next unless $arg->{key} eq 'in';
428 0 0       0 next unless $arg->{type} =~ /file|dir/ ;
429             }
430              
431 5         27 my $command = $step->render;
432 5         21 $self->logger->info("Running [". $step->id . "] $command" );
433 5         130482 `$command`;
434 5         685 $self->logger->info("Finished [". $step->id . "]" );
435              
436             # Add next step(s) to the execution queue unless
437             # the user has asked to stop here
438 5 50 33     647 if ( defined $self->{_stop} and $step->id eq $self->{_stop} ) {
439 0         0 $self->logger->info("Stopping at [". $step->id . "]" );
440             } else {
441 5         59 push @steps, $step->each_next;
442             }
443              
444             }
445 1         25 $self->logger->info("DONE" );
446 1         44 return 1;
447             }
448              
449              
450             #-----------------------------------------------------------------
451             # Render a step into a command line string
452             #-----------------------------------------------------------------
453              
454             sub render {
455 10     10 1 20 my ($step, $display) = @_;
456              
457 10         17 my $str;
458             # path to program
459 10 50       42 if (defined $step->{path}) {
460 0         0 $str .= $step->{path};
461 0 0       0 $str .= '/' unless substr($str, -1, 1) eq '/' ;
462             }
463             # program name
464 10   50     52 $str .= $step->{name} || '';
465              
466             # arguments
467 10         33 my $endstr = '';
468              
469 10         20 foreach my $key (keys %{$step->{args}}) {
  10         98  
470 28         66 my $arg = $step->{args}->{$key};
471              
472 28 100       92 if (ref $arg) {
473 20 100 66     234 if (defined $arg->{type} and $arg->{type} eq 'unnamed') {
    50 33        
    0          
474 2         17 $str .= ' '. $arg->{value};
475             }
476             elsif (defined $arg->{type} and $arg->{type} eq 'redir') {
477 18 100       64 if ($key eq 'in') {
    50          
478 8         52 $endstr .= " < ". $arg->{value};
479             }
480             elsif ($key eq 'out') {
481 10         44 $endstr .= " > ". $arg->{value};
482             } else {
483 0         0 croak "Unknown key ". $key;
484             }
485             }
486             elsif (defined $arg->{value}) {
487 0         0 $str .= " -". $key. " ". $arg->{value};
488             } else {
489 0         0 $str .= " -". $key;
490             }
491             } else {
492 8 100       24 if ($arg) {
493 2         8 $str .= " -". $key. " ". $arg;
494             } else {
495 6         25 $str .= " -". $key;
496             }
497             }
498             }
499 10         24 $str .= $endstr;
500              
501 10 100       33 $str =~ s/(['"])/\\$1/g if $display;
502              
503 10         35 return $str;
504             }
505              
506             sub stringify {
507 1     1 1 7 my ($self) = @_;
508              
509 1         5 $self->logger->info("Stringify starting" );
510              
511 1         12 my @res;
512             # add checks for duplicated ids
513              
514             # add check for a next pointer that leads nowhere
515              
516 1         6 my @steps = $self->each_next;
517              
518 1         2 my $outputs; # hashref for storing input and output filenames
519 1         6 while (my $step_id = shift @steps) {
520 5         16 my $step = $self->step($step_id);
521              
522 5         12 push @res, $step->id, "\n";
523 5         15 push @res, "\t", $step->render('4display'), " # ";
524 5         17 map { push @res, "->", $_, " " } $step->each_next;
  4         20  
525              
526 5         14 push @steps, $step->each_next;
527              
528 5         8 foreach my $arg (@{$step->{arg}}) {
  5         21  
529 0 0 0     0 if ($arg->{key} eq 'out') {
    0          
530 0         0 for ($step->each_next) {
531 0 0       0 push @res, "\n\t", "WARNING: Output file [".
532             $arg->{value}."] is read by [",
533             $outputs->{$arg->{value}}, "] and [$_]"
534             if $outputs->{$arg->{value}};
535              
536 0         0 $outputs->{$arg->{value}} = $_;
537             }
538             }
539             elsif ($arg->{key} eq 'in' and $arg->{type} ne 'redir') {
540 0   0     0 my $prev_step_id = $outputs->{$arg->{value}} || '';
541 0 0 0     0 push @res, "\n\t". "ERROR: Output from the previous step is not [".
      0        
542             ($arg->{value} || ''). "]"
543             if $prev_step_id ne $step->id and $prev_step_id eq $self->id;
544             }
545             # test for steps not referenced by other steps (missing next tag)
546             }
547 5         23 push @res, "\n";
548             }
549 1         16 return join '', @res;
550             }
551              
552              
553             sub graphviz {
554 1     1 1 6 my $self = shift;
555 1         9 my $function = shift;
556              
557 1         16 $self->logger->info("Graphing started. Redirect to a dot file" );
558              
559 1         800 require GraphViz;
560 0           my $g= GraphViz->new;
561              
562 0           my $end;
563 0           $g->add_node($self->id,
564             label => $self->id.
565             $self->render('4display'), rank => 'top');
566 0           map { $g->add_edge('s0' => $_) } $self->each_next;
  0            
567 0 0         if ($self->description) {
568 0           $g->add_node('desc', label => $self->description,
569             shape => 'box', rank => 'top');
570 0           $g->add_edge('s0' => 'desc');
571             }
572              
573 0           foreach my $step ($self->each_step) {
574 0           $g->add_node($step->id, label => $step->id );
575 0 0         if ($step->each_next) {
576 0           map { $g->add_edge($step->id => $_, label => " ". $step->render('display') ) }
  0            
577             $step->each_next;
578             } else {
579 0           $end++;
580 0           $g->add_node($end, label => ' ');
581 0           $g->add_edge($step->id => $end, label => " ". $step->render('display') );
582             }
583              
584             }
585 0           return $g->as_dot;
586              
587 0           $self->logger->info("Graphing done. Process the dot ".
588             "file (e.g. dot -Tpng p.dot|display " );
589              
590             }
591              
592             1;
593              
594             __END__