| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
package ClickHouse::Encoder::TCP; ## no critic (Capitalization) |
|
2
|
2
|
|
|
2
|
|
2010
|
use strict; |
|
|
2
|
|
|
|
|
6
|
|
|
|
2
|
|
|
|
|
77
|
|
|
3
|
2
|
|
|
2
|
|
9
|
use warnings; |
|
|
2
|
|
|
|
|
3
|
|
|
|
2
|
|
|
|
|
143
|
|
|
4
|
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
our $VERSION = '0.01'; |
|
6
|
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
# Encoders/decoders for a useful subset of ClickHouse's native TCP |
|
8
|
|
|
|
|
|
|
# protocol packets. Built for insert pipelines: pack a Hello + Query, |
|
9
|
|
|
|
|
|
|
# then wrap encoded Native blocks in Data packets, then signal |
|
10
|
|
|
|
|
|
|
# end-of-insert. Transport is the caller's job (IO::Socket, |
|
11
|
|
|
|
|
|
|
# AnyEvent::Handle, IO::Async::Stream, etc.). |
|
12
|
|
|
|
|
|
|
# |
|
13
|
|
|
|
|
|
|
# Varint and length-prefixed string codecs are XS (shared with the |
|
14
|
|
|
|
|
|
|
# main encoder's buffer helpers); packet-level layout stays in Perl |
|
15
|
|
|
|
|
|
|
# for readability since it's not a hot path. |
|
16
|
|
|
|
|
|
|
# |
|
17
|
|
|
|
|
|
|
# Targets protocol revision 54429: predates flexible settings, |
|
18
|
|
|
|
|
|
|
# inter-server secret, OpenTelemetry, parallel-replica fields, and |
|
19
|
|
|
|
|
|
|
# the recent chunking negotiation extension. Modern CH servers |
|
20
|
|
|
|
|
|
|
# (protocol revision >= ~54475) handshake additional bytes past |
|
21
|
|
|
|
|
|
|
# Hello that this subset doesn't respond to - prefer HTTP for |
|
22
|
|
|
|
|
|
|
# integration with recent servers. |
|
23
|
|
|
|
|
|
|
|
|
24
|
|
|
|
|
|
|
# XS loader handled by the parent module; ensure it's loaded so the |
|
25
|
|
|
|
|
|
|
# XSUBs under PACKAGE = ClickHouse::Encoder::TCP are available. |
|
26
|
2
|
|
|
2
|
|
22
|
use ClickHouse::Encoder (); |
|
|
2
|
|
|
|
|
2
|
|
|
|
2
|
|
|
|
|
68
|
|
|
27
|
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
## no critic (ProhibitConstantPragma)
# Protocol-defined numeric tags. They are declared with the constant
# pragma rather than Readonly because the module uses them as bare
# identifiers, which reads better at the call sites.

# Protocol revision used whenever the caller does not supply one.
use constant DEFAULT_REVISION => 54429;

# Client packet types
use constant {
    CLIENT_HELLO  => 0,
    CLIENT_QUERY  => 1,
    CLIENT_DATA   => 2,
    CLIENT_CANCEL => 3,
    CLIENT_PING   => 4,
};

# Server packet types
use constant {
    SERVER_HELLO          => 0,
    SERVER_DATA           => 1,
    SERVER_EXCEPTION      => 2,
    SERVER_PROGRESS       => 3,
    SERVER_PONG           => 4,
    SERVER_END_OF_STREAM  => 5,
    SERVER_PROFILE_INFO   => 6,
    SERVER_TOTALS         => 7,
    SERVER_EXTREMES       => 8,
    SERVER_TABLE_COLUMNS  => 11,
    SERVER_PROFILE_EVENTS => 14,
};

# Query processing stages
use constant {
    STAGE_FETCH_COLUMNS  => 0,
    STAGE_WITH_MERGEABLE => 1,
    STAGE_COMPLETE       => 2,
};

