File Coverage

blib/lib/AnyEvent/Fork/Remote.pm
Criterion Covered Total %
statement 64 88 72.7
branch 16 34 47.0
condition 4 12 33.3
subroutine 13 21 61.9
pod 11 13 84.6
total 108 168 64.2


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::Fork::Remote - remote processes with AnyEvent::Fork interface
4              
5             THE API IS NOT FINISHED, CONSIDER THIS A BETA RELEASE
6              
7             =head1 SYNOPSIS
8              
9             use AnyEvent;
10             use AnyEvent::Fork::Remote;
11              
12             my $rpc = AnyEvent::Fork::Remote
13             ->new_execp ("ssh", "ssh", "othermachine", "perl")
14             ->require ("MyModule")
15             ->run ("MyModule::run", my $cv = AE::cv);
16              
17             my $fh = $cv->recv;
18              
19             =head1 DESCRIPTION
20              
21             Despite what the name of this module might suggest, it doesn't actually
22             create remote processes for you. But it does make it easy to use them,
23             once you have started them.
24              
25             This module implements a very similar API as L<AnyEvent::Fork>. In fact,
26             similar enough to require at most minor modifications to support both
27             at the same time. For example, it works with L<AnyEvent::Fork::RPC> and
28             L<AnyEvent::Fork::Pool>.
29              
30             The documentation for this module will therefore only document the parts
31             of the API that differ between the two modules.
32              
33             =head2 SUMMARY OF DIFFERENCES
34              
35             Here is a short summary of the main differences between L<AnyEvent::Fork>
36             and this module:
37              
38             =over 4
39              
40             =item * C<send_fh> is not implemented and will fail
41              
42             =item * the child-side C<run> function must read from STDIN and write to STDOUT
43              
44             =item * C<fork> does not actually fork, but will create a new process
45              
46             =back
47              
48             =head1 EXAMPLE
49              
50             This example uses a local perl (because that is likely going to work
51             without further setup) and the L<AnyEvent::Fork::RPC> module to create a simple
52             worker process.
53              
54             First load the modules we are going to use:
55              
56             use AnyEvent;
57             use AnyEvent::Fork::Remote;
58             use AnyEvent::Fork::RPC;
59              
60             Then create, configure and run the process:
61              
62             my $rpc = AnyEvent::Fork::Remote
63             ->new_execp ("perl", "perl")
64             ->eval ('
65             sub myrun {
66             "this is process $$, and you passed <@_>"
67             }
68             ')
69             ->AnyEvent::Fork::RPC::run ("myrun");
70              
71             We use C<new_execp> to execute the first F<perl> found in the PATH. You'll
72             have to make sure there is one for this to work. The perl does not
73             actually have to be the same perl as the one running the example, and it
74             doesn't need to have any modules installed.
75              
76             The reason we have to specify C<perl> twice is that the first argument to
77             C<new_execp> (and also C<new_exec>) is the program name or path, while
78             the remaining ones are the arguments, and the first argument passed to a
79             program is the program name, so it has to be specified twice.
80              
81             Finally, the standard example, send some numbers to the remote function,
82             and print whatever it returns:
83              
84             my $cv = AE::cv;
85              
86             for (1..10) {
87             $cv->begin;
88             $rpc->($_, sub {
89             print "remote function returned: $_[0]\n";
90             $cv->end;
91             });
92             }
93              
94             $cv->recv;
95              
96             Now, executing F<perl> in the PATH isn't very interesting - you could have
97             done the same with L<AnyEvent::Fork>, and it might even be more efficient.
98              
99             The power of this module is that the F<perl> doesn't need to run on the
100             local box, you could simply substitute another command, such as F<ssh
101             remotebox perl>:
102              
103             my $rpc = AnyEvent::Fork::Remote
104             ->new_execp ("ssh", "ssh", "remotebox", "perl")
105              
106             And if you want to use a specific path for ssh, use C<new_exec>:
107              
108             my $rpc = AnyEvent::Fork::Remote
109             ->new_exec ("/usr/bin/ssh", "ssh", "remotebox", "perl")
110              
111             Of course, it doesn't really matter to this module how you construct your
112             perl processes, what matters is that somehow, you give it a file handle
113             connected to the new perls STDIN and STDOUT.
114              
115             =head1 PARENT PROCESS USAGE
116              
117             =over 4
118              
119             =cut
120              
121             package AnyEvent::Fork::Remote;
122              
123 2     2   35369 use common::sense;
  2         22  
  2         13  
124              
125 2     2   130 use Carp ();
  2         3  
  2         40  
126 2     2   1640 use Errno ();
  2         1880  
  2         50  
127              
128 2     2   2289 use AnyEvent ();
  2         5164  
  2         3732  
129              
130             our $VERSION = 0.2;
131              
132             # xored together must start and end with \n
133             my $magic0 = "Pdk{6y[_zZ";
134             my $magic1 = "Z^yZ7~i=oP";
135              
136             =item my $proc = new_exec AnyEvent::Fork::Remote $path, @args...
137              
138             Creates a new C object. Unlike L,
139             processes are only created when C is called, every other method call
140             is simply recorded until then.
141              
142             Each time a new process is needed, it executes C<$path> with the given
143             arguments (the first array member must be the program name, as with
144             the C function with explicit PROGRAM argument) and both C
145             and C connected to a communications socket. No input must be
146             consumed by the command before F is started, and no output should be
147             generated.
148              
149             The program I invoke F somehow, with STDIN and STDOUT intact,
150             without specifying anything to execute (no script file name, no C<-e>
151             switch etc.).
152              
153             Here are some examples to give you an idea:
154              
155             # just "perl"
156             $proc = new_exec AnyEvent::Fork::Remote
157             "/usr/bin/perl", "perl";
158              
159             # rsh othernode exec perl
160             $proc = new_exec AnyEvent::Fork::Remote
161             "/usr/bin/rsh", "rsh", "othernode", "exec perl";
162              
163             # a complicated ssh command
164             $proc = new_exec AnyEvent::Fork::Remote
165             "/usr/bin/ssh",
166             qw(ssh -q
167             -oCheckHostIP=no -oTCPKeepAlive=yes -oStrictHostKeyChecking=no
168             -oGlobalKnownHostsFile=/dev/null -oUserKnownHostsFile=/dev/null
169             otherhost
170             exec perl);
171              
172             =item my $proc = new_execp AnyEvent::Fork::Remote $file, @args...
173              
174             Just like C, except that the program is searched in the
175             C<$ENV{PATH}> first, similarly to how the shell does it. This makes it easier
176             to find e.g. C:
177              
178             $proc = new_execp AnyEvent::Fork::Remote "ssh", "ssh", "otherhost", "perl";
179              
180             =item my $proc = new AnyEvent::Fork::Remote $create_callback
181              
182             Basically the same as C, but instead of a command to execute,
183             it expects a callback which is invoked each time a process needs to be
184             created.
185              
186             The C<$create_callback> is called with another callback as argument,
187             and should call this callback with the file handle that is connected
188             to a F process. This callback can be invoked even after the
189             C<$create_callback> returns.
190              
191             Example: emulate C using C.
192              
193             use AnyEvent::Util;
194             use Proc::FastSpawn;
195              
196             $proc = new AnyEvent::Fork::Remote sub {
197             my $done = shift;
198              
199             my ($a, $b) = AnyEvent::Util::portable_socketpair
200             or die;
201              
202             open my $oldin , "<&0" or die;
203             open my $oldout, ">&1" or die;
204              
205             open STDIN , "<&" . fileno $b or die;
206             open STDOUT, ">&" . fileno $b or die;
207              
208             spawn "/usr/bin/rsh", ["rsh", "othernode", "perl"];
209              
210             open STDIN , "<&" . fileno $oldin ;
211             open STDOUT, ">&" . fileno $oldout;
212              
213             $done->($a);
214             };
215              
216             =item my $proc = new_from_fh $fh
217              
218             Creates an C object from a file handle. This file
219             handle must be connected to both STDIN and STDOUT of a F process.
220              
221             This form might be more convenient than C or C when
222             creating an C object, but the resulting object
223             does not support C.
224              
225             =cut
226              
227             sub new {
228 1     1 1 2 my ($class, $create) = @_;
229              
230 1         9 bless [
231             $create,
232             "",
233             [],
234             ], $class
235             }
236              
237             sub new_from_fh {
238 0     0 1 0 my ($class, @fh) = @_;
239              
240             $class->new (sub {
241             shift @fh
242 0 0   0   0 or Carp::croak "AnyEvent::Fork::Remote::new_from_fh does not support fork";
243 0         0 });
244             }
245              
246             sub _new_exec {
247 1     1   3 my $p = pop;
248              
249 1         4 my ($class, $program, @argv) = @_;
250              
251 1         9 require AnyEvent::Util;
252 1         6 require Proc::FastSpawn;
253              
254             $class->new (sub {
255 1     1   3 my $done = shift;
256              
257 1 50       5 my ($a, $b) = AnyEvent::Util::portable_socketpair ()
258             or die;
259              
260 1 50       89 open my $oldin , "<&0" or die;
261 1 50       18 open my $oldout, ">&1" or die;
262              
263 1 50       23 open STDIN , "<&" . fileno $b or die;
264 1 50       23 open STDOUT, ">&" . fileno $b or die;
265              
266 1 50       1709 $p ? Proc::FastSpawn::spawnp ($program, \@argv)
267             : Proc::FastSpawn::spawn ($program, \@argv);
268              
269 1         27 open STDIN , "<&" . fileno $oldin ;
270 1         21 open STDOUT, ">&" . fileno $oldout;
271              
272 1         5 $done->($a);
273             })
274 1         13 }
275              
276             sub new_exec {
277 1     1 1 53 push @_, 0;
278 1         3 &_new_exec
279             }
280              
281             sub new_execp {
282 0     0 1 0 push @_, 1;
283 0         0 &_new_exec
284             }
285              
286             =item $new_proc = $proc->fork
287              
288             Quite the same as the same method of L, except that it
289             simply clones the object without creating an actual process.
290              
291             =cut
292              
293             sub fork {
294 0     0 1 0 my $self = shift;
295              
296 0         0 bless [
297             $self->[0],
298             $self->[1],
299 0         0 [@{ $self->[2] }],
300             ], ref $self
301             }
302              
303             =item undef = $proc->pid
304              
305             The C method always returns C and only exists for
306             compatibility with L.
307              
308             =cut
309              
310             sub pid {
311             undef
312 0     0 1 0 }
313              
314             =item $proc = $proc->send_fh (...)
315              
316             Not supported and always croaks.
317              
318             =cut
319              
320             sub send_fh {
321 0     0 1 0 Carp::croak "send_fh is not supported on AnyEvent::Fork::Remote objects";
322             }
323              
324             =item $proc = $proc->eval ($perlcode, @args)
325              
326             Quite the same as the same method of L.
327              
328             =cut
329              
330             # quote a binary string as a perl scalar
331             sub sq($) {
332 3     3 0 7 my $s = shift;
333              
334 3 50       27 $s =~ /'/
335             or return "'$s'";
336              
337 0         0 $s =~ s/(\x10+)/\x10.'$1'.q\x10/g;
338 0         0 "q\x10$s\x10"
339             }
340              
341             # quote a list of strings
342             sub aq(@) {
343 2     2 0 17 "(" . (join ",", map sq $_, @_) . ")"
344             }
345              
346             sub eval {
347 1     1 1 12 my ($self, $perlcode, @args) = @_;
348              
349 1         2 my $linecode = $perlcode;
350 1         12 $linecode =~ s/\s+/ /g; # takes care of \n
351 1         6 $linecode =~ s/"/''/g;
352 1 50       5 substr $linecode, 70, length $linecode, "..." if length $linecode > 70;
353              
354 1         9 $self->[1] .= '{ local @_ = ' . (aq @args) . ";\n#line 1 \"'$linecode'\"\n$perlcode;\n}\n";
355              
356 1         141 $self
357             }
358              
359             =item $proc = $proc->require ($module, ...)
360              
361             Quite the same as the same method of L.
362              
363             =cut
364              
365             sub require {
366 0     0 1 0 my ($self, @modules) = @_;
367              
368             $self->eval ("require $_")
369 0         0 for @modules;
370              
371 0         0 $self
372             }
373              
374             =item $proc = $proc->send_arg ($string, ...)
375              
376             Quite the same as the same method of L.
377              
378             =cut
379              
380             sub send_arg {
381 0     0 1 0 my ($self, @arg) = @_;
382              
383 0         0 push @{ $self->[2] }, @arg;
  0         0  
384              
385 0         0 $self
386             }
387              
388             =item $proc->run ($func, $cb->($fh))
389              
390             Very similar to the run method of L.
391              
392             On the parent side, the API is identical, except that a C<$cb> argument of
393             C instead of a valid file handle signals an error.
394              
395             On the child side, the "communications socket" is in fact just C<*STDIN>,
396             and typically can only be read from (this highly depends on how the
397             program is created - if you just run F locally, it will work for
398             both reading and writing, but commands such as F or F typically
399             only provide read-only handles for STDIN).
400              
401             To be portable, if the run function wants to read data that is written to
402             C<$fh> in the parent, then it should read from STDIN. If the run function
403             wants to provide data that can later be read from C<$fh>, then it should
404             write them to STDOUT.
405              
406             You can write a run function that works with both L
407             and this module by checking C. If it is C<0> (meaning
408             it is STDIN), then you should use it for reading, and STDOUT for
409             writing. Otherwise, you should use the file handle for both:
410              
411             sub run {
412             my ($rfh, ...) = @_;
413             my $wfh = fileno $rfh ? $rfh : *STDOUT;
414              
415             # now use $rfh for reading and $wfh for writing
416             }
417              
418             =cut
419              
420             sub run {
421 1     1 1 5837 my ($self, $func, $cb) = @_;
422              
423             $self->[0](sub {
424 1 50   1   6 my $fh = shift
425             or die "AnyEvent::Fork::Remote: create callback failed";
426              
427 1 50       17 my $owner = length $ENV{HOSTNAME} ? "$ENV{HOSTNAME}:$$" : "*:$$";
428              
429 1         6 my $code = 'BEGIN { $0 = ' . (sq "$func of $owner") . '; ' . $self->[1] . "}\n"
430             . 'syswrite STDOUT, ' . (sq $magic0) . '^' . (sq $magic1) . ';'
431             . '{ sysread STDIN, my $dummy, 1 }'
432 1         7 . "\n$func*STDIN," . (aq @{ $self->[2] }) . ';'
433             . "\n__END__\n";
434              
435 1         7 AnyEvent::Util::fh_nonblocking $fh, 1;
436              
437 1         28 my ($rw, $ww);
438              
439 0         0 my $ofs;
440              
441             $ww = AE::io $fh, 1, sub {
442 1         155 my $len = syswrite $fh, $code, 1<<20, $ofs;
443              
444 1 50 33     8 if ($len || $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK) {
      33        
445 1         3 $ofs += $len;
446 1 50       36 undef $ww if $ofs >= length $code;
447             } else {
448             # error
449 0         0 ($ww, $rw) = (); $cb->(undef);
  0         0  
450             }
451 1         23 };
452              
453 1         2 my $rbuf;
454              
455             $rw = AE::io $fh, 0, sub {
456 1         3341 my $len = sysread $fh, $rbuf, 1<<10;
457              
458 1 50 33     9 if ($len || $! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK) {
      33        
459 1 50       5 $rbuf = substr $rbuf, -length $magic0 if length $rbuf > length $magic0;
460              
461 1 50       7 if ($rbuf eq ($magic0 ^ $magic1)) {
462             # all data was sent, magic was received - both
463             # directions should be "empty", and therefore
464             # the socket must accept at least a single octet,
465             # to signal the "child" to go on.
466 1         2 undef $rw;
467 1 50       4 die if $ww; # uh-oh
468              
469 1         8 syswrite $fh, "\n";
470 1         8 $cb->($fh);
471             }
472             } else {
473             # error
474 0           ($ww, $rw) = (); $cb->(undef);
  0            
475             }
476 1         24 };
477 1         12 });
478             }
479              
480             =back
481              
482             =head1 SEE ALSO
483              
484             L<AnyEvent::Fork>, the same as this module, for local processes.
485              
486             L<AnyEvent::Fork::RPC>, to talk to the created processes.
487              
488             L<AnyEvent::Fork::Pool>, to manage whole pools of processes.
489              
490             =head1 AUTHOR AND CONTACT INFORMATION
491              
492             Marc Lehmann
493             http://software.schmorp.de/pkg/AnyEvent-Fork-Remote
494              
495             =cut
496              
497             1
498