File Coverage

blib/lib/Coro/Handle.pm
Criterion Covered Total %
statement 110 244 45.0
branch 29 94 30.8
condition 9 60 15.0
subroutine 26 72 36.1
pod 30 32 93.7
total 204 502 40.6


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             Coro::Handle - non-blocking I/O with a blocking interface.
4              
5             =head1 SYNOPSIS
6              
7             use Coro::Handle;
8              
9             =head1 DESCRIPTION
10              
11             This module is an L user, you need to make sure that you use and
12             run a supported event loop.
13              
14             This module implements IO-handles in a coroutine-compatible way, that is,
15             other coroutines can run while reads or writes block on the handle.
16              
17             It does so by using L to wait for readable/writable
18             data, allowing other coroutines to run while one coroutine waits for I/O.
19              
20             Coro::Handle does NOT inherit from IO::Handle but uses tied objects.
21              
22             If at all possible, you should I prefer method calls on the handle object over invoking
23             tied methods, i.e.:
24              
25             $fh->print ($str); # NOT print $fh $str;
26             my $line = $fh->readline; # NOT my $line = <$fh>;
27              
28             The reason is that perl recurses within the interpreter when invoking tie
29             magic, forcing the (temporary) allocation of a (big) stack. If you have
30             lots of socket connections and they happen to wait in e.g. <$fh>, then
31             they would all have a costly C coroutine associated with them.
32              
33             =over 4
34              
35             =cut
36              
37             package Coro::Handle;
38              
39 2     2   362 use common::sense;
  2         4  
  2         11  
40              
41 2     2   88 use Carp ();
  2         3  
  2         34  
42 2     2   7 use Errno qw(EAGAIN EINTR EINPROGRESS);
  2         4  
  2         219  
43              
44 2     2   11 use AnyEvent::Util qw(WSAEWOULDBLOCK WSAEINPROGRESS);
  2         3  
  2         106  
45 2     2   333 use AnyEvent::Socket ();
  2         10398  
  2         41  
46              
47 2     2   10 use base 'Exporter';
  2         3  
  2         3085  
48              
49             our $VERSION = 6.514;
50             our @EXPORT = qw(unblock);
51              
52             =item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]
53              
54             Create a new non-blocking io-handle using the given
55             perl-filehandle. Returns C if no filehandle is given. The only
56             other supported argument is "timeout", which sets a timeout for each
57             operation.
58              
59             =cut
60              
61             sub new_from_fh {
62 8     8 1 13 my $class = shift;
63 8 50       42 my $fh = shift or return;
64 8         19 my $self = do { local *Coro::Handle };
  8         25  
65              
66 8         37 tie *$self, 'Coro::Handle::FH', fh => $fh, @_;
67              
68 8 50       26 bless \$self, ref $class ? ref $class : $class
69             }
70              
71             =item $fh = unblock $fh
72              
73             This is a convenience function that just calls C on the
74             given filehandle. Use it to replace a normal perl filehandle by a
75             non-(coroutine-)blocking equivalent.
76              
77             =cut
78              
79             sub unblock($) {
80 8     8 1 168 new_from_fh Coro::Handle $_[0]
81             }
82              
83             =item $fh->writable, $fh->readable
84              
85             Wait until the filehandle is readable or writable (and return true) or
86             until an error condition happens (and return false).
87              
88             =cut
89              
90 0     0 1 0 sub readable { Coro::Handle::FH::readable (tied *${$_[0]}) }
  0         0  
91 0     0 1 0 sub writable { Coro::Handle::FH::writable (tied *${$_[0]}) }
  0         0  
92              
93             =item $fh->readline ([$terminator])
94              
95             Similar to the builtin of the same name, but allows you to specify the
96             input record separator in a coroutine-safe manner (i.e. not using a global
97             variable). Paragraph mode is not supported, use "\n\n" to achieve the same
98             effect.
99              
100             =cut
101              
102 28     28 1 104 sub readline { tied(*${+shift})->READLINE (@_) }
  28         62  
