File Coverage

blib/lib/Ryu/Source.pm
Criterion Covered Total %
statement 582 902 64.5
branch 184 320 57.5
condition 95 167 56.8
subroutine 137 227 60.3
pod 74 81 91.3
total 1072 1697 63.1


line stmt bran cond sub pod time code
1             package Ryu::Source;
2              
3 43     43   462194 use strict;
  43         2317  
  43         5202  
4 43     43   2376 use warnings;
  43         2165  
  43         6167  
5              
6 43     43   15039 use parent qw(Ryu::Node);
  43         11117  
  43         324  
7              
8             our $VERSION = '4.001'; # VERSION
9             our $AUTHORITY = 'cpan:TEAM'; # AUTHORITY
10              
11             =head1 NAME
12              
13             Ryu::Source - base representation for a source of events
14              
15             =head1 SYNOPSIS
16              
17             my $src = Ryu::Source->new;
18             my $chained = $src->map(sub { $_ * $_ })->prefix('value: ')->say;
19             $src->emit($_) for 1..5;
20             $src->finish;
21              
22             =head1 DESCRIPTION
23              
24             This is probably the module you'd want to start with, if you were going to be
25             using any of this. There's a disclaimer in L that may be relevant at this
26             point.
27              
28             =head2 Quick start
29              
30             You'd normally want to start by creating a L<Ryu::Source> instance:
31              
32             my $src = Ryu::Source->new;
33              
34             If you're dealing with L<IO::Async> code, use L<Ryu::Async> to ensure that you
35             get properly awaitable L<Future> instances:
36              
37             $loop->add(my $ryu = Ryu::Async->new);
38             my $src = $ryu->source;
39              
40             Once you have a source, you'll need two things:
41              
42             =over 4
43              
44             =item * items to put into one end
45              
46             =item * processing to attach to the other end
47              
48             =back
49              
50             For the first, call L</emit>:
51              
52             use Future::AsyncAwait;
53             # 1s drifting periodic timer
54             while(1) {
55             await $loop->delay_future(after => 1);
56             $src->emit('');
57             }
58              
59             For the second, this would be L</each>:
60              
61             $src->each(sub { print "Had timer tick\n" });
62              
63             So far, not so useful - the power of this type of reactive programming is in the
64             ability to chain and combine disparate event sources.
65              
66             At this point, L<http://rxmarbles.com> is worth a visit - this provides a clear
67             visual demonstration of how to combine multiple event streams using the chaining
68             methods. Most of the API here is modelled after similar principles.
69              
70             First, the L</map> method: this provides a way to transform each item into
71             something else:
72              
73             $src->map(do { my $count = 0; sub { ++$count } })
74             ->each(sub { print "Count is now $_\n" })
75              
76             Next, L</filter> provides an equivalent to Perl's L<perlfunc/grep> functionality:
77              
78             $src->map(do { my $count = 0; sub { ++$count } })
79             ->filter(sub { $_ % 2 })
80             ->each(sub { print "Count is now at an odd number: $_\n" })
81              
82             You can stack these:
83              
84             $src->map(do { my $count = 0; sub { ++$count } })
85             ->filter(sub { $_ % 2 })
86             ->filter(sub { $_ % 5 })
87             ->each(sub { print "Count is now at an odd number which is not divisible by 5: $_\n" })
88              
89             or:
90              
91             $src->map(do { my $count = 0; sub { ++$count } })
92             ->map(sub { $_ % 3 ? 'fizz' : $_ })
93             ->map(sub { $_ % 5 ? 'buzz' : $_ })
94             ->each(sub { print "An imperfect attempt at the fizz-buzz game: $_\n" })
95              
96             =cut
97              
98 43     43   25955 no indirect;
  43         59992  
  43         1291  
99 43     43   23555 use sort qw(stable);
  43         18383  
  43         496  
100              
101 43     43   2187 use Scalar::Util ();
  43         123  
  43         992  
102 43     43   22749 use Ref::Util ();
  43         123317  
  43         1906  
103 43     43   358 use List::Util ();
  43         96  
  43         1503  
104 43     43   22978 use List::UtilsBy;
  43         102797  
  43         2612  
105 43     43   21866 use Encode ();
  43         721708  
  43         2437  
106 43     43   27304 use Syntax::Keyword::Try;
  43         125412  
  43         304  
107 43     43   4898 use Future;
  43         114  
  43         2198  
108 43     43   22726 use Future::Queue;
  43         90356  
  43         2936  
109 43     43   19670 use curry::weak;
  43         62527  
  43         2112  
110              
111 43     43   23283 use Ryu::Buffer;
  43         125  
  43         3123  
112              
113 43     43   21088 use Log::Any qw($log);
  43         540389  
  43         273  
