File Coverage

blib/lib/RxPerl/Operators/Pipeable.pm
Criterion Covered Total %
statement 813 1084 75.0
branch 199 430 46.2
condition 48 90 53.3
subroutine 122 164 74.3
pod n/a
total 1182 1768 66.8


line stmt bran cond sub pod time code
1             package RxPerl::Operators::Pipeable;
2 5     5   33 use strict;
  5         11  
  5         234  
3 5     5   26 use warnings;
  5         9  
  5         339  
4              
5 5         663 use RxPerl::Operators::Creation qw/
6             rx_observable rx_subject rx_concat rx_of rx_interval rx_combine_latest rx_concat
7             rx_throw_error rx_zip rx_merge rx_on_error_resume_next rx_race rx_timer
8             rx_behavior_subject rx_defer
9 5     5   47 /;
  5         9  
10 5     5   2659 use RxPerl::ConnectableObservable;
  5         16  
  5         200  
11 5     5   63 use RxPerl::Utils qw/ get_timer_subs /;
  5         19  
  5         334  
12 5     5   31 use RxPerl::Subscription;
  5         8  
  5         120  
13              
14 5     5   22 use Carp 'croak';
  5         8  
  5         268  
15 5     5   30 use Scalar::Util 'reftype', 'refaddr', 'blessed', 'weaken';
  5         9  
  5         356  
16 5     5   64 use Time::HiRes ();
  5         9  
  5         192  
17              
18 5     5   24 use Exporter 'import';
  5         11  
  5         86597  
