File Coverage

blib/lib/Cassandra/Client/NetworkStatus.pm
Criterion Covered Total %
statement 14 90 15.5
branch 0 12 0.0
condition 0 9 0.0
subroutine 5 22 22.7
pod 0 9 0.0
total 19 142 13.3


line stmt bran cond sub pod time code
1             package Cassandra::Client::NetworkStatus;
2             our $AUTHORITY = 'cpan:TVDW';
3             $Cassandra::Client::NetworkStatus::VERSION = '0.21';
4 13     13   220 use 5.010;
  13         48  
5 13     13   68 use strict;
  13         25  
  13         323  
6 13     13   57 use warnings;
  13         22  
  13         761  
7              
8 13     13   73 use Scalar::Util qw/weaken/;
  13         34  
  13         973  
9 13     13   125 use Cassandra::Client::Util;
  13         32  
  13         17212  
10              
11             sub new {
12 0     0 0   my ($class, %args)= @_;
13              
14             my $self= bless {
15             pool => $args{pool},
16             async_io => $args{async_io},
17              
18 0           waiting_for_cb => [],
19             master_id => undef,
20              
21             shutdown => undef,
22             }, $class;
23 0           weaken($self->{pool});
24 0           return $self;
25             }
26              
27             sub init {
28 0     0 0   my ($self, $callback)= @_;
29 0           $self->select_master($callback);
30             }
31              
32             sub select_master {
33 0     0 0   my ($self, $callback)= @_;
34              
35 0 0         return $callback->() if $self->{master_id};
36 0 0         if (@{$self->{waiting_for_cb}}) {
  0            
37 0           push @{$self->{waiting_for_cb}}, $callback;
  0            
38 0           return;
39             }
40 0           push @{$self->{waiting_for_cb}}, $callback;
  0            
41              
42 0           my $pool= $self->{pool}; # non-weak
43              
44 0           my $attempts= 0;
45             whilst(
46             sub { # condition
47             !$self->{shutdown} && !$self->{master_id}
48 0   0 0     },
49             sub { # while
50 0     0     my ($wnext)= @_;
51             series([
52             sub {
53 0           my ($next)= @_;
54 0 0         if ($attempts++) {
55             # Don't retry immediately
56 0           $self->{async_io}->timer($next, 1);
57             } else {
58 0           $next->();
59             }
60             },
61             sub {
62 0           my ($next)= @_;
63 0           $pool->get_one_cb($next);
64             },
65             sub {
66 0           my ($next, $connection)= @_;
67             parallel([
68             sub {
69 0           my ($pnext)= @_;
70 0           $connection->register_events($pnext);
71             },
72             sub {
73 0           my ($pnext)= @_;
74 0           $connection->get_network_status($pnext);
75             },
76             sub {
77 0           $_[0]->(undef, $connection);
78             },
79 0           ], $next);
80             }, sub {
81 0           my ($next, undef, $networkstatus, $connection)= @_;
82 0           $self->{master_id}= $connection->get_pool_id;
83 0           $self->load_status($networkstatus);
84 0           $next->();
85             },
86             ], sub {
87 0           $wnext->();
88 0           });
89             },
90             sub { # finish
91 0     0     my ($error)= @_;
92 0           my @cb= @{$self->{waiting_for_cb}};
  0            
93 0           $self->{waiting_for_cb}= [];
94 0   0       $error= $error || ($self->{master_id} ? undef : "Master selection aborted");
95 0           $_->($error) for @cb;
96             }
97 0           );
98             }
99              
100             sub shutdown {
101 0     0 0   my ($self)= @_;
102 0           $self->{shutdown}= 1;
103             }
104              
105             sub load_status {
106 0     0 0   my ($self, $new_status)= @_;
107 0           my $old_status= $self->{status};
108 0           $self->{status}= $new_status;
109              
110 0           my @old_hosts= grep {!$new_status->{$_}} keys %$old_status;
  0            
111 0           my @new_hosts= grep {!$old_status->{$_}} keys %$new_status;
  0            
112              
113 0           $self->{pool}->on_removed_node($old_status->{$_}) for @old_hosts;
114 0           $self->{pool}->on_new_node($new_status->{$_}) for @new_hosts;
115             }
116              
117             sub event_added_node {
118 0     0 0   my ($self, $ipaddress)= @_;
119 0 0         $self->refresh_network_status unless $self->{status}{$ipaddress};
120             }
121              
122             sub event_removed_node {
123 0     0 0   my ($self, $ipaddress)= @_;
124 0           my $old_node= delete $self->{status}{$ipaddress};
125 0 0         if ($old_node) {
126 0           $self->{pool}->on_removed_node($old_node);
127             }
128             }
129              
130             sub disconnected {
131 0     0 0   my ($self, $id)= @_;
132 0 0 0       if ($self->{master_id} && $self->{master_id} == $id) {
133 0           $self->{master_id}= undef;
134 0     0     $self->select_master(sub{});
135             }
136             }
137              
138             sub refresh_network_status {
139 0     0 0   my ($self)= @_;
140              
141             series([
142             sub {
143 0     0     my ($next)= @_;
144 0           $self->{pool}->get_one_cb($next);
145             }, sub {
146 0     0     my ($next, $connection)= @_;
147 0           $connection->get_network_status($next);
148             }, sub {
149 0     0     my ($next, $status)= @_;
150 0           $self->load_status($status);
151 0           return $next->();
152             }
153             ], sub {
154 0     0     my ($error)= @_;
155             # XXX And now?
156 0           });
157             }
158              
159             1;
160              
161             __END__