File Coverage

blib/lib/MHFS/Process.pm
Criterion Covered Total %
statement 59 244 24.1
branch 0 68 0.0
condition 0 14 0.0
subroutine 20 43 46.5
pod 0 10 0.0
total 79 379 20.8


line stmt bran cond sub pod time code
1             package MHFS::Process v0.7.0;
2 1     1   21 use 5.014;
  1         6  
3 1     1   7 use strict; use warnings;
  1     1   3  
  1         27  
  1         6  
  1         2  
  1         72  
4 1     1   7 use feature 'say';
  1         2  
  1         150  
5 1     1   7 use Symbol 'gensym';
  1         3  
  1         69  
6 1     1   6 use Time::HiRes qw( usleep clock_gettime CLOCK_REALTIME CLOCK_MONOTONIC);
  1         47  
  1         12  
7 1     1   120 use POSIX ":sys_wait_h";
  1         2  
  1         31  
8 1     1   233 use IO::Socket::INET;
  1         3  
  1         16  
9 1     1   744 use IO::Poll qw(POLLIN POLLOUT POLLHUP);
  1         3  
  1         81  
10 1     1   6 use Errno qw(EINTR EIO :POSIX);
  1         3  
  1         391  
11 1     1   9 use Fcntl qw(:seek :mode);
  1         3  
  1         275  
12 1     1   52 use File::stat;
  1         4  
  1         83  
13 1     1   1049 use IPC::Open3;
  1         4078  
  1         75  
14 1     1   9 use Scalar::Util qw(looks_like_number weaken);
  1         2  
  1         65  
15 1     1   7 use Data::Dumper;
  1         2  
  1         57  
16 1     1   9 use Devel::Peek;
  1         41  
  1         40  
17 1     1   717 use MHFS::FD::Reader;
  1         4  
  1         43  
18 1     1   589 use MHFS::FD::Writer;
  1         4  
  1         49  
19 1     1   9 use MHFS::EventLoop::Poll;
  1         3  
  1         27  
20 1     1   5 use Carp;
  1         2  
  1         4608  
