File Coverage

blib/lib/App/Base/Daemon/Supervisor.pm
Criterion Covered Total %
statement 51 132 38.6
branch 6 64 9.3
condition 0 6 0.0
subroutine 13 21 61.9
pod 5 5 100.0
total 75 228 32.8


line stmt bran cond sub pod time code
1             package App::Base::Daemon::Supervisor;
2 3     3   2124 use 5.010;
  3         6  
3 3     3   1185 use Moose::Role;
  3         9798  
  3         9  
4             with 'App::Base::Daemon';
5              
6             our $VERSION = '0.07'; ## VERSION
7              
8             =head1 NAME
9              
10             App::Base::Daemon::Supervisor - supervise daemon process
11              
12             =head1 SYNOPSIS
13              
14             package App::Base::Daemon::Foo;
15             use Moose;
16             with 'App::Base::Daemon::Supervisor';
17              
18             sub documentation { return 'foo'; }
19             sub options { ... }
20             sub supervised_process {
21             my $self = shift;
22             # the main daemon process
23             while(1) {
24             # check if supervisor is alive, exit otherwise
25             $self->ping_supervisor;
26             # do something
27             ...
28             }
29             }
30             sub supervised_shutdown {
31             # this is called during shutdown
32             }
33              
34             =head1 DESCRIPTION
35              
36             App::Base::Daemon::Supervisor allows you to run code under supervision; it also
37             provides support for zero downtime reloading. When you run Supervisor based
38             daemon, the first process becomes a supervisor, it forks a child process which
39             invokes I<supervised_process> method that should contain the main daemon code.
40             Supervisor and worker are connected with a socketpair. If the worker exits for some
41             reason, the supervisor detects it and starts a new worker after a small delay.
42             Worker should periodically call I<ping_supervisor> method, so it would be able
43             to detect the case when supervisor has been killed and exit.
44              
45             If module needs hot reloading feature, it should redefine I<can_do_hot_reload>
46             method to return true value. In this case supervisor process sets a handler for
47             I<SIGUSR2> signal. When I<SIGUSR2> signal is received, supervisor starts a new
48             copy of the script via fork/exec, so this new copy runs a new code. New
49             supervisor starts a worker process and waits a signal that this new worker is
50             ready to do its job. To send that signal worker should invoke
51             I<ready_to_take_over> method. When the new supervisor receives that signal, it
52             sends I<SIGQUIT> to the old supervisor and waits for it to exit. After the old
53             supervisor exited (normally new supervisor detects that because it can flock
54             the pid file), the new supervisor sends signal to the worker,
55             I<ready_to_take_over> method in worker returns, and worker can start doing its
56             job. If supervisor receives I<SIGUSR2> when it is already in the process of
57             reloading, it ignores this signal. If supervisor didn't get I<SIGQUIT> in 60
58             seconds after starting hot reloading process, it sends I<SIGKILL> to the new
59             supervisor and resumes normal work.
60              
61             =cut
62              
63 3     3   12996 use namespace::autoclean;
  3         16020  
  3         9  
64 3     3   1551 use Socket qw();
  3         7743  
  3         93  
65 3     3   12 use POSIX qw(:errno_h);
  3         6  
  3         27  
66 3     3   1314 use Time::HiRes;
  3         6  
  3         30  
67 3     3   1533 use IO::Handle;
  3         13011  
  3         108  
68 3     3   18 use Try::Tiny;
  3         3  
  3         2712  
