File Coverage

lib/App/MtAws/ForkEngine.pm
Criterion Covered Total %
statement 119 153 77.7
branch 12 28 42.8
condition 2 3 66.6
subroutine 19 24 79.1
pod 0 9 0.0
total 152 217 70.0


line stmt bran cond sub pod time code
1             # mt-aws-glacier - Amazon Glacier sync client
2             # Copyright (C) 2012-2014 Victor Efimov
3             # http://mt-aws.com (also http://vs-dev.com) vs@vs-dev.com
4             # License: GPLv3
5             #
6             # This file is part of "mt-aws-glacier"
7             #
8             # mt-aws-glacier is free software: you can redistribute it and/or modify
9             # it under the terms of the GNU General Public License as published by
10             # the Free Software Foundation, either version 3 of the License, or
11             # (at your option) any later version.
12             #
13             # mt-aws-glacier is distributed in the hope that it will be useful,
14             # but WITHOUT ANY WARRANTY; without even the implied warranty of
15             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16             # GNU General Public License for more details.
17             #
18             # You should have received a copy of the GNU General Public License
19             # along with this program. If not, see <http://www.gnu.org/licenses/>.
20              
21             package App::MtAws::ForkEngine;
22              
23             our $VERSION = '1.114_2';
24              
25 18     18   323681 use strict;
  18         159  
  18         510  
26 18     18   80 use warnings;
  18         20  
  18         432  
27 18     18   56 use utf8;
  18         29  
  18         94  
28 18     18   9583 use IO::Select;
  18         15234  
  18         667  
29 18     18   74 use IO::Pipe;
  18         164  
  18         308  
30 18     18   80 use IO::Handle;
  18         30  
  18         471  
31 18     18   71 use Carp;
  18         33  
  18         739  
32 18     18   5174 use App::MtAws::ChildWorker;
  18         39  
  18         517  
33 18     18   7616 use App::MtAws::ParentWorker;
  18         30  
  18         438  
34 18     18   81 use App::MtAws::Utils;
  18         20  
  18         1999  
35 18     18   77 use App::MtAws::Exceptions;
  18         30  
  18         921  
36 18     18   70 use POSIX;
  18         25  
  18         111  
37              
38 18     18   26209 use Exporter 'import';
  18         23  
  18         18909  
