File Coverage

blib/lib/AnyEvent/MP.pm
Criterion   Covered  Total      %
statement         1      3   33.3
branch                        n/a
condition                     n/a
subroutine        1      1  100.0
pod                           n/a
total             2      4   50.0


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::MP - erlang-style multi-processing/message-passing framework
4              
5             =head1 SYNOPSIS
6              
7             use AnyEvent::MP;
8              
9             $NODE # contains this node's node ID
10             NODE # returns this node's node ID
11              
12             $SELF # receiving/own port id in rcv callbacks
13              
14             # initialise the node so it can send/receive messages
15             configure;
16              
17             # ports are message destinations
18              
19             # sending messages
20             snd $port, type => data...;
21             snd $port, @msg;
22             snd @msg_with_first_element_being_a_port;
23              
24             # creating/using ports, the simple way
25             my $simple_port = port { my @msg = @_ };
26              
27             # creating/using ports, tagged message matching
28             my $port = port;
29             rcv $port, ping => sub { snd $_[0], "pong" };
30             rcv $port, pong => sub { warn "pong received\n" };
31              
32             # create a port on another node
33             my $port = spawn $node, $initfunc, @initdata;
34              
35             # destroy a port again
36             kil $port; # "normal" kill
37             kil $port, my_error => "everything is broken"; # error kill
38              
39             # monitoring
40             mon $localport, $cb->(@msg) # callback is invoked on death
41             mon $localport, $otherport # kill otherport on abnormal death
42             mon $localport, $otherport, @msg # send message on death
43              
44             # temporarily execute code in port context
45             peval $port, sub { die "kill the port!" };
46              
47             # execute callbacks in $SELF port context
48             my $timer = AE::timer 1, 0, psub {
49             die "kill the port, delayed";
50             };
51              
52             =head1 CURRENT STATUS
53              
54             bin/aemp - stable.
55             AnyEvent::MP - stable API, should work.
56             AnyEvent::MP::Intro - explains most concepts.
57             AnyEvent::MP::Kernel - mostly stable API.
58             AnyEvent::MP::Global - stable API.
59              
60             =head1 DESCRIPTION
61              
62             This module (-family) implements a simple message passing framework.
63              
64             Despite its simplicity, you can securely message other processes running
65             on the same or other hosts, and you can supervise entities remotely.
66              
67             For an introduction to this module family, see the L<AnyEvent::MP::Intro>
68             manual page and the examples under F<eg/>.
69              
70             =head1 CONCEPTS
71              
72             =over 4
73              
74             =item port
75              
76             Not to be confused with a TCP port, a "port" is something you can send
77             messages to (with the C<snd> function).
78              
79             Ports allow you to register C<rcv> handlers that can match all or just
80             some messages. Messages sent to ports are not queued, regardless of
81             whether anything was listening for them or not.
82              
83             =item port ID - C<nodeid#portname>
84              
85             A port ID is the concatenation of a node ID, a hash-mark (C<#>) as
86             separator, and a port name (a printable string of unspecified format).
87              
88             =item node
89              
90             A node is a single process containing at least one port - the node port,
91             which enables nodes to manage each other remotely, and to create new
92             ports.
93              
94             Nodes are either public (have one or more listening ports) or private
95             (no listening ports). Private nodes cannot talk to other private nodes
96             currently.
97              
98             =item node ID - C<[A-Za-z0-9_\-.:]*>
99              
100             A node ID is a string that uniquely identifies the node within a
101             network. Depending on the configuration used, node IDs can look like a
102             hostname, a hostname and a port, or a random string. AnyEvent::MP itself
103             doesn't interpret node IDs in any way.
104              
105             =item binds - C<ip:port>
106              
107             Nodes can only talk to each other by creating some kind of connection to
108             each other. To do this, nodes should listen on one or more local transport
109             endpoints - binds. Currently, only standard C<ip:port> specifications can
110             be used, which specify TCP ports to listen on.
111              
112             =item seed nodes
113              
114             When a node starts, it knows nothing about the network. To teach the node
115             about the network it first has to contact some other node within the
116             network. This node is called a seed.
117              
118             Apart from the fact that other nodes know them as seed nodes and they have
119             to have fixed listening addresses, seed nodes are perfectly normal nodes -
120             any node can function as a seed node for others.
121              
122             In addition to discovering the network, seed nodes are also used to
123             maintain the network and to connect nodes that otherwise would have
124             trouble connecting. They form the backbone of an AnyEvent::MP network.
125              
126             Seed nodes are expected to be long-running, and at least one seed node
127             should always be available. They should also be relatively responsive - a
128             seed node that blocks for long periods will slow down everybody else.
129              
130             =item seeds - C<host:port>
131              
132             Seeds are transport endpoint(s) (usually a hostname/IP address and a
133             TCP port) of nodes that should be used as seed nodes.
134              
135             The nodes listening on those endpoints are expected to be long-running,
136             and at least one of those should always be available. When nodes run out
137             of connections (e.g. due to a network error), they try to re-establish
138             connections to some seed nodes again to join the network.
139              
140             =back
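
The port ID and node ID formats described above can be illustrated with a small standalone sketch. It does not use AnyEvent::MP itself; the helper names C<split_port_id> and C<looks_like_node_id> and the sample IDs are hypothetical and chosen only for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper: split a port ID into its node ID and port name.
# The limit of 2 keeps any further "#" characters inside the port name.
sub split_port_id {
    my ($port_id) = @_;
    my ($node_id, $port_name) = split /#/, $port_id, 2;
    return ($node_id, $port_name);
}

# Hypothetical helper: check a string against the documented node ID
# character class. Note that the pattern also accepts the empty string.
sub looks_like_node_id {
    my ($id) = @_;
    return $id =~ /\A[A-Za-z0-9_\-.:]*\z/ ? 1 : 0;
}

my ($node_id, $port_name) = split_port_id("host.example:4040#abc.1");
print "node=$node_id port=$port_name\n";
print looks_like_node_id($node_id) ? "valid node ID\n" : "invalid node ID\n";
```

Splitting with a limit of 2 mirrors what the module's own code does when it separates the node ID from the port name.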
141              
142             =head1 VARIABLES/FUNCTIONS
143              
144             =over 4
145              
146             =cut
147              
148             package AnyEvent::MP;
149              
150 1     1   1616 use AnyEvent::MP::Kernel;
  0            
  0            
151              
152             use common::sense;
153              
154             use Carp ();
155              
156             use AE ();
157              
158             use base "Exporter";
159              
160             our $VERSION = '1.30';
161              
162             our @EXPORT = qw(
163             NODE $NODE *SELF node_of after
164             configure
165             snd rcv mon mon_guard kil psub peval spawn cal
166             port
167             );
168              
169             our $SELF;
170              
171             sub _self_die() {
172             my $msg = $@;
173             $msg =~ s/\n+$// unless ref $msg;
174             kil $SELF, die => $msg;
175             }
176              
177             =item $thisnode = NODE / $NODE
178              
179             The C<NODE> function returns, and the C<$NODE> variable contains, the node
180             ID of the node running in the current process. This value is initialised by
181             a call to C<configure>.
182              
183             =item $nodeid = node_of $port
184              
185             Extracts and returns the node ID from a port ID or a node ID.
186              
187             =item configure $profile, key => value...
188              
189             =item configure key => value...
190              
191             Before a node can talk to other nodes on the network (i.e. enter
192             "distributed mode") it has to configure itself - the minimum a node needs
193             to know is its own name, and optionally it should know the addresses of
194             some other nodes in the network to discover other nodes.
195              
196             The key/value pairs are basically the same ones as documented for the
197             F<aemp> command line utility (sans the set/del prefix).
198              
199             This function configures a node - it must be called exactly once (or
200             never) before calling other AnyEvent::MP functions.
201              
202             =over 4
203              
204             =item step 1, gathering configuration from profiles
205              
206             The function first looks up a profile in the aemp configuration (see the
207             L<aemp> commandline utility). The profile name can be specified via the
208             named C<profile> parameter or can simply be the first parameter. If it is
209             missing, then the nodename (F<uname -n>) will be used as profile name.
210              
211             The profile data is then gathered as follows:
212              
213             First, all remaining key => value pairs (all of which are conveniently
214             undocumented at the moment) will be interpreted as configuration
215             data. Then they will be overwritten by any values specified in the global
216             default configuration (see the F<aemp> utility), then the chain of
217             profiles chosen by the profile name (and any C<parent> attributes).
218              
219             That means that the values specified in the profile have highest priority
220             and the values specified directly via C<configure> have lowest priority,
221             and can only be used to specify defaults.
222              
223             If the profile specifies a node ID, then this will become the node ID of
224             this process. If not, then the profile name will be used as node ID. The
225             special node ID of C<anon/> will be replaced by a random node ID.
226              
227             =item step 2, bind listener sockets
228              
229             The next step is to look up the binds in the profile, followed by binding
230             aemp protocol listeners on all binds specified (it is possible and valid
231             to have no binds, meaning that the node cannot be contacted from the
232             outside. This means the node cannot talk to other nodes that also have no
233             binds, but it can still talk to all "normal" nodes).
234              
235             If the profile does not specify a binds list, then a default of C<*> is
236             used, meaning the node will bind on a dynamically-assigned port on every
237             local IP address it finds.
238              
239             =item step 3, connect to seed nodes
240              
241             As the last step, the seeds list from the profile is passed to the
242             L<AnyEvent::MP::Global> module, which will then use it to keep
243             connectivity with at least one node at any point in time.
244              
245             =back
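
The layering described in step 1 can be sketched in plain Perl. This is only an illustration under assumptions: the hashes below are hypothetical and are not the real aemp configuration format, C<merge_config> is an invented name, and the order in which parent profiles are applied relative to their children (parents first) is assumed rather than taken from the documentation.

```perl
use strict;
use warnings;

# Hypothetical data: these hashes only illustrate the layering.
my %global   = (binds => ["*:4040"]);
my %profiles = (
    base => { seeds  => ["seed.example:4040"] },
    seed => { parent => "base", nodeid => "seed1" },
);

sub merge_config {
    my ($profile, %args) = @_;

    # Collect the profile chain, child first, following "parent" links.
    my (@chain, %seen);
    my $name = $profile;
    while (defined $name && !$seen{$name}++) {
        my $p = $profiles{$name} or last;
        push @chain, $p;
        $name = $p->{parent};
    }

    # Later layers overwrite earlier ones:
    # configure arguments, global defaults, parent profiles, chosen profile.
    my %cfg = (%args, %global);
    %cfg = (%cfg, %$_) for reverse @chain;
    delete $cfg{parent};

    return \%cfg;
}

my $cfg = merge_config("seed", nodeid => "fallback", binds => ["*:1234"]);
print "nodeid: $cfg->{nodeid}\n";
print "binds:  @{ $cfg->{binds} }\n";
print "seeds:  @{ $cfg->{seeds} }\n";
```

In this sketch the values passed directly act only as defaults: both C<nodeid> and C<binds> end up being taken from the profile and the global configuration.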
246              
247             Example: become a distributed node using the local node name as profile.
248             This should be the most common form of invocation for "daemon"-type nodes.
249              
250             configure;
251              
252             Example: become an anonymous node. This form is often used for commandline
253             clients.
254              
255             configure nodeid => "anon/";
256              
257             Example: configure a node using a profile called seed, which is suitable
258             for a seed node as it binds on all local addresses on a fixed port (4040,
259             customary for aemp).
260              
261             # use the aemp commandline utility
262             # aemp profile seed nodeid anon/ binds '*:4040'
263              
264             # then use it
265             configure profile => "seed";
266              
267             # or simply use aemp from the shell again:
268             # aemp run profile seed
269              
270             # or provide a nicer-to-remember nodeid
271             # aemp run profile seed nodeid "$(hostname)"
272              
273             =item $SELF
274              
275             Contains the current port id while executing C<rcv> callbacks or C<psub>
276             blocks.
277              
278             =item *SELF, SELF, %SELF, @SELF...
279              
280             Due to some quirks in how perl exports variables, it is impossible to
281             just export C<$SELF>. Instead, all the symbols named C<SELF> are exported
282             by this module, but only C<$SELF> is currently used.
283              
284             =item snd $port, type => @data
285              
286             =item snd $port, @msg
287              
288             Send the given message to the given port, which can identify either a
289             local or a remote port, and must be a port ID.
290              
291             While the message can be almost anything, it is highly recommended to
292             use a string as first element (a port ID, or some word that indicates a
293             request type etc.) and to have it consist only of simple perl values (scalars,
294             arrays, hashes) - if you think you need to pass an object, think again.
295              
296             The message data logically becomes read-only after a call to this
297             function: modifying any argument (or values referenced by them) is
298             forbidden, as there can be considerable time between the call to C<snd>
299             and the time the message is actually being serialised - in fact, it might
300             never be copied as within the same process it is simply handed to the
301             receiving port.
302              
303             The type of data you can transfer depends on the transport protocol: when
304             JSON is used, then only strings, numbers and arrays and hashes consisting
305             of those are allowed (no objects). When Storable is used, then anything
306             that Storable can serialise and deserialise is allowed, and for the local
307             node, anything can be passed. Best rely only on the common denominator of
308             these.
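
As a rough illustration of the "common denominator" advice, the following sketch checks whether a message survives JSON encoding. It uses the core module JSON::PP purely as a stand-in; whether AnyEvent::MP uses this particular encoder is an assumption, and the message contents and the class name C<My::Object> are hypothetical.

```perl
use strict;
use warnings;
use JSON::PP ();

my $json = JSON::PP->new;

# A message made of a tag plus simple perl values encodes without problems.
my @plain = (chat => "hello", { room => "perl", ids => [1, 2] });
my $ok    = eval { $json->encode(\@plain); 1 };

# A message containing an object is rejected by the default encoder settings.
my @object = (chat => bless({}, "My::Object"));
my $bad    = eval { $json->encode(\@object); 1 };

print "plain message:       ", ($ok  ? "encodable" : "rejected"), "\n";
print "message with object: ", ($bad ? "encodable" : "rejected"), "\n";
```

A check like this can be handy in tests to catch messages that would only work between local ports.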
309              
310             =item $local_port = port
311              
312             Creates a new local port object and returns its port ID. Initially it has
313             no callbacks set and will throw an error when it receives messages.
314              
315             =item $local_port = port { my @msg = @_ }
316              
317             Creates a new local port, and returns its ID. Semantically the same as
318             creating a port and calling C<rcv $port, $callback> on it.
319              
320             The block will be called for every message received on the port, with the
321             global variable C<$SELF> set to the port ID. Runtime errors will cause the
322             port to be C<kil>ed. The message will be passed as-is, no extra argument
323             (i.e. no port ID) will be passed to the callback.
324              
325             If you want to stop/destroy the port, simply C<kil> it:
326              
327             my $port = port {
328             my @msg = @_;
329             ...
330             kil $SELF;
331             };
332              
333             =cut
334              
335             sub rcv($@);
336              
337             sub _kilme {
338             die "received message on port without callback";
339             }
340              
341             sub port(;&) {
342             my $id = "$UNIQ." . $ID++;
343             my $port = "$NODE#$id";
344              
345             rcv $port, shift || \&_kilme;
346              
347             $port
348             }
349              
350             =item rcv $local_port, $callback->(@msg)
351              
352             Replaces the default callback on the specified port. There is no way to
353             remove the default callback: use C<sub { }> to disable it, or better
354             C<kil> the port when it is no longer needed.
355              
356             The global C<$SELF> (exported by this module) contains C<$port> while
357             executing the callback. Runtime errors during callback execution will
358             result in the port being C<kil>ed.
359              
360             The default callback receives all messages not matched by a more specific
361             C<tag> match.
362              
363             =item rcv $local_port, tag => $callback->(@msg_without_tag), ...
364              
365             Register (or replace) callbacks to be called on messages starting with the
366             given tag on the given port (and return the port), or unregister it (when
367             C<$callback> is C<undef> or missing). There can only be one callback
368             registered for each tag.
369              
370             The original message will be passed to the callback, after the first
371             element (the tag) has been removed. The callback will use the same
372             environment as the default callback (see above).
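
The matching rule can be sketched without AnyEvent::MP as follows. The C<%port> structure and the C<deliver> function are hypothetical stand-ins for a port and its message delivery; they only illustrate that a matched tag is removed before the callback runs, while unmatched messages reach the default callback unchanged.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a port: a default callback plus a tag table.
my @log;
my %port = (
    default => sub { push @log, "default: @_" },
    tags    => {
        ping => sub { push @log, "ping: @_" },
    },
);

sub deliver {
    my @msg = @_;

    # Look up a callback for the first message element, if there is one.
    my $cb = @msg ? $port{tags}{ $msg[0] } : undef;

    if ($cb) {
        shift @msg;                 # the tag is removed before the call
        $cb->(@msg);
    } else {
        $port{default}->(@msg);     # unmatched messages are passed as-is
    }
}

deliver(ping  => 1, 2);
deliver(other => 3);
print "$_\n" for @log;
```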
373              
374             Example: create a port and bind receivers on it in one go.
375              
376             my $port = rcv port,
377             msg1 => sub { ... },
378             msg2 => sub { ... },
379             ;
380              
381             Example: create a port, bind receivers and send it in a message elsewhere
382             in one go:
383              
384             snd $otherport, reply =>
385             rcv port,
386             msg1 => sub { ... },
387             ...
388             ;
389              
390             Example: temporarily register a rcv callback for a tag matching some port
391             (e.g. for an rpc reply) and unregister it after a message was received.
392              
393             rcv $port, $otherport => sub {
394             my @reply = @_;
395              
396             rcv $SELF, $otherport;
397             };
398              
399             =cut
400              
401             sub rcv($@) {
402             my $port = shift;
403             my ($nodeid, $portid) = split /#/, $port, 2;
404              
405             $NODE{$nodeid} == $NODE{""}
406             or Carp::croak "$port: rcv can only be called on local ports, caught";
407              
408             while (@_) {
409             if (ref $_[0]) {
410             if (my $self = $PORT_DATA{$portid}) {
411             "AnyEvent::MP::Port" eq ref $self
412             or Carp::croak "$port: rcv can only be called on message matching ports, caught";
413              
414             $self->[0] = shift;
415             } else {
416             my $cb = shift;
417             $PORT{$portid} = sub {
418             local $SELF = $port;
419             eval { &$cb }; _self_die if $@;
420             };
421             }
422             } elsif (defined $_[0]) {
423             my $self = $PORT_DATA{$portid} ||= do {
424             my $self = bless [$PORT{$portid} || sub { }, { }, $port], "AnyEvent::MP::Port";
425              
426             $PORT{$portid} = sub {
427             local $SELF = $port;
428              
429             if (my $cb = $self->[1]{$_[0]}) {
430             shift;
431             eval { &$cb }; _self_die if $@;
432             } else {
433             &{ $self->[0] };
434             }
435             };
436              
437             $self
438             };
439              
440             "AnyEvent::MP::Port" eq ref $self
441             or Carp::croak "$port: rcv can only be called on message matching ports, caught";
442              
443             my ($tag, $cb) = splice @_, 0, 2;
444              
445             if (defined $cb) {
446             $self->[1]{$tag} = $cb;
447             } else {
448             delete $self->[1]{$tag};
449             }
450             }
451             }
452              
453             $port
454             }
455              
456             =item peval $port, $coderef[, @args]
457              
458             Evaluates the given C<$coderef> within the context of C<$port>, that is,
459             when the code throws an exception the C<$port> will be killed.
460              
461             Any remaining args will be passed to the callback. Any return values will
462             be returned to the caller.
463              
464             This is useful when you temporarily want to execute code in the context of
465             a port.
466              
467             Example: create a port and run some initialisation code in its context.
468              
469             my $port = port { ... };
470              
471             peval $port, sub {
472             init
473             or die "unable to init";
474             };
475              
476             =cut
477              
478             sub peval($$) {
479             local $SELF = shift;
480             my $cb = shift;
481              
482             if (wantarray) {
483             my @res = eval { &$cb };
484             _self_die if $@;
485             @res
486             } else {
487             my $res = eval { &$cb };
488             _self_die if $@;
489             $res
490             }
491             }
492              
493             =item $closure = psub { BLOCK }
494              
495             Remembers C<$SELF> and creates a closure out of the BLOCK. When the
496             closure is executed, sets up the environment in the same way as in C<rcv>
497             callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
498              
499             The effect is basically as if it returned C<< sub { peval $SELF, sub {
500             BLOCK }, @_ } >>.
501              
502             This is useful when you register callbacks from C<rcv> callbacks:
503              
504             rcv $port, delayed_reply => sub {
505             my ($delay, @reply) = @_;
506             my $timer = AE::timer $delay, 0, psub {
507             snd @reply, $SELF;
508             };
509             };
510              
511             =cut
512              
513             sub psub(&) {
514             my $cb = shift;
515              
516             my $port = $SELF
517             or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
518              
519             sub {
520             local $SELF = $port;
521              
522             if (wantarray) {
523             my @res = eval { &$cb };
524             _self_die if $@;
525             @res
526             } else {
527             my $res = eval { &$cb };
528             _self_die if $@;
529             $res
530             }
531             }
532             }
533              
534             =item $guard = mon $port, $cb->(@reason) # call $cb when $port dies
535              
536             =item $guard = mon $port, $rcvport # kill $rcvport when $port dies
537              
538             =item $guard = mon $port # kill $SELF when $port dies
539              
540             =item $guard = mon $port, $rcvport, @msg # send a message when $port dies
541              
542             Monitor the given port and do something when the port is killed or
543             messages to it were lost, and optionally return a guard that can be used
544             to stop monitoring again.
545              
546             In the first form (callback), the callback is simply called with any
547             number of C<@reason> elements (no @reason means that the port was deleted
548             "normally"). Note also that I<< the callback B<must> never die >>, so use
549             C<eval> if unsure.
550              
551             In the second form (another port given), the other port (C<$rcvport>)
552             will be C<kil>'ed with C<@reason>, if a @reason was specified, i.e. on
553             "normal" kils nothing happens, while under all other conditions, the other
554             port is killed with the same reason.
555              
556             The third form (kill self) is the same as the second form, except that
557             C<$rcvport> defaults to C<$SELF>.
558              
559             In the last form (message), a message of the form C<@msg, @reason> will be
560             C<snd>.
561              
562             Monitoring-actions are one-shot: once messages are lost (and a monitoring
563             alert was raised), they are removed and will not trigger again.
564              
565             As a rule of thumb, monitoring requests should always monitor a port from
566             a local port (or callback). The reason is that kill messages might get
567             lost, just like any other message. Another less obvious reason is that
568             even monitoring requests can get lost (for example, when the connection
569             to the other node goes down permanently). When monitoring a port locally
570             these problems do not exist.
571              
572             C<mon> effectively guarantees that, in the absence of hardware failures,
573             after starting the monitor, either all messages sent to the port will
574             arrive, or the monitoring action will be invoked after possible message
575             loss has been detected. No messages will be lost "in between" (after
576             the first lost message no further messages will be received by the
577             port). After the monitoring action was invoked, further messages might get
578             delivered again.
579              
580             Inter-host-connection timeouts and monitoring depend on the transport
581             used. The only transport currently implemented is TCP, and AnyEvent::MP
582             relies on TCP to detect node-downs (this can take 10-15 minutes on a
583             non-idle connection, and usually around two hours for idle connections).
584              
585             This means that monitoring is good for program errors and cleaning up
586             stuff eventually, but it is no replacement for a timeout when you need
587             to ensure some maximum latency.
588              
589             Example: call a given callback when C<$port> is killed.
590              
591             mon $port, sub { warn "port died because of <@_>\n" };
592              
593             Example: kill ourselves when C<$port> is killed abnormally.
594              
595             mon $port;
596              
597             Example: send us a restart message when another C<$port> is killed.
598              
599             mon $port, $self => "restart";
600              
601             =cut
602              
603             sub mon {
604             my ($nodeid, $port) = split /#/, shift, 2;
605              
606             my $node = $NODE{$nodeid} || add_node $nodeid;
607              
608             my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,';
609              
610             unless (ref $cb) {
611             if (@_) {
612             # send a kill info message
613             my (@msg) = ($cb, @_);
614             $cb = sub { snd @msg, @_ };
615             } else {
616             # simply kill other port
617             my $port = $cb;
618             $cb = sub { kil $port, @_ if @_ };
619             }
620             }
621              
622             $node->monitor ($port, $cb);
623              
624             defined wantarray
625             and ($cb += 0, AnyEvent::Util::guard { $node->unmonitor ($port, $cb) })
626             }
627              
628             =item $guard = mon_guard $port, $ref, $ref...
629              
630             Monitors the given C<$port> and keeps the passed references. When the port
631             is killed, the references will be freed.
632              
633             Optionally returns a guard that will stop the monitoring.
634              
635             This function is useful when you create e.g. timers or other watchers and
636             want to free them when the port gets killed (note the use of C<psub>):
637              
638             rcv $port, start => sub {
639             my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
640             undef $timer if 0.9 < rand;
641             };
642             };
643              
644             =cut
645              
646             sub mon_guard {
647             my ($port, @refs) = @_;
648              
649             #TODO: mon-less form?
650              
651             mon $port, sub { 0 && @refs }
652             }
653              
654             =item kil $port[, @reason]
655              
656             Kill the specified port with the given C<@reason>.
657              
658             If no C<@reason> is specified, then the port is killed "normally" -
659             monitor callbacks will be invoked, but the kil will not cause linked ports
660             (C<mon $mport, $lport> form) to get killed.
661              
662             If a C<@reason> is specified, then linked ports (C<mon $mport, $lport>
663             form) get killed with the same reason.
664              
665             Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
666             will be reported as reason C<< die => $@ >>.
667              
668             Transport/communication errors are reported as C<< transport_error =>
669             $message >>.
670              
671             =cut
672              
673             =item $port = spawn $node, $initfunc[, @initdata]
674              
675             Creates a port on the node C<$node> (which can also be a port ID, in which
676             case it's the node where that port resides).
677              
678             The port ID of the newly created port is returned immediately, and it is
679             possible to immediately start sending messages or to monitor the port.
680              
681             After the port has been created, the init function is called on the remote
682             node, in the same context as a C<rcv> callback. This function must be a
683             fully-qualified function name (e.g. C<MyApp::Chat::Server::init>). To
684             specify a function in the main program, use C<::name>.
685              
686             If the function doesn't exist, then the node tries to C<require>
687             the package, then the package above the package and so on (e.g.
688             C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
689             exists or it runs out of package names.
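
The search order for the package to C<require> can be sketched as below. C<candidate_packages> is a hypothetical helper that only lists the package names in the order described; it neither calls C<require> nor checks whether the function exists.

```perl
use strict;
use warnings;

# Hypothetical helper: list the packages that would be tried, in order,
# for a fully-qualified function name.
sub candidate_packages {
    my ($func) = @_;

    my @parts = split /::/, $func;
    pop @parts;                          # drop the function name itself

    my @candidates;
    while (@parts) {
        push @candidates, join "::", @parts;
        pop @parts;                      # move up to the enclosing package
    }

    return @candidates;
}

print "$_\n" for candidate_packages("MyApp::Chat::Server::init");
```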
690              
691             The init function is then called with the newly-created port as context
692             object (C<$SELF>) and the C<@initdata> values as arguments. It I<must>
693             call one of the C<rcv> functions to set callbacks on C<$SELF>, otherwise
694             the port might not get created.
695              
696             A common idiom is to pass a local port, immediately monitor the spawned
697             port, and in the remote init function, immediately monitor the passed
698             local port. This two-way monitoring ensures that both ports get cleaned up
699             when there is a problem.
700              
701             C<spawn> guarantees that the C<$initfunc> has no visible effects on the
702             caller before C<spawn> returns (by delaying invocation when spawn is
703             called for the local node).
704              
705             Example: spawn a chat server port on C<$othernode>.
706              
707             # this node, executed from within a port context:
708             my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF;
709             mon $server;
710              
711             # init function on $othernode
712             sub connect {
713             my ($srcport) = @_;
714              
715             mon $srcport;
716              
717             rcv $SELF, sub {
718             ...
719             };
720             }
721              
722             =cut
723              
724             sub _spawn {
725             my $port = shift;
726             my $init = shift;
727              
728             # rcv will create the actual port
729             local $SELF = "$NODE#$port";
730             eval {
731             &{ load_func $init }
732             };
733             _self_die if $@;
734             }
735              
736             sub spawn(@) {
737             my ($nodeid, undef) = split /#/, shift, 2;
738              
739             my $id = "$RUNIQ." . $ID++;
740              
741             $_[0] =~ /::/
742             or Carp::croak "spawn init function must be a fully-qualified name, caught";
743              
744             snd_to_func $nodeid, "AnyEvent::MP::_spawn" => $id, @_;
745              
746             "$nodeid#$id"
747             }
748              
749             =item after $timeout, @msg
750              
751             =item after $timeout, $callback
752              
753             Either sends the given message, or calls the given callback, after the
754             specified number of seconds.
755              
756             This is simply a utility function that comes in handy at times - the
757             AnyEvent::MP author is not convinced of the wisdom of having it, though,
758             so it may go away in the future.
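
                Example: both forms side by side. C<$port> is assumed to be an existing
                port ID, and the C<timeout> message tag is only a placeholder.

                    # send a message to $port after 30 seconds
                    after 30, $port, timeout => "no reply within 30 seconds";

                    # call a code reference after half a second
                    after 0.5, sub { warn "half a second elapsed\n" };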
759              
760             =cut
761              
762             sub after($@) {
763             my ($timeout, @action) = @_;
764              
765             my $t; $t = AE::timer $timeout, 0, sub {
766             undef $t;
767             ref $action[0]
768             ? $action[0]()
769             : snd @action;
770             };
771             }
772              
773             =item cal $port, @msg, $callback[, $timeout]
774              
775             A simple form of RPC - sends a message to the given C<$port> with the
776             given contents (C<@msg>), but adds a reply port to the message.
777              
778             The reply port is created temporarily just for the purpose of receiving
779             the reply, and will be C<kil>ed when no longer needed.
780              
781             A reply message sent to the port is passed to the C<$callback> as-is.
782              
783             If an optional time-out (in seconds) is given and it is not C<undef>,
784             then the callback will be called without any arguments after the time-out
785             has elapsed, and the port is C<kil>ed.
786              
787             If no time-out is given (or it is C<undef>), then the local port will
788             monitor the remote port instead, so it eventually gets cleaned up.
789              
790             Currently this function returns the temporary port, but this "feature"
791             might go away in future versions unless you can make a convincing case that
792             this is indeed useful for something.
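
                Example: a request with a ten second time-out, together with a matching
                receiver. This is a sketch only: C<$server>, the C<lookup> and C<found>
                message tags and the reply contents are assumptions made for
                illustration.

                    # client side: ask $server to look up "alice"
                    cal $server, lookup => "alice", sub {
                       # called without arguments when the time-out fires
                       @_ or return warn "lookup timed out\n";

                       my ($status, $value) = @_;
                       warn "lookup replied: $status $value\n";
                    }, 10;

                    # server side: the reply port arrives as the last message element
                    rcv $SELF, lookup => sub {
                       my ($name, $reply_port) = @_;

                       snd $reply_port, found => "data for $name";
                    };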
793              
794             =cut
795              
796             sub cal(@) {
797             my $timeout = ref $_[-1] ? undef : pop;
798             my $cb = pop;
799              
800             my $port = port {
801             undef $timeout;
802             kil $SELF;
803             &$cb;
804             };
805              
806             if (defined $timeout) {
807             $timeout = AE::timer $timeout, 0, sub {
808             undef $timeout;
809             kil $port;
810             $cb->();
811             };
812             } else {
813             mon $_[0], sub {
814             kil $port;
815             $cb->();
816             };
817             }
818              
819             push @_, $port;
820             &snd;
821              
822             $port
823             }
824              
825             =back
826              
827             =head1 AnyEvent::MP vs. Distributed Erlang
828              
829             AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node
830             == aemp node, Erlang process == aemp port), so many of the documents and
831             programming techniques employed by Erlang apply to AnyEvent::MP. Here is a
832             sample:
833              
834             http://www.erlang.se/doc/programming_rules.shtml
835             http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
836             http://erlang.org/download/erlang-book-part1.pdf # chapters 5 and 6
837             http://erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5
838              
839             Despite the similarities, there are also some important differences:
840              
841             =over 4
842              
843             =item * Node IDs are arbitrary strings in AEMP.
844              
845             Erlang relies on special naming and DNS to work everywhere in the same
846             way. AEMP relies on each node somehow knowing its own address(es) (e.g. by
847             configuration or DNS), and possibly the addresses of some seed nodes, but
848             will otherwise discover other nodes (and their IDs) itself.
849              
850             =item * Erlang has a "remote ports are like local ports" philosophy, AEMP
851             uses "local ports are like remote ports".
852              
853             The failure modes for local ports are quite different (runtime errors
854             only) than for remote ports - when a local port dies, you I<know> it dies,
855             when a connection to another node dies, you know nothing about the other
856             port.
857              
858             Erlang pretends remote ports are as reliable as local ports, even when
859             they are not.
860              
861             AEMP encourages a "treat remote ports differently" philosophy, with local
862             ports being the special case/exception, where transport errors cannot
863             occur.
864              
865             =item * Erlang uses processes and a mailbox, AEMP does not queue.
866              
867             Erlang uses processes that selectively receive messages, and therefore
868             needs a queue. AEMP is event based, so queuing messages would serve no
869             useful purpose. For the same reason the pattern-matching abilities of
870             AnyEvent::MP are more limited, as there is little need to be able to
871             filter messages without dequeuing them.
872              
873             (But see L<Coro::MP> for a more Erlang-like process model on top of AEMP).
874              
875             =item * Erlang sends are synchronous, AEMP sends are asynchronous.
876              
877             Sending messages in Erlang is synchronous and blocks the process (and
878             so does not need a queue that can overflow). AEMP sends are immediate,
879             connection establishment is handled in the background.
880              
881             =item * Erlang suffers from silent message loss, AEMP does not.
882              
883             Erlang implements few guarantees on message delivery - messages can get
884             lost without any of the processes realising it (i.e. you send messages a,
885             b, and c, and the other side only receives messages a and c).
886              
887             AEMP guarantees (modulo hardware errors) correct ordering, and the
888             guarantee that after one message is lost, all following ones sent to the
889             same port are lost as well, until monitoring raises an error, so there are
890             no silent "holes" in the message sequence.
891              
892             =item * Erlang can send messages to the wrong port, AEMP does not.
893              
894             In Erlang it is quite likely that a node that restarts reuses a process ID
895             known to other nodes for a completely different process, causing messages
896             destined for that process to end up in an unrelated process.
897              
898             AEMP never reuses port IDs, so old messages or old port IDs floating
899             around in the network will not be sent to an unrelated port.
900              
901             =item * Erlang uses unprotected connections, AEMP uses secure
902             authentication and can use TLS.
903              
904             AEMP can use a proven protocol - TLS - to protect connections and
905             securely authenticate nodes.
906              
907             =item * The AEMP protocol is optimised for both text-based and binary
908             communications.
909              
910             The AEMP protocol, unlike the Erlang protocol, supports both programming
911             language independent text-only protocols (good for debugging) and binary,
912             language-specific serialisers (e.g. Storable). By default, unless TLS is
913             used, the protocol is actually completely text-based.
914              
915             It has also been carefully designed to be implementable in other languages
916             with a minimum of work while gracefully degrading functionality to make the
917             protocol simple.
918              
919             =item * AEMP has more flexible monitoring options than Erlang.
920              
921             In Erlang, you can choose to receive I<all> exit signals as messages
922             or I<none>, there is no in-between, so monitoring single processes is
923             difficult to implement. Monitoring in AEMP is more flexible than in
924             Erlang, as one can choose between automatic kill, exit message or callback
925             on a per-process basis.
926              
927             =item * Erlang tries to hide remote/local connections, AEMP does not.
928              
929             Monitoring in Erlang is not an indicator of process death/crashes, in the
930             same way as linking is (except linking is unreliable in Erlang).
931              
932             In AEMP, you don't "look up" registered port names or send to named ports
933             that might or might not be persistent. Instead, you normally spawn a port
934             on the remote node. The init function monitors you, and you monitor the
935             remote port. Since both monitors are local to the node, they are much more
936             reliable (no need for C<spawn_link>).
937              
938             This also saves round-trips and avoids sending messages to the wrong port
939             (hard to do in Erlang).
940              
941             =back
942              
943             =head1 RATIONALE
944              
945             =over 4
946              
947             =item Why strings for port and node IDs, why not objects?
948              
949             We considered "objects", but found that the actual number of methods
950             that can be called is quite low. Since port and node IDs travel over
951             the network frequently, the serialising/deserialising would add lots of
952             overhead, as well as having to keep a proxy object everywhere.
953              
954             Strings can easily be printed, easily serialised etc. and need no special
955             procedures to be "valid".
956              
957             And as a result, a port with just a default receiver consists of a single
958             code reference stored in a global hash - it can't become much cheaper.
959              
960             =item Why favour JSON, why not a real serialising format such as Storable?
961              
962             In fact, any AnyEvent::MP node will happily accept Storable as framing
963             format, but currently there is no way to make a node use Storable by
964             default (although all nodes will accept it).
965              
966             The default framing protocol is JSON because a) JSON::XS is many times
967             faster for small messages and b) most importantly, after years of
968             experience we found that object serialisation is causing more problems
969             than it solves: Just like function calls, objects simply do not travel
970             easily over the network, mostly because they will always be a copy, so you
971             always have to re-think your design.
972              
973             Keeping your messages simple, concentrating on data structures rather than
974             objects, will keep your messages clean, tidy and efficient.
975              
976             =back
977              
978             =head1 SEE ALSO
979              
980             L<AnyEvent::MP::Intro> - a gentle introduction.
981              
982             L<AnyEvent::MP::Kernel> - more, lower-level, stuff.
983              
984             L<AnyEvent::MP::Global> - network maintenance and port groups, to find
985             your applications.
986              
987             L<AnyEvent::MP::DataConn> - establish data connections between nodes.
988              
989             L<AnyEvent::MP::LogCatcher> - simple service to display log messages from
990             all nodes.
991              
992             L<AnyEvent>.
993              
994             =head1 AUTHOR
995              
996             Marc Lehmann <schmorp@schmorp.de>
997             http://home.schmorp.de/
998              
999             =cut
1000              
1001             1
1002