File Coverage

blib/lib/App/RemoteCommand.pm
Criterion Covered Total %
statement 47 238 19.7
branch 0 74 0.0
condition 0 30 0.0
subroutine 16 39 41.0
pod 0 10 0.0
total 63 391 16.1


line stmt bran cond sub pod time code
1             package App::RemoteCommand 0.990;
2 1     1   68575 use v5.16;
  1         4  
3 1     1   5 use warnings;
  1         5  
  1         25  
4              
5 1     1   421 use App::RemoteCommand::Pool;
  1         3  
  1         29  
6 1     1   427 use App::RemoteCommand::SSH;
  1         4  
  1         39  
7 1     1   467 use App::RemoteCommand::Select;
  1         3  
  1         38  
8 1     1   7 use App::RemoteCommand::Util qw(prompt DEBUG logger);
  1         18  
  1         57  
9              
10 1     1   5 use File::Basename ();
  1         2  
  1         13  
11 1     1   560 use File::Copy ();
  1         4560  
  1         26  
12 1     1   763 use File::Temp ();
  1         9456  
  1         26  
13 1     1   761 use Getopt::Long ();
  1         10345  
  1         31  
14 1     1   7 use IO::Select;
  1         6  
  1         38  
15 1     1   8 use List::Util ();
  1         2  
  1         17  
16 1     1   9 use POSIX 'strftime';
  1         2  
  1         9  
17 1     1   683 use Pod::Usage ();
  1         50382  
  1         33  
18 1     1   546 use String::Glob::Permute 'string_glob_permute';
  1         672  
  1         74  
19              
20 1     1   8 use constant TICK_SECOND => 0.1;
  1         3  
  1         3091  
