File Coverage

blib/lib/IO/Async/Stream.pm
Criterion Covered Total %
statement 353 373 94.6
branch 153 198 77.2
condition 74 110 67.2
subroutine 56 60 93.3
pod 24 24 100.0
total 660 765 86.2


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2006-2024 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Stream 0.805;
7              
8 56     56   7487 use v5.14;
  56         413  
9 56     56   332 use warnings;
  56         122  
  56         3848  
10              
11 56     56   353 use base qw( IO::Async::Handle );
  56         116  
  56         36510  
12              
13 56     56   3068 use Errno qw( EAGAIN EWOULDBLOCK EINTR EPIPE );
  56         8706  
  56         5124  
14              
15 56     56   336 use Carp;
  56         101  
  56         3658  
16              
17 56     56   7345 use Encode 2.11 qw( find_encoding STOP_AT_PARTIAL );
  56         270005  
  56         5790  
18 56     56   392 use Scalar::Util qw( blessed );
  56         132  
  56         3199  
19              
20 56     56   308 use Future 0.44; # ->result
  56         1058  
  56         1685  
21              
22 56     56   332 use IO::Async::Debug;
  56         109  
  56         1564  
23 56     56   5042 use IO::Async::Metrics '$METRICS';
  56         113  
  56         472  
24              
25             # Tuneable from outside
26             # Not yet documented
27             our $READLEN = 8192;
28             our $WRITELEN = 8192;
29              
30 56     56   27949 use Struct::Dumb;
  56         342660  
  56         394  
31              
32             # Element of the writequeue
33             struct Writer => [qw( data writelen on_write on_flush on_error watching )];
34              
35             # Element of the readqueue
36             struct Reader => [qw( on_read future )];
37              
38             # Bitfields in the want flags
39 56     56   5974 use constant WANT_READ_FOR_READ => 0x01;
  56         129  
  56         4069  
40 56     56   349 use constant WANT_READ_FOR_WRITE => 0x02;
  56         110  
  56         2813  
41 56     56   319 use constant WANT_WRITE_FOR_READ => 0x04;
  56         150  
  56         2486  
42 56     56   295 use constant WANT_WRITE_FOR_WRITE => 0x08;
  56         112  
  56         3087  
43 56     56   282 use constant WANT_ANY_READ => WANT_READ_FOR_READ |WANT_READ_FOR_WRITE;
  56         109  
  56         2651  
44 56     56   284 use constant WANT_ANY_WRITE => WANT_WRITE_FOR_READ|WANT_WRITE_FOR_WRITE;
  56         97  
  56         252518  
