File Coverage

blib/lib/IPC/Simple.pm
Criterion Covered Total %
statement 154 176 87.5
branch 22 36 61.1
condition 5 12 41.6
subroutine 41 49 83.6
pod 12 18 66.6
total 234 291 80.4


line stmt bran cond sub pod time code
1             package IPC::Simple;
2             # ABSTRACT: simple, non-blocking IPC
3             $IPC::Simple::VERSION = '0.08';
4              
5 4     4   306371 use strict;
  4         46  
  4         121  
6 4     4   22 use warnings;
  4         6  
  4         104  
7              
8 4     4   20 use Carp;
  4         8  
  4         188  
9 4     4   21 use AnyEvent qw();
  4         7  
  4         97  
10 4     4   2963 use AnyEvent::Handle qw();
  4         81610  
  4         128  
11 4     4   2380 use IPC::Open3 qw(open3);
  4         16470  
  4         253  
12 4     4   2096 use POSIX qw(:sys_wait_h);
  4         26874  
  4         22  
13 4     4   6233 use Symbol qw(gensym);
  4         8  
  4         178  
14              
15 4     4   2117 use IPC::Simple::Channel qw();
  4         11  
  4         84  
16 4     4   1684 use IPC::Simple::Group qw();
  4         10  
  4         84  
17 4     4   1696 use IPC::Simple::Message;
  4         11  
  4         227  
18              
19 4     4   31 use constant STATE_READY => 0;
  4         8  
  4         233  
20 4     4   26 use constant STATE_RUNNING => 1;
  4         8  
  4         201  
21 4     4   22 use constant STATE_STOPPING => 2;
  4         7  
  4         222  
