File Coverage

blib/lib/Async/Stream.pm
Criterion Covered Total %
statement 544 557 97.6
branch 151 164 92.0
condition 25 34 73.5
subroutine 72 72 100.0
pod 28 28 100.0
total 820 855 95.9


line stmt bran cond sub pod time code
1             package Async::Stream;
2              
3 5     5   197693 use 5.010;
  5         18  
4 5     5   41 use strict;
  5         13  
  5         107  
5 5     5   28 use warnings;
  5         14  
  5         143  
6 5     5   23 no warnings qw(ambiguous);
  5         8  
  5         396  
7              
8 5     5   1563 use Async::Stream::Item;
  5         10  
  5         134  
9 5     5   1392 use Async::Stream::Iterator;
  5         10  
  5         138  
10              
11 5     5   29 use Carp qw(croak);
  5         7  
  5         330  
12 5     5   33 use Scalar::Util qw(weaken);
  5         11  
  5         7686  
13              
14             =head1 NAME
15              
16             Async::Stream - it's convenient way to work with async data flow.
17              
18             =head1 VERSION
19              
20             Version 0.11
21              
22             =cut
23              
24             our $VERSION = '0.12';
25              
26             =head1 SYNOPSIS
27              
28             Module helps to organize your async code to stream.
29              
30             use Async::Stream;
31              
32             my @urls = qw(
33             http://ucoz.com
34             http://ya.ru
35             http://google.com
36             );
37              
38             my $stream = Async::Stream::FromArray->new(@urls);
39              
40             $stream
41             ->transform(sub {
42             $return_cb = shift;
43             http_get $_, sub {
44             $return_cb->({headers => $_[0], body => $_[0]})
45             };
46             })
47             ->grep(sub { $_->{headers}->{Status} =~ /^2/ })
48             ->each(sub {
49             print $_->{body};
50             });
51              
52             =head1 SUBROUTINES/METHODS
53              
54             =head2 new($generator)
55              
56             Constructor creates instance of class.
57             Class method gets 1 arguments - generator subroutine references to generate items.
58             Generator will get a callback which it will use for returning result.
59             If generator is exhausted then returning callback is called without arguments.
60              
61             my $i = 0;
62             my $stream = Async::Stream->new(sub {
63             $return_cb = shift;
64             if ($i < 10) {
65             $return_cb->($i++);
66             } else {
67             $return_cb->();
68             }
69             });
70             =cut
71             sub new {
72 45     45 1 20142 my $class = shift;
73 45         79 my $generator = shift;
74 45         88 my %args = @_;
75              
76 45 100 66     226 if (ref $generator ne 'CODE') {
    50          
77 1         194 croak 'First argument can be only subroutine reference';
78             }
79             elsif ($args{prefetch} && $args{prefetch} < 0) {
80 0         0 croak 'Prefetch can\'t be less then zero';
81             }
82              
83             my $self = bless {
84             _head => undef,
85 44   100     267 _prefetch => int($args{prefetch} // 0),
86             }, $class;
87              
88 44         151 $self->_set_head($generator);
89              
90 44         259 return $self;
91             }
92              
93             =head2 head()
94              
95             Method returns stream's head item.
96             Head is a instance of class Async::Stream::Item.
97              
98             my $stream_head = $stream->head;
99             =cut
100             sub head {
101 72     72 1 215 return $_[0]->{_head};
102             }
103              
104             =head2 set_prefetch($number)
105              
106             Method returns stream's head item.
107             Head is a instance of class Async::Stream::Item.
108              
109             $stream->set_prefetch(4);
110             =cut
111             sub set_prefetch {
112 1     1 1 10 my $self = shift;
113 1   50     4 my $prefetch = shift // 0;
114              
115 1 50       4 if ($prefetch < 0) {
116 0         0 croak 'Prefetch can\'t be less then zero';
117             }
118              
119 1         2 $self->{_prefetch} = $prefetch;
120              
121 1         8 return $self;
122             }
123              
124              
125             =head2 iterator()
126              
127             Method returns stream's iterator.
128             Iterator is a instance of class Async::Stream::Iterator.
129              
130             my $stream_iterator = $stream->iterator;
131             =cut
132              
133             sub iterator {
134 63     63 1 167 return Async::Stream::Iterator->new( $_[0]->head );
135             }
136              
137             =head2 shift($return_cb)
138              
139             Remove first item from stream and return it to return callback
140              
141             $stream->shift(sub {
142             if (@_){
143             my $item = shift;
144              
145             ...
146             }
147             });
148             =cut
149             sub shift {
150 7     7 1 34 my ($self, $return_cb) = @_;
151              
152 7         14 my $head = $self->head;
153             $head->next(sub {
154 7 100   7   13 if (@_) {
155 6         10 $self->{_head} = $_[0];
156 6         11 $return_cb->($_[0]->val);
157             } else {
158 1         5 $return_cb->();
159             }
160 7         34 });
161              
162 7         31 return $self;
163             }
164              
165              
166             =head1 CONVEYOR METHODS
167              
168             =head2 peek($action)
169              
170             This method helps to debug streams data flow.
171             You can use this method for printing or logging steam data and track data
172             mutation between stream's transformations.
173              
174             $stream->peek(sub { print $_, "\n" })->to_arrayref(sub {print @{$_[0]}});
175             =cut
176             sub peek {
177 2     2 1 13 my $self = shift;
178 2         5 my $action = shift;
179              
180 2 100       9 if (ref $action ne 'CODE') {
181 1         127 croak 'First argument can be only subroutine reference'
182             }
183              
184 1         3 my $iterator = $self->iterator;
185             my $generator = sub {
186 4     4   8 my $return_cb = shift;
187             $iterator->next(sub {
188 4 100       11 if (@_) {
189 3         13 $action->() for ($_[0]);
190 3         2566 $return_cb->($_[0]);
191             } else {
192 1         6 $return_cb->()
193             }
194 4         19 });
195 1         4 };
196              
197 1         4 $self->_set_head($generator, prefetch => 0);
198              
199 1         5 return $self;
200             }
201              
202             =head2 grep($predicate)
203              
204             The method greps current stream. Filter works like lazy grep.
205              
206             $stream->grep(sub {$_ % 2})->to_arrayref(sub {print @{$_[0]}});
207              
208             =cut
209             sub grep {
210 2     2 1 12 my $self = shift;
211 2         4 my $predicate = shift;
212              
213 2 100       6 if (ref $predicate ne 'CODE') {
214 1         67 croak 'First argument can be only subroutine reference'
215             }
216              
217 1         2 my $iterator = $self->iterator;
218              
219 1         2 my $next_cb; $next_cb = sub {
220 4     4   5 my $return_cb = shift;
221             $iterator->next(sub {
222 4 100       7 if (@_) {
223 3         4 my $is_valid;
224 3         8 $is_valid = $predicate->() for ($_[0]);
225 3 100       11 if ($is_valid) {
226 2         3 $return_cb->($_[0]);
227             } else {
228 1         3 $next_cb->($return_cb);
229             }
230             } else {
231 1         3 $return_cb->()
232             }
233 4         14 });
234 1         3 };
235              
236 1         4 $self->_set_head($next_cb, prefetch => 0);
237              
238 1         9 return $self;
239             }
240              
241             =head2 map($transformer)
242              
243             Method makes synchronous transformation for stream, like usual map for array.
244              
245             $stream->map(sub { $_ * 2 })->to_arrayref(sub {print @{$_[0]}});
246              
247             =cut
248             sub map {
249 1     1 1 10 my $self = shift;
250 1         3 my $transformer = shift;
251              
252 1 50       4 if (ref $transformer ne 'CODE') {
253 0         0 croak 'First argument can be only subroutine reference'
254             }
255              
256 1         2 my $iterator = $self->iterator;
257              
258 1         1 my $next_cb; $next_cb = sub {
259 4     4   5 my $return_cb = shift;
260             $iterator->next(sub {
261 4 100       16 if (@_) {
262 3         6 $return_cb->($transformer->()) for ($_[0]);
263             } else {
264 1         2 $return_cb->()
265             }
266 4         12 });
267 1         4 };
268              
269 1         3 $self->_set_head($next_cb, prefetch => 0);
270              
271 1         5 return $self;
272             }
273              
274             =head2 transform($transformer)
275              
276             Method transform current stream.
277             Transform works like lazy map with async response.
278             You can use the method for example for async http request or another async
279             operation.
280              
281             $stream->transform(sub {
282             $return_cb = shift;
283             $return_cb->($_ * 2)
284             })->to_arrayref(sub {print @{$_[0]}});
285              
286             =cut
287             sub transform {
288 4     4 1 10 my $self = shift;
289 4         5 my $transformer = shift;
290              
291 4 100       14 if (ref $transformer ne 'CODE') {
292 2         141 croak 'First argument can be only subroutine reference'
293             }
294              
295 2         6 my $iterator = $self->iterator;
296              
297 2         4 my $next_cb; $next_cb = sub {
298 10     10   15 my $return_cb = shift;
299             $iterator->next(sub {
300 10 100       16 if (@_) {
301 7         19 $transformer->($return_cb) for ($_[0]);
302             } else {
303 3         6 $return_cb->()
304             }
305 10         37 });
306 2         6 };
307              
308 2         6 $self->_set_head($next_cb);
309              
310 2         5 return $self;
311             }
312              
313             =head2 append(@list_streams)
314              
315             The method appends one or several streams to tail of current stream.
316              
317             $stream->append($stream1)->to_arrayref(sub {print @{$_[0]}});
318             =cut
319             sub append {
320 2     2 1 11 my $self = shift;
321 2         5 my @streams = @_;
322              
323 2         5 for my $stream (@streams) {
324 2 100       11 if (!$stream->isa('Async::Stream')) {
325 1         69 croak 'Arguments can be only Async::Stream or instances of derived class'
326             }
327             }
328              
329 1         3 my $iterator = $self->iterator;
330              
331 1         2 my $generator; $generator = sub {
332 8     8   9 my $return_cb = shift;
333             $iterator->next(sub {
334 8 100       18 if (@_){
    100          
335 6         10 $return_cb->($_[0]);
336             } elsif (@streams) {
337 1         2 $iterator = (shift @streams)->iterator;
338 1         3 $generator->($return_cb);
339             } else {
340 1         3 $return_cb->();
341             }
342 8         27 });
343 1         4 };
344              
345 1         4 $self->_set_head($generator, prefetch => 0);
346              
347 1         5 return $self;
348             }
349              
350             =head2 skip($number)
351              
352             The method skips $number items in stream.
353              
354             $stream->skip(5)->to_arrayref(sub {print @{$_[0]}});
355             =cut
356             sub skip {
357 3     3 1 14 my $self = shift;
358 3         5 my $skip = int shift;
359              
360 3 100       79 croak 'First argument can be only non-negative integer' if ($skip < 0);
361              
362 2 100       5 if ($skip) {
363 1         3 my $iterator = $self->iterator;
364 1         2 my $generator; $generator = sub {
365 4     4   7 my $return_cb = shift;
366             $iterator->next(sub {
367 4 100       7 if (@_){
368 3 100       6 if ($skip-- > 0) {
369 1         3 $generator->($return_cb);
370             } else {
371 2         4 $return_cb->($_[0]);
372             }
373             } else {
374 1         3 $return_cb->();
375             }
376 4         11 });
377 1         4 };
378              
379 1         4 $self->_set_head($generator, prefetch => 0);
380              
381 1         5 return $self;
382             }
383            
384 1         5 return $self;
385             }
386              
387             =head2 limit($number)
388              
389             The method limits $number items in stream.
390              
391             $stream->limit(5)->to_arrayref(sub {print @{$_[0]}});
392             =cut
393             sub limit {
394 4     4 1 19 my $self = shift;
395 4         6 my $limit = int shift;
396              
397 4 100       100 croak 'First argument can be only non-negative integer' if ($limit < 0);
398              
399 3         8 my $iterator = $self->iterator;
400              
401             $self->_set_head(
402             sub {
403 4     4   8 my ($return_cb) = @_;
404 4 100       10 return $return_cb->() if ($limit-- <= 0);
405 1         3 $iterator->next($return_cb);
406             },
407 3         14 prefetch => 0
408             );
409              
410 3         13 return $self;
411             }
412              
413             =head2 spray($number)
414              
415             The method helps to divide items of stream to several items.
416             For example you can use this method to get some page,
417             then get all link from that page and make from these link another items.
418              
419             $stream->spray(sub{ return (1 .. $_) })->to_arrayref(sub {print @{$_[0]}});
420             =cut
421             sub spray {
422 2     2 1 11 my $self = shift;
423 2         3 my $splitter = shift;
424              
425 2 100       7 if (ref $splitter ne 'CODE') {
426 1         78 croak 'First argument can be only subroutine reference'
427             }
428              
429 1         3 my $iterator = $self->iterator;
430              
431 1         2 my @buffer;
432             my $generator;
433             $generator = sub {
434 9     9   12 my $return_cb = shift;
435 9 100       14 if (@buffer) {
436 4         7 $return_cb->(shift @buffer);
437             }
438             else {
439             $iterator->next(sub {
440 5 100       9 if (@_) {
441 4         9 @buffer = $splitter->() for ($_[0]);
442 4 100       21 if (@buffer) {
443 2         4 $return_cb->(shift @buffer);
444             } else {
445 2         6 $generator->($return_cb);
446             }
447             } else {
448 1         3 $return_cb->();
449             }
450 5         8 return;
451 5         17 });
452             }
453 9         24 return;
454 1         5 };
455              
456 1         3 $self->_set_head($generator, prefetch => 0);
457              
458 1         5 return $self;
459             }
460              
461              
462             =head2 sort($comparator)
463              
464             The method sorts whole stream.
465              
466             $stream->sort(sub{$a <=> $b})->to_arrayref(sub {print @{$_[0]}});
467             =cut
468             sub sort {
469 2     2 1 12 my $self = shift;
470 2         4 my $comporator = shift;
471              
472 2 100       6 if (ref $comporator ne 'CODE') {
473 1         77 croak 'First argument can be only subroutine reference'
474             }
475              
476 1         2 my $pkg = caller;
477              
478 1         2 my $is_sorted = 0;
479 1         2 my @stream_items;
480              
481 1         3 my $iterator = $self->iterator;
482              
483             my $generator = sub {
484 4     4   6 my $return_cb = shift;
485 4 100       8 if ($is_sorted) {
486 3 100       8 $return_cb->( @stream_items ? shift @stream_items : () );
487             } else {
488 1         2 my $next_cb; $next_cb = sub {
489 4         5 my $next_cb = $next_cb;
490             $iterator->next(sub {
491 4 100       7 if (@_) {
492 3         4 push @stream_items, $_[0];
493 3         9 $next_cb->();
494             } else {
495 1 50       3 if (@stream_items) {
496             {
497 5     5   49 no strict 'refs';
  5         12  
  5         1639  
  1         2  
498 1         2 local *{ $pkg . '::a' } = *{ __PACKAGE__ . '::a' };
  1         5  
  1         3  
499 1         2 local *{ $pkg . '::b' } = *{ __PACKAGE__ . '::b' };
  1         3  
  1         3  
500 1         4 @stream_items = sort $comporator @stream_items;
501             }
502 1         7 $is_sorted = 1;
503 1         3 $return_cb->(shift @stream_items);
504             } else {
505 0         0 $return_cb->();
506             }
507             }
508 4         14 });
509 1         3 };$next_cb->();
  1         3  
510 1         12 weaken $next_cb;
511             }
512 1         5 };
513              
514 1         4 $self->_set_head($generator, prefetch => 0);
515              
516 1         5 return $self;
517             }
518              
519             =head2 cut_sort($predicate, $comparator)
520              
521             Sometimes stream can be infinity and you can't you $stream->sort,
522             you need certain parts of streams for example cut part by length of items.
523              
524             $stream
525             ->cut_sort(sub {length $a != length $b},sub {$a <=> $b})
526             ->to_arrayref(sub {print @{$_[0]}});
527             =cut
528             sub cut_sort {
529 5     5 1 668 my $self = shift;
530 5         7 my $cut = shift;
531 5         7 my $comporator = shift;
532              
533 5 100 100     18 if (ref $cut ne 'CODE' or ref $comporator ne 'CODE') {
534 3         219 croak 'First and Second arguments can be only subrotine references'
535             }
536              
537 2         4 my $pkg = caller;
538              
539 2         3 my $iterator = $self->iterator;
540              
541 2         8 my $prev;
542             my @cur_slice;
543 2         0 my @sorted_array;
544 2         0 my $generator; $generator = sub {
545 12     12   15 my $return_cb = shift;
546 12 100       20 if (@sorted_array) {
547 3         7 $return_cb->(shift @sorted_array);
548             } else {
549 9 100       15 if (!defined $prev) {
550             $iterator->next(sub {
551 2 100       5 if (@_){
552 1         2 $prev = $_[0];
553 1         2 @cur_slice = ($prev);
554 1         3 $generator->($return_cb);
555             } else {
556 1         2 $return_cb->();
557             }
558 2         8 });
559             } else {
560             $iterator->next(sub {
561 7 100       9 if (@_) {
562 5         6 my $is_cut;
563             {
564 5     5   37 no strict 'refs';
  5         11  
  5         364  
  5         6  
565 5         6 local *{ $pkg . '::a' } = \$prev;
  5         13  
566 5         8 local *{ $pkg . '::b' } = \$_[0];
  5         8  
567 5         11 $is_cut = $cut->();
568             }
569 5         17 $prev = $_[0];
570 5 100       9 if ($is_cut) {
571             {
572 5     5   27 no strict 'refs';
  5         10  
  5         587  
  2         3  
573 2         3 local *{ $pkg . '::a' } = *{ __PACKAGE__ . '::a' };
  2         5  
  2         4  
574 2         3 local *{ $pkg . '::b' } = *{ __PACKAGE__ . '::b' };
  2         5  
  2         3  
575 2         6 @sorted_array = sort $comporator @cur_slice;
576             }
577 2         10 @cur_slice = ($prev);
578 2         5 $return_cb->(shift @sorted_array);
579             } else {
580 3         4 push @cur_slice, $prev;
581 3         8 $generator->($return_cb);
582             }
583             } else {
584 2 100       9 if (@cur_slice) {
585             {
586 5     5   32 no strict 'refs';
  5         9  
  5         2347  
  1         3  
587 1         2 local *{ $pkg . '::a' } = *{ __PACKAGE__ . '::a' };
  1         3  
  1         3  
588 1         29 local *{ $pkg . '::b' } = *{ __PACKAGE__ . '::b' };
  1         3  
  1         3  
589 1         21 @sorted_array = sort $comporator @cur_slice;
590             }
591 1         6 @cur_slice = ();
592 1         3 $return_cb->(shift @sorted_array);
593             } else {
594 1         2 $return_cb->();
595             }
596             }
597 7         34 });
598             }
599             }
600 2         6 };
601              
602 2         5 $self->_set_head($generator, prefetch => 0);
603              
604 2         9 return $self;
605             }
606              
607             =head2 reverse()
608              
609             Revers order of stream's items. Can't be done on endless stream.
610              
611             $stream->reverse;
612             =cut
613             sub reverse {
614 1     1 1 8 my $self = shift;
615              
616 1         3 my $is_received = 0;
617 1         2 my @stream_items;
618              
619 1         2 my $iterator = $self->iterator;
620              
621             my $generator = sub {
622 4     4   7 my $return_cb = shift;
623 4 100       6 if ($is_received) {
624 3 100       8 $return_cb->( @stream_items ? pop @stream_items : () );
625             } else {
626 1         8 my $next_cb; $next_cb = sub {
627 4         6 my $next_cb = $next_cb;
628             $iterator->next(sub {
629 4 100       7 if (@_) {
630 3         5 push @stream_items, $_[0];
631 3         7 $next_cb->();
632             } else {
633 1 50       2 if (@stream_items) {
634 1         2 $is_received = 1;
635 1         3 $return_cb->(pop @stream_items);
636             } else {
637 0         0 $return_cb->();
638             }
639             }
640 4         12 });
641 1         4 };$next_cb->();
  1         3  
642 1         10 weaken $next_cb;
643             }
644 1         4 };
645              
646 1         3 $self->_set_head($generator, prefetch => 0);
647              
648 1         4 return $self;
649             }
650              
651             =head2 merge_in($comparator, @list_streams);
652              
653             Merge additional streams into current stream by comparing each item of stream.
654              
655             $stream->merge_in(sub{$a <=> $b}, $stream1, $stream2);
656             =cut
657             sub merge_in {
658 3     3 1 334 my $self = shift;
659 3         5 my $comporator = shift;
660              
661 3 100       11 if (ref $comporator ne 'CODE') {
662 1         119 croak 'First argument can be only reference to subroutine';
663             }
664              
665 2         4 my $pkg = caller;
666              
667 2         3 my @iterators;
668 2         5 for my $stream ($self, @_) {
669 4 100       13 if ($stream->isa('Async::Stream')) {
670 3         7 push @iterators, [$stream->iterator];
671             } else {
672 1         61 croak 'Arguments can be only Async::Stream or instances of derived class'
673             }
674             }
675              
676             my $generator = sub {
677 7     7   11 my $return_cb = shift;
678 7         11 my $requested_item = grep { @{$_} == 1 } @iterators;
  13         14  
  13         25  
679 7         16 for (my $i = 0; $i < @iterators; $i++) {
680 8 50       9 if (@{$iterators[$i]} == 1) {
  8         17  
681 8         9 my $iterator_id = $i;
682             $iterators[$iterator_id][0]->next(sub {
683 8         9 $requested_item--;
684 8 100       12 if (@_) {
685 6         8 my $item = shift;
686 6         8 push @{$iterators[$iterator_id]}, $item;
  6         14  
687             } else {
688 2         4 $iterators[$iterator_id] = undef;
689             }
690              
691 8 100       17 if ($requested_item == 0) {
692             ### it's awful and need to optimize ###
693             {
694 5     5   35 no strict 'refs';
  5         11  
  5         5617  
  7         8  
695             my $comp = sub {
696 5         6 local ${ $pkg . '::a' } = $a->[1];
  5         14  
697 5         6 local ${ $pkg . '::b' } = $b->[1];
  5         9  
698 5         11 return $comporator->();
699 7         16 };
700 7         12 @iterators = sort $comp grep { defined $_ } @iterators;
  13         33  
701             }
702             ### ###
703 7 100       30 if (@iterators) {
704 6         7 my $item = pop @{ $iterators[0] };
  6         10  
705 6         10 $return_cb->($item);
706             } else {
707 1         3 $return_cb->();
708             }
709             }
710 8         52 });
711             }
712             }
713 1         5 };
714              
715 1         3 $self->_set_head($generator);
716              
717 1         2 return $self;
718             }
719              
720             =head2 $branch_stream branch_out($predicat);
721              
722             Method makes new branch of current stream by predicate.
723              
724             my $error_stream = $stream->branch_out(sub{$_->{headers}{status} != 200});
725             =cut
726             sub branch_out {
727 2     2 1 14 my $self = shift;
728 2         2 my $predicat = shift;
729              
730 2 100       7 if (ref $predicat ne 'CODE') {
731 1         91 croak 'First argument can be only subroutine reference'
732             }
733              
734 1         2 my @branch_items;
735             my @self_items;
736              
737 1         2 my $iterator = $self->iterator;
738              
739 1         2 my $generator; $generator = sub {
740 11     11   13 my $return_cb = shift;
741 11         26 my $is_for_branch = shift;
742              
743 11 50 66     45 if ($is_for_branch && @branch_items) {
    100 100        
744 0         0 return $return_cb->(shift @branch_items);
745             } elsif (!$is_for_branch && @self_items) {
746 3         7 return $return_cb->(shift @self_items);
747             }
748              
749             $iterator->next(sub {
750 8 100       19 if (@_) {
751 6         7 my $is_branch_item;
752 6         13 $is_branch_item = $predicat->() for ($_[0]);
753              
754 6 100 66     31 if ($is_for_branch && !$is_branch_item) {
    50 33        
755 3         5 push @self_items, $_[0];
756 3         7 return $generator->($return_cb,$is_for_branch);
757             } elsif (!$is_for_branch && $is_branch_item) {
758 0         0 push @branch_items, $_[0];
759 0         0 return $generator->($return_cb,$is_for_branch);
760             } else {
761 3         7 return $return_cb->($_[0]);
762             }
763             } else {
764 2         3 $return_cb->();
765             }
766 8         30 });
767 1         4 };
768              
769 1     4   5 $self->_set_head(sub { $generator->($_[0], 0) });
  4         8  
770 1     4   4 return Async::Stream->new(sub { $generator->($_[0], 1) });
  4         9  
771             }
772              
773             =head2 distinct($key_generator)
774              
775             Method discards duplicate items from stream.
776             By default uniqueness of items will be determined by textual representation of item.
777              
778             $stream->distinct(sub {$_->{name}})->to_arrayref(sub {print @{$_[0]}});
779             =cut
780             sub distinct {
781 3     3 1 18 my $self = shift;
782 3         4 my $to_key = shift;
783              
784 3 100       9 if (ref $to_key ne 'CODE') {
785 1     7   5 $to_key = sub { "$_" };
  7         13  
786             }
787              
788 3         6 my $iterator = $self->iterator;
789              
790 3         5 my %index_of;
791              
792             my $generator; $generator = sub {
793 24     24   29 my $return_cb = shift;
794             $iterator->next(sub {
795 24 100       33 if (@_) {
796 21         23 my $key;
797 21         39 $key = $to_key->() for ($_[0]);
798              
799 21 100       52 if (exists $index_of{$key}) {
800 14         29 $generator->($return_cb);
801             } else {
802 7         11 $index_of{$key} = undef;
803 7         14 $return_cb->($_[0]);
804             }
805             } else {
806 3         7 $return_cb->();
807             }
808 24         74 });
809              
810 3         7 };
811              
812 3         8 $self->_set_head($generator);
813              
814 3         13 return $self;
815             }
816              
817             =head1 TERMINAL METHODS
818              
819             =head2 to_arrayref($returing_cb)
820              
821             Method returns stream's iterator.
822              
823             $stream->to_arrayref(sub {
824             $array_ref = shift;
825              
826             #...
827             });
828             =cut
829             sub to_arrayref {
830 25     25 1 81 my $self = shift;
831 25         40 my $return_cb = shift;
832              
833 25 50       66 if (ref $return_cb ne 'CODE') {
834 0         0 croak 'First argument can be only subroutine reference'
835             }
836              
837 25         35 my @result;
838              
839 25         57 my $iterator = $self->iterator;
840              
841 25         44 my $next_cb; $next_cb = sub {
842 104     104   129 my $next_cb = $next_cb;
843             $iterator->next(sub {
844 104 100       184 if (@_) {
845 79         112 push @result, $_[0];
846 79         193 $next_cb->();
847             } else {
848 25         60 $return_cb->(\@result);
849             }
850 104         395 });
851 25         74 };$next_cb->();
  25         56  
852 25         84 weaken $next_cb;
853              
854 25         117 return $self;
855             }
856              
857             =head2 each($action)
858              
859             Method execute action on each item in stream.
860              
861             $stream->each(sub {
862             print $_, "\n";
863             });
864             =cut
865             sub each {
866 3     3 1 33 my $self = shift;
867 3         8 my $action = shift;
868              
869 3 100       15 if (ref $action ne 'CODE') {
870 2         260 croak 'First argument can be only subroutine reference'
871             }
872              
873 1         5 my $iterator = $self->iterator;
874              
875 1         4 my $each; $each = sub {
876 4     4   12 my $each = $each;
877             $iterator->next(sub {
878 4 100       17 if (@_) {
879 3         14 $action->() for ($_[0]);
880 3         2781 $each->()
881             }
882 4         33 });
883 1         5 }; $each->();
  1         6  
884 1         6 weaken $each;
885              
886 1         3 return $self;
887             }
888              
889              
890             =head2 shift_each($action)
891              
892             Method acts like each,but after process item, it removes them from the stream
893              
894             $stream->shift_each(sub {
895             print $_, "\n";
896             });
897             =cut
898             sub shift_each {
899 2     2 1 15 my $self = shift;
900 2         2 my $action = shift;
901              
902 2 100       17 if (ref $action ne 'CODE') {
903 1         76 croak 'First argument can be only subroutine reference'
904             }
905              
906              
907 1         1 my $each; $each = sub {
908 4     4   8 my $each = $each;
909             $self->shift(sub {
910 4 100       11 if (@_) {
911 3         8 $action->() for ($_[0]);
912 3         1712 $each->()
913             }
914 4         21 });
915 1         5 }; $each->();
  1         12  
916 1         6 weaken $each;
917              
918 1         2 return $self;
919             }
920              
921             =head2 reduce($accumulator, $returing_cb)
922              
923             Performs a reduction on the items of the stream.
924              
925             $stream->reduce(
926             sub{ $a + $b },
927             sub {
928             my $sum_of_items = shift;
929              
930             ...
931             });
932              
933             =cut
934             sub reduce {
935 9     9 1 37 my $self = shift;
936 9         13 my $code = shift;
937 9         14 my $return_cb = shift;
938              
939 9 100 66     40 if (ref $return_cb ne 'CODE' or ref $code ne 'CODE') {
940 1         98 croak 'First and Second arguments can be only subroutine references'
941             }
942              
943 8         16 my $pkg = caller;
944              
945 8         22 my $iterator = $self->iterator;
946              
947             $iterator->next(sub {
948 8 50   8   19 if (@_) {
949 8         11 my $prev = $_[0];
950            
951 8         11 my $reduce_cb; $reduce_cb = sub {
952 24         38 my $reduce_cb = $reduce_cb;
953             $iterator->next(sub {
954 24 100       81 if (@_) {
955             {
956 5     5   39 no strict 'refs';
  5         10  
  5         4175  
  16         24  
957 16         21 local *{ $pkg . '::a' } = \$prev;
  16         57  
958 16         28 local *{ $pkg . '::b' } = \$_[0];
  16         39  
959 16         32 $prev = $code->();
960             }
961 16         54 $reduce_cb->();
962             } else {
963 8         20 $return_cb->($prev);
964             }
965 24         88 });
966 8         26 };$reduce_cb->();
  8         22  
967 8         35 weaken $reduce_cb;
968             } else {
969 0         0 $return_cb->();
970             }
971 8         41 });
972              
973 8         22 return $self;
974             }
975              
976             =head2 sum($returing_cb)
977              
978             The method computes sum of all items in stream.
979              
980             $stream->sum(
981             sub {
982             my $sum_of_items = shift;
983              
984             ...
985             });
986             =cut
987             sub sum {
988 2     2 1 12 my $self = shift;
989 2         3 my $return_cb = shift;
990              
991 2 100       8 if (ref $return_cb ne 'CODE') {
992 1         68 croak 'First argument can be only subroutine reference'
993             }
994              
995 1     2   5 $self->reduce(sub{$a+$b}, $return_cb);
  2         3  
996              
997 1         2 return $self;
998             }
999              
1000             =head2 min($returing_cb)
1001              
1002             The method finds out minimum item among all items in stream.
1003              
1004             $stream->min(
1005             sub {
1006             my $min_item = shift;
1007            
1008             ...
1009             });
1010             =cut
1011             sub min {
1012 3     3 1 36 my $self = shift;
1013 3         8 my $return_cb = shift;
1014              
1015 3 100       13 if (ref $return_cb ne 'CODE') {
1016 1         182 croak 'First argument can be only subroutine reference'
1017             }
1018              
1019 2 100   4   17 $self->reduce(sub{$a < $b ? $a : $b}, $return_cb);
  4         17  
1020              
1021 2         5 return $self;
1022             }
1023              
1024             =head2 max($returing_cb)
1025              
1026             The method finds out maximum item among all items in stream.
1027              
1028             $stream->max(
1029             sub {
1030             my $max_item = shift;
1031              
1032             ...
1033             });
1034             =cut
1035             sub max {
1036 3     3 1 18 my $self = shift;
1037 3         4 my $return_cb = shift;
1038              
1039 3 100       9 if (ref $return_cb ne 'CODE') {
1040 1         94 croak 'First argument can be only subroutine reference'
1041             }
1042              
1043 2 100   4   7 $self->reduce(sub{$a > $b ? $a : $b}, $return_cb);
  4         10  
1044              
1045 2         5 return $self;
1046             }
1047              
1048             =head2 count($returing_cb)
1049              
1050             The method counts number items in streams.
1051              
1052             $stream->count(sub {
1053             my $count = shift;
1054             });
1055             =cut
1056             sub count {
1057 4     4 1 30 my $self = shift;
1058 4         8 my $return_cb = shift;
1059              
1060 4 100       13 if (ref $return_cb ne 'CODE') {
1061 1         67 croak 'First argument can be only subroutine reference'
1062             }
1063              
1064 3         5 my $result = 0;
1065 3         7 my $iterator = $self->iterator;
1066              
1067 3         6 my $next_cb ; $next_cb = sub {
1068 6     6   10 my $next_cb = $next_cb;
1069             $iterator->next(sub {
1070 6 100       14 if (@_) {
1071 3         4 $result++;
1072 3         7 return $next_cb->();
1073             }
1074 3         8 $return_cb->($result)
1075 6         19 });
1076 3         8 }; $next_cb->();
  3         10  
1077 3         13 weaken $next_cb;
1078              
1079 3         9 return $self;
1080             }
1081              
1082             =head2 any($predicat, $return_cb)
1083              
1084             Method look for any equivalent item in steam. if there is any then return that.
1085             if there isn't then return nothing.
1086              
1087             $stream->any(sub {$_ % 2}, sub{
1088             my $odd_item = shift
1089              
1090             ...
1091             });
1092             =cut
1093             sub any {
1094 2     2 1 20 my $self = shift;
1095 2         4 my $predicat = shift;
1096 2         4 my $return_cb = shift;
1097              
1098 2         5 my $iterator = $self->iterator;
1099              
1100 2         5 my $next_cb; $next_cb = sub {
1101 6     6   10 my $next_cb = $next_cb;
1102             $iterator->next(sub {
1103 6 100       14 if (@_) {
1104 5         7 my $is_valid;
1105 5         15 $is_valid = $predicat->() for ($_[0]);
1106 5 100       23 if ($is_valid) {
1107 1         2 $return_cb->($_[0]);
1108             } else {
1109 4         13 $next_cb->();
1110             }
1111             } else {
1112 1         3 $return_cb->()
1113             }
1114 6         27 });
1115 2         9 }; $next_cb->();
  2         7  
1116 2         11 weaken $next_cb;
1117              
1118 2         21 return $self;
1119             }
1120              
1121              
1122             sub _set_head {
1123 64     64   91 my $self = shift;
1124 64         98 my $generator = shift;
1125 64         113 my %args = @_;
1126              
1127 64   100     208 my $prefetch = $args{prefetch} // $self->{_prefetch};
1128              
1129 64         91 my $new_generator = $generator;
1130              
1131 64 100       129 if ($prefetch) {
1132 2         5 $new_generator = _get_prefetch_generator($generator, $self->{_prefetch});
1133             }
1134              
1135 64         193 $self->{_head} = Async::Stream::Item->new(undef, $new_generator);
1136              
1137 64         122 return $self;
1138             }
1139              
1140             sub _get_prefetch_generator {
1141 2     2   5 my ($generator,$prefetch) = @_;
1142              
1143 2         3 my @responses_cache;
1144             my @requests_queue;
1145 2         3 my $is_exhausted = 0;
1146 2         3 my $item_requested = 0;
1147              
1148             return sub {
1149 10     10   14 my $return_cb = shift;
1150              
1151 10 50       17 if (@responses_cache) {
1152 0         0 $return_cb->(shift @responses_cache);
1153             } else {
1154 10         15 push @requests_queue, $return_cb;
1155             }
1156              
1157 10 100 66     24 if (!$is_exhausted) {
    100          
1158 6         14 for (0 .. ($prefetch - $item_requested)) {
1159 18         32 $item_requested++;
1160             $generator->(sub {
1161 18         73 $item_requested--;
1162 18 100       26 if (@_) {
1163 8 50       14 if (@requests_queue) {
1164 8         16 shift(@requests_queue)->($_[0]);
1165             } else {
1166 0         0 push @responses_cache, $_[0];
1167             }
1168             } else {
1169 10         11 $is_exhausted = 1;
1170 10 50 66     48 if (!$item_requested && @requests_queue) {
1171 0         0 shift(@requests_queue)->();
1172             }
1173             }
1174 18         54 });
1175             }
1176             } elsif (!$item_requested && @requests_queue) {
1177 2         4 shift(@requests_queue)->();
1178             }
1179 2         8 };
1180             }
1181              
1182             =head1 AUTHOR
1183              
1184             Kirill Sysoev, C<< >>
1185              
1186             =head1 BUGS AND LIMITATIONS
1187              
1188             Please report any bugs or feature requests to
1189             L.
1190              
1191             =head1 SUPPORT
1192              
1193             You can find documentation for this module with the perldoc command.
1194              
1195             perldoc Async::Stream::Item
1196              
1197              
1198             =head1 LICENSE AND COPYRIGHT
1199              
1200             Copyright 2017 Kirill Sysoev.
1201              
1202             This program is free software; you can redistribute it and/or modify it
1203             under the terms of the the Artistic License (2.0). You may obtain a
1204             copy of the full license at:
1205              
1206             L
1207             =cut
1208              
1209             1; # End of Async::Stream