File Coverage

blib/lib/AnyEvent/Proc.pm
Criterion Covered Total %
statement 317 502 63.1
branch 62 126 49.2
condition 13 50 26.0
subroutine 78 148 52.7
pod 36 36 100.0
total 506 862 58.7


line stmt bran cond sub pod time code
1 31     31   3214422 use strict;
  31         85  
  31         1540  
2 31     31   187 use warnings;
  31         58  
  31         1606  
3              
4             package AnyEvent::Proc;
5              
6             # ABSTRACT: Run external commands
7              
8 31     31   15330 use AnyEvent;
  31         66717  
  31         883  
9 31     31   58618 use AnyEvent::Handle;
  31         756635  
  31         1646  
10 31     31   378 use AnyEvent::Util ();
  31         52  
  31         583  
11 31     31   23249 use Try::Tiny;
  31         47834  
  31         2652  
12 31     31   21634 use Class::Load;
  31         1037697  
  31         2035  
13 31     31   301 use Exporter qw(import);
  31         57  
  31         1106  
14 31     31   177 use Carp;
  31         44  
  31         2058  
15 31     31   21533 use POSIX;
  31         224045  
  31         249  
16              
17             our $VERSION = '0.105'; # VERSION
18              
19             our @EXPORT_OK = qw(run run_cb reader writer);
20              
21             sub _rpipe {
22 77     77   2872 my ( $R, $W ) = AnyEvent::Util::portable_pipe;
23             (
24             $R,
25             AnyEvent::Handle->new(
26             fh => $W,
27             on_error => sub {
28 0     0   0 my ( $handle, $fatal, $message ) = @_;
29 0         0 AE::log warn => "error writing to handle: $message";
30 0         0 $handle->destroy;
31             },
32 77         5007 ),
33             );
34             }
35              
36             sub _wpipe {
37 148     148   8030 my ( $R, $W ) = AnyEvent::Util::portable_pipe;
38             (
39             AnyEvent::Handle->new(
40             fh => $R,
41             on_error => sub {
42 0     0   0 my ( $handle, $fatal, $message ) = @_;
43 0 0       0 AE::log warn => "error reading from handle: $message"
44             unless $message =~ m{unexpected end-of-file}i;
45 0         0 $handle->destroy;
46             },
47 148         7160 ),
48             $W,
49             );
50             }
51              
52             sub _on_read_helper {
53 59     59   180 my ( $aeh, $sub ) = @_;
54             $aeh->on_read(
55             sub {
56 39     39   30539495 my $x = $_[0]->rbuf;
57 39         485 $_[0]->rbuf = '';
58 39         269 $sub->($x);
59             }
60 59         1153 );
61             }
62              
63             sub _read_on_scalar {
64 1     1   4 my ( $var, $sub ) = @_;
65 1   50     6 my $old = $$var || '';
66 1         978 tie $$var, __PACKAGE__ . '::TiedScalar', $sub;
67 1         14 $$var = $old;
68 1         39 $var;
69             }
70              
71             sub _reaper {
72 0     0   0 my ( $self, $waiters ) = @_;
73             my $sub = sub {
74              
75             # my $message = shift; # currently unused
76 0     0   0 foreach my $waiter (@$waiters) {
77 0         0 AE::log debug => "reap $waiter";
78 0 0       0 if ( ref $waiter eq 'CODE' ) {
    0          
    0          
79 0         0 $waiter->(undef);
80             }
81             elsif ( ref $waiter eq 'AnyEvent::CondVar' ) {
82 0         0 $waiter->send(undef);
83             }
84             elsif ( ref $waiter eq 'Coro::Channel' ) {
85 0         0 $waiter->shutdown;
86             }
87             else {
88 0         0 AE::log note => "cannot reap $waiter";
89             }
90             }
91 0         0 };
92 0         0 push @{ $self->{reapers} } => $sub;
  0         0  
93 0         0 $sub;
94             }
95              
96             sub _push_waiter {
97 0     0   0 my ( $self, $what, $var ) = @_;
98 0         0 push @{ $self->{waiters}->{$what} } => $var;
  0         0  
99             }
100              
101             sub _run_cmd {
102 68     68   140 my ( $cmd, $redir, $pidref ) = @_;
103              
104 68         3008 my $cv = AE::cv;
105              
106 68         4830 my %redir = %$redir;
107              
108 68         134517 my $pid = fork;
109 68 50       4778 AE::log error => "cannot fork: $!" unless defined $pid;
110              
111 68 100       1521 unless ($pid) {
112              
113             # move any existing fd's out of the way
114             # this also ensures that dup2 is never called with fd1==fd2
115             # so the cloexec flag is always cleared
116 21         1015 my ( @oldfh, @close );
117 21         842 for my $fh ( values %redir ) {
118 65         681 push @oldfh, $fh; # make sure we keep it open
119 65         586 $fh = fileno $fh; # we only want the fd
120              
121             # dup if we are in the way
122             # if we "leak" fds here, they will be dup2'ed over later
123 65   0     1830 defined( $fh = POSIX::dup($fh) )
124             or POSIX::_exit(124)
125             while exists $redir{$fh};
126             }
127              
128             # execute redirects
129 21         1446 while ( my ( $k, $v ) = each %redir ) {
130 65 50       1748 defined POSIX::dup2( $v, $k )
131             or POSIX::_exit(123);
132             }
133              
134 21         1215 AnyEvent::Util::close_all_fds_except( keys %redir );
135              
136 21         15811 my $bin = $cmd->[0];
137              
138 31     31   126976 no warnings; ## no critic
  31         81  
  31         90505  
139              
140 21         162 exec {$bin} @$cmd;
  21         0  
141              
142 0         0 POSIX::_exit(126);
143             }
144              
145 47         1634 $$pidref = $pid;
146              
147 47         723 my $w;
148             $w = AE::child $pid => sub {
149 47     47   4332927 my $status = $_[1] >> 8;
150 47         149 my $signal = $_[1] & 127;
151 47         148 my $coredump = $_[1] & 128;
152 47 100       472 AE::log info => "child exited with status $status" if $status;
153 47 100       1023 AE::log debug => "child exited with signal $signal" if $signal;
154 47 50       411 AE::log note => "child exited with coredump" if $coredump;
155 47         178 undef $w;
156 47         321 map { close $_ } values %redir;
  144         1607  
157 47         700 $cv->send($status);
158 47         30423 };
159              
160 47         6410 $cv;
161             }
162              
163             sub new {
164 68     68 1 23609 my ( $class, %options ) = @_;
165              
166 68   100     655 $options{args} ||= [];
167              
168 68         344 my ( $rIN, $wIN ) = _rpipe;
169 68         8202 my ( $rOUT, $wOUT ) = _wpipe;
170 68         4000 my ( $rERR, $wERR ) = _wpipe;
171              
172 68 100       4763 my @xhs = @{ delete( $options{extras} ) || [] };
  68         1200  
173              
174 68         187 my @args = map { "$_" } @{ delete $options{args} };
  23         108  
  68         252  
175              
176 68         153 my $pid;
177              
178 5         12 my %redir = (
179             0 => $rIN,
180             1 => $wOUT,
181             2 => $wERR,
182 68         492 map { ( "$_" => $_->B ) } @xhs
183             );
184              
185 68         536 my $cv = _run_cmd( [ delete $options{bin} => @args ], \%redir, \$pid );
186 47         3319 my $waiter = AE::cv;
187              
188 3         73 my $self = bless {
189             handles => {
190             in => $wIN,
191             out => $rOUT,
192             err => $rERR,
193 3         8 map { ( "$_" => $_->A ) } @xhs,
194             },
195             pid => $pid,
196             listeners => {
197             exit => delete $options{on_exit},
198             ttl_exceed => delete $options{on_ttl_exceed},
199             },
200             eol => "\n",
201             cv => $cv,
202             alive => 1,
203             waiter => $waiter,
204             waiters => {
205             in => [],
206             out => [],
207             err => [],
208 47   33     7219 map { ( "$_" => [] ) } @xhs
209             },
210             reapers => [],
211             } => ref $class
212             || $class;
213              
214 47         169 map { $_->{proc} = $self } @xhs;
  3         44  
215              
216             {
217 47         170 my $eol = quotemeta $self->_eol;
  47         1485  
218 47   33     5972 $self->{reol} = delete $options{reol} || qr{$eol};
219             }
220              
221 47 100       413 if ( $options{ttl} ) {
222             $self->{timer} = AnyEvent->timer(
223             after => delete $options{ttl},
224             cb => sub {
225 1 50   1   964131 return unless $self->alive;
226 1         8 $self->kill;
227 1         10 $self->_emit('ttl_exceed');
228             }
229 37         1883 );
230             }
231              
232 47     10   2441 my $kill = sub { $self->end };
  10         75  
233              
234 47 100       742 if ( $options{timeout} ) {
235 4         112 $wIN->timeout( $options{timeout} );
236 4         436 $rOUT->timeout( $options{timeout} );
237 4         208 $rERR->timeout( $options{timeout} );
238 4         124 delete $options{timeout};
239              
240 4   33     68 $self->_on( timeout => ( delete( $options{on_timeout} ) || $kill ) );
241 4     4   16 my $cb = sub { $self->_emit('timeout') };
  4         3950632  
242 4         68 $wIN->on_timeout($cb);
243 4         52 $rOUT->on_timeout($cb);
244 4         24 $rERR->on_timeout($cb);
245             }
246              
247 47 100       348 if ( $options{wtimeout} ) {
248 3         114 $wIN->wtimeout( delete $options{wtimeout} );
249              
250 3   33     435 $self->_on( wtimeout => ( delete( $options{on_wtimeout} ) || $kill ) );
251 3     3   51 my $cb = sub { $self->_emit('wtimeout') };
  3         2973666  
252 3         60 $wIN->on_wtimeout($cb);
253             }
254              
255 47 100       341 if ( $options{rtimeout} ) {
256 2         88 $rOUT->rtimeout( delete $options{rtimeout} );
257              
258 2   33     264 $self->_on( rtimeout => ( delete( $options{on_rtimeout} ) || $kill ) );
259 2     2   40 my $cb = sub { $self->_emit('rtimeout') };
  2         1990474  
260 2         32 $rOUT->on_rtimeout($cb);
261             }
262              
263 47 100       250 if ( $options{etimeout} ) {
264 1         39 $rERR->rtimeout( delete $options{etimeout} );
265              
266 1   33     108 $self->_on( etimeout => ( delete( $options{on_etimeout} ) || $kill ) );
267 1     1   10 my $cb = sub { $self->_emit('etimeout') };
  1         991213  
268 1         16 $rERR->on_rtimeout($cb);
269             }
270              
271 47 100       247 if ( $options{errstr} ) {
272 16         140 my $sref = delete $options{errstr};
273 16         161 $$sref = '';
274 16         785 $self->pipe( err => $sref );
275             }
276              
277 47 100       1674 if ( $options{outstr} ) {
278 26         177 my $sref = delete $options{outstr};
279 26         221 $$sref = '';
280 26         377 $self->pipe( out => $sref );
281             }
282              
283 47         1180 $waiter->begin;
284             $cv->cb(
285             sub {
286 47     47   1438 $self->{status} = shift->recv;
287 47         940 $self->{alive} = 0;
288 47         164 undef $self->{timer};
289 47         887 $waiter->end;
290 47         986 $self->_emit( exit => $self->{status} );
291             }
292 47         2214 );
293              
294 47 50       2328 if ( keys %options ) {
295 0         0 AE::log note => "unknown left-over option(s): " . join ', ' =>
296             keys %options;
297             }
298              
299 47         4032 $self;
300             }
301              
302             sub reader {
303 3     3 1 2487 my ( $r, $w ) = _wpipe;
304 3         315 bless {
305             r => $r,
306             w => $w,
307             fileno => fileno( $r->fh )
308             } => __PACKAGE__
309             . '::R';
310             }
311              
312             sub writer {
313 2     2 1 16 my ( $r, $w ) = _rpipe;
314 2         252 bless {
315             r => $r,
316             w => $w,
317             fileno => fileno( $w->fh )
318             } => __PACKAGE__
319             . '::W';
320             }
321              
322             sub run {
323 12     12 1 31124 my $cv = AE::cv;
324             run_cb(
325             @_,
326             sub {
327 9     9   67 $cv->send( \@_ );
328             }
329 12         1213 )->recv;
330 9         1011 my ( $out, $err, $status ) = @{ $cv->recv };
  9         52  
331 9         110 $? = $status << 8;
332 9 100       34 if (wantarray) {
333 3         51 return ( $out, $err );
334             }
335             else {
336 6 50       20 carp $err if $err;
337 6         34 return $out;
338             }
339             }
340              
341             sub run_cb {
342 14     14 1 1836 my $bin = shift;
343 14         24 my $cb = pop;
344 14         41 my @args = @_;
345 14         38 my ( $out, $err ) = ( '', '' );
346 14         122 my $proc = __PACKAGE__->new(
347             bin => $bin,
348             args => \@args,
349             outstr => \$out,
350             errstr => \$err
351             );
352 10         178 $proc->finish;
353             $proc->wait(
354             sub {
355 10     10   33 my $status = $proc->{status};
356 10         64 $? = $status << 8;
357 10         35 $cb->( $out, $err, $status );
358             }
359 10         170 );
360             }
361              
362             sub _on {
363 10     10   98 my ( $self, $name, $handler ) = @_;
364 10         42 $self->{listeners}->{$name} = $handler;
365             }
366              
367 35     35 1 144 sub in { shift->_geth('in') }
368              
369 0     0 1 0 sub out { shift->_geth('out') }
370              
371 0     0 1 0 sub err { shift->_geth('err') }
372              
373             sub _geth {
374 105     105   1102 shift->{handles}->{ pop() };
375             }
376              
377 56     56   1576 sub _eol { shift->{eol} }
378 0     0   0 sub _reol { shift->{reol} }
379              
380             sub _emit {
381 58     58   254 my ( $self, $name, @args ) = @_;
382 58         528 AE::log debug => "trapped $name";
383 58 100 66     7041 if ( exists $self->{listeners}->{$name}
384             and defined $self->{listeners}->{$name} )
385             {
386 15         113 $self->{listeners}->{$name}->( $self, @args );
387             }
388             }
389              
390             sub pid {
391 3     3 1 130 shift->{pid};
392             }
393              
394             sub fire {
395 3     3 1 19 my ( $self, $signal ) = @_;
396 3 100       60 $signal = 'TERM' unless defined $signal;
397 3         13 $signal =~ s{^sig}{}i;
398 3         34 AE::log debug => "fire SIG$signal";
399 3         7893 kill uc $signal => $self->pid;
400             }
401              
402             sub kill {
403 1     1 1 4 my ($self) = @_;
404 1         4 $self->fire('kill');
405             }
406              
407             sub fire_and_kill {
408 0     0 1 0 my $self = shift;
409 0 0       0 my $cb = ( ref $_[-1] eq 'CODE' ? pop : undef );
410 0         0 my $time = pop;
411 0   0     0 my $signal = uc( pop || 'TERM' );
412             my $w = AnyEvent->timer(
413             after => $time,
414             cb => sub {
415 0 0   0   0 return unless $self->alive;
416 0         0 $self->kill;
417             }
418 0         0 );
419 0         0 $self->fire($signal);
420 0 0       0 if ($cb) {
421             return $self->wait(
422             sub {
423 0     0   0 undef $w;
424 0         0 $cb->(@_);
425             }
426 0         0 );
427             }
428             else {
429 0         0 my $exit = $self->wait;
430 0         0 undef $w;
431 0         0 return $exit;
432             }
433             }
434              
435             sub alive {
436 1     1 1 4 my $self = shift;
437 1 50       8 return 0 unless $self->{alive};
438 1 50       8 $self->fire(0) ? 1 : 0;
439             }
440              
441             sub wait {
442 47     47 1 13938 my ( $self, $cb ) = @_;
443              
444             my $next = sub {
445 47     47   1248 my $cv = shift;
446 47         258 $cv->recv;
447 47         552 waitpid $self->{pid} => 0;
448 47 100       295 $cb->( $self->{status} ) if ref $cb eq 'CODE';
449 47         1128 $self->end;
450 47         682 $self->{status};
451 47         535 };
452 47         889 AE::log debug => "waiting for "
453             . ( $self->{waiter}->{_ae_counter} ) . " ends";
454 47 100       64413 if ($cb) {
455 11         249 $self->{waiter}->cb($next);
456 11         425 return $self->{waiter};
457             }
458             else {
459 36         572 $self->{waiter}->recv;
460 36         1650 return $next->( $self->{waiter} );
461             }
462             }
463              
464             sub finish {
465 35     35 1 836 my ($self) = @_;
466 35         200 $self->in->destroy;
467 35         3004 $self;
468             }
469              
470             sub end {
471 57     57 1 325 my ($self) = @_;
472 57         108 map { $_->destroy } values %{ $self->{handles} };
  174         4266  
  57         329  
473 57         1191 map { $_->() } @{ $self->{reapers} };
  0         0  
  57         202  
474 57         856 $self;
475             }
476              
477             sub stop_timeout {
478 0     0 1 0 my ($self) = @_;
479 0         0 $self->in->timeout(0);
480 0         0 $self->out->timeout(0);
481 0         0 $self->err->timeout(0);
482             }
483              
484             sub stop_wtimeout {
485 0     0 1 0 my ($self) = @_;
486 0         0 $self->in->wtimeout(0);
487             }
488              
489             sub stop_rtimeout {
490 0     0 1 0 my ($self) = @_;
491 0         0 $self->out->rtimeout(0);
492             }
493              
494             sub stop_etimeout {
495 0     0 1 0 my ($self) = @_;
496 0         0 $self->err->rtimeout(0);
497             }
498              
499             sub write {
500 20     20 1 156 my ( $self, $type, @args ) = @_;
501 20         163 my $ok = 0;
502             try {
503 20     20   2065 $self->_geth('in')->push_write( $type => @args );
504 20         1883 $ok = 1;
505             }
506             catch {
507 0     0   0 AE::log warn => $_;
508 20         668 };
509 20         923 $ok;
510             }
511              
512             sub writeln {
513 8     8 1 1304 my ( $self, @lines ) = @_;
514 8         52 $self->write( $_ . $self->_eol ) for @lines;
515 8         29 $self;
516             }
517              
518             sub pipe {
519 50     50 1 1156 my $self = shift;
520 50         123 my $peer = pop;
521 50   100     323 my $what = ( pop || 'out' );
522 50 100       252 if ( ref $what ) {
523 2         6 $what = "$what";
524             }
525             else {
526 48         222 $what = lc $what;
527 48         403 $what =~ s{^std}{};
528             }
529 31     31   333 use Scalar::Util qw(blessed);
  31         58  
  31         16623  
530 50         245 my $sub;
531 50 100       474 if ( blessed $peer) {
    100          
    50          
    0          
532 3 50       326 if ( $peer->isa(__PACKAGE__) ) {
    100          
    50          
    50          
533             $sub = sub {
534 0     0   0 $peer->write(shift);
535             }
536 0         0 }
537             elsif ( $peer->isa('AnyEvent::Handle') ) {
538             $sub = sub {
539 2     2   24 $peer->push_write(shift);
540             }
541 2         28 }
542             elsif ( $peer->isa('Coro::Channel') ) {
543             $sub = sub {
544 0     0   0 $peer->put(shift);
545             }
546 0         0 }
547             elsif ( $peer->can('print') ) {
548             $sub = sub {
549 1     1   15 $peer->print(shift);
550             }
551 1         20 }
552             }
553             elsif ( ref $peer eq 'SCALAR' ) {
554             $sub = sub {
555 24     24   146 $$peer .= shift;
556             }
557 44         514 }
558             elsif ( ref $peer eq 'GLOB' ) {
559             $sub = sub {
560 3     3   33 print $peer shift();
561             }
562 3         30 }
563             elsif ( ref $peer eq 'CODE' ) {
564 0         0 $sub = $peer;
565             }
566 50 50       256 if ($sub) {
567 50         1233 AE::log debug => "pipe $peer from $what";
568 50         102142 my $aeh = $self->_geth($what);
569             $aeh->on_eof(
570             sub {
571 50     50   4175 AE::log debug => "eof: $what";
572 50         3459 shift->destroy;
573 50         1958 $self->{waiter}->end;
574             }
575 50         1171 );
576 50         962 $self->{output}->{$what} = _on_read_helper( $aeh, $sub );
577 50         8283 $self->{waiter}->begin;
578             }
579             else {
580 0         0 AE::log fatal => "cannot pipe $peer from $what";
581             }
582             }
583              
584             sub pull {
585 15     15 1 2236 my ( $self, $peer ) = @_;
586 15         53 $self->{input} = $peer;
587 15         129 AE::log debug => "pull $peer to stdin";
588 31     31   207 use Scalar::Util qw(blessed);
  31         59  
  31         67782  
589 15         951 my $sub;
590 15 100       188 if ( blessed $peer) {
    100          
    50          
591 11 50       327 if ( $peer->isa(__PACKAGE__) ) {
    100          
    50          
    0          
592 0         0 return $peer->pipe($self);
593             }
594             elsif ( $peer->isa('AnyEvent::Handle') ) {
595             $peer->on_eof(
596             sub {
597 9     9   2035 AE::log debug => "pull($peer)->on_eof";
598 9         942 shift->destroy;
599 9         429 $self->finish;
600             }
601 9         83 );
602             $peer->on_error(
603             sub {
604 0     0   0 AE::log error => "pull($peer)->on_error(" . $_[2] . ")";
605 0         0 shift->destroy;
606             }
607 9         159 );
608             return _on_read_helper(
609             $peer,
610             sub {
611 9     9   50 AE::log debug => "pull($peer)->on_read";
612 9         565 $self->write( shift() );
613             }
614 9         176 );
615             }
616             elsif ( $peer->isa('IO::Handle') ) {
617 2         92 return $self->pull( AnyEvent::Handle->new( fh => $peer ) );
618             }
619             elsif ( $peer->isa('Coro::Channel') ) {
620 0 0       0 if ( my $class = load_class('Coro') ) {
621             return $class->new(
622             sub {
623 0     0   0 while ( my $x = $peer->get ) {
624 0 0       0 $self->write($x) or last;
625 0         0 Coro::cede();
626             }
627 0         0 $self->finish;
628             }
629 0         0 );
630             }
631             }
632             }
633             elsif ( ref $peer eq 'SCALAR' ) {
634             return _read_on_scalar(
635             $peer,
636             sub {
637 3     3   31 AE::log debug => "pull($peer)->STORE";
638 3         642 $self->write( shift() );
639             }
640 1         31 );
641             }
642             elsif ( ref $peer eq 'GLOB' ) {
643 3         84 return $self->pull( AnyEvent::Handle->new( fh => $peer ) );
644             }
645 0         0 AE::log fatal => "cannot pull $peer to stdin";
646             }
647              
648             sub _push_read {
649 0     0   0 my ( $self, $what, @args ) = @_;
650 0         0 my $ok = 0;
651             try {
652 0     0   0 $self->_geth($what)->push_read(@args);
653 0         0 $ok = 1;
654             }
655             catch {
656 0     0   0 AE::log note => "cannot push_read from std$what: $_";
657 0         0 };
658 0         0 $ok;
659             }
660              
661             sub _unshift_read {
662 0     0   0 my ( $self, $what, @args ) = @_;
663 0         0 my $ok = 0;
664             try {
665 0     0   0 $self->_geth($what)->unshift_read(@args);
666 0         0 $ok = 1;
667             }
668             catch {
669 0     0   0 AE::log note => "cannot unshift_read from std$what: $_";
670 0         0 };
671 0         0 $ok;
672             }
673              
674             sub _readline {
675 0     0   0 my ( $self, $what, $sub ) = @_;
676 0         0 $self->_push_read( $what => line => $self->_reol, $sub );
677             }
678              
679             sub _readchunk {
680 0     0   0 my ( $self, $what, $bytes, $sub ) = @_;
681 0         0 $self->_push_read( $what => chunk => $bytes => $sub );
682             }
683              
684             sub _sub_cb {
685 0     0   0 my ($cb) = @_;
686 0     0   0 sub { $cb->( $_[1] ) }
687 0         0 }
688              
689             sub _sub_cv {
690 0     0   0 my ($cv) = @_;
691 0     0   0 sub { $cv->send( $_[1] ) }
692 0         0 }
693              
694             sub _sub_ch {
695 0     0   0 my ($ch) = @_;
696 0     0   0 sub { $ch->put( $_[1] ) }
697 0         0 }
698              
699             sub _readline_cb {
700 0     0   0 my ( $self, $what, $cb ) = @_;
701 0         0 $self->_push_waiter( $what => $cb );
702 0         0 $self->_readline( $what => _sub_cb($cb) );
703             }
704              
705             sub _readline_cv {
706 0     0   0 my ( $self, $what, $cv ) = @_;
707 0   0     0 $cv ||= AE::cv;
708 0         0 $self->_push_waiter( $what => $cv );
709 0 0       0 $cv->send unless $self->_readline( $what => _sub_cv($cv) );
710 0         0 $cv;
711             }
712              
713             sub _readline_ch {
714 0     0   0 my ( $self, $what, $channel ) = @_;
715 0 0       0 unless ($channel) {
716 0 0       0 if ( my $class = load_class('Coro::Channel') ) {
717 0   0     0 $channel ||= $class->new;
718             }
719             }
720 0         0 $self->_push_waiter( $what => $channel );
721 0 0       0 $channel->shutdown unless $self->_readline( $what => _sub_ch($channel) );
722 0         0 $channel;
723             }
724              
725             sub _readlines_cb {
726 0     0   0 my ( $self, $what, $cb ) = @_;
727 0         0 $self->_push_waiter( $what => $cb );
728             $self->_geth($what)->on_read(
729             sub {
730 0     0   0 $self->_readline( $what => _sub_cb($cb) );
731             }
732 0         0 );
733             }
734              
735             sub _readlines_ch {
736 0     0   0 my ( $self, $what, $channel ) = @_;
737 0 0       0 unless ($channel) {
738 0 0       0 if ( my $class = load_class('Coro::Channel') ) {
739 0   0     0 $channel ||= $class->new;
740             }
741             }
742 0         0 $self->_push_waiter( $what => $channel );
743             $channel->shutdown unless $self->_geth($what)->on_read(
744             sub {
745 0     0   0 $self->_readline( $what => _sub_ch($channel) );
746             }
747 0 0       0 );
748 0         0 $channel;
749             }
750              
751             sub _readchunk_cb {
752 0     0   0 my ( $self, $what, $bytes, $cb ) = @_;
753 0         0 $self->_push_waiter( $what => $cb );
754 0         0 $self->_readchunk( $what, $bytes, _sub_cb($cb) );
755             }
756              
757             sub _readchunk_cv {
758 0     0   0 my ( $self, $what, $bytes, $cv ) = @_;
759 0   0     0 $cv ||= AE::cv;
760 0         0 $self->_push_waiter( $what => $cv );
761 0         0 $self->_readchunk( $what, $bytes, _sub_cv($cv) );
762 0         0 $cv;
763             }
764              
765             sub _readchunk_ch {
766 0     0   0 my ( $self, $what, $bytes, $channel ) = @_;
767 0 0       0 unless ($channel) {
768 0 0       0 if ( my $class = load_class('Coro::Channel') ) {
769 0   0     0 $channel ||= $class->new;
770             }
771             }
772 0         0 $self->_push_waiter( $what => $channel );
773 0 0       0 $channel->shutdown unless $self->_readline( $what => _sub_ch($channel) );
774 0         0 $channel;
775             }
776              
777             sub _readchunks_ch {
778 0     0   0 my ( $self, $what, $bytes, $channel ) = @_;
779 0 0       0 unless ($channel) {
780 0 0       0 if ( my $class = load_class('Coro::Channel') ) {
781 0   0     0 $channel ||= $class->new;
782             }
783             }
784 0         0 $self->_push_waiter( $what => $channel );
785             $channel->shutdown unless $self->_geth($what)->on_read(
786             sub {
787 0     0   0 $self->_readline( $what => _sub_ch($channel) );
788             }
789 0 0       0 );
790 0         0 $channel;
791             }
792              
793             sub readline_cb {
794 0     0 1 0 my ( $self, $cb ) = @_;
795 0         0 $self->_readline_cb( out => $cb );
796             }
797              
798             sub readline_cv {
799 0     0 1 0 my ( $self, $cv ) = @_;
800 0         0 $self->_readline_cv( out => $cv );
801             }
802              
803             sub readline_ch {
804 0     0 1 0 my ( $self, $ch ) = @_;
805 0         0 $self->_readline_ch( out => $ch );
806             }
807              
808             sub readlines_cb {
809 0     0 1 0 my ( $self, $cb ) = @_;
810 0         0 $self->_readlines_cb( out => $cb );
811             }
812              
813             sub readlines_ch {
814 0     0 1 0 my ( $self, $ch ) = @_;
815 0         0 $self->_readlines_ch( out => $ch );
816             }
817              
818             sub readline {
819 0     0 1 0 shift->readline_cv->recv;
820             }
821              
822             sub readline_error_cb {
823 0     0 1 0 my ( $self, $cb ) = @_;
824 0         0 $self->_readline_cb( err => $cb );
825             }
826              
827             sub readline_error_cv {
828 0     0 1 0 my ( $self, $cv ) = @_;
829 0         0 $self->_readline_cv( err => $cv );
830             }
831              
832             sub readline_error_ch {
833 0     0 1 0 my ( $self, $ch ) = @_;
834 0         0 $self->_readline_ch( err => $ch );
835             }
836              
837             sub readlines_error_cb {
838 0     0 1 0 my ( $self, $cb ) = @_;
839 0         0 $self->_readlines_cb( out => $cb );
840             }
841              
842             sub readlines_error_ch {
843 0     0 1 0 my ( $self, $ch ) = @_;
844 0         0 $self->_readlines_ch( out => $ch );
845             }
846              
847             sub readline_error {
848 0     0 1 0 shift->readline_error_cv->recv;
849             }
850              
851             # AnyEvent::Impl::Perl has some issues with POSIX::dup.
852             # This statement solves the problem.
853             AnyEvent::post_detect {
854             AE::child $$ => sub { };
855             };
856              
857             1;
858              
859             package # hidden
860             AnyEvent::Proc::R;
861              
862 31     31   364 use overload '""' => sub { shift->{fileno} };
  31     16   64  
  31         459  
  16         378  
