File Coverage

blib/lib/Future/Buffer.pm
Criterion Covered Total %
statement 120 122 98.3
branch 35 46 76.0
condition 24 57 42.1
subroutine 25 25 100.0
pod 10 10 100.0
total 214 260 82.3


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2020-2024 -- leonerd@leonerd.org.uk
5              
6             package Future::Buffer 0.06;
7              
8 8     8   2393584 use v5.14;
  8         38  
9 8     8   56 use warnings;
  8         101  
  8         605  
10              
11 8     8   5974 use Future;
  8         157494  
  8         2126  
12              
13 8     8   93 use Scalar::Util qw( weaken );
  8         24  
  8         11575  
14              
15             =head1 NAME
16              
17             C - a string buffer that uses Futures
18              
19             =head1 SYNOPSIS
20              
21             use Future::Buffer;
22              
23             use Future::AsyncAwait;
24             use Future::IO;
25              
26             my $buffer = Future::Buffer->new(
27             fill => sub { Future::IO->sysread( $socket, 8192 ) }
28             );
29              
30             async sub print_lines
31             {
32             while(1) {
33             my $line = await $buffer->read_until( "\n" );
34             chomp $line;
35              
36             say "Got a line: $line";
37             }
38             }
39              
40             await print_lines();
41              
42             =head1 DESCRIPTION
43              
44             Objects in this class provide a string buffer, on which operations return
45             L instances which will complete when data is available. Data can be
46             inserted into the buffer either in a push-based manner by calling the C
47             method, or in a pull-based manner by providing it with a C callback by
48             which it can request data itself. This flexibility allows the buffer to act as
49             an adapter between push- and pull-based providers and consumers.
50              
51             Each C-like method returns a L which will complete once there
52             are enough bytes in the buffer to satisfy the required condition. The buffer
53             behaves somewhat like a pipe, where bytes provided at the writing end (either
54             by the C method or the C callback) are eventually consumed at the
55             reading end by one of the C futures.
56              
57             Multiple C futures can remain pending at once, and will be completed in
58             the order they were created when more data is eventually available. Thus, any
59             call to the C method to provide more data can potentially result in
60             multiple futures becoming ready.
61              
62             I the buffer supports an end-of-file condition. The
63             L method or a C callback future yielding an empty result will
64             mark that the buffer is now closed. Once it has exhausted the remaining stored
65             data any further read futures will yield empty.
66              
67             =cut
68              
69             =head1 CONSTRUCTOR
70              
71             =cut
72              
73             =head2 new
74              
75             $buffer = Future::Buffer->new( %args );
76              
77             Returns a new L instance.
78              
79             Takes the following named arguments:
80              
81             =over 4
82              
83             =item fill => CODE
84              
85             $data = await $fill->();
86              
87             Optional callback which the buffer will invoke when it needs more data.
88              
89             Any read futures which are waiting on the fill future are constructed by using
90             the fill future as a prototype, ensuring they have the correct type.
91              
92             If the result is an empty list this will be treated as an end-of-file
93             notification and the buffer is closed.
94              
95             =back
96              
97             =cut
98              
99             sub new
100             {
101 11     11 1 1890849 my $class = shift;
102 11         46 my %args = @_;
103              
104             return bless {
105             pending => [],
106             data => "",
107             fill => $args{fill},
108 11         107 }, $class;
109             }
110              
111             =head1 METHODS
112              
113             =cut
114              
115             sub _fill
116             {
117 19     19   33 my $self = shift;
118              
119 19 100       85 return $self->{fill_f} if $self->{fill_f};
120              
121 15         56 my $fill = $self->{fill};
122              
123             # Arm the fill loop
124 15         48 my $f = $self->{fill_f} = $fill->(); # TODO: give it a size hint?
125              
126 15         280 weaken( my $weakself = $self );
127              
128             $f->on_done( sub {
129 13 50   13   3759 my $self = $weakself or return;
130              
131 13         37 undef $self->{fill_f};
132              
133 13 100       35 if( @_ ) {
134 11         25 my ( $data ) = @_;
135 11         34 $self->{data} .= $data;
136             }
137             else {
138 2         6 $self->{at_eof} = 1;
139             }
140              
141 13         76 $self->_invoke_pending;
142              
143 13 100       187 $self->_fill if @{ $self->{pending} };
  13         54  
144 15         151 });
145             }
146              
147             sub _new_read_future
148             {
149 33     33   66 my $self = shift;
150 33         72 my ( $code ) = @_;
151              
152 33         105 my $pending = $self->{pending};
153              
154             # First see if the buffer is already sufficient;
155 33 100 100     177 if( !@$pending and
156             ( my @ret = $code->( \$self->{data} ) ) ) {
157 10         43 return Future->done( @ret );
158             }
159              
160 23         50 my $f;
161 23 100 66     106 if( $self->{fill} and my $fill_f = $self->_fill ) {
162 16         377 $f = $fill_f->new;
163             }
164             else {
165 7         51 $f = Future->new;
166             }
167              
168 23         264 push @$pending, [ $code, $f ];
169              
170 23 100       85 $self->_invoke_pending if length $self->{data};
171              
172             $f->on_cancel( sub {
173 3   100 3   112 shift @$pending while @$pending and $pending->[0]->[1]->is_cancelled;
174 3 100 66     62 return if @$pending or !$self->{fill_f};
175              
176 1         6 $self->{fill_f}->cancel;
177 1         67 undef $self->{fill_f};
178 23         197 } );
179              
180 23         670 return $f;
181             }
182              
183             sub _invoke_pending
184             {
185 23     23   53 my $self = shift;
186              
187 23         53 my $pending = $self->{pending};
188              
189 23   100     151 while( @$pending and length $self->{data} ) {
190 20         173 my $p = $pending->[0];
191 20 50 0     108 shift @$pending and next if $p->[1]->is_cancelled;
192              
193 20 100       175 defined( my $ret = $p->[0]->( \$self->{data} ) )
194             or last;
195              
196 16         38 shift @$pending;
197 16         66 $p->[1]->done( $ret );
198             }
199 23   100     816 while( @$pending and $self->{at_eof} ) {
200 3         6 my $p = $pending->[0];
201 3 50 0     19 shift @$pending and next if $p->[1]->is_cancelled;
202              
203 3         21 shift @$pending;
204 3         16 $p->[1]->done();
205             }
206             }
207              
208             =head2 length
209              
210             $len = $buffer->length;
211              
212             Returns the length of the currently-stored data; that is, data that has been
213             provided by C calls or the C callback but not yet consumed by a
214             C future.
215              
216             =cut
217              
218 8     8 1 82 sub length :method { length $_[0]->{data} }
219              
220             =head2 is_empty
221              
222             $empty = $buffer->is_empty;
223              
224             Returns true if the stored length is zero.
225              
226             =cut
227              
228 6     6 1 28974 sub is_empty { shift->length == 0 }
229              
230             =head2 write
231              
232             $f = $buffer->write( $data );
233              
234             Appends to the stored data, invoking any pending C futures that are
235             outstanding and can now complete.
236              
237             Currently this method returns an already-completed C. Some later
238             version may implement a buffer maximum size, and choose not to complete this
239             future until there is enough space to accept the new data. For now it is safe
240             for the caller to ignore the return value, but it may become not so.
241              
242             =cut
243              
244             sub write
245             {
246 14     14 1 3677 my $self = shift;
247 14         65 $self->{data} .= $_[0];
248              
249 14 100       40 $self->_invoke_pending if @{ $self->{pending} };
  14         86  
250              
251 14         118 return Future->done;
252             }
253              
254             =head2 close
255              
256             $buffer->close;
257              
258             Marks that the buffer is now at EOF condition. Once any remaining buffered
259             content is consumed, any further read futures will all yield EOF condition.
260              
261             =cut
262              
263             sub close
264             {
265 1     1 1 6 my $self = shift;
266 1         3 $self->{at_eof} = 1;
267              
268 1 50       2 $self->_invoke_pending if @{ $self->{pending} };
  1         8  
269              
270 1         96 return Future->done;
271             }
272              
273             =head2 read_atmost
274              
275             $data = await $buffer->read_atmost( $len );
276              
277             Returns a future which will complete when there is some data available in the
278             buffer and will yield I the given length. Note that, analogous to
279             calling the C IO method on a filehandle, this can still complete and
280             yield a shorter length if less is currently available.
281              
282             If the stream is closed and there is no remaining data, the returned future
283             will yield empty.
284              
285             =cut
286              
287             sub read_atmost
288             {
289 27     27 1 16550 my $self = shift;
290 27         63 my ( $maxlen ) = @_;
291              
292             return $self->_new_read_future(
293             sub {
294 35     35   75 my ( $dref ) = @_;
295 35 100       142 return unless length $$dref;
296              
297 20         127 return substr( $$dref, 0, $maxlen, "" );
298             }
299 27         216 );
300             }
301              
302             =head2 read_exactly
303              
304             $data = await $buffer->read_exactly( $len );
305              
306             Returns a future which will complete when there is enough data available in
307             the buffer to yield exactly the length given.
308              
309             If the stream is closed and there is no remaining data, the returned future
310             will yield empty.
311              
312             =cut
313              
314             sub read_exactly
315             {
316 3     3 1 1451 my $self = shift;
317 3         10 my ( $len ) = @_;
318              
319             return $self->_new_read_future(
320             sub {
321 8     8   16 my ( $dref ) = @_;
322 8 100       40 return unless length $$dref >= $len;
323              
324 3         21 return substr( $$dref, 0, $len, "" );
325             }
326 3         29 );
327             }
328              
329             =head2 read_until
330              
331             $data = await $buffer->read_until( $pattern );
332              
333             Returns a future which will complete when the buffer contains a match for the
334             given pattern (which may either be a plain string or a compiled C).
335             The future will yield the contents of the buffer up to and including this
336             match.
337              
338             If the stream is closed and there is no remaining data, the returned future
339             will yield empty.
340              
341             For example, a C-like operation can be performed by
342              
343             $f = $buffer->read_until( "\x0d\x0a" );
344              
345             =cut
346              
347             sub read_until
348             {
349 2     2 1 317 my $self = shift;
350 2         7 my ( $pattern ) = @_;
351              
352 2 50       42 $pattern = qr/\Q$pattern/ unless ref $pattern eq "Regexp";
353              
354             return $self->_new_read_future(
355             sub {
356 4     4   9 my ( $dref ) = @_;
357 4 100       36 return unless $$dref =~ m/$pattern/;
358              
359 2         19 return substr( $$dref, 0, $+[0], "" );
360             }
361 2         45 );
362             }
363              
364             =head2 read_unpacked
365              
366             $data = await $buffer->read_unpacked( $pack_format );
367              
368             I
369              
370             Returns a future which will complete when the buffer contains enough data to
371             unpack all of the requested fields using the given C format. The
372             future will yield a list of all the fields extracted by the format.
373              
374             If the stream is closed and there is no remaining data, the returned future
375             will yield empty.
376              
377             Note that because the implementation is shamelessly stolen from
378             L the same limitations on what pack formats are
379             recognized will apply.
380              
381             =cut
382              
383             # Gratuitously stolen from IO::Handle::Packable
384              
385             use constant {
386 8         7291 BYTES_FMT_i => length( pack "i", 0 ),
387             BYTES_FMT_f => length( pack "f", 0 ),
388             BYTES_FMT_d => length( pack "d", 0 ),
389 8     8   80 };
  8         35  
