File Coverage

lib/DR/Tnt/LowLevel/Connector.pm
Criterion Covered Total %
statement 42 164 25.6
branch 0 30 0.0
condition 0 13 0.0
subroutine 14 25 56.0
pod 0 7 0.0
total 56 239 23.4


line stmt bran cond sub pod time code
1 6     6   2493 use utf8;
  6         13  
  6         36  
2 6     6   166 use strict;
  6         10  
  6         123  
3 6     6   41 use warnings;
  6         10  
  6         222  
4              
5             package DR::Tnt::LowLevel::Connector;
6 6     6   25 use Mouse;
  6         9  
  6         35  
7 6     6   3563 use DR::Tnt::Proto;
  6         15  
  6         285  
8 6     6   3025 use List::MoreUtils 'any';
  6         63339  
  6         30  
9 6     6   5198 use feature 'state';
  6         11  
  6         655  
10 6     6   33 use Carp;
  6         10  
  6         395  
11             $Carp::Internal{ (__PACKAGE__) }++;
12 6     6   29 use Data::Dumper;
  6         7  
  6         247  
13 6     6   36 use feature 'switch';
  6         17  
  6         158  
14 6     6   2733 use Time::HiRes ();
  6         7006  
  6         148  
15 6     6   1873 use DR::Tnt::Dumper;
  6         13  
  6         277  
16 6     6   35 use Mouse::Util::TypeConstraints;
  6         8  
  6         113  
17             enum LLConnectorState => [
18             'init',
19             'connecting',
20             'connected',
21             'ready',
22             'error'
23             ];
24 6     6   723 no Mouse::Util::TypeConstraints;
  6         12  
  6         24  
