File Coverage

blib/lib/Ryu/Sink.pm
Criterion Covered Total %
statement 64 84 76.1
branch 13 30 43.3
condition 2 3 66.6
subroutine 17 20 85.0
pod 1 9 11.1
total 97 146 66.4


line stmt bran cond sub pod time code
1             package Ryu::Sink;
2              
3 39     39   157866 use strict;
  39         73  
  39         1888  
4 39     39   217 use warnings;
  39         65  
  39         3056  
5              
6 39     39   263 use parent qw(Ryu::Node);
  39         101  
  39         330  
7              
8             our $VERSION = '4.001'; # VERSION
9             our $AUTHORITY = 'cpan:TEAM'; # AUTHORITY
10              
11             =head1 NAME
12              
13             Ryu::Sink - base representation for a thing that receives events
14              
15             =head1 DESCRIPTION
16              
17             This is currently of limited utility.
18              
19             my $src = Ryu::Source->new;
20             my $sink = Ryu::Sink->new;
21             $sink->from($src);
22             $sink->source->say;
23              
24             =cut
25              
26 39     39   4464 use Future;
  39         83  
  39         1337  
27 39     39   607 use Log::Any qw($log);
  39         7389  
  39         329  
28              
29             =head1 METHODS
30              
31             =cut
32              
33             sub new {
34 5     5 0 574295 my $class = shift;
35 5         47 $class->SUPER::new(
36             sources => [],
37             @_
38             )
39             }
40              
41             =head2 from
42              
43             Given a source, will attach it as the input for this sink.
44              
45             The key difference between L and L is that this method will mark the sink as completed
46             when the source is finished. L allows sequencing of multiple sources, keeping the sink active
47             as each of those completes.
48              
49             =cut
50              
51             sub from {
52 0     0 1 0 my ($self, $src, %args) = @_;
53              
54 0 0       0 die 'expected a subclass of Ryu::Source, received ' . $src . ' instead' unless $src->isa('Ryu::Source');
55              
56 0 0       0 $self = $self->new unless ref $self;
57 0         0 $self->drain_from($src);
58             $src->completed->on_ready(sub {
59 0     0   0 $self->finish;
60 0         0 });
61 0         0 return $self
62             }
63              
64             sub drain_from {
65 17     17 0 3477 my ($self, $src) = @_;
66 17 100       63 if(ref $src eq 'ARRAY') {
67 1         1 my $data = $src;
68             $src = Ryu::Source->new(
69             new_future => $self->{new_future},
70 1         5 label => 'array',
71             )->from($data);
72             }
73 17 50       89 die 'expected a subclass of Ryu::Source, received ' . $src . ' instead' unless $src->isa('Ryu::Source');
74              
75 17         67 $log->tracef('Will drain from %s, with %d sources in queue already', $src->describe, 0 + $self->{sources}->@*);
76 17         361 push $self->{sources}->@*, (my $buffered = $src->buffer)->pause;
77 17         56 return $self->start_drain;
78             }
79              
80             sub start_drain {
81 34     34 0 63 my ($self) = @_;
82 34 100       94 if($self->is_draining) {
83 2         5 $log->tracef('Still draining from %s, no need to start new source yet', $self->{active_source}->describe);
84 2         38 return $self;
85             }
86 32 100       100 unless($self->{sources}->@*) {
87 15         57 $log->tracef('No need to start draining, we have no pending sources in queue');
88 15         158 return $self;
89             }
90              
91             my $src = shift $self->{sources}->@*
92 17 50       53 or do {
93 0         0 $log->warnf('Invalid pending source');
94 0         0 return $self;
95             };
96              
97 17         49 $log->tracef('Draining from source %s', $src->describe);
98 17         325 $self->{active_source} = $src;
99 17         38 my $original_parent = delete $self->{parent};
100 17         37 $self->{parent} = $src;
101             $src->_completed->on_ready(sub {
102 17     17   263 undef $self->{active_source};
103 17         40 $self->{parent} = $original_parent;
104 17         57 $self->start_drain;
105 17         40 });
106             $src->each_while_source(sub {
107 38     38   91 $self->emit($_)
108 17         531 }, $self->source, finish_source => 0);
109 17 50       49 $src->resume if $src->is_paused;
110 17         58 $src->prepare_await;
111 17         66 return $self;
112             }
113              
114 34     34 0 128 sub is_draining { !!shift->{active_source} }
115              
116             sub emit {
117 38     38 0 73 my ($self, $data) = @_;
118 38         82 $self->source->emit($data);
119 38         93 $self
120             }
121              
122             sub finish {
123 0     0 0 0 my ($self) = @_;
124 0 0       0 return $self if $self->{is_finished};
125 0         0 $self->{is_finished} = 1;
126 0         0 delete $self->{new_source};
127 0         0 my @src = splice $self->{sources}->@*;
128 0 0       0 push @src, delete $self->{active_source} if $self->{active_source};
129 0         0 for my $src (@src) {
130 0 0       0 $src->resume if $src->is_paused;
131             }
132 0 0       0 return $self unless my $src = $self->{source};
133 0         0 $src->finish;
134 0         0 return $self;
135             }
136              
137             sub source {
138 100     100 0 20935 my ($self) = @_;
139 100 100       531 return $self->{source} if $self->{source};
140 10         67 $log->tracef('Creating source for sink %s', "$self");
141 10   66 10   235 my $src = ($self->{new_source} //= sub { Ryu::Source->new(label => 'sink source') })->();
  10         58  
142              
143 10         49 $self->{source} = $src;
144 10         59 Scalar::Util::weaken($src->{parent} = $self);
145 10 50       32 $src->finish if $self->{is_finished};
146 10         45 return $src;
147             }
148              
149             sub _completed {
150 15     15   31 my ($self) = @_;
151 15         40 return $self->source->_completed;
152             }
153              
154       5 0   sub notify_child_completion { }
155              
156             sub DESTROY {
157 5     5   14055 my ($self) = @_;
158 5 50       25 return unless my $src = delete $self->{source};
159 5         17 $src->finish;
160             }
161              
162             1;
163              
164             __END__