863              
864 2     2   58 sub A { shift->{r} }
865 3     3   15 sub B { shift->{w} }
866              
867             sub on_timeout {
868 0     0   0 shift->A->on_wtimeout(pop);
869             }
870              
871             sub stop_timeout {
872 0     0   0 shift->A->stop_wtimeout;
873             }
874              
875             sub pipe {
876 2     2   78 my ( $self, $peer ) = @_;
877 2         12 $self->{proc}->pipe( $self => $peer );
878             }
879              
880             sub readline_cb {
881 0     0   0 my ( $self, $cb ) = @_;
882 0         0 $self->{proc}->_readline_cb( $self => $cb );
883             }
884              
885             sub readline_cv {
886 0     0   0 my ( $self, $cv ) = @_;
887 0         0 $self->{proc}->_readline_cv( $self => $cv );
888             }
889              
890             sub readline_ch {
891 0     0   0 my ( $self, $ch ) = @_;
892 0         0 $self->{proc}->_readline_ch( $self => $ch );
893             }
894              
895             sub readlines_cb {
896 0     0   0 my ( $self, $cb ) = @_;
897 0         0 $self->{proc}->_readlines_cb( $self => $cb );
898             }
899              
900             sub readlines_ch {
901 0     0   0 my ( $self, $ch ) = @_;
902 0         0 $self->{proc}->_readlines_cb( $self => $ch );
903             }
904              
905             sub readline {
906 0     0   0 shift->readline_cv->recv;
907             }
908              
909             1;
910              
911             package # hidden
912             AnyEvent::Proc::W;
913              
914 31     31   15776 use overload '""' => sub { shift->{fileno} };
  31     6   570  
  31         330  
  6         286  
