File Coverage

blib/lib/AnyEvent/ForkManager.pm
Criterion Covered Total %
statement 125 130 96.1
branch 24 34 70.5
condition 8 14 57.1
subroutine 27 28 96.4
pod 4 13 30.7
total 188 219 85.8


line stmt bran cond sub pod time code
1             package AnyEvent::ForkManager;
2 15     15   565981 use 5.008_001;
  15         30  
  15         367  
3 15     15   38 use strict;
  15         15  
  15         309  
4 15     15   32 use warnings;
  15         17  
  15         414  
5              
6             our $VERSION = '0.06';
7              
8 15     15   791 use AnyEvent;
  15         4360  
  15         278  
9 15     15   67 use Scalar::Util qw/weaken/;
  15         10  
  15         672  
10 15     15   5915 use POSIX qw/WNOHANG/;
  15         64031  
  15         59  
11 15     15   16101 use Time::HiRes ();
  15         14948  
  15         517  
12              
13             use Class::Accessor::Lite 0.04 (
14 15         91 ro => [
15             qw/max_workers manager_pid/,
16             ],
17             rw => [
18             qw/on_start on_finish on_error on_enqueue on_dequeue on_working_max/,
19             qw/process_queue running_worker process_cb wait_async/,
20             ],
21 15     15   5995 );
  15         11047  
