File Coverage

blib/lib/App/RemoteCommand.pm
Criterion Covered Total %
statement 48 232 20.6
branch 0 74 0.0
condition 0 30 0.0
subroutine 16 38 42.1
pod 0 9 0.0
total 64 383 16.7


line stmt bran cond sub pod time code
1             package App::RemoteCommand;
2 1     1   54344 use strict;
  1         3  
  1         21  
3 1     1   4 use warnings;
  1         2  
  1         18  
4              
5 1     1   316 use App::RemoteCommand::Pool;
  1         2  
  1         22  
6 1     1   325 use App::RemoteCommand::SSH;
  1         2  
  1         28  
7 1     1   354 use App::RemoteCommand::Select;
  1         2  
  1         27  
8 1     1   5 use App::RemoteCommand::Util qw(prompt DEBUG logger);
  1         14  
  1         41  
9              
10 1     1   5 use File::Basename ();
  1         1  
  1         12  
11 1     1   405 use File::Copy ();
  1         3519  
  1         24  
12 1     1   626 use File::Temp ();
  1         7574  
  1         25  
13 1     1   581 use Getopt::Long qw(:config no_auto_abbrev no_ignore_case bundling);
  1         8095  
  1         4  
14 1     1   148 use IO::Select;
  1         1  
  1         25  
15 1     1   4 use List::Util ();
  1         2  
  1         13  
16 1     1   4 use POSIX 'strftime';
  1         2  
  1         5  
17 1     1   488 use Pod::Usage 'pod2usage';
  1         38800  
  1         71  
18 1     1   426 use String::Glob::Permute 'string_glob_permute';
  1         499  
  1         51  
19              
20 1     1   5 use constant TICK_SECOND => 0.1;
  1         2  
  1         2179  
