File Coverage

blib/lib/AnyEvent/Fork/Remote.pm
Criterion Covered Total %
statement 64 89 71.9
branch 16 34 47.0
condition 4 12 33.3
subroutine 13 21 61.9
pod 11 13 84.6
total 108 169 63.9


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::Fork::Remote - remote processes with AnyEvent::Fork interface
4              
5             THE API IS NOT FINISHED, CONSIDER THIS A BETA RELEASE
6              
7             =head1 SYNOPSIS
8              
9             use AnyEvent;
10             use AnyEvent::Fork::Remote;
11              
12             my $rpc = AnyEvent::Fork::Remote
13             ->new_execp ("ssh", "ssh", "othermachine", "perl")
14             ->require ("MyModule")
15             ->run ("MyModule::run", my $cv = AE::cv);
16              
17             my $fh = $cv->recv;
18              
19             =head1 DESCRIPTION
20              
21             Despite what the name of this module might suggest, it doesn't actually
22             create remote processes for you. But it does make it easy to use them,
23             once you have started them.
24              
25             This module implements a very similar API as L. In fact,
26             similar enough to require at most minor modifications to support both
27             at the same time. For example, it works with L and
28             L.
29              
30             The documentation for this module will therefore only document the parts
31             of the API that differ between the two modules.
32              
33             =head2 SUMMARY OF DIFFERENCES
34              
35             Here is a short summary of the main differences between L
36             and this module:
37              
38             =over 4
39              
40             =item * C is not implemented and will fail
41              
42             =item * the child-side C function must read from STDIN and write to STDOUT
43              
44             =item * C does not actually fork, but will create a new process
45              
46             =back
47              
48             =head1 EXAMPLE
49              
50             This example uses a local perl (because that is likely going to work
51             without further setup) and the L to create simple
52             worker process.
53              
54             First load the modules we are going to use:
55              
56             use AnyEvent;
57             use AnyEvent::Fork::Remote;
58             use AnyEvent::Fork::RPC;
59              
60             Then create, configure and run the process:
61              
62             my $rpc = AnyEvent::Fork::Remote
63             ->new_execp ("perl", "perl")
64             ->eval ('
65             sub myrun {
66             "this is process $$, and you passed <@_>"
67             }
68             ')
69             ->AnyEvent::Fork::RPC::run ("myrun");
70              
71             We use C to execute the first F found in the PATH. You'll
72             have to make sure there is one for this to work. The perl does not
73             actually have to be the same perl as the one running the example, and it
74             doesn't need to have any modules installed.
75              
76             The reason we have to specify C twice is that the first argument to
77             C (and also C) is the program name or path, while
78             the remaining ones are the arguments, and the first argument passed to a
79             program is the program name, so it has to be specified twice.
80              
81             Finally, the standard example, send some numbers to the remote function,
82             and print whatever it returns:
83              
84             my $cv = AE::cv;
85              
86             for (1..10) {
87             $cv->begin;
88             $rpc->($_, sub {
89             print "remote function returned: $_[0]\n";
90             $cv->end;
91             });
92             }
93              
94             $cv->recv;
95              
96             Now, executing F in the PATH isn't very interesting - you could have
97             done the same with L, and it might even be more efficient.
98              
99             The power of this module is that the F doesn't need to run on the
100             local box, you could simply substitute another command, such as F
101             remotebox perl>:
102              
103             my $rpc = AnyEvent::Fork::Remote
104             ->new_execp ("ssh", "ssh", "remotebox", "perl")
105              
106             And if you want to use a specific path for ssh, use C:
107              
108             my $rpc = AnyEvent::Fork::Remote
109             ->new_exec ("/usr/bin/ssh", "ssh", "remotebox", "perl")
110              
111             Of course, it doesn't really matter to this module how you construct your
112             perl processes, what matters is that somehow, you give it a file handle
113             connected to the new perls STDIN and STDOUT.
114              
115             =head1 PARENT PROCESS USAGE
116              
117             =over 4
118              
119             =cut
120              
121             package AnyEvent::Fork::Remote;
122              
123 2     2   15091 use common::sense;
  2         14  
  2         6  
124              
125 2     2   73 use Carp ();
  2         2  
  2         19  
126 2     2   402 use Errno ();
  2         856  
  2         27  
127              
128 2     2   882 use AnyEvent ();
  2         3625  
  2         2318  
129              
130             our $VERSION = '1.0';
131              
132             # xored together must start and and with \n
133             my $magic0 = "Pdk{6y[_zZ";
134             my $magic1 = "Z^yZ7~i=oP";
135              
136             =item my $proc = new_exec AnyEvent::Fork::Remote $path, @args...
137              
138             Creates a new C object. Unlike L,
139             processes are only created when C is called, every other method call
140             is is simply recorded until then.
141              
142             Each time a new process is needed, it executes C<$path> with the given
143             arguments (the first array member must be the program name, as with
144             the C function with explicit PROGRAM argument) and both C
145             and C connected to a communications socket. No input must be
146             consumed by the command before F is started, and no output should be
147             generated.
148              
149             The program I invoke F somehow, with STDIN and STDOUT intact,
150             without specifying anything to execute (no script file name, no C<-e>
151             switch etc.).
152              
153             Here are some examples to give you an idea:
154              
155             # just "perl"
156             $proc = new_exec AnyEvent::Fork::Remote
157             "/usr/bin/perl", "perl";
158              
159             # rsh othernode exec perl
160             $proc = new_exec AnyEvent::Fork::Remote
161             "/usr/bin/rsh", "rsh", "othernode", "exec perl";
162              
163             # a complicated ssh command
164             $proc = new_exec AnyEvent::Fork::Remote
165             "/usr/bin/ssh",
166             qw(ssh -q
167             -oCheckHostIP=no -oTCPKeepAlive=yes -oStrictHostKeyChecking=no
168             -oGlobalKnownHostsFile=/dev/null -oUserKnownHostsFile=/dev/null
169             otherhost
170             exec perl);
171              
172             =item my $proc = new_execp AnyEvent::Fork::Remote $file, @args...
173              
174             Just like C, except that the program is searched in the
175             C<$ENV{PATH}> first, similarly to how the shell does it. This makes it easier
176             to find e.g. C:
177              
178             $proc = new_execp AnyEvent::Fork::Remote "ssh", "ssh", "otherhost", "perl";
179              
180             =item my $proc = new AnyEvent::Fork::Remote $create_callback
181              
182             Basically the same as C, but instead of a command to execute,
183             it expects a callback which is invoked each time a process needs to be
184             created.
185              
186             The C<$create_callback> is called with another callback as argument,
187             and should call this callback with the file handle that is connected
188             to a F process. This callback can be invoked even after the
189             C<$create_callback> returns.
190              
191             Example: emulate C using C.
192              
193             use AnyEvent::Util;
194             use Proc::FastSpawn;
195              
196             $proc = new AnyEvent::Fork::Remote sub {
197             my $done = shift;
198              
199             my ($a, $b) = AnyEvent::Util::portable_socketpair
200             or die;
201              
202             open my $oldin , "<&0" or die;
203             open my $oldout, ">&1" or die;
204              
205             open STDIN , "<&" . fileno $b or die;
206             open STDOUT, ">&" . fileno $b or die;
207              
208             spawn "/usr/bin/rsh", ["rsh", "othernode", "perl"];
209              
210             open STDIN , "<&" . fileno $oldin ;
211             open STDOUT, ">&" . fileno $oldout;
212              
213             $done->($a);
214             };
215              
216             =item my $proc = new_from_fh $fh
217              
218             Creates an C object from a file handle. This file
219             handle must be connected to both STDIN and STDOUT of a F process.
220              
221             This form might be more convenient than C or C when
222             creating an C object, but the resulting object
223             does not support C.
224              
225             =cut
226              
227             sub new {
228 1     1 1 1 my ($class, $create) = @_;
229              
230 1         5 bless [
231             $create,
232             "",
233             [],
234             ], $class
235             }
236              
237             sub new_from_fh {
238 0     0 1 0 my ($class, @fh) = @_;
239              
240             $class->new (sub {
241 0 0   0   0 my $fh = shift @fh
242             or Carp::croak "AnyEvent::Fork::Remote::new_from_fh does not support fork";
243              
244 0         0 $_[0]($fh);
245 0         0 });
246             }
247              
248             sub _new_exec {
249 1     1   1 my $p = pop;
250              
251 1         3 my ($class, $program, @argv) = @_;
252              
253 1         4 require AnyEvent::Util;
254 1         2 require Proc::FastSpawn;
255              
256             $class->new (sub {
257 1     1   2 my $done = shift;
258              
259 1 50       2 my ($a, $b) = AnyEvent::Util::portable_socketpair ()
260             or die;
261              
262 1 50       51 open my $oldin , "<&0" or die;
263 1 50       7 open my $oldout, ">&1" or die;
264              
265 1 50       10 open STDIN , "<&" . fileno $b or die;
266 1 50       8 open STDOUT, ">&" . fileno $b or die;
267              
268 1 50       327 $p ? Proc::FastSpawn::spawnp ($program, \@argv)
269             : Proc::FastSpawn::spawn ($program, \@argv);
270              
271 1         18 open STDIN , "<&" . fileno $oldin ;
272 1         6 open STDOUT, ">&" . fileno $oldout;
273              
274 1         3 $done->($a);
275             })
276 1         8 }
277              
278             sub new_exec {
279 1     1 1 32 push @_, 0;
280 1         3 &_new_exec
281             }
282              
283             sub new_execp {
284 0     0 1 0 push @_, 1;
285 0         0 &_new_exec
286             }
287              
288             =item $new_proc = $proc->fork
289              
290             Quite the same as the same method of L, except that it
291             simply clones the object without creating an actual process.
292              
293             =cut
294              
295             sub fork {
296 0     0 1 0 my $self = shift;
297              
298             bless [
299             $self->[0],
300             $self->[1],
301 0         0 [@{ $self->[2] }],
  0         0  
302             ], ref $self
303             }
304              
305             =item undef = $proc->pid
306              
307             The C method always returns C and only exists for
308             compatibility with L.
309              
310             =cut
311              
312             sub pid {
313             undef
314 0     0 1 0 }
315              
316             =item $proc = $proc->send_fh (...)
317              
318             Not supported and always croaks.
319              
320             =cut
321              
322             sub send_fh {
323 0     0 1 0 Carp::croak "send_fh is not supported on AnyEvent::Fork::Remote objects";
324             }
325              
326             =item $proc = $proc->eval ($perlcode, @args)
327              
328             Quite the same as the same method of L.
329              
330             =cut
331              
332             # quote a binary string as a perl scalar
333             sub sq($) {
334 3     3 0 4 my $s = shift;
335              
336 3 50       17 $s =~ /'/
337             or return "'$s'";
338              
339 0         0 $s =~ s/(\x10+)/\x10.'$1'.q\x10/g;
340 0         0 "q\x10$s\x10"
341             }
342              
343             # quote a list of strings
344             sub aq(@) {
345 2     2 0 8 "(" . (join ",", map sq $_, @_) . ")"
346             }
347              
348             sub eval {
349 1     1 1 7 my ($self, $perlcode, @args) = @_;
350              
351 1         1 my $linecode = $perlcode;
352 1         8 $linecode =~ s/\s+/ /g; # takes care of \n
353 1         3 $linecode =~ s/"/''/g;
354 1 50       3 substr $linecode, 70, length $linecode, "..." if length $linecode > 70;
355              
356 1         5 $self->[1] .= '{ local @_ = ' . (aq @args) . ";\n#line 1 \"'$linecode'\"\n$perlcode;\n}\n";
357              
358 1         2 $self
359             }
360              
361             =item $proc = $proc->require ($module, ...)
362              
363             Quite the same as the same method of L.
364              
365             =cut
366              
367             sub require {
368 0     0 1 0 my ($self, @modules) = @_;
369              
370             $self->eval ("require $_")
371 0         0 for @modules;
372              
373 0         0 $self
374             }
375              
376             =item $proc = $proc->send_arg ($string, ...)
377              
378             Quite the same as the same method of L.
379              
380             =cut
381              
382             sub send_arg {
383 0     0 1 0 my ($self, @arg) = @_;
384              
385 0         0 push @{ $self->[2] }, @arg;
  0         0  
386              
387 0         0 $self
388             }
389              
390             =item $proc->run ($func, $cb->($fh))
391              
392             Very similar to the run method of L.
393              
394             On the parent side, the API is identical, except that a C<$cb> argument of
395             C instead of a valid file handle signals an error.
396              
397             On the child side, the "communications socket" is in fact just C<*STDIN>,
398             and typically can only be read from (this highly depends on how the
399             program is created - if you just run F locally, it will work for
400             both reading and writing, but commands such as F or F typically
401             only provide read-only handles for STDIN).
402              
403             To be portable, if the run function wants to read data that is written to
404             C<$fh> in the parent, then it should read from STDIN. If the run function
405             wants to provide data that can later be read from C<$fh>, then it should
406             write them to STDOUT.
407              
408             You can write a run function that works with both L
409             and this module by checking C. If it is C<0> (meaning
410             it is STDIN), then you should use it for reading, and STDOUT for
411             writing. Otherwise, you should use the file handle for both:
412              
413             sub run {
414             my ($rfh, ...) = @_;
415             my $wfh = fileno $rfh ? $rfh : *STDOUT;
416              
417             # now use $rfh for reading and $wfh for writing
418             }
419              
420             =cut
421              
422             sub run {
423 1     1 1 2869 my ($self, $func, $cb) = @_;
424              
425             $self->[0](sub {
426 1 50   1   6 my $fh = shift
427             or die "AnyEvent::Fork::Remote: create callback failed";
428              
429 1 50       10 my $owner = length $ENV{HOSTNAME} ? "$ENV{HOSTNAME}:$$" : "*:$$";
430              
431             my $code = 'BEGIN { $0 = ' . (sq "$owner $func") . '; ' . $self->[1] . "}\n"
432             . 'syswrite STDOUT, ' . (sq $magic0) . '^' . (sq $magic1) . ';'
433             . '{ sysread STDIN, my $dummy, 1 }'
434 1         5 . "\n$func*STDIN," . (aq @{ $self->[2] }) . ';'
  1         3  
435             . "\n__END__\n";
436              
437 1         4 AnyEvent::Util::fh_nonblocking $fh, 1;
438              
439 1         4 my ($rw, $ww);
440              
441 0         0 my $ofs;
442              
443             $ww = AE::io $fh, 1, sub {
444 1         74 my $len = syswrite $fh, $code, 1<<20, $ofs;
445              
446 1 50 33     5 if ($len || $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK) {
      33        
447 1         1 $ofs += $len;
448 1 50       25 undef $ww if $ofs >= length $code;
449             } else {
450             # error
451 0         0 ($ww, $rw) = (); $cb->(undef);
  0         0  
452             }
453 1         14 };
454              
455 1         2 my $rbuf;
456              
457             $rw = AE::io $fh, 0, sub {
458 1         1766 my $len = sysread $fh, $rbuf, 1<<10;
459              
460 1 50 33     7 if ($len || $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK) {
      33        
461 1 50       3 $rbuf = substr $rbuf, -length $magic0 if length $rbuf > length $magic0;
462              
463 1 50       5 if ($rbuf eq ($magic0 ^ $magic1)) {
464             # all data was sent, magic was received - both
465             # directions should be "empty", and therefore
466             # the socket must accept at least a single octet,
467             # to signal the "child" to go on.
468 1         1 undef $rw;
469 1 50       2 die if $ww; # uh-oh
470              
471 1         6 syswrite $fh, "\n";
472 1         5 $cb->($fh);
473             }
474             } else {
475             # error
476 0           ($ww, $rw) = (); $cb->(undef);
  0            
477             }
478 1         11 };
479 1         7 });
480             }
481              
482             =back
483              
484             =head1 SEE ALSO
485              
486             L, the same as this module, for local processes.
487              
488             L, to talk to the created processes.
489              
490             L, to manage whole pools of processes.
491              
492             =head1 AUTHOR AND CONTACT INFORMATION
493              
494             Marc Lehmann
495             http://software.schmorp.de/pkg/AnyEvent-Fork-Remote
496              
497             =cut
498              
499             1
500