File Coverage

blib/lib/Mojo/Rx/Operators/Creation.pm
Criterion Covered Total %
statement 59 160 36.8
branch 7 42 16.6
condition 1 9 11.1
subroutine 15 38 39.4
pod 0 14 0.0
total 82 263 31.1


line stmt bran cond sub pod time code
1             package Mojo::Rx::Operators::Creation;
2 2     2   14 use strict;
  2         4  
  2         64  
3 2     2   10 use warnings FATAL => 'all';
  2         4  
  2         82  
4              
5 2     2   838 use Mojo::Rx::Observable;
  2         5  
  2         60  
6 2     2   13 use Mojo::Rx::Subscription;
  2         4  
  2         47  
7 2     2   792 use Mojo::Rx::Utils 'get_subscription_from_subscriber';
  2         6  
  2         105  
8 2     2   842 use Mojo::Rx::Subject;
  2         6  
  2         55  
9              
10 2     2   1050 use Mojo::IOLoop;
  2         650129  
  2         34  
11              
12 2     2   105 use Carp 'croak';
  2         5  
  2         97  
13 2     2   11 use Scalar::Util 'weaken';
  2         6  
  2         77  
14              
15 2     2   11 use Exporter 'import';
  2         5  
  2         4066  
16             our @EXPORT_OK = qw/
17             rx_observable rx_of rx_concat rx_defer rx_EMPTY rx_from_event
18             rx_from_event_array rx_interval rx_merge rx_never rx_race
19             rx_subject rx_throw_error rx_timer
20             /;
21             our %EXPORT_TAGS = (all => \@EXPORT_OK);
22              
23             sub rx_observable;
24              
25             sub _rx_concat_helper {
26 0     0   0 my ($sources, $subscriber, $early_returns) = @_;
27              
28 0 0       0 @$sources or do {
29 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
30 0         0 return;
31             };
32              
33 0         0 my $source = shift @$sources;
34              
35 0         0 my $own_subscription = Mojo::Rx::Subscription->new;
36 0         0 @$early_returns = ($own_subscription);
37 0         0 get_subscription_from_subscriber($subscriber)->add_dependents($early_returns);
38              
39             my $own_subscriber = {
40             new_subscription => $own_subscription,
41             next => $subscriber->{next},
42             error => $subscriber->{error},
43             complete => sub {
44 0     0   0 _rx_concat_helper->($sources, $subscriber, $early_returns);
45             },
46 0         0 };
47              
48 0         0 $source->subscribe($own_subscriber);
49             }
50              
51             sub rx_concat {
52 0     0 0 0 my (@sources) = @_;
53              
54             return rx_observable->new(sub {
55 0     0   0 my ($subscriber) = @_;
56              
57 0         0 my @sources = @sources;
58              
59 0         0 my $early_returns = [];
60 0         0 get_subscription_from_subscriber($subscriber)->add_dependents($early_returns, sub { @sources = () });
  0         0  
61 0         0 _rx_concat_helper(\@sources, $subscriber, $early_returns);
62              
63 0         0 return;
64 0         0 });
65             }
66              
67             sub rx_defer {
68 0     0 0 0 my ($observable_factory) = @_;
69              
70             return rx_observable->new(sub {
71 0     0   0 my ($subscriber) = @_;
72              
73 0         0 my $observable = $observable_factory->();
74              
75 0         0 return $observable->subscribe($subscriber);
76 0         0 });
77             }
78              
79             my $rx_EMPTY;
80              
81             sub rx_EMPTY {
82             $rx_EMPTY //= rx_observable->new(sub {
83 0     0   0 my ($subscriber) = @_;
84              
85 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
86              
87 0         0 return;
88 0   0 0 0 0 });
89             }
90              
91             # NOTE: rx_from_event and rx_from_event_array keep a hard reference to the
92             # EventEmitter $object. Should this change? TODO: think about that.
93              
94             sub rx_from_event {
95 0     0 0 0 my ($object, $event_type) = @_;
96              
97 0 0       0 croak 'invalid object type, at rx_from_event' if not $object->isa('Mojo::EventEmitter');
98              
99             return rx_observable->new(sub {
100 0     0   0 my ($subscriber) = @_;
101              
102             my $cb = sub {
103 0         0 my ($e, @args) = @_;
104              
105 0 0       0 $subscriber->{next}->(splice @args, 0, 1) if defined $subscriber->{next};
106 0         0 };
107              
108 0         0 get_subscription_from_subscriber($subscriber)->add_dependents(sub { $object->unsubscribe($cb) });
  0         0  
109              
110 0         0 $object->on($event_type, $cb);
111              
112 0         0 return;
113 0         0 });
114             }
115              
116             sub rx_from_event_array {
117 0     0 0 0 my ($object, $event_type) = @_;
118              
119 0 0       0 croak 'invalid object type, at rx_from_event' if not $object->isa('Mojo::EventEmitter');
120              
121             return rx_observable->new(sub {
122 0     0   0 my ($subscriber) = @_;
123              
124             my $cb = sub {
125 0         0 my ($e, @args) = @_;
126              
127 0 0       0 $subscriber->{next}->([@args]) if defined $subscriber->{next};
128 0         0 };
129              
130 0         0 get_subscription_from_subscriber($subscriber)->add_dependents(sub { $object->unsubscribe($cb) });
  0         0  
131              
132 0         0 $object->on($event_type, $cb);
133              
134 0         0 return;
135 0         0 });
136             }
137              
138             sub rx_interval {
139 0     0 0 0 my ($after) = @_;
140              
141             return rx_observable->new(sub {
142 0     0   0 my ($subscriber) = @_;
143              
144 0         0 my $counter = 0;
145             my $id = Mojo::IOLoop->recurring($after, sub {
146 0 0       0 $subscriber->{next}->($counter++) if defined $subscriber->{next};
147 0         0 });
148              
149             return sub {
150 0         0 Mojo::IOLoop->remove($id);
151 0         0 };
152 0         0 });
153             }
154              
155             sub rx_merge {
156 1     1 0 6 my @sources = @_;
157              
158             return rx_observable->new(sub {
159 1     1   4 my ($subscriber) = @_;
160              
161 1         2 my @sources = @sources;
162              
163 1         2 my %own_subscriptions;
164             get_subscription_from_subscriber($subscriber)->add_dependents(
165             \%own_subscriptions,
166 1         7 sub { @sources = () },
167 1         5 );
168              
169 1         3 my $num_active_subscriptions = @sources;
170 1 50 33     6 $num_active_subscriptions or $subscriber->{complete}->() if defined $subscriber->{complete};
171              
172 1         5 for (my $i = 0; $i < @sources; $i++) {
173 2         4 my $source = $sources[$i];
174 2         5 my $own_subscription = Mojo::Rx::Subscription->new;
175 2         7 $own_subscriptions{$own_subscription} = $own_subscription;
176             my $own_subscriber = {
177             new_subscription => $own_subscription,
178             next => $subscriber->{next},
179             error => $subscriber->{error},
180             complete => sub {
181 2         6 delete $own_subscriptions{$own_subscription};
182 2 100       21 if (! --$num_active_subscriptions) {
183 1 50       5 $subscriber->{complete}->() if defined $subscriber->{complete};
184             }
185             },
186 2         11 };
187 2         9 $source->subscribe($own_subscriber);
188             }
189              
190 1         4 return;
191 1         3 });
192             }
193              
194             my $rx_never;
195              
196             sub rx_never {
197             return $rx_never //= rx_observable->new(sub {
198 0     0   0 return;
199 0   0 0 0 0 });
200             }
201              
202 6     6 0 9033 sub rx_observable { "Mojo::Rx::Observable" }
203              
204             sub rx_of {
205 3     3 0 5077 my (@values) = @_;
206              
207             return rx_observable->new(sub {
208 3     3   6 my ($subscriber) = @_;
209              
210 3         8 foreach my $value (@values) {
211 9 50       23 return if !! ${ $subscriber->{closed_ref} };
  9         19  
212 9 50       28 $subscriber->{next}->($value) if defined $subscriber->{next};
213             }
214 3 50       16 $subscriber->{complete}->() if defined $subscriber->{complete};
215              
216 3         17 return;
217 3         8 });
218             }
219              
220             sub rx_race {
221 0     0 0   my (@sources) = @_;
222              
223             return rx_observable->new(sub {
224 0     0     my ($subscriber) = @_;
225             # TODO: experiment in the end with passing a second parameter here, an arrayref, called \@early_return_values
226             # TODO: like: my ($subscriber, $early_return_values) = @_; and then push @$early_return_values, sub {...};
227              
228 0           my @sources = @sources;
229              
230 0           my @own_subscriptions;
231 0           get_subscription_from_subscriber($subscriber)->add_dependents(\@own_subscriptions);
232              
233 0           for (my $i = 0; $i < @sources; $i++) {
234 0           my $source = $sources[$i];
235              
236 0           my $own_subscription = Mojo::Rx::Subscription->new;
237 0           push @own_subscriptions, $own_subscription;
238 0           my $own_subscriber = {
239             new_subscription => $own_subscription,
240             };
241              
242 0           foreach my $type (qw/ next error complete /) {
243             $own_subscriber->{$type} = sub {
244 0           $_->unsubscribe foreach grep $_ ne $own_subscription, @own_subscriptions;
245 0           @own_subscriptions = ($own_subscription);
246 0           @sources = ();
247 0 0         $subscriber->{$type}->(@_) if defined $subscriber->{$type};
248 0           @$own_subscriber{qw/ next error complete /} = @$subscriber{qw/ next error complete /};
249 0           };
250             }
251              
252 0           $source->subscribe($own_subscriber);
253             }
254              
255             # this could be replaced with a 'return undef' at this point
256 0           return \@own_subscriptions;
257 0           });
258             }
259              
260 0     0 0   sub rx_subject { "Mojo::Rx::Subject" }
261              
262             sub rx_throw_error {
263 0     0 0   my ($error) = @_;
264              
265             return rx_observable->new(sub {
266 0     0     my ($subscriber) = @_;
267              
268 0 0         $subscriber->{error}->($error) if defined $subscriber->{error};
269              
270 0           return;
271 0           });
272             };
273              
274             sub rx_timer {
275 0     0 0   my ($after, $period) = @_;
276              
277             return rx_observable->new(sub {
278 0     0     my ($subscriber) = @_;
279              
280 0           my $counter = 0;
281 0           my $id;
282             $id = Mojo::IOLoop->timer($after, sub {
283 0           undef $id;
284 0 0         $subscriber->{next}->($counter++) if defined $subscriber->{next};
285 0 0         if (defined $period) {
286             $id = Mojo::IOLoop->recurring($period, sub {
287 0 0         $subscriber->{next}->($counter++) if defined $subscriber->{next};
288 0           });
289             } else {
290 0 0         $subscriber->{complete}->() if defined $subscriber->{complete};
291             }
292 0           });
293              
294             return sub {
295 0 0         Mojo::IOLoop->remove($id) if defined $id;
296 0           };
297 0           });
298             };
299              
300             1;