| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
package ClickHouse::Encoder; |
|
2
|
68
|
|
|
68
|
|
5971325
|
use strict; |
|
|
68
|
|
|
|
|
97
|
|
|
|
68
|
|
|
|
|
1967
|
|
|
3
|
68
|
|
|
68
|
|
239
|
use warnings; |
|
|
68
|
|
|
|
|
86
|
|
|
|
68
|
|
|
|
|
615950
|
|
|
4
|
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
our $VERSION = '0.01'; |
|
6
|
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
require XSLoader; |
|
8
|
|
|
|
|
|
|
XSLoader::load('ClickHouse::Encoder', $VERSION); |
|
9
|
|
|
|
|
|
|
|
|
10
|
|
|
|
|
|
|
# Validate a [db.]table identifier as ASCII word characters only. |
|
11
|
|
|
|
|
|
|
# Rejects anything that could inject SQL via the --query argument. |
|
12
|
|
|
|
|
|
|
sub _validate_table_name { |
|
13
|
36
|
|
|
36
|
|
49
|
my $table = shift; |
|
14
|
36
|
100
|
|
|
|
289
|
$table =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)?\z/ |
|
15
|
|
|
|
|
|
|
or die "Invalid table name '$table': expected [db.]name with [A-Za-z0-9_]"; |
|
16
|
32
|
|
|
|
|
60
|
return; |
|
17
|
|
|
|
|
|
|
} |
|
18
|
|
|
|
|
|
|
|
|
19
|
|
|
|
|
|
|
# Build (url, headers) for a ClickHouse HTTP request with the given SQL |
|
20
|
|
|
|
|
|
|
# in the `query` parameter. Pulls connection params from %opts using |
|
21
|
|
|
|
|
|
|
# the same defaults as for_table / insert_http. UTF-8 encodes before |
|
22
|
|
|
|
|
|
|
# percent-escaping so non-ASCII (caf%C3%A9, emoji) round-trips correctly. |
|
23
|
|
|
|
|
|
|
sub _http_url_headers { |
|
24
|
41
|
|
|
41
|
|
297133
|
my ($sql, %opts) = @_; |
|
25
|
41
|
|
|
|
|
222
|
require Encode; |
|
26
|
|
|
|
|
|
|
my $esc = sub { |
|
27
|
72
|
|
|
72
|
|
307
|
my $s = Encode::encode('UTF-8', $_[0], 0); |
|
28
|
72
|
|
|
|
|
1833
|
$s =~ s/([^A-Za-z0-9\-_.~])/sprintf('%%%02X', ord($1))/ge; |
|
|
101
|
|
|
|
|
310
|
|
|
29
|
72
|
|
|
|
|
148
|
$s; |
|
30
|
41
|
|
|
|
|
143
|
}; |
|
31
|
41
|
|
|
|
|
123
|
my ($scheme, $host, $port) = _check_endpoint(\%opts); |
|
32
|
31
|
|
100
|
|
|
102
|
my $database = $opts{database} // 'default'; |
|
33
|
31
|
|
100
|
|
|
71
|
my $user = $opts{user} // 'default'; |
|
34
|
31
|
|
100
|
|
|
94
|
my $password = $opts{password} // ''; |
|
35
|
31
|
|
|
|
|
75
|
my $url = "$scheme://$host:$port/?database=" . $esc->($database); |
|
36
|
31
|
100
|
|
|
|
89
|
$url .= "&query=" . $esc->($sql) if length $sql; |
|
37
|
|
|
|
|
|
|
# Per-query settings: { max_memory_usage => '...', max_execution_time => 30 } |
|
38
|
31
|
100
|
|
|
|
87
|
if (my $s = $opts{settings}) { |
|
39
|
3
|
|
|
|
|
13
|
for my $k (sort keys %$s) { |
|
40
|
4
|
|
|
|
|
6
|
$url .= "&" . $esc->($k) . "=" . $esc->($s->{$k}); |
|
41
|
|
|
|
|
|
|
} |
|
42
|
|
|
|
|
|
|
} |
|
43
|
|
|
|
|
|
|
# Insert-side idempotency token: identical token + payload is rejected. |
|
44
|
31
|
100
|
|
|
|
85
|
if (defined(my $tok = $opts{dedup_token})) { |
|
45
|
4
|
|
|
|
|
8
|
$url .= "&insert_deduplication_token=" . $esc->($tok); |
|
46
|
|
|
|
|
|
|
} |
|
47
|
31
|
|
|
|
|
100
|
my %hdr = ('X-ClickHouse-User' => $user); |
|
48
|
31
|
100
|
|
|
|
68
|
$hdr{'X-ClickHouse-Key'} = $password if $password ne ''; |
|
49
|
31
|
|
|
|
|
171
|
return ($url, \%hdr); |
|
50
|
|
|
|
|
|
|
} |
|
51
|
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
# Validate the host/port/scheme triple shared by every HTTP entry point. |
|
53
|
|
|
|
|
|
|
# Rejects anything other than http/https, ensures the port is a positive |
|
54
|
|
|
|
|
|
|
# integer, and refuses host strings that contain URL-structural characters |
|
55
|
|
|
|
|
|
|
# (':/?#&'). Centralised here so insert_http, bulk_inserter, ping, and |
|
56
|
|
|
|
|
|
|
# select_blocks share a single allow-list and identical error messages. |
|
57
|
|
|
|
|
|
|
sub _check_endpoint { |
|
58
|
43
|
|
|
43
|
|
592
|
my ($opts) = @_; |
|
59
|
43
|
|
100
|
|
|
162
|
my $scheme = $opts->{scheme} // 'http'; |
|
60
|
43
|
|
100
|
|
|
104
|
my $host = $opts->{host} // 'localhost'; |
|
61
|
43
|
|
100
|
|
|
117
|
my $port = $opts->{port} // 8123; |
|
62
|
43
|
100
|
100
|
|
|
133
|
die "endpoint: scheme must be 'http' or 'https' (got '$scheme')\n" |
|
63
|
|
|
|
|
|
|
unless $scheme eq 'http' || $scheme eq 'https'; |
|
64
|
41
|
100
|
100
|
|
|
209
|
die "endpoint: host must not contain URL-structural characters " |
|
65
|
|
|
|
|
|
|
. "(got '$host')\n" |
|
66
|
|
|
|
|
|
|
if $host =~ m{[:/?#&\s]} || !length $host; |
|
67
|
37
|
100
|
100
|
|
|
231
|
die "endpoint: port must be a positive integer (got '$port')\n" |
|
68
|
|
|
|
|
|
|
unless $port =~ /\A[1-9]\d{0,4}\z/ && $port < 65536; |
|
69
|
33
|
|
|
|
|
107
|
return ($scheme, $host, $port); |
|
70
|
|
|
|
|
|
|
} |
|
71
|
|
|
|
|
|
|
|
|
72
|
|
|
|
|
|
|
# Build an HTTP::Tiny instance honoring ssl_options (verify_SSL, SSL_ca_file, |
|
73
|
|
|
|
|
|
|
# etc.) and keep_alive. Shared by insert_http, bulk_inserter, server_version, |
|
74
|
|
|
|
|
|
|
# ping, select_blocks; callers pass %opts unchanged. Loading HTTP::Tiny here |
|
75
|
|
|
|
|
|
|
# keeps the require local to HTTP code paths. |
|
76
|
|
|
|
|
|
|
sub _http_tiny { |
|
77
|
22
|
|
|
22
|
|
190827
|
my (%opts) = @_; |
|
78
|
22
|
|
|
|
|
2836
|
require HTTP::Tiny; |
|
79
|
22
|
|
100
|
|
|
165314
|
my @args = (timeout => $opts{timeout} // 60); |
|
80
|
22
|
100
|
|
|
|
58
|
push @args, keep_alive => 1 if $opts{keep_alive}; |
|
81
|
22
|
100
|
|
|
|
50
|
push @args, SSL_options => $opts{ssl_options} if $opts{ssl_options}; |
|
82
|
22
|
100
|
|
|
|
49
|
push @args, verify_SSL => $opts{verify_SSL} if exists $opts{verify_SSL}; |
|
83
|
22
|
|
|
|
|
114
|
return HTTP::Tiny->new(@args); |
|
84
|
|
|
|
|
|
|
} |
|
85
|
|
|
|
|
|
|
|
|
86
|
|
|
|
|
|
|
# Parse a flat CH JSON object string (X-ClickHouse-Summary / |
|
87
|
|
|
|
|
|
|
# X-ClickHouse-Progress) without depending on JSON::PP. Both are small |
|
88
|
|
|
|
|
|
|
# flat objects of stringified integers (read_rows, written_rows, |
|
89
|
|
|
|
|
|
|
# total_rows_to_read, elapsed_ns, ...). Returns a hashref or undef. |
|
90
|
|
|
|
|
|
|
sub _parse_ch_kv { |
|
91
|
10
|
|
|
10
|
|
18
|
my ($str) = @_; |
|
92
|
10
|
100
|
66
|
|
|
34
|
return unless defined $str && length $str; |
|
93
|
5
|
|
|
|
|
6
|
my %h; |
|
94
|
|
|
|
|
|
|
# NB: stash $1/$2 before the digit-test regex - that regex resets |
|
95
|
|
|
|
|
|
|
# capture variables and would silently turn every key into ''. |
|
96
|
5
|
|
|
|
|
36
|
while ($str =~ /"([^"\\]+)"\s*:\s*"([^"\\]*)"/g) { |
|
97
|
14
|
|
|
|
|
61
|
my ($k, $v) = ($1, $2); |
|
98
|
14
|
50
|
|
|
|
81
|
$h{$k} = ($v =~ /\A-?\d+\z/) ? $v + 0 : $v; |
|
99
|
|
|
|
|
|
|
} |
|
100
|
5
|
50
|
|
|
|
18
|
return scalar(keys %h) ? \%h : undef; |
|
101
|
|
|
|
|
|
|
} |
|
102
|
|
|
|
|
|
|
|
|
103
|
|
|
|
|
|
|
# Lift a few ClickHouse response headers into a small hashref. Returns |
|
104
|
|
|
|
|
|
|
# undef when none are present; otherwise carries query_id, server |
|
105
|
|
|
|
|
|
|
# (revision), format, a parsed summary, and the final progress snapshot |
|
106
|
|
|
|
|
|
|
# so callers don't reparse the same headers. X-ClickHouse-Progress is |
|
107
|
|
|
|
|
|
|
# sent repeatedly while a query runs (with send_progress_in_http_headers |
|
108
|
|
|
|
|
|
|
# =1); HTTP::Tiny collapses repeats into an arrayref, so we take the |
|
109
|
|
|
|
|
|
|
# last - the most complete - snapshot. |
|
110
|
|
|
|
|
|
|
sub _decorate_response { |
|
111
|
13
|
|
|
13
|
|
3872
|
my ($resp) = @_; |
|
112
|
13
|
50
|
|
|
|
40
|
return $resp unless ref $resp eq 'HASH'; |
|
113
|
13
|
100
|
|
|
|
30
|
my $h = $resp->{headers} or return $resp; |
|
114
|
8
|
|
|
|
|
10
|
my %ch; |
|
115
|
8
|
|
|
|
|
15
|
for my $k (qw(query-id server format exception-code)) { |
|
116
|
32
|
|
|
|
|
43
|
my $hv = $h->{"x-clickhouse-$k"}; |
|
117
|
32
|
100
|
|
|
|
50
|
$ch{$k} = $hv if defined $hv; |
|
118
|
|
|
|
|
|
|
} |
|
119
|
8
|
100
|
|
|
|
25
|
if (my $sum = _parse_ch_kv($h->{'x-clickhouse-summary'})) { |
|
120
|
3
|
|
|
|
|
5
|
$ch{summary} = $sum; |
|
121
|
|
|
|
|
|
|
} |
|
122
|
8
|
100
|
|
|
|
20
|
if (defined(my $pv = $h->{'x-clickhouse-progress'})) { |
|
123
|
2
|
100
|
|
|
|
5
|
$pv = $pv->[-1] if ref $pv eq 'ARRAY'; |
|
124
|
2
|
50
|
|
|
|
4
|
if (my $prog = _parse_ch_kv($pv)) { |
|
125
|
2
|
|
|
|
|
3
|
$ch{progress} = $prog; |
|
126
|
|
|
|
|
|
|
} |
|
127
|
|
|
|
|
|
|
} |
|
128
|
8
|
100
|
|
|
|
22
|
$resp->{ch} = \%ch if %ch; |
|
129
|
8
|
|
|
|
|
12
|
return $resp; |
|
130
|
|
|
|
|
|
|
} |
|
131
|
|
|
|
|
|
|
|
|
132
|
|
|
|
|
|
|
sub for_table { |
|
133
|
0
|
|
|
0
|
1
|
0
|
my ($class, $table, %opts) = @_; |
|
134
|
0
|
|
|
|
|
0
|
_validate_table_name($table); |
|
135
|
0
|
|
|
|
|
0
|
return $class->_for_describe("describe table $table", %opts); |
|
136
|
|
|
|
|
|
|
} |
|
137
|
|
|
|
|
|
|
|
|
138
|
|
|
|
|
|
|
sub _for_describe { |
|
139
|
0
|
|
|
0
|
|
0
|
my ($class, $describe_sql, %opts) = @_; |
|
140
|
|
|
|
|
|
|
|
|
141
|
0
|
|
0
|
|
|
0
|
my $via = $opts{via} // 'client'; |
|
142
|
0
|
|
|
|
|
0
|
my @lines; |
|
143
|
|
|
|
|
|
|
|
|
144
|
0
|
0
|
|
|
|
0
|
if ($via eq 'http') { |
|
|
|
0
|
|
|
|
|
|
|
145
|
0
|
|
|
|
|
0
|
my ($url, $hdr) = _http_url_headers( |
|
146
|
|
|
|
|
|
|
"$describe_sql format tabseparated", %opts); |
|
147
|
0
|
|
0
|
|
|
0
|
my $resp = _http_tiny(%opts, timeout => $opts{timeout} // 10) |
|
148
|
|
|
|
|
|
|
->get($url, { headers => $hdr }); |
|
149
|
|
|
|
|
|
|
die "HTTP describe table failed (status $resp->{status}): $resp->{content}\n" |
|
150
|
0
|
0
|
|
|
|
0
|
unless $resp->{success}; |
|
151
|
0
|
|
|
|
|
0
|
@lines = split /\n/, $resp->{content}; |
|
152
|
|
|
|
|
|
|
} |
|
153
|
|
|
|
|
|
|
elsif ($via eq 'client') { |
|
154
|
0
|
|
0
|
|
|
0
|
my $host = $opts{host} // 'localhost'; |
|
155
|
0
|
|
0
|
|
|
0
|
my $port = $opts{port} // 9000; |
|
156
|
0
|
|
0
|
|
|
0
|
my $database = $opts{database} // 'default'; |
|
157
|
0
|
|
0
|
|
|
0
|
my $user = $opts{user} // 'default'; |
|
158
|
0
|
|
0
|
|
|
0
|
my $password = $opts{password} // ''; |
|
159
|
0
|
|
0
|
|
|
0
|
my $client = $opts{client} // 'clickhouse-client'; |
|
160
|
|
|
|
|
|
|
|
|
161
|
0
|
|
|
|
|
0
|
my @cmd = ( |
|
162
|
|
|
|
|
|
|
$client, |
|
163
|
|
|
|
|
|
|
'--host', $host, |
|
164
|
|
|
|
|
|
|
'--port', $port, |
|
165
|
|
|
|
|
|
|
'--database', $database, |
|
166
|
|
|
|
|
|
|
'--user', $user, |
|
167
|
|
|
|
|
|
|
'--query', "$describe_sql format tabseparated", |
|
168
|
|
|
|
|
|
|
); |
|
169
|
|
|
|
|
|
|
|
|
170
|
|
|
|
|
|
|
# Pass password via env so it doesn't show up in `ps`. Always |
|
171
|
|
|
|
|
|
|
# set $password (even empty) so an empty `password => ''` arg |
|
172
|
|
|
|
|
|
|
# also overrides any inherited CLICKHOUSE_PASSWORD. |
|
173
|
0
|
|
|
|
|
0
|
local $ENV{CLICKHOUSE_PASSWORD} = $password; |
|
174
|
|
|
|
|
|
|
|
|
175
|
0
|
0
|
|
|
|
0
|
open my $fh, '-|', @cmd |
|
176
|
|
|
|
|
|
|
or die "Failed to run clickhouse-client: $!"; |
|
177
|
0
|
|
|
|
|
0
|
@lines = <$fh>; |
|
178
|
0
|
|
|
|
|
0
|
close $fh; |
|
179
|
0
|
0
|
|
|
|
0
|
die "clickhouse-client failed: exit code " . ($? >> 8) if $?; |
|
180
|
|
|
|
|
|
|
} |
|
181
|
|
|
|
|
|
|
else { |
|
182
|
0
|
|
|
|
|
0
|
die "Unknown via='$via' (expected 'client' or 'http')"; |
|
183
|
|
|
|
|
|
|
} |
|
184
|
|
|
|
|
|
|
|
|
185
|
0
|
|
|
|
|
0
|
my @columns; |
|
186
|
|
|
|
|
|
|
# ClickHouse's TabSeparated format escapes \\ \n \t \r \0 in field |
|
187
|
|
|
|
|
|
|
# values. Multi-line type expressions (e.g. named Tuple, Nested) come |
|
188
|
|
|
|
|
|
|
# out as a single TSV line with embedded \n and indentation; un-escape |
|
189
|
|
|
|
|
|
|
# so the XS parser sees the canonical type string. The default branch |
|
190
|
|
|
|
|
|
|
# ($1) handles \\ -> \ and any other backslash-escape forms transparently. |
|
191
|
|
|
|
|
|
|
my $unesc = sub { |
|
192
|
0
|
|
0
|
0
|
|
0
|
my $s = shift // ''; |
|
193
|
0
|
0
|
|
|
|
0
|
$s =~ s{\\(.)}{ $1 eq 'n' ? "\n" |
|
|
0
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
194
|
|
|
|
|
|
|
: $1 eq 't' ? "\t" |
|
195
|
|
|
|
|
|
|
: $1 eq 'r' ? "\r" |
|
196
|
|
|
|
|
|
|
: $1 eq '0' ? "\0" |
|
197
|
|
|
|
|
|
|
: $1 }ge; |
|
198
|
0
|
|
|
|
|
0
|
$s; |
|
199
|
0
|
|
|
|
|
0
|
}; |
|
200
|
0
|
|
|
|
|
0
|
for my $line (@lines) { |
|
201
|
0
|
|
|
|
|
0
|
chomp $line; |
|
202
|
0
|
0
|
|
|
|
0
|
next if $line eq ''; |
|
203
|
0
|
|
|
|
|
0
|
my ($name, $type) = split /\t/, $line; |
|
204
|
0
|
|
|
|
|
0
|
push @columns, [$unesc->($name), $unesc->($type)]; |
|
205
|
|
|
|
|
|
|
} |
|
206
|
|
|
|
|
|
|
|
|
207
|
0
|
0
|
|
|
|
0
|
die "No columns found for: $describe_sql" unless @columns; |
|
208
|
0
|
|
|
|
|
0
|
return $class->new(columns => \@columns); |
|
209
|
|
|
|
|
|
|
} |
|
210
|
|
|
|
|
|
|
|
|
211
|
|
|
|
|
|
|
sub columns { |
|
212
|
43
|
|
|
43
|
1
|
153658
|
my $self = shift; |
|
213
|
43
|
|
|
|
|
221
|
return $self->_columns; |
|
214
|
|
|
|
|
|
|
} |
|
215
|
|
|
|
|
|
|
|
|
216
|
|
|
|
|
|
|
sub validate_rows { |
|
217
|
8
|
|
|
8
|
1
|
812108
|
my ($self, $rows) = @_; |
|
218
|
8
|
|
|
|
|
8
|
my @errors; |
|
219
|
8
|
|
|
|
|
24
|
for my $i (0 .. $#$rows) { |
|
220
|
24
|
|
|
|
|
24
|
local $@; |
|
221
|
24
|
100
|
|
|
|
28
|
unless (eval { $self->encode([$rows->[$i]]); 1 }) { |
|
|
24
|
|
|
|
|
100
|
|
|
|
20
|
|
|
|
|
60
|
|
|
222
|
4
|
|
|
|
|
40
|
(my $msg = "$@") =~ s/\s*at .+ line \d+\.\s*\z//; |
|
223
|
4
|
|
|
|
|
24
|
push @errors, { row => $i, error => $msg }; |
|
224
|
|
|
|
|
|
|
} |
|
225
|
|
|
|
|
|
|
} |
|
226
|
8
|
|
|
|
|
28
|
return \@errors; |
|
227
|
|
|
|
|
|
|
} |
|
228
|
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
sub compressed_writer { |
|
230
|
20
|
|
|
20
|
1
|
8912
|
my ($class, $mode, $writer) = @_; |
|
231
|
20
|
100
|
100
|
|
|
112
|
return $writer if !defined $mode || $mode eq 'raw'; |
|
232
|
12
|
50
|
|
|
|
32
|
if ($mode eq 'zstd') { |
|
233
|
0
|
|
|
|
|
0
|
require Compress::Zstd; |
|
234
|
|
|
|
|
|
|
return sub { |
|
235
|
0
|
|
|
0
|
|
0
|
my $out = Compress::Zstd::compress($_[0]); |
|
236
|
0
|
0
|
|
|
|
0
|
defined $out or die "zstd compression failed"; |
|
237
|
0
|
|
|
|
|
0
|
$writer->($out); |
|
238
|
0
|
|
|
|
|
0
|
}; |
|
239
|
|
|
|
|
|
|
} |
|
240
|
12
|
100
|
|
|
|
24
|
if ($mode eq 'gzip') { |
|
241
|
4
|
|
|
|
|
2664
|
require IO::Compress::Gzip; |
|
242
|
|
|
|
|
|
|
return sub { |
|
243
|
4
|
|
|
4
|
|
164
|
my $out; |
|
244
|
4
|
50
|
|
|
|
12
|
IO::Compress::Gzip::gzip(\$_[0], \$out) |
|
245
|
|
|
|
|
|
|
or die "gzip failed: $IO::Compress::Gzip::GzipError"; |
|
246
|
4
|
|
|
|
|
6368
|
$writer->($out); |
|
247
|
4
|
|
|
|
|
132088
|
}; |
|
248
|
|
|
|
|
|
|
} |
|
249
|
8
|
|
|
|
|
60
|
die "Unknown compress mode '$mode' (expected 'zstd', 'gzip', or 'raw')"; |
|
250
|
|
|
|
|
|
|
} |
|
251
|
|
|
|
|
|
|
|
|
252
|
|
|
|
|
|
|
# Compressed-block framing used by ClickHouse's CompressedReadBuffer / |
|
253
|
|
|
|
|
|
|
# CompressedWriteBuffer (native TCP protocol with compression=1 enabled, |
|
254
|
|
|
|
|
|
|
# and Native-over-HTTP with Content-Encoding: clickhouse-lz4 etc). |
|
255
|
|
|
|
|
|
|
# Layout: |
|
256
|
|
|
|
|
|
|
# 16 bytes : checksum (CityHash128 of the next 9 + N bytes) |
|
257
|
|
|
|
|
|
|
# 1 byte : method tag (0x82 LZ4, 0x90 ZSTD, 0x02 none) |
|
258
|
|
|
|
|
|
|
# 4 bytes : compressed_size = 1 + 4 + 4 + N (LE UInt32) |
|
259
|
|
|
|
|
|
|
# 4 bytes : uncompressed_size (LE UInt32) |
|
260
|
|
|
|
|
|
|
# N bytes : compressed payload |
|
261
|
|
|
|
|
|
|
# |
|
262
|
|
|
|
|
|
|
# CH's checksum is CityHash128 from the "cityhash102" variant (Google |
|
263
|
|
|
|
|
|
|
# CityHash v1.0.2 with ClickHouse's namespace fork). We bundle a port |
|
264
|
|
|
|
|
|
|
# of that algorithm in cityhash.c, exposed as the `_cityhash128` XSUB, |
|
265
|
|
|
|
|
|
|
# used as the default hasher below. Callers can plug in a different |
|
266
|
|
|
|
|
|
|
# 16-byte hasher via the `hasher => \&h` option. |
|
267
|
|
|
|
|
|
|
my %_COMPRESS_METHOD_TAG = (lz4 => 0x82, zstd => 0x90, none => 0x02); |
|
268
|
|
|
|
|
|
|
|
|
269
|
|
|
|
|
|
|
sub compress_native_block { |
|
270
|
27
|
|
|
27
|
1
|
303556
|
my ($class, $bytes, %opts) = @_; |
|
271
|
27
|
|
100
|
|
|
94
|
my $mode = $opts{mode} // 'lz4'; |
|
272
|
|
|
|
|
|
|
# Default to the bundled CityHash128 v1.0.2 (the "cityhash102" |
|
273
|
|
|
|
|
|
|
# variant CH itself uses for the compressed-block checksum). The |
|
274
|
|
|
|
|
|
|
# `hasher` opt is still honored for callers that want to plug in |
|
275
|
|
|
|
|
|
|
# something else (Digest::CityHash, a vendored copy, etc.). |
|
276
|
27
|
|
100
|
|
|
81
|
my $hasher = $opts{hasher} // \&_cityhash128; |
|
277
|
|
|
|
|
|
|
|
|
278
|
|
|
|
|
|
|
# mode=auto: try LZ4 first, fall back to method-tag 0x02 (uncompressed |
|
279
|
|
|
|
|
|
|
# inside compressed framing - same shape CH's own CompressedWriteBuffer |
|
280
|
|
|
|
|
|
|
# emits when the compressed result is >= the input). Saves CPU on |
|
281
|
|
|
|
|
|
|
# already-incompressible payloads without giving up the checksum |
|
282
|
|
|
|
|
|
|
# protection of the framing. |
|
283
|
|
|
|
|
|
|
# The auto-mode probe doubles as the payload when LZ4 wins, so the |
|
284
|
|
|
|
|
|
|
# input is compressed at most once regardless of which branch wins. |
|
285
|
27
|
|
|
|
|
29
|
my $payload; |
|
286
|
27
|
100
|
|
|
|
51
|
if ($mode eq 'auto') { |
|
287
|
3
|
|
|
|
|
11
|
require Compress::LZ4; |
|
288
|
3
|
|
|
|
|
13
|
my $lz4 = Compress::LZ4::lz4_compress($bytes); |
|
289
|
3
|
100
|
|
|
|
13
|
if (length($lz4) < length($bytes)) { |
|
290
|
2
|
|
|
|
|
2
|
$mode = 'lz4'; |
|
291
|
2
|
|
|
|
|
3
|
$payload = $lz4; |
|
292
|
|
|
|
|
|
|
} else { |
|
293
|
1
|
|
|
|
|
2
|
$mode = 'none'; |
|
294
|
|
|
|
|
|
|
} |
|
295
|
|
|
|
|
|
|
} |
|
296
|
|
|
|
|
|
|
|
|
297
|
27
|
100
|
|
|
|
80
|
my $tag = $_COMPRESS_METHOD_TAG{$mode} |
|
298
|
|
|
|
|
|
|
or die "compress_native_block: unknown mode '$mode' " |
|
299
|
|
|
|
|
|
|
. "(expected 'auto', 'lz4', 'zstd', or 'none')\n"; |
|
300
|
|
|
|
|
|
|
|
|
301
|
26
|
100
|
|
|
|
41
|
if (!defined $payload) { |
|
302
|
24
|
100
|
|
|
|
59
|
if ($mode eq 'lz4') { |
|
|
|
100
|
|
|
|
|
|
|
303
|
17
|
|
|
|
|
73
|
require Compress::LZ4; |
|
304
|
|
|
|
|
|
|
# Compress::LZ4::compress() prepends a 4-byte size header |
|
305
|
|
|
|
|
|
|
# that ClickHouse doesn't want; lz4_compress is the raw- |
|
306
|
|
|
|
|
|
|
# form variant emitting just the LZ4 byte stream. |
|
307
|
17
|
|
|
|
|
99
|
$payload = Compress::LZ4::lz4_compress($bytes); |
|
308
|
|
|
|
|
|
|
} elsif ($mode eq 'zstd') { |
|
309
|
3
|
|
|
|
|
15
|
require Compress::Zstd; |
|
310
|
3
|
|
|
|
|
211
|
my $z = Compress::Zstd::compress($bytes); |
|
311
|
3
|
50
|
|
|
|
10
|
defined $z |
|
312
|
|
|
|
|
|
|
or die "compress_native_block: zstd compression failed\n"; |
|
313
|
3
|
|
|
|
|
7
|
$payload = $z; |
|
314
|
|
|
|
|
|
|
} else { # none = uncompressed inside framing (method tag 0x02) |
|
315
|
4
|
|
|
|
|
5
|
$payload = $bytes; |
|
316
|
|
|
|
|
|
|
} |
|
317
|
|
|
|
|
|
|
} |
|
318
|
|
|
|
|
|
|
|
|
319
|
26
|
|
|
|
|
38
|
my $compressed_size = 1 + 4 + 4 + length($payload); # tag + 2*UInt32 + N |
|
320
|
26
|
|
|
|
|
96
|
my $hdr = pack('C V V', $tag, $compressed_size, length $bytes); |
|
321
|
26
|
|
|
|
|
124
|
my $checksum = $hasher->($hdr . $payload); |
|
322
|
26
|
100
|
|
|
|
63
|
die "compress_native_block: hasher returned " |
|
323
|
|
|
|
|
|
|
. length($checksum) . " bytes (expected 16)\n" |
|
324
|
|
|
|
|
|
|
unless length($checksum) == 16; |
|
325
|
25
|
|
|
|
|
91
|
return $checksum . $hdr . $payload; |
|
326
|
|
|
|
|
|
|
} |
|
327
|
|
|
|
|
|
|
|
|
328
|
|
|
|
|
|
|
# Inverse of compress_native_block. Returns (uncompressed_bytes, bytes_consumed). |
|
329
|
|
|
|
|
|
|
# Verifies the checksum if a hasher is supplied; if hasher is omitted the |
|
330
|
|
|
|
|
|
|
# block is decompressed without verification (useful for inspecting captured |
|
331
|
|
|
|
|
|
|
# data when the cityhash impl isn't available). |
|
332
|
|
|
|
|
|
|
sub decompress_native_block { |
|
333
|
27
|
|
|
27
|
1
|
14565
|
my ($class, $bytes, %opts) = @_; |
|
334
|
27
|
|
100
|
|
|
88
|
my $off = $opts{offset} // 0; |
|
335
|
|
|
|
|
|
|
# Same default as compress_native_block. Callers can pass |
|
336
|
|
|
|
|
|
|
# hasher => undef explicitly to skip checksum verification. |
|
337
|
27
|
100
|
|
|
|
59
|
my $hasher = exists $opts{hasher} ? $opts{hasher} : \&_cityhash128; |
|
338
|
27
|
|
|
|
|
36
|
my $total = length $bytes; |
|
339
|
27
|
50
|
|
|
|
59
|
die "decompress_native_block: truncated header at offset $off " |
|
340
|
|
|
|
|
|
|
. "(need >= 25 bytes, have " . ($total - $off) . ")\n" |
|
341
|
|
|
|
|
|
|
if $total - $off < 25; |
|
342
|
|
|
|
|
|
|
|
|
343
|
27
|
|
|
|
|
44
|
my $checksum = substr($bytes, $off, 16); |
|
344
|
27
|
|
|
|
|
40
|
my $tag = ord substr($bytes, $off + 16, 1); |
|
345
|
27
|
|
|
|
|
74
|
my $csize = unpack 'V', substr($bytes, $off + 17, 4); # incl. 9-byte header |
|
346
|
27
|
|
|
|
|
45
|
my $usize = unpack 'V', substr($bytes, $off + 21, 4); |
|
347
|
|
|
|
|
|
|
|
|
348
|
27
|
50
|
|
|
|
42
|
die "decompress_native_block: compressed_size=$csize < 9 (corrupt)\n" |
|
349
|
|
|
|
|
|
|
if $csize < 9; |
|
350
|
27
|
|
|
|
|
36
|
my $payload_len = $csize - 9; |
|
351
|
27
|
|
|
|
|
30
|
my $end = $off + 16 + $csize; |
|
352
|
27
|
50
|
|
|
|
38
|
die "decompress_native_block: block extends past buffer end\n" |
|
353
|
|
|
|
|
|
|
if $end > $total; |
|
354
|
|
|
|
|
|
|
|
|
355
|
27
|
|
|
|
|
36
|
my $hdr = substr($bytes, $off + 16, 9); |
|
356
|
27
|
|
|
|
|
34
|
my $payload = substr($bytes, $off + 25, $payload_len); |
|
357
|
|
|
|
|
|
|
|
|
358
|
27
|
100
|
|
|
|
41
|
if ($hasher) { |
|
359
|
24
|
|
|
|
|
89
|
my $got = $hasher->($hdr . $payload); |
|
360
|
24
|
100
|
|
|
|
45
|
die "decompress_native_block: checksum mismatch\n" |
|
361
|
|
|
|
|
|
|
unless $got eq $checksum; |
|
362
|
|
|
|
|
|
|
} |
|
363
|
|
|
|
|
|
|
|
|
364
|
26
|
|
|
|
|
25
|
my $out; |
|
365
|
26
|
100
|
|
|
|
52
|
if ($tag == 0x02) { # uncompressed inside framing |
|
|
|
100
|
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
366
|
3
|
|
|
|
|
3
|
$out = $payload; |
|
367
|
|
|
|
|
|
|
} elsif ($tag == 0x82) { # LZ4 |
|
368
|
19
|
|
|
|
|
65
|
require Compress::LZ4; |
|
369
|
|
|
|
|
|
|
# lz4_decompress wants the expected uncompressed size as its |
|
370
|
|
|
|
|
|
|
# second argument since the raw stream has no length prefix. |
|
371
|
19
|
|
|
|
|
77
|
$out = Compress::LZ4::lz4_decompress($payload, $usize); |
|
372
|
19
|
50
|
|
|
|
31
|
die "decompress_native_block: lz4 decompression failed\n" |
|
373
|
|
|
|
|
|
|
unless defined $out; |
|
374
|
|
|
|
|
|
|
} elsif ($tag == 0x90) { # ZSTD |
|
375
|
3
|
|
|
|
|
13
|
require Compress::Zstd; |
|
376
|
3
|
|
|
|
|
208
|
$out = Compress::Zstd::decompress($payload); |
|
377
|
3
|
50
|
|
|
|
12
|
die "decompress_native_block: zstd decompression failed\n" |
|
378
|
|
|
|
|
|
|
unless defined $out; |
|
379
|
|
|
|
|
|
|
} else { |
|
380
|
1
|
|
|
|
|
7
|
die sprintf("decompress_native_block: unknown method tag 0x%02x " |
|
381
|
|
|
|
|
|
|
. "(expected 0x02 NONE, 0x82 LZ4, or 0x90 ZSTD)\n", $tag); |
|
382
|
|
|
|
|
|
|
} |
|
383
|
25
|
100
|
|
|
|
44
|
die "decompress_native_block: decompressed size mismatch " |
|
384
|
|
|
|
|
|
|
. "(header says $usize, got " . length($out) . ")\n" |
|
385
|
|
|
|
|
|
|
unless length($out) == $usize; |
|
386
|
24
|
100
|
|
|
|
93
|
return wantarray ? ($out, $end - $off) : $out; |
|
387
|
|
|
|
|
|
|
} |
|
388
|
|
|
|
|
|
|
|
|
389
|
|
|
|
|
|
|
# Expand any Nested(field T, ...) entries in a column list into the flat |
|
390
|
|
|
|
|
|
|
# `name.field Array(T)` columns ClickHouse stores them as. Returns a new |
|
391
|
|
|
|
|
|
|
# arrayref; non-Nested columns pass through unchanged. Pair with new(): |
|
392
|
|
|
|
|
|
|
# my $enc = ClickHouse::Encoder->new( |
|
393
|
|
|
|
|
|
|
# columns => ClickHouse::Encoder->flatten_nested(\@user_columns)); |
|
394
|
|
|
|
|
|
|
sub flatten_nested { |
|
395
|
13
|
|
|
13
|
1
|
32930
|
my ($class, $cols) = @_; |
|
396
|
13
|
|
|
|
|
15
|
my @out; |
|
397
|
13
|
|
|
|
|
23
|
for my $c (@$cols) { |
|
398
|
17
|
|
|
|
|
31
|
my ($name, $type) = @$c; |
|
399
|
17
|
100
|
|
|
|
102
|
if ($type =~ /\ANested\((.+)\)\z/s) { |
|
400
|
9
|
|
|
|
|
28
|
my @parts = _split_paren_list($1); |
|
401
|
9
|
100
|
|
|
|
33
|
@parts or die "Nested($1) for column '$name' has no elements"; |
|
402
|
8
|
|
|
|
|
12
|
for my $part (@parts) { |
|
403
|
16
|
100
|
|
|
|
92
|
$part =~ /\A([A-Za-z_][A-Za-z0-9_]*)\s+(.+?)\s*\z/s |
|
404
|
|
|
|
|
|
|
or die "Nested element '$part' is not 'name Type'"; |
|
405
|
12
|
|
|
|
|
48
|
push @out, ["$name.$1", "Array($2)"]; |
|
406
|
|
|
|
|
|
|
} |
|
407
|
|
|
|
|
|
|
} else { |
|
408
|
8
|
|
|
|
|
20
|
push @out, [$name, $type]; |
|
409
|
|
|
|
|
|
|
} |
|
410
|
|
|
|
|
|
|
} |
|
411
|
8
|
|
|
|
|
16
|
return \@out; |
|
412
|
|
|
|
|
|
|
} |
|
413
|
|
|
|
|
|
|
|
|
414
|
|
|
|
|
|
|
# Split a comma-separated list at depth-0 commas (so Tuple(Int32, String) |
|
415
|
|
|
|
|
|
|
# inside a Nested element stays one entry). |
|
416
|
|
|
|
|
|
|
sub _split_paren_list { |
|
417
|
9
|
|
|
9
|
|
23
|
my $body = shift; |
|
418
|
9
|
|
|
|
|
10
|
my @parts; |
|
419
|
9
|
|
|
|
|
20
|
my ($start, $depth, $len) = (0, 0, length $body); |
|
420
|
9
|
|
|
|
|
24
|
for (my $i = 0; $i <= $len; $i++) { |
|
421
|
232
|
100
|
|
|
|
240
|
my $c = $i < $len ? substr($body, $i, 1) : ','; |
|
422
|
232
|
100
|
66
|
|
|
503
|
if ($c eq '(') { $depth++ } |
|
|
4
|
100
|
|
|
|
4
|
|
|
|
|
100
|
|
|
|
|
|
|
423
|
4
|
|
|
|
|
4
|
elsif ($c eq ')') { $depth-- } |
|
424
|
|
|
|
|
|
|
elsif ($c eq ',' && $depth == 0) { |
|
425
|
17
|
|
|
|
|
117
|
(my $p = substr($body, $start, $i - $start)) =~ s/\A\s+|\s+\z//g; |
|
426
|
17
|
100
|
|
|
|
76
|
push @parts, $p if length $p; |
|
427
|
17
|
|
|
|
|
28
|
$start = $i + 1; |
|
428
|
|
|
|
|
|
|
} |
|
429
|
|
|
|
|
|
|
} |
|
430
|
9
|
|
|
|
|
23
|
return @parts; |
|
431
|
|
|
|
|
|
|
} |
|
432
|
|
|
|
|
|
|
|
|
433
|
|
|
|
|
|
|
# Row-oriented decode: { ncols, nrows, names, types, rows } where |
|
434
|
|
|
|
|
|
|
# rows is an arrayref of arrayrefs. Calls the XS decode_block_rows, |
|
435
|
|
|
|
|
|
|
# which distributes column values into per-row arrayrefs as each |
|
436
|
|
|
|
|
|
|
# column is decoded and frees the column AV eagerly. Peak memory |
|
437
|
|
|
|
|
|
|
# is one column's AV plus the row AVs (vs a Perl-side transpose |
|
438
|
|
|
|
|
|
|
# that holds ALL column AVs alongside the half-built row AVs). |
|
439
|
|
|
|
|
|
|
# Throughput is similar to the column-major path; the win is the |
|
440
|
|
|
|
|
|
|
# tighter memory profile on wide blocks. |
|
441
|
|
|
|
|
|
|
sub decode_rows { |
|
442
|
4
|
|
|
4
|
1
|
193782
|
my ($class, $bytes, $offset) = @_; |
|
443
|
4
|
|
50
|
|
|
124
|
return $class->decode_block_rows($bytes, $offset // 0); |
|
444
|
|
|
|
|
|
|
} |
|
445
|
|
|
|
|
|
|
|
|
446
|
|
|
|
|
|
|
# Decode a concatenated stream of Native blocks (the body of a |
|
447
|
|
|
|
|
|
|
# `select ... format native` response). Returns an arrayref of the |
|
448
|
|
|
|
|
|
|
# same hashref shape as decode_block. Stops cleanly when bytes are |
|
449
|
|
|
|
|
|
|
# exhausted; partial trailing bytes raise an error from XS. |
|
450
|
|
|
|
|
|
|
# Uses the 3-arg form of decode_block (with offset) to avoid O(N^2) |
|
451
|
|
|
|
|
|
|
# substr copies on long streams. |
|
452
|
|
|
|
|
|
|
sub decode_blocks { |
|
453
|
8
|
|
|
8
|
1
|
183430
|
my ($class, $bytes, $cb, %opts) = @_; |
|
454
|
8
|
|
|
|
|
13
|
my $keep = $opts{keep}; |
|
455
|
8
|
|
|
|
|
8
|
my $off = 0; |
|
456
|
8
|
|
|
|
|
10
|
my $len = length $bytes; |
|
457
|
|
|
|
|
|
|
# Callback form: hand each block to $cb as it's decoded; never |
|
458
|
|
|
|
|
|
|
# accumulate. Useful for streaming selects where the full block |
|
459
|
|
|
|
|
|
|
# list would not fit comfortably in memory. |
|
460
|
8
|
100
|
|
|
|
18
|
if ($cb) { |
|
461
|
2
|
|
|
|
|
6
|
while ($off < $len) { |
|
462
|
5
|
|
|
|
|
317
|
my $block = $class->decode_block($bytes, $off, $keep); |
|
463
|
5
|
|
|
|
|
7
|
$off += $block->{consumed}; |
|
464
|
5
|
|
|
|
|
7
|
$cb->($block); |
|
465
|
|
|
|
|
|
|
} |
|
466
|
2
|
|
|
|
|
216
|
return; |
|
467
|
|
|
|
|
|
|
} |
|
468
|
6
|
|
|
|
|
7
|
my @blocks; |
|
469
|
6
|
|
|
|
|
11
|
while ($off < $len) { |
|
470
|
10
|
|
|
|
|
98
|
my $block = $class->decode_block($bytes, $off, $keep); |
|
471
|
9
|
|
|
|
|
10
|
$off += $block->{consumed}; |
|
472
|
9
|
|
|
|
|
14
|
push @blocks, $block; |
|
473
|
|
|
|
|
|
|
} |
|
474
|
5
|
|
|
|
|
12
|
return \@blocks; |
|
475
|
|
|
|
|
|
|
} |
|
476
|
|
|
|
|
|
|
|
|
477
|
|
|
|
|
|
|
# Return a coderef that yields one block per call (undef when done). |
|
478
|
|
|
|
|
|
|
# Holds a reference to $bytes; the closure is the only thing that |
|
479
|
|
|
|
|
|
|
# survives between calls. |
|
480
|
|
|
|
|
|
|
sub decode_blocks_iter { |
|
481
|
3
|
|
|
3
|
1
|
2154
|
my ($class, $bytes, %opts) = @_; |
|
482
|
3
|
|
|
|
|
5
|
my $keep = $opts{keep}; |
|
483
|
3
|
|
|
|
|
7
|
my $off = 0; |
|
484
|
3
|
|
|
|
|
5
|
my $len = length $bytes; |
|
485
|
|
|
|
|
|
|
return sub { |
|
486
|
9
|
100
|
|
9
|
|
1892
|
return if $off >= $len; |
|
487
|
5
|
|
|
|
|
46
|
my $block = $class->decode_block($bytes, $off, $keep); |
|
488
|
5
|
|
|
|
|
7
|
$off += $block->{consumed}; |
|
489
|
5
|
|
|
|
|
8
|
return $block; |
|
490
|
3
|
|
|
|
|
13
|
}; |
|
491
|
|
|
|
|
|
|
} |
|
492
|
|
|
|
|
|
|
|
|
493
|
|
|
|
|
|
|
# Pull-style decoder that reads incrementally from a filehandle (or |
|
494
|
|
|
|
|
|
|
# any IO::Handle-ish object). For each complete block, invokes $cb |
|
495
|
|
|
|
|
|
|
# with the block hashref. Keeps a sliding buffer; on truncated decode |
|
496
|
|
|
|
|
|
|
# it reads more bytes and retries. Useful when the response body is |
|
497
|
|
|
|
|
|
|
# too large to fit in memory. |
|
498
|
|
|
|
|
|
|
sub decode_stream { |
|
499
|
6
|
|
|
6
|
1
|
18808
|
my ($class, $fh, $cb, %opts) = @_; |
|
500
|
6
|
|
100
|
|
|
21
|
my $chunk_size = $opts{chunk_size} // 64 * 1024; |
|
501
|
6
|
|
|
|
|
7
|
my $keep = $opts{keep}; |
|
502
|
6
|
|
|
|
|
5
|
my $decompress = $opts{decompress}; |
|
503
|
6
|
|
|
|
|
7
|
my $buf = ''; # raw bytes from the filehandle |
|
504
|
6
|
|
|
|
|
9
|
my $inner = ''; # decompressed Native bytes (== $buf when !decompress) |
|
505
|
6
|
|
|
|
|
6
|
my $done = 0; |
|
506
|
6
|
|
|
|
|
12
|
until ($done) { |
|
507
|
|
|
|
|
|
|
# Phase 1: peel compressed-block frames out of $buf into $inner. |
|
508
|
47
|
100
|
|
|
|
51
|
if ($decompress) { |
|
509
|
12
|
|
|
|
|
17
|
while (length($buf) >= 25) { |
|
510
|
10
|
|
|
|
|
14
|
my $csize = unpack 'V', substr($buf, 17, 4); |
|
511
|
10
|
100
|
|
|
|
30
|
last if length($buf) < 16 + $csize; |
|
512
|
5
|
|
|
|
|
5
|
my ($plain, $consumed) = eval { |
|
513
|
5
|
|
|
|
|
8
|
$class->decompress_native_block($buf) |
|
514
|
|
|
|
|
|
|
}; |
|
515
|
5
|
50
|
|
|
|
6
|
die $@ if $@; |
|
516
|
5
|
|
|
|
|
4
|
$inner .= $plain; |
|
517
|
5
|
|
|
|
|
10
|
substr($buf, 0, $consumed, ''); |
|
518
|
|
|
|
|
|
|
} |
|
519
|
|
|
|
|
|
|
} else { |
|
520
|
35
|
|
|
|
|
31
|
$inner = $buf; |
|
521
|
|
|
|
|
|
|
} |
|
522
|
|
|
|
|
|
|
# Phase 2: decode complete Native blocks out of $inner. |
|
523
|
47
|
|
|
|
|
74
|
while (length($inner) > 0) { |
|
524
|
40
|
|
|
|
|
32
|
my $block = eval { $class->decode_block($inner, 0, $keep) }; |
|
|
40
|
|
|
|
|
277
|
|
|
525
|
40
|
100
|
|
|
|
53
|
if ($@) { |
|
526
|
|
|
|
|
|
|
# Truncation or malformed mid-block; need more bytes. |
|
527
|
|
|
|
|
|
|
# Only "buffer truncated" means "data ran short, read |
|
528
|
|
|
|
|
|
|
# more"; "exceeds remaining" indicates a malformed wire |
|
529
|
|
|
|
|
|
|
# value (e.g. a corrupted varint count) and should die |
|
530
|
|
|
|
|
|
|
# rather than spin reading more bytes. |
|
531
|
30
|
50
|
|
|
|
69
|
last if $@ =~ /buffer truncated/i; |
|
532
|
0
|
|
|
|
|
0
|
die $@; # real error |
|
533
|
|
|
|
|
|
|
} |
|
534
|
10
|
|
|
|
|
20
|
$cb->($block); |
|
535
|
10
|
|
|
|
|
531
|
substr($inner, 0, $block->{consumed}, ''); |
|
536
|
|
|
|
|
|
|
} |
|
537
|
|
|
|
|
|
|
# When not decompressing, $inner aliases $buf - carry the residual |
|
538
|
|
|
|
|
|
|
# back so the next read sees the unconsumed tail. |
|
539
|
47
|
100
|
|
|
|
53
|
$buf = $inner unless $decompress; |
|
540
|
|
|
|
|
|
|
# Read more bytes. read() returns 0 on EOF, undef on error. |
|
541
|
47
|
|
|
|
|
28
|
my $more; |
|
542
|
47
|
|
|
|
|
93
|
my $n = read $fh, $more, $chunk_size; |
|
543
|
47
|
50
|
|
|
|
57
|
die "decode_stream: read error: $!" if !defined $n; |
|
544
|
47
|
100
|
|
|
|
49
|
if ($n == 0) { |
|
545
|
|
|
|
|
|
|
# EOF. If anything is left in either buffer it's a truncated |
|
546
|
|
|
|
|
|
|
# final block; raise rather than swallow. |
|
547
|
6
|
100
|
100
|
|
|
25
|
die "decode_stream: " . length($buf) . " trailing bytes " |
|
548
|
|
|
|
|
|
|
. "after last complete compressed block" |
|
549
|
|
|
|
|
|
|
if $decompress && length $buf; |
|
550
|
5
|
100
|
|
|
|
16
|
die "decode_stream: " . length($inner) . " trailing bytes " |
|
551
|
|
|
|
|
|
|
. "after last complete block" if length $inner; |
|
552
|
4
|
|
|
|
|
7
|
$done = 1; |
|
553
|
|
|
|
|
|
|
} else { |
|
554
|
41
|
|
|
|
|
56
|
$buf .= $more; |
|
555
|
|
|
|
|
|
|
} |
|
556
|
|
|
|
|
|
|
} |
|
557
|
4
|
|
|
|
|
7
|
return; |
|
558
|
|
|
|
|
|
|
} |
|
559
|
|
|
|
|
|
|
|
|
560
|
|
|
|
|
|
|
# Query the ClickHouse HTTP endpoint for its version. Returns a list |
|
561
|
|
|
|
|
|
|
# of (major, minor, patch, build) integers and the raw string. Useful |
|
562
|
|
|
|
|
|
|
# for capability gating in user code (e.g. only use JSON columns if |
|
563
|
|
|
|
|
|
|
# the server is at least 24.8). HTTP-only; native TCP not supported. |
|
564
|
|
|
|
|
|
|
sub server_version { |
|
565
|
5
|
|
|
5
|
1
|
193592
|
my ($class, %opts) = @_; |
|
566
|
5
|
|
|
|
|
13
|
my ($url, $hdr) = _http_url_headers('select version()', %opts); |
|
567
|
5
|
|
50
|
|
|
22
|
my $resp = _http_tiny(%opts, timeout => $opts{timeout} // 10) |
|
568
|
|
|
|
|
|
|
->get($url, { headers => $hdr }); |
|
569
|
|
|
|
|
|
|
die "HTTP select version() failed (status $resp->{status}): " |
|
570
|
|
|
|
|
|
|
. "$resp->{content}\n" |
|
571
|
5
|
100
|
|
|
|
433
|
unless $resp->{success}; |
|
572
|
4
|
|
|
|
|
19
|
(my $raw = $resp->{content}) =~ s/\s+\z//; |
|
573
|
4
|
|
|
|
|
18
|
my @parts = ($raw =~ /(\d+)/g); |
|
574
|
|
|
|
|
|
|
return wantarray |
|
575
|
4
|
100
|
50
|
|
|
38
|
? (@parts, $raw) |
|
|
|
|
50
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
576
|
|
|
|
|
|
|
: { major => $parts[0] // 0, minor => $parts[1] // 0, |
|
577
|
|
|
|
|
|
|
patch => $parts[2] // 0, build => $parts[3] // 0, |
|
578
|
|
|
|
|
|
|
raw => $raw }; |
|
579
|
|
|
|
|
|
|
} |
|
580
|
|
|
|
|
|
|
|
|
581
|
|
|
|
|
|
|
# Lightweight liveness check via CH's /ping endpoint. Returns 1 on |
|
582
|
|
|
|
|
|
|
# success; croaks on HTTP failure (which includes connection refused, |
|
583
|
|
|
|
|
|
|
# timeout, or non-2xx response). Use to gate on server availability in |
|
584
|
|
|
|
|
|
|
# bootstrap scripts and bulk-load orchestration. |
|
585
|
|
|
|
|
|
|
sub ping { |
|
586
|
1
|
|
|
1
|
1
|
126882
|
my ($class, %opts) = @_; |
|
587
|
1
|
|
|
|
|
4
|
my ($scheme, $host, $port) = _check_endpoint(\%opts); |
|
588
|
1
|
|
|
|
|
3
|
my $url = "$scheme://$host:$port/ping"; |
|
589
|
1
|
|
50
|
|
|
5
|
my $resp = _http_tiny(%opts, timeout => $opts{timeout} // 5)->get($url); |
|
590
|
|
|
|
|
|
|
die "ping: HTTP $resp->{status}: $resp->{content}\n" |
|
591
|
1
|
50
|
|
|
|
2945
|
unless $resp->{success}; |
|
592
|
0
|
|
|
|
|
0
|
return 1; |
|
593
|
|
|
|
|
|
|
} |
|
594
|
|
|
|
|
|
|
|
|
595
|
|
|
|
|
|
|
# Parse a Well-Known-Text (WKT) geometry string into the nested-arrayref |
|
596
|
|
|
|
|
|
|
# representation that the Geo column encoders accept. Supported geometries: |
|
597
|
|
|
|
|
|
|
# POINT, LINESTRING, MULTILINESTRING, POLYGON, MULTIPOLYGON. Coordinates |
|
598
|
|
|
|
|
|
|
# are decimal numbers separated by whitespace; rings/parts are |
|
599
|
|
|
|
|
|
|
# parenthesized. The Ring CH type has no WKT name (it is a single closed |
|
600
|
|
|
|
|
|
|
# LINESTRING); accept LINESTRING for Ring as well. Returns the structure |
|
601
|
|
|
|
|
|
|
# only; the caller chooses which Geo column to feed it into. |
|
602
|
|
|
|
|
|
|
sub parse_wkt { |
|
603
|
20
|
|
|
20
|
1
|
138228
|
my ($class, $wkt) = @_; |
|
604
|
20
|
100
|
|
|
|
45
|
die "parse_wkt: input required\n" unless defined $wkt; |
|
605
|
19
|
|
|
|
|
51
|
$wkt =~ s/\A\s+//; |
|
606
|
19
|
|
|
|
|
60
|
$wkt =~ s/\s+\z//; |
|
607
|
19
|
100
|
|
|
|
112
|
my ($kind, $rest) = $wkt =~ /\A([A-Za-z]+)\s*(.*)\z/s |
|
608
|
|
|
|
|
|
|
or die "parse_wkt: not a WKT geometry: $wkt\n"; |
|
609
|
18
|
|
|
|
|
28
|
$kind = uc $kind; |
|
610
|
|
|
|
|
|
|
# Strip the outermost parens (which every geometry has) once for |
|
611
|
|
|
|
|
|
|
# uniform downstream parsing. EMPTY is rejected because CH Geo |
|
612
|
|
|
|
|
|
|
# columns have no null/empty representation other than zero-length. |
|
613
|
18
|
100
|
|
|
|
57
|
$rest =~ s/\A\(\s*// |
|
614
|
|
|
|
|
|
|
or die "parse_wkt: $kind missing '(': $wkt\n"; |
|
615
|
17
|
100
|
|
|
|
70
|
$rest =~ s/\s*\)\z// |
|
616
|
|
|
|
|
|
|
or die "parse_wkt: $kind unmatched parens: $wkt\n"; |
|
617
|
16
|
100
|
|
|
|
28
|
if ($kind eq 'POINT') { |
|
618
|
5
|
|
|
|
|
10
|
return _wkt_point($rest); |
|
619
|
|
|
|
|
|
|
} |
|
620
|
11
|
100
|
|
|
|
23
|
if ($kind eq 'LINESTRING') { |
|
621
|
3
|
|
|
|
|
7
|
return [ _wkt_points($rest) ]; |
|
622
|
|
|
|
|
|
|
} |
|
623
|
8
|
100
|
100
|
|
|
26
|
if ($kind eq 'MULTILINESTRING' || $kind eq 'POLYGON') { |
|
624
|
|
|
|
|
|
|
# Same shape on the wire (CH Polygon = ring + holes; outer ring |
|
625
|
|
|
|
|
|
|
# comes first; MultiLineString is parallel parts). Parse as a |
|
626
|
|
|
|
|
|
|
# list of paren-wrapped point-lists. |
|
627
|
4
|
|
|
|
|
5
|
my @parts; |
|
628
|
4
|
|
|
|
|
22
|
while ($rest =~ /\G\s*,?\s*\(\s*([^()]+?)\s*\)/gc) { |
|
629
|
5
|
|
|
|
|
7
|
push @parts, [ _wkt_points($1) ]; |
|
630
|
|
|
|
|
|
|
} |
|
631
|
4
|
100
|
|
|
|
11
|
die "parse_wkt: $kind: no parts parsed in $wkt\n" unless @parts; |
|
632
|
3
|
|
|
|
|
31
|
return \@parts; |
|
633
|
|
|
|
|
|
|
} |
|
634
|
4
|
100
|
|
|
|
9
|
if ($kind eq 'MULTIPOLYGON') { |
|
635
|
3
|
|
|
|
|
4
|
my @polys; |
|
636
|
|
|
|
|
|
|
# MULTIPOLYGON(((...),(...)), ((...))): the outer level is |
|
637
|
|
|
|
|
|
|
# polygons, each polygon is a list of rings. |
|
638
|
3
|
|
|
|
|
16
|
while ($rest =~ /\G\s*,?\s*\(\s*(.*?)\s*\)\s*(?=,|\z)/gcs) { |
|
639
|
3
|
|
|
|
|
6
|
my $poly_body = $1; |
|
640
|
3
|
|
|
|
|
2
|
my @rings; |
|
641
|
3
|
|
|
|
|
14
|
while ($poly_body =~ /\G\s*,?\s*\(\s*([^()]+?)\s*\)/gc) { |
|
642
|
3
|
|
|
|
|
4
|
push @rings, [ _wkt_points($1) ]; |
|
643
|
|
|
|
|
|
|
} |
|
644
|
3
|
50
|
|
|
|
5
|
die "parse_wkt: MULTIPOLYGON ring parse failed in $wkt\n" |
|
645
|
|
|
|
|
|
|
unless @rings; |
|
646
|
3
|
|
|
|
|
11
|
push @polys, \@rings; |
|
647
|
|
|
|
|
|
|
} |
|
648
|
3
|
100
|
|
|
|
13
|
die "parse_wkt: MULTIPOLYGON: no polygons parsed in $wkt\n" |
|
649
|
|
|
|
|
|
|
unless @polys; |
|
650
|
2
|
|
|
|
|
18
|
return \@polys; |
|
651
|
|
|
|
|
|
|
} |
|
652
|
1
|
|
|
|
|
6
|
die "parse_wkt: unsupported geometry '$kind'\n"; |
|
653
|
|
|
|
|
|
|
} |
|
654
|
|
|
|
|
|
|
|
|
655
|
|
|
|
|
|
|
sub _wkt_point { |
|
656
|
5
|
|
|
5
|
|
6
|
my ($s) = @_; |
|
657
|
5
|
|
|
|
|
9
|
my @c = split /\s+/, $s; |
|
658
|
5
|
100
|
|
|
|
16
|
die "parse_wkt: POINT needs 2 coords, got '$s'\n" unless @c == 2; |
|
659
|
4
|
|
|
|
|
6
|
return [ map { $_ + 0 } @c ]; |
|
|
8
|
|
|
|
|
38
|
|
|
660
|
|
|
|
|
|
|
} |
|
661
|
|
|
|
|
|
|
|
|
662
|
|
|
|
|
|
|
sub _wkt_points { |
|
663
|
11
|
|
|
11
|
|
16
|
my ($s) = @_; |
|
664
|
11
|
|
|
|
|
10
|
my @pts; |
|
665
|
11
|
|
|
|
|
39
|
for my $pair (split /\s*,\s*/, $s) { |
|
666
|
38
|
|
|
|
|
44
|
my @c = split /\s+/, $pair; |
|
667
|
38
|
100
|
|
|
|
59
|
die "parse_wkt: point needs 2 coords, got '$pair'\n" unless @c == 2; |
|
668
|
37
|
|
|
|
|
34
|
push @pts, [ map { $_ + 0 } @c ]; |
|
|
74
|
|
|
|
|
96
|
|
|
669
|
|
|
|
|
|
|
} |
|
670
|
10
|
|
|
|
|
35
|
return @pts; |
|
671
|
|
|
|
|
|
|
} |
|
672
|
|
|
|
|
|
|
|
|
673
|
|
|
|
|
|
|
# Return the list of supported ClickHouse type names (parametric types |
|
674
|
|
|
|
|
|
|
# are listed as their syntactic prefix). For runtime feature detection |
|
675
|
|
|
|
|
|
|
# and tooling. |
|
676
|
|
|
|
|
|
|
sub types { |
|
677
|
1
|
|
|
1
|
1
|
543
|
return (qw( |
|
678
|
|
|
|
|
|
|
Int8 Int16 Int32 Int64 |
|
679
|
|
|
|
|
|
|
UInt8 UInt16 UInt32 UInt64 |
|
680
|
|
|
|
|
|
|
Float32 Float64 BFloat16 |
|
681
|
|
|
|
|
|
|
String FixedString |
|
682
|
|
|
|
|
|
|
Date Date32 DateTime DateTime64 |
|
683
|
|
|
|
|
|
|
Decimal Decimal32 Decimal64 Decimal128 Decimal256 |
|
684
|
|
|
|
|
|
|
Enum8 Enum16 |
|
685
|
|
|
|
|
|
|
Bool Boolean UUID IPv4 IPv6 |
|
686
|
|
|
|
|
|
|
Array Tuple Nullable Map LowCardinality Variant |
|
687
|
|
|
|
|
|
|
Point Ring LineString MultiLineString Polygon MultiPolygon |
|
688
|
|
|
|
|
|
|
SimpleAggregateFunction |
|
689
|
|
|
|
|
|
|
JSON Object Dynamic |
|
690
|
|
|
|
|
|
|
)); |
|
691
|
|
|
|
|
|
|
} |
|
692
|
|
|
|
|
|
|
|
|
693
|
|
|
|
|
|
|
# Compare two column lists, return { added, removed, changed }. Each |
|
694
|
|
|
|
|
|
|
# slot holds [name, type] pairs from $b (added/changed) or $a (removed). |
|
695
|
|
|
|
|
|
|
# Useful for migration scripts and detecting drift between source and |
|
696
|
|
|
|
|
|
|
# destination schemas in CH-to-CH pipelines. |
|
697
|
|
|
|
|
|
|
sub schema_diff { |
|
698
|
13
|
|
|
13
|
1
|
279535
|
my ($class, $a, $b) = @_; |
|
699
|
13
|
|
|
|
|
26
|
my %a = map { $_->[0] => $_->[1] } @$a; |
|
|
19
|
|
|
|
|
53
|
|
|
700
|
13
|
|
|
|
|
24
|
my %b = map { $_->[0] => $_->[1] } @$b; |
|
|
21
|
|
|
|
|
43
|
|
|
701
|
13
|
|
|
|
|
19
|
my (@added, @removed, @changed); |
|
702
|
13
|
|
|
|
|
49
|
for my $name (sort keys %b) { |
|
703
|
21
|
100
|
|
|
|
109
|
if (!exists $a{$name}) { |
|
|
|
100
|
|
|
|
|
|
|
704
|
7
|
|
|
|
|
38
|
push @added, [$name, $b{$name}]; |
|
705
|
|
|
|
|
|
|
} elsif ($a{$name} ne $b{$name}) { |
|
706
|
5
|
|
|
|
|
16
|
push @changed, [$name, $a{$name}, $b{$name}]; |
|
707
|
|
|
|
|
|
|
} |
|
708
|
|
|
|
|
|
|
} |
|
709
|
13
|
|
|
|
|
27
|
for my $name (sort keys %a) { |
|
710
|
19
|
100
|
|
|
|
45
|
push @removed, [$name, $a{$name}] unless exists $b{$name}; |
|
711
|
|
|
|
|
|
|
} |
|
712
|
13
|
|
|
|
|
79
|
return { added => \@added, removed => \@removed, changed => \@changed }; |
|
713
|
|
|
|
|
|
|
} |
|
714
|
|
|
|
|
|
|
|
|
715
|
|
|
|
|
|
|
# Quote a CH identifier (table name, column name) with backticks, |
|
716
|
|
|
|
|
|
|
# escaping any backtick within. CH's lexer processes C-style backslash |
|
717
|
|
|
|
|
|
|
# escapes inside backtick identifiers, so a literal backslash must be |
|
718
|
|
|
|
|
|
|
# escaped too (else `a\` would read as an escaped backtick and the |
|
719
|
|
|
|
|
|
|
# identifier would never close). This mirrors ClickHouse's own |
|
720
|
|
|
|
|
|
|
# backQuote(): backslash first, then backtick. CH also accepts |
|
721
|
|
|
|
|
|
|
# double-quote quoting; we use backticks because show create table does. |
|
722
|
|
|
|
|
|
|
sub _quote_ident { |
|
723
|
49
|
|
|
49
|
|
62
|
my $name = shift; |
|
724
|
49
|
|
|
|
|
67
|
$name =~ s/\\/\\\\/g; |
|
725
|
49
|
|
|
|
|
55
|
$name =~ s/`/\\`/g; |
|
726
|
49
|
|
|
|
|
110
|
return "`$name`"; |
|
727
|
|
|
|
|
|
|
} |
|
728
|
|
|
|
|
|
|
|
|
729
|
|
|
|
|
|
|
# Render one column definition for format_create_table. A column entry |
|
730
|
|
|
|
|
|
|
# is [name, type] or [name, type, \%col] where %col may carry |
|
731
|
|
|
|
|
|
|
# default/materialized/alias (mutually exclusive), codec, ttl, comment. |
|
732
|
|
|
|
|
|
|
# Expression-valued keys are inserted verbatim - the caller owns their |
|
733
|
|
|
|
|
|
|
# SQL-correctness; only the comment is quoted (it is the one string |
|
734
|
|
|
|
|
|
|
# literal). Clause order matches CH's own `show create table` output; |
|
735
|
|
|
|
|
|
|
# keywords are lowercased to match the rest of the generated SQL. |
|
736
|
|
|
|
|
|
|
sub _format_column { |
|
737
|
22
|
|
|
22
|
|
30
|
my ($col) = @_; |
|
738
|
22
|
|
|
|
|
39
|
my ($name, $type, $extra) = @$col; |
|
739
|
22
|
|
|
|
|
36
|
my $sql = _quote_ident($name) . ' ' . $type; |
|
740
|
22
|
100
|
66
|
|
|
84
|
return $sql unless $extra && ref $extra eq 'HASH'; |
|
741
|
|
|
|
|
|
|
|
|
742
|
8
|
|
|
|
|
9
|
my @kind = grep { defined $extra->{$_} } |
|
|
24
|
|
|
|
|
35
|
|
|
743
|
|
|
|
|
|
|
qw(default materialized alias); |
|
744
|
8
|
100
|
|
|
|
23
|
die "format_create_table: column '$name' has more than one of " |
|
745
|
|
|
|
|
|
|
. "default/materialized/alias\n" |
|
746
|
|
|
|
|
|
|
if @kind > 1; |
|
747
|
7
|
100
|
|
|
|
9
|
if (@kind) { |
|
748
|
3
|
|
|
|
|
5
|
$sql .= " $kind[0] $extra->{$kind[0]}"; |
|
749
|
|
|
|
|
|
|
} |
|
750
|
7
|
100
|
|
|
|
13
|
$sql .= " codec($extra->{codec})" if defined $extra->{codec}; |
|
751
|
7
|
100
|
|
|
|
9
|
$sql .= " ttl $extra->{ttl}" if defined $extra->{ttl}; |
|
752
|
7
|
100
|
|
|
|
9
|
if (defined $extra->{comment}) { |
|
753
|
|
|
|
|
|
|
# CH string literals take C-style backslash escapes; escape the |
|
754
|
|
|
|
|
|
|
# backslash before the quote so an embedded backslash survives. |
|
755
|
3
|
|
|
|
|
7
|
(my $c = $extra->{comment}) =~ s/\\/\\\\/g; |
|
756
|
3
|
|
|
|
|
5
|
$c =~ s/'/\\'/g; |
|
757
|
3
|
|
|
|
|
5
|
$sql .= " comment '$c'"; |
|
758
|
|
|
|
|
|
|
} |
|
759
|
7
|
|
|
|
|
16
|
return $sql; |
|
760
|
|
|
|
|
|
|
} |
|
761
|
|
|
|
|
|
|
|
|
762
|
|
|
|
|
|
|
# Emit a create table statement for the given column list. Table-level |
|
763
|
|
|
|
|
|
|
# opts (engine, partition_by, primary_key, order_by, sample_by, ttl, |
|
764
|
|
|
|
|
|
|
# settings) are emitted in CH's canonical `show create table` order |
|
765
|
|
|
|
|
|
|
# and inserted verbatim - the caller owns SQL correctness there; this |
|
766
|
|
|
|
|
|
|
# helper validates only the column list. Per-column |
|
767
|
|
|
|
|
|
|
# default/materialized/alias/codec/ttl/comment are supported by passing |
|
768
|
|
|
|
|
|
|
# [name, type, \%col] entries. Returns the SQL string (no trailing |
|
769
|
|
|
|
|
|
|
# semicolon). |
|
770
|
|
|
|
|
|
|
sub format_create_table { |
|
771
|
13
|
|
|
13
|
1
|
150317
|
my ($class, %opts) = @_; |
|
772
|
13
|
|
50
|
|
|
42
|
my $table = $opts{table} // die "format_create_table: 'table' required\n"; |
|
773
|
13
|
|
50
|
|
|
44
|
my $columns = $opts{columns} // die "format_create_table: 'columns' arrayref required\n"; |
|
774
|
13
|
|
100
|
|
|
41
|
my $engine = $opts{engine} // 'MergeTree'; |
|
775
|
13
|
|
|
|
|
32
|
_validate_table_name($table); |
|
776
|
|
|
|
|
|
|
|
|
777
|
12
|
|
|
|
|
31
|
my $body = join ",\n ", map { _format_column($_) } @$columns; |
|
|
22
|
|
|
|
|
35
|
|
|
778
|
|
|
|
|
|
|
|
|
779
|
11
|
|
|
|
|
27
|
my $sql = "create table " . _quote_ident($table) . " (\n $body\n)"; |
|
780
|
11
|
|
|
|
|
28
|
$sql .= "\nengine = $engine"; |
|
781
|
11
|
100
|
|
|
|
19
|
$sql .= "\npartition by $opts{partition_by}" if defined $opts{partition_by}; |
|
782
|
11
|
100
|
|
|
|
40
|
$sql .= "\nprimary key $opts{primary_key}" if defined $opts{primary_key}; |
|
783
|
11
|
100
|
|
|
|
22
|
$sql .= "\norder by $opts{order_by}" if defined $opts{order_by}; |
|
784
|
11
|
100
|
|
|
|
24
|
$sql .= "\nsample by $opts{sample_by}" if defined $opts{sample_by}; |
|
785
|
11
|
100
|
|
|
|
29
|
$sql .= "\nttl $opts{ttl}" if defined $opts{ttl}; |
|
786
|
11
|
100
|
|
|
|
21
|
$sql .= "\nsettings $opts{settings}" if defined $opts{settings}; |
|
787
|
11
|
|
|
|
|
31
|
return $sql; |
|
788
|
|
|
|
|
|
|
} |
|
789
|
|
|
|
|
|
|
|
|
790
|
|
|
|
|
|
|
# Translate a schema_diff hashref into a list of alter table statements |
|
791
|
|
|
|
|
|
|
# (one per column change). Returns an arrayref of SQL strings; the |
|
792
|
|
|
|
|
|
|
# caller decides whether to apply them transactionally or one at a |
|
793
|
|
|
|
|
|
|
# time. Conservative ordering: drops first, then modifies, then adds, |
|
794
|
|
|
|
|
|
|
# so a column-rename modeled as drop+add ends up with the new column |
|
795
|
|
|
|
|
|
|
# at the right position. |
|
796
|
|
|
|
|
|
|
sub apply_schema_diff { |
|
797
|
8
|
|
|
8
|
1
|
526
|
my ($class, $diff, %opts) = @_; |
|
798
|
8
|
|
100
|
|
|
49
|
my $table = $opts{table} // die "apply_schema_diff: 'table' required\n"; |
|
799
|
7
|
|
|
|
|
21
|
_validate_table_name($table); |
|
800
|
6
|
|
|
|
|
10
|
my $qt = _quote_ident($table); |
|
801
|
6
|
|
|
|
|
8
|
my @sql; |
|
802
|
6
|
|
50
|
|
|
7
|
for my $row (@{ $diff->{removed} // [] }) { |
|
|
6
|
|
|
|
|
18
|
|
|
803
|
3
|
|
|
|
|
10
|
push @sql, "alter table $qt drop column " |
|
804
|
|
|
|
|
|
|
. _quote_ident($row->[0]); |
|
805
|
|
|
|
|
|
|
} |
|
806
|
6
|
|
50
|
|
|
8
|
for my $row (@{ $diff->{changed} // [] }) { |
|
|
6
|
|
|
|
|
14
|
|
|
807
|
3
|
|
|
|
|
8
|
push @sql, "alter table $qt modify column " |
|
808
|
|
|
|
|
|
|
. _quote_ident($row->[0]) . " $row->[2]"; |
|
809
|
|
|
|
|
|
|
} |
|
810
|
6
|
|
50
|
|
|
9
|
for my $row (@{ $diff->{added} // [] }) { |
|
|
6
|
|
|
|
|
14
|
|
|
811
|
4
|
|
|
|
|
10
|
push @sql, "alter table $qt add column " |
|
812
|
|
|
|
|
|
|
. _quote_ident($row->[0]) . " $row->[1]"; |
|
813
|
|
|
|
|
|
|
} |
|
814
|
6
|
|
|
|
|
19
|
return \@sql; |
|
815
|
|
|
|
|
|
|
} |
|
816
|
|
|
|
|
|
|
|
|
817
|
|
|
|
|
|
|
# Unquote a CH identifier: strip surrounding backticks and collapse both |
|
818
|
|
|
|
|
|
|
# escape conventions CH's lexer accepts - C-style backslash escapes |
|
819
|
|
|
|
|
|
|
# (\X -> X, what backQuote() and _quote_ident emit) and doubled |
|
820
|
|
|
|
|
|
|
# backticks (`` -> `). Bare identifiers pass through untouched. |
|
821
|
|
|
|
|
|
|
sub _unquote_ident { |
|
822
|
33
|
|
|
33
|
|
61
|
my $s = shift; |
|
823
|
33
|
100
|
|
|
|
72
|
if ($s =~ /\A`(.*)`\z/s) { |
|
824
|
23
|
|
|
|
|
26
|
$s = $1; |
|
825
|
23
|
100
|
|
|
|
43
|
$s =~ s/\\(.)|``/defined $1 ? $1 : '`'/ges; |
|
|
2
|
|
|
|
|
8
|
|
|
826
|
|
|
|
|
|
|
} |
|
827
|
33
|
|
|
|
|
69
|
return $s; |
|
828
|
|
|
|
|
|
|
} |
|
829
|
|
|
|
|
|
|
|
|
830
|
|
|
|
|
|
|
# Given the index of an opening backtick in $str, return the index just |
|
831
|
|
|
|
|
|
|
# past the matching closing backtick. Honors both escape conventions: a |
|
832
|
|
|
|
|
|
|
# backslash escapes the next char, and a doubled `` is a literal |
|
833
|
|
|
|
|
|
|
# backtick. Returns length($str) if the quote is never closed. The |
|
834
|
|
|
|
|
|
|
# CREATE-TABLE scanners below use this so an escaped backtick inside an |
|
835
|
|
|
|
|
|
|
# identifier cannot be mistaken for the end of the quoted region. |
|
836
|
|
|
|
|
|
|
sub _skip_backtick_quoted { |
|
837
|
42
|
|
|
42
|
|
45
|
my ($str, $i) = @_; |
|
838
|
42
|
|
|
|
|
40
|
my $len = length $str; |
|
839
|
42
|
|
|
|
|
37
|
$i++; # past the opening backtick |
|
840
|
42
|
|
|
|
|
43
|
while ($i < $len) { |
|
841
|
157
|
|
|
|
|
133
|
my $c = substr($str, $i, 1); |
|
842
|
157
|
100
|
|
|
|
154
|
if ($c eq '\\') { $i += 2; next } |
|
|
2
|
|
|
|
|
1
|
|
|
|
2
|
|
|
|
|
2
|
|
|
843
|
155
|
100
|
|
|
|
144
|
if ($c eq '`') { |
|
844
|
44
|
100
|
100
|
|
|
145
|
return $i + 1 |
|
845
|
|
|
|
|
|
|
unless $i + 1 < $len && substr($str, $i + 1, 1) eq '`'; |
|
846
|
2
|
|
|
|
|
2
|
$i += 2; # doubled-backtick escape |
|
847
|
2
|
|
|
|
|
3
|
next; |
|
848
|
|
|
|
|
|
|
} |
|
849
|
111
|
|
|
|
|
103
|
$i++; |
|
850
|
|
|
|
|
|
|
} |
|
851
|
0
|
|
|
|
|
0
|
return $len; |
|
852
|
|
|
|
|
|
|
} |
|
853
|
|
|
|
|
|
|
|
|
854
|
|
|
|
|
|
|
# Split a (possibly backtick-quoted, possibly database-qualified) table |
|
855
|
|
|
|
|
|
|
# name into ($database, $table). The qualifying dot is the first one |
|
856
|
|
|
|
|
|
|
# that is not inside backticks; a name with no such dot has an undef |
|
857
|
|
|
|
|
|
|
# database. |
|
858
|
|
|
|
|
|
|
sub _split_qname { |
|
859
|
12
|
|
|
12
|
|
21
|
my ($qname) = @_; |
|
860
|
12
|
|
|
|
|
27
|
my $len = length $qname; |
|
861
|
12
|
|
|
|
|
8
|
my $dot = -1; |
|
862
|
12
|
|
|
|
|
28
|
for (my $i = 0; $i < $len; $i++) { |
|
863
|
27
|
|
|
|
|
33
|
my $c = substr($qname, $i, 1); |
|
864
|
27
|
100
|
|
|
|
36
|
if ($c eq '`') { $i = _skip_backtick_quoted($qname, $i) - 1; next } |
|
|
3
|
|
|
|
|
7
|
|
|
|
3
|
|
|
|
|
7
|
|
|
865
|
24
|
100
|
|
|
|
39
|
if ($c eq '.') { $dot = $i; last } |
|
|
2
|
|
|
|
|
3
|
|
|
|
2
|
|
|
|
|
4
|
|
|
866
|
|
|
|
|
|
|
} |
|
867
|
12
|
100
|
|
|
|
39
|
return (undef, _unquote_ident($qname)) if $dot < 0; |
|
868
|
2
|
|
|
|
|
29
|
return (_unquote_ident(substr($qname, 0, $dot)), |
|
869
|
|
|
|
|
|
|
_unquote_ident(substr($qname, $dot + 1))); |
|
870
|
|
|
|
|
|
|
} |
|
871
|
|
|
|
|
|
|
|
|
872
|
|
|
|
|
|
|
# Split a create table column block on top-level commas, respecting both |
|
873
|
|
|
|
|
|
|
# parentheses (nested type args) and backtick-quoted identifiers (which |
|
874
|
|
|
|
|
|
|
# may legally contain commas or parens). _split_paren_list alone is not |
|
875
|
|
|
|
|
|
|
# backtick-aware, hence this dedicated splitter. |
|
876
|
|
|
|
|
|
|
sub _split_column_defs { |
|
877
|
10
|
|
|
10
|
|
10
|
my $body = shift; |
|
878
|
10
|
|
|
|
|
10
|
my @parts; |
|
879
|
10
|
|
|
|
|
13
|
my ($start, $depth, $len) = (0, 0, length $body); |
|
880
|
10
|
|
|
|
|
13
|
for (my $i = 0; $i <= $len; $i++) { |
|
881
|
337
|
100
|
|
|
|
325
|
my $c = $i < $len ? substr($body, $i, 1) : ','; |
|
882
|
337
|
100
|
|
|
|
331
|
if ($c eq '`') { $i = _skip_backtick_quoted($body, $i) - 1; next } |
|
|
19
|
|
|
|
|
19
|
|
|
|
19
|
|
|
|
|
26
|
|
|
883
|
318
|
100
|
100
|
|
|
668
|
if ($c eq '(') { $depth++ } |
|
|
8
|
100
|
|
|
|
21
|
|
|
|
|
100
|
|
|
|
|
|
|
884
|
8
|
|
|
|
|
9
|
elsif ($c eq ')') { $depth-- } |
|
885
|
|
|
|
|
|
|
elsif ($c eq ',' && $depth == 0) { |
|
886
|
19
|
|
|
|
|
108
|
(my $p = substr($body, $start, $i - $start)) |
|
887
|
|
|
|
|
|
|
=~ s/\A\s+|\s+\z//g; |
|
888
|
19
|
50
|
|
|
|
32
|
push @parts, $p if length $p; |
|
889
|
19
|
|
|
|
|
28
|
$start = $i + 1; |
|
890
|
|
|
|
|
|
|
} |
|
891
|
|
|
|
|
|
|
} |
|
892
|
10
|
|
|
|
|
20
|
return @parts; |
|
893
|
|
|
|
|
|
|
} |
|
894
|
|
|
|
|
|
|
|
|
895
|
|
|
|
|
|
|
# Parse the output of `show create table` (or any create table DDL) into |
|
896
|
|
|
|
|
|
|
# a structured hashref: { database, table, columns => [[name,type],...], |
|
897
|
|
|
|
|
|
|
# engine, order_by, partition_by, primary_key, sample_by, ttl, settings }. |
|
898
|
|
|
|
|
|
|
# Clause values are returned verbatim (trimmed); columns is in the same |
|
899
|
|
|
|
|
|
|
# [name,type] shape schema_diff and format_create_table consume, so a |
|
900
|
|
|
|
|
|
|
# round-trip CH -> parse -> diff -> ALTER is one call each. Per-column |
|
901
|
|
|
|
|
|
|
# DEFAULT/CODEC/TTL modifiers are dropped from the type (they are not |
|
902
|
|
|
|
|
|
|
# part of the type proper); the bare type is what CH's own `describe` |
|
903
|
|
|
|
|
|
|
# would report. Croaks if no create table header or column block is found. |
|
904
|
|
|
|
|
|
|
sub parse_create_table { |
|
905
|
14
|
|
|
14
|
1
|
164529
|
my ($class, $ddl) = @_; |
|
906
|
14
|
100
|
|
|
|
36
|
die "parse_create_table: input required\n" unless defined $ddl; |
|
907
|
|
|
|
|
|
|
|
|
908
|
|
|
|
|
|
|
# A name part is a backtick-quoted identifier or a bare run with no |
|
909
|
|
|
|
|
|
|
# space / dot / paren; the table name is one part, optionally |
|
910
|
|
|
|
|
|
|
# database-qualified with a second. Inside backticks both escape |
|
911
|
|
|
|
|
|
|
# forms are accepted: \X (backslash) and `` (doubled). |
|
912
|
13
|
|
|
|
|
33
|
my $part = qr/(?:`(?:[^`\\]|\\.|``)+`|[^\s.(]+)/; |
|
913
|
13
|
100
|
|
|
|
489
|
$ddl =~ /\bCREATE\s+(?:OR\s+REPLACE\s+)?(?:TEMPORARY\s+)?TABLE\s+ |
|
914
|
|
|
|
|
|
|
(?:IF\s+NOT\s+EXISTS\s+)? |
|
915
|
|
|
|
|
|
|
($part (?:\.$part)?)/xgci |
|
916
|
|
|
|
|
|
|
or die "parse_create_table: no create table header found\n"; |
|
917
|
12
|
|
|
|
|
30
|
my $qname = $1; |
|
918
|
12
|
|
|
|
|
15
|
my $name_end = pos $ddl; |
|
919
|
|
|
|
|
|
|
|
|
920
|
12
|
|
|
|
|
20
|
my ($database, $table) = _split_qname($qname); |
|
921
|
|
|
|
|
|
|
|
|
922
|
|
|
|
|
|
|
# Locate the column block: the first balanced (...) after the name. |
|
923
|
12
|
|
|
|
|
20
|
my $open = index $ddl, '(', $name_end; |
|
924
|
12
|
100
|
|
|
|
22
|
die "parse_create_table: no column list found\n" if $open < 0; |
|
925
|
11
|
|
|
|
|
19
|
my ($depth, $close, $len) = (0, -1, length $ddl); |
|
926
|
11
|
|
|
|
|
19
|
for (my $i = $open; $i < $len; $i++) { |
|
927
|
355
|
|
|
|
|
283
|
my $c = substr($ddl, $i, 1); |
|
928
|
355
|
100
|
|
|
|
329
|
if ($c eq '`') { $i = _skip_backtick_quoted($ddl, $i) - 1; next } |
|
|
20
|
|
|
|
|
24
|
|
|
|
20
|
|
|
|
|
29
|
|
|
929
|
335
|
100
|
|
|
|
479
|
if ($c eq '(') { $depth++ } |
|
|
19
|
100
|
|
|
|
23
|
|
|
930
|
18
|
100
|
|
|
|
13
|
elsif ($c eq ')') { $depth--; if ($depth == 0) { $close = $i; last } } |
|
|
18
|
|
|
|
|
25
|
|
|
|
10
|
|
|
|
|
25
|
|
|
|
10
|
|
|
|
|
11
|
|
|
931
|
|
|
|
|
|
|
} |
|
932
|
11
|
100
|
|
|
|
19
|
die "parse_create_table: unbalanced column list\n" if $close < 0; |
|
933
|
10
|
|
|
|
|
14
|
my $block = substr $ddl, $open + 1, $close - $open - 1; |
|
934
|
|
|
|
|
|
|
|
|
935
|
10
|
|
|
|
|
11
|
my @columns; |
|
936
|
10
|
|
|
|
|
17
|
for my $def (_split_column_defs($block)) { |
|
937
|
|
|
|
|
|
|
# Skip table-level INDEX / CONSTRAINT / PROJECTION / primary key |
|
938
|
|
|
|
|
|
|
# entries that share the column block. |
|
939
|
19
|
50
|
|
|
|
38
|
next if $def =~ /\A(?:INDEX|CONSTRAINT|PROJECTION|PRIMARY\s+KEY)\b/i; |
|
940
|
19
|
|
|
|
|
18
|
my ($cname, $rest); |
|
941
|
19
|
50
|
|
|
|
74
|
if ($def =~ /\A(`(?:[^`\\]|\\.|``)+`)\s+(.*)\z/s) { |
|
|
|
0
|
|
|
|
|
|
|
942
|
19
|
|
|
|
|
23
|
($cname, $rest) = (_unquote_ident($1), $2); |
|
943
|
|
|
|
|
|
|
} elsif ($def =~ /\A(\S+)\s+(.*)\z/s) { |
|
944
|
0
|
|
|
|
|
0
|
($cname, $rest) = ($1, $2); |
|
945
|
|
|
|
|
|
|
} else { |
|
946
|
0
|
|
|
|
|
0
|
next; |
|
947
|
|
|
|
|
|
|
} |
|
948
|
|
|
|
|
|
|
# The type is the leading identifier plus its balanced (...) args; |
|
949
|
|
|
|
|
|
|
# DEFAULT / CODEC / TTL / COMMENT modifiers come after and are |
|
950
|
|
|
|
|
|
|
# not part of the type. |
|
951
|
19
|
|
|
|
|
33
|
my $type = _take_type(\$rest); |
|
952
|
19
|
50
|
|
|
|
53
|
push @columns, [$cname, $type] if length $type; |
|
953
|
|
|
|
|
|
|
} |
|
954
|
10
|
50
|
|
|
|
20
|
die "parse_create_table: no columns parsed\n" unless @columns; |
|
955
|
|
|
|
|
|
|
|
|
956
|
10
|
|
|
|
|
12
|
my $tail = substr $ddl, $close + 1; |
|
957
|
10
|
|
|
|
|
35
|
my %out = ( |
|
958
|
|
|
|
|
|
|
database => $database, |
|
959
|
|
|
|
|
|
|
table => $table, |
|
960
|
|
|
|
|
|
|
columns => \@columns, |
|
961
|
|
|
|
|
|
|
); |
|
962
|
|
|
|
|
|
|
# Trailing clauses. ENGINE has no terminating keyword of its own; |
|
963
|
|
|
|
|
|
|
# each clause runs until the next clause keyword or end of string. |
|
964
|
10
|
|
|
|
|
19
|
my $stop = qr/\bENGINE\b|\bPARTITION\s+BY\b|\bPRIMARY\s+KEY\b| |
|
965
|
|
|
|
|
|
|
\bORDER\s+BY\b|\bSAMPLE\s+BY\b|\bTTL\b|\bSETTINGS\b|\z/xi; |
|
966
|
10
|
|
|
|
|
96
|
my %clause = ( |
|
967
|
|
|
|
|
|
|
engine => qr/\bENGINE\s*=\s*/i, |
|
968
|
|
|
|
|
|
|
partition_by => qr/\bPARTITION\s+BY\s+/i, |
|
969
|
|
|
|
|
|
|
primary_key => qr/\bPRIMARY\s+KEY\s+/i, |
|
970
|
|
|
|
|
|
|
order_by => qr/\bORDER\s+BY\s+/i, |
|
971
|
|
|
|
|
|
|
sample_by => qr/\bSAMPLE\s+BY\s+/i, |
|
972
|
|
|
|
|
|
|
ttl => qr/\bTTL\s+/i, |
|
973
|
|
|
|
|
|
|
settings => qr/\bSETTINGS\s+/i, |
|
974
|
|
|
|
|
|
|
); |
|
975
|
10
|
|
|
|
|
23
|
for my $k (keys %clause) { |
|
976
|
|
|
|
|
|
|
# No /g: each clause is searched from the start of $tail |
|
977
|
|
|
|
|
|
|
# independently. With /g the shared pos() on $tail would make a |
|
978
|
|
|
|
|
|
|
# clause that sorts earlier in the string than a previously |
|
979
|
|
|
|
|
|
|
# matched one unfindable. $+[0] is the end offset of the match. |
|
980
|
70
|
100
|
|
|
|
219
|
next unless $tail =~ $clause{$k}; |
|
981
|
22
|
|
|
|
|
56
|
my $val = substr $tail, $+[0]; |
|
982
|
22
|
|
|
|
|
312
|
$val =~ s/$stop.*\z//s; |
|
983
|
22
|
|
|
|
|
58
|
$val =~ s/\A\s+|\s+\z//g; |
|
984
|
22
|
50
|
|
|
|
46
|
$out{$k} = $val if length $val; |
|
985
|
|
|
|
|
|
|
} |
|
986
|
10
|
|
|
|
|
64
|
return \%out; |
|
987
|
|
|
|
|
|
|
} |
|
988
|
|
|
|
|
|
|
|
|
989
|
|
|
|
|
|
|
# Consume one CH type expression from the front of $$rest (an identifier |
|
990
|
|
|
|
|
|
|
# optionally followed by a balanced parenthesised argument list), advance |
|
991
|
|
|
|
|
|
|
# $$rest past it, and return the type string. Used by parse_create_table |
|
992
|
|
|
|
|
|
|
# to separate the type from trailing DEFAULT/CODEC/... modifiers. |
|
993
|
|
|
|
|
|
|
sub _take_type { |
|
994
|
19
|
|
|
19
|
|
20
|
my ($rest) = @_; |
|
995
|
19
|
|
|
|
|
24
|
$$rest =~ s/\A\s+//; |
|
996
|
19
|
50
|
|
|
|
35
|
return '' unless $$rest =~ /\A([A-Za-z_]\w*)/; |
|
997
|
19
|
|
|
|
|
44
|
my $type = $1; |
|
998
|
19
|
|
|
|
|
17
|
my $pos = length $type; |
|
999
|
19
|
100
|
|
|
|
38
|
if (substr($$rest, $pos, 1) eq '(') { |
|
1000
|
5
|
|
|
|
|
5
|
my ($depth, $len) = (0, length $$rest); |
|
1001
|
5
|
|
|
|
|
9
|
for (my $i = $pos; $i < $len; $i++) { |
|
1002
|
80
|
|
|
|
|
70
|
my $c = substr($$rest, $i, 1); |
|
1003
|
80
|
50
|
|
|
|
84
|
if ($c eq '`') { $i = _skip_backtick_quoted($$rest, $i) - 1; next } |
|
|
0
|
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
1004
|
80
|
100
|
|
|
|
115
|
if ($c eq '(') { $depth++ } |
|
|
7
|
100
|
|
|
|
7
|
|
|
1005
|
7
|
100
|
|
|
|
5
|
elsif ($c eq ')') { $depth--; if (!$depth) { $pos = $i + 1; last } } |
|
|
7
|
|
|
|
|
9
|
|
|
|
5
|
|
|
|
|
4
|
|
|
|
5
|
|
|
|
|
6
|
|
|
1006
|
|
|
|
|
|
|
} |
|
1007
|
|
|
|
|
|
|
} |
|
1008
|
19
|
|
|
|
|
26
|
$type = substr $$rest, 0, $pos; |
|
1009
|
19
|
|
|
|
|
31
|
substr($$rest, 0, $pos) = ''; |
|
1010
|
19
|
|
|
|
|
24
|
return $type; |
|
1011
|
|
|
|
|
|
|
} |
|
1012
|
|
|
|
|
|
|
|
|
1013
|
|
|
|
|
|
|
# Inspect a captured Native block and return its column shape as a |
|
1014
|
|
|
|
|
|
|
# fresh encoder configured for that shape. Useful for diagnosing |
|
1015
|
|
|
|
|
|
|
# captured payloads off-line (no server, no schema source) and for |
|
1016
|
|
|
|
|
|
|
# round-tripping bytes through a transform/filter step. Zero-row |
|
1017
|
|
|
|
|
|
|
# blocks work fine - the column headers are still on the wire. |
|
1018
|
|
|
|
|
|
|
sub for_native_bytes { |
|
1019
|
2
|
|
|
2
|
1
|
1410
|
my ($class, $bytes) = @_; |
|
1020
|
2
|
|
|
|
|
54
|
my $blk = $class->decode_block($bytes); |
|
1021
|
2
|
|
|
|
|
5
|
my @cols; |
|
1022
|
2
|
|
|
|
|
3
|
for my $col (@{ $blk->{columns} }) { |
|
|
2
|
|
|
|
|
7
|
|
|
1023
|
4
|
|
|
|
|
16
|
push @cols, [ $col->{name}, $col->{type} ]; |
|
1024
|
|
|
|
|
|
|
} |
|
1025
|
2
|
|
|
|
|
21
|
return $class->new(columns => \@cols); |
|
1026
|
|
|
|
|
|
|
} |
|
1027
|
|
|
|
|
|
|
|
|
1028
|
|
|
|
|
|
|
# --- RowBinary ------------------------------------------------------- |
|
1029
|
|
|
|
|
|
|
# |
|
1030
|
|
|
|
|
|
|
# RowBinary is ClickHouse's row-major binary format: each row is the |
|
1031
|
|
|
|
|
|
|
# concatenation of its column values, with no headers. The per-value |
|
1032
|
|
|
|
|
|
|
# byte encoding for scalar / String / FixedString / Nullable columns is |
|
1033
|
|
|
|
|
|
|
# byte-identical to that column's data in a one-row Native block, so |
|
1034
|
|
|
|
|
|
|
# encode_row_binary reuses the XS Native encoder and slices the value |
|
1035
|
|
|
|
|
|
|
# region out; decode_row_binary wraps a value back into a one-row Native |
|
1036
|
|
|
|
|
|
|
# block and runs the XS decoder. Array(T) is the one shape that differs |
|
1037
|
|
|
|
|
|
|
# (RowBinary uses a varint element count where Native uses a UInt64 |
|
1038
|
|
|
|
|
|
|
# offset) and is handled by recursion. This keeps a single source of |
|
1039
|
|
|
|
|
|
|
# truth for every type's wire bytes - no second per-type codec in Perl. |
|
1040
|
|
|
|
|
|
|
# |
|
1041
|
|
|
|
|
|
|
# Supported: all scalar types (Int*/UInt*/Float*/Bool/Date*/DateTime*/ |
|
1042
|
|
|
|
|
|
|
# Decimal*/UUID/IPv4/IPv6/Enum*), String, FixedString(N), Nullable of |
|
1043
|
|
|
|
|
|
|
# any of those, Array(...) nesting, and LowCardinality(...) (encoded as |
|
1044
|
|
|
|
|
|
|
# its inner type, which is how RowBinary represents it). Map, Tuple, |
|
1045
|
|
|
|
|
|
|
# Variant, JSON, Dynamic, Geo and Nested croak - their Native and |
|
1046
|
|
|
|
|
|
|
# RowBinary framings diverge in ways the slice trick cannot bridge. |
|
1047
|
|
|
|
|
|
|
|
|
1048
|
|
|
|
|
|
|
# Fixed on-wire byte width per scalar base type (parametric suffix |
|
1049
|
|
|
|
|
|
|
# already stripped by the caller). Decimal/FixedString are sized |
|
1050
|
|
|
|
|
|
|
# separately because their width depends on the type parameters. |
|
1051
|
|
|
|
|
|
|
my %RB_FIXED_WIDTH = ( |
|
1052
|
|
|
|
|
|
|
Int8 => 1, UInt8 => 1, Bool => 1, Boolean => 1, Enum8 => 1, |
|
1053
|
|
|
|
|
|
|
Int16 => 2, UInt16 => 2, Date => 2, Enum16 => 2, |
|
1054
|
|
|
|
|
|
|
Int32 => 4, UInt32 => 4, Float32 => 4, BFloat16 => 2, |
|
1055
|
|
|
|
|
|
|
Date32 => 4, DateTime => 4, IPv4 => 4, Decimal32 => 4, |
|
1056
|
|
|
|
|
|
|
Int64 => 8, UInt64 => 8, Float64 => 8, DateTime64 => 8, |
|
1057
|
|
|
|
|
|
|
Decimal64 => 8, |
|
1058
|
|
|
|
|
|
|
Int128 => 16, UInt128 => 16, UUID => 16, IPv6 => 16, Decimal128 => 16, |
|
1059
|
|
|
|
|
|
|
Int256 => 32, UInt256 => 32, Decimal256 => 32, |
|
1060
|
|
|
|
|
|
|
); |
|
1061
|
|
|
|
|
|
|
|
|
1062
|
|
|
|
|
|
|
# Croak unless $type is a RowBinary-supported scalar (or Nullable of |
|
1063
|
|
|
|
|
|
|
# one). Array / LowCardinality are peeled by the caller before this |
|
1064
|
|
|
|
|
|
|
# runs, so anything parenthesised-and-recursive that reaches here is |
|
1065
|
|
|
|
|
|
|
# unsupported. |
|
1066
|
|
|
|
|
|
|
sub _rb_assert_scalar { |
|
1067
|
82
|
|
|
82
|
|
90
|
my ($type) = @_; |
|
1068
|
|
|
|
|
|
|
# ClickHouse does not nest Nullable, so a single peel is enough. |
|
1069
|
82
|
|
|
|
|
78
|
my $t = $type; |
|
1070
|
82
|
100
|
|
|
|
137
|
$t = $1 if $t =~ /\ANullable\((.+)\)\z/s; |
|
1071
|
82
|
100
|
|
|
|
172
|
die "row_binary: type '$type' is not supported (RowBinary covers " |
|
1072
|
|
|
|
|
|
|
. "scalar/String/FixedString columns, optionally Nullable, " |
|
1073
|
|
|
|
|
|
|
. "Array, or LowCardinality)\n" |
|
1074
|
|
|
|
|
|
|
if $t =~ /\A(?:Map|Tuple|Variant|JSON|Object|Dynamic|Point|Ring |
|
1075
|
|
|
|
|
|
|
|LineString|MultiLineString|Polygon|MultiPolygon |
|
1076
|
|
|
|
|
|
|
|Nested|SimpleAggregateFunction|AggregateFunction |
|
1077
|
|
|
|
|
|
|
|Array|LowCardinality)\b/x; |
|
1078
|
78
|
|
|
|
|
76
|
return; |
|
1079
|
|
|
|
|
|
|
} |
|
1080
|
|
|
|
|
|
|
|
|
1081
|
|
|
|
|
|
|
# The varint / length-string codecs are XS, registered into the |
|
1082
|
|
|
|
|
|
|
# ClickHouse::Encoder::TCP package (the protocol packers use them); |
|
1083
|
|
|
|
|
|
|
# the single shared object installs them regardless of package, so |
|
1084
|
|
|
|
|
|
|
# they are callable here once the main module's XS has booted. Alias |
|
1085
|
|
|
|
|
|
|
# them in as glob aliases (not wrapper subs) so a call goes straight |
|
1086
|
|
|
|
|
|
|
# to the XSUB with no extra frame and exact context propagation. |
|
1087
|
|
|
|
|
|
|
## no critic (ProhibitCallsToUnexportedSubs) |
|
1088
|
|
|
|
|
|
|
*_rb_pack_varint = \&ClickHouse::Encoder::TCP::pack_varint; |
|
1089
|
|
|
|
|
|
|
*_rb_unpack_varint = \&ClickHouse::Encoder::TCP::unpack_varint; |
|
1090
|
|
|
|
|
|
|
*_rb_pack_string = \&ClickHouse::Encoder::TCP::pack_string; |
|
1091
|
|
|
|
|
|
|
## use critic |
|
1092
|
|
|
|
|
|
|
|
|
1093
|
|
|
|
|
|
|
# Encode one value of $type into RowBinary bytes. $cache memoises the |
|
1094
|
|
|
|
|
|
|
# single-column Native encoder + value-region offset per scalar type. |
|
1095
|
|
|
|
|
|
|
sub _rb_encode_value { |
|
1096
|
55
|
|
|
55
|
|
78
|
my ($type, $val, $cache) = @_; |
|
1097
|
55
|
100
|
|
|
|
90
|
if ($type =~ /\AArray\((.+)\)\z/s) { |
|
1098
|
8
|
|
|
|
|
13
|
my $inner = $1; |
|
1099
|
8
|
50
|
|
|
|
14
|
die "encode_row_binary: Array column needs an arrayref value\n" |
|
1100
|
|
|
|
|
|
|
unless ref $val eq 'ARRAY'; |
|
1101
|
8
|
|
|
|
|
15
|
my $s = _rb_pack_varint(scalar @$val); |
|
1102
|
8
|
|
|
|
|
27
|
$s .= _rb_encode_value($inner, $_, $cache) for @$val; |
|
1103
|
8
|
|
|
|
|
17
|
return $s; |
|
1104
|
|
|
|
|
|
|
} |
|
1105
|
47
|
100
|
|
|
|
61
|
if ($type =~ /\ALowCardinality\((.+)\)\z/s) { |
|
1106
|
2
|
|
|
|
|
4
|
return _rb_encode_value($1, $val, $cache); |
|
1107
|
|
|
|
|
|
|
} |
|
1108
|
45
|
|
|
|
|
72
|
_rb_assert_scalar($type); |
|
1109
|
41
|
|
66
|
|
|
75
|
my $slot = $cache->{$type} ||= do { |
|
1110
|
17
|
|
|
|
|
77
|
my $enc = ClickHouse::Encoder->new(columns => [['c', $type]]); |
|
1111
|
|
|
|
|
|
|
# One-row, one-column Native block prefix: |
|
1112
|
|
|
|
|
|
|
# varint(ncols=1) varint(nrows=1) lenstr("c") lenstr(type) |
|
1113
|
17
|
|
|
|
|
52
|
my $prefix = 2 + length(_rb_pack_string('c')) |
|
1114
|
|
|
|
|
|
|
+ length(_rb_pack_string($type)); |
|
1115
|
17
|
|
|
|
|
47
|
[$enc, $prefix]; |
|
1116
|
|
|
|
|
|
|
}; |
|
1117
|
41
|
|
|
|
|
249
|
return substr($slot->[0]->encode([[ $val ]]), $slot->[1]); |
|
1118
|
|
|
|
|
|
|
} |
|
1119
|
|
|
|
|
|
|
|
|
1120
|
|
|
|
|
|
|
# Encode an arrayref of rows into a RowBinary byte string. Call on an |
|
1121
|
|
|
|
|
|
|
# encoder instance (its column types drive serialisation). The result |
|
1122
|
|
|
|
|
|
|
# is the request body for `insert ... format RowBinary`. |
|
1123
|
|
|
|
|
|
|
sub encode_row_binary { |
|
1124
|
12
|
|
|
12
|
1
|
168961
|
my ($self, $rows) = @_; |
|
1125
|
12
|
100
|
|
|
|
37
|
die "encode_row_binary: rows must be an arrayref\n" |
|
1126
|
|
|
|
|
|
|
unless ref $rows eq 'ARRAY'; |
|
1127
|
11
|
|
|
|
|
26
|
my $cols = $self->columns; |
|
1128
|
11
|
|
|
|
|
13
|
my %cache; |
|
1129
|
11
|
|
|
|
|
11
|
my $out = ''; |
|
1130
|
11
|
|
|
|
|
23
|
for my $ri (0 .. $#$rows) { |
|
1131
|
14
|
|
|
|
|
28
|
my $row = $rows->[$ri]; |
|
1132
|
14
|
50
|
|
|
|
39
|
die "encode_row_binary: row $ri must be an arrayref\n" |
|
1133
|
|
|
|
|
|
|
unless ref $row eq 'ARRAY'; |
|
1134
|
14
|
100
|
|
|
|
31
|
die "encode_row_binary: row $ri has " . scalar(@$row) |
|
1135
|
|
|
|
|
|
|
. " values, expected " . scalar(@$cols) . "\n" |
|
1136
|
|
|
|
|
|
|
unless @$row == @$cols; |
|
1137
|
13
|
|
|
|
|
16
|
for my $ci (0 .. $#$cols) { |
|
1138
|
41
|
|
|
|
|
64
|
$out .= _rb_encode_value($cols->[$ci][1], $row->[$ci], \%cache); |
|
1139
|
|
|
|
|
|
|
} |
|
1140
|
|
|
|
|
|
|
} |
|
1141
|
6
|
|
|
|
|
41
|
return $out; |
|
1142
|
|
|
|
|
|
|
} |
|
1143
|
|
|
|
|
|
|
|
|
1144
|
|
|
|
|
|
|
# Byte length of one scalar/String/FixedString/Nullable value at |
|
1145
|
|
|
|
|
|
|
# $$bufref position $pos (does not advance). Array is handled by the |
|
1146
|
|
|
|
|
|
|
# caller's recursion, never reaching here. |
|
1147
|
|
|
|
|
|
|
sub _rb_value_len { |
|
1148
|
42
|
|
|
42
|
|
47
|
my ($type, $bufref, $pos) = @_; |
|
1149
|
42
|
100
|
|
|
|
52
|
if ($type =~ /\ANullable\((.+)\)\z/s) { |
|
1150
|
5
|
|
|
|
|
9
|
return 1 + _rb_value_len($1, $bufref, $pos + 1); |
|
1151
|
|
|
|
|
|
|
} |
|
1152
|
37
|
50
|
|
|
|
41
|
if ($type =~ /\ALowCardinality\((.+)\)\z/s) { |
|
1153
|
0
|
|
|
|
|
0
|
return _rb_value_len($1, $bufref, $pos); |
|
1154
|
|
|
|
|
|
|
} |
|
1155
|
37
|
100
|
|
|
|
46
|
if ($type eq 'String') { |
|
1156
|
7
|
|
|
|
|
13
|
my ($len, $after) = _rb_unpack_varint($$bufref, $pos); |
|
1157
|
7
|
|
|
|
|
10
|
return ($after - $pos) + $len; |
|
1158
|
|
|
|
|
|
|
} |
|
1159
|
30
|
100
|
|
|
|
55
|
if ($type =~ /\AFixedString\((\d+)\)\z/) { |
|
1160
|
2
|
|
|
|
|
6
|
return $1; |
|
1161
|
|
|
|
|
|
|
} |
|
1162
|
28
|
|
|
|
|
40
|
(my $base = $type) =~ s/\(.*//s; |
|
1163
|
28
|
50
|
|
|
|
35
|
if ($base eq 'Decimal') { |
|
1164
|
|
|
|
|
|
|
# Decimal(P, S): storage width follows the precision P. |
|
1165
|
0
|
|
|
|
|
0
|
my ($p) = $type =~ /\(\s*(\d+)/; |
|
1166
|
0
|
0
|
|
|
|
0
|
return $p <= 9 ? 4 : $p <= 18 ? 8 |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
1167
|
|
|
|
|
|
|
: $p <= 38 ? 16 : 32; |
|
1168
|
|
|
|
|
|
|
} |
|
1169
|
28
|
|
|
|
|
35
|
my $w = $RB_FIXED_WIDTH{$base}; |
|
1170
|
28
|
50
|
|
|
|
41
|
die "decode_row_binary: cannot size unsupported type '$type'\n" |
|
1171
|
|
|
|
|
|
|
unless defined $w; |
|
1172
|
28
|
|
|
|
|
32
|
return $w; |
|
1173
|
|
|
|
|
|
|
} |
|
1174
|
|
|
|
|
|
|
|
|
1175
|
|
|
|
|
|
|
# Decode one value of $type at $$posref, advancing $$posref past it. |
|
1176
|
|
|
|
|
|
|
sub _rb_decode_value { |
|
1177
|
46
|
|
|
46
|
|
59
|
my ($type, $bufref, $posref, $cache) = @_; |
|
1178
|
46
|
100
|
|
|
|
68
|
if ($type =~ /\AArray\((.+)\)\z/s) { |
|
1179
|
7
|
|
|
|
|
10
|
my $inner = $1; |
|
1180
|
7
|
|
|
|
|
13
|
my ($n, $after) = _rb_unpack_varint($$bufref, $$posref); |
|
1181
|
7
|
|
|
|
|
7
|
$$posref = $after; |
|
1182
|
7
|
|
|
|
|
16
|
return [ map { _rb_decode_value($inner, $bufref, $posref, $cache) } |
|
|
9
|
|
|
|
|
17
|
|
|
1183
|
|
|
|
|
|
|
1 .. $n ]; |
|
1184
|
|
|
|
|
|
|
} |
|
1185
|
39
|
100
|
|
|
|
51
|
if ($type =~ /\ALowCardinality\((.+)\)\z/s) { |
|
1186
|
2
|
|
|
|
|
4
|
return _rb_decode_value($1, $bufref, $posref, $cache); |
|
1187
|
|
|
|
|
|
|
} |
|
1188
|
37
|
|
|
|
|
45
|
_rb_assert_scalar($type); |
|
1189
|
37
|
|
|
|
|
47
|
my $vlen = _rb_value_len($type, $bufref, $$posref); |
|
1190
|
37
|
|
|
|
|
50
|
my $vbytes = substr($$bufref, $$posref, $vlen); |
|
1191
|
37
|
50
|
|
|
|
47
|
die "decode_row_binary: truncated value for '$type' at offset $$posref\n" |
|
1192
|
|
|
|
|
|
|
if length($vbytes) != $vlen; |
|
1193
|
37
|
|
|
|
|
32
|
$$posref += $vlen; |
|
1194
|
37
|
|
66
|
|
|
97
|
my $prefix = $cache->{$type} ||= |
|
1195
|
|
|
|
|
|
|
"\x01\x01" . _rb_pack_string('c') . _rb_pack_string($type); |
|
1196
|
37
|
|
|
|
|
229
|
my $blk = ClickHouse::Encoder->decode_block($prefix . $vbytes); |
|
1197
|
37
|
|
|
|
|
142
|
return $blk->{columns}[0]{values}[0]; |
|
1198
|
|
|
|
|
|
|
} |
|
1199
|
|
|
|
|
|
|
|
|
1200
|
|
|
|
|
|
|
# Decode a RowBinary byte string into an arrayref of row arrayrefs. |
|
1201
|
|
|
|
|
|
|
# Call on an encoder instance whose column types match the producer. |
|
1202
|
|
|
|
|
|
|
sub decode_row_binary { |
|
1203
|
7
|
|
|
7
|
1
|
1517
|
my ($self, $bytes) = @_; |
|
1204
|
7
|
100
|
|
|
|
19
|
die "decode_row_binary: must be called on an encoder instance\n" |
|
1205
|
|
|
|
|
|
|
unless ref $self; |
|
1206
|
6
|
50
|
|
|
|
11
|
die "decode_row_binary: input must be defined\n" unless defined $bytes; |
|
1207
|
6
|
|
|
|
|
9
|
my $cols = $self->columns; |
|
1208
|
6
|
|
|
|
|
6
|
my $pos = 0; |
|
1209
|
6
|
|
|
|
|
6
|
my $len = length $bytes; |
|
1210
|
|
|
|
|
|
|
# With zero columns the inner per-column loop is a no-op, so a |
|
1211
|
|
|
|
|
|
|
# non-empty buffer would never make $pos advance - guard explicitly |
|
1212
|
|
|
|
|
|
|
# rather than spin forever. |
|
1213
|
6
|
100
|
100
|
|
|
22
|
die "decode_row_binary: encoder has no columns but $len bytes given\n" |
|
1214
|
|
|
|
|
|
|
if !@$cols && $len; |
|
1215
|
5
|
|
|
|
|
6
|
my %cache; |
|
1216
|
|
|
|
|
|
|
my @rows; |
|
1217
|
5
|
|
|
|
|
10
|
while ($pos < $len) { |
|
1218
|
7
|
|
|
|
|
7
|
my @row; |
|
1219
|
7
|
|
|
|
|
9
|
for my $col (@$cols) { |
|
1220
|
35
|
|
|
|
|
48
|
push @row, _rb_decode_value($col->[1], \$bytes, \$pos, \%cache); |
|
1221
|
|
|
|
|
|
|
} |
|
1222
|
7
|
|
|
|
|
14
|
push @rows, \@row; |
|
1223
|
|
|
|
|
|
|
} |
|
1224
|
5
|
|
|
|
|
23
|
return \@rows; |
|
1225
|
|
|
|
|
|
|
} |
|
1226
|
|
|
|
|
|
|
|
|
1227
|
|
|
|
|
|
|
# Post-process a decoded block (or any one column's values arrayref): |
|
1228
|
|
|
|
|
|
|
# rewrite Date / Date32 / DateTime / DateTime64 integer epochs into |
|
1229
|
|
|
|
|
|
|
# ISO 8601 strings or Time::Moment instances. Modifies the block in |
|
1230
|
|
|
|
|
|
|
# place AND returns it so the call can be chained. as => 'iso' (the |
|
1231
|
|
|
|
|
|
|
# default) emits UTC strings with a 'Z' suffix; as => 'datetime' |
|
1232
|
|
|
|
|
|
|
# returns Time::Moment objects (requires Time::Moment installed). |
|
1233
|
|
|
|
|
|
|
# DateTime64 precision is read from the column's type string so each |
|
1234
|
|
|
|
|
|
|
# tick converts to the correct number of fractional digits. |
|
1235
|
|
|
|
|
|
|
sub coerce_datetimes { |
|
1236
|
9
|
|
|
9
|
1
|
303418
|
my ($class_or_self, $block, %opts) = @_; |
|
1237
|
9
|
|
100
|
|
|
74
|
my $as = $opts{as} // 'iso'; |
|
1238
|
9
|
100
|
66
|
|
|
47
|
die "coerce_datetimes: 'as' must be 'iso' or 'datetime' (got '$as')\n" |
|
1239
|
|
|
|
|
|
|
unless $as eq 'iso' || $as eq 'datetime'; |
|
1240
|
|
|
|
|
|
|
|
|
1241
|
7
|
50
|
|
|
|
17
|
if ($as eq 'datetime') { |
|
1242
|
0
|
|
|
|
|
0
|
require Time::Moment; |
|
1243
|
|
|
|
|
|
|
} else { |
|
1244
|
7
|
|
|
|
|
1103
|
require POSIX; |
|
1245
|
|
|
|
|
|
|
} |
|
1246
|
|
|
|
|
|
|
|
|
1247
|
7
|
|
|
|
|
12060
|
for my $col (@{ $block->{columns} }) { |
|
|
7
|
|
|
|
|
23
|
|
|
1248
|
19
|
100
|
|
|
|
46
|
next if $col->{skipped}; |
|
1249
|
18
|
|
|
|
|
28
|
my $type = $col->{type}; |
|
1250
|
18
|
|
|
|
|
41
|
my $vals = $col->{values}; |
|
1251
|
|
|
|
|
|
|
|
|
1252
|
|
|
|
|
|
|
# Strip Nullable() wrapping so the inner type matches below. |
|
1253
|
|
|
|
|
|
|
# Nullable values come through as undef in the values array; |
|
1254
|
|
|
|
|
|
|
# the loops already skip undef. |
|
1255
|
18
|
100
|
|
|
|
50
|
$type = $1 if $type =~ /^Nullable\((.*)\)\z/; |
|
1256
|
|
|
|
|
|
|
|
|
1257
|
18
|
100
|
100
|
|
|
114
|
if ($type eq 'Date' || $type eq 'Date32') { |
|
|
|
100
|
100
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
1258
|
4
|
|
|
|
|
6
|
for my $v (@$vals) { |
|
1259
|
6
|
50
|
|
|
|
13
|
next unless defined $v; |
|
1260
|
6
|
|
|
|
|
19
|
$v = _epoch_to_string($v * 86400, 0, $as, 'Y-m-d'); |
|
1261
|
|
|
|
|
|
|
} |
|
1262
|
|
|
|
|
|
|
} |
|
1263
|
|
|
|
|
|
|
elsif ($type eq 'DateTime' || $type =~ /^DateTime\(/) { |
|
1264
|
6
|
|
|
|
|
15
|
for my $v (@$vals) { |
|
1265
|
10
|
100
|
|
|
|
28
|
next unless defined $v; |
|
1266
|
8
|
|
|
|
|
26
|
$v = _epoch_to_string($v, 0, $as, 'iso'); |
|
1267
|
|
|
|
|
|
|
} |
|
1268
|
|
|
|
|
|
|
} |
|
1269
|
|
|
|
|
|
|
elsif ($type =~ /^DateTime64\((\d+)/) { |
|
1270
|
6
|
|
|
|
|
16
|
my $precision = $1; |
|
1271
|
6
|
|
|
|
|
13
|
my $scale = 10 ** $precision; |
|
1272
|
6
|
|
|
|
|
10
|
for my $v (@$vals) { |
|
1273
|
7
|
50
|
|
|
|
14
|
next unless defined $v; |
|
1274
|
|
|
|
|
|
|
# Decoded DateTime64 is an integer count of (10^precision) |
|
1275
|
|
|
|
|
|
|
# ticks since the Unix epoch (a signed int64). |
|
1276
|
68
|
|
|
68
|
|
31386
|
use integer; |
|
|
68
|
|
|
|
|
910
|
|
|
|
68
|
|
|
|
|
277
|
|
|
1277
|
7
|
|
|
|
|
13
|
my $secs = int($v / $scale); |
|
1278
|
68
|
|
|
68
|
|
2729
|
no integer; |
|
|
68
|
|
|
|
|
125
|
|
|
|
68
|
|
|
|
|
193
|
|
|
1279
|
7
|
|
|
|
|
12
|
my $frac = $v - $secs * $scale; |
|
1280
|
|
|
|
|
|
|
# Normalize negative fractional tail to a positive frac |
|
1281
|
|
|
|
|
|
|
# below the integer epoch. |
|
1282
|
7
|
100
|
|
|
|
14
|
if ($frac < 0) { $frac += $scale; $secs -= 1 } |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
1
|
|
|
1283
|
7
|
|
|
|
|
17
|
$v = _epoch_to_string($secs, $frac, $as, |
|
1284
|
|
|
|
|
|
|
'iso', $precision); |
|
1285
|
|
|
|
|
|
|
} |
|
1286
|
|
|
|
|
|
|
} |
|
1287
|
|
|
|
|
|
|
# other columns (non-time) untouched |
|
1288
|
|
|
|
|
|
|
} |
|
1289
|
7
|
|
|
|
|
25
|
return $block; |
|
1290
|
|
|
|
|
|
|
} |
|
1291
|
|
|
|
|
|
|
|
|
1292
|
|
|
|
|
|
|
# Format a (whole_seconds, fractional_ticks_at_$precision) pair under |
|
1293
|
|
|
|
|
|
|
# either 'iso' or 'datetime' modes. Internal helper; the strftime is |
|
1294
|
|
|
|
|
|
|
# UTC-only on purpose (CH itself stores UTC ticks; per-column timezone |
|
1295
|
|
|
|
|
|
|
# is a display concern handled separately by the user if needed). |
|
1296
|
|
|
|
|
|
|
sub _epoch_to_string { |
|
1297
|
21
|
|
|
21
|
|
38
|
my ($secs, $frac_ticks, $as, $shape, $precision) = @_; |
|
1298
|
21
|
50
|
|
|
|
35
|
if ($as eq 'datetime') { |
|
1299
|
0
|
0
|
0
|
|
|
0
|
if ($shape ne 'Y-m-d' && $precision) { |
|
1300
|
|
|
|
|
|
|
# Time::Moment uses nanosecond precision; widen the ticks |
|
1301
|
|
|
|
|
|
|
# accordingly. Precision > 9 isn't supported here. |
|
1302
|
0
|
|
|
|
|
0
|
my $ns = $frac_ticks * (10 ** (9 - $precision)); |
|
1303
|
0
|
|
|
|
|
0
|
return Time::Moment->from_epoch($secs, $ns); |
|
1304
|
|
|
|
|
|
|
} |
|
1305
|
0
|
|
|
|
|
0
|
return Time::Moment->from_epoch($secs); |
|
1306
|
|
|
|
|
|
|
} |
|
1307
|
|
|
|
|
|
|
# 'iso' string form |
|
1308
|
21
|
|
|
|
|
76
|
my @t = gmtime $secs; |
|
1309
|
21
|
100
|
|
|
|
35
|
if ($shape eq 'Y-m-d') { |
|
1310
|
6
|
|
|
|
|
85
|
return POSIX::strftime('%Y-%m-%d', @t); |
|
1311
|
|
|
|
|
|
|
} |
|
1312
|
15
|
|
|
|
|
340
|
my $base = POSIX::strftime('%Y-%m-%dT%H:%M:%S', @t); |
|
1313
|
15
|
100
|
|
|
|
42
|
if ($precision) { |
|
1314
|
6
|
|
|
|
|
21
|
$base .= sprintf('.%0*d', $precision, $frac_ticks); |
|
1315
|
|
|
|
|
|
|
} |
|
1316
|
15
|
|
|
|
|
62
|
return $base . 'Z'; |
|
1317
|
|
|
|
|
|
|
} |
|
1318
|
|
|
|
|
|
|
|
|
1319
|
|
|
|
|
|
|
# Static per-type byte budgets used by estimate_size. Returns the |
|
1320
|
|
|
|
|
|
|
# bytes-per-row for fixed-width types; for variable types the second |
|
1321
|
|
|
|
|
|
|
# return value is an "average string size" heuristic the caller can |
|
1322
|
|
|
|
|
|
|
# use, and undef means "walk the actual values for accuracy". The |
|
1323
|
|
|
|
|
|
|
# table is intentionally coarse - the goal is order-of-magnitude |
|
1324
|
|
|
|
|
|
|
# sizing for batch-split decisions, not byte-exact accounting. |
|
1325
|
|
|
|
|
|
|
my %FIXED_TYPE_BYTES = ( |
|
1326
|
|
|
|
|
|
|
Int8 => 1, UInt8 => 1, Bool => 1, Boolean => 1, |
|
1327
|
|
|
|
|
|
|
Int16 => 2, UInt16 => 2, Date => 2, |
|
1328
|
|
|
|
|
|
|
Int32 => 4, UInt32 => 4, Float32 => 4, BFloat16 => 2, |
|
1329
|
|
|
|
|
|
|
Date32 => 4, DateTime => 4, IPv4 => 4, Decimal32 => 4, |
|
1330
|
|
|
|
|
|
|
Enum8 => 1, Enum16 => 2, |
|
1331
|
|
|
|
|
|
|
Int64 => 8, UInt64 => 8, Float64 => 8, DateTime64 => 8, |
|
1332
|
|
|
|
|
|
|
Decimal64 => 8, |
|
1333
|
|
|
|
|
|
|
UUID => 16, IPv6 => 16, Decimal128 => 16, |
|
1334
|
|
|
|
|
|
|
Decimal256 => 32, |
|
1335
|
|
|
|
|
|
|
# Geo aliases: |
|
1336
|
|
|
|
|
|
|
Point => 16, Ring => 64, LineString => 64, |
|
1337
|
|
|
|
|
|
|
MultiLineString => 256, Polygon => 256, MultiPolygon => 256, |
|
1338
|
|
|
|
|
|
|
); |
|
1339
|
|
|
|
|
|
|
|
|
1340
|
|
|
|
|
|
|
sub _type_byte_estimate { |
|
1341
|
25
|
|
|
25
|
|
38
|
my ($type, $avg_str) = @_; |
|
1342
|
25
|
|
50
|
|
|
45
|
$avg_str //= 16; |
|
1343
|
|
|
|
|
|
|
# Strip outer parens/args for parametric prefix match. |
|
1344
|
25
|
|
|
|
|
52
|
my $base = $type; |
|
1345
|
25
|
|
|
|
|
58
|
$base =~ s/\(.*\z//s; |
|
1346
|
25
|
100
|
|
|
|
115
|
return $FIXED_TYPE_BYTES{$base} if exists $FIXED_TYPE_BYTES{$base}; |
|
1347
|
|
|
|
|
|
|
|
|
1348
|
11
|
100
|
|
|
|
27
|
if ($base eq 'String') { |
|
1349
|
6
|
|
|
|
|
27
|
return $avg_str + 1; # +1 for varint length prefix (small lens) |
|
1350
|
|
|
|
|
|
|
} |
|
1351
|
5
|
100
|
|
|
|
13
|
if ($base eq 'FixedString') { |
|
1352
|
1
|
|
|
|
|
7
|
my ($n) = $type =~ /^FixedString\((\d+)\)/; |
|
1353
|
1
|
|
33
|
|
|
9
|
return $n // $avg_str; |
|
1354
|
|
|
|
|
|
|
} |
|
1355
|
4
|
50
|
|
|
|
11
|
if ($base eq 'Nullable') { |
|
1356
|
0
|
|
|
|
|
0
|
my ($inner) = $type =~ /^Nullable\((.+)\)$/; |
|
1357
|
0
|
|
|
|
|
0
|
return 1 + _type_byte_estimate($inner, $avg_str); |
|
1358
|
|
|
|
|
|
|
} |
|
1359
|
4
|
100
|
66
|
|
|
19
|
if ($base eq 'Array' || $base eq 'Map') { |
|
1360
|
|
|
|
|
|
|
# 8-byte offset per row + N_avg(=4) inner elements. |
|
1361
|
1
|
|
|
|
|
8
|
my ($inner) = $type =~ /^\w+\((.+)\)$/; |
|
1362
|
1
|
|
|
|
|
7
|
return 8 + 4 * _type_byte_estimate($inner, $avg_str); |
|
1363
|
|
|
|
|
|
|
} |
|
1364
|
3
|
50
|
|
|
|
8
|
if ($base eq 'Tuple') { |
|
1365
|
0
|
|
|
|
|
0
|
my ($body) = $type =~ /^Tuple\((.+)\)$/; |
|
1366
|
0
|
0
|
|
|
|
0
|
return 0 unless defined $body; |
|
1367
|
0
|
|
|
|
|
0
|
my @parts = _split_paren_list($body); |
|
1368
|
0
|
|
|
|
|
0
|
my $sum = 0; |
|
1369
|
0
|
|
|
|
|
0
|
for my $p (@parts) { |
|
1370
|
|
|
|
|
|
|
# Tuple elements may be named: "name Type" |
|
1371
|
0
|
|
|
|
|
0
|
$p =~ s/^[A-Za-z_]\w*\s+//; |
|
1372
|
0
|
|
|
|
|
0
|
$sum += _type_byte_estimate($p, $avg_str); |
|
1373
|
|
|
|
|
|
|
} |
|
1374
|
0
|
|
|
|
|
0
|
return $sum; |
|
1375
|
|
|
|
|
|
|
} |
|
1376
|
|
|
|
|
|
|
# LowCardinality: dict + per-row 1-byte index (typical low cardinality). |
|
1377
|
|
|
|
|
|
|
# Variant: 1 disc byte + (heuristically) inner avg. |
|
1378
|
3
|
100
|
66
|
|
|
16
|
return 1 + $avg_str if $base eq 'LowCardinality' || $base eq 'Variant'; |
|
1379
|
|
|
|
|
|
|
# JSON/Object/Dynamic: shape-dependent. Heuristic: roughly two |
|
1380
|
|
|
|
|
|
|
# avg_string_size payloads per row (one for path machinery + one |
|
1381
|
|
|
|
|
|
|
# for value bytes), so the caller's avg_string_size override |
|
1382
|
|
|
|
|
|
|
# actually moves the estimate. |
|
1383
|
2
|
0
|
33
|
|
|
10
|
return 2 * $avg_str if $base eq 'JSON' || $base eq 'Object' || $base eq 'Dynamic'; |
|
|
|
|
33
|
|
|
|
|
|
1384
|
|
|
|
|
|
|
# SimpleAggregateFunction(func, T) -> inner T. |
|
1385
|
0
|
0
|
|
|
|
0
|
if ($base eq 'SimpleAggregateFunction') { |
|
1386
|
0
|
|
|
|
|
0
|
my ($inner) = $type =~ /^SimpleAggregateFunction\([^,]+,\s*(.+)\)$/; |
|
1387
|
0
|
0
|
|
|
|
0
|
return _type_byte_estimate($inner, $avg_str) if defined $inner; |
|
1388
|
|
|
|
|
|
|
} |
|
1389
|
0
|
|
|
|
|
0
|
return $avg_str; # unknown -> conservative |
|
1390
|
|
|
|
|
|
|
} |
|
1391
|
|
|
|
|
|
|
|
|
1392
|
|
|
|
|
|
|
# Coarse byte-size estimate for an encoded block, parameterized on |
|
1393
|
|
|
|
|
|
|
# row count (an integer) or arrayref (counted). Uses per-type byte |
|
1394
|
|
|
|
|
|
|
# budgets; variable-length types use a 16-byte average per value |
|
1395
|
|
|
|
|
|
|
# (override via $avg_str). For batch-size decisions ("is this 1 MiB |
|
1396
|
|
|
|
|
|
|
# or 100 MiB before I compress?"). NOT byte-exact: a String row with |
|
1397
|
|
|
|
|
|
|
# a 10 KiB blob will be undercounted. Run encode() for the real size. |
|
1398
|
|
|
|
|
|
|
sub estimate_size { |
|
1399
|
19
|
|
|
19
|
1
|
152539
|
my ($self, $rows_or_n, %opts) = @_; |
|
1400
|
19
|
100
|
|
|
|
52
|
my $n = ref $rows_or_n eq 'ARRAY' ? scalar @$rows_or_n : $rows_or_n; |
|
1401
|
19
|
|
100
|
|
|
65
|
my $avg_str = $opts{avg_string_size} // 16; |
|
1402
|
19
|
|
|
|
|
43
|
my $cols = $self->columns; |
|
1403
|
19
|
|
|
|
|
25
|
my $total = 4; # block header (ncols + nrows varints, ~tiny) |
|
1404
|
19
|
|
|
|
|
35
|
for my $c (@$cols) { |
|
1405
|
24
|
|
|
|
|
46
|
my ($name, $type) = @$c; |
|
1406
|
24
|
|
|
|
|
38
|
$total += length($name) + length($type) + 2; # lenstr headers |
|
1407
|
24
|
|
|
|
|
43
|
$total += $n * _type_byte_estimate($type, $avg_str); |
|
1408
|
|
|
|
|
|
|
} |
|
1409
|
19
|
|
|
|
|
65
|
return $total; |
|
1410
|
|
|
|
|
|
|
} |
|
1411
|
|
|
|
|
|
|
|
|
1412
|
|
|
|
|
|
|
# Return a configured encoder for the column shape produced by an |
|
1413
|
|
|
|
|
|
|
# arbitrary select. Runs `describe ($sql)` via the same `via=>...` |
|
1414
|
|
|
|
|
|
|
# transport as for_table. Useful when the schema isn't a real table. |
|
1415
|
|
|
|
|
|
|
sub for_query { |
|
1416
|
0
|
|
|
0
|
1
|
0
|
my ($class, $sql, %opts) = @_; |
|
1417
|
|
|
|
|
|
|
# ClickHouse's describe accepts a subquery, but the SQL must not |
|
1418
|
|
|
|
|
|
|
# contain unmatched parentheses; let the server reject malformed |
|
1419
|
|
|
|
|
|
|
# queries rather than re-implementing SQL validation here. |
|
1420
|
0
|
|
|
|
|
0
|
my $describe = "describe ($sql)"; |
|
1421
|
0
|
|
|
|
|
0
|
return $class->_for_describe($describe, %opts); |
|
1422
|
|
|
|
|
|
|
} |
|
1423
|
|
|
|
|
|
|
|
|
1424
|
|
|
|
|
|
|
# Issue an insert ... format native over HTTP using HTTP::Tiny. Returns |
|
1425
|
|
|
|
|
|
|
# the response hashref from HTTP::Tiny (->{success}, ->{status}, |
|
1426
|
|
|
|
|
|
|
# ->{content}). Compresses with zstd/gzip if `compress` is set; takes |
|
1427
|
|
|
|
|
|
|
# whatever encoder produces (so `for_table` + rows is the typical |
|
1428
|
|
|
|
|
|
|
# combination). Does not retry; the caller does HTTP-level error policy. |
|
1429
|
|
|
|
|
|
|
# Set up URL + headers for an insert ... format native HTTP request. |
|
1430
|
|
|
|
|
|
|
# Shared by insert_http and BulkInserter::new. Validates the table name |
|
1431
|
|
|
|
|
|
|
# and stamps the Content-Type / Content-Encoding headers as needed. |
|
1432
|
|
|
|
|
|
|
sub _build_insert_endpoint { |
|
1433
|
16
|
|
|
16
|
|
505
|
my ($table, $compress, %args) = @_; |
|
1434
|
16
|
|
|
|
|
48
|
_validate_table_name($table); |
|
1435
|
14
|
50
|
66
|
|
|
103
|
die "unknown compress='$compress' " |
|
|
|
|
66
|
|
|
|
|
|
1436
|
|
|
|
|
|
|
. "(expected 'raw', 'zstd', or 'gzip')\n" |
|
1437
|
|
|
|
|
|
|
unless $compress eq 'raw' || $compress eq 'zstd' |
|
1438
|
|
|
|
|
|
|
|| $compress eq 'gzip'; |
|
1439
|
12
|
|
|
|
|
51
|
my ($url, $hdr) = _http_url_headers( |
|
1440
|
|
|
|
|
|
|
"insert into $table format native", %args); |
|
1441
|
12
|
|
|
|
|
30
|
$hdr->{'Content-Type'} = 'application/octet-stream'; |
|
1442
|
12
|
50
|
33
|
|
|
45
|
$hdr->{'Content-Encoding'} = $compress |
|
1443
|
|
|
|
|
|
|
if $compress eq 'zstd' || $compress eq 'gzip'; |
|
1444
|
12
|
|
|
|
|
34
|
return ($url, $hdr); |
|
1445
|
|
|
|
|
|
|
} |
|
1446
|
|
|
|
|
|
|
|
|
1447
|
|
|
|
|
|
|
# Apply zstd/gzip compression to $body in place (or pass through for 'raw'). |
|
1448
|
|
|
|
|
|
|
# $compress is validated upstream by _build_insert_endpoint; we trust it |
|
1449
|
|
|
|
|
|
|
# here. $origin is the class used to resolve compressed_writer (so the |
|
1450
|
|
|
|
|
|
|
# helper works for class-method and instance callers alike). |
|
1451
|
|
|
|
|
|
|
sub _apply_compression { |
|
1452
|
13
|
|
|
13
|
|
26
|
my ($origin, $compress, $body) = @_; |
|
1453
|
13
|
50
|
|
|
|
39
|
return $body if $compress eq 'raw'; |
|
1454
|
0
|
|
|
|
|
0
|
my $compressed; |
|
1455
|
|
|
|
|
|
|
my $wrap = $origin->compressed_writer( |
|
1456
|
0
|
|
|
0
|
|
0
|
$compress, sub { $compressed = $_[0] }); |
|
|
0
|
|
|
|
|
0
|
|
|
1457
|
0
|
|
|
|
|
0
|
$wrap->($body); |
|
1458
|
0
|
|
|
|
|
0
|
return $compressed; |
|
1459
|
|
|
|
|
|
|
} |
|
1460
|
|
|
|
|
|
|
|
|
1461
|
|
|
|
|
|
|
sub insert_http { |
|
1462
|
3
|
|
|
3
|
1
|
135668
|
my ($class_or_self, %args) = @_; |
|
1463
|
3
|
|
33
|
|
|
10
|
my $enc = $args{encoder} // do { |
|
1464
|
3
|
50
|
|
|
|
7
|
my $cols = $args{columns} or die "insert_http needs columns or encoder"; |
|
1465
|
3
|
|
|
|
|
50
|
$class_or_self->new(columns => $cols); |
|
1466
|
|
|
|
|
|
|
}; |
|
1467
|
3
|
100
|
|
|
|
16
|
my $rows = $args{rows} or die "insert_http needs rows arrayref"; |
|
1468
|
2
|
50
|
|
|
|
4
|
my $table = $args{table} or die "insert_http needs table"; |
|
1469
|
2
|
|
50
|
|
|
6
|
my $timeout = $args{timeout} // 60; |
|
1470
|
2
|
|
100
|
|
|
6
|
my $compress = $args{compress} // 'raw'; |
|
1471
|
2
|
|
33
|
|
|
13
|
my $origin = ref $class_or_self || $class_or_self; |
|
1472
|
|
|
|
|
|
|
|
|
1473
|
2
|
|
|
|
|
6
|
my ($url, $hdr) = _build_insert_endpoint($table, $compress, %args); |
|
1474
|
0
|
|
|
|
|
0
|
my $body = _apply_compression($origin, $compress, $enc->encode($rows)); |
|
1475
|
|
|
|
|
|
|
|
|
1476
|
0
|
|
|
|
|
0
|
my $resp = _http_tiny(%args, timeout => $timeout) |
|
1477
|
|
|
|
|
|
|
->post($url, { headers => $hdr, content => $body }); |
|
1478
|
0
|
|
|
|
|
0
|
return _decorate_response($resp); |
|
1479
|
|
|
|
|
|
|
} |
|
1480
|
|
|
|
|
|
|
|
|
1481
|
|
|
|
|
|
|
# Stream a select response: POST the SQL with default_format=Native, |
|
1482
|
|
|
|
|
|
|
# feed the response chunks into a sliding buffer, decode complete blocks |
|
1483
|
|
|
|
|
|
|
# as they arrive, and pass each one to $opts{on_block}. Memory stays |
|
1484
|
|
|
|
|
|
|
# bounded by chunk_size + one block, so this is the right entry point |
|
1485
|
|
|
|
|
|
|
# for selects that won't fit in memory. The user's $sql must NOT include |
|
1486
|
|
|
|
|
|
|
# a format clause - this helper always requests format Native. |
|
1487
|
|
|
|
|
|
|
sub select_blocks { |
|
1488
|
5
|
|
|
5
|
1
|
176248
|
my ($class, $sql, %opts) = @_; |
|
1489
|
|
|
|
|
|
|
my $cb = $opts{on_block} |
|
1490
|
5
|
100
|
|
|
|
19
|
or die "select_blocks: 'on_block' coderef required\n"; |
|
1491
|
4
|
100
|
|
|
|
39
|
die "select_blocks: \$sql should not include a format clause " |
|
1492
|
|
|
|
|
|
|
. "(select_blocks always requests format Native)\n" |
|
1493
|
|
|
|
|
|
|
if $sql =~ /\bformat\s+\w+\s*\z/i; |
|
1494
|
1
|
|
|
|
|
3
|
my $keep = $opts{keep}; |
|
1495
|
1
|
|
|
|
|
3
|
my $decompress = $opts{decompress}; |
|
1496
|
|
|
|
|
|
|
|
|
1497
|
|
|
|
|
|
|
# Build URL+headers via the same helper insert_http uses, but with an |
|
1498
|
|
|
|
|
|
|
# empty SQL placeholder; we POST the SQL as the request body. Adding |
|
1499
|
|
|
|
|
|
|
# default_format=Native ensures the response is Native bytes even if |
|
1500
|
|
|
|
|
|
|
# the user's SQL doesn't terminate with format. |
|
1501
|
1
|
|
|
|
|
3
|
my %h_opts = %opts; |
|
1502
|
|
|
|
|
|
|
# Drop keys this method consumes; also drop dedup_token, which is |
|
1503
|
|
|
|
|
|
|
# meaningful only on insert (would be silently dead weight on the |
|
1504
|
|
|
|
|
|
|
# select URL otherwise and could mask a typo by the caller). |
|
1505
|
1
|
|
|
|
|
5
|
delete @h_opts{qw(on_block keep timeout decompress dedup_token)}; |
|
1506
|
1
|
|
|
|
|
4
|
my ($url, $hdr) = _http_url_headers('', %h_opts); |
|
1507
|
1
|
|
|
|
|
2
|
$url .= '&default_format=Native'; |
|
1508
|
|
|
|
|
|
|
# When decompress=1 is requested the server wraps each Native block |
|
1509
|
|
|
|
|
|
|
# in its compressed-block framing (X-ClickHouse-Compressed header). |
|
1510
|
|
|
|
|
|
|
# Add ?compress=1 to the URL so CH knows to compress the response. |
|
1511
|
1
|
50
|
|
|
|
2
|
$url .= '&compress=1' if $decompress; |
|
1512
|
|
|
|
|
|
|
|
|
1513
|
1
|
|
|
|
|
1
|
my $buf = ''; |
|
1514
|
|
|
|
|
|
|
# Block walker: when decompress is set, walk through compressed-block- |
|
1515
|
|
|
|
|
|
|
# framing entries and feed the decompressed bytes into a second |
|
1516
|
|
|
|
|
|
|
# accumulator that decode_block reads. Otherwise feed buf directly. |
|
1517
|
1
|
|
|
|
|
1
|
my $inner_buf = ''; |
|
1518
|
|
|
|
|
|
|
my $drain = sub { |
|
1519
|
|
|
|
|
|
|
# Phase 1: pull compressed-block frames out of $buf into $inner_buf |
|
1520
|
1
|
50
|
|
1
|
|
2
|
if ($decompress) { |
|
1521
|
0
|
|
|
|
|
0
|
while (length($buf) >= 25) { # 16 hash + 9 header minimum |
|
1522
|
0
|
|
|
|
|
0
|
my $csize = unpack 'V', substr($buf, 17, 4); |
|
1523
|
0
|
0
|
|
|
|
0
|
last if length($buf) < 16 + $csize; |
|
1524
|
0
|
|
|
|
|
0
|
my ($plain, $consumed) = |
|
1525
|
|
|
|
|
|
|
$class->decompress_native_block($buf); |
|
1526
|
0
|
|
|
|
|
0
|
$inner_buf .= $plain; |
|
1527
|
0
|
|
|
|
|
0
|
substr($buf, 0, $consumed, ''); |
|
1528
|
|
|
|
|
|
|
} |
|
1529
|
|
|
|
|
|
|
} else { |
|
1530
|
1
|
|
|
|
|
2
|
$inner_buf = $buf; |
|
1531
|
|
|
|
|
|
|
} |
|
1532
|
|
|
|
|
|
|
# Phase 2: decode whole Native blocks out of $inner_buf |
|
1533
|
1
|
|
|
|
|
3
|
while (length($inner_buf) > 0) { |
|
1534
|
0
|
|
|
|
|
0
|
my $block = eval { $class->decode_block($inner_buf, 0, $keep) }; |
|
|
0
|
|
|
|
|
0
|
|
|
1535
|
0
|
0
|
|
|
|
0
|
if ($@) { |
|
1536
|
0
|
0
|
|
|
|
0
|
last if $@ =~ /buffer truncated/i; |
|
1537
|
0
|
|
|
|
|
0
|
die $@; |
|
1538
|
|
|
|
|
|
|
} |
|
1539
|
0
|
|
|
|
|
0
|
$cb->($block); |
|
1540
|
0
|
|
|
|
|
0
|
substr($inner_buf, 0, $block->{consumed}, ''); |
|
1541
|
|
|
|
|
|
|
} |
|
1542
|
|
|
|
|
|
|
# When not decompressing, inner_buf IS buf; carry the residual |
|
1543
|
|
|
|
|
|
|
# back so the next data_callback append sees the unconsumed tail. |
|
1544
|
1
|
50
|
|
|
|
2
|
if (!$decompress) { |
|
1545
|
1
|
|
|
|
|
2
|
$buf = $inner_buf; |
|
1546
|
|
|
|
|
|
|
} |
|
1547
|
1
|
|
|
|
|
4
|
}; |
|
1548
|
|
|
|
|
|
|
|
|
1549
|
|
|
|
|
|
|
my $resp = _http_tiny(%opts, timeout => $opts{timeout} // 60)->post( |
|
1550
|
|
|
|
|
|
|
$url, |
|
1551
|
|
|
|
|
|
|
{ content => $sql, |
|
1552
|
|
|
|
|
|
|
headers => { %$hdr, 'Content-Type' => 'text/plain' }, |
|
1553
|
0
|
|
|
0
|
|
0
|
data_callback => sub { $buf .= $_[0]; $drain->() }, |
|
|
0
|
|
|
|
|
0
|
|
|
1554
|
1
|
|
50
|
|
|
7
|
}); |
|
1555
|
|
|
|
|
|
|
|
|
1556
|
|
|
|
|
|
|
die "select_blocks: HTTP $resp->{status}: $resp->{content}\n" |
|
1557
|
1
|
50
|
|
|
|
111
|
unless $resp->{success}; |
|
1558
|
|
|
|
|
|
|
|
|
1559
|
1
|
|
|
|
|
2
|
$drain->(); |
|
1560
|
1
|
50
|
33
|
|
|
2
|
die "select_blocks: " . length($buf) . " trailing bytes " |
|
1561
|
|
|
|
|
|
|
. "after last complete compressed block\n" |
|
1562
|
|
|
|
|
|
|
if $decompress && length $buf; |
|
1563
|
1
|
50
|
|
|
|
3
|
die "select_blocks: " . length($inner_buf) . " trailing bytes " |
|
1564
|
|
|
|
|
|
|
. "after last complete block\n" |
|
1565
|
|
|
|
|
|
|
if length $inner_buf; |
|
1566
|
1
|
|
|
|
|
10
|
return; |
|
1567
|
|
|
|
|
|
|
} |
|
1568
|
|
|
|
|
|
|
|
|
1569
|
|
|
|
|
|
|
# Returns a bulk-inserter object: ->push($row), ->push_many(\@rows), |
|
1570
|
|
|
|
|
|
|
# ->flush (idempotent), ->finish. Holds a single HTTP::Tiny instance |
|
1571
|
|
|
|
|
|
|
# across batches (so keepalive applies) and auto-flushes when the |
|
1572
|
|
|
|
|
|
|
# accumulated row count crosses batch_size. Transient HTTP failures |
|
1573
|
|
|
|
|
|
|
# (5xx, network errors) are retried up to retries times with linear |
|
1574
|
|
|
|
|
|
|
# backoff; 4xx errors die immediately. |
|
1575
|
|
|
|
|
|
|
sub bulk_inserter { |
|
1576
|
15
|
|
|
15
|
1
|
561051
|
my ($class_or_self, %args) = @_; |
|
1577
|
15
|
|
|
|
|
81
|
return ClickHouse::Encoder::BulkInserter->new(%args, |
|
1578
|
|
|
|
|
|
|
_origin => $class_or_self); |
|
1579
|
|
|
|
|
|
|
} |
|
1580
|
|
|
|
|
|
|
|
|
1581
|
|
|
|
|
|
|
package ClickHouse::Encoder::BulkInserter; ## no critic (ProhibitMultiplePackages) |
|
1582
|
|
|
|
|
|
|
|
|
1583
|
|
|
|
|
|
|
sub new { |
|
1584
|
15
|
|
|
15
|
|
47
|
my ($class, %args) = @_; |
|
1585
|
15
|
|
|
|
|
30
|
my $origin_raw = delete $args{_origin}; |
|
1586
|
15
|
|
50
|
|
|
85
|
my $origin = (ref $origin_raw || $origin_raw) || 'ClickHouse::Encoder'; |
|
1587
|
15
|
|
33
|
|
|
42
|
my $enc = $args{encoder} // do { |
|
1588
|
15
|
100
|
|
|
|
64
|
my $cols = $args{columns} or die "bulk_inserter needs columns or encoder"; |
|
1589
|
14
|
|
|
|
|
236
|
$origin->new(columns => $cols); |
|
1590
|
|
|
|
|
|
|
}; |
|
1591
|
14
|
100
|
|
|
|
51
|
my $table = $args{table} or die "bulk_inserter needs table"; |
|
1592
|
13
|
|
100
|
|
|
45
|
my $compress = $args{compress} // 'raw'; |
|
1593
|
13
|
|
50
|
|
|
42
|
my $timeout = $args{timeout} // 60; |
|
1594
|
|
|
|
|
|
|
|
|
1595
|
13
|
|
|
|
|
45
|
my ($url, $hdr) = ClickHouse::Encoder::_build_insert_endpoint( |
|
1596
|
|
|
|
|
|
|
$table, $compress, %args); |
|
1597
|
|
|
|
|
|
|
|
|
1598
|
|
|
|
|
|
|
return bless { |
|
1599
|
|
|
|
|
|
|
enc => $enc, |
|
1600
|
|
|
|
|
|
|
url => $url, |
|
1601
|
|
|
|
|
|
|
hdr => $hdr, |
|
1602
|
|
|
|
|
|
|
rows => [], |
|
1603
|
|
|
|
|
|
|
batch_size => $args{batch_size} // 10_000, |
|
1604
|
|
|
|
|
|
|
retries => $args{retries} // 3, |
|
1605
|
|
|
|
|
|
|
retry_wait => $args{retry_wait} // 0.5, |
|
1606
|
11
|
|
100
|
|
|
110
|
retry_max_wait => $args{retry_max_wait} // 30, |
|
|
|
|
100
|
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
1607
|
|
|
|
|
|
|
compress => $compress, |
|
1608
|
|
|
|
|
|
|
http => ClickHouse::Encoder::_http_tiny( |
|
1609
|
|
|
|
|
|
|
%args, timeout => $timeout, keep_alive => 1), |
|
1610
|
|
|
|
|
|
|
origin => $origin, |
|
1611
|
|
|
|
|
|
|
sent_rows => 0, |
|
1612
|
|
|
|
|
|
|
sent_batches => 0, |
|
1613
|
|
|
|
|
|
|
last_response => undef, |
|
1614
|
|
|
|
|
|
|
summary => {}, |
|
1615
|
|
|
|
|
|
|
}, $class; |
|
1616
|
|
|
|
|
|
|
} |
|
1617
|
|
|
|
|
|
|
|
|
1618
|
|
|
|
|
|
|
sub push :method { ## no critic (ProhibitBuiltinHomonyms) |
|
1619
|
12
|
|
|
12
|
|
851
|
my ($self, $row) = @_; |
|
1620
|
12
|
|
|
|
|
12
|
CORE::push @{ $self->{rows} }, $row; |
|
|
12
|
|
|
|
|
23
|
|
|
1621
|
12
|
50
|
|
|
|
15
|
$self->flush if @{ $self->{rows} } >= $self->{batch_size}; |
|
|
12
|
|
|
|
|
30
|
|
|
1622
|
12
|
|
|
|
|
30
|
return $self; |
|
1623
|
|
|
|
|
|
|
} |
|
1624
|
|
|
|
|
|
|
|
|
1625
|
|
|
|
|
|
|
sub push_many { |
|
1626
|
4
|
|
|
4
|
|
269
|
my ($self, $rows) = @_; |
|
1627
|
4
|
|
|
|
|
4
|
CORE::push @{ $self->{rows} }, @{$rows}; |
|
|
4
|
|
|
|
|
8
|
|
|
|
4
|
|
|
|
|
20
|
|
|
1628
|
|
|
|
|
|
|
# Slice exactly batch_size rows per flush so we never POST one |
|
1629
|
|
|
|
|
|
|
# oversized body when push_many is called with N >> batch_size. |
|
1630
|
|
|
|
|
|
|
# `local` restores $self->{rows} to the remainder arrayref even |
|
1631
|
|
|
|
|
|
|
# when `flush` croaks mid-batch (so caller's eval{} sees the |
|
1632
|
|
|
|
|
|
|
# untried rows still buffered for a retry). |
|
1633
|
4
|
|
|
|
|
5
|
while (@{ $self->{rows} } > $self->{batch_size}) { |
|
|
7
|
|
|
|
|
13
|
|
|
1634
|
4
|
|
|
|
|
5
|
my @batch = splice @{ $self->{rows} }, 0, $self->{batch_size}; |
|
|
4
|
|
|
|
|
10
|
|
|
1635
|
4
|
|
|
|
|
7
|
local $self->{rows} = \@batch; |
|
1636
|
4
|
|
|
|
|
10
|
$self->flush; |
|
1637
|
|
|
|
|
|
|
} |
|
1638
|
3
|
100
|
|
|
|
4
|
$self->flush if @{ $self->{rows} } >= $self->{batch_size}; |
|
|
3
|
|
|
|
|
7
|
|
|
1639
|
3
|
|
|
|
|
4
|
return $self; |
|
1640
|
|
|
|
|
|
|
} |
|
1641
|
|
|
|
|
|
|
|
|
1642
|
|
|
|
|
|
|
sub flush { |
|
1643
|
14
|
|
|
14
|
|
48
|
my $self = shift; |
|
1644
|
14
|
|
|
|
|
16
|
my $rows = $self->{rows}; |
|
1645
|
14
|
100
|
|
|
|
17
|
return $self if !@{$rows}; |
|
|
14
|
|
|
|
|
28
|
|
|
1646
|
|
|
|
|
|
|
my $body = ClickHouse::Encoder::_apply_compression( |
|
1647
|
13
|
|
|
|
|
184
|
$self->{origin}, $self->{compress}, $self->{enc}->encode($rows)); |
|
1648
|
13
|
|
|
|
|
20
|
my $resp; |
|
1649
|
|
|
|
|
|
|
my $last_err; |
|
1650
|
13
|
|
|
|
|
36
|
for my $attempt (0 .. $self->{retries}) { |
|
1651
|
|
|
|
|
|
|
$resp = $self->{http}->post($self->{url}, |
|
1652
|
18
|
|
|
|
|
200
|
{ headers => $self->{hdr}, content => $body }); |
|
1653
|
18
|
100
|
|
|
|
324
|
last if $resp->{success}; |
|
1654
|
|
|
|
|
|
|
# 4xx errors are not retryable - the request is malformed. |
|
1655
|
|
|
|
|
|
|
die "bulk_inserter: HTTP $resp->{status}: $resp->{content}\n" |
|
1656
|
9
|
100
|
66
|
|
|
63
|
if $resp->{status} >= 400 && $resp->{status} < 500; |
|
1657
|
8
|
|
|
|
|
22
|
$last_err = "HTTP $resp->{status}: $resp->{content}"; |
|
1658
|
|
|
|
|
|
|
# 5xx and network failures (599) are retryable. Exponential |
|
1659
|
|
|
|
|
|
|
# backoff (retry_wait * 2^attempt, capped at retry_max_wait) |
|
1660
|
|
|
|
|
|
|
# with equal jitter: sleep half the window deterministically |
|
1661
|
|
|
|
|
|
|
# then a random half, so concurrent inserters retrying the |
|
1662
|
|
|
|
|
|
|
# same failed server don't resynchronise into a thundering herd. |
|
1663
|
8
|
100
|
|
|
|
25
|
if ($attempt < $self->{retries}) { |
|
1664
|
5
|
|
|
|
|
48
|
require Time::HiRes; |
|
1665
|
5
|
|
|
|
|
15
|
my $window = $self->{retry_wait} * (2 ** $attempt); |
|
1666
|
|
|
|
|
|
|
$window = $self->{retry_max_wait} |
|
1667
|
5
|
50
|
|
|
|
15
|
if $window > $self->{retry_max_wait}; |
|
1668
|
5
|
|
|
|
|
2521206
|
Time::HiRes::sleep($window / 2 + rand($window / 2)); |
|
1669
|
|
|
|
|
|
|
} |
|
1670
|
|
|
|
|
|
|
} |
|
1671
|
|
|
|
|
|
|
die "bulk_inserter: gave up after $self->{retries} retries; " |
|
1672
|
|
|
|
|
|
|
. "last error: $last_err\n" |
|
1673
|
12
|
100
|
|
|
|
76
|
unless $resp->{success}; |
|
1674
|
9
|
|
|
|
|
26
|
ClickHouse::Encoder::_decorate_response($resp); |
|
1675
|
9
|
|
|
|
|
17
|
$self->{last_response} = $resp; |
|
1676
|
9
|
100
|
|
|
|
21
|
if (my $sum = $resp->{ch}{summary}) { |
|
1677
|
|
|
|
|
|
|
# Roll up CH summary fields across batches (read_rows, |
|
1678
|
|
|
|
|
|
|
# written_rows, written_bytes, elapsed_ns, ...). Caller uses |
|
1679
|
|
|
|
|
|
|
# ->summary to get the running totals; ->last_response to get |
|
1680
|
|
|
|
|
|
|
# the most recent per-batch detail. |
|
1681
|
|
|
|
|
|
|
$self->{summary}{$_} = ($self->{summary}{$_} // 0) + $sum->{$_} |
|
1682
|
2
|
|
100
|
|
|
5
|
for grep { $sum->{$_} =~ /\A-?\d+\z/ } keys %$sum; |
|
|
8
|
|
|
|
|
37
|
|
|
1683
|
|
|
|
|
|
|
} |
|
1684
|
9
|
|
|
|
|
16
|
$self->{rows} = []; |
|
1685
|
9
|
|
|
|
|
24
|
$self->{sent_rows} += @{$rows}; |
|
|
9
|
|
|
|
|
10
|
|
|
1686
|
9
|
|
|
|
|
12
|
$self->{sent_batches}++; |
|
1687
|
9
|
|
|
|
|
25
|
return $self; |
|
1688
|
|
|
|
|
|
|
} |
|
1689
|
|
|
|
|
|
|
|
|
1690
|
2
|
|
|
2
|
|
4
|
sub last_response { my $self = shift; return $self->{last_response} } |
|
|
2
|
|
|
|
|
10
|
|
|
1691
|
2
|
|
|
2
|
|
6
|
sub summary { my $self = shift; return $self->{summary} } |
|
|
2
|
|
|
|
|
17
|
|
|
1692
|
|
|
|
|
|
|
|
|
1693
|
|
|
|
|
|
|
sub finish { |
|
1694
|
2
|
|
|
2
|
|
6
|
my $self = shift; |
|
1695
|
2
|
|
|
|
|
5
|
$self->flush; |
|
1696
|
2
|
|
|
|
|
5
|
return { rows => $self->{sent_rows}, batches => $self->{sent_batches} }; |
|
1697
|
|
|
|
|
|
|
} |
|
1698
|
|
|
|
|
|
|
|
|
1699
|
4
|
|
|
4
|
|
2038
|
sub buffered_count { my $self = shift; return scalar @{ $self->{rows} } } |
|
|
4
|
|
|
|
|
6
|
|
|
|
4
|
|
|
|
|
17
|
|
|
1700
|
4
|
|
|
4
|
|
1581
|
sub sent_rows { my $self = shift; return $self->{sent_rows} } |
|
|
4
|
|
|
|
|
13
|
|
|
1701
|
1
|
|
|
1
|
|
10
|
sub sent_batches { my $self = shift; return $self->{sent_batches} } |
|
|
1
|
|
|
|
|
4
|
|
|
1702
|
|
|
|
|
|
|
|
|
1703
|
|
|
|
|
|
|
package ClickHouse::Encoder; ## no critic (ProhibitMultiplePackages) |
|
1704
|
|
|
|
|
|
|
|
|
1705
|
|
|
|
|
|
|
# Decimal128 values returned by decode_block come as [lo_uint64, |
|
1706
|
|
|
|
|
|
|
# hi_int64]; Decimal256 as a 4-limb arrayref. Use Math::BigInt to |
|
1707
|
|
|
|
|
|
|
# stitch limbs into a scaled decimal string. |
|
1708
|
|
|
|
|
|
|
sub decimal128_str { |
|
1709
|
8
|
|
|
8
|
1
|
277878
|
my ($class, $lo, $hi, $scale) = @_; |
|
1710
|
8
|
|
|
|
|
3603
|
require Math::BigInt; |
|
1711
|
8
|
|
|
|
|
65325
|
my $two64 = Math::BigInt->new(1)->blsft(64); |
|
1712
|
8
|
|
|
|
|
49898
|
my $v = Math::BigInt->new($hi)->bmul($two64)->badd($lo); |
|
1713
|
8
|
|
|
|
|
2626
|
return _scale_int_to_str($v, $scale); |
|
1714
|
|
|
|
|
|
|
} |
|
1715
|
|
|
|
|
|
|
|
|
1716
|
|
|
|
|
|
|
sub decimal256_str { |
|
1717
|
5
|
|
|
5
|
1
|
1255
|
my ($class, $limbs, $scale) = @_; |
|
1718
|
5
|
|
|
|
|
33
|
require Math::BigInt; |
|
1719
|
5
|
|
|
|
|
19
|
my $two64 = Math::BigInt->new(1)->blsft(64); |
|
1720
|
|
|
|
|
|
|
# Top limb (limbs[3]) is the sign-extended high quarter. The sign |
|
1721
|
|
|
|
|
|
|
# check uses BigInt comparisons; native Perl `1 << 64` returns 0 on |
|
1722
|
|
|
|
|
|
|
# a 64-bit Perl (shift past word width), which would silently turn |
|
1723
|
|
|
|
|
|
|
# a negative value into a wrong positive one. |
|
1724
|
5
|
|
|
|
|
3475
|
my $top = Math::BigInt->new($limbs->[3]); |
|
1725
|
5
|
|
|
|
|
326
|
my $two63 = Math::BigInt->new(1)->blsft(63); |
|
1726
|
5
|
100
|
|
|
|
3299
|
$top->bsub($two64) if $top >= $two63; |
|
1727
|
5
|
|
|
|
|
387
|
my $v = $top; |
|
1728
|
5
|
|
|
|
|
12
|
$v->bmul($two64)->badd(Math::BigInt->new($limbs->[2])); |
|
1729
|
5
|
|
|
|
|
1184
|
$v->bmul($two64)->badd(Math::BigInt->new($limbs->[1])); |
|
1730
|
5
|
|
|
|
|
1215
|
$v->bmul($two64)->badd(Math::BigInt->new($limbs->[0])); |
|
1731
|
5
|
|
|
|
|
1218
|
return _scale_int_to_str($v, $scale); |
|
1732
|
|
|
|
|
|
|
} |
|
1733
|
|
|
|
|
|
|
|
|
1734
|
|
|
|
|
|
|
sub _scale_int_to_str { |
|
1735
|
13
|
|
|
13
|
|
21
|
my ($big, $scale) = @_; |
|
1736
|
13
|
100
|
|
|
|
27
|
my $sign = $big->is_neg ? '-' : ''; |
|
1737
|
13
|
|
|
|
|
80
|
my $abs = $big->copy->babs->bstr; |
|
1738
|
13
|
100
|
66
|
|
|
1393
|
return "$sign$abs" if !$scale || $scale == 0; |
|
1739
|
5
|
100
|
|
|
|
18
|
$abs = ('0' x ($scale - length($abs) + 1)) . $abs |
|
1740
|
|
|
|
|
|
|
if length($abs) <= $scale; |
|
1741
|
5
|
|
|
|
|
61
|
return $sign . substr($abs, 0, length($abs) - $scale) |
|
1742
|
|
|
|
|
|
|
. '.' |
|
1743
|
|
|
|
|
|
|
. substr($abs, length($abs) - $scale); |
|
1744
|
|
|
|
|
|
|
} |
|
1745
|
|
|
|
|
|
|
|
|
1746
|
|
|
|
|
|
|
# Convenience: open `@cmd` as a write pipe, encode rows into it, and |
|
1747
|
|
|
|
|
|
|
# close. Croaks on fork failure, exec failure, or non-zero child exit. |
|
1748
|
|
|
|
|
|
|
# Used by examples piping into |
|
1749
|
|
|
|
|
|
|
# `clickhouse-client insert ... format native`. |
|
1750
|
|
|
|
|
|
|
sub encode_to_command { |
|
1751
|
13
|
|
|
13
|
1
|
279083
|
my ($self, $cmd, $rows) = @_; |
|
1752
|
13
|
100
|
|
|
|
85
|
ref $cmd eq 'ARRAY' or die "encode_to_command: cmd must be arrayref"; |
|
1753
|
11
|
100
|
|
|
|
58
|
@$cmd or die "encode_to_command: cmd must be non-empty arrayref"; |
|
1754
|
|
|
|
|
|
|
|
|
1755
|
|
|
|
|
|
|
# Without ignoring SIGPIPE, an early child exit (e.g. clickhouse-client |
|
1756
|
|
|
|
|
|
|
# rejecting the schema) would kill the parent silently with status |
|
1757
|
|
|
|
|
|
|
# 141 instead of producing a trappable diagnostic on close. |
|
1758
|
9
|
|
|
|
|
138
|
local $SIG{PIPE} = 'IGNORE'; |
|
1759
|
|
|
|
|
|
|
|
|
1760
|
|
|
|
|
|
|
## no critic (InputOutput::RequireBriefOpen) -- $fh is closed below |
|
1761
|
9
|
50
|
|
|
|
35726
|
defined(my $pid = open my $fh, '|-') or die "fork: $!"; |
|
1762
|
9
|
100
|
|
|
|
481
|
if ($pid == 0) { |
|
1763
|
|
|
|
|
|
|
# exec only returns on failure. Fold the failure path onto the |
|
1764
|
|
|
|
|
|
|
# same statement (via `or do {...}`) so Perl doesn't flag the |
|
1765
|
|
|
|
|
|
|
# post-exec code as unreachable at compile time. POSIX::_exit |
|
1766
|
|
|
|
|
|
|
# (not die or plain exit) skips END blocks and DESTROY handlers |
|
1767
|
|
|
|
|
|
|
# inherited from the parent; syswrite avoids running PerlIO |
|
1768
|
|
|
|
|
|
|
# layers and __WARN__ handlers. Suppress Perl's default "Can't |
|
1769
|
|
|
|
|
|
|
# exec" warning so the only diagnostic comes from our syswrite. |
|
1770
|
68
|
|
|
68
|
|
206216
|
no warnings 'exec'; ## no critic (TestingAndDebugging::ProhibitNoWarnings) |
|
|
68
|
|
|
|
|
143
|
|
|
|
68
|
|
|
|
|
14346
|
|
|
1771
|
3
|
0
|
|
|
|
171
|
exec { $cmd->[0] } @$cmd or do { |
|
|
3
|
|
|
|
|
0
|
|
|
1772
|
0
|
|
|
|
|
0
|
my $err = $!; |
|
1773
|
0
|
|
|
|
|
0
|
require POSIX; |
|
1774
|
0
|
|
|
|
|
0
|
syswrite STDERR, "exec @$cmd: $err\n"; |
|
1775
|
0
|
|
|
|
|
0
|
POSIX::_exit(127); |
|
1776
|
|
|
|
|
|
|
}; |
|
1777
|
|
|
|
|
|
|
} |
|
1778
|
6
|
|
|
|
|
42
|
binmode $fh; |
|
1779
|
6
|
|
|
|
|
2579619
|
$self->encode_to_handle($fh, $rows); |
|
1780
|
5
|
50
|
|
|
|
11071437
|
close $fh |
|
|
|
100
|
|
|
|
|
|
|
1781
|
|
|
|
|
|
|
or die "@$cmd " . ($! ? "close: $!" : "exit " . ($? >> 8)); |
|
1782
|
3
|
|
|
|
|
372
|
return; |
|
1783
|
|
|
|
|
|
|
} |
|
1784
|
|
|
|
|
|
|
|
|
1785
|
|
|
|
|
|
|
1; |
|
1786
|
|
|
|
|
|
|
|
|
1787
|
|
|
|
|
|
|
__END__ |