21              
22             my $SCRIPT = File::Basename::basename($0);
23             my $SUDO_PROMPT = sprintf "sudo password (asking with %s): ", $SCRIPT;
24             my $SUDO_FAIL = "Sorry, try again.";
25              
26             sub new {
27 0     0 0   my ($class, %option) = @_;
28 0           bless {
29             %option,
30             pending => [],
31             running => App::RemoteCommand::Pool->new,
32             select => App::RemoteCommand::Select->new,
33             }, $class;
34             }
35              
36             sub run {
37 0     0 0   my ($self, @argv) = @_;
38 0 0         $self = $self->new unless ref $self;
39 0           $self->parse_options(@argv);
40 0           $self->register;
41              
42 0           local $| = 1;
43 0     0     my $INT; local $SIG{INT} = sub { $INT++ };
  0            
  0            
44 0     0     my $TERM; local $SIG{TERM} = sub { $TERM++ };
  0            
  0            
45 0           while (1) {
46 0 0 0       if ($INT || $TERM) {
47 0 0         my $signal = $TERM ? "TERM" : "INT";
48 0           warn "\nCatch SIG$signal, try to shutdown gracefully...\n";
49 0           DEBUG and logger "handling signal %s", $signal;
50 0           $self->cancel($signal);
51 0           $INT = $TERM = 0;
52             }
53 0           $self->one_tick;
54 0 0 0       last if @{$self->{pending}} == 0 && $self->{running}->count == 0;
  0            
55             }
56              
57 0           my @success = sort grep { $self->{exit}{$_} == 0 } keys %{$self->{exit}};
  0            
  0            
58 0           my @fail = sort grep { $self->{exit}{$_} != 0 } keys %{$self->{exit}};
  0            
  0            
59 0 0         if (!$self->{quiet}) {
60 0           print STDERR "\e[32mSUCCESS\e[m $_\n" for @success;
61 0           print STDERR "\e[31mFAIL\e[m $_\n" for @fail;
62             }
63 0 0         return @fail ? 1 : 0;
64             }
65              
66             sub show_help {
67 0     0 0   my $self = shift;
68 0           open my $fh, '>', \my $out;
69 0           Pod::Usage::pod2usage
70             exitval => 'noexit',
71             input => $0,
72             output => $fh,
73             sections => 'SYNOPSIS|OPTIONS|EXAMPLES',
74             verbose => 99,
75             ;
76 0           $out =~ s/^[ ]{4,6}/ /mg;
77 0           $out =~ s/\n$//;
78 0           print $out;
79             }
80              
81             sub parse_options {
82 0     0 0   my ($self, @argv) = @_;
83 0           my $parser = Getopt::Long::Parser->new(
84             config => [qw(no_auto_abbrev no_ignore_case)],
85             );
86             $parser->getoptionsfromarray(
87             \@argv,
88             "c|concurrency=i" => \($self->{concurrency} = 5),
89 0     0     "h|help" => sub { $self->show_help; exit 1 },
  0            
90             "s|script=s" => \($self->{script}),
91 0     0     "v|version" => sub { printf "%s %s\n", __PACKAGE__, __PACKAGE__->VERSION; exit },
  0            
92             "a|ask-sudo-password" => \(my $ask_sudo_password),
93             "H|host-file=s" => \(my $host_file),
94             "sudo-password=s" => \($self->{sudo_password}),
95             "append-hostname!" => \(my $append_hostname = 1),
96             "append-time!" => \(my $append_time),
97             "sudo=s" => \($self->{sudo_user}),
98             "q|quiet" => \($self->{quiet}),
99 0 0         "F=s" => \($self->{configfile}),
100             ) or exit(2);
101              
102 0 0         my $host_arg = $host_file ? undef : shift @argv;
103 0 0         if ($self->{script}) {
104 0           $self->{script_arg} = \@argv;
105             } else {
106 0           $self->{command} = \@argv;
107             }
108              
109 0 0 0       if (!@{$self->{command} || []} && !$self->{script}) {
  0 0          
110 0           warn "COMMANDS or --script option is required\n";
111 0           exit(2);
112             }
113 0 0         if ($self->{script}) {
114 0           my ($tempfh, $tempfile) = File::Temp::tempfile(UNLINK => 1, EXLOCK => 0);
115 0 0         File::Copy::copy($self->{script}, $tempfh)
116             or die "copy $self->{script} to tempfile: $!";
117 0           close $tempfh;
118 0           chmod 0755, $tempfile;
119 0           $self->{script} = $tempfile;
120             }
121              
122 0           $self->{format} = $self->make_format(
123             append_hostname => $append_hostname,
124             append_time => $append_time,
125             );
126              
127 0 0         if ($ask_sudo_password) {
128 0           my $password = prompt $SUDO_PROMPT;
129 0           $self->{sudo_password} = $password;
130             }
131 0 0         $self->{host} = $host_file ? $self->parse_host_file($host_file)
132             : $self->parse_host_arg($host_arg);
133 0           $self;
134             }
135              
136             sub cancel {
137 0     0 0   my ($self, $signal) = @_;
138 0           @{$self->{pending}} = ();
  0            
139 0           for my $ssh ($self->{running}->all) {
140 0           $ssh->cancel($signal);
141             }
142             }
143              
144             sub one_tick {
145 0     0 0   my $self = shift;
146              
147 0   0       while ($self->{running}->count < $self->{concurrency} and my $ssh = shift @{$self->{pending}}) {
  0            
148 0           $self->{running}->add($ssh);
149             }
150              
151 0           my @ready = $self->{select}->can_read(TICK_SECOND);
152             DEBUG and logger "one tick running %d (watching %d, can_read %d), pending %d",
153 0           $self->{running}->count, $self->{select}->count, scalar @ready, scalar @{$self->{pending}};
154              
155 0 0         if ($self->{select}->count == 0) {
156 0           select undef, undef, undef, TICK_SECOND;
157             } else {
158 0           $self->_process($_) for @ready;
159             }
160              
161             # -1: there is no child process
162             # 0: all child process are running
163 0           my $pid = waitpid -1, POSIX::WNOHANG;
164 0           my $exit = $?;
165              
166 0 0 0       if ($pid > 0 and my $remove = $self->{select}->remove(pid => $pid)) {
167 0           $self->_post_process($remove);
168             }
169              
170 0           for my $ssh ($self->{running}->all) {
171 0 0         my %args = (select => $self->{select}, $pid > 0 ? (pid => $pid, exit => $exit) : ());
172 0           my $is_running = $ssh->one_tick(%args);
173 0 0         if (!$is_running) {
174 0           $self->{exit}{$ssh->host} = $ssh->exit;
175 0           $self->{running}->remove($ssh);
176             }
177             }
178             }
179              
180             # We close fh explicitly; otherwise it happens that
181             # perl warns "unnable to close filehandle properly: Input/output error" under ssh proxy
182             sub _process {
183 0     0     my ($self, $ready) = @_;
184 0           my ($fh, $pid, $host, $buffer) = @{$ready}{qw(fh pid host buffer)};
  0            
185 0           my $len = sysread $fh, my $buf, 64*1024;
186 0           my ($errno, $errmsg) = (0+$!, "$!");
187 0           DEBUG and logger " READ %s, pid %d, len %s, err: %s",
188             $host, $pid, defined $len ? $len : 'undef', $errmsg || "N/A";
189 0 0         if ($len) {
    0          
190 0 0         if (my @line = $buffer->add($buf)->get) {
191 0           print $self->{format}->($host, $_) for @line;
192 0 0 0       if ($ready->{sudo} and @line == 1 and $line[0] eq $SUDO_FAIL) {
      0        
193 0           $self->{select}->remove(fh => $fh);
194 0           close $fh;
195 0           return;
196             }
197             }
198              
199 0 0         if ($buffer->raw eq $SUDO_PROMPT) {
200 0           $ready->{sudo}++;
201 0           my ($line) = $buffer->get(1);
202 0           print $self->{format}->($host, $line);
203 0 0         if (my $sudo_password = $self->{sudo_password}) {
204 0           syswrite $fh, "$sudo_password\n";
205             } else {
206 0           my $err = "have to provide sudo passowrd first, try again with --ask-sudo-password option.";
207 0           print $self->{format}->($host, $err);
208 0           $self->{select}->remove(fh => $fh);
209 0           close $fh;
210             }
211             }
212             } elsif (!defined $len) {
213 0 0         if ($errno != Errno::EIO) { # this happens when use ssh proxy, so skip
214 0           print $self->{format}->($host, "sysread $errmsg");
215             }
216             } else {
217 0           my @line = $buffer->get(1);
218 0           print $self->{format}->($host, $_) for @line;
219 0           $self->{select}->remove(fh => $fh);
220 0           close $fh;
221             }
222             }
223              
224             sub _post_process {
225 0     0     my ($self, $ready) = @_;
226 0           my ($fh, $pid, $host, $buffer) = @{$ready}{qw(fh pid host buffer)};
  0            
227 0 0         if ($fh) {
228             # XXX: We use select() here; otherwise it happens that
229             # <$fh> is blocked under ssh proxy
230 0           my $select = IO::Select->new($fh);
231 0           while ($select->can_read(TICK_SECOND)) {
232 0           my $len = sysread $fh, my $buf, 64*1024;
233 0           DEBUG and logger " POST READ %s, pid %d, len %s", $host, $pid, defined $len ? $len : 'undef';
234 0 0 0       if (defined $len && $len > 0) {
235 0           $buffer->add($buf);
236             } else {
237 0           last;
238             }
239             }
240 0           my @line = $buffer->get(1);
241 0           print $self->{format}->($host, $_) for @line;
242 0           close $fh;
243             }
244             }
245              
246             sub register {
247 0     0 0   my $self = shift;
248              
249 0           my @prefix = ("env", "SUDO_PROMPT=$SUDO_PROMPT");
250 0 0         push @prefix, "sudo", "-u", $self->{sudo_user} if $self->{sudo_user};
251              
252 0           my (@ssh_cmd, $ssh_at_exit);
253 0           my @command;
254 0 0         if (my $script = $self->{script}) {
255 0           my $name = sprintf "/tmp/%s.%d.%d", $SCRIPT, time, rand(10_000);
256             push @ssh_cmd, sub {
257 0     0     my $ssh = shift;
258 0           my $pid = $ssh->scp_put({async => 1, copy_attrs => 1}, $script, $name);
259 0           return ($pid, undef);
260 0           };
261             $ssh_at_exit = sub {
262 0     0     my $ssh = shift;
263 0           my $pid = $ssh->system({async => 1}, "rm", "-f", $name);
264 0           return ($pid, undef);
265 0           };
266 0           @command = (@prefix, $name, @{$self->{script_arg}});
  0            
267             } else {
268 0           my $escape = qr{[^a-zA-Z0-9/_:%\.-]};
269             @command = (
270             @prefix,
271             (@{$self->{command}} == 1 && $self->{command}[0] =~ $escape ? ("bash", "-c") : ()),
272 0 0 0       @{$self->{command}},
  0            
273             );
274             }
275 0           DEBUG and logger "execute %s", join(" ", map { qq('$_') } @command);
276             push @ssh_cmd, sub {
277 0     0     my $ssh = shift;
278 0           my ($fh, $pid) = $ssh->open2pty(@command);
279 0           return ($pid, $fh);
280 0           };
281              
282 0           for my $host (@{$self->{host}}) {
  0            
283 0           my $ssh = App::RemoteCommand::SSH->new(host => $host, configfile => $self->{configfile});
284 0           $ssh->add($_) for @ssh_cmd;
285 0 0         $ssh->at_exit($ssh_at_exit) if $ssh_at_exit;
286 0           push @{$self->{pending}}, $ssh;
  0            
287             }
288             }
289              
290             sub make_format {
291 0     0 0   my ($self, %opt) = @_;
292 0 0 0       if ($opt{append_time} && $opt{append_hostname}) {
    0          
    0          
293 0     0     sub { my ($host, $msg) = @_; "[@{[strftime '%F %T', localtime]}][$host] $msg\n" };
  0            
  0            
  0            
294             } elsif ($opt{append_time}) {
295 0     0     sub { my ($host, $msg) = @_; "[@{[strftime '%F %T', localtime]}] $msg\n" };
  0            
  0            
  0            
296             } elsif ($opt{append_hostname}) {
297 0     0     sub { my ($host, $msg) = @_; "[$host] $msg\n" };
  0            
  0            
298             } else {
299 0     0     sub { my ($host, $msg) = @_; "$msg\n" };
  0            
  0            
300             }
301             }
302              
303             sub parse_host_arg {
304 0     0 0   my ($self, $host_arg) = @_;
305 0           [ List::Util::uniq string_glob_permute($host_arg) ];
306             }
307              
308             sub parse_host_file {
309 0     0 0   my ($self, $host_file) = @_;
310 0 0         open my $fh, "<", $host_file or die "Cannot open '$host_file': $!\n";
311 0           my @host;
312 0           while (my $line = <$fh>) {
313 0           $line =~ s/^\s+//; $line =~ s/\s+$//;
  0            
314 0 0         push @host, string_glob_permute($line) if $line =~ /^[^#\s]/;
315             }
316 0           [ List::Util::uniq @host ];
317             }
318              
319             1;
320             __END__