File Coverage

blib/lib/AnyEvent/SSH2.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             # $Id: SSH2.pm,v 1.47 2009/01/26 01:50:38 turnstep Exp $
2             package AnyEvent::SSH2;
3 1     1   15158 use strict;
  1         2  
  1         44  
4 1     1   445 use AE;
  1         5392  
  1         27  
5 1     1   724 use AnyEvent::Handle;
  1         16166  
  1         41  
6 1     1   250 use Net::SSH::Perl::Kex;
  0            
  0            
7             use Net::SSH::Perl::ChannelMgr;
8             use Net::SSH::Perl::Packet;
9             use Net::SSH::Perl::Buffer;
10             use Net::SSH::Perl::Constants qw( :protocol :msg2 :compat :hosts :channels :proposal :kex
11             CHAN_INPUT_CLOSED CHAN_INPUT_WAIT_DRAIN );
12             use Net::SSH::Perl::Cipher;
13             use Net::SSH::Perl::AuthMgr;
14             use Net::SSH::Perl::Comp;
15             use Net::SSH::Perl::Util qw(:hosts);
16             use Scalar::Util qw(blessed weaken);
17             use Carp qw( croak );
18              
19             use base qw( Net::SSH::Perl );
20             our $VERSION = '0.04';
21              
22             use Errno qw( EAGAIN EWOULDBLOCK );
23             use vars qw( $VERSION $CONFIG $HOSTNAME @PROPOSAL );
24             use vars qw( @PROPOSAL );
25             @PROPOSAL = (
26             KEX_DEFAULT_KEX,
27             KEX_DEFAULT_PK_ALG,
28             KEX_DEFAULT_ENCRYPT,
29             KEX_DEFAULT_ENCRYPT,
30             KEX_DEFAULT_MAC,
31             KEX_DEFAULT_MAC,
32             KEX_DEFAULT_COMP,
33             KEX_DEFAULT_COMP,
34             KEX_DEFAULT_LANG,
35             KEX_DEFAULT_LANG,
36             );
37              
38             $CONFIG = {};
39              
40             BEGIN {
41             use Net::SSH::Perl::Packet;
42             no warnings qw(redefine);
43             *Net::SSH::Perl::Packet::send_ssh2 = sub {
44             my $pack = shift;
45             my $buffer = shift || $pack->{data};
46             my $ssh = $pack->{ssh};
47              
48             my $kex = $ssh->kex;
49             my($ciph, $mac, $comp);
50             if ($kex) {
51             $ciph = $kex->send_cipher;
52             $mac = $kex->send_mac;
53             $comp = $kex->send_comp;
54             }
55             my $block_size = 8;
56              
57             if ($comp && $comp->enabled) {
58             my $compressed = $comp->compress($buffer->bytes);
59             $buffer->empty;
60             $buffer->append($compressed);
61             }
62              
63             my $len = $buffer->length + 4 + 1;
64             my $padlen = $block_size - ($len % $block_size);
65             $padlen += $block_size if $padlen < 4;
66             my $junk = $ciph ? (join '', map chr rand 255, 1..$padlen) : ("\0" x $padlen);
67             $buffer->append($junk);
68              
69             my $packet_len = $buffer->length + 1;
70             $buffer->bytes(0, 0, pack("N", $packet_len) . pack("c", $padlen));
71              
72             my($macbuf);
73             if ($mac && $mac->enabled) {
74             $macbuf = $mac->hmac(pack("N", $ssh->{session}{seqnr_out}) . $buffer->bytes);
75             }
76             my $output = Net::SSH::Perl::Buffer->new( MP => 'SSH2' );
77             $output->append( $ciph && $ciph->enabled ? $ciph->encrypt($buffer->bytes) : $buffer->bytes );
78             $output->append($macbuf) if $mac && $mac->enabled;
79              
80             $ssh->{session}{seqnr_out}++;
81              
82             my $handle = $ssh->sock;
83             my $stat = $handle->push_write($output->bytes);
84             };
85             *Net::SSH::Perl::Packet::read_expect = sub {
86             my $class = shift;
87             my($ssh, $type, $cb) = @_;
88             my $pack = $class->read($ssh, sub{
89             my ($ssh, $pack) = @_;
90             if ($pack->type != $type) {
91             $ssh->fatal_disconnect(sprintf
92             "Protocol error: expected packet type %d, got %d",
93             $type, $pack->type);
94             }
95             $cb->($ssh, $pack);
96             });
97             };
98              
99             *Net::SSH::Perl::Packet::read = sub {
100             my $class = shift;
101             my $ssh = shift;
102             my $cb = shift;
103             my $sock = $ssh->sock;
104             if (my $packet = $class->read_poll($ssh)) {
105             $cb->($ssh, $packet);
106             }
107             else {
108             $sock->push_read(chunk => 4 => sub {
109             my ($hdl, $buf) = @_;
110             if (length($buf) == 0) {
111             croak "Connection closed by remote host." if !$buf;
112             }
113             if (!defined $buf) {
114             next if $! == EAGAIN || $! == EWOULDBLOCK;
115             croak "Read from socket failed: $!";
116             }
117             # Untaint data read from sshd. This is binary data,
118             # so there's nothing to taint-check against/for.
119             ($buf) = $buf =~ /(.*)/s;
120             $ssh->incoming_data->append($buf);
121             $class->read($ssh, $cb);
122             })
123             }
124             };
125             use Net::SSH::Perl::Kex;
126             *Net::SSH::Perl::Kex::exchange_kexinit = sub {
127             my $kex = shift;
128             my $ssh = $kex->{ssh};
129             my $received_packet = shift;
130             my $cb = shift;
131             my $packet;
132            
133             $packet = $ssh->packet_start(SSH2_MSG_KEXINIT);
134             $packet->put_chars($kex->client_kexinit->bytes);
135             $packet->send;
136            
137             if ( defined $received_packet ) {
138             $ssh->debug("Received key-exchange init (KEXINIT), sent response.");
139             $packet = $received_packet;
140             }
141             else {
142             $ssh->debug("Sent key-exchange init (KEXINIT), wait response.");
143             Net::SSH::Perl::Packet->read_expect($ssh, SSH2_MSG_KEXINIT, sub{
144             my ($ssh, $packet) = @_;
145             $kex->{server_kexinit} = $packet->data;
146            
147             $packet->get_char for 1..16;
148             my @s_props = map $packet->get_str, 1..10;
149             $packet->get_int8;
150             $packet->get_int32;
151             $cb->($ssh, \@s_props);
152             });
153             }
154             };
155             *Net::SSH::Perl::Kex::exchange = sub {
156             my $kex = shift;
157             my $ssh = $kex->{ssh};
158             my $packet = shift;
159             my $cb = shift;
160            
161             my @proposal = @PROPOSAL;
162             if (!$ssh->config->get('ciphers')) {
163             if (my $c = $ssh->config->get('cipher')) {
164             $ssh->config->set('ciphers', $c);
165             }
166             }
167             if (my $cs = $ssh->config->get('ciphers')) {
168             # SSH2 cipher names are different; for compatibility, we'll map
169             # valid SSH1 ciphers to the SSH2 equivalent names
170             if($ssh->protocol eq PROTOCOL_SSH2) {
171             my %ssh2_cipher = reverse %Net::SSH::Perl::Cipher::CIPHERS_SSH2;
172             $cs = join ',', map $ssh2_cipher{$_} || $_, split(/,/, $cs);
173             }
174             $proposal[ PROPOSAL_CIPH_ALGS_CTOS ] =
175             $proposal[ PROPOSAL_CIPH_ALGS_STOC ] = $cs;
176             }
177             if ($ssh->config->get('compression')) {
178             $proposal[ PROPOSAL_COMP_ALGS_CTOS ] =
179             $proposal[ PROPOSAL_COMP_ALGS_STOC ] = "zlib";
180             }
181             else {
182             $proposal[ PROPOSAL_COMP_ALGS_CTOS ] =
183             $proposal[ PROPOSAL_COMP_ALGS_STOC ] = "none";
184             }
185             if ($ssh->config->get('host_key_algorithms')) {
186             $proposal[ PROPOSAL_SERVER_HOST_KEY_ALGS ] =
187             $ssh->config->get('host_key_algorithms');
188             }
189            
190             $kex->{client_kexinit} = $kex->kexinit(\@proposal);
191             $kex->exchange_kexinit($packet, sub{
192             my ($ssh, $sprop) = @_;
193             $kex->choose_conf(\@proposal, $sprop);
194             $ssh->debug("Algorithms, c->s: " .
195             "$kex->{ciph_name}[0] $kex->{mac_name}[0] $kex->{comp_name}[0]");
196             $ssh->debug("Algorithms, s->c: " .
197             "$kex->{ciph_name}[1] $kex->{mac_name}[1] $kex->{comp_name}[1]");
198            
199             bless $kex, $kex->{class_name};
200             $kex->exchange(sub{
201             my $ssh = shift;
202             $ssh->debug("Waiting for NEWKEYS message.");
203             Net::SSH::Perl::Packet->read_expect($ssh, SSH2_MSG_NEWKEYS, sub{
204             my ($ssh, $packet) = @_;
205             $ssh->debug("Send NEWKEYS.");
206             $packet = $ssh->packet_start(SSH2_MSG_NEWKEYS);
207             $packet->send;
208            
209             $ssh->debug("Enabling encryption/MAC/compression.");
210             $ssh->{kex} = $kex;
211             for my $att (qw( mac ciph comp )) {
212             $kex->{$att}[0]->enable if $kex->{$att}[0];
213             $kex->{$att}[1]->enable if $kex->{$att}[1];
214             }
215             $cb->($ssh);
216             });
217             });
218            
219             });
220             };
221             use Net::SSH::Perl::Kex::DH1;
222             no strict "subs";
223             *Net::SSH::Perl::Kex::DH1::exchange = sub {
224             package Net::SSH::Perl::Kex::DH1;
225             my $kex = shift;
226             my $ssh = $kex->{ssh};
227             my $packet;
228             my $dh = _dh_new_group1;
229             my $cb = shift;
230              
231             $ssh->debug("Entering Diffie-Hellman Group 1 key exchange.");
232             $packet = $ssh->packet_start(SSH2_MSG_KEXDH_INIT);
233             $packet->put_mp_int($dh->pub_key);
234             $packet->send;
235              
236             $ssh->debug("Sent DH public key, waiting for reply.");
237             Net::SSH::Perl::Packet->read_expect($ssh,
238             SSH2_MSG_KEXDH_REPLY, sub {
239             my ($ssh, $packet) = @_;
240             my $host_key_blob = $packet->get_str;
241             my $s_host_key = Net::SSH::Perl::Key->new_from_blob($host_key_blob,
242             \$ssh->{datafellows});
243             $ssh->debug("Received host key, type '" . $s_host_key->ssh_name . "'.");
244              
245             $ssh->check_host_key($s_host_key);
246              
247             my $dh_server_pub = $packet->get_mp_int;
248             my $signature = $packet->get_str;
249              
250             $ssh->fatal_disconnect("Bad server public DH value")
251             unless _pub_is_valid($dh, $dh_server_pub);
252              
253             $ssh->debug("Computing shared secret key.");
254             my $shared_secret = $dh->compute_key($dh_server_pub);
255              
256             my $hash = $kex->kex_hash(
257             $ssh->client_version_string,
258             $ssh->server_version_string,
259             $kex->client_kexinit,
260             $kex->server_kexinit,
261             $host_key_blob,
262             $dh->pub_key,
263             $dh_server_pub,
264             $shared_secret);
265              
266             $ssh->debug("Verifying server signature.");
267             croak "Key verification failed for server host key"
268             unless $s_host_key->verify($signature, $hash);
269              
270             $ssh->session_id($hash);
271              
272             $kex->derive_keys($hash, $shared_secret, $ssh->session_id);
273             $cb->($ssh);
274             });
275             };
276             use Net::SSH::Perl::AuthMgr;
277             no warnings qw(redefine);
278             #no strict "refs";
279             *Net::SSH::Perl::AuthMgr::new = sub {
280             my $class = shift;
281             my $ssh = shift;
282             my $amgr = bless { ssh => $ssh }, $class;
283             weaken $amgr->{ssh};
284             $amgr;
285             };
286             *Net::SSH::Perl::AuthMgr::run = sub {
287             my $amgr = shift;
288             my $cb = pop @_;
289             my($end, @args) = @_;
290             Net::SSH::Perl::Packet->read($amgr->{ssh}, sub{
291             my ($ssh, $packet) = @_;
292             my $code = $amgr->handler_for($packet->type);
293             unless (defined $code) {
294             $code = $amgr->error_handler ||
295             sub { croak "Protocol error: received type ", $packet->type };
296             }
297             $code->($amgr, $packet, @args);
298             if ($$end) {
299             $cb->($amgr);
300             return;
301             }
302             $amgr->run($end, $cb);
303             });
304             };
305             *Net::SSH::Perl::AuthMgr::authenticate = sub {
306             package Net::SSH::Perl::AuthMgr;
307             my $amgr = shift;
308             my $cb = shift;
309             $amgr->init(sub{
310             my ($ssh, $amgr) = @_;
311             my($packet);
312            
313             my $valid = 0;
314             $amgr->{_done} = 0;
315             $amgr->register_handler(SSH2_MSG_USERAUTH_SUCCESS, sub {
316             $valid++;
317             $amgr->{_done}++
318             });
319             $amgr->register_handler(SSH2_MSG_USERAUTH_BANNER, sub {
320             my $amgr = shift;
321             my($packet) = @_;
322             if ($amgr->{ssh}->config->get('interactive')) {
323             print $packet->get_str, "\n";
324             }
325             });
326             $amgr->register_handler(SSH2_MSG_USERAUTH_FAILURE, \&auth_failure);
327             $amgr->register_error(
328             sub { croak "userauth error: bad message during auth" } );
329             $amgr->run( \$amgr->{_done}, sub{
330             my ($amgr) = shift;
331             $amgr->{agent}->close_socket if $amgr->{agent};
332            
333             $cb->($ssh, $amgr, $valid);
334             } );
335            
336             });
337             };
338              
339             *Net::SSH::Perl::AuthMgr::init = sub {
340             package Net::SSH::Perl::AuthMgr;
341             my $amgr = shift;
342             my $cb = shift;
343             my $ssh = $amgr->{ssh};
344             my($packet);
345            
346             $ssh->debug("Sending request for user-authentication service.");
347             $packet = $ssh->packet_start(SSH2_MSG_SERVICE_REQUEST);
348             $packet->put_str("ssh-userauth");
349             $packet->send;
350            
351             Net::SSH::Perl::Packet->read($ssh, sub {
352             my ($ssh, $packet) = @_;
353             croak "Server denied SSH2_MSG_SERVICE_ACCEPT: ", $packet->type
354             unless $packet->type == SSH2_MSG_SERVICE_ACCEPT;
355             $ssh->debug("Service accepted: " . $packet->get_str . ".");
356            
357             $amgr->{agent} = Net::SSH::Perl::Agent->new(2);
358             $amgr->{service} = "ssh-connection";
359            
360             $amgr->send_auth_none;
361             $cb->($ssh, $amgr);
362             });
363            
364             };
365             };
366             use Carp qw( croak );
367              
368             sub VERSION { $VERSION }
369              
370             sub new {
371             my $class = shift;
372             my $host = shift;
373             croak "usage: ", __PACKAGE__, "->new(\$host)"
374             unless defined $host;
375             my $ssh = bless { host => $host }, $class;
376             my %p = @_;
377             $ssh->{_test} = delete $p{_test};
378             $ssh->_init(%p);
379             $ssh;
380             }
381              
382             sub _init {
383             my $ssh = shift;
384              
385             my %arg = @_;
386             my $user_config = delete $arg{user_config} || "$ENV{HOME}/.ssh/config";
387             my $sys_config = delete $arg{sys_config} || "/etc/ssh_config";
388              
389             my $directives = delete $arg{options} || [];
390              
391             if (my $proto = delete $arg{protocol}) {
392             push @$directives, "Protocol $proto";
393             }
394              
395             my $cfg = Net::SSH::Perl::Config->new($ssh->{host}, %arg);
396             $ssh->{config} = $cfg;
397              
398             # Merge config-format directives given through "options"
399             # (just like -o option to ssh command line). Do this before
400             # reading config files so we override files.
401             for my $d (@$directives) {
402             $cfg->merge_directive($d);
403             }
404              
405             for my $f (($user_config, $sys_config)) {
406             $ssh->debug("Reading configuration data $f");
407             $cfg->read_config($f);
408             }
409              
410             if (my $real_host = $ssh->{config}->get('hostname')) {
411             $ssh->{host} = $real_host;
412             }
413              
414             my $user = _current_user();
415             if ($user && $user eq "root" &&
416             !defined $ssh->{config}->get('privileged')) {
417             $ssh->{config}->set('privileged', 1);
418             }
419              
420             unless ($ssh->{config}->get('protocol')) {
421             $ssh->{config}->set('protocol',
422             PROTOCOL_SSH1 | PROTOCOL_SSH2 | PROTOCOL_SSH1_PREFERRED);
423             }
424              
425             unless (defined $ssh->{config}->get('password_prompt_login')) {
426             $ssh->{config}->set('password_prompt_login', 1);
427             }
428             unless (defined $ssh->{config}->get('password_prompt_host')) {
429             $ssh->{config}->set('password_prompt_host', 1);
430             }
431              
432             unless (defined $ssh->{config}->get('number_of_password_prompts')) {
433             $ssh->{config}->set('number_of_password_prompts', 3);
434             }
435              
436             # login
437             if (!defined $ssh->{config}->get('user')) {
438             $ssh->{config}->set('user',
439             defined $arg{user} ? $arg{user} : _current_user());
440             }
441             if (!defined $arg{pass} && exists $CONFIG->{ssh_password}) {
442             $arg{pass} = $CONFIG->{ssh_password};
443             }
444             $ssh->{config}->set('pass', $arg{pass});
445              
446             #my $suppress_shell = $_[2];
447             }
448              
449             sub _current_user {
450             my $user;
451             eval { $user = scalar getpwuid $> };
452             return $user;
453             }
454              
455             sub set_protocol {
456             my $ssh = shift;
457             my $proto = shift;
458             $ssh->{use_protocol} = $proto;
459             $ssh->debug($ssh->version_string);
460             $ssh->_proto_init;
461             }
462              
463              
464             sub _dup {
465             my($fh, $mode) = @_;
466             my $dup = Symbol::gensym;
467             my $str = "${mode}&$fh";
468             open ($dup, $str) or die "Could not dupe: $!\n"; ## no critic
469             $dup;
470             }
471              
472             sub version_string {
473             my $class = shift;
474             sprintf "Net::SSH::Perl Version %s, protocol version %s.%s.",
475             $class->VERSION, PROTOCOL_MAJOR_2, PROTOCOL_MINOR_2;
476             }
477              
478             sub _exchange_identification {
479             my $ssh = shift;
480             my $remote_id = $ssh->_read_version(@_);
481             ($ssh->{server_version_string} = $remote_id) =~ s/\cM?$//;
482             my($remote_major, $remote_minor, $remote_version) = $remote_id =~
483             /^SSH-(\d+)\.(\d+)-([^\n]+)$/;
484             $ssh->debug("Remote protocol version $remote_major.$remote_minor, remote software version $remote_version");
485              
486             my $proto = $ssh->config->get('protocol');
487             my($mismatch, $set_proto);
488             if ($remote_major == 1) {
489             if ($remote_minor == 99 && $proto & PROTOCOL_SSH2 &&
490             !($proto & PROTOCOL_SSH1_PREFERRED)) {
491             $set_proto = PROTOCOL_SSH2;
492             }
493             elsif (!($proto & PROTOCOL_SSH1)) {
494             $mismatch = 1;
495             }
496             else {
497             $set_proto = PROTOCOL_SSH1;
498             }
499             }
500             elsif ($remote_major == 2) {
501             if ($proto & PROTOCOL_SSH2) {
502             $set_proto = PROTOCOL_SSH2;
503             }
504             }
505             if ($mismatch) {
506             croak sprintf "Protocol major versions differ: %d vs. %d",
507             ($proto & PROTOCOL_SSH2) ? PROTOCOL_MAJOR_2 :
508             PROTOCOL_MAJOR_1, $remote_major;
509             }
510             my $compat20 = $set_proto == PROTOCOL_SSH2;
511             my $buf = sprintf "SSH-%d.%d-%s\n",
512             $compat20 ? PROTOCOL_MAJOR_2 : PROTOCOL_MAJOR_1,
513             $compat20 ? PROTOCOL_MINOR_2 : PROTOCOL_MINOR_1,
514             $VERSION;
515             $ssh->{client_version_string} = substr $buf, 0, -1;
516             my $handle = $ssh->{session}{sock};
517             $handle->push_write($buf);
518             $ssh->set_protocol($set_proto);
519             $ssh->_compat_init($remote_version);
520             }
521              
522             sub _proto_init {
523             my $ssh = shift;
524             my $home = $ENV{HOME} || (getpwuid($>))[7];
525             unless ($ssh->{config}->get('user_known_hosts')) {
526             defined $home or croak "Cannot determine home directory, please set the environment variable HOME";
527             $ssh->{config}->set('user_known_hosts', "$home/.ssh/known_hosts2");
528             }
529             unless ($ssh->{config}->get('global_known_hosts')) {
530             $ssh->{config}->set('global_known_hosts', "/etc/ssh_known_hosts2");
531             }
532             unless (my $if = $ssh->{config}->get('identity_files')) {
533             defined $home or croak "Cannot determine home directory, please set the environment variable HOME";
534             $ssh->{config}->set('identity_files', [ "$home/.ssh/id_dsa" ]);
535             }
536              
537             for my $a (qw( password dsa kbd_interactive )) {
538             $ssh->{config}->set("auth_$a", 1)
539             unless defined $ssh->{config}->get("auth_$a");
540             }
541             }
542              
543             sub kex { $_[0]->{kex} }
544              
545             sub register_handler {
546             my($ssh, $type, $sub, @extra) = @_;
547             $ssh->{client_handlers}{$type} = { code => $sub, extra => \@extra };
548             }
549              
550             sub connect {
551             my $ssh = shift;
552             my($type, @args) = @_;
553             $ssh->{session}{sock} = new AnyEvent::Handle
554             connect => [
555             $ssh->{host} => $ssh->{config}->get('port') || 'ssh'
556             ],
557             on_error => sub {
558             my ($hdl, $fatal, $msg) = @_;
559             $ssh->debug("Can't connect to $ssh->{host}, port $ssh->{config}->get('port'): $msg");
560             $hdl->destroy;
561             },
562             on_connect_error => sub {
563             $ssh->debug("Can't connect to $ssh->{host}, port $ssh->{config}->get('port'): $!");
564             },
565             on_eof => sub {
566             shift->destroy; # explicitly destroy handle
567             };
568             $ssh->{session}{sock}->push_read( line => sub {
569             my ($handle, $line) = @_;
570             $ssh->_exchange_identification($line);
571             $ssh->debug("Connection established.");
572             $ssh->_login();
573              
574              
575             });
576             }
577              
578             sub _login {
579             my $ssh = shift;
580              
581             my $kex = Net::SSH::Perl::Kex->new($ssh);
582             $kex->exchange(undef, sub{
583             my $ssh = shift;
584             my $amgr = Net::SSH::Perl::AuthMgr->new($ssh);
585             $amgr->authenticate(sub{
586             my ($ssh, $amgr, $valid) = @_;
587             $ssh->debug("Login completed, opening dummy shell channel.");
588             my $cmgr = $ssh->channel_mgr;
589             my $channel = $cmgr->new_channel(
590             ctype => 'session', local_window => 0,
591             local_maxpacket => 0, remote_name => 'client-session');
592             $channel->open;
593              
594             Net::SSH::Perl::Packet->read_expect($ssh,
595             SSH2_MSG_CHANNEL_OPEN_CONFIRMATION, sub{
596             my ($ssh, $packet) = @_;
597             $cmgr->input_open_confirmation($packet);
598              
599             #my $suppress_shell = $_[2];
600             #unless ($suppress_shell) {
601             # $ssh->debug("Got channel open confirmation, requesting shell.");
602             # $channel->request("shell", 0);
603             #}
604              
605             $ssh->client_loop;
606             });
607             });
608             })
609             }
610              
611             sub emit {
612             my ($self, $name) = (shift, shift);
613              
614             if (my $s = $self->{events}{$name}) {
615             $self->debug("-- Emit $name in @{[blessed $self]} (@{[scalar @$s]})\n");
616             my $arg = shift @$s;
617             $self->$name(@$arg);
618             }
619             else {
620             $self->debug("-- Emit $name in @{[blessed $self]} (0)\n");
621             die "@{[blessed $self]}: $_[0]" if $name eq 'error';
622             }
623              
624             return $self;
625             }
626              
627             sub _session_channel {
628             my $ssh = shift;
629             my $cmgr = $ssh->channel_mgr;
630              
631             my $channel = $cmgr->new_channel(
632             ctype => 'session', local_window => 32*1024,
633             local_maxpacket => 16*1024, remote_name => 'client-session',
634             rfd => _dup('STDIN', '<'), wfd => _dup('STDOUT', '>'),
635             efd => _dup('STDERR', '>'));
636              
637             $channel;
638             }
639              
640             sub _make_input_channel_req {
641             my($r_exit) = @_;
642             return sub {
643             my($channel, $packet) = @_;
644             my $rtype = $packet->get_str;
645             my $reply = $packet->get_int8;
646             $channel->{ssh}->debug("input_channel_request: rtype $rtype reply $reply");
647             if ($rtype eq "exit-status") {
648             $$r_exit = $packet->get_int32;
649             }
650             if ($reply) {
651             my $r_packet = $channel->{ssh}->packet_start(SSH2_MSG_CHANNEL_SUCCESS);
652             $r_packet->put_int($channel->{remote_id});
653             $r_packet->send;
654             }
655             };
656             }
657              
658             sub on { push @{$_[0]->{events}{$_[1]}}, [$_[-2], $_[-1]] }
659              
660             sub send {
661             my ($ssh, $cmd, $cb) = @_;
662             $ssh->on(cmd => $cmd => $cb);
663             $ssh;
664             }
665              
666              
667             #sub shell {
668             # my $ssh = shift;
669             # my $cb = shift;
670             # $ssh->on(_shell => '');
671             # $ssh->on(on_fininsh => $cb);
672             # $ssh;
673             #}
674             #
675             #sub _shell {
676             # my $ssh = shift;
677             # my $cmgr = $ssh->channel_mgr;
678             # my $channel = $ssh->_session_channel;
679             # $channel->open;
680             #
681             # $channel->register_handler(SSH2_MSG_CHANNEL_OPEN_CONFIRMATION, sub {
682             # my($channel, $packet) = @_;
683             # my $r_packet = $channel->request_start('pty-req', 0);
684             # my($term) = $ENV{TERM} =~ /(\S+)/;
685             # $r_packet->put_str($term);
686             # my $foundsize = 0;
687             # if (eval "require Term::ReadKey") {
688             # my @sz = Term::ReadKey::GetTerminalSize($ssh->sock);
689             # if (defined $sz[0]) {
690             # $foundsize = 1;
691             # $r_packet->put_int32($sz[1]); # height
692             # $r_packet->put_int32($sz[0]); # width
693             # $r_packet->put_int32($sz[2]); # xpix
694             # $r_packet->put_int32($sz[3]); # ypix
695             # }
696             # }
697             # if (!$foundsize) {
698             # $r_packet->put_int32(0) for 1..4;
699             # }
700             # $r_packet->put_str("");
701             # $r_packet->send;
702             # $channel->{ssh}->debug("Requesting shell.");
703             # $channel->request("shell", 0);
704             # });
705             #
706             # my($exit);
707             # $channel->register_handler(SSH2_MSG_CHANNEL_REQUEST,
708             # _make_input_channel_req(\$exit));
709             #
710             # $channel->register_handler("_output_buffer", sub {
711             # syswrite STDOUT, $_[1]->bytes;
712             # });
713             # $channel->register_handler("_extended_buffer", sub {
714             # syswrite STDERR, $_[1]->bytes;
715             # });
716             #
717             # $ssh->debug("Entering interactive session.");
718             #}
719              
720             sub cmd {
721             my $ssh = shift;
722             my($cmd, $cb) = @_;
723              
724             my $cmgr = $ssh->channel_mgr;
725             my $channel = $ssh->_session_channel;
726             $channel->open;
727              
728              
729             $channel->register_handler(SSH2_MSG_CHANNEL_OPEN_CONFIRMATION, sub {
730             my($channel, $packet) = @_;
731              
732             ## Experimental pty support:
733             if ($ssh->{config}->get('use_pty')) {
734             $ssh->debug("Requesting pty.");
735              
736             my $packet = $channel->request_start('pty-req', 0);
737              
738             my($term) = $ENV{TERM} =~ /(\w+)/;
739             $packet->put_str($term);
740             my $foundsize = 0;
741             if (eval "require Term::ReadKey") {
742             my @sz = Term::ReadKey::GetTerminalSize($ssh->sock);
743             if (defined $sz[0]) {
744             $foundsize = 1;
745             $packet->put_int32($sz[1]); # height
746             $packet->put_int32($sz[0]); # width
747             $packet->put_int32($sz[2]); # xpix
748             $packet->put_int32($sz[3]); # ypix
749             }
750             }
751             if (!$foundsize) {
752             $packet->put_int32(0) for 1..4;
753             }
754              
755             # Array used to build Pseudo-tty terminal modes; fat commas separate opcodes from values for clarity.
756             my $terminal_mode_string;
757             if(!defined($ssh->{config}->get('terminal_mode_string'))) {
758             my @terminal_modes = (
759             5 => 0,0,0,4, # VEOF => 0x04 (^d)
760             0 # string must end with a 0 opcode
761             );
762             for my $char (@terminal_modes) {
763             $terminal_mode_string .= chr($char);
764             }
765             }
766             else {
767             $terminal_mode_string = $ssh->{config}->get('terminal_mode_string');
768             }
769             $packet->put_str($terminal_mode_string);
770             $packet->send;
771             }
772              
773             my $r_packet = $channel->request_start("exec", 0);
774             $r_packet->put_str($cmd);
775             $r_packet->send;
776              
777             });
778              
779             my($exit);
780             $channel->register_handler(SSH2_MSG_CHANNEL_REQUEST,
781             _make_input_channel_req(\$exit));
782              
783             my $h = $ssh->{client_handlers};
784             my($stdout, $stderr);
785             if (my $r = $h->{stdout}) {
786             $channel->register_handler("_output_buffer",
787             $r->{code}, @{ $r->{extra} });
788             }
789             else {
790             $channel->register_handler("_output_buffer", sub {
791             $stdout .= $_[1]->bytes;
792             });
793             }
794             if (my $r = $h->{stderr}) {
795             $channel->register_handler("_extended_buffer",
796             $r->{code}, @{ $r->{extra} });
797             }
798             else {
799             $channel->register_handler("_extended_buffer", sub {
800             $stderr .= $_[1]->bytes;
801             });
802             }
803              
804             $ssh->debug("Entering interactive session.");
805             $channel->{cb} = sub {
806             $cb->($ssh, $stdout, $stderr);
807             }
808            
809             }
810              
811             sub break_client_loop { $_[0]->{ek_client_loopcl_quit_pending} = 1 }
812             sub restore_client_loop { $_[0]->{_cl_quit_pending} = 0 }
813             sub _quit_pending { $_[0]->{_cl_quit_pending} }
814              
815             sub client_loop {
816             my $ssh = shift;
817             return unless scalar @{$ssh->{events}{cmd}} > 0;
818             $ssh->emit('cmd');
819             $ssh->{_cl_quit_pending} = 0;
820              
821             # 取所有频道
822             my $cmgr = $ssh->channel_mgr;
823            
824             # 处理每个频道的事件
825             my $h = $cmgr->handlers;
826             $ssh->event_loop($cmgr, $h);
827             }
828              
829             sub event_loop {
830             my ($ssh, $cmgr, $h, $cb) = @_;
831             return $ssh->client_loop if $ssh->_quit_pending;
832             while (my $packet = Net::SSH::Perl::Packet->read_poll($ssh)) {
833             if (my $code = $h->{ $packet->type }) {
834             $code->($cmgr, $packet);
835             }
836             else {
837             $ssh->debug("Warning: ignore packet type " . $packet->type);
838             }
839             }
840              
841             return $ssh->client_loop if $ssh->_quit_pending;
842              
843             $cmgr->process_output_packets;
844              
845             # 如果处理完了. 关掉所有的连接
846             # 之所以在这进行这个操作是因为主 channel 也需要操作
847             for my $c (@{ $cmgr->{channels} }) {
848             next unless defined $c;
849             if ($c->{wfd} &&
850             $c->{extended}->length == 0 &&
851             $c->{output}->length == 0 &&
852             $c->{ostate} == CHAN_OUTPUT_WAIT_DRAIN ) {
853             $c->obuf_empty;
854             }
855             # 上面 obuf_empty 会给 ostate 变成 CHAN_OUTPUT_CLOSED
856             # 下面这个就会发关闭给远程
857             if ($c->delete_if_full_closed) {
858             defined $c->{cb} ? $c->{cb}->() : '';
859             $cmgr->remove($c->{id});
860             }
861             }
862            
863             my $oc = grep { defined } @{ $cmgr->{channels} };
864             return $ssh->client_loop unless $oc > 1;
865              
866             my $cv = AE::cv sub {
867             my $result = shift->recv;
868             delete $ssh->{watcher};
869             $ssh->event_loop($cmgr, $h, $cb);
870             };
871              
872             # 这是处理频道上的输出, 客户端的输入
873             for my $c (@{ $cmgr->{channels} }) {
874             next unless defined $c;
875             my $id = $c->{id};
876             if ($c->{rfd} && $c->{istate} == CHAN_INPUT_OPEN &&
877             $c->{remote_window} > 0 &&
878             $c->{input}->length < $c->{remote_window}) {
879             $ssh->{watcher}{$id}{rfd} = AE::io $c->{rfd}, 0, sub {
880             # 顺序记录 - 频道 - rfd
881             my $buf;
882             sysread $c->{rfd}, $buf, 8192;
883             ($buf) = $buf =~ /(.*)/s;
884             $c->send_data($buf);
885             $cv->send('rfd');
886             delete $ssh->{watcher}{$id}{rfd}
887             };
888             }
889              
890             # 给内容输出
891             if (defined $c->{wfd} &&
892             $c->{ostate} == CHAN_OUTPUT_OPEN ||
893             $c->{ostate} == CHAN_OUTPUT_WAIT_DRAIN) {
894             if ($c->{output} and $c->{output}->length > 0) {
895             $ssh->{watcher}{$id}{wfd} = AE::io $c->{wfd}, 1, sub {
896             if (my $r = $c->{handlers}{"_output_buffer"}) {
897             $r->{code}->( $c, $c->{output}, @{ $r->{extra} } );
898             }
899             $c->{local_consumed} += $c->{output}->length;
900             $c->{output}->empty;
901             $cv->send('wfd');
902             delete $ssh->{watcher}{$id}{wfd}
903             }
904             }
905             }
906            
907             if ($c->{efd} && $c->{extended}->length > 0) {
908             my $c->{watcher}{$id}{efd} = AE::io $c->{efd}, 1, sub {
909             if (my $r = $c->{handlers}{"_extended_buffer"}) {
910             $r->{code}->( $c, $c->{extended}, @{ $r->{extra} } );
911             }
912             $c->{local_consumed} += $c->{extended}->length;
913             $c->{extended}->empty;
914             $cv->send('efd');
915             delete $ssh->{watcher}{$id}{efd}
916             };
917             }
918              
919            
920             # 原进程
921             $c->check_window;
922             if ($c->delete_if_full_closed) {
923             defined $c->{cb} ? $c->{cb}->() : '';
924             $cmgr->remove($c->{id});
925             }
926             }
927              
928              
929             # 这是主连接的句柄
930             my $handle = $ssh->{session}{sock};
931             $handle->push_read( chunk => 4 => sub {
932             my ($handle, $buf) = @_;
933             if (!length($buf)) {
934             croak "Connection failed: $!\n";
935             }
936             $ssh->break_client_loop if length($buf) == 0;
937             ($buf) = $buf =~ /(.*)/s; ## Untaint data. Anything allowed.
938             $ssh->incoming_data->append($buf);
939             $cv->send('main');
940             });
941             }
942              
943             sub channel_mgr {
944             my $ssh = shift;
945             unless (defined $ssh->{channel_mgr}) {
946             $ssh->{channel_mgr} = Net::SSH::Perl::ChannelMgr->new($ssh);
947             }
948             $ssh->{channel_mgr};
949             }
950             sub _read_version {
951             my $ssh = shift;
952             my $line = shift;;
953             my $len = length $line;
954             unless(defined($len)) {
955             next if $! == EAGAIN || $! == EWOULDBLOCK;
956             croak "Read from socket failed: $!";
957             }
958             croak "Connection closed by remote host" if $len == 0;
959             croak "Version line too long: $line"
960             if substr($line, 0, 4) eq "SSH-" and length($line) > 255;
961             croak "Pre-version line too long: $line" if length($line) > 4*1024;
962             if (substr($line, 0, 4) ne "SSH-") {
963             $ssh->debug("Remote version string: $line");
964             }
965             return $line;
966             }
967             sub sock { $_[0]->{session}{sock} }
968              
969             1;
970             __END__
971              
972             =pod
973            
974             =encoding utf8
975              
976             =head1 NAME
977              
978             AnyEvent::SSH2 - 基于 AnyEvent çš„ SSH2 的非阻塞事件驱动的实现
979              
980             =head1 SYNOPSIS
981              
982             对多台主机, 并行的远程执行一些命令.
983              
984             use AE;
985             use AnyEvent::SSH2;
986              
987             my $ssh1 = AnyEvent::SSH2->new(
988             'ip',
989             user => 'root',
990             pass => 'pass',
991             );
992            
993             my $ssh2 = AnyEvent::SSH2->new(
994             'ip'
995             user => 'root',
996             pass => 'pass',
997             );
998            
999             my $cv = AE::cv;
1000              
1001             $cv->begin;
1002             $ssh1->send('sleep 5;hostname' => sub {
1003             my ($ssh, $stdout, $stderr) = @_;
1004             print "$stdout";
1005             $cv->end;
1006             })->connect;
1007            
1008             $cv->begin;
1009             $ssh2->send('sleep 1;hostname' => sub {
1010             my ($ssh, $stdout, $stderr) = @_;
1011             print "$stdout";
1012             $cv->end;
1013             })->connect;
1014              
1015             $cv->recv;
1016              
1017             对同一个主机, 并行的执行多条命令...注意顺序并不固定, 任何一个命令先执行完都会先回调.
1018              
1019             use AnyEvent::SSH2;
1020             my $ssh = AnyEvent::SSH2->new(
1021             'ip'
1022             user => 'root',
1023             pass => 'pass',
1024             );
1025            
1026            
1027             my $cv = AE::cv;
1028             $cv->begin;
1029             $ssh->send('sleep 5; echo 5' => sub {
1030             my ($ssh, $stdout, $stderr) = @_;
1031             print "$stdout";
1032             $cv->end;
1033             });
1034            
1035             $cv->begin;
1036             $ssh->send('sleep 1; echo 1 ; uptime' => sub {
1037             my ($ssh, $stdout, $stderr) = @_;
1038             print "$stdout";
1039             $cv->end;
1040             });
1041            
1042             $ssh->connect;
1043            
1044             $cv->recv;
1045              
1046             或者你可能想有一定层次, 根据前一条命令的条件来执行指定的命令.
1047              
1048             my $cv = AE::cv;
1049             $ssh->send('sleep 5; echo 5' => sub {
1050             my ($ssh, $stdout, $stderr) = @_;
1051             print "$stdout";
1052             $ssh->send('sleep 1; echo 1 ; uptime' => sub {
1053             my ($ssh, $stdout, $stderr) = @_;
1054             print "$stdout";
1055             $cv->send;
1056             });
1057             });
1058            
1059             $ssh->connect;
1060            
1061             $cv->recv;
1062              
1063             =head1 DESCRIPTION
1064              
1065             这个模块是基于 Net::SSH::Perl 实现的在 AnyEvent 上的事件驱动的支持. 并不是使用的 Fork 的实现 (non-fork), 这是基于 socket 的原生事件驱动实现.
1066             可以同时异步的连接多个主机进行操作. 并且也可以支持同时对一个主机同时执行多条命令与根据前面结果然后在执行指定命令.
1067              
1068             =head1 属性
1069              
1070             默认对象 new 的时候需要提供连接的主机地址. 本对象的属性继承所有的 L<Net::SSH::Perl> 的属性. 并实现了下列这些
1071              
1072             =head2 user
1073              
1074             提供用于远程连接的用户名. 如果不提供会默认使用当前用户.
1075              
1076             =head2 pass
1077              
1078             提供用于远程连接的密码. 也支持 key 方式认证. 需要指定如下属性
1079              
1080             identity_files => ['/root/.ssh/id_rsa'],
1081             options => [
1082             'PubkeyAuthentication yes',
1083             'PasswordAuthentication no', # 可能你想关掉密码认证
1084             ],
1085              
1086             =head1 方法
1087              
1088             本对象所支持的方法如下
1089              
1090             =head2 send
1091              
1092             这个需要提供你要给远程执行的命令做为第一个参数, 第二个参数是命令执行完的回调函数.
1093             回调函数的第二个和第三个参数会别会是命令执行的标准输出和标准错误.
1094              
1095             本方法可以重复设置, 都会一次性发给远程主机执行. 所以执行完会根据执行结果的速度, 会立即返回并调用回调.
1096              
1097             =head2 connect
1098              
1099             当上面的命令定义完了, 可以调用 connect 方法来运行整个事件.
1100              
1101             =head1 SEE ALSO
1102              
1103             L<AnyEvent>, L<Net::SSH::Perl>
1104              
1105             =head1 AUTHOR
1106              
1107             扶凯 fukai <iakuf@163.com>
1108              
1109             =cut