File Coverage

blib/lib/App/RemoteCommand.pm
Criterion Covered Total %
statement 48 232 20.6
branch 0 74 0.0
condition 0 30 0.0
subroutine 16 38 42.1
pod 0 9 0.0
total 64 383 16.7


line stmt bran cond sub pod time code
1             package App::RemoteCommand;
2 1     1   71162 use strict;
  1         2  
  1         41  
3 1     1   15 use warnings;
  1         2  
  1         26  
4              
5 1     1   446 use App::RemoteCommand::Pool;
  1         3  
  1         33  
6 1     1   465 use App::RemoteCommand::SSH;
  1         4  
  1         42  
7 1     1   486 use App::RemoteCommand::Select;
  1         2  
  1         39  
8 1     1   7 use App::RemoteCommand::Util qw(prompt DEBUG logger);
  1         15  
  1         57  
9              
10 1     1   5 use File::Basename ();
  1         3  
  1         14  
11 1     1   531 use File::Copy ();
  1         4780  
  1         29  
12 1     1   1146 use File::Temp ();
  1         10270  
  1         31  
13 1     1   867 use Getopt::Long qw(:config no_auto_abbrev no_ignore_case bundling);
  1         10836  
  1         5  
14 1     1   203 use IO::Select;
  1         3  
  1         31  
15 1     1   6 use List::Util ();
  1         3  
  1         18  
16 1     1   5 use POSIX 'strftime';
  1         2  
  1         7  
17 1     1   653 use Pod::Usage 'pod2usage';
  1         51840  
  1         104  
18 1     1   610 use String::Glob::Permute 'string_glob_permute';
  1         668  
  1         67  
19              
20 1     1   7 use constant TICK_SECOND => 0.1;
  1         2  
  1         3215  
