File Coverage

blib/lib/App/RemoteCommand.pm
Criterion Covered Total %
statement 50 290 17.2
branch 0 74 0.0
condition 0 30 0.0
subroutine 17 40 42.5
pod 0 10 0.0
total 67 444 15.0


line stmt bran cond sub pod time code
1             package App::RemoteCommand v1.0.0;
2 1     1   94942 use v5.24;
  1         3  
3 1     1   3 use warnings;
  1         2  
  1         52  
4 1     1   4 use experimental qw(lexical_subs signatures);
  1         1  
  1         6  
5              
6 1     1   495 use App::RemoteCommand::Pool;
  1         2  
  1         28  
7 1     1   409 use App::RemoteCommand::SSH;
  1         3  
  1         36  
8 1     1   413 use App::RemoteCommand::Select;
  1         2  
  1         57  
9 1     1   6 use App::RemoteCommand::Util qw(prompt DEBUG logger);
  1         1  
  1         47  
10              
11 1     1   4 use File::Basename ();
  1         1  
  1         9  
12 1     1   423 use File::Copy ();
  1         4210  
  1         23  
13 1     1   843 use File::Temp ();
  1         9452  
  1         29  
14 1     1   586 use Getopt::Long ();
  1         12766  
  1         54  
15 1     1   10 use IO::Select;
  1         3  
  1         62  
16 1     1   9 use List::Util ();
  1         2  
  1         24  
17 1     1   6 use POSIX 'strftime';
  1         2  
  1         10  
18 1     1   855 use Pod::Usage ();
  1         90591  
  1         55  
19 1     1   746 use String::Glob::Permute 'string_glob_permute';
  1         1179  
  1         104  
20              
21 1     1   10 use constant TICK_SECOND => 0.1;
  1         2  
  1         6351  