39              
40             our @EXPORT_OK = qw/with_forks fork_engine/;
41              
42             # some DSL
43              
44             our $FE = undef;
45              
46             sub fork_engine()
47             {
48 0 0   0 0 0 $FE||confess;
49             }
50              
51             sub with_forks($$&)
52             {
53 0     0 0 0 my ($flag, $options, $cb) = @_;
54 0         0 local $FE = undef;
55 0 0       0 if ($flag) {
56 0         0 $FE = App::MtAws::ForkEngine->new(options => $options);
57 0         0 $FE->start_children();
58 0 0       0 if (defined eval {$cb->(); 1;}) {
  0         0  
  0         0  
59 0         0 $FE->terminate_children();
60             } else {
61 0         0 dump_error(q{parent});
62 0         0 $FE->terminate_children();
63 0         0 exit(1);
64             }
65             } else {
66 0         0 $cb->();
67             }
68             }
69              
70             # class
71              
72             sub new
73             {
74 64     64 0 85198 my ($class, %args) = @_;
75 64         132 my $self = \%args;
76 64 50       238 $self->{options}||die;
77 64         165 $self->{children} = {};
78             # $self->{disp_select}||die;
79             # @{$self->{freeworkers}} = keys %{$self->{children}};
80 64         126 bless $self, $class;
81 64         210 return $self;
82             }
83              
84             sub run_children
85             {
86 0     0 0 0 my ($self, $child_fromchild, $child_tochild) = @_;
87 0         0 my $C = App::MtAws::ChildWorker->new(options => $self->{options}, fromchild => $child_fromchild, tochild => $child_tochild);
88 0 0       0 dump_error("child $$") unless (defined eval {$C->process(); 1;});
  0         0  
  0         0  
89             }
90              
91             sub run_parent
92             {
93 0     0 0 0 my ($self, $disp_select) = @_;
94 0         0 return $self->{parent_worker} = App::MtAws::ParentWorker->new(children => $self->{children}, disp_select => $disp_select, options=>$self->{options});
95             }
96              
97             sub parent_exit_on_signal
98             {
99 0     0 0 0 my ($self, $sig, $status) = @_;
100 0         0 my $status_str = do {
101 0 0       0 if (defined $status) {
102 0         0 my $exit_code = $status >> 8;
103 0         0 my $signal = $status & 127;
104 0 0       0 if ($signal) {
105 0         0 " (signal $signal, exit_code $exit_code)";
106             } else {
107 0         0 " (exit_code $exit_code)";
108             }
109             } else {
110 0         0 "";
111             }
112             };
113 0         0 print STDERR "\nEXIT on SIG$sig$status_str\n";
114 0         0 exit(1);
115             }
116              
117             sub start_children
118             {
119 64     64 0 311 my ($self) = @_;
120             # parent's data
121 64         568 my $disp_select = IO::Select->new();
122 64         860 $self->{parent_pid} = $$;
123             # child/parent code
124 64         203 for my $n (1..$self->{options}->{concurrency}) {
125 64         777 my ($ischild, $child_fromchild, $child_tochild) = $self->create_child($disp_select);
126 64 100       653 if ($ischild) {
127             # child code
128 9         28 my $first_time = 1;
129 9         301 my @signals = qw/INT TERM USR2 HUP/;
130 9         124 for my $sig (@signals) {
131             $SIG{$sig} = sub {
132 9 50   9   320847 if ($first_time) {
133 9         36 $first_time = 0;
134 9         2646 exit(1); # we need exit, it will call all destructors which will destroy tempfiles
135             }
136 36         597 };
137             }
138 9         117 $self->run_children($child_fromchild, $child_tochild);
139 0         0 exit(1);
140             }
141             }
142              
143 55         93 my $first_time = 1;
144 55         181 for my $sig (qw/INT TERM CHLD USR1 HUP/) {
145             $SIG{$sig} = sub {
146 65     65   56602653 local ($!,$^E,$@);
147 65         277 my $status = undef;
148 65 100       1233 if ($sig eq 'CHLD') {
149 55         1407 my $pid = waitpid(-1, WNOHANG);
150 55         221 $status = $?;
151             # make sure we caugth signal from our children, not from external command executionin 3rd party module
152             # easy to test by adding `whoami` to parent after-fork-code
153 55 100 66     1145 return unless $pid > 0 and defined delete $self->{children}{$pid}; # we also remove $pid
154             }
155 46 50       1055 if ($first_time) {
156 46         97 $first_time = 0;
157 46         78 kill (POSIX::SIGUSR2, keys %{$self->{children}});
  46         295  
158 46         20876906 while( wait() != -1 ){};
159 46         565 $self->parent_exit_on_signal($sig, $status);
160             }
161 275         2693 };
162             }
163              
164 55         255 return $self->run_parent($disp_select);
165             }
166              
167             #
168             # child/parent code
169             #
170             sub create_child
171             {
172 64     64 0 116 my ($self, $disp_select) = @_;
173              
174 64         558 my $fromchild = new IO::Pipe;
175             #log("created fromchild pipe $!", 10) if level(10);
176 64         5852 my $tochild = new IO::Pipe;
177             #log("created tochild pipe $!", 10) if level(10);
178 64         2650 my $pid;
179 64         142 my $parent_pid = $$;
180              
181 64 100       67686 if($pid = fork()) { # Parent
    50          
182 55         794 $|=1;
183 55         2643 STDERR->autoflush(1);
184 55         6727 $fromchild->reader();
185 55         5045 $fromchild->autoflush(1);
186 55         1702 $fromchild->blocking(1);
187 55         186 binmode $fromchild;
188              
189 55         234 $tochild->writer();
190 55         2255 $tochild->autoflush(1);
191 55         1072 $tochild->blocking(1);
192 55         156 binmode $tochild;
193              
194 55         868 $disp_select->add($fromchild);
195 55         5389 $self->{children}->{$pid} = { pid => $pid, fromchild => $fromchild, tochild => $tochild };
196              
197 55         740 print "PID $pid Started worker\n";
198 55         577 return (0, undef, undef);
199             } elsif (defined ($pid)) { # Child
200 9         658 $|=1;
201 9         617 STDERR->autoflush(1);
202 9         1710 $fromchild->writer();
203 9         1266 $fromchild->autoflush(1);
204 9         747 $fromchild->blocking(1);
205 9         100 binmode $fromchild;
206              
207 9         221 $tochild->reader();
208 9         547 $tochild->autoflush(1);
209 9         530 $tochild->blocking(1);
210 9         26 binmode $tochild;
211              
212 9         67 undef $disp_select; # we discard tonns of unneeded pipes !
213 9         158 undef $self->{children};
214              
215 9         149 return (1, $fromchild, $tochild);
216             } else {
217 0         0 die "Cannot fork()";
218             }
219             }
220              
221              
222             sub terminate_children
223             {
224 55     55 0 2715944 my ($self) = @_;
225 55         679 $SIG{CHLD} = 'DEFAULT'; # don't set SIGCHLD to IGNORE, prevents wait() from working under 5.12.2,3 undef OpenBSD
226 55         1067 $SIG{INT} = $SIG{USR2}='IGNORE';
227              
228             # close all pipes, just in case select() in child is not interruptable (seems it is under 5.14.2?)
229             # https://rt.perl.org/Ticket/Display.html?id=93428
230 55         291 for (values %{$self->{children}}) {
  55         322  
231 19         445 close $_->{fromchild};
232 19         193 close $_->{tochild};
233             }
234 55         180 kill (POSIX::SIGUSR2, keys %{$self->{children}}); # TODO: we terminate all children with SIGUSR2 even on normal exit
  55         717  
235 55         387 $SIG{TERM} = 'DEFAULT';
236 55         20371355 while( wait() != -1 ){};
237             }
238             1;