19             our @EXPORT_OK = qw/
20             op_audit op_audit_time op_buffer op_buffer_count op_buffer_time op_catch_error op_combine_latest_with op_concat_all
21             op_concat_map op_concat_with op_count op_debounce op_debounce_time op_default_if_empty op_delay op_delay_when
22             op_distinct op_distinct_until_changed op_distinct_until_key_changed op_element_at op_end_with op_every
23             op_exhaust_all op_exhaust_map op_filter op_finalize op_find op_find_index op_first op_ignore_elements
24             op_is_empty op_last op_map op_map_to op_max op_merge_all op_merge_map op_merge_with op_min op_multicast
25             op_on_error_resume_next_with op_pairwise op_pluck op_race_with op_reduce op_ref_count op_repeat op_retry op_sample
26             op_sample_time op_scan op_share op_single op_skip op_skip_last op_skip_until op_skip_while op_start_with
27             op_switch_all op_switch_map op_take op_take_last op_take_until op_take_while op_tap op_throttle op_throttle_time
28             op_throw_if_empty op_time_interval op_timeout op_timestamp op_to_array op_with_latest_from op_zip_with
29             /;
30             our %EXPORT_TAGS = (all => \@EXPORT_OK);
31              
32             our $VERSION = "v6.29.8";
33              
34             sub op_audit {
35 3     3   7 my ($duration_selector) = @_;
36              
37             return sub {
38 3     3   5 my ($source) = @_;
39              
40             return rx_observable->new(sub {
41 3         4 my ($subscriber) = @_;
42              
43 3         8 my $last_val;
44             my $mini_subscription;
45 3         0 my $main_is_complete;
46              
47             my $mini_subscriber = {
48             next => sub {
49 5 100       31 $subscriber->{next}->($last_val) if defined $subscriber->{next};
50             },
51             error => sub {
52 1 50       5 $subscriber->{error}->(@_) if defined $subscriber->{error};
53             },
54             complete => sub {
55 5         7 undef $mini_subscription;
56 5         7 undef $last_val;
57 5 100       9 if ($main_is_complete) {
58 1 50       5 $subscriber->{complete}->() if defined $subscriber->{complete};
59             }
60             },
61 3         17 };
62              
63             my $own_subscriber = {
64             next => sub {
65 11         18 my ($v) = @_;
66              
67 11         18 $last_val = $v;
68 11 100       23 if (!defined $mini_subscription) {
69 6         7 my $o = do { local $_ = $v; $duration_selector->($v) };
  6         10  
  6         32  
70 6         16 $mini_subscription = $o->pipe(
71             op_take(1),
72             )->subscribe($mini_subscriber);
73             }
74             },
75             error => sub {
76 1 50       8 $subscriber->{error}->(@_) if defined $subscriber->{error};
77             },
78             complete => sub {
79 1         3 $main_is_complete = 1;
80 1 50       3 if (! defined $mini_subscription) {
81 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
82             }
83             }
84 3         17 };
85              
86 3         9 my $own_subscription = $source->subscribe($own_subscriber);
87              
88 3         9 return $mini_subscription, $own_subscription;
89 3         12 });
90 3         11 };
91             }
92              
93             sub op_audit_time {
94 0     0   0 my ($duration) = @_;
95              
96 0     0   0 return op_audit(sub { rx_timer($duration) });
  0         0  
97             }
98              
99             sub op_buffer {
100 2     2   6 my ($notifier) = @_;
101              
102             return sub {
103 2     2   4 my ($source) = @_;
104              
105             return rx_observable->new(sub {
106 2         4 my ($subscriber) = @_;
107              
108 2         3 my @buffer;
109              
110             my $own_subscriber = {
111             %$subscriber,
112             next => sub {
113 16         31 push @buffer, $_[0];
114             },
115             error => sub {
116 0 0       0 $subscriber->{error}->($_[0]) if defined $subscriber->{error};
117             },
118             complete => sub {
119 2 50 33     12 $subscriber->{next}->([@buffer]) if @buffer and defined $subscriber->{next};
120 2         3 undef @buffer;
121 2 50       6 $subscriber->{complete}->() if defined $subscriber->{complete};
122             },
123 2         15 };
124              
125             my $notifier_subscriber = {
126             next => sub {
127 5 50       23 $subscriber->{next}->([@buffer]) if defined $subscriber->{next};
128 5         10 undef @buffer;
129             },
130             error => sub {
131 0 0       0 $subscriber->{error}->($_[0]) if defined $subscriber->{error};
132             },
133 2         7 };
134              
135 2         6 my $s1 = $source->subscribe($own_subscriber);
136 2         3 my $s2 = $notifier->subscribe($notifier_subscriber);
137              
138 2         9 return [$s1, $s2], sub { undef @buffer };
  2         4  
139             })
140 2         7 }
141 2         9 }
142              
143             sub op_buffer_count {
144 3     3   9 my ($buffer_size, $start_buffer_every) = @_;
145              
146 3   66     13 $start_buffer_every //= $buffer_size;
147              
148             return sub {
149 3     3   8 my ($source) = @_;
150              
151             return rx_observable->new(sub {
152 3         9 my ($subscriber) = @_;
153              
154 3         3 my @buffers;
155 3         7 my $count = 0;
156             my $own_subscriber = {
157             %$subscriber,
158             next => sub {
159 18         31 my ($value) = @_;
160              
161 18 100       42 if ($count++ % $start_buffer_every == 0) {
162 13         25 push @buffers, [];
163             }
164              
165 18         38 for (my $i = 0; $i < @buffers; $i++) {
166 25         54 my $buffer = $buffers[$i];
167              
168 25         34 push @$buffer, $value;
169              
170 25 100       80 if (@$buffer == $buffer_size) {
171 10 50       37 $subscriber->{next}->($buffer) if defined $subscriber->{next};
172 10         19 splice @buffers, $i, 1;
173 10         12 $i--;
174 10         25 next;
175             }
176             }
177             },
178             complete => sub {
179 3 50       10 if (defined $subscriber->{next}) {
180 3         11 $subscriber->{next}->($_) foreach @buffers;
181             }
182              
183 3 50       13 $subscriber->{complete}->() if defined $subscriber->{complete};
184             },
185 3         28 };
186              
187 3         13 $source->subscribe($own_subscriber);
188              
189 3         6 return;
190 3         18 });
191 3         13 };
192             }
193              
194             sub op_buffer_time {
195 1     1   5 my ($buffer_time_span) = @_;
196              
197 1         5 return op_buffer(rx_interval($buffer_time_span));
198             }
199              
200             sub op_catch_error {
201 8     8   12 my ($selector) = @_;
202              
203             return sub {
204 8     8   12 my ($source) = @_;
205              
206             return rx_observable->new(sub {
207 6         7 my ($subscriber) = @_;
208              
209 6         11 my $own_subscription = RxPerl::Subscription->new;
210             my $own_subscriber = {
211             new_subscription => $own_subscription,
212             %$subscriber,
213             error => sub {
214 5         9 my ($err) = @_;
215              
216 5         7 my $new_o = do {
217 5         7 local $_ = $err;
218 5         8 $selector->(
219             $err,
220             $source->pipe(op_catch_error($selector)),
221             );
222             };
223              
224 5         25 my $new_subscriber = {
225             new_subscription => (my $new_subscription = RxPerl::Subscription->new),
226             %$subscriber,
227             };
228 5         11 $subscriber->subscription->add($new_subscription);
229 5         10 $new_o->subscribe($new_subscriber);
230             },
231 6         32 };
232              
233 6         14 $subscriber->subscription->add($own_subscription);
234              
235 6         14 $source->subscribe($own_subscriber);
236              
237 6         29 return;
238 8         28 });
239 8         32 };
240             }
241              
242             sub op_combine_latest_with {
243 1     1   21 my (@other_observables) = @_;
244              
245             return sub {
246 1     1   3 my $source = shift;
247              
248 1         8 return rx_combine_latest([$source, @other_observables]);
249             }
250 1         188 }
251              
252             sub op_concat_all {
253 3     3   10 return op_merge_all(1);
254             }
255              
256             sub op_concat_map {
257 2     2   5 my ($observable_factory) = @_;
258              
259             return sub {
260 2     2   4 my ($source) = @_;
261              
262             return $source->pipe(
263             op_map(sub {
264 6         11 my @args = @_;
265             return rx_defer(sub {
266 6         8 local $_ = $args[0];
267 6         15 $observable_factory->(@args);
268 6         21 });
269 2         30 }),
270             op_concat_all(),
271             );
272 2         10 };
273             }
274              
275             sub op_concat_with {
276 3     3   7 my @other_observables = @_;
277              
278             return sub {
279 3     3   5 my ($source) = @_;
280              
281 3         11 return rx_concat(
282             $source,
283             @other_observables,
284             );
285 3         13 };
286             }
287              
288             sub op_count {
289 3     3   10 my ($predicate) = @_;
290              
291             return sub {
292 3     3   5 my ($source) = @_;
293              
294             return rx_observable->new(sub {
295 3         6 my ($subscriber) = @_;
296              
297 3         4 my $count = 0;
298 3         5 my $idx = 0;
299              
300             my $own_subscriber = {
301             %$subscriber,
302             next => sub {
303 14         20 my ($v) = @_;
304 14         20 local $_ = $v;
305 14 100 66     44 if (!$predicate or $predicate->($v, $idx++)) {
306 7         32 $count++;
307             }
308             },
309             complete => sub {
310 3 50       13 $subscriber->{next}->($count) if defined $subscriber->{next};
311 3 50       9 $subscriber->{complete}->() if defined $subscriber->{complete};
312             },
313 3         21 };
314              
315 3         12 $source->subscribe($own_subscriber);
316              
317 3         14 return;
318 3         14 });
319 3         42 };
320             }
321              
322             sub op_debounce {
323 2     2   6 my ($duration_selector) = @_;
324              
325             return sub {
326 2     2   5 my ($source) = @_;
327              
328             return rx_observable->new(sub {
329 2         5 my ($subscriber) = @_;
330              
331 2         8 my $mini_subscription;
332             my $last_val;
333 2         0 my $has_last_val;
334              
335             my $mini_subscriber = {
336             next => sub {
337 5 50       37 $subscriber->{next}->($last_val) if defined $subscriber->{next};
338 5         13 undef $has_last_val;
339             },
340             error => sub {
341 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
342             },
343             complete => sub {
344 5         12 undef $mini_subscription;
345             },
346 2         22 };
347              
348             my $own_subscriber = {
349             next => sub {
350 12         32 my ($v) = @_;
351              
352 12 100       33 if (defined $mini_subscription) {
353 6         28 $mini_subscription->unsubscribe();
354             }
355              
356 12         28 $last_val = $v;
357 12         23 $has_last_val = 1;
358              
359 12         18 my $o = do { local $_ = $v; $duration_selector->($v) };
  12         25  
  12         34  
360 12         46 $mini_subscription = $o->pipe(
361             op_take(1),
362             )->subscribe($mini_subscriber);
363             },
364             error => sub {
365 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
366             },
367             complete => sub {
368 2 100       8 if ($has_last_val) {
369 1 50       9 $subscriber->{next}->($last_val) if defined $subscriber->{next};
370             }
371 2 50       15 $subscriber->{complete}->() if defined $subscriber->{complete};
372             }
373 2         23 };
374              
375 2         10 my $main_subscription = $source->subscribe($own_subscriber);
376 2         12 $main_subscription->add(\$mini_subscription);
377              
378 2         11 return $main_subscription, $mini_subscription;
379 2         16 });
380 2         16 };
381             }
382              
383             sub op_debounce_time {
384 2     2   7 my ($due_time) = @_;
385              
386 2     12   15 return op_debounce(sub { rx_timer($due_time) });
  12         40  
387             }
388              
389             sub op_default_if_empty {
390 7     7   31 my ($default_value) = @_;
391              
392             return sub {
393 7     7   13 my ($source) = @_;
394              
395             return rx_observable->new(sub {
396 7         10 my ($subscriber) = @_;
397              
398 7         11 my $source_emitted = 0;
399              
400             my $own_subscriber = {
401             %$subscriber,
402             next => sub {
403 3         5 $source_emitted = 1;
404 3 50       14 $subscriber->{next}->(@_) if exists $subscriber->{next};
405             },
406             complete => sub {
407 7 50 66     33 $subscriber->{next}->($default_value) if ! $source_emitted and exists $subscriber->{next};
408 7 50       22 $subscriber->{complete}->() if exists $subscriber->{complete};
409             },
410 7         35 };
411              
412 7         20 $source->subscribe($own_subscriber);
413              
414 7         24 return;
415 7         48 });
416 7         32 };
417             }
418              
419             sub op_delay {
420 708     708   1157 my ($delay) = @_;
421              
422 708         1664 my ($timer_sub, $cancel_timer_sub) = get_timer_subs;
423              
424             return sub {
425 708     708   1005 my ($source) = @_;
426              
427             return rx_observable->new(sub {
428 758         1412 my ($subscriber) = @_;
429              
430 758         1778 my %timers;
431             my $queue;
432 758         0 my $completed;
433             my $own_subscriber = {
434             error => sub {
435 1         2 my @value = @_;
436 1 50       6 $subscriber->{error}->(@value) if defined $subscriber->{error};
437             },
438             next => sub {
439 764         1624 my @value = @_;
440              
441 764 100       1381 if (!defined $queue) {
442 763         1105 $queue = [];
443 763         1039 my ($timer1, $timer2);
444             $timer1 = $timer_sub->(0, sub {
445 763         1257 delete $timers{$timer1};
446 763         1464 my @queue_copy = @$queue;
447 763         1309 undef $queue;
448             $timer2 = $timer_sub->($delay, sub {
449 728         1183 delete $timers{$timer2};
450 728         1278 foreach my $item (@queue_copy) {
451 729 100       2581 $subscriber->{next}->(@$item) if defined $subscriber->{next};
452             }
453 728 100 100     2427 if ($completed and ! %timers) {
454 723 100       2190 $subscriber->{complete}->() if defined $subscriber->{complete};
455             }
456 763         3311 });
457 763         1722 $timers{$timer2} = $timer2;
458 763         3601 });
459 763         1682 $timers{$timer1} = $timer1;
460             }
461 764         2861 push @$queue, \@value;
462             },
463             complete => sub {
464 757         1146 $completed = 1;
465 757 100       1595 if (! %timers) {
466 1 50       5 $subscriber->{complete}->() if defined $subscriber->{complete};
467             }
468             },
469 758         5759 };
470              
471             return [
472             $source->subscribe($own_subscriber),
473             sub {
474 758         1549 $cancel_timer_sub->($_) foreach values %timers;
475 758         1598 %timers = ();
476             },
477 758         2065 ];
478 708         2729 });
479 708         2921 };
480             }
481              
482             sub op_delay_when {
483 1     1   7 my ($delay_duration_selector) = @_;
484              
485             return sub {
486 1     1   4 my ($source) = @_;
487              
488             return rx_observable->new(sub {
489 1         3 my ($subscriber) = @_;
490              
491 1         2 my $idx = 0;
492 1         2 my %mini_subscriptions;
493             my $main_finished;
494              
495             my $make_mini_subscriber = sub {
496 3         6 my ($v) = @_;
497              
498 3         5 my $mini_subscription = RxPerl::Subscription->new;
499 3         7 $mini_subscriptions{$mini_subscription} = $mini_subscription;
500              
501             return {
502             new_subscription => $mini_subscription,
503             next => sub {
504 3 50       11 $subscriber->{next}->($v) if defined $subscriber->{next};
505             },
506             error => sub {
507 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
508             },
509             complete => sub {
510 3         7 delete $mini_subscriptions{$mini_subscription};
511 3 100 66     47 if ($main_finished and ! %mini_subscriptions) {
512 1 50       7 $subscriber->{complete}->() if defined $subscriber->{complete};
513             }
514             },
515 3         21 };
516 1         3 };
517              
518 1         4 my $own_subscription = RxPerl::Subscription->new;
519             my $own_subscriber = {
520             new_subscription => $own_subscription,
521             next => sub {
522 3         5 my ($v) = @_;
523              
524 3         4 local $_ = $v;
525 3         8 my $mini_obs = $delay_duration_selector->($v, $idx++);
526 3         6 $mini_obs->subscribe($make_mini_subscriber->($v));
527             },
528             error => sub {
529 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
530             },
531             complete => sub {
532 1         3 $main_finished = 1;
533 1 50       4 if (!%mini_subscriptions) {
534 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
535             }
536             },
537 1         7 };
538              
539 1         3 $subscriber->subscription->add($own_subscription, \%mini_subscriptions);
540              
541 1         4 $source->subscribe($own_subscriber);
542              
543 1         8 return;
544 1         7 });
545 1         13 };
546             }
547              
548             sub op_distinct {
549 2     2   3 my ($key_selector) = @_;
550              
551             return sub {
552 2     2   3 my ($source) = @_;
553              
554             return rx_observable->new(sub {
555 2         5 my ($subscriber) = @_;
556              
557 2         3 my %keys_passed;
558 2         3 $subscriber->subscription->add(sub { %keys_passed = () });
  2         6  
559              
560             my $own_subscriber = {
561             %$subscriber,
562             next => sub {
563 15         18 my ($v) = @_;
564              
565 15         12 my $k;
566 15 100       19 if ($key_selector) {
567 3         3 my $ok = eval { local $_ = $v; $k = $key_selector->($v); 1 };
  3         4  
  3         5  
  3         8  
568 3 50       5 if (! $ok) {
569 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
570 0         0 return;
571             }
572             } else {
573 12         11 $k = $v;
574             }
575 15 100       28 if (! exists $keys_passed{$k}) {
576 6         10 $keys_passed{$k} = 1;
577 6 50       25 $subscriber->{next}->($v) if defined $subscriber->{next};
578             }
579             },
580 2         9 };
581              
582 2         6 $source->subscribe($own_subscriber);
583              
584 2         16 return;
585 2         8 });
586 2         11 };
587             }
588              
589             sub op_distinct_until_changed {
590 5     5   16 my ($comparison_function) = @_;
591              
592 5   100     34 $comparison_function //= \&_eqq;
593              
594             return sub {
595 5     5   14 my ($source) = @_;
596              
597             return rx_observable->new(sub {
598 5         14 my ($subscriber) = @_;
599              
600 5         13 my $prev_value;
601 5         13 my $have_prev_value = 0;
602              
603             my $own_subscriber = {
604             %$subscriber,
605             next => sub {
606 36         73 my @value = @_;
607              
608 36 100 100     130 if (! $have_prev_value or ! $comparison_function->($prev_value, $value[0])) {
609 19 50       131 $subscriber->{next}->(@value) if defined $subscriber->{next};
610 19         36 $have_prev_value = 1;
611 19         62 $prev_value = $value[0];
612             }
613             },
614 5         49 };
615              
616 5         28 $source->subscribe($own_subscriber);
617              
618 5         49 return;
619 5         56 });
620 5         51 };
621             }
622              
623             sub op_distinct_until_key_changed {
624 1     1   5 my ($key) = @_;
625              
626             return op_distinct_until_changed(sub {
627 5     5   19 _eqq($_[0]->{$key}, $_[1]->{$key});
628 1         10 }),
629             }
630              
631             sub op_element_at {
632 2     2   5 my ($index, $default) = @_;
633 2         3 my $has_default = @_ >= 2;
634 2         4 $index = int $index;
635              
636             return sub {
637 2     2   3 my ($source) = @_;
638              
639 2 50       4 $index >= 0 or return rx_throw_error('ArgumentOutOfRangeError');
640              
641             return rx_observable->new(sub {
642 2         4 my ($subscriber) = @_;
643              
644 2         4 my $i = 0;
645             my $own_subscriber = {
646             %$subscriber,
647             next => sub {
648 6 100       20 if ($i++ == $index) {
649 1 50       6 $subscriber->{next}->(@_) if defined $subscriber->{next};
650 1 50       5 $subscriber->{complete}->() if defined $subscriber->{complete};
651             }
652             },
653             complete => sub {
654 1 50       4 if ($has_default) {
655 1 50       6 $subscriber->{next}->($default) if defined $subscriber->{next};
656 1 50       4 $subscriber->{complete}->() if defined $subscriber->{complete};
657             } else {
658 0 0       0 $subscriber->{error}->('ArgumentOutOfRangeError') if defined $subscriber->{error};
659             }
660             },
661 2         14 };
662              
663 2         8 $source->subscribe($own_subscriber);
664              
665 2         8 return;
666 2         8 });
667 2         8 };
668             }
669              
670             sub op_end_with {
671 0     0   0 my (@values) = @_;
672              
673             return sub {
674 0     0   0 my ($source) = @_;
675              
676 0         0 return rx_concat(
677             $source,
678             rx_of(@values),
679             );
680             }
681 0         0 }
682              
683             sub op_every {
684 2     2   5 my ($predicate) = @_;
685              
686             return sub {
687 2     2   5 my ($source) = @_;
688              
689             return rx_observable->new(sub {
690 2         3 my ($subscriber) = @_;
691              
692 2         4 my $idx = 0;
693             my $own_subscriber = {
694             %$subscriber,
695             next => sub {
696 8         11 my ($v) = @_;
697 8         12 local $_ = $v;
698 8 100       20 if (! $predicate->($v, $idx++)) {
699 1 50       8 $subscriber->{next}->(!!0) if defined $subscriber->{next};
700 1 50       5 $subscriber->{complete}->() if defined $subscriber->{complete};
701             }
702             },
703             complete => sub {
704 1 50       41 $subscriber->{next}->(!!1) if defined $subscriber->{next};
705 1 50       4 $subscriber->{complete}->() if defined $subscriber->{complete};
706             },
707 2         11 };
708              
709 2         6 $source->subscribe($own_subscriber);
710              
711 2         5 return;
712 2         9 });
713 2         16 };
714             }
715              
716             sub op_exhaust_all {
717             return sub {
718 2     2   5 my ($source) = @_;
719              
720             return rx_observable->new(sub {
721 2         5 my ($subscriber) = @_;
722              
723 2         3 my $active_subscription;
724             my $big_completed;
725 2         5 my $own_subscription = RxPerl::Subscription->new;
726              
727 2         6 $subscriber->subscription->add(
728             \$active_subscription,
729             $own_subscription,
730             );
731              
732             my $own_subscriber = {
733             new_subscription => $own_subscription,
734             next => sub {
735 10         13 my ($new_obs) = @_;
736              
737 10 100       72 !$active_subscription or return;
738 5         12 $active_subscription = RxPerl::Subscription->new;
739             my $small_subscriber = {
740             new_subscription => $active_subscription,
741             next => sub {
742 14 50       34 $subscriber->{next}->(@_) if defined $subscriber->{next};
743             },
744             error => sub {
745 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
746             },
747             complete => sub {
748 3         3 undef $active_subscription;
749 3 50 33     8 $subscriber->{complete}->() if $big_completed and defined $subscriber->{complete};
750             },
751 5         26 };
752 5         7 $new_obs->subscribe($small_subscriber);
753             },
754             error => sub {
755 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
756             },
757             complete => sub {
758 0         0 $big_completed = 1;
759 0 0 0     0 $subscriber->{complete}->() if !$active_subscription and defined $subscriber->{complete};
760             },
761 2         14 };
762              
763 2         6 $source->subscribe($own_subscriber);
764              
765 2         5 return;
766 2         10 });
767 2     2   9 };
768             }
769              
770             sub op_exhaust_map {
771 1     1   2 my ($observable_factory) = @_;
772              
773             return sub {
774 1     1   2 my ($source) = @_;
775              
776             return $source->pipe(
777             op_map(sub {
778 4         8 my @args = @_;
779             rx_defer(sub {
780 2         3 local $_ = $args[0];
781 2         5 $observable_factory->(@args);
782 4         17 });
783 1         6 }),
784             op_exhaust_all(),
785             );
786 1         5 };
787             }
788              
789             sub op_filter {
790 9     9   18 my ($filtering_sub) = @_;
791              
792             return sub {
793 9     9   16 my ($source) = @_;
794              
795             return rx_observable->new(sub {
796 9         18 my ($subscriber) = @_;
797              
798 9         32 my $own_subscriber = { %$subscriber };
799 9         20 my $idx = 0;
800             $own_subscriber->{next} = sub {
801 75         134 my ($value) = @_;
802 75         111 my $passes = eval {
803 75         135 local $_ = $value;
804 75         188 $filtering_sub->($value, $idx++);
805             };
806 75 50       410 if (my $error = $@) {
807 0         0 $subscriber->{error}->($error);
808             } else {
809 75 100 66     277 $subscriber->{next}->(@_) if $passes and defined $subscriber->{next};
810             }
811 9         36 };
812              
813 9         28 $source->subscribe($own_subscriber);
814              
815 9         29 return;
816 9         36 });
817 9         40 };
818             }
819              
820             sub op_finalize {
821 8     8   14 my ($fn) = @_;
822              
823             return sub {
824 8     8   10 my ($source) = @_;
825              
826             return rx_observable->new(sub {
827 8         10 my ($subscriber) = @_;
828              
829 8   100     21 my $arr = $subscriber->{_subscription}{_finalize_cbs} //= [];
830 8         11 unshift @$arr, $fn;
831 8         17 $subscriber->{_subscription}->add( $arr );
832              
833 8         43 $source->subscribe($subscriber);
834              
835 8         9 return;
836 8         52 });
837 8         47 };
838             }
839              
840             sub op_find {
841 2     2   4 my ($predicate) = @_;
842              
843             return sub {
844 2     2   3 my ($source) = @_;
845              
846 2 50       6 $predicate or return rx_throw_error('missing predicate in op_find');
847              
848 2         5 return $source->pipe(
849             op_first($predicate),
850             op_default_if_empty(undef),
851             );
852 2         9 };
853             }
854              
855             sub op_find_index {
856 2     2   5 my ($predicate) = @_;
857              
858             return sub {
859 2     2   4 my ($source) = @_;
860              
861 2 50       5 $predicate or return rx_throw_error('missing predicate in op_find_index');
862              
863             return rx_observable->new(sub {
864 2         4 my ($subscriber) = @_;
865              
866 2         4 my $idx = 0;
867             my $own_subscriber = {
868             %$subscriber,
869             next => sub {
870 13         16 my ($val) = @_;
871 13         16 my $truth;
872 13         17 my $ok = eval {
873 13         14 local $_ = $val;
874 13         26 $truth = $predicate->($val, $idx++);
875 13         34 1
876             };
877 13 50       23 if (!$ok) {
878 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
879             }
880 13 100       25 if ($truth) {
881 1 50       10 $subscriber->{next}->($idx - 1) if defined $subscriber->{next};
882 1 50       7 $subscriber->{complete}->() if defined $subscriber->{complete};
883             }
884             },
885             complete => sub {
886 1 50       5 $subscriber->{next}->(-1) if defined $subscriber->{next};
887 1 50       4 $subscriber->{complete}->() if defined $subscriber->{complete};
888             },
889 2         13 };
890              
891 2         6 $source->subscribe($own_subscriber);
892              
893 2         4 return;
894 2         7 });
895 2         8 };
896             }
897              
898             sub op_first {
899 7     7   18 my ($condition) = @_;
900              
901             return sub {
902 7     7   15 my ($source) = @_;
903              
904 7         43 my @pipes = (op_take(1));
905 7 100       27 unshift @pipes, op_filter($condition) if defined $condition;
906              
907 7         30 return $source->pipe(@pipes);
908 7         48 };
909             }
910              
911             sub op_ignore_elements {
912             return sub {
913 69     69   139 my ($source) = @_;
914              
915             return rx_observable->new(sub {
916 77         162 my ($subscriber) = @_;
917              
918 77         405 my %own_subscriber = %$subscriber;
919 77         213 delete $own_subscriber{next};
920              
921 77         314 $source->subscribe(\%own_subscriber);
922              
923 77         230 return;
924 69         307 });
925             }
926 69     69   334 }
927              
928             sub op_is_empty {
929             return sub {
930 2     2   5 my ($source) = @_;
931              
932 2         156 return $source->pipe(
933             op_first(),
934             op_map_to(!!0),
935             op_default_if_empty(!!1),
936             );
937 2     2   15 };
938             }
939              
940             sub op_last {
941 3     3   5 my ($predicate, $default) = @_;
942 3         5 my $has_default = @_ >= 2;
943              
944             return sub {
945 3     3   4 my ($source) = @_;
946              
947             return rx_observable->new(sub {
948 3         5 my ($subscriber) = @_;
949              
950 3         5 my $last_val;
951             my $last_val_obtained;
952              
953 3         3 my $idx = 0;
954             my $own_subscriber = {
955             %$subscriber,
956             next => sub {
957 5         8 my ($v) = @_;
958              
959 5 100       11 if ($predicate) {
960 3         5 my $passes;
961 3         4 my $ok = eval { local $_ = $v; $passes = $predicate->($v, $idx++); 1 };
  3         5  
  3         9  
  3         12  
962 3 50       6 $ok or do {
963 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
964 0         0 return;
965             };
966 3 100       9 if ($passes) {
967 1         2 $last_val = $v;
968 1         2 $last_val_obtained = 1;
969             }
970             } else {
971 2         2 $last_val = $v;
972 2         3 $last_val_obtained = 1;
973             }
974             },
975             complete => sub {
976 3 100       9 if (! $last_val_obtained) {
977 1 50       3 if ($has_default) {
978 1 50       5 $subscriber->{next}->($default) if defined $subscriber->{next};
979 1 50       4 $subscriber->{complete}->() if defined $subscriber->{complete};
980             } else {
981 0         0 $subscriber->{error}->("no last value found");
982             }
983             } else {
984 2 50       10 $subscriber->{next}->($last_val) if defined $subscriber->{next};
985 2 50       6 $subscriber->{complete}->() if defined $subscriber->{complete};
986             }
987             },
988 3         18 };
989              
990 3         7 $source->subscribe($own_subscriber);
991              
992 3         7 return;
993 3         9 });
994 3         11 };
995             }
996              
997             sub op_map {
998 36     36   79 my ($mapping_sub) = @_;
999              
1000             return sub {
1001 36     36   60 my ($source) = @_;
1002              
1003             return rx_observable->new(sub {
1004 27         44 my ($subscriber) = @_;
1005              
1006 27         115 my $own_subscriber = { %$subscriber };
1007 27         54 my $idx = 0;
1008             $own_subscriber->{next} = sub {
1009 116         192 my ($value) = @_;
1010 116         184 my $result = eval {
1011 116         163 local $_ = $value;
1012 116         305 $mapping_sub->($value, $idx++);
1013             };
1014 116 50       473 if (my $error = $@) {
1015 0 0       0 $subscriber->{error}->($error) if defined $subscriber->{error};
1016             } else {
1017 116 50       364 $subscriber->{next}->($result) if defined $subscriber->{next};
1018             }
1019 27         95 };
1020              
1021 27         82 $source->subscribe($own_subscriber);
1022              
1023 27         87 return;
1024 36         140 });
1025 36         261 };
1026             }
1027              
1028             sub op_map_to {
1029 2     2   4 my ($mapping_value) = @_;
1030              
1031             return sub {
1032 2     2   4 my ($source) = @_;
1033              
1034             return rx_observable->new(sub {
1035 2         4 my ($subscriber) = @_;
1036              
1037 2         7 my $own_subscriber = { %$subscriber };
1038             $own_subscriber->{next} &&= sub {
1039 1 50       5 $subscriber->{next}->($mapping_value) if defined $subscriber->{next};
1040 2   33     16 };
1041              
1042 2         5 $source->subscribe($own_subscriber);
1043              
1044 2         6 return;
1045 2         7 });
1046 2         9 };
1047             }
1048              
1049             sub op_max {
1050 0     0   0 my ($comparer) = @_;
1051              
1052             return sub {
1053 0     0   0 my ($source) = @_;
1054              
1055             return rx_observable->new(sub {
1056 0         0 my ($subscriber) = @_;
1057              
1058 0         0 my $curr_max;
1059             my $has_curr_max;
1060              
1061             my $own_subscriber = {
1062             %$subscriber,
1063             next => sub {
1064 0         0 my ($v) = @_;
1065              
1066 0 0       0 if (!$has_curr_max) {
1067 0         0 $curr_max = $v;
1068 0         0 $has_curr_max = 1;
1069             }
1070             else {
1071 0 0       0 if (!$comparer) {
1072 0 0       0 if ($v > $curr_max) {
1073 0         0 $curr_max = $v;
1074             }
1075             }
1076             else {
1077 0 0       0 if ($comparer->($v, $curr_max) > 0) {
1078 0         0 $curr_max = $v;
1079             }
1080             }
1081             }
1082             },
1083             complete => sub {
1084 0 0       0 if ($has_curr_max) {
1085 0 0       0 $subscriber->{next}->($curr_max) if defined $subscriber->{next};
1086             }
1087 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
1088             },
1089 0         0 };
1090              
1091 0         0 $source->subscribe($own_subscriber);
1092              
1093 0         0 return;
1094 0         0 });
1095 0         0 };
1096             }
1097              
1098             sub _op_merge_all_make_subscriber {
1099 17     17   28 my ($small_subscriptions, $subscriber, $stored_observables, $big_completed_ref) = @_;
1100              
1101 17         35 my $small_subscription = RxPerl::Subscription->new;
1102 17         36 $small_subscriptions->{$small_subscription} = $small_subscription;
1103             return {
1104             new_subscription => $small_subscription,
1105             next => sub {
1106 38 50   38   88 $subscriber->{next}->(@_) if defined $subscriber->{next};
1107             },
1108             error => sub {
1109 0 0   0   0 $subscriber->{error}->(@_) if defined $subscriber->{error};
1110             },
1111             complete => sub {
1112 15     15   30 delete $small_subscriptions->{$small_subscription};
1113 15 100 33     53 if (@$stored_observables) {
    50          
1114 12         17 my $new_obs = shift @$stored_observables;
1115 12         22 $new_obs->subscribe(
1116             _op_merge_all_make_subscriber(
1117             $small_subscriptions,
1118             $subscriber,
1119             $stored_observables,
1120             $big_completed_ref,
1121             ),
1122             );
1123             } elsif ($$big_completed_ref and !%$small_subscriptions) {
1124 3 50       10 $subscriber->{complete}->() if defined $subscriber->{complete};
1125             }
1126             },
1127 17         134 };
1128             }
1129              
1130             sub op_merge_all {
1131 5     5   9 my ($concurrent) = @_;
1132              
1133             return sub {
1134 5     5   7 my ($source) = @_;
1135              
1136             return rx_observable->new(sub {
1137 5         9 my ($subscriber) = @_;
1138              
1139 5         9 my @stored_observables;
1140             my %small_subscriptions;
1141 5         0 my $big_completed;
1142              
1143 5         10 my $own_subscription = RxPerl::Subscription->new;
1144 5         11 $subscriber->subscription->add(
1145             $own_subscription,
1146             \%small_subscriptions,
1147             );
1148              
1149             my $own_subscriber = {
1150             new_subscription => $own_subscription,
1151             next => sub {
1152 30         36 my ($new_observable) = @_;
1153              
1154 30         37 push @stored_observables, $new_observable;
1155 30 100 66     112 if (!defined $concurrent or keys(%small_subscriptions) < $concurrent) {
1156 5         6 my $new_obs = shift @stored_observables;
1157 5         16 $new_obs->subscribe(
1158             _op_merge_all_make_subscriber(
1159             \%small_subscriptions,
1160             $subscriber,
1161             \@stored_observables,
1162             \$big_completed,
1163             ),
1164             );
1165             }
1166             },
1167             error => sub {
1168 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
1169             },
1170             complete => sub {
1171 3         5 $big_completed = 1;
1172 3 50 33     145 $subscriber->{complete}->() if !%small_subscriptions and defined $subscriber->{complete};
1173             },
1174 5         29 };
1175              
1176 5         14 $source->subscribe($own_subscriber);
1177              
1178 5         19 return;
1179 5         22 });
1180 5         22 };
1181             }
1182              
1183             sub op_merge_map {
1184 0     0   0 my ($observable_factory) = @_;
1185              
1186             return sub {
1187 0     0   0 my ($source) = @_;
1188              
1189 0         0 return $source->pipe(
1190             op_map($observable_factory),
1191             op_merge_all(),
1192             );
1193 0         0 };
1194             }
1195              
1196             sub op_merge_with {
1197 1     1   5 my @other_sources = @_;
1198              
1199             return sub {
1200 1     1   3 my ($source) = @_;
1201              
1202 1         4 return rx_merge(
1203             $source,
1204             @other_sources,
1205             );
1206 1         8 };
1207             }
1208              
1209             sub op_min {
1210 0     0   0 my ($comparer) = @_;
1211              
1212             return sub {
1213 0     0   0 my ($source) = @_;
1214              
1215             return rx_observable->new(sub {
1216 0         0 my ($subscriber) = @_;
1217              
1218 0         0 my $curr_min;
1219             my $has_curr_min;
1220              
1221             my $own_subscriber = {
1222             %$subscriber,
1223             next => sub {
1224 0         0 my ($v) = @_;
1225              
1226 0 0       0 if (!$has_curr_min) {
1227 0         0 $curr_min = $v;
1228 0         0 $has_curr_min = 1;
1229             }
1230             else {
1231 0 0       0 if (!$comparer) {
1232 0 0       0 if ($v < $curr_min) {
1233 0         0 $curr_min = $v;
1234             }
1235             }
1236             else {
1237 0 0       0 if ($comparer->($v, $curr_min) < 0) {
1238 0         0 $curr_min = $v;
1239             }
1240             }
1241             }
1242             },
1243             complete => sub {
1244 0 0       0 if ($has_curr_min) {
1245 0 0       0 $subscriber->{next}->($curr_min) if defined $subscriber->{next};
1246             }
1247 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
1248             },
1249 0         0 };
1250              
1251 0         0 $source->subscribe($own_subscriber);
1252              
1253 0         0 return;
1254 0         0 });
1255 0         0 };
1256             }
1257              
1258             sub op_multicast {
1259 0     0   0 my ($subject_factory) = @_;
1260              
1261             return sub {
1262 0     0   0 my ($source) = @_;
1263              
1264 0         0 return RxPerl::ConnectableObservable->new($source, $subject_factory);
1265 0         0 };
1266             }
1267              
1268             sub op_on_error_resume_next_with {
1269 1     1   3 my @other_sources = @_;
1270              
1271             return sub {
1272 1     1   2 my ($source) = @_;
1273              
1274 1         3 return rx_on_error_resume_next(
1275             $source,
1276             @other_sources,
1277             );
1278 1         5 };
1279             }
1280              
1281             sub op_pairwise {
1282             return sub {
1283 0     0   0 my ($source) = @_;
1284              
1285             return rx_observable->new(sub {
1286 0         0 my ($subscriber) = @_;
1287              
1288 0         0 my $prev_value;
1289 0         0 my $have_prev_value = 0;
1290              
1291             my $own_subscriber = {
1292             %$subscriber,
1293             (
1294             next => sub {
1295 0         0 my ($value) = @_;
1296              
1297 0 0       0 if ($have_prev_value) {
1298 0 0       0 $subscriber->{next}->([$prev_value, $value]) if defined $subscriber->{next};
1299             } else {
1300 0         0 $have_prev_value = 1;
1301             }
1302              
1303 0         0 $prev_value = $value;
1304             }
1305             ) x!! defined $subscriber->{next},
1306 0         0 };
1307              
1308 0         0 $source->subscribe($own_subscriber);
1309              
1310 0         0 return;
1311 0         0 });
1312 0     0   0 };
1313             }
1314              
1315             sub op_pluck {
1316 1     1   4 my (@keys) = @_;
1317              
1318 1 50       23 croak 'List of properties cannot be empty,' unless @keys;
1319              
1320             return sub {
1321 1     1   3 my ($source) = @_;
1322              
1323             return rx_observable->new(sub {
1324 1         4 my ($subscriber) = @_;
1325              
1326             my $own_subscriber = {
1327             %$subscriber,
1328             next => sub {
1329 5         12 my (@value) = @_;
1330              
1331 5 50       12 if (! @value) {
1332 0 0       0 $subscriber->{next}->() if defined $subscriber->{next};
1333 0         0 return;
1334             }
1335              
1336 5         12 my $cursor = $value[0];
1337 5         10 foreach my $key (@keys) {
1338 7 100 100     42 if ((reftype($cursor) // '') eq 'HASH' and exists $cursor->{$key}) {
      100        
1339 4         10 $cursor = $cursor->{$key};
1340             } else {
1341 3 50       15 $subscriber->{next}->(undef) if defined $subscriber->{next};
1342 3         10 return;
1343             }
1344             }
1345              
1346 2 50       10 $subscriber->{next}->($cursor) if defined $subscriber->{next};
1347             },
1348 1         8 };
1349              
1350 1         5 $source->subscribe($own_subscriber);
1351 1         7 });
1352 1         10 };
1353             }
1354              
1355             sub op_race_with {
1356 1     1   3 my @other_sources = @_;
1357              
1358             return sub {
1359 1     1   2 my ($source) = @_;
1360              
1361 1         5 return rx_race(
1362             $source,
1363             @other_sources,
1364             );
1365 1         6 };
1366             }
1367              
1368             sub op_reduce {
1369 2     2   7 my ($accumulator, @seed) = @_;
1370              
1371             return sub {
1372 2     2   4 my ($source) = @_;
1373              
1374             return rx_observable->new(sub {
1375 2         7 my ($subscriber) = @_;
1376              
1377 2         7 my $got_first = @seed;
1378 2         3 my $acc;
1379 2 100       8 $acc = $seed[0] if $got_first;
1380              
1381 2         5 my $idx = 0;
1382             my $own_subscriber = {
1383             %$subscriber,
1384             next => sub {
1385 10         21 my ($v) = @_;
1386              
1387 10 100       24 if ($got_first) {
1388 9         16 my $ok = eval { $acc = $accumulator->($acc, $v, $idx++); 1 };
  9         29  
  9         35  
1389 9 50 33     32 $ok or $subscriber->{error}->($@) if defined $subscriber->{error};
1390             } else {
1391 1         2 $acc = $v;
1392 1         5 $got_first = 1;
1393             }
1394             },
1395             complete => sub {
1396 2 50 33     15 $subscriber->{next}->($acc) if $got_first and defined $subscriber->{next};
1397 2 50       10 $subscriber->{complete}->() if defined $subscriber->{complete};
1398             },
1399 2         19 };
1400              
1401 2         10 $source->subscribe($own_subscriber);
1402              
1403 2         7 return;
1404 2         12 });
1405 2         10 };
1406             }
1407              
1408             sub op_ref_count {
1409             return sub {
1410 0     0   0 my ($source) = @_;
1411              
1412 0 0       0 croak 'op_ref_count() was not applied to a connectable observable'
1413             unless $source->isa('RxPerl::ConnectableObservable');
1414              
1415 0         0 my $count = 0;
1416              
1417 0         0 my $connection_subscription;
1418             my $typical_unsubscription_fn = sub {
1419 0 0       0 if (--$count == 0) {
1420 0         0 $connection_subscription->unsubscribe;
1421             }
1422 0         0 };
1423              
1424             return rx_observable->new(sub {
1425 0         0 my ($subscriber) = @_;
1426              
1427 0         0 my $count_was = $count++;
1428              
1429 0 0       0 if ($count_was == 0) {
1430 0         0 $connection_subscription = RxPerl::Subscription->new;
1431              
1432 0         0 $subscriber->subscription->add($typical_unsubscription_fn);
1433 0         0 $source->subscribe($subscriber);
1434              
1435 0         0 $connection_subscription = $source->connect;
1436             } else {
1437 0         0 $subscriber->subscription->add($typical_unsubscription_fn);
1438 0         0 $source->subscribe($subscriber);
1439             }
1440              
1441 0         0 return;
1442 0         0 });
1443 0     0   0 };
1444             }
1445              
1446             sub _op_repeat_helper {
1447 8     8   22 my ($subscriber, $source, $count_ref, $own_subscription_ref) = @_;
1448              
1449 8         26 my $own_subscription = RxPerl::Subscription->new;
1450 8         17 $$own_subscription_ref = $own_subscription;
1451             my $own_subscriber = {
1452             new_subscription => $own_subscription,
1453             next => $subscriber->{next},
1454             error => $subscriber->{error},
1455             complete => sub {
1456 6 100   6   18 if (--$$count_ref) {
1457 5         16 _op_repeat_helper(
1458             $subscriber, $source, $count_ref, $own_subscription_ref,
1459             );
1460             } else {
1461 1 50       8 $subscriber->{complete}->() if defined $subscriber->{complete};
1462             }
1463             },
1464 8         64 };
1465              
1466 8         32 $source->subscribe($own_subscriber);
1467             }
1468              
1469             sub op_repeat {
1470 3     3   8 my ($count) = @_;
1471              
1472             return sub {
1473 3     3   7 my ($source) = @_;
1474              
1475             return rx_observable->new(sub {
1476 3         7 my ($subscriber) = @_;
1477              
1478 3         7 my $count = $count;
1479              
1480 3 50       11 $count = -1 if ! defined $count;
1481 3 50       10 if ($count == 0) {
1482 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
1483 0         0 return;
1484             }
1485              
1486 3         6 my $own_subscription;
1487 3         8 my $own_subscription_ref = \$own_subscription;
1488              
1489 3         10 $subscriber->subscription->add(
1490             $own_subscription_ref,
1491             );
1492              
1493 3         13 _op_repeat_helper(
1494             $subscriber, $source, \$count, $own_subscription_ref,
1495             );
1496              
1497 3         8 return;
1498 3         18 });
1499 3         20 };
1500             }
1501              
1502             sub _op_retry_helper {
1503 8     8   18 my ($subscriber, $source, $count_ref, $own_subscription_ref) = @_;
1504              
1505 8         27 my $own_subscription = RxPerl::Subscription->new;
1506 8         15 $$own_subscription_ref = $own_subscription;
1507             my $own_subscriber = {
1508             new_subscription => $own_subscription,
1509             next => $subscriber->{next},
1510             error => sub {
1511 7 100   7   26 if ($$count_ref--) {
1512 6         16 _op_retry_helper(
1513             $subscriber, $source, $count_ref, $own_subscription_ref,
1514             );
1515             } else {
1516 1 50       7 $subscriber->{error}->(@_) if defined $subscriber->{error};
1517             }
1518             },
1519             complete => $subscriber->{complete},
1520 8         50 };
1521              
1522 8         22 $source->subscribe($own_subscriber);
1523             }
1524              
1525             sub op_retry {
1526 2     2   9 my ($count) = @_;
1527              
1528             return sub {
1529 2     2   6 my ($source) = @_;
1530              
1531             return rx_observable->new(sub {
1532 2         7 my ($subscriber) = @_;
1533              
1534 2         5 my $count = $count;
1535              
1536 2 50       9 $count = -1 if ! defined $count;
1537              
1538 2         3 my $own_subscription;
1539 2         19 my $own_subscription_ref = \$own_subscription;
1540              
1541 2         11 $subscriber->subscription->add(
1542             $own_subscription_ref,
1543             );
1544              
1545 2         10 _op_retry_helper(
1546             $subscriber, $source, \$count, $own_subscription_ref,
1547             );
1548              
1549 2         6 return;
1550 2         41 });
1551 2         22 };
1552             }
1553              
1554             sub op_sample {
1555 0     0   0 my ($notifier) = @_;
1556              
1557             return sub {
1558 0     0   0 my ($source) = @_;
1559              
1560             return rx_observable->new(sub {
1561 0         0 my ($subscriber) = @_;
1562              
1563 0         0 my $last_val;
1564             my $has_last_val;
1565              
1566 0         0 my $notifier_subscription = RxPerl::Subscription->new;
1567             my $notifier_subscriber = {
1568             new_subscription => $notifier_subscription,
1569             next => sub {
1570 0 0       0 if ($has_last_val) {
1571 0 0       0 $subscriber->{next}->($last_val) if defined $subscriber->{next};
1572 0         0 undef $has_last_val;
1573             }
1574             },
1575             error => sub {
1576 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
1577             },
1578 0         0 };
1579              
1580 0         0 $subscriber->subscription->add($notifier_subscription);
1581              
1582             my $own_subscriber = {
1583             %$subscriber,
1584             next => sub {
1585 0         0 my ($v) = @_;
1586              
1587 0         0 $last_val = $v;
1588 0         0 $has_last_val = 1;
1589             },
1590 0         0 };
1591              
1592 0         0 $notifier->subscribe($notifier_subscriber);
1593 0         0 $source->subscribe($own_subscriber);
1594              
1595 0         0 return;
1596 0         0 });
1597 0         0 };
1598             }
1599              
1600             sub op_sample_time {
1601 0     0   0 my ($period) = @_;
1602              
1603 0         0 return op_sample(rx_interval($period));
1604             }
1605              
1606             sub op_scan {
1607 0     0   0 my ($accumulator_function, $seed) = @_;
1608 0         0 my $has_seed = @_ >= 2;
1609              
1610             return sub {
1611 0     0   0 my ($source) = @_;
1612              
1613             return rx_observable->new(sub {
1614 0         0 my ($subscriber) = @_;
1615              
1616 0         0 my $has_seed = $has_seed;
1617              
1618 0 0       0 my $acc; $acc = $seed if $has_seed;
  0         0  
1619 0         0 my $index = -1;
1620             my $own_subscriber = {
1621             %$subscriber,
1622             (
1623             next => sub {
1624 0         0 my ($value) = @_;
1625              
1626 0 0       0 if (! $has_seed) {
1627 0         0 $acc = $value;
1628 0         0 $has_seed = 1;
1629             } else {
1630 0         0 ++$index;
1631 0         0 $acc = $accumulator_function->($acc, $value, $index);
1632             }
1633              
1634 0 0       0 $subscriber->{next}->($acc) if defined $subscriber->{next};
1635             },
1636             ) x!! defined $subscriber->{next},
1637 0         0 };
1638              
1639 0         0 $source->subscribe($own_subscriber);
1640              
1641 0         0 return;
1642 0         0 });
1643 0         0 };
1644             }
1645              
1646             sub op_share {
1647             return (
1648 0     0   0 op_multicast(sub { rx_subject->new }),
  0     0   0  
1649             op_ref_count(),
1650             );
1651             }
1652              
1653             sub op_single {
1654 2     2   3 my ($predicate) = @_;
1655              
1656             return sub {
1657 2     2   3 my ($source) = @_;
1658              
1659             return rx_observable->new(sub {
1660 2         3 my ($subscriber) = @_;
1661              
1662 2         3 my @found;
1663              
1664 2         3 my $idx = 0;
1665             my $own_subscriber = {
1666             %$subscriber,
1667             next => sub {
1668 3         6 my ($v) = @_;
1669              
1670 3 100       8 if (!$predicate) {
1671 1         2 push @found, $v;
1672             } else {
1673 2         2 my $found;
1674 2         5 my $ok = eval { local $_ = $v; $found = $predicate->($v, $idx++); 1 };
  2         5  
  2         7  
  2         9  
1675 2 50       6 if (! $ok) {
1676 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
1677 0         0 return;
1678             }
1679 2 100       8 push @found, $v if $found;
1680             }
1681              
1682 3 50 33     11 $subscriber->{error}->('Too many values match') if @found > 1 and defined $subscriber->{error};
1683             },
1684             complete => sub {
1685 2 50       6 if (! @found) {
1686 0 0       0 $subscriber->{error}->('No values match') if defined $subscriber->{error};
1687             } else {
1688 2 50       9 $subscriber->{next}->($found[0]) if defined $subscriber->{next};
1689 2 50       6 $subscriber->{complete}->() if defined $subscriber->{complete};
1690             }
1691             },
1692 2         13 };
1693              
1694 2         5 $source->subscribe($own_subscriber);
1695              
1696 2         5 return;
1697 2         9 });
1698 2         9 };
1699             }
1700              
1701             sub op_skip {
1702 1     1   5 my ($count) = @_;
1703              
1704             return sub {
1705 1     1   3 my ($source) = @_;
1706              
1707             return rx_observable->new(sub {
1708 1         3 my ($subscriber) = @_;
1709              
1710 1         3 my $count = int $count;
1711              
1712 1         4 my $own_subscriber;
1713 1 50       4 $own_subscriber = $subscriber if $count <= 0;
1714             $own_subscriber //= {
1715             %$subscriber,
1716             next => sub {
1717 5 100       16 if ($count-- <= 0) {
1718 2 50       9 $subscriber->{next}->(@_) if defined $subscriber->{next};
1719             }
1720             },
1721 1   50     13 };
1722              
1723 1         5 $source->subscribe($own_subscriber);
1724              
1725 1         9 return;
1726 1         9 });
1727 1         9 };
1728             }
1729              
1730             sub op_skip_last {
1731 1     1   3 my ($skip_count) = @_;
1732              
1733             return sub {
1734 1     1   3 my ($source) = @_;
1735              
1736             return rx_observable->new(sub {
1737 1         2 my ($subscriber) = @_;
1738              
1739 1         2 my @skipped;
1740 1         4 $subscriber->subscription->add(sub { undef @skipped });
  1         4  
1741              
1742 1         4 my $own_subscriber = { %$subscriber };
1743             $own_subscriber->{next} &&= sub {
1744 4         8 my ($v) = @_;
1745              
1746 4         32 push @skipped, $v;
1747 4 100       12 if (@skipped > $skip_count) {
1748 2         5 my $new_v = shift @skipped;
1749 2 50       8 $subscriber->{next}->($new_v) if defined $subscriber->{next};
1750             }
1751 1   33     8 };
1752              
1753 1         2 $source->subscribe($own_subscriber);
1754              
1755 1         2 return;
1756 1         6 });
1757 1         27 };
1758             }
1759              
1760             sub op_skip_until {
1761 6     6   9 my ($notifier) = @_;
1762              
1763             # FUTURE TODO: allow notifier to be a promise
1764 6 50       13 croak q"You provided 'undef' where a stream was expected. You can provide an observable."
1765             unless defined $notifier;
1766 6 50 33     32 croak q"The notifier of 'op_skip_until' needs to be an observable."
1767             unless blessed $notifier and $notifier->isa('RxPerl::Observable');
1768              
1769             return sub {
1770 6     6   10 my ($source) = @_;
1771              
1772             return rx_observable->new(sub {
1773 6         9 my ($subscriber) = @_;
1774              
1775 6         7 my $notifier_has_emitted;
1776             my $n_s = $notifier->pipe(
1777             op_take(1),
1778             )->subscribe(
1779             sub {
1780 3         7 $notifier_has_emitted = 1;
1781             },
1782             sub {
1783 1 50       8 $subscriber->{error}->(@_) if defined $subscriber->{error};
1784             },
1785 6         15 );
1786              
1787             my $own_subscriber = {
1788             %$subscriber,
1789             next => sub {
1790             $subscriber->{next}->(@_) if defined $subscriber->{next}
1791 32 100 66     95 and $notifier_has_emitted;
1792             },
1793 6         39 };
1794              
1795 6         34 $source->subscribe($own_subscriber);
1796              
1797 6         16 return $n_s;
1798 6         20 });
1799 6         24 };
1800             }
1801              
1802             sub op_skip_while {
1803 1     1   2 my ($predicate) = @_;
1804              
1805             return sub {
1806 1     1   2 my ($source) = @_;
1807              
1808             return rx_observable->new(sub {
1809 1         4 my ($subscriber) = @_;
1810              
1811 1         2 my $finished_skipping = 0;
1812              
1813 1         2 my $idx = 0;
1814             my $own_subscriber = {
1815             %$subscriber,
1816             next => sub {
1817 6         8 my ($v) = @_;
1818              
1819 6         7 my $should_display;
1820 6 100       8 if ($finished_skipping) {
1821 3         3 $should_display = 1;
1822             } else {
1823 3         3 my $satisfies_predicate;
1824 3         4 my $ok = eval { local $_ = $v; $satisfies_predicate = $predicate->($v, $idx++); 1 };
  3         4  
  3         5  
  3         8  
1825 3 50       4 $ok or do {
1826 0 0       0 $subscriber->{error}->($@) if defined $subscriber->{error};
1827 0         0 return;
1828             };
1829 3 100       6 if (! $satisfies_predicate) {
1830 1         2 $finished_skipping = 1;
1831 1         2 $should_display = 1;
1832             }
1833             }
1834              
1835 6 100 66     18 $subscriber->{next}->(@_) if $should_display and defined $subscriber->{next};
1836             }
1837 1         5 };
1838              
1839 1         3 $source->subscribe($own_subscriber);
1840              
1841 1         8 return;
1842 1         5 });
1843 1         6 };
1844             }
1845              
1846             sub op_start_with {
1847 3     3   12 my (@values) = @_;
1848              
1849             return sub {
1850 3     3   11 my ($source) = @_;
1851              
1852 3         15 return rx_concat(
1853             rx_of(@values),
1854             $source,
1855             );
1856 3         21 };
1857             }
1858              
1859             sub op_switch_all {
1860             return sub {
1861 1     1   2 my ($source) = @_;
1862              
1863             return rx_observable->new(sub {
1864 1         3 my ($subscriber) = @_;
1865              
1866 1         3 my $old_subscription;
1867              
1868             my $chief_is_complete;
1869 1         0 my $sub_is_complete;
1870              
1871             my $obs_subscriber = {
1872             next => sub {
1873 6 50       15 $subscriber->{next}->(@_) if defined $subscriber->{next};
1874             },
1875             error => sub {
1876 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
1877             },
1878             complete => sub {
1879 0         0 $sub_is_complete = 1;
1880 0 0 0     0 $subscriber->{complete}->() if $chief_is_complete and defined $subscriber->{complete};
1881             },
1882 1         6 };
1883              
1884 1         3 my $own_subscription = RxPerl::Subscription->new;
1885 1         3 $subscriber->subscription->add(\$old_subscription, $own_subscription);
1886              
1887             my $own_subscriber = {
1888             new_subscription => $own_subscription,
1889             next => sub {
1890 3         4 my ($new_observable) = @_;
1891              
1892 3         3 $sub_is_complete = 0;
1893 3 100       9 $old_subscription->unsubscribe() if $old_subscription;
1894 3         6 $old_subscription = $new_observable->subscribe($obs_subscriber);
1895             },
1896             error => sub {
1897 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
1898             },
1899             complete => sub {
1900 1         1 $chief_is_complete = 1;
1901 1 50 33     5 $subscriber->{complete}->() if $sub_is_complete and defined $subscriber->{complete};
1902             },
1903 1         6 };
1904              
1905 1         3 $source->subscribe($own_subscriber);
1906              
1907 1         3 return;
1908 1         5 });
1909 1     1   3 };
1910             }
1911              
1912             sub op_switch_map {
1913 0     0   0 my ($observable_factory) = @_;
1914              
1915             return sub {
1916 0     0   0 my ($source) = @_;
1917              
1918 0         0 return $source->pipe(
1919             op_map($observable_factory),
1920             op_switch_all(),
1921             );
1922             }
1923 0         0 }
1924              
1925             sub op_take {
1926 95     95   170 my ($count) = @_;
1927              
1928 95 50       249 croak 'negative argument passed to op_take' unless $count >= 0;
1929              
1930             return sub {
1931 95     95   160 my ($source) = @_;
1932              
1933             return rx_observable->new(sub {
1934 79         140 my ($subscriber) = @_;
1935              
1936 79         168 my $remaining = int $count;
1937              
1938 79 100       182 if ($remaining == 0) {
1939 1 50       10 $subscriber->{complete}->() if defined $subscriber->{complete};
1940 1         4 return;
1941             }
1942              
1943             my $own_subscriber = {
1944             %$subscriber,
1945             next => sub {
1946 215 100       811 $subscriber->{next}->(@_) if defined $subscriber->{next};
1947 215 100 100     717 $subscriber->{complete}->() if --$remaining == 0 and defined $subscriber->{complete};
1948             },
1949 78         449 };
1950              
1951 78         315 $source->subscribe($own_subscriber);
1952              
1953 78         265 return;
1954 95         361 });
1955 95         600 };
1956             }
1957              
1958             sub op_take_last {
1959 2     2   6 my ($count) = @_;
1960              
1961             return sub {
1962 2     2   4 my ($source) = @_;
1963              
1964             return rx_observable->new(sub {
1965 2         3 my ($subscriber) = @_;
1966              
1967 2         3 my @last_values;
1968              
1969             my $own_subscriber = {
1970             %$subscriber,
1971             next => sub {
1972 7         7 my ($v) = @_;
1973              
1974 7         9 push @last_values, $v;
1975 7 100       12 if (@last_values > $count) {
1976 2         6 shift @last_values;
1977             }
1978             },
1979             complete => sub {
1980 2         3 foreach my $last_val (@last_values) {
1981 5 50       12 $subscriber->{next}->($last_val) if defined $subscriber->{next};
1982             }
1983 2 50       5 $subscriber->{complete}->() if defined $subscriber->{complete};
1984             },
1985 2         10 };
1986              
1987 2         7 $source->subscribe($own_subscriber);
1988              
1989 2         12 return;
1990 2         10 });
1991             }
1992 2         12 }
1993              
1994             sub op_take_until {
1995 1     1   7 my ($notifier_observable) = @_;
1996              
1997             return sub {
1998 1     1   2 my ($source) = @_;
1999              
2000             return rx_observable->new(sub {
2001 1         2 my ($subscriber) = @_;
2002              
2003             my $n_s = $notifier_observable->subscribe(
2004             sub {
2005 1 50       15 $subscriber->{complete}->() if defined $subscriber->{complete};
2006             },
2007             sub {
2008 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
2009             },
2010 1         7 );
2011              
2012 1         4 $source->subscribe($subscriber);
2013              
2014 1         4 return $n_s;
2015 1         6 });
2016 1         6 };
2017             }
2018              
2019             sub op_take_while {
2020 2     2   8 my ($cond, $include) = @_;
2021              
2022             return sub {
2023 2     2   8 my ($source) = @_;
2024              
2025             return rx_observable->new(sub {
2026 2         6 my ($subscriber) = @_;
2027              
2028 2         5 my $i = 0;
2029             my $own_subscriber = {
2030             %$subscriber,
2031             next => sub {
2032 6         43 my ($value) = @_;
2033              
2034 6 100       15 if (! do { local $_ = $value; $cond->($value, $i++) }) {
  6         15  
  6         33  
2035 2 100 66     25 $subscriber->{next}->($value) if $include and defined $subscriber->{next};
2036 2 50       16 $subscriber->{complete}->() if defined $subscriber->{complete};
2037 2         46 return;
2038             }
2039              
2040 4 50       47 $subscriber->{next}->(@_) if defined $subscriber->{next};
2041             },
2042 2         23 };
2043              
2044 2         12 $source->subscribe($own_subscriber);
2045              
2046 2         9 return;
2047 2         16 });
2048 2         18 };
2049             }
2050              
2051             sub op_tap {
2052 0     0   0 my @args = @_;
2053              
2054             return sub {
2055 0     0   0 my ($source) = @_;
2056              
2057             return rx_observable->new(sub {
2058 0         0 my ($subscriber) = @_;
2059              
2060 0         0 my @args = @args;
2061 0 0 0     0 my $tap_subscriber; $tap_subscriber = $args[0] if (reftype($args[0]) // '') eq 'HASH';
  0         0  
2062             $tap_subscriber //= {
2063 0   0     0 map {($_, shift @args)} qw/ next error complete /
  0         0  
2064             };
2065              
2066 0         0 my %own_keys = map {$_ => 1} grep { /^(next|error|complete)\z/ } (keys(%$tap_subscriber), keys(%$subscriber));
  0         0  
  0         0  
2067              
2068             my $own_subscriber = {
2069             %$subscriber,
2070             map {
2071 0         0 my $key = $_;
  0         0  
2072             ($key => sub {
2073 0 0       0 $tap_subscriber->{$key}->(@_) if defined $tap_subscriber->{$key};
2074 0 0       0 $subscriber->{$key}->(@_) if defined $subscriber->{$key};
2075 0         0 });
2076             } keys %own_keys
2077             };
2078              
2079 0         0 $source->subscribe($own_subscriber);
2080              
2081 0         0 return;
2082 0         0 });
2083 0         0 };
2084             }
2085              
2086             sub op_throttle {
2087 0     0   0 my ($duration_selector) = @_;
2088              
2089             return sub {
2090 0     0   0 my ($source) = @_;
2091              
2092             return rx_observable->new(sub {
2093 0         0 my ($subscriber) = @_;
2094              
2095 0         0 my $mini_subscription;
2096              
2097             my $mini_subscriber = {
2098             error => sub {
2099 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
2100             },
2101             complete => sub {
2102 0         0 undef $mini_subscription;
2103             },
2104 0         0 };
2105              
2106             my $own_subscriber = {
2107             %$subscriber,
2108             next => sub {
2109 0         0 my ($v) = @_;
2110              
2111 0 0       0 if (! $mini_subscription) {
2112 0 0       0 $subscriber->{next}->(@_) if defined $subscriber->{next};
2113 0         0 $mini_subscription = do { local $_ = $v; $duration_selector->($v) }->pipe(
  0         0  
  0         0  
2114             op_take(1),
2115             )->subscribe($mini_subscriber);
2116             }
2117             },
2118 0         0 };
2119              
2120 0         0 $subscriber->subscription->add(\$mini_subscription);
2121              
2122 0         0 $source->subscribe($own_subscriber);
2123              
2124 0         0 return;
2125 0         0 });
2126 0         0 };
2127             }
2128              
2129             sub op_throttle_time {
2130 0     0   0 my ($duration) = @_;
2131              
2132 0     0   0 return op_throttle(sub { rx_timer($duration) });
  0         0  
2133             }
2134              
2135             sub op_throw_if_empty {
2136 0     0   0 my ($error_factory) = @_;
2137              
2138             return sub {
2139 0     0   0 my ($source) = @_;
2140              
2141             return rx_observable->new(sub {
2142 0         0 my ($subscriber) = @_;
2143              
2144 0         0 my $is_empty = 1;
2145              
2146             my $own_subscriber = {
2147             %$subscriber,
2148             next => sub {
2149 0         0 $is_empty = 0;
2150 0 0       0 $subscriber->{next}->(@_) if defined $subscriber->{next};
2151             },
2152             complete => sub {
2153 0 0       0 if ($is_empty) {
2154 0 0       0 $subscriber->{error}->($error_factory->()) if defined $subscriber->{error};
2155             } else {
2156 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
2157             }
2158             },
2159 0         0 };
2160              
2161 0         0 $source->subscribe($own_subscriber);
2162              
2163 0         0 return;
2164 0         0 });
2165 0         0 };
2166             }
2167              
2168             sub op_time_interval {
2169             return sub {
2170 0     0   0 my ($source) = @_;
2171              
2172 0         0 my $t0 = Time::HiRes::time();
2173              
2174             return $source->pipe(
2175             op_map(sub {
2176 0         0 my $now = Time::HiRes::time();
2177 0         0 my $interval = $now - $t0;
2178 0         0 $t0 = $now;
2179 0         0 return { value => $_, interval => $interval };
2180 0         0 }),
2181             );
2182 0     0   0 };
2183             }
2184              
2185             sub op_timeout {
2186 0     0   0 my ($duration) = @_;
2187              
2188             return sub {
2189 0     0   0 my ($source) = @_;
2190              
2191             return rx_observable->new(sub {
2192 0         0 my ($subscriber) = @_;
2193              
2194 0         0 my $subject = rx_behavior_subject->new(1);
2195             my $s_s = $subject->pipe(
2196 0         0 op_switch_map(sub { rx_timer($duration) }),
2197             )->subscribe(sub {
2198 0 0       0 $subscriber->{error}->('Timeout has occurred') if defined $subscriber->{error};
2199 0         0 });
2200              
2201 0         0 my $own_subscription = RxPerl::Subscription->new;
2202 0         0 $subscriber->subscription->add($own_subscription, $s_s);
2203              
2204             my $own_subscriber = {
2205             new_subscription => $own_subscription,
2206             next => sub {
2207 0 0       0 $subject->{next}->(1) if defined $subject->{next};
2208 0 0       0 $subscriber->{next}->(@_) if defined $subscriber->{next};
2209             },
2210             error => sub {
2211 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
2212             },
2213             complete => sub {
2214 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
2215             }
2216 0         0 };
2217              
2218 0         0 $source->subscribe($own_subscriber);
2219              
2220 0         0 return;
2221 0         0 });
2222 0         0 };
2223             }
2224              
2225             sub op_timestamp {
2226             return op_map(sub {
2227             return {
2228 0     0   0 value => $_,
2229             timestamp => Time::HiRes::time(),
2230             };
2231 0     0   0 });
2232             }
2233              
2234             sub op_to_array {
2235             return sub {
2236 0     0   0 my ($source) = @_;
2237              
2238             return rx_observable->new(sub {
2239 0         0 my ($subscriber) = @_;
2240              
2241 0         0 my @values;
2242              
2243             my $own_subscriber = {
2244             %$subscriber,
2245             next => sub {
2246 0         0 my ($v) = @_;
2247 0         0 push @values, $v;
2248             },
2249             complete => sub {
2250 0 0       0 $subscriber->{next}->(\@values) if defined $subscriber->{next};
2251 0 0       0 $subscriber->{complete}->() if defined $subscriber->{complete};
2252             },
2253 0         0 };
2254              
2255 0         0 $source->subscribe($own_subscriber);
2256              
2257 0         0 return;
2258 0         0 });
2259 0     0   0 };
2260             }
2261              
2262             sub op_with_latest_from {
2263 1     1   11 my (@other_observables) = @_;
2264              
2265             return sub {
2266 1     1   5 my ($source) = @_;
2267              
2268             return rx_observable->new(sub {
2269 1         4 my ($subscriber) = @_;
2270              
2271 1         5 my @other_observables = @other_observables;
2272 1         2 my $i = 0;
2273 1         5 my %didnt_emit = map {($i++, 1)} @other_observables;
  1         7  
2274 1         3 my @latest_values;
2275             my %other_subscriptions;
2276              
2277             $subscriber->subscription->add(
2278 1         7 \%other_subscriptions, sub { undef @other_observables },
2279 1         7 );
2280              
2281 1         6 for (my $i = 0; $i < @other_observables; $i++) {
2282 1         3 my $j = $i;
2283 1         3 my $other_observable = $other_observables[$j];
2284              
2285 1         5 my $other_subscription = RxPerl::Subscription->new;
2286 1         5 $other_subscriptions{$other_subscription} = $other_subscription;
2287             $other_observable->subscribe({
2288             new_subscription => $other_subscription,
2289             next => sub {
2290 4         13 my ($value) = @_;
2291              
2292 4         12 $latest_values[$j] = $value;
2293 4         14 delete $didnt_emit{$j};
2294             },
2295             error => $subscriber->{error},
2296 1         13 });
2297             }
2298              
2299             $source->subscribe({
2300             %$subscriber,
2301             next => sub {
2302 5         17 my ($value) = @_;
2303              
2304 5 100       16 if (! %didnt_emit) {
2305 4 50       24 $subscriber->{next}->([$value, @latest_values]) if defined $subscriber->{next};
2306             }
2307             },
2308 1         15 });
2309              
2310 1         7 return;
2311 1         10 });
2312 1         11 };
2313             }
2314              
2315             sub op_zip_with {
2316 1     1   2 my @other_sources = @_;
2317              
2318             return sub {
2319 1     1   2 my ($source) = @_;
2320              
2321 1         4 return rx_zip(
2322             $source,
2323             @other_sources,
2324             );
2325 1         6 };
2326             }
2327              
2328             sub _eqq {
2329 20     20   47 my ($x, $y) = @_;
2330              
2331 20 100       91 defined $x or return !defined $y;
2332 17 100       47 defined $y or return !!0;
2333 15 100       45 ref $x eq ref $y or return !!0;
2334 14 100       76 return length(ref $x) ? refaddr $x == refaddr $y : $x eq $y;
2335             }
2336              
2337             1;