File Coverage

blib/lib/Cassandra/Client.pm
Criterion Covered Total %
statement 81 321 25.2
branch 0 94 0.0
condition 0 45 0.0
subroutine 29 69 42.0
pod 2 3 66.6
total 112 532 21.0


line stmt bran cond sub pod time code
1             package Cassandra::Client;
2             our $AUTHORITY = 'cpan:TVDW';
3             $Cassandra::Client::VERSION = '0.21';
4             # ABSTRACT: Perl library for accessing Cassandra using its binary network protocol
5              
6 13     13   1792830 use 5.010;
  13         54  
7 13     13   77 use strict;
  13         38  
  13         505  
8 13     13   79 use warnings;
  13         23  
  13         669  
9              
10 13     13   7541 use Cassandra::Client::AsyncAnyEvent;
  13         39  
  13         498  
11 13     13   6957 use Cassandra::Client::AsyncEV;
  13         38  
  13         588  
12 13     13   6854 use Cassandra::Client::Config;
  13         48  
  13         574  
13 13     13   11451 use Cassandra::Client::Connection;
  13         71  
  13         673  
14 13     13   6873 use Cassandra::Client::Metadata;
  13         172  
  13         901  
15 13     13   7307 use Cassandra::Client::Policy::Queue::Default;
  13         61  
  13         548  
16 13     13   7201 use Cassandra::Client::Policy::Retry::Default;
  13         47  
  13         531  
17 13     13   85 use Cassandra::Client::Policy::Retry;
  13         25  
  13         575  
18 13     13   7765 use Cassandra::Client::Policy::Throttle::Default;
  13         45  
  13         823  
19 13     13   7268 use Cassandra::Client::Policy::LoadBalancing::Default;
  13         43  
  13         597  
20 13     13   7107 use Cassandra::Client::Pool;
  13         46  
  13         535  
21 13     13   121 use Cassandra::Client::TLSHandling;
  13         30  
  13         381  
22 13     13   66 use Cassandra::Client::Util qw/series whilst/;
  13         26  
  13         1138  
23              
24 13     13   7285 use Clone 0.36 qw/clone/;
  13         11444  
  13         1042  
25 13     13   102 use List::Util qw/shuffle/;
  13         29  
  13         1004  
26 13     13   6894 use AnyEvent::XSPromises qw/deferred/;
  13         149126  
  13         1116  
27 13     13   224 use Time::HiRes ();
  13         28  
  13         500  
28 13     13   85 use Ref::Util 0.008 qw/is_ref/;
  13         353  
  13         845  
29 13     13   90 use Devel::GlobalDestruction 0.11;
  13         287  
  13         115  
30 13     13   1050 use XSLoader;
  13         31  
  13         58165  
