File Coverage

blib/lib/Parallel/Prefork.pm
Criterion Covered Total %
statement 130 148 87.8
branch 48 66 72.7
condition 12 17 70.5
subroutine 24 25 96.0
pod 5 6 83.3
total 219 262 83.5


line stmt bran cond sub pod time code
1             package Parallel::Prefork;
2              
3 86     86   2295929 use strict;
  86         119  
  86         1914  
4 86     86   206 use warnings;
  86         95  
  86         1475  
5              
6 86     86   1226 use 5.008_001;
  86         172  
7              
8 86     86   225 use base qw/Class::Accessor::Lite/;
  86         75  
  86         37120  
9 86     86   68920 use List::Util qw/first max min/;
  86         86  
  86         5906  
10 86     86   32054 use Proc::Wait3 ();
  86         33071  
  86         1536  
11 86     86   24342 use Time::HiRes ();
  86         49521  
  86         2375  
12              
13             use Class::Accessor::Lite (
14 86         442 rw => [ qw/max_workers spawn_interval err_respawn_interval trap_signals signal_received manager_pid on_child_reap before_fork after_fork/ ],
15 86     86   354 );
  86         86  
16              
17             our $VERSION = '0.18';
18              
19             sub new {
20 84     84 1 60915 my $klass = shift;
21 84 50       623 my $opts = @_ == 1 ? $_[0] : +{ @_ };
22 84         1371 my $self = bless {
23             worker_pids => {},
24             max_workers => 10,
25             spawn_interval => 0,
26             err_respawn_interval => 1,
27             trap_signals => {
28             TERM => 'TERM',
29             },
30             signal_received => '',
31             manager_pid => undef,
32             generation => 0,
33             %$opts,
34             _no_adjust_until => 0, # becomes undef in wait_all_children
35             }, $klass;
36             $SIG{$_} = sub {
37 6     6   53 $self->signal_received($_[0]);
38 84         221 } for keys %{$self->trap_signals};
  84         645  
39 84     102   4720 $SIG{CHLD} = sub {};
40 84         272 $self;
41             }
42              
43             sub start {
44 85     85 1 21869 my ($self, $cb) = @_;
45            
46 85         350 $self->manager_pid($$);
47 85         489 $self->signal_received('');
48 85         659 $self->{generation}++;
49            
50             die 'cannot start another process while you are in child process'
51 85 50       410 if $self->{in_child};
52            
53             # main loop
54 85         448 while (! $self->signal_received) {
55 1182   66     18967 my $action = $self->{_no_adjust_until} <= Time::HiRes::time()
56             && $self->_decide_action;
57 1182 100       6366 if ($action > 0) {
    100          
58             # start a new worker
59 1113 100       1654 if (my $subref = $self->before_fork) {
60 46         378 $subref->($self);
61             }
62 1113         434091 my $pid = fork;
63 1113 50       11082 unless (defined $pid) {
64 0         0 warn "fork failed:$!";
65 0         0 $self->_update_spawn_delay($self->err_respawn_interval);
66 0         0 next;
67             }
68 1113 100       2902 unless ($pid) {
69             # child process
70 78         2780 $self->{in_child} = 1;
71 78         453 $SIG{$_} = 'DEFAULT' for keys %{$self->trap_signals};
  78         2406  
72 78         6769 $SIG{CHLD} = 'DEFAULT'; # revert to original
73 78 50       1829 exit 0 if $self->signal_received;
74 78 100       1477 if ($cb) {
75 8         83 $cb->();
76 8         93 $self->finish();
77             }
78 70         986 return;
79             }
80 1035 100       37517 if (my $subref = $self->after_fork) {
81 38         819 $subref->($self, $pid);
82             }
83 1035         31108 $self->{worker_pids}{$pid} = $self->{generation};
84 1035         8799 $self->_update_spawn_delay($self->spawn_interval);
85             } elsif ($action < 0) {
86             # stop an existing worker
87             kill(
88             $self->_action_for('TERM')->[0],
89 5         43 (keys %{$self->{worker_pids}})[0],
  5         1071  
90             );
91 5         21 $self->_update_spawn_delay($self->spawn_interval);
92             }
93             $self->{__dbg_callback}->()
94 1104 50       2319 if $self->{__dbg_callback};
95 1104 100 66     15024 if (my ($exit_pid, $status)
96             = $self->_wait(! $self->{__dbg_callback} && $action <= 0)) {
97 28         199 $self->_on_child_reap($exit_pid, $status);
98 28 100 100     534 if (delete($self->{worker_pids}{$exit_pid}) == $self->{generation}
99             && $status != 0) {
100 5         17 $self->_update_spawn_delay($self->err_respawn_interval);
101             }
102             }
103             }
104             # send signals to workers
105 7 50       69 if (my $action = $self->_action_for($self->signal_received)) {
106 7         21 my ($sig, $interval) = @$action;
107 7 50       56 if ($interval) {
108             # fortunately we are the only one using delayed_task, so implement
109             # this setup code idempotent and replace the already-registered
110             # callback (if any)
111 0         0 my @pids = sort keys %{$self->{worker_pids}};
  0         0  
112             $self->{delayed_task} = sub {
113 0     0   0 my $self = shift;
114 0         0 my $pid = shift @pids;
115 0         0 kill $sig, $pid;
116 0 0       0 if (@pids == 0) {
117 0         0 delete $self->{delayed_task};
118 0         0 delete $self->{delayed_task_at};
119             } else {
120 0         0 $self->{delayed_task_at} = Time::HiRes::time() + $interval;
121             }
122 0         0 };
123 0         0 $self->{delayed_task_at} = 0;
124 0         0 $self->{delayed_task}->($self);
125             } else {
126 7         32 $self->signal_all_children($sig);
127             }
128             }
129            
130 7         88 1; # return from parent process
131             }
132              
133             sub finish {
134 68     68 1 68406905 my ($self, $exit_code) = @_;
135 68 50       1158 die "\$parallel_prefork->finish() shouln't be called within the manager process\n"
136             if $self->manager_pid() == $$;
137 68   50     51728 exit($exit_code || 0);
138             }
139              
140             sub signal_all_children {
141 9     9 1 20 my ($self, $sig) = @_;
142 9         14 foreach my $pid (sort keys %{$self->{worker_pids}}) {
  9         151  
143 110         2767 kill $sig, $pid;
144             }
145             }
146              
147             sub num_workers {
148 1468     1468 0 1444 my $self = shift;
149 1468         1167 return scalar keys %{$self->{worker_pids}};
  1468         4667  
150             }
151              
152             sub _decide_action {
153 1215     1215   921 my $self = shift;
154 1215 100       1412 return 1 if $self->num_workers < $self->max_workers;
155 62         423 return 0;
156             }
157              
158             sub _on_child_reap {
159 107     107   347 my ($self, $exit_pid, $status) = @_;
160 107         418 my $cb = $self->on_child_reap;
161 107 100       851 if ($cb) {
162 40         51 eval {
163 40         103 $cb->($self, $exit_pid, $status);
164             };
165             # XXX - hmph, what to do here?
166             }
167             }
168              
169             # runs delayed tasks (if any) and returns how many seconds to wait
170             sub _handle_delayed_task {
171 1204     1204   1071 my $self = shift;
172 1204         1016 while (1) {
173             return undef
174 1204 50       2426 unless $self->{delayed_task};
175 0         0 my $timeleft = $self->{delayed_task_at} - Time::HiRes::time();
176 0 0       0 return $timeleft
177             if $timeleft > 0;
178 0         0 $self->{delayed_task}->($self);
179             }
180             }
181              
182             # returns [sig_to_send, interval_bet_procs] or undef for given recved signal
183             sub _action_for {
184 12     12   66 my ($self, $sig) = @_;
185 12 50       71 my $t = $self->{trap_signals}{$sig}
186             or return undef;
187 12 50       56 $t = [$t, 0] unless ref $t;
188 12         59 return $t;
189             }
190              
191             sub wait_all_children {
192 9     9 1 606 my ($self, $timeout) = @_;
193 9         17 $self->{_no_adjust_until} = undef;
194              
195             my $call_wait = sub {
196 100     100   119 my $blocking = shift;
197 100 100       195 if (my ($pid) = $self->_wait($blocking)) {
198 79 50       398 if (delete $self->{worker_pids}{$pid}) {
199 79         319 $self->_on_child_reap($pid, $?);
200             }
201 79         609 return $pid;
202             }
203 21         32 return;
204 9         85 };
205              
206 9 100       39 if ($timeout) {
207             # the strategy is to use waitpid + sleep that gets interrupted by SIGCHLD
208             # but since there is a race condition bet. waitpid and sleep, the argument
209             # to sleep should be set to a small number (and we use 1 second).
210 3         16 my $start_at = [Time::HiRes::gettimeofday];
211 3   66     9 while ($self->num_workers != 0 && Time::HiRes::tv_interval($start_at) < $timeout) {
212 41 100       544 unless ($call_wait->(0)) {
213 21         3501369 sleep 1;
214             }
215             }
216             } else {
217 6         21 while ($self->num_workers != 0) {
218 59         116 $call_wait->(1);
219             }
220             }
221 9         102 return $self->num_workers;
222             }
223              
224             sub _update_spawn_delay {
225 1045     1045   12733 my ($self, $secs) = @_;
226 1045 50       7810 $self->{_no_adjust_until} = $secs ? Time::HiRes::time() + $secs : 0;
227             }
228              
229             # wrapper function of Proc::Wait3::wait3 that executes delayed task if any. assumes wantarray == 1
230             sub _wait {
231 1204     1204   2571 my ($self, $blocking) = @_;
232 1204 100       5199 if (! $blocking) {
233 1076         3996 $self->_handle_delayed_task();
234 1076         7454 return Proc::Wait3::wait3(0);
235             } else {
236 128         336 my $delayed_task_sleep = $self->_handle_delayed_task();
237             my $delayed_fork_sleep =
238             $self->_decide_action() > 0 && defined $self->{_no_adjust_until}
239 128 50 66     328 ? max($self->{_no_adjust_until} - Time::HiRes::time(), 0)
240             : undef;
241 128         971 my $sleep_secs = min grep { defined $_ } (
  384         1903  
242             $delayed_task_sleep,
243             $delayed_fork_sleep,
244             $self->_max_wait(),
245             );
246 128 100       243 if (defined $sleep_secs) {
247             # wait max sleep_secs or until signalled
248 45         9966790 select(undef, undef, undef, $sleep_secs);
249 45 100       1027 if (my @r = Proc::Wait3::wait3(0)) {
250 10         72 return @r;
251             }
252             } else {
253 83 100       4543722 if (my @r = Proc::Wait3::wait3(1)) {
254 77         531 return @r;
255             }
256             }
257 41         1427 return +();
258             }
259             }
260              
261             sub _max_wait {
262 83     83   337 return undef;
263             }
264              
265             1;
266              
267             __END__