File Coverage

blib/lib/IO/Async/Channel.pm
Criterion Covered Total %
statement 156 168 92.8
branch 56 78 71.7
condition 31 48 64.5
subroutine 32 36 88.8
pod 6 11 54.5
total 281 341 82.4


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2011-2021 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::Channel;
7              
8 13     13   76483 use strict;
  13         28  
  13         422  
9 13     13   65 use warnings;
  13         26  
  13         583  
10 13     13   75 use base qw( IO::Async::Notifier );
  13         27  
  13         2213  
11              
12             our $VERSION = '0.79';
13              
14 13     13   98 use Carp;
  13         29  
  13         740  
15              
16 13     13   6340 use IO::Async::Stream;
  13         53  
  13         31624  
17              
18             =head1 NAME
19              
20             C - pass values into or out from an L
21              
22             =head1 DESCRIPTION
23              
24             A C object allows Perl values to be passed into or out of
25             an L. It is intended to be used primarily with a Routine
26             object rather than independently. For more detail and examples on how to use
27             this object see also the documentation for L.
28              
29             A Channel object is shared between the main process of the program and the
30             process running within the Routine. In the main process it will be used in
31             asynchronous mode, and in the Routine process it will be used in synchronous
32             mode. In asynchronous mode all methods return immediately and use
33             L-style futures or callback functions. In synchronous within the
34             Routine process the methods block until they are ready and may be used for
35             flow-control within the routine. Alternatively, a Channel may be shared
36             between two different Routine objects, and not used directly by the
37             controlling program.
38              
39             The channel itself represents a FIFO of Perl reference values. New values may
40             be put into the channel by the C method in either mode. Values may be
41             retrieved from it by the C method. Values inserted into the Channel are
42             snapshot by the C method. Any changes to referred variables will not be
43             observed by the other end of the Channel after the C method returns.
44              
45             =head1 PARAMETERS
46              
47             The following named parameters may be passed to C or C:
48              
49             =head2 codec => STR
50              
51             Gives the name of the encoding method used to represent values over the
52             channel.
53              
54             This can be set to C to use the core L module. As this
55             only supports references, to pass a single scalar value, C a SCALAR
56             reference to it, and dereference the result of C.
57              
58             If the L and L modules are installed, this
59             can be set to C instead, and will use those to perform the encoding
60             and decoding. This optional dependency may give higher performance than using
61             C. If these modules are available, then this option is picked by
62             default.
63              
64             =cut
65              
66             =head1 CONSTRUCTOR
67              
68             =cut
69              
70             =head2 new
71              
72             $channel = IO::Async::Channel->new
73              
74             Returns a new C object. This object reference itself
75             should be shared by both sides of a Ced process. After C the
76             two C methods may be used to configure the object for operation on
77             either end.
78              
79             While this object does in fact inherit from L, it should
80             not be added to a Loop object directly; event management will be handled by
81             its containing L object.
82              
83             =cut
84              
85             # Undocumented convenience constructors for running IaRoutine in 'spawn' mode
86             sub new_sync
87             {
88 0     0 0 0 my $class = shift;
89 0         0 my ( $fd ) = @_;
90              
91 0         0 my $self = $class->new;
92 0         0 $self->setup_sync_mode( $fd );
93 0         0 return $self;
94             }
95              
96 0     0 0 0 sub new_stdin { shift->new_sync( \*STDIN ); }
97 0     0 0 0 sub new_stdout { shift->new_sync( \*STDOUT ); }
98              
99             sub DESTROY
100             {
101 126     126   17711 my $self = shift;
102 126         250 eval { $self->close }; # ignore any error
  126         398  
103             }
104              
105             =head1 METHODS
106              
107             The following methods documented with a trailing call to C<< ->get >> return
108             L instances.
109              
110             =cut
111              
112             =head2 configure
113              
114             $channel->configure( %params )
115              
116             Similar to the standard C method on L, this is
117             used to change details of the Channel's operation.
118              
119             =over 4
120              
121             =item on_recv => CODE
122              
123             May only be set on an async mode channel. If present, will be invoked whenever
124             a new value is received, rather than using the C method.
125              
126             $on_recv->( $channel, $data )
127              
128             =item on_eof => CODE
129              
130             May only be set on an async mode channel. If present, will be invoked when the
131             channel gets closed by the peer.
132              
133             $on_eof->( $channel )
134              
135             =back
136              
137             =cut
138              
139             my $DEFAULT_CODEC;
140             sub _default_codec
141             {
142 153   66 153   943 $DEFAULT_CODEC ||= do {
143 8         16 my $HAVE_SEREAL = defined eval {
144 8         53 require Sereal::Encoder; require Sereal::Decoder };
  8         40  
145 8 50       71 $HAVE_SEREAL ? "Sereal" : "Storable";
146             };
147             }
148              
149             sub _init
150             {
151 148     148   338 my $self = shift;
152 148         266 my ( $params ) = @_;
153              
154 148 100       573 defined $params->{codec} or $params->{codec} = _default_codec;
155              
156 148         699 $self->SUPER::_init( $params );
157             }
158              
159             sub configure
160             {
161 149     149 1 419 my $self = shift;
162 149         420 my %params = @_;
163              
164 149         364 foreach (qw( on_recv on_eof )) {
165 298 100       695 next unless exists $params{$_};
166 2 50 33     12 $self->{mode} and $self->{mode} eq "async" or
167             croak "Can only configure $_ in async mode";
168              
169 2         5 $self->{$_} = delete $params{$_};
170 2         7 $self->_build_stream;
171             }
172              
173 149 100       417 if( my $codec = delete $params{codec} ) {
174 148   33     984 @{ $self }{qw( encode decode )} = (
  148         669  
175             $self->can( "_make_codec_$codec" ) or croak "Unrecognised codec name '$codec'"
176             )->();
177             }
178              
179 149         625 $self->SUPER::configure( %params );
180             }
181              
182             sub _make_codec_Storable
183             {
184 0     0   0 require Storable;
185              
186             return
187 0         0 \&Storable::freeze,
188             \&Storable::thaw;
189             }
190              
191             sub _make_codec_Sereal
192             {
193 154     154   933 require Sereal::Encoder;
194 154         472 require Sereal::Decoder;
195              
196 154         262 my $encoder;
197             my $decoder;
198              
199             # "thread safety" to Sereal::{Encoder,Decoder} means that the variables get
200             # reset to undef in new threads. We should defend against that.
201              
202             return
203 113   66 113   2539 sub { ( $encoder ||= Sereal::Encoder->new )->encode( $_[0] ) },
204 154   66 110   1497 sub { ( $decoder ||= Sereal::Decoder->new )->decode( $_[0] ) };
  110         3767  
205             }
206              
207             =head2 send
208              
209             $channel->send( $data )
210              
211             Pushes the data stored in the given Perl reference into the FIFO of the
212             Channel, where it can be received by the other end. When called on a
213             synchronous mode Channel this method may block if a C call on the
214             underlying filehandle blocks. When called on an asynchronous mode channel this
215             method will not block.
216              
217             =cut
218              
219             my %SENDMETHODS;
220             sub send
221             {
222 21     21 1 704 my $self = shift;
223 21         94 my ( $data ) = @_;
224              
225 21 50       203 defined( my $mode = $self->{mode} ) or die "Cannot ->send without being set up";
226              
227 21 50 66     398 my $code = ( $SENDMETHODS{$mode} ||= $self->can( "_send_$mode" ) )
228             or die "IO::Async::Channel cannot send in unrecognised mode '$mode'";
229              
230 21         159 $self->$code( $data );
231             }
232              
233             *_send_sync = *_send_async = sub {
234 21     21   94 my ( $self, $data ) = @_;
235 21         139 $self->send_encoded( $self->{encode}->( $data ) );
236             };
237              
238             =head2 send_encoded
239              
240             $channel->send_encoded( $record )
241              
242             A variant of the C method; this method pushes the byte record given.
243             This should be the result of a call to C.
244              
245             =cut
246              
247             sub send_encoded
248             {
249 111     111 1 223 my $self = shift;
250 111         245 my ( $record ) = @_;
251              
252 111         638 my $bytes = pack( "I", length $record ) . $record;
253              
254 111 50       355 defined $self->{mode} or die "Cannot ->send without being set up";
255              
256 111 100       451 return $self->_sendbytes_sync( $bytes ) if $self->{mode} eq "sync";
257 101 50       837 return $self->_sendbytes_async( $bytes ) if $self->{mode} eq "async";
258             }
259              
260             =head2 encode
261              
262             $record = $channel->encode( $data )
263              
264             Takes a Perl reference and returns a serialised string that can be passed to
265             C. The following two forms are equivalent
266              
267             $channel->send( $data )
268             $channel->send_encoded( $channel->encode( $data ) )
269              
270             This is provided for the use-case where data needs to be serialised into a
271             fixed string to "snapshot it" but not sent yet; the returned string can be
272             saved and sent at a later time.
273              
274             $record = IO::Async::Channel->encode( $data )
275              
276             This can also be used as a class method, in case it is inconvenient to operate
277             on a particular object instance, or when one does not exist yet. In this case
278             it will encode using whatever is the default codec for C.
279              
280             =cut
281              
282             my $default_encode;
283             sub encode
284             {
285 92     92 1 212 my $self = shift;
286 92         188 my ( $data ) = @_;
287              
288             return ( ref $self ?
289             $self->{encode} :
290 92 100 66     414 $default_encode ||= do { ( $self->can( "_make_codec_" . _default_codec )->() )[0] }
  6         50  
291             )->( $data );
292             }
293              
294             =head2 recv
295              
296             $data = $channel->recv
297              
298             When called on a synchronous mode Channel this method will block until a Perl
299             reference value is available from the other end and then return it. If the
300             Channel is closed this method will return C. Since only references may
301             be passed and all Perl references are true the truth of the result of this
302             method can be used to detect that the channel is still open and has not yet
303             been closed.
304              
305             $data = $channel->recv->get
306              
307             When called on an asynchronous mode Channel this method returns a future which
308             will eventually yield the next Perl reference value that becomes available
309             from the other end. If the Channel is closed, the future will fail with an
310             C failure.
311              
312             $channel->recv( %args )
313              
314             When not returning a future, takes the following named arguments:
315              
316             =over 8
317              
318             =item on_recv => CODE
319              
320             Called when a new Perl reference value is available. Will be passed the
321             Channel object and the reference data.
322              
323             $on_recv->( $channel, $data )
324              
325             =item on_eof => CODE
326              
327             Called if the Channel was closed before a new value was ready. Will be passed
328             the Channel object.
329              
330             $on_eof->( $channel )
331              
332             =back
333              
334             =cut
335              
336             my %RECVMETHODS;
337             sub recv
338             {
339 115     115 1 595 my $self = shift;
340              
341 115 50       642 defined( my $mode = $self->{mode} ) or die "Cannot ->recv without being set up";
342              
343 115 50 66     684 my $code = ( $RECVMETHODS{$mode} ||= $self->can( "_recv_$mode" ) )
344             or die "IO::Async::Channel cannot recv in unrecognised mode '$mode'";
345              
346 115         423 return $self->$code( @_ );
347             }
348              
349             =head2 close
350              
351             $channel->close
352              
353             Closes the channel. Causes a pending C on the other end to return undef
354             or the queued C callbacks to be invoked.
355              
356             =cut
357              
358             my %CLOSEMETHODS;
359             sub close
360             {
361 182     182 1 2453 my $self = shift;
362              
363 182 50       591 defined( my $mode = $self->{mode} ) or return;
364              
365 182 50 66     768 my $code = ( $CLOSEMETHODS{$mode} ||= $self->can( "_close_$mode" ) )
366             or die "IO::Async::Channel cannot close in unrecognised mode '$mode'";
367              
368 182         457 return $self->$code;
369             }
370              
371             # Leave this undocumented for now
372             sub setup_sync_mode
373             {
374 11     11 0 160 my $self = shift;
375 11         53 ( $self->{fh} ) = @_;
376              
377 11         40 $self->{mode} = "sync";
378              
379             # Since we're communicating binary structures and not Unicode text we need to
380             # enable binmode
381 11         38 binmode $self->{fh};
382              
383 11   66     82 defined and $_->blocking( 1 ) for $self->{read_handle}, $self->{write_handle};
384 11         198 $self->{fh}->autoflush(1);
385             }
386              
387             sub _read_exactly
388             {
389 12     12   23 $_[1] = "";
390              
391 12         40 while( length $_[1] < $_[2] ) {
392 12         741 my $n = read( $_[0], $_[1], $_[2]-length $_[1], length $_[1] );
393 12 50       49 defined $n or return undef;
394 12 100       39 $n or return "";
395             }
396              
397 10         19 return $_[2];
398             }
399              
400             sub _recv_sync
401             {
402 7     7   12 my $self = shift;
403              
404 7         19 my $n = _read_exactly( $self->{fh}, my $lenbuffer, 4 );
405 7 50       19 defined $n or die "Cannot read - $!";
406 7 100       62 length $n or return undef;
407              
408 5         39 my $len = unpack( "I", $lenbuffer );
409              
410 5         20 $n = _read_exactly( $self->{fh}, my $record, $len );
411 5 50       14 defined $n or die "Cannot read - $!";
412 5 50       23 length $n or return undef;
413              
414 5         21 return $self->{decode}->( $record );
415             }
416              
417             sub _sendbytes_sync
418             {
419 10     10   17 my $self = shift;
420 10         23 my ( $bytes ) = @_;
421 10         39 $self->{fh}->print( $bytes );
422             }
423              
424             sub _close_sync
425             {
426 14     14   21 my $self = shift;
427 14         85 $self->{fh}->close;
428             }
429              
430             # Leave this undocumented for now
431             sub setup_async_mode
432             {
433 140     140 0 832 my $self = shift;
434 140         344 my %args = @_;
435              
436 140   66     733 exists $args{$_} and $self->{$_} = delete $args{$_} for qw( read_handle write_handle );
437              
438 140 50       361 keys %args and croak "Unrecognised keys for setup_async_mode: " . join( ", ", keys %args );
439              
440 140   66     2089 defined and $_->blocking( 0 ) for $self->{read_handle}, $self->{write_handle};
441 140         743 $self->{mode} = "async";
442             }
443              
444             sub _build_stream
445             {
446 211     211   381 my $self = shift;
447 211   66     2012 return $self->{stream} ||= do {
448 128         427 $self->{on_result_queue} = [];
449              
450             my $stream = IO::Async::Stream->new(
451             read_handle => $self->{read_handle},
452             write_handle => $self->{write_handle},
453 128         666 autoflush => 1,
454             on_read => $self->_capture_weakself( '_on_stream_read' )
455             );
456              
457 128         577 $self->add_child( $stream );
458              
459 128         1350 $stream;
460             };
461             }
462              
463             sub _sendbytes_async
464             {
465 101     101   221 my $self = shift;
466 101         215 my ( $bytes ) = @_;
467 101         270 $self->_build_stream->write( $bytes );
468             }
469              
470             sub _recv_async
471             {
472 108     108   227 my $self = shift;
473 108         241 my %args = @_;
474              
475 108         253 my $on_recv = $args{on_recv};
476 108         194 my $on_eof = $args{on_eof};
477              
478 108         245 my $stream = $self->_build_stream;
479              
480 108         200 my $f;
481 108 100       417 $f = $stream->loop->new_future unless !defined wantarray;
482              
483 108         1599 push @{ $self->{on_result_queue} }, sub {
484 108     108   690 my ( $self, $type, $result ) = @_;
485 108 100       381 if( $type eq "recv" ) {
486 104 100 100     1209 $f->done( $result ) if $f and !$f->is_cancelled;
487 104 100       7020 $on_recv->( $self, $result ) if $on_recv;
488             }
489             else {
490 4 100 66     104 $f->fail( "EOF waiting for Channel recv", eof => ) if $f and !$f->is_cancelled;
491 4 100       247 $on_eof->( $self ) if $on_eof;
492             }
493 108         317 };
494              
495 108         2418 return $f;
496             }
497              
498             sub _close_async
499             {
500 168     168   277 my $self = shift;
501 168 100       437 if( my $stream = $self->{stream} ) {
502 163         651 $stream->close_when_empty;
503             }
504             else {
505 5   66     126 $_ and $_->close for $self->{read_handle}, $self->{write_handle};
506             }
507              
508 168         4825 undef $_ for $self->{read_handle}, $self->{write_handle};
509             }
510              
511             sub _on_stream_read
512             {
513 112 50   112   470 my $self = shift or return;
514 112         352 my ( $stream, $buffref, $eof ) = @_;
515              
516 112 100       300 if( $eof ) {
517 7         84 while( my $on_result = shift @{ $self->{on_result_queue} } ) {
  11         107  
518 4         49 $on_result->( $self, eof => );
519             }
520 7 100       49 $self->{on_eof}->( $self ) if $self->{on_eof};
521 7         53 return;
522             }
523              
524 105 50       320 return 0 unless length( $$buffref ) >= 4;
525 105         370 my $len = unpack( "I", $$buffref );
526 105 50       311 return 0 unless length( $$buffref ) >= 4 + $len;
527              
528 105         491 my $record = $self->{decode}->( substr( $$buffref, 4, $len ) );
529 105         465 substr( $$buffref, 0, 4 + $len ) = "";
530              
531 105 100       156 if( my $on_result = shift @{ $self->{on_result_queue} } ) {
  105         371  
532 104         305 $on_result->( $self, recv => $record );
533             }
534             else {
535 1         5 $self->{on_recv}->( $self, $record );
536             }
537              
538 105         2253 return 1;
539             }
540              
541             sub _extract_read_handle
542             {
543 66     66   171 my $self = shift;
544              
545 66 100       391 return undef if !$self->{mode};
546              
547 1 50       36 croak "Cannot extract filehandle" if $self->{mode} ne "async";
548 1         18 $self->{mode} = "dead";
549              
550 1         14 return $self->{read_handle};
551             }
552              
553             sub _extract_write_handle
554             {
555 67     67   154 my $self = shift;
556              
557 67 50       329 return undef if !$self->{mode};
558              
559 0 0         croak "Cannot extract filehandle" if $self->{mode} ne "async";
560 0           $self->{mode} = "dead";
561              
562 0           return $self->{write_handle};
563             }
564              
565             =head1 AUTHOR
566              
567             Paul Evans
568              
569             =cut
570              
571             0x55AA;