File Coverage

blib/lib/IO/Async/Stream.pm
Criterion Covered Total %
statement 354 374 94.6
branch 156 204 76.4
condition 71 101 70.3
subroutine 55 59 93.2
pod 24 24 100.0
total 660 762 86.6


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2006-2020 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Stream;
7              
8 54     54   7253 use strict;
  54         137  
  54         1971  
9 54     54   294 use warnings;
  54         116  
  54         2943  
10              
11             our $VERSION = '0.79';
12              
13 54     54   354 use base qw( IO::Async::Handle );
  54         123  
  54         31884  
14              
15 54     54   3329 use Errno qw( EAGAIN EWOULDBLOCK EINTR EPIPE );
  54         7263  
  54         4522  
16              
17 54     54   370 use Carp;
  54         136  
  54         3437  
18              
19 54     54   37871 use Encode 2.11 qw( find_encoding STOP_AT_PARTIAL );
  54         607177  
  54         4862  
20 54     54   537 use Scalar::Util qw( blessed );
  54         139  
  54         2652  
21              
22 54     54   342 use IO::Async::Debug;
  54         164  
  54         1403  
23 54     54   4252 use IO::Async::Metrics '$METRICS';
  54         111  
  54         429  
24              
25             # Tuneable from outside
26             # Not yet documented
27             our $READLEN = 8192;
28             our $WRITELEN = 8192;
29              
30 54     54   24315 use Struct::Dumb;
  54         96557  
  54         308  
31              
32             # Element of the writequeue
33             struct Writer => [qw( data writelen on_write on_flush on_error watching )];
34              
35             # Element of the readqueue
36             struct Reader => [qw( on_read future )];
37              
38             # Bitfields in the want flags
39 54     54   5357 use constant WANT_READ_FOR_READ => 0x01;
  54         129  
  54         3199  
40 54     54   336 use constant WANT_READ_FOR_WRITE => 0x02;
  54         119  
  54         2548  
41 54     54   336 use constant WANT_WRITE_FOR_READ => 0x04;
  54         131  
  54         2578  
42 54     54   360 use constant WANT_WRITE_FOR_WRITE => 0x08;
  54         113  
  54         3049  
43 54     54   360 use constant WANT_ANY_READ => WANT_READ_FOR_READ |WANT_READ_FOR_WRITE;
  54         124  
  54         3150  
44 54     54   338 use constant WANT_ANY_WRITE => WANT_WRITE_FOR_READ|WANT_WRITE_FOR_WRITE;
  54         101  
  54         232259  
