File Coverage

blib/lib/AnyEvent/ForkManager.pm
Criterion Covered Total %
statement 120 129 93.0
branch 23 34 67.6
condition 8 14 57.1
subroutine 26 28 92.8
pod 4 13 30.7
total 181 218 83.0


line stmt bran cond sub pod time code
1             package AnyEvent::ForkManager;
2 15     15   1786996 use 5.008_001;
  15         102  
3 15     15   75 use strict;
  15         30  
  15         294  
4 15     15   75 use warnings;
  15         29  
  15         724  
5              
6             our $VERSION = '0.07';
7              
8 15     15   1058 use AnyEvent;
  15         5329  
  15         506  
9 15     15   84 use Scalar::Util qw/weaken/;
  15         23  
  15         895  
10 15     15   595 use POSIX qw/WNOHANG/;
  15         6233  
  15         75  
11 15     15   29740 use Time::HiRes ();
  15         18731  
  15         748  
12              
13             use Class::Accessor::Lite 0.04 (
14 15         119 ro => [
15             qw/max_workers manager_pid/,
16             ],
17             rw => [
18             qw/on_start on_finish on_error on_enqueue on_dequeue on_working_max/,
19             qw/process_queue running_worker process_cb wait_async/,
20             ],
21 15     15   7040 );
  15         18523  