114              
115             =head1 GLOBALS
116              
117             =head2 $FUTURE_FACTORY
118              
119             This is a coderef which should return a new L<Future>-compatible instance.
120              
121             Example overrides might include:
122              
123             $Ryu::Source::FUTURE_FACTORY = sub { Mojo::Future->new->set_label($_[1]) };
124              
125             =cut
126              
127             our $FUTURE_FACTORY = sub {
128             Future->new->set_label($_[1])
129             };
130              
131             =head2 %ENCODER
132              
133             An encoder is a coderef which takes input and returns output.
134              
135             =cut
136              
137             our %ENCODER = (
138             utf8 => sub {
139             sub {
140             Encode::encode_utf8($_)
141             }
142             },
143             json => sub {
144             require JSON::MaybeXS;
145             my $json = JSON::MaybeXS->new(@_);
146             sub {
147             $json->encode($_)
148             }
149             },
150             csv => sub {
151             require Text::CSV;
152             my $csv = Text::CSV->new(@_);
153             sub {
154             die $csv->error_input unless $csv->combine(@$_);
155             $csv->string
156             }
157             },
158             base64 => sub {
159             require MIME::Base64;
160             sub {
161             MIME::Base64::encode_base64($_, '');
162             }
163             },
164             );
165             # The naming of this one is a perennial source of confusion in Perl,
166             # let's just support both
167             $ENCODER{'UTF-8'} = $ENCODER{utf8};
168              
169             our %DECODER = (
170             utf8 => sub {
171             my $data = '';
172             sub {
173             $data .= $_;
174             Encode::decode_utf8($data, Encode::FB_QUIET)
175             }
176             },
177             json => sub {
178             require JSON::MaybeXS;
179             my $json = JSON::MaybeXS->new(@_);
180             sub {
181             $json->decode($_)
182             }
183             },
184             csv => sub {
185             require Text::CSV;
186             my $csv = Text::CSV->new(@_);
187             sub {
188             die $csv->error_input unless $csv->parse($_);
189             [ $csv->fields ]
190             }
191             },
192             base64 => sub {
193             require MIME::Base64;
194             sub {
195             MIME::Base64::decode_base64($_);
196             }
197             },
198             );
199             $DECODER{'UTF-8'} = $DECODER{utf8};
200              
201             =head1 METHODS
202              
203             =head2 new
204              
205             Takes named parameters, such as:
206              
207             =over 4
208              
209             =item * label - the label used in descriptions
210              
211             =back
212              
213             Note that this is rarely called directly, see L</from>, L</empty> and L</never> instead.
214              
215             =cut
216              
217             sub new {
218 223     223 1 9650745 my ($self, %args) = @_;
219 223   100     1104 $args{label} //= 'unknown';
220 223   50     1183 $args{on_item} //= [];
221 223   50     1066 $args{on_batch} //= [];
222 223         1344 $self->SUPER::new(%args);
223             }
224              
225             =head2 from
226              
227             Creates a new source from things.
228              
229             The precise details of what this method supports may be somewhat ill-defined at this point in time.
230             It is expected that the interface and internals of this method will vary greatly in versions to come.
231              
232             At the moment, the following inputs are supported:
233              
234             =over 4
235              
236             =item * arrayref - when called as C<< ->from([1,2,3]) >> this will emit the values from the arrayref,
237             deferring until the source is started
238              
239             =item * L<Future> - given a L<Future> instance, will emit the results when that L<Future> is marked as done
240              
241             =item * file handle - if provided a filehandle, such as C<< ->from(\*STDIN) >>, this will read bytes and
242             emit those until EOF
243              
244             =back
245              
246             =cut
247              
248             sub from {
249 1     1 1 3 my $class = shift;
250 1 50       4 my $src = (ref $class) ? $class : $class->new;
251 1 50       7 if(my $from_class = Scalar::Util::blessed($_[0])) {
    50          
252 0 0       0 if($from_class->isa('Future')) {
253             $_[0]->on_ready(sub {
254 0     0   0 my ($f) = @_;
255 0 0       0 if($f->failure) {
    0          
256 0         0 $src->fail($f->from_future);
257             } elsif(!$f->is_cancelled) {
258 0         0 $src->finish;
259             } else {
260 0         0 $src->emit($f->get);
261 0         0 $src->finish;
262             }
263 0         0 })->retain;
264 0         0 return $src;
265             } else {
266 0         0 die 'Unknown class ' . $from_class . ', cannot turn it into a source';
267             }
268             } elsif(my $ref = ref($_[0])) {
269 1 50       3 if($ref eq 'ARRAY') {
    0          
270 1         1 my $data = $_[0];
271             $src->{on_get} = sub {
272 1     1   4 while($data->@*) {
273 3         5 $src->emit(shift $data->@*);
274             }
275 1         6 $src->finish;
276 1         5 };
277 1         3 return $src;
278             } elsif($ref eq 'GLOB') {
279 0 0       0 if(my $fh = *{$_[0]}{IO}) {
  0         0  
280             my $code = sub {
281 0     0   0 while(read $fh, my $buf, 4096) {
282 0         0 $src->emit($buf)
283             }
284             $src->finish
285 0         0 };
  0         0  
286 0         0 $src->{on_get} = $code;
287 0         0 return $src;
288             } else {
289 0         0 die "have a GLOB with no IO entry, this is not supported"
290             }
291             }
292 0         0 die "unsupported ref type $ref";
293             } else {
294 0         0 die "unknown item in ->from";
295             }
296             }
297              
298             =head2 empty
299              
300             Creates an empty source, which finishes immediately.
301              
302             =cut
303              
304             sub empty {
305 0     0 1 0 my ($class) = @_;
306              
307 0         0 $class->new(label => (caller 0)[3] =~ /::([^:]+)$/)->finish
308             }
309              
310             =head2 never
311              
312             An empty source that never finishes.
313              
314             =cut
315              
316             sub never {
317 0     0 1 0 my ($class) = @_;
318              
319 0         0 $class->new(label => (caller 0)[3] =~ /::([^:]+)$/)
320             }
321              
322             =head1 METHODS - Instance
323              
324             =cut
325              
326             =head2 encode
327              
328             Passes each item through an encoder.
329              
330             The first parameter is the encoder to use, the remainder are
331             used as options for the selected encoder.
332              
333             Examples:
334              
335             $src->encode('json')
336             $src->encode('utf8')
337             $src->encode('base64')
338              
339             =cut
340              
341             sub encode {
342 2     2 1 18 my ($self, $type) = splice @_, 0, 2;
343 2         26 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
344 2   50     18 my $code = ($ENCODER{$type} || $self->can('encode_' . $type) or die "unsupported encoding $type")->(@_);
345             $self->each_while_source(sub {
346 2     2   5 $src->emit($code->($_))
347 2         18 }, $src);
348             }
349              
350             =head2 decode
351              
352             Passes each item through a decoder.
353              
354             The first parameter is the decoder to use, the remainder are
355             used as options for the selected decoder.
356              
357             Examples:
358              
359             $src->decode('json')
360             $src->decode('utf8')
361             $src->decode('base64')
362              
363             =cut
364              
365             sub decode {
366 0     0 1 0 my ($self, $type) = splice @_, 0, 2;
367 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
368 0   0     0 my $code = ($DECODER{$type} || $self->can('decode_' . $type) or die "unsupported encoding $type")->(@_);
369             $self->each_while_source(sub {
370 0     0   0 $src->emit($code->($_))
371 0         0 }, $src);
372             }
373              
374             =head2 print
375              
376             Shortcut for C<< ->each(sub { print }) >>, except this will
377             also save the initial state of C< $\ > and use that for each
378             call for consistency.
379              
380             =cut
381              
382             sub print {
383 0     0 1 0 my ($self) = @_;
384 0         0 my $delim = $\;
385 0     0   0 $self->each(sub { local $\ = $delim; print });
  0         0  
  0         0  
386             }
387              
388             =head2 say
389              
390             Shortcut for C<< ->each(sub { print "$_\n" }) >>.
391              
392             =cut
393              
394             sub say {
395 0     0 1 0 my ($self) = @_;
396 0     0   0 $self->each(sub { local $\; print "$_\n" });
  0         0  
  0         0  
397             }
398              
399             =head2 hexdump
400              
401             Convert input bytes to a hexdump representation, for example:
402              
403             00000000 00 00 12 04 00 00 00 00 00 00 03 00 00 00 80 00 >................<
404             00000010 04 00 01 00 00 00 05 00 ff ff ff 00 00 04 08 00 >................<
405             00000020 00 00 00 00 7f ff 00 00 >........<
406              
407             One line is emitted for each 16 bytes.
408              
409             Takes the following named parameters:
410              
411             =over 4
412              
413             =item * C<continuous> - accumulates data for a continuous stream, and
414             does not reset the offset counter. Note that this may cause the last
415             output to be delayed until the source completes.
416              
417             =back
418              
419             =cut
420              
421             sub hexdump {
422 0     0 1 0 my ($self, %args) = @_;
423              
424 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
425 0         0 my $offset = 0;
426 0         0 my $in = '';
427             $self->each_while_source(sub {
428 0     0   0 my @out;
429 0 0       0 if($args{continuous}) {
430 0         0 $in .= $_;
431 0 0       0 return if length($in) < 16;
432             } else {
433 0         0 $in = $_;
434 0         0 $offset = 0;
435             }
436 0         0 while(length(my $bytes = substr $in, 0, List::Util::min(length($in), 16), '')) {
437 0         0 my $encoded = join '', unpack 'H*' => $bytes;
438 0         0 $encoded =~ s/[[:xdigit:]]{2}\K(?=[[:xdigit:]])/ /g;
439 0         0 my $ascii = $bytes =~ s{[^[:print:]]}{.}gr;
440 0         0 $src->emit(sprintf '%08x %-47.47s %-18.18s', $offset, $encoded, ">$ascii<");
441 0         0 $offset += length($bytes);
442 0 0 0     0 return if $args{continuous} and length($in) < 16;
443             }
444 0         0 }, $src);
445             }
446              
447             =head2 throw
448              
449             Throws something. I don't know what, maybe a chair.
450              
451             =cut
452              
453             sub throw {
454 0     0 1 0 my $src = shift->new(@_);
455 0         0 $src->fail('...');
456             }
457              
458             =head2 debounce
459              
460             Not yet implemented.
461              
462             Requires timing support, see implementations such as L<Ryu::Async> instead.
463              
464             =cut
465              
466             sub debounce {
467 0     0 1 0 my ($self, $interval) = @_;
468             ...
469 0         0 }
470              
471             =head2 chomp
472              
473             Chomps all items with the given delimiter.
474              
475             Once you've instantiated this, it will stick with the delimiter which was in force at the time of instantiation.
476             Said delimiter follows the usual rules of C<< $/ >>, whatever they happen to be.
477              
478             Example:
479              
480             $ryu->stdin
481             ->chomp("\n")
482             ->say
483              
484             =cut
485              
486             sub chomp {
487 0     0 1 0 my ($self, $delim) = @_;
488 0   0     0 $delim //= $/;
489             $self->map(sub {
490 0     0   0 local $/ = $delim;
491 0         0 chomp(my $line = $_);
492 0         0 $line
493             })
494 0         0 }
495              
496             =head2 map
497              
498             A bit like L<perlfunc/map>.
499              
500             Takes a single parameter - the coderef to execute for each item. This should return
501             a scalar value which will be used as the next item.
502              
503             Often useful in conjunction with a C<< do >> block to provide a closure.
504              
505             Examples:
506              
507             $src->map(do {
508             my $idx = 0;
509             sub {
510             [ @$_, ++$idx ]
511             }
512             })
513              
514             =cut
515              
516             sub map : method {
517 3     3 1 36 my ($self, $code) = @_;
518              
519 3         53 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
520             $self->each_while_source(sub {
521             $src->emit(Scalar::Util::blessed($_)
522             ? (scalar $_->$code)
523             : !ref($code)
524 10 100   10   56 ? $_->{$code}
    100          
525             : scalar $_->$code
526             )
527 3         30 }, $src);
528             }
529              
530             =head2 flat_map
531              
532             Similar to L</map>, but will flatten out some items:
533              
534             =over 4
535              
536             =item * an arrayref will be expanded out to emit the individual elements
537              
538             =item * for a L<Ryu::Source>, passes on any emitted elements
539              
540             =back
541              
542             This also means you can "merge" items from a series of sources.
543              
544             Note that this is not recursive - an arrayref of arrayrefs will be expanded out
545             into the child arrayrefs, but no further.
546              
547             Failure on any input source will cause this source to be marked as failed as well.
548              
549             =cut
550              
551             sub flat_map {
552 5     5 1 127 my ($self, $code) = splice @_, 0, 2;
553              
554             # Upgrade ->flat_map(method => args...) to a coderef
555 5 50       19 if(!Ref::Util::is_plain_coderef($code)) {
556 0         0 my $method = $code;
557 0         0 my @args = @_;
558 0     0   0 $code = sub { $_->$method(@args) }
559 0         0 }
560              
561 5         70 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
562              
563 5         17 Scalar::Util::weaken(my $weak_sauce = $src);
564             my $add = sub {
565 9     9   13 my $v = shift;
566 9 50       19 my $src = $weak_sauce or return;
567              
568 9         25 my $k = "$v";
569             $src->{waiting}{$k} = $v->on_ready(sub {
570 5         214 my ($f) = @_;
571 5 50       16 return unless my $src = $weak_sauce;
572              
573             # Any failed input source should propagate failure immediately
574 5 100       22 if($f->is_failed) {
575             # Clear out our waitlist, since we don't want to hold those references any more
576 2         19 delete $src->{waiting};
577 2 50       9 $src->fail($f->failure) unless $src->is_ready;
578 2         81 return;
579             }
580              
581 3         29 delete $src->{waiting}{$k};
582 3 100       5 $src->finish unless %{$src->{waiting}};
  3         20  
583 9         41 });
584 9         172 $log->tracef("Added %s which will bring our count to %d", $k, 0 + keys %{$src->{waiting}});
  9         30  
585 5         25 };
586              
587 5         17 $add->($self->_completed);
588             $self->each_while_source(sub {
589 7 50   7   14 my $src = $weak_sauce or return;
590 7         20 for ($code->($_)) {
591 7         26 my $item = $_;
592 7 100 33     49 if(Ref::Util::is_plain_arrayref($item)) {
    50          
593 3         9 $log->tracef("Have an arrayref of %d items", 0 + @$item);
594 3         18 for(@$item) {
595 9 50       11 last if $src->is_ready;
596 9         34 $src->emit($_);
597             }
598             } elsif(Scalar::Util::blessed($item) && $item->isa(__PACKAGE__)) {
599 4         19 $log->tracef("This item is a source");
600             $src->on_ready(sub {
601 2 100       61 return if $item->is_ready;
602 1         9 $log->tracef("Marking %s as ready because %s was", $item->describe, $src->describe);
603 1         19 shift->on_ready($item->_completed);
604 4         69 });
605 4         83 $add->($item->_completed);
606             $item->each_while_source(sub {
607 5 50       14 my $src = $weak_sauce or return;
608 5         14 $src->emit($_)
609             }, $src)->on_ready(sub {
610 2         66 undef $item;
611 4         45 });
612             }
613             }
614 5         80 }, $src);
615 5         24 $src
616             }
617              
618             =head2 split
619              
620             Splits the input on the given delimiter.
621              
622             By default, will split into characters.
623              
624             Note that each item will be processed separately - the buffer won't be
625             retained across items, see L</by_line> for that.
626              
627             =cut
628              
629             sub split : method {
630 0     0 1 0 my ($self, $delim) = @_;
631 0   0     0 $delim //= qr//;
632              
633 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
634 0     0   0 $self->each_while_source(sub { $src->emit($_) for split $delim, $_ }, $src);
  0         0  
635             }
636              
637             =head2 chunksize
638              
639             Splits input into fixed-size chunks.
640              
641             Note that output is always guaranteed to be a full chunk - if there is partial input
642             at the time the input stream finishes, those extra bytes will be discarded.
643              
644             =cut
645              
646             sub chunksize : method {
647 0     0 1 0 my ($self, $size) = @_;
648 0 0 0     0 die 'need positive chunk size parameter' unless $size && $size > 0;
649              
650 0         0 my $buffer = '';
651 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
652             $self->each_while_source(sub {
653 0     0   0 $buffer .= $_;
654 0         0 $src->emit(substr $buffer, 0, $size, '') while length($buffer) >= $size;
655 0         0 }, $src);
656             }
657              
658             =head2 batch
659              
660             Splits input into arrayref batches of a given size.
661              
662             Note that the last item emitted may have fewer elements (or none at all).
663              
664             $src->batch(10)
665             ->map(sub { "Next 10 (or fewer) items: @$_" })
666             ->say;
667              
668             =cut
669              
670             sub batch : method {
671 1     1 1 8 my ($self, $size) = @_;
672 1 50 33     7 die 'need positive batch parameter' unless $size && $size > 0;
673              
674 1         3 my $buffer = '';
675 1         17 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
676 1         4 my @batch;
677             $self->each_while_source(sub {
678 4     4   10 push @batch, $_;
679 4   66     23 while(@batch >= $size and my (@items) = splice @batch, 0, $size) {
680 1         4 $src->emit(\@items)
681             }
682             }, $src, cleanup => sub {
683 1 50   1   3 $src->emit([ splice @batch ]) if @batch;
684 1         11 });
685             }
686              
687             =head2 by_line
688              
689             Emits one item for each line in the input. Similar to L</split> with a C<< \n >> parameter,
690             except this will accumulate the buffer over successive items and only emit when a complete
691             line has been extracted.
692              
693             =cut
694              
695             sub by_line : method {
696 0     0 1 0 my ($self, $delim) = @_;
697 0   0     0 $delim //= $/;
698              
699 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
700 0         0 my $buffer = '';
701             $self->each_while_source(sub {
702 0     0   0 $buffer .= $_;
703 0         0 while($buffer =~ s/^(.*)\Q$delim//) {
704 0         0 $src->emit($1)
705             }
706 0         0 }, $src);
707             }
708              
709             =head2 prefix
710              
711             Applies a string prefix to each item.
712              
713             =cut
714              
715             sub prefix {
716 1     1 1 7 my ($self, $txt) = @_;
717 1         12 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
718             $self->each_while_source(sub {
719 3     3   8 $src->emit($txt . $_)
720 1         6 }, $src);
721             }
722              
723             =head2 suffix
724              
725             Applies a string suffix to each item.
726              
727             =cut
728              
729             sub suffix {
730 1     1 1 8 my ($self, $txt) = @_;
731 1         18 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
732             $self->each_while_source(sub {
733 3     3   14 $src->emit($_ . $txt)
734 1         10 }, $src);
735             }
736              
737             =head2 sprintf_methods
738              
739             Convenience method for generating a string from a L<perlfunc/sprintf>-style format
740             string and a set of method names to call.
741              
742             Note that any C<undef> items will be mapped to an empty string.
743              
744             Example:
745              
746             $src->sprintf_methods('%d has name %s', qw(id name))
747             ->say
748             ->await;
749              
750             =cut
751              
752             sub sprintf_methods {
753 0     0 1 0 my ($self, $fmt, @methods) = @_;
754 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
755             $self->each_while_source(sub {
756 0     0   0 my ($item) = @_;
757 0   0     0 $src->emit(sprintf $fmt, map $item->$_ // '', @methods)
758 0         0 }, $src);
759             }
760              
761             =head2 ignore
762              
763             Receives items, but ignores them entirely.
764              
765             Emits nothing and eventually completes when the upstream L<Ryu::Source> is done.
766              
767             Might be useful for keeping a source alive.
768              
769             =cut
770              
771             sub ignore {
772 0     0 1 0 my ($self) = @_;
773 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
774             $self->_completed->on_ready(sub {
775 0 0   0   0 shift->on_ready($src->_completed) unless $src->_completed->is_ready
776 0         0 });
777 0         0 return $src;
778             }
779              
780             =head2 buffer
781              
782             Accumulate items while any downstream sources are paused.
783              
784             Takes the following named parameters:
785              
786             =over 4
787              
788             =item * C<high> - once at least this many items are buffered, will L</pause>
789             the upstream L<Ryu::Source>.
790              
791             =item * C<low> - if the buffered count drops to this number, will L</resume>
792             the upstream L<Ryu::Source>.
793              
794             =back
795              
796             =cut
797              
798             sub buffer {
799 30     30 1 3280 my $self = shift;
800 30         53 my %args;
801 30 100       111 %args = @_ != 1
802             ? @_
803             : (
804             low => $_[0],
805             high => $_[0],
806             );
807 30   66     140 $args{low} //= $args{high};
808 30   100     146 $args{low} //= 10;
809 30   66     106 $args{high} //= $args{low};
810              
811 30         395 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
812 30         82 $src->{pause_propagation} = 0;
813 30         67 my @pending;
814             $self->_completed->on_ready(sub {
815 29 100 66 29   2426 shift->on_ready($src->_completed) unless $src->_completed->is_ready or @pending;
816 30         71 });
817 30         799 my $fc = $src->flow_control;
818 30         43 my $item_handler = do {
819 30         57 Scalar::Util::weaken(my $weak_self = $self);
820 30         53 Scalar::Util::weaken(my $weak_src = $src);
821             sub {
822 109     109   153 my $self = $weak_self;
823 109 50       223 my $src = $weak_src or return;
824 109 100 66     320 if(@pending >= $args{high} and $self and not $self->is_paused($src)) {
      100        
825 2         6 $self->pause($src);
826             }
827             $src->emit(shift @pending)
828             while @pending
829             and not($src->is_paused)
830 109   100     316 and @{$self->{children}};
  69   66     275  
831 109 100 100     366 $self->resume($src) if @pending <= $args{low} and $self->is_paused($src);
832              
833 109 100       227 return if @pending;
834              
835             # It's common to have a situation where the parent chain completes while we're
836             # paused waiting for the queue to drain. In this situation, we want to propagate
837             # completion only once the queue is empty.
838 97 100 66     209 $self->_completed->on_ready($src->_completed)
839             if $self->_completed->is_ready and not $src->_completed->is_ready;
840             }
841 30         174 };
842             $src->_completed->on_ready(sub {
843 29 50 33 29   772 $self->resume($src) if $self and $self->is_paused($src);
844 30         88 });
845 30         741 $fc->each($item_handler)->retain;
846             $self->each(my $code = sub {
847 69     69   183 push @pending, $_;
848 69         132 $item_handler->()
849 30         163 });
850             $self->_completed->on_ready(sub {
851 29     29   1394 my ($f) = @_;
852 29 100       76 return if @pending;
853 27         49 my $addr = Scalar::Util::refaddr($code);
854 27         114 my $count = List::UtilsBy::extract_by { $addr == Scalar::Util::refaddr($_) } @{$self->{on_item}};
  0         0  
  27         127  
855 27 50       391 $f->on_ready($src->_completed) unless $src->is_ready;
856 27         190 $log->tracef("->buffer completed on %s for refaddr 0x%x, removed %d on_item handlers", $self->describe, Scalar::Util::refaddr($self), $count);
857 30         96 });
858 30         730 $src;
859             }
860              
861             sub remove_handler {
862 0     0 0 0 my ($self, $code) = @_;
863 0         0 my $addr = Scalar::Util::refaddr($code);
864             my $count = List::UtilsBy::extract_by {
865 0     0   0 $addr == Scalar::Util::refaddr($_)
866 0         0 } @{$self->{on_item}};
  0         0  
867 0         0 $log->tracef(
868             "Removing handler on %s with refaddr 0x%x, matched %d total",
869             $self->describe,
870             Scalar::Util::refaddr($self),
871             $count
872             );
873 0         0 return $self;
874             }
875              
876             sub retain {
877 30     30 0 56 my ($self) = @_;
878 30         60 $self->{_self} = $self;
879             $self->_completed
880 30     29   76 ->on_ready(sub { delete $self->{_self} });
  29         908  
881 30         621 $self
882             }
883              
884             =head2 as_list
885              
886             Resolves to a list consisting of all items emitted by this source.
887              
888             =cut
889              
890             sub as_list {
891 10     10 1 509 my ($self) = @_;
892 10         18 my @data;
893             $self->each(sub {
894 52     52   153 push @data, $_
895 10         59 });
896 10     10   684 $self->_completed->transform(done => sub { @data })
897 10         38 }
898              
899             =head2 as_arrayref
900              
901             Resolves to a single arrayref consisting of all items emitted by this source.
902              
903             =cut
904              
905             sub as_arrayref {
906 2     2 1 5 my ($self) = @_;
907 2         3 my @data;
908             $self->each(sub {
909 6     6   17 push @data, $_
910 2         12 });
911 2     2   124 $self->_completed->transform(done => sub { \@data })
912 2         5 }
913              
914             =head2 as_string
915              
916             Concatenates all items into a single string.
917              
918             Returns a L<Future> which will resolve on completion.
919              
920             =cut
921              
922             sub as_string {
923 0     0 1 0 my ($self) = @_;
924 0         0 my $data = '';
925             $self->each(sub {
926 0     0   0 $data .= $_;
927 0         0 });
928 0     0   0 $self->_completed->transform(done => sub { $data })
929 0         0 }
930              
931             =head2 as_queue
932              
933             Returns a L<Future::Queue> instance which will
934             L<Future::Queue/push> items whenever the source
935             emits them.
936              
937             The queue will be marked as finished when this source is completed.
938              
939             Parameters passed to this method will be given to the L<Future::Queue>
940             constructor:
941              
942             use Future::AsyncAwait qw(:experimental(suspend));
943             my $queue = $src->as_queue(
944             max_items => 100
945             );
946             SUSPEND { print "Waiting for more items\n" }
947             while(my @batch = await $queue->shift_atmost(10)) {
948             print "Had batch of @{[ 0 + @batch ]} items\n";
949             }
950              
951             =cut
952              
953             sub as_queue {
954 0     0 1 0 my ($self, %args) = @_;
955 0         0 my $queue = Future::Queue->new(
956             prototype => $self->curry::weak::new_future,
957             %args
958             );
959              
960 0 0       0 if($args{max_items}) {
961 0         0 my $f;
962             $self->each($self->$curry::weak(sub {
963 0     0   0 my ($self) = @_;
964 0 0 0     0 unless(
965             (my $f = $queue->push($_))->is_ready
966             and not $self->is_paused
967             ) {
968 0         0 $f->on_ready(sub { $self->resume });
  0         0  
969 0         0 $self->pause;
970             }
971 0         0 return;
972 0         0 }));
973             } else {
974             # Avoid the extra overhead when we know there isn't going to be any
975             # upper limit on accepted items.
976             $self->each(sub {
977 0     0   0 $queue->push($_);
978 0         0 return;
979 0         0 });
980             }
981 0     0   0 $self->completed->on_ready(sub { $queue->finish });
  0         0  
982 0         0 return $queue;
983             }
984              
985             =head2 as_buffer
986              
987             Returns a L<Ryu::Buffer> instance, which will
988             L<Ryu::Buffer/write> any emitted items from this
989             source to the buffer as they arrive.
990              
991             Intended for stream protocol handling - individual
992             sized packets are perhaps better suited to the
993             L<Ryu::Source> per-item behaviour.
994              
995             Supports the following named parameters:
996              
997             =over 4
998              
999             =item * C<low> - low waterlevel for buffer, start accepting more bytes
1000             once the L<Ryu::Buffer> has less content than this
1001              
1002             =item * C<high> - high waterlevel for buffer, will pause the parent stream
1003             if this is reached
1004              
1005             =back
1006              
1007             The backpressure (low/high) values default to undefined, meaning
1008             no backpressure is applied: the buffer will continue to fill
1009             indefinitely.
1010              
1011             =cut
1012              
1013             sub as_buffer {
1014 1     1 1 535 my ($self, %args) = @_;
1015 1         3 my $low = delete $args{low};
1016 1         2 my $high = delete $args{high};
1017             # We're creating a source but keeping it to ourselves here
1018 1         13 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1019              
1020             my $buffer = Ryu::Buffer->new(
1021             new_future => $self->{new_future},
1022             %args,
1023             on_change => sub {
1024 2     2   3 my ($self) = @_;
1025 2 100 66     13 $src->resume if $low and $self->size <= $low;
1026             }
1027 1         10 );
1028              
1029 1         2 Scalar::Util::weaken(my $weak_sauce = $src);
1030 1         1 Scalar::Util::weaken(my $weak_buffer = $buffer);
1031             $self->each_while_source(sub {
1032 6 50   6   13 my $src = $weak_sauce or return;
1033 6 100       14 my $buf = $weak_buffer or do {
1034 1         3 $src->finish;
1035 1         4 return;
1036             };
1037 5         15 $buf->write($_);
1038 5 100 66     14 $src->pause if $high and $buf->size >= $high;
1039 5 100 66     12 $src->resume if $low and $buf->size <= $low;
1040 1         6 }, $src);
1041 1         3 return $buffer;
1042             }
1043              
1044             =head2 as_last
1045              
1046             Returns a L<Future> which resolves to the last value received.
1047              
1048             =cut
1049              
1050             sub as_last {
1051 10     10 1 26 my ($self) = @_;
1052 10         14 my $v;
1053             $self->each(sub {
1054 10     10   33 $v = $_;
1055 10         48 });
1056 10     10   721 $self->_completed->transform(done => sub { $v })
1057 10         57 }
1058              
1059             =head2 as_void
1060              
1061             Returns a L<Future> which resolves to an empty list.
1062              
1063             =cut
1064              
1065             sub as_void {
1066 0     0 1 0 my ($self) = @_;
1067 0     0   0 $self->_completed->transform(done => sub { () })
1068 0         0 }
1069              
1070             =head2 combine_latest
1071              
1072             Takes the most recent item from one or more L<Ryu::Source>s, and emits
1073             an arrayref containing the values in order.
1074              
1075             An item is emitted for each update as soon as all sources have provided
1076             at least one value. For example, given 2 sources, if the first emits C<1>
1077             then C<2>, then the second emits C<a>, this would emit a single C<< [2, 'a'] >>
1078             item.
1079              
1080             =cut
1081              
1082             sub combine_latest : method {
1083 1     1 1 11 my ($self, @sources) = @_;
1084 1 50   0   4 push @sources, sub { @_ } if Scalar::Util::blessed $sources[-1];
  0         0  
1085 1         2 my $code = pop @sources;
1086              
1087 1         13 my $combined = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1088 1 50       5 unshift @sources, $self if ref $self;
1089 1         2 my @value;
1090             my %seen;
1091 1         3 for my $idx (0..$#sources) {
1092 2         3 my $src = $sources[$idx];
1093             $src->each_while_source(sub {
1094 5     5   6 $value[$idx] = $_;
1095 5   100     16 $seen{$idx} ||= 1;
1096 5 100       10 $combined->emit([ $code->(@value) ]) if @sources == keys %seen;
1097 2         10 }, $combined);
1098             }
1099             Future->needs_any(
1100             map $_->completed, @sources
1101             )->on_ready(sub {
1102 0     0   0 @value = ();
1103 0 0       0 return if $combined->_completed->is_ready;
1104 0         0 shift->on_ready($combined->_completed)
1105 1         7 })->retain;
1106 1         193 $combined
1107             }
1108              
1109             =head2 with_index
1110              
1111             Emits arrayrefs consisting of C<< [ $item, $idx ] >>.
1112              
1113             =cut
1114              
1115             sub with_index {
1116 1     1 1 10 my ($self) = @_;
1117 1         15 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1118 1         3 my $idx = 0;
1119             $self->each_while_source(sub {
1120 3     3   10 $src->emit([ $_, $idx++ ])
1121 1         9 }, $src);
1122             }
1123              
1124             =head2 with_latest_from
1125              
1126             Similar to L</combine_latest>, but will start emitting as soon as
1127             we have any values. The arrayref will contain C<< undef >> for any
1128             sources which have not yet emitted any items.
1129              
1130             =cut
1131              
1132             sub with_latest_from : method {
1133 0     0 1 0 my ($self, @sources) = @_;
1134 0 0   0   0 push @sources, sub { @_ } if Scalar::Util::blessed $sources[-1];
  0         0  
1135 0         0 my $code = pop @sources;
1136              
1137 0         0 my $combined = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1138 0         0 my @value;
1139             my %seen;
1140 0         0 for my $idx (0..$#sources) {
1141 0         0 my $src = $sources[$idx];
1142             $src->each(sub {
1143 0 0   0   0 return if $combined->_completed->is_ready;
1144 0         0 $value[$idx] = $_;
1145 0   0     0 $seen{$idx} ||= 1;
1146 0         0 });
1147             }
1148             $self->each(sub {
1149 0 0   0   0 $combined->emit([ $code->(@value) ]) if keys %seen;
1150 0         0 });
1151 0         0 $self->_completed->on_ready($combined->_completed);
1152             $self->_completed->on_ready(sub {
1153 0     0   0 @value = ();
1154 0 0       0 return if $combined->is_ready;
1155 0         0 shift->on_ready($combined->_completed);
1156 0         0 });
1157 0         0 $combined
1158             }
1159              
1160             =head2 merge
1161              
1162             Emits items as they are generated by the given sources.
1163              
1164             Example:
1165              
1166             $numbers->merge($letters)->say # 1, 'a', 2, 'b', 3, 'c'...
1167              
1168             =cut
1169              
1170             sub merge : method {
1171 4     4 1 21 my ($self, @sources) = @_;
1172              
1173 4         34 my $combined = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1174 4 50       16 unshift @sources, $self if ref $self;
1175 4         8 for my $src (@sources) {
1176             $src->each(sub {
1177 16 50   16   21 return if $combined->_completed->is_ready;
1178 16         55 $combined->emit($_)
1179 5         37 });
1180             }
1181             Future->needs_all(
1182             map $_->completed, @sources
1183             )->on_ready($combined->_completed)
1184 3     3   78 ->on_ready(sub { @sources = () })
1185 4         21 ->retain;
1186 4         172 $combined
1187             }
1188              
1189             =head2 emit_from
1190              
1191             Emits items as they are generated by the given sources.
1192              
1193             Example:
1194              
1195             my $src = Ryu::Source->new;
1196             $src->say;
1197             $src->emit_from(
1198             $numbers,
1199             $letters
1200             );
1201              
1202             =cut
1203              
1204             sub emit_from : method {
1205 1     1 1 8 my ($self, @sources) = @_;
1206              
1207 1         4 for my $src (@sources) {
1208             $src->each_while_source(sub {
1209 5 50   5   12 return if $self->_completed->is_ready;
1210 5         32 $self->emit($_)
1211 2         14 }, $self);
1212             }
1213             $self
1214 1         4 }
1215              
1216             =head2 apply
1217              
1218             Used for setting up multiple streams.
1219              
1220             Accepts a variable number of coderefs, will call each one and gather L<Ryu::Source>
1221             results.
1222              
1223             =cut
1224              
1225             sub apply : method {
1226 0     0 1 0 my ($self, @code) = @_;
1227              
1228 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1229 0         0 my @pending;
1230 0         0 for my $code (@code) {
1231 0         0 push @pending, map $code->($_), $self;
1232             }
1233             Future->needs_all(
1234 0         0 map $_->completed, @pending
1235             )->on_ready($src->_completed)
1236             ->retain;
1237             # Pass through the original events
1238             $self->each_while_source(sub {
1239 0     0   0 $src->emit($_)
1240 0         0 }, $src)
1241             }
1242              
1243             =head2 switch_str
1244              
1245             Given a condition, will select one of the alternatives based on stringified result.
1246              
1247             Example:
1248              
1249             $src->switch_str(
1250             sub { $_->name }, # our condition
1251             smith => sub { $_->id }, # if this matches the condition, the code will be called with $_ set to the current item
1252             jones => sub { $_->parent->id },
1253             sub { undef } # and this is our default case
1254             );
1255              
1256             =cut
1257              
1258             sub switch_str {
1259 1     1 1 22 my ($self, $condition, @args) = @_;
1260              
1261 1         16 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1262 1         3 my @active;
1263             $self->_completed->on_ready(sub {
1264             Future->needs_all(
1265             grep $_, @active
1266             )->on_ready(sub {
1267 0         0 $src->finish
1268 0     0   0 })->retain
1269 1         5 });
1270              
1271             $self->each_while_source(sub {
1272 3     3   7 my ($item) = $_;
1273 3         10 my $rslt = $condition->($item);
1274             (Scalar::Util::blessed($rslt) && $rslt->isa('Future') ? $rslt : Future->done($rslt))->on_done(sub {
1275 3         138 my ($data) = @_;
1276 3         10 my @copy = @args;
1277 3         13 while(my ($k, $v) = splice @copy, 0, 2) {
1278 6 100       25 if(!defined $v) {
    100          
1279             # Only a single value (or undef)? That's our default, just use it as-is
1280 1         5 return $src->emit(map $k->($_), $item)
1281             } elsif($k eq $data) {
1282             # Key matches our result? Call code with the original item
1283 2         10 return $src->emit(map $v->($_), $item)
1284             }
1285             }
1286 3 50 33     28 })->retain
1287 1         30 }, $src)
1288             }
1289              
1290             =head2 ordered_futures
1291              
1292             Given a stream of L<Future>s, will emit the results as each L<Future>
1293             is marked ready.
1294              
1295             If any L<Future> in the stream fails, that will mark this source as failed,
1296             and all remaining L<Future> instances will be cancelled. To avoid this behaviour
1297             and leave the L<Future> instances active, use:
1298              
1299             $src->map('without_cancel')
1300             ->ordered_futures
1301              
1302             See L<Future/without_cancel> for more details.
1303              
1304             Takes the following named parameters:
1305              
1306             =over 4
1307              
1308             =item * C<high> - once at least this many unresolved L<Future> instances are pending,
1309             will L</pause> the upstream L<Ryu::Source>.
1310              
1311             =item * C<low> - if the pending count drops to this number, will L</resume>
1312             the upstream L<Ryu::Source>.
1313              
1314             =back
1315              
1316             This method is also available as L</resolve>.
1317              
1318             =cut
1319              
1320             sub ordered_futures {
1321 5     5 1 38 my ($self, %args) = @_;
1322 5         13 my $high = delete $args{high};
1323 5   66     62 my $low = (delete $args{low}) // $high // 0;
      100        
1324 5         73 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1325 5         16 my %pending;
1326 5         16 my $src_completed = $src->_completed;
1327              
1328 5         9 my $all_finished;
1329             $self->_completed->on_ready(sub {
1330 5     5   190 $all_finished = shift;
1331 5 100 66     27 $all_finished->on_ready($src_completed) unless %pending or $src_completed->is_ready;
1332 5         16 });
1333              
1334             $src_completed->on_ready(sub {
1335 4     4   129 my @pending = values %pending;
1336 4         9 %pending = ();
1337 4         13 for(@pending) {
1338 3 100 66     38 $_->cancel if $_ and not $_->is_ready;
1339             }
1340 5         134 });
1341 5         110 my $paused = 0;
1342             $self->each(sub {
1343 13     13   23 my $f = $_;
1344 13         24 my $k = Scalar::Util::refaddr $f;
1345             # This will keep a copy of the Future around until the
1346             # ->is_ready callback removes it
1347 13         64 $pending{$k} = $f;
1348 13         60 $log->tracef('Ordered futures has %d pending', 0 + keys %pending);
1349 13 100 66     200 if(!$paused and $high and keys(%pending) >= $high) {
      100        
1350 1         9 $src->pause;
1351 1         2 ++$paused;
1352             }
1353             $f->on_done(sub {
1354 9         2282 my @pending = @_;
1355 9   66     44 while(@pending and not $src_completed->is_ready) {
1356 4         34 $src->emit(shift @pending);
1357             }
1358             })
1359 1 50       725 ->on_fail(sub { $src->fail(@_) unless $src_completed->is_ready; })
1360             ->on_ready(sub {
1361 12         408 delete $pending{$k};
1362 12 100 100     47 if($paused and keys(%pending) <= $low) {
1363 1         10 $src->resume;
1364 1         1 --$paused;
1365             }
1366 12         60 $log->tracef('Ordered futures now has %d pending after completion, upstream finish status is %d', 0 + keys(%pending), $all_finished);
1367 12 100       157 return if %pending;
1368 6 100 100     28 $all_finished->on_ready($src_completed) if $all_finished and not $src_completed->is_ready;
1369             })
1370 5         38 });
  13         136  
1371 5         24 return $src;
1372             }
1373              
1374             =head2 resolve
1375              
1376             A synonym for L</ordered_futures>.
1377              
1378             =cut
1379              
1380             *resolve = *ordered_futures;
1381              
1382             =head2 concurrent
1383              
1384             =cut
1385              
1386             sub concurrent {
1387 0     0 1 0 my ($self) = @_;
1388 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1389             $self->each_while_source(sub {
1390 0     0   0 $_->on_done($src->curry::weak::emit)
1391             ->on_fail($src->curry::weak::fail)
1392             ->retain
1393 0         0 }, $src);
1394             }
1395              
1396             =head2 distinct
1397              
1398             Emits new distinct items, using string equality with an exception for
1399             C<undef> (i.e. C<undef> is treated differently from empty string or 0).
1400              
1401             Given 1,2,3,undef,2,3,undef,'2',2,4,1,5, you'd expect to get the sequence 1,2,3,undef,4,5.
1402              
1403             =cut
1404              
1405             sub distinct {
1406 1     1 1 8 my $self = shift;
1407              
1408 1         16 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1409 1         5 my %seen;
1410             my $undef;
1411             $self->each_while_source(sub {
1412 22 100   22   42 if(defined) {
1413 17 100       86 $src->emit($_) unless $seen{$_}++;
1414             } else {
1415 5 100       19 $src->emit($_) unless $undef++;
1416             }
1417 1         10 }, $src);
1418             }
1419              
1420             =head2 distinct_until_changed
1421              
1422             Removes contiguous duplicates, defined by string equality.
1423              
1424             =cut
1425              
1426             sub distinct_until_changed {
1427 1     1 1 10 my $self = shift;
1428              
1429 1         16 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1430 1         3 my $active;
1431             my $prev;
1432             $self->each_while_source(sub {
1433 18 100   18   35 if($active) {
1434 17 100       43 if(defined($prev) ^ defined($_)) {
    100          
1435 10         21 $src->emit($_)
1436             } elsif(defined($_)) {
1437 5 100       18 $src->emit($_) if $prev ne $_;
1438             }
1439             } else {
1440 1         3 $active = 1;
1441 1         5 $src->emit($_);
1442             }
1443 18         38 $prev = $_;
1444 1         11 }, $src);
1445 1         9 $src
1446             }
1447              
1448             =head2 sort_by
1449              
1450             Emits items sorted by the given key. This is a stable sort function.
1451              
1452             The algorithm is taken from L<List::UtilsBy>.
1453              
1454             =cut
1455              
1456             sub sort_by {
1457 43     43   958417 use sort qw(stable);
  43         179  
  43         412  
1458 0     0 1 0 my ($self, $code) = @_;
1459 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1460 0         0 my @items;
1461             my @keys;
1462       0     $self->_completed->on_done(sub {
1463             })->on_ready(sub {
1464 0 0   0   0 return if $src->is_ready;
1465 0         0 shift->on_ready($src->_completed);
1466 0         0 });
1467             $self->each_while_source(sub {
1468 0     0   0 push @items, $_;
1469 0         0 push @keys, $_->$code;
1470             }, $src, cleanup => sub {
1471 0     0   0 my ($f) = @_;
1472 0 0       0 return unless $f->is_done;
1473 0         0 $src->emit($_) for @items[sort { $keys[$a] cmp $keys[$b] } 0 .. $#items];
  0         0  
1474 0         0 });
1475             }
1476              
1477             =head2 nsort_by
1478              
1479             Emits items numerically sorted by the given key. This is a stable sort function.
1480              
1481             See L</sort_by>.
1482              
1483             =cut
1484              
1485             sub nsort_by {
1486 43     43   34926 use sort qw(stable);
  43         127  
  43         212  
1487 0     0 1 0 my ($self, $code) = @_;
1488 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1489 0         0 my @items;
1490             my @keys;
1491             $self->each_while_source(sub {
1492 0     0   0 push @items, $_;
1493 0         0 push @keys, $_->$code;
1494             }, $src, cleanup => sub {
1495 0 0   0   0 return unless shift->is_done;
1496 0         0 $src->emit($_) for @items[sort { $keys[$a] <=> $keys[$b] } 0 .. $#items];
  0         0  
1497 0         0 });
1498             }
1499              
1500             =head2 rev_sort_by
1501              
1502             Emits items sorted by the given key in reverse (descending) order. This is a stable sort function.
1503              
1504             The algorithm is taken from L<List::UtilsBy>.
1505              
1506             =cut
1507              
1508             sub rev_sort_by {
1509 43     43   19886 use sort qw(stable);
  43         101  
  43         195  
1510 0     0 1 0 my ($self, $code) = @_;
1511 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1512 0         0 my @items;
1513             my @keys;
1514             $self->each_while_source(sub {
1515 0     0   0 push @items, $_;
1516 0         0 push @keys, $_->$code;
1517             }, $src, cleanup => sub {
1518 0 0   0   0 return unless shift->is_done;
1519 0         0 $src->emit($_) for @items[sort { $keys[$b] cmp $keys[$a] } 0 .. $#items];
  0         0  
1520 0         0 });
1521             }
1522              
1523             =head2 rev_nsort_by
1524              
1525             Emits items numerically sorted by the given key in reverse (descending) order. This is a stable sort function.
1526              
1527             See L</sort_by>.
1528              
1529             =cut
1530              
1531             sub rev_nsort_by {
1532 0     0 1 0 my ($self, $code) = @_;
1533 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1534 0         0 my @items;
1535             my @keys;
1536             $self->each_while_source(sub {
1537 0     0   0 push @items, $_;
1538 0         0 push @keys, $_->$code;
1539             }, $src, cleanup => sub {
1540 0 0   0   0 return unless shift->is_done;
1541 0         0 $src->emit($_) for @items[sort { $keys[$b] <=> $keys[$a] } 0 .. $#items];
  0         0  
1542 0         0 });
1543             }
1544              
1545             =head2 extract_all
1546              
1547             Expects a regular expression and emits hashrefs containing
1548             the named capture buffers.
1549              
1550             The regular expression will be applied using the m//gc operator.
1551              
1552             Example:
1553              
1554             $src->extract_all(qr{/(?<component>[^/]+)})
1555             # emits { component => '...' }, { component => '...' }
1556              
1557             =cut
1558              
1559             sub extract_all {
1560 1     1 1 13 my ($self, $pattern) = @_;
1561 1         17 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1562             $self->each_while_source(sub {
1563 3     3   80 $src->emit(+{ %+ }) while m/$pattern/gc;
1564 1         9 }, $src);
1565             }
1566              
1567             =head2 skip
1568              
1569             Skips the first N items.
1570              
1571             =cut
1572              
1573             sub skip {
1574 1     1 1 11 my ($self, $count) = @_;
1575 1   50     3 $count //= 0;
1576              
1577 1         17 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1578             $self->_completed->on_ready(sub {
1579 1 50   1   31 return if $src->is_ready;
1580 1         9 shift->on_ready($src->_completed);
1581 1         6 });
1582             $self->each(sub {
1583 5 100   5   21 $src->emit($_) unless $count-- > 0;
1584 1         30 });
1585 1         7 $src
1586             }
1587              
1588             =head2 skip_last
1589              
1590             Skips the last N items.
1591              
1592             =cut
1593              
1594             sub skip_last {
1595 1     1 1 6 my ($self, $count) = @_;
1596 1   50     3 $count //= 0;
1597              
1598 1         12 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1599             $self->_completed->on_ready(sub {
1600 1 50   1   17 return if $src->is_ready;
1601 1         6 shift->on_ready($src->_completed);
1602 1         4 });
1603 1         14 my @pending;
1604             $self->each(sub {
1605 5     5   5 push @pending, $_;
1606 5 100       15 $src->emit(shift @pending) if @pending > $count;
1607 1         6 });
1608 1         6 $src
1609             }
1610              
1611             =head2 skip_until
1612              
1613             Skips the items that arrive before a given condition is reached.
1614              
1615             =over 4
1616              
1617             =item * Either a L<Future> instance (we skip all items until it's marked as `done`), or a coderef,
1618             which we call for each item until it first returns true
1619              
1620             =back
1621              
1622             =cut
1623              
1624             sub skip_until {
1625 2     2 1 37 my ($self, $condition) = @_;
1626              
1627 2         34 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1628 2         8 $self->each_while_source(do {
1629 2 100 33     29 if(ref($condition) eq 'CODE') {
    50          
1630 1         3 my $reached = 0;
1631 5 100 100 5   23 sub { return $src->emit($_) if $reached ||= $condition->($_); }
1632 1         9 } elsif(Scalar::Util::blessed($condition) && $condition->isa('Future')) {
1633             $condition->on_ready($src->$curry::weak(sub {
1634 1     1   78 my ($src, $cond) = @_;
1635 1 50       6 return if $src->is_ready;
1636 1 50       13 $src->fail($cond->failure) if $cond->is_failed;
1637 1 50       12 $src->cancel if $cond->is_cancelled
1638 1         9 }));
1639 4 100   4   15 sub { $src->emit($_) if $condition->is_done; }
1640 1         71 } else {
1641 0         0 die 'unknown type for condition: ' . $condition;
1642             }
1643             }, $src);
1644             }
1645              
1646             =head2 take_until
1647              
1648             Passes through items that arrive until a given condition is reached.
1649              
1650             Expects a single parameter, which can be one of the following:
1651              
1652             =over 4
1653              
1654             =item * a L<Future> instance - we will pass through items until it's marked as C<done>
1655              
1656             =item * a coderef, which we call for each item until it first returns true
1657              
1658             =item * or a L<Ryu::Source>, in which case we stop when that first emits a value
1659              
1660             =back
1661              
1662             =cut
1663              
1664             sub take_until {
1665 0     0 1 0 my ($self, $condition) = @_;
1666              
1667 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1668 0 0 0     0 if(Scalar::Util::blessed($condition) && $condition->isa('Ryu::Source')) {
1669             $condition->_completed->on_ready(sub {
1670 0     0   0 $log->warnf('Condition completed: %s and %s', $condition->describe, $src->describe);
1671 0 0       0 return if $src->is_ready;
1672 0         0 $log->warnf('Mark as ready');
1673 0         0 shift->on_ready($src->_completed);
1674 0         0 });
1675             $condition->first->each(sub {
1676 0 0   0   0 $src->finish unless $src->is_ready
1677 0         0 });
1678 0         0 return $self->each_while_source($src->curry::emit, $src);
1679             } else {
1680 0         0 return $self->each_while_source(do {
1681 0 0 0     0 if(ref($condition) eq 'CODE') {
    0          
1682 0         0 my $reached = 0;
1683 0 0 0 0   0 sub { return $src->emit($_) unless $reached ||= $condition->($_); }
1684 0         0 } elsif(Scalar::Util::blessed($condition) && $condition->isa('Future')) {
1685             $condition->on_ready($src->$curry::weak(sub {
1686 0     0   0 my ($src, $cond) = @_;
1687 0 0       0 return if $src->is_ready;
1688 0 0       0 $src->fail($cond->failure) if $cond->is_failed;
1689 0 0       0 $src->cancel if $cond->is_cancelled
1690 0         0 }));
1691 0 0   0   0 sub { $src->emit($_) unless $condition->is_done; }
1692 0         0 } else {
1693 0         0 die 'unknown type for condition: ' . $condition;
1694             }
1695             }, $src);
1696             }
1697             }
1698              
1699             =head2 take
1700              
1701             Takes a limited number of items.
1702              
1703             Given a sequence of C< 1,2,3,4,5 > and C<< ->take(3) >>, you'd get 1,2,3 and then the stream
1704             would finish.
1705              
1706             =cut
1707              
1708             sub take {
1709 2     2 1 15 my ($self, $count) = @_;
1710 2   50     33 $count //= 0;
1711 2 50       9 return $self->empty unless $count > 0;
1712              
1713 2         56 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1714             $self->each_while_source(sub {
1715 5     5   26 $log->tracef("Still alive with %d remaining", $count);
1716 5         58 $src->emit($_);
1717 5 100       16 return if --$count;
1718 2         10 $log->tracef("Count is zero, finishing");
1719 2         20 $src->finish
1720 2         16 }, $src);
1721             }
1722              
1723             =head2 first
1724              
1725             Returns a source which provides the first item from the stream.
1726              
1727             =cut
1728              
1729             sub first {
1730 0     0 1 0 my ($self) = @_;
1731              
1732 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1733             $self->each_while_source(sub {
1734 0     0   0 $src->emit($_);
1735 0         0 $src->finish
1736 0         0 }, $src);
1737             }
1738              
1739             =head2 some
1740              
1741             Applies the given code to each item, and emits a single item:
1742              
1743             =over 4
1744              
1745             =item * 0 if the code never returned true or no items were received
1746              
1747             =item * 1 if the code ever returned a true value
1748              
1749             =back
1750              
1751             =cut
1752              
1753             sub some {
1754 1     1 1 13 my ($self, $code) = @_;
1755              
1756 1         16 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1757             $self->_completed->on_ready(sub {
1758 1     1   27 my $sf = $src->_completed;
1759 1 50       5 return if $sf->is_ready;
1760 0         0 my $f = shift;
1761 0 0       0 return $f->on_ready($sf) unless $f->is_done;
1762 0         0 $src->emit(0);
1763 0         0 $sf->done;
1764 1         5 });
1765             $self->each(sub {
1766 4 50   4   11 return if $src->_completed->is_ready;
1767 4 100       26 return unless $code->($_);
1768 1         10 $src->emit(1);
1769 1         4 $src->_completed->done
1770 1         31 });
1771 1         7 $src
1772             }
1773              
1774             =head2 every
1775              
1776             Similar to L</some>, except this requires the coderef to return true for
1777             all values in order to emit a C<1> value.
1778              
1779             =cut
1780              
1781             sub every {
1782 1     1 1 32 my ($self, $code) = @_;
1783              
1784 1         19 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1785             $self->_completed->on_done(sub {
1786 1 50   1   31 return if $src->_completed->is_ready;
1787 1         13 $src->emit(1);
1788 1         3 $src->_completed->done
1789 1         5 });
1790             $self->each(sub {
1791 5 50   5   13 return if $src->_completed->is_ready;
1792 5 50       29 return if $code->($_);
1793 0         0 $src->emit(0);
1794 0         0 $src->_completed->done
1795 1         35 });
1796 1         7 $src
1797             }
1798              
1799             =head2 count
1800              
1801             Emits the count of items seen once the parent source completes.
1802              
1803             =cut
1804              
1805             sub count {
1806 12     12 1 6339 my ($self) = @_;
1807              
1808 12         25 my $count = 0;
1809              
1810 12         177 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1811 27     27   67 $self->each_while_source(sub { ++$count }, $src, cleanup => sub {
1812 12 50   12   55 return unless shift->is_done;
1813 12         140 $src->emit($count)
1814 12         98 });
1815             }
1816              
1817             =head2 sum
1818              
1819             Emits the numeric sum of items seen once the parent completes.
1820              
1821             =cut
1822              
1823             sub sum {
1824 1     1 1 10 my ($self) = @_;
1825              
1826 1         2 my $sum = 0;
1827              
1828 1         17 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1829             $self->each_while_source(sub {
1830 5     5   14 $sum += $_
1831             }, $src, cleanup => sub {
1832 1 50   1   8 return unless shift->is_done;
1833 1         13 $src->emit($sum)
1834 1         13 });
1835             }
1836              
1837             =head2 mean
1838              
1839             Emits the mean (average) numerical value of all seen items.
1840              
1841             =cut
1842              
1843             sub mean {
1844 1     1 1 9 my ($self) = @_;
1845              
1846 1         4 my $sum = 0;
1847 1         2 my $count = 0;
1848              
1849 1         18 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1850 1     4   13 $self->each(sub { ++$count; $sum += $_ });
  4         6  
  4         11  
1851 1   50 1   54 $self->_completed->on_done(sub { $src->emit($sum / ($count || 1)) })
1852 1         4 ->on_ready($src->_completed);
1853 1         34 $src
1854             }
1855              
1856             =head2 max
1857              
1858             Emits the maximum numerical value of all seen items.
1859              
1860             =cut
1861              
1862             sub max {
1863 1     1 1 10 my ($self) = @_;
1864              
1865 1         15 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1866 1         3 my $max;
1867             $self->each(sub {
1868 8 100 100 8   35 return if defined $max and $max > $_;
1869 2         6 $max = $_;
1870 1         8 });
1871 1     1   25 $self->_completed->on_done(sub { $src->emit($max) })
1872 1         4 ->on_ready($src->_completed);
1873 1         28 $src
1874             }
1875              
1876             =head2 min
1877              
1878             Emits the minimum numerical value of all seen items.
1879              
1880             =cut
1881              
1882             sub min {
1883 1     1 1 9 my ($self) = @_;
1884              
1885 1         16 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1886 1         3 my $min;
1887             $self->each(sub {
1888 8 100 100 8   28 return if defined $min and $min < $_;
1889 4         9 $min = $_;
1890 1         9 });
1891 1     1   25 $self->_completed->on_done(sub { $src->emit($min) })
1892 1         3 ->on_ready($src->_completed);
1893 1         25 $src
1894             }
1895              
1896             =head2 statistics
1897              
1898             Emits a single hashref of statistics once the source completes.
1899              
1900             This will contain the following keys:
1901              
1902             =over 4
1903              
1904             =item * count
1905              
1906             =item * sum
1907              
1908             =item * min
1909              
1910             =item * max
1911              
1912             =item * mean
1913              
1914             =back
1915              
1916             =cut
1917              
1918             sub statistics {
1919 1     1 1 7 my ($self) = @_;
1920              
1921 1         2 my $sum = 0;
1922 1         1 my $count = 0;
1923 1         2 my $min;
1924             my $max;
1925              
1926 1         12 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1927             $self->each(sub {
1928 9   66 9   15 $min //= $_;
1929 9   66     11 $max //= $_;
1930 9 50       10 $min = $_ if $_ < $min;
1931 9 100       13 $max = $_ if $_ > $max;
1932 9         9 ++$count;
1933 9         12 $sum += $_
1934 1         8 });
1935             $self->_completed->on_done(sub {
1936 1   50 1   21 $src->emit({
1937             count => $count,
1938             sum => $sum,
1939             min => $min,
1940             max => $max,
1941             mean => ($sum / ($count || 1))
1942             })
1943             })
1944 1         2 ->on_ready($src->_completed);
1945 1         18 $src
1946             }
1947              
1948             =head2 filter
1949              
1950             Applies the given parameter to filter values.
1951              
1952             The parameter can be a regex or coderef. You can also
1953             pass (key, value) pairs to filter hashrefs or objects
1954             based on regex or coderef values.
1955              
1956             Examples:
1957              
1958             $src->filter(name => qr/^[A-Z]/, id => sub { $_ % 2 })
1959              
1960             =cut
1961              
1962             sub filter {
1963 12     12 1 3851 my $self = shift;
1964              
1965 12         217 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1966             $self->each_while_source((@_ > 1) ? do {
1967 10         43 my %args = @_;
1968             my $check = sub {
1969 51     51   97 my ($k, $v) = @_;
1970 51 100       120 if(my $ref = ref $args{$k}) {
1971 37 100       95 if($ref eq 'Regexp') {
    100          
    50          
1972 15 100 100     127 return 0 unless defined($v) && $v =~ $args{$k};
1973             } elsif($ref eq 'ARRAY') {
1974 12 100 100     49 return 0 unless defined($v) && List::Util::any { $v eq $_ } @{$args{$k}};
  20         62  
  8         26  
1975             } elsif($ref eq 'CODE') {
1976 10         33 return 0 for grep !$args{$k}->($_), $v;
1977             } else {
1978 0         0 die "Unsure what to do with $args{$k} which seems to be a $ref";
1979             }
1980             } else {
1981 14 100       42 return !defined($args{$k}) if !defined($v);
1982 13   66     79 return defined($args{$k}) && $v eq $args{$k};
1983             }
1984 18         8755 return 1;
1985 10         61 };
1986             sub {
1987 51     51   72 my $item = shift;
1988 51 100       139 if(Scalar::Util::blessed $item) {
    50          
1989 15         25 for my $k (keys %args) {
1990 15         43 my $v = $item->$k;
1991 15 100       54 return unless $check->($k, $v);
1992             }
1993             } elsif(my $ref = ref $item) {
1994 36 50       69 if($ref eq 'HASH') {
1995 36         76 for my $k (keys %args) {
1996 36         69 my $v = $item->{$k};
1997 36 100       70 return unless $check->($k, $v);
1998             }
1999             } else {
2000 0         0 die 'not a ref we know how to handle: ' . $ref;
2001             }
2002             } else {
2003 0         0 die 'not a ref, not sure what to do now';
2004             }
2005 21         66 $src->emit($item);
2006             }
2007 12 100       62 } : do {
  10         83  
2008 2         4 my $code = shift;
2009 2 50       10 if(my $ref = ref($code)) {
2010 2 50       14 if($ref eq 'Regexp') {
    50          
2011 0         0 my $re = $code;
2012 0     0   0 $code = sub { /$re/ };
  0         0  
2013             } elsif($ref eq 'CODE') {
2014             # use as-is
2015             } else {
2016 0         0 die "not sure how to handle $ref";
2017             }
2018             }
2019             sub {
2020 4     4   7 my $item = shift;
2021 4 100       13 $src->emit($item) if $code->($item);
2022             }
2023 2         18 }, $src);
2024             }
2025              
2026             =head2 filter_isa
2027              
2028             Emits only the items which C<< ->isa >> one of the given parameters.
2029             Will skip non-blessed items.
2030              
2031             =cut
2032              
2033             sub filter_isa {
2034 0     0 1 0 my ($self, @isa) = @_;
2035              
2036 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
2037             $self->each_while_source(sub {
2038 0     0   0 my ($item) = @_;
2039 0 0       0 return unless Scalar::Util::blessed $item;
2040 0 0       0 $src->emit($_) if grep $item->isa($_), @isa;
2041 0         0 }, $src);
2042             }
2043              
2044             =head2 emit
2045              
2046             Emits the given item.
2047              
2048             =cut
2049              
2050             sub emit {
2051 597     597 1 55216 my $self = shift;
2052 597         1422 my $completion = $self->_completed;
2053             my @handlers = $self->{on_item}->@*
2054 597 100       1789 or return $self;
2055 566         1046 for (@_) {
2056 598 50       5451 die 'already completed' if $completion->is_ready;
2057 598         2896 for my $code (@handlers) {
2058             try {
2059             $code->($_);
2060 632         3938 } catch {
2061             my $ex = $@;
2062             $log->warnf("Exception raised in %s - %s", (eval { $self->describe } // ""), "$ex");
2063             $completion->fail($ex, source => 'exception in on_item callback');
2064             die $ex;
2065             }
2066             }
2067             }
2068             $self
2069 566         5718 }
2070              
2071             =head2 emit_batch
2072              
2073             =cut
2074              
2075             sub emit_batch {
2076 0     0 1 0 my $self = shift;
2077 0         0 my $completion = $self->_completed;
2078 0 0       0 if(my @handlers = $self->{on_batch}->@*) {
2079 0         0 for (@_) {
2080 0 0       0 die 'already completed' if $completion->is_ready;
2081 0         0 for my $code (@handlers) {
2082             try {
2083             $code->($_);
2084 0         0 } catch {
2085             my $ex = $@;
2086             $log->warnf("Exception raised in %s - %s", (eval { $self->describe } // ""), "$ex");
2087             $completion->fail($ex, source => 'exception in on_batch callback');
2088             die $ex;
2089             }
2090             }
2091             }
2092             }
2093              
2094             # Support item-at-a-time callbacks if we have any
2095 0 0       0 return $self unless $self->{on_item}->@*;
2096 0         0 for my $batch (@_) {
2097 0         0 $self->emit($_) for $batch->@*;
2098             }
2099 0         0 return $self;
2100             }
2101              
2102             =head2 each
2103              
2104             =cut
2105              
2106             sub each : method {
2107 219     219 1 610 my ($self, $code, %args) = @_;
2108 219         327 push @{$self->{on_item}}, $code;
  219         498  
2109 219         501 $self;
2110             }
2111              
2112             =head2 each_batch
2113              
2114             =cut
2115              
2116             sub each_batch : method {
2117 0     0 1 0 my ($self, $code, %args) = @_;
2118 0         0 push @{$self->{on_batch}}, $code;
  0         0  
2119 0         0 $self;
2120             }
2121              
2122             =head2 each_as_source
2123              
2124             =cut
2125              
2126             sub each_as_source : method {
2127 0     0 1 0 my ($self, @code) = @_;
2128              
2129 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
2130 0         0 my @active;
2131             $self->_completed->on_ready(sub {
2132             Future->needs_all(
2133             grep $_, @active
2134             )->on_ready(sub {
2135 0         0 $src->finish
2136 0     0   0 })->retain
2137 0         0 });
2138              
2139             $self->each_while_source(sub {
2140 0     0   0 my @pending;
2141 0         0 for my $code (@code) {
2142 0         0 push @pending, $code->($_);
2143             }
2144 0         0 push @active, map $_->completed, @pending;
2145 0         0 $src->emit($_);
2146 0         0 }, $src)
2147             }
2148              
2149             sub cleanup {
2150 184     184 0 12655 my ($self) = @_;
2151 184         686 $log->tracef("Cleanup for %s (f = %s)", $self->describe, 0 + $self->_completed);
2152 184 100       1538 $_->cancel for values %{$self->{cancel_on_ready} || {}};
  184         1177  
2153 184 100       640 $self->parent->notify_child_completion($self) if $self->parent;
2154 184         1029 splice $self->{on_item}->@*;
2155 184         465 delete @{$self->{cancel_on_ready}}{keys %{$self->{cancel_on_ready}}};
  184         376  
  184         629  
2156 184         504 $log->tracef("Finished cleanup for %s", $self->describe);
2157             }
2158              
2159             sub notify_child_completion {
2160 78     78 0 203 my ($self, $child) = @_;
2161 78         165 my $addr = Scalar::Util::refaddr($child);
2162 78 50   88   383 if(List::UtilsBy::extract_by { $addr == Scalar::Util::refaddr($_) } @{$self->{children}}) {
  88         1089  
  78         474  
2163             $log->tracef(
2164             "Removed completed child %s, have %d left",
2165             $child->describe,
2166 78         1056 0 + @{$self->{children}}
  78         760  
2167             );
2168 78 100       791 return $self if $self->is_ready;
2169 7 50       48 return $self if @{$self->{children}};
  7         29  
2170              
2171 7         35 $log->tracef(
2172             "This was the last child, cancelling %s",
2173             $self->describe
2174             );
2175 7         154 $self->cancel;
2176 7         144 return $self;
2177             }
2178              
2179 0         0 $log->warnf("Child %s (addr 0x%x) not found in list for %s", $child->describe, $addr, $self->describe);
2180 0         0 $log->tracef("* %s (addr 0x%x)", $_->describe, Scalar::Util::refaddr($_)) for @{$self->{children}};
  0         0  
2181 0         0 $self
2182             }
2183              
2184             =head2 await
2185              
2186             Block until this source finishes.
2187              
2188             =cut
2189              
2190             sub await {
2191 10     10 1 52 my ($self) = @_;
2192 10         31 $self->prepare_await;
2193 10         26 my $f = $self->_completed;
2194 10         30 $f->await until $f->is_ready;
2195 10         88 $self
2196             }
2197              
2198             =head2 next
2199              
2200             Returns a L<Future> which will resolve to the next item emitted by this source.
2201              
2202             If the source completes before an item is emitted, the L<Future> will be cancelled.
2203              
2204             Note that these are independent - they don't stack, so if you call C<< ->next >>
2205             multiple times before an item is emitted, each of those would return the same value.
2206              
2207             See L<Ryu::Buffer> if you're dealing with protocols and want to extract sequences of
2208             bytes or characters.
2209              
2210             To access the sequence as a discrete stream of L<Future> instances, try L</as_queue>
2211             which will provide a L<Future::Queue>.
2212              
2213             =cut
2214              
2215             sub next : method {
2216 3     3 1 2397 my ($self) = @_;
2217             my $f = $self->new_future(
2218             'next'
2219             )->on_ready($self->$curry::weak(sub {
2220 3     3   2210 my ($self, $f) = @_;
2221 3         8 my $addr = Scalar::Util::refaddr($f);
2222 3 50       19 List::UtilsBy::extract_by { Scalar::Util::refaddr($_) == $addr } @{$self->{on_item} || []};
  3         44  
  3         31  
2223 3         41 delete $self->{cancel_on_ready}{$f};
2224 3         26 }));
2225 3         268 $self->{cancel_on_ready}{$f} = $f;
2226 3         18 push @{$self->{on_item}}, sub {
2227 3 100   3   12 $f->done(shift) unless $f->is_ready;
2228 3         9 };
2229 3         23 return $f;
2230             }
2231              
2232             =head2 finish
2233              
2234             Mark this source as completed.
2235              
2236             =cut
2237              
2238 92 100   92 1 6456 sub finish { $_[0]->_completed->done unless $_[0]->_completed->is_ready; $_[0] }
  92         1887  
2239              
2240       0 0   sub refresh { }
2241              
2242             =head1 METHODS - Proxied
2243              
2244             The following methods are proxied to our completion L<Future>:
2245              
2246             =over 4
2247              
2248             =item * then
2249              
2250             =item * is_ready
2251              
2252             =item * is_done
2253              
2254             =item * failure
2255              
2256             =item * is_cancelled
2257              
2258             =item * else
2259              
2260             =back
2261              
2262             =cut
2263              
2264             sub get {
2265 0     0 0 0 my ($self) = @_;
2266 0         0 my $f = $self->_completed;
2267 0         0 my @rslt;
2268 0 0   0   0 $self->each(sub { push @rslt, $_ }) if defined wantarray;
  0         0  
2269 0 0       0 if(my $parent = $self->parent) {
2270 0         0 $parent->await
2271             }
2272             $f->transform(done => sub {
2273             @rslt
2274 0     0   0 })->get
  0         0  
2275             }
2276              
2277             for my $k (qw(then fail on_ready transform is_ready is_done is_failed failure else)) {
2278 43     43   531296 do { no strict 'refs'; *$k = $_ } for sub { shift->_completed->$k(@_) }
  43     201   99  
  43         9510  
  201         6066  
2279             }
2280             # Cancel operations are only available through the internal state, since we don't want anything
2281             # accidentally cancelling due to Future->wait_any(timeout, $src->_completed) or similar constructs
2282             for my $k (qw(cancel is_cancelled)) {
2283 43     43   298 do { no strict 'refs'; *$k = $_ } for sub { shift->{completed}->$k(@_) }
  43     7   83  
  43         104320  
  7         72  
2284             }
2285              
2286             =head1 METHODS - Internal
2287              
2288             =head2 prepare_await
2289              
2290             Run any pre-completion callbacks (recursively) before
2291             we go into an await cycle.
2292              
2293             Used for compatibility with sync bridges when there's
2294             no real async event loop available.
2295              
2296             =cut
2297              
2298             sub prepare_await {
2299 54     54 1 94 my ($self) = @_;
2300 54 100       121 (delete $self->{on_get})->() if $self->{on_get};
2301 54 100       145 return unless my $parent = $self->parent;
2302 27 50       124 my $code = $parent->can('prepare_await') or return;
2303 27         74 local @_ = ($parent);
2304 27         89 goto &$code;
2305             }
2306              
2307             =head2 chained
2308              
2309             Returns a new L<Ryu::Source> chained from this one.
2310              
2311             =cut
2312              
2313             sub chained {
2314 97     97 1 304949 my ($self) = shift;
2315 97 100       375 if(my $class = ref($self)) {
2316             my $src = $class->new(
2317             new_future => $self->{new_future},
2318 96         720 parent => $self,
2319             @_
2320             );
2321 96         477 Scalar::Util::weaken($src->{parent});
2322 96         272 push @{$self->{children}}, $src;
  96         574  
2323 96         660 $log->tracef("Constructing chained source for %s from %s (%s)", $src->label, $self->label, $self->_completed->state);
2324 96         1769 return $src;
2325             } else {
2326 1         7 my $src = $self->new(@_);
2327 1         8 $log->tracef("Constructing chained source for %s with no parent", $src->label);
2328 1         7 return $src;
2329             }
2330             }
2331              
2332             =head2 each_while_source
2333              
2334             Like L</each>, but removes the source from the callback list once the
2335             parent completes.
2336              
2337             =cut
2338              
2339             sub each_while_source {
2340 73     73 1 265 my ($self, $code, $src, %args) = @_;
2341 73         431 $self->each($code);
2342             $src->_completed->on_ready(sub {
2343 55     55   2102 my $addr = Scalar::Util::refaddr($code);
2344 55         311 my $count = List::UtilsBy::extract_by { $addr == Scalar::Util::refaddr($_) } @{$self->{on_item}};
  1         12  
  55         240  
2345 55         843 $log->tracef("->each_while_source completed on %s for refaddr 0x%x, removed %d on_item handlers", $self->describe, Scalar::Util::refaddr($self), $count);
2346 73         222 });
2347             $self->_completed->on_ready(sub {
2348 55     55   1197 my ($f) = @_;
2349 55 100       240 $args{cleanup}->($f, $src) if exists $args{cleanup};
2350 55 100 100     228 $f->on_ready($src->_completed) unless $src->is_ready or !($args{finish_source} // 1);
      100        
2351 73         1485 });
2352 73         1696 $src
2353             }
2354              
2355             =head2 map_source
2356              
2357             Provides a L</chained> source which has more control over what it
2358             emits than a standard L</map> or L</filter> implementation.
2359              
2360             $original->map_source(sub {
2361             my ($item, $src) = @_;
2362             $src->emit('' . reverse $item);
2363             });
2364              
2365             =cut
2366              
2367             sub map_source {
2368 0     0 1 0 my ($self, $code) = @_;
2369              
2370 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
2371             $self->_completed->on_ready(sub {
2372 0 0   0   0 return if $src->is_ready;
2373 0         0 shift->on_ready($src->_completed);
2374 0         0 });
2375             $self->each_while_source(sub {
2376 0     0   0 $code->($_, $src) for $_;
2377 0         0 }, $src);
2378             }
2379              
2380             sub DESTROY {
2381 184     184   331350 my ($self) = @_;
2382 184 50       805 return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
2383 184         685 $log->tracef("Destruction for %s", $self->describe);
2384 184 100       2938 $self->_completed->cancel unless $self->_completed->is_ready;
2385             }
2386              
2387             sub catch {
2388 0     0 0   my ($self, $code) = @_;
2389 0           my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
2390             $self->_completed->on_fail(sub {
2391 0     0     my @failure = @_;
2392 0           my $sub = $code->(@failure);
2393 0 0 0       if(Scalar::Util::blessed $sub && $sub->isa('Ryu::Source')) {
2394             $sub->each_while_source(sub {
2395 0           $src->emit($_)
2396 0           }, $src);
2397             } else {
2398 0           $sub->fail(@failure);
2399             }
2400 0           });
2401             $self->each_while_source(sub {
2402 0     0     $src->emit($_)
2403 0           }, $src);
2404             }
2405              
2406             1;
2407              
2408             __END__