File Coverage

blib/lib/IPC/Open3/Callback.pm
Criterion Covered Total %
statement 30 30 100.0
branch n/a
condition n/a
subroutine 10 10 100.0
pod n/a
total 40 40 100.0


line stmt bran cond sub pod time code
1 1     1   17887 use strict;
  1         2  
  1         45  
2 1     1   5 use warnings;
  1         2  
  1         60  
3              
4             package IPC::Open3::Callback;
5             $IPC::Open3::Callback::VERSION = '1.16';
6             # ABSTRACT: An extension to IPC::Open3 that will feed out and err to callbacks instead of requiring the caller to handle them.
7             # PODNAME: IPC::Open3::Callback
8              
9 1     1   738 use Data::Dumper;
  1         9920  
  1         99  
10 1     1   9 use Exporter qw(import);
  1         2  
  1         42  
11 1     1   677 use Hash::Util qw(lock_keys);
  1         2848  
  1         7  
12 1     1   768 use IO::Select;
  1         1905  
  1         60  
13 1     1   671 use IO::Socket;
  1         29214  
  1         4  
14 1     1   1531 use IPC::Open3;
  1         2113  
  1         52  
15 1     1   5 use Symbol qw(gensym);
  1         1  
  1         32  
16              
17 1     1   414 use parent qw(Class::Accessor);
  1         239  
  1         4  
