| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | =head1 NAME | 
| 2 |  |  |  |  |  |  |  | 
| 3 |  |  |  |  |  |  | AnyEvent::MP::Kernel - the actual message passing kernel | 
| 4 |  |  |  |  |  |  |  | 
| 5 |  |  |  |  |  |  | =head1 SYNOPSIS | 
| 6 |  |  |  |  |  |  |  | 
| 7 |  |  |  |  |  |  | use AnyEvent::MP::Kernel; | 
| 8 |  |  |  |  |  |  |  | 
| 9 |  |  |  |  |  |  | $AnyEvent::MP::Kernel::SRCNODE   # contains msg origin node id, for debugging | 
| 10 |  |  |  |  |  |  |  | 
| 11 |  |  |  |  |  |  | snd_to_func $node, $func, @args  # send msg to function | 
| 12 |  |  |  |  |  |  | snd_on $node, @msg               # snd message again (relay) | 
| 13 |  |  |  |  |  |  | eval_on $node, $string[, @reply] # execute perl code on another node | 
| 14 |  |  |  |  |  |  |  | 
| 15 |  |  |  |  |  |  | node_is_up $nodeid               # return true if a node is connected | 
| 16 |  |  |  |  |  |  | @nodes = up_nodes                # return a list of all connected nodes | 
| 17 |  |  |  |  |  |  | $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs | 
| 18 |  |  |  |  |  |  |  | 
| 19 |  |  |  |  |  |  | =head1 DESCRIPTION | 
| 20 |  |  |  |  |  |  |  | 
| 21 |  |  |  |  |  |  | This module implements most of the inner workings of AnyEvent::MP. It | 
| 22 |  |  |  |  |  |  | offers mostly lower-level functions that deal with network connectivity | 
| 23 |  |  |  |  |  |  | and special requests. | 
| 24 |  |  |  |  |  |  |  | 
| 25 |  |  |  |  |  |  | You normally interface with AnyEvent::MP through a higher level interface | 
| 26 |  |  |  |  |  |  | such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong | 
| 27 |  |  |  |  |  |  | with using the functions from this module. | 
| 28 |  |  |  |  |  |  |  | 
| 29 |  |  |  |  |  |  | =head1 GLOBALS AND FUNCTIONS | 
| 30 |  |  |  |  |  |  |  | 
| 31 |  |  |  |  |  |  | =over 4 | 
| 32 |  |  |  |  |  |  |  | 
| 33 |  |  |  |  |  |  | =cut | 
| 34 |  |  |  |  |  |  |  | 
| 35 |  |  |  |  |  |  | package AnyEvent::MP::Kernel; | 
| 36 |  |  |  |  |  |  |  | 
| 37 | 1 |  |  | 1 |  | 504 | use common::sense; | 
|  | 1 |  |  |  |  | 2 |  | 
|  | 1 |  |  |  |  | 4 |  | 
| 38 | 1 |  |  | 1 |  | 36 | use Carp (); | 
|  | 1 |  |  |  |  | 2 |  | 
|  | 1 |  |  |  |  | 10 |  | 
| 39 |  |  |  |  |  |  |  | 
| 40 | 1 |  |  | 1 |  | 3 | use AnyEvent (); | 
|  | 1 |  |  |  |  | 2 |  | 
|  | 1 |  |  |  |  | 9 |  | 
| 41 | 1 |  |  | 1 |  | 3 | use Guard (); | 
|  | 1 |  |  |  |  | 1 |  | 
|  | 1 |  |  |  |  | 11 |  | 
| 42 |  |  |  |  |  |  |  | 
| 43 | 1 |  |  | 1 |  | 3 | use AnyEvent::MP::Node; | 
|  | 1 |  |  |  |  | 1 |  | 
|  | 1 |  |  |  |  | 14 |  | 
| 44 | 1 |  |  | 1 |  | 3 | use AnyEvent::MP::Transport; | 
|  | 1 |  |  |  |  | 1 |  | 
|  | 1 |  |  |  |  | 14 |  | 
| 45 |  |  |  |  |  |  |  | 
| 46 | 1 |  |  | 1 |  | 3 | use base "Exporter"; | 
|  | 1 |  |  |  |  | 1 |  | 
|  | 1 |  |  |  |  | 778 |  | 
| 47 |  |  |  |  |  |  |  | 
| 48 |  |  |  |  |  |  | # for re-export in AnyEvent::MP and Coro::MP | 
| 49 |  |  |  |  |  |  | our @EXPORT_API = qw( | 
| 50 |  |  |  |  |  |  | NODE $NODE | 
| 51 |  |  |  |  |  |  | configure | 
| 52 |  |  |  |  |  |  | node_of port_is_local | 
| 53 |  |  |  |  |  |  | snd kil | 
| 54 |  |  |  |  |  |  | db_set db_del | 
| 55 |  |  |  |  |  |  | db_mon db_family db_keys db_values | 
| 56 |  |  |  |  |  |  | ); | 
| 57 |  |  |  |  |  |  |  | 
| 58 |  |  |  |  |  |  | our @EXPORT_OK = ( | 
| 59 |  |  |  |  |  |  | # these are internal | 
| 60 |  |  |  |  |  |  | qw( | 
| 61 |  |  |  |  |  |  | %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID | 
| 62 |  |  |  |  |  |  | add_node load_func | 
| 63 |  |  |  |  |  |  | ), | 
| 64 |  |  |  |  |  |  | @EXPORT_API, | 
| 65 |  |  |  |  |  |  | ); | 
| 66 |  |  |  |  |  |  |  | 
| 67 |  |  |  |  |  |  | our @EXPORT = qw( | 
| 68 |  |  |  |  |  |  | snd_to_func snd_on eval_on | 
| 69 |  |  |  |  |  |  | port_is_local | 
| 70 |  |  |  |  |  |  | up_nodes mon_nodes node_is_up | 
| 71 |  |  |  |  |  |  | ); | 
| 72 |  |  |  |  |  |  |  | 
| 73 |  |  |  |  |  |  | our @CARP_NOT = (AnyEvent::MP::); | 
| 74 |  |  |  |  |  |  |  | 
| 75 |  |  |  |  |  |  | sub load_func($) { | 
| 76 | 0 |  |  | 0 | 0 | 0 | my $func = $_[0]; | 
| 77 |  |  |  |  |  |  |  | 
| 78 | 0 | 0 |  |  |  | 0 | unless (defined &$func) { | 
| 79 | 0 |  |  |  |  | 0 | my $pkg = $func; | 
| 80 | 0 |  |  |  |  | 0 | do { | 
| 81 |  |  |  |  |  |  | $pkg =~ s/::[^:]+$// | 
| 82 | 0 | 0 |  | 0 |  | 0 | or return sub { die "unable to resolve function '$func'" }; | 
|  | 0 |  |  |  |  | 0 |  | 
| 83 |  |  |  |  |  |  |  | 
| 84 | 0 |  |  |  |  | 0 | local $@; | 
| 85 | 0 | 0 |  |  |  | 0 | unless (eval "require $pkg; 1") { | 
| 86 | 0 |  |  |  |  | 0 | my $error = $@; | 
| 87 |  |  |  |  |  |  | $error =~ /^Can't locate .*.pm in \@INC \(/ | 
| 88 | 0 | 0 |  | 0 |  | 0 | or return sub { die $error }; | 
|  | 0 |  |  |  |  | 0 |  | 
| 89 |  |  |  |  |  |  | } | 
| 90 |  |  |  |  |  |  | } until defined &$func; | 
| 91 |  |  |  |  |  |  | } | 
| 92 |  |  |  |  |  |  |  | 
| 93 | 0 |  |  |  |  | 0 | \&$func | 
| 94 |  |  |  |  |  |  | } | 
| 95 |  |  |  |  |  |  |  | 
| 96 |  |  |  |  |  |  | my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z'); | 
| 97 |  |  |  |  |  |  |  | 
| 98 |  |  |  |  |  |  | sub nonce($) { | 
| 99 | 0 |  |  | 0 | 0 | 0 | join "", map chr rand 256, 1 .. $_[0] | 
| 100 |  |  |  |  |  |  | } | 
| 101 |  |  |  |  |  |  |  | 
| 102 |  |  |  |  |  |  | sub nonce62($) { | 
| 103 | 2 |  |  | 2 | 0 | 13 | join "", map $alnum[rand 62], 1 .. $_[0] | 
| 104 |  |  |  |  |  |  | } | 
| 105 |  |  |  |  |  |  |  | 
| 106 |  |  |  |  |  |  | our $CONFIG; # this node's configuration | 
| 107 |  |  |  |  |  |  | our $SECURE; | 
| 108 |  |  |  |  |  |  |  | 
| 109 |  |  |  |  |  |  | our $RUNIQ; # remote uniq value | 
| 110 |  |  |  |  |  |  | our $UNIQ;  # per-process/node unique cookie | 
| 111 |  |  |  |  |  |  | our $NODE; | 
| 112 |  |  |  |  |  |  | our $ID = "a"; | 
| 113 |  |  |  |  |  |  |  | 
| 114 |  |  |  |  |  |  | our %NODE; # node id to transport mapping, or "undef", for local node | 
| 115 |  |  |  |  |  |  | our (%PORT, %PORT_DATA); # local ports | 
| 116 |  |  |  |  |  |  |  | 
| 117 |  |  |  |  |  |  | our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb) | 
| 118 |  |  |  |  |  |  | our %LMON; # monitored _local_ ports | 
| 119 |  |  |  |  |  |  |  | 
| 120 |  |  |  |  |  |  | #our $GLOBAL; # true if node is a global ("directory") node | 
| 121 |  |  |  |  |  |  | our %BINDS; | 
| 122 |  |  |  |  |  |  | our $BINDS; # our listeners, as arrayref | 
| 123 |  |  |  |  |  |  |  | 
| 124 |  |  |  |  |  |  | our $SRCNODE; # holds the sending node _object_ during _inject | 
| 125 |  |  |  |  |  |  | our $GLOBAL;  # true when this is a global node (only set by AnyEvent::MP::Global) | 
| 126 |  |  |  |  |  |  |  | 
| 127 |  |  |  |  |  |  | # initialise names for non-networked operation | 
| 128 |  |  |  |  |  |  | { | 
| 129 |  |  |  |  |  |  | # ~54 bits, for local port names, lowercase $ID appended | 
| 130 |  |  |  |  |  |  | my $now = AE::now; | 
| 131 |  |  |  |  |  |  | $UNIQ = | 
| 132 |  |  |  |  |  |  | (join "", | 
| 133 |  |  |  |  |  |  | map $alnum[$_], | 
| 134 |  |  |  |  |  |  | $$ / 62 % 62, | 
| 135 |  |  |  |  |  |  | $$ % 62, | 
| 136 |  |  |  |  |  |  | (int $now        ) % 62, | 
| 137 |  |  |  |  |  |  | (int $now *   100) % 62, | 
| 138 |  |  |  |  |  |  | (int $now * 10000) % 62, | 
| 139 |  |  |  |  |  |  | ) . nonce62 4 | 
| 140 |  |  |  |  |  |  | ; | 
| 141 |  |  |  |  |  |  |  | 
| 142 |  |  |  |  |  |  | # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes | 
| 143 |  |  |  |  |  |  | $RUNIQ = nonce62 10; | 
| 144 |  |  |  |  |  |  | $RUNIQ =~ s/(.)$/\U$1/; | 
| 145 |  |  |  |  |  |  |  | 
| 146 |  |  |  |  |  |  | $NODE = ""; | 
| 147 |  |  |  |  |  |  | } | 
| 148 |  |  |  |  |  |  |  | 
| 149 |  |  |  |  |  |  | sub NODE() { | 
| 150 | 0 |  |  | 0 | 0 | 0 | $NODE | 
| 151 |  |  |  |  |  |  | } | 
| 152 |  |  |  |  |  |  |  | 
| 153 |  |  |  |  |  |  | sub node_of($) { | 
| 154 | 0 |  |  | 0 | 0 | 0 | my ($node, undef) = split /#/, $_[0], 2; | 
| 155 |  |  |  |  |  |  |  | 
| 156 | 0 |  |  |  |  | 0 | $node | 
| 157 |  |  |  |  |  |  | } | 
| 158 |  |  |  |  |  |  |  | 
| 159 |  |  |  |  |  |  | BEGIN { | 
| 160 |  |  |  |  |  |  | *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE} | 
| 161 |  |  |  |  |  |  | ? sub () { 1 } | 
| 162 | 1 | 50 |  | 1 |  | 6075 | : sub () { 0 }; | 
| 163 |  |  |  |  |  |  | } | 
| 164 |  |  |  |  |  |  |  | 
| 165 |  |  |  |  |  |  | our $DELAY_TIMER; | 
| 166 |  |  |  |  |  |  | our @DELAY_QUEUE; | 
| 167 |  |  |  |  |  |  |  | 
| 168 |  |  |  |  |  |  | our $delay_run = sub { | 
| 169 |  |  |  |  |  |  | (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1; | 
| 170 |  |  |  |  |  |  | }; | 
| 171 |  |  |  |  |  |  |  | 
| 172 |  |  |  |  |  |  | sub delay($) { | 
| 173 | 0 |  |  | 0 | 0 | 0 | push @DELAY_QUEUE, shift; | 
| 174 | 0 |  | 0 |  |  | 0 | $DELAY_TIMER ||= AE::timer 0, 0, $delay_run; | 
| 175 |  |  |  |  |  |  | } | 
| 176 |  |  |  |  |  |  |  | 
| 177 |  |  |  |  |  |  | =item $AnyEvent::MP::Kernel::SRCNODE | 
| 178 |  |  |  |  |  |  |  | 
| 179 |  |  |  |  |  |  | During execution of a message callback, this variable contains the node ID | 
| 180 |  |  |  |  |  |  | of the origin node. | 
| 181 |  |  |  |  |  |  |  | 
| 182 |  |  |  |  |  |  | The main use of this variable is for debugging output - there are probably | 
| 183 |  |  |  |  |  |  | very few other cases where you need to know the source node ID. | 
| 184 |  |  |  |  |  |  |  | 
| 185 |  |  |  |  |  |  | =cut | 
| 186 |  |  |  |  |  |  |  | 
| 187 |  |  |  |  |  |  | sub _inject { | 
| 188 | 0 |  |  | 0 |  | 0 | warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_; | 
| 189 |  |  |  |  |  |  |  | 
| 190 | 0 | 0 |  |  |  | 0 | &{ $PORT{+shift} or return }; | 
|  | 0 |  |  |  |  | 0 |  | 
| 191 |  |  |  |  |  |  | } | 
| 192 |  |  |  |  |  |  |  | 
| 193 |  |  |  |  |  |  | # this function adds a node-ref, so you can send stuff to it | 
| 194 |  |  |  |  |  |  | # it is basically the central routing component. | 
| 195 |  |  |  |  |  |  | sub add_node { | 
| 196 | 0 | 0 |  | 0 | 0 | 0 | $NODE{$_[0]} || do { | 
| 197 | 0 |  |  |  |  | 0 | my ($node) = @_; | 
| 198 |  |  |  |  |  |  |  | 
| 199 | 0 | 0 |  |  |  | 0 | length $node | 
| 200 |  |  |  |  |  |  | or Carp::croak "'undef' or the empty string are not valid node/port IDs"; | 
| 201 |  |  |  |  |  |  |  | 
| 202 |  |  |  |  |  |  | # registers itself in %NODE | 
| 203 | 0 |  |  |  |  | 0 | new AnyEvent::MP::Node::Remote $node | 
| 204 |  |  |  |  |  |  | } | 
| 205 |  |  |  |  |  |  | } | 
| 206 |  |  |  |  |  |  |  | 
| 207 |  |  |  |  |  |  | sub snd(@) { | 
| 208 | 0 |  |  | 0 | 0 | 0 | my ($nodeid, $portid) = split /#/, shift, 2; | 
| 209 |  |  |  |  |  |  |  | 
| 210 | 0 |  |  |  |  | 0 | warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_; | 
| 211 |  |  |  |  |  |  |  | 
| 212 |  |  |  |  |  |  | ($NODE{$nodeid} || add_node $nodeid) | 
| 213 | 0 |  | 0 |  |  | 0 | ->{send} (["$portid", @_]); | 
| 214 |  |  |  |  |  |  | } | 
| 215 |  |  |  |  |  |  |  | 
| 216 |  |  |  |  |  |  | sub port_is_local($) { | 
| 217 | 0 |  |  | 0 | 0 | 0 | my ($nodeid, undef) = split /#/, $_[0], 2; | 
| 218 |  |  |  |  |  |  |  | 
| 219 | 0 |  |  |  |  | 0 | $nodeid eq $NODE | 
| 220 |  |  |  |  |  |  | } | 
| 221 |  |  |  |  |  |  |  | 
| 222 |  |  |  |  |  |  | =item snd_to_func $node, $func, @args | 
| 223 |  |  |  |  |  |  |  | 
| 224 |  |  |  |  |  |  | Expects a node ID and a name of a function. Asynchronously tries to call | 
| 225 |  |  |  |  |  |  | this function with the given arguments on that node. | 
| 226 |  |  |  |  |  |  |  | 
| 227 |  |  |  |  |  |  | This function can be used to implement C<spawn>-like interfaces. | 
| 228 |  |  |  |  |  |  |  | 
| 229 |  |  |  |  |  |  | =cut | 
| 230 |  |  |  |  |  |  |  | 
| 231 |  |  |  |  |  |  | sub snd_to_func($$;@) { | 
| 232 | 0 |  |  | 0 | 1 | 0 | my $nodeid = shift; | 
| 233 |  |  |  |  |  |  |  | 
| 234 |  |  |  |  |  |  | # on $NODE, we artificially delay... (for spawn) | 
| 235 |  |  |  |  |  |  | # this is very ugly - maybe we should simply delay ALL messages, | 
| 236 |  |  |  |  |  |  | # to avoid deep recursion issues. but that's so... slow... | 
| 237 | 0 | 0 |  |  |  | 0 | $AnyEvent::MP::Node::Self::DELAY = 1 | 
| 238 |  |  |  |  |  |  | if $nodeid ne $NODE; | 
| 239 |  |  |  |  |  |  |  | 
| 240 | 0 |  | 0 |  |  | 0 | ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]); | 
| 241 |  |  |  |  |  |  | } | 
| 242 |  |  |  |  |  |  |  | 
| 243 |  |  |  |  |  |  | =item snd_on $node, @msg | 
| 244 |  |  |  |  |  |  |  | 
| 245 |  |  |  |  |  |  | Executes C<snd> with the given C<@msg> (which must include the destination | 
| 246 |  |  |  |  |  |  | port) on the given node. | 
| 247 |  |  |  |  |  |  |  | 
| 248 |  |  |  |  |  |  | =cut | 
| 249 |  |  |  |  |  |  |  | 
| 250 |  |  |  |  |  |  | sub snd_on($@) { | 
| 251 | 0 |  |  | 0 | 1 | 0 | my $node = shift; | 
| 252 | 0 |  |  |  |  | 0 | snd $node, snd => @_; | 
| 253 |  |  |  |  |  |  | } | 
| 254 |  |  |  |  |  |  |  | 
| 255 |  |  |  |  |  |  | =item eval_on $node, $string[, @reply] | 
| 256 |  |  |  |  |  |  |  | 
| 257 |  |  |  |  |  |  | Evaluates the given string as Perl expression on the given node. When | 
| 258 |  |  |  |  |  |  | @reply is specified, then it is used to construct a reply message with | 
| 259 |  |  |  |  |  |  | C<"$@"> and any results from the eval appended. | 
| 260 |  |  |  |  |  |  |  | 
| 261 |  |  |  |  |  |  | =cut | 
| 262 |  |  |  |  |  |  |  | 
| 263 |  |  |  |  |  |  | sub eval_on($$;@) { | 
| 264 | 0 |  |  | 0 | 1 | 0 | my $node = shift; | 
| 265 | 0 |  |  |  |  | 0 | snd $node, eval => @_; | 
| 266 |  |  |  |  |  |  | } | 
| 267 |  |  |  |  |  |  |  | 
| 268 |  |  |  |  |  |  | sub kil(@) { | 
| 269 | 0 |  |  | 0 | 0 | 0 | my ($nodeid, $portid) = split /#/, shift, 2; | 
| 270 |  |  |  |  |  |  |  | 
| 271 | 0 | 0 |  |  |  | 0 | length $portid | 
| 272 |  |  |  |  |  |  | or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"; | 
| 273 |  |  |  |  |  |  |  | 
| 274 | 0 |  | 0 |  |  | 0 | ($NODE{$nodeid} || add_node $nodeid) | 
| 275 |  |  |  |  |  |  | ->kill ("$portid", @_); | 
| 276 |  |  |  |  |  |  | } | 
| 277 |  |  |  |  |  |  |  | 
| 278 |  |  |  |  |  |  | ############################################################################# | 
| 279 |  |  |  |  |  |  | # node monitoring and info | 
| 280 |  |  |  |  |  |  |  | 
| 281 |  |  |  |  |  |  | =item $bool = node_is_up $nodeid | 
| 282 |  |  |  |  |  |  |  | 
| 283 |  |  |  |  |  |  | Returns true if the given node is "up", that is, the kernel thinks it has | 
| 284 |  |  |  |  |  |  | a working connection to it. | 
| 285 |  |  |  |  |  |  |  | 
| 286 |  |  |  |  |  |  | More precisely, if the node is up, returns C<1>. If the node is currently | 
| 287 |  |  |  |  |  |  | connecting or otherwise known but not connected, returns C<0>. If nothing | 
| 288 |  |  |  |  |  |  | is known about the node, returns C<undef>. | 
| 289 |  |  |  |  |  |  |  | 
| 290 |  |  |  |  |  |  | =cut | 
| 291 |  |  |  |  |  |  |  | 
| 292 |  |  |  |  |  |  | sub node_is_up($) { | 
| 293 |  |  |  |  |  |  | ($_[0] eq $NODE) || ($NODE{$_[0]} or return)->{transport} | 
| 294 | 0 | 0 | 0 | 0 | 1 | 0 | ? 1 : 0 | 
| 295 |  |  |  |  |  |  | } | 
| 296 |  |  |  |  |  |  |  | 
| 297 |  |  |  |  |  |  | =item @nodes = up_nodes | 
| 298 |  |  |  |  |  |  |  | 
| 299 |  |  |  |  |  |  | Return the node IDs of all nodes that are currently connected (excluding | 
| 300 |  |  |  |  |  |  | the node itself). | 
| 301 |  |  |  |  |  |  |  | 
| 302 |  |  |  |  |  |  | =cut | 
| 303 |  |  |  |  |  |  |  | 
| 304 |  |  |  |  |  |  | sub up_nodes() { | 
| 305 | 0 |  |  | 0 | 1 | 0 | map $_->{id}, grep $_->{transport}, values %NODE | 
| 306 |  |  |  |  |  |  | } | 
| 307 |  |  |  |  |  |  |  | 
| 308 |  |  |  |  |  |  | =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason) | 
| 309 |  |  |  |  |  |  |  | 
| 310 |  |  |  |  |  |  | Registers a callback that is called each time a node goes up (a connection | 
| 311 |  |  |  |  |  |  | is established) or down (the connection is lost). | 
| 312 |  |  |  |  |  |  |  | 
| 313 |  |  |  |  |  |  | Node up messages can only be followed by node down messages for the same | 
| 314 |  |  |  |  |  |  | node, and vice versa. | 
| 315 |  |  |  |  |  |  |  | 
| 316 |  |  |  |  |  |  | Note that monitoring a node is usually better done by monitoring its node | 
| 317 |  |  |  |  |  |  | port. This function is mainly of interest to modules that are concerned | 
| 318 |  |  |  |  |  |  | about the network topology and low-level connection handling. | 
| 319 |  |  |  |  |  |  |  | 
| 320 |  |  |  |  |  |  | Callbacks I<must not> block and I<should not> send any messages. | 
| 321 |  |  |  |  |  |  |  | 
| 322 |  |  |  |  |  |  | The function returns an optional guard which can be used to unregister | 
| 323 |  |  |  |  |  |  | the monitoring callback again. | 
| 324 |  |  |  |  |  |  |  | 
| 325 |  |  |  |  |  |  | Example: make sure you call function C<newnode> for all nodes that are up | 
| 326 |  |  |  |  |  |  | or go up (and down). | 
| 327 |  |  |  |  |  |  |  | 
| 328 |  |  |  |  |  |  | newnode $_, 1 for up_nodes; | 
| 329 |  |  |  |  |  |  | mon_nodes \&newnode; | 
| 330 |  |  |  |  |  |  |  | 
| 331 |  |  |  |  |  |  | =cut | 
| 332 |  |  |  |  |  |  |  | 
| 333 |  |  |  |  |  |  | our %MON_NODES; | 
| 334 |  |  |  |  |  |  |  | 
| 335 |  |  |  |  |  |  | sub mon_nodes($) { | 
| 336 | 3 |  |  | 3 | 1 | 4 | my ($cb) = @_; | 
| 337 |  |  |  |  |  |  |  | 
| 338 | 3 |  |  |  |  | 7 | $MON_NODES{$cb+0} = $cb; | 
| 339 |  |  |  |  |  |  |  | 
| 340 |  |  |  |  |  |  | defined wantarray | 
| 341 | 0 |  |  | 0 |  | 0 | and Guard::guard { delete $MON_NODES{$cb+0} } | 
| 342 | 3 | 50 |  |  |  | 8 | } | 
| 343 |  |  |  |  |  |  |  | 
| 344 |  |  |  |  |  |  | sub _inject_nodeevent($$;@) { | 
| 345 | 0 |  |  | 0 |  | 0 | my ($node, $up, @reason) = @_; | 
| 346 |  |  |  |  |  |  |  | 
| 347 | 0 | 0 |  |  |  | 0 | AE::log 7 => "$node->{id} is " . ($up ? "up." : "down (@reason)."); | 
| 348 |  |  |  |  |  |  |  | 
| 349 | 0 |  |  |  |  | 0 | for my $cb (values %MON_NODES) { | 
| 350 | 0 | 0 |  |  |  | 0 | eval { $cb->($node->{id}, $up, @reason); 1 } | 
|  | 0 |  |  |  |  | 0 |  | 
|  | 0 |  |  |  |  | 0 |  | 
| 351 |  |  |  |  |  |  | or AE::log die => $@; | 
| 352 |  |  |  |  |  |  | } | 
| 353 |  |  |  |  |  |  | } | 
| 354 |  |  |  |  |  |  |  | 
| 355 |  |  |  |  |  |  | ############################################################################# | 
| 356 |  |  |  |  |  |  | # self node code | 
| 357 |  |  |  |  |  |  |  | 
| 358 |  |  |  |  |  |  | sub _kill { | 
| 359 | 0 |  |  | 0 |  | 0 | my $port = shift; | 
| 360 |  |  |  |  |  |  |  | 
| 361 | 0 | 0 |  |  |  | 0 | delete $PORT{$port} | 
| 362 |  |  |  |  |  |  | or return; # killing nonexistent ports is O.K. | 
| 363 | 0 |  |  |  |  | 0 | delete $PORT_DATA{$port}; | 
| 364 |  |  |  |  |  |  |  | 
| 365 | 0 | 0 | 0 |  |  | 0 | my $mon = delete $LMON{$port} | 
| 366 |  |  |  |  |  |  | or !@_ | 
| 367 |  |  |  |  |  |  | or AE::log die => "unmonitored local port $port died with reason: @_"; | 
| 368 |  |  |  |  |  |  |  | 
| 369 | 0 |  |  |  |  | 0 | $_->(@_) for values %$mon; | 
| 370 |  |  |  |  |  |  | } | 
| 371 |  |  |  |  |  |  |  | 
| 372 |  |  |  |  |  |  | sub _monitor { | 
| 373 |  |  |  |  |  |  | return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]") | 
| 374 | 0 | 0 |  | 0 |  | 0 | unless exists $PORT{$_[1]}; | 
| 375 |  |  |  |  |  |  |  | 
| 376 | 0 |  |  |  |  | 0 | $LMON{$_[1]}{$_[2]+0} = $_[2]; | 
| 377 |  |  |  |  |  |  | } | 
| 378 |  |  |  |  |  |  |  | 
| 379 |  |  |  |  |  |  | sub _unmonitor { | 
| 380 |  |  |  |  |  |  | delete $LMON{$_[1]}{$_[2]+0} | 
| 381 | 0 | 0 |  | 0 |  | 0 | if exists $LMON{$_[1]}; | 
| 382 |  |  |  |  |  |  | } | 
| 383 |  |  |  |  |  |  |  | 
| 384 |  |  |  |  |  |  | sub _secure_check { | 
| 385 | 0 | 0 |  | 0 |  | 0 | $SECURE | 
| 386 |  |  |  |  |  |  | and die "remote execution not allowed\n"; | 
| 387 |  |  |  |  |  |  | } | 
| 388 |  |  |  |  |  |  |  | 
| 389 |  |  |  |  |  |  | our %NODE_REQ; | 
| 390 |  |  |  |  |  |  |  | 
| 391 |  |  |  |  |  |  | %NODE_REQ = ( | 
| 392 |  |  |  |  |  |  | # "mproto" - monitoring protocol | 
| 393 |  |  |  |  |  |  |  | 
| 394 |  |  |  |  |  |  | # monitoring | 
| 395 |  |  |  |  |  |  | mon0 => sub { # stop monitoring a port for another node | 
| 396 |  |  |  |  |  |  | my $portid = shift; | 
| 397 |  |  |  |  |  |  | # the if exists should not be needed, but there is apparently a bug | 
| 398 |  |  |  |  |  |  | # elsewhere, and this works around that, silently suppressing that bug. sigh. | 
| 399 |  |  |  |  |  |  | _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid} | 
| 400 |  |  |  |  |  |  | if exists $NODE{$SRCNODE}; | 
| 401 |  |  |  |  |  |  | }, | 
| 402 |  |  |  |  |  |  | mon1 => sub { # start monitoring a port for another node | 
| 403 |  |  |  |  |  |  | my $portid = shift; | 
| 404 |  |  |  |  |  |  | Scalar::Util::weaken (my $node = $NODE{$SRCNODE}); | 
| 405 |  |  |  |  |  |  | _monitor undef, $portid, $node->{rmon}{$portid} = sub { | 
| 406 |  |  |  |  |  |  | delete $node->{rmon}{$portid}; | 
| 407 |  |  |  |  |  |  | $node->send (["", kil0 => $portid, @_]) | 
| 408 |  |  |  |  |  |  | if $node && $node->{transport}; | 
| 409 |  |  |  |  |  |  | }; | 
| 410 |  |  |  |  |  |  | }, | 
| 411 |  |  |  |  |  |  | # another node has killed a monitored port | 
| 412 |  |  |  |  |  |  | kil0 => sub { | 
| 413 |  |  |  |  |  |  | my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift} | 
| 414 |  |  |  |  |  |  | or return; | 
| 415 |  |  |  |  |  |  |  | 
| 416 |  |  |  |  |  |  | $_->(@_) for @$cbs; | 
| 417 |  |  |  |  |  |  | }, | 
| 418 |  |  |  |  |  |  | # another node wants to kill a local port | 
| 419 |  |  |  |  |  |  | kil1 => \&_kill, | 
| 420 |  |  |  |  |  |  |  | 
| 421 |  |  |  |  |  |  | # "public" services - not actually public | 
| 422 |  |  |  |  |  |  |  | 
| 423 |  |  |  |  |  |  | # relay message to another node / generic echo | 
| 424 |  |  |  |  |  |  | snd => sub { | 
| 425 |  |  |  |  |  |  | &snd | 
| 426 |  |  |  |  |  |  | }, | 
| 427 |  |  |  |  |  |  | # ask if a node supports the given request, only works for fixed tags | 
| 428 |  |  |  |  |  |  | can => sub { | 
| 429 |  |  |  |  |  |  | my $method = shift; | 
| 430 |  |  |  |  |  |  | snd @_, exists $NODE_REQ{$method}; | 
| 431 |  |  |  |  |  |  | }, | 
| 432 |  |  |  |  |  |  |  | 
| 433 |  |  |  |  |  |  | # random utilities | 
| 434 |  |  |  |  |  |  | eval => sub { | 
| 435 |  |  |  |  |  |  | &_secure_check; | 
| 436 |  |  |  |  |  |  | my @res = do { package main; eval shift }; | 
| 437 |  |  |  |  |  |  | snd @_, "$@", @res if @_; | 
| 438 |  |  |  |  |  |  | }, | 
| 439 |  |  |  |  |  |  | time => sub { | 
| 440 |  |  |  |  |  |  | snd @_, AE::now; | 
| 441 |  |  |  |  |  |  | }, | 
| 442 |  |  |  |  |  |  | devnull => sub { | 
| 443 |  |  |  |  |  |  | # | 
| 444 |  |  |  |  |  |  | }, | 
| 445 |  |  |  |  |  |  | "" => sub { | 
| 446 |  |  |  |  |  |  | # empty messages are keepalives or similar devnull-applications | 
| 447 |  |  |  |  |  |  | }, | 
| 448 |  |  |  |  |  |  | ); | 
| 449 |  |  |  |  |  |  |  | 
| 450 |  |  |  |  |  |  | # the node port | 
| 451 |  |  |  |  |  |  | new AnyEvent::MP::Node::Self $NODE; # registers itself in %NODE | 
| 452 |  |  |  |  |  |  |  | 
| 453 |  |  |  |  |  |  | $PORT{""} = sub { | 
| 454 |  |  |  |  |  |  | my $tag = shift; | 
| 455 |  |  |  |  |  |  | eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } }; | 
| 456 |  |  |  |  |  |  | AE::log die => "error processing node message from $SRCNODE: $@" if $@; | 
| 457 |  |  |  |  |  |  | }; | 
| 458 |  |  |  |  |  |  |  | 
| 459 |  |  |  |  |  |  | our $MPROTO = 1; | 
| 460 |  |  |  |  |  |  |  | 
| 461 |  |  |  |  |  |  | # tell everybody who connects our mproto | 
| 462 |  |  |  |  |  |  | push @AnyEvent::MP::Transport::HOOK_GREET, sub { | 
| 463 |  |  |  |  |  |  | $_[0]{local_greeting}{mproto} = $MPROTO; | 
| 464 |  |  |  |  |  |  | }; | 
| 465 |  |  |  |  |  |  |  | 
| 466 |  |  |  |  |  |  | ############################################################################# | 
| 467 |  |  |  |  |  |  | # seed management, try to keep connections to all seeds at all times | 
| 468 |  |  |  |  |  |  |  | 
| 469 |  |  |  |  |  |  | our %SEED_NODE;    # seed ID => node ID|undef | 
| 470 |  |  |  |  |  |  | our %NODE_SEED;    # map node ID to seed ID | 
| 471 |  |  |  |  |  |  | our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting | 
| 472 |  |  |  |  |  |  | our $SEED_WATCHER; | 
| 473 |  |  |  |  |  |  | our $SEED_RETRY; | 
| 474 |  |  |  |  |  |  | our %GLOBAL_NODE;  # global => undef | 
| 475 |  |  |  |  |  |  |  | 
| 476 |  |  |  |  |  |  | sub seed_connect { | 
| 477 | 0 |  |  | 0 | 0 | 0 | my ($seed) = @_; | 
| 478 |  |  |  |  |  |  |  | 
| 479 | 0 | 0 |  |  |  | 0 | my ($host, $port) = AnyEvent::Socket::parse_hostport $seed | 
| 480 |  |  |  |  |  |  | or Carp::croak "$seed: unparsable seed address"; | 
| 481 |  |  |  |  |  |  |  | 
| 482 | 0 |  |  |  |  | 0 | AE::log 9 => "trying connect to seed node $seed."; | 
| 483 |  |  |  |  |  |  |  | 
| 484 |  |  |  |  |  |  | $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect | 
| 485 |  |  |  |  |  |  | $host, $port, | 
| 486 |  |  |  |  |  |  | on_greeted => sub { | 
| 487 |  |  |  |  |  |  | # called after receiving remote greeting, learn remote node name | 
| 488 |  |  |  |  |  |  |  | 
| 489 |  |  |  |  |  |  | # we rely on untrusted data here (the remote node name) this is | 
| 490 |  |  |  |  |  |  | # hopefully ok, as this can at most be used for DOSing, which is easy | 
| 491 |  |  |  |  |  |  | # when you can do MITM anyway. | 
| 492 |  |  |  |  |  |  |  | 
| 493 |  |  |  |  |  |  | # if we connect to ourselves, nuke this seed, but make sure we act like a seed | 
| 494 | 0 | 0 |  | 0 |  | 0 | if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) { | 
| 495 | 0 |  |  |  |  | 0 | require AnyEvent::MP::Global; # every seed becomes a global node currently | 
| 496 | 0 |  |  |  |  | 0 | delete $SEED_NODE{$seed}; | 
| 497 |  |  |  |  |  |  | } else { | 
| 498 | 0 |  |  |  |  | 0 | $SEED_NODE{$seed} = $_[0]{remote_node}; | 
| 499 | 0 |  |  |  |  | 0 | $NODE_SEED{$_[0]{remote_node}} = $seed; | 
| 500 |  |  |  |  |  |  |  | 
| 501 |  |  |  |  |  |  | # also start global service, in case it isn't running | 
| 502 |  |  |  |  |  |  | # since we probably switch conenctions, maybe we don't need to do this here? | 
| 503 | 0 |  |  |  |  | 0 | snd $_[0]{remote_node}, "g_slave"; | 
| 504 |  |  |  |  |  |  | } | 
| 505 |  |  |  |  |  |  | }, | 
| 506 |  |  |  |  |  |  | sub { | 
| 507 | 0 |  |  | 0 |  | 0 | delete $SEED_CONNECT{$seed}; | 
| 508 |  |  |  |  |  |  | } | 
| 509 | 0 |  | 0 |  |  | 0 | ; | 
| 510 |  |  |  |  |  |  | } | 
| 511 |  |  |  |  |  |  |  | 
| 512 |  |  |  |  |  |  | sub seed_all { | 
| 513 |  |  |  |  |  |  | my @seeds = grep | 
| 514 | 0 |  | 0 | 0 | 0 | 0 | !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}), | 
| 515 |  |  |  |  |  |  | keys %SEED_NODE; | 
| 516 |  |  |  |  |  |  |  | 
| 517 | 0 | 0 |  |  |  | 0 | if (@seeds) { | 
| 518 |  |  |  |  |  |  | # start connection attempt for every seed we are not connected to yet | 
| 519 |  |  |  |  |  |  | seed_connect $_ | 
| 520 | 0 |  |  |  |  | 0 | for grep !exists $SEED_CONNECT{$_}, @seeds; | 
| 521 |  |  |  |  |  |  |  | 
| 522 | 0 |  |  |  |  | 0 | $SEED_RETRY = $SEED_RETRY * 2; | 
| 523 |  |  |  |  |  |  | $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout} | 
| 524 | 0 | 0 |  |  |  | 0 | if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; | 
| 525 |  |  |  |  |  |  |  | 
| 526 | 0 |  |  |  |  | 0 | $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all; | 
| 527 |  |  |  |  |  |  |  | 
| 528 |  |  |  |  |  |  | } else { | 
| 529 |  |  |  |  |  |  | # all seeds connected or connecting, no need to restart timer | 
| 530 | 0 |  |  |  |  | 0 | undef $SEED_WATCHER; | 
| 531 |  |  |  |  |  |  | } | 
| 532 |  |  |  |  |  |  | } | 
| 533 |  |  |  |  |  |  |  | 
| 534 |  |  |  |  |  |  | sub seed_again { | 
| 535 | 0 |  |  | 0 | 0 | 0 | $SEED_RETRY = (1 + rand) * 0.6; | 
| 536 | 0 |  | 0 |  |  | 0 | $SEED_WATCHER ||= AE::timer 0, 0, \&seed_all; | 
| 537 |  |  |  |  |  |  | } | 
| 538 |  |  |  |  |  |  |  | 
| 539 |  |  |  |  |  |  | # sets new seed list, starts connecting | 
| 540 |  |  |  |  |  |  | sub set_seeds(@) { | 
| 541 | 0 |  |  | 0 | 0 | 0 | %SEED_NODE     = (); | 
| 542 | 0 |  |  |  |  | 0 | %NODE_SEED     = (); | 
| 543 | 0 |  |  |  |  | 0 | %SEED_CONNECT  = (); | 
| 544 |  |  |  |  |  |  |  | 
| 545 | 0 |  |  |  |  | 0 | @SEED_NODE{@_} = (); | 
| 546 |  |  |  |  |  |  |  | 
| 547 | 0 |  |  |  |  | 0 | seed_again; | 
| 548 |  |  |  |  |  |  | } | 
| 549 |  |  |  |  |  |  |  | 
| 550 |  |  |  |  |  |  | # normal nodes only record global node connections | 
| 551 |  |  |  |  |  |  | $NODE_REQ{g_global} = sub { | 
| 552 |  |  |  |  |  |  | undef $GLOBAL_NODE{$SRCNODE}; | 
| 553 |  |  |  |  |  |  | }; | 
| 554 |  |  |  |  |  |  |  | 
| 555 |  |  |  |  |  |  | mon_nodes sub { | 
| 556 |  |  |  |  |  |  | delete $GLOBAL_NODE{$_[0]} | 
| 557 |  |  |  |  |  |  | unless $_[1]; | 
| 558 |  |  |  |  |  |  |  | 
| 559 |  |  |  |  |  |  | return unless exists $NODE_SEED{$_[0]}; | 
| 560 |  |  |  |  |  |  |  | 
| 561 |  |  |  |  |  |  | if ($_[1]) { | 
| 562 |  |  |  |  |  |  | # each time a connection to a seed node goes up, make | 
| 563 |  |  |  |  |  |  | # sure it runs the global service. | 
| 564 |  |  |  |  |  |  | snd $_[0], "g_slave"; | 
| 565 |  |  |  |  |  |  | } else { | 
| 566 |  |  |  |  |  |  | # if we lost the connection to a seed node, make sure we are seeding | 
| 567 |  |  |  |  |  |  | seed_again; | 
| 568 |  |  |  |  |  |  | } | 
| 569 |  |  |  |  |  |  | }; | 
| 570 |  |  |  |  |  |  |  | 
| 571 |  |  |  |  |  |  | ############################################################################# | 
| 572 |  |  |  |  |  |  | # keepalive code - used to keep connections to certain nodes alive | 
| 573 |  |  |  |  |  |  | # only used by global code atm., but ought to be exposed somehow. | 
| 574 |  |  |  |  |  |  | #TODO: should probably be done directly by node objects | 
| 575 |  |  |  |  |  |  |  | 
| 576 |  |  |  |  |  |  | our $KEEPALIVE_RETRY; | 
| 577 |  |  |  |  |  |  | our $KEEPALIVE_WATCHER; | 
| 578 |  |  |  |  |  |  | our %KEEPALIVE; # we want to keep these nodes alive | 
| 579 |  |  |  |  |  |  | our %KEEPALIVE_DOWN; # nodes that are down currently | 
| 580 |  |  |  |  |  |  |  | 
| 581 |  |  |  |  |  |  | sub keepalive_all { | 
| 582 | 0 |  |  | 0 | 0 | 0 | AE::log 9 => "keepalive: trying to establish connections with: " | 
| 583 |  |  |  |  |  |  | . (join " ", keys %KEEPALIVE_DOWN) | 
| 584 |  |  |  |  |  |  | . "."; | 
| 585 |  |  |  |  |  |  |  | 
| 586 |  |  |  |  |  |  | (add_node $_)->connect | 
| 587 | 0 |  |  |  |  | 0 | for keys %KEEPALIVE_DOWN; | 
| 588 |  |  |  |  |  |  |  | 
| 589 | 0 |  |  |  |  | 0 | $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2; | 
| 590 |  |  |  |  |  |  | $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout} | 
| 591 | 0 | 0 |  |  |  | 0 | if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; | 
| 592 |  |  |  |  |  |  |  | 
| 593 | 0 |  |  |  |  | 0 | $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all; | 
| 594 |  |  |  |  |  |  | } | 
| 595 |  |  |  |  |  |  |  | 
| 596 |  |  |  |  |  |  | sub keepalive_again { | 
| 597 | 0 |  |  | 0 | 0 | 0 | $KEEPALIVE_RETRY = (1 + rand) * 0.3; | 
| 598 | 0 |  |  |  |  | 0 | keepalive_all; | 
| 599 |  |  |  |  |  |  | } | 
| 600 |  |  |  |  |  |  |  | 
| 601 |  |  |  |  |  |  | sub keepalive_add { | 
| 602 | 0 | 0 |  | 0 | 0 | 0 | return if $KEEPALIVE{$_[0]}++; | 
| 603 |  |  |  |  |  |  |  | 
| 604 | 0 | 0 |  |  |  | 0 | return if node_is_up $_[0]; | 
| 605 | 0 |  |  |  |  | 0 | undef $KEEPALIVE_DOWN{$_[0]}; | 
| 606 | 0 |  |  |  |  | 0 | keepalive_again; | 
| 607 |  |  |  |  |  |  | } | 
| 608 |  |  |  |  |  |  |  | 
| 609 |  |  |  |  |  |  | sub keepalive_del { | 
| 610 | 0 | 0 |  | 0 | 0 | 0 | return if --$KEEPALIVE{$_[0]}; | 
| 611 |  |  |  |  |  |  |  | 
| 612 | 0 |  |  |  |  | 0 | delete $KEEPALIVE     {$_[0]}; | 
| 613 | 0 |  |  |  |  | 0 | delete $KEEPALIVE_DOWN{$_[0]}; | 
| 614 |  |  |  |  |  |  |  | 
| 615 | 0 | 0 |  |  |  | 0 | undef $KEEPALIVE_WATCHER | 
| 616 |  |  |  |  |  |  | unless %KEEPALIVE_DOWN; | 
| 617 |  |  |  |  |  |  | } | 
| 618 |  |  |  |  |  |  |  | 
| 619 |  |  |  |  |  |  | mon_nodes sub { | 
| 620 |  |  |  |  |  |  | return unless exists $KEEPALIVE{$_[0]}; | 
| 621 |  |  |  |  |  |  |  | 
| 622 |  |  |  |  |  |  | if ($_[1]) { | 
| 623 |  |  |  |  |  |  | delete $KEEPALIVE_DOWN{$_[0]}; | 
| 624 |  |  |  |  |  |  |  | 
| 625 |  |  |  |  |  |  | undef $KEEPALIVE_WATCHER | 
| 626 |  |  |  |  |  |  | unless %KEEPALIVE_DOWN; | 
| 627 |  |  |  |  |  |  | } else { | 
| 628 |  |  |  |  |  |  | # lost the connection, try to connect again | 
| 629 |  |  |  |  |  |  | undef $KEEPALIVE_DOWN{$_[0]}; | 
| 630 |  |  |  |  |  |  | keepalive_again; | 
| 631 |  |  |  |  |  |  | } | 
| 632 |  |  |  |  |  |  | }; | 
| 633 |  |  |  |  |  |  |  | 
| 634 |  |  |  |  |  |  | ############################################################################# | 
| 635 |  |  |  |  |  |  | # talk with/to global nodes | 
| 636 |  |  |  |  |  |  |  | 
| 637 |  |  |  |  |  |  | # protocol messages: | 
| 638 |  |  |  |  |  |  | # | 
| 639 |  |  |  |  |  |  | # sent by global nodes | 
| 640 |  |  |  |  |  |  | # g_global                  - global nodes send this to all others | 
| 641 |  |  |  |  |  |  | # | 
| 642 |  |  |  |  |  |  | # database protocol | 
| 643 |  |  |  |  |  |  | # g_slave database          - make other global node master of the sender | 
| 644 |  |  |  |  |  |  | # g_set database            - global node's database to other global nodes | 
| 645 |  |  |  |  |  |  | # g_upd family set del      - update single family (any to global) | 
| 646 |  |  |  |  |  |  | # | 
| 647 |  |  |  |  |  |  | # slave <-> global protocol | 
| 648 |  |  |  |  |  |  | # g_find node               - query addresses for node (slave to global) | 
| 649 |  |  |  |  |  |  | # g_found node binds        - node addresses (global to slave) | 
| 650 |  |  |  |  |  |  | # g_db_family family id     - send g_reply with data (global to slave) | 
| 651 |  |  |  |  |  |  | # g_db_keys   family id     - send g_reply with data (global to slave) | 
| 652 |  |  |  |  |  |  | # g_db_values family id     - send g_reply with data (global to slave) | 
| 653 |  |  |  |  |  |  | # g_reply id result         - result of any query (global to slave) | 
| 654 |  |  |  |  |  |  | # g_mon1 family             - start to monitor family, replies with g_chg1 | 
| 655 |  |  |  |  |  |  | # g_mon0 family             - stop monitoring family | 
| 656 |  |  |  |  |  |  | # g_chg1 family hash        - initial value of family when starting to monitor | 
| 657 |  |  |  |  |  |  | # g_chg2 family set del     - like g_upd, but for monitoring only | 
| 658 |  |  |  |  |  |  | # | 
| 659 |  |  |  |  |  |  | # internal database families: | 
| 660 |  |  |  |  |  |  | # "'l" -> node -> listeners | 
| 661 |  |  |  |  |  |  | # "'g" -> node -> undef | 
| 662 |  |  |  |  |  |  | # ... | 
| 663 |  |  |  |  |  |  | # | 
| 664 |  |  |  |  |  |  |  | 
| 665 |  |  |  |  |  |  | # used on all nodes: | 
| 666 |  |  |  |  |  |  | our $MASTER;       # the global node we bind ourselves to | 
| 667 |  |  |  |  |  |  | our $MASTER_MON; | 
| 668 |  |  |  |  |  |  | our %LOCAL_DB;     # this node database | 
| 669 |  |  |  |  |  |  |  | 
| 670 |  |  |  |  |  |  | our $GPROTO = 1; | 
| 671 |  |  |  |  |  |  |  | 
| 672 |  |  |  |  |  |  | # tell everybody who connects our gproto | 
| 673 |  |  |  |  |  |  | push @AnyEvent::MP::Transport::HOOK_GREET, sub { | 
| 674 |  |  |  |  |  |  | $_[0]{local_greeting}{gproto} = $GPROTO; | 
| 675 |  |  |  |  |  |  | }; | 
| 676 |  |  |  |  |  |  |  | 
| 677 |  |  |  |  |  |  | ############################################################################# | 
| 678 |  |  |  |  |  |  | # master selection | 
| 679 |  |  |  |  |  |  |  | 
| 680 |  |  |  |  |  |  | # master requests | 
| 681 |  |  |  |  |  |  | our %GLOBAL_REQ; # $id => \@req | 
| 682 |  |  |  |  |  |  |  | 
| 683 |  |  |  |  |  |  | sub global_req_add { | 
| 684 | 0 |  |  | 0 | 0 | 0 | my ($id, $req) = @_; | 
| 685 |  |  |  |  |  |  |  | 
| 686 | 0 | 0 |  |  |  | 0 | return if exists $GLOBAL_REQ{$id}; | 
| 687 |  |  |  |  |  |  |  | 
| 688 | 0 |  |  |  |  | 0 | $GLOBAL_REQ{$id} = $req; | 
| 689 |  |  |  |  |  |  |  | 
| 690 | 0 | 0 |  |  |  | 0 | snd $MASTER, @$req | 
| 691 |  |  |  |  |  |  | if $MASTER; | 
| 692 |  |  |  |  |  |  | } | 
| 693 |  |  |  |  |  |  |  | 
| 694 |  |  |  |  |  |  | sub global_req_del { | 
| 695 | 0 |  |  | 0 | 0 | 0 | delete $GLOBAL_REQ{$_[0]}; | 
| 696 |  |  |  |  |  |  | } | 
| 697 |  |  |  |  |  |  |  | 
| 698 |  |  |  |  |  |  | ################################# | 
| 699 |  |  |  |  |  |  | # master rpc | 
| 700 |  |  |  |  |  |  |  | 
| 701 |  |  |  |  |  |  | our %GLOBAL_RES; | 
| 702 |  |  |  |  |  |  | our $GLOBAL_RES_ID = "a"; | 
| 703 |  |  |  |  |  |  |  | 
| 704 |  |  |  |  |  |  | sub global_call { | 
| 705 | 0 |  |  | 0 | 0 | 0 | my $id = ++$GLOBAL_RES_ID; | 
| 706 | 0 |  |  |  |  | 0 | $GLOBAL_RES{$id} = pop; | 
| 707 | 0 |  |  |  |  | 0 | global_req_add $id, [@_, $id]; | 
| 708 |  |  |  |  |  |  | } | 
| 709 |  |  |  |  |  |  |  | 
| 710 |  |  |  |  |  |  | $NODE_REQ{g_reply} = sub { | 
| 711 |  |  |  |  |  |  | my $id = shift; | 
| 712 |  |  |  |  |  |  | global_req_del $id; | 
| 713 |  |  |  |  |  |  | my $cb = delete $GLOBAL_RES{$id} | 
| 714 |  |  |  |  |  |  | or return; | 
| 715 |  |  |  |  |  |  | &$cb | 
| 716 |  |  |  |  |  |  | }; | 
| 717 |  |  |  |  |  |  |  | 
| 718 |  |  |  |  |  |  | ################################# | 
| 719 |  |  |  |  |  |  |  | 
| 720 |  |  |  |  |  |  | sub g_find { | 
| 721 | 0 |  |  | 0 | 0 | 0 | global_req_add "g_find $_[0]", [g_find => $_[0]]; | 
| 722 |  |  |  |  |  |  | } | 
| 723 |  |  |  |  |  |  |  | 
| 724 |  |  |  |  |  |  | # reply for g_find started in Node.pm | 
| 725 |  |  |  |  |  |  | $NODE_REQ{g_found} = sub { | 
| 726 |  |  |  |  |  |  | global_req_del "g_find $_[0]"; | 
| 727 |  |  |  |  |  |  |  | 
| 728 |  |  |  |  |  |  | my $node = $NODE{$_[0]} or return; | 
| 729 |  |  |  |  |  |  |  | 
| 730 |  |  |  |  |  |  | $node->connect_to ($_[1]); | 
| 731 |  |  |  |  |  |  | }; | 
| 732 |  |  |  |  |  |  |  | 
| 733 |  |  |  |  |  |  | sub master_set { | 
| 734 | 0 |  |  | 0 | 0 | 0 | $MASTER = $_[0]; | 
| 735 | 0 |  |  |  |  | 0 | AE::log 8 => "new master node: $MASTER."; | 
| 736 |  |  |  |  |  |  |  | 
| 737 |  |  |  |  |  |  | $MASTER_MON = mon_nodes sub { | 
| 738 | 0 | 0 | 0 | 0 |  | 0 | if ($_[0] eq $MASTER && !$_[1]) { | 
| 739 | 0 |  |  |  |  | 0 | undef $MASTER; | 
| 740 | 0 |  |  |  |  | 0 | master_search (); | 
| 741 |  |  |  |  |  |  | } | 
| 742 | 0 |  |  |  |  | 0 | }; | 
| 743 |  |  |  |  |  |  |  | 
| 744 | 0 |  |  |  |  | 0 | snd $MASTER, g_slave => \%LOCAL_DB; | 
| 745 |  |  |  |  |  |  |  | 
| 746 |  |  |  |  |  |  | # (re-)send queued requests | 
| 747 |  |  |  |  |  |  | snd $MASTER, @$_ | 
| 748 | 0 |  |  |  |  | 0 | for values %GLOBAL_REQ; | 
| 749 |  |  |  |  |  |  | } | 
| 750 |  |  |  |  |  |  |  | 
| 751 |  |  |  |  |  |  | sub master_search { | 
| 752 | 0 |  |  | 0 | 0 | 0 | AE::log 9 => "starting search for master node."; | 
| 753 |  |  |  |  |  |  |  | 
| 754 |  |  |  |  |  |  | #TODO: should also look for other global nodes, but we don't know them | 
| 755 | 0 |  |  |  |  | 0 | for (keys %NODE_SEED) { | 
| 756 | 0 | 0 |  |  |  | 0 | if (node_is_up $_) { | 
| 757 | 0 |  |  |  |  | 0 | master_set $_; | 
| 758 | 0 |  |  |  |  | 0 | return; | 
| 759 |  |  |  |  |  |  | } | 
| 760 |  |  |  |  |  |  | } | 
| 761 |  |  |  |  |  |  |  | 
| 762 |  |  |  |  |  |  | $MASTER_MON = mon_nodes sub { | 
| 763 | 0 | 0 |  | 0 |  | 0 | return unless $_[1]; # we are only interested in node-ups | 
| 764 | 0 | 0 |  |  |  | 0 | return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes | 
| 765 |  |  |  |  |  |  |  | 
| 766 | 0 |  |  |  |  | 0 | master_set $_[0]; | 
| 767 | 0 |  |  |  |  | 0 | }; | 
| 768 |  |  |  |  |  |  | } | 
| 769 |  |  |  |  |  |  |  | 
| 770 |  |  |  |  |  |  | # other node wants to make us the master, so start the global service | 
| 771 |  |  |  |  |  |  | $NODE_REQ{g_slave} = sub { | 
| 772 |  |  |  |  |  |  | # load global module and redo the request | 
| 773 |  |  |  |  |  |  | require AnyEvent::MP::Global; | 
| 774 |  |  |  |  |  |  | &{ $NODE_REQ{g_slave} } | 
| 775 |  |  |  |  |  |  | }; | 
| 776 |  |  |  |  |  |  |  | 
| 777 |  |  |  |  |  |  | ############################################################################# | 
| 778 |  |  |  |  |  |  | # local database operations | 
| 779 |  |  |  |  |  |  |  | 
| 780 |  |  |  |  |  |  | # canonical probably not needed | 
| 781 |  |  |  |  |  |  | our $sv_eq_coder = JSON::XS->new->utf8->allow_nonref; | 
| 782 |  |  |  |  |  |  |  | 
| 783 |  |  |  |  |  |  | # are the two scalars equal? very very ugly and slow, need better way | 
| 784 |  |  |  |  |  |  | sub sv_eq($$) { | 
| 785 | 0 | 0 | 0 | 0 | 0 | 0 | ref $_[0] || ref $_[1] | 
|  |  |  | 0 |  |  |  |  | 
| 786 |  |  |  |  |  |  | ? (JSON::XS::encode $sv_eq_coder, $_[0]) eq (JSON::XS::encode $sv_eq_coder, $_[1]) | 
| 787 |  |  |  |  |  |  | : $_[0] eq $_[1] | 
| 788 |  |  |  |  |  |  | && defined $_[0] == defined $_[1] | 
| 789 |  |  |  |  |  |  | } | 
| 790 |  |  |  |  |  |  |  | 
| 791 |  |  |  |  |  |  | # local database management | 
| 792 |  |  |  |  |  |  |  | 
| 793 |  |  |  |  |  |  | sub db_del($@) { | 
| 794 | 0 |  |  | 0 | 0 | 0 | my $family = shift; | 
| 795 |  |  |  |  |  |  |  | 
| 796 | 0 |  |  |  |  | 0 | my @del = grep exists $LOCAL_DB{$family}{$_}, @_; | 
| 797 |  |  |  |  |  |  |  | 
| 798 | 0 | 0 |  |  |  | 0 | return unless @del; | 
| 799 |  |  |  |  |  |  |  | 
| 800 | 0 |  |  |  |  | 0 | delete @{ $LOCAL_DB{$family} }{@del}; | 
|  | 0 |  |  |  |  | 0 |  | 
| 801 | 0 | 0 |  |  |  | 0 | snd $MASTER, g_upd => $family => undef, \@del | 
| 802 |  |  |  |  |  |  | if defined $MASTER; | 
| 803 |  |  |  |  |  |  | } | 
| 804 |  |  |  |  |  |  |  | 
| 805 |  |  |  |  |  |  | sub db_set($$;$) { | 
| 806 | 0 |  |  | 0 | 0 | 0 | my ($family, $subkey) = @_; | 
| 807 |  |  |  |  |  |  |  | 
| 808 |  |  |  |  |  |  | #   if (ref $_[1]) { | 
| 809 |  |  |  |  |  |  | #      # bulk | 
| 810 |  |  |  |  |  |  | #      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] }; | 
| 811 |  |  |  |  |  |  | #      $LOCAL_DB{$_[0]} = $_[1]; | 
| 812 |  |  |  |  |  |  | #      snd $MASTER, g_upd => $_[0] => $_[1], \@del | 
| 813 |  |  |  |  |  |  | #         if defined $MASTER; | 
| 814 |  |  |  |  |  |  | #   } else { | 
| 815 |  |  |  |  |  |  | # single-key | 
| 816 | 0 | 0 | 0 |  |  | 0 | unless (exists $LOCAL_DB{$family}{$subkey} && sv_eq $LOCAL_DB{$family}{$subkey}, $_[2]) { | 
| 817 | 0 |  |  |  |  | 0 | $LOCAL_DB{$family}{$subkey} = $_[2]; | 
| 818 | 0 | 0 |  |  |  | 0 | snd $MASTER, g_upd => $family => { $subkey => $_[2] } | 
| 819 |  |  |  |  |  |  | if defined $MASTER; | 
| 820 |  |  |  |  |  |  | } | 
| 821 |  |  |  |  |  |  | #   } | 
| 822 |  |  |  |  |  |  |  | 
| 823 |  |  |  |  |  |  | defined wantarray | 
| 824 | 0 |  |  | 0 |  | 0 | and Guard::guard { db_del $family => $subkey } | 
| 825 | 0 | 0 |  |  |  | 0 | } | 
| 826 |  |  |  |  |  |  |  | 
| 827 |  |  |  |  |  |  | # database query | 
| 828 |  |  |  |  |  |  |  | 
| 829 |  |  |  |  |  |  | sub db_family { | 
| 830 | 0 |  |  | 0 | 0 | 0 | my ($family, $cb) = @_; | 
| 831 | 0 |  |  |  |  | 0 | global_call g_db_family => $family, $cb; | 
| 832 |  |  |  |  |  |  | } | 
| 833 |  |  |  |  |  |  |  | 
| 834 |  |  |  |  |  |  | sub db_keys { | 
| 835 | 0 |  |  | 0 | 0 | 0 | my ($family, $cb) = @_; | 
| 836 | 0 |  |  |  |  | 0 | global_call g_db_keys   => $family, $cb; | 
| 837 |  |  |  |  |  |  | } | 
| 838 |  |  |  |  |  |  |  | 
| 839 |  |  |  |  |  |  | sub db_values { | 
| 840 | 0 |  |  | 0 | 0 | 0 | my ($family, $cb) = @_; | 
| 841 | 0 |  |  |  |  | 0 | global_call g_db_values => $family, $cb; | 
| 842 |  |  |  |  |  |  | } | 
| 843 |  |  |  |  |  |  |  | 
| 844 |  |  |  |  |  |  | # database monitoring | 
| 845 |  |  |  |  |  |  |  | 
| 846 |  |  |  |  |  |  | our %LOCAL_MON; # f, reply | 
| 847 |  |  |  |  |  |  | our %MON_DB;    # f, k, value | 
| 848 |  |  |  |  |  |  |  | 
| 849 |  |  |  |  |  |  | sub db_mon($@) { | 
| 850 | 0 |  |  | 0 | 0 | 0 | my ($family, $cb) = @_; | 
| 851 |  |  |  |  |  |  |  | 
| 852 | 0 | 0 |  |  |  | 0 | if (my $db = $MON_DB{$family}) { | 
| 853 |  |  |  |  |  |  | # we already monitor, so create a "dummy" change event | 
| 854 |  |  |  |  |  |  | # this is postponed, which might be too late (we could process | 
| 855 |  |  |  |  |  |  | # change events), so disable the callback at first | 
| 856 | 0 |  |  | 0 |  | 0 | $LOCAL_MON{$family}{$cb+0} = sub { }; | 
| 857 |  |  |  |  |  |  | AE::postpone { | 
| 858 | 0 | 0 |  | 0 |  | 0 | return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already | 
| 859 |  |  |  |  |  |  |  | 
| 860 |  |  |  |  |  |  | # set actual callback | 
| 861 | 0 |  |  |  |  | 0 | $LOCAL_MON{$family}{$cb+0} = $cb; | 
| 862 | 0 |  |  |  |  | 0 | $cb->($db, [keys %$db]); | 
| 863 | 0 |  |  |  |  | 0 | }; | 
| 864 |  |  |  |  |  |  | } else { | 
| 865 |  |  |  |  |  |  | # new monitor, request chg1 from upstream | 
| 866 | 0 |  |  |  |  | 0 | $LOCAL_MON{$family}{$cb+0} = $cb; | 
| 867 | 0 |  |  |  |  | 0 | global_req_add "mon1 $family" => [g_mon1 => $family]; | 
| 868 | 0 |  |  |  |  | 0 | $MON_DB{$family} = {}; | 
| 869 |  |  |  |  |  |  | } | 
| 870 |  |  |  |  |  |  |  | 
| 871 |  |  |  |  |  |  | defined wantarray | 
| 872 |  |  |  |  |  |  | and Guard::guard { | 
| 873 | 0 |  |  | 0 |  | 0 | my $mon = $LOCAL_MON{$family}; | 
| 874 | 0 |  |  |  |  | 0 | delete $mon->{$cb+0}; | 
| 875 |  |  |  |  |  |  |  | 
| 876 | 0 | 0 |  |  |  | 0 | unless (%$mon) { | 
| 877 | 0 |  |  |  |  | 0 | global_req_del "mon1 $family"; | 
| 878 |  |  |  |  |  |  |  | 
| 879 |  |  |  |  |  |  | # no global_req, because we don't care if we are not connected | 
| 880 | 0 | 0 |  |  |  | 0 | snd $MASTER, g_mon0 => $family | 
| 881 |  |  |  |  |  |  | if $MASTER; | 
| 882 |  |  |  |  |  |  |  | 
| 883 | 0 |  |  |  |  | 0 | delete $LOCAL_MON{$family}; | 
| 884 | 0 |  |  |  |  | 0 | delete $MON_DB{$family}; | 
| 885 |  |  |  |  |  |  | } | 
| 886 |  |  |  |  |  |  | } | 
| 887 | 0 | 0 |  |  |  | 0 | } | 
| 888 |  |  |  |  |  |  |  | 
| 889 |  |  |  |  |  |  | # full update | 
| 890 |  |  |  |  |  |  | $NODE_REQ{g_chg1} = sub { | 
| 891 |  |  |  |  |  |  | return unless $SRCNODE eq $MASTER; | 
| 892 |  |  |  |  |  |  | my ($f, $ndb) = @_; | 
| 893 |  |  |  |  |  |  |  | 
| 894 |  |  |  |  |  |  | my $db = $MON_DB{$f}; | 
| 895 |  |  |  |  |  |  | my (@a, @c, @d); | 
| 896 |  |  |  |  |  |  |  | 
| 897 |  |  |  |  |  |  | # add or replace keys | 
| 898 |  |  |  |  |  |  | while (my ($k, $v) = each %$ndb) { | 
| 899 |  |  |  |  |  |  | exists $db->{$k} | 
| 900 |  |  |  |  |  |  | ? push @c, $k | 
| 901 |  |  |  |  |  |  | : push @a, $k; | 
| 902 |  |  |  |  |  |  | $db->{$k} = $v; | 
| 903 |  |  |  |  |  |  | } | 
| 904 |  |  |  |  |  |  |  | 
| 905 |  |  |  |  |  |  | # delete keys that are no longer present | 
| 906 |  |  |  |  |  |  | for (grep !exists $ndb->{$_}, keys %$db) { | 
| 907 |  |  |  |  |  |  | delete $db->{$_}; | 
| 908 |  |  |  |  |  |  | push @d, $_; | 
| 909 |  |  |  |  |  |  | } | 
| 910 |  |  |  |  |  |  |  | 
| 911 |  |  |  |  |  |  | $_->($db, \@a, \@c, \@d) | 
| 912 |  |  |  |  |  |  | for values %{ $LOCAL_MON{$_[0]} }; | 
| 913 |  |  |  |  |  |  | }; | 
| 914 |  |  |  |  |  |  |  | 
| 915 |  |  |  |  |  |  | # incremental update | 
| 916 |  |  |  |  |  |  | $NODE_REQ{g_chg2} = sub { | 
| 917 |  |  |  |  |  |  | return unless $SRCNODE eq $MASTER; | 
| 918 |  |  |  |  |  |  | my ($family, $set, $del) = @_; | 
| 919 |  |  |  |  |  |  |  | 
| 920 |  |  |  |  |  |  | my $db = $MON_DB{$family}; | 
| 921 |  |  |  |  |  |  |  | 
| 922 |  |  |  |  |  |  | my (@a, @c); | 
| 923 |  |  |  |  |  |  |  | 
| 924 |  |  |  |  |  |  | while (my ($k, $v) = each %$set) { | 
| 925 |  |  |  |  |  |  | exists $db->{$k} | 
| 926 |  |  |  |  |  |  | ? push @c, $k | 
| 927 |  |  |  |  |  |  | : push @a, $k; | 
| 928 |  |  |  |  |  |  | $db->{$k} = $v; | 
| 929 |  |  |  |  |  |  | } | 
| 930 |  |  |  |  |  |  |  | 
| 931 |  |  |  |  |  |  | delete @$db{@$del}; | 
| 932 |  |  |  |  |  |  |  | 
| 933 |  |  |  |  |  |  | $_->($db, \@a, \@c, $del) | 
| 934 |  |  |  |  |  |  | for values %{ $LOCAL_MON{$family} }; | 
| 935 |  |  |  |  |  |  | }; | 
| 936 |  |  |  |  |  |  |  | 
| 937 |  |  |  |  |  |  | ############################################################################# | 
| 938 |  |  |  |  |  |  | # configure | 
| 939 |  |  |  |  |  |  |  | 
| 940 |  |  |  |  |  |  | sub nodename { | 
| 941 | 0 |  |  | 0 | 0 | 0 | require POSIX; | 
| 942 | 0 |  |  |  |  | 0 | (POSIX::uname ())[1] | 
| 943 |  |  |  |  |  |  | } | 
| 944 |  |  |  |  |  |  |  | 
| 945 |  |  |  |  |  |  | sub _resolve($) { | 
| 946 | 0 |  |  | 0 |  | 0 | my ($nodeid) = @_; | 
| 947 |  |  |  |  |  |  |  | 
| 948 | 0 |  |  |  |  | 0 | my $cv = AE::cv; | 
| 949 | 0 |  |  |  |  | 0 | my @res; | 
| 950 |  |  |  |  |  |  |  | 
| 951 |  |  |  |  |  |  | $cv->begin (sub { | 
| 952 | 0 |  |  | 0 |  | 0 | my %seen; | 
| 953 |  |  |  |  |  |  | my @refs; | 
| 954 | 0 |  |  |  |  | 0 | for (sort { $a->[0] <=> $b->[0] } @res) { | 
|  | 0 |  |  |  |  | 0 |  | 
| 955 | 0 | 0 |  |  |  | 0 | push @refs, $_->[1] unless $seen{$_->[1]}++ | 
| 956 |  |  |  |  |  |  | } | 
| 957 | 0 |  |  |  |  | 0 | shift->send (@refs); | 
| 958 | 0 |  |  |  |  | 0 | }); | 
| 959 |  |  |  |  |  |  |  | 
| 960 | 0 |  |  |  |  | 0 | my $idx; | 
| 961 | 0 |  |  |  |  | 0 | for my $t (split /,/, $nodeid) { | 
| 962 | 0 |  |  |  |  | 0 | my $pri = ++$idx; | 
| 963 |  |  |  |  |  |  |  | 
| 964 | 0 | 0 |  |  |  | 0 | $t = length $t ? nodename . ":$t" : nodename | 
|  |  | 0 |  |  |  |  |  | 
| 965 |  |  |  |  |  |  | if $t =~ /^\d*$/; | 
| 966 |  |  |  |  |  |  |  | 
| 967 | 0 | 0 |  |  |  | 0 | my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0 | 
| 968 |  |  |  |  |  |  | or Carp::croak "$t: unparsable transport descriptor"; | 
| 969 |  |  |  |  |  |  |  | 
| 970 | 0 | 0 |  |  |  | 0 | $port = "0" if $port eq "*"; | 
| 971 |  |  |  |  |  |  |  | 
| 972 | 0 | 0 |  |  |  | 0 | if ($host eq "*") { | 
| 973 | 0 |  |  |  |  | 0 | $cv->begin; | 
| 974 |  |  |  |  |  |  |  | 
| 975 |  |  |  |  |  |  | my $get_addr = sub { | 
| 976 | 0 |  |  | 0 |  | 0 | my @addr; | 
| 977 |  |  |  |  |  |  |  | 
| 978 | 0 |  |  |  |  | 0 | require Net::Interface; | 
| 979 |  |  |  |  |  |  |  | 
| 980 |  |  |  |  |  |  | # Net::Interface hangs on some systems, so hope for the best | 
| 981 | 0 |  |  |  |  | 0 | local $SIG{ALRM} = 'DEFAULT'; | 
| 982 | 0 |  |  |  |  | 0 | alarm 2; | 
| 983 |  |  |  |  |  |  |  | 
| 984 | 0 |  |  |  |  | 0 | for my $if (Net::Interface->interfaces) { | 
| 985 |  |  |  |  |  |  | # we statically lower-prioritise ipv6 here, TODO :() | 
| 986 | 0 |  |  |  |  | 0 | for $_ ($if->address (Net::Interface::AF_INET ())) { | 
| 987 | 0 | 0 |  |  |  | 0 | next if /^\x7f/; # skip localhost etc. | 
| 988 | 0 |  |  |  |  | 0 | push @addr, $_; | 
| 989 |  |  |  |  |  |  | } | 
| 990 | 0 |  |  |  |  | 0 | for ($if->address (Net::Interface::AF_INET6 ())) { | 
| 991 |  |  |  |  |  |  | #next if $if->scope ($_) <= 2; | 
| 992 | 0 | 0 |  |  |  | 0 | next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast | 
| 993 | 0 |  |  |  |  | 0 | push @addr, $_; | 
| 994 |  |  |  |  |  |  | } | 
| 995 |  |  |  |  |  |  | } | 
| 996 |  |  |  |  |  |  |  | 
| 997 | 0 |  |  |  |  | 0 | alarm 0; | 
| 998 |  |  |  |  |  |  |  | 
| 999 |  |  |  |  |  |  | @addr | 
| 1000 | 0 |  |  |  |  | 0 | }; | 
|  | 0 |  |  |  |  | 0 |  | 
| 1001 |  |  |  |  |  |  |  | 
| 1002 | 0 |  |  |  |  | 0 | my @addr; | 
| 1003 |  |  |  |  |  |  |  | 
| 1004 | 0 |  |  |  |  | 0 | if (AnyEvent::WIN32) { | 
| 1005 |  |  |  |  |  |  | @addr = $get_addr->(); | 
| 1006 |  |  |  |  |  |  | } else { | 
| 1007 |  |  |  |  |  |  | # use a child process, as Net::Interface is big, and we need it only once. | 
| 1008 |  |  |  |  |  |  |  | 
| 1009 | 0 | 0 |  |  |  | 0 | pipe my $r, my $w | 
| 1010 |  |  |  |  |  |  | or die "pipe: $!"; | 
| 1011 |  |  |  |  |  |  |  | 
| 1012 | 0 | 0 |  |  |  | 0 | if (fork eq 0) { | 
| 1013 | 0 |  |  |  |  | 0 | close $r; | 
| 1014 | 0 |  |  |  |  | 0 | syswrite $w, pack "(C/a*)*", $get_addr->(); | 
| 1015 | 0 |  |  |  |  | 0 | require POSIX; | 
| 1016 | 0 |  |  |  |  | 0 | POSIX::_exit (0); | 
| 1017 |  |  |  |  |  |  | } else { | 
| 1018 | 0 |  |  |  |  | 0 | close $w; | 
| 1019 |  |  |  |  |  |  |  | 
| 1020 | 0 |  |  |  |  | 0 | my $addr; | 
| 1021 |  |  |  |  |  |  |  | 
| 1022 | 0 |  |  |  |  | 0 | 1 while sysread $r, $addr, 1024, length $addr; | 
| 1023 |  |  |  |  |  |  |  | 
| 1024 | 0 |  |  |  |  | 0 | @addr = unpack "(C/a*)*", $addr; | 
| 1025 |  |  |  |  |  |  | } | 
| 1026 |  |  |  |  |  |  | } | 
| 1027 |  |  |  |  |  |  |  | 
| 1028 | 0 |  |  |  |  | 0 | for my $ip (@addr) { | 
| 1029 | 0 |  |  |  |  | 0 | push @res, [ | 
| 1030 |  |  |  |  |  |  | $pri += 1e-5, | 
| 1031 |  |  |  |  |  |  | AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port | 
| 1032 |  |  |  |  |  |  | ]; | 
| 1033 |  |  |  |  |  |  | } | 
| 1034 | 0 |  |  |  |  | 0 | $cv->end; | 
| 1035 |  |  |  |  |  |  | } else { | 
| 1036 | 0 |  |  |  |  | 0 | $cv->begin; | 
| 1037 |  |  |  |  |  |  | AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub { | 
| 1038 | 0 |  |  | 0 |  | 0 | for (@_) { | 
| 1039 | 0 |  |  |  |  | 0 | my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; | 
| 1040 | 0 |  |  |  |  | 0 | push @res, [ | 
| 1041 |  |  |  |  |  |  | $pri += 1e-5, | 
| 1042 |  |  |  |  |  |  | AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service | 
| 1043 |  |  |  |  |  |  | ]; | 
| 1044 |  |  |  |  |  |  | } | 
| 1045 | 0 |  |  |  |  | 0 | $cv->end; | 
| 1046 | 0 |  |  |  |  | 0 | }; | 
| 1047 |  |  |  |  |  |  | } | 
| 1048 |  |  |  |  |  |  | } | 
| 1049 |  |  |  |  |  |  |  | 
| 1050 | 0 |  |  |  |  | 0 | $cv->end; | 
| 1051 |  |  |  |  |  |  |  | 
| 1052 | 0 |  |  |  |  | 0 | $cv | 
| 1053 |  |  |  |  |  |  | } | 
| 1054 |  |  |  |  |  |  |  | 
| 1055 |  |  |  |  |  |  | our @POST_CONFIGURE; | 
| 1056 |  |  |  |  |  |  |  | 
| 1057 |  |  |  |  |  |  | # not yet documented | 
| 1058 |  |  |  |  |  |  | sub post_configure(&) { | 
| 1059 | 1 | 50 |  | 1 | 0 | 3 | die "AnyEvent::MP::Kernel::post_configure must be called in void context" if defined wantarray; | 
| 1060 |  |  |  |  |  |  |  | 
| 1061 | 1 |  |  |  |  | 2 | push @POST_CONFIGURE, @_; | 
| 1062 | 1 |  | 33 |  |  | 5 | (shift @POST_CONFIGURE)->() while $NODE && @POST_CONFIGURE; | 
| 1063 |  |  |  |  |  |  | } | 
| 1064 |  |  |  |  |  |  |  | 
| 1065 |  |  |  |  |  |  | sub configure(@) { | 
| 1066 | 0 | 0 |  | 0 | 0 |  | unshift @_, "profile" if @_ & 1; | 
| 1067 | 0 |  |  |  |  |  | my (%kv) = @_; | 
| 1068 |  |  |  |  |  |  |  | 
| 1069 | 0 |  |  |  |  |  | my $profile = delete $kv{profile}; | 
| 1070 |  |  |  |  |  |  |  | 
| 1071 | 0 | 0 |  |  |  |  | $profile = nodename | 
| 1072 |  |  |  |  |  |  | unless defined $profile; | 
| 1073 |  |  |  |  |  |  |  | 
| 1074 | 0 |  |  |  |  |  | $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv; | 
| 1075 |  |  |  |  |  |  |  | 
| 1076 | 0 |  |  |  |  |  | $SECURE = $CONFIG->{secure}; | 
| 1077 |  |  |  |  |  |  |  | 
| 1078 | 0 | 0 |  |  |  |  | my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/"; | 
| 1079 |  |  |  |  |  |  |  | 
| 1080 | 0 | 0 |  |  |  |  | $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"; | 
| 1081 |  |  |  |  |  |  |  | 
| 1082 | 0 |  |  |  |  |  | my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure | 
| 1083 |  |  |  |  |  |  |  | 
| 1084 | 0 |  |  |  |  |  | $NODE = $node; | 
| 1085 |  |  |  |  |  |  |  | 
| 1086 | 0 |  |  |  |  |  | $NODE =~ s/%n/nodename/ge; | 
|  | 0 |  |  |  |  |  |  | 
| 1087 |  |  |  |  |  |  |  | 
| 1088 | 0 | 0 |  |  |  |  | if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) { | 
| 1089 |  |  |  |  |  |  | # nodes with randomised node names do not need randomised port names | 
| 1090 | 0 |  |  |  |  |  | $UNIQ = ""; | 
| 1091 |  |  |  |  |  |  | } | 
| 1092 |  |  |  |  |  |  |  | 
| 1093 | 0 |  |  |  |  |  | $node_obj->{id} = $NODE; | 
| 1094 | 0 |  |  |  |  |  | $NODE{$NODE} = $node_obj; | 
| 1095 |  |  |  |  |  |  |  | 
| 1096 | 0 |  |  |  |  |  | my $seeds = $CONFIG->{seeds}; | 
| 1097 | 0 |  |  |  |  |  | my $binds = $CONFIG->{binds}; | 
| 1098 |  |  |  |  |  |  |  | 
| 1099 | 0 |  | 0 |  |  |  | $binds ||= ["*"]; | 
| 1100 |  |  |  |  |  |  |  | 
| 1101 | 0 |  |  |  |  |  | AE::log 8 => "node $NODE starting up."; | 
| 1102 |  |  |  |  |  |  |  | 
| 1103 | 0 |  |  |  |  |  | $BINDS = []; | 
| 1104 | 0 |  |  |  |  |  | %BINDS = (); | 
| 1105 |  |  |  |  |  |  |  | 
| 1106 | 0 |  |  |  |  |  | for (map _resolve $_, @$binds) { | 
| 1107 | 0 |  |  |  |  |  | for my $bind ($_->recv) { | 
| 1108 | 0 | 0 |  |  |  |  | my ($host, $port) = AnyEvent::Socket::parse_hostport $bind | 
| 1109 |  |  |  |  |  |  | or Carp::croak "$bind: unparsable local bind address"; | 
| 1110 |  |  |  |  |  |  |  | 
| 1111 |  |  |  |  |  |  | my $listener = AnyEvent::MP::Transport::mp_server | 
| 1112 |  |  |  |  |  |  | $host, | 
| 1113 |  |  |  |  |  |  | $port, | 
| 1114 |  |  |  |  |  |  | prepare => sub { | 
| 1115 | 0 |  |  | 0 |  |  | my (undef, $host, $port) = @_; | 
| 1116 | 0 |  |  |  |  |  | $bind = AnyEvent::Socket::format_hostport $host, $port; | 
| 1117 | 0 |  |  |  |  |  | 0 | 
| 1118 |  |  |  |  |  |  | }, | 
| 1119 | 0 |  |  |  |  |  | ; | 
| 1120 | 0 |  |  |  |  |  | $BINDS{$bind} = $listener; | 
| 1121 | 0 |  |  |  |  |  | push @$BINDS, $bind; | 
| 1122 |  |  |  |  |  |  | } | 
| 1123 |  |  |  |  |  |  | } | 
| 1124 |  |  |  |  |  |  |  | 
| 1125 | 0 |  |  |  |  |  | AE::log 9 => "running post config hooks and init."; | 
| 1126 |  |  |  |  |  |  |  | 
| 1127 |  |  |  |  |  |  | # might initialise Global, so need to do it before db_set | 
| 1128 | 0 |  |  | 0 |  |  | post_configure { }; | 
| 1129 |  |  |  |  |  |  |  | 
| 1130 | 0 |  |  |  |  |  | db_set "'l" => $NODE => $BINDS; | 
| 1131 |  |  |  |  |  |  |  | 
| 1132 | 0 |  |  |  |  |  | AE::log 8 => "node listens on [@$BINDS]."; | 
| 1133 |  |  |  |  |  |  |  | 
| 1134 |  |  |  |  |  |  | # connect to all seednodes | 
| 1135 | 0 |  |  |  |  |  | set_seeds map $_->recv, map _resolve $_, @$seeds; | 
| 1136 | 0 |  |  |  |  |  | master_search; | 
| 1137 |  |  |  |  |  |  |  | 
| 1138 |  |  |  |  |  |  | # save gobs of memory | 
| 1139 | 0 |  |  |  |  |  | undef &_resolve; | 
| 1140 | 0 |  |  | 0 |  |  | *configure = sub (@){ }; | 
| 1141 |  |  |  |  |  |  |  | 
| 1142 | 0 |  |  |  |  |  | AE::log 9 => "starting services."; | 
| 1143 |  |  |  |  |  |  |  | 
| 1144 | 0 |  |  |  |  |  | for (@{ $CONFIG->{services} }) { | 
|  | 0 |  |  |  |  |  |  | 
| 1145 | 0 | 0 |  |  |  |  | if (ref) { | 
|  |  | 0 |  |  |  |  |  | 
| 1146 | 0 |  |  |  |  |  | my ($func, @args) = @$_; | 
| 1147 | 0 |  |  |  |  |  | (load_func $func)->(@args); | 
| 1148 |  |  |  |  |  |  | } elsif (s/::$//) { | 
| 1149 | 0 |  |  |  |  |  | eval "require $_"; | 
| 1150 | 0 | 0 |  |  |  |  | die $@ if $@; | 
| 1151 |  |  |  |  |  |  | } else { | 
| 1152 | 0 |  |  |  |  |  | (load_func $_)->(); | 
| 1153 |  |  |  |  |  |  | } | 
| 1154 |  |  |  |  |  |  | } | 
| 1155 |  |  |  |  |  |  |  | 
| 1156 | 0 |  |  |  |  |  | eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}"; | 
| 1157 | 0 | 0 |  |  |  |  | die "$@" if $@; | 
| 1158 |  |  |  |  |  |  | } | 
| 1159 |  |  |  |  |  |  |  | 
| 1160 |  |  |  |  |  |  | =back | 
| 1161 |  |  |  |  |  |  |  | 
| 1162 |  |  |  |  |  |  | =head1 LOGGING | 
| 1163 |  |  |  |  |  |  |  | 
| 1164 |  |  |  |  |  |  | AnyEvent::MP::Kernel logs high-level information about the current node, | 
| 1165 |  |  |  |  |  |  | when nodes go up and down, and most runtime errors. It also logs some | 
| 1166 |  |  |  |  |  |  | debugging and trace messages about network maintenance, such as seed | 
| 1167 |  |  |  |  |  |  | connections and global node management. | 
| 1168 |  |  |  |  |  |  |  | 
| 1169 |  |  |  |  |  |  | =head1 SEE ALSO | 
| 1170 |  |  |  |  |  |  |  | 
| 1171 |  |  |  |  |  |  | L<AnyEvent::MP>. | 
| 1172 |  |  |  |  |  |  |  | 
| 1173 |  |  |  |  |  |  | =head1 AUTHOR | 
| 1174 |  |  |  |  |  |  |  | 
| 1175 |  |  |  |  |  |  | Marc Lehmann | 
| 1176 |  |  |  |  |  |  | http://home.schmorp.de/ | 
| 1177 |  |  |  |  |  |  |  | 
| 1178 |  |  |  |  |  |  | =cut | 
| 1179 |  |  |  |  |  |  |  | 
| 1180 |  |  |  |  |  |  | 1 | 
| 1181 |  |  |  |  |  |  |  |