File Coverage

blib/lib/Ryu/Async.pm
Criterion Covered Total %
statement 203 258 78.6
branch 20 50 40.0
condition 25 59 42.3
subroutine 49 66 74.2
pod 13 15 86.6
total 310 448 69.2


line stmt bran cond sub pod time code
1             package Ryu::Async;
2             # ABSTRACT: IO::Async support for Ryu stream management
3 6     6   597584 use strict;
  6         81  
  6         207  
4 6     6   34 use warnings;
  6         12  
  6         281  
5              
6             our $VERSION = '0.020';
7              
8 6     6   3907 use utf8;
  6         91  
  6         35  
9              
10             =encoding UTF8
11              
12             =head1 NAME
13              
14             Ryu::Async - use L with L
15              
16             =head1 SYNOPSIS
17              
18             #!/usr/bin/env perl
19             use strict;
20             use warnings;
21             use IO::Async::Loop;
22             use Ryu::Async;
23             # This will generate a lot of output, but is useful
24             # for demonstrating lifecycles. Drop this to 'info' or
25             # 'debug' to make it more realistic.
26             use Log::Any::Adapter qw(Stdout), log_level => 'trace';
27             #
28             my $loop = IO::Async::Loop->new;
29             $loop->add(
30             my $ryu = Ryu::Async->new
31             );
32             {
33             my $timer = $ryu->timer(
34             interval => 0.10,
35             )->take(10)
36             ->each(sub { print "tick\n" });
37             warn $timer->describe;
38             $timer->get;
39             }
40              
41             =head1 DESCRIPTION
42              
43             This is an L subclass for interacting with L.
44              
45             =cut
46              
47 6     6   3313 use parent qw(IO::Async::Notifier);
  6         1938  
  6         37  
48              
49 6     6   100591 use Future::AsyncAwait;
  6         21037  
  6         39  
50 6     6   3451 use IO::Async::Handle;
  6         81583  
  6         317  
51 6     6   3705 use IO::Async::Listener;
  6         23447  
  6         257  
52 6     6   3656 use IO::Async::Process;
  6         22258  
  6         220  
53 6     6   3724 use IO::Async::Resolver;
  6         279944  
  6         290  
54 6     6   3668 use IO::Async::Signal;
  6         4449  
  6         203  
55 6     6   3129 use IO::Async::Socket;
  6         7128  
  6         217  
56 6     6   52 use IO::Async::Stream;
  6         15  
  6         139  
57 6     6   3279 use IO::Async::Timer::Absolute;
  6         3472  
  6         197  
58 6     6   42 use IO::Async::Timer::Countdown;
  6         14  
  6         162  
59 6     6   3237 use IO::Async::Timer::Periodic;
  6         6271  
  6         255  
60              
61 6     6   3312 use Ryu::Async::Client;
  6         18  
  6         203  
62 6     6   2649 use Ryu::Async::Packet;
  6         18  
  6         199  
63 6     6   2639 use Ryu::Async::Server;
  6         16  
  6         194  
64              
65 6     6   2855 use Ryu::Sink;
  6         13076  
  6         196  
66 6     6   4043 use Ryu::Source;
  6         308837  
  6         318  
67              
68 6     6   3505 use URI::udp;
  6         79835  
  6         1790  
69 6     6   3468 use URI::tcp;
  6         1557  
  6         265  
70 6     6   44 use Socket qw(pack_sockaddr_in inet_pton AF_INET);
  6         15  
  6         428  
71              
72 6     6   43 use curry::weak;
  6         14  
  6         155  
73              
74 6     6   1407 use Syntax::Keyword::Try;
  6         14  
  6         55  
75              
76 6     6   3450 use Ryu '2.000';
  6         2952  
  6         799  
77 6     6   45 use Ryu::Source;
  6         14  
  6         146  
78              
79 6     6   2843 use Ryu::Async::Process;
  6         19  
  6         229  
80 6     6   45 use Scalar::Util qw(blessed weaken);
  6         12  
  6         385  
81              
82 6     6   37 use Log::Any qw($log);
  6         19  
  6         54  