45              
46             =head1 NAME
47              
48             C<IO::Async::Stream> - event callbacks and write buffering for a stream
49             filehandle
50              
51             =head1 SYNOPSIS
52              
53             =for highlighter language=perl
54              
55             use IO::Async::Stream;
56              
57             use IO::Async::Loop;
58             my $loop = IO::Async::Loop->new;
59              
60             my $stream = IO::Async::Stream->new(
61             read_handle => \*STDIN,
62             write_handle => \*STDOUT,
63              
64             on_read => sub {
65             my ( $self, $buffref, $eof ) = @_;
66              
67             while( $$buffref =~ s/^(.*\n)// ) {
68             print "Received a line $1";
69             }
70              
71             if( $eof ) {
72             print "EOF; last partial line is $$buffref\n";
73             }
74              
75             return 0;
76             }
77             );
78              
79             $loop->add( $stream );
80              
81             $stream->write( "An initial line here\n" );
82              
83             =head1 DESCRIPTION
84              
85             This subclass of L<IO::Async::Handle> contains a filehandle that represents
86             a byte-stream. It provides buffering for both incoming and outgoing data. It
87             invokes the C<on_read> handler when new data is read from the filehandle. Data
88             may be written to the filehandle by calling the C<write> method.
89              
90             This class is suitable for any kind of filehandle that provides a
91             possibly-bidirectional reliable byte stream, such as a pipe, TTY, or
92             C socket (such as TCP or a byte-oriented UNIX local socket). For
93             datagram or raw message-based sockets (such as UDP) see instead
94             L.
95              
96             =cut
97              
98             =head1 EVENTS
99              
100             The following events are invoked, either using subclass methods or CODE
101             references in parameters:
102              
103             =head2 $ret = on_read \$buffer, $eof
104              
105             Invoked when more data is available in the internal receiving buffer.
106              
107             The first argument is a reference to a plain perl string. The code should
108             inspect and remove any data it likes, but is not required to remove all, or
109             indeed any of the data. Any data remaining in the buffer will be preserved for
110             the next call, the next time more data is received from the handle.
111              
112             In this way, it is easy to implement code that reads records of some form when
113             completed, but ignores partially-received records, until all the data is
114             present. If the handler wishes to be immediately invoked a second time, to have
115             another attempt at consuming more content, it should return C<1>. Otherwise,
116             it should return C<0>, and the handler will next be invoked when more data has
117             arrived from the underlying read handle and appended to the buffer. This makes
118             it easy to implement code that handles multiple incoming records at the same
119             time. Alternatively, if the handler function already attempts to consume as
120             much as possible from the buffer, it will have no need to return C<1> at all.
121             See the examples at the end of this documentation for more detail.
122              
123             The second argument is a scalar indicating whether the stream has reported an
124             end-of-file (EOF) condition. A reference to the buffer is passed to the
125             handler in the usual way, so it may inspect data contained in it. Once the
126             handler returns a false value, it will not be called again, as the handle is
127             now at EOF and no more data can arrive.
128              
129             The C code may also dynamically replace itself with a new callback
130             by returning a CODE reference instead of C<0> or C<1>. The original callback
131             or method that the object first started with may be restored by returning
132             C. Whenever the callback is changed in this way, the new code is called
133             again; even if the read buffer is currently empty. See the examples at the end
134             of this documentation for more detail.
135              
136             The C method can be used to insert new, temporary handlers that
137             take precedence over the global C handler. This event is only used if
138             there are no further pending handlers created by C.
139              
140             =head2 on_read_eof
141              
142             Optional. Invoked when the read handle indicates an end-of-file (EOF)
143             condition. If there is any data in the buffer still to be processed, the
144             C event will be invoked first, before this one.
145              
146             =head2 on_write_eof
147              
148             Optional. Invoked when the write handle indicates an end-of-file (EOF)
149             condition. Note that this condition can only be detected after a C
150             syscall returns the C error. If there is no data pending to be written
151             then it will not be detected yet.
152              
153             =head2 on_read_error $errno
154              
155             Optional. Invoked when the C method on the read handle fails.
156              
157             =head2 on_write_error $errno
158              
159             Optional. Invoked when the C method on the write handle fails.
160              
161             The C and C handlers are passed the value of
162             C<$!> at the time the error occurred. (The C<$!> variable itself, by its
163             nature, may have changed from the original error by the time this handler
164             runs so it should always use the value passed in).
165              
166             If an error occurs when the corresponding error callback is not supplied, and
167             there is not a handler for it, then the C method is called instead.
168              
169             =head2 on_read_high_watermark $length
170              
171             =head2 on_read_low_watermark $length
172              
173             Optional. Invoked when the read buffer grows larger than the high watermark
174             or smaller than the low watermark respectively. These are edge-triggered
175             events; they will only be triggered once per crossing, not continuously while
176             the buffer remains above or below the given limit.
177              
178             If these event handlers are not defined, the default behaviour is to disable
179             read-ready notifications if the read buffer grows larger than the high
180             watermark (so as to avoid it growing arbitrarily if nothing is consuming it),
181             and re-enable notifications again once something has read enough to cause it to
182             drop. If these events are overridden, the overriding code will have to perform
183             this behaviour if required, by using
184              
185             $self->want_readready_for_read(...);
186              
187             =head2 on_outgoing_empty
188              
189             Optional. Invoked when the writing data buffer becomes empty.
190              
191             =head2 on_writeable_start
192              
193             =head2 on_writeable_stop
194              
195             Optional. These two events inform when the filehandle becomes writeable, and
196             when it stops being writeable. C is invoked by the
197             C event if previously it was known to be not writeable.
198             C is invoked after a C operation fails with
199             C or C. These two events track the writeability state,
200             and ensure that only state changes cause events to be invoked. A stream starts
201             off being presumed writeable, so the first of these events to be observed will
202             be C.
203              
204             =cut
205              
206             sub _init # Fills in the default per-instance state of a stream object.
207             { # NOTE(review): the caller is not visible here; presumably the base-class constructor - confirm
208 712     712   2583 my $self = shift;
209              
210 712         8275 $self->{writequeue} = []; # Queue of Writers
211 712         4899 $self->{readqueue} = []; # Queue of Readers
212 712         5194 $self->{writeable} = 1; # "innocent until proven guilty" (by means of EAGAIN)
213 712         4265 $self->{readbuff} = ""; # Read buffer starts out empty
214              
215 712         8363 $self->{reader} = "_sysread"; # Default reader, given as a method name
216 712         3806 $self->{writer} = "_syswrite"; # Default writer, given as a method name
217              
218 712         4646 $self->{read_len} = $READLEN; # Default taken from the package-level tuneable
219 712         5310 $self->{write_len} = $WRITELEN; # Default taken from the package-level tuneable
220              
221 712         3645 $self->{want} = WANT_READ_FOR_READ; # Only the read-for-read interest bit is set initially
222              
223 712         2987 $self->{close_on_read_eof} = 1; # Enabled unless configured otherwise
224             }
225              
226             =head1 PARAMETERS
227              
228             The following named parameters may be passed to C or C:
229              
230             =head2 read_handle => IO
231              
232             The IO handle to read from. Must implement C and C methods.
233              
234             =head2 write_handle => IO
235              
236             The IO handle to write to. Must implement C and C methods.
237              
238             =head2 handle => IO
239              
240             Shortcut to specifying the same IO handle for both of the above.
241              
242             =head2 on_read => CODE
243              
244             =head2 on_read_error => CODE
245              
246             =head2 on_outgoing_empty => CODE
247              
248             =head2 on_write_error => CODE
249              
250             =head2 on_writeable_start => CODE
251              
252             =head2 on_writeable_stop => CODE
253              
254             CODE references for event handlers.
255              
256             =head2 autoflush => BOOL
257              
258             Optional. If true, the C method will attempt to write data to the
259             operating system immediately, without waiting for the loop to indicate the
260             filehandle is write-ready. This is useful, for example, on streams that should
261             contain up-to-date logging or console information.
262              
263             It currently defaults to false for any file handle, but future versions of
264             L may enable this by default on STDOUT and STDERR.
265              
266             =head2 read_len => INT
267              
268             Optional. Sets the buffer size for C calls. Defaults to 8 KiBytes.
269              
270             =head2 read_all => BOOL
271              
272             Optional. If true, attempt to read as much data from the kernel as possible
273             when the handle becomes readable. By default this is turned off, meaning at
274             most one fixed-size buffer is read. If there is still more data in the
275             kernel's buffer, the handle will still be readable, and will be read from
276             again.
277              
278             This behaviour allows multiple streams and sockets to be multiplexed
279             simultaneously, meaning that a large bulk transfer on one cannot starve other
280             filehandles of processing time. Turning this option on may improve bulk data
281             transfer rate, at the risk of delaying or stalling processing on other
282             filehandles.
283              
284             =head2 write_len => INT
285              
286             Optional. Sets the buffer size for C calls. Defaults to 8 KiBytes.
287              
288             =head2 write_all => BOOL
289              
290             Optional. Analogous to the C option, but for writing. When
291             C is enabled, this option only affects deferred writing if the
292             initial attempt failed due to buffer space.
293              
294             =head2 read_high_watermark => INT
295              
296             =head2 read_low_watermark => INT
297              
298             Optional. If defined, gives a way to implement flow control or other
299             behaviours that depend on the size of Stream's read buffer.
300              
301             If after more data is read from the underlying filehandle the read buffer is
302             now larger than the high watermark, the C event is
303             triggered (which, by default, will disable read-ready notifications and pause
304             reading from the filehandle).
305              
306             If after data is consumed by an C handler the read buffer is now
307             smaller than the low watermark, the C event is
308             triggered (which, by default, will re-enable read-ready notifications and
309             resume reading from the filehandle). For this to be possible, the read handler
310             would have to be one added by the C method or one of the
311             Future-returning C methods.
312              
313             By default these options are not defined, so this behaviour will not happen.
314             C may not be set to a larger value than
315             C, but it may be set to a smaller value, creating a
316             hysteresis region. If either option is defined then both must be.
317              
318             If these options are used with the default event handlers, be careful not to
319             cause deadlocks by having a high watermark sufficiently low that a single
320             C invocation might not consider it finished yet.
321              
322             =head2 reader => STRING|CODE
323              
324             =head2 writer => STRING|CODE
325              
326             Optional. If defined, gives the name of a method or a CODE reference to use
327             to implement the actual reading from or writing to the filehandle. These will
328             be invoked as
329              
330             $stream->reader( $read_handle, $buffer, $len );
331             $stream->writer( $write_handle, $buffer, $len );
332              
333             Each is expected to modify the passed buffer; C by appending to it,
334             C by removing a prefix from it. Each is expected to return a true
335             value on success, zero on EOF, or C with C<$!> set for errors. If not
336             provided, they will be substituted by implementations using C<sysread> and
337             C<syswrite> on the underlying handle, respectively.
338              
339             =head2 close_on_read_eof => BOOL
340              
341             Optional. Usually true, but if set to a false value then the stream will not
342             be C<close>d when an EOF condition occurs on read. This is normally not useful
343             as at that point the underlying stream filehandle is no longer useable, but it
344             may be useful for reading regular files, or interacting with TTY devices.
345              
346             =head2 encoding => STRING
347              
348             If supplied, sets the name of encoding of the underlying stream. If an
349             encoding is set, then the C method will expect to receive Unicode
350             strings and encodes them into bytes, and incoming bytes will be decoded into
351             Unicode strings for the C event.
352              
353             If an encoding is not supplied then C and C will work in byte
354             strings.
355              
356             I in order to handle reads of UTF-8 content or other
357             multibyte encodings, the code implementing the C event uses a feature
358             of L; the C flag. While this flag has existed for a
359             while and is used by the C<:encoding> PerlIO layer itself for similar
360             purposes, the flag is not officially documented by the C module. In
361             principle this undocumented feature could be subject to change, in practice I
362             believe it to be reasonably stable.
363              
364             This note applies only to the C event; data written using the
365             C method does not rely on any undocumented features of C.
366              
367             If a read handle is given, it is required that either an C callback
368             reference is configured, or that the object provides an C method. It
369             is optional whether either is true for C; if neither is
370             supplied then no action will be taken when the writing buffer becomes empty.
371              
372             An C handler may be supplied even if no read handle is yet given, to
373             be used when a read handle is eventually provided by the C
374             method.
375              
376             This condition is checked at the time the object is added to a Loop; it is
377             allowed to create a C object with a read handle but without
378             a C handler, provided that one is later given using C
379             before the stream is added to its containing Loop, either directly or by being
380             a child of another Notifier already in a Loop, or added to one.
381              
382             =cut
383              
384             sub configure # Applies named parameters: keys recognised here are stored on the object, the rest go to SUPER::configure.
385             { # Croaks on inconsistent watermarks, an unknown encoding, or a missing on_read handler; carps on a blocking autoflush handle.
386 993     993 1 8033 my $self = shift;
387 993         7917 my %params = @_;
388              
389 993         5777 for (qw( on_read on_outgoing_empty on_read_eof on_write_eof on_read_error
390             on_write_error on_writeable_start on_writeable_stop autoflush
391             read_len read_all write_len write_all on_read_high_watermark
392             on_read_low_watermark reader writer close_on_read_eof )) {
393 17874 100       41223 $self->{$_} = delete $params{$_} if exists $params{$_}; # Only keys actually passed are stored; others keep their current value
394             }
395              
396 993 100 66     11812 if( exists $params{read_high_watermark} or exists $params{read_low_watermark} ) {
397 1   33     6 my $high = delete $params{read_high_watermark} // $self->{read_high_watermark}; # Falls back to the stored value if absent or undef
398 1   33     4 my $low = delete $params{read_low_watermark} // $self->{read_low_watermark}; # Falls back to the stored value if absent or undef
399              
400 1 50 33     8 croak "Cannot set read_low_watermark without read_high_watermark" if defined $low and !defined $high;
401 1 50 33     6 croak "Cannot set read_high_watermark without read_low_watermark" if defined $high and !defined $low;
402              
403 1 50 33     8 croak "Cannot set read_low_watermark higher than read_high_watermark" if defined $low and defined $high and $low > $high;
      33        
404              
405 1         3 $self->{read_high_watermark} = $high;
406 1         2 $self->{read_low_watermark} = $low;
407              
408             # TODO: reassert levels if we've moved them
409             }
410              
411 993 100       2719 if( exists $params{encoding} ) {
412 2         6 my $encoding = delete $params{encoding};
413 2         14 my $obj = find_encoding( $encoding ); # Resolve the encoding name to an Encode object
414 2 50       115 defined $obj or croak "Cannot handle an encoding of '$encoding'";
415 2         7 $self->{encoding} = $obj; # The resolved object is stored, not the name
416             }
417              
418 993         19540 $self->SUPER::configure( %params ); # Everything not consumed above is left to the base class
419              
420 993 100 100     3551 if( $self->loop and $self->read_handle ) { # Already in a loop with a read handle: an on_read handler must exist now
421 5 50       16 $self->can_event( "on_read" ) or
422             croak 'Expected either an on_read callback or to be able to ->on_read';
423             }
424              
425 993 100 100     5471 if( $self->{autoflush} and my $write_handle = $self->write_handle ) {
426 62 50       555 carp "An IO::Async::Stream with autoflush needs an O_NONBLOCK write handle"
427             if $write_handle->blocking; # Warning only; configuration still succeeds
428             }
429             }
430              
431             sub _add_to_loop # Checks the on_read requirement, defers to the base class, then requests write-readiness if writes are queued.
432             { # NOTE(review): the caller is not visible here; presumably invoked when the stream joins a Loop - confirm
433 703     703   1463 my $self = shift;
434              
435 703 100       2371 if( defined $self->read_handle ) {
436 592 100       13584 $self->can_event( "on_read" ) or
437             croak 'Expected either an on_read callback or to be able to ->on_read';
438             }
439              
440 702         7996 $self->SUPER::_add_to_loop( @_ );
441              
442 702 100       6028 if( !$self->_is_empty ) { # Data was queued before the stream was added
443 39         200 $self->want_writeready_for_write( 1 );
444             }
445             }
446              
447             =head1 METHODS
448              
449             The following methods documented in C expressions return L
450             instances.
451              
452             =cut
453              
454             =head2 want_readready_for_read
455              
456             =head2 want_readready_for_write
457              
458             $stream->want_readready_for_read( $set );
459              
460             $stream->want_readready_for_write( $set );
461              
462             Mutators for the C property on L, which
463             control whether the C or C behaviour should be continued once the
464             filehandle becomes ready for read.
465              
466             Normally, C is always true (though the read watermark
467             behaviour can modify it), and C is not used.
468             However, if a custom C function is provided, it may find this useful
469             for being invoked again if it cannot proceed with a write operation until the
470             filehandle becomes readable (such as during transport negotiation or SSL key
471             management, for example).
472              
473             =cut
474              
475             sub want_readready_for_read # Sets or clears the WANT_READ_FOR_READ bit according to the truth of $set.
476             {
477 0     0 1 0 my $self = shift;
478 0         0 my ( $set ) = @_;
479 0 0       0 $set ? ( $self->{want} |= WANT_READ_FOR_READ ) : ( $self->{want} &= ~WANT_READ_FOR_READ );
480              
481 0 0       0 $self->want_readready( $self->{want} & WANT_ANY_READ ) if $self->read_handle; # Passes on the combined read interest (either read bit); skipped without a read handle
482             }
483              
484             sub want_readready_for_write # Sets or clears the WANT_READ_FOR_WRITE bit according to the truth of $set.
485             {
486 1     1 1 412 my $self = shift;
487 1         3 my ( $set ) = @_;
488 1 50       6 $set ? ( $self->{want} |= WANT_READ_FOR_WRITE ) : ( $self->{want} &= ~WANT_READ_FOR_WRITE );
489              
490 1 50       5 $self->want_readready( $self->{want} & WANT_ANY_READ ) if $self->read_handle; # Passes on the combined read interest (either read bit); skipped without a read handle
491             }
492              
493             =head2 want_writeready_for_read
494              
495             =head2 want_writeready_for_write
496              
497             $stream->want_writeready_for_write( $set );
498              
499             $stream->want_writeready_for_read( $set );
500              
501             Mutators for the C property on L, which
502             control whether the C or C behaviour should be continued once the
503             filehandle becomes ready for write.
504              
505             Normally, C is managed by the C method and
506             associated flushing, and C is not used. However, if
507             a custom C function is provided, it may find this useful for being
508             invoked again if it cannot proceed with a read operation until the filehandle
509             becomes writable (such as during transport negotiation or SSL key management,
510             for example).
511              
512             =cut
513              
514             sub want_writeready_for_write # Sets or clears the WANT_WRITE_FOR_WRITE bit according to the truth of $set.
515             {
516 221     221 1 721 my $self = shift;
517 221         590 my ( $set ) = @_;
518 221 100       1172 $set ? ( $self->{want} |= WANT_WRITE_FOR_WRITE ) : ( $self->{want} &= ~WANT_WRITE_FOR_WRITE );
519              
520 221 100       789 $self->want_writeready( $self->{want} & WANT_ANY_WRITE ) if $self->write_handle; # Passes on the combined write interest (either write bit); skipped without a write handle
521             }
522              
523             sub want_writeready_for_read # Sets or clears the WANT_WRITE_FOR_READ bit according to the truth of $set.
524             {
525 1     1 1 676 my $self = shift;
526 1         3 my ( $set ) = @_;
527 1 50       8 $set ? ( $self->{want} |= WANT_WRITE_FOR_READ ) : ( $self->{want} &= ~WANT_WRITE_FOR_READ );
528              
529 1 50       7 $self->want_writeready( $self->{want} & WANT_ANY_WRITE ) if $self->write_handle; # Passes on the combined write interest (either write bit); skipped without a write handle
530             }
531              
532             # FUNCTION not method
533             sub _nonfatal_error # Takes an errno value as its only argument (plain function, no invocant).
534             {
535 9     9   25 my ( $errno ) = @_;
536              
537 9   66     113 return $errno == EAGAIN || # True only for these three errno values
538             $errno == EWOULDBLOCK ||
539             $errno == EINTR;
540             }
541              
542             sub _is_empty # Reports whether the write queue currently holds no elements.
543             {
544 1321     1321   2314 my $self = shift;
545 1321         2010 return !@{ $self->{writequeue} }; # Array in boolean context: true result when the queue is empty
  1321         14576  
546             }
547              
548             =head2 close
549              
550             $stream->close;
551              
552             A synonym for C<close_when_empty>. This should not be used when the deferred
553             wait behaviour is required, as the behaviour of C<close> may change in a
554             future version of L<IO::Async>. Instead, call C<close_when_empty> directly.
555              
556             =cut
557              
558             sub close # Delegates entirely to close_when_empty.
559             {
560 8     8 1 1192 my $self = shift;
561 8         82 $self->close_when_empty;
562             }
563              
564             =head2 close_when_empty
565              
566             $stream->close_when_empty;
567              
568             If the write buffer is empty, this method calls C<close> on the underlying IO
569             handles, and removes the stream from its containing loop. If the write buffer
570             still contains data, then this is deferred until the buffer is empty. This is
571             intended for "write-then-close" one-shot streams.
572              
573             $stream->write( "Here is my final data\n" );
574             $stream->close_when_empty;
575              
576             Because of this deferred nature, it may not be suitable for error handling.
577             See instead the C<close_now> method.
578              
579             =cut
580              
581             sub close_when_empty # Closes via the base class at once if nothing is queued; otherwise only records the request.
582             {
583 173     173 1 4502 my $self = shift;
584              
585 173 100       490 return $self->SUPER::close if $self->_is_empty;
586              
587 8         1945 $self->{stream_closing} = 1; # NOTE(review): the code that acts on this flag is not visible here; presumably the write-flush path - confirm
588             }
589              
590             =head2 close_now
591              
592             $stream->close_now;
593              
594             This method immediately closes the underlying IO handles and removes the
595             stream from the containing loop. It will not wait to flush the remaining data
596             in the write buffer.
597              
598             =cut
599              
600             sub close_now # Fails every queued write, discards the queue, then closes via the base class.
601             {
602 478     478 1 931 my $self = shift;
603              
604 478         1035 foreach ( @{ $self->{writequeue} } ) {
  478         1852  
605 2 100       70 $_->on_error->( $self, "stream closing" ) if $_->on_error; # Only Writers that carry an on_error callback are notified
606             }
607              
608 478         905 undef @{ $self->{writequeue} }; # Empties the queue in place
  478         1366  
609 478         1658 undef $self->{stream_closing}; # Clears any pending close_when_empty request
610              
611 478         3934 $self->SUPER::close;
612             }
613              
614             =head2 is_read_eof
615              
616             =head2 is_write_eof
617              
618             $eof = $stream->is_read_eof;
619              
620             $eof = $stream->is_write_eof;
621              
622             Returns true after an EOF condition is reported on either the read or the
623             write handle, respectively.
624              
625             =cut
626              
627             sub is_read_eof # Accessor for the stored read_eof field.
628             {
629 2     2 1 57 my $self = shift;
630 2         14 return $self->{read_eof}; # Not initialised by _init, so undef until assigned; the assigning code is not visible here
631             }
632              
633             sub is_write_eof # Accessor for the stored write_eof field.
634             {
635 2     2 1 190 my $self = shift;
636 2         19 return $self->{write_eof}; # Not initialised by _init, so undef until assigned; the assigning code is not visible here
637             }
638              
639             =head2 write
640              
641             $stream->write( $data, %params );
642              
643             This method adds data to the outgoing data queue, or writes it immediately,
644             according to the C<autoflush> parameter.
645              
646             If the C<autoflush> option is set, this method will try immediately to write
647             the data to the underlying filehandle. If this completes successfully then it
648             will have been written by the time this method returns. If it fails to write
649             completely, then the data is queued as if C<autoflush> were not set, and will
650             be flushed as normal.
651              
652             C<$data> can either be a plain string, a L, or a CODE reference. If it
653             is a plain string it is written immediately. If it is not, its value will be
654             used to generate more C<$data> values, eventually leading to strings to be
655             written.
656              
657             If C<$data> is a C, the Stream will wait until it is ready, and take
658             the single value it yields.
659              
660             If C<$data> is a CODE reference, it will be repeatedly invoked to generate new
661             values. Each time the filehandle is ready to write more data to it, the
662             function is invoked. Once the function has finished generating data it should
663             return undef. The function is passed the Stream object as its first argument.
664              
665             It is allowed that C<Future>s yield CODE references, or CODE references return
666             C<Future>s, as well as plain strings.
667              
668             For example, to stream the contents of an existing opened filehandle:
669              
670             open my $fileh, "<", $path or die "Cannot open $path - $!";
671              
672             $stream->write( sub {
673             my ( $stream ) = @_;
674              
675             sysread $fileh, my $buffer, 8192 or return;
676             return $buffer;
677             } );
678              
679             Takes the following optional named parameters in C<%params>:
680              
681             =over 8
682              
683             =item write_len => INT
684              
685             Overrides the C<write_len> parameter for the data written by this call.
686              
687             =item on_write => CODE
688              
689             A CODE reference which will be invoked after every successful C<syswrite>
690             operation on the underlying filehandle. It will be passed the number of bytes
691             that were written by this call, which may not be the entire length of the
692             buffer - if it takes more than one C<syswrite> operation to empty the buffer
693             then this callback will be invoked multiple times.
694              
695             $on_write->( $stream, $len );
696              
697             =item on_flush => CODE
698              
699             A CODE reference which will be invoked once the data queued by this C<write>
700             call has been flushed. This will be invoked even if the buffer itself is not
701             yet empty; if more data has been queued since the call.
702              
703             $on_flush->( $stream );
704              
705             =item on_error => CODE
706              
707             A CODE reference which will be invoked if a C<syswrite> error happens while
708             performing this write. Invoked as for the C<Stream>'s C<on_write_error> event.
709              
710             $on_error->( $stream, $errno );
711              
712             =back
713              
714             If the object is not yet a member of a loop and doesn't yet have a
715             C<write_handle>, then calls to the C<write> method will simply queue the data
716             and return. It will be flushed when the object is added to the loop.
717              
718             If C<$data> is a defined but empty string, the write is still queued, and the
719             C<on_flush> continuation will be invoked, if supplied. This can be used to
720             obtain a marker, to invoke some code once the output queue has been flushed up
721             to this point.
722              
723             =head2 write (scalar)
724              
725             await $stream->write( ... );
726              
727             If called in non-void context, this method returns a L<Future> which will
728             complete (with no value) when the write operation has been flushed. This may
729             be used as an alternative to, or combined with, the C<on_flush> callback.
730              
731             =cut
732              
733             sub _syswrite
734             {
                # Writer helper, called as $self->_syswrite( $handle, $buffer, $len ).
                # Writes up to $len bytes of the buffer with $handle->syswrite and
                # returns its result. The buffer is accessed through the @_ alias
                # ($_[1]) so that the caller's own scalar is modified in place.
735 164     164   11918 my $self = shift;
736 164         604 my ( $handle, undef, $len ) = @_;
737              
738 164         2442 my $written = $handle->syswrite( $_[1], $len );
739 164 100       5594 return $written if !$written; # zero or undef
740              
                # Remove the bytes that were written from the front of the
                # caller's buffer.
741 156         569 substr( $_[1], 0, $written ) = "";
742 156         582 return $written;
743             }
744              
745             sub _flush_one_write
746             {
                # Makes one attempt to write the head of the write queue.
                # Returns 1 when progress was made (data was handed to the writer,
                # or an exhausted CODE generator was removed from the queue) and 0
                # when nothing more can be done right now (head Future not ready,
                # or the writer reported an error).
                # NOTE(review): callers in this file check _is_empty first; this
                # sub appears to assume the queue is non-empty - confirm.
747 175     175   659 my $self = shift;
748              
749 175         718 my $writequeue = $self->{writequeue};
750              
                # Resolve a reference-valued head (CODE generator or Future) until
                # the head entry holds a plain string.
751 175         421 my $head;
752 175   66     4396 while( $head = $writequeue->[0] and ref $head->data ) {
753 18 100 33     1171 if( ref $head->data eq "CODE" ) {
    50          
                # Ask the generator for its next value; undef means it is
                # finished, so its on_flush fires and the entry is dropped.
754 12         373 my $data = $head->data->( $self );
755 12 100       1578 if( !defined $data ) {
756 5 50       160 $head->on_flush->( $self ) if $head->on_flush;
757 5         390 shift @$writequeue;
758 5         144 return 1;
759             }
760 7 100 100     43 if( !ref $data and my $encoding = $self->{encoding} ) {
761 1         25 $data = $encoding->encode( $data );
762             }
                # Queue the generated value ahead of the generator itself. It
                # inherits writelen and on_write, but not on_flush or on_error.
                # NOTE(review): because on_error is not passed on, a write error
                # on generated data will not reach the generator entry's
                # on_error callback - confirm whether that is intended.
763 7         223 unshift @$writequeue, my $new = Writer(
764             $data, $head->writelen, $head->on_write, undef, undef, 0
765             );
766 7         488 next;
767             }
768             elsif( blessed $head->data and $head->data->isa( "Future" ) ) {
769 6         531 my $f = $head->data;
770 6 100       55 if( !$f->is_ready ) {
                # Register the on_ready callback only once per entry; the
                # watching field records that it has been installed.
771 2 50       88 return 0 if $head->watching;
772 2     2   40 $f->on_ready( sub { $self->_flush_one_write } );
  2         751  
773 2         124 $head->watching++;
774 2         24 return 0;
775             }
                # The Future is ready: replace the entry's data with its result
                # (encoded first if it is a plain string and an encoding is set).
776 4         47 my $data = $f->result;
777 4 100 100     93 if( !ref $data and my $encoding = $self->{encoding} ) {
778 1         7 $data = $encoding->encode( $data );
779             }
780 4         126 $head->data = $data;
781 4         49 next;
782             }
783             else {
784 0         0 die "Unsure what to do with reference ".ref($head->data)." in write queue";
785             }
786             }
787              
                # Coalesce following plain-string entries into the head when they
                # share its writelen, neither has an on_write callback and the head
                # has no on_flush callback of its own.
                # NOTE(review): on_error is neither checked nor copied here, so the
                # second entry's on_error callback is discarded by the merge -
                # confirm whether that is intended.
788 168         14377 my $second;
789 168   100     861 while( $second = $writequeue->[1] and
      66        
      66        
      33        
      33        
790             !ref $second->data and
791             $head->writelen == $second->writelen and
792             !$head->on_write and !$second->on_write and
793             !$head->on_flush ) {
794 1         193 $head->data .= $second->data;
795 1         29 $head->on_write = $second->on_write;
796 1         27 $head->on_flush = $second->on_flush;
797 1         13 splice @$writequeue, 1, 1, ();
798             }
799              
800 168 50       3945 die "TODO: head data does not contain a plain string" if ref $head->data;
801              
802 168 50       1594 if( $IO::Async::Debug::DEBUG > 1 ) {
803 0         0 my $data = substr $head->data, 0, $head->writelen;
804 0         0 $self->debug_printf( "WRITE len=%d", length $data );
805 0 0       0 IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sw};
806             }
807              
                # Invoke the configured writer method (stored in $self->{writer})
                # with the write handle, the head's data and its writelen.
