File Coverage

lib/Mojo/IOLoop/ReadWriteProcess/Session.pm
Criterion Covered Total %
statement 109 117 93.1
branch 27 58 46.5
condition 11 26 42.3
subroutine 37 37 100.0
pod 16 18 88.8
total 200 256 78.1


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::ReadWriteProcess::Session;
2              
3 38     38   271 use Mojo::Base 'Mojo::EventEmitter';
  38         62  
  38         246  
4 38     38   5988 use Mojo::IOLoop::ReadWriteProcess;
  38         80  
  38         397  
5 38     38   1577 use Carp 'confess';
  38         83  
  38         2854  
6 38     38   265 use POSIX qw( :sys_wait_h :signal_h );
  38         91  
  38         418  
7 38     38   15495 use Mojo::Collection 'c';
  38         78  
  38         3417  
8              
9             our @EXPORT_OK = qw(session);
10 38     38   289 use Exporter 'import';
  38         76  
  38         1547  
11              
12 38     38   225 use Config;
  38         83  
  38         1973  
13              
14 38     38   1056 use constant DEBUG => $ENV{MOJO_PROCESS_DEBUG};
  38         129  
  38         2694  
15              
16             # See https://github.com/torvalds/linux/blob/master/include/uapi/linux/prctl.h
17 38     38   239 use constant PR_SET_CHILD_SUBREAPER => 36;
  38         76  
  38         2117  
18 38     38   245 use constant PR_GET_CHILD_SUBREAPER => 37;
  38         75  
  38         94556  
19              
20             has subreaper => 0;
21             has collect_status => 1;
22             has orphans => sub { {} };
23             has process_table => sub { {} };
24             has collected_info => sub { [] };
25             has 'handler';
26             has emit_from_sigchld => 1;
27              
28             my $singleton;
29              
30 426   66 426 1 43922 sub new { $singleton ||= __PACKAGE__->SUPER::new }
31              
32             sub disable {
33 1     1 1 6 $singleton->_protect(sub { $SIG{CHLD} = $singleton->handler() });
  1     1   7  
34             }
35              
36             sub _protect {
37 807 100 66 807   14993 shift if $_[0] && $_[0] eq $singleton;
38 807 100       3052 my ($sig, $cb) = (@_ > 1 ? pop : SIGCHLD, pop);
39 807         9823 my ($sigset, $blockset) = (POSIX::SigSet->new, POSIX::SigSet->new($sig));
40 807         4879 $singleton->emit(protect => [$cb, $sig]);
41 807         15377 sigprocmask(SIG_BLOCK, $blockset, $sigset);
42 807         3296 my $r = $cb->();
43 807         10450 sigprocmask(SIG_SETMASK, $sigset);
44 807         6107 return $r;
45             }
46              
47             sub enable {
48 274 100   274 1 2740 return if $singleton->handler();
49 83         688 $singleton->handler($SIG{CHLD});
50             $singleton->_protect(
51             sub {
52             $SIG{CHLD} = sub {
53 251         34897620 local ($!, $?);
54 251         3473 $singleton->emit('SIG_CHLD');
55 251 100       8250 return unless $singleton->collect_status;
56 249         11897 while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
57 271         3963 $singleton->add_collected_info($pid, $?, $!);
58             }
59             $singleton->consume_collected_info()
60 249 100       7264 if ($singleton->emit_from_sigchld());
61             }
62 83     83   1688 });
  83         1673  