25              
26             has host => is => 'ro', isa => 'Str', required => 1;
27             has port => is => 'ro', isa => 'Str', required => 1;
28             has user => is => 'ro', isa => 'Maybe[Str]';
29             has password => is => 'ro', isa => 'Maybe[Str]';
30             has utf8 => is => 'ro', isa => 'Bool', default => 1;
31              
32             has state =>
33             is => 'rw',
34             isa => 'LLConnectorState',
35             default => 'init',
36             trigger => sub {
37             my ($self) = @_;
38             goto $self->state;
39              
40             init:
41             $self->last_error(undef);
42             die 1;
43              
44             connecting: {
45             $self->last_error(undef);
46             $self->_clean_fh;
47             $self->_active_sync({});
48             $self->_watcher({});
49             return;
50             }
51              
52             connected:
53             $self->last_error(undef);
54             return;
55              
56             ready:
57             $self->last_error(undef);
58             return;
59              
60             error: {
61             $self->_clean_fh;
62              
63             confess "Can't set state 'error' without last_error"
64             unless $self->last_error;
65            
66             # on_handshake erorrs
67             my $list = $self->_on_handshake;
68             $self->_on_handshake([]);
69             $_->(@{ $self->last_error }) for @$list;
70            
71             # waiter errors
72             $list = $self->_watcher;
73             $self->_watcher({});
74             $_->(@{ $self->last_error }) for map { @$_ } values %$list;
75              
76             # unsent errors
77             $list = $self->_unsent;
78             $self->_unsent([]);
79             $_->[-1](@{ $self->last_error }) for @$list;
80             return;
81             }
82             };
83              
84             has fh =>
85             is => 'ro',
86             isa => 'Maybe[Any]',
87             clearer => '_clean_fh',
88             writer => '_set_fh';
89              
90             has last_error_time => is => 'rw', isa => 'Num', default => 0;
91             has last_error =>
92             is => 'rw',
93             isa => 'Maybe[ArrayRef]',
94             trigger => sub { $_[0]->last_error_time(Time::HiRes::time) }
95             ;
96              
97             has greeting => is => 'rw', isa => 'Maybe[HashRef]';
98             has rbuf => is => 'rw', isa => 'Str', default => '';
99              
100             has _unsent => is => 'rw', isa => 'ArrayRef', default => sub {[]};
101             has _last_sync => is => 'rw', isa => 'Int', default => 0;
102             has _active_sync => is => 'rw', isa => 'HashRef', default => sub {{}};
103             has _watcher => is => 'rw', isa => 'HashRef', default => sub {{}};
104             has _on_handshake => is => 'rw', isa => 'ArrayRef', default => sub {[]};
105              
106             sub next_sync {
107 0     0 0   my ($self) = @_;
108              
109 0           for (my $sync = $self->_last_sync + 1;; $sync++) {
110 0 0         $sync = 1 if $sync > 0x7FFF_FFFF;
111 0 0         next if exists $self->_active_sync->{ $sync };
112 0           $self->_last_sync($sync);
113 0           $self->_active_sync->{ $sync } = 1;
114 0           return $sync;
115             }
116             }
117              
118             sub connect {
119 0     0 0   my ($self, $cb) = @_;
120              
121 0 0   0     if (any { $_ eq $self->state } 'init', 'error', 'ready') {
  0            
122 0           $self->_clean_fh;
123 0           $self->state('connecting');
124             $self->_connect(sub {
125 0     0     my ($state, $message) = @_;
126 0 0         if ($state eq 'OK') {
127 0           $self->state('connected');
128             } else {
129             # TODO connection error
130 0   0       $self->last_error([$state, $message // "Can't connect to remote host"]);
131 0           $self->state('error');
132             }
133 0           goto &$cb;
134 0           });
135 0           return;
136             }
137 0           $cb->(fatal => 'can not connect in state: ' . $self->state);
138 0           return;
139             }
140              
141             sub socket_error {
142 0     0 0   my ($self, $message) = @_;
143 0   0       $self->last_error([ER_SOCKET => $message // 'Socket error']);
144 0           $self->state('error');
145             }
146              
147             sub handshake {
148 0     0 0   my ($self, $cb) = @_;
149              
150 0           goto $self->state;
151              
152 0           ready:
153             $cb->(OK => 'Handshake was received', $self->greeting);
154 0           return;
155              
156             init:
157             connecting:
158             connected:
159 0           push @{ $self->_on_handshake } => $cb;
  0            
  0            
  0            
160 0           return;
161              
162             error:
163 0           $cb->(@{ $self->last_error });
  0            
164 0           return;
165             }
166              
167              
168             sub send_request {
169 0     0 0   my $cb = pop;
170 0           my ($self, $name, @args) = @_;
171              
172              
173 0           goto $self->state;
174              
175              
176             error: {
177 0           $cb->(@{ $self->last_error });
  0            
  0            
178 0           return;
179             }
180              
181              
182             ready: {
183 0           state $r = {
  0            
184             select => \&DR::Tnt::Proto::select,
185             update => \&DR::Tnt::Proto::update,
186             insert => \&DR::Tnt::Proto::insert,
187             replace => \&DR::Tnt::Proto::replace,
188             delete => \&DR::Tnt::Proto::del,
189             call_lua => \&DR::Tnt::Proto::call_lua,
190             eval_lua => \&DR::Tnt::Proto::eval_lua,
191             ping => \&DR::Tnt::Proto::ping,
192             auth => \&DR::Tnt::Proto::auth,
193             };
194              
195 0 0         croak "unknown method $name" unless exists $r->{$name};
196              
197             state $ra = {
198             auth => sub {
199 0     0     my ($self, $schema_id, $user, $password) = @_;
200             return (
201             $schema_id,
202             $user // $self->user,
203             $password // $self->password,
204             $self->greeting->{salt},
205 0   0       );
      0        
206             }
207 0           };
208            
209 0 0         @args = $ra->{$name}->($self, @args) if exists $ra->{$name};
210            
211 0           my $sync = $self->next_sync;
212 0           my $pkt = $r->{$name}->($sync, @args);
213              
214 0 0         if ($ENV{DR_SEND_DUMP}) {
215 0           warn pkt_dump($name, $pkt);
216             }
217              
218              
219             $self->send_pkt($pkt, sub {
220 0     0     my ($state, $message) = @_;
221 0 0         unless ($state eq 'OK') {
222 0           $self->last_error([$state, $message]);
223 0           $self->state('error');
224 0           $self->fh(undef);
225 0           goto &$cb;
226             }
227 0           $cb->(OK => sprintf("packet '%s' sent", $name), $sync);
228 0           });
229              
230 0           return;
231             }
232              
233             init:
234             connected:
235             connecting:
236             {
237 0           push @{ $self->_unsent } => [ $name, @args, $cb ];
  0            
  0            
238 0           return;
239             }
240 0           }
  0            
241              
242             sub wait_response {
243 0     0 0   my ($self, $sync, $cb) = @_;
244 0 0         unless (exists $self->_active_sync->{$sync}) {
245 0           $cb->(ER_FATAL => "Request $sync was not sent");
246 0           return;
247             }
248 0 0         if (ref $self->_active_sync->{$sync}) {
249 0           my $resp = delete $self->_active_sync->{$sync};
250 0           $cb->(OK => 'Request was read', $resp);
251 0           return;
252             }
253 0           push @{ $self->_watcher->{$sync} } => $cb;
  0            
254 0           return;
255             }
256              
257             sub check_rbuf {
258 0     0 0   my ($self) = @_;
259            
260 0           my $found = 0;
261              
262             # handshake
263 0           goto $self->state;
264              
265             ready: {
266 0           while (length $self->rbuf) {
  0            
267 0           my ($res, $tail) = DR::Tnt::Proto::response($self->rbuf, $self->utf8);
268 0 0         return $found unless defined $res;
269            
270 0           $found++;
271              
272 0           $self->rbuf($tail);
273              
274 0           my $sync = $res->{SYNC};
275 0 0         if (exists $self->_watcher->{$sync}) {
276 0           my $list = delete $self->_watcher->{$sync};
277 0           delete $self->_active_sync->{$sync};
278 0           for my $cb (@$list) {
279 0           $cb->(OK => 'Response received', $res);
280             }
281 0           next;
282             }
283              
284 0 0         unless (exists $self->_active_sync->{$sync}) {
285 0           warn "Unexpected tarantool reply $sync";
286 0           next;
287             }
288              
289 0           $self->_active_sync->{$sync} = $res;
290             };
291 0           return $found;
292             }
293              
294             connected: {
295 0 0         return $found if 128 > length $self->rbuf;
  0            
296 0           my $handshake = substr $self->rbuf, 0, 128;
297 0           $self->rbuf(substr $self->rbuf, 128);
298              
299 0           my $greeting = DR::Tnt::Proto::parse_greeting($handshake);
300 0 0 0       unless ($greeting and $greeting->{salt}) {
301 0           $self->fh(undef);
302 0           $self->last_error([ER_HANDSHAKE => 'Broken handshake']);
303 0           $self->state('error');
304 0           return $found;
305             }
306            
307 0           $self->greeting($greeting);
308 0           $self->state('ready');
309            
310 0           for (my $list = $self->_on_handshake) {
311 0           $self->_on_handshake([]);
312 0           $_->(OK => 'Handshake received', $self->greeting) for @$list;
313 0           $found++;
314             }
315            
316 0           for (my $list = $self->_unsent) {
317 0           $self->_unsent([]);
318 0           $self->send_request(@$_) for @$list;
319             }
320 0           goto ready;
321             }
322             }
323             __PACKAGE__->meta->make_immutable;