File Coverage

blib/lib/AnyEvent/Proc.pm
Criterion Covered Total %
statement 353 513 68.8
branch 63 112 56.2
condition 16 52 30.7
subroutine 93 157 59.2
pod 36 36 100.0
total 561 870 64.4


line stmt bran cond sub pod time code
1 33     33   2403085 use strict;
  33         72  
  33         1290  
2 33     33   168 use warnings;
  33         45  
  33         1413  
3              
4             package AnyEvent::Proc;
5              
6             # ABSTRACT: Run external commands
7              
8 33     33   12100 use AnyEvent;
  33         52647  
  33         778  
9 33     33   25255 use AnyEvent::Handle;
  33         599201  
  33         1226  
10 33     33   351 use AnyEvent::Util ();
  33         61  
  33         519  
11 33     33   21150 use Try::Tiny;
  33         39386  
  33         1866  
12 33     33   178 use Exporter qw(import);
  33         45  
  33         725  
13 33     33   17654 use POSIX;
  33         171060  
  33         206  
14              
15             our $VERSION = '0.103'; # VERSION
16              
17             our @EXPORT_OK = qw(run run_cb reader writer);
18              
19             sub _rpipe {
20 80     80   1966 my $on_eof = shift;
21 80         516 my ( $R, $W ) = AnyEvent::Util::portable_pipe;
22 80         4574 my $cv = AE::cv;
23             (
24             $R,
25             AnyEvent::Handle->new(
26             fh => $W,
27             on_error => sub {
28 0     0   0 my ( $handle, $fatal, $message ) = @_;
29 0         0 AE::log warn => "error writing to handle: $message";
30 0         0 $cv->send($message);
31             },
32 80         4940 on_eof => $on_eof,
33             ),
34             $cv,
35             );
36             }
37              
38             sub _wpipe {
39 154     154   5701 my $on_eof = shift;
40 154         461 my ( $R, $W ) = AnyEvent::Util::portable_pipe;
41 154         6611 my $cv = AE::cv;
42             (
43             AnyEvent::Handle->new(
44             fh => $R,
45             on_error => sub {
46 0     0   0 my ( $handle, $fatal, $message ) = @_;
47 0         0 AE::log warn => "error reading from handle: $message";
48 0         0 $cv->send($message);
49             },
50 154         5110 on_eof => $on_eof,
51             ),
52             $W,
53             $cv,
54             );
55             }
56              
57             sub _on_read_helper {
58 59     59   113 my ( $aeh, $sub ) = @_;
59             $aeh->on_read(
60             sub {
61 39     39   11055116 my $x = $_[0]->rbuf;
62 39         393 $_[0]->rbuf = '';
63 39         199 $sub->($x);
64             }
65 59         994 );
66             }
67              
68             sub _read_on_scalar {
69 1     1   10 my ( $var, $sub ) = @_;
70 1   50     22 my $old = $$var || '';
71 1         32 tie $$var, __PACKAGE__ . '::TiedScalar', $sub;
72 1         13 $$var = $old;
73 1         25 $var;
74             }
75              
76             sub _reaper {
77 147     147   298 my ( $self, $waiters ) = @_;
78             my $sub = sub {
79              
80             # my $message = shift; # currently unused
81 177     177   401 foreach my $waiter (@$waiters) {
82 1         9 AE::log debug => "reap $waiter";
83 1 50       64 if ( ref $waiter eq 'CODE' ) {
    50          
    0          
84 0         0 $waiter->(undef);
85             }
86             elsif ( ref $waiter eq 'AnyEvent::CondVar' ) {
87 1         5 $waiter->send(undef);
88             }
89             elsif ( ref $waiter eq 'Coro::Channel' ) {
90 0         0 $waiter->shutdown;
91             }
92             else {
93 0         0 AE::log note => "cannot reap $waiter";
94             }
95             }
96 147         1017 };
97 147         190 push @{ $self->{reapers} } => $sub;
  147         302  
98 147         750 $sub;
99             }
100              
101             sub _push_waiter {
102 1     1   2 my ( $self, $what, $var ) = @_;
103 1         1 push @{ $self->{waiters}->{$what} } => $var;
  1         4  
104             }
105              
106             sub _run_cmd {
107 71     71   134 my ( $cmd, $redir, $pidref ) = @_;
108              
109 71         1719 my $cv = AE::cv;
110              
111 71         533 my %redir = %$redir;
112              
113 71         65077 my $pid = fork;
114 71 50       3127 AE::log error => "cannot fork: $!" unless defined $pid;
115              
116 71 100       1107 unless ($pid) {
117              
118             # move any existing fd's out of the way
119             # this also ensures that dup2 is never called with fd1==fd2
120             # so the cloexec flag is always cleared
121 23         712 my ( @oldfh, @close );
122 23         856 for my $fh ( values %redir ) {
123 71         519 push @oldfh, $fh; # make sure we keep it open
124 71         529 $fh = fileno $fh; # we only want the fd
125              
126             # dup if we are in the way
127             # if we "leak" fds here, they will be dup2'ed over later
128 71   0     1238 defined( $fh = POSIX::dup($fh) )
129             or POSIX::_exit(124)
130             while exists $redir{$fh};
131             }
132              
133             # execute redirects
134 23         719 while ( my ( $k, $v ) = each %redir ) {
135 71 50       1619 defined POSIX::dup2( $v, $k )
136             or POSIX::_exit(123);
137             }
138              
139 23         1022 AnyEvent::Util::close_all_fds_except( keys %redir );
140              
141 23         12021 my $bin = $cmd->[0];
142              
143 33     33   93673 no warnings; ## no critic
  33         77  
  33         77229  
144              
145 23         304 exec {$bin} @$cmd;
  23         0  
146              
147 0         0 POSIX::_exit(126);
148             }
149              
150 48         367 $$pidref = $pid;
151              
152 48         2240 %redir = (); # close child side of the fds
153              
154 48         282 my $status;
155             $cv->begin(
156             sub {
157 48     48   667 shift->send($status);
158             }
159 48         5066 );
160              
161 48         2810 my $cw;
162             $cw = AE::child $pid => sub {
163 48     48   1656036 $status = $_[1] >> 8;
164 48         138 my $signal = $_[1] & 127;
165 48         133 my $coredump = $_[1] & 128;
166 48 100       259 AE::log info => "child exited with status $status" if $status;
167 48 100       773 AE::log debug => "child exited with signal $signal" if $signal;
168 48 50       11238 AE::log note => "child exited with coredump" if $coredump;
169 48         151 undef $cw;
170 48         553 $cv->end;
171 48         7346 };
172              
173 48         4997 $cv;
174             }
175              
176             sub new {
177 71     71 1 12401 my ( $class, %options ) = @_;
178              
179 71   100     777 $options{args} ||= [];
180              
181 71         180 my $eof_in;
182             my $eof_out;
183 0         0 my $eof_err;
184              
185 71     0   523 my ( $rIN, $wIN, $cvIN ) = _rpipe sub { $$eof_in->(@_) };
  0         0  
186 71     27   6792 my ( $rOUT, $wOUT, $cvOUT ) = _wpipe sub { $$eof_out->(@_) };
  27         2133531  
187 71     12   3422 my ( $rERR, $wERR, $cvERR ) = _wpipe sub { $$eof_err->(@_) };
  12         5704373  
188              
189 71 100       3009 my @xhs = @{ delete( $options{extras} ) || [] };
  71         630  
190              
191 71         186 my @args = map { "$_" } @{ delete $options{args} };
  23         81  
  71         239  
192              
193 71         104 my $pid;
194              
195 5         16 my %redir = (
196             0 => $rIN,
197             1 => $wOUT,
198             2 => $wERR,
199 71         362 map { ( "$_" => $_->B ) } @xhs
200             );
201              
202 71         402 my $cv = _run_cmd( [ delete $options{bin} => @args ], \%redir, \$pid );
203              
204 3         80 my $self = bless {
205             handles => {
206             in => $wIN,
207             out => $rOUT,
208             err => $rERR,
209 3         7 map { ( "$_" => $_->A ) } @xhs,
210             },
211             pid => $pid,
212             listeners => {
213             exit => delete $options{on_exit},
214             ttl_exceed => delete $options{on_ttl_exceed},
215              
216             #eof_stdin => delete $options{on_eof_stdin},
217             #eof_stdout => delete $options{on_eof_stdout},
218             #eof_stderr => delete $options{on_eof_stderr},
219             },
220             eol => "\n",
221             cv => $cv,
222             alive => 1,
223             waiters => {
224             in => [],
225             out => [],
226             err => [],
227 48   33     4395 map { ( "$_" => [] ) } @xhs
228             },
229             reapers => [],
230             } => ref $class
231             || $class;
232              
233 48         126 map { $_->{proc} = $self } @xhs;
  3         12  
234              
235             {
236 48         116 my $eol = quotemeta $self->_eol;
  48         1002  
237 48   33     2064 $self->{reol} = delete $options{reol} || qr{$eol};
238             }
239              
240 48         156 my $w;
241 48 100       197 if ( $options{ttl} ) {
242             $w = AnyEvent->timer(
243             after => delete $options{ttl},
244             cb => sub {
245 1 50   1   996111 return unless $self->alive;
246 1         7 $self->kill;
247 1         4 $self->_emit('ttl_exceed');
248             }
249 38         1222 );
250             }
251              
252 48     10   1863 my $kill = sub { $self->end };
  10         95  
253              
254 48 100       202 if ( $options{timeout} ) {
255 4         64 $wIN->timeout( $options{timeout} );
256 4         248 $rOUT->timeout( $options{timeout} );
257 4         88 $rERR->timeout( $options{timeout} );
258 4         96 delete $options{timeout};
259              
260 4   33     56 $self->_on( timeout => ( delete( $options{on_timeout} ) || $kill ) );
261 4     4   36 my $cb = sub { $self->_emit('timeout') };
  4         3988636  
262 4         36 $wIN->on_timeout($cb);
263 4         56 $rOUT->on_timeout($cb);
264 4         16 $rERR->on_timeout($cb);
265             }
266              
267 48 100       186 if ( $options{wtimeout} ) {
268 3         99 $wIN->wtimeout( delete $options{wtimeout} );
269              
270 3   33     342 $self->_on( wtimeout => ( delete( $options{on_wtimeout} ) || $kill ) );
271 3     3   63 my $cb = sub { $self->_emit('wtimeout') };
  3         2991378  
272 3         54 $wIN->on_wtimeout($cb);
273             }
274              
275 48 100       175 if ( $options{rtimeout} ) {
276 2         44 $rOUT->rtimeout( delete $options{rtimeout} );
277              
278 2   33     188 $self->_on( rtimeout => ( delete( $options{on_rtimeout} ) || $kill ) );
279 2     2   8 my $cb = sub { $self->_emit('rtimeout') };
  2         1994262  
280 2         12 $rOUT->on_rtimeout($cb);
281             }
282              
283 48 100       194 if ( $options{etimeout} ) {
284 1         61 $rERR->rtimeout( delete $options{etimeout} );
285              
286 1   33     182 $self->_on( etimeout => ( delete( $options{on_etimeout} ) || $kill ) );
287 1     1   5 my $cb = sub { $self->_emit('etimeout') };
  1         996901  
288 1         22 $rERR->on_rtimeout($cb);
289             }
290              
291 48 100       160 if ( $options{errstr} ) {
292 16         73 my $sref = delete $options{errstr};
293 16         94 $$sref = '';
294 16         283 $self->pipe( err => $sref );
295             }
296              
297 48 100       1756 if ( $options{outstr} ) {
298 26         143 my $sref = delete $options{outstr};
299 26         192 $$sref = '';
300 26         201 $self->pipe( out => $sref );
301             }
302              
303 48         2111 $cvIN->cb( $self->_reaper( $self->{waiters}->{in} ) );
304 48         744 $cvOUT->cb( $self->_reaper( $self->{waiters}->{out} ) );
305 48         399 $cvERR->cb( $self->_reaper( $self->{waiters}->{err} ) );
306 48         298 map { $_->{cv}->cb( $self->_reaper( $self->{waiters}->{"$_"} ) ) } @xhs;
  3         10  
307              
308 48     0   440 $$eof_in = sub { $self->_emit( eof_stdin => @_ ); };
  0         0  
309 48     27   241 $$eof_out = sub { $self->_emit( eof_stdout => @_ ); };
  27         126  
310 48     12   438 $$eof_err = sub { $self->_emit( eof_stderr => @_ ); };
  12         81  
311              
312             $cv->cb(
313             sub {
314 48     48   558 $self->{alive} = 0;
315 48         87 undef $w;
316 48         733 $self->_emit( exit => shift->recv );
317             }
318 48         440 );
319              
320 48 50       415 if ( keys %options ) {
321 0         0 AE::log note => "unknown left-over option(s): " . join ', ' =>
322             keys %options;
323             }
324              
325 48         2631 $self;
326             }
327              
328             sub reader {
329 0     0 1 0 my $eof = sub { };
  3     3   1545  
330 3     0   21 my ( $r, $w, $cv ) = _wpipe sub { $$eof->(@_) };
  0         0  
331 3         417 bless {
332             r => $r,
333             w => $w,
334             eof => $eof,
335             cv => $cv,
336             fileno => fileno( $r->fh )
337             } => __PACKAGE__
338             . '::R';
339             }
340              
341             sub writer {
342 0     0 1 0 my $eof = sub { };
  2     2   16  
343 2     0   18 my ( $r, $w, $cv ) = _rpipe sub { $$eof->(@_) };
  0         0  
344 2         168 bless {
345             r => $r,
346             w => $w,
347             eof => $eof,
348             cv => $cv,
349             fileno => fileno( $w->fh )
350             } => __PACKAGE__
351             . '::W';
352             }
353              
354             sub run {
355 12     12 1 10336 my $cv = AE::cv;
356             run_cb(
357             @_,
358             sub {
359 9     9   42 $cv->send( \@_ );
360             }
361 12         1084 );
362 9         18 my ( $out, $err, $status ) = @{ $cv->recv };
  9         46  
363 9         655 $? = $status << 8;
364 9 100       32 if (wantarray) {
365 3         21 return ( $out, $err );
366             }
367             else {
368 6 50       22 carp $err if $err;
369 6         32 return $out;
370             }
371             }
372              
373             sub run_cb {
374 14     14 1 2861 my $bin = shift;
375 14         26 my $cb = pop;
376 14         40 my @args = @_;
377 14         125 my ( $out, $err ) = ( '', '' );
378 14         105 my $proc = __PACKAGE__->new(
379             bin => $bin,
380             args => \@args,
381             outstr => \$out,
382             errstr => \$err
383             );
384 10         58 $proc->finish;
385             $proc->wait(
386             sub {
387 10     10   13 my $status = shift;
388 10         64 $? = $status << 8;
389 10         41 $cb->( $out, $err, $status );
390             }
391 10         209 );
392             }
393              
394             sub _on {
395 10     10   70 my ( $self, $name, $handler ) = @_;
396 10         33 $self->{listeners}->{$name} = $handler;
397             }
398              
399 35     35 1 141 sub in { shift->_geth('in') }
400              
401 0     0 1 0 sub out { shift->_geth('out') }
402              
403 0     0 1 0 sub err { shift->_geth('err') }
404              
405             sub _geth {
406 107     107   1192 shift->{handles}->{ pop() };
407             }
408              
409 58     58   993 sub _eol { shift->{eol} }
410 1     1   3 sub _reol { shift->{reol} }
411              
412             sub _emit {
413 98     98   803 my ( $self, $name, @args ) = @_;
414 98         626 AE::log debug => "trapped $name";
415 98 100 100     93247 if ( exists $self->{listeners}->{$name}
416             and defined $self->{listeners}->{$name} )
417             {
418 14         83 $self->{listeners}->{$name}->( $self, @args );
419             }
420             }
421              
422             sub pid {
423 7     7 1 152 shift->{pid};
424             }
425              
426             sub fire {
427 7     7 1 22 my ( $self, $signal ) = @_;
428 7 100       75 $signal = 'TERM' unless defined $signal;
429 7         28 kill uc $signal => $self->pid;
430 7         54 $self;
431             }
432              
433             sub kill {
434 1     1 1 2 my ($self) = @_;
435 1         4 $self->fire('kill');
436             }
437              
438             sub fire_and_kill {
439 0     0 1 0 my $self = shift;
440 0 0       0 my $cb = ( ref $_[-1] eq 'CODE' ? pop : undef );
441 0         0 my $time = pop;
442 0   0     0 my $signal = uc( pop || 'TERM' );
443             my $w = AnyEvent->timer(
444             after => $time,
445             cb => sub {
446 0 0   0   0 return unless $self->alive;
447 0         0 $self->kill;
448             }
449 0         0 );
450 0         0 $self->fire($signal);
451 0 0       0 if ($cb) {
452             return $self->wait(
453             sub {
454 0     0   0 undef $w;
455 0         0 $cb->(@_);
456             }
457 0         0 );
458             }
459             else {
460 0         0 my $exit = $self->wait;
461 0         0 undef $w;
462 0         0 return $exit;
463             }
464             }
465              
466             sub alive {
467 4     4 1 726 my $self = shift;
468 4 50       22 return 0 unless $self->{alive};
469 4 50       14 $self->fire(0) ? 1 : 0;
470             }
471              
472             sub wait {
473 48     48 1 12271 my ( $self, $cb ) = @_;
474             my $next = sub {
475 48     48   466 my $status = $self->{cv}->recv;
476 48         1507 waitpid $self->{pid} => 0;
477 48         297 $self->end;
478 48         643 $status;
479 48         469 };
480 48 100       195 if ($cb) {
481 11   50 0   43 my $old_cb = $self->{cv}->cb || sub { };
  0         0  
482 11         407 my $cv = AE::cv;
483             $self->{cv}->cb(
484             sub {
485 11     11   154 $old_cb->(@_);
486 11         46 my $status = $next->();
487 11         45 $cv->send( $cb->($status) );
488             }
489 11         180 );
490 11         105 return $cv;
491             }
492             else {
493 37         168 return $next->();
494             }
495             }
496              
497             sub finish {
498 35     35 1 455 my ($self) = @_;
499 35         301 $self->in->destroy;
500 35         2899 $self;
501             }
502              
503             sub end {
504 58     58 1 256 my ($self) = @_;
505 58         313 map { $_->destroy } values %{ $self->{handles} };
  177         4973  
  58         295  
506 58         1268 map { $_->() } @{ $self->{reapers} };
  177         419  
  58         317  
507 58         134 $self;
508             }
509              
510             sub stop_timeout {
511 0     0 1 0 my ($self) = @_;
512 0         0 $self->in->timeout(0);
513 0         0 $self->out->timeout(0);
514 0         0 $self->err->timeout(0);
515             }
516              
517             sub stop_wtimeout {
518 0     0 1 0 my ($self) = @_;
519 0         0 $self->in->wtimeout(0);
520             }
521              
522             sub stop_rtimeout {
523 0     0 1 0 my ($self) = @_;
524 0         0 $self->out->rtimeout(0);
525             }
526              
527             sub stop_etimeout {
528 0     0 1 0 my ($self) = @_;
529 0         0 $self->err->rtimeout(0);
530             }
531              
532             sub write {
533 21     21 1 65 my ( $self, $type, @args ) = @_;
534 21         36 my $ok = 0;
535             try {
536 21     21   2033 $self->_geth('in')->push_write( $type => @args );
537 21         1344 $ok = 1;
538             }
539             catch {
540 0     0   0 AE::log warn => $_;
541 21         565 };
542 21         585 $ok;
543             }
544              
545             sub writeln {
546 9     9 1 722 my ( $self, @lines ) = @_;
547 9         46 $self->write( $_ . $self->_eol ) for @lines;
548 9         21 $self;
549             }
550              
551             sub pipe {
552 50     50 1 519 my $self = shift;
553 50         74 my $peer = pop;
554 50   100     219 my $what = ( pop || 'out' );
555 50 100       179 if ( ref $what ) {
556 2         4 $what = "$what";
557             }
558             else {
559 48         233 $what = lc $what;
560 48         251 $what =~ s{^std}{};
561             }
562 33     33   638 use Scalar::Util qw(blessed);
  33         44  
  33         10372  
563 50         52 my $sub;
564 50 100       418 if ( blessed $peer) {
    100          
    50          
    0          
565 3 50       192 if ( $peer->isa(__PACKAGE__) ) {
    100          
    50          
    50          
566             $sub = sub {
567 0     0   0 $peer->write(shift);
568             }
569 0         0 }
570             elsif ( $peer->isa('AnyEvent::Handle') ) {
571             $sub = sub {
572 2     2   16 $peer->push_write(shift);
573             }
574 2         82 }
575             elsif ( $peer->isa('Coro::Channel') ) {
576             $sub = sub {
577 0     0   0 $peer->put(shift);
578             }
579 0         0 }
580             elsif ( $peer->can('print') ) {
581             $sub = sub {
582 1     1   14 $peer->print(shift);
583             }
584 1         11 }
585             }
586             elsif ( ref $peer eq 'SCALAR' ) {
587             $sub = sub {
588 24     24   130 $$peer .= shift;
589             }
590 44         347 }
591             elsif ( ref $peer eq 'GLOB' ) {
592             $sub = sub {
593 3     3   54 print $peer shift();
594             }
595 3         30 }
596             elsif ( ref $peer eq 'CODE' ) {
597 0         0 $sub = $peer;
598             }
599 50 50       114 if ($sub) {
600 50         150 _on_read_helper( $self->_geth($what), $sub );
601             }
602             else {
603 0         0 AE::log fatal => "cannot handle $peer for $what";
604             }
605             }
606              
607             sub pull {
608 15     15 1 1410 my ( $self, $peer ) = @_;
609 33     33   381 use Scalar::Util qw(blessed);
  33         46  
  33         52471  
610 15         16 my $sub;
611 15 100       136 if ( blessed $peer) {
    100          
    50          
612 11 50       271 if ( $peer->isa(__PACKAGE__) ) {
    100          
    50          
    0          
613 0         0 return $peer->pipe($self);
614             }
615             elsif ( $peer->isa('AnyEvent::Handle') ) {
616             $peer->on_eof(
617             sub {
618 9     9   649 AE::log debug => "$peer->on_eof";
619 9         607 $self->finish;
620 9         34 shift->destroy;
621             }
622 9         150 );
623             $peer->on_error(
624             sub {
625 0     0   0 AE::log error => "$peer: " . $_[2];
626 0         0 $self->finish;
627 0         0 shift->destroy;
628             }
629 9         258 );
630             return _on_read_helper(
631             $peer,
632             sub {
633 9     9   170 AE::log debug => "$peer->on_read";
634 9         27267 $self->write( shift() );
635             }
636 9         196 );
637             }
638             elsif ( $peer->isa('IO::Handle') ) {
639 2         48 return $self->pull( AnyEvent::Handle->new( fh => $peer ) );
640             }
641             elsif ( $peer->isa('Coro::Channel') ) {
642 0         0 require Coro;
643 0         0 return Coro::async {
644 0         0 while ( my $x = $peer->get ) {
645 0 0       0 $self->write($x) or last;
646 0         0 Coro::cede();
647             }
648             };
649             }
650             }
651             elsif ( ref $peer eq 'SCALAR' ) {
652             return _read_on_scalar(
653             $peer,
654             sub {
655 3     3   42 AE::log debug => "$peer->STORE";
656 3         340 $self->write( shift() );
657             }
658 1         24 );
659             }
660             elsif ( ref $peer eq 'GLOB' ) {
661 3         75 return $self->pull( AnyEvent::Handle->new( fh => $peer ) );
662             }
663 0         0 AE::log fatal => "cannot handle $peer for stdin";
664             }
665              
666             sub _push_read {
667 1     1   12 my ( $self, $what, @args ) = @_;
668 1         2 my $ok = 0;
669             try {
670 1     1   34 $self->_geth($what)->push_read(@args);
671 1         133 $ok = 1;
672             }
673             catch {
674 0     0   0 AE::log warn => "cannot push_read from std$what: $_";
675 1         12 };
676 1         13 $ok;
677             }
678              
679             sub _unshift_read {
680 0     0   0 my ( $self, $what, @args ) = @_;
681 0         0 my $ok = 0;
682             try {
683 0     0   0 $self->_geth($what)->unshift_read(@args);
684 0         0 $ok = 1;
685             }
686             catch {
687 0     0   0 AE::log warn => "cannot unshift_read from std$what: $_";
688 0         0 };
689 0         0 $ok;
690             }
691              
692             sub _readline {
693 1     1   1 my ( $self, $what, $sub ) = @_;
694 1         3 $self->_push_read( $what => line => $self->_reol, $sub );
695             }
696              
697             sub _readchunk {
698 0     0   0 my ( $self, $what, $bytes, $sub ) = @_;
699 0         0 $self->_push_read( $what => chunk => $bytes => $sub );
700             }
701              
702             sub _sub_cb {
703 0     0   0 my ($cb) = @_;
704 0     0   0 sub { $cb->( $_[1] ) }
705 0         0 }
706              
707             sub _sub_cv {
708 1     1   1 my ($cv) = @_;
709 1     1   601428 sub { $cv->send( $_[1] ) }
710 1         16 }
711              
712             sub _sub_ch {
713 0     0   0 my ($ch) = @_;
714 0     0   0 sub { $ch->put( $_[1] ) }
715 0         0 }
716              
717             sub _readline_cb {
718 0     0   0 my ( $self, $what, $cb ) = @_;
719 0         0 $self->_push_waiter( $what => $cb );
720 0         0 $self->_readline( $what => _sub_cb($cb) );
721             }
722              
723             sub _readline_cv {
724 1     1   6 my ( $self, $what, $cv ) = @_;
725 1   33     55 $cv ||= AE::cv;
726 1         10 $self->_push_waiter( $what => $cv );
727 1 50       7 $cv->send unless $self->_readline( $what => _sub_cv($cv) );
728 1         13 $cv;
729             }
730              
731             sub _readline_ch {
732 0     0   0 my ( $self, $what, $channel ) = @_;
733 0 0       0 unless ($channel) {
734 0         0 require Coro::Channel;
735 0   0     0 $channel ||= Coro::Channel->new;
736             }
737 0         0 $self->_push_waiter( $what => $channel );
738 0 0       0 $channel->shutdown unless $self->_readline( $what => _sub_ch($channel) );
739 0         0 $channel;
740             }
741              
742             sub _readlines_cb {
743 0     0   0 my ( $self, $what, $cb ) = @_;
744 0         0 $self->_push_waiter( $what => $cb );
745             $self->_geth($what)->on_read(
746             sub {
747 0     0   0 $self->_readline( $what => _sub_cb($cb) );
748             }
749 0         0 );
750             }
751              
752             sub _readlines_ch {
753 0     0   0 my ( $self, $what, $channel ) = @_;
754 0 0       0 unless ($channel) {
755 0         0 require Coro::Channel;
756 0   0     0 $channel ||= Coro::Channel->new;
757             }
758 0         0 $self->_push_waiter( $what => $channel );
759             $channel->shutdown unless $self->_geth($what)->on_read(
760             sub {
761 0     0   0 $self->_readline( $what => _sub_ch($channel) );
762             }
763 0 0       0 );
764 0         0 $channel;
765             }
766              
767             sub _readchunk_cb {
768 0     0   0 my ( $self, $what, $bytes, $cb ) = @_;
769 0         0 $self->_push_waiter( $what => $cb );
770 0         0 $self->_readchunk( $what, $bytes, _sub_cb($cb) );
771             }
772              
773             sub _readchunk_cv {
774 0     0   0 my ( $self, $what, $bytes, $cv ) = @_;
775 0   0     0 $cv ||= AE::cv;
776 0         0 $self->_push_waiter( $what => $cv );
777 0         0 $self->_readchunk( $what, $bytes, _sub_cv($cv) );
778 0         0 $cv;
779             }
780              
781             sub _readchunk_ch {
782 0     0   0 my ( $self, $what, $bytes, $channel ) = @_;
783 0 0       0 unless ($channel) {
784 0         0 require Coro::Channel;
785 0   0     0 $channel ||= Coro::Channel->new;
786             }
787 0         0 $self->_push_waiter( $what => $channel );
788 0 0       0 $channel->shutdown unless $self->_readline( $what => _sub_ch($channel) );
789 0         0 $channel;
790             }
791              
792             sub _readchunks_ch {
793 0     0   0 my ( $self, $what, $bytes, $channel ) = @_;
794 0 0       0 unless ($channel) {
795 0         0 require Coro::Channel;
796 0   0     0 $channel ||= Coro::Channel->new;
797             }
798 0         0 $self->_push_waiter( $what => $channel );
799             $channel->shutdown unless $self->_geth($what)->on_read(
800             sub {
801 0     0   0 $self->_readline( $what => _sub_ch($channel) );
802             }
803 0 0       0 );
804 0         0 $channel;
805             }
806              
807             sub readline_cb {
808 0     0 1 0 my ( $self, $cb ) = @_;
809 0         0 $self->_readline_cb( out => $cb );
810             }
811              
812             sub readline_cv {
813 1     1 1 1 my ( $self, $cv ) = @_;
814 1         10 $self->_readline_cv( out => $cv );
815             }
816              
817             sub readline_ch {
818 0     0 1 0 my ( $self, $ch ) = @_;
819 0         0 $self->_readline_ch( out => $ch );
820             }
821              
822             sub readlines_cb {
823 0     0 1 0 my ( $self, $cb ) = @_;
824 0         0 $self->_readlines_cb( out => $cb );
825             }
826              
827             sub readlines_ch {
828 0     0 1 0 my ( $self, $ch ) = @_;
829 0         0 $self->_readlines_ch( out => $ch );
830             }
831              
832             sub readline {
833 1     1 1 8 shift->readline_cv->recv;
834             }
835              
836             sub readline_error_cb {
837 0     0 1 0 my ( $self, $cb ) = @_;
838 0         0 $self->_readline_cb( err => $cb );
839             }
840              
841             sub readline_error_cv {
842 0     0 1 0 my ( $self, $cv ) = @_;
843 0         0 $self->_readline_cv( err => $cv );
844             }
845              
846             sub readline_error_ch {
847 0     0 1 0 my ( $self, $ch ) = @_;
848 0         0 $self->_readline_ch( err => $ch );
849             }
850              
851             sub readlines_error_cb {
852 0     0 1 0 my ( $self, $cb ) = @_;
853 0         0 $self->_readlines_cb( out => $cb );
854             }
855              
856             sub readlines_error_ch {
857 0     0 1 0 my ( $self, $ch ) = @_;
858 0         0 $self->_readlines_ch( out => $ch );
859             }
860              
861             sub readline_error {
862 0     0 1 0 shift->readline_error_cv->recv;
863             }
864              
865             # AnyEvent::Impl::Perl has some issues with POSIX::dup.
866             # This statement solves the problem.
867             AnyEvent::post_detect {
868             AE::child $$ => sub { };
869             };
870              
871             1;
872              
873             package # hidden
874             AnyEvent::Proc::R;
875              
876 33     33   229 use overload '""' => sub { shift->{fileno} };
  33     18   59  
  33         359  
  18         448  
