File Coverage

lib/ControlFreak/Proxy.pm
Criterion Covered Total %
statement 208 301 69.1
branch 40 88 45.4
condition 14 32 43.7
subroutine 32 51 62.7
pod 11 26 42.3
total 305 498 61.2


line stmt bran cond sub pod time code
1             package ControlFreak::Proxy;
2 8     8   47 use strict;
  8         17  
  8         288  
3 8     8   44 use warnings;
  8         17  
  8         234  
4              
5 8     8   46 use AnyEvent::Util();
  8         15  
  8         121  
6 8     8   41 use Carp;
  8         14  
  8         574  
7 8     8   50 use ControlFreak::Util;
  8         25  
  8         297  
8 8     8   40 use Fcntl qw(F_GETFD F_SETFD FD_CLOEXEC);
  8         21  
  8         666  
9 8     8   46 use JSON::XS;
  8         16  
  8         498  
10 8     8   44 use Object::Tiny qw{ name cmd pid is_running env auto };
  8         12  
  8         59  
11 8     8   2314 use Params::Util qw{ _ARRAY _STRING };
  8         18  
  8         456  
12 8     8   41 use POSIX 'SIGTERM';
  8         15  
  8         120  
13 8     8   423 use Scalar::Util();
  8         17  
  8         159  
14 8     8   47 use Try::Tiny;
  8         13  
  8         34440  
