File Coverage

blib/lib/EV/ClickHouse.pm
Criterion Covered Total %
statement 23 778 2.9
branch 0 418 0.0
condition 0 334 0.0
subroutine 8 123 6.5
pod 25 27 92.5
total 56 1680 3.3


line stmt bran cond sub pod time code
1             package EV::ClickHouse;
2 48     48   5089085 use strict;
  48         73  
  48         1601  
3 48     48   342 use warnings;
  48         93  
  48         2283  
4              
5 48     48   802 use EV;
  48         2395  
  48         1111  
6 48     48   174 use Scalar::Util qw(refaddr weaken);
  48         122  
  48         8187  
7              
8             # Identifier validation regexes — single source of truth for the table
9             # / column / function names that get spliced into SQL via the helpers.
10             my $RE_IDENT = qr/\A[A-Za-z_][A-Za-z0-9_]*\z/;
11             my $RE_TABLE = qr/\A[A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)?\z/;
12              
13             BEGIN {
14 48     48   140 our $VERSION = '0.03';
15 48     48   316 use XSLoader;
  48         82  
  48         1381  
16 48         551041 XSLoader::load __PACKAGE__, $VERSION;
17             }
18              
19             # Holds in-flight EV::cares resolvers so they aren't garbage-collected
20             # before their callback fires. Keyed by refaddr of the resolver itself
21             # (not the connection) — see new() for the rationale. Each entry is
22             # deleted from inside its own resolved callback via a deferred timer.
23             # Plain package hash (not Hash::Util::FieldHash) so module load doesn't
24             # add tied/magic SVs that Test::LeakTrace would flag.
25             our %_failover;
26              
27             *q = \&query;
28             *reconnect = \&reset;
29             *disconnect = \&finish;
30             *ddl = \&query; # readability at call sites for DDL/DML
31              
32 0     0     sub _uri_unescape { my $s = $_[0]; $s =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/ge; $s }
  0            
  0            
  0            
