File Coverage

lib/Mojo/IOLoop/ReadWriteProcess/Session.pm
Criterion Covered Total %
statement 101 109 92.6
branch 23 56 41.0
condition 11 26 42.3
subroutine 35 35 100.0
pod 16 16 100.0
total 186 242 76.8


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::ReadWriteProcess::Session;
2              
3 38     38   256 use Mojo::Base 'Mojo::EventEmitter';
  38         63  
  38         247  
4 38     38   5760 use Mojo::IOLoop::ReadWriteProcess;
  38         77  
  38         265  
5 38     38   1145 use Carp 'confess';
  38         149  
  38         2499  
6 38     38   281 use POSIX qw( :sys_wait_h :signal_h );
  38         105  
  38         343  
7 38     38   14788 use Mojo::Collection 'c';
  38         76  
  38         3390  
8              
9             our @EXPORT_OK = qw(session);
10 38     38   286 use Exporter 'import';
  38         65  
  38         1294  
11              
12 38     38   235 use Config;
  38         73  
  38         2590  
13              
14 38     38   630 use constant DEBUG => $ENV{MOJO_PROCESS_DEBUG};
  38         98  
  38         2858  
15              
16             # See https://github.com/torvalds/linux/blob/master/include/uapi/linux/prctl.h
17 38     38   265 use constant PR_SET_CHILD_SUBREAPER => 36;
  38         75  
  38         2137  
18 38     38   245 use constant PR_GET_CHILD_SUBREAPER => 37;
  38         63  
  38         79793  
19              
20             has subreaper => 0;
21             has collect_status => 1;
22             has orphans => sub { {} };
23             has process_table => sub { {} };
24             has 'handler';
25              
26             my $singleton;
27              
28 409   66 409 1 2062079 sub new { $singleton ||= __PACKAGE__->SUPER::new }
29              
30             sub disable {
31 1     1 1 6 $singleton->_protect(sub { $SIG{CHLD} = $singleton->handler() });
  1     1   8  
32             }
33              
34             sub _protect {
35 840 100 66 840   17988 shift if $_[0] && $_[0] eq $singleton;
36 840 100       4082 my ($sig, $cb) = (@_ > 1 ? pop : SIGCHLD, pop);
37 840         16320 my ($sigset, $blockset) = (POSIX::SigSet->new, POSIX::SigSet->new($sig));
38 840         11750 $singleton->emit(protect => [$cb, $sig]);
39 840         19371 sigprocmask(SIG_BLOCK, $blockset, $sigset);
40 840         3227 my $r = $cb->();
41 840         10030 sigprocmask(SIG_SETMASK, $sigset);
42 840         7044 return $r;
43             }
44              
45             sub enable {
46 241     241 1 9608 $singleton->handler($SIG{CHLD});
47             $singleton->_protect(
48             sub {
49             $SIG{CHLD} = sub {
50 241         45303515 local ($!, $?);
51 241         3124 $singleton->emit('SIG_CHLD');
52 241 100       6651 return unless $singleton->collect_status;
53 239         11882 while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
54 256         2121 $singleton->collect($pid => $? => $!);
55             }
56             }
57 241     241   23956 });
  241         10310  
58             }
59              
60             sub _collect {
61 209     209   1803 my ($self, $pid, $status, $errno) = @_;
62 209         872 my $p = $singleton->resolve($pid);
63 209         1266 $p->emit('SIG_CHLD')->emit(collect_status => $pid => $status => $errno)
64             ->emit('collected')->emit('stop');
65             }
66              
67             sub collect {
68 256     256 1 2377 my ($errno, $status, $pid) = (pop, pop, pop);
69 256 100       1975 if ($singleton->resolve($pid)) {
70 209         1551 $singleton->_collect($pid => $status => $errno);
71 209         10203 $singleton->emit(collected => $singleton->resolve($pid));
72             }
73             else {
74 47         505 $singleton->orphans->{$pid}
75             = Mojo::IOLoop::ReadWriteProcess->new(process_id => $pid)
76             ->_fork_collect_status($pid => $status => $errno);
77 47         734 $singleton->emit(collected_orphan => $singleton->orphan($pid));
78             }
79 256         65028193 return $singleton;
80             }
81              
82             # Use as $pid => Mojo::IOLoop::ReadWriteProcess
83             sub register {
84 296     296 1 5916 my ($process, $pid) = (pop, pop);
85 296         2345 $singleton->process_table()->{$pid} = \$process;
86 296         6280 $singleton->emit(register => $process);
87             }
88              
89 1     1 1 4 sub unregister { delete($singleton->process_table()->{+pop()}) }
90              
91             sub _resolve {
92 727     727   1838 my ($el, $w) = (pop, pop);
93             return
94             exists $singleton->{$w}->{$el}
95             ? $w eq 'orphans'
96             ? $singleton->{$w}->{$el}
97 727 100       3734 : ${$singleton->{$w}->{$el}}
  632 100       2546  
98             : undef;
99             }
100 47     47 1 151 sub orphan { _resolve(orphans => pop()) }
101 680     680 1 2634 sub resolve { _resolve(process_table => pop()) }
102              
103             sub clean {
104             $_[0]->resolve($_)->stop() and $_[0]->resolve($_)->DESTROY()
105 2   33 2 1 4 for keys %{$_[0]->process_table()};
  2         8  
106             $_[0]->orphan($_)->stop() and $_[0]->orphan($_)->DESTROY()
107 2   0     8 for keys %{$_[0]->orphans()};
  2         7  
108 2         11 shift->reset();
109             }
110              
111 43     43 1 10493 sub all { c($singleton->all_processes, $singleton->all_orphans)->flatten }
112 55     55 1 11542 sub all_orphans { c(values %{$singleton->orphans}) }
  55         204  
