File Coverage

blib/lib/RxPerl/Extras.pm
Criterion Covered Total %
statement 85 88 96.5
branch 16 26 61.5
condition 5 9 55.5
subroutine 13 13 100.0
pod 4 4 100.0
total 123 140 87.8


line stmt bran cond sub pod time code
1             package RxPerl::Extras;
2              
3 2     2   1178086 use strict;
  2         5  
  2         90  
4 2     2   13 use warnings;
  2         4  
  2         193  
5              
6 2     2   1177 use RxPerl::Operators::Creation 'rx_observable', 'rx_timer', 'rx_defer';
  2         23271  
  2         194  
7 2     2   943 use RxPerl::Operators::Pipeable 'op_map', 'op_take';
  2         15784  
  2         200  
8              
9 2     2   18 use Exporter 'import';
  2         3  
  2         2629  
10             our @EXPORT_OK = qw/
11             op_exhaust_all_with_latest op_exhaust_map_with_latest
12             op_throttle_time_with_both_leading_and_trailing
13             op_throttle_with_both_leading_and_trailing
14             /;
15             our %EXPORT_TAGS = (all => \@EXPORT_OK);
16              
17             our $VERSION = "v0.0.5";
18              
19             sub op_exhaust_all_with_latest {
20             return sub {
21 2     2   41 my ($source) = @_;
22              
23             return rx_observable->new(sub {
24 2         398 my ($subscriber) = @_;
25              
26 2         4 my $active_subscription;
27             my $big_completed;
28 2         9 my $own_subscription = RxPerl::Subscription->new;
29              
30 2         32 my ($owed_val, $val_is_owed);
31 2         0 my $helper_sub;
32              
33             $subscriber->subscription->add(
34             \$active_subscription,
35             $own_subscription,
36 2         362 sub { undef $helper_sub }, # break ref cycle
37 2         41 );
38              
39             $helper_sub = sub {
40 18         400 my ($new_obs) = @_;
41              
42 18 100       59 !$active_subscription or do {
43 10         51 $owed_val = $new_obs;
44 10         81 $val_is_owed = 1;
45 10         34 return;
46             };
47              
48 8         28 $active_subscription = RxPerl::Subscription->new;
49             my $small_subscriber = {
50             new_subscription => $active_subscription,
51             next => sub {
52 8 50       3589 $subscriber->{next}->(@_) if defined $subscriber->{next};
53             },
54             error => sub {
55 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
56             },
57             complete => sub {
58 7         124 undef $active_subscription;
59 7 100       16 if ($val_is_owed) {
60 4         8 $val_is_owed = 0;
61 4         11 $helper_sub->($owed_val);
62             } else {
63 3 100 66     18 $subscriber->{complete}->() if $big_completed and defined $subscriber->{complete};
64             }
65             },
66 8         183 };
67 8         32 $new_obs->subscribe($small_subscriber);
68 2         81 };
69              
70             my $own_subscriber = {
71             new_subscription => $own_subscription,
72             next => $helper_sub,
73             error => sub {
74 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
75             },
76             complete => sub {
77 1         25 $big_completed = 1;
78 1 50 33     9 $subscriber->{complete}->() if !$active_subscription and defined $subscriber->{complete};
79             },
80 2         41 };
81              
82 2         11 $source->subscribe($own_subscriber);
83              
84 2         872 return;
85 2         14 });
86             }
87 2     2 1 27 }
88              
89             sub op_exhaust_map_with_latest {
90 2     2 1 380957 my ($observable_factory) = @_;
91              
92             return sub {
93 2     2   126 my ($source) = @_;
94              
95             return $source->pipe(
96             op_map(sub {
97 14         9597 my @args = @_;
98             rx_defer(sub {
99 8         473 local $_ = $args[0];
100 8         24 $observable_factory->(@args);
101 14         87 });
102 2         19 }),
103             op_exhaust_all_with_latest(),
104             );
105 2         22 };
106             }
107              
108             sub op_throttle_time_with_both_leading_and_trailing {
109 3     3 1 17547 my ($duration) = @_;
110              
111 3     4   14 return op_throttle_with_both_leading_and_trailing(sub { rx_timer($duration) });
  4         14  
112             }
113              
114             sub op_throttle_with_both_leading_and_trailing {
115 3     3 1 7 my ($duration_selector) = @_;
116              
117             return sub {
118 3     3   48 my ($source) = @_;
119              
120             return rx_observable->new(sub {
121 3         412 my ($subscriber) = @_;
122              
123 3         9 my ($owed_val, $val_is_owed);
124 3         0 my ($helper_sub, $mini_subscriber, $mini_subscription);
125              
126 3         13 my $own_subscription = RxPerl::Subscription->new;
127              
128             $subscriber->subscription->add(
129             $own_subscription,
130             \$mini_subscription,
131 3         201 sub { undef $helper_sub }, # break ref cycle
132 3         30 );
133              
134             $mini_subscriber = {
135             error => sub {
136 0 0       0 $subscriber->{error}->(@_) if defined $subscriber->{error};
137             },
138             complete => sub {
139 2         281 undef $mini_subscription;
140 2 100       40 if ($val_is_owed) {
141 1         13 $val_is_owed = 0;
142 1         4 $helper_sub->($owed_val);
143             }
144             },
145 3         103 };
146              
147             $helper_sub = sub {
148 7         740 my ($v) = @_;
149              
150 7 100       23 if ($mini_subscription) {
151 3         6 $owed_val = $v;
152 3         7 $val_is_owed = 1;
153             } else {
154 4         6 $mini_subscription = do { local $_ = $v; $duration_selector->($v) }->pipe(
  4         8  
  4         13  
155             op_take(1),
156             )->subscribe($mini_subscriber);
157 4 50       1281 $subscriber->{next}->(@_) if defined $subscriber->{next};
158             }
159 3         13 };
160              
161             my $own_subscriber = {
162             new_subscription => $own_subscription,
163             %$subscriber,
164             next => $helper_sub,
165             complete => sub {
166 2 100 66     227 $subscriber->{next}->($owed_val), $val_is_owed = 0 if $val_is_owed and defined $subscriber->{next};
167 2 50       54 $subscriber->{complete}->() if defined $subscriber->{complete};
168             },
169 3         23 };
170              
171 3         13 $source->subscribe($own_subscriber);
172              
173 3         843 return;
174 3         18 });
175 3         23 };
176             }
177              
178              
179             1;
180             __END__