File Coverage

lib/Beekeeper/Service/Supervisor/Worker.pm
Criterion Covered Total %
statement 15 175 8.5
branch 0 48 0.0
condition 0 63 0.0
subroutine 5 25 20.0
pod 4 15 26.6
total 24 326 7.3


line stmt bran cond sub pod time code
1             package Beekeeper::Service::Supervisor::Worker;
2              
3 1     1   1018 use strict;
  1         2  
  1         31  
4 1     1   6 use warnings;
  1         2  
  1         41  
5              
6             our $VERSION = '0.10';
7              
8 1     1   5 use Beekeeper::Worker ':log';
  1         2  
  1         146  
9 1     1   7 use base 'Beekeeper::Worker';
  1         2  
  1         97  
10              
11 1     1   8 use Beekeeper::Worker::Extension::SharedCache;
  1         2  
  1         2580  
12              
13             our $CHECK_PERIOD = $Beekeeper::Worker::REPORT_STATUS_PERIOD;
14              
15              
16             sub authorize_request {
17 0     0 1   my ($self, $req) = @_;
18              
19 0 0 0       if ($req->{method} eq '_bkpr.supervisor.worker_status' ||
20             $req->{method} eq '_bkpr.supervisor.worker_exit' ) {
21              
22 0 0         return unless $self->__has_authorization_token('BKPR_SYSTEM');
23             }
24             else {
25              
26 0 0         return unless $self->__has_authorization_token('BKPR_ADMIN');
27             }
28              
29 0           return BKPR_REQUEST_AUTHORIZED;
30             }
31              
32             sub on_startup {
33 0     0 1   my $self = shift;
34              
35 0           $self->{host} = $self->{_WORKER}->{hostname};
36 0           $self->{pool} = $self->{_WORKER}->{pool_id};
37              
38 0           $self->{Workers} = $self->shared_cache( id => "workers", max_age => $CHECK_PERIOD * 4 );
39 0           $self->{Queues} = {};
40 0           $self->{Load} = {};
41              
42 0           $self->accept_notifications(
43             '_bkpr.supervisor.restart_pool' => 'restart_pool',
44             '_bkpr.supervisor.restart_workers' => 'restart_workers',
45             );
46              
47 0           $self->accept_remote_calls(
48             '_bkpr.supervisor.worker_status' => 'worker_status',
49             '_bkpr.supervisor.worker_exit' => 'worker_exit',
50             '_bkpr.supervisor.get_workers_status' => 'get_workers_status',
51             '_bkpr.supervisor.get_services_status' => 'get_services_status',
52             );
53              
54             $self->{check_status_tmr} = AnyEvent->timer(
55             after => rand($CHECK_PERIOD),
56             interval => $CHECK_PERIOD,
57             cb => sub {
58 0     0     $self->check_workers;
59 0           $self->check_queues;
60             },
61 0           );
62             }
63              
64             sub on_shutdown {
65 0     0 1   my $self = shift;
66              
67             # Disconnect shared cache
68 0           undef $self->{Workers};
69             }
70              
71             sub log_handler {
72 0     0 1   my $self = shift;
73              
74             # Use pool's logfile
75 0           $self->SUPER::log_handler( foreground => 1 );
76             }
77              
78             sub worker_status {
79 0     0 0   my ($self, $params) = @_;
80              
81 0           $self->set_worker_status( %$params );
82             }
83              
84             sub worker_exit {
85 0     0 0   my ($self, $params) = @_;
86              
87 0           $self->remove_worker_status( %$params );
88              
89             # Check for unserviced queues, just in case of worker being the last of its kind
90 0           $self->check_queues;
91             }
92              
93             sub set_worker_status {
94 0     0 0   my ($self, %args) = @_;
95              
96 0   0       my $pool = $args{'pool'} || die;
97 0   0       my $host = $args{'host'} || die;
98 0   0       my $pid = $args{'pid'} || die;
99              
100 0           my $worker_id = "$host:$pool:$pid";
101              
102 0   0       my $status = $self->{Workers}->get( $worker_id ) || {};
103              
104 0           $status = { %$status, %args };
105              
106 0           $self->{Workers}->set( $worker_id => $status );
107              
108 0 0         if ($status->{queue}) {
109 0           $self->{Queues}->{$_} = 1 foreach @{$status->{queue}};
  0            
110             }
111             }
112              
113             sub touch_worker_status {
114 0     0 0   my ($self, %args) = @_;
115              
116 0   0       my $pool = $args{'pool'} || die;
117 0   0       my $host = $args{'host'} || die;
118 0   0       my $pid = $args{'pid'} || die;
119              
120 0           my $worker_id = "$host:$pool:$pid";
121              
122 0           $self->{Workers}->touch( $worker_id );
123             }
124              
125             sub remove_worker_status {
126 0     0 0   my ($self, %args) = @_;
127              
128 0   0       my $pool = $args{'pool'} || die;
129 0   0       my $host = $args{'host'} || die;
130 0   0       my $pid = $args{'pid'} || die;
131              
132 0           my $worker_id = "$host:$pool:$pid";
133              
134 0           $self->{Workers}->delete( $worker_id );
135             }
136              
137             sub _get_workers {
138 0     0     my ($self, %args) = @_;
139              
140 0           my $host = $args{'host'};
141 0           my $pool = $args{'pool'};
142 0           my $class = $args{'class'};
143              
144             my @workers = grep { defined $_ &&
145             (!$host || $_->{host} eq $host ) &&
146             (!$pool || $_->{pool} eq $pool ) &&
147 0 0 0       (!$class || $_->{class} eq $class)
      0        
      0        
      0        
      0        
148 0           } values %{$self->{Workers}->{data}};
  0            
149              
150 0           return \@workers;
151             }
152              
153              
154             sub check_workers {
155 0     0 0   my $self = shift;
156              
157 0           my $local_workers = $self->_get_workers( host => $self->{host} );
158              
159 0           foreach my $worker (@$local_workers) {
160              
161 0 0         next unless defined $worker;
162              
163 0           my $pid = $worker->{pid};
164              
165 0           my ($mem_size, $cpu_ticks);
166              
167 0 0         if (open my $fh, '<', "/proc/$pid/statm") {
168             # Linux on intel x86 has a fixed 4KiB page size
169 0           my ($virt, $res, $share) = map { $_ * 4 } (split /\s/, scalar <$fh>)[0,1,2];
  0            
170 0           close $fh;
171              
172             # Apache::SizeLimit uses $virt + $share but that doensn't look useful
173 0           $mem_size = $res - $share;
174             }
175             else {
176             # Worker is not running anymore
177             $self->remove_worker_status(
178             pool => $worker->{pool},
179             host => $worker->{host},
180             pid => $worker->{pid},
181 0           );
182              
183 0           next;
184             }
185              
186 0 0         if (open my $fh, '<', "/proc/$pid/stat") {
187 0           my ($utime, $stime) = (split /\s/, scalar <$fh>)[13,14];
188 0           close $fh;
189              
190             # Values in clock ticks, usually 100 (getconf CLK_TCK)
191 0           $cpu_ticks = $utime + $stime;
192             }
193              
194 0   0       my $cpu_load = sprintf("%.2f",($cpu_ticks - ($self->{Load}->{$pid} || 0)) / $CHECK_PERIOD);
195 0           $self->{Load}->{$pid} = $cpu_ticks;
196              
197 0   0       my $old_msize = $worker->{msize} || 0.01;
198 0   0       my $old_load = $worker->{cpu} || 0.01;
199              
200 0 0 0       if (( abs($mem_size - $old_msize) / $old_msize < .05 ) &&
201             ( abs($cpu_load - $old_load) / $old_load < .05 )) {
202              
203             # Avoid sending messages when changes are below 5%
204             $self->touch_worker_status(
205             pool => $worker->{pool},
206             host => $worker->{host},
207             pid => $worker->{pid},
208 0           );
209             }
210             else {
211             # Update worker memory usage and cpu load
212             $self->set_worker_status(
213             pool => $worker->{pool},
214             host => $worker->{host},
215             pid => $worker->{pid},
216 0           mem => $mem_size,
217             cpu => $cpu_load,
218             );
219             }
220             }
221             }
222              
223              
224             sub check_queues {
225 0     0 0   my $self = shift;
226              
227 0           my $Queues = $self->{Queues};
228              
229 0           $Queues->{$_} = 0 foreach (keys %$Queues);
230              
231             # Count how many workers are servicing each queue
232 0           foreach my $worker (values %{$self->{Workers}->{data}}) {
  0            
233            
234             # Skip defunct workers (which are remembered a while)
235 0 0         next unless defined $worker;
236              
237             # Do not count queues being drained by Sinkhole
238 0 0         next if ($worker->{class} eq 'Beekeeper::Service::Sinkhole::Worker');
239              
240 0           $Queues->{$_}++ foreach @{$worker->{queue}};
  0            
241             }
242              
243 0           my @unserviced = grep { $Queues->{$_} == 0 } keys %$Queues;
  0            
244              
245 0 0         return unless @unserviced;
246              
247             # Tell Sinkhole to respond immediately to all requests sent to
248             # unserviced queues with a "Method not available" error response
249              
250 0           my $guard = $self->__use_authorization_token('BKPR_SYSTEM');
251              
252 0           $self->send_notification(
253             method => '_bkpr.sinkhole.unserviced_queues',
254             params => { queues => \@unserviced },
255             );
256             }
257              
258              
259             sub get_workers_status {
260 0     0 0   my ($self, $args) = @_;
261              
262             my $workers = $self->_get_workers(
263             host => $args->{host},
264             pool => $args->{pool},
265             class => $args->{class},
266 0           );
267              
268 0           return $workers;
269             }
270              
271              
272             sub get_services_status {
273 0     0 0   my ($self, $args) = @_;
274              
275             my $workers = $self->_get_workers(
276             host => $args->{host},
277             pool => $args->{pool},
278             class => $args->{class},
279 0           );
280              
281 0           my %services;
282              
283 0           foreach my $worker (@$workers) {
284 0           $services{$worker->{class}}{count}++;
285 0           $services{$worker->{class}}{cps} += $worker->{cps};
286 0           $services{$worker->{class}}{nps} += $worker->{nps};
287 0           $services{$worker->{class}}{err} += $worker->{err};
288 0   0       $services{$worker->{class}}{cpu} += $worker->{cpu} || 0;
289 0   0       $services{$worker->{class}}{mem} += $worker->{mem} || 0;
290 0           $services{$worker->{class}}{load} += $worker->{load};
291             }
292              
293 0           foreach my $service (values %services) {
294 0           $service->{load} = $service->{load} / $service->{count};
295             }
296              
297 0           foreach my $service (values %services) {
298 0           $service->{cps} = sprintf("%.2f", $service->{cps} );
299 0           $service->{nps} = sprintf("%.2f", $service->{nps} );
300 0           $service->{err} = sprintf("%.2f", $service->{err} );
301 0           $service->{cpu} = sprintf("%.2f", $service->{cpu} );
302 0           $service->{mem} = sprintf("%.2f", $service->{mem} );
303 0           $service->{load} = sprintf("%.2f", $service->{load});
304             }
305              
306 0           return \%services;
307             }
308              
309              
310             sub restart_workers {
311 0     0 0   my ($self, $args) = @_;
312              
313 0 0 0       return if ($args->{host} && $args->{host} ne $self->{host});
314 0 0 0       return if ($args->{pool} && $args->{pool} ne $self->{pool});
315              
316             my $workers = $self->_get_workers(
317             host => $self->{host},
318             pool => $self->{pool},
319             class => $args->{class},
320 0           );
321              
322 0 0         log_info "Restarting workers" . ($args->{class} ? " $args->{class}..." : "...");
323              
324 0           my @worker_pids;
325              
326 0           foreach my $worker (@$workers) {
327             # Do not restart supervisor
328 0 0         next if ($worker->{class} eq 'Beekeeper::Service::Supervisor::Worker');
329              
330 0           my ($pid) = ($worker->{pid} =~ m/^(\d+)$/); # untaint
331 0 0         push @worker_pids, $pid if ($pid);
332             }
333              
334 0 0         if (!$args->{delay}) {
335             # Restart all workers at once
336 0           foreach my $pid (@worker_pids) {
337 0           kill( 'TERM', $pid );
338             }
339             }
340             else {
341             # Slowly restart all workers
342 0           my $delay = $args->{delay};
343 0           my $count = 0;
344              
345 0           foreach my $pid (@worker_pids) {
346             $self->{restart_worker_tmr}->{$pid} = AnyEvent->timer(
347             after => $delay * $count++,
348             cb => sub {
349 0     0     delete $self->{restart_worker_tmr}->{$pid};
350 0           kill( 'TERM', $pid );
351             },
352 0           );
353             }
354             }
355             }
356              
357              
358             sub restart_pool {
359 0     0 0   my ($self, $args) = @_;
360              
361 0 0 0       return if ($args->{host} && $args->{host} ne $self->{host});
362 0 0 0       return if ($args->{pool} && $args->{pool} ne $self->{pool});
363              
364 0           my $wpool_pid = $self->{_WORKER}->{parent_pid};
365 0           my $delay = $args->{delay};
366              
367 0 0         if (!$delay) {
368 0           kill( 'HUP', $wpool_pid );
369             }
370             else {
371              
372 0           my $index = $self->_get_pool_index( $self->{host}, $self->{pool} );
373              
374             $self->{restart_pool_tmr} = AnyEvent->timer(
375             after => $delay * $index,
376             cb => sub {
377 0     0     delete $self->{restart_pool_tmr};
378 0           kill( 'HUP', $wpool_pid );
379             },
380 0           );
381             }
382             }
383              
384             sub _get_pool_index {
385 0     0     my ($self, $host, $pool) = @_;
386              
387             # Sort all pools by name, then return the index of the requested one.
388             # Used by restart_pool() to determine restart order across hosts
389              
390 0           my %pools;
391              
392 0           foreach my $worker (values %{$self->{Workers}->{data}}) {
  0            
393 0 0         next unless defined $worker;
394 0           $pools{"$worker->{host}:$worker->{pool}"} = 1;
395             }
396              
397 0 0         return 0 unless $pools{"$host:$pool"};
398              
399 0           my $index = 0;
400              
401 0           foreach my $key (sort keys %pools) {
402 0 0         last if ($key eq "$host:$pool");
403 0           $index++;
404             }
405              
406 0           return $index;
407             }
408              
409             1;
410              
411             __END__