File Coverage

blib/lib/ClickHouse/Encoder/TCP.pm
Criterion Covered Total %
statement 195 207 94.2
branch 52 68 76.4
condition 48 76 63.1
subroutine 36 36 100.0
pod 8 8 100.0
total 339 395 85.8


line stmt bran cond sub pod time code
1             package ClickHouse::Encoder::TCP; ## no critic (Capitalization)
2 2     2   2010 use strict;
  2         6  
  2         77  
3 2     2   9 use warnings;
  2         3  
  2         143  
4              
5             our $VERSION = '0.01';
6              
7             # Encoders/decoders for a useful subset of ClickHouse's native TCP
8             # protocol packets. Built for insert pipelines: pack a Hello + Query,
9             # then wrap encoded Native blocks in Data packets, then signal
10             # end-of-insert. Transport is the caller's job (IO::Socket,
11             # AnyEvent::Handle, IO::Async::Stream, etc.).
12             #
13             # Varint and length-prefixed string codecs are XS (shared with the
14             # main encoder's buffer helpers); packet-level layout stays in Perl
15             # for readability since it's not a hot path.
16             #
17             # Targets protocol revision 54429: predates flexible settings,
18             # inter-server secret, OpenTelemetry, parallel-replica fields, and
19             # the recent chunking negotiation extension. Modern CH servers
20             # (protocol revision >= ~54475) handshake additional bytes past
21             # Hello that this subset doesn't respond to - prefer HTTP for
22             # integration with recent servers.
23              
24             # XS loader handled by the parent module; ensure it's loaded so the
25             # XSUBs under PACKAGE = ClickHouse::Encoder::TCP are available.
26 2     2   22 use ClickHouse::Encoder ();
  2         2  
  2         68  
27              
28             ## no critic (ProhibitConstantPragma)
29             # Readability beats Readonly here - these are protocol-defined
30             # numeric tags, used as bare identifiers throughout the module.
31 2     2   7 use constant DEFAULT_REVISION => 54429;
  2         2  
  2         167  
32              
33             # Client packet types
34 2     2   9 use constant CLIENT_HELLO => 0;
  2         3  
  2         79  
35 2     2   7 use constant CLIENT_QUERY => 1;
  2         2  
  2         61  
36 2     2   5 use constant CLIENT_DATA => 2;
  2         11  
  2         58  
37 2     2   5 use constant CLIENT_CANCEL => 3;
  2         2  
  2         61  
38 2     2   5 use constant CLIENT_PING => 4;
  2         2  
  2         50  
39              
40             # Server packet types
41 2     2   6 use constant SERVER_HELLO => 0;
  2         1  
  2         67  
42 2     2   5 use constant SERVER_DATA => 1;
  2         8  
  2         59  
43 2     2   6 use constant SERVER_EXCEPTION => 2;
  2         1  
  2         68  
44 2     2   4 use constant SERVER_PROGRESS => 3;
  2         2  
  2         47  
45 2     2   4 use constant SERVER_PONG => 4;
  2         2  
  2         82  
46 2     2   6 use constant SERVER_END_OF_STREAM => 5;
  2         2  
  2         54  
47 2     2   10 use constant SERVER_PROFILE_INFO => 6;
  2         2  
  2         53  
48 2     2   6 use constant SERVER_TOTALS => 7;
  2         2  
  2         53  
49 2     2   4 use constant SERVER_EXTREMES => 8;
  2         3  
  2         1546  
50 2     2   9 use constant SERVER_TABLE_COLUMNS => 11;
  2         2  
  2         60  
51 2     2   5 use constant SERVER_PROFILE_EVENTS => 14;
  2         2  
  2         56  
52              
53             # Query processing stages
54 2     2   12 use constant STAGE_FETCH_COLUMNS => 0;
  2         3  
  2         51  
55 2     2   5 use constant STAGE_WITH_MERGEABLE => 1;
  2         2  
  2         70  
56 2     2   6 use constant STAGE_COMPLETE => 2;
  2         2  
  2         61  
57              
58             # Compression flags in Query packet
59 2     2   6 use constant COMPRESSION_DISABLE => 0;
  2         1  
  2         56  
60 2     2   6 use constant COMPRESSION_ENABLE => 1;
  2         2  
  2         3196  
