File Coverage

lib/Mojo/IOLoop/ReadWriteProcess/Session.pm
Criterion Covered Total %
statement 109 117 93.1
branch 27 58 46.5
condition 11 26 42.3
subroutine 37 37 100.0
pod 16 18 88.8
total 200 256 78.1


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::ReadWriteProcess::Session;
2              
3 39     39   259 use Mojo::Base 'Mojo::EventEmitter';
  39         87  
  39         258  
4 39     39   9292 use Mojo::IOLoop::ReadWriteProcess;
  39         85  
  39         406  
5 39     39   1571 use Carp 'confess';
  39         61  
  39         2727  
6 39     39   241 use POSIX qw( :sys_wait_h :signal_h );
  39         190  
  39         521  
7 39     39   17978 use Mojo::Collection 'c';
  39         92  
  39         3773  
8              
9             our @EXPORT_OK = qw(session);
10 39     39   257 use Exporter 'import';
  39         61  
  39         1541  
11              
12 39     39   197 use Config;
  39         61  
  39         3448  
13              
14 39     39   264 use constant DEBUG => $ENV{MOJO_PROCESS_DEBUG};
  39         80  
  39         3280  
15              
16             # See https://github.com/torvalds/linux/blob/master/include/uapi/linux/prctl.h
17 39     39   291 use constant PR_SET_CHILD_SUBREAPER => 36;
  39         57  
  39         2348  
18 39     39   245 use constant PR_GET_CHILD_SUBREAPER => 37;
  39         62  
  39         114197  
19              
20             has subreaper => 0;
21             has collect_status => 1;
22             has orphans => sub { {} };
23             has process_table => sub { {} };
24             has collected_info => sub { [] };
25             has 'handler';
26             has emit_from_sigchld => 1;
27              
28             my $singleton;
29              
30 428   66 428 1 2892716 sub new { $singleton ||= __PACKAGE__->SUPER::new }
31              
32             sub disable {
33 1     1 1 7 $singleton->_protect(sub { $SIG{CHLD} = $singleton->handler() });
  1     1   8  
34             }
35              
36             sub _protect {
37 915 100 66 915   20625 shift if $_[0] && $_[0] eq $singleton;
38 915 100       4198 my ($sig, $cb) = (@_ > 1 ? pop : SIGCHLD, pop);
39 915         47203 my ($sigset, $blockset) = (POSIX::SigSet->new, POSIX::SigSet->new($sig));
40 915         5825 $singleton->emit(protect => [$cb, $sig]);
41 915         17606 sigprocmask(SIG_BLOCK, $blockset, $sigset);
42 915         2738 my $r = $cb->();
43 915         11983 sigprocmask(SIG_SETMASK, $sigset);
44 915         6464 return $r;
45             }
46              
47             sub enable {
48 276 100   276 1 2204 return if $singleton->handler();
49 85         737 $singleton->handler($SIG{CHLD});
50             $singleton->_protect(
51             sub {
52             $SIG{CHLD} = sub {
53 270         34113148 local ($!, $?);
54 270         3488 $singleton->emit('SIG_CHLD');
55 270 100       8805 return unless $singleton->collect_status;
56 268         14367 while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
57 276         3891 $singleton->add_collected_info($pid, $?, $!);
58             }
59             $singleton->consume_collected_info()
60 268 100       6161 if ($singleton->emit_from_sigchld());
61             }
62 85     85   1566 });
  85         1193  