103              
104             =item $fh->autoflush ([...])
105              
106             Always returns true, arguments are being ignored (exists for compatibility
107             only). Might change in the future.
108              
109             =cut
110              
111 4     4 1 11 sub autoflush { !0 }
112              
113             =item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf
114              
115             Work like their function equivalents (except read, which works like
116             sysread. You should not use the read function with Coro::Handle's, it will
117             work but it's not efficient).
118              
119             =cut
120              
121 0     0 1 0 sub read { Coro::Handle::FH::READ (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
  0         0  
122 0     0 1 0 sub sysread { Coro::Handle::FH::READ (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
  0         0  
123 0     0 1 0 sub syswrite { Coro::Handle::FH::WRITE (tied *${$_[0]}, $_[1], $_[2], $_[3]) }
  0         0  
124 4     4 1 14 sub print { Coro::Handle::FH::WRITE (tied *${+shift}, join "", @_) }
  4         15  
125 0     0 1 0 sub printf { Coro::Handle::FH::PRINTF (tied *${+shift}, @_) }
  0         0  
126 0     0 1 0 sub fileno { Coro::Handle::FH::FILENO (tied *${$_[0]}) }
  0         0  
127 4     4 1 15 sub close { Coro::Handle::FH::CLOSE (tied *${$_[0]}) }
  4         16  
128 0     0 0 0 sub blocking { !0 } # this handler always blocks the caller
129              
130             sub partial {
131 0     0 0 0 my $obj = tied *${$_[0]};
  0         0  
132              
133 0         0 my $retval = $obj->[8];
134 0 0       0 $obj->[8] = $_[1] if @_ > 1;
135 0         0 $retval
136             }
137              
138             =item connect, listen, bind, getsockopt, setsockopt,
139             send, recv, peername, sockname, shutdown, peerport, peerhost
140              
141             Do the same thing as the perl builtins or IO::Socket methods (but return
142             true on EINPROGRESS). Remember that these must be method calls.
143              
144             =cut
145              
146 0 0 0 0 1 0 sub connect { connect tied(*${$_[0]})->[0], $_[1] or $! == EINPROGRESS or $! == EAGAIN or $! == WSAEWOULDBLOCK }
  0   0     0  
147 0     0 1 0 sub bind { bind tied(*${$_[0]})->[0], $_[1] }
  0         0  
148 0     0 1 0 sub listen { listen tied(*${$_[0]})->[0], $_[1] }
  0         0  
149 0     0 1 0 sub getsockopt { getsockopt tied(*${$_[0]})->[0], $_[1], $_[2] }
  0         0  
150 0     0 1 0 sub setsockopt { setsockopt tied(*${$_[0]})->[0], $_[1], $_[2], $_[3] }
  0         0  
151 0 0   0 1 0 sub send { send tied(*${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
  0         0  
152 0 0   0 1 0 sub recv { recv tied(*${$_[0]})->[0], $_[1], $_[2], @_ > 2 ? $_[3] : () }
  0         0  
153 0     0 1 0 sub sockname { getsockname tied(*${$_[0]})->[0] }
  0         0  
154 0     0 1 0 sub peername { getpeername tied(*${$_[0]})->[0] }
  0         0  
155 0     0 1 0 sub shutdown { shutdown tied(*${$_[0]})->[0], $_[1] }
  0         0  
156              
157             =item peeraddr, peerhost, peerport
158              
159             Return the peer host (as numericla IP address) and peer port (as integer).
160              
161             =cut
162              
163             sub peeraddr {
164 0     0 1 0 (AnyEvent::Socket::unpack_sockaddr getpeername tied(*${$_[0]})->[0])[1]
  0         0  
165             }
166              
167             sub peerport {
168 0     0 1 0 (AnyEvent::Socket::unpack_sockaddr getpeername tied(*${$_[0]})->[0])[0]
  0         0  
169             }
170              
171             sub peerhost {
172 0     0 1 0 AnyEvent::Socket::format_address &peeraddr
173             }
174              
175             =item ($fh, $peername) = $listen_fh->accept
176              
177             In scalar context, returns the newly accepted socket (or undef) and in
178             list context return the ($fh, $peername) pair (or nothing).
179              
180             =cut
181              
182             sub accept {
183 0     0 1 0 my ($peername, $fh);
184 0         0 while () {
185 0 0       0 $peername = accept $fh, tied(*${$_[0]})->[0]
  0 0       0  
186             and return wantarray
187             ? ($_[0]->new_from_fh($fh), $peername)
188             : $_[0]->new_from_fh($fh);
189              
190 0 0 0     0 return if $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK;
      0        
191              
192 0 0       0 $_[0]->readable or return;
193             }
194             }
195              
196             =item $fh->timeout ([...])
197              
198             The optional argument sets the new timeout (in seconds) for this
199             handle. Returns the current (new) value.
200              
201             C<0> is a valid timeout, use C to disable the timeout.
202              
203             =cut
204              
205             sub timeout {
206 0     0 1 0 my $self = tied *${$_[0]};
  0         0  
207 0 0       0 if (@_ > 1) {
208 0         0 $self->[2] = $_[1];
209 0 0       0 $self->[5]->timeout ($_[1]) if $self->[5];
210 0 0       0 $self->[6]->timeout ($_[1]) if $self->[6];
211             }
212 0         0 $self->[2]
213             }
214              
215             =item $fh->fh
216              
217             Returns the "real" (non-blocking) filehandle. Use this if you want to
218             do operations on the file handle you cannot do using the Coro::Handle
219             interface.
220              
221             =item $fh->rbuf
222              
223             Returns the current contents of the read buffer (this is an lvalue, so you
224             can change the read buffer if you like).
225              
226             You can use this function to implement your own optimized reader when neither
227             readline nor sysread are viable candidates, like this:
228              
229             # first get the _real_ non-blocking filehandle
230             # and fetch a reference to the read buffer
231             my $nb_fh = $fh->fh;
232             my $buf = \$fh->rbuf;
233              
234             while () {
235             # now use buffer contents, modifying
236             # if necessary to reflect the removed data
237              
238             last if $$buf ne ""; # we have leftover data
239              
240             # read another buffer full of data
241             $fh->readable or die "end of file";
242             sysread $nb_fh, $$buf, 8192;
243             }
244              
245             =cut
246              
247             sub fh {
248 0     0 1 0 (tied *${$_[0]})->[0];
  0         0  
249             }
250              
251             sub rbuf : lvalue {
252 0     0 1 0 (tied *${$_[0]})->[3];
  0         0  
253             }
254              
255       0     sub DESTROY {
256             # nop
257             }
258              
259             our $AUTOLOAD;
260              
261             sub AUTOLOAD {
262 0     0   0 my $self = tied *${$_[0]};
  0         0  
263              
264 0         0 (my $func = $AUTOLOAD) =~ s/^(.*):://;
265              
266 0         0 my $forward = UNIVERSAL::can $self->[7], $func;
267              
268 0 0       0 $forward or
269             die "Can't locate object method \"$func\" via package \"" . (ref $self) . "\"";
270              
271 0         0 goto &$forward;
272             }
273              
274             package Coro::Handle::FH;
275              
276 2     2   18 use common::sense;
  2         5  
  2         13  
277              
278 2     2   87 use Carp 'croak';
  2         3  
  2         105  
279 2     2   11 use Errno qw(EAGAIN EINTR);
  2         4  
  2         67  
280              
281 2     2   15 use AnyEvent::Util qw(WSAEWOULDBLOCK);
  2         4  
  2         67  
282              
283 2     2   343 use Coro::AnyEvent;
  2         3  
  2         3714  
284              
285             # formerly a hash, but we are speed-critical, so try
286             # to be faster even if it hurts.
287             #
288             # 0 FH
289             # 1 desc
290             # 2 timeout
291             # 3 rb
292             # 4 wb # unused
293             # 5 read watcher, if Coro::Event|EV used
294             # 6 write watcher, if Coro::Event|EV used
295             # 7 forward class
296             # 8 blocking
297              
298             sub TIEHANDLE {
299 8     8   19 my ($class, %arg) = @_;
300              
301 8         14 my $self = bless [], $class;
302 8         18 $self->[0] = $arg{fh};
303 8         9 $self->[1] = $arg{desc};
304 8         11 $self->[2] = $arg{timeout};
305 8         11 $self->[3] = "";
306 8         11 $self->[4] = "";
307 8         8 $self->[5] = undef; # work around changes in 5.20, which requires initialisation
308 8         11 $self->[6] = undef; # work around changes in 5.20, which requires initialisation
309 8         10 $self->[7] = $arg{forward_class};
310 8         10 $self->[8] = $arg{partial};
311              
312 8         19 AnyEvent::Util::fh_nonblocking $self->[0], 1;
313              
314 8         39 $self
315             }
316              
317             sub cleanup {
318             # gets overriden for Coro::Event
319 12     12   14 @{$_[0]} = ();
  12         64  
320             }
321              
322             sub OPEN {
323 0     0   0 &cleanup;
324 0         0 my $self = shift;
325 0 0       0 my $r = @_ == 2 ? open $self->[0], $_[0], $_[1]
326             : open $self->[0], $_[0], $_[1], $_[2];
327              
328 0 0       0 if ($r) {
329 0 0       0 fcntl $self->[0], &Fcntl::F_SETFL, &Fcntl::O_NONBLOCK
330             or croak "fcntl(O_NONBLOCK): $!";
331             }
332              
333             $r
334 0         0 }
335              
336             sub PRINT {
337 0     0   0 WRITE (shift, join "", @_)
338             }
339              
340             sub PRINTF {
341 0     0   0 WRITE (shift, sprintf shift, @_)
342             }
343              
344             sub GETC {
345 0     0   0 my $buf;
346 0         0 READ ($_[0], $buf, 1);
347 0         0 $buf
348             }
349              
350             sub BINMODE {
351 0     0   0 binmode $_[0][0];
352             }
353              
354             sub TELL {
355 0     0   0 Carp::croak "Coro::Handle's don't support tell()";
356             }
357              
358             sub SEEK {
359 0     0   0 Carp::croak "Coro::Handle's don't support seek()";
360             }
361              
362             sub EOF {
363 0     0   0 Carp::croak "Coro::Handle's don't support eof()";
364             }
365              
366             sub CLOSE {
367 4     4   7 my $fh = $_[0][0];
368 4         7 &cleanup;
369 4         40 close $fh
370             }
371              
372             sub DESTROY {
373 8     8   97 &cleanup;
374             }
375              
376             sub FILENO {
377 0     0   0 fileno $_[0][0]
378             }
379              
380             # seems to be called for stringification (how weird), at least
381             # when DumpValue::dumpValue is used to print this.
382             sub FETCH {
383 0     0   0 "$_[0]<$_[0][1]>"
384             }
385              
386             sub _readable_anyevent {
387 4     4   11 my $cb = Coro::rouse_cb;
388              
389 4     4   21 my $w = AE::io $_[0][0], 0, sub { $cb->(1) };
  4         298  
390 4   33 0   11 my $t = (defined $_[0][2]) && AE::timer $_[0][2], 0, sub { $cb->(0) };
  0         0  
391              
392 4         52 Coro::rouse_wait
393             }
394              
395             sub _writable_anyevent {
396 0     0   0 my $cb = Coro::rouse_cb;
397              
398 0     0   0 my $w = AE::io $_[0][0], 1, sub { $cb->(1) };
  0         0  
399 0   0 0   0 my $t = (defined $_[0][2]) && AE::timer $_[0][2], 0, sub { $cb->(0) };
  0         0  
400              
401 0         0 Coro::rouse_wait
402             }
403              
404             sub _readable_coro {
405 0   0 0   0 ($_[0][5] ||= "Coro::Event"->io (
406             fd => $_[0][0],
407             desc => "fh $_[0][1] read watcher",
408             timeout => $_[0][2],
409             poll => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
410             ))->next->[4] & &Event::Watcher::R
411             }
412              
413             sub _writable_coro {
414 0   0 0   0 ($_[0][6] ||= "Coro::Event"->io (
415             fd => $_[0][0],
416             desc => "fh $_[0][1] write watcher",
417             timeout => $_[0][2],
418             poll => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
419             ))->next->[4] & &Event::Watcher::W
420             }
421              
422             #sub _readable_ev {
423             # &EV::READ == Coro::EV::timed_io_once (fileno $_[0][0], &EV::READ , $_[0][2])
424             #}
425             #
426             #sub _writable_ev {
427             # &EV::WRITE == Coro::EV::timed_io_once (fileno $_[0][0], &EV::WRITE, $_[0][2])
428             #}
429              
430             # decide on event model at runtime
431             for my $rw (qw(readable writable)) {
432             *$rw = sub {
433 1     1   4 AnyEvent::detect;
434 1 50 33     8 if ($AnyEvent::MODEL eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
  0 50 33     0  
435 0         0 *$rw = \&{"_$rw\_coro"};
  0         0  
436             *cleanup = sub {
437 0     0   0 eval {
438 0 0       0 $_[0][5]->cancel if $_[0][5];
439 0 0       0 $_[0][6]->cancel if $_[0][6];
440             };
441 0         0 @{$_[0]} = ();
  0         0  
442 0         0 };
443              
444 1         50 } elsif ($AnyEvent::MODEL eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
445 0         0 *$rw = \&{"Coro::EV::_$rw\_ev"};
  0         0  
446 0         0 return &$rw; # Coro 5.0+ doesn't support goto &SLF, and this line is executed once only
447              
448             } else {
449 1         2 *$rw = \&{"_$rw\_anyevent"};
  1         6  
450             }
451 1         4 goto &$rw
452             };
453             };
454              
455             sub WRITE {
456 4 50   4   10 my $len = defined $_[2] ? $_[2] : length $_[1];
457 4         4 my $ofs = $_[3];
458 4         5 my $res;
459              
460 4         4 while () {
461 4         24 my $r = syswrite ($_[0][0], $_[1], $len, $ofs);
462 4 50 0     7 if (defined $r) {
    0 0        
463 4         4 $len -= $r;
464 4         5 $ofs += $r;
465 4         5 $res += $r;
466 4 50       34 last unless $len;
467             } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
468 0         0 last;
469             }
470 0 0       0 last unless &writable;
471             }
472              
473             $res
474 4         7 }
475              
476             sub READ {
477 0     0   0 my $len = $_[2];
478 0         0 my $ofs = $_[3];
479 0         0 my $res;
480              
481             # first deplete the read buffer
482 0 0       0 if (length $_[0][3]) {
483 0         0 my $l = length $_[0][3];
484 0 0       0 if ($l <= $len) {
485 0         0 substr ($_[1], $ofs) = $_[0][3]; $_[0][3] = "";
  0         0  
486 0         0 $len -= $l;
487 0         0 $ofs += $l;
488 0         0 $res += $l;
489 0 0       0 return $res unless $len;
490             } else {
491 0         0 substr ($_[1], $ofs) = substr ($_[0][3], 0, $len);
492 0         0 substr ($_[0][3], 0, $len) = "";
493 0         0 return $len;
494             }
495             }
496              
497 0         0 while() {
498 0         0 my $r = sysread $_[0][0], $_[1], $len, $ofs;
499 0 0 0     0 if (defined $r) {
    0 0        
500 0         0 $len -= $r;
501 0         0 $ofs += $r;
502 0         0 $res += $r;
503 0 0 0     0 last unless $len && $r;
504             } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
505 0         0 last;
506             }
507 0 0 0     0 last if $_[0][8] || !&readable;
508             }
509              
510             $res
511 0         0 }
512              
513             sub READLINE {
514 28 50   28   45 my $irs = @_ > 1 ? $_[1] : $/;
515 28         33 my ($ofs, $len, $pos);
516 28         29 my $bufsize = 1020;
517              
518 28         26 while () {
519 36 100       52 if (length $irs) {
    100          
520 25 100       45 $pos = index $_[0][3], $irs, $ofs < 0 ? 0 : $ofs;
521              
522 25 100       71 return substr $_[0][3], 0, $pos + length $irs, ""
523             if $pos >= 0;
524              
525 8         12 $ofs = (length $_[0][3]) - (length $irs);
526             } elsif (defined $irs) {
527 7 50       17 $pos = index $_[0][3], "\n\n", $ofs < 1 ? 1 : $ofs;
528              
529 7 100       12 if ($pos >= 0) {
530 3         8 my $res = substr $_[0][3], 0, $pos + 2, "";
531 3         8 $res =~ s/\A\n+//;
532 3         8 return $res;
533             }
534              
535 4         6 $ofs = (length $_[0][3]) - 1;
536             }
537              
538 16         22 $len = $bufsize - length $_[0][3];
539 16 50       29 $len = $bufsize *= 2 if $len < $bufsize * 0.5;
540 16         67 $len = sysread $_[0][0], $_[0][3], $len, length $_[0][3];
541              
542 16 100       26 unless ($len) {
543 12 100 33     43 if (defined $len) {
    50 33        
      33        
544             # EOF
545 8 100       20 return undef unless length $_[0][3];
546              
547 4 100 100     14 $_[0][3] =~ s/\A\n+//
548             if ! length $irs && defined $irs;
549              
550 4         11 return delete $_[0][3];
551             } elsif (($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) || !&readable) {
552 0 0         return length $_[0][3] ? delete $_[0][3] : undef;
553             }
554             }
555             }
556             }
557              
558             1;
559              
560             =back
561              
562             =head1 BUGS
563              
564             - Perl's IO-Handle model is THE bug.
565              
566             =head1 AUTHOR/SUPPORT/CONTACT
567              
568             Marc A. Lehmann
569             http://software.schmorp.de/pkg/Coro.html
570              
571             =cut
572