File Coverage

blib/lib/Parallel/Supervisor.pm
Criterion Covered Total %
statement 100 139 71.9
branch 21 48 43.7
condition 1 3 33.3
subroutine 20 24 83.3
pod 14 18 77.7
total 156 232 67.2


line stmt bran cond sub pod time code
1             package Parallel::Supervisor;
2             # A module for managing children / forked processes
3             # used in a supervisor / worker pattern, eg Parallel::ForkManager
4              
5 2     2   57668 use strict;
  2         4  
  2         88  
6 2     2   10 use warnings;
  2         4  
  2         66  
7 2     2   12 use Carp qw(cluck carp);
  2         12  
  2         174  
8 2     2   4202 use Symbol qw(geniosym);
  2         2268  
  2         182  
9 2     2   2136 use IO::Pipe;
  2         18670  
  2         62  
10 2     2   20 use IO::Handle;
  2         2  
  2         2600  
11              
12             our $VERSION = "0.03";
13              
14             sub new # create a new collection of processes
15             {
16 2     2 1 26 my $class = shift;
17 2         16 my $self = {
18             # data
19             'STRUCTS' => {}, # children ready to run (idx name)
20             'PROCESSES' => {}, # children running (idx pid)
21             'FINISHED' => {}, # children finished running (idx name)
22             'NAMES' => {} # index on running child names (idx pid)
23             };
24            
25 2         6 bless($self, $class);
26 2         6 return $self;
27             }
28              
29             #### Getters for dereferencing Object's data structures
30             sub structs # returns hash of all the jobs prepared but not attached
31 2     2 0 8 { my $self = shift; return $self->{STRUCTS} }
  2         16  
32              
33             sub processes # returns hash of all the jobs attached
34 2     2 0 7 { my $self = shift; return $self->{PROCESSES} }
  2         15  
35              
36             sub names # returns a hash of pid->name for all attached processes (%NAMES)
37 1     1 0 7 { my $self = shift; return $self->{NAMES} }
  1         5  
38              
39             sub finished # returns a hash of name->struct for all completed processes (%FINISHED)
40 3     3 0 11 { my $self = shift; return $self->{FINISHED} }
  3         30  
