line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
=head1 NAME |
2
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork |
4
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
THE API IS NOT FINISHED, CONSIDER THIS AN ALPHA RELEASE |
6
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
=head1 SYNOPSIS |
8
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
use AnyEvent; |
10
|
|
|
|
|
|
|
use AnyEvent::Fork::Pool; |
11
|
|
|
|
|
|
|
# use AnyEvent::Fork is not needed |
12
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
# all possible parameters shown, with default values |
14
|
|
|
|
|
|
|
my $pool = AnyEvent::Fork |
15
|
|
|
|
|
|
|
->new |
16
|
|
|
|
|
|
|
->require ("MyWorker") |
17
|
|
|
|
|
|
|
->AnyEvent::Fork::Pool::run ( |
18
|
|
|
|
|
|
|
"MyWorker::run", # the worker function |
19
|
|
|
|
|
|
|
|
20
|
|
|
|
|
|
|
# pool management |
21
|
|
|
|
|
|
|
max => 4, # absolute maximum # of processes |
22
|
|
|
|
|
|
|
idle => 0, # minimum # of idle processes |
23
|
|
|
|
|
|
|
load => 2, # queue at most this number of jobs per process |
24
|
|
|
|
|
|
|
start => 0.1, # wait this many seconds before starting a new process |
25
|
|
|
|
|
|
|
stop => 10, # wait this many seconds before stopping an idle process |
26
|
|
|
|
|
|
|
on_destroy => (my $finish = AE::cv), # called when object is destroyed |
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
# parameters passed to AnyEvent::Fork::RPC |
29
|
|
|
|
|
|
|
async => 0, |
30
|
|
|
|
|
|
|
on_error => sub { die "FATAL: $_[0]\n" }, |
31
|
|
|
|
|
|
|
on_event => sub { my @ev = @_ }, |
32
|
|
|
|
|
|
|
init => "MyWorker::init", |
33
|
|
|
|
|
|
|
serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER, |
34
|
|
|
|
|
|
|
); |
35
|
|
|
|
|
|
|
|
36
|
|
|
|
|
|
|
for (1..10) { |
37
|
|
|
|
|
|
|
$pool->(doit => $_, sub { |
38
|
|
|
|
|
|
|
print "MyWorker::run returned @_\n"; |
39
|
|
|
|
|
|
|
}); |
40
|
|
|
|
|
|
|
} |
41
|
|
|
|
|
|
|
|
42
|
|
|
|
|
|
|
undef $pool; |
43
|
|
|
|
|
|
|
|
44
|
|
|
|
|
|
|
$finish->recv; |
45
|
|
|
|
|
|
|
|
46
|
|
|
|
|
|
|
=head1 DESCRIPTION |
47
|
|
|
|
|
|
|
|
48
|
|
|
|
|
|
|
This module uses processes created via L and the RPC |
49
|
|
|
|
|
|
|
protocol implemented in L to create a load-balanced |
50
|
|
|
|
|
|
|
pool of processes that handles jobs. |
51
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
Understanding of L is helpful but not critical to be able |
53
|
|
|
|
|
|
|
to use this module, but a thorough understanding of L |
54
|
|
|
|
|
|
|
is, as it defines the actual API that needs to be implemented in the |
55
|
|
|
|
|
|
|
worker processes. |
56
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
=head1 EXAMPLES |
58
|
|
|
|
|
|
|
|
59
|
|
|
|
|
|
|
=head1 PARENT USAGE |
60
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
To create a pool, you first have to create a L object - |
62
|
|
|
|
|
|
|
this object becomes your template process. Whenever a new worker process |
63
|
|
|
|
|
|
|
is needed, it is forked from this template process. Then you need to |
64
|
|
|
|
|
|
|
"hand off" this template process to the C module by |
65
|
|
|
|
|
|
|
calling its run method on it: |
66
|
|
|
|
|
|
|
|
67
|
|
|
|
|
|
|
my $template = AnyEvent::Fork |
68
|
|
|
|
|
|
|
->new |
69
|
|
|
|
|
|
|
->require ("SomeModule", "MyWorkerModule"); |
70
|
|
|
|
|
|
|
|
71
|
|
|
|
|
|
|
my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction"); |
72
|
|
|
|
|
|
|
|
73
|
|
|
|
|
|
|
The pool "object" is not a regular Perl object, but a code reference that |
74
|
|
|
|
|
|
|
you can call and that works roughly like calling the worker function |
75
|
|
|
|
|
|
|
directly, except that it returns nothing but instead you need to specify a |
76
|
|
|
|
|
|
|
callback to be invoked once results are in: |
77
|
|
|
|
|
|
|
|
78
|
|
|
|
|
|
|
$pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" }); |
79
|
|
|
|
|
|
|
|
80
|
|
|
|
|
|
|
=over 4 |
81
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
=cut |
83
|
|
|
|
|
|
|
|
84
|
|
|
|
|
|
|
package AnyEvent::Fork::Pool; |
85
|
|
|
|
|
|
|
|
86
|
1
|
|
|
1
|
|
1382
|
use common::sense; |
|
1
|
|
|
|
|
7
|
|
|
1
|
|
|
|
|
4
|
|
87
|
|
|
|
|
|
|
|
88
|
1
|
|
|
1
|
|
44
|
use Scalar::Util (); |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
12
|
|
89
|
|
|
|
|
|
|
|
90
|
1
|
|
|
1
|
|
727
|
use Guard (); |
|
1
|
|
|
|
|
507
|
|
|
1
|
|
|
|
|
18
|
|
91
|
1
|
|
|
1
|
|
1977
|
use Array::Heap (); |
|
1
|
|
|
|
|
630
|
|
|
1
|
|
|
|
|
20
|
|
92
|
|
|
|
|
|
|
|
93
|
1
|
|
|
1
|
|
1649
|
use AnyEvent; |
|
1
|
|
|
|
|
5309
|
|
|
1
|
|
|
|
|
46
|
|
94
|
|
|
|
|
|
|
# explicit version on next line, as some cpan-testers test with the 0.1 version, |
95
|
|
|
|
|
|
|
# ignoring dependencies, and this line will at least give a clear indication of that. |
96
|
1
|
|
|
1
|
|
895
|
use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience |
|
1
|
|
|
|
|
15487
|
|
|
1
|
|
|
|
|
34
|
|
97
|
1
|
|
|
1
|
|
1043
|
use AnyEvent::Fork::RPC; |
|
1
|
|
|
|
|
1367
|
|
|
1
|
|
|
|
|
1594
|
|
98
|
|
|
|
|
|
|
|
99
|
|
|
|
|
|
|
# these are used for the first and last argument of events |
100
|
|
|
|
|
|
|
# in the hope of not colliding. yes, I don't like it either, |
101
|
|
|
|
|
|
|
# but didn't come up with an obviously better alternative. |
102
|
|
|
|
|
|
|
my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P'; |
103
|
|
|
|
|
|
|
my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*'; |
104
|
|
|
|
|
|
|
|
105
|
|
|
|
|
|
|
our $VERSION = 1.1; |
106
|
|
|
|
|
|
|
|
107
|
|
|
|
|
|
|
=item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...] |
108
|
|
|
|
|
|
|
|
109
|
|
|
|
|
|
|
The traditional way to call the pool creation function. But it is way |
110
|
|
|
|
|
|
|
cooler to call it in the following way: |
111
|
|
|
|
|
|
|
|
112
|
|
|
|
|
|
|
=item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...]) |
113
|
|
|
|
|
|
|
|
114
|
|
|
|
|
|
|
Creates a new pool object with the specified C<$function> as function |
115
|
|
|
|
|
|
|
(name) to call for each request. The pool uses the C<$fork> object as the |
116
|
|
|
|
|
|
|
template when creating worker processes. |
117
|
|
|
|
|
|
|
|
118
|
|
|
|
|
|
|
You can supply your own template process, or tell C |
119
|
|
|
|
|
|
|
to create one. |
120
|
|
|
|
|
|
|
|
121
|
|
|
|
|
|
|
A relatively large number of key/value pairs can be specified to influence |
122
|
|
|
|
|
|
|
the behaviour. They are grouped into the categories "pool management", |
123
|
|
|
|
|
|
|
"template process" and "rpc parameters". |
124
|
|
|
|
|
|
|
|
125
|
|
|
|
|
|
|
=over 4 |
126
|
|
|
|
|
|
|
|
127
|
|
|
|
|
|
|
=item Pool Management |
128
|
|
|
|
|
|
|
|
129
|
|
|
|
|
|
|
The pool consists of a certain number of worker processes. These options |
130
|
|
|
|
|
|
|
decide how many of these processes exist and when they are started and |
131
|
|
|
|
|
|
|
stopped. |
132
|
|
|
|
|
|
|
|
133
|
|
|
|
|
|
|
The worker pool is dynamically resized, according to (perceived :) |
134
|
|
|
|
|
|
|
load. The minimum size is given by the C parameter and the maximum |
135
|
|
|
|
|
|
|
size is given by the C parameter. A new worker is started every |
136
|
|
|
|
|
|
|
C seconds at most, and an idle worker is stopped at most every |
137
|
|
|
|
|
|
|
C second. |
138
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
You can specify the amount of jobs sent to a worker concurrently using the |
140
|
|
|
|
|
|
|
C parameter. |
141
|
|
|
|
|
|
|
|
142
|
|
|
|
|
|
|
=over 4 |
143
|
|
|
|
|
|
|
|
144
|
|
|
|
|
|
|
=item idle => $count (default: 0) |
145
|
|
|
|
|
|
|
|
146
|
|
|
|
|
|
|
The minimum amount of idle processes in the pool - when there are fewer |
147
|
|
|
|
|
|
|
than this many idle workers, C will try to start new |
148
|
|
|
|
|
|
|
ones, subject to the limits set by C and C. |
149
|
|
|
|
|
|
|
|
150
|
|
|
|
|
|
|
This is also the initial amount of workers in the pool. The default of |
151
|
|
|
|
|
|
|
zero means that the pool starts empty and can shrink back to zero workers |
152
|
|
|
|
|
|
|
over time. |
153
|
|
|
|
|
|
|
|
154
|
|
|
|
|
|
|
=item max => $count (default: 4) |
155
|
|
|
|
|
|
|
|
156
|
|
|
|
|
|
|
The maximum number of processes in the pool, in addition to the template |
157
|
|
|
|
|
|
|
process. C will never have more than this number of |
158
|
|
|
|
|
|
|
worker processes, although there can be more temporarily when a worker is |
159
|
|
|
|
|
|
|
shut down and hasn't exited yet. |
160
|
|
|
|
|
|
|
|
161
|
|
|
|
|
|
|
=item load => $count (default: 2) |
162
|
|
|
|
|
|
|
|
163
|
|
|
|
|
|
|
The maximum number of concurrent jobs sent to a single worker process. |
164
|
|
|
|
|
|
|
|
165
|
|
|
|
|
|
|
Jobs that cannot be sent to a worker immediately (because all workers are |
166
|
|
|
|
|
|
|
busy) will be queued until a worker is available. |
167
|
|
|
|
|
|
|
|
168
|
|
|
|
|
|
|
Setting this low improves latency. For example, at C<1>, every job that |
169
|
|
|
|
|
|
|
is sent to a worker is sent to a completely idle worker that doesn't run |
170
|
|
|
|
|
|
|
any other jobs. The downside is that throughput is reduced - a worker that |
171
|
|
|
|
|
|
|
finishes a job needs to wait for a new job from the parent. |
172
|
|
|
|
|
|
|
|
173
|
|
|
|
|
|
|
The default of C<2> is usually a good compromise. |
174
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
=item start => $seconds (default: 0.1) |
176
|
|
|
|
|
|
|
|
177
|
|
|
|
|
|
|
When there are fewer than C workers (or all workers are completely |
178
|
|
|
|
|
|
|
busy), then a timer is started. If the timer elapses and there are still |
179
|
|
|
|
|
|
|
jobs that cannot be queued to a worker, a new worker is started. |
180
|
|
|
|
|
|
|
|
181
|
|
|
|
|
|
|
This sets the minimum time that all workers must be busy before a new |
182
|
|
|
|
|
|
|
worker is started. Or, put differently, the minimum delay between starting |
183
|
|
|
|
|
|
|
new workers. |
184
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
The delay is small by default, which means new workers will be started |
186
|
|
|
|
|
|
|
relatively quickly. A delay of C<0> is possible, and ensures that the pool |
187
|
|
|
|
|
|
|
will grow as quickly as possible under load. |
188
|
|
|
|
|
|
|
|
189
|
|
|
|
|
|
|
Non-zero values are useful to avoid "exploding" a pool because a lot of |
190
|
|
|
|
|
|
|
jobs are queued in an instant. |
191
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
Higher values are often useful to improve efficiency at the cost of |
193
|
|
|
|
|
|
|
latency - when fewer processes can do the job over time, starting more and |
194
|
|
|
|
|
|
|
more is not necessarily going to help. |
195
|
|
|
|
|
|
|
|
196
|
|
|
|
|
|
|
=item stop => $seconds (default: 10) |
197
|
|
|
|
|
|
|
|
198
|
|
|
|
|
|
|
When a worker has no jobs to execute it becomes idle. An idle worker that |
199
|
|
|
|
|
|
|
hasn't executed a job within this amount of time will be stopped, unless |
200
|
|
|
|
|
|
|
the other parameters say otherwise. |
201
|
|
|
|
|
|
|
|
202
|
|
|
|
|
|
|
Setting this to a very high value means that workers stay around longer, |
203
|
|
|
|
|
|
|
even when they have nothing to do, which can be good as they don't have to |
204
|
|
|
|
|
|
|
be started on the next load spike again. |
205
|
|
|
|
|
|
|
|
206
|
|
|
|
|
|
|
Setting this to a lower value can be useful to avoid memory or simply |
207
|
|
|
|
|
|
|
process table wastage. |
208
|
|
|
|
|
|
|
|
209
|
|
|
|
|
|
|
Usually, setting this to a time longer than the time between load spikes |
210
|
|
|
|
|
|
|
is best - if you expect a lot of requests every minute and little work |
211
|
|
|
|
|
|
|
in between, setting this to longer than a minute avoids having to stop |
212
|
|
|
|
|
|
|
and start workers. On the other hand, you have to ask yourself if letting |
213
|
|
|
|
|
|
|
workers run idle is a good use of your resources. Try to find a good |
214
|
|
|
|
|
|
|
balance between resource usage of your workers and the time to start new |
215
|
|
|
|
|
|
|
workers - the processes created by L itself is fast at |
216
|
|
|
|
|
|
|
creating workers while not using much memory for them, so most of the |
217
|
|
|
|
|
|
|
overhead is likely from your own code. |
218
|
|
|
|
|
|
|
|
219
|
|
|
|
|
|
|
=item on_destroy => $callback->() (default: none) |
220
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
When a pool object goes out of scope, the outstanding requests are still |
222
|
|
|
|
|
|
|
handled till completion. Only after handling all jobs will the workers |
223
|
|
|
|
|
|
|
be destroyed (and also the template process if it isn't referenced |
224
|
|
|
|
|
|
|
otherwise). |
225
|
|
|
|
|
|
|
|
226
|
|
|
|
|
|
|
To find out when a pool really has finished its work, you can set this |
227
|
|
|
|
|
|
|
callback, which will be called when the pool has been destroyed. |
228
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
=back |
230
|
|
|
|
|
|
|
|
231
|
|
|
|
|
|
|
=item AnyEvent::Fork::RPC Parameters |
232
|
|
|
|
|
|
|
|
233
|
|
|
|
|
|
|
These parameters are all passed more or less directly to |
234
|
|
|
|
|
|
|
L. They are only briefly mentioned here, for |
235
|
|
|
|
|
|
|
their full documentation please refer to the L |
236
|
|
|
|
|
|
|
documentation. Also, the default values mentioned here are only documented |
237
|
|
|
|
|
|
|
as a best effort - the L documentation is binding. |
238
|
|
|
|
|
|
|
|
239
|
|
|
|
|
|
|
=over 4 |
240
|
|
|
|
|
|
|
|
241
|
|
|
|
|
|
|
=item async => $boolean (default: 0) |
242
|
|
|
|
|
|
|
|
243
|
|
|
|
|
|
|
Whether to use the synchronous or asynchronous RPC backend. |
244
|
|
|
|
|
|
|
|
245
|
|
|
|
|
|
|
=item on_error => $callback->($message) (default: die with message) |
246
|
|
|
|
|
|
|
|
247
|
|
|
|
|
|
|
The callback to call on any (fatal) errors. |
248
|
|
|
|
|
|
|
|
249
|
|
|
|
|
|
|
=item on_event => $callback->(...) (default: C, unlike L) |
250
|
|
|
|
|
|
|
|
251
|
|
|
|
|
|
|
The callback to invoke on events. |
252
|
|
|
|
|
|
|
|
253
|
|
|
|
|
|
|
=item init => $initfunction (default: none) |
254
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
The function to call in the child, once before handling requests. |
256
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
=item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER) |
258
|
|
|
|
|
|
|
|
259
|
|
|
|
|
|
|
The serialiser to use. |
260
|
|
|
|
|
|
|
|
261
|
|
|
|
|
|
|
=back |
262
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
=back |
264
|
|
|
|
|
|
|
|
265
|
|
|
|
|
|
|
=cut |
266
|
|
|
|
|
|
|
|
267
|
|
|
|
|
|
|
sub run { |
268
|
0
|
|
|
0
|
1
|
|
my ($template, $function, %arg) = @_; |
269
|
|
|
|
|
|
|
|
270
|
0
|
|
0
|
|
|
|
my $max = $arg{max} || 4; |
271
|
|
|
|
|
|
|
my $idle = $arg{idle} || 0, |
272
|
|
|
|
|
|
|
my $load = $arg{load} || 2, |
273
|
|
|
|
|
|
|
my $start = $arg{start} || 0.1, |
274
|
|
|
|
|
|
|
my $stop = $arg{stop} || 10, |
275
|
0
|
|
|
0
|
|
|
my $on_event = $arg{on_event} || sub { }, |
276
|
0
|
|
0
|
|
|
|
my $on_destroy = $arg{on_destroy}; |
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
277
|
|
|
|
|
|
|
|
278
|
0
|
|
|
|
|
|
my @rpc = ( |
279
|
|
|
|
|
|
|
async => $arg{async}, |
280
|
|
|
|
|
|
|
init => $arg{init}, |
281
|
|
|
|
|
|
|
serialiser => delete $arg{serialiser}, |
282
|
|
|
|
|
|
|
on_error => $arg{on_error}, |
283
|
|
|
|
|
|
|
); |
284
|
|
|
|
|
|
|
|
285
|
0
|
|
|
|
|
|
my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown); |
286
|
0
|
|
|
|
|
|
my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler); |
287
|
|
|
|
|
|
|
|
288
|
|
|
|
|
|
|
my $destroy_guard = Guard::guard { |
289
|
0
|
0
|
|
0
|
|
|
$on_destroy->() |
290
|
|
|
|
|
|
|
if $on_destroy; |
291
|
0
|
|
|
|
|
|
}; |
292
|
|
|
|
|
|
|
|
293
|
0
|
0
|
|
|
|
|
$template |
294
|
|
|
|
|
|
|
->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync")) |
295
|
|
|
|
|
|
|
->eval (' |
296
|
|
|
|
|
|
|
my ($magic0, $magic1) = @_; |
297
|
|
|
|
|
|
|
sub AnyEvent::Fork::Pool::retire() { |
298
|
|
|
|
|
|
|
AnyEvent::Fork::RPC::event $magic0, "quit", $magic1; |
299
|
|
|
|
|
|
|
} |
300
|
|
|
|
|
|
|
', $magic0, $magic1) |
301
|
|
|
|
|
|
|
; |
302
|
|
|
|
|
|
|
|
303
|
|
|
|
|
|
|
$start_worker = sub { |
304
|
0
|
|
|
0
|
|
|
my $proc = [0, 0, undef]; # load, index, rpc |
305
|
|
|
|
|
|
|
|
306
|
|
|
|
|
|
|
$proc->[2] = $template |
307
|
|
|
|
|
|
|
->fork |
308
|
|
|
|
|
|
|
->AnyEvent::Fork::RPC::run ($function, |
309
|
|
|
|
|
|
|
@rpc, |
310
|
|
|
|
|
|
|
on_event => sub { |
311
|
0
|
0
|
0
|
|
|
|
if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) { |
|
|
|
0
|
|
|
|
|
312
|
0
|
|
|
|
|
|
$destroy_guard if 0; # keep it alive |
313
|
|
|
|
|
|
|
|
314
|
0
|
0
|
|
|
|
|
$_[1] eq "quit" and $stop_worker->($proc); |
315
|
0
|
|
|
|
|
|
return; |
316
|
|
|
|
|
|
|
} |
317
|
|
|
|
|
|
|
|
318
|
0
|
|
|
|
|
|
&$on_event; |
319
|
|
|
|
|
|
|
}, |
320
|
|
|
|
|
|
|
) |
321
|
0
|
|
|
|
|
|
; |
322
|
|
|
|
|
|
|
|
323
|
0
|
|
|
|
|
|
++$nidle; |
324
|
0
|
|
|
|
|
|
Array::Heap::push_heap_idx @pool, $proc; |
325
|
|
|
|
|
|
|
|
326
|
0
|
|
|
|
|
|
Scalar::Util::weaken $proc; |
327
|
0
|
|
|
|
|
|
}; |
328
|
|
|
|
|
|
|
|
329
|
|
|
|
|
|
|
$stop_worker = sub { |
330
|
0
|
|
|
0
|
|
|
my $proc = shift; |
331
|
|
|
|
|
|
|
|
332
|
0
|
0
|
|
|
|
|
$proc->[0] |
333
|
|
|
|
|
|
|
or --$nidle; |
334
|
|
|
|
|
|
|
|
335
|
0
|
0
|
|
|
|
|
Array::Heap::splice_heap_idx @pool, $proc->[1] |
336
|
|
|
|
|
|
|
if defined $proc->[1]; |
337
|
|
|
|
|
|
|
|
338
|
0
|
|
|
|
|
|
@$proc = 0; # tell others to leave it be |
339
|
0
|
|
|
|
|
|
}; |
340
|
|
|
|
|
|
|
|
341
|
|
|
|
|
|
|
$want_start = sub { |
342
|
0
|
|
|
0
|
|
|
undef $stop_w; |
343
|
|
|
|
|
|
|
|
344
|
|
|
|
|
|
|
$start_w ||= AE::timer $start, $start, sub { |
345
|
0
|
0
|
0
|
|
|
|
if (($nidle < $idle || @queue) && @pool < $max) { |
|
|
|
0
|
|
|
|
|
346
|
0
|
|
|
|
|
|
$start_worker->(); |
347
|
0
|
|
|
|
|
|
$scheduler->(); |
348
|
|
|
|
|
|
|
} else { |
349
|
0
|
|
|
|
|
|
undef $start_w; |
350
|
|
|
|
|
|
|
} |
351
|
0
|
|
0
|
|
|
|
}; |
352
|
0
|
|
|
|
|
|
}; |
353
|
|
|
|
|
|
|
|
354
|
|
|
|
|
|
|
$want_stop = sub { |
355
|
|
|
|
|
|
|
$stop_w ||= AE::timer $stop, $stop, sub { |
356
|
0
|
0
|
|
|
|
|
$stop_worker->($pool[0]) |
357
|
|
|
|
|
|
|
if $nidle; |
358
|
|
|
|
|
|
|
|
359
|
0
|
0
|
|
|
|
|
undef $stop_w |
360
|
|
|
|
|
|
|
if $nidle <= $idle; |
361
|
0
|
|
0
|
0
|
|
|
}; |
362
|
0
|
|
|
|
|
|
}; |
363
|
|
|
|
|
|
|
|
364
|
|
|
|
|
|
|
$scheduler = sub { |
365
|
0
|
0
|
|
0
|
|
|
if (@queue) { |
|
|
0
|
|
|
|
|
|
366
|
0
|
|
|
|
|
|
while (@queue) { |
367
|
0
|
0
|
|
|
|
|
@pool or $start_worker->(); |
368
|
|
|
|
|
|
|
|
369
|
0
|
|
|
|
|
|
my $proc = $pool[0]; |
370
|
|
|
|
|
|
|
|
371
|
0
|
0
|
|
|
|
|
if ($proc->[0] < $load) { |
372
|
|
|
|
|
|
|
# found free worker, increase load |
373
|
0
|
0
|
|
|
|
|
unless ($proc->[0]++) { |
374
|
|
|
|
|
|
|
# worker became busy |
375
|
0
|
0
|
|
|
|
|
--$nidle |
376
|
|
|
|
|
|
|
or undef $stop_w; |
377
|
|
|
|
|
|
|
|
378
|
0
|
0
|
0
|
|
|
|
$want_start->() |
379
|
|
|
|
|
|
|
if $nidle < $idle && @pool < $max; |
380
|
|
|
|
|
|
|
} |
381
|
|
|
|
|
|
|
|
382
|
0
|
|
|
|
|
|
Array::Heap::adjust_heap_idx @pool, 0; |
383
|
|
|
|
|
|
|
|
384
|
0
|
|
|
|
|
|
my $job = shift @queue; |
385
|
0
|
|
|
|
|
|
my $ocb = pop @$job; |
386
|
|
|
|
|
|
|
|
387
|
|
|
|
|
|
|
$proc->[2]->(@$job, sub { |
388
|
|
|
|
|
|
|
# reduce load |
389
|
0
|
0
|
0
|
|
|
|
--$proc->[0] # worker still busy? |
390
|
|
|
|
|
|
|
or ++$nidle > $idle # not too many idle processes? |
391
|
|
|
|
|
|
|
or $want_stop->(); |
392
|
|
|
|
|
|
|
|
393
|
0
|
0
|
|
|
|
|
Array::Heap::adjust_heap_idx @pool, $proc->[1] |
394
|
|
|
|
|
|
|
if defined $proc->[1]; |
395
|
|
|
|
|
|
|
|
396
|
0
|
|
|
|
|
|
&$ocb; |
397
|
|
|
|
|
|
|
|
398
|
0
|
|
|
|
|
|
$scheduler->(); |
399
|
0
|
|
|
|
|
|
}); |
400
|
|
|
|
|
|
|
} else { |
401
|
0
|
0
|
|
|
|
|
$want_start->() |
402
|
|
|
|
|
|
|
unless @pool >= $max; |
403
|
|
|
|
|
|
|
|
404
|
0
|
|
|
|
|
|
last; |
405
|
|
|
|
|
|
|
} |
406
|
|
|
|
|
|
|
} |
407
|
|
|
|
|
|
|
} elsif ($shutdown) { |
408
|
0
|
|
|
|
|
|
@pool = (); |
409
|
0
|
|
|
|
|
|
undef $start_w; |
410
|
0
|
|
|
|
|
|
undef $start_worker; # frees $destroy_guard reference |
411
|
|
|
|
|
|
|
|
412
|
0
|
|
|
|
|
|
$stop_worker->($pool[0]) |
413
|
|
|
|
|
|
|
while $nidle; |
414
|
|
|
|
|
|
|
} |
415
|
0
|
|
|
|
|
|
}; |
416
|
|
|
|
|
|
|
|
417
|
|
|
|
|
|
|
my $shutdown_guard = Guard::guard { |
418
|
0
|
|
|
0
|
|
|
$shutdown = 1; |
419
|
0
|
|
|
|
|
|
$scheduler->(); |
420
|
0
|
|
|
|
|
|
}; |
421
|
|
|
|
|
|
|
|
422
|
0
|
|
|
|
|
|
$start_worker->() |
423
|
|
|
|
|
|
|
while @pool < $idle; |
424
|
|
|
|
|
|
|
|
425
|
|
|
|
|
|
|
sub { |
426
|
0
|
|
|
0
|
|
|
$shutdown_guard if 0; # keep it alive |
427
|
|
|
|
|
|
|
|
428
|
0
|
0
|
|
|
|
|
$start_worker->() |
429
|
|
|
|
|
|
|
unless @pool; |
430
|
|
|
|
|
|
|
|
431
|
0
|
|
|
|
|
|
push @queue, [@_]; |
432
|
0
|
|
|
|
|
|
$scheduler->(); |
433
|
|
|
|
|
|
|
} |
434
|
0
|
|
|
|
|
|
} |
435
|
|
|
|
|
|
|
|
436
|
|
|
|
|
|
|
=item $pool->(..., $cb->(...)) |
437
|
|
|
|
|
|
|
|
438
|
|
|
|
|
|
|
Call the RPC function of a worker with the given arguments, and when the |
439
|
|
|
|
|
|
|
worker is done, call the C<$cb> with the results, just like calling the |
440
|
|
|
|
|
|
|
RPC object directly - see the L documentation for |
441
|
|
|
|
|
|
|
details on the RPC API. |
442
|
|
|
|
|
|
|
|
443
|
|
|
|
|
|
|
If there is no free worker, the call will be queued until a worker becomes |
444
|
|
|
|
|
|
|
available. |
445
|
|
|
|
|
|
|
|
446
|
|
|
|
|
|
|
Note that there can be considerable time between calling this method and |
447
|
|
|
|
|
|
|
the call actually being executed. During this time, the parameters passed |
448
|
|
|
|
|
|
|
to this function are effectively read-only - modifying them after the call |
449
|
|
|
|
|
|
|
and before the callback is invoked causes undefined behaviour. |
450
|
|
|
|
|
|
|
|
451
|
|
|
|
|
|
|
=cut |
452
|
|
|
|
|
|
|
|
453
|
|
|
|
|
|
|
=item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus] |
454
|
|
|
|
|
|
|
|
455
|
|
|
|
|
|
|
=item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus] |
456
|
|
|
|
|
|
|
|
457
|
|
|
|
|
|
|
Tries to detect the number of CPUs (C<$cpus> often called cpu cores |
458
|
|
|
|
|
|
|
nowadays) and execution units (C<$eus>) which include e.g. extra |
459
|
|
|
|
|
|
|
hyperthreaded units). When C<$cpus> cannot be determined reliably, |
460
|
|
|
|
|
|
|
C<$default_cpus> is returned for both values, or C<1> if it is missing. |
461
|
|
|
|
|
|
|
|
462
|
|
|
|
|
|
|
For normal CPU bound uses, it is wise to have as many worker processes |
463
|
|
|
|
|
|
|
as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using |
464
|
|
|
|
|
|
|
hyperthreading is usually detrimental to performance, but in those rare |
465
|
|
|
|
|
|
|
cases where that really helps it might be beneficial to use more workers |
466
|
|
|
|
|
|
|
(C<$eus>). |
467
|
|
|
|
|
|
|
|
468
|
|
|
|
|
|
|
Currently, F is parsed on GNU/Linux systems for both |
469
|
|
|
|
|
|
|
C<$cpus> and C<$eu>, and on {Free,Net,Open}BSD, F is |
470
|
|
|
|
|
|
|
used for C<$cpus>. |
471
|
|
|
|
|
|
|
|
472
|
|
|
|
|
|
|
Example: create a worker pool with as many workers as cpu cores, or C<2>, |
473
|
|
|
|
|
|
|
if the actual number could not be determined. |
474
|
|
|
|
|
|
|
|
475
|
|
|
|
|
|
|
$fork->AnyEvent::Fork::Pool::run ("myworker::function", |
476
|
|
|
|
|
|
|
max => (scalar AnyEvent::Fork::Pool::ncpu 2), |
477
|
|
|
|
|
|
|
); |
478
|
|
|
|
|
|
|
|
479
|
|
|
|
|
|
|
=cut |
480
|
|
|
|
|
|
|
|
481
|
|
|
|
|
|
|
BEGIN { |
482
|
1
|
50
|
0
|
1
|
|
6
|
if ($^O eq "linux") { |
|
|
0
|
0
|
|
|
|
|
483
|
|
|
|
|
|
|
*ncpu = sub(;$) { |
484
|
0
|
|
|
0
|
|
|
my ($cpus, $eus); |
485
|
|
|
|
|
|
|
|
486
|
0
|
0
|
|
|
|
|
if (open my $fh, "<", "/proc/cpuinfo") { |
487
|
0
|
|
|
|
|
|
my %id; |
488
|
|
|
|
|
|
|
|
489
|
0
|
|
|
|
|
|
while (<$fh>) { |
490
|
0
|
0
|
|
|
|
|
if (/^core id\s*:\s*(\d+)/) { |
491
|
0
|
|
|
|
|
|
++$eus; |
492
|
0
|
|
|
|
|
|
undef $id{$1}; |
493
|
|
|
|
|
|
|
} |
494
|
|
|
|
|
|
|
} |
495
|
|
|
|
|
|
|
|
496
|
0
|
|
|
|
|
|
$cpus = scalar keys %id; |
497
|
|
|
|
|
|
|
} else { |
498
|
0
|
0
|
|
|
|
|
$cpus = $eus = @_ ? shift : 1; |
499
|
|
|
|
|
|
|
} |
500
|
0
|
0
|
|
|
|
|
wantarray ? ($cpus, $eus) : $cpus |
501
|
1
|
|
|
|
|
53
|
}; |
502
|
|
|
|
|
|
|
} elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") { |
503
|
|
|
|
|
|
|
*ncpu = sub(;$) { |
504
|
0
|
|
0
|
|
|
|
my $cpus = qx * 1 |
505
|
|
|
|
|
|
|
|| (@_ ? shift : 1); |
506
|
0
|
0
|
|
|
|
|
wantarray ? ($cpus, $cpus) : $cpus |
507
|
0
|
|
|
|
|
|
}; |
508
|
|
|
|
|
|
|
} else { |
509
|
|
|
|
|
|
|
*ncpu = sub(;$) { |
510
|
0
|
0
|
|
|
|
|
my $cpus = @_ ? shift : 1; |
511
|
0
|
0
|
|
|
|
|
wantarray ? ($cpus, $cpus) : $cpus |
512
|
0
|
|
|
|
|
|
}; |
513
|
|
|
|
|
|
|
} |
514
|
|
|
|
|
|
|
} |
515
|
|
|
|
|
|
|
|
516
|
|
|
|
|
|
|
=back |
517
|
|
|
|
|
|
|
|
518
|
|
|
|
|
|
|
=head1 CHILD USAGE |
519
|
|
|
|
|
|
|
|
520
|
|
|
|
|
|
|
In addition to the L API, this module implements one |
521
|
|
|
|
|
|
|
more child-side function: |
522
|
|
|
|
|
|
|
|
523
|
|
|
|
|
|
|
=over 4 |
524
|
|
|
|
|
|
|
|
525
|
|
|
|
|
|
|
=item AnyEvent::Fork::Pool::retire () |
526
|
|
|
|
|
|
|
|
527
|
|
|
|
|
|
|
This function sends an event to the parent process to request retirement: |
528
|
|
|
|
|
|
|
the worker is removed from the pool and no new jobs will be sent to it, |
529
|
|
|
|
|
|
|
but it has to handle the jobs that are already queued. |
530
|
|
|
|
|
|
|
|
531
|
|
|
|
|
|
|
The parentheses are part of the syntax: the function usually isn't defined |
532
|
|
|
|
|
|
|
when you compile your code (because that happens after handing the |
533
|
|
|
|
|
|
|
template process over to C, so you need the |
534
|
|
|
|
|
|
|
empty parentheses to tell Perl that the function is indeed a function. |
535
|
|
|
|
|
|
|
|
536
|
|
|
|
|
|
|
Retiring a worker can be useful to gracefully shut it down when the worker |
537
|
|
|
|
|
|
|
deems this useful. For example, after executing a job, one could check |
538
|
|
|
|
|
|
|
the process size or the number of jobs handled so far, and if either is |
539
|
|
|
|
|
|
|
too high, the worker could ask to get retired, to avoid memory leaks to |
540
|
|
|
|
|
|
|
accumulate. |
541
|
|
|
|
|
|
|
|
542
|
|
|
|
|
|
|
Example: retire a worker after it has handled roughly 100 requests. |
543
|
|
|
|
|
|
|
|
544
|
|
|
|
|
|
|
my $count = 0; |
545
|
|
|
|
|
|
|
|
546
|
|
|
|
|
|
|
sub my::worker { |
547
|
|
|
|
|
|
|
|
548
|
|
|
|
|
|
|
++$count == 100 |
549
|
|
|
|
|
|
|
and AnyEvent::Fork::Pool::retire (); |
550
|
|
|
|
|
|
|
|
551
|
|
|
|
|
|
|
... normal code goes here |
552
|
|
|
|
|
|
|
} |
553
|
|
|
|
|
|
|
|
554
|
|
|
|
|
|
|
=back |
555
|
|
|
|
|
|
|
|
556
|
|
|
|
|
|
|
=head1 POOL PARAMETERS RECIPES |
557
|
|
|
|
|
|
|
|
558
|
|
|
|
|
|
|
This section describes some recipes for pool parameters. These are mostly |
559
|
|
|
|
|
|
|
meant for the synchronous RPC backend, as the asynchronous RPC backend |
560
|
|
|
|
|
|
|
changes the rules considerably, making workers themselves responsible for |
561
|
|
|
|
|
|
|
their scheduling. |
562
|
|
|
|
|
|
|
|
563
|
|
|
|
|
|
|
=over 4 |
564
|
|
|
|
|
|
|
|
565
|
|
|
|
|
|
|
=item low latency - set load = 1 |
566
|
|
|
|
|
|
|
|
567
|
|
|
|
|
|
|
If you need a deterministic low latency, you should set the C |
568
|
|
|
|
|
|
|
parameter to C<1>. This ensures that never more than one job is sent to |
569
|
|
|
|
|
|
|
each worker. This avoids having to wait for a previous job to finish. |
570
|
|
|
|
|
|
|
|
571
|
|
|
|
|
|
|
This makes most sense with the synchronous (default) backend, as the |
572
|
|
|
|
|
|
|
asynchronous backend can handle multiple requests concurrently. |
573
|
|
|
|
|
|
|
|
574
|
|
|
|
|
|
|
=item lowest latency - set load = 1 and idle = max |
575
|
|
|
|
|
|
|
|
576
|
|
|
|
|
|
|
To achieve the lowest latency, you additionally should disable any dynamic |
577
|
|
|
|
|
|
|
resizing of the pool by setting C to the same value as C. |
578
|
|
|
|
|
|
|
|
579
|
|
|
|
|
|
|
=item high throughput, cpu bound jobs - set load >= 2, max = #cpus |
580
|
|
|
|
|
|
|
|
581
|
|
|
|
|
|
|
To get high throughput with cpu-bound jobs, you should set the maximum |
582
|
|
|
|
|
|
|
pool size to the number of cpus in your system, and C to at least |
583
|
|
|
|
|
|
|
C<2>, to make sure there can be another job waiting for the worker when it |
584
|
|
|
|
|
|
|
has finished one. |
585
|
|
|
|
|
|
|
|
586
|
|
|
|
|
|
|
The value of C<2> for C is the minimum value that I achieve |
587
|
|
|
|
|
|
|
100% throughput, but if your parent process itself is sometimes busy, you |
588
|
|
|
|
|
|
|
might need higher values. Also there is a limit on the amount of data that |
589
|
|
|
|
|
|
|
can be "in flight" to the worker, so if you send big blobs of data to your |
590
|
|
|
|
|
|
|
worker, C might have much less of an effect. |
591
|
|
|
|
|
|
|
|
592
|
|
|
|
|
|
|
=item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high |
593
|
|
|
|
|
|
|
|
594
|
|
|
|
|
|
|
When your jobs are I/O bound, using more workers usually boils down to |
595
|
|
|
|
|
|
|
higher throughput, depending very much on your actual workload - sometimes |
596
|
|
|
|
|
|
|
having only one worker is best, for example, when you read or write big |
597
|
|
|
|
|
|
|
files at maximum speed, as a second worker will increase seek times. |
598
|
|
|
|
|
|
|
|
599
|
|
|
|
|
|
|
=back |
600
|
|
|
|
|
|
|
|
601
|
|
|
|
|
|
|
=head1 EXCEPTIONS |
602
|
|
|
|
|
|
|
|
603
|
|
|
|
|
|
|
The same "policy" as with L applies - exceptions will |
604
|
|
|
|
|
|
|
not be caught, and exceptions in both worker and in callbacks causes |
605
|
|
|
|
|
|
|
undesirable or undefined behaviour. |
606
|
|
|
|
|
|
|
|
607
|
|
|
|
|
|
|
=head1 SEE ALSO |
608
|
|
|
|
|
|
|
|
609
|
|
|
|
|
|
|
L, to create the processes in the first place. |
610
|
|
|
|
|
|
|
|
611
|
|
|
|
|
|
|
L, which implements the RPC protocol and API. |
612
|
|
|
|
|
|
|
|
613
|
|
|
|
|
|
|
=head1 AUTHOR AND CONTACT INFORMATION |
614
|
|
|
|
|
|
|
|
615
|
|
|
|
|
|
|
Marc Lehmann |
616
|
|
|
|
|
|
|
http://software.schmorp.de/pkg/AnyEvent-Fork-Pool |
617
|
|
|
|
|
|
|
|
618
|
|
|
|
|
|
|
=cut |
619
|
|
|
|
|
|
|
|
620
|
|
|
|
|
|
|
1 |
621
|
|
|
|
|
|
|
|