File Coverage

blib/lib/AnyEvent/ProcessPool/Pipeline.pm
Criterion Covered Total %
statement 28 28 100.0
branch n/a
condition 1 3 33.3
subroutine 8 8 100.0
pod 3 3 100.0
total 40 42 95.2


line stmt bran cond sub pod time code
1             package AnyEvent::ProcessPool::Pipeline;
2             # ABSTRACT: A simplified, straightforward way to parallelize tasks
3             $AnyEvent::ProcessPool::Pipeline::VERSION = '0.06_001'; # TRIAL
4              
5 1     1   149561 $AnyEvent::ProcessPool::Pipeline::VERSION = '0.06001';use common::sense;
  1         16  
  1         5  
6 1     1   305 use AnyEvent::ProcessPool;
  1         3  
  1         29  
7 1     1   5 use Try::Catch;
  1         2  
  1         53  
8              
9 1     1   5 use parent 'Exporter';
  1         3  
  1         9  
10              
11             our @EXPORT = qw(pipeline in out);
12              
13             sub pipeline (%) {
14 1     1 1 4 my %param = @_;
15 1         2 my $in = delete $param{in};
16 1         3 my $out = delete $param{out};
17 1   33     11 my $pool = delete $param{pool} || AnyEvent::ProcessPool->new(%param);
18 1         2 my $count = 0;
19              
20 1         2 my %pending;
21 1         4 while (my @task = $in->()) {
22 10         383 my $cv = $pool->async(@task);
23 10         116 $pending{$cv} = $cv;
24 10     10   98 $cv->cb(sub{ ++$count; $out->(shift) });
  10         76  
  10         26  
25             }
26              
27 1         47 $pool->join; # wait for all tasks to complete
28              
29 1         20 return $count;
30             }
31              
32 1     1 1 99 sub in (&) { return (in => $_[0]) }
33 1     1 1 5 sub out (&) { return (out => $_[0]) }
34              
35             1;
36              
37             __END__