File Coverage

lib/Mojo/IOLoop/ReadWriteProcess.pm
Criterion Covered Total %
statement 371 379 97.8
branch 193 230 83.9
condition 63 112 56.2
subroutine 70 70 100.0
pod 26 28 92.8
total 723 819 88.2


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::ReadWriteProcess;
2              
3             our $VERSION = '0.34';
4              
5 39     39   11622332 use Mojo::Base 'Mojo::EventEmitter';
  39         422922  
  39         341  
  46         1016  
  108         941  
  108         287  
6 39     39   71891 use Mojo::File 'path';
  39         61882  
  39         1978  
  108         657  
  80         542  
  80         218  
7 39     39   243 use Mojo::Util qw(b64_decode b64_encode scope_guard);
  39         70  
  39         1843  
  76         278  
  104         553  
  108         764  
8 39     39   19953 use Mojo::IOLoop::Stream;
  39         5064951  
  39         320  
  108         1245  
  108         3869  
  108         728014  
9              
10 39     39   22914 use Mojo::IOLoop::ReadWriteProcess::Exception;
  39         139  
  39         422  
  80         1546  
  18         181  
  8         194  
11 39     39   18059 use Mojo::IOLoop::ReadWriteProcess::Pool;
  39         88  
  39         2131  
  8         81559  
  6         148  
  6         63  
12 39     39   17510 use Mojo::IOLoop::ReadWriteProcess::Queue;
  39         107  
  39         271  
  6         190  
  6         3035  
  6         244  
13 39     39   1637 use Mojo::IOLoop::ReadWriteProcess::Session;
  39         78  
  39         1435  
  6         333  
  2         38  
  2         32  
14              
15 39     39   17309 use Mojo::IOLoop::ReadWriteProcess::Shared::Lock;
  39         87  
  39         2562  
  2         47  
16 39     39   17532 use Mojo::IOLoop::ReadWriteProcess::Shared::Memory;
  39         113  
  39         1902  
17 39     39   333 use Mojo::IOLoop::ReadWriteProcess::Shared::Semaphore;
  39         96  
  39         1426  
18              
19 39     39   258 use B::Deparse;
  39         892  
  39         1437  
20 39     39   238 use Carp 'confess';
  39         76  
  39         2771  
21 39     39   266 use IO::Handle;
  39         77  
  39         1633  
22 39     39   20148 use IO::Pipe;
  39         48939  
  39         1415  
23 39     39   17933 use IO::Select;
  39         60790  
  39         2069  
24 39     39   20152 use IPC::Open3;
  39         111750  
  39         2357  
25 39     39   310 use Time::HiRes 'sleep';
  39         58  
  39         309  
26 39     39   4251 use Symbol 'gensym';
  39         84  
  39         1600  
27 39     39   27318 use Storable;
  39         130249  
  39         2556  
28 39     39   307 use POSIX qw( :sys_wait_h :signal_h );
  39         87  
  39         339  
29             our @EXPORT_OK
30             = (qw(parallel batch process pool queue), qw(shared_memory lock semaphore));
31 39     39   14081 use Exporter 'import';
  39         79  
  39         1539  
32              
33 39     39   224 use constant DEBUG => $ENV{MOJO_PROCESS_DEBUG};
  39         192  
  39         229340  