22              
23             my $SCRIPT = File::Basename::basename($0);
24             my $SUDO_PROMPT = sprintf "sudo password (asking with %s): ", $SCRIPT;
25             my $SUDO_FAIL = "Sorry, try again.";
26              
27             our $TRIAL = 0;
28              
29 0     0 0   sub new ($class, %option) {
  0            
  0            
  0            
30 0           bless {
31             %option,
32             pending => [],
33             running => App::RemoteCommand::Pool->new,
34             select => App::RemoteCommand::Select->new,
35             }, $class;
36             }
37              
38 0     0 0   sub run ($self, @argv) {
  0            
  0            
  0            
39 0 0         $self = $self->new unless ref $self;
40 0           $self->parse_options(@argv);
41 0           $self->register;
42              
43 0           local $| = 1;
44 0     0     my $INT; local $SIG{INT} = sub (@) { $INT++ };
  0            
  0            
  0            
  0            
45 0     0     my $TERM; local $SIG{TERM} = sub (@) { $TERM++ };
  0            
  0            
  0            
  0            
46 0           while (1) {
47 0 0 0       if ($INT || $TERM) {
48 0 0         my $signal = $TERM ? "TERM" : "INT";
49 0           warn "\nCatch SIG$signal, try to shutdown gracefully...\n";
50 0           DEBUG and logger "handling signal %s", $signal;
51 0           $self->cancel($signal);
52 0           $INT = $TERM = 0;
53             }
54 0           $self->one_tick;
55 0 0 0       last if $self->{pending}->@* == 0 && $self->{running}->count == 0;
56             }
57              
58 0           my @success = sort grep { $self->{exit}{$_} == 0 } keys $self->{exit}->%*;
  0            
59 0           my @fail = sort grep { $self->{exit}{$_} != 0 } keys $self->{exit}->%*;
  0            
60 0 0         if (!$self->{quiet}) {
61 0           print STDERR "\e[32mSUCCESS\e[m $_\n" for @success;
62 0           print STDERR "\e[31mFAIL\e[m $_\n" for @fail;
63             }
64 0 0         return @fail ? 1 : 0;
65             }
66              
67 0     0 0   sub show_help ($self) {
  0            
  0            
68 0           open my $fh, '>', \my $out;
69 0           Pod::Usage::pod2usage
70             exitval => 'noexit',
71             input => $0,
72             output => $fh,
73             sections => 'SYNOPSIS|OPTIONS|EXAMPLES',
74             verbose => 99,
75             ;
76 0           $out =~ s/^[ ]{4,6}/ /mg;
77 0           $out =~ s/\n$//;
78 0           print $out;
79             }
80              
81 0     0 0   sub parse_options ($self, @argv) {
  0            
  0            
  0            
82 0           my $parser = Getopt::Long::Parser->new(
83             config => [qw(no_auto_abbrev no_ignore_case)],
84             );
85             $parser->getoptionsfromarray(
86             \@argv,
87 0     0     "c|concurrency=i" => \($self->{concurrency} = 5),
88 0           "h|help" => sub (@) { $self->show_help; exit 1 },
  0            
  0            
89 0     0     "s|script=s" => \($self->{script}),
90 0           "v|version" => sub (@) { printf "%s %s\n", __PACKAGE__, __PACKAGE__->VERSION; exit },
  0            
  0            
91             "a|ask-sudo-password" => \(my $ask_sudo_password),
92             "H|host-file=s" => \(my $host_file),
93             "sudo-password=s" => \($self->{sudo_password}),
94             "append-hostname!" => \(my $append_hostname = 1),
95             "append-time!" => \(my $append_time),
96             "sudo=s" => \($self->{sudo_user}),
97             "q|quiet" => \($self->{quiet}),
98 0 0         "F=s" => \($self->{configfile}),
99             ) or exit(2);
100              
101 0 0         my $host_arg = $host_file ? undef : shift @argv;
102 0 0         if ($self->{script}) {
103 0           $self->{script_arg} = \@argv;
104             } else {
105 0           $self->{command} = \@argv;
106             }
107              
108 0 0 0       if (!($self->{command} || [])->@* && !$self->{script}) {
    0          
109 0           warn "COMMANDS or --script option is required\n";
110 0           exit(2);
111             }
112 0 0         if ($self->{script}) {
113 0           my ($tempfh, $tempfile) = File::Temp::tempfile(UNLINK => 1, EXLOCK => 0);
114 0 0         File::Copy::copy($self->{script}, $tempfh)
115             or die "copy $self->{script} to tempfile: $!";
116 0           close $tempfh;
117 0           chmod 0755, $tempfile;
118 0           $self->{script} = $tempfile;
119             }
120              
121 0           $self->{format} = $self->make_format(
122             append_hostname => $append_hostname,
123             append_time => $append_time,
124             );
125              
126 0 0         if ($ask_sudo_password) {
127 0           my $password = prompt $SUDO_PROMPT;
128 0           $self->{sudo_password} = $password;
129             }
130 0 0         $self->{host} = $host_file ? $self->parse_host_file($host_file)
131             : $self->parse_host_arg($host_arg);
132 0           $self;
133             }
134              
135 0     0 0   sub cancel ($self, $signal) {
  0            
  0            
  0            
136 0           $self->{pending}->@* = ();
137 0           for my $ssh ($self->{running}->all) {
138 0           $ssh->cancel($signal);
139             }
140             }
141              
142 0     0 0   sub one_tick ($self) {
  0            
  0            
143              
144 0   0       while ($self->{running}->count < $self->{concurrency} and my $ssh = shift $self->{pending}->@*) {
145 0           $self->{running}->add($ssh);
146             }
147              
148 0           my @ready = $self->{select}->can_read(TICK_SECOND);
149             DEBUG and logger "one tick running %d (watching %d, can_read %d), pending %d",
150 0           $self->{running}->count, $self->{select}->count, scalar @ready, scalar $self->{pending}->@*;
151              
152 0 0         if ($self->{select}->count == 0) {
153 0           select undef, undef, undef, TICK_SECOND;
154             } else {
155 0           $self->_process($_) for @ready;
156             }
157              
158             # -1: there is no child process
159             # 0: all child process are running
160 0           my $pid = waitpid -1, POSIX::WNOHANG;
161 0           my $exit = $?;
162              
163 0 0 0       if ($pid > 0 and my $remove = $self->{select}->remove(pid => $pid)) {
164 0           $self->_post_process($remove);
165             }
166              
167 0           for my $ssh ($self->{running}->all) {
168 0 0         my %args = (select => $self->{select}, $pid > 0 ? (pid => $pid, exit => $exit) : ());
169 0           my $is_running = $ssh->one_tick(%args);
170 0 0         if (!$is_running) {
171 0           $self->{exit}{$ssh->host} = $ssh->exit;
172 0           $self->{running}->remove($ssh);
173             }
174             }
175             }
176              
177             # We close fh explicitly; otherwise it happens that
178             # perl warns "unnable to close filehandle properly: Input/output error" under ssh proxy
179 0     0     sub _process ($self, $ready) {
  0            
  0            
  0            
180 0           my ($fh, $pid, $host, $buffer) = @{$ready}{qw(fh pid host buffer)};
  0            
181 0           my $len = sysread $fh, my $buf, 64*1024;
182 0           my ($errno, $errmsg) = (0+$!, "$!");
183 0           DEBUG and logger " READ %s, pid %d, len %s, err: %s",
184             $host, $pid, defined $len ? $len : 'undef', $errmsg || "N/A";
185 0 0         if ($len) {
    0          
186 0 0         if (my @line = $buffer->add($buf)->get) {
187 0           print $self->{format}->($host, $_) for @line;
188 0 0 0       if ($ready->{sudo} and @line == 1 and $line[0] eq $SUDO_FAIL) {
      0        
189 0           $self->{select}->remove(fh => $fh);
190 0           close $fh;
191 0           return;
192             }
193             }
194              
195 0 0         if ($buffer->raw eq $SUDO_PROMPT) {
196 0           $ready->{sudo}++;
197 0           my ($line) = $buffer->get(1);
198 0           print $self->{format}->($host, $line);
199 0 0         if (my $sudo_password = $self->{sudo_password}) {
200 0           syswrite $fh, "$sudo_password\n";
201             } else {
202 0           my $err = "have to provide sudo passowrd first, try again with --ask-sudo-password option.";
203 0           print $self->{format}->($host, $err);
204 0           $self->{select}->remove(fh => $fh);
205 0           close $fh;
206             }
207             }
208             } elsif (!defined $len) {
209 0 0         if ($errno != Errno::EIO) { # this happens when use ssh proxy, so skip
210 0           print $self->{format}->($host, "sysread $errmsg");
211             }
212             } else {
213 0           my @line = $buffer->get(1);
214 0           print $self->{format}->($host, $_) for @line;
215 0           $self->{select}->remove(fh => $fh);
216 0           close $fh;
217             }
218             }
219              
220 0     0     sub _post_process ($self, $ready) {
  0            
  0            
  0            
221 0           my ($fh, $pid, $host, $buffer) = @{$ready}{qw(fh pid host buffer)};
  0            
222 0 0         if ($fh) {
223             # XXX: We use select() here; otherwise it happens that
224             # <$fh> is blocked under ssh proxy
225 0           my $select = IO::Select->new($fh);
226 0           while ($select->can_read(TICK_SECOND)) {
227 0           my $len = sysread $fh, my $buf, 64*1024;
228 0           DEBUG and logger " POST READ %s, pid %d, len %s", $host, $pid, defined $len ? $len : 'undef';
229 0 0 0       if (defined $len && $len > 0) {
230 0           $buffer->add($buf);
231             } else {
232 0           last;
233             }
234             }
235 0           my @line = $buffer->get(1);
236 0           print $self->{format}->($host, $_) for @line;
237 0           close $fh;
238             }
239             }
240              
241 0     0 0   sub register ($self) {
  0            
  0            
242              
243 0           my @prefix = ("env", "SUDO_PROMPT=$SUDO_PROMPT");
244 0 0         push @prefix, "sudo", "-u", $self->{sudo_user} if $self->{sudo_user};
245              
246 0           my (@ssh_cmd, $ssh_at_exit);
247 0           my @command;
248 0 0         if (my $script = $self->{script}) {
249 0           my $name = sprintf "/tmp/%s.%d.%d", $SCRIPT, time, rand(10_000);
250 0     0     push @ssh_cmd, sub ($ssh) {
  0            
  0            
251 0           my $pid = $ssh->scp_put({async => 1, copy_attrs => 1}, $script, $name);
252 0           return ($pid, undef);
253 0           };
254 0     0     $ssh_at_exit = sub ($ssh) {
  0            
  0            
255 0           my $pid = $ssh->system({async => 1}, "rm", "-f", $name);
256 0           return ($pid, undef);
257 0           };
258 0           @command = (@prefix, $name, $self->{script_arg}->@*);
259             } else {
260 0           my $escape = qr{[^a-zA-Z0-9/_:%\.-]};
261             @command = (
262             @prefix,
263             ($self->{command}->@* == 1 && $self->{command}[0] =~ $escape ? ("bash", "-c") : ()),
264             $self->{command}->@*,
265 0 0 0       );
266             }
267 0           DEBUG and logger "execute %s", join(" ", map { qq('$_') } @command);
268 0     0     push @ssh_cmd, sub ($ssh) {
  0            
  0            
269 0           my ($fh, $pid) = $ssh->open2pty(@command);
270 0           return ($pid, $fh);
271 0           };
272              
273 0           for my $host ($self->{host}->@*) {
274 0           my $ssh = App::RemoteCommand::SSH->new(host => $host, configfile => $self->{configfile});
275 0           $ssh->add($_) for @ssh_cmd;
276 0 0         $ssh->at_exit($ssh_at_exit) if $ssh_at_exit;
277 0           push $self->{pending}->@*, $ssh;
278             }
279             }
280              
281 0     0 0   sub make_format ($self, %opt) {
  0            
  0            
  0            
282 0 0 0       if ($opt{append_time} && $opt{append_hostname}) {
    0          
    0          
283 0     0     sub ($host, $msg) { "[@{[strftime '%F %T', localtime]}][$host] $msg\n" };
  0            
  0            
  0            
  0            
  0            
  0            
284             } elsif ($opt{append_time}) {
285 0     0     sub ($host, $msg) { "[@{[strftime '%F %T', localtime]}] $msg\n" };
  0            
  0            
  0            
  0            
  0            
  0            
286             } elsif ($opt{append_hostname}) {
287 0     0     sub ($host, $msg) { "[$host] $msg\n" };
  0            
  0            
  0            
  0            
  0            
288             } else {
289 0     0     sub ($host, $msg) { "$msg\n" };
  0            
  0            
  0            
  0            
  0            
290             }
291             }
292              
293 0     0 0   sub parse_host_arg ($self, $host_arg) {
  0            
  0            
  0            
294 0           [ List::Util::uniq string_glob_permute($host_arg) ];
295             }
296              
297 0     0 0   sub parse_host_file ($self, $host_file) {
  0            
  0            
  0            
298 0 0         open my $fh, "<", $host_file or die "Cannot open '$host_file': $!\n";
299 0           my @host;
300 0           while (my $line = <$fh>) {
301 0           $line =~ s/^\s+//; $line =~ s/\s+$//;
  0            
302 0 0         push @host, string_glob_permute($line) if $line =~ /^[^#\s]/;
303             }
304 0           [ List::Util::uniq @host ];
305             }
306              
307             1;
308             __END__