22              
23 0     0 0 0 sub default_max_workers { 10 }
24              
25             sub new {
26 14     14 1 3556 my $class = shift;
27 14 50       77 my $arg = (@_ == 1) ? +shift : +{ @_ };
28 14   33     56 $arg->{max_workers} ||= $class->default_max_workers;
29              
30 14         84 bless(+{
31             %$arg,
32             manager_pid => $$,
33             } => $class)->init;
34             }
35              
36             sub init {
37 14     14 0 28 my $self = shift;
38              
39 14         63 $self->process_queue([]);
40 14         119 $self->running_worker(+{});
41 14         77 $self->process_cb(+{});
42              
43 14         56 return $self;
44             }
45              
46 106     106 0 273 sub is_child { shift->manager_pid != $$ }
47             sub is_working_max {
48 142     142 0 153 my $self = shift;
49              
50 142         276 $self->num_workers >= $self->max_workers;
51             }
52              
53             sub num_workers {
54 373     373 0 96742 my $self = shift;
55 373         338 return scalar keys %{ $self->running_worker };
  373         776  
56             }
57              
58             sub num_queues {
59 36     36 0 5469 my $self = shift;
60 36         50 return scalar @{ $self->process_queue };
  36         147  
61             }
62              
63             sub start {
64 94     94 1 46764 my $self = shift;
65 94 100       326 my $arg = (@_ == 1) ? +shift : +{ @_ };
66              
67 94 50       260 die "\$fork_manager->start() should be called within the manager process\n"
68             if $self->is_child;
69              
70 94 100       766 if ($self->is_working_max) {## child working max
71 40         345 $self->_run_cb('on_working_max' => @{ $arg->{args} });
  40         155  
72 40         90085 $self->enqueue($arg);
73 40         260 return;
74             }
75             else {## create child process
76 54         31171 my $pid = fork;
77              
78 54 50       1708 if (not(defined $pid)) {
    100          
79 0         0 $self->_run_cb('on_error' => @{ $arg->{args} });
  0         0  
80 0         0 return;
81             }
82             elsif ($pid) {
83             # parent
84 42         205 $self->_run_cb('on_start' => $pid, @{ $arg->{args} });
  42         1218  
85 42         189374 $self->process_cb->{$pid} = $self->_create_callback(@{ $arg->{args} });
  42         559  
86 42         439 $self->running_worker->{$pid} = AnyEvent->child(
87             pid => $pid,
88             cb => $self->process_cb->{$pid},
89             );
90              
91             # delete worker watcher if already finished child process.
92 42 50       24511 delete $self->running_worker->{$pid} unless exists $self->process_cb->{$pid};
93              
94 42         868 return $pid;
95             }
96             else {
97             # child
98 12         325 $arg->{cb}->($self, @{ $arg->{args} });
  12         615  
99 0         0 $self->finish;
100             }
101             }
102             }
103              
104             sub _create_callback {
105 42     42   123 my($self, @args) = @_;
106              
107 42         194 weaken($self);
108             return sub {
109 32     32   1527839 my ($pid, $status) = @_;
110 32         163 delete $self->running_worker->{$pid};
111 32         471 delete $self->process_cb->{$pid};
112 32         250 $self->_run_cb('on_finish' => $pid, $status, @args);
113              
114 32 100       61116 if ($self->num_queues) {
115             ## dequeue
116 28         205 $self->dequeue;
117             }
118 42         589 };
119             }
120              
121             sub finish {
122 12     12 0 153047 my ($self, $exit_code) = @_;
123 12 50       114 die "\$fork_manager->finish() shouln't be called within the manager process\n"
124             unless $self->is_child;
125              
126 12   50     2864 exit($exit_code || 0);
127             }
128              
129             sub enqueue {
130 40     40 0 75 my($self, $arg) = @_;
131              
132 40         95 $self->_run_cb('on_enqueue' => @{ $arg->{args} });
  40         150  
133 40         93015 push @{ $self->process_queue } => $arg;
  40         150  
134             }
135              
136             sub dequeue {
137 28     28 0 45 my $self = shift;
138              
139 28         93 until ($self->is_working_max) {
140 28 50       245 last unless @{ $self->process_queue };
  28         75  
141              
142             # dequeue
143 28 50       114 if (my $arg = shift @{ $self->process_queue }) {
  28         61  
144 28         116 $self->_run_cb('on_dequeue' => @{ $arg->{args} });
  28         81  
145 28         96289 $self->start($arg);
146             }
147             }
148             }
149              
150             sub signal_all_children {
151 10     10 1 5305 my ($self, $sig) = @_;
152 10         10 foreach my $pid (sort keys %{ $self->running_worker }) {
  10         65  
153 20         500 kill $sig, $pid;
154             }
155             }
156              
157             sub wait_all_children {
158 10     10 1 125 my $self = shift;
159 10 50       75 my $arg = (@_ == 1) ? +shift : +{ @_ };
160              
161 10         20 my $cb = $arg->{cb};
162 10 100       80 if ($arg->{blocking}) {
163 5         15 $self->_wait_all_children_with_blocking;
164 1         18 $self->$cb;
165             }
166             else {
167 5 50       10 die 'cannot call.' if $self->wait_async;
168              
169 5         35 my $super = $self->on_finish;
170              
171 5         25 weaken($self);
172             $self->on_finish(
173             sub {
174 16     16   66 $super->(@_);
175 16 100 66     49757 if ($self->num_workers == 0 and $self->num_queues == 0) {
176 1         9 $self->$cb;
177 1         2189 $self->on_finish($super);
178 1         8 $self->wait_async(0);
179             }
180             }
181 5         30 );
182              
183 5         25 $self->wait_async(1);
184             }
185             }
186              
187             sub _run_cb {
188 182     182   276 my $self = shift;
189 182         532 my $name = shift;
190              
191 182         1663 my $cb = $self->$name();
192 182 50       1273 if ($cb) {
193 182         879 $self->$cb(@_);
194             }
195             }
196              
197             our $WAIT_INTERVAL = 0.1 * 1000 * 1000;
198             sub _wait_all_children_with_blocking {
199 5     5   5 my $self = shift;
200              
201 5   66     10 until ($self->num_workers == 0 and $self->num_queues == 0) {
202 30         645 my($pid, $status) = _wait_with_status(-1, WNOHANG);
203 30 100 66     274 if ($pid and exists $self->running_worker->{$pid}) {
204 16         184 $self->process_cb->{$pid}->($pid, $status);
205             }
206             }
207             continue {
208             # retry interval
209 26         1649221 Time::HiRes::usleep( $WAIT_INTERVAL );
210             }
211             }
212              
213             # function
214             sub _wait_with_status {## blocking
215 30     30   65 my($waitpid, $option) = @_;
216              
217 15     15   19457 use vmsish 'status';
  15         114  
  15         812  
218 30         196 local $?;
219              
220 30         450 my $pid = waitpid($waitpid, $option);
221 30         134 return ($pid, $?);
222             }
223              
224             1;
225             __END__