File Coverage

blib/lib/AnyEvent/MP/Kernel.pm
Criterion Covered Total %
statement 29 296 9.8
branch 3 132 2.2
condition 1 44 2.2
subroutine 11 72 15.2
pod 6 40 15.0
total 50 584 8.5


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::MP::Kernel - the actual message passing kernel
4              
5             =head1 SYNOPSIS
6              
7             use AnyEvent::MP::Kernel;
8              
9             $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
10              
11             snd_to_func $node, $func, @args # send msg to function
12             snd_on $node, @msg # snd message again (relay)
13             eval_on $node, $string[, @reply] # execute perl code on another node
14              
15             node_is_up $nodeid # return true if a node is connected
16             @nodes = up_nodes # return a list of all connected nodes
17             $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
18              
19             =head1 DESCRIPTION
20              
21             This module implements most of the inner workings of AnyEvent::MP. It
22             offers mostly lower-level functions that deal with network connectivity
23             and special requests.
24              
25             You normally interface with AnyEvent::MP through a higher level interface
26             such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
27             with using the functions from this module.
28              
29             =head1 GLOBALS AND FUNCTIONS
30              
31             =over 4
32              
33             =cut
34              
35             package AnyEvent::MP::Kernel;
36              
37 1     1   454 use common::sense;
  1         2  
  1         4  
38 1     1   35 use Carp ();
  1         1  
  1         10  
39              
40 1     1   2 use AnyEvent ();
  1         2  
  1         9  
41 1     1   3 use Guard ();
  1         2  
  1         11  
42              
43 1     1   3 use AnyEvent::MP::Node;
  1         1  
  1         13  
44 1     1   3 use AnyEvent::MP::Transport;
  1         1  
  1         14  
45              
46 1     1   3 use base "Exporter";
  1         1  
  1         861  
