File Coverage

blib/lib/AnyEvent/ProcessPool/Pipeline.pm
Criterion Covered Total %
statement 31 31 100.0
branch n/a
condition 1 3 33.3
subroutine 9 9 100.0
pod 3 3 100.0
total 44 46 95.6


line stmt bran cond sub pod time code
1             package AnyEvent::ProcessPool::Pipeline;
2             # ABSTRACT: A simplified, straightforward way to parallelize tasks
3             $AnyEvent::ProcessPool::Pipeline::VERSION = '0.06';
4 1     1   142636 use strict;
  1         6  
  1         25  
5 1     1   4 use warnings;
  1         2  
  1         19  
6 1     1   245 use AnyEvent::ProcessPool;
  1         4  
  1         26  
7 1     1   27 use Try::Catch;
  1         3  
  1         48  
8              
9 1     1   5 use parent 'Exporter';
  1         3  
  1         7  
10              
11             our @EXPORT = qw(pipeline in out);
12              
13             sub pipeline (%) {
14 1     1 1 5 my %param = @_;
15 1         3 my $in = delete $param{in};
16 1         2 my $out = delete $param{out};
17 1   33     13 my $pool = delete $param{pool} || AnyEvent::ProcessPool->new(%param);
18 1         28 my $count = 0;
19              
20 1         3 my %pending;
21 1         8 while (my @task = $in->()) {
22 10         310 my $cv = $pool->async(@task);
23 10         69 $pending{$cv} = $cv;
24 10     10   87 $cv->cb(sub{ ++$count; $out->(shift) });
  10         134  
  10         71  
25             }
26              
27 1         46 $pool->join; # wait for all tasks to complete
28              
29 1         25 return $count;
30             }
31              
32 1     1 1 132 sub in (&) { return (in => $_[0]) }
33 1     1 1 4 sub out (&) { return (out => $_[0]) }
34              
35             1;
36              
37             __END__