61              
62             # Varint and length-prefixed string codecs are XS-backed; the bound
63             # subs (pack_varint, unpack_varint, pack_string, unpack_string)
64             # live in PACKAGE = ClickHouse::Encoder::TCP inside Encoder.xs.
65              
66             # ----- client packets ------------------------------------------------
67              
68             # Hello packet: announces our protocol revision and credentials.
69             #
70             # pack_hello(
71             # client_name => 'ClickHouse::Encoder', # default
72             # major => 1, # default
73             # minor => 0, # default
74             # revision => 54429, # default
75             # database => 'default',
76             # user => 'default',
77             # password => '',
78             # );
79             sub pack_hello {
80 1     1 1 131308 my (undef, %o) = @_;
81             return pack_varint(CLIENT_HELLO)
82             . pack_string($o{client_name} // 'ClickHouse::Encoder')
83             . pack_varint($o{major} // 1)
84             . pack_varint($o{minor} // 0)
85             . pack_varint($o{revision} // DEFAULT_REVISION)
86             . pack_string($o{database} // 'default')
87             . pack_string($o{user} // 'default')
88 1   50     47 . pack_string($o{password} // '');
      50        
      50        
      50        
      50        
      50        
      50        
89             }
90              
91             # Query packet. At revision 54429 the body is:
92             # varint CLIENT_QUERY (=1)
93             # string query_id (often empty so the server generates one)
94             # (ClientInfo block - see _pack_client_info)
95             # string settings (\n-separated key=value pairs, empty for none)
96             # varint query_processing_stage (default Complete)
97             # varint compression flag (default Disable)
98             # string query SQL text
99             sub pack_query {
100 12     12 1 4465 my (undef, %o) = @_;
101             return pack_varint(CLIENT_QUERY)
102             . pack_string($o{query_id} // '')
103             . _pack_client_info(\%o)
104             . _pack_settings($o{settings})
105             . pack_varint($o{stage} // STAGE_COMPLETE)
106             . pack_varint($o{compression} // COMPRESSION_DISABLE)
107 12   100     60 . pack_string($o{query} // die "pack_query: 'query' is required\n");
      100        
      50        
      100        
108             }
109              
110             # ClientInfo subblock. At revision 54429 the fields after the
111             # fixed prefix are: quota_key (since 54058), version_patch (since
112             # 54401). We use those defaults; later-revision fields (interserver
113             # secret, OpenTelemetry trace context, parallel-replica metadata)
114             # would need a higher client revision to be negotiated.
115             sub _pack_client_info {
116 12     12   12 my $o = shift;
117 12   100     47 my $rev = $o->{revision} // DEFAULT_REVISION;
118 12         12 my $out = '';
119 12   50     31 $out .= chr($o->{query_kind} // 1); # 1 = initial query
120 12   50     28 $out .= pack_string($o->{initial_user} // '');
121 12   50     970 $out .= pack_string($o->{initial_query_id} // '');
122 12   50     33 $out .= pack_string($o->{initial_address} // '0.0.0.0:0');
123             # interface: 1 = TCP
124 12         13 $out .= chr(1);
125             # os_user, client_hostname, client_name
126 12   50     24 $out .= pack_string($o->{os_user} // '');
127 12   50     30 $out .= pack_string($o->{client_hostname} // 'localhost');
128 12   50     28 $out .= pack_string($o->{client_name} // 'ClickHouse::Encoder');
129             # client_version_{major,minor}, client_revision
130 12   50     67 $out .= pack_varint($o->{client_version_major} // 1);
131 12   50     30 $out .= pack_varint($o->{client_version_minor} // 0);
132 12   33     29 $out .= pack_varint($o->{client_revision} // $rev);
133             # quota_key (rev >= 54058)
134 12 100 50     34 $out .= pack_string($o->{quota_key} // '') if $rev >= 54058;
135             # client_version_patch (rev >= 54401)
136 12 100 50     37 $out .= pack_varint($o->{client_version_patch} // 0)
137             if $rev >= 54401;
138 12         29 return $out;
139             }
140              
141             # Settings block. At revision >= 54429 each entry is:
142             # string name + varint flags + string value
143             # terminated by an empty-key string. flags is a bitmask where 0 means
144             # "ordinary setting"; bit 0x01 = IMPORTANT, bit 0x02 = CUSTOM. We
145             # always emit flags=0 - the server still accepts any setting at flag 0.
146             sub _pack_settings {
147 13     13   3922 my $s = shift;
148 13 100       79 return pack_string('') unless defined $s;
149 4 100       8 if (ref $s eq 'HASH') {
150 3         4 my $body = '';
151 3         9 for my $k (sort keys %$s) {
152 4         11 $body .= pack_string($k);
153 4         6 $body .= pack_varint(0); # flags: ordinary
154 4         10 $body .= pack_string($s->{$k});
155             }
156 3         4 $body .= pack_string(''); # end marker
157 3         16 return $body;
158             }
159             # Allow caller to pass raw bytes (already in the right shape).
160 1         6 return $s;
161             }
162              
163             # Data packet wrapping a Native block. table_name is usually empty
164             # for inserts; the server already knows the target from the Query.
165             # With compress => 'lz4' (or 'zstd') the block is wrapped in CH's
166             # compressed-block framing (16-byte CityHash128 + 9-byte header +
167             # compressed payload) before being placed on the wire; the server
168             # must have been told to expect compressed data via the Query packet's
169             # compression flag (see pack_query's `compression =>
170             # COMPRESSION_ENABLE`).
171             sub pack_data {
172 4     4 1 10711 my (undef, $block_bytes, %o) = @_;
173 4 100       15 die "pack_data: block bytes required\n" unless defined $block_bytes;
174 3 50 66     20 if (defined $o{compress} && $o{compress} ne 'none' && $o{compress} ne 'raw') {
      66        
175             $block_bytes = ClickHouse::Encoder->compress_native_block(
176 2         43 $block_bytes, mode => $o{compress});
177             }
178             return pack_varint(CLIENT_DATA)
179 3   50     25 . pack_string($o{table_name} // '')
180             . $block_bytes;
181             }
182              
183             # Empty data packet: signals end of insert. Even with negotiated
184             # compression, the end-of-insert empty block goes through the same
185             # compressed-block framing so the server's CompressedReadBuffer parses
186             # it the same way as every other Data packet.
187             sub pack_data_end {
188 3     3 1 2116 my (undef, %o) = @_;
189             # A "data" packet with an empty Native block: ncols=0, nrows=0.
190 3         13 my $empty_block = pack_varint(0) . pack_varint(0); # ncols + nrows
191 3 50 66     19 if (defined $o{compress} && $o{compress} ne 'none' && $o{compress} ne 'raw') {
      66        
192             $empty_block = ClickHouse::Encoder->compress_native_block(
193 2         7 $empty_block, mode => $o{compress});
194             }
195             return pack_varint(CLIENT_DATA)
196 3   50     22 . pack_string($o{table_name} // '')
197             . $empty_block;
198             }
199              
200             sub pack_ping {
201 1     1 1 485 return pack_varint(CLIENT_PING);
202             }
203              
204             sub pack_cancel {
205 1     1 1 20 return pack_varint(CLIENT_CANCEL);
206             }
207              
208             # ----- server packet decoder -----------------------------------------
209              
210             # Parse one server packet from $bytes starting at $offset. Returns
211             # (\%packet, $new_offset). %packet always has 'type' (numeric);
212             # payload fields depend on type. Croaks on truncation; caller can
213             # pre-screen by ensuring at least 1 byte is available.
214             sub unpack_packet {
215 24     24 1 31367 my (undef, $bytes, $offset) = @_;
216 24   50     47 $offset //= 0;
217 24         108 (my $type, $offset) = unpack_varint($bytes, $offset);
218 20         39 my %pkt = (type => $type);
219 20 100 100     113 if ($type == SERVER_HELLO) {
    100 100        
    100 100        
    100          
    100          
    100          
    100          
    100          
220 4         9 ($pkt{name}, $offset) = unpack_string($bytes, $offset);
221 4         9 ($pkt{major}, $offset) = unpack_varint($bytes, $offset);
222 4         5 ($pkt{minor}, $offset) = unpack_varint($bytes, $offset);
223 4         8 ($pkt{revision}, $offset) = unpack_varint($bytes, $offset);
224             # Newer revisions add timezone / display_name / version_patch.
225             # Gate on the revision the server just reported, not on "are
226             # there bytes left": a buffer that ends mid-Hello must croak
227             # 'truncated' so the reader fetches more, not silently drop a
228             # field whose bytes simply have not arrived yet. Thresholds
229             # match ClickHouse's DBMS_MIN_REVISION_WITH_* constants.
230 4 100       8 if ($pkt{revision} >= 54058) {
231 3         11 ($pkt{timezone}, $offset)
232             = unpack_string($bytes, $offset);
233             }
234 3 100       6 if ($pkt{revision} >= 54372) {
235 2         9 ($pkt{display_name}, $offset)
236             = unpack_string($bytes, $offset);
237             }
238 3 100       5 if ($pkt{revision} >= 54401) {
239 2         5 ($pkt{version_patch}, $offset)
240             = unpack_varint($bytes, $offset);
241             }
242             }
243             elsif ($type == SERVER_EXCEPTION) {
244             # ExceptionPacket: code(Int32 LE), name, message, stack_trace, has_nested(byte)
245 1 50       4 die "unpack_packet: truncated at offset $offset"
246             if $offset + 4 > length $bytes;
247 1         5 $pkt{code} = unpack 'l<', substr($bytes, $offset, 4);
248 1         2 $offset += 4;
249 1         5 ($pkt{name}, $offset) = unpack_string($bytes, $offset);
250 1         3 ($pkt{message}, $offset) = unpack_string($bytes, $offset);
251 1         3 ($pkt{stack_trace}, $offset) = unpack_string($bytes, $offset);
252 1 50       3 die "unpack_packet: truncated at offset $offset"
253             if $offset >= length $bytes;
254 1         3 $pkt{has_nested} = ord substr($bytes, $offset++, 1);
255             }
256             elsif ($type == SERVER_PROGRESS) {
257             # At the targeted protocol revision (54429) a Progress packet
258             # is exactly these five varints. Later revisions append fields
259             # (elapsed_ns, ...), but the Progress packet carries no
260             # revision of its own, so the only safe stopping point without
261             # threading the negotiated revision through is the end of the
262             # 54429 layout. Reading a trailing field "if bytes remain"
263             # would, on a buffer that already holds the next packet,
264             # consume that packet's bytes instead.
265 2         5 ($pkt{rows}, $offset) = unpack_varint($bytes, $offset);
266 2         15 ($pkt{bytes}, $offset) = unpack_varint($bytes, $offset);
267 2         6 ($pkt{total_rows}, $offset) = unpack_varint($bytes, $offset);
268 2         4 ($pkt{written_rows}, $offset) = unpack_varint($bytes, $offset);
269 2         8 ($pkt{written_bytes}, $offset) = unpack_varint($bytes, $offset);
270             }
271             elsif ($type == SERVER_PONG || $type == SERVER_END_OF_STREAM) {
272             # No payload.
273             }
274             elsif ($type == SERVER_PROFILE_INFO) {
275 1         4 ($pkt{rows}, $offset) = unpack_varint($bytes, $offset);
276 1         4 ($pkt{blocks}, $offset) = unpack_varint($bytes, $offset);
277 1         2 ($pkt{rows_bytes}, $offset) = unpack_varint($bytes, $offset);
278 1 50       7 die "unpack_packet: truncated at offset $offset"
279             if $offset >= length $bytes;
280 1         3 $pkt{applied_limit} = ord substr($bytes, $offset++, 1);
281 1         4 ($pkt{rows_before_limit}, $offset)
282             = unpack_varint($bytes, $offset);
283 1 50       2 die "unpack_packet: truncated at offset $offset"
284             if $offset >= length $bytes;
285             $pkt{calculated_rows_before_limit}
286 1         3 = ord substr($bytes, $offset++, 1);
287             }
288             elsif ($type == SERVER_DATA || $type == SERVER_TOTALS
289             || $type == SERVER_EXTREMES) {
290             # Data packet: table_name then a Native block payload.
291             # We surface the table name and the byte range of the block;
292             # the caller passes those bytes to ClickHouse::Encoder->decode_block.
293 3         10 ($pkt{table_name}, $offset) = unpack_string($bytes, $offset);
294 3         4 $pkt{block_offset} = $offset;
295             # Caller decodes from here; advancing $offset requires parsing
296             # the inner block (ncols + nrows + per-column bytes). For the
297             # caller's convenience we don't attempt that here - they should
298             # use ClickHouse::Encoder->decode_block on the slice and add
299             # block_consumed = $decoded->{consumed} to advance.
300             }
301             elsif ($type == SERVER_TABLE_COLUMNS) {
302 1         5 ($pkt{table_name}, $offset) = unpack_string($bytes, $offset);
303 1         3 ($pkt{column_descriptor}, $offset) = unpack_string($bytes, $offset);
304             }
305             elsif ($type == SERVER_PROFILE_EVENTS) {
306             # Same shape as a Data packet.
307 1         5 ($pkt{table_name}, $offset) = unpack_string($bytes, $offset);
308 1         1 $pkt{block_offset} = $offset;
309             }
310             else {
311 1         6 die "unpack_packet: unknown server packet type $type\n";
312             }
313 18         67 return (\%pkt, $offset);
314             }
315              
316             # Blocking read helper for IO::Socket / IO::Handle. Pulls bytes until
317             # one full packet parses, returns the packet hashref. For
318             # Data/Totals/Extremes/ProfileEvents packets the inner Native block
319             # bytes are read into $pkt->{block_bytes} (the raw slice ready for
320             # ClickHouse::Encoder->decode_block); $pkt->{block} carries the
321             # pre-decoded block hashref.
322             #
323             # Reads whatever sysread returns (up to 4 KiB at a time): the
324             # protocol packets are size-self-describing, so we let unpack_packet
325             # tell us when it has enough bytes. Avoids the trap of trying to
326             # block-read N bytes when the server has only sent K < N.
327             sub read_packet {
328 5     5 1 5230 my (undef, $fh, %opts) = @_;
329             # When `compressed => 1` is passed, the inner Native block of any
330             # Data-shaped packet is expected to be wrapped in CH's
331             # compressed-block framing (16-byte CityHash128 + 9-byte header +
332             # LZ4/ZSTD/raw payload). This must match what the caller negotiated
333             # in pack_query(... compression => COMPRESSION_ENABLE); reading
334             # without `compressed => 1` from a connection that DOES use
335             # compression will misparse the Native block and croak with a
336             # decode error rather than a clean diagnostic.
337 5         10 my $compressed = $opts{compressed};
338             # Optional caller-owned buffer: pass `buffer => \my $buf` and thread
339             # the same ref through every read_packet call on this filehandle.
340             # A fast server can pack several packets into one TCP segment;
341             # read_packet then leaves the bytes it over-read in that buffer for
342             # the next call. Without it, read_packet is a one-shot helper and
343             # over-read bytes are lost - unsafe to call in a loop.
344 5         6 my $bufref = $opts{buffer};
345 5 100       10 my $buf = $bufref ? $$bufref : '';
346             my $read_some = sub {
347 4     4   25 my $got = sysread $fh, my $chunk, 4096;
348 4 50       6 die "read_packet: read error: $!\n" if !defined $got;
349 4 100       14 die "read_packet: connection closed mid-packet\n" if $got == 0;
350 3         4 $buf .= $chunk;
351 5         18 };
352              
353 5         8 my $pkt;
354             my $end; # offset just past the consumed packet
355 5         5 while (1) {
356 8         7 my $ok = eval {
357 8         14 ($pkt, $end) = __PACKAGE__->unpack_packet($buf, 0);
358 4         4 1;
359             };
360 8 100       17 last if $ok;
361 4 50       14 die $@ unless $@ =~ /truncated/;
362 4         5 $read_some->();
363             }
364              
365             # For Data-shaped packets, also pull the inner Native block bytes.
366             # decode_block tells us how many bytes it consumed; if the block
367             # is still truncated, read more and retry.
368 4 100       7 if (exists $pkt->{block_offset}) {
369 1         4 require ClickHouse::Encoder;
370 1 50       3 if ($compressed) {
371             # Step 1: pull the compressed-block frame (16 hash + 9 hdr
372             # + payload). Read more bytes until the frame is complete.
373 1         2 while (1) {
374 1         1 my $ok = eval {
375             my ($plain, $consumed) =
376             ClickHouse::Encoder->decompress_native_block(
377 1         5 $buf, offset => $pkt->{block_offset});
378             # Decode the inner Native block out of $plain.
379 1         30 my $b = ClickHouse::Encoder->decode_block($plain);
380 1         2 $pkt->{block} = $b;
381 1         2 $pkt->{block_bytes} = $plain; # decompressed bytes
382 1         4 $pkt->{compressed_consumed} = $consumed;
383 1         2 1;
384             };
385 1 50       2 last if $ok;
386             # decompress_native_block raises "truncated header" /
387             # "block extends past buffer end" when more bytes are
388             # needed; also "buffer truncated" on the inner Native
389             # parse (shouldn't fire after decompress succeeds).
390 0 0       0 die $@ unless $@ =~ /truncated|extends past|need \d+ more/;
391 0         0 $read_some->();
392             }
393 1         3 $end = $pkt->{block_offset} + $pkt->{compressed_consumed};
394             } else {
395 0         0 while (1) {
396 0         0 my $ok = eval {
397             my $b = ClickHouse::Encoder->decode_block(
398 0         0 substr($buf, $pkt->{block_offset}));
399 0         0 $pkt->{block} = $b;
400             $pkt->{block_bytes} =
401 0         0 substr($buf, $pkt->{block_offset}, $b->{consumed});
402 0         0 1;
403             };
404 0 0       0 last if $ok;
405 0 0       0 die $@ unless $@ =~ /truncated/;
406 0         0 $read_some->();
407             }
408 0         0 $end = $pkt->{block_offset} + length $pkt->{block_bytes};
409             }
410             }
411             # Hand any over-read bytes back to the caller's buffer so a looping
412             # caller does not lose packets the same sysread happened to pull in.
413             # Without a caller buffer those bytes are dropped (one-shot mode).
414 4 100       6 $$bufref = substr($buf, $end) if $bufref;
415 4         35 return $pkt;
416             }
417              
418             1;
419              
420             __END__