390              
391             sub _length_of_packformat
392             {
393 1     1   3 my ( $format ) = @_;
394 1         3 local $_ = $format;
395              
396 1         2 my $bytes = 0;
397 1         5 while( length ) {
398 3         11 s/^\s+//;
399 3 50       9 length or last;
400              
401 3         6 my $this;
402              
403             # Basic template
404 3 0 100     25 s/^[aAcC]// and $this = 1 or
      50        
      66        
      0        
      33        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
405             s/^[sSnv]// and $this = 2 or
406             s/^[iI]// and $this = BYTES_FMT_i or
407             s/^[lLNV]// and $this = 4 or
408             s/^[qQ]// and $this = 8 or
409             s/^f// and $this = BYTES_FMT_f or
410             s/^d// and $this = BYTES_FMT_d or
411 0         0 die "TODO: unrecognised template char ${\substr $_, 0, 1}\n";
412              
413             # Ignore endian specifiers
414 3         8 s/^[<>]//;
415              
416             # Repeat count
417 3 50       12 s/^(\d+)// and $this *= $1;
418              
419 3         7 $bytes += $this;
420             }
421              
422 1         4 return $bytes;
423             }
424              
425             sub read_unpacked
426             {
427 1     1 1 50 my $self = shift;
428 1         3 my ( $format ) = @_;
429              
430 1         5 my $len = _length_of_packformat $format;
431             return $self->_new_read_future(
432             sub {
433 1     1   2 my ( $dref ) = @_;
434 1 50       5 return unless length $$dref >= $len;
435              
436 1         20 return unpack $format, substr( $$dref, 0, $len, "" );
437             }
438 1         10 );
439             }
440              
441             =head2 unread
442              
443             $buffer->unread( $data );
444              
445             I
446              
447             Prepends more data back into the buffer,
448              
449             It is uncommon to need this method, but it may be useful in certain situations
450             such as when it is hard to determine upfront how much data needs to be read
451             for a single operation, and it turns out too much was read. The trailing
452             content past what is needed can be put back for a later operation.
453              
454             Note that use of this method causes an inherent race condition between
455             outstanding read futures and existing data in the buffer. If there are no
456             pending futures then this is safe. If there is no existing data already in the
457             buffer this is also safe. If neither of these is true then a warning is
458             printed indicating that the logic of the caller is not well-defined.
459              
460             =cut
461              
462             sub unread
463             {
464 2     2 1 539 my $self = shift;
465 2         7 my ( $data ) = @_;
466              
467 2 50 66     4 if( @{ $self->{pending} } and length $self->{data} ) {
  2         15  
468 0         0 warn "Racy use of ->unread with both pending read futures and existing data";
469             }
470              
471 2         7 $self->{data} = $data . $self->{data};
472 2 100       5 $self->_invoke_pending if @{ $self->{pending} };
  2         10  
473              
474 2         9 return Future->done;
475             }
476              
477             =head1 TODO
478              
479             =over 4
480              
481             =item *
482              
483             An "on-read" event, taking maybe inspiration from L. This
484             would allow both pull- and push-based consumers.
485              
486             =item *
487              
488             Size limitation. Allow an upper bound of stored data, make C calls
489             return pending futures until buffer can accept it. Needs consideration of
490             unbounded C though.
491              
492             =item *
493              
494             Consider extensions of the L method to handle more situations.
495             This may require building a shared CPAN module for doing streaming-unpack
496             along with C and other situations.
497              
498             =back
499              
500             =head1 AUTHOR
501              
502             Paul Evans
503              
504             Inspired by L by Tom Molesworth
505              
506             =cut
507              
508             0x55AA;