69              
70             =head1 REQUIRED METHODS
71              
72             Class consuming this role must implement the following methods:
73              
74             =cut
75              
76             =head2 supervised_process
77              
78             The main daemon subroutine. Inside this subroutine you should periodically
79             check that supervisor is still alive using I<ping_supervisor> method. If
80             supervisor exited, daemon should also exit.
81              
82             =cut
83              
84             requires 'supervised_process';
85              
86             =head2 supervised_shutdown
87              
88             This subroutine is executed when the daemon process is shutting down. Put cleanup
89             code inside.
90              
91             =cut
92              
93             requires 'supervised_shutdown';
94              
95             =head1 ATTRIBUTES
96              
97             =cut
98              
99             =head2 is_supervisor
100              
101             returns true inside supervisor process and false inside supervised daemon
102              
103             =cut
104              
105             has is_supervisor => (
106             is => 'rw',
107             default => 1,
108             );
109              
110             =head2 delay_before_respawn
111              
112             how long supervisor should wait after child process exited before starting a
113             new child. Default value is 5.
114              
115             =cut
116              
117             has delay_before_respawn => (
118             is => 'rw',
119             default => 5,
120             );
121              
122             =head2 supervisor_pipe
123              
124             File descriptor of the pipe to supervisor
125              
126             =cut
127              
128             has supervisor_pipe => (
129             is => 'rw',
130             writer => '_supervisor_pipe',
131             );
132              
133             has _child_pid => (is => 'rw');
134              
135             =head1 METHODS
136              
137             =cut
138              
139             =head2 $self->ping_supervisor
140              
141             Should only be called from supervised process. Checks if supervisor is alive
142             and initiates shutdown if it is not.
143              
144             =cut
145              
146             sub ping_supervisor {
147 10     10 1 1002705 my $self = shift;
148 10 50       894 my $pipe = $self->supervisor_pipe or $self->error("Supervisor pipe is not defined");
149 10         322 say $pipe "ping";
150 10         772 my $pong = <$pipe>;
151 10 50       57 unless (defined $pong) {
152 0         0 $self->error("Error reading from supervisor pipe: $!");
153             }
154 10         78 return;
155             }
156              
157             =head2 $self->ready_to_take_over
158              
159             Used to support hot reloading. If the daemon supports hot restart,
160             I<supervised_process> is called while the old daemon is still running.
161             I<supervised_process> should perform initialization, e.g. open listening
162             sockets, and then call this method. Method will cause termination of old daemon
163             and after return the new process may start serving clients.
164              
165             =cut
166              
167             sub ready_to_take_over {
168 0     0 1 0 my $self = shift;
169 0 0       0 my $pipe = $self->supervisor_pipe or die "Supervisor pipe is not defined";
170 0         0 say $pipe "takeover";
171 0         0 my $ok = <$pipe>;
172 0 0       0 defined($ok) or $self->error("Failed to take over");
173 0         0 return;
174             }
175              
176             =head2 $self->daemon_run
177              
178             See L<App::Base::Daemon>
179              
180             =cut
181              
182             sub daemon_run {
183 1     1 1 8 my $self = shift;
184 1         14 $self->_set_hot_reload_handler;
185              
186 1         3 while (1) {
187 1         81 socketpair my $chld, my $par, Socket::AF_UNIX, Socket::SOCK_STREAM, Socket::PF_UNSPEC;
188 1         1590 my $pid = fork;
189 1         217 $self->_child_pid($pid);
190 1 50       23 if ($pid) {
    50          
191             local $SIG{QUIT} = sub {
192 0     0   0 kill TERM => $pid;
193 0         0 waitpid $pid, 0;
194 0         0 exit 0;
195 0         0 };
196 0         0 $chld->close;
197 0         0 $par->autoflush(1);
198 0         0 $self->_supervisor_pipe($par);
199 0         0 while (<$par>) {
200 0         0 chomp;
201 0 0       0 if ($_ eq 'ping') {
    0          
    0          
202 0         0 say $par 'pong';
203             } elsif ($_ eq 'takeover') {
204 0         0 $self->_control_takeover;
205 0         0 say $par 'ok';
206             } elsif ($_ eq 'shutdown') {
207 0         0 kill KILL => $pid;
208 0         0 close $par;
209             } else {
210 0 0       0 warn("Received unknown command from the supervised process: $_") unless $self->getOption('no-warn');
211             }
212             }
213 0         0 my $kid = waitpid $pid, 0;
214 0 0       0 warn("Supervised process $kid exited with status $?") unless $self->getOption('no-warn');
215             } elsif (not defined $pid) {
216 0 0       0 warn("Couldn't fork: $!") unless $self->getOption('no-warn');
217             } else {
218 1         53 local $SIG{USR2};
219 1         57 $par->close;
220 1         60 $chld->autoflush(1);
221 1         289 $self->_supervisor_pipe($chld);
222 1         58 $self->is_supervisor(0);
223 1         40 $self->supervised_process;
224 0         0 exit 0;
225             }
226 0         0 Time::HiRes::usleep(1_000_000 * $self->delay_before_respawn);
227             }
228              
229             # for critic
230 0         0 return;
231             }
232              
233             # this initializes SIGUSR2 handler to perform hot reload
234             sub _set_hot_reload_handler {
235 1     1   7 my $self = shift;
236              
237 1 50       21 return unless $self->can_do_hot_reload;
238 0         0 my $upgrading;
239              
240             ## no critic (RequireLocalizedPunctuationVars)
241             $SIG{USR2} = sub {
242 0 0   0   0 return unless $ENV{APP_BASE_DAEMON_PID} == $$;
243 0 0       0 if ($upgrading) {
244 0 0       0 warn("Received USR2, but hot reload is already in progress") unless $self->getOption('no-warn');
245 0         0 return;
246             }
247 0 0       0 warn("Received USR2, initiating hot reload") unless $self->getOption('no-warn');
248 0         0 my $pid;
249 0 0       0 unless (defined($pid = fork)) {
250 0 0       0 warn("Could not fork, cancelling reload") unless $self->getOption('no-warn');
251             }
252 0 0       0 unless ($pid) {
253 0 0       0 exec($ENV{APP_BASE_SCRIPT_EXE}, @{$self->{orig_args}})
  0         0  
254             or $self->error("Couldn't exec: $!");
255             }
256 0         0 $upgrading = time;
257 0 0       0 if ($SIG{ALRM}) {
258 0 0       0 warn("ALRM handler is already defined!") unless $self->getOption('no-warn');
259             }
260             $SIG{ALRM} = sub {
261 0 0       0 warn("Hot reloading timed out, cancelling") unless $self->getOption('no-warn');
262 0         0 kill KILL => $pid;
263 0         0 undef $upgrading;
264 0         0 };
265 0         0 alarm 60;
266 0         0 };
267             {
268 0         0 my $usr2 = POSIX::SigSet->new(POSIX::SIGUSR2());
  0         0  
269 0         0 my $old = POSIX::SigSet->new();
270 0         0 POSIX::sigprocmask(POSIX::SIG_UNBLOCK(), $usr2, $old);
271             }
272              
273 0         0 return;
274             }
275              
276             my $pid;
277              
278             # kill the old daemon and lock pid file
279             sub _control_takeover {
280 0     0   0 my $self = shift;
281              
282             ## no critic (RequireLocalizedPunctuationVars)
283              
284             # if it is first generation, then pid file should be already locked in App::Base::Daemon
285 0 0 0     0 if ($ENV{APP_BASE_DAEMON_GEN} > 1 and $ENV{APP_BASE_DAEMON_PID} != $$) {
286 0         0 kill QUIT => $ENV{APP_BASE_DAEMON_PID};
287 0 0       0 if ($self->getOption('no-pid-file')) {
288              
289             # we don't have pid file, so let's just poke it to death
290 0         0 my $attempts = 14;
291 0 0 0     0 while (kill(($attempts == 1 ? 'KILL' : 'ZERO') => $ENV{APP_BASE_DAEMON_PID})
292             and $attempts--)
293             {
294 0         0 Time::HiRes::usleep(500_000);
295             }
296             } else {
297             local $SIG{ALRM} = sub {
298 0 0   0   0 warn("Couldn't lock the file. Sending KILL to previous generation process") unless $self->getOption('no-warn');
299 0         0 };
300 0         0 alarm 5;
301              
302             # We may fail for two reasons:
303             # a) previous process didn't exit and still holds the lock
304             # b) new process was started and locked pid
305 0     0   0 $pid = try { File::Flock::Tiny->lock($self->pid_file) };
  0         0  
306 0 0       0 unless ($pid) {
307              
308             # So let's try killing old process, if after that locking still will fail
309             # then probably it is the case b) and we should exit
310 0         0 kill KILL => $ENV{APP_BASE_DAEMON_PID};
311 0     0   0 $SIG{ALRM} = sub { $self->error("Still couldn't lock pid file, aborting") };
  0         0  
312 0         0 alarm 5;
313 0         0 $pid = File::Flock::Tiny->lock($self->pid_file);
314             }
315 0         0 alarm 0;
316 0         0 $pid->write_pid;
317             }
318             }
319 0         0 $ENV{APP_BASE_DAEMON_PID} = $$;
320 0         0 return;
321             }
322              
323             =head2 $self->handle_shutdown
324              
325             See L<App::Base::Daemon>
326              
327             =cut
328              
329             sub handle_shutdown {
330 1     1 1 2 my $self = shift;
331 1 50       57 if ($self->is_supervisor) {
332 0 0       0 kill TERM => $self->_child_pid if $self->_child_pid;
333             } else {
334 1         14 $self->supervised_shutdown;
335             }
336              
337 1         374 return;
338             }
339              
340             =head2 DEMOLISH
341              
342             =cut
343              
344             sub DEMOLISH {
345 0     0 1   my $self = shift;
346 0 0         shutdown $self->supervisor_pipe, 2 if $self->supervisor_pipe;
347 0           return;
348             }
349              
350 3     3   15 no Moose::Role;
  3         6  
  3         21  
351             1;
352              
353             __END__
354              
355             =head1 LICENSE AND COPYRIGHT
356              
357             Copyright (C) 2010-2014 Binary.com
358              
359             This program is free software; you can redistribute it and/or modify it
360             under the terms of either: the GNU General Public License as published
361             by the Free Software Foundation; or the Artistic License.
362              
363             See http://dev.perl.org/licenses/ for more information.
364              
365             =cut