File Coverage

blib/lib/AnyEvent/Fork/RPC.pm
Criterion Covered Total %
statement 68 82 82.9
branch 24 44 54.5
condition 11 38 28.9
subroutine 10 13 76.9
pod 1 1 100.0
total 114 178 64.0


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork
4              
5             =head1 SYNOPSIS
6              
7             use AnyEvent::Fork;
8             use AnyEvent::Fork::RPC;
9              
10             my $rpc = AnyEvent::Fork
11             ->new
12             ->require ("MyModule")
13             ->AnyEvent::Fork::RPC::run (
14             "MyModule::server",
15             );
16              
17             use AnyEvent;
18              
19             my $cv = AE::cv;
20              
21             $rpc->(1, 2, 3, sub {
22             print "MyModule::server returned @_\n";
23             $cv->send;
24             });
25              
26             $cv->recv;
27              
28             =head1 DESCRIPTION
29              
30             This module implements a simple RPC protocol and backend for processes
31             created via L<AnyEvent::Fork> or L<AnyEvent::Fork::Remote>, allowing you
32             to call a function in the child process and receive its return values (up
33             to 4GB serialised).
34              
35             It implements two different backends: a synchronous one that works like a
36             normal function call, and an asynchronous one that can run multiple jobs
37             concurrently in the child, using AnyEvent.
38              
39             It also implements an asynchronous event mechanism from the child to the
40             parent, that could be used for progress indications or other information.
41              
42             =head1 EXAMPLES
43              
44             =head2 Example 1: Synchronous Backend
45              
46             Here is a simple example that implements a backend that executes C<unlink>
47             and C<rmdir> calls, and reports their status back. It also reports the
48             number of requests it has processed every three requests, which is clearly
49             silly, but illustrates the use of events.
50              
51             First the parent process:
52              
53             use AnyEvent;
54             use AnyEvent::Fork;
55             use AnyEvent::Fork::RPC;
56              
57             my $done = AE::cv;
58              
59             my $rpc = AnyEvent::Fork
60             ->new
61             ->require ("MyWorker")
62             ->AnyEvent::Fork::RPC::run ("MyWorker::run",
63             on_error => sub { warn "ERROR: $_[0]"; exit 1 },
64             on_event => sub { warn "$_[0] requests handled\n" },
65             on_destroy => $done,
66             );
67              
68             for my $id (1..6) {
69             $rpc->(rmdir => "/tmp/somepath/$id", sub {
70             $_[0]
71             or warn "/tmp/somepath/$id: $_[1]\n";
72             });
73             }
74              
75             undef $rpc;
76              
77             $done->recv;
78              
79             The parent creates the process, queues a few rmdir's. It then forgets
80             about the C<$rpc> object, so that the child exits after it has handled the
81             requests, and then it waits till the requests have been handled.
82              
83             The child is implemented using a separate module, C<MyWorker>, shown here:
84              
85             package MyWorker;
86              
87             my $count;
88              
89             sub run {
90             my ($cmd, $path) = @_;
91              
92             AnyEvent::Fork::RPC::event ($count)
93             unless ++$count % 3;
94              
95             my $status = $cmd eq "rmdir" ? rmdir $path
96             : $cmd eq "unlink" ? unlink $path
97             : die "fatal error, illegal command '$cmd'";
98              
99             $status or (0, "$!")
100             }
101              
102             1
103              
104             The C<run> function first sends a "progress" event every three calls, and
105             then executes C<rmdir> or C<unlink>, depending on the first parameter (or
106             dies with a fatal error - obviously, you must never let this happen :).
107              
108             Eventually it returns the status value true if the command was successful,
109             or the status value 0 and the stringified error message.
110              
111             On my system, running the first code fragment with the given
112             F<MyWorker.pm> in the current directory yields:
113              
114             /tmp/somepath/1: No such file or directory
115             /tmp/somepath/2: No such file or directory
116             3 requests handled
117             /tmp/somepath/3: No such file or directory
118             /tmp/somepath/4: No such file or directory
119             /tmp/somepath/5: No such file or directory
120             6 requests handled
121             /tmp/somepath/6: No such file or directory
122              
123             Obviously, none of the directories I am trying to delete even exist. Also,
124             the events and responses are processed in exactly the same order as
125             they were created in the child, which is true for both synchronous and
126             asynchronous backends.
127              
128             Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
129             not optional. That is because the function isn't defined when the code is
130             compiled. You can make sure it is visible by pre-loading the correct
131             backend module in the call to C<require>:
132              
133             ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")
134              
135             Since the backend module declares the C<event> function, loading it first
136             ensures that perl will correctly interpret calls to it.
137              
138             And as a final remark, there is a fine module on CPAN that can
139             asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
140             than this example, namely L<IO::AIO>.
141              
142             =head3 Example 1a: the same with the asynchronous backend
143              
144             This example only shows what needs to be changed to use the async backend
145             instead. Doing this is not very useful, the purpose of this example is
146             to show the minimum amount of change that is required to go from the
147             synchronous to the asynchronous backend.
148              
149             To use the async backend in the previous example, you need to add the
150             C<async> parameter to the C<AnyEvent::Fork::RPC::run> call:
151              
152             ->AnyEvent::Fork::RPC::run ("MyWorker::run",
153             async => 1,
154             ...
155              
156             And since the function call protocol is now changed, you need to adapt
157             C<MyWorker::run> to the async API.
158              
159             First, you need to accept the extra initial C<$done> callback:
160              
161             sub run {
162             my ($done, $cmd, $path) = @_;
163              
164             And since a response is now generated when C<$done> is called, as opposed
165             to when the function returns, we need to call the C<$done> function with
166             the status:
167              
168             $done->($status or (0, "$!"));
169              
170             A few remarks are in order. First, it's quite pointless to use the async
171             backend for this example - but it I<is> possible. Second, you can call
172             C<$done> before or after returning from the function. Third, having both
173             returned from the function and having called the C<$done> callback, the
174             child process may exit at any time, so you should call C<$done> only when
175             you really I<are> done.
176              
177             =head2 Example 2: Asynchronous Backend
178              
179             This example implements multiple count-downs in the child, using
180             L<AnyEvent> timers. While this is a bit silly (one could use timers in the
181             parent just as well), it illustrates the ability to use AnyEvent in the
182             child and the fact that responses can arrive in a different order than the
183             requests.
184              
185             It also shows how to embed the actual child code into a C<__DATA__>
186             section, so it doesn't need any external files at all.
187              
188             And when your parent process is often busy, and you have stricter timing
189             requirements, then running timers in a child process suddenly doesn't look
190             so silly anymore.
191              
192             Without further ado, here is the code:
193              
194             use AnyEvent;
195             use AnyEvent::Fork;
196             use AnyEvent::Fork::RPC;
197              
198             my $done = AE::cv;
199              
200             my $rpc = AnyEvent::Fork
201             ->new
202             ->require ("AnyEvent::Fork::RPC::Async")
203             ->eval (do { local $/; <DATA> })
204             ->AnyEvent::Fork::RPC::run ("run",
205             async => 1,
206             on_error => sub { warn "ERROR: $_[0]"; exit 1 },
207             on_event => sub { print $_[0] },
208             on_destroy => $done,
209             );
210              
211             for my $count (3, 2, 1) {
212             $rpc->($count, sub {
213             warn "job $count finished\n";
214             });
215             }
216              
217             undef $rpc;
218              
219             $done->recv;
220              
221             __DATA__
222              
223             # this ends up in main, as we don't use a package declaration
224              
225             use AnyEvent;
226              
227             sub run {
228             my ($done, $count) = @_;
229              
230             my $n;
231              
232             AnyEvent::Fork::RPC::event "starting to count up to $count\n";
233              
234             my $w; $w = AE::timer 1, 1, sub {
235             ++$n;
236              
237             AnyEvent::Fork::RPC::event "count $n of $count\n";
238              
239             if ($n == $count) {
240             undef $w;
241             $done->();
242             }
243             };
244             }
245              
246             The parent part (the one before the C<__DATA__> section) isn't very
247             different from the earlier examples. It sets async mode, preloads
248             the backend module (so the C<AnyEvent::Fork::RPC::event> function is
249             declared), uses a slightly different C<on_event> handler (which we use
250             simply for logging purposes) and then, instead of loading a module with
251             the actual worker code, it C<eval>'s the code from the data section in the
252             child process.
253              
254             It then starts three countdowns, from 3 to 1 seconds downwards, destroys
255             the rpc object so the example finishes eventually, and then just waits for
256             the stuff to trickle in.
257              
258             The worker code uses the event function to log some progress messages, but
259             mostly just creates a recurring one-second timer.
260              
261             The timer callback increments a counter, logs a message, and eventually,
262             when the count has been reached, calls the finish callback.
263              
264             On my system, this results in the following output. Since all timers fire
265             at roughly the same time, the actual order isn't guaranteed, but the order
266             shown is very likely what you would get, too.
267              
268             starting to count up to 3
269             starting to count up to 2
270             starting to count up to 1
271             count 1 of 3
272             count 1 of 2
273             count 1 of 1
274             job 1 finished
275             count 2 of 2
276             job 2 finished
277             count 2 of 3
278             count 3 of 3
279             job 3 finished
280              
281             While the overall ordering isn't guaranteed, the async backend still
282             guarantees that events and responses are delivered to the parent process
283             in the exact same ordering as they were generated in the child process.
284              
285             And unless your system is I<very> busy, it should clearly show that the
286             job started last will finish first, as it has the lowest count.
287              
288             This concludes the async example. Since L<AnyEvent::Fork> does not
289             actually fork, you are free to use about any module in the child, not just
290             L<AnyEvent>, but also L<IO::AIO>, or L<Tk> for example.
291              
292             =head2 Example 3: Asynchronous backend with Coro
293              
294             With L<Coro> you can create a nice asynchronous backend implementation by
295             defining an rpc server function that creates a new Coro thread for every
296             request that calls a function "normally", i.e. the parameters from the
297             parent process are passed to it, and any return values are returned to the
298             parent process, e.g.:
299              
300             package My::Arith;
301              
302             sub add {
303             return $_[0] + $_[1];
304             }
305              
306             sub mul {
307             return $_[0] * $_[1];
308             }
309              
310             sub run {
311             my ($done, $func, @arg) = @_;
312              
313             Coro::async_pool {
314             $done->($func->(@arg));
315             };
316             }
317              
318             The C<run> function creates a new thread for every invocation, using the
319             first argument as function name, and calls the C<$done> callback on its
320             return values. This makes it quite natural to define the C<add> and C<mul>
321             functions to add or multiply two numbers and return the result.
322              
323             Since this is the asynchronous backend, it's quite possible to define RPC
324             functions that do I/O or wait for external events - their execution will
325             overlap as needed.
326              
327             The above could be used like this:
328              
329             my $rpc = AnyEvent::Fork
330             ->new
331             ->require ("MyWorker")
332             ->AnyEvent::Fork::RPC::run ("My::Arith::run",
333             on_error => ..., on_event => ..., on_destroy => ...,
334             );
335              
336             $rpc->(add => 1, 3, Coro::rouse_cb); say Coro::rouse_wait;
337             $rpc->(mul => 3, 2, Coro::rouse_cb); say Coro::rouse_wait;
338              
339             The C<say>'s will print C<4> and C<6>.
340              
341             =head2 Example 4: Forward AnyEvent::Log messages using C<on_event>
342              
343             This partial example shows how to use the C<event> function to forward
344             L<AnyEvent::Log> messages to the parent.
345              
346             For this, the parent needs to provide a suitable C<on_event>:
347              
348             ->AnyEvent::Fork::RPC::run (
349             on_event => sub {
350             if ($_[0] eq "ae_log") {
351             my (undef, $level, $message) = @_;
352             AE::log $level, $message;
353             } else {
354             # other event types
355             }
356             },
357             )
358              
359             In the child, as early as possible, the following code should reconfigure
360             L<AnyEvent::Log> to log via C<AnyEvent::Fork::RPC::event>:
361              
362             $AnyEvent::Log::LOG->log_cb (sub {
363             my ($timestamp, $orig_ctx, $level, $message) = @{+shift};
364              
365             if (defined &AnyEvent::Fork::RPC::event) {
366             AnyEvent::Fork::RPC::event (ae_log => $level, $message);
367             } else {
368             warn "[$$ before init] $message\n";
369             }
370             });
371              
372             There is an important twist - the C<AnyEvent::Fork::RPC::event> function
373             is only defined when the child is fully initialised. If you redirect the
374             log messages in your C<init> function for example, then the C<event>
375             function might not yet be available. This is why the log callback checks
376             whether the function is there using C<defined>, and only then uses it to
377             log the message.
378              
379             =head1 PARENT PROCESS USAGE
380              
381             This module exports nothing, and only implements a single function:
382              
383             =over 4
384              
385             =cut
386              
387             package AnyEvent::Fork::RPC;
388              
389 4     4   56259 use common::sense;
  4         56  
  4         23  
390              
391 4     4   1081 use Errno ();
  4         2817  
  4         72  
392 4     4   890 use Guard ();
  4         917  
  4         75  
393              
394 4     4   942 use AnyEvent;
  4         5254  
  4         5781  
395              
396             our $VERSION = 1.24;
397              
398             =item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
399              
400             The traditional way to call it. But it is way cooler to call it in the
401             following way:
402              
403             =item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])
404              
405             This C<run> function/method can be used in place of the
406             L<AnyEvent::Fork::run> method. Just like that method, it takes over
407             the L<AnyEvent::Fork> process, but instead of calling the specified
408             C<$function> directly, it runs a server that accepts RPC calls and handles
409             responses.
410              
411             It returns a function reference that can be used to call the function in
412             the child process, handling serialisation and data transfers.
413              
414             The following key/value pairs are allowed. It is recommended to have at
415             least an C<on_error> or C<on_event> handler set.
416              
417             =over 4
418              
419             =item on_error => $cb->($msg)
420              
421             Called on (fatal) errors, with a descriptive (hopefully) message. If
422             this callback is not provided, but C<on_event> is, then the C<on_event>
423             callback is called with the first argument being the string C<error>,
424             followed by the error message.
425              
426             If neither handler is provided, then the error is reported with loglevel
427             C<die> via C<AE::log>.
428              
429             =item on_event => $cb->(...)
430              
431             Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
432             child, with the arguments of that function passed to the callback.
433              
434             Also called on errors when no C<on_error> handler is provided.
435              
436             =item on_destroy => $cb->()
437              
438             Called when the C<$rpc> object has been destroyed and all requests have
439             been successfully handled. This is useful when you queue some requests and
440             want the child to go away after it has handled them. The problem is that
441             the parent must not exit either until all requests have been handled, and
442             this can be accomplished by waiting for this callback.
443              
444             =item init => $function (default none)
445              
446             When specified (by name), this function is called in the child as the very
447             first thing when taking over the process, with all the arguments normally
448             passed to the C function, except the communications
449             socket.
450              
451             It can be used to do one-time things in the child such as storing passed
452             parameters or opening database connections.
453              
454             It is called very early - before the serialisers are created or the
455             C<$function> name is resolved into a function reference, so it could be
456             used to load any modules that provide the serialiser or function. It can
457             not, however, create events.
458              
459             =item done => $function (default C)
460              
461             The function to call when the asynchronous backend detects an end of file
462             condition when reading from the communications socket I<and> there are no
463             outstanding requests. It's ignored by the synchronous backend.
464              
465             By overriding this you can prolong the life of a RPC process after e.g.
466             the parent has exited by running the event loop in the provided function
467             (or simply calling it, for example, when your child process uses L<EV> you
468             could provide L<EV::run> as C<done> function).
469              
470             Of course, in that case you are responsible for exiting at the appropriate
471             time and not returning from the provided function.
472              
473             =item async => $boolean (default: 0)
474              
475             The default server used in the child does all I/O blockingly, and only
476             allows a single RPC call to execute concurrently.
477              
478             Setting C<async> to a true value switches to another implementation that
479             uses L<AnyEvent> in the child and allows multiple concurrent RPC calls (it
480             does not support recursion in the event loop however, blocking condvar
481             calls will fail).
482              
483             The actual API in the child is documented in the section that describes
484             the calling semantics of the returned C<$rpc> function.
485              
486             If you want to pre-load the actual back-end modules to enable memory
487             sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
488             synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
489              
490             If you use a template process and want to fork both sync and async
491             children, then it is permissible to load both modules.
492              
493             =item serialiser => $string (default: $AnyEvent::Fork::RPC::STRING_SERIALISER)
494              
495             All arguments, result data and event data have to be serialised to be
496             transferred between the processes. For this, they have to be frozen and
497             thawed in both parent and child processes.
498              
499             By default, only octet strings can be passed between the processes,
500             which is reasonably fast and efficient and requires no extra modules
501             (the C<AnyEvent::Fork::RPC> distribution does not provide these extra
502             serialiser modules).
503              
504             For more complicated use cases, you can provide your own freeze and thaw
505             functions, by specifying a string with perl source code. It's supposed to
506             return two code references when evaluated: the first receives a list of
507             perl values and must return an octet string. The second receives the octet
508             string and must return the original list of values.
509              
510             If you need an external module for serialisation, then you can either
511             pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
512             or C<require> statement into the serialiser string. Or both.
513              
514             Here are some examples - all of them are also available as global
515             variables that make them easier to use.
516              
517             =over 4
518              
519             =item C<$AnyEvent::Fork::RPC::STRING_SERIALISER> - octet strings only
520              
521             This serialiser (currently the default) concatenates length-prefixed octet
522             strings. That means you can only pass (and return)
523             strings containing character codes 0-255.
524              
525             The main advantages of this serialiser are the high speed and that it
526             doesn't need another module. The main disadvantage is that you are very
527             limited in what you can pass - only octet strings.
528              
529             Implementation:
530              
531             (
532             sub { pack "(w/a*)*", @_ },
533             sub { unpack "(w/a*)*", shift }
534             )
535              
536             =item C<$AnyEvent::Fork::RPC::CBOR_XS_SERIALISER> - uses L<CBOR::XS>
537              
538             This serialiser creates CBOR::XS arrays - you have to make sure the
539             L<CBOR::XS> module is installed for this serialiser to work. It can be
540             beneficial for sharing when you preload the L<CBOR::XS> module in a template
541             process.
542              
543             L<CBOR::XS> is about as fast as the octet string serialiser, but supports
544             complex data structures (similar to JSON) and is faster than any of the
545             other serialisers. If you have the L<CBOR::XS> module available, it's the
546             best choice.
547              
548             The encoder enables C<allow_sharing> (so this serialisation method can
549             encode cyclic and self-referencing data structures).
550              
551             Implementation:
552              
553             use CBOR::XS ();
554             (
555             sub { CBOR::XS::encode_cbor_sharing \@_ },
556             sub { @{ CBOR::XS::decode_cbor shift } }
557             )
558              
559             =item C<$AnyEvent::Fork::RPC::JSON_SERIALISER> - uses L<JSON::XS> or L<JSON>
560              
561             This serialiser creates JSON arrays - you have to make sure the L<JSON>
562             module is installed for this serialiser to work. It can be beneficial for
563             sharing when you preload the L<JSON> module in a template process.
564              
565             L<JSON> (with L<JSON::XS> installed) is slower than the octet string
566             serialiser, but usually much faster than L<Storable>, unless big chunks of
567             binary data need to be transferred.
568              
569             Implementation:
570              
571             use JSON ();
572             (
573             sub { JSON::encode_json \@_ },
574             sub { @{ JSON::decode_json shift } }
575             )
576              
577             =item C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER> - L<Storable>
578              
579             This serialiser uses L<Storable>, which means it has high chance of
580             serialising just about anything you throw at it, at the cost of having
581             very high overhead per operation. It also comes with perl. It should be
582             used when you need to serialise complex data structures.
583              
584             Implementation:
585              
586             use Storable ();
587             (
588             sub { Storable::freeze \@_ },
589             sub { @{ Storable::thaw shift } }
590             )
591              
592             =item C<$AnyEvent::Fork::RPC::NSTORABLE_SERIALISER> - portable Storable
593              
594             This serialiser also uses L<Storable>, but uses its "network" format
595             to serialise data, which makes it possible to talk to different
596             perl binaries (for example, when talking to a process created with
597             L<AnyEvent::Fork::Remote>).
598              
599             Implementation:
600              
601             use Storable ();
602             (
603             sub { Storable::nfreeze \@_ },
604             sub { @{ Storable::thaw shift } }
605             )
606              
607             =back
608              
609             =back
610              
611             See the examples section earlier in this document for some actual
612             examples.
613              
614             =cut
615              
616             our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })'; # length-prefixed octet strings; needs no extra module
617             our $CBOR_XS_SERIALISER = 'use CBOR::XS (); (sub { CBOR::XS::encode_cbor_sharing \@_ }, sub { @{ CBOR::XS::decode_cbor shift } })'; # CBOR::XS, encoding with sharing
618             our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })'; # argument list as a JSON array
619             our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })'; # Storable::freeze / thaw
620             our $NSTORABLE_SERIALISER = 'use Storable (); (sub { Storable::nfreeze \@_ }, sub { @{ Storable::thaw shift } })'; # Storable::nfreeze (network format) / thaw
621              
622             sub run { # starts the RPC backend in the child via $self and returns the closure used to issue requests
623 3     3 1 16103 my ($self, $function, %arg) = @_;
624              
625 3   33     25 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER; # octet-string serialiser unless one is given
626 3         8 my $on_event = delete $arg{on_event};
627 3         8 my $on_error = delete $arg{on_error};
628 3         7 my $on_destroy = delete $arg{on_destroy};
629            
630             # without on_error, errors go to on_event as an "error" event if that is set, else to AE::log
631             $on_error ||= $on_event
632 0     0   0 ? sub { $on_event->(error => shift) }
633 3 0 33 0   11 : sub { AE::log die => "AnyEvent::Fork::RPC: uncaught error: $_[0]." };
  0         0  
634              
635             # without on_event, any event from the child is reported through on_error
636 3   50 0   10 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
  0         0  
637              
638 3 50       358 my ($f, $t) = eval $serialiser; die $@ if $@; # $f is used to serialise requests, $t to decode replies; a broken serialiser string is fatal
  3         19  
639              
640 3         9 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww); # @rcb: callbacks queued in order (sync), %rcb: callbacks keyed by request id (async)
641 3         9 my ($rlen, $rbuf, $rw) = 512 - 16; # only $rlen is initialised (496); $rbuf and $rw start out undefined
642              
643             my $wcb = sub { # write watcher callback: pushes as much of $wbuf as the socket accepts
644 2     2   264 my $len = syswrite $fh, $wbuf;
645              
646 2 50       13 unless (defined $len) {
647 0 0 0     0 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
648 0         0 undef $rw; undef $ww; # it ends here
  0         0  
649 0         0 $on_error->("$!");
650             }
651             }
652              
653 2         6 substr $wbuf, 0, $len, ""; # drop the bytes that were written
654              
655 2 50       6 unless (length $wbuf) {
656 2         6 undef $ww;
657 2 50       39 $shutdown and shutdown $fh, 1; # shutdown was requested earlier: close the write side now that the buffer is empty
658             }
659 3         15 };
660              
661 3 100       19 my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"); # backend module is selected by the async flag
662              
663             $self->require ($module)
664             ->send_arg ($function, $arg{init}, $serialiser, $arg{done} || "$module\::do_exit")
665             ->run ("$module\::run", sub {
666 3 100   3   1595 $fh = shift
667             or return $on_error->("connection failed"); # a false handle is reported as a failed connection
668              
669 2         6 my ($id, $len);
670             $rw = AE::io $fh, 0, sub {
671 8 50       332776 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf; # grow the read size once the buffer gets within 128 bytes of it
672 8         158 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf; # append to $rbuf
673              
674 8 100 0     47 if ($len) {
    50          
    0          
675 6         32 while (8 <= length $rbuf) { # every packet starts with an 8-byte header
676 13         249 ($id, $len) = unpack "NN", $rbuf; # request id and payload length
677 13 50       74 8 + $len <= length $rbuf
678             or last; # payload not complete yet, wait for more data
679              
680 13         636 my @r = $t->(substr $rbuf, 8, $len);
681 13         111 substr $rbuf, 0, 8 + $len, "";
682              
683 13 100       48 if ($id) { # nonzero id: a response; id 0 is an event
684 5 100       22 if (@rcb) { # callbacks queued in order are served first-in first-out
    50          
685 4         12 (shift @rcb)->(@r);
686             } elsif (my $cb = delete $rcb{$id}) { # otherwise look the callback up by id
687 1         5 $cb->(@r);
688             } else {
689 0         0 undef $rw; undef $ww;
  0         0  
690 0         0 $on_error->("unexpected data from child");
691             }
692             } else {
693 8         45 $on_event->(@r);
694             }
695             }
696             } elsif (defined $len) { # sysread returned 0: end of file
697 2         5 undef $rw; undef $ww; # it ends here
  2         6  
698              
699 2 50 33     18 if (@rcb || %rcb) { # eof while callbacks are still waiting is an error
700 0         0 $on_error->("unexpected eof");
701             } else {
702 2 50       20 $on_destroy->()
703             if $on_destroy;
704             }
705             } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) { # sysread failed with something other than "try again"
706 0         0 undef $rw; undef $ww; # it ends here
  0         0  
707 0         0 $on_error->("read: $!");
708             }
709 2         20 };
710              
711 2   33     20 $ww ||= AE::io $fh, 1, $wcb; # start writing: requests queued before the handle existed are still in $wbuf
712 3   33     15 });
713              
714             my $guard = Guard::guard { # presumably runs once the returned closure (which references $guard) is destroyed - confirm against Guard docs
715 3     3   95 $shutdown = 1;
716              
717 3 50 33     26 shutdown $fh, 1 if $fh && !$ww; # no write watcher active: shut down the write side right away
718 3         225 };
719              
720 3         6 my $id;
721              
722             $arg{async}
723             ? sub {
724 2 50   2   147 $id = ($id == 0xffffffff ? 0 : $id) + 1; # next request id, wrapping after 0xffffffff and never yielding 0
725 2 0       18 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
726              
727 2         9 $rcb{$id} = pop; # the last argument is the result callback
728              
729 2         4 $guard if 0; # keep it alive
730              
731 2         59 $wbuf .= pack "NN/a*", $id, &$f; # &$f serialises the remaining @_; packet is id, length, payload
732 2   33     18 $ww ||= $fh && AE::io $fh, 1, $wcb; # no watcher is created until the handle exists
      33        
733             }
734             : sub {
735 4     4   92 push @rcb, pop; # the last argument is the result callback
736              
737 4         6 $guard; # keep it alive
738              
739 4         87 $wbuf .= pack "N/a*", &$f; # sync packets carry no id: length, payload
740 4   33     24 $ww ||= $fh && AE::io $fh, 1, $wcb; # no watcher is created until the handle exists
      33        
741             }
742 3 100       25 }
743              
744             =item $rpc->(..., $cb->(...))
745              
746             The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
747             reference. There are two things you can do with it: call it, and let it go
748             out of scope (let it get destroyed).
749              
750             If C<async> was false when C<$rpc> was created (the default), then, if you
751             call C<$rpc>, the C<$function> is invoked with all arguments passed to
752             C<$rpc> except the last one (the callback). When the function returns, the
753             callback will be invoked with all the return values.
754              
755             If C<async> was true, then the C<$function> receives an additional
756             initial argument, the result callback. In this case, returning from
757             C<$function> does nothing - the function only counts as "done" when the
758             result callback is called, and any arguments passed to it are considered
759             the return values. This makes it possible to "return" from event handlers
760             or e.g. Coro threads.
761              
762             The other thing that can be done with the RPC object is to destroy it. In
763             this case, the child process will execute all remaining RPC calls, report
764             their results, and then exit.
765              
766             See the examples section earlier in this document for some actual
767             examples.
768              
769             =back
770              
771             =head1 CHILD PROCESS USAGE
772              
773             The following functions are not available in this module. They are only
774             available in the namespace of this module when the child is running,
775             without having to load any extra modules. They are part of the child-side
776             API of L<AnyEvent::Fork::RPC>.
777              
778             =over 4
779              
780             =item AnyEvent::Fork::RPC::event ...
781              
782             Send an event to the parent. Events are a bit like RPC calls made by the
783             child process to the parent, except that there is no notion of return
784             values.
785              
786             See the examples section earlier in this document for some actual
787             examples.
788              
789             Note: the event data, like any data sent to the parent, might not be sent
790             immediately but queued for later sending, so there is no guarantee that
791             the event has been sent to the parent when the call returns - when you
792             e.g. exit directly after calling this function, the parent might never
793             receive the event.
794              
795             =back
796              
797             =head2 PROCESS EXIT
798              
799             If and when the child process exits depends on the backend and
800             configuration. Apart from explicit exits (e.g. by calling C<exit>) or
801             runtime conditions (uncaught exceptions, signals etc.), the backends exit
802             under these conditions:
803              
804             =over 4
805              
806             =item Synchronous Backend
807              
808             The synchronous backend is very simple: when the process waits for another
809             request to arrive and the writing side (usually in the parent) is closed,
810             it will exit normally, i.e. as if your main program reached the end of the
811             file.
812              
813             That means that if your parent process exits, the RPC process will usually
814             exit as well, either because it is idle anyway, or because it executes a
815             request. In the latter case, you will likely get an error when the RPC
816             process tries to send the results to the parent (because arguably, you
817             shouldn't exit your parent while there are still outstanding requests).
818              
819             The process is usually quiescent when it happens, so it should rarely be a
820             problem, and C<END> handlers can be used to clean up.
821              
822             =item Asynchronous Backend
823              
824             For the asynchronous backend, things are more complicated: Whenever it
825             listens for another request by the parent, it might detect that the socket
826             was closed (e.g. because the parent exited). It will stop listening for
827             new requests and instead try to write out any remaining data (if any) or
828             simply check whether the socket can be written to. After this, the RPC
829             process is effectively done - no new requests are incoming, no outstanding
830             request data can be written back.
831              
832             Since chances are high that there are event watchers that the RPC server
833             knows nothing about (why else would one use the async backend if not for
834             the ability to register watchers?), the event loop would often happily
835             continue.
836              
837             This is why the asynchronous backend explicitly calls C<CORE::exit> when
838             it is done (under other circumstances, such as when there is an I/O error
839             and there is outstanding data to write, it will log a fatal message via
840             L<AnyEvent::Log>, also causing the program to exit).
841              
842             You can override this by specifying a function name to call via the C<done>
843             parameter instead.
844              
845             =back
846              
847             =head1 ADVANCED TOPICS
848              
849             =head2 Choosing a backend
850              
851             So how do you decide which backend to use? Well, that's your problem to
852             solve, but here are some thoughts on the matter:
853              
854             =over 4
855              
856             =item Synchronous
857              
858             The synchronous backend does not rely on any external modules (well,
859             except L<common::sense>, which works around a bug in how perl's warning
860             system works). This keeps the process very small, for example, on my
861             system, an empty perl interpreter uses 1492kB RSS, which becomes 2020kB
862             after C<use warnings; use strict> (for people who grew up with C64s around
863             them this is probably shocking every single time they see it). The worker
864             process in the first example in this document uses 1792kB.
865              
866             Since the calls are done synchronously, slow jobs will keep newer jobs
867             from executing.
868              
869             The synchronous backend also has no overhead due to running an event loop
870             - reading requests is therefore very efficient, while writing responses is
871             less so, as every response results in a write syscall.
872              
873             If the parent process is busy and a bit slow reading responses, the child
874             waits instead of processing further requests. This also limits the amount
875             of memory needed for buffering, as never more than one response has to be
876             buffered.
877              
878             The API in the child is simple - you just have to define a function that
879             does something and returns something.
880              
881             It's hard to use modules or code that relies on an event loop, as the
882             child cannot execute anything while it waits for more input.
883              
884             =item Asynchronous
885              
886             The asynchronous backend relies on L<AnyEvent>, which tries to be small,
887             but still comes at a price: On my system, the worker from example 1a uses
888             3420kB RSS (for L, which loads L, which needs L
889             which in turn loads a lot of other modules such as L, L,
890             L, L...).
891              
892             It batches requests and responses reasonably efficiently, doing only as
893             few reads and writes as needed, but needs to poll for events via the event
894             loop.
895              
896             Responses are queued when the parent process is busy. This means the child
897             can continue to execute any queued requests. It also means that a child
898             might queue a lot of responses in memory when it generates them and the
899             parent process is slow accepting them.
900              
901             The API is not a straightforward RPC pattern - you have to call a
902             "done" callback to pass return values and signal completion. Also, more
903             importantly, the API starts jobs as fast as possible - when 1000 jobs
904             are queued and the jobs are slow, they will all run concurrently. The
905             child must implement some queueing/limiting mechanism if this causes
906             problems. Alternatively, the parent could limit the amount of rpc calls
907             that are outstanding.
908              
909             Blocking use of condvars is not supported (in the main thread, outside of
910             e.g. L<Coro> threads).
911              
912             Using event-based modules such as L, L, L and so on is
913             easy.
914              
915             =back
916              
917             =head2 Passing file descriptors
918              
919             Unlike L<AnyEvent::Fork>, this module has no in-built file handle or file
920             descriptor passing abilities.
921              
922             The reason is that passing file descriptors is extraordinarily tricky
923             business, and conflicts with efficient batching of messages.
924              
925             There still is a method you can use: Create a
926             C<AnyEvent::Util::portable_socketpair> and C<send_fh> one half of it to
927             the process before you pass control to C<AnyEvent::Fork::RPC::run>.
928              
929             Whenever you want to pass a file descriptor, send an rpc request to the
930             child process (so it expects the descriptor), then send it over the other
931             half of the socketpair. The child should fetch the descriptor from the
932             half it was passed earlier.
933              
934             Here is some (untested) pseudocode to that effect:
935              
936             use AnyEvent::Util;
937             use AnyEvent::Fork;
938             use AnyEvent::Fork::RPC;
939             use IO::FDPass;
940              
941             my ($s1, $s2) = AnyEvent::Util::portable_socketpair;
942              
943             my $rpc = AnyEvent::Fork
944             ->new
945             ->send_fh ($s2)
946             ->require ("MyWorker")
947             ->AnyEvent::Fork::RPC::run ("MyWorker::run",
948             init => "MyWorker::init",
949             );
950              
951             undef $s2; # no need to keep it around
952              
953             # pass an fd
954             $rpc->("i'll send some fd now, please expect it!", my $cv = AE::cv);
955              
956             IO::FDPass::send fileno $s1, fileno $handle_to_pass;
957              
958             $cv->recv;
959              
960             The MyWorker module could look like this:
961              
962             package MyWorker;
963              
964             use IO::FDPass;
965              
966             my $s2;
967              
968             sub init {
969             $s2 = $_[0];
970             }
971              
972             sub run {
973             if ($_[0] eq "i'll send some fd now, please expect it!") {
974             my $fd = IO::FDPass::recv fileno $s2;
975             ...
976             }
977             }
978              
979             Of course, this might be blocking if you pass a lot of file descriptors,
980             so you might want to look into L<AnyEvent::FDpasser> which can handle the
981             gory details.
982              
983             =head1 EXCEPTIONS
984              
985             There are no provisions whatsoever for catching exceptions at this time -
986             in the child, exceptions might kill the process, causing calls to be lost
987             and the parent encountering a fatal error. In the parent, exceptions in
988             the result callback will not be caught and cause undefined behaviour.
989              
990             =head1 SEE ALSO
991              
992             L<AnyEvent::Fork>, to create the processes in the first place.
993              
994             L<AnyEvent::Fork::Remote>, likewise, but helpful for remote processes.
995              
996             L<AnyEvent::Fork::Pool>, to manage whole pools of processes.
997              
998             =head1 AUTHOR AND CONTACT INFORMATION
999              
1000             Marc Lehmann
1001             http://software.schmorp.de/pkg/AnyEvent-Fork-RPC
1002              
1003             =cut
1004              
1005             1
1006