File Coverage

blib/lib/Gearman/WorkerSpawner.pm
Criterion Covered Total %
statement 19 21 90.4
branch n/a
condition n/a
subroutine 7 7 100.0
pod n/a
total 26 28 92.8


line stmt bran cond sub pod time code
1             package Gearman::WorkerSpawner;
2              
3             =head1 NAME
4              
5             Gearman::WorkerSpawner - Subprocess manager for Gearman workers in a
6             Danga::Socket environment
7              
8             =head1 SYNOPSIS
9              
10             # write client code in some Danga::Socket environment, e.g. Perlbal:
11              
12             my $worker_manager = Gearman::WorkerSpawner->new;
13              
14             # add one or more workers
15             $worker_manager->add_worker(
16             class => 'AdditionWorker',
17             num_workers => 4,
18             config => {
19             left_hand => 3,
20             },
21             );
22             $worker_manager->run_method(adder => { right_hand => 3 }, sub {
23             my $return = shift;
24             print $return->{sum};
25             });
26             Danga::Socket->EventLoop;
27              
28             # and in the worker:
29              
30             package MethodWorker;
31             use base 'Gearman::WorkerSpawner::BaseWorker';
32              
33             sub new {
34             my MethodWorker $self = fields::new(shift);
35             $self->SUPER::new(@_);
36              
37             $self->register_method(adder => \&add);
38             return $self;
39             }
40              
41             sub add {
42             my MethodWorker $self = shift;
43             my $args = shift;
44             return { sum => $self->{config}{left_hand} + $args->{right_hand} };
45             }
46              
47             =head1 DESCRIPTION
48              
49             Launches subclasses of L in their own processes for
50             communication with a gearmand. External Gearman servers may be used, or one can
51             be created for the lifetime of the spawner.
52              
53             =cut
54              
55 7     7   333264 use strict;
  7         16  
  7         289  
56 7     7   39 use warnings;
  7         15  
  7         391  
57              
58             our $VERSION = '2.16';
59              
60 7     7   37 use Carp qw/ croak /;
  7         18  
  7         372  
61 7     7   28182 use Danga::Socket ();
  7         266306  
  7         253  
62 7     7   68 use Fcntl qw/ F_GETFD F_SETFD FD_CLOEXEC /;
  7         10  
  7         562  
63 7     7   8053 use Gearman::Client ();
  7         351759  
  7         157  
64 7     7   10833 use Gearman::Client::Async ();
  0            
  0            
