File Coverage

blib/lib/AnyEvent/MP/Kernel.pm
Criterion Covered Total %
statement 16 18 88.8
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 22 24 91.6


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::MP::Kernel - the actual message passing kernel
4              
5             =head1 SYNOPSIS
6              
7             use AnyEvent::MP::Kernel;
8              
9             =head1 DESCRIPTION
10              
11             This module provides most of the basic functionality of AnyEvent::MP,
12             exposed through higher level interfaces such as L<AnyEvent::MP> and
13             L<Coro::MP>.
14              
15             This module is mainly of interest when knowledge about connectivity,
16             connected nodes etc. is sought.
17              
18             =head1 GLOBALS AND FUNCTIONS
19              
20             =over 4
21              
22             =cut
23              
24             package AnyEvent::MP::Kernel;
25              
26 1     1   1706 use common::sense;
  1         3  
  1         13  
27 1     1   1087 use POSIX ();
  1         7904  
  1         35  
28 1     1   7 use Carp ();
  1         3  
  1         16  
29 1     1   5 use MIME::Base64 ();
  1         3  
  1         14  
30              
31 1     1   5 use AE ();
  1         2  
  1         17  
32              
33 1     1   60 use AnyEvent::MP::Node;
  0            
  0            
34             use AnyEvent::MP::Transport;
35              
36             use base "Exporter";
37              
38             our @EXPORT = qw(
39             %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40             add_node load_func snd_to_func snd_on eval_on
41              
42             NODE $NODE node_of snd kil port_is_local
43             configure
44             up_nodes mon_nodes node_is_up
45             );
46              
47             =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
48              
49             This value is called with an error or warning message, when e.g. a
50             connection could not be created, authorisation failed and so on.
51              
52             It I<must not> block or send messages -queue it and use an idle watcher if
53             you need to do any of these things.
54              
55             C<$level> should be C<0> for messages to be logged always, C<1> for
56             unexpected messages and errors, C<2> for warnings, C<7> for messages about
57             node connectivity and services, C<8> for debugging messages and C<9> for
58             tracing messages.
59              
60             The default simply logs the message to STDERR.
61              
62             =item @AnyEvent::MP::Kernel::WARN
63              
64             All code references in this array are called for every log message, from
65             the default C<$WARN> handler. This is an easy way to tie into the log
66             messages without disturbing others.
67              
68             =cut
69              
70             our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
71             our @WARN;
72             our $WARN = sub {
73             &$_ for @WARN;
74              
75             return if $WARNLEVEL < $_[0];
76              
77             my ($level, $msg) = @_;
78              
79             $msg =~ s/\n$//;
80              
81             printf STDERR "%s <%d> %s\n",
82             (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
83             $level,
84             $msg;
85             };
86              
87             =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88              
89             The maximum level at which warning messages will be printed to STDERR by
90             the default warn handler.
91              
92             =cut
93              
94             sub load_func($) {
95             my $func = $_[0];
96              
97             unless (defined &$func) {
98             my $pkg = $func;
99             do {
100             $pkg =~ s/::[^:]+$//
101             or return sub { die "unable to resolve function '$func'" };
102              
103             local $@;
104             unless (eval "require $pkg; 1") {
105             my $error = $@;
106             $error =~ /^Can't locate .*.pm in \@INC \(/
107             or return sub { die $error };
108             }
109             } until defined &$func;
110             }
111              
112             \&$func
113             }
114              
115             sub nonce($) {
116             my $nonce;
117              
118             if (open my $fh, "</dev/urandom") {
119             sysread $fh, $nonce, $_[0];
120             } else {
121             # shit...
122             $nonce = join "", map +(chr rand 256), 1 .. $_[0]
123             }
124              
125             $nonce
126             }
127              
128             sub alnumbits($) {
129             my $data = $_[0];
130              
131             if (eval "use Math::GMP 2.05; 1") {
132             $data = Math::GMP::get_str_gmp (
133             (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
134             62
135             );
136             } else {
137             $data = MIME::Base64::encode_base64 $data, "";
138             $data =~ s/=//;
139             $data =~ s/x/x0/g;
140             $data =~ s/\//x1/g;
141             $data =~ s/\+/x2/g;
142             }
143              
144             $data
145             }
146              
147             sub gen_uniq {
148             alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
149             }
150              
151             our $CONFIG; # this node's configuration
152              
153             our $RUNIQ; # remote uniq value
154             our $UNIQ; # per-process/node unique cookie
155             our $NODE;
156             our $ID = "a";
157              
158             our %NODE; # node id to transport mapping, or "undef", for local node
159             our (%PORT, %PORT_DATA); # local ports
160              
161             our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
162             our %LMON; # monitored _local_ ports
163              
164             our %LISTENER;
165             our $LISTENER; # our listeners, as arrayref
166              
167             our $SRCNODE; # holds the sending node during _inject
168              
169             sub _seed {
170             $RUNIQ = alnumbits nonce 96/8;
171             $UNIQ = gen_uniq;
172             $NODE = "anon/$RUNIQ";
173             }
174              
175             _seed;
176              
177             sub NODE() {
178             $NODE
179             }
180              
181             sub node_of($) {
182             my ($node, undef) = split /#/, $_[0], 2;
183              
184             $node
185             }
186              
187             BEGIN {
188             *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
189             ? sub () { 1 }
190             : sub () { 0 };
191             }
192              
193             our $DELAY_TIMER;
194             our @DELAY_QUEUE;
195              
196             sub _delay_run {
197             (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
198             }
199              
200             sub delay($) {
201             push @DELAY_QUEUE, shift;
202             $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
203             }
204              
205             sub _inject {
206             warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
207             &{ $PORT{+shift} or return };
208             }
209              
210             # this function adds a node-ref, so you can send stuff to it
211             # it is basically the central routing component.
212             sub add_node {
213             my ($node) = @_;
214              
215             $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
216             }
217              
218             sub snd(@) {
219             my ($nodeid, $portid) = split /#/, shift, 2;
220              
221             warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
222              
223             defined $nodeid #d#UGLY
224             or Carp::croak "'undef' is not a valid node ID/port ID";
225              
226             ($NODE{$nodeid} || add_node $nodeid)
227             ->{send} (["$portid", @_]);
228             }
229              
230             =item $is_local = port_is_local $port
231              
232             Returns true iff the port is a local port.
233              
234             =cut
235              
236             sub port_is_local($) {
237             my ($nodeid, undef) = split /#/, $_[0], 2;
238              
239             $NODE{$nodeid} == $NODE{""}
240             }
241              
242             =item snd_to_func $node, $func, @args
243              
244             Expects a node ID and a name of a function. Asynchronously tries to call
245             this function with the given arguments on that node.
246              
247             This function can be used to implement C<spawn>-like interfaces.
248              
249             =cut
250              
251             sub snd_to_func($$;@) {
252             my $nodeid = shift;
253              
254             # on $NODE, we artificially delay... (for spawn)
255             # this is very ugly - maybe we should simply delay ALL messages,
256             # to avoid deep recursion issues. but that's so... slow...
257             $AnyEvent::MP::Node::Self::DELAY = 1
258             if $nodeid ne $NODE;
259              
260             defined $nodeid #d#UGLY
261             or Carp::croak "'undef' is not a valid node ID/port ID";
262              
263             ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]);
264             }
265              
266             =item snd_on $node, @msg
267              
268             Executes C<snd> with the given C<@msg> (which must include the destination
269             port) on the given node.
270              
271             =cut
272              
273             sub snd_on($@) {
274             my $node = shift;
275             snd $node, snd => @_;
276             }
277              
278             =item eval_on $node, $string[, @reply]
279              
280             Evaluates the given string as Perl expression on the given node. When
281             @reply is specified, then it is used to construct a reply message with
282             C<"$@"> and any results from the eval appended.
283              
284             =cut
285              
286             sub eval_on($$;@) {
287             my $node = shift;
288             snd $node, eval => @_;
289             }
290              
291             sub kil(@) {
292             my ($nodeid, $portid) = split /#/, shift, 2;
293              
294             length $portid
295             or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
296              
297             ($NODE{$nodeid} || add_node $nodeid)
298             ->kill ("$portid", @_);
299             }
300              
301             sub _nodename {
302             require POSIX;
303             (POSIX::uname ())[1]
304             }
305              
306             sub _resolve($) {
307             my ($nodeid) = @_;
308              
309             my $cv = AE::cv;
310             my @res;
311              
312             $cv->begin (sub {
313             my %seen;
314             my @refs;
315             for (sort { $a->[0] <=> $b->[0] } @res) {
316             push @refs, $_->[1] unless $seen{$_->[1]}++
317             }
318             shift->send (@refs);
319             });
320              
321             my $idx;
322             for my $t (split /,/, $nodeid) {
323             my $pri = ++$idx;
324              
325             $t = length $t ? _nodename . ":$t" : _nodename
326             if $t =~ /^\d*$/;
327            
328             my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
329             or Carp::croak "$t: unparsable transport descriptor";
330              
331             $port = "0" if $port eq "*";
332              
333             if ($host eq "*") {
334             $cv->begin;
335             # use fork_call, as Net::Interface is big, and we need it rarely.
336             require AnyEvent::Util;
337             AnyEvent::Util::fork_call (
338             sub {
339             my @addr;
340              
341             require Net::Interface;
342              
343             for my $if (Net::Interface->interfaces) {
344             # we statically lower-prioritise ipv6 here, TODO :()
345             for $_ ($if->address (Net::Interface::AF_INET ())) {
346             next if /^\x7f/; # skip localhost etc.
347             push @addr, $_;
348             }
349             for ($if->address (Net::Interface::AF_INET6 ())) {
350             #next if $if->scope ($_) <= 2;
351             next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
352             push @addr, $_;
353             }
354              
355             }
356             @addr
357             }, sub {
358             for my $ip (@_) {
359             push @res, [
360             $pri += 1e-5,
361             AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
362             ];
363             }
364             $cv->end;
365             }
366             );
367             } else {
368             $cv->begin;
369             AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
370             for (@_) {
371             my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
372             push @res, [
373             $pri += 1e-5,
374             AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
375             ];
376             }
377             $cv->end;
378             };
379             }
380             }
381              
382             $cv->end;
383              
384             $cv
385             }
386              
387             sub configure(@) {
388             unshift @_, "profile" if @_ & 1;
389             my (%kv) = @_;
390              
391             delete $NODE{$NODE}; # we do not support doing stuff before configure
392             _seed;
393              
394             my $profile = delete $kv{profile};
395              
396             $profile = _nodename
397             unless defined $profile;
398              
399             $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
400              
401             my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
402              
403             $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
404              
405             $NODE = $node
406             unless $node eq "anon/";
407              
408             $NODE{$NODE} = $NODE{""};
409             $NODE{$NODE}{id} = $NODE;
410              
411             my $seeds = $CONFIG->{seeds};
412             my $binds = $CONFIG->{binds};
413              
414             $binds ||= ["*"];
415              
416             $WARN->(8, "node $NODE starting up.");
417              
418             $LISTENER = [];
419             %LISTENER = ();
420              
421             for (map _resolve $_, @$binds) {
422             for my $bind ($_->recv) {
423             my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
424             or Carp::croak "$bind: unparsable local bind address";
425              
426             my $listener = AnyEvent::MP::Transport::mp_server
427             $host,
428             $port,
429             prepare => sub {
430             my (undef, $host, $port) = @_;
431             $bind = AnyEvent::Socket::format_hostport $host, $port;
432             0
433             },
434             ;
435             $LISTENER{$bind} = $listener;
436             push @$LISTENER, $bind;
437             }
438             }
439              
440             $WARN->(8, "node listens on [@$LISTENER].");
441              
442             # the global service is mandatory currently
443             require AnyEvent::MP::Global;
444              
445             # connect to all seednodes
446             AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
447              
448             for (@{ $CONFIG->{services} }) {
449             if (ref) {
450             my ($func, @args) = @$_;
451             (load_func $func)->(@args);
452             } elsif (s/::$//) {
453             eval "require $_";
454             die $@ if $@;
455             } else {
456             (load_func $_)->();
457             }
458             }
459             }
460              
461             #############################################################################
462             # node monitoring and info
463              
464             =item node_is_known $nodeid
465              
466             Returns true iff the given node is currently known to the system. The only
467             time a node is known but not up currently is when a conenction request is
468             pending.
469              
470             =cut
471              
472             sub node_is_known($) {
473             exists $NODE{$_[0]}
474             }
475              
476             =item node_is_up $nodeid
477              
478             Returns true if the given node is "up", that is, the kernel thinks it has
479             a working connection to it.
480              
481             If the node is known but not currently connected, returns C<0>. If the
482             node is not known, returns C<undef>.
483              
484             =cut
485              
486             sub node_is_up($) {
487             ($NODE{$_[0]} or return)->{transport}
488             ? 1 : 0
489             }
490              
491             =item known_nodes
492              
493             Returns the node IDs of all nodes currently known to this node, including
494             itself and nodes not currently connected.
495              
496             =cut
497              
498             sub known_nodes() {
499             map $_->{id}, values %NODE
500             }
501              
502             =item up_nodes
503              
504             Return the node IDs of all nodes that are currently connected (excluding
505             the node itself).
506              
507             =cut
508              
509             sub up_nodes() {
510             map $_->{id}, grep $_->{transport}, values %NODE
511             }
512              
513             =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
514              
515             Registers a callback that is called each time a node goes up (a connection
516             is established) or down (the connection is lost).
517              
518             Node up messages can only be followed by node down messages for the same
519             node, and vice versa.
520              
521             Note that monitoring a node is usually better done by monitoring it's node
522             port. This function is mainly of interest to modules that are concerned
523             about the network topology and low-level connection handling.
524              
525             Callbacks I<must not> block and I<should not> send any messages.
526              
527             The function returns an optional guard which can be used to unregister
528             the monitoring callback again.
529              
530             Example: make sure you call function C<newnode> for all nodes that are up
531             or go up (and down).
532              
533             newnode $_, 1 for up_nodes;
534             mon_nodes \&newnode;
535              
536             =cut
537              
538             our %MON_NODES;
539              
540             sub mon_nodes($) {
541             my ($cb) = @_;
542              
543             $MON_NODES{$cb+0} = $cb;
544              
545             defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
546             }
547              
548             sub _inject_nodeevent($$;@) {
549             my ($node, $up, @reason) = @_;
550              
551             for my $cb (values %MON_NODES) {
552             eval { $cb->($node->{id}, $up, @reason); 1 }
553             or $WARN->(1, $@);
554             }
555              
556             $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
557             }
558              
559             #############################################################################
560             # self node code
561              
562             sub _kill {
563             my $port = shift;
564              
565             delete $PORT{$port}
566             or return; # killing nonexistent ports is O.K.
567             delete $PORT_DATA{$port};
568              
569             my $mon = delete $LMON{$port}
570             or !@_
571             or $WARN->(2, "unmonitored local port $port died with reason: @_");
572              
573             $_->(@_) for values %$mon;
574             }
575              
576             sub _monitor {
577             return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
578             unless exists $PORT{$_[1]};
579              
580             $LMON{$_[1]}{$_[2]+0} = $_[2];
581             }
582              
583             sub _unmonitor {
584             delete $LMON{$_[1]}{$_[2]+0}
585             if exists $LMON{$_[1]};
586             }
587              
588             our %node_req = (
589             # internal services
590              
591             # monitoring
592             mon0 => sub { # stop monitoring a port for another node
593             my $portid = shift;
594             _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
595             },
596             mon1 => sub { # start monitoring a port for another node
597             my $portid = shift;
598             Scalar::Util::weaken (my $node = $SRCNODE);
599             _monitor undef, $portid, $node->{rmon}{$portid} = sub {
600             delete $node->{rmon}{$portid};
601             $node->send (["", kil0 => $portid, @_])
602             if $node && $node->{transport};
603             };
604             },
605             # another node has killed a monitored port
606             kil0 => sub {
607             my $cbs = delete $SRCNODE->{lmon}{+shift}
608             or return;
609              
610             $_->(@_) for @$cbs;
611             },
612              
613             # "public" services - not actually public
614              
615             # another node wants to kill a local port
616             kil => \&_kill,
617              
618             # relay message to another node / generic echo
619             snd => \&snd,
620             snd_multiple => sub {
621             snd @$_ for @_
622             },
623              
624             # informational
625             info => sub {
626             snd @_, $NODE;
627             },
628             known_nodes => sub {
629             snd @_, known_nodes;
630             },
631             up_nodes => sub {
632             snd @_, up_nodes;
633             },
634              
635             # random utilities
636             eval => sub {
637             my @res = do { package main; eval shift };
638             snd @_, "$@", @res if @_;
639             },
640             time => sub {
641             snd @_, AE::time;
642             },
643             devnull => sub {
644             #
645             },
646             "" => sub {
647             # empty messages are keepalives or similar devnull-applications
648             },
649             );
650              
651             $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
652             $PORT{""} = sub {
653             my $tag = shift;
654             eval { &{ $node_req{$tag} ||= load_func $tag } };
655             $WARN->(2, "error processing node message: $@") if $@;
656             };
657              
658             =back
659              
660             =head1 SEE ALSO
661              
662             L<AnyEvent::MP>.
663              
664             =head1 AUTHOR
665              
666             Marc Lehmann <schmorp@schmorp.de>
667             http://home.schmorp.de/
668              
669             =cut
670              
671             1
672