18             __PACKAGE__->follow_best_practice;
19             __PACKAGE__->mk_accessors(
20             qw(out_callback err_callback buffer_output select_timeout buffer_size input_buffer));
21             __PACKAGE__->mk_ro_accessors(qw(pid last_command last_exit_code));
22              
23             our @EXPORT_OK = qw(safe_open3);
24              
25             my $logger;
26             eval {
27             require Log::Log4perl;
28             $logger = Log::Log4perl->get_logger('IPC::Open3::Callback');
29             };
30             if ($@) {
31             require IPC::Open3::Callback::Logger;
32             $logger = IPC::Open3::Callback::Logger->get_logger();
33             }
34              
35             sub new {
36             my ( $class, @args ) = @_;
37             return bless( {}, $class )->_init(@args);
38             }
39              
40             sub _append_to_buffer {
41             my ( $self, $buffer_ref, $data, $flush ) = @_;
42              
43             my @lines = split( /\n/, $$buffer_ref . $data, -1 );
44              
45             # save the last line in the buffer as it may not yet be a complete line
46             $$buffer_ref = $flush ? '' : pop(@lines);
47              
48             # return all complete lines
49             return @lines;
50             }
51              
52             sub _clear_input_buffer {
53             my ($self) = shift;
54             delete( $self->{input_buffer} );
55             }
56              
57             sub DESTROY {
58             my ($self) = shift;
59             $self->_destroy_child();
60             }
61              
62             sub _destroy_child {
63             my $self = shift;
64              
65             my $pid = $self->get_pid();
66             if ($pid) {
67             waitpid( $pid, 0 );
68             $self->_set_last_exit_code( $? >> 8 );
69              
70             $logger->debug(
71             sub {
72             "Exited '",
73             $self->get_last_command(),
74             "' with code ",
75             $self->get_last_exit_code();
76             }
77             );
78             $self->_set_pid();
79             }
80              
81             return $self->{last_exit_code};
82             }
83              
84             sub _init {
85             my ( $self, $args_ref ) = @_;
86              
87             $self->{buffer_output} = undef;
88             $self->{buffer_size} = undef;
89             $self->{err_callback} = undef;
90             $self->{input_buffer} = undef;
91             $self->{last_command} = undef;
92             $self->{last_exit_code} = undef;
93             $self->{out_callback} = undef;
94             $self->{pid} = undef;
95             $self->{select_timeout} = undef;
96             lock_keys( %{$self} );
97              
98             if ( defined($args_ref) ) {
99             $logger->logdie('parameters must be an hash reference')
100             unless ( ( ref($args_ref) ) eq 'HASH' );
101             $self->{out_callback} = $args_ref->{out_callback};
102             $self->{err_callback} = $args_ref->{err_callback};
103             $self->{buffer_output} = $args_ref->{buffer_output};
104             $self->{select_timeout} = $args_ref->{select_timeout} || 3;
105             $self->{buffer_size} = $args_ref->{buffer_size} || 1024;
106             }
107             else {
108             $self->{select_timeout} = 3;
109             $self->{buffer_size} = 1024;
110             }
111              
112             return $self;
113             }
114              
115             sub _nix_open3 {
116             my ( $in_read, $out_write, $err_write, @command ) = @_;
117             my ( $in_write, $out_read, $err_read );
118              
119             if ( !$in_read ) {
120             $in_read = gensym();
121             $in_write = $in_read;
122             }
123             if ( !$out_write ) {
124             $out_read = gensym();
125             $out_write = $out_read;
126             }
127             if ( !$err_write ) {
128             $err_read = gensym();
129             $err_write = $err_read;
130             }
131              
132             return ( open3( $in_read, $out_write, $err_write, @command ),
133             $in_write, $out_read, $err_read );
134             }
135              
136             sub run_command {
137             my ( $self, @command ) = @_;
138              
139             # if last arg is hashref, its command options not arg...
140             my $options = {};
141             if ( ref( $command[-1] ) eq 'HASH' ) {
142             $options = pop(@command);
143             }
144              
145             my ($out_callback, $out_buffer_ref, $err_callback,
146             $err_buffer_ref, $buffer_size, $select_timeout
147             );
148             $out_callback = $options->{out_callback} || $self->get_out_callback();
149             $err_callback = $options->{err_callback} || $self->get_err_callback();
150             if ( $options->{buffer_output} || $self->get_buffer_output() ) {
151             my $out_temp = '';
152             my $err_temp = '';
153             $out_buffer_ref = \$out_temp;
154             $err_buffer_ref = \$err_temp;
155             }
156             $buffer_size = $options->{buffer_size} || $self->get_buffer_size();
157             $select_timeout = $options->{select_timeout} || $self->get_select_timeout();
158              
159             $self->_set_last_command( \@command );
160             $logger->debug( "Running '", $self->get_last_command(), "'" );
161             my ( $pid, $in_fh, $out_fh, $err_fh ) = safe_open3_with(
162             $options->{in_handle},
163             $options->{out_handle},
164             $options->{err_handle}, @command
165             );
166             $self->_set_pid($pid);
167              
168             my $select = IO::Select->new();
169             $select->add( $out_fh, $err_fh );
170             while ( my @ready = $select->can_read($select_timeout) ) {
171             if ( $self->get_input_buffer() ) {
172             syswrite( $in_fh, $self->get_input_buffer() );
173             $self->_clear_input_buffer();
174             }
175             foreach my $fh (@ready) {
176             my $line;
177             my $bytes_read = sysread( $fh, $line, $buffer_size );
178             if ( !defined($bytes_read) && !$!{ECONNRESET} ) {
179             $logger->error( "sysread failed: ", sub { Dumper(%!) } );
180             $logger->logdie( "error in running '", $self->get_last_command(), "': ", $! );
181             }
182             elsif ( !defined($bytes_read) || $bytes_read == 0 ) {
183             $select->remove($fh);
184             next;
185             }
186             else {
187             if ( $out_fh && $fh == $out_fh ) {
188             $self->_write_to_callback( $out_callback, $line, $out_buffer_ref, 0 );
189             }
190             elsif ( $err_fh && $fh == $err_fh ) {
191             $self->_write_to_callback( $err_callback, $line, $err_buffer_ref, 0 );
192             }
193             else {
194             $logger->logdie('Impossible... somehow got a filehandle I dont know about!');
195             }
196             }
197             }
198             }
199              
200             # flush buffers
201             $self->_write_to_callback( $out_callback, '', $out_buffer_ref, 1 );
202             $self->_write_to_callback( $err_callback, '', $err_buffer_ref, 1 );
203              
204             return $self->_destroy_child();
205             }
206              
207             sub safe_open3 {
208             return safe_open3_with( undef, undef, undef, @_ );
209             }
210              
211             sub safe_open3_with {
212             my ( $in_handle, $out_handle, $err_handle, @command ) = @_;
213              
214             my @args = (
215             $in_handle ? '<&' . fileno($in_handle) : undef,
216             $out_handle ? '>&' . fileno($out_handle) : undef,
217             $err_handle ? '>&' . fileno($err_handle) : undef, @command
218             );
219             return ( $^O =~ /MSWin32/ ) ? _win_open3(@args) : _nix_open3(@args);
220             }
221              
222             sub send_input {
223             my ($self) = @_;
224             $self->set_input_buffer(shift);
225             }
226              
227             sub _set_last_command {
228             my ( $self, $command_ref ) = @_;
229              
230             $logger->logdie('the command parameter must be an array reference')
231             unless ( ( ref($command_ref) ) eq 'ARRAY' );
232              
233             $self->{last_command} = join( ' ', @{$command_ref} );
234             }
235              
236             sub _set_last_exit_code {
237             my ( $self, $code ) = @_;
238             $self->{last_exit_code} = $code;
239             }
240              
241             sub _set_pid {
242             my ( $self, $pid ) = @_;
243              
244             if ( !defined($pid) ) {
245             delete( $self->{pid} );
246             }
247             elsif ( $pid !~ /^\d+$/ ) {
248             $logger->logdie('the parameter must be an integer');
249             }
250             else {
251             $self->{pid} = $pid;
252             }
253             }
254              
255             sub _win_open3 {
256             my ( $in_read, $out_write, $err_write, @command ) = @_;
257              
258             my ($in_pipe_read, $in_pipe_write, $out_pipe_read,
259             $out_pipe_write, $err_pipe_read, $err_pipe_write
260             );
261             if ( !$in_read ) {
262             ( $in_pipe_read, $in_pipe_write ) = _win_pipe();
263             $in_read = '>&' . fileno($in_pipe_read);
264             }
265             if ( !$out_write ) {
266             ( $out_pipe_read, $out_pipe_write ) = _win_pipe();
267             $out_write = '<&' . fileno($out_pipe_write);
268             }
269             if ( !$err_write ) {
270             ( $err_pipe_read, $err_pipe_write ) = _win_pipe();
271             $err_write = '<&' . fileno($err_pipe_write);
272             }
273              
274             my $pid = open3( $in_read, $out_write, $err_write, @command );
275              
276             return ( $pid, $in_pipe_write, $out_pipe_read, $err_pipe_read );
277             }
278              
279             sub _win_pipe {
280             my ( $read, $write ) = IO::Socket->socketpair( AF_UNIX, SOCK_STREAM, PF_UNSPEC );
281             $read->shutdown(SHUT_WR); # No more writing for reader
282             $write->shutdown(SHUT_RD); # No more reading for writer
283              
284             return ( $read, $write );
285             }
286              
287             sub _write_to_callback {
288             my ( $self, $callback, $data, $buffer_ref, $flush ) = @_;
289              
290             return if ( !defined($callback) );
291              
292             if ( !defined($buffer_ref) ) {
293             &{$callback}( $data, $self->get_pid() );
294             return;
295             }
296              
297             &{$callback}($_) foreach ( $self->_append_to_buffer( $buffer_ref, $data, $flush ) );
298             }
299              
300             1;
301              
302             __END__