47              
48             # for re-export in AnyEvent::MP and Coro::MP
49             our @EXPORT_API = qw(
50             NODE $NODE
51             configure
52             node_of port_is_local
53             snd kil
54             db_set db_del
55             db_mon db_family db_keys db_values
56             );
57              
58             our @EXPORT_OK = (
59             # these are internal
60             qw(
61             %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
62             add_node load_func
63             ),
64             @EXPORT_API,
65             );
66              
67             our @EXPORT = qw(
68             snd_to_func snd_on eval_on
69             port_is_local
70             up_nodes mon_nodes node_is_up
71             );
72              
73             our @CARP_NOT = (AnyEvent::MP::);
74              
75             sub load_func($) {
76 0     0 0 0 my $func = $_[0];
77              
78 0 0       0 unless (defined &$func) {
79 0         0 my $pkg = $func;
80 0         0 do {
81             $pkg =~ s/::[^:]+$//
82 0 0   0   0 or return sub { die "unable to resolve function '$func'" };
  0         0  
83              
84 0         0 local $@;
85 0 0       0 unless (eval "require $pkg; 1") {
86 0         0 my $error = $@;
87             $error =~ /^Can't locate .*.pm in \@INC \(/
88 0 0   0   0 or return sub { die $error };
  0         0  
89             }
90             } until defined &$func;
91             }
92              
93 0         0 \&$func
94             }
95              
96             my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
97              
98             sub nonce($) {
99 0     0 0 0 join "", map chr rand 256, 1 .. $_[0]
100             }
101              
102             sub nonce62($) {
103 2     2 0 11 join "", map $alnum[rand 62], 1 .. $_[0]
104             }
105              
106             our $CONFIG; # this node's configuration
107             our $SECURE;
108              
109             our $RUNIQ; # remote uniq value
110             our $UNIQ; # per-process/node unique cookie
111             our $NODE;
112             our $ID = "a";
113              
114             our %NODE; # node id to transport mapping, or "undef", for local node
115             our (%PORT, %PORT_DATA); # local ports
116              
117             our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
118             our %LMON; # monitored _local_ ports
119              
120             #our $GLOBAL; # true if node is a global ("directory") node
121             our %BINDS;
122             our $BINDS; # our listeners, as arrayref
123              
124             our $SRCNODE; # holds the sending node _object_ during _inject
125             our $GLOBAL; # true when this is a global node (only set by AnyEvent::MP::Global)
126              
127             # initialise names for non-networked operation
128             {
129             # ~54 bits, for local port names, lowercase $ID appended
130             my $now = AE::now;
131             $UNIQ =
132             (join "",
133             map $alnum[$_],
134             $$ / 62 % 62,
135             $$ % 62,
136             (int $now ) % 62,
137             (int $now * 100) % 62,
138             (int $now * 10000) % 62,
139             ) . nonce62 4
140             ;
141              
142             # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
143             $RUNIQ = nonce62 10;
144             $RUNIQ =~ s/(.)$/\U$1/;
145              
146             $NODE = "";
147             }
148              
149             sub NODE() {
150 0     0 0 0 $NODE
151             }
152              
153             sub node_of($) {
154 0     0 0 0 my ($node, undef) = split /#/, $_[0], 2;
155              
156 0         0 $node
157             }
158              
159             BEGIN {
160             *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
161             ? sub () { 1 }
162 1 50   1   6128 : sub () { 0 };
163             }
164              
165             our $DELAY_TIMER;
166             our @DELAY_QUEUE;
167              
168             our $delay_run = sub {
169             (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
170             };
171              
172             sub delay($) {
173 0     0 0 0 push @DELAY_QUEUE, shift;
174 0   0     0 $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
175             }
176              
177             =item $AnyEvent::MP::Kernel::SRCNODE
178              
179             During execution of a message callback, this variable contains the node ID
180             of the origin node.
181              
182             The main use of this variable is for debugging output - there are probably
183             very few other cases where you need to know the source node ID.
184              
185             =cut
186              
187             sub _inject {
188 0     0   0 warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;
189              
190 0 0       0 &{ $PORT{+shift} or return };
  0         0  
191             }
192              
193             # this function adds a node-ref, so you can send stuff to it
194             # it is basically the central routing component.
195             sub add_node {
196 0 0   0 0 0 $NODE{$_[0]} || do {
197 0         0 my ($node) = @_;
198              
199 0 0       0 length $node
200             or Carp::croak "'undef' or the empty string are not valid node/port IDs";
201              
202             # registers itself in %NODE
203 0         0 new AnyEvent::MP::Node::Remote $node
204             }
205             }
206              
207             sub snd(@) {
208 0     0 0 0 my ($nodeid, $portid) = split /#/, shift, 2;
209              
210 0         0 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;
211              
212             ($NODE{$nodeid} || add_node $nodeid)
213 0   0     0 ->{send} (["$portid", @_]);
214             }
215              
216             sub port_is_local($) {
217 0     0 0 0 my ($nodeid, undef) = split /#/, $_[0], 2;
218              
219 0         0 $nodeid eq $NODE
220             }
221              
222             =item snd_to_func $node, $func, @args
223              
224             Expects a node ID and a name of a function. Asynchronously tries to call
225             this function with the given arguments on that node.
226              
227             This function can be used to implement C<spawn>-like interfaces.
228              
229             =cut
230              
231             sub snd_to_func($$;@) {
232 0     0 1 0 my $nodeid = shift;
233              
234             # on $NODE, we artificially delay... (for spawn)
235             # this is very ugly - maybe we should simply delay ALL messages,
236             # to avoid deep recursion issues. but that's so... slow...
237 0 0       0 $AnyEvent::MP::Node::Self::DELAY = 1
238             if $nodeid ne $NODE;
239              
240 0   0     0 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
241             }
242              
243             =item snd_on $node, @msg
244              
245             Executes C<snd> with the given C<@msg> (which must include the destination
246             port) on the given node.
247              
248             =cut
249              
250             sub snd_on($@) {
251 0     0 1 0 my $node = shift;
252 0         0 snd $node, snd => @_;
253             }
254              
255             =item eval_on $node, $string[, @reply]
256              
257             Evaluates the given string as Perl expression on the given node. When
258             @reply is specified, then it is used to construct a reply message with
259             C<"$@"> and any results from the eval appended.
260              
261             =cut
262              
263             sub eval_on($$;@) {
264 0     0 1 0 my $node = shift;
265 0         0 snd $node, eval => @_;
266             }
267              
268             sub kil(@) {
269 0     0 0 0 my ($nodeid, $portid) = split /#/, shift, 2;
270              
271 0 0       0 length $portid
272             or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
273              
274 0   0     0 ($NODE{$nodeid} || add_node $nodeid)
275             ->kill ("$portid", @_);
276             }
277              
278             #############################################################################
279             # node monitoring and info
280              
281             =item $bool = node_is_up $nodeid
282              
283             Returns true if the given node is "up", that is, the kernel thinks it has
284             a working connection to it.
285              
286             More precisely, if the node is up, returns C<1>. If the node is currently
287             connecting or otherwise known but not connected, returns C<0>. If nothing
288             is known about the node, returns C<undef>.
289              
290             =cut
291              
292             sub node_is_up($) {
293             ($_[0] eq $NODE) || ($NODE{$_[0]} or return)->{transport}
294 0 0 0 0 1 0 ? 1 : 0
295             }
296              
297             =item @nodes = up_nodes
298              
299             Return the node IDs of all nodes that are currently connected (excluding
300             the node itself).
301              
302             =cut
303              
304             sub up_nodes() {
305 0     0 1 0 map $_->{id}, grep $_->{transport}, values %NODE
306             }
307              
308             =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
309              
310             Registers a callback that is called each time a node goes up (a connection
311             is established) or down (the connection is lost).
312              
313             Node up messages can only be followed by node down messages for the same
314             node, and vice versa.
315              
316             Note that monitoring a node is usually better done by monitoring its node
317             port. This function is mainly of interest to modules that are concerned
318             about the network topology and low-level connection handling.
319              
320             Callbacks I<must not> block and I<should not> send any messages.
321              
322             The function returns an optional guard which can be used to unregister
323             the monitoring callback again.
324              
325             Example: make sure you call function C<newnode> for all nodes that are up
326             or go up (and down).
327              
328             newnode $_, 1 for up_nodes;
329             mon_nodes \&newnode;
330              
331             =cut
332              
333             our %MON_NODES;
334              
335             sub mon_nodes($) {
336 3     3 1 5 my ($cb) = @_;
337              
338 3         7 $MON_NODES{$cb+0} = $cb;
339              
340             defined wantarray
341 0     0   0 and Guard::guard { delete $MON_NODES{$cb+0} }
342 3 50       10 }
343              
344             sub _inject_nodeevent($$;@) {
345 0     0   0 my ($node, $up, @reason) = @_;
346              
347 0 0       0 AE::log 7 => "$node->{id} is " . ($up ? "up." : "down (@reason).");
348              
349 0         0 for my $cb (values %MON_NODES) {
350 0 0       0 eval { $cb->($node->{id}, $up, @reason); 1 }
  0         0  
  0         0  
351             or AE::log die => $@;
352             }
353             }
354              
355             #############################################################################
356             # self node code
357              
358             sub _kill {
359 0     0   0 my $port = shift;
360              
361 0 0       0 delete $PORT{$port}
362             or return; # killing nonexistent ports is O.K.
363 0         0 delete $PORT_DATA{$port};
364              
365 0 0 0     0 my $mon = delete $LMON{$port}
366             or !@_
367             or AE::log die => "unmonitored local port $port died with reason: @_";
368              
369 0         0 $_->(@_) for values %$mon;
370             }
371              
372             sub _monitor {
373             return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
374 0 0   0   0 unless exists $PORT{$_[1]};
375              
376 0         0 $LMON{$_[1]}{$_[2]+0} = $_[2];
377             }
378              
379             sub _unmonitor {
380             delete $LMON{$_[1]}{$_[2]+0}
381 0 0   0   0 if exists $LMON{$_[1]};
382             }
383              
384             sub _secure_check {
385 0 0   0   0 $SECURE
386             and die "remote execution not allowed\n";
387             }
388              
389             our %NODE_REQ;
390              
391             %NODE_REQ = (
392             # "mproto" - monitoring protocol
393              
394             # monitoring
395             mon0 => sub { # stop monitoring a port for another node
396             my $portid = shift;
397             # the if exists should not be needed, but there is apparently a bug
398             # elsewhere, and this works around that, silently suppressing that bug. sigh.
399             _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid}
400             if exists $NODE{$SRCNODE};
401             },
402             mon1 => sub { # start monitoring a port for another node
403             my $portid = shift;
404             Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
405             _monitor undef, $portid, $node->{rmon}{$portid} = sub {
406             delete $node->{rmon}{$portid};
407             $node->send (["", kil0 => $portid, @_])
408             if $node && $node->{transport};
409             };
410             },
411             # another node has killed a monitored port
412             kil0 => sub {
413             my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
414             or return;
415              
416             $_->(@_) for @$cbs;
417             },
418             # another node wants to kill a local port
419             kil1 => \&_kill,
420              
421             # "public" services - not actually public
422              
423             # relay message to another node / generic echo
424             snd => sub {
425             &snd
426             },
427             # ask if a node supports the given request, only works for fixed tags
428             can => sub {
429             my $method = shift;
430             snd @_, exists $NODE_REQ{$method};
431             },
432              
433             # random utilities
434             eval => sub {
435             &_secure_check;
436             my @res = do { package main; eval shift };
437             snd @_, "$@", @res if @_;
438             },
439             time => sub {
440             snd @_, AE::now;
441             },
442             devnull => sub {
443             #
444             },
445             "" => sub {
446             # empty messages are keepalives or similar devnull-applications
447             },
448             );
449              
450             # the node port
451             new AnyEvent::MP::Node::Self $NODE; # registers itself in %NODE
452              
453             $PORT{""} = sub {
454             my $tag = shift;
455             eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
456             AE::log die => "error processing node message from $SRCNODE: $@" if $@;
457             };
458              
459             our $MPROTO = 1;
460              
461             # tell everybody who connects our mproto
462             push @AnyEvent::MP::Transport::HOOK_GREET, sub {
463             $_[0]{local_greeting}{mproto} = $MPROTO;
464             };
465              
466             #############################################################################
467             # seed management, try to keep connections to all seeds at all times
468              
469             our %SEED_NODE; # seed ID => node ID|undef
470             our %NODE_SEED; # map node ID to seed ID
471             our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
472             our $SEED_WATCHER;
473             our $SEED_RETRY;
474             our %GLOBAL_NODE; # global => undef
475              
476             sub seed_connect {
477 0     0 0 0 my ($seed) = @_;
478              
479 0 0       0 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
480             or Carp::croak "$seed: unparsable seed address";
481              
482 0         0 AE::log 9 => "trying connect to seed node $seed.";
483              
484             $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
485             $host, $port,
486             on_greeted => sub {
487             # called after receiving remote greeting, learn remote node name
488              
489             # we rely on untrusted data here (the remote node name) this is
490             # hopefully ok, as this can at most be used for DOSing, which is easy
491             # when you can do MITM anyway.
492              
493             # if we connect to ourselves, nuke this seed, but make sure we act like a seed
494 0 0   0   0 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
495 0         0 require AnyEvent::MP::Global; # every seed becomes a global node currently
496 0         0 delete $SEED_NODE{$seed};
497             } else {
498 0         0 $SEED_NODE{$seed} = $_[0]{remote_node};
499 0         0 $NODE_SEED{$_[0]{remote_node}} = $seed;
500              
501             # also start global service, in case it isn't running
502             # since we probably switch connections, maybe we don't need to do this here?
503 0         0 snd $_[0]{remote_node}, "g_slave";
504             }
505             },
506             sub {
507 0     0   0 delete $SEED_CONNECT{$seed};
508             }
509 0   0     0 ;
510             }
511              
512             sub seed_all {
513             my @seeds = grep
514 0   0 0 0 0 !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}),
515             keys %SEED_NODE;
516              
517 0 0       0 if (@seeds) {
518             # start connection attempt for every seed we are not connected to yet
519             seed_connect $_
520 0         0 for grep !exists $SEED_CONNECT{$_}, @seeds;
521              
522 0         0 $SEED_RETRY = $SEED_RETRY * 2;
523             $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
524 0 0       0 if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
525              
526 0         0 $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
527              
528             } else {
529             # all seeds connected or connecting, no need to restart timer
530 0         0 undef $SEED_WATCHER;
531             }
532             }
533              
534             sub seed_again {
535 0     0 0 0 $SEED_RETRY = (1 + rand) * 0.6;
536 0   0     0 $SEED_WATCHER ||= AE::timer 0, 0, \&seed_all;
537             }
538              
539             # sets new seed list, starts connecting
540             sub set_seeds(@) {
541 0     0 0 0 %SEED_NODE = ();
542 0         0 %NODE_SEED = ();
543 0         0 %SEED_CONNECT = ();
544              
545 0         0 @SEED_NODE{@_} = ();
546              
547 0         0 seed_again;
548             }
549              
550             # normal nodes only record global node connections
551             $NODE_REQ{g_global} = sub {
552             undef $GLOBAL_NODE{$SRCNODE};
553             };
554              
555             mon_nodes sub {
556             delete $GLOBAL_NODE{$_[0]}
557             unless $_[1];
558              
559             return unless exists $NODE_SEED{$_[0]};
560              
561             if ($_[1]) {
562             # each time a connection to a seed node goes up, make
563             # sure it runs the global service.
564             snd $_[0], "g_slave";
565             } else {
566             # if we lost the connection to a seed node, make sure we are seeding
567             seed_again;
568             }
569             };
570              
571             #############################################################################
572             # keepalive code - used to keep connections to certain nodes alive
573             # only used by global code atm., but ought to be exposed somehow.
574             #TODO: should probably be done directly by node objects
575              
576             our $KEEPALIVE_RETRY;
577             our $KEEPALIVE_WATCHER;
578             our %KEEPALIVE; # we want to keep these nodes alive
579             our %KEEPALIVE_DOWN; # nodes that are down currently
580              
581             sub keepalive_all {
582 0     0 0 0 AE::log 9 => "keepalive: trying to establish connections with: "
583             . (join " ", keys %KEEPALIVE_DOWN)
584             . ".";
585              
586             (add_node $_)->connect
587 0         0 for keys %KEEPALIVE_DOWN;
588              
589 0         0 $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2;
590             $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
591 0 0       0 if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
592              
593 0         0 $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
594             }
595              
596             sub keepalive_again {
597 0     0 0 0 $KEEPALIVE_RETRY = (1 + rand) * 0.3;
598 0         0 keepalive_all;
599             }
600              
601             sub keepalive_add {
602 0 0   0 0 0 return if $KEEPALIVE{$_[0]}++;
603              
604 0 0       0 return if node_is_up $_[0];
605 0         0 undef $KEEPALIVE_DOWN{$_[0]};
606 0         0 keepalive_again;
607             }
608              
609             sub keepalive_del {
610 0 0   0 0 0 return if --$KEEPALIVE{$_[0]};
611              
612 0         0 delete $KEEPALIVE {$_[0]};
613 0         0 delete $KEEPALIVE_DOWN{$_[0]};
614              
615 0 0       0 undef $KEEPALIVE_WATCHER
616             unless %KEEPALIVE_DOWN;
617             }
618              
619             mon_nodes sub {
620             return unless exists $KEEPALIVE{$_[0]};
621              
622             if ($_[1]) {
623             delete $KEEPALIVE_DOWN{$_[0]};
624              
625             undef $KEEPALIVE_WATCHER
626             unless %KEEPALIVE_DOWN;
627             } else {
628             # lost the connection, try to connect again
629             undef $KEEPALIVE_DOWN{$_[0]};
630             keepalive_again;
631             }
632             };
633              
634             #############################################################################
635             # talk with/to global nodes
636              
637             # protocol messages:
638             #
639             # sent by global nodes
640             # g_global - global nodes send this to all others
641             #
642             # database protocol
643             # g_slave database - make other global node master of the sender
644             # g_set database - global node's database to other global nodes
645             # g_upd family set del - update single family (any to global)
646             #
647             # slave <-> global protocol
648             # g_find node - query addresses for node (slave to global)
649             # g_found node binds - node addresses (global to slave)
650             # g_db_family family id - send g_reply with data (global to slave)
651             # g_db_keys family id - send g_reply with data (global to slave)
652             # g_db_values family id - send g_reply with data (global to slave)
653             # g_reply id result - result of any query (global to slave)
654             # g_mon1 family - start to monitor family, replies with g_chg1
655             # g_mon0 family - stop monitoring family
656             # g_chg1 family hash - initial value of family when starting to monitor
657             # g_chg2 family set del - like g_upd, but for monitoring only
658             #
659             # internal database families:
660             # "'l" -> node -> listeners
661             # "'g" -> node -> undef
662             # ...
663             #
664              
665             # used on all nodes:
666             our $MASTER; # the global node we bind ourselves to
667             our $MASTER_MON;
668             our %LOCAL_DB; # this node database
669              
670             our $GPROTO = 1;
671              
672             # tell everybody who connects our gproto
673             push @AnyEvent::MP::Transport::HOOK_GREET, sub {
674             $_[0]{local_greeting}{gproto} = $GPROTO;
675             };
676              
677             #############################################################################
678             # master selection
679              
680             # master requests
681             our %GLOBAL_REQ; # $id => \@req
682              
683             sub global_req_add {
684 0     0 0 0 my ($id, $req) = @_;
685              
686 0 0       0 return if exists $GLOBAL_REQ{$id};
687              
688 0         0 $GLOBAL_REQ{$id} = $req;
689              
690 0 0       0 snd $MASTER, @$req
691             if $MASTER;
692             }
693              
694             sub global_req_del {
695 0     0 0 0 delete $GLOBAL_REQ{$_[0]};
696             }
697              
698             #################################
699             # master rpc
700              
701             our %GLOBAL_RES;
702             our $GLOBAL_RES_ID = "a";
703              
704             sub global_call {
705 0     0 0 0 my $id = ++$GLOBAL_RES_ID;
706 0         0 $GLOBAL_RES{$id} = pop;
707 0         0 global_req_add $id, [@_, $id];
708             }
709              
710             $NODE_REQ{g_reply} = sub {
711             my $id = shift;
712             global_req_del $id;
713             my $cb = delete $GLOBAL_RES{$id}
714             or return;
715             &$cb
716             };
717              
718             #################################
719              
720             sub g_find {
721 0     0 0 0 global_req_add "g_find $_[0]", [g_find => $_[0]];
722             }
723              
724             # reply for g_find started in Node.pm
725             $NODE_REQ{g_found} = sub {
726             global_req_del "g_find $_[0]";
727              
728             my $node = $NODE{$_[0]} or return;
729              
730             $node->connect_to ($_[1]);
731             };
732              
733             sub master_set {
734 0     0 0 0 $MASTER = $_[0];
735 0         0 AE::log 8 => "new master node: $MASTER.";
736              
737             $MASTER_MON = mon_nodes sub {
738 0 0 0 0   0 if ($_[0] eq $MASTER && !$_[1]) {
739 0         0 undef $MASTER;
740 0         0 master_search ();
741             }
742 0         0 };
743              
744 0         0 snd $MASTER, g_slave => \%LOCAL_DB;
745              
746             # (re-)send queued requests
747             snd $MASTER, @$_
748 0         0 for values %GLOBAL_REQ;
749             }
750              
751             sub master_search {
752 0     0 0 0 AE::log 9 => "starting search for master node.";
753              
754             #TODO: should also look for other global nodes, but we don't know them
755 0         0 for (keys %NODE_SEED) {
756 0 0       0 if (node_is_up $_) {
757 0         0 master_set $_;
758 0         0 return;
759             }
760             }
761              
762             $MASTER_MON = mon_nodes sub {
763 0 0   0   0 return unless $_[1]; # we are only interested in node-ups
764 0 0       0 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
765              
766 0         0 master_set $_[0];
767 0         0 };
768             }
769              
770             # other node wants to make us the master, so start the global service
771             $NODE_REQ{g_slave} = sub {
772             # load global module and redo the request
773             require AnyEvent::MP::Global;
774             &{ $NODE_REQ{g_slave} }
775             };
776              
777             #############################################################################
778             # local database operations
779              
780             # canonical probably not needed
781             our $sv_eq_coder = JSON::XS->new->utf8->allow_nonref;
782              
783             # are the two scalars equal? very very ugly and slow, need better way
784             sub sv_eq($$) {
785 0 0 0 0 0 0 ref $_[0] || ref $_[1]
      0        
786             ? (JSON::XS::encode $sv_eq_coder, $_[0]) eq (JSON::XS::encode $sv_eq_coder, $_[1])
787             : $_[0] eq $_[1]
788             && defined $_[0] == defined $_[1]
789             }
790              
791             # local database management
792              
793             sub db_del($@) {
794 0     0 0 0 my $family = shift;
795              
796 0         0 my @del = grep exists $LOCAL_DB{$family}{$_}, @_;
797              
798 0 0       0 return unless @del;
799              
800 0         0 delete @{ $LOCAL_DB{$family} }{@del};
  0         0  
801 0 0       0 snd $MASTER, g_upd => $family => undef, \@del
802             if defined $MASTER;
803             }
804              
805             sub db_set($$;$) {
806 0     0 0 0 my ($family, $subkey) = @_;
807              
808             # if (ref $_[1]) {
809             # # bulk
810             # my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
811             # $LOCAL_DB{$_[0]} = $_[1];
812             # snd $MASTER, g_upd => $_[0] => $_[1], \@del
813             # if defined $MASTER;
814             # } else {
815             # single-key
816 0 0 0     0 unless (exists $LOCAL_DB{$family}{$subkey} && sv_eq $LOCAL_DB{$family}{$subkey}, $_[2]) {
817 0         0 $LOCAL_DB{$family}{$subkey} = $_[2];
818 0 0       0 snd $MASTER, g_upd => $family => { $subkey => $_[2] }
819             if defined $MASTER;
820             }
821             # }
822              
823             defined wantarray
824 0     0   0 and Guard::guard { db_del $family => $subkey }
825 0 0       0 }
826              
827             # database query
828              
829             sub db_family {
830 0     0 0 0 my ($family, $cb) = @_;
831 0         0 global_call g_db_family => $family, $cb;
832             }
833              
834             sub db_keys {
835 0     0 0 0 my ($family, $cb) = @_;
836 0         0 global_call g_db_keys => $family, $cb;
837             }
838              
839             sub db_values {
840 0     0 0 0 my ($family, $cb) = @_;
841 0         0 global_call g_db_values => $family, $cb;
842             }
843              
844             # database monitoring
845              
846             our %LOCAL_MON; # f, reply
847             our %MON_DB; # f, k, value
848              
849             sub db_mon($@) {
850 0     0 0 0 my ($family, $cb) = @_;
851              
852 0 0       0 if (my $db = $MON_DB{$family}) {
853             # we already monitor, so create a "dummy" change event
854             # this is postponed, which might be too late (we could process
855             # change events), so disable the callback at first
856 0     0   0 $LOCAL_MON{$family}{$cb+0} = sub { };
857             AE::postpone {
858 0 0   0   0 return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
859              
860             # set actual callback
861 0         0 $LOCAL_MON{$family}{$cb+0} = $cb;
862 0         0 $cb->($db, [keys %$db]);
863 0         0 };
864             } else {
865             # new monitor, request chg1 from upstream
866 0         0 $LOCAL_MON{$family}{$cb+0} = $cb;
867 0         0 global_req_add "mon1 $family" => [g_mon1 => $family];
868 0         0 $MON_DB{$family} = {};
869             }
870              
871             defined wantarray
872             and Guard::guard {
873 0     0   0 my $mon = $LOCAL_MON{$family};
874 0         0 delete $mon->{$cb+0};
875              
876 0 0       0 unless (%$mon) {
877 0         0 global_req_del "mon1 $family";
878              
879             # no global_req, because we don't care if we are not connected
880 0 0       0 snd $MASTER, g_mon0 => $family
881             if $MASTER;
882              
883 0         0 delete $LOCAL_MON{$family};
884 0         0 delete $MON_DB{$family};
885             }
886             }
887 0 0       0 }
888              
889             # full update
890             $NODE_REQ{g_chg1} = sub {
891             return unless $SRCNODE eq $MASTER;
892             my ($f, $ndb) = @_;
893              
894             my $db = $MON_DB{$f};
895             my (@a, @c, @d);
896              
897             # add or replace keys
898             while (my ($k, $v) = each %$ndb) {
899             exists $db->{$k}
900             ? push @c, $k
901             : push @a, $k;
902             $db->{$k} = $v;
903             }
904              
905             # delete keys that are no longer present
906             for (grep !exists $ndb->{$_}, keys %$db) {
907             delete $db->{$_};
908             push @d, $_;
909             }
910              
911             $_->($db, \@a, \@c, \@d)
912             for values %{ $LOCAL_MON{$_[0]} };
913             };
914              
915             # incremental update
916             $NODE_REQ{g_chg2} = sub {
917             return unless $SRCNODE eq $MASTER;
918             my ($family, $set, $del) = @_;
919              
920             my $db = $MON_DB{$family};
921              
922             my (@a, @c);
923              
924             while (my ($k, $v) = each %$set) {
925             exists $db->{$k}
926             ? push @c, $k
927             : push @a, $k;
928             $db->{$k} = $v;
929             }
930              
931             delete @$db{@$del};
932              
933             $_->($db, \@a, \@c, $del)
934             for values %{ $LOCAL_MON{$family} };
935             };
936              
937             #############################################################################
938             # configure
939              
940             sub nodename {
941 0     0 0 0 require POSIX;
942 0         0 (POSIX::uname ())[1]
943             }
944              
945             sub _resolve($) {
946 0     0   0 my ($nodeid) = @_;
947              
948 0         0 my $cv = AE::cv;
949 0         0 my @res;
950              
951             $cv->begin (sub {
952 0     0   0 my %seen;
953             my @refs;
954 0         0 for (sort { $a->[0] <=> $b->[0] } @res) {
  0         0  
955 0 0       0 push @refs, $_->[1] unless $seen{$_->[1]}++
956             }
957 0         0 shift->send (@refs);
958 0         0 });
959              
960 0         0 my $idx;
961 0         0 for my $t (split /,/, $nodeid) {
962 0         0 my $pri = ++$idx;
963              
964 0 0       0 $t = length $t ? nodename . ":$t" : nodename
    0          
965             if $t =~ /^\d*$/;
966            
967 0 0       0 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
968             or Carp::croak "$t: unparsable transport descriptor";
969              
970 0 0       0 $port = "0" if $port eq "*";
971              
972 0 0       0 if ($host eq "*") {
973 0         0 $cv->begin;
974              
975             my $get_addr = sub {
976 0     0   0 my @addr;
977              
978 0         0 require Net::Interface;
979              
980             # Net::Interface hangs on some systems, so hope for the best
981 0         0 local $SIG{ALRM} = 'DEFAULT';
982 0         0 alarm 2;
983              
984 0         0 for my $if (Net::Interface->interfaces) {
985             # we statically lower-prioritise ipv6 here, TODO :()
986 0         0 for $_ ($if->address (Net::Interface::AF_INET ())) {
987 0 0       0 next if /^\x7f/; # skip localhost etc.
988 0         0 push @addr, $_;
989             }
990 0         0 for ($if->address (Net::Interface::AF_INET6 ())) {
991             #next if $if->scope ($_) <= 2;
992 0 0       0 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
993 0         0 push @addr, $_;
994             }
995             }
996              
997 0         0 alarm 0;
998              
999             @addr
1000 0         0 };
  0         0  
1001              
1002 0         0 my @addr;
1003              
1004 0         0 if (AnyEvent::WIN32) {
1005             @addr = $get_addr->();
1006             } else {
1007             # use a child process, as Net::Interface is big, and we need it only once.
1008              
1009 0 0       0 pipe my $r, my $w
1010             or die "pipe: $!";
1011              
1012 0 0       0 if (fork eq 0) {
1013 0         0 close $r;
1014 0         0 syswrite $w, pack "(C/a*)*", $get_addr->();
1015 0         0 require POSIX;
1016 0         0 POSIX::_exit (0);
1017             } else {
1018 0         0 close $w;
1019              
1020 0         0 my $addr;
1021              
1022 0         0 1 while sysread $r, $addr, 1024, length $addr;
1023            
1024 0         0 @addr = unpack "(C/a*)*", $addr;
1025             }
1026             }
1027              
1028 0         0 for my $ip (@addr) {
1029 0         0 push @res, [
1030             $pri += 1e-5,
1031             AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
1032             ];
1033             }
1034 0         0 $cv->end;
1035             } else {
1036 0         0 $cv->begin;
1037             AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
1038 0     0   0 for (@_) {
1039 0         0 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
1040 0         0 push @res, [
1041             $pri += 1e-5,
1042             AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
1043             ];
1044             }
1045 0         0 $cv->end;
1046 0         0 };
1047             }
1048             }
1049              
1050 0         0 $cv->end;
1051              
1052 0         0 $cv
1053             }
1054              
1055             our @POST_CONFIGURE;
1056              
1057             # not yet documented
1058             sub post_configure(&) {
1059 1 50   1 0 3 die "AnyEvent::MP::Kernel::post_configure must be called in void context" if defined wantarray;
1060              
1061 1         3 push @POST_CONFIGURE, @_;
1062 1   33     3 (shift @POST_CONFIGURE)->() while $NODE && @POST_CONFIGURE;
1063             }
1064              
1065             sub configure(@) {
1066 0 0   0 0   unshift @_, "profile" if @_ & 1;
1067 0           my (%kv) = @_;
1068              
1069 0           my $profile = delete $kv{profile};
1070              
1071 0 0         $profile = nodename
1072             unless defined $profile;
1073              
1074 0           $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
1075              
1076 0           $SECURE = $CONFIG->{secure};
1077              
1078 0 0         my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
1079              
1080 0 0         $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
1081              
1082 0           my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure
1083              
1084 0           $NODE = $node;
1085              
1086 0           $NODE =~ s/%n/nodename/ge;
  0            
1087              
1088 0 0         if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
1089             # nodes with randomised node names do not need randomised port names
1090 0           $UNIQ = "";
1091             }
1092              
1093 0           $node_obj->{id} = $NODE;
1094 0           $NODE{$NODE} = $node_obj;
1095              
1096 0           my $seeds = $CONFIG->{seeds};
1097 0           my $binds = $CONFIG->{binds};
1098              
1099 0   0       $binds ||= ["*"];
1100              
1101 0           AE::log 8 => "node $NODE starting up.";
1102              
1103 0           $BINDS = [];
1104 0           %BINDS = ();
1105              
1106 0           for (map _resolve $_, @$binds) {
1107 0           for my $bind ($_->recv) {
1108 0 0         my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
1109             or Carp::croak "$bind: unparsable local bind address";
1110              
1111             my $listener = AnyEvent::MP::Transport::mp_server
1112             $host,
1113             $port,
1114             prepare => sub {
1115 0     0     my (undef, $host, $port) = @_;
1116 0           $bind = AnyEvent::Socket::format_hostport $host, $port;
1117 0           0
1118             },
1119 0           ;
1120 0           $BINDS{$bind} = $listener;
1121 0           push @$BINDS, $bind;
1122             }
1123             }
1124              
1125 0           AE::log 9 => "running post config hooks and init.";
1126              
1127             # might initialise Global, so need to do it before db_set
1128 0     0     post_configure { };
1129              
1130 0           db_set "'l" => $NODE => $BINDS;
1131              
1132 0           AE::log 8 => "node listens on [@$BINDS].";
1133              
1134             # connect to all seednodes
1135 0           set_seeds map $_->recv, map _resolve $_, @$seeds;
1136 0           master_search;
1137              
1138             # save gobs of memory
1139 0           undef &_resolve;
1140 0     0     *configure = sub (@){ };
1141              
1142 0           AE::log 9 => "starting services.";
1143              
1144 0           for (@{ $CONFIG->{services} }) {
  0            
1145 0 0         if (ref) {
    0          
1146 0           my ($func, @args) = @$_;
1147 0           (load_func $func)->(@args);
1148             } elsif (s/::$//) {
1149 0           eval "require $_";
1150 0 0         die $@ if $@;
1151             } else {
1152 0           (load_func $_)->();
1153             }
1154             }
1155              
1156 0           eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
1157 0 0         die "$@" if $@;
1158             }
1159              
1160             =back
1161              
1162             =head1 LOGGING
1163              
1164             AnyEvent::MP::Kernel logs high-level information about the current node,
1165             when nodes go up and down, and most runtime errors. It also logs some
1166             debugging and trace messages about network maintenance, such as seed
1167             connections and global node management.
1168              
1169             =head1 SEE ALSO
1170              
1171             L<AnyEvent::MP>.
1172              
1173             =head1 AUTHOR
1174              
1175             Marc Lehmann
1176             http://home.schmorp.de/
1177              
1178             =cut
1179              
1180             1
1181