line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package AnyEvent::ProcessPool::Pipeline; |
2
|
|
|
|
|
|
|
# ABSTRACT: A simplified, straightforward way to parallelize tasks |
3
|
|
|
|
|
|
|
$AnyEvent::ProcessPool::Pipeline::VERSION = '0.07'; |
4
|
1
|
|
|
1
|
|
172795
|
use common::sense; |
|
1
|
|
|
|
|
16
|
|
|
1
|
|
|
|
|
6
|
|
5
|
1
|
|
|
1
|
|
379
|
use AnyEvent::ProcessPool; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
27
|
|
6
|
1
|
|
|
1
|
|
5
|
use Try::Catch; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
41
|
|
7
|
|
|
|
|
|
|
|
8
|
1
|
|
|
1
|
|
5
|
use parent 'Exporter'; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
7
|
|
9
|
|
|
|
|
|
|
|
10
|
|
|
|
|
|
|
our @EXPORT = qw(pipeline in out); |
11
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
# Feeds every task produced by the 'in' callback to a process pool and passes
# each finished task to the 'out' callback.
#
# Named parameters:
#   in   - required callback, called repeatedly in list context. Each
#          non-empty list it returns is passed unchanged to $pool->async;
#          the first empty list ends the input.
#   out  - required callback, called once per finished task with the argument
#          that the task's completion callback receives.
#   pool - optional pool object. When absent (or false), a new
#          AnyEvent::ProcessPool is constructed from the remaining parameters.
#
# Returns the number of tasks whose completion callback has fired.
# Croaks if 'in' or 'out' is not supplied.
sub pipeline (%) {
    my %param = @_;
    my $in    = delete $param{in};
    my $out   = delete $param{out};

    # Fail fast, before a pool is built or any task is submitted. Otherwise a
    # missing callback only surfaces later as "Can't use an undefined value as
    # a subroutine reference" (for 'out', from inside a completion callback).
    require Carp;
    defined $in  or Carp::croak("pipeline: missing required 'in' callback");
    defined $out or Carp::croak("pipeline: missing required 'out' callback");

    my $pool  = delete $param{pool} || AnyEvent::ProcessPool->new(%param);
    my $count = 0;

    # NOTE(review): %pending is written but never read here; presumably it
    # only keeps each condvar referenced until join returns -- confirm.
    my %pending;

    while (my @task = $in->()) {
        my $cv = $pool->async(@task);
        $pending{$cv} = $cv;
        $cv->cb(sub { ++$count; $out->(shift) });
    }

    $pool->join; # wait for all tasks to complete

    return $count;
}
30
|
|
|
|
|
|
|
|
31
|
1
|
|
|
1
|
1
|
93
|
# Syntactic sugar for pipeline(): lets a bare block be written as
# `in { ... }` and turns it into the (in => CODEREF) pair.
sub in (&) {
    my ($producer) = @_;
    return (in => $producer);
}
32
|
1
|
|
|
1
|
1
|
4
|
# Syntactic sugar for pipeline(): lets a bare block be written as
# `out { ... }` and turns it into the (out => CODEREF) pair.
sub out (&) {
    my ($consumer) = @_;
    return (out => $consumer);
}
33
|
|
|
|
|
|
|
|
34
|
|
|
|
|
|
|
1; |
35
|
|
|
|
|
|
|
|
36
|
|
|
|
|
|
|
__END__ |