File Coverage

lib/Mojo/IOLoop/ReadWriteProcess.pm
Criterion Covered Total %
statement 361 380 95.0
branch 194 232 83.6
condition 62 112 55.3
subroutine 70 70 100.0
pod 26 28 92.8
total 713 822 86.7


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::ReadWriteProcess;
2              
3             our $VERSION = '1.1.0';
4              
5 40     40   13703754 use Mojo::Base 'Mojo::EventEmitter';
  40         51178  
  40         367  
  9         151  
  10         62  
  10         77  
6 40     40   102921 use Mojo::File 'path';
  40         720936  
  40         2847  
  10         109  
  2         22  
  2         10  
7 40     40   277 use Mojo::Util qw(b64_decode b64_encode scope_guard);
  40         74  
  40         1973  
  2         9  
  10         74  
  10         79  
8 40     40   35560 use Mojo::IOLoop::Stream;
  40         6550976  
  40         280  
  10         67  
  10         490  
  10         10545  
9              
10 40     40   26400 use Mojo::IOLoop::ReadWriteProcess::Exception;
  40         150  
  40         374  
  2         30  
  1         18  
  0            
11 40     40   21009 use Mojo::IOLoop::ReadWriteProcess::Pool;
  40         161  
  40         2659  
  0            
  0            
  0            
12 40     40   20138 use Mojo::IOLoop::ReadWriteProcess::Queue;
  40         122  
  40         313  
  0            
  0            
  0            
13 40     40   1948 use Mojo::IOLoop::ReadWriteProcess::Session;
  40         382  
  40         3066  
  0            
  0            
  0            
14              
15 40     40   20905 use Mojo::IOLoop::ReadWriteProcess::Shared::Lock;
  40         131  
  40         2294  
  0            
16 40     40   21190 use Mojo::IOLoop::ReadWriteProcess::Shared::Memory;
  40         143  
  40         2392  
17 40     40   310 use Mojo::IOLoop::ReadWriteProcess::Shared::Semaphore;
  40         243  
  40         2013  
18              
19 40     40   254 use B::Deparse;
  40         118  
  40         1992  
20 40     40   199 use Carp 'confess';
  40         63  
  40         2372  
21 40     40   324 use IO::Handle;
  40         73  
  40         1846  
22 40     40   21800 use IO::Pipe;
  40         58837  
  40         1581  
23 40     40   19420 use IO::Select;
  40         63423  
  40         2568  
24 40     40   21588 use IPC::Open3;
  40         130383  
  40         3091  
25 40     40   309 use Time::HiRes 'sleep';
  40         78  
  40         713  
26 40     40   2937 use Symbol 'gensym';
  40         82  
  40         1818  
27 40     40   227 use Storable;
  40         74  
  40         2701  
28 40     40   253 use POSIX qw( :sys_wait_h :signal_h );
  40         104  
  40         308  
29             our @EXPORT_OK
30             = (qw(parallel batch process pool queue), qw(shared_memory lock semaphore));
31 40     40   17754 use Exporter 'import';
  40         140  
  40         2162  
32              
33 40     40   217 use constant DEBUG => $ENV{MOJO_PROCESS_DEBUG};
  40         69  
  40         270912  
