File Coverage

blib/lib/Cassandra/Client/Connection.pm
Criterion Covered Total %
statement 58 608 9.5
branch 0 252 0.0
condition 0 60 0.0
subroutine 20 81 24.6
pod 0 30 0.0
total 78 1031 7.5


line stmt bran cond sub pod time code
1             package Cassandra::Client::Connection;
2             our $AUTHORITY = 'cpan:TVDW';
3             $Cassandra::Client::Connection::VERSION = '0.21';
4 13     13   269 use 5.010;
  13         89  
5 13     13   131 use strict;
  13         28  
  13         349  
6 13     13   77 use warnings;
  13         23  
  13         721  
7 13     13   82 use vars qw/$BUFFER/;
  13         22  
  13         801  
8              
9 13     13   65 use Ref::Util qw/is_blessed_ref is_plain_arrayref/;
  13         23  
  13         936  
10 13     13   7303 use IO::Socket::INET;
  13         389789  
  13         94  
11 13     13   15592 use IO::Socket::INET6;
  13         88553  
  13         165  
12 13     13   12802 use Errno qw/EAGAIN/;
  13         32  
  13         2085  
13 13     13   91 use Socket qw/SOL_SOCKET IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY/;
  13         27  
  13         1007  
14 13     13   77 use Scalar::Util qw/weaken/;
  13         40  
  13         723  
15 13     13   9691 use Net::SSLeay qw/ERROR_WANT_READ ERROR_WANT_WRITE ERROR_NONE/;
  13         170094  
  13         12203  
16              
17 13     13   8560 use Cassandra::Client::Util;
  13         51  
  13         1350  
18 13         6870 use Cassandra::Client::Protocol qw/
19             :constants
20             %consistency_lookup
21             %batch_type_lookup
22             pack_bytes
23             pack_longstring
24             pack_queryparameters
25             pack_shortbytes
26             pack_stringmap
27             pack_stringlist
28             unpack_bytes
29             unpack_errordata
30             unpack_inet
31             unpack_int
32             unpack_metadata
33             unpack_shortbytes
34             unpack_string
35             unpack_stringlist
36             unpack_stringmultimap
37 13     13   10548 /;
  13         57  
38 13     13   146 use Cassandra::Client::Error::Base;
  13         28  
  13         312  
39 13     13   7734 use Cassandra::Client::ResultSet;
  13         45  
  13         527  
40 13     13   6661 use Cassandra::Client::TLSHandling;
  13         55  
  13         2004  
41              
42 13     13   94 use constant STREAM_ID_LIMIT => 32768;
  13         25  
  13         114131  
