File Coverage

blib/lib/AnyEvent/ProcessPool.pm
Criterion Covered Total %
statement 67 68 98.5
branch 8 12 66.6
condition 5 6 83.3
subroutine 13 13 100.0
pod 2 4 50.0
total 95 103 92.2


line stmt bran cond sub pod time code
1             package AnyEvent::ProcessPool;
2             # ABSTRACT: Asynchronously runs code concurrently in a pool of perl processes
3             $AnyEvent::ProcessPool::VERSION = '0.06';
4 2     2   142572 use strict;
  2         9  
  2         45  
5 2     2   9 use warnings;
  2         4  
  2         34  
6 2     2   8 use Carp;
  2         4  
  2         86  
7 2     2   1278 use AnyEvent;
  2         7609  
  2         57  
8 2     2   525 use AnyEvent::ProcessPool::Process;
  2         5  
  2         67  
9 2     2   11 use AnyEvent::ProcessPool::Task;
  2         4  
  2         43  
10 2     2   8 use AnyEvent::ProcessPool::Util qw(next_id cpu_count);
  2         2  
  2         881  
11              
12             sub new {
13 4     4 0 12274 my ($class, %param) = @_;
14              
15             my $self = bless {
16             workers => $param{workers} || cpu_count,
17             limit => $param{limit},
18             include => $param{include},
19 4   66     43 pool => [], # array of AE::PP::Process objects
20             queue => [], # array of [id, code] tasks
21             complete => {}, # task_id => condvar: signals result to caller
22             pending => {}, # task_id => condvar: signals result internally
23             }, $class;
24              
25             # Initialize workers but do not yet wait for them to be started
26 4         34 foreach (1 .. $self->{workers}) {
27             my $worker = AnyEvent::ProcessPool::Process->new(
28             limit => $self->{limit},
29             include => $self->{include},
30 50         139 );
31 50         875 push @{$self->{pool}}, $worker;
  50         103  
32             }
33              
34 4         30 return $self;
35             }
36              
37             sub join {
38 1     1 1 7 my $self = shift;
39 1         4 foreach my $task_id (keys %{$self->{complete}}) {
  1         14  
40 10 100       50 if (my $cv = $self->{complete}{$task_id}) {
41 2         12 $cv->recv;
42             }
43             }
44             }
45              
46             sub DESTROY {
47 4     4   3170 my ($self, $global) = @_;
48              
49 4 50       17 if ($self) {
50             # Unblock watchers for any remaining pending tasks
51 4 50       24 if (ref $self->{pending}) {
52 4         8 foreach my $cv (values %{$self->{pending}}) {
  4         21  
53 0         0 $cv->croak('AnyEvent::ProcessPool destroyed with pending tasks remaining');
54             }
55             }
56              
57             # Terminate any workers still alive
58 4 50       17 if (ref $self->{pool}) {
59 4         9 foreach my $worker (@{$self->{pool}}) {
  4         12  
60 50 50       124 $worker->stop if $worker;
61             }
62             }
63             }
64             }
65              
66             sub async {
67 23     23 1 116 my ($self, $code, @args) = @_;
68 23         119 my $id = next_id;
69 23         729 my $task = AnyEvent::ProcessPool::Task->new($code, \@args);
70 23         680 $self->{complete}{$id} = AE::cv;
71 23         7761 push @{$self->{queue}}, [$id, $task];
  23         99  
72 23         84 $self->process_queue;
73 23         408 return $self->{complete}{$id};
74             }
75              
76             sub process_queue {
77 46     46 0 111 my $self = shift;
78 46         103 my $queue = $self->{queue};
79 46         112 my $pool = $self->{pool};
80              
81 46   100     283 while (@$queue && @$pool) {
82 23         54 my ($id, $task) = @{shift @$queue};
  23         76  
83 23         66 my $worker = shift @$pool;
84              
85 23         98 $self->{pending}{$id} = $worker->run($task);
86              
87             $self->{pending}{$id}->cb(sub{
88 23     23   435 my $task = shift->recv;
89              
90 23 100       304 if ($task->failed) {
91 1         11 $self->{complete}{$id}->croak($task->result);
92             } else {
93 22         162 $self->{complete}{$id}->send($task->result);
94             }
95              
96 23         473 delete $self->{pending}{$id};
97 23         70 delete $self->{complete}{$id};
98              
99 23         73 push @$pool, $worker;
100 23         103 $self->process_queue;
101 23         339 });
102             }
103             }
104              
105             1;
106              
107             __END__