45              
46             =head1 NAME
47              
48             C - event callbacks and write bufering for a stream
49             filehandle
50              
51             =head1 SYNOPSIS
52              
53             use IO::Async::Stream;
54              
55             use IO::Async::Loop;
56             my $loop = IO::Async::Loop->new;
57              
58             my $stream = IO::Async::Stream->new(
59             read_handle => \*STDIN,
60             write_handle => \*STDOUT,
61              
62             on_read => sub {
63             my ( $self, $buffref, $eof ) = @_;
64              
65             while( $$buffref =~ s/^(.*\n)// ) {
66             print "Received a line $1";
67             }
68              
69             if( $eof ) {
70             print "EOF; last partial line is $$buffref\n";
71             }
72              
73             return 0;
74             }
75             );
76              
77             $loop->add( $stream );
78              
79             $stream->write( "An initial line here\n" );
80              
81             =head1 DESCRIPTION
82              
83             This subclass of L contains a filehandle that represents
84             a byte-stream. It provides buffering for both incoming and outgoing data. It
85             invokes the C handler when new data is read from the filehandle. Data
86             may be written to the filehandle by calling the C method.
87              
88             This class is suitable for any kind of filehandle that provides a
89             possibly-bidirectional reliable byte stream, such as a pipe, TTY, or
90             C socket (such as TCP or a byte-oriented UNIX local socket). For
91             datagram or raw message-based sockets (such as UDP) see instead
92             L.
93              
94             =cut
95              
96             =head1 EVENTS
97              
98             The following events are invoked, either using subclass methods or CODE
99             references in parameters:
100              
101             =head2 $ret = on_read \$buffer, $eof
102              
103             Invoked when more data is available in the internal receiving buffer.
104              
105             The first argument is a reference to a plain perl string. The code should
106             inspect and remove any data it likes, but is not required to remove all, or
107             indeed any of the data. Any data remaining in the buffer will be preserved for
108             the next call, the next time more data is received from the handle.
109              
110             In this way, it is easy to implement code that reads records of some form when
111             completed, but ignores partially-received records, until all the data is
112             present. If the handler wishes to be immediately invoke a second time, to have
113             another attempt at consuming more content, it should return C<1>. Otherwise,
114             it should return C<0>, and the handler will next be invoked when more data has
115             arrived from the underlying read handle and appended to the buffer. This makes
116             it easy to implement code that handles multiple incoming records at the same
117             time. Alternatively, if the handler function already attempts to consume as
118             much as possible from the buffer, it will have no need to return C<1> at all.
119             See the examples at the end of this documentation for more detail.
120              
121             The second argument is a scalar indicating whether the stream has reported an
122             end-of-file (EOF) condition. A reference to the buffer is passed to the
123             handler in the usual way, so it may inspect data contained in it. Once the
124             handler returns a false value, it will not be called again, as the handle is
125             now at EOF and no more data can arrive.
126              
127             The C code may also dynamically replace itself with a new callback
128             by returning a CODE reference instead of C<0> or C<1>. The original callback
129             or method that the object first started with may be restored by returning
130             C. Whenever the callback is changed in this way, the new code is called
131             again; even if the read buffer is currently empty. See the examples at the end
132             of this documentation for more detail.
133              
134             The C method can be used to insert new, temporary handlers that
135             take precedence over the global C handler. This event is only used if
136             there are no further pending handlers created by C.
137              
138             =head2 on_read_eof
139              
140             Optional. Invoked when the read handle indicates an end-of-file (EOF)
141             condition. If there is any data in the buffer still to be processed, the
142             C event will be invoked first, before this one.
143              
144             =head2 on_write_eof
145              
146             Optional. Invoked when the write handle indicates an end-of-file (EOF)
147             condition. Note that this condition can only be detected after a C
148             syscall returns the C error. If there is no data pending to be written
149             then it will not be detected yet.
150              
151             =head2 on_read_error $errno
152              
153             Optional. Invoked when the C method on the read handle fails.
154              
155             =head2 on_write_error $errno
156              
157             Optional. Invoked when the C method on the write handle fails.
158              
159             The C and C handlers are passed the value of
160             C<$!> at the time the error occurred. (The C<$!> variable itself, by its
161             nature, may have changed from the original error by the time this handler
162             runs so it should always use the value passed in).
163              
164             If an error occurs when the corresponding error callback is not supplied, and
165             there is not a handler for it, then the C method is called instead.
166              
167             =head2 on_read_high_watermark $length
168              
169             =head2 on_read_low_watermark $length
170              
171             Optional. Invoked when the read buffer grows larger than the high watermark
172             or smaller than the low watermark respectively. These are edge-triggered
173             events; they will only be triggered once per crossing, not continuously while
174             the buffer remains above or below the given limit.
175              
176             If these event handlers are not defined, the default behaviour is to disable
177             read-ready notifications if the read buffer grows larger than the high
178             watermark (so as to avoid it growing arbitrarily if nothing is consuming it),
179             and re-enable notifications again once something has read enough to cause it to
180             drop. If these events are overridden, the overriding code will have to perform
181             this behaviour if required, by using
182              
183             $self->want_readready_for_read(...)
184              
185             =head2 on_outgoing_empty
186              
187             Optional. Invoked when the writing data buffer becomes empty.
188              
189             =head2 on_writeable_start
190              
191             =head2 on_writeable_stop
192              
193             Optional. These two events inform when the filehandle becomes writeable, and
194             when it stops being writeable. C is invoked by the
195             C event if previously it was known to be not writeable.
196             C is invoked after a C operation fails with
197             C or C. These two events track the writeability state,
198             and ensure that only state change cause events to be invoked. A stream starts
199             off being presumed writeable, so the first of these events to be observed will
200             be C.
201              
202             =cut
203              
204             sub _init
205             {
206 712     712   3012 my $self = shift;
207              
208 712         7293 $self->{writequeue} = []; # Queue of Writers
209 712         3563 $self->{readqueue} = []; # Queue of Readers
210 712         2299 $self->{writeable} = 1; # "innocent until proven guilty" (by means of EAGAIN)
211 712         4005 $self->{readbuff} = "";
212              
213 712         4262 $self->{reader} = "_sysread";
214 712         2380 $self->{writer} = "_syswrite";
215              
216 712         2148 $self->{read_len} = $READLEN;
217 712         3198 $self->{write_len} = $WRITELEN;
218              
219 712         3949 $self->{want} = WANT_READ_FOR_READ;
220              
221 712         2681 $self->{close_on_read_eof} = 1;
222             }
223              
224             =head1 PARAMETERS
225              
226             The following named parameters may be passed to C or C:
227              
228             =head2 read_handle => IO
229              
230             The IO handle to read from. Must implement C and C methods.
231              
232             =head2 write_handle => IO
233              
234             The IO handle to write to. Must implement C and C methods.
235              
236             =head2 handle => IO
237              
238             Shortcut to specifying the same IO handle for both of the above.
239              
240             =head2 on_read => CODE
241              
242             =head2 on_read_error => CODE
243              
244             =head2 on_outgoing_empty => CODE
245              
246             =head2 on_write_error => CODE
247              
248             =head2 on_writeable_start => CODE
249              
250             =head2 on_writeable_stop => CODE
251              
252             CODE references for event handlers.
253              
254             =head2 autoflush => BOOL
255              
256             Optional. If true, the C method will attempt to write data to the
257             operating system immediately, without waiting for the loop to indicate the
258             filehandle is write-ready. This is useful, for example, on streams that should
259             contain up-to-date logging or console information.
260              
261             It currently defaults to false for any file handle, but future versions of
262             L may enable this by default on STDOUT and STDERR.
263              
264             =head2 read_len => INT
265              
266             Optional. Sets the buffer size for C calls. Defaults to 8 KiBytes.
267              
268             =head2 read_all => BOOL
269              
270             Optional. If true, attempt to read as much data from the kernel as possible
271             when the handle becomes readable. By default this is turned off, meaning at
272             most one fixed-size buffer is read. If there is still more data in the
273             kernel's buffer, the handle will still be readable, and will be read from
274             again.
275              
276             This behaviour allows multiple streams and sockets to be multiplexed
277             simultaneously, meaning that a large bulk transfer on one cannot starve other
278             filehandles of processing time. Turning this option on may improve bulk data
279             transfer rate, at the risk of delaying or stalling processing on other
280             filehandles.
281              
282             =head2 write_len => INT
283              
284             Optional. Sets the buffer size for C calls. Defaults to 8 KiBytes.
285              
286             =head2 write_all => BOOL
287              
288             Optional. Analogous to the C option, but for writing. When
289             C is enabled, this option only affects deferred writing if the
290             initial attempt failed due to buffer space.
291              
292             =head2 read_high_watermark => INT
293              
294             =head2 read_low_watermark => INT
295              
296             Optional. If defined, gives a way to implement flow control or other
297             behaviours that depend on the size of Stream's read buffer.
298              
299             If after more data is read from the underlying filehandle the read buffer is
300             now larger than the high watermark, the C event is
301             triggered (which, by default, will disable read-ready notifications and pause
302             reading from the filehandle).
303              
304             If after data is consumed by an C handler the read buffer is now
305             smaller than the low watermark, the C event is
306             triggered (which, by default, will re-enable read-ready notifications and
307             resume reading from the filehandle). For to be possible, the read handler
308             would have to be one added by the C method or one of the
309             Future-returning C methods.
310              
311             By default these options are not defined, so this behaviour will not happen.
312             C may not be set to a larger value than
313             C, but it may be set to a smaller value, creating a
314             hysteresis region. If either option is defined then both must be.
315              
316             If these options are used with the default event handlers, be careful not to
317             cause deadlocks by having a high watermark sufficiently low that a single
318             C invocation might not consider it finished yet.
319              
320             =head2 reader => STRING|CODE
321              
322             =head2 writer => STRING|CODE
323              
324             Optional. If defined, gives the name of a method or a CODE reference to use
325             to implement the actual reading from or writing to the filehandle. These will
326             be invoked as
327              
328             $stream->reader( $read_handle, $buffer, $len )
329             $stream->writer( $write_handle, $buffer, $len )
330              
331             Each is expected to modify the passed buffer; C by appending to it,
332             C by removing a prefix from it. Each is expected to return a true
333             value on success, zero on EOF, or C with C<$!> set for errors. If not
334             provided, they will be substituted by implenentations using C and
335             C on the underlying handle, respectively.
336              
337             =head2 close_on_read_eof => BOOL
338              
339             Optional. Usually true, but if set to a false value then the stream will not
340             be Cd when an EOF condition occurs on read. This is normally not useful
341             as at that point the underlying stream filehandle is no longer useable, but it
342             may be useful for reading regular files, or interacting with TTY devices.
343              
344             =head2 encoding => STRING
345              
346             If supplied, sets the name of encoding of the underlying stream. If an
347             encoding is set, then the C method will expect to receive Unicode
348             strings and encodes them into bytes, and incoming bytes will be decoded into
349             Unicode strings for the C event.
350              
351             If an encoding is not supplied then C and C will work in byte
352             strings.
353              
354             I in order to handle reads of UTF-8 content or other
355             multibyte encodings, the code implementing the C event uses a feature
356             of L; the C flag. While this flag has existed for a
357             while and is used by the C<:encoding> PerlIO layer itself for similar
358             purposes, the flag is not officially documented by the C module. In
359             principle this undocumented feature could be subject to change, in practice I
360             believe it to be reasonably stable.
361              
362             This note applies only to the C event; data written using the
363             C method does not rely on any undocumented features of C.
364              
365             If a read handle is given, it is required that either an C callback
366             reference is configured, or that the object provides an C method. It
367             is optional whether either is true for C; if neither is
368             supplied then no action will be taken when the writing buffer becomes empty.
369              
370             An C handler may be supplied even if no read handle is yet given, to
371             be used when a read handle is eventually provided by the C
372             method.
373              
374             This condition is checked at the time the object is added to a Loop; it is
375             allowed to create a C object with a read handle but without
376             a C handler, provided that one is later given using C
377             before the stream is added to its containing Loop, either directly or by being
378             a child of another Notifier already in a Loop, or added to one.
379              
380             =cut
381              
382             sub configure
383             {
384 993     993 1 6454 my $self = shift;
385 993         6475 my %params = @_;
386              
387 993         4713 for (qw( on_read on_outgoing_empty on_read_eof on_write_eof on_read_error
388             on_write_error on_writeable_start on_writeable_stop autoflush
389             read_len read_all write_len write_all on_read_high_watermark
390             on_read_low_watermark reader writer close_on_read_eof )) {
391 17874 100       43711 $self->{$_} = delete $params{$_} if exists $params{$_};
392             }
393              
394 993 100 66     9521 if( exists $params{read_high_watermark} or exists $params{read_low_watermark} ) {
395 1         22 my $high = delete $params{read_high_watermark};
396 1 50       10 defined $high or $high = $self->{read_high_watermark};
397              
398 1         2 my $low = delete $params{read_low_watermark};
399 1 50       4 defined $low or $low = $self->{read_low_watermark};
400              
401 1 50 33     8 croak "Cannot set read_low_watermark without read_high_watermark" if defined $low and !defined $high;
402 1 50 33     6 croak "Cannot set read_high_watermark without read_low_watermark" if defined $high and !defined $low;
403              
404 1 50 33     8 croak "Cannot set read_low_watermark higher than read_high_watermark" if defined $low and defined $high and $low > $high;
      33        
405              
406 1         3 $self->{read_high_watermark} = $high;
407 1         3 $self->{read_low_watermark} = $low;
408              
409             # TODO: reassert levels if we've moved them
410             }
411              
412 993 100       2758 if( exists $params{encoding} ) {
413 2         4 my $encoding = delete $params{encoding};
414 2         8 my $obj = find_encoding( $encoding );
415 2 50       246 defined $obj or croak "Cannot handle an encoding of '$encoding'";
416 2         5 $self->{encoding} = $obj;
417             }
418              
419 993         14028 $self->SUPER::configure( %params );
420              
421 993 100 100     3072 if( $self->loop and $self->read_handle ) {
422 5 50       25 $self->can_event( "on_read" ) or
423             croak 'Expected either an on_read callback or to be able to ->on_read';
424             }
425              
426 993 100 100     4732 if( $self->{autoflush} and my $write_handle = $self->write_handle ) {
427 62 50       700 carp "An IO::Async::Stream with autoflush needs an O_NONBLOCK write handle"
428             if $write_handle->blocking;
429             }
430             }
431              
432             sub _add_to_loop
433             {
434 703     703   1556 my $self = shift;
435              
436 703 100       2196 if( defined $self->read_handle ) {
437 592 100       1965 $self->can_event( "on_read" ) or
438             croak 'Expected either an on_read callback or to be able to ->on_read';
439             }
440              
441 702         5554 $self->SUPER::_add_to_loop( @_ );
442              
443 702 100       6036 if( !$self->_is_empty ) {
444 39         243 $self->want_writeready_for_write( 1 );
445             }
446             }
447              
448             =head1 METHODS
449              
450             The following methods documented with a trailing call to C<< ->get >> return
451             L instances.
452              
453             =cut
454              
455             =head2 want_readready_for_read
456              
457             =head2 want_readready_for_write
458              
459             $stream->want_readready_for_read( $set )
460              
461             $stream->want_readready_for_write( $set )
462              
463             Mutators for the C property on L, which
464             control whether the C or C behaviour should be continued once the
465             filehandle becomes ready for read.
466              
467             Normally, C is always true (though the read watermark
468             behaviour can modify it), and C is not used.
469             However, if a custom C function is provided, it may find this useful
470             for being invoked again if it cannot proceed with a write operation until the
471             filehandle becomes readable (such as during transport negotiation or SSL key
472             management, for example).
473              
474             =cut
475              
476             sub want_readready_for_read
477             {
478 0     0 1 0 my $self = shift;
479 0         0 my ( $set ) = @_;
480 0 0       0 $set ? ( $self->{want} |= WANT_READ_FOR_READ ) : ( $self->{want} &= ~WANT_READ_FOR_READ );
481              
482 0 0       0 $self->want_readready( $self->{want} & WANT_ANY_READ ) if $self->read_handle;
483             }
484              
485             sub want_readready_for_write
486             {
487 1     1 1 357 my $self = shift;
488 1         3 my ( $set ) = @_;
489 1 50       6 $set ? ( $self->{want} |= WANT_READ_FOR_WRITE ) : ( $self->{want} &= ~WANT_READ_FOR_WRITE );
490              
491 1 50       4 $self->want_readready( $self->{want} & WANT_ANY_READ ) if $self->read_handle;
492             }
493              
494             =head2 want_writeready_for_read
495              
496             =head2 want_writeready_for_write
497              
498             $stream->want_writeready_for_write( $set )
499              
500             $stream->want_writeready_for_read( $set )
501              
502             Mutators for the C property on L, which
503             control whether the C or C behaviour should be continued once the
504             filehandle becomes ready for write.
505              
506             Normally, C is managed by the C method and
507             associated flushing, and C is not used. However, if
508             a custom C function is provided, it may find this useful for being
509             invoked again if it cannot proceed with a read operation until the filehandle
510             becomes writable (such as during transport negotiation or SSL key management,
511             for example).
512              
513             =cut
514              
515             sub want_writeready_for_write
516             {
517 223     223 1 422 my $self = shift;
518 223         610 my ( $set ) = @_;
519 223 100       857 $set ? ( $self->{want} |= WANT_WRITE_FOR_WRITE ) : ( $self->{want} &= ~WANT_WRITE_FOR_WRITE );
520              
521 223 100       749 $self->want_writeready( $self->{want} & WANT_ANY_WRITE ) if $self->write_handle;
522             }
523              
524             sub want_writeready_for_read
525             {
526 1     1 1 605 my $self = shift;
527 1         3 my ( $set ) = @_;
528 1 50       7 $set ? ( $self->{want} |= WANT_WRITE_FOR_READ ) : ( $self->{want} &= ~WANT_WRITE_FOR_READ );
529              
530 1 50       7 $self->want_writeready( $self->{want} & WANT_ANY_WRITE ) if $self->write_handle;
531             }
532              
533             # FUNCTION not method
534             sub _nonfatal_error
535             {
536 9     9   18 my ( $errno ) = @_;
537              
538 9   66     75 return $errno == EAGAIN ||
539             $errno == EWOULDBLOCK ||
540             $errno == EINTR;
541             }
542              
543             sub _is_empty
544             {
545 1327     1327   2464 my $self = shift;
546 1327         2049 return !@{ $self->{writequeue} };
  1327         7676  
547             }
548              
549             =head2 close
550              
551             $stream->close
552              
553             A synonym for C. This should not be used when the deferred
554             wait behaviour is required, as the behaviour of C may change in a
555             future version of L. Instead, call C directly.
556              
557             =cut
558              
559             sub close
560             {
561 8     8 1 1088 my $self = shift;
562 8         67 $self->close_when_empty;
563             }
564              
565             =head2 close_when_empty
566              
567             $stream->close_when_empty
568              
569             If the write buffer is empty, this method calls C on the underlying IO
570             handles, and removes the stream from its containing loop. If the write buffer
571             still contains data, then this is deferred until the buffer is empty. This is
572             intended for "write-then-close" one-shot streams.
573              
574             $stream->write( "Here is my final data\n" );
575             $stream->close_when_empty;
576              
577             Because of this deferred nature, it may not be suitable for error handling.
578             See instead the C method.
579              
580             =cut
581              
582             sub close_when_empty
583             {
584 173     173 1 2216 my $self = shift;
585              
586 173 100       449 return $self->SUPER::close if $self->_is_empty;
587              
588 8         171 $self->{stream_closing} = 1;
589             }
590              
591             =head2 close_now
592              
593             $stream->close_now
594              
595             This method immediately closes the underlying IO handles and removes the
596             stream from the containing loop. It will not wait to flush the remaining data
597             in the write buffer.
598              
599             =cut
600              
601             sub close_now
602             {
603 478     478 1 1342 my $self = shift;
604              
605 478         889 foreach ( @{ $self->{writequeue} } ) {
  478         1968  
606 2 100       7 $_->on_error->( $self, "stream closing" ) if $_->on_error;
607             }
608              
609 478         1011 undef @{ $self->{writequeue} };
  478         1329  
610 478         2688 undef $self->{stream_closing};
611              
612 478         3473 $self->SUPER::close;
613             }
614              
615             =head2 is_read_eof
616              
617             =head2 is_write_eof
618              
619             $eof = $stream->is_read_eof
620              
621             $eof = $stream->is_write_eof
622              
623             Returns true after an EOF condition is reported on either the read or the
624             write handle, respectively.
625              
626             =cut
627              
628             sub is_read_eof
629             {
630 2     2 1 61 my $self = shift;
631 2         9 return $self->{read_eof};
632             }
633              
634             sub is_write_eof
635             {
636 2     2 1 36 my $self = shift;
637 2         10 return $self->{write_eof};
638             }
639              
640             =head2 write
641              
642             $stream->write( $data, %params )
643              
644             This method adds data to the outgoing data queue, or writes it immediately,
645             according to the C parameter.
646              
647             If the C option is set, this method will try immediately to write
648             the data to the underlying filehandle. If this completes successfully then it
649             will have been written by the time this method returns. If it fails to write
650             completely, then the data is queued as if C were not set, and will
651             be flushed as normal.
652              
653             C<$data> can either be a plain string, a L, or a CODE reference. If it
654             is a plain string it is written immediately. If it is not, its value will be
655             used to generate more C<$data> values, eventually leading to strings to be
656             written.
657              
658             If C<$data> is a C, the Stream will wait until it is ready, and take
659             the single value it yields.
660              
661             If C<$data> is a CODE reference, it will be repeatedly invoked to generate new
662             values. Each time the filehandle is ready to write more data to it, the
663             function is invoked. Once the function has finished generating data it should
664             return undef. The function is passed the Stream object as its first argument.
665              
666             It is allowed that Cs yield CODE references, or CODE references return
667             Cs, as well as plain strings.
668              
669             For example, to stream the contents of an existing opened filehandle:
670              
671             open my $fileh, "<", $path or die "Cannot open $path - $!";
672              
673             $stream->write( sub {
674             my ( $stream ) = @_;
675              
676             sysread $fileh, my $buffer, 8192 or return;
677             return $buffer;
678             } );
679              
680             Takes the following optional named parameters in C<%params>:
681              
682             =over 8
683              
684             =item write_len => INT
685              
686             Overrides the C parameter for the data written by this call.
687              
688             =item on_write => CODE
689              
690             A CODE reference which will be invoked after every successful C
691             operation on the underlying filehandle. It will be passed the number of bytes
692             that were written by this call, which may not be the entire length of the
693             buffer - if it takes more than one C operation to empty the buffer
694             then this callback will be invoked multiple times.
695              
696             $on_write->( $stream, $len )
697              
698             =item on_flush => CODE
699              
700             A CODE reference which will be invoked once the data queued by this C
701             call has been flushed. This will be invoked even if the buffer itself is not
702             yet empty; if more data has been queued since the call.
703              
704             $on_flush->( $stream )
705              
706             =item on_error => CODE
707              
708             A CODE reference which will be invoked if a C error happens while
709             performing this write. Invoked as for the C's C event.
710              
711             $on_error->( $stream, $errno )
712              
713             =back
714              
715             If the object is not yet a member of a loop and doesn't yet have a
716             C, then calls to the C method will simply queue the data
717             and return. It will be flushed when the object is added to the loop.
718              
719             If C<$data> is a defined but empty string, the write is still queued, and the
720             C continuation will be invoked, if supplied. This can be used to
721             obtain a marker, to invoke some code once the output queue has been flushed up
722             to this point.
723              
724             =head2 write (scalar)
725              
726             $stream->write( ... )->get
727              
728             If called in non-void context, this method returns a L which will
729             complete (with no value) when the write operation has been flushed. This may
730             be used as an alternative to, or combined with, the C callback.
731              
732             =cut
733              
734             sub _syswrite
735             {
736 166     166   2004 my $self = shift;
737 166         448 my ( $handle, undef, $len ) = @_;
738              
739 166         2293 my $written = $handle->syswrite( $_[1], $len );
740 166 100       7333 return $written if !$written; # zero or undef
741              
742 158         713 substr( $_[1], 0, $written ) = "";
743 158         509 return $written;
744             }
745              
746             sub _flush_one_write
747             {
748 177     177   426 my $self = shift;
749              
750 177         427 my $writequeue = $self->{writequeue};
751              
752 177         299 my $head;
753 177   66     2720 while( $head = $writequeue->[0] and ref $head->data ) {
754 18 100 33     203 if( ref $head->data eq "CODE" ) {
    50          
755 12         71 my $data = $head->data->( $self );
756 12 100       2572 if( !defined $data ) {
757 5 50       17 $head->on_flush->( $self ) if $head->on_flush;
758 5         605 shift @$writequeue;
759 5         56 return 1;
760             }
761 7 100 100     38 if( !ref $data and my $encoding = $self->{encoding} ) {
762 1         6 $data = $encoding->encode( $data );
763             }
764 7         24 unshift @$writequeue, my $new = Writer(
765             $data, $head->writelen, $head->on_write, undef, undef, 0
766             );
767 7         104 next;
768             }
769             elsif( blessed $head->data and $head->data->isa( "Future" ) ) {
770 6         119 my $f = $head->data;
771 6 100       37 if( !$f->is_ready ) {
772 2 50       13 return 0 if $head->watching;
773 2     2   23 $f->on_ready( sub { $self->_flush_one_write } );
  2         713  
774 2         60 $head->watching++;
775 2         15 return 0;
776             }
777 4         34 my $data = $f->get;
778 4 100 100     80 if( !ref $data and my $encoding = $self->{encoding} ) {
779 1         6 $data = $encoding->encode( $data );
780             }
781 4         10 $head->data = $data;
782 4         45 next;
783             }
784             else {
785 0         0 die "Unsure what to do with reference ".ref($head->data)." in write queue";
786             }
787             }
788              
789 170         4457 my $second;
790 170   100     742 while( $second = $writequeue->[1] and
      66        
      66        
      33        
      33        
791             !ref $second->data and
792             $head->writelen == $second->writelen and
793             !$head->on_write and !$second->on_write and
794             !$head->on_flush ) {
795 1         40 $head->data .= $second->data;
796 1         8 $head->on_write = $second->on_write;
797 1         8 $head->on_flush = $second->on_flush;
798 1         9 splice @$writequeue, 1, 1, ();
799             }
800              
801 170 50       624 die "TODO: head data does not contain a plain string" if ref $head->data;
802              
803 170 50       1377 if( $IO::Async::Debug::DEBUG > 1 ) {
804 0         0 my $data = substr $head->data, 0, $head->writelen;
805 0         0 $self->debug_printf( "WRITE len=%d", length $data );
806 0 0       0 IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sw};
807             }
808              
809 170         454 my $writer = $self->{writer};
810 170         605 my $len = $self->$writer( $self->write_handle, $head->data, $head->writelen );
811              
812 170 100       706 if( !defined $len ) {
813 3         16 my $errno = $!;
814              
815 3 100 66     104 if( $errno == EAGAIN or $errno == EWOULDBLOCK ) {
816 1 50       13 $self->maybe_invoke_event( on_writeable_stop => ) if $self->{writeable};
817 1         6 $self->{writeable} = 0;
818             }
819              
820 3 100       8 return 0 if _nonfatal_error( $errno );
821              
822 2 50       6 $self->debug_printf( "WRITE err=%d/%s", $errno, $errno ) if $IO::Async::Debug::DEBUG > 1;
823              
824 2 100       6 if( $errno == EPIPE ) {
825 1         16 $self->debug_printf( "WRITE-EOF" );
826 1         2 $self->{write_eof} = 1;
827 1         3 $self->maybe_invoke_event( on_write_eof => );
828             }
829              
830 2 50       10 $head->on_error->( $self, $errno ) if $head->on_error;
831 2 100       6 $self->maybe_invoke_event( on_write_error => $errno )
832             or $self->close_now;
833              
834 2         21 return 0;
835             }
836              
837 167 100 66     873 $METRICS and $METRICS->inc_counter_by( stream_written => $len ) if $len;
838              
839 167 100       4481 if( my $on_write = $head->on_write ) {
840 3         23 $on_write->( $self, $len );
841             }
842              
843 167 100       1944 if( !length $head->data ) {
844 157 100       1385 $head->on_flush->( $self ) if $head->on_flush;
845 157         1429 shift @{ $self->{writequeue} };
  157         436  
846             }
847              
848 167         1624 return 1;
849             }
850              
851             sub write
852             {
853 169     169 1 6842 my $self = shift;
854 169         736 my ( $data, %params ) = @_;
855              
856 169 50 0     956 carp "Cannot write data to a Stream that is closing" and return if $self->{stream_closing};
857              
858             # Allow writes without a filehandle if we're not yet in a Loop, just don't
859             # try to flush them
860 169         772 my $handle = $self->write_handle;
861              
862 169 100 100     935 croak "Cannot write data to a Stream with no write_handle" if !$handle and $self->loop;
863              
864 168 100 100     1691 if( !ref $data and my $encoding = $self->{encoding} ) {
865 2         18 $data = $encoding->encode( $data );
866             }
867              
868 168         485 my $on_write = delete $params{on_write};
869 168         387 my $on_flush = delete $params{on_flush};
870 168         370 my $on_error = delete $params{on_error};
871              
872 168         317 my $f;
873 168 100       478 if( defined wantarray ) {
874 3         4 my $orig_on_flush = $on_flush;
875 3         5 my $orig_on_error = $on_error;
876              
877 3 50       9 my $loop = $self->loop or
878             croak "Cannot ->write data returning a Future to a Stream not in a Loop";
879 3         13 $f = $loop->new_future;
880             $on_flush = sub {
881 1     1   18 $f->done;
882 1 50       51 $orig_on_flush->( @_ ) if $orig_on_flush;
883 3         15 };
884             $on_error = sub {
885 3     3   32 my $self = shift;
886 3         6 my ( $errno ) = @_;
887              
888 3 100       12 $f->fail( "write failed: $errno", syswrite => $errno ) unless $f->is_ready;
889              
890 3 50       123 $orig_on_error->( $self, @_ ) if $orig_on_error;
891 3         10 };
892             }
893              
894 168         380 my $write_len = $params{write_len};
895 168 50       633 defined $write_len or $write_len = $self->{write_len};
896              
897 168         335 push @{ $self->{writequeue} }, Writer(
  168         2095  
898             $data, $write_len, $on_write, $on_flush, $on_error, 0
899             );
900              
901 168 50       4201 keys %params and croak "Unrecognised keys for ->write - " . join( ", ", keys %params );
902              
903 168 100       556 return $f unless $handle;
904              
905 132 100       323 if( $self->{autoflush} ) {
906 104   66     319 1 while !$self->_is_empty and $self->_flush_one_write;
907              
908 104 50       246 if( $self->_is_empty ) {
909 104         341 $self->want_writeready_for_write( 0 );
910 104         490 return $f;
911             }
912             }
913              
914 28         98 $self->want_writeready_for_write( 1 );
915 28         83 return $f;
916             }
917              
918             sub on_write_ready
919             {
920 68     68 1 289 my $self = shift;
921              
922 68 100       384 if( !$self->{writeable} ) {
923 1         5 $self->maybe_invoke_event( on_writeable_start => );
924 1         4 $self->{writeable} = 1;
925             }
926              
927 68 100       715 $self->_do_write if $self->{want} & WANT_WRITE_FOR_WRITE;
928 68 100       340 $self->_do_read if $self->{want} & WANT_WRITE_FOR_READ;
929             }
930              
931             sub _do_write
932             {
933 68     68   327 my $self = shift;
934              
935 68   100     445 1 while !$self->_is_empty and $self->_flush_one_write and $self->{write_all};
      100        
936              
937             # All data successfully flushed
938 68 100       318 if( $self->_is_empty ) {
939 51         234 $self->want_writeready_for_write( 0 );
940              
941 51         348 $self->maybe_invoke_event( on_outgoing_empty => );
942              
943 51 100       254 $self->close_now if $self->{stream_closing};
944             }
945             }
946              
947             sub _flush_one_read
948             {
949 1152     1152   2393 my $self = shift;
950 1152         2597 my ( $eof ) = @_;
951              
952 1152         4891 local $self->{flushing_read} = 1;
953              
954 1152         2323 my $readqueue = $self->{readqueue};
955              
956 1152         1801 my $ret;
957 1152 100 66     4489 if( $readqueue->[0] and my $on_read = $readqueue->[0]->on_read ) {
958 17         205 $ret = $on_read->( $self, \$self->{readbuff}, $eof );
959             }
960             else {
961 1135         6206 $ret = $self->invoke_event( on_read => \$self->{readbuff}, $eof );
962             }
963              
964 1152 100 100     5462 if( defined $self->{read_low_watermark} and $self->{at_read_high_watermark} and
      66        
965             length $self->{readbuff} < $self->{read_low_watermark} ) {
966 1         2 undef $self->{at_read_high_watermark};
967 1         6 $self->invoke_event( on_read_low_watermark => length $self->{readbuff} );
968             }
969              
970 1152 100 100     4798 if( ref $ret eq "CODE" ) {
    100          
971             # Replace the top CODE, or add it if there was none
972 1         6 $readqueue->[0] = Reader( $ret, undef );
973 1         28 return 1;
974             }
975             elsif( @$readqueue and !defined $ret ) {
976 13         22 shift @$readqueue;
977 13         124 return 1;
978             }
979             else {
980 1138   100     9177 return $ret && ( length( $self->{readbuff} ) > 0 || $eof );
981             }
982             }
983              
984             sub _sysread
985             {
986 943     943   2919 my $self = shift;
987 943         2776 my ( $handle, undef, $len ) = @_;
988 943         6448 return $handle->sysread( $_[1], $len );
989             }
990              
991             sub on_read_ready
992             {
993 942     942 1 2181 my $self = shift;
994              
995 942 50       6900 $self->_do_read if $self->{want} & WANT_READ_FOR_READ;
996 942 100       10457 $self->_do_write if $self->{want} & WANT_READ_FOR_WRITE;
997             }
998              
999             sub _do_read
1000             {
1001 943     943   2019 my $self = shift;
1002              
1003 943         3583 my $handle = $self->read_handle;
1004 943         2383 my $reader = $self->{reader};
1005              
1006 943         1602 while(1) {
1007 947         1543 my $data;
1008 947         4914 my $len = $self->$reader( $handle, $data, $self->{read_len} );
1009              
1010 947 100       21405 if( !defined $len ) {
1011 6         32 my $errno = $!;
1012              
1013 6 100       18 return if _nonfatal_error( $errno );
1014              
1015 4 50       14 $self->debug_printf( "READ err=%d/%s", $errno, $errno ) if $IO::Async::Debug::DEBUG > 1;
1016              
1017 4 100       15 $self->maybe_invoke_event( on_read_error => $errno )
1018             or $self->close_now;
1019              
1020 4         20 foreach ( @{ $self->{readqueue} } ) {
  4         10  
1021 1 50       6 $_->future->fail( "read failed: $errno", sysread => $errno ) if $_->future;
1022             }
1023 4         86 undef @{ $self->{readqueue} };
  4         10  
1024              
1025 4         11 return;
1026             }
1027              
1028 941 50       2821 if( $IO::Async::Debug::DEBUG > 1 ) {
1029 0         0 $self->debug_printf( "READ len=%d", $len );
1030 0 0       0 IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sr};
1031             }
1032              
1033 941 100 66     3487 $METRICS and $METRICS->inc_counter_by( stream_read => $len ) if $len;
1034              
1035 941         11496 my $eof = $self->{read_eof} = ( $len == 0 );
1036              
1037 941 100       3296 if( my $encoding = $self->{encoding} ) {
1038 4 100       21 my $bytes = defined $self->{bytes_remaining} ? $self->{bytes_remaining} . $data : $data;
1039 4         31 $data = $encoding->decode( $bytes, STOP_AT_PARTIAL );
1040 4         29 $self->{bytes_remaining} = $bytes;
1041             }
1042              
1043 941 100       3556 $self->{readbuff} .= $data if !$eof;
1044              
1045 941         4098 1 while $self->_flush_one_read( $eof );
1046              
1047 941 100       2581 if( $eof ) {
1048 476         2823 $self->debug_printf( "READ-EOF" );
1049 476         2414 $self->maybe_invoke_event( on_read_eof => );
1050 476 100       3508 $self->close_now if $self->{close_on_read_eof};
1051 476         997 foreach ( @{ $self->{readqueue} } ) {
  476         1459  
1052 0 0       0 $_->future->done( undef ) if $_->future;
1053             }
1054 476         799 undef @{ $self->{readqueue} };
  476         1053  
1055 476         1318 return;
1056             }
1057              
1058 465 100       1771 last unless $self->{read_all};
1059             }
1060              
1061 461 100 66     1722 if( defined $self->{read_high_watermark} and length $self->{readbuff} >= $self->{read_high_watermark} ) {
1062             $self->{at_read_high_watermark} or
1063 1 50       8 $self->invoke_event( on_read_high_watermark => length $self->{readbuff} );
1064              
1065 1         4 $self->{at_read_high_watermark} = 1;
1066             }
1067             }
1068              
1069             sub on_read_high_watermark
1070             {
1071 0     0 1 0 my $self = shift;
1072 0         0 $self->want_readready_for_read( 0 );
1073             }
1074              
1075             sub on_read_low_watermark
1076             {
1077 0     0 1 0 my $self = shift;
1078 0         0 $self->want_readready_for_read( 1 );
1079             }
1080              
1081             =head2 push_on_read
1082              
1083             $stream->push_on_read( $on_read )
1084              
1085             Pushes a new temporary C handler to the end of the queue. This queue,
1086             if non-empty, is used to provide C event handling code in preference
1087             to using the object's main event handler or method. New handlers can be
1088             supplied at any time, and they will be used in first-in first-out (FIFO)
1089             order.
1090              
1091             As with the main C event handler, each can return a (defined) boolean
1092             to indicate if they wish to be invoked again or not, another C reference
1093             to replace themself with, or C to indicate it is now complete and
1094             should be removed. When a temporary handler returns C it is shifted
1095             from the queue and the next one, if present, is invoked instead. If there are
1096             no more then the object's main handler is invoked instead.
1097              
1098             =cut
1099              
1100             sub push_on_read
1101             {
1102 13     13 1 39 my $self = shift;
1103 13         32 my ( $on_read, %args ) = @_;
1104             # %args undocumented for internal use
1105              
1106 13         19 push @{ $self->{readqueue} }, Reader( $on_read, $args{future} );
  13         45  
1107              
1108             # TODO: Should this always defer?
1109 13 100       102 return if $self->{flushing_read};
1110 12   100     44 1 while length $self->{readbuff} and $self->_flush_one_read( 0 );
1111             }
1112              
1113             =head1 FUTURE-RETURNING READ METHODS
1114              
1115             The following methods all return a L which will become ready when
1116             enough data has been read by the Stream into its buffer. At this point, the
1117             data is removed from the buffer and given to the C object to complete
1118             it.
1119              
1120             my $f = $stream->read_...
1121              
1122             my ( $string ) = $f->get;
1123              
1124             Unlike the C event handlers, these methods don't allow for access to
1125             "partial" results; they only provide the final result once it is ready.
1126              
1127             If a C is cancelled before it completes it is removed from the read
1128             queue without consuming any data; i.e. each C atomically either
1129             completes or is cancelled.
1130              
1131             Since it is possible to use a readable C entirely using these
1132             C-returning methods instead of the C event, it may be useful
1133             to configure a trivial return-false event handler to keep it from consuming
1134             any input, and to allow it to be added to a C in the first place.
1135              
1136             my $stream = IO::Async::Stream->new( on_read => sub { 0 }, ... );
1137             $loop->add( $stream );
1138              
1139             my $f = $stream->read_...
1140              
1141             If a read EOF or error condition happens while there are read Cs
1142             pending, they are all completed. In the case of a read EOF, they are done with
1143             C; in the case of a read error they are failed using the C<$!> error
1144             value as the failure.
1145              
1146             $f->fail( $message, sysread => $! )
1147              
1148             If a read EOF condition happens to the currently-processing read C, it
1149             will return a partial result. The calling code can detect this by the fact
1150             that the returned data is not complete according to the specification (too
1151             short in C's case, or lacking the ending pattern in
1152             C's case). Additionally, each C will yield the C<$eof>
1153             value in its results.
1154              
1155             my ( $string, $eof ) = $f->get;
1156              
1157             =cut
1158              
1159             sub _read_future
1160             {
1161 11     11   17 my $self = shift;
1162 11         29 my $f = $self->loop->new_future;
1163             $f->on_cancel( $self->_capture_weakself( sub {
1164 1 50   1   5 my $self = shift or return;
1165 1         3 1 while $self->_flush_one_read;
1166 11         62 }));
1167 11         229 return $f;
1168             }
1169              
1170             =head2 read_atmost
1171              
1172             =head2 read_exactly
1173              
1174             ( $string, $eof ) = $stream->read_atmost( $len )->get
1175              
1176             ( $string, $eof ) = $stream->read_exactly( $len )->get
1177              
1178             Completes the C when the read buffer contains C<$len> or more
1179             characters of input. C will also complete after the first
1180             invocation of C, even if fewer characters are available, whereas
1181             C will wait until at least C<$len> are available.
1182              
1183             =cut
1184              
1185             sub read_atmost
1186             {
1187 2     2 1 584 my $self = shift;
1188 2         6 my ( $len ) = @_;
1189              
1190 2         7 my $f = $self->_read_future;
1191             $self->push_on_read( sub {
1192 1     1   3 my ( undef, $buffref, $eof ) = @_;
1193 1 50       11 return undef if $f->is_cancelled;
1194 1         18 $f->done( substr( $$buffref, 0, $len, "" ), $eof );
1195 1         120 return undef;
1196 2         11 }, future => $f );
1197 2         8 return $f;
1198             }
1199              
1200             sub read_exactly
1201             {
1202 4     4 1 1142 my $self = shift;
1203 4         8 my ( $len ) = @_;
1204              
1205 4         9 my $f = $self->_read_future;
1206             $self->push_on_read( sub {
1207 4     4   10 my ( undef, $buffref, $eof ) = @_;
1208 4 50       12 return undef if $f->is_cancelled;
1209 4 50 33     46 return 0 unless $eof or length $$buffref >= $len;
1210 4         20 $f->done( substr( $$buffref, 0, $len, "" ), $eof );
1211 4         247 return undef;
1212 4         20 }, future => $f );
1213 4         31 return $f;
1214             }
1215              
1216             =head2 read_until
1217              
1218             ( $string, $eof ) = $stream->read_until( $end )->get
1219              
1220             Completes the C when the read buffer contains a match for C<$end>,
1221             which may either be a plain string or a compiled C reference. Yields
1222             the prefix of the buffer up to and including this match.
1223              
1224             =cut
1225              
1226             sub read_until
1227             {
1228 4     4 1 1813 my $self = shift;
1229 4         14 my ( $until ) = @_;
1230              
1231 4 100       38 ref $until or $until = qr/\Q$until\E/;
1232              
1233 4         11 my $f = $self->_read_future;
1234             $self->push_on_read( sub {
1235 5     5   12 my ( undef, $buffref, $eof ) = @_;
1236 5 100       13 return undef if $f->is_cancelled;
1237 4 100       50 if( $$buffref =~ $until ) {
    50          
1238 3         27 $f->done( substr( $$buffref, 0, $+[0], "" ), $eof );
1239 3         126 return undef;
1240             }
1241             elsif( $eof ) {
1242 0         0 $f->done( $$buffref, $eof ); $$buffref = "";
  0         0  
1243 0         0 return undef;
1244             }
1245             else {
1246 1         3 return 0;
1247             }
1248 4         25 }, future => $f );
1249 4         18 return $f;
1250             }
1251              
1252             =head2 read_until_eof
1253              
1254             ( $string, $eof ) = $stream->read_until_eof->get
1255              
1256             Completes the C when the stream is eventually closed at EOF, and
1257             yields all of the data that was available.
1258              
1259             =cut
1260              
1261             sub read_until_eof
1262             {
1263 1     1 1 607 my $self = shift;
1264              
1265 1         4 my $f = $self->_read_future;
1266             $self->push_on_read( sub {
1267 2     2   7 my ( undef, $buffref, $eof ) = @_;
1268 2 50       6 return undef if $f->is_cancelled;
1269 2 100       16 return 0 unless $eof;
1270 1         5 $f->done( $$buffref, $eof ); $$buffref = "";
  1         41  
1271 1         2 return undef;
1272 1         8 }, future => $f );
1273 1         3 return $f;
1274             }
1275              
1276             =head1 UTILITY CONSTRUCTORS
1277              
1278             =cut
1279              
1280             =head2 new_for_stdin
1281              
1282             =head2 new_for_stdout
1283              
1284             =head2 new_for_stdio
1285              
1286             $stream = IO::Async::Stream->new_for_stdin
1287              
1288             $stream = IO::Async::Stream->new_for_stdout
1289              
1290             $stream = IO::Async::Stream->new_for_stdio
1291              
1292             Return a C object preconfigured with the correct
1293             C, C or both.
1294              
1295             =cut
1296              
1297 1     1 1 18 sub new_for_stdin { shift->new( read_handle => \*STDIN, @_ ) }
1298 1     1 1 21 sub new_for_stdout { shift->new( write_handle => \*STDOUT, @_ ) }
1299              
1300 1     1 1 397 sub new_for_stdio { shift->new( read_handle => \*STDIN, write_handle => \*STDOUT, @_ ) }
1301              
1302             =head2 connect
1303              
1304             $future = $stream->connect( %args )
1305              
1306             A convenient wrapper for calling the C method on the underlying
1307             L object, passing the C hint as C if not
1308             otherwise supplied.
1309              
1310             =cut
1311              
1312             sub connect
1313             {
1314 0     0 1   my $self = shift;
1315 0           return $self->SUPER::connect( socktype => "stream", @_ );
1316             }
1317              
1318             =head1 DEBUGGING FLAGS
1319              
1320             The following flags in C enable extra logging:
1321              
1322             =over 4
1323              
1324             =item C
1325              
1326             Log byte buffers as data is read from a Stream
1327              
1328             =item C
1329              
1330             Log byte buffers as data is written to a Stream
1331              
1332             =back
1333              
1334             =cut
1335              
1336             =head1 EXAMPLES
1337              
1338             =head2 A line-based C method
1339              
1340             The following C method accepts incoming C<\n>-terminated lines and
1341             prints them to the program's C stream.
1342              
1343             sub on_read
1344             {
1345             my $self = shift;
1346             my ( $buffref, $eof ) = @_;
1347              
1348             while( $$buffref =~ s/^(.*\n)// ) {
1349             print "Received a line: $1";
1350             }
1351              
1352             return 0;
1353             }
1354              
1355             Because a reference to the buffer itself is passed, it is simple to use a
1356             C regular expression on the scalar it points at, to both check if data
1357             is ready (i.e. a whole line), and to remove it from the buffer. Since it
1358             always removes as many complete lines as possible, it doesn't need invoking
1359             again when it has finished, so it can return a constant C<0>.
1360              
1361             =head2 Reading binary data
1362              
1363             This C method accepts incoming records in 16-byte chunks, printing
1364             each one.
1365              
1366             sub on_read
1367             {
1368             my ( $self, $buffref, $eof ) = @_;
1369              
1370             if( length $$buffref >= 16 ) {
1371             my $record = substr( $$buffref, 0, 16, "" );
1372             print "Received a 16-byte record: $record\n";
1373              
1374             return 1;
1375             }
1376              
1377             if( $eof and length $$buffref ) {
1378             print "EOF: a partial record still exists\n";
1379             }
1380              
1381             return 0;
1382             }
1383              
1384             This time, rather than a C loop we have decided to have the handler
1385             just process one record, and use the C mechanism to ask that the
1386             handler be invoked again if there still remains data that might contain
1387             another record; only stopping with C when we know we can't find one.
1388              
1389             The 4-argument form of C extracts the 16-byte record from the buffer
1390             and assigns it to the C<$record> variable, if there was enough data in the
1391             buffer to extract it.
1392              
1393             A lot of protocols use a fixed-size header, followed by a variable-sized body
1394             of data, whose size is given by one of the fields of the header. The following
1395             C method extracts messages in such a protocol.
1396              
1397             sub on_read
1398             {
1399             my ( $self, $buffref, $eof ) = @_;
1400              
1401             return 0 unless length $$buffref >= 8; # "N n n" consumes 8 bytes
1402              
1403             my ( $len, $x, $y ) = unpack "N n n", $$buffref;
1404              
1405             return 0 unless length $$buffref >= 8 + $len;
1406              
1407             substr( $$buffref, 0, 8, "" );
1408             my $data = substr( $$buffref, 0, $len, "" );
1409              
1410             print "A record with values x=$x y=$y\n";
1411              
1412             return 1;
1413             }
1414              
1415             In this example, the header is Ced first, to extract the body
1416             length, and then the body is extracted. If the buffer does not have enough
1417             data yet for a complete message then C<0> is returned, and the buffer is left
1418             unmodified for next time. Only when there are enough bytes in total does it
1419             use C to remove them.
1420              
1421             =head2 Dynamic replacement of C
1422              
1423             Consider the following protocol (inspired by IMAP), which consists of
1424             C<\n>-terminated lines that may have an optional data block attached. The
1425             presence of such a data block, as well as its size, is indicated by the line
1426             prefix.
1427              
1428             sub on_read
1429             {
1430             my $self = shift;
1431             my ( $buffref, $eof ) = @_;
1432              
1433             if( $$buffref =~ s/^DATA (\d+):(.*)\n// ) {
1434             my $length = $1;
1435             my $line = $2;
1436              
1437             return sub {
1438             my $self = shift;
1439             my ( $buffref, $eof ) = @_;
1440              
1441             return 0 unless length $$buffref >= $length;
1442              
1443             # Take and remove the data from the buffer
1444             my $data = substr( $$buffref, 0, $length, "" );
1445              
1446             print "Received a line $line with some data ($data)\n";
1447              
1448             return undef; # Restore the original method
1449             }
1450             }
1451             elsif( $$buffref =~ s/^LINE:(.*)\n// ) {
1452             my $line = $1;
1453              
1454             print "Received a line $line with no data\n";
1455              
1456             return 1;
1457             }
1458             else {
1459             print STDERR "Unrecognised input\n";
1460             # Handle it somehow
1461             }
1462             }
1463              
1464             In the case where trailing data is supplied, a new temporary C
1465             callback is provided in a closure. This closure captures the C<$length>
1466             variable so it knows how much data to expect. It also captures the C<$line>
1467             variable so it can use it in the event report. When this method has finished
1468             reading the data, it reports the event, then restores the original method by
1469             returning C.
1470              
1471             =head1 SEE ALSO
1472              
1473             =over 4
1474              
1475             =item *
1476              
1477             L - Supply object methods for I/O handles
1478              
1479             =back
1480              
1481             =head1 AUTHOR
1482              
1483             Paul Evans
1484              
1485             =cut
1486              
1487             0x55AA;