15              
16             =pod
17              
18             =head1 NAME
19              
20             ControlFreak::Proxy - Delegate some control to an intermediary process.
21              
22             =head1 DESCRIPTION
23              
24             There are some cases where you want some services managed in a special way,
25             and it makes no sense to implement this in C itself.
26              
27             Indeed, one design trait of B is its absolute simplicity, we
28             don't want to clutter it with features that are only rarely used or that
29             could make the controller unstable.
30              
31             One example of that is Memory Sharing. If you have 20 application processes
32             running on one machine all having the same code running, there is a
33             memory benefit into making sure the app is loaded in the parent process
34             of all these applications. Indeed, it would allow all children to initially
35             share parent code and thus potentially reduce the memory footprint of the
36             application by quite a while, maybe. But, it's out of question for the
37             C to load that code in its own memory. A better solution is to use
38             a C separate process that will:
39              
40             =over 4
41              
42             =item * load the application code once and for all
43              
44             =item * take commands from the main C (over pipes)
45              
46             =item * fork children when instructed, that exec some user defined commands
47              
48             =back
49              
50             =head1 SYNOPSIS
51              
52             $proxy = ControlFreak::Proxy->new(
53             ctrl => $ctrl,
54             cmd => '/usr/bin/cfk-share-mem-proxy.pl --preload Some::Module',
55              
56             );
57             $proxy->add_service($svc);
58             $proxy->destroy_service($svc);
59             $proxy->run;
60             $proxy->start_service($svc);
61             $proxy->stop_service($svc);
62             @list = $proxy->services;
63             $proxy->shutdown;
64             $proxy->is_running;
65              
66             =head1 METHODS
67              
68             =head2 new(%param)
69              
70             =cut
71              
72             sub new {
73 3     3 1 23 my $class = shift;
74 3         11 my %param = @_;
75              
76 3         7 my $ctrl = $param{ctrl};
77 3 50       10 unless ($ctrl) {
78 0         0 warn "Proxy creation attempt without ctrl";
79 0         0 return;
80             }
81              
82 3 50       10 unless ($param{name}) {
83 0         0 $ctrl->log->error("Proxy creation attempt without a name");
84 0         0 return;
85             }
86              
87 3         22 my $proxy = $class->SUPER::new(%param);
88 3         28 $proxy->{ctrl} = $ctrl;
89 3         9 $proxy->{servicemap} = {};
90 3   50     18 $proxy->{env} ||= {};
91 3 100       10 unless (defined $param{auto}) {
92 2         6 $proxy->{auto} = 1; # proxy is 'auto' by default
93             }
94 3 50       15 unless ($ctrl->add_proxy($proxy)) {
95 0         0 $ctrl->log->error("A proxy by that name already exists");
96 0         0 return;
97             }
98 3         17 Scalar::Util::weaken($proxy->{ctrl});
99 3         9 return $proxy;
100             }
101              
102             =head2 status_as_text
103              
104             Returns the status of the proxy, including its eventual pid in one line of
105             text, where the following fields are seperated with tabs:
106              
107             =over 4
108              
109             =item * name
110              
111             =item * status ('up' or 'down')
112              
113             =item * pid, if proxy is up
114              
115             =back
116              
117             =cut
118              
119             sub status {
120 0     0 1 0 my $proxy = shift;
121 0 0       0 return $proxy->is_running ? "up" : "down";
122             }
123              
124             sub status_as_text {
125 0     0 1 0 my $proxy = shift;
126 0 0       0 return join "\t", map { $proxy->$_ || "" } qw/name status pid/;
  0         0  
127             }
128              
129 1     1   14 sub _err { ControlFreak::Util::error(@_) }
130              
131             =head2 services
132              
133             Returns a list of L objects related to the
134             proxy.
135              
136             =cut
137             sub services {
138 6     6 1 24 my $proxy = shift;
139 6         8 return values %{ $proxy->{servicemap} };
  6         48  
140             }
141              
142             =head2 add_service($svc)
143              
144             Declares a service under the control of the proxy.
145              
146             =cut
147              
148             sub add_service {
149 3     3 1 1927 my $proxy = shift;
150 3         7 my $svc = shift;
151 3         83 $proxy->{servicemap}->{$svc->name} = $svc;
152 3         29 $svc->assign_proxy($proxy);
153 3         6 return 1;
154             }
155              
156             =head2 start_service
157              
158             Given a L, check that it is effectively
159             under the control of a L object and contact the
160             later to instruct it to start the service on our behalf.
161              
162             =cut
163              
164             sub start_service {
165 5     5 1 19 my $proxy = shift;
166 5         41 my %param = @_;
167              
168 5         15 my $svc = $param{service};
169              
170 5         146 my $name = $svc->name;
171 5 50 33     77 unless ($svc->{proxy} && $svc->{proxy} eq $proxy) {
172 0         0 return $proxy->_err(
173             %param, "Cannot start svc '$name': inappropriate proxy"
174             );
175             }
176 5 100       153 unless ($proxy->is_running) {
177 1 50       23 if ($proxy->auto) {
178 1         9 $proxy->run;
179             }
180             else {
181 0         0 return $proxy->_err(
182             %param, "Proxy is not running, not starting service '$name'"
183             );
184             }
185             }
186 5         351 my $hdl = $proxy->{command_hdl};
187 5         159 my $descr = {
188             command => 'start',
189             cmd => $svc->cmd,
190             name => $svc->name,
191             ignore_stderr => $svc->ignore_stderr,
192             ignore_stdout => $svc->ignore_stdout,
193             env => $svc->env,
194             tie_stdin_to => $svc->tie_stdin_to,
195             no_new_session => $svc->no_new_session,
196             };
197 5         352 my $string = encode_json($descr);
198 5         61 $hdl->push_write("$string\n");
199             }
200              
201             sub stop_service {
202 1     1 0 3 my $proxy = shift;
203 1         5 my %param = @_;
204 1         2 my $svc = $param{service};
205              
206 1         29 my $pname = $proxy->name;
207 1         223 my $sname = $svc->name;
208 1 50       33 return $proxy->_err(%param, "proxy '$pname' not running for '$sname'")
209             unless $proxy->is_running;
210              
211 1         9 my $hdl = $proxy->{command_hdl};
212 1 50       5 unless ($hdl) {
213             ## TODO: cleanup?
214 0         0 $proxy->{ctrl}->log->error("proxy '$pname' is gone");
215 0         0 return;
216             }
217 1         100 my $descr = {
218             command => 'stop',
219             name => $svc->name,
220             };
221 1         105 my $string = encode_json($descr);
222 1         11 $hdl->push_write("$string\n");
223             }
224              
225             sub unset {
226 0     0 0 0 my $proxy = shift;
227 0         0 my $attr = shift;
228 0         0 $proxy->{$attr} = undef;
229 0         0 return 1;
230             }
231              
232             sub setup_environment {
233 0     0 0 0 my $proxy = shift;
234 0         0 my $env = $proxy->env;
235 0 0       0 return unless $env;
236 0 0       0 return unless ref $env eq 'HASH';
237 0         0 while (my ($k, $v) = each %$env) {
238 0         0 $ENV{$k} = $v;
239             }
240 0         0 return 1;
241             }
242              
243             sub set_add_env {
244 0     0 0 0 my $proxy = shift;
245 0 0       0 my $value = _STRING($_[0]) or return;
246 0         0 my ($key, $val) = split /=/, $value, 2;
247 0         0 $proxy->{ctrl}->log->debug( "Setting ENV{$key} to '$val'" );
248 0         0 $proxy->add_env($key, $val);
249             }
250              
251             =head2 add_env($key => $value)
252              
253             Adds an environment key, value pair to the proxy
254              
255             =cut
256              
257             sub add_env {
258 0     0 1 0 my $proxy = shift;
259 0         0 my ($key, $value) = @_;
260 0         0 $proxy->env->{$key} = $value;
261 0         0 return 1;
262             }
263              
264             =head2 clear_env()
265              
266             Resets proxy environment to empty.
267              
268             =cut
269              
270             sub clear_env {
271 0     0 1 0 my $proxy = shift;
272 0         0 $proxy->{env} = {};
273             }
274              
275             sub set_cmd {
276 2 50   2 0 499 my $value = (ref $_[1] ? _ARRAY($_[1]) : _STRING($_[1])) or return;
    50          
277 2         11 shift->_set('cmd', $value);
278             }
279              
280             sub set_cmd_from_con {
281 0     0 0 0 my $proxy = shift;
282 0         0 my $value = shift;
283 0 0       0 return $proxy->unset('cmd') unless defined $value;
284 0 0       0 if ($value =~ /^\[/) {
285 0     0   0 $value = try { decode_json($value) }
286             catch {
287 0     0   0 my $error = $_;
288 0         0 $proxy->{ctrl}->log->error("Invalid JSON: $error");
289 0         0 return;
290 0         0 };
291             }
292 0         0 return $proxy->set_cmd($value);
293             }
294              
295             sub set_desc {
296 0 0   0 0 0 my $value = _STRING($_[1]) or return;
297 0         0 $value =~ s/[\n\r\t\0]+//g; ## desc should be one line
298 0         0 shift->_set('desc', $value);
299             }
300              
301             sub set_noauto {
302 0     0 0 0 my $value = _STRING($_[1]);
303 0 0       0 return unless defined $value;
304 0         0 shift->_set('auto', !$value);
305             }
306              
307             sub _set {
308 2     2   4 my $proxy = shift;
309 2         7 my ($attr, $value) = @_;
310              
311 2         56 my $old = $proxy->$attr;
312              
313 2 50       14 my $v = defined $value ? $value : "~";
314 2         10 local $Data::Dumper::Indent = 0;
315 2         10 local $Data::Dumper::Terse = 1;
316 2 50       7 if (ref $v) {
317 0         0 $v = Data::Dumper::Dumper($v);
318             }
319 2 100       7 if ($old) {
320 1 50       5 my $oldv = defined $old ? $old : "~";
321 1 50       10 $oldv = Data::Dumper::Dumper($oldv) if ref $oldv;
322 1         30 $proxy->{ctrl}->log->debug( "Changing $attr from '$oldv' to '$v'" );
323             }
324             else {
325 1         20 $proxy->{ctrl}->log->debug( "Setting $attr to '$v'" );
326             }
327 2         136 $proxy->{$attr} = $value;
328 2         17 return 1;
329             }
330              
331              
332             ## open in the proxy process before exec
333             sub prepare_child_fds {
334 0     0 0 0 my $proxy = shift;
335 0         0 my ($cr, $sw, $lw) = @_;
336              
337 0         0 $proxy->no_close_on_exec($_) for ($cr, $sw, $lw);
338              
339 0         0 $ENV{_CFK_COMMAND_FD} = fileno $cr;
340 0         0 $ENV{_CFK_STATUS_FD} = fileno $sw;
341 0         0 $ENV{_CFK_LOG_FD} = fileno $lw;
342              
343 0         0 $proxy->write_sockets_to_env;
344             }
345              
346             sub write_sockets_to_env {
347 0     0 0 0 my $proxy = shift;
348              
349 0         0 my $ctrl = $proxy->{ctrl};
350 0         0 for my $socket ($ctrl->sockets) {
351 0 0       0 my $fh = $socket->fh or next;
352              
353 0         0 $proxy->no_close_on_exec($fh);
354 0         0 my $prefix = "_CFK_SOCK_";
355 0         0 my $name = $prefix . $socket->name;
356 0         0 $ENV{$name} = fileno $fh;
357             }
358             }
359              
360             sub no_close_on_exec {
361 0     0 0 0 my $proxy = shift;
362 0         0 my $fh = shift;
363 0         0 my $flags = fcntl($fh, F_GETFD, 0);
364 0         0 fcntl($fh, F_SETFD, $flags & ~FD_CLOEXEC);
365             }
366              
367             =head2 run
368              
369             Runs the proxy command.
370              
371             =cut
372              
373             sub run {
374 5     5 1 42869 my $proxy = shift;
375 5         12 my %param = @_;
376 5   50 1   46 my $err = $param{err_cb} ||= sub {};
  1         5  
377              
378 5         119 my $name = $proxy->name;
379 5 100       119 return $proxy->_err(%param, "Proxy '$name' can't run: no command")
380             unless $proxy->cmd;
381              
382 4         25 $proxy->{is_running} = 1;
383              
384             ## Command, Status and Log pipes
385 4         31 my ($cr, $cw) = AnyEvent::Util::portable_pipe;
386 4         196 my ($sr, $sw) = AnyEvent::Util::portable_pipe;
387 4         98 my ($lr, $lw) = AnyEvent::Util::portable_pipe;
388              
389 4         119 AnyEvent::Util::fh_nonblocking($_, 1) for ($sr, $cw, $lr);
390              
391 4         158 my $cmd = $proxy->cmd;
392              
393             ## XXX redir std /dev/null
394             $proxy->{proxy_cv} = AnyEvent::Util::run_cmd(
395             $cmd,
396             '$$' => \$proxy->{pid},
397             close_all => 0,
398             on_prepare => sub {
399 0     0   0 $proxy->setup_environment;
400 0         0 $proxy->prepare_child_fds($cr, $sw, $lw);
401             },
402 4         53 );
403              
404             $proxy->{proxy_cv}->cb( sub {
405 3     3   744 my $es = shift()->recv;
406 3         41 $proxy->{proxy_cv} = undef;
407 3         8 $proxy->{pid} = undef;
408 3         20 $proxy->{exit_status} = $es;
409 3         324 my $name = $proxy->name;
410 3         25 my $state;
411 3 100 66     69 if (POSIX::WIFEXITED($es) && !POSIX::WEXITSTATUS($es)) {
    50 33        
412 1         33 $proxy->{ctrl}->log->info("proxy '$name' exited");
413             }
414             elsif (POSIX::WIFSIGNALED($es) && POSIX::WTERMSIG($es) == SIGTERM) {
415 2         72 $proxy->{ctrl}->log->info("proxy '$name' gracefully killed");
416             }
417             else {
418 0         0 my $r = ControlFreak::Util::exit_reason($es);
419 0         0 $proxy->{ctrl}->log->info(
420             "proxy '$name' abnormal termination " . $r
421             );
422             }
423              
424 3         481 $proxy->has_stopped;
425 4         16638 });
426 4         216 close $cr;
427 4         87 close $sw;
428 4         29 close $lw;
429              
430 4         24 $proxy->{status_fh} = $sr;
431 4         46 $proxy->{log_fh} = $lr;
432 4     84   184 $proxy->{status_cv} = AE::io $sr, 0, sub { $proxy->read_status };
  84         61281  
433 4     79   123 $proxy->{log_cv} = AE::io $lr, 0, sub { $proxy->read_log };
  79         5834  
434              
435             $proxy->{command_hdl} = AnyEvent::Handle->new(
436             fh => $cw,
437             on_error => sub {
438 0     0   0 my ($h, $fatal, $message) = @_;
439 0   0     0 $proxy->{ctrl}->log->error($message || "unknown proxy error");
440 0 0       0 if ($fatal) {
441 0         0 $proxy->{ctrl}->log->error("Proxy fatal error");
442 0         0 $proxy->shutdown;
443 0         0 undef $h;
444             }
445             },
446 4         274 );
447             }
448              
449             =head2 shutdown
450              
451             Quits the proxy (and consequently stops all related services).
452              
453             =cut
454              
455             sub shutdown {
456 3     3 1 1955 my $proxy = shift;
457 3         11 my %param = @_;
458              
459 3   50 3   58 my $ok = $param{ok_cb} ||= sub {};
  3         7  
460 3   50 0   42 my $err = $param{err_cb} ||= sub {};
  0         0  
461              
462 3         110 my $name = $proxy->name;
463 3         104 $proxy->{ctrl}->log->info("shutting down proxy '$name'");
464 3         595 $proxy->{command_hdl} = undef;
465              
466 3 50       48 if (my $pid = $proxy->pid) {
467 3         532 kill 'TERM', $pid;
468             }
469             ## eventually mark it has dead
470 3     0   62 $proxy->{shutdown_cv} = AE::timer 3, 0, sub { $proxy->has_stopped(1) };
  0         0  
471              
472 3         18 $ok->();
473 3         28 return 1;
474             }
475              
476             sub read_log {
477 79     79 0 216 my $proxy = shift;
478 79 50       270 my $log_fh = $proxy->{log_fh} or return;
479 79         86 my @logs;
480 79         607 while (<$log_fh>) {
481 6         35 push @logs, $_;
482             }
483 79         140 for (@logs) {
484 6 50       23 next unless $_;
485 6         23 $proxy->process_log($_);
486             }
487 79         391 return;
488             }
489              
490             sub read_status {
491 84     84 0 133 my $proxy = shift;
492 84 50       345 my $status_fh = $proxy->{status_fh} or return;
493 84         97 my @statuses;
494 84         827 while (<$status_fh>) {
495 8         91 push @statuses, $_;
496             }
497 84         279 for (@statuses) {
498 8 50       30 next unless $_;
499 8         44 $proxy->process_status($_);
500             }
501 84         326 return;
502             }
503              
504             sub process_status {
505 8     8 0 18 my $proxy = shift;
506 8         19 my $json_data = shift;
507              
508 8         42 my $ctrl = $proxy->{ctrl};
509 8         447 $ctrl->log->debug("Got a new status: $json_data");
510              
511 8         1112 my $data = decode_json($json_data);
512              
513 8         259 my $pname = $proxy->name;
514 8   50     147 my $name = $data->{name} || "";
515 8         35 my $svc = $proxy->{servicemap}{$name};
516 8         23 my $status = $data->{status};
517              
518 8 100 66     129 if ($status && $status eq 'started') {
    50 33        
519 4         14 my $pid = $data->{pid};
520 4 50       16 unless ($pid) {
521 0         0 $ctrl->log->fatal("Started '$name' without pid!");
522             }
523 4         44 $svc->assign_pid( $pid );
524 4         24 $svc->set_check_running_state_timer;
525             }
526             elsif ($status && $status eq 'stopped') {
527 4         55 $svc->acknowledge_exit($data->{exit_status});
528 4 100       200 if ($proxy->auto) {
529 1         100 my @up = grep { $_->is_up } $proxy->services;
  1         6  
530 1 50       7 unless (@up) {
531 1         5 $proxy->shutdown;
532             }
533             }
534             }
535             else {
536 0         0 $ctrl->log->fatal( "Unknown status '$status' sent to proxy '$pname'");
537             }
538             }
539              
540             sub process_log {
541 6     6 0 10 my $proxy = shift;
542 6         11 my $log_data = shift;
543              
544 6         12 my $ctrl = $proxy->{ctrl};
545 6         31 my ($type, $svcname, $msg) = split ':', $log_data, 3;
546              
547 6 50 33     48 if ($svcname && $svcname eq '-') {
548             ## this is a proxy log
549 0         0 $ctrl->log->proxy_log([ $type, $proxy, $msg ]);
550 0         0 return;
551             }
552 6         35 my $svc = $ctrl->service($svcname);
553 6 50       24 unless ($svc) {
554 6   50     16 $svcname ||= "";
555 6         13 chomp $msg;
556 6         1103 $ctrl->log->error("Cannot find svc '$svcname' for proxy log. [$msg]");
557 6         672 return;
558             }
559 0         0 $ctrl->log->proxy_svc_log([ $type, $svc, $msg ]);
560 0         0 return;
561             }
562              
563             =head2 has_stopped
564              
565             Called when the proxy has exited. It performs a number
566             of cleaning tasks.
567              
568             =cut
569              
570             sub has_stopped {
571 3     3 1 10 my $proxy = shift;
572 3         8 my $finally = shift;
573              
574             ## ignore if already dead
575 3 50       18 return unless $proxy->{is_running};
576              
577             ## cancel timer
578 3         18 $proxy->{shutdown_cv} = undef;
579              
580 3 50       28 if ($finally) {
581 0         0 my $pname = $proxy->name;
582 0         0 $proxy->{ctrl}->log->warn("Proxy '$pname' didn't clean after itself?");
583             }
584             ## not running anymore, obviously
585 3         9 $proxy->{is_running} = 0;
586 3         8 $proxy->{pid} = undef;
587              
588 3         7 $proxy->{proxy_cv} = undef;
589 3         7 $proxy->{status_cv} = undef;
590 3         52 $proxy->{status_fh} = undef;
591 3         105 $proxy->{log_cv} = undef;
592 3         161 $proxy->{log_fh} = undef;
593              
594             ## no matter what, clean the mess
595 3         60 for my $svc ($proxy->services) {
596 3         30 $svc->has_stopped("proxy stopped");
597             }
598             }
599              
600             1;