File Coverage

blib/lib/Mojo/Rx/Operators/Pipeable.pm
Criterion Covered Total %
statement 30 192 15.6
branch 0 52 0.0
condition 0 16 0.0
subroutine 10 34 29.4
pod 0 12 0.0
total 40 306 13.0


line stmt bran cond sub pod time code
1             package Mojo::Rx::Operators::Pipeable;
2 2     2   15 use strict;
  2         7  
  2         68  
3 2     2   11 use warnings FATAL => 'all';
  2         6  
  2         88  
4              
5 2     2   12 use Mojo::Rx::Operators::Creation 'rx_observable', 'rx_subject';
  2         4  
  2         101  
6 2     2   854 use Mojo::Rx::ConnectableObservable;
  2         7  
  2         59  
7 2     2   16 use Mojo::Rx::Utils 'get_subscription_from_subscriber';
  2         3  
  2         94  
8 2     2   13 use Mojo::Rx::Subscription;
  2         5  
  2         33  
9              
10 2     2   9 use Mojo::IOLoop;
  2         3  
  2         12  
11              
12 2     2   71 use Carp 'croak';
  2         6  
  2         92  
13 2     2   12 use Scalar::Util 'reftype';
  2         15  
  2         113  
14              
15 2     2   12 use Exporter 'import';
  2         4  
  2         4955  
