File Coverage

blib/lib/AnyEvent/MP/Node.pm
Criterion Covered Total %
statement 27 131 20.6
branch 0 44 0.0
condition 0 17 0.0
subroutine 9 31 29.0
pod 0 3 0.0
total 36 226 15.9


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::MP::Node - represent a node
4              
5             =head1 SYNOPSIS
6              
7             use AnyEvent::MP::Node;
8              
9             =head1 DESCRIPTION
10              
11             This is an internal utility module, horrible to look at, so don't.
12              
13             =cut
14              
15             package AnyEvent::MP::Node; # base class for nodes
16              
17 1     1   566 use common::sense;
  1         2  
  1         5  
18              
19 1     1   43 use AnyEvent ();
  1         1  
  1         11  
20 1     1   3 use AnyEvent::Socket ();
  1         1  
  1         10  
21              
22 1     1   3 use AnyEvent::MP::Transport ();
  1         1  
  1         146  
23              
24             sub new {
25 1     1 0 4 my ($self, $id) = @_;
26              
27 1         3 $self = bless { id => $id }, $self;
28              
29             # register
30 1         3 $AnyEvent::MP::Kernel::NODE{$id} = $self;
31              
32 1         4 $self->init;
33 1         10 $self->transport_reset;
34              
35 1         2 $self
36             }
37              
38             sub DESTROY {
39             # unregister
40 0     0   0 delete $AnyEvent::MP::Kernel::NODE{$_[0]{id}};
41             }
42              
43       1 0   sub init {
44             #
45             }
46              
47             sub send {
48 0     0 0 0 &{ shift->{send} }
  0         0  
49             }
50              
51             # nodes reachable via the network
52             package AnyEvent::MP::Node::Remote; # a remote node
53              
54 1     1   5 use base "AnyEvent::MP::Node";
  1         2  
  1         1144  
55              
56             # called at init time, mostly sets {send}
57             sub transport_reset {
58 0     0   0 my ($self) = @_;
59              
60 0         0 delete $self->{transport};
61              
62 0         0 Scalar::Util::weaken $self;
63              
64             $self->{send} = sub {
65 0     0   0 push @{$self->{queue}}, shift;
  0         0  
66 0         0 $self->connect;
67 0         0 };
68             }
69              
70             # called each time we fail to establish a connection,
71             # or the existing connection failed
72             sub transport_error {
73 0     0   0 my ($self, @reason) = @_;
74              
75 0         0 my $no_transport = !$self->{transport};
76              
77 0         0 delete $self->{connect_w};
78 0         0 delete $self->{connect_to};
79              
80 0         0 delete $self->{queue};
81 0         0 $self->transport_reset;
82              
83 0 0       0 if (my $mon = delete $self->{lmon}) {
84 0         0 $_->(@reason) for map @$_, values %$mon;
85             }
86              
87 0 0       0 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
88             unless $no_transport;
89              
90             # we weaken the node reference, so it can go away if unused
91             Scalar::Util::weaken $AnyEvent::MP::Kernel::NODE{$self->{id}}
92 0 0       0 unless $self->{connect_to};
93              
94 0         0 AE::log 9 => "@reason";
95             }
96              
97             # called after handshake was successful
98             sub transport_connect {
99 0     0   0 my ($self, $transport) = @_;
100              
101 0         0 delete $self->{trial};
102              
103             $self->transport_error (transport_error => $self->{id}, "switched connections")
104 0 0       0 if $self->{transport};
105              
106 0         0 delete $self->{connect_w};
107 0         0 delete $self->{connect_to};
108              
109 0         0 $self->{transport} = $transport;
110              
111 0         0 my $transport_send = $transport->{send};
112              
113 0         0 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
114              
115 0         0 $self->{send} = $transport_send;
116              
117             $transport_send->($_)
118 0 0       0 for @{ delete $self->{queue} || [] };
  0         0  
119             }
120              
121             sub connect {
122 0     0   0 my ($self) = @_;
123              
124 0 0       0 return if $self->{transport};
125 0 0       0 return if $self->{connect_w};
126              
127             # we unweaken the node reference, in case it was weakened before
128             $AnyEvent::MP::Kernel::NODE{$self->{id}}
129 0         0 = $AnyEvent::MP::Kernel::NODE{$self->{id}};
130              
131 0         0 Scalar::Util::weaken $self;
132              
133             $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
134 0     0   0 $self->transport_error (transport_error => $self->{id}, "connect timeout");
135 0   0     0 };
136              
137             # maybe @$addresses?
138 0         0 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
139              
140 0 0       0 if ($addresses) {
141 0         0 $self->connect_to ($addresses);
142             } else {
143             # on global nodes, all bets are off now - we either know the node, or we don't
144 0 0       0 if ($AnyEvent::MP::Kernel::GLOBAL) {
145 0         0 $self->transport_error (transport_error => $self->{id}, "no known address");
146             } else {
147 0         0 AnyEvent::MP::Kernel::g_find ($self->{id});
148             }
149             }
150             }
151              
152             sub connect_to {
153 0     0   0 my ($self, $addresses) = @_;
154              
155 0 0       0 return if $self->{transport};
156 0 0       0 return if $self->{connect_w};
157              
158 0 0       0 unless (@$addresses) {
159 0         0 $self->transport_error (transport_error => $self->{id}, "no known address");
160 0         0 return;
161             }
162            
163 0         0 AE::log 9 => "connecting to $self->{id} with [@$addresses]";
164              
165 0         0 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
166 0         0 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
167              
168 0 0       0 $interval = ($monitor - $interval) / @$addresses
169             if ($monitor - $interval) / @$addresses < $interval;
170              
171 0 0       0 $interval = 0.4 if $interval < 0.4;
172              
173 0         0 my @endpoints = reverse @$addresses;
174              
175             $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
176 0 0   0   0 my $endpoint = pop @endpoints
177             or return;
178              
179 0         0 AE::log 9 => "connecting to $self->{id} at $endpoint";
180              
181 0   0     0 $self->{trial}{$endpoint} ||= do {
182 0 0       0 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
183             or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
184              
185             AnyEvent::MP::Transport::mp_connect
186             $host, $port,
187 0         0 sub { delete $self->{trial}{$endpoint} },
188 0         0 };
189 0         0 };
190             }
191              
192             sub kill {
193 0     0   0 my ($self, $port, @reason) = @_;
194              
195 0         0 $self->{send} (["", kil1 => $port, @reason]);
196             }
197              
198             sub monitor {
199 0     0   0 my ($self, $portid, $cb) = @_;
200              
201 0   0     0 my $list = $self->{lmon}{$portid} ||= [];
202              
203 0 0 0     0 $self->send (["", mon1 => $portid])
204             unless @$list || !length $portid;
205              
206 0         0 push @$list, $cb;
207             }
208              
209             sub unmonitor {
210 0     0   0 my ($self, $portid, $cb) = @_;
211              
212 0 0       0 my $list = $self->{lmon}{$portid}
213             or return;
214              
215 0         0 @$list = grep $_ != $cb, @$list;
216              
217 0 0       0 unless (@$list) {
218 0         0 $self->send (["", mon0 => $portid]);
219 0         0 delete $self->{monitor}{$portid};
220             }
221             }
222              
223             package AnyEvent::MP::Node::Self; # the local node
224              
225 1     1   6 use base "AnyEvent::MP::Node";
  1         2  
  1         642  
