File Coverage

blib/lib/AnyEvent/Proc.pm
Criterion Covered Total %
statement 315 498 63.2
branch 62 124 50.0
condition 13 50 26.0
subroutine 78 148 52.7
pod 36 36 100.0
total 504 856 58.8


line stmt bran cond sub pod time code
1 32     32   2104870 use strict;
  32         65  
  32         1118  
2 32     32   157 use warnings;
  32         42  
  32         1079  
3              
4             package AnyEvent::Proc;
5              
6             # ABSTRACT: Run external commands
7              
8 32     32   11582 use AnyEvent;
  32         50552  
  32         631  
9 32     32   23199 use AnyEvent::Handle;
  32         593929  
  32         1213  
10 32     32   325 use AnyEvent::Util ();
  32         56  
  32         470  
11 32     32   21142 use Try::Tiny;
  32         41750  
  32         1803  
12 32     32   16356 use Class::Load;
  32         716520  
  32         1574  
13 32     32   219 use Exporter qw(import);
  32         38  
  32         735  
14 32     32   118 use Carp;
  32         45  
  32         1483  
15 32     32   16840 use POSIX;
  32         153089  
  32         140  
16              
17             our $VERSION = '0.104'; # VERSION
18              
19             our @EXPORT_OK = qw(run run_cb reader writer);
20              
21             sub _rpipe {
22 78     78   3528 my ( $R, $W ) = AnyEvent::Util::portable_pipe;
23             (
24             $R,
25             AnyEvent::Handle->new(
26             fh => $W,
27             on_error => sub {
28 0     0   0 my ( $handle, $fatal, $message ) = @_;
29 0         0 AE::log warn => "error writing to handle: $message";
30             },
31 78         3029 ),
32             );
33             }
34              
35             sub _wpipe {
36 150     150   6327 my ( $R, $W ) = AnyEvent::Util::portable_pipe;
37             (
38             AnyEvent::Handle->new(
39             fh => $R,
40             on_error => sub {
41 0     0   0 my ( $handle, $fatal, $message ) = @_;
42 0         0 AE::log warn => "error reading from handle: $message";
43             },
44 150         3523 ),
45             $W,
46             );
47             }
48              
49             sub _on_read_helper {
50 59     59   137 my ( $aeh, $sub ) = @_;
51             $aeh->on_read(
52             sub {
53 39     39   19339938 my $x = $_[0]->rbuf;
54 39         462 $_[0]->rbuf = '';
55 39         267 $sub->($x);
56             }
57 59         964 );
58             }
59              
60             sub _read_on_scalar {
61 1     1   4 my ( $var, $sub ) = @_;
62 1   50     7 my $old = $$var || '';
63 1         39 tie $$var, __PACKAGE__ . '::TiedScalar', $sub;
64 1         16 $$var = $old;
65 1         30 $var;
66             }
67              
68             sub _reaper {
69 0     0   0 my ( $self, $waiters ) = @_;
70             my $sub = sub {
71              
72             # my $message = shift; # currently unused
73 0     0   0 foreach my $waiter (@$waiters) {
74 0         0 AE::log debug => "reap $waiter";
75 0 0       0 if ( ref $waiter eq 'CODE' ) {
    0          
    0          
76 0         0 $waiter->(undef);
77             }
78             elsif ( ref $waiter eq 'AnyEvent::CondVar' ) {
79 0         0 $waiter->send(undef);
80             }
81             elsif ( ref $waiter eq 'Coro::Channel' ) {
82 0         0 $waiter->shutdown;
83             }
84             else {
85 0         0 AE::log note => "cannot reap $waiter";
86             }
87             }
88 0         0 };
89 0         0 push @{ $self->{reapers} } => $sub;
  0         0  
90 0         0 $sub;
91             }
92              
93             sub _push_waiter {
94 0     0   0 my ( $self, $what, $var ) = @_;
95 0         0 push @{ $self->{waiters}->{$what} } => $var;
  0         0  
96             }
97              
98             sub _run_cmd {
99 69     69   119 my ( $cmd, $redir, $pidref ) = @_;
100              
101 69         2114 my $cv = AE::cv;
102              
103 69         3839 my %redir = %$redir;
104              
105 69         70422 my $pid = fork;
106 69 50       3755 AE::log error => "cannot fork: $!" unless defined $pid;
107              
108 69 100       878 unless ($pid) {
109              
110             # move any existing fd's out of the way
111             # this also ensures that dup2 is never called with fd1==fd2
112             # so the cloexec flag is always cleared
113 22         673 my ( @oldfh, @close );
114 22         802 for my $fh ( values %redir ) {
115 68         527 push @oldfh, $fh; # make sure we keep it open
116 68         557 $fh = fileno $fh; # we only want the fd
117              
118             # dup if we are in the way
119             # if we "leak" fds here, they will be dup2'ed over later
120 68   0     1202 defined( $fh = POSIX::dup($fh) )
121             or POSIX::_exit(124)
122             while exists $redir{$fh};
123             }
124              
125             # execute redirects
126 22         928 while ( my ( $k, $v ) = each %redir ) {
127 68 50       1749 defined POSIX::dup2( $v, $k )
128             or POSIX::_exit(123);
129             }
130              
131 22         894 AnyEvent::Util::close_all_fds_except( keys %redir );
132              
133 22         1003068 my $bin = $cmd->[0];
134              
135 32     32   83065 no warnings; ## no critic
  32         63  
  32         63135  
136              
137 22         64 exec {$bin} @$cmd;
  22         0  
138              
139 0         0 POSIX::_exit(126);
140             }
141              
142 47         512 $$pidref = $pid;
143              
144 47         367 my $w;
145             $w = AE::child $pid => sub {
146 47     47   3240278 my $status = $_[1] >> 8;
147 47         147 my $signal = $_[1] & 127;
148 47         126 my $coredump = $_[1] & 128;
149 47 100       334 AE::log info => "child exited with status $status" if $status;
150 47 100       907 AE::log debug => "child exited with signal $signal" if $signal;
151 47 50       296 AE::log note => "child exited with coredump" if $coredump;
152 47         136 undef $w;
153 47         352 map { close $_ } values %redir;
  144         1467  
154 47         702 $cv->send($status);
155 47         11017 };
156              
157 47         5245 $cv;
158             }
159              
160             sub new {
161 69     69 1 15574 my ( $class, %options ) = @_;
162              
163 69   100     524 $options{args} ||= [];
164              
165 69         302 my ( $rIN, $wIN ) = _rpipe;
166 69         5954 my ( $rOUT, $wOUT ) = _wpipe;
167 69         3186 my ( $rERR, $wERR ) = _wpipe;
168              
169 69 100       2801 my @xhs = @{ delete( $options{extras} ) || [] };
  69         592  
170              
171 69         131 my @args = map { "$_" } @{ delete $options{args} };
  23         103  
  69         213  
172              
173 69         127 my $pid;
174              
175 5         13 my %redir = (
176             0 => $rIN,
177             1 => $wOUT,
178             2 => $wERR,
179 69         376 map { ( "$_" => $_->B ) } @xhs
180             );
181              
182 69         419 my $cv = _run_cmd( [ delete $options{bin} => @args ], \%redir, \$pid );
183 47         2589 my $waiter = AE::cv;
184              
185 3         61 my $self = bless {
186             handles => {
187             in => $wIN,
188             out => $rOUT,
189             err => $rERR,
190 3         14 map { ( "$_" => $_->A ) } @xhs,
191             },
192             pid => $pid,
193             listeners => {
194             exit => delete $options{on_exit},
195             ttl_exceed => delete $options{on_ttl_exceed},
196             },
197             eol => "\n",
198             cv => $cv,
199             alive => 1,
200             waiter => $waiter,
201             waiters => {
202             in => [],
203             out => [],
204             err => [],
205 47   33     4710 map { ( "$_" => [] ) } @xhs
206             },
207             reapers => [],
208             } => ref $class
209             || $class;
210              
211 47         160 map { $_->{proc} = $self } @xhs;
  3         25  
212              
213             {
214 47         93 my $eol = quotemeta $self->_eol;
  47         735  
215 47   33     1932 $self->{reol} = delete $options{reol} || qr{$eol};
216             }
217              
218 47 100       206 if ( $options{ttl} ) {
219             $self->{timer} = AnyEvent->timer(
220             after => delete $options{ttl},
221             cb => sub {
222 1 50   1   992175 return unless $self->alive;
223 1         5 $self->kill;
224 1         5 $self->_emit('ttl_exceed');
225             }
226 37         1600 );
227             }
228              
229 47     10   1858 my $kill = sub { $self->end };
  10         55  
230              
231 47 100       201 if ( $options{timeout} ) {
232 4         108 $wIN->timeout( $options{timeout} );
233 4         392 $rOUT->timeout( $options{timeout} );
234 4         144 $rERR->timeout( $options{timeout} );
235 4         80 delete $options{timeout};
236              
237 4   33     48 $self->_on( timeout => ( delete( $options{on_timeout} ) || $kill ) );
238 4     4   8 my $cb = sub { $self->_emit('timeout') };
  4         3968604  
239 4         40 $wIN->on_timeout($cb);
240 4         36 $rOUT->on_timeout($cb);
241 4         12 $rERR->on_timeout($cb);
242             }
243              
244 47 100       249 if ( $options{wtimeout} ) {
245 3         69 $wIN->wtimeout( delete $options{wtimeout} );
246              
247 3   33     300 $self->_on( wtimeout => ( delete( $options{on_wtimeout} ) || $kill ) );
248 3     3   39 my $cb = sub { $self->_emit('wtimeout') };
  3         2991423  
249 3         24 $wIN->on_wtimeout($cb);
250             }
251              
252 47 100       244 if ( $options{rtimeout} ) {
253 2         90 $rOUT->rtimeout( delete $options{rtimeout} );
254              
255 2   33     248 $self->_on( rtimeout => ( delete( $options{on_rtimeout} ) || $kill ) );
256 2     2   10 my $cb = sub { $self->_emit('rtimeout') };
  2         1994320  
257 2         14 $rOUT->on_rtimeout($cb);
258             }
259              
260 47 100       213 if ( $options{etimeout} ) {
261 1         24 $rERR->rtimeout( delete $options{etimeout} );
262              
263 1   33     138 $self->_on( etimeout => ( delete( $options{on_etimeout} ) || $kill ) );
264 1     1   16 my $cb = sub { $self->_emit('etimeout') };
  1         996201  
265 1         13 $rERR->on_rtimeout($cb);
266             }
267              
268 47 100       185 if ( $options{errstr} ) {
269 16         152 my $sref = delete $options{errstr};
270 16         121 $$sref = '';
271 16         340 $self->pipe( err => $sref );
272             }
273              
274 47 100       869 if ( $options{outstr} ) {
275 26         134 my $sref = delete $options{outstr};
276 26         136 $$sref = '';
277 26         308 $self->pipe( out => $sref );
278             }
279              
280 47         776 $waiter->begin;
281             $cv->cb(
282             sub {
283 47     47   1224 $self->{status} = shift->recv;
284 47         691 $self->{alive} = 0;
285 47         185 undef $self->{timer};
286 47         807 $waiter->end;
287 47         1043 $self->_emit( exit => $self->{status} );
288             }
289 47         1441 );
290              
291 47 50       798 if ( keys %options ) {
292 0         0 AE::log note => "unknown left-over option(s): " . join ', ' =>
293             keys %options;
294             }
295              
296 47         936 $self;
297             }
298              
299             sub reader {
300 3     3 1 2733 my ( $r, $w ) = _wpipe;
301 3         288 bless {
302             r => $r,
303             w => $w,
304             fileno => fileno( $r->fh )
305             } => __PACKAGE__
306             . '::R';
307             }
308              
309             sub writer {
310 2     2 1 12 my ( $r, $w ) = _rpipe;
311 2         204 bless {
312             r => $r,
313             w => $w,
314             fileno => fileno( $w->fh )
315             } => __PACKAGE__
316             . '::W';
317             }
318              
319             sub run {
320 12     12 1 12560 my $cv = AE::cv;
321             run_cb(
322             @_,
323             sub {
324 9     9   62 $cv->send( \@_ );
325             }
326 12         1653 )->recv;
327 9         813 my ( $out, $err, $status ) = @{ $cv->recv };
  9         42  
328 9         102 $? = $status << 8;
329 9 100       33 if (wantarray) {
330 3         42 return ( $out, $err );
331             }
332             else {
333 6 50       26 carp $err if $err;
334 6         38 return $out;
335             }
336             }
337              
338             sub run_cb {
339 14     14 1 3422 my $bin = shift;
340 14         20 my $cb = pop;
341 14         43 my @args = @_;
342 14         38 my ( $out, $err ) = ( '', '' );
343 14         127 my $proc = __PACKAGE__->new(
344             bin => $bin,
345             args => \@args,
346             outstr => \$out,
347             errstr => \$err
348             );
349 10         90 $proc->finish;
350             $proc->wait(
351             sub {
352 10     10   35 my $status = $proc->{status};
353 10         80 $? = $status << 8;
354 10         52 $cb->( $out, $err, $status );
355             }
356 10         152 );
357             }
358              
359             sub _on {
360 10     10   33 my ( $self, $name, $handler ) = @_;
361 10         50 $self->{listeners}->{$name} = $handler;
362             }
363              
364 35     35 1 118 sub in { shift->_geth('in') }
365              
366 0     0 1 0 sub out { shift->_geth('out') }
367              
368 0     0 1 0 sub err { shift->_geth('err') }
369              
370             sub _geth {
371 105     105   907 shift->{handles}->{ pop() };
372             }
373              
374 56     56   1093 sub _eol { shift->{eol} }
375 0     0   0 sub _reol { shift->{reol} }
376              
377             sub _emit {
378 58     58   212 my ( $self, $name, @args ) = @_;
379 58         448 AE::log debug => "trapped $name";
380 58 100 66     5612 if ( exists $self->{listeners}->{$name}
381             and defined $self->{listeners}->{$name} )
382             {
383 15         89 $self->{listeners}->{$name}->( $self, @args );
384             }
385             }
386              
387             sub pid {
388 3     3 1 104 shift->{pid};
389             }
390              
391             sub fire {
392 3     3 1 28 my ( $self, $signal ) = @_;
393 3 100       15 $signal = 'TERM' unless defined $signal;
394 3         14 $signal =~ s{^sig}{}i;
395 3         23 AE::log debug => "fire SIG$signal";
396 3         5499 kill uc $signal => $self->pid;
397             }
398              
399             sub kill {
400 1     1 1 2 my ($self) = @_;
401 1         4 $self->fire('kill');
402             }
403              
404             sub fire_and_kill {
405 0     0 1 0 my $self = shift;
406 0 0       0 my $cb = ( ref $_[-1] eq 'CODE' ? pop : undef );
407 0         0 my $time = pop;
408 0   0     0 my $signal = uc( pop || 'TERM' );
409             my $w = AnyEvent->timer(
410             after => $time,
411             cb => sub {
412 0 0   0   0 return unless $self->alive;
413 0         0 $self->kill;
414             }
415 0         0 );
416 0         0 $self->fire($signal);
417 0 0       0 if ($cb) {
418             return $self->wait(
419             sub {
420 0     0   0 undef $w;
421 0         0 $cb->(@_);
422             }
423 0         0 );
424             }
425             else {
426 0         0 my $exit = $self->wait;
427 0         0 undef $w;
428 0         0 return $exit;
429             }
430             }
431              
432             sub alive {
433 1     1 1 3 my $self = shift;
434 1 50       9 return 0 unless $self->{alive};
435 1 50       9 $self->fire(0) ? 1 : 0;
436             }
437              
438             sub wait {
439 47     47 1 11755 my ( $self, $cb ) = @_;
440              
441             my $next = sub {
442 47     47   357 my $cv = shift;
443 47         397 $cv->recv;
444 47         507 waitpid $self->{pid} => 0;
445 47 100       478 $cb->( $self->{status} ) if ref $cb eq 'CODE';
446 47         1603 $self->end;
447 47         615 $self->{status};
448 47         406 };
449 47         560 AE::log debug => "waiting for "
450             . ( $self->{waiter}->{_ae_counter} ) . " ends";
451 47 100       37294 if ($cb) {
452 11         128 $self->{waiter}->cb($next);
453 11         383 return $self->{waiter};
454             }
455             else {
456 36         456 $self->{waiter}->recv;
457 36         1618 return $next->( $self->{waiter} );
458             }
459             }
460              
461             sub finish {
462 35     35 1 479 my ($self) = @_;
463 35         143 $self->in->destroy;
464 35         2597 $self;
465             }
466              
467             sub end {
468 57     57 1 128 my ($self) = @_;
469 57         115 map { $_->destroy } values %{ $self->{handles} };
  174         4413  
  57         459  
470 57         881 map { $_->() } @{ $self->{reapers} };
  0         0  
  57         498  
471 57         158 $self;
472             }
473              
474             sub stop_timeout {
475 0     0 1 0 my ($self) = @_;
476 0         0 $self->in->timeout(0);
477 0         0 $self->out->timeout(0);
478 0         0 $self->err->timeout(0);
479             }
480              
481             sub stop_wtimeout {
482 0     0 1 0 my ($self) = @_;
483 0         0 $self->in->wtimeout(0);
484             }
485              
486             sub stop_rtimeout {
487 0     0 1 0 my ($self) = @_;
488 0         0 $self->out->rtimeout(0);
489             }
490              
491             sub stop_etimeout {
492 0     0 1 0 my ($self) = @_;
493 0         0 $self->err->rtimeout(0);
494             }
495              
496             sub write {
497 20     20 1 80 my ( $self, $type, @args ) = @_;
498 20         31 my $ok = 0;
499             try {
500 20     20   2113 $self->_geth('in')->push_write( $type => @args );
501 20         1290 $ok = 1;
502             }
503             catch {
504 0     0   0 AE::log warn => $_;
505 20         576 };
506 20         517 $ok;
507             }
508              
509             sub writeln {
510 8     8 1 144 my ( $self, @lines ) = @_;
511 8         40 $self->write( $_ . $self->_eol ) for @lines;
512 8         21 $self;
513             }
514              
515             sub pipe {
516 50     50 1 697 my $self = shift;
517 50         87 my $peer = pop;
518 50   100     190 my $what = ( pop || 'out' );
519 50 100       257 if ( ref $what ) {
520 2         4 $what = "$what";
521             }
522             else {
523 48         170 $what = lc $what;
524 48         309 $what =~ s{^std}{};
525             }
526 32     32   186 use Scalar::Util qw(blessed);
  32         50  
  32         11811  
527 50         225 my $sub;
528 50 100       510 if ( blessed $peer) {
    100          
    50          
    0          
529 3 50       250 if ( $peer->isa(__PACKAGE__) ) {
    100          
    50          
    50          
530             $sub = sub {
531 0     0   0 $peer->write(shift);
532             }
533 0         0 }
534             elsif ( $peer->isa('AnyEvent::Handle') ) {
535             $sub = sub {
536 2     2   20 $peer->push_write(shift);
537             }
538 2         20 }
539             elsif ( $peer->isa('Coro::Channel') ) {
540             $sub = sub {
541 0     0   0 $peer->put(shift);
542             }
543 0         0 }
544             elsif ( $peer->can('print') ) {
545             $sub = sub {
546 1     1   12 $peer->print(shift);
547             }
548 1         15 }
549             }
550             elsif ( ref $peer eq 'SCALAR' ) {
551             $sub = sub {
552 24     24   163 $$peer .= shift;
553             }
554 44         419 }
555             elsif ( ref $peer eq 'GLOB' ) {
556             $sub = sub {
557 3     3   51 print $peer shift();
558             }
559 3         21 }
560             elsif ( ref $peer eq 'CODE' ) {
561 0         0 $sub = $peer;
562             }
563 50 50       157 if ($sub) {
564 50         802 AE::log debug => "pipe $peer from $what";
565 50         76871 my $aeh = $self->_geth($what);
566             $aeh->on_eof(
567             sub {
568 50     50   3543 AE::log debug => "eof: $what";
569 50         2820 $self->{waiter}->end;
570             }
571 50         1010 );
572 50         704 $self->{output}->{$what} = _on_read_helper( $aeh, $sub );
573 50         4446 $self->{waiter}->begin;
574             }
575             else {
576 0         0 AE::log fatal => "cannot pipe $peer from $what";
577             }
578             }
579              
580             sub pull {
581 15     15 1 1089 my ( $self, $peer ) = @_;
582 15         98 $self->{input} = $peer;
583 15         138 AE::log debug => "pull $peer to stdin";
584 32     32   269 use Scalar::Util qw(blessed);
  32         47  
  32         47133  
585 15         835 my $sub;
586 15 100       193 if ( blessed $peer) {
    100          
    50          
587 11 50       290 if ( $peer->isa(__PACKAGE__) ) {
    100          
    50          
    0          
588 0         0 return $peer->pipe($self);
589             }
590             elsif ( $peer->isa('AnyEvent::Handle') ) {
591             $peer->on_eof(
592             sub {
593 9     9   893 AE::log debug => "pull($peer)->on_eof";
594 9         497 $self->finish;
595             }
596 9         103 );
597             $peer->on_error(
598             sub {
599 0     0   0 AE::log error => "pull($peer)->on_error(" . $_[2] . ")";
600 0         0 shift->destroy;
601             }
602 9         130 );
603             return _on_read_helper(
604             $peer,
605             sub {
606 9     9   49 AE::log debug => "pull($peer)->on_read";
607 9         493 $self->write( shift() );
608             }
609 9         216 );
610             }
611             elsif ( $peer->isa('IO::Handle') ) {
612 2         52 return $self->pull( AnyEvent::Handle->new( fh => $peer ) );
613             }
614             elsif ( $peer->isa('Coro::Channel') ) {
615 0 0       0 if ( my $class = load_class('Coro') ) {
616             return $class->new(
617             sub {
618 0     0   0 while ( my $x = $peer->get ) {
619 0 0       0 $self->write($x) or last;
620 0         0 Coro::cede();
621             }
622 0         0 $self->finish;
623             }
624 0         0 );
625             }
626             }
627             }
628             elsif ( ref $peer eq 'SCALAR' ) {
629             return _read_on_scalar(
630             $peer,
631             sub {
632 3     3   26 AE::log debug => "pull($peer)->STORE";
633 3         153 $self->write( shift() );
634             }
635 1         35 );
636             }
637             elsif ( ref $peer eq 'GLOB' ) {
638 3         57 return $self->pull( AnyEvent::Handle->new( fh => $peer ) );
639             }
640 0         0 AE::log fatal => "cannot pull $peer to stdin";
641             }
642              
643             sub _push_read {
644 0     0   0 my ( $self, $what, @args ) = @_;
645 0         0 my $ok = 0;
646             try {
647 0     0   0 $self->_geth($what)->push_read(@args);
648 0         0 $ok = 1;
649             }
650             catch {
651 0     0   0 AE::log warn => "cannot push_read from std$what: $_";
652 0         0 };
653 0         0 $ok;
654             }
655              
656             sub _unshift_read {
657 0     0   0 my ( $self, $what, @args ) = @_;
658 0         0 my $ok = 0;
659             try {
660 0     0   0 $self->_geth($what)->unshift_read(@args);
661 0         0 $ok = 1;
662             }
663             catch {
664 0     0   0 AE::log warn => "cannot unshift_read from std$what: $_";
665 0         0 };
666 0         0 $ok;
667             }
668              
669             sub _readline {
670 0     0   0 my ( $self, $what, $sub ) = @_;
671 0         0 $self->_push_read( $what => line => $self->_reol, $sub );
672             }
673              
674             sub _readchunk {
675 0     0   0 my ( $self, $what, $bytes, $sub ) = @_;
676 0         0 $self->_push_read( $what => chunk => $bytes => $sub );
677             }
678              
679             sub _sub_cb {
680 0     0   0 my ($cb) = @_;
681 0     0   0 sub { $cb->( $_[1] ) }
682 0         0 }
683              
684             sub _sub_cv {
685 0     0   0 my ($cv) = @_;
686 0     0   0 sub { $cv->send( $_[1] ) }
687 0         0 }
688              
689             sub _sub_ch {
690 0     0   0 my ($ch) = @_;
691 0     0   0 sub { $ch->put( $_[1] ) }
692 0         0 }
693              
694             sub _readline_cb {
695 0     0   0 my ( $self, $what, $cb ) = @_;
696 0         0 $self->_push_waiter( $what => $cb );
697 0         0 $self->_readline( $what => _sub_cb($cb) );
698             }
699              
700             sub _readline_cv {
701 0     0   0 my ( $self, $what, $cv ) = @_;
702 0   0     0 $cv ||= AE::cv;
703 0         0 $self->_push_waiter( $what => $cv );
704 0 0       0 $cv->send unless $self->_readline( $what => _sub_cv($cv) );
705 0         0 $cv;
706             }
707              
708             sub _readline_ch {
709 0     0   0 my ( $self, $what, $channel ) = @_;
710 0 0       0 unless ($channel) {
711 0 0       0 if ( my $class = load_class('Coro::Channel') ) {
712 0   0     0 $channel ||= $class->new;
713             }
714             }
715 0         0 $self->_push_waiter( $what => $channel );
716 0 0       0 $channel->shutdown unless $self->_readline( $what => _sub_ch($channel) );
717 0         0 $channel;
718             }
719              
720             sub _readlines_cb {
721 0     0   0 my ( $self, $what, $cb ) = @_;
722 0         0 $self->_push_waiter( $what => $cb );
723             $self->_geth($what)->on_read(
724             sub {
725 0     0   0 $self->_readline( $what => _sub_cb($cb) );
726             }
727 0         0 );
728             }
729              
730             sub _readlines_ch {
731 0     0   0 my ( $self, $what, $channel ) = @_;
732 0 0       0 unless ($channel) {
733 0 0       0 if ( my $class = load_class('Coro::Channel') ) {
734 0   0     0 $channel ||= $class->new;
735             }
736             }
737 0         0 $self->_push_waiter( $what => $channel );
738             $channel->shutdown unless $self->_geth($what)->on_read(
739             sub {
740 0     0   0 $self->_readline( $what => _sub_ch($channel) );
741             }
742 0 0       0 );
743 0         0 $channel;
744             }
745              
746             sub _readchunk_cb {
747 0     0   0 my ( $self, $what, $bytes, $cb ) = @_;
748 0         0 $self->_push_waiter( $what => $cb );
749 0         0 $self->_readchunk( $what, $bytes, _sub_cb($cb) );
750             }
751              
752             sub _readchunk_cv {
753 0     0   0 my ( $self, $what, $bytes, $cv ) = @_;
754 0   0     0 $cv ||= AE::cv;
755 0         0 $self->_push_waiter( $what => $cv );
756 0         0 $self->_readchunk( $what, $bytes, _sub_cv($cv) );
757 0         0 $cv;
758             }
759              
760             sub _readchunk_ch {
761 0     0   0 my ( $self, $what, $bytes, $channel ) = @_;
762 0 0       0 unless ($channel) {
763 0 0       0 if ( my $class = load_class('Coro::Channel') ) {
764 0   0     0 $channel ||= $class->new;
765             }
766             }
767 0         0 $self->_push_waiter( $what => $channel );
768 0 0       0 $channel->shutdown unless $self->_readline( $what => _sub_ch($channel) );
769 0         0 $channel;
770             }
771              
772             sub _readchunks_ch {
773 0     0   0 my ( $self, $what, $bytes, $channel ) = @_;
774 0 0       0 unless ($channel) {
775 0 0       0 if ( my $class = load_class('Coro::Channel') ) {
776 0   0     0 $channel ||= $class->new;
777             }
778             }
779 0         0 $self->_push_waiter( $what => $channel );
780             $channel->shutdown unless $self->_geth($what)->on_read(
781             sub {
782 0     0   0 $self->_readline( $what => _sub_ch($channel) );
783             }
784 0 0       0 );
785 0         0 $channel;
786             }
787              
788             sub readline_cb {
789 0     0 1 0 my ( $self, $cb ) = @_;
790 0         0 $self->_readline_cb( out => $cb );
791             }
792              
793             sub readline_cv {
794 0     0 1 0 my ( $self, $cv ) = @_;
795 0         0 $self->_readline_cv( out => $cv );
796             }
797              
798             sub readline_ch {
799 0     0 1 0 my ( $self, $ch ) = @_;
800 0         0 $self->_readline_ch( out => $ch );
801             }
802              
803             sub readlines_cb {
804 0     0 1 0 my ( $self, $cb ) = @_;
805 0         0 $self->_readlines_cb( out => $cb );
806             }
807              
808             sub readlines_ch {
809 0     0 1 0 my ( $self, $ch ) = @_;
810 0         0 $self->_readlines_ch( out => $ch );
811             }
812              
813             sub readline {
814 0     0 1 0 shift->readline_cv->recv;
815             }
816              
817             sub readline_error_cb {
818 0     0 1 0 my ( $self, $cb ) = @_;
819 0         0 $self->_readline_cb( err => $cb );
820             }
821              
822             sub readline_error_cv {
823 0     0 1 0 my ( $self, $cv ) = @_;
824 0         0 $self->_readline_cv( err => $cv );
825             }
826              
827             sub readline_error_ch {
828 0     0 1 0 my ( $self, $ch ) = @_;
829 0         0 $self->_readline_ch( err => $ch );
830             }
831              
832             sub readlines_error_cb {
833 0     0 1 0 my ( $self, $cb ) = @_;
834 0         0 $self->_readlines_cb( out => $cb );
835             }
836              
837             sub readlines_error_ch {
838 0     0 1 0 my ( $self, $ch ) = @_;
839 0         0 $self->_readlines_ch( out => $ch );
840             }
841              
842             sub readline_error {
843 0     0 1 0 shift->readline_error_cv->recv;
844             }
845              
846             # AnyEvent::Impl::Perl has some issues with POSIX::dup.
847             # This statement solves the problem.
848             AnyEvent::post_detect {
849             AE::child $$ => sub { };
850             };
851              
852             1;
853              
854             package # hidden
855             AnyEvent::Proc::R;
856              
857 32     32   189 use overload '""' => sub { shift->{fileno} };
  32     16   48  
  32         307  
  16         388  
