File Coverage

blib/lib/App/Sv.pm
Criterion Covered Total %
statement 57 278 20.5
branch 28 152 18.4
condition 5 51 9.8
subroutine 10 54 18.5
pod 2 2 100.0
total 102 537 18.9


line stmt bran cond sub pod time code
1             package App::Sv;
2             # ABSTRACT: Event-based multi-process supervisor
3             our $VERSION = '0.014';
4              
5 1     1   45308 use 5.008001;
  1         4  
  1         36  
6 1     1   6 use strict;
  1         1  
  1         27  
7 1     1   4 use warnings;
  1         6  
  1         34  
8              
9 1     1   35 use Carp 'croak';
  1         1  
  1         42  
10 1     1   1075 use POSIX;
  1         14341  
  1         9  
11 1     1   7380 use AnyEvent;
  1         6839  
  1         31  
12 1     1   1095 use AnyEvent::Socket;
  1         32414  
  1         182  
13 1     1   1392 use AnyEvent::Handle;
  1         7837  
  1         154  
14 1     1   644 use App::Sv::Log;
  1         3  
  1         3789  
15              
16             # Constructor
17             sub new {
18 14     14 1 49825 my $class = shift;
19 14         20 my $conf;
20            
21 14 100 100     79 if (@_ && scalar @_ > 1) {
22 7 100       133 croak "Odd number of arguments to App::Sv" if @_ % 2 != 0;
23 6         14 foreach my $id (0..$#_) {
24 12 100       47 $conf->{$_[$id]} = $_[$id+1] if $id % 2 == 0;
25             }
26             }
27             else {
28 7         11 $conf = shift @_;
29             }
30            
31 13         23 my $run = $conf->{run};
32 13         45 $conf->{global}->{log} = delete $conf->{log};
33 13 100       418 croak "Commands must be passed as a hash ref" if ref $run ne 'HASH';
34 11 100       216 croak "Missing command list" if !scalar (keys %$run);
35            
36             # set defaults
37 9         43 my $defaults = {
38             start_retries => 8,
39             restart_delay => 1,
40             start_wait => 1,
41             stop_wait => 0,
42             setsid => 1
43             };
44             # check options
45 9         23 foreach my $svc (keys %$run) {
46 10 100       53 if (!$run->{$svc}) {
    100          
    100          
    100          
47 1         115 croak "Missing command for \'$svc\'";
48             }
49             elsif (!ref $run->{$svc}) {
50 2         11 $run->{$svc} = { cmd => $run->{$svc} };
51             }
52             elsif (ref $run->{$svc} eq 'CODE') {
53 1         4 $run->{$svc} = { code => $run->{$svc} };
54             }
55             elsif (ref $run->{$svc} eq 'ARRAY') {
56 3 100       18 if (!$run->{$svc}->[0]) {
    100          
    50          
57 1         97 croak "Missing command for \'$svc\'";
58             }
59             elsif (!ref $run->{$svc}->[0]) {
60 1         4 $run->{$svc} = { cmd => $run->{$svc} };
61             }
62             elsif (ref $run->{$svc}->[0] eq 'CODE') {
63 1         4 $run->{$svc} = { code => $run->{$svc} };
64             }
65             }
66            
67 8 100       21 if (ref $run->{$svc} eq 'HASH') {
68 7 100 66     29 if (!$run->{$svc}->{cmd} && !$run->{$svc}->{code}) {
69 1         112 croak "Missing command for \'$svc\'"
70             }
71 6         9 $run->{$svc}->{name} = $svc;
72 6         19 foreach my $opt (keys %$defaults) {
73 30 50 0     51 if (!defined $run->{$svc}->{$opt}) {
    0          
74 30         60 $run->{$svc}->{$opt} = $defaults->{$opt};
75             }
76             elsif ($opt =~ /delay|wait/ && $run->{$svc}->{$opt} <= 0) {
77 0         0 $run->{$svc}->{$opt} = $defaults->{$opt};
78             }
79             }
80             }
81             else {
82 1         94 croak "Missing command for \'$svc\'";
83             }
84             }
85            
86 5         34 return bless { run => $run, conf => $conf->{global} }, $class;
87             }
88              
89             # Start everything
90             sub run {
91 0     0 1   my $self = shift;
92 0           my $cv = AE::cv;
93            
94             # signal watchers
95             my $int_s = AE::signal 'INT' => sub {
96 0     0     $self->_signal_all_svc('INT', $cv);
97 0           };
98             my $hup_s = AE::signal 'HUP' => sub {
99 0     0     $self->_signal_all_svc('HUP', $cv);
100 0           };
101             my $term_s = AE::signal 'TERM' => sub {
102 0     0     $self->_signal_all_svc('TERM');
103 0           $cv->send
104 0           };
105             # set global umask
106 0 0         umask oct($self->{conf}->{umask}) if $self->{conf}->{umask};
107             # initialize logger
108 0           $self->{log} = App::Sv::Log->new($self->{conf}->{log});
109             # open controling socket; load commands
110 0 0         $self->_listener() if $self->{conf}->{listen};
111 0 0         $self->{cmds} = $self->_client_cmds() if ref $self->{server} eq 'Guard';
112            
113             # start all services
114 0           foreach my $key (keys %{ $self->{run} }) {
  0            
115 0           my $svc = $self->{run}->{$key};
116 0           $self->_start_svc($svc);
117             }
118            
119 0           $cv->recv;
120             }
121              
122             sub _start_svc {
123 0     0     my ($self, $svc) = @_;
124            
125 0           my $debug = $self->{log}->logger(8);
126 0           my $warn = $self->{log}->logger(5);
127 0           $svc->{state} = 'start';
128 0 0         if ($svc->{start_count}) {
129 0           $svc->{start_count}++;
130             }
131             else {
132 0           $svc->{start_count} = 1;
133             }
134            
135 0           $debug->("Starting '$svc->{name}' attempt $svc->{start_count}");
136 0           my $pid = fork();
137 0 0         if (!defined $pid) {
138 0           $warn->("Failed to fork '$svc->{name}': $!");
139 0           $self->_restart_svc($svc);
140 0           return;
141             }
142            
143 0 0         if ($pid == 0) {
144             # child
145             # set egid/euid
146 0 0         if ($svc->{group}) {
147 0           $svc->{gid} = getgrnam($svc->{group});
148 0           $) = $svc->{gid};
149             }
150 0 0         if ($svc->{user}) {
151 0           $svc->{uid} = getpwnam($svc->{user});
152 0           $> = $svc->{uid};
153             }
154             # set process umask
155 0 0         umask oct($svc->{umask}) if $svc->{umask};
156             # change working directory
157 0 0         if ($svc->{cwd}) {
158 0 0         chdir $svc->{cwd}
159             or $warn->("Failed cwd for '$svc->{name}': $!");
160             }
161             # set environment
162 0 0 0       %ENV = %{$svc->{env}} if $svc->{env} && ref $svc->{env} eq 'HASH';
  0            
163             # set session id
164 0 0         if ($svc->{setsid}) {
165 0 0         $svc->{pgrp} = POSIX::setsid()
166             or $warn->("Failed setsid for '$svc->{name}': $!");
167             };
168             # start process
169 0 0 0       if ($svc->{cmd} && !ref $svc->{cmd}) {
    0 0        
    0 0        
    0 0        
170 0           $debug->("Executing command '$svc->{name}'");
171 0           exec($svc->{cmd});
172             }
173             elsif ($svc->{cmd} && ref $svc->{cmd} eq 'ARRAY') {
174 0           $debug->("Executing command '$svc->{name}'");
175 0           exec(@{$svc->{cmd}});
  0            
176             }
177             elsif ($svc->{code} && ref $svc->{code} eq 'CODE') {
178 0           $debug->("Executing code '$svc->{name}'");
179 0           $svc->{code}->();
180             }
181             elsif ($svc->{code} && ref $svc->{code} eq 'ARRAY') {
182 0           my $code = shift @{$svc->{code}};
  0            
183 0 0         if (ref $code eq 'CODE') {
184 0           $debug->("Executing code '$svc->{name}'");
185 0           $code->(@{$svc->{code}});
  0            
186             }
187             }
188 0           POSIX::_exit(1);
189             }
190             else {
191             # parent
192 0           $debug->("Watching pid $pid for '$svc->{name}'");
193 0           $svc->{pid} = $pid;
194             $svc->{watcher} = AE::child $pid, sub {
195 0     0     $self->_child_exited($svc, @_);
196 0           };
197 0           $svc->{start_ts} = time;
198 0           my $t; $t = AE::timer $svc->{start_wait}, 0, sub {
199 0     0     $self->_check_svc_up($svc);
200 0           undef $t;
201 0           };
202             }
203            
204 0           return $pid;
205             }
206              
207             sub _child_exited {
208 0     0     my ($self, $svc, undef, $status) = @_;
209            
210 0           my $debug = $self->{log}->logger(8);
211 0           $debug->("Child $svc->{pid} exited, status $status: '$svc->{name}'");
212 0           delete $svc->{watcher};
213 0           delete $svc->{pid};
214 0           $svc->{last_status} = $status >> 8;
215 0 0         if ($svc->{state} eq 'stop') {
    0          
216 0           delete $svc->{start_count};
217 0           $svc->{state} = 'down';
218             }
219             elsif ($svc->{once}) {
220 0           delete $svc->{start_count};
221 0           $svc->{state} = 'fatal';
222             }
223             else {
224 0           $self->_restart_svc($svc);
225             }
226             }
227              
228             sub _restart_svc {
229 0     0     my ($self, $svc) = @_;
230              
231 0 0         if ($svc->{start_retries}) {
232 0 0 0       if ($svc->{start_count} &&
233             ($svc->{start_count} >= $svc->{start_retries})) {
234 0           $svc->{state} = 'fatal';
235 0           return;
236             }
237             }
238             else {
239 0           $svc->{state} = 'fatal';
240 0           return;
241             }
242 0           my $debug = $self->{log}->logger(8);
243 0           $svc->{state} = 'restart';
244 0           $debug->("Restarting '$svc->{name}' in $svc->{restart_delay} seconds");
245 0           my $t; $t = AE::timer $svc->{restart_delay}, 0, sub {
246 0     0     $self->_start_svc($svc);
247 0           undef $t;
248 0           };
249             }
250              
251             sub _check_svc_up {
252 0     0     my ($self, $svc) = @_;
253            
254 0 0         return unless $svc->{state} eq 'start';
255 0 0         if (!$svc->{pid}) {
256 0           $svc->{state} = 'fail';
257 0           return;
258             }
259 0           delete $svc->{start_count};
260 0           $svc->{state} = 'up';
261             }
262              
263             sub _stop_svc {
264 0     0     my ($self, $svc) = @_;
265            
266 0           $svc->{state} = 'stop';
267 0           my $st = $self->_signal_svc($svc, 'TERM');
268 0 0 0       if ($svc->{stop_wait} && $svc->{stop_wait} > 0) {
269 0           my $t; $t = AE::timer $svc->{stop_wait}, 0, sub {
270 0     0     $self->_check_svc_down($svc);
271 0           undef $t;
272 0           };
273             }
274            
275 0           return $st;
276             }
277              
278             sub _check_svc_down {
279 0     0     my ($self, $svc) = @_;
280            
281 0 0         return unless $svc->{state} eq 'stop';
282 0 0         if ($svc->{pid}) {
283 0           my $st = $self->_signal_svc($svc, 'KILL');
284             }
285             }
286              
287             sub _signal_svc {
288 0     0     my ($self, $svc, $sig) = @_;
289            
290 0 0 0       return unless ($svc->{pid} && $sig);
291 0           my $debug = $self->{log}->logger(8);
292 0           $debug->("Sent signal $sig to pid $svc->{pid}");
293 0           my $st = kill($sig, $svc->{pid});
294            
295 0           return $st;
296             }
297              
298             sub _signal_all_svc {
299 0     0     my ($self, $sig, $cv) = @_;
300            
301 0           my $debug = $self->{log}->logger(8);
302 0           $debug->("Received signal $sig");
303 0           my $is_any_alive = 0;
304 0           foreach my $key (keys %{ $self->{run} }) {
  0            
305 0           my $svc = $self->{run}->{$key};
306 0 0         next unless my $pid = $svc->{pid};
307 0           $debug->("... sent signal $sig to pid $pid");
308 0           $is_any_alive++;
309 0           kill($sig, $pid);
310             }
311              
312 0 0 0       return if $cv and $is_any_alive;
313              
314 0           $debug->('Exiting...');
315 0 0         $cv->send if $cv;
316             }
317              
318             # Contolling socket
319             sub _listener {
320 0     0     my $self = shift;
321            
322 0           my $debug = $self->{log}->logger(8);
323 0           my ($host, $port) = parse_hostport($self->{conf}->{listen});
324 0 0 0       croak "Socket \'$port\' already in use" if ($host eq 'unix/' && -e $port);
325            
326             $self->{server} = tcp_server $host, $port,
327 0     0     sub { $self->_client_conn(@_) },
328             sub {
329 0     0     my ($fh, $host, $port) = @_;
330 0           $debug->("Listening at $host:$port");
331 0           };
332             }
333              
334             sub _client_conn {
335 0     0     my ($self, $fh, $host, $port) = @_;
336            
337 0 0         return unless $fh;
338 0           my $debug = $self->{log}->logger(8);
339 0           $debug->("New connection to $host:$port");
340            
341 0           my $hdl; $hdl = AnyEvent::Handle->new(
342             fh => $fh,
343             timeout => 30,
344             rbuf_max => 64,
345             wbuf_max => 64,
346 0     0     on_read => sub { $self->_client_input($hdl) },
347 0     0     on_eof => sub { $self->_client_disconn($hdl) },
348 0     0     on_timeout => sub { $self->_client_error($hdl, undef, 'Timeout') },
349 0     0     on_error => sub { $self->_client_error($hdl, undef, $!) }
350 0           );
351 0           $self->{conn}->{fileno($fh)} = $hdl;
352            
353 0           return $fh;
354             }
355              
356             sub _client_input {
357 0     0     my ($self, $hdl) = @_;
358            
359             $hdl->push_read(line => sub {
360 0     0     my ($hdl, $ln) = @_;
361            
362 0           my $client = $self->{conn}->{fileno($hdl->fh)};
363 0           my $cmds = $self->{cmds};
364 0 0         if ($ln) {
365             # generic commands
366 0           $hdl->push_write("\n");
367 0 0         if ($ln =~ /^(\.|quit)$/) {
    0          
    0          
368 0           $self->_client_disconn($hdl);
369             }
370             elsif ($ln eq 'status') {
371 0           $self->_status($hdl);
372             }
373             elsif (index($ln, ' ') >= 0) {
374 0           my ($sw, $svc) = split(' ', $ln);
375 0 0 0       if ($sw && $svc) {
376 0           my $st;
377 0 0 0       if ($self->{run}->{$svc} && ref $cmds->{$sw} eq 'CODE') {
378 0           $svc = $self->{run}->{$svc};
379 0           $st = $cmds->{$sw}->($svc);
380             }
381             else {
382 0           $hdl->push_write("$ln unknown\n");
383 0           return;
384             }
385             # response
386 0 0         $st = ref $st eq 'ARRAY' ? join(' ', @$st) : $st;
387 0 0         $st = $st ? $st : 'fail';
388 0 0         $hdl->push_write("$ln $st\n") if $st;
389             }
390             }
391             else {
392 0           $hdl->push_write("$ln unknown\n");
393             }
394             }
395 0           });
396             }
397              
398             sub _client_disconn {
399 0     0     my ($self, $hdl) = @_;
400            
401 0           my $debug = $self->{log}->logger(8);
402 0           delete $self->{conn}->{fileno($hdl->fh)};
403 0           $hdl->destroy();
404 0           $debug->("Connection closed");
405             }
406              
407             sub _client_error {
408 0     0     my ($self, $hdl, $fatal, $msg) = @_;
409            
410 0           my $debug = $self->{log}->logger(8);
411 0           delete $self->{conn}->{fileno($hdl->fh)};
412 0           $debug->("Connection error: $msg");
413 0           $hdl->destroy();
414             }
415              
416             sub _client_cmds {
417 0     0     my $self = shift;
418            
419             my $cmds = {
420             up => sub {
421 0 0   0     unless ($_[0]->{pid}) {
422 0           delete $_[0]->{once};
423 0           return $self->_start_svc($_[0]);
424             }
425             },
426             once => sub {
427 0 0   0     unless ($_[0]->{pid}) {
428 0           $_[0]->{once} = 1;
429 0           return $self->_start_svc($_[0]);
430             }
431             },
432             down => sub {
433 0 0   0     return $self->_stop_svc($_[0]) if $_[0]->{pid};
434             },
435             pause => sub {
436 0 0   0     return $self->_signal_svc($_[0], 'STOP') if $_[0]->{pid};
437             },
438             cont => sub {
439 0 0   0     return $self->_signal_svc($_[0], 'CONT') if $_[0]->{pid};
440             },
441             hup => sub {
442 0 0   0     return $self->_signal_svc($_[0], 'HUP') if $_[0]->{pid};
443             },
444             alarm => sub {
445 0 0   0     return $self->_signal_svc($_[0], 'ALRM') if $_[0]->{pid};
446             },
447             int => sub {
448 0 0   0     return $self->_signal_svc($_[0], 'INT') if $_[0]->{pid};
449             },
450             quit => sub {
451 0 0   0     return $self->_signal_svc($_[0], 'QUIT') if $_[0]->{pid};
452             },
453             usr1 => sub {
454 0 0   0     return $self->_signal_svc($_[0], 'USR1') if $_[0]->{pid};
455             },
456             usr2 => sub {
457 0 0   0     return $self->_signal_svc($_[0], 'USR2') if $_[0]->{pid};
458             },
459             term => sub {
460 0 0   0     return $self->_signal_svc($_[0], 'TERM') if $_[0]->{pid};
461             },
462             kill => sub {
463 0 0   0     return $self->_signal_svc($_[0], 'KILL') if $_[0]->{pid};
464             },
465             status => sub {
466 0 0 0 0     if ($_[0]->{pid} && $_[0]->{start_ts}) {
    0          
467             return([
468 0           $_[0]->{state},
469             $_[0]->{pid},
470             time - $_[0]->{start_ts}
471             ]);
472             }
473             elsif ($_[0]->{start_count}) {
474             return([
475 0           $_[0]->{state},
476             $_[0]->{start_count}
477             ]);
478             }
479             else {
480 0           return $_[0]->{state};
481             }
482             }
483 0           };
484            
485 0           return $cmds;
486             }
487              
488             # Commands status
489             sub _status {
490 0     0     my ($self, $hdl) = @_;
491            
492 0 0 0       return unless ($hdl && ref $self->{cmds}->{status} eq 'CODE');
493 0           foreach my $key (keys %{ $self->{run} }) {
  0            
494 0           my $st = $self->{cmds}->{status}->($self->{run}->{$key});
495 0 0         $st = ref $st eq 'ARRAY' ? join(' ', @$st) : $st;
496 0           $hdl->push_write("$key $st\n");
497             }
498 0           $hdl->push_write("\n");
499             }
500              
501             1;
502              
503             __END__