File Coverage

blib/lib/Mojo/IOLoop/ReadWriteFork.pm
Criterion Covered Total %
statement 181 210 86.1
branch 53 86 61.6
condition 21 53 39.6
subroutine 41 47 87.2
pod 8 8 100.0
total 304 404 75.2


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::ReadWriteFork;
2 22     22   4384583 use Mojo::Base 'Mojo::EventEmitter';
  22         239  
  22         117  
3              
4 22     22   35770 use Errno qw(EAGAIN ECONNRESET EINTR EPIPE EWOULDBLOCK EIO);
  22         21285  
  22         2284  
5 22     22   144 use IO::Handle;
  22         39  
  22         665  
6 22     22   8711 use IO::Pty;
  22         209207  
  22         1053  
7 22     22   8832 use Mojo::Asset::Memory;
  22         609824  
  22         230  
8 22     22   10246 use Mojo::IOLoop;
  22         2245252  
  22         140  
9 22     22   986 use Mojo::IOLoop::Stream;
  22         50  
  22         128  
10 22     22   9395 use Mojo::IOLoop::ReadWriteFork::SIGCHLD;
  22         62  
  22         1806  
11 22     22   751 use Mojo::Promise;
  22         1542  
  22         118  
12 22     22   536 use Mojo::Util qw(term_escape);
  22         43  
  22         950  
13 22     22   109 use Scalar::Util qw(blessed);
  22         38  
  22         987  
14              
15 22   100 22   105 use constant CHUNK_SIZE => $ENV{MOJO_CHUNK_SIZE} || 131072;
  22         39  
  22         1482  
16 22   50 22   122 use constant DEBUG => $ENV{MOJO_READWRITEFORK_DEBUG} && 1;
  22         44  
  22         1076  
17 22     22   137 use constant READ => 0;
  22         33  
  22         866  
18 22     22   110 use constant WRITE => 1;
  22         41  
  22         78659  
