File Coverage

blib/lib/AnyEvent/MP/Node.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::MP::Node - represent a node
4              
5             =head1 SYNOPSIS
6              
7             use AnyEvent::MP::Node;
8              
9             =head1 DESCRIPTION
10              
11             This is an internal utility module, horrible to look at, so don't.
12              
13             =cut
14              
15             package AnyEvent::MP::Node;
16              
17 1     1   1442 use common::sense;
  1         2  
  1         8  
18              
19 1     1   2049 use AE ();
  1         85  
  1         29  
20 1     1   1253 use AnyEvent::Util ();
  1         18832  
  1         31  
21 1     1   1496 use AnyEvent::Socket ();
  1         19561  
  1         42  
22              
23 1     1   68 use AnyEvent::MP::Transport ();
  0            
  0            
24              
25             sub new {
26             my ($self, $id) = @_;
27              
28             $self = bless { id => $id }, $self;
29              
30             $self->init;
31             $self->transport_reset;
32              
33             $self
34             }
35              
36             sub init {
37             #
38             }
39              
40             sub send {
41             &{ shift->{send} }
42             }
43              
44             # nodes reachable via the network
45             package AnyEvent::MP::Node::External;
46              
47             use base "AnyEvent::MP::Node";
48              
49             # called at init time, mostly sets {send}
50             sub transport_reset {
51             my ($self) = @_;
52              
53             delete $self->{transport};
54              
55             Scalar::Util::weaken $self;
56              
57             $self->{send} = sub {
58             push @{$self->{queue}}, shift;
59             $self->connect;
60             };
61             }
62              
63             # called each time we fail to establish a connection,
64             # or the existing connection failed
65             sub transport_error {
66             my ($self, @reason) = @_;
67              
68             my $no_transport = !$self->{transport};
69              
70             delete $self->{connect_w};
71             delete $self->{connect_to};
72              
73             delete $self->{queue};
74             $self->transport_reset;
75              
76             if (my $mon = delete $self->{lmon}) {
77             $_->(@reason) for map @$_, values %$mon;
78             }
79              
80             AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
81             unless $no_transport;
82              
83             # if we are here and are idle, we nuke ourselves
84             delete $AnyEvent::MP::Kernel::NODE{$self->{id}}
85             unless $self->{transport} || $self->{connect_to};
86             }
87              
88             # called after handshake was successful
89             sub transport_connect {
90             my ($self, $transport) = @_;
91              
92             delete $self->{trial};
93              
94             $self->transport_error (transport_error => $self->{id}, "switched connections")
95             if $self->{transport};
96              
97             delete $self->{connect_addr};
98             delete $self->{connect_w};
99             delete $self->{connect_to};
100              
101             $self->{transport} = $transport;
102              
103             my $transport_send = $transport->{send};
104              
105             AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
106              
107             $self->{send} = $transport_send;
108              
109             $transport_send->($_)
110             for @{ delete $self->{queue} || [] };
111             }
112              
113             sub connect {
114             my ($self, @addresses) = @_;
115              
116             return if $self->{transport};
117              
118             Scalar::Util::weaken $self;
119              
120             my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
121              
122             $self->{connect_to} ||= AE::timer $monitor, 0, sub {
123             $self->transport_error (transport_error => $self->{id}, "unable to connect");
124             };
125              
126             return unless @addresses;
127              
128             if ($self->{connect_w}) {
129             # sometimes we get told about new addresses after we started to connect
130             unshift @{$self->{connect_addr}}, @addresses;
131             return;
132             }
133              
134             $self->{connect_addr} = \@addresses; # a bit weird, but efficient
135              
136             $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]");
137              
138             my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
139              
140             $interval = ($monitor - $interval) / @addresses
141             if ($monitor - $interval) / @addresses < $interval;
142              
143             $interval = 0.4 if $interval < 0.4;
144              
145             my @endpoints;
146              
147             $self->{connect_w} = AE::timer 0.050 * rand, $interval * (0.9 + 0.1 * rand), sub {
148             @endpoints = @addresses
149             unless @endpoints;
150              
151             my $endpoint = shift @endpoints;
152              
153             $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint");
154              
155             $self->{trial}{$endpoint} ||= do {
156             my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
157             or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference.");
158              
159             AnyEvent::MP::Transport::mp_connect
160             $host, $port,
161             sub { delete $self->{trial}{$endpoint} },
162             };
163             };
164             }
165              
166             sub kill {
167             my ($self, $port, @reason) = @_;
168              
169             $self->send (["", kil => $port, @reason]);
170             }
171              
172             sub monitor {
173             my ($self, $portid, $cb) = @_;
174              
175             my $list = $self->{lmon}{$portid} ||= [];
176              
177             $self->send (["", mon1 => $portid])
178             unless @$list || !length $portid;
179              
180             push @$list, $cb;
181             }
182              
183             sub unmonitor {
184             my ($self, $portid, $cb) = @_;
185              
186             my $list = $self->{lmon}{$portid}
187             or return;
188              
189             @$list = grep $_ != $cb, @$list;
190              
191             unless (@$list) {
192             $self->send (["", mon0 => $portid]);
193             delete $self->{monitor}{$portid};
194             }
195             }
196              
197             # used for direct slave connections as well
198             package AnyEvent::MP::Node::Direct;
199              
200             use base "AnyEvent::MP::Node::External";
201              
202             package AnyEvent::MP::Node::Self;
203              
204             use base "AnyEvent::MP::Node";
205              
206             sub connect {
207             # we are trivially connected
208             }
209              
210             # delay every so often to avoid recursion, also used to delay after spawn
211             our $DELAY = -50;
212             our @DELAY;
213             our $DELAY_W;
214              
215             sub _send_delayed {
216             local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE{""};
217             (shift @DELAY)->()
218             while @DELAY;
219             undef $DELAY_W;
220             $DELAY = -50;
221             }
222              
223             sub transport_reset {
224             my ($self) = @_;
225              
226             Scalar::Util::weaken $self;
227              
228             $self->{send} = sub {
229             if ($DELAY++ >= 0) {
230             my $msg = $_[0];
231             push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
232             $DELAY_W ||= AE::timer 0, 0, \&_send_delayed;
233             return;
234             }
235              
236             local $AnyEvent::MP::Kernel::SRCNODE = $self;
237             AnyEvent::MP::Kernel::_inject (@{ $_[0] });
238             };
239             }
240              
241             sub transport_connect {
242             my ($self, $tp) = @_;
243              
244             $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})");
245             }
246              
247             sub kill {
248             my (undef, @args) = @_;
249              
250             # we _always_ delay kil's, to avoid calling mon callbacks
251             # from anything but the event loop context.
252             $DELAY = 1;
253             push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
254             $DELAY_W ||= AE::timer 0, 0, \&_send_delayed;
255             }
256              
257             sub monitor {
258             # maybe always delay, too?
259             if ($DELAY_W) {
260             my @args = @_;
261             push @DELAY, sub { AnyEvent::MP::Kernel::_monitor (@args) };
262             return;
263             }
264             &AnyEvent::MP::Kernel::_monitor;
265             }
266              
267             sub unmonitor {
268             # no need to always delay
269             if ($DELAY_W) {
270             my @args = @_;
271             push @DELAY, sub { AnyEvent::MP::Kernel::_unmonitor (@args) };
272             return;
273             }
274              
275             &AnyEvent::MP::Kernel::_unmonitor;
276             }
277              
278             =head1 SEE ALSO
279              
280             L<AnyEvent::MP>.
281              
282             =head1 AUTHOR
283              
284             Marc Lehmann <schmorp@schmorp.de>
285             http://home.schmorp.de/
286              
287             =cut
288              
289             1
290