File Coverage

lib/DR/Tnt/FullCb.pm
Criterion Covered Total %
statement 45 376 11.9
branch 0 126 0.0
condition 0 34 0.0
subroutine 15 40 37.5
pod 1 3 33.3
total 61 579 10.5


line stmt bran cond sub pod time code
1 13     13   892045 use utf8;
  13         113  
  13         88  
2 13     13   441 use strict;
  13         25  
  13         321  
3 13     13   72 use warnings;
  13         27  
  13         643  
4              
5             package DR::Tnt::FullCb;
6 13     13   4942 use Mouse;
  13         334372  
  13         65  
7              
8             require DR::Tnt::LowLevel;
9 13     13   12606 use File::Spec::Functions 'catfile', 'rel2abs';
  13         13051  
  13         1107  
10 13     13   114 use Carp;
  13         45  
  13         1045  
11             $Carp::Internal{ (__PACKAGE__) }++;
12 13     13   5095 use DR::Tnt::Dumper;
  13         37  
  13         1248  
13             with 'DR::Tnt::Role::Logging';
14 13     13   109 use Scalar::Util;
  13         31  
  13         575  
15 13     13   86 use feature 'state';
  13         27  
  13         1818  
16              
17 13     13   97 use constant SPACE_space => 281; # _vspace
  13         32  
  13         806  
18 13     13   87 use constant SPACE_index => 289; # _vindex
  13         27  
  13         753  
19 13     13   92 use constant ER_TNT_PERMISSIONS => 0x8037;
  13         30  
  13         744  
20 13     13   87 use constant ER_TNT_SCHEMA => 0x806D;
  13         25  
  13         774  
21              
22              
23 13     13   100 use Mouse::Util::TypeConstraints;
  13         24  
  13         119  
24              
25             enum DriverType => [ 'sync', 'async' ];
26             enum FullCbState => [
27             'init',
28             'connecting',
29             'schema',
30             'ready',
31             'pause',
32             ];
33              
34 13     13   2208 no Mouse::Util::TypeConstraints;
  13         159  
  13         74  
