line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Cassandra::Client::Connection; |
2
|
|
|
|
|
|
|
our $AUTHORITY = 'cpan:TVDW'; |
3
|
|
|
|
|
|
|
$Cassandra::Client::Connection::VERSION = '0.13_007'; # TRIAL |
4
|
|
|
|
|
|
|
|
5
|
1
|
|
|
1
|
|
14
|
$Cassandra::Client::Connection::VERSION = '0.13007';use 5.010; |
|
1
|
|
|
|
|
3
|
|
6
|
1
|
|
|
1
|
|
8
|
use strict; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
28
|
|
7
|
1
|
|
|
1
|
|
8
|
use warnings; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
35
|
|
8
|
1
|
|
|
1
|
|
6
|
use vars qw/$BUFFER/; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
50
|
|
9
|
|
|
|
|
|
|
|
10
|
1
|
|
|
1
|
|
7
|
use Ref::Util qw/is_blessed_ref is_plain_arrayref/; |
|
1
|
|
|
|
|
8
|
|
|
1
|
|
|
|
|
59
|
|
11
|
1
|
|
|
1
|
|
350
|
use IO::Socket::INET; |
|
1
|
|
|
|
|
16412
|
|
|
1
|
|
|
|
|
6
|
|
12
|
1
|
|
|
1
|
|
926
|
use IO::Socket::INET6; |
|
1
|
|
|
|
|
5925
|
|
|
1
|
|
|
|
|
19
|
|
13
|
1
|
|
|
1
|
|
1382
|
use Errno qw/EAGAIN/; |
|
1
|
|
|
|
|
5
|
|
|
1
|
|
|
|
|
172
|
|
14
|
1
|
|
|
1
|
|
10
|
use Socket qw/SOL_SOCKET IPPROTO_TCP SO_KEEPALIVE TCP_NODELAY/; |
|
1
|
|
|
|
|
4
|
|
|
1
|
|
|
|
|
100
|
|
15
|
1
|
|
|
1
|
|
12
|
use Scalar::Util qw/weaken/; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
73
|
|
16
|
1
|
|
|
1
|
|
710
|
use Net::SSLeay qw/ERROR_WANT_READ ERROR_WANT_WRITE ERROR_NONE/; |
|
1
|
|
|
|
|
8892
|
|
|
1
|
|
|
|
|
408
|
|
17
|
|
|
|
|
|
|
|
18
|
1
|
|
|
1
|
|
328
|
use Cassandra::Client::Util; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
62
|
|
19
|
1
|
|
|
|
|
385
|
use Cassandra::Client::Protocol qw/ |
20
|
|
|
|
|
|
|
:constants |
21
|
|
|
|
|
|
|
%consistency_lookup |
22
|
|
|
|
|
|
|
%batch_type_lookup |
23
|
|
|
|
|
|
|
pack_bytes |
24
|
|
|
|
|
|
|
pack_longstring |
25
|
|
|
|
|
|
|
pack_queryparameters |
26
|
|
|
|
|
|
|
pack_shortbytes |
27
|
|
|
|
|
|
|
pack_stringmap |
28
|
|
|
|
|
|
|
pack_stringlist |
29
|
|
|
|
|
|
|
unpack_errordata |
30
|
|
|
|
|
|
|
unpack_inet |
31
|
|
|
|
|
|
|
unpack_int |
32
|
|
|
|
|
|
|
unpack_metadata |
33
|
|
|
|
|
|
|
unpack_shortbytes |
34
|
|
|
|
|
|
|
unpack_string |
35
|
|
|
|
|
|
|
unpack_stringmultimap |
36
|
1
|
|
|
1
|
|
288
|
/; |
|
1
|
|
|
|
|
2
|
|
37
|
1
|
|
|
1
|
|
7
|
use Cassandra::Client::Error::Base; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
17
|
|
38
|
1
|
|
|
1
|
|
280
|
use Cassandra::Client::ResultSet; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
24
|
|
39
|
1
|
|
|
1
|
|
252
|
use Cassandra::Client::TLSHandling; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
30
|
|
40
|
|
|
|
|
|
|
|
41
|
1
|
|
|
1
|
|
5
|
use constant STREAM_ID_LIMIT => 32768; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
4549
|
|
42
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
# Populated at BEGIN{} time |
44
|
|
|
|
|
|
|
my @compression_preference; |
45
|
|
|
|
|
|
|
my %available_compression; |
46
|
|
|
|
|
|
|
|
47
|
|
|
|
|
|
|
sub new { |
48
|
0
|
|
|
0
|
0
|
|
my ($class, %args)= @_; |
49
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
my $self= bless { |
51
|
|
|
|
|
|
|
client => $args{client}, |
52
|
|
|
|
|
|
|
async_io => $args{async_io}, |
53
|
|
|
|
|
|
|
pool_id => undef, |
54
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
options => $args{options}, |
56
|
|
|
|
|
|
|
request_timeout => $args{options}{request_timeout}, |
57
|
|
|
|
|
|
|
host => $args{host}, |
58
|
|
|
|
|
|
|
metadata => $args{metadata}, |
59
|
|
|
|
|
|
|
prepare_cache => $args{metadata}->prepare_cache, |
60
|
0
|
|
|
|
|
|
last_stream_id => 0, |
61
|
|
|
|
|
|
|
pending_streams => {}, |
62
|
|
|
|
|
|
|
in_prepare => {}, |
63
|
|
|
|
|
|
|
|
64
|
|
|
|
|
|
|
decompress_func => undef, |
65
|
|
|
|
|
|
|
compress_func => undef, |
66
|
|
|
|
|
|
|
connected => 0, |
67
|
|
|
|
|
|
|
connecting => undef, |
68
|
|
|
|
|
|
|
socket => undef, |
69
|
|
|
|
|
|
|
fileno => undef, |
70
|
|
|
|
|
|
|
pending_write => undef, |
71
|
|
|
|
|
|
|
shutdown => 0, |
72
|
|
|
|
|
|
|
read_buffer => \(my $empty= ''), |
73
|
|
|
|
|
|
|
|
74
|
|
|
|
|
|
|
tls => undef, |
75
|
|
|
|
|
|
|
tls_want_write => undef, |
76
|
|
|
|
|
|
|
}, $class; |
77
|
0
|
|
|
|
|
|
weaken($self->{async_io}); |
78
|
0
|
|
|
|
|
|
weaken($self->{client}); |
79
|
0
|
|
|
|
|
|
return $self; |
80
|
|
|
|
|
|
|
} |
81
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
sub get_local_status { |
83
|
0
|
|
|
0
|
0
|
|
my ($self, $callback)= @_; |
84
|
|
|
|
|
|
|
|
85
|
|
|
|
|
|
|
series([ |
86
|
|
|
|
|
|
|
sub { |
87
|
0
|
|
|
0
|
|
|
my ($next)= @_; |
88
|
0
|
|
|
|
|
|
$self->execute_prepared($next, \"select key, data_center, host_id, broadcast_address, rack, release_version, tokens, schema_version from system.local"); |
89
|
|
|
|
|
|
|
}, |
90
|
|
|
|
|
|
|
sub { |
91
|
0
|
|
|
0
|
|
|
my ($next, $result)= @_; |
92
|
|
|
|
|
|
|
|
93
|
0
|
|
|
|
|
|
my %local_status= map { $_->[3] => { |
94
|
|
|
|
|
|
|
peer => $_->[3], |
95
|
|
|
|
|
|
|
data_center => $_->[1], |
96
|
|
|
|
|
|
|
host_id => $_->[2], |
97
|
|
|
|
|
|
|
preferred_ip => $_->[3], |
98
|
|
|
|
|
|
|
rack => $_->[4], |
99
|
|
|
|
|
|
|
release_version => $_->[5], |
100
|
|
|
|
|
|
|
tokens => $_->[6], |
101
|
|
|
|
|
|
|
schema_version => $_->[7], |
102
|
0
|
|
|
|
|
|
} } @{$result->rows}; |
|
0
|
|
|
|
|
|
|
103
|
|
|
|
|
|
|
|
104
|
0
|
|
|
|
|
|
$next->(undef, \%local_status); |
105
|
|
|
|
|
|
|
}, |
106
|
0
|
|
|
|
|
|
], $callback); |
107
|
|
|
|
|
|
|
|
108
|
0
|
|
|
|
|
|
return; |
109
|
|
|
|
|
|
|
} |
110
|
|
|
|
|
|
|
|
111
|
|
|
|
|
|
|
sub get_peers_status { |
112
|
0
|
|
|
0
|
0
|
|
my ($self, $callback)= @_; |
113
|
|
|
|
|
|
|
|
114
|
|
|
|
|
|
|
series([ |
115
|
|
|
|
|
|
|
sub { |
116
|
0
|
|
|
0
|
|
|
my ($next)= @_; |
117
|
0
|
|
|
|
|
|
$self->execute_prepared($next, \"select peer, data_center, host_id, preferred_ip, rack, release_version, tokens, schema_version from system.peers"); |
118
|
|
|
|
|
|
|
}, |
119
|
|
|
|
|
|
|
sub { |
120
|
0
|
|
|
0
|
|
|
my ($next, $result)= @_; |
121
|
|
|
|
|
|
|
|
122
|
0
|
|
|
|
|
|
my %network_status= map { $_->[0] => { |
123
|
|
|
|
|
|
|
peer => $_->[0], |
124
|
|
|
|
|
|
|
data_center => $_->[1], |
125
|
|
|
|
|
|
|
host_id => $_->[2], |
126
|
|
|
|
|
|
|
preferred_ip => $_->[3], |
127
|
|
|
|
|
|
|
rack => $_->[4], |
128
|
|
|
|
|
|
|
release_version => $_->[5], |
129
|
|
|
|
|
|
|
tokens => $_->[6], |
130
|
|
|
|
|
|
|
schema_version => $_->[7], |
131
|
0
|
|
|
|
|
|
} } @{$result->rows}; |
|
0
|
|
|
|
|
|
|
132
|
|
|
|
|
|
|
|
133
|
0
|
|
|
|
|
|
$next->(undef, \%network_status); |
134
|
|
|
|
|
|
|
}, |
135
|
0
|
|
|
|
|
|
], $callback); |
136
|
|
|
|
|
|
|
|
137
|
0
|
|
|
|
|
|
return; |
138
|
|
|
|
|
|
|
} |
139
|
|
|
|
|
|
|
|
140
|
|
|
|
|
|
|
sub get_network_status { |
141
|
0
|
|
|
0
|
0
|
|
my ($self, $callback)= @_; |
142
|
|
|
|
|
|
|
|
143
|
|
|
|
|
|
|
parallel([ |
144
|
|
|
|
|
|
|
sub { |
145
|
0
|
|
|
0
|
|
|
my ($next)= @_; |
146
|
0
|
|
|
|
|
|
$self->get_peers_status($next); |
147
|
|
|
|
|
|
|
}, |
148
|
|
|
|
|
|
|
sub { |
149
|
0
|
|
|
0
|
|
|
my ($next)= @_; |
150
|
0
|
|
|
|
|
|
$self->get_local_status($next); |
151
|
|
|
|
|
|
|
}, |
152
|
|
|
|
|
|
|
], sub { |
153
|
0
|
|
|
0
|
|
|
my ($error, $peers, $local)= @_; |
154
|
0
|
0
|
|
|
|
|
if ($error) { return $callback->($error); } |
|
0
|
|
|
|
|
|
|
155
|
0
|
|
|
|
|
|
return $callback->(undef, { %$peers, %$local }); |
156
|
0
|
|
|
|
|
|
}); |
157
|
|
|
|
|
|
|
} |
158
|
|
|
|
|
|
|
|
159
|
|
|
|
|
|
|
sub register_events { |
160
|
0
|
|
|
0
|
0
|
|
my ($self, $callback)= @_; |
161
|
|
|
|
|
|
|
|
162
|
0
|
|
|
|
|
|
$self->request($callback, OPCODE_REGISTER, pack_stringlist([ |
163
|
|
|
|
|
|
|
'TOPOLOGY_CHANGE', |
164
|
|
|
|
|
|
|
'STATUS_CHANGE', |
165
|
|
|
|
|
|
|
])); |
166
|
|
|
|
|
|
|
|
167
|
0
|
|
|
|
|
|
return; |
168
|
|
|
|
|
|
|
} |
169
|
|
|
|
|
|
|
|
170
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
###### QUERY CODE |
172
|
|
|
|
|
|
|
sub execute_prepared { |
173
|
0
|
|
|
0
|
0
|
|
my ($self, $callback, $queryref, $parameters, $attr, $exec_info)= @_; |
174
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
# Note: parameters is retained until the query is complete. It must not be changed; clone if needed. |
176
|
|
|
|
|
|
|
# Same for attr. Note that external callers automatically have their arguments cloned. |
177
|
|
|
|
|
|
|
|
178
|
0
|
0
|
|
|
|
|
my $prepared= $self->{prepare_cache}{$$queryref} or do { |
179
|
0
|
|
|
|
|
|
return $self->prepare_and_try_execute_again($callback, $queryref, $parameters, $attr, $exec_info); |
180
|
|
|
|
|
|
|
}; |
181
|
|
|
|
|
|
|
|
182
|
0
|
|
|
|
|
|
my $want_result_metadata= !$prepared->{decoder}; |
183
|
0
|
|
|
|
|
|
my $row; |
184
|
0
|
0
|
|
|
|
|
if ($parameters) { |
185
|
|
|
|
|
|
|
eval { |
186
|
0
|
|
|
|
|
|
$row= $prepared->{encoder}->encode($parameters); |
187
|
0
|
|
|
|
|
|
1; |
188
|
0
|
0
|
|
|
|
|
} or do { |
189
|
0
|
|
0
|
|
|
|
my $error= $@ || "??"; |
190
|
0
|
|
|
|
|
|
return $callback->("Failed to encode row to native protocol: $error"); |
191
|
|
|
|
|
|
|
}; |
192
|
|
|
|
|
|
|
} |
193
|
|
|
|
|
|
|
|
194
|
0
|
|
0
|
|
|
|
my $consistency= $consistency_lookup{$attr->{consistency} || 'one'}; |
195
|
0
|
0
|
|
|
|
|
if (!defined $consistency) { |
196
|
0
|
|
|
|
|
|
return $callback->("Invalid consistency level specified: $attr->{consistency}"); |
197
|
|
|
|
|
|
|
} |
198
|
|
|
|
|
|
|
|
199
|
0
|
|
0
|
|
|
|
my $page_size= (0+($attr->{page_size} || $self->{options}{max_page_size} || 0)) || undef; |
200
|
0
|
|
0
|
|
|
|
my $paging_state= $attr->{page} || undef; |
201
|
0
|
|
|
|
|
|
my $execute_body= pack_shortbytes($prepared->{id}).pack_queryparameters($consistency, !$want_result_metadata, $page_size, $paging_state, undef, $row); |
202
|
|
|
|
|
|
|
|
203
|
|
|
|
|
|
|
my $on_completion= sub { |
204
|
|
|
|
|
|
|
# my ($body)= $_[2]; (not copying, because performance. assuming ownership) |
205
|
0
|
|
|
0
|
|
|
my ($err, $code)= @_; |
206
|
|
|
|
|
|
|
|
207
|
0
|
0
|
|
|
|
|
if ($err) { |
208
|
0
|
0
|
0
|
|
|
|
if (is_blessed_ref($err) && $err->code == 0x2500) { |
209
|
0
|
|
|
|
|
|
return $self->prepare_and_try_execute_again($callback, $queryref, $parameters, $attr, $exec_info); |
210
|
|
|
|
|
|
|
} |
211
|
0
|
|
|
|
|
|
return $callback->($err); |
212
|
|
|
|
|
|
|
} |
213
|
|
|
|
|
|
|
|
214
|
0
|
0
|
|
|
|
|
if ($code != OPCODE_RESULT) { |
215
|
|
|
|
|
|
|
# This shouldn't ever happen... |
216
|
0
|
|
|
|
|
|
return $callback->(Cassandra::Client::Error::Base->new( |
217
|
|
|
|
|
|
|
message => "Expected a RESULT frame but got something else; considering the query failed", |
218
|
|
|
|
|
|
|
request_error => 1, |
219
|
|
|
|
|
|
|
)); |
220
|
|
|
|
|
|
|
} |
221
|
|
|
|
|
|
|
|
222
|
0
|
|
|
|
|
|
$self->decode_result($callback, $prepared, $_[2]); |
223
|
0
|
|
|
|
|
|
}; |
224
|
|
|
|
|
|
|
|
225
|
0
|
|
|
|
|
|
$self->request($on_completion, OPCODE_EXECUTE, $execute_body); |
226
|
|
|
|
|
|
|
|
227
|
0
|
|
|
|
|
|
return; |
228
|
|
|
|
|
|
|
} |
229
|
|
|
|
|
|
|
|
230
|
|
|
|
|
|
|
sub prepare_and_try_execute_again { |
231
|
0
|
|
|
0
|
0
|
|
my ($self, $callback, $queryref, $parameters, $attr, $exec_info)= @_; |
232
|
|
|
|
|
|
|
|
233
|
0
|
0
|
|
|
|
|
if ($exec_info->{_prepared_and_tried_again}++) { |
234
|
0
|
|
|
|
|
|
return $callback->("Query failed because it seems to be missing from the server's prepared statement cache"); |
235
|
|
|
|
|
|
|
} |
236
|
|
|
|
|
|
|
|
237
|
|
|
|
|
|
|
series([ |
238
|
|
|
|
|
|
|
sub { |
239
|
0
|
|
|
0
|
|
|
my ($next)= @_; |
240
|
0
|
|
|
|
|
|
$self->prepare($next, $$queryref); |
241
|
|
|
|
|
|
|
}, |
242
|
|
|
|
|
|
|
], sub { |
243
|
0
|
0
|
|
0
|
|
|
return $callback->($_[0]) if $_[0]; |
244
|
|
|
|
|
|
|
|
245
|
0
|
0
|
|
|
|
|
unless ($self->{prepare_cache}{$$queryref}) { |
246
|
|
|
|
|
|
|
# We're recursing, so let's make sure we avoid the infinite loop |
247
|
0
|
|
|
|
|
|
return $callback->("Internal error: expected query to be prepared but it was not"); |
248
|
|
|
|
|
|
|
} |
249
|
|
|
|
|
|
|
|
250
|
0
|
|
|
|
|
|
return $self->execute_prepared($callback, $queryref, $parameters, $attr, $exec_info); |
251
|
0
|
|
|
|
|
|
}); |
252
|
0
|
|
|
|
|
|
return; |
253
|
|
|
|
|
|
|
} |
254
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
sub execute_batch { |
256
|
0
|
|
|
0
|
0
|
|
my ($self, $callback, $queries, $attribs, $exec_info)= @_; |
257
|
|
|
|
|
|
|
# Like execute_prepared, assumes ownership of $queries and $attribs |
258
|
|
|
|
|
|
|
|
259
|
0
|
0
|
|
|
|
|
if (!is_plain_arrayref($queries)) { |
260
|
0
|
|
|
|
|
|
return $callback->("execute_batch: queries argument must be an array of arrays"); |
261
|
|
|
|
|
|
|
} |
262
|
|
|
|
|
|
|
|
263
|
0
|
|
|
|
|
|
my @prepared; |
264
|
0
|
|
|
|
|
|
for my $query (@$queries) { |
265
|
0
|
0
|
|
|
|
|
if (!is_plain_arrayref($query)) { |
266
|
0
|
|
|
|
|
|
return $callback->("execute_batch: entries in query argument must be arrayrefs"); |
267
|
|
|
|
|
|
|
} |
268
|
0
|
0
|
|
|
|
|
if (!$query->[0]) { |
269
|
0
|
|
|
|
|
|
return $callback->("Empty or no query given, cannot execute as part of a batch"); |
270
|
|
|
|
|
|
|
} |
271
|
0
|
0
|
0
|
|
|
|
if ($query->[1] && !is_plain_arrayref($query->[1])) { |
272
|
0
|
|
|
|
|
|
return $callback->("Query parameters to batch() must be given as an arrayref"); |
273
|
|
|
|
|
|
|
} |
274
|
|
|
|
|
|
|
|
275
|
0
|
0
|
|
|
|
|
if (my $prep= $self->{prepare_cache}{$query->[0]}) { |
276
|
0
|
|
|
|
|
|
push @prepared, [ $prep, $query->[1] ]; |
277
|
|
|
|
|
|
|
|
278
|
|
|
|
|
|
|
} else { |
279
|
0
|
|
|
|
|
|
return $self->prepare_and_try_batch_again($callback, $queries, $attribs, $exec_info); |
280
|
|
|
|
|
|
|
} |
281
|
|
|
|
|
|
|
} |
282
|
|
|
|
|
|
|
|
283
|
0
|
|
|
|
|
|
my $batch_type= 0; |
284
|
0
|
0
|
|
|
|
|
if ($attribs->{batch_type}) { |
285
|
0
|
|
|
|
|
|
$batch_type= $batch_type_lookup{$attribs->{batch_type}}; |
286
|
0
|
0
|
|
|
|
|
if (!defined $batch_type) { |
287
|
0
|
|
|
|
|
|
return $callback->("Unknown batch_type: <$attribs->{batch_type}>"); |
288
|
|
|
|
|
|
|
} |
289
|
|
|
|
|
|
|
} |
290
|
|
|
|
|
|
|
|
291
|
0
|
|
0
|
|
|
|
my $consistency= $consistency_lookup{$attribs->{consistency} || 'one'}; |
292
|
0
|
0
|
|
|
|
|
if (!defined $consistency) { |
293
|
0
|
|
|
|
|
|
return $callback->("Invalid consistency level specified: $attribs->{consistency}"); |
294
|
|
|
|
|
|
|
} |
295
|
|
|
|
|
|
|
|
296
|
0
|
|
|
|
|
|
my $batch_frame= pack('Cn', $batch_type, (0+@prepared)); |
297
|
0
|
|
|
|
|
|
for my $prep (@prepared) { |
298
|
0
|
|
|
|
|
|
$batch_frame .= pack('C', 1).pack_shortbytes($prep->[0]{id}).$prep->[0]{encoder}->encode($prep->[1]); |
299
|
|
|
|
|
|
|
} |
300
|
0
|
|
|
|
|
|
$batch_frame .= pack('nC', $consistency, 0); |
301
|
|
|
|
|
|
|
|
302
|
|
|
|
|
|
|
my $on_completion= sub { |
303
|
|
|
|
|
|
|
# my ($body)= $_[2]; (not copying, because performance. assuming ownership) |
304
|
0
|
|
|
0
|
|
|
my ($err, $code)= @_; |
305
|
|
|
|
|
|
|
|
306
|
0
|
0
|
|
|
|
|
if ($err) { |
307
|
0
|
0
|
0
|
|
|
|
if (is_blessed_ref($err) && $err->code == 0x2500) { |
308
|
0
|
|
|
|
|
|
return $self->prepare_and_try_batch_again($callback, $queries, $attribs, $exec_info); |
309
|
|
|
|
|
|
|
} |
310
|
0
|
|
|
|
|
|
return $callback->($err); |
311
|
|
|
|
|
|
|
} |
312
|
|
|
|
|
|
|
|
313
|
0
|
0
|
|
|
|
|
if ($code != OPCODE_RESULT) { |
314
|
|
|
|
|
|
|
# This shouldn't ever happen... |
315
|
0
|
|
|
|
|
|
return $callback->(Cassandra::Client::Error::Base->new( |
316
|
|
|
|
|
|
|
message => "Expected a RESULT frame but got something else; considering the batch failed", |
317
|
|
|
|
|
|
|
request_error => 1, |
318
|
|
|
|
|
|
|
)); |
319
|
|
|
|
|
|
|
} |
320
|
|
|
|
|
|
|
|
321
|
0
|
|
|
|
|
|
$self->decode_result($callback, undef, $_[2]); |
322
|
0
|
|
|
|
|
|
}; |
323
|
|
|
|
|
|
|
|
324
|
0
|
|
|
|
|
|
$self->request($on_completion, OPCODE_BATCH, $batch_frame); |
325
|
|
|
|
|
|
|
|
326
|
0
|
|
|
|
|
|
return; |
327
|
|
|
|
|
|
|
} |
328
|
|
|
|
|
|
|
|
329
|
|
|
|
|
|
|
sub prepare_and_try_batch_again { |
330
|
0
|
|
|
0
|
0
|
|
my ($self, $callback, $queries, $attribs, $exec_info)= @_; |
331
|
|
|
|
|
|
|
|
332
|
0
|
0
|
|
|
|
|
if ($exec_info->{_prepared_and_tried_again}++) { |
333
|
0
|
|
|
|
|
|
return $callback->("Batch failed because one or more queries seem to be missing from the server's prepared statement cache"); |
334
|
|
|
|
|
|
|
} |
335
|
|
|
|
|
|
|
|
336
|
0
|
|
|
|
|
|
my %to_be_prepared; |
337
|
0
|
|
|
|
|
|
$to_be_prepared{$_->[0]}= 1 for @$queries; |
338
|
|
|
|
|
|
|
|
339
|
|
|
|
|
|
|
parallel([ |
340
|
0
|
|
|
|
|
|
map { my $query= $_; sub { |
341
|
0
|
|
|
0
|
|
|
my ($next)= @_; |
342
|
0
|
|
|
|
|
|
$self->prepare($next, $query); |
343
|
0
|
|
|
|
|
|
} } keys %to_be_prepared |
344
|
|
|
|
|
|
|
], sub { |
345
|
0
|
0
|
|
0
|
|
|
return $callback->($_[0]) if $_[0]; |
346
|
|
|
|
|
|
|
|
347
|
0
|
|
|
|
|
|
return $self->execute_batch($callback, $queries, $attribs, $exec_info); |
348
|
0
|
|
|
|
|
|
}); |
349
|
0
|
|
|
|
|
|
return; |
350
|
|
|
|
|
|
|
} |
351
|
|
|
|
|
|
|
|
352
|
|
|
|
|
|
|
sub prepare { |
353
|
0
|
|
|
0
|
0
|
|
my ($self, $callback, $query)= @_; |
354
|
|
|
|
|
|
|
|
355
|
0
|
0
|
|
|
|
|
if (exists $self->{in_prepare}{$query}) { |
356
|
0
|
|
|
|
|
|
push @{$self->{in_prepare}{$query}}, $callback; |
|
0
|
|
|
|
|
|
|
357
|
0
|
|
|
|
|
|
return; |
358
|
|
|
|
|
|
|
} |
359
|
|
|
|
|
|
|
|
360
|
0
|
|
|
|
|
|
$self->{in_prepare}{$query}= [ $callback ]; |
361
|
|
|
|
|
|
|
|
362
|
|
|
|
|
|
|
series([ |
363
|
|
|
|
|
|
|
sub { |
364
|
0
|
|
|
0
|
|
|
my ($next)= @_; |
365
|
0
|
|
|
|
|
|
my $req= pack_longstring($query); |
366
|
0
|
|
|
|
|
|
$self->request($next, OPCODE_PREPARE, $req); |
367
|
|
|
|
|
|
|
}, |
368
|
|
|
|
|
|
|
sub { |
369
|
0
|
|
|
0
|
|
|
my ($next, $code, $body)= @_; |
370
|
0
|
0
|
|
|
|
|
if ($code != OPCODE_RESULT) { |
371
|
0
|
|
|
|
|
|
return $next->("Got unexpected failure while trying to prepare"); |
372
|
|
|
|
|
|
|
} |
373
|
|
|
|
|
|
|
|
374
|
0
|
|
|
|
|
|
my $result_type= unpack_int($body); |
375
|
0
|
0
|
|
|
|
|
if ($result_type != RESULT_PREPARED) { |
376
|
0
|
|
|
|
|
|
return $next->("Unexpected response from server while preparing"); |
377
|
|
|
|
|
|
|
} |
378
|
|
|
|
|
|
|
|
379
|
0
|
|
|
|
|
|
my $id= unpack_shortbytes($body); |
380
|
|
|
|
|
|
|
|
381
|
0
|
|
|
|
|
|
my ($encoder, $decoder); |
382
|
0
|
0
|
|
|
|
|
eval { |
383
|
0
|
|
|
|
|
|
($encoder)= unpack_metadata($body); |
384
|
0
|
|
|
|
|
|
1; |
385
|
|
|
|
|
|
|
} or return $next->("Unable to unpack query metadata: $@"); |
386
|
0
|
0
|
|
|
|
|
eval { |
387
|
0
|
|
|
|
|
|
($decoder)= unpack_metadata($body); |
388
|
0
|
|
|
|
|
|
1; |
389
|
|
|
|
|
|
|
} or return $next->("Unable to unpack query result metadata: $@"); |
390
|
|
|
|
|
|
|
|
391
|
0
|
|
|
|
|
|
$self->{metadata}->add_prepared($query, $id, $decoder, $encoder); |
392
|
0
|
|
|
|
|
|
return $next->(); |
393
|
|
|
|
|
|
|
}, |
394
|
|
|
|
|
|
|
], sub { |
395
|
0
|
|
|
0
|
|
|
my $error= shift; |
396
|
0
|
0
|
|
|
|
|
my $in_prepare= delete($self->{in_prepare}{$query}) or die "BUG"; |
397
|
0
|
|
|
|
|
|
$_->($error) for @$in_prepare; |
398
|
0
|
|
|
|
|
|
}); |
399
|
|
|
|
|
|
|
|
400
|
0
|
|
|
|
|
|
return; |
401
|
|
|
|
|
|
|
} |
402
|
|
|
|
|
|
|
|
403
|
|
|
|
|
|
|
sub decode_result { |
404
|
0
|
|
|
0
|
0
|
|
my ($self, $callback, $prepared)= @_; # $_[3]=$body |
405
|
|
|
|
|
|
|
|
406
|
0
|
|
|
|
|
|
my $result_type= unpack('l>', substr($_[3], 0, 4, '')); |
407
|
0
|
0
|
|
|
|
|
if ($result_type == RESULT_ROWS) { # Rows |
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
408
|
0
|
|
|
|
|
|
my ($paging_state, $decoder); |
409
|
0
|
0
|
|
|
|
|
eval { ($decoder, $paging_state)= unpack_metadata($_[3]); 1 } or return $callback->("Unable to unpack query metadata: $@"); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
410
|
0
|
|
0
|
|
|
|
$decoder= $prepared->{decoder} || $decoder; |
411
|
|
|
|
|
|
|
|
412
|
0
|
|
|
|
|
|
$callback->(undef, |
413
|
|
|
|
|
|
|
Cassandra::Client::ResultSet->new( |
414
|
|
|
|
|
|
|
\$_[3], |
415
|
|
|
|
|
|
|
$decoder, |
416
|
|
|
|
|
|
|
$paging_state, |
417
|
|
|
|
|
|
|
) |
418
|
|
|
|
|
|
|
); |
419
|
|
|
|
|
|
|
|
420
|
|
|
|
|
|
|
} elsif ($result_type == RESULT_VOID) { # Void |
421
|
0
|
|
|
|
|
|
return $callback->(); |
422
|
|
|
|
|
|
|
|
423
|
|
|
|
|
|
|
} elsif ($result_type == RESULT_SET_KEYSPACE) { # Set_keyspace |
424
|
0
|
|
|
|
|
|
my $new_keyspace= unpack_string($_[3]); |
425
|
0
|
|
|
|
|
|
return $callback->(); |
426
|
|
|
|
|
|
|
|
427
|
|
|
|
|
|
|
} elsif ($result_type == RESULT_SCHEMA_CHANGE) { # Schema change |
428
|
|
|
|
|
|
|
return $self->wait_for_schema_agreement(sub { |
429
|
|
|
|
|
|
|
# We may be passed an error. Ignore it, our query succeeded |
430
|
0
|
|
|
0
|
|
|
$callback->(); |
431
|
0
|
|
|
|
|
|
}); |
432
|
|
|
|
|
|
|
|
433
|
|
|
|
|
|
|
} else { |
434
|
0
|
|
|
|
|
|
return $callback->("Query executed successfully but got an unexpected response type"); |
435
|
|
|
|
|
|
|
} |
436
|
0
|
|
|
|
|
|
return; |
437
|
|
|
|
|
|
|
} |
438
|
|
|
|
|
|
|
|
439
|
|
|
|
|
|
|
sub wait_for_schema_agreement { |
440
|
0
|
|
|
0
|
0
|
|
my ($self, $callback)= @_; |
441
|
|
|
|
|
|
|
|
442
|
0
|
|
|
|
|
|
my $waited= 0; |
443
|
0
|
|
|
|
|
|
my $wait_delay= 0.5; |
444
|
0
|
|
|
|
|
|
my $max_wait= 5; |
445
|
|
|
|
|
|
|
|
446
|
0
|
|
|
|
|
|
my $done; |
447
|
|
|
|
|
|
|
whilst( |
448
|
0
|
|
|
0
|
|
|
sub { !$done }, |
449
|
|
|
|
|
|
|
sub { |
450
|
0
|
|
|
0
|
|
|
my ($whilst_next)= @_; |
451
|
|
|
|
|
|
|
|
452
|
|
|
|
|
|
|
series([ |
453
|
|
|
|
|
|
|
sub { |
454
|
0
|
|
|
|
|
|
my ($next)= @_; |
455
|
0
|
|
|
|
|
|
$self->{async_io}->timer($next, $wait_delay); |
456
|
|
|
|
|
|
|
}, |
457
|
|
|
|
|
|
|
sub { |
458
|
0
|
|
|
|
|
|
my ($next)= @_; |
459
|
0
|
|
|
|
|
|
$waited += $wait_delay; |
460
|
0
|
|
|
|
|
|
$self->get_network_status($next); |
461
|
|
|
|
|
|
|
}, |
462
|
|
|
|
|
|
|
], sub { |
463
|
0
|
|
|
|
|
|
my ($error, $network_status)= @_; |
464
|
0
|
0
|
|
|
|
|
return $whilst_next->($error) if $error; |
465
|
|
|
|
|
|
|
|
466
|
0
|
|
|
|
|
|
my %versions; |
467
|
0
|
|
|
|
|
|
$versions{$_->{schema_version}}= 1 for values %$network_status; |
468
|
0
|
0
|
|
|
|
|
if (keys %versions > 1) { |
469
|
0
|
0
|
|
|
|
|
if ($waited >= $max_wait) { |
470
|
0
|
|
|
|
|
|
return $whilst_next->("wait_for_schema_agreement timed out after $waited seconds"); |
471
|
|
|
|
|
|
|
} |
472
|
|
|
|
|
|
|
} else { |
473
|
0
|
|
|
|
|
|
$done= 1; |
474
|
|
|
|
|
|
|
} |
475
|
0
|
|
|
|
|
|
return $whilst_next->(); |
476
|
0
|
|
|
|
|
|
}); |
477
|
|
|
|
|
|
|
}, |
478
|
0
|
|
|
|
|
|
$callback, |
479
|
|
|
|
|
|
|
); |
480
|
|
|
|
|
|
|
|
481
|
0
|
|
|
|
|
|
return; |
482
|
|
|
|
|
|
|
} |
483
|
|
|
|
|
|
|
|
484
|
|
|
|
|
|
|
|
485
|
|
|
|
|
|
|
|
486
|
|
|
|
|
|
|
###### PROTOCOL CODE |
487
|
|
|
|
|
|
|
sub handshake { |
488
|
0
|
|
|
0
|
0
|
|
my ($self, $callback)= @_; |
489
|
|
|
|
|
|
|
series([ |
490
|
|
|
|
|
|
|
sub { # Send the OPCODE_OPTIONS |
491
|
0
|
|
|
0
|
|
|
my ($next)= @_; |
492
|
0
|
|
|
|
|
|
$self->request($next, OPCODE_OPTIONS, ''); |
493
|
|
|
|
|
|
|
}, |
494
|
|
|
|
|
|
|
sub { # The server hopefully just told us what it supports, let's respond with a STARTUP message |
495
|
0
|
|
|
0
|
|
|
my ($next, $response_code, $body)= @_; |
496
|
0
|
0
|
|
|
|
|
if ($response_code != OPCODE_SUPPORTED) { |
497
|
0
|
|
|
|
|
|
return $next->("Server returned an unexpected handshake"); |
498
|
|
|
|
|
|
|
} |
499
|
|
|
|
|
|
|
|
500
|
0
|
|
|
|
|
|
my $map= unpack_stringmultimap($body); |
501
|
|
|
|
|
|
|
|
502
|
0
|
0
|
0
|
|
|
|
unless ($map->{CQL_VERSION} && $map->{COMPRESSION}) { |
503
|
0
|
|
|
|
|
|
return $next->("Server did not return compression and cql version information"); |
504
|
|
|
|
|
|
|
} |
505
|
|
|
|
|
|
|
|
506
|
0
|
|
|
|
|
|
my $selected_cql_version= $self->{options}{cql_version}; |
507
|
0
|
0
|
|
|
|
|
if (!$selected_cql_version) { |
508
|
0
|
|
|
|
|
|
($selected_cql_version)= reverse sort @{$map->{CQL_VERSION}}; |
|
0
|
|
|
|
|
|
|
509
|
|
|
|
|
|
|
} |
510
|
|
|
|
|
|
|
|
511
|
0
|
|
|
|
|
|
my %ss_compression= map { $_, 1 } @{$map->{COMPRESSION}}; |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
512
|
0
|
|
|
|
|
|
my $selected_compression= $self->{options}{compression}; |
513
|
0
|
0
|
|
|
|
|
if (!$selected_compression) { |
514
|
0
|
|
|
|
|
|
for (@compression_preference) { |
515
|
0
|
0
|
0
|
|
|
|
if ($ss_compression{$_} && $available_compression{$_}) { |
516
|
0
|
|
|
|
|
|
$selected_compression= $_; |
517
|
0
|
|
|
|
|
|
last; |
518
|
|
|
|
|
|
|
} |
519
|
|
|
|
|
|
|
} |
520
|
|
|
|
|
|
|
} |
521
|
0
|
0
|
0
|
|
|
|
$selected_compression= undef if $selected_compression && $selected_compression eq 'none'; |
522
|
|
|
|
|
|
|
|
523
|
0
|
0
|
|
|
|
|
if ($selected_compression) { |
524
|
0
|
0
|
|
|
|
|
if (!$ss_compression{$selected_compression}) { |
525
|
0
|
|
|
|
|
|
return $next->("Server did not support requested compression method <$selected_compression>"); |
526
|
|
|
|
|
|
|
} |
527
|
0
|
0
|
|
|
|
|
if (!$available_compression{$selected_compression}) { |
528
|
0
|
|
|
|
|
|
return $next->("Requested compression method <$selected_compression> is supported by the server but not by us"); |
529
|
|
|
|
|
|
|
} |
530
|
|
|
|
|
|
|
} |
531
|
|
|
|
|
|
|
|
532
|
0
|
0
|
|
|
|
|
my $request_body= pack_stringmap({ |
533
|
|
|
|
|
|
|
CQL_VERSION => $selected_cql_version, |
534
|
|
|
|
|
|
|
($selected_compression ? (COMPRESSION => $selected_compression) : ()), |
535
|
|
|
|
|
|
|
}); |
536
|
|
|
|
|
|
|
|
537
|
0
|
|
|
|
|
|
$self->request($next, OPCODE_STARTUP, $request_body); |
538
|
|
|
|
|
|
|
|
539
|
|
|
|
|
|
|
# This needs to happen after we send the STARTUP message |
540
|
0
|
|
|
|
|
|
$self->setup_compression($selected_compression); |
541
|
|
|
|
|
|
|
}, |
542
|
|
|
|
|
|
|
sub { # By now we should know whether we need to authenticate |
543
|
0
|
|
|
0
|
|
|
my ($next, $response_code, $body)= @_; |
544
|
0
|
0
|
|
|
|
|
if ($response_code == OPCODE_READY) { |
545
|
0
|
|
|
|
|
|
return $next->(undef, $body); # Pass it along |
546
|
|
|
|
|
|
|
} |
547
|
|
|
|
|
|
|
|
548
|
0
|
0
|
|
|
|
|
if ($response_code == OPCODE_AUTHENTICATE) { |
549
|
0
|
|
|
|
|
|
return $self->authenticate($next, unpack_string($body)); |
550
|
|
|
|
|
|
|
} |
551
|
|
|
|
|
|
|
|
552
|
0
|
|
|
|
|
|
return $next->("Unexpected response from the server"); |
553
|
|
|
|
|
|
|
}, |
554
|
|
|
|
|
|
|
sub { |
555
|
0
|
|
|
0
|
|
|
my ($next)= @_; |
556
|
0
|
0
|
|
|
|
|
if ($self->{options}{keyspace}) { |
557
|
0
|
|
|
|
|
|
return $self->execute_prepared($next, \('use "'.$self->{options}{keyspace}.'"')); |
558
|
|
|
|
|
|
|
} |
559
|
0
|
|
|
|
|
|
return $next->(); |
560
|
|
|
|
|
|
|
}, |
561
|
|
|
|
|
|
|
sub { |
562
|
0
|
|
|
0
|
|
|
my ($next)= @_; |
563
|
0
|
0
|
|
|
|
|
if (!$self->{ipaddress}) { |
564
|
0
|
|
|
|
|
|
return $self->get_local_status($next); |
565
|
|
|
|
|
|
|
} |
566
|
0
|
|
|
|
|
|
return $next->(); |
567
|
|
|
|
|
|
|
}, |
568
|
|
|
|
|
|
|
sub { |
569
|
0
|
|
|
0
|
|
|
my ($next, $status)= @_; |
570
|
0
|
0
|
|
|
|
|
if ($status) { |
571
|
0
|
|
|
|
|
|
my ($local)= values %$status; |
572
|
0
|
|
|
|
|
|
$self->{ipaddress}= $local->{peer}; |
573
|
0
|
|
|
|
|
|
$self->{datacenter}= $local->{data_center}; |
574
|
|
|
|
|
|
|
} |
575
|
0
|
0
|
|
|
|
|
if (!$self->{ipaddress}) { |
576
|
0
|
|
|
|
|
|
return $next->("Unable to determine node's IP address"); |
577
|
|
|
|
|
|
|
} |
578
|
0
|
|
|
|
|
|
return $next->(); |
579
|
|
|
|
|
|
|
} |
580
|
0
|
|
|
|
|
|
], $callback); |
581
|
|
|
|
|
|
|
|
582
|
0
|
|
|
|
|
|
return; |
583
|
|
|
|
|
|
|
} |
584
|
|
|
|
|
|
|
|
585
|
|
|
|
|
|
|
sub authenticate { |
586
|
0
|
|
|
0
|
0
|
|
my ($self, $callback, $authenticator)= @_; |
587
|
|
|
|
|
|
|
|
588
|
0
|
|
|
|
|
|
my $user= "$self->{options}{username}"; |
589
|
0
|
|
|
|
|
|
my $pass= "$self->{options}{password}"; |
590
|
0
|
0
|
|
|
|
|
utf8::encode($user) if utf8::is_utf8($user); |
591
|
0
|
0
|
|
|
|
|
utf8::encode($pass) if utf8::is_utf8($pass); |
592
|
|
|
|
|
|
|
|
593
|
0
|
0
|
0
|
|
|
|
if (!$user || !$pass) { |
594
|
0
|
|
|
|
|
|
return $callback->("Server expected authentication using <$authenticator> but no credentials were set"); |
595
|
|
|
|
|
|
|
} |
596
|
|
|
|
|
|
|
|
597
|
|
|
|
|
|
|
series([ |
598
|
|
|
|
|
|
|
sub { |
599
|
0
|
|
|
0
|
|
|
my ($next)= @_; |
600
|
0
|
|
|
|
|
|
my $auth_body= pack_bytes("\0$user\0$pass"); |
601
|
0
|
|
|
|
|
|
$self->request($next, OPCODE_AUTH_RESPONSE, $auth_body); |
602
|
|
|
|
|
|
|
}, |
603
|
|
|
|
|
|
|
sub { |
604
|
0
|
|
|
0
|
|
|
my ($next, $code, $body)= @_; |
605
|
0
|
0
|
|
|
|
|
if ($code == OPCODE_AUTH_SUCCESS) { |
606
|
0
|
|
|
|
|
|
$next->(); |
607
|
|
|
|
|
|
|
} else { |
608
|
0
|
|
|
|
|
|
$next->("Failed to authenticate: unknown error"); |
609
|
|
|
|
|
|
|
} |
610
|
|
|
|
|
|
|
}, |
611
|
0
|
|
|
|
|
|
], $callback); |
612
|
|
|
|
|
|
|
|
613
|
0
|
|
|
|
|
|
return; |
614
|
|
|
|
|
|
|
} |
615
|
|
|
|
|
|
|
|
616
|
|
|
|
|
|
|
sub handle_event { |
617
|
0
|
|
|
0
|
0
|
|
my ($self, $eventdata)= @_; |
618
|
0
|
|
|
|
|
|
my $type= unpack_string($eventdata); |
619
|
0
|
0
|
|
|
|
|
if ($type eq 'TOPOLOGY_CHANGE') { |
|
|
0
|
|
|
|
|
|
620
|
0
|
|
|
|
|
|
my ($change, $ipaddress)= (unpack_string($eventdata), unpack_inet($eventdata)); |
621
|
0
|
|
|
|
|
|
$self->{client}->_handle_topology_change($change, $ipaddress); |
622
|
|
|
|
|
|
|
|
623
|
|
|
|
|
|
|
} elsif ($type eq 'STATUS_CHANGE') { |
624
|
0
|
|
|
|
|
|
my ($change, $ipaddress)= (unpack_string($eventdata), unpack_inet($eventdata)); |
625
|
0
|
|
|
|
|
|
$self->{client}->_handle_status_change($change, $ipaddress); |
626
|
|
|
|
|
|
|
|
627
|
|
|
|
|
|
|
} else { |
628
|
0
|
|
|
|
|
|
warn 'Received unknown event type: '.$type; |
629
|
|
|
|
|
|
|
} |
630
|
|
|
|
|
|
|
} |
631
|
|
|
|
|
|
|
|
632
|
|
|
|
|
|
|
sub get_pool_id { |
633
|
|
|
|
|
|
|
$_[0]{pool_id} |
634
|
0
|
|
|
0
|
0
|
|
} |
635
|
|
|
|
|
|
|
|
636
|
|
|
|
|
|
|
sub set_pool_id { |
637
|
0
|
|
|
0
|
0
|
|
$_[0]{pool_id}= $_[1]; |
638
|
|
|
|
|
|
|
} |
639
|
|
|
|
|
|
|
|
640
|
|
|
|
|
|
|
sub ip_address { |
641
|
|
|
|
|
|
|
$_[0]{ipaddress} |
642
|
0
|
|
|
0
|
0
|
|
} |
643
|
|
|
|
|
|
|
|
644
|
|
|
|
|
|
|
|
645
|
|
|
|
|
|
|
|
646
|
|
|
|
|
|
|
####### IO LOGIC |
647
|
|
|
|
|
|
|
# Start connecting to the configured node.
#
# Opens a non-blocking TCP socket to $self->{host} on $self->{options}{port},
# creates a TLS session first when $self->{options}{tls} is set, registers the
# socket with the async I/O object and then runs the protocol handshake.
#
# $callback is called without arguments when the connection is already
# established; otherwise it receives the error (if any) of this attempt.
sub connect {
    my ($self, $callback)= @_;

    # Already connected: nothing to do.
    return $callback->() if $self->{connected};

    # A connection object may only start connecting once.
    if ($self->{connecting}++) {
        warn "BUG: Calling connect twice?";
        return $callback->("Internal bug: called connect twice.");
    }

    if ($self->{options}{tls}) {
        my $tls_ok= eval {
            $self->{tls}= $self->{client}{tls}->new_conn;
            1;
        };
        unless ($tls_ok) {
            my $error= $@ || "unknown TLS error";
            return $callback->($error);
        }
    }

    my $socket;
    {
        # Keep the constructor's error text out of the caller's $@.
        local $@;

        # A host containing a colon is handled as an IPv6 address; every
        # other host goes through the IPv4 socket class.
        my $socket_class= ($self->{host} =~ /:/)
            ? 'IO::Socket::INET6'
            : 'IO::Socket::INET';

        $socket= $socket_class->new(
            PeerAddr => $self->{host},
            PeerPort => $self->{options}{port},
            Proto    => 'tcp',
            Blocking => 0,
        );

        if (!$socket) {
            my $error= "Could not connect: $@";
            return $callback->($error);
        }

        $socket->setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1);
        $socket->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1);
    }

    my $async_io= $self->{async_io};

    $self->{socket}= $socket;
    $self->{fileno}= $socket->fileno;
    $async_io->register($self->{fileno}, $self);
    $async_io->register_read($self->{fileno});

    # An empty (but defined) write buffer makes request() queue its data
    # instead of writing directly, so nothing is sent before the socket
    # has been reported writable.
    $self->{pending_write}= '';
    $async_io->register_write($self->{fileno});

    if ($self->{options}{tls}) {
        my $ssl= ${$self->{tls}};
        Net::SSLeay::set_fd($ssl, $self->{fileno});
        Net::SSLeay::set_connect_state($ssl);
    }

    $self->handshake(sub {
        my ($error)= @_;

        # The flag is set even for a failed handshake, before shutdown() runs.
        $self->{connected}= 1;
        $self->shutdown("Failed to connect: $error") if $error;

        return $callback->($error);
    });

    return;
}
722
|
|
|
|
|
|
|
|
723
|
|
|
|
|
|
|
# Send one request frame and register $cb for its response.
#
# Arguments: ($self, $cb, $opcode, $body). The body is only ever accessed as
# $_[3] so the (potentially large) blob is never copied; this code assumes
# ownership of it and may compress it in place.
#
# $cb is called with an error object when the connection is shutting down,
# when no free stream id exists, or when the frame could not be sent.
sub request {
    my ($self, $cb, $opcode)= @_;

    if ($self->{shutdown}) {
        return $cb->(Cassandra::Client::Error::Base->new(
            message       => "Connection shutting down",
            request_error => 1,
        ));
    }

    my $pending= $self->{pending_streams};

    # Look for a stream id that is in range and not in use, starting right
    # after the last one handed out and wrapping around at the limit.
    my $stream_id= $self->{last_stream_id} + 1;
    my $attempts= 0;
    while (exists($pending->{$stream_id}) || $stream_id >= STREAM_ID_LIMIT) {
        $stream_id= (++$stream_id) % STREAM_ID_LIMIT;
        if (++$attempts >= STREAM_ID_LIMIT) {
            return $cb->(Cassandra::Client::Error::Base->new(
                message       => "Cannot find a stream ID to post query with",
                request_error => 1,
            ));
        }
    }
    $self->{last_stream_id}= $stream_id;
    $pending->{$stream_id}= [$cb, $self->{async_io}->deadline($self->{fileno}, $stream_id, $self->{request_timeout})];

    # Bodies over 500 bytes are compressed in place when a compressor is
    # configured; flag bit 1 marks the frame as compressed.
    my $flags= 0;
    my $compress_func;
    if (length($_[3]) > 500 && ($compress_func= $self->{compress_func})) {
        $flags |= 1;
        $compress_func->($_[3]);
    }

    # Frame: version byte 3, flags, stream id, opcode, length-prefixed body.
    my $data= pack('CCsCN/a', 3, $flags, $stream_id, $opcode, $_[3]);

    # Data is already queued: append, and let can_write() flush it in order.
    if (defined $self->{pending_write}) {
        $self->{pending_write} .= $data;
        return;
    }

    my $length= length $data;
    my ($failed, $error);

    if ($self->{tls}) {
        my $rv= Net::SSLeay::write(${$self->{tls}}, $data);
        if ($rv != $length) {
            if ($rv > 0) {
                # Partial write: queue whatever is left.
                substr($data, 0, $rv, '');
                $self->{pending_write}= $data;
                $self->{async_io}->register_write($self->{fileno});
            } else {
                $rv= Net::SSLeay::get_error(${$self->{tls}}, $rv);
                if ($rv == ERROR_WANT_WRITE || $rv == ERROR_WANT_READ || $rv == ERROR_NONE) {
                    # Nothing was written, but the condition is retryable.
                    $self->{pending_write}= $data;
                    if ($rv == ERROR_WANT_READ) {
                        # can_read() re-registers the write once it sees WANT_READ.
                        $self->{tls_want_write}= 1;
                    } else {
                        $self->{async_io}->register_write($self->{fileno});
                    }
                } else {
                    $error= Net::SSLeay::ERR_error_string(Net::SSLeay::ERR_get_error());
                    $failed= 1;
                }
            }
        }
    } else {
        my $written= syswrite($self->{socket}, $data, $length);
        unless ($written && $written == $length) {
            if (defined $written || $! == EAGAIN) {
                # Partial write, or the socket would block: queue the rest.
                substr($data, 0, $written, '') if $written;
                $self->{pending_write}= $data;
                $self->{async_io}->register_write($self->{fileno});
            } else {
                # We failed to send out the request; remember why.
                $error= $!;
                $failed= 1;
            }
        }
    }

    return unless $failed;

    # We never actually sent our request, so take it out again ...
    my $my_stream= delete $pending->{$stream_id};

    # ... and disable its deadline.
    ${$my_stream->[1]}= 1;

    $self->shutdown($error);

    # Now fail our stream properly, but include the retry notice.
    $my_stream->[0]->(Cassandra::Client::Error::Base->new(
        message       => "Disconnected: $error",
        do_retry      => 1,
        request_error => 1,
    ));

    return;
}
834
|
|
|
|
|
|
|
|
835
|
|
|
|
|
|
|
# Read callback: pull available bytes into the read buffer, then dispatch
# every complete frame found in it.
#
# A frame is a 9-byte header (version, flags, stream id, opcode, body length)
# followed by the body. Stream id -1 goes to handle_event(); any other id is
# delivered to the callback stored in $self->{pending_streams}.
# A read error or EOF is remembered and turned into a shutdown() once the
# frames that are already buffered have been processed.
sub can_read {
    my ($self)= @_;
    my $shutdown_when_done;
    local *BUFFER= $self->{read_buffer};
    my $bufsize= length $BUFFER;

    READ:
    while (!$self->{shutdown}) {
        my $should_read_more;

        if ($self->{tls}) {
            my ($bytes, $rv)= Net::SSLeay::read(${$self->{tls}});
            if (length $bytes) {
                $BUFFER .= $bytes;
                $bufsize += $rv;
                $should_read_more= 1;
            }

            if ($rv <= 0) {
                my $ssl_error= Net::SSLeay::get_error(${$self->{tls}}, $rv);
                if ($ssl_error == ERROR_WANT_WRITE) {
                    $self->{async_io}->register_write($self->{fileno});
                } elsif ($ssl_error == ERROR_WANT_READ) {
                    # We simply wait for the next read event. If a write was
                    # parked until data could be read, try that write again.
                    if (delete $self->{tls_want_write}) {
                        $self->{async_io}->register_write($self->{fileno});
                    }
                } elsif ($ssl_error != ERROR_NONE) {
                    my $error= Net::SSLeay::ERR_error_string(Net::SSLeay::ERR_get_error());
                    $shutdown_when_done= "TLS error: $error";
                }
            }
        } else {
            my $read_cnt= sysread($self->{socket}, $BUFFER, 16384, $bufsize);
            if (!defined $read_cnt) {
                # EAGAIN just means there is nothing to read right now.
                if ($! != EAGAIN) {
                    my $error= "$!";
                    $shutdown_when_done= $error;
                }
            } elsif ($read_cnt) {
                $bufsize += $read_cnt;
                # A completely filled read suggests more data is waiting.
                $should_read_more= 1 if $read_cnt >= 16384;
            } else {
                # EOF
                $shutdown_when_done= "Disconnected from server";
            }
        }

        FRAME:
        while ($bufsize >= 9) {
            my ($version, $flags, $stream_id, $opcode, $bodylen)= unpack('CCsCN', substr($BUFFER, 0, 9));

            # Body not complete yet; keep the header in the buffer.
            last FRAME if $bufsize < $bodylen+9;

            substr($BUFFER, 0, 9, '');
            my $body= substr($BUFFER, 0, $bodylen, '');
            $bufsize -= 9 + $bodylen;

            # Decompress if needed
            if (($flags & 1) && $body) {
                $self->{decompress_func}->($body);
            }

            if ($stream_id == -1) {
                $self->handle_event($body);
                next FRAME;
            }

            my $stream_cb= delete $self->{pending_streams}{$stream_id};
            if (!$stream_cb) {
                warn 'BUG: received response for unknown stream';
                next FRAME;
            }

            # Writing 1 through the second element disables the stream's deadline.
            my ($cb, $dl)= @$stream_cb;
            $$dl= 1;

            if ($opcode == OPCODE_ERROR) {
                my $error= unpack_errordata($body);
                $cb->($error);
            } else {
                $cb->(undef, $opcode, $body);
            }
        }

        last READ unless $should_read_more;
    }

    if ($shutdown_when_done) {
        $self->shutdown($shutdown_when_done);
    }

    return;
}
939
|
|
|
|
|
|
|
|
940
|
|
|
|
|
|
|
# Write callback: try to flush $self->{pending_write}.
#
# Once the buffer has been written completely it is deleted and the write
# registration is dropped. Fatal errors shut the connection down.
sub can_write {
    my ($self)= @_;

    my $written;
    if ($self->{tls}) {
        $written= Net::SSLeay::write(${$self->{tls}}, $self->{pending_write});
        if ($written <= 0) {
            my $ssl_error= Net::SSLeay::get_error(${$self->{tls}}, $written);

            # WANT_WRITE: wait until the next callback. NONE: nothing to do.
            return if $ssl_error == ERROR_WANT_WRITE || $ssl_error == ERROR_NONE;

            if ($ssl_error == ERROR_WANT_READ) {
                # Unschedule ourselves; can_read() re-registers the write
                # when it finds tls_want_write set.
                $self->{async_io}->unregister_write($self->{fileno});
                $self->{tls_want_write}= 1;
                return;
            }

            my $error= Net::SSLeay::ERR_error_string(Net::SSLeay::ERR_get_error());
            return $self->shutdown("TLS error: $error");
        }
    } else {
        $written= syswrite($self->{socket}, $self->{pending_write});
        if (!defined($written)) {
            # EAGAIN: not writable after all, wait for the next callback.
            return if $! == EAGAIN;

            my $error= "$!";
            return $self->shutdown($error);
        }

        # No idea whether that happens, but guard anyway.
        return if $written == 0;
    }

    # Drop the bytes that went out; stop asking for write events once the
    # buffer is empty.
    substr($self->{pending_write}, 0, $written, '');
    if (!length $self->{pending_write}) {
        $self->{async_io}->unregister_write($self->{fileno});
        delete $self->{pending_write};
    }

    return;
}
993
|
|
|
|
|
|
|
|
994
|
|
|
|
|
|
|
# Deadline callback: fail the request pending on stream $id with a timeout.
#
# The stream entry is replaced by a placeholder (no-op callback, deadline
# flag 0). While that placeholder exists, request() will not hand out the
# same stream id, and can_read() removes it when a response for that id
# arrives.
#
# When no request is pending for $id this is a no-op. Without that guard the
# placeholder would still be installed and the call on the missing callback
# would die.
sub can_timeout {
    my ($self, $id)= @_;

    my $stream= delete $self->{pending_streams}{$id};
    return unless $stream;

    $self->{pending_streams}{$id}= [ sub{}, \(my $zero= 0) ]; # fake it
    $stream->[0]->(Cassandra::Client::Error::Base->new(
        message       => "Request timed out",
        is_timeout    => 1,
        request_error => 1,
    ));

    return;
}
1005
|
|
|
|
|
|
|
|
1006
|
|
|
|
|
|
|
# Tear the connection down and fail everything that is still in flight.
#
# Only the first call does anything. The pending streams are detached and
# their deadlines disabled, the socket is unregistered from the async I/O
# object, the client is told about the disconnect, the socket is closed, and
# finally each detached stream callback receives a "Disconnected" error
# carrying $shutdown_reason.
sub shutdown {
    my ($self, $shutdown_reason)= @_;

    return if $self->{shutdown};
    $self->{shutdown}= 1;

    # Detach the streams first: from here on the connection has none.
    my $pending= $self->{pending_streams};
    $self->{pending_streams}= {};

    # Disable our deadlines
    for my $stream (values %$pending) {
        ${$stream->[1]}= 1;
    }

    my $async_io= $self->{async_io};
    $async_io->unregister_read($self->{fileno});
    if (defined(delete $self->{pending_write})) {
        # A defined write buffer means a write registration is outstanding.
        $async_io->unregister_write($self->{fileno});
    }
    $async_io->unregister($self->{fileno}, $self);

    $self->{client}->_disconnected($self->get_pool_id);
    $self->{socket}->close;

    for my $stream (values %$pending) {
        my $callback= $stream->[0];
        $callback->(Cassandra::Client::Error::Base->new(
            message       => "Disconnected: $shutdown_reason",
            request_error => 1,
        ));
    }

    return;
}
1035
|
|
|
|
|
|
|
|
1036
|
|
|
|
|
|
|
|
1037
|
|
|
|
|
|
|
|
1038
|
|
|
|
|
|
|
###### COMPRESSION |
1039
|
|
|
|
|
|
|
# Probe once, at compile time, which compression modules can be loaded.
BEGIN {
    # Preference list (lz4 before snappy); it is consumed outside this block.
    @compression_preference= qw/lz4 snappy/;

    # A block eval around require loads the module without importing
    # anything, exactly like "use Module ()", and avoids compiling code from
    # a string. scalar() makes a failed eval contribute undef rather than an
    # empty list, which would shift the key/value pairs.
    %available_compression= (
        snappy => scalar(eval { require Compress::Snappy; 1 }),
        lz4    => scalar(eval { require Compress::LZ4; 1 }),
    );
}
1047
|
|
|
|
|
|
|
|
1048
|
|
|
|
|
|
|
# Install the compress/decompress callbacks for the given compression type.
#
# $type is 'snappy' or 'lz4'. A false $type leaves the connection untouched;
# any other value only produces a warning.
sub setup_compression {
    my ($self, $type)= @_;

    return unless $type;

    # type => [ compressor, decompressor ]
    my %codec_for= (
        snappy => [ \&compress_snappy, \&decompress_snappy ],
        lz4    => [ \&compress_lz4,    \&decompress_lz4 ],
    );

    my $codec= $codec_for{$type};
    if (!$codec) {
        warn 'Internal error: failed to set compression';
        return;
    }

    @{$self}{qw/compress_func decompress_func/}= @$codec;

    return;
}
1064
|
|
|
|
|
|
|
|
1065
|
|
|
|
|
|
|
# Compress the caller's buffer in place with Snappy.
# The argument is modified through its @_ alias; nothing is returned.
sub compress_snappy {
    # The loop variable aliases $_[0], so the caller's buffer is replaced
    # without copying it into a lexical first.
    for my $payload ($_[0]) {
        $payload= Compress::Snappy::compress(\$payload);
    }
    return;
}
1069
|
|
|
|
|
|
|
|
1070
|
|
|
|
|
|
|
# Decompress the caller's buffer in place with Snappy.
# A body consisting of a single NUL byte becomes the empty string without
# calling the decompressor. The argument is modified through its @_ alias.
sub decompress_snappy {
    $_[0]= ($_[0] eq "\0")
        ? ''
        : Compress::Snappy::decompress(\$_[0]);
    return;
}
1078
|
|
|
|
|
|
|
|
1079
|
|
|
|
|
|
|
# Compress the caller's buffer in place with LZ4.
# The result is the original length as a 32-bit big-endian integer followed
# by the compressed data. The argument is modified through its @_ alias.
sub compress_lz4 {
    my $length_prefix= pack('N', length($_[0]));
    $_[0]= $length_prefix . Compress::LZ4::lz4_compress(\$_[0]);
    return;
}
1083
|
|
|
|
|
|
|
|
1084
|
|
|
|
|
|
|
# Decompress the caller's buffer in place with LZ4.
# The first four bytes hold the uncompressed length (32-bit big-endian) and
# are stripped off; a length of zero yields the empty string without calling
# the decompressor. The argument is modified through its @_ alias.
sub decompress_lz4 {
    my $raw_length= unpack('N', substr($_[0], 0, 4, ''));
    $_[0]= $raw_length
        ? Compress::LZ4::lz4_decompress(\$_[0], $raw_length)
        : '';
    return;
}
1093
|
|
|
|
|
|
|
|
1094
|
|
|
|
|
|
|
1; |
1095
|
|
|
|
|
|
|
|
1096
|
|
|
|
|
|
|
__END__ |