File Coverage

blib/lib/AnyEvent/Process.pm
Criterion Covered Total %
statement 171 202 84.6
branch 44 70 62.8
condition 22 44 50.0
subroutine 26 30 86.6
pod 5 5 100.0
total 268 351 76.3


line stmt bran cond sub pod time code
1             package AnyEvent::Process::Job;
2 10     10   9967 use strict;
  10         25  
  10         3869  
3              
4             sub new {
5 14     14   699 my ($ref, $pid) = @_;
6 14         827 my $self = bless {pid => $pid, cbs => [], handles => [], timers => []}, $ref;
7              
8 14         125 return $self;
9             }
10              
11             sub kill {
12 2     2   1204 my ($self, $signal) = @_;
13              
14 2         166 return kill $signal, $self->{pid};
15             }
16              
17             sub pid {
18 0     0   0 return $_[0]->{pid};
19             }
20              
21             sub add_cb {
22 14     14   27767 my ($self, $cb) = @_;
23 14         50 push @{$self->{cbs}}, $cb;
  14         214  
24             }
25              
26             sub add_handle {
27 4     4   8 my ($self, $handle) = @_;
28 4         6 push @{$self->{handles}}, $handle;
  4         64  
29             }
30              
31             sub add_timer {
32 2     2   12 my ($self, $timer) = @_;
33 2         4 push @{$self->{timers}}, $timer;
  2         7  
34             }
35              
36             sub cancel_timers {
37 6     6   22 my $self = shift;
38 6         12 undef $_ foreach @{$self->{timers}};
  6         62  
39 6         26 undef $self->{timers};
40             }
41              
42             sub close {
43 1     1   2 my $self = shift;
44 1         2 undef $_ foreach @{$self->{handles}};
  1         33  
45 1         54 undef $self->{handles};
46             }
47              
48             package AnyEvent::Process;
49 10     10   63 use strict;
  10         20  
  10         288  
50              
51 10     10   15249 use AnyEvent::Handle;
  10         244192  
  10         410  
52 10     10   127 use AnyEvent::Util;
  10         19  
  10         921  
53 10     10   64 use AnyEvent;
  10         18  
  10         217  
54 10     10   52 use Carp;
  10         19  
  10         17594  
