File Coverage

blib/lib/Async/Stream.pm
Criterion Covered Total %
statement 499 509 98.0
branch 134 144 93.0
condition 25 34 73.5
subroutine 67 67 100.0
pod 24 25 96.0
total 749 779 96.1


line stmt bran cond sub pod time code
1             package Async::Stream;
2              
3 3     3   50567 use 5.010;
  3         11  
4 3     3   17 use strict;
  3         8  
  3         81  
5 3     3   15 use warnings;
  3         10  
  3         72  
6              
7 3     3   948 use Async::Stream::Item;
  3         19  
  3         69  
8 3     3   934 use Async::Stream::Iterator;
  3         6  
  3         81  
9 3     3   19 use Scalar::Util qw(weaken);
  3         6  
  3         202  
10 3     3   19 use Carp;
  3         7  
  3         129  
11 3     3   15 use Exporter 'import';
  3         6  
  3         2876  
12              
13             our @EXPORT_OK = qw(merge branch);
14              
15             =head1 NAME
16              
17             Async::Stream - it's convenient way to work with async data flow.
18              
19             IMPORTANT! PUBLIC INTERFACE ISN'T STABLE, DO NOT USE IN PRODACTION BEFORE VERSION 1.0.
20              
21             =head1 VERSION
22              
23             Version 0.07
24              
25             =cut
26              
27             our $VERSION = '0.07';
28              
29             =head1 SYNOPSIS
30              
31             Module helps to organize your async code to stream.
32              
33             use Async::Stream;
34              
35             my @urls = qw(
36             http://ucoz.com
37             http://ya.ru
38             http://google.com
39             );
40              
41             my $stream = Async::Stream->new_from(@urls);
42              
43             $stream
44             ->transform(sub {
45             $return_cb = shift;
46             http_get $_, sub {
47             $return_cb->({headers => $_[0], body => $_[0]})
48             };
49             })
50             ->filter(sub { $_->{headers}->{Status} =~ /^2/ })
51             ->for_each(sub {
52             my $item = shift;
53             print $item->{body};
54             });
55              
56             =head1 SUBROUTINES/METHODS
57              
58             =head2 new($generator)
59              
60             Constructor creates instance of class.
61             Class method gets 1 arguments - generator subroutine references to generate items.
62             Generator will get a callback which it will use for returning result.
63             If generator is exhausted then returning callback is called without arguments.
64              
65             my $i = 0;
66             my $stream = Async::Stream->new(sub {
67             $return_cb = shift;
68             if ($i < 10) {
69             $return_cb->($i++);
70             } else {
71             $return_cb->();
72             }
73             });
74             =cut
75             sub new {
76 42     42 1 9736 my $class = shift;
77 42         69 my $generator = shift;
78 42         108 my %args = @_;
79              
80 42 100 66     239 if (ref $generator ne 'CODE') {
    50          
81 1         129 croak 'First argument can be only subroutine reference';
82             } elsif (defined $args{prefetch} and $args{prefetch} < 0) {
83 0         0 croak 'Prefetch can\'t be less then zero';
84             }
85              
86             my $self = bless {
87             _head => undef,
88 41   100     250 _prefetch => int($args{prefetch} // 0),
89             }, $class;
90              
91 41         138 $self->_set_head($generator);
92              
93 41         114 return $self;
94             }
95              
96             =head2 new_from(@array_of_items)
97              
98             Constructor creates instance of class.
99             Class method gets a list of items which are used for generating streams.
100            
101             my @domains = qw(
102             ucoz.com
103             ya.ru
104             googl.com
105             );
106            
107             my $stream = Async::Stream->new_from(@urls)
108              
109             =cut
110             sub new_from {
111 32     32 1 36905 my $class = shift;
112 32         74 my $items = \@_;
113              
114 130 100   130   236 return $class->new(sub { $_[0]->(@{$items} ? (shift @{$items}) : ()) })
  130         343  
  103         312  
115 32         168 }
116              
117             =head2 head()
118              
119             Method returns stream's head item.
120             Head is a instance of class Async::Stream::Item.
121              
122             my $stream_head = $stream->head;
123             =cut
124             sub head {
125 55     55 1 136 return $_[0]->{_head};
126             }
127              
128             =head2 prefetch($number)
129              
130             Method returns stream's head item.
131             Head is a instance of class Async::Stream::Item.
132              
133             my $stream_head = $stream->head;
134             =cut
135             sub set_prefetch {
136 1     1 0 15 my $self = shift;
137 1   50     4 my $prefetch = shift // 0;
138              
139 1 50       3 if ($prefetch < 0) {
140 0         0 croak 'Prefetch can\'t be less then zero';
141             }
142              
143 1         2 $self->{_prefetch} = $prefetch;
144              
145 1         7 return $self;
146             }
147              
148              
149             =head2 iterator()
150              
151             Method returns stream's iterator.
152             Iterator is a instance of class Async::Stream::Iterator.
153              
154             my $stream_iterator = $stream->iterator;
155             =cut
156              
157             sub iterator {
158 53     53 1 179 return Async::Stream::Iterator->new($_[0]);
159             }
160              
161             =head2 to_arrayref($returing_cb)
162              
163             Method returns stream's iterator.
164              
165             $stream->to_arrayref(sub {
166             $array_ref = shift;
167              
168             #...
169             });
170             =cut
171             sub to_arrayref {
172 23     23 1 75 my $self = shift;
173 23         40 my $return_cb = shift;
174              
175 23 100       70 if (ref $return_cb ne 'CODE') {
176 1         91 croak 'First argument can be only subroutine reference'
177             }
178              
179 22         34 my @result;
180              
181 22         49 my $iterator = $self->iterator;
182              
183 22         43 my $next_cb; $next_cb = sub {
184 91     91   155 my $next_cb = $next_cb;
185             $iterator->next(sub {
186 91 100       194 if (@_) {
187 69         137 push @result, $_[0];
188 69         207 $next_cb->();
189             } else {
190 22         59 $return_cb->(\@result);
191             }
192 91         435 });
193 22         72 };$next_cb->();
  22         61  
194 22         91 weaken $next_cb;
195              
196 22         72 return $self;
197             }
198              
199             =head2 for_each($action)
200              
201             Method execute action on each item in stream.
202              
203             $stream->to_arrayref(sub {
204             $array_ref = shift;
205            
206             #...
207             });
208             =cut
209             sub for_each {
210 2     2 1 50 my $self = shift;
211 2         5 my $action = shift;
212              
213 2 100       6 if (ref $action ne 'CODE') {
214 1         72 croak 'First argument can be only subroutine reference'
215             }
216              
217 1         27 my $iterator = $self->iterator;
218              
219 1         2 my $each; $each = sub {
220 4     4   9 my $each = $each;
221             $iterator->next(sub {
222 4 100       12 if (@_) {
223 3         7 $action->($_[0]);
224 3         1507 $each->()
225             }
226 4         23 });
227 1         4 }; $each->();
  1         4  
228 1         5 weaken $each;
229              
230 1         2 return $self;
231             }
232              
233             =head2 peek($action)
234              
235             This method helps to debug streams data flow.
236             You can use this method for printing or logging steam data and track data
237             mutation between stream's transformations.
238              
239             $stream->peek(sub { print $_, "\n" })->to_arrayref(sub {print @{$_[0]}});
240             =cut
241             sub peek {
242 2     2 1 9 my $self = shift;
243 2         3 my $action = shift;
244              
245 2 100       8 if (ref $action ne 'CODE') {
246 1         73 croak 'First argument can be only subroutine reference'
247             }
248              
249 1         3 my $iterator = $self->iterator;
250             my $generator = sub {
251 4     4   5 my $return_cb = shift;
252             $iterator->next(sub {
253 4 100       13 if (@_) {
254 3         7 local *{_} = \$_[0];
255 3         8 $action->();
256 3         1481 $return_cb->($_[0]);
257             } else {
258 1         3 $return_cb->()
259             }
260 4         15 });
261 1         4 };
262              
263 1         4 $self->_set_head($generator, prefetch => 0);
264              
265 1         5 return $self;
266             }
267              
268             =head2 filter($predicate)
269              
270             The method filters current stream. Filter works like lazy grep.
271              
272             $stream->filter(sub {$_ % 2})->to_arrayref(sub {print @{$_[0]}});
273              
274             =cut
275             sub filter {
276 2     2 1 9 my $self = shift;
277 2         3 my $is_intresting = shift;
278              
279 2 100       8 if (ref $is_intresting ne 'CODE') {
280 1         94 croak 'First argument can be only subroutine reference'
281             }
282              
283 1         3 my $iterator = $self->iterator;
284              
285 1         3 my $next_cb; $next_cb = sub {
286 4     4   6 my $return_cb = shift;
287             $iterator->next(sub {
288 4 100       7 if (@_) {
289 3         7 local *{_} = \$_[0];
290 3 100       7 if ($is_intresting->()) {
291 2         11 $return_cb->($_[0]);
292             } else {
293 1         6 $next_cb->($return_cb)
294             }
295             } else {
296 1         3 $return_cb->()
297             }
298 4         15 });
299 1         4 };
300              
301 1         4 $self->_set_head($next_cb, prefetch => 0);
302              
303 1         5 return $self;
304             }
305              
306             =head2 smap($transformer)
307              
308             Method smap transforms current stream. Transform works like lazy map.
309              
310             $stream->transform(sub {$_ * 2})->to_arrayref(sub {print @{$_[0]}});
311              
312             =cut
313             sub smap {
314 2     2 1 8 my $self = shift;
315 2         4 my $transform = shift;
316              
317 2 100       7 if (ref $transform ne 'CODE') {
318 1         68 croak 'First argument can be only subroutine reference'
319             }
320              
321 1         3 my $iterator = $self->iterator;
322              
323 1         2 my $next_cb; $next_cb = sub {
324 4     4   7 my $return_cb = shift;
325             $iterator->next(sub {
326 4 100       8 if (@_) {
327 3         5 local *{_} = \$_[0];
328 3         7 $return_cb->($transform->());
329             } else {
330 1         3 $return_cb->()
331             }
332 4         14 });
333 1         4 };
334              
335 1         4 $self->_set_head($next_cb, prefetch => 0);
336              
337 1         5 return $self;
338             }
339              
340             =head2 transform($transformer)
341              
342             Method transform current stream.
343             Transform works like lazy map with async response.
344             You can use the method for example for async http request or another async
345             operation.
346              
347             $stream->transform(sub {
348             $return_cb = shift;
349             $return_cb->($_ * 2)
350             })->to_arrayref(sub {print @{$_[0]}});
351              
352             =cut
353             sub transform {
354 3     3 1 10 my $self = shift;
355 3         6 my $transform = shift;
356              
357 3 100       10 if (ref $transform ne 'CODE') {
358 1         68 croak 'First argument can be only subroutine reference'
359             }
360              
361 2         7 my $iterator = $self->iterator;
362              
363 2         6 my $next_cb; $next_cb = sub {
364 10     10   17 my $return_cb = shift;
365             $iterator->next(sub {
366 10 100       26 if (@_) {
367 7         17 local *{_} = \$_[0];
368 7         22 $transform->($return_cb);
369             } else {
370 3         7 $return_cb->()
371             }
372 10         42 });
373 2         9 };
374              
375 2         8 $self->_set_head($next_cb);
376              
377 2         11 return $self;
378             }
379              
380             =head2 reduce($accumulator, $returing_cb)
381              
382             Performs a reduction on the items of the stream.
383              
384             $stream->reduce(
385             sub{ $a + $b },
386             sub {
387             $sum = shift
388             #...
389             });
390              
391             =cut
392             sub reduce {
393 9     9 1 41 my $self = shift;
394 9         22 my $code = shift;
395 9         14 my $return_cb = shift;
396              
397 9 100 66     47 if (ref $return_cb ne 'CODE' or ref $code ne 'CODE') {
398 1         70 croak 'First and Second arguments can be only subroutine references'
399             }
400              
401 8         19 my $pkg = caller;
402              
403 8         19 my $iterator = $self->iterator;
404              
405             $iterator->next(sub {
406 8 50   8   22 if (@_) {
407 8         14 my $prev = $_[0];
408            
409 8         13 my $reduce_cb; $reduce_cb = sub {
410 24         38 my $reduce_cb = $reduce_cb;
411             $iterator->next(sub {
412 24 100       51 if (@_) {
413             {
414 3     3   23 no strict 'refs';
  3         9  
  3         2659  
  16         26  
415 16         27 local *{ $pkg . '::a' } = \$prev;
  16         58  
416 16         33 local *{ $pkg . '::b' } = \$_[0];
  16         44  
417 16         33 $prev = $code->();
418             }
419 16         63 $reduce_cb->();
420             } else {
421 8         33 $return_cb->($prev);
422             }
423 24         107 });
424 8         30 };$reduce_cb->();
  8         29  
425 8         61 weaken $reduce_cb;
426             } else {
427 0         0 $return_cb->();
428             }
429 8         42 });
430              
431 8         44 return $self;
432             }
433              
434             =head2 sum($returing_cb)
435              
436             The method computes sum of all items in stream.
437              
438             $stream->sum(
439             sub {
440             $sum = shift
441             #...
442             });
443             =cut
444             sub sum {
445 2     2 1 10 my $self = shift;
446 2         5 my $return_cb = shift;
447              
448 2 100       7 if (ref $return_cb ne 'CODE') {
449 1         69 croak 'First argument can be only subroutine reference'
450             }
451              
452 1     2   4 $self->reduce(sub{$a+$b}, $return_cb);
  2         5  
453              
454 1         3 return $self;
455             }
456              
457             =head2 min($returing_cb)
458              
459             The method finds out minimum item among all items in stream.
460              
461             $stream->min(
462             sub {
463             $sum = shift
464             #...
465             });
466             =cut
467             sub min {
468 3     3 1 41 my $self = shift;
469 3         8 my $return_cb = shift;
470              
471 3 100       12 if (ref $return_cb ne 'CODE') {
472 1         117 croak 'First argument can be only subroutine reference'
473             }
474              
475 2 100   4   15 $self->reduce(sub{$a < $b ? $a : $b}, $return_cb);
  4         17  
476              
477 2         7 return $self;
478             }
479              
480             =head2 max($returing_cb)
481              
482             The method finds out maximum item among all items in stream.
483              
484             $stream->max(
485             sub {
486             $sum = shift
487             #...
488             });
489             =cut
490             sub max {
491 3     3 1 25 my $self = shift;
492 3         5 my $return_cb = shift;
493              
494 3 100       9 if (ref $return_cb ne 'CODE') {
495 1         73 croak 'First argument can be only subroutine reference'
496             }
497              
498 2 100   4   20 $self->reduce(sub{$a > $b ? $a : $b}, $return_cb);
  4         25  
499              
500 2         4 return $self;
501             }
502              
503             =head2 append(@list_of_another_streams)
504              
505             The method appends several streams to tail of current stream.
506              
507             $stream->append($stream1)->to_arrayref(sub {print @{$_[0]}});
508             =cut
509             sub append {
510 2     2 1 12 my $self = shift;
511 2         7 my @streams = @_;
512              
513 2         7 for my $stream (@streams) {
514 2 100       15 if (!$stream->isa('Async::Stream')) {
515 1         77 croak 'Arguments can be only Async::Stream or instances of derived class'
516             }
517             }
518              
519 1         4 my $iterator = $self->iterator;
520              
521 1         4 my $generator; $generator = sub {
522 8     8   24 my $return_cb = shift;
523             $iterator->next(sub {
524 8 100       29 if (@_){
    100          
525 6         20 $return_cb->($_[0]);
526             } elsif (@streams) {
527 1         6 $iterator = (shift @streams)->iterator;
528 1         6 $generator->($return_cb);
529             } else {
530 1         4 $return_cb->();
531             }
532 8         54 });
533 1         6 };
534              
535 1         6 $self->_set_head($generator, prefetch => 0);
536              
537 1         9 return $self;
538             }
539              
540             =head2 count($returing_cb)
541              
542             The method counts number items in streams.
543              
544             $stream->count(sub {
545             $count = shift;
546             });
547             =cut
548             sub count {
549 2     2 1 12 my $self = shift;
550 2         4 my $return_cb = shift;
551              
552 2 100       9 if (ref $return_cb ne 'CODE') {
553 1         75 croak 'First argument can be only subroutine reference'
554             }
555              
556 1         3 my $result = 0;
557 1         3 my $iterator = $self->iterator;
558              
559 1         3 my $next_cb ; $next_cb = sub {
560 4     4   5 my $next_cb = $next_cb;
561             $iterator->next(sub {
562 4 100       10 if (@_) {
563 3         6 $result++;
564 3         7 return $next_cb->();
565             }
566 1         3 $return_cb->($result)
567 4         15 });
568 1         3 }; $next_cb->();
  1         4  
569 1         5 weaken $next_cb;
570              
571 1         3 return $self;
572             }
573              
574             =head2 skip($number)
575              
576             The method skips $number items in stream.
577              
578             $stream->skip(5)->to_arrayref(sub {print @{$_[0]}});
579             =cut
580             sub skip {
581 3     3 1 26 my $self = shift;
582 3         6 my $skip = int shift;
583              
584 3 100       10 if ($skip < 0) {
585 1         69 croak 'First argument can be only non-negative integer'
586             };
587              
588 2 100       7 if ($skip) {
589 1         3 my $iterator = $self->iterator;
590 1         2 my $generator; $generator = sub {
591 4     4   8 my $return_cb = shift;
592             $iterator->next(sub {
593 4 100       10 if (@_){
594 3 100       7 if ($skip-- > 0) {
595 1         4 $generator->($return_cb);
596             } else {
597 2         6 $return_cb->($_[0]);
598             }
599             } else {
600 1         3 $return_cb->();
601             }
602 4         15 });
603 1         3 };
604              
605 1         4 $self->_set_head($generator, prefetch => 0);
606              
607 1         5 return $self;
608             } else {
609 1         5 return $self;
610             }
611             }
612              
613             =head2 limit($number)
614              
615             The method limits $number items in stream.
616              
617             $stream->limit(5)->to_arrayref(sub {print @{$_[0]}});
618             =cut
619             sub limit {
620 4     4 1 26 my $self = shift;
621 4         8 my $limit = int shift;
622              
623 4 100       13 if ($limit < 0) {
624 1         72 croak 'First argument can be only non-negative integer'
625             }
626              
627 3         6 my $generator;
628 3 100       9 if ($limit) {
629 1         9 my $iterator = $self->iterator;
630              
631             $generator = sub {
632 2     2   4 my $return_cb = shift;
633 2 100       6 return $return_cb->() if ($limit-- <= 0);
634 1         4 $iterator->next($return_cb);
635             }
636 1         5 } else {
637             $generator = sub {
638 2     2   6 my $return_cb = shift;
639 2         7 $return_cb->();
640             }
641 2         9 }
642              
643 3         10 $self->_set_head($generator, prefetch => 0);
644              
645 3         20 return $self;
646             }
647              
648             =head2 arrange($comparator)
649              
650             The method sorts whole stream.
651              
652             $stream->arrange(sub{$a <=> $b})->to_arrayref(sub {print @{$_[0]}});
653             =cut
654             sub arrange {
655 2     2 1 10 my $self = shift;
656 2         4 my $comporator = shift;
657              
658 2 100       7 if (ref $comporator ne 'CODE') {
659 1         71 croak 'First argument can be only subroutine reference'
660             }
661              
662 1         3 my $pkg = caller;
663              
664 1         2 my $is_sorted = 0;
665 1         2 my @stream_items;
666 1         3 my $stream = $self;
667              
668 1         3 my $iterator = $self->iterator;
669              
670             my $generator = sub {
671 4     4   7 my $return_cb = shift;
672 4 100       8 if ($is_sorted) {
673 3 100       22 $return_cb->( @stream_items ? shift @stream_items : () );
674             } else {
675 1         2 my $next_cb; $next_cb = sub {
676 4         7 my $next_cb = $next_cb;
677             $iterator->next(sub {
678 4 100       9 if (@_) {
679 3         7 push @stream_items, $_[0];
680 3         8 $next_cb->();
681             } else {
682 1 50       4 if (@stream_items) {
683             {
684 3     3   23 no strict 'refs';
  3         6  
  3         854  
  1         2  
685 1         2 local *{ $pkg . '::a' } = *{ ref($self) . '::a' };
  1         5  
  1         4  
686 1         2 local *{ $pkg . '::b' } = *{ ref($self) . '::b' };
  1         3  
  1         3  
687 1         5 @stream_items = sort $comporator @stream_items;
688             }
689 1         8 $is_sorted = 1;
690 1         4 $return_cb->(shift @stream_items);
691             } else {
692 0         0 $return_cb->();
693             }
694             }
695 4         14 });
696 1         11 };$next_cb->();
  1         3  
697 1         7 weaken $next_cb;
698             }
699 1         4 };
700              
701 1         3 $self->_set_head($generator, prefetch => 0);
702              
703 1         5 return $self;
704             }
705              
706             =head2 cut_arrange($predicate, $comparator)
707              
708             Sometimes stream can be infinity and you can't you $stream->arrange,
709             you need certain parts of streams for example cut part by length of items.
710              
711             $stream
712             ->cut_arrange(sub {length $a != length $b},sub {$a <=> $b})
713             ->to_arrayref(sub {print @{$_[0]}});
714             =cut
715             sub cut_arrange {
716 5     5 1 882 my $self = shift;
717 5         11 my $cut = shift;
718 5         9 my $comporator = shift;
719              
720 5 100 100     32 if (ref $cut ne 'CODE' or ref $comporator ne 'CODE') {
721 3         220 croak 'First and Second arguments can be only subrotine references'
722             }
723              
724 2         8 my $pkg = caller;
725              
726 2         8 my $iterator = $self->iterator;
727              
728 2         13 my $prev;
729             my @cur_slice;
730 2         0 my @sorted_array;
731 2         0 my $generator; $generator = sub {
732 12     12   36 my $return_cb = shift;
733 12 100       34 if (@sorted_array) {
734 3         14 $return_cb->(shift @sorted_array);
735             } else {
736 9 100       30 if (!defined $prev) {
737             $iterator->next(sub {
738 2 100       9 if (@_){
739 1         3 $prev = $_[0];
740 1         3 @cur_slice = ($prev);
741 1         6 $generator->($return_cb);
742             } else {
743 1         5 $return_cb->();
744             }
745 2         16 });
746             } else {
747             $iterator->next(sub {
748 7 100       24 if (@_) {
749 5         11 my $is_cut;
750             {
751 3     3   19 no strict 'refs';
  3         6  
  3         211  
  5         15  
752 5         12 local ${ $pkg . '::a' } = $prev;
  5         25  
753 5         11 local ${ $pkg . '::b' } = $_[0];
  5         19  
754 5         17 $is_cut = $cut->();
755             }
756 5         40 $prev = $_[0];
757 5 100       17 if ($is_cut) {
758             {
759 3     3   18 no strict 'refs';
  3         7  
  3         271  
  2         5  
760 2         4 local *{ $pkg . '::a' } = *{ ref($self) . '::a' };
  2         11  
  2         10  
761 2         6 local *{ $pkg . '::b' } = *{ ref($self) . '::b' };
  2         9  
  2         9  
762 2         11 @sorted_array = sort $comporator @cur_slice;
763             }
764 2         25 @cur_slice = ($prev);
765 2         9 $return_cb->(shift @sorted_array);
766             } else {
767 3         7 push @cur_slice, $prev;
768 3         29 $generator->($return_cb);
769             }
770             } else {
771 2 100       8 if (@cur_slice) {
772             {
773 3     3   16 no strict 'refs';
  3         5  
  3         825  
  1         3  
774 1         3 local *{ $pkg . '::a' } = *{ ref($self) . '::a' };
  1         6  
  1         6  
775 1         3 local *{ $pkg . '::b' } = *{ ref($self) . '::b' };
  1         5  
  1         4  
776 1         6 @sorted_array = sort $comporator @cur_slice;
777             }
778 1         10 @cur_slice = ();
779 1         7 $return_cb->(shift @sorted_array);
780             } else {
781 1         5 $return_cb->();
782             }
783             }
784 7         71 });
785             }
786             }
787 2         12 };
788              
789 2         10 $self->_set_head($generator, prefetch => 0);
790              
791 2         17 return $self;
792             }
793              
794             =head2 merge {comparator} $stream1, $stream2;
795              
796             Merge two or more stream by comparing each item of stream and return new stream.
797              
798             my $ordered_stream = merge {$a <=> $b} $stream1, $stream2;
799             =cut
800             sub merge (&@) {
801 3     3 1 452 my $comporator = shift;
802              
803 3 100       11 if (ref $comporator ne 'CODE') {
804 1         122 croak 'First argument can be only reference to subroutine';
805             }
806              
807 2         4 my $pkg = caller;
808              
809 2         4 my @iterators;
810 2         5 for my $stream (@_) {
811 3 100       15 if ($stream->isa('Async::Stream')) {
812 2         4 push @iterators, [$stream->iterator];
813             } else {
814 1         73 croak 'Arguments can be only Async::Stream or instances of derived class'
815             }
816             }
817              
818             my $generator = sub {
819 7     7   12 my $return_cb = shift;
820 7         12 my $requested_item = grep { @{$_} == 1 } @iterators;
  13         21  
  13         28  
821 7         32 for (my $i = 0; $i < @iterators; $i++) {
822 8 50       12 if (@{$iterators[$i]} == 1) {
  8         22  
823 8         14 my $iterator_id = $i;
824             $iterators[$iterator_id][0]->next(sub {
825 8         12 $requested_item--;
826 8 100       16 if (@_) {
827 6         10 my $item = shift;
828 6         10 push @{$iterators[$iterator_id]}, $item;
  6         13  
829             } else {
830 2         4 $iterators[$iterator_id] = undef;
831             }
832              
833 8 100       20 if ($requested_item == 0) {
834             ### it's awful and need to optimize ###
835             {
836 3     3   22 no strict 'refs';
  3         7  
  3         2636  
  7         11  
837             my $comp = sub {
838 5         9 local ${ $pkg . '::a' } = $a->[1];
  5         15  
839 5         6 local ${ $pkg . '::b' } = $b->[1];
  5         10  
840 5         12 return $comporator->();
841 7         29 };
842 7         15 @iterators = sort $comp grep { defined $_ } @iterators;
  13         38  
843             }
844             ### ###
845 7 100       40 if (@iterators) {
846 6         12 my $item = pop @{$iterators[0]};
  6         16  
847 6         17 $return_cb->($item);
848             } else {
849 1         6 $return_cb->();
850             }
851             }
852 8         44 });
853             }
854             }
855 1         5 };
856              
857 1         3 return Async::Stream->new($generator);
858             }
859              
860             =head2 branch {predicat} $stream;
861              
862             Split stream into 2 stream are divided by predicat. Branch returns 2 streams.
863             First stream will contain "true" items, Second - "false" items;
864              
865             my ($success_stream, $error_stream)
866             = branch {$_->{headers}{status} == 200} $stream;
867             =cut
868             sub branch (&$) {
869 2     2 1 15 my $predicat = shift;
870 2         4 my $source_stream = shift;
871            
872 2         5 my @truth_items;
873             my @false_items;
874              
875 2         12 my $iterator = $source_stream->iterator;
876              
877 1         2 my $generator; $generator = sub {
878 11     11   16 my $return_cb = shift;
879 11         16 my $is_truth_branch = shift;
880              
881 11 50 66     61 if ($is_truth_branch && @truth_items) {
    100 100        
882 0         0 return $return_cb->(shift @truth_items);
883             } elsif (!$is_truth_branch && @false_items) {
884 3         10 return $return_cb->(shift @false_items);
885             }
886              
887             $iterator->next(sub {
888 8 100       18 if (@_) {
889 6         12 my $item = shift;
890 6         9 my $is_truth;
891            
892             {
893 6         9 local $_ = $item;
  6         11  
894 6         12 $is_truth = $predicat->();
895             }
896              
897 6 100 66     44 if ($is_truth_branch && !$is_truth) {
    50 33        
898 3         5 push @false_items, $item;
899 3         10 return $generator->($return_cb,$is_truth_branch);
900             } elsif (!$is_truth_branch && $is_truth) {
901 0         0 push @truth_items, $item;
902 0         0 return $generator->($return_cb,$is_truth_branch);
903             } else {
904 3         8 return $return_cb->($item);
905             }
906             } else {
907 2         6 $return_cb->();
908             }
909 8         42 });
910 1         5 };
911              
912              
913 1     4   4 my $truth_branch = Async::Stream->new(sub { $generator->($_[0], 1) });
  4         13  
914 1     4   5 my $false_branch = Async::Stream->new(sub { $generator->($_[0], 0) });
  4         12  
915              
916 1         5 return $truth_branch, $false_branch
917             }
918              
919             =head2 any($predicat, $return_cb)
920              
921             Method look for any equivalent item in steam. if there is any then return that.
922             if there isn't then return nothing.
923              
924             $stream->any(sub {$_ % 2})->to_arrayref(sub {print @{$_[0]}});
925             =cut
926             sub any {
927 2     2 1 17 my $self = shift;
928 2         4 my $predicat = shift;
929 2         5 my $return_cb = shift;
930              
931 2         5 my $iterator = $self->iterator;
932              
933 2         3 my $next_cb; $next_cb = sub {
934 6     6   8 my $next_cb = $next_cb;
935             $iterator->next(sub {
936 6 100       12 if (@_) {
937 5         11 local *{_} = \$_[0];
938 5 100       12 if ($predicat->()) {
939 1         5 $return_cb->($_[0]);
940             } else {
941 4         21 $next_cb->();
942             }
943             } else {
944 1         5 $return_cb->()
945             }
946 6         22 });
947 2         7 }; $next_cb->();
  2         6  
948 2         10 weaken $next_cb;
949              
950 2         16 return $self;
951             }
952              
953             =head2 distinct($key_generator)
954              
955             Method discards duplicate items from stream.
956             By default uniqueness of items will be determined by textual representation of item.
957              
958             $stream->distinct(sub {$_->{name}})->to_arrayref(sub {print @{$_[0]}});
959             =cut
960             sub distinct {
961 3     3 1 41 my $self = shift;
962 3         7 my $to_key = shift;
963              
964 3 100       14 if (ref $to_key ne 'CODE') {
965 1     7   6 $to_key = sub { "$_" };
  7         30  
966             }
967              
968 3         10 my $iterator = $self->iterator;
969              
970 3         10 my %index_of;
971              
972             my $generator; $generator = sub {
973 24     24   62 my $return_cb = shift;
974             $iterator->next(sub {
975 24 100       65 if (@_) {
976 21         53 my $key;
977             {
978 21         43 local *{_} = \$_[0];
  21         60  
979 21         64 $key = $to_key->()
980             }
981              
982 21 100       107 if (exists $index_of{$key}) {
983 14         67 $generator->($return_cb);
984             } else {
985 7         20 $index_of{$key} = undef;
986 7         26 $return_cb->($_[0]);
987             }
988             } else {
989 3         11 $return_cb->();
990             }
991 24         159 });
992              
993 3         14 };
994              
995 3         12 $self->_set_head($generator);
996              
997 3         23 return $self;
998             }
999              
1000              
1001             sub _set_head {
1002 57     57   100 my $self = shift;
1003 57         94 my $generator = shift;
1004 57         127 my %args = @_;
1005              
1006 57   100     228 my $prefetch = $args{prefetch} // $self->{_prefetch};
1007              
1008 57 100       126 if ($prefetch) {
1009 2         9 my $new_generator = _get_prefetch_generator($generator, $self->{_prefetch});
1010 2         7 $self->{_head} = Async::Stream::Item->new(undef, $new_generator);
1011             } else {
1012 55         199 $self->{_head} = Async::Stream::Item->new(undef, $generator);
1013             }
1014              
1015 57         130 return $self;
1016             }
1017              
1018             sub _get_prefetch_generator {
1019 2     2   7 my ($generator,$prefetch) = @_;
1020              
1021 2         5 my @responses_cache;
1022             my @requests_queue;
1023 2         4 my $is_exhausted = 0;
1024 2         3 my $item_requested = 0;
1025              
1026             return sub {
1027 10     10   19 my $return_cb = shift;
1028              
1029 10 50       24 if (@responses_cache) {
1030 0         0 $return_cb->(shift @responses_cache);
1031             } else {
1032 10         25 push @requests_queue, $return_cb;
1033             }
1034              
1035 10 100 66     37 if (!$is_exhausted) {
    100          
1036 6         20 for (0 .. ($prefetch - $item_requested)) {
1037 18         45 $item_requested++;
1038             $generator->(sub {
1039 18         91 $item_requested--;
1040 18 100       36 if (@_) {
1041 8 50       16 if (@requests_queue) {
1042 8         21 shift(@requests_queue)->($_[0]);
1043             } else {
1044 0         0 push @responses_cache, $_[0];
1045             }
1046             } else {
1047 10         14 $is_exhausted = 1;
1048 10 50 66     58 if (!$item_requested && @requests_queue) {
1049 0         0 shift(@requests_queue)->();
1050             }
1051             }
1052 18         79 });
1053             }
1054             } elsif (!$item_requested && @requests_queue) {
1055 2         8 shift(@requests_queue)->();
1056             }
1057 2         12 };
1058             }
1059              
1060             =head1 AUTHOR
1061              
1062             Kirill Sysoev, C<< >>
1063              
1064             =head1 BUGS AND LIMITATIONS
1065              
1066             Please report any bugs or feature requests to
1067             L.
1068              
1069             =head1 SUPPORT
1070              
1071             You can find documentation for this module with the perldoc command.
1072              
1073             perldoc Async::Stream::Item
1074              
1075              
1076             =head1 LICENSE AND COPYRIGHT
1077              
1078             Copyright 2017 Kirill Sysoev.
1079              
1080             This program is free software; you can redistribute it and/or modify it
1081             under the terms of the the Artistic License (2.0). You may obtain a
1082             copy of the full license at:
1083              
1084             L
1085             =cut
1086              
1087             1; # End of Async::Stream