File Coverage

blib/lib/AnyEvent/ForkManager.pm
Criterion Covered Total %
statement 125 130 96.1
branch 24 34 70.5
condition 8 14 57.1
subroutine 27 28 96.4
pod 4 13 30.7
total 188 219 85.8


line stmt bran cond sub pod time code
1             package AnyEvent::ForkManager;
2 15     15   601694 use 5.008_001;
  15         32  
  15         420  
3 15     15   48 use strict;
  15         29  
  15         295  
4 15     15   46 use warnings;
  15         18  
  15         447  
5              
6             our $VERSION = '0.05';
7              
8 15     15   1068 use AnyEvent;
  15         5775  
  15         295  
9 15     15   49 use Scalar::Util qw/weaken/;
  15         23  
  15         642  
10 15     15   6067 use POSIX qw/WNOHANG/;
  15         65394  
  15         63  
11 15     15   16397 use Time::HiRes ();
  15         15189  
  15         543  
12              
13             use Class::Accessor::Lite 0.04 (
14 15         83 ro => [
15             qw/max_workers manager_pid/,
16             ],
17             rw => [
18             qw/on_start on_finish on_error on_enqueue on_dequeue on_working_max/,
19             qw/process_queue running_worker process_cb wait_async/,
20             ],
21 15     15   6435 );
  15         11012  