16             our @EXPORT_OK = qw/
17             op_delay op_filter op_map op_map_to op_multicast op_pairwise op_ref_count
18             op_scan op_share op_take op_take_until op_tap
19             /;
20             our %EXPORT_TAGS = (all => \@EXPORT_OK);
21              
22             # TODO: are these op_delay comments still valid?
23             # Two bugs: 1) script doesn't exit upon the subscriber receiving complete, and 2) delaying of(1, 2, 3) often
24             # shows fewer than 3 'next' values and not in the right order.
25              
26             sub op_delay {
27 0     0 0   my ($delay) = @_;
28              
29             return sub {
30 0     0     my ($source) = @_;
31              
32             return rx_observable->new(sub {
33 0           my ($subscriber) = @_;
34              
35 0           my $queue;
36             my $own_subscriber = {
37             map {
38 0           my $type = $_;
  0            
39              
40             (
41             $type => sub {
42 0           my @value = @_;
43              
44 0 0         if (! defined $queue) {
45 0           $queue = [];
46             Mojo::IOLoop->timer(0, sub {
47 0           my @queue_copy = @$queue;
48 0           undef $queue;
49             Mojo::IOLoop->timer($delay, sub {
50 0           foreach my $item (@queue_copy) {
51 0           my ($type, $value_ref) = @$item;
52 0 0         $subscriber->{$type}->(@$value_ref) if defined $subscriber->{$type};
53             }
54 0           });
55 0           });
56             }
57 0           push @$queue, [$type, \@value];
58             }
59 0           );
60             } qw/ next error complete /
61             };
62              
63 0           return $source->subscribe($own_subscriber);
64 0           });
65 0           };
66             }
67              
68             sub op_filter {
69 0     0 0   my ($filtering_sub) = @_;
70              
71             return sub {
72 0     0     my ($source) = @_;
73              
74             return rx_observable->new(sub {
75 0           my ($subscriber) = @_;
76              
77 0           my $own_subscriber = { %$subscriber };
78             $own_subscriber->{next} &&= sub {
79 0           my $passes = eval { $filtering_sub->(@_) };
  0            
80 0 0         if (my $error = $@) {
81 0           $subscriber->{error}->($error);
82             } else {
83 0 0 0       $subscriber->{next}->(@_) if $passes and defined $subscriber->{next};
84             }
85 0   0       };
86              
87 0           $source->subscribe($own_subscriber);
88              
89 0           return;
90 0           });
91 0           };
92             }
93              
94             sub op_map {
95 0     0 0   my ($mapping_sub) = @_;
96              
97             return sub {
98 0     0     my ($source) = @_;
99              
100             return rx_observable->new(sub {
101 0           my ($subscriber) = @_;
102              
103 0           my $own_subscriber = { %$subscriber };
104             $own_subscriber->{next} &&= sub {
105 0           my $result = eval { $mapping_sub->(@_) };
  0            
106 0 0         if (my $error = $@) {
107 0 0         $subscriber->{error}->($error) if defined $subscriber->{error};
108             } else {
109 0 0         $subscriber->{next}->($result) if defined $subscriber->{next};
110             }
111 0   0       };
112              
113 0           $source->subscribe($own_subscriber);
114              
115 0           return;
116 0           });
117 0           };
118             }
119              
120             sub op_map_to {
121 0     0 0   my ($mapping_value) = @_;
122              
123             return sub {
124 0     0     my ($source) = @_;
125              
126             return rx_observable->new(sub {
127 0           my ($subscriber) = @_;
128              
129 0           my $own_subscriber = { %$subscriber };
130             $own_subscriber->{next} &&= sub {
131 0 0         $subscriber->{next}->($mapping_value) if defined $subscriber->{next};
132 0   0       };
133              
134 0           $source->subscribe($own_subscriber);
135              
136 0           return;
137 0           });
138 0           };
139             }
140              
141             sub op_multicast {
142 0     0 0   my ($subject_factory) = @_;
143              
144             return sub {
145 0     0     my ($source) = @_;
146              
147 0           return Mojo::Rx::ConnectableObservable->new($source, $subject_factory);
148 0           };
149             }
150              
151             sub op_pairwise {
152             return sub {
153 0     0     my ($source) = @_;
154              
155             return rx_observable->new(sub {
156 0           my ($subscriber) = @_;
157              
158 0           my $prev_value;
159 0           my $have_prev_value = 0;
160              
161             my $own_subscriber = {
162             %$subscriber,
163             (
164             next => sub {
165 0           my ($value) = @_;
166              
167 0 0         if ($have_prev_value) {
168 0 0         $subscriber->{next}->([$prev_value, $value]) if defined $subscriber->{next};
169             } else {
170 0           $have_prev_value = 1;
171             }
172              
173 0           $prev_value = $value;
174             }
175             ) x!! defined $subscriber->{next},
176 0           };
177              
178 0           $source->subscribe($own_subscriber);
179              
180 0           return;
181 0           });
182 0     0 0   };
183             };
184              
185             sub op_ref_count {
186             return sub {
187 0     0     my ($source) = @_;
188              
189 0 0         croak 'op_ref_count() was not applied to a connectable observable'
190             unless $source->isa('Mojo::Rx::ConnectableObservable');
191              
192 0           my $count = 0;
193              
194 0           my $connection_subscription;
195             my $typical_unsubscription_fn = sub {
196 0 0         if (--$count == 0) {
197 0           $connection_subscription->unsubscribe;
198             }
199 0           };
200              
201             return rx_observable->new(sub {
202 0           my ($subscriber) = @_;
203              
204 0           my $count_was = $count++;
205              
206 0 0         if ($count_was == 0) {
207 0           $connection_subscription = Mojo::Rx::Subscription->new;
208              
209 0           get_subscription_from_subscriber($subscriber)->add_dependents($typical_unsubscription_fn);
210 0           $source->subscribe($subscriber);
211              
212 0           $connection_subscription = $source->connect;
213             } else {
214 0           get_subscription_from_subscriber($subscriber)->add_dependents($typical_unsubscription_fn);
215 0           $source->subscribe($subscriber);
216             }
217              
218 0           return;
219 0           });
220 0     0 0   };
221             }
222              
223             sub op_scan {
224 0     0 0   my ($accumulator_function, $seed) = @_;
225 0           my $has_seed = @_ >= 2;
226              
227             return sub {
228 0     0     my ($source) = @_;
229              
230             return rx_observable->new(sub {
231 0           my ($subscriber) = @_;
232              
233 0           my $has_seed = $has_seed;
234              
235 0 0         my $acc; $acc = $seed if $has_seed;
  0            
236 0           my $index = -1;
237             my $own_subscriber = {
238             %$subscriber,
239             (
240             next => sub {
241 0           my ($value) = @_;
242              
243 0 0         if (! $has_seed) {
244 0           $acc = $value;
245 0           $has_seed = 1;
246             } else {
247 0           ++$index;
248 0           $acc = $accumulator_function->($acc, $value, $index);
249             }
250              
251 0 0         $subscriber->{next}->($acc) if defined $subscriber->{next};
252             },
253             ) x!! defined $subscriber->{next},
254 0           };
255              
256 0           $source->subscribe($own_subscriber);
257              
258 0           return;
259 0           });
260 0           };
261             }
262              
263             sub op_share {
264             return (
265 0     0 0   op_multicast(sub { rx_subject->new }),
  0     0      
266             op_ref_count(),
267             );
268             }
269              
270             sub op_take {
271 0     0 0   my ($count) = @_;
272              
273 0 0         croak 'negative argument passed to op_take' unless $count >= 0;
274              
275             return sub {
276 0     0     my ($source) = @_;
277              
278             return rx_observable->new(sub {
279 0           my ($subscriber) = @_;
280              
281 0           my $remaining = int $count;
282              
283 0 0         if ($remaining == 0) {
284 0 0         $subscriber->{complete}->() if defined $subscriber->{complete};
285 0           return;
286             }
287              
288             my $own_subscriber = {
289             %$subscriber,
290             next => sub {
291 0 0         $subscriber->{next}->(@_) if defined $subscriber->{next};
292 0 0 0       $subscriber->{complete}->() if --$remaining == 0 and defined $subscriber->{complete};
293             },
294 0           };
295              
296 0           $source->subscribe($own_subscriber);
297              
298 0           return;
299 0           });
300 0           };
301             }
302              
303             sub op_take_until {
304 0     0 0   my ($notifier_observable) = @_;
305              
306             return sub {
307 0     0     my ($source) = @_;
308              
309             return rx_observable->new(sub {
310 0           my ($subscriber) = @_;
311              
312             my $n_s = $notifier_observable->subscribe(
313             sub {
314 0 0         $subscriber->{complete}->() if defined $subscriber->{complete};
315             },
316             sub {
317 0 0         $subscriber->{error}->(@_) if defined $subscriber->{error};
318             },
319 0           );
320              
321 0           $source->subscribe($subscriber);
322              
323 0           return $n_s;
324 0           });
325 0           };
326             }
327              
328             sub op_tap {
329 0     0 0   my @args = @_;
330              
331             return sub {
332 0     0     my ($source) = @_;
333              
334             return rx_observable->new(sub {
335 0           my ($subscriber) = @_;
336              
337 0           my @args = @args;
338 0 0 0       my $tap_subscriber = $args[0] if (reftype($args[0]) // '') eq 'HASH';
339             $tap_subscriber //= {
340 0   0       map {($_, shift @args)} qw/ next error complete /
  0            
341             };
342              
343 0           my %own_keys = map {$_ => 1} grep { /^(next|error|complete)\z/ } (keys(%$tap_subscriber), keys(%$subscriber));
  0            
  0            
344              
345             my $own_subscriber = {
346             %$subscriber,
347             map {
348 0           my $key = $_;
  0            
349             ($key => sub {
350 0 0         $tap_subscriber->{$key}->(@_) if defined $tap_subscriber->{$key};
351 0 0         $subscriber->{$key}->(@_) if defined $subscriber->{$key};
352 0           });
353             } keys %own_keys
354             };
355              
356 0           $source->subscribe($own_subscriber);
357              
358 0           return;
359 0           });
360 0           };
361             }
362              
363             1;