File Coverage

blib/lib/DBIx/QuickDB/Watcher.pm
Criterion Covered Total %
statement 24 185 12.9
branch 0 88 0.0
condition 0 27 0.0
subroutine 8 26 30.7
pod 0 8 0.0
total 32 334 9.5


line stmt bran cond sub pod time code
1             package DBIx::QuickDB::Watcher;
2 23     23   119 use strict;
  23         41  
  23         655  
3 23     23   74 use warnings;
  23         35  
  23         1253  
4              
5             our $VERSION = '0.000045';
6              
7 23     23   100 use Carp qw/croak/;
  23         45  
  23         1106  
8 23     23   93 use POSIX qw/:sys_wait_h/;
  23         58  
  23         153  
9 23     23   3453 use Time::HiRes qw/sleep time/;
  23         40  
  23         148  
10 23     23   1347 use Scalar::Util qw/weaken/;
  23         83  
  23         900  
11 23     23   88 use File::Path qw/remove_tree/;
  23         58  
  23         1160  
12              
13 23         159 use DBIx::QuickDB::Util::HashBase qw{
14            
15            
16            
17            
18            
19              
20            
21            
22            
23              
24            
25 23     23   10159 };
  23         50  
26              
27             sub init {
28 0     0 0   my $self = shift;
29              
30 0   0       $self->{+MASTER_PID} ||= $$;
31              
32 0           $self->{+LOG_FILE} = $self->{+DB}->gen_log;
33              
34 0           $self->start();
35              
36 0 0         weaken($self->{+DB}) if $self->{+MASTER_PID} == $$;
37             }
38              
39             sub start {
40 0     0 0   my $self = shift;
41 0 0         return if $self->{+SERVER_PID};
42              
43 0           my ($rh, $wh);
44 0 0         pipe($rh, $wh) or die "Could not open pipe: $!";
45              
46 0           my $pid = fork;
47 0 0         die "Could not fork: $!" unless defined $pid;
48              
49 0 0         if ($pid) {
50 0           close($wh);
51 0           waitpid($pid, 0);
52 0           chomp($self->{+WATCHER_PID} = <$rh>);
53 0           chomp($self->{+SERVER_PID} = <$rh>);
54 0           close($rh);
55 0 0         die "Did not get watcher pid!" unless $self->{+WATCHER_PID};
56 0 0         die "Did not get server pid!" unless $self->{+SERVER_PID};
57 0           return;
58             }
59              
60 0           close($rh);
61 0           POSIX::setsid();
62 0           setpgrp(0, 0);
63 0           $pid = fork;
64 0 0         die "Could not fork: $!" unless defined $pid;
65 0 0         POSIX::_exit(0) if $pid;
66              
67 0           $wh->autoflush(1);
68 0           print $wh "$$\n";
69              
70             # In watcher now
71 0 0         eval { $self->watch($wh); 1 } or POSIX::_exit(1);
  0            
  0            
72 0           POSIX::_exit(0);
73             }
74              
75             sub watch {
76 0     0 0   my $self = shift;
77 0           my ($wh) = @_;
78              
79 0           $0 = 'db-quick-watcher';
80              
81 0           my $kill = '';
82 0           my $hup = 0;
83 0     0     local $SIG{TERM} = sub { $kill = 'TERM' };
  0            
84 0     0     local $SIG{INT} = sub { $kill = 'INT' };
  0            
85 0     0     local $SIG{HUP} = sub { $hup = 1 };
  0            
86              
87 0           my $start_pid = $$;
88 0           my $pid = $self->spawn();
89 0           print $wh "$pid\n";
90 0           close($wh);
91              
92 0           my $mpid = $self->{+MASTER_PID};
93 0 0         my $spid = $self->{+SERVER_PID} or die "No server pid";
94              
95 0           my $ddir = $self->{+DB}->dir;
96 0   0       my $ssig = $self->{+DB}->stop_sig // 'TERM';
97              
98             # Ignore SIGTERM/SIGINT before exec so the watcher cannot be killed
99             # during startup before _do_watch installs its signal handlers.
100             # SIG_IGN persists across exec, and any pending signal will be held
101             # until _do_watch replaces these with proper handlers.
102 0           $SIG{TERM} = 'IGNORE';
103 0           $SIG{INT} = 'IGNORE';
104              
105 0           exec(
106             $^X, '-Ilib',
107              
108             '-e' => "require DBIx::QuickDB::Watcher; DBIx::QuickDB::Watcher->_do_watch()",
109              
110             master_pid => $mpid,
111             data_dir => $ddir,
112             server_pid => $spid,
113             signal => $ssig,
114             kill => $kill,
115             hup => $hup,
116             );
117             }
118              
119             sub _do_watch {
120 0     0     my $class = shift;
121              
122 0           $0 = 'db-quick-watcher';
123              
124 0           my %params = @ARGV;
125              
126 0   0       my $kill = $params{kill} // '';
127 0   0       my $hup = $params{hup} // 0;
128 0     0     local $SIG{TERM} = sub { $kill = 'TERM' };
  0            
129 0     0     local $SIG{INT} = sub { $kill = 'INT' };
  0            
130 0     0     local $SIG{HUP} = sub { $hup = 1 };
  0            
131              
132 0           my $blah;
133 0           close(STDIN);
134 0 0         open(STDIN, '<', \$blah) or warn "$!";
135              
136 0 0         my $master_pid = $params{master_pid} or die "No master pid provided";
137 0 0         my $server_pid = $params{server_pid} or die "No server pid provided";
138 0 0         my $data_dir = $params{data_dir} or die "No data dir provided";
139 0   0       my $signal = $params{signal} // 'TERM';
140              
141 0           my $hupped = 0;
142 0           while (!$kill) {
143 0 0 0       if ($hup && !$hupped) {
144 0           close(STDOUT);
145 0 0         open(STDOUT, '>', \$blah) or warn "$!";
146 0           close(STDERR);
147 0 0         open(STDERR, '>', \$blah) or warn "$!";
148             }
149              
150 0           sleep 0.1;
151              
152 0 0         next if kill(0, $master_pid);
153 0           $kill = 'TERM';
154             }
155              
156 0 0         unless (eval { $class->_watcher_terminate(send_sig => $signal, got_sig => $kill, pid => $server_pid, dir => $data_dir); 1 }) {
  0            
  0            
157 0           my $err = $@;
158 0           eval { warn $@ };
  0            
159 0           POSIX::_exit(1);
160             }
161              
162 0           POSIX::_exit(0);
163             }
164              
165             sub spawn {
166 0     0 0   my $self = shift;
167              
168 0 0         croak "Extra spawn" if $self->{+SERVER_PID};
169              
170 0           my $db = $self->{+DB};
171 0   0       my $args = $self->{+ARGS} || [];
172              
173 0           my $init_pid = $$;
174 0           my ($pid, $log_file) = $db->run_command([$db->start_command, @$args], {no_wait => 1, log_file => $self->{+LOG_FILE}});
175 0           $self->{+SERVER_PID} = $pid;
176 0           $self->{+LOG_FILE} = $log_file;
177              
178 0           return $pid;
179             }
180              
181             sub _watcher_terminate {
182 0     0     my $class = shift;
183 0           my %params = @_;
184              
185 0 0         my $pid = $params{pid} or die "No pid";
186 0 0         my $dir = $params{dir} or die "No dir";
187              
188 0           my $got_sig = $params{got_sig};
189 0   0       my $send_sig = $params{send_sig} // $got_sig // 'TERM';
      0        
190              
191 0           $class->_watcher_kill($send_sig, $pid);
192              
193 0 0 0       if ($got_sig && $got_sig eq 'TERM') {
194             # Ignore errors here.
195 0           my $err = [];
196 0 0         remove_tree($dir, {safe => 1, error => \$err}) if -d $dir;
197             }
198             }
199              
200             sub _watcher_kill {
201 0     0     my $class = shift;
202 0           my ($sig, $pid) = @_;
203              
204 0 0         kill($sig, $pid) or die "Could not send kill signal";
205              
206             # How long to wait for a graceful shutdown before escalating to SIGKILL,
207             # and how much longer before giving up entirely. Keep this generous: a slow
208             # or loaded host (e.g. a CPAN smoke box) can need well over a few seconds to
209             # finish PostgreSQL's shutdown checkpoint. A premature SIGKILL leaves the
210             # data dir in a crash-recovery state, and a clone of that dir then replays
211             # WAL on first start, jumping SERIAL sequences forward by SEQ_LOG_VALS (32)
212             # -- silently corrupting cloned databases. Tunable via QDB_STOP_GRACE.
213 0           my $kill_after = $ENV{QDB_STOP_GRACE};
214 0 0 0       $kill_after = 15 unless defined($kill_after) && $kill_after =~ /^\d+$/;
215 0           my $give_up = $kill_after * 2;
216              
217 0           my ($check, $exit, $killed);
218 0           my $start = time;
219 0           until ($check) {
220 0           local $?;
221 0           my $delta = time - $start;
222              
223 0 0         if ($delta >= $kill_after) {
224 0 0         if ($killed) {
225 0           my $delta2 = time - $killed;
226 0 0         next unless $delta2 >= 1;
227             }
228              
229 0           warn "Server taking too long to shut down, sending SIGKILL";
230 0           $killed = time;
231 0           kill('KILL', $pid);
232              
233 0 0         last if $delta > $give_up;
234             }
235              
236 0           $check = waitpid($pid, WNOHANG);
237 0           $exit = $?;
238              
239 0           sleep 0.1;
240             }
241              
242 0 0         die "PID refused to exit" unless $check;
243 0 0         die "Something else reaped our process" if $check < 0;
244 0 0         die "Reaped the wrong process '$check' instead of '$pid'" if $pid != $check;
245              
246 0           return;
247             }
248              
249             sub stop {
250 0     0 0   my $self = shift;
251 0 0         return if $self->{+STOPPED}++;
252 0 0         my $pid = $self->{+WATCHER_PID} or return;
253 0           kill('INT', $pid);
254             }
255              
256             sub eliminate {
257 0     0 0   my $self = shift;
258 0 0         return if $self->{+ELIMINATED}++;
259 0 0         my $pid = $self->{+WATCHER_PID} or return;
260 0           kill('TERM', $pid);
261             }
262              
263             sub detach {
264 0     0 0   my $self = shift;
265 0 0         return if $self->{+DETACHED}++;
266 0 0         my $pid = $self->{+WATCHER_PID} or return;
267 0           kill('HUP', $pid);
268             }
269              
270             sub wait {
271 0     0 0   my $self = shift;
272 0 0         my $pid = $self->{+WATCHER_PID} or return;
273              
274 0           my $start = time;
275 0           while(kill(0, $pid)) {
276 0           my $waited = time - $start;
277 0 0         if ($waited > 10) {
278 0           kill('KILL', $pid);
279 0           $start = time;
280             }
281 0           sleep 0.02;
282             }
283             }
284              
285             sub DESTROY {
286 0     0     my $self = shift;
287              
288 0 0         if ($self->{+MASTER_PID} == $$) {
289 0           $self->eliminate;
290 0           $self->wait;
291             }
292             else {
293 0 0         unlink($self->{+LOG_FILE}) if $self->{+LOG_FILE};
294             }
295             }
296              
297             1;
298              
299             __END__