File Coverage

blib/lib/Parallel/PreForkManager.pm
Criterion Covered Total %
statement 187 198 94.4
branch 52 64 81.2
condition 1 3 33.3
subroutine 19 20 95.0
pod 11 11 100.0
total 270 296 91.2


line stmt bran cond sub pod time code
1             package Parallel::PreForkManager;
2              
3 511     511   250172 use strict;
  511         643  
  511         12407  
4 511     511   2106 use warnings;
  511         521  
  511         19197  
5              
6             our $VERSION = '1.20170417'; # VERSION
7              
8 511     511   2143 use Carp;
  511         508  
  511         27897  
9 511     511   247814 use IO::Handle;
  511         2474367  
  511         21089  
10 511     511   224452 use IO::Select;
  511         633729  
  511         37478  
11 511     511   413478 use JSON;
  511         5381841  
  511         1965  
12 511     511   281902 use English qw( -no_match_vars );
  511         1503868  
  511         2362  
13              
14             my $DEBUG = 0;
15              
16             sub new {
17 727     727 1 2005718 my ( $Class, $Args ) = @_;
18              
19 727 100       6438 croak "No ChildHandler set" if ! exists ( $Args->{'ChildHandler'} );
20              
21             my $Self = {
22 726         12415 'ChildHandler' => $Args->{'ChildHandler'},
23             'ChildCount' => 10,
24             'Timeout' => 0,
25             'WaitComplete' => 1,
26             'JobQueue' => [],
27             'Select' => IO::Select->new(),
28             };
29              
30 726         13710 foreach my $Arg ( qw { Timeout ChildCount WaitComplete ParentCallback ProgressCallback JobsPerChild ChildSetupHook ChildTeardownHook } ) {
31 5808 100       19717 $Self->{ $Arg } = $Args->{ $Arg } if exists ( $Args->{ $Arg } );
32             }
33              
34 726   33     7757 bless $Self, ref($Class) || $Class;
35              
36 726         2043 return $Self;
37             }
38              
39             sub AddJob {
40 89262     89262 1 194867 my ( $Self, $Job ) = @_;
41 89262         47728 push @{ $Self->{'JobQueue'} }, $Job;
  89262         71389  
42 89262         80484 return;
43             }
44              
45             sub RunJobs {
46 723     723 1 30355 my ($Self) = @_;
47              
48             # If a worker dies, there's a problem
49             local $SIG{CHLD} = sub {
50 16569     16569   25751851784 my $pid = wait();
51 16569 50       14409351043 if ( exists ( $Self->{'ToChild'}->{$pid} ) ) {
52 0         0 confess("Worker $pid died.");
53             }
54 723         20043 };
55              
56             # Start the workers
57 723         4464 $Self->StartChildren();
58              
59             # Read from the workers, loop until they all shut down
60 440         1986 while ( %{ $Self->{'ToChild'} } ) {
  565         23884  
61             READYLOOP:
62 440         4483 while ( my @Ready = $Self->{'Select'}->can_read() ) {
63             READLOOP:
64 39075         135398807 foreach my $fh (@Ready) {
65 147268         36937412 my $Result = $Self->Receive($fh);
66              
67 147268 50       14813770 if ( !$Result ) {
68 0         0 $Self->{'Select'}->remove($fh);
69 0         0 print STDERR "$fh got eof\n";
70 0         0 next READLOOP;
71             }
72              
73 147268         286807 my $ResultMethod = $Result->{ 'Method' };
74 147268 50       62986236 warn "Parent working on Method $ResultMethod\n" if $DEBUG;
75              
76             # Handle the initial request for work
77 147268 100       64971548 if ( $ResultMethod eq 'Startup' ) {
78 46662 100       110797 if ( $#{ $Self->{'JobQueue'} } > -1 ) {
  46662         27398048  
79             #my $Child = $Self->{ 'ToChild' }->{ $Result->{ 'pid' } };
80 46531         56272 my $NextJob = shift( @{ $Self->{'JobQueue'} } );
  46531         80432  
81 46531         30572771 $Self->Send( $Self->{'ToChild'}->{ $Result->{'pid'} }, { 'Job' => $NextJob, }, );
82 46531         11967278 next READLOOP;
83             }
84             else {
85             # Nothing to do, shut down
86 131         1044 $Self->{'Select'}->remove($fh);
87 131         3764 my $fh = $Self->{'ToChild'}->{ $Result->{'pid'} };
88 131         415 delete( $Self->{'ToChild'}->{ $Result->{'pid'} } );
89 131         79774526 $Self->Send( $fh, { 'Shutdown' => 1, }, );
90 131         4224 close($fh);
91             }
92             }
93              
94             # Process the result handler
95 100737 100       271677 if ( $ResultMethod eq 'Completed' ) {
96              
97             # The child has completed it's work, process the results.
98 50162 100       191432 if ( exists( $Self->{'ParentCallback'} ) ) {
99 49902         408089 $Self->{ 'Result' } = $Result;
100 49902         71240 &{ $Self->{'ParentCallback'} }( $Self, $Result->{ 'Data' } );
  49902         348805  
101 49902         891109 delete $Self->{ 'Result' };
102             }
103              
104             # If the child has reached its processing limit then shut it down
105 50162 100       170800 if ( exists( $Result->{'JobsPerChildLimitReached'} ) ) {
106 44755         12356369 $Self->{'Select'}->remove($fh);
107 44755         1355448 my $fh = $Self->{'ToChild'}->{ $Result->{'pid'} };
108 44755         171415 delete( $Self->{'ToChild'}->{ $Result->{'pid'} } );
109 44755         14916072 $Self->Send( $fh, { 'Shutdown' => 1, }, );
110 44755         634726 close($fh);
111             # If there are still jobs to be done then start a new child
112 44755 100       48843 if ( $#{ $Self->{'JobQueue'} } > -1 ) {
  44755         180966  
113 43788         115040 $Self->StartChild();
114             }
115 44440         424813276 next READLOOP;
116             }
117              
118             # If there's still work to be done, send it to the child
119 5407 100       6210 if ( $#{ $Self->{'JobQueue'} } > -1 ) {
  5407         18306  
120 5262         4651 my $NextJob = shift( @{ $Self->{'JobQueue'} } );
  5262         21339  
121 5262         32730 $Self->Send( $Self->{'ToChild'}->{ $Result->{'pid'} }, { 'Job' => $NextJob, }, );
122 5262         44603 next READLOOP;
123             }
124              
125             # There is no more work to be done, shut down this child
126 145         479 $Self->{'Select'}->remove($fh);
127 145         4153 my $fh = $Self->{'ToChild'}->{ $Result->{pid} };
128 145         372 delete( $Self->{'ToChild'}->{ $Result->{pid} } );
129 145         15898 close($fh);
130 145         915 next READLOOP;
131             }
132              
133 50575 100       13048404 if ( $ResultMethod eq 'ProgressCallback' ) {
134 50444         103844 my $Method = $Result->{'ProgressCallbackMethod'};
135 50444         189673 my $Data = $Result->{'ProgressCallbackData'};
136 50444 50       39629184 if ( exists( $Self->{'ProgressCallback'}->{$Method} ) ) {
137 50444         128459 my $MethodResult = &{ $Self->{'ProgressCallback'}->{$Method} }( $Self, $Data );
  50444         34727292  
138 50444         19408287 $Self->Send( $Self->{'ToChild'}->{ $Result->{'pid'} }, $MethodResult );
139              
140             }
141             else {
142 0         0 confess "Unknown callback method";
143             }
144              
145 50444         257388885 next READLOOP;
146             }
147              
148             }
149             }
150             }
151              
152 125 50       1179 if ( $Self->{ 'WaitComplete' } ) {
153 125         1038 $Self->WaitComplete();
154             }
155              
156 125         91555 return;
157             }
158              
159             sub GetResult {
160 20     20 1 65 my ( $Self ) = @_;
161 20         23 return $Self->{ 'Result' };
162             }
163              
164             sub WaitComplete {
165 125     125 1 249 my ( $Self ) = @_;
166 125         1190093409 while ( ( my $pid = wait() ) != -1 ) { }
167 125         4357 return;
168             }
169              
170             sub StartChildren {
171 723     723 1 1423 my ($Self) = @_;
172              
173 723         1106 my $MaxChildren = $Self->{ 'ChildCount' };
174 723         856 my $ActualJobs = scalar @{ $Self->{ 'JobQueue' } };
  723         1418  
175              
176 723 100       2302 if ( $ActualJobs < $MaxChildren ) {
177 21         21 $MaxChildren = $ActualJobs;
178             }
179              
180 723         3250 foreach ( 1 .. $MaxChildren ) {
181 5794         27483 $Self->StartChild();
182             }
183              
184 440         4209 return;
185             }
186              
187             sub StartChild {
188 49582     49582 1 75974 my ($Self) = @_;
189              
190             # Open a pipe for the worker
191 49582         53454 my ( $FromParent, $FromChild, $ToParent, $ToChild );
192 49582         21536393 pipe( $FromParent, $ToChild );
193 49582         4404233 pipe( $FromChild, $ToParent );
194              
195             # Fork off a worker
196 49582         88467151 my $pid = fork();
197              
198 49582 100       46312351965 if ($pid) {
    50          
199             # Parent
200              
201             # Close unused pipes
202 48984         309516542 close($ToParent);
203 48984         776264 close($FromParent);
204              
205 48984         348017388 $Self->{'ToChild'}->{$pid} = $ToChild;
206 48984         394828 $Self->{'FromChild'}->{$pid} = $FromChild;
207 48984         650682100 $Self->{'Select'}->add($FromChild);
208              
209             }
210             elsif ( $pid == 0 ) {
211             # Child
212              
213 598 50       120060 warn "Child $PID spawned" if $DEBUG;
214              
215             # Close unused pipes
216 598         29747 close($FromChild);
217 598         22430 close($ToChild);
218              
219             # Setup communication pipes
220 598         11533 $Self->{'ToParent'} = $ToParent;
221 598         271769 open( STDIN, '<', '/dev/null' );
222              
223             # Send the initial request
224 598         26961 $Self->Send( $ToParent, { 'Method' => 'Startup' } );
225              
226             # Start processing
227 598         10732 $Self->Child($FromParent);
228              
229             # When the worker subroutine completes, exit
230 0         0 exit 0;
231             }
232             else {
233 0         0 confess("Failed to fork: $!");
234             }
235              
236 48984         680489148 return;
237             }
238              
239             sub Child {
240 598     598 1 96590 my ( $Self, $FromParent ) = @_;
241 598         6549 $Self->{'FromParent'} = $FromParent;
242              
243 598 100       71912 if ( exists( $Self->{'ChildSetupHook'} ) ) {
244 30         71 &{ $Self->{'ChildSetupHook'} }( $Self );
  30         829  
245             }
246              
247             # Read instructions from the parent
248 598         56380 while ( my $Instructions = $Self->Receive($FromParent) ) {
249              
250             # If the handler's children die, that's not our business
251 1293         1657299 $SIG{CHLD} = 'IGNORE';
252              
253 1293 100       502558 if ( exists( $Instructions->{'Shutdown'} ) ) {
254 353 50       2876 warn "Child $PID shutdown" if $DEBUG;
255 353 100       2376 if ( exists( $Self->{'ChildTeardownHook'} ) ) {
256 20         31 &{ $Self->{'ChildTeardownHook'} }( $Self );
  20         361  
257             }
258 353         14722706 exit 0;
259             }
260              
261 940         242898 my $ResultToParent = {};
262 940         17540 $ResultToParent->{ 'Request' } = $Instructions;
263              
264             # Execute the handler with the given instructions
265 940         88874 my $Result;
266 940         2125 eval {
267              
268             # Handle alarms
269             local $SIG{ALRM} = sub {
270 0     0   0 die "Child timed out.";
271 940         496021 };
272              
273             # Set alarm
274 940         549998 alarm( $Self->{'Timeout'} );
275              
276             # Execute the handler and get it's result
277 940 50       57853 if ( exists( $Self->{'ChildHandler'} ) ) {
278 940         3231 $Result = &{ $Self->{'ChildHandler'} }( $Self, $Instructions->{'Job'} );
  940         223010  
279             }
280              
281             # Disable alarm
282 831         1876367 alarm(0);
283              
284             };
285              
286             # report errors
287 840 100       136238 if (my $Error = $@) {
288 9 50       32 warn "Child $PID errored: $@" if $DEBUG;
289 9 50       34 if ( exists( $Self->{'ChildTeardownHook'} ) ) {
290 0         0 eval { &{ $Self->{'ChildTeardownHook'} }( $Self ); };
  0         0  
  0         0  
291             }
292 9         25 $ResultToParent->{ 'Method' } = 'Completed';
293 9         23 $ResultToParent->{ 'Error' } = $Error;
294             }
295             else {
296 831         4608 $ResultToParent->{ 'Method' } = 'Completed';
297 831         100030 $ResultToParent->{ 'Data' } = $Result;
298             }
299              
300 840 100       98589 if ( exists( $Self->{'JobsPerChild'} ) ) {
301 550         4266 $Self->{'JobsPerChild'} = $Self->{'JobsPerChild'} - 1;
302 550 100       687859 if ( $Self->{'JobsPerChild'} == 0 ) {
303 328         1608 $ResultToParent->{'JobsPerChildLimitReached'} = 1;
304             }
305             }
306              
307             # Send the result to the server
308 840         88915 $Self->Send( $Self->{'ToParent'}, $ResultToParent );
309             }
310              
311 145 100       1510 if ( exists( $Self->{'ChildTeardownHook'} ) ) {
312 10         22 &{ $Self->{'ChildTeardownHook'} }( $Self );
  10         59  
313             }
314              
315 145 50       1965 warn "Child $PID completed" if $DEBUG;
316 145         117060 exit 0;
317             }
318              
319             sub ProgressCallback {
320 640     640 1 220357 my ( $Self, $Method, $Data ) = @_;
321 640         452123 $Self->Send( $Self->{'ToParent'}, {
322             'Method' => 'ProgressCallback',
323             'ProgressCallbackMethod' => $Method,
324             'ProgressCallbackData' => $Data,
325             } );
326 640         2825 my $Result = $Self->Receive( $Self->{'FromParent'} );
327 640         663923 return $Result;
328             }
329              
330             sub Receive {
331 149346     149346 1 498846 my ( $Self, $fh ) = @_;
332              
333             # Get a value from the file handle
334 149346         261893 my $Value;
335             my $Char;
336 149346         3150579553 while ( read( $fh, $Char, 1 ) ) {
337 14924080 100       40764794 if ( $Char eq "\n" ) {
338 149201         411425 last;
339             }
340 14774879         43149585 $Value .= $Char;
341             }
342              
343             # Deserialize the data
344 149346         13828409 my $Data = eval { decode_json($Value) };
  149346         5719855  
345              
346 149346         2698608 return $Data;
347             }
348              
349             sub Send {
350 149201     149201 1 671466 my ( $Self, $fh, $Value ) = @_;
351              
352 149201         450324 $Value->{'pid'} = $PID;
353              
354 149201         738535 my $Encoded = encode_json($Value);
355 149201         11530845 print $fh "$Encoded\n";
356              
357             # Force the file handle to flush
358 149201         216929654 $fh->flush();
359              
360 149201         372354 return;
361             }
362              
363             1;
364              
365             __END__