808 168         1205 my $writer = $self->{writer};
809 168         1463 my $len = $self->$writer( $self->write_handle, $head->data, $head->writelen );
810              
811 168 100       626 if( !defined $len ) {
                # Copy $! straight away, before any later call can overwrite it.
812 3         27 my $errno = $!;
813              
                # The handle would block: report the writeable -> not-writeable
                # transition once and clear the flag.
814 3 100 66     22 if( $errno == EAGAIN or $errno == EWOULDBLOCK ) {
815 1 50       10 $self->maybe_invoke_event( on_writeable_stop => ) if $self->{writeable};
816 1         8 $self->{writeable} = 0;
817             }
818              
                # _nonfatal_error is defined elsewhere in this file.
819 3 100       11 return 0 if _nonfatal_error( $errno );
820              
821 2 50       11 $self->debug_printf( "WRITE err=%d/%s", $errno, $errno ) if $IO::Async::Debug::DEBUG > 1;
822              
                # EPIPE is recorded as EOF on the write side.
823 2 100       7 if( $errno == EPIPE ) {
824 1         19 $self->debug_printf( "WRITE-EOF" );
825 1         2 $self->{write_eof} = 1;
826 1         5 $self->maybe_invoke_event( on_write_eof => );
827             }
828              
                # Per-write on_error first, then the object-level on_write_error
                # event; close_now runs when maybe_invoke_event returns false.
829 2 50       114 $head->on_error->( $self, $errno ) if $head->on_error;
830 2 100       10 $self->maybe_invoke_event( on_write_error => $errno )
831             or $self->close_now;
832              
833 2         31 return 0;
834             }
835              
836 165 100 66     986 $METRICS and $METRICS->inc_counter_by( stream_written => $len ) if $len;
837              
838 165 100       9573 if( my $on_write = $head->on_write ) {
839 3         33 $on_write->( $self, $len );
840             }
841              
                # An empty head means this entry is fully written: fire on_flush
                # and remove it. This relies on the writer having removed the
                # written bytes from the head's data, as _syswrite does.
