File Coverage

blib/lib/AnyEvent/Fork/Pool.pm
Criterion Covered Total %
statement 23 108 21.3
branch 1 58 1.7
condition 0 45 0.0
subroutine 8 19 42.1
pod 1 1 100.0
total 33 231 14.2


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::Fork::Pool - simple process pool manager on top of AnyEvent::Fork
4              
5             THE API IS NOT FINISHED, CONSIDER THIS AN ALPHA RELEASE
6              
7             =head1 SYNOPSIS
8              
9             use AnyEvent;
10             use AnyEvent::Fork::Pool;
11             # use AnyEvent::Fork is not needed
12              
13             # all possible parameters shown, with default values
14             my $pool = AnyEvent::Fork
15             ->new
16             ->require ("MyWorker")
17             ->AnyEvent::Fork::Pool::run (
18             "MyWorker::run", # the worker function
19              
20             # pool management
21             max => 4, # absolute maximum # of processes
22             idle => 0, # minimum # of idle processes
23             load => 2, # queue at most this number of jobs per process
24             start => 0.1, # wait this many seconds before starting a new process
25             stop => 10, # wait this many seconds before stopping an idle process
26             on_destroy => (my $finish = AE::cv), # called when object is destroyed
27              
28             # parameters passed to AnyEvent::Fork::RPC
29             async => 0,
30             on_error => sub { die "FATAL: $_[0]\n" },
31             on_event => sub { my @ev = @_ },
32             init => "MyWorker::init",
33             serialiser => $AnyEvent::Fork::RPC::STRING_SERIALISER,
34             );
35              
36             for (1..10) {
37             $pool->(doit => $_, sub {
38             print "MyWorker::run returned @_\n";
39             });
40             }
41              
42             undef $pool;
43              
44             $finish->recv;
45              
46             =head1 DESCRIPTION
47              
48             This module uses processes created via L and the RPC
49             protocol implemented in L to create a load-balanced
50             pool of processes that handles jobs.
51              
52             Understanding of L is helpful but not critical to be able
53             to use this module, but a thorough understanding of L
54             is, as it defines the actual API that needs to be implemented in the
55             worker processes.
56              
57             =head1 EXAMPLES
58              
59             =head1 PARENT USAGE
60              
61             To create a pool, you first have to create a L object -
62             this object becomes your template process. Whenever a new worker process
63             is needed, it is forked from this template process. Then you need to
64             "hand off" this template process to the C module by
65             calling its run method on it:
66              
67             my $template = AnyEvent::Fork
68             ->new
69             ->require ("SomeModule", "MyWorkerModule");
70              
71             my $pool = $template->AnyEvent::Fork::Pool::run ("MyWorkerModule::myfunction");
72              
73             The pool "object" is not a regular Perl object, but a code reference that
74             you can call and that works roughly like calling the worker function
75             directly, except that it returns nothing but instead you need to specify a
76             callback to be invoked once results are in:
77              
78             $pool->(1, 2, 3, sub { warn "myfunction(1,2,3) returned @_" });
79              
80             =over 4
81              
82             =cut
83              
84             package AnyEvent::Fork::Pool;
85              
86 1     1   1382 use common::sense;
  1         7  
  1         4  
87              
88 1     1   44 use Scalar::Util ();
  1         1  
  1         12  
89              
90 1     1   727 use Guard ();
  1         507  
  1         18  
91 1     1   1977 use Array::Heap ();
  1         630  
  1         20  
92              
93 1     1   1649 use AnyEvent;
  1         5309  
  1         46  
94             # explicit version on next line, as some cpan-testers test with the 0.1 version,
95             # ignoring dependencies, and this line will at least give a clear indication of that.
96 1     1   895 use AnyEvent::Fork 0.6; # we don't actually depend on it, this is for convenience
  1         15487  
  1         34  
97 1     1   1043 use AnyEvent::Fork::RPC;
  1         1367  
  1         1594  