877              
878 2     2   60 sub A { shift->{r} }
879 3     3   27 sub B { shift->{w} }
880              
881             sub on_timeout {
882 0     0   0 shift->A->on_wtimeout(pop);
883             }
884              
885             sub stop_timeout {
886 0     0   0 shift->A->stop_wtimeout;
887             }
888              
889             sub pipe {
890 2     2   46 my ( $self, $peer ) = @_;
891 2         6 $self->{proc}->pipe( $self => $peer );
892             }
893              
894             sub readline_cb {
895 0     0   0 my ( $self, $cb ) = @_;
896 0         0 $self->{proc}->_readline_cb( $self => $cb );
897             }
898              
899             sub readline_cv {
900 0     0   0 my ( $self, $cv ) = @_;
901 0         0 $self->{proc}->_readline_cv( $self => $cv );
902             }
903              
904             sub readline_ch {
905 0     0   0 my ( $self, $ch ) = @_;
906 0         0 $self->{proc}->_readline_ch( $self => $ch );
907             }
908              
909             sub readlines_cb {
910 0     0   0 my ( $self, $cb ) = @_;
911 0         0 $self->{proc}->_readlines_cb( $self => $cb );
912             }
913              
914             sub readlines_ch {
915 0     0   0 my ( $self, $ch ) = @_;
916 0         0 $self->{proc}->_readlines_cb( $self => $ch );
917             }
918              
919             sub readline {
920 0     0   0 shift->readline_cv->recv;
921             }
922              
923             1;
924              
925             package # hidden
926             AnyEvent::Proc::W;
927              
928 33     33   11456 use overload '""' => sub { shift->{fileno} };
  33     7   55  
  33         175  
  7         187  
