File Coverage

blib/lib/Async/Stream/Pushable.pm
Criterion Covered Total %
statement 39 44 88.6
branch 8 10 80.0
condition n/a
subroutine 9 9 100.0
pod 3 3 100.0
total 59 66 89.3


line stmt bran cond sub pod time code
1             package Async::Stream::Pushable;
2              
3 2     2   48295 use 5.010;
  2         8  
4 2     2   14 use strict;
  2         5  
  2         49  
5 2     2   11 use warnings;
  2         4  
  2         80  
6              
7 2     2   11 use base qw(Async::Stream);
  2         4  
  2         421  
8              
9 2     2   14 use Carp qw(croak);
  2         5  
  2         736  
10              
11             =head1 NAME
12              
13             Use that class for creating streams which you can use to push item to them.
14              
15             =head1 VERSION
16              
17             Version 0.11
18              
19             =cut
20              
21             our $VERSION = '0.12';
22              
23              
24             =head1 SYNOPSIS
25              
26             Creating pushable streams,
27             you can use this type of streams for example for observer pattern
28              
29             use Async::Stream::Pushable;
30              
31             my $stream = Async::Stream::Pushable->new;
32              
33             $stream->push(1,2,3)->finalize;
34              
35             # Or for example
36            
37             my $stream = Async::Stream::Pushable->new;
38              
39             $some_object->subscribe(sub {
40             my $new_item = shift;
41             $stream->push($new_item);
42             });
43              
44            
45             =head1 SUBROUTINES/METHODS
46              
47              
48             =head2 new(@array_of_items)
49              
50             Constructor creates instance of class.
51             Class method create stream which you can use that to push items to that.
52              
53             my $stream = Async::Stream::Pushable->new(@urls)
54              
55             =cut
56              
57             sub new {
58 1     1 1 880 my $class = shift;
59              
60 1         4 my $self;
61             $self = $class->SUPER::new(
62             sub {
63 4     4   12 my ($return_cb) = @_;
64              
65 4 100       6 if (@{$self->{_items}}) {
  4         13  
66 3         5 $return_cb->(shift @{ $self->{_items} });
  3         13  
67             } else {
68 1 50       3 if ($self->{_is_finalized}){
69 1         2 $return_cb->();
70             }
71             else {
72 0         0 push @{ $self->{_requests} }, $return_cb;
  0         0  
73             }
74             }
75             },
76 1         17 @_,
77             );
78              
79 1         10 $self->{_items} = [];
80 1         3 $self->{_requests} = [];
81 1         3 $self->{_is_finalized} = 0;
82              
83 1         4 return $self;
84             }
85              
86             =head2 push(@new_items)
87              
88             Push new items to stream
89              
90             my $stream->push(@new_items);
91              
92             =cut
93             sub push {
94 2     2 1 15 my ($self, @new_items) = @_;
95              
96 2 100       149 croak q{The stream is finalized} if ($self->{_is_finalized});
97              
98 1         3 while (@{ $self->{_requests} }) {
  1         5  
99 0         0 my $return_cb = shift @{ $self->{_requests} };
  0         0  
100 0         0 $return_cb->( shift @new_items );
101             }
102              
103 1 50       4 if (@new_items) {
104 1         3 push @{ $self->{_items} }, @new_items;
  1         4  
105             }
106              
107 1         7 return $self;
108             }
109              
110             =head2 finalize()
111              
112             Finalize stream
113              
114             my $stream->finalize;
115              
116             =cut
117             sub finalize {
118 2     2 1 613 my ($self) = @_;
119              
120 2 100       86 croak q{The stream has already been finalized} if ($self->{_is_finalized});
121              
122 1         4 $self->{_is_finalized} = 1;
123             }
124              
125             =head1 AUTHOR
126              
127             Kirill Sysoev, C<< >>
128              
129             =head1 BUGS AND LIMITATIONS
130              
131             Please report any bugs or feature requests to
132             L.
133              
134             =head1 SUPPORT
135              
136             You can find documentation for this module with the perldoc command.
137              
138             perldoc Async::Stream::Item
139              
140              
141             =head1 LICENSE AND COPYRIGHT
142              
143             Copyright 2017 Kirill Sysoev.
144              
145             This program is free software; you can redistribute it and/or modify it
146             under the terms of the the Artistic License (2.0). You may obtain a
147             copy of the full license at:
148              
149             L
150             =cut
151              
152             1; # End of Async::Stream::Pushable