63             }
64              
65             sub _collect {
66 222     222   1309 my ($self, $pid, $status, $errno) = @_;
67 222         969 my $p = $singleton->resolve($pid);
68 222         1317 $p->emit('SIG_CHLD')->emit(collect_status => $pid => $status => $errno)
69             ->emit('collected')->emit('stop');
70             }
71              
72             sub collect {
73 271     271 1 1605 my ($self, $pid, $status, $errno) = @_;
74 271 100       2003 if ($singleton->resolve($pid)) {
75 222         1529 $singleton->_collect($pid => $status => $errno);
76 222         11088 $singleton->emit(collected => $singleton->resolve($pid));
77             }
78             else {
79 49         602 $singleton->orphans->{$pid}
80             = Mojo::IOLoop::ReadWriteProcess->new(process_id => $pid)
81             ->_fork_collect_status($pid => $status => $errno);
82 49         829 $singleton->emit(collected_orphan => $singleton->orphan($pid));
83             }
84 271         3835 return $singleton;
85             }
86              
87             sub consume_collected_info {
88 2411     2411 0 24596 while (my $i = shift @{$singleton->collected_info}) {
  2682         9401  
89 271         4665 $singleton->collect(@$i);
90             }
91             }
92              
93             sub add_collected_info {
94 271     271 0 713 shift;
95 271         675 push @{$singleton->collected_info}, [@_];
  271         1393  
96             }
97              
98             # Use as $pid => Mojo::IOLoop::ReadWriteProcess
99             sub register {
100 309     309 1 10400 my ($process, $pid) = (pop, pop);
101 309         6486 $singleton->process_table()->{$pid} = \$process;
102 309         8050 $singleton->emit(register => $process);
103             }
104              
105 1     1 1 5 sub unregister { delete($singleton->process_table()->{+pop()}) }
106              
107             sub _resolve {
108 770     770   2933 my ($el, $w) = (pop, pop);
109             return
110             exists $singleton->{$w}->{$el}
111             ? $w eq 'orphans'
112             ? $singleton->{$w}->{$el}
113 770 100       4239 : ${$singleton->{$w}->{$el}}
  671 100       2981  
114             : undef;
115             }
116 49     49 1 140 sub orphan { _resolve(orphans => pop()) }
117 721     721 1 2147 sub resolve { _resolve(process_table => pop()) }
118              
119             sub clean {
120             $_[0]->resolve($_)->stop() and $_[0]->resolve($_)->DESTROY()
121 2   33 2 1 4 for keys %{$_[0]->process_table()};
  2         9  
122             $_[0]->orphan($_)->stop() and $_[0]->orphan($_)->DESTROY()
123 2   0     9 for keys %{$_[0]->orphans()};
  2         6  
124 2         10 shift->reset();
125             }
126              
127 43     43 1 12971 sub all { c($singleton->all_processes, $singleton->all_orphans)->flatten }
128 55     55 1 12878 sub all_orphans { c(values %{$singleton->orphans}) }
  55         204  
