File Coverage

blib/lib/Cassandra/Client/NetworkStatus.pm
Criterion Covered Total %
statement 14 90 15.5
branch 0 12 0.0
condition 0 9 0.0
subroutine 5 22 22.7
pod 0 9 0.0
total 19 142 13.3


line stmt bran cond sub pod time code
1             package Cassandra::Client::NetworkStatus;
2             our $AUTHORITY = 'cpan:TVDW';
3             $Cassandra::Client::NetworkStatus::VERSION = '0.13_004'; # TRIAL
4              
5 1     1   14 $Cassandra::Client::NetworkStatus::VERSION = '0.13004';use 5.010;
  1         3  
6 1     1   4 use strict;
  1         2  
  1         15  
7 1     1   3 use warnings;
  1         2  
  1         21  
8              
9 1     1   4 use Scalar::Util qw/weaken/;
  1         2  
  1         40  
10 1     1   5 use Cassandra::Client::Util;
  1         4  
  1         816  
11              
12             sub new {
13 0     0 0   my ($class, %args)= @_;
14              
15             my $self= bless {
16             pool => $args{pool},
17             async_io => $args{async_io},
18              
19 0           waiting_for_cb => [],
20             master_id => undef,
21              
22             shutdown => undef,
23             }, $class;
24 0           weaken($self->{pool});
25 0           return $self;
26             }
27              
28             sub init {
29 0     0 0   my ($self, $callback)= @_;
30 0           $self->select_master($callback);
31             }
32              
33             sub select_master {
34 0     0 0   my ($self, $callback)= @_;
35              
36 0 0         return $callback->() if $self->{master_id};
37 0 0         if (@{$self->{waiting_for_cb}}) {
  0            
38 0           push @{$self->{waiting_for_cb}}, $callback;
  0            
39 0           return;
40             }
41 0           push @{$self->{waiting_for_cb}}, $callback;
  0            
42              
43 0           my $pool= $self->{pool}; # non-weak
44              
45 0           my $attempts= 0;
46             whilst(
47             sub { # condition
48             !$self->{shutdown} && !$self->{master_id}
49 0   0 0     },
50             sub { # while
51 0     0     my ($wnext)= @_;
52             series([
53             sub {
54 0           my ($next)= @_;
55 0 0         if ($attempts++) {
56             # Don't retry immediately
57 0           $self->{async_io}->timer($next, 1);
58             } else {
59 0           $next->();
60             }
61             },
62             sub {
63 0           my ($next)= @_;
64 0           $pool->get_one_cb($next);
65             },
66             sub {
67 0           my ($next, $connection)= @_;
68             parallel([
69             sub {
70 0           my ($pnext)= @_;
71 0           $connection->register_events($pnext);
72             },
73             sub {
74 0           my ($pnext)= @_;
75 0           $connection->get_network_status($pnext);
76             },
77             sub {
78 0           $_[0]->(undef, $connection);
79             },
80 0           ], $next);
81             }, sub {
82 0           my ($next, undef, $networkstatus, $connection)= @_;
83 0           $self->{master_id}= $connection->get_pool_id;
84 0           $self->load_status($networkstatus);
85 0           $next->();
86             },
87             ], sub {
88 0           $wnext->();
89 0           });
90             },
91             sub { # finish
92 0     0     my ($error)= @_;
93 0           my @cb= @{$self->{waiting_for_cb}};
  0            
94 0           $self->{waiting_for_cb}= [];
95 0   0       $error= $error || ($self->{master_id} ? undef : "Master selection aborted");
96 0           $_->($error) for @cb;
97             }
98 0           );
99             }
100              
101             sub shutdown {
102 0     0 0   my ($self)= @_;
103 0           $self->{shutdown}= 1;
104             }
105              
106             sub load_status {
107 0     0 0   my ($self, $new_status)= @_;
108 0           my $old_status= $self->{status};
109 0           $self->{status}= $new_status;
110              
111 0           my @old_hosts= grep {!$new_status->{$_}} keys %$old_status;
  0            
112 0           my @new_hosts= grep {!$old_status->{$_}} keys %$new_status;
  0            
113              
114 0           $self->{pool}->on_removed_node($old_status->{$_}) for @old_hosts;
115 0           $self->{pool}->on_new_node($new_status->{$_}) for @new_hosts;
116             }
117              
118             sub event_added_node {
119 0     0 0   my ($self, $ipaddress)= @_;
120 0 0         $self->refresh_network_status unless $self->{status}{$ipaddress};
121             }
122              
123             sub event_removed_node {
124 0     0 0   my ($self, $ipaddress)= @_;
125 0           my $old_node= delete $self->{status}{$ipaddress};
126 0 0         if ($old_node) {
127 0           $self->{pool}->on_removed_node($old_node);
128             }
129             }
130              
131             sub disconnected {
132 0     0 0   my ($self, $id)= @_;
133 0 0 0       if ($self->{master_id} && $self->{master_id} == $id) {
134 0           $self->{master_id}= undef;
135 0     0     $self->select_master(sub{});
136             }
137             }
138              
139             sub refresh_network_status {
140 0     0 0   my ($self)= @_;
141              
142             series([
143             sub {
144 0     0     my ($next)= @_;
145 0           $self->{pool}->get_one_cb($next);
146             }, sub {
147 0     0     my ($next, $connection)= @_;
148 0           $connection->get_network_status($next);
149             }, sub {
150 0     0     my ($next, $status)= @_;
151 0           $self->load_status($status);
152 0           return $next->();
153             }
154             ], sub {
155 0     0     my ($error)= @_;
156             # XXX And now?
157 0           });
158             }
159              
160             1;
161              
162             __END__