File Coverage

blib/lib/AnyEvent/MP/Global.pm
Criterion Covered Total %
statement 24 87 27.5
branch 0 26 0.0
condition 0 16 0.0
subroutine 8 20 40.0
pod 0 9 0.0
total 32 158 20.2


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::MP::Global - network backbone services
4              
5             =head1 SYNOPSIS
6              
7             use AnyEvent::MP::Global;
8              
9             =head1 DESCRIPTION
10              
11             This module is usually run (or started on) seed nodes and provides a
12             variety of services to connected nodes, such as the distributed database.
13              
14             The global nodes form a fully-meshed network, that is, all global nodes
15             currently maintain connections to all other global nodes.
16              
17             Loading this module (e.g. as a service) transforms the local node into a
18             global node. There are no user-serviceable parts inside.
19              
20             For a limited time, this module also exports some AEMP 1.x compatibility
21             functions (C<grp_reg>, C<grp_get> and C<grp_mon>).
22              
23             =cut
24              
25             package AnyEvent::MP::Global;
26              
27 1     1   572 use common::sense;
  1         2  
  1         4  
28 1     1   36 use Carp ();
  1         1  
  1         10  
29 1     1   4 use List::Util ();
  1         1  
  1         9  
30              
31 1     1   3 use AnyEvent ();
  1         2  
  1         11  
32              
33 1     1   3 use AnyEvent::MP;
  1         1  
  1         177  
34 1     1   5 use AnyEvent::MP::Kernel;
  1         2  
  1         71  
35              
36             AE::log 7 => "starting global service.";
37              
38             #############################################################################
39             # node protocol parts for global nodes
40              
41             package AnyEvent::MP::Kernel;
42              
43 1     1   5 use JSON::XS ();
  1         2  
  1         1708  