21              
22             #my %CHILDREN;
23             #$SIG{CHLD} = sub {
24             # while((my $child = waitpid(-1, WNOHANG)) > 0) {
25             # my ($wstatus, $exitcode) = ($?, $?>> 8);
26             # if(defined $CHILDREN{$child}) {
27             # say "PID $child reaped (func) $exitcode";
28             # $CHILDREN{$child}->($exitcode);
29             # # remove file handles here?
30             # $CHILDREN{$child} = undef;
31             # }
32             # else {
33             # say "PID $child reaped (No func) $exitcode";
34             # }
35             # }
36             #};
37              
38             sub _setup_handlers {
39 0     0     my ($self, $in, $out, $err, $fddispatch, $handlesettings) = @_;
40 0           my $pid = $self->{'pid'};
41 0           my $evp = $self->{'evp'};
42              
43 0 0         if($fddispatch->{'SIGCHLD'}) {
44 0           say "PID $pid custom SIGCHLD handler";
45             #$CHILDREN{$pid} = $fddispatch->{'SIGCHLD'};
46 0           $evp->register_child($pid, $fddispatch->{'SIGCHLD'});
47             }
48 0 0         if($fddispatch->{'STDIN'}) {
49 0           $self->{'fd'}{'stdin'} = MHFS::FD::Writer->new($self, $in, $fddispatch->{'STDIN'});
50 0           $evp->set($in, $self->{'fd'}{'stdin'}, POLLOUT | MHFS::EventLoop::Poll->ALWAYSMASK);
51             }
52             else {
53 0           $self->{'fd'}{'stdin'}{'fd'} = $in;
54             }
55 0 0         if($fddispatch->{'STDOUT'}) {
56 0           $self->{'fd'}{'stdout'} = MHFS::FD::Reader->new($self, $out, $fddispatch->{'STDOUT'});
57 0           $evp->set($out, $self->{'fd'}{'stdout'}, POLLIN | MHFS::EventLoop::Poll->ALWAYSMASK());
58             }
59             else {
60 0           $self->{'fd'}{'stdout'}{'fd'} = $out;
61             }
62 0 0         if($fddispatch->{'STDERR'}) {
63 0           $self->{'fd'}{'stderr'} = MHFS::FD::Reader->new($self, $err, $fddispatch->{'STDERR'});
64 0           $evp->set($err, $self->{'fd'}{'stderr'}, POLLIN | MHFS::EventLoop::Poll->ALWAYSMASK);
65             }
66             else {
67 0           $self->{'fd'}{'stderr'}{'fd'} = $err;
68             }
69              
70 0 0         if($handlesettings->{'O_NONBLOCK'}) {
71             # stderr
72             {
73 0 0         my $flags = fcntl($err, Fcntl::F_GETFL, 0) or die "$!";
74 0 0         fcntl($err, Fcntl::F_SETFL, $flags | Fcntl::O_NONBLOCK) or die "$!";
75             }
76             # stdout
77             {
78 0 0         my $flags = fcntl($out, Fcntl::F_GETFL, 0) or die "$!";
  0            
  0            
79 0 0         fcntl($out, Fcntl::F_SETFL, $flags | Fcntl::O_NONBLOCK) or die "$!";
80             }
81             # stdin
82 0 0         defined($in->blocking(0)) or die($!);
83             #(0 == fcntl($in, Fcntl::F_GETFL, $flags)) or die("$!");#return undef;
84             #$flags |= Fcntl::O_NONBLOCK;
85             #(0 == fcntl($in, Fcntl::F_SETFL, $flags)) or die;#return undef;
86 0           return $self;
87             }
88             }
89              
90             sub sigkill {
91 0     0 0   my ($self, $cb) = @_;
92 0 0         if($cb) {
93 0           $self->{'evp'}{'children'}{$self->{'pid'}} = $cb;
94             }
95 0           kill('KILL', $self->{'pid'});
96             }
97              
98             sub stopSTDOUT {
99 0     0 0   my ($self) = @_;
100 0           $self->{'evp'}->set($self->{'fd'}{'stdout'}{'fd'}, $self->{'fd'}{'stdout'}, MHFS::EventLoop::Poll->ALWAYSMASK);
101             }
102              
103             sub resumeSTDOUT {
104 0     0 0   my ($self) = @_;
105 0           $self->{'evp'}->set($self->{'fd'}{'stdout'}{'fd'}, $self->{'fd'}{'stdout'}, POLLIN | MHFS::EventLoop::Poll->ALWAYSMASK);
106             }
107              
108             sub new {
109 0     0 0   my ($class, $torun, $evp, $fddispatch, $handlesettings, $env) = @_;
110 0           my %self = ('time' => clock_gettime(CLOCK_MONOTONIC), 'evp' => $evp);
111              
112              
113 0           my %oldenvvars;
114 0 0         if($env) {
115 0           foreach my $key(keys %{$env}) {
  0            
116             # save current value
117 0           $oldenvvars{$key} = $ENV{$key};
118             # set new value
119 0           $ENV{$key} = $env->{$key};
120 0   0       my $oldval = $oldenvvars{$key} // '{undef}';
121 0   0       my $newval = $env->{$key} // '{undef}';
122 0           say "Changed \$ENV{$key} from $oldval to $newval";
123             }
124             }
125              
126 0           my ($pid, $in, $out, $err);
127 0           eval{ $pid = open3($in, $out, $err = gensym, @$torun); };
  0            
128 0 0         if($@) {
129 0           say "BAD process";
130 0           return undef;
131             }
132 0           $self{'pid'} = $pid;
133 0           say 'PID '. $pid . ' NEW PROCESS: ' . $torun->[0];
134 0 0         if($env) {
135             # restore environment
136 0           foreach my $key(keys %oldenvvars) {
137 0           $ENV{$key} = $oldenvvars{$key};
138 0   0       my $oldval = $env->{$key} // '{undef}';
139 0   0       my $newval = $oldenvvars{$key} // '{undef}';
140 0           say "Restored \$ENV{$key} from $oldval to $newval";
141             }
142             }
143 0           _setup_handlers(\%self, $in, $out, $err, $fddispatch, $handlesettings);
144 0           return bless \%self, $class;
145             }
146              
147             sub _new_ex {
148 0     0     my ($make_process, $make_process_args, $context) = @_;
149 0           my $process;
150 0           $context->{'stdout'} = '';
151 0           $context->{'stderr'} = '';
152             my $prochandlers = {
153             'STDOUT' => sub {
154 0     0     my ($handle) = @_;
155 0           my $buf;
156 0           while(read($handle, $buf, 4096)) {
157 0           $context->{'stdout'} .= $buf;
158             }
159 0 0         if($context->{'on_stdout_data'}) {
160 0           $context->{'on_stdout_data'}->($context);
161             }
162 0           return 1;
163             },
164             'STDERR' => sub {
165 0     0     my ($handle) = @_;
166 0           my $buf;
167 0           while(read($handle, $buf, 4096)) {
168 0           $context->{'stderr'} .= $buf;
169             }
170 0           return 1;
171             },
172             'SIGCHLD' => sub {
173 0     0     $context->{exit_status} = $_[0];
174 0           my $obuf;
175 0           my $handle = $process->{'fd'}{'stdout'}{'fd'};
176 0           while(read($handle, $obuf, 100000)) {
177 0           $context->{'stdout'} .= $obuf;
178 0           say "stdout sigchld read";
179             }
180 0           my $ebuf;
181 0           $handle = $process->{'fd'}{'stderr'}{'fd'};
182 0           while(read($handle, $ebuf, 100000)) {
183 0           $context->{'stderr'} .= $ebuf;
184 0           say "stderr sigchld read";
185             }
186 0 0         if($context->{'on_stdout_data'}) {
187 0           $context->{'on_stdout_data'}->($context);
188             }
189 0           $context->{'at_exit'}->($context);
190             },
191 0           };
192              
193 0 0         if($context->{'input'}) {
194             $prochandlers->{'STDIN'} = sub {
195 0     0     my ($fh) = @_;
196 0           while(1) {
197 0           my $curbuf = $context->{'curbuf'};
198 0 0         if($curbuf) {
199 0           my $rv = syswrite($fh, $curbuf, length($curbuf));
200 0 0         if(!defined($rv)) {
    0          
201 0 0         if(! $!{EAGAIN}) {
202 0           say "Critical write error";
203 0           return -1;
204             }
205 0           return 1;
206             }
207             elsif($rv != length($curbuf)) {
208 0           substr($context->{'curbuf'}, 0, $rv, '');
209 0           return 1;
210             }
211             else {
212 0           say "wrote all";
213             }
214             }
215 0           $context->{'curbuf'} = $context->{'input'}->($context);
216 0 0         if(! defined $context->{'curbuf'}) {
217 0           return 0;
218             }
219             }
220 0           };
221             }
222              
223 0           $process = $make_process->($make_process_args, $prochandlers, {'O_NONBLOCK' => 1});
224 0           return $process;
225             }
226              
227             # launch a command process with poll handlers
228             sub _new_cmd {
229 0     0     my ($mpa, $prochandlers, $handlesettings) = @_;
230 0           return $mpa->{'class'}->new($mpa->{'cmd'}, $mpa->{'evp'}, $prochandlers, $handlesettings);
231             }
232              
233             # launch a command process
234             sub new_cmd_process {
235 0     0 0   my ($class, $evp, $cmd, $context) = @_;
236 0           my $mpa = {'class' => $class, 'evp' => $evp, 'cmd' => $cmd};
237 0           return _new_ex(\&_new_cmd, $mpa, $context);
238             }
239              
240             # subset of command process, just need the data on SIGCHLD
241             sub new_output_process {
242 0     0 0   my ($class, $evp, $cmd, $handler) = @_;
243              
244             return new_cmd_process($class, $evp, $cmd, {
245             'at_exit' => sub {
246 0     0     my ($context) = @_;
247 0           say 'run handler';
248 0           $handler->($context->{'stdout'}, $context->{'stderr'});
249             }
250 0           });
251             }
252              
253             sub new_io_process {
254 0     0 0   my ($class, $evp, $cmd, $handler, $inputdata) = @_;
255             my $ctx = {
256             'at_exit' => sub {
257 0     0     my ($context) = @_;
258 0           say 'run handler';
259 0           $handler->($context->{'stdout'}, $context->{'stderr'});
260             }
261 0           };
262 0 0         if(defined $inputdata) {
263 0           $ctx->{'curbuf'} = $inputdata;
264             $ctx->{'input'} = sub {
265 0     0     say "all written";
266 0           return undef;
267 0           };
268             }
269 0           return new_cmd_process($class, $evp, $cmd, $ctx);
270             }
271              
272             # launch a process without a new exe with poll handlers
273             sub _new_child {
274 0     0     my ($mpa, $prochandlers, $handlesettings) = @_;
275              
276 0           my %self = ('time' => clock_gettime(CLOCK_MONOTONIC), 'evp' => $mpa->{'evp'});
277             # inreader/inwriter is the parent to child data channel
278             # outreader/outwriter is the child to parent data channel
279             # errreader/errwriter is the child to parent log channel
280 0 0         pipe(my $inreader, my $inwriter) or die("pipe failed $!");
281 0 0         pipe(my $outreader, my $outwriter) or die("pipe failed $!");
282 0 0         pipe(my $errreader, my $errwriter) or die("pipe failed $!");
283             # the childs stderr will be UTF-8 text
284 0           binmode($errreader, ':encoding(UTF-8)');
285 0   0       my $pid = fork() // do {
286 0           say "failed to fork";
287 0           return undef;
288             };
289 0 0         if($pid == 0) {
290 0           close($inwriter);
291 0           close($outreader);
292 0           close($errreader);
293 0 0         open(STDIN, "<&", $inreader) or die("Can't dup \$inreader to STDIN");
294 0 0         open(STDOUT, ">&", $errwriter) or die("Can't dup \$errwriter to STDOUT");
295 0 0         open(STDERR, ">&", $errwriter) or die("Can't dup \$errwriter to STDERR");
296 0           $mpa->{'func'}->($outwriter);
297 0           exit 0;
298             }
299 0           close($inreader);
300 0           close($outwriter);
301 0           close($errwriter);
302 0           $self{'pid'} = $pid;
303 0           say 'PID '. $pid . ' NEW CHILD';
304 0           _setup_handlers(\%self, $inwriter, $outreader, $errreader, $prochandlers, $handlesettings);
305 0           return bless \%self, $mpa->{'class'};
306             }
307              
308             sub cmd_to_sock {
309 0     0 0   my ($name, $cmd, $sockfh) = @_;
310 0 0         if(fork() == 0) {
311 0 0         open(STDOUT, ">&", $sockfh) or die("Can't dup \$sockfh to STDOUT");
312 0           exec(@$cmd);
313 0           die;
314             }
315 0           close($sockfh);
316             }
317              
318             # launch a process without a new exe with just sigchld handler
319             sub new_output_child {
320 0     0 0   my ($class, $evp, $func, $handler) = @_;
321 0           my $mpa = {'class' => $class, 'evp' => $evp, 'func' => $func};
322             return _new_ex(\&_new_child, $mpa, {
323             'at_exit' => sub {
324 0     0     my ($context) = @_;
325 0           $handler->($context->{'stdout'}, $context->{'stderr'}, $context->{exit_status});
326             }
327 0           });
328             }
329              
330             sub remove {
331 0     0 0   my ($self, $fd) = @_;
332 0           $self->{'evp'}->remove($fd);
333 0           say "poll has " . scalar ( $self->{'evp'}{'poll'}->handles) . " handles";
334 0           foreach my $key (keys %{$self->{'fd'}}) {
  0            
335 0 0 0       if(defined($self->{'fd'}{$key}{'fd'}) && ($fd == $self->{'fd'}{$key}{'fd'})) {
336 0           $self->{'fd'}{$key} = undef;
337 0           last;
338             }
339             }
340             }
341              
342              
343             sub DESTROY {
344 0     0     my $self = shift;
345 0           say "PID " . $self->{'pid'} . ' DESTROY called';
346 0           foreach my $key (keys %{$self->{'fd'}}) {
  0            
347 0 0         if(defined($self->{'fd'}{$key}{'fd'})) {
348             #Dump($self->{'fd'}{$key});
349 0           $self->{'evp'}->remove($self->{'fd'}{$key}{'fd'});
350 0           $self->{'fd'}{$key} = undef;
351             }
352             }
353             }
354              
355             1;