File Coverage

blib/lib/AnyEvent/Process.pm
Criterion Covered Total %
statement 185 208 88.9
branch 45 68 66.1
condition 23 48 47.9
subroutine 32 32 100.0
pod 5 5 100.0
total 290 361 80.3


line stmt bran cond sub pod time code
1             package AnyEvent::Process::Job;
2 12     12   36305 use strict;
  12         22  
  12         4035  
3              
4             sub new {
5 15     15   170 my ($ref, $pid) = @_;
6 15         676 my $self = bless {pid => $pid, cbs => [], handles => [], timers => []}, $ref;
7              
8 15         69 return $self;
9             }
10              
11             sub kill {
12 3     3   1014 my ($self, $signal) = @_;
13              
14 3         355 return kill $signal, $self->{pid};
15             }
16              
17             sub pid {
18 1     1   6 return $_[0]->{pid};
19             }
20              
21             sub _add_cb {
22 15     15   22627 my ($self, $cb) = @_;
23 15         34 push @{$self->{cbs}}, $cb;
  15         173  
24             }
25              
26             sub _add_handle {
27 5     5   10 my ($self, $handle) = @_;
28 5         10 push @{$self->{handles}}, $handle;
  5         60  
29             }
30              
31             sub _add_timer {
32 2     2   4 my ($self, $timer) = @_;
33 2         5 push @{$self->{timers}}, $timer;
  2         15  
34             }
35              
36             sub _remove_cbs {
37 10     10   76 undef $_[0]->{cbs};
38             }
39              
40             sub _remove_timers {
41 12     12   53 my $self = shift;
42 12         46 undef $_ foreach @{$self->{timers}};
  12         244  
43 12         78 undef $self->{timers};
44             }
45              
46             sub close {
47 1     1   3 my $self = shift;
48 1         2 undef $_ foreach @{$self->{handles}};
  1         18  
49 1         55 undef $self->{handles};
50             }
51              
52             package AnyEvent::Process;
53 12     12   59 use strict;
  12         24  
  12         234  
54              
55 12     12   13854 use AnyEvent::Handle;
  12         240039  
  12         494  
56 12     12   115 use AnyEvent::Util;
  12         24  
  12         878  
57 12     12   63 use AnyEvent;
  12         35  
  12         257  
58 12     12   55 use Carp;
  12         24  
  12         19885  
