File Coverage

blib/lib/AnyEvent/Fork/Pool.pm
Criterion    Covered  Total      %
statement         83    105   79.0
branch            27     58   46.5
condition         16     45   35.5
subroutine        15     18   83.3
pod                1      1  100.0
total            142    227   62.5


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4              
5             =head1 SYNOPSIS
6              
7             use AnyEvent;
8             use AnyEvent::Fork;
9             use AnyEvent::Fork::Pool;
10              
11             # all possible parameters shown, with default values
12             my $pool = AnyEvent::Fork
13             ->new
14             ->require ("MyWorker")
15             ->AnyEvent::Fork::Pool::run (
16             "MyWorker::run", # the worker function
17              
18             # pool management
19             max => 4, # absolute maximum # of processes
20             idle => 0, # minimum # of idle processes
21             load => 2, # queue at most this number of jobs per process
22             start => 0.1, # wait this many seconds before starting a new process
23             stop => 10, # wait this many seconds before stopping an idle process
24             on_destroy => (my $finish = AE::cv), # called when object is destroyed
25              
26             # parameters passed to AnyEvent::Fork::RPC
27             async => 0,
28             on_error => sub { die "FATAL: $_[0]\n" },
29             on_event => sub { my @ev = @_ },
30             init => "MyWorker::init",
31             serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
32             );
33              
34             for (1..10) {
35             $pool->(doit => $_, sub {
36             print "MyWorker::run returned @_\n";
37             });
38             }
39              
40             undef $pool;
41              
42             $finish->recv;
43              
44             =head1 DESCRIPTION
45              
46             This module uses processes created via L<AnyEvent::Fork> (or
47             L) and the RPC protocol implemented in
48             L<AnyEvent::Fork::RPC> to create a load-balanced pool of processes that
49             handles jobs.
50              
51             Understanding of L<AnyEvent::Fork> is helpful but not critical for using
52             this module. A thorough understanding of L<AnyEvent::Fork::RPC>, however,
53             is essential, as it defines the actual API that needs to be implemented in
54             the worker processes.
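To make the worker side concrete, here is a minimal sketch of a worker module for the default synchronous backend. It assumes the AnyEvent::Fork::RPC convention that the named function is called with the request arguments and that its return values are passed to the parent's callback. C<MyWorker>, its C<init> function and the doubling logic are hypothetical, reusing the names from the SYNOPSIS only for illustration. Because the function is plain Perl, it can be exercised directly without forking.

```perl
use strict;
use warnings;

package MyWorker;

# Hypothetical per-process state, set up once by the "init" function
# (passed as init => "MyWorker::init") before any request is handled.
my $jobs_done;

sub init {
    $jobs_done = 0;
}

# Synchronous worker: receives the arguments given to $pool->(...),
# minus the trailing callback, and returns the values that the
# parent's callback will receive.
sub run {
    my ($cmd, $n) = @_;
    ++$jobs_done;
    return ($cmd, $n * 2);
}

package main;

# Call the worker function directly, as the child process would.
MyWorker::init();
my @result = MyWorker::run(doit => 21);
print "@result\n";    # prints "doit 42"
```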
55              
56             =head1 PARENT USAGE
57              
58             To create a pool, you first have to create an L<AnyEvent::Fork> object -
59             this object becomes your template process. Whenever a new worker process
60             is needed, it is forked from this template process. Then you need to
61             "hand off" this template process to the C<AnyEvent::Fork::Pool> module by
62             calling its run method on it:
63              
64             my $template = AnyEvent::Fork
65             ->new
66             ->require ("SomeModule", "MyWorkerModule");
67              
68             my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");
69              
70             The pool "object" is not a regular Perl object, but a code reference that
71             you can call and that works roughly like calling the worker function
72             directly, except that it returns nothing; instead, you pass a callback
73             as the last argument, which is invoked once the results are in:
74              
75             $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });
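Since the pool relies only on this calling convention (the last argument is the callback, everything before it goes to the worker), tests of calling code can sometimes use an in-process stand-in. The C<inline_pool> helper below is hypothetical and not part of AnyEvent::Fork::Pool. Unlike a real pool, it runs the function synchronously and invokes the callback before returning, so it cannot reveal ordering or concurrency problems.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a pool: returns a code reference that is
# called like $pool->(..., $cb) but runs $function in the current process.
sub inline_pool {
    my ($function) = @_;
    return sub {
        my $cb = pop;              # the last argument is the result callback
        $cb->($function->(@_));    # remaining arguments go to the worker
        return;                    # like the real pool, return nothing useful
    };
}

# Example worker: returns the argument count and the joined arguments.
sub myfunction { my @args = @_; return scalar(@args), join ",", @args }

my $pool = inline_pool(\&myfunction);
my @got;
$pool->(1, 2, 3, sub { @got = @_ });
print "@got\n";    # prints "3 1,2,3"
```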
76              
77             =over 4
78              
79             =cut
80              
81             package AnyEvent::Fork::Pool;
82              
83 2     2   21239 use common::sense;
  2         13  
  2         10  
84              
85 2     2   112 use Scalar::Util ();
  2         3  
  2         36  
86              
87 2     2   650 use Guard ();
  2         640  
  2         38  
88 2     2   1157 use Array::Heap ();
  2         1776  
  2         53  
89              
90 2     2   1380 use AnyEvent;
  2         7334  
  2         73  
91 2     2   1589 use AnyEvent::Fork::RPC;
  2         5827  
  2         3601  
92              
93             # these are used for the first and last argument of events
94             # in the hope of not colliding. yes, I don't like it either,
95             # but didn't come up with an obviously better alternative.
96             my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
97             my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
98              
99             our $VERSION = 1.2;
100              
101             =item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
102              
103             The traditional way to call the pool creation function. But it is way
104             cooler to call it in the following way:
105              
106             =item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
107              
108             Creates a new pool object with the specified C<$function> as function
109             (name) to call for each request. The pool uses the C<$fork> object as the
110             template when creating worker processes.
111              
112             You can supply your own template process, or tell C
113             to create one.
114              
115             A relatively large number of key/value pairs can be specified to influence
116             the behaviour. They are grouped into the categories "pool management",
117             "template process" and "rpc parameters".
118              
119             =over 4
120              
121             =item Pool Management
122              
123             The pool consists of a certain number of worker processes. These options
124             decide how many of these processes exist and when they are started and
125             stopped.
126              
127             The worker pool is dynamically resized according to (perceived :)
128             load. The minimum size is given by the C<idle> parameter and the maximum
129             size is given by the C<max> parameter. A new worker is started every
130             C<start> seconds at most, and an idle worker is stopped at most every
131             C<stop> seconds.
132              
133             You can specify the number of jobs sent to a worker concurrently using the
134             C<load> parameter.
135              
136             =over 4
137              
138             =item idle => $count (default: 0)
139              
140             The minimum number of idle processes in the pool - when there are fewer
141             than this many idle workers, C<AnyEvent::Fork::Pool> will try to start new
142             ones, subject to the limits set by C<max> and C<start>.
143
144             This is also the initial number of workers in the pool. The default of
145             zero means that the pool starts empty and can shrink back to zero workers
146             over time.
147              
148             =item max => $count (default: 4)
149              
150             The maximum number of processes in the pool, in addition to the template
151             process. C<AnyEvent::Fork::Pool> will never have more than this number of
152             worker processes, although there can be more temporarily when a worker is
153             shut down and hasn't exited yet.
154              
155             =item load => $count (default: 2)
156              
157             The maximum number of concurrent jobs sent to a single worker process.
158              
159             Jobs that cannot be sent to a worker immediately (because all workers are
160             busy) will be queued until a worker is available.
161              
162             Setting this low improves latency. For example, at C<1>, every job that
163             is sent to a worker is sent to a completely idle worker that doesn't run
164             any other jobs. The downside is that throughput is reduced - a worker that
165             finishes a job needs to wait for a new job from the parent.
166              
167             The default of C<2> is usually a good compromise.
168              
169             =item start => $seconds (default: 0.1)
170              
171             When there are fewer than C<idle> idle workers (or all workers are completely
172             busy), then a timer is started. If the timer elapses and there are still
173             jobs that cannot be queued to a worker, a new worker is started.
174              
175             This sets the minimum time that all workers must be busy before a new
176             worker is started. Or, put differently, the minimum delay between starting
177             new workers.
178              
179             The delay is small by default, which means new workers will be started
180             relatively quickly. A delay of C<0> is possible, and ensures that the pool
181             will grow as quickly as possible under load.
182              
183             Non-zero values are useful to avoid "exploding" a pool because a lot of
184             jobs are queued in an instant.
185              
186             Higher values are often useful to improve efficiency at the cost of
187             latency - when fewer processes can do the job over time, starting more and
188             more is not necessarily going to help.
189              
190             =item stop => $seconds (default: 10)
191              
192             When a worker has no jobs to execute it becomes idle. An idle worker that
193             hasn't executed a job within this amount of time will be stopped, unless
194             the other parameters say otherwise.
195              
196             Setting this to a very high value means that workers stay around longer,
197             even when they have nothing to do, which can be good as they don't have to
198             be started again on the next load spike.
199
200             Setting this to a lower value can be useful to avoid wasting memory, or
201             simply process table entries.
202              
203             Usually, setting this to a time longer than the time between load spikes
204             is best - if you expect a lot of requests every minute and little work
205             in between, setting this to longer than a minute avoids having to stop
206             and start workers. On the other hand, you have to ask yourself if letting
207             workers run idle is a good use of your resources. Try to find a good
208             balance between resource usage of your workers and the time to start new
209             workers - the template processes created by L<AnyEvent::Fork> are fast at
210             creating workers while not using much memory for them, so most of the
211             overhead is likely from your own code.
212              
213             =item on_destroy => $callback->() (default: none)
214              
215             When a pool object goes out of scope, the outstanding requests are still
216             handled till completion. Only after handling all jobs will the workers
217             be destroyed (and also the template process if it isn't referenced
218             otherwise).
219              
220             To find out when a pool has actually finished all of its work, you can set
221             this callback, which will be called when the pool has been destroyed.
222              
223             =back
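The following is a simplified model of how C<load>, C<idle> and C<max> interact, written as pure functions so it runs without forking. It is a sketch, not the module's code: C<dispatch> mimics the scheduler's rule of sending queued jobs to the least-loaded worker until that worker reaches C<load> (the module keeps workers in an Array::Heap for this), and C<should_start> mirrors the condition checked whenever the C<start> timer fires. Both function names are hypothetical.

```perl
use strict;
use warnings;

# Simplified model (not the module's code).
# $pool: array ref of per-worker load counts; $queue: array ref of jobs.

# Send queued jobs to the least-loaded worker while it is below $max_load.
# Returns the number of jobs dispatched; undispatched jobs remain queued.
sub dispatch {
    my ($pool, $queue, $max_load) = @_;
    my $sent = 0;
    while (@$queue && @$pool) {
        # index of the least-loaded worker
        my $i = 0;
        for my $j (1 .. $#$pool) { $i = $j if $pool->[$j] < $pool->[$i] }
        last if $pool->[$i] >= $max_load;    # every worker is at its load limit
        ++$pool->[$i];
        shift @$queue;
        ++$sent;
    }
    return $sent;
}

# Decision taken each time the "start" timer fires: start one more worker if
# there are too few idle workers or jobs are waiting, and the pool is below max.
sub should_start {
    my (%s) = @_;
    return ($s{nidle} < $s{idle} || $s{queued} > 0) && $s{workers} < $s{max};
}

my @pool  = (0, 0, 0, 0);    # four idle workers
my @queue = (1 .. 10);       # ten jobs
my $sent  = dispatch(\@pool, \@queue, 2);
print "$sent dispatched, ", scalar(@queue), " queued\n";    # 8 dispatched, 2 queued
```

With four workers and C<< load => 2 >>, at most eight jobs are in flight; the remaining jobs stay queued until a finished job lowers a worker's load, and C<should_start> returns false once C<max> workers exist.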
224              
225             =item AnyEvent::Fork::RPC Parameters
226              
227             These parameters are all passed more or less directly to
228             L<AnyEvent::Fork::RPC>. They are only briefly mentioned here; for
229             their full documentation please refer to the L<AnyEvent::Fork::RPC>
230             documentation. Also, the default values mentioned here are only documented
231             as a best effort - the L<AnyEvent::Fork::RPC> documentation is binding.
232              
233             =over 4
234              
235             =item async => $boolean (default: 0)
236              
237             Whether to use the synchronous or asynchronous RPC backend.
238              
239             =item on_error => $callback->($message) (default: die with message)
240              
241             The callback to call on any (fatal) errors.
242              
243             =item on_event => $callback->(...) (default: C<sub { }>, unlike L<AnyEvent::Fork::RPC>)
244              
245             The callback to invoke on events.
246              
247             =item init => $initfunction (default: none)
248              
249             The function to call in the child, once before handling requests.
250              
251             =item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
252              
253             The serialiser to use.
254              
255             =back
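With C<< async => 1 >>, AnyEvent::Fork::RPC uses a different worker calling convention: as its documentation describes it, the function receives a completion callback as its first argument and reports its results by calling that callback, possibly later from an event handler. The following is a minimal sketch under that assumption (the AnyEvent::Fork::RPC documentation is binding). C<MyAsyncWorker> is hypothetical, and the completion callback is simulated so the block runs without a parent process.

```perl
use strict;
use warnings;

package MyAsyncWorker;

# Asynchronous worker: the first argument is the completion callback,
# the remaining arguments are those passed to $pool->(...).
sub run {
    my ($done, $cmd, $n) = @_;
    # A real worker could start I/O here and call $done later;
    # this sketch finishes immediately.
    $done->($cmd, $n + 1);
}

package main;

# Simulate the backend by supplying our own completion callback.
my @reply;
MyAsyncWorker::run(sub { @reply = @_ }, doit => 41);
print "@reply\n";    # prints "doit 42"
```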
256              
257             =back
258              
259             =cut
260              
261             sub run {
262 1     1 1 5562 my ($template, $function, %arg) = @_;
263              
264 1   50     8 my $max = $arg{max} || 4;
265             my $idle = $arg{idle} || 0,
266             my $load = $arg{load} || 2,
267             my $start = $arg{start} || 0.1,
268             my $stop = $arg{stop} || 10,
269 0     0     my $on_event = $arg{on_event} || sub { },
270 1   50     23 my $on_destroy = $arg{on_destroy};
      50        
      50        
      50        
      50        
271              
272 1         99 my @rpc = (
273             async => $arg{async},
274             init => $arg{init},
275             serialiser => delete $arg{serialiser},
276             on_error => $arg{on_error},
277             );
278              
279 1         3 my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
280 0         0 my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);
281              
282             my $destroy_guard = Guard::guard {
283 1 50   1   77 $on_destroy->()
284             if $on_destroy;
285 1         8 };
286              
287 1 50       8 $template
288             ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
289             ->eval ('
290             my ($magic0, $magic1) = @_;
291             sub AnyEvent::Fork::Pool::retire() {
292             AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
293             }
294             ', $magic0, $magic1)
295             ;
296              
297             $start_worker = sub {
298 11     11   58 my $proc = [0, 0, undef]; # load, index, rpc
299              
300             $proc->[2] = $template
301             ->fork
302             ->AnyEvent::Fork::RPC::run ($function,
303             @rpc,
304             on_event => sub {
305 19 50 33     404631 if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
      33        
306 19         28 $destroy_guard if 0; # keep it alive
307              
308 19 50       104 $_[1] eq "quit" and $stop_worker->($proc);
309 19         201 return;
310             }
311              
312 0         0 &$on_event;
313             },
314             )
315 11         124 ;
316              
317 11         5271 ++$nidle;
318 11         63 Array::Heap::push_heap_idx @pool, $proc;
319              
320 11         55 Scalar::Util::weaken $proc;
321 1         41 };
322              
323             $stop_worker = sub {
324 19     19   33 my $proc = shift;
325              
326 19 50       119 $proc->[0]
327             or --$nidle;
328              
329 19 100       105 Array::Heap::splice_heap_idx @pool, $proc->[1]
330             if defined $proc->[1];
331              
332 19         206 @$proc = 0; # tell others to leave it be
333 1         4 };
334              
335             $want_start = sub {
336 60     60   70 undef $stop_w;
337              
338             $start_w ||= AE::timer $start, $start, sub {
339 9 100 66     347423 if (($nidle < $idle || @queue) && @pool < $max) {
      66        
340 8         45 $start_worker->();
341 8         25 $scheduler->();
342             } else {
343 1         54 undef $start_w;
344             }
345 60   66     138 };
346 1         4 };
347              
348             $want_stop = sub {
349             $stop_w ||= AE::timer $stop, $stop, sub {
350 0 0       0 $stop_worker->($pool[0])
351             if $nidle;
352              
353 0 0       0 undef $stop_w
354             if $nidle <= $idle;
355 0   0 0   0 };
356 1         4 };
357              
358             $scheduler = sub {
359 69 100   69   171 if (@queue) {
    50          
360 63         142 while (@queue) {
361 90 100       1207 @pool or $start_worker->();
362              
363 90         105 my $proc = $pool[0];
364              
365 90 100       217 if ($proc->[0] < $load) {
366             # found free worker, increase load
367 30 100       128 unless ($proc->[0]++) {
368             # worker became busy
369 11 50       33 --$nidle
370             or undef $stop_w;
371              
372 11 50 33     36 $want_start->()
373             if $nidle < $idle && @pool < $max;
374             }
375              
376 30         102 Array::Heap::adjust_heap_idx @pool, 0;
377              
378 30         68 my $job = shift @queue;
379 30         55 my $ocb = pop @$job;
380              
381             $proc->[2]->(@$job, sub {
382             # reduce load
383 30 50 33     215453 --$proc->[0] # worker still busy?
384             or ++$nidle > $idle # not too many idle processes?
385             or $want_stop->();
386              
387 30 100       150 Array::Heap::adjust_heap_idx @pool, $proc->[1]
388             if defined $proc->[1];
389              
390 30         96 &$ocb;
391              
392 30         3934 $scheduler->();
393 30         235 });
394             } else {
395 60 50       189 $want_start->()
396             unless @pool >= $max;
397              
398 60         372 last;
399             }
400             }
401             } elsif ($shutdown) {
402             undef $_->[2]
403 6         43 for @pool;
404              
405 6         45 undef $start_w;
406 6         9 undef $start_worker; # frees $destroy_guard reference
407              
408 6         97 $stop_worker->($pool[0])
409             while $nidle;
410             }
411 1         4 };
412              
413             my $shutdown_guard = Guard::guard {
414 1     1   16 $shutdown = 1;
415 1         2 $scheduler->();
416 1         5 };
417              
418 1         4 $start_worker->()
419             while @pool < $idle;
420              
421             sub {
422 30     30   184 $shutdown_guard if 0; # keep it alive
423              
424 30 100       38 $start_worker->()
425             unless @pool;
426              
427 30         48 push @queue, [@_];
428 30         32 $scheduler->();
429             }
430 1         6 }
431              
432             =item $pool->(..., $cb->(...))
433              
434             Call the RPC function of a worker with the given arguments, and when the
435             worker is done, call the C<$cb> with the results, just like calling the
436             RPC object directly - see the L<AnyEvent::Fork::RPC> documentation for
437             details on the RPC API.
438              
439             If there is no free worker, the call will be queued until a worker becomes
440             available.
441              
442             Note that there can be considerable time between calling this method and
443             the call actually being executed. During this time, the parameters passed
444             to this function are effectively read-only - modifying them after the call
445             and before the callback is invoked causes undefined behaviour.
446              
447             =cut
448              
449             =item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]
450              
451             =item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]
452              
453             Tries to detect the number of CPUs (C<$cpus>, often called CPU cores
454             nowadays) and execution units (C<$eus>, which include e.g. extra
455             hyperthreaded units). When C<$cpus> cannot be determined reliably,
456             C<$default_cpus> is returned for both values, or C<1> if it is missing.
457              
458             For normal CPU bound uses, it is wise to have as many worker processes
459             as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
460             hyperthreading is usually detrimental to performance, but in those rare
461             cases where that really helps it might be beneficial to use more workers
462             (C<$eus>).
463              
464             Currently, F</proc/cpuinfo> is parsed on GNU/Linux systems for both
465             C<$cpus> and C<$eus>, and on {Free,Net,Open}BSD, F is
466             used for C<$cpus>.
467              
468             Example: create a worker pool with as many workers as CPU cores, or C<2>,
469             if the actual number could not be determined.
470              
471             $fork->AnyEvent::Fork::Pool::run ("myworker::function",
472             max => (scalar AnyEvent::Fork::Pool::ncpu 2),
473             );
474              
475             =cut
476              
477             BEGIN {
478 2 50 0 2   14 if ($^O eq "linux") {
    0 0        
479             *ncpu = sub(;$) {
480 0     0   0 my ($cpus, $eus);
481              
482 0 0       0 if (open my $fh, "<", "/proc/cpuinfo") {
483 0         0 my %id;
484              
485 0         0 while (<$fh>) {
486 0 0       0 if (/^core id\s*:\s*(\d+)/) {
487 0         0 ++$eus;
488 0         0 undef $id{$1};
489             }
490             }
491              
492 0         0 $cpus = scalar keys %id;
493             } else {
494 0 0       0 $cpus = $eus = @_ ? shift : 1;
495             }
496 0 0       0 wantarray ? ($cpus, $eus) : $cpus
497 2         117 };
498             } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
499             *ncpu = sub(;$) {
500 0   0     0 my $cpus = qx * 1
501             || (@_ ? shift : 1);
502 0 0       0 wantarray ? ($cpus, $cpus) : $cpus
503 0         0 };
504             } else {
505             *ncpu = sub(;$) {
506 0 0       0 my $cpus = @_ ? shift : 1;
507 0 0       0 wantarray ? ($cpus, $cpus) : $cpus
508 0         0 };
509             }
510             }
511              
512             =back
513              
514             =head1 CHILD USAGE
515              
516             In addition to the L API, this module implements one
517             more child-side function:
518              
519             =over 4
520              
521             =item AnyEvent::Fork::Pool::retire ()
522              
523             This function sends an event to the parent process to request retirement:
524             the worker is removed from the pool and no new jobs will be sent to it,
525             but it still has to handle the jobs that are already queued.
526              
527             The parentheses are part of the syntax: the function usually isn't defined
528             when you compile your code (because that happens before handing the
529             template process over to C<AnyEvent::Fork::Pool>), so you need the
530             empty parentheses to tell Perl that the function is indeed a function.
531              
532             Retiring a worker can be useful to gracefully shut it down when the worker
533             deems this useful. For example, after executing a job, it could check the
534             process size or the number of jobs handled so far, and if either is too
535             high, the worker could request to be retired, to keep memory leaks from
536             accumulating.
537              
538             Example: retire a worker after it has handled roughly 100 requests. It
539             doesn't matter whether you retire at the beginning or end of your request,
540             as the worker will continue to handle some outstanding requests. Likewise,
541             it's ok to call retire multiple times.
542              
543             my $count = 0;
544              
545             sub my::worker {
546              
547             ++$count == 100
548             and AnyEvent::Fork::Pool::retire ();
549              
550             # ... normal code goes here
551             }
552              
553             =back
554              
555             =head1 POOL PARAMETERS RECIPES
556              
557             This section describes some recipes for pool parameters. These are mostly
558             meant for the synchronous RPC backend, as the asynchronous RPC backend
559             changes the rules considerably, making workers themselves responsible for
560             their scheduling.
561              
562             =over 4
563              
564             =item low latency - set load = 1
565              
566             If you need a deterministic low latency, you should set the C<load>
567             parameter to C<1>. This ensures that no more than one job is ever sent to
568             each worker, so no job has to wait for a previous job to finish.
569              
570             This makes most sense with the synchronous (default) backend, as the
571             asynchronous backend can handle multiple requests concurrently.
572              
573             =item lowest latency - set load = 1 and idle = max
574              
575             To achieve the lowest latency, you additionally should disable any dynamic
576             resizing of the pool by setting C<idle> to the same value as C<max>.
577              
578             =item high throughput, cpu bound jobs - set load >= 2, max = #cpus
579              
580             To get high throughput with cpu-bound jobs, you should set the maximum
581             pool size to the number of cpus in your system, and C<load> to at least
582             C<2>, to make sure there can be another job waiting for the worker when it
583             has finished one.
584              
585             The value of C<2> for C<load> is the minimum value that can achieve
586             100% throughput, but if your parent process itself is sometimes busy, you
587             might need higher values. Also, there is a limit on the amount of data that
588             can be "in flight" to the worker, so if you send big blobs of data to your
589             worker, C<load> might have much less of an effect.
590              
591             =item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high
592              
593             When your jobs are I/O bound, using more workers usually leads to
594             higher throughput, depending very much on your actual workload - sometimes
595             having only one worker is best, for example when you read or write big
596             files at maximum speed, as a second worker will increase seek times.
597              
598             =back
599              
600             =head1 EXCEPTIONS
601              
602             The same "policy" as with L<AnyEvent::Fork::RPC> applies - exceptions
603             will not be caught, and exceptions in both workers and callbacks cause
604             undesirable or undefined behaviour.
605              
606             =head1 SEE ALSO
607              
608             L<AnyEvent::Fork>, to create the processes in the first place.
609
610             L, likewise, but helpful for remote processes.
611
612             L<AnyEvent::Fork::RPC>, which implements the RPC protocol and API.
613              
614             =head1 AUTHOR AND CONTACT INFORMATION
615              
616             Marc Lehmann
617             http://software.schmorp.de/pkg/AnyEvent-Fork-Pool
618              
619             =cut
620              
621             1
622