File Coverage

blib/lib/DBIx/QuickDB/Watcher.pm
Criterion Covered Total %
statement 24 180 13.3
branch 0 86 0.0
condition 0 24 0.0
subroutine 8 26 30.7
pod 0 8 0.0
total 32 324 9.8


line stmt bran cond sub pod time code
1             package DBIx::QuickDB::Watcher;
2 19     19   133 use strict;
  19         45  
  19         780  
3 19     19   94 use warnings;
  19         37  
  19         1599  
4              
5             our $VERSION = '0.000039';
6              
7 19     19   142 use Carp qw/croak/;
  19         76  
  19         1238  
8 19     19   132 use POSIX qw/:sys_wait_h/;
  19         95  
  19         143  
9 19     19   3733 use Time::HiRes qw/sleep time/;
  19         99  
  19         187  
10 19     19   1636 use Scalar::Util qw/weaken/;
  19         51  
  19         1107  
11 19     19   112 use File::Path qw/remove_tree/;
  19         36  
  19         1273  
12              
13 19         178 use DBIx::QuickDB::Util::HashBase qw{
14            
15            
16            
17            
18            
19              
20            
21            
22            
23              
24            
25 19     19   11386 };
  19         66  
26              
27             sub init {
28 0     0 0   my $self = shift;
29              
30 0   0       $self->{+MASTER_PID} ||= $$;
31              
32 0           $self->{+LOG_FILE} = $self->{+DB}->gen_log;
33              
34 0           $self->start();
35              
36 0 0         weaken($self->{+DB}) if $self->{+MASTER_PID} == $$;
37             }
38              
39             sub start {
40 0     0 0   my $self = shift;
41 0 0         return if $self->{+SERVER_PID};
42              
43 0           my ($rh, $wh);
44 0 0         pipe($rh, $wh) or die "Could not open pipe: $!";
45              
46 0           my $pid = fork;
47 0 0         die "Could not fork: $!" unless defined $pid;
48              
49 0 0         if ($pid) {
50 0           close($wh);
51 0           waitpid($pid, 0);
52 0           chomp($self->{+WATCHER_PID} = <$rh>);
53 0           chomp($self->{+SERVER_PID} = <$rh>);
54 0           close($rh);
55 0 0         die "Did not get watcher pid!" unless $self->{+WATCHER_PID};
56 0 0         die "Did not get server pid!" unless $self->{+SERVER_PID};
57 0           return;
58             }
59              
60 0           close($rh);
61 0           POSIX::setsid();
62 0           setpgrp(0, 0);
63 0           $pid = fork;
64 0 0         die "Could not fork: $!" unless defined $pid;
65 0 0         POSIX::_exit(0) if $pid;
66              
67 0           $wh->autoflush(1);
68 0           print $wh "$$\n";
69              
70             # In watcher now
71 0 0         eval { $self->watch($wh); 1 } or POSIX::_exit(1);
  0            
  0            
72 0           POSIX::_exit(0);
73             }
74              
75             sub watch {
76 0     0 0   my $self = shift;
77 0           my ($wh) = @_;
78              
79 0           $0 = 'db-quick-watcher';
80              
81 0           my $kill = '';
82 0           my $hup = 0;
83 0     0     local $SIG{TERM} = sub { $kill = 'TERM' };
  0            
84 0     0     local $SIG{INT} = sub { $kill = 'INT' };
  0            
85 0     0     local $SIG{HUP} = sub { $hup = 1 };
  0            
86              
87 0           my $start_pid = $$;
88 0           my $pid = $self->spawn();
89 0           print $wh "$pid\n";
90 0           close($wh);
91              
92 0           my $mpid = $self->{+MASTER_PID};
93 0 0         my $spid = $self->{+SERVER_PID} or die "No server pid";
94              
95 0           my $ddir = $self->{+DB}->dir;
96 0   0       my $ssig = $self->{+DB}->stop_sig // 'TERM';
97              
98 0           exec(
99             $^X, '-Ilib',
100              
101             '-e' => "require DBIx::QuickDB::Watcher; DBIx::QuickDB::Watcher->_do_watch()",
102              
103             master_pid => $mpid,
104             data_dir => $ddir,
105             server_pid => $spid,
106             signal => $ssig,
107             kill => $kill,
108             hup => $hup,
109             );
110             }
111              
112             sub _do_watch {
113 0     0     my $class = shift;
114              
115 0           $0 = 'db-quick-watcher';
116              
117 0           my %params = @ARGV;
118              
119 0   0       my $kill = $params{kill} // '';
120 0   0       my $hup = $params{hup} // 0;
121 0     0     local $SIG{TERM} = sub { $kill = 'TERM' };
  0            
122 0     0     local $SIG{INT} = sub { $kill = 'INT' };
  0            
123 0     0     local $SIG{HUP} = sub { $hup = 1 };
  0            
124              
125 0           my $blah;
126 0           close(STDIN);
127 0 0         open(STDIN, '<', \$blah) or warn "$!";
128              
129 0 0         my $master_pid = $params{master_pid} or die "No master pid provided";
130 0 0         my $server_pid = $params{server_pid} or die "No server pid provided";
131 0 0         my $data_dir = $params{data_dir} or die "No data dir provided";
132 0   0       my $signal = $params{signal} // 'TERM';
133              
134 0           my $hupped = 0;
135 0           while (!$kill) {
136 0 0 0       if ($hup && !$hupped) {
137 0           close(STDOUT);
138 0 0         open(STDOUT, '>', \$blah) or warn "$!";
139 0           close(STDERR);
140 0 0         open(STDERR, '>', \$blah) or warn "$!";
141             }
142              
143 0           sleep 0.1;
144              
145 0 0         next if kill(0, $master_pid);
146 0           $kill = 'TERM';
147             }
148              
149 0 0         unless (eval { $class->_watcher_terminate(send_sig => $signal, got_sig => $kill, pid => $server_pid, dir => $data_dir); 1 }) {
  0            
  0            
150 0           my $err = $@;
151 0           eval { warn $@ };
  0            
152 0           POSIX::_exit(1);
153             }
154              
155 0           POSIX::_exit(0);
156             }
157              
158             sub spawn {
159 0     0 0   my $self = shift;
160              
161 0 0         croak "Extra spawn" if $self->{+SERVER_PID};
162              
163 0           my $db = $self->{+DB};
164 0   0       my $args = $self->{+ARGS} || [];
165              
166 0           my $init_pid = $$;
167 0           my ($pid, $log_file) = $db->run_command([$db->start_command, @$args], {no_wait => 1, log_file => $self->{+LOG_FILE}});
168 0           $self->{+SERVER_PID} = $pid;
169 0           $self->{+LOG_FILE} = $log_file;
170              
171 0           return $pid;
172             }
173              
174             sub _watcher_terminate {
175 0     0     my $class = shift;
176 0           my %params = @_;
177              
178 0 0         my $pid = $params{pid} or die "No pid";
179 0 0         my $dir = $params{dir} or die "No dir";
180              
181 0           my $got_sig = $params{got_sig};
182 0   0       my $send_sig = $params{send_sig} // $got_sig // 'TERM';
      0        
183              
184 0           $class->_watcher_kill($send_sig, $pid);
185              
186 0 0 0       if ($got_sig && $got_sig eq 'TERM') {
187             # Ignore errors here.
188 0           my $err = [];
189 0 0         remove_tree($dir, {safe => 1, error => \$err}) if -d $dir;
190             }
191             }
192              
193             sub _watcher_kill {
194 0     0     my $class = shift;
195 0           my ($sig, $pid) = @_;
196              
197 0 0         kill($sig, $pid) or die "Could not send kill signal";
198              
199 0           my ($check, $exit, $killed);
200 0           my $start = time;
201 0           until ($check) {
202 0           local $?;
203 0           my $delta = time - $start;
204              
205 0 0         if ($delta >= 4) {
206 0 0         if ($killed) {
207 0           my $delta2 = time - $killed;
208 0 0         next unless $delta2 >= 1;
209             }
210              
211 0           warn "Server taking too long to shut down, sending SIGKILL";
212 0           $killed = time;
213 0           kill('KILL', $pid);
214              
215 0 0         last if $delta > 8;
216             }
217              
218 0           $check = waitpid($pid, WNOHANG);
219 0           $exit = $?;
220              
221 0           sleep 0.1;
222             }
223              
224 0 0         die "PID refused to exit" unless $check;
225 0 0         die "Something else reaped our process" if $check < 0;
226 0 0         die "Reaped the wrong process '$check' instead of '$pid'" if $pid != $check;
227              
228 0           return;
229             }
230              
231             sub stop {
232 0     0 0   my $self = shift;
233 0 0         return if $self->{+STOPPED}++;
234 0 0         my $pid = $self->{+WATCHER_PID} or return;
235 0           kill('INT', $pid);
236             }
237              
238             sub eliminate {
239 0     0 0   my $self = shift;
240 0 0         return if $self->{+ELIMINATED}++;
241 0 0         my $pid = $self->{+WATCHER_PID} or return;
242 0           kill('TERM', $pid);
243             }
244              
245             sub detach {
246 0     0 0   my $self = shift;
247 0 0         return if $self->{+DETACHED}++;
248 0 0         my $pid = $self->{+WATCHER_PID} or return;
249 0           kill('HUP', $pid);
250             }
251              
252             sub wait {
253 0     0 0   my $self = shift;
254 0 0         my $pid = $self->{+WATCHER_PID} or return;
255              
256 0           my $start = time;
257 0           while(kill(0, $pid)) {
258 0           my $waited = time - $start;
259 0 0         if ($waited > 10) {
260 0           kill('KILL', $pid);
261 0           $start = time;
262             }
263 0           sleep 0.02;
264             }
265             }
266              
267             sub DESTROY {
268 0     0     my $self = shift;
269              
270 0 0         if ($self->{+MASTER_PID} == $$) {
271 0           $self->eliminate;
272 0           $self->wait;
273             }
274             else {
275 0 0         unlink($self->{+LOG_FILE}) if $self->{+LOG_FILE};
276             }
277             }
278              
279             1;
280              
281             __END__