File Coverage

blib/lib/Parallel/Forker.pm
Criterion Covered Total %
statement 78 180 43.3
branch 21 54 38.8
condition 9 47 19.1
subroutine 20 29 68.9
pod 20 23 86.9
total 148 333 44.4


line stmt bran cond sub pod time code
1             # See copyright, etc in below POD section.
2             ######################################################################
3              
4             package Parallel::Forker;
5             require 5.006;
6 34     34   2298099 use Carp qw(carp croak confess);
  34         292  
  34         1552  
7 34     34   169 use IO::File;
  34         35  
  34         3011  
8 34     34   9287 use Time::HiRes qw(usleep);
  34         21432  
  34         163  
9              
10 34     34   17956 use Parallel::Forker::Process;
  34         68  
  34         990  
11 34     34   141 use strict;
  34         52  
  34         661  
12 34     34   131 use vars qw($Debug $VERSION);
  34         62  
  34         56870  
13              
14             $VERSION = '1.254';
15              
16             ######################################################################
17             #### CONSTRUCTOR
18              
19             sub new {
20 53     53 1 23350 my $class = shift;
21 53         933 my $self = {
22             _activity => 1, # Optionally set true when a sig_child comes
23             _processes => {}, # All process objects, keyed by id
24             _labels => {}, # List of process objects, keyed by label
25             _runable => {}, # Process objects runable now, keyed by id
26             _running => {}, # Process objects running now, keyed *PID*
27             _run_after_eqn => undef,# Equation to eval to determine if ready to launch
28             _parent_pid => $$, # PID of initial process creating the forker
29             max_proc => undef, # Number processes to launch, <1=any, +=that number
30             use_sig_child => undef, # Default to not using SIGCHLD handler
31             @_
32             };
33 53   33     514 bless $self, ref($class)||$class;
34 53         137 return $self;
35             }
36              
37             #### ACCESSORS
38              
39             sub in_parent {
40 1     1 1 222 my $self = shift;
41 1         7 return $self->{_parent_pid}==$$;
42             }
43              
44             sub max_proc {
45 9     9 1 45 my $self = shift;
46 9 50       45 $self->{max_proc} = shift if $#_>=0;
47 9         18 return $self->{max_proc};
48             }
49              
50             sub use_sig_child {
51 797     797 1 1599 my $self = shift;
52 797 100       1975 $self->{use_sig_child} = shift if $#_>=0;
53 797         6380 return $self->{use_sig_child};
54             }
55              
56             sub running {
57 23     23 1 191 my $self = shift;
58 23         43 return (values %{$self->{_running}});
  23         173  
59             }
60              
61             sub running_sorted {
62 0     0 0 0 my $self = shift;
63 0         0 return (sort {$a->{name} cmp $b->{name}} values %{$self->{_running}});
  0         0  
  0         0  
64             }
65              
66             sub process {
67 0     0 1 0 my $self = shift;
68 0 0       0 confess "usage: \$fork->process(\$name)" unless scalar(@_) == 1;
69 0         0 return $self->{_processes}{$_[0]};
70             }
71              
72             sub processes {
73 52     52 1 86 my $self = shift;
74 52         56 return (values %{$self->{_processes}});
  52         563  
75             }
76              
77             sub processes_sorted {
78 42     42 1 26138 my $self = shift;
79 42         175 return (sort {$a->{name} cmp $b->{name}} values %{$self->{_processes}});
  756         1022  
  42         602  
80             }
81              
82             sub state_stats {
83 0     0 1 0 my $self = shift;
84 0         0 my %stats = (idle=>0, ready=>0, running=>0, runable=>0,
85             done=>0, parerr=>0, reapable=>0);
86 0         0 map {$stats{$_->state}++} $self->processes;
  0         0  
87 0         0 return %stats;
88             }
89              
90             #### METHODS
91              
92             sub schedule {
93 414     414 1 18860 my $class = shift;
94 414         1245 return Parallel::Forker::Process->_new(_forkref=>$class,
95             @_);
96             }
97              
98             sub sig_child {
99             # Keep minimal to avoid coredumps
100 167 50   167 1 3849 return if !$_[0];
101 167         3354 $_[0]->{_activity} = 1;
102             }
103              
104             sub wait_all {
105 52     52 1 285 my $self = shift;
106 52         157 while ($self->is_any_left) {
107             #print "NRUNNING ", scalar ( (keys %{$self->{_running}}) ), "\n";
108 571         3086 $self->poll;
109 543         44486295 usleep 100*1000;
110             };
111             }
112              
113             sub reap_processes {
114 0     0 1 0 my $self = shift;
115              
116 0         0 my @reaped;
117 0         0 foreach my $process ($self->processes) {
118 0 0       0 next unless $process->is_reapable;
119 0         0 $process->reap;
120 0         0 push @reaped, $process;
121             }
122 0         0 return @reaped;
123             }
124              
125             sub is_any_left {
126 595     595 1 4053 my $self = shift;
127 595 100       1130 return 1 if ( (keys %{$self->{_runable}}) > 0 );
  595         4554  
128 487 100       1018 return 1 if ( (keys %{$self->{_running}}) > 0 );
  487         3907  
129             }
130              
131             sub find_proc_name {
132 426     426 1 472 my $self = shift;
133 426         391 my $name = shift;
134             # Returns list of processes matching the name or label
135 426 100       728 if (exists $self->{_processes}{$name}) {
    100          
136 330         913 return ($self->{_processes}{$name});
137             } elsif (exists $self->{_labels}{$name}) {
138 64         64 return @{$self->{_labels}{$name}};
  64         178  
139             }
140 32         64 return undef;
141             }
142              
143             our $_Warned_Use_Sig_Child;
144              
145             sub poll {
146 571     571 1 987 my $self = shift;
147 571 100 100     2031 return if $self->use_sig_child && !$self->{_activity};
148 225 50       570 if (!defined $self->use_sig_child) {
149 0 0 0     0 carp "%Warning: Forker object should be new'ed with use_sig_child=>0 or 1, "
150             if ($^W && !$_Warned_Use_Sig_Child);
151 0         0 $_Warned_Use_Sig_Child = 1;
152 0         0 $self->use_sig_child(0);
153             }
154              
155             # We don't have a loop around this any more, as we want to allow
156             # applications to do other work. We'd also need to be careful not to
157             # set _activity with no one runnable, as it would potentially cause an
158             # infinite loop.
159              
160 225         491 $self->{_activity} = 0;
161 225         489 my $nrunning = grep { not $_->poll } (values %{$self->{_running}});
  311         2606  
  225         1593  
162              
163 225 50 66     1001 if (!($self->{max_proc} && $nrunning >= $self->{max_proc})) {
164 225         308 foreach my $procref (sort {$a->{name} cmp $b->{name}} # Lanch in named order
  295         745  
165 225         1750 values %{$self->{_runable}}) {
166 243 100 100     2442 last if ($self->{max_proc} && $nrunning >= $self->{max_proc});
167 223         1077 $procref->run;
168 195         2692 $nrunning++;
169             }
170             }
171             # If no one's running, we need _activity set to check for runable -> running
172             # transitions during the next call to poll().
173 197 100       3321 $self->{_activity} = 1 if !$nrunning;
174             }
175              
176             sub ready_all {
177 52     52 1 237 my $self = shift;
178 52         392 foreach my $procref ($self->processes) {
179 414 50       950 $procref->ready if $procref->is_idle;
180             };
181             }
182              
183             sub kill_all {
184 0     0 1   my $self = shift;
185 0   0       my $signal = shift || 9;
186 0           foreach my $procref ($self->running_sorted) {
187 0           $procref->kill($signal);
188             };
189             }
190              
191             sub kill_tree_all {
192 0     0 1   my $self = shift;
193 0   0       my $signal = shift || 9;
194 0           foreach my $procref ($self->running_sorted) {
195 0           $procref->kill_tree($signal);
196             };
197             }
198              
199             sub write_tree {
200 0     0 1   my $self = shift;
201 0           my %params = (@_);
202 0 0         defined $params{filename} or croak "%Error: filename not specified,";
203              
204 0           my %did_print;
205 0           my $another_loop = 1;
206 0           my $level = 0;
207 0           my $line = 4;
208 0           my @lines;
209 0           while ($another_loop) {
210 0           $another_loop = 0;
211 0           $level++;
212             proc:
213 0           foreach my $procref ($self->processes_sorted) {
214 0           foreach my $ra (values %{$procref->{_after_parents}}) {
  0            
215 0 0 0       next proc if (($did_print{$ra->{name}}{level}||999) >= $level);
216             }
217 0 0         if (!$did_print{$procref->{name}}{level}) {
218 0           $did_print{$procref->{name}}{level} = $level;
219 0           $did_print{$procref->{name}}{line} = $line;
220 0           $another_loop = 1;
221 0           $lines[$line][0] = $procref->_write_tree_line($level,0);
222 0           $lines[$line+1][0] = $procref->_write_tree_line($level,1);
223 0           foreach my $ra (values %{$procref->{_after_parents}}) {
  0            
224             $lines[$line][$did_print{$ra->{name}}{line}]
225 0           = $procref->{_after_parents_op}{$ra->{name}};
226             }
227 0           $line+=2;
228 0 0         if ($Debug) {
229 0           $lines[$line++][0] = $procref->_write_tree_line($level,2);
230 0           $lines[$line++][0] = $procref->_write_tree_line($level,3);
231 0           $lines[$line++][0] = $procref->_write_tree_line($level,4);
232             }
233 0           $line++;
234             }
235             }
236             }
237 0           $line++;
238              
239 0           if (0) {
240             for (my $row=1; $row<$line; $row++) {
241             for (my $col=1; $col<$line; $col++) {
242             print ($lines[$row][$col]?1:0);
243             }
244             print "\n";
245             }
246             }
247              
248 0           for (my $col=1; $col<=$#lines; $col++) {
249 0           my $col_used_row_min;
250             my $col_used_row_max;
251 0           for (my $row=1; $row<=$#lines; $row++) {
252 0 0         if ($lines[$row][$col]) {
253 0           $col_used_row_min = min($col_used_row_min, $row);
254 0           $col_used_row_max = max($col_used_row_max, $row);
255             }
256             }
257 0 0         if ($col_used_row_min) {
258 0           $col_used_row_min = min($col_used_row_min, $col);
259 0           $col_used_row_max = max($col_used_row_max, $col);
260 0           for (my $row=$col_used_row_min; $row<=$col_used_row_max; $row++) {
261 0 0 0       $lines[$row][$col] ||= '<' if $row==$col;
262 0   0       $lines[$row][$col] ||= '|';
263             }
264 0           for (my $row=1; $row<=$#lines; $row++) {
265 0 0 0       if (($lines[$row][0]||" ") !~ /^ /) { # Line with text on it
266 0   0       $lines[$row][$col] ||= '-';
267             #$lines[$row][$col-1] ||= '-';
268             }
269              
270 0   0       $lines[$row][$col] ||= ' ';
271             #$lines[$row][$col-1] ||= ' ';
272             }
273             }
274             }
275              
276 0 0         my $fh = IO::File->new($params{filename},"w") or die "%Error: $! $params{filename},";
277 0           print $fh "Tree of process spawn requirements:\n";
278 0           print $fh " & Indicates the program it connects to must complete with ok status\n";
279 0           print $fh " before the command on this row is allowed to become RUNABLE\n";
280 0           print $fh " E As with &, but with error status\n";
281 0           print $fh " ^ As with &, but with error or ok status\n";
282 0           print $fh " O Ored condition, either completing starts proc\n";
283 0           print $fh "\n";
284 0           for (my $row=1; $row<=$#lines; $row++) {
285 0           my $line = "";
286 0           for (my $col=1; $col<$#lines; $col++) {
287 0   0       $line .= ($lines[$row][$col]||"");
288             }
289 0   0       $line .= $lines[$row][0]||"";
290 0           $line =~ s/\s+$//;
291 0           print $fh "$line\n"; #if $line !~ /^\s*$/;
292             }
293              
294 0           $fh->close;
295             }
296              
297             sub min {
298 0     0 0   my $rtn = shift;
299 0           foreach my $v (@_) {
300 0 0 0       $rtn = $v if !defined $rtn || (defined $v && $v < $rtn);
      0        
301             }
302 0           return $rtn;
303             }
304             sub max {
305 0     0 0   my $rtn = shift;
306 0           foreach my $v (@_) {
307 0 0 0       $rtn = $v if !defined $rtn || (defined $v && $v > $rtn);
      0        
308             }
309 0           return $rtn;
310             }
311              
312             1;
313             ######################################################################
314             =pod
315              
316             =head1 NAME
317              
318             Parallel::Forker - Parallel job forking and management
319              
320             =head1 SYNOPSIS
321              
322             use Parallel::Forker;
323             $Fork = new Parallel::Forker (use_sig_child=>1);
324             $SIG{CHLD} = sub { Parallel::Forker::sig_child($Fork); };
325             $SIG{TERM} = sub { $Fork->kill_tree_all('TERM') if $Fork && $Fork->in_parent; die "Quitting...\n"; };
326              
327             $Fork->schedule
328             (run_on_start => sub {print "child work here...";},
329             # run_on_start => \&child_subroutine, # Alternative: call a named sub.
330             run_on_finish => sub {print "parent cleanup here...";},
331             )->run;
332              
333             $Fork->wait_all; # Wait for all children to finish
334              
335             # More processes
336             my $p1 = $Fork->schedule(...)->ready;
337             my $p2 = $Fork->schedule(..., run_after=>[$p1])->ready;
338             $Fork->wait_all; # p1 will complete before p2 starts
339              
340             # Other functions
341             $Fork->poll; # Service any active children
342             foreach my $proc ($Fork->running) { # Loop on each running child
343              
344             while ($Fork->is_any_left) {
345             $Fork->poll;
346             usleep(10*1000);
347             }
348              
349             =head1 DESCRIPTION
350              
351             Parallel::Forker manages parallel processes that are either subroutines or
352             system commands. Forker supports most of the features in all the other
353             little packages out there, with the addition of being able to specify
354             complicated expressions to determine which processes run after others, or
355             run when others fail.
356              
357             Function names are loosely based on Parallel::ForkManager.
358              
359             The unique property of Parallel::Forker is the ability to schedule
360             processes based on expressions that are specified when the processes are
361             defined. For example:
362              
363             my $p1 = $Fork->schedule(..., label=>'p1');
364             my $p2 = $Fork->schedule(..., label=>'p2');
365             my $p3 = $Fork->schedule(..., run_after => ["p1 | p2"]);
366             my $p4 = $Fork->schedule(..., run_after => ["p1 & !p2"]);
367              
368             Process p3 is specified to run after process p1 *or* p2 have completed
369             successfully. Process p4 will run after p1 finishes successfully, and
370             process p2 has completed with bad exit status.
371              
372             For more examples, see the tests.
373              
374             =head1 METHODS
375              
376             =over 4
377              
378             =item $self->find_proc_name()
379              
380             Returns one or more Parallel::Forker::Process objects for the given name (one
381             object returned) or label (one or more objects returned). Returns undef if no
382             processes are found.
383              
384             =item $self->in_parent
385              
386             Return true if and only if called from the parent process (the one that
387             created the Forker object).
388              
389             =item $self->is_any_left
390              
391             Return true if any processes are running, or runnable (need to run).
392              
393             =item $self->kill_all()
394              
395             Send a signal to all running children. You probably want to call this only
396             from the parent process that created the Parallel::Forker object, wrap the
397             call in "if ($self->in_parent)."
398              
399             =item $self->kill_tree_all()
400              
401             Send a signal to all running children and their subchildren.
402              
403             =item $self->max_proc()
404              
405             Specify the maximum number of processes that the poll method will run at
406             any one time. Defaults to undef, which runs all possible jobs at once.
407             Max_proc takes effect when you schedule processes and mark them "ready,"
408             then rely on Parallel::Forker's poll method to move the processes from the
409             ready state to the run state. (You should not call ->run yourself, as this
410             starts a new process immediately, ignoring max_proc.)
411              
412             =item $self->new()
413              
414             Create a new manager object. There may be more than one manager in any
415             application, but applications taking advantage of the sig_child handler
416             should call every manager's C<sig_child> method in the application's
417             C<SIGCHLD> handler.
418              
419             Parameters are passed by name as follows:
420              
421             =over 4
422              
423             =item max_proc => (<number>)
424              
425             See the C<max_proc> object method.
426              
427             =item use_sig_child => ( 0 | 1 )
428              
429             See the C<use_sig_child> object method. This option must be specified to
430             prevent a warning.
431              
432             =back
433              
434             =item $self->poll
435              
436             See if any children need work, and service them. Start up to max_proc
437             processes that are "ready" by calling their run method. Non-blocking;
438             always returns immediately.
439              
440             =item $self->process()
441              
442             Return Parallel::Forker::Process object for the specified process name, or
443             undef if none is found. See also find_proc_name.
444              
445             =item $self->processes
446              
447             Return Parallel::Forker::Process objects for all processes.
448              
449             =item $self->processes_sorted
450              
451             Return Parallel::Forker::Process objects for all processes, sorted by name.
452              
453             =item $self->ready_all
454              
455             Mark all processes as ready for scheduling.
456              
457             =item $self->reap_processes
458              
459             Reap all processes which have no other processes waiting for them, and the
460             process is is_done or is_parerr. Returns list of processes reaped. This
461             reclaims memory for when a large number of processes are being created,
462             run, and destroyed.
463              
464             =item $self->running
465              
466             Return Parallel::Forker::Process objects for all processes that are
467             currently running.
468              
469             =item $self->schedule()
470              
471             Register a new process perhaps for later running. Returns a
472             Parallel::Forker::Process object. Parameters are passed by name as
473             follows:
474              
475             =over 4
476              
477             =item label
478              
479             Optional name to use in C<run_after> commands. Unlike C<name>, this may be
480             reused, in which case C<run_after> will wait on all commands with the given
481             label. Labels must contain only [a-zA-Z0-9_].
482              
483             =item name
484              
485             Optional name to use in C<run_after> commands. Note that names MUST be
486             unique! When not specified, a unique number will be assigned
487             automatically.
488              
489             =item run_on_start
490              
491             Subroutine reference to execute when the job begins, in the forked process.
492             The subroutine is called with one argument, a reference to the
493             Parallel::Forker::Process that is starting.
494              
495             If your callback is going to fork, you'd be advised to have the child:
496              
497             $SIG{ALRM} = 'DEFAULT';
498             $SIG{CHLD} = 'DEFAULT';
499              
500             This will prevent the child from inheriting the parent's handlers, and
501             possibly confusing any child calls to waitpid.
502              
503             =item run_on_finish
504              
505             Subroutine reference to execute when the job ends, in the master process.
506             The subroutine is called with two arguments, a reference to the
507             Parallel::Forker::Process that is finishing, and the exit status of the
508             child process. Note the exit status will only be correct if a CHLD signal
509             handler is installed.
510              
511             =item run_after
512              
513             A list reference of processes that must be completed before this process
514             can be runnable. You may pass a process object (from schedule), a process
515             name, or a process label. You may use "|" or "&" in a string to run this
516             process after ANY processes exit, or after ALL exit (the default.)
517             ! in front of a process name indicates to run if that process fails with
518             bad exit status. ^ in front of a process indicates to run if that process
519             succeeds OR fails.
520              
521             =back
522              
523             =item $self->sig_child
524              
525             Must be called in a C<$SIG{CHLD}> handler by the parent process if
526             C<use_sig_child> was called with a "true" value. If there are multiple
527             Parallel::Forker objects each of their C<sig_child> methods must be called
528             in the C<$SIG{CHLD}> handler.
529              
530             =item $self->state_stats
531              
532             Return hash containing statistics with keys of state names, and values with
533             number of processes in each state.
534              
535             =item $self->use_sig_child( 0 | 1 )
536              
537             This should always be called with a 0 or 1. If you install a C<$SIG{CHLD}>
538             handler which calls your Parallel::Forker object's C<sig_child> method, you
539             should also turn on C<use_sig_child>, by calling it with a "true" argument.
540             Then, calls to C<poll> will do less work when there are no children
541             processes to be reaped. If not using the handler call with 0 to prevent a
542             warning.
543              
544             =item $self->wait_all
545              
546             Wait until there are no running or runable jobs left.
547              
548             =item $self->write_tree(filename => <filename>)
549              
550             Print a dump of the execution tree.
551              
552             =back
553              
554             =head1 DISTRIBUTION
555              
556             The latest version is available from CPAN and from
557             L.
558              
559             Copyright 2002-2020 by Wilson Snyder. This package is free software; you
560             can redistribute it and/or modify it under the terms of either the GNU
561             Lesser General Public License Version 3 or the Perl Artistic License
562             Version 2.0.
563              
564             =head1 AUTHORS
565              
566             Wilson Snyder
567              
568             =head1 SEE ALSO
569              
570             L<Parallel::Forker::Process>
571              
572             =cut
573             ######################################################################