915              
916 31     31   1861 use Try::Tiny;
  31         65  
  31         11022  
917              
918 3     3   151 sub A { shift->{w} }
919 2     2   20 sub B { shift->{r} }
920              
921             sub finish {
922 1     1   13 shift->A->destroy;
923             }
924              
925             sub on_timeout {
926 0     0   0 shift->A->on_rtimeout(pop);
927             }
928              
929             sub stop_timeout {
930 0     0   0 shift->A->stop_rtimeout;
931             }
932              
933             sub write {
934 1     1   9 my ( $self, $type, @args ) = @_;
935 1         8 my $ok = 0;
936             try {
937 1     1   135 $self->A->push_write( $type => @args );
938 1         106 $ok = 1;
939             }
940             catch {
941 0     0   0 AE::log note => $_;
942 1         63 };
943 1         52 $ok;
944             }
945              
946             sub writeln {
947 1     1   39 my ( $self, @lines ) = @_;
948 1         19 my $eol = $self->{proc}->_eol;
949 1         20 $self->write( $_ . $eol ) for @lines;
950 1         4 $self;
951             }
952              
953 0     0   0 sub pull { die 'UNIMPLEMENTED' }
954              
955             1;
956              
957             package # hidden
958             AnyEvent::Proc::TiedScalar;
959              
960 31     31   25697 use Tie::Scalar;
  31         20187  
  31         3673  
961              
962             our @ISA = ('Tie::Scalar');
963              
964             sub TIESCALAR {
965 1     1   25 bless pop, shift;
966             }
967              
968             sub FETCH {
969 1     1   9 undef;
970             }
971              
972             sub STORE {
973 3     3   35 shift->(pop);
974             }
975              
976             1;
977              
978             __END__