File Coverage

blib/lib/AnyEvent/ProcessPool.pm
Criterion Covered Total %
statement 64 65 98.4
branch 8 12 66.6
condition 5 6 83.3
subroutine 12 12 100.0
pod 2 4 50.0
total 91 99 91.9


line stmt bran cond sub pod time code
1             package AnyEvent::ProcessPool;
2             # ABSTRACT: Asynchronously runs code concurrently in a pool of perl processes
3             $AnyEvent::ProcessPool::VERSION = '0.07';
4 2     2   173485 use common::sense;
  2         16  
  2         9  
5 2     2   98 use Carp;
  2         4  
  2         94  
6 2     2   1564 use AnyEvent;
  2         8545  
  2         55  
7 2     2   695 use AnyEvent::ProcessPool::Process;
  2         5  
  2         68  
8 2     2   12 use AnyEvent::ProcessPool::Task;
  2         3  
  2         43  
9 2     2   9 use AnyEvent::ProcessPool::Util qw(next_id cpu_count);
  2         3  
  2         1419  
10              
11             sub new {
12 4     4 0 8691 my ($class, %param) = @_;
13              
14             my $self = bless {
15             workers => $param{workers} || cpu_count,
16             limit => $param{limit},
17             include => $param{include},
18 4   66     44 pool => [], # array of AE::PP::Process objects
19             queue => [], # array of [id, code] tasks
20             complete => {}, # task_id => condvar: signals result to caller
21             pending => {}, # task_id => condvar: signals result internally
22             }, $class;
23              
24             # Initialize workers but do not yet wait for them to be started
25 4         40 foreach (1 .. $self->{workers}) {
26             my $worker = AnyEvent::ProcessPool::Process->new(
27             limit => $self->{limit},
28             include => $self->{include},
29 50         148 );
30 50         764 push @{$self->{pool}}, $worker;
  50         92  
31             }
32              
33 4         23 return $self;
34             }
35              
36             sub join {
37 1     1 1 4 my $self = shift;
38 1         8 foreach my $task_id (keys %{$self->{complete}}) {
  1         22  
39 10 100       58 if (my $cv = $self->{complete}{$task_id}) {
40 3         10 $cv->recv;
41             }
42             }
43             }
44              
45             sub DESTROY {
46 4     4   4716 my ($self, $global) = @_;
47              
48 4 50       17 if ($self) {
49             # Unblock watchers for any remaining pending tasks
50 4 50       28 if (ref $self->{pending}) {
51 4         14 foreach my $cv (values %{$self->{pending}}) {
  4         36  
52 0         0 $cv->croak('AnyEvent::ProcessPool destroyed with pending tasks remaining');
53             }
54             }
55              
56             # Terminate any workers still alive
57 4 50       26 if (ref $self->{pool}) {
58 4         8 foreach my $worker (@{$self->{pool}}) {
  4         15  
59 50 50       116 $worker->stop if $worker;
60             }
61             }
62             }
63             }
64              
65             sub async {
66 23     23 1 125 my ($self, $code, @args) = @_;
67 23         154 my $id = next_id;
68 23         778 my $task = AnyEvent::ProcessPool::Task->new($code, \@args);
69 23         511 $self->{complete}{$id} = AE::cv;
70 23         6945 push @{$self->{queue}}, [$id, $task];
  23         81  
71 23         69 $self->process_queue;
72 23         610 return $self->{complete}{$id};
73             }
74              
75             sub process_queue {
76 46     46 0 117 my $self = shift;
77 46         98 my $queue = $self->{queue};
78 46         114 my $pool = $self->{pool};
79              
80 46   100     269 while (@$queue && @$pool) {
81 23         94 my ($id, $task) = @{shift @$queue};
  23         83  
82 23         103 my $worker = shift @$pool;
83              
84 23         128 $self->{pending}{$id} = $worker->run($task);
85              
86             $self->{pending}{$id}->cb(sub{
87 23     23   342 my $task = shift->recv;
88              
89 23 100       296 if ($task->failed) {
90 1         14 $self->{complete}{$id}->croak($task->result);
91             } else {
92 22         116 $self->{complete}{$id}->send($task->result);
93             }
94              
95 23         422 delete $self->{pending}{$id};
96 23         79 delete $self->{complete}{$id};
97              
98 23         68 push @$pool, $worker;
99 23         91 $self->process_queue;
100 23         445 });
101             }
102             }
103              
104             1;
105              
106             __END__