File Coverage

blib/lib/Cassandra/Client/Pool.pm
Criterion Covered Total %
statement 17 151 11.2
branch 0 48 0.0
condition 0 16 0.0
subroutine 6 25 24.0
pod 0 14 0.0
total 23 254 9.0


line stmt bran cond sub pod time code
1             package Cassandra::Client::Pool;
2             our $AUTHORITY = 'cpan:TVDW';
3             $Cassandra::Client::Pool::VERSION = '0.21';
4 13     13   226 use 5.010;
  13         52  
5 13     13   71 use strict;
  13         26  
  13         325  
6 13     13   54 use warnings;
  13         24  
  13         728  
7              
8 13     13   78 use Scalar::Util 'weaken';
  13         26  
  13         1162  
9 13     13   119 use Cassandra::Client::Util;
  13         30  
  13         906  
10 13     13   7100 use Cassandra::Client::NetworkStatus;
  13         71  
  13         29462  
11              
12             sub new {
13 0     0 0   my ($class, %args)= @_;
14             my $self= bless {
15             client => $args{client},
16             options => $args{options},
17             metadata => $args{metadata},
18             max_connections => $args{options}{max_connections},
19             async_io => $args{async_io},
20             policy => $args{load_balancing_policy},
21              
22 0           shutdown => 0,
23             pool => {},
24             count => 0,
25             list => [],
26              
27             last_id => 0,
28             id2ip => {},
29              
30             i => 0,
31              
32             connecting => {},
33             wait_connect => [],
34             }, $class;
35 0           weaken($self->{client});
36 0           $self->{network_status}= Cassandra::Client::NetworkStatus->new(pool => $self, async_io => $args{async_io});
37 0           return $self;
38             }
39              
40             sub init {
41 0     0 0   my ($self, $callback, $first_connection)= @_;
42              
43             # This code can be called twice.
44              
45             # If we didn't have a datacenter pinned before, now we do
46 0   0       $self->{policy}{datacenter} ||= $first_connection->{datacenter};
47              
48 0           $self->add($first_connection);
49 0           $self->{policy}->set_connecting($first_connection->ip_address);
50 0           $self->{policy}->set_connected($first_connection->ip_address);
51              
52             # Master selection, warmup, etc
53             series([
54             sub {
55 0     0     my ($next)= @_;
56 0           $self->{network_status}->init($next);
57             },
58             sub {
59 0     0     my ($next)= @_;
60              
61 0 0         if ($self->{config}{warmup}) {
62 0           $self->connect_if_needed($next);
63             } else {
64 0           $self->connect_if_needed();
65 0           return $next->();
66             }
67             },
68 0           ], $callback);
69             }
70              
71             sub get_one {
72 0     0 0   my ($self)= @_;
73 0 0         return undef unless $self->{count};
74              
75             # Round-robin: pick the next one
76 0           return $self->{list}[$self->{i}= (($self->{i}+1) % $self->{count})];
77             }
78              
79             sub get_one_cb {
80 0     0 0   my ($self, $callback)= @_;
81              
82 0 0         return $callback->(undef, $self->get_one) if $self->{count};
83              
84 0 0         if (!%{$self->{connecting}}) {
  0            
85 0           $self->connect_if_needed;
86             }
87 0 0         if (!%{$self->{connecting}}) {
  0            
88 0           return $callback->("Disconnected: all servers unreachable");
89             }
90              
91 0   0       push @{$self->{wait_connect} ||= []}, {
  0            
92             callback => $callback,
93             attempts => 0,
94             };
95             }
96              
97             sub remove {
98 0     0 0   my ($self, $id)= @_;
99 0 0         if (!$id) {
100             # Probably never got added. Ignore.
101 0           return;
102             }
103              
104 0           my $ipaddress= delete $self->{id2ip}{$id};
105 0 0         if (!$ipaddress) {
106 0           warn 'BUG: Tried to remove an unregistered connection. Probably a bad idea.';
107 0           return;
108             }
109              
110 0           my $connection= delete $self->{pool}{$ipaddress};
111 0 0         if (!$connection) {
112 0           warn 'BUG: Found a registered but unknown connection. This should not happen.';
113 0           return;
114             }
115              
116 0           $self->rebuild;
117              
118 0           $self->{policy}->set_disconnected($ipaddress);
119 0           $self->{network_status}->disconnected($connection->get_pool_id);
120 0           $self->connect_if_needed;
121              
122 0           return;
123             }
124              
125             sub add {
126 0     0 0   my ($self, $connection)= @_;
127              
128 0           my $ipaddress= $connection->ip_address;
129              
130 0 0         if ($self->{pool}{$ipaddress}) {
131 0           warn 'BUG: Duplicate connection for '.$ipaddress.'!';
132             }
133              
134 0           my $id= (++($self->{last_id}));
135 0           $connection->set_pool_id($id);
136 0           $self->{pool}{$ipaddress}= $connection;
137 0           $self->{id2ip}{$id}= $ipaddress;
138              
139 0           $self->rebuild;
140              
141 0           my $waiters= delete $self->{wait_connect};
142 0           $_->{callback}->(undef, $connection) for @$waiters;
143              
144 0     0     $self->{network_status}->select_master(sub{});
145              
146 0           return;
147             }
148              
149             sub rebuild {
150 0     0 0   my ($self)= @_;
151              
152 0           $self->{list}= [ values %{$self->{pool}} ];
  0            
153 0           $self->{count}= 0+ @{$self->{list}};
  0            
154              
155 0           return;
156             }
157              
158             sub shutdown {
159 0     0 0   my ($self)= @_;
160              
161 0           $self->{network_status}->shutdown;
162 0           $self->{shutdown}= 1;
163              
164 0           my @pool= @{$self->{list}};
  0            
165 0           $_->shutdown("Shutting down") for @pool;
166              
167 0           my @connecting= values %{$self->{connecting}};
  0            
168 0           $_->shutdown("Shutting down") for @connecting;
169              
170 0           return;
171             }
172              
173             sub connect_if_needed {
174 0     0 0   my ($self, $callback)= @_;
175              
176 0           my $max_connect= $self->{max_connections} - $self->{count};
177 0 0         return if $max_connect <= 0;
178              
179 0           $max_connect -= keys %{$self->{connecting}};
  0            
180 0 0         return if $max_connect <= 0;
181              
182 0 0         return if $self->{shutdown};
183              
184 0 0         if ($self->{_in_connect}) {
185 0           return;
186             }
187 0           local $self->{_in_connect}= 1;
188              
189 0           my $done= 0;
190 0           my $expect= $max_connect;
191 0           for (1..$max_connect) {
192             $expect-- unless $self->spawn_new_connection(sub {
193 0     0     $done++;
194              
195 0 0         if ($done == $expect) {
196 0 0         $callback->() if $callback;
197 0           undef $callback;
198             }
199 0 0         });
200             }
201 0 0 0       if ($callback && !$expect) {
202 0           $callback->();
203             }
204             }
205              
206             sub spawn_new_connection {
207 0     0 0   my ($self, $callback)= @_;
208              
209 0           my $host= $self->{policy}->get_next_candidate;
210 0 0         return unless $host;
211              
212             my $connection= Cassandra::Client::Connection->new(
213             client => $self->{client},
214             options => $self->{options},
215             host => $host,
216             async_io => $self->{async_io},
217             metadata => $self->{metadata},
218 0           );
219              
220 0           $self->{connecting}{$host}= $connection;
221 0           $self->{policy}->set_connecting($host);
222              
223             $connection->connect(sub {
224 0     0     my ($error)= @_;
225              
226 0           delete $self->{connecting}{$host};
227 0 0         if ($error) {
228 0           $self->{policy}->set_disconnected($host);
229              
230 0 0         if (my $waiters= delete $self->{wait_connect}) {
231 0 0 0       if ($self->{count} && @$waiters) {
232 0           warn 'We have callbacks waiting for a connection while we\'re connected';
233             }
234              
235 0           my $max_conn= $self->{max_connections};
236 0           my $known_node_count= $self->{policy}->known_node_count;
237 0 0         my $max_attempts = ($max_conn < $known_node_count ? $max_conn : $known_node_count) + 1;
238              
239 0           for my $waiter (@$waiters) {
240 0 0 0       if ((++$waiter->{attempts}) >= $max_attempts || !%{$self->{connecting}}) {
  0            
241 0           $waiter->{callback}->("Failed to connect to server: $error");
242             } else {
243 0   0       push @{$self->{wait_connect} ||= []}, $waiter;
  0            
244             }
245             }
246             }
247              
248 0           $self->connect_if_needed;
249             } else {
250 0           $self->{policy}->set_connected($host);
251              
252 0           $self->add($connection);
253             }
254              
255 0           $callback->($error);
256 0           });
257              
258 0           return 1;
259             }
260              
261             # Events coming from the network
262             sub event_added_node {
263 0     0 0   my ($self, $ipaddress)= @_;
264 0           $self->{network_status}->event_added_node($ipaddress);
265             }
266              
267             sub event_removed_node {
268 0     0 0   my ($self, $ipaddress)= @_;
269 0           $self->{network_status}->event_removed_node($ipaddress);
270              
271 0 0         if (my $conn= $self->{pool}{$ipaddress}) {
272 0           $conn->shutdown("Removed from pool");
273             }
274             }
275              
276             # Events coming from network_status
277             sub on_new_node {
278 0     0 0   my ($self, $node)= @_;
279 0           $self->{policy}->on_new_node($node);
280             }
281              
282             sub on_removed_node {
283 0     0 0   my ($self, $node)= @_;
284 0           $self->{policy}->on_removed_node($node);
285             }
286              
287             1;
288              
289             __END__