File Coverage

blib/lib/AnyEvent/Fork/RPC.pm
Criterion Covered Total %
statement 69 82 84.1
branch 27 44 61.3
condition 16 42 38.1
subroutine 10 13 76.9
pod 1 1 100.0
total 123 182 67.5


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork
4              
5             =head1 SYNOPSIS
6              
7             use AnyEvent::Fork;
8             use AnyEvent::Fork::RPC;
9              
10             my $rpc = AnyEvent::Fork
11             ->new
12             ->require ("MyModule")
13             ->AnyEvent::Fork::RPC::run (
14             "MyModule::server",
15             );
16              
17             use AnyEvent;
18              
19             my $cv = AE::cv;
20              
21             $rpc->(1, 2, 3, sub {
22             print "MyModule::server returned @_\n";
23             $cv->send;
24             });
25              
26             $cv->recv;
27              
28             =head1 DESCRIPTION
29              
30             This module implements a simple RPC protocol and backend for processes
31             created via L<AnyEvent::Fork> or L<AnyEvent::Fork::Remote>, allowing you
32             to call a function in the child process and receive its return values (up
33             to 4GB serialised).
34              
35             It implements two different backends: a synchronous one that works like a
36             normal function call, and an asynchronous one that can run multiple jobs
37             concurrently in the child, using AnyEvent.
38              
39             It also implements an asynchronous event mechanism from the child to the
40             parent, that could be used for progress indications or other information.
41              
42             =head1 EXAMPLES
43              
44             =head2 Example 1: Synchronous Backend
45              
46             Here is a simple example that implements a backend that executes C
47             and C calls, and reports their status back. It also reports the
48             number of requests it has processed every three requests, which is clearly
49             silly, but illustrates the use of events.
50              
51             First the parent process:
52              
53             use AnyEvent;
54             use AnyEvent::Fork;
55             use AnyEvent::Fork::RPC;
56              
57             my $done = AE::cv;
58              
59             my $rpc = AnyEvent::Fork
60             ->new
61             ->require ("MyWorker")
62             ->AnyEvent::Fork::RPC::run ("MyWorker::run",
63             on_error => sub { warn "ERROR: $_[0]"; exit 1 },
64             on_event => sub { warn "$_[0] requests handled\n" },
65             on_destroy => $done,
66             );
67              
68             for my $id (1..6) {
69             $rpc->(rmdir => "/tmp/somepath/$id", sub {
70             $_[0]
71             or warn "/tmp/somepath/$id: $_[1]\n";
72             });
73             }
74              
75             undef $rpc;
76              
77             $done->recv;
78              
79             The parent creates the process, queues a few rmdir's. It then forgets
80             about the C<$rpc> object, so that the child exits after it has handled the
81             requests, and then it waits till the requests have been handled.
82              
83             The child is implemented using a separate module, C, shown here:
84              
85             package MyWorker;
86              
87             my $count;
88              
89             sub run {
90             my ($cmd, $path) = @_;
91              
92             AnyEvent::Fork::RPC::event ($count)
93             unless ++$count % 3;
94              
95             my $status = $cmd eq "rmdir" ? rmdir $path
96             : $cmd eq "unlink" ? unlink $path
97             : die "fatal error, illegal command '$cmd'";
98              
99             $status or (0, "$!")
100             }
101              
102             1
103              
104             The C function first sends a "progress" event every three calls, and
105             then executes C or C, depending on the first parameter (or
106             dies with a fatal error - obviously, you must never let this happen :).
107              
108             Eventually it returns the status value true if the command was successful,
109             or the status value 0 and the stringified error message.
110              
111             On my system, running the first code fragment with the given
112             F in the current directory yields:
113              
114             /tmp/somepath/1: No such file or directory
115             /tmp/somepath/2: No such file or directory
116             3 requests handled
117             /tmp/somepath/3: No such file or directory
118             /tmp/somepath/4: No such file or directory
119             /tmp/somepath/5: No such file or directory
120             6 requests handled
121             /tmp/somepath/6: No such file or directory
122              
123             Obviously, none of the directories I am trying to delete even exist. Also,
124             the events and responses are processed in exactly the same order as
125             they were created in the child, which is true for both synchronous and
126             asynchronous backends.
127              
128             Note that the parentheses in the call to C are
129             not optional. That is because the function isn't defined when the code is
130             compiled. You can make sure it is visible by pre-loading the correct
131             backend module in the call to C:
132              
133             ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")
134              
135             Since the backend module declares the C function, loading it first
136             ensures that perl will correctly interpret calls to it.
137              
138             And as a final remark, there is a fine module on CPAN that can
139             asynchronously C and C and a lot more, and more efficiently
140             than this example, namely L.
141              
142             =head3 Example 1a: the same with the asynchronous backend
143              
144             This example only shows what needs to be changed to use the async backend
145             instead. Doing this is not very useful, the purpose of this example is
146             to show the minimum amount of change that is required to go from the
147             synchronous to the asynchronous backend.
148              
149             To use the async backend in the previous example, you need to add the
150             C parameter to the C call:
151              
152             ->AnyEvent::Fork::RPC::run ("MyWorker::run",
153             async => 1,
154             ...
155              
156             And since the function call protocol is now changed, you need to adapt
157             C to the async API.
158              
159             First, you need to accept the extra initial C<$done> callback:
160              
161             sub run {
162             my ($done, $cmd, $path) = @_;
163              
164             And since a response is now generated when C<$done> is called, as opposed
165             to when the function returns, we need to call the C<$done> function with
166             the status:
167              
168             $done->($status or (0, "$!"));
169              
170             A few remarks are in order. First, it's quite pointless to use the async
171             backend for this example - but it I<is> possible. Second, you can call
172             C<$done> before or after returning from the function. Third, having both
173             returned from the function and having called the C<$done> callback, the
174             child process may exit at any time, so you should call C<$done> only when
175             you really I<are> done.
176              
177             =head2 Example 2: Asynchronous Backend
178              
179             This example implements multiple count-downs in the child, using
180             L timers. While this is a bit silly (one could use timers in the
181             parent just as well), it illustrates the ability to use AnyEvent in the
182             child and the fact that responses can arrive in a different order than the
183             requests.
184              
185             It also shows how to embed the actual child code into a C<__DATA__>
186             section, so it doesn't need any external files at all.
187              
188             And when your parent process is often busy, and you have stricter timing
189             requirements, then running timers in a child process suddenly doesn't look
190             so silly anymore.
191              
192             Without further ado, here is the code:
193              
194             use AnyEvent;
195             use AnyEvent::Fork;
196             use AnyEvent::Fork::RPC;
197              
198             my $done = AE::cv;
199              
200             my $rpc = AnyEvent::Fork
201             ->new
202             ->require ("AnyEvent::Fork::RPC::Async")
203             ->eval (do { local $/; })
204             ->AnyEvent::Fork::RPC::run ("run",
205             async => 1,
206             on_error => sub { warn "ERROR: $_[0]"; exit 1 },
207             on_event => sub { print $_[0] },
208             on_destroy => $done,
209             );
210              
211             for my $count (3, 2, 1) {
212             $rpc->($count, sub {
213             warn "job $count finished\n";
214             });
215             }
216              
217             undef $rpc;
218              
219             $done->recv;
220              
221             __DATA__
222              
223             # this ends up in main, as we don't use a package declaration
224              
225             use AnyEvent;
226              
227             sub run {
228             my ($done, $count) = @_;
229              
230             my $n;
231              
232             AnyEvent::Fork::RPC::event "starting to count up to $count\n";
233              
234             my $w; $w = AE::timer 1, 1, sub {
235             ++$n;
236              
237             AnyEvent::Fork::RPC::event "count $n of $count\n";
238              
239             if ($n == $count) {
240             undef $w;
241             $done->();
242             }
243             };
244             }
245              
246             The parent part (the one before the C<__DATA__> section) isn't very
247             different from the earlier examples. It sets async mode, preloads
248             the backend module (so the C function is
249             declared), uses a slightly different C handler (which we use
250             simply for logging purposes) and then, instead of loading a module with
251             the actual worker code, it C's the code from the data section in the
252             child process.
253              
254             It then starts three countdowns, from 3 to 1 seconds downwards, destroys
255             the rpc object so the example finishes eventually, and then just waits for
256             the stuff to trickle in.
257              
258             The worker code uses the event function to log some progress messages, but
259             mostly just creates a recurring one-second timer.
260              
261             The timer callback increments a counter, logs a message, and eventually,
262             when the count has been reached, calls the finish callback.
263              
264             On my system, this results in the following output. Since all timers fire
265             at roughly the same time, the actual order isn't guaranteed, but the order
266             shown is very likely what you would get, too.
267              
268             starting to count up to 3
269             starting to count up to 2
270             starting to count up to 1
271             count 1 of 3
272             count 1 of 2
273             count 1 of 1
274             job 1 finished
275             count 2 of 2
276             job 2 finished
277             count 2 of 3
278             count 3 of 3
279             job 3 finished
280              
281             While the overall ordering isn't guaranteed, the async backend still
282             guarantees that events and responses are delivered to the parent process
283             in the exact same ordering as they were generated in the child process.
284              
285             And unless your system is I<very> busy, it should clearly show that the
286             job started last will finish first, as it has the lowest count.
287              
288             This concludes the async example. Since L does not
289             actually fork, you are free to use about any module in the child, not just
290             L, but also L, or L for example.
291              
292             =head2 Example 3: Asynchronous backend with Coro
293              
294             With L you can create a nice asynchronous backend implementation by
295             defining an rpc server function that creates a new Coro thread for every
296             request that calls a function "normally", i.e. the parameters from the
297             parent process are passed to it, and any return values are returned to the
298             parent process, e.g.:
299              
300             package My::Arith;
301              
302             sub add {
303             return $_[0] + $_[1];
304             }
305              
306             sub mul {
307             return $_[0] * $_[1];
308             }
309              
310             sub run {
311             my ($done, $func, @arg) = @_;
312              
313             Coro::async_pool {
314             $done->($func->(@arg));
315             };
316             }
317              
318             The C function creates a new thread for every invocation, using the
319             first argument as function name, and calls the C<$done> callback on its
320             return values. This makes it quite natural to define the C and C
321             functions to add or multiply two numbers and return the result.
322              
323             Since this is the asynchronous backend, it's quite possible to define RPC
324             functions that do I/O or wait for external events - their execution will
325             overlap as needed.
326              
327             The above could be used like this:
328              
329             my $rpc = AnyEvent::Fork
330             ->new
331             ->require ("MyWorker")
332             ->AnyEvent::Fork::RPC::run ("My::Arith::run",
333             on_error => ..., on_event => ..., on_destroy => ...,
334             );
335              
336             $rpc->(add => 1, 3, Coro::rouse_cb); say Coro::rouse_wait;
337             $rpc->(mul => 3, 2, Coro::rouse_cb); say Coro::rouse_wait;
338              
339             The C's will print C<4> and C<6>.
340              
341             =head2 Example 4: Forward AnyEvent::Log messages using C
342              
343             This partial example shows how to use the C function to forward
344             L messages to the parent.
345              
346             For this, the parent needs to provide a suitable C:
347              
348             ->AnyEvent::Fork::RPC::run (
349             on_event => sub {
350             if ($_[0] eq "ae_log") {
351             my (undef, $level, $message) = @_;
352             AE::log $level, $message;
353             } else {
354             # other event types
355             }
356             },
357             )
358              
359             In the child, as early as possible, the following code should reconfigure
360             L to log via C:
361              
362             $AnyEvent::Log::LOG->log_cb (sub {
363             my ($timestamp, $orig_ctx, $level, $message) = @{+shift};
364              
365             if (defined &AnyEvent::Fork::RPC::event) {
366             AnyEvent::Fork::RPC::event (ae_log => $level, $message);
367             } else {
368             warn "[$$ before init] $message\n";
369             }
370             });
371              
372             There is an important twist - the C function
373             is only defined when the child is fully initialised. If you redirect the
374             log messages in your C function for example, then the C
375             function might not yet be available. This is why the log callback checks
376             whether the function is there using C, and only then uses it to
377             log the message.
378              
379             =head1 PARENT PROCESS USAGE
380              
381             This module exports nothing, and only implements a single function:
382              
383             =over 4
384              
385             =cut
386              
387             package AnyEvent::Fork::RPC;
388              
389 5     5   82399 use common::sense;
  5         60  
  5         26  
390              
391 5     5   1157 use Errno ();
  5         2805  
  5         89  
392 5     5   1139 use Guard ();
  5         935  
  5         91  
393              
394 5     5   1000 use AnyEvent;
  5         5449  
  5         7214  
395              
396             our $VERSION = '2.0';
397              
398             =item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]
399              
400             The traditional way to call it. But it is way cooler to call it in the
401             following way:
402              
403             =item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])
404              
405             This C function/method can be used in place of the
406             L method. Just like that method, it takes over
407             the L process, but instead of calling the specified
408             C<$function> directly, it runs a server that accepts RPC calls and handles
409             responses.
410              
411             It returns a function reference that can be used to call the function in
412             the child process, handling serialisation and data transfers.
413              
414             The following key/value pairs are allowed. It is recommended to have at
415             least an C or C handler set.
416              
417             =over 4
418              
419             =item on_error => $cb->($msg)
420              
421             Called on (fatal) errors, with a descriptive (hopefully) message. If
422             this callback is not provided, but C is, then the C
423             callback is called with the first argument being the string C,
424             followed by the error message.
425              
426             If neither handler is provided, then the error is reported with loglevel
427             C via C.
428              
429             =item on_event => $cb->(...)
430              
431             Called for every call to the C function in the
432             child, with the arguments of that function passed to the callback.
433              
434             Also called on errors when no C handler is provided.
435              
436             =item on_destroy => $cb->()
437              
438             Called when the C<$rpc> object has been destroyed and all requests have
439             been successfully handled. This is useful when you queue some requests and
440             want the child to go away after it has handled them. The problem is that
441             the parent must not exit either until all requests have been handled, and
442             this can be accomplished by waiting for this callback.
443              
444             =item init => $function (default: none)
445              
446             When specified (by name), this function is called in the child as the very
447             first thing when taking over the process, with all the arguments normally
448             passed to the C function, except the communications
449             socket.
450              
451             It can be used to do one-time things in the child such as storing passed
452             parameters or opening database connections.
453              
454             It is called very early - before the serialisers are created or the
455             C<$function> name is resolved into a function reference, so it could be
456             used to load any modules that provide the serialiser or function. It can
457             not, however, create events.
458              
459             =item done => $function (default: C)
460              
461             The function to call when the asynchronous backend detects an end of file
462             condition when reading from the communications socket I<and> there are no
463             outstanding requests. It is ignored by the synchronous backend.
464              
465             By overriding this you can prolong the life of a RPC process after e.g.
466             the parent has exited by running the event loop in the provided function
467             (or simply calling it, for example, when your child process uses L you
468             could provide L as C function).
469              
470             Of course, in that case you are responsible for exiting at the appropriate
471             time and not returning from the C<done> function.
472              
473             =item async => $boolean (default: C<0>)
474              
475             The default server used in the child does all I/O blockingly, and only
476             allows a single RPC call to execute concurrently.
477              
478             Setting C to a true value switches to another implementation that
479             uses L in the child and allows multiple concurrent RPC calls (it
480             does not support recursion in the event loop however, blocking condvar
481             calls will fail).
482              
483             The actual API in the child is documented in the section that describes
484             the calling semantics of the returned C<$rpc> function.
485              
486             If you want to pre-load the actual back-end modules to enable memory
487             sharing, then you should load C for
488             synchronous, and C for asynchronous mode.
489              
490             If you use a template process and want to fork both sync and async
491             children, then it is permissible to load both modules.
492              
493             =item serialiser => $string (default: C<$AnyEvent::Fork::RPC::STRING_SERIALISER>)
494              
495             All arguments, result data and event data have to be serialised to be
496             transferred between the processes. For this, they have to be frozen and
497             thawed in both parent and child processes.
498              
499             By default, only octet strings can be passed between the processes,
500             which is reasonably fast and efficient and requires no extra modules
501             (the C distribution does not provide these extra
502             serialiser modules).
503              
504             For more complicated use cases, you can provide your own freeze and thaw
505             functions, by specifying a string with perl source code. It's supposed to
506             return two code references when evaluated: the first receives a list of
507             perl values and must return an octet string. The second receives the octet
508             string and must return the original list of values.
509              
510             If you need an external module for serialisation, then you can either
511             pre-load it into your L process, or you can add a C
512             or C statement into the serialiser string. Or both.
513              
514             Here are some examples - all of them are also available as global
515             variables that make them easier to use.
516              
517             =over 4
518              
519             =item C<$AnyEvent::Fork::RPC::STRING_SERIALISER> - octet strings only
520              
521             This serialiser (currently the default) concatenates length-prefixed octet
522             strings. That means you can only pass (and return)
523             strings containing character codes 0-255.
524              
525             The main advantages of this serialiser are the high speed and that it
526             doesn't need another module. The main disadvantage is that you are very
527             limited in what you can pass - only octet strings.
528              
529             Implementation:
530              
531             (
532             sub { pack "(w/a*)*", @_ },
533             sub { unpack "(w/a*)*", shift }
534             )
535              
536             =item C<$AnyEvent::Fork::RPC::CBOR_XS_SERIALISER> - uses L
537              
538             This serialiser creates CBOR::XS arrays - you have to make sure the
539             L module is installed for this serialiser to work. It can be
540             beneficial for sharing when you preload the L module in a template
541             process.
542              
543             L is about as fast as the octet string serialiser, but supports
544             complex data structures (similar to JSON) and is faster than any of the
545             other serialisers. If you have the L module available, it's the
546             best choice.
547              
548             The encoder enables C (so this serialisation method can
549             encode cyclic and self-referencing data structures).
550              
551             Implementation:
552              
553             use CBOR::XS ();
554             (
555             sub { CBOR::XS::encode_cbor_sharing \@_ },
556             sub { @{ CBOR::XS::decode_cbor shift } }
557             )
558              
559             =item C<$AnyEvent::Fork::RPC::JSON_SERIALISER> - uses L or L
560              
561             This serialiser creates JSON arrays - you have to make sure the L
562             module is installed for this serialiser to work. It can be beneficial for
563             sharing when you preload the L module in a template process.
564              
565             L (with L installed) is slower than the octet string
566             serialiser, but usually much faster than L, unless big chunks of
567             binary data need to be transferred.
568              
569             Implementation:
570              
571             use JSON ();
572             (
573             sub { JSON::encode_json \@_ },
574             sub { @{ JSON::decode_json shift } }
575             )
576              
577             =item C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER> - L
578              
579             This serialiser uses L, which means it has high chance of
580             serialising just about anything you throw at it, at the cost of having
581             very high overhead per operation. It also comes with perl. It should be
582             used when you need to serialise complex data structures.
583              
584             Implementation:
585              
586             use Storable ();
587             (
588             sub { Storable::freeze \@_ },
589             sub { @{ Storable::thaw shift } }
590             )
591              
592             =item C<$AnyEvent::Fork::RPC::NSTORABLE_SERIALISER> - portable Storable
593              
594             This serialiser also uses L, but uses its "network" format
595             to serialise data, which makes it possible to talk to different
596             perl binaries (for example, when talking to a process created with
597             L).
598              
599             Implementation:
600              
601             use Storable ();
602             (
603             sub { Storable::nfreeze \@_ },
604             sub { @{ Storable::thaw shift } }
605             )
606              
607             =back
608              
609             =item buflen => $bytes (default: C<512 - 16>)
610              
611             The starting size of the read buffer for request and response data.
612              
613             C ensures that the buffer for reading request and
614             response data is large enough for at least a single request or response, and
615             will dynamically enlarge the buffer if needed.
616              
617             While this ensures that memory is not overly wasted, it typically leads
618             to having to do one syscall per request, which can be inefficient in some
619             cases. In such cases, it can be beneficial to increase the buffer size to
620             hold more than one request.
621              
622             =item buflen_req => $bytes (default: same as C)
623              
624             Overrides C for request data (as read by the forked process).
625              
626             =item buflen_res => $bytes (default: same as C)
627              
628             Overrides C for response data (replies read by the parent process).
629              
630             =back
631              
632             See the examples section earlier in this document for some actual
633             examples.
634              
635             =cut
636              
637             our $STRING_SERIALISER = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
638             our $CBOR_XS_SERIALISER = 'use CBOR::XS (); (sub { CBOR::XS::encode_cbor_sharing \@_ }, sub { @{ CBOR::XS::decode_cbor shift } })';
639             our $JSON_SERIALISER = 'use JSON (); (sub { JSON::encode_json \@_ }, sub { @{ JSON::decode_json shift } })';
640             our $STORABLE_SERIALISER = 'use Storable (); (sub { Storable::freeze \@_ }, sub { @{ Storable::thaw shift } })';
641             our $NSTORABLE_SERIALISER = 'use Storable (); (sub { Storable::nfreeze \@_ }, sub { @{ Storable::thaw shift } })';
642              
643             sub run {
644 5     5 1 22210 my ($self, $function, %arg) = @_;
645              
646 5   33     37 my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
647 5         16 my $on_event = delete $arg{on_event};
648 5         12 my $on_error = delete $arg{on_error};
649 5         12 my $on_destroy = delete $arg{on_destroy};
650            
651             # default for on_error is to on_event, if specified
652             $on_error ||= $on_event
653 0     0   0 ? sub { $on_event->(error => shift) }
654 5 0 33 0   18 : sub { AE::log die => "AnyEvent::Fork::RPC: uncaught error: $_[0]." };
  0         0  
655              
656             # default for on_event is to raise an error
657 5   50 0   15 $on_event ||= sub { $on_error->("event received, but no on_event handler") };
  0         0  
658              
659 5 50       617 my ($f, $t) = eval $serialiser; die $@ if $@;
  5         32  
660              
661 5         16 my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
662 5   50     43 my ($rlen, $rbuf, $rw) = $arg{buflen_res} || $arg{buflen} || 512 - 16;
663              
664             my $wcb = sub {
665 4     4   479 my $len = syswrite $fh, $wbuf;
666              
667 4 50       21 unless (defined $len) {
668 0 0 0     0 if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
669 0         0 undef $rw; undef $ww; # it ends here
  0         0  
670 0         0 $on_error->("$!");
671             }
672             }
673              
674 4         11 substr $wbuf, 0, $len, "";
675              
676 4 50       14 unless (length $wbuf) {
677 4         9 undef $ww;
678 4 100       52 $shutdown and shutdown $fh, 1;
679             }
680 5         23 };
681              
682 5 100       28 my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");
683              
684             $self->eval ("use $module 2 ()")
685             ->send_arg (
686             function => $function,
687             init => $arg{init},
688             serialiser => $serialiser,
689             done => $arg{done} || "$module\::do_exit",
690             rlen => $arg{buflen_req} || $arg{buflen} || 512 - 16,
691             -10 # the above are 10 arguments
692             )
693             ->run ("$module\::run", sub {
694 5 100   5   2046 $fh = shift
695             or return $on_error->("connection failed");
696              
697 4         9 my ($id, $len);
698             $rw = AE::io $fh, 0, sub {
699 12 50       366596 $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
700 12         204 $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;
701              
702 12 100 0     67 if ($len) {
    50          
    0          
703 8         36 while (8 <= length $rbuf) {
704 15         235 ($id, $len) = unpack "NN", $rbuf;
705 15 50       57 8 + $len <= length $rbuf
706             or last;
707              
708 15         603 my @r = $t->(substr $rbuf, 8, $len);
709 15         56 substr $rbuf, 0, 8 + $len, "";
710              
711 15 100       45 if ($id) {
712 5 100       23 if (@rcb) {
    50          
713 4         10 (shift @rcb)->(@r);
714             } elsif (my $cb = delete $rcb{$id}) {
715 1         6 $cb->(@r);
716             } else {
717 0         0 undef $rw; undef $ww;
  0         0  
718 0         0 $on_error->("unexpected data from child");
719             }
720             } else {
721 10         45 $on_event->(@r);
722             }
723             }
724             } elsif (defined $len) {
725 4         9 undef $rw; undef $ww; # it ends here
  4         8  
726              
727 4 100 100     33 if (@rcb || %rcb) {
728 2         7 $on_error->("unexpected eof");
729             } else {
730 2 50       29 $on_destroy->()
731             if $on_destroy;
732             }
733             } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
734 0         0 undef $rw; undef $ww; # it ends here
  0         0  
735 0         0 $on_error->("read: $!");
736             }
737 4         40 };
738              
739 4   33     42 $ww ||= AE::io $fh, 1, $wcb;
740 5   33     28 });
      50        
