File Coverage

blib/lib/AnyEvent/MP/Global.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::MP::Global - some network-global services
4              
5             =head1 SYNOPSIS
6              
7             use AnyEvent::MP::Global;
8              
9             =head1 DESCRIPTION
10              
11             This module maintains a fully-meshed network, if possible, and tries to
12             ensure that we are connected to at least one other node.
13              
14             It also manages named port groups - ports can register themselves in any
15             number of groups that will be available network-wide, which is great for
16             discovering services.
17              
18             Running it on one node will automatically run it on all nodes, although,
19             at the moment, the global service is started by default anyways.
20              
21             =head1 GLOBALS AND FUNCTIONS
22              
23             =over 4
24              
25             =cut
26              
27             package AnyEvent::MP::Global;
28              
29 1     1   1326 use common::sense;
  1         2  
  1         8  
30 1     1   60 use Carp ();
  1         22  
  1         16  
31              
32 1     1   5 use AnyEvent ();
  1         2  
  1         16  
33 1     1   4 use AnyEvent::Util ();
  1         2  
  1         18  
34              
35 1     1   44 use AnyEvent::MP;
  0            
  0            
36             use AnyEvent::MP::Kernel;
37             use AnyEvent::MP::Transport ();
38              
39             use base "Exporter";
40              
41             our @EXPORT = qw(
42             grp_reg
43             grp_get
44             grp_mon
45             );
46              
47             our $GLOBAL_VERSION = 0;
48              
49             our %ON_SETUP; # take note: not public
50              
51             our %addr; # port ID => [address...] mapping
52              
53             our %port; # our rendezvous port on the other side
54             our %lreg; # local registry, name => [pid...]
55             our %lmon; # local registry monitoring name,pid => mon
56             our %greg; # global regstry, name => pid => undef
57             our %gmon; # group monitoring, group => [$cb...]
58              
59             our $nodecnt;
60              
61             $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
62              
63             #############################################################################
64             # seednodes
65              
66             our $MASTER; # our current master (which we regularly query for net updates)
67              
68             our %SEEDME; # $node => $port
69             our @SEEDS;
70             our %SEEDS; # just to check whether a seed is a seed
71             our %SEED_CONNECT;
72             our $SEED_WATCHER;
73              
74             push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
75             my $peer = $_[0]{local_greeting}{peeraddr};
76             return unless exists $SEEDS{$peer};
77             $SEED_CONNECT{$peer} = 2;
78             };
79              
80             push @AnyEvent::MP::Transport::HOOK_GREETING, sub {
81             # we rely on untrusted data here (the remote node name)
82             # this is hopefully ok, as we override it on successful
83             # connects, and this can at most be used for DOSing,
84             # which is easy when you can do MITM.
85             my $peer = $_[0]{local_greeting}{peeraddr};
86             return unless exists $SEEDS{$peer};
87             $SEEDS{$peer} ||= $_[0]{remote_node};
88             };
89              
90             push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
91             my $peer = $_[0]{local_greeting}{peeraddr};
92             return unless exists $SEEDS{$peer};
93             $SEEDS{$peer} = $_[0]{remote_node};
94             };
95              
96             push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
97             delete $SEED_CONNECT{$_[0]{local_greeting}{peeraddr}};
98              
99             # check if we contacted ourselves, so nuke this seed
100             if (exists $_[0]{seed} && $_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
101             # $AnyEvent::MP::Kernel::WARN->(0,"avoiding seed $_[0]{seed}\n");#d#
102             delete $SEEDS{$_[0]{seed}};
103             }
104             };
105              
106             sub seed_connect {
107             my ($seed) = @_;
108              
109             my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
110             or Carp::croak "$seed: unparsable seed address";
111              
112             return if $SEED_CONNECT{$seed};
113             return if defined $SEEDS{$seed} && node_is_up $SEEDS{$seed};
114              
115             $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
116              
117             # ughhh
118             $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect $host, $port,
119             seed => $seed,
120             sub {
121             $SEED_CONNECT{$seed} = 1;
122             },
123             ;
124             }
125              
126             sub more_seeding {
127             my $int = List::Util::max 1,
128             $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
129             * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
130             - rand;
131              
132             $SEED_WATCHER = AE::timer $int, 0, \&more_seeding;
133              
134             @SEEDS = keys %SEEDS unless @SEEDS;
135             return unless @SEEDS;
136              
137             seed_connect splice @SEEDS, rand @SEEDS, 1;
138             }
139              
140             sub set_seeds(@) {
141             @SEEDS{@_} = ();
142              
143             $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding;
144              
145             after 0.100 * rand, \&more_seeding
146             for 1 .. keys %SEEDS;
147             }
148              
149             sub up_seeds() {
150             grep node_is_up $_, values %SEEDS
151             }
152              
153             sub node_is_seed($) {
154             grep $_ eq $_[0], grep defined, values %SEEDS
155             }
156              
157             # returns all (up) seed nodes, or all nodes if no seednodes are up/known
158             sub route_nodes {
159             my @seeds = up_seeds;
160             @seeds = up_nodes unless @seeds;
161             @seeds
162             }
163              
164             #############################################################################
165              
166             sub _change {
167             my ($group, $add, $del) = @_;
168              
169             my $kv = $greg{$group} ||= {};
170              
171             delete @$kv{@$del};
172             @$kv{@$add} = ();
173              
174             my $ports = [keys %$kv];
175             $_->($ports, $add, $del) for @{ $gmon{$group} };
176             }
177              
178             sub unreg_groups($) {
179             my ($node) = @_;
180              
181             my $qr = qr/^\Q$node\E(?:#|$)/;
182             my @del;
183              
184             while (my ($group, $ports) = each %greg) {
185             @del = grep /$qr/, keys %$ports;
186             _change $group, [], \@del
187             if @del;
188             }
189             }
190              
191             sub set_groups($$) {
192             my ($node, $lreg) = @_;
193              
194             while (my ($k, $v) = each %$lreg) {
195             _change $k, $v, [];
196             }
197             }
198              
199             =item $guard = grp_reg $group, $port
200              
201             Register the given (local!) port in the named global group C<$group>.
202              
203             The port will be unregistered automatically when the port is destroyed.
204              
205             When not called in void context, then a guard object will be returned that
206             will also cause the name to be unregistered when destroyed.
207              
208             =cut
209              
210             # unregister local port
211             sub unregister {
212             my ($port, $group) = @_;
213              
214             delete $lmon{"$group\x00$port"};
215             @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };
216              
217             _change $group, [], [$port];
218              
219             snd $_, reg0 => $group, $port
220             for values %port;
221             }
222              
223             # register local port
224             sub grp_reg($$) {
225             my ($group, $port) = @_;
226              
227             port_is_local $port
228             or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";
229              
230             grep $_ eq $port, @{ $lreg{$group} }
231             and Carp::croak "'$group': group already registered, cannot register a second time";
232              
233             $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
234             push @{ $lreg{$group} }, $port;
235              
236             snd $_, reg1 => $group, $port
237             for values %port;
238              
239             _change $group, [$port], [];
240              
241             defined wantarray && AnyEvent::Util::guard { unregister $port, $group }
242             }
243              
244             =item $ports = grp_get $group
245              
246             Returns all the ports currently registered to the given group (as
247             read-only(!) array reference). When the group has no registered members,
248             return C<undef>.
249              
250             =cut
251              
252             sub grp_get($) {
253             my @ports = keys %{ $greg{$_[0]} };
254             @ports ? \@ports : undef
255             }
256              
257             =item $guard = grp_mon $group, $callback->($ports, $add, $del)
258              
259             Installs a monitor on the given group. Each time there is a change it
260             will be called with the current group members as an arrayref as the
261             first argument. The second argument only contains ports added, the third
262             argument only ports removed.
263              
264             Unlike C<grp_get>, all three arguments will always be array-refs, even if
265             the array is empty. None of the arrays must be modified in any way.
266              
267             The first invocation will be with the first two arguments set to the
268             current members, as if all of them were just added, but only when the
269             group is actually non-empty.
270              
271             Optionally returns a guard object that uninstalls the watcher when it is
272             destroyed.
273              
274             =cut
275              
276             sub grp_mon($$) {
277             my ($grp, $cb) = @_;
278              
279             AnyEvent::MP::Kernel::delay sub {
280             return unless $cb;
281              
282             push @{ $gmon{$grp} }, $cb;
283             $cb->(((grp_get $grp) || return) x 2, []);
284             };
285              
286             defined wantarray && AnyEvent::Util::guard {
287             my @mon = grep $_ != $cb, @{ delete $gmon{$grp} };
288             $gmon{$grp} = \@mon if @mon;
289             undef $cb;
290             }
291             }
292              
293             sub start_node {
294             my ($node) = @_;
295              
296             return if exists $port{$node};
297             return if $node eq $NODE; # do not connect to ourselves
298              
299             # establish connection
300             my $port = $port{$node} = spawn $node, "AnyEvent::MP::Global::connect", $GLOBAL_VERSION, $NODE;
301              
302             mon $port, sub {
303             unreg_groups $node;
304             delete $port{$node};
305             };
306              
307             snd $port, addr => $AnyEvent::MP::Kernel::LISTENER;
308             snd $port, nodes => \%addr if %addr;
309             snd $port, set => \%lreg if %lreg;
310             snd $port, "setup"; # tell the other side that we are in business now
311             }
312              
313             # other nodes connect via this
314             sub connect {
315             my ($version, $node) = @_;
316              
317             (int $version) == (int $GLOBAL_VERSION)
318             or die "node version mismatch (requested $version; we have $GLOBAL_VERSION)";
319              
320             # monitor them, silently die
321             mon $node, psub {
322             delete $SEEDME{$node};
323             kil $SELF;
324             };
325              
326             rcv $SELF,
327             setup => sub {
328             $_->($node) for values %ON_SETUP;
329             },
330             addr => sub {
331             my $addresses = shift;
332             $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
333             $addr{$node} = $addresses;
334              
335             # delay broadcast by a random amount, to avoid nodes connecting to each other
336             # at the same time.
337             after 2 + rand, sub {
338             for my $slave (keys %SEEDME) {
339             snd $port{$slave} || next, nodes => { $node => $addresses };
340             }
341             };
342             },
343             nodes => sub {
344             my ($kv) = @_;
345              
346             use JSON::XS;#d#
347             my $kv_txt = JSON::XS->new->encode ($kv);#d#
348             $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#
349              
350             while (my ($id, $addresses) = each %$kv) {
351             my $node = AnyEvent::MP::Kernel::add_node $id;
352             $node->connect (@$addresses);
353             start_node $id;
354             }
355             },
356             set => sub {
357             set_groups $node, shift;
358             },
359             find => sub {
360             my ($othernode) = @_;
361              
362             $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
363             snd $port{$node}, nodes => { $othernode => $addr{$othernode} }
364             if $addr{$othernode};
365             },
366             reg0 => sub {
367             _change $_[0], [], [$_[1]];
368             },
369             reg1 => sub {
370             _change $_[0], [$_[1]], [];
371             },
372              
373             # some node asks us to provide network updates
374             seedme0 => sub {
375             $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to NOT seed it.");#d#
376             delete $SEEDME{$node};
377             },
378             seedme1 => sub {
379             $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to seed it.");#d#
380             $SEEDME{$node} = ();
381              
382             # for good measure
383             snd $port{$node}, nodes => \%addr if %addr;
384             },
385             ;
386             }
387              
388             sub set_master($) {
389             return if $MASTER eq $_[0];
390              
391             snd $port{$MASTER}, "seedme0"
392             if $MASTER && node_is_up $MASTER;
393              
394             $MASTER = $_[0];
395              
396             if ($MASTER) {
397             snd $port{$MASTER}, "seedme1";
398             $AnyEvent::MP::Kernel::WARN->(7, "selected new master: $MASTER.");
399             } else {
400             $AnyEvent::MP::Kernel::WARN->(1, "no contact to any other node, cannot seed.");
401             }
402             }
403              
404             sub mon_node {
405             my ($node, $is_up) = @_;
406              
407             if ($is_up) {
408             ++$nodecnt;
409             start_node $node;
410              
411             if (node_is_seed $node) {
412             if (node_is_seed $MASTER) {
413             my @SEEDS = up_seeds;
414              
415             # switching here with lower chance roughly hopefully still gives us
416             # an equal selection.
417             set_master $node
418             if 1 < rand @SEEDS;
419             } else {
420             # a seed always beats a non-seed
421             set_master $node;
422             }
423             }
424             }
425              
426             # select a new(?) master, if required
427             unless ($MASTER and node_is_up $MASTER) {
428             if (my @SEEDS = up_seeds) {
429             set_master $SEEDS[rand @SEEDS];
430             } else {
431             # select "last" non-seed node
432             set_master +(sort +up_nodes)[-1];
433             }
434             }
435              
436             unless ($is_up) {
437             --$nodecnt;
438             more_seeding unless $nodecnt;
439             unreg_groups $node;
440              
441             # forget about the node
442             delete $addr{$node};
443              
444             # ask our master for quick recovery
445             snd $port{$MASTER}, find => $node
446             if $MASTER;
447             }
448             }
449              
450             mon_node $_, 1
451             for up_nodes;
452              
453             mon_nodes \&mon_node;
454              
455             =back
456              
457             =head1 SEE ALSO
458              
459             L<AnyEvent::MP>.
460              
461             =head1 AUTHOR
462              
463             Marc Lehmann <schmorp@schmorp.de>
464             http://home.schmorp.de/
465              
466             =cut
467              
468             1
469