21              
22             our $VERSION = '0.982';
23              
24             my $SCRIPT = File::Basename::basename($0);
25             my $SUDO_PROMPT = sprintf "sudo password (asking with %s): ", $SCRIPT;
26             my $SUDO_FAIL = "Sorry, try again.";
27              
28             sub new {
29 0     0 0   my ($class, %option) = @_;
30 0           bless {
31             %option,
32             pending => [],
33             running => App::RemoteCommand::Pool->new,
34             select => App::RemoteCommand::Select->new,
35             }, $class;
36             }
37              
38             sub run {
39 0     0 0   my ($self, @argv) = @_;
40 0 0         $self = $self->new unless ref $self;
41 0           $self->parse_options(@argv);
42 0           $self->register;
43              
44 0           local $| = 1;
45 0     0     my $INT; local $SIG{INT} = sub { $INT++ };
  0            
  0            
46 0     0     my $TERM; local $SIG{TERM} = sub { $TERM++ };
  0            
  0            
47 0           while (1) {
48 0 0 0       if ($INT || $TERM) {
49 0 0         my $signal = $TERM ? "TERM" : "INT";
50 0           warn "\nCatch SIG$signal, try to shutdown gracefully...\n";
51 0           DEBUG and logger "handling signal %s", $signal;
52 0           $self->cancel($signal);
53 0           $INT = $TERM = 0;
54             }
55 0           $self->one_tick;
56 0 0 0       last if @{$self->{pending}} == 0 && $self->{running}->count == 0;
  0            
57             }
58              
59 0           my @success = sort grep { $self->{exit}{$_} == 0 } keys %{$self->{exit}};
  0            
  0            
60 0           my @fail = sort grep { $self->{exit}{$_} != 0 } keys %{$self->{exit}};
  0            
  0            
61 0 0         if (!$self->{quiet}) {
62 0           print STDERR "\e[32mSUCCESS\e[m $_\n" for @success;
63 0           print STDERR "\e[31mFAIL\e[m $_\n" for @fail;
64             }
65 0 0         return @fail ? 1 : 0;
66             }
67              
68             sub parse_options {
69 0     0 0   my ($self, @argv) = @_;
70 0           local @ARGV = @argv;
71             GetOptions
72             "c|concurrency=i" => \($self->{concurrency} = 5),
73 0     0     "h|help" => sub { pod2usage(verbose => 99, sections => 'SYNOPSIS|OPTIONS|EXAMPLES') },
74             "s|script=s" => \($self->{script}),
75 0     0     "v|version" => sub { printf "%s %s\n", __PACKAGE__, $VERSION; exit },
  0            
76             "a|ask-sudo-password" => \(my $ask_sudo_password),
77             "H|host-file=s" => \(my $host_file),
78             "sudo-password=s" => \($self->{sudo_password}),
79             "append-hostname!" => \(my $append_hostname = 1),
80             "append-time!" => \(my $append_time),
81             "sudo=s" => \($self->{sudo_user}),
82             "q|quiet" => \($self->{quiet}),
83 0 0         "F=s" => \($self->{configfile}),
84             or exit(2);
85              
86 0 0         my $host_arg = $host_file ? undef : shift @ARGV;
87 0 0         if ($self->{script}) {
88 0           $self->{script_arg} = \@ARGV;
89             } else {
90 0           $self->{command} = \@ARGV;
91             }
92              
93 0 0 0       if (!@{$self->{command} || []} && !$self->{script}) {
  0 0          
94 0           warn "COMMANDS or --script option is required\n";
95 0           exit(2);
96             }
97 0 0         if ($self->{script}) {
98 0           my ($tempfh, $tempfile) = File::Temp::tempfile(UNLINK => 1, EXLOCK => 0);
99 0 0         File::Copy::copy($self->{script}, $tempfh)
100             or die "copy $self->{script} to tempfile: $!";
101 0           close $tempfh;
102 0           chmod 0755, $tempfile;
103 0           $self->{script} = $tempfile;
104             }
105              
106 0           $self->{format} = $self->make_format(
107             append_hostname => $append_hostname,
108             append_time => $append_time,
109             );
110              
111 0 0         if ($ask_sudo_password) {
112 0           my $password = prompt $SUDO_PROMPT;
113 0           $self->{sudo_password} = $password;
114             }
115 0 0         $self->{host} = $host_file ? $self->parse_host_file($host_file)
116             : $self->parse_host_arg($host_arg);
117 0           $self;
118             }
119              
120             sub cancel {
121 0     0 0   my ($self, $signal) = @_;
122 0           @{$self->{pending}} = ();
  0            
123 0           for my $ssh ($self->{running}->all) {
124 0           $ssh->cancel($signal);
125             }
126             }
127              
128             sub one_tick {
129 0     0 0   my $self = shift;
130              
131 0   0       while ($self->{running}->count < $self->{concurrency} and my $ssh = shift @{$self->{pending}}) {
  0            
132 0           $self->{running}->add($ssh);
133             }
134              
135 0           my @ready = $self->{select}->can_read(TICK_SECOND);
136             DEBUG and logger "one tick running %d (watching %d, can_read %d), pending %d",
137 0           $self->{running}->count, $self->{select}->count, scalar @ready, scalar @{$self->{pending}};
138              
139 0 0         if ($self->{select}->count == 0) {
140 0           select undef, undef, undef, TICK_SECOND;
141             } else {
142 0           $self->_process($_) for @ready;
143             }
144              
145             # -1: there is no child process
146             # 0: all child process are running
147 0           my $pid = waitpid -1, POSIX::WNOHANG;
148 0           my $exit = $?;
149              
150 0 0 0       if ($pid > 0 and my $remove = $self->{select}->remove(pid => $pid)) {
151 0           $self->_post_process($remove);
152             }
153              
154 0           for my $ssh ($self->{running}->all) {
155 0 0         my %args = (select => $self->{select}, $pid > 0 ? (pid => $pid, exit => $exit) : ());
156 0           my $is_running = $ssh->one_tick(%args);
157 0 0         if (!$is_running) {
158 0           $self->{exit}{$ssh->host} = $ssh->exit;
159 0           $self->{running}->remove($ssh);
160             }
161             }
162             }
163              
164             # We close fh explicitly; otherwise it happens that
165             # perl warns "unnable to close filehandle properly: Input/output error" under ssh proxy
166             sub _process {
167 0     0     my ($self, $ready) = @_;
168 0           my ($fh, $pid, $host, $buffer) = @{$ready}{qw(fh pid host buffer)};
  0            
169 0           my $len = sysread $fh, my $buf, 64*1024;
170 0           my ($errno, $errmsg) = (0+$!, "$!");
171 0           DEBUG and logger " READ %s, pid %d, len %s, err: %s",
172             $host, $pid, defined $len ? $len : 'undef', $errmsg || "N/A";
173 0 0         if ($len) {
    0          
174 0 0         if (my @line = $buffer->add($buf)->get) {
175 0           print $self->{format}->($host, $_) for @line;
176 0 0 0       if ($ready->{sudo} and @line == 1 and $line[0] eq $SUDO_FAIL) {
      0        
177 0           $self->{select}->remove(fh => $fh);
178 0           close $fh;
179 0           return;
180             }
181             }
182              
183 0 0         if ($buffer->raw eq $SUDO_PROMPT) {
184 0           $ready->{sudo}++;
185 0           my ($line) = $buffer->get(1);
186 0           print $self->{format}->($host, $line);
187 0 0         if (my $sudo_password = $self->{sudo_password}) {
188 0           syswrite $fh, "$sudo_password\n";
189             } else {
190 0           my $err = "have to provide sudo passowrd first, try again with --ask-sudo-password option.";
191 0           print $self->{format}->($host, $err);
192 0           $self->{select}->remove(fh => $fh);
193 0           close $fh;
194             }
195             }
196             } elsif (!defined $len) {
197 0 0         if ($errno != Errno::EIO) { # this happens when use ssh proxy, so skip
198 0           print $self->{format}->($host, "sysread $errmsg");
199             }
200             } else {
201 0           my @line = $buffer->get(1);
202 0           print $self->{format}->($host, $_) for @line;
203 0           $self->{select}->remove(fh => $fh);
204 0           close $fh;
205             }
206             }
207              
208             sub _post_process {
209 0     0     my ($self, $ready) = @_;
210 0           my ($fh, $pid, $host, $buffer) = @{$ready}{qw(fh pid host buffer)};
  0            
211 0 0         if ($fh) {
212             # XXX: We use select() here; otherwise it happens that
213             # <$fh> is blocked under ssh proxy
214 0           my $select = IO::Select->new($fh);
215 0           while ($select->can_read(TICK_SECOND)) {
216 0           my $len = sysread $fh, my $buf, 64*1024;
217 0           DEBUG and logger " POST READ %s, pid %d, len %s", $host, $pid, defined $len ? $len : 'undef';
218 0 0 0       if (defined $len && $len > 0) {
219 0           $buffer->add($buf);
220             } else {
221 0           last;
222             }
223             }
224 0           my @line = $buffer->get(1);
225 0           print $self->{format}->($host, $_) for @line;
226 0           close $fh;
227             }
228             }
229              
230             sub register {
231 0     0 0   my $self = shift;
232              
233 0           my @prefix = ("env", "SUDO_PROMPT=$SUDO_PROMPT");
234 0 0         push @prefix, "sudo", "-u", $self->{sudo_user} if $self->{sudo_user};
235              
236 0           my (@ssh_cmd, $ssh_at_exit);
237 0           my @command;
238 0 0         if (my $script = $self->{script}) {
239 0           my $name = sprintf "/tmp/%s.%d.%d", $SCRIPT, time, rand(10_000);
240             push @ssh_cmd, sub {
241 0     0     my $ssh = shift;
242 0           my $pid = $ssh->scp_put({async => 1, copy_attrs => 1}, $script, $name);
243 0           return ($pid, undef);
244 0           };
245             $ssh_at_exit = sub {
246 0     0     my $ssh = shift;
247 0           my $pid = $ssh->system({async => 1}, "rm", "-f", $name);
248 0           return ($pid, undef);
249 0           };
250 0           @command = (@prefix, $name, @{$self->{script_arg}});
  0            
251             } else {
252 0           my $escape = qr{[^a-zA-Z0-9/_:%\.-]};
253             @command = (
254             @prefix,
255             (@{$self->{command}} == 1 && $self->{command}[0] =~ $escape ? ("bash", "-c") : ()),
256 0 0 0       @{$self->{command}},
  0            
257             );
258             }
259 0           DEBUG and logger "execute %s", join(" ", map { qq('$_') } @command);
260             push @ssh_cmd, sub {
261 0     0     my $ssh = shift;
262 0           my ($fh, $pid) = $ssh->open2pty(@command);
263 0           return ($pid, $fh);
264 0           };
265              
266 0           for my $host (@{$self->{host}}) {
  0            
267 0           my $ssh = App::RemoteCommand::SSH->new(host => $host, configfile => $self->{configfile});
268 0           $ssh->add($_) for @ssh_cmd;
269 0 0         $ssh->at_exit($ssh_at_exit) if $ssh_at_exit;
270 0           push @{$self->{pending}}, $ssh;
  0            
271             }
272             }
273              
274             sub make_format {
275 0     0 0   my ($self, %opt) = @_;
276 0 0 0       if ($opt{append_time} && $opt{append_hostname}) {
    0          
    0          
277 0     0     sub { my ($host, $msg) = @_; "[@{[strftime '%F %T', localtime]}][$host] $msg\n" };
  0            
  0            
  0            
278             } elsif ($opt{append_time}) {
279 0     0     sub { my ($host, $msg) = @_; "[@{[strftime '%F %T', localtime]}] $msg\n" };
  0            
  0            
  0            
280             } elsif ($opt{append_hostname}) {
281 0     0     sub { my ($host, $msg) = @_; "[$host] $msg\n" };
  0            
  0            
282             } else {
283 0     0     sub { my ($host, $msg) = @_; "$msg\n" };
  0            
  0            
284             }
285             }
286              
287             sub parse_host_arg {
288 0     0 0   my ($self, $host_arg) = @_;
289 0           [ List::Util::uniq string_glob_permute($host_arg) ];
290             }
291              
292             sub parse_host_file {
293 0     0 0   my ($self, $host_file) = @_;
294 0 0         open my $fh, "<", $host_file or die "Cannot open '$host_file': $!\n";
295 0           my @host;
296 0           while (my $line = <$fh>) {
297 0           $line =~ s/^\s+//; $line =~ s/\s+$//;
  0            
298 0 0         push @host, string_glob_permute($line) if $line =~ /^[^#\s]/;
299             }
300 0           [ List::Util::uniq @host ];
301             }
302              
303             1;
304             __END__