98              
99             # these are used for the first and last argument of events
100             # in the hope of not colliding. yes, I don't like it either,
101             # but didn't come up with an obviously better alternative.
102             my $magic0 = ':t6Z@HK1N%Dx@_7?=~-7NQgWDdAs6a,jFN=wLO0*jD*1%P';
103             my $magic1 = '<~53rexz.U`!]X[A235^"fyEoiTF\T~oH1l/N6+Djep9b~bI9`\1x%B~vWO1q*';
104              
105             our $VERSION = 1.1;
106              
107             =item my $pool = AnyEvent::Fork::Pool::run $fork, $function, [key => value...]
108              
109             The traditional way to call the pool creation function. But it is way
110             cooler to call it in the following way:
111              
112             =item my $pool = $fork->AnyEvent::Fork::Pool::run ($function, [key => value...])
113              
114             Creates a new pool object with the specified C<$function> as function
115             (name) to call for each request. The pool uses the C<$fork> object as the
116             template when creating worker processes.
117              
118             You can supply your own template process, or tell C
119             to create one.
120              
121             A relatively large number of key/value pairs can be specified to influence
122             the behaviour. They are grouped into the categories "pool management",
123             "template process" and "rpc parameters".
124              
125             =over 4
126              
127             =item Pool Management
128              
129             The pool consists of a certain number of worker processes. These options
130             decide how many of these processes exist and when they are started and
131             stopped.
132              
133             The worker pool is dynamically resized, according to (perceived :)
134             load. The minimum size is given by the C parameter and the maximum
135             size is given by the C parameter. A new worker is started every
136             C seconds at most, and an idle worker is stopped at most every
137             C second.
138              
139             You can specify the amount of jobs sent to a worker concurrently using the
140             C parameter.
141              
142             =over 4
143              
144             =item idle => $count (default: 0)
145              
146             The minimum amount of idle processes in the pool - when there are fewer
147             than this many idle workers, C will try to start new
148             ones, subject to the limits set by C and C.
149              
150             This is also the initial amount of workers in the pool. The default of
151             zero means that the pool starts empty and can shrink back to zero workers
152             over time.
153              
154             =item max => $count (default: 4)
155              
156             The maximum number of processes in the pool, in addition to the template
157             process. C will never have more than this number of
158             worker processes, although there can be more temporarily when a worker is
159             shut down and hasn't exited yet.
160              
161             =item load => $count (default: 2)
162              
163             The maximum number of concurrent jobs sent to a single worker process.
164              
165             Jobs that cannot be sent to a worker immediately (because all workers are
166             busy) will be queued until a worker is available.
167              
168             Setting this low improves latency. For example, at C<1>, every job that
169             is sent to a worker is sent to a completely idle worker that doesn't run
170             any other jobs. The downside is that throughput is reduced - a worker that
171             finishes a job needs to wait for a new job from the parent.
172              
173             The default of C<2> is usually a good compromise.
174              
175             =item start => $seconds (default: 0.1)
176              
177             When there are fewer than C workers (or all workers are completely
178             busy), then a timer is started. If the timer elapses and there are still
179             jobs that cannot be queued to a worker, a new worker is started.
180              
181             This sets the minimum time that all workers must be busy before a new
182             worker is started. Or, put differently, the minimum delay between starting
183             new workers.
184              
185             The delay is small by default, which means new workers will be started
186             relatively quickly. A delay of C<0> is possible, and ensures that the pool
187             will grow as quickly as possible under load.
188              
189             Non-zero values are useful to avoid "exploding" a pool because a lot of
190             jobs are queued in an instant.
191              
192             Higher values are often useful to improve efficiency at the cost of
193             latency - when fewer processes can do the job over time, starting more and
194             more is not necessarily going to help.
195              
196             =item stop => $seconds (default: 10)
197              
198             When a worker has no jobs to execute it becomes idle. An idle worker that
199             hasn't executed a job within this amount of time will be stopped, unless
200             the other parameters say otherwise.
201              
202             Setting this to a very high value means that workers stay around longer,
203             even when they have nothing to do, which can be good as they don't have to
204             be started on the next load spike again.
205              
206             Setting this to a lower value can be useful to avoid memory or simply
207             process table wastage.
208              
209             Usually, setting this to a time longer than the time between load spikes
210             is best - if you expect a lot of requests every minute and little work
211             in between, setting this to longer than a minute avoids having to stop
212             and start workers. On the other hand, you have to ask yourself if letting
213             workers run idle is a good use of your resources. Try to find a good
214             balance between resource usage of your workers and the time to start new
215             workers - the processes created by L itself is fast at
216             creating workers while not using much memory for them, so most of the
217             overhead is likely from your own code.
218              
219             =item on_destroy => $callback->() (default: none)
220              
221             When a pool object goes out of scope, the outstanding requests are still
222             handled till completion. Only after handling all jobs will the workers
223             be destroyed (and also the template process if it isn't referenced
224             otherwise).
225              
226             To find out when a pool has completely finished its work, you can set this
227             callback, which will be called when the pool has been destroyed.
228              
229             =back
230              
231             =item AnyEvent::Fork::RPC Parameters
232              
233             These parameters are all passed more or less directly to
234             L. They are only briefly mentioned here, for
235             their full documentation please refer to the L
236             documentation. Also, the default values mentioned here are only documented
237             as a best effort - the L documentation is binding.
238              
239             =over 4
240              
241             =item async => $boolean (default: 0)
242              
243             Whether to use the synchronous or asynchronous RPC backend.
244              
245             =item on_error => $callback->($message) (default: die with message)
246              
247             The callback to call on any (fatal) errors.
248              
249             =item on_event => $callback->(...) (default: C, unlike L)
250              
251             The callback to invoke on events.
252              
253             =item init => $initfunction (default: none)
254              
255             The function to call in the child, once before handling requests.
256              
257             =item serialiser => $serialiser (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
258              
259             The serialiser to use.
260              
261             =back
262              
263             =back
264              
265             =cut
266              
267             sub run {
268 0     0 1   my ($template, $function, %arg) = @_;
269              
270 0   0       my $max = $arg{max} || 4;
271             my $idle = $arg{idle} || 0,
272             my $load = $arg{load} || 2,
273             my $start = $arg{start} || 0.1,
274             my $stop = $arg{stop} || 10,
275 0     0     my $on_event = $arg{on_event} || sub { },
276 0   0       my $on_destroy = $arg{on_destroy};
      0        
      0        
      0        
      0        
277              
278 0           my @rpc = (
279             async => $arg{async},
280             init => $arg{init},
281             serialiser => delete $arg{serialiser},
282             on_error => $arg{on_error},
283             );
284              
285 0           my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
286 0           my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);
287              
288             my $destroy_guard = Guard::guard {
289 0 0   0     $on_destroy->()
290             if $on_destroy;
291 0           };
292              
293 0 0         $template
294             ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
295             ->eval ('
296             my ($magic0, $magic1) = @_;
297             sub AnyEvent::Fork::Pool::retire() {
298             AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
299             }
300             ', $magic0, $magic1)
301             ;
302              
303             $start_worker = sub {
304 0     0     my $proc = [0, 0, undef]; # load, index, rpc
305              
306             $proc->[2] = $template
307             ->fork
308             ->AnyEvent::Fork::RPC::run ($function,
309             @rpc,
310             on_event => sub {
311 0 0 0       if (@_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1) {
      0        
312 0           $destroy_guard if 0; # keep it alive
313              
314 0 0         $_[1] eq "quit" and $stop_worker->($proc);
315 0           return;
316             }
317              
318 0           &$on_event;
319             },
320             )
321 0           ;
322              
323 0           ++$nidle;
324 0           Array::Heap::push_heap_idx @pool, $proc;
325              
326 0           Scalar::Util::weaken $proc;
327 0           };
328              
329             $stop_worker = sub {
330 0     0     my $proc = shift;
331              
332 0 0         $proc->[0]
333             or --$nidle;
334              
335 0 0         Array::Heap::splice_heap_idx @pool, $proc->[1]
336             if defined $proc->[1];
337              
338 0           @$proc = 0; # tell others to leave it be
339 0           };
340              
341             $want_start = sub {
342 0     0     undef $stop_w;
343              
344             $start_w ||= AE::timer $start, $start, sub {
345 0 0 0       if (($nidle < $idle || @queue) && @pool < $max) {
      0        
346 0           $start_worker->();
347 0           $scheduler->();
348             } else {
349 0           undef $start_w;
350             }
351 0   0       };
352 0           };
353              
354             $want_stop = sub {
355             $stop_w ||= AE::timer $stop, $stop, sub {
356 0 0         $stop_worker->($pool[0])
357             if $nidle;
358              
359 0 0         undef $stop_w
360             if $nidle <= $idle;
361 0   0 0     };
362 0           };
363              
364             $scheduler = sub {
365 0 0   0     if (@queue) {
    0          
366 0           while (@queue) {
367 0 0         @pool or $start_worker->();
368              
369 0           my $proc = $pool[0];
370              
371 0 0         if ($proc->[0] < $load) {
372             # found free worker, increase load
373 0 0         unless ($proc->[0]++) {
374             # worker became busy
375 0 0         --$nidle
376             or undef $stop_w;
377              
378 0 0 0       $want_start->()
379             if $nidle < $idle && @pool < $max;
380             }
381              
382 0           Array::Heap::adjust_heap_idx @pool, 0;
383              
384 0           my $job = shift @queue;
385 0           my $ocb = pop @$job;
386              
387             $proc->[2]->(@$job, sub {
388             # reduce load
389 0 0 0       --$proc->[0] # worker still busy?
390             or ++$nidle > $idle # not too many idle processes?
391             or $want_stop->();
392              
393 0 0         Array::Heap::adjust_heap_idx @pool, $proc->[1]
394             if defined $proc->[1];
395              
396 0           &$ocb;
397              
398 0           $scheduler->();
399 0           });
400             } else {
401 0 0         $want_start->()
402             unless @pool >= $max;
403              
404 0           last;
405             }
406             }
407             } elsif ($shutdown) {
408 0           @pool = ();
409 0           undef $start_w;
410 0           undef $start_worker; # frees $destroy_guard reference
411              
412 0           $stop_worker->($pool[0])
413             while $nidle;
414             }
415 0           };
416              
417             my $shutdown_guard = Guard::guard {
418 0     0     $shutdown = 1;
419 0           $scheduler->();
420 0           };
421              
422 0           $start_worker->()
423             while @pool < $idle;
424              
425             sub {
426 0     0     $shutdown_guard if 0; # keep it alive
427              
428 0 0         $start_worker->()
429             unless @pool;
430              
431 0           push @queue, [@_];
432 0           $scheduler->();
433             }
434 0           }
435              
436             =item $pool->(..., $cb->(...))
437              
438             Call the RPC function of a worker with the given arguments, and when the
439             worker is done, call the C<$cb> with the results, just like calling the
440             RPC object directly - see the L documentation for
441             details on the RPC API.
442              
443             If there is no free worker, the call will be queued until a worker becomes
444             available.
445              
446             Note that there can be considerable time between calling this method and
447             the call actually being executed. During this time, the parameters passed
448             to this function are effectively read-only - modifying them after the call
449             and before the callback is invoked causes undefined behaviour.
450              
451             =cut
452              
453             =item $cpus = AnyEvent::Fork::Pool::ncpu [$default_cpus]
454              
455             =item ($cpus, $eus) = AnyEvent::Fork::Pool::ncpu [$default_cpus]
456              
457             Tries to detect the number of CPUs (C<$cpus> often called cpu cores
458             nowadays) and execution units (C<$eus>) which include e.g. extra
459             hyperthreaded units). When C<$cpus> cannot be determined reliably,
460             C<$default_cpus> is returned for both values, or C<1> if it is missing.
461              
462             For normal CPU bound uses, it is wise to have as many worker processes
463             as CPUs in the system (C<$cpus>), if nothing else uses the CPU. Using
464             hyperthreading is usually detrimental to performance, but in those rare
465             cases where that really helps it might be beneficial to use more workers
466             (C<$eus>).
467              
468             Currently, F is parsed on GNU/Linux systems for both
469             C<$cpus> and C<$eu>, and on {Free,Net,Open}BSD, F is
470             used for C<$cpus>.
471              
472             Example: create a worker pool with as many workers as cpu cores, or C<2>,
473             if the actual number could not be determined.
474              
475             $fork->AnyEvent::Fork::Pool::run ("myworker::function",
476             max => (scalar AnyEvent::Fork::Pool::ncpu 2),
477             );
478              
479             =cut
480              
481             BEGIN {
482 1 50 0 1   6 if ($^O eq "linux") {
    0 0        
483             *ncpu = sub(;$) {
484 0     0     my ($cpus, $eus);
485              
486 0 0         if (open my $fh, "<", "/proc/cpuinfo") {
487 0           my %id;
488              
489 0           while (<$fh>) {
490 0 0         if (/^core id\s*:\s*(\d+)/) {
491 0           ++$eus;
492 0           undef $id{$1};
493             }
494             }
495              
496 0           $cpus = scalar keys %id;
497             } else {
498 0 0         $cpus = $eus = @_ ? shift : 1;
499             }
500 0 0         wantarray ? ($cpus, $eus) : $cpus
501 1         53 };
502             } elsif ($^O eq "freebsd" || $^O eq "netbsd" || $^O eq "openbsd") {
503             *ncpu = sub(;$) {
504 0   0       my $cpus = qx * 1
505             || (@_ ? shift : 1);
506 0 0         wantarray ? ($cpus, $cpus) : $cpus
507 0           };
508             } else {
509             *ncpu = sub(;$) {
510 0 0         my $cpus = @_ ? shift : 1;
511 0 0         wantarray ? ($cpus, $cpus) : $cpus
512 0           };
513             }
514             }
515              
516             =back
517              
518             =head1 CHILD USAGE
519              
520             In addition to the L API, this module implements one
521             more child-side function:
522              
523             =over 4
524              
525             =item AnyEvent::Fork::Pool::retire ()
526              
527             This function sends an event to the parent process to request retirement:
528             the worker is removed from the pool and no new jobs will be sent to it,
529             but it has to handle the jobs that are already queued.
530              
531             The parentheses are part of the syntax: the function usually isn't defined
532             when you compile your code (because that happens before handing the
533             template process over to C, so you need the
534             empty parentheses to tell Perl that the function is indeed a function.
535              
536             Retiring a worker can be useful to gracefully shut it down when the worker
537             deems this useful. For example, after executing a job, one could check
538             the process size or the number of jobs handled so far, and if either is
539             too high, the worker could ask to get retired, to avoid memory leaks to
540             accumulate.
541              
542             Example: retire a worker after it has handled roughly 100 requests.
543              
544             my $count = 0;
545              
546             sub my::worker {
547              
548             ++$count == 100
549             and AnyEvent::Fork::Pool::retire ();
550              
551             ... normal code goes here
552             }
553              
554             =back
555              
556             =head1 POOL PARAMETERS RECIPES
557              
558             This section describes some recipes for pool parameters. These are mostly
559             meant for the synchronous RPC backend, as the asynchronous RPC backend
560             changes the rules considerably, making workers themselves responsible for
561             their scheduling.
562              
563             =over 4
564              
565             =item low latency - set load = 1
566              
567             If you need a deterministic low latency, you should set the C
568             parameter to C<1>. This ensures that never more than one job is sent to
569             each worker. This avoids having to wait for a previous job to finish.
570              
571             This makes most sense with the synchronous (default) backend, as the
572             asynchronous backend can handle multiple requests concurrently.
573              
574             =item lowest latency - set load = 1 and idle = max
575              
576             To achieve the lowest latency, you additionally should disable any dynamic
577             resizing of the pool by setting C to the same value as C.
578              
579             =item high throughput, cpu bound jobs - set load >= 2, max = #cpus
580              
581             To get high throughput with cpu-bound jobs, you should set the maximum
582             pool size to the number of cpus in your system, and C to at least
583             C<2>, to make sure there can be another job waiting for the worker when it
584             has finished one.
585              
586             The value of C<2> for C is the minimum value that can achieve
587             100% throughput, but if your parent process itself is sometimes busy, you
588             might need higher values. Also there is a limit on the amount of data that
589             can be "in flight" to the worker, so if you send big blobs of data to your
590             worker, C might have much less of an effect.
591              
592             =item high throughput, I/O bound jobs - set load >= 2, max = 1, or very high
593              
594             When your jobs are I/O bound, using more workers usually boils down to
595             higher throughput, depending very much on your actual workload - sometimes
596             having only one worker is best, for example, when you read or write big
597             files at maximum speed, as a second worker will increase seek times.
598              
599             =back
600              
601             =head1 EXCEPTIONS
602              
603             The same "policy" as with L applies - exceptions will
604             not be caught, and exceptions in both worker and in callbacks causes
605             undesirable or undefined behaviour.
606              
607             =head1 SEE ALSO
608              
609             L, to create the processes in the first place.
610              
611             L, which implements the RPC protocol and API.
612              
613             =head1 AUTHOR AND CONTACT INFORMATION
614              
615             Marc Lehmann
616             http://software.schmorp.de/pkg/AnyEvent-Fork-Pool
617              
618             =cut
619              
620             1
621