34              
35             has [
36             qw(kill_sleeptime sleeptime_during_kill),
37             qw(separate_err autoflush set_pipes verbose),
38             qw(internal_pipes channels)
39             ] => 1;
40              
41             has [qw(blocking_stop serialize quirkiness total_sleeptime_during_kill)] => 0;
42              
43             has [
44             qw(execute code process_id pidfile return_status),
45             qw(channel_in channel_out write_stream read_stream error_stream),
46             qw(_internal_err _internal_return _status args)
47             ];
48              
49             has max_kill_attempts => 5;
50             has kill_whole_group => 0;
51              
52             has error => sub { Mojo::Collection->new };
53              
54             has ioloop => sub { Mojo::IOLoop->singleton };
55             has session => sub { Mojo::IOLoop::ReadWriteProcess::Session->singleton };
56              
57             has _deparse => sub { B::Deparse->new }
58             if DEBUG;
59             has _deserialize => sub { \&Storable::thaw };
60             has _serialize => sub { \&Storable::freeze };
61             has _default_kill_signal => POSIX::SIGTERM;
62             has _default_blocking_signal => POSIX::SIGKILL;
63              
64             # Override new() just to support sugar syntax
65             # so it is possible to do : process->new(sub{ print "Hello World\n" })->start->stop; and so on.
66             sub new {
67 392 100   392 1 117189 push(@_, code => splice @_, 1, 1) if ref $_[1] eq "CODE";
    100     1    
68 392         4887 return shift->SUPER::new(@_);
69             }
70              
71             sub to_ioloop {
72 12     12 1 10352 my $self = shift;
73 12 100       653 confess 'Pipes needs to be set!' unless $self->read_stream;
    100          
74 2         14 my $stream = Mojo::IOLoop::Stream->new($self->read_stream)->timeout(0);
75 2         670 $self->ioloop->stream($stream);
76 2         504 my $me = $$;
77             $stream->on(
78             close => sub {
79 2 50   2   467 return unless $$ == $me;
    100          
80 2 50       15 $self->_collect->stop unless defined $self->_status;
    100          
81 2         43 });
82 2         11 return $stream;
83             }
84              
85 177     177 1 153179 sub process { __PACKAGE__->new(@_) }
86 6     6 1 6797 sub batch { Mojo::IOLoop::ReadWriteProcess::Pool->new(@_) }
87 6     6 1 15664 sub queue { Mojo::IOLoop::ReadWriteProcess::Queue->new(@_) }
88 1     1 1 19 sub lock { Mojo::IOLoop::ReadWriteProcess::Shared::Lock->new(@_) }
89 1     1 0 100 sub semaphore { Mojo::IOLoop::ReadWriteProcess::Shared::Semaphore->new(@_) }
90 1     1 0 3 sub shared_memory { Mojo::IOLoop::ReadWriteProcess::Shared::Memory->new(@_) }
91              
92             sub parallel {
93 3     3 1 1822 my $c = batch();
94 3         51 $c->add(@_) for 1 .. +pop();
95 3         29 return $c;
96             }
97              
98             sub _diag {
99 4     4   4478 my ($self, @messages) = @_;
100 4         82 my $caller = (caller(1))[3];
101 4 50       86 print STDERR ">> ${caller}(): @messages\n" if (DEBUG || $self->verbose);
102             }
103              
104             sub _open_collect_status {
105 65     65   1835 my ($self, $pid, $e, $errno) = @_;
106              
107 65 50       526 return unless $self;
    50          
108              
109 65 50 33     3512 $self->_status($e // $?) unless defined $self->_status;
    100 100        
110 24         498 $self->_diag("Forked code Process Exit status: " . $self->exit_status)
111             if DEBUG;
112              
113 24         178 $self->_clean_pidfile;
114              
115 24         183 return $self;
116             }
117              
118             # Use open3 to launch external program.
119             sub _open {
120 48     48   877 my ($self, @args) = @_;
121 48         128 $self->_diag('Execute: ' . (join ', ', map { "'$_'" } @args)) if DEBUG;
122              
123 48         551 $self->on(collect_status => \&_open_collect_status);
124              
125 48         821 my ($wtr, $rdr, $err);
126 48         368 $err = gensym;
127 48 100       1486 my $pid = open3($wtr, $rdr, ($self->separate_err) ? $err : undef, @args);
    100          
128              
129 42 100       265167 die "Cannot create pipe: $!" unless defined $pid;
130 41         924 $self->process_id($pid);
131              
132             # Defered collect of return status and removal of pidfile
133              
134 41 100       1365 return $self unless $self->set_pipes();
135              
136 38         1902 $self->read_stream(IO::Handle->new_from_fd($rdr, "r"));
137 38         10613 $self->write_stream(IO::Handle->new_from_fd($wtr, "w"));
138 38 100       5067 $self->error_stream(($self->separate_err)
139             ? IO::Handle->new_from_fd($err, "r")
140             : $self->write_stream);
141              
142 38         3988 return $self;
143             }
144              
145 488 100   488   2566 sub _clean_pidfile { unlink(shift->pidfile) if $_[0]->pidfile }
146              
147             sub _collect {
148 23     18   86 my ($self, $pid) = @_;
149 7   33     101 $pid //= $self->pid;
      66        
150              
151 6         86 $self->session->consume_collected_info;
152             $self->session->_protect(
153             sub {
154 6     1   132 local $?;
155 6 50       189 waitpid $pid, 0 unless defined $self->_status;
156 5 50       36340 return $self->_open_collect_status($pid) if $self->execute;
157 5 50       203 return $self->_fork_collect_status($pid) if $self->code;
158 6         84 });
159              
160 5         202 $self;
161             }
162              
163             sub _fork_collect_status {
164 244     240   13234 my ($self, $pid, $e, $errno) = @_;
165              
166 244 100       2195 return unless $self;
167              
168 243         2408 my $return_reader;
169             my $internal_err_reader;
170 243         713 my $rt;
171 239         77 my @result_error;
172              
173 239 50 100     1495 $self->_status($e // $?) unless defined $self->_status;
      100        
174 239         4266 $self->_diag("Forked code Process Exit status: " . $self->exit_status)
175             if DEBUG;
176              
177 239 100       1170 if ($self->_internal_return) {
178 184 50       2383 $return_reader
179             = $self->_internal_return->isa("IO::Pipe::End")
180             ? $self->_internal_return
181             : $self->_internal_return->reader();
182 184 50 0     28148 $self->_new_err('Cannot read from return code pipe') && return
      100        
183             unless IO::Select->new($return_reader)->can_read(10);
184 184         6094452 $rt = $return_reader->getline();
185 184         10164622 $self->_diag("Forked code Process Returns: " . ($rt ? $rt : 'nothing'))
186             if DEBUG;
187             $self->return_status(
188 184 100       1553 $self->serialize ? eval { $self->_deserialize->(b64_decode($rt)) }
  2 100       102  
189             : $rt ? $rt
190             : ());
191             }
192 239 100       6421 if ($self->_internal_err) {
193 185 100       1543 $internal_err_reader
194             = $self->_internal_err->isa("IO::Pipe::End")
195             ? $self->_internal_err
196             : $self->_internal_err->reader();
197 185 100 50     23261 $self->_new_err('Cannot read from errors code pipe') && return
      100        
198             unless IO::Select->new($internal_err_reader)->can_read(10);
199 183         35612 @result_error = $internal_err_reader->getlines();
200             push(
201 60         640 @{$self->error},
202 183 100       15133 map { Mojo::IOLoop::ReadWriteProcess::Exception->new($_) } @result_error
  60         3575  
203             ) if @result_error;
204 183         448 $self->_diag("Forked code Process Errors: " . join("\n", @result_error))
205             if DEBUG;
206             }
207              
208 237         2069 $self->_clean_pidfile;
209 237         2142 return $self;
210             }
211              
212             # Handle forking of code
213             sub _fork {
214 226     227   2254 my ($self, $code, @args) = @_;
215 226 100       1055 die "Can't spawn child without code" unless ref($code) eq "CODE";
216              
217             # STDIN/STDOUT/STDERR redirect.
218 223         738 my ($input_pipe, $output_pipe, $output_err_pipe);
219              
220             # Separated handles that could be used for internal comunication.
221 223         0 my ($channel_in, $channel_out);
222              
223              
224 223 100       843 if ($self->set_pipes) {
225 173 100       2659 $input_pipe = IO::Pipe->new()
226             or $self->_new_err('Failed creating input pipe');
227 173 100       28375 $output_pipe = IO::Pipe->new()
228             or $self->_new_err('Failed creating output pipe');
229 173 100       18967 $output_err_pipe = IO::Pipe->new()
230             or $self->_new_err('Failed creating output error pipe');
231 173 100       16831 if ($self->channels) {
232 173 100       2744 $channel_in = IO::Pipe->new()
233             or $self->_new_err('Failed creating Channel input pipe');
234 173 100       20667 $channel_out = IO::Pipe->new()
235             or $self->_new_err('Failed creating Channel output pipe');
236             }
237             }
238 223 100       17864 if ($self->internal_pipes) {
239 219 100       4082 my $internal_err = IO::Pipe->new()
240             or $self->_new_err('Failed creating internal error pipe');
241 214 100       27041 my $internal_return = IO::Pipe->new()
242             or $self->_new_err('Failed creating internal return pipe');
243              
244             # Internal pipes to retrieve error/return
245 219         22639 $self->_internal_err($internal_err);
246 219         3007 $self->_internal_return($internal_return);
247             }
248              
249             # Defered collect of return status
250              
251 223         3675 $self->on(collect_status => \&_fork_collect_status);
252              
253 223         4226 $self->_diag("Fork: " . $self->_deparse->coderef2text($code)) if DEBUG;
254              
255 223         422809 my $pid = fork;
256 218 100       8194 die "Cannot fork: $!" unless defined $pid;
257              
258 218 100       3131 if ($pid == 0) {
259 25         3444 local $SIG{CHLD};
260 25     6   3285 local $SIG{TERM} = sub { $self->emit('SIG_TERM')->_exit(1) };
  5         45  
261              
262 20         739 my $return;
263             my $internal_err;
264              
265 20 50       1556 if ($self->internal_pipes) {
266 20 100       1491 if ($self->_internal_err) {
267 19 50       794 $internal_err
268             = $self->_internal_err->isa("IO::Pipe::End")
269             ? $self->_internal_err
270             : $self->_internal_err->writer();
271 19         6545 $internal_err->autoflush(1);
272             }
273              
274 20 100       4784 if ($self->_internal_return) {
275 19 50       745 $return
276             = $self->_internal_return->isa("IO::Pipe::End")
277             ? $self->_internal_return
278             : $self->_internal_return->writer();
279 19         2747 $return->autoflush(1);
280             }
281             else {
282 1         28 eval { $internal_err->write("Can't setup return status pipe") };
  1         44  
283             }
284             }
285              
286             # Set pipes to redirect STDIN/STDOUT/STDERR + channels if desired
287 20 100       1446 if ($self->set_pipes()) {
288 19         696 my $stdout;
289             my $stderr;
290 19         0 my $stdin;
291              
292 19 100       684 $stdout = $output_pipe->writer() if $output_pipe;
293 19 100       2022 $stderr
    100          
294             = (!$self->separate_err) ? $stdout
295             : $output_err_pipe ? $output_err_pipe->writer()
296             : undef;
297 19 100       2870 $stdin = $input_pipe->reader() if $input_pipe;
298 19 100 33     4208 open STDERR, ">&", $stderr
      33        
      66        
299             or !!$internal_err->write($!)
300             or $self->_diag($!)
301             if $stderr;
302 19 100 33     1200 open STDOUT, ">&", $stdout
      33        
303             or !!$internal_err->write($!)
304             or $self->_diag($!)
305             if $stdout;
306 19 100 33     933 open STDIN, ">&", $stdin
      33        
307             or !!$internal_err->write($!)
308             or $self->_diag($!)
309             if $stdin;
310              
311 19         368 $self->read_stream($stdin);
312 19         531 $self->error_stream($stderr);
313 19         478 $self->write_stream($stdout);
314 19 50       420 if ($self->channels) {
315              
316 19 100       530 $self->channel_in($channel_in->reader) if $channel_in;
317 19 100       2522 $self->channel_out($channel_out->writer) if $channel_out;
318 38         1718 eval { $self->$_->autoflush($self->autoflush) }
319 19         1861 for qw( channel_in channel_out );
320             }
321 57         1938 eval { $self->$_->autoflush($self->autoflush) }
322 19         997 for qw(read_stream error_stream write_stream );
323             }
324 20         1170 $self->session->reset;
325 20         9379 $self->session->subreaper(0); # Subreaper bit does not persist in fork
326 20         3857 $self->process_id($$);
327 20         1491 $! = 0;
328 20         113 my $rt;
329 20         170 eval { $rt = [$code->($self, @args)]; };
  20         401  
330 0 0       0 if ($internal_err) {
331 0 0       0 $internal_err->write($@) if $@;
332 0 0 0     0 $internal_err->write($!) if !$@ && $!;
333             }
334 0 0 0     0 $rt = @$rt[0]
      0        
335             if !$self->serialize && ref $rt eq 'ARRAY' && scalar @$rt == 1;
336 0 0 0     0 $rt = b64_encode(eval { $self->_serialize->($rt) })
  0         0  
337             if $self->serialize && $return;
338 0 0       0 $return->write($rt) if $return;
339 0   0     0 $self->_exit($@ // $!);
340             }
341 198         15854 $self->process_id($pid);
342              
343 198 100       10606 return $self unless $self->set_pipes();
344              
345 149 100       9320 $self->read_stream($output_pipe->reader) if $output_pipe;
346 149 100       45807 $self->error_stream((!$self->separate_err) ? $self->read_stream()
    100          
347             : $output_err_pipe ? $output_err_pipe->reader()
348             : undef);
349 149 100       20175 $self->write_stream($input_pipe->writer) if $input_pipe;
350              
351 149 50       16302 if ($self->set_pipes) {
352 149 50       2909 if ($self->channels) {
353 149 100       2100 $self->channel_in($channel_in->writer) if $channel_in;
354 149 100       15622 $self->channel_out($channel_out->reader) if $channel_out;
355 298         38368 eval { $self->$_->autoflush($self->autoflush) }
356 149         14136 for qw( channel_in channel_out );
357             }
358 447         13347 eval { $self->$_->autoflush($self->autoflush) }
359 149         8121 for qw(read_stream error_stream write_stream );
360             }
361              
362 149         9016 return $self;
363             }
364              
365             sub _new_err {
366 30     26   315 my $self = shift;
367 30         277 my $err = Mojo::IOLoop::ReadWriteProcess::Exception->new(@_);
368 30         73 push(@{$self->error}, $err);
  30         234  
369              
370             # XXX: Need to switch, we should emit one error at the time, and _shutdown
371             # should emit just the ones wasn't emitted
372 30         399 return $self->emit(process_error => [$err]);
373             }
374              
375             sub _exit {
376 5   0 1   98 my $code = shift // 0;
377 5         797 eval { POSIX::_exit($code); };
  5         540  
378 5         573 exit($code);
379             }
380              
381             sub wait {
382 109     105 1 7753 my $self = shift;
383 109         1087 sleep $self->sleeptime_during_kill while ($self->is_running);
384 109         2864 return $self;
385             }
386              
387 105     101 1 11571 sub wait_stop { shift->wait->stop }
388 35 100   19 1 16939 sub errored { !!@{shift->error} ? 1 : 0 }
  28         963  
389              
390             # PPC64: Treat msb on neg (different cpu/perl interpreter version)
391 7 50   2   166 sub _st { my $st = shift >> 8; ($st & 0x80) ? (0x100 - ($st & 0xFF)) : $st }
  7         189  
392              
393             sub exit_status {
394 42 100 66 37 1 23478 defined $_[0]->_status && $_[0]->quirkiness ? _st(shift->_status)
    100          
395             : defined $_[0]->_status ? shift->_status >> 8
396             : undef;
397             }
398              
399             sub restart {
400 40 100   34 1 121512 $_[0]->is_running ? $_[0]->stop->start : $_[0]->start;
401             }
402              
403             sub is_running {
404 1445     1440 1 67316951 my ($self) = shift;
405 1445         6010 $self->session->consume_collected_info;
406 1440 100       13432 $self->process_id ? kill 0 => $self->process_id : 0;
407             }
408              
409             sub write_pidfile {
410 260     260 1 2412 my ($self, $pidfile) = @_;
411 260 100       1196 $self->pidfile($pidfile) if $pidfile;
412 260 100       1744 return unless $self->pid;
413 223 100       2860 return unless $self->pidfile;
414              
415 4         146 path($self->pidfile)->spew($self->pid);
416 4         1600 return $self;
417             }
418              
419             # Convenience functions
420             sub _syswrite {
421 40     40   364 my $stream = shift;
422 40 100       147 return unless $stream;
423 40         567 $stream->syswrite($_ . "\n") for @_;
424             }
425              
426             sub _getline {
427 87 100   57   1350 return unless IO::Select->new($_[0])->can_read(10);
428 55         34019 shift->getline;
429             }
430              
431             sub _getlines {
432 41 100   32   1714 return unless IO::Select->new($_[0])->can_read(10);
433 29 100       23169 wantarray ? shift->getlines : join '', @{[shift->getlines]};
  4         153  
434             }
435              
436             # Write to the controlled-process STDIN
437             sub write_stdin {
438 21     21 1 9701 my ($self, @data) = @_;
439 21         1742 _syswrite($self->write_stream, @data);
440 21         1009 return $self;
441             }
442              
443             sub write_channel {
444 2     2 1 86 my ($self, @data) = @_;
445 2         48 _syswrite($self->channel_in, @data);
446 2         64 return $self;
447             }
448              
449             # Get all lines from the current process output stream
450 8     8 1 2989 sub read_all_stdout { _getlines(shift->read_stream) }
451              
452             # Get all lines from the process channel
453 3     3 1 2473 sub read_all_channel { _getlines(shift->channel_out); }
454 60     60 1 15542 sub read_stdout { _getline(shift->read_stream) }
455 2     2 1 1360 sub read_channel { _getline(shift->channel_out) }
456              
457             sub read_all_stderr {
458 2 100   2 1 64 return $_[0]->getline unless $_[0]->separate_err;
459 1         29 _getlines(shift->error_stream);
460             }
461              
462             # Get a line from the current process output stream
463             sub read_stderr {
464 9 50   9 1 15153 return $_[0]->getline unless $_[0]->separate_err;
465 9         79 _getline(shift->error_stream);
466             }
467              
468             sub start {
469 247     247 1 15217 my $self = shift;
470 247 50       980 return $self if $self->is_running;
471 242 100 100     2752 die "Nothing to do" unless !!$self->execute || !!$self->code;
472              
473             my @args
474             = defined($self->args)
475             ? ref($self->args) eq "ARRAY"
476 240 100       4847 ? @{$self->args}
  7 100       82  
477             : $self->args
478             : ();
479              
480 240 100       3203 $self->session->enable_subreaper if $self->subreaper;
481 240         3465 $self->_status(undef);
482 240         2870 $self->session->enable;
483              
484             {
485 240         1663 my $old_emit_from_sigchld = $self->session->emit_from_sigchld;
  240         916  
486 240         2297 $self->session->emit_from_sigchld(0);
487             my $scope_guard = scope_guard sub {
488 229     258   9027190 $self->session->emit_from_sigchld($old_emit_from_sigchld);
489 229 100       5658 $self->session->consume_collected_info if ($old_emit_from_sigchld);
490 240         7510 };
491              
492 240 100       10394 if ($self->code) {
    50          
493 206         1449 $self->_fork($self->code, @args);
494             }
495             elsif ($self->execute) {
496 34         658 $self->_open($self->execute, @args);
497             }
498              
499 219         7821 $self->write_pidfile;
500 224         9947 $self->emit('start');
501 224         12943 $self->session->register($self->pid() => $self);
502             }
503 224         15586 return $self;
504             }
505              
506             sub send_signal {
507 195     161 1 4071 my $self = shift;
508 195   100     1883 my $signal = shift // $self->_default_kill_signal;
509 195   66     1645 my $pid = shift // $self->process_id;
510 194 100 66     1166 return unless $self->kill_whole_group || $self->is_running;
511 149         3331 $self->_diag("Sending signal '$signal' to $pid") if DEBUG;
512 175         6343 kill $signal => $pid;
513 175         1484 return $self;
514             }
515              
516             sub stop {
517 326     288 1 48056 my $self = shift;
518              
519 321         1764 my $pid = $self->pid;
520 331 100       3256 return $self unless defined $pid;
521 319 100       1876 return $self->_shutdown(1) unless $self->is_running;
522 133         3221 $self->_diag("Stopping $pid") if DEBUG;
523              
524 133         4536 my $ret;
525 133         1083 my $attempt = 1;
526 61   33     1509 my $timeout = $self->total_sleeptime_during_kill // 0;
527 50         589 my $sleep_time = $self->sleeptime_during_kill;
528 44         454 my $max_attempts = $self->max_kill_attempts;
529 61         1029 my $signal = $self->_default_kill_signal;
530 61 100       1316 $pid = -getpgrp($pid) if $self->kill_whole_group;
531 61   66     1949 until ((defined $ret && ($ret == $pid || $ret == -1))
      100        
      66        
      100        
532             || ($attempt > $max_attempts && $timeout <= 0))
533             {
534 176   66     3448 my $send_signal = $attempt == 1 || $timeout <= 0;
535 148         241 $self->_diag(
536             "attempt $attempt/$max_attempts to kill process: $pid, timeout: $timeout")
537             if DEBUG && $send_signal;
538             $self->session->_protect(
539             sub {
540 148     178   992 local $?;
541 148 100       409 if ($send_signal) {
542 148         522 $self->send_signal($signal, $pid);
543 148         684 ++$attempt;
544             }
545 148         995 $ret = waitpid($pid, WNOHANG);
546 148 100 100     1673 $self->_status($?) if $ret == $pid || $ret == -1;
547 148         835 });
548 148 100       971 if ($sleep_time) {
549 148         1338291 sleep $sleep_time;
550 148         2846 $timeout -= $sleep_time;
551             }
552             }
553 33 100       326 return $self->_shutdown if defined $self->_status;
554              
555 14 50       346 sleep $self->kill_sleeptime if $self->kill_sleeptime;
556              
557 14 100       142838 if ($self->blocking_stop) {
558 10         270 $self->_diag("Could not kill process id: $pid, blocking attempt") if DEBUG;
559 10         116 $self->emit('process_stuck');
560              
561             ### XXX: avoid to protect on blocking.
562 10         334 $self->send_signal($self->_default_blocking_signal, $pid);
563 10         5020 $ret = waitpid($pid, 0);
564 10 50 33     422 $self->_status($?) if $ret == $pid || $ret == -1;
565 10         556 return $self->_shutdown;
566             }
567             else {
568 4         76 $self->_diag("Could not kill process id: $pid") if DEBUG;
569 4         64 $self->_new_err('Could not kill process');
570             }
571 4         94 return $self;
572             }
573              
574             sub _shutdown {
575 230     200   34890 my ($self, $wait) = @_;
576 230 50       889 return $self unless $self->pid;
577              
578 230         1494 $self->_diag("Shutdown " . $self->pid) if DEBUG;
579             $self->session->_protect(
580             sub {
581 73     83   695 local $?;
582 73         363 waitpid $self->pid, 0;
583 73         1102 $self->emit('collect_status');
584 230 100 100     1958 }) if $wait && !defined $self->_status;
585              
586 218 100       1815 $self->emit('collect_status') unless defined $self->_status;
587 218         2441 $self->_clean_pidfile;
588 218 100 100     1905 $self->emit('process_error', $self->error)
589             if $self->error && $self->error->size > 0;
590 218         11673 $self->unsubscribe('collect_status');
591              
592 218         4243 return $self->emit('stop');
593             }
594              
595             # General alias
596             *pid = \&process_id;
597             *died = \&_errored;
598             *failed = \&_errored;
599             *diag = \&_diag;
600             *pool = \&batch;
601             *signal = \&send_signal;
602             *prctl = \&Mojo::IOLoop::ReadWriteProcess::Session::_prctl;
603             *subreaper = \&Mojo::IOLoop::ReadWriteProcess::Session::subreaper;
604              
605             *enable_subreaper = \&Mojo::IOLoop::ReadWriteProcess::Session::enable_subreaper;
606             *disable_subreaper
607             = \&Mojo::IOLoop::ReadWriteProcess::Session::disable_subreaper;
608             *_get_prctl_syscall
609             = \&Mojo::IOLoop::ReadWriteProcess::Session::_get_prctl_syscall;
610              
611             # Aliases - write
612             *write = \&write_stdin;
613             *stdin = \&write_stdin;
614             *channel_write = \&write_channel;
615              
616             # Aliases - read
617             *read = \&read_stdout;
618             *stdout = \&read_stdout;
619             *getline = \&read_stdout;
620             *stderr = \&read_stderr;
621             *err_getline = \&read_stderr;
622             *channel_read = \&read_channel;
623             *read_all = \&read_all_stdout;
624             *getlines = \&read_all_stdout;
625             *stderr_all = \&read_all_stderr;
626             *err_getlines = \&read_all_stderr;
627             *channel_read_all = \&read_all_channel;
628              
629             # Aliases - IO::Handle
630             *stdin_handle = \&write_stream;
631             *stdout_handle = \&read_stream;
632             *stderr_handle = \&error_stream;
633             *channe_write_handle = \&channel_in;
634             *channel_read_handle = \&channel_out;
635              
636             1;
637              
638              
639             =encoding utf-8
640              
641             =head1 NAME
642              
643             Mojo::IOLoop::ReadWriteProcess - Execute external programs or internal code blocks as separate process.
644              
645             =head1 SYNOPSIS
646              
647             use Mojo::IOLoop::ReadWriteProcess;
648              
649             # Code fork
650             my $process = Mojo::IOLoop::ReadWriteProcess->new(sub { print "Hello\n" });
651             $process->start();
652             print "Running\n" if $process->is_running();
653             $process->getline(); # Will return "Hello\n"
654             $process->pid(); # Process id
655             $process->stop();
656             $process->wait_stop(); # if you intend to wait its lifespan
657              
658             # Methods can be chained, thus this is valid:
659             use Mojo::IOLoop::ReadWriteProcess qw(process);
660             my $output = process( sub { print "Hello\n" } )->start()->wait_stop->getline;
661              
662             # Handles seamelessy also external processes:
663             my $process = process(execute=> '/path/to/bin' )->args([qw(foo bar baz)]);
664             $process->start();
665             my $line_output = $process->getline();
666             my $pid = $process->pid();
667             $process->stop();
668             my @errors = $process->error;
669              
670             # Get process return value
671             $process = process( sub { return "256"; } )->start()->wait_stop;
672             # We need to stop it to retrieve the exit status
673             my $return = $process->return_status;
674              
675             # We can access directly to handlers from the object:
676             my $stdout = $process->read_stream;
677             my $stdin = $process->write_stream;
678             my $stderr = $process->error_stream;
679              
680             # So this works:
681             print $stdin "foo bar\n";
682             my @lines = <$stdout>;
683              
684             # There is also an alternative channel of communication (just for forked processes):
685             my $channel_in = $process->channel_in; # write to the child process
686             my $channel_out = $process->channel_out; # read from the child process
687             $process->channel_write("PING"); # convenience function
688              
689             =head1 DESCRIPTION
690              
691             Mojo::IOLoop::ReadWriteProcess is yet another process manager.
692              
693             =head1 EVENTS
694              
695             L inherits all events from L and can emit
696             the following new ones.
697              
698             =head2 start
699              
700             $process->on(start => sub {
701             my ($process) = @_;
702             $process->is_running();
703             });
704              
705             Emitted when the process starts.
706              
707             =head2 stop
708              
709             $process->on(stop => sub {
710             my ($process) = @_;
711             $process->restart();
712             });
713              
714             Emitted when the process stops.
715              
716             =head2 process_error
717              
718             $process->on(process_error => sub {
719             my ($e) = @_;
720             my @errors = @{$e};
721             });
722              
723             Emitted when the process produce errors.
724              
725             =head2 process_stuck
726              
727             $process->on(process_stuck => sub {
728             my ($self) = @_;
729             ...
730             });
731              
732             Emitted when C is set and all attempts for killing the process
733             in C have been exhausted.
734             The event is emitted before attempting to kill it with SIGKILL and becoming blocking.
735              
736             =head2 SIG_CHLD
737              
738             $process->on(SIG_CHLD => sub {
739             my ($self) = @_;
740             ...
741             });
742              
743             Emitted when we receive SIG_CHLD.
744              
745             =head2 SIG_TERM
746              
747             $process->on(SIG_TERM => sub {
748             my ($self) = @_;
749             ...
750             });
751              
752             Emitted when the child forked process receives SIG_TERM, before exiting.
753              
754             =head2 collected
755              
756             $process->on(collected => sub {
757             my ($self) = @_;
758             ...
759             });
760              
761             Emitted right after status collection.
762              
763             =head2 collect_status
764              
765             $process->on(collect_status => sub {
766             my ($self) = @_;
767             ...
768             });
769              
770             Emitted when on child process waitpid.
771             It is used internally to get the child process status.
772             Note: events attached to it are wiped when process has been stopped.
773              
774             =head1 ATTRIBUTES
775              
776             L inherits all attributes from L and implements
777             the following new ones.
778              
779             =head2 execute
780              
781             use Mojo::IOLoop::ReadWriteProcess;
782             my $process = Mojo::IOLoop::ReadWriteProcess->new(execute => "/usr/bin/perl");
783             $process->start();
784             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
785             $process->stop();
786              
787             C should contain the external program that you wish to run.
788              
789             =head2 code
790              
791             use Mojo::IOLoop::ReadWriteProcess;
792             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello" } );
793             $process->start();
794             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
795             $process->stop();
796              
797             It represent the code you want to run in background.
798              
799             You do not need to specify C, it is implied if no arguments is given.
800              
801             my $process = Mojo::IOLoop::ReadWriteProcess->new(sub { print "Hello" });
802             $process->start();
803             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
804             $process->stop();
805              
806             =head2 args
807              
808             use Mojo::IOLoop::ReadWriteProcess;
809             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello ".$_[1] }, args => "User" );
810             $process->start();
811             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
812             $process->stop();
813              
814             # The process will print "Hello User"
815              
816             Arguments pass to the external binary or the code block. Use arrayref to pass many.
817              
818             =head2 blocking_stop
819              
820             use Mojo::IOLoop::ReadWriteProcess;
821             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello" }, blocking_stop => 1 );
822             $process->start();
823             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
824             $process->stop(); # Will wait indefinitely until the process is stopped
825              
826             Set it to 1 if you want to do blocking stop of the process.
827              
828              
829             =head2 channels
830              
831             use Mojo::IOLoop::ReadWriteProcess;
832             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello" }, channels => 0 );
833             $process->start();
834             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
835             $process->stop(); # Will wait indefinitely until the process is stopped
836              
837             Set it to 0 if you want to disable internal channels.
838              
839             =head2 session
840              
841             use Mojo::IOLoop::ReadWriteProcess;
842             my $process = Mojo::IOLoop::ReadWriteProcess->new(sub { print "Hello" });
843             my $session = $process->session;
844             $session->enable_subreaper;
845              
846             Returns the current L singleton.
847              
848             =head2 subreaper
849              
850             use Mojo::IOLoop::ReadWriteProcess;
851             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello ".$_[1] }, args => "User" );
852             $process->subreaper(1)->start();
853             $process->on( stop => sub { shift()->disable_subreaper } );
854             $process->stop();
855              
856             # The process will print "Hello User"
857              
858             Mark the current process (not the child) as subreaper on start.
859             It's on invoker behalf to disable subreaper when process stops, as it marks the current process and not the
860             child.
861              
862             =head2 ioloop
863              
864             my $loop = $process->ioloop;
865             $subprocess = $process->ioloop(Mojo::IOLoop->new);
866              
867             Event loop object to control, defaults to the global L singleton.
868              
869             =head2 max_kill_attempts
870              
871             use Mojo::IOLoop::ReadWriteProcess;
872             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello" }, max_kill_attempts => 50 );
873             $process->start();
874             $process->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
875             $process->stop(); # It will attempt to send SIGTERM 50 times.
876              
877             Defaults to C<5>, is the number of attempts before bailing out.
878              
879             It can be used with blocking_stop, so if the number of attempts are exhausted,
880             a SIGKILL and waitpid will be tried at the end.
881              
882             =head2 kill_whole_group
883              
884             use Mojo::IOLoop::ReadWriteProcess;
885             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { setpgrp(0, 0); exec(...); }, kill_whole_group => 1 );
886             $process->start();
887             $process->send_signal(...); # Will skip the usual check whether $process->pid is running
888             $process->stop(); # Kills the entire process group and waits for all processes in the group to finish
889              
890             Defaults to C<0>, whether to send signals (e.g. to stop) to the entire process group.
891              
892             This is useful when the sub process creates further sub processes and creates a new process
893             group as shown in the example. In this case it might be useful to take care of the entire process
894             group when stopping and wait for every process in the group to finish.
895              
896             =head2 collect_status
897              
898             Defaults to C<1>, If enabled it will automatically collect the status of the children process.
899             Disable it in case you want to manage your process child directly, and do not want to rely on
900             automatic collect status. If you won't overwrite your C handler,
901             the C event will be still emitted.
902              
903             =head2 serialize
904              
905             Defaults to C<0>, If enabled data returned from forked process will be serialized with Storable.
906              
907             =head2 kill_sleeptime
908              
909             Defaults to C<1>, it's the seconds to wait before attempting SIGKILL when blocking_stop is set to 1.
910              
911             =head2 separate_err
912              
913             Defaults to C<1>, it will create a separate channel to intercept process STDERR,
914             otherwise it will be redirected to STDOUT.
915              
916             =head2 verbose
917              
918             Defaults to C<1>, it indicates message verbosity.
919              
920             =head2 set_pipes
921              
922             Defaults to C<1>, If enabled, additional pipes for process communication are automatically set up.
923              
924              
925             =head2 internal_pipes
926              
927             Defaults to C<1>, If enabled, additional pipes for retreiving process return and errors are set up.
928             Note: If you disable that, the only information provided by the process will be the exit_status.
929              
930             =head2 autoflush
931              
932             Defaults to C<1>, If enabled autoflush of handlers is enabled automatically.
933              
934             =head2 error
935              
936             Returns a L of errors.
937             Note: errors that can be captured only at the end of the process
938              
939             =head1 METHODS
940              
941             L inherits all methods from L and implements
942             the following new ones.
943              
944             =head2 start()
945              
946             use Mojo::IOLoop::ReadWriteProcess qw(process);
947             my $p = process(sub {
948             print STDERR "Boo\n"
949             } )->start;
950              
951             Starts the process
952              
953             =head2 stop()
954              
955             use Mojo::IOLoop::ReadWriteProcess qw(process);
956             my $p = process( execute => "/path/to/bin" )->start->stop;
957              
958             Stop the process. Unless you use C, it will attempt to kill the process
959             without waiting the process to finish. By defaults it send C to the child.
960             You can change that by defining the internal attribute C<_default_kill_signal>.
961             Note, if you want to be *sure* that the process gets killed, you can enable the
962             C attribute, that will attempt to send C after C
963             is reached.
964              
965             =head2 restart()
966              
967             use Mojo::IOLoop::ReadWriteProcess qw(process);
968             my $p = process( execute => "/path/to/bin" )->restart;
969              
970             It restarts the process if stopped, or if already running, it stops it first.
971              
972             =head2 is_running()
973              
974             use Mojo::IOLoop::ReadWriteProcess qw(process);
975             my $p = process( execute => "/path/to/bin" )->start;
976             $p->is_running;
977              
978             Boolean, it inspect if the process is currently running or not.
979              
980             =head2 exit_status()
981              
982             use Mojo::IOLoop::ReadWriteProcess qw(process);
983             my $p = process( execute => "/path/to/bin" )->start;
984              
985             $p->wait_stop->exit_status;
986              
987             Inspect the process exit status, it does the shifting magic, to access to the real value
988             call C<_status()>.
989              
990             =head2 return_status()
991              
992             use Mojo::IOLoop::ReadWriteProcess qw(process);
993             my $p = process( sub { return 42 } )->start;
994              
995             my $s = $p->wait_stop->return_status; # 42
996              
997             Inspect the codeblock return.
998              
999             =head2 enable_subreaper()
1000              
1001             use Mojo::IOLoop::ReadWriteProcess qw(process);
1002             my $p = process()->enable_subreaper;
1003              
1004             Mark the current process (not the child) as subreaper.
1005             This is used typically if you want to mark further children as subreapers inside other forks.
1006              
1007             my $master_p = process(
1008             sub {
1009             my $p = shift;
1010             $p->enable_subreaper;
1011              
1012             process(sub { sleep 4; exit 1 })->start();
1013             process(
1014             sub {
1015             sleep 4;
1016             process(sub { sleep 1; })->start();
1017             })->start();
1018             process(sub { sleep 4; exit 0 })->start();
1019             process(sub { sleep 4; die })->start();
1020             my $manager
1021             = process(sub { sleep 2 })->subreaper(1)->start();
1022             sleep 1 for (0 .. 10);
1023             $manager->stop;
1024             return $manager->session->all->size;
1025             });
1026              
1027             $master_p->subreaper(1);
1028              
1029             $master_p->on(collected => sub { $status++ });
1030              
1031             # On start we setup the current process as subreaper
1032             # So it's up on us to disable it after process is done.
1033             $master_p->on(stop => sub { shift()->disable_subreaper });
1034             $master_p->start();
1035              
1036             =head2 disable_subreaper()
1037              
1038             use Mojo::IOLoop::ReadWriteProcess qw(process);
1039             my $p = process()->disable_subreaper;
1040              
1041             Unset the current process (not the child) as subreaper.
1042              
1043             =head2 prctl()
1044              
1045             use Mojo::IOLoop::ReadWriteProcess qw(process);
1046             my $p = process();
1047             $p->prctl($option, $arg2, $arg3, $arg4, $arg5);
1048              
1049             Internal function to execute and wrap the prctl syscall, accepts the same arguments as prctl.
1050              
1051             =head2 diag()
1052              
1053             use Mojo::IOLoop::ReadWriteProcess qw(process);
1054             my $p = process(sub { print "Hello\n" });
1055             $p->on( stop => sub { shift->diag("Done!") } );
1056             $p->start->wait_stop;
1057              
1058             Internal function to print information to STDERR if verbose attribute is set or either DEBUG mode enabled.
1059             You can use it if you wish to display information on the process status.
1060              
1061             =head2 to_ioloop()
1062              
1063             use Mojo::IOLoop::ReadWriteProcess qw(process);
1064              
1065             my $p = process(sub { print "Hello from first process\n"; sleep 1 });
1066              
1067             $p->start(); # Start and sets the handlers
1068             my $stream = $p->to_ioloop; # Get the stream and demand to IOLoop
1069             my $output;
1070              
1071             # Hook on Mojo::IOLoop::Stream events
1072             $stream->on(read => sub { $output .= pop; $p->is_running ... });
1073              
1074             Mojo::IOLoop->singleton->start() unless Mojo::IOLoop->singleton->is_running;
1075              
1076             Returns a L object and demand the wait operation to L.
1077             It needs C enabled. Default IOLoop can be overridden in C.
1078              
1079             =head2 wait()
1080              
1081             use Mojo::IOLoop::ReadWriteProcess qw(process);
1082             my $p = process(sub { print "Hello\n" })->wait;
1083             # ... here now you can mangle $p handlers and such
1084              
1085             Waits until the process finishes, but does not performs cleanup operations (until stop is called).
1086              
1087             =head2 wait_stop()
1088              
1089             use Mojo::IOLoop::ReadWriteProcess qw(process);
1090             my $p = process(sub { print "Hello\n" })->start->wait_stop;
1091             # $p is not running anymore, and all possible events have been granted to be emitted.
1092              
1093             Waits until the process finishes, and perform cleanup operations.
1094              
1095             =head2 errored()
1096              
1097             use Mojo::IOLoop::ReadWriteProcess qw(process);
1098             my $p = process(sub { die "Nooo" })->start->wait_stop;
1099             $p->errored; # will return "1"
1100              
1101             Returns a boolean indicating if the process had errors or not.
1102              
1103             =head2 write_pidfile()
1104              
1105             use Mojo::IOLoop::ReadWriteProcess qw(process);
1106             my $p = process(sub { die "Nooo" } );
1107             $p->pidfile("foobar");
1108             $p->start();
1109             $p->write_pidfile();
1110              
1111             Forces writing PID of process to specified pidfile in the attributes of the object.
1112             Useful only if the process have been already started, otherwise if a pidfile it's supplied
1113             as attribute, it will be done automatically.
1114              
1115             =head2 write_stdin()
1116              
1117             use Mojo::IOLoop::ReadWriteProcess qw(process);
1118             my $p = process(sub { my $a = ; print STDERR "Hello my name is $a\n"; } )->start;
1119             $p->write_stdin("Larry");
1120             $p->read_stderr; # process STDERR will contain: "Hello my name is Larry\n"
1121              
1122             Write data to process STDIN.
1123              
1124             =head2 write_channel()
1125              
1126             use Mojo::IOLoop::ReadWriteProcess qw(process);
1127             my $p = process(sub {
1128             my $self = shift;
1129             my $parent_output = $self->channel_out;
1130             my $parent_input = $self->channel_in;
1131              
1132             while(defined(my $line = <$parent_input>)) {
1133             print $parent_output "PONG\n" if $line =~ /PING/i;
1134             }
1135             } )->start;
1136             $p->write_channel("PING");
1137             my $out = $p->read_channel;
1138             # $out is PONG
1139             my $child_output = $p->channel_out;
1140             while(defined(my $line = <$child_output>)) {
1141             print "Process is replying back with $line!\n";
1142             $p->write_channel("PING");
1143             }
1144              
1145             Write data to process channel. Note, it's not STDIN, neither STDOUT, it's a complete separate channel
1146             dedicated to parent-child communication.
1147             In the parent process, you can access to the same pipes (but from the opposite direction):
1148              
1149             my $child_output = $self->channel_out;
1150             my $child_input = $self->channel_in;
1151              
1152             =head2 read_stdout()
1153              
1154             use Mojo::IOLoop::ReadWriteProcess qw(process);
1155             my $p = process(sub {
1156             print "Boo\n"
1157             } )->start;
1158             $p->read_stdout;
1159              
1160             Gets a single line from process STDOUT.
1161              
1162             =head2 read_channel()
1163              
1164             use Mojo::IOLoop::ReadWriteProcess qw(process);
1165             my $p = process(sub {
1166             my $self = shift;
1167             my $parent_output = $self->channel_out;
1168             my $parent_input = $self->channel_in;
1169              
1170             print $parent_output "PONG\n";
1171             } )->start;
1172             $p->read_channel;
1173              
1174             Gets a single line from process channel.
1175              
1176             =head2 read_stderr()
1177              
1178             use Mojo::IOLoop::ReadWriteProcess qw(process);
1179             my $p = process(sub {
1180             print STDERR "Boo\n"
1181             } )->start;
1182             $p->read_stderr;
1183              
1184             Gets a single line from process STDERR.
1185              
1186             =head2 read_all_stdout()
1187              
1188             use Mojo::IOLoop::ReadWriteProcess qw(process);
1189             my $p = process(sub {
1190             print "Boo\n"
1191             } )->start;
1192             $p->read_all_stdout;
1193              
1194             Gets all the STDOUT output of the process.
1195              
1196             =head2 read_all_channel()
1197              
1198             use Mojo::IOLoop::ReadWriteProcess qw(process);
1199             my $p = process(sub {
1200             shift->channel_out->write("Ping")
1201             } )->start;
1202             $p->read_all_channel;
1203              
1204             Gets all the channel output of the process.
1205              
1206             =head2 read_all_stderr()
1207              
1208             use Mojo::IOLoop::ReadWriteProcess qw(process);
1209             my $p = process(sub {
1210             print STDERR "Boo\n"
1211             } )->start;
1212             $p->read_all_stderr;
1213              
1214             Gets all the STDERR output of the process.
1215              
1216             =head2 send_signal()
1217              
1218             use Mojo::IOLoop::ReadWriteProcess qw(process);
1219             use POSIX;
1220             my $p = process( execute => "/path/to/bin" )->start;
1221              
1222             $p->send_signal(POSIX::SIGKILL);
1223              
1224             Send a signal to the process
1225              
1226             =head1 EXPORTS
1227              
1228             =head2 parallel()
1229              
1230             use Mojo::IOLoop::ReadWriteProcess qw(parallel);
1231             my $pool = parallel sub { print "Hello\n" } => 5;
1232             $pool->start();
1233             $pool->on( stop => sub { print "Process: ".(+shift()->pid)." finished"; } );
1234             $pool->stop();
1235              
1236             Returns a L object that represent a group of processes.
1237              
1238             It accepts the same arguments as L, and the last one represent the number of processes to generate.
1239              
1240             =head2 batch()
1241              
1242             use Mojo::IOLoop::ReadWriteProcess qw(batch);
1243             my $pool = batch;
1244             $pool->add(sub { print "Hello\n" });
1245             $pool->on(stop => sub { shift->_diag("Done!") })->start->wait_stop;
1246              
1247             Returns a L object generated from supplied arguments.
1248             It accepts as input the same parameter of L constructor ( see parallel() ).
1249              
1250             =head2 process()
1251              
1252             use Mojo::IOLoop::ReadWriteProcess qw(process);
1253             my $p = process sub { print "Hello\n" };
1254             $p->start()->wait_stop;
1255              
1256             or even:
1257              
1258             process(sub { print "Hello\n" })->start->wait_stop;
1259              
1260             Returns a L object that represent a process.
1261              
1262             It accepts the same arguments as L.
1263              
1264             =head2 queue()
1265              
1266             use Mojo::IOLoop::ReadWriteProcess qw(queue);
1267             my $q = queue;
1268             $q->add(sub { return 42 } );
1269             $q->consume;
1270              
1271             Returns a L object that represent a queue.
1272              
1273             =head1 DEBUGGING
1274              
1275             You can set the MOJO_EVENTEMITTER_DEBUG environment variable to get some advanced diagnostics information printed to STDERR.
1276              
1277             MOJO_EVENTEMITTER_DEBUG=1
1278              
1279             Also, you can set MOJO_PROCESS_DEBUG environment variable to get diagnostics about the process execution.
1280              
1281             MOJO_PROCESS_DEBUG=1
1282              
1283             =head1 LICENSE
1284              
1285             Copyright (C) Ettore Di Giacinto.
1286              
1287             This library is free software; you can redistribute it and/or modify
1288             it under the same terms as Perl itself.
1289              
1290             =head1 AUTHOR
1291              
1292             Ettore Di Giacinto Eedigiacinto@suse.comE
1293              
1294             =cut