File Coverage

lib/Beekeeper/WorkerPool.pm
Criterion Covered Total %
statement 21 168 12.5
branch 0 62 0.0
condition 0 25 0.0
subroutine 7 18 38.8
pod 0 5 0.0
total 28 278 10.0


line stmt bran cond sub pod time code
1             package Beekeeper::WorkerPool;
2              
3 1     1   1081 use strict;
  1         2  
  1         35  
4 1     1   5 use warnings;
  1         2  
  1         48  
5              
6             our $VERSION = '0.08';
7              
8 1     1   5 use base 'Beekeeper::WorkerPool::Daemon';
  1         3  
  1         76  
9 1     1   5 use POSIX ":sys_wait_h";
  1         2  
  1         6  
10 1     1   476 use Time::HiRes 'sleep';
  1         2  
  1         6  
11 1     1   159 use Beekeeper::Config;
  1         2  
  1         37  
12              
13 1     1   6 use constant COMPILE_ERROR_EXIT_CODE => 99;
  1         2  
  1         1968  
14              
15              
16             sub new {
17 0     0 0   my ($class, %args) = @_;
18              
19 0           my $self = $class->SUPER::new(
20             daemon_name => "beekeeper",
21             description => "worker pool",
22             get_options => [ "pool=s", "config-dir=s", "debug" ],
23             %args,
24             );
25              
26 0           $self->parse_options;
27              
28 0   0       my $pool_id = $self->{options}->{'pool'} || '';
29 0           ($pool_id) = ($pool_id =~ m/^([\w-]+)$/); # untaint
30              
31 0 0         unless ($pool_id) {
32 0           print "Mandatory parameter --pool was not specified.\n\n";
33             #ENHACEMENT: list available pools
34 0           $self->cmd_help;
35 0           CORE::exit(1);
36             }
37              
38 0           $self->{config}->{'pool_id'} = $pool_id;
39 0           $self->{config}->{daemon_name} = "beekeeper-$pool_id";
40 0           $self->{config}->{description} = "worker pool $pool_id";
41              
42             # Pool cannot be started without a proper config file
43 0 0         $self->load_config || CORE::exit(1);
44              
45 0 0         unless ($self->{config}->{log_file}) {
46 0           my $file = "$pool_id-pool.log";
47 0           my $dir = '/var/log';
48 0   0       my $user = $self->{options}->{'user'} || getpwuid($>);
49 0           ($user) = ($user =~ m/^(\w+)$/); # untaint
50 0 0         $self->{config}->{log_file} = (-d "$dir/$user") ? "$dir/$user/$file" : "$dir/$file";
51             }
52              
53 0           return $self;
54             }
55              
56             sub cmd_help {
57 0     0 0   my $self = shift;
58              
59 0           my $progname = $0;
60 0           $progname =~ s|.*/||;
61              
62 0           print "Usage: $progname [options] {start|stop|restart|reload|check}\n";
63 0           print " --foreground Run in foreground (do not daemonize)\n";
64 0           print " --pool str Worker pool name (mandatory)\n";
65 0           print " --user str Run as specified user\n";
66 0           print " --group str Run as specified group\n";
67 0           print " --config-dir str Path to directory containing config files\n";
68 0           print " --debug Turn on workers debug flag\n";
69 0           print " --help Display this help and exit\n";
70             }
71              
72             sub load_config {
73 0     0 0   my $self = shift;
74              
75 0           my $pool_id = $self->{config}->{'pool_id'};
76 0           my $conf_dir = $self->{options}->{'config-dir'};
77              
78 0 0         Beekeeper::Config->set_config_dir($conf_dir) if ($conf_dir);
79              
80 0           my $pool_cfg = Beekeeper::Config->get_pool_config( pool_id => $pool_id );
81 0           my $bus_cfg = Beekeeper::Config->get_bus_config( bus_id => '*' );
82              
83 0 0         unless ($pool_cfg) {
84 0           die "Worker pool '$pool_id' is not defined into config file pool.config.json\n";
85             }
86              
87             # Ensure that local bus is defined
88 0           my $bus_id = $pool_cfg->{'bus_id'};
89              
90 0 0         unless ($bus_cfg->{$bus_id}) {
91 0           die "Bus '$bus_id' is not defined into config file bus.config.json\n";
92             }
93              
94             # Merge pool.config.json contents
95 0           $self->{config}->{$_} = $pool_cfg->{$_} foreach (keys %$pool_cfg);
96              
97             # Keep bus.config.json
98 0           $self->{bus_config} = $bus_cfg;
99              
100             # Remove unused inherited entry
101 0           delete $self->{config}->{get_options};
102              
103 0           return 1;
104             }
105              
106             sub main {
107 0     0 0   my $self = shift;
108              
109 0           my @spawn_queue; # will hold a list of worker classes to be spawned
110             my %workers; # will hold a pid -> class map of running workers
111              
112 0           my $workers_config = $self->{config}->{'workers'};
113 0           my $pool_id = $self->{config}->{'pool_id'};
114              
115 0           my @spawn_workers = (
116             # Every pool spawns a Supervisor worker
117             'Beekeeper::Service::Supervisor::Worker',
118             );
119              
120 0 0         if ($self->{config}->{'use_toybroker'}) {
121             # Spawn the broker in first place as other workers may depend of it
122 0           unshift @spawn_workers, 'Beekeeper::Service::ToyBroker::Worker';
123             }
124              
125 0           foreach my $worker_class (@spawn_workers) {
126 0   0       $workers_config->{$worker_class} ||= { worker_count => 1 };
127             }
128              
129 0           foreach my $worker_class (sort keys %$workers_config) {
130 0 0         next if grep { $_ eq $worker_class } @spawn_workers;
  0            
131 0           push @spawn_workers, $worker_class;
132             }
133              
134             # Make a list of individual workers to spawn
135 0           foreach my $worker_class (@spawn_workers) {
136             my $worker_count = $workers_config->{$worker_class}->{worker_count} ||
137 0   0       $workers_config->{$worker_class}->{workers_count} ; # compat
138 0 0         $worker_count = 1 unless defined $worker_count;
139 0           for (1..$worker_count) {
140 0           push @spawn_queue, $worker_class;
141             }
142             }
143              
144             # Very basic log handler (STDERR was already redirected to a log file)
145             $SIG{'__WARN__'} = sub {
146 0     0     my @t = reverse((localtime)[0..5]); $t[0] += 1900; $t[1]++;
  0            
  0            
147 0           my $tstamp = sprintf("%4d-%02d-%02d %02d:%02d:%02d.000", @t);
148 0           warn "[$tstamp][$$]", @_;
149 0           };
150              
151 0           warn "[info] Pool $pool_id started\n";
152              
153             # Install signal handlers to control this daemon and forked workers.
154             # The supported signals and related actions are:
155             #
156             # TERM tell workers to quit after finishing their current tasks, then quit
157             # INT tell workers to quit immediately (even in the middle of a task), then quit
158             # PWR received when system is being shut down, it is handled the same as TERM
159             # HUP restart workers after finishing their current tasks
160              
161 0           my $mode = '';
162              
163 0     0     $SIG{TERM} = sub { $mode = 'QUIT_GRACEFULLY' };
  0            
164 0     0     $SIG{INT} = sub { $mode = 'QUIT_IMMEDIATELY' };
  0            
165 0     0     $SIG{PWR} = sub { $mode = 'QUIT_GRACEFULLY' };
  0            
166 0 0   0     $SIG{HUP} = sub { $mode = 'RESTART_POOL' unless $mode };
  0            
167              
168             # Install a SIGCHLD handler to reap or respawn forked workers. This is
169             # executed when one or more subprocess exits, either normally or abnormally.
170              
171             $SIG{CHLD} = sub {
172              
173 0     0     while ((my $worker_pid = waitpid(-1, WNOHANG)) > 0) {
174              
175 0           my $worker_class = $workers{$worker_pid};
176              
177             # Mark the worker as defunct
178 0           $workers{$worker_pid} = undef;
179              
180             # Handle the edge case of a worker exiting too quickly
181 0 0         return unless ($worker_class);
182              
183             # The wait status of the defunct subprocess ($?) encodes both the
184             # actual exit code and the signal which caused the exit, if any.
185 0           my $exit_code = $? >> 8;
186 0           my $signal = $? & 127;
187              
188 0 0 0       if ($exit_code || $signal) {
189 0 0 0       warn "[error] $worker_class #$worker_pid exited abormally ($exit_code, $signal)\n"
      0        
      0        
190             unless ($mode ne '' && $exit_code == 0 && ($signal == 2 || $signal == 15));
191             }
192              
193 0 0 0       if ($mode eq 'QUIT_IMMEDIATELY' || $mode eq 'QUIT_GRACEFULLY') {
    0          
    0          
194             # Worker terminated just before signaling it to quit.
195             # Do not respawn the worker, as we are trying to get rid of it.
196 0           next;
197             }
198             elsif ($mode eq 'WAIT_CHILDS_TO_QUIT') {
199             # Worker terminated after signaling it to quit.
200             # Do not respawn it, we are indeed waiting for workers to quit.
201 0           next;
202             }
203             elsif ($exit_code == COMPILE_ERROR_EXIT_CODE) {
204             # Worker does not compile. Do not respawn, it will fail again.
205 0           next;
206             }
207              
208             # Spawn a worker of the same class that the defuncted one.
209             # This is the core functionality of this daemon: when a worker exits
210             # for whatever reason, it is immediately replaced by another.
211 0           push @spawn_queue, $worker_class;
212             }
213 0           };
214              
215             RUN_FOREVER: {
216              
217 0 0         if ($mode eq 'QUIT_GRACEFULLY') {
  0 0          
    0          
    0          
218              
219 0           warn "[info] Quitting gracefully...\n";
220              
221             # SIGTERM received, propagate signal to all workers to quit gracefully.
222             # Then wait until all workers are gone and quit.
223            
224 0           $mode = 'WAIT_CHILDS_TO_QUIT';
225 0           kill 'TERM', keys %workers;
226             }
227             elsif ($mode eq 'QUIT_IMMEDIATELY') {
228              
229 0           warn "[info] Quitting...\n";
230              
231             # SIGINT received, propagate signal to all al workers to quit immediately.
232             # Then wait until all workers are gone and quit.
233              
234 0           $mode = 'WAIT_CHILDS_TO_QUIT';
235 0           kill 'INT', keys %workers;
236             }
237             elsif ($mode eq 'RESTART_POOL') {
238              
239 0           warn "[info] Restarting pool\n";
240              
241             # SIGHUP received, signal all workers to quit gracefully.
242             # Workers will be automatically respawned again.
243              
244 0           $mode = '';
245 0           kill 'TERM', keys %workers;
246             }
247             elsif ($mode eq 'WAIT_CHILDS_TO_QUIT') {
248              
249             # Quit if there are no workers running anymore. This can be
250             # determined because when a worker exits the SIGCHLD handler
251             # removes the corresponding entry into %workers.
252              
253 0           my @still_running = grep { defined $_ } values %workers;
  0            
254              
255 0 0         last RUN_FOREVER unless (@still_running);
256             }
257              
258 0 0         if (@spawn_queue) {
259              
260             # @spawn_queue contains the list of workers to be spawned.
261             # It is populated at startup, and then by the SIGCHLD handler
262             # which adds workers to replace the defuncted ones.
263              
264 0           while (@spawn_queue) {
265              
266             # Spawn a new worker and remove it from the queue
267 0           my $worker_class = shift @spawn_queue;
268 0           my $worker_pid = $self->spawn_worker($worker_class);
269              
270 0 0         unless ($worker_pid) {
271             # Could not fork, try again later
272 0           unshift @spawn_queue, $worker_class;
273 0           last;
274             }
275              
276             # Add to our list of spawned workers (only if it isn't already defunct)
277 0 0         $workers{$worker_pid} = $worker_class unless (exists $workers{$worker_pid});
278              
279             # Give ToyBroker enough time to start accepting connections
280 0 0         sleep 0.05 if ($worker_class eq 'Beekeeper::Service::ToyBroker::Worker');
281             }
282              
283 0           foreach my $worker_pid (keys %workers) {
284             # Remove defunct workers from our list because pids may be reused
285 0 0         delete $workers{$worker_pid} if (!defined $workers{$worker_pid});
286             }
287             }
288              
289 0           sleep 1;
290              
291 0           redo RUN_FOREVER;
292             }
293              
294 0           warn "[info] Pool $pool_id stopped\n";
295             }
296              
297              
298             sub spawn_worker {
299 0     0 0   my ($self, $worker_class) = @_;
300              
301 0           my $parent_pid = $$;
302 0           my $worker_pid = fork;
303              
304 0 0         unless (defined $worker_pid) {
305 0           warn "[error] Failed to fork $worker_class: $!\n";
306 0           return;
307             }
308              
309 0 0         if ($worker_pid) {
310             # Parent
311 0           return $worker_pid;
312             }
313              
314             # Forked child
315              
316 0           $SIG{CHLD} = 'IGNORE';
317 0           $SIG{TERM} = 'DEFAULT';
318 0           $SIG{INT} = 'DEFAULT';
319 0           $SIG{HUP} = 'DEFAULT';
320              
321             # Ensure that workers don't get the same random numbers
322 0           srand;
323              
324             # Load worker codebase
325 0           eval "use $worker_class";
326              
327 0 0         if ($@) {
328             # Worker does not compile
329 0           warn "[error] $worker_class does not compile: " . $@;
330 0           CORE::exit( COMPILE_ERROR_EXIT_CODE );
331             };
332              
333 0 0         unless ($worker_class->can('__work_forever')) {
334             # Module compiles fine, but it doesn't seems to be a worker
335 0           warn "[error] $worker_class doesn't know how to __work_forever\n";
336 0           CORE::exit( COMPILE_ERROR_EXIT_CODE );
337             }
338              
339             my $worker = $worker_class->new(
340             parent_pid => $parent_pid,
341             foreground => $self->{options}->{foreground},
342             debug => $self->{options}->{debug},
343             bus_config => $self->{bus_config},
344             pool_config => $self->{config},
345             pool_id => $self->{config}->{'pool_id'},
346             bus_id => $self->{config}->{'bus_id'},
347 0           config => $self->{config}->{'workers'}->{$worker_class},
348             );
349              
350             # Destroy daemon object
351 0           %$self = ();
352 0           undef $self;
353              
354 0           $worker->__work_forever;
355              
356 0           CORE::exit;
357             }
358              
359             1;
360              
361             __END__