41              
42             sub prepare # (name, cmd_to_eval) create a child struct and add it to %structs
43             {
44 7     7 1 2533 my ($self, $name, @cmd_args) = @_;
45 7         26 my $cmd = "@cmd_args";
46              
47 7 50       22 return undef unless defined $name;
48 7 50       20 return undef if $self->is_ready($name);
49              
50             #### Setup IPC
51 7         43 my ($child_writer, $parent_reader) = (geniosym, geniosym);
52 7 50       296 eval {
53 7         257 pipe( $parent_reader, $child_writer );
54             } or cluck "Failed to create pipe for reading child output.";
55              
56 7         421 $parent_reader->autoflush(1);
57 7         334 $child_writer->autoflush(1);
58              
59 7         613 $self->{STRUCTS}->{$name} = { id => $name,
60             cmd => "$cmd",
61             child_writer => $child_writer,
62             parent_reader => $parent_reader
63             };
64            
65 7         30 return 1;
66             }
67              
68             sub is_ready # check whether name is in %structs;
69             {
70 12     12 1 28 my ($self,$name) = @_;
71              
72 12 100       168 return undef unless (keys(%{$self->{STRUCTS}} ) );
  12         80  
73 8         14 for ( keys %{$self->{STRUCTS}} ) {
  8         25  
74 9 100       46 return 1 if $_ eq $name;
75             }
76 5         19 return undef ;
77             }
78              
79             sub is_attached # check for a running process with this name
80             {
81 7     7 1 17 my ($self,$name) = @_;
82              
83 7 50       23 return undef unless ( $self->{NAMES} );
84 7         11 return grep {$_ eq $name} (values( %{ $self->{NAMES} } ) );
  2         29  
  7         38  
85             }
86              
87             # move from %structs to %processes with given pid, register the name in %names
88             sub attach # ident, pid
89             {
90 3     3 1 19121 my ($self,$name,$pid) = @_;
91              
92 3 50       157 return undef unless $pid =~ /^\d+$/;
93 3 50       23 return undef if ( $self->is_attached( $name ) ) ;
94 3 50       15 return undef unless ( $self->is_ready( $name ) );
95              
96 3         55 $self->{PROCESSES}{$pid} = $self->{STRUCTS}{$name};
97 3         19 delete $self->{STRUCTS}{$name};
98 3         5 $self->{NAMES}->{$pid} = $name;
99              
100 3         21 return $?;
101             }
102              
103             # move from %processes to %finished (does not check whether process still exists)
104             sub detach # pid
105             {
106 2     2 1 328852 my ($self, $pid) = @_;
107              
108 2 50 33     41 if ( (! defined $pid) || ($pid !~ /^\d+$/) ) {
109 0         0 carp "Error! Can't detach non-numeric pid $pid\n";
110 0         0 return undef;
111             }
112              
113             # TODO : verify actual running process with pid
114 2         11 my $name = $self->{PROCESSES}{$pid}{id};
115 2 50       9 if (! defined $name ) {
116 0         0 carp "detach: could not find process record for pid $pid\n";
117 0         0 return undef;
118             }
119 2 50       8 if ($self->is_ready($name) ) {
120 0         0 carp "detach: name $name is ready, not running as $pid\n";
121 0         0 return undef;
122             }
123 2 50       16 if ($self->{NAMES}) {
124 2         8 delete $self->{NAMES}->{$pid};
125             }
126              
127 2         11 $self->{FINISHED}{$name} = $self->{PROCESSES}{$pid};
128 2         5 delete ${$self->{PROCESSES}}{$pid};
  2         6  
129 2         11 return 1;
130             }
131              
132             # delete the struct
133             sub forget # ident
134             {
135 3     3 1 10 my ($self, $name) = @_;
136              
137 3 50       15 return undef unless defined $name;
138 3 50       12 if ($self->is_attached($name) ) {
139 0         0 carp "Cannot forget $name because it is alive.";
140 0         0 return undef;
141             }
142 3         54 delete $self->{STRUCTS}->{$name};
143 3         19 delete $self->{FINISHED}->{$name};
144 3         11 return 1;
145             }
146              
147             sub reset # delete all STRUCTS, PROCESSES, NAMES, FINISHED
148             {
149 1     1 1 2 my $self = shift;
150 1         3 $self->{STRUCTS} = {}; # children ready to run (idx name)
151 1         27 $self->{PROCESSES} = {}; # children running (idx pid)
152 1         16 $self->{FINISHED} = {}; # children finished running (idx name)
153 1         16 $self->{NAMES} = {}; # index on running child names (idx pid)
154             }
155             # NOTE: an all numeric name could be confused with a PID!!!!
156             # since both structs and processes are checked for a match
157             sub get_child # return the hash for child with this name
158              
159             {
160 0     0 1 0 my ($self, $name) = @_;
161              
162 0         0 my %ret = ();
163 0 0       0 return \%ret unless defined $name;
164 0 0       0 if ( $self->is_ready( $name ) ) {
165             # print STDERR "get_child: $name is ready\n";
166 0         0 %ret = $self->{STRUCTS}->{$name};
167             }
168 0 0       0 if ( $self->is_attached( $name ) ) {
169             # print STDERR "get_child: $name is alive\n";
170 0         0 %ret = $self->{PROCESSES}->{$name};
171 0         0 $ret{ACTIVE} = 1;
172 0         0 return \%ret;
173             }
174             # print STDERR "Could not find the process $name\n";
175 0         0 return \%ret;
176             }
177              
178             # avoid iterating through %processes just to look at each id
179             sub get_names # return array of alive processes by name
180             {
181 0     0 1 0 my $self = shift;
182 0         0 my @ret = ();
183 0 0       0 if ($self->{NAMES}) {
184 0         0 @ret = values(%{$self->{NAMES}} );
  0         0  
185             }
186 0         0 return \@ret;
187             }
188              
189             sub get_pids # returns a list with pid of all alive processes
190             {
191 1     1 1 3 my $self = shift;
192 1         2 my @ret = keys %{$self->{PROCESSES}};
  1         5  
193 1 50       46 return wantarray ? @ret : \@ret; #cheating a bit here
194             }
195              
196             # NOTE: will return fh refs to the read end of the pipe
197             # for all processes which have been attached or detached
198             # check keys($self->processes) to see if any children are running
199             sub get_readers # hash of name=>parent_reader_ IO handles
200             {
201 0     0 1 0 my $self = shift;
202 0         0 my %handles = ();
203 0         0 while ( my ($k,$v) = each(%{$self->{PROCESSES}} ) ) {
  0         0  
204 0         0 $handles{$v->{id} } = $v->{parent_reader};
205             }
206 0         0 while ( my ($k,$v) = each(%{$self->{FINISHED}} ) ) {
  0         0  
207 0         0 $handles{$v->{id} } = $v->{parent_reader};
208             }
209 0         0 return \%handles;
210             }
211              
212             sub get_all_ready # return a hash ref holding a copy of all ready STRUCTS
213             {
214 0     0 1 0 my $self = shift;
215 0         0 my %ret = ();
216 0 0       0 return \%ret unless ($self->{STRUCTS}) ;
217 0         0 %ret = %{ $self->{STRUCTS} };
  0         0  
218             # print STDERR "get_all_ready found ". keys(%ret) . " elements\n";
219 0         0 return \%ret;
220             }
221              
222             # exercise caution when looping through children:
223             # this can be used to test if child is activated, otherwise leads to infinite loop
224             sub get_next_ready # returns a hash of a ready child process, or an empty hash
225             {
226 2     2 1 326 my $self = shift;
227 2         8 my %ret = ();
228 2 50       10 return \%ret unless defined $self->{STRUCTS};
229 2 50       4 return \%ret unless (keys(%{$self->{STRUCTS}} ) );
  2         10  
230 2         6 my @ids = keys(%{$self->{STRUCTS}} );
  2         8  
231             # ensure sort order enforced so caller can plan ahead
232 2         36 @ids = sort @ids;
233              
234 2 50       10 return \%ret unless defined $ids[0];
235 2         4 my $idx = $ids[0];
236 2         4 %ret = %{$self->{STRUCTS}{$idx}} ;
  2         14  
237 2         14 return \%ret;
238             }
239              
240             # methods after =cut are processed by the autosplit program and only compiled on demand
241              
242              
243             1;
244              
245             __END__