File Coverage

blib/lib/AnyEvent/MP.pm
Criterion    Covered  Total      %
statement         24    145   16.5
branch             0     60    0.0
condition          0     14    0.0
subroutine         8     33   24.2
pod               10     10  100.0
total             42    262   16.0


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::MP - erlang-style multi-processing/message-passing framework
4              
5             =head1 SYNOPSIS
6              
7             use AnyEvent::MP;
8              
9             $NODE # contains this node's node ID
10             NODE # returns this node's node ID
11              
12             $SELF # receiving/own port id in rcv callbacks
13              
14             # initialise the node so it can send/receive messages
15             configure;
16              
17             # ports are message destinations
18              
19             # sending messages
20             snd $port, type => data...;
21             snd $port, @msg;
22             snd @msg_with_first_element_being_a_port;
23              
24             # creating/using ports, the simple way
25             my $simple_port = port { my @msg = @_ };
26              
27             # creating/using ports, tagged message matching
28             my $port = port;
29             rcv $port, ping => sub { snd $_[0], "pong" };
30             rcv $port, pong => sub { warn "pong received\n" };
31              
32             # create a port on another node
33             my $port = spawn $node, $initfunc, @initdata;
34              
35             # destroy a port again
36             kil $port; # "normal" kill
37             kil $port, my_error => "everything is broken"; # error kill
38              
39             # monitoring
40             mon $port, $cb->(@msg) # callback is invoked on death
41             mon $port, $localport # kill localport on abnormal death
42             mon $port, $localport, @msg # send message on death
43              
44             # temporarily execute code in port context
45             peval $port, sub { die "kill the port!" };
46              
47             # execute callbacks in $SELF port context
48             my $timer = AE::timer 1, 0, psub {
49             die "kill the port, delayed";
50             };
51              
52             # distributed database - modification
53             db_set $family => $subkey [=> $value] # add a subkey
54             db_del $family => $subkey... # delete one or more subkeys
55             db_reg $family => $port [=> $value] # register a port
56              
57             # distributed database - queries
58             db_family $family => $cb->(\%familyhash)
59             db_keys $family => $cb->(\@keys)
60             db_values $family => $cb->(\@values)
61              
62             # distributed database - monitoring a family
63             db_mon $family => $cb->(\%familyhash, \@added, \@changed, \@deleted)
64              
65             =head1 DESCRIPTION
66              
67             This module (-family) implements a simple message passing framework.
68              
69             Despite its simplicity, you can securely message other processes running
70             on the same or other hosts, and you can supervise entities remotely.
71              
72             For an introduction to this module family, see the L
73             manual page and the examples under F.
74              
75             =head1 CONCEPTS
76              
77             =over 4
78              
79             =item port
80              
81             Not to be confused with a TCP port, a "port" is something you can send
82             messages to (with the C<snd> function).
83              
84             Ports allow you to register C<rcv> handlers that can match all or just
85             some messages. Messages sent to ports will not be queued, regardless of
86             whether anything was listening for them or not.
87              
88             Ports are represented by (printable) strings called "port IDs".
89              
90             =item port ID - C<nodeid#portname>
91              
92             A port ID is the concatenation of a node ID, a hash-mark (C<#>)
93             as separator, and a port name (a printable string of unspecified
94             format created by AnyEvent::MP).
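
As a rough illustration, a port ID can be taken apart with a two-way split on the first hash-mark, which is the same approach C<rcv> and C<mon> use further down in this file. The helper name C<split_port_id> and the sample ID are made up for this sketch; they are not part of AnyEvent::MP.

```perl
use strict;
use warnings;

# Hypothetical helper: split a port ID into (node ID, port name).
# The limit of 2 keeps any further '#' characters inside the port name.
sub split_port_id {
    my ($port_id) = @_;
    my ($node_id, $port_name) = split /#/, $port_id, 2;
    return ($node_id, $port_name);
}

# Invented sample port ID, for illustration only.
my ($node_id, $port_name) = split_port_id("example.org:4040#a1b2");
print "node=$node_id port=$port_name\n";
```

Since port names have an unspecified format, code outside the module should probably treat the second part as opaque.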
95              
96             =item node
97              
98             A node is a single process containing at least one port - the node port,
99             which enables nodes to manage each other remotely, and to create new
100             ports.
101              
102             Nodes are either public (have one or more listening ports) or private
103             (no listening ports). Private nodes cannot talk to other private nodes
104             currently, but all nodes can talk to public nodes.
105              
106             Nodes are represented by (printable) strings called "node IDs".
107              
108             =item node ID - C<[A-Za-z0-9_\-.:]*>
109              
110             A node ID is a string that uniquely identifies the node within a
111             network. Depending on the configuration used, node IDs can look like a
112             hostname, a hostname and a port, or a random string. AnyEvent::MP itself
113             doesn't interpret node IDs in any way except to uniquely identify a node.
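
A small sketch of checking a string against the character class given in the item heading, taken literally. The function name C<looks_like_node_id> and the sample strings are assumptions for this example, and the module itself may well accept or produce IDs that this check rejects.

```perl
use strict;
use warnings;

# Hypothetical check: does the whole string consist only of the characters
# listed in the heading above? \A and \z anchor the match to the full string.
# Note that the pattern, as written in the heading, also matches the empty string.
sub looks_like_node_id {
    my ($id) = @_;
    return $id =~ /\A[A-Za-z0-9_\-.:]*\z/ ? 1 : 0;
}

for my $candidate ("host.example:4040", "worker_7", "bad id#1") {
    printf "%-20s %s\n", $candidate,
        looks_like_node_id($candidate) ? "matches" : "does not match";
}
```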
114              
115             =item binds - C
116              
117             Nodes can only talk to each other by creating some kind of connection to
118             each other. To do this, nodes should listen on one or more local transport
119             endpoints - binds.
120              
121             Currently, only standard C specifications can be used, which
122             specify TCP ports to listen on. So a bind is basically just a TCP socket
123             in listening mode that accepts connections from other nodes.
124              
125             =item seed nodes
126              
127             When a node starts, it knows nothing about the network it is in - it
128             needs to connect to at least one other node that is already in the
129             network. These other nodes are called "seed nodes".
130              
131             Seed nodes themselves are not special - they are seed nodes only because
132             some other node I<uses> them as such, but any node can be used as seed
133             node for other nodes, and each node can use a different set of seed nodes.
134              
135             In addition to discovering the network, seed nodes are also used to
136             maintain the network - all nodes using the same seed node are part of the
137             same network. If a network is split into multiple subnets because e.g. the
138             network link between the parts goes down, then using the same seed nodes
139             for all nodes ensures that eventually the subnets get merged again.
140              
141             Seed nodes are expected to be long-running, and at least one seed node
142             should always be available. They should also be relatively responsive - a
143             seed node that blocks for long periods will slow down everybody else.
144              
145             For small networks, it's best if every node uses the same set of seed
146             nodes. For large networks, it can be useful to specify "regional" seed
147             nodes for most nodes in an area, and use all seed nodes as seed nodes for
148             each other. What's important is that all seed node connections form a
149             complete graph, so that the network cannot split into separate subnets
150             forever.
151              
152             Seed nodes are represented by seed IDs.
153              
154             =item seed IDs - C
155              
156             Seed IDs are transport endpoint(s) (usually a hostname/IP address and a
157             TCP port) of nodes that should be used as seed nodes.
158              
159             =item global nodes
160              
161             An AEMP network needs a discovery service - nodes need to know how to
162             connect to other nodes they only know by name. In addition, AEMP offers a
163             distributed "group database", which maps group names to a list of strings
164             - for example, to register worker ports.
165              
166             A network needs at least one global node to work, and allows every node to
167             be a global node.
168              
169             Any node that loads the L module becomes a global
170             node and tries to keep connections to all other nodes. So while it can
171             make sense to make every node "global" in small networks, it usually makes
172             sense to only make seed nodes into global nodes in large networks (nodes
173             keep connections to seed nodes and global nodes, so making them the same
174             reduces overhead).
175              
176             =back
177              
178             =head1 VARIABLES/FUNCTIONS
179              
180             =over 4
181              
182             =cut
183              
184             package AnyEvent::MP;
185              
186 1     1   621 use AnyEvent::MP::Config ();
  1         2  
  1         19  
187 1     1   4 use AnyEvent::MP::Kernel;
  1         1  
  1         54  
188 1         160 use AnyEvent::MP::Kernel qw(
189             %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
190             add_node load_func
191              
192             NODE $NODE
193             configure
194             node_of port_is_local
195             snd kil
196             db_set db_del
197             db_mon db_family db_keys db_values
198 1     1   4 );
  1         2  
199              
200 1     1   5 use common::sense;
  1         1  
  1         5  
201              
202 1     1   32 use Carp ();
  1         1  
  1         11  
203              
204 1     1   3 use AnyEvent ();
  1         1  
  1         9  
205 1     1   4 use Guard ();
  1         1  
  1         13  
206              
207 1     1   4 use base "Exporter";
  1         1  
  1         2006  
208              
209             our $VERSION = '2.02'; # also in MP/Config.pm
210              
211             our @EXPORT = qw(
212             NODE $NODE
213             configure
214             node_of port_is_local
215             snd kil
216             db_set db_del
217             db_mon db_family db_keys db_values
218              
219             *SELF
220              
221             port rcv mon mon_guard psub peval spawn cal
222             db_set db_del db_reg
223             db_mon db_family db_keys db_values
224              
225             after
226             );
227              
228             our $SELF;
229              
230             sub _self_die() {
231 0     0     my $msg = $@;
232 0 0         $msg =~ s/\n+$// unless ref $msg;
233 0           kil $SELF, die => $msg;
234             }
235              
236             =item $thisnode = NODE / $NODE
237              
238             The C<NODE> function returns, and the C<$NODE> variable contains, the node
239             ID of the node running in the current process. This value is initialised by
240             a call to C<configure>.
241              
242             =item $nodeid = node_of $port
243              
244             Extracts and returns the node ID from a port ID or a node ID.
245              
246             =item $is_local = port_is_local $port
247              
248             Returns true iff the port is a local port.
249              
250             =item configure $profile, key => value...
251              
252             =item configure key => value...
253              
254             Before a node can talk to other nodes on the network (i.e. enter
255             "distributed mode") it has to configure itself - the minimum a node needs
256             to know is its own name, and optionally it should know the addresses of
257             some other nodes in the network to discover other nodes.
258              
259             This function configures a node - it must be called exactly once (or
260             never) before calling other AnyEvent::MP functions.
261              
262             The key/value pairs are basically the same ones as documented for the
263             F<aemp> command line utility (sans the set/del prefix), with these additions:
264              
265             =over 4
266              
267             =item norc => $boolean (default false)
268              
269             If true, then the rc file (e.g. F<~/.perl-anyevent-mp>) will I<not>
270             be consulted - all configuration options must be specified in the
271             C<configure> call.
272              
273             =item force => $boolean (default false)
274              
275             If true, then the values specified in the C<configure> call will take
276             precedence over any values configured via the rc file. The default is for
277             the rc file to override any options specified in the program.
278              
279             =back
280              
281             =over 4
282              
283             =item step 1, gathering configuration from profiles
284              
285             The function first looks up a profile in the aemp configuration (see the
286             L<aemp> commandline utility). The profile name can be specified via the
287             named C<profile> parameter or can simply be the first parameter. If it is
288             missing, then the nodename (F) will be used as profile name.
289              
290             The profile data is then gathered as follows:
291              
292             First, all remaining key => value pairs (all of which are conveniently
293             undocumented at the moment) will be interpreted as configuration
294             data. Then they will be overwritten by any values specified in the global
295             default configuration (see the F<aemp> utility), then the chain of
296             profiles chosen by the profile name (and any C attributes).
297              
298             That means that the values specified in the profile have highest priority
299             and the values specified directly via C<configure> have lowest priority,
300             and can only be used to specify defaults.
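
The default priority order described here can be pictured as a series of hash merges in which later sources win. This is only a sketch of the documented ordering, not the module's actual code; the three hashes, the key C<some_timeout> and all values are invented for illustration.

```perl
use strict;
use warnings;

# Invented sample data for the three sources named in the text.
my %configure_args  = (binds => ["*:4040"], some_timeout => 30);
my %global_defaults = (some_timeout => 60);
my %profile         = (binds => ["10.0.0.1:4040"]);

# In a list assignment to a hash, later pairs overwrite earlier ones,
# so the profile ends up with the highest priority and the
# configure arguments only act as defaults.
my %effective = (%configure_args, %global_defaults, %profile);

print "some_timeout = $effective{some_timeout}\n";
print "binds        = @{ $effective{binds} }\n";
```

With the C<force> option set, the text above says the values passed to C<configure> take precedence instead, which in this picture would correspond to moving them to the end of the merge list.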
301              
302             If the profile specifies a node ID, then this will become the node ID of
303             this process. If not, then the profile name will be used as node ID, with
304             a unique random string (C) appended.
305              
306             The node ID can contain some C<%> sequences that are expanded: C<%n>
307             is expanded to the local nodename, C<%u> is replaced by a random
308             string to make the node unique. For example, the F<aemp> commandline
309             utility uses C as nodename, which might expand to
310             C.
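
The C<%n> and C<%u> expansion can be sketched roughly as follows. Both helper names are hypothetical, C<Sys::Hostname> is only an assumed stand-in for however the module determines the local nodename, and the length and alphabet of the random string are arbitrary choices for this example.

```perl
use strict;
use warnings;
use Sys::Hostname ();

# Hypothetical stand-in for the module's random-string generator.
sub random_suffix {
    my @chars = ("A" .. "Z", "a" .. "z", 0 .. 9);
    return join "", map { $chars[rand @chars] } 1 .. 12;
}

# Expand %n (local nodename) and %u (random string) in a node ID template.
sub expand_node_id {
    my ($template) = @_;
    my %expansion = (
        n => sub { Sys::Hostname::hostname() },
        u => \&random_suffix,
    );
    (my $id = $template) =~ s/%([nu])/$expansion{$1}->()/ge;
    return $id;
}

print expand_node_id("myscript/%n/%u"), "\n";
```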
311              
312             =item step 2, bind listener sockets
313              
314             The next step is to look up the binds in the profile, followed by binding
315             aemp protocol listeners on all binds specified (it is possible and valid
316             to have no binds, meaning that the node cannot be contacted from the
317             outside. This means the node cannot talk to other nodes that also have no
318             binds, but it can still talk to all "normal" nodes).
319              
320             If the profile does not specify a binds list, then a default of C<*> is
321             used, meaning the node will bind on a dynamically-assigned port on every
322             local IP address it finds.
323              
324             =item step 3, connect to seed nodes
325              
326             As the last step, the seed ID list from the profile is passed to the
327             L module, which will then use it to keep
328             connectivity with at least one node at any point in time.
329              
330             =back
331              
332             Example: become a distributed node using the local node name as profile.
333             This should be the most common form of invocation for "daemon"-type nodes.
334              
335             configure
336              
337             Example: become a semi-anonymous node. This form is often used for
338             commandline clients.
339              
340             configure nodeid => "myscript/%n/%u";
341              
342             Example: configure a node using a profile called seed, which is suitable
343             for a seed node as it binds on all local addresses on a fixed port (4040,
344             customary for aemp).
345              
346             # use the aemp commandline utility
347             # aemp profile seed binds '*:4040'
348              
349             # then use it
350             configure profile => "seed";
351              
352             # or simply use aemp from the shell again:
353             # aemp run profile seed
354              
355             # or provide a nicer-to-remember nodeid
356             # aemp run profile seed nodeid "$(hostname)"
357              
358             =item $SELF
359              
360             Contains the current port ID while executing C<rcv> callbacks or C<psub>
361             blocks.
362              
363             =item *SELF, SELF, %SELF, @SELF...
364              
365             Due to some quirks in how perl exports variables, it is impossible to
366             just export C<$SELF>; instead, all the symbols named C<SELF> are exported
367             by this module, but only C<$SELF> is currently used.
368              
369             =item snd $port, type => @data
370              
371             =item snd $port, @msg
372              
373             Send the given message to the given port, which can identify either a
374             local or a remote port, and must be a port ID.
375              
376             While the message can be almost anything, it is highly recommended to
377             use a string as first element (a port ID, or some word that indicates a
378             request type etc.) and to consist of only simple perl values (scalars,
379             arrays, hashes) - if you think you need to pass an object, think again.
380              
381             The message data logically becomes read-only after a call to this
382             function: modifying any argument (or values referenced by them) is
383             forbidden, as there can be considerable time between the call to C<snd>
384             and the time the message is actually being serialised - in fact, it might
385             never be copied as within the same process it is simply handed to the
386             receiving port.
387              
388             The type of data you can transfer depends on the transport protocol: when
389             JSON is used, then only strings, numbers and arrays and hashes consisting
390             of those are allowed (no objects). When Storable is used, then anything
391             that Storable can serialise and deserialise is allowed, and for the local
392             node, anything can be passed. Best rely only on the common denominator of
393             these.
394              
395             =item $local_port = port
396              
397             Creates a new local port object and returns its port ID. Initially it has
398             no callbacks set and will throw an error when it receives messages.
399              
400             =item $local_port = port { my @msg = @_ }
401              
402             Creates a new local port, and returns its ID. Semantically the same as
403             creating a port and calling C<rcv> on it.
404              
405             The block will be called for every message received on the port, with the
406             global variable C<$SELF> set to the port ID. Runtime errors will cause the
407             port to be C<kil>ed. The message will be passed as-is, no extra argument
408             (i.e. no port ID) will be passed to the callback.
409              
410             If you want to stop/destroy the port, simply C<kil> it:
411              
412             my $port = port {
413             my @msg = @_;
414             ...
415             kil $SELF;
416             };
417              
418             =cut
419              
420             sub rcv($@);
421              
422             my $KILME = sub {
423             (my $tag = substr $_[0], 0, 30) =~ s/([^\x20-\x7e])/./g;
424             kil $SELF, unhandled_message => "no callback found for message '$tag'";
425             };
426              
427             sub port(;&) {
428 0     0 1   my $id = $UNIQ . ++$ID;
429 0           my $port = "$NODE#$id";
430              
431 0   0       rcv $port, shift || $KILME;
432              
433 0           $port
434             }
435              
436             =item rcv $local_port, $callback->(@msg)
437              
438             Replaces the default callback on the specified port. There is no way to
439             remove the default callback: use C<sub { }> to disable it, or better
440             C<kil> the port when it is no longer needed.
441              
442             The global C<$SELF> (exported by this module) contains C<$port> while
443             executing the callback. Runtime errors during callback execution will
444             result in the port being C<kil>ed.
445              
446             The default callback receives all messages not matched by a more specific
447             C<rcv> match.
448              
449             =item rcv $local_port, tag => $callback->(@msg_without_tag), ...
450              
451             Register (or replace) callbacks to be called on messages starting with the
452             given tag on the given port (and return the port), or unregister it (when
453             C<$callback> is C<undef> or missing). There can only be one callback
454             registered for each tag.
455              
456             The original message will be passed to the callback, after the first
457             element (the tag) has been removed. The callback will use the same
458             environment as the default callback (see above).
459              
460             Example: create a port and bind receivers on it in one go.
461              
462             my $port = rcv port,
463             msg1 => sub { ... },
464             msg2 => sub { ... },
465             ;
466              
467             Example: create a port, bind receivers and send it in a message elsewhere
468             in one go:
469              
470             snd $otherport, reply =>
471             rcv port,
472             msg1 => sub { ... },
473             ...
474             ;
475              
476             Example: temporarily register a rcv callback for a tag matching some port
477             (e.g. for an rpc reply) and unregister it after a message was received.
478              
479             rcv $port, $otherport => sub {
480             my @reply = @_;
481              
482             rcv $SELF, $otherport;
483             };
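
To make the matching rules concrete, here is a toy model of a single port in plain Perl: one default callback plus a table of tagged callbacks. It only mimics the behaviour described above and does not use AnyEvent::MP; C<deliver>, C<@log> and the sample messages are invented for the sketch.

```perl
use strict;
use warnings;

# Toy model of one port: a default callback and a tag => callback table.
my @log;
my $default = sub { push @log, "default: @_" };
my %tagged  = (
    ping => sub { push @log, "ping handler: @_" },
);

# Deliver a message: if the first element matches a registered tag, the tag
# is removed and the tagged callback runs; otherwise the default callback
# receives the message unchanged.
sub deliver {
    my @msg = @_;
    my $cb  = defined $msg[0] ? $tagged{ $msg[0] } : undef;
    if ($cb) {
        shift @msg;
        $cb->(@msg);
    } else {
        $default->(@msg);
    }
}

deliver(ping => "payload");
deliver(other => 1, 2);
delete $tagged{ping};    # comparable to unregistering a tag via rcv
deliver(ping => "payload");

print "$_\n" for @log;
```

After the tag is removed, the same C<ping> message falls through to the default callback with its tag still in place.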
484              
485             =cut
486              
487             sub rcv($@) {
488 0     0 1   my $port = shift;
489 0           my ($nodeid, $portid) = split /#/, $port, 2;
490              
491 0 0         $nodeid eq $NODE
492             or Carp::croak "$port: rcv can only be called on local ports, caught";
493              
494 0           while (@_) {
495 0 0         if (ref $_[0]) {
    0          
496 0 0         if (my $self = $PORT_DATA{$portid}) {
497 0 0         "AnyEvent::MP::Port" eq ref $self
498             or Carp::croak "$port: rcv can only be called on message matching ports, caught";
499              
500 0           $self->[0] = shift;
501             } else {
502 0           my $cb = shift;
503             $PORT{$portid} = sub {
504 0     0     local $SELF = $port;
505 0 0         eval { &$cb }; _self_die if $@;
  0            
  0            
506 0           };
507             }
508             } elsif (defined $_[0]) {
509 0   0       my $self = $PORT_DATA{$portid} ||= do {
510 0   0 0     my $self = bless [$PORT{$portid} || sub { }, { }, $port], "AnyEvent::MP::Port";
511              
512             $PORT{$portid} = sub {
513 0     0     local $SELF = $port;
514              
515 0 0         if (my $cb = $self->[1]{$_[0]}) {
516 0           shift;
517 0 0         eval { &$cb }; _self_die if $@;
  0            
  0            
518             } else {
519 0           &{ $self->[0] };
  0            
520             }
521 0           };
522              
523 0           $self
524             };
525              
526 0 0         "AnyEvent::MP::Port" eq ref $self
527             or Carp::croak "$port: rcv can only be called on message matching ports, caught";
528              
529 0           my ($tag, $cb) = splice @_, 0, 2;
530              
531 0 0         if (defined $cb) {
532 0           $self->[1]{$tag} = $cb;
533             } else {
534 0           delete $self->[1]{$tag};
535             }
536             }
537             }
538              
539             $port
540 0           }
541              
542             =item peval $port, $coderef[, @args]
543              
544             Evaluates the given C<$coderef> within the context of C<$port>, that is,
545             when the code throws an exception the C<$port> will be killed.
546              
547             Any remaining args will be passed to the callback. Any return values will
548             be returned to the caller.
549              
550             This is useful when you temporarily want to execute code in the context of
551             a port.
552              
553             Example: create a port and run some initialisation code in its context.
554              
555             my $port = port { ... };
556              
557             peval $port, sub {
558             init
559             or die "unable to init";
560             };
561              
562             =cut
563              
564             sub peval($$) {
565 0     0 1   local $SELF = shift;
566 0           my $cb = shift;
567              
568 0 0         if (wantarray) {
569 0           my @res = eval { &$cb };
  0            
570 0 0         _self_die if $@;
571             @res
572 0           } else {
573 0           my $res = eval { &$cb };
  0            
574 0 0         _self_die if $@;
575 0           $res
576             }
577             }
578              
579             =item $closure = psub { BLOCK }
580              
581             Remembers C<$SELF> and creates a closure out of the BLOCK. When the
582             closure is executed, sets up the environment in the same way as in C<rcv>
583             callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
584              
585             The effect is basically as if it returned C<< sub { peval $SELF, sub {
586             BLOCK }, @_ } >>.
587              
588             This is useful when you register callbacks from C<rcv> callbacks:
589              
590             rcv delayed_reply => sub {
591             my ($delay, @reply) = @_;
592             my $timer = AE::timer $delay, 0, psub {
593             snd @reply, $SELF;
594             };
595             };
596              
597             =cut
598              
599             sub psub(&) {
600 0     0 1   my $cb = shift;
601              
602 0 0         my $port = $SELF
603             or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
604              
605             sub {
606 0     0     local $SELF = $port;
607              
608 0 0         if (wantarray) {
609 0           my @res = eval { &$cb };
  0            
610 0 0         _self_die if $@;
611             @res
612 0           } else {
613 0           my $res = eval { &$cb };
  0            
614 0 0         _self_die if $@;
615 0           $res
616             }
617             }
618 0           }
619              
620             =item $guard = mon $port, $rcvport # kill $rcvport when $port dies
621              
622             =item $guard = mon $port # kill $SELF when $port dies
623              
624             =item $guard = mon $port, $cb->(@reason) # call $cb when $port dies
625              
626             =item $guard = mon $port, $rcvport, @msg # send a message when $port dies
627              
628             Monitor the given port and do something when the port is killed or
629             messages to it were lost, and optionally return a guard that can be used
630             to stop monitoring again.
631              
632             The first two forms distinguish between "normal" and "abnormal" kil's:
633              
634             In the first form (another port given), if the C<$port> is C<kil>'ed with
635             a non-empty reason, the other port (C<$rcvport>) will be kil'ed with the
636             same reason. That is, on "normal" kil's nothing happens, while under all
637             other conditions, the other port is killed with the same reason.
638              
639             The second form (kill self) is the same as the first form, except that
640             C<$rcvport> defaults to C<$SELF>.
641              
642             The remaining forms don't distinguish between "normal" and "abnormal" kil's
643             - it's up to the callback or receiver to check whether the C<@reason> is
644             empty and act accordingly.
645              
646             In the third form (callback), the callback is simply called with any
647             number of C<@reason> elements (empty @reason means that the port was deleted
648             "normally"). Note also that I<< the callback B<must> never die >>, so use
649             C<eval> if unsure.
650              
651             In the last form (message), a message of the form C<$rcvport, @msg,
652             @reason> will be C<snd>.
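
Since the callback and message forms leave the normal/abnormal distinction to the receiver, a callback will typically inspect its arguments. The following plain-Perl sketch shows one way to do that; C<describe_death> is a hypothetical name and nothing here calls into AnyEvent::MP.

```perl
use strict;
use warnings;

# Callback in the style of the third form: the kill reason arrives in @_.
# An empty list means the port was killed "normally".
sub describe_death {
    my @reason = @_;
    return @reason
        ? "abnormal: " . join(", ", @reason)
        : "normal";
}

print describe_death(), "\n";
print describe_death(my_error => "everything is broken"), "\n";
```

A function like this could be wrapped in an anonymous sub and passed as the callback argument of C<mon>, keeping in mind that the callback must not die.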
653              
654             Monitoring-actions are one-shot: once messages are lost (and a monitoring
655             alert was raised), they are removed and will not trigger again, even if it
656             turns out that the port is still alive.
657              
658             As a rule of thumb, monitoring requests should always monitor a remote
659             port locally (using a local C<$rcvport> or a callback). The reason is that
660             kill messages might get lost, just like any other message. Another less
661             obvious reason is that even monitoring requests can get lost (for example,
662             when the connection to the other node goes down permanently). When
663             monitoring a port locally these problems do not exist.
664              
665             C<mon> effectively guarantees that, in the absence of hardware failures,
666             after starting the monitor, either all messages sent to the port will
667             arrive, or the monitoring action will be invoked after possible message
668             loss has been detected. No messages will be lost "in between" (after
669             the first lost message no further messages will be received by the
670             port). After the monitoring action was invoked, further messages might get
671             delivered again.
672              
673             Inter-host-connection timeouts and monitoring depend on the transport
674             used. The only transport currently implemented is TCP, and AnyEvent::MP
675             relies on TCP to detect node-downs (this can take 10-15 minutes on a
676             non-idle connection, and usually around two hours for idle connections).
677              
678             This means that monitoring is good for program errors and cleaning up
679             stuff eventually, but it is no replacement for a timeout when you need
680             to ensure some maximum latency.
681              
682             Example: call a given callback when C<$port> is killed.
683              
684             mon $port, sub { warn "port died because of <@_>\n" };
685              
686             Example: kill ourselves when C<$port> is killed abnormally.
687              
688             mon $port;
689              
690             Example: send us a restart message when another C<$port> is killed.
691              
692             mon $port, $self => "restart";
693              
694             =cut
695              
696             sub mon {
697 0     0 1   my ($nodeid, $port) = split /#/, shift, 2;
698              
699 0   0       my $node = $NODE{$nodeid} || add_node $nodeid;
700              
701 0 0 0       my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,';
702              
703 0 0         unless (ref $cb) {
704 0 0         if (@_) {
705             # send a kill info message
706 0           my (@msg) = ($cb, @_);
707 0     0     $cb = sub { snd @msg, @_ };
  0            
708             } else {
709             # simply kill other port
710 0           my $port = $cb;
711 0 0   0     $cb = sub { kil $port, @_ if @_ };
  0            
712             }
713             }
714              
715 0           $node->monitor ($port, $cb);
716              
717             defined wantarray
718 0     0     and ($cb += 0, Guard::guard { $node->unmonitor ($port, $cb) })
719 0 0         }
720              
721             =item $guard = mon_guard $port, $ref, $ref...
722              
723             Monitors the given C<$port> and keeps the passed references. When the port
724             is killed, the references will be freed.
725              
726             Optionally returns a guard that will stop the monitoring.
727              
728             This function is useful when you create e.g. timers or other watchers and
729             want to free them when the port gets killed (note the use of C<psub>):
730              
731             $port->rcv (start => sub {
732             my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
733             undef $timer if 0.9 < rand;
734             });
735             });
736              
737             =cut
738              
739             sub mon_guard {
740 0     0 1   my ($port, @refs) = @_;
741              
742             #TODO: mon-less form?
743              
744 0     0     mon $port, sub { 0 && @refs }
745 0           }
746              
747             =item kil $port[, @reason]
748              
749             Kill the specified port with the given C<@reason>.
750              
751             If no C<@reason> is specified, then the port is killed "normally" -
752             monitor callback will be invoked, but the kil will not cause linked ports
753             (C<mon $mport, $lport> form) to get killed.
754              
755             If a C<@reason> is specified, then linked ports (C<mon $mport, $lport>
756             form) get killed with the same reason.
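
The difference between a normal kill and a kill with a reason can be illustrated with a toy model in plain Perl. This is only a sketch: C<toy_mon>, C<toy_kil>, C<%monitors> and C<%alive> are hypothetical names, and the code does not use AnyEvent::MP itself. It assumes only what the text above and the C<mon> source state: a linked port is killed only when a reason is given, and monitors are one-shot.

```perl
use strict;
use warnings;

my %monitors;   # port name => list of callbacks (hypothetical registry)
my %alive = map { $_ => 1 } qw(worker supervisor logger);

sub toy_mon {
    my ($port, $cb) = @_;

    # a non-reference second argument is a port to be killed ("linked" form)
    my $linked = $cb;
    $cb = sub { toy_kil ($linked, @_) if @_ } unless ref $cb;

    push @{ $monitors{$port} }, $cb;
}

sub toy_kil {
    my ($port, @reason) = @_;

    return unless delete $alive{$port};

    # monitors are one-shot: remove them before invoking them
    my $cbs = delete $monitors{$port};
    $_->(@reason) for @{ $cbs || [] };
}

toy_mon worker => "supervisor";                       # linked form
toy_mon worker => sub { print "worker died: @_\n" };  # callback form

toy_kil "worker", my_error => "disk full";

print "supervisor alive: ", ($alive{supervisor} ? "yes" : "no"), "\n";
```

Calling C<toy_kil "worker"> without a reason would still invoke the callback, but would leave the supervisor alive.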
757              
758             Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
759             will be reported as reason C<< die => $@ >>.
760              
761             Transport/communication errors are reported as C<< transport_error =>
762             $message >>.
763              
764             Common idioms:
765              
766             # silently remove yourself, do not kill linked ports
767             kil $SELF;
768              
769             # report a failure in some detail
770             kil $SELF, failure_mode_1 => "it failed with too high temperature";
771              
772             # do not waste much time with killing, just die when something goes wrong
773             open my $fh, "<file"
774             or die "file: $!";
775              
776             =item $port = spawn $node, $initfunc[, @initdata]
777              
778             Creates a port on the node C<$node> (which can also be a port ID, in which
779             case it's the node where that port resides).
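
Port IDs have the form C<nodeid#portname>, as the C<split /#/, shift, 2> in the C<mon> and C<spawn> sources shows. A minimal sketch of how the node part is derived from either a node ID or a port ID follows; C<node_of> is a hypothetical helper and the IDs are made-up examples.

```perl
use strict;
use warnings;

# Hypothetical helper mirroring the split used by mon and spawn:
# everything before the first "#" is the node ID.
sub node_of {
    my ($id) = @_;

    my ($nodeid, $port) = split /#/, $id, 2;

    $nodeid
}

for my $id ("host1:4040#worker", "host1:4040") {
    my $node = node_of ($id);
    print "$id resides on $node\n";
}
```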
780              
781             The port ID of the newly created port is returned immediately, and it is
782             possible to immediately start sending messages or to monitor the port.
783              
784             After the port has been created, the init function is called on the remote
785             node, in the same context as a C<rcv> callback. This function must be a
786             fully-qualified function name (e.g. C<MyApp::Chat::Server::connect>). To
787             specify a function in the main program, use C<::name>.
788              
789             If the function doesn't exist, then the node tries to C<require>
790             the package, then the package above the package and so on (e.g.
791             C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
792             exists or it runs out of package names.
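
The search order can be sketched in plain Perl. C<candidate_packages> is a hypothetical helper written for this illustration; it only lists the package names and is not the loader AnyEvent::MP actually uses. It does not handle the C<::name> form.

```perl
use strict;
use warnings;

# Hypothetical helper: list the packages that would be tried, in order,
# for a fully-qualified function name.
sub candidate_packages {
    my ($func) = @_;

    my @parts = split /::/, $func;
    pop @parts;                      # drop the function name itself

    my @candidates;
    while (@parts) {
        push @candidates, join "::", @parts;
        pop @parts;                  # move to the package above
    }

    @candidates
}

print "$_\n" for candidate_packages ("MyApp::Chat::Server::connect");
```

For the example name this prints C<MyApp::Chat::Server>, C<MyApp::Chat> and C<MyApp>, one per line.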
793              
794             The init function is then called with the newly-created port as context
795             object (C<$SELF>) and the C<@initdata> values as arguments. It I<must>
796             call one of the C<rcv> functions to set callbacks on C<$SELF>, otherwise
797             the port might not get created.
798              
799             A common idiom is to pass a local port, immediately monitor the spawned
800             port, and in the remote init function, immediately monitor the passed
801             local port. This two-way monitoring ensures that both ports get cleaned up
802             when there is a problem.
803              
804             C<spawn> guarantees that the C<$initfunc> has no visible effects on the
805             caller before C<spawn> returns (by delaying invocation when spawn is
806             called for the local node).
807              
808             Example: spawn a chat server port on C<$othernode>.
809              
810             # this node, executed from within a port context:
811             my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF;
812             mon $server;
813              
814             # init function on C<$othernode>
815             sub connect {
816             my ($srcport) = @_;
817              
818             mon $srcport;
819              
820             rcv $SELF, sub {
821             ...
822             };
823             }
824              
825             =cut
826              
827             sub _spawn {
828 0     0     my $port = shift;
829 0           my $init = shift;
830              
831             # rcv will create the actual port
832 0           local $SELF = "$NODE#$port";
833 0           eval {
834 0           &{ load_func $init }
  0            
835             };
836 0 0         _self_die if $@;
837             }
838              
839             sub spawn(@) {
840 0     0 1   my ($nodeid, undef) = split /#/, shift, 2;
841              
842 0           my $id = $RUNIQ . ++$ID;
843              
844 0 0         $_[0] =~ /::/
845             or Carp::croak "spawn init function must be a fully-qualified name, caught";
846              
847 0           snd_to_func $nodeid, "AnyEvent::MP::_spawn" => $id, @_;
848              
849 0           "$nodeid#$id"
850             }
851              
852              
853             =item after $timeout, @msg
854              
855             =item after $timeout, $callback
856              
857             Either sends the given message, or calls the given callback, after the
858             specified number of seconds.
859              
860             This is simply a utility function that comes in handy at times - the
861             AnyEvent::MP author is not convinced of the wisdom of having it, though,
862             so it may go away in the future.
863              
864             =cut
865              
866             sub after($@) {
867 0     0 1   my ($timeout, @action) = @_;
868              
869 0           my $t; $t = AE::timer $timeout, 0, sub {
870 0     0     undef $t;
871 0 0         ref $action[0]
872             ? $action[0]()
873             : snd @action;
874 0           };
875             }
876              
877             #=item $cb2 = timeout $seconds, $cb[, @args]
878              
879             =item cal $port, @msg, $callback[, $timeout]
880              
881             A simple form of RPC - sends a message to the given C<$port> with the
882             given contents (C<@msg>), but adds a reply port to the message.
883              
884             The reply port is created temporarily just for the purpose of receiving
885             the reply, and will be C<kil>ed when no longer needed.
886              
887             A reply message sent to the port is passed to the C<$callback> as-is.
888              
889             If an optional time-out (in seconds) is given and it is not C<undef>,
890             then the callback will be called without any arguments after the time-out
891             elapsed and the port is C<kil>ed.
892              
893             If no time-out is given (or it is C<undef>), then the local port will
894             monitor the remote port instead, so it eventually gets cleaned-up.
895              
896             Currently this function returns the temporary port, but this "feature"
897             might go away in future versions unless you can make a convincing case that
898             this is indeed useful for something.
899              
900             =cut
901              
902             sub cal(@) {
903 0 0   0 1   my $timeout = ref $_[-1] ? undef : pop;
904 0           my $cb = pop;
905              
906             my $port = port {
907 0     0     undef $timeout;
908 0           kil $SELF;
909 0           &$cb;
910 0           };
911              
912 0 0         if (defined $timeout) {
913             $timeout = AE::timer $timeout, 0, sub {
914 0     0     undef $timeout;
915 0           kil $port;
916 0           $cb->();
917 0           };
918             } else {
919             mon $_[0], sub {
920 0     0     kil $port;
921 0           $cb->();
922 0           };
923             }
924              
925 0           push @_, $port;
926 0           &snd;
927              
928 0           $port
929             }
930              
931             =back
932              
933             =head1 DISTRIBUTED DATABASE
934              
935             AnyEvent::MP comes with a simple distributed database. The database will
936             be mirrored asynchronously on all global nodes. Other nodes bind to one
937             of the global nodes for their needs. Every node has a "local database"
938             which contains all the values that are set locally. All local databases
939             are merged together to form the global database, which can be queried.
940              
941             The database structure is that of a two-level hash - the database hash
942             contains hashes which contain values, similarly to a perl hash of hashes,
943             i.e.:
944              
945             $DATABASE{$family}{$subkey} = $value
946              
947             The top level hash key is called "family", and the second-level hash key
948             is called "subkey" or simply "key".
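
A minimal sketch of this structure in plain Perl, assuming each node's local database is itself a two-level hash. The node names, families and values are invented for illustration, and C<merge_databases> is a hypothetical helper, not part of AnyEvent::MP.

```perl
use strict;
use warnings;

# Hypothetical model: each node's local database is a hash of hashes.
my %local_db = (
    node_a => { my_image_scalers => { "node_a#p1" => undef } },
    node_b => { my_image_scalers => { "node_b#p7" => undef },
                my_app_config    => { colour => "blue" } },
);

# Merge all local databases into one global view
# (family -> subkey -> value).
sub merge_databases {
    my (%locals) = @_;

    my %global;
    for my $node (sort keys %locals) {
        my $db = $locals{$node};
        for my $family (keys %$db) {
            $global{$family}{$_} = $db->{$family}{$_}
                for keys %{ $db->{$family} };
        }
    }

    \%global
}

my $global = merge_databases (%local_db);
print join (" ", sort keys %{ $global->{my_image_scalers} }), "\n";
```

The merged C<my_image_scalers> family contains the subkeys contributed by both nodes.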
949              
950             The family must be alphanumeric, i.e. start with a letter and consist
951             of letters, digits, underscores and colons (C<[A-Za-z][A-Za-z0-9_:]*>),
952             pretty much like Perl module names.
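
A family name can be checked against this pattern before use. The following is a small sketch; C<is_valid_family> is a hypothetical helper and not a function exported by AnyEvent::MP.

```perl
use strict;
use warnings;

# Hypothetical helper: checks a family name against the pattern
# [A-Za-z][A-Za-z0-9_:]* anchored at both ends.
sub is_valid_family {
    my ($family) = @_;

    defined $family && $family =~ /\A[A-Za-z][A-Za-z0-9_:]*\z/
}

for my $name ("my_image_scalers", "My::Module::workers", "9lives", "") {
    printf "%-22s %s\n", "'$name'", is_valid_family ($name) ? "ok" : "invalid";
}
```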
953              
954             As the family namespace is global, it is recommended to prefix family names
955             with the name of the application or module using it.
956              
957             The subkeys must be non-empty strings, with no further restrictions.
958              
959             The values should preferably be strings, but other perl scalars should
960             work as well (such as C<undef>, arrays and hashes).
961              
962             Every database entry is owned by one node - adding the same family/subkey
963             combination on multiple nodes will not cause discomfort for AnyEvent::MP,
964             but the result might be nondeterministic, i.e. the key might have
965             different values on different nodes.
966              
967             Different subkeys in the same family can be owned by different nodes
968             without problems, and in fact, this is the common method to create worker
969             pools. For example, a worker port for image scaling might do this:
970              
971             db_set my_image_scalers => $port;
972              
973             And clients looking for an image scaler will want to get the
974             C<my_image_scalers> keys from time to time:
975              
976             db_keys my_image_scalers => sub {
977             @ports = @{ $_[0] };
978             };
979              
980             Or better yet, they want to monitor the database family, so they always
981             have a reasonably up-to-date copy:
982              
983             db_mon my_image_scalers => sub {
984             @ports = keys %{ $_[0] };
985             };
986              
987             In general, you can set or delete single subkeys, but query and monitor
988             whole families only.
989              
990             If you feel the need to monitor or query a single subkey, try giving it
991             its own family.
992              
993             =over
994              
995             =item $guard = db_set $family => $subkey [=> $value]
996              
997             Sets (or replaces) a key in the database - if C<$value> is omitted,
998             C<undef> is used instead.
999              
1000             When called in non-void context, C<db_set> returns a guard that
1001             automatically calls C<db_del> when it is destroyed.
1002              
1003             =item db_del $family => $subkey...
1004              
1005             Deletes one or more subkeys from the database family.
1006              
1007             =item $guard = db_reg $family => $port => $value
1008              
1009             =item $guard = db_reg $family => $port
1010              
1011             =item $guard = db_reg $family
1012              
1013             Registers a port in the given family and optionally returns a guard to
1014             remove it.
1015              
1016             This function basically does the same as:
1017              
1018             db_set $family => $port => $value
1019              
1020             Except that the port is monitored and automatically removed from the
1021             database family when it is kil'ed.
1022              
1023             If C<$value> is missing, C<undef> is used. If C<$port> is missing, then
1024             C<$SELF> is used.
1025              
1026             This function is most useful to register a port in some port group (which
1027             is just another name for a database family), and have it removed when the
1028             port is gone. This works best when the port is a local port.
1029              
1030             =cut
1031              
1032             sub db_reg($$;$) {
1033 0     0 1   my $family = shift;
1034 0 0         my $port = @_ ? shift : $SELF;
1035              
1036 0     0     my $clr = sub { db_del $family => $port };
  0            
1037 0           mon $port, $clr;
1038              
1039 0           db_set $family => $port => $_[0];
1040              
1041             defined wantarray
1042 0 0         and &Guard::guard ($clr)
1043             }
1044              
1045             =item db_family $family => $cb->(\%familyhash)
1046              
1047             Queries the named database C<$family> and calls the callback with the
1048             family represented as a hash. You can keep and freely modify the hash.
1049              
1050             =item db_keys $family => $cb->(\@keys)
1051              
1052             Same as C<db_family>, except it only queries the family I<subkeys> and passes
1053             them as array reference to the callback.
1054              
1055             =item db_values $family => $cb->(\@values)
1056              
1057             Same as C<db_family>, except it only queries the family I<values> and passes them
1058             as array reference to the callback.
1059              
1060             =item $guard = db_mon $family => $cb->(\%familyhash, \@added, \@changed, \@deleted)
1061              
1062             Creates a monitor on the given database family. Each time a key is
1063             set or is deleted the callback is called with a hash containing the
1064             database family and three lists of added, changed and deleted subkeys,
1065             respectively. If no keys have changed then the array reference might be
1066             C<undef> or even missing.
1067              
1068             If not called in void context, a guard object is returned that, when
1069             destroyed, stops the monitor.
1070              
1071             The family hash reference and the key arrays belong to AnyEvent::MP and
1072             B<must not be modified or stored> by the callback. When in doubt, make a
1073             copy.
1074              
1075             As soon as possible after the monitoring starts, the callback will be
1076             called with the initial contents of the family, even if it is empty,
1077             i.e. there will always be a timely call to the callback with the current
1078             contents.
1079              
1080             It is possible that the callback is called with a change event even though
1081             the subkey is already present and the value has not changed.
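
To make the meaning of the three lists concrete, the following plain-Perl sketch compares two snapshots of a family. C<diff_family> is a hypothetical helper and the port IDs and values are invented; it is not how AnyEvent::MP computes the lists internally, and unlike the real callback it never reports an unchanged key as changed.

```perl
use strict;
use warnings;

# Hypothetical helper: compare two snapshots of a family and return the
# added, changed and deleted subkeys as three array references.
# An undefined value and an empty string are treated as equal here.
sub diff_family {
    my ($old, $new) = @_;

    my (@added, @changed, @deleted);

    for my $key (sort keys %$new) {
        if (!exists $old->{$key}) {
            push @added, $key;
        } elsif (($old->{$key} // "") ne ($new->{$key} // "")) {
            push @changed, $key;
        }
    }

    for my $key (sort keys %$old) {
        push @deleted, $key unless exists $new->{$key};
    }

    (\@added, \@changed, \@deleted)
}

my ($add, $chg, $del) = diff_family (
    { "n1#p1" => "idle", "n1#p2" => "idle" },
    { "n1#p1" => "busy", "n2#p9" => "idle" },
);

print "added: @$add, changed: @$chg, deleted: @$del\n";
```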
1082              
1083             The monitoring stops when the guard object is destroyed.
1084              
1085             Example: on every change to the family "mygroup", print out all keys.
1086              
1087             my $guard = db_mon mygroup => sub {
1088             my ($family, $a, $c, $d) = @_;
1089             print "mygroup members: ", (join " ", keys %$family), "\n";
1090             };
1091              
1092             Example: wait until the family "My::Module::workers" is non-empty.
1093              
1094             my $guard; $guard = db_mon My::Module::workers => sub {
1095             my ($family, $a, $c, $d) = @_;
1096             return unless %$family;
1097             undef $guard;
1098             print "My::Module::workers now nonempty\n";
1099             };
1100              
1101             Example: print all changes to the family "AnyEvent::Fantasy::Module".
1102              
1103             my $guard = db_mon AnyEvent::Fantasy::Module => sub {
1104             my ($family, $a, $c, $d) = @_;
1105              
1106             print "+$_=$family->{$_}\n" for @$a;
1107             print "*$_=$family->{$_}\n" for @$c;
1108             print "-$_=$family->{$_}\n" for @$d;
1109             };
1110              
1111             =cut
1112              
1113             =back
1114              
1115             =head1 AnyEvent::MP vs. Distributed Erlang
1116              
1117             AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node
1118             == aemp node, Erlang process == aemp port), so many of the documents and
1119             programming techniques employed by Erlang apply to AnyEvent::MP. Here is a
1120             sample:
1121              
1122             http://www.erlang.se/doc/programming_rules.shtml
1123             http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
1124             http://erlang.org/download/erlang-book-part1.pdf # chapters 5 and 6
1125             http://erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5
1126              
1127             Despite the similarities, there are also some important differences:
1128              
1129             =over 4
1130              
1131             =item * Node IDs are arbitrary strings in AEMP.
1132              
1133             Erlang relies on special naming and DNS to work everywhere in the same
1134             way. AEMP relies on each node somehow knowing its own address(es) (e.g. by
1135             configuration or DNS), and possibly the addresses of some seed nodes, but
1136             will otherwise discover other nodes (and their IDs) itself.
1137              
1138             =item * Erlang has a "remote ports are like local ports" philosophy, AEMP
1139             uses "local ports are like remote ports".
1140              
1141             The failure modes for local ports are quite different (runtime errors
1142             only) than for remote ports - when a local port dies, you I<know> it dies,
1143             when a connection to another node dies, you know nothing about the other
1144             port.
1145              
1146             Erlang pretends remote ports are as reliable as local ports, even when
1147             they are not.
1148              
1149             AEMP encourages a "treat remote ports differently" philosophy, with local
1150             ports being the special case/exception, where transport errors cannot
1151             occur.
1152              
1153             =item * Erlang uses processes and a mailbox, AEMP does not queue.
1154              
1155             Erlang uses processes that selectively receive messages out of order, and
1156             therefore needs a queue. AEMP is event based, queuing messages would serve
1157             no useful purpose. For the same reason the pattern-matching abilities
1158             of AnyEvent::MP are more limited, as there is little need to be able to
1159             filter messages without dequeuing them.
1160              
1161             This is not a philosophical difference, but simply stems from AnyEvent::MP
1162             being event-based, while Erlang is process-based.
1163              
1164             You can have a look at L<Coro::MP> for a more Erlang-like process model on
1165             top of AEMP and Coro threads.
1166              
1167             =item * Erlang sends are synchronous, AEMP sends are asynchronous.
1168              
1169             Sending messages in Erlang is synchronous and blocks the process until
1170             a connection has been established and the message sent (and so does not
1171             need a queue that can overflow). AEMP sends return immediately, connection
1172             establishment is handled in the background.
1173              
1174             =item * Erlang suffers from silent message loss, AEMP does not.
1175              
1176             Erlang implements few guarantees on message delivery - messages can get
1177             lost without any of the processes realising it (i.e. you send messages a,
1178             b, and c, and the other side only receives messages a and c).
1179              
1180             AEMP guarantees (modulo hardware errors) correct ordering, and the
1181             guarantee that after one message is lost, all following ones sent to the
1182             same port are lost as well, until monitoring raises an error, so there are
1183             no silent "holes" in the message sequence.
1184              
1185             If you want your software to be very reliable, you have to cope with
1186             corrupted and even out-of-order messages in both Erlang and AEMP. AEMP
1187             simply tries to work better in common error cases, such as when a network
1188             link goes down.
1189              
1190             =item * Erlang can send messages to the wrong port, AEMP does not.
1191              
1192             In Erlang it is quite likely that a node that restarts reuses an Erlang
1193             process ID known to other nodes for a completely different process,
1194             causing messages destined for that process to end up in an unrelated
1195             process.
1196              
1197             AEMP does not reuse port IDs, so old messages or old port IDs floating
1198             around in the network will not be sent to an unrelated port.
1199              
1200             =item * Erlang uses unprotected connections, AEMP uses secure
1201             authentication and can use TLS.
1202              
1203             AEMP can use a proven protocol - TLS - to protect connections and
1204             securely authenticate nodes.
1205              
1206             =item * The AEMP protocol is optimised for both text-based and binary
1207             communications.
1208              
1209             The AEMP protocol, unlike the Erlang protocol, supports both programming
1210             language independent text-only protocols (good for debugging), and binary,
1211             language-specific serialisers (e.g. Storable). By default, unless TLS is
1212             used, the protocol is actually completely text-based.
1213              
1214             It has also been carefully designed to be implementable in other languages
1215             with a minimum of work while gracefully degrading functionality to make the
1216             protocol simple.
1217              
1218             =item * AEMP has more flexible monitoring options than Erlang.
1219              
1220             In Erlang, you can choose to receive I<all> exit signals as messages or
1221             I<none>, there is no in-between, so monitoring single Erlang processes is
1222             difficult to implement.
1223              
1224             Monitoring in AEMP is more flexible than in Erlang, as one can choose
1225             between automatic kill, exit message or callback on a per-port basis.
1226              
1227             =item * Erlang tries to hide remote/local connections, AEMP does not.
1228              
1229             Monitoring in Erlang is not an indicator of process death/crashes, in the
1230             same way as linking is (except linking is unreliable in Erlang).
1231              
1232             In AEMP, you don't "look up" registered port names or send to named ports
1233             that might or might not be persistent. Instead, you normally spawn a port
1234             on the remote node. The init function monitors you, and you monitor the
1235             remote port. Since both monitors are local to the node, they are much more
1236             reliable (no need for C<spawn_link>).
1237              
1238             This also saves round-trips and avoids sending messages to the wrong port
1239             (hard to do in Erlang).
1240              
1241             =back
1242              
1243             =head1 RATIONALE
1244              
1245             =over 4
1246              
1247             =item Why strings for port and node IDs, why not objects?
1248              
1249             We considered "objects", but found that the actual number of methods
1250             that can be called are quite low. Since port and node IDs travel over
1251             the network frequently, the serialising/deserialising would add lots of
1252             overhead, as well as having to keep a proxy object everywhere.
1253              
1254             Strings can easily be printed, easily serialised etc. and need no special
1255             procedures to be "valid".
1256              
1257             And as a result, a port with just a default receiver consists of a single
1258             code reference stored in a global hash - it can't become much cheaper.
1259              
1260             =item Why favour JSON, why not a real serialising format such as Storable?
1261              
1262             In fact, any AnyEvent::MP node will happily accept Storable as framing
1263             format, but currently there is no way to make a node use Storable by
1264             default (although all nodes will accept it).
1265              
1266             The default framing protocol is JSON because a) JSON::XS is many times
1267             faster for small messages and b) most importantly, after years of
1268             experience we found that object serialisation is causing more problems
1269             than it solves: Just like function calls, objects simply do not travel
1270             easily over the network, mostly because they will always be a copy, so you
1271             always have to re-think your design.
1272              
1273             Keeping your messages simple, concentrating on data structures rather than
1274             objects, will keep your messages clean, tidy and efficient.
1275              
1276             =back
1277              
1278             =head1 PORTING FROM AnyEvent::MP VERSION 1.X
1279              
1280             AEMP version 2 has a few major incompatible changes compared to version 1:
1281              
1282             =over 4
1283              
1284             =item AnyEvent::MP::Global no longer has group management functions.
1285              
1286             At least not officially - the grp_* functions are still exported and might
1287             work, but they will be removed in some later release.
1288              
1289             AnyEvent::MP now comes with a distributed database that is more
1290             powerful. Its database families map closely to port groups, but the API
1291             has changed (the functions are also now exported by AnyEvent::MP). Here is
1292             a rough porting guide:
1293              
1294             grp_reg $group, $port # old
1295             db_reg $group, $port # new
1296              
1297             $list = grp_get $group # old
1298             db_keys $group, sub { my $list = shift } # new
1299              
1300             grp_mon $group, $cb->(\@ports, $add, $del) # old
1301             db_mon $group, $cb->(\%ports, $add, $change, $del) # new
1302              
1303             C<grp_reg> is a no-brainer (just replace by C<db_reg>), but C<grp_get> is
1304             no longer instant, because the local node might not have a copy of the
1305             group. You can either modify your code to allow for a callback, or use
1306             C<db_mon> to keep an updated copy of the group:
1307              
1308             my $local_group_copy;
1309             db_mon $group => sub { $local_group_copy = $_[0] };
1310              
1311             # now "keys %$local_group_copy" always returns the most up-to-date
1312             # list of ports in the group.
1313              
1314             C<grp_mon> can be replaced by C<db_mon> with minor changes - C<db_mon>
1315             passes a hash as first argument, and an extra C<$chg> argument that can be
1316             ignored:
1317              
1318             db_mon $group => sub {
1319             my ($ports, $add, $chg, $del) = @_;
1320             $ports = [keys %$ports];
1321              
1322             # now $ports, $add and $del are the same as
1323             # were originally passed by grp_mon.
1324             ...
1325             };
1326              
1327             =item Nodes no longer connect to all other nodes.
1328              
1329             In AEMP 1.x, every node automatically loads the L<AnyEvent::MP::Global>
1330             module, which in turn would create connections to all other nodes in the
1331             network (helped by the seed nodes).
1332              
1333             In version 2.x, global nodes still connect to all other global nodes, but
1334             other nodes don't - now every node either is a global node itself, or
1335             attaches itself to another global node.
1336              
1337             If a node isn't a global node itself, then it attaches itself to one
1338             of its seed nodes. If that seed node isn't a global node yet, it will
1339             automatically be upgraded to a global node.
1340              
1341             So in many cases, nothing needs to be changed - one just has to make sure
1342             that all seed nodes are meshed together with the other seed nodes (as with
1343             AEMP 1.x), and other nodes specify them as seed nodes. This is most easily
1344             achieved by specifying the same set of seed nodes for all nodes in the
1345             network.
1346              
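As a rough illustration of "the same set of seed nodes for all nodes", the sketch below passes one shared list to C<configure>. The addresses are placeholders chosen for this example, and each seed node is assumed to additionally bind to its own address from that list.

```perl
use AnyEvent::MP;

# Placeholder addresses: substitute the host:port pairs of your own seed nodes.
my @seeds = ("10.0.0.1:4040", "10.0.0.2:4040");

# Ordinary nodes: only the shared seed list is needed.
configure seeds => \@seeds;

# On a seed node, the call would additionally bind to that node's own
# address from the list, for example:
# configure binds => ["10.0.0.1:4040"], seeds => \@seeds;
```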
1347             Not opening a connection to every other node is usually an advantage,
1348             except when you need the lower latency of an already established
1349             connection. To ensure a node establishes a connection to another node,
1350             you can monitor the node port (C<mon $node, ...>), which will attempt to
1351             create the connection (and notify you when the connection fails).
1352              
1353             =item Listener-less nodes (nodes without binds) are gone.
1354              
1355             And are not coming back, at least not in their old form. If no C<binds>
1356             are specified for a node, AnyEvent::MP assumes a default of C<*:*>.
1357              
1358             There are vague plans to implement some form of routing domains, which
1359             might or might not bring back listener-less nodes, but don't count on it.
1360              
1361             The fact that most connections are now optional somewhat mitigates this,
1362             as a node can be effectively unreachable from the outside without any
1363             problems, as long as it isn't a global node and only reaches out to other
1364             nodes (as opposed to being contacted from other nodes).
1365              
1366             =item $AnyEvent::MP::Kernel::WARN has gone.
1367              
1368             AnyEvent has acquired a logging framework (L<AnyEvent::Log>), and AEMP now
1369             uses this, and so should your programs.
1370              
1371             Every module now documents what kinds of messages it generates, with
1372             AnyEvent::MP acting as a catch-all.
1373              
1374             On the positive side, this means that instead of setting
1375             C<PERL_ANYEVENT_MP_WARNLEVEL>, you can get away with setting C<AE_VERBOSE> -
1376             much less to type.
1377              
1378             =back
1379              
1380             =head1 LOGGING
1381              
1382             AnyEvent::MP does not normally log anything by itself, but since it is the
1383             root of the context hierarchy for AnyEvent::MP modules, it will receive
1384             all log messages generated by its submodules.
1385              
1386             =head1 SEE ALSO
1387              
1388             L<AnyEvent::MP::Intro> - a gentle introduction.
1389              
1390             L<AnyEvent::MP::Kernel> - more, lower-level, stuff.
1391              
1392             L<AnyEvent::MP::Global> - network maintenance and port groups, to find
1393             your applications.
1394              
1395             L<AnyEvent::MP::DataConn> - establish data connections between nodes.
1396              
1397             L<AnyEvent::MP::LogCatcher> - simple service to display log messages from
1398             all nodes.
1399              
1400             L<AnyEvent>.
1401              
1402             =head1 AUTHOR
1403              
1404             Marc Lehmann
1405             http://home.schmorp.de/
1406              
1407             =cut
1408              
1409             1
1410