| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
=head1 NAME |
|
2
|
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork |
|
4
|
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
THE API IS NOT FINISHED, CONSIDER THIS AN ALPHA RELEASE |
|
6
|
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
=head1 SYNOPSIS |
|
8
|
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
use AnyEvent; |
|
10
|
|
|
|
|
|
|
use AnyEvent::Fork::Pool; |
|
11
|
|
|
|
|
|
|
# use AnyEvent::Fork is not needed |
|
12
|
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
# all possible parameters shown, with default values |
|
14
|
|
|
|
|
|
|
my $pool = AnyEvent::Fork |
|
15
|
|
|
|
|
|
|
->new |
|
16
|
|
|
|
|
|
|
->require ("MyWorker") |
|
17
|
|
|
|
|
|
|
->AnyEvent::Fork::Pool::run ( |
|
18
|
|
|
|
|
|
|
"MyWorker::run", # the worker function |
|
19
|
|
|
|
|
|
|
|
|
20
|
|
|
|
|
|
|
# pool management |
|
21
|
|
|
|
|
|
|
max => 4, # absolute maximum # of processes |
|
22
|
|
|
|
|
|
|
idle => 0, # minimum # of idle processes |
|
23
|
|
|
|
|
|
|
load => 2, # queue at most this number of jobs per process |
|
24
|
|
|
|
|
|
|
start => 0.1, # wait this many seconds before starting a new process |
|
25
|
|
|
|
|
|
|
stop => 10, # wait this many seconds before stopping an idle process |
|
26
|
|
|
|
|
|
|
on_destroy => (my $finish = AE::cv), # called when object is destroyed |
|
27
|
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
# parameters passed to AnyEvent::Fork::RPC |
|
29
|
|
|
|
|
|
|
async => 0, |
|
30
|
|
|
|
|
|
|
on_error => sub { die "FATAL: $_[0]\n" }, |
|
31
|
|
|
|
|
|
|
on_event => sub { my @ev = @_ }, |
|
32
|
|
|
|
|
|
|
init => "MyWorker::init", |
|
33
|
|
|
|
|
|
|
serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, |
|
34
|
|
|
|
|
|
|
); |
|
35
|
|
|
|
|
|
|
|
|
36
|
|
|
|
|
|
|
for (1..10) { |
|
37
|
|
|
|
|
|
|
$pool->(doit => $_, sub { |
|
38
|
|
|
|
|
|
|
print "MyWorker::run returned @_\n"; |
|
39
|
|
|
|
|
|
|
}); |
|
40
|
|
|
|
|
|
|
} |
|
41
|
|
|
|
|
|
|
|
|
42
|
|
|
|
|
|
|
undef $pool; |
|
43
|
|
|
|
|
|
|
|
|
44
|
|
|
|
|
|
|
$finish->recv; |
|
45
|
|
|
|
|
|
|
|
|
46
|
|
|
|
|
|
|
=head1 DESCRIPTION |
|
47
|
|
|
|
|
|
|
|
|
48
|
|
|
|
|
|
|
This module uses processes created via L and the RPC |
|
49
|
|
|
|
|
|
|
protocol implemented in L to create a load-balanced |
|
50
|
|
|
|
|
|
|
pool of processes that handles jobs. |
|
51
|
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
Understanding of L is helpful but not critical to be able |
|
53
|
|
|
|
|
|
|
to use this module, but a thorough understanding of L |
|
54
|
|
|
|
|
|
|
is, as it defines the actual API that needs to be implemented in the |
|
55
|
|
|
|
|
|
|
worker processes. |
|
56
|
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
=head1 EXAMPLES |
|
58
|
|
|
|
|
|
|
|
|
59
|
|
|
|
|
|
|
=head1 PARENT USAGE |
|
60
|
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
To create a pool, you first have to create a L object - |
|
62
|
|
|
|
|
|
|
this object becomes your template process. Whenever a new worker process |
|
63
|
|
|
|
|
|
|
is needed, it is forked from this template process. Then you need to |
|
64
|
|
|
|
|
|
|
"hand off" this template process to the C module by |
|
65
|
|
|
|
|
|
|
calling its run method on it: |
|
66
|
|
|
|
|
|
|
|
|
67
|
|
|
|
|
|
|
my $template = AnyEvent::Fork |
|
68
|
|
|
|
|
|
|
->new |
|
69
|
|
|
|
|
|
|
->require ("SomeModule", "MyWorkerModule"); |
|
70
|
|
|
|
|
|
|
|
|
71
|
|
|
|
|
|
|
my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction"); |
|
72
|
|
|
|
|
|
|
|
|
73
|
|
|
|
|
|
|
The pool "object" is not a regular Perl object, but a code reference that |
|
74
|
|
|
|
|
|
|
you can call and that works roughly like calling the worker function |
|
75
|
|
|
|
|
|
|
directly, except that it returns nothing but instead you need to specify a |
|
76
|
|
|
|
|
|
|
callback to be invoked once results are in: |
|
77
|
|
|
|
|
|
|
|
|
78
|
|
|
|
|
|
|
$pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" }); |
|
79
|
|
|
|
|
|
|
|
|
80
|
|
|
|
|
|
|
=over 4 |
|
81
|
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
=cut |
|
83
|
|
|
|
|
|
|
|
|
84
|
|
|
|
|
|
|
package AnyEvent::Fork::Pool; |
|
85
|
|
|
|
|
|
|
|
|
86
|
1
|
|
|
1
|
|
1382
|
use common::sense; |
|
|
1
|
|
|
|
|
7
|
|
|
|
1
|
|
|
|
|
4
|
|
|
87
|
|
|
|
|
|
|
|
|
88
|
1
|
|
|
1
|
|
44
|
use Scalar::Util (); |
|
|
1
|
|
|
|
|
1
|
|
|
|
1
|
|
|
|
|
12
|
|
|
89
|
|
|
|
|
|
|
|
|
90
|
1
|
|
|
1
|
|
727
|
use Guard (); |
|
|
1
|
|
|
|
|
507
|
|
|
|
1
|
|
|
|
|
18
|
|
|
91
|
1
|
|
|
1
|
|
1977
|
use Array::Heap (); |
|
|
1
|
|
|
|
|
630
|
|
|
|
1
|
|
|
|
|
20
|
|
|
92
|
|
|
|
|
|
|
|
|
93
|
1
|
|
|
1
|
|
1649
|
use AnyEvent; |
|
|
1
|
|
|
|
|
5309
|
|
|
|
1
|
|
|
|
|
46
|
|
|
94
|
|
|
|
|
|
|
# explicit version on next line, as some cpan-testers test with the 0.1 version, |
|
95
|
|
|
|
|
|
|
# ignoring dependencies, and this line will at least give a clear indication of that. |
|
96
|
1
|
|
|
1
|
|
895
|
use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience |
|
|
1
|
|
|
|
|
15487
|
|
|
|
1
|
|
|
|
|
34
|
|
|
97
|
1
|
|
|
1
|
|
1043
|
use AnyEvent::Fork::RPC; |
|
|
1
|
|
|
|
|
1367
|
|
|
|
1
|
|
|
|
|
1594
|
|
|
98
|
|
|
|
|
|
|
|
|
99
|
|
|
|
|
|
|
# these are used for the first and last argument of events |
|
100
|
|
|
|
|
|
|
# in the hope of not colliding. yes, I don't like it either, |
|
101
|
|
|
|
|
|
|
# but didn't come up with an obviously better alternative. |
|
102
|
|
|
|
|
|
|
my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; |
|
103
|
|
|
|
|
|
|
my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; |
|
104
|
|
|
|
|
|
|
|
|
105
|
|
|
|
|
|
|
our $VERSION = 1.1; |
|
106
|
|
|
|
|
|
|
|
|
107
|
|
|
|
|
|
|
=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] |
|
108
|
|
|
|
|
|
|
|
|
109
|
|
|
|
|
|
|
The traditional way to call the pool creation function. But it is way |
|
110
|
|
|
|
|
|
|
cooler to call it in the following way: |
|
111
|
|
|
|
|
|
|
|
|
112
|
|
|
|
|
|
|
=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...]) |
|
113
|
|
|
|
|
|
|
|
|
114
|
|
|
|
|
|
|
Creates a new pool object with the specified C<$function> as function |
|
115
|
|
|
|
|
|
|
(name) to call for each request. The pool uses the C<$fork> object as the |
|
116
|
|
|
|
|
|
|
template when creating worker processes. |
|
117
|
|
|
|
|
|
|
|
|
118
|
|
|
|
|
|
|
You can supply your own template process, or tell C |
|
119
|
|
|
|
|
|
|
to create one. |
|
120
|
|
|
|
|
|
|
|
|
121
|
|
|
|
|
|
|
A relatively large number of key/value pairs can be specified to influence |
|
122
|
|
|
|
|
|
|
the behaviour. They are grouped into the categories "pool management", |
|
123
|
|
|
|
|
|
|
"template process" and "rpc parameters". |
|
124
|
|
|
|
|
|
|
|
|
125
|
|
|
|
|
|
|
=over 4 |
|
126
|
|
|
|
|
|
|
|
|
127
|
|
|
|
|
|
|
=item Pool Management |
|
128
|
|
|
|
|
|
|
|
|
129
|
|
|
|
|
|
|
The pool consists of a certain number of worker processes. These options |
|
130
|
|
|
|
|
|
|
decide how many of these processes exist and when they are started and |
|
131
|
|
|
|
|
|
|
stopped. |
|
132
|
|
|
|
|
|
|
|
|
133
|
|
|
|
|
|
|
The worker pool is dynamically resized, according to (perceived :) |
|
134
|
|
|
|
|
|
|
load. The minimum size is given by the C parameter and the maximum |
|
135
|
|
|
|
|
|
|
size is given by the C parameter. A new worker is started every |
|
136
|
|
|
|
|
|
|
C seconds at most, and an idle worker is stopped at most every |
|
137
|
|
|
|
|
|
|
C second. |
|
138
|
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
You can specify the amount of jobs sent to a worker concurrently using the |
|
140
|
|
|
|
|
|
|
C parameter. |
|
141
|
|
|
|
|
|
|
|
|
142
|
|
|
|
|
|
|
=over 4 |
|
143
|
|
|
|
|
|
|
|
|
144
|
|
|
|
|
|
|
=item idle => $count (default: 0) |
|
145
|
|
|
|
|
|
|
|
|
146
|
|
|
|
|
|
|
The minimum amount of idle processes in the pool - when there are fewer |
|
147
|
|
|
|
|
|
|
than this many idle workers, C will try to start new |
|
148
|
|
|
|
|
|
|
ones, subject to the limits set by C and C. |
|
149
|
|
|
|
|
|
|
|
|
150
|
|
|
|
|
|
|
This is also the initial amount of workers in the pool. The default of |
|
151
|
|
|
|
|
|
|
zero means that the pool starts empty and can shrink back to zero workers |
|
152
|
|
|
|
|
|
|
over time. |
|
153
|
|
|
|
|
|
|
|
|
154
|
|
|
|
|
|
|
=item max => $count (default: 4) |
|
155
|
|
|
|
|
|
|
|
|
156
|
|
|
|
|
|
|
The maximum number of processes in the pool, in addition to the template |
|
157
|
|
|
|
|
|
|
process. C will never have more than this number of |
|
158
|
|
|
|
|
|
|
worker processes, although there can be more temporarily when a worker is |
|
159
|
|
|
|
|
|
|
shut down and hasn't exited yet. |
|
160
|
|
|
|
|
|
|
|
|
161
|
|
|
|
|
|
|
=item load => $count (default: 2) |
|
162
|
|
|
|
|
|
|
|
|
163
|
|
|
|
|
|
|
The maximum number of concurrent jobs sent to a single worker process. |
|
164
|
|
|
|
|
|
|
|
|
165
|
|
|
|
|
|
|
Jobs that cannot be sent to a worker immediately (because all workers are |
|
166
|
|
|
|
|
|
|
busy) will be queued until a worker is available. |
|
167
|
|
|
|
|
|
|
|
|
168
|
|
|
|
|
|
|
Setting this low improves latency. For example, at C<1>, every job that |
|
169
|
|
|
|
|
|
|
is sent to a worker is sent to a completely idle worker that doesn't run |
|
170
|
|
|
|
|
|
|
any other jobs. The downside is that throughput is reduced - a worker that |
|
171
|
|
|
|
|
|
|
finishes a job needs to wait for a new job from the parent. |
|
172
|
|
|
|
|
|
|
|
|
173
|
|
|
|
|
|
|
The default of C<2> is usually a good compromise. |
|
174
|
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
=item start => $seconds (default: 0.1) |
|
176
|
|
|
|
|
|
|
|
|
177
|
|
|
|
|
|
|
When there are fewer than C workers (or all workers are completely |
|
178
|
|
|
|
|
|
|
busy), then a timer is started. If the timer elapses and there are still |
|
179
|
|
|
|
|
|
|
jobs that cannot be queued to a worker, a new worker is started. |
|
180
|
|
|
|
|
|
|
|
|
181
|
|
|
|
|
|
|
This sets the minimum time that all workers must be busy before a new |
|
182
|
|
|
|
|
|
|
worker is started. Or, put differently, the minimum delay between starting |
|
183
|
|
|
|
|
|
|
new workers. |
|
184
|
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
The delay is small by default, which means new workers will be started |
|
186
|
|
|
|
|
|
|
relatively quickly. A delay of C<0> is possible, and ensures that the pool |
|
187
|
|
|
|
|
|
|
will grow as quickly as possible under load. |
|
188
|
|
|
|
|
|
|
|
|
189
|
|
|
|
|
|
|
Non-zero values are useful to avoid "exploding" a pool because a lot of |
|
190
|
|
|
|
|
|
|
jobs are queued in an instant. |
|
191
|
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
Higher values are often useful to improve efficiency at the cost of |
|
193
|
|
|
|
|
|
|
latency - when fewer processes can do the job over time, starting more and |
|
194
|
|
|
|
|
|
|
more is not necessarily going to help. |
|
195
|
|
|
|
|
|
|
|
|
196
|
|
|
|
|
|
|
=item stop => $seconds (default: 10) |
|
197
|
|
|
|
|
|
|
|
|
198
|
|
|
|
|
|
|
When a worker has no jobs to execute it becomes idle. An idle worker that |
|
199
|
|
|
|
|
|
|
hasn't executed a job within this amount of time will be stopped, unless |
|
200
|
|
|
|
|
|
|
the other parameters say otherwise. |
|
201
|
|
|
|
|
|
|
|
|
202
|
|
|
|
|
|
|
Setting this to a very high value means that workers stay around longer, |
|
203
|
|
|
|
|
|
|
even when they have nothing to do, which can be good as they don't have to |
|
204
|
|
|
|
|
|
|
be started on the next load spike again. |
|
205
|
|
|
|
|
|
|
|
|
206
|
|
|
|
|
|
|
Setting this to a lower value can be useful to avoid memory or simply |
|
207
|
|
|
|
|
|
|
process table wastage. |
|
208
|
|
|
|
|
|
|
|
|
209
|
|
|
|
|
|
|
Usually, setting this to a time longer than the time between load spikes |
|
210
|
|
|
|
|
|
|
is best - if you expect a lot of requests every minute and little work |
|
211
|
|
|
|
|
|
|
in between, setting this to longer than a minute avoids having to stop |
|
212
|
|
|
|
|
|
|
and start workers. On the other hand, you have to ask yourself if letting |
|
213
|
|
|
|
|
|
|
workers run idle is a good use of your resources. Try to find a good |
|
214
|
|
|
|
|
|
|
balance between resource usage of your workers and the time to start new |
|
215
|
|
|
|
|
|
|
workers - the processes created by L itself are fast at |
|
216
|
|
|
|
|
|
|
creating workers while not using much memory for them, so most of the |
|
217
|
|
|
|
|
|
|
overhead is likely from your own code. |
|
218
|
|
|
|
|
|
|
|
|
219
|
|
|
|
|
|
|
=item on_destroy => $callback->() (default: none) |
|
220
|
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
When a pool object goes out of scope, the outstanding requests are still |
|
222
|
|
|
|
|
|
|
handled till completion. Only after handling all jobs will the workers |
|
223
|
|
|
|
|
|
|
be destroyed (and also the template process if it isn't referenced |
|
224
|
|
|
|
|
|
|
otherwise). |
|
225
|
|
|
|
|
|
|
|
|
226
|
|
|
|
|
|
|
To find out when a pool has finished its work, you can set this |
|
227
|
|
|
|
|
|
|
callback, which will be called when the pool has been destroyed. |
|
228
|
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
=back |
|
230
|
|
|
|
|
|
|
|
|
231
|
|
|
|
|
|
|
=item AnyEvent::Fork::RPC Parameters |
|
232
|
|
|
|
|
|
|
|
|
233
|
|
|
|
|
|
|
These parameters are all passed more or less directly to |
|
234
|
|
|
|
|
|
|
L. They are only briefly mentioned here, for |
|
235
|
|
|
|
|
|
|
their full documentation please refer to the L |
|
236
|
|
|
|
|
|
|
documentation. Also, the default values mentioned here are only documented |
|
237
|
|
|
|
|
|
|
as a best effort - the L documentation is binding. |
|
238
|
|
|
|
|
|
|
|
|
239
|
|
|
|
|
|
|
=over 4 |
|
240
|
|
|
|
|
|
|
|
|
241
|
|
|
|
|
|
|
=item async => $boolean (default: 0) |
|
242
|
|
|
|
|
|
|
|
|
243
|
|
|
|
|
|
|
Whether to use the synchronous or asynchronous RPC backend. |
|
244
|
|
|
|
|
|
|
|
|
245
|
|
|
|
|
|
|
=item on_error => $callback->($message) (default: die with message) |
|
246
|
|
|
|
|
|
|
|
|
247
|
|
|
|
|
|
|
The callback to call on any (fatal) errors. |
|
248
|
|
|
|
|
|
|
|
|
249
|
|
|
|
|
|
|
=item on_event => $callback->(...) (default: C, unlike L) |
|
250
|
|
|
|
|
|
|
|
|
251
|
|
|
|
|
|
|
The callback to invoke on events. |
|
252
|
|
|
|
|
|
|
|
|
253
|
|
|
|
|
|
|
=item init => $initfunction (default: none) |
|
254
|
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
The function to call in the child, once before handling requests. |
|
256
|
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER) |
|
258
|
|
|
|
|
|
|
|
|
259
|
|
|
|
|
|
|
The serialiser to use. |
|
260
|
|
|
|
|
|
|
|
|
261
|
|
|
|
|
|
|
=back |
|
262
|
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
=back |
|
264
|
|
|
|
|
|
|
|
|
265
|
|
|
|
|
|
|
=cut |
|
266
|
|
|
|
|
|
|
|
|
267
|
|
|
|
|
|
|
sub run { |
|
268
|
0
|
|
|
0
|
1
|
|
my ($template, $function, %arg) = @_; |
|
269
|
|
|
|
|
|
|
|
|
270
|
0
|
|
0
|
|
|
|
my $max = $arg{max} || 4; |
|
271
|
|
|
|
|
|
|
my $idle = $arg{idle} || 0, |
|
272
|
|
|
|
|
|
|
my $load = $arg{load} || 2, |
|
273
|
|
|
|
|
|
|
my $start = $arg{start} || 0.1, |
|
274
|
|
|
|
|
|
|
my $stop = $arg{stop} || 10, |
|
275
|
0
|
|
|
0
|
|
|
my $on_event = $arg{on_event} || sub { }, |
|
276
|
0
|
|
0
|
|
|
|
my $on_destroy = $arg{on_destroy}; |
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
277
|
|
|
|
|
|
|
|
|
278
|
0
|
|
|
|
|
|
my @rpc = ( |
|
279
|
|
|
|
|
|
|
async => $arg{async}, |
|
280
|
|
|
|
|
|
|
init => $arg{init}, |
|
281
|
|
|
|
|
|
|
serialiser => delete $arg{serialiser}, |
|
282
|
|
|
|
|
|
|
on_error => $arg{on_error}, |
|
283
|
|
|
|
|
|
|
); |
|
284
|
|
|
|
|
|
|
|
|
285
|
0
|
|
|
|
|
|
my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown); |
|
286
|
0
|
|
|
|
|
|
my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler); |
|
287
|
|
|
|
|
|
|
|
|
288
|
|
|
|
|
|
|
my $destroy_guard = Guard::guard { |
|
289
|
0
|
0
|
|
0
|
|
|
$on_destroy->() |
|
290
|
|
|
|
|
|
|
if $on_destroy; |
|
291
|
0
|
|
|
|
|
|
}; |
|
292
|
|
|
|
|
|
|
|
|
293
|
0
|
0
|
|
|
|
|
$template |
|
294
|
|
|
|
|
|
|
->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync")) |
|
295
|
|
|
|
|
|
|
->eval (' |
|
296
|
|
|
|
|
|
|
my ($magic0, $magic1) = @_; |
|
297
|
|
|
|
|
|
|
sub AnyEvent::Fork::Pool::retire() { |
|
298
|
|
|
|
|
|
|
AnyEvent::Fork::RPC::event $magic0, "quit", $magic1; |
|
299
|
|
|
|
|
|
|
} |
|
300
|
|
|
|
|
|
|
', $magic0, $magic1) |
|
301
|
|
|
|
|
|
|
; |
|
302
|
|
|
|
|
|
|
|
|
303
|
|
|
|
|
|
|
$start_worker = sub { |
|
304
|
0
|
|
|
0
|
|
|
my $proc = [0, 0, undef]; # load, index, rpc |
|
305
|
|
|
|
|
|
|
|
|
306
|
|
|
|
|
|
|
$proc->[2] = $template |
|
307
|
|
|
|
|
|
|
->fork |
|
308
|
|
|
|
|
|
|
->AnyEvent::Fork::RPC::run ($function, |
|
309
|
|
|
|
|
|
|
@rpc, |
|
310
|
|
|
|
|
|
|
on_event => sub { |
|
311
|
0
|
0
|
0
|
|
|
|
if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) { |
|
|
|
|
0
|
|
|
|
|
|
312
|
0
|
|
|
|
|
|
$destroy_guard if 0; # keep it alive |
|
313
|
|
|
|
|
|
|
|
|
314
|
0
|
0
|
|
|
|
|
$_[1] eq "quit" and $stop_worker->($proc); |
|
315
|
0
|
|
|
|
|
|
return; |
|
316
|
|
|
|
|
|
|
} |
|
317
|
|
|
|
|
|
|
|
|
318
|
0
|
|
|
|
|
|
&$on_event; |
|
319
|
|
|
|
|
|
|
}, |
|
320
|
|
|
|
|
|
|
) |
|
321
|
0
|
|
|
|
|
|
; |
|
322
|
|
|
|
|
|
|
|
|
323
|
0
|
|
|
|
|
|
++$nidle; |
|
324
|
0
|
|
|
|
|
|
Array::Heap::push_heap_idx @pool, $proc; |
|
325
|
|
|
|
|
|
|
|
|
326
|
0
|
|
|
|
|
|
Scalar::Util::weaken $proc; |
|
327
|
0
|
|
|
|
|
|
}; |
|
328
|
|
|
|
|
|
|
|
|
329
|
|
|
|
|
|
|
$stop_worker = sub { |
|
330
|
0
|
|
|
0
|
|
|
my $proc = shift; |
|
331
|
|
|
|
|
|
|
|
|
332
|
0
|
0
|
|
|
|
|
$proc->[0] |
|
333
|
|
|
|
|
|
|
or --$nidle; |
|
334
|
|
|
|
|
|
|
|
|
335
|
0
|
0
|
|
|
|
|
Array::Heap::splice_heap_idx @pool, $proc->[1] |
|
336
|
|
|
|
|
|
|
if defined $proc->[1]; |
|
337
|
|
|
|
|
|
|
|
|
338
|
0
|
|
|
|
|
|
@$proc = 0; # tell others to leave it be |
|
339
|
0
|
|
|
|
|
|
}; |
|
340
|
|
|
|
|
|
|
|
|
341
|
|
|
|
|
|
|
$want_start = sub { |
|
342
|
0
|
|
|
0
|
|
|
undef $stop_w; |
|
343
|
|
|
|
|
|
|
|
|
344
|
|
|
|
|
|
|
$start_w ||= AE::timer $start, $start, sub { |
|
345
|
0
|
0
|
0
|
|
|
|
if (($nidle < $idle || @queue) && @pool < $max) { |
|
|
|
|
0
|
|
|
|
|
|
346
|
0
|
|
|
|
|
|
$start_worker->(); |
|
347
|
0
|
|
|
|
|
|
$scheduler->(); |
|
348
|
|
|
|
|
|
|
} else { |
|
349
|
0
|
|
|
|
|
|
undef $start_w; |
|
350
|
|
|
|
|
|
|
} |
|
351
|
0
|
|
0
|
|
|
|
}; |
|
352
|
0
|
|
|
|
|
|
}; |
|
353
|
|
|
|
|
|
|
|
|
354
|
|
|
|
|
|
|
$want_stop = sub { |
|
355
|
|
|
|
|
|
|
$stop_w ||= AE::timer $stop, $stop, sub { |
|
356
|
0
|
0
|
|
|
|
|
$stop_worker->($pool[0]) |
|
357
|
|
|
|
|
|
|
if $nidle; |
|
358
|
|
|
|
|
|
|
|
|
359
|
0
|
0
|
|
|
|
|
undef $stop_w |
|
360
|
|
|
|
|
|
|
if $nidle <= $idle; |
|
361
|
0
|
|
0
|
0
|
|
|
}; |
|
362
|
0
|
|
|
|
|
|
}; |
|
363
|
|
|
|
|
|
|
|
|
364
|
|
|
|
|
|
|
$scheduler = sub { |
|
365
|
0
|
0
|
|
0
|
|
|
if (@queue) { |
|
|
|
0
|
|
|
|
|
|
|
366
|
0
|
|
|
|
|
|
while (@queue) { |
|
367
|
0
|
0
|
|
|
|
|
@pool or $start_worker->(); |
|
368
|
|
|
|
|
|
|
|
|
369
|
0
|
|
|
|
|
|
my $proc = $pool[0]; |
|
370
|
|
|
|
|
|
|
|
|
371
|
0
|
0
|
|
|
|
|
if ($proc->[0] < $load) { |
|
372
|
|
|
|
|
|
|
# found free worker, increase load |
|
373
|
0
|
0
|
|
|
|
|
unless ($proc->[0]++) { |
|
374
|
|
|
|
|
|
|
# worker became busy |
|
375
|
0
|
0
|
|
|
|
|
--$nidle |
|
376
|
|
|
|
|
|
|
or undef $stop_w; |
|
377
|
|
|
|
|
|
|
|
|
378
|
0
|
0
|
0
|
|
|
|
$want_start->() |
|
379
|
|
|
|
|
|
|
if $nidle < $idle && @pool < $max; |
|
380
|
|
|
|
|
|
|
} |
|
381
|
|
|
|
|
|
|
|
|
382
|
0
|
|
|
|
|
|
Array::Heap::adjust_heap_idx @pool, 0; |
|
383
|
|
|
|
|
|
|
|
|
384
|
0
|
|
|
|
|
|
my $job = shift @queue; |
|
385
|
0
|
|
|
|
|
|
my $ocb = pop @$job; |
|
386
|
|
|
|
|
|
|
|
|
387
|
|
|
|
|
|
|
$proc->[2]->(@$job, sub { |
|
388
|
|
|
|
|
|
|
# reduce load |
|
389
|
0
|
0
|
0
|
|
|
|
--$proc->[0] # worker still busy? |
|
390
|
|
|
|
|
|
|
or ++$nidle > $idle # not too many idle processes? |
|
391
|
|
|
|
|
|
|
or $want_stop->(); |
|
392
|
|
|
|
|
|
|
|
|
393
|
0
|
0
|
|
|
|
|
Array::Heap::adjust_heap_idx @pool, $proc->[1] |
|
394
|
|
|
|
|
|
|
if defined $proc->[1]; |
|
395
|
|
|
|
|
|
|
|
|
396
|
0
|
|
|
|
|
|
&$ocb; |
|
397
|
|
|
|
|
|
|
|
|
398
|
0
|
|
|
|
|
|
$scheduler->(); |
|
399
|
0
|
|
|
|
|
|
}); |
|
400
|
|
|
|
|
|
|
} else { |
|
401
|
0
|
0
|
|
|
|
|
$want_start->() |
|
402
|
|
|
|
|
|
|
unless @pool >= $max; |
|
403
|
|
|
|
|
|
|
|
|
404
|
0
|
|
|
|
|
|
last; |
|
405
|
|
|
|
|
|
|
} |
|
406
|
|
|
|
|
|
|
} |
|
407
|
|
|
|
|
|
|
} elsif ($shutdown) { |
|
408
|
0
|
|
|
|
|
|
@pool = (); |
|
409
|
0
|
|
|
|
|
|
undef $start_w; |
|
410
|
0
|
|
|
|
|
|
undef $start_worker; # frees $destroy_guard reference |
|
411
|
|
|
|
|
|
|
|
|
412
|
0
|
|
|
|
|
|
$stop_worker->($pool[0]) |
|
413
|
|
|
|
|
|
|
while $nidle; |
|
414
|
|
|
|
|
|
|
} |
|
415
|
0
|
|
|
|
|
|
}; |
|
416
|
|
|
|
|
|
|
|
|
417
|
|
|
|
|
|
|
my $shutdown_guard = Guard::guard { |
|
418
|
0
|
|
|
0
|
|
|
$shutdown = 1; |
|
419
|
0
|
|
|
|
|
|
$scheduler->(); |
|
420
|
0
|
|
|
|
|
|
}; |
|
421
|
|
|
|
|
|
|
|
|
422
|
0
|
|
|
|
|
|
$start_worker->() |
|
423
|
|
|
|
|
|
|
while @pool < $idle; |
|
424
|
|
|
|
|
|
|
|
|
425
|
|
|
|
|
|
|
sub { |
|
426
|
0
|
|
|
0
|
|
|
$shutdown_guard if 0; # keep it alive |
|
427
|
|
|
|
|
|
|
|
|
428
|
0
|
0
|
|
|
|
|
$start_worker->() |
|
429
|
|
|
|
|
|
|
unless @pool; |
|
430
|
|
|
|
|
|
|
|
|
431
|
0
|
|
|
|
|
|
push @queue, [@_]; |
|
432
|
0
|
|
|
|
|
|
$scheduler->(); |
|
433
|
|
|
|
|
|
|
} |
|
434
|
0
|
|
|
|
|
|
} |
|
435
|
|
|
|
|
|
|
|
|
436
|
|
|
|
|
|
|
=item $pool->(..., $cb->(...)) |
|
437
|
|
|
|
|
|
|
|
|
438
|
|
|
|
|
|
|
Call the RPC function of a worker with the given arguments, and when the |
|
439
|
|
|
|
|
|
|
worker is done, call the C<$cb> with the results, just like calling the |
|
440
|
|
|
|
|
|
|
RPC object directly - see the L documentation for |
|
441
|
|
|
|
|
|
|
details on the RPC API. |
|
442
|
|
|
|
|
|
|
|
|
443
|
|
|
|
|
|
|
If there is no free worker, the call will be queued until a worker becomes |
|
444
|
|
|
|
|
|
|
available. |
|
445
|
|
|
|
|
|
|
|
|
446
|
|
|
|
|
|
|
Note that there can be considerable time between calling this method and |
|
447
|
|
|
|
|
|
|
the call actually being executed. During this time, the parameters passed |
|
448
|
|
|
|
|
|
|
to this function are effectively read-only - modifying them after the call |
|
449
|
|
|
|
|
|
|
and before the callback is invoked causes undefined behaviour. |
|
450
|
|
|
|
|
|
|
|
|
451
|
|
|
|
|
|
|
=cut |
|
452
|
|
|
|
|
|
|
|
|
453
|
|
|
|
|
|
|
=item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus] |
|
454
|
|
|
|
|
|
|
|
|
455
|
|
|
|
|
|
|
=item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus] |
|
456
|
|
|
|
|
|
|
|
|
457
|
|
|
|
|
|
|
Tries to detect the number of CPUs (C<$cpus> often called cpu cores |
|
458
|
|
|
|
|
|
|
nowadays) and execution units (C<$eus>) which include e.g. extra |
|
459
|
|
|
|
|
|
|
hyperthreaded units. When C<$cpus> cannot be determined reliably, |
|
460
|
|
|
|
|
|
|
C<$default_cpus> is returned for both values, or C<1> if it is missing. |
|
461
|
|
|
|
|
|
|
|
|
462
|
|
|
|
|
|
|
For normal CPU bound uses, it is wise to have as many worker processes |
|
463
|
|
|
|
|
|
|
as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using |
|
464
|
|
|
|
|
|
|
hyperthreading is usually detrimental to performance, but in those rare |
|
465
|
|
|
|
|
|
|
cases where that really helps it might be beneficial to use more workers |
|
466
|
|
|
|
|
|
|
(C<$eus>). |
|
467
|
|
|
|
|
|
|
|
|
468
|
|
|
|
|
|
|
Currently, F is parsed on GNU/Linux systems for both |
|
469
|
|
|
|
|
|
|
C<$cpus> and C<$eu>, and on {Free,Net,Open}BSD, F is |
|
470
|
|
|
|
|
|
|
used for C<$cpus>. |
|
471
|
|
|
|
|
|
|
|
|
472
|
|
|
|
|
|
|
Example: create a worker pool with as many workers as cpu cores, or C<2>, |
|
473
|
|
|
|
|
|
|
if the actual number could not be determined. |
|
474
|
|
|
|
|
|
|
|
|
475
|
|
|
|
|
|
|
$fork->AnyEvent::Fork::Pool::run ("myworker::function", |
|
476
|
|
|
|
|
|
|
max => (scalar AnyEvent::Fork::Pool::ncpu 2), |
|
477
|
|
|
|
|
|
|
); |
|
478
|
|
|
|
|
|
|
|
|
479
|
|
|
|
|
|
|
=cut |
|
480
|
|
|
|
|
|
|
|
|
481
|
|
|
|
|
|
|
BEGIN { |
|
482
|
1
|
50
|
0
|
1
|
|
6
|
if ($^O eq "linux") { |
|
|
|
0
|
0
|
|
|
|
|
|
483
|
|
|
|
|
|
|
*ncpu = sub(;$) { |
|
484
|
0
|
|
|
0
|
|
|
my ($cpus, $eus); |
|
485
|
|
|
|
|
|
|
|
|
486
|
0
|
0
|
|
|
|
|
if (open my $fh, "<", "/proc/cpuinfo") { |
|
487
|
0
|
|
|
|
|
|
my %id; |
|
488
|
|
|
|
|
|
|
|
|
489
|
0
|
|
|
|
|
|
while (<$fh>) { |
|
490
|
0
|
0
|
|
|
|
|
if (/^core id\s*:\s*(\d+)/) { |
|
491
|
0
|
|
|
|
|
|
++$eus; |
|
492
|
0
|
|
|
|
|
|
undef $id{$1}; |
|
493
|
|
|
|
|
|
|
} |
|
494
|
|
|
|
|
|
|
} |
|
495
|
|
|
|
|
|
|
|
|
496
|
0
|
|
|
|
|
|
$cpus = scalar keys %id; |
|
497
|
|
|
|
|
|
|
} else { |
|
498
|
0
|
0
|
|
|
|
|
$cpus = $eus = @_ ? shift : 1; |
|
499
|
|
|
|
|
|
|
} |
|
500
|
0
|
0
|
|
|
|
|
wantarray ? ($cpus, $eus) : $cpus |
|
501
|
1
|
|
|
|
|
53
|
}; |
|
502
|
|
|
|
|
|
|
} elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") { |
|
503
|
|
|
|
|
|
|
*ncpu = sub(;$) { |
|
504
|
0
|
|
0
|
|
|
|
my $cpus = qx * 1 |
|
505
|
|
|
|
|
|
|
|| (@_ ? shift : 1); |
|
506
|
0
|
0
|
|
|
|
|
wantarray ? ($cpus, $cpus) : $cpus |
|
507
|
0
|
|
|
|
|
|
}; |
|
508
|
|
|
|
|
|
|
} else { |
|
509
|
|
|
|
|
|
|
*ncpu = sub(;$) { |
|
510
|
0
|
0
|
|
|
|
|
my $cpus = @_ ? shift : 1; |
|
511
|
0
|
0
|
|
|
|
|
wantarray ? ($cpus, $cpus) : $cpus |
|
512
|
0
|
|
|
|
|
|
}; |
|
513
|
|
|
|
|
|
|
} |
|
514
|
|
|
|
|
|
|
} |
|
515
|
|
|
|
|
|
|
|
|
516
|
|
|
|
|
|
|
=back |
|
517
|
|
|
|
|
|
|
|
|
518
|
|
|
|
|
|
|
=head1 CHILD USAGE |
|
519
|
|
|
|
|
|
|
|
|
520
|
|
|
|
|
|
|
In addition to the L API, this module implements one |
|
521
|
|
|
|
|
|
|
more child-side function: |
|
522
|
|
|
|
|
|
|
|
|
523
|
|
|
|
|
|
|
=over 4 |
|
524
|
|
|
|
|
|
|
|
|
525
|
|
|
|
|
|
|
=item AnyEvent::Fork::Pool::retire () |
|
526
|
|
|
|
|
|
|
|
|
527
|
|
|
|
|
|
|
This function sends an event to the parent process to request retirement: |
|
528
|
|
|
|
|
|
|
the worker is removed from the pool and no new jobs will be sent to it, |
|
529
|
|
|
|
|
|
|
but it has to handle the jobs that are already queued. |
|
530
|
|
|
|
|
|
|
|
|
531
|
|
|
|
|
|
|
The parentheses are part of the syntax: the function usually isn't defined |
|
532
|
|
|
|
|
|
|
when you compile your code (because that happens after handing the |
|
533
|
|
|
|
|
|
|
template process over to C, so you need the |
|
534
|
|
|
|
|
|
|
empty parentheses to tell Perl that the function is indeed a function. |
|
535
|
|
|
|
|
|
|
|
|
536
|
|
|
|
|
|
|
Retiring a worker can be useful to gracefully shut it down when the worker |
|
537
|
|
|
|
|
|
|
deems this useful. For example, after executing a job, one could check |
|
538
|
|
|
|
|
|
|
the process size or the number of jobs handled so far, and if either is |
|
539
|
|
|
|
|
|
|
too high, the worker could ask to get retired, to avoid memory leaks to |
|
540
|
|
|
|
|
|
|
accumulate. |
|
541
|
|
|
|
|
|
|
|
|
542
|
|
|
|
|
|
|
Example: retire a worker after it has handled roughly 100 requests. |
|
543
|
|
|
|
|
|
|
|
|
544
|
|
|
|
|
|
|
my $count = 0; |
|
545
|
|
|
|
|
|
|
|
|
546
|
|
|
|
|
|
|
sub my::worker { |
|
547
|
|
|
|
|
|
|
|
|
548
|
|
|
|
|
|
|
++$count == 100 |
|
549
|
|
|
|
|
|
|
and AnyEvent::Fork::Pool::retire (); |
|
550
|
|
|
|
|
|
|
|
|
551
|
|
|
|
|
|
|
... normal code goes here |
|
552
|
|
|
|
|
|
|
} |
|
553
|
|
|
|
|
|
|
|
|
554
|
|
|
|
|
|
|
=back |
|
555
|
|
|
|
|
|
|
|
|
556
|
|
|
|
|
|
|
=head1 POOL PARAMETERS RECIPES |
|
557
|
|
|
|
|
|
|
|
|
558
|
|
|
|
|
|
|
This section describes some recipes for pool parameters. These are mostly |
|
559
|
|
|
|
|
|
|
meant for the synchronous RPC backend, as the asynchronous RPC backend |
|
560
|
|
|
|
|
|
|
changes the rules considerably, making workers themselves responsible for |
|
561
|
|
|
|
|
|
|
their scheduling. |
|
562
|
|
|
|
|
|
|
|
|
563
|
|
|
|
|
|
|
=over 4 |
|
564
|
|
|
|
|
|
|
|
|
565
|
|
|
|
|
|
|
=item low latency - set load = 1 |
|
566
|
|
|
|
|
|
|
|
|
567
|
|
|
|
|
|
|
If you need a deterministic low latency, you should set the C |
|
568
|
|
|
|
|
|
|
parameter to C<1>. This ensures that never more than one job is sent to |
|
569
|
|
|
|
|
|
|
each worker. This avoids having to wait for a previous job to finish. |
|
570
|
|
|
|
|
|
|
|
|
571
|
|
|
|
|
|
|
This makes most sense with the synchronous (default) backend, as the |
|
572
|
|
|
|
|
|
|
asynchronous backend can handle multiple requests concurrently. |
|
573
|
|
|
|
|
|
|
|
|
574
|
|
|
|
|
|
|
=item lowest latency - set load = 1 and idle = max |
|
575
|
|
|
|
|
|
|
|
|
576
|
|
|
|
|
|
|
To achieve the lowest latency, you additionally should disable any dynamic |
|
577
|
|
|
|
|
|
|
resizing of the pool by setting C to the same value as C. |
|
578
|
|
|
|
|
|
|
|
|
579
|
|
|
|
|
|
|
=item high throughput, cpu bound jobs - set load >= 2, max = #cpus |
|
580
|
|
|
|
|
|
|
|
|
581
|
|
|
|
|
|
|
To get high throughput with cpu-bound jobs, you should set the maximum |
|
582
|
|
|
|
|
|
|
pool size to the number of cpus in your system, and C to at least |
|
583
|
|
|
|
|
|
|
C<2>, to make sure there can be another job waiting for the worker when it |
|
584
|
|
|
|
|
|
|
has finished one. |
|
585
|
|
|
|
|
|
|
|
|
586
|
|
|
|
|
|
|
The value of C<2> for C is the minimum value that I achieve |
|
587
|
|
|
|
|
|
|
100% throughput, but if your parent process itself is sometimes busy, you |
|
588
|
|
|
|
|
|
|
might need higher values. Also there is a limit on the amount of data that |
|
589
|
|
|
|
|
|
|
can be "in flight" to the worker, so if you send big blobs of data to your |
|
590
|
|
|
|
|
|
|
worker, C might have much less of an effect. |
|
591
|
|
|
|
|
|
|
|
|
592
|
|
|
|
|
|
|
=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high |
|
593
|
|
|
|
|
|
|
|
|
594
|
|
|
|
|
|
|
When your jobs are I/O bound, using more workers usually boils down to |
|
595
|
|
|
|
|
|
|
higher throughput, depending very much on your actual workload - sometimes |
|
596
|
|
|
|
|
|
|
having only one worker is best, for example, when you read or write big |
|
597
|
|
|
|
|
|
|
files at maximum speed, as a second worker will increase seek times. |
|
598
|
|
|
|
|
|
|
|
|
599
|
|
|
|
|
|
|
=back |
|
600
|
|
|
|
|
|
|
|
|
601
|
|
|
|
|
|
|
=head1 EXCEPTIONS |
|
602
|
|
|
|
|
|
|
|
|
603
|
|
|
|
|
|
|
The same "policy" as with L applies - exceptions will |
|
604
|
|
|
|
|
|
|
not be caught, and exceptions in both worker and in callbacks causes |
|
605
|
|
|
|
|
|
|
undesirable or undefined behaviour. |
|
606
|
|
|
|
|
|
|
|
|
607
|
|
|
|
|
|
|
=head1 SEE ALSO |
|
608
|
|
|
|
|
|
|
|
|
609
|
|
|
|
|
|
|
L, to create the processes in the first place. |
|
610
|
|
|
|
|
|
|
|
|
611
|
|
|
|
|
|
|
L, which implements the RPC protocol and API. |
|
612
|
|
|
|
|
|
|
|
|
613
|
|
|
|
|
|
|
=head1 AUTHOR AND CONTACT INFORMATION |
|
614
|
|
|
|
|
|
|
|
|
615
|
|
|
|
|
|
|
Marc Lehmann |
|
616
|
|
|
|
|
|
|
http://software.schmorp.de/pkg/AnyEvent-Fork-Pool |
|
617
|
|
|
|
|
|
|
|
|
618
|
|
|
|
|
|
|
=cut |
|
619
|
|
|
|
|
|
|
|
|
620
|
|
|
|
|
|
|
1 |
|
621
|
|
|
|
|
|
|
|