22              
23 0     0 0 0 sub default_max_workers { 10 }
24              
25             sub new {
26 14     14 1 14686 my $class = shift;
27 14 50       119 my $arg = (@_ == 1) ? +shift : +{ @_ };
28 14   33     70 $arg->{max_workers} ||= $class->default_max_workers;
29              
30 14         126 bless(+{
31             %$arg,
32             manager_pid => $$,
33             } => $class)->init;
34             }
35              
36             sub init {
37 14     14 0 35 my $self = shift;
38              
39 14         77 $self->process_queue([]);
40 14         308 $self->running_worker(+{});
41 14         161 $self->process_cb(+{});
42              
43 14         112 return $self;
44             }
45              
46 106     106 0 395 sub is_child { shift->manager_pid != $$ }
47             sub is_working_max {
48 142     142 0 251 my $self = shift;
49              
50 142         304 $self->num_workers >= $self->max_workers;
51             }
52              
53             sub num_workers {
54 382     382 0 191440 my $self = shift;
55 382         614 return scalar keys %{ $self->running_worker };
  382         1427  
56             }
57              
58             sub num_queues {
59 36     36 0 1519 my $self = shift;
60 36         69 return scalar @{ $self->process_queue };
  36         257  
61             }
62              
63             sub start {
64 94     94 1 26457 my $self = shift;
65 94 100       687 my $arg = (@_ == 1) ? +shift : +{ @_ };
66              
67 94 50       394 die "\$fork_manager->start() should be called within the manager process\n"
68             if $self->is_child;
69              
70 94 100       1029 if ($self->is_working_max) {## child working max
71 40         460 $self->_run_cb('on_working_max' => @{ $arg->{args} });
  40         220  
72 40         69625 $self->enqueue($arg);
73 40         305 return;
74             }
75             else {## create child process
76 54         45344 my $pid = fork;
77              
78 54 50       2935 if (not(defined $pid)) {
    100          
79 0         0 $self->_run_cb('on_error' => @{ $arg->{args} });
  0         0  
80 0         0 return;
81             }
82             elsif ($pid) {
83             # parent
84 42         542 $self->_run_cb('on_start' => $pid, @{ $arg->{args} });
  42         2243  
85 42         107156 $self->process_cb->{$pid} = $self->_create_callback(@{ $arg->{args} });
  42         268  
86             $self->running_worker->{$pid} = AnyEvent->child(
87             pid => $pid,
88 42         1101 cb => $self->process_cb->{$pid},
89             );
90              
91             # delete worker watcher if already finished child process.
92 42 50       72946 delete $self->running_worker->{$pid} unless exists $self->process_cb->{$pid};
93              
94 42         1837 return $pid;
95             }
96             else {
97             # child
98 12         607 $arg->{cb}->($self, @{ $arg->{args} });
  12         1106  
99 0         0 $self->finish;
100             }
101             }
102             }
103              
104             sub _create_callback {
105 42     42   232 my($self, @args) = @_;
106              
107 42         504 weaken($self);
108             return sub {
109 32     32   2231384 my ($pid, $status) = @_;
110 32         168 delete $self->running_worker->{$pid};
111 32         657 delete $self->process_cb->{$pid};
112 32         511 $self->_run_cb('on_finish' => $pid, $status, @args);
113              
114 32 100       22131 if ($self->num_queues) {
115             ## dequeue
116 28         261 $self->dequeue;
117             }
118 42         1178 };
119             }
120              
121             sub finish {
122 12     12 0 164070 my ($self, $exit_code) = @_;
123 12 50       194 die "\$fork_manager->finish() shouln't be called within the manager process\n"
124             unless $self->is_child;
125              
126 12   50     4402 exit($exit_code || 0);
127             }
128              
129             sub enqueue {
130 40     40 0 160 my($self, $arg) = @_;
131              
132 40         85 $self->_run_cb('on_enqueue' => @{ $arg->{args} });
  40         260  
133 40         68640 push @{ $self->process_queue } => $arg;
  40         230  
134             }
135              
136             sub dequeue {
137 28     28 0 59 my $self = shift;
138              
139 28         85 until ($self->is_working_max) {
140 28 50       314 last unless @{ $self->process_queue };
  28         63  
141              
142             # dequeue
143 28 50       151 if (my $arg = shift @{ $self->process_queue }) {
  28         78  
144 28         163 $self->_run_cb('on_dequeue' => @{ $arg->{args} });
  28         120  
145 28         45138 $self->start($arg);
146             }
147             }
148             }
149              
150             sub signal_all_children {
151 0     0 1 0 my ($self, $sig) = @_;
152 0         0 foreach my $pid (sort keys %{ $self->running_worker }) {
  0         0  
153 0         0 kill $sig, $pid;
154             }
155             }
156              
157             sub wait_all_children {
158 10     10 1 28070 my $self = shift;
159 10 50       175 my $arg = (@_ == 1) ? +shift : +{ @_ };
160              
161 10         45 my $cb = $arg->{cb};
162 10 100       75 if ($arg->{blocking}) {
163 5         25 $self->_wait_all_children_with_blocking;
164 1         24 $self->$cb;
165             }
166             else {
167 5 50       60 die 'cannot call.' if $self->wait_async;
168              
169 5         60 my $super = $self->on_finish;
170              
171 5         40 weaken($self);
172             $self->on_finish(
173             sub {
174 16     16   92 $super->(@_);
175 16 100 66     32013 if ($self->num_workers == 0 and $self->num_queues == 0) {
176 1         28 $self->$cb;
177 1         1233 $self->on_finish($super);
178 1         18 $self->wait_async(0);
179             }
180             }
181 5         105 );
182              
183 5         40 $self->wait_async(1);
184             }
185             }
186              
187             sub _run_cb {
188 182     182   589 my $self = shift;
189 182         1119 my $name = shift;
190              
191 182         2210 my $cb = $self->$name();
192 182 50       1729 if ($cb) {
193 182         1481 $self->$cb(@_);
194             }
195             }
196              
197             our $WAIT_INTERVAL = 0.1 * 1000 * 1000;
198             sub _wait_all_children_with_blocking {
199 5     5   15 my $self = shift;
200              
201 5   66     15 until ($self->num_workers == 0 and $self->num_queues == 0) {
202 39         980 my($pid, $status) = _wait_with_status(-1, WNOHANG);
203 39 50 66     389 if ($pid and exists $self->running_worker->{$pid}) {
204 16         200 $self->process_cb->{$pid}->($pid, $status);
205             }
206             }
207             continue {
208             # retry interval
209 35         2392022 Time::HiRes::usleep( $WAIT_INTERVAL );
210             }
211             }
212              
213             # function
214             sub _wait_with_status {## blocking
215 39     39   160 my($waitpid, $option) = @_;
216              
217 15     15   31016 use vmsish 'status';
  15         254  
  15         75  
218 39         423 local $?;
219              
220 39         702 my $pid = waitpid($waitpid, $option);
221 39         408 return ($pid, $?);
222             }
223              
224             1;
225             __END__