43              
44             # Populated at BEGIN{} time
45             my @compression_preference;
46             my %available_compression;
47              
48             sub new {
49 0     0 0   my ($class, %args)= @_;
50              
51             my $self= bless {
52             client => $args{client},
53             async_io => $args{async_io},
54             pool_id => undef,
55              
56             options => $args{options},
57             request_timeout => $args{options}{request_timeout},
58             host => $args{host},
59             metadata => $args{metadata},
60             prepare_cache => $args{metadata}->prepare_cache,
61             last_stream_id => 0,
62             pending_streams => {},
63             in_prepare => {},
64              
65             decompress_func => undef,
66             compress_func => undef,
67             connected => 0,
68             connecting => undef,
69             socket => undef,
70             fileno => undef,
71             pending_write => undef,
72             shutdown => 0,
73             read_buffer => \(my $empty= ''),
74             bytes_sent => 0,
75             bytes_read => 0,
76              
77             tls => undef,
78             tls_want_write => undef,
79              
80             healthcheck => undef,
81             protocol_version => $args{options}{protocol_version},
82 0           }, $class;
83 0           weaken($self->{async_io});
84 0           weaken($self->{client});
85 0           return $self;
86             }
87              
88             sub get_local_status {
89 0     0 0   my ($self, $callback)= @_;
90              
91             series([
92             sub {
93 0     0     my ($next)= @_;
94 0           $self->execute_prepared($next, \"select key, data_center, host_id, broadcast_address, rack, release_version, tokens, schema_version from system.local");
95             },
96             sub {
97 0     0     my ($next, $result)= @_;
98              
99 0           my %local_status= map { $_->[3] => {
100             peer => $_->[3],
101             data_center => $_->[1],
102             host_id => $_->[2],
103             preferred_ip => $_->[3],
104             rack => $_->[4],
105             release_version => $_->[5],
106             tokens => $_->[6],
107             schema_version => $_->[7],
108 0           } } @{$result->rows};
  0            
109              
110 0           $next->(undef, \%local_status);
111             },
112 0           ], $callback);
113              
114 0           return;
115             }
116              
117             sub get_peers_status {
118 0     0 0   my ($self, $callback)= @_;
119              
120             series([
121             sub {
122 0     0     my ($next)= @_;
123 0           $self->execute_prepared($next, \"select peer, data_center, host_id, preferred_ip, rack, release_version, tokens, schema_version from system.peers");
124             },
125             sub {
126 0     0     my ($next, $result)= @_;
127              
128 0           my %network_status= map { $_->[0] => {
129             peer => $_->[0],
130             data_center => $_->[1],
131             host_id => $_->[2],
132             preferred_ip => $_->[3],
133             rack => $_->[4],
134             release_version => $_->[5],
135             tokens => $_->[6],
136             schema_version => $_->[7],
137 0           } } @{$result->rows};
  0            
138              
139 0           $next->(undef, \%network_status);
140             },
141 0           ], $callback);
142              
143 0           return;
144             }
145              
146             sub get_network_status {
147 0     0 0   my ($self, $callback)= @_;
148              
149             parallel([
150             sub {
151 0     0     my ($next)= @_;
152 0           $self->get_peers_status($next);
153             },
154             sub {
155 0     0     my ($next)= @_;
156 0           $self->get_local_status($next);
157             },
158             ], sub {
159 0     0     my ($error, $peers, $local)= @_;
160 0 0         if ($error) { return $callback->($error); }
  0            
161 0           return $callback->(undef, { %$peers, %$local });
162 0           });
163             }
164              
165             sub register_events {
166 0     0 0   my ($self, $callback)= @_;
167              
168 0           $self->request($callback, OPCODE_REGISTER, pack_stringlist([
169             'TOPOLOGY_CHANGE',
170             'STATUS_CHANGE',
171             ]));
172              
173 0           return;
174             }
175              
176              
177             ###### QUERY CODE
178             sub execute_prepared {
179 0     0 0   my ($self, $callback, $queryref, $parameters, $attr, $exec_info)= @_;
180              
181             # Note: parameters is retained until the query is complete. It must not be changed; clone if needed.
182             # Same for attr. Note that external callers automatically have their arguments cloned.
183              
184 0 0         my $prepared= $self->{prepare_cache}{$$queryref} or do {
185 0           return $self->prepare_and_try_execute_again($callback, $queryref, $parameters, $attr, $exec_info);
186             };
187              
188 0           my $want_result_metadata= !$prepared->{decoder};
189 0           my $row;
190 0 0         if ($parameters) {
191             eval {
192 0           $row= $prepared->{encoder}->encode($parameters);
193 0           1;
194 0 0         } or do {
195 0   0       my $error= $@ || "??";
196 0           return $callback->("Failed to encode row to native protocol: $error");
197             };
198             }
199              
200 0   0       my $consistency= $consistency_lookup{$attr->{consistency} || 'one'};
201 0 0         if (!defined $consistency) {
202 0           return $callback->("Invalid consistency level specified: $attr->{consistency}");
203             }
204              
205 0   0       my $page_size= (0+($attr->{page_size} || $self->{options}{max_page_size} || 0)) || undef;
206 0   0       my $paging_state= $attr->{page} || undef;
207 0           my $execute_body= pack_shortbytes($prepared->{id}).pack_queryparameters($consistency, !$want_result_metadata, $page_size, $paging_state, undef, $row);
208              
209             my $on_completion= sub {
210             # my ($body)= $_[2]; (not copying, because performance. assuming ownership)
211 0     0     my ($err, $code)= @_;
212              
213 0 0         if ($err) {
214 0 0 0       if (is_blessed_ref($err) && $err->code == 0x2500) {
215 0           return $self->prepare_and_try_execute_again($callback, $queryref, $parameters, $attr, $exec_info);
216             }
217 0           return $callback->($err);
218             }
219              
220 0 0         if ($code != OPCODE_RESULT) {
221             # This shouldn't ever happen...
222 0           return $callback->(Cassandra::Client::Error::Base->new(
223             message => "Expected a RESULT frame but got something else; considering the query failed",
224             request_error => 1,
225             ));
226             }
227              
228 0           $self->decode_result($callback, $prepared, $_[2]);
229 0           };
230              
231 0 0         return $callback->($attr->{_synthetic_error}) if ($attr->{_synthetic_error});
232              
233 0           $self->request($on_completion, OPCODE_EXECUTE, $execute_body);
234              
235 0           return;
236             }
237              
238             sub prepare_and_try_execute_again {
239 0     0 0   my ($self, $callback, $queryref, $parameters, $attr, $exec_info)= @_;
240              
241 0 0         if ($exec_info->{_prepared_and_tried_again}++) {
242 0           return $callback->("Query failed because it seems to be missing from the server's prepared statement cache");
243             }
244              
245             series([
246             sub {
247 0     0     my ($next)= @_;
248 0           $self->prepare($next, $$queryref);
249             },
250             ], sub {
251 0 0   0     return $callback->($_[0]) if $_[0];
252              
253 0 0         unless ($self->{prepare_cache}{$$queryref}) {
254             # We're recursing, so let's make sure we avoid the infinite loop
255 0           return $callback->("Internal error: expected query to be prepared but it was not");
256             }
257              
258 0           return $self->execute_prepared($callback, $queryref, $parameters, $attr, $exec_info);
259 0           });
260 0           return;
261             }
262              
263             sub execute_batch {
264 0     0 0   my ($self, $callback, $queries, $attribs, $exec_info)= @_;
265             # Like execute_prepared, assumes ownership of $queries and $attribs
266              
267 0 0         if (!is_plain_arrayref($queries)) {
268 0           return $callback->("execute_batch: queries argument must be an array of arrays");
269             }
270              
271 0           my @prepared;
272 0           for my $query (@$queries) {
273 0 0         if (!is_plain_arrayref($query)) {
274 0           return $callback->("execute_batch: entries in query argument must be arrayrefs");
275             }
276 0 0         if (!$query->[0]) {
277 0           return $callback->("Empty or no query given, cannot execute as part of a batch");
278             }
279 0 0 0       if ($query->[1] && !is_plain_arrayref($query->[1])) {
280 0           return $callback->("Query parameters to batch() must be given as an arrayref");
281             }
282              
283 0 0         if (my $prep= $self->{prepare_cache}{$query->[0]}) {
284 0           push @prepared, [ $prep, $query->[1] ];
285              
286             } else {
287 0           return $self->prepare_and_try_batch_again($callback, $queries, $attribs, $exec_info);
288             }
289             }
290              
291 0           my $batch_type= 0;
292 0 0         if ($attribs->{batch_type}) {
293 0           $batch_type= $batch_type_lookup{$attribs->{batch_type}};
294 0 0         if (!defined $batch_type) {
295 0           return $callback->("Unknown batch_type: <$attribs->{batch_type}>");
296             }
297             }
298              
299 0   0       my $consistency= $consistency_lookup{$attribs->{consistency} || 'one'};
300 0 0         if (!defined $consistency) {
301 0           return $callback->("Invalid consistency level specified: $attribs->{consistency}");
302             }
303              
304 0           my $batch_frame= pack('Cn', $batch_type, (0+@prepared));
305 0           for my $prep (@prepared) {
306 0           $batch_frame .= pack('C', 1).pack_shortbytes($prep->[0]{id}).$prep->[0]{encoder}->encode($prep->[1]);
307             }
308 0           $batch_frame .= pack('nC', $consistency, 0);
309              
310             my $on_completion= sub {
311             # my ($body)= $_[2]; (not copying, because performance. assuming ownership)
312 0     0     my ($err, $code)= @_;
313              
314 0 0         if ($err) {
315 0 0 0       if (is_blessed_ref($err) && $err->code == 0x2500) {
316 0           return $self->prepare_and_try_batch_again($callback, $queries, $attribs, $exec_info);
317             }
318 0           return $callback->($err);
319             }
320              
321 0 0         if ($code != OPCODE_RESULT) {
322             # This shouldn't ever happen...
323 0           return $callback->(Cassandra::Client::Error::Base->new(
324             message => "Expected a RESULT frame but got something else; considering the batch failed",
325             request_error => 1,
326             ));
327             }
328              
329 0           $self->decode_result($callback, undef, $_[2]);
330 0           };
331              
332 0           $self->request($on_completion, OPCODE_BATCH, $batch_frame);
333              
334 0           return;
335             }
336              
337             sub prepare_and_try_batch_again {
338 0     0 0   my ($self, $callback, $queries, $attribs, $exec_info)= @_;
339              
340 0 0         if ($exec_info->{_prepared_and_tried_again}++) {
341 0           return $callback->("Batch failed because one or more queries seem to be missing from the server's prepared statement cache");
342             }
343              
344 0           my %to_be_prepared;
345 0           $to_be_prepared{$_->[0]}= 1 for @$queries;
346              
347             parallel([
348 0           map { my $query= $_; sub {
349 0     0     my ($next)= @_;
350 0           $self->prepare($next, $query);
351 0           } } keys %to_be_prepared
352             ], sub {
353 0 0   0     return $callback->($_[0]) if $_[0];
354              
355 0           return $self->execute_batch($callback, $queries, $attribs, $exec_info);
356 0           });
357 0           return;
358             }
359              
360             sub prepare {
361 0     0 0   my ($self, $callback, $query)= @_;
362              
363 0 0         if (exists $self->{in_prepare}{$query}) {
364 0           push @{$self->{in_prepare}{$query}}, $callback;
  0            
365 0           return;
366             }
367              
368 0           $self->{in_prepare}{$query}= [ $callback ];
369              
370             series([
371             sub {
372 0     0     my ($next)= @_;
373 0           my $req= pack_longstring($query);
374 0           $self->request($next, OPCODE_PREPARE, $req);
375             },
376             sub {
377 0     0     my ($next, $code, $body)= @_;
378 0 0         if ($code != OPCODE_RESULT) {
379 0           return $next->("Got unexpected failure while trying to prepare");
380             }
381              
382 0           my $result_type= unpack_int($body);
383 0 0         if ($result_type != RESULT_PREPARED) {
384 0           return $next->("Unexpected response from server while preparing");
385             }
386              
387 0           my $id= unpack_shortbytes($body);
388              
389 0           my ($encoder, $decoder);
390 0 0         eval {
391 0           ($encoder)= unpack_metadata($self->{protocol_version}, 0, $body);
392 0           1;
393             } or return $next->("Unable to unpack query metadata: $@");
394 0 0         eval {
395 0           ($decoder)= unpack_metadata($self->{protocol_version}, 1, $body);
396 0           1;
397             } or return $next->("Unable to unpack query result metadata: $@");
398              
399 0           $self->{metadata}->add_prepared($query, $id, $decoder, $encoder);
400 0           return $next->();
401             },
402             ], sub {
403 0     0     my $error= shift;
404 0 0         my $in_prepare= delete($self->{in_prepare}{$query}) or die "BUG";
405 0           $_->($error) for @$in_prepare;
406 0           });
407              
408 0           return;
409             }
410              
411             sub decode_result {
412 0     0 0   my ($self, $callback, $prepared)= @_; # $_[3]=$body
413              
414 0           my $result_type= unpack('l>', substr($_[3], 0, 4, ''));
415 0 0         if ($result_type == RESULT_ROWS) { # Rows
    0          
    0          
    0          
416 0           my ($paging_state, $decoder);
417             eval {
418 0           ($decoder, $paging_state)= unpack_metadata($self->{protocol_version}, 1, $_[3]);
419 0           1;
420 0 0         } or do {
421 0           return $callback->("Unable to unpack query metadata: $@");
422             };
423 0   0       $decoder= $prepared->{decoder} || $decoder;
424              
425 0           $callback->(undef,
426             Cassandra::Client::ResultSet->new(
427             \$_[3],
428             $decoder,
429             $paging_state,
430             )
431             );
432              
433             } elsif ($result_type == RESULT_VOID) { # Void
434 0           return $callback->();
435              
436             } elsif ($result_type == RESULT_SET_KEYSPACE) { # Set_keyspace
437 0           my $new_keyspace= unpack_string($_[3]);
438 0           return $callback->();
439              
440             } elsif ($result_type == RESULT_SCHEMA_CHANGE) { # Schema change
441             return $self->wait_for_schema_agreement(sub {
442             # We may be passed an error. Ignore it, our query succeeded
443 0     0     $callback->();
444 0           });
445              
446             } else {
447 0           return $callback->("Query executed successfully but got an unexpected response type");
448             }
449 0           return;
450             }
451              
452             sub wait_for_schema_agreement {
453 0     0 0   my ($self, $callback)= @_;
454              
455 0           my $waited= 0;
456 0           my $wait_delay= 0.5;
457 0           my $max_wait= 20;
458              
459 0           my $done;
460             whilst(
461 0     0     sub { !$done },
462             sub {
463 0     0     my ($whilst_next)= @_;
464              
465             $self->get_network_status(sub {
466 0           my ($error, $network_status)= @_;
467 0 0         return $whilst_next->($error) if $error;
468              
469 0           my %versions;
470 0           $versions{$_->{schema_version}}= 1 for values %$network_status;
471 0 0         if (keys %versions > 1) {
472 0 0         if ($waited >= $max_wait) {
473 0           return $whilst_next->("wait_for_schema_agreement timed out after $waited seconds");
474             }
475              
476 0           $waited += $wait_delay;
477 0           return $self->{async_io}->timer($whilst_next, $wait_delay);
478             } else {
479 0           $done= 1;
480 0           return $whilst_next->();
481             }
482 0           });
483             },
484 0           $callback,
485             );
486              
487 0           return;
488             }
489              
490              
491              
492             ###### PROTOCOL CODE
493             sub handshake {
494 0     0 0   my ($self, $callback)= @_;
495             series([
496             sub { # Send the OPCODE_OPTIONS
497 0     0     my ($next)= @_;
498 0           $self->request($next, OPCODE_OPTIONS, '');
499             },
500             sub { # The server hopefully just told us what it supports, let's respond with a STARTUP message
501 0     0     my ($next, $response_code, $body)= @_;
502 0 0         if ($response_code != OPCODE_SUPPORTED) {
503 0           return $next->("Server returned an unexpected handshake");
504             }
505              
506 0           my $map= unpack_stringmultimap($body);
507              
508 0 0 0       unless ($map->{CQL_VERSION} && $map->{COMPRESSION}) {
509 0           return $next->("Server did not return compression and cql version information");
510             }
511              
512 0           my $selected_cql_version= $self->{options}{cql_version};
513 0 0         if (!$selected_cql_version) {
514 0           ($selected_cql_version)= reverse sort @{$map->{CQL_VERSION}};
  0            
515             }
516              
517 0           my %ss_compression= map { $_, 1 } @{$map->{COMPRESSION}};
  0            
  0            
518 0           my $selected_compression= $self->{options}{compression};
519 0 0         if (!$selected_compression) {
520 0           for (@compression_preference) {
521 0 0 0       if ($ss_compression{$_} && $available_compression{$_}) {
522 0           $selected_compression= $_;
523 0           last;
524             }
525             }
526             }
527 0 0 0       $selected_compression= undef if $selected_compression && $selected_compression eq 'none';
528              
529 0 0         if ($selected_compression) {
530 0 0         if (!$ss_compression{$selected_compression}) {
531 0           return $next->("Server did not support requested compression method <$selected_compression>");
532             }
533 0 0         if (!$available_compression{$selected_compression}) {
534 0           return $next->("Requested compression method <$selected_compression> is supported by the server but not by us");
535             }
536             }
537              
538 0 0         my $request_body= pack_stringmap({
539             CQL_VERSION => $selected_cql_version,
540             ($selected_compression ? (COMPRESSION => $selected_compression) : ()),
541             });
542              
543 0           $self->request($next, OPCODE_STARTUP, $request_body);
544              
545             # This needs to happen after we send the STARTUP message
546 0           $self->setup_compression($selected_compression);
547             },
548             sub { # By now we should know whether we need to authenticate
549 0     0     my ($next, $response_code, $body)= @_;
550 0 0         if ($response_code == OPCODE_READY) {
551 0           return $next->(undef, $body); # Pass it along
552             }
553              
554 0 0         if ($response_code == OPCODE_AUTHENTICATE) {
555 0           return $self->authenticate($next, $body);
556             }
557              
558 0           return $next->("Unexpected response from the server");
559             },
560             sub {
561 0     0     my ($next)= @_;
562 0 0         if ($self->{options}{keyspace}) {
563 0           return $self->execute_prepared($next, \('use "'.$self->{options}{keyspace}.'"'));
564             }
565 0           return $next->();
566             },
567             sub {
568 0     0     my ($next)= @_;
569 0 0         if (!$self->{ipaddress}) {
570 0           return $self->get_local_status($next);
571             }
572 0           return $next->();
573             },
574             sub {
575 0     0     my ($next, $status)= @_;
576 0 0         if ($status) {
577 0           my ($local)= values %$status;
578 0           $self->{ipaddress}= $local->{peer};
579 0           $self->{datacenter}= $local->{data_center};
580             }
581 0 0         if (!$self->{ipaddress}) {
582 0           return $next->("Unable to determine node's IP address");
583             }
584 0           return $next->();
585             }
586 0           ], $callback);
587              
588 0           return;
589             }
590              
591             sub authenticate {
592 0     0 0   my ($self, $callback, $initial_challenge)= @_;
593              
594 0           my $authenticator= unpack_string($initial_challenge);
595 0 0         if (!$self->{options}{authentication}) {
596 0           return $callback->("Server expected authentication using <$authenticator> but no credentials were set");
597             }
598              
599 0           my $auth;
600             eval {
601 0           $auth= $self->{options}{authentication}->begin($authenticator);
602 0           1;
603 0 0         } or do {
604 0           my $error= "Failed to initialize authentication mechanism: $@";
605 0           return $callback->($error);
606             };
607              
608 0           my $auth_done;
609 0           my $next_challenge= undef;
610             whilst(
611 0     0     sub { !$auth_done },
612             sub {
613 0     0     my ($whilst_next)= @_;
614              
615             series([
616             sub {
617 0           my $next= shift;
618             eval {
619 0           $auth->evaluate($next, $next_challenge);
620 0           1;
621 0 0         } or do {
622 0           return $next->("Failed to evaluate challenge: $@");
623             };
624             },
625             sub {
626 0           my ($next, $auth_response)= @_;
627 0           $self->request($next, OPCODE_AUTH_RESPONSE, pack_bytes($auth_response));
628             },
629             sub {
630 0           my ($next, $opcode, $body)= @_;
631 0 0         if ($opcode == OPCODE_AUTH_CHALLENGE) {
632 0           $next_challenge= unpack_bytes($body);
633 0           return $next->();
634             }
635 0 0         if ($opcode == OPCODE_AUTH_SUCCESS) {
636 0           $auth_done= 1;
637             eval {
638 0           $auth->success(unpack_bytes($body));
639 0           1;
640 0 0         } or do {
641 0           return $next->("Failed while finishing authentication: $@");
642             };
643 0           return $next->();
644             }
645 0           return $next->("Received unexpected opcode $opcode during authentication");
646             },
647 0           ], $whilst_next);
648             },
649 0           $callback,
650             );
651              
652 0           return;
653             }
654              
655             sub handle_event {
656 0     0 0   my ($self, $eventdata)= @_;
657 0           my $type= unpack_string($eventdata);
658 0 0         if ($type eq 'TOPOLOGY_CHANGE') {
    0          
659 0           my ($change, $ipaddress)= (unpack_string($eventdata), unpack_inet($eventdata));
660 0           $self->{client}->_handle_topology_change($change, $ipaddress);
661              
662             } elsif ($type eq 'STATUS_CHANGE') {
663 0           my ($change, $ipaddress)= (unpack_string($eventdata), unpack_inet($eventdata));
664 0           $self->{client}->_handle_status_change($change, $ipaddress);
665              
666             } else {
667 0           warn 'Received unknown event type: '.$type;
668             }
669             }
670              
671             sub get_pool_id {
672             $_[0]{pool_id}
673 0     0 0   }
674              
675             sub set_pool_id {
676 0     0 0   $_[0]{pool_id}= $_[1];
677             }
678              
679             sub ip_address {
680             $_[0]{ipaddress}
681 0     0 0   }
682              
683              
684              
685             ####### IO LOGIC
686             sub connect {
687 0     0 0   my ($self, $callback)= @_;
688 0 0         return $callback->() if $self->{connected};
689              
690 0 0         if ($self->{connecting}++) {
691 0           warn "BUG: Calling connect twice?";
692 0           return $callback->("Internal bug: called connect twice.");
693 0           return;
694             }
695              
696 0 0         if ($self->{options}{tls}) {
697             eval {
698 0           $self->{tls}= $self->{client}{tls}->new_conn;
699 0           1;
700 0 0         } or do {
701 0   0       my $error= $@ || "unknown TLS error";
702 0           return $callback->($error);
703             };
704             }
705              
706 0           my $socket; {
707 0           local $@;
  0            
708              
709 0 0         if ($self->{host} =~ /:/) {
710             # IPv6
711             $socket= IO::Socket::INET6->new(
712             PeerAddr => $self->{host},
713             PeerPort => $self->{options}{port},
714 0           Proto => 'tcp',
715             Blocking => 0,
716             );
717             } else {
718             # IPv6
719             $socket= IO::Socket::INET->new(
720             PeerAddr => $self->{host},
721             PeerPort => $self->{options}{port},
722 0           Proto => 'tcp',
723             Blocking => 0,
724             );
725             }
726              
727 0 0         unless ($socket) {
728 0           my $error= "Could not connect: $@";
729 0           return $callback->($error);
730             }
731              
732 0           $socket->setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1);
733 0           $socket->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1);
734             }
735              
736 0           $self->{socket}= $socket;
737 0           $self->{fileno}= $socket->fileno;
738 0           $self->{async_io}->register($self->{fileno}, $self);
739 0           $self->{async_io}->register_read($self->{fileno});
740              
741             # We create a fake buffer, to ensure we wait until we can actually write
742 0           $self->{pending_write}= '';
743 0           $self->{async_io}->register_write($self->{fileno});
744              
745 0 0         if ($self->{options}{tls}) {
746 0           Net::SSLeay::set_fd(${$self->{tls}}, $self->{fileno});
  0            
747 0           Net::SSLeay::set_connect_state(${$self->{tls}});
  0            
748             }
749              
750             $self->handshake(sub {
751 0     0     my $error= shift;
752 0           $self->{connected}= 1;
753 0 0         if ($error) {
754 0           $self->shutdown("Failed to connect: $error");
755             }
756 0           return $callback->($error);
757 0           });
758              
759 0           return;
760             }
761              
762             sub request {
763             # my $body= $_[3] (let's avoid copying that blob). Yes, this code assumes ownership of the body.
764 0     0 0   my ($self, $cb, $opcode)= @_;
765             return $cb->(Cassandra::Client::Error::Base->new(
766             message => "Connection shutting down",
767             request_error => 1,
768 0 0         )) if $self->{shutdown};
769              
770 0           my $pending= $self->{pending_streams};
771              
772 0           my $stream_id= $self->{last_stream_id} + 1;
773 0           my $attempts= 0;
774 0   0       while (exists($pending->{$stream_id}) || $stream_id >= STREAM_ID_LIMIT) {
775 0           $stream_id= (++$stream_id) % STREAM_ID_LIMIT;
776 0 0         return $cb->(Cassandra::Client::Error::Base->new(
777             message => "Cannot find a stream ID to post query with",
778             request_error => 1,
779             )) if ++$attempts >= STREAM_ID_LIMIT;
780             }
781 0           $self->{last_stream_id}= $stream_id;
782 0           $pending->{$stream_id}= [$cb, $self->{async_io}->deadline($self->{fileno}, $stream_id, $self->{request_timeout})];
783              
784             WRITE: {
785 0           my $flags= 0;
  0            
786              
787 0 0 0       if (length($_[3]) > 500 && (my $compress_func= $self->{compress_func})) {
788 0           $flags |= 1;
789 0           $compress_func->($_[3]);
790             }
791              
792 0           my $data= pack('CCsCN/a', $self->{protocol_version}, $flags, $stream_id, $opcode, $_[3]);
793              
794 0 0         if (defined $self->{pending_write}) {
795 0           $self->{pending_write} .= $data;
796 0           last WRITE;
797             }
798              
799 0 0         if ($self->{tls}) {
800 0           my $length= length $data;
801 0           my $rv= Net::SSLeay::write(${$self->{tls}}, $data);
  0            
802 0 0         if ($rv == $length) {
    0          
803 0           $self->{bytes_sent} += $rv;
804             # All good
805             } elsif ($rv > 0) {
806             # Partital write
807 0           substr($data, 0, $rv, '');
808 0           $self->{bytes_sent} += $rv;
809 0           $self->{pending_write}= $data;
810 0           $self->{async_io}->register_write($self->{fileno});
811             } else {
812 0           $rv= Net::SSLeay::get_error(${$self->{tls}}, $rv);
  0            
813 0 0 0       if ($rv == ERROR_WANT_WRITE || $rv == ERROR_WANT_READ || $rv == ERROR_NONE) {
      0        
814             # Ok...
815 0           $self->{pending_write}= $data;
816 0 0         if ($rv == ERROR_WANT_READ) {
817 0           $self->{tls_want_write}= 1;
818             } else {
819 0           $self->{async_io}->register_write($self->{fileno});
820             }
821             } else {
822             # We failed to send the request.
823 0           my $error= Net::SSLeay::ERR_error_string(Net::SSLeay::ERR_get_error());
824              
825             # We never actually sent our request, so take it out again
826 0           my $my_stream= delete $pending->{$stream_id};
827              
828             # Disable our stream's deadline
829 0           ${$my_stream->[1]}= 1;
  0            
830              
831 0           $self->shutdown($error);
832              
833             # Now fail our stream properly, but include the retry notice
834 0           $my_stream->[0]->(Cassandra::Client::Error::Base->new(
835             message => "Disconnected: $error",
836             do_retry => 1,
837             request_error => 1,
838             ));
839             }
840             }
841              
842             } else {
843 0           my $length= length $data;
844 0           my $result= syswrite($self->{socket}, $data, $length);
845 0 0 0       if ($result && $result == $length) {
    0 0        
846 0           $self->{bytes_sent} += $result;
847             # All good
848             } elsif (defined $result || $! == EAGAIN) {
849 0 0         substr($data, 0, $result, '') if $result;
850 0           $self->{bytes_sent} += $result;
851 0           $self->{pending_write}= $data;
852 0           $self->{async_io}->register_write($self->{fileno});
853             } else {
854             # Oh, we failed to send out the request. That's bad. Let's first find out what happened.
855 0           my $error= $!;
856              
857             # We never actually sent our request, so take it out again
858 0           my $my_stream= delete $pending->{$stream_id};
859              
860             # Disable our stream's deadline
861 0           ${$my_stream->[1]}= 1;
  0            
862              
863 0           $self->shutdown($error);
864              
865             # Now fail our stream properly, but include the retry notice
866 0           $my_stream->[0]->(Cassandra::Client::Error::Base->new(
867             message => "Disconnected: $error",
868             do_retry => 1,
869             request_error => 1,
870             ));
871             }
872             }
873             }
874              
875 0           return;
876             }
877              
878             sub can_read {
879 0     0 0   my ($self)= @_;
880 0           my $shutdown_when_done;
881 0           local *BUFFER= $self->{read_buffer};
882 0           my $bufsize= length $BUFFER;
883              
884             READ:
885 0           while (!$self->{shutdown}) {
886 0           my $should_read_more;
887              
888 0 0         if ($self->{tls}) {
889 0           my ($bytes, $rv)= Net::SSLeay::read(${$self->{tls}});
  0            
890 0 0         if (length $bytes) {
891 0           $BUFFER .= $bytes;
892 0           $bufsize += $rv;
893 0           $should_read_more= 1;
894 0           $self->{bytes_read} += $rv;
895             }
896              
897 0 0         if ($rv <= 0) {
898 0           $rv= Net::SSLeay::get_error(${$self->{tls}}, $rv);
  0            
899 0 0         if ($rv == ERROR_WANT_WRITE) {
    0          
    0          
900 0           $self->{async_io}->register_write($self->{fileno});
901             } elsif ($rv == ERROR_WANT_READ) {
902             # Can do! Wait for the next event.
903              
904             # Resume our write if needed.
905 0 0         if (delete $self->{tls_want_write}) {
906             # Try our write again!
907 0           $self->{async_io}->register_write($self->{fileno});
908             }
909             } elsif ($rv == ERROR_NONE) {
910             # Huh?
911             } else {
912 0           my $error= Net::SSLeay::ERR_error_string(Net::SSLeay::ERR_get_error());
913 0           $shutdown_when_done= "TLS error: $error";
914             }
915             }
916              
917             } else {
918 0           my $read_cnt= sysread($self->{socket}, $BUFFER, 16384, $bufsize);
919 0 0         if ($read_cnt) {
    0          
    0          
920 0           $bufsize += $read_cnt;
921 0 0         $should_read_more= 1 if $read_cnt >= 16384;
922 0           $self->{bytes_read} += $read_cnt;
923              
924             } elsif (!defined $read_cnt) {
925 0 0         if ($! != EAGAIN) {
926 0           my $error= "$!";
927 0           $shutdown_when_done= $error;
928             }
929             } elsif ($read_cnt == 0) { # EOF
930 0           $shutdown_when_done= "Disconnected from server";
931             }
932             }
933              
934             READ_NEXT:
935 0           while (1) {
936 0 0         last READ_NEXT if $bufsize < 9;
937 0           my ($version, $flags, $stream_id, $opcode, $bodylen)= unpack('CCsCN', substr($BUFFER, 0, 9));
938 0 0         if ($bufsize < $bodylen+9) {
939 0           last READ_NEXT;
940             }
941              
942 0           substr($BUFFER, 0, 9, '');
943 0           my $body= substr($BUFFER, 0, $bodylen, '');
944 0           $bufsize -= 9 + $bodylen;
945              
946 0 0 0       if (($flags & 1) && $body) {
947             # Decompress if needed
948 0           $self->{decompress_func}->($body);
949             }
950 0 0         if ($flags & 4) {
951             # FIXME: If we reach this (we shouldn't!), we're corrupting the user's data.
952 0           warn 'BUG: unexpectedly received custom QueryHandler payload';
953             }
954 0 0         if ($flags & 8) {
955             # Warnings were sent from the server. Relay them.
956 0           my $warnings= unpack_stringlist($body);
957 0           for my $warning (@$warnings) {
958 0           warn $warning;
959             }
960             }
961              
962 0 0         if ($stream_id != -1) {
963 0           my $stream_cb= delete $self->{pending_streams}{$stream_id};
964 0 0         if (!$stream_cb) {
    0          
965 0           warn 'BUG: received response for unknown stream';
966              
967             } elsif ($opcode == OPCODE_ERROR) {
968 0           my ($cb, $dl)= @$stream_cb;
969 0           $$dl= 1;
970              
971 0           my $error= unpack_errordata($body);
972 0           $cb->($error);
973              
974             } else {
975 0           my ($cb, $dl)= @$stream_cb;
976 0           $$dl= 1;
977 0           $cb->(undef, $opcode, $body);
978             }
979              
980             } else {
981 0           $self->handle_event($body);
982             }
983             }
984              
985 0 0         last READ unless $should_read_more;
986             }
987              
988 0 0         if ($shutdown_when_done) {
989 0           $self->shutdown($shutdown_when_done);
990             }
991              
992 0           return;
993             }
994              
995             sub can_write {
996 0     0 0   my ($self)= @_;
997              
998 0 0         if ($self->{tls}) {
999 0           my $rv= Net::SSLeay::write(${$self->{tls}}, $self->{pending_write});
  0            
1000 0 0         if ($rv > 0) {
1001 0           substr($self->{pending_write}, 0, $rv, '');
1002 0           $self->{bytes_sent} += $rv;
1003 0 0         if (!length $self->{pending_write}) {
1004 0           $self->{async_io}->unregister_write($self->{fileno});
1005 0           delete $self->{pending_write};
1006             }
1007 0           return;
1008              
1009             } else {
1010 0           $rv= Net::SSLeay::get_error(${$self->{tls}}, $rv);
  0            
1011 0 0         if ($rv == ERROR_WANT_WRITE) {
    0          
    0          
1012             # Wait until the next callback.
1013 0           return;
1014             } elsif ($rv == ERROR_WANT_READ) {
1015             # Unschedule ourselves
1016 0           $self->{async_io}->unregister_write($self->{fileno});
1017 0           $self->{tls_want_write}= 1;
1018 0           return;
1019             } elsif ($rv == ERROR_NONE) {
1020             # Huh?
1021 0           return;
1022             } else {
1023 0           my $error= Net::SSLeay::ERR_error_string(Net::SSLeay::ERR_get_error());
1024 0           return $self->shutdown("TLS error: $error");
1025             }
1026             }
1027              
1028             } else {
1029 0           my $result= syswrite($self->{socket}, $self->{pending_write});
1030 0 0         if (!defined($result)) {
1031 0 0         if ($! == EAGAIN) {
1032 0           return; # Huh. Oh well, whatever
1033             }
1034              
1035 0           my $error= "$!";
1036 0           return $self->shutdown($error);
1037             }
1038 0 0         if ($result == 0) { return; } # No idea whether that happens, but guard anyway.
  0            
1039 0           substr($self->{pending_write}, 0, $result, '');
1040 0           $self->{bytes_sent} += $result;
1041              
1042 0 0         if (!length $self->{pending_write}) {
1043 0           $self->{async_io}->unregister_write($self->{fileno});
1044 0           delete $self->{pending_write};
1045             }
1046             }
1047              
1048 0           return;
1049             }
1050              
1051             sub can_timeout {
1052 0     0 0   my ($self, $id)= @_;
1053 0           my $stream= delete $self->{pending_streams}{$id};
1054 0     0     $self->{pending_streams}{$id}= [ sub{}, \(my $zero= 0) ]; # fake it
1055 0           $stream->[0]->(Cassandra::Client::Error::Base->new(
1056             message => "Request timed out",
1057             is_timeout => 1,
1058             request_error => 1,
1059             ));
1060 0           $self->maybe_healthcheck;
1061 0           return;
1062             }
1063              
1064             sub maybe_healthcheck {
1065 0     0 0   my ($self)= @_;
1066 0 0         return if $self->{healthcheck};
1067 0           my $check= $self->{healthcheck}= {};
1068              
1069             # Mark how many bytes we've read right now
1070 0           $check->{bytes_read}= $self->{bytes_read};
1071              
1072             series([
1073             sub {
1074 0     0     my ($next)= @_;
1075 0           $self->request($next, OPCODE_OPTIONS, '');
1076             }
1077             ], sub {
1078 0     0     my ($error)= @_;
1079 0           $self->{healthcheck}= undef;
1080 0 0 0       if ($self->{bytes_read} > $check->{bytes_read}) {
    0 0        
    0          
1081             # Don't care what happened, the connection is still fine
1082             } elsif (!$error) {
1083             # All good
1084             } elsif (is_blessed_ref($error) && $error->is_request_error && $error->is_timeout) {
1085             # Not good.
1086 0           $self->shutdown("health check timed out");
1087             } else {
1088             # No clue. :-)
1089             }
1090 0           });
1091             }
1092              
1093             sub shutdown {
1094 0     0 0   my ($self, $shutdown_reason)= @_;
1095              
1096 0 0         return if $self->{shutdown};
1097 0           $self->{shutdown}= 1;
1098              
1099 0           my $pending= $self->{pending_streams};
1100 0           $self->{pending_streams}= {};
1101              
1102             # Disable our deadlines
1103 0           ${$_->[1]}= 1 for values %$pending;
  0            
1104              
1105 0           $self->{async_io}->unregister_read($self->{fileno});
1106 0 0         if (defined(delete $self->{pending_write})) {
1107 0           $self->{async_io}->unregister_write($self->{fileno});
1108             }
1109 0           $self->{async_io}->unregister($self->{fileno}, $self);
1110 0           $self->{client}->_disconnected($self->get_pool_id);
1111 0           $self->{socket}->close;
1112              
1113 0           for (values %$pending) {
1114 0           $_->[0]->(Cassandra::Client::Error::Base->new(
1115             message => "Disconnected: $shutdown_reason",
1116             request_error => 1,
1117             ));
1118             }
1119              
1120 0           return;
1121             }
1122              
1123              
1124              
1125             ###### COMPRESSION
1126             BEGIN {
1127 13     13   98 @compression_preference= qw/lz4 snappy/;
1128              
1129 13     13   1488 %available_compression= (
  13     13   8731  
  13         10316  
  13         205  
  13         6503  
  13         10243  
  13         229  
1130             snappy => scalar eval "use Compress::Snappy (); 1;",
1131             lz4 => scalar eval "use Compress::LZ4 (); 1;",
1132             );
1133             }
1134              
1135             sub setup_compression {
1136 0     0 0   my ($self, $type)= @_;
1137              
1138 0 0         return unless $type;
1139 0 0         if ($type eq 'snappy') {
    0          
1140 0           $self->{compress_func}= \&compress_snappy;
1141 0           $self->{decompress_func}= \&decompress_snappy;
1142             } elsif ($type eq 'lz4') {
1143 0           $self->{compress_func}= \&compress_lz4;
1144 0           $self->{decompress_func}= \&decompress_lz4;
1145             } else {
1146 0           warn 'Internal error: failed to set compression';
1147             }
1148              
1149 0           return;
1150             }
1151              
1152             sub compress_snappy {
1153 0     0 0   $_[0]= Compress::Snappy::compress(\$_[0]);
1154 0           return;
1155             }
1156              
1157             sub decompress_snappy {
1158 0 0   0 0   if ($_[0] ne "\0") {
1159 0           $_[0]= Compress::Snappy::decompress(\$_[0]);
1160             } else {
1161 0           $_[0]= '';
1162             }
1163 0           return;
1164             }
1165              
1166             sub compress_lz4 {
1167 0     0 0   $_[0]= pack('N', length($_[0])) . Compress::LZ4::lz4_compress(\$_[0]);
1168 0           return;
1169             }
1170              
1171             sub decompress_lz4 {
1172 0     0 0   my $len= unpack('N', substr $_[0], 0, 4, '');
1173 0 0         if ($len) {
1174 0           $_[0]= Compress::LZ4::lz4_decompress(\$_[0], $len);
1175             } else {
1176 0           $_[0]= '';
1177             }
1178 0           return;
1179             }
1180              
1181             1;
1182              
1183             __END__