File Coverage

blib/lib/Async/Stream.pm
Criterion Covered Total %
statement 504 514 98.0
branch 146 156 93.5
condition 29 38 76.3
subroutine 65 65 100.0
pod 24 24 100.0
total 768 797 96.3


line stmt bran cond sub pod time code
1             package Async::Stream;
2              
3 3     3   46559 use 5.010;
  3         14  
4 3     3   18 use strict;
  3         6  
  3         89  
5 3     3   17 use warnings;
  3         10  
  3         99  
6              
7 3     3   951 use Async::Stream::Item;
  3         23  
  3         93  
8 3     3   980 use Async::Stream::Iterator;
  3         8  
  3         76  
9 3     3   16 use Scalar::Util qw(weaken);
  3         5  
  3         162  
10 3     3   16 use Carp;
  3         5  
  3         110  
11 3     3   13 use Exporter 'import';
  3         6  
  3         3726  
12              
13             our @EXPORT_OK = qw(merge branch);
14              
15             =head1 NAME
16              
17             Async::Stream - it's convenient way to work with async data flow.
18              
19             IMPORTANT! PUBLIC INTERFACE ISN'T STABLE, DO NOT USE IN PRODACTION BEFORE VERSION 1.0.
20              
21             =head1 VERSION
22              
23             Version 0.08
24              
25             =cut
26              
27             our $VERSION = '0.08';
28              
29             =head1 SYNOPSIS
30              
31             Module helps to organize your async code to stream.
32              
33             use Async::Stream;
34              
35             my @urls = qw(
36             http://ucoz.com
37             http://ya.ru
38             http://google.com
39             );
40              
41             my $stream = Async::Stream->new_from(@urls);
42              
43             $stream
44             ->transform(sub {
45             $return_cb = shift;
46             http_get $_, sub {
47             $return_cb->({headers => $_[0], body => $_[0]})
48             };
49             }, 'async')
50             ->filter(sub { $_->{headers}->{Status} =~ /^2/ })
51             ->for_each(sub {
52             my $item = shift;
53             print $item->{body};
54             });
55              
56             =head1 SUBROUTINES/METHODS
57              
58             =head2 new($generator)
59              
60             Constructor creates instance of class.
61             Class method gets 1 arguments - generator subroutine references to generate items.
62             Generator will get a callback which it will use for returning result.
63             If generator is exhausted then returning callback is called without arguments.
64              
65             my $i = 0;
66             my $stream = Async::Stream->new(sub {
67             $return_cb = shift;
68             if ($i < 10) {
69             $return_cb->($i++);
70             } else {
71             $return_cb->();
72             }
73             });
74             =cut
75             sub new {
76 41     41 1 8220 my $class = shift;
77 41         70 my $generator = shift;
78 41         99 my %args = @_;
79              
80 41 100 66     230 if (ref $generator ne 'CODE') {
    50          
81 1         130 croak 'First argument can be only subroutine reference';
82             } elsif (defined $args{prefetch} and $args{prefetch} < 0) {
83 0         0 croak 'Prefetch can\'t be less then zero';
84             }
85              
86             my $self = bless {
87             _head => undef,
88 40   100     264 _prefetch => int($args{prefetch} // 0),
89             }, $class;
90              
91 40         140 $self->_set_head($generator);
92              
93 40         121 return $self;
94             }
95              
96             =head2 new_from(@array_of_items)
97              
98             Constructor creates instance of class.
99             Class method gets a list of items which are used for generating streams.
100            
101             my @domains = qw(
102             ucoz.com
103             ya.ru
104             googl.com
105             );
106            
107             my $stream = Async::Stream->new_from(@urls)
108              
109             =cut
110             sub new_from {
111 33     33 1 31299 my $class = shift;
112 33         78 my $items = \@_;
113              
114 134 100   134   215 return $class->new(sub { $_[0]->(@{$items} ? (shift @{$items}) : ()) })
  134         328  
  106         280  
115 33         176 }
116              
117             =head2 head()
118              
119             Method returns stream's head item.
120             Head is a instance of class Async::Stream::Item.
121              
122             my $stream_head = $stream->head;
123             =cut
124             sub head {
125 58     58 1 140 return $_[0]->{_head};
126             }
127              
128             =head2 set_prefetch($number)
129              
130             Method returns stream's head item.
131             Head is a instance of class Async::Stream::Item.
132              
133             $stream->set_prefetch(4);
134             =cut
135             sub set_prefetch {
136 1     1 1 19 my $self = shift;
137 1   50     5 my $prefetch = shift // 0;
138              
139 1 50       5 if ($prefetch < 0) {
140 0         0 croak 'Prefetch can\'t be less then zero';
141             }
142              
143 1         4 $self->{_prefetch} = $prefetch;
144              
145 1         12 return $self;
146             }
147              
148              
149             =head2 iterator()
150              
151             Method returns stream's iterator.
152             Iterator is a instance of class Async::Stream::Iterator.
153              
154             my $stream_iterator = $stream->iterator;
155             =cut
156              
157             sub iterator {
158 56     56 1 188 return Async::Stream::Iterator->new($_[0]);
159             }
160              
161             =head1 CONVEYOR METHODS
162              
163             =head2 peek($action)
164              
165             This method helps to debug streams data flow.
166             You can use this method for printing or logging steam data and track data
167             mutation between stream's transformations.
168              
169             $stream->peek(sub { print $_, "\n" })->to_arrayref(sub {print @{$_[0]}});
170             =cut
171             sub peek {
172 2     2 1 13 my $self = shift;
173 2         6 my $action = shift;
174              
175 2 100       9 if (ref $action ne 'CODE') {
176 1         77 croak 'First argument can be only subroutine reference'
177             }
178              
179 1         4 my $iterator = $self->iterator;
180             my $generator = sub {
181 4     4   9 my $return_cb = shift;
182             $iterator->next(sub {
183 4 100       12 if (@_) {
184 3         10 local *{_} = \$_[0];
185 3         11 $action->();
186 3         1437 $return_cb->($_[0]);
187             } else {
188 1         3 $return_cb->()
189             }
190 4         17 });
191 1         6 };
192              
193 1         6 $self->_set_head($generator, prefetch => 0);
194              
195 1         9 return $self;
196             }
197              
198             =head2 filter($predicate)
199              
200             The method filters current stream. Filter works like lazy grep.
201              
202             $stream->filter(sub {$_ % 2})->to_arrayref(sub {print @{$_[0]}});
203              
204             =cut
205             sub filter {
206 3     3 1 19 my $self = shift;
207 3         5 my $predicate = shift;
208 3   100     15 my $async_opt = shift // 'sync';
209              
210 3         5 my $is_async = 0;
211 3 100       9 if ($async_opt eq 'async') {
212 1         2 $is_async = 1;
213             }
214              
215 3 100       9 if (ref $predicate ne 'CODE') {
216 1         78 croak 'First argument can be only subroutine reference'
217             }
218              
219 2         5 my $iterator = $self->iterator;
220              
221 2         3 my $next_cb; $next_cb = sub {
222 8     8   17 my $return_cb = shift;
223             $iterator->next(sub {
224 8 100       18 if (@_) {
225 6         8 my $item = shift;
226 6         13 local *{_} = \$item;
227 6 100       14 if ($is_async) {
228             $predicate->(sub {
229 3         12 my $is_intresting = shift;
230 3 100       9 if ($is_intresting) {
231 2         6 $return_cb->($item);
232             } else {
233 1         3 $next_cb->($return_cb);
234             }
235 3         20 });
236             } else {
237 3 100       6 if ($predicate->()) {
238 2         11 $return_cb->($item);
239             } else {
240 1         5 $next_cb->($return_cb);
241             }
242             }
243             } else {
244 2         6 $return_cb->()
245             }
246 8         28 });
247 2         7 };
248              
249 2 100       8 $self->_set_head($next_cb, $is_async ? () : ( prefetch => 0 ),);
250              
251 2         10 return $self;
252             }
253              
254             =head2 transform($transformer)
255              
256             Method transform current stream.
257             Transform works like lazy map with async response.
258             You can use the method for example for async http request or another async
259             operation.
260              
261             $stream->transform(sub {
262             $return_cb = shift;
263             $return_cb->($_ * 2)
264             })->to_arrayref(sub {print @{$_[0]}});
265              
266             =cut
267             sub transform {
268 4     4 1 32 my $self = shift;
269 4         9 my $transform = shift;
270 4   100     22 my $async_opt = shift // 'sync';
271              
272 4         10 my $is_async = 0;
273 4 100       16 if ($async_opt eq 'async') {
274 2         5 $is_async = 1;
275             }
276              
277 4 100       14 if (ref $transform ne 'CODE') {
278 1         71 croak 'First argument can be only subroutine reference'
279             }
280              
281 3         11 my $iterator = $self->iterator;
282              
283 3         7 my $next_cb; $next_cb = sub {
284 14     14   31 my $return_cb = shift;
285             $iterator->next(sub {
286 14 100       34 if (@_) {
287 10         22 local *{_} = \$_[0];
288 10 100       21 if ($is_async) {
289 7         19 $transform->($return_cb);
290             } else {
291 3         12 $return_cb->($transform->());
292             }
293             } else {
294 4         13 $return_cb->()
295             }
296 14         69 });
297 3         12 };
298              
299 3 100       15 $self->_set_head($next_cb, $is_async ? () : ( prefetch => 0 ),);
300              
301 3         18 return $self;
302             }
303              
304             =head2 append(@list_streams)
305              
306             The method appends several streams to tail of current stream.
307              
308             $stream->append($stream1)->to_arrayref(sub {print @{$_[0]}});
309             =cut
310             sub append {
311 2     2 1 8 my $self = shift;
312 2         7 my @streams = @_;
313              
314 2         5 for my $stream (@streams) {
315 2 100       16 if (!$stream->isa('Async::Stream')) {
316 1         113 croak 'Arguments can be only Async::Stream or instances of derived class'
317             }
318             }
319              
320 1         2 my $iterator = $self->iterator;
321              
322 1         2 my $generator; $generator = sub {
323 8     8   13 my $return_cb = shift;
324             $iterator->next(sub {
325 8 100       18 if (@_){
    100          
326 6         14 $return_cb->($_[0]);
327             } elsif (@streams) {
328 1         3 $iterator = (shift @streams)->iterator;
329 1         4 $generator->($return_cb);
330             } else {
331 1         4 $return_cb->();
332             }
333 8         34 });
334 1         3 };
335              
336 1         4 $self->_set_head($generator, prefetch => 0);
337              
338 1         6 return $self;
339             }
340              
341             =head2 skip($number)
342              
343             The method skips $number items in stream.
344              
345             $stream->skip(5)->to_arrayref(sub {print @{$_[0]}});
346             =cut
347             sub skip {
348 3     3 1 33 my $self = shift;
349 3         6 my $skip = int shift;
350              
351 3 100       10 if ($skip < 0) {
352 1         174 croak 'First argument can be only non-negative integer'
353             };
354              
355 2 100       6 if ($skip) {
356 1         3 my $iterator = $self->iterator;
357 1         2 my $generator; $generator = sub {
358 4     4   6 my $return_cb = shift;
359             $iterator->next(sub {
360 4 100       9 if (@_){
361 3 100       8 if ($skip-- > 0) {
362 1         4 $generator->($return_cb);
363             } else {
364 2         5 $return_cb->($_[0]);
365             }
366             } else {
367 1         3 $return_cb->();
368             }
369 4         14 });
370 1         4 };
371              
372 1         3 $self->_set_head($generator, prefetch => 0);
373              
374 1         5 return $self;
375             } else {
376 1         6 return $self;
377             }
378             }
379              
380             =head2 limit($number)
381              
382             The method limits $number items in stream.
383              
384             $stream->limit(5)->to_arrayref(sub {print @{$_[0]}});
385             =cut
386             sub limit {
387 4     4 1 24 my $self = shift;
388 4         6 my $limit = int shift;
389              
390 4 100       12 if ($limit < 0) {
391 1         69 croak 'First argument can be only non-negative integer'
392             }
393              
394 3         5 my $generator;
395 3 100       7 if ($limit) {
396 1         8 my $iterator = $self->iterator;
397              
398             $generator = sub {
399 2     2   3 my $return_cb = shift;
400 2 100       7 return $return_cb->() if ($limit-- <= 0);
401 1         3 $iterator->next($return_cb);
402             }
403 1         4 } else {
404             $generator = sub {
405 2     2   4 my $return_cb = shift;
406 2         5 $return_cb->();
407             }
408 2         6 }
409              
410 3         9 $self->_set_head($generator, prefetch => 0);
411              
412 3         14 return $self;
413             }
414              
415             =head2 sorted($comparator)
416              
417             The method sorts whole stream.
418              
419             $stream->sorted(sub{$a <=> $b})->to_arrayref(sub {print @{$_[0]}});
420             =cut
421             sub sorted {
422 2     2 1 14 my $self = shift;
423 2         5 my $comporator = shift;
424              
425 2 100       11 if (ref $comporator ne 'CODE') {
426 1         82 croak 'First argument can be only subroutine reference'
427             }
428              
429 1         4 my $pkg = caller;
430              
431 1         4 my $is_sorted = 0;
432 1         2 my @stream_items;
433 1         2 my $stream = $self;
434              
435 1         5 my $iterator = $self->iterator;
436              
437             my $generator = sub {
438 4     4   9 my $return_cb = shift;
439 4 100       9 if ($is_sorted) {
440 3 100       13 $return_cb->( @stream_items ? shift @stream_items : () );
441             } else {
442 1         2 my $next_cb; $next_cb = sub {
443 4         5 my $next_cb = $next_cb;
444             $iterator->next(sub {
445 4 100       8 if (@_) {
446 3         6 push @stream_items, $_[0];
447 3         10 $next_cb->();
448             } else {
449 1 50       3 if (@stream_items) {
450             {
451 3     3   22 no strict 'refs';
  3         6  
  3         964  
  1         2  
452 1         1 local *{ $pkg . '::a' } = *{ ref($self) . '::a' };
  1         4  
  1         5  
453 1         2 local *{ $pkg . '::b' } = *{ ref($self) . '::b' };
  1         3  
  1         7  
454 1         5 @stream_items = sort $comporator @stream_items;
455             }
456 1         9 $is_sorted = 1;
457 1         3 $return_cb->(shift @stream_items);
458             } else {
459 0         0 $return_cb->();
460             }
461             }
462 4         17 });
463 1         3 };$next_cb->();
  1         3  
464 1         6 weaken $next_cb;
465             }
466 1         6 };
467              
468 1         6 $self->_set_head($generator, prefetch => 0);
469              
470 1         6 return $self;
471             }
472              
473             =head2 cut_sorted($predicate, $comparator)
474              
475             Sometimes stream can be infinity and you can't you $stream->sorted,
476             you need certain parts of streams for example cut part by length of items.
477              
478             $stream
479             ->cut_sorted(sub {length $a != length $b},sub {$a <=> $b})
480             ->to_arrayref(sub {print @{$_[0]}});
481             =cut
482             sub cut_sorted {
483 5     5 1 759 my $self = shift;
484 5         6 my $cut = shift;
485 5         10 my $comporator = shift;
486              
487 5 100 100     24 if (ref $cut ne 'CODE' or ref $comporator ne 'CODE') {
488 3         235 croak 'First and Second arguments can be only subrotine references'
489             }
490              
491 2         6 my $pkg = caller;
492              
493 2         6 my $iterator = $self->iterator;
494              
495 2         12 my $prev;
496             my @cur_slice;
497 2         0 my @sorted_array;
498 2         0 my $generator; $generator = sub {
499 12     12   24 my $return_cb = shift;
500 12 100       27 if (@sorted_array) {
501 3         8 $return_cb->(shift @sorted_array);
502             } else {
503 9 100       21 if (!defined $prev) {
504             $iterator->next(sub {
505 2 100       7 if (@_){
506 1         2 $prev = $_[0];
507 1         2 @cur_slice = ($prev);
508 1         4 $generator->($return_cb);
509             } else {
510 1         3 $return_cb->();
511             }
512 2         11 });
513             } else {
514             $iterator->next(sub {
515 7 100       13 if (@_) {
516 5         10 my $is_cut;
517             {
518 3     3   19 no strict 'refs';
  3         6  
  3         206  
  5         7  
519 5         8 local ${ $pkg . '::a' } = $prev;
  5         14  
520 5         10 local ${ $pkg . '::b' } = $_[0];
  5         11  
521 5         11 $is_cut = $cut->();
522             }
523 5         25 $prev = $_[0];
524 5 100       9 if ($is_cut) {
525             {
526 3     3   18 no strict 'refs';
  3         6  
  3         256  
  2         4  
527 2         3 local *{ $pkg . '::a' } = *{ ref($self) . '::a' };
  2         6  
  2         6  
528 2         3 local *{ $pkg . '::b' } = *{ ref($self) . '::b' };
  2         4  
  2         6  
529 2         6 @sorted_array = sort $comporator @cur_slice;
530             }
531 2         10 @cur_slice = ($prev);
532 2         5 $return_cb->(shift @sorted_array);
533             } else {
534 3         6 push @cur_slice, $prev;
535 3         15 $generator->($return_cb);
536             }
537             } else {
538 2 100       6 if (@cur_slice) {
539             {
540 3     3   15 no strict 'refs';
  3         5  
  3         861  
  1         2  
541 1         2 local *{ $pkg . '::a' } = *{ ref($self) . '::a' };
  1         3  
  1         8  
542 1         2 local *{ $pkg . '::b' } = *{ ref($self) . '::b' };
  1         2  
  1         3  
543 1         4 @sorted_array = sort $comporator @cur_slice;
544             }
545 1         5 @cur_slice = ();
546 1         6 $return_cb->(shift @sorted_array);
547             } else {
548 1         6 $return_cb->();
549             }
550             }
551 7         61 });
552             }
553             }
554 2         10 };
555              
556 2         9 $self->_set_head($generator, prefetch => 0);
557              
558 2         10 return $self;
559             }
560              
561             =head2 merge_in($comparator, @list_streams);
562              
563             Merge additional streams into current stream by comparing each item of stream.
564              
565             $stream->merge_in(sub{$a <=> $b}, $stream1, $stream2);
566             =cut
567             sub merge_in {
568 3     3 1 434 my $self = shift;
569 3         5 my $comporator = shift;
570              
571 3 100       10 if (ref $comporator ne 'CODE') {
572 1         104 croak 'First argument can be only reference to subroutine';
573             }
574              
575 2         5 my $pkg = caller;
576              
577 2         4 my @iterators;
578 2         5 for my $stream ($self, @_) {
579 4 100       19 if ($stream->isa('Async::Stream')) {
580 3         8 push @iterators, [$stream->iterator];
581             } else {
582 1         63 croak 'Arguments can be only Async::Stream or instances of derived class'
583             }
584             }
585              
586             my $generator = sub {
587 7     7   10 my $return_cb = shift;
588 7         16 my $requested_item = grep { @{$_} == 1 } @iterators;
  13         17  
  13         31  
589 7         21 for (my $i = 0; $i < @iterators; $i++) {
590 8 50       13 if (@{$iterators[$i]} == 1) {
  8         20  
591 8         11 my $iterator_id = $i;
592             $iterators[$iterator_id][0]->next(sub {
593 8         13 $requested_item--;
594 8 100       19 if (@_) {
595 6         9 my $item = shift;
596 6         8 push @{$iterators[$iterator_id]}, $item;
  6         13  
597             } else {
598 2         6 $iterators[$iterator_id] = undef;
599             }
600              
601 8 100       18 if ($requested_item == 0) {
602             ### it's awful and need to optimize ###
603             {
604 3     3   22 no strict 'refs';
  3         7  
  3         2496  
  7         10  
605             my $comp = sub {
606 5         10 local ${ $pkg . '::a' } = $a->[1];
  5         16  
607 5         8 local ${ $pkg . '::b' } = $b->[1];
  5         9  
608 5         15 return $comporator->();
609 7         26 };
610 7         13 @iterators = sort $comp grep { defined $_ } @iterators;
  13         45  
611             }
612             ### ###
613 7 100       44 if (@iterators) {
614 6         15 my $item = pop @{$iterators[0]};
  6         15  
615 6         16 $return_cb->($item);
616             } else {
617 1         3 $return_cb->();
618             }
619             }
620 8         37 });
621             }
622             }
623 1         5 };
624              
625 1         5 $self->_set_head($generator);
626              
627 1         3 return $self;
628             }
629              
630             =head2 $branch_stream branch_out($predicat);
631              
632             Split stream into 2 stream are divided by predicat. Branch returns 2 streams.
633             First stream will contain "true" items, Second - "false" items;
634              
635             my $error_stream = $stream->branch_out(sub{$_->{headers}{status} != 200});
636             =cut
637             sub branch_out {
638 2     2 1 16 my $self = shift;
639 2         5 my $predicat = shift;
640              
641 2 100       9 if (ref $predicat ne 'CODE') {
642 1         75 croak 'First argument can be only subroutine reference'
643             }
644              
645 1         3 my @branch_items;
646             my @self_items;
647              
648 1         5 my $iterator = $self->iterator;
649              
650 1         4 my $generator; $generator = sub {
651 11     11   22 my $return_cb = shift;
652 11         17 my $is_for_branch = shift;
653              
654 11 50 66     75 if ($is_for_branch && @branch_items) {
    100 100        
655 0         0 return $return_cb->(shift @branch_items);
656             } elsif (!$is_for_branch && @self_items) {
657 3         11 return $return_cb->(shift @self_items);
658             }
659              
660             $iterator->next(sub {
661 8 100       18 if (@_) {
662 6         12 my $item = shift;
663 6         10 my $is_branch_item;
664            
665             {
666 6         12 local *{_} = \$item;
  6         15  
667 6         18 $is_branch_item = $predicat->();
668             }
669              
670 6 100 66     50 if ($is_for_branch && !$is_branch_item) {
    50 33        
671 3         7 push @self_items, $item;
672 3         12 return $generator->($return_cb,$is_for_branch);
673             } elsif (!$is_for_branch && $is_branch_item) {
674 0         0 push @branch_items, $item;
675 0         0 return $generator->($return_cb,$is_for_branch);
676             } else {
677 3         11 return $return_cb->($item);
678             }
679             } else {
680 2         6 $return_cb->();
681             }
682 8         44 });
683 1         7 };
684              
685 1     4   8 $self->_set_head(sub { $generator->($_[0], 0) });
  4         12  
686 1     4   8 return Async::Stream->new(sub { $generator->($_[0], 1) });
  4         15  
687             }
688              
689             =head2 distinct($key_generator)
690              
691             Method discards duplicate items from stream.
692             By default uniqueness of items will be determined by textual representation of item.
693              
694             $stream->distinct(sub {$_->{name}})->to_arrayref(sub {print @{$_[0]}});
695             =cut
696             sub distinct {
697 3     3 1 27 my $self = shift;
698 3         5 my $to_key = shift;
699              
700 3 100       9 if (ref $to_key ne 'CODE') {
701 1     7   4 $to_key = sub { "$_" };
  7         18  
702             }
703              
704 3         7 my $iterator = $self->iterator;
705              
706 3         5 my %index_of;
707              
708             my $generator; $generator = sub {
709 24     24   36 my $return_cb = shift;
710             $iterator->next(sub {
711 24 100       47 if (@_) {
712 21         29 my $key;
713             {
714 21         28 local *{_} = \$_[0];
  21         39  
715 21         46 $key = $to_key->()
716             }
717              
718 21 100       69 if (exists $index_of{$key}) {
719 14         38 $generator->($return_cb);
720             } else {
721 7         16 $index_of{$key} = undef;
722 7         19 $return_cb->($_[0]);
723             }
724             } else {
725 3         7 $return_cb->();
726             }
727 24         94 });
728              
729 3         9 };
730              
731 3         8 $self->_set_head($generator);
732              
733 3         14 return $self;
734             }
735              
736             =head1 TERMINAL METHODS
737              
738             =head2 to_arrayref($returing_cb)
739              
740             Method returns stream's iterator.
741              
742             $stream->to_arrayref(sub {
743             $array_ref = shift;
744              
745             #...
746             });
747             =cut
748             sub to_arrayref {
749 24     24 1 67 my $self = shift;
750 24         44 my $return_cb = shift;
751              
752 24 100       62 if (ref $return_cb ne 'CODE') {
753 1         113 croak 'First argument can be only subroutine reference'
754             }
755              
756 23         43 my @result;
757              
758 23         43 my $iterator = $self->iterator;
759              
760 23         45 my $next_cb; $next_cb = sub {
761 94     94   148 my $next_cb = $next_cb;
762             $iterator->next(sub {
763 94 100       191 if (@_) {
764 71         142 push @result, $_[0];
765 71         206 $next_cb->();
766             } else {
767 23         58 $return_cb->(\@result);
768             }
769 94         394 });
770 23         74 };$next_cb->();
  23         55  
771 23         91 weaken $next_cb;
772              
773 23         86 return $self;
774             }
775              
776             =head2 for_each($action)
777              
778             Method execute action on each item in stream.
779              
780             $stream->to_arrayref(sub {
781             $array_ref = shift;
782            
783             #...
784             });
785             =cut
786             sub for_each {
787 2     2 1 18 my $self = shift;
788 2         40 my $action = shift;
789              
790 2 100       13 if (ref $action ne 'CODE') {
791 1         86 croak 'First argument can be only subroutine reference'
792             }
793              
794 1         5 my $iterator = $self->iterator;
795              
796 1         4 my $each; $each = sub {
797 4     4   9 my $each = $each;
798             $iterator->next(sub {
799 4 100       14 if (@_) {
800 3         10 $action->($_[0]);
801 3         1714 $each->()
802             }
803 4         22 });
804 1         6 }; $each->();
  1         5  
805 1         7 weaken $each;
806              
807 1         3 return $self;
808             }
809              
810             =head2 reduce($accumulator, $returing_cb)
811              
812             Performs a reduction on the items of the stream.
813              
814             $stream->reduce(
815             sub{ $a + $b },
816             sub {
817             $sum = shift
818             #...
819             });
820              
821             =cut
822             sub reduce {
823 9     9 1 35 my $self = shift;
824 9         18 my $code = shift;
825 9         14 my $return_cb = shift;
826              
827 9 100 66     47 if (ref $return_cb ne 'CODE' or ref $code ne 'CODE') {
828 1         77 croak 'First and Second arguments can be only subroutine references'
829             }
830              
831 8         18 my $pkg = caller;
832              
833 8         21 my $iterator = $self->iterator;
834              
835             $iterator->next(sub {
836 8 50   8   20 if (@_) {
837 8         16 my $prev = $_[0];
838            
839 8         12 my $reduce_cb; $reduce_cb = sub {
840 24         41 my $reduce_cb = $reduce_cb;
841             $iterator->next(sub {
842 24 100       51 if (@_) {
843             {
844 3     3   22 no strict 'refs';
  3         8  
  3         2321  
  16         27  
845 16         25 local *{ $pkg . '::a' } = \$prev;
  16         57  
846 16         28 local *{ $pkg . '::b' } = \$_[0];
  16         43  
847 16         37 $prev = $code->();
848             }
849 16         81 $reduce_cb->();
850             } else {
851 8         26 $return_cb->($prev);
852             }
853 24         112 });
854 8         28 };$reduce_cb->();
  8         21  
855 8         80 weaken $reduce_cb;
856             } else {
857 0         0 $return_cb->();
858             }
859 8         40 });
860              
861 8         23 return $self;
862             }
863              
864             =head2 sum($returing_cb)
865              
866             The method computes sum of all items in stream.
867              
868             $stream->sum(
869             sub {
870             $sum = shift
871             #...
872             });
873             =cut
874             sub sum {
875 2     2 1 14 my $self = shift;
876 2         4 my $return_cb = shift;
877              
878 2 100       8 if (ref $return_cb ne 'CODE') {
879 1         143 croak 'First argument can be only subroutine reference'
880             }
881              
882 1     2   4 $self->reduce(sub{$a+$b}, $return_cb);
  2         6  
883              
884 1         4 return $self;
885             }
886              
887             =head2 min($returing_cb)
888              
889             The method finds out minimum item among all items in stream.
890              
891             $stream->min(
892             sub {
893             $sum = shift
894             #...
895             });
896             =cut
897             sub min {
898 3     3 1 37 my $self = shift;
899 3         6 my $return_cb = shift;
900              
901 3 100       13 if (ref $return_cb ne 'CODE') {
902 1         85 croak 'First argument can be only subroutine reference'
903             }
904              
905 2 100   4   14 $self->reduce(sub{$a < $b ? $a : $b}, $return_cb);
  4         15  
906              
907 2         5 return $self;
908             }
909              
910             =head2 max($returing_cb)
911              
912             The method finds out maximum item among all items in stream.
913              
914             $stream->max(
915             sub {
916             $sum = shift
917             #...
918             });
919             =cut
920             sub max {
921 3     3 1 22 my $self = shift;
922 3         5 my $return_cb = shift;
923              
924 3 100       9 if (ref $return_cb ne 'CODE') {
925 1         66 croak 'First argument can be only subroutine reference'
926             }
927              
928 2 100   4   21 $self->reduce(sub{$a > $b ? $a : $b}, $return_cb);
  4         23  
929              
930 2         5 return $self;
931             }
932              
933             =head2 count($returing_cb)
934              
935             The method counts number items in streams.
936              
937             $stream->count(sub {
938             $count = shift;
939             });
940             =cut
941             sub count {
942 2     2 1 11 my $self = shift;
943 2         3 my $return_cb = shift;
944              
945 2 100       7 if (ref $return_cb ne 'CODE') {
946 1         103 croak 'First argument can be only subroutine reference'
947             }
948              
949 1         2 my $result = 0;
950 1         3 my $iterator = $self->iterator;
951              
952 1         2 my $next_cb ; $next_cb = sub {
953 4     4   7 my $next_cb = $next_cb;
954             $iterator->next(sub {
955 4 100       11 if (@_) {
956 3         4 $result++;
957 3         8 return $next_cb->();
958             }
959 1         3 $return_cb->($result)
960 4         14 });
961 1         4 }; $next_cb->();
  1         3  
962 1         4 weaken $next_cb;
963              
964 1         2 return $self;
965             }
966              
967             =head2 any($predicat, $return_cb)
968              
969             Method look for any equivalent item in steam. if there is any then return that.
970             if there isn't then return nothing.
971              
972             $stream->any(sub {$_ % 2})->to_arrayref(sub {print @{$_[0]}});
973             =cut
974             sub any {
975 2     2 1 22 my $self = shift;
976 2         5 my $predicat = shift;
977 2         5 my $return_cb = shift;
978              
979 2         8 my $iterator = $self->iterator;
980              
981 2         5 my $next_cb; $next_cb = sub {
982 6     6   13 my $next_cb = $next_cb;
983             $iterator->next(sub {
984 6 100       18 if (@_) {
985 5         10 local *{_} = \$_[0];
986 5 100       15 if ($predicat->()) {
987 1         5 $return_cb->($_[0]);
988             } else {
989 4         29 $next_cb->();
990             }
991             } else {
992 1         4 $return_cb->()
993             }
994 6         35 });
995 2         11 }; $next_cb->();
  2         8  
996 2         10 weaken $next_cb;
997              
998 2         17 return $self;
999             }
1000              
1001              
1002             sub _set_head {
1003 59     59   100 my $self = shift;
1004 59         100 my $generator = shift;
1005 59         127 my %args = @_;
1006              
1007 59   100     234 my $prefetch = $args{prefetch} // $self->{_prefetch};
1008              
1009 59 100       135 if ($prefetch) {
1010 2         5 my $new_generator = _get_prefetch_generator($generator, $self->{_prefetch});
1011 2         7 $self->{_head} = Async::Stream::Item->new(undef, $new_generator);
1012             } else {
1013 57         192 $self->{_head} = Async::Stream::Item->new(undef, $generator);
1014             }
1015              
1016 59         128 return $self;
1017             }
1018              
1019             sub _get_prefetch_generator {
1020 2     2   6 my ($generator,$prefetch) = @_;
1021              
1022 2         3 my @responses_cache;
1023             my @requests_queue;
1024 2         4 my $is_exhausted = 0;
1025 2         3 my $item_requested = 0;
1026              
1027             return sub {
1028 10     10   20 my $return_cb = shift;
1029              
1030 10 50       25 if (@responses_cache) {
1031 0         0 $return_cb->(shift @responses_cache);
1032             } else {
1033 10         19 push @requests_queue, $return_cb;
1034             }
1035              
1036 10 100 66     41 if (!$is_exhausted) {
    100          
1037 6         16 for (0 .. ($prefetch - $item_requested)) {
1038 18         44 $item_requested++;
1039             $generator->(sub {
1040 18         99 $item_requested--;
1041 18 100       38 if (@_) {
1042 8 50       16 if (@requests_queue) {
1043 8         24 shift(@requests_queue)->($_[0]);
1044             } else {
1045 0         0 push @responses_cache, $_[0];
1046             }
1047             } else {
1048 10         19 $is_exhausted = 1;
1049 10 50 66     65 if (!$item_requested && @requests_queue) {
1050 0         0 shift(@requests_queue)->();
1051             }
1052             }
1053 18         69 });
1054             }
1055             } elsif (!$item_requested && @requests_queue) {
1056 2         8 shift(@requests_queue)->();
1057             }
1058 2         7 };
1059             }
1060              
1061             =head1 AUTHOR
1062              
1063             Kirill Sysoev, C<< >>
1064              
1065             =head1 BUGS AND LIMITATIONS
1066              
1067             Please report any bugs or feature requests to
1068             L.
1069              
1070             =head1 SUPPORT
1071              
1072             You can find documentation for this module with the perldoc command.
1073              
1074             perldoc Async::Stream::Item
1075              
1076              
1077             =head1 LICENSE AND COPYRIGHT
1078              
1079             Copyright 2017 Kirill Sysoev.
1080              
1081             This program is free software; you can redistribute it and/or modify it
1082             under the terms of the the Artistic License (2.0). You may obtain a
1083             copy of the full license at:
1084              
1085             L
1086             =cut
1087              
1088             1; # End of Async::Stream