83              
84             =head1 Interaction with L
85              
86             On load, this module will provide a L which assigns
87             L instances from L.
88              
89             You can override this behaviour by doing this instead:
90              
91             BEGIN {
92             require Ryu::Source;
93             local $Ryu::Source::FUTURE_FACTORY = sub { };
94             require Ryu::Async;
95             }
96              
97             to ensure the original factory function is preserved.
98              
99             =cut
100              
101             $Ryu::Source::FUTURE_FACTORY = sub {
102             IO::Async::Loop->new->new_future(label => $_[1]);
103             };
104              
105             =head1 METHODS
106              
107             =cut
108              
109             =head2 from
110              
111             Creates a new L from a thing.
112              
113             The exact details of this are likely to change in future, but a few things that are expected to work:
114              
115             $ryu->from($io_async_stream_instance)
116             ->by_line
117             ->each(sub { print "Line: $_\n" });
118             $ryu->from([1..1000])
119             ->sum
120             ->each(sub { print "Total was $_\n" });
121              
122             =cut
123              
124             sub from {
125 2     2 1 15736 my $self = shift;
126              
127 2 100       16 if(my $class = blessed $_[0]) {
128 1 50       10 if($class->isa('IO::Async::Stream')) {
129 1         6 return $self->from_stream($_[0]);
130             } else {
131 0         0 die "Unable to determine appropriate source for $class";
132             }
133             }
134              
135 1         6 my $src = $self->source(label => 'from');
136 1 50       119 if(my $ref = ref $_[0]) {
137 1 50       5 if($ref eq 'ARRAY') {
138             # We'll run a background loop that emits one item from the arrayref
139             # every I/O loop iteration - the arrayref is used as-is, allowing for
140             # dynamic population over time
141 1         2 my $pending = $_[0];
142 1         5 weaken(my $weak_src = $src);
143 1         3 my $loop = $self->loop;
144              
145 1     1   3 (async sub {
146             # Acquire instance on each iteration - this allows us to
147             # give up if nothing cares about the source any more.
148             ITEM:
149 1         7 while(my $src = $weak_src) {
150 4         280 await Future->wait_all(
151             $loop->later,
152             $src->unblocked
153             );
154 4 100       18000 last ITEM unless $pending->@*;
155 3         23 $src->emit(shift $pending->@*);
156             }
157              
158             # Probably overkill, but given stack-not-refcounted issues
159             # then let's not risk $src going through destruction partway
160             # through the ->finish handling
161 1 50       13 (my $src = $weak_src)->finish if $weak_src;
162              
163             # We only needed this for ->later
164 1         399 weaken $loop;
165 1         57 return;
166 1         11 })->()->retain;
167 1         1952 return $src;
168             } else {
169 0         0 die "Unknown type $ref"
170             }
171             }
172              
173 0         0 my %args = @_;
174 0 0       0 if(my $dir = $args{directory}) {
175 0 0       0 opendir my $handle, $dir or die $!;
176 0         0 my $code;
177             $code = sub {
178 0 0   0   0 if(defined(my $item = readdir $handle)) {
179 0 0 0     0 $src->emit($item) unless $item eq '.' or $item eq '..';
180 0         0 $self->loop->later($code);
181             } else {
182 0         0 weaken($code);
183 0 0       0 closedir $handle or die $!;
184 0         0 $src->finish
185             }
186 0         0 };
187 0         0 $code->();
188 0         0 return $self;
189             }
190 0         0 die "unknown stuff";
191             }
192              
193             =head2 from_stream
194              
195             Create a new L from an L instance.
196              
197             Note that a stream which is not already attached to an L
198             will be added as a child of this instance.
199              
200             =cut
201              
202             sub from_stream {
203 1     1 1 3 my ($self, $stream, %args) = @_;
204              
205 1   50     12 my $src = $self->source(label => $args{label} // 'IaStream');
206              
207             # Our ->flow_control monitoring gives us a boolean
208             # value every time the state changes:
209             # 1 - we are active
210             # 0 - we are paused
211             # through sheer coïncidence, this is also what the
212             # IO::Async::Stream `->want_(read|write)ready` methods
213             # expect.
214 1         107 $src->flow_control
215             ->each($stream->curry::weak::want_readready);
216              
217             $stream->configure(
218             on_read => sub {
219 3     3   9840 my ($stream, $buffref, $eof) = @_;
220 3 100       24 $log->tracef("Have %d bytes of data, EOF = %s", length($$buffref), $eof ? 'yes' : 'no');
221 3         18 my $data = substr $$buffref, 0, length $$buffref, '';
222 3         17 $src->emit($data);
223 3 100 100     150 $src->finish if $eof && !$src->completed->is_ready;
224             }
225 1         62 );
226 1 50       79 unless($stream->parent) {
227 1         11 $self->add_child($stream);
228             $src->completed->on_ready(sub {
229 1 50   1   565 $self->remove_child($stream) if $stream->parent;
230 1         310 });
231             }
232 1         1751 return $src;
233             }
234              
235             =head2 to_stream
236              
237             Provides a L that will send data to an L instance.
238              
239             Requires the L and will return a new L instance.
240              
241             =cut
242              
243             sub to_stream {
244 0     0 1 0 my ($self, $stream, %args) = @_;
245              
246 0   0     0 my $sink = $self->sink(label => $args{label} // 'IaStream');
247              
248 0         0 $stream->configure(
249             on_writeable_start => $sink->curry::weak::resume,
250             on_writeable_stop => $sink->curry::weak::pause,
251             );
252             $sink->source
253             ->each(sub {
254 0     0   0 $stream->write($_)
255 0         0 });
256 0 0       0 unless($stream->parent) {
257 0         0 $self->add_child($stream);
258             $sink->completed->on_ready($self->$curry::weak(sub {
259 0     0   0 my ($self) = @_;
260 0 0       0 $self->remove_child($stream) if $stream->parent;
261 0         0 }));
262             }
263 0         0 return $sink;
264             }
265              
266             =head2 stdin
267              
268             Create a new L that wraps STDIN.
269              
270             As with other L wrappers, this will emit data as soon as it's available,
271             as raw bytes.
272              
273             Use L and L to split into lines and/or decode from UTF-8.
274              
275             =cut
276              
277             sub stdin {
278 0     0 1 0 my ($self) = @_;
279 0         0 return $self->from_stream(
280             IO::Async::Stream->new_for_stdin,
281             label => 'STDIN',
282             )
283             }
284              
285             =head2 stdout
286              
287             Returns a new L that wraps STDOUT.
288              
289             =cut
290              
291             sub stdout {
292 0     0 1 0 my ($self) = @_;
293 0         0 return $self->to_stream(
294             IO::Async::Stream->new_for_stdout,
295             label => 'STDOUT',
296             )
297             }
298              
299             =head2 stderr
300              
301             Returns a new L that wraps STDERR.
302              
303             =cut
304              
305             sub stderr {
306 0     0 1 0 my ($self) = @_;
307 0         0 return $self->to_stream(
308             IO::Async::Stream->new_for_stderr,
309             label => 'STDERR',
310             )
311             }
312              
313             =head2 timer
314              
315             Provides a L which emits an empty string at selected intervals.
316              
317             Takes the following named parameters:
318              
319             =over 4
320              
321             =item * interval - how often to trigger the timer, in seconds (fractional values allowed)
322              
323             =item * reschedule - type of rescheduling to use, can be C, C or C as documented
324             in L
325              
326             =back
327              
328             Example:
329              
330             $ryu->timer(interval => 1, reschedule => 'hard')
331             ->combine_latest(...)
332              
333             =cut
334              
335             sub timer {
336 1     1 1 6191 my ($self, %args) = @_;
337 1         5 my $src = $self->source(label => 'timer');
338             $self->add_child(
339             my $timer = IO::Async::Timer::Periodic->new(
340             reschedule => 'hard',
341             %args,
342 1     10   92 on_tick => $src->$curry::weak(sub { shift->emit('') }),
  10         1995267  
343             )
344             );
345 1         244 Scalar::Util::weaken($timer);
346             $src->on_ready($self->$curry::weak(sub {
347 1     1   1686 my ($self) = @_;
348 1 50       5 return unless $timer;
349 1 50       8 $timer->stop if $timer->is_running;
350 1         19 $self->remove_child($timer)
351 1         7 }));
352 1         1272 $timer->start;
353 1         6385 $src
354             }
355              
356             =head2 run
357              
358             Creates an L.
359              
360             =cut
361              
362             sub run {
363 0     0 1 0 my ($self, $code, %args) = @_;
364 0 0       0 if(ref($code) eq 'ARRAY') {
    0          
365             # Fork and exec
366 0         0 $args{command} = $code;
367             } elsif(ref($code) eq 'CODE') {
368 0         0 $args{code} = $code;
369             }
370             $self->add_child(
371 0         0 my $process = Ryu::Async::Process->new(
372             process => IO::Async::Process->new(%args)
373             )
374             );
375 0         0 $process;
376             }
377              
378             =head2 source
379              
380             Returns a new L instance.
381              
382             =cut
383              
384             sub source {
385 6     6 1 32 my ($self, %args) = @_;
386 6   66     38 my $label = delete($args{label}) // $self->label;
387 6         35 Ryu::Source->new(
388             new_future => $self->loop->curry::weak::new_future,
389             apply_timeout => $self->curry::timeout,
390             label => $label,
391             %args,
392             )
393             }
394              
395             =head2 udp_client
396              
397             Creates a new UDP client.
398              
399             This provides a sink for L packets, and a source for L responses.
400              
401             =over 4
402              
403             =item * C - an optional URI of the form C<< udp://host:port >>
404              
405             =item * C - which host to listen on, defaults to C<0.0.0.0>
406              
407             =item * C - the port to listen on
408              
409             =back
410              
411             Returns a L instance.
412              
413             =cut
414              
415             sub udp_client {
416 1     1 1 8886 my ($self, %args) = @_;
417              
418 1         4 my $uri = delete $args{uri};
419 1   50     17 $uri //= 'udp://' . join ':', $args{host} // '*', $args{port} // ();
      33        
      33        
420 1 50       14 $uri = URI->new($uri) unless ref $uri;
421 1         129 $log->debugf("UDP client for %s", $uri->as_string);
422              
423             my $src = $self->source(
424 1   33     21 label => $args{label} // $uri->as_string,
425             );
426             my $sink = $self->sink(
427 1   33     93 label => $args{label} // $uri->as_string,
428             );
429             $self->add_child(
430             my $client = IO::Async::Socket->new(
431             on_recv => sub {
432 0     0   0 my ($sock, $payload, $addr) = @_;
433             try {
434             $log->tracef("Receiving [%s] from %s", $payload, $addr);
435             $src->emit(
436             Ryu::Async::Packet->new(
437             from => $addr,
438             payload => $payload
439             )
440             );
441 0         0 } catch {
442             $log->errorf("Exception when sending: %s", $@);
443             }
444             },
445             )
446 1         50 );
447 1   50     263 my $host = $uri->host || '0.0.0.0';
448 1 50       57 $host = '0.0.0.0' if $host eq '*';
449 1   50     6 my $port = $uri->port // 0;
450 1         44 my $f = $client->connect(
451             host => $host,
452             service => $port,
453             socktype => 'dgram',
454             );
455             $f->on_done(sub {
456 1     1   26 $log->debugf("UDP client connected");
457             })->on_fail(sub {
458 0     0   0 $log->errorf("UDP client failed to connect - %s", join ',', @_);
459 1         5936 });
460             $sink->source->each(sub {
461 1     1   158 my $payload = $_;
462             $f->on_done(sub {
463             try {
464             $log->tracef("Sending [%s] to %s", $payload, $uri);
465             $client->send(
466             $payload,
467             undef,
468             pack_sockaddr_in(
469             $port,
470             '' . inet_pton(AF_INET, $host)
471             )
472             );
473 1         20 } catch {
474             $log->errorf("Exception when sending: %s", $@);
475             }
476 1         9 })->retain;
477 1         30 });
478 1         72 Ryu::Async::Client->new(
479             outgoing => $sink,
480             incoming => $src,
481             );
482             }
483              
484             =head2 udp_server
485              
486             =cut
487              
488             sub udp_server {
489 1     1 1 7798 my ($self, %args) = @_;
490              
491 1         4 my $uri = delete $args{uri};
492 1   33     6 $uri //= do {
493 1   50     4 $args{host} //= '0.0.0.0';
494 1   33     12 'udp://' . join ':', $args{host}, $args{port} // ();
495             };
496 1 50       14 $uri = URI->new($uri) unless ref $uri;
497 1         317 $log->debugf("UDP server %s", $uri->as_string);
498              
499 1         75 my $src = $self->source;
500 1         101 my $sink = $self->sink;
501              
502             $self->add_child(
503             my $server = IO::Async::Socket->new(
504             on_recv => sub {
505 1     1   1451 my ($sock, $msg, $addr) = @_;
506 1         10 $log->debugf("UDP server [%s] had %s from %s", $uri->as_string, $msg, $addr);
507 1         23 $src->emit(
508             Ryu::Async::Packet->new(
509             payload => $msg,
510             from => $addr
511             )
512             )
513             },
514             on_recv_error => sub {
515 0     0   0 my ($sock, $err) = @_;
516 0         0 $src->fail($err);
517             }
518             )
519 1         55 );
520 1     0   302 $sink->source->each(sub { $server->send($_->payload, 0, $_->addr) });
  0         0  
521             my $port_f = $server->bind(
522             service => $uri->port // 0,
523             socktype => 'dgram'
524             )->then(sub {
525 1     1   24956 Future->done($server->write_handle->sockport)
526 1   50     86 });
527 1         133 Ryu::Async::Server->new(
528             port => $port_f,
529             incoming => $src,
530             outgoing => undef,
531             );
532             }
533              
534             =head2 tcp_server
535              
536             Creates a listening TCP socket, and provides a L
537             instance which will emit a new event every time a client connects.
538              
539             =cut
540              
541             sub tcp_server {
542 1     1 1 6216 my ($self, %args) = @_;
543              
544 1         4 my $uri = delete $args{uri};
545 1   33     5 $uri //= do {
546 1   50     3 $args{host} //= '0.0.0.0';
547 1   33     26 'tcp://' . join ':', $args{host}, $args{port} // ();
548             };
549 1 50       13 $uri = URI->new($uri) unless ref $uri;
550 1         351 $log->debugf("TCP server %s", $uri->as_string);
551              
552 1         79 my $src = $self->source;
553 1         111 my $sink = $self->sink;
554              
555             $self->add_child(
556             my $server = IO::Async::Listener->new(
557             on_stream => sub {
558 0     0   0 my ($sock, $msg, $addr) = @_;
559 0         0 $log->debugf("TCP server [%s] had %s from %s", $uri->as_string, $msg, $addr);
560 0         0 $src->emit(
561             Ryu::Async::Packet->new(
562             payload => $msg,
563             from => $addr
564             )
565             )
566             },
567             )
568 1         58 );
569 1     0   276 $sink->source->each(sub { $server->send($_->payload, 0, $_->addr) });
  0         0  
570             my $port_f = $server->listen(
571             service => $uri->port // 0,
572             socktype => 'stream'
573             )->then(sub {
574 1     1   22303 my ($listener) = @_;
575 1         5 Future->done($listener->read_handle->sockport)
576 1   50     80 });
577 1         113 Ryu::Async::Server->new(
578             port => $port_f,
579             incoming => $src,
580             outgoing => undef,
581             );
582             }
583              
584             sub timeout {
585 0     0 0 0 my ($self, $input, $output, $delay) = @_;
586             $self->add_child(
587             my $timer = IO::Async::Timer::Countdown->new(
588             interval => $delay,
589 0     0   0 on_expire => sub { $output->fail('timeout') },
590             )
591 0         0 );
592 0     0   0 $input->each_while_source(sub { $timer->reset }, $output);
  0         0  
593 0         0 return $self;
594             }
595              
596             =head2 sink
597              
598             Returns a new L.
599              
600             The label will default to the calling package/class and method,
601             with some truncation rules:
602              
603             =over 4
604              
605             =item * A C prefix will be replaced by C.
606              
607             =item * A C prefix will be replaced by C.
608              
609             =item * A C prefix will be replaced by C.
610              
611             =item * A C prefix will be replaced by C.
612              
613             =item * A C prefix will be replaced by C.
614              
615             =item * A C prefix will be replaced by C.
616              
617             =back
618              
619             This list of truncations is subject to change, so please don't
620             rely on any of these in string matches or similar - better to set
621             your own label if you need consistency.
622              
623             =cut
624              
625             sub sink {
626 4     4 1 7477 my ($self, %args) = @_;
627 4   66     22 my $label = delete($args{label}) // $self->label;
628 4         18 Ryu::Sink->new(
629             new_future => $self->loop->curry::weak::new_future,
630             label => $label,
631             %args,
632             )
633             }
634              
635             sub label {
636 4     4 0 10 my ($self) = @_;
637 4         33 my $label = (caller 2)[0];
638 4   33     19 for($label // ()) {
639 4         11 s/^Net::Async::/Na/g;
640 4         8 s/^IO::Async::/Ia/g;
641 4         8 s/^Web::Async::/Wa/g;
642 4         6 s/^Tickit::Async::/Ta/g;
643 4         6 s/^Tickit::Widget::/TW/g;
644 4         11 s/::([^:]*)$/->$1/;
645             }
646 4   50     23 return $label // 'unknown';
647             }
648              
649             1;
650              
651             __END__