| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | #  You may distribute under the terms of either the GNU General Public License | 
| 2 |  |  |  |  |  |  | #  or the Artistic License (the same terms as Perl itself) | 
| 3 |  |  |  |  |  |  | # | 
| 4 |  |  |  |  |  |  | #  (C) Paul Evans, 2011-2021 -- leonerd@leonerd.org.uk | 
| 5 |  |  |  |  |  |  |  | 
| 6 |  |  |  |  |  |  | package IO::Async::Channel; | 
| 7 |  |  |  |  |  |  |  | 
| 8 | 13 |  |  | 13 |  | 58485 | use strict; | 
|  | 13 |  |  |  |  | 26 |  | 
|  | 13 |  |  |  |  | 432 |  | 
| 9 | 13 |  |  | 13 |  | 70 | use warnings; | 
|  | 13 |  |  |  |  | 111 |  | 
|  | 13 |  |  |  |  | 514 |  | 
| 10 | 13 |  |  | 13 |  | 76 | use base qw( IO::Async::Notifier ); | 
|  | 13 |  |  |  |  | 26 |  | 
|  | 13 |  |  |  |  | 1894 |  | 
| 11 |  |  |  |  |  |  |  | 
| 12 |  |  |  |  |  |  | our $VERSION = '0.801'; | 
| 13 |  |  |  |  |  |  |  | 
| 14 | 13 |  |  | 13 |  | 91 | use Carp; | 
|  | 13 |  |  |  |  | 27 |  | 
|  | 13 |  |  |  |  | 707 |  | 
| 15 |  |  |  |  |  |  |  | 
| 16 | 13 |  |  | 13 |  | 5929 | use IO::Async::Stream; | 
|  | 13 |  |  |  |  | 37 |  | 
|  | 13 |  |  |  |  | 33494 |  | 
| 17 |  |  |  |  |  |  |  | 
| 18 |  |  |  |  |  |  | =head1 NAME | 
| 19 |  |  |  |  |  |  |  | 
| 20 |  |  |  |  |  |  | C - pass values into or out from an L | 
| 21 |  |  |  |  |  |  |  | 
| 22 |  |  |  |  |  |  | =head1 DESCRIPTION | 
| 23 |  |  |  |  |  |  |  | 
| 24 |  |  |  |  |  |  | A C object allows Perl values to be passed into or out of | 
| 25 |  |  |  |  |  |  | an L. It is intended to be used primarily with a Routine | 
| 26 |  |  |  |  |  |  | object rather than independently. For more detail and examples on how to use | 
| 27 |  |  |  |  |  |  | this object see also the documentation for L. | 
| 28 |  |  |  |  |  |  |  | 
| 29 |  |  |  |  |  |  | A Channel object is shared between the main process of the program and the | 
| 30 |  |  |  |  |  |  | process running within the Routine. In the main process it will be used in | 
| 31 |  |  |  |  |  |  | asynchronous mode, and in the Routine process it will be used in synchronous | 
| 32 |  |  |  |  |  |  | mode. In asynchronous mode all methods return immediately and use | 
| 33 |  |  |  |  |  |  | L-style futures or callback functions. In synchronous within the | 
| 34 |  |  |  |  |  |  | Routine process the methods block until they are ready and may be used for | 
| 35 |  |  |  |  |  |  | flow-control within the routine. Alternatively, a Channel may be shared | 
| 36 |  |  |  |  |  |  | between two different Routine objects, and not used directly by the | 
| 37 |  |  |  |  |  |  | controlling program. | 
| 38 |  |  |  |  |  |  |  | 
| 39 |  |  |  |  |  |  | The channel itself represents a FIFO of Perl reference values. New values may | 
| 40 |  |  |  |  |  |  | be put into the channel by the C method in either mode. Values may be | 
| 41 |  |  |  |  |  |  | retrieved from it by the C method. Values inserted into the Channel are | 
| 42 |  |  |  |  |  |  | snapshot by the C method. Any changes to referred variables will not be | 
| 43 |  |  |  |  |  |  | observed by the other end of the Channel after the C method returns. | 
| 44 |  |  |  |  |  |  |  | 
| 45 |  |  |  |  |  |  | =head1 PARAMETERS | 
| 46 |  |  |  |  |  |  |  | 
| 47 |  |  |  |  |  |  | The following named parameters may be passed to C or C: | 
| 48 |  |  |  |  |  |  |  | 
| 49 |  |  |  |  |  |  | =head2 codec => STR | 
| 50 |  |  |  |  |  |  |  | 
| 51 |  |  |  |  |  |  | Gives the name of the encoding method used to represent values over the | 
| 52 |  |  |  |  |  |  | channel. | 
| 53 |  |  |  |  |  |  |  | 
| 54 |  |  |  |  |  |  | This can be set to C to use the core L module. As this | 
| 55 |  |  |  |  |  |  | only supports references, to pass a single scalar value, C a SCALAR | 
| 56 |  |  |  |  |  |  | reference to it, and dereference the result of C. | 
| 57 |  |  |  |  |  |  |  | 
| 58 |  |  |  |  |  |  | If the L and L modules are installed, this | 
| 59 |  |  |  |  |  |  | can be set to C instead, and will use those to perform the encoding | 
| 60 |  |  |  |  |  |  | and decoding. This optional dependency may give higher performance than using | 
| 61 |  |  |  |  |  |  | C. If these modules are available, then this option is picked by | 
| 62 |  |  |  |  |  |  | default. | 
| 63 |  |  |  |  |  |  |  | 
| 64 |  |  |  |  |  |  | =cut | 
| 65 |  |  |  |  |  |  |  | 
| 66 |  |  |  |  |  |  | =head1 CONSTRUCTOR | 
| 67 |  |  |  |  |  |  |  | 
| 68 |  |  |  |  |  |  | =cut | 
| 69 |  |  |  |  |  |  |  | 
| 70 |  |  |  |  |  |  | =head2 new | 
| 71 |  |  |  |  |  |  |  | 
| 72 |  |  |  |  |  |  | $channel = IO::Async::Channel->new | 
| 73 |  |  |  |  |  |  |  | 
| 74 |  |  |  |  |  |  | Returns a new C object. This object reference itself | 
| 75 |  |  |  |  |  |  | should be shared by both sides of a Ced process. After C the | 
| 76 |  |  |  |  |  |  | two C methods may be used to configure the object for operation on | 
| 77 |  |  |  |  |  |  | either end. | 
| 78 |  |  |  |  |  |  |  | 
| 79 |  |  |  |  |  |  | While this object does in fact inherit from L, it should | 
| 80 |  |  |  |  |  |  | not be added to a Loop object directly; event management will be handled by | 
| 81 |  |  |  |  |  |  | its containing L object. | 
| 82 |  |  |  |  |  |  |  | 
| 83 |  |  |  |  |  |  | =cut | 
| 84 |  |  |  |  |  |  |  | 
| 85 |  |  |  |  |  |  | # Undocumented convenience constructors for running IaRoutine in 'spawn' mode | 
| 86 |  |  |  |  |  |  | sub new_sync | 
| 87 |  |  |  |  |  |  | { | 
| 88 | 0 |  |  | 0 | 0 | 0 | my $class = shift; | 
| 89 | 0 |  |  |  |  | 0 | my ( $fd ) = @_; | 
| 90 |  |  |  |  |  |  |  | 
| 91 | 0 |  |  |  |  | 0 | my $self = $class->new; | 
| 92 | 0 |  |  |  |  | 0 | $self->setup_sync_mode( $fd ); | 
| 93 | 0 |  |  |  |  | 0 | return $self; | 
| 94 |  |  |  |  |  |  | } | 
| 95 |  |  |  |  |  |  |  | 
| 96 | 0 |  |  | 0 | 0 | 0 | sub new_stdin  { shift->new_sync( \*STDIN  ); } | 
| 97 | 0 |  |  | 0 | 0 | 0 | sub new_stdout { shift->new_sync( \*STDOUT ); } | 
| 98 |  |  |  |  |  |  |  | 
| 99 |  |  |  |  |  |  | sub DESTROY | 
| 100 |  |  |  |  |  |  | { | 
| 101 | 126 |  |  | 126 |  | 13917 | my $self = shift; | 
| 102 | 126 |  |  |  |  | 223 | eval { $self->close }; # ignore any error | 
|  | 126 |  |  |  |  | 372 |  | 
| 103 |  |  |  |  |  |  | } | 
| 104 |  |  |  |  |  |  |  | 
| 105 |  |  |  |  |  |  | =head1 METHODS | 
| 106 |  |  |  |  |  |  |  | 
| 107 |  |  |  |  |  |  | The following methods documented with a trailing call to C<< ->get >> return | 
| 108 |  |  |  |  |  |  | L instances. | 
| 109 |  |  |  |  |  |  |  | 
| 110 |  |  |  |  |  |  | =cut | 
| 111 |  |  |  |  |  |  |  | 
| 112 |  |  |  |  |  |  | =head2 configure | 
| 113 |  |  |  |  |  |  |  | 
| 114 |  |  |  |  |  |  | $channel->configure( %params ) | 
| 115 |  |  |  |  |  |  |  | 
| 116 |  |  |  |  |  |  | Similar to the standard C method on L, this is | 
| 117 |  |  |  |  |  |  | used to change details of the Channel's operation. | 
| 118 |  |  |  |  |  |  |  | 
| 119 |  |  |  |  |  |  | =over 4 | 
| 120 |  |  |  |  |  |  |  | 
| 121 |  |  |  |  |  |  | =item on_recv => CODE | 
| 122 |  |  |  |  |  |  |  | 
| 123 |  |  |  |  |  |  | May only be set on an async mode channel. If present, will be invoked whenever | 
| 124 |  |  |  |  |  |  | a new value is received, rather than using the C method. | 
| 125 |  |  |  |  |  |  |  | 
| 126 |  |  |  |  |  |  | $on_recv->( $channel, $data ) | 
| 127 |  |  |  |  |  |  |  | 
| 128 |  |  |  |  |  |  | =item on_eof => CODE | 
| 129 |  |  |  |  |  |  |  | 
| 130 |  |  |  |  |  |  | May only be set on an async mode channel. If present, will be invoked when the | 
| 131 |  |  |  |  |  |  | channel gets closed by the peer. | 
| 132 |  |  |  |  |  |  |  | 
| 133 |  |  |  |  |  |  | $on_eof->( $channel ) | 
| 134 |  |  |  |  |  |  |  | 
| 135 |  |  |  |  |  |  | =back | 
| 136 |  |  |  |  |  |  |  | 
| 137 |  |  |  |  |  |  | =cut | 
| 138 |  |  |  |  |  |  |  | 
| 139 |  |  |  |  |  |  | my $DEFAULT_CODEC; | 
| 140 |  |  |  |  |  |  | sub _default_codec | 
| 141 |  |  |  |  |  |  | { | 
| 142 | 153 |  | 66 | 153 |  | 614 | $DEFAULT_CODEC ||= do { | 
| 143 | 8 |  |  |  |  | 14 | my $HAVE_SEREAL = defined eval { | 
| 144 | 8 |  |  |  |  | 49 | require Sereal::Encoder; require Sereal::Decoder }; | 
|  | 8 |  |  |  |  | 41 |  | 
| 145 | 8 | 50 |  |  |  | 76 | $HAVE_SEREAL ? "Sereal" : "Storable"; | 
| 146 |  |  |  |  |  |  | }; | 
| 147 |  |  |  |  |  |  | } | 
| 148 |  |  |  |  |  |  |  | 
| 149 |  |  |  |  |  |  | sub _init | 
| 150 |  |  |  |  |  |  | { | 
| 151 | 148 |  |  | 148 |  | 256 | my $self = shift; | 
| 152 | 148 |  |  |  |  | 280 | my ( $params ) = @_; | 
| 153 |  |  |  |  |  |  |  | 
| 154 | 148 | 100 |  |  |  | 538 | defined $params->{codec} or $params->{codec} = _default_codec; | 
| 155 |  |  |  |  |  |  |  | 
| 156 | 148 |  |  |  |  | 620 | $self->SUPER::_init( $params ); | 
| 157 |  |  |  |  |  |  | } | 
| 158 |  |  |  |  |  |  |  | 
| 159 |  |  |  |  |  |  | sub configure | 
| 160 |  |  |  |  |  |  | { | 
| 161 | 149 |  |  | 149 | 1 | 273 | my $self = shift; | 
| 162 | 149 |  |  |  |  | 507 | my %params = @_; | 
| 163 |  |  |  |  |  |  |  | 
| 164 | 149 |  |  |  |  | 361 | foreach (qw( on_recv on_eof )) { | 
| 165 | 298 | 100 |  |  |  | 686 | next unless exists $params{$_}; | 
| 166 | 2 | 50 | 33 |  |  | 9 | $self->{mode} and $self->{mode} eq "async" or | 
| 167 |  |  |  |  |  |  | croak "Can only configure $_ in async mode"; | 
| 168 |  |  |  |  |  |  |  | 
| 169 | 2 |  |  |  |  | 5 | $self->{$_} = delete $params{$_}; | 
| 170 | 2 |  |  |  |  | 4 | $self->_build_stream; | 
| 171 |  |  |  |  |  |  | } | 
| 172 |  |  |  |  |  |  |  | 
| 173 | 149 | 100 |  |  |  | 414 | if( my $codec = delete $params{codec} ) { | 
| 174 | 148 |  | 33 |  |  | 985 | @{ $self }{qw( encode decode )} = ( | 
|  | 148 |  |  |  |  | 587 |  | 
| 175 |  |  |  |  |  |  | $self->can( "_make_codec_$codec" ) or croak "Unrecognised codec name '$codec'" | 
| 176 |  |  |  |  |  |  | )->(); | 
| 177 |  |  |  |  |  |  | } | 
| 178 |  |  |  |  |  |  |  | 
| 179 | 149 |  |  |  |  | 562 | $self->SUPER::configure( %params ); | 
| 180 |  |  |  |  |  |  | } | 
| 181 |  |  |  |  |  |  |  | 
| 182 |  |  |  |  |  |  | sub _make_codec_Storable | 
| 183 |  |  |  |  |  |  | { | 
| 184 | 0 |  |  | 0 |  | 0 | require Storable; | 
| 185 |  |  |  |  |  |  |  | 
| 186 |  |  |  |  |  |  | return | 
| 187 | 0 |  |  |  |  | 0 | \&Storable::freeze, | 
| 188 |  |  |  |  |  |  | \&Storable::thaw; | 
| 189 |  |  |  |  |  |  | } | 
| 190 |  |  |  |  |  |  |  | 
| 191 |  |  |  |  |  |  | sub _make_codec_Sereal | 
| 192 |  |  |  |  |  |  | { | 
| 193 | 154 |  |  | 154 |  | 908 | require Sereal::Encoder; | 
| 194 | 154 |  |  |  |  | 567 | require Sereal::Decoder; | 
| 195 |  |  |  |  |  |  |  | 
| 196 | 154 |  |  |  |  | 257 | my $encoder; | 
| 197 |  |  |  |  |  |  | my $decoder; | 
| 198 |  |  |  |  |  |  |  | 
| 199 |  |  |  |  |  |  | # "thread safety" to Sereal::{Encoder,Decoder} means that the variables get | 
| 200 |  |  |  |  |  |  | # reset to undef in new threads. We should defend against that. | 
| 201 |  |  |  |  |  |  |  | 
| 202 |  |  |  |  |  |  | return | 
| 203 | 113 |  | 66 | 113 |  | 2154 | sub { ( $encoder ||= Sereal::Encoder->new )->encode( $_[0] ) }, | 
| 204 | 154 |  | 66 | 110 |  | 1302 | sub { ( $decoder ||= Sereal::Decoder->new )->decode( $_[0] ) }; | 
|  | 110 |  |  |  |  | 3348 |  | 
| 205 |  |  |  |  |  |  | } | 
| 206 |  |  |  |  |  |  |  | 
| 207 |  |  |  |  |  |  | =head2 send | 
| 208 |  |  |  |  |  |  |  | 
| 209 |  |  |  |  |  |  | $channel->send( $data ) | 
| 210 |  |  |  |  |  |  |  | 
| 211 |  |  |  |  |  |  | Pushes the data stored in the given Perl reference into the FIFO of the | 
| 212 |  |  |  |  |  |  | Channel, where it can be received by the other end. When called on a | 
| 213 |  |  |  |  |  |  | synchronous mode Channel this method may block if a C call on the | 
| 214 |  |  |  |  |  |  | underlying filehandle blocks. When called on an asynchronous mode channel this | 
| 215 |  |  |  |  |  |  | method will not block. | 
| 216 |  |  |  |  |  |  |  | 
| 217 |  |  |  |  |  |  | =cut | 
| 218 |  |  |  |  |  |  |  | 
| 219 |  |  |  |  |  |  | my %SENDMETHODS; | 
| 220 |  |  |  |  |  |  | sub send | 
| 221 |  |  |  |  |  |  | { | 
| 222 | 21 |  |  | 21 | 1 | 2359 | my $self = shift; | 
| 223 | 21 |  |  |  |  | 56 | my ( $data ) = @_; | 
| 224 |  |  |  |  |  |  |  | 
| 225 | 21 | 50 |  |  |  | 154 | defined( my $mode = $self->{mode} ) or die "Cannot ->send without being set up"; | 
| 226 |  |  |  |  |  |  |  | 
| 227 | 21 | 50 | 66 |  |  | 350 | my $code = ( $SENDMETHODS{$mode} ||= $self->can( "_send_$mode" ) ) | 
| 228 |  |  |  |  |  |  | or die "IO::Async::Channel cannot send in unrecognised mode '$mode'"; | 
| 229 |  |  |  |  |  |  |  | 
| 230 | 21 |  |  |  |  | 152 | $self->$code( $data ); | 
| 231 |  |  |  |  |  |  | } | 
| 232 |  |  |  |  |  |  |  | 
| 233 |  |  |  |  |  |  | *_send_sync = *_send_async = sub { | 
| 234 | 21 |  |  | 21 |  | 67 | my ( $self, $data ) = @_; | 
| 235 | 21 |  |  |  |  | 138 | $self->send_encoded( $self->{encode}->( $data ) ); | 
| 236 |  |  |  |  |  |  | }; | 
| 237 |  |  |  |  |  |  |  | 
| 238 |  |  |  |  |  |  | =head2 send_encoded | 
| 239 |  |  |  |  |  |  |  | 
| 240 |  |  |  |  |  |  | $channel->send_encoded( $record ) | 
| 241 |  |  |  |  |  |  |  | 
| 242 |  |  |  |  |  |  | A variant of the C method; this method pushes the byte record given. | 
| 243 |  |  |  |  |  |  | This should be the result of a call to C. | 
| 244 |  |  |  |  |  |  |  | 
| 245 |  |  |  |  |  |  | =cut | 
| 246 |  |  |  |  |  |  |  | 
| 247 |  |  |  |  |  |  | sub send_encoded | 
| 248 |  |  |  |  |  |  | { | 
| 249 | 111 |  |  | 111 | 1 | 219 | my $self = shift; | 
| 250 | 111 |  |  |  |  | 211 | my ( $record ) = @_; | 
| 251 |  |  |  |  |  |  |  | 
| 252 | 111 |  |  |  |  | 649 | my $bytes = pack( "I", length $record ) . $record; | 
| 253 |  |  |  |  |  |  |  | 
| 254 | 111 | 50 |  |  |  | 338 | defined $self->{mode} or die "Cannot ->send without being set up"; | 
| 255 |  |  |  |  |  |  |  | 
| 256 | 111 | 100 |  |  |  | 296 | return $self->_sendbytes_sync( $bytes )  if $self->{mode} eq "sync"; | 
| 257 | 101 | 50 |  |  |  | 868 | return $self->_sendbytes_async( $bytes ) if $self->{mode} eq "async"; | 
| 258 |  |  |  |  |  |  | } | 
| 259 |  |  |  |  |  |  |  | 
| 260 |  |  |  |  |  |  | =head2 encode | 
| 261 |  |  |  |  |  |  |  | 
| 262 |  |  |  |  |  |  | $record = $channel->encode( $data ) | 
| 263 |  |  |  |  |  |  |  | 
| 264 |  |  |  |  |  |  | Takes a Perl reference and returns a serialised string that can be passed to | 
| 265 |  |  |  |  |  |  | C. The following two forms are equivalent | 
| 266 |  |  |  |  |  |  |  | 
| 267 |  |  |  |  |  |  | $channel->send( $data ) | 
| 268 |  |  |  |  |  |  | $channel->send_encoded( $channel->encode( $data ) ) | 
| 269 |  |  |  |  |  |  |  | 
| 270 |  |  |  |  |  |  | This is provided for the use-case where data needs to be serialised into a | 
| 271 |  |  |  |  |  |  | fixed string to "snapshot it" but not sent yet; the returned string can be | 
| 272 |  |  |  |  |  |  | saved and sent at a later time. | 
| 273 |  |  |  |  |  |  |  | 
| 274 |  |  |  |  |  |  | $record = IO::Async::Channel->encode( $data ) | 
| 275 |  |  |  |  |  |  |  | 
| 276 |  |  |  |  |  |  | This can also be used as a class method, in case it is inconvenient to operate | 
| 277 |  |  |  |  |  |  | on a particular object instance, or when one does not exist yet. In this case | 
| 278 |  |  |  |  |  |  | it will encode using whatever is the default codec for C. | 
| 279 |  |  |  |  |  |  |  | 
| 280 |  |  |  |  |  |  | =cut | 
| 281 |  |  |  |  |  |  |  | 
| 282 |  |  |  |  |  |  | my $default_encode; | 
| 283 |  |  |  |  |  |  | sub encode | 
| 284 |  |  |  |  |  |  | { | 
| 285 | 92 |  |  | 92 | 1 | 212 | my $self = shift; | 
| 286 | 92 |  |  |  |  | 184 | my ( $data ) = @_; | 
| 287 |  |  |  |  |  |  |  | 
| 288 |  |  |  |  |  |  | return ( ref $self ? | 
| 289 |  |  |  |  |  |  | $self->{encode} : | 
| 290 | 92 | 100 | 66 |  |  | 418 | $default_encode ||= do { ( $self->can( "_make_codec_" . _default_codec )->() )[0] } | 
|  | 6 |  |  |  |  | 57 |  | 
| 291 |  |  |  |  |  |  | )->( $data ); | 
| 292 |  |  |  |  |  |  | } | 
| 293 |  |  |  |  |  |  |  | 
| 294 |  |  |  |  |  |  | =head2 recv | 
| 295 |  |  |  |  |  |  |  | 
| 296 |  |  |  |  |  |  | $data = $channel->recv | 
| 297 |  |  |  |  |  |  |  | 
| 298 |  |  |  |  |  |  | When called on a synchronous mode Channel this method will block until a Perl | 
| 299 |  |  |  |  |  |  | reference value is available from the other end and then return it. If the | 
| 300 |  |  |  |  |  |  | Channel is closed this method will return C. Since only references may | 
| 301 |  |  |  |  |  |  | be passed and all Perl references are true the truth of the result of this | 
| 302 |  |  |  |  |  |  | method can be used to detect that the channel is still open and has not yet | 
| 303 |  |  |  |  |  |  | been closed. | 
| 304 |  |  |  |  |  |  |  | 
| 305 |  |  |  |  |  |  | $data = $channel->recv->get | 
| 306 |  |  |  |  |  |  |  | 
| 307 |  |  |  |  |  |  | When called on an asynchronous mode Channel this method returns a future which | 
| 308 |  |  |  |  |  |  | will eventually yield the next Perl reference value that becomes available | 
| 309 |  |  |  |  |  |  | from the other end. If the Channel is closed, the future will fail with an | 
| 310 |  |  |  |  |  |  | C failure. | 
| 311 |  |  |  |  |  |  |  | 
| 312 |  |  |  |  |  |  | $channel->recv( %args ) | 
| 313 |  |  |  |  |  |  |  | 
| 314 |  |  |  |  |  |  | When not returning a future, takes the following named arguments: | 
| 315 |  |  |  |  |  |  |  | 
| 316 |  |  |  |  |  |  | =over 8 | 
| 317 |  |  |  |  |  |  |  | 
| 318 |  |  |  |  |  |  | =item on_recv => CODE | 
| 319 |  |  |  |  |  |  |  | 
| 320 |  |  |  |  |  |  | Called when a new Perl reference value is available. Will be passed the | 
| 321 |  |  |  |  |  |  | Channel object and the reference data. | 
| 322 |  |  |  |  |  |  |  | 
| 323 |  |  |  |  |  |  | $on_recv->( $channel, $data ) | 
| 324 |  |  |  |  |  |  |  | 
| 325 |  |  |  |  |  |  | =item on_eof => CODE | 
| 326 |  |  |  |  |  |  |  | 
| 327 |  |  |  |  |  |  | Called if the Channel was closed before a new value was ready. Will be passed | 
| 328 |  |  |  |  |  |  | the Channel object. | 
| 329 |  |  |  |  |  |  |  | 
| 330 |  |  |  |  |  |  | $on_eof->( $channel ) | 
| 331 |  |  |  |  |  |  |  | 
| 332 |  |  |  |  |  |  | =back | 
| 333 |  |  |  |  |  |  |  | 
| 334 |  |  |  |  |  |  | =cut | 
| 335 |  |  |  |  |  |  |  | 
| 336 |  |  |  |  |  |  | my %RECVMETHODS; | 
| 337 |  |  |  |  |  |  | sub recv | 
| 338 |  |  |  |  |  |  | { | 
| 339 | 115 |  |  | 115 | 1 | 602 | my $self = shift; | 
| 340 |  |  |  |  |  |  |  | 
| 341 | 115 | 50 |  |  |  | 531 | defined( my $mode = $self->{mode} ) or die "Cannot ->recv without being set up"; | 
| 342 |  |  |  |  |  |  |  | 
| 343 | 115 | 50 | 66 |  |  | 662 | my $code = ( $RECVMETHODS{$mode} ||= $self->can( "_recv_$mode" ) ) | 
| 344 |  |  |  |  |  |  | or die "IO::Async::Channel cannot recv in unrecognised mode '$mode'"; | 
| 345 |  |  |  |  |  |  |  | 
| 346 | 115 |  |  |  |  | 390 | return $self->$code( @_ ); | 
| 347 |  |  |  |  |  |  | } | 
| 348 |  |  |  |  |  |  |  | 
| 349 |  |  |  |  |  |  | =head2 close | 
| 350 |  |  |  |  |  |  |  | 
| 351 |  |  |  |  |  |  | $channel->close | 
| 352 |  |  |  |  |  |  |  | 
| 353 |  |  |  |  |  |  | Closes the channel. Causes a pending C on the other end to return undef | 
| 354 |  |  |  |  |  |  | or the queued C callbacks to be invoked. | 
| 355 |  |  |  |  |  |  |  | 
| 356 |  |  |  |  |  |  | =cut | 
| 357 |  |  |  |  |  |  |  | 
| 358 |  |  |  |  |  |  | my %CLOSEMETHODS; | 
| 359 |  |  |  |  |  |  | sub close | 
| 360 |  |  |  |  |  |  | { | 
| 361 | 182 |  |  | 182 | 1 | 1867 | my $self = shift; | 
| 362 |  |  |  |  |  |  |  | 
| 363 | 182 | 50 |  |  |  | 600 | defined( my $mode = $self->{mode} ) or return; | 
| 364 |  |  |  |  |  |  |  | 
| 365 | 182 | 50 | 66 |  |  | 797 | my $code = ( $CLOSEMETHODS{$mode} ||= $self->can( "_close_$mode" ) ) | 
| 366 |  |  |  |  |  |  | or die "IO::Async::Channel cannot close in unrecognised mode '$mode'"; | 
| 367 |  |  |  |  |  |  |  | 
| 368 | 182 |  |  |  |  | 428 | return $self->$code; | 
| 369 |  |  |  |  |  |  | } | 
| 370 |  |  |  |  |  |  |  | 
| 371 |  |  |  |  |  |  | # Leave this undocumented for now | 
| 372 |  |  |  |  |  |  | sub setup_sync_mode | 
| 373 |  |  |  |  |  |  | { | 
| 374 | 11 |  |  | 11 | 0 | 92 | my $self = shift; | 
| 375 | 11 |  |  |  |  | 62 | ( $self->{fh} ) = @_; | 
| 376 |  |  |  |  |  |  |  | 
| 377 | 11 |  |  |  |  | 37 | $self->{mode} = "sync"; | 
| 378 |  |  |  |  |  |  |  | 
| 379 |  |  |  |  |  |  | # Since we're communicating binary structures and not Unicode text we need to | 
| 380 |  |  |  |  |  |  | # enable binmode | 
| 381 | 11 |  |  |  |  | 34 | binmode $self->{fh}; | 
| 382 |  |  |  |  |  |  |  | 
| 383 | 11 |  | 66 |  |  | 73 | defined and $_->blocking( 1 ) for $self->{read_handle}, $self->{write_handle}; | 
| 384 | 11 |  |  |  |  | 108 | $self->{fh}->autoflush(1); | 
| 385 |  |  |  |  |  |  | } | 
| 386 |  |  |  |  |  |  |  | 
| 387 |  |  |  |  |  |  | sub _read_exactly | 
| 388 |  |  |  |  |  |  | { | 
| 389 | 12 |  |  | 12 |  | 18 | $_[1] = ""; | 
| 390 |  |  |  |  |  |  |  | 
| 391 | 12 |  |  |  |  | 34 | while( length $_[1] < $_[2] ) { | 
| 392 | 12 |  |  |  |  | 510 | my $n = read( $_[0], $_[1], $_[2]-length $_[1], length $_[1] ); | 
| 393 | 12 | 50 |  |  |  | 38 | defined $n or return undef; | 
| 394 | 12 | 100 |  |  |  | 50 | $n or return ""; | 
| 395 |  |  |  |  |  |  | } | 
| 396 |  |  |  |  |  |  |  | 
| 397 | 10 |  |  |  |  | 14 | return $_[2]; | 
| 398 |  |  |  |  |  |  | } | 
| 399 |  |  |  |  |  |  |  | 
| 400 |  |  |  |  |  |  | sub _recv_sync | 
| 401 |  |  |  |  |  |  | { | 
| 402 | 7 |  |  | 7 |  | 10 | my $self = shift; | 
| 403 |  |  |  |  |  |  |  | 
| 404 | 7 |  |  |  |  | 16 | my $n = _read_exactly( $self->{fh}, my $lenbuffer, 4 ); | 
| 405 | 7 | 50 |  |  |  | 24 | defined $n or die "Cannot read - $!"; | 
| 406 | 7 | 100 |  |  |  | 25 | length $n or return undef; | 
| 407 |  |  |  |  |  |  |  | 
| 408 | 5 |  |  |  |  | 23 | my $len = unpack( "I", $lenbuffer ); | 
| 409 |  |  |  |  |  |  |  | 
| 410 | 5 |  |  |  |  | 13 | $n = _read_exactly( $self->{fh}, my $record, $len ); | 
| 411 | 5 | 50 |  |  |  | 16 | defined $n or die "Cannot read - $!"; | 
| 412 | 5 | 50 |  |  |  | 17 | length $n or return undef; | 
| 413 |  |  |  |  |  |  |  | 
| 414 | 5 |  |  |  |  | 18 | return $self->{decode}->( $record ); | 
| 415 |  |  |  |  |  |  | } | 
| 416 |  |  |  |  |  |  |  | 
| 417 |  |  |  |  |  |  | sub _sendbytes_sync | 
| 418 |  |  |  |  |  |  | { | 
| 419 | 10 |  |  | 10 |  | 13 | my $self = shift; | 
| 420 | 10 |  |  |  |  | 14 | my ( $bytes ) = @_; | 
| 421 | 10 |  |  |  |  | 28 | $self->{fh}->print( $bytes ); | 
| 422 |  |  |  |  |  |  | } | 
| 423 |  |  |  |  |  |  |  | 
| 424 |  |  |  |  |  |  | sub _close_sync | 
| 425 |  |  |  |  |  |  | { | 
| 426 | 14 |  |  | 14 |  | 15 | my $self = shift; | 
| 427 | 14 |  |  |  |  | 37 | $self->{fh}->close; | 
| 428 |  |  |  |  |  |  | } | 
| 429 |  |  |  |  |  |  |  | 
| 430 |  |  |  |  |  |  | # Leave this undocumented for now | 
| 431 |  |  |  |  |  |  | sub setup_async_mode | 
| 432 |  |  |  |  |  |  | { | 
| 433 | 140 |  |  | 140 | 0 | 775 | my $self = shift; | 
| 434 | 140 |  |  |  |  | 333 | my %args = @_; | 
| 435 |  |  |  |  |  |  |  | 
| 436 | 140 |  | 66 |  |  | 648 | exists $args{$_} and $self->{$_} = delete $args{$_} for qw( read_handle write_handle ); | 
| 437 |  |  |  |  |  |  |  | 
| 438 | 140 | 50 |  |  |  | 353 | keys %args and croak "Unrecognised keys for setup_async_mode: " . join( ", ", keys %args ); | 
| 439 |  |  |  |  |  |  |  | 
| 440 | 140 |  | 66 |  |  | 1934 | defined and $_->blocking( 0 ) for $self->{read_handle}, $self->{write_handle}; | 
| 441 | 140 |  |  |  |  | 765 | $self->{mode} = "async"; | 
| 442 |  |  |  |  |  |  | } | 
| 443 |  |  |  |  |  |  |  | 
| 444 |  |  |  |  |  |  | sub _build_stream | 
| 445 |  |  |  |  |  |  | { | 
| 446 | 211 |  |  | 211 |  | 273 | my $self = shift; | 
| 447 | 211 |  | 66 |  |  | 2170 | return $self->{stream} ||= do { | 
| 448 | 128 |  |  |  |  | 429 | $self->{on_result_queue} = []; | 
| 449 |  |  |  |  |  |  |  | 
| 450 |  |  |  |  |  |  | my $stream = IO::Async::Stream->new( | 
| 451 |  |  |  |  |  |  | read_handle  => $self->{read_handle}, | 
| 452 |  |  |  |  |  |  | write_handle => $self->{write_handle}, | 
| 453 | 128 |  |  |  |  | 676 | autoflush    => 1, | 
| 454 |  |  |  |  |  |  | on_read      => $self->_capture_weakself( '_on_stream_read' ) | 
| 455 |  |  |  |  |  |  | ); | 
| 456 |  |  |  |  |  |  |  | 
| 457 | 128 |  |  |  |  | 614 | $self->add_child( $stream ); | 
| 458 |  |  |  |  |  |  |  | 
| 459 | 128 |  |  |  |  | 1047 | $stream; | 
| 460 |  |  |  |  |  |  | }; | 
| 461 |  |  |  |  |  |  | } | 
| 462 |  |  |  |  |  |  |  | 
| 463 |  |  |  |  |  |  | sub _sendbytes_async | 
| 464 |  |  |  |  |  |  | { | 
| 465 | 101 |  |  | 101 |  | 220 | my $self = shift; | 
| 466 | 101 |  |  |  |  | 220 | my ( $bytes ) = @_; | 
| 467 | 101 |  |  |  |  | 270 | $self->_build_stream->write( $bytes ); | 
| 468 |  |  |  |  |  |  | } | 
| 469 |  |  |  |  |  |  |  | 
| 470 |  |  |  |  |  |  | sub _recv_async | 
| 471 |  |  |  |  |  |  | { | 
| 472 | 108 |  |  | 108 |  | 190 | my $self = shift; | 
| 473 | 108 |  |  |  |  | 248 | my %args = @_; | 
| 474 |  |  |  |  |  |  |  | 
| 475 | 108 |  |  |  |  | 228 | my $on_recv = $args{on_recv}; | 
| 476 | 108 |  |  |  |  | 204 | my $on_eof = $args{on_eof}; | 
| 477 |  |  |  |  |  |  |  | 
| 478 | 108 |  |  |  |  | 293 | my $stream = $self->_build_stream; | 
| 479 |  |  |  |  |  |  |  | 
| 480 | 108 |  |  |  |  | 190 | my $f; | 
| 481 | 108 | 100 |  |  |  | 697 | $f = $stream->loop->new_future unless !defined wantarray; | 
| 482 |  |  |  |  |  |  |  | 
| 483 | 108 |  |  |  |  | 1575 | push @{ $self->{on_result_queue} }, sub { | 
| 484 | 108 |  |  | 108 |  | 372 | my ( $self, $type, $result ) = @_; | 
| 485 | 108 | 100 |  |  |  | 328 | if( $type eq "recv" ) { | 
| 486 | 104 | 100 | 100 |  |  | 1082 | $f->done( $result ) if $f and !$f->is_cancelled; | 
| 487 | 104 | 100 |  |  |  | 6721 | $on_recv->( $self, $result ) if $on_recv; | 
| 488 |  |  |  |  |  |  | } | 
| 489 |  |  |  |  |  |  | else { | 
| 490 | 4 | 100 | 66 |  |  | 71 | $f->fail( "EOF waiting for Channel recv", eof => ) if $f and !$f->is_cancelled; | 
| 491 | 4 | 100 |  |  |  | 218 | $on_eof->( $self ) if $on_eof; | 
| 492 |  |  |  |  |  |  | } | 
| 493 | 108 |  |  |  |  | 181 | }; | 
| 494 |  |  |  |  |  |  |  | 
| 495 | 108 |  |  |  |  | 2613 | return $f; | 
| 496 |  |  |  |  |  |  | } | 
| 497 |  |  |  |  |  |  |  | 
| 498 |  |  |  |  |  |  | sub _close_async | 
| 499 |  |  |  |  |  |  | { | 
| 500 | 168 |  |  | 168 |  | 265 | my $self = shift; | 
| 501 | 168 | 100 |  |  |  | 406 | if( my $stream = $self->{stream} ) { | 
| 502 | 163 |  |  |  |  | 477 | $stream->close_when_empty; | 
| 503 |  |  |  |  |  |  | } | 
| 504 |  |  |  |  |  |  | else { | 
| 505 | 5 |  | 66 |  |  | 109 | $_ and $_->close for $self->{read_handle}, $self->{write_handle}; | 
| 506 |  |  |  |  |  |  | } | 
| 507 |  |  |  |  |  |  |  | 
| 508 | 168 |  |  |  |  | 6721 | undef $_ for $self->{read_handle}, $self->{write_handle}; | 
| 509 |  |  |  |  |  |  | } | 
| 510 |  |  |  |  |  |  |  | 
| 511 |  |  |  |  |  |  | sub _on_stream_read | 
| 512 |  |  |  |  |  |  | { | 
| 513 | 112 | 50 |  | 112 |  | 446 | my $self = shift or return; | 
| 514 | 112 |  |  |  |  | 308 | my ( $stream, $buffref, $eof ) = @_; | 
| 515 |  |  |  |  |  |  |  | 
| 516 | 112 | 100 |  |  |  | 274 | if( $eof ) { | 
| 517 | 7 |  |  |  |  | 47 | while( my $on_result = shift @{ $self->{on_result_queue} } ) { | 
|  | 11 |  |  |  |  | 95 |  | 
| 518 | 4 |  |  |  |  | 57 | $on_result->( $self, eof => ); | 
| 519 |  |  |  |  |  |  | } | 
| 520 | 7 | 100 |  |  |  | 81 | $self->{on_eof}->( $self ) if $self->{on_eof}; | 
| 521 | 7 |  |  |  |  | 45 | return; | 
| 522 |  |  |  |  |  |  | } | 
| 523 |  |  |  |  |  |  |  | 
| 524 | 105 | 50 |  |  |  | 269 | return 0 unless length( $$buffref ) >= 4; | 
| 525 | 105 |  |  |  |  | 421 | my $len = unpack( "I", $$buffref ); | 
| 526 | 105 | 50 |  |  |  | 321 | return 0 unless length( $$buffref ) >= 4 + $len; | 
| 527 |  |  |  |  |  |  |  | 
| 528 | 105 |  |  |  |  | 552 | my $record = $self->{decode}->( substr( $$buffref, 4, $len ) ); | 
| 529 | 105 |  |  |  |  | 456 | substr( $$buffref, 0, 4 + $len ) = ""; | 
| 530 |  |  |  |  |  |  |  | 
| 531 | 105 | 100 |  |  |  | 170 | if( my $on_result = shift @{ $self->{on_result_queue} } ) { | 
|  | 105 |  |  |  |  | 361 |  | 
| 532 | 104 |  |  |  |  | 281 | $on_result->( $self, recv => $record ); | 
| 533 |  |  |  |  |  |  | } | 
| 534 |  |  |  |  |  |  | else { | 
| 535 | 1 |  |  |  |  | 4 | $self->{on_recv}->( $self, $record ); | 
| 536 |  |  |  |  |  |  | } | 
| 537 |  |  |  |  |  |  |  | 
| 538 | 105 |  |  |  |  | 1564 | return 1; | 
| 539 |  |  |  |  |  |  | } | 
| 540 |  |  |  |  |  |  |  | 
| 541 |  |  |  |  |  |  | sub _extract_read_handle | 
| 542 |  |  |  |  |  |  | { | 
| 543 | 66 |  |  | 66 |  | 169 | my $self = shift; | 
| 544 |  |  |  |  |  |  |  | 
| 545 | 66 | 100 |  |  |  | 351 | return undef if !$self->{mode}; | 
| 546 |  |  |  |  |  |  |  | 
| 547 | 1 | 50 |  |  |  | 30 | croak "Cannot extract filehandle" if $self->{mode} ne "async"; | 
| 548 | 1 |  |  |  |  | 26 | $self->{mode} = "dead"; | 
| 549 |  |  |  |  |  |  |  | 
| 550 | 1 |  |  |  |  | 15 | return $self->{read_handle}; | 
| 551 |  |  |  |  |  |  | } | 
| 552 |  |  |  |  |  |  |  | 
| 553 |  |  |  |  |  |  | sub _extract_write_handle | 
| 554 |  |  |  |  |  |  | { | 
| 555 | 67 |  |  | 67 |  | 124 | my $self = shift; | 
| 556 |  |  |  |  |  |  |  | 
| 557 | 67 | 50 |  |  |  | 354 | return undef if !$self->{mode}; | 
| 558 |  |  |  |  |  |  |  | 
| 559 | 0 | 0 |  |  |  |  | croak "Cannot extract filehandle" if $self->{mode} ne "async"; | 
| 560 | 0 |  |  |  |  |  | $self->{mode} = "dead"; | 
| 561 |  |  |  |  |  |  |  | 
| 562 | 0 |  |  |  |  |  | return $self->{write_handle}; | 
| 563 |  |  |  |  |  |  | } | 
| 564 |  |  |  |  |  |  |  | 
| 565 |  |  |  |  |  |  | =head1 AUTHOR | 
| 566 |  |  |  |  |  |  |  | 
| 567 |  |  |  |  |  |  | Paul Evans | 
| 568 |  |  |  |  |  |  |  | 
| 569 |  |  |  |  |  |  | =cut | 
| 570 |  |  |  |  |  |  |  | 
| 571 |  |  |  |  |  |  | 0x55AA; |