63             }
64              
65             sub _collect {
66 227     227   807 my ($self, $pid, $status, $errno) = @_;
67 227         665 my $p = $singleton->resolve($pid);
68 227         1205 $p->emit('SIG_CHLD')
69             ->emit(collect_status => $pid => $status => $errno)
70             ->emit('collected')
71             ->emit('stop');
72             }
73              
74             sub collect {
75 276     276 1 1039 my ($self, $pid, $status, $errno) = @_;
76 276 100       4682 if ($singleton->resolve($pid)) {
77 227         1489 $singleton->_collect($pid => $status => $errno);
78 227         9272 $singleton->emit(collected => $singleton->resolve($pid));
79             }
80             else {
81 49         423 $singleton->orphans->{$pid}
82             = Mojo::IOLoop::ReadWriteProcess->new(process_id => $pid)
83             ->_fork_collect_status($pid => $status => $errno);
84 49         626 $singleton->emit(collected_orphan => $singleton->orphan($pid));
85             }
86 276         3584 return $singleton;
87             }
88              
89             sub consume_collected_info {
90 2593     2593 0 31321 while (my $i = shift @{$singleton->collected_info}) {
  2869         11787  
91 276         3618 $singleton->collect(@$i);
92             }
93             }
94              
95             sub add_collected_info {
96 276     276 0 666 shift;
97 276         735 push @{$singleton->collected_info}, [@_];
  276         1307  
98             }
99              
100             # Use as $pid => Mojo::IOLoop::ReadWriteProcess
101             sub register {
102 311     311 1 15795 my ($process, $pid) = (pop, pop);
103 311         8061 $singleton->process_table()->{$pid} = \$process;
104 311         8922 $singleton->emit(register => $process);
105             }
106              
107 1     1 1 6 sub unregister { delete($singleton->process_table()->{+pop()}) }
108              
109             sub _resolve {
110 785     785   2268 my ($el, $w) = (pop, pop);
111             return
112             exists $singleton->{$w}->{$el}
113             ? $w eq 'orphans'
114             ? $singleton->{$w}->{$el}
115 785 100       3631 : ${$singleton->{$w}->{$el}}
  686 100       2543  
116             : undef;
117             }
118 49     49 1 113 sub orphan { _resolve(orphans => pop()) }
119 736     736 1 2214 sub resolve { _resolve(process_table => pop()) }
120              
121             sub clean {
122             $_[0]->resolve($_)->stop() and $_[0]->resolve($_)->DESTROY()
123 2   33 2 1 6 for keys %{$_[0]->process_table()};
  2         9  
124             $_[0]->orphan($_)->stop() and $_[0]->orphan($_)->DESTROY()
125 2   0     9 for keys %{$_[0]->orphans()};
  2         9  
126 2         11 shift->reset();
127             }
128              
129 43     43 1 9878 sub all { c($singleton->all_processes, $singleton->all_orphans)->flatten }
130 55     55 1 14036 sub all_orphans { c(values %{$singleton->orphans}) }
  55         190  
