File Coverage

blib/lib/AnyEvent/ProcessPool.pm
Criterion Covered Total %
statement 72 73 98.6
branch 13 18 72.2
condition 5 6 83.3
subroutine 13 13 100.0
pod 2 4 50.0
total 105 114 92.1


line stmt bran cond sub pod time code
1             package AnyEvent::ProcessPool;
2             # ABSTRACT: Asynchronously runs code concurrently in a pool of perl processes
3             $AnyEvent::ProcessPool::VERSION = '0.06_001'; # TRIAL
4              
5 2     2   162866 $AnyEvent::ProcessPool::VERSION = '0.06001';use common::sense;
  2         18  
  2         8  
6 2     2   89 use Carp;
  2         4  
  2         90  
7 2     2   1223 use AnyEvent;
  2         7697  
  2         72  
8 2     2   779 use AnyEvent::Util;
  2         17908  
  2         187  
9 2     2   640 use AnyEvent::ProcessPool::Process;
  2         7  
  2         68  
10 2     2   17 use AnyEvent::ProcessPool::Task;
  2         4  
  2         45  
11 2     2   10 use AnyEvent::ProcessPool::Util qw(next_id cpu_count);
  2         3  
  2         1543  
12              
13             sub new {
14 4     4 0 7938 my ($class, %param) = @_;
15              
16             my $self = bless {
17             workers => $param{workers} || cpu_count,
18             limit => $param{limit},
19             include => $param{include},
20 4   66     50 pid => $$,
21             pool => [], # array of AE::PP::Process objects
22             queue => [], # array of [id, code] tasks
23             complete => {}, # task_id => condvar: signals result to caller
24             pending => {}, # task_id => condvar: signals result internally
25             }, $class;
26              
27             # Initialize workers but do not yet wait for them to be started
28 4 100       25 if ($self->{limit}) {
29 3         10 $AnyEvent::Util::MAX_FORKS = $self->{limit};
30             }
31              
32 4         26 foreach (1 .. $self->{workers}) {
33 50         706 my $worker = AnyEvent::ProcessPool::Process->new(limit => $self->{limit});
34 50         67 push @{$self->{pool}}, $worker;
  50         123  
35             }
36              
37 4         31 return $self;
38             }
39              
40             sub join {
41 1     1 1 7 my $self = shift;
42 1         5 foreach my $task_id (keys %{$self->{complete}}) {
  1         15  
43 10 100       102 if (my $cv = $self->{complete}{$task_id}) {
44 2         25 $cv->recv;
45             }
46             }
47             }
48              
49             sub DESTROY {
50 4     4   5577 my ($self, $global) = @_;
51 4 50       24 return unless $self;
52 4 50       33 return unless $self->{pid} == $$;
53              
54             # Unblock watchers for any remaining pending tasks
55 4 50       14 if (ref $self->{pending}) {
56 4         9 foreach my $cv (values %{$self->{pending}}) {
  4         32  
57 0         0 $cv->croak('AnyEvent::ProcessPool destroyed with pending tasks remaining');
58             }
59             }
60              
61             # Terminate any workers still alive
62 4 50       15 if (ref $self->{pool}) {
63 4         9 foreach my $worker (@{$self->{pool}}) {
  4         17  
64 50 50       405 $worker->stop if $worker;
65             }
66             }
67             }
68              
69             sub async {
70 23     23 1 85 my $self = shift;
71 23         32 my $code = shift;
72 23         128 my $id = next_id;
73 23         817 my $task = AnyEvent::ProcessPool::Task->new($code, [@_]);
74 23         522 $self->{complete}{$id} = AE::cv;
75 23         6388 push @{$self->{queue}}, [$id, $task];
  23         129  
76 23         92 $self->process_queue;
77 23         674 return $self->{complete}{$id};
78             }
79              
80             sub process_queue {
81 46     46 0 107 my $self = shift;
82 46         87 my $queue = $self->{queue};
83 46         88 my $pool = $self->{pool};
84              
85 46 100 100     297 if (@$queue && @$pool) {
86 23         97 my ($id, $task) = @{shift @$queue};
  23         64  
87 23         78 my $worker = shift @$pool;
88              
89 23         81 $self->{pending}{$id} = $worker->run($task);
90              
91             $self->{pending}{$id}->cb(sub{
92 23     23   238 my $task = shift->recv;
93              
94 23 100       180 if ($task->failed) {
95 1         10 $self->{complete}{$id}->croak($task->result);
96             } else {
97 22         76 $self->{complete}{$id}->send($task->result);
98             }
99              
100 23         310 delete $self->{pending}{$id};
101 23         55 delete $self->{complete}{$id};
102              
103 23         34 push @{$self->{pool}}, $worker;
  23         78  
104 23         63 $self->process_queue;
105 23         438 });
106             }
107             }
108              
109             1;
110              
111             __END__