113              
114             sub all_processes {
115 171     171 1 1902 c(values %{$singleton->process_table})->map(sub { ${$_} });
  171     53   423  
  53         20082  
  53         203  
116             }
117              
118             sub contains {
119 30     30 1 15060 my $pid = pop;
120 30     90   110 $singleton->all->grep(sub { $_->pid eq $pid })->size == 1;
  90         3190  
121             }
122              
123 53     53 1 94759 sub reset { @{+shift}{qw(events orphans process_table)} = ({}, {}, {}) }
  53         8740  
124              
125             # XXX: This should be replaced by PR_GET_CHILD_SUBREAPER
126             sub disable_subreaper {
127 7 50   7 1 4115 $singleton->subreaper(
128             $singleton->_prctl(PR_SET_CHILD_SUBREAPER, 0) == 0 ? 0 : 1);
129             }
130              
131             sub enable_subreaper {
132 47 50   47 1 458 $singleton->subreaper(
133             $singleton->_prctl(PR_SET_CHILD_SUBREAPER, 1) == 0 ? 1 : 0);
134             }
135              
136             sub _get_prctl_syscall {
137              
138             # Courtesy of Sys::Prctl
139 122 50   122   848 confess "Only Linux is supported" unless $^O eq 'linux';
140              
141 122         1330 my $machine = (POSIX::uname())[4];
142 122 50       479 die "Could not get machine type" unless $machine;
143              
144             # if we're running on an x86_64 kernel, but a 32-bit process,
145             # we need to use the i386 syscall numbers.
146 122 50 33     4418 $machine = "i386" if ($machine eq "x86_64" && $Config{ptrsize} == 4);
147              
148 122 0 0     1387 my $prctl_call
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    50          
    50          
149             = $machine
150             =~ /^i[3456]86|^blackfin|cris|frv|h8300|m32r|m68k|microblaze|mn10300|sh|s390|parisc$/
151             ? 172
152             : $machine eq "x86_64" ? 157
153             : $machine eq "sparc64" ? 147
154             : $machine eq "aarch64" ? 167
155             : ($machine eq "ppc" || $machine eq "ppc64le") ? 171
156             : $machine eq "ia64" ? 1170
157             : $machine eq "alpha" ? 348
158             : $machine eq "arm" ? 0x900000 + 172
159             : $machine eq "avr32" ? 148
160             : $machine eq "mips" ? 4000 + 192
161             : $machine eq "mips64" ? 5000 + 153
162             : $machine eq "xtensa" ? 130
163             : undef;
164              
165 122 50       431 unless (defined $prctl_call) {
166             delete @INC{
167 0         0 qw
168             sys/syscall.ph>
169             };
170 0         0 my $rv = eval { require 'syscall.ph'; 1 } ## no critic
  0         0  
171 0 0       0 or eval { require 'sys/syscall.ph'; 1 }; ## no critic
  0         0  
  0         0  
172              
173 0         0 $prctl_call = eval { &SYS_prctl; };
  0         0  
174             }
175 122         2168 return $prctl_call;
176             }
177              
178             sub _prctl {
179 61     61   1061 my ($self, $option, $arg2, $arg3, $arg4, $arg5) = @_;
180 61 50       210 confess 'prctl not supported in this platform!'
181             unless defined _get_prctl_syscall;
182 61         277 local $!;
183 61   100     184 my $ret = syscall(
      50        
      50        
      50        
184             _get_prctl_syscall(), $option,
185             ($arg2 or 0),
186             ($arg3 or 0),
187             ($arg4 or 0),
188             ($arg5 or 0));
189              
190 61 50       1231 warn "prctl($option) is unavailable on this platform." if $!{EINVAL};
191 61 50       1131 warn "Error! $!" if $!;
192 61         507 return $ret;
193             }
194              
195             *singleton = \&new;
196             *session = \&new;
197             *protect = \&_protect;
198              
199             1;
200              
201             =encoding utf-8
202              
203             =head1 NAME
204              
205             Mojo::IOLoop::ReadWriteProcess::Session - Session manager for handling child processes.
206              
207             =head1 SYNOPSIS
208              
209             use Mojo::IOLoop::ReadWriteProcess::Session;
210             use Mojo::IOLoop::ReadWriteProcess qw(process);
211              
212             my $session = process()->session; # or Mojo::IOLoop::ReadWriteProcess::Session->singleton
213              
214             $session->enable; # Modifies your SIG_CHLD
215              
216             $session->on(collected => sub { warn "Process ".(shift->pid)." collected! "});
217             $session->on(collected_orphan => sub { warn "Orphan process collected! "});
218              
219             $session->enable_subreaper(); # Mark the current process as subreaper
220             $session->disable_subreaper(); # Disable subreaper
221              
222             $session->reset(); # Resets events and clear the process tables
223             $session->clean(); # Stop all processes that result as running and reset
224              
225              
226             =head1 DESCRIPTION
227              
228             Mojo::IOLoop::ReadWriteProcess::Session is a session manager for the collected processes
229              
230             =head1 EVENTS
231              
232             L inherits all events from L and can emit
233             the following new ones.
234              
235             =head2 SIG_CHLD
236              
237             $session->on(SIG_CHLD => sub {
238             my ($self) = @_;
239             ...
240             });
241              
242             Emitted when we receive SIG_CHLD.
243              
244             =head2 collected
245              
246             $session->on(collected => sub {
247             my ($self, $process) = @_;
248             ...
249             });
250              
251             Emitted when child process is collected and it's return status is available.
252              
253             =head2 protect
254              
255             $session->on(protect => sub {
256             my ($self, $detail) = @_;
257             my ($cb, $signal) = @$detail;
258             ...
259             });
260              
261             Emitted when protected callbacks are fired.
262              
263             =head2 collected_orphan
264              
265             $session->on(collected_orphan => sub {
266             my ($self, $process) = @_;
267             $process->pid;
268             $process->exit_status;
269             ...
270             });
271              
272             Emitted when child process is collected and it's exit status is available.
273             Note: here are collected processes that weren't created with L.
274              
275             =head2 register
276              
277             $session->on(register => sub {
278             my ($self, $process) = @_;
279             $process->pid;
280             $process->exit_status;
281             ...
282             });
283              
284             Emitted when a process is registering to a session.
285              
286             =head1 ATTRIBUTES
287              
288             L inherits all attributes from L and implements
289             the following new ones.
290              
291              
292             =head2 subreaper
293              
294             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
295             session->enable_subreaper;
296             my $process = Mojo::IOLoop::ReadWriteProcess->new(code => sub { print "Hello ".$_[1] }, args => "User" );
297             $process->start();
298             $process->on( stop => sub { shift()->disable_subreaper } );
299             $process->stop();
300              
301             # The process will print "Hello User"
302              
303             Mark the current process (not the child) as subreaper on start.
304             It's on invoker behalf to disable subreaper when process stops, as it marks the current process and not the
305             child.
306              
307             =head2 collect_status
308              
309             Defaults to C<1>, If enabled it will automatically collect the status of the children process.
310             Disable it in case you want to manage your process child directly, and do not want to rely on
311             automatic collect status. If you won't overwrite your C handler,
312             the C event will be still emitted.
313              
314             =head2 handler()
315              
316             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
317             session->handler(sub {});
318              
319             Default handler for SIG_CHLD processing, used when C is invoked.
320              
321             =head1 METHODS
322              
323             L inherits all methods from L and implements
324             the following new ones.
325              
326             =head2 enable()
327              
328             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
329             session->enable();
330              
331             Sets the SIG_CHLD handler.
332              
333             =head2 disable()
334              
335             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
336             session->disable();
337              
338             Disables the SIG_CHLD handler and reset with the previous one.
339              
340             =head2 enable_subreaper()
341              
342             use Mojo::IOLoop::ReadWriteProcess qw(process);
343             my $p = process()->enable_subreaper;
344             # or
345             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
346             session->enable_subreaper;
347              
348             Mark the current process (not the child) as subreaper.
349             This is used typically if you want to mark further childs as subreapers inside other forks.
350              
351             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
352              
353             my $master_p = process(
354             sub {
355             my $p = shift;
356             $p->enable_subreaper;
357              
358             process(sub { sleep 4; exit 1 })->start();
359             process(
360             sub {
361             sleep 4;
362             process(sub { sleep 1; })->start();
363             })->start();
364             process(sub { sleep 4; exit 0 })->start();
365             process(sub { sleep 4; die })->start();
366             my $manager
367             = process(sub { sleep 2 })->subreaper(1)->start();
368             sleep 1 for (0 .. 10);
369             $manager->stop;
370             return session->all->size;
371             });
372              
373             $master_p->subreaper(1);
374             $master_p->on(collect_status => sub { $status++ });
375              
376             $master_p->on(stop => sub { shift()->disable_subreaper });
377             $master_p->start();
378             session->all->size();
379             ....
380              
381             =head2 disable_subreaper()
382              
383             use Mojo::IOLoop::ReadWriteProcess qw(process);
384             my $p = process()->disable_subreaper;
385              
386             Unset the current process as subreaper.
387              
388             =head2 prctl()
389              
390             use Mojo::IOLoop::ReadWriteProcess qw(process);
391             my $p = process();
392             $p->prctl($option, $arg2, $arg3, $arg4, $arg5);
393              
394             Internal function to execute and wrap the prctl syscall, accepts the same arguments as prctl.
395              
396             =head2 reset()
397              
398             use Mojo::IOLoop::ReadWriteProcess qw(session);
399             session->reset;
400              
401             Wipe the process tables.
402              
403             =head2 clean()
404              
405             use Mojo::IOLoop::ReadWriteProcess qw(session);
406             session->clean;
407              
408             Wipe the process tables, but before attempt to stop running procesess.
409              
410             =head2 all()
411              
412             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
413             my $collection = session->all;
414             $collection->size;
415              
416             Returns a L of L that belongs to a session.
417              
418             =head2 all_orphans()
419              
420             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
421             my $collection = session->all_orphans;
422             $collection->size;
423              
424             Returns a L of L of orphaned processes that belongs to a session.
425             They are automatically turned into a L, also if processes were created by C.
426              
427             =head2 all_processes()
428              
429             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
430             my $collection = session->all_processes;
431             $collection->size;
432              
433             Returns a L of all L known processes that belongs to a session.
434              
435             =head2 contains()
436              
437             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
438             my $collection = session->contains(13443);
439             $collection->size;
440              
441             Returns true if the pid is contained in any of the process tables.
442              
443             =head2 resolve()
444              
445             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
446             my $process = session->resolve(12233);
447              
448             Returns the L process identified by its pid if belongs to the process table.
449              
450             =head2 orphan()
451              
452             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
453             my $process = session->orphan(12233);
454              
455             Returns the L process identified by its pid if belongs to the process table of unknown processes.
456              
457             =head2 register()
458              
459             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
460             my $process = session->register('pid' => Mojo::IOLoop::ReadWriteProcess->new);
461              
462             Register the L process to the session.
463              
464             =head2 unregister()
465              
466             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
467             my $process = session->unregister(123342);
468              
469             Unregister the corresponding L with the given pid.
470              
471             =head2 collect()
472              
473             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
474             my $process = session->collect(123342 => 0 => undef);
475              
476             Collect the status for the given pid.
477              
478             =head2 protect()
479              
480             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
481             use POSIX;
482              
483             my $return = session->protect(sub { print "Hello World\n" });
484              
485             session->protect(sub { print "Hello World\n" } => SIGTERM);
486              
487             Try to protect the execution of the callback from signal interrupts.
488              
489             =head1 EXPORTS
490              
491             =head2 session()
492              
493             use Mojo::IOLoop::ReadWriteProcess::Session qw(session);
494             session->enable_subreaper;
495              
496             Returns the L singleton.
497              
498             =head1 DEBUGGING
499              
500             You can set the MOJO_EVENTEMITTER_DEBUG environment variable to get some advanced diagnostics information printed to STDERR.
501              
502             MOJO_EVENTEMITTER_DEBUG=1
503              
504             Also, you can set MOJO_PROCESS_DEBUG environment variable to get diagnostics about the process execution.
505              
506             MOJO_PROCESS_DEBUG=1
507              
508             =head1 LICENSE
509              
510             Copyright (C) Ettore Di Giacinto.
511              
512             This library is free software; you can redistribute it and/or modify
513             it under the same terms as Perl itself.
514              
515             =head1 AUTHOR
516              
517             Ettore Di Giacinto Eedigiacinto@suse.comE
518              
519             =cut