34              
35             has [
36             qw(kill_sleeptime sleeptime_during_kill),
37             qw(separate_err autoflush set_pipes verbose),
38             qw(internal_pipes channels)
39             ] => 1;
40              
41             has [qw(blocking_stop serialize quirkiness total_sleeptime_during_kill)] => 0;
42              
43             has [
44             qw(execute code process_id pidfile return_status),
45             qw(channel_in channel_out write_stream read_stream error_stream),
46             qw(_internal_err _internal_return _status args)
47             ];
48              
49             has max_kill_attempts => 5;
50             has kill_whole_group => 0;
51              
52             has error => sub { Mojo::Collection->new };
53              
54             has ioloop => sub { Mojo::IOLoop->singleton };
55             has session => sub { Mojo::IOLoop::ReadWriteProcess::Session->singleton };
56              
57             has _deparse => sub { B::Deparse->new }
58             if DEBUG;
59             has _deserialize => sub { \&Storable::thaw };
60             has _serialize => sub { \&Storable::freeze };
61             has _default_kill_signal => POSIX::SIGTERM;
62             has _default_blocking_signal => POSIX::SIGKILL;
63              
64             # Override new() just to support sugar syntax
65             # so it is possible to do : process->new(sub{ print "Hello World\n" })->start->stop; and so on.
66             sub new {
67 379 100   379 1 779217 push(@_, code => splice @_, 1, 1) if ref $_[1] eq "CODE";
    100     1    
68 379         4360 return shift->SUPER::new(@_);
69             }
70              
71             sub to_ioloop {
72 27     27 1 395278 my $self = shift;
73 27 100       1086 confess 'Pipes needs to be set!' unless $self->read_stream;
    100          
74 2         9 my $stream = Mojo::IOLoop::Stream->new($self->read_stream)->timeout(0);
75 2         701 $self->ioloop->stream($stream);
76 2         554 my $me = $$;
77             $stream->on(
78             close => sub {
79 2 50   2   268 return unless $$ == $me;
    100          
80 2 50       12 $self->_collect->stop unless defined $self->_status;
    100          
81 2         40 });
82 2         8 return $stream;
83             }
84              
85 179     179 1 2005052 sub process { __PACKAGE__->new(@_) }
86 6     6 1 10445 sub batch { Mojo::IOLoop::ReadWriteProcess::Pool->new(@_) }
87 6     6 1 372828 sub queue { Mojo::IOLoop::ReadWriteProcess::Queue->new(@_) }
88 1     1 1 10 sub lock { Mojo::IOLoop::ReadWriteProcess::Shared::Lock->new(@_) }
89 1     1 0 105 sub semaphore { Mojo::IOLoop::ReadWriteProcess::Shared::Semaphore->new(@_) }
90 1     1 0 3 sub shared_memory { Mojo::IOLoop::ReadWriteProcess::Shared::Memory->new(@_) }
91              
92             sub parallel {
93 3     3 1 254941 my $c = batch();
94 3         51 $c->add(@_) for 1 .. +pop();
95 3         23 return $c;
96             }
97              
98             sub _diag {
99 4     4   1536 my ($self, @messages) = @_;
100 4         106 my $caller = (caller(1))[3];
101 4 50       65 print STDERR ">> ${caller}(): @messages\n" if (DEBUG || $self->verbose);
102             }
103              
104             sub _open_collect_status {
105 69     69   1963 my ($self, $pid, $e, $errno) = @_;
106              
107 69 50       393 return unless $self;
    50          
108              
109 69 50 33     11624 $self->_status($e // $?) unless defined $self->_status;
    0 100        
110 28         576 $self->_diag("Forked code Process Exit status: " . $self->exit_status)
111             if DEBUG;
112              
113 28         233 $self->_clean_pidfile;
114              
115 28         1161 return $self;
116             }
117              
118             # Use open3 to launch external program.
119             sub _open {
120 51     51   793 my ($self, @args) = @_;
121 51         123 $self->_diag('Execute: ' . (join ', ', map { "'$_'" } @args)) if DEBUG;
122              
123 51         531 $self->on(collect_status => \&_open_collect_status);
124              
125 51         868 my ($wtr, $rdr, $err);
126 51         473 $err = gensym;
127 51 100       1688 my $pid = open3($wtr, $rdr, ($self->separate_err) ? $err : undef, @args);
    0          
128              
129 44 100       544727 die "Cannot create pipe: $!" unless defined $pid;
130 43         964 $self->process_id($pid);
131              
132             # Deferred collect of return status and removal of pidfile
133              
134 43 100       1075 return $self unless $self->set_pipes();
135              
136 40         2067 $self->read_stream(IO::Handle->new_from_fd($rdr, "r"));
137 40         9348 $self->write_stream(IO::Handle->new_from_fd($wtr, "w"));
138 40 100       5721 $self->error_stream(($self->separate_err)
139             ? IO::Handle->new_from_fd($err, "r")
140             : $self->write_stream);
141              
142 40         3971 return $self;
143             }
144              
145 495 100   495   1996 sub _clean_pidfile { unlink(shift->pidfile) if $_[0]->pidfile }
146              
147             sub _collect {
148 23     18   93 my ($self, $pid) = @_;
149 8   33     123 $pid //= $self->pid;
      66        
150              
151 6         94 $self->session->consume_collected_info;
152             $self->session->_protect(
153             sub {
154 6     1   122 local $?;
155 6 50       168 waitpid $pid, 0 unless defined $self->_status;
156 5 50       45874 return $self->_open_collect_status($pid) if $self->execute;
157 5 50       228 return $self->_fork_collect_status($pid) if $self->code;
158 6         84 });
159              
160 5         150 $self;
161             }
162              
163             sub _fork_collect_status {
164 244     240   11996 my ($self, $pid, $e, $errno) = @_;
165              
166 244 100       2261 return unless $self;
167              
168 243         2008 my $return_reader;
169             my $internal_err_reader;
170 243         722 my $rt;
171 239         78 my @result_error;
172              
173 239 50 100     1609 $self->_status($e // $?) unless defined $self->_status;
      100        
174 239         3482 $self->_diag("Forked code Process Exit status: " . $self->exit_status)
175             if DEBUG;
176              
177 239 100       937 if ($self->_internal_return) {
178 184 50       1567 $return_reader
179             = $self->_internal_return->isa("IO::Pipe::End")
180             ? $self->_internal_return
181             : $self->_internal_return->reader();
182 184 50 0     25584 $self->_new_err('Cannot read from return code pipe') && return
      33        
183             unless IO::Select->new($return_reader)->can_read(10);
184 184         17649562 $rt = $return_reader->getline();
185 184         1387 $self->_diag("Forked code Process Returns: " . ($rt ? $rt : 'nothing'))
186             if DEBUG;
187             $self->return_status(
188 184 100       1536 $self->serialize ? eval { $self->_deserialize->(b64_decode($rt)) }
  2 100       106  
189             : $rt ? $rt
190             : ());
191             }
192 239 100       6866 if ($self->_internal_err) {
193 185 100       1521 $internal_err_reader
194             = $self->_internal_err->isa("IO::Pipe::End")
195             ? $self->_internal_err
196             : $self->_internal_err->reader();
197 185 100 50     23894 $self->_new_err('Cannot read from errors code pipe') && return
      66        
198             unless IO::Select->new($internal_err_reader)->can_read(10);
199 183         30917 @result_error = $internal_err_reader->getlines();
200             push(
201 60         341 @{$self->error},
202 183 100       759 map { Mojo::IOLoop::ReadWriteProcess::Exception->new($_) } @result_error
  60         2111  
203             ) if @result_error;
204 183         312 $self->_diag("Forked code Process Errors: " . join("\n", @result_error))
205             if DEBUG;
206             }
207              
208 237         1445 $self->_clean_pidfile;
209 237         2162 return $self;
210             }
211              
212             # Handle forking of code
213             sub _fork {
214 213     214   2019 my ($self, $code, @args) = @_;
215 213 100       1159 die "Can't spawn child without code" unless ref($code) eq "CODE";
216              
217             # STDIN/STDOUT/STDERR redirect.
218 211         635 my ($input_pipe, $output_pipe, $output_err_pipe);
219              
220             # Separated handles that could be used for internal communication.
221 211         0 my ($channel_in, $channel_out);
222              
223              
224 211 100       1165 if ($self->set_pipes) {
225 162 100       2855 $input_pipe = IO::Pipe->new()
226             or $self->_new_err('Failed creating input pipe');
227 162 100       36792 $output_pipe = IO::Pipe->new()
228             or $self->_new_err('Failed creating output pipe');
229 162 100       16416 $output_err_pipe = IO::Pipe->new()
230             or $self->_new_err('Failed creating output error pipe');
231 162 100       14528 if ($self->channels) {
232 162 100       3069 $channel_in = IO::Pipe->new()
233             or $self->_new_err('Failed creating Channel input pipe');
234 162 100       15782 $channel_out = IO::Pipe->new()
235             or $self->_new_err('Failed creating Channel output pipe');
236             }
237             }
238 211 100       14488 if ($self->internal_pipes) {
239 209 100       3892 my $internal_err = IO::Pipe->new()
240             or $self->_new_err('Failed creating internal error pipe');
241 204 100       27292 my $internal_return = IO::Pipe->new()
242             or $self->_new_err('Failed creating internal return pipe');
243              
244             # Internal pipes to retrieve error/return
245 209         22490 $self->_internal_err($internal_err);
246 209         4736 $self->_internal_return($internal_return);
247             }
248              
249             # Deferred collect of return status
250              
251 211         4189 $self->on(collect_status => \&_fork_collect_status);
252              
253 211         3679 $self->_diag("Fork: " . $self->_deparse->coderef2text($code)) if DEBUG;
254              
255 211         731517 my $pid = fork;
256 206 100       9849 die "Cannot fork: $!" unless defined $pid;
257              
258 206 100       2936 if ($pid == 0) {
259 25         4581 local $SIG{CHLD};
260 25     19   4368 local $SIG{TERM} = sub { $self->emit('SIG_TERM')->_exit(1) };
  5         76  
261              
262 20         962 my $return;
263             my $internal_err;
264              
265 20 50       1715 if ($self->internal_pipes) {
266 20 100       1849 if ($self->_internal_err) {
267 19 50       1406 $internal_err
268             = $self->_internal_err->isa("IO::Pipe::End")
269             ? $self->_internal_err
270             : $self->_internal_err->writer();
271 19         7024 $internal_err->autoflush(1);
272             }
273              
274 20 100       6118 if ($self->_internal_return) {
275 19 50       976 $return
276             = $self->_internal_return->isa("IO::Pipe::End")
277             ? $self->_internal_return
278             : $self->_internal_return->writer();
279 19         2336 $return->autoflush(1);
280             }
281             else {
282 1         19 eval { $internal_err->write("Can't setup return status pipe") };
  1         37  
283             }
284             }
285              
286             # Set pipes to redirect STDIN/STDOUT/STDERR + channels if desired
287 20 100       1515 if ($self->set_pipes()) {
288 19         624 my $stdout;
289             my $stderr;
290 19         0 my $stdin;
291              
292 19 100       619 $stdout = $output_pipe->writer() if $output_pipe;
293 19 100       1743 $stderr
    100          
294             = (!$self->separate_err) ? $stdout
295             : $output_err_pipe ? $output_err_pipe->writer()
296             : undef;
297 19 100       2401 $stdin = $input_pipe->reader() if $input_pipe;
298 19 100 33     3100 open STDERR, ">&", $stderr
      33        
      0        
299             or !!$internal_err->write($!)
300             or $self->_diag($!)
301             if $stderr;
302 19 100 33     1029 open STDOUT, ">&", $stdout
      33        
303             or !!$internal_err->write($!)
304             or $self->_diag($!)
305             if $stdout;
306 19 100 33     963 open STDIN, ">&", $stdin
      33        
307             or !!$internal_err->write($!)
308             or $self->_diag($!)
309             if $stdin;
310              
311 19         361 $self->read_stream($stdin);
312 19         676 $self->error_stream($stderr);
313 19         531 $self->write_stream($stdout);
314 19 50       490 if ($self->channels) {
315              
316 19 100       636 $self->channel_in($channel_in->reader) if $channel_in;
317 19 100       1709 $self->channel_out($channel_out->writer) if $channel_out;
318 38         2106 eval { $self->$_->autoflush($self->autoflush) }
319 19         1232 for qw( channel_in channel_out );
320             }
321 57         2105 eval { $self->$_->autoflush($self->autoflush) }
322 19         921 for qw(read_stream error_stream write_stream );
323             }
324 20         1219 $self->session->reset;
325 20         11097 $self->session->subreaper(0); # Subreaper bit does not persist in fork
326 20         5416 $self->process_id($$);
327 20         708 $! = 0;
328 20         84 my $rt;
329 20         219 eval { $rt = [$code->($self, @args)]; };
  20         361  
330 0 0       0 if ($internal_err) {
331 0 0       0 $internal_err->write($@) if $@;
332 0 0 0     0 $internal_err->write($!) if !$@ && $!;
333             }
334 0 0 0     0 $rt = @$rt[0]
      0        
335             if !$self->serialize && ref $rt eq 'ARRAY' && scalar @$rt == 1;
336 0 0 0     0 $rt = b64_encode(eval { $self->_serialize->($rt) })
  0         0  
337             if $self->serialize && $return;
338 0 0       0 $return->write($rt) if $return;
339 0   0     0 $self->_exit($@ // $!);
340             }
341 186         23381 $self->process_id($pid);
342              
343 186 100       21682 return $self unless $self->set_pipes();
344              
345 138 100       10453 $self->read_stream($output_pipe->reader) if $output_pipe;
346 138 100       56211 $self->error_stream((!$self->separate_err) ? $self->read_stream()
    100          
347             : $output_err_pipe ? $output_err_pipe->reader()
348             : undef);
349 138 100       21680 $self->write_stream($input_pipe->writer) if $input_pipe;
350              
351 138 50       16353 if ($self->set_pipes) {
352 138 50       1885 if ($self->channels) {
353 138 100       1544 $self->channel_in($channel_in->writer) if $channel_in;
354 138 100       13211 $self->channel_out($channel_out->reader) if $channel_out;
355 276         44192 eval { $self->$_->autoflush($self->autoflush) }
356 138         13898 for qw( channel_in channel_out );
357             }
358 414         13316 eval { $self->$_->autoflush($self->autoflush) }
359 138         8687 for qw(read_stream error_stream write_stream );
360             }
361              
362 138         9558 return $self;
363             }
364              
365             sub _new_err {
366 43     26   425 my $self = shift;
367 43         305 my $err = Mojo::IOLoop::ReadWriteProcess::Exception->new(@_);
368 42         94 push(@{$self->error}, $err);
  42         94  
369              
370             # XXX: Need to switch, we should emit one error at a time, and _shutdown
371             # should emit only the ones that weren't already emitted
372 42         359 return $self->emit(process_error => [$err]);
373             }
374              
375             sub _exit {
376 16   0 1   351 my $code = shift // 0;
377 16         15037 eval { POSIX::_exit($code); };
  16         1787  
378 16         1752 exit($code);
379             }
380              
381             sub wait {
382 131     116 1 7031 my $self = shift;
383 131         3065 sleep $self->sleeptime_during_kill while ($self->is_running);
384 132         3662 return $self;
385             }
386              
387 110     96 1 9053 sub wait_stop { shift->wait->stop }
388 41 100   19 1 13406 sub errored { !!@{shift->error} ? 1 : 0 }
  31         3014  
389              
390             # PPC64: Treat msb on neg (different cpu/perl interpreter version)
391 21 100   6   985 sub _st { my $st = shift >> 8; ($st & 0x80) ? (0x100 - ($st & 0xFF)) : $st }
  23         444  
392              
393             sub exit_status {
394 54 100 66 37 1 16828 defined $_[0]->_status && $_[0]->quirkiness ? _st(shift->_status)
    100          
395             : defined $_[0]->_status ? shift->_status >> 8
396             : undef;
397             }
398              
399             sub restart {
400 51 100   34 1 165518 $_[0]->is_running ? $_[0]->stop->start : $_[0]->start;
401             }
402              
403             sub is_running {
404 1512     1495 1 66618250 my ($self) = shift;
405 1512         7703 $self->session->consume_collected_info;
406 1495 100       18787 return 0 unless my $pid = $self->process_id;
407 1254 100       12383 kill(0, ($self->kill_whole_group ? (-$pid, $pid) : ($pid)));
408             }
409              
410             sub write_pidfile {
411 284     284 1 1002021 my ($self, $pidfile) = @_;
412 284 100       1445 $self->pidfile($pidfile) if $pidfile;
413 284 100       1631 return unless $self->pid;
414 272 100       3038 return unless $self->pidfile;
415              
416 6         192 path($self->pidfile)->spew($self->pid);
417 6         3753 return $self;
418             }
419              
420             # Convenience functions
421             sub _syswrite {
422 19     19   256 my $stream = shift;
423 19 100       341 return unless $stream;
424 19         277 $stream->syswrite($_ . "\n") for @_;
425             }
426              
427             sub _getline {
428 92 100   83   2588 return unless IO::Select->new($_[0])->can_read(10);
429 76         100389 shift->getline;
430             }
431              
432             sub _getlines {
433 14 100   7   245 return unless IO::Select->new($_[0])->can_read(10);
434 7 100       1007 wantarray ? shift->getlines : join '', @{[shift->getlines]};
  5         2916591  
435             }
436              
437             # Write to the controlled-process STDIN
438             sub write_stdin {
439 21     21 1 311 my ($self, @data) = @_;
440 21         1534 _syswrite($self->write_stream, @data);
441 21         915 return $self;
442             }
443              
444             sub write_channel {
445 1     1 1 24 my ($self, @data) = @_;
446 1         18 _syswrite($self->channel_in, @data);
447 1         30 return $self;
448             }
449              
450             # Get all lines from the current process output stream
451 7     7 1 2862 sub read_all_stdout { _getlines(shift->read_stream) }
452              
453             # Get all lines from the process channel
454 7     6 1 1773 sub read_all_channel { _getlines(shift->channel_out); }
455 41     40 1 4047 sub read_stdout { _getline(shift->read_stream) }
456 21     21 1 2069 sub read_channel { _getline(shift->channel_out) }
457              
458             sub read_all_stderr {
459 3 100   3 1 45 return $_[0]->getline unless $_[0]->separate_err;
460 1         22 _getlines(shift->error_stream);
461             }
462              
463             # Get a line from the current process output stream
464             sub read_stderr {
465 9 50   9 1 79 return $_[0]->getline unless $_[0]->separate_err;
466 9         144 _getline(shift->error_stream);
467             }
468              
469             sub start {
470 249     249 1 6337 my $self = shift;
471 249 50       1829 return $self if $self->is_running;
472 244 100 100     2661 die "Nothing to do" unless !!$self->execute || !!$self->code;
473              
474             my @args
475             = defined($self->args)
476             ? ref($self->args) eq "ARRAY"
477 242 100       5025 ? @{$self->args}
  8 100       119  
478             : $self->args
479             : ();
480              
481 242 100       2817 $self->session->enable_subreaper if $self->subreaper;
482 242         2910 $self->_status(undef);
483 242         2286 $self->session->enable;
484              
485             {
486 242         1306 my $old_emit_from_sigchld = $self->session->emit_from_sigchld;
  242         644  
487 242         1986 $self->session->emit_from_sigchld(0);
488             my $scope_guard = scope_guard sub {
489 231     260   9045632 $self->session->emit_from_sigchld($old_emit_from_sigchld);
490 231 100       6826 $self->session->consume_collected_info if ($old_emit_from_sigchld);
491 242         8597 };
492              
493 242 100       11044 if ($self->code) {
    50          
494 206         1231 $self->_fork($self->code, @args);
495             }
496             elsif ($self->execute) {
497 36         467 $self->_open($self->execute, @args);
498             }
499              
500 238         14359 $self->write_pidfile;
501 238         13369 $self->emit('start');
502 237         15826 $self->session->register($self->pid() => $self);
503             }
504 237         27159 return $self;
505             }
506              
507             sub send_signal {
508 286     241 1 4064 my $self = shift;
509 286   100     3156 my $signal = shift // $self->_default_kill_signal;
510 286   66     1750 my $pid = shift // $self->process_id;
511 285 100 100     2500 return unless $self->kill_whole_group || $self->is_running;
512 231         8522 $self->_diag("Sending signal '$signal' to $pid") if DEBUG;
513 257         22909 kill $signal => $pid;
514 273         10080 return $self;
515             }
516              
517             sub stop {
518 276     232 1 46728 my $self = shift;
519              
520 308         3382 my $pid = $self->pid;
521 276 100       3073 return $self unless defined $pid;
522 258 100       1471 return $self->_shutdown(1) unless $self->is_running;
523              
524 79         2452 my $ret;
525 79         2061 my $attempt = 1;
526 79   33     928 my $timeout = $self->total_sleeptime_during_kill // 0;
527 78         2177 my $sleep_time = $self->sleeptime_during_kill;
528 67         666 my $max_attempts = $self->max_kill_attempts;
529 61         560 my $signal = $self->_default_kill_signal;
530 78 100       1296 $pid = -$pid if $self->kill_whole_group;
531 78         1450 $self->_diag("Stopping $pid") if DEBUG;
532              
533 78   66     2436 until ((defined $ret && ($ret == $pid || $ret == -1))
      100        
      100        
      100        
534             || ($attempt > $max_attempts && $timeout <= 0))
535             {
536 254   100     4269 my $send_signal = $attempt == 1 || $timeout <= 0;
537 226         341 $self->_diag(
538             "attempt $attempt/$max_attempts to kill process: $pid, timeout: $timeout")
539             if DEBUG && $send_signal;
540             $self->session->_protect(
541             sub {
542 226     156   2202 local $?;
543 226 100       795 if ($send_signal) {
544 222         1429 $self->send_signal($signal, $pid);
545 222         769 ++$attempt;
546             }
547 226         1938 $ret = waitpid($pid, WNOHANG);
548 226 100 66     2593 $self->_status($?) if $ret == $pid || $ret == -1;
549 226         2281 });
550 226 100       2292 if ($sleep_time) {
551 226         2122966 sleep $sleep_time;
552 226         8152 $timeout -= $sleep_time;
553             }
554             }
555 50 100       534 return $self->_shutdown if defined $self->_status;
556              
557 22 50       386 sleep $self->kill_sleeptime if $self->kill_sleeptime;
558              
559 22 100       225608 if ($self->blocking_stop) {
560 16         758 $self->_diag("Could not kill process id: $pid, blocking attempt") if DEBUG;
561 16         320 $self->emit('process_stuck');
562              
563             ### XXX: avoid to protect on blocking.
564 16         536 $self->send_signal($self->_default_blocking_signal, $pid);
565 16         10056 $ret = waitpid($pid, 0);
566 16 100 100     417 $self->_status($?) if $ret == $pid || $ret == -1;
567 16         814 return $self->_shutdown;
568             }
569             else {
570 6         159 $self->_diag("Could not kill process id: $pid") if DEBUG;
571 6         78 $self->_new_err('Could not kill process');
572             }
573 6         105 return $self;
574             }
575              
576             sub _shutdown {
577 230     222   3225 my ($self, $wait) = @_;
578 230 100       888 return $self unless $self->pid;
579              
580 230         1392 $self->_diag("Shutdown " . $self->pid) if DEBUG;
581             $self->session->_protect(
582             sub {
583 56     63   239 local $?;
584 56         154 waitpid $self->pid, 0;
585 56         491 $self->emit('collect_status');
586 230 100 100     1431 }) if $wait && !defined $self->_status;
587              
588 223 100       1592 $self->emit('collect_status') unless defined $self->_status;
589 223         3004 $self->_clean_pidfile;
590 223 100 100     3132 $self->emit('process_error', $self->error)
591             if $self->error && $self->error->size > 0;
592 223         13511 $self->unsubscribe('collect_status');
593              
594 223         3681 return $self->emit('stop');
595             }
596              
597             # General alias
598             *pid = \&process_id;
599             *died = \&_errored;
600             *failed = \&_errored;
601             *diag = \&_diag;
602             *pool = \&batch;
603             *signal = \&send_signal;
604             *prctl = \&Mojo::IOLoop::ReadWriteProcess::Session::_prctl;
605             *subreaper = \&Mojo::IOLoop::ReadWriteProcess::Session::subreaper;
606              
607             *enable_subreaper = \&Mojo::IOLoop::ReadWriteProcess::Session::enable_subreaper;
608             *disable_subreaper
609             = \&Mojo::IOLoop::ReadWriteProcess::Session::disable_subreaper;
610             *_get_prctl_syscall
611             = \&Mojo::IOLoop::ReadWriteProcess::Session::_get_prctl_syscall;
612              
613             # Aliases - write
614             *write = \&write_stdin;
615             *stdin = \&write_stdin;
616             *channel_write = \&write_channel;
617              
618             # Aliases - read
619             *read = \&read_stdout;
620             *stdout = \&read_stdout;
621             *getline = \&read_stdout;
622             *stderr = \&read_stderr;
623             *err_getline = \&read_stderr;
624             *channel_read = \&read_channel;
625             *read_all = \&read_all_stdout;
626             *getlines = \&read_all_stdout;
627             *stderr_all = \&read_all_stderr;
628             *err_getlines = \&read_all_stderr;
629             *channel_read_all = \&read_all_channel;
630              
631             # Aliases - IO::Handle
632             *stdin_handle = \&write_stream;
633             *stdout_handle = \&read_stream;
634             *stderr_handle = \&error_stream;
635             *channe_write_handle = \&channel_in;
636             *channel_read_handle = \&channel_out;
637              
638             1;
639              
640              
641             =encoding utf-8
642              
643             =head1 NAME
644              
645             Mojo::IOLoop::ReadWriteProcess - Execute external programs or internal code blocks as separate process.
646              
647             =head1 SYNOPSIS
648              
649             use Mojo::IOLoop::ReadWriteProcess;
650              
651             # Code fork
652             my $process = Mojo::IOLoop::ReadWriteProcess->new(sub { print "Hello\n" });
653             $process->start();
654             print "Running\n" if $process->is_running();
655             $process->getline(); # Will return "Hello\n"
656             $process->pid(); # Process id
657             $process->stop();
658             $process->wait_stop(); # if you intend to wait its lifespan
659              
660             # Methods can be chained, thus this is valid:
661             use Mojo::IOLoop::ReadWriteProcess qw(process);
662             my $output = process( sub { print "Hello\n" } )->start()->wait_stop->getline;
663              
664             # Also handles external processes seamlessly:
665             my $process = process(execute=> '/path/to/bin' )->args([qw(foo bar baz)]);
666             $process->start();
667             my $line_output = $process->getline();
668             my $pid = $process->pid();
669             $process->stop();
670             my @errors = $process->error;
671              
672             # To help when debugging Mojo::Collections
673             use Mojo::Util qw(dumper);
674             my $errors = dumper $process->error->to_array;
675              
676             # Get process return value
677             $process = process( sub { return "256"; } )->start()->wait_stop;
678             # We need to stop it to retrieve the exit status
679             my $return = $process->return_status;
680              
681             # The handles can be accessed directly from the object:
682             my $stdout = $process->read_stream;
683             my $stdin = $process->write_stream;
684             my $stderr = $process->error_stream;
685              
686             # So this works:
687             print $stdin "foo bar\n";
688             my @lines = <$stdout>;
689              
690             # There is also an alternative channel of communication (just for forked processes):
691             my $channel_in = $process->channel_in; # write to the child process
692             my $channel_out = $process->channel_out; # read from the child process
693             $process->channel_write("PING"); # convenience function
694              
695             =head1 DESCRIPTION
696              
697             Mojo::IOLoop::ReadWriteProcess is yet another process manager.
698              
699             =head1 EVENTS
700              
701             L<Mojo::IOLoop::ReadWriteProcess> inherits all events from L<Mojo::EventEmitter> and can emit
702             the following new ones.
703              
704             =head2 start
705              
706             $process->on(start => sub {
707             my ($process) = @_;
708             $process->is_running();
709             });
710              
711             Emitted when the process starts.
712              
713             =head2 stop
714              
715             $process->on(stop => sub {
716             my ($process) = @_;
717             $process->restart();
718             });
719              
720             Emitted when the process stops.
721              
722             =head2 process_error
723              
724             $process->on(process_error => sub {
725             my ($e) = @_;
726             my @errors = @{$e};
727             });
728              
729             Emitted when the process produces errors.
730              
731             =head2 process_stuck
732              
733             $process->on(process_stuck => sub {
734             my ($self) = @_;
735             ...
736             });
737              
738             Emitted when C<blocking_stop> is set and all attempts for killing the process
739             in C<max_kill_attempts> have been exhausted.
740             The event is emitted before attempting to kill it with SIGKILL and becoming blocking.
741              
742             =head2 SIG_CHLD
743              
744             $process->on(SIG_CHLD => sub {
745             my ($self) = @_;
746             ...
747             });
748              
749             Emitted when we receive SIG_CHLD.
750              
751             =head2 SIG_TERM
752              
753             $process->on(SIG_TERM => sub {
754             my ($self) = @_;
755             ...
756             });
757              
758             Emitted when the child forked process receives SIG_TERM, before exiting.
759              
760             =head2 collected
761              
762             $process->on(collected => sub {
763             my ($self) = @_;
764             ...
765             });
766              
767             Emitted right after status collection.
768              
769             =head2 collect_status
770              
771             $process->on(collect_status => sub {
772             my ($self) = @_;
773             ...
774             });
775              
776             Emitted on child process waitpid.
777             It is used internally to get the child process status.
778             Note: events attached to it are wiped when process has been stopped.
779              
780             =head1 ATTRIBUTES
781              
782             L<Mojo::IOLoop::ReadWriteProcess> inherits all attributes from L<Mojo::EventEmitter> and implements
783             the following new ones.
784              
785             =head2 execute
786              
787             use Mojo::IOLoop::ReadWriteProcess;
788             my $process = Mojo::IOLoop::ReadWriteProcess->new(execute => "/usr/bin/perl");
789             $process->start();
790             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
791             $process->stop();
792              
793             C<execute> should contain the external program that you wish to run.
794              
795             =head2 code
796              
797             use Mojo::IOLoop::ReadWriteProcess;
798             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello" } );
799             $process->start();
800             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
801             $process->stop();
802              
803             It represents the code you want to run in the background.
804              
805             You do not need to specify C<code>; it is implied when a code reference is passed as the only argument.
806              
807             my $process = Mojo::IOLoop::ReadWriteProcess->new(sub { print "Hello" });
808             $process->start();
809             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
810             $process->stop();
811              
812             =head2 args
813              
814             use Mojo::IOLoop::ReadWriteProcess;
815             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello ".$_[1] }, args => "User" );
816             $process->start();
817             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
818             $process->stop();
819              
820             # The process will print "Hello User"
821              
822             Arguments passed to the external binary or the code block. Use an arrayref to pass several.
823              
824             =head2 blocking_stop
825              
826             use Mojo::IOLoop::ReadWriteProcess;
827             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello" }, blocking_stop => 1 );
828             $process->start();
829             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
830             $process->stop(); # Will wait indefinitely until the process is stopped
831              
832             Set it to 1 if you want to do blocking stop of the process.
833              
834              
835             =head2 channels
836              
837             use Mojo::IOLoop::ReadWriteProcess;
838             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello" }, channels => 0 );
839             $process->start();
840             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
841             $process->stop(); # Will wait indefinitely until the process is stopped
842              
843             Set it to 0 if you want to disable internal channels.
844              
845             =head2 session
846              
847             use Mojo::IOLoop::ReadWriteProcess;
848             my $process = Mojo::IOLoop::ReadWriteProcess->new(sub { print "Hello" });
849             my $session = $process->session;
850             $session->enable_subreaper;
851              
852             Returns the current L<Mojo::IOLoop::ReadWriteProcess::Session> singleton.
853              
854             =head2 subreaper
855              
856             use Mojo::IOLoop::ReadWriteProcess;
857             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello ".$_[1] }, args => "User" );
858             $process->subreaper(1)->start();
859             $process->on( stop => sub { shift()->disable_subreaper } );
860             $process->stop();
861              
862             # The process will print "Hello User"
863              
864             Mark the current process (not the child) as subreaper on start.
865             It is the invoker's responsibility to disable the subreaper when the process stops, as it marks the current process and not the
866             child.
867              
868             =head2 ioloop
869              
870             my $loop = $process->ioloop;
871             $process = $process->ioloop(Mojo::IOLoop->new);
872              
873             Event loop object to control, defaults to the global L<Mojo::IOLoop> singleton.
874              
875             =head2 max_kill_attempts
876              
877             use Mojo::IOLoop::ReadWriteProcess;
878             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello" }, max_kill_attempts => 50 );
879             $process->start();
880             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
881             $process->stop(); # It will attempt to send SIGTERM 50 times.
882              
883             Defaults to C<5>, is the number of attempts before bailing out.
884              
885             It can be used with blocking_stop, so if the number of attempts is exhausted,
886             a SIGKILL and waitpid will be tried at the end.
887              
888             =head2 kill_whole_group
889              
890             use Mojo::IOLoop::ReadWriteProcess;
891             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { setpgrp(0, 0); exec(...); }, kill_whole_group => 1 );
892             $process->start();
893             $process->send_signal(...); # Will skip the usual check whether $process->pid is running
894             $process->stop(); # Kills the entire process group and waits for all processes in the group to finish
895              
896             Defaults to C<0>, whether to send signals (e.g. to stop) to the entire process group.
897              
898             This is useful when the sub process creates further sub processes and creates a new process
899             group as shown in the example. In this case it might be useful to take care of the entire process
900             group when stopping and wait for every process in the group to finish.
901              
902             =head2 collect_status
903              
904             Defaults to C<1>. If enabled, it will automatically collect the status of the child processes.
905             Disable it in case you want to manage your child processes directly, and do not want to rely on
906             automatic status collection. If you do not overwrite your C<SIGCHLD> handler,
907             the C<SIG_CHLD> event will still be emitted.
908              
909             =head2 serialize
910              
911             Defaults to C<0>, If enabled data returned from forked process will be serialized with Storable.
912              
913             =head2 kill_sleeptime
914              
915             Defaults to C<1>, it's the seconds to wait before attempting SIGKILL when blocking_stop is set to 1.
916              
917             =head2 separate_err
918              
919             Defaults to C<1>, it will create a separate channel to intercept process STDERR,
920             otherwise it will be redirected to STDOUT.
921              
922             =head2 verbose
923              
924             Defaults to C<1>, it indicates message verbosity.
925              
926             =head2 set_pipes
927              
928             Defaults to C<1>, If enabled, additional pipes for process communication are automatically set up.
929              
930              
931             =head2 internal_pipes
932              
933             Defaults to C<1>, If enabled, additional pipes for retrieving process return and errors are set up.
934             Note: If you disable that, the only information provided by the process will be the exit_status.
935              
936             =head2 autoflush
937              
938             Defaults to C<1>, If enabled autoflush of handlers is enabled automatically.
939              
940             =head2 error
941              
942             Returns a L<Mojo::Collection> of errors.
943             Note: errors can be captured only at the end of the process.
944              
945             =head1 METHODS
946              
947             L<Mojo::IOLoop::ReadWriteProcess> inherits all methods from L<Mojo::EventEmitter> and implements
948             the following new ones.
949              
950             =head2 start()
951              
952             use Mojo::IOLoop::ReadWriteProcess qw(process);
953             my $p = process(sub {
954             print STDERR "Boo\n"
955             } )->start;
956              
957             Starts the process
958              
959             =head2 stop()
960              
961             use Mojo::IOLoop::ReadWriteProcess qw(process);
962             my $p = process( execute => "/path/to/bin" )->start->stop;
963              
964             Stop the process. Unless you use C<blocking_stop>, it will attempt to kill the process
965             without waiting for the process to finish. By default it sends C<SIGTERM> to the child.
966             You can change that by defining the internal attribute C<_default_kill_signal>.
967             Note, if you want to be *sure* that the process gets killed, you can enable the
968             C<blocking_stop> attribute, which will attempt to send C<SIGKILL> after C<max_kill_attempts>
969             is reached.
970              
971             =head2 restart()
972              
973             use Mojo::IOLoop::ReadWriteProcess qw(process);
974             my $p = process( execute => "/path/to/bin" )->restart;
975              
976             It restarts the process if stopped, or if already running, it stops it first.
977              
978             =head2 is_running()
979              
980             use Mojo::IOLoop::ReadWriteProcess qw(process);
981             my $p = process( execute => "/path/to/bin" )->start;
982             $p->is_running;
983              
984             Boolean, it inspects whether the process is currently running or not.
985              
986             =head2 exit_status()
987              
988             use Mojo::IOLoop::ReadWriteProcess qw(process);
989             my $p = process( execute => "/path/to/bin" )->start;
990              
991             $p->wait_stop->exit_status;
992              
993             Inspect the process exit status, it does the shifting magic, to access to the real value
994             call C<_status()>.
995              
996             =head2 return_status()
997              
998             use Mojo::IOLoop::ReadWriteProcess qw(process);
999             my $p = process( sub { return 42 } )->start;
1000              
1001             my $s = $p->wait_stop->return_status; # 42
1002              
1003             Inspect the codeblock return.
1004              
1005             =head2 enable_subreaper()
1006              
1007             use Mojo::IOLoop::ReadWriteProcess qw(process);
1008             my $p = process()->enable_subreaper;
1009              
1010             Mark the current process (not the child) as subreaper.
1011             This is used typically if you want to mark further children as subreapers inside other forks.
1012              
1013             my $master_p = process(
1014             sub {
1015             my $p = shift;
1016             $p->enable_subreaper;
1017              
1018             process(sub { sleep 4; exit 1 })->start();
1019             process(
1020             sub {
1021             sleep 4;
1022             process(sub { sleep 1; })->start();
1023             })->start();
1024             process(sub { sleep 4; exit 0 })->start();
1025             process(sub { sleep 4; die })->start();
1026             my $manager
1027             = process(sub { sleep 2 })->subreaper(1)->start();
1028             sleep 1 for (0 .. 10);
1029             $manager->stop;
1030             return $manager->session->all->size;
1031             });
1032              
1033             $master_p->subreaper(1);
1034              
1035             $master_p->on(collected => sub { $status++ });
1036              
1037             # On start we set up the current process as subreaper,
1038             # so it's up to us to disable it after the process is done.
1039             $master_p->on(stop => sub { shift()->disable_subreaper });
1040             $master_p->start();
1041              
1042             =head2 disable_subreaper()
1043              
1044             use Mojo::IOLoop::ReadWriteProcess qw(process);
1045             my $p = process()->disable_subreaper;
1046              
1047             Unset the current process (not the child) as subreaper.
1048              
1049             =head2 prctl()
1050              
1051             use Mojo::IOLoop::ReadWriteProcess qw(process);
1052             my $p = process();
1053             $p->prctl($option, $arg2, $arg3, $arg4, $arg5);
1054              
1055             Internal function to execute and wrap the prctl syscall, accepts the same arguments as prctl.
1056              
1057             =head2 diag()
1058              
1059             use Mojo::IOLoop::ReadWriteProcess qw(process);
1060             my $p = process(sub { print "Hello\n" });
1061             $p->on( stop => sub { shift->diag("Done!") } );
1062             $p->start->wait_stop;
1063              
1064             Internal function to print information to STDERR if the verbose attribute is set or DEBUG mode is enabled.
1065             You can use it if you wish to display information on the process status.
1066              
1067             =head2 to_ioloop()
1068              
1069             use Mojo::IOLoop::ReadWriteProcess qw(process);
1070              
1071             my $p = process(sub { print "Hello from first process\n"; sleep 1 });
1072              
1073             $p->start(); # Start and sets the handlers
1074             my $stream = $p->to_ioloop; # Get the stream and demand to IOLoop
1075             my $output;
1076              
1077             # Hook on Mojo::IOLoop::Stream events
1078             $stream->on(read => sub { $output .= pop; $p->is_running ... });
1079              
1080             Mojo::IOLoop->singleton->start() unless Mojo::IOLoop->singleton->is_running;
1081              
1082             Returns a L<Mojo::IOLoop::Stream> object and delegates the wait operation to L<Mojo::IOLoop>.
1083             It needs C<set_pipes> enabled. The default IOLoop can be overridden in C<ioloop()>.
1084              
1085             =head2 wait()
1086              
1087             use Mojo::IOLoop::ReadWriteProcess qw(process);
1088             my $p = process(sub { print "Hello\n" })->wait;
1089             # ... here now you can mangle $p handlers and such
1090              
1091             Waits until the process finishes, but does not perform cleanup operations (until stop is called).
1092              
1093             =head2 wait_stop()
1094              
1095             use Mojo::IOLoop::ReadWriteProcess qw(process);
1096             my $p = process(sub { print "Hello\n" })->start->wait_stop;
1097             # $p is not running anymore, and all possible events are guaranteed to have been emitted.
1098              
1099             Waits until the process finishes, and performs cleanup operations.
1100              
1101             =head2 errored()
1102              
1103             use Mojo::IOLoop::ReadWriteProcess qw(process);
1104             my $p = process(sub { die "Nooo" })->start->wait_stop;
1105             $p->errored; # will return "1"
1106              
1107             Returns a boolean indicating if the process had errors or not.
1108              
1109             =head2 write_pidfile()
1110              
1111             use Mojo::IOLoop::ReadWriteProcess qw(process);
1112             my $p = process(sub { die "Nooo" } );
1113             $p->pidfile("foobar");
1114             $p->start();
1115             $p->write_pidfile();
1116              
1117             Forces writing the PID of the process to the pidfile specified in the attributes of the object.
1118             Useful only if the process has already been started; otherwise, if a pidfile is supplied
1119             as an attribute, it will be written automatically.
1120              
1121             =head2 write_stdin()
1122              
1123             use Mojo::IOLoop::ReadWriteProcess qw(process);
1124             my $p = process(sub { my $a = <STDIN>; print STDERR "Hello my name is $a\n"; } )->start;
1125             $p->write_stdin("Larry");
1126             $p->read_stderr; # process STDERR will contain: "Hello my name is Larry\n"
1127              
1128             Write data to process STDIN.
1129              
1130             =head2 write_channel()
1131              
1132             use Mojo::IOLoop::ReadWriteProcess qw(process);
1133             my $p = process(sub {
1134             my $self = shift;
1135             my $parent_output = $self->channel_out;
1136             my $parent_input = $self->channel_in;
1137              
1138             while(defined(my $line = <$parent_input>)) {
1139             print $parent_output "PONG\n" if $line =~ /PING/i;
1140             }
1141             } )->start;
1142             $p->write_channel("PING");
1143             my $out = $p->read_channel;
1144             # $out is PONG
1145             my $child_output = $p->channel_out;
1146             while(defined(my $line = <$child_output>)) {
1147             print "Process is replying back with $line!\n";
1148             $p->write_channel("PING");
1149             }
1150              
1151             Write data to process channel. Note, it's neither STDIN nor STDOUT, it's a completely separate channel
1152             dedicated to parent-child communication.
1153             In the parent process, you can access the same pipes (but from the opposite direction):
1154              
1155             my $child_output = $self->channel_out;
1156             my $child_input = $self->channel_in;
1157              
1158             =head2 read_stdout()
1159              
1160             use Mojo::IOLoop::ReadWriteProcess qw(process);
1161             my $p = process(sub {
1162             print "Boo\n"
1163             } )->start;
1164             $p->read_stdout;
1165              
1166             Gets a single line from process STDOUT.
1167              
1168             =head2 read_channel()
1169              
1170             use Mojo::IOLoop::ReadWriteProcess qw(process);
1171             my $p = process(sub {
1172             my $self = shift;
1173             my $parent_output = $self->channel_out;
1174             my $parent_input = $self->channel_in;
1175              
1176             print $parent_output "PONG\n";
1177             } )->start;
1178             $p->read_channel;
1179              
1180             Gets a single line from process channel.
1181              
1182             =head2 read_stderr()
1183              
1184             use Mojo::IOLoop::ReadWriteProcess qw(process);
1185             my $p = process(sub {
1186             print STDERR "Boo\n"
1187             } )->start;
1188             $p->read_stderr;
1189              
1190             Gets a single line from process STDERR.
1191              
1192             =head2 read_all_stdout()
1193              
1194             use Mojo::IOLoop::ReadWriteProcess qw(process);
1195             my $p = process(sub {
1196             print "Boo\n"
1197             } )->start;
1198             $p->read_all_stdout;
1199              
1200             Gets all the STDOUT output of the process.
1201              
1202             =head2 read_all_channel()
1203              
1204             use Mojo::IOLoop::ReadWriteProcess qw(process);
1205             my $p = process(sub {
1206             shift->channel_out->write("Ping")
1207             } )->start;
1208             $p->read_all_channel;
1209              
1210             Gets all the channel output of the process.
1211              
1212             =head2 read_all_stderr()
1213              
1214             use Mojo::IOLoop::ReadWriteProcess qw(process);
1215             my $p = process(sub {
1216             print STDERR "Boo\n"
1217             } )->start;
1218             $p->read_all_stderr;
1219              
1220             Gets all the STDERR output of the process.
1221              
1222             =head2 send_signal()
1223              
1224             use Mojo::IOLoop::ReadWriteProcess qw(process);
1225             use POSIX;
1226             my $p = process( execute => "/path/to/bin" )->start;
1227              
1228             $p->send_signal(POSIX::SIGKILL);
1229              
1230             Send a signal to the process
1231              
1232             =head1 EXPORTS
1233              
1234             =head2 parallel()
1235              
1236             use Mojo::IOLoop::ReadWriteProcess qw(parallel);
1237             my $pool = parallel sub { print "Hello\n" } => 5;
1238             $pool->start();
1239             $pool->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
1240             $pool->stop();
1241              
1242             Returns a L<Mojo::IOLoop::ReadWriteProcess::Pool> object that represents a group of processes.
1243              
1244             It accepts the same arguments as L<Mojo::IOLoop::ReadWriteProcess>, and the last one represents the number of processes to generate.
1245              
1246             =head2 batch()
1247              
1248             use Mojo::IOLoop::ReadWriteProcess qw(batch);
1249             my $pool = batch;
1250             $pool->add(sub { print "Hello\n" });
1251             $pool->on(stop => sub { shift->_diag("Done!") })->start->wait_stop;
1252              
1253             Returns a L<Mojo::IOLoop::ReadWriteProcess::Pool> object generated from supplied arguments.
1254             It accepts as input the same parameters as the L<Mojo::IOLoop::ReadWriteProcess::Pool> constructor ( see parallel() ).
1255              
1256             =head2 process()
1257              
1258             use Mojo::IOLoop::ReadWriteProcess qw(process);
1259             my $p = process sub { print "Hello\n" };
1260             $p->start()->wait_stop;
1261              
1262             or even:
1263              
1264             process(sub { print "Hello\n" })->start->wait_stop;
1265              
1266             Returns a L<Mojo::IOLoop::ReadWriteProcess> object that represents a process.
1267              
1268             It accepts the same arguments as L<Mojo::IOLoop::ReadWriteProcess>.
1269              
1270             =head2 queue()
1271              
1272             use Mojo::IOLoop::ReadWriteProcess qw(queue);
1273             my $q = queue;
1274             $q->add(sub { return 42 } );
1275             $q->consume;
1276              
1277             Returns a L<Mojo::IOLoop::ReadWriteProcess::Queue> object that represents a queue.
1278              
1279             =head1 DEBUGGING
1280              
1281             You can set the MOJO_EVENTEMITTER_DEBUG environment variable to get some advanced diagnostics information printed to STDERR.
1282              
1283             MOJO_EVENTEMITTER_DEBUG=1
1284              
1285             Also, you can set MOJO_PROCESS_DEBUG environment variable to get diagnostics about the process execution.
1286              
1287             MOJO_PROCESS_DEBUG=1
1288              
1289             =head1 LICENSE
1290              
1291             Copyright (C) Ettore Di Giacinto.
1292              
1293             This library is free software; you can redistribute it and/or modify
1294             it under the same terms as Perl itself.
1295              
1296             =head1 AUTHOR
1297              
1298             Ettore Di Giacinto E<lt>edigiacinto@suse.comE<gt>
1299              
1300             =cut