line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package BusyBird::Flow; |
2
|
12
|
|
|
12
|
|
30980
|
use v5.8.0; |
|
12
|
|
|
|
|
36
|
|
|
12
|
|
|
|
|
527
|
|
3
|
12
|
|
|
12
|
|
55
|
use strict; |
|
12
|
|
|
|
|
20
|
|
|
12
|
|
|
|
|
364
|
|
4
|
12
|
|
|
12
|
|
54
|
use warnings; |
|
12
|
|
|
|
|
21
|
|
|
12
|
|
|
|
|
378
|
|
5
|
12
|
|
|
12
|
|
6715
|
use Async::Queue; |
|
12
|
|
|
|
|
15108
|
|
|
12
|
|
|
|
|
408
|
|
6
|
12
|
|
|
12
|
|
596
|
use BusyBird::Log qw(bblog); |
|
12
|
|
|
|
|
18
|
|
|
12
|
|
|
|
|
624
|
|
7
|
12
|
|
|
12
|
|
6413
|
use CPS qw(kforeach); |
|
12
|
|
|
|
|
40784
|
|
|
12
|
|
|
|
|
833
|
|
8
|
12
|
|
|
12
|
|
92
|
use Carp; |
|
12
|
|
|
|
|
24
|
|
|
12
|
|
|
|
|
650
|
|
9
|
12
|
|
|
12
|
|
99
|
use Scalar::Util qw(weaken); |
|
12
|
|
|
|
|
17
|
|
|
12
|
|
|
|
|
494
|
|
10
|
12
|
|
|
12
|
|
994
|
use Try::Tiny; |
|
12
|
|
|
|
|
1394
|
|
|
12
|
|
|
|
|
5075
|
|
11
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
sub new { |
13
|
230
|
|
|
230
|
1
|
449
|
my ($class) = @_; |
14
|
230
|
|
|
|
|
1111
|
my $self = bless { |
15
|
|
|
|
|
|
|
filters => [], |
16
|
|
|
|
|
|
|
}, $class; |
17
|
230
|
|
|
|
|
713
|
$self->{queue} = $self->_create_queue(); |
18
|
230
|
|
|
|
|
18912
|
return $self; |
19
|
|
|
|
|
|
|
} |
20
|
|
|
|
|
|
|
|
21
|
|
|
|
|
|
|
sub _create_queue { |
22
|
230
|
|
|
230
|
|
346
|
my ($self) = @_; |
23
|
230
|
|
|
|
|
821
|
weaken $self; |
24
|
|
|
|
|
|
|
return Async::Queue->new(concurrency => 1, worker => sub { |
25
|
183
|
|
|
183
|
|
11731
|
my ($data, $done) = @_; |
26
|
|
|
|
|
|
|
kforeach $self->{filters}, sub { |
27
|
89
|
|
|
|
|
5034
|
my ($filter, $knext) = @_; |
28
|
|
|
|
|
|
|
try { |
29
|
|
|
|
|
|
|
$filter->($data, sub { |
30
|
87
|
|
|
|
|
21506
|
my ($result) = @_; |
31
|
87
|
100
|
100
|
|
|
594
|
if(ref($result) && ref($result) eq 'ARRAY') { |
32
|
71
|
|
|
|
|
107
|
$data = $result; |
33
|
|
|
|
|
|
|
}else { |
34
|
16
|
|
|
|
|
76
|
bblog('warn', 'The filter did not return an array-ref. Ignored.'); |
35
|
|
|
|
|
|
|
} |
36
|
87
|
|
|
|
|
308
|
$knext->(); |
37
|
89
|
|
|
|
|
2981
|
}); |
38
|
|
|
|
|
|
|
}catch { |
39
|
2
|
|
|
|
|
53
|
my ($e) = @_; |
40
|
2
|
|
|
|
|
11
|
bblog('error', "Filter dies: $e"); |
41
|
2
|
|
|
|
|
11
|
$knext->(); |
42
|
89
|
|
|
|
|
638
|
}; |
43
|
|
|
|
|
|
|
}, sub { |
44
|
183
|
|
|
|
|
12010
|
$done->($data); |
45
|
183
|
|
|
|
|
1840
|
}; |
46
|
230
|
|
|
|
|
2233
|
}); |
47
|
|
|
|
|
|
|
} |
48
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
sub add { |
50
|
57
|
|
|
57
|
1
|
83
|
my ($self, $async_filter) = @_; |
51
|
57
|
50
|
|
|
|
234
|
if($self->{queue}->running) { |
52
|
0
|
|
|
|
|
0
|
croak "You cannot add a filter while there is a status running in it." |
53
|
|
|
|
|
|
|
} |
54
|
57
|
|
|
|
|
286
|
push(@{$self->{filters}}, $async_filter); |
|
57
|
|
|
|
|
188
|
|
55
|
|
|
|
|
|
|
} |
56
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
sub execute { |
58
|
183
|
|
|
183
|
1
|
316
|
my ($self, $data, $callback) = @_; |
59
|
183
|
|
|
|
|
1018
|
$self->{queue}->push($data, $callback); |
60
|
|
|
|
|
|
|
} |
61
|
|
|
|
|
|
|
|
62
|
|
|
|
|
|
|
|
63
|
|
|
|
|
|
|
1; |
64
|
|
|
|
|
|
|
|
65
|
|
|
|
|
|
|
__END__ |