File Coverage

blib/lib/IO/Async/Channel.pm
Criterion Covered Total %
statement 162 167 97.0
branch 54 76 71.0
condition 33 51 64.7
subroutine 35 36 97.2
pod 6 11 54.5
total 290 341 85.0


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2011-2024 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Channel 0.805;
7              
8 15     15   464453 use v5.14;
  15         59  
9 15     15   103 use warnings;
  15         28  
  15         856  
10 15     15   89 use base qw( IO::Async::Notifier );
  15         41  
  15         4337  
11              
12 15     15   88 use Carp;
  15         36  
  15         1042  
13              
14 15     15   7701 use IO::Async::Stream;
  15         54  
  15         38868  
15              
16             =head1 NAME
17              
18             C - pass values into or out from an L
19              
20             =head1 DESCRIPTION
21              
22             =for highlighter language=perl
23              
24             A C object allows Perl values to be passed into or out of
25             an L. It is intended to be used primarily with a Routine
26             object rather than independently. For more detail and examples on how to use
27             this object see also the documentation for L.
28              
29             A Channel object is shared between the main process of the program and the
30             process running within the Routine. In the main process it will be used in
31             asynchronous mode, and in the Routine process it will be used in synchronous
32             mode. In asynchronous mode all methods return immediately and use
33             L-style futures or callback functions. In synchronous within the
34             Routine process the methods block until they are ready and may be used for
35             flow-control within the routine. Alternatively, a Channel may be shared
36             between two different Routine objects, and not used directly by the
37             controlling program.
38              
39             The channel itself represents a FIFO of Perl reference values. New values may
40             be put into the channel by the C method in either mode. Values may be
41             retrieved from it by the C method. Values inserted into the Channel are
42             snapshot by the C method. Any changes to referred variables will not be
43             observed by the other end of the Channel after the C method returns.
44              
45             =head1 PARAMETERS
46              
47             The following named parameters may be passed to C or C:
48              
49             =head2 codec => STR
50              
51             Gives the name of the encoding method used to represent values over the
52             channel.
53              
54             This can be set to C to use the core L module. As this
55             only supports references, to pass a single scalar value, C a SCALAR
56             reference to it, and dereference the result of C.
57              
58             If the L and L modules are installed, this
59             can be set to C instead, and will use those to perform the encoding
60             and decoding. This optional dependency may give higher performance than using
61             C. If these modules are available, then this option is picked by
62             default.
63              
64             =cut
65              
66             =head1 CONSTRUCTOR
67              
68             =cut
69              
70             =head2 new
71              
72             $channel = IO::Async::Channel->new;
73              
74             Returns a new C object. This object reference itself
75             should be shared by both sides of a Ced process. After C the
76             two C methods may be used to configure the object for operation on
77             either end.
78              
79             While this object does in fact inherit from L, it should
80             not be added to a Loop object directly; event management will be handled by
81             its containing L object.
82              
83             =cut
84              
85             # Undocumented convenience constructors for running IaRoutine in 'spawn' mode
86             sub new_sync
87             {
88 4     4 0 10 my $class = shift;
89 4         10 my ( $fd ) = @_;
90              
91 4         42 my $self = $class->new;
92 4         14 $self->setup_sync_mode( $fd );
93 4         228 return $self;
94             }
95              
96 2     2 0 32 sub new_stdin { shift->new_sync( \*STDIN ); }
97 2     2 0 9 sub new_stdout { shift->new_sync( \*STDOUT ); }
98              
99             sub DESTROY
100             {
101 130     130   33543 my $self = shift;
102 130         212 eval { $self->close }; # ignore any error
  130         395  
103             }
104              
105             =head1 METHODS
106              
107             The following methods documented in C expressions return L
108             instances.
109              
110             =cut
111              
112             =head2 configure
113              
114             $channel->configure( %params );
115              
116             Similar to the standard C method on L, this is
117             used to change details of the Channel's operation.
118              
119             =over 4
120              
121             =item on_recv => CODE
122              
123             May only be set on an async mode channel. If present, will be invoked whenever
124             a new value is received, rather than using the C method.
125              
126             $on_recv->( $channel, $data );
127              
128             =item on_eof => CODE
129              
130             May only be set on an async mode channel. If present, will be invoked when the
131             channel gets closed by the peer.
132              
133             $on_eof->( $channel );
134              
135             =back
136              
137             =cut
138              
139             my $DEFAULT_CODEC;
140             sub _default_codec
141             {
142 157   66 157   894 $DEFAULT_CODEC ||= do {
143 10         19 my $HAVE_SEREAL = defined eval {
144 10         162 require Sereal::Encoder; require Sereal::Decoder };
  10         49  
145 10 50       104 $HAVE_SEREAL ? "Sereal" : "Storable";
146             };
147             }
148              
149             sub _init
150             {
151 152     152   268 my $self = shift;
152 152         271 my ( $params ) = @_;
153              
154 152   66     1664 $params->{codec} //= _default_codec;
155              
156 152         576 $self->SUPER::_init( $params );
157             }
158              
159             sub configure
160             {
161 153     153 1 215 my $self = shift;
162 153         334 my %params = @_;
163              
164 153         374 foreach (qw( on_recv on_eof )) {
165 306 100       8457 next unless exists $params{$_};
166 2 50 33     8 $self->{mode} and $self->{mode} eq "async" or
167             croak "Can only configure $_ in async mode";
168              
169 2         4 $self->{$_} = delete $params{$_};
170 2         4 $self->_build_stream;
171             }
172              
173 153 100       459 if( my $codec = delete $params{codec} ) {
174 152   33     1006 @{ $self }{qw( encode decode )} = (
  152         560  
175             $self->can( "_make_codec_$codec" ) or croak "Unrecognised codec name '$codec'"
176             )->();
177             }
178              
179 153         550 $self->SUPER::configure( %params );
180             }
181              
182             sub _make_codec_Storable
183             {
184 0     0   0 require Storable;
185              
186             return
187 0         0 \&Storable::freeze,
188             \&Storable::thaw;
189             }
190              
191             sub _make_codec_Sereal
192             {
193 158     158   13187 require Sereal::Encoder;
194 158         581 require Sereal::Decoder;
195              
196 158         230 my $encoder;
197             my $decoder;
198              
199             # "thread safety" to Sereal::{Encoder,Decoder} means that the variables get
200             # reset to undef in new threads. We should defend against that.
201              
202             return
203 113   66 113   13109 sub { ( $encoder ||= Sereal::Encoder->new )->encode( $_[0] ) },
204 158   66 111   1064 sub { ( $decoder ||= Sereal::Decoder->new )->decode( $_[0] ) };
  111         4059  
205             }
206              
207             =head2 send
208              
209             $channel->send( $data );
210              
211             Pushes the data stored in the given Perl reference into the FIFO of the
212             Channel, where it can be received by the other end. When called on a
213             synchronous mode Channel this method may block if a C call on the
214             underlying filehandle blocks. When called on an asynchronous mode channel this
215             method will not block.
216              
217             =cut
218              
219             my %SENDMETHODS;
220             sub send
221             {
222 23     23 1 2836 my $self = shift;
223 23         62 my ( $data ) = @_;
224              
225 23 50       197 defined( my $mode = $self->{mode} ) or die "Cannot ->send without being set up";
226              
227 23 50 66     591 my $code = ( $SENDMETHODS{$mode} ||= $self->can( "_send_$mode" ) )
228             or die "IO::Async::Channel cannot send in unrecognised mode '$mode'";
229              
230 23         332 $self->$code( $data );
231             }
232              
233             *_send_sync = *_send_async = sub {
234 23     23   97 my ( $self, $data ) = @_;
235 23         142 $self->send_encoded( $self->{encode}->( $data ) );
236             };
237              
238             =head2 send_encoded
239              
240             $channel->send_encoded( $record );
241              
242             A variant of the C method; this method pushes the byte record given.
243             This should be the result of a call to C.
244              
245             =cut
246              
247             sub send_encoded
248             {
249 111     111 1 285 my $self = shift;
250 111         240 my ( $record ) = @_;
251              
252 111         801 my $bytes = pack( "I", length $record ) . $record;
253              
254 111 50       410 defined $self->{mode} or die "Cannot ->send without being set up";
255              
256 111 100       350 return $self->_sendbytes_sync( $bytes ) if $self->{mode} eq "sync";
257 99 50       995 return $self->_sendbytes_async( $bytes ) if $self->{mode} eq "async";
258             }
259              
260             =head2 encode
261              
262             $record = $channel->encode( $data );
263              
264             Takes a Perl reference and returns a serialised string that can be passed to
265             C. The following two forms are equivalent
266              
267             $channel->send( $data );
268             $channel->send_encoded( $channel->encode( $data ) );
269              
270             This is provided for the use-case where data needs to be serialised into a
271             fixed string to "snapshot it" but not sent yet; the returned string can be
272             saved and sent at a later time.
273              
274             $record = IO::Async::Channel->encode( $data );
275              
276             This can also be used as a class method, in case it is inconvenient to operate
277             on a particular object instance, or when one does not exist yet. In this case
278             it will encode using whatever is the default codec for C.
279              
280             =cut
281              
282             my $default_encode;
283             sub encode
284             {
285 90     90 1 276 my $self = shift;
286 90         208 my ( $data ) = @_;
287              
288             return ( ref $self ?
289             $self->{encode} :
290 90 100 66     671 $default_encode ||= do { ( $self->can( "_make_codec_" . _default_codec )->() )[0] }
  6         122  
291             )->( $data );
292             }
293              
294             =head2 recv
295              
296             $data = $channel->recv;
297              
298             When called on a synchronous mode Channel this method will block until a Perl
299             reference value is available from the other end and then return it. If the
300             Channel is closed this method will return C. Since only references may
301             be passed and all Perl references are true the truth of the result of this
302             method can be used to detect that the channel is still open and has not yet
303             been closed.
304              
305             $data = await $channel->recv;
306              
307             When called on an asynchronous mode Channel this method returns a future which
308             will eventually yield the next Perl reference value that becomes available
309             from the other end. If the Channel is closed, the future will fail with an
310             C failure.
311              
312             $channel->recv( %args );
313              
314             When not returning a future, takes the following named arguments:
315              
316             =over 8
317              
318             =item on_recv => CODE
319              
320             Called when a new Perl reference value is available. Will be passed the
321             Channel object and the reference data.
322              
323             $on_recv->( $channel, $data );
324              
325             =item on_eof => CODE
326              
327             Called if the Channel was closed before a new value was ready. Will be passed
328             the Channel object.
329              
330             $on_eof->( $channel );
331              
332             =back
333              
334             =cut
335              
336             my %RECVMETHODS;
337             sub recv
338             {
339 118     118 1 749 my $self = shift;
340              
341 118 50       595 defined( my $mode = $self->{mode} ) or die "Cannot ->recv without being set up";
342              
343 118 50 66     792 my $code = ( $RECVMETHODS{$mode} ||= $self->can( "_recv_$mode" ) )
344             or die "IO::Async::Channel cannot recv in unrecognised mode '$mode'";
345              
346 118         539 return $self->$code( @_ );
347             }
348              
349             =head2 close
350              
351             $channel->close;
352              
353             Closes the channel. Causes a pending C on the other end to return undef
354             or the queued C callbacks to be invoked.
355              
356             =cut
357              
358             my %CLOSEMETHODS;
359             sub close
360             {
361 186     186 1 2861 my $self = shift;
362              
363 186 50       2044 defined( my $mode = $self->{mode} ) or return;
364              
365 186 50 66     1252 my $code = ( $CLOSEMETHODS{$mode} ||= $self->can( "_close_$mode" ) )
366             or die "IO::Async::Channel cannot close in unrecognised mode '$mode'";
367              
368 186         452 return $self->$code;
369             }
370              
371             # Leave this undocumented for now
372             sub setup_sync_mode
373             {
374 15     15 0 111 my $self = shift;
375 15         65 ( $self->{fh} ) = @_;
376              
377 15         48 $self->{mode} = "sync";
378              
379             # Since we're communicating binary structures and not Unicode text we need to
380             # enable binmode
381 15         40 binmode $self->{fh};
382              
383 15   66     118 defined and $_->blocking( 1 ) for $self->{read_handle}, $self->{write_handle};
384 15         146 $self->{fh}->autoflush(1);
385             }
386              
387             sub _read_exactly
388             {
389 20     20   43 $_[1] = "";
390              
391 20         45 while( length $_[1] < $_[2] ) {
392 20         7784397 my $n = read( $_[0], $_[1], $_[2]-length $_[1], length $_[1] );
393 20 50       85 defined $n or return undef;
394 20 100       81 $n or return "";
395             }
396              
397 16         27 return $_[2];
398             }
399              
400             sub _recv_sync
401             {
402 12     12   21 my $self = shift;
403              
404 12         64 my $n = _read_exactly( $self->{fh}, my $lenbuffer, 4 );
405 12 50       30 defined $n or die "Cannot read - $!";
406 12 100       73 length $n or return undef;
407              
408 8         71 my $len = unpack( "I", $lenbuffer );
409              
410 8         24 $n = _read_exactly( $self->{fh}, my $record, $len );
411 8 50       52 defined $n or die "Cannot read - $!";
412 8 50       23 length $n or return undef;
413              
414 8         24 return $self->{decode}->( $record );
415             }
416              
417             sub _sendbytes_sync
418             {
419 12     12   15 my $self = shift;
420 12         17 my ( $bytes ) = @_;
421 12         60 $self->{fh}->print( $bytes );
422             }
423              
424             sub _close_sync
425             {
426 18     18   72 my $self = shift;
427 18         93 $self->{fh}->close;
428             }
429              
430             # Leave this undocumented for now
431             sub setup_async_mode
432             {
433 140     140 0 993 my $self = shift;
434 140         2997 my %args = @_;
435              
436 140   66     683 exists $args{$_} and $self->{$_} = delete $args{$_} for qw( read_handle write_handle );
437              
438 140 50       355 keys %args and croak "Unrecognised keys for setup_async_mode: " . join( ", ", keys %args );
439              
440 140   66     1801 defined and $_->blocking( 0 ) for $self->{read_handle}, $self->{write_handle};
441 140         589 $self->{mode} = "async";
442             }
443              
444             sub _build_stream
445             {
446 207     207   334 my $self = shift;
447 207   66     2984 return $self->{stream} ||= do {
448 128         926 $self->{on_result_queue} = [];
449              
450             my $stream = IO::Async::Stream->new(
451             read_handle => $self->{read_handle},
452             write_handle => $self->{write_handle},
453 128         563 autoflush => 1,
454             on_read => $self->_capture_weakself( '_on_stream_read' )
455             );
456              
457 128         963 $self->add_child( $stream );
458              
459 128         1700 $stream;
460             };
461             }
462              
463             sub _sendbytes_async
464             {
465 99     99   241 my $self = shift;
466 99         390 my ( $bytes ) = @_;
467 99         392 $self->_build_stream->write( $bytes );
468             }
469              
470             sub _recv_async
471             {
472 106     106   206 my $self = shift;
473 106         246 my %args = @_;
474              
475 106         289 my $on_recv = $args{on_recv};
476 106         199 my $on_eof = $args{on_eof};
477              
478 106         340 my $stream = $self->_build_stream;
479              
480 106         203 my $f;
481 106 100       527 $f = $stream->loop->new_future unless !defined wantarray;
482              
483 106         2467 push @{ $self->{on_result_queue} }, sub {
484 106     106   408 my ( $self, $type, $result ) = @_;
485 106 100       316 if( $type eq "recv" ) {
486 102 100 100     1305 $f->done( $result ) if $f and !$f->is_cancelled;
487 102 100       18189 $on_recv->( $self, $result ) if $on_recv;
488             }
489             else {
490 4 100 66     97 $f->fail( "EOF waiting for Channel recv", eof => ) if $f and !$f->is_cancelled;
491 4 100       218 $on_eof->( $self ) if $on_eof;
492             }
493 106         187 };
494              
495 106         2642 return $f;
496             }
497              
498             sub _close_async
499             {
500 168     168   204 my $self = shift;
501 168 100       428 if( my $stream = $self->{stream} ) {
502 163         733 $stream->close_when_empty;
503             }
504             else {
505 5   66     114 $_ and $_->close for $self->{read_handle}, $self->{write_handle};
506             }
507              
508 168         5261 undef $_ for $self->{read_handle}, $self->{write_handle};
509             }
510              
511             sub _on_stream_read
512             {
513 110 50   110   426 my $self = shift or return;
514 110         300 my ( $stream, $buffref, $eof ) = @_;
515              
516 110 100       300 if( $eof ) {
517 7         45 while( my $on_result = shift @{ $self->{on_result_queue} } ) {
  11         104  
518 4         30 $on_result->( $self, eof => );
519             }
520 7 100       71 $self->{on_eof}->( $self ) if $self->{on_eof};
521 7         46 return;
522             }
523              
524 103 50       281 return 0 unless length( $$buffref ) >= 4;
525 103         412 my $len = unpack( "I", $$buffref );
526 103 50       12412 return 0 unless length( $$buffref ) >= 4 + $len;
527              
528 103         647 my $record = $self->{decode}->( substr( $$buffref, 4, $len ) );
529 103         427 substr( $$buffref, 0, 4 + $len ) = "";
530              
531 103 100       175 if( my $on_result = shift @{ $self->{on_result_queue} } ) {
  103         401  
532 102         325 $on_result->( $self, recv => $record );
533             }
534             else {
535 1         3 $self->{on_recv}->( $self, $record );
536             }
537              
538 103         2018 return 1;
539             }
540              
541             sub _extract_read_handle
542             {
543 66     66   95 my $self = shift;
544              
545 66 100       386 return undef if !$self->{mode};
546              
547 1 50       16 croak "Cannot extract filehandle" if $self->{mode} ne "async";
548 1         26 $self->{mode} = "dead";
549              
550 1         14 return $self->{read_handle};
551             }
552              
553             sub _extract_write_handle
554             {
555 67     67   148 my $self = shift;
556              
557 67 50       351 return undef if !$self->{mode};
558              
559 0 0         croak "Cannot extract filehandle" if $self->{mode} ne "async";
560 0           $self->{mode} = "dead";
561              
562 0           return $self->{write_handle};
563             }
564              
565             =head1 AUTHOR
566              
567             Paul Evans
568              
569             =cut
570              
571             0x55AA;