33              
34             # Parse a URI query string into a hash. Bare keys (no `=`) are stored
35             # as 1 (standard URL flag convention); existing keys in $h are preserved
36             # (path-derived host/port/etc win over query-string overrides).
37             sub _uri_qs_into {
38 0     0     my ($qs, $h) = @_;
39 0 0 0       return unless defined $qs && length $qs;
40 0           for my $pair (split /&/, $qs) {
41 0           my ($k, $v) = split /=/, $pair, 2;
42 0 0 0       next unless defined $k && length $k;
43 0 0 0       $h->{$k} //= defined $v ? _uri_unescape($v) : 1;
44             }
45             }
46              
47             sub new {
48 0     0 1   my ($class, %args) = @_;
49              
50             # Connection URI: clickhouse://user:pass@host:port/database
51             # host accepts a bracketed IPv6 literal (e.g. clickhouse://[::1]:9000/db)
52 0 0         if (my $uri = delete $args{uri}) {
53 0 0         if ($uri =~ m{^clickhouse(?:\+(\w+))?://(?:([^:@]*?)(?::([^@]*))?\@)?(\[[^\]]+\]|[^/:?]+)(?::(\d+))?(?:/([^?]*))?(?:\?(.*))?$}) {
54 0           my ($proto, $u, $pw, $h, $p, $db, $qs) = ($1, $2, $3, $4, $5, $6, $7);
55 0           $h =~ s/^\[(.*)\]$/$1/;
56 0 0 0       $args{protocol} //= $proto if $proto;
57 0 0 0       $args{user} //= _uri_unescape($u) if defined $u && $u ne '';
      0        
58 0 0 0       $args{password} //= _uri_unescape($pw) if defined $pw;
59 0   0       $args{host} //= $h;
60 0 0 0       $args{port} //= $p if defined $p;
61 0 0 0       $args{database} //= _uri_unescape($db) if defined $db && $db ne '';
      0        
62 0           _uri_qs_into($qs, \%args);
63             } else {
64 0           die "EV::ClickHouse: invalid URI '$uri'\n";
65             }
66             }
67              
68 0   0       my $loop = delete $args{loop} || EV::default_loop;
69 0           my $self = $class->_new($loop);
70              
71             # Multi-host failover: hosts => ['a', 'b', 'c'] or ['a:9000', 'b:9001'].
72             # On a connect-phase failure, advance to the next host and reconnect
73             # via auto_reconnect (or the user calling reset). Falls back to single
74             # host => '...' when not provided.
75 0           my $hosts_list;
76 0 0         if (my $h = delete $args{hosts}) {
77 0 0 0       die "hosts must be a non-empty arrayref"
78             unless ref($h) eq 'ARRAY' && @$h;
79 0           $hosts_list = $h;
80 0 0         if (!defined $args{host}) {
81 0           my ($h0, $p0) = _split_host_port($h->[0], $args{port});
82 0           $args{host} = $h0;
83 0   0       $args{port} //= $p0;
84             }
85             }
86              
87             # Failover state + on_failover both live in the connection's C struct
88             # now: emit_error advances the host ring before firing on_error,
89             # which keeps the hot error path off the Perl stack.
90             my $user_on_error = exists $args{on_error}
91 0 0   0     ? delete $args{on_error} : sub { die @_ };
  0            
92 0           $self->on_error($user_on_error);
93 0 0         if (my $cb = delete $args{on_failover}) { $self->on_failover($cb) }
  0            
94 0           for my $h (qw(on_connect on_progress on_disconnect on_trace on_query_complete on_query_start on_log)) {
95 0 0         $self->$h(delete $args{$h}) if exists $args{$h};
96             }
97              
98 0   0       my $host = delete $args{host} // '127.0.0.1';
99 0           my $port = delete $args{port};
100 0   0       my $protocol = delete $args{protocol} // 'http';
101 0   0       my $user = delete $args{user} // 'default';
102 0   0       my $password = delete $args{password} // '';
103 0   0       my $database = delete $args{database} // delete $args{db} // 'default';
      0        
104              
105 0 0 0       die "EV::ClickHouse: unknown protocol '$protocol' (expected 'http' or 'native')\n"
106             unless $protocol eq 'http' || $protocol eq 'native';
107              
108 0 0 0       $port //= ($protocol eq 'native') ? 9000 : 8123;
109              
110 0 0         $self->_set_protocol($protocol eq 'native' ? 1 : 0);
111              
112             # Pass-through setters. Skip only when the key was absent — explicit
113             # 0/'' must reach the setter so e.g. `compress => 0` is honored, not
114             # ignored. (Use `exists` rather than `defined` so a deliberate undef
115             # is also passed through and rejected by the setter if invalid.)
116 0           for my $opt (qw(compress tls tls_skip_verify auto_reconnect
117             keepalive reconnect_delay reconnect_max_delay
118             reconnect_jitter reconnect_max_attempts
119             progress_period http_basic_auth
120             connect_timeout query_timeout
121             max_query_size max_recv_buffer)) {
122 0 0         next unless exists $args{$opt};
123 0           my $val = delete $args{$opt};
124 0           my $setter = "_set_$opt";
125 0           $self->$setter($val);
126             }
127 0           for my $opt (qw(session_id tls_ca_file tls_cert_file tls_key_file)) {
128 0 0         defined(my $val = delete $args{$opt}) or next;
129 0           my $setter = "_set_$opt";
130 0           $self->$setter($val);
131             }
132              
133             # query_log_comment: 1 = auto-generate "ev_ch user=$ENV{USER} pid=$$"; any
134             # other defined non-empty string (including "0") is taken literally;
135             # undef / not present / empty string is disabled.
136             {
137 0           my $qlc = delete $args{query_log_comment};
  0            
138 0 0 0       if (defined $qlc && length $qlc) {
139             my $cmt = (!ref($qlc) && "$qlc" ne '1')
140             ? $qlc
141 0 0 0       : sprintf 'ev_ch user=%s pid=%d', $ENV{USER} // 'na', $$;
      0        
142 0           $cmt =~ s{\*/}{*\\/}g;
143 0           $self->_set_query_log_comment($cmt);
144             }
145             }
146              
147             # decode_flags bitmask (DT_STR=1, DEC_SCALE=2, ENUM_STR=4, NAMED_ROWS=8)
148             my $decode_flags = (delete $args{decode_datetime} ? 1 : 0)
149             | (delete $args{decode_decimal} ? 2 : 0)
150             | (delete $args{decode_enum} ? 4 : 0)
151 0 0         | (delete $args{named_rows} ? 8 : 0);
    0          
    0          
    0          
152 0 0         $self->_set_decode_flags($decode_flags) if $decode_flags;
153              
154 0 0         if (my $settings = delete $args{settings}) { $self->_set_settings($settings) }
  0            
155              
156 0 0         warn "EV::ClickHouse->new: unknown parameter(s): " . join(', ', sort keys %args) . "\n"
157             if %args;
158              
159 0 0         if ($hosts_list) {
160 0           $self->_set_failover($hosts_list, $port);
161             }
162              
163             # Async DNS via EV::cares when available — non-IP hostnames are
164             # resolved off-loop so the constructor returns immediately and the
165             # main EV loop never blocks on getaddrinfo. Pre-connect-queued
166             # queries fire once the resolved-address connect completes. Falls
167             # back to the XS blocking resolver if EV::cares isn't installed
168             # or the host is already an IP literal.
169 0 0 0       if ($host !~ /^[\d.]+$|^\[?[0-9a-fA-F:]+\]?$/
170 0           && eval { require EV::cares; 1 }) {
  0            
171             # Stash the resolver in %_failover, keyed by refaddr of the
172             # resolver itself (NOT of $self). Two reasons:
173             # - never delete the resolver from inside its own callback —
174             # ares_destroy from a c-ares cb corrupts the channel heap.
175             # We defer the delete via EV::timer(0,...) so it runs from
176             # a clean stack frame.
177             # - keying by refaddr($self) was racy: A's deferred-delete
178             # could fire after A's struct was freed and B got the same
179             # refaddr, dropping B's resolver. refaddr($r) is unique
180             # while $r is alive in %_failover.
181 0           my $r = EV::cares->new;
182 0           my $key = refaddr($r);
183 0           $_failover{$key} = $r;
184 0           my $weak2 = $self; weaken $weak2;
  0            
185 0           $self->_set_dns_pending(1);
186             $r->resolve($host, sub {
187 0     0     my ($status, @addrs) = @_;
188 0           my $w; $w = EV::timer(0, 0, sub { undef $w; delete $_failover{$key} });
  0            
  0            
  0            
189             # Skip if the connection has been DESTROYed or the user
190             # finished it while DNS was in flight (cleanup_connection
191             # clears dns_pending; if it's 0 here, finish ran already).
192 0 0 0       return unless $weak2 && $weak2->_take_dns_pending;
193 0 0 0       if ($status != 0 || !@addrs) {
194 0           $weak2->skip_pending;
195             # Warn if the handler itself throws — matches the XS
196             # emit_error path (WARN_AND_CLEAR_ERRSV); a bare eval here
197             # would make a DNS failure vanish under the default
198             # `on_error => sub { die @_ }`.
199 0 0         eval { $weak2->on_error->("DNS resolution failed for '$host'"); 1 }
  0            
  0            
200             or warn "EV::ClickHouse: exception in error handler: $@";
201 0           return;
202             }
203 0           my ($v4) = grep /^[\d.]+$/, @addrs;
204 0   0       $weak2->connect($v4 // $addrs[0], $port, $user, $password, $database);
205 0           });
206             } else {
207 0           $self->connect($host, $port, $user, $password, $database);
208             }
209              
210 0           $self;
211             }
212              
213             sub _split_host_port {
214 0     0     my ($entry, $default_port) = @_;
215 0 0         if ($entry =~ /^\[([^\]]+)\](?::(\d+))?$/) {
216 0   0       return ($1, $2 // $default_port); # IPv6 literal in brackets
217             }
218 0 0         if ($entry =~ /^([^:]+):(\d+)$/) {
219 0           return ($1, $2);
220             }
221 0           return ($entry, $default_port);
222             }
223              
224             # Pull-based result iterator: $it = $ch->iterate($sql, [\%settings])
225             # while (my $batch = $it->next($timeout)) { ... }
226             # Wraps the native on_data per-block callback in a synchronous-feeling
227             # pull interface for procedural code. The iterator drives the EV loop
228             # from inside ->next until the next block arrives, the query completes,
229             # or the optional timeout (seconds) expires.
230             sub iterate {
231 0     0 1   my ($self, $sql, $settings) = @_;
232 0           my $it = bless {
233             ch => $self,
234             batches => [],
235             done => 0,
236             err => undef,
237             }, 'EV::ClickHouse::Iterator';
238             my $on_data = sub {
239 0     0     push @{ $it->{batches} }, $_[0];
  0            
240 0           EV::break;
241 0           };
242 0 0         my %s = $settings ? %$settings : ();
243 0           $s{on_data} = $on_data;
244             $self->query($sql, \%s, sub {
245 0     0     my (undef, $err) = @_;
246 0           $it->{done} = 1;
247 0 0         $it->{err} = $err if $err;
248 0           EV::break;
249 0           });
250 0           $it;
251             }
252              
253             # Streaming insert: $s = $ch->insert_streamer($table, %opts)
254             # $s->push_row([...]); ...; $s->finish(sub { my (undef, $err) = @_ });
255             # Buffers rows in batches of `batch_size` (default 10_000) and dispatches
256             # each batch as an insert(). Dispatches are serialised (the native protocol
257             # cannot pipeline INSERTs); push_row keeps buffering while a batch is in
258             # flight and the next batch fires from the in-flight callback.
259             sub insert_streamer {
260 0     0 1   my ($self, $table, %opts) = @_;
261 0           return EV::ClickHouse::Streamer->_new($self, $table, %opts);
262             }
263              
264             # Generator-driven insert. $producer is a code ref returning one row
265             # per call (arrayref or hashref) and undef when exhausted. Rows pump
266             # into an insert_streamer with backpressure: when buffered_count
267             # crosses high_water (default 50_000) the producer is paused until
268             # the streamer drains below the watermark. $cb fires once with
269             # (undef) on success or (undef, $err) on first failure (same shape
270             # as Streamer::finish).
271             sub insert_iter {
272 0     0 1   my ($self, $table, $producer, $cb, %opts) = @_;
273 0 0 0       die "Usage: \$ch->insert_iter(\$table, \$producer, \$cb, [\%opts])"
      0        
274             unless defined $table && ref($producer) eq 'CODE' && ref($cb) eq 'CODE';
275 0   0       my $hi = $opts{high_water} //= 50_000;
276 0           my $s = $self->insert_streamer($table, %opts);
277 0           my $pump; $pump = EV::idle(sub {
278             # Abort early on sticky error so we don't keep pumping rows into
279             # a streamer that will never accept them (and that would otherwise
280             # accumulate the buffer indefinitely if the producer is fast).
281 0 0   0     if (my $err = $s->sticky_error) {
282 0           $pump->stop; undef $pump;
  0            
283 0           $s->finish($cb);
284 0           return;
285             }
286             # Backpressure: skip this tick if the buffer is at the watermark.
287             # In-flight batches are handled inside the streamer (push_row keeps
288             # buffering while one is on the wire), so we only gate on buffer
289             # size here. $hi == 0 means the caller disabled backpressure — the
290             # gate must then never fire (>= 0 would be permanently true).
291 0 0 0       return if $hi && $s->buffered_count >= $hi;
292 0           my $row = $producer->();
293 0 0         if (defined $row) {
294 0           $s->push_row($row);
295             } else {
296 0           $pump->stop;
297 0           undef $pump;
298 0           $s->finish($cb);
299             }
300 0           });
301 0           return;
302             }
303              
304             # Ergonomic wrapper around server-side async_insert. Defaults to
305             # wait_for_flush => 1 so the callback fires after the asynchronous batch
306             # has been committed, matching the wait_for_async_insert=1 semantics in
307             # clickhouse-server. Set wait_for_flush => 0 for fire-and-forget (the
308             # callback then resolves as soon as the server has accepted the row
309             # into the in-memory async batch).
310             #
311             # $ch->insert_async('events', \@rows, sub {
312             # my (undef, $err) = @_;
313             # die "async insert: $err" if $err;
314             # });
315             sub insert_async {
316 0     0 1   my ($self, $table, $data, $cb, %opts) = @_;
317 0 0 0       die "Usage: \$ch->insert_async(\$table, \$data, \$cb, [%opts])"
      0        
318             unless defined $table && defined $data && ref($cb) eq 'CODE';
319 0 0         my $wait = exists $opts{wait_for_flush} ? delete($opts{wait_for_flush}) : 1;
320 0   0       my %extra = %{ delete($opts{settings}) // {} };
  0            
321 0 0         my %settings = (
322             async_insert => 1,
323             wait_for_async_insert => $wait ? 1 : 0,
324             %extra,
325             );
326 0           $self->insert($table, $data, \%settings, $cb);
327             }
328              
329             # Server-side insert into AggregateFunction columns. Per-aggregator
330             # state binary format isn't replicable client-side, so each per-row
331             # value is wrapped in the ${func}State combinator inside a single-row
332             # select, all union all'd into one insert.
333             sub insert_aggregated {
334 0     0 1   my ($self, $table, %opts) = @_;
335 0   0       my $cb = delete $opts{cb} // die "insert_aggregated: cb required";
336 0   0       my $rows = delete $opts{rows} // die "insert_aggregated: rows required";
337 0 0         die "insert_aggregated: cb must be a coderef" if ref($cb) ne 'CODE';
338 0 0 0       die "insert_aggregated: rows must be a non-empty arrayref"
339             if ref($rows) ne 'ARRAY' || !@$rows;
340 0   0       my $key_cols = delete $opts{key_cols} || [];
341 0 0         die "insert_aggregated: key_cols must be an arrayref"
342             if ref($key_cols) ne 'ARRAY';
343 0           my @agg = grep { ref $opts{$_} eq 'HASH' } sort keys %opts;
  0            
344 0 0         die "insert_aggregated: no aggregate columns specified" unless @agg;
345 0 0         die "insert_aggregated: invalid table '$table'"
346             unless $table =~ $RE_TABLE;
347 0           for my $c (@$key_cols, @agg) {
348 0 0         die "insert_aggregated: invalid column '$c'"
349             unless $c =~ $RE_IDENT;
350             }
351 0           my @agg_meta;
352 0           for my $ac (@agg) {
353 0           my $spec = $opts{$ac};
354 0   0       my $func = $spec->{func} // die "insert_aggregated: $ac.func required";
355 0   0       my $args = $spec->{args} // die "insert_aggregated: $ac.args required";
356 0 0         die "insert_aggregated: $ac.func must be a simple identifier"
357             unless $func =~ $RE_IDENT;
358 0           for my $t (@$args) {
359 0 0         die "insert_aggregated: $ac arg type contains illegal chars"
360             unless $t =~ /\A[A-Za-z0-9_(),\s]+\z/;
361             }
362 0           push @agg_meta, { col => $ac, func => $func, types => [@$args] };
363             }
364 0           my $expected_cols = @$key_cols;
365 0           $expected_cols += @{ $_->{types} } for @agg_meta;
  0            
366 0           for my $r (@$rows) {
367 0 0         die sprintf("insert_aggregated: row width mismatch (got %d, want %d)",
368             scalar @$r, $expected_cols)
369             if @$r != $expected_cols;
370             }
371 0           my $col_list = join ', ', @$key_cols, @agg;
372             # VALUES doesn't permit aggregate combinators (treated as non-constant),
373             # so build a UNION ALL of single-row SELECTs.
374 0           my @selects;
375 0           for my $r (@$rows) {
376 0           my $offset = 0;
377 0           my @parts;
378 0           for my $kc (@$key_cols) {
379 0           push @parts, _sql_quote_value($r->[$offset++]) . " as $kc";
380             }
381 0           for my $m (@agg_meta) {
382 0           my @args;
383 0           for my $t (@{ $m->{types} }) {
  0            
384 0           push @args, "cast(" . _sql_quote_value($r->[$offset++]) . " as $t)";
385             }
386 0           push @parts, "$m->{func}State(" . join(', ', @args) . ") as $m->{col}";
387             }
388 0           push @selects, "select " . join(', ', @parts);
389             }
390 0           my $sql = "insert into $table ($col_list) " . join(' union all ', @selects);
391 0           $self->query($sql, $cb);
392             }
393              
394             # Minimal SQL literal quoter for insert_aggregated. Numbers go raw,
395             # strings get single-quoted with embedded ' and \ escaped, undef → NULL.
396             sub _sql_quote_value {
397 0     0     my ($v) = @_;
398 0 0         return 'null' unless defined $v;
399 0 0         die "insert_aggregated: cannot quote a reference (got " . ref($v) . ")"
400             if ref $v;
401 0 0         return $v if $v =~ /\A-?(?:0|[1-9][0-9]*)(?:\.[0-9]+)?(?:[eE][-+]?[0-9]+)?\z/;
402 0           my $s = $v;
403 0           $s =~ s/\\/\\\\/g;
404 0           $s =~ s/'/\\'/g;
405 0           return "'$s'";
406             }
407              
408             sub kill_query {
409 0     0 1   my ($self, $query_id, $cb, %opts) = @_;
410 0 0 0       die "Usage: \$ch->kill_query(\$query_id, \$cb, [%opts])"
411             unless defined $query_id && ref($cb) eq 'CODE';
412 0 0         die "kill_query: invalid query_id '$query_id'"
413             unless $query_id =~ /\A[A-Za-z0-9_\-]+\z/;
414 0 0         my $mode = $opts{async} ? 'async' : 'sync';
415 0           $self->query("kill query where query_id = '$query_id' $mode", $cb);
416             }
417              
418             # Per-connection latency tracking. EV::ClickHouse is a blessed scalar
419             # (XS struct pointer) so we can't stash hash slots on it. We keep a
420             # lexical refaddr-keyed map and hold the entry's lifetime via a guard
421             # object stored inside the wrapper closure: when the user replaces
422             # on_query_complete (or XS DESTROY clears it), the closure is freed,
423             # the guard goes out of scope, and the %DUR_STATE entry is reaped.
424             my %DUR_STATE;
425             {
426             package EV::ClickHouse::_DurGuard;
427             # During global destruction %DUR_STATE may have already been
428             # reaped; touching it then is undefined.
429             sub DESTROY {
430 0 0   0     return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
431 0           delete $DUR_STATE{ ${ $_[0] } };
  0            
432             }
433             }
434              
435             sub track_query_durations {
436 0     0 1   my ($self, $size) = @_;
437 0   0       $size //= 1024;
438 0           my $key = refaddr($self);
439 0 0         if ($size == 0) {
440 0 0         if (my $st = delete $DUR_STATE{$key}) {
441 0           $self->on_query_complete($st->{prev}); # undef restores no-handler
442             }
443 0           return $self;
444             }
445 0           my $st = $DUR_STATE{$key};
446 0 0         if (!$st) {
447 0           $st = $DUR_STATE{$key} = {
448             size => $size,
449             buf => [],
450             pos => 0,
451             prev => $self->on_query_complete,
452             };
453 0           my $guard = bless \(my $k = $key), 'EV::ClickHouse::_DurGuard';
454 0           my $prev = $st->{prev};
455             $self->on_query_complete(sub {
456             # Use captured $key (not refaddr at fire time) so we still
457             # reach the right ring after DESTROY zeroed the SV.
458 0     0     my $ring = $DUR_STATE{$key};
459 0 0         if ($ring) {
460 0           my $dur = $_[4];
461 0 0 0       if (defined $dur && $dur >= 0) {
462 0 0         if (@{ $ring->{buf} } < $ring->{size}) {
  0            
463 0           push @{ $ring->{buf} }, $dur;
  0            
464             } else {
465 0           $ring->{buf}[ $ring->{pos} ] = $dur;
466 0           $ring->{pos} = ($ring->{pos} + 1) % $ring->{size};
467             }
468             }
469             }
470 0 0         $prev->(@_) if $prev;
471 0           $guard; # keep the guard alive for the closure's lifetime
472 0           });
473             } else {
474             # Resize: linearize chronological order (oldest at $st->{pos}
475             # once the ring is full), keep the newest min(N, $size) samples.
476             # Plain shift would drop by physical index instead of by age.
477 0           my $buf = $st->{buf};
478 0 0         if (@$buf >= $st->{size}) {
479             # Ring full: chronological is buf[pos..end] then buf[0..pos-1].
480 0           my $pos = $st->{pos} % $st->{size};
481 0           @$buf = (@{$buf}[$pos .. $#$buf], @{$buf}[0 .. $pos - 1]);
  0            
  0            
482             }
483             # Insertion order is now buf[0..n-1] — already chronological.
484 0           shift @$buf while @$buf > $size;
485 0           $st->{size} = $size;
486             # After linearization the oldest item is at index 0. While the
487             # ring is still filling, push appends and pos is ignored; once
488             # full, the next overwrite must target the oldest (index 0).
489 0           $st->{pos} = 0;
490             }
491 0           return $self;
492             }
493              
494             sub query_duration_p {
495 0     0 0   my ($self, $p) = @_;
496 0 0         my $st = $DUR_STATE{ refaddr($self) } or return undef;
497 0           my @s = sort { $a <=> $b } @{ $st->{buf} };
  0            
  0            
498 0 0         return undef unless @s;
499 0 0         $p = 0 if $p < 0;
500 0 0         $p = 1 if $p > 1;
501 0           $s[ int($p * (@s - 1) + 0.5) ];
502             }
503              
504             sub query_duration_count {
505 0 0   0 0   my $st = $DUR_STATE{ refaddr($_[0]) } or return 0;
506 0           scalar @{ $st->{buf} };
  0            
507             }
508              
509             # Local in-flight cancel guarded by query_id match. Only triggers
510             # cancel() if the connection's current in-flight query (last_query_id)
511             # matches $query_id, so a race where the intended query has already
512             # finished and a different one is now running can't silently kill the
513             # wrong query. Returns 1 if it cancelled, 0 if the id didn't match.
514             sub cancel_by_query_id {
515 0     0 1   my ($self, $query_id) = @_;
516 0 0 0       die "cancel_by_query_id: query_id required" unless defined $query_id && length $query_id;
517 0           my $cur = $self->last_query_id;
518 0 0 0       return 0 unless defined $cur && $cur eq $query_id;
519 0           $self->cancel;
520 0           return 1;
521             }
522              
523             # Retry a query over the same connection with exponential backoff,
524             # only on retryable server errors. Falls through to the user's $cb
525             # with the final result (success or last error) — never invokes $cb
526             # more than once. Per-attempt $settings are honored; pass via the
527             # settings => \%hash key.
528             #
529             # $ch->retry("select * from t",
530             # retries => 3, backoff => 0.5, jitter => 0.25,
531             # cb => sub { my ($rows, $err) = @_; ... });
532             sub retry {
533 0     0 1   my ($self, $sql, %opts) = @_;
534 0   0       my $cb = delete $opts{cb} // die "retry: cb required";
535 0   0       my $tries = delete $opts{retries} // 3;
536 0   0       my $delay = delete $opts{backoff} // 0.25;
537 0   0       my $jitter = delete $opts{jitter} // 0;
538 0           my $settings = delete $opts{settings};
539 0 0         die "retry: cb must be a coderef" if ref($cb) ne 'CODE';
540              
541 0           my $attempt = 0;
542 0           my $timer;
543             my $dispatch;
544             $dispatch = sub {
545 0     0     $attempt++;
546             my $on_done = sub {
547 0           my ($rows, $err) = @_;
548 0 0 0       if (!$err || $attempt > $tries
      0        
549             || !EV::ClickHouse->is_retryable_error($self->last_error_code)) {
550 0           undef $dispatch; # collapse the cycle on terminal call
551 0           return $cb->($rows, $err);
552             }
553 0           my $wait = $delay * (2 ** ($attempt - 1));
554 0 0         $wait += rand($wait * $jitter) if $jitter > 0;
555 0           $timer = EV::timer($wait, 0, sub { undef $timer; $dispatch->() });
  0            
  0            
556 0           };
557             # Synchronous query() can croak (e.g. "not connected" if the
558             # connection was finish()d between attempts). Catch and route to
559             # the user cb instead of letting it escape into the event loop.
560 0           my $ok = eval {
561 0 0         if ($settings) { $self->query($sql, $settings, $on_done) }
  0            
562 0           else { $self->query($sql, $on_done) }
563 0           1;
564             };
565 0 0         if (!$ok) {
566 0   0       my $e = $@ || 'unknown error';
567 0           undef $dispatch;
568 0           $cb->(undef, $e);
569             }
570 0           };
571 0           $dispatch->();
572 0           return;
573             }
574              
575             # Schema introspection: $ch->for_table('db.t', sub { my ($info, $err) = @_; ... })
576             # Delivers { columns => [{name=>..., type=>...}, ...] } or (undef, $err).
577             sub for_table {
578 0     0 1   my ($self, $table, $cb) = @_;
579 0 0 0       die "Usage: \$ch->for_table(\$table, \$cb)" unless defined $table && ref($cb) eq 'CODE';
580 0 0         die "for_table: invalid table name '$table'"
581             unless $table =~ $RE_TABLE;
582             # HTTP needs an explicit FORMAT; native ignores it and returns
583             # typed rows that respect the connection's named_rows setting.
584 0           my $sql = "describe table $table";
585 0 0         $sql .= " format TabSeparated" if !$self->server_revision;
586             $self->query($sql, sub {
587 0     0     my ($rows, $err) = @_;
588 0 0         return $cb->(undef, $err) if $err;
589             # Connection may be in named_rows mode, in which case each row
590             # is a hashref keyed by the DESCRIBE column names. Handle both.
591             my @cols = map {
592             ref $_ eq 'HASH'
593             ? { name => $_->{name}, type => $_->{type} }
594 0 0         : { name => $_->[0], type => $_->[1] };
595 0   0       } @{ $rows // [] };
  0            
596 0           $cb->({ columns => \@cols });
597 0           });
598             }
599              
# Discover the dynamic JSON path layout for a JSON/Object('json') column.
# Walks the Map(String, String) returned by JSONAllPathsWithTypes, deduped
# across the table. The callback receives an arrayref of
# { path => ..., type => ... } hashes, or (undef, $err) on a query error.
# Dies on a bad argument list, a table not matching $RE_TABLE, or a
# column not matching $RE_IDENT.
sub for_json_paths {
    my ($self, $table, $column, $cb) = @_;
    die "Usage: \$ch->for_json_paths(\$table, \$column, \$cb)"
        unless defined $table && defined $column && ref($cb) eq 'CODE';
    die "for_json_paths: invalid table '$table'"
        unless $table =~ $RE_TABLE;
    die "for_json_paths: invalid column '$column'"
        unless $column =~ $RE_IDENT;
    # Quote both identifiers (as row_count does) so names that collide
    # with SQL keywords are still accepted by the parser.
    my $tbl = EV::ClickHouse->bind_ident($table);
    my $col = EV::ClickHouse->bind_ident($column);
    # Single arrayJoin to avoid Cartesian; reference the alias to look
    # up the type so each (path, type) pair stays correlated.
    my $sql = "select distinct path, m[path] as type from ("
            . " select m, arrayJoin(mapKeys(m)) as path"
            . " from (select JSONAllPathsWithTypes($col) as m from $tbl)"
            . ") order by path";
    # HTTP needs explicit format; native returns typed rows directly.
    $sql .= " format TabSeparated" if !$self->server_revision;
    $self->query($sql, sub {
        my ($rows, $err) = @_;
        return $cb->(undef, $err) if $err;
        # Accept both arrayref rows and named_rows hashref rows.
        $cb->([ map {
            ref $_ eq 'HASH'
                ? { path => $_->{path}, type => $_->{type} }
                : { path => $_->[0],    type => $_->[1] }
        } @{ $rows // [] } ]);
    });
}
631              
# is_healthy: ping with a deadline.
#   $ch->is_healthy($cb, [$timeout])      # $timeout defaults to 5 seconds
# The callback fires exactly once with (1, $err) on success ($err is
# whatever false value ping reported) or (0, $err_msg) on timeout or
# ping error. A ping() that croaks synchronously is reported through the
# callback as well instead of escaping to the caller. The connection
# itself isn't disturbed - failure does not call finish/reset; the
# caller decides recovery.
sub is_healthy {
    my ($self, $cb, $timeout) = @_;
    die "Usage: \$ch->is_healthy(\$cb, [\$timeout])" unless ref($cb) eq 'CODE';
    $timeout //= 5;
    my $done = 0;
    my $t;
    # Single completion path: first caller wins, the timer is released,
    # and later completions (late pong, late timeout) are ignored.
    my $finish = sub {
        return if $done;
        $done = 1;
        undef $t;
        $cb->(@_);
    };
    $t = EV::timer($timeout, 0, sub {
        $finish->(0, "health check timeout after ${timeout}s");
    });
    my $ok = eval {
        $self->ping(sub {
            my (undef, $err) = @_;
            # Report a real 0 on failure (not the empty string that
            # !$err yields) to match the documented contract.
            $finish->($err ? 0 : 1, $err);
        });
        1;
    };
    if (!$ok) {
        my $e = $@ || 'unknown error';
        $finish->(0, $e);
    }
    return;
}
655              
# One-shot latency probe: issue a single PING and time it with EV::time.
# The callback gets ($seconds, undef) when the ping succeeds and
# (undef, $err) when it reports an error. Cheaper than installing
# track_query_durations for a one-off measurement.
sub ping_round_trip {
    my ($self, $cb) = @_;
    ref($cb) eq 'CODE'
        or die "Usage: \$ch->ping_round_trip(\$cb)";
    my $sent_at = EV::time();
    my $on_pong = sub {
        my $err = $_[1];
        if ($err) {
            return $cb->(undef, $err);
        }
        return $cb->(EV::time() - $sent_at, undef);
    };
    $self->ping($on_pong);
    return;
}
670              
# Install an on_query_complete observer that forwards to $cb only when
# the reported duration is defined and >= $threshold seconds. Whatever
# handler was installed before is still invoked afterwards for every
# query, and is also returned so the caller can put it back later.
sub slow_query_log {
    my ($self, $threshold, $cb) = @_;
    (defined $threshold && ref($cb) eq 'CODE')
        or die "Usage: \$ch->slow_query_log(\$threshold, \$cb)";
    my $previous = $self->on_query_complete;
    my $observer = sub {
        # on_query_complete args: ($query_id, $rows, $bytes, $code, $dur, $err)
        my $elapsed = $_[4];
        if (defined $elapsed && $elapsed >= $threshold) {
            $cb->(@_);
        }
        $previous->(@_) if $previous;
    };
    $self->on_query_complete($observer);
    return $previous;
}
688              
# Fetch one value from system.settings.
#   $ch->server_setting($name, sub { my ($value, $err) = @_; ... })
# $name must match $RE_IDENT (dies otherwise) and is passed as a bound
# parameter, never spliced into the SQL text. The callback receives the
# value, undef when no row matched, or (undef, $err) on a query error.
sub server_setting {
    my ($self, $name, $cb) = @_;
    die "Usage: \$ch->server_setting(\$name, \$cb)"
        unless defined $name && ref($cb) eq 'CODE';
    die "server_setting: invalid setting name '$name'"
        unless $name =~ $RE_IDENT;
    $self->query(
        "select value from system.settings where name = {n:String}",
        { params => { n => $name } },
        sub {
            my ($rows, $err) = @_;
            return $cb->(undef, $err) if $err;
            return $cb->(undef) unless $rows && @$rows;
            # In named_rows mode the row is a hashref keyed by column
            # name; indexing it as an array would die.
            my $row = $rows->[0];
            $cb->(ref $row eq 'HASH' ? $row->{value} : $row->[0]);
        },
    );
}
708              
# Count rows in a table, optionally with a server-side WHERE filter.
#   $ch->row_count($table, [$where], sub { my ($n, $err) = @_; ... })
# $where is interpolated literally so the caller is responsible for
# its safety (use parameterized queries via settings.params for
# user-supplied predicates). The callback receives the count (0 when the
# result set is empty) or (undef, $err).
sub row_count {
    my ($self, $table, $where, $cb) = @_;
    # Two-argument form: the callback sits in the $where slot.
    ($where, $cb) = (undef, $where) if ref($where) eq 'CODE';
    die "Usage: \$ch->row_count(\$table, [\$where], \$cb)"
        unless defined $table && ref($cb) eq 'CODE';
    die "row_count: invalid table '$table'" unless $table =~ $RE_TABLE;
    my $tbl = EV::ClickHouse->bind_ident($table);
    my $sql = "select count() from $tbl";
    $sql .= " where $where" if defined $where && length $where;
    $self->query($sql, sub {
        my ($rows, $err) = @_;
        return $cb->(undef, $err) if $err;
        return $cb->(0) unless $rows && @$rows;
        # The result has exactly one column. In named_rows mode the row
        # is a hashref, so take its only value rather than index [0].
        my $row = $rows->[0];
        $cb->(ref $row eq 'HASH' ? (values %$row)[0] : $row->[0]);
    });
}
728              
# Approx on-disk + uncompressed sizes from system.parts (active parts
# only). The callback receives
#   { rows => N, bytes_on_disk => N, data_uncompressed_bytes => N }
# with missing values reported as 0, or (undef, $err) on a query error.
# A dotted name ("db.table") additionally filters on the database.
sub table_size {
    my ($self, $table, $cb) = @_;
    die "Usage: \$ch->table_size(\$table, \$cb)"
        unless defined $table && ref($cb) eq 'CODE';
    die "table_size: invalid table '$table'" unless $table =~ $RE_TABLE;
    my ($db, $name) = $table =~ /\./ ? split(/\./, $table, 2)
                                     : (undef, $table);
    # The sums are aliased so hashref rows (named_rows mode) have stable
    # keys; the aliases deliberately differ from the source column names.
    my $sql = "select sum(rows) as total_rows,"
            . " sum(bytes_on_disk) as disk_bytes,"
            . " sum(data_uncompressed_bytes) as raw_bytes"
            . " from system.parts where active and table = {t:String}"
            . (defined $db ? " and database = {d:String}" : "");
    $self->query($sql, {
        params => {
            t => $name,
            (defined $db ? (d => $db) : ()),
        },
    }, sub {
        my ($rows, $err) = @_;
        return $cb->(undef, $err) if $err;
        my $r = $rows && @$rows ? $rows->[0] : [0, 0, 0];
        # Normalise both row shapes to a positional triple.
        my ($n, $disk, $raw) = ref $r eq 'HASH'
            ? @{$r}{qw(total_rows disk_bytes raw_bytes)}
            : @$r;
        $cb->({
            rows                    => $n    // 0,
            bytes_on_disk           => $disk // 0,
            data_uncompressed_bytes => $raw  // 0,
        });
    });
}
758              
# Shortcut for SYSTEM RELOAD DICTIONARY. The name must match $RE_TABLE
# and is backtick-quoted via bind_ident before being sent; $cb is handed
# to query() unchanged.
sub dictionary_reload {
    my ($self, $name, $cb) = @_;
    (defined $name && ref($cb) eq 'CODE')
        or die "Usage: \$ch->dictionary_reload(\$name, \$cb)";
    $name =~ $RE_TABLE
        or die "dictionary_reload: invalid name '$name'";
    my $quoted = EV::ClickHouse->bind_ident($name);
    return $self->query("system reload dictionary " . $quoted, $cb);
}
768              
# Shortcut for SYSTEM REFRESH VIEW (server >= 23.12). The view name must
# match $RE_TABLE and is backtick-quoted via bind_ident before being
# sent; $cb is handed to query() unchanged.
sub refresh_view {
    my ($self, $name, $cb) = @_;
    (defined $name && ref($cb) eq 'CODE')
        or die "Usage: \$ch->refresh_view(\$name, \$cb)";
    $name =~ $RE_TABLE
        or die "refresh_view: invalid name '$name'";
    my $quoted = EV::ClickHouse->bind_ident($name);
    return $self->query("system refresh view " . $quoted, $cb);
}
778              
# Poll system.mutations until a table's incomplete mutations finish.
# ALTER ... UPDATE/DELETE is asynchronous; this resolves once the
# mutation(s) reach is_done=1. $cb->({ pending => 0 }) on success,
# $cb->(undef, $err) on a failed mutation / query error / timeout.
#   poll        => seconds between polls (default 1)
#   timeout     => give up after N seconds (optional)
#   mutation_id => wait only for this specific mutation (optional)
# Dies on a bad argument list or a table not matching $RE_TABLE.
sub wait_mutation {
    my ($self, $table, $cb, %opts) = @_;
    die "Usage: \$ch->wait_mutation(\$table, \$cb, [%opts])"
        unless defined $table && ref($cb) eq 'CODE';
    die "wait_mutation: invalid table '$table'" unless $table =~ $RE_TABLE;
    my $poll    = $opts{poll} // 1;   # defined-or: an explicit poll => 0 is honored
    my $timeout = $opts{timeout};
    my $mid     = $opts{mutation_id};
    my ($db, $name) = $table =~ /\./ ? split(/\./, $table, 2) : (undef, $table);

    # A mutation that keeps failing stays is_done=0 with latest_fail_reason
    # set, so filtering on is_done=0 surfaces both running and failing ones.
    my $sql = "select count() as pending,"
            . " anyIf(latest_fail_reason, latest_fail_reason != '') as fail"
            . " from system.mutations"
            . " where table = {t:String} and is_done = 0"
            . (defined $db  ? " and database = {d:String}"    : "")
            . (defined $mid ? " and mutation_id = {m:String}" : "");
    my %params = (t => $name);
    $params{d} = $db  if defined $db;
    $params{m} = $mid if defined $mid;

    my $started     = EV::time();
    my $fail_streak = 0;   # consecutive polls that saw a fail reason
    my $timer; my $poll_once;
    # $poll_once refers to itself; every terminal path undefs it so the
    # closure is released once the wait is over.
    $poll_once = sub {
        # query() can croak synchronously (e.g. "not connected" if the
        # connection was finished between polls); route that to $cb
        # instead of letting it escape into the event loop.
        my $ok = eval {
            $self->query($sql, { params => \%params }, sub {
                my ($rows, $err) = @_;
                if ($err) { undef $poll_once; return $cb->(undef, $err) }
                my $r = $rows && @$rows ? $rows->[0] : [0, ''];
                # Rows are hashrefs keyed by the select aliases when the
                # connection is in named_rows mode, arrayrefs otherwise.
                my ($pending, $fail) = ref $r eq 'HASH'
                    ? @{$r}{qw(pending fail)}
                    : @$r;
                # Require a fail reason to persist across polls: a single
                # transient latest_fail_reason can clear on the mutation's
                # next retry, so one sighting is not a terminal failure.
                if (defined $fail && length $fail) {
                    if (++$fail_streak >= 2) {
                        undef $poll_once;
                        return $cb->(undef, "wait_mutation: $fail");
                    }
                } else {
                    $fail_streak = 0;
                }
                if (!$pending) {
                    undef $poll_once;
                    return $cb->({ pending => 0 });
                }
                if (defined $timeout && EV::time() - $started >= $timeout) {
                    undef $poll_once;
                    return $cb->(undef,
                        "wait_mutation: timed out after ${timeout}s");
                }
                # Still pending: schedule the next poll.
                $timer = EV::timer($poll, 0, sub { undef $timer; $poll_once->() });
            });
            1;
        };
        if (!$ok) {
            my $e = $@ || 'unknown error';
            undef $poll_once;
            $cb->(undef, $e);
        }
    };
    $poll_once->();
    return;
}
854              
# Break a clickhouse[+proto]://[user[:pass]@]host[:port][/db][?qs] URI
# into a plain hashref without opening a connection, so tooling can
# vet user-supplied URIs before instantiation. Returns undef when the
# URI is undefined or does not match the expected shape.
sub parse_uri {
    my ($class, $uri) = @_;
    return undef unless defined $uri;
    # A list-context match yields one element per capture group on
    # success and an empty list on failure.
    my @captures = $uri =~ m{
        ^clickhouse(?:\+(\w+))?://
        (?:([^:@]*?)(?::([^@]*))?\@)?
        (\[[^\]]+\]|[^/:?]+)
        (?::(\d+))?
        (?:/([^?]*))?
        (?:\?(.*))?$
    }x;
    return undef unless @captures;
    my ($scheme, $login, $secret, $host, $port, $path, $query) = @captures;
    # Strip the brackets from a bracketed (IPv6-style) host.
    $host =~ s/^\[(.*)\]$/$1/;
    my %conf = (host => $host);
    if (defined $scheme) {
        $conf{protocol} = $scheme;
    }
    if (defined $login && $login ne '') {
        $conf{user} = _uri_unescape($login);
    }
    if (defined $secret) {
        $conf{password} = _uri_unescape($secret);
    }
    if (defined $port) {
        $conf{port} = $port + 0;
    }
    if (defined $path && $path ne '') {
        $conf{database} = _uri_unescape($path);
    }
    # Query-string keys land at the top level (path-derived host/port/etc
    # win on collision) so the resulting hash can be passed verbatim to new().
    _uri_qs_into($query, \%conf);
    # Default applied last so the query string can still choose a protocol.
    $conf{protocol} //= 'http';
    return \%conf;
}
884              
# is_retryable_error($code) -> 1 or 0. Usable as a class or instance
# method; the invocant is ignored. Flags the common transient ClickHouse
# error codes for which an automatic retry is reasonable.
# Source: ClickHouse src/Common/ErrorCodes.cpp.
my %RETRYABLE;
$RETRYABLE{$_} = 1 for (
    159,    # TIMEOUT_EXCEEDED
    202,    # TOO_MANY_SIMULTANEOUS_QUERIES
    203,    # NO_FREE_CONNECTION
    209,    # SOCKET_TIMEOUT
    210,    # NETWORK_ERROR
    241,    # MEMORY_LIMIT_EXCEEDED (often transient under contention)
    242,    # TABLE_IS_READ_ONLY (replica catching up)
    252,    # TOO_MANY_PARTS (merge backlog, retry after backoff)
    285,    # TOO_FEW_LIVE_REPLICAS
    319,    # UNKNOWN_STATUS_OF_INSERT (idempotent insert salvages this)
    373,    # SESSION_IS_LOCKED
    # NB: 394 (QUERY_WAS_CANCELLED) is left out on purpose. A
    # cancellation expresses caller intent; auto-retrying would loop
    # and burn server resources for no benefit.
    439,    # CANNOT_SCHEDULE_TASK
    999,    # KEEPER_EXCEPTION
);
sub is_retryable_error {
    my ($self_or_class, $code) = @_;
    defined $code or return 0;
    # "+ 0" normalises numeric strings such as '241' to the hash key.
    return $RETRYABLE{ $code + 0 } ? 1 : 0;
}
911              
# bind_ident($name) -> backtick-quoted identifier for splicing into SQL.
# Usable as a class or instance method; the invocant is ignored. The
# name is split on dots and every component must match $RE_IDENT
# (alnum + underscore, no leading digit); anything else dies. Because
# $RE_IDENT cannot match a backtick, components are wrapped without any
# escaping.
sub bind_ident {
    my ($self_or_class, $name) = @_;
    (defined $name && length $name)
        or die "bind_ident: identifier must be defined and non-empty";
    # Limit -1 keeps trailing empty fields so "db." is caught below.
    my @components = split /\./, $name, -1;
    # Empty components are reported for the whole name before any
    # per-component validation runs.
    for my $part (@components) {
        die "bind_ident: empty component in '$name'" if $part eq '';
    }
    my @quoted;
    for my $part (@components) {
        $part =~ $RE_IDENT
            or die "bind_ident: invalid identifier component '$part' in '$name'";
        push @quoted, "`$part`";
    }
    return join '.', @quoted;
}
930              
# Capability table: feature name => minimum native server revision.
# Lets user code branch on protocol features by name instead of
# scattering numeric revisions around. HTTP connections have no
# protocol revision (server_revision == 0), so server_supports is false
# for every listed feature on HTTP - by design.
my %FEATURES = (
    block_info          => 51903,   # DBMS_MIN_REVISION_WITH_BLOCK_INFO
    server_display_name => 54372,
    version_patch       => 54401,
    progress_writes     => 54420,
    server_timezone     => 54423,
    addendum            => 54458,
);
# server_supports($feature) -> true when the connected server's revision
# is at least the one listed for $feature. Returns 0 when called on a
# class name, with an undefined or unknown feature, or when
# server_revision is false.
sub server_supports {
    my ($self, $feature) = @_;
    return 0 if !ref $self;
    return 0 if !defined $feature;
    my $minimum = $FEATURES{$feature};
    return 0 if !defined $minimum;
    my $revision = $self->server_revision;
    return 0 unless $revision;
    return $revision >= $minimum;
}
953              
954             package EV::ClickHouse::Streamer;
955              
# Build the streamer state hash for insert_streamer and bless it.
# Recognised %opts: columns, settings, batch_size, on_batch_error,
# high_water, on_high_water, low_water. Dies when columns is given but
# is not a non-empty arrayref.
sub _new {
    my ($class, $ch, $table, %opts) = @_;
    # `columns => [@names]` enables named-row mode: push_row({}) hashes
    # are reordered into arrayrefs by the streamer so the caller need not
    # know column positions. `named => 1` is a tolerated alias, but
    # `columns` is what actually drives the lookup.
    my $columns = $opts{columns};
    if (defined $columns) {
        die "insert_streamer: columns must be a non-empty arrayref"
            unless ref($columns) eq 'ARRAY' && @$columns;
    }
    my $high_water = $opts{high_water} || 0;    # 0 = disabled
    # low_water: threshold at which await_drain callbacks fire. Falls
    # back to half of high_water. Tested with defined() so an explicit
    # `low_water => 0` (fire only when the buffer is fully empty) is
    # respected rather than treated as unset.
    my $low_water = defined $opts{low_water}
        ? $opts{low_water}
        : $high_water / 2;
    my %state = (
        ch                => $ch,
        table             => $table,
        settings          => $opts{settings},                  # per-insert hashref
        batch_size        => $opts{batch_size} || 10_000,
        on_batch_error    => $opts{on_batch_error} || sub { },  # per-failure cb
        high_water        => $high_water,
        on_high_water     => $opts{on_high_water} || sub { },   # ($buffered, $in_flight)
        low_water         => $low_water,
        columns           => $columns,
        high_water_active => 0,
        buffer            => [],
        in_flight         => 0,
        pending_finish    => undef,
        drain_waiters     => [],
        sticky_err        => undef,
    );
    return bless \%state, $class;
}
990              
# Thin method wrapper: hands the invocant and all arguments straight to
# EV::ClickHouse::_streamer_push_row.
sub push_row {
    return EV::ClickHouse::_streamer_push_row(@_);
}
992              
# Send the buffered rows as one insert, at most one batch in flight.
# No-op while a batch is in flight or the buffer is empty. The
# completion callback records the first error as sticky_err, reports
# every error to on_batch_error, re-flushes rows queued meanwhile, then
# wakes await_drain waiters and a pending finish() as appropriate.
# If insert() dies synchronously the streamer state is rolled back
# (batch returned to the head of the buffer, in_flight cleared) and the
# exception is rethrown.
sub _flush {
    my $self = shift;
    return if $self->{in_flight} || !@{ $self->{buffer} };
    # Detach the current buffer as the batch; rows pushed while it is
    # on the wire accumulate in a fresh buffer.
    my $batch = $self->{buffer};
    $self->{buffer}    = [];
    $self->{in_flight} = 1;
    my $cb = sub {
        my (undef, $err) = @_;
        $self->{in_flight} = 0;
        if ($err) {
            $self->{sticky_err} //= $err;
            $self->{on_batch_error}->($err);
        }
        # Drain any buffered rows queued during the batch
        $self->_flush;
        # Reset high_water latch once we drop below the threshold
        $self->{high_water_active} = 0
            if $self->{high_water_active}
            && @{ $self->{buffer} } < $self->{high_water};
        # Fire await_drain waiters only when the buffer is at/below
        # low_water AND nothing is in flight. _flush above may have
        # immediately re-dispatched (in_flight=1, buffer=[]); waking
        # the waiter then would lie about the streamer being idle.
        my $low = $self->{low_water};
        if (!$self->{in_flight}
            && @{ $self->{drain_waiters} }
            && @{ $self->{buffer} } <= ($low || 0)) {
            my @w = @{ $self->{drain_waiters} };
            $self->{drain_waiters} = [];
            $_->(undef) for @w;    # undef err = normal drain
        }
        # Notify finish() if no work remains
        if ($self->{pending_finish}
            && !$self->{in_flight}
            && !@{ $self->{buffer} }) {
            my $fcb = delete $self->{pending_finish};
            $fcb->(undef, $self->{sticky_err});
        }
    };
    my @opt = $self->{settings} ? ($self->{settings}) : ();
    my $sent = eval {
        $self->{ch}->insert($self->{table}, $batch, @opt, $cb);
        1;
    };
    return if $sent;
    # insert() died before accepting the batch. Without this rollback
    # in_flight would stay 1 forever (every later _flush returns early,
    # finish() never fires) and the detached rows would be lost.
    my $e = $@ || 'unknown error';
    $self->{in_flight} = 0;
    unshift @{ $self->{buffer} }, @$batch;
    die $e;
}
1035              
# Flush whatever is buffered and report completion through $cb.
# $cb->(undef, $sticky_err) fires right away when nothing is in flight
# and the buffer is empty after the flush attempt; otherwise $cb is
# parked in pending_finish. Dies unless $cb is a code ref.
sub finish {
    my ($self, $cb) = @_;
    ref($cb) eq 'CODE'
        or die "Usage: \$streamer->finish(\$cb)";
    $self->_flush;
    my $busy = $self->{in_flight} || @{ $self->{buffer} };
    if ($busy) {
        $self->{pending_finish} = $cb;
        return;
    }
    $cb->(undef, $self->{sticky_err});
    return;
}
1047              
# Read-only views of the streamer state.
# buffered_count: number of rows currently held in the buffer.
sub buffered_count {
    my ($self) = @_;
    return scalar @{ $self->{buffer} };
}
# in_flight: the in_flight flag as stored.
sub in_flight {
    my ($self) = @_;
    return $self->{in_flight};
}
# sticky_error: the stored sticky_err value (undef when none).
sub sticky_error {
    my ($self) = @_;
    return $self->{sticky_err};
}
1051              
# Register a callback to be woken once the buffered row count is at or
# below low_water (treated as 0 when unset/false). Meant to pair with
# on_high_water to close the backpressure loop:
#   on_high_water => sub { $producer->pause },
#   $streamer->await_drain(sub { $producer->resume });
# Dies unless $cb is a code ref.
sub await_drain {
    my ($self, $cb) = @_;
    ref($cb) eq 'CODE'
        or die "Usage: \$streamer->await_drain(\$cb)";
    my $threshold = $self->{low_water} || 0;
    # With nothing in flight and the buffer already at/below the
    # threshold there is no pending flush that could wake a waiter
    # later, so answer synchronously.
    if (!$self->{in_flight} && @{ $self->{buffer} } <= $threshold) {
        return $cb->(undef);
    }
    push @{ $self->{drain_waiters} }, $cb;
    return;
}
1069              
# Wipe the streamer's local state without finishing it: buffered rows,
# the sticky error and the high-water latch are cleared so a producer
# can keep pushing into the same object (e.g. after fixing a schema
# problem). The underlying $ch is NOT touched - a batch already on the
# wire still completes. Returns $self.
sub reset {
    my ($self) = @_;
    # Detach parked callbacks first so each one runs against the
    # already-clean state below.
    my $finish_cb = delete $self->{pending_finish};
    my $waiters   = delete $self->{drain_waiters};
    @{$self}{qw(buffer sticky_err high_water_active drain_waiters)}
        = ([], undef, 0, []);
    # Parked finish/drain callbacks are answered with a reset error
    # rather than dropped - a silently lost finish cb would leave the
    # producer waiting forever.
    if ($finish_cb) {
        $finish_cb->(undef, 'streamer reset');
    }
    for my $waiter (@{ $waiters || [] }) {
        $waiter->('streamer reset');
    }
    return $self;
}
1090              
# Learn the column names of the target table through for_table and
# store them in $self->{columns}, which switches the streamer to
# named-rows mode. $cb receives undef on success or the error on
# failure. Handy when the producer doesn't know (or shouldn't care
# about) the schema up front. Dies unless $cb is a code ref.
sub columns_from_table {
    my ($self, $cb) = @_;
    ref($cb) eq 'CODE'
        or die "Usage: \$streamer->columns_from_table(\$cb)";
    my $on_schema = sub {
        my ($info, $err) = @_;
        if ($err) {
            return $cb->($err);
        }
        my @names;
        push @names, $_->{name} for @{ $info->{columns} };
        $self->{columns} = \@names;
        $cb->(undef);
    };
    $self->{ch}->for_table($self->{table}, $on_schema);
    return;
}
1107              
1108             package EV::ClickHouse::Pool;
1109 48     48   561 use Scalar::Util qw(refaddr);
  48         78  
  48         130483  
1110              
# Built-in connection pool. Round-robin dispatch with least-busy fallback;
# each connection is independent (own auto_reconnect, own send_queue),
# so a hung query on one doesn't block the others. Every option other
# than size / circuit_threshold / circuit_cooldown is passed to
# EV::ClickHouse->new for each pool member.
#
#   my $pool = EV::ClickHouse::Pool->new(host => 'ch', size => 10, ...);
#   $pool->query($sql, $cb);
#   $pool->insert($table, $data, $cb);
#   $pool->drain(sub { ... });      # all connections drained
#   $pool->finish;
sub new {
    my ($class, %args) = @_;
    my $size = delete $args{size} || 4;
    die "Pool size must be >= 1" if $size < 1;
    # Circuit breaker per member. After `circuit_threshold` consecutive
    # query/insert failures, mark the member dead for `circuit_cooldown`
    # seconds; _pick skips dead members. 0 disables.
    my $threshold = delete $args{circuit_threshold} || 0;
    my $cooldown  = delete $args{circuit_cooldown}  || 30;
    my @members  = map { EV::ClickHouse->new(%args) } 1 .. $size;
    # One breaker slot per member, index-aligned with the members.
    my @breakers = map { +{ fails => 0, dead_until => 0 } } @members;
    my %pool = (
        conns     => \@members,
        idx       => 0,
        cb_thresh => $threshold,
        cb_cool   => $cooldown,
        cb_state  => \@breakers,
    );
    return bless \%pool, $class;
}
1142              
# Choose the member with the fewest in-flight queries, breaking ties
# round-robin. With circuit_threshold > 0, dead members are skipped
# (unless all are dead - then the breaker is bypassed). This is a hot
# path, so the selection itself is implemented in XS
# (EV::ClickHouse::_pool_pick) for ~5x lower per-pick cost.
sub _pick {
    my ($self) = @_;
    return EV::ClickHouse::_pool_pick($self);
}
1148              
# Map a member connection to its { fails, dead_until } breaker slot by
# object identity (refaddr). Returns undef when $ch is not a member.
sub _slot_for {
    my ($self, $ch) = @_;
    my $wanted = refaddr $ch;
    my $index  = 0;
    for my $member (@{ $self->{conns} }) {
        return $self->{cb_state}[$index] if refaddr($member) == $wanted;
        $index++;
    }
    return undef;
}
1158              
# Wrap a user callback so the circuit breaker sees each completion.
# $ch is either a member connection (slot found via _slot_for) or a
# member index. $user_cb is returned untouched when the breaker is
# disabled (cb_thresh false), $user_cb is not a code ref, or no slot
# exists. The slot bookkeeping is done in XS (_breaker_observe), so the
# wrapper body is a single XSUB call plus the user callback.
sub _cb_observer {
    my ($self, $ch, $user_cb, $observe_failures) = @_;
    $observe_failures //= 1;
    my ($thresh, $cool) = @{$self}{qw(cb_thresh cb_cool)};
    unless ($thresh && ref($user_cb) eq 'CODE') {
        return $user_cb;
    }
    my $slot = ref($ch) ? $self->_slot_for($ch) : $self->{cb_state}[$ch];
    unless ($slot) {
        return $user_cb;
    }
    return sub {
        # With $observe_failures off (all-dead fallback) only successes
        # are recorded: they reset the breaker on recovery, while
        # failures are skipped so under-load loss doesn't keep
        # extending dead_until.
        if ($observe_failures || !$_[1]) {
            EV::ClickHouse::_breaker_observe($slot, $_[1], $thresh, $cool);
        }
        $user_cb->(@_);
    };
}
1180              
# Run $method on a connection chosen by _pick. If the last argument is
# a code ref (the user callback) it is first wrapped by _cb_observer so
# the outcome updates that member's circuit-breaker slot.
sub _dispatch {
    my ($self, $method, @args) = @_;
    my $conn = $self->_pick;
    if (ref($args[-1]) eq 'CODE') {
        $args[-1] = $self->_cb_observer($conn, $args[-1]);
    }
    return $conn->$method(@args);
}
1190              
# Pool-level entry points that go through _dispatch, so a trailing
# callback is observed by the circuit breaker.
sub query  { my $pool = shift; return $pool->_dispatch(query  => @_) }
sub insert { my $pool = shift; return $pool->_dispatch(insert => @_) }
sub ping   { my $pool = shift; return $pool->_dispatch(ping   => @_) }

# Plain delegation to whichever member _pick returns; no callback
# wrapping is applied on these paths.
sub for_table       { my $pool = shift; return $pool->_pick->for_table(@_) }
sub iterate         { my $pool = shift; return $pool->_pick->iterate(@_) }
sub insert_streamer { my $pool = shift; return $pool->_pick->insert_streamer(@_) }
1198              
# Like _dispatch, but the target is the member at position $idx rather
# than whatever _pick would choose. A trailing code ref is still wrapped
# by _cb_observer, so circuit-breaker observation applies. Dies when
# $idx is outside the member list.
sub _dispatch_to {
    my ($self, $method, $idx, @args) = @_;
    my $members = $self->{conns};
    die "${method}_to: index $idx out of range"
        if $idx < 0 || $idx >= @$members;
    my $conn = $members->[$idx];
    if (ref($args[-1]) eq 'CODE') {
        $args[-1] = $self->_cb_observer($conn, $args[-1]);
    }
    return $conn->$method(@args);
}
# Pinned variants: the first argument is the member index.
sub query_to  { my $pool = shift; return $pool->_dispatch_to(query  => @_) }
sub insert_to { my $pool = shift; return $pool->_dispatch_to(insert => @_) }
1211              
# Hand out the member connection at position $idx so the caller can
# keep talking to that one member. Calls made directly on the returned
# object bypass the pool, so the circuit breaker cannot observe them -
# use with care. Dies when $idx is outside the member list.
sub nominate {
    my ($self, $idx) = @_;
    my $members = $self->{conns};
    if ($idx < 0 || $idx >= @$members) {
        die "nominate: index $idx out of range";
    }
    return $members->[$idx];
}
1220              
# Hedged read: dispatch the same query to N (default 2) distinct
# members and resolve with whichever returns first. Subsequent
# completions are silently dropped. Errors are reported only if every
# member fails. Recommended for tail-latency-sensitive selects on
# replicated tables; do NOT use for insert (would silently double-write
# on dedupe miss). $cb receives ($rows, undef, $member_idx) on success
# or (undef, $err) when every member fails.
#
# Options (key => value pairs between $sql and $cb):
#   hedge    => number of members to query (clamped to 1 .. candidates)
#   settings => passed to each member's query() ahead of the callback
#
# Dies when the callback is missing, when unknown options are given, or
# when the pool has no members. A member whose query() croaks
# synchronously is counted as a failed member instead of aborting the
# whole hedge, so the remaining picks are still dispatched and $cb fires
# exactly once.
sub hedged_query {
    my ($self, $sql, @rest) = @_;
    my $cb       = pop @rest;
    my %opts     = @rest;
    my $hedge_n  = delete $opts{hedge} // 2;
    my $settings = delete $opts{settings};
    die "hedged_query: callback required" unless ref($cb) eq 'CODE';
    die "hedged_query: unknown options: " . join(', ', sort keys %opts)
        if %opts;
    my @c = @{ $self->{conns} };
    die "hedged_query: no members" unless @c;

    # Filter out circuit-broken members. If the breaker tripped everywhere
    # fall back to the full set so the caller still hears something — but
    # in that fallback skip _cb_observer too, otherwise every failed hedge
    # extends each member's dead_until (resetting cooldown indefinitely
    # under load).
    my $now      = EV::time();
    my @alive    = grep { $self->{cb_state}[$_]{dead_until} <= $now } 0 .. $#c;
    my $all_dead = !@alive;
    @alive   = (0 .. $#c) if $all_dead;
    $hedge_n = @alive if $hedge_n > @alive;
    $hedge_n = 1      if $hedge_n < 1;

    # Draw $hedge_n distinct indices at random (sampling without
    # replacement: each pick is spliced out of the candidate list).
    my @pool = @alive;
    my @idx;
    while (@idx < $hedge_n) {
        push @idx, splice(@pool, int(rand(scalar @pool)), 1);
    }

    my $fired   = 0;              # user callback already invoked
    my $pending = scalar @idx;    # members that have not reported yet
    my $first_err;
    for my $i (@idx) {
        my $ch     = $c[$i];
        my $called = 0;           # this member's callback has run
        my $inner  = sub {
            my ($rows, $err) = @_;
            $called = 1;
            $pending--;
            return if $fired;
            if (!$err) {
                $fired = 1;
                # 3rd arg = winning member index, so callers can attribute
                # wins / track per-replica latency without scanning conns.
                $cb->($rows, undef, $i);
                return;
            }
            $first_err //= $err;
            if (!$pending) {
                $fired = 1;
                $cb->(undef, $first_err);
            }
        };
        # Pass $i (not $ch) so _cb_observer can index cb_state directly
        # instead of walking conns via _slot_for.
        my $obs = $self->_cb_observer($i, $inner, !$all_dead);

        # Guard the dispatch itself: a synchronous croak must not leave
        # earlier members in flight with the caller holding an exception.
        my $dispatched = eval {
            if ($settings) { $ch->query($sql, $settings, $obs) }
            else           { $ch->query($sql, $obs) }
            1;
        };
        next if $dispatched;
        my $croak = $@;
        # If the member's callback already ran, the exception came out of
        # the callback chain rather than the dispatch; re-throw it
        # unchanged instead of recording a second completion.
        die $croak if $called;
        $inner->(undef, "$croak" || "hedged_query: dispatch failed");
    }
    return;
}
1284              
# Circuit breaker introspection: per-member state for monitoring.
# Returns one hash ref per entry of $self->{cb_state}, in member order:
# a shallow copy of the slot's fields plus alive => 0|1, where alive is
# 1 when the slot's dead_until is not in the future (compared against
# EV::time()). The flag is normalised to a real 0/1 so it serialises as
# a number rather than an empty string for dead members.
sub circuit_state {
    my $self = shift;
    my $now  = EV::time();
    return map { +{ %$_, alive => ($_->{dead_until} <= $now ? 1 : 0) } }
        @{ $self->{cb_state} };
}
1293              
# Aggregate stats over the member list.

# Number of members in the pool.
sub size {
    my ($self) = @_;
    return scalar @{ $self->{conns} };
}

# Sum of every member's own pending_count.
sub pending_count {
    my ($self) = @_;
    my $total = 0;
    foreach my $member (@{ $self->{conns} }) {
        $total += $member->pending_count;
    }
    return $total;
}

# The member connections themselves, in pool order.
sub conns {
    my ($self) = @_;
    return @{ $self->{conns} };
}
1298              
# Run $cb once per pool member, synchronously and in pool order, passing
# ($conn, $idx). Handy for warm-up work (preloading dictionaries, session
# variables, a probe per member). An exception thrown by $cb for one
# member is swallowed so the remaining members are still visited, in line
# with the broadcast cancel/skip_pending/reset helpers. Dies with a usage
# message when $cb is not a code ref. Returns nothing.
sub with_each {
    my ($self, $cb) = @_;
    ref($cb) eq 'CODE'
        or die "Usage: \$pool->with_each(\$cb)";

    # Iterate over a snapshot of the member list.
    my @members  = @{ $self->{conns} };
    my $position = 0;
    foreach my $member (@members) {
        my $idx = $position++;
        eval { $cb->($member, $idx) };
    }
    return;
}
1312              
# Broadcast the same SELECT to every member and collect per-member
# results. Useful for `system.replicas`-style diagnostics where each
# shard needs to be queried directly. Callback fires once with an
# arrayref of { member => $i, rows => [...], err => $msg }, ordered
# by member index. Per-query settings are honoured. Dead members are
# included in the result with a "circuit open" error string rather
# than dispatched — the breaker would refuse them anyway.
#
# Options (key => value pairs between $sql and $cb):
#   settings => passed to each member's query() ahead of the callback
#
# Dies when the callback is missing, when an unrecognised option is
# given (same rule as hedged_query, so a misspelt `settings` key is not
# silently dropped), or when the pool has no members.
sub fan_out {
    my ($self, $sql, @rest) = @_;
    my $cb       = pop @rest;
    my %opts     = @rest;
    my $settings = delete $opts{settings};
    die "fan_out: callback required" unless ref($cb) eq 'CODE';
    die "fan_out: unknown options: " . join(', ', sort keys %opts)
        if %opts;
    my @c = @{ $self->{conns} };
    die "fan_out: no members" unless @c;

    # One result slot per member, filled in as members report.
    my @out  = map { +{ member => $_, rows => undef, err => undef } } 0 .. $#c;
    my $left = scalar @c;
    # Fires the user callback when the last outstanding member reports.
    my $deliver = sub { $cb->(\@out) unless --$left };
    my $now = EV::time();
    for my $i (0 .. $#c) {
        my $ch = $c[$i];
        # Short-circuit dead members so a long cooldown doesn't stall fan_out.
        if ($self->{cb_thresh}
            && $self->{cb_state}[$i]{dead_until} > $now) {
            $out[$i]{err} = "fan_out: member $i circuit open";
            $deliver->();
            next;
        }
        my $obs = $self->_cb_observer($i, sub {
            ($out[$i]{rows}, $out[$i]{err}) = @_;
            $deliver->();
        });
        # Wrap each member's dispatch so a synchronous croak (e.g.
        # "not connected" before auto_reconnect catches up) doesn't
        # strand the rest of the callbacks waiting on $left.
        eval {
            if ($settings) { $ch->query($sql, $settings, $obs) }
            else           { $ch->query($sql, $obs) }
            1;
        } or do {
            $out[$i]{err} = "$@" || "fan_out: dispatch failed";
            $deliver->();
        };
    }
    return;
}
1359              
# Checkout-style pin. Picks a member via _pick and calls
# $cb->($conn, $release). While the pin is held, a per-member counter in
# $self->{_pinned} (keyed by the connection's refaddr) stays above zero;
# _pick is expected to consult it so other callers are steered away.
# Useful for temp tables / SET / session state that must stay on one
# connection across several queries.
#
# $release is idempotent: only its first call decrements the counter, and
# the entry is removed once it drops to zero. If $cb throws, the pin is
# released and the exception is re-thrown. Dies with a usage message when
# $cb is not a code ref. Returns nothing.
sub with_session {
    my ($self, $cb) = @_;
    ref($cb) eq 'CODE'
        or die "Usage: \$pool->with_session(\$cb)";

    my $conn = $self->_pick;
    my $key  = refaddr($conn);
    my $held = $self->{_pinned}{$key} // 0;
    $self->{_pinned}{$key} = $held + 1;

    my $done    = 0;
    my $release = sub {
        return if $done++;
        delete $self->{_pinned}{$key} if --$self->{_pinned}{$key} <= 0;
    };

    my $ok = eval { $cb->($conn, $release); 1 };
    if (!$ok) {
        my $failure = $@;    # copy before $release can clobber $@
        $release->();
        die $failure;
    }
    return;
}
1383              
# Drain when ALL connections have completed pending work.
#
# Calls drain() on every member and invokes $cb->($err) once, after the
# last member has reported. $err is the first true error value any member
# passed to its drain callback, or undef if none did.
#
# $cb is required: it is validated up front so a missing callback fails
# at the call site instead of later inside a member's drain callback.
# An empty pool has nothing to wait for, so $cb is invoked immediately
# rather than never.
sub drain {
    my ($self, $cb) = @_;
    die "Usage: \$pool->drain(\$cb)" unless ref($cb) eq 'CODE';
    my @members = @{ $self->{conns} };
    my $left    = scalar @members;
    if (!$left) {
        $cb->(undef);
        return;
    }
    my $err;
    for my $c (@members) {
        $c->drain(sub {
            $err //= $_[0] if $_[0];
            $cb->($err) unless --$left;
        });
    }
    return;
}
1396              
# Call finish on every member, in pool order.
sub finish {
    my ($self) = @_;
    foreach my $member (@{ $self->{conns} }) {
        $member->finish;
    }
}
1398              
# Coordinated graceful shutdown: drain every member, then finish. If
# the optional $grace_seconds elapses before all members drain, force
# finish and report a timeout in the callback. Callback receives undef
# on clean shutdown, an error string on per-member drain error or
# timeout. $cb is optional.
#
#   $pool->shutdown(10, sub {
#       my ($err) = @_;
#       warn "shutdown: $err" if $err;
#       EV::break;
#   });
#
# The grace timer is only armed while the shutdown is still pending. If
# every member drains synchronously, $finalize has already run by the
# time the loop ends; arming the timer afterwards would leave a live
# watcher for $grace seconds and a $timer <-> closure reference cycle
# that nothing ever breaks. An empty pool finalizes immediately.
sub shutdown {
    my ($self, $grace, $cb) = @_;
    # Two-arg form: $pool->shutdown($cb). Treat the coderef as the cb
    # with no grace timer rather than silently dropping it.
    ($grace, $cb) = (undef, $grace) if ref($grace) eq 'CODE';
    $cb //= sub { };
    die "Usage: \$pool->shutdown([\$grace_seconds], \$cb)" unless ref($cb) eq 'CODE';

    my @members = @{ $self->{conns} };
    my $left    = scalar @members;
    my $err;
    my $timer;
    my $fired = 0;
    # Runs at most once, from whichever comes first: last drain or timeout.
    my $finalize = sub {
        return if $fired++;
        undef $timer;    # drop the watcher (and the closure it holds)
        $self->finish;
        $cb->($err);
    };

    if (!$left) {
        $finalize->();
        return;
    }

    for my $c (@members) {
        $c->drain(sub {
            $err //= $_[0] if $_[0];
            $finalize->() if --$left == 0;
        });
    }

    if (!$fired && $grace && $grace > 0) {
        $timer = EV::timer($grace, 0, sub {
            $err //= "Pool::shutdown timed out after ${grace}s";
            $finalize->();
        });
    }
    return;
}
1441              
# Broadcast helpers. Each one is applied to every member because the
# state involved (queued queries, in-flight cancellation, the socket) is
# owned per connection; acting on a single member would leave the rest
# untouched. A failure on one member is swallowed so the others are still
# reached.
sub cancel {
    my ($self) = @_;
    foreach my $member (@{ $self->{conns} }) {
        eval { $member->cancel };
    }
}

sub skip_pending {
    my ($self) = @_;
    foreach my $member (@{ $self->{conns} }) {
        eval { $member->skip_pending };
    }
}

sub reset {
    my ($self) = @_;
    foreach my $member (@{ $self->{conns} }) {
        eval { $member->reset };
    }
}
1449              
package EV::ClickHouse::Iterator;

# Thin object wrapper around an iterator state hash. The fields read here
# are err, done, batches (array ref) and ch (the owning connection).

# Delegates to EV::ClickHouse::_iterator_next with the caller's arguments.
sub next {
    return EV::ClickHouse::_iterator_next(@_);
}

# Whatever is stored in the err slot.
sub error {
    my ($self) = @_;
    return $self->{err};
}

# True only when the done flag is set AND no batches remain queued. While
# the flag is false its own value is returned and batches is not examined.
sub is_done {
    my ($self) = @_;
    my $finished = $self->{done};
    return $finished unless $finished;
    return !@{ $self->{batches} };
}

# Cancels via the connection stored in the ch slot.
sub cancel {
    my ($self) = @_;
    return $self->{ch}->cancel;
}
1457              
package EV::ClickHouse::Error;

# Small error value object: a (message, code) pair plus lookups for a
# symbolic name and a retryability flag. String context yields the
# message, so code that compares or prints $err as a string keeps working.
use overload
    '""'     => sub { $_[0]{message} },
    fallback => 1;

# ClickHouse error code => symbolic name, for the codes user code is most
# likely to branch on. Sourced from src/Common/ErrorCodes.cpp; the table
# is informational only and may be extended freely.
my %CODE_NAME = (
    0   => 'OK',
    27  => 'CANNOT_PARSE_INPUT_ASSERTION_FAILED',
    32  => 'ATTEMPT_TO_READ_AFTER_EOF',
    33  => 'CANNOT_READ_ALL_DATA',
    44  => 'ILLEGAL_COLUMN',
    47  => 'UNKNOWN_IDENTIFIER',
    60  => 'UNKNOWN_TABLE',
    62  => 'SYNTAX_ERROR',
    81  => 'UNKNOWN_DATABASE',
    86  => 'RECEIVED_ERROR_FROM_REMOTE_IO_SERVER',
    113 => 'UNKNOWN_SETTING',
    159 => 'TIMEOUT_EXCEEDED',
    164 => 'READONLY',
    192 => 'UNKNOWN_USER',
    193 => 'WRONG_PASSWORD',
    194 => 'REQUIRED_PASSWORD',
    202 => 'TOO_MANY_SIMULTANEOUS_QUERIES',
    203 => 'NO_FREE_CONNECTION',
    209 => 'SOCKET_TIMEOUT',
    210 => 'NETWORK_ERROR',
    225 => 'NO_ZOOKEEPER',
    236 => 'ABORTED',
    241 => 'MEMORY_LIMIT_EXCEEDED',
    242 => 'TABLE_IS_READ_ONLY',
    252 => 'TOO_MANY_PARTS',
    285 => 'TOO_FEW_LIVE_REPLICAS',
    319 => 'UNKNOWN_STATUS_OF_INSERT',
    341 => 'UNFINISHED',
    373 => 'SESSION_IS_LOCKED',
    389 => 'INSERT_WAS_DEDUPLICATED',
    394 => 'QUERY_WAS_CANCELLED',
    439 => 'CANNOT_SCHEDULE_TASK',
    497 => 'ACCESS_DENIED',
    516 => 'AUTHENTICATION_FAILED',
    999 => 'KEEPER_EXCEPTION',
);

# Constructor. Named arguments: message (defaults to '') and code
# (defaults to 0).
sub new {
    my $class = shift;
    my %args  = @_;
    my $self  = {
        message => (defined $args{message} ? $args{message} : ''),
        code    => (defined $args{code}    ? $args{code}    : 0),
    };
    return bless $self, $class;
}

# Convenience: build from ($ch, $err) — the pair callbacks typically
# receive. Returns undef when $err is undefined or empty. The code is
# taken from $ch->last_error_code, falling back to 0 if that call fails
# or yields a false value.
sub from_ch {
    my ($class, $ch, $err) = @_;
    if (!defined $err || !length $err) {
        return undef;
    }
    my $text = "$err";
    my $code = eval { $ch->last_error_code } || 0;
    return $class->new(message => $text, code => $code);
}

sub message {
    my ($self) = @_;
    return $self->{message};
}

sub code {
    my ($self) = @_;
    return $self->{code};
}

# Symbolic name for this error's code, or undef when the code is not in
# the table.
sub name {
    my ($self) = @_;
    return $CODE_NAME{ $self->{code} };
}

# Defers to EV::ClickHouse->is_retryable_error for this error's code.
sub is_retryable {
    my ($self) = @_;
    return EV::ClickHouse->is_retryable_error($self->{code});
}

# Class-level table introspection.

# Name for an arbitrary code; an undefined code is looked up as 0.
sub code_name {
    my (undef, $code) = @_;
    $code = 0 unless defined $code;
    return $CODE_NAME{$code};
}

# Every code in the table, in ascending numeric order.
sub known_codes {
    my @codes = keys %CODE_NAME;
    return sort { $a <=> $b } @codes;
}
1534              
1535             1;
1536              
1537             __END__