22              
23 0     0 0 0 sub default_max_workers { 10 }
24              
25             sub new {
26 14     14 1 3745 my $class = shift;
27 14 50       91 my $arg = (@_ == 1) ? +shift : +{ @_ };
28 14   33     42 $arg->{max_workers} ||= $class->default_max_workers;
29              
30 14         91 bless(+{
31             %$arg,
32             manager_pid => $$,
33             } => $class)->init;
34             }
35              
36             sub init {
37 14     14 0 28 my $self = shift;
38              
39 14         49 $self->process_queue([]);
40 14         140 $self->running_worker(+{});
41 14         63 $self->process_cb(+{});
42              
43 14         70 return $self;
44             }
45              
46 106     106 0 316 sub is_child { shift->manager_pid != $$ }
47             sub is_working_max {
48 142     142 0 173 my $self = shift;
49              
50 142         243 $self->num_workers >= $self->max_workers;
51             }
52              
53             sub num_workers {
54 373     373 0 100427 my $self = shift;
55 373         320 return scalar keys %{ $self->running_worker };
  373         881  
56             }
57              
58             sub num_queues {
59 36     36 0 4853 my $self = shift;
60 36         45 return scalar @{ $self->process_queue };
  36         109  
61             }
62              
63             sub start {
64 94     94 1 45580 my $self = shift;
65 94 100       326 my $arg = (@_ == 1) ? +shift : +{ @_ };
66              
67 94 50       225 die "\$fork_manager->start() should be called within the manager process\n"
68             if $self->is_child;
69              
70 94 100       779 if ($self->is_working_max) {## child working max
71 40         280 $self->_run_cb('on_working_max' => @{ $arg->{args} });
  40         115  
72 40         92155 $self->enqueue($arg);
73 40         230 return;
74             }
75             else {## create child process
76 54         32117 my $pid = fork;
77              
78 54 50       1716 if (not(defined $pid)) {
    100          
79 0         0 $self->_run_cb('on_error' => @{ $arg->{args} });
  0         0  
80 0         0 return;
81             }
82             elsif ($pid) {
83             # parent
84 42         247 $self->_run_cb('on_start' => $pid, @{ $arg->{args} });
  42         1303  
85 42         564105 $self->process_cb->{$pid} = $self->_create_callback(@{ $arg->{args} });
  42         803  
86 42         479 $self->running_worker->{$pid} = AnyEvent->child(
87             pid => $pid,
88             cb => $self->process_cb->{$pid},
89             );
90              
91             # delete worker watcher if already finished child process.
92 42 50       23562 delete $self->running_worker->{$pid} unless exists $self->process_cb->{$pid};
93              
94 42         946 return $pid;
95             }
96             else {
97             # child
98 12         347 $arg->{cb}->($self, @{ $arg->{args} });
  12         624  
99 0         0 $self->finish;
100             }
101             }
102             }
103              
104             sub _create_callback {
105 42     42   132 my($self, @args) = @_;
106              
107 42         181 weaken($self);
108             return sub {
109 32     32   1697312 my ($pid, $status) = @_;
110 32         183 delete $self->running_worker->{$pid};
111 32         470 delete $self->process_cb->{$pid};
112 32         345 $self->_run_cb('on_finish' => $pid, $status, @args);
113              
114 32 100       51602 if ($self->num_queues) {
115             ## dequeue
116 28         221 $self->dequeue;
117             }
118 42         702 };
119             }
120              
121             sub finish {
122 12     12 0 210538 my ($self, $exit_code) = @_;
123 12 50       123 die "\$fork_manager->finish() shouln't be called within the manager process\n"
124             unless $self->is_child;
125              
126 12   50     3100 exit($exit_code || 0);
127             }
128              
129             sub enqueue {
130 40     40 0 75 my($self, $arg) = @_;
131              
132 40         75 $self->_run_cb('on_enqueue' => @{ $arg->{args} });
  40         110  
133 40         82985 push @{ $self->process_queue } => $arg;
  40         135  
134             }
135              
136             sub dequeue {
137 28     28 0 40 my $self = shift;
138              
139 28         85 until ($self->is_working_max) {
140 28 50       256 last unless @{ $self->process_queue };
  28         60  
141              
142             # dequeue
143 28 50       128 if (my $arg = shift @{ $self->process_queue }) {
  28         60  
144 28         128 $self->_run_cb('on_dequeue' => @{ $arg->{args} });
  28         81  
145 28         85947 $self->start($arg);
146             }
147             }
148             }
149              
150             sub signal_all_children {
151 10     10 1 5650 my ($self, $sig) = @_;
152 10         20 foreach my $pid (sort keys %{ $self->running_worker }) {
  10         55  
153 20         605 kill $sig, $pid;
154             }
155             }
156              
157             sub wait_all_children {
158 10     10 1 145 my $self = shift;
159 10 50       100 my $arg = (@_ == 1) ? +shift : +{ @_ };
160              
161 10         25 my $cb = $arg->{cb};
162 10 100       135 if ($arg->{blocking}) {
163 5         10 $self->_wait_all_children_with_blocking;
164 1         20 $self->$cb;
165             }
166             else {
167 5 50       20 die 'cannot call.' if $self->wait_async;
168              
169 5         50 my $super = $self->on_finish;
170              
171 5         35 weaken($self);
172             $self->on_finish(
173             sub {
174 16     16   92 $super->(@_);
175 16 100 66     57963 if ($self->num_workers == 0 and $self->num_queues == 0) {
176 1         12 $self->$cb;
177 1         2212 $self->on_finish($super);
178 1         7 $self->wait_async(0);
179             }
180             }
181 5         70 );
182              
183 5         35 $self->wait_async(1);
184             }
185             }
186              
187             sub _run_cb {
188 182     182   327 my $self = shift;
189 182         619 my $name = shift;
190              
191 182         1631 my $cb = $self->$name();
192 182 50       1354 if ($cb) {
193 182         904 $self->$cb(@_);
194             }
195             }
196              
197             our $WAIT_INTERVAL = 0.1 * 1000 * 1000;
198             sub _wait_all_children_with_blocking {
199 5     5   10 my $self = shift;
200              
201 5   66     10 until ($self->num_workers == 0 and $self->num_queues == 0) {
202 30         539 my($pid, $status) = _wait_with_status(-1, WNOHANG);
203 30 100 66     165 if ($pid and exists $self->running_worker->{$pid}) {
204 16         167 $self->process_cb->{$pid}->($pid, $status);
205             }
206             }
207             continue {
208             # retry interval
209 26         1798334 Time::HiRes::usleep( $WAIT_INTERVAL );
210             }
211             }
212              
213             # function
214             sub _wait_with_status {## blocking
215 30     30   47 my($waitpid, $option) = @_;
216              
217 15     15   19905 use vmsish 'status';
  15         120  
  15         589  
218 30         192 local $?;
219              
220 30         551 my $pid = waitpid($waitpid, $option);
221 30         115 return ($pid, $?);
222             }
223              
224             1;
225             __END__