929              
930 33     33   1473 use Try::Tiny;
  33         36  
  33         9176  
931              
932 3     3   51 sub A { shift->{w} }
933 2     2   20 sub B { shift->{r} }
934              
935             sub finish {
936 1     1   9 shift->A->destroy;
937             }
938              
939             sub on_timeout {
940 0     0   0 shift->A->on_rtimeout(pop);
941             }
942              
943             sub stop_timeout {
944 0     0   0 shift->A->stop_rtimeout;
945             }
946              
947             sub write {
948 1     1   2 my ( $self, $type, @args ) = @_;
949 1         2 my $ok = 0;
950             try {
951 1     1   123 $self->A->push_write( $type => @args );
952 1         68 $ok = 1;
953             }
954             catch {
955 0     0   0 AE::log warn => $_;
956 1         38 };
957 1         36 $ok;
958             }
959              
960             sub writeln {
961 1     1   46 my ( $self, @lines ) = @_;
962 1         5 my $eol = $self->{proc}->_eol;
963 1         11 $self->write( $_ . $eol ) for @lines;
964 1         2 $self;
965             }
966              
967 0     0   0 sub pull { die 'UNIMPLEMENTED' }
968              
969             1;
970              
971             package # hidden
972             AnyEvent::Proc::TiedScalar;
973              
974 33     33   19569 use Tie::Scalar;
  33         17817  
  33         2667  
975              
976             our @ISA = ('Tie::Scalar');
977              
978             sub TIESCALAR {
979 1     1   9 bless pop, shift;
980             }
981              
982             sub FETCH {
983 1     1   6 undef;
984             }
985              
986             sub STORE {
987 3     3   32 shift->(pop);
988             }
989              
990             1;
991              
992             __END__