File Coverage

lib/NBI/Pipeline.pm
Criterion Covered Total %
statement 45 47 95.7
branch 10 16 62.5
condition 2 5 40.0
subroutine 8 8 100.0
pod 4 4 100.0
total 69 80 86.2


line stmt bran cond sub pod time code
1             package NBI::Pipeline;
2             #ABSTRACT: Ordered list of NBI::Job objects with dependency wiring
3             #
4             # NBI::Pipeline - Minimal multi-job orchestration for nbilaunch.
5             #
6             # DESCRIPTION:
7             # Holds an ordered list of NBI::Job objects. When run() is called, jobs are
8             # submitted in order. If a job has _nbi_depends_on set to a previous job
9             # object, the afterok dependency is injected into its NBI::Opts before
10             # submission.
11             #
12             # This is intentionally minimal in v1. It does not resolve filenames between
13             # jobs - complex launchers handle that via NBI::Manifest->load().
14             #
15             # USAGE IN A LAUNCHER:
16             # sub build {
17             # my ($self, %args) = @_;
18             # my ($job1, $m1) = $self->_build_step1(%args);
19             # my ($job2, $m2) = $self->_build_step2(%args);
20             # $job2->{_nbi_depends_on} = $job1;
21             # return (NBI::Pipeline->new(jobs => [$job1, $job2]), $m1);
22             # }
23             #
24             # nbilaunch checks ref($result): NBI::Job → single submit, NBI::Pipeline → run().
25             #
26              
27 1     1   1159 use 5.012;
  1         3  
28 1     1   3 use strict;
  1         2  
  1         17  
29 1     1   2 use warnings;
  1         1  
  1         43  
30 1     1   3 use Carp qw(confess);
  1         1  
  1         491  
31              
32             $NBI::Pipeline::VERSION = $NBI::Slurm::VERSION;
33              
# ── new(jobs => \@jobs) ───────────────────────────────────────────────────────
# Constructor. Takes an optional 'jobs' arrayref of NBI::Job objects; when it
# is omitted (or undef) the pipeline starts empty. The array is shallow-copied,
# so later changes to the caller's array do not alter the pipeline.
# Dies (confess) if 'jobs' is not an arrayref, or if any element is not a
# blessed NBI::Job instance.
sub new {
    my ($class, %args) = @_;

    my $jobs = $args{jobs} // [];
    ref $jobs eq 'ARRAY'
        or confess "ERROR NBI::Pipeline: 'jobs' must be an arrayref\n";

    require Scalar::Util;
    for my $j (@$jobs) {
        # blessed() first: calling ->isa on undef or an unblessed reference
        # dies with an unrelated Perl error, and the plain class-name string
        # 'NBI::Job' would pass ->isa even though it is not an object.
        Scalar::Util::blessed($j) && $j->isa('NBI::Job')
            or confess "ERROR NBI::Pipeline: each job must be an NBI::Job instance\n";
    }

    return bless { jobs => [@$jobs] }, $class;
}
48              
# ── add_job($job) ─────────────────────────────────────────────────────────────
# Append one NBI::Job to the end of the pipeline.
# Returns $self, so calls can be chained.
# Dies (confess) if $job is not a blessed NBI::Job instance.
sub add_job {
    my ($self, $job) = @_;

    # blessed() first: ->isa on undef or an unblessed reference would die with
    # an unrelated Perl error, and the class-name string 'NBI::Job' would be
    # accepted even though it is not an object.
    require Scalar::Util;
    Scalar::Util::blessed($job) && $job->isa('NBI::Job')
        or confess "ERROR NBI::Pipeline: job must be an NBI::Job instance\n";

    push @{ $self->{jobs} }, $job;
    return $self;
}
57              
# ── run() ─────────────────────────────────────────────────────────────────────
# Submit all jobs in order. Wires afterok dependencies automatically.
#
# For each job, in list order:
#   1. If the job's {_nbi_depends_on} holds a job that was already submitted in
#      this call, "--dependency=afterok:<id>" is added to the job's options.
#   2. If the job's opts report a tmpdir that does not exist yet, it is created
#      (including missing parents).
#   3. $job->run() is called and its return value is recorded as the job ID.
#
# Returns the list of values returned by each $job->run(), in submission order.
# Dies (confess) if a dependency has not been submitted before the job that
# needs it, or if the tmpdir cannot be created.
sub run {
    my ($self) = @_;

    my @job_ids;
    my %submitted;    # stringified NBI::Job reference → job_id (dependency lookup)

    for my $job (@{ $self->{jobs} }) {
        # Inject afterok dependency if this job depends on a previous one
        if (my $dep_job = $job->{_nbi_depends_on}) {
            my $dep_id = $submitted{$dep_job}
                or confess "ERROR NBI::Pipeline: dependency job was not yet submitted\n";
            $job->opts->add_option("--dependency=afterok:$dep_id");
        }

        # Ensure the tmpdir exists before the job is run
        my $tmpdir = $job->opts->tmpdir;
        if ($tmpdir && !-d $tmpdir) {
            require File::Path;

            # make_path returns the number of directories it created, so a
            # zero return is not a failure (the directory may have appeared
            # since the -d test above). Collect errors explicitly and decide
            # on the only thing that matters: does the directory exist now?
            File::Path::make_path($tmpdir, { error => \my $errors });
            unless (-d $tmpdir) {
                # Each error entry is a one-pair hash: path => message
                # (the path is '' for errors not tied to a specific file).
                my $reason = join '; ', map {
                    my ($path, $message) = %$_;
                    length $path ? "$path: $message" : $message;
                } @{ $errors || [] };
                $reason = 'unknown error' unless length $reason;
                confess "ERROR NBI::Pipeline: cannot create tmpdir '$tmpdir': $reason\n";
            }
        }

        my $job_id = $job->run();
        $submitted{$job} = $job_id;
        push @job_ids, $job_id;
    }

    return @job_ids;
}
90              
# ── print_summary() ───────────────────────────────────────────────────────────
# Print a human-readable overview of the pipeline to the currently selected
# output handle: a header line with the job count, then one line per job
# showing its name and, when {_nbi_depends_on} is set, the name of the job it
# waits for. Returns whatever print() returns.
sub print_summary {
    my ($self) = @_;

    my @jobs = @{ $self->{jobs} };

    my @entries = map {
        my $entry    = sprintf ' [%s]', $_->name;
        my $upstream = $_->{_nbi_depends_on};
        $entry .= sprintf ' afterok:[%s]', $upstream->name if $upstream;
        $entry;
    } @jobs;

    my $header = 'Pipeline: ' . scalar(@jobs) . ' job(s)';
    my $report = join("\n", $header, @entries) . "\n";

    return print $report;
}
105              
106             1;
107              
108             __END__