131              
132             sub all_processes {
133 171     171 1 1868 c(values %{$singleton->process_table})->map(sub { ${$_} });
  171     53   424  
  53         18881  
  53         192  
134             }
135              
136             sub contains {
137 30     30 1 19480 my $pid = pop;
138 30     90   120 $singleton->all->grep(sub { $_->pid eq $pid })->size == 1;
  90         3510  
139             }
140              
141             sub reset {
142 56         1237 @{+shift}
143 56     56 1 124949 {qw(events orphans process_table collected_info handler emit_from_sigchld)}
144             = ({}, {}, {}, [], undef, 1);
145             }
146              
147             # XXX: This should be replaced by PR_GET_CHILD_SUBREAPER
148             sub disable_subreaper {
149 7 50   7 1 6458 $singleton->subreaper(
150             $singleton->_prctl(PR_SET_CHILD_SUBREAPER, 0) == 0 ? 0 : 1);
151             }
152              
153             sub enable_subreaper {
154 47 50   47 1 708 $singleton->subreaper(
155             $singleton->_prctl(PR_SET_CHILD_SUBREAPER, 1) == 0 ? 1 : 0);
156             }
157              
158             sub _get_prctl_syscall {
159              
160             # Courtesy of Sys::Prctl
161 122 50   122   585 confess "Only Linux is supported" unless $^O eq 'linux';
162              
163 122         2204 my $machine = (POSIX::uname())[4];
164 122 50       331 die "Could not get machine type" unless $machine;
165              
166             # if we're running on an x86_64 kernel, but a 32-bit process,
167             # we need to use the i386 syscall numbers.
168 122 50 33     3736 $machine = "i386" if ($machine eq "x86_64" && $Config{ptrsize} == 4);
169              
170 122 0 0     1034 my $prctl_call
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    50          
    50          
171             = $machine
172             =~ /^i[3456]86|^blackfin|cris|frv|h8300|m32r|m68k|microblaze|mn10300|sh|s390|parisc$/
173             ? 172
174             : $machine eq "x86_64" ? 157
175             : $machine eq "sparc64" ? 147
176             : $machine eq "aarch64" ? 167
177             : ($machine eq "ppc" || $machine eq "ppc64le") ? 171
178             : $machine eq "ia64" ? 1170
179             : $machine eq "alpha" ? 348
180             : $machine eq "avr32" ? 148
181             : $machine eq "mips" ? 4000 + 192
182             : $machine eq "mips64" ? 5000 + 153
183             : $machine eq "xtensa" ? 130
184             : undef;
185              
186 122 50       265 unless (defined $prctl_call) {
187             delete @INC{
188 0         0 qw
189             sys/syscall.ph>
190             };
191 0         0 my $rv = eval { require 'syscall.ph'; 1 } ## no critic
  0         0  
192 0 0       0 or eval { require 'sys/syscall.ph'; 1 }; ## no critic
  0         0  
  0         0  
193              
194 0         0 $prctl_call = eval { &SYS_prctl; };
  0         0  
195             }
196 122         1287 return $prctl_call;
197             }
198              
199             sub _prctl {
200 61     61   1217 my ($self, $option, $arg2, $arg3, $arg4, $arg5) = @_;
201 61 50       194 confess 'prctl not supported in this platform!'
202             unless defined _get_prctl_syscall;
203 61         248 local $!;
204 61   100     208 my $ret = syscall(
      50        
      50        
      50        
205             _get_prctl_syscall(), $option,
206             ($arg2 or 0),
207             ($arg3 or 0),
208             ($arg4 or 0),
209             ($arg5 or 0));
210              
211 61 50       962 warn "prctl($option) is unavailable on this platform." if $!{EINVAL};
212 61 50       981 warn "Error! $!" if $!;
213 61         368 return $ret;
214             }
215              
216             *singleton = \&new;
217             *session = \&new;
218             *protect = \&_protect;
219              
220             1;
221              
222             =encoding utf-8
223              
224             =head1 NAME
225              
226             Mojo::IOLoop::ReadWriteProcess::Session - Session manager for handling child processes.
227              
228             =head1 SYNOPSIS
229              
230             use Mojo::IOLoop::ReadWriteProcess::Session;
231             use Mojo::IOLoop::ReadWriteProcess qw(process);
232              
233             my $session = process()->session; # or Mojo::IOLoop::ReadWriteProcess::Session->singleton
234              
235             $session->enable; # Modifies your SIG_CHLD
236              
237             $session->on(collected => sub { warn "Process ".(shift->pid)." collected! "});
238             $session->on(collected_orphan => sub { warn "Orphan process collected! "});
239              
240             $session->enable_subreaper(); # Mark the current process as subreaper
241             $session->disable_subreaper(); # Disable subreaper
242              
243             $session->reset(); # Resets events and clear the process tables
244             $session->clean(); # Stop all processes that result as running and reset
245              
246              
247             =head1 DESCRIPTION
248              
249             Mojo::IOLoop::ReadWriteProcess::Session is a session manager for the collected processes
250              
251             =head1 EVENTS
252              
253             L inherits all events from L and can emit
254             the following new ones.
255              
256             =head2 SIG_CHLD
257              
258             $session->on(SIG_CHLD => sub {
259             my ($self) = @_;
260             ...
261             });
262              
263             Emitted when we receive SIG_CHLD.
264              
265             =head2 collected
266              
267             $session->on(collected => sub {
268             my ($self, $process) = @_;
269             ...
270             });
271              
272             Emitted when child process is collected and it's return status is available.
273              
274             =head2 protect
275              
276             $session->on(protect => sub {
277             my ($self, $detail) = @_;
278             my ($cb, $signal) = @$detail;
279             ...
280             });
281              
282             Emitted when protected callbacks are fired.
283              
284             =head2 collected_orphan
285              
286             $session->on(collected_orphan => sub {
287             my ($self, $process) = @_;
288             $process->pid;
289             $process->exit_status;
290             ...
291             });
292              
293             Emitted when child process is collected and it's exit status is available.
294             Note: here are collected processes that weren't created with L.
295              
296             =head2 register
297              
298             $session->on(register => sub {
299             my ($self, $process) = @_;
300             $process->pid;
301             $process->exit_status;
302             ...
303             });
304              
305             Emitted when a process is registering to a session.
306              
307             =head1 ATTRIBUTES
308              
309             L inherits all attributes from L and implements
310             the following new ones.
311              
312              
313             =head2 subreaper
314              
315             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
316             session->enable_subreaper;
317             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello ".$_[1] }, args => "User" );
318             $process->start();
319             $process->on( stop => sub { shift()->disable_subreaper } );
320             $process->stop();
321              
322             # The process will print "Hello User"
323              
324             Mark the current process (not the child) as subreaper on start.
325             It's on invoker behalf to disable subreaper when process stops, as it marks the current process and not the
326             child.
327              
328             =head2 collect_status
329              
330             Defaults to C<1>, If enabled it will automatically collect the status of the children process.
331             Disable it in case you want to manage your process child directly, and do not want to rely on
332             automatic collect status. If you won't overwrite your C handler,
333             the C event will be still emitted.
334              
335             =head2 handler()
336              
337             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
338             session->handler(sub {});
339              
340             Default handler for SIG_CHLD processing, used when C is invoked.
341              
342             =head1 METHODS
343              
344             L inherits all methods from L and implements
345             the following new ones.
346              
347             =head2 enable()
348              
349             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
350             session->enable();
351              
352             Sets the SIG_CHLD handler.
353              
354             =head2 disable()
355              
356             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
357             session->disable();
358              
359             Disables the SIG_CHLD handler and reset with the previous one.
360              
361             =head2 enable_subreaper()
362              
363             use Mojo::IOLoop::ReadWriteProcess qw(process);
364             my $p = process()->enable_subreaper;
365             # or
366             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
367             session->enable_subreaper;
368              
369             Mark the current process (not the child) as subreaper.
370             This is used typically if you want to mark further children as subreapers inside other forks.
371              
372             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
373              
374             my $master_p = process(
375             sub {
376             my $p = shift;
377             $p->enable_subreaper;
378              
379             process(sub { sleep 4; exit 1 })->start();
380             process(
381             sub {
382             sleep 4;
383             process(sub { sleep 1; })->start();
384             })->start();
385             process(sub { sleep 4; exit 0 })->start();
386             process(sub { sleep 4; die })->start();
387             my $manager
388             = process(sub { sleep 2 })->subreaper(1)->start();
389             sleep 1 for (0 .. 10);
390             $manager->stop;
391             return session->all->size;
392             });
393              
394             $master_p->subreaper(1);
395             $master_p->on(collect_status => sub { $status++ });
396              
397             $master_p->on(stop => sub { shift()->disable_subreaper });
398             $master_p->start();
399             session->all->size();
400             ....
401              
402             =head2 disable_subreaper()
403              
404             use Mojo::IOLoop::ReadWriteProcess qw(process);
405             my $p = process()->disable_subreaper;
406              
407             Unset the current process as subreaper.
408              
409             =head2 prctl()
410              
411             use Mojo::IOLoop::ReadWriteProcess qw(process);
412             my $p = process();
413             $p->prctl($option, $arg2, $arg3, $arg4, $arg5);
414              
415             Internal function to execute and wrap the prctl syscall, accepts the same arguments as prctl.
416              
417             =head2 reset()
418              
419             use Mojo::IOLoop::ReadWriteProcess qw(session);
420             session->reset;
421              
422             Wipe the process tables.
423              
424             =head2 clean()
425              
426             use Mojo::IOLoop::ReadWriteProcess qw(session);
427             session->clean;
428              
429             Wipe the process tables, but before attempt to stop running procesess.
430              
431             =head2 all()
432              
433             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
434             my $collection = session->all;
435             $collection->size;
436              
437             Returns a L of L that belongs to a session.
438              
439             =head2 all_orphans()
440              
441             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
442             my $collection = session->all_orphans;
443             $collection->size;
444              
445             Returns a L of L of orphaned processes that belongs to a session.
446             They are automatically turned into a L, also if processes were created by C.
447              
448             =head2 all_processes()
449              
450             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
451             my $collection = session->all_processes;
452             $collection->size;
453              
454             Returns a L of all L known processes that belongs to a session.
455              
456             =head2 contains()
457              
458             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
459             my $collection = session->contains(13443);
460             $collection->size;
461              
462             Returns true if the pid is contained in any of the process tables.
463              
464             =head2 resolve()
465              
466             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
467             my $process = session->resolve(12233);
468              
469             Returns the L process identified by its pid if belongs to the process table.
470              
471             =head2 orphan()
472              
473             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
474             my $process = session->orphan(12233);
475              
476             Returns the L process identified by its pid if belongs to the process table of unknown processes.
477              
478             =head2 register()
479              
480             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
481             my $process = session->register('pid' => Mojo::IOLoop::ReadWriteProcess->new);
482              
483             Register the L process to the session.
484              
485             =head2 unregister()
486              
487             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
488             my $process = session->unregister(123342);
489              
490             Unregister the corresponding L with the given pid.
491              
492             =head2 collect()
493              
494             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
495             my $process = session->collect(123342 => 0 => undef);
496              
497             Collect the status for the given pid.
498              
499             =head2 protect()
500              
501             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
502             use POSIX;
503              
504             my $return = session->protect(sub { print "Hello World\n" });
505              
506             session->protect(sub { print "Hello World\n" } => SIGTERM);
507              
508             Try to protect the execution of the callback from signal interrupts.
509              
510             =head1 EXPORTS
511              
512             =head2 session()
513              
514             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
515             session->enable_subreaper;
516              
517             Returns the L singleton.
518              
519             =head1 DEBUGGING
520              
521             You can set the MOJO_EVENTEMITTER_DEBUG environment variable to get some advanced diagnostics information printed to STDERR.
522              
523             MOJO_EVENTEMITTER_DEBUG=1
524              
525             Also, you can set MOJO_PROCESS_DEBUG environment variable to get diagnostics about the process execution.
526              
527             MOJO_PROCESS_DEBUG=1
528              
529             =head1 LICENSE
530              
531             Copyright (C) Ettore Di Giacinto.
532              
533             This library is free software; you can redistribute it and/or modify
534             it under the same terms as Perl itself.
535              
536             =head1 AUTHOR
537              
538             Ettore Di Giacinto Eedigiacinto@suse.comE
539              
540             =cut