19              
20             our $VERSION = '2.00';
21              
22             our @SAFE_SIG
23             = grep { !m!^(NUM\d+|__[A-Z0-9]+__|ALL|CATCHALL|DEFER|HOLD|IGNORE|MAX|PAUSE|RTMAX|RTMIN|SEGV|SETS)$! } keys %SIG;
24              
25             my $SIGCHLD = Mojo::IOLoop::ReadWriteFork::SIGCHLD->singleton;
26              
27             has conduit => sub { +{type => 'pipe'} };
28 8 100   8 1 331 sub pid { shift->{pid} || 0; }
29             has ioloop => sub { Mojo::IOLoop->singleton }, weak => 1;
30              
31             sub close {
32 3     3 1 186 my $self = shift;
33 3 50       53 my $what = $_[0] eq 'stdout' ? 'stdout_read' : 'stdin_write'; # stdout_read is EXPERIMENTAL
34 3 50       39 my $fh = delete $self->{$what} or return $self;
35 3 50       52 $fh->close or $self->emit(error => $!);
36 3         151 $self;
37             }
38              
39             sub run {
40 32 100   32 1 6185 my $args = ref $_[-1] eq 'HASH' ? pop : {};
41 32         245 my ($self, $program, @program_args) = @_;
42 32         474 return $self->start({%$args, program => $program, program_args => \@program_args});
43             }
44              
45             sub run_and_capture_p {
46 1     1 1 207 my $self = shift;
47 1         11 my $asset = Mojo::Asset::Memory->new(auto_upgrade => 1);
48 1     2   11 my $read_cb = $self->on(read => sub { $asset->add_chunk($_[1]) });
  2         39  
49 1     1   9 $asset->once(upgrade => sub { $asset = $_[1]; $self->emit(asset => $asset) });
  1         221  
  1         11  
50 1     1   100 return $self->emit(asset => $asset)->run_p(@_)->then(sub {$asset})
51 1     1   22 ->finally(sub { $self->unsubscribe(read => $read_cb) });
  1         227  
52             }
53              
54             sub run_p {
55 9     9 1 4767 my $self = shift;
56 9         79 my $p = Mojo::Promise->new;
57 9         349 my @cb;
58 9     0   129 push @cb, $self->once(error => sub { shift->unsubscribe(finish => $cb[1]); $p->reject(@_) });
  0         0  
  0         0  
59 9     8   203 push @cb, $self->once(finish => sub { shift->unsubscribe(error => $cb[0]); $p->resolve(@_) });
  8         442  
  8         304  
60 9         127 $self->run(@_);
61 9         52 return $p;
62             }
63              
64             sub start {
65 38     38 1 3839 my $self = shift;
66 38 100       150 my $args = ref $_[0] ? $_[0] : {@_};
67 38         179 my $conduit = $self->conduit;
68              
69 38   33     631 $args->{$_} //= $conduit->{$_} for keys %$conduit;
70 38   66     419 $args->{conduit} ||= delete $args->{type};
71 38   100     566 $args->{env} ||= {%ENV};
72 38         120 $self->{errno} = 0;
73 38 100       261 $args->{program} or die 'program is required input';
74 36     36   193 $self->ioloop->next_tick(sub { $self->_start($args) });
  36         5392  
75 36         3969 return $self;
76             }
77              
78             sub _start {
79 36     36   125 my ($self, $args) = @_;
80 36         77 my (@stdin, @stdout, @stderr);
81              
82 36         242 local $!;
83 36 100       195 if ($args->{conduit} eq 'pipe') {
    50          
84 11         39 @stdin = $self->_pipe;
85 11         36 @stdout = $self->_pipe;
86             }
87             elsif ($args->{conduit} eq 'pty') {
88 25         578 $stdin[WRITE] = $stdout[READ] = IO::Pty->new;
89             }
90             else {
91 0         0 warn "[RWF] Invalid conduit ($args->{conduit})\n" if DEBUG;
92 0         0 return $self->emit(error => "Invalid conduit ($args->{conduit})");
93             }
94              
95 36 100       21417 if ($args->{stderr}) {
96 2         5 @stderr = $self->_pipe;
97             }
98              
99 36         384 my $prepare_event = {
100             stderr_read => $stderr[READ],
101             stderr_write => $stderr[WRITE],
102             stdin_read => $stdin[READ],
103             stdin_write => $stdin[WRITE],
104             stdout_read => $stdout[READ],
105             stdout_write => $stdout[WRITE],
106             };
107              
108 36         336 $self->emit(before_fork => $prepare_event); # LEGACY
109 36         702 $self->emit(prepare => $prepare_event);
110              
111 36 50       82608 return $self->emit(error => "Couldn't fork ($!)") unless defined($self->{pid} = fork);
112             return $self->{pid}
113 36 100       5225 ? $self->_start_parent($args, \@stdin, \@stdout, \@stderr)
114             : $self->_start_child($args, \@stdin, \@stdout, \@stderr);
115             }
116              
117             sub _start_child {
118 3     3   97 my ($self, $args, $stdin, $stdout, $stderr) = @_;
119              
120 3 100 66     535 if (blessed $stdin->[WRITE] and $stdin->[WRITE]->isa('IO::Pty')) {
121 2         170 $stdin->[WRITE]->make_slave_controlling_terminal;
122 2         2948 $stdin->[READ] = $stdout->[WRITE] = $stdin->[WRITE]->slave;
123 2 50       148 $stdin->[READ]->set_raw if $args->{raw};
124 2 50       56 $stdin->[READ]->clone_winsize_from($args->{clone_winsize_from}) if $args->{clone_winsize_from};
125             }
126              
127 3   33     236 my $stdout_no = ($args->{stdout} // 1) && fileno($stdout->[WRITE]);
128 3   33     184 my $stderr_no = ($args->{stderr} // 1) && fileno($stderr->[WRITE] || $stdout->[WRITE]);
129 3 50       500 open STDIN, '<&' . fileno($stdin->[READ]) or exit $!;
130 3 50 33     222 open STDOUT, '>&' . $stdout_no or exit $! if $stdout_no;
131 3 50 33     193 open STDERR, '>&' . $stderr_no or exit $! if $stderr_no;
132 3 50       221 $stdout_no ? STDOUT->autoflush(1) : STDOUT->close;
133 3 50       383 $stderr_no ? STDERR->autoflush(1) : STDERR->close;
134              
135 3         237 $stdin->[WRITE]->close;
136 3         91 $stdout->[READ]->close;
137 3 50       78 $stderr->[READ]->close if $stderr->[READ];
138              
139 3         15 %ENV = %{$args->{env}};
  3         1159  
140              
141 3         27 my $errno;
142 3 50       50 if (ref $args->{program} eq 'CODE') {
143 0         0 $! = 0;
144 0         0 @SIG{@SAFE_SIG} = ('DEFAULT') x @SAFE_SIG;
145 0 0       0 eval { $args->{program}->(@{$args->{program_args} || []}); };
  0         0  
  0         0  
146 0 0       0 $errno = $@ ? 255 : $!;
147 0 0       0 print STDERR $@ if length $@;
148             }
149             else {
150 3 50       26 exec $args->{program}, @{$args->{program_args} || []};
  3         0  
151             }
152              
153 0   0     0 eval { POSIX::_exit($errno // $!); };
  0         0  
154 0   0     0 exit($errno // $!);
155             }
156              
157             sub _start_parent {
158 33     33   769 my ($self, $args, $stdin, $stdout, $stderr) = @_;
159              
160 33         219 $self->_d("Forked $args->{program} @{$args->{program_args} || []}") if DEBUG;
161 33         2013 @$self{qw(stdin_write stdout_read stderr_read)} = ($stdin->[WRITE], $stdout->[READ], $stderr->[READ]);
162 33         742 @$self{qw(wait_eof wait_sigchld)} = (1, 1);
163              
164 33 100 66     3485 $stdout->[READ]->close_slave if blessed $stdout->[READ] and $stdout->[READ]->isa('IO::Pty');
165 33 100       4230 $self->_stream($args, stderr => $stderr->[READ]) if $stderr->[READ];
166 33 100 100     2631 $self->_stream($args, stdout => $stdout->[READ]) if !$stderr->[READ] or $args->{stdout};
167              
168 33     32   14865 $SIGCHLD->waitpid($self->{pid} => sub { $self->_sigchld(@_) });
  32         342  
169 33         1993 $self->emit('fork'); # LEGACY
170 33         600 $self->emit('spawn');
171 33         2124 $self->_write;
172             }
173              
174             sub write {
175 6     6 1 701 my ($self, $chunk, $cb) = @_;
176 6 100       52 $self->once(drain => $cb) if $cb;
177 6         67 $self->{stdin_buffer} .= $chunk;
178 6 100       35 $self->_write if $self->{stdin_write};
179 6         31 $self;
180             }
181              
182             sub kill {
183 0     0 1 0 my $self = shift;
184 0   0     0 my $signal = shift // 15;
185 0 0       0 return undef unless my $pid = $self->{pid};
186 0         0 $self->_d("kill $signal $pid") if DEBUG;
187 0         0 return kill $signal, $pid;
188             }
189              
190             sub _error {
191 0 0 0 0   0 return if $! == EAGAIN || $! == EINTR || $! == EWOULDBLOCK;
      0        
192 0 0 0     0 return $_[0]->kill if $! == ECONNRESET || $! == EPIPE;
193 0         0 return $_[0]->emit(error => $!)->kill;
194             }
195              
196 0     0   0 sub _d { warn "-- [$_[0]->{pid}] $_[1]\n" }
197              
198             sub _maybe_terminate {
199 65     65   422 my ($self, $pending_event) = @_;
200 65         453 delete $self->{$pending_event};
201 65 100 66     953 return if $self->{wait_eof} or $self->{wait_sigchld};
202              
203 32         411 delete $self->{stdin_write};
204 32         2901 delete $self->{stdout_read};
205              
206 32         312 my @errors;
207 32         208 for my $cb (@{$self->subscribers('close')}, @{$self->subscribers('finish')}) {
  32         635  
  32         717  
208 38 100       1754 push @errors, $@ unless eval { $self->$cb(@$self{qw(exit_value signal)}); 1 };
  38         405  
  37         8286  
209             }
210              
211 32         1807 $self->emit(error => $_) for @errors;
212             }
213              
214             sub _on_read_cb {
215 34     34   204 my ($self, $args, $name) = @_;
216 34 100       546 my @emit = $args->{stderr} ? ($name) : qw(read);
217 53     53   161476 return sub { $self->emit(@emit => $_[1]) }
218 34         1055 unless DEBUG;
219              
220 0         0 my $NAME = uc $name;
221             return sub {
222 0     0   0 $self->_d(sprintf ">>> RWF:$NAME\n%s", term_escape $_[1]) if DEBUG;
223 0         0 $self->emit(@emit => $_[1]);
224 0         0 };
225             }
226              
227             sub _pipe {
228 24     24   42 my $self = shift;
229 24         35 my @rw;
230 24 50       869 pipe $rw[READ], $rw[WRITE] or return $self->emit(error => "pipe: $!");
231 24         242 $rw[WRITE]->autoflush(1);
232 24         1072 return @rw;
233             }
234              
235             sub _sigchld {
236 32     32   281 my ($self, $status, $pid) = @_;
237 32         274 my ($exit_value, $signal) = ($status >> 8, $status & 127);
238 32         114 $self->_d("Exit exit_value=$exit_value, signal=$signal") if DEBUG;
239 32         383 @$self{qw(exit_value signal)} = ($exit_value, $signal);
240 32         254 $self->_maybe_terminate('wait_sigchld');
241             }
242              
243             sub _stream {
244 34     34   708 my ($self, $args, $name, $handle) = @_;
245 34         2629 my $stream = Mojo::IOLoop::Stream->new($handle)->timeout(0);
246 34 50   22   16225 $stream->on(error => sub { $! != EIO && $self->emit(error => "Read error: $_[1]") });
  22         57929  
247 34     33   1918 $stream->on(close => sub { $self->_maybe_terminate('wait_eof') });
  33         21554  
248 34         889 $stream->on(read => $self->_on_read_cb($args, $name));
249 34         825 $self->ioloop->stream($stream);
250             }
251              
252             sub _write {
253 35     35   1702 my $self = shift;
254              
255 35 100       5753 return unless length $self->{stdin_buffer};
256 6         36 my $stdin_write = $self->{stdin_write};
257 6         221 my $written = $stdin_write->syswrite($self->{stdin_buffer});
258 6 50       300 return $self->_error unless defined $written;
259 6         49 my $chunk = substr $self->{stdin_buffer}, 0, $written, '';
260 6         42 $self->_d(sprintf "<<< RWF\n%s", term_escape $chunk) if DEBUG;
261              
262 6 50       33 if (length $self->{stdin_buffer}) {
263              
264             # This is one ugly hack because it does not seem like IO::Pty play
265             # nice with Mojo::Reactor(::EV) ->io(...) and ->watch(...)
266 0 0   0   0 $self->ioloop->timer(0.01 => sub { $self and $self->_write });
  0         0  
267             }
268             else {
269 6         62 $self->emit('drain');
270             }
271             }
272              
273             1;
274              
275             =encoding utf8
276              
277             =head1 NAME
278              
279             Mojo::IOLoop::ReadWriteFork - Fork a process and read/write from it
280              
281             =head1 VERSION
282              
283             2.00
284              
285             =head1 SYNOPSIS
286              
287             my $fork = Mojo::IOLoop::ReadWriteFork->new;
288              
289             # Emitted if something terrible happens
290             $fork->on(error => sub { my ($fork, $error) = @_; warn $error; });
291              
292             # Emitted when the child completes
293             $fork->on(finish => sub { my ($fork, $exit_value, $signal) = @_; Mojo::IOLoop->stop; });
294              
295             # Emitted when the child prints to STDOUT or STDERR
296             $fork->on(read => sub {
297             my ($fork, $buf) = @_;
298             print qq(Child process sent us "$buf");
299             });
300              
301             # Need to set "conduit" for bash, ssh, and other programs that require a pty
302             $fork->conduit({type => "pty"});
303              
304             # Start the application
305             $fork->run("bash", -c => q(echo $YIKES foo bar baz));
306              
307             # Using promises
308             $fork->on(read => sub { ... });
309             $fork->run_p("bash", -c => q(echo $YIKES foo bar baz))->wait;
310              
311             See also
312             L
313             for an example usage from a L.
314              
315             =head1 DESCRIPTION
316              
317             This class enable you to fork a child process and L and L data
318             to. You can also L to the child and see when the process
319             ends. The child process can be an external program (bash, telnet, ffmpeg, ...)
320             or a CODE block running perl.
321              
322             L that
323             enable the L event to see the difference between STDERR and STDOUT are
324             more than welcome.
325              
326             =head1 EVENTS
327              
328             =head2 stderr
329              
330             $fork->on(stderr => sub { my ($fork, $buf) = @_; });
331              
332             Emitted when the child has written a chunk of data to STDERR and L
333             has the "stderr" key set to a true value.
334              
335             =head2 stdout
336              
337             $fork->on(stdout => sub { my ($fork, $buf) = @_; });
338              
339             Emitted when the child has written a chunk of data to STDOUT and L
340             has the "stdout" key set to a true value.
341              
342             =head2 asset
343              
344             $fork->on(asset => sub { my ($fork, $asset) = @_; });
345              
346             Emitted at least once when calling L. C<$asset> can be
347             either a L or L object.
348              
349             $fork->on(asset => sub {
350             my ($fork, $asset) = @_;
351             # $asset->auto_upgrade(1) is set by default
352             $asset->max_memory_size(1) if $asset->can('max_memory_size');
353             });
354              
355             =head2 error
356              
357             $fork->on(error => sub { my ($fork, $str) = @_; });
358              
359             Emitted when when the there is an issue with creating, writing or reading
360             from the child process.
361              
362             =head2 drain
363              
364             $fork->on(drain => sub { my ($fork) = @_; });
365              
366             Emitted when the buffer has been written to the sub process.
367              
368             =head2 finish
369              
370             $fork->on(finish => sub { my ($fork, $exit_value, $signal) = @_; });
371              
372             Emitted when the child process exit.
373              
374             =head2 read
375              
376             $fork->on(read => sub { my ($fork, $buf) = @_; });
377              
378             Emitted when the child has written a chunk of data to STDOUT or STDERR, and
379             neither "stderr" nor "stdout" is set in the L.
380              
381             =head2 spawn
382              
383             $fork->on(spawn => sub { my ($fork) = @_; });
384              
385             Emitted after C has been called. Note that the child process might not yet have
386             been started. The order of things is impossible to say, but it's something like this:
387              
388             .------.
389             | fork |
390             '------'
391             |
392             ___/ \_______________
393             | |
394             | (parent) | (child)
395             .--------------. |
396             | emit "spawn" | .--------------------.
397             '--------------' | set up filehandles |
398             '--------------------'
399             |
400             .---------------.
401             | exec $program |
402             '---------------'
403              
404             See also L for example usage of this event.
405              
406             =head2 start
407              
408             $fork->on(start => sub { my ($fork, $pipes) = @_; });
409              
410             Emitted right before the child process is forked. Example C<$pipes>
411              
412             $pipes = {
413             # if "stderr" is set in conduit()
414             stdin_write => $stderr_fh_w,
415             stdout_read => $stderr_fh_r,
416              
417             # for both conduit "pipe" and "pty"
418             stdin_write => $pipe_fh_r_or_pty_object,
419             stdout_read => $pipe_fh_w_or_pty_object,
420              
421             # only for conduit "pipe"
422             stdin_read => $pipe_fh_r,
423             stdout_write => $pipe_fh_w,
424             }
425              
426             =head1 ATTRIBUTES
427              
428             =head2 conduit
429              
430             $hash = $fork->conduit;
431             $fork = $fork->conduit(\%options);
432              
433             Used to set the conduit options. Possible values are:
434              
435             =over 2
436              
437             =item * stderr
438              
439             This will make L emit "stderr" events, instead of
440             "read" events. Setting this to "0" will close STDERR in the child.
441              
442             =item * stdout
443              
444             This will make L emit "stdout" events, instead of
445             "read" events. Setting this to "0" will close STDOUT in the child.
446              
447             =item * raw
448              
449             Calls L if "typ" is "pty".
450              
451             =item * type
452              
453             "type" can be either "pipe" or "pty". Default value is "pipe".
454              
455             =back
456              
457             =head2 ioloop
458              
459             $ioloop = $fork->ioloop;
460             $fork = $fork->ioloop(Mojo::IOLoop->singleton);
461              
462             Holds a L object.
463              
464             =head2 pid
465              
466             $int = $fork->pid;
467              
468             Holds the child process ID. Note that L will start the process after
469             the IO loop is started. This means that the code below will not work:
470              
471             $fork->run("bash", -c => q(echo $YIKES foo bar baz));
472             warn $fork->pid; # pid() is not yet set
473              
474             This will work though:
475              
476             $fork->on(fork => sub { my $fork = shift; warn $fork->pid });
477             $fork->run("bash", -c => q(echo $YIKES foo bar baz));
478              
479             =head1 METHODS
480              
481             =head2 close
482              
483             $fork = $fork->close("stdin");
484              
485             Close STDIN stream to the child process immediately.
486              
487             =head2 run
488              
489             $fork = $fork->run($program, @program_args);
490             $fork = $fork->run(\&Some::Perl::function, @function_args);
491              
492             Simpler version of L. Can either start an application or run a perl
493             function.
494              
495             =head2 run_and_capture_p
496              
497             $p = $fork->run_and_capture_p(...)->then(sub { my $asset = shift });
498              
499             L takes the same arguments as L, but the
500             fullfillment callback will receive a L object that holds the
501             output from the command.
502              
503             See also the L event.
504              
505             =head2 run_p
506              
507             $p = $fork->run_p($program, @program_args);
508             $p = $fork->run_p(\&Some::Perl::function, @function_args);
509              
510             Promise based version of L. The L will be resolved on
511             L and rejected on L.
512              
513             =head2 start
514              
515             $fork = $fork->start(\%args);
516              
517             Used to fork and exec a child process. C<%args> can have:
518              
519             =over 2
520              
521             =item * program
522              
523             Either an application or a CODE ref.
524              
525             =item * program_args
526              
527             A list of options passed on to L or as input to the CODE ref.
528              
529             Note that this module will start L with this code:
530              
531             exec $program, @$program_args;
532              
533             This means that the code is subject for
534             L
535             unless invoked with more than one argument. This is considered a feature, but
536             something you should be avare of. See also L for more details.
537              
538             =item * env
539              
540             Passing in C will override the default set of environment variables,
541             stored in C<%ENV>.
542              
543             =item * conduit
544              
545             Either "pipe" (default) or "pty". "pty" will use L to simulate a
546             "pty", while "pipe" will just use L. This can also be specified
547             by using the L attribute.
548              
549             =item * clone_winsize_from
550              
551             See L. This only makes sense if L is set
552             to "pty". This can also be specified by using the L attribute.
553              
554             =item * raw
555              
556             See L. This only makes sense if L is set to "pty".
557             This can also be specified by using the L attribute.
558              
559             =back
560              
561             =head2 write
562              
563             $fork = $fork->write($chunk);
564             $fork = $fork->write($chunk, $cb);
565              
566             Used to write data to the child process STDIN. An optional callback will be
567             called once STDIN is drained.
568              
569             Example:
570              
571             $fork->write("some data\n", sub { shift->close });
572              
573             =head2 kill
574              
575             $bool = $fork->kill;
576             $bool = $fork->kill(15); # default
577              
578             Used to signal the child.
579              
580             =head1 SEE ALSO
581              
582             L.
583              
584             L
585              
586             =head1 COPYRIGHT AND LICENSE
587              
588             Copyright (C) 2013-2016, Jan Henning Thorsen
589              
590             This program is free software, you can redistribute it and/or modify it under
591             the terms of the Artistic License version 2.0.
592              
593             =head1 AUTHOR
594              
595             Jan Henning Thorsen - C
596              
597             =cut