File Coverage

blib/lib/POE/Component/Supervisor/Handle/Proc.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             #!/usr/bin/perl
2              
3             package POE::Component::Supervisor::Handle::Proc;
4 1     1   2541 use MooseX::POE;
  0            
  0            
5              
6             use POSIX qw(WIFSIGNALED WIFEXITED WEXITSTATUS WTERMSIG);
7              
8             use POE::Wheel::Run;
9              
10             use namespace::clean -except => 'meta';
11              
12             with qw(
13             POE::Component::Supervisor::Handle
14             POE::Component::Supervisor::LogDispatch
15             );
16              
17             use Time::HiRes qw(time);
18              
19             use Scalar::Util ();
20              
21             has wheel_parameters => (
22             isa => "HashRef",
23             is => "ro",
24             auto_deref => 1,
25             default => sub { +{ } },
26             );
27              
28             has enable_nested_poe => (
29             isa => "Bool",
30             is => "ro",
31             default => 1,
32             );
33              
34             has start_nested_poe => (
35             isa => "Bool",
36             is => "ro",
37             default => 1,
38             );
39              
40             has [map { "std${_}_callback" } qw(out err in)] => (
41             isa => "CodeRef",
42             is => "rw",
43             required => 0,
44             );
45              
46             has program => (
47             isa => 'ArrayRef|CodeRef',
48             is => "ro",
49             required => 1,
50             );
51              
52             has until_term => (
53             isa => "Num|Undef",
54             is => "ro",
55             default => 0.1,
56             );
57              
58             has until_kill => (
59             isa => "Num|Undef",
60             is => "ro",
61             default => 10,
62             );
63              
64             has wait_for => (
65             isa => "Num|Undef",
66             is => "ro",
67             lazy => 1,
68             predicate => "has_wait_for",
69             default => sub {
70             my $self = shift;
71             5 + ( $self->until_kill || $self->until_term || 0 );
72             },
73             );
74              
75             has _wheel => (
76             isa => "POE::Wheel::Run",
77             is => "rw",
78             init_arg => undef,
79             clearer => "_clear_wheel",
80             );
81              
82             has pid => (
83             isa => "Int",
84             is => "ro",
85             init_arg => undef,
86             writer => "_pid",
87             );
88              
89             has exited => (
90             isa => "Int",
91             is => "rw",
92             init_arg => undef,
93             required => 0,
94             predicate => "has_exited",
95             writer => "_exited",
96             );
97              
98             has exite_code => (
99             isa => "Int",
100             is => "rw",
101             init_arg => undef,
102             required => 0,
103             predicate => "has_exit_code",
104             writer => "_exit_code",
105             );
106              
107             has exit_signal => (
108             isa => "Int",
109             is => "rw",
110             init_arg => undef,
111             required => 0,
112             predicate => "has_exit_signal",
113             writer => "_exit_signal",
114             );
115              
116             sub STOP {
117             $_[OBJECT]->logger->debug("stopping child handle session $_[SESSION]");
118             }
119              
120             sub START {
121             my ( $self, $kernel ) = @_[OBJECT, KERNEL];
122              
123             $kernel->refcount_increment( $self->get_session_id, __PACKAGE__ );
124              
125             my $program = $self->_wrapped_program;
126              
127             my $wheel = POE::Wheel::Run->new(
128             StderrEvent => "stderr",
129             StdoutEvent => "stdout",
130             StdinEvent => "stdin",
131             $self->wheel_parameters,
132             Program => $program,
133             );
134            
135             my $pid = $wheel->PID;
136              
137             $self->_wheel($wheel);
138             $self->_pid($pid);
139              
140             $self->notify_spawn( pid => $pid );
141              
142             $kernel->sig_child( $wheel->PID, "child_exit" );
143             }
144              
145             sub _wrapped_program {
146             my ( $self, $program ) = @_;
147              
148             $program ||= $self->program;
149              
150             if ( ref($program) eq 'CODE' ) {
151             if ( $self->enable_nested_poe ) {
152             my $also_start = $self->start_nested_poe;
153             return sub {
154             my @args = @_;
155              
156             $poe_kernel->stop;
157              
158             $program->(@args);
159              
160             $poe_kernel->run if $also_start;
161             },
162             }
163             }
164              
165             return $program;
166             }
167              
168             foreach my $event (qw(stdout stderr stdin)) {
169             my $cb_name = "${event}_callback";
170             event $event => sub {
171             if ( my $cb = $_[OBJECT]->$cb_name ) {
172             $cb->(@_);
173             }
174             };
175             }
176              
177             event child_exit => sub {
178             my ( $self, $exit ) = @_[OBJECT, ARG2];
179              
180             my $exit_code = WIFEXITED($exit) ? WEXITSTATUS($exit) : undef;
181             my $exit_signal = WIFSIGNALED($exit) ? WTERMSIG($exit) : undef;
182              
183             $self->_exited($exit);
184             $self->_exit_code($exit_code) if defined $exit_code;
185             $self->_exit_signal($exit_signal) if defined($exit_signal);
186              
187             $self->logger->info("child exited with status " . ($exit_code || "undef") . " ($exit), notifying supervisor");
188              
189             $self->notify_stop(
190             pid => $self->pid,
191             exit => $exit,
192             exit_code => $exit_code,
193             exit_signal => $exit_signal,
194             );
195              
196             $self->call("_cleanup");
197              
198             };
199              
200             event _cleanup => sub {
201             my ( $self, $kernel ) = @_[OBJECT, KERNEL];
202              
203             if ( my $wheel = $self->_wheel ) {
204             $wheel->shutdown_stdin;
205             $self->_clear_wheel;
206             }
207            
208             $kernel->alarm_remove_all();
209              
210             $kernel->refcount_decrement( $self->get_session_id, __PACKAGE__ );
211             };
212              
213             sub stop {
214             my $self = shift;
215              
216             $self->call("_stop_child");
217             }
218              
219             event _stop_child => sub {
220             my ( $self, $kernel, $heap ) = @_[OBJECT, KERNEL, HEAP];
221              
222             $self->call("_close_stdin");
223              
224             my $now = time;
225              
226             my ( $until_term, $until_kill ) = ( $self->until_term, $self->until_kill );
227              
228             my $start_term = defined($until_term) && $now + $until_term;
229             my $start_kill = defined($until_kill) && $now + $until_kill;
230              
231             my $give_up = $self->has_wait_for && $now + $self->wait_for;
232              
233             $kernel->alarm_set( _term_loop => $start_term, $start_kill || $give_up ) if $start_term;
234              
235             $kernel->alarm_set( _kill_loop => $start_kill, $give_up ) if $start_kill;
236              
237             $kernel->alarm_set( _couldnt_kill => $give_up ) if $give_up;
238             };
239              
240             event _close_stdin => sub {
241             my ( $self, $kernel ) = @_[OBJECT, KERNEL];
242              
243             $self->logger->info("closing child stdin");
244              
245             if ( my $wheel = $self->_wheel ) {
246             $wheel->shutdown_stdin;
247             }
248             };
249              
250             foreach my $sig (qw(term kill)) {
251             my $SIG = uc($sig);
252              
253             my $event = "_${sig}_loop";
254              
255             event $event => sub {
256             my ( $self, $kernel, $until, $iter ) = @_[OBJECT, KERNEL, ARG0 .. $#_];
257              
258             $iter ||= 0;
259             my $delay = 2 ** $iter / 10; # exponential back off
260             my $next_attempt = time() + $delay;
261              
262             if ( !defined($until) or $next_attempt < $until ) {
263             $kernel->alarm_set( $event, $next_attempt, $until, $iter + 1 );
264             } else {
265             undef $delay;
266             }
267              
268             $self->logger->info("sending SIG$SIG, attempt #" . ( $iter + 1) . ( $delay ? ", next attempt in $delay" : " (last attempt)" ));
269              
270             $self->_wheel->kill($SIG);
271             };
272             }
273              
274             event _couldnt_kill => sub {
275             die "couldn't kill child";
276             };
277              
278             sub is_running {
279             my $self = shift;
280             not $self->has_exited;
281             }
282              
283             __PACKAGE__
284              
285             __END__
286              
287             =pod
288              
289             =head1 NAME
290              
291             POE::Component::Supervisor::Handle::Proc - A supervisor child handle for a POSIXish process.
292              
293             =head1 SYNOPSIS
294              
295             # created by POE::Component::Supervisor::Supervised::Proc
296              
297             =head1 DESCRIPTION
298              
299             These objects manage a real UNIX process (signalling, monitoring) within a
300             L<POE::Component::Supervisor>.
301              
302             =head1 SIGNALLING
303              
304             In order to kill a child process first the child's standard input is closed,
305             then the C<TERM> signal is sent, and after a wait period the C<KILL> signal is
306             sent.
307              
308             If the child has not died by the time the C<KILL> loop times out then an error
309             is thrown (this happens under weird OS scenarios and shouldn't happen
310             normally).
311              
312             The attributes C<until_term>, C<until_kill> and C<wait_for> determine the
313             durations of these loops.
314              
315             Initially inputs will be closed. Then, after C<until_term> seconds have passed
316             the C<TERM> sending loop will start, sending the C<TERM> signal with an
317             exponential backoff.
318              
319             When C<until_kill> seconds have passed, from the time of the C<stop> method
320             being called, the C<TERM> loop will be stopped, and instead the C<KILL> signal
321             will be sent, also with an exponential backoff.
322              
323             From the time of the C<stop> method being called the handle will wait for a
324             maximum of C<wait_for> seconds before giving up on the child process.
325              
326             ANy of these attributes may be set to C<undef> to disable their corresponding
327             behaviors (suppress sending of a certain signal, or wait indefinitely).
328              
329             =head1 ATTRIBUTES
330              
331             B<NOTE>: All the attributes are generally passsed in by
332             L<POE::Component::Supervisor::Supervised::Proc>, the factory for this class.
333              
334             They are documented here because that is where their behavior is defined.
335              
336             L<POE::Component::Supervisor::Supervised::Proc> will borrow all the attributes
337             from this class that have an C<init_arg>, and as such they should be passed to
338             L<POE::Component::Supervisor::Supervised::Proc/new>, while this class is never
339             instantiated directly..
340              
341             =over 4
342              
343             =item until_term
344              
345             The time to wait after closing inputs, and before sending the C<TERM> signal.
346             Defaults to one tenth of a second.
347              
348             Set to C<undef> to disable sending the C<TERM> signal.
349              
350             =item until_kill
351              
352             The time to wait after closing inputs, and before sending the C<KILL> signal.
353             Defaults to 10 seconds.
354              
355             Set to C<undef> to disable sending the C<KILL> signal.
356              
357             =item wait_for
358              
359             How long to keep sending exit signals for.
360              
361             Defaults to
362              
363             5 + ( $self->until_kill || $self->until_term || 0 )
364              
365             =item enable_nested_poe
366              
367             Whether or not to call L<POE::Kernel/stop> in the child program, before the
368             callback. Only applies to code references.
369              
370             This allows a nested POE kernel to be started in the forked environment without
371             needing to C<exec> a new program.
372              
373             Defaults to true.
374              
375             =item start_nested_poe
376              
377             Whether or not to call L<POE::Kernel/run> in the child program, after the callback. Only applies to
378             code references.
379              
380             Defaults to true.
381              
382             =item program
383              
384             A coderef or an array ref. Passed as the C<Program> parameter to the wheel, but
385             may be wrapped depending on the values of C<enable_nested_poe> and
386             C<start_nested_poe> if it's a code ref.
387              
388             Required.
389              
390             =item wheel_parameters
391              
392             Additional parameters to pass to L<POE::Wheel::Run/new>.
393              
394             =item stdin_callback
395              
396             =item stdout_callback
397              
398             =item stderr_callback
399              
400             Callbacks to be fired when the corresponding L<POE::Wheel::Run> events are
401             handled.
402              
403             This only affects the default event handlers, if you ovverride those by passing
404             your own C<wheel_parameters> these callbacks will never take effect.
405              
406             The arguments are passed through as is, see L<POE::Wheel::Run> for the details.
407              
408             Not required.
409              
410             =item pid
411              
412             Read only attribute containing the process ID.
413              
414             =item exited
415              
416             =item exit_code
417              
418             =item exit_signal
419              
420             After the process has exited these read only attributes are filled in with the exit information.
421              
422             C<exited> is the raw value of C<$?>, and C<exit_code> and C<exit_signal> are
423             the values of applying C<WEXITSTATUS> and C<WTERMSIG> to that value.
424              
425             See L<POSIX> for details.
426              
427             =item use_logger_singleton
428              
429             Changes the default value of the original L<MooseX::LogDispatch> attribute to
430             true.
431              
432             =back
433              
434             =head1 METHODS
435              
436             =over 4
437              
438             =item new
439              
440             Never called directly, but called by L<POE::Component::Supervisor::Supervised::Proc>.
441              
442             =item stop
443              
444             Stop the running process
445              
446             =item is_running
447              
448             Check whether or not the process is still running.
449              
450             =back
451              
452             =head1 EVENTS
453              
454             All L<POE> events supported by this object are currently internal, and as such
455             the session corresponding to this object provides no useful L<POE> interface.
456              
457             =cut
458              
459