21              
22             our $VERSION = '0.981';
23              
24             my $SCRIPT = File::Basename::basename($0);
25             my $SUDO_PROMPT = sprintf "sudo password (asking with %s): ", $SCRIPT;
26             my $SUDO_FAIL = "Sorry, try again.";
27              
28             sub new {
29 0     0 0   my ($class, %option) = @_;
30 0           bless {
31             %option,
32             pending => [],
33             running => App::RemoteCommand::Pool->new,
34             select => App::RemoteCommand::Select->new,
35             }, $class;
36             }
37              
38             sub run {
39 0     0 0   my ($self, @argv) = @_;
40 0 0         $self = $self->new unless ref $self;
41 0           $self->parse_options(@argv);
42 0           $self->register;
43              
44 0           local $| = 1;
45 0     0     my $INT; local $SIG{INT} = sub { $INT++ };
  0            
  0            
46 0     0     my $TERM; local $SIG{TERM} = sub { $TERM++ };
  0            
  0            
47 0           while (1) {
48 0 0 0       if ($INT || $TERM) {
49 0 0         my $signal = $TERM ? "TERM" : "INT";
50 0           warn "\nCatch SIG$signal, try to shutdown gracefully...\n";
51 0           DEBUG and logger "handling signal %s", $signal;
52 0           $self->cancel($signal);
53 0           $INT = $TERM = 0;
54             }
55 0           $self->one_tick;
56 0 0 0       last if @{$self->{pending}} == 0 && $self->{running}->count == 0;
  0            
57             }
58              
59 0           my @success = sort grep { $self->{exit}{$_} == 0 } keys %{$self->{exit}};
  0            
  0            
60 0           my @fail = sort grep { $self->{exit}{$_} != 0 } keys %{$self->{exit}};
  0            
  0            
61 0 0         if (!$self->{quiet}) {
62 0           print STDERR "\e[32mSUCCESS\e[m $_\n" for @success;
63 0           print STDERR "\e[31mFAIL\e[m $_\n" for @fail;
64             }
65 0 0         return @fail ? 1 : 0;
66             }
67              
68             sub parse_options {
69 0     0 0   my ($self, @argv) = @_;
70 0           local @ARGV = @argv;
71             GetOptions
72             "c|concurrency=i" => \($self->{concurrency} = 5),
73 0     0     "h|help" => sub { pod2usage(verbose => 99, sections => 'SYNOPSIS|OPTIONS|EXAMPLES') },
74             "s|script=s" => \($self->{script}),
75 0     0     "v|version" => sub { printf "%s %s\n", __PACKAGE__, $VERSION; exit },
  0            
76             "a|ask-sudo-password" => \(my $ask_sudo_password),
77             "H|host-file=s" => \(my $host_file),
78             "sudo-password=s" => \($self->{sudo_password}),
79             "append-hostname!" => \(my $append_hostname = 1),
80             "append-time!" => \(my $append_time),
81             "sudo=s" => \($self->{sudo_user}),
82 0 0         "q|quiet" => \($self->{quiet}),
83             or exit(2);
84              
85 0 0         my $host_arg = $host_file ? undef : shift @ARGV;
86 0 0         if ($self->{script}) {
87 0           $self->{script_arg} = \@ARGV;
88             } else {
89 0           $self->{command} = \@ARGV;
90             }
91              
92 0 0 0       if (!@{$self->{command} || []} && !$self->{script}) {
  0 0          
93 0           warn "COMMANDS or --script option is required\n";
94 0           exit(2);
95             }
96 0 0         if ($self->{script}) {
97 0           my ($tempfh, $tempfile) = File::Temp::tempfile(UNLINK => 1, EXLOCK => 0);
98 0 0         File::Copy::copy($self->{script}, $tempfh)
99             or die "copy $self->{script} to tempfile: $!";
100 0           close $tempfh;
101 0           chmod 0755, $tempfile;
102 0           $self->{script} = $tempfile;
103             }
104              
105 0           $self->{format} = $self->make_format(
106             append_hostname => $append_hostname,
107             append_time => $append_time,
108             );
109              
110 0 0         if ($ask_sudo_password) {
111 0           my $password = prompt $SUDO_PROMPT;
112 0           $self->{sudo_password} = $password;
113             }
114 0 0         $self->{host} = $host_file ? $self->parse_host_file($host_file)
115             : $self->parse_host_arg($host_arg);
116 0           $self;
117             }
118              
119             sub cancel {
120 0     0 0   my ($self, $signal) = @_;
121 0           @{$self->{pending}} = ();
  0            
122 0           for my $ssh ($self->{running}->all) {
123 0           $ssh->cancel($signal);
124             }
125             }
126              
127             sub one_tick {
128 0     0 0   my $self = shift;
129              
130 0   0       while ($self->{running}->count < $self->{concurrency} and my $ssh = shift @{$self->{pending}}) {
  0            
131 0           $self->{running}->add($ssh);
132             }
133              
134 0           my @ready = $self->{select}->can_read(TICK_SECOND);
135             DEBUG and logger "one tick running %d (watching %d, can_read %d), pending %d",
136 0           $self->{running}->count, $self->{select}->count, scalar @ready, scalar @{$self->{pending}};
137              
138 0 0         if ($self->{select}->count == 0) {
139 0           select undef, undef, undef, TICK_SECOND;
140             } else {
141 0           $self->_process($_) for @ready;
142             }
143              
144             # -1: there is no child process
145             # 0: all child process are running
146 0           my $pid = waitpid -1, POSIX::WNOHANG;
147 0           my $exit = $?;
148              
149 0 0 0       if ($pid > 0 and my $remove = $self->{select}->remove(pid => $pid)) {
150 0           $self->_post_process($remove);
151             }
152              
153 0           for my $ssh ($self->{running}->all) {
154 0 0         my %args = (select => $self->{select}, $pid > 0 ? (pid => $pid, exit => $exit) : ());
155 0           my $is_running = $ssh->one_tick(%args);
156 0 0         if (!$is_running) {
157 0           $self->{exit}{$ssh->host} = $ssh->exit;
158 0           $self->{running}->remove($ssh);
159             }
160             }
161             }
162              
163             # We close fh explicitly; otherwise it happens that
164             # perl warns "unnable to close filehandle properly: Input/output error" under ssh proxy
165             sub _process {
166 0     0     my ($self, $ready) = @_;
167 0           my ($fh, $pid, $host, $buffer) = @{$ready}{qw(fh pid host buffer)};
  0            
168 0           my $len = sysread $fh, my $buf, 64*1024;
169 0           my ($errno, $errmsg) = (0+$!, "$!");
170 0           DEBUG and logger " READ %s, pid %d, len %s, err: %s",
171             $host, $pid, defined $len ? $len : 'undef', $errmsg || "N/A";
172 0 0         if ($len) {
    0          
173 0 0         if (my @line = $buffer->add($buf)->get) {
174 0           print $self->{format}->($host, $_) for @line;
175 0 0 0       if ($ready->{sudo} and @line == 1 and $line[0] eq $SUDO_FAIL) {
      0        
176 0           $self->{select}->remove(fh => $fh);
177 0           close $fh;
178 0           return;
179             }
180             }
181              
182 0 0         if ($buffer->raw eq $SUDO_PROMPT) {
183 0           $ready->{sudo}++;
184 0           my ($line) = $buffer->get(1);
185 0           print $self->{format}->($host, $line);
186 0 0         if (my $sudo_password = $self->{sudo_password}) {
187 0           syswrite $fh, "$sudo_password\n";
188             } else {
189 0           my $err = "have to provide sudo passowrd first, try again with --ask-sudo-password option.";
190 0           print $self->{format}->($host, $err);
191 0           $self->{select}->remove(fh => $fh);
192 0           close $fh;
193             }
194             }
195             } elsif (!defined $len) {
196 0 0         if ($errno != Errno::EIO) { # this happens when use ssh proxy, so skip
197 0           print $self->{format}->($host, "sysread $errmsg");
198             }
199             } else {
200 0           my @line = $buffer->get(1);
201 0           print $self->{format}->($host, $_) for @line;
202 0           $self->{select}->remove(fh => $fh);
203 0           close $fh;
204             }
205             }
206              
207             sub _post_process {
208 0     0     my ($self, $ready) = @_;
209 0           my ($fh, $pid, $host, $buffer) = @{$ready}{qw(fh pid host buffer)};
  0            
210 0 0         if ($fh) {
211             # XXX: We use select() here; otherwise it happens that
212             # <$fh> is blocked under ssh proxy
213 0           my $select = IO::Select->new($fh);
214 0           while ($select->can_read(TICK_SECOND)) {
215 0           my $len = sysread $fh, my $buf, 64*1024;
216 0           DEBUG and logger " POST READ %s, pid %d, len %s", $host, $pid, defined $len ? $len : 'undef';
217 0 0 0       if (defined $len && $len > 0) {
218 0           $buffer->add($buf);
219             } else {
220 0           last;
221             }
222             }
223 0           my @line = $buffer->get(1);
224 0           print $self->{format}->($host, $_) for @line;
225 0           close $fh;
226             }
227             }
228              
229             sub register {
230 0     0 0   my $self = shift;
231              
232 0           my @prefix = ("env", "SUDO_PROMPT=$SUDO_PROMPT");
233 0 0         push @prefix, "sudo", "-u", $self->{sudo_user} if $self->{sudo_user};
234              
235 0           my (@ssh_cmd, $ssh_at_exit);
236 0           my @command;
237 0 0         if (my $script = $self->{script}) {
238 0           my $name = sprintf "/tmp/%s.%d.%d", $SCRIPT, time, rand(10_000);
239             push @ssh_cmd, sub {
240 0     0     my $ssh = shift;
241 0           my $pid = $ssh->scp_put({async => 1, copy_attrs => 1}, $script, $name);
242 0           return ($pid, undef);
243 0           };
244             $ssh_at_exit = sub {
245 0     0     my $ssh = shift;
246 0           my $pid = $ssh->system({async => 1}, "rm", "-f", $name);
247 0           return ($pid, undef);
248 0           };
249 0           @command = (@prefix, $name, @{$self->{script_arg}});
  0            
250             } else {
251 0           my $escape = qr{[^a-zA-Z0-9/_:%\.-]};
252             @command = (
253             @prefix,
254             (@{$self->{command}} == 1 && $self->{command}[0] =~ $escape ? ("bash", "-c") : ()),
255 0 0 0       @{$self->{command}},
  0            
256             );
257             }
258 0           DEBUG and logger "execute %s", join(" ", map { qq('$_') } @command);
259             push @ssh_cmd, sub {
260 0     0     my $ssh = shift;
261 0           my ($fh, $pid) = $ssh->open2pty(@command);
262 0           return ($pid, $fh);
263 0           };
264              
265 0           for my $host (@{$self->{host}}) {
  0            
266 0           my $ssh = App::RemoteCommand::SSH->new($host);
267 0           $ssh->add($_) for @ssh_cmd;
268 0 0         $ssh->at_exit($ssh_at_exit) if $ssh_at_exit;
269 0           push @{$self->{pending}}, $ssh;
  0            
270             }
271             }
272              
273             sub make_format {
274 0     0 0   my ($self, %opt) = @_;
275 0 0 0       if ($opt{append_time} && $opt{append_hostname}) {
    0          
    0          
276 0     0     sub { my ($host, $msg) = @_; "[@{[strftime '%F %T', localtime]}][$host] $msg\n" };
  0            
  0            
  0            
277             } elsif ($opt{append_time}) {
278 0     0     sub { my ($host, $msg) = @_; "[@{[strftime '%F %T', localtime]}] $msg\n" };
  0            
  0            
  0            
279             } elsif ($opt{append_hostname}) {
280 0     0     sub { my ($host, $msg) = @_; "[$host] $msg\n" };
  0            
  0            
281             } else {
282 0     0     sub { my ($host, $msg) = @_; "$msg\n" };
  0            
  0            
283             }
284             }
285              
286             sub parse_host_arg {
287 0     0 0   my ($self, $host_arg) = @_;
288 0           [ List::Util::uniq string_glob_permute($host_arg) ];
289             }
290              
291             sub parse_host_file {
292 0     0 0   my ($self, $host_file) = @_;
293 0 0         open my $fh, "<", $host_file or die "Cannot open '$host_file': $!\n";
294 0           my @host;
295 0           while (my $line = <$fh>) {
296 0           $line =~ s/^\s+//; $line =~ s/\s+$//;
  0            
297 0 0         push @host, string_glob_permute($line) if $line =~ /^[^#\s]/;
298             }
299 0           [ List::Util::uniq @host ];
300             }
301              
302             1;
303             __END__