# Compression flags in Query packet
use constant {
    COMPRESSION_DISABLE => 0,
    COMPRESSION_ENABLE  => 1,
};
|
|
2
|
|
|
|
|
2
|
|
|
|
2
|
|
|
|
|
3196
|
|
|
61
|
|
|
|
|
|
|
|
|
62
|
|
|
|
|
|
|
# Varint and length-prefixed string codecs are XS-backed; the bound |
|
63
|
|
|
|
|
|
|
# subs (pack_varint, unpack_varint, pack_string, unpack_string) |
|
64
|
|
|
|
|
|
|
# live in PACKAGE = ClickHouse::Encoder::TCP inside Encoder.xs. |
|
65
|
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
|
# ----- client packets ------------------------------------------------ |
|
67
|
|
|
|
|
|
|
|
|
68
|
|
|
|
|
|
|
# Build the client Hello packet, which announces the client's name,
# version, protocol revision and credentials. Called as a class method;
# the invocant itself is not used.
#
#   ClickHouse::Encoder::TCP->pack_hello(
#       client_name => 'ClickHouse::Encoder',   # default
#       major       => 1,                       # default
#       minor       => 0,                       # default
#       revision    => 54429,                   # default (DEFAULT_REVISION)
#       database    => 'default',               # default
#       user        => 'default',               # default
#       password    => '',                      # default
#   );
#
# Returns the encoded packet as a single string.
sub pack_hello {
    shift;    # invocant is not needed
    my %opt = @_;

    # Fields are appended in wire order: packet tag, client identity,
    # version triple, then database and credentials.
    my $packet = pack_varint(CLIENT_HELLO);
    $packet .= pack_string($opt{client_name} // 'ClickHouse::Encoder');
    $packet .= pack_varint($opt{major}       // 1);
    $packet .= pack_varint($opt{minor}       // 0);
    $packet .= pack_varint($opt{revision}    // DEFAULT_REVISION);
    $packet .= pack_string($opt{database}    // 'default');
    $packet .= pack_string($opt{user}        // 'default');
    $packet .= pack_string($opt{password}    // '');

    return $packet;
}
|
90
|
|
|
|
|
|
|
|
|
91
|
|
|
|
|
|
|
# Build a Query packet. Called as a class method with named options;
# the invocant is not used. The body is emitted in this order:
#   varint  CLIENT_QUERY tag
#   string  query_id      (option 'query_id', default '')
#   block   client info   (built by _pack_client_info from the same options)
#   block   settings      (built by _pack_settings from option 'settings')
#   varint  stage         (option 'stage', default STAGE_COMPLETE)
#   varint  compression   (option 'compression', default COMPRESSION_DISABLE)
#   string  query         (option 'query', the SQL text - required)
#
# Dies with "pack_query: 'query' is required" when 'query' is undefined.
# Returns the encoded packet as a single string.
sub pack_query {
    shift;    # invocant is not needed
    my %opt = @_;

    my $packet = pack_varint(CLIENT_QUERY);
    $packet .= pack_string($opt{query_id} // '');
    $packet .= _pack_client_info(\%opt);
    $packet .= _pack_settings($opt{settings});
    $packet .= pack_varint($opt{stage}       // STAGE_COMPLETE);
    $packet .= pack_varint($opt{compression} // COMPRESSION_DISABLE);

    # The SQL text is the last field, so its presence is checked last,
    # after every earlier field has been encoded.
    my $sql = $opt{query};
    die "pack_query: 'query' is required\n" unless defined $sql;

    return $packet . pack_string($sql);
}
|
109
|
|
|
|
|
|
|
|
|
110
|
|
|
|
|
|
|
# Encode the ClientInfo sub-block of a Query packet.
#
# Takes the caller's option hashref and returns the encoded bytes. Every
# field has a default, so an empty hash is acceptable. Two trailing
# fields are gated on the 'revision' option (default DEFAULT_REVISION):
# quota_key is written from revision 54058 and client_version_patch from
# revision 54401. Fields introduced by later protocol revisions are not
# emitted here.
sub _pack_client_info {
    my ($opt) = @_;
    my $revision = $opt->{revision} // DEFAULT_REVISION;

    # query_kind is a single byte; 1 = initial query.
    my $info = chr($opt->{query_kind} // 1);

    # Origin of the query: option name => default value.
    for my $field (
        [ initial_user     => '' ],
        [ initial_query_id => '' ],
        [ initial_address  => '0.0.0.0:0' ],
    ) {
        my ($key, $default) = @{$field};
        $info .= pack_string($opt->{$key} // $default);
    }

    # interface is a single byte; 1 = TCP.
    $info .= chr(1);

    # Identity of the client process: option name => default value.
    for my $field (
        [ os_user         => '' ],
        [ client_hostname => 'localhost' ],
        [ client_name     => 'ClickHouse::Encoder' ],
    ) {
        my ($key, $default) = @{$field};
        $info .= pack_string($opt->{$key} // $default);
    }

    # Client version; the client revision falls back to the protocol
    # revision chosen above.
    $info .= pack_varint($opt->{client_version_major} // 1);
    $info .= pack_varint($opt->{client_version_minor} // 0);
    $info .= pack_varint($opt->{client_revision}      // $revision);

    # Revision-gated trailing fields.
    if ($revision >= 54058) {
        $info .= pack_string($opt->{quota_key} // '');
    }
    if ($revision >= 54401) {
        $info .= pack_varint($opt->{client_version_patch} // 0);
    }

    return $info;
}
|
140
|
|
|
|
|
|
|
|
|
141
|
|
|
|
|
|
|
# Encode the settings block of a Query packet.
#
# Accepted argument shapes:
#   undef    - no settings: only the terminating empty string is written.
#   hashref  - each entry, in sorted key order, is written as
#                string name + varint flags + string value
#              followed by the terminating empty string. flags is always
#              written as 0 ("ordinary setting"; the flag bits are
#              0x01 = IMPORTANT and 0x02 = CUSTOM).
#   other    - returned untouched, so a caller can supply bytes that are
#              already in wire format.
sub _pack_settings {
    my ($settings) = @_;

    # No settings at all: just the end marker.
    return pack_string('') if !defined $settings;

    # Anything that is not a plain hash is taken to be pre-encoded bytes.
    return $settings if ref($settings) ne 'HASH';

    my $encoded = '';
    foreach my $name (sort keys %{$settings}) {
        $encoded .= pack_string($name)
                  . pack_varint(0)                      # flags: ordinary
                  . pack_string($settings->{$name});
    }

    # An empty name terminates the list.
    return $encoded . pack_string('');
}
|
162
|
|
|
|
|
|
|
|
|
163
|
|
|
|
|
|
|
# Build a Data packet around an already encoded Native block.
#
#   ClickHouse::Encoder::TCP->pack_data($block_bytes, %opts);
#
# Options:
#   table_name - written ahead of the block; defaults to ''. It is
#                usually left empty for inserts, since the Query packet
#                already named the target.
#   compress   - when set to anything other than 'none' or 'raw' (for
#                example 'lz4' or 'zstd'), the block is first passed to
#                ClickHouse::Encoder->compress_native_block with that
#                mode, which wraps it in ClickHouse's compressed-block
#                framing (16-byte CityHash128 + 9-byte header +
#                compressed payload). The server must have been told to
#                expect compressed data via the Query packet's
#                compression flag (pack_query's
#                `compression => COMPRESSION_ENABLE`).
#
# Dies with "pack_data: block bytes required" when $block_bytes is
# undefined. Returns the encoded packet as a single string.
sub pack_data {
    my (undef, $block_bytes, %opt) = @_;

    defined $block_bytes
        or die "pack_data: block bytes required\n";

    my $mode          = $opt{compress};
    my $wants_framing = defined $mode && $mode ne 'none' && $mode ne 'raw';

    my $payload = $wants_framing
        ? ClickHouse::Encoder->compress_native_block(
              $block_bytes, mode => $mode)
        : $block_bytes;

    return pack_varint(CLIENT_DATA)
         . pack_string($opt{table_name} // '')
         . $payload;
}
|
182
|
|
|
|
|
|
|
|
|
183
|
|
|
|
|
|
|
# Build the empty Data packet that signals the end of an insert.
#
#   ClickHouse::Encoder::TCP->pack_data_end(%opts);
#
# The payload is an empty Native block (two zero varints: column count
# and row count). The 'compress' and 'table_name' options behave as in
# pack_data: with a compress mode other than 'none' / 'raw' the empty
# block goes through the same compressed-block framing, so the server
# reads it the same way as every other Data packet on the connection.
sub pack_data_end {
    shift;    # invocant is not needed
    my %opt = @_;

    # ncols = 0, nrows = 0
    my $payload = pack_varint(0) . pack_varint(0);

    my $mode        = $opt{compress};
    my $send_as_is  = !defined $mode || $mode eq 'none' || $mode eq 'raw';
    $payload = ClickHouse::Encoder->compress_native_block(
        $payload, mode => $mode) unless $send_as_is;

    return pack_varint(CLIENT_DATA)
         . pack_string($opt{table_name} // '')
         . $payload;
}
|
199
|
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
# Build a Ping packet: the CLIENT_PING tag as a varint and nothing else.
# Arguments (including the invocant) are ignored.
sub pack_ping {
    my $tag = CLIENT_PING;
    return pack_varint($tag);
}
|
203
|
|
|
|
|
|
|
|
|
204
|
|
|
|
|
|
|
# Build a Cancel packet: the CLIENT_CANCEL tag as a varint and nothing
# else. Arguments (including the invocant) are ignored.
sub pack_cancel {
    my $tag = CLIENT_CANCEL;
    return pack_varint($tag);
}
|
207
|
|
|
|
|
|
|
|
|
208
|
|
|
|
|
|
|
# ----- server packet decoder ----------------------------------------- |
|
209
|
|
|
|
|
|
|
|
|
210
|
|
|
|
|
|
|
# Parse one server packet out of $bytes, starting at $offset (default 0).
#
#   my ($pkt, $next) = ClickHouse::Encoder::TCP->unpack_packet($bytes, $offset);
#
# Returns (\%packet, $new_offset). %packet always carries 'type' (the
# numeric packet tag); the remaining keys depend on the type:
#   SERVER_HELLO          name, major, minor, revision, plus timezone /
#                         display_name / version_patch when the reported
#                         revision is high enough
#   SERVER_EXCEPTION      code, name, message, stack_trace, has_nested
#   SERVER_PROGRESS       rows, bytes, total_rows, written_rows,
#                         written_bytes
#   SERVER_PONG,
#   SERVER_END_OF_STREAM  no payload
#   SERVER_PROFILE_INFO   rows, blocks, rows_bytes, applied_limit,
#                         rows_before_limit, calculated_rows_before_limit
#   SERVER_DATA, SERVER_TOTALS, SERVER_EXTREMES, SERVER_PROFILE_EVENTS
#                         table_name, block_offset
#   SERVER_TABLE_COLUMNS  table_name, column_descriptor
#
# For the Data-shaped packets the Native block itself is NOT parsed: the
# returned offset equals block_offset, and the caller is expected to run
# ClickHouse::Encoder->decode_block on the bytes from there and advance
# by the number of bytes that decoder reports as consumed.
#
# Dies with "unpack_packet: truncated at offset N" when a fixed-width
# field runs past the end of $bytes, and with "unknown server packet
# type" for any other tag. The XS varint/string readers are expected to
# raise their own truncation errors (read_packet matches on the word
# "truncated") - confirm against the XS source.
sub unpack_packet {
    my (undef, $bytes, $offset) = @_;
    $offset //= 0;

    # Small readers that decode one value at the cursor and advance it.
    my $varint = sub {
        (my $value, $offset) = unpack_varint($bytes, $offset);
        return $value;
    };
    my $string = sub {
        (my $value, $offset) = unpack_string($bytes, $offset);
        return $value;
    };
    my $byte = sub {
        die "unpack_packet: truncated at offset $offset"
            if $offset >= length $bytes;
        return ord substr($bytes, $offset++, 1);
    };

    my $type = $varint->();
    my %pkt  = (type => $type);

    if ($type == SERVER_HELLO) {
        $pkt{name}     = $string->();
        $pkt{major}    = $varint->();
        $pkt{minor}    = $varint->();
        $pkt{revision} = $varint->();

        # Optional fields are gated on the revision the server just
        # reported, never on "are there bytes left": a buffer that ends
        # mid-Hello must fail as truncated so the reader fetches more,
        # rather than silently dropping a field that has not arrived yet.
        # Thresholds follow ClickHouse's DBMS_MIN_REVISION_WITH_* values.
        my $server_rev = $pkt{revision};
        $pkt{timezone}      = $string->() if $server_rev >= 54058;
        $pkt{display_name}  = $string->() if $server_rev >= 54372;
        $pkt{version_patch} = $varint->() if $server_rev >= 54401;
    }
    elsif ($type == SERVER_EXCEPTION) {
        # code (Int32, little-endian), name, message, stack_trace,
        # has_nested (one byte).
        die "unpack_packet: truncated at offset $offset"
            if $offset + 4 > length $bytes;
        $pkt{code} = unpack 'l<', substr($bytes, $offset, 4);
        $offset += 4;

        $pkt{$_} = $string->() for qw(name message stack_trace);
        $pkt{has_nested} = $byte->();
    }
    elsif ($type == SERVER_PROGRESS) {
        # Exactly five varints, the layout at the targeted revision
        # (54429). Later revisions append more fields, but a Progress
        # packet carries no revision of its own; reading extra fields
        # "if bytes remain" would eat the start of the next packet when
        # the buffer already holds one.
        for my $field (qw(rows bytes total_rows written_rows written_bytes)) {
            $pkt{$field} = $varint->();
        }
    }
    elsif ($type == SERVER_PONG || $type == SERVER_END_OF_STREAM) {
        # Tag only, no payload.
    }
    elsif ($type == SERVER_PROFILE_INFO) {
        $pkt{rows}                         = $varint->();
        $pkt{blocks}                       = $varint->();
        $pkt{rows_bytes}                   = $varint->();
        $pkt{applied_limit}                = $byte->();
        $pkt{rows_before_limit}            = $varint->();
        $pkt{calculated_rows_before_limit} = $byte->();
    }
    elsif ($type == SERVER_DATA
        || $type == SERVER_TOTALS
        || $type == SERVER_EXTREMES
        || $type == SERVER_PROFILE_EVENTS) {
        # Data-shaped packet: a table name followed by a Native block.
        # Only the table name is consumed; block_offset marks where the
        # block begins so the caller can decode it.
        $pkt{table_name}   = $string->();
        $pkt{block_offset} = $offset;
    }
    elsif ($type == SERVER_TABLE_COLUMNS) {
        $pkt{table_name}        = $string->();
        $pkt{column_descriptor} = $string->();
    }
    else {
        die "unpack_packet: unknown server packet type $type\n";
    }

    return (\%pkt, $offset);
}
|
315
|
|
|
|
|
|
|
|
|
316
|
|
|
|
|
|
|
# Blocking read helper for IO::Socket / IO::Handle style filehandles.
#
#   my $pkt = ClickHouse::Encoder::TCP->read_packet($fh, %opts);
#
# Pulls bytes with sysread (up to 4 KiB per call) until unpack_packet can
# parse one whole packet, then returns the packet hashref. The packets
# are size-self-describing, so the parser decides when enough bytes have
# arrived; nothing here tries to block-read a fixed N bytes that the
# server may not have sent yet.
#
# For Data-shaped packets (those where unpack_packet sets block_offset)
# the inner Native block is read as well: $pkt->{block} holds the result
# of ClickHouse::Encoder->decode_block and $pkt->{block_bytes} the plain
# block bytes. In compressed mode $pkt->{compressed_consumed} records the
# size of the compressed frame.
#
# Options:
#   compressed => BOOL
#       The inner Native block is wrapped in ClickHouse's compressed-block
#       framing (16-byte CityHash128 + 9-byte header + LZ4/ZSTD/raw
#       payload). This must match what was negotiated through
#       pack_query(... compression => COMPRESSION_ENABLE); reading a
#       compressed connection without it misparses the block and fails
#       with a decode error rather than a clean diagnostic.
#   buffer => \$scalar
#       Caller-owned carry-over buffer. Thread the same reference through
#       every read_packet call on a filehandle: a server can put several
#       packets into one TCP segment, and the bytes read past the current
#       packet are left in this buffer for the next call. The scalar may
#       start out undefined (`buffer => \my $buf`). Without this option
#       read_packet is a one-shot helper: over-read bytes are dropped, so
#       it is unsafe to call in a loop.
#
# Dies with "read_packet: read error: ..." when sysread fails, with
# "read_packet: connection closed mid-packet" on EOF before a complete
# packet, and re-throws any parse error that does not look like a
# truncation.
sub read_packet {
    my (undef, $fh, %opts) = @_;
    my $compressed = $opts{compressed};
    my $bufref     = $opts{buffer};

    # Start from the caller's carry-over bytes, if any. On the first call
    # of the documented `buffer => \my $buf` idiom the referenced scalar
    # is still undefined; treat that as an empty buffer so the parser is
    # never handed undef.
    my $buf = $bufref ? ($$bufref // '') : '';

    # Append whatever one sysread delivers; a failed read or EOF is fatal
    # because this is only called when a packet is known to be incomplete.
    my $read_some = sub {
        my $got = sysread $fh, my $chunk, 4096;
        die "read_packet: read error: $!\n" if !defined $got;
        die "read_packet: connection closed mid-packet\n" if $got == 0;
        $buf .= $chunk;
        return;
    };

    my $pkt;
    my $end;    # offset just past the consumed packet

    # Phase 1: the packet header and fixed fields. Re-parse from the
    # start after every read until the parser stops reporting truncation.
    while (1) {
        my $ok = eval {
            ($pkt, $end) = __PACKAGE__->unpack_packet($buf, 0);
            1;
        };
        last if $ok;
        die $@ unless $@ =~ /truncated/;
        $read_some->();
    }

    # Phase 2: for Data-shaped packets, also pull the inner Native block.
    # The decoder reports how many bytes it consumed; while the block is
    # still incomplete, read more and retry.
    if (exists $pkt->{block_offset}) {
        require ClickHouse::Encoder;
        if ($compressed) {
            # Decompress the frame that starts at block_offset, then
            # decode the Native block out of the plain bytes.
            while (1) {
                my $ok = eval {
                    my ($plain, $consumed) =
                        ClickHouse::Encoder->decompress_native_block(
                            $buf, offset => $pkt->{block_offset});
                    my $block = ClickHouse::Encoder->decode_block($plain);
                    $pkt->{block}               = $block;
                    $pkt->{block_bytes}         = $plain;    # decompressed bytes
                    $pkt->{compressed_consumed} = $consumed;
                    1;
                };
                last if $ok;
                # These patterns are the errors that mean "more bytes are
                # needed" (truncated header, frame extending past the end
                # of the buffer, truncated inner block); anything else is
                # a real failure.
                die $@ unless $@ =~ /truncated|extends past|need \d+ more/;
                $read_some->();
            }
            $end = $pkt->{block_offset} + $pkt->{compressed_consumed};
        }
        else {
            while (1) {
                my $ok = eval {
                    my $block = ClickHouse::Encoder->decode_block(
                        substr($buf, $pkt->{block_offset}));
                    $pkt->{block}       = $block;
                    $pkt->{block_bytes} = substr(
                        $buf, $pkt->{block_offset}, $block->{consumed});
                    1;
                };
                last if $ok;
                die $@ unless $@ =~ /truncated/;
                $read_some->();
            }
            $end = $pkt->{block_offset} + length $pkt->{block_bytes};
        }
    }

    # Hand any over-read bytes back to the caller's buffer so a looping
    # caller does not lose packets that the same sysread happened to pull
    # in. Without a caller buffer those bytes are dropped (one-shot mode).
    $$bufref = substr($buf, $end) if $bufref;

    return $pkt;
}
|
417
|
|
|
|
|
|
|
|
|
418
|
|
|
|
|
|
|
1; |
|
419
|
|
|
|
|
|
|
|
|
420
|
|
|
|
|
|
|
__END__ |