858              
859 2     2   58 sub A { shift->{r} }
860 3     3   15 sub B { shift->{w} }
861              
862             sub on_timeout {
863 0     0   0 shift->A->on_wtimeout(pop);
864             }
865              
866             sub stop_timeout {
867 0     0   0 shift->A->stop_wtimeout;
868             }
869              
870             sub pipe {
871 2     2   46 my ( $self, $peer ) = @_;
872 2         8 $self->{proc}->pipe( $self => $peer );
873             }
874              
875             sub readline_cb {
876 0     0   0 my ( $self, $cb ) = @_;
877 0         0 $self->{proc}->_readline_cb( $self => $cb );
878             }
879              
880             sub readline_cv {
881 0     0   0 my ( $self, $cv ) = @_;
882 0         0 $self->{proc}->_readline_cv( $self => $cv );
883             }
884              
885             sub readline_ch {
886 0     0   0 my ( $self, $ch ) = @_;
887 0         0 $self->{proc}->_readline_ch( $self => $ch );
888             }
889              
890             sub readlines_cb {
891 0     0   0 my ( $self, $cb ) = @_;
892 0         0 $self->{proc}->_readlines_cb( $self => $cb );
893             }
894              
895             sub readlines_ch {
896 0     0   0 my ( $self, $ch ) = @_;
897 0         0 $self->{proc}->_readlines_cb( $self => $ch );
898             }
899              
900             sub readline {
901 0     0   0 shift->readline_cv->recv;
902             }
903              
904             1;
905              
906             package # hidden
907             AnyEvent::Proc::W;
908              
909 32     32   10528 use overload '""' => sub { shift->{fileno} };
  32     6   65  
  32         163  
  6         222  
