File Coverage

blib/lib/IPC/PerlSSH/Async.pm
Criterion Covered Total %
statement 136 148 91.8
branch 41 74 55.4
condition 10 20 50.0
subroutine 22 23 95.6
pod 6 9 66.6
total 215 274 78.4


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2008-2012 -- leonerd@leonerd.org.uk
5              
6             package IPC::PerlSSH::Async;
7              
8 6     6   533438 use strict;
  6         36  
  6         231  
9 6     6   79 use warnings;
  6         11  
  6         239  
10 6     6   42 use base qw( IO::Async::Notifier IPC::PerlSSH::Base );
  6         10  
  6         6311  
11             IPC::PerlSSH::Base->VERSION( '0.16' );
12              
13 6     6   38724 use IO::Async::Process 0.37;
  6         162132  
  6         291  
14              
15             our $VERSION = '0.07';
16              
17 6     6   64 use Carp;
  6         16  
  6         12181  
18              
19             =head1 NAME
20              
21             C - Asynchronous wrapper around L
22              
23             =head1 SYNOPSIS
24              
25             I the constructor has changed since version 0.03.
26              
27             use IO::Async::Loop;
28             use IPC::PerlSSH::Async;
29              
30             my $loop = IO::Async::Loop->new();
31              
32             my $ips = IPC::PerlSSH::Async->new(
33             on_exception => sub { die "Failed - $_[0]\n" },
34              
35             Host => "over.there",
36             );
37              
38             $loop->add( $ips );
39              
40             $ips->eval(
41             code => "use POSIX qw( uname ); uname()",
42             on_result => sub { print "Remote uname is ".join( ",", @_ )."\n"; },
43             );
44              
45             # We can pass arguments
46             $ips->eval(
47             code => 'open FILE, ">", shift; print FILE shift; close FILE;',
48             args => [ "foo.txt", "Hello, world!" ],
49             on_result => sub { print "Wrote foo.txt\n" },
50             );
51              
52             # We can load pre-defined libraries
53             $ips->use_library(
54             library => "FS",
55             funcs => [qw( unlink )],
56             on_loaded => sub {
57             $ips->call(
58             name => "unlink",
59             args => [ "foo.txt" ],
60             on_result => sub { print "Removed foo.txt\n" },
61             );
62             },
63             );
64              
65             $loop->loop_forever;
66              
67             =head1 DESCRIPTION
68              
69             This module provides an object class that implements the C
70             behaviour in an asynchronous way, suitable for use in an C-based
71             program.
72              
73             Briefly, C is a module that allows execution of perl code in a
74             remote perl instance, usually accessed via F, with the notable
75             distinction that the module does not need to be present in the remote end, nor
76             does any special server need to be running, besides F itself. For more
77             detail, see the L documentation.
78              
79             =cut
80              
81             =head1 INITIAL PARAMETERS
82              
83             As well as the L named below, the constructor will take any of
84             the constructor arguments named by L, to set up the connection.
85              
86             =cut
87              
88             =head1 PARAMETERS
89              
90             The following named parameters may be passed to C or C:
91              
92             =over 8
93              
94             =item on_exception => CODE
95              
96             Optional. A default callback to use if a call to C, C or
97             C does not provide one. If it is changed while a result it
98             outstanding, the handler that was in place at the time it was invoked will be
99             used in case of errors. Changes will only affect new C, C or
100             C calls made after the change.
101              
102             =item on_exit => CODE
103              
104             Optional. A callback to invoke if the remote perl process exits. Will be
105             passed directly to the C C method.
106              
107             =back
108              
109             =cut
110              
111             sub new
112             {
113 5     5 1 34266 my $class = shift;
114 5         32 my %args = @_;
115              
116 5         20 my $loop = delete $args{loop};
117              
118 5         67 my $self = $class->SUPER::new( %args );
119              
120 5 100       95 if( $loop ) {
121 1         47 warnings::warnif( deprecated => "'loop' constructor argument is deprecated" );
122 1         1282 $loop->add( $self );
123             }
124              
125 5         566 return $self;
126             }
127              
128             sub _init
129             {
130 5     5   65 my $self = shift;
131 5         14 my ( $params ) = @_;
132              
133             # This will delete keys
134 5         64 $self->{IPC_PerlSSH_command} = [ $self->build_command_from( $params ) ];
135              
136 5         130 $self->{message_queue} = [];
137              
138 5         45 return $self->SUPER::_init( $params );
139             }
140              
141             sub configure
142             {
143 5     5 1 40 my $self = shift;
144 5         17 my %params = @_;
145              
146 5 100       31 if( exists $params{on_exception} ) {
147 4         11 my $on_exception = delete $params{on_exception};
148 4 50 33     56 !$on_exception or ref $on_exception eq "CODE"
149             or croak "Expected 'on_exception' to be a CODE reference";
150              
151 4         14 $self->{on_exception} = $on_exception;
152             }
153              
154 5 100       35 if( exists $params{on_exit} ) {
155 1         4 my $on_exit = delete $params{on_exit};
156 1 50 33     16 !$on_exit or ref $on_exit eq "CODE"
157             or croak "Expected 'on_exit' to be a CODE reference";
158              
159 1         3 $self->{on_exit} = $on_exit;
160             }
161              
162 5         50 $self->SUPER::configure( %params );
163             }
164              
165             sub _add_to_loop
166             {
167 5     5   1152 my $self = shift;
168 5         46 $self->SUPER::_add_to_loop( @_ );
169              
170             my $on_exit = $self->{on_exit} || sub {
171 0     0   0 print STDERR "Remote SSH died early";
172 5   100     65 };
173              
174 5 50       31 if( my $command = delete $self->{IPC_PerlSSH_command} ) {
175             # TODO: IO::Async ought to have nice ways to do this
176 5         70 my $process = $self->{process} = IO::Async::Process->new(
177             command => $command,
178             stdio => { via => "pipe_rdwr" },
179             on_finish => $on_exit,
180             );
181              
182 5         900 $process->stdio->configure(
183             on_read => $self->_replace_weakself( "on_read" ),
184             );
185              
186 5         127636 $self->add_child( $process );
187             }
188              
189 5         104616 $self->send_firmware;
190             }
191              
192             sub on_read
193             {
194 19     19 0 11766735 my $self = shift;
195 19         54 my ( $buffref, $closed ) = @_;
196              
197 19 100       116 if( $closed ) {
198 1         4 while( my $cb = shift @{ $self->{message_queue} } ) {
  2         26  
199 1         3 $cb->( "CLOSED" );
200             }
201 1         5 return 0;
202             }
203              
204 18         206 my ( $message, @args ) = $self->parse_message( $$buffref );
205 18 100       246233 return 0 unless defined $message;
206              
207 14         30 my $cb = shift @{ $self->{message_queue} };
  14         51  
208 14         206 $cb->( $message, @args );
209              
210 14         4151 return 1;
211             }
212              
213             sub write
214             {
215 20     20 0 28097 my $self = shift;
216 20         314 $self->{process}->stdio->write( @_ );
217             }
218              
219             sub do_message
220             {
221 15     15 0 116 my $self = shift;
222 15         172 my %args = @_;
223              
224 15         323 my $message = $args{message};
225 15         32 my $args = $args{args};
226              
227 15         30 my $on_response = $args{on_response};
228 15 50       61 ref $on_response eq "CODE" or croak "Expected 'on_response' as a CODE reference";
229              
230 15         424 $self->write_message( $message, @$args );
231              
232 15         2648 push @{ $self->{message_queue} }, $on_response;
  15         109  
233             }
234              
235             =head1 METHODS
236              
237             =cut
238              
239             =head2 $ips->eval( %args )
240              
241             This method evaluates code in the remote host, passing arguments and returning
242             the result.
243              
244             The C<%args> hash takes the following keys:
245              
246             =over 8
247              
248             =item code => STRING
249              
250             The perl code to execute, in a string. (i.e. NOT a CODE reference).
251              
252             =item args => ARRAY
253              
254             Optional. An ARRAY reference containing arguments to pass to the code.
255              
256             =item on_result => CODE
257              
258             Continuation to invoke when the code returns a result.
259              
260             =item on_exception => CODE
261              
262             Optional. Continuation to invoke if the code throws an exception.
263              
264             =back
265              
266             The code should be passed in a string, and is evaluated using a string
267             C in the remote host, in list context. If this method is called in
268             scalar context, then only the first element of the returned list is returned.
269             Only string scalar values are supported in either the arguments or the return
270             values; no deeply-nested structures can be passed.
271              
272             To pass or return a more complex structure, consider using a module such as
273             L, which can serialise the structure into a plain string, to be
274             deserialised on the remote end.
275              
276             If the remote code threw an exception, then this function propagates it as a
277             plain string. If the remote process exits before responding, this will be
278             propagated as an exception.
279              
280             =cut
281              
282             sub eval
283             {
284 10     10 1 30410 my $self = shift;
285 10         134 my %args = @_;
286              
287 10         289 my $code = $args{code};
288 10         36 my $args = $args{args};
289              
290 10         23 my $on_result = $args{on_result};
291 10 50       79 ref $on_result eq "CODE" or croak "Expected 'on_result' as a CODE reference";
292              
293 10   66     106 my $on_exception = $args{on_exception} || $self->{on_exception};
294 10 50       47 ref $on_exception eq "CODE" or croak "Expected 'on_exception' as a CODE reference";
295              
296             $self->do_message(
297             message => "EVAL",
298             args => [ $code, $args ? @$args : () ],
299              
300             on_response => sub {
301 10     10   1761 my ( $ret, @args ) = @_;
302              
303 10 100       52 if( $ret eq "RETURNED" ) { $on_result->( @args ); }
  9 50       221  
    50          
304 0         0 elsif( $ret eq "DIED" ) { $on_exception->( $args[0] ); }
305 1         3 elsif( $ret eq "CLOSED" ) { $on_exception->( "Remote connection closed" ); }
306 0         0 else { warn "Unknown return result $ret"; }
307             },
308 10 100       3360 );
309             }
310              
311             =head2 $ips->store( %args )
312              
313             This method sends code to the remote host to store in a named procedure which
314             can be executed later.
315              
316             The C<%args> hash takes the following keys:
317              
318             =over 8
319              
320             =item name => STRING
321              
322             A name for the stored procedure.
323              
324             =item code => STRING
325              
326             The perl code to store, in a string. (i.e. NOT a CODE reference).
327              
328             =item on_stored => CODE
329              
330             Continuation to invoke when the code is successfully stored.
331              
332             =item on_exception => CODE
333              
334             Optional. Continuation to invoke if compiling the code throws an exception.
335              
336             =back
337              
338             The code should be passed in a string, along with a name which can later be
339             called by the C method.
340              
341             While the code is not executed, it will still be compiled into a CODE
342             reference in the remote host. Any compile errors that occur will still invoke
343             the C continuation. If the remote process exits before
344             responding, this will be propagated as an exception.
345              
346             =cut
347              
348             sub store
349             {
350 1     1 1 1232 my $self = shift;
351 1         11 my %args = @_;
352              
353 1         4 my $name = $args{name};
354 1         3 my $code = $args{code};
355              
356 1         3 my $on_stored = $args{on_stored};
357 1 50       10 ref $on_stored eq "CODE" or croak "Expected 'on_stored' as a CODE reference";
358              
359 1   33     11 my $on_exception = $args{on_exception} || $self->{on_exception};
360 1 50       6 ref $on_exception eq "CODE" or croak "Expected 'on_exception' as a CODE reference";
361              
362 1 50       13 $self->_has_stored_code( $name ) and return $on_exception->( "Already have a stored function called '$name'" );
363              
364             $self->do_message(
365             message => "STORE",
366             args => [ $name, $code ],
367              
368             on_response => sub {
369 1     1   3 my ( $ret, @args ) = @_;
370              
371 1 50       5 if( $ret eq "OK" ) {
    0          
    0          
372 1         4 $self->{stored}{$name} = 1;
373 1         3 $on_stored->();
374             }
375 0         0 elsif( $ret eq "DIED" ) { $on_exception->( $args[0] ); }
376 0         0 elsif( $ret eq "CLOSED" ) { $on_exception->( "Remote connection closed" ); }
377 0         0 else { warn "Unknown return result $ret"; }
378             },
379 1         17 );
380             }
381              
382             sub _has_stored_code
383             {
384 5     5   3534 my $self = shift;
385 5         77 my ( $name ) = @_;
386 5         58 return exists $self->{stored}{$name};
387             }
388              
389             =head2 $ips->call( %args )
390              
391             This method invokes a stored procedure that has earlier been defined using the
392             C method. The arguments are passed and the result is returned in the
393             same way as with the C method.
394              
395             The C<%params> hash takes the following keys:
396              
397             =over 8
398              
399             =item name => STRING
400              
401             The name of the stored procedure.
402              
403             =item args => ARRAY
404              
405             Optional. An ARRAY reference containing arguments to pass to the code.
406              
407             =item on_result => CODE
408              
409             Continuation to invoke when the code returns a result.
410              
411             =item on_exception => CODE
412              
413             Optional. Continuation to invoke if the code throws an exception or exits.
414              
415             =back
416              
417             =cut
418              
419             sub call
420             {
421 2     2 1 823 my $self = shift;
422 2         22 my %args = @_;
423              
424 2         10 my $name = $args{name};
425 2         5 my $args = $args{args};
426              
427 2         5 my $on_result = $args{on_result};
428 2 50       11 ref $on_result eq "CODE" or croak "Expected 'on_result' as a CODE reference";
429              
430 2   33     38 my $on_exception = $args{on_exception} || $self->{on_exception};
431 2 50       11 ref $on_exception eq "CODE" or croak "Expected 'on_exception' as a CODE reference";
432              
433 2 50       7 $self->_has_stored_code( $name ) or return $on_exception->( "Do not have a stored function called '$name'" );
434              
435             $self->do_message(
436             message => "CALL",
437             args => [ $name, $args ? @$args : () ],
438              
439             on_response => sub {
440 2     2   7 my ( $ret, @args ) = @_;
441              
442 2 50       9 if( $ret eq "RETURNED" ) { $on_result->( @args ); }
  2 0       9  
    0          
443 0         0 elsif( $ret eq "DIED" ) { $on_exception->( $args[0] ); }
444 0         0 elsif( $ret eq "CLOSED" ) { $on_exception->( "Remote connection closed" ); }
445 0         0 else { warn "Unknown return result $ret"; }
446             },
447 2 50       40 );
448             }
449              
450             =head2 $ips->use_library( %args )
451              
452             This method loads a library of code from a module, and stores them to the
453             remote perl by calling C on each one.
454              
455             The C<%params> hash takes the following keys:
456              
457             =over 8
458              
459             =item library => STRING
460              
461             Name of the library to load
462              
463             =item funcs => ARRAY
464              
465             Optional. Reference to an array containing names of functions to load.
466              
467             =item on_loaded => CODE
468              
469             Continuation to invoke when all the functions are stored.
470              
471             =item on_exception => CODE
472              
473             Optional. Continuation to invoke if storing a function throws an exception or
474             exits.
475              
476             =back
477              
478             The library name may be a full class name, or a name within the
479             C space.
480              
481             If the funcs list is non-empty, then only those named functions are stored
482             (analogous to the C perl statement). This may be useful in large
483             libraries that define many functions, only a few of which are actually used.
484              
485             For more information, see L.
486              
487             =cut
488              
489             sub use_library
490             {
491 4     4 1 3160 my $self = shift;
492 4         40 my %args = @_;
493              
494 4         29 my $library = $args{library};
495 4         14 my $funcs = $args{funcs};
496              
497 4         6 my $on_loaded = $args{on_loaded};
498 4 50       22 ref $on_loaded eq "CODE" or croak "Expected 'on_loaded' as a CODE reference";
499              
500 4   66     26 my $on_exception = $args{on_exception} || $self->{on_exception};
501 4 50       12 ref $on_exception eq "CODE" or croak "Expected 'on_exception' as a CODE reference";
502              
503 4 100       7 my ( $package, $funcshash ) = eval { $self->load_library_pkg( $library, $funcs ? @$funcs : () ) };
  4         55  
504 4 100       2128 if( $@ ) {
505 2         7 $on_exception->( $@ );
506 2         12 return;
507             }
508              
509 2 100       8 $self->{stored_pkg}{$package} and delete $funcshash->{_init};
510              
511             $self->do_message(
512             message => "STOREPKG",
513             args => [ $package, %$funcshash ],
514              
515             on_response => sub {
516 2     2   5 my ( $ret, @args ) = @_;
517              
518 2 50       8 if( $ret eq "OK" ) {
    0          
    0          
519 2         9 $self->{stored_pkg}{$package} = 1;
520 2         13 $self->{stored}{$_} = 1 for keys %$funcshash;
521 2         9 $on_loaded->();
522             }
523 0         0 elsif( $ret eq "DIED" ) { $on_exception->( $args[0] ); }
524 0         0 elsif( $ret eq "CLOSED" ) { $on_exception->( "Remote connection closed" ); }
525 0         0 else { warn "Unknown return result $ret"; }
526             },
527 2         35 );
528             }
529              
530             sub DESTROY
531             {
532 2     2   2847 my $self = shift;
533              
534             # Be safe at global destruction time
535 2 50       59 $self->{stream}->close if defined $self->{stream};
536             }
537              
538             # Keep perl happy; keep Britain tidy
539             1;
540              
541             __END__