31              
32             our $XS_VERSION = ($Cassandra::Client::VERSION || '');
33             $XS_VERSION =~ s/\A(\d+)\.(\d+)(\d{3})\z/$1.$2_$3/;
34             XSLoader::load(__PACKAGE__, $XS_VERSION);
35              
36             sub new {
37 0     0 1 0 my ($class, %args)= @_;
38              
39 0         0 my $self= bless {
40             connected => 0,
41             connect_callbacks => undef,
42             shutdown => 0,
43              
44             active_queries => 0,
45             }, $class;
46              
47 0         0 my $options= Cassandra::Client::Config->new(
48             \%args
49             );
50              
51 0   0     0 $self->{throttler}= $options->{throttler} || Cassandra::Client::Policy::Throttle::Default->new();
52 0   0     0 $self->{retry_policy}= $options->{retry_policy} || Cassandra::Client::Policy::Retry::Default->new();
53 0   0     0 $self->{command_queue}= $options->{command_queue} || Cassandra::Client::Policy::Queue::Default->new();
54 0   0     0 $self->{load_balancing_policy}= $options->{load_balancing_policy} || Cassandra::Client::Policy::LoadBalancing::Default->new();
55              
56 0 0       0 my $async_class= $options->{anyevent} ? "Cassandra::Client::AsyncAnyEvent" : "Cassandra::Client::AsyncEV";
57 0         0 my $async_io= $async_class->new(
58             options => $options,
59             );
60 0         0 my $metadata= Cassandra::Client::Metadata->new(
61             options => $options,
62             );
63             my $pool= Cassandra::Client::Pool->new(
64             client => $self,
65             options => $options,
66             metadata => $metadata,
67             async_io => $async_io,
68             load_balancing_policy => $self->{load_balancing_policy},
69 0         0 );
70 0 0       0 my $tls= $options->{tls} ? Cassandra::Client::TLSHandling->new() : undef;
71              
72 0         0 $self->{options}= $options;
73 0         0 $self->{async_io}= $async_io;
74 0         0 $self->{metadata}= $metadata;
75 0         0 $self->{pool}= $pool;
76 0         0 $self->{tls}= $tls;
77              
78 0         0 return $self;
79             }
80              
81             sub _connect {
82 0     0   0 my ($self, $callback)= @_;
83 0 0       0 return _cb($callback) if $self->{connected};
84 0 0       0 return _cb($callback, 'Cannot connect: shutdown() has been called') if $self->{shutdown};
85              
86             # This is ONLY useful if the user doesn't throw away the C::C object on connect errors.
87 0 0 0     0 if (!$self->{connecting} && (my $error= $self->{throttler}->should_fail())) {
88 0         0 return _cb($callback, $error);
89             }
90              
91 0   0     0 push @{$self->{connect_callbacks}||=[]}, $callback;
  0         0  
92 0 0       0 if ($self->{connecting}++) {
93 0         0 return;
94             }
95              
96 0         0 my @contact_points= shuffle @{$self->{options}{contact_points}};
  0         0  
97 0         0 my $last_error= "No hosts to connect to";
98              
99 0         0 my $next_connect;
100             $next_connect= sub {
101 0     0   0 my $contact_point= shift @contact_points;
102 0 0       0 if (!$contact_point) {
103 0         0 delete $self->{connecting};
104 0         0 undef $next_connect;
105 0         0 _cb($_, "Unable to connect to any Cassandra server. Last error: $last_error") for @{delete $self->{connect_callbacks}};
  0         0  
106 0         0 return;
107             }
108              
109             my $connection= Cassandra::Client::Connection->new(
110             client => $self,
111             options => $self->{options},
112             host => $contact_point,
113             async_io => $self->{async_io},
114             metadata => $self->{metadata},
115 0         0 );
116              
117             series([
118             sub {
119 0         0 my ($next)= @_;
120 0         0 $connection->connect($next);
121             },
122             sub {
123 0         0 my ($next)= @_;
124 0         0 $self->{pool}->init($next, $connection);
125             },
126             ], sub {
127 0         0 my $error= shift;
128 0         0 $self->{throttler}->count($error);
129 0 0       0 if ($error) {
130 0         0 $last_error= "On $contact_point: $error";
131 0         0 return $next_connect->();
132             }
133              
134 0         0 undef $next_connect;
135 0         0 $self->{connected}= 1;
136 0         0 delete $self->{connecting};
137 0         0 _cb($_) for @{delete $self->{connect_callbacks}};
  0         0  
138 0         0 });
139 0         0 };
140 0         0 $next_connect->();
141              
142 0         0 return;
143             }
144              
145             sub shutdown {
146 0     0 1 0 my ($self)= @_;
147              
148 0 0       0 return if $self->{shutdown};
149 0         0 $self->{shutdown}= 1;
150 0         0 $self->{connected}= 0;
151              
152 0         0 $self->{pool}->shutdown;
153              
154 0         0 return;
155             }
156              
157             sub is_active {
158 0     0 0 0 my ($self)= @_;
159 0 0       0 return 0 unless $self->{connected};
160 0         0 return 1;
161             }
162              
163             sub _disconnected {
164 0     0   0 my ($self, $connid)= @_;
165 0         0 $self->{pool}->remove($connid);
166 0         0 return;
167             }
168              
169             sub _handle_topology_change {
170 0     0   0 my ($self, $change, $ipaddress)= @_;
171 0 0       0 if ($change eq 'NEW_NODE') {
    0          
172 0         0 $self->{pool}->event_added_node($ipaddress);
173             } elsif ($change eq 'REMOVED_NODE') {
174 0         0 $self->{pool}->event_removed_node($ipaddress);
175             } else {
176 0         0 warn "Received unknown topology change: $change for $ipaddress";
177             }
178             }
179              
180             sub _handle_status_change {
181 0     0   0 my ($self, $change, $ipaddress)= @_;
182             # XXX Ignored, for now
183 0         0 $self->{pool}->connect_if_needed;
184             }
185              
186              
187              
188             # Query functions
189             sub _prepare {
190 0     0   0 my ($self, $callback, $query)= @_;
191              
192             # Fast path: we're already done
193 0 0       0 if ($self->{metadata}->is_prepared(\$query)) {
194 0         0 return _cb($callback);
195             }
196              
197 0         0 $self->_command("prepare", $callback, [ $query ]);
198 0         0 return;
199             }
200              
201             sub _execute {
202 0     0   0 my ($self, $callback, $query, $params, $attribs)= @_;
203              
204 0         0 my $attribs_clone= clone($attribs);
205 0   0     0 $attribs_clone->{consistency} ||= $self->{options}{default_consistency};
206 0   0     0 $attribs_clone->{idempotent} ||= $self->{options}{default_idempotency};
207              
208 0         0 $self->_command("execute_prepared", $callback, [ \$query, clone($params), $attribs_clone ]);
209 0         0 return;
210             }
211              
212             sub _batch {
213 0     0   0 my ($self, $callback, $queries, $attribs)= @_;
214              
215 0         0 my $attribs_clone= clone($attribs);
216 0   0     0 $attribs_clone->{consistency} ||= $self->{options}{default_consistency};
217 0   0     0 $attribs_clone->{idempotent} ||= $self->{options}{default_idempotency};
218              
219 0         0 $self->_command("execute_batch", $callback, [ clone($queries), $attribs_clone ]);
220 0         0 return;
221             }
222              
223             sub _wait_for_schema_agreement {
224 0     0   0 my ($self, $callback)= @_;
225 0         0 $self->_command("wait_for_schema_agreement", $callback, []);
226 0         0 return;
227             }
228              
229              
230             # Command queue
231             sub _command {
232 0     0   0 my ($self, $command, $callback, $args)= @_;
233              
234 0         0 my $command_info= {
235             start_time => Time::HiRes::time(),
236             };
237              
238 0 0       0 goto OVERFLOW if $self->{active_queries} >= $self->{options}{max_concurrent_queries};
239              
240 0 0       0 goto SLOWPATH if !$self->{connected};
241              
242 0         0 my $connection= $self->{pool}->get_one;
243 0 0       0 goto SLOWPATH if !$connection;
244              
245 0 0       0 if (my $error= $self->{throttler}->should_fail()) {
246 0         0 return $self->_command_failed($command, $callback, $args, $command_info, $error);
247             }
248              
249 0         0 $self->{active_queries}++;
250             $connection->$command(sub {
251 0     0   0 my ($error, $result)= @_;
252 0         0 $self->{throttler}->count($error);
253              
254 0         0 $self->{active_queries}--;
255 0 0       0 $self->_schedule_command_dequeue if $self->{command_queue}{has_any};
256              
257 0 0       0 return $self->_command_failed($command, $callback, $args, $command_info, $error) if $error;
258              
259 0         0 $self->_report_stats($command, $command_info);
260 0         0 return _cb($callback, $error, $result);
261 0         0 }, @$args);
262              
263 0         0 return;
264              
265 0         0 SLOWPATH:
266             return $self->_command_slowpath($command, $callback, $args, $command_info);
267              
268 0         0 OVERFLOW:
269             return $self->_command_enqueue($command, $callback, $args, $command_info);
270             }
271              
272             sub _command_slowpath {
273 0     0   0 my ($self, $command, $callback, $args, $command_info)= @_;
274              
275 0         0 $self->{active_queries}++;
276              
277             series([
278             sub {
279 0     0   0 my ($next)= @_;
280 0         0 $self->_connect($next);
281             }, sub {
282 0     0   0 my ($next)= @_;
283 0         0 $self->{pool}->get_one_cb($next);
284             }, sub {
285 0     0   0 my ($next, $connection)= @_;
286 0 0       0 if (my $error= $self->{throttler}->should_fail()) {
287 0         0 return $next->($error);
288             }
289 0         0 $connection->$command($next, @$args);
290             }
291             ], sub {
292 0     0   0 my ($error, $result)= @_;
293 0         0 $self->{throttler}->count($error);
294              
295 0         0 $self->{active_queries}--;
296 0 0       0 $self->_schedule_command_dequeue if $self->{command_queue}{has_any};
297              
298 0 0       0 return $self->_command_failed($command, $callback, $args, $command_info, $error) if $error;
299              
300 0         0 $self->_report_stats($command, $command_info);
301 0         0 return _cb($callback, $error, $result);
302 0         0 });
303 0         0 return;
304             }
305              
306             sub _command_retry {
307 0     0   0 my ($self, $command, $callback, $args, $command_info)= @_;
308              
309 0         0 $command_info->{retries}++;
310              
311 0         0 my $delay= 0.1 * (2 ** $command_info->{retries});
312             $self->{async_io}->timer(sub {
313 0 0   0   0 if ($self->{active_queries} >= $self->{options}{max_concurrent_queries}) {
314 0         0 $self->_command_enqueue($command, $callback, $args, $command_info);
315             } else {
316 0         0 $self->_command_slowpath($command, $callback, $args, $command_info);
317             }
318 0         0 }, $delay);
319             }
320              
321             sub _command_failed {
322 0     0   0 my ($self, $command, $callback, $args, $command_info, $error)= @_;
323              
324 0 0       0 if (is_ref($error)) {
325 0         0 my $retry_decision;
326 0 0       0 my $statement = $command eq 'execute_prepared' ? {idempotent => $args->[2]->{idempotent}} : {};
327              
328 0 0       0 if ($error->do_retry) {
    0          
    0          
    0          
    0          
329 0         0 $retry_decision= Cassandra::Client::Policy::Retry::retry;
330             } elsif ($error->is_request_error) {
331 0   0     0 $retry_decision= $self->{retry_policy}->on_request_error($statement, undef, $error, ($command_info->{retries}||0));
332             } elsif ($error->isa('Cassandra::Client::Error::WriteTimeoutException')) {
333 0   0     0 $retry_decision= $self->{retry_policy}->on_write_timeout($statement, $error->cl, $error->write_type, $error->blockfor, $error->received, ($command_info->{retries}||0));
334             } elsif ($error->isa('Cassandra::Client::Error::ReadTimeoutException')) {
335 0   0     0 $retry_decision= $self->{retry_policy}->on_read_timeout($statement, $error->cl, $error->blockfor, $error->received, $error->data_retrieved, ($command_info->{retries}||0));
336             } elsif ($error->isa('Cassandra::Client::Error::UnavailableException')) {
337 0   0     0 $retry_decision= $self->{retry_policy}->on_unavailable($statement, $error->cl, $error->required, $error->alive, ($command_info->{retries}||0));
338             } else {
339 0         0 $retry_decision= Cassandra::Client::Policy::Retry::rethrow;
340             }
341              
342 0 0 0     0 if ($retry_decision && $retry_decision eq 'retry') {
343 0         0 return $self->_command_retry($command, $callback, $args, $command_info);
344             }
345             }
346              
347 0         0 $self->_report_stats($command, $command_info);
348 0         0 return $callback->($error);
349             }
350              
351             sub _command_enqueue {
352 0     0   0 my ($self, $command, $callback, $args, $command_info)= @_;
353 0 0       0 if (my $error= $self->{command_queue}->enqueue([$command, $callback, $args, $command_info])) {
354 0         0 return $self->_command_failed($command, $callback, $args, $command_info, "Cannot $command: $error");
355             }
356 0         0 return;
357             }
358              
359             sub _schedule_command_dequeue {
360 0     0   0 my ($self)= @_;
361 0 0       0 unless ($self->{command_callback_scheduled}++) {
362             $self->{async_io}->later(sub {
363 0     0   0 delete $self->{command_callback_scheduled};
364              
365 0   0     0 while ($self->{command_queue}{has_any} && $self->{active_queries} < $self->{options}{max_concurrent_queries}) {
366 0 0       0 my $item= $self->{command_queue}->dequeue or return;
367 0         0 $self->_command_slowpath(@$item);
368             }
369 0         0 });
370             }
371             }
372              
373             sub _report_stats {
374 0     0   0 my ($self, $command, $command_info)= @_;
375              
376 0         0 $command_info->{end_time}= Time::HiRes::time();
377              
378 0 0       0 if (my $stats_hook= $self->{options}{stats_hook}) {
379             _cb($stats_hook, timing => {
380             command => $command,
381             start_time => $command_info->{start_time},
382             end_time => $command_info->{end_time},
383 0         0 });
384             }
385             }
386              
387             # Utility functions that wrap query functions
388             sub _each_page {
389 0     0   0 my ($self, $callback, $query, $params, $attribs, $page_callback)= @_;
390              
391 0 0       0 my $params_copy= $params ? clone($params) : undef;
392 0 0       0 my $attribs_copy= $attribs ? clone($attribs) : undef;
393              
394 0         0 my $done= 0;
395             whilst(
396 0     0   0 sub { !$done },
397             sub {
398 0     0   0 my $next= shift;
399              
400             $self->_execute(sub {
401             # Completion handler, with page data (or an error)
402 0         0 my ($error, $result)= @_;
403 0 0       0 return $next->($error) if $error;
404              
405 0         0 my $next_page_id= $result->next_page;
406 0         0 _cb($page_callback, $result); # Note that page_callback doesn't get an error argument, that's intentional
407              
408 0 0       0 if ($next_page_id) {
409 0         0 $attribs_copy->{page}= $next_page_id;
410             } else {
411 0         0 $done= 1;
412             }
413 0         0 return $next->();
414 0         0 }, $query, $params_copy, $attribs_copy);
415             },
416             sub {
417 0     0   0 my $error= shift;
418 0         0 return _cb($callback, $error);
419             }
420 0         0 );
421              
422 0         0 return;
423             }
424              
425             sub DESTROY {
426 0     0   0 local $@;
427 0 0       0 return if in_global_destruction;
428              
429 0         0 my $self= shift;
430 0 0       0 if ($self->{connected}) {
431 0         0 $self->shutdown;
432             }
433             }
434              
435              
436             # Utility functions for callers
437             sub _get_stacktrace {
438             # This gets called a lot. Let's keep it fast.
439              
440 0     0   0 my $trace= '';
441 0         0 my ($c, $file, $line)= caller(1);
442 0         0 $trace .= " $c ($file:$line)\n";
443 0 0       0 ($c, $file, $line)= caller(2) or goto DONE;
444 0         0 $trace .= " $c ($file:$line)\n";
445 0 0       0 ($c, $file, $line)= caller(3) or goto DONE;
446 0         0 $trace .= " $c ($file:$line)\n";
447              
448 0         0 DONE:
449             return $trace;
450             }
451              
452             sub _cb {
453 0     0   0 my $cb= shift;
454             eval {
455 0         0 &$cb; 1
  0         0  
456 0 0       0 } or do {
457 0   0     0 my $error= $@ || "unknown error";
458 0         0 warn "Ignoring unhandled exception in callback: $error";
459             };
460              
461 0         0 return;
462             }
463              
464             sub _mksync { # Translates an asynchronous call into something that looks like Perl
465 78     78   137 my ($sub)= @_;
466             return sub {
467 0     0   0 my $self= shift;
468 0         0 $sub->($self, $self->{async_io}->wait(my $w), @_);
469 0         0 my ($err, @output)= $w->();
470 0 0       0 if ($err) { die $err; }
  0         0  
471 0         0 return @output;
472 78         362 };
473             }
474              
475             sub _mkcall { # Basically _mksync, but returns the error instead of dying
476 78     78   132 my ($sub)= @_;
477             return sub {
478 0     0   0 my $self= shift;
479 0         0 $sub->($self, $self->{async_io}->wait(my $w), @_);
480 0         0 return $w->();
481 78         337 };
482             }
483              
484             sub _mkpromise {
485 78     78   127 my ($sub)= @_;
486             return sub {
487 0     0   0 my $self= shift;
488 0         0 my $trace= &_get_stacktrace;
489 0         0 my $deferred= deferred;
490              
491             $sub->($self, sub {
492 0     0   0 my ($error, @output)= @_;
493 0 0       0 if ($error) {
494 0         0 $deferred->reject("$error\n\nTrace:\n$trace");
495             } else {
496 0         0 $deferred->resolve(@output);
497             }
498 0         0 }, @_);
499              
500 0         0 return $deferred->promise;
501 78         346 };
502             }
503              
504             sub _mkfuture {
505 78     78   158 my ($sub)= @_;
506             return sub {
507 0     0   0 my $self= shift;
508 0         0 my $trace= &_get_stacktrace;
509 0         0 $sub->($self, $self->{async_io}->wait(my $w), @_);
510             return sub {
511 0     0   0 my ($error, @output)= $w->();
512 0 0       0 if ($error) { die "$error\n\nTrace:\n$trace"; }
  0         0  
513 0         0 return @output;
514 0         0 };
515             }
516 78         356 }
517              
518             sub _mkfuture_call {
519 78     78   150 my ($sub)= @_;
520             return sub {
521 0     0     my $self= shift;
522 0           $sub->($self, $self->{async_io}->wait(my $w), @_);
523 0           return $w;
524             }
525 78         419 }
526              
527             PUBLIC_METHODS: {
528 13     13   137 no strict 'refs';
  13         43  
  13         3805  
529             for (qw/
530             batch
531             connect
532             execute
533             each_page
534             prepare
535             wait_for_schema_agreement
536             /) {
537             *{$_}= _mksync (\&{"_$_"});
538             *{"call_$_"}= _mkcall (\&{"_$_"});
539             *{"async_$_"}= _mkpromise (\&{"_$_"});
540             *{"future_$_"}= _mkfuture (\&{"_$_"});
541             *{"future_call_$_"}= _mkfuture_call (\&{"_$_"});
542             }
543             }
544              
545             1;
546              
547             __END__