910              
911 32     32   1426 use Try::Tiny;
  32         36  
  32         8344  
912              
913 3     3   67 sub A { shift->{w} }
914 2     2   12 sub B { shift->{r} }
915              
916             sub finish {
917 1     1   11 shift->A->destroy;
918             }
919              
920             sub on_timeout {
921 0     0   0 shift->A->on_rtimeout(pop);
922             }
923              
924             sub stop_timeout {
925 0     0   0 shift->A->stop_rtimeout;
926             }
927              
928             sub write {
929 1     1   3 my ( $self, $type, @args ) = @_;
930 1         2 my $ok = 0;
931             try {
932 1     1   111 $self->A->push_write( $type => @args );
933 1         86 $ok = 1;
934             }
935             catch {
936 0     0   0 AE::log warn => $_;
937 1         34 };
938 1         33 $ok;
939             }
940              
941             sub writeln {
942 1     1   42 my ( $self, @lines ) = @_;
943 1         7 my $eol = $self->{proc}->_eol;
944 1         7 $self->write( $_ . $eol ) for @lines;
945 1         3 $self;
946             }
947              
948 0     0   0 sub pull { die 'UNIMPLEMENTED' }
949              
950             1;
951              
952             package # hidden
953             AnyEvent::Proc::TiedScalar;
954              
955 32     32   18605 use Tie::Scalar;
  32         13729  
  32         2352  
956              
957             our @ISA = ('Tie::Scalar');
958              
959             sub TIESCALAR {
960 1     1   16 bless pop, shift;
961             }
962              
963             sub FETCH {
964 1     1   9 undef;
965             }
966              
967             sub STORE {
968 3     3   36 shift->(pop);
969             }
970              
971             1;
972              
973             __END__