line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package AnyEvent::ProcessPool::Pipeline; |
2
|
|
|
|
|
|
|
# ABSTRACT: A simplified, straightforward way to parallelize tasks |
3
|
|
|
|
|
|
|
$AnyEvent::ProcessPool::Pipeline::VERSION = '0.06'; |
4
|
1
|
|
|
1
|
|
142636
|
use strict; |
|
1
|
|
|
|
|
6
|
|
|
1
|
|
|
|
|
25
|
|
5
|
1
|
|
|
1
|
|
4
|
use warnings; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
19
|
|
6
|
1
|
|
|
1
|
|
245
|
use AnyEvent::ProcessPool; |
|
1
|
|
|
|
|
4
|
|
|
1
|
|
|
|
|
26
|
|
7
|
1
|
|
|
1
|
|
27
|
use Try::Catch; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
48
|
|
8
|
|
|
|
|
|
|
|
9
|
1
|
|
|
1
|
|
5
|
use parent 'Exporter'; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
7
|
|
10
|
|
|
|
|
|
|
|
11
|
|
|
|
|
|
|
our @EXPORT = qw(pipeline in out); |
12
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
sub pipeline (%) { |
14
|
1
|
|
|
1
|
1
|
5
|
my %param = @_; |
15
|
1
|
|
|
|
|
3
|
my $in = delete $param{in}; |
16
|
1
|
|
|
|
|
2
|
my $out = delete $param{out}; |
17
|
1
|
|
33
|
|
|
13
|
my $pool = delete $param{pool} || AnyEvent::ProcessPool->new(%param); |
18
|
1
|
|
|
|
|
28
|
my $count = 0; |
19
|
|
|
|
|
|
|
|
20
|
1
|
|
|
|
|
3
|
my %pending; |
21
|
1
|
|
|
|
|
8
|
while (my @task = $in->()) { |
22
|
10
|
|
|
|
|
310
|
my $cv = $pool->async(@task); |
23
|
10
|
|
|
|
|
69
|
$pending{$cv} = $cv; |
24
|
10
|
|
|
10
|
|
87
|
$cv->cb(sub{ ++$count; $out->(shift) }); |
|
10
|
|
|
|
|
134
|
|
|
10
|
|
|
|
|
71
|
|
25
|
|
|
|
|
|
|
} |
26
|
|
|
|
|
|
|
|
27
|
1
|
|
|
|
|
46
|
$pool->join; # wait for all tasks to complete |
28
|
|
|
|
|
|
|
|
29
|
1
|
|
|
|
|
25
|
return $count; |
30
|
|
|
|
|
|
|
} |
31
|
|
|
|
|
|
|
|
32
|
1
|
|
|
1
|
1
|
132
|
sub in (&) { return (in => $_[0]) } |
33
|
1
|
|
|
1
|
1
|
4
|
sub out (&) { return (out => $_[0]) } |
34
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
1; |
36
|
|
|
|
|
|
|
|
37
|
|
|
|
|
|
|
__END__ |