=head1 NAME

AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent;
   use AnyEvent::Fork;
   use AnyEvent::Fork::Pool;

   # all possible parameters shown, with default values
   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
           "MyWorker::run", # the worker function

           # pool management
           max        => 4,   # absolute maximum # of processes
           idle       => 0,   # minimum # of idle processes
           load       => 2,   # queue at most this number of jobs per process
           start      => 0.1, # wait this many seconds before starting a new process
           stop       => 10,  # wait this many seconds before stopping an idle process
           on_destroy => (my $finish = AE::cv), # called when object is destroyed

           # parameters passed to AnyEvent::Fork::RPC
           async      => 0,
           on_error   => sub { die "FATAL: $_[0]\n" },
           on_event   => sub { my @ev = @_ },
           init       => "MyWorker::init",
           serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
        );

   for (1..10) {
      $pool->(doit => $_, sub {
         print "MyWorker::run returned @_\n";
      });
   }

   undef $pool;

   $finish->recv;

=head1 DESCRIPTION

This module uses processes created via L<AnyEvent::Fork> (or
L<AnyEvent::Fork::Remote>) and the RPC protocol implemented in
L<AnyEvent::Fork::RPC> to create a load-balanced pool of processes that
handles jobs.

Understanding of L<AnyEvent::Fork> is helpful but not critical to be able
to use this module, but a thorough understanding of L<AnyEvent::Fork::RPC>
is, as it defines the actual API that needs to be implemented in the
worker processes.

=head1 PARENT USAGE

To create a pool, you first have to create a L<AnyEvent::Fork> object -
this object becomes your template process. Whenever a new worker process
is needed, it is forked from this template process. Then you need to
"hand off" this template process to the C<AnyEvent::Fork::Pool> module by
calling its run method on it:

   my $template = AnyEvent::Fork
      ->new
      ->require ("SomeModule", "MyWorkerModule");

   my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");

The pool "object" is not a regular Perl object, but a code reference that
you can call and that works roughly like calling the worker function
directly, except that it returns nothing but instead you need to specify a
callback to be invoked once results are in:

   $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });

=over 4

=cut

package AnyEvent::Fork::Pool;

use common::sense;

use Scalar::Util ();

use Guard ();
use Array::Heap ();

use AnyEvent;
use AnyEvent::Fork::RPC;

# these are used for the first and last argument of events
# in the hope of not colliding. yes, I don't like it either,
# but didn't come up with an obviously better alternative.
my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';

our $VERSION = 1.2;

=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]

The traditional way to call the pool creation function. But it is way
cooler to call it in the following way:

=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])

Creates a new pool object with the specified C<$function> as function
(name) to call for each request. The pool uses the C<$fork> object as the
template when creating worker processes.

You can supply your own template process, or tell C<AnyEvent::Fork::Pool>
to create one.

A relatively large number of key/value pairs can be specified to influence
the behaviour. They are grouped into the categories "pool management",
"template process" and "rpc parameters".

=over 4

=item Pool Management

The pool consists of a certain number of worker processes. These options
decide how many of these processes exist and when they are started and
stopped.

The worker pool is dynamically resized, according to (perceived :)
load. The minimum size is given by the C<idle> parameter and the maximum
size is given by the C<max> parameter. A new worker is started every
C<start> seconds at most, and an idle worker is stopped at most every
C<stop> seconds.

You can specify the amount of jobs sent to a worker concurrently using the
C<load> parameter.

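For example, the following sketch (the C<MyWorker::run> function name is
purely illustrative) keeps two warm workers around at all times, never
grows beyond eight workers, and is a bit more conservative about starting
and stopping processes than the defaults:

   # illustrative configuration - all keys are documented below
   my $pool = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::Pool::run (
           "MyWorker::run",
           idle  => 2,   # keep at least two idle workers around
           max   => 8,   # never use more than eight worker processes
           load  => 2,   # queue at most two jobs per worker
           start => 0.5, # wait half a second before forking another worker
           stop  => 60,  # stop idle workers after a minute
        );
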
=over 4

=item idle => $count (default: 0)

The minimum amount of idle processes in the pool - when there are fewer
than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
ones, subject to the limits set by C<max> and C<start>.

This is also the initial amount of workers in the pool. The default of
zero means that the pool starts empty and can shrink back to zero workers
over time.

=item max => $count (default: 4)

The maximum number of processes in the pool, in addition to the template
process. C<AnyEvent::Fork::Pool> will never have more than this number of
worker processes, although there can be more temporarily when a worker is
shut down and hasn't exited yet.

=item load => $count (default: 2)

The maximum number of concurrent jobs sent to a single worker process.

Jobs that cannot be sent to a worker immediately (because all workers are
busy) will be queued until a worker is available.

Setting this low improves latency. For example, at C<1>, every job that
is sent to a worker is sent to a completely idle worker that doesn't run
any other jobs. The downside is that throughput is reduced - a worker that
finishes a job needs to wait for a new job from the parent.

The default of C<2> is usually a good compromise.

=item start => $seconds (default: 0.1)

When there are fewer than C<idle> workers (or all workers are completely
busy), then a timer is started. If the timer elapses and there are still
jobs that cannot be queued to a worker, a new worker is started.

This sets the minimum time that all workers must be busy before a new
worker is started. Or, put differently, the minimum delay between starting
new workers.

The delay is small by default, which means new workers will be started
relatively quickly. A delay of C<0> is possible, and ensures that the pool
will grow as quickly as possible under load.

Non-zero values are useful to avoid "exploding" a pool because a lot of
jobs are queued in an instant.

Higher values are often useful to improve efficiency at the cost of
latency - when fewer processes can do the job over time, starting more and
more is not necessarily going to help.

=item stop => $seconds (default: 10)

When a worker has no jobs to execute it becomes idle. An idle worker that
hasn't executed a job within this amount of time will be stopped, unless
the other parameters say otherwise.

Setting this to a very high value means that workers stay around longer,
even when they have nothing to do, which can be good as they don't have to
be started on the next load spike again.

Setting this to a lower value can be useful to avoid memory or simply
process table wastage.

Usually, setting this to a time longer than the time between load spikes
is best - if you expect a lot of requests every minute and little work
in between, setting this to longer than a minute avoids having to stop
and start workers. On the other hand, you have to ask yourself if letting
workers run idle is a good use of your resources. Try to find a good
balance between resource usage of your workers and the time to start new
workers - the processes created by L<AnyEvent::Fork> itself are fast to
create and don't use much memory, so most of the overhead is likely from
your own code.

=item on_destroy => $callback->() (default: none)

When a pool object goes out of scope, the outstanding requests are still
handled till completion. Only after handling all jobs will the workers
be destroyed (and also the template process if it isn't referenced
otherwise).

To find out when a pool I<really> has finished its work, you can set this
callback, which will be called when the pool has been destroyed.

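A minimal sketch of waiting for a pool to drain - the condition variable
is just one way to be notified, any callback will do, and the worker
function name is illustrative:

   my $finish = AE::cv;

   my $pool = $template->AnyEvent::Fork::Pool::run (
      "MyWorker::run",
      on_destroy => $finish,
   );

   # ... submit jobs via $pool->(...) ...

   undef $pool;   # no new jobs can be submitted
   $finish->recv; # returns after the last outstanding job was handled
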
=back

=item AnyEvent::Fork::RPC Parameters

These parameters are all passed more or less directly to
L<AnyEvent::Fork::RPC>. They are only briefly mentioned here, for
their full documentation please refer to the L<AnyEvent::Fork::RPC>
documentation. Also, the default values mentioned here are only documented
as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.

=over 4

=item async => $boolean (default: 0)

Whether to use the synchronous or asynchronous RPC backend.

=item on_error => $callback->($message) (default: die with message)

The callback to call on any (fatal) errors.

=item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)

The callback to invoke on events.

=item init => $initfunction (default: none)

The function to call in the child, once before handling requests.

=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)

The serialiser to use.

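If you want to pass references (arrays, hashes) to and from your workers,
you will usually want a structured serialiser instead of the default
string serialiser - a sketch, assuming the JSON serialiser documented in
L<AnyEvent::Fork::RPC> is available on your system (the worker function
name is illustrative):

   my $pool = $template->AnyEvent::Fork::Pool::run (
      "MyWorker::run",
      serialiser => $AnyEvent::Fork::RPC::JSON_SERIALISER,
   );
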
=back

=back

=cut

sub run {
   my ($template, $function, %arg) = @_;

   my $max        = $arg{max}   || 4;
   my $idle       = $arg{idle}  || 0,
   my $load       = $arg{load}  || 2,
   my $start      = $arg{start} || 0.1,
   my $stop       = $arg{stop}  || 10,
   my $on_event   = $arg{on_event} || sub { },
   my $on_destroy = $arg{on_destroy};

   my @rpc = (
      async      => $arg{async},
      init       => $arg{init},
      serialiser => delete $arg{serialiser},
      on_error   => $arg{on_error},
   );

   my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
   my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

   my $destroy_guard = Guard::guard {
      $on_destroy->()
         if $on_destroy;
   };

   $template
      ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
      ->eval ('
            my ($magic0, $magic1) = @_;
            sub AnyEvent::Fork::Pool::retire() {
               AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
            }
         ', $magic0, $magic1)
   ;

   $start_worker = sub {
      my $proc = [0, 0, undef]; # load, index, rpc

      $proc->[2] = $template
         ->fork
         ->AnyEvent::Fork::RPC::run ($function,
              @rpc,
              on_event => sub {
                 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
                    $destroy_guard if 0; # keep it alive

                    $_[1] eq "quit" and $stop_worker->($proc);
                    return;
                 }

                 &$on_event;
              },
           )
      ;

      ++$nidle;
      Array::Heap::push_heap_idx @pool, $proc;

      Scalar::Util::weaken $proc;
   };

   $stop_worker = sub {
      my $proc = shift;

      $proc->[0]
         or --$nidle;

      Array::Heap::splice_heap_idx @pool, $proc->[1]
         if defined $proc->[1];

      @$proc = 0; # tell others to leave it be
   };

   $want_start = sub {
      undef $stop_w;

      $start_w ||= AE::timer $start, $start, sub {
         if (($nidle < $idle || @queue) && @pool < $max) {
            $start_worker->();
            $scheduler->();
         } else {
            undef $start_w;
         }
      };
   };

   $want_stop = sub {
      $stop_w ||= AE::timer $stop, $stop, sub {
         $stop_worker->($pool[0])
            if $nidle;

         undef $stop_w
            if $nidle <= $idle;
      };
   };

   $scheduler = sub {
      if (@queue) {
         while (@queue) {
            @pool or $start_worker->();

            my $proc = $pool[0];

            if ($proc->[0] < $load) {
               # found free worker, increase load
               unless ($proc->[0]++) {
                  # worker became busy
                  --$nidle
                     or undef $stop_w;

                  $want_start->()
                     if $nidle < $idle && @pool < $max;
               }

               Array::Heap::adjust_heap_idx @pool, 0;

               my $job = shift @queue;
               my $ocb = pop @$job;

               $proc->[2]->(@$job, sub {
                  # reduce load
                  --$proc->[0] # worker still busy?
                     or ++$nidle > $idle # not too many idle processes?
                     or $want_stop->();

                  Array::Heap::adjust_heap_idx @pool, $proc->[1]
                     if defined $proc->[1];

                  &$ocb;

                  $scheduler->();
               });
            } else {
               $want_start->()
                  unless @pool >= $max;

               last;
            }
         }
      } elsif ($shutdown) {
         undef $_->[2]
            for @pool;

         undef $start_w;
         undef $start_worker; # frees $destroy_guard reference

         $stop_worker->($pool[0])
            while $nidle;
      }
   };

   my $shutdown_guard = Guard::guard {
      $shutdown = 1;
      $scheduler->();
   };

   $start_worker->()
      while @pool < $idle;

   sub {
      $shutdown_guard if 0; # keep it alive

      $start_worker->()
         unless @pool;

      push @queue, [@_];
      $scheduler->();
   }
}

=item $pool->(..., $cb->(...))

Call the RPC function of a worker with the given arguments, and when the
worker is done, call the C<$cb> with the results, just like calling the
RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
details on the RPC API.

If there is no free worker, the call will be queued until a worker becomes
available.

Note that there can be considerable time between calling this method and
the call actually being executed. During this time, the parameters passed
to this function are effectively read-only - modifying them after the call
and before the callback is invoked causes undefined behaviour.

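For example, a sketch that submits a whole batch of jobs and waits until
every callback has fired, using an L<AnyEvent> condition variable as a
counter (the job arguments are illustrative):

   my $cv = AE::cv;

   for my $id (1 .. 100) {
      $cv->begin;
      $pool->($id, sub {
         print "job $id returned @_\n";
         $cv->end;
      });
   }

   $cv->recv; # all 100 jobs have been handled
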
=cut

=item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]

=item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]

Tries to detect the number of CPUs (C<$cpus>, often called CPU cores
nowadays) and execution units (C<$eus>, which include e.g. extra
hyperthreaded units). When C<$cpus> cannot be determined reliably,
C<$default_cpus> is returned for both values, or C<1> if it is missing.

For normal CPU bound uses, it is wise to have as many worker processes
as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
hyperthreading is usually detrimental to performance, but in those rare
cases where that really helps it might be beneficial to use more workers
(C<$eus>).

Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, F<sysctl -n hw.ncpu> is
used for C<$cpus>.

Example: create a worker pool with as many workers as CPU cores, or C<2>,
if the actual number could not be determined.

   $fork->AnyEvent::Fork::Pool::run ("myworker::function",
      max => (scalar AnyEvent::Fork::Pool::ncpu 2),
   );

=cut

BEGIN {
   if ($^O eq "linux") {
      *ncpu = sub(;$) {
         my ($cpus, $eus);

         if (open my $fh, "<", "/proc/cpuinfo") {
            my %id;

            while (<$fh>) {
               if (/^core id\s*:\s*(\d+)/) {
                  ++$eus;
                  undef $id{$1};
               }
            }

            $cpus = scalar keys %id;
         } else {
            $cpus = $eus = @_ ? shift : 1;
         }
         wantarray ? ($cpus, $eus) : $cpus
      };
   } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
      *ncpu = sub(;$) {
         my $cpus = qx<sysctl -n hw.ncpu> * 1
                    || (@_ ? shift : 1);
         wantarray ? ($cpus, $cpus) : $cpus
      };
   } else {
      *ncpu = sub(;$) {
         my $cpus = @_ ? shift : 1;
         wantarray ? ($cpus, $cpus) : $cpus
      };
   }
}

=back

=head1 CHILD USAGE

In addition to the L<AnyEvent::Fork::RPC> API, this module implements one
more child-side function:

=over 4

=item AnyEvent::Fork::Pool::retire ()

This function sends an event to the parent process to request retirement:
the worker is removed from the pool and no new jobs will be sent to it,
but it still has to handle the jobs that are already queued.

The parentheses are part of the syntax: the function usually isn't defined
when you compile your code (because that happens I<before> handing the
template process over to C<AnyEvent::Fork::Pool>), so you need the
empty parentheses to tell Perl that the function is indeed a function.

Retiring a worker can be useful to gracefully shut it down when the worker
deems this useful. For example, after executing a job, it could check the
process size or the number of jobs handled so far, and if either is too
high, the worker could request to be retired, to keep memory leaks from
accumulating.

Example: retire a worker after it has handled roughly 100 requests. It
doesn't matter whether you retire at the beginning or end of your request,
as the worker will continue to handle some outstanding requests. Likewise,
it's ok to call retire multiple times.

   my $count = 0;

   sub my::worker {

      ++$count == 100
         and AnyEvent::Fork::Pool::retire ();

      ... normal code goes here
   }

=back

=head1 POOL PARAMETERS RECIPES

This section describes some recipes for pool parameters. These are mostly
meant for the synchronous RPC backend, as the asynchronous RPC backend
changes the rules considerably, making workers themselves responsible for
their scheduling.

=over 4

=item low latency - set load = 1

If you need a deterministic low latency, you should set the C<load>
parameter to C<1>. This ensures that never more than one job is sent to
each worker. This avoids having to wait for a previous job to finish.

This makes most sense with the synchronous (default) backend, as the
asynchronous backend can handle multiple requests concurrently.

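A sketch of such a configuration (the worker function name is
illustrative):

   my $pool = $template->AnyEvent::Fork::Pool::run (
      "MyWorker::run",
      load => 1, # never send more than one job to a worker at a time
   );
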
=item lowest latency - set load = 1 and idle = max

To achieve the lowest latency, you additionally should disable any dynamic
resizing of the pool by setting C<idle> to the same value as C<max>.

=item high throughput, cpu bound jobs - set load >= 2, max = #cpus

To get high throughput with cpu-bound jobs, you should set the maximum
pool size to the number of cpus in your system, and C<load> to at least
C<2>, to make sure there can be another job waiting for the worker when it
has finished one.

The value of C<2> for C<load> is the minimum value that I<can> achieve
100% throughput, but if your parent process itself is sometimes busy, you
might need higher values. Also there is a limit on the amount of data that
can be "in flight" to the worker, so if you send big blobs of data to your
worker, C<load> might have much less of an effect.

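A sketch combining this recipe with C<AnyEvent::Fork::Pool::ncpu>,
described above (the worker function name is illustrative):

   my $pool = $template->AnyEvent::Fork::Pool::run (
      "MyWorker::crunch",
      max  => (scalar AnyEvent::Fork::Pool::ncpu 4), # one worker per CPU, 4 if undetectable
      load => 2,                                     # keep one extra job queued per worker
   );
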
=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high

When your jobs are I/O bound, using more workers usually results in
higher throughput, but this depends very much on your actual workload -
sometimes having only one worker is best, for example, when you read or
write big files at maximum speed, as a second worker will increase seek
times.

=back

=head1 EXCEPTIONS

The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions
will not be caught, and exceptions in both the worker and in callbacks
cause undesirable or undefined behaviour.

=head1 SEE ALSO

L<AnyEvent::Fork>, to create the processes in the first place.

L<AnyEvent::Fork::Remote>, likewise, but helpful for remote processes.

L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.

=head1 AUTHOR AND CONTACT INFORMATION

   Marc Lehmann <schmorp@schmorp.de>
   http://software.schmorp.de/pkg/AnyEvent-Fork-Pool

=cut

1