59              
60             our @proc_args = qw(fh_table code on_completion args watchdog_interval on_watchdog kill_interval on_kill close_all_fds_except);
61             our $VERSION = '0.02';
62              
63             my $nop = sub {};
64              
65             sub _yield {
66 15     15   539 my $cv_yield = AE::cv;
67 15     15   1219 AE::postpone { $cv_yield->send };
  15         8004  
68 15         522 $cv_yield->recv;
69             }
70              
71             # Create a callback factory. This is needed to execute on_completion after all
72             # other callbacks.
73             sub _create_callback_factory {
74 21   66 21   134 my $on_completion = shift // $nop;
75 21         43 my $counter = 0;
76 21         33 my @on_completion_args;
77              
78             my $factory = sub {
79 20   66 20   136 my $func = shift // $nop;
80              
81 20         44 $counter++;
82             return sub {
83 12         685178 my ($err, $rtn);
84              
85 12         48 eval {
86 12         83 $rtn = $func->(@_);
87 12         53 }; $err = $@;
88              
89 12 100       142 if (--$counter == 0) {
90 10         24 eval {
91 10         232 $on_completion->(@on_completion_args);
92 10   33     2573 }; $err = $err || $@;
93 10         49 $on_completion_args[0]->_remove_cbs;
94             }
95              
96 12 50       66 if ($err) {
97 0         0 croak $err;
98             }
99              
100 12         1010 return $rtn;
101             }
102 21         126 };
  20         1995  
103             my $set_on_completion_args = sub {
104 10     10   105 @on_completion_args = @_;
105 21         100 };
106              
107 21         74 return $factory, $set_on_completion_args;
108             }
109              
110             sub new {
111 12     12 1 36830 my $ref = shift;
112 12         36 my $self = bless {}, $ref;
113 12         56 my %args = @_;
114              
115 12         47 foreach my $arg (@proc_args) {
116 108 100       365 $self->{$arg} = delete $args{$arg} if defined $args{$arg};
117             }
118              
119 12 50       54 if (%args) {
120 0         0 croak 'Unknown arguments: ' . join ', ', keys %args;
121             }
122              
123 12         44 return $self;
124             }
125              
126             sub run {
127 21     21 1 3242364 my $self = shift;
128 21         48 my %args = @_;
129 21         35 my %proc_args;
130             my @fh_table;
131 0         0 my @handles;
132              
133             # Process arguments
134 21         73 foreach my $arg (@proc_args) {
135 189   66     736 $proc_args{$arg} = $args{$arg} // $self->{$arg};
136 189 100       428 delete $args{$arg} if defined $args{$arg};
137             }
138              
139 21 50       75 if (%args) {
140 0         0 croak 'Unknown arguments: ' . join ', ', keys %args;
141             }
142              
143             my ($callback_factory, $set_on_completion_args) =
144 21         76 _create_callback_factory($proc_args{on_completion});
145              
146             # Handle fh_table
147 21         39 for (my $i = 0; $i < $#{$proc_args{fh_table}}; $i += 2) {
  59         240  
148 38         71 my ($handle, $args) = @{$proc_args{fh_table}}[$i, $i + 1];
  38         101  
149              
150 38 50 33     233 unless (ref $handle eq 'GLOB' or $handle =~ /^\d{1,4}$/) {
    100          
    100          
    50          
151 0         0 croak "Every second element in 'fh_table' must be " .
152             "GLOB reference or file descriptor number";
153             } elsif ($args->[0] eq 'pipe') {
154 32         40 my ($my_fh, $child_fh);
155              
156             # Create pipe or socketpair
157 32 100 0     109 if ($args->[1] eq '>') {
    50          
    0          
158 16         128 ($my_fh, $child_fh) = portable_pipe;
159             } elsif ($args->[1] eq '<') {
160 16         134 ($child_fh, $my_fh) = portable_pipe;
161             } elsif ($args->[1] eq '+>' or $args->[1] eq '+<') {
162 0         0 ($child_fh, $my_fh) = portable_socketpair;
163             } else {
164 0         0 croak "Invalid mode '$args->[1]'";
165             }
166              
167 32 50 33     1137 unless (defined $my_fh && defined $child_fh) {
168 0         0 croak "Creating pipe failed: $!";
169             }
170              
171 32         94 push @fh_table, [$handle, $child_fh];
172 32 100       104 if (ref $args->[2] eq 'GLOB') {
    50          
173 28         468 open $args->[2], '+>&', $my_fh;
174 28         161 close $my_fh;
175             } elsif ($args->[2] eq 'handle') {
176 4         14 push @handles, [$my_fh, $args->[3]];
177             }
178             } elsif ($args->[0] eq 'open') {
179 1         28 open my $fh, $args->[1], $args->[2];
180 1 50       3 unless (defined $fh) {
181 0         0 croak "Opening file failed: $!";
182             }
183 1         4 push @fh_table, [$handle, $fh];
184             } elsif ($args->[0] eq 'decorate') {
185 5         7 my $out = $args->[3];
186 5 50 33     17 unless (defined $out or ref $out eq 'GLOB') {
187 0         0 croak "Third argument of decorate must be a glob reference";
188             }
189              
190 5         20 my ($my_fh, $child_fh) = portable_pipe;
191 5 50 33     141 unless (defined $my_fh && defined $child_fh) {
192 0         0 croak "Creating pipe failed: $!";
193             }
194              
195 5         10 my $on_read;
196 5         9 my $decorator = $args->[2];
197 5 100 66     44 if (defined $decorator and ref $decorator eq '') {
    50 33        
198             $on_read = sub {
199 1     1   91 while ($_[0]->rbuf() =~ s/^(.*\n)//) {
200 2         44 print $out $decorator, $1;
201             }
202 2         8 };
203             } elsif (defined $decorator and ref $decorator eq 'CODE') {
204             $on_read = sub {
205 2     2   216 while ($_[0]->rbuf() =~ s/^(.*\n)//) {
206 3         83 print $out $decorator->($1);
207             }
208 3         12 };
209             } else {
210 0         0 croak "Second argument of decorate must be a string or code reference";
211             }
212              
213 5         42 push @fh_table, [$handle, $child_fh];
214 5         15 push @handles, [$my_fh, [on_read => $on_read, on_eof => $callback_factory->()]];
215             } else {
216 0         0 croak "Unknown redirect type '$args->[0]'";
217             }
218             }
219              
220             # Start child
221 21         27277 my $pid = fork;
222 21         537 my $job;
223 21 50       1500 unless (defined $pid) {
    100          
224 0         0 croak "Fork failed: $!";
225             } elsif ($pid == 0) {
226             # Duplicate FDs
227 6         384 foreach my $dup (@fh_table) {
228 12         1602 open $dup->[0], '+>&', $dup->[1];
229 12         247 close $dup->[1];
230             }
231              
232             # Close handles
233 6         95 foreach my $dup (@handles) {
234 4         55 close $dup->[0];
235             }
236              
237             # Close other filedescriptors
238 6 50       336 if (defined $proc_args{close_all_fds_except}) {
239 0         0 my @not_close = map fileno($_), @{$proc_args{close_all_fds_except}};
  0         0  
240 0         0 AE::log trace => "Closing all other fds except: " . join ', ', @not_close;
241 0         0 push @not_close, fileno $_->[0] foreach @fh_table;
242              
243 0         0 AE::log trace => "Closing all other fds except: " . join ', ', @not_close;
244 0         0 AnyEvent::Util::close_all_fds_except @not_close;
245             }
246              
247             # Run the code
248 6   50     52 my $rtn = $proc_args{code}->(@{$proc_args{args} // []});
  6         597  
249 0 0       0 exit ($rtn eq int($rtn) ? $rtn : 1);
250             } else {
251 15         1453 AE::log info => "Forked new process $pid.";
252              
253 15         2640 $job = new AnyEvent::Process::Job($pid);
254              
255             # Close FDs
256 15         180 foreach my $dup (@fh_table) {
257 26         229 AE::log trace => "Closing $dup->[1].";
258 26         112295 close $dup->[1];
259             }
260              
261             # Create handles
262 15         92 foreach my $handle (@handles) {
263 5         11 my (@hdl_args, @hdl_calls);
264 5         13 for (my $i = 0; $i < $#{$handle->[1]}; $i += 2) {
  13         61  
265 8 100 66     235 if (AnyEvent::Handle->can($handle->[1][$i]) and 'ARRAY' eq ref $handle->[1][$i+1]) {
266 2 50       6 if ($handle->[1][$i] eq 'on_eof') {
267 0         0 push @hdl_calls, [$handle->[1][$i], $callback_factory->($handle->[1][$i+1][0])];
268             } else {
269 2         19 push @hdl_calls, [$handle->[1][$i], $handle->[1][$i+1]];
270             }
271             } else {
272 6         39 push @hdl_args, $handle->[1][$i] => $handle->[1][$i+1];
273             }
274             }
275 5         35 AE::log trace => "Creating handle " . join ' ', @hdl_args;
276 5         282 my $hdl = AnyEvent::Handle->new(fh => $handle->[0], @hdl_args);
277 5         738 foreach my $call (@hdl_calls) {
278 12     12   72 no strict 'refs';
  12         54  
  12         9741  
279 2         6 my $method = $call->[0];
280 2         4 AE::log trace => "Calling handle method $method(" . join(', ', @{$call->[1]}) . ')';
  2         12  
281 2         81 $hdl->$method(@{$call->[1]});
  2         28  
282             }
283 5         272 $job->_add_handle($hdl);
284             }
285              
286             # Create callbacks
287             my $completion_cb = sub {
288 10     10   58 $job->_remove_timers();
289 10         163 AE::log info => "Process $job->{pid} finished with code $_[1].";
290 10         767 $set_on_completion_args->($job, $_[1]);
291 15         412 };
292 15         330 $job->_add_cb(AE::child $pid, $callback_factory->($completion_cb));
293              
294 15         118 $self->{job} = $job;
295              
296             # Create watchdog and kill timers
297 15   100 1   377 my $on_kill = $proc_args{on_kill} // sub { $_[0]->kill(9) };
  1         26  
298 15 100       155 if (defined $proc_args{kill_interval}) {
299             my $kill_cb = sub {
300 1     1   1996189 $job->_remove_timers();
301 1         25 AE::log warn => "Process $job->{pid} is running too long, killing it.";
302 1         127 $on_kill->($job);
303 1         30 };
304 1         53 $job->_add_timer(AE::timer $proc_args{kill_interval}, 0, $kill_cb);
305             }
306 15 100 66     345 if (defined $proc_args{watchdog_interval} or defined $proc_args{on_watchdog}) {
307 1 50 33     25 unless (defined $proc_args{watchdog_interval} &&
308             defined $proc_args{on_watchdog}) {
309 0         0 croak "Both or none of watchdog_interval and on_watchdog must be defined";
310             }
311              
312             my $watchdog_cb = sub {
313 3     3   5996176 AE::log info => "Executing watchdog for process $job->{pid}.";
314 3 100       71 unless ($proc_args{on_watchdog}->($job)) {
315 1         1139 $job->_remove_timers();
316 1         6 AE::log warn => "Watchdog for process $job->{pid} failed, killing it.";
317 1         15783 $on_kill->($job);
318             }
319 1         18 };
320 1         39 $job->_add_timer(AE::timer $proc_args{watchdog_interval}, $proc_args{watchdog_interval}, $watchdog_cb);
321             }
322             }
323            
324             # We need this to allow AE collecting pending signals and prevent accumulation of zombies
325 15         194 $self->_yield;
326              
327 15         2869 return $job;
328             }
329              
330             sub kill {
331 1     1 1 3438 my ($self, $signal) = @_;
332              
333 1 50       6 croak 'No process was started' unless defined $self->{job};
334 1   50     16 return $self->{job}->kill($signal // 15);
335             }
336              
337             sub pid {
338 1     1 1 20 my $self = shift;
339              
340 1 50       16 croak 'No process was started' unless defined $self->{job};
341 1         5 return $self->{job}->pid();
342             }
343              
344             sub close {
345 1     1 1 1237 my $self = shift;
346              
347 1 50       15 croak 'No process was started' unless defined $self->{job};
348 1         14 return $self->{job}->close();
349             }
350              
351             1;
352              
353             __END__