44              
45             # TODO: this is ugly (classical use vars vs. our),
46             # maybe this should go into MP::Kernel
47              
48             # "import" from Kernel
49             our %NODE;
50             our $NODE;
51             #our $GLOBAL;
52             our $SRCNODE; # the origin node id
53             our %NODE_REQ;
54             our %GLOBAL_NODE;
55             our $GLOBAL;
56              
57             # only in global code
58             our %GLOBAL_SLAVE;
59             our %GLOBAL_MON; # monitors {family}
60              
61             our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
62             our %LOCAL_DBS; # local databases of other nodes (global and slave)
63             our %LOCAL_DB; # this node database
64              
65             # broadcasts a message to all other global nodes
66             sub g_broadcast {
67             snd $_, @_
68 0     0 0   for keys %GLOBAL_NODE;
69             }
70              
71             # add/replace/del inside a family in the database
72             # @$del must not contain any key in %$set
73             sub g_upd {
74 0     0 0   my ($node, $family, $set, $del) = @_;
75              
76 0   0       my $ldb = $LOCAL_DBS{$node}{$family} ||= {};
77 0   0       my $gdb = $GLOBAL_DB {$family} ||= {};
78              
79 0           my %local_set; # extra local set's created by deletes
80              
81             # add/replace keys
82 0           while (my ($k, $v) = each %$set) {
83             #TODO# optimize duplicate gdb-set's, to some extent, maybe
84             # but is probably difficult and slow, so don't for the time being.
85              
86             $ldb->{$k} =
87 0           $gdb->{$k} = $v;
88             }
89              
90 0           my (@del_local, @del_global); # actual deletes for other global nodes / our slaves
91              
92             # take care of deletes
93 0           for my $k (@$del) {
94 0           delete $ldb->{$k};
95              
96 0 0         if (my @other = grep exists $LOCAL_DBS{$_}{$family}{$k}, keys %LOCAL_DBS) {
97             # key exists in some other db shard(s)
98              
99             # if there is a local one, we have to update
100             # otherwise, we update and delete on other globals
101              
102 0 0   0     if (my $local = List::Util::first { exists $GLOBAL_SLAVE{$_} } @other) {
  0            
103             $set->{$k} =
104             $gdb->{$k} = $LOCAL_DBS{$local}{$family}{$k}
105 0 0         unless sv_eq $gdb->{$k}, $LOCAL_DBS{$local}{$family}{$k};
106              
107             } else {
108             # must be in a global one then
109 0     0     my $global = List::Util::first { !exists $GLOBAL_SLAVE{$_} } @other;
  0            
110              
111 0           push @del_global, $k;
112              
113             $local_set{$k} =
114             $gdb->{$k} = $LOCAL_DBS{$global}{$family}{$k}
115 0 0         unless sv_eq $gdb->{$k}, $LOCAL_DBS{$global}{$family}{$k};
116             }
117             } else {
118 0           delete $gdb->{$k};
119              
120             # this was the only one, so delete locally
121 0           push @del_local, $k;
122             # and globally, if it's a local key
123 0 0         push @del_global, $k if exists $GLOBAL_SLAVE{$node};
124             }
125             }
126              
127             # family could be empty now
128 0 0         delete $GLOBAL_DB {$family} unless %$gdb;
129 0 0         delete $LOCAL_DBS{$node}{$family} unless %$ldb;
130              
131             # tell other global nodes any changes in our database
132             g_broadcast g_upd => $family, $set, \@del_global
133 0 0 0       if exists $GLOBAL_SLAVE{$node} && (%$set || @del_global);
      0        
134              
135             # tell subscribers we have changed the family
136 0 0 0       if (%$set || %local_set || @del_local) {
      0        
137 0           @$set{keys %local_set} = values %local_set;
138              
139             snd $_ => g_chg2 => $family, $set, \@del_local
140 0           for keys %{ $GLOBAL_MON{$family} };
  0            
141             }
142             }
143              
144             # set the whole (node-local) database - previous value must be empty
145             sub g_set($$) {
146 0     0 0   my ($node, $db) = @_;
147              
148 0           while (my ($f, $k) = each %$db) {
149 0           g_upd $node, $f, $k;
150             }
151             }
152              
153             # delete all keys from a database
154             sub g_clr($) {
155 0     0 0   my ($node) = @_;
156              
157 0           my $db = $LOCAL_DBS{$node};
158              
159 0           while (my ($f, $k) = each %$db) {
160 0           g_upd $node, $f, undef, [keys %$k];
161             }
162              
163 0           delete $LOCAL_DBS{$node};
164             }
165              
166             # gather node databases from slaves
167              
168             # other node wants to make us the master and sends us their db
169             $NODE_REQ{g_slave} = sub {
170             my ($db) = @_
171             or return; # empty g_slave is used to start global service
172              
173             my $node = $SRCNODE;
174             undef $GLOBAL_SLAVE{$node};
175             g_set $node, $db;
176             };
177              
178             # other global node sends us their database
179             $NODE_REQ{g_set} = sub {
180             my ($db) = @_;
181              
182             # need to get it here, because g_set destroys it
183             my $binds = $db->{"'l"}{$SRCNODE};
184              
185             g_set $SRCNODE, $db;
186              
187             # a remote node always has to provide their listeners. for global
188             # nodes, we mirror their 'l locally, just as we also set 'g.
189             # that's not very efficient, but ensures that global nodes
190             # find each other.
191             db_set "'l" => $SRCNODE => $binds;
192             };
193              
194             # other node (global and slave) sends us a family update
195             $NODE_REQ{g_upd} = sub {
196             &g_upd ($SRCNODE, @_);
197             };
198              
199             # slave node wants to know the listeners of a node
200             $NODE_REQ{g_find} = sub {
201             my ($node) = @_;
202              
203             snd $SRCNODE, g_found => $node, $GLOBAL_DB{"'l"}{$node};
204             };
205              
206             $NODE_REQ{g_db_family} = sub {
207             my ($family, $id) = @_;
208             snd $SRCNODE, g_reply => $id, $GLOBAL_DB{$family} || {};
209             };
210              
211             $NODE_REQ{g_db_keys} = sub {
212             my ($family, $id) = @_;
213             snd $SRCNODE, g_reply => $id, [keys %{ $GLOBAL_DB{$family} } ];
214             };
215              
216             $NODE_REQ{g_db_values} = sub {
217             my ($family, $id) = @_;
218             snd $SRCNODE, g_reply => $id, [values %{ $GLOBAL_DB{$family} } ];
219             };
220              
221             # monitoring
222              
223             sub g_disconnect($) {
224 0     0 0   my ($node) = @_;
225              
226 0           delete $GLOBAL_NODE{$node}; # also done in Kernel.pm, but doing it here avoids overhead
227              
228 0           db_del "'g" => $node;
229 0           db_del "'l" => $node;
230 0           g_clr $node;
231              
232 0 0         if (my $mon = delete $GLOBAL_SLAVE{$node}) {
233 0           while (my ($f, $fv) = each %$mon) {
234             delete $GLOBAL_MON{$f}{$_}
235 0           for keys %$fv;
236              
237             delete $GLOBAL_MON{$f}
238 0 0         unless %{ $GLOBAL_MON{$f} };
  0            
239             }
240             }
241             }
242              
243             # g_mon0 family - stop monitoring
244             $NODE_REQ{g_mon0} = sub {
245             delete $GLOBAL_MON{$_[0]}{$SRCNODE};
246             delete $GLOBAL_MON{$_[0]} unless %{ $GLOBAL_MON{$_[0]} };
247              
248             delete $GLOBAL_SLAVE{$SRCNODE}{$_[0]};
249             };
250              
251             # g_mon1 family key - start monitoring
252             $NODE_REQ{g_mon1} = sub {
253             undef $GLOBAL_SLAVE{$SRCNODE}{$_[0]};
254             undef $GLOBAL_MON{$_[0]}{$SRCNODE};
255              
256             snd $SRCNODE, g_chg1 => $_[0], $GLOBAL_DB{$_[0]};
257             };
258              
259             #############################################################################
260             # switch to global mode
261              
262             # connect from a global node
263             sub g_global_connect {
264 0     0 0   my ($node) = @_;
265              
266             # each node puts the set of connected global nodes into
267             # 'g - this causes a big duplication and mergefest, but
268             # is the easiest way to ensure global nodes have a list
269             # of all other global nodes.
270             # we also mirror 'l as soon as we receive it, causing
271             # even more overhead.
272 0           db_set "'g" => $node;
273              
274             # global nodes send all local databases of their slaves, merged,
275             # as their database to other global nodes
276 0           my %db;
277              
278 0           while (my ($k, $v) = each %LOCAL_DBS) {
279 0 0         next unless exists $GLOBAL_SLAVE{$k};
280              
281 0           while (my ($f, $fv) = each %$v) {
282 0           while (my ($k, $kv) = each %$fv) {
283 0           $db{$f}{$k} = $kv;
284             }
285             }
286             }
287              
288 0           snd $node => g_set => \%db;
289             }
290              
291             # overrides request in Kernel
292             $NODE_REQ{g_global} = sub {
293             g_disconnect $SRCNODE; # usually a nop, but not when a normal node becomes global
294             undef $GLOBAL_NODE{$SRCNODE}; # same as in Kernel.pm
295             g_global_connect $SRCNODE;
296             };
297              
298             # delete data from other nodes on node-down
299             mon_nodes sub {
300             if ($_[1]) {
301             snd $_[0] => "g_global"; # tell everybody that we are a global node
302             } else {
303             g_disconnect $_[0];
304             }
305             };
306              
307             # now, this is messy
308             AnyEvent::MP::Kernel::post_configure {
309             # enable global mode
310             $GLOBAL = 1;
311              
312             # global nodes are their own masters - this
313             # resends global requests and sets the local database.
314             master_set $NODE;
315              
316             # now add us to the set of global nodes
317             db_set "'g" => $NODE;
318              
319             # tell other nodes that we are global now
320             for (up_nodes) {
321             snd $_, "g_global";
322              
323             # if the node is global, connect
324             g_global_connect $_
325             if exists $GLOBAL_NODE{$_};
326             }
327              
328             # from here on we should be able to act "normally"
329              
330             # maintain connections to all global nodes that we know of
331             db_mon "'g" => sub {
332             keepalive_add $_ for @{ $_[1] };
333             keepalive_del $_ for @{ $_[3] };
334             };
335             };
336              
337             #############################################################################
338             # compatibility functions for aemp 1.0
339              
340             package AnyEvent::MP::Global;
341              
342 1     1   6 use base "Exporter";
  1         2  
  1         346  
343             our @EXPORT = qw(grp_reg grp_get grp_mon);
344              
345             sub grp_reg($$) {
346 0     0 0   &db_reg
347             }
348              
349             sub grp_get($) {
350 0     0 0   my @ports = keys %{ $AnyEvent::MP::Kernel::GLOBAL_DB{$_[0]} };
  0            
351              
352 0 0         @ports ? \@ports : undef
353             }
354              
355             sub grp_mon($$) {
356 0     0 0   my ($grp, $cb) = @_;
357              
358             db_mon $grp => sub {
359 0     0     my ($ports, $add, $chg, $del) = @_;
360              
361 0           $cb->([keys %$ports], $add, $del);
362 0           };
363             }
364              
365             =head1 SEE ALSO
366              
367             L<AnyEvent::MP>.
368              
369             =head1 AUTHOR
370              
371             Marc Lehmann
372             http://home.schmorp.de/
373              
374             =cut
375              
376             1
377