22              
23             BEGIN{
24 4     4   26 use base 'Exporter';
  4         9  
  4         401  
25 4     4   7830 our @EXPORT_OK = qw(
26             spawn
27             process_group
28             );
29             }
30              
31             #-------------------------------------------------------------------------------
32             # Convenience constructor
33             #-------------------------------------------------------------------------------
34             sub spawn ($;%) {
35 5     5 1 374 my ($cmd, @args) = @_;
36 5         37 return IPC::Simple->new(cmd => $cmd, @args);
37             }
38              
39             sub process_group {
40 1     1 1 10 return IPC::Simple::Group->new(@_);
41             }
42              
43             #-------------------------------------------------------------------------------
44             # Constructor
45             #-------------------------------------------------------------------------------
46             sub new {
47 5     5 0 20 my ($class, %param) = @_;
48 5 50       28 my $cmd = ref $param{cmd} ? $param{cmd} : [ $param{cmd} ];
49 5 50       20 my $eol = defined $param{eol} ? $param{eol} : "\n";
50 5   66     34 my $name = $param{name} || "@$cmd";
51 5         13 my $recv_cb = $param{recv_cb};
52 5         10 my $term_cb = $param{term_cb};
53              
54 5         56 bless{
55             name => $name,
56             cmd => $cmd,
57             eol => $eol,
58             recv_cb => $recv_cb,
59             term_cb => $term_cb,
60             run_state => STATE_READY,
61             pid => undef,
62             handle_in => undef,
63             handle_out => undef,
64             handle_err => undef,
65             exit_status => undef,
66             exit_code => undef,
67             messages => undef,
68             kill_timer => undef,
69             }, $class;
70             }
71              
72             #-------------------------------------------------------------------------------
73             # State accessor and predicates
74             #-------------------------------------------------------------------------------
75             sub run_state {
76 44     44 0 92 my $self = shift;
77              
78 44 100       120 if (@_) {
79 13         38 my $new_state = shift;
80 13         69 $self->debug('run state changed to %d', $new_state);
81 13         33 $self->{run_state} = $new_state;
82             }
83              
84 44         203 return $self->{run_state};
85             }
86              
87 11     11 0 24 sub is_ready { $_[0]->run_state == STATE_READY }
88 14     14 0 6015 sub is_running { $_[0]->run_state == STATE_RUNNING }
89 6     6 0 18 sub is_stopping { $_[0]->run_state == STATE_STOPPING }
90              
91             #-------------------------------------------------------------------------------
92             # Other accessors
93             #-------------------------------------------------------------------------------
94 8     8 1 29 sub name { $_[0]->{name} }
95 0     0 1 0 sub pid { $_[0]->{pid} }
96 0     0 1 0 sub exit_status { $_[0]->{exit_status} }
97 1     1 1 13 sub exit_code { $_[0]->{exit_code} }
98              
99             #-------------------------------------------------------------------------------
100             # Ensure the process is cleaned up when the object is garbage collected.
101             #-------------------------------------------------------------------------------
102             sub DESTROY {
103 0     0   0 my $self = shift;
104              
105             # Localize globals to avoid affecting global state during shutdown
106 0         0 local ($., $@, $!, $^E, $?);
107              
108 0 0 0     0 if ($self->{pid} && waitpid($self->{pid}, WNOHANG) == 0) {
109 0         0 kill 'KILL', $self->{pid};
110 0         0 waitpid $self->{pid}, 0;
111             }
112             }
113              
114             #-------------------------------------------------------------------------------
115             # Logs debug messages
116             #-------------------------------------------------------------------------------
117             sub debug {
118 43     43 0 96 my $self = shift;
119              
120 43 50       124 if ($ENV{IPC_SIMPLE_DEBUG}) {
121 0         0 my $msg = sprintf shift, @_;
122              
123 0         0 my ($pkg, $file, $line) = caller;
124 0   0     0 my $pid = $self->{pid} || '(ready)';
125 0         0 my $ts = time;
126              
127 0         0 warn "<$pkg:$line | $ts | pid:$pid> $msg\n";
128             }
129             }
130              
131             #-------------------------------------------------------------------------------
132             # Launch and helpers
133             #-------------------------------------------------------------------------------
134             sub launch {
135 5     5 1 11452 my $self = shift;
136              
137 5 50       19 if ($self->is_running) {
138 0         0 croak 'already running';
139             }
140              
141 5 50       21 if ($self->is_stopping) {
142 0         0 croak 'process is terminating';
143             }
144              
145 5         18 my $cmd = $self->{cmd};
146              
147 5         39 $self->debug('launching: %s', "@$cmd");
148              
149 5 50       32 my $pid = open3(my $in, my $out, my $err = gensym, @$cmd)
150             or croak $!;
151              
152 5         18123 $self->debug('process launched with pid %d', $pid);
153              
154 5         93 $self->run_state(STATE_RUNNING);
155              
156 5         33 $self->{exit_status} = undef;
157 5         22 $self->{exit_code} = undef;
158 5         29 $self->{kill_timer} = undef;
159 5         38 $self->{pid} = $pid;
160 5         89 $self->{handle_err} = $self->_build_input_handle($err, IPC_STDERR);
161 5         117 $self->{handle_out} = $self->_build_input_handle($out, IPC_STDOUT);
162 5         112 $self->{handle_in} = $self->_build_output_handle($in);
163 5         158 $self->{messages} = IPC::Simple::Channel->new;
164              
165 5         114 return 1;
166             }
167              
168             sub _build_output_handle {
169 5     5   27 my ($self, $fh) = @_;
170              
171             # set non-blocking
172 5         23 AnyEvent::fh_unblock($fh);
173              
174             my $handle = AnyEvent::Handle->new(
175             fh => $fh,
176 0     0   0 on_error => sub{ $self->_on_error(IPC_STDIN, @_) },
177 5         118 );
178              
179 5         412 return $handle;
180             }
181              
182             sub _build_input_handle {
183 10     10   83 my ($self, $fh, $type) = @_;
184              
185             # set non-blocking
186 10         166 AnyEvent::fh_unblock($fh);
187              
188             my $handle = AnyEvent::Handle->new(
189             fh => $fh,
190 0     0   0 on_eof => sub{ $self->terminate },
191 2     2   248 on_error => sub{ $self->_on_error($type, @_) },
192 0     0   0 on_read => sub{ $self->_on_read($type, @_) },
193 10         552 );
194              
195             # push an initial read to prime the queue
196 10         9911 $self->_push_read($handle, $type);
197              
198 10         1084 return $handle;
199             }
200              
201             sub _on_error {
202 2     2   25 my ($self, $type, $handle, $fatal, $msg) = @_;
203 2         11 $self->_queue_message(IPC_ERROR, $msg);
204              
205 2 50       6 if ($fatal) {
206 2         11 $self->terminate;
207             }
208             }
209              
210             sub _on_exit {
211 3     3   12 my ($self, $status) = @_;
212 3         20 undef $self->{kill_timer};
213 3         30 $self->run_state(STATE_READY);
214 3   100     16 $self->{exit_status} = $status || 0;
215 3         11 $self->{exit_code} = $self->{exit_status} >> 8;
216              
217             $self->debug('child (pid %s) exited with status %d (exit code: %d)',
218             $self->{pid} || '(no pid)',
219             $self->{exit_status},
220             $self->{exit_code},
221 3   50     21 );
222              
223             # May not be set yet if launch fails early enough
224 3 50       39 if ($self->{messages}) {
225 3         25 $self->{messages}->shutdown;
226             }
227             }
228              
229             sub _on_read {
230 0     0   0 my ($self, $type, $handle) = @_;
231 0         0 $self->debug('read event type=%s', $type);
232 0         0 $self->_push_read($handle, $type);
233             }
234              
235             sub _push_read {
236 10     10   38 my ($self, $handle, $type) = @_;
237             $handle->push_read(line => $self->{eol}, sub{
238 4     4   781 my ($handle, $line) = @_;
239 4         10 chomp $line;
240 4         20 $self->_queue_message($type, $line);
241 10         189 });
242             }
243              
244             sub _queue_message {
245 6     6   20 my ($self, $type, $msg) = @_;
246 6         21 $self->debug('buffered type=%s, msg="%s"', $type, $msg);
247              
248 6         59 my $message = IPC::Simple::Message->new(
249             source => $self,
250             type => $type,
251             message => $msg,
252             );
253              
254 6 100       19 if ($self->{recv_cb}) {
255 4         21 $self->{recv_cb}->($message);
256             } else {
257 2         8 $self->{messages}->put($message);
258             }
259             }
260              
261             #-------------------------------------------------------------------------------
262             # Send a signal to the process
263             #-------------------------------------------------------------------------------
264             sub signal {
265 5     5 1 18 my ($self, $signal) = @_;
266 5 50       16 if ($self->{pid}) {
267 5         31 $self->debug('sending %s to pid %d', $signal, $self->{pid});
268 5         360 kill $signal, $self->{pid};
269             }
270             }
271              
272             #-------------------------------------------------------------------------------
273             # Stopping the process and waiting on it to complete
274             #-------------------------------------------------------------------------------
275             sub terminate {
276 7     7 1 25 my $self = shift;
277 7         14 my $timeout = shift;
278              
279 7 100       16 if ($self->is_running) {
280 5         38 $self->signal('TERM');
281 5         53 $self->run_state(STATE_STOPPING);
282              
283 5         33 $self->{handle_in}->push_shutdown;
284 5         219 $self->{handle_out}->push_shutdown;
285 5         126 $self->{handle_err}->push_shutdown;
286              
287 5 100       199 if (defined $timeout) {
288             $self->{kill_timer} = AnyEvent->timer(
289             after => $timeout,
290             cb => sub{
291 0     0   0 $self->signal('KILL');
292 0         0 undef $self->{kill_timer};
293             },
294 3         107 );
295             }
296              
297 5 100       133 if ($self->{term_cb}) {
298 2         8 $self->{term_cb}->($self);
299             }
300             }
301             }
302              
303             sub join {
304 3     3 1 9 my $self = shift;
305              
306 3 50       11 return if $self->is_ready;
307              
308 3         22 $self->debug('waiting for process to exit, pid %d', $self->{pid});
309              
310 3         77 my $done = AnyEvent->condvar;
311              
312 3         664 my $timer; $timer = AnyEvent->timer(
313             after => 0,
314             interval => 0.01,
315             cb => sub{
316             # non-blocking waitpid returns 0 if the pid is still alive
317 3 50   3   404 if (waitpid($self->{pid}, WNOHANG) != 0) {
318 3         31 my $status = $?;
319              
320             # another waiter might have already called _on_exit
321 3 50       14 unless ($self->is_ready) {
322 3         19 $self->_on_exit($?);
323             }
324              
325 3         32 $done->send;
326             }
327             },
328 3         63 );
329              
330 3         143 $done->recv;
331             }
332              
333             #-------------------------------------------------------------------------------
334             # Messages
335             #-------------------------------------------------------------------------------
336             sub send {
337 1     1 1 4 my ($self, $msg) = @_;
338 1         10 $self->debug('sending "%s"', $msg);
339 1         12 $self->{handle_in}->push_write($msg . $self->{eol});
340 1         119 1;
341             }
342              
343             sub recv {
344 2     2 1 58 my ($self, $type) = @_;
345 2         7 $self->debug('waiting on message from pid %d', $self->{pid});
346 2         15 $self->{messages}->get;
347             }
348              
349             1;
350              
351             __END__