File Coverage

blib/lib/Parallel/Runner.pm
Criterion Covered Total %
statement 88 90 97.7
branch 36 38 94.7
condition 19 20 95.0
subroutine 21 21 100.0
pod 5 5 100.0
total 169 174 97.1


line stmt bran cond sub pod time code
1             package Parallel::Runner;
2 25     25   5470404 use strict;
  25         76  
  25         986  
3 25     25   147 use warnings;
  25         199  
  25         1283  
4              
5 25     25   14037 use POSIX ();
  25         218960  
  25         1143  
6 25     25   216 use Time::HiRes qw/sleep/;
  25         89  
  25         283  
7 25     25   2415 use Carp;
  25         37  
  25         2029  
8 25     25   13389 use Child qw/child/;
  25         154280  
  25         4372  
9              
10             our $VERSION = '0.014';
11              
12             for my $accessor (qw/ exit_callback data_callback iteration_callback _children pid max iteration_delay reap_callback pipe/) {
13             my $sub = sub {
14 19357     19357   48531 my $self = shift;
15 19357 100       66038 ( $self->{$accessor} ) = @_ if @_;
16 19357         312830567 return $self->{$accessor};
17             };
18 25     25   236 no strict 'refs';
  25         50  
  25         31835  
19             *$accessor = $sub;
20             }
21              
22             sub children {
23 2875     2875 1 231629 my $self = shift;
24 2875         7837 my @active;
25              
26 2875 100       7209 for my $proc ( @{$self->_children || []}, @_ ) {
  2875         18775  
27 6807 100       40488 if ( defined $proc->exit_status ) {
28 98 100       21856 if ( $self->data_callback ) {
29 7         173 my $data = $proc->read();
30 7         603 $self->data_callback->($data);
31             }
32              
33 98 100       1940 $self->reap_callback->( $proc->exit_status, $proc->pid, $proc->pid, $proc )
34             if $self->reap_callback;
35              
36 98         51535 next;
37             }
38 6709         458727 push @active => $proc;
39             }
40              
41 2875         12485 $self->_children( \@active );
42 2875         14884 return @active;
43             }
44              
45             sub new {
46 58     58 1 6800440 my $class = shift;
47 58         222 my ($max) = shift;
48 58   100     1691 return bless(
49             {
50             _children => [],
51             pid => $$,
52             max => $max || 1,
53             iteration_delay => 0.1,
54             @_,
55             },
56             $class
57             );
58             }
59              
60             sub run {
61 134     134 1 46839 my $self = shift;
62 134         519 my ( $code, $force_fork ) = @_;
63 134 100       1024 croak("Called run() in child process")
64             unless $self->pid == $$;
65              
66 124   100     1983 my $fork = $force_fork || $self->max > 1;
67 124 100       1022 return $self->_fork($code)
68             if $fork;
69              
70 10         90 my ($data) = $code->();
71 10 100       15000268 $self->data_callback->($data)
72             if $self->data_callback;
73              
74 7         46 return;
75             }
76              
77             sub _fork {
78 114     114   231 my $self = shift;
79 114         276 my ($code) = @_;
80              
81             # Wait for a slot
82             $self->_iterate(
83             sub {
84 2481     2481   39098 $self->children >= $self->max;
85             }
86 114         2974 );
87              
88             my $proc = Child->new(
89             sub {
90 17     17   77101 my $parent = shift;
91 17         1062 $self->_children( [] );
92              
93 17         679 my @return = $code->($parent);
94              
95 17 100       78016356 $self->exit_callback->(@return)
96             if $self->exit_callback;
97              
98 17 100       656 $parent->write( $return[0] )
99             if $self->data_callback;
100              
101             },
102 100 100 100     1951 $self->pipe || $self->data_callback ? ( pipe => $self->pipe ) : ()
103             )->start();
104              
105 566     566   10071 $self->_iterate( sub { !defined $proc->exit_status } )
106 83 100       437001 if $self->max == 1;
107              
108 81 100       2805 $self->children($proc)
109             unless defined $proc->exit_status;
110              
111 81         2056 return $proc;
112             }
113              
114             sub finish {
115 60     60 1 83965 my $self = shift;
116 60     249   1363 $self->_iterate( sub { $self->children }, @_ );
  249         2878  
117             }
118              
119             sub _iterate {
120 193     193   684 my $self = shift;
121 193         533 my ( $condition, $timeout, $timeoutsub ) = @_;
122 193         1120 my $counter = 0;
123              
124 193         505 while ( $condition->() ) {
125 3120 100       71533 $self->iteration_callback->($self)
126             if $self->iteration_callback;
127              
128 3120         15177 $counter += $self->iteration_delay;
129 3120 100 100     14126 last if $timeout and $counter >= $timeout;
130              
131 3119         9868 sleep $self->iteration_delay;
132             }
133              
134 177 100 66     6361 $timeoutsub->()
      100        
135             if $timeout
136             && $timeoutsub
137             && $counter >= $timeout;
138 177         741 1;
139             }
140              
141             sub killall {
142 1     1 1 2 my $self = shift;
143 1         3 my ( $sig, $warn ) = @_;
144              
145 1 50       13 if ($warn) {
146 1         5 warn time . " - Killing: $_ - $sig\n" for grep { $_->pid } $self->children;
  1         5  
147             }
148              
149 1         95 $_->kill($sig) for $self->children;
150             }
151              
152             sub DESTROY {
153 58     58   40035578 my $self = shift;
154             return
155 58 100 100     808 unless $self->pid == $$
156             && $self->children;
157 1         126 warn <<EOT;
158             Parallel::Runner object destroyed without first calling finish(), This will
159             terminate all your child processes. This either means you forgot to call
160             finish() or your parent process has died.
161             EOT
162              
163 1 50       59 return $self->finish()
164             if $^O eq 'MSWin32';
165              
166             $self->finish(
167             1,
168             sub {
169 1     1   22 $self->killall( 15, 1 );
170             $self->finish(
171             4,
172             sub {
173 0           $self->killall( 9, 1 );
174 0           $self->finish(10);
175             }
176 1         144 );
177             }
178 1         33 );
179             }
180              
181             1;
182              
183             =pod
184              
185             =head1 NAME
186              
187             Parallel::Runner - An object to manage running things in parallel processes.
188              
189             =head1 DESCRIPTION
190              
191             There are several other modules to do this, you probably want one of them. This
192             module exists as a super specialised parallel task manager. You create the
193             object with a process limit and callbacks for what to do while waiting for a
194             free process slot, as well as a callback for what a process should do just
195             before exiting.
196              
197             You must explicitly call $runner->finish() when you are done. If the runner is
198             destroyed before its children are finished a warning will be generated and
199             your child processes will be killed, by force if necessary.
200              
201             If you specify a maximum of 1 then no forking will occur, and run() will block
202             until the coderef returns. You can force a fork by providing a boolean true
203             value as the second argument to run(), this will force the runner to fork
204             before running the coderef, however run() will still block until the child
205             exits.
206              
207             =head1 SYNOPSIS
208              
209             #!/usr/bin/perl
210             use strict;
211             use warnings;
212             use Parallel::Runner;
213              
214             my $runner = Parallel::Runner->new(4);
215             $runner->run( sub { ... } );
216             $runner->run( sub { ... } );
217             $runner->run( sub { ... } );
218             $runner->run( sub { ... } );
219              
220             # This will block until one of the previous 4 finishes
221             $runner->run( sub { ... } );
222              
223             # Do not forget this.
224             $runner->finish;
225              
226             =head1 CONSTRUCTOR
227              
228             =over 4
229              
230             =item $runner = $class->new( $max, $accessor => $value, ... );
231              
232             Create a new instance of Parallel::Runner. $accessor can be anything listed
233             under the ACCESSORS section. $max should be the maximum number of processes
234             allowed, defaults to 1.
235              
236             =back
237              
238             =head1 ACCESSORS
239              
240             These are simple accessors, providing an argument sets the accessor to that
241             argument, no argument it simply returns the current value.
242              
243             =over 4
244              
245             =item $val = $runner->data_callback( \&callback )
246              
247             If this is specified then IPC will be automatically enabled, and the final
248             return from each process will be passed into this handler in the main process.
249             Due to the way IPC works only strings/numerical data is passed, if you need to
250             pass a ref you will need to serialize it yourself before returning it, followed
251             by deserializing it in your callback.
252              
253             Example:
254              
255             # Place to put the accumulated data
256             my @accum_data;
257              
258             # Create the runner with a callback that pushes the data onto our array.
259             $runner = $CLASS->new( 2,
260             data_callback => sub {
261             my ($data) = @_;
262             push @accum_data => $data;
263             },
264             );
265              
266             # 4 processes that return data
267             $runner->run( sub { return "foo" });
268             $runner->run( sub { return "bar" });
269             $runner->run( sub { return "baz" });
270             $runner->run( sub { return "bat" });
271             $runner->finish;
272              
273             # Verify the data (order is not predictable)
274             is_deeply(
275             [ sort @accum_data ],
276             [ sort qw/foo bar baz bat/ ],
277             "Got all data returned by subprocesses"
278             );
279              
280             =item $val = $runner->exit_callback( \&callback )
281              
282             Coderef to call just before a child exits (called within child)
283              
284             =item $val = $runner->iteration_delay( $float );
285              
286             How long to wait per iteration if nothing has changed.
287              
288             =item $val = $runner->iteration_callback( $newval )
289              
290             Coderef to call multiple times in a loop while run() is blocking waiting for a
291             process slot.
292              
293             =item $val = $runner->reap_callback( $newval )
294              
295             Coderef to call whenever a pid is reaped using waitpid. The callback sub will be
296             passed 4 values. The first is the exit status of the child process. The second
297             is the pid of the child process. The third used to be the return of waitpid,
298             but this is deprecated as L<Child> is now used and throws an exception when
299             waitpid is not what it should be; the third is now simply the pid of the child
300             process again. The final argument is the child process object itself.
301              
302             $runner->reap_callback( sub {
303             my ( $status, $pid, $pid_again, $proc ) = @_;
304              
305             # Status as returned from system, so 0 is good, 1+ is bad.
306             die "Child $pid did not exit 0"
307             if $status;
308             });
309              
310             =item @children = $runner->children( @append )
311              
312             Returns a list of L<Child::Link::Proc> objects.
313              
314             =item $val = $runner->pid()
315              
316             pid of the parent process
317              
318             =item $val = $runner->max( $newval )
319              
320             Maximum number of children
321              
322             =back
323              
324             =head1 OBJECT METHODS
325              
326             =over 4
327              
328             =item run( $code )
329              
330             =item run( $code, $force_fork )
331              
332             Run the specified code in a child process. Blocks if no free slots are
333             available. Force fork can be used to force a fork when max is 1, however it
334             will still block until the child exits.
335              
336             =item finish()
337              
338             =item finish( $timeout )
339              
340             =item finish( $timeout, $timeoutcallback )
341              
342             Wait for all children to finish, then clean up after them. If a timeout is
343             specified it will return after the timeout regardless of whether or not children
344             have all exited. If there is a timeout callback then that code will be run
345             upon timeout just before the method returns.
346              
347             NOTE: DO NOT LET YOUR RUNNER BE DESTROYED BEFORE FINISH COMPLETES WITHOUT A
348             TIMEOUT.
349              
350             The runner will kill all children, possibly with force, if your runner is
351             destroyed with children still running, or not waited on.
352              
353             =item killall( $sig )
354              
355             Send all children the specified kill signal.
356              
357             =item DESTROY()
358              
359             Automagically called when the object is destroyed. If called while children are
360             running it will forcefully clean up after you as follows:
361              
362             1) Sends an ugly warning.
363              
364             2) Will first give all your children 1 second to complete.
365              
366             Windows) Strawberry fails with processes, so on Windows DESTROY will wait as
367             long as needed, possibly forever.
368              
369             3) Sends kill signal 15 to all children then waits up to 4 seconds.
370              
371             4) Sends kill signal 9 to any remaining children then waits up to 10 seconds
372              
373             5) Gives up and returns
374              
375             =back
376              
377             =head1 FENNEC PROJECT
378              
379             This module is part of the Fennec project. See L<Fennec> for more details.
380             Fennec is a project to develop an extendable and powerful testing framework.
381             Together the tools that make up the Fennec framework provide a potent testing
382             environment.
383              
384             The tools provided by Fennec are also useful on their own. Sometimes a tool
385             created for Fennec is useful outside the greater framework. Such tools are
386             turned into their own projects. This is one such project.
387              
388             =over 2
389              
390             =item L<Fennec> - The core framework
391              
392             The primary Fennec project that ties them all together.
393              
394             =back
395              
396             =head1 AUTHORS
397              
398             Chad Granum L
399              
400             =head1 COPYRIGHT
401              
402             Copyright (C) 2010 Chad Granum
403              
404             Parallel-Runner is free software; Standard perl licence.
405              
406             Parallel-Runner is distributed in the hope that it will be useful, but WITHOUT
407             ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
408             FOR A PARTICULAR PURPOSE. See the license for more details.