File Coverage

blib/lib/ClickHouse/Encoder.pm
Criterion Covered Total %
statement 768 869 88.3
branch 367 470 78.0
condition 127 192 66.1
subroutine 73 80 91.2
pod 31 31 100.0
total 1366 1642 83.1


line stmt bran cond sub pod time code
1             package ClickHouse::Encoder;
2 68     68   5971325 use strict;
  68         97  
  68         1967  
3 68     68   239 use warnings;
  68         86  
  68         615950  
4              
5             our $VERSION = '0.01';
6              
7             require XSLoader;
8             XSLoader::load('ClickHouse::Encoder', $VERSION);
9              
10             # Validate a [db.]table identifier as ASCII word characters only.
11             # Rejects anything that could inject SQL via the --query argument.
12             sub _validate_table_name {
13 36     36   49 my $table = shift;
14 36 100       289 $table =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)?\z/
15             or die "Invalid table name '$table': expected [db.]name with [A-Za-z0-9_]";
16 32         60 return;
17             }
18              
19             # Build (url, headers) for a ClickHouse HTTP request with the given SQL
20             # in the `query` parameter. Pulls connection params from %opts using
21             # the same defaults as for_table / insert_http. UTF-8 encodes before
22             # percent-escaping so non-ASCII (caf%C3%A9, emoji) round-trips correctly.
23             sub _http_url_headers {
24 41     41   297133 my ($sql, %opts) = @_;
25 41         222 require Encode;
26             my $esc = sub {
27 72     72   307 my $s = Encode::encode('UTF-8', $_[0], 0);
28 72         1833 $s =~ s/([^A-Za-z0-9\-_.~])/sprintf('%%%02X', ord($1))/ge;
  101         310  
29 72         148 $s;
30 41         143 };
31 41         123 my ($scheme, $host, $port) = _check_endpoint(\%opts);
32 31   100     102 my $database = $opts{database} // 'default';
33 31   100     71 my $user = $opts{user} // 'default';
34 31   100     94 my $password = $opts{password} // '';
35 31         75 my $url = "$scheme://$host:$port/?database=" . $esc->($database);
36 31 100       89 $url .= "&query=" . $esc->($sql) if length $sql;
37             # Per-query settings: { max_memory_usage => '...', max_execution_time => 30 }
38 31 100       87 if (my $s = $opts{settings}) {
39 3         13 for my $k (sort keys %$s) {
40 4         6 $url .= "&" . $esc->($k) . "=" . $esc->($s->{$k});
41             }
42             }
43             # Insert-side idempotency token: identical token + payload is rejected.
44 31 100       85 if (defined(my $tok = $opts{dedup_token})) {
45 4         8 $url .= "&insert_deduplication_token=" . $esc->($tok);
46             }
47 31         100 my %hdr = ('X-ClickHouse-User' => $user);
48 31 100       68 $hdr{'X-ClickHouse-Key'} = $password if $password ne '';
49 31         171 return ($url, \%hdr);
50             }
51              
52             # Validate the host/port/scheme triple shared by every HTTP entry point.
53             # Rejects anything other than http/https, ensures the port is a positive
54             # integer, and refuses host strings that contain URL-structural characters
55             # (':/?#&'). Centralised here so insert_http, bulk_inserter, ping, and
56             # select_blocks share a single allow-list and identical error messages.
57             sub _check_endpoint {
58 43     43   592 my ($opts) = @_;
59 43   100     162 my $scheme = $opts->{scheme} // 'http';
60 43   100     104 my $host = $opts->{host} // 'localhost';
61 43   100     117 my $port = $opts->{port} // 8123;
62 43 100 100     133 die "endpoint: scheme must be 'http' or 'https' (got '$scheme')\n"
63             unless $scheme eq 'http' || $scheme eq 'https';
64 41 100 100     209 die "endpoint: host must not contain URL-structural characters "
65             . "(got '$host')\n"
66             if $host =~ m{[:/?#&\s]} || !length $host;
67 37 100 100     231 die "endpoint: port must be a positive integer (got '$port')\n"
68             unless $port =~ /\A[1-9]\d{0,4}\z/ && $port < 65536;
69 33         107 return ($scheme, $host, $port);
70             }
71              
72             # Build an HTTP::Tiny instance honoring ssl_options (verify_SSL, SSL_ca_file,
73             # etc.) and keep_alive. Shared by insert_http, bulk_inserter, server_version,
74             # ping, select_blocks; callers pass %opts unchanged. Loading HTTP::Tiny here
75             # keeps the require local to HTTP code paths.
76             sub _http_tiny {
77 22     22   190827 my (%opts) = @_;
78 22         2836 require HTTP::Tiny;
79 22   100     165314 my @args = (timeout => $opts{timeout} // 60);
80 22 100       58 push @args, keep_alive => 1 if $opts{keep_alive};
81 22 100       50 push @args, SSL_options => $opts{ssl_options} if $opts{ssl_options};
82 22 100       49 push @args, verify_SSL => $opts{verify_SSL} if exists $opts{verify_SSL};
83 22         114 return HTTP::Tiny->new(@args);
84             }
85              
86             # Parse a flat CH JSON object string (X-ClickHouse-Summary /
87             # X-ClickHouse-Progress) without depending on JSON::PP. Both are small
88             # flat objects of stringified integers (read_rows, written_rows,
89             # total_rows_to_read, elapsed_ns, ...). Returns a hashref or undef.
90             sub _parse_ch_kv {
91 10     10   18 my ($str) = @_;
92 10 100 66     34 return unless defined $str && length $str;
93 5         6 my %h;
94             # NB: stash $1/$2 before the digit-test regex - that regex resets
95             # capture variables and would silently turn every key into ''.
96 5         36 while ($str =~ /"([^"\\]+)"\s*:\s*"([^"\\]*)"/g) {
97 14         61 my ($k, $v) = ($1, $2);
98 14 50       81 $h{$k} = ($v =~ /\A-?\d+\z/) ? $v + 0 : $v;
99             }
100 5 50       18 return scalar(keys %h) ? \%h : undef;
101             }
102              
103             # Lift a few ClickHouse response headers into a small hashref. Returns
104             # undef when none are present; otherwise carries query_id, server
105             # (revision), format, a parsed summary, and the final progress snapshot
106             # so callers don't reparse the same headers. X-ClickHouse-Progress is
107             # sent repeatedly while a query runs (with send_progress_in_http_headers
108             # =1); HTTP::Tiny collapses repeats into an arrayref, so we take the
109             # last - the most complete - snapshot.
110             sub _decorate_response {
111 13     13   3872 my ($resp) = @_;
112 13 50       40 return $resp unless ref $resp eq 'HASH';
113 13 100       30 my $h = $resp->{headers} or return $resp;
114 8         10 my %ch;
115 8         15 for my $k (qw(query-id server format exception-code)) {
116 32         43 my $hv = $h->{"x-clickhouse-$k"};
117 32 100       50 $ch{$k} = $hv if defined $hv;
118             }
119 8 100       25 if (my $sum = _parse_ch_kv($h->{'x-clickhouse-summary'})) {
120 3         5 $ch{summary} = $sum;
121             }
122 8 100       20 if (defined(my $pv = $h->{'x-clickhouse-progress'})) {
123 2 100       5 $pv = $pv->[-1] if ref $pv eq 'ARRAY';
124 2 50       4 if (my $prog = _parse_ch_kv($pv)) {
125 2         3 $ch{progress} = $prog;
126             }
127             }
128 8 100       22 $resp->{ch} = \%ch if %ch;
129 8         12 return $resp;
130             }
131              
132             sub for_table {
133 0     0 1 0 my ($class, $table, %opts) = @_;
134 0         0 _validate_table_name($table);
135 0         0 return $class->_for_describe("describe table $table", %opts);
136             }
137              
138             sub _for_describe {
139 0     0   0 my ($class, $describe_sql, %opts) = @_;
140              
141 0   0     0 my $via = $opts{via} // 'client';
142 0         0 my @lines;
143              
144 0 0       0 if ($via eq 'http') {
    0          
145 0         0 my ($url, $hdr) = _http_url_headers(
146             "$describe_sql format tabseparated", %opts);
147 0   0     0 my $resp = _http_tiny(%opts, timeout => $opts{timeout} // 10)
148             ->get($url, { headers => $hdr });
149             die "HTTP describe table failed (status $resp->{status}): $resp->{content}\n"
150 0 0       0 unless $resp->{success};
151 0         0 @lines = split /\n/, $resp->{content};
152             }
153             elsif ($via eq 'client') {
154 0   0     0 my $host = $opts{host} // 'localhost';
155 0   0     0 my $port = $opts{port} // 9000;
156 0   0     0 my $database = $opts{database} // 'default';
157 0   0     0 my $user = $opts{user} // 'default';
158 0   0     0 my $password = $opts{password} // '';
159 0   0     0 my $client = $opts{client} // 'clickhouse-client';
160              
161 0         0 my @cmd = (
162             $client,
163             '--host', $host,
164             '--port', $port,
165             '--database', $database,
166             '--user', $user,
167             '--query', "$describe_sql format tabseparated",
168             );
169              
170             # Pass password via env so it doesn't show up in `ps`. Always
171             # set $password (even empty) so an empty `password => ''` arg
172             # also overrides any inherited CLICKHOUSE_PASSWORD.
173 0         0 local $ENV{CLICKHOUSE_PASSWORD} = $password;
174              
175 0 0       0 open my $fh, '-|', @cmd
176             or die "Failed to run clickhouse-client: $!";
177 0         0 @lines = <$fh>;
178 0         0 close $fh;
179 0 0       0 die "clickhouse-client failed: exit code " . ($? >> 8) if $?;
180             }
181             else {
182 0         0 die "Unknown via='$via' (expected 'client' or 'http')";
183             }
184              
185 0         0 my @columns;
186             # ClickHouse's TabSeparated format escapes \\ \n \t \r \0 in field
187             # values. Multi-line type expressions (e.g. named Tuple, Nested) come
188             # out as a single TSV line with embedded \n and indentation; un-escape
189             # so the XS parser sees the canonical type string. The default branch
190             # ($1) handles \\ -> \ and any other backslash-escape forms transparently.
191             my $unesc = sub {
192 0   0 0   0 my $s = shift // '';
193 0 0       0 $s =~ s{\\(.)}{ $1 eq 'n' ? "\n"
  0 0       0  
    0          
    0          
194             : $1 eq 't' ? "\t"
195             : $1 eq 'r' ? "\r"
196             : $1 eq '0' ? "\0"
197             : $1 }ge;
198 0         0 $s;
199 0         0 };
200 0         0 for my $line (@lines) {
201 0         0 chomp $line;
202 0 0       0 next if $line eq '';
203 0         0 my ($name, $type) = split /\t/, $line;
204 0         0 push @columns, [$unesc->($name), $unesc->($type)];
205             }
206              
207 0 0       0 die "No columns found for: $describe_sql" unless @columns;
208 0         0 return $class->new(columns => \@columns);
209             }
210              
211             sub columns {
212 43     43 1 153658 my $self = shift;
213 43         221 return $self->_columns;
214             }
215              
216             sub validate_rows {
217 8     8 1 812108 my ($self, $rows) = @_;
218 8         8 my @errors;
219 8         24 for my $i (0 .. $#$rows) {
220 24         24 local $@;
221 24 100       28 unless (eval { $self->encode([$rows->[$i]]); 1 }) {
  24         100  
  20         60  
222 4         40 (my $msg = "$@") =~ s/\s*at .+ line \d+\.\s*\z//;
223 4         24 push @errors, { row => $i, error => $msg };
224             }
225             }
226 8         28 return \@errors;
227             }
228              
229             sub compressed_writer {
230 20     20 1 8912 my ($class, $mode, $writer) = @_;
231 20 100 100     112 return $writer if !defined $mode || $mode eq 'raw';
232 12 50       32 if ($mode eq 'zstd') {
233 0         0 require Compress::Zstd;
234             return sub {
235 0     0   0 my $out = Compress::Zstd::compress($_[0]);
236 0 0       0 defined $out or die "zstd compression failed";
237 0         0 $writer->($out);
238 0         0 };
239             }
240 12 100       24 if ($mode eq 'gzip') {
241 4         2664 require IO::Compress::Gzip;
242             return sub {
243 4     4   164 my $out;
244 4 50       12 IO::Compress::Gzip::gzip(\$_[0], \$out)
245             or die "gzip failed: $IO::Compress::Gzip::GzipError";
246 4         6368 $writer->($out);
247 4         132088 };
248             }
249 8         60 die "Unknown compress mode '$mode' (expected 'zstd', 'gzip', or 'raw')";
250             }
251              
252             # Compressed-block framing used by ClickHouse's CompressedReadBuffer /
253             # CompressedWriteBuffer (native TCP protocol with compression=1 enabled,
254             # and Native-over-HTTP with Content-Encoding: clickhouse-lz4 etc).
255             # Layout:
256             # 16 bytes : checksum (CityHash128 of the next 9 + N bytes)
257             # 1 byte : method tag (0x82 LZ4, 0x90 ZSTD, 0x02 none)
258             # 4 bytes : compressed_size = 1 + 4 + 4 + N (LE UInt32)
259             # 4 bytes : uncompressed_size (LE UInt32)
260             # N bytes : compressed payload
261             #
262             # CH's checksum is CityHash128 from the "cityhash102" variant (Google
263             # CityHash v1.0.2 with ClickHouse's namespace fork). We bundle a port
264             # of that algorithm in cityhash.c, exposed as the `_cityhash128` XSUB,
265             # used as the default hasher below. Callers can plug in a different
266             # 16-byte hasher via the `hasher => \&h` option.
267             my %_COMPRESS_METHOD_TAG = (lz4 => 0x82, zstd => 0x90, none => 0x02);
268              
269             sub compress_native_block {
270 27     27 1 303556 my ($class, $bytes, %opts) = @_;
271 27   100     94 my $mode = $opts{mode} // 'lz4';
272             # Default to the bundled CityHash128 v1.0.2 (the "cityhash102"
273             # variant CH itself uses for the compressed-block checksum). The
274             # `hasher` opt is still honored for callers that want to plug in
275             # something else (Digest::CityHash, a vendored copy, etc.).
276 27   100     81 my $hasher = $opts{hasher} // \&_cityhash128;
277              
278             # mode=auto: try LZ4 first, fall back to method-tag 0x02 (uncompressed
279             # inside compressed framing - same shape CH's own CompressedWriteBuffer
280             # emits when the compressed result is >= the input). Saves CPU on
281             # already-incompressible payloads without giving up the checksum
282             # protection of the framing.
283             # The auto-mode probe doubles as the payload when LZ4 wins, so the
284             # input is compressed at most once regardless of which branch wins.
285 27         29 my $payload;
286 27 100       51 if ($mode eq 'auto') {
287 3         11 require Compress::LZ4;
288 3         13 my $lz4 = Compress::LZ4::lz4_compress($bytes);
289 3 100       13 if (length($lz4) < length($bytes)) {
290 2         2 $mode = 'lz4';
291 2         3 $payload = $lz4;
292             } else {
293 1         2 $mode = 'none';
294             }
295             }
296              
297 27 100       80 my $tag = $_COMPRESS_METHOD_TAG{$mode}
298             or die "compress_native_block: unknown mode '$mode' "
299             . "(expected 'auto', 'lz4', 'zstd', or 'none')\n";
300              
301 26 100       41 if (!defined $payload) {
302 24 100       59 if ($mode eq 'lz4') {
    100          
303 17         73 require Compress::LZ4;
304             # Compress::LZ4::compress() prepends a 4-byte size header
305             # that ClickHouse doesn't want; lz4_compress is the raw-
306             # form variant emitting just the LZ4 byte stream.
307 17         99 $payload = Compress::LZ4::lz4_compress($bytes);
308             } elsif ($mode eq 'zstd') {
309 3         15 require Compress::Zstd;
310 3         211 my $z = Compress::Zstd::compress($bytes);
311 3 50       10 defined $z
312             or die "compress_native_block: zstd compression failed\n";
313 3         7 $payload = $z;
314             } else { # none = uncompressed inside framing (method tag 0x02)
315 4         5 $payload = $bytes;
316             }
317             }
318              
319 26         38 my $compressed_size = 1 + 4 + 4 + length($payload); # tag + 2*UInt32 + N
320 26         96 my $hdr = pack('C V V', $tag, $compressed_size, length $bytes);
321 26         124 my $checksum = $hasher->($hdr . $payload);
322 26 100       63 die "compress_native_block: hasher returned "
323             . length($checksum) . " bytes (expected 16)\n"
324             unless length($checksum) == 16;
325 25         91 return $checksum . $hdr . $payload;
326             }
327              
328             # Inverse of compress_native_block. Returns (uncompressed_bytes, bytes_consumed).
329             # Verifies the checksum if a hasher is supplied; if hasher is omitted the
330             # block is decompressed without verification (useful for inspecting captured
331             # data when the cityhash impl isn't available).
332             sub decompress_native_block {
333 27     27 1 14565 my ($class, $bytes, %opts) = @_;
334 27   100     88 my $off = $opts{offset} // 0;
335             # Same default as compress_native_block. Callers can pass
336             # hasher => undef explicitly to skip checksum verification.
337 27 100       59 my $hasher = exists $opts{hasher} ? $opts{hasher} : \&_cityhash128;
338 27         36 my $total = length $bytes;
339 27 50       59 die "decompress_native_block: truncated header at offset $off "
340             . "(need >= 25 bytes, have " . ($total - $off) . ")\n"
341             if $total - $off < 25;
342              
343 27         44 my $checksum = substr($bytes, $off, 16);
344 27         40 my $tag = ord substr($bytes, $off + 16, 1);
345 27         74 my $csize = unpack 'V', substr($bytes, $off + 17, 4); # incl. 9-byte header
346 27         45 my $usize = unpack 'V', substr($bytes, $off + 21, 4);
347              
348 27 50       42 die "decompress_native_block: compressed_size=$csize < 9 (corrupt)\n"
349             if $csize < 9;
350 27         36 my $payload_len = $csize - 9;
351 27         30 my $end = $off + 16 + $csize;
352 27 50       38 die "decompress_native_block: block extends past buffer end\n"
353             if $end > $total;
354              
355 27         36 my $hdr = substr($bytes, $off + 16, 9);
356 27         34 my $payload = substr($bytes, $off + 25, $payload_len);
357              
358 27 100       41 if ($hasher) {
359 24         89 my $got = $hasher->($hdr . $payload);
360 24 100       45 die "decompress_native_block: checksum mismatch\n"
361             unless $got eq $checksum;
362             }
363              
364 26         25 my $out;
365 26 100       52 if ($tag == 0x02) { # uncompressed inside framing
    100          
    100          
366 3         3 $out = $payload;
367             } elsif ($tag == 0x82) { # LZ4
368 19         65 require Compress::LZ4;
369             # lz4_decompress wants the expected uncompressed size as its
370             # second argument since the raw stream has no length prefix.
371 19         77 $out = Compress::LZ4::lz4_decompress($payload, $usize);
372 19 50       31 die "decompress_native_block: lz4 decompression failed\n"
373             unless defined $out;
374             } elsif ($tag == 0x90) { # ZSTD
375 3         13 require Compress::Zstd;
376 3         208 $out = Compress::Zstd::decompress($payload);
377 3 50       12 die "decompress_native_block: zstd decompression failed\n"
378             unless defined $out;
379             } else {
380 1         7 die sprintf("decompress_native_block: unknown method tag 0x%02x "
381             . "(expected 0x02 NONE, 0x82 LZ4, or 0x90 ZSTD)\n", $tag);
382             }
383 25 100       44 die "decompress_native_block: decompressed size mismatch "
384             . "(header says $usize, got " . length($out) . ")\n"
385             unless length($out) == $usize;
386 24 100       93 return wantarray ? ($out, $end - $off) : $out;
387             }
388              
389             # Expand any Nested(field T, ...) entries in a column list into the flat
390             # `name.field Array(T)` columns ClickHouse stores them as. Returns a new
391             # arrayref; non-Nested columns pass through unchanged. Pair with new():
392             # my $enc = ClickHouse::Encoder->new(
393             # columns => ClickHouse::Encoder->flatten_nested(\@user_columns));
394             sub flatten_nested {
395 13     13 1 32930 my ($class, $cols) = @_;
396 13         15 my @out;
397 13         23 for my $c (@$cols) {
398 17         31 my ($name, $type) = @$c;
399 17 100       102 if ($type =~ /\ANested\((.+)\)\z/s) {
400 9         28 my @parts = _split_paren_list($1);
401 9 100       33 @parts or die "Nested($1) for column '$name' has no elements";
402 8         12 for my $part (@parts) {
403 16 100       92 $part =~ /\A([A-Za-z_][A-Za-z0-9_]*)\s+(.+?)\s*\z/s
404             or die "Nested element '$part' is not 'name Type'";
405 12         48 push @out, ["$name.$1", "Array($2)"];
406             }
407             } else {
408 8         20 push @out, [$name, $type];
409             }
410             }
411 8         16 return \@out;
412             }
413              
414             # Split a comma-separated list at depth-0 commas (so Tuple(Int32, String)
415             # inside a Nested element stays one entry).
416             sub _split_paren_list {
417 9     9   23 my $body = shift;
418 9         10 my @parts;
419 9         20 my ($start, $depth, $len) = (0, 0, length $body);
420 9         24 for (my $i = 0; $i <= $len; $i++) {
421 232 100       240 my $c = $i < $len ? substr($body, $i, 1) : ',';
422 232 100 66     503 if ($c eq '(') { $depth++ }
  4 100       4  
    100          
423 4         4 elsif ($c eq ')') { $depth-- }
424             elsif ($c eq ',' && $depth == 0) {
425 17         117 (my $p = substr($body, $start, $i - $start)) =~ s/\A\s+|\s+\z//g;
426 17 100       76 push @parts, $p if length $p;
427 17         28 $start = $i + 1;
428             }
429             }
430 9         23 return @parts;
431             }
432              
433             # Row-oriented decode: { ncols, nrows, names, types, rows } where
434             # rows is an arrayref of arrayrefs. Calls the XS decode_block_rows,
435             # which distributes column values into per-row arrayrefs as each
436             # column is decoded and frees the column AV eagerly. Peak memory
437             # is one column's AV plus the row AVs (vs a Perl-side transpose
438             # that holds ALL column AVs alongside the half-built row AVs).
439             # Throughput is similar to the column-major path; the win is the
440             # tighter memory profile on wide blocks.
441             sub decode_rows {
442 4     4 1 193782 my ($class, $bytes, $offset) = @_;
443 4   50     124 return $class->decode_block_rows($bytes, $offset // 0);
444             }
445              
446             # Decode a concatenated stream of Native blocks (the body of a
447             # `select ... format native` response). Returns an arrayref of the
448             # same hashref shape as decode_block. Stops cleanly when bytes are
449             # exhausted; partial trailing bytes raise an error from XS.
450             # Uses the 3-arg form of decode_block (with offset) to avoid O(N^2)
451             # substr copies on long streams.
452             sub decode_blocks {
453 8     8 1 183430 my ($class, $bytes, $cb, %opts) = @_;
454 8         13 my $keep = $opts{keep};
455 8         8 my $off = 0;
456 8         10 my $len = length $bytes;
457             # Callback form: hand each block to $cb as it's decoded; never
458             # accumulate. Useful for streaming selects where the full block
459             # list would not fit comfortably in memory.
460 8 100       18 if ($cb) {
461 2         6 while ($off < $len) {
462 5         317 my $block = $class->decode_block($bytes, $off, $keep);
463 5         7 $off += $block->{consumed};
464 5         7 $cb->($block);
465             }
466 2         216 return;
467             }
468 6         7 my @blocks;
469 6         11 while ($off < $len) {
470 10         98 my $block = $class->decode_block($bytes, $off, $keep);
471 9         10 $off += $block->{consumed};
472 9         14 push @blocks, $block;
473             }
474 5         12 return \@blocks;
475             }
476              
477             # Return a coderef that yields one block per call (undef when done).
478             # Holds a reference to $bytes; the closure is the only thing that
479             # survives between calls.
480             sub decode_blocks_iter {
481 3     3 1 2154 my ($class, $bytes, %opts) = @_;
482 3         5 my $keep = $opts{keep};
483 3         7 my $off = 0;
484 3         5 my $len = length $bytes;
485             return sub {
486 9 100   9   1892 return if $off >= $len;
487 5         46 my $block = $class->decode_block($bytes, $off, $keep);
488 5         7 $off += $block->{consumed};
489 5         8 return $block;
490 3         13 };
491             }
492              
493             # Pull-style decoder that reads incrementally from a filehandle (or
494             # any IO::Handle-ish object). For each complete block, invokes $cb
495             # with the block hashref. Keeps a sliding buffer; on truncated decode
496             # it reads more bytes and retries. Useful when the response body is
497             # too large to fit in memory.
498             sub decode_stream {
499 6     6 1 18808 my ($class, $fh, $cb, %opts) = @_;
500 6   100     21 my $chunk_size = $opts{chunk_size} // 64 * 1024;
501 6         7 my $keep = $opts{keep};
502 6         5 my $decompress = $opts{decompress};
503 6         7 my $buf = ''; # raw bytes from the filehandle
504 6         9 my $inner = ''; # decompressed Native bytes (== $buf when !decompress)
505 6         6 my $done = 0;
506 6         12 until ($done) {
507             # Phase 1: peel compressed-block frames out of $buf into $inner.
508 47 100       51 if ($decompress) {
509 12         17 while (length($buf) >= 25) {
510 10         14 my $csize = unpack 'V', substr($buf, 17, 4);
511 10 100       30 last if length($buf) < 16 + $csize;
512 5         5 my ($plain, $consumed) = eval {
513 5         8 $class->decompress_native_block($buf)
514             };
515 5 50       6 die $@ if $@;
516 5         4 $inner .= $plain;
517 5         10 substr($buf, 0, $consumed, '');
518             }
519             } else {
520 35         31 $inner = $buf;
521             }
522             # Phase 2: decode complete Native blocks out of $inner.
523 47         74 while (length($inner) > 0) {
524 40         32 my $block = eval { $class->decode_block($inner, 0, $keep) };
  40         277  
525 40 100       53 if ($@) {
526             # Truncation or malformed mid-block; need more bytes.
527             # Only "buffer truncated" means "data ran short, read
528             # more"; "exceeds remaining" indicates a malformed wire
529             # value (e.g. a corrupted varint count) and should die
530             # rather than spin reading more bytes.
531 30 50       69 last if $@ =~ /buffer truncated/i;
532 0         0 die $@; # real error
533             }
534 10         20 $cb->($block);
535 10         531 substr($inner, 0, $block->{consumed}, '');
536             }
537             # When not decompressing, $inner aliases $buf - carry the residual
538             # back so the next read sees the unconsumed tail.
539 47 100       53 $buf = $inner unless $decompress;
540             # Read more bytes. read() returns 0 on EOF, undef on error.
541 47         28 my $more;
542 47         93 my $n = read $fh, $more, $chunk_size;
543 47 50       57 die "decode_stream: read error: $!" if !defined $n;
544 47 100       49 if ($n == 0) {
545             # EOF. If anything is left in either buffer it's a truncated
546             # final block; raise rather than swallow.
547 6 100 100     25 die "decode_stream: " . length($buf) . " trailing bytes "
548             . "after last complete compressed block"
549             if $decompress && length $buf;
550 5 100       16 die "decode_stream: " . length($inner) . " trailing bytes "
551             . "after last complete block" if length $inner;
552 4         7 $done = 1;
553             } else {
554 41         56 $buf .= $more;
555             }
556             }
557 4         7 return;
558             }
559              
560             # Query the ClickHouse HTTP endpoint for its version. Returns a list
561             # of (major, minor, patch, build) integers and the raw string. Useful
562             # for capability gating in user code (e.g. only use JSON columns if
563             # the server is at least 24.8). HTTP-only; native TCP not supported.
564             sub server_version {
565 5     5 1 193592 my ($class, %opts) = @_;
566 5         13 my ($url, $hdr) = _http_url_headers('select version()', %opts);
567 5   50     22 my $resp = _http_tiny(%opts, timeout => $opts{timeout} // 10)
568             ->get($url, { headers => $hdr });
569             die "HTTP select version() failed (status $resp->{status}): "
570             . "$resp->{content}\n"
571 5 100       433 unless $resp->{success};
572 4         19 (my $raw = $resp->{content}) =~ s/\s+\z//;
573 4         18 my @parts = ($raw =~ /(\d+)/g);
574             return wantarray
575 4 100 50     38 ? (@parts, $raw)
      50        
      50        
      100        
576             : { major => $parts[0] // 0, minor => $parts[1] // 0,
577             patch => $parts[2] // 0, build => $parts[3] // 0,
578             raw => $raw };
579             }
580              
581             # Lightweight liveness check via CH's /ping endpoint. Returns 1 on
582             # success; croaks on HTTP failure (which includes connection refused,
583             # timeout, or non-2xx response). Use to gate on server availability in
584             # bootstrap scripts and bulk-load orchestration.
585             sub ping {
586 1     1 1 126882 my ($class, %opts) = @_;
587 1         4 my ($scheme, $host, $port) = _check_endpoint(\%opts);
588 1         3 my $url = "$scheme://$host:$port/ping";
589 1   50     5 my $resp = _http_tiny(%opts, timeout => $opts{timeout} // 5)->get($url);
590             die "ping: HTTP $resp->{status}: $resp->{content}\n"
591 1 50       2945 unless $resp->{success};
592 0         0 return 1;
593             }
594              
595             # Parse a Well-Known-Text (WKT) geometry string into the nested-arrayref
596             # representation that the Geo column encoders accept. Supported geometries:
597             # POINT, LINESTRING, MULTILINESTRING, POLYGON, MULTIPOLYGON. Coordinates
598             # are decimal numbers separated by whitespace; rings/parts are
599             # parenthesized. The Ring CH type has no WKT name (it is a single closed
600             # LINESTRING); accept LINESTRING for Ring as well. Returns the structure
601             # only; the caller chooses which Geo column to feed it into.
602             sub parse_wkt {
603 20     20 1 138228 my ($class, $wkt) = @_;
604 20 100       45 die "parse_wkt: input required\n" unless defined $wkt;
605 19         51 $wkt =~ s/\A\s+//;
606 19         60 $wkt =~ s/\s+\z//;
607 19 100       112 my ($kind, $rest) = $wkt =~ /\A([A-Za-z]+)\s*(.*)\z/s
608             or die "parse_wkt: not a WKT geometry: $wkt\n";
609 18         28 $kind = uc $kind;
610             # Strip the outermost parens (which every geometry has) once for
611             # uniform downstream parsing. EMPTY is rejected because CH Geo
612             # columns have no null/empty representation other than zero-length.
613 18 100       57 $rest =~ s/\A\(\s*//
614             or die "parse_wkt: $kind missing '(': $wkt\n";
615 17 100       70 $rest =~ s/\s*\)\z//
616             or die "parse_wkt: $kind unmatched parens: $wkt\n";
617 16 100       28 if ($kind eq 'POINT') {
618 5         10 return _wkt_point($rest);
619             }
620 11 100       23 if ($kind eq 'LINESTRING') {
621 3         7 return [ _wkt_points($rest) ];
622             }
623 8 100 100     26 if ($kind eq 'MULTILINESTRING' || $kind eq 'POLYGON') {
624             # Same shape on the wire (CH Polygon = ring + holes; outer ring
625             # comes first; MultiLineString is parallel parts). Parse as a
626             # list of paren-wrapped point-lists.
627 4         5 my @parts;
628 4         22 while ($rest =~ /\G\s*,?\s*\(\s*([^()]+?)\s*\)/gc) {
629 5         7 push @parts, [ _wkt_points($1) ];
630             }
631 4 100       11 die "parse_wkt: $kind: no parts parsed in $wkt\n" unless @parts;
632 3         31 return \@parts;
633             }
634 4 100       9 if ($kind eq 'MULTIPOLYGON') {
635 3         4 my @polys;
636             # MULTIPOLYGON(((...),(...)), ((...))): the outer level is
637             # polygons, each polygon is a list of rings.
638 3         16 while ($rest =~ /\G\s*,?\s*\(\s*(.*?)\s*\)\s*(?=,|\z)/gcs) {
639 3         6 my $poly_body = $1;
640 3         2 my @rings;
641 3         14 while ($poly_body =~ /\G\s*,?\s*\(\s*([^()]+?)\s*\)/gc) {
642 3         4 push @rings, [ _wkt_points($1) ];
643             }
644 3 50       5 die "parse_wkt: MULTIPOLYGON ring parse failed in $wkt\n"
645             unless @rings;
646 3         11 push @polys, \@rings;
647             }
648 3 100       13 die "parse_wkt: MULTIPOLYGON: no polygons parsed in $wkt\n"
649             unless @polys;
650 2         18 return \@polys;
651             }
652 1         6 die "parse_wkt: unsupported geometry '$kind'\n";
653             }
654              
655             sub _wkt_point {
656 5     5   6 my ($s) = @_;
657 5         9 my @c = split /\s+/, $s;
658 5 100       16 die "parse_wkt: POINT needs 2 coords, got '$s'\n" unless @c == 2;
659 4         6 return [ map { $_ + 0 } @c ];
  8         38  
660             }
661              
662             sub _wkt_points {
663 11     11   16 my ($s) = @_;
664 11         10 my @pts;
665 11         39 for my $pair (split /\s*,\s*/, $s) {
666 38         44 my @c = split /\s+/, $pair;
667 38 100       59 die "parse_wkt: point needs 2 coords, got '$pair'\n" unless @c == 2;
668 37         34 push @pts, [ map { $_ + 0 } @c ];
  74         96  
669             }
670 10         35 return @pts;
671             }
672              
673             # Return the list of supported ClickHouse type names (parametric types
674             # are listed as their syntactic prefix). For runtime feature detection
675             # and tooling.
676             sub types {
677 1     1 1 543 return (qw(
678             Int8 Int16 Int32 Int64
679             UInt8 UInt16 UInt32 UInt64
680             Float32 Float64 BFloat16
681             String FixedString
682             Date Date32 DateTime DateTime64
683             Decimal Decimal32 Decimal64 Decimal128 Decimal256
684             Enum8 Enum16
685             Bool Boolean UUID IPv4 IPv6
686             Array Tuple Nullable Map LowCardinality Variant
687             Point Ring LineString MultiLineString Polygon MultiPolygon
688             SimpleAggregateFunction
689             JSON Object Dynamic
690             ));
691             }
692              
693             # Compare two column lists, return { added, removed, changed }. Each
694             # slot holds [name, type] pairs from $b (added/changed) or $a (removed).
695             # Useful for migration scripts and detecting drift between source and
696             # destination schemas in CH-to-CH pipelines.
697             sub schema_diff {
698 13     13 1 279535 my ($class, $a, $b) = @_;
699 13         26 my %a = map { $_->[0] => $_->[1] } @$a;
  19         53  
700 13         24 my %b = map { $_->[0] => $_->[1] } @$b;
  21         43  
701 13         19 my (@added, @removed, @changed);
702 13         49 for my $name (sort keys %b) {
703 21 100       109 if (!exists $a{$name}) {
    100          
704 7         38 push @added, [$name, $b{$name}];
705             } elsif ($a{$name} ne $b{$name}) {
706 5         16 push @changed, [$name, $a{$name}, $b{$name}];
707             }
708             }
709 13         27 for my $name (sort keys %a) {
710 19 100       45 push @removed, [$name, $a{$name}] unless exists $b{$name};
711             }
712 13         79 return { added => \@added, removed => \@removed, changed => \@changed };
713             }
714              
715             # Quote a CH identifier (table name, column name) with backticks,
716             # escaping any backtick within. CH's lexer processes C-style backslash
717             # escapes inside backtick identifiers, so a literal backslash must be
718             # escaped too (else `a\` would read as an escaped backtick and the
719             # identifier would never close). This mirrors ClickHouse's own
720             # backQuote(): backslash first, then backtick. CH also accepts
721             # double-quote quoting; we use backticks because show create table does.
722             sub _quote_ident {
723 49     49   62 my $name = shift;
724 49         67 $name =~ s/\\/\\\\/g;
725 49         55 $name =~ s/`/\\`/g;
726 49         110 return "`$name`";
727             }
728              
729             # Render one column definition for format_create_table. A column entry
730             # is [name, type] or [name, type, \%col] where %col may carry
731             # default/materialized/alias (mutually exclusive), codec, ttl, comment.
732             # Expression-valued keys are inserted verbatim - the caller owns their
733             # SQL-correctness; only the comment is quoted (it is the one string
734             # literal). Clause order matches CH's own `show create table` output;
735             # keywords are lowercased to match the rest of the generated SQL.
736             sub _format_column {
737 22     22   30 my ($col) = @_;
738 22         39 my ($name, $type, $extra) = @$col;
739 22         36 my $sql = _quote_ident($name) . ' ' . $type;
740 22 100 66     84 return $sql unless $extra && ref $extra eq 'HASH';
741              
742 8         9 my @kind = grep { defined $extra->{$_} }
  24         35  
743             qw(default materialized alias);
744 8 100       23 die "format_create_table: column '$name' has more than one of "
745             . "default/materialized/alias\n"
746             if @kind > 1;
747 7 100       9 if (@kind) {
748 3         5 $sql .= " $kind[0] $extra->{$kind[0]}";
749             }
750 7 100       13 $sql .= " codec($extra->{codec})" if defined $extra->{codec};
751 7 100       9 $sql .= " ttl $extra->{ttl}" if defined $extra->{ttl};
752 7 100       9 if (defined $extra->{comment}) {
753             # CH string literals take C-style backslash escapes; escape the
754             # backslash before the quote so an embedded backslash survives.
755 3         7 (my $c = $extra->{comment}) =~ s/\\/\\\\/g;
756 3         5 $c =~ s/'/\\'/g;
757 3         5 $sql .= " comment '$c'";
758             }
759 7         16 return $sql;
760             }
761              
762             # Emit a create table statement for the given column list. Table-level
763             # opts (engine, partition_by, primary_key, order_by, sample_by, ttl,
764             # settings) are emitted in CH's canonical `show create table` order
765             # and inserted verbatim - the caller owns SQL correctness there; this
766             # helper validates only the column list. Per-column
767             # default/materialized/alias/codec/ttl/comment are supported by passing
768             # [name, type, \%col] entries. Returns the SQL string (no trailing
769             # semicolon).
770             sub format_create_table {
771 13     13 1 150317 my ($class, %opts) = @_;
772 13   50     42 my $table = $opts{table} // die "format_create_table: 'table' required\n";
773 13   50     44 my $columns = $opts{columns} // die "format_create_table: 'columns' arrayref required\n";
774 13   100     41 my $engine = $opts{engine} // 'MergeTree';
775 13         32 _validate_table_name($table);
776              
777 12         31 my $body = join ",\n ", map { _format_column($_) } @$columns;
  22         35  
778              
779 11         27 my $sql = "create table " . _quote_ident($table) . " (\n $body\n)";
780 11         28 $sql .= "\nengine = $engine";
781 11 100       19 $sql .= "\npartition by $opts{partition_by}" if defined $opts{partition_by};
782 11 100       40 $sql .= "\nprimary key $opts{primary_key}" if defined $opts{primary_key};
783 11 100       22 $sql .= "\norder by $opts{order_by}" if defined $opts{order_by};
784 11 100       24 $sql .= "\nsample by $opts{sample_by}" if defined $opts{sample_by};
785 11 100       29 $sql .= "\nttl $opts{ttl}" if defined $opts{ttl};
786 11 100       21 $sql .= "\nsettings $opts{settings}" if defined $opts{settings};
787 11         31 return $sql;
788             }
789              
790             # Translate a schema_diff hashref into a list of alter table statements
791             # (one per column change). Returns an arrayref of SQL strings; the
792             # caller decides whether to apply them transactionally or one at a
793             # time. Conservative ordering: drops first, then modifies, then adds,
794             # so a column-rename modeled as drop+add ends up with the new column
795             # at the right position.
796             sub apply_schema_diff {
797 8     8 1 526 my ($class, $diff, %opts) = @_;
798 8   100     49 my $table = $opts{table} // die "apply_schema_diff: 'table' required\n";
799 7         21 _validate_table_name($table);
800 6         10 my $qt = _quote_ident($table);
801 6         8 my @sql;
802 6   50     7 for my $row (@{ $diff->{removed} // [] }) {
  6         18  
803 3         10 push @sql, "alter table $qt drop column "
804             . _quote_ident($row->[0]);
805             }
806 6   50     8 for my $row (@{ $diff->{changed} // [] }) {
  6         14  
807 3         8 push @sql, "alter table $qt modify column "
808             . _quote_ident($row->[0]) . " $row->[2]";
809             }
810 6   50     9 for my $row (@{ $diff->{added} // [] }) {
  6         14  
811 4         10 push @sql, "alter table $qt add column "
812             . _quote_ident($row->[0]) . " $row->[1]";
813             }
814 6         19 return \@sql;
815             }
816              
817             # Unquote a CH identifier: strip surrounding backticks and collapse both
818             # escape conventions CH's lexer accepts - C-style backslash escapes
819             # (\X -> X, what backQuote() and _quote_ident emit) and doubled
820             # backticks (`` -> `). Bare identifiers pass through untouched.
821             sub _unquote_ident {
822 33     33   61 my $s = shift;
823 33 100       72 if ($s =~ /\A`(.*)`\z/s) {
824 23         26 $s = $1;
825 23 100       43 $s =~ s/\\(.)|``/defined $1 ? $1 : '`'/ges;
  2         8  
826             }
827 33         69 return $s;
828             }
829              
830             # Given the index of an opening backtick in $str, return the index just
831             # past the matching closing backtick. Honors both escape conventions: a
832             # backslash escapes the next char, and a doubled `` is a literal
833             # backtick. Returns length($str) if the quote is never closed. The
834             # CREATE-TABLE scanners below use this so an escaped backtick inside an
835             # identifier cannot be mistaken for the end of the quoted region.
836             sub _skip_backtick_quoted {
837 42     42   45 my ($str, $i) = @_;
838 42         40 my $len = length $str;
839 42         37 $i++; # past the opening backtick
840 42         43 while ($i < $len) {
841 157         133 my $c = substr($str, $i, 1);
842 157 100       154 if ($c eq '\\') { $i += 2; next }
  2         1  
  2         2  
843 155 100       144 if ($c eq '`') {
844 44 100 100     145 return $i + 1
845             unless $i + 1 < $len && substr($str, $i + 1, 1) eq '`';
846 2         2 $i += 2; # doubled-backtick escape
847 2         3 next;
848             }
849 111         103 $i++;
850             }
851 0         0 return $len;
852             }
853              
854             # Split a (possibly backtick-quoted, possibly database-qualified) table
855             # name into ($database, $table). The qualifying dot is the first one
856             # that is not inside backticks; a name with no such dot has an undef
857             # database.
858             sub _split_qname {
859 12     12   21 my ($qname) = @_;
860 12         27 my $len = length $qname;
861 12         8 my $dot = -1;
862 12         28 for (my $i = 0; $i < $len; $i++) {
863 27         33 my $c = substr($qname, $i, 1);
864 27 100       36 if ($c eq '`') { $i = _skip_backtick_quoted($qname, $i) - 1; next }
  3         7  
  3         7  
865 24 100       39 if ($c eq '.') { $dot = $i; last }
  2         3  
  2         4  
866             }
867 12 100       39 return (undef, _unquote_ident($qname)) if $dot < 0;
868 2         29 return (_unquote_ident(substr($qname, 0, $dot)),
869             _unquote_ident(substr($qname, $dot + 1)));
870             }
871              
872             # Split a create table column block on top-level commas, respecting both
873             # parentheses (nested type args) and backtick-quoted identifiers (which
874             # may legally contain commas or parens). _split_paren_list alone is not
875             # backtick-aware, hence this dedicated splitter.
876             sub _split_column_defs {
877 10     10   10 my $body = shift;
878 10         10 my @parts;
879 10         13 my ($start, $depth, $len) = (0, 0, length $body);
880 10         13 for (my $i = 0; $i <= $len; $i++) {
881 337 100       325 my $c = $i < $len ? substr($body, $i, 1) : ',';
882 337 100       331 if ($c eq '`') { $i = _skip_backtick_quoted($body, $i) - 1; next }
  19         19  
  19         26  
883 318 100 100     668 if ($c eq '(') { $depth++ }
  8 100       21  
    100          
884 8         9 elsif ($c eq ')') { $depth-- }
885             elsif ($c eq ',' && $depth == 0) {
886 19         108 (my $p = substr($body, $start, $i - $start))
887             =~ s/\A\s+|\s+\z//g;
888 19 50       32 push @parts, $p if length $p;
889 19         28 $start = $i + 1;
890             }
891             }
892 10         20 return @parts;
893             }
894              
895             # Parse the output of `show create table` (or any create table DDL) into
896             # a structured hashref: { database, table, columns => [[name,type],...],
897             # engine, order_by, partition_by, primary_key, sample_by, ttl, settings }.
898             # Clause values are returned verbatim (trimmed); columns is in the same
899             # [name,type] shape schema_diff and format_create_table consume, so a
900             # round-trip CH -> parse -> diff -> ALTER is one call each. Per-column
901             # DEFAULT/CODEC/TTL modifiers are dropped from the type (they are not
902             # part of the type proper); the bare type is what CH's own `describe`
903             # would report. Croaks if no create table header or column block is found.
904             sub parse_create_table {
905 14     14 1 164529 my ($class, $ddl) = @_;
906 14 100       36 die "parse_create_table: input required\n" unless defined $ddl;
907              
908             # A name part is a backtick-quoted identifier or a bare run with no
909             # space / dot / paren; the table name is one part, optionally
910             # database-qualified with a second. Inside backticks both escape
911             # forms are accepted: \X (backslash) and `` (doubled).
912 13         33 my $part = qr/(?:`(?:[^`\\]|\\.|``)+`|[^\s.(]+)/;
913 13 100       489 $ddl =~ /\bCREATE\s+(?:OR\s+REPLACE\s+)?(?:TEMPORARY\s+)?TABLE\s+
914             (?:IF\s+NOT\s+EXISTS\s+)?
915             ($part (?:\.$part)?)/xgci
916             or die "parse_create_table: no create table header found\n";
917 12         30 my $qname = $1;
918 12         15 my $name_end = pos $ddl;
919              
920 12         20 my ($database, $table) = _split_qname($qname);
921              
922             # Locate the column block: the first balanced (...) after the name.
923 12         20 my $open = index $ddl, '(', $name_end;
924 12 100       22 die "parse_create_table: no column list found\n" if $open < 0;
925 11         19 my ($depth, $close, $len) = (0, -1, length $ddl);
926 11         19 for (my $i = $open; $i < $len; $i++) {
927 355         283 my $c = substr($ddl, $i, 1);
928 355 100       329 if ($c eq '`') { $i = _skip_backtick_quoted($ddl, $i) - 1; next }
  20         24  
  20         29  
929 335 100       479 if ($c eq '(') { $depth++ }
  19 100       23  
930 18 100       13 elsif ($c eq ')') { $depth--; if ($depth == 0) { $close = $i; last } }
  18         25  
  10         25  
  10         11  
931             }
932 11 100       19 die "parse_create_table: unbalanced column list\n" if $close < 0;
933 10         14 my $block = substr $ddl, $open + 1, $close - $open - 1;
934              
935 10         11 my @columns;
936 10         17 for my $def (_split_column_defs($block)) {
937             # Skip table-level INDEX / CONSTRAINT / PROJECTION / primary key
938             # entries that share the column block.
939 19 50       38 next if $def =~ /\A(?:INDEX|CONSTRAINT|PROJECTION|PRIMARY\s+KEY)\b/i;
940 19         18 my ($cname, $rest);
941 19 50       74 if ($def =~ /\A(`(?:[^`\\]|\\.|``)+`)\s+(.*)\z/s) {
    0          
942 19         23 ($cname, $rest) = (_unquote_ident($1), $2);
943             } elsif ($def =~ /\A(\S+)\s+(.*)\z/s) {
944 0         0 ($cname, $rest) = ($1, $2);
945             } else {
946 0         0 next;
947             }
948             # The type is the leading identifier plus its balanced (...) args;
949             # DEFAULT / CODEC / TTL / COMMENT modifiers come after and are
950             # not part of the type.
951 19         33 my $type = _take_type(\$rest);
952 19 50       53 push @columns, [$cname, $type] if length $type;
953             }
954 10 50       20 die "parse_create_table: no columns parsed\n" unless @columns;
955              
956 10         12 my $tail = substr $ddl, $close + 1;
957 10         35 my %out = (
958             database => $database,
959             table => $table,
960             columns => \@columns,
961             );
962             # Trailing clauses. ENGINE has no terminating keyword of its own;
963             # each clause runs until the next clause keyword or end of string.
964 10         19 my $stop = qr/\bENGINE\b|\bPARTITION\s+BY\b|\bPRIMARY\s+KEY\b|
965             \bORDER\s+BY\b|\bSAMPLE\s+BY\b|\bTTL\b|\bSETTINGS\b|\z/xi;
966 10         96 my %clause = (
967             engine => qr/\bENGINE\s*=\s*/i,
968             partition_by => qr/\bPARTITION\s+BY\s+/i,
969             primary_key => qr/\bPRIMARY\s+KEY\s+/i,
970             order_by => qr/\bORDER\s+BY\s+/i,
971             sample_by => qr/\bSAMPLE\s+BY\s+/i,
972             ttl => qr/\bTTL\s+/i,
973             settings => qr/\bSETTINGS\s+/i,
974             );
975 10         23 for my $k (keys %clause) {
976             # No /g: each clause is searched from the start of $tail
977             # independently. With /g the shared pos() on $tail would make a
978             # clause that sorts earlier in the string than a previously
979             # matched one unfindable. $+[0] is the end offset of the match.
980 70 100       219 next unless $tail =~ $clause{$k};
981 22         56 my $val = substr $tail, $+[0];
982 22         312 $val =~ s/$stop.*\z//s;
983 22         58 $val =~ s/\A\s+|\s+\z//g;
984 22 50       46 $out{$k} = $val if length $val;
985             }
986 10         64 return \%out;
987             }
988              
989             # Consume one CH type expression from the front of $$rest (an identifier
990             # optionally followed by a balanced parenthesised argument list), advance
991             # $$rest past it, and return the type string. Used by parse_create_table
992             # to separate the type from trailing DEFAULT/CODEC/... modifiers.
993             sub _take_type {
994 19     19   20 my ($rest) = @_;
995 19         24 $$rest =~ s/\A\s+//;
996 19 50       35 return '' unless $$rest =~ /\A([A-Za-z_]\w*)/;
997 19         44 my $type = $1;
998 19         17 my $pos = length $type;
999 19 100       38 if (substr($$rest, $pos, 1) eq '(') {
1000 5         5 my ($depth, $len) = (0, length $$rest);
1001 5         9 for (my $i = $pos; $i < $len; $i++) {
1002 80         70 my $c = substr($$rest, $i, 1);
1003 80 50       84 if ($c eq '`') { $i = _skip_backtick_quoted($$rest, $i) - 1; next }
  0         0  
  0         0  
1004 80 100       115 if ($c eq '(') { $depth++ }
  7 100       7  
1005 7 100       5 elsif ($c eq ')') { $depth--; if (!$depth) { $pos = $i + 1; last } }
  7         9  
  5         4  
  5         6  
1006             }
1007             }
1008 19         26 $type = substr $$rest, 0, $pos;
1009 19         31 substr($$rest, 0, $pos) = '';
1010 19         24 return $type;
1011             }
1012              
1013             # Inspect a captured Native block and return its column shape as a
1014             # fresh encoder configured for that shape. Useful for diagnosing
1015             # captured payloads off-line (no server, no schema source) and for
1016             # round-tripping bytes through a transform/filter step. Zero-row
1017             # blocks work fine - the column headers are still on the wire.
1018             sub for_native_bytes {
1019 2     2 1 1410 my ($class, $bytes) = @_;
1020 2         54 my $blk = $class->decode_block($bytes);
1021 2         5 my @cols;
1022 2         3 for my $col (@{ $blk->{columns} }) {
  2         7  
1023 4         16 push @cols, [ $col->{name}, $col->{type} ];
1024             }
1025 2         21 return $class->new(columns => \@cols);
1026             }
1027              
1028             # --- RowBinary -------------------------------------------------------
1029             #
1030             # RowBinary is ClickHouse's row-major binary format: each row is the
1031             # concatenation of its column values, with no headers. The per-value
1032             # byte encoding for scalar / String / FixedString / Nullable columns is
1033             # byte-identical to that column's data in a one-row Native block, so
1034             # encode_row_binary reuses the XS Native encoder and slices the value
1035             # region out; decode_row_binary wraps a value back into a one-row Native
1036             # block and runs the XS decoder. Array(T) is the one shape that differs
1037             # (RowBinary uses a varint element count where Native uses a UInt64
1038             # offset) and is handled by recursion. This keeps a single source of
1039             # truth for every type's wire bytes - no second per-type codec in Perl.
1040             #
1041             # Supported: all scalar types (Int*/UInt*/Float*/Bool/Date*/DateTime*/
1042             # Decimal*/UUID/IPv4/IPv6/Enum*), String, FixedString(N), Nullable of
1043             # any of those, Array(...) nesting, and LowCardinality(...) (encoded as
1044             # its inner type, which is how RowBinary represents it). Map, Tuple,
1045             # Variant, JSON, Dynamic, Geo and Nested croak - their Native and
1046             # RowBinary framings diverge in ways the slice trick cannot bridge.
1047              
1048             # Fixed on-wire byte width per scalar base type (parametric suffix
1049             # already stripped by the caller). Decimal/FixedString are sized
1050             # separately because their width depends on the type parameters.
1051             my %RB_FIXED_WIDTH = (
1052             Int8 => 1, UInt8 => 1, Bool => 1, Boolean => 1, Enum8 => 1,
1053             Int16 => 2, UInt16 => 2, Date => 2, Enum16 => 2,
1054             Int32 => 4, UInt32 => 4, Float32 => 4, BFloat16 => 2,
1055             Date32 => 4, DateTime => 4, IPv4 => 4, Decimal32 => 4,
1056             Int64 => 8, UInt64 => 8, Float64 => 8, DateTime64 => 8,
1057             Decimal64 => 8,
1058             Int128 => 16, UInt128 => 16, UUID => 16, IPv6 => 16, Decimal128 => 16,
1059             Int256 => 32, UInt256 => 32, Decimal256 => 32,
1060             );
1061              
1062             # Croak unless $type is a RowBinary-supported scalar (or Nullable of
1063             # one). Array / LowCardinality are peeled by the caller before this
1064             # runs, so anything parenthesised-and-recursive that reaches here is
1065             # unsupported.
1066             sub _rb_assert_scalar {
1067 82     82   90 my ($type) = @_;
1068             # ClickHouse does not nest Nullable, so a single peel is enough.
1069 82         78 my $t = $type;
1070 82 100       137 $t = $1 if $t =~ /\ANullable\((.+)\)\z/s;
1071 82 100       172 die "row_binary: type '$type' is not supported (RowBinary covers "
1072             . "scalar/String/FixedString columns, optionally Nullable, "
1073             . "Array, or LowCardinality)\n"
1074             if $t =~ /\A(?:Map|Tuple|Variant|JSON|Object|Dynamic|Point|Ring
1075             |LineString|MultiLineString|Polygon|MultiPolygon
1076             |Nested|SimpleAggregateFunction|AggregateFunction
1077             |Array|LowCardinality)\b/x;
1078 78         76 return;
1079             }
1080              
1081             # The varint / length-string codecs are XS, registered into the
1082             # ClickHouse::Encoder::TCP package (the protocol packers use them);
1083             # the single shared object installs them regardless of package, so
1084             # they are callable here once the main module's XS has booted. Alias
1085             # them in as glob aliases (not wrapper subs) so a call goes straight
1086             # to the XSUB with no extra frame and exact context propagation.
1087             ## no critic (ProhibitCallsToUnexportedSubs)
1088             *_rb_pack_varint = \&ClickHouse::Encoder::TCP::pack_varint;
1089             *_rb_unpack_varint = \&ClickHouse::Encoder::TCP::unpack_varint;
1090             *_rb_pack_string = \&ClickHouse::Encoder::TCP::pack_string;
1091             ## use critic
1092              
1093             # Encode one value of $type into RowBinary bytes. $cache memoises the
1094             # single-column Native encoder + value-region offset per scalar type.
1095             sub _rb_encode_value {
1096 55     55   78 my ($type, $val, $cache) = @_;
1097 55 100       90 if ($type =~ /\AArray\((.+)\)\z/s) {
1098 8         13 my $inner = $1;
1099 8 50       14 die "encode_row_binary: Array column needs an arrayref value\n"
1100             unless ref $val eq 'ARRAY';
1101 8         15 my $s = _rb_pack_varint(scalar @$val);
1102 8         27 $s .= _rb_encode_value($inner, $_, $cache) for @$val;
1103 8         17 return $s;
1104             }
1105 47 100       61 if ($type =~ /\ALowCardinality\((.+)\)\z/s) {
1106 2         4 return _rb_encode_value($1, $val, $cache);
1107             }
1108 45         72 _rb_assert_scalar($type);
1109 41   66     75 my $slot = $cache->{$type} ||= do {
1110 17         77 my $enc = ClickHouse::Encoder->new(columns => [['c', $type]]);
1111             # One-row, one-column Native block prefix:
1112             # varint(ncols=1) varint(nrows=1) lenstr("c") lenstr(type)
1113 17         52 my $prefix = 2 + length(_rb_pack_string('c'))
1114             + length(_rb_pack_string($type));
1115 17         47 [$enc, $prefix];
1116             };
1117 41         249 return substr($slot->[0]->encode([[ $val ]]), $slot->[1]);
1118             }
1119              
1120             # Encode an arrayref of rows into a RowBinary byte string. Call on an
1121             # encoder instance (its column types drive serialisation). The result
1122             # is the request body for `insert ... format RowBinary`.
1123             sub encode_row_binary {
1124 12     12 1 168961 my ($self, $rows) = @_;
1125 12 100       37 die "encode_row_binary: rows must be an arrayref\n"
1126             unless ref $rows eq 'ARRAY';
1127 11         26 my $cols = $self->columns;
1128 11         13 my %cache;
1129 11         11 my $out = '';
1130 11         23 for my $ri (0 .. $#$rows) {
1131 14         28 my $row = $rows->[$ri];
1132 14 50       39 die "encode_row_binary: row $ri must be an arrayref\n"
1133             unless ref $row eq 'ARRAY';
1134 14 100       31 die "encode_row_binary: row $ri has " . scalar(@$row)
1135             . " values, expected " . scalar(@$cols) . "\n"
1136             unless @$row == @$cols;
1137 13         16 for my $ci (0 .. $#$cols) {
1138 41         64 $out .= _rb_encode_value($cols->[$ci][1], $row->[$ci], \%cache);
1139             }
1140             }
1141 6         41 return $out;
1142             }
1143              
1144             # Byte length of one scalar/String/FixedString/Nullable value at
1145             # $$bufref position $pos (does not advance). Array is handled by the
1146             # caller's recursion, never reaching here.
1147             sub _rb_value_len {
1148 42     42   47 my ($type, $bufref, $pos) = @_;
1149 42 100       52 if ($type =~ /\ANullable\((.+)\)\z/s) {
1150 5         9 return 1 + _rb_value_len($1, $bufref, $pos + 1);
1151             }
1152 37 50       41 if ($type =~ /\ALowCardinality\((.+)\)\z/s) {
1153 0         0 return _rb_value_len($1, $bufref, $pos);
1154             }
1155 37 100       46 if ($type eq 'String') {
1156 7         13 my ($len, $after) = _rb_unpack_varint($$bufref, $pos);
1157 7         10 return ($after - $pos) + $len;
1158             }
1159 30 100       55 if ($type =~ /\AFixedString\((\d+)\)\z/) {
1160 2         6 return $1;
1161             }
1162 28         40 (my $base = $type) =~ s/\(.*//s;
1163 28 50       35 if ($base eq 'Decimal') {
1164             # Decimal(P, S): storage width follows the precision P.
1165 0         0 my ($p) = $type =~ /\(\s*(\d+)/;
1166 0 0       0 return $p <= 9 ? 4 : $p <= 18 ? 8
    0          
    0          
1167             : $p <= 38 ? 16 : 32;
1168             }
1169 28         35 my $w = $RB_FIXED_WIDTH{$base};
1170 28 50       41 die "decode_row_binary: cannot size unsupported type '$type'\n"
1171             unless defined $w;
1172 28         32 return $w;
1173             }
1174              
1175             # Decode one value of $type at $$posref, advancing $$posref past it.
1176             sub _rb_decode_value {
1177 46     46   59 my ($type, $bufref, $posref, $cache) = @_;
1178 46 100       68 if ($type =~ /\AArray\((.+)\)\z/s) {
1179 7         10 my $inner = $1;
1180 7         13 my ($n, $after) = _rb_unpack_varint($$bufref, $$posref);
1181 7         7 $$posref = $after;
1182 7         16 return [ map { _rb_decode_value($inner, $bufref, $posref, $cache) }
  9         17  
1183             1 .. $n ];
1184             }
1185 39 100       51 if ($type =~ /\ALowCardinality\((.+)\)\z/s) {
1186 2         4 return _rb_decode_value($1, $bufref, $posref, $cache);
1187             }
1188 37         45 _rb_assert_scalar($type);
1189 37         47 my $vlen = _rb_value_len($type, $bufref, $$posref);
1190 37         50 my $vbytes = substr($$bufref, $$posref, $vlen);
1191 37 50       47 die "decode_row_binary: truncated value for '$type' at offset $$posref\n"
1192             if length($vbytes) != $vlen;
1193 37         32 $$posref += $vlen;
1194 37   66     97 my $prefix = $cache->{$type} ||=
1195             "\x01\x01" . _rb_pack_string('c') . _rb_pack_string($type);
1196 37         229 my $blk = ClickHouse::Encoder->decode_block($prefix . $vbytes);
1197 37         142 return $blk->{columns}[0]{values}[0];
1198             }
1199              
1200             # Decode a RowBinary byte string into an arrayref of row arrayrefs.
1201             # Call on an encoder instance whose column types match the producer.
1202             sub decode_row_binary {
1203 7     7 1 1517 my ($self, $bytes) = @_;
1204 7 100       19 die "decode_row_binary: must be called on an encoder instance\n"
1205             unless ref $self;
1206 6 50       11 die "decode_row_binary: input must be defined\n" unless defined $bytes;
1207 6         9 my $cols = $self->columns;
1208 6         6 my $pos = 0;
1209 6         6 my $len = length $bytes;
1210             # With zero columns the inner per-column loop is a no-op, so a
1211             # non-empty buffer would never make $pos advance - guard explicitly
1212             # rather than spin forever.
1213 6 100 100     22 die "decode_row_binary: encoder has no columns but $len bytes given\n"
1214             if !@$cols && $len;
1215 5         6 my %cache;
1216             my @rows;
1217 5         10 while ($pos < $len) {
1218 7         7 my @row;
1219 7         9 for my $col (@$cols) {
1220 35         48 push @row, _rb_decode_value($col->[1], \$bytes, \$pos, \%cache);
1221             }
1222 7         14 push @rows, \@row;
1223             }
1224 5         23 return \@rows;
1225             }
1226              
1227             # Post-process a decoded block (or any one column's values arrayref):
1228             # rewrite Date / Date32 / DateTime / DateTime64 integer epochs into
1229             # ISO 8601 strings or Time::Moment instances. Modifies the block in
1230             # place AND returns it so the call can be chained. as => 'iso' (the
1231             # default) emits UTC strings with a 'Z' suffix; as => 'datetime'
1232             # returns Time::Moment objects (requires Time::Moment installed).
1233             # DateTime64 precision is read from the column's type string so each
1234             # tick converts to the correct number of fractional digits.
1235             sub coerce_datetimes {
1236 9     9 1 303418 my ($class_or_self, $block, %opts) = @_;
1237 9   100     74 my $as = $opts{as} // 'iso';
1238 9 100 66     47 die "coerce_datetimes: 'as' must be 'iso' or 'datetime' (got '$as')\n"
1239             unless $as eq 'iso' || $as eq 'datetime';
1240              
1241 7 50       17 if ($as eq 'datetime') {
1242 0         0 require Time::Moment;
1243             } else {
1244 7         1103 require POSIX;
1245             }
1246              
1247 7         12060 for my $col (@{ $block->{columns} }) {
  7         23  
1248 19 100       46 next if $col->{skipped};
1249 18         28 my $type = $col->{type};
1250 18         41 my $vals = $col->{values};
1251              
1252             # Strip Nullable() wrapping so the inner type matches below.
1253             # Nullable values come through as undef in the values array;
1254             # the loops already skip undef.
1255 18 100       50 $type = $1 if $type =~ /^Nullable\((.*)\)\z/;
1256              
1257 18 100 100     114 if ($type eq 'Date' || $type eq 'Date32') {
    100 100        
    100          
1258 4         6 for my $v (@$vals) {
1259 6 50       13 next unless defined $v;
1260 6         19 $v = _epoch_to_string($v * 86400, 0, $as, 'Y-m-d');
1261             }
1262             }
1263             elsif ($type eq 'DateTime' || $type =~ /^DateTime\(/) {
1264 6         15 for my $v (@$vals) {
1265 10 100       28 next unless defined $v;
1266 8         26 $v = _epoch_to_string($v, 0, $as, 'iso');
1267             }
1268             }
1269             elsif ($type =~ /^DateTime64\((\d+)/) {
1270 6         16 my $precision = $1;
1271 6         13 my $scale = 10 ** $precision;
1272 6         10 for my $v (@$vals) {
1273 7 50       14 next unless defined $v;
1274             # Decoded DateTime64 is an integer count of (10^precision)
1275             # ticks since the Unix epoch (a signed int64).
1276 68     68   31386 use integer;
  68         910  
  68         277  
1277 7         13 my $secs = int($v / $scale);
1278 68     68   2729 no integer;
  68         125  
  68         193  
1279 7         12 my $frac = $v - $secs * $scale;
1280             # Normalize negative fractional tail to a positive frac
1281             # below the integer epoch.
1282 7 100       14 if ($frac < 0) { $frac += $scale; $secs -= 1 }
  1         2  
  1         1  
1283 7         17 $v = _epoch_to_string($secs, $frac, $as,
1284             'iso', $precision);
1285             }
1286             }
1287             # other columns (non-time) untouched
1288             }
1289 7         25 return $block;
1290             }
1291              
1292             # Format a (whole_seconds, fractional_ticks_at_$precision) pair under
1293             # either 'iso' or 'datetime' modes. Internal helper; the strftime is
1294             # UTC-only on purpose (CH itself stores UTC ticks; per-column timezone
1295             # is a display concern handled separately by the user if needed).
1296             sub _epoch_to_string {
1297 21     21   38 my ($secs, $frac_ticks, $as, $shape, $precision) = @_;
1298 21 50       35 if ($as eq 'datetime') {
1299 0 0 0     0 if ($shape ne 'Y-m-d' && $precision) {
1300             # Time::Moment uses nanosecond precision; widen the ticks
1301             # accordingly. Precision > 9 isn't supported here.
1302 0         0 my $ns = $frac_ticks * (10 ** (9 - $precision));
1303 0         0 return Time::Moment->from_epoch($secs, $ns);
1304             }
1305 0         0 return Time::Moment->from_epoch($secs);
1306             }
1307             # 'iso' string form
1308 21         76 my @t = gmtime $secs;
1309 21 100       35 if ($shape eq 'Y-m-d') {
1310 6         85 return POSIX::strftime('%Y-%m-%d', @t);
1311             }
1312 15         340 my $base = POSIX::strftime('%Y-%m-%dT%H:%M:%S', @t);
1313 15 100       42 if ($precision) {
1314 6         21 $base .= sprintf('.%0*d', $precision, $frac_ticks);
1315             }
1316 15         62 return $base . 'Z';
1317             }
1318              
1319             # Static per-type byte budgets used by estimate_size. Returns the
1320             # bytes-per-row for fixed-width types; for variable types the second
1321             # return value is an "average string size" heuristic the caller can
1322             # use, and undef means "walk the actual values for accuracy". The
1323             # table is intentionally coarse - the goal is order-of-magnitude
1324             # sizing for batch-split decisions, not byte-exact accounting.
1325             my %FIXED_TYPE_BYTES = (
1326             Int8 => 1, UInt8 => 1, Bool => 1, Boolean => 1,
1327             Int16 => 2, UInt16 => 2, Date => 2,
1328             Int32 => 4, UInt32 => 4, Float32 => 4, BFloat16 => 2,
1329             Date32 => 4, DateTime => 4, IPv4 => 4, Decimal32 => 4,
1330             Enum8 => 1, Enum16 => 2,
1331             Int64 => 8, UInt64 => 8, Float64 => 8, DateTime64 => 8,
1332             Decimal64 => 8,
1333             UUID => 16, IPv6 => 16, Decimal128 => 16,
1334             Decimal256 => 32,
1335             # Geo aliases:
1336             Point => 16, Ring => 64, LineString => 64,
1337             MultiLineString => 256, Polygon => 256, MultiPolygon => 256,
1338             );
1339              
1340             sub _type_byte_estimate {
1341 25     25   38 my ($type, $avg_str) = @_;
1342 25   50     45 $avg_str //= 16;
1343             # Strip outer parens/args for parametric prefix match.
1344 25         52 my $base = $type;
1345 25         58 $base =~ s/\(.*\z//s;
1346 25 100       115 return $FIXED_TYPE_BYTES{$base} if exists $FIXED_TYPE_BYTES{$base};
1347              
1348 11 100       27 if ($base eq 'String') {
1349 6         27 return $avg_str + 1; # +1 for varint length prefix (small lens)
1350             }
1351 5 100       13 if ($base eq 'FixedString') {
1352 1         7 my ($n) = $type =~ /^FixedString\((\d+)\)/;
1353 1   33     9 return $n // $avg_str;
1354             }
1355 4 50       11 if ($base eq 'Nullable') {
1356 0         0 my ($inner) = $type =~ /^Nullable\((.+)\)$/;
1357 0         0 return 1 + _type_byte_estimate($inner, $avg_str);
1358             }
1359 4 100 66     19 if ($base eq 'Array' || $base eq 'Map') {
1360             # 8-byte offset per row + N_avg(=4) inner elements.
1361 1         8 my ($inner) = $type =~ /^\w+\((.+)\)$/;
1362 1         7 return 8 + 4 * _type_byte_estimate($inner, $avg_str);
1363             }
1364 3 50       8 if ($base eq 'Tuple') {
1365 0         0 my ($body) = $type =~ /^Tuple\((.+)\)$/;
1366 0 0       0 return 0 unless defined $body;
1367 0         0 my @parts = _split_paren_list($body);
1368 0         0 my $sum = 0;
1369 0         0 for my $p (@parts) {
1370             # Tuple elements may be named: "name Type"
1371 0         0 $p =~ s/^[A-Za-z_]\w*\s+//;
1372 0         0 $sum += _type_byte_estimate($p, $avg_str);
1373             }
1374 0         0 return $sum;
1375             }
1376             # LowCardinality: dict + per-row 1-byte index (typical low cardinality).
1377             # Variant: 1 disc byte + (heuristically) inner avg.
1378 3 100 66     16 return 1 + $avg_str if $base eq 'LowCardinality' || $base eq 'Variant';
1379             # JSON/Object/Dynamic: shape-dependent. Heuristic: roughly two
1380             # avg_string_size payloads per row (one for path machinery + one
1381             # for value bytes), so the caller's avg_string_size override
1382             # actually moves the estimate.
1383 2 0 33     10 return 2 * $avg_str if $base eq 'JSON' || $base eq 'Object' || $base eq 'Dynamic';
      33        
1384             # SimpleAggregateFunction(func, T) -> inner T.
1385 0 0       0 if ($base eq 'SimpleAggregateFunction') {
1386 0         0 my ($inner) = $type =~ /^SimpleAggregateFunction\([^,]+,\s*(.+)\)$/;
1387 0 0       0 return _type_byte_estimate($inner, $avg_str) if defined $inner;
1388             }
1389 0         0 return $avg_str; # unknown -> conservative
1390             }
1391              
1392             # Coarse byte-size estimate for an encoded block, parameterized on
1393             # row count (an integer) or arrayref (counted). Uses per-type byte
1394             # budgets; variable-length types use a 16-byte average per value
1395             # (override via $avg_str). For batch-size decisions ("is this 1 MiB
1396             # or 100 MiB before I compress?"). NOT byte-exact: a String row with
1397             # a 10 KiB blob will be undercounted. Run encode() for the real size.
1398             sub estimate_size {
1399 19     19 1 152539 my ($self, $rows_or_n, %opts) = @_;
1400 19 100       52 my $n = ref $rows_or_n eq 'ARRAY' ? scalar @$rows_or_n : $rows_or_n;
1401 19   100     65 my $avg_str = $opts{avg_string_size} // 16;
1402 19         43 my $cols = $self->columns;
1403 19         25 my $total = 4; # block header (ncols + nrows varints, ~tiny)
1404 19         35 for my $c (@$cols) {
1405 24         46 my ($name, $type) = @$c;
1406 24         38 $total += length($name) + length($type) + 2; # lenstr headers
1407 24         43 $total += $n * _type_byte_estimate($type, $avg_str);
1408             }
1409 19         65 return $total;
1410             }
1411              
1412             # Return a configured encoder for the column shape produced by an
1413             # arbitrary select. Runs `describe ($sql)` via the same `via=>...`
1414             # transport as for_table. Useful when the schema isn't a real table.
1415             sub for_query {
1416 0     0 1 0 my ($class, $sql, %opts) = @_;
1417             # ClickHouse's describe accepts a subquery, but the SQL must not
1418             # contain unmatched parentheses; let the server reject malformed
1419             # queries rather than re-implementing SQL validation here.
1420 0         0 my $describe = "describe ($sql)";
1421 0         0 return $class->_for_describe($describe, %opts);
1422             }
1423              
1424             # Issue an insert ... format native over HTTP using HTTP::Tiny. Returns
1425             # the response hashref from HTTP::Tiny (->{success}, ->{status},
1426             # ->{content}). Compresses with zstd/gzip if `compress` is set; takes
1427             # whatever encoder produces (so `for_table` + rows is the typical
1428             # combination). Does not retry; the caller does HTTP-level error policy.
1429             # Set up URL + headers for an insert ... format native HTTP request.
1430             # Shared by insert_http and BulkInserter::new. Validates the table name
1431             # and stamps the Content-Type / Content-Encoding headers as needed.
1432             sub _build_insert_endpoint {
1433 16     16   505 my ($table, $compress, %args) = @_;
1434 16         48 _validate_table_name($table);
1435 14 50 66     103 die "unknown compress='$compress' "
      66        
1436             . "(expected 'raw', 'zstd', or 'gzip')\n"
1437             unless $compress eq 'raw' || $compress eq 'zstd'
1438             || $compress eq 'gzip';
1439 12         51 my ($url, $hdr) = _http_url_headers(
1440             "insert into $table format native", %args);
1441 12         30 $hdr->{'Content-Type'} = 'application/octet-stream';
1442 12 50 33     45 $hdr->{'Content-Encoding'} = $compress
1443             if $compress eq 'zstd' || $compress eq 'gzip';
1444 12         34 return ($url, $hdr);
1445             }
1446              
1447             # Apply zstd/gzip compression to $body in place (or pass through for 'raw').
1448             # $compress is validated upstream by _build_insert_endpoint; we trust it
1449             # here. $origin is the class used to resolve compressed_writer (so the
1450             # helper works for class-method and instance callers alike).
1451             sub _apply_compression {
1452 13     13   26 my ($origin, $compress, $body) = @_;
1453 13 50       39 return $body if $compress eq 'raw';
1454 0         0 my $compressed;
1455             my $wrap = $origin->compressed_writer(
1456 0     0   0 $compress, sub { $compressed = $_[0] });
  0         0  
1457 0         0 $wrap->($body);
1458 0         0 return $compressed;
1459             }
1460              
1461             sub insert_http {
1462 3     3 1 135668 my ($class_or_self, %args) = @_;
1463 3   33     10 my $enc = $args{encoder} // do {
1464 3 50       7 my $cols = $args{columns} or die "insert_http needs columns or encoder";
1465 3         50 $class_or_self->new(columns => $cols);
1466             };
1467 3 100       16 my $rows = $args{rows} or die "insert_http needs rows arrayref";
1468 2 50       4 my $table = $args{table} or die "insert_http needs table";
1469 2   50     6 my $timeout = $args{timeout} // 60;
1470 2   100     6 my $compress = $args{compress} // 'raw';
1471 2   33     13 my $origin = ref $class_or_self || $class_or_self;
1472              
1473 2         6 my ($url, $hdr) = _build_insert_endpoint($table, $compress, %args);
1474 0         0 my $body = _apply_compression($origin, $compress, $enc->encode($rows));
1475              
1476 0         0 my $resp = _http_tiny(%args, timeout => $timeout)
1477             ->post($url, { headers => $hdr, content => $body });
1478 0         0 return _decorate_response($resp);
1479             }
1480              
1481             # Stream a select response: POST the SQL with default_format=Native,
1482             # feed the response chunks into a sliding buffer, decode complete blocks
1483             # as they arrive, and pass each one to $opts{on_block}. Memory stays
1484             # bounded by chunk_size + one block, so this is the right entry point
1485             # for selects that won't fit in memory. The user's $sql must NOT include
1486             # a format clause - this helper always requests format Native.
1487             sub select_blocks {
1488 5     5 1 176248 my ($class, $sql, %opts) = @_;
1489             my $cb = $opts{on_block}
1490 5 100       19 or die "select_blocks: 'on_block' coderef required\n";
1491 4 100       39 die "select_blocks: \$sql should not include a format clause "
1492             . "(select_blocks always requests format Native)\n"
1493             if $sql =~ /\bformat\s+\w+\s*\z/i;
1494 1         3 my $keep = $opts{keep};
1495 1         3 my $decompress = $opts{decompress};
1496              
1497             # Build URL+headers via the same helper insert_http uses, but with an
1498             # empty SQL placeholder; we POST the SQL as the request body. Adding
1499             # default_format=Native ensures the response is Native bytes even if
1500             # the user's SQL doesn't terminate with format.
1501 1         3 my %h_opts = %opts;
1502             # Drop keys this method consumes; also drop dedup_token, which is
1503             # meaningful only on insert (would be silently dead weight on the
1504             # select URL otherwise and could mask a typo by the caller).
1505 1         5 delete @h_opts{qw(on_block keep timeout decompress dedup_token)};
1506 1         4 my ($url, $hdr) = _http_url_headers('', %h_opts);
1507 1         2 $url .= '&default_format=Native';
1508             # When decompress=1 is requested the server wraps each Native block
1509             # in its compressed-block framing (X-ClickHouse-Compressed header).
1510             # Add ?compress=1 to the URL so CH knows to compress the response.
1511 1 50       2 $url .= '&compress=1' if $decompress;
1512              
1513 1         1 my $buf = '';
1514             # Block walker: when decompress is set, walk through compressed-block-
1515             # framing entries and feed the decompressed bytes into a second
1516             # accumulator that decode_block reads. Otherwise feed buf directly.
1517 1         1 my $inner_buf = '';
1518             my $drain = sub {
1519             # Phase 1: pull compressed-block frames out of $buf into $inner_buf
1520 1 50   1   2 if ($decompress) {
1521 0         0 while (length($buf) >= 25) { # 16 hash + 9 header minimum
1522 0         0 my $csize = unpack 'V', substr($buf, 17, 4);
1523 0 0       0 last if length($buf) < 16 + $csize;
1524 0         0 my ($plain, $consumed) =
1525             $class->decompress_native_block($buf);
1526 0         0 $inner_buf .= $plain;
1527 0         0 substr($buf, 0, $consumed, '');
1528             }
1529             } else {
1530 1         2 $inner_buf = $buf;
1531             }
1532             # Phase 2: decode whole Native blocks out of $inner_buf
1533 1         3 while (length($inner_buf) > 0) {
1534 0         0 my $block = eval { $class->decode_block($inner_buf, 0, $keep) };
  0         0  
1535 0 0       0 if ($@) {
1536 0 0       0 last if $@ =~ /buffer truncated/i;
1537 0         0 die $@;
1538             }
1539 0         0 $cb->($block);
1540 0         0 substr($inner_buf, 0, $block->{consumed}, '');
1541             }
1542             # When not decompressing, inner_buf IS buf; carry the residual
1543             # back so the next data_callback append sees the unconsumed tail.
1544 1 50       2 if (!$decompress) {
1545 1         2 $buf = $inner_buf;
1546             }
1547 1         4 };
1548              
1549             my $resp = _http_tiny(%opts, timeout => $opts{timeout} // 60)->post(
1550             $url,
1551             { content => $sql,
1552             headers => { %$hdr, 'Content-Type' => 'text/plain' },
1553 0     0   0 data_callback => sub { $buf .= $_[0]; $drain->() },
  0         0  
1554 1   50     7 });
1555              
1556             die "select_blocks: HTTP $resp->{status}: $resp->{content}\n"
1557 1 50       111 unless $resp->{success};
1558              
1559 1         2 $drain->();
1560 1 50 33     2 die "select_blocks: " . length($buf) . " trailing bytes "
1561             . "after last complete compressed block\n"
1562             if $decompress && length $buf;
1563 1 50       3 die "select_blocks: " . length($inner_buf) . " trailing bytes "
1564             . "after last complete block\n"
1565             if length $inner_buf;
1566 1         10 return;
1567             }
1568              
1569             # Returns a bulk-inserter object: ->push($row), ->push_many(\@rows),
1570             # ->flush (idempotent), ->finish. Holds a single HTTP::Tiny instance
1571             # across batches (so keepalive applies) and auto-flushes when the
1572             # accumulated row count crosses batch_size. Transient HTTP failures
1573             # (5xx, network errors) are retried up to retries times with linear
1574             # backoff; 4xx errors die immediately.
1575             sub bulk_inserter {
1576 15     15 1 561051 my ($class_or_self, %args) = @_;
1577 15         81 return ClickHouse::Encoder::BulkInserter->new(%args,
1578             _origin => $class_or_self);
1579             }
1580              
1581             package ClickHouse::Encoder::BulkInserter; ## no critic (ProhibitMultiplePackages)
1582              
1583             sub new {
1584 15     15   47 my ($class, %args) = @_;
1585 15         30 my $origin_raw = delete $args{_origin};
1586 15   50     85 my $origin = (ref $origin_raw || $origin_raw) || 'ClickHouse::Encoder';
1587 15   33     42 my $enc = $args{encoder} // do {
1588 15 100       64 my $cols = $args{columns} or die "bulk_inserter needs columns or encoder";
1589 14         236 $origin->new(columns => $cols);
1590             };
1591 14 100       51 my $table = $args{table} or die "bulk_inserter needs table";
1592 13   100     45 my $compress = $args{compress} // 'raw';
1593 13   50     42 my $timeout = $args{timeout} // 60;
1594              
1595 13         45 my ($url, $hdr) = ClickHouse::Encoder::_build_insert_endpoint(
1596             $table, $compress, %args);
1597              
1598             return bless {
1599             enc => $enc,
1600             url => $url,
1601             hdr => $hdr,
1602             rows => [],
1603             batch_size => $args{batch_size} // 10_000,
1604             retries => $args{retries} // 3,
1605             retry_wait => $args{retry_wait} // 0.5,
1606 11   100     110 retry_max_wait => $args{retry_max_wait} // 30,
      100        
      100        
      100        
1607             compress => $compress,
1608             http => ClickHouse::Encoder::_http_tiny(
1609             %args, timeout => $timeout, keep_alive => 1),
1610             origin => $origin,
1611             sent_rows => 0,
1612             sent_batches => 0,
1613             last_response => undef,
1614             summary => {},
1615             }, $class;
1616             }
1617              
1618             sub push :method { ## no critic (ProhibitBuiltinHomonyms)
1619 12     12   851 my ($self, $row) = @_;
1620 12         12 CORE::push @{ $self->{rows} }, $row;
  12         23  
1621 12 50       15 $self->flush if @{ $self->{rows} } >= $self->{batch_size};
  12         30  
1622 12         30 return $self;
1623             }
1624              
1625             sub push_many {
1626 4     4   269 my ($self, $rows) = @_;
1627 4         4 CORE::push @{ $self->{rows} }, @{$rows};
  4         8  
  4         20  
1628             # Slice exactly batch_size rows per flush so we never POST one
1629             # oversized body when push_many is called with N >> batch_size.
1630             # `local` restores $self->{rows} to the remainder arrayref even
1631             # when `flush` croaks mid-batch (so caller's eval{} sees the
1632             # untried rows still buffered for a retry).
1633 4         5 while (@{ $self->{rows} } > $self->{batch_size}) {
  7         13  
1634 4         5 my @batch = splice @{ $self->{rows} }, 0, $self->{batch_size};
  4         10  
1635 4         7 local $self->{rows} = \@batch;
1636 4         10 $self->flush;
1637             }
1638 3 100       4 $self->flush if @{ $self->{rows} } >= $self->{batch_size};
  3         7  
1639 3         4 return $self;
1640             }
1641              
1642             sub flush {
1643 14     14   48 my $self = shift;
1644 14         16 my $rows = $self->{rows};
1645 14 100       17 return $self if !@{$rows};
  14         28  
1646             my $body = ClickHouse::Encoder::_apply_compression(
1647 13         184 $self->{origin}, $self->{compress}, $self->{enc}->encode($rows));
1648 13         20 my $resp;
1649             my $last_err;
1650 13         36 for my $attempt (0 .. $self->{retries}) {
1651             $resp = $self->{http}->post($self->{url},
1652 18         200 { headers => $self->{hdr}, content => $body });
1653 18 100       324 last if $resp->{success};
1654             # 4xx errors are not retryable - the request is malformed.
1655             die "bulk_inserter: HTTP $resp->{status}: $resp->{content}\n"
1656 9 100 66     63 if $resp->{status} >= 400 && $resp->{status} < 500;
1657 8         22 $last_err = "HTTP $resp->{status}: $resp->{content}";
1658             # 5xx and network failures (599) are retryable. Exponential
1659             # backoff (retry_wait * 2^attempt, capped at retry_max_wait)
1660             # with equal jitter: sleep half the window deterministically
1661             # then a random half, so concurrent inserters retrying the
1662             # same failed server don't resynchronise into a thundering herd.
1663 8 100       25 if ($attempt < $self->{retries}) {
1664 5         48 require Time::HiRes;
1665 5         15 my $window = $self->{retry_wait} * (2 ** $attempt);
1666             $window = $self->{retry_max_wait}
1667 5 50       15 if $window > $self->{retry_max_wait};
1668 5         2521206 Time::HiRes::sleep($window / 2 + rand($window / 2));
1669             }
1670             }
1671             die "bulk_inserter: gave up after $self->{retries} retries; "
1672             . "last error: $last_err\n"
1673 12 100       76 unless $resp->{success};
1674 9         26 ClickHouse::Encoder::_decorate_response($resp);
1675 9         17 $self->{last_response} = $resp;
1676 9 100       21 if (my $sum = $resp->{ch}{summary}) {
1677             # Roll up CH summary fields across batches (read_rows,
1678             # written_rows, written_bytes, elapsed_ns, ...). Caller uses
1679             # ->summary to get the running totals; ->last_response to get
1680             # the most recent per-batch detail.
1681             $self->{summary}{$_} = ($self->{summary}{$_} // 0) + $sum->{$_}
1682 2   100     5 for grep { $sum->{$_} =~ /\A-?\d+\z/ } keys %$sum;
  8         37  
1683             }
1684 9         16 $self->{rows} = [];
1685 9         24 $self->{sent_rows} += @{$rows};
  9         10  
1686 9         12 $self->{sent_batches}++;
1687 9         25 return $self;
1688             }
1689              
1690 2     2   4 sub last_response { my $self = shift; return $self->{last_response} }
  2         10  
1691 2     2   6 sub summary { my $self = shift; return $self->{summary} }
  2         17  
1692              
1693             sub finish {
1694 2     2   6 my $self = shift;
1695 2         5 $self->flush;
1696 2         5 return { rows => $self->{sent_rows}, batches => $self->{sent_batches} };
1697             }
1698              
1699 4     4   2038 sub buffered_count { my $self = shift; return scalar @{ $self->{rows} } }
  4         6  
  4         17  
1700 4     4   1581 sub sent_rows { my $self = shift; return $self->{sent_rows} }
  4         13  
1701 1     1   10 sub sent_batches { my $self = shift; return $self->{sent_batches} }
  1         4  
1702              
1703             package ClickHouse::Encoder; ## no critic (ProhibitMultiplePackages)
1704              
1705             # Decimal128 values returned by decode_block come as [lo_uint64,
1706             # hi_int64]; Decimal256 as a 4-limb arrayref. Use Math::BigInt to
1707             # stitch limbs into a scaled decimal string.
1708             sub decimal128_str {
1709 8     8 1 277878 my ($class, $lo, $hi, $scale) = @_;
1710 8         3603 require Math::BigInt;
1711 8         65325 my $two64 = Math::BigInt->new(1)->blsft(64);
1712 8         49898 my $v = Math::BigInt->new($hi)->bmul($two64)->badd($lo);
1713 8         2626 return _scale_int_to_str($v, $scale);
1714             }
1715              
1716             sub decimal256_str {
1717 5     5 1 1255 my ($class, $limbs, $scale) = @_;
1718 5         33 require Math::BigInt;
1719 5         19 my $two64 = Math::BigInt->new(1)->blsft(64);
1720             # Top limb (limbs[3]) is the sign-extended high quarter. The sign
1721             # check uses BigInt comparisons; native Perl `1 << 64` returns 0 on
1722             # a 64-bit Perl (shift past word width), which would silently turn
1723             # a negative value into a wrong positive one.
1724 5         3475 my $top = Math::BigInt->new($limbs->[3]);
1725 5         326 my $two63 = Math::BigInt->new(1)->blsft(63);
1726 5 100       3299 $top->bsub($two64) if $top >= $two63;
1727 5         387 my $v = $top;
1728 5         12 $v->bmul($two64)->badd(Math::BigInt->new($limbs->[2]));
1729 5         1184 $v->bmul($two64)->badd(Math::BigInt->new($limbs->[1]));
1730 5         1215 $v->bmul($two64)->badd(Math::BigInt->new($limbs->[0]));
1731 5         1218 return _scale_int_to_str($v, $scale);
1732             }
1733              
1734             sub _scale_int_to_str {
1735 13     13   21 my ($big, $scale) = @_;
1736 13 100       27 my $sign = $big->is_neg ? '-' : '';
1737 13         80 my $abs = $big->copy->babs->bstr;
1738 13 100 66     1393 return "$sign$abs" if !$scale || $scale == 0;
1739 5 100       18 $abs = ('0' x ($scale - length($abs) + 1)) . $abs
1740             if length($abs) <= $scale;
1741 5         61 return $sign . substr($abs, 0, length($abs) - $scale)
1742             . '.'
1743             . substr($abs, length($abs) - $scale);
1744             }
1745              
1746             # Convenience: open `@cmd` as a write pipe, encode rows into it, and
1747             # close. Croaks on fork failure, exec failure, or non-zero child exit.
1748             # Used by examples piping into
1749             # `clickhouse-client insert ... format native`.
1750             sub encode_to_command {
1751 13     13 1 279083 my ($self, $cmd, $rows) = @_;
1752 13 100       85 ref $cmd eq 'ARRAY' or die "encode_to_command: cmd must be arrayref";
1753 11 100       58 @$cmd or die "encode_to_command: cmd must be non-empty arrayref";
1754              
1755             # Without ignoring SIGPIPE, an early child exit (e.g. clickhouse-client
1756             # rejecting the schema) would kill the parent silently with status
1757             # 141 instead of producing a trappable diagnostic on close.
1758 9         138 local $SIG{PIPE} = 'IGNORE';
1759              
1760             ## no critic (InputOutput::RequireBriefOpen) -- $fh is closed below
1761 9 50       35726 defined(my $pid = open my $fh, '|-') or die "fork: $!";
1762 9 100       481 if ($pid == 0) {
1763             # exec only returns on failure. Fold the failure path onto the
1764             # same statement (via `or do {...}`) so Perl doesn't flag the
1765             # post-exec code as unreachable at compile time. POSIX::_exit
1766             # (not die or plain exit) skips END blocks and DESTROY handlers
1767             # inherited from the parent; syswrite avoids running PerlIO
1768             # layers and __WARN__ handlers. Suppress Perl's default "Can't
1769             # exec" warning so the only diagnostic comes from our syswrite.
1770 68     68   206216 no warnings 'exec'; ## no critic (TestingAndDebugging::ProhibitNoWarnings)
  68         143  
  68         14346  
1771 3 0       171 exec { $cmd->[0] } @$cmd or do {
  3         0  
1772 0         0 my $err = $!;
1773 0         0 require POSIX;
1774 0         0 syswrite STDERR, "exec @$cmd: $err\n";
1775 0         0 POSIX::_exit(127);
1776             };
1777             }
1778 6         42 binmode $fh;
1779 6         2579619 $self->encode_to_handle($fh, $rows);
1780 5 50       11071437 close $fh
    100          
1781             or die "@$cmd " . ($! ? "close: $!" : "exit " . ($? >> 8));
1782 3         372 return;
1783             }
1784              
1785             1;
1786              
1787             __END__