File Coverage

blib/lib/DR/Tarantool/AEConnection.pm
Criterion Covered Total %
statement 111 140 79.2
branch 23 48 47.9
condition 10 26 38.4
subroutine 28 36 77.7
pod 0 15 0.0
total 172 265 64.9


line stmt bran cond sub pod time code
1 2     2   43453 use utf8;
  2         3  
  2         11  
2 2     2   61 use strict;
  2         4  
  2         61  
3 2     2   11 use warnings;
  2         3  
  2         82  
4              
5             package DR::Tarantool::AEConnection;
6 2     2   2443 use AnyEvent;
  2         11255  
  2         68  
7 2     2   1535 use AnyEvent::Socket ();
  2         66707  
  2         84  
8 2     2   27 use Carp;
  2         6  
  2         184  
9 2     2   1459 use List::MoreUtils ();
  2         2134  
  2         48  
10 2     2   13 use Scalar::Util ();
  2         3  
  2         2801  
11              
12             sub _errno() {
13 12     12   79 while (my ($k, $v) = each(%!)) {
14 804 100       7761 return $k if $v;
15             }
16 6         47 return $!;
17             }
18              
19             sub new {
20 6     6 0 602664 my ($class, %opts) = @_;
21              
22 6         14 $opts{state} = 'init';
23 6   50     40 $opts{host} ||= '127.0.0.1';
24 6 50       16 croak 'port is undefined' unless $opts{port};
25              
26              
27 6   50 0   54 $opts{on}{connected} ||= sub { };
  0         0  
28 6   50 0   38 $opts{on}{connfail} ||= sub { };
  0         0  
29 6   50 0   33 $opts{on}{disconnect} ||= sub { };
  0         0  
30 6   50 2   37 $opts{on}{error} ||= sub { };
  2         3  
31 6   50 10   36 $opts{on}{reconnecting} ||= sub { };
  10         25  
32              
33 6         10 $opts{success_connects} = 0;
34 6         11 $opts{wbuf} = '';
35              
36 6         25 $opts{read} = { any => [] };
37              
38 6   33     38 bless \%opts => ref($class) || $class;
39             }
40              
41              
42             sub on {
43 8     8 0 20 my ($self, $name, $cb) = @_;
44 8 50       26 croak "wrong event name: $name" unless exists $self->{on}{$name};
45 8   50 0   25 $self->{on}{$name} = $cb || sub { };
  0         0  
46 8         26 $self;
47             }
48              
49 6     6 0 569 sub fh { $_[0]->{fh} }
50 85     85 0 11603 sub state { $_[0]->{state} }
51 16     16 0 66 sub host { $_[0]->{host} }
52 16     16 0 210 sub port { $_[0]->{port} }
53 0     0 0 0 sub error { $_[0]->{error} }
54 2     2 0 9 sub errno { $_[0]->{errno} }
55 13     13 0 50 sub reconnect_always { $_[0]->{reconnect_always} }
56 27     27 0 417 sub reconnect_period { $_[0]->{reconnect_period} }
57             sub timeout {
58 17     17 0 39 my ($self) = @_;
59 17 50       100 return $self->{timeout} if @_ == 1;
60 0         0 return $self->{timeout} = $_[1];
61             }
62              
63              
64             sub set_error {
65 2     2 0 1053 my ($self, $error, $errno) = @_;
66 2   33     13 $errno ||= $error;
67 2         4 $self->{state} = 'error';
68 2         5 $self->{error} = $error;
69 2         41 $self->{errno} = $errno;
70 2         9 $self->{on}{error}($self);
71 2         6 $self->{guard} = {};
72 2         4 $self->{wbuf} = '';
73              
74 2         7 $self->_check_reconnect;
75            
76             }
77              
78             sub _check_reconnect {
79 15     15   43 Scalar::Util::weaken(my $self = shift);
80 15 50       34 return if $self->state eq 'connected';
81 15 50       33 return if $self->state eq 'connecting';
82 15 50       50 return if $self->{guard}{rc};
83              
84 15 100       38 return unless $self->reconnect_period;
85 13 100       44 unless ($self->reconnect_always) {
86 3 100       25 return unless $self->{success_connects};
87             }
88              
89             $self->{guard}{rc} = AE::timer $self->reconnect_period, 0, sub {
90 10 50   10   984370 return unless $self;
91 10         62 delete $self->{guard}{rc};
92 10         69 $self->{on}{reconnecting}($self);
93 10         50 $self->connect;
94 12         26 };
95             }
96              
97             sub connect {
98 16     16 0 90 Scalar::Util::weaken(my $self = shift);
99              
100 16 50 33     59 return if $self->state eq 'connected' or $self->state eq 'connecting';
101              
102 16         39 $self->{state} = 'connecting';
103 16         48 $self->{error} = undef;
104 16         46 $self->{errno} = undef;
105 16         47 $self->{guard} = {};
106              
107             $self->{guard}{c} = AnyEvent::Socket::tcp_connect
108             $self->host,
109             $self->port,
110             sub {
111 15     15   1372 $self->{guard} = {};
112 15         78 my ($fh) = @_;
113 15 100       121 if ($fh) {
114 3         7 $self->{fh} = $fh;
115 3         66 $self->{state} = 'connected';
116 3         7 $self->{success_connects}++;
117 3 50       12 $self->push_write('') if length $self->{wbuf};
118 3         13 $self->{on}{connected}($self);
119 3         42 return;
120             }
121            
122 12         45 $self->{error} = $!;
123 12         40 $self->{errno} = _errno;
124 12         25 $self->{state} = 'connfail';
125 12         24 $self->{guard} = {};
126 12         60 $self->{on}{connfail}($self);
127 12 50       113 return unless $self;
128 12         35 $self->_check_reconnect;
129             },
130 16     16   4667 sub {
131              
132             }
133 16         62 ;
134              
135 16 100       2411 if (defined $self->timeout) {
136             $self->{guard}{t} = AE::timer $self->timeout, 0, sub {
137 1     1   18 delete $self->{guard}{t};
138 1 50       2 return unless $self->state eq 'connecting';
139              
140 1         2 $self->{error} = 'Connection timeout';
141 1         3 $self->{errno} = 'ETIMEOUT';
142 1         2 $self->{state} = 'connfail';
143 1         2 $self->{guard} = {};
144 1         3 $self->{on}{connfail}($self);
145 1         16 $self->_check_reconnect;
146 1         3 };
147             }
148            
149 16         139 $self;
150             }
151              
152             sub disconnect {
153 0     0 0   Scalar::Util::weaken(my $self = shift);
154 0 0 0       return if $self->state eq 'disconnect' or $self->state eq 'init';
155              
156 0           $self->{guard} = {};
157 0           $self->{error} = 'Disconnected';
158 0           $self->{errno} = 'SUCCESS';
159 0           $self->{state} = 'disconnect';
160 0           $self->{wbuf} = '';
161 0           $self->{on}{disconnect}($self);
162             }
163              
164              
165             sub push_write {
166 0     0 0   Scalar::Util::weaken(my $self = shift);
167 0           my ($str) = @_;
168              
169 0           $self->{wbuf} .= $str;
170              
171 0 0         return unless $self->state eq 'connected';
172 0 0         return unless length $self->{wbuf};
173 0 0         return if $self->{guard}{write};
174              
175             $self->{guard}{write} = AE::io $self->fh, 1, sub {
176 0     0     my $l = syswrite $self->fh, $self->{wbuf};
177 0 0         unless(defined $l) {
178 0 0         return if $!{EINTR};
179 0           $self->set_error($!, _errno);
180 0           return;
181             }
182 0           substr $self->{wbuf}, 0, $l, '';
183 0 0         return if length $self->{wbuf};
184 0           delete $self->{guard}{write};
185 0           };
186             }
187              
188              
189              
190              
191             1;