35              
36             has logger => is => 'ro', isa => 'Maybe[CodeRef]';
37             has host => is => 'ro', isa => 'Str', required => 1;
38             has port => is => 'ro', isa => 'Str', required => 1;
39             has user => is => 'ro', isa => 'Maybe[Str]';
40             has password => is => 'ro', isa => 'Maybe[Str]';
41             has driver => is => 'ro', isa => 'DriverType', required => 1;
42             has reconnect_interval => is => 'ro', isa => 'Maybe[Num]';
43             has hashify_tuples => is => 'ro', isa => 'Bool', default => 0;
44             has utf8 => is => 'ro', isa => 'Bool', default => 1;
45             has lua_dir =>
46             is => 'ro',
47             isa => 'Maybe[Str]',
48             writer => '_set_lua_dir'
49             ;
50             has last_error =>
51             is => 'ro',
52             isa => 'Maybe[ArrayRef]',
53             writer => '_set_last_error'
54             ;
55             has state =>
56             is => 'ro',
57             isa => 'FullCbState',
58             default => 'init',
59             writer => '_set_state',
60             trigger => sub {
61             my ($self, undef, $old_state) = @_;
62             $self->_state_changed($self->_now);
63              
64             $self->_reconnector->event($self->state, $old_state);
65             $self->_log(info => 'Connector is in state: %s', $self->state);
66             };
67             ;
68              
69             has _state_changed => is => 'rw', isa => 'Maybe[Num]';
70              
71              
72             has last_schema =>
73             is => 'ro',
74             isa => 'Int',
75             default => 0,
76             writer => '_set_last_schema'
77             ;
78              
79              
80             has _reconnector =>
81             is => 'ro',
82             isa => 'Object',
83             lazy => 1,
84             builder => sub {
85             my ($self) = @_;
86              
87             goto $self->driver;
88              
89             sync:
90             require DR::Tnt::FullCb::Reconnector::Sync;
91             return DR::Tnt::FullCb::Reconnector::Sync->new(fcb => $self);
92              
93             async:
94             require DR::Tnt::FullCb::Reconnector::AE;
95             return DR::Tnt::FullCb::Reconnector::AE->new(fcb => $self);
96              
97             }
98             ;
99              
100              
101              
102             has _unsent_lua => is => 'rw', isa => 'ArrayRef', default => sub {[]};
103              
104             sub _preeval_lua {
105 0     0     my ($self, $cb) = @_;
106              
107 0           $self->_unsent_lua([]);
108              
109 0 0         if ($self->lua_dir) {
110 0           my @lua = sort glob catfile $self->lua_dir, '*.lua';
111 0           $self->_unsent_lua(\@lua);
112             }
113              
114 0           $self->_preeval_unsent_lua($cb);
115 0           return;
116             }
117              
118             sub _preeval_unsent_lua {
119 0     0     my ($self, $cb) = @_;
120              
121 0 0         unless (@{ $self->_unsent_lua }) {
  0            
122 0           $self->_invalid_schema($cb);
123 0           return;
124             }
125              
126 0           my $lua = shift @{ $self->_unsent_lua };
  0            
127              
128 0           $self->_log(debug => 'Eval "%s" after connection', $lua);
129              
130 0 0         if (open my $fh, '<:raw', $lua) {
131 0           local $/;
132 0           my $body = <$fh>;
133             $self->_reconnector->ll->send_request(eval_lua => undef, $body, sub {
134 0     0     my ($state, $message, $sync) = @_;
135 0 0         unless ($state eq 'OK') {
136 0           $self->_set_last_error([ $state, $message ]);
137 0           $self->_set_state('pause');
138 0           $cb->($state => $message);
139 0           return;
140             }
141              
142             $self->_reconnector->ll->wait_response($sync, sub {
143 0           my ($state, $message, $resp) = @_;
144 0 0         unless ($state eq 'OK') {
145 0           $self->_set_last_error([ $state, $message ]);
146 0           $self->_set_state('pause');
147 0           $cb->($state => $message);
148 0           return;
149             }
150 0 0         unless ($resp->{CODE} == 0) {
151             $cb->(ER_INIT_LUA =>
152             sprintf "lua (%s) error: %s",
153 0   0       $lua, $resp->{ERROR} // 'Unknown error'
154             );
155 0           return;
156             }
157 0           $self->_preeval_unsent_lua($cb);
158 0           });
159 0           });
160              
161             } else {
162 0           $self->_set_last_error(ER_OPEN_FILE => "$lua: $!");
163 0           $self->_set_state('pause');
164 0           $cb->(@{ $self->last_error });
  0            
165 0           return;
166             }
167             }
168              
169              
170             has _sch => is => 'rw', isa => 'HashRef';
171             has _spaces => is => 'rw', isa => 'ArrayRef', default => sub {[]};
172             has _indexes => is => 'rw', isa => 'ArrayRef', default => sub {[]};
173              
174             has _wait_ready => is => 'rw', isa => 'ArrayRef', default => sub { [] };
175              
176             sub _invalid_schema {
177 0     0     my ($self, $cb) = @_;
178              
179 0           goto $self->state;
180              
181 0           init:
182             pause:
183 0           confess "Internal error: _invalid_schema in state " . $self->state;
184              
185 0           schema:
186             connecting:
187             ready:
188 0           $self->_set_state('schema');
  0            
189             $self->_reconnector->ll->send_request(select =>
190             undef, SPACE_space, 0, [], undef, undef, 'ALL', sub {
191 0     0     my ($state, $message, $sync) = @_;
192 0           $self->_log(debug => 'Loading spaces');
193 0 0         unless ($state eq 'OK') {
194 0           $self->_set_last_error([ $state, $message ]);
195 0           $self->_set_state('pause');
196 0           $cb->($state => $message);
197 0           return;
198             }
199              
200             $self->_reconnector->ll->wait_response($sync, sub {
201 0           my ($state, $message, $resp) = @_;
202 0 0         unless ($state eq 'OK') {
203 0           $self->_set_last_error([ $state, $message ]);
204 0           $self->_set_state('pause');
205 0           $cb->($state => $message);
206 0           return;
207             }
208              
209              
210             # have no permissions
211 0 0         if ($resp->{CODE} == ER_TNT_PERMISSIONS) {
    0          
212 0           $self->_spaces([]);
213             } elsif ($resp->{CODE}) {
214             $self->_set_last_error([ ER_REQUEST =>
215 0           'Can not load tarantool schema', $resp->{CODE} ]);
216 0           $self->_set_state('pause');
217 0           $cb->(@{ $self->last_error });
  0            
218 0           return;
219             } else {
220 0           $self->_spaces($resp->{DATA});
221             }
222              
223 0           $self->_log(debug => 'Loading indexes');
224             $self->_reconnector->ll->send_request(select =>
225             $resp->{SCHEMA_ID}, SPACE_index, 0, [], undef, undef, 'ALL', sub {
226              
227 0           my ($state, $message, $sync) = @_;
228 0 0         unless ($state eq 'OK') {
229 0           $self->_set_last_error([ $state, $message ]);
230 0           $self->_set_state('pause');
231 0           $cb->($state => $message);
232 0           return;
233             }
234              
235              
236             $self->_reconnector->ll->wait_response($sync, sub {
237 0           my ($state, $message, $resp) = @_;
238 0 0         unless ($state eq 'OK') {
239 0           $self->_set_last_error([ $state, $message ]);
240 0           $self->_set_state('pause');
241 0           $cb->($state => $message);
242 0           return;
243             }
244              
245 0 0         if ($resp->{CODE} == ER_TNT_PERMISSIONS) {
    0          
    0          
246 0           $self->_indexes([]);
247             } elsif ($resp->{CODE} == ER_TNT_SCHEMA) {
248             # collision again!
249 0           $self->_invalid_schema($cb);
250 0           return;
251              
252             } elsif ($resp->{CODE}) {
253             $self->_set_last_error([ ER_REQUEST =>
254 0           'Can not load tarantool schema', $resp->{CODE} ]);
255 0           $self->_set_state('pause');
256 0           $cb->(@{ $self->last_error });
  0            
257 0           return;
258              
259             } else {
260 0           $self->_indexes($resp->{DATA});
261             }
262              
263 0           $self->_set_schema($resp->{SCHEMA_ID});
264 0           $self->_set_state('ready');
265              
266 0           $cb->('OK', 'Connected, schema loaded');
267 0           $self->request;
268 0           });
269 0           });
270 0           });
271 0           });
272             }
273              
274             sub _set_schema {
275 0     0     my ($self, $schema_id) = @_;
276              
277 0           my %sch;
278              
279 0           for (@{ $self->_spaces }) {
  0            
280 0           my $space = $sch{ $_->[0] } = $sch{ $_->[2] } = {
281             id => $_->[0],
282             name => $_->[2],
283             engine => $_->[3],
284             flags => $_->[5],
285             fields => $_->[6],
286             indexes => { }
287             };
288              
289 0           for (@{ $self->_indexes }) {
  0            
290 0 0         next unless $_->[0] == $space->{id};
291              
292             $space->{indexes}{ $_->[2] } =
293             $space->{indexes}{ $_->[1] } = {
294             id => $_->[1],
295             name => $_->[2],
296             type => $_->[3],
297             flags => $_->[4],
298             fields => [
299             map {
300             'HASH' eq ref $_ ?
301             { type => $_->{type}, no => $_->{field} }
302 0 0         : { type => $_->[1], no => $_->[0] }
303             }
304            
305 0           @{ $_->[5] }
  0            
306             ]
307             }
308             }
309             }
310              
311 0           $self->_set_last_schema($schema_id);
312 0           $self->_sch(\%sch);
313 0           $self->_indexes([]);
314 0           $self->_spaces([]);
315             }
316              
317             sub _tuples {
318 0     0     my ($self, $resp, $space, $cb) = @_;
319              
320              
321 0 0         unless (defined $space) {
322 0   0       $cb->(OK => 'Response received', $resp->{DATA} // []);
323 0           return;
324             }
325              
326 0 0         unless (exists $self->_sch->{ $space }) {
327 0   0       $cb->(OK => "Space $space not exists in schema", $resp->{DATA} // []);
328 0           return;
329             }
330              
331 0   0       my $res = $resp->{DATA} // [];
332 0           $space = $self->_sch->{ $space };
333              
334 0 0         if ($self->hashify_tuples) {
335 0           for my $tuple (@$res) {
336 0 0         next unless 'ARRAY' eq ref $tuple;
337 0           my %t;
338              
339 0           for (0 .. $#{ $space->{fields} }) {
  0            
340 0   0       my $fname = $space->{fields}[$_]{name} // sprintf "field:%02X", $_;
341 0           $t{$fname} = $tuple->[$_];
342             }
343              
344 0 0         if (@{ $space->{fields} } < @$tuple) {
  0            
345 0           $t{tail} = [ splice @$tuple, scalar @{ $space->{fields} } ];
  0            
346             } else {
347 0           $t{tail} = [];
348             }
349 0           $tuple = \%t;
350             }
351             }
352              
353 0           $cb->(OK => 'Response received', $res);
354             }
355              
356             sub restart {
357 0     0 0   my ($self, $cbc) = @_;
358              
359 0   0 0     $cbc ||= sub { };
360              
361 0           $self->_log(info => 'Starting connection to %s:%s (driver: %s)',
362             $self->host, $self->port, $self->driver);
363              
364 0           goto $self->state;
365              
366 0           init:
367             connecting:
368             schema:
369             pause:
370             ready:
371 0           $self->_set_state('connecting');
  0            
  0            
  0            
372             $self->_reconnector->ll->connect(sub {
373 0     0     my ($state, $message) = @_;
374 0 0         unless ($state eq 'OK') {
375 0           $self->_set_last_error([ $state, $message ]);
376 0           $self->_set_state('pause');
377 0           $cbc->(@{ $self->last_error });
  0            
378 0           return;
379             }
380              
381             $self->_reconnector->ll->handshake(sub {
382 0           my ($state, $message) = @_;
383 0 0         unless ($state eq 'OK') {
384 0           $self->_set_last_error([ $state, $message ]);
385 0           $self->_set_state('pause');
386 0           $cbc->(@{ $self->last_error });
  0            
387 0           return;
388             }
389              
390 0 0 0       unless ($self->user and $self->password) {
391 0           return $self->_preeval_lua($cbc);
392             }
393              
394             $self->_reconnector->ll->send_request(auth => undef, sub {
395 0           my ($state, $message, $sync) = @_;
396 0 0         unless ($state eq 'OK') {
397 0           $self->_set_last_error([ $state, $message ]);
398 0           $self->_set_state('pause');
399 0           $cbc->(@{ $self->last_error });
  0            
400 0           return;
401             }
402              
403             $self->_reconnector->ll->wait_response($sync, sub {
404 0           my ($state, $message, $resp) = @_;
405 0 0         unless ($state eq 'OK') {
406 0           $self->_set_last_error([ $state, $message ]);
407 0           $self->_set_state('pause');
408 0           $cbc->(@{ $self->last_error });
  0            
409 0           return;
410             }
411              
412 0 0         unless ($resp->{CODE} == 0) {
413 0           $self->_log(warning => 'Can not auth: Wrong login or password');
414             $self->_set_last_error([ ER_BROKEN_PASSWORD =>
415 0   0       $resp->{ERROR} // 'Wrong password']
416             );
417 0           $self->_set_state('pause');
418 0           $cbc->(@{ $self->last_error });
  0            
419 0           return;
420             }
421 0           $self->_preeval_lua($cbc);
422 0           });
423 0           });
424 0           });
425 0           });
426             }
427              
428             sub request {
429 0     0 0   my $self = shift;
430              
431 0 0         if (@_) {
432              
433 0 0         unless ('CODE' eq ref $_[-1]) {
434 0           croak 'usage: $connector->request(..., $CALLBACK)';
435             }
436             state $check = {
437             get => sub {
438 0 0   0     croak 'usage: $connector->get(space, index, key)'
439             unless @_ == 5;
440             },
441             select => sub {
442 0 0 0 0     croak 'usage: $connector->select(space, index, key[, limit, offset, iterator])'
443             unless @_ >= 5 and @_ <= 8;
444             },
445       0     update => sub { },
446             insert => sub {
447 0 0 0 0     croak 'usage: $connector->insert(space, tuple)'
448             unless @_ == 4 and 'ARRAY' eq ref $_[2];
449             },
450             replace => sub {
451 0 0 0 0     croak 'usage: $connector->replace(space, tuple)'
452             unless @_ == 4 and 'ARRAY' eq ref $_[2];
453             },
454             delete => sub {
455 0 0   0     croak 'usage: $connector->delete(space, key)'
456             unless @_ == 4;
457             },
458             call_lua => sub {
459 0 0   0     croak 'usage: $connector->call_lua(name[, args])'
460             unless @_ >= 3;
461             },
462             eval_lua => sub {
463 0 0   0     croak 'usage: $connector->eval_lua(code[, args])'
464             unless @_ >= 3;
465             },
466       0     ping => sub {
467             },
468             auth => sub {
469 0 0 0 0     croak 'usage: $connector->auth([user, password])'
470             unless @_ == 3 or @_ == 5;
471             },
472 0           };
473              
474 0 0 0       unless (exists $check->{ $_[0] // 'undef' }) {
475 0   0       croak 'unknown request method: ' . $_[0] // 'undef';
476             }
477              
478 0           $check->{ $_[0] }(@_);
479              
480 0           push @{ $self->_wait_ready } => \@_;
  0            
481             }
482              
483             restart:
484 0           goto $self->state;
485              
486              
487 0           init:
488             $self->_log(info => 'Autoconnect before first request');
489              
490             reinit:
491             $self->restart(sub {
492 0 0   0     return if $self->state eq 'ready';
493 0 0         unless (defined $self->reconnect_interval) {
494 0           my $list = $self->_wait_ready;
495 0           $self->_wait_ready([]);
496 0           for (@$list) {
497 0           $_->[-1](@{ $self->last_error });
  0            
498             }
499 0           return;
500             }
501 0           });
502              
503 0 0         if ($self->driver eq 'sync') {
504 0 0         goto ready if $self->state eq 'ready';
505 0           goto sync_redo_check;
506             }
507              
508 0           return;
509            
510 0 0         sync_redo_check:
511             goto no_reconnect_errors unless defined $self->reconnect_interval;
512 0           Time::HiRes::sleep($self->reconnect_interval);
513 0           goto reinit;
514              
515 0           schema:
516             connecting:
517 0           return;
518              
519             pause:
520 0 0         if ($self->driver eq 'sync') {
521 0 0         goto no_reconnect_errors unless defined $self->reconnect_interval;
522 0           my $pause = $self->reconnect_interval -
523             ($self->_now - $self->_state_changed);
524 0 0         Time::HiRes::sleep($pause) if $pause > 0;
525 0           goto reinit;
526             }
527 0 0         goto no_reconnect_errors unless defined $self->reconnect_interval;
528 0           return;
529             no_reconnect_errors: {
530 0           my $list = $self->_wait_ready;
  0            
531 0           $self->_wait_ready([]);
532 0           for (@$list) {
533 0           $_->[-1](@{ $self->last_error });
  0            
534             }
535             return
536 0           }
537              
538             ready:
539 0           while (my $request = shift @{ $self->_wait_ready }) {
  0            
540              
541 0 0         unless ($self->state eq 'ready') {
542 0           unshift @{ $self->_wait_ready } => $request;
  0            
543 0           goto restart;
544             };
545              
546 0           my @args = @$request;
547 0           my $name = shift @args;
548 0           my $cb = pop @args;
549              
550 0           my ($space, $index);
551 0           state $space_pos = {
552             select => 'index',
553             update => 'normal',
554             insert => 'normal',
555             replace => 'normal',
556             delete => 'normal',
557             call_lua => 'mayberef',
558             eval_lua => 'mayberef',
559             ping => 'none',
560             auth => 'none',
561             };
562              
563 0 0         croak "unknown method $name" unless exists $space_pos->{$name};
564              
565 0           goto $space_pos->{$name};
566              
567 0           index:
568             $space = $args[0];
569 0 0         unless (exists $self->_sch->{ $space }) {
570 0           $self->_set_last_error([ER_NOSPACE => "Space $space not found"]);
571 0           $cb->(@{ $self->last_error });
  0            
572 0           next;
573             }
574 0           $args[0] = $self->_sch->{ $space }{id};
575              
576 0           $index = $args[1];
577 0 0         unless (exists $self->_sch->{ $space }{indexes}{ $index }) {
578 0           $self->_set_last_error(
579             [ER_NOINDEX => "Index space[$space].$index not found"]
580             );
581 0           $cb->(@{ $self->last_error });
  0            
582 0           next;
583             }
584              
585 0           $index = $args[1] = $self->_sch->{$space}{indexes}{ $index }{id};
586 0           goto do_request;
587              
588 0           normal:
589             $space = $args[0];
590 0 0         unless (exists $self->_sch->{ $space }) {
591 0           $self->_set_last_error([ER_NOSPACE => "Space $space not found"]);
592 0           $cb->(@{ $self->last_error });
  0            
593 0           next;
594             }
595 0           $space = $args[0] = $self->_sch->{ $space }{id};
596 0           goto do_request;
597              
598             mayberef:
599 0 0         if ('ARRAY' eq ref $args[0]) {
600 0           ($args[0], $space) = @{ $args[0] };
  0            
601             }
602 0 0         goto do_request unless defined $space;
603 0 0         unless (exists $self->_sch->{ $space }) {
604 0           $self->_set_last_error([ER_NOSPACE => "Space $space not found"]);
605 0           $cb->(@{ $self->last_error });
  0            
606 0           next;
607             }
608 0           $space = $self->_sch->{ $space }{id};
609              
610              
611             none:
612              
613             do_request:
614              
615             $self->_reconnector->ll->send_request($name, $self->last_schema,
616             @args, sub {
617 0     0     my ($state, $message, $sync) = @_;
618 0 0         unless ($state eq 'OK') {
619 0           $self->_set_last_error([ $state => $message ]);
620 0           $self->_set_state('pause');
621 0           $cb->(@{ $self->last_error });
  0            
622 0           return;
623             }
624              
625             $self->_reconnector->ll->wait_response($sync, sub {
626 0           my ($state, $message, $resp) = @_;
627 0 0         unless ($state eq 'OK') {
628 0           $self->_set_last_error([ $state => $message ]);
629 0           $self->_set_state('pause');
630 0           $cb->(@{ $self->last_error });
  0            
631 0           return;
632             }
633              
634             # schema collision
635 0 0         if ($resp->{CODE} == ER_TNT_SCHEMA) {
636 0           $self->_log(warning => 'Detected schema collision');
637 0           $self->_log(info => 'Defer request "%s" until schema loaded', $name);
638 0           unshift @{ $self->_wait_ready } => $request;
  0            
639 0 0         $self->_invalid_schema(sub {}) if $self->state eq 'ready';
640 0           return;
641             }
642              
643 0 0         unless ($resp->{CODE} == 0) {
644             $self->_set_last_error(
645 0           [ ER_REQUEST => $resp->{ERROR}, $resp->{CODE} ]
646             );
647 0           $cb->(@{ $self->last_error });
  0            
648 0           return;
649             }
650              
651 0 0         if ($resp->{SCHEMA_ID} != $self->last_schema) {
652 0           $self->_log(info => 'request was changed schema id');
653             }
654 0           $self->_set_last_error(undef);
655 0           $self->_tuples($resp, $space, $cb);
656 0           });
657 0           });
  0            
658             }
659 0           return;
660              
661             }
662              
663             sub _now {
664 0     0     my ($self) = @_;
665 0 0         return Time::HiRes::time() if $self->driver eq 'sync';
666 0           return AnyEvent->now();
667             }
668              
669             sub BUILD {
670 0     0 1   my ($self) = @_;
671 0           goto $self->driver;
672              
673 0           sync:
674             require Time::HiRes;
675 0           return;
676 0           async:
677             require AnyEvent;
678 0           return;
679             }
680              
681             __PACKAGE__->meta->make_immutable;
682