129              
130             sub all_processes {
131 171     171 1 2111 c(values %{$singleton->process_table})->map(sub { ${$_} });
  171     53   450  
  53         17175  
  53         199  
132             }
133              
134             sub contains {
135 30     30 1 17060 my $pid = pop;
136 30     90   160 $singleton->all->grep(sub { $_->pid eq $pid })->size == 1;
  90         3030  
137             }
138              
139             sub reset {
140 56         1563 @{+shift}
141 56     56 1 103395 {qw(events orphans process_table collected_info handler emit_from_sigchld)}
142             = ({}, {}, {}, [], undef, 1);
143             }
144              
145             # XXX: This should be replaced by PR_GET_CHILD_SUBREAPER
146             sub disable_subreaper {
147 7 50   7 1 4599 $singleton->subreaper(
148             $singleton->_prctl(PR_SET_CHILD_SUBREAPER, 0) == 0 ? 0 : 1);
149             }
150              
151             sub enable_subreaper {
152 47 50   47 1 1367 $singleton->subreaper(
153             $singleton->_prctl(PR_SET_CHILD_SUBREAPER, 1) == 0 ? 1 : 0);
154             }
155              
156             sub _get_prctl_syscall {
157              
158             # Courtesy of Sys::Prctl
159 122 50   122   853 confess "Only Linux is supported" unless $^O eq 'linux';
160              
161 122         1563 my $machine = (POSIX::uname())[4];
162 122 50       546 die "Could not get machine type" unless $machine;
163              
164             # if we're running on an x86_64 kernel, but a 32-bit process,
165             # we need to use the i386 syscall numbers.
166 122 50 33     4977 $machine = "i386" if ($machine eq "x86_64" && $Config{ptrsize} == 4);
167              
168 122 0 0     1263 my $prctl_call
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    50          
    50          
169             = $machine
170             =~ /^i[3456]86|^blackfin|cris|frv|h8300|m32r|m68k|microblaze|mn10300|sh|s390|parisc$/
171             ? 172
172             : $machine eq "x86_64" ? 157
173             : $machine eq "sparc64" ? 147
174             : $machine eq "aarch64" ? 167
175             : ($machine eq "ppc" || $machine eq "ppc64le") ? 171
176             : $machine eq "ia64" ? 1170
177             : $machine eq "alpha" ? 348
178             : $machine eq "avr32" ? 148
179             : $machine eq "mips" ? 4000 + 192
180             : $machine eq "mips64" ? 5000 + 153
181             : $machine eq "xtensa" ? 130
182             : undef;
183              
184 122 50       640 unless (defined $prctl_call) {
185             delete @INC{
186 0         0 qw
187             sys/syscall.ph>
188             };
189 0         0 my $rv = eval { require 'syscall.ph'; 1 } ## no critic
  0         0  
190 0 0       0 or eval { require 'sys/syscall.ph'; 1 }; ## no critic
  0         0  
  0         0  
191              
192 0         0 $prctl_call = eval { &SYS_prctl; };
  0         0  
193             }
194 122         1870 return $prctl_call;
195             }
196              
197             sub _prctl {
198 61     61   1667 my ($self, $option, $arg2, $arg3, $arg4, $arg5) = @_;
199 61 50       203 confess 'prctl not supported in this platform!'
200             unless defined _get_prctl_syscall;
201 61         323 local $!;
202 61   100     193 my $ret = syscall(
      50        
      50        
      50        
203             _get_prctl_syscall(), $option,
204             ($arg2 or 0),
205             ($arg3 or 0),
206             ($arg4 or 0),
207             ($arg5 or 0));
208              
209 61 50       1654 warn "prctl($option) is unavailable on this platform." if $!{EINVAL};
210 61 50       1246 warn "Error! $!" if $!;
211 61         476 return $ret;
212             }
213              
214             *singleton = \&new;
215             *session = \&new;
216             *protect = \&_protect;
217              
218             1;
219              
220             =encoding utf-8
221              
222             =head1 NAME
223              
224             Mojo::IOLoop::ReadWriteProcess::Session - Session manager for handling child processes.
225              
226             =head1 SYNOPSIS
227              
228             use Mojo::IOLoop::ReadWriteProcess::Session;
229             use Mojo::IOLoop::ReadWriteProcess qw(process);
230              
231             my $session = process()->session; # or Mojo::IOLoop::ReadWriteProcess::Session->singleton
232              
233             $session->enable; # Modifies your SIG_CHLD
234              
235             $session->on(collected => sub { warn "Process ".(shift->pid)." collected! "});
236             $session->on(collected_orphan => sub { warn "Orphan process collected! "});
237              
238             $session->enable_subreaper(); # Mark the current process as subreaper
239             $session->disable_subreaper(); # Disable subreaper
240              
241             $session->reset(); # Resets events and clear the process tables
242             $session->clean(); # Stop all processes that result as running and reset
243              
244              
245             =head1 DESCRIPTION
246              
247             Mojo::IOLoop::ReadWriteProcess::Session is a session manager for the collected processes
248              
249             =head1 EVENTS
250              
251             L inherits all events from L and can emit
252             the following new ones.
253              
254             =head2 SIG_CHLD
255              
256             $session->on(SIG_CHLD => sub {
257             my ($self) = @_;
258             ...
259             });
260              
261             Emitted when we receive SIG_CHLD.
262              
263             =head2 collected
264              
265             $session->on(collected => sub {
266             my ($self, $process) = @_;
267             ...
268             });
269              
270             Emitted when child process is collected and it's return status is available.
271              
272             =head2 protect
273              
274             $session->on(protect => sub {
275             my ($self, $detail) = @_;
276             my ($cb, $signal) = @$detail;
277             ...
278             });
279              
280             Emitted when protected callbacks are fired.
281              
282             =head2 collected_orphan
283              
284             $session->on(collected_orphan => sub {
285             my ($self, $process) = @_;
286             $process->pid;
287             $process->exit_status;
288             ...
289             });
290              
291             Emitted when child process is collected and it's exit status is available.
292             Note: here are collected processes that weren't created with L.
293              
294             =head2 register
295              
296             $session->on(register => sub {
297             my ($self, $process) = @_;
298             $process->pid;
299             $process->exit_status;
300             ...
301             });
302              
303             Emitted when a process is registering to a session.
304              
305             =head1 ATTRIBUTES
306              
307             L inherits all attributes from L and implements
308             the following new ones.
309              
310              
311             =head2 subreaper
312              
313             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
314             session->enable_subreaper;
315             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello ".$_[1] }, args => "User" );
316             $process->start();
317             $process->on( stop => sub { shift()->disable_subreaper } );
318             $process->stop();
319              
320             # The process will print "Hello User"
321              
322             Mark the current process (not the child) as subreaper on start.
323             It's on invoker behalf to disable subreaper when process stops, as it marks the current process and not the
324             child.
325              
326             =head2 collect_status
327              
328             Defaults to C<1>, If enabled it will automatically collect the status of the children process.
329             Disable it in case you want to manage your process child directly, and do not want to rely on
330             automatic collect status. If you won't overwrite your C handler,
331             the C event will be still emitted.
332              
333             =head2 handler()
334              
335             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
336             session->handler(sub {});
337              
338             Default handler for SIG_CHLD processing, used when C is invoked.
339              
340             =head1 METHODS
341              
342             L inherits all methods from L and implements
343             the following new ones.
344              
345             =head2 enable()
346              
347             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
348             session->enable();
349              
350             Sets the SIG_CHLD handler.
351              
352             =head2 disable()
353              
354             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
355             session->disable();
356              
357             Disables the SIG_CHLD handler and reset with the previous one.
358              
359             =head2 enable_subreaper()
360              
361             use Mojo::IOLoop::ReadWriteProcess qw(process);
362             my $p = process()->enable_subreaper;
363             # or
364             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
365             session->enable_subreaper;
366              
367             Mark the current process (not the child) as subreaper.
368             This is used typically if you want to mark further children as subreapers inside other forks.
369              
370             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
371              
372             my $master_p = process(
373             sub {
374             my $p = shift;
375             $p->enable_subreaper;
376              
377             process(sub { sleep 4; exit 1 })->start();
378             process(
379             sub {
380             sleep 4;
381             process(sub { sleep 1; })->start();
382             })->start();
383             process(sub { sleep 4; exit 0 })->start();
384             process(sub { sleep 4; die })->start();
385             my $manager
386             = process(sub { sleep 2 })->subreaper(1)->start();
387             sleep 1 for (0 .. 10);
388             $manager->stop;
389             return session->all->size;
390             });
391              
392             $master_p->subreaper(1);
393             $master_p->on(collect_status => sub { $status++ });
394              
395             $master_p->on(stop => sub { shift()->disable_subreaper });
396             $master_p->start();
397             session->all->size();
398             ....
399              
400             =head2 disable_subreaper()
401              
402             use Mojo::IOLoop::ReadWriteProcess qw(process);
403             my $p = process()->disable_subreaper;
404              
405             Unset the current process as subreaper.
406              
407             =head2 prctl()
408              
409             use Mojo::IOLoop::ReadWriteProcess qw(process);
410             my $p = process();
411             $p->prctl($option, $arg2, $arg3, $arg4, $arg5);
412              
413             Internal function to execute and wrap the prctl syscall, accepts the same arguments as prctl.
414              
415             =head2 reset()
416              
417             use Mojo::IOLoop::ReadWriteProcess qw(session);
418             session->reset;
419              
420             Wipe the process tables.
421              
422             =head2 clean()
423              
424             use Mojo::IOLoop::ReadWriteProcess qw(session);
425             session->clean;
426              
427             Wipe the process tables, but before attempt to stop running procesess.
428              
429             =head2 all()
430              
431             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
432             my $collection = session->all;
433             $collection->size;
434              
435             Returns a L of L that belongs to a session.
436              
437             =head2 all_orphans()
438              
439             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
440             my $collection = session->all_orphans;
441             $collection->size;
442              
443             Returns a L of L of orphaned processes that belongs to a session.
444             They are automatically turned into a L, also if processes were created by C.
445              
446             =head2 all_processes()
447              
448             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
449             my $collection = session->all_processes;
450             $collection->size;
451              
452             Returns a L of all L known processes that belongs to a session.
453              
454             =head2 contains()
455              
456             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
457             my $collection = session->contains(13443);
458             $collection->size;
459              
460             Returns true if the pid is contained in any of the process tables.
461              
462             =head2 resolve()
463              
464             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
465             my $process = session->resolve(12233);
466              
467             Returns the L process identified by its pid if belongs to the process table.
468              
469             =head2 orphan()
470              
471             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
472             my $process = session->orphan(12233);
473              
474             Returns the L process identified by its pid if belongs to the process table of unknown processes.
475              
476             =head2 register()
477              
478             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
479             my $process = session->register('pid' => Mojo::IOLoop::ReadWriteProcess->new);
480              
481             Register the L process to the session.
482              
483             =head2 unregister()
484              
485             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
486             my $process = session->unregister(123342);
487              
488             Unregister the corresponding L with the given pid.
489              
490             =head2 collect()
491              
492             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
493             my $process = session->collect(123342 => 0 => undef);
494              
495             Collect the status for the given pid.
496              
497             =head2 protect()
498              
499             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
500             use POSIX;
501              
502             my $return = session->protect(sub { print "Hello World\n" });
503              
504             session->protect(sub { print "Hello World\n" } => SIGTERM);
505              
506             Try to protect the execution of the callback from signal interrupts.
507              
508             =head1 EXPORTS
509              
510             =head2 session()
511              
512             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
513             session->enable_subreaper;
514              
515             Returns the L singleton.
516              
517             =head1 DEBUGGING
518              
519             You can set the MOJO_EVENTEMITTER_DEBUG environment variable to get some advanced diagnostics information printed to STDERR.
520              
521             MOJO_EVENTEMITTER_DEBUG=1
522              
523             Also, you can set MOJO_PROCESS_DEBUG environment variable to get diagnostics about the process execution.
524              
525             MOJO_PROCESS_DEBUG=1
526              
527             =head1 LICENSE
528              
529             Copyright (C) Ettore Di Giacinto.
530              
531             This library is free software; you can redistribute it and/or modify
532             it under the same terms as Perl itself.
533              
534             =head1 AUTHOR
535              
536             Ettore Di Giacinto Eedigiacinto@suse.comE
537              
538             =cut