55              
56             our @proc_args = qw(fh_table code on_completion args watchdog_interval on_watchdog kill_interval on_kill);
57             our $VERSION = '0.01';
58              
59             sub new {
60 11     11 1 38863 my $ref = shift;
61 11         54 my $self = bless {}, $ref;
62 11         67 my %args = @_;
63              
64 11         47 foreach my $arg (@proc_args) {
65 88 100       344 $self->{$arg} = delete $args{$arg} if defined $args{$arg};
66             }
67              
68 11 50       45 if (%args) {
69 0         0 croak 'Unknown arguments: ' . join ', ', keys %args;
70             }
71              
72 11         45 return $self;
73             }
74              
75             sub run {
76 20     20 1 4457170 my $self = shift;
77 20         56 my %args = @_;
78 20         44 my %proc_args;
79             my @fh_table;
80 0         0 my @handles;
81 0         0 my ($last_callback, $last_callback_set_args);
82              
83             # Process arguments
84 20         63 foreach my $arg (@proc_args) {
85 160   100     782 $proc_args{$arg} = $args{$arg} // $self->{$arg};
86 160 100       450 delete $args{$arg} if defined $args{$arg};
87             }
88              
89 20 50       83 if (%args) {
90 0         0 croak 'Unknown arguments: ' . join ', ', keys %args;
91             }
92              
93 20 100       112 if (defined $proc_args{on_completion}) {
94 6         14 my $counter = 0;
95 6         16 my @last_callback_args;
96              
97             $last_callback = sub {
98 8   100 8   183 my $func = shift // sub {};
  2         5  
99              
100 8         19 $counter++;
101             return sub {
102 6         930850 my ($err, $rtn);
103              
104 6         16 eval {
105 6         35 $rtn = $func->(@_);
106 6         18 }; $err = $@;
107              
108 6 100       40 if (--$counter == 0) {
109 4         43 eval {
110 4         34 $proc_args{on_completion}->(@last_callback_args);
111 4   33     3282 }; $err = $err || $@;
112             }
113              
114 6 50       31 if ($err) {
115 0         0 croak $err;
116             }
117              
118 6         48 return $rtn;
119             }
120 6         35 };
  8         564  
121             $last_callback_set_args = sub {
122 4     4   31 @last_callback_args = @_;
123             }
124 6         34 } else {
125 14   50 10   73 $last_callback = sub { $_[0] // sub {} };
  10         1690  
  0         0  
126             }
127              
128             # Handle fh_table
129 20         53 for (my $i = 0; $i < $#{$proc_args{fh_table}}; $i += 2) {
  56         223  
130 36         60 my ($handle, $args) = @{$proc_args{fh_table}}[$i, $i + 1];
  36         99  
131              
132 36 50 33     239 unless (ref $handle eq 'GLOB' or $handle =~ /^\d{1,4}$/) {
    100          
    50          
    50          
133 0         0 croak "Every second element in 'fh_table' must be " .
134             "GLOB reference or file descriptor number";
135             } elsif ($args->[0] eq 'pipe') {
136 32         44 my ($my_fh, $child_fh);
137              
138             # Create pipe or socketpair
139 32 100 0     105 if ($args->[1] eq '>') {
    50          
    0          
140 16         58 ($my_fh, $child_fh) = portable_pipe;
141             } elsif ($args->[1] eq '<') {
142 16         156 ($child_fh, $my_fh) = portable_pipe;
143             } elsif ($args->[1] eq '+>' or $args->[1] eq '+<') {
144 0         0 ($child_fh, $my_fh) = portable_socketpair;
145             } else {
146 0         0 croak "Invalid mode '$args->[1]'";
147             }
148              
149 32 50 33     1802 unless (defined $my_fh && defined $child_fh) {
150 0         0 croak "Creating pipe failed: $!";
151             }
152              
153 32         108 push @fh_table, [$handle, $child_fh];
154 32 100       102 if (ref $args->[2] eq 'GLOB') {
    50          
155 28         559 open $args->[2], '+>&', $my_fh;
156 28         211 close $my_fh;
157             } elsif ($args->[2] eq 'handle') {
158 4         18 push @handles, [$my_fh, $args->[3]];
159             }
160             } elsif ($args->[0] eq 'open') {
161 0         0 open my $fh, $args->[1], $args->[2];
162 0 0       0 unless (defined $fh) {
163 0         0 croak "Opening file failed: $!";
164             }
165 0         0 push @fh_table, [$handle, $fh];
166             } elsif ($args->[0] eq 'decorate') {
167 4         10 my $out = $args->[3];
168 4 50 33     20 unless (defined $out or ref $out eq 'GLOB') {
169 0         0 croak "Third argument of decorate must be a glob reference";
170             }
171              
172 4         18 my ($my_fh, $child_fh) = portable_pipe;
173 4 50 33     158 unless (defined $my_fh && defined $child_fh) {
174 0         0 croak "Creating pipe failed: $!";
175             }
176              
177 4         6 my $on_read;
178 4         6 my $decorator = $args->[2];
179 4 100 66     38 if (defined $decorator and ref $decorator eq '') {
    50 33        
180             $on_read = sub {
181 1     1   54 while ($_[0]->rbuf() =~ s/^(.*\n)//) {
182 2         21 print $out $decorator, $1;
183             }
184 2         10 };
185             } elsif (defined $decorator and ref $decorator eq 'CODE') {
186             $on_read = sub {
187 1     1   184 while ($_[0]->rbuf() =~ s/^(.*\n)//) {
188 2         39 print $out $decorator->($1);
189             }
190 2         10 };
191             } else {
192 0         0 croak "Second argument of decorate must be a string or code reference";
193             }
194              
195 4         10 push @fh_table, [$handle, $child_fh];
196 4         12 push @handles, [$my_fh, [on_read => $on_read, on_eof => $last_callback->()]];
197             } else {
198 0         0 croak "Unknown redirect type '$args->[0]'";
199             }
200             }
201              
202             # Start child
203 20         29418 my $pid = fork;
204 20         714 my $job;
205 20 50       1157 unless (defined $pid) {
    100          
206 0         0 croak "Fork failed: $!";
207             } elsif ($pid == 0) {
208             # Duplicate FDs
209 6         491 foreach my $dup (@fh_table) {
210 12         1842 open $dup->[0], '+>&', $dup->[1];
211 12         479 close $dup->[1];
212             }
213              
214             # Close handles
215 6         57 foreach my $dup (@handles) {
216 4         44 close $dup->[0];
217             }
218              
219             # Run the code
220 6   50     22 my $rtn = $proc_args{code}->(@{$proc_args{args} // []});
  6         1443  
221 0 0       0 exit ($rtn eq int($rtn) ? $rtn : 1);
222             } else {
223 14         1941 AE::log info => "Forked new process $pid.";
224              
225 14         2950 $job = new AnyEvent::Process::Job($pid);
226              
227             # Close FDs
228 14         190 foreach my $dup (@fh_table) {
229 24         374 AE::log trace => "Closing $dup->[1].";
230 24         108318 close $dup->[1];
231             }
232              
233             # Create handles
234 14         40 foreach my $handle (@handles) {
235 4         9 my (@hdl_args, @hdl_calls);
236 4         8 for (my $i = 0; $i < $#{$handle->[1]}; $i += 2) {
  10         62  
237 6 100 66     189 if (AnyEvent::Handle->can($handle->[1][$i]) and 'ARRAY' eq ref $handle->[1][$i+1]) {
238 2 50       6 if ($handle->[1][$i] eq 'on_eof') {
239 0         0 push @hdl_calls, [$handle->[1][$i], $last_callback->($handle->[1][$i+1][0])];
240             } else {
241 2         22 push @hdl_calls, [$handle->[1][$i], $handle->[1][$i+1]];
242             }
243             } else {
244 4         16 push @hdl_args, $handle->[1][$i] => $handle->[1][$i+1];
245             }
246             }
247 4         24 AE::log trace => "Creating handle " . join ' ', @hdl_args;
248 4         266 my $hdl = AnyEvent::Handle->new(fh => $handle->[0], @hdl_args);
249 4         617 foreach my $call (@hdl_calls) {
250 10     10   80 no strict 'refs';
  10         30  
  10         8909  
251 2         4 my $method = $call->[0];
252 2         6 AE::log trace => "Calling handle method $method(" . join(', ', @{$call->[1]}) . ')';
  2         13  
253 2         75 $hdl->$method(@{$call->[1]});
  2         29  
254             }
255 4         350 $job->add_handle($hdl);
256             }
257              
258             # Create callbacks
259 14         116 my $completion_cb;
260 14 100       66 if (defined $proc_args{on_completion}) {
261             $completion_cb = sub {
262 4     4   31 $job->cancel_timers();
263 4         77 AE::log info => "Process $job->{pid} finished with code $_[1].";
264 4         299 $last_callback_set_args->($job, $_[1]);
265 4         138 };
266             } else {
267             $completion_cb = sub {
268 0     0   0 $job->cancel_timers();
269 0         0 AE::log info => "Process $job->{pid} finished with code $_[1]";
270 10         407 };
271             }
272 14         225 $job->add_cb(AE::child $pid, $last_callback->($completion_cb));
273              
274 14         171 $self->{job} = $job;
275              
276             # Create watchdog and kill timers
277 14   100 1   475 my $on_kill = $proc_args{on_kill} // sub { $_[0]->kill(9) };
  1         24  
278 14 100       69 if (defined $proc_args{kill_interval}) {
279             my $kill_cb = sub {
280 1     1   1996542 $job->cancel_timers();
281 1         24 AE::log warn => "Process $job->{pid} is running too long, killing it.";
282 1         120 $on_kill->($job);
283 1         32 };
284 1         46 $job->add_timer(AE::timer $proc_args{kill_interval}, 0, $kill_cb);
285             }
286 14 100 66     346 if (defined $proc_args{watchdog_interval} or defined $proc_args{on_watchdog}) {
287 1 50 33     26 unless (defined $proc_args{watchdog_interval} &&
288             defined $proc_args{on_watchdog}) {
289 0         0 croak "Both or none of watchdog_interval and on_watchdog must be defined";
290             }
291              
292             my $watchdog_cb = sub {
293 3     3   5997133 AE::log info => "Executing watchdog for process $job->{pid}.";
294 3 100       71 unless ($proc_args{on_watchdog}->($job)) {
295 1         1451 $job->cancel_timers();
296 1         9 AE::log warn => "Watchdog for process $job->{pid} failed, killing it.";
297 1         19013 $on_kill->($job);
298             }
299 1         20 };
300 1         35 $job->add_timer(AE::timer $proc_args{watchdog_interval}, $proc_args{watchdog_interval}, $watchdog_cb);
301             }
302             }
303              
304 14         1172 return $job;
305             }
306              
307             sub kill {
308 0     0 1 0 my ($self, $signal) = @_;
309              
310 0 0       0 croak 'No process was started' unless defined $self->{job};
311 0         0 return $self->{job}->kill($signal);
312             }
313              
314             sub pid {
315 0     0 1 0 my $self = shift;
316              
317 0 0       0 croak 'No process was started' unless defined $self->{job};
318 0         0 return $self->{job}->pid();
319             }
320              
321             sub close {
322 1     1 1 2414 my $self = shift;
323              
324 1 50       4 croak 'No process was started' unless defined $self->{job};
325 1         11 return $self->{job}->close();
326             }
327              
328             1;
329              
330             __END__