65             use Gearman::Server ();
66             use IO::Handle ();
67             use IO::Socket::INET qw/ SOCK_STREAM /;
68             use POSIX qw/ :sys_wait_h /;
69             use Storable qw/ nfreeze thaw /;
70              
71             =head1 CLASS METHODS
72              
73             =over 4
74              
75             =item * Gearman::WorkerSpawner->new(%params)
76              
77             Constructor, can take the following parameters:
78              
79             =over 4
80              
81             =item * gearmand
82              
83             Specifies the location of the Gearman server to use. This may either be an
84             array reference of host:port specs ; or a comma separated list of host:port
85             specs; or I, which specifies that the WorkerSpawner should spawn a
86             separate process to contain a Gearman server. The advantage of using this over
87             running gearmand externally is that the Gearman server process will halt itself
88             in the event of the calling process' demise; the disadvantage is that you give
89             up gearmand redundancy. Defaults to I.
90              
91             =item * check_period
92              
93             Time in seconds between live-worker checks. Any zombie children are reaped with
94             C during the check, and enough workers are spawned to make the total
95             C again.
96              
97             =item * perl
98              
99             Path to the C binary with which to execute workers. Defaults to
100             C<$^X>.
101              
102             =item * reaper
103              
104             WorkerSpawner periodically reaps any dead children of its running process. If
105             there are non-WorkerSpawner child processes in your program, you won't know
106             when they die. To be notified of such events, you can provide a subref as the
107             C parameter which will be called with the PID and exit code of any
108             reaped children which don't belong to WorkerSpawner.
109              
110             Along that line, only a single WorkerSpawner may be created in a process
111             (otherwise multiple spawners would race to reap each others' children, making
112             worker accounting impossible). As such, new() will croak if called more than
113             once.
114              
115             =item * sigchld
116              
117             If true, a SIGCHLD handler is installed which immediately schedules a child
118             check, rather than waiting upwards of C seconds. Defaults to
119             true.
120              
121             =back
122              
123             =cut
124              
125             our $gearmand_spec;
126             our $singleton;
127             my $num_workers = 0;
128             my @open_slots;
129             my $started = 0;
130             my $quitting = 0;
131              
132             sub new {
133             croak 'only one WorkerSpawner allowed per process' if $started;
134              
135             my $class = shift;
136             my $ref = ref $class || $class;
137              
138             my %params = (
139             check_period => 1,
140             perl => $^X,
141             quitting => 0,
142             gearmand => 'auto',
143             sigchld => 1,
144             method_suffix => '_m',
145             @_
146             );
147              
148             $gearmand_spec = $params{gearmand};
149             gearman_servers(); # init the server singleton if necessary
150              
151             croak 'gearmand location not specified' unless defined @{ gearman_servers() };
152              
153             # NB: this structure must be Storable-serializable for all bits used by
154             # _supervise. see special handling in add_worker
155             my $self = bless \%params, $class;
156              
157             $params{initial_pid} = $$;
158              
159             # clean up any dead supervisors. will also catch non-WorkerSpawner processes,
160             # so fire the callback for those if provided
161             my $child_handler = sub {
162             my %reaped = $self->_reap();
163             while (my ($pid, $thing) = each %reaped) {
164             if ($thing->{action}) {
165             # spawner
166             $thing->{action}->($thing->{exit_code});
167             }
168             elsif ($self->{reaper}) {
169             # unowned child
170             $self->{reaper}->($pid, $thing->{exit_code});
171             }
172             }
173             };
174              
175             # restart children immediately if installing sigchld handler
176             $SIG{CHLD} = sub {
177             Danga::Socket->AddTimer(0, $child_handler);
178             } if $params{sigchld};
179              
180             # ... and/or check periodically
181             _run_periodically($child_handler, $self->{check_period});
182              
183             $started = 1;
184              
185             return $singleton = $self;
186             }
187              
188             =item * Gearman::WorkerSpawner->old
189              
190             Returns the Gearman::WorkerSpawner object created by a previous call to ->new.
191             Use this if you need a WorkerSpawner in multiple places in your code within the
192             same process and passing the object is tricky.
193              
194             =cut
195              
196             sub old {
197             return $singleton;
198             }
199              
200             =item Gearman::WorkerSpawner->gearmand_pid()
201              
202             Returns the PID of the gearmand which was started up if I was given
203             as the C parameter to C, or undef otherwise.
204              
205             =head1 OBJECT METHODS
206              
207             =over 4
208              
209             =item $spawner->add_worker(%options)
210              
211             Add a new worker set to the manager. A new supervisor process will be created
212             to manage it if one does not already exist for the worker class. Can take the
213             following parameters:
214              
215             =over 4
216              
217             =item * class
218              
219             (Required) The package name of the L subclass which will
220             register itself for work when instantiated. This need not be distinct across
221             different calls.
222              
223             =item * source
224              
225             (Optional) The path to the file containing the definition of 'class'; only
226             necessary if the module can't be use'd for some reason.
227              
228             =item * caller_source
229              
230             (Optional) If true, assume that the source for 'class' is the calling module or
231             script. This will generally fail if the working directory has changed since
232             program startup. This overrides I if both are provided.
233              
234             =item * num_workers
235              
236             The number of worker children to spawn. If any child processes die they will be
237             respawned. Defaults to 1.
238              
239             =item * config
240              
241             An opaque data structure to pass to the child process, generally used to keep
242             configuration that is specific to the worker but not any one job. Must be
243             serializable via Storable.
244              
245             =back
246              
247             =cut
248              
249             use constant SLOT_NUM => 0;
250             use constant SLOT_ID => 1;
251             use constant SLOT_PARAMS => 2;
252              
253             sub add_worker {
254             my Gearman::WorkerSpawner $self = shift;
255             my %params = (
256             num_workers => 1,
257             @_
258             );
259              
260             my $class = $params{class};
261             croak 'no class provided' unless $class;
262              
263             # exec this .pm file
264             (my $package_spec = __PACKAGE__ . '.pm') =~ s{::}{/}g;
265             my $package_file = $INC{$package_spec};
266             die "couldn't determine location of myself" unless $package_file;
267              
268              
269             # "slots" are the set of jobs that each supervisor is managing. each worker
270             # slot gets different parameters so they can differentiate themselves
271             # (like an MPI rank). @open_slots contains the slot# and startup params for
272             # any slot without a live worker child. the originating process has only
273             # open slots; supervisors in child processes fill the slots by spawning
274             # workers. parent assigns the slots so that it knows how to contact them
275             # in wait_until_all_ready.
276             my @slots;
277             for my $slot_num ($num_workers..$num_workers+$params{num_workers}-1) {
278             my $worker_id = sprintf '%d:%s/%s', $slot_num, $class, substr rand() . '0'x16, 2, 16;
279             push @slots, [$slot_num, $worker_id, \%params];
280             }
281             push @open_slots, @slots;
282             $num_workers += $params{num_workers};
283              
284             my $success = 1;
285             local $SIG{CHLD} = 'IGNORE';
286             for (1 .. 10) {
287             my $cmd = '';
288              
289             my $writer = $self->{supervisors}{$class};
290             if (!defined $writer) {
291             # don't have an existing child for this worker class
292              
293             # logically, we want to call $self->_supervise, except in a separate
294             # process which has a reduced memory footprint after exec'ing. therefore we
295             # need to recreate $self and parameters in the "remote" _supervise
296             # procedure. create a pipe over which to do that.
297             pipe(my $reader, $writer) or die "pipe failed: $!\n";
298             $writer->autoflush(1);
299             $reader->autoflush(1);
300              
301             # so exec doesn't close it
302             fcntl($reader, F_GETFD, my $flags = '');
303             vec($flags, FD_CLOEXEC, 1) = 0;
304             fcntl($reader, F_SETFD, $flags);
305              
306             my $parent_pid = $$;
307              
308             my $pid = fork;
309             die "failed to fork: $!\n" unless defined $pid;
310              
311             if ($pid) {
312             # parent
313             $self->{supervisors}{$class} = $writer;
314             close $reader;
315              
316             $self->{kids}{$pid}{action} = sub {
317             # supervisor shouldn't exit; compilation of worker class probably failed
318             my $code = shift;
319             if ($code != 0) {
320             die "supervisor died ($code)\n";
321             }
322              
323             # invalidate cmd pipe "cache" when kid dies
324             delete $self->{supervisors}{$class};
325             };
326              
327             # make a serializable copy of $self
328             my $storable_self = bless {
329             map { $_ => $self->{$_} }
330             grep {
331             $_ ne 'supervisors' && # globs aren't serializable
332             $_ ne 'kids' # so DESTROY doesn't kill them
333             }
334             keys %$self
335             }, __PACKAGE__;
336              
337             $params{source} = (caller)[1] if $params{caller_source};
338              
339             # first command is startup parameters
340             $cmd = _serialize({
341             spawner => $storable_self,
342             class => $class,
343             ppid => $parent_pid,
344             gearmand => gearman_servers(),
345             source => $params{source},
346             inc => \@INC,
347             });
348             }
349             else {
350             # child: start supervisor in a distinct process to manage the new jobs
351             exec $self->{perl}, $package_file, fileno $reader; # $self->_supervise
352             die "exec failed: $!\n";
353             }
354             }
355              
356             # subsequent commands start new workers
357             $cmd .= _serialize(\@slots);
358              
359             local $SIG{PIPE} = 'IGNORE';
360             return if print $writer $cmd;
361              
362             # print failed, try again
363             delete $self->{supervisors}{$class} unless $success;
364             sleep 1;
365             }
366             die "failed to spawn workers";
367             }
368              
369             =item $spawner->wait_until_all_ready()
370              
371             Returns only once all worker are ready to accept jobs. This will only wait on
372             workers which have been started since the last call to wait_until_all_ready.
373              
374             =cut
375              
376             sub wait_until_all_ready {
377             my Gearman::WorkerSpawner $self = shift;
378             my $timeout = shift || 0.1;
379              
380             my $client = Gearman::Client->new(job_servers => gearman_servers());
381             my $task_set = $client->new_task_set;
382              
383             while (my $slot = shift @open_slots) {
384             $task_set->add_task(
385             _ping_name($slot->[SLOT_ID]),
386             undef,
387             {
388             timeout => $timeout,
389             retry_count => 1_000_000,
390             }
391             );
392             }
393              
394             $task_set->wait;
395             }
396              
397             =item $spawner->add_task($task)
398              
399             =item $spawner->add_task($funcname, $arg, \%options)
400              
401             Asynchronously submits a task to a configured Gearman server. May either
402             take a L object, or the 3 arguments that the Gearman::Task
403             constructor takes.
404              
405             =cut
406              
407             sub add_task {
408             my Gearman::WorkerSpawner $self = shift;
409             my $task = shift;
410              
411             croak "task object or Gearman::Task->new parameters required)"
412             unless $task;
413              
414             if (ref $task && $task->isa('Gearman::Task')) {
415             _gearman_client()->add_task($task);
416             }
417             else {
418             _gearman_client()->add_task(Gearman::Task->new($task, @_));
419             }
420             }
421              
422             =item $spawner->run_method($funcname, $arg, \%options)
423              
424             =item $spawner->run_method($funcname, $arg, $callback)
425              
426             Submits a task but with less boilerplate than add_task. %options is the same as
427             for add_task. Marshaling of $arg is done for you in a manner compatible with
428             methods created with Gearman::WorkerSpawner::BaseWorker::register_method. The
429             on_fail handler will be called if marshalling fails for some reason.
430              
431             If the second form is used, an empty %options is created and $callback is used
432             as the on_complete handler.
433              
434             =cut
435              
436             sub run_method {
437             my Gearman::WorkerSpawner $self = shift;
438             my ($methodname, $arg, $options) = @_;
439              
440             $methodname .= $self->{method_suffix};
441              
442             if (ref $options eq 'CODE') {
443             $options = { on_complete => $options };
444             }
445              
446             # wrap callback with Storable marshaling of arguments
447             if (my $cb = delete $options->{on_complete}) {
448             $options->{on_complete} = sub {
449             my $ref_to_frozen_retval = shift;
450              
451             if (!$ref_to_frozen_retval || ref $ref_to_frozen_retval ne 'SCALAR') {
452             $options->{on_fail}->('marshaling error') if exists $options->{on_fail};
453             return;
454             }
455              
456             my $rets = eval { thaw($$ref_to_frozen_retval) };
457             if ($@) {
458             $options->{on_fail}->($@) if exists $options->{on_fail};
459             return;
460             }
461             elsif (ref $rets ne 'ARRAY') {
462             $options->{on_fail}->('marshaling error') if exists $options->{on_fail};
463             return;
464             }
465              
466             $cb->(@$rets);
467             };
468             }
469              
470             # serialize parameter
471             _gearman_client()->add_task(Gearman::Task->new($methodname, \nfreeze([$arg]), $options));
472             }
473              
474             =item method_suffix([$suffix])
475              
476             Accessor for the suffix which is appended to the method name. Defaults to '_m'.
477              
478             =cut
479              
480             sub method_suffix {
481             my Gearman::WorkerSpawner $self = shift;
482             $self->{method_suffix} = shift if @_;;
483             return $self->{method_suffix};
484             }
485              
486             =item $spawner->stop_workers([$sig])
487              
488             Tell all spawned processes to quit (by default, with SIGINT).
489              
490             =cut
491              
492             sub stop_workers {
493             my Gearman::WorkerSpawner $self = shift;
494             my $signal = shift || 'INT';
495             $self->{quitting}++;
496             kill $signal, keys %{ $self->{kids} };
497             }
498              
499             =item DESTROY
500              
501             Upon destruction, stop_workers is called unless you've already called it.
502              
503             =cut
504              
505             sub DESTROY {
506             my Gearman::WorkerSpawner $self = shift;
507             $self->stop_workers unless $self->{quitting} || $self->{initial_pid} != $$;
508             }
509              
510             =item $spawner->gearman_servers()
511              
512             Returns an arrayref of server host:port specs. If an 'auto' server was
513             requested, its hostspec is included.
514              
515             =cut
516              
517             # singleton server list
518             my $gearman_servers;
519             my $gearmand_pid;
520             sub gearman_servers {
521             unless ($gearman_servers) {
522             use Carp; Carp::cluck("bad server list") unless defined $gearmand_spec;
523             if (ref $gearmand_spec eq 'ARRAY') {
524             $gearman_servers = [@$gearmand_spec];
525             }
526             elsif ($gearmand_spec eq 'auto' || $gearmand_spec eq 'external') {
527             # ask OS for open listening port
528             my $gearmand_port;
529             eval {
530             my $sock = IO::Socket::INET->new(
531             Type => SOCK_STREAM,
532             Proto => 'tcp',
533             Reuse => 1,
534             Listen => 1,
535             );
536             $gearmand_port = $sock->sockport;
537             $sock->close;
538             };
539             die "failed to create listening socket: $@" if $@;
540              
541             die "couldn't find an open port for gearmand" unless $gearmand_port;
542              
543             # fork a clingy gearmand
544             my $parent_pid = $$;
545             my $pid = fork;
546             die "fork failed: $!" unless defined $pid;
547             if ($pid) {
548             $gearman_servers = ["127.0.0.1:$gearmand_port"];
549             $gearmand_pid = $pid;
550             # don't return until the server is contactable
551             while (1) {
552             last if IO::Socket::INET->new(
553             PeerAddr => $gearman_servers->[0],
554             );
555             select undef, undef, undef, 0.1;
556             }
557             }
558             else {
559             $0 = 'gearmand-WorkerSpawner';
560             Danga::Socket->Reset();
561             my $server = Gearman::Server->new;
562             $server->create_listening_sock($gearmand_port);
563             _run_periodically(sub { exit if getppid != $parent_pid }, 5);
564             Danga::Socket->EventLoop();
565             exit 0;
566             }
567             }
568             else {
569             $gearman_servers = [split /[ ,]+/, $gearmand_spec];
570             }
571             }
572             return $gearman_servers;
573             }
574              
575             # historical alias
576             no warnings 'once';
577             *gearman_server = \&gearman_servers;
578              
579             sub gearmand_pid {
580             return $gearmand_pid || undef;
581             }
582              
583             =back
584              
585             =head1 INTERNAL METHODS
586              
587             =over 4
588              
589             =cut
590              
591             =item $spawner->_gearman_client()
592              
593             Returns the L object used by the spawner.
594              
595             =cut
596              
597             my $gearman_client;
598             sub _gearman_client {
599             return $gearman_client ||= Gearman::Client::Async->new(job_servers => gearman_servers());
600             }
601              
602             =item Gearman::WorkerSpawner->_supervise('My::WorkerClass', @ARGV)
603              
604             Loads the given L subclass, then parses additional arguments
605             as specified by the return value of the worker class' C class method
606             via L. These options are passed to the worker object's
607             constructor and the C method of the worker object is called repeatedly
608             until either SIG_INT is received or the ppid changes (parent went away).
609              
610             This class method is automatically executed if Gearman/WorkerSpawner.pm has no
611             C, i.e. if it is run as a script rather than loaded as a module. This
612             should only be done by other internal methods of this package (add_worker).
613              
614             =back
615              
616             =cut
617              
618             sub _supervise {
619             my $spawner_class = shift;
620              
621             die "modulino invoked incorrectly, see documentation\n" unless @_;
622              
623             my $fileno = shift;
624             open my $reader, '<&=', $fileno or die "failed to open pipe: $!\n";
625              
626             chomp(my $startup_data = <$reader>); # need this now, so allow blocking read
627             my $startup_params = _unserialize($startup_data);
628              
629             @INC = @{ $startup_params->{inc} };
630              
631             my $worker_class = $startup_params->{class};
632             $0 = sprintf "%s supervisor", $worker_class;
633              
634             die "no worker class provided" unless $worker_class;
635             die "parent went away before I started" if getppid != $startup_params->{ppid};
636              
637             if (my $source_file = $startup_params->{source}) {
638             unless (eval "require '$source_file'; 1") {
639             die "failed to load worker class $worker_class from $source_file: $@";
640             }
641             }
642             else {
643             unless (eval "use $worker_class; 1") {
644             die "failed to load worker class $worker_class: $@";
645             }
646             }
647              
648             my $self = $startup_params->{spawner};
649              
650             $gearman_servers = $self->{gearmand} = $startup_params->{gearmand};
651             $self->{supervisor_pid} = $$;
652              
653             # set nonblocking since these commands come any time
654             IO::Handle::blocking($reader, 0);
655             my $read_buf = '';
656             my $handler = sub {
657             while (my $line = <$reader>) {
658             $read_buf .= $line;
659             last unless $line =~ /\n$/;
660             chomp($read_buf);
661             my $slots = _unserialize($read_buf);
662             $read_buf = '';
663             push @open_slots, @$slots;
664             }
665             };
666             $handler->();
667              
668             # spin up initial workers
669             $self->_check_workers;
670              
671             # watch for parent going away
672             _run_periodically(sub { $self->_cleanup() if getppid != $startup_params->{ppid} }, 5);
673             $SIG{INT} = $SIG{TERM} = sub { $self->_cleanup };
674              
675             # install handler for parent asking to start more workers
676             Danga::Socket->AddOtherFds(fileno $reader, $handler);
677              
678             # periodically check for children needing replacement
679             _run_periodically(sub { $self->_check_workers }, $self->{check_period});
680              
681             Danga::Socket->EventLoop;
682             exit 1;
683             }
684              
685             # try to reap any worker processes, and start up any that are missing. also
686             # starts up workers for the first time after they're added
687             sub _check_workers {
688             my Gearman::WorkerSpawner $self = shift;
689              
690             # reap slots from dead kids
691             my %reaped = $self->_reap();
692              
693             for my $pid (keys %reaped) {
694             my $open_slot = $reaped{$pid}{slot};
695             if (defined $open_slot) {
696             push @open_slots, $open_slot;
697             }
698             else {
699             warn "dead child $pid didn't own a slot";
700             }
701             }
702              
703             return if $self->{quitting};
704              
705             return unless @open_slots;
706              
707             # refill lowest slots first
708             @open_slots = sort {$a->[SLOT_NUM]<=>$b->[SLOT_NUM]} @open_slots;
709              
710             while (my $slot = shift @open_slots) {
711             my $pid = fork;
712             die "fork failed: $!\n" unless defined $pid;
713              
714             unless ($pid) {
715             # child is a worker
716             $SIG{INT} = $SIG{TERM} = sub { $self->_cleanup };
717             $self->_do_work($slot);
718             exit 1;
719             }
720              
721             # parent is still supervisor
722             $self->{kids}{$pid}{slot} = $slot;
723             }
724             }
725              
726             # create a worker and run it forever
727             sub _do_work {
728             my Gearman::WorkerSpawner $self = shift;
729             my $slot = shift;
730              
731             my $params = $slot->[SLOT_PARAMS];
732             my $worker_class = $params->{class};
733             $0 = sprintf "%s #%d", $worker_class, $slot->[SLOT_NUM];
734              
735             my $worker = $worker_class->new($slot->[SLOT_NUM], $params->{config}, gearman_servers());
736              
737             die "failed to create $worker_class object" unless $worker;
738              
739             $worker->job_servers(@{ $self->{gearmand} });
740              
741             # each worker gets a unique function so we can ping it in wait_until_all_ready
742             $worker->register_function(_ping_name($slot->[SLOT_ID]) => sub {
743             if ($worker->can('unregister_function')) {
744             # remove the function so it doesn't pollute server "status" command
745             $worker->unregister_function(_ping_name($slot->[SLOT_ID]));
746             }
747             return 1;
748             });
749              
750             $SIG{INT} = sub { $quitting = 1 };
751             while (!$quitting) {
752             {
753             eval {
754             $worker->work(stop_if => sub {1});
755             };
756             $@ && warn "$worker_class [$$] failed: $@";
757             }
758              
759             $worker->post_work if $worker->can('post_work');
760              
761             # bail if supervisor went away
762             $quitting++ if getppid != $self->{supervisor_pid};
763             }
764             exit 0;
765             }
766              
767             # takes a subref and a number of seconds, and runs the sub that often
768             sub _run_periodically {
769             my $sub = shift;
770             my $period = shift;
771             my $recycler;
772             $recycler = sub {
773             $sub->();
774             Danga::Socket->AddTimer($period, $recycler);
775             };
776             Danga::Socket->AddTimer(0, $recycler);
777             }
778              
779             sub _serialize {
780             return join '', unpack('h*', nfreeze shift), "\n";
781             }
782              
783             sub _unserialize {
784             my $frozen = shift;
785             return thaw pack 'h*', $frozen;
786             }
787              
788             sub _ping_name {
789             my $id = shift;
790             return "ping_$id";
791             }
792              
793             # consume kids and returns a hash $self->{kids} contents for reaped pids, or
794             # undef for unknown kids
795             sub _reap {
796             my Gearman::WorkerSpawner $self = shift;
797             my %reaped;
798             while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
799             $reaped{$pid} = delete $self->{kids}{$pid};
800             $reaped{$pid}{exit_code} = $?;
801             }
802             return %reaped;
803             }
804              
805             sub _cleanup {
806             my $self = shift;
807             return if $quitting++;
808             my @kids = keys %{ $self->{kids} };
809             push @kids, $gearmand_pid if $gearmand_pid;
810             kill 'INT', @kids;
811             exit 0;
812             }
813              
814             if (!caller()) {
815             # we're being called as a script, not a module, presumably from exec in _spawn_workers.
816             __PACKAGE__->_supervise(@ARGV);
817             }
818              
819             1;
820              
821             __END__