File Coverage

blib/lib/Ryu/Node.pm
Criterion Covered Total %
statement 72 73 98.6
branch 43 54 79.6
condition 27 47 57.4
subroutine 19 19 100.0
pod 7 11 63.6
total 168 204 82.3


line stmt bran cond sub pod time code
1             package Ryu::Node;
2              
3 46     46   183084 use strict;
  46         106  
  46         1814  
4 46     46   258 use warnings;
  46         2165  
  46         10541  
5              
6             our $VERSION = '4.001'; # VERSION
7             our $AUTHORITY = 'cpan:TEAM'; # AUTHORITY
8              
9             =head1 NAME
10              
11             Ryu::Node - generic node
12              
13             =head1 DESCRIPTION
14              
15             This is a common base class for all sources, sinks and other related things.
16             It does very little.
17              
18             =cut
19              
20 46     46   37495 use Future;
  46         674044  
  46         2069  
21 46     46   358 use Scalar::Util qw(refaddr);
  46         107  
  46         60722  
22              
23             =head1 METHODS
24              
25             Not really. There's a constructor, but that's not particularly exciting.
26              
27             =cut
28              
29             sub new {
30 237     237 0 203421 bless {
31             pause_propagation => 1,
32             @_[1..$#_]
33             }, $_[0]
34             }
35              
36             =head2 describe
37              
38             Returns a string describing this node and any parents - typically this will result in a chain
39             like C<< from->combine_latest->count >>.
40              
41             =cut
42              
43             # It'd be nice if L<Future> already provided a method for this, maybe I should suggest it
44             sub describe {
45 1184     1184 1 1982 my ($self) = @_;
46 1184         2335 my $completed = $self->_completed;
47 1184 100 100     2402 ($self->parent ? $self->parent->describe . '=>' : '') . '[' . ($self->label // 'unknown') . '](' . ($completed ? $completed->state : 'inactive') . ')';
    50          
48             }
49              
50             =head2 completed
51              
52             Returns a L<Future> indicating completion (or failure) of this stream.
53              
54             =cut
55              
56             sub completed {
57 42     42 1 32597 my ($self) = @_;
58 42 50       132 my $completion = $self->_completed
59             or return undef;
60             return $completion->without_cancel->on_ready(sub {
61 39     39   4327 my $f = shift;
62 39         97 my ($expected) = $f->state =~ /^(\S+)/;
63 39         428 my ($actual) = $completion->state =~ /^(\S+)/;
64 39 50       343 if($expected ne $actual) {
65 0         0 warn "Completed state $actual does not match internal state $expected - if you are calling ->completed->$expected, this will not work: use ->finish or ->fail instead";
66             }
67 42         178 });
68             }
69              
70             # Internal use only, since it's cancellable
71             sub _completed {
72 3285     3285   8185 my ($self) = @_;
73 3285 100       12868 return $self->{completed} if $self->{completed};
74 223         789 $self->{completed} = my $f = $self->new_future(
75             'completion'
76             );
77 223 50       6086 $f->on_ready(
78             $self->curry::weak::cleanup
79             ) if $self->can('cleanup');
80 223         10663 $f
81             }
82              
83             =head2 pause
84              
85             Does nothing useful.
86              
87             =cut
88              
89             sub pause {
90 55     55 1 2765 my ($self, $src) = @_;
91 55 100 100     318 my $k = (defined $src and ref $src)
      33        
      50        
      100        
92             ? refaddr($src // $self) // 0
93             : $src // 0;
94              
95 55   66     153 my $was_paused = $self->{is_paused} && keys %{$self->{is_paused}};
96 55 100 66     182 if($self->{unblocked} and $self->{unblocked}->is_ready) {
97 6         36 delete $self->{unblocked};
98             }
99 55         215 ++$self->{is_paused}{$k};
100 55 100       196 if(my $parent = $self->parent) {
101 37 100       150 $parent->pause($self) if $self->{pause_propagation};
102             }
103 55 100       143 if(my $flow_control = $self->{flow_control}) {
104 21 50       87 $flow_control->emit(0) unless $was_paused;
105             }
106             $self
107 55         130 }
108              
109             =head2 resume
110              
111             Is about as much use as L</pause>.
112              
113             =cut
114              
115             sub resume {
116 59     59 1 6724 my ($self, $src) = @_;
117 59 100 100     364 my $k = (defined $src and ref $src)
      33        
      50        
      100        
118             ? refaddr($src // $self) // 0
119             : $src // 0;
120 59 50       252 delete $self->{is_paused}{$k} unless --$self->{is_paused}{$k} > 0;
121 59 50 33     160 return $self if $self->{is_paused} and keys %{$self->{is_paused}};
  59         186  
122              
123 59 100       148 if(my $f = $self->{unblocked}) {
124 2 50       5 $f->done unless $f->is_ready;
125             }
126 59 100       306 if(my $parent = $self->parent) {
127 39 100       152 $parent->resume($self) if $self->{pause_propagation};
128             }
129 59 100       180 if(my $flow_control = $self->{flow_control}) {
130 21         103 $flow_control->emit(1);
131             }
132             $self
133 59         174 }
134              
135             =head2 unblocked
136              
137             Returns a L<Future> representing the current flow control state of this node.
138              
139             It will be L<pending|Future/is_pending> if this node is currently paused,
140             otherwise L<done|Future/is_done>.
141              
142             =cut
143              
144             sub unblocked {
145             # Since we don't want stray callers to affect our internal state, we always return
146             # a non-cancellable version of our internal Future.
147 6     6 1 8 my $self = shift;
148 6         18 return $self->_unblocked->without_cancel
149             }
150              
151             sub _unblocked {
152 6     6   12 my ($self) = @_;
153             # Since we don't want stray callers to affect our internal state, we always return
154             # a non-cancellable version of our internal Future.
155 6   33     15 $self->{unblocked} //= do {
156 6 100       14 $self->is_paused
157             ? $self->new_future
158             : Future->done
159             };
160             }
161              
162             =head2 is_paused
163              
164             Might return 1 or 0, but is generally meaningless.
165              
166             =cut
167              
168             sub is_paused {
169 274     274 1 8246 my ($self, $src) = @_;
170 274 100       569 return keys(%{ $self->{is_paused} }) ? 1 : 0 unless defined $src;
  136 100       701  
171 138 50 33     647 my $k = (defined $src and ref $src)
      33        
      50        
      0        
172             ? refaddr($src // $self) // 0
173             : $src // 0;
174             return exists $self->{is_paused}{$k}
175 138 100       661 ? 0 + $self->{is_paused}{$k}
176             : 0;
177             }
178              
179             sub flow_control {
180 31     31 0 57 my ($self) = @_;
181 31 50       90 return $self->{flow_control} if $self->{flow_control};
182             $self->{flow_control} = my $fc = Ryu::Source->new(
183             new_future => $self->{new_future}
184 31         92 );
185             $self->_completed->on_ready(sub {
186             my $fc = delete $self->{flow_control}
187 29 50   29   993 or return;
188 29         96 $fc->finish;
189 31         82 });
190 31         640 $fc
191             }
192              
193 1378     1378 0 8729 sub label { shift->{label} }
194              
195 2040     2040 0 5794 sub parent { shift->{parent} }
196              
197             =head2 new_future
198              
199             Used internally to get a L<Future>.
200              
201             =cut
202              
203             sub new_future {
204 228     228 1 482 my $self = shift;
205             (
206 228   66     1360 $self->{new_future} //= $Ryu::Source::FUTURE_FACTORY
207             )->($self, @_)
208             }
209              
210              
211             1;
212              
213             __END__