741              
742             my $guard = Guard::guard {
743 5     5   298 $shutdown = 1;
744              
745 5 100 66     109 shutdown $fh, 1 if $fh && !$ww;
746 5         408 };
747              
748 5         11 my $id;
749              
750             $arg{async}
751             ? sub {
752 3 50   3   221 $id = ($id == 0xffffffff ? 0 : $id) + 1;
753 3 0       76 $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops
754              
755 3         14 $rcb{$id} = pop;
756              
757 3         6 $guard if 0; # keep it alive
758              
759 3         81 $wbuf .= pack "NN/a*", $id, &$f;
760 3   33     24 $ww ||= $fh && AE::io $fh, 1, $wcb;
      33        
761             }
762             : sub {
763 5     5   344 push @rcb, pop;
764              
765 5         9 $guard; # keep it alive
766              
767 5         112 $wbuf .= pack "N/a*", &$f;
768 5   33     34 $ww ||= $fh && AE::io $fh, 1, $wcb;
      33        
769             }
770 5 100       39 }
771              
772             =item $rpc->(..., $cb->(...))
773              
774             The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
775             reference. There are two things you can do with it: call it, and let it go
776             out of scope (let it get destroyed).
777              
778             If C<async> was false when C<$rpc> was created (the default), then, if you
779             call C<$rpc>, the C<$function> is invoked with all arguments passed to
780             C<$rpc> except the last one (the callback). When the function returns, the
781             callback will be invoked with all the return values.
782              
783             If C<async> was true, then the C<$function> receives an additional
784             initial argument, the result callback. In this case, returning from
785             C<$function> does nothing - the function only counts as "done" when the
786             result callback is called, and any arguments passed to it are considered
787             the return values. This makes it possible to "return" from event handlers
788             or e.g. Coro threads.
789              
790             The other thing that can be done with the RPC object is to destroy it. In
791             this case, the child process will execute all remaining RPC calls, report
792             their results, and then exit.
793              
794             See the examples section earlier in this document for some actual
795             examples.
796              
797             =back
798              
799             =head1 CHILD PROCESS USAGE
800              
801             The following functions are not available in this module. They are only
802             available in the namespace of this module when the child is running,
803             without having to load any extra modules. They are part of the child-side
804             API of L<AnyEvent::Fork::RPC>.
805              
806             Note that these functions are typically not yet declared when code is
807             compiled into the child, because the backend module is only loaded when
808             you call C<run>, which is typically the last method you call on the fork
809             object.
810              
811             Therefore, you either have to explicitly pre-load the right backend module
812             or mark calls to these functions as function calls, e.g.:
813              
814             AnyEvent::Fork::RPC::event (0 => "five");
815             AnyEvent::Fork::RPC::event->(0 => "five");
816             &AnyEvent::Fork::RPC::flush;
817              
818             =over 4
819              
820             =item AnyEvent::Fork::RPC::event (...)
821              
822             Send an event to the parent. Events are a bit like RPC calls made by the
823             child process to the parent, except that there is no notion of return
824             values.
825              
826             See the examples section earlier in this document for some actual
827             examples.
828              
829             Note: the event data, like any data sent to the parent, might not be sent
830             immediately but queued for later sending, so there is no guarantee that
831             the event has been sent to the parent when the call returns - when you
832             e.g. exit directly after calling this function, the parent might never
833             receive the event. See the next function for a remedy.
834              
835             =item $success = AnyEvent::Fork::RPC::flush ()
836              
837             Synchronously wait and flush the reply data to the parent. Returns true on
838             success and false otherwise (i.e. when the reply data cannot be written at
839             all). Ignoring the success status is a common and healthy behaviour.
840              
841             Only the "async" backend does something on C<flush> - the "sync" backend
842             is not buffering reply data and always returns true from this function.
843              
844             Normally, reply data might or might not be written to the parent
845             immediately but is buffered. This can greatly improve performance and
846             efficiency, but sometimes can get in your way: for example, when you want
847             to send an error message just before exiting, or when you want to ensure
848             replies timely reach the parent before starting a long blocking operation.
849              
850             In these cases, you can call this function to flush any outstanding reply
851             data to the parent. This is done blockingly, so no requests will be
852             handled and no event callbacks will be called.
853              
854             For example, you could wrap your request function in a C<eval> block and
855             report the exception string back to the caller just before exiting:
856              
857             sub req {
858             ...
859              
860             eval {
861             ...
862             };
863              
864             if ($@) {
865             AnyEvent::Fork::RPC::event (throw => "$@");
866             AnyEvent::Fork::RPC::flush ();
867             exit;
868             }
869              
870             ...
871             }
872              
873             =back
874              
875             =head2 PROCESS EXIT
876              
877             If and when the child process exits depends on the backend and
878             configuration. Apart from explicit exits (e.g. by calling C<exit>) or
879             runtime conditions (uncaught exceptions, signals etc.), the backends exit
880             under these conditions:
881              
882             =over 4
883              
884             =item Synchronous Backend
885              
886             The synchronous backend is very simple: when the process waits for another
887             request to arrive and the writing side (usually in the parent) is closed,
888             it will exit normally, i.e. as if your main program reached the end of the
889             file.
890              
891             That means that if your parent process exits, the RPC process will usually
892             exit as well, either because it is idle anyway, or because it executes a
893             request. In the latter case, you will likely get an error when the RPC
894             process tries to send the results to the parent (because arguably, you
895             shouldn't exit your parent while there are still outstanding requests).
896              
897             The process is usually quiescent when it happens, so it should rarely be a
898             problem, and C<END> handlers can be used to clean up.
899              
900             =item Asynchronous Backend
901              
902             For the asynchronous backend, things are more complicated: Whenever it
903             listens for another request by the parent, it might detect that the socket
904             was closed (e.g. because the parent exited). It will stop listening for
905             new requests and instead try to write out any remaining data (if any) or
906             simply check whether the socket can be written to. After this, the RPC
907             process is effectively done - no new requests are incoming, no outstanding
908             request data can be written back.
909              
910             Since chances are high that there are event watchers that the RPC server
911             knows nothing about (why else would one use the async backend if not for
912             the ability to register watchers?), the event loop would often happily
913             continue.
914              
915             This is why the asynchronous backend explicitly calls C<exit> when
916             it is done (under other circumstances, such as when there is an I/O error
917             and there is outstanding data to write, it will log a fatal message via
918             L<AnyEvent::Log>, also causing the program to exit).
919              
920             You can override this by specifying a function name to call via the C<done>
921             parameter instead.
922              
923             =back
924              
925             =head1 ADVANCED TOPICS
926              
927             =head2 Choosing a backend
928              
929             So how do you decide which backend to use? Well, that's your problem to
930             solve, but here are some thoughts on the matter:
931              
932             =over 4
933              
934             =item Synchronous
935              
936             The synchronous backend does not rely on any external modules (well,
937             except L<common::sense>, which works around a bug in how perl's warning
938             system works). This keeps the process very small, for example, on my
939             system, an empty perl interpreter uses 1492kB RSS, which becomes 2020kB
940             after C<use warnings; use strict> (for people who grew up with C64s around
941             them this is probably shocking every single time they see it). The worker
942             process in the first example in this document uses 1792kB.
943              
944             Since the calls are done synchronously, slow jobs will keep newer jobs
945             from executing.
946              
947             The synchronous backend also has no overhead due to running an event loop
948             - reading requests is therefore very efficient, while writing responses is
949             less so, as every response results in a write syscall.
950              
951             If the parent process is busy and a bit slow reading responses, the child
952             waits instead of processing further requests. This also limits the amount
953             of memory needed for buffering, as never more than one response has to be
954             buffered.
955              
956             The API in the child is simple - you just have to define a function that
957             does something and returns something.
958              
959             It's hard to use modules or code that relies on an event loop, as the
960             child cannot execute anything while it waits for more input.
961              
962             =item Asynchronous
963              
964             The asynchronous backend relies on L<AnyEvent>, which tries to be small,
965             but still comes at a price: On my system, the worker from example 1a uses
966             3420kB RSS (for L, which loads L, which needs L
967             which in turn loads a lot of other modules such as L, L,
968             L, L...).
969              
970             It batches requests and responses reasonably efficiently, doing only as
971             few reads and writes as needed, but needs to poll for events via the event
972             loop.
973              
974             Responses are queued when the parent process is busy. This means the child
975             can continue to execute any queued requests. It also means that a child
976             might queue a lot of responses in memory when it generates them and the
977             parent process is slow accepting them.
978              
979             The API is not a straightforward RPC pattern - you have to call a
980             "done" callback to pass return values and signal completion. Also, more
981             importantly, the API starts jobs as fast as possible - when 1000 jobs
982             are queued and the jobs are slow, they will all run concurrently. The
983             child must implement some queueing/limiting mechanism if this causes
984             problems. Alternatively, the parent could limit the amount of rpc calls
985             that are outstanding.
986              
987             Blocking use of condvars is not supported (in the main thread, outside of
988             e.g. L<Coro> threads).
989              
990             Using event-based modules such as L, L, L and so on is
991             easy.
992              
993             =back
994              
995             =head2 Passing file descriptors
996              
997             Unlike L<AnyEvent::Fork>, this module has no in-built file handle or file
998             descriptor passing abilities.
999              
1000             The reason is that passing file descriptors is extraordinarily tricky
1001             business, and conflicts with efficient batching of messages.
1002              
1003             There still is a method you can use: Create a
1004             C<AnyEvent::Util::portable_socketpair> and C<send_fh> one half of it to
1005             the process before you pass control to C<AnyEvent::Fork::RPC::run>.
1006              
1007             Whenever you want to pass a file descriptor, send an rpc request to the
1008             child process (so it expects the descriptor), then send it over the other
1009             half of the socketpair. The child should fetch the descriptor from the
1010             half it was passed earlier.
1011              
1012             Here is some (untested) pseudocode to that effect:
1013              
1014             use AnyEvent::Util;
1015             use AnyEvent::Fork;
1016             use AnyEvent::Fork::RPC;
1017             use IO::FDPass;
1018              
1019             my ($s1, $s2) = AnyEvent::Util::portable_socketpair;
1020              
1021             my $rpc = AnyEvent::Fork
1022             ->new
1023             ->send_fh ($s2)
1024             ->require ("MyWorker")
1025             ->AnyEvent::Fork::RPC::run ("MyWorker::run",
1026             init => "MyWorker::init",
1027             );
1028              
1029             undef $s2; # no need to keep it around
1030              
1031             # pass an fd
1032             $rpc->("i'll send some fd now, please expect it!", my $cv = AE::cv);
1033              
1034             IO::FDPass::send fileno $s1, fileno $handle_to_pass;
1035              
1036             $cv->recv;
1037              
1038             The MyWorker module could look like this:
1039              
1040             package MyWorker;
1041              
1042             use IO::FDPass;
1043              
1044             my $s2;
1045              
1046             sub init {
1047             $s2 = $_[0];
1048             }
1049              
1050             sub run {
1051             if ($_[0] eq "i'll send some fd now, please expect it!") {
1052             my $fd = IO::FDPass::recv fileno $s2;
1053             ...
1054             }
1055             }
1056              
1057             Of course, this might be blocking if you pass a lot of file descriptors,
1058             so you might want to look into L<AnyEvent::FDpasser> which can handle the
1059             gory details.
1060              
1061             =head1 EXCEPTIONS
1062              
1063             There are no provisions whatsoever for catching exceptions at this time -
1064             in the child, exceptions might kill the process, causing calls to be lost
1065             and the parent encountering a fatal error. In the parent, exceptions in
1066             the result callback will not be caught and cause undefined behaviour.
1067              
1068             =head1 SEE ALSO
1069              
1070             L<AnyEvent::Fork>, to create the processes in the first place.
1071              
1072             L<AnyEvent::Fork::Remote>, likewise, but helpful for remote processes.
1073              
1074             L<AnyEvent::Fork::Pool>, to manage whole pools of processes.
1075              
1076             =head1 AUTHOR AND CONTACT INFORMATION
1077              
1078             Marc Lehmann
1079             http://software.schmorp.de/pkg/AnyEvent-Fork-RPC
1080              
1081             =cut
1082              
1083             1
1084