File Coverage

blib/lib/BusyBird/Timeline.pm
Criterion Covered Total %
statement 166 170 97.6
branch 36 42 85.7
condition 20 27 74.0
subroutine 41 41 100.0
pod 15 15 100.0
total 278 295 94.2


line stmt bran cond sub pod time code
1             package BusyBird::Timeline;
2 11     11   10724 use strict;
  11         18  
  11         335  
3 11     11   41 use warnings;
  11         10  
  11         269  
4 11     11   2092 use BusyBird::Util qw(set_param);
  11         110  
  11         568  
5 11     11   49 use BusyBird::Log qw(bblog);
  11         12  
  11         386  
6 11     11   3672 use BusyBird::Flow;
  11         24  
  11         288  
7 11     11   3692 use BusyBird::Watcher::Aggregator;
  11         26  
  11         309  
8 11     11   58 use BusyBird::DateTime::Format 0.04;
  11         187  
  11         198  
9 11     11   3456 use BusyBird::Config;
  11         27  
  11         392  
10 11     11   69 use Async::Selector 1.0;
  11         223  
  11         214  
11 11     11   4774 use Data::UUID;
  11         6224  
  11         601  
12 11     11   56 use Carp;
  11         14  
  11         547  
13 11     11   4724 use Storable qw(dclone);
  11         22127  
  11         687  
14 11     11   60 use Scalar::Util qw(weaken looks_like_number);
  11         12  
  11         542  
15 11     11   49 use DateTime;
  11         14  
  11         16263  