226              
227       0     sub connect {
228             # we are trivially connected
229             }
230              
231             # delay every so often to avoid recursion, also used to delay after spawn
232             our $DELAY = -50;
233             our @DELAY;
234             our $DELAY_W;
235              
236             our $send_delayed = sub {
237             $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
238             (shift @DELAY)->()
239             while @DELAY;
240             undef $DELAY_W;
241             $DELAY = -50;
242             };
243              
244             sub transport_reset {
245 1     1   3 my ($self) = @_;
246              
247 1         5 Scalar::Util::weaken $self;
248              
249             $self->{send} = sub {
250 0 0   0     if (++$DELAY > 0) {
251 0           my $msg = $_[0];
252 0           push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
  0            
253 0   0       $DELAY_W ||= AE::timer 0, 0, $send_delayed;
254 0           return;
255             }
256              
257 0           local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
258 0           AnyEvent::MP::Kernel::_inject (@{ $_[0] });
  0            
259 1         6 };
260             }
261              
262             sub transport_connect {
263 0     0     my ($self, $tp) = @_;
264              
265 0           AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
266             }
267              
268             sub kill {
269 0     0     my (undef, @args) = @_;
270              
271             # we _always_ delay kil's, to avoid calling mon callbacks
272             # from anything but the event loop context.
273 0           $DELAY = 1;
274 0     0     push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
  0            
275 0   0       $DELAY_W ||= AE::timer 0, 0, $send_delayed;
276             }
277              
278             sub monitor {
279             # maybe always delay, too?
280 0 0   0     if ($DELAY_W) {
281 0           my @args = @_;
282 0     0     push @DELAY, sub { AnyEvent::MP::Kernel::_monitor (@args) };
  0            
283 0           return;
284             }
285 0           &AnyEvent::MP::Kernel::_monitor;
286             }
287              
288             sub unmonitor {
289             # no need to always delay
290 0 0   0     if ($DELAY_W) {
291 0           my @args = @_;
292 0     0     push @DELAY, sub { AnyEvent::MP::Kernel::_unmonitor (@args) };
  0            
293 0           return;
294             }
295              
296 0           &AnyEvent::MP::Kernel::_unmonitor;
297             }
298              
299             =head1 SEE ALSO
300              
301             L.
302              
303             =head1 AUTHOR
304              
305             Marc Lehmann
306             http://home.schmorp.de/
307              
308             =cut
309              
310             1
311