842 165 100       4720 if( !length $head->data ) {
843 155 100       4441 $head->on_flush->( $self ) if $head->on_flush;
844 155         1663 shift @{ $self->{writequeue} };
  155         596  
845             }
846              
847 165         6448 return 1;
848             }
849              
850             sub write
851             {
                # Queues $data for writing; recognised %params keys are on_write,
                # on_flush, on_error and write_len. With a write handle present it
                # either flushes immediately (autoflush) or asks for
                # write-readiness. In non-void context it returns a Future that is
                # completed by the on_flush wrapper and failed by the on_error
                # wrapper below.
852 167     167 1 12867 my $self = shift;
853 167         739 my ( $data, %params ) = @_;
854              
                # A closing stream accepts no further data: warn and return.
855 167 50 0     829 carp "Cannot write data to a Stream that is closing" and return if $self->{stream_closing};
856              
857             # Allow writes without a filehandle if we're not yet in a Loop, just don't
858             # try to flush them
859 167         1262 my $handle = $self->write_handle;
860              
861 167 100 100     979 croak "Cannot write data to a Stream with no write_handle" if !$handle and $self->loop;
862              
                # Only plain strings are encoded here; references (CODE, Future)
                # are left for _flush_one_write to resolve and encode.
863 166 100 100     1274 if( !ref $data and my $encoding = $self->{encoding} ) {
864 2         13 $data = $encoding->encode( $data );
865             }
866              
867 166         405 my $on_write = delete $params{on_write};
868 166         333 my $on_flush = delete $params{on_flush};
869 166         365 my $on_error = delete $params{on_error};
870              
                # In non-void context wrap the caller's callbacks so that the
                # returned Future tracks this write. This requires a Loop, because
                # the Future is created by $loop->new_future.
871 166         294 my $f;
872 166 100       555 if( defined wantarray ) {
873 3         7 my $orig_on_flush = $on_flush;
874 3         7 my $orig_on_error = $on_error;
875              
876 3 50       12 my $loop = $self->loop or
877             croak "Cannot ->write data returning a Future to a Stream not in a Loop";
878 3         19 $f = $loop->new_future;
879             $on_flush = sub {
880 1     1   48 $f->done;
881 1 50       65 $orig_on_flush->( @_ ) if $orig_on_flush;
882 3         26 };
883             $on_error = sub {
884 3     3   114 my $self = shift;
885 3         11 my ( $errno ) = @_;
886              
887 3 100       16 $f->fail( "write failed: $errno", syswrite => $errno ) unless $f->is_ready;
888              
889 3 50       171 $orig_on_error->( $self, @_ ) if $orig_on_error;
890 3         18 };
891             }
892              
                # A per-call write_len takes precedence over the object's setting.
893 166   33     4521 my $write_len = $params{write_len} // $self->{write_len};
894              
895 166         333 push @{ $self->{writequeue} }, Writer(
  166         2211  
896             $data, $write_len, $on_write, $on_flush, $on_error, 0
897             );
898              
                # NOTE(review): write_len is read above but never deleted from
                # %params, so passing it looks like it would trip this check -
                # confirm. The check also runs after the push above, so a croak
                # here leaves the data in the write queue - confirm intended.
899 166 50       22937 keys %params and croak "Unrecognised keys for ->write - " . join( ", ", keys %params );
900              
                # No write handle yet: the data simply stays queued.
901 166 100       666 return $f unless $handle;
902              
                # With autoflush, keep flushing until the queue is empty or a flush
                # attempt makes no progress.
903 130 100       420 if( $self->{autoflush} ) {
904 102   66     467 1 while !$self->_is_empty and $self->_flush_one_write;
905              
906 102 50       239 if( $self->_is_empty ) {
907 102         967 $self->want_writeready_for_write( 0 );
908 102         564 return $f;
909             }
910             }
911              
                # Data is still queued: ask to be told when the handle is writable.
912 28         118 $self->want_writeready_for_write( 1 );
913 28         131 return $f;
914             }
915              
916             sub on_write_ready
917             {
                # Write-readiness handler. Reports the transition to writeable
                # once, then runs whichever operations are flagged in
                # $self->{want} as waiting on write-readiness (the WANT_*
                # constants are defined outside this section).
918 68     68 1 238 my $self = shift;
919              
920 68 100       413 if( !$self->{writeable} ) {
921 1         7 $self->maybe_invoke_event( on_writeable_start => );
922 1         7 $self->{writeable} = 1;
923             }
924              
925 68 100       1295 $self->_do_write if $self->{want} & WANT_WRITE_FOR_WRITE;
926 68 100       638 $self->_do_read if $self->{want} & WANT_WRITE_FOR_READ;
927             }
928              
929             sub _do_write
930             {
                # Flushes the write queue. The loop continues only while the queue
                # is non-empty, the last flush made progress and write_all is set,
                # so without write_all at most one flush attempt is made per call.
931 68     68   285 my $self = shift;
932              
933 68   100     361 1 while !$self->_is_empty and $self->_flush_one_write and $self->{write_all};
      100        
934              
935             # All data successfully flushed
936 68 100       466 if( $self->_is_empty ) {
937 51         299 $self->want_writeready_for_write( 0 );
938              
939 51         382 $self->maybe_invoke_event( on_outgoing_empty => );
940              
                # stream_closing marks a close that was waiting for the queue to
                # drain; it can now be completed.
941 51 100       418 $self->close_now if $self->{stream_closing};
942             }
943             }
944              
945             sub _flush_one_read
946             {
                # Invokes one read handler on the read buffer: the on_read of the
                # head of the read queue if there is one, otherwise the object's
                # on_read event. Returns true when the caller should call it again.
947 1150     1150   2097 my $self = shift;
948 1150         2351 my ( $eof ) = @_;
949              
                # Set for the duration of this call; push_on_read checks it so
                # that it does not start a nested flush.
950 1150         4810 local $self->{flushing_read} = 1;
951              
952 1150         2254 my $readqueue = $self->{readqueue};
953              
954 1150         1821 my $ret;
955 1150 100 66     4501 if( $readqueue->[0] and my $on_read = $readqueue->[0]->on_read ) {
956 17         835 $ret = $on_read->( $self, \$self->{readbuff}, $eof );
957             }
958             else {
959 1133         7603 $ret = $self->invoke_event( on_read => \$self->{readbuff}, $eof );
960             }
961              
                # If the handler drained the buffer below the low watermark while
                # the high-watermark state was set, clear that state and fire the
                # low-watermark event.
962 1150 100 100     4944 if( defined $self->{read_low_watermark} and $self->{at_read_high_watermark} and
      66        
963             length $self->{readbuff} < $self->{read_low_watermark} ) {
964 1         3 undef $self->{at_read_high_watermark};
965 1         6 $self->invoke_event( on_read_low_watermark => length $self->{readbuff} );
966             }
967              
                # Interpret the handler's return value:
                #   CODE ref               - becomes the new head reader; go again
                #   undef with a queue     - head reader is finished; shift, go again
                #   anything else          - go again only if it is true and there
                #                            is buffered data or EOF was reached
968 1150 100 100     5018 if( ref $ret eq "CODE" ) {
    100          
969             # Replace the top CODE, or add it if there was none
970 1         7 $readqueue->[0] = Reader( $ret, undef );
971 1         87 return 1;
972             }
973             elsif( @$readqueue and !defined $ret ) {
974 13         25 shift @$readqueue;
975 13         172 return 1;
976             }
977             else {
978 1136   100     10313 return $ret && ( length( $self->{readbuff} ) > 0 || $eof );
979             }
980             }
981              
982             sub _sysread
983             {
                # Reader helper, called as $self->_sysread( $handle, $buffer, $len ).
                # Reads into the caller's buffer through the @_ alias ($_[1]) and
                # returns the result of $handle->sysread unchanged.
984 941     941   2062 my $self = shift;
985 941         2741 my ( $handle, undef, $len ) = @_;
986 941         8160 return $handle->sysread( $_[1], $len );
987             }
988              
989             sub on_read_ready
990             {
                # Read-readiness handler: runs whichever operations are flagged in
                # $self->{want} as waiting on read-readiness (the WANT_* constants
                # are defined outside this section).
991 940     940 1 2123 my $self = shift;
992              
993 940 50       8572 $self->_do_read if $self->{want} & WANT_READ_FOR_READ;
994 940 100       11999 $self->_do_write if $self->{want} & WANT_READ_FOR_WRITE;
995             }
996              
997             sub _do_read
998             {
                # Reads from the read handle using the configured reader method
                # (stored in $self->{reader}), appends the data to the read buffer
                # and runs the read handlers. It loops only when read_all is set,
                # and handles read errors, EOF and the high watermark.
999 941     941   2002 my $self = shift;
1000              
1001 941         8688 my $handle = $self->read_handle;
1002 941         2944 my $reader = $self->{reader};
1003              
1004 941         1482 while(1) {
1005 945         1436 my $data;
1006 945         8277 my $len = $self->$reader( $handle, $data, $self->{read_len} );
1007              
1008 945 100       19546 if( !defined $len ) {
                # Copy $! straight away, before any later call can overwrite it.
1009 6         43 my $errno = $!;
1010              
                # _nonfatal_error is defined elsewhere in this file.
1011 6 100       21 return if _nonfatal_error( $errno );
1012              
1013 4 50       12 $self->debug_printf( "READ err=%d/%s", $errno, $errno ) if $IO::Async::Debug::DEBUG > 1;
1014              
                # close_now runs when maybe_invoke_event returns false.
1015 4 100       29 $self->maybe_invoke_event( on_read_error => $errno )
1016             or $self->close_now;
1017              
                # Fail every pending read Future, then empty the read queue.
1018 4         23 foreach ( @{ $self->{readqueue} } ) {
  4         14  
1019 1 50       40 $_->future->fail( "read failed: $errno", sysread => $errno ) if $_->future;
1020             }
1021 4         145 undef @{ $self->{readqueue} };
  4         17  
1022              
1023 4         11 return;
1024             }
1025              
1026 939 50       2828 if( $IO::Async::Debug::DEBUG > 1 ) {
1027 0         0 $self->debug_printf( "READ len=%d", $len );
1028 0 0       0 IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sr};
1029             }
1030              
1031 939 100 66     3544 $METRICS and $METRICS->inc_counter_by( stream_read => $len ) if $len;
1032              
                # A successful read of zero bytes is treated as EOF.
1033 939         10884 my $eof = $self->{read_eof} = ( $len == 0 );
1034              
                # Prepend bytes held over from the previous read, decode, and keep
                # what is left in $bytes for next time.
                # NOTE(review): presumably decode() with STOP_AT_PARTIAL trims
                # $bytes in place down to the undecoded tail - confirm against the
                # Encode documentation.
1035 939 100       3042 if( my $encoding = $self->{encoding} ) {
1036 4 100       19 my $bytes = defined $self->{bytes_remaining} ? $self->{bytes_remaining} . $data : $data;
1037 4         38 $data = $encoding->decode( $bytes, STOP_AT_PARTIAL );
1038 4         55 $self->{bytes_remaining} = $bytes;
1039             }
1040              
1041 939 100       4327 $self->{readbuff} .= $data if !$eof;
1042              
                # Run read handlers until one reports there is nothing more to do.
1043 939         4089 1 while $self->_flush_one_read( $eof );
1044              
1045 939 100       2704 if( $eof ) {
1046 476         3105 $self->debug_printf( "READ-EOF" );
1047 476         2643 $self->maybe_invoke_event( on_read_eof => );
1048 476 100       3734 $self->close_now if $self->{close_on_read_eof};
                # Complete every Future still pending with undef, then empty the
                # read queue.
1049 476         1054 foreach ( @{ $self->{readqueue} } ) {
  476         1629  
1050 0 0       0 $_->future->done( undef ) if $_->future;
1051             }
1052 476         786 undef @{ $self->{readqueue} };
  476         1238  
1053 476         1981 return;
1054             }
1055              
1056 463 100       1955 last unless $self->{read_all};
1057             }
1058              
                # Fire the high-watermark event only on the transition into that
                # state; at_read_high_watermark remembers that it has fired.