16              
17             our @CARP_NOT = qw(BusyBird::Config);
18              
19             sub new {
20 229     229 1 8631 my ($class, %args) = @_;
21 229         1192 my $self = bless {
22             filter_flow => BusyBird::Flow->new,
23             selector => Async::Selector->new,
24             unacked_counts => {total => 0},
25             config => BusyBird::Config->new(type => "timeline", with_default => 0),
26             id_generator => Data::UUID->new,
27             }, $class;
28 229         1219 $self->set_param(\%args, 'name', undef, 1);
29 229         668 $self->set_param(\%args, 'storage', undef, 1);
30 229         581 $self->set_param(\%args, 'watcher_max', 512);
31 229 50       680 croak 'name must not be empty' if $self->{name} eq '';
32 229         685 $self->_init_selector();
33 229         2951 $self->_update_unacked_counts();
34 229         9870 return $self;
35             }
36              
37             sub _log {
38 11     11   17 my ($self, $level, $msg) = @_;
39 11         20 bblog($level, $self->name . ": $msg");
40             }
41              
42             sub _update_unacked_counts {
43 546     546   943 my ($self) = @_;
44             $self->get_unacked_counts(callback => sub {
45 546     546   1495 my ($error, $unacked_counts) = @_;
46 546 100       1489 if(defined($error)) {
47 11         35 $self->_log('error', "error while updating unacked count: $error");
48 11         39 return;
49             }
50 535         1255 $self->{unacked_counts} = $unacked_counts;
51 535         3039 $self->{selector}->trigger('unacked_counts');
52 546         3114 });
53             }
54              
55             sub _init_selector {
56 229     229   349 my ($self) = @_;
57 229         703 weaken $self;
58             $self->{selector}->register(unacked_counts => sub {
59 212     212   7969 my ($exp_unacked_counts) = @_;
60 212 50 33     1121 if(!defined($exp_unacked_counts) || ref($exp_unacked_counts) ne 'HASH') {
61 0         0 croak "unacked_counts watcher: condition input must be a hash-ref";
62             }
63 212 100       738 return { %{$self->{unacked_counts}} } if !%$exp_unacked_counts;
  10         50  
64 202         454 foreach my $key (keys %$exp_unacked_counts) {
65 235   100     638 my $exp_val = $exp_unacked_counts->{$key} || 0;
66 235   100     661 my $got_val = $self->{unacked_counts}{$key} || 0;
67 235 100       589 return { %{$self->{unacked_counts}} } if $exp_val != $got_val;
  89         450  
68             }
69 113         246 return undef;
70 229         1771 });
71             $self->{selector}->register(watcher_quota => sub {
72 302     302   11194 my ($in) = @_;
73 302         717 my @watchers = $self->{selector}->watchers('watcher_quota');
74 302 100       7646 if(int(@watchers) <= $self->{watcher_max}) {
75 289         582 return undef;
76             }
77 13   100     37 my $watcher_age = $in->{age} || 0;
78 13 100       37 return $watcher_age > $self->{watcher_max} ? 1 : undef;
79 229         5923 });
80             }
81              
82             sub name {
83 1857     1857 1 45272 return shift->{name};
84             }
85              
86             sub _get_from_storage {
87 917     917   1369 my ($self, $method, $args_ref) = @_;
88 917         2256 $args_ref->{timeline} = $self->name;
89 917         2871 local @CARP_NOT = (ref($self->{storage}));
90 917         4725 $self->{storage}->$method(%$args_ref);
91             }
92              
93             sub get_statuses {
94 310     310 1 210965 my ($self, %args) = @_;
95 310         914 $self->_get_from_storage("get_statuses", \%args);
96             }
97              
98             sub get_unacked_counts {
99 590     590 1 12573 my ($self, %args) = @_;
100 590         1532 $self->_get_from_storage("get_unacked_counts", \%args);
101             }
102              
103             sub _write_statuses {
104 319     319   554 my ($self, $method, $args_ref) = @_;
105 319         886 $args_ref->{timeline} = $self->name;
106 319         1174 local @CARP_NOT = (ref($self->{storage}));
107 319         617 my $orig_callback = $args_ref->{callback};
108             $self->{storage}->$method(%$args_ref, callback => sub {
109 317     317   1785 $self->_update_unacked_counts();
110 317 100       9733 goto $orig_callback if defined($orig_callback);
111 319         2648 });
112             }
113              
114             sub put_statuses {
115 32     32 1 54021 my ($self, %args) = @_;
116 32         110 $self->_write_statuses('put_statuses', \%args);
117             }
118              
119             sub delete_statuses {
120 19     19 1 89730 my ($self, %args) = @_;
121 19         82 $self->_write_statuses('delete_statuses', \%args);
122             }
123              
124             sub ack_statuses {
125 85     85 1 66444 my ($self, %args) = @_;
126 85         336 $self->_write_statuses('ack_statuses', \%args);
127             }
128              
129             sub add_statuses {
130 183     183 1 507971 my ($self, %args) = @_;
131 183 50       737 if(!defined($args{statuses})) {
132 0         0 croak "statuses argument is mandatory";
133             }
134 183         454 my $ref = ref($args{statuses});
135 183 100       936 if($ref eq "HASH") {
    50          
136 9         29 $args{statuses} = [ $args{statuses} ];
137             }elsif($ref ne "ARRAY") {
138 0         0 croak "statuses argument must be a status or an array-ref of statuses";
139             }
140 183         11701 my $statuses = dclone($args{statuses});
141 183         396 my $final_callback = $args{callback};
142             $self->{filter_flow}->execute($statuses, sub {
143 183     183   1018 my $filter_result = shift;
144 183         255 my $cur_time;
145 183         433 foreach my $status (@$filter_result) {
146 1521 50 33     6287 next if !defined($status) || ref($status) ne 'HASH';
147 1521 100       2398 if(!defined($status->{id})) {
148 19         52 $status->{id} = sprintf('busybird://%s/%s', $self->name, $self->{id_generator}->create_str);
149             }
150 1521 100       2689 if(!defined($status->{created_at})) {
151 16   66     127 $cur_time ||= DateTime->now;
152 16         3603 $status->{created_at} = BusyBird::DateTime::Format->format_datetime($cur_time);
153             }
154             }
155 183         3701 $self->_write_statuses('put_statuses', {
156             mode => 'insert', statuses => $filter_result,
157             callback => $final_callback
158             });
159 183         1657 });
160             }
161              
162             sub add {
163 29     29 1 18697 my ($self, $statuses, $callback) = @_;
164 29         107 $self->add_statuses(statuses => $statuses, callback => $callback);
165             }
166              
167             sub contains {
168 17     17 1 41258 my ($self, %args) = @_;
169 17         64 $self->_get_from_storage("contains", \%args);
170             }
171              
172             sub add_filter {
173 57     57 1 22689 my ($self, $filter, $is_async) = @_;
174 57 100       168 if(!$is_async) {
175 33         45 my $sync_filter = $filter;
176             $filter = sub {
177 50     50   86 my ($statuses, $done) = @_;
178 50         141 @_ = $sync_filter->($statuses);
179 48         1924 goto $done;
180 33         139 };
181             }
182 57         217 $self->{filter_flow}->add($filter);
183             }
184              
185             sub add_filter_async {
186 8     8 1 53 my ($self, $filter) = @_;
187 8         21 $self->add_filter($filter, 1);
188             }
189              
190             sub set_config {
191 12     12 1 616 shift()->{config}->set_config(@_);
192             }
193              
194             sub get_config {
195 330     330 1 1606 shift()->{config}->get_config(@_);
196             }
197              
198             sub watch_unacked_counts {
199 170     170 1 149497 my ($self, %watch_args) = @_;
200 170         294 my $callback = $watch_args{callback};
201 170         212 my $assumed = $watch_args{assumed};
202 170 100 100     989 if(!defined($callback) || ref($callback) ne 'CODE') {
203 10         835 croak "watch_unacked_counts: callback must be a code-ref";
204             }
205 160 100 100     672 if(!defined($assumed) || ref($assumed) ne 'HASH') {
206 10         995 croak "watch_unacked_counts: assumed must be a hash-ref";
207             }
208 150         446 $assumed = +{ %$assumed };
209 150         400 foreach my $key (keys %$assumed) {
210 192 100 66     1210 next if $key eq 'total' || (looks_like_number($key) && int($key) == $key);
      66        
211 15         33 delete $assumed->{$key};
212             }
213 150         601 my $watcher = BusyBird::Watcher::Aggregator->new();
214             my $orig_watcher = $self->{selector}->watch(
215             unacked_counts => $assumed, watcher_quota => { age => 0 }, sub {
216 103     103   915 my ($orig_w, %res) = @_;
217 103 100       263 if($res{watcher_quota}) {
218 4         13 $watcher->cancel();
219 4         128 $callback->("watcher cancelled because it is too old", $watcher);
220 4         21 return;
221             }
222 99 50       235 if($res{unacked_counts}) {
223 99         272 $callback->(undef, $watcher, $res{unacked_counts});
224 99         51217 return;
225             }
226 0         0 confess("Something terrible happened.");
227             }
228 150         1930 );
229 150         1315 $watcher->add($orig_watcher);
230 150 100       4563 if($watcher->active) {
231 112         611 my @quota_watchers = $self->{selector}->watchers('watcher_quota');
232 112         2490 foreach my $w (@quota_watchers) {
233 152         274 my %cond = $w->conditions;
234 152         769 $cond{watcher_quota}{age}++;
235             }
236 112         302 $self->{selector}->trigger('watcher_quota');
237             }
238 150         1195 return $watcher;
239             }
240              
241              
242             1;
243              
244             __END__