File Coverage

blib/lib/AnyEvent/ProcessPool/Pipeline.pm
Criterion Covered Total %
statement 28 28 100.0
branch n/a
condition 1 3 33.3
subroutine 8 8 100.0
pod 3 3 100.0
total 40 42 95.2


line stmt bran cond sub pod time code
1             package AnyEvent::ProcessPool::Pipeline;
2             # ABSTRACT: A simplified, straightforward way to parallelize tasks
3             $AnyEvent::ProcessPool::Pipeline::VERSION = '0.07';
4 1     1   172795 use common::sense;
  1         16  
  1         6  
5 1     1   379 use AnyEvent::ProcessPool;
  1         3  
  1         27  
6 1     1   5 use Try::Catch;
  1         1  
  1         41  
7              
8 1     1   5 use parent 'Exporter';
  1         2  
  1         7  
9              
10             our @EXPORT = qw(pipeline in out);
11              
12             sub pipeline (%) {
13 1     1 1 4 my %param = @_;
14 1         3 my $in = delete $param{in};
15 1         2 my $out = delete $param{out};
16 1   33     12 my $pool = delete $param{pool} || AnyEvent::ProcessPool->new(%param);
17 1         2 my $count = 0;
18              
19 1         2 my %pending;
20 1         7 while (my @task = $in->()) {
21 10         400 my $cv = $pool->async(@task);
22 10         124 $pending{$cv} = $cv;
23 10     10   148 $cv->cb(sub{ ++$count; $out->(shift) });
  10         107  
  10         55  
24             }
25              
26 1         46 $pool->join; # wait for all tasks to complete
27              
28 1         33 return $count;
29             }
30              
31 1     1 1 93 sub in (&) { return (in => $_[0]) }
32 1     1 1 4 sub out (&) { return (out => $_[0]) }
33              
34             1;
35              
36             __END__