1059 459 100 66     1776 if( defined $self->{read_high_watermark} and length $self->{readbuff} >= $self->{read_high_watermark} ) {
1060             $self->{at_read_high_watermark} or
1061 1 50       8 $self->invoke_event( on_read_high_watermark => length $self->{readbuff} );
1062              
1063 1         6 $self->{at_read_high_watermark} = 1;
1064             }
1065             }
1066              
1067             sub on_read_high_watermark
1068             {
                # Handler for the high-watermark event: turns off interest in
                # read-readiness for reading.
1069 0     0 1 0 my $self = shift;
1070 0         0 $self->want_readready_for_read( 0 );
1071             }
1072              
1073             sub on_read_low_watermark
1074             {
                # Handler for the low-watermark event: turns interest in
                # read-readiness for reading back on.
1075 0     0 1 0 my $self = shift;
1076 0         0 $self->want_readready_for_read( 1 );
1077             }
1078              
1079             =head2 push_on_read
1080              
1081             $stream->push_on_read( $on_read );
1082              
1083             Pushes a new temporary C<on_read> handler to the end of the queue. This queue,
1084             if non-empty, is used to provide C<on_read> event handling code in preference
1085             to using the object's main event handler or method. New handlers can be
1086             supplied at any time, and they will be used in first-in first-out (FIFO)
1087             order.
1088              
1089             As with the main C<on_read> event handler, each can return a (defined) boolean
1090             to indicate if they wish to be invoked again or not, another C<CODE> reference
1091             to replace themself with, or C<undef> to indicate it is now complete and
1092             should be removed. When a temporary handler returns C<undef> it is shifted
1093             from the queue and the next one, if present, is invoked instead. If there are
1094             no more then the object's main handler is invoked instead.
1095              
1096             =cut
1097              
1098             sub push_on_read
1099             {
                # Appends a temporary on_read handler to the read queue, together
                # with the optional Future passed as the internal 'future' argument.
1100 13     13 1 38 my $self = shift;
1101 13         44 my ( $on_read, %args ) = @_;
1102             # %args undocumented for internal use
1103              
1104 13         26 push @{ $self->{readqueue} }, Reader( $on_read, $args{future} );
  13         63  
1105              
                # When called from inside a running read handler, the flush already
                # in progress will reach the new entry, so do not start a nested
                # one. Otherwise offer any data already buffered to the queue now.
1106             # TODO: Should this always defer?
1107 13 100       830 return if $self->{flushing_read};
1108 12   100     55 1 while length $self->{readbuff} and $self->_flush_one_read( 0 );
1109             }
1110              
1111             =head1 FUTURE-RETURNING READ METHODS
1112              
1113             The following methods all return a L<Future> which will become ready when
1114             enough data has been read by the Stream into its buffer. At this point, the
1115             data is removed from the buffer and given to the C<Future> object to complete
1116             it.
1117              
1118             my $string = await $stream->read_...
1119              
1120             Unlike the C<on_read> event handlers, these methods don't allow for access to
1121             "partial" results; they only provide the final result once it is ready.
1122              
1123             If a C<Future> is cancelled before it completes it is removed from the read
1124             queue without consuming any data; i.e. each C<Future> atomically either
1125             completes or is cancelled.
1126              
1127             Since it is possible to use a readable C<Stream> entirely using these
1128             C<Future>-returning methods instead of the C<on_read> event, it may be useful
1129             to configure a trivial return-false event handler to keep it from consuming
1130             any input, and to allow it to be added to a C<Loop> in the first place.
1131              
1132             my $stream = IO::Async::Stream->new( on_read => sub { 0 }, ... );
1133             $loop->add( $stream );
1134              
1135             my $f = $stream->read_...
1136              
1137             If a read EOF or error condition happens while there are read C<Future>s
1138             pending, they are all completed. In the case of a read EOF, they are done with
1139             C<undef>; in the case of a read error they are failed using the C<$!> error
1140             value as the failure.
1141              
1142             $f->fail( $message, sysread => $! )
1143              
1144             If a read EOF condition happens to the currently-processing read C<Future>, it
1145             will return a partial result. The calling code can detect this by the fact
1146             that the returned data is not complete according to the specification (too
1147             short in C<read_exactly>'s case, or lacking the ending pattern in
1148             C<read_until>'s case). Additionally, each C<Future> will yield the C<$eof>
1149             value in its results.
1150              
1151             my ( $string, $eof ) = await ...;
1152              
1153             =cut
1154              
1155             sub _read_future
1156             {
                # Creates the Future returned by the read_* methods. On
                # cancellation it re-runs the read handlers while they report
                # progress.
                # NOTE(review): presumably this lets the cancelled handler see
                # is_cancelled and return undef so it is shifted off the queue;
                # that only happens straight away if it is at the head - confirm.
1157 11     11   24 my $self = shift;
1158 11         40 my $f = $self->loop->new_future;
1159             $f->on_cancel( $self->_capture_weakself( sub {
                # The stream is held weakly; it may already be gone.
1160 1 50   1   5 my $self = shift or return;
1161 1         4 1 while $self->_flush_one_read;
1162 11         76 }));
1163 11         261 return $f;
1164             }
1165              
1166             =head2 read_atmost
1167              
1168             =head2 read_exactly
1169              
1170             ( $string, $eof ) = await $stream->read_atmost( $len );
1171              
1172             ( $string, $eof ) = await $stream->read_exactly( $len );
1173              
1174             Completes the C<Future> when the read buffer contains C<$len> or more
1175             characters of input. C<read_atmost> will also complete after the first
1176             invocation of C<on_read>, even if fewer characters are available, whereas
1177             C<read_exactly> will wait until at least C<$len> are available.
1178              
1179             =cut
1180              
1181             sub read_atmost
1182             {
                # Returns a Future completed on the first invocation of its queued
                # handler with up to $len characters taken from the front of the
                # buffer, plus the $eof flag.
1183 2     2 1 585 my $self = shift;
1184 2         5 my ( $len ) = @_;
1185              
1186 2         8 my $f = $self->_read_future;
1187             $self->push_on_read( sub {
1188 1     1   5 my ( undef, $buffref, $eof ) = @_;
                # Returning undef removes this handler from the read queue.
1189 1 50       11 return undef if $f->is_cancelled;
                # 4-argument substr both extracts and removes the data.
1190 1         19 $f->done( substr( $$buffref, 0, $len, "" ), $eof );
1191 1         66 return undef;
1192 2         17 }, future => $f );
1193 2         9 return $f;
1194             }
1195              
1196             sub read_exactly
1197             {
                # Returns a Future completed once the buffer holds at least $len
                # characters, or at EOF with whatever shorter data remains. It
                # yields the extracted string and the $eof flag.
1198 4     4 1 1258 my $self = shift;
1199 4         11 my ( $len ) = @_;
1200              
1201 4         13 my $f = $self->_read_future;
1202             $self->push_on_read( sub {
1203 4     4   14 my ( undef, $buffref, $eof ) = @_;
                # Returning undef removes this handler from the read queue.
1204 4 50       14 return undef if $f->is_cancelled;
                # Not enough data yet: return 0 to stay queued and wait for more.
1205 4 50 33     40 return 0 unless $eof or length $$buffref >= $len;
1206 4         25 $f->done( substr( $$buffref, 0, $len, "" ), $eof );
1207 4         292 return undef;
1208 4         25 }, future => $f );
1209 4         32 return $f;
1210             }
1211              
1212             =head2 read_until
1213              
1214             ( $string, $eof ) = await $stream->read_until( $end );
1215              
1216             Completes the C<Future> when the read buffer contains a match for C<$end>,
1217             which may either be a plain string or a compiled C<Regexp> reference. Yields
1218             the prefix of the buffer up to and including this match.
1219              
1220             =cut
1221              
1222             sub read_until
1223             {
                # Returns a Future completed with the buffer prefix up to and
                # including the first match of $until, or at EOF with the whole
                # remaining buffer when no match was found.
1224 4     4 1 1742 my $self = shift;
1225 4         10 my ( $until ) = @_;
1226              
                # A plain string is turned into a pattern that matches it literally.
1227 4 100       40 ref $until or $until = qr/\Q$until\E/;
1228              
1229 4         11 my $f = $self->_read_future;
1230             $self->push_on_read( sub {
1231 5     5   34 my ( undef, $buffref, $eof ) = @_;
                # Returning undef removes this handler from the read queue.
1232 5 100       17 return undef if $f->is_cancelled;
1233 4 100       48 if( $$buffref =~ $until ) {
    50          
                # $+[0] is the offset just past the end of the match.
1234 3         22 $f->done( substr( $$buffref, 0, $+[0], "" ), $eof );
1235 3         165 return undef;
1236             }
1237             elsif( $eof ) {
1238 0         0 $f->done( $$buffref, $eof ); $$buffref = "";
  0         0  
1239 0         0 return undef;
1240             }
1241             else {
                # No match yet: return 0 to stay queued and wait for more data.
1242 1         4 return 0;
1243             }
1244 4         27 }, future => $f );
1245 4         15 return $f;
1246             }
1247              
1248             =head2 read_until_eof
1249              
1250             ( $string, $eof ) = await $stream->read_until_eof;
1251              
1252             Completes the C<Future> when the stream is eventually closed at EOF, and
1253             yields all of the data that was available.
1254              
1255             =cut
1256              
1257             sub read_until_eof
1258             {
                # Returns a Future completed at EOF with the entire buffer contents
                # and the $eof flag; the buffer is emptied afterwards.
1259 1     1 1 614 my $self = shift;
1260              
1261 1         4 my $f = $self->_read_future;
1262             $self->push_on_read( sub {
1263 2     2   6 my ( undef, $buffref, $eof ) = @_;
                # Returning undef removes this handler from the read queue.
1264 2 50       10 return undef if $f->is_cancelled;
                # Until EOF, return 0 to stay queued while data accumulates.
1265 2 100       18 return 0 unless $eof;
1266 1         6 $f->done( $$buffref, $eof ); $$buffref = "";
  1         52  
1267 1         3 return undef;
1268 1         9 }, future => $f );
1269 1         4 return $f;
1270             }
1271              
1272             =head1 UTILITY CONSTRUCTORS
1273              
1274             =cut
1275              
1276             =head2 new_for_stdin
1277              
1278             =head2 new_for_stdout
1279              
1280             =head2 new_for_stdio
1281              
1282             $stream = IO::Async::Stream->new_for_stdin;
1283              
1284             $stream = IO::Async::Stream->new_for_stdout;
1285              
1286             $stream = IO::Async::Stream->new_for_stdio;
1287              
1288             Return a C<IO::Async::Stream> object preconfigured with the correct
1289             C<read_handle>, C<write_handle> or both.
1290              
1291             =cut
1292              
1293 1     1 1 17 sub new_for_stdin { shift->new( read_handle => \*STDIN, @_ ) } # calls new with read_handle => \*STDIN followed by the caller's arguments
1294 1     1 1 20 sub new_for_stdout { shift->new( write_handle => \*STDOUT, @_ ) } # calls new with write_handle => \*STDOUT followed by the caller's arguments
1295              
1296 1     1 1 264 sub new_for_stdio { shift->new( read_handle => \*STDIN, write_handle => \*STDOUT, @_ ) } # calls new with both handles preset, followed by the caller's arguments
1297              
1298             =head2 connect
1299              
1300             $future = $stream->connect( %args );
1301              
1302             A convenient wrapper for calling the C<connect> method on the underlying
1303             L<IO::Async::Loop> object, passing the C<socktype> hint as C<stream> if not
1304             otherwise supplied.
1305              
1306             =cut
1307              
1308             sub connect
1309             {
                # Delegates to the parent class's connect, passing
                # socktype => "stream" ahead of the caller's own arguments.
1310 0     0 1   my $self = shift;
1311 0           return $self->SUPER::connect( socktype => "stream", @_ );
1312             }
1313              
1314             =head1 DEBUGGING FLAGS
1315              
1316             The following flags in C<IO_ASYNC_DEBUG_FLAGS> enable extra logging:
1317              
1318             =over 4
1319              
1320             =item C<Sr>
1321              
1322             Log byte buffers as data is read from a Stream
1323              
1324             =item C<Sw>
1325              
1326             Log byte buffers as data is written to a Stream
1327              
1328             =back
1329              
1330             =cut
1331              
1332             =head1 EXAMPLES
1333              
1334             =head2 A line-based C<on_read> method
1335              
1336             The following C<on_read> method accepts incoming C<\n>-terminated lines and
1337             prints them to the program's C<STDOUT> stream.
1338              
1339             sub on_read
1340             {
1341             my $self = shift;
1342             my ( $buffref, $eof ) = @_;
1343              
1344             while( $$buffref =~ s/^(.*\n)// ) {
1345             print "Received a line: $1";
1346             }
1347              
1348             return 0;
1349             }
1350              
1351             Because a reference to the buffer itself is passed, it is simple to use a
1352             C<s///> regular expression on the scalar it points at, to both check if data
1353             is ready (i.e. a whole line), and to remove it from the buffer. Since it
1354             always removes as many complete lines as possible, it doesn't need invoking
1355             again when it has finished, so it can return a constant C<0>.
1356              
1357             =head2 Reading binary data
1358              
1359             This C<on_read> method accepts incoming records in 16-byte chunks, printing
1360             each one.
1361              
1362             sub on_read
1363             {
1364             my ( $self, $buffref, $eof ) = @_;
1365              
1366             if( length $$buffref >= 16 ) {
1367             my $record = substr( $$buffref, 0, 16, "" );
1368             print "Received a 16-byte record: $record\n";
1369              
1370             return 1;
1371             }
1372              
1373             if( $eof and length $$buffref ) {
1374             print "EOF: a partial record still exists\n";
1375             }
1376              
1377             return 0;
1378             }
1379              
1380             This time, rather than a C<while()> loop we have decided to have the handler
1381             just process one record, and use the C<return 1> mechanism to ask that the
1382             handler be invoked again if there still remains data that might contain
1383             another record; only stopping with C<return 0> when we know we can't find one.
1384              
1385             The 4-argument form of C<substr()> extracts the 16-byte record from the buffer
1386             and assigns it to the C<$record> variable, if there was enough data in the
1387             buffer to extract it.
1388              
1389             A lot of protocols use a fixed-size header, followed by a variable-sized body
1390             of data, whose size is given by one of the fields of the header. The following
1391             C<on_read> method extracts messages in such a protocol.
1392              
1393             sub on_read
1394             {
1395             my ( $self, $buffref, $eof ) = @_;
1396              
1397             return 0 unless length $$buffref >= 8; # "N n n" consumes 8 bytes
1398              
1399             my ( $len, $x, $y ) = unpack "N n n", $$buffref;
1400              
1401             return 0 unless length $$buffref >= 8 + $len;
1402              
1403             substr( $$buffref, 0, 8, "" );
1404             my $data = substr( $$buffref, 0, $len, "" );
1405              
1406             print "A record with values x=$x y=$y\n";
1407              
1408             return 1;
1409             }
1410              
1411             In this example, the header is C<unpack()>ed first, to extract the body
1412             length, and then the body is extracted. If the buffer does not have enough
1413             data yet for a complete message then C<0> is returned, and the buffer is left
1414             unmodified for next time. Only when there are enough bytes in total does it
1415             use C<substr()> to remove them.
1416              
1417             =head2 Dynamic replacement of C<on_read>
1418              
1419             Consider the following protocol (inspired by IMAP), which consists of
1420             C<\n>-terminated lines that may have an optional data block attached. The
1421             presence of such a data block, as well as its size, is indicated by the line
1422             prefix.
1423              
1424             sub on_read
1425             {
1426             my $self = shift;
1427             my ( $buffref, $eof ) = @_;
1428              
1429             if( $$buffref =~ s/^DATA (\d+):(.*)\n// ) {
1430             my $length = $1;
1431             my $line = $2;
1432              
1433             return sub {
1434             my $self = shift;
1435             my ( $buffref, $eof ) = @_;
1436              
1437             return 0 unless length $$buffref >= $length;
1438              
1439             # Take and remove the data from the buffer
1440             my $data = substr( $$buffref, 0, $length, "" );
1441              
1442             print "Received a line $line with some data ($data)\n";
1443              
1444             return undef; # Restore the original method
1445             }
1446             }
1447             elsif( $$buffref =~ s/^LINE:(.*)\n// ) {
1448             my $line = $1;
1449              
1450             print "Received a line $line with no data\n";
1451              
1452             return 1;
1453             }
1454             else {
1455             print STDERR "Unrecognised input\n";
1456             # Handle it somehow
1457             }
1458             }
1459              
1460             In the case where trailing data is supplied, a new temporary C<on_read>
1461             callback is provided in a closure. This closure captures the C<$length>
1462             variable so it knows how much data to expect. It also captures the C<$line>
1463             variable so it can use it in the event report. When this method has finished
1464             reading the data, it reports the event, then restores the original method by
1465             returning C<undef>.
1466              
1467             =head1 SEE ALSO
1468              
1469             =over 4
1470              
1471             =item *
1472              
1473             L<IO::Handle> - Supply object methods for I/O handles
1474              
1475             =back
1476              
1477             =head1 AUTHOR
1478              
1479             Paul Evans <leonerd@leonerd.org.uk>
1480              
1481             =cut
1482              
1483             0x55AA;