File Coverage

ClickHouse.xs
Criterion Covered Total %
statement 4 1324 0.3
branch 3 1330 0.2
condition n/a
subroutine n/a
pod n/a
total 7 2654 0.2


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4              
5             #include "EVAPI.h"
6             #include
7             #include
8             #include
9             #include
10             #include
11             #include
12             #include
13             #include
14             #include
15             #include
16             #include
17              
18             #define CH_MAX_DECOMPRESS_SIZE (128 * 1024 * 1024) /* 128 MB safety limit */
19              
20             #ifdef HAVE_LZ4
21             #include
22             #include "cityhash.h"
23              
24             #define CH_LZ4_METHOD 0x82
25             #define CH_CHECKSUM_SIZE 16
26             #define CH_COMPRESS_HEADER_SIZE 9 /* 1 (method) + 4 (compressed_size) + 4 (uncompressed_size) */
27             #endif
28              
29             #ifdef HAVE_OPENSSL
30             #include
31             #include
32             #include
33             #endif
34              
35             #include "ngx_queue.h"
36              
/* Forward typedefs for the three core structs defined further down. */
typedef struct ev_clickhouse_s ev_clickhouse_t;
typedef struct ev_ch_cb_s      ev_ch_cb_t;
typedef struct ev_ch_send_s    ev_ch_send_t;

/* Pointer aliases spelled like the Perl package names (EV::ClickHouse,
 * EV::Loop) -- presumably consumed by the XS typemap; confirm there. */
typedef ev_clickhouse_t *EV__ClickHouse;
typedef struct ev_loop  *EV__Loop;

/* Values stored in ev_clickhouse_t.magic. check_destroyed() treats
 * EV_CH_FREED as "release the struct once no callback is running". */
#define EV_CH_MAGIC 0xC11C4011
#define EV_CH_FREED 0xFEEDFACE

/* Wire protocol selector (ev_clickhouse_t.protocol). */
#define PROTO_HTTP   0
#define PROTO_NATIVE 1

/* Initial buffer capacities, in bytes. */
#define RECV_BUF_INIT 8192
#define SEND_BUF_INIT 4096

/* Identity announced by the native-protocol client. */
#define CH_CLIENT_NAME          "EV::ClickHouse"
#define CH_CLIENT_VERSION_MAJOR 0
#define CH_CLIENT_VERSION_MINOR 1

/* Protocol revision this client negotiates. Raising it unlocks server
 * features (extra Progress fields, parallel-replica extensions,
 * quota-key handshake additions, ...), but every step needs matching
 * client-side parsing; otherwise the server sends fields that would be
 * misframed. 54459 is the conservative value covering everything the
 * client currently parses. */
#define CH_CLIENT_REVISION 54459

/* Server revisions at which specific protocol features appear. */
#define DBMS_MIN_REVISION_WITH_BLOCK_INFO          51903
#define DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME 54372
#define DBMS_MIN_REVISION_WITH_VERSION_PATCH       54401
#define DBMS_MIN_REVISION_WITH_PROGRESS_WRITES     54420
#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE     54423
#define DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM    54458

/* Packet type codes sent by the client. */
#define CLIENT_HELLO  0
#define CLIENT_QUERY  1
#define CLIENT_DATA   2
#define CLIENT_CANCEL 3
#define CLIENT_PING   4

/* Packet type codes sent by the server. */
#define SERVER_HELLO           0
#define SERVER_DATA            1
#define SERVER_EXCEPTION       2
#define SERVER_PROGRESS        3
#define SERVER_PONG            4
#define SERVER_END_OF_STREAM   5
#define SERVER_PROFILE_INFO    6
#define SERVER_TOTALS          7
#define SERVER_EXTREMES        8
#define SERVER_LOG             10
#define SERVER_TABLE_COLUMNS   11
#define SERVER_PROFILE_EVENTS  14
#define SERVER_TIMEZONE_UPDATE 17

/* Query kind. */
#define QUERY_INITIAL 1

/* Query processing stage. */
#define STAGE_COMPLETE 2

/* Values of ev_clickhouse_t.native_state. */
#define NATIVE_IDLE             0
#define NATIVE_WAIT_HELLO       1
#define NATIVE_WAIT_RESULT      2
#define NATIVE_WAIT_INSERT_META 3

/* Opt-in bit flags controlling how column values are decoded. */
#define DECODE_DT_STR     (1 << 0) /* Date/DateTime/DateTime64 as string */
#define DECODE_DEC_SCALE  (1 << 1) /* Decimal as scaled NV */
#define DECODE_ENUM_STR   (1 << 2) /* Enum as its string label */
#define DECODE_NAMED_ROWS (1 << 3) /* results as arrayref of hashrefs */
112              
113             struct ev_clickhouse_s {
114             unsigned int magic;
115             struct ev_loop *loop;
116              
117             int fd;
118             ev_io rio, wio;
119             ev_timer timer;
120             int reading, writing, timing;
121             int connected, connecting;
122             int dns_pending; /* set while EV::cares is resolving the host;
123             query()/insert()/ping() queue against this
124             state so calls between new() and connect
125             don't croak with "not connected" */
126             unsigned int connect_gen; /* bumped by every start_connect; lets
127             fail_connection notice when the user
128             started a new connect from on_error */
129             int protocol; /* PROTO_HTTP or PROTO_NATIVE */
130              
131             #ifdef HAVE_OPENSSL
132             SSL_CTX *ssl_ctx;
133             SSL *ssl;
134             #endif
135             int tls_enabled;
136             char *tls_ca_file;
137             char *tls_cert_file; /* client certificate (mutual TLS) */
138             char *tls_key_file; /* client private key (mutual TLS) */
139              
140             /* connection params */
141             char *host, *user, *password, *database;
142             unsigned int port;
143              
144             /* send/recv buffers */
145             char *send_buf;
146             size_t send_len, send_pos, send_cap;
147             char *recv_buf;
148             size_t recv_len, recv_cap;
149              
150             /* native protocol state */
151             char *server_name;
152             char *server_display_name;
153             char *server_timezone;
154             unsigned int server_version_major, server_version_minor, server_revision;
155             unsigned int server_version_patch;
156             int native_state; /* NATIVE_IDLE, NATIVE_WAIT_HELLO, NATIVE_WAIT_RESULT, ... */
157             AV *native_rows; /* accumulate rows across Data blocks */
158             char *insert_data; /* pending TabSeparated data for two-phase INSERT */
159             size_t insert_data_len;
160             SV *insert_av; /* pending AV* of AV*s for arrayref INSERT */
161             char *insert_err; /* deferred error from unsupported INSERT encoding */
162              
163             /* queues */
164             ngx_queue_t cb_queue;
165             ngx_queue_t send_queue;
166             int pending_count;
167             int send_count;
168              
169             /* options */
170             char *session_id;
171             char *query_log_comment; /* prepended as a SQL block comment per query */
172             int compress;
173             double connect_timeout;
174             HV *default_settings; /* connection-level ClickHouse settings */
175              
176             SV *on_connect;
177             SV *on_error;
178             SV *on_progress;
179             SV *on_disconnect;
180             SV *on_query_complete; /* fires after each query (success or error) */
181             SV *on_query_start; /* fires when a query is dispatched */
182             SV *on_log; /* native SERVER_LOG packets */
183             SV *on_failover; /* multi-host: ($oh, $op, $nh, $np, $msg) */
184             char *last_tls_error; /* OpenSSL error from last failed handshake */
185             char **failover_hosts; /* parallel arrays: hosts + ports. */
186             unsigned int *failover_ports; /* NULL if multi-host failover disabled. */
187             int failover_n;
188             int failover_idx;
189             unsigned int failover_default_port;
190             double query_start_time; /* ev_now() captured in pipeline_advance */
191             int tls_skip_verify;
192             double query_timeout;
193             size_t max_query_size; /* 0 = unlimited; client-side croak guard */
194             size_t max_recv_buffer; /* 0 = unlimited; defensive recv ceiling */
195             int http_basic_auth; /* 0=X-ClickHouse-{User,Key} (default);
196             * 1=Authorization: Basic ... (for proxies) */
197             int auto_reconnect;
198             uint32_t decode_flags;
199             AV *native_col_names; /* column names from last native result */
200             AV *native_col_types; /* column type strings from last native result */
201             SV *on_drain; /* callback fired when pending_count drops to 0 */
202             char *last_query_id; /* query_id of the last dispatched query */
203             SV *on_trace; /* debug trace callback */
204             ev_timer ka_timer; /* keepalive timer */
205             double keepalive; /* keepalive interval (0 = disabled) */
206             int ka_timing;
207             unsigned int ka_in_flight; /* keepalive pings sent but not yet ack'd */
208             int callback_depth;
209             /* error info from last SERVER_EXCEPTION or HTTP error */
210             int32_t last_error_code;
211             /* profile info from last SERVER_PROFILE_INFO */
212             uint64_t profile_rows;
213             uint64_t profile_bytes;
214             uint64_t profile_rows_before_limit;
215             /* totals / extremes from last native query */
216             AV *native_totals;
217             AV *native_extremes;
218             /* reconnect backoff */
219             double reconnect_delay;
220             double reconnect_max_delay;
221             double reconnect_jitter; /* fractional [0, 1+]: actual delay
222             picks uniformly in [d, d*(1+jitter)] */
223             int reconnect_attempts;
224             int reconnect_max_attempts; /* 0 = unlimited */
225             int pending_addendum_finish; /* set when addendum partially written;
226             * io_cb completes finish_connect after drain */
227             ev_timer reconnect_timer;
228             int reconnect_timing;
229             /* on_progress throttling (0 = fire every packet) */
230             double progress_period;
231             double progress_last; /* ev_now() of last on_progress dispatch */
232             uint64_t progress_acc[5]; /* coalesced totals since last dispatch */
233             /* LowCardinality cross-block dictionary state */
234             SV ***lc_dicts; /* array of dictionaries, one per column */
235             uint64_t *lc_dict_sizes; /* size of each dictionary */
236             int lc_num_cols; /* number of columns with LC state */
237             };
238              
239             struct ev_ch_cb_s {
240             SV *cb;
241             int raw; /* return raw response body instead of parsed rows */
242             SV *on_data; /* per-query streaming callback (fires per block) */
243             SV *on_complete;/* per-query on_query_complete override (or NULL) */
244             double query_timeout; /* per-query timeout (0=use default) */
245             ngx_queue_t queue;
246             };
247              
248             struct ev_ch_send_s {
249             char *data; /* full HTTP request or native packet */
250             size_t data_len;
251             SV *cb;
252             char *insert_data; /* deferred TSV data for native INSERT */
253             size_t insert_data_len;
254             SV *insert_av; /* deferred AV* data for native INSERT */
255             int raw; /* return raw response body */
256             SV *on_data; /* per-query streaming callback */
257             SV *on_complete; /* per-query on_query_complete override */
258             double query_timeout; /* per-query timeout */
259             char *query_id; /* query_id for tracking */
260             ngx_queue_t queue;
261             };
262              
263             /* Forward declarations for helpers defined further down (or in xs/io.c)
264             * but called from earlier code in this file or from xs/*.c included
265             * before the definition site. */
266             static void timer_cb(EV_P_ ev_timer *w, int revents);
267             static void stop_keepalive(ev_clickhouse_t *self);
268             static void schedule_reconnect(ev_clickhouse_t *self);
269             static void lc_free_dicts(ev_clickhouse_t *self);
270             static void start_reading(ev_clickhouse_t *self);
271             static void stop_reading(ev_clickhouse_t *self);
272             static void start_writing(ev_clickhouse_t *self);
273             static void stop_writing(ev_clickhouse_t *self);
274             static void emit_error(ev_clickhouse_t *self, const char *msg);
275             static void emit_trace(ev_clickhouse_t *self, const char *fmt, ...);
276             static int cleanup_connection(ev_clickhouse_t *self);
277             static int cancel_pending(ev_clickhouse_t *self, const char *errmsg);
278             static int check_destroyed(ev_clickhouse_t *self);
279             static char *safe_strdup(const char *s);
280             static void failover_free(ev_clickhouse_t *self);
281             static int finish_connect(ev_clickhouse_t *self);
282             static int try_write(ev_clickhouse_t *self);
283             static int pipeline_advance(ev_clickhouse_t *self);
284              
/* CLEAR_CONNECTION_HANDLERS and CLEAR_PERSISTENT_STATE are shared by the
 * PL_dirty and the normal arm of DESTROY, so a new on_* slot or
 * persistent string only has to be added in one place. on_failover is
 * intentionally absent from CLEAR_CONNECTION_HANDLERS: failover_free()
 * releases it together with the host ring. */
#define CLEAR_CONNECTION_HANDLERS(self)        \
    do {                                       \
        CLEAR_SV((self)->on_connect);          \
        CLEAR_SV((self)->on_error);            \
        CLEAR_SV((self)->on_progress);         \
        CLEAR_SV((self)->on_disconnect);       \
        CLEAR_SV((self)->on_query_complete);   \
        CLEAR_SV((self)->on_query_start);      \
        CLEAR_SV((self)->on_log);              \
        CLEAR_SV((self)->on_drain);            \
        CLEAR_SV((self)->on_trace);            \
    } while (0)
300              
/* Release every owned string buffer and every retained Perl container on
 * the connection. Strings go through CLEAR_STR, Perl values through
 * CLEAR_SV; the release order is kept as-is. */
#define CLEAR_PERSISTENT_STATE(self)            \
    do {                                        \
        CLEAR_STR((self)->last_tls_error);      \
        CLEAR_STR((self)->last_query_id);       \
        CLEAR_STR((self)->host);                \
        CLEAR_STR((self)->user);                \
        CLEAR_STR((self)->password);            \
        CLEAR_STR((self)->database);            \
        CLEAR_STR((self)->session_id);          \
        CLEAR_STR((self)->query_log_comment);   \
        CLEAR_STR((self)->tls_ca_file);         \
        CLEAR_STR((self)->tls_cert_file);       \
        CLEAR_STR((self)->tls_key_file);        \
        CLEAR_STR((self)->server_name);         \
        CLEAR_STR((self)->server_display_name); \
        CLEAR_STR((self)->server_timezone);     \
        CLEAR_STR((self)->insert_err);          \
        CLEAR_STR((self)->recv_buf);            \
        CLEAR_STR((self)->send_buf);            \
        CLEAR_SV((self)->native_rows);          \
        CLEAR_SV((self)->native_col_names);     \
        CLEAR_SV((self)->native_col_types);     \
        CLEAR_SV((self)->native_totals);        \
        CLEAR_SV((self)->native_extremes);      \
        CLEAR_SV((self)->default_settings);     \
    } while (0)
326              
327             #include "xs/macros.h"
328             #include "xs/queues.c"
329              
330             /* --- watcher helpers --- */
331              
332 0           static void start_reading(ev_clickhouse_t *self) {
333 0 0         if (!self->reading && self->fd >= 0) {
    0          
334 0           ev_io_start(self->loop, &self->rio);
335 0           self->reading = 1;
336             }
337 0           }
338              
339 0           static void stop_reading(ev_clickhouse_t *self) {
340 0 0         if (self->reading) {
341 0           ev_io_stop(self->loop, &self->rio);
342 0           self->reading = 0;
343             }
344 0           }
345              
346 0           static void start_writing(ev_clickhouse_t *self) {
347 0 0         if (!self->writing && self->fd >= 0) {
    0          
348 0           ev_io_start(self->loop, &self->wio);
349 0           self->writing = 1;
350             }
351 0           }
352              
353 0           static void stop_writing(ev_clickhouse_t *self) {
354 0 0         if (self->writing) {
355 0           ev_io_stop(self->loop, &self->wio);
356 0           self->writing = 0;
357             }
358 0           }
359              
360 0           static void stop_timing(ev_clickhouse_t *self) {
361 0 0         if (self->timing) {
362 0           ev_timer_stop(self->loop, &self->timer);
363 0           self->timing = 0;
364             }
365 0           }
366              
367 0           static int check_destroyed(ev_clickhouse_t *self) {
368 0 0         if (self->magic == EV_CH_FREED && self->callback_depth == 0) {
    0          
369 0           Safefree(self);
370 0           return 1;
371             }
372 0           return 0;
373             }
374              
375             /* Free the per-connection failover host list (allocated by setter). */
376             /* Free just the host-list arrays. Called from _set_failover before
377             * re-populating + from failover_free below. Keeps on_failover alive. */
378 0           static void failover_free_hosts(ev_clickhouse_t *self) {
379 0 0         if (self->failover_hosts) {
380 0 0         for (int i = 0; i < self->failover_n; i++)
381 0 0         if (self->failover_hosts[i]) Safefree(self->failover_hosts[i]);
382 0           Safefree(self->failover_hosts);
383 0           self->failover_hosts = NULL;
384             }
385 0 0         if (self->failover_ports) {
386 0           Safefree(self->failover_ports);
387 0           self->failover_ports = NULL;
388             }
389 0           self->failover_n = 0;
390 0           self->failover_idx = 0;
391 0           }
392              
393             /* Full failover-state teardown including the on_failover SV. DESTROY only. */
394 0           static void failover_free(ev_clickhouse_t *self) {
395 0           failover_free_hosts(self);
396 0 0         if (self->on_failover) {
397 0           SvREFCNT_dec(self->on_failover);
398 0           self->on_failover = NULL;
399             }
400 0           }
401              
/* Decide whether an error message should rotate the multi-host ring.
 * The message matches when it contains any trigger keyword, compared
 * case-insensitively, as a whole word: the characters right before and
 * right after the hit must not be alphanumeric (start/end of string
 * count as boundaries). Used by failover_advance().
 * Returns 1 on a match, 0 otherwise (including msg == NULL). */
static int failover_msg_match(const char *msg) {
    static const char *const trigger_words[] = {
        "connect", "refused", "timeout", "unreachable", "route",
        "reset", "closed", "broken", "down", "dns", "resolution",
        "getaddrinfo", NULL,
    };
    size_t w;

    if (msg == NULL)
        return 0;

    for (w = 0; trigger_words[w] != NULL; w++) {
        const char *word = trigger_words[w];
        size_t wlen = strlen(word);
        const char *scan;

        /* Try every start position; a hit only counts with clean
         * boundaries on both sides, otherwise keep scanning. */
        for (scan = msg; *scan != '\0'; scan++) {
            if (strncasecmp(scan, word, wlen) != 0)
                continue;
            if (scan != msg && isalnum((unsigned char)scan[-1]))
                continue;
            /* scan[wlen] is in bounds: wlen characters just matched,
             * so at worst it is the terminating NUL. */
            if (isalnum((unsigned char)scan[wlen]))
                continue;
            return 1;
        }
    }
    return 0;
}
429              
430             /* Advance failover ring + fire on_failover callback (if set). Caller is
431             * responsible for callback_depth bookkeeping. */
432 0           static void failover_advance(ev_clickhouse_t *self, const char *msg) {
433 0 0         if (!self->failover_hosts || self->failover_n <= 0) return;
    0          
434 0 0         if (!failover_msg_match(msg)) return;
435 0 0         char *old_host = self->host ? safe_strdup(self->host) : NULL;
436 0           unsigned int old_port = self->port;
437 0           self->failover_idx = (self->failover_idx + 1) % self->failover_n;
438 0 0         CLEAR_STR(self->host);
439 0           self->host = safe_strdup(self->failover_hosts[self->failover_idx]);
440 0           self->port = self->failover_ports[self->failover_idx];
441 0 0         if (self->on_failover) {
442 0           dSP;
443 0           ENTER; SAVETMPS;
444 0 0         PUSHMARK(SP);
445 0 0         XPUSHs(old_host ? sv_2mortal(newSVpv(old_host, 0)) : &PL_sv_undef);
    0          
446 0 0         mXPUSHu(old_port);
447 0 0         XPUSHs(sv_2mortal(newSVpv(self->host, 0)));
448 0 0         mXPUSHu(self->port);
449 0 0         XPUSHs(sv_2mortal(newSVpv(msg ? msg : "", 0)));
    0          
450 0           PUTBACK;
451 0           call_sv(self->on_failover, G_DISCARD | G_EVAL);
452 0 0         WARN_AND_CLEAR_ERRSV("on_failover");
    0          
    0          
    0          
453 0 0         FREETMPS; LEAVE;
454             }
455 0 0         if (old_host) Safefree(old_host);
456             }
457              
458 0           static void emit_error(ev_clickhouse_t *self, const char *msg) {
459             /* Guard callback_depth across BOTH dispatches: failover_advance can
460             * fire on_failover, and a user reset/finish from there must not
461             * cause Safefree(self) before emit_error finishes its own work.
462             * Caller must invoke check_destroyed() afterwards. */
463 0           self->callback_depth++;
464 0           failover_advance(self, msg);
465 0 0         if (!self->on_error) goto done;
466             {
467 0           dSP;
468 0           ENTER;
469 0           SAVETMPS;
470 0 0         PUSHMARK(SP);
471 0 0         XPUSHs(sv_2mortal(newSVpv(msg, 0)));
472 0           PUTBACK;
473              
474 0           call_sv(self->on_error, G_DISCARD | G_EVAL);
475 0 0         WARN_AND_CLEAR_ERRSV("error handler");
    0          
    0          
    0          
476              
477 0 0         FREETMPS;
478 0           LEAVE;
479             }
480 0           done:
481 0           self->callback_depth--;
482 0           }
483              
484             /* emit_error + cancel_pending + cleanup_connection. Returns 1 if self was
485             * freed; on 0 the connection is still gone, so caller should return either way.
486             * Re-checks connect_gen after BOTH emit_error and cancel_pending so a user
487             * reset() from on_error OR from any error-callback dispatched by
488             * cancel_pending wins over our teardown. */
489 0           static int teardown_io_error(ev_clickhouse_t *self, const char *emit_msg,
490             const char *cancel_msg) {
491 0           int gen = self->connect_gen;
492 0           emit_error(self, emit_msg);
493 0 0         if (check_destroyed(self)) return 1;
494 0 0         if (self->connect_gen != gen) return 0;
495 0 0         if (cancel_pending(self, cancel_msg)) return 1;
496 0 0         if (self->connect_gen != gen) return 0;
497 0           return cleanup_connection(self); /* 1 if on_disconnect freed self */
498             }
499              
500 0           static int fail_connection(ev_clickhouse_t *self, const char *msg) {
501 0           int gen = self->connect_gen;
502 0 0         if (teardown_io_error(self, msg, msg)) return 1;
503             /* gen mismatch means user reset() inside a callback — don't override
504             * their new connect with an auto-reconnect. */
505 0 0         if (self->connect_gen == gen && self->auto_reconnect && self->host)
    0          
    0          
506 0           schedule_reconnect(self);
507 0           return 0;
508             }
509              
510             /* Invoke a zero-argument callback (on_connect, on_disconnect, on_drain).
511             * Self-guards callback_depth and consumes ERRSV; caller decides whether to
512             * SvREFCNT_dec the captured cb. Returns 1 if self was freed. */
513 0           static int fire_zero_arg_cb(ev_clickhouse_t *self, SV *cb, const char *what) {
514 0           self->callback_depth++;
515             {
516 0           dSP;
517 0           ENTER;
518 0           SAVETMPS;
519 0 0         PUSHMARK(SP);
520 0           PUTBACK;
521 0           call_sv(cb, G_DISCARD | G_EVAL);
522 0 0         if (SvTRUE(ERRSV)) {
    0          
523 0 0         warn("EV::ClickHouse: exception in %s handler: %s",
524             what, SvPV_nolen(ERRSV));
525 0 0         sv_setsv(ERRSV, &PL_sv_undef);
526             }
527 0 0         FREETMPS;
528 0           LEAVE;
529             }
530 0           self->callback_depth--;
531 0           return check_destroyed(self);
532             }
533              
534 0           static void emit_trace(ev_clickhouse_t *self, const char *fmt, ...) {
535             char buf[512];
536             va_list ap;
537 0 0         if (!self->on_trace) return;
538 0           va_start(ap, fmt);
539 0           vsnprintf(buf, sizeof(buf), fmt, ap);
540 0           va_end(ap);
541              
542             /* Guard callback_depth; deliberately skip check_destroyed because most
543             * callers continue to use self after emit_trace — outer check_destroyed
544             * picks up the EV_CH_FREED state on the next opportunity. */
545 0           self->callback_depth++;
546             {
547 0           dSP;
548 0           ENTER;
549 0           SAVETMPS;
550 0 0         PUSHMARK(SP);
551 0 0         XPUSHs(sv_2mortal(newSVpv(buf, 0)));
552 0           PUTBACK;
553 0           call_sv(self->on_trace, G_DISCARD | G_EVAL);
554 0 0         WARN_AND_CLEAR_ERRSV("trace handler");
    0          
    0          
    0          
555 0 0         FREETMPS;
556 0           LEAVE;
557             }
558 0           self->callback_depth--;
559             }
560              
561             /* Pop the head cb_queue entry. If `out_on_complete` is non-NULL, the
562             * per-query on_query_complete override (if any) is moved into *out_on_complete
563             * with ownership transferred to the caller (caller must SvREFCNT_dec). */
564 0           static SV* pop_cb_ex(ev_clickhouse_t *self, SV **out_on_complete) {
565             ngx_queue_t *q;
566             ev_ch_cb_t *cbt;
567             SV *cb;
568              
569 0 0         if (out_on_complete) *out_on_complete = NULL;
570 0 0         if (ngx_queue_empty(&self->cb_queue)) return NULL;
571              
572 0           q = ngx_queue_head(&self->cb_queue);
573 0           cbt = ngx_queue_data(q, ev_ch_cb_t, queue);
574              
575 0           cb = cbt->cb;
576 0 0         CLEAR_SV(cbt->on_data);
577 0 0         if (out_on_complete) {
578 0           *out_on_complete = cbt->on_complete; /* transfer ownership */
579 0           cbt->on_complete = NULL;
580             } else {
581 0 0         CLEAR_SV(cbt->on_complete);
582             }
583 0           ngx_queue_remove(q);
584 0           self->pending_count--;
585 0           release_cbt(cbt);
586              
587 0           return cb;
588             }
589              
590             /* Peek the on_data callback from front of cb_queue (NULL if none) */
591 0           static SV* peek_cb_on_data(ev_clickhouse_t *self) {
592             ngx_queue_t *q;
593             ev_ch_cb_t *cbt;
594 0 0         if (ngx_queue_empty(&self->cb_queue)) return NULL;
595 0           q = ngx_queue_head(&self->cb_queue);
596 0           cbt = ngx_queue_data(q, ev_ch_cb_t, queue);
597 0           return cbt->on_data;
598             }
599              
600 0           static int peek_cb_raw(ev_clickhouse_t *self) {
601             ngx_queue_t *q;
602             ev_ch_cb_t *cbt;
603 0 0         if (ngx_queue_empty(&self->cb_queue)) return 0;
604 0           q = ngx_queue_head(&self->cb_queue);
605 0           cbt = ngx_queue_data(q, ev_ch_cb_t, queue);
606 0           return cbt->raw;
607             }
608              
609 0           static void invoke_cb(SV *cb) {
610 0           call_sv(cb, G_DISCARD | G_EVAL);
611 0 0         WARN_AND_CLEAR_ERRSV("callback");
    0          
    0          
    0          
612 0           SvREFCNT_dec(cb);
613 0           }
614              
615             /* Invoke `cb` with (undef, errmsg). Caller manages callback_depth. */
616 0           static void invoke_err_cb(SV *cb, const char *errmsg) {
617 0           dSP;
618 0           ENTER;
619 0           SAVETMPS;
620 0 0         PUSHMARK(SP);
621 0           PUSHs(&PL_sv_undef);
622 0           PUSHs(sv_2mortal(newSVpv(errmsg, 0)));
623 0           PUTBACK;
624 0           invoke_cb(cb);
625 0 0         FREETMPS;
626 0           LEAVE;
627 0           }
628              
629             /* Fire on_query_complete with (query_id, rows, bytes, error_code, duration_s, errmsg).
630             * Caller must guard callback_depth around any invoke that follows. Safe to
631             * call when on_query_complete is unset. `override` (when non-NULL) is the
632             * per-query override hook from the settings hashref; it REPLACES the
633             * connection-level handler for this query so per-query instrumentation
634             * doesn't double-count against global metrics. */
635 0           static void fire_on_query_complete_ex(ev_clickhouse_t *self, const char *errmsg,
636             SV *override) {
637 0 0         SV *target = override ? override : self->on_query_complete;
638 0 0         if (!target) return;
639 0           double dur = self->query_start_time > 0
640 0 0         ? ev_now(self->loop) - self->query_start_time : 0.0;
641 0           self->callback_depth++;
642             {
643 0           dSP;
644 0           ENTER; SAVETMPS;
645 0 0         PUSHMARK(SP);
646 0 0         EXTEND(SP, 6);
647 0 0         PUSHs(self->last_query_id
648             ? sv_2mortal(newSVpv(self->last_query_id, 0)) : &PL_sv_undef);
649 0           PUSHs(sv_2mortal(newSVuv(self->profile_rows)));
650 0           PUSHs(sv_2mortal(newSVuv(self->profile_bytes)));
651 0           PUSHs(sv_2mortal(newSViv(self->last_error_code)));
652 0           PUSHs(sv_2mortal(newSVnv(dur)));
653 0 0         PUSHs(errmsg ? sv_2mortal(newSVpv(errmsg, 0)) : &PL_sv_undef);
654 0           PUTBACK;
655 0           call_sv(target, G_DISCARD | G_EVAL);
656 0 0         WARN_AND_CLEAR_ERRSV("on_query_complete");
    0          
    0          
    0          
657 0 0         FREETMPS; LEAVE;
658             }
659 0           self->callback_depth--;
660             /* Reset so a subsequent fire for a never-dispatched cancelled
661             * query (e.g. from cancel_pending draining send_queue) sees
662             * query_start_time == 0 and reports dur = 0.0, not a stale
663             * duration carried over from the previous in-flight query. */
664 0           self->query_start_time = 0;
665             }
666              
667             /* IS_KEEPALIVE_CB is defined in xs/macros.h; keepalive_noop_cb is the
668             * sentinel that backs it (declared below this comment). */
669              
670 0           static int deliver_error(ev_clickhouse_t *self, const char *errmsg) {
671 0           SV *oqc = NULL;
672 0           SV *cb = pop_cb_ex(self, &oqc);
673 0 0         if (cb == NULL) {
674 0           fire_on_query_complete_ex(self, errmsg, oqc);
675 0 0         if (oqc) SvREFCNT_dec(oqc);
676 0           return check_destroyed(self);
677             }
678              
679 0           self->callback_depth++;
680 0 0         if (!IS_KEEPALIVE_CB(cb)) fire_on_query_complete_ex(self, errmsg, oqc);
    0          
681 0           invoke_err_cb(cb, errmsg);
682 0 0         if (oqc) SvREFCNT_dec(oqc);
683 0           self->callback_depth--;
684 0           return check_destroyed(self);
685             }
686              
687             /* Returns 1 if self was freed. */
688 0           static int deliver_rows(ev_clickhouse_t *self, AV *rows) {
689 0           SV *oqc = NULL;
690 0           SV *cb = pop_cb_ex(self, &oqc);
691 0 0         if (cb == NULL) {
692 0 0         if (rows) SvREFCNT_dec((SV*)rows);
693 0           fire_on_query_complete_ex(self, NULL, oqc);
694 0 0         if (oqc) SvREFCNT_dec(oqc);
695 0           return check_destroyed(self);
696             }
697              
698 0           self->callback_depth++;
699 0 0         if (!IS_KEEPALIVE_CB(cb)) fire_on_query_complete_ex(self, NULL, oqc);
    0          
700             {
701 0           dSP;
702 0           ENTER;
703 0           SAVETMPS;
704 0 0         PUSHMARK(SP);
705 0 0         PUSHs(rows ? sv_2mortal(newRV_noinc((SV*)rows)) : &PL_sv_undef);
706 0           PUTBACK;
707 0           invoke_cb(cb);
708 0 0         FREETMPS;
709 0           LEAVE;
710             }
711 0 0         if (oqc) SvREFCNT_dec(oqc);
712 0           self->callback_depth--;
713 0           return check_destroyed(self);
714             }
715              
716             /* Deliver raw response body as scalar string. Returns 1 if self was freed. */
717 0           static int deliver_raw_body(ev_clickhouse_t *self, const char *data, size_t len) {
718 0           SV *oqc = NULL;
719 0           SV *cb = pop_cb_ex(self, &oqc);
720 0 0         if (cb == NULL) {
721 0           fire_on_query_complete_ex(self, NULL, oqc);
722 0 0         if (oqc) SvREFCNT_dec(oqc);
723 0           return check_destroyed(self);
724             }
725              
726 0           self->callback_depth++;
727 0 0         if (!IS_KEEPALIVE_CB(cb)) fire_on_query_complete_ex(self, NULL, oqc);
    0          
728             {
729 0           dSP;
730 0           ENTER;
731 0           SAVETMPS;
732 0 0         PUSHMARK(SP);
733 0           PUSHs(sv_2mortal(newSVpvn(data, len)));
734 0           PUTBACK;
735 0           invoke_cb(cb);
736 0 0         FREETMPS;
737 0           LEAVE;
738             }
739 0 0         if (oqc) SvREFCNT_dec(oqc);
740 0           self->callback_depth--;
741 0           return check_destroyed(self);
742             }
743              
744             /* deliver_error + cancel_pending + cleanup_connection. Mirrors
745             * teardown_io_error but for paths that consume the in-flight callback
746             * (via deliver_error) rather than firing on_error. Returns 1 if self
747             * was freed; on 0 the connection is gone — caller must return either
748             * way. Re-checks connect_gen after BOTH dispatches so a user reset()
749             * from the delivered error-cb OR from any error-cb dispatched by
750             * cancel_pending wins over our teardown. */
751 0           static int teardown_after_deliver(ev_clickhouse_t *self,
752             const char *deliver_msg,
753             const char *cancel_msg) {
754 0           int gen = self->connect_gen;
755 0 0         if (deliver_error(self, deliver_msg)) return 1;
756 0 0         if (self->connect_gen != gen) return 0;
757 0 0         if (cancel_pending(self, cancel_msg)) return 1;
758 0 0         if (self->connect_gen != gen) return 0;
759 0           return cleanup_connection(self); /* 1 if on_disconnect freed self */
760             }
761              
762             /* on_complete refcount: caller transfers an already-owned ref (or NULL).
763             * Mirror semantics: the send entry held its own SvREFCNT_inc; we adopt
764             * that ref here and clear send->on_complete in the call site. */
765 0           static void push_cb_owned_ex(ev_clickhouse_t *self, SV *cb, int raw,
766             SV *on_data, SV *on_complete,
767             double query_timeout) {
768 0           ev_ch_cb_t *cbt = alloc_cbt();
769 0           cbt->cb = cb;
770 0           cbt->raw = raw;
771 0 0         cbt->on_data = on_data ? SvREFCNT_inc(on_data) : NULL;
772 0           cbt->on_complete = on_complete; /* ownership adopted */
773 0           cbt->query_timeout = query_timeout;
774 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
775 0           }
776              
777 0           static SV* handler_accessor(SV **slot, SV *handler, int has_arg) {
778 0 0         if (has_arg) {
779 0 0         CLEAR_SV(*slot);
780 0 0         if (handler && SvROK(handler) && SvTYPE(SvRV(handler)) == SVt_PVCV) {
    0          
    0          
781 0           *slot = SvREFCNT_inc(handler);
782             }
783             }
784 0 0         return *slot ? SvREFCNT_inc(*slot) : &PL_sv_undef;
785             }
786              
787 0           static char* safe_strdup(const char *s) {
788             char *d;
789             size_t len;
790 0 0         if (!s) return NULL;
791 0           len = strlen(s);
792 0           Newx(d, len + 1, char);
793 0           Copy(s, d, len + 1, char);
794 0           return d;
795             }
796              
/* Write cmt wrapped as a SQL block comment into buf: the opener
 * (slash, star, space), cmt itself, then the closer (space, star, slash,
 * space). Nothing is NUL-terminated.
 * Returns bytes written (strlen(cmt) + 7), or 0 if cmt is NULL. The
 * caller must have reserved strlen(cmt) + 7 bytes in buf. */
static size_t qlc_emit_prefix(char *buf, const char *cmt) {
    static const char lead[] = "/* ";
    static const char tail[] = " */ ";
    char *w = buf;
    size_t cmt_len;

    if (!cmt) return 0;

    cmt_len = strlen(cmt);
    memcpy(w, lead, sizeof lead - 1);
    w += sizeof lead - 1;
    memcpy(w, cmt, cmt_len);
    w += cmt_len;
    memcpy(w, tail, sizeof tail - 1);
    w += sizeof tail - 1;
    return (size_t)(w - buf);
}
809              
/* Returns 1 if s contains a CR or LF, else 0. NULL yields 0.
 * s is a NUL-terminated C string, so only CR/LF need rejecting. */
static int has_http_unsafe_chars(const char *s) {
    if (s == NULL) return 0;
    return strpbrk(s, "\r\n") != NULL;
}
817              
818             /* Drop the first `n` bytes from recv_buf, shifting any remaining bytes left. */
819 0           static inline void recv_consume(struct ev_clickhouse_s *self, size_t n) {
820             /* Saturating: a user callback fired from inside the parser may
821             * call reset() which zeroes recv_len; an unguarded subtract would
822             * underflow size_t to a huge value and the next ch_read would write
823             * past the buffer. */
824 0 0         if (n >= self->recv_len) { self->recv_len = 0; return; }
825 0           memmove(self->recv_buf, self->recv_buf + n, self->recv_len - n);
826 0           self->recv_len -= n;
827             }
828              
829             static void ensure_send_cap(struct ev_clickhouse_s *self, size_t need);
830              
831             /* Replace send_buf content with `src` (heap-allocated, freed here). */
832 0           static inline void send_replace(struct ev_clickhouse_s *self, char *src, size_t len) {
833 0           ensure_send_cap(self, len);
834 0           Copy(src, self->send_buf, len, char);
835 0           self->send_len = len;
836 0           self->send_pos = 0;
837 0           Safefree(src);
838 0           }
839              
/* Returns 1 if s parses as a textual IPv4 or IPv6 address, else 0. */
static int is_ip_literal(const char *s) {
    struct in_addr v4;
    struct in6_addr v6;

    if (inet_pton(AF_INET, s, &v4) == 1) return 1;
    if (inet_pton(AF_INET6, s, &v6) == 1) return 1;
    return 0;
}
846              
847             /* Tears down the socket + per-connection state, then fires on_disconnect.
848             * Returns 1 if on_disconnect freed self (caller must not touch self), else 0. */
849 0           static int cleanup_connection(ev_clickhouse_t *self) {
850 0           int was_connected = self->connected;
851              
852 0 0         if (was_connected) emit_trace(self, "disconnect");
853 0           stop_reading(self);
854 0           stop_writing(self);
855 0           stop_keepalive(self);
856 0           stop_timing(self);
857              
858             #ifdef HAVE_OPENSSL
859 0 0         if (self->ssl) {
860 0           SSL_shutdown(self->ssl);
861 0           SSL_free(self->ssl);
862 0           self->ssl = NULL;
863             }
864 0 0         if (self->ssl_ctx) {
865 0           SSL_CTX_free(self->ssl_ctx);
866 0           self->ssl_ctx = NULL;
867             }
868             #endif
869              
870 0 0         if (self->fd >= 0) {
871 0           close(self->fd);
872 0           self->fd = -1;
873             }
874              
875 0           self->connected = 0;
876 0           self->connecting = 0;
877 0           self->dns_pending = 0; /* finish/reset interrupts async DNS */
878 0           self->send_len = 0;
879 0           self->send_pos = 0;
880 0           self->recv_len = 0;
881 0           self->send_count = 0;
882 0           self->ka_in_flight = 0;
883 0           self->pending_addendum_finish = 0;
884 0           self->native_state = NATIVE_IDLE;
885 0 0         CLEAR_SV(self->native_rows);
886 0 0         CLEAR_SV(self->native_col_names);
887 0 0         CLEAR_SV(self->native_col_types);
888 0 0         CLEAR_SV(self->native_totals);
889 0 0         CLEAR_SV(self->native_extremes);
890 0           lc_free_dicts(self);
891 0 0         CLEAR_INSERT(self);
    0          
892 0 0         CLEAR_STR(self->insert_err);
893              
894             /* Fire on_disconnect AFTER state is reset, so a handler that queues
895             * new queries or calls reconnect sees clean state. The handler can
896             * drop the last $ch ref (-> DESTROY); propagate that so callers
897             * don't touch a freed self. */
898 0 0         if (was_connected && self->on_disconnect)
    0          
899 0           return fire_zero_arg_cb(self, self->on_disconnect, "disconnect");
900 0           return 0;
901             }
902              
903             /* Fire user error callback + on_query_complete (per-query override or
904             * connection-level — fire_on_query_complete_ex falls back). Honors the
905             * documented "fires after every query (success or error)" contract for
906             * cancelled queries. oqc is consumed (refcount-dec'd) here.
907             * HTTP keepalive PINGs are suppressed to match the success-path behavior:
908             * users instrumenting via on_query_complete shouldn't see spurious zero-
909             * row completions for pings they didn't initiate. */
910 0           static void fire_err_and_complete(ev_clickhouse_t *self, SV *cb,
911             const char *errmsg, SV *oqc) {
912             /* Fire on_query_complete BEFORE the user cb to match the order
913             * used by deliver_error / deliver_rows on the normal path —
914             * instrumentation observers expect the global hook to run first.
915             * Keepalive PINGs are suppressed to match the success-path
916             * behavior so observers don't see spurious zero-row completions. */
917 0 0         if (!IS_KEEPALIVE_CB(cb))
    0          
918 0           fire_on_query_complete_ex(self, errmsg, oqc);
919 0 0         if (oqc) SvREFCNT_dec(oqc);
920             /* A user error callback may drop the last ref to $ch (DESTROY runs
921             * deferred while callback_depth > 0). invoke_err_cb itself is safe
922             * either way — caller's outer magic check handles the next loop. */
923 0           invoke_err_cb(cb, errmsg);
924 0           }
925              
926             /* Drain in-flight cb_queue, delivering errmsg to each callback and resetting
927             * send_count. Caller manages callback_depth. */
928 0           static void drain_cb_queue(ev_clickhouse_t *self, const char *errmsg) {
929 0 0         while (!ngx_queue_empty(&self->cb_queue)) {
930 0           SV *oqc = NULL;
931 0           SV *cb = pop_cb_ex(self, &oqc);
932 0 0         if (cb == NULL) break;
933 0           fire_err_and_complete(self, cb, errmsg, oqc);
934 0 0         if (self->magic != EV_CH_MAGIC) break;
935             }
936 0           self->send_count = 0;
937 0           }
938              
939             /* Returns 1 if self was freed. */
940 0           static int cancel_pending(ev_clickhouse_t *self, const char *errmsg) {
941 0           self->callback_depth++;
942              
943 0 0         while (!ngx_queue_empty(&self->send_queue)) {
944 0           ngx_queue_t *q = ngx_queue_head(&self->send_queue);
945 0           ev_ch_send_t *send = ngx_queue_data(q, ev_ch_send_t, queue);
946 0           SV *cb = send->cb;
947 0           SV *oqc = send->on_complete; /* transfer ownership for fire below */
948 0           send->on_complete = NULL;
949 0           ngx_queue_remove(q);
950 0           Safefree(send->data);
951 0 0         CLEAR_INSERT(send);
    0          
952 0 0         CLEAR_SV(send->on_data);
953 0           release_send(send);
954 0           self->pending_count--;
955              
956 0           fire_err_and_complete(self, cb, errmsg, oqc);
957 0 0         if (self->magic != EV_CH_MAGIC) break;
958             }
959              
960 0           drain_cb_queue(self, errmsg);
961 0           self->callback_depth--;
962 0           return check_destroyed(self);
963             }
964              
965             /* --- I/O helpers (with optional TLS) --- */
966              
967 0           static ssize_t ch_read(ev_clickhouse_t *self, void *buf, size_t len) {
968             #ifdef HAVE_OPENSSL
969 0 0         if (self->ssl) {
970 0 0         int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;
971 0           int ret = SSL_read(self->ssl, buf, ssl_len);
972 0 0         if (ret <= 0) {
973 0           int err = SSL_get_error(self->ssl, ret);
974 0 0         if (err == SSL_ERROR_WANT_READ) {
975 0           errno = EAGAIN;
976 0           return -1;
977             }
978 0 0         if (err == SSL_ERROR_WANT_WRITE) {
979 0           start_writing(self);
980 0           errno = EAGAIN;
981 0           return -1;
982             }
983 0 0         if (err == SSL_ERROR_ZERO_RETURN) return 0;
984 0           errno = EIO;
985 0           return -1;
986             }
987 0           return ret;
988             }
989             #endif
990 0           return read(self->fd, buf, len);
991             }
992              
993 0           static ssize_t ch_write(ev_clickhouse_t *self, const void *buf, size_t len) {
994             #ifdef HAVE_OPENSSL
995 0 0         if (self->ssl) {
996 0 0         int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;
997 0           int ret = SSL_write(self->ssl, buf, ssl_len);
998 0 0         if (ret <= 0) {
999 0           int err = SSL_get_error(self->ssl, ret);
1000 0 0         if (err == SSL_ERROR_WANT_WRITE) {
1001 0           errno = EAGAIN;
1002 0           return -1;
1003             }
1004 0 0         if (err == SSL_ERROR_WANT_READ) {
1005 0           start_reading(self);
1006 0           errno = EAGAIN;
1007 0           return -1;
1008             }
1009 0           errno = EIO;
1010 0           return -1;
1011             }
1012 0           return ret;
1013             }
1014             #endif
1015 0           return write(self->fd, buf, len);
1016             }
1017              
1018             /* --- Buffer management --- */
1019              
1020 0           static void ensure_recv_cap(ev_clickhouse_t *self, size_t need) {
1021 0 0         if (self->recv_cap >= need) return;
1022 0 0         if (need > SIZE_MAX / 2) croak("recv buffer overflow");
1023 0           size_t newcap = self->recv_cap * 2;
1024 0 0         if (newcap < need) newcap = need;
1025 0           Renew(self->recv_buf, newcap, char);
1026 0           self->recv_cap = newcap;
1027             }
1028              
1029 0           static void ensure_send_cap(ev_clickhouse_t *self, size_t need) {
1030 0 0         if (self->send_cap >= need) return;
1031 0 0         if (need > SIZE_MAX / 2) croak("send buffer overflow");
1032 0           size_t newcap = self->send_cap * 2;
1033 0 0         if (newcap < need) newcap = need;
1034 0           Renew(self->send_buf, newcap, char);
1035 0           self->send_cap = newcap;
1036             }
1037              
1038             /* --- Native protocol buffer (for building packets) --- */
1039              
1040             typedef struct { /* growable byte buffer used by the nbuf_* packet builders */
1041             char *data; /* storage: allocated in nbuf_init, resized in nbuf_grow */
1042             size_t len; /* number of bytes written so far */
1043             size_t cap; /* number of bytes allocated for data */
1044             } native_buf_t;
1045              
1046 0           static void nbuf_init(native_buf_t *b) {
1047 0           b->cap = 256;
1048 0           b->len = 0;
1049 0           Newx(b->data, b->cap, char);
1050 0           }
1051              
1052 0           static void nbuf_grow(native_buf_t *b, size_t need) {
1053             /* Guard the b->len + need sum itself: a wraparound would make the
1054             * loop condition false and let nbuf_append memcpy past the buffer. */
1055 0 0         if (need > SIZE_MAX - b->len) croak("native buffer overflow");
1056 0 0         if (b->len + need > b->cap) {
1057 0 0         while (b->len + need > b->cap) {
1058 0 0         if (b->cap > SIZE_MAX / 2) croak("native buffer overflow");
1059 0           b->cap *= 2;
1060             }
1061 0           Renew(b->data, b->cap, char);
1062             }
1063 0           }
1064              
1065 0           static void nbuf_append(native_buf_t *b, const char *data, size_t len) {
1066 0           nbuf_grow(b, len);
1067 0           memcpy(b->data + b->len, data, len);
1068 0           b->len += len;
1069 0           }
1070              
1071 0           static void nbuf_varuint(native_buf_t *b, uint64_t n) {
1072 0           nbuf_grow(b, 10);
1073 0 0         while (n >= 0x80) {
1074 0           b->data[b->len++] = (char)((n & 0x7F) | 0x80);
1075 0           n >>= 7;
1076             }
1077 0           b->data[b->len++] = (char)n;
1078 0           }
1079              
1080 0           static void nbuf_string(native_buf_t *b, const char *s, size_t len) {
1081 0           nbuf_varuint(b, (uint64_t)len);
1082 0           nbuf_append(b, s, len);
1083 0           }
1084              
1085 0           static void nbuf_cstring(native_buf_t *b, const char *s) {
1086 0 0         nbuf_string(b, s, s ? strlen(s) : 0);
1087 0           }
1088              
1089 0           static void nbuf_u8(native_buf_t *b, uint8_t v) {
1090 0           nbuf_grow(b, 1);
1091 0           b->data[b->len++] = (char)v;
1092 0           }
1093              
1094 0           static void nbuf_le64(native_buf_t *b, uint64_t v) {
1095 0           nbuf_grow(b, 8);
1096 0           Copy(&v, b->data + b->len, 8, char);
1097 0           b->len += 8;
1098 0           }
1099              
1100 0           static void nbuf_ledouble(native_buf_t *b, double d) {
1101             uint64_t v;
1102 0           Copy(&d, &v, 8, char);
1103 0           nbuf_le64(b, v);
1104 0           }
1105              
1106             /* Write a parameter value as a Field::dump-format quoted string.
1107             * ClickHouse's Field::restoreFromDump for String parses a single-quoted token
1108             * with backslash escaping for ' and \. Without escaping, embedded single
1109             * quotes truncate the value silently. */
1110 0           static void nbuf_quoted_param(native_buf_t *b, const char *s, size_t len) {
1111 0           size_t i, esc = 0;
1112 0 0         for (i = 0; i < len; i++) if (s[i] == '\'' || s[i] == '\\') esc++;
    0          
    0          
1113 0           nbuf_varuint(b, (uint64_t)(len + esc + 2));
1114 0           nbuf_grow(b, len + esc + 2);
1115 0           b->data[b->len++] = '\'';
1116 0 0         if (esc == 0) {
1117 0           memcpy(b->data + b->len, s, len);
1118 0           b->len += len;
1119             } else {
1120 0 0         for (i = 0; i < len; i++) {
1121 0 0         if (s[i] == '\'' || s[i] == '\\') b->data[b->len++] = '\\';
    0          
1122 0           b->data[b->len++] = s[i];
1123             }
1124             }
1125 0           b->data[b->len++] = '\'';
1126 0           }
1127              
1128             /* --- Native protocol read helpers (from recv_buf) --- */
1129              
/* Decode a base-128 varint from buf[*pos .. len).
 * Returns 1 on success (*out = value, *pos advanced past it),
 *         0 if the buffer ends before the final byte (*pos untouched),
 *        -1 if the encoding does not fit in 64 bits (*pos untouched). */
static int read_varuint(const char *buf, size_t len, size_t *pos, uint64_t *out) {
    uint64_t val = 0;
    unsigned shift = 0;
    size_t p = *pos;
    while (p < len) {
        uint8_t byte = (uint8_t)buf[p++];
        uint64_t bits = (uint64_t)(byte & 0x7F);
        /* The 10th byte lands at bit 63, so only its lowest payload bit
         * fits. Anything higher would be shifted out silently; report it
         * as overflow instead of returning a truncated value. */
        if (shift == 63 && bits > 1) return -1;
        val |= bits << shift;
        if (!(byte & 0x80)) {
            *out = val;
            *pos = p;
            return 1;
        }
        shift += 7;
        if (shift >= 64) return -1; /* continuation past 10 bytes */
    }
    return 0;
}
1148              
1149 0           static int read_native_string_alloc(const char *buf, size_t len, size_t *pos,
1150             char **out, size_t *out_len) {
1151             uint64_t slen;
1152 0           size_t saved = *pos;
1153 0           int rc = read_varuint(buf, len, pos, &slen);
1154 0 0         if (rc <= 0) { *pos = saved; return rc; }
1155 0 0         if (slen > len - *pos) { *pos = saved; return 0; }
1156 0           Newx(*out, slen + 1, char);
1157 0           Copy(buf + *pos, *out, slen, char);
1158 0           (*out)[slen] = '\0';
1159 0 0         if (out_len) *out_len = (size_t)slen;
1160 0           *pos += slen;
1161 0           return 1;
1162             }
1163              
/* Read a length-prefixed string without allocating: *out points into buf
 * (not NUL-terminated) and *out_len receives its length.
 * Returns 1 on success; 0 if the buffer does not yet hold the whole
 * string; a negative read_varuint result is passed through. On any
 * non-success *pos is left where it was. */
static int read_native_string_ref(const char *buf, size_t len, size_t *pos,
                                  const char **out, size_t *out_len) {
    const size_t start = *pos;
    uint64_t slen;
    int rc;

    rc = read_varuint(buf, len, pos, &slen);
    if (rc <= 0) {
        *pos = start;
        return rc;
    }
    if (slen > len - *pos) { /* payload not fully buffered yet */
        *pos = start;
        return 0;
    }

    *out = buf + *pos;
    *out_len = (size_t)slen;
    *pos += slen;
    return 1;
}
1177              
/* Read one byte at *pos. Returns 1 and advances *pos on success, 0 if no
 * byte is available (nothing is modified). */
static int read_u8(const char *buf, size_t len, size_t *pos, uint8_t *out) {
    /* Compare directly instead of computing *pos + 1, which would wrap to
     * 0 for *pos == SIZE_MAX and let the read through. */
    if (*pos >= len) return 0;
    *out = (uint8_t)buf[*pos];
    (*pos)++;
    return 1;
}
1184              
/* Read a 4-byte little-endian signed integer at *pos. Returns 1 and
 * advances *pos by 4 on success, 0 if fewer than 4 bytes are available
 * (nothing is modified). */
static int read_i32(const char *buf, size_t len, size_t *pos, int32_t *out) {
    const unsigned char *p;
    uint32_t u;

    /* Written as a subtraction so a huge *pos cannot wrap *pos + 4. */
    if (len < 4 || *pos > len - 4) return 0;

    /* Assemble from bytes so the result does not depend on host byte
     * order (a raw memcpy is only right on little-endian hosts).
     * NOTE(review): assumes the wire format is little-endian, matching
     * the nbuf_le64 writer; confirm against the protocol spec. */
    p = (const unsigned char *)buf + *pos;
    u = (uint32_t)p[0]
      | ((uint32_t)p[1] << 8)
      | ((uint32_t)p[2] << 16)
      | ((uint32_t)p[3] << 24);
    memcpy(out, &u, sizeof u);
    *pos += 4;
    return 1;
}
1191              
/* Advance *pos past one length-prefixed string without reading it.
 * Returns 1 on success; 0 if the buffer does not yet hold the whole
 * string; a negative read_varuint result is passed through. On any
 * non-success *pos is left where it was. */
static int skip_native_string(const char *buf, size_t len, size_t *pos) {
    const size_t start = *pos;
    uint64_t slen;
    int rc;

    rc = read_varuint(buf, len, pos, &slen);
    if (rc <= 0) {
        *pos = start;
        return rc;
    }
    if (slen > len - *pos) { /* payload not fully buffered yet */
        *pos = start;
        return 0;
    }

    *pos += slen;
    return 1;
}
1202              
1203             /* --- URL encoding --- */
1204              
/* Percent-encode src[0..src_len) into dst. ASCII letters, digits and
 * "-_.~" pass through; every other byte becomes %XX (uppercase hex).
 * dst is not NUL-terminated and must hold up to 3 * src_len bytes.
 * Returns the number of bytes written. */
static size_t url_encode(const char *src, size_t src_len, char *dst) {
    static const char hex[] = "0123456789ABCDEF";
    size_t in;
    size_t out = 0;

    for (in = 0; in < src_len; in++) {
        const unsigned char c = (unsigned char)src[in];
        const int is_alpha = (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z');
        const int is_digit = (c >= '0' && c <= '9');
        const int is_mark = (c == '-' || c == '_' || c == '.' || c == '~');

        if (is_alpha || is_digit || is_mark) {
            dst[out++] = c;
            continue;
        }
        dst[out++] = '%';
        dst[out++] = hex[c >> 4];
        dst[out++] = hex[c & 0x0F];
    }
    return out;
}
1222              
1223             /* --- Per-query settings helpers --- */
1224              
1225             /* Settings keys consumed by the client and never sent to the server. */
1226 0           static int is_client_only_key(const char *key, I32 klen) {
1227 0 0         return (klen == 3 && memcmp(key, "raw", 3) == 0)
1228 0 0         || (klen == 8 && memcmp(key, "query_id", 8) == 0)
    0          
1229 0 0         || (klen == 7 && memcmp(key, "on_data", 7) == 0)
    0          
1230 0 0         || (klen == 13 && memcmp(key, "query_timeout", 13) == 0)
    0          
1231 0 0         || (klen == 6 && memcmp(key, "params", 6) == 0)
    0          
1232 0 0         || (klen == 8 && memcmp(key, "external", 8) == 0)
    0          
1233 0 0         || (klen == 17 && memcmp(key, "on_query_complete", 17) == 0);
    0          
    0          
1234             }
1235              
1236             /* Upper bound on bytes needed for "&key=value" URL-encoded settings pairs. */
1237 0           static size_t settings_url_params_size(HV *defaults, HV *overrides) {
1238 0           size_t total = 0;
1239             HE *entry;
1240 0 0         if (overrides) {
1241 0           hv_iterinit(overrides);
1242 0 0         while ((entry = hv_iternext(overrides))) {
1243             I32 klen;
1244             STRLEN vlen;
1245 0           char *key = hv_iterkey(entry, &klen);
1246 0 0         if (is_client_only_key(key, klen)) continue;
1247 0           (void)SvPV(hv_iterval(overrides, entry), vlen);
1248 0           total += 2 + (size_t)klen * 3 + (size_t)vlen * 3;
1249             }
1250             }
1251 0 0         if (defaults) {
1252 0           hv_iterinit(defaults);
1253 0 0         while ((entry = hv_iternext(defaults))) {
1254             I32 klen;
1255             STRLEN vlen;
1256 0           char *key = hv_iterkey(entry, &klen);
1257 0 0         if (overrides && hv_exists(overrides, key, klen))
    0          
1258 0           continue; /* overridden */
1259 0 0         if (is_client_only_key(key, klen)) continue;
1260 0           (void)SvPV(hv_iterval(defaults, entry), vlen);
1261 0           total += 2 + (size_t)klen * 3 + (size_t)vlen * 3;
1262             }
1263             }
1264 0           return total;
1265             }
1266              
1267             /* Append merged settings as URL params (&key=encoded_value).
1268             * Extracts query_id into *query_id_out (caller must not free — points into HV).
1269             * Per-query overrides take precedence over connection defaults.
1270             * Returns new position in params buffer. */
1271 0           static size_t append_settings_url_params(char *params, size_t plen,
1272             HV *defaults, HV *overrides,
1273             const char **query_id_out, STRLEN *query_id_len_out) {
1274             HE *entry;
1275 0           *query_id_out = NULL;
1276 0           *query_id_len_out = 0;
1277              
1278             /* Write overrides first */
1279 0 0         if (overrides) {
1280 0           hv_iterinit(overrides);
1281 0 0         while ((entry = hv_iternext(overrides))) {
1282             I32 klen;
1283             STRLEN vlen;
1284 0           char *key = hv_iterkey(entry, &klen);
1285 0           char *val = SvPV(hv_iterval(overrides, entry), vlen);
1286 0 0         if (klen == 8 && memcmp(key, "query_id", 8) == 0) {
    0          
1287 0           *query_id_out = val;
1288 0           *query_id_len_out = vlen;
1289 0           continue;
1290             }
1291 0 0         if (is_client_only_key(key, klen)) continue;
1292 0           params[plen++] = '&';
1293 0           plen += url_encode(key, (size_t)klen, params + plen);
1294 0           params[plen++] = '=';
1295 0           plen += url_encode(val, vlen, params + plen);
1296             }
1297             }
1298             /* Write defaults, skipping keys present in overrides */
1299 0 0         if (defaults) {
1300 0           hv_iterinit(defaults);
1301 0 0         while ((entry = hv_iternext(defaults))) {
1302             I32 klen;
1303             STRLEN vlen;
1304 0           char *key = hv_iterkey(entry, &klen);
1305 0           char *val = SvPV(hv_iterval(defaults, entry), vlen);
1306 0 0         if (overrides && hv_exists(overrides, key, klen))
    0          
1307 0           continue;
1308 0 0         if (klen == 8 && memcmp(key, "query_id", 8) == 0) {
    0          
1309 0 0         if (!*query_id_out) {
1310 0           *query_id_out = val;
1311 0           *query_id_len_out = vlen;
1312             }
1313 0           continue;
1314             }
1315 0 0         if (is_client_only_key(key, klen)) continue;
1316 0           params[plen++] = '&';
1317 0           plen += url_encode(key, (size_t)klen, params + plen);
1318 0           params[plen++] = '=';
1319 0           plen += url_encode(val, vlen, params + plen);
1320             }
1321             }
1322 0           return plen;
1323             }
1324              
1325             /* Write merged param_* entries as native parameters block.
1326             * Format: (String name, VarUInt flags=2, String quoted-value)* — name is the
1327             * portion after the "param_" prefix. Caller writes the empty-name terminator. */
1328 0           static void write_native_params(native_buf_t *b, HV *defaults, HV *overrides) {
1329             HE *entry;
1330 0 0         if (overrides) {
1331 0           hv_iterinit(overrides);
1332 0 0         while ((entry = hv_iternext(overrides))) {
1333             I32 klen;
1334             STRLEN vlen;
1335 0           char *key = hv_iterkey(entry, &klen);
1336 0           char *val = SvPV(hv_iterval(overrides, entry), vlen);
1337 0 0         if (klen <= 6 || memcmp(key, "param_", 6) != 0) continue;
    0          
1338 0           nbuf_varuint(b, (uint64_t)(klen - 6));
1339 0           nbuf_append(b, key + 6, (size_t)(klen - 6));
1340 0           nbuf_varuint(b, 2); /* flags: CUSTOM */
1341 0           nbuf_quoted_param(b, val, vlen);
1342             }
1343             }
1344 0 0         if (defaults) {
1345 0           hv_iterinit(defaults);
1346 0 0         while ((entry = hv_iternext(defaults))) {
1347             I32 klen;
1348             STRLEN vlen;
1349 0           char *key = hv_iterkey(entry, &klen);
1350             char *val;
1351 0 0         if (klen <= 6 || memcmp(key, "param_", 6) != 0) continue;
    0          
1352 0 0         if (overrides && hv_exists(overrides, key, klen)) continue;
    0          
1353 0           val = SvPV(hv_iterval(defaults, entry), vlen);
1354 0           nbuf_varuint(b, (uint64_t)(klen - 6));
1355 0           nbuf_append(b, key + 6, (size_t)(klen - 6));
1356 0           nbuf_varuint(b, 2); /* flags: CUSTOM */
1357 0           nbuf_quoted_param(b, val, vlen);
1358             }
1359             }
1360 0           }
1361              
1362             /* Write merged settings in native protocol wire format.
1363             * Format per setting: String name, UInt8 is_important(0), String value.
1364             * Terminated by empty name string (written by caller). */
1365 0           static void write_native_settings(native_buf_t *b, HV *defaults, HV *overrides) {
1366             HE *entry;
1367              
1368 0 0         if (overrides) {
1369 0           hv_iterinit(overrides);
1370 0 0         while ((entry = hv_iternext(overrides))) {
1371             I32 klen;
1372             STRLEN vlen;
1373 0           char *key = hv_iterkey(entry, &klen);
1374 0           char *val = SvPV(hv_iterval(overrides, entry), vlen);
1375 0 0         if (is_client_only_key(key, klen)) continue;
1376             /* param_* keys go in the parameters block, not settings */
1377 0 0         if (klen > 6 && memcmp(key, "param_", 6) == 0) continue;
    0          
1378 0           nbuf_varuint(b, (uint64_t)klen);
1379 0           nbuf_append(b, key, (size_t)klen);
1380 0           nbuf_u8(b, 0); /* is_important = 0 */
1381 0           nbuf_varuint(b, (uint64_t)vlen);
1382 0           nbuf_append(b, val, vlen);
1383             }
1384             }
1385 0 0         if (defaults) {
1386 0           hv_iterinit(defaults);
1387 0 0         while ((entry = hv_iternext(defaults))) {
1388             I32 klen;
1389             STRLEN vlen;
1390 0           char *key = hv_iterkey(entry, &klen);
1391 0           char *val = SvPV(hv_iterval(defaults, entry), vlen);
1392 0 0         if (overrides && hv_exists(overrides, key, klen))
    0          
1393 0           continue;
1394 0 0         if (is_client_only_key(key, klen)) continue;
1395 0 0         if (klen > 6 && memcmp(key, "param_", 6) == 0) continue;
    0          
1396 0           nbuf_varuint(b, (uint64_t)klen);
1397 0           nbuf_append(b, key, (size_t)klen);
1398 0           nbuf_u8(b, 0); /* is_important = 0 */
1399 0           nbuf_varuint(b, (uint64_t)vlen);
1400 0           nbuf_append(b, val, vlen);
1401             }
1402             }
1403 0           }
1404              
1405             /* Textually included so all helpers stay file-local statics and the
1406             * compiler sees one translation unit. Order matters: each file may
1407             * call into helpers defined above it, but not below. xs/io.c must
1408             * come last because it's the only file that reaches into every other
1409             * subsystem (TCP/TLS, HTTP, native, types). */
1410             #include "xs/codecs.c"
1411             #include "xs/proto_http.c"
1412             #include "xs/proto_native_build.c"
1413             #include "xs/types.c"
1414             #include "xs/proto_native_parse.c"
1415             #include "xs/io.c"
1416              
1417             /* --- XS interface --- */
1418              
1419             MODULE = EV::ClickHouse PACKAGE = EV::ClickHouse
1420              
1421             BOOT:
1422             {
1423 48 50         I_EV_API("EV::ClickHouse");
    50          
    50          
1424 48           ch_openssl_init();
1425             /* Permanent no-op CV used for internal callbacks (HTTP keepalive ping). */
1426 48           keepalive_noop_cb = newRV_inc((SV*)get_cv("EV::ClickHouse::__keepalive_noop", GV_ADD));
1427             /* Per-process rand() seed so reconnect_jitter desynchronises forks
1428             * (otherwise every worker generates the same sequence and the
1429             * jitter is uniform across the herd, defeating its purpose). */
1430 48           srand((unsigned)time(NULL) ^ (unsigned)getpid());
1431             }
1432              
1433             EV::ClickHouse
1434             _new(char *class, EV::Loop loop)
1435             CODE:
1436             {
1437             PERL_UNUSED_VAR(class);
1438 0           Newxz(RETVAL, 1, ev_clickhouse_t);
1439 0           RETVAL->magic = EV_CH_MAGIC;
1440 0           RETVAL->loop = loop;
1441 0           RETVAL->fd = -1;
1442 0           RETVAL->protocol = PROTO_HTTP;
1443 0           ngx_queue_init(&RETVAL->cb_queue);
1444 0           ngx_queue_init(&RETVAL->send_queue);
1445              
1446 0           Newx(RETVAL->recv_buf, RECV_BUF_INIT, char);
1447 0           RETVAL->recv_cap = RECV_BUF_INIT;
1448 0           Newx(RETVAL->send_buf, SEND_BUF_INIT, char);
1449 0           RETVAL->send_cap = SEND_BUF_INIT;
1450              
1451 0           ev_init(&RETVAL->timer, timer_cb);
1452 0           RETVAL->timer.data = (void *)RETVAL;
1453             }
1454             OUTPUT:
1455             RETVAL
1456              
1457             void
1458             DESTROY(EV::ClickHouse self)
1459             CODE:
1460             {
1461 0 0         if (self->magic != EV_CH_MAGIC) return;
1462              
1463 0           stop_reading(self);
1464 0           stop_writing(self);
1465 0           stop_timing(self);
1466 0           stop_keepalive(self);
1467 0 0         if (self->reconnect_timing) {
1468 0           ev_timer_stop(self->loop, &self->reconnect_timer);
1469 0           self->reconnect_timing = 0;
1470             }
1471              
1472 0 0         if (PL_dirty) {
1473 0           self->magic = EV_CH_FREED;
1474 0 0         while (!ngx_queue_empty(&self->send_queue)) {
1475 0           ngx_queue_t *q = ngx_queue_head(&self->send_queue);
1476 0           ev_ch_send_t *send = ngx_queue_data(q, ev_ch_send_t, queue);
1477 0           ngx_queue_remove(q);
1478 0           Safefree(send->data);
1479 0 0         CLEAR_INSERT(send);
    0          
1480 0 0         CLEAR_SV(send->on_data);
1481 0 0         CLEAR_SV(send->on_complete);
1482 0           SvREFCNT_dec(send->cb);
1483 0           release_send(send);
1484             }
1485 0 0         while (!ngx_queue_empty(&self->cb_queue)) {
1486 0           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
1487 0           ev_ch_cb_t *cbt = ngx_queue_data(q, ev_ch_cb_t, queue);
1488 0           ngx_queue_remove(q);
1489 0 0         CLEAR_SV(cbt->on_data);
1490 0 0         CLEAR_SV(cbt->on_complete);
1491 0           SvREFCNT_dec(cbt->cb);
1492 0           release_cbt(cbt);
1493             }
1494              
1495 0 0         #ifdef HAVE_OPENSSL
1496 0 0         if (self->ssl) { SSL_free(self->ssl); self->ssl = NULL; }
1497 0 0         if (self->ssl_ctx) { SSL_CTX_free(self->ssl_ctx); self->ssl_ctx = NULL; }
1498 0 0         #endif
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
1499 0 0         if (self->fd >= 0) close(self->fd);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
1500 0           CLEAR_CONNECTION_HANDLERS(self);
1501 0 0         CLEAR_PERSISTENT_STATE(self);
    0          
1502 0           failover_free(self);
1503 0           CLEAR_INSERT(self);
1504 0           lc_free_dicts(self);
1505             Safefree(self);
1506             return;
1507 0 0         }
1508 0            
1509             if (cancel_pending(self, "object destroyed"))
1510             return; /* inner DESTROY already freed self */
1511              
1512             /* A user callback fired from cancel_pending may have called
1513             * $ch->reset() which re-arms watchers / opens a fresh fd.
1514 0           * Stop everything again before tearing the struct down so the
1515 0           * EV loop can't dispatch into freed memory. */
1516 0           stop_reading(self);
1517 0           stop_writing(self);
1518 0 0         stop_timing(self);
1519 0           stop_keepalive(self);
1520 0           if (self->reconnect_timing) {
1521             ev_timer_stop(self->loop, &self->reconnect_timer);
1522             self->reconnect_timing = 0;
1523 0 0         }
1524 0            
1525 0           #ifdef HAVE_OPENSSL
1526 0           if (self->ssl) {
1527             SSL_shutdown(self->ssl);
1528 0 0         SSL_free(self->ssl);
1529 0           self->ssl = NULL;
1530 0           }
1531             if (self->ssl_ctx) {
1532             SSL_CTX_free(self->ssl_ctx);
1533 0 0         self->ssl_ctx = NULL;
1534 0           }
1535 0           #endif
1536              
1537             if (self->fd >= 0) {
1538 0           close(self->fd);
1539 0           self->fd = -1;
1540             }
1541 0 0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
1542 0 0         self->loop = NULL;
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
1543 0           self->connected = 0;
1544 0            
1545 0 0         CLEAR_CONNECTION_HANDLERS(self);
    0          
1546             CLEAR_PERSISTENT_STATE(self);
1547 0           failover_free(self);
1548 0 0         lc_free_dicts(self);
1549 0           CLEAR_INSERT(self);
1550              
1551             self->magic = EV_CH_FREED;
1552             if (self->callback_depth == 0) {
1553             Safefree(self);
1554             }
1555             /* else: check_destroyed() will Safefree when callback_depth reaches 0 */
1556             }
1557              
1558             void
1559             _set_tls_ca_file(EV::ClickHouse self, const char *path)
1560             CODE:
1561             {
1562 0 0         CLEAR_STR(self->tls_ca_file);
1563 0           self->tls_ca_file = safe_strdup(path);
1564             }
1565              
1566             void
1567             _set_tls_cert_file(EV::ClickHouse self, const char *path)
1568             CODE:
1569             {
1570 0 0         CLEAR_STR(self->tls_cert_file);
1571 0           self->tls_cert_file = safe_strdup(path);
1572             }
1573              
1574             void
1575             _set_tls_key_file(EV::ClickHouse self, const char *path)
1576             CODE:
1577             {
1578 0 0         CLEAR_STR(self->tls_key_file);
1579 0           self->tls_key_file = safe_strdup(path);
1580             }
1581              
1582             void
1583             connect(EV::ClickHouse self, const char *host, unsigned int port, const char *user, const char *password, const char *database)
1584             CODE:
1585             {
1586 0 0         if (self->connected || self->connecting) croak("already connected");
    0          
1587 0 0         if (has_http_unsafe_chars(host) || has_http_unsafe_chars(user) ||
1588 0 0         has_http_unsafe_chars(password) || has_http_unsafe_chars(database))
1589 0           croak("connection parameters must not contain CR or LF");
1590              
1591 0 0         CLEAR_STR(self->host);
1592 0 0         CLEAR_STR(self->user);
1593 0 0         CLEAR_STR(self->password);
1594 0 0         CLEAR_STR(self->database);
1595              
1596 0           self->host = safe_strdup(host);
1597 0           self->port = port;
1598 0           self->user = safe_strdup(user);
1599 0           self->password = safe_strdup(password);
1600 0           self->database = safe_strdup(database);
1601              
1602 0           start_connect(self);
1603             }
1604              
1605             void
1606             reset(EV::ClickHouse self)
1607             CODE:
1608             {
1609 0 0         if (!self->host) croak("no previous connection to reset");
1610 0 0         if (cancel_pending(self, "connection reset")) return;
1611 0 0         if (cleanup_connection(self)) return; /* on_disconnect freed self */
1612 0           start_connect(self);
1613             }
1614              
1615             void
1616             finish(EV::ClickHouse self)
1617             CODE:
1618             {
1619 0 0         if (cancel_pending(self, "connection finished")) return;
1620 0           (void)cleanup_connection(self); /* nothing follows — freed-or-not is moot */
1621             }
1622              
1623             void
1624             query(EV::ClickHouse self, SV *sql_sv, ...)
1625             CODE:
1626             {
1627             STRLEN sql_len;
1628             const char *sql;
1629             ev_ch_send_t *s;
1630             char *req;
1631             size_t req_len;
1632             SV *cb;
1633 0           HV *settings = NULL;
1634 0           int raw = 0;
1635              
1636 0 0         if (items == 3) {
1637 0           cb = ST(2);
1638 0 0         } else if (items == 4) {
1639 0 0         if (!(SvROK(ST(2)) && SvTYPE(SvRV(ST(2))) == SVt_PVHV))
    0          
1640 0           croak("settings must be a HASH reference");
1641 0           settings = (HV *)SvRV(ST(2));
1642 0           cb = ST(3);
1643             } else {
1644 0           croak("Usage: $ch->query($sql, [\\%%settings], $cb)");
1645             }
1646              
1647 0 0         if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected");
    0          
    0          
1648 0 0         if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV))
    0          
1649 0           croak("callback must be a CODE reference");
1650              
1651 0 0         if (self->protocol == PROTO_NATIVE && (self->insert_data || self->insert_av))
    0          
    0          
1652 0           croak("cannot queue native query while INSERT is pending");
1653              
1654             /* Extract client-side options from settings */
1655 0           SV *on_data_sv = NULL;
1656 0           SV *on_complete_sv = NULL;
1657 0           HV *external = NULL; /* per-query external tables (native only) */
1658 0           HV *settings_copy = NULL; /* owned copy if we need to expand params */
1659 0 0         if (settings) {
1660 0           settings_copy = expand_params(aTHX_ settings);
1661 0 0         if (settings_copy) settings = settings_copy;
1662 0           SV **svp = hv_fetch(settings, "raw", 3, 0);
1663 0 0         if (svp) raw = SvTRUE(*svp) ? 1 : 0;
1664 0           svp = hv_fetch(settings, "on_data", 7, 0);
1665 0 0         if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV)
    0          
    0          
1666 0           on_data_sv = *svp;
1667 0           svp = hv_fetch(settings, "on_query_complete", 17, 0);
1668 0 0         if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV)
    0          
    0          
1669 0           on_complete_sv = *svp;
1670 0           svp = hv_fetch(settings, "external", 8, 0);
1671 0 0         if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVHV
    0          
    0          
1672 0 0         && HvKEYS((HV *)SvRV(*svp)) > 0)
    0          
1673 0           external = (HV *)SvRV(*svp);
1674             }
1675              
1676 0           sql = SvPV(sql_sv, sql_len);
1677              
1678 0 0         if (self->max_query_size > 0 && sql_len > self->max_query_size) {
    0          
1679 0 0         if (settings_copy) SvREFCNT_dec((SV*)settings_copy);
1680 0           croak("query size %lu exceeds max_query_size %lu",
1681             (unsigned long)sql_len, (unsigned long)self->max_query_size);
1682             }
1683              
1684 0 0         if (raw && self->protocol == PROTO_NATIVE) {
    0          
1685 0 0         if (settings_copy) SvREFCNT_dec((SV*)settings_copy);
1686 0           croak("raw mode is only supported with the HTTP protocol");
1687             }
1688              
1689 0 0         if (on_data_sv && self->protocol == PROTO_HTTP) {
    0          
1690 0 0         if (settings_copy) SvREFCNT_dec((SV*)settings_copy);
1691 0           croak("on_data is only supported with the native protocol");
1692             }
1693              
1694 0 0         if (external && self->protocol == PROTO_HTTP) {
    0          
1695 0 0         if (settings_copy) SvREFCNT_dec((SV*)settings_copy);
1696 0           croak("external tables are only supported with the native protocol");
1697             }
1698              
1699             /* Optionally prepend a SQL block comment carrying query_log_comment
1700             * for system.query_log traceability. Connection-level setting; the
1701             * prefix lives for the query call only. */
1702 0           char *qlc_sql = NULL;
1703 0 0         if (self->query_log_comment) {
1704 0           size_t qlc_sql_len = strlen(self->query_log_comment) + sql_len + 7;
1705 0           Newx(qlc_sql, qlc_sql_len + 1, char);
1706 0           size_t off = qlc_emit_prefix(qlc_sql, self->query_log_comment);
1707 0           memcpy(qlc_sql + off, sql, sql_len);
1708 0           qlc_sql[qlc_sql_len] = '\0';
1709 0           sql = qlc_sql;
1710 0           sql_len = qlc_sql_len;
1711             }
1712              
1713 0 0         if (self->protocol == PROTO_HTTP) {
1714 0           req = build_http_post_request(self, NULL, 0, sql, sql_len,
1715             self->default_settings, settings,
1716             &req_len);
1717             } else {
1718 0           char *ext_data = NULL;
1719 0           size_t ext_len = 0;
1720 0 0         if (external) {
1721             char ext_errbuf[256];
1722 0           ext_errbuf[0] = '\0';
1723 0           ext_data = build_external_tables(aTHX_ self, external, &ext_len,
1724             ext_errbuf, sizeof(ext_errbuf));
1725 0 0         if (!ext_data) {
1726 0 0         if (settings_copy) SvREFCNT_dec((SV*)settings_copy);
1727 0 0         if (qlc_sql) Safefree(qlc_sql);
1728 0           croak("%s", ext_errbuf);
1729             }
1730             }
1731 0           req = build_native_query(self, sql, sql_len,
1732             self->default_settings, settings,
1733             ext_data, ext_len, &req_len);
1734 0 0         if (ext_data) Safefree(ext_data);
1735             }
1736 0 0         if (qlc_sql) Safefree(qlc_sql);
1737              
1738 0           s = alloc_send();
1739 0           s->data = req;
1740 0           s->data_len = req_len;
1741 0           s->raw = raw;
1742 0 0         if (on_data_sv) s->on_data = SvREFCNT_inc(on_data_sv);
1743 0 0         if (on_complete_sv) s->on_complete = SvREFCNT_inc(on_complete_sv);
1744 0 0         if (settings) send_apply_settings(s, settings);
1745 0           s->cb = SvREFCNT_inc(cb);
1746 0 0         if (settings_copy) SvREFCNT_dec((SV*)settings_copy);
1747 0           enqueue_send(self, s);
1748             }
1749              
1750             void
1751             insert(EV::ClickHouse self, SV *table_sv, SV *data_sv, ...)
1752             CODE:
1753             {
1754             STRLEN table_len;
1755             const char *table;
1756             ev_ch_send_t *s;
1757             char *req;
1758             size_t req_len;
1759             SV *cb;
1760 0           HV *settings = NULL;
1761 0           int data_is_av = 0;
1762 0           AV *data_av = NULL;
1763              
1764 0 0         if (items == 4) {
1765 0           cb = ST(3);
1766 0 0         } else if (items == 5) {
1767 0 0         if (!(SvROK(ST(3)) && SvTYPE(SvRV(ST(3))) == SVt_PVHV))
    0          
1768 0           croak("settings must be a HASH reference");
1769 0           settings = (HV *)SvRV(ST(3));
1770 0           cb = ST(4);
1771             } else {
1772 0           croak("Usage: $ch->insert($table, $data, [\\%%settings], $cb)");
1773             }
1774              
1775 0 0         if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected");
    0          
    0          
1776 0 0         if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV))
    0          
1777 0           croak("callback must be a CODE reference");
1778              
1779 0           table = SvPV(table_sv, table_len);
1780              
1781             /* Detect arrayref-of-arrayrefs vs TSV string */
1782 0 0         if (SvROK(data_sv) && SvTYPE(SvRV(data_sv)) == SVt_PVAV) {
    0          
1783 0           data_is_av = 1;
1784 0           data_av = (AV *)SvRV(data_sv);
1785             }
1786              
1787             /* Native two-phase INSERT can only have one in flight at a time */
1788 0 0         if (self->protocol == PROTO_NATIVE && (self->insert_data || self->insert_av))
    0          
    0          
1789 0           croak("cannot pipeline native INSERT: previous INSERT still pending");
1790              
1791             /* Validate/serialize HTTP TSV data first — serialize_av_to_tsv can croak,
1792             * and we don't want to leak settings_copy or insert_sql via longjmp. */
1793 0           char *tsv_buf = NULL;
1794 0           STRLEN tsv_len = 0;
1795 0           const char *http_data = NULL;
1796 0 0         if (self->protocol == PROTO_HTTP) {
1797 0 0         if (data_is_av) {
1798 0           tsv_buf = serialize_av_to_tsv(aTHX_ data_av, &tsv_len);
1799 0           http_data = tsv_buf;
1800             } else {
1801 0           http_data = SvPV(data_sv, tsv_len);
1802             }
1803             }
1804              
1805             /* Expand params => { x => 1 } to param_x => '1' AND idempotent => 1|$tok
1806             * to insert_deduplication_token in a single owned settings copy. */
1807 0           HV *settings_copy = NULL;
1808 0 0         if (settings) {
1809 0           SV **idem = hv_fetch(settings, "idempotent", 10, 0);
1810 0           settings_copy = expand_params(aTHX_ settings);
1811             /* `idempotent => 1` (true scalar) auto-mints a token;
1812             * `idempotent => "any-other-string"` is used as the literal token;
1813             * a falsy value (0 / "" / undef / not present) is a no-op. */
1814 0 0         if (idem && SvTRUE(*idem)) {
    0          
1815 0 0         if (!settings_copy) {
1816 0           settings_copy = newHVhv(settings);
1817             }
1818 0           (void)hv_delete(settings_copy, "idempotent", 10, G_DISCARD);
1819 0           STRLEN tlen = 0;
1820 0           const char *tstr = NULL;
1821             char tbuf[48];
1822 0           int generate = 1;
1823 0 0         if (SvPOK(*idem) || SvIOK(*idem) || SvNOK(*idem)) {
    0          
    0          
1824 0           tstr = SvPV(*idem, tlen);
1825             /* Only the literal "1" auto-generates; any other truthy
1826             * stringy value is the user's own token. */
1827 0 0         if (!(tlen == 1 && tstr[0] == '1')) generate = 0;
    0          
1828             }
1829 0 0         if (generate) {
1830             static uint64_t idem_seq = 0;
1831 0           idem_seq++;
1832 0           tlen = (size_t)snprintf(tbuf, sizeof(tbuf),
1833             "ev-ch-%lld-%d-%llu",
1834 0           (long long)time(NULL), (int)getpid(),
1835             (unsigned long long)idem_seq);
1836 0           tstr = tbuf;
1837             }
1838 0           (void)hv_store(settings_copy, "insert_deduplication_token", 26,
1839             newSVpvn(tstr, tlen), 0);
1840             }
1841             /* `async_insert => 1` toggles ClickHouse server-side INSERT batching;
1842             * fills in wait_for_async_insert=0 unless the caller overrode it. */
1843 0           SV **async = hv_fetch(settings, "async_insert", 12, 0);
1844 0 0         if (async && SvTRUE(*async)) {
    0          
1845 0 0         if (!settings_copy) settings_copy = newHVhv(settings);
1846 0           (void)hv_store(settings_copy, "async_insert", 12, newSViv(1), 0);
1847 0 0         if (!hv_exists(settings_copy, "wait_for_async_insert", 21))
1848 0           (void)hv_store(settings_copy, "wait_for_async_insert", 21,
1849             newSViv(0), 0);
1850             }
1851 0 0         if (settings_copy) settings = settings_copy;
1852             }
1853              
1854             /* Build "insert into table format TabSeparated" (no inline data),
1855             * optionally prefixed with the connection's query_log_comment so the
1856             * server's system.query_log shows the same tag for selects and
1857             * inserts (parity with the query() XSUB above). */
1858 0           size_t qlc_extra = self->query_log_comment
1859 0           ? strlen(self->query_log_comment) + 7
1860 0 0         : 0;
1861             char *insert_sql;
1862 0           Newx(insert_sql, table_len + qlc_extra + 64, char);
1863 0           size_t off = qlc_emit_prefix(insert_sql, self->query_log_comment);
1864 0           size_t insert_sql_len = off + (size_t)snprintf(insert_sql + off,
1865             table_len + 64,
1866             "insert into %.*s format TabSeparated",
1867             (int)table_len, table);
1868              
1869 0 0         if (self->protocol == PROTO_HTTP) {
1870 0           req = build_http_post_request(self, insert_sql, insert_sql_len,
1871             http_data, tsv_len,
1872             self->default_settings, settings,
1873             &req_len);
1874 0 0         if (tsv_buf) Safefree(tsv_buf);
1875             } else {
1876             /* Native two-phase: phase 1 sends the query (no data), phase 2
1877             * sends the binary Data block once we have the sample block.
1878             * INSERT carries no external tables. */
1879 0           req = build_native_query(self, insert_sql, insert_sql_len,
1880             self->default_settings, settings,
1881             NULL, 0, &req_len);
1882             }
1883 0           Safefree(insert_sql);
1884              
1885 0           s = alloc_send();
1886 0           s->data = req;
1887 0           s->data_len = req_len;
1888 0 0         if (settings) send_apply_settings(s, settings);
1889             /* Per-query on_query_complete override: extract BEFORE freeing
1890             * settings_copy (matches query() XSUB pattern; otherwise the
1891             * hv_fetch below would read a freed HV). */
1892 0 0         if (settings) {
1893 0           SV **svp = hv_fetch(settings, "on_query_complete", 17, 0);
1894 0 0         if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV)
    0          
    0          
1895 0           s->on_complete = SvREFCNT_inc(*svp);
1896             }
1897 0 0         if (settings_copy) SvREFCNT_dec((SV*)settings_copy);
1898              
1899             /* For native INSERT, store data in the send entry (deferred to dispatch).
1900             * Even empty data needs the two-phase INSERT protocol to send an empty
1901             * DATA block. */
1902 0 0         if (self->protocol == PROTO_NATIVE) {
1903 0 0         if (data_is_av) {
1904 0           s->insert_av = SvREFCNT_inc(data_sv);
1905             } else {
1906             STRLEN data_len;
1907 0           const char *data = SvPV(data_sv, data_len);
1908 0 0         Newx(s->insert_data, data_len > 0 ? data_len : 1, char);
1909 0 0         if (data_len > 0)
1910 0           Copy(data, s->insert_data, data_len, char);
1911 0           s->insert_data_len = data_len;
1912             }
1913             }
1914              
1915 0           s->cb = SvREFCNT_inc(cb);
1916 0           enqueue_send(self, s);
1917             }
1918              
1919             void
1920             ping(EV::ClickHouse self, SV *cb)
1921             CODE:
1922             {
1923             ev_ch_send_t *s;
1924             char *req;
1925             size_t req_len;
1926              
1927 0 0         if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected");
    0          
    0          
1928 0 0         if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV))
    0          
1929 0           croak("callback must be a CODE reference");
1930              
1931 0 0         if (self->protocol == PROTO_HTTP) {
1932 0           req = build_http_ping_request(self, &req_len);
1933             } else {
1934 0           req = build_native_ping(&req_len);
1935             }
1936              
1937 0           s = alloc_send();
1938 0           s->data = req;
1939 0           s->data_len = req_len;
1940 0           s->cb = SvREFCNT_inc(cb);
1941 0           enqueue_send(self, s);
1942             }
1943              
1944             SV*
1945             on_connect(EV::ClickHouse self, SV *handler = NULL)
1946             CODE:
1947             {
1948 0           RETVAL = handler_accessor(&self->on_connect, handler, items > 1);
1949             }
1950             OUTPUT:
1951             RETVAL
1952              
1953             SV*
1954             on_error(EV::ClickHouse self, SV *handler = NULL)
1955             CODE:
1956             {
1957 0           RETVAL = handler_accessor(&self->on_error, handler, items > 1);
1958             }
1959             OUTPUT:
1960             RETVAL
1961              
1962             SV*
1963             on_progress(EV::ClickHouse self, SV *handler = NULL)
1964             CODE:
1965             {
1966 0           RETVAL = handler_accessor(&self->on_progress, handler, items > 1);
1967             }
1968             OUTPUT:
1969             RETVAL
1970              
1971             int
1972             is_connected(EV::ClickHouse self)
1973             CODE:
1974             {
1975 0 0         RETVAL = self->connected ? 1 : 0;
1976             }
1977             OUTPUT:
1978             RETVAL
1979              
1980             int
1981             pending_count(EV::ClickHouse self)
1982             CODE:
1983             {
1984 0 0         RETVAL = self->pending_count;
1985             }
1986             OUTPUT:
1987             RETVAL
1988              
1989             # Snapshot of pending queries: returns arrayref of hashrefs. The
1990             # in-flight head (if any) appears first with state => 'in_flight',
1991             # query_id => last_query_id, age => seconds since dispatch. Queued
1992             # entries follow in dispatch order with state => 'queued' and the
1993             # query_id from settings (or undef if none was supplied). We only
1994             # expose what is cheaply reachable from struct state — full SQL or
1995             # settings hashes aren't retained after enqueue.
1996             SV*
1997             pending_queries(EV::ClickHouse self)
1998             CODE:
1999             {
2000 0           AV *out = newAV();
2001 0 0         if (!ngx_queue_empty(&self->cb_queue)) {
2002 0           HV *h = newHV();
2003 0           (void)hv_stores(h, "state", newSVpvs("in_flight"));
2004 0 0         (void)hv_stores(h, "query_id",
2005             self->last_query_id
2006             ? newSVpv(self->last_query_id, 0)
2007             : newSV(0));
2008 0           double age = self->query_start_time > 0
2009 0 0         ? ev_now(self->loop) - self->query_start_time : 0.0;
2010 0           (void)hv_stores(h, "age", newSVnv(age));
2011 0           av_push(out, newRV_noinc((SV *)h));
2012             }
2013             ngx_queue_t *q;
2014 0           for (q = ngx_queue_head(&self->send_queue);
2015 0 0         q != ngx_queue_sentinel(&self->send_queue);
2016 0           q = ngx_queue_next(q)) {
2017 0           ev_ch_send_t *s = ngx_queue_data(q, ev_ch_send_t, queue);
2018 0           HV *h = newHV();
2019 0           (void)hv_stores(h, "state", newSVpvs("queued"));
2020 0 0         (void)hv_stores(h, "query_id",
2021             s->query_id ? newSVpv(s->query_id, 0) : newSV(0));
2022 0           (void)hv_stores(h, "age", newSVnv(0.0));
2023 0           av_push(out, newRV_noinc((SV *)h));
2024             }
2025 0           RETVAL = newRV_noinc((SV *)out);
2026             }
2027             OUTPUT:
2028             RETVAL
2029              
# Diagnostic snapshot of the connection's internal struct state, meant
# for debugging stuck connections and leaks. Returns a reference to a
# freshly built hash with a small fixed set of fields. Do NOT script
# against this in production: the shape may change between versions.
SV *
dump_state(EV::ClickHouse self)
CODE:
{
    HV *snap = newHV();
    const char *proto_name = "http";

    if (self->protocol == PROTO_NATIVE)
        proto_name = "native";

    /* Lifecycle flags, normalised to 0/1. */
    (void)hv_stores(snap, "connected",   newSViv(self->connected   ? 1 : 0));
    (void)hv_stores(snap, "connecting",  newSViv(self->connecting  ? 1 : 0));
    (void)hv_stores(snap, "dns_pending", newSViv(self->dns_pending ? 1 : 0));

    /* Callback bookkeeping. */
    (void)hv_stores(snap, "pending_count",  newSViv(self->pending_count));
    (void)hv_stores(snap, "callback_depth", newSViv(self->callback_depth));

    /* Send / receive buffer accounting. */
    (void)hv_stores(snap, "send_len", newSVuv((UV)self->send_len));
    (void)hv_stores(snap, "send_pos", newSVuv((UV)self->send_pos));
    (void)hv_stores(snap, "send_cap", newSVuv((UV)self->send_cap));
    (void)hv_stores(snap, "recv_len", newSVuv((UV)self->recv_len));
    (void)hv_stores(snap, "recv_cap", newSVuv((UV)self->recv_cap));

    /* Transport and peer details. */
    (void)hv_stores(snap, "fd", newSViv(self->fd));
    (void)hv_stores(snap, "protocol", newSVpv(proto_name, 0));
    (void)hv_stores(snap, "server_revision", newSViv(self->server_revision));
    (void)hv_stores(snap, "reconnect_attempts", newSViv(self->reconnect_attempts));
    if (self->host) {
        (void)hv_stores(snap, "host", newSVpv(self->host, 0));
    } else {
        /* No host stored: the slot is present but undef. */
        (void)hv_stores(snap, "host", newSV(0));
    }
    (void)hv_stores(snap, "port", newSVuv(self->port));
    (void)hv_stores(snap, "send_count", newSVuv((UV)self->send_count));
    (void)hv_stores(snap, "compress", newSViv(self->compress ? 1 : 0));
    /* "tls" reports whether an ssl handle is currently attached. */
    (void)hv_stores(snap, "tls", newSViv(self->ssl ? 1 : 0));

    RETVAL = newRV_noinc((SV *)snap);
}
OUTPUT:
    RETVAL
2063              
SV *
current_host(EV::ClickHouse self)
CODE:
    /* Copy of the host string stored on the connection, or undef when
     * no host is set. */
    if (self->host)
        RETVAL = newSVpv(self->host, 0);
    else
        RETVAL = &PL_sv_undef;
OUTPUT:
    RETVAL
2072              
unsigned int
current_port(EV::ClickHouse self)
CODE:
    /* Port number currently stored on the connection struct. */
    RETVAL = self->port;
OUTPUT:
    RETVAL
2081              
SV *
server_info(EV::ClickHouse self)
CODE:
{
    /* Human-readable server banner: "<name> <major>.<minor>.<patch>
     * (revision <rev>)". Returns undef while no server name is stored.
     *
     * The string is built with newSVpvf so it is sized to fit: the old
     * fixed 256-byte stack buffer silently truncated long server names,
     * and a negative snprintf() return would have been handed to
     * newSVpvn() as a length. */
    if (self->server_name) {
        RETVAL = newSVpvf("%s %u.%u.%u (revision %u)",
                          self->server_name,
                          (unsigned int)self->server_version_major,
                          (unsigned int)self->server_version_minor,
                          (unsigned int)self->server_version_patch,
                          (unsigned int)self->server_revision);
    } else {
        RETVAL = &PL_sv_undef;
    }
}
OUTPUT:
    RETVAL
2102              
SV *
server_version(EV::ClickHouse self)
CODE:
{
    /* Dotted "major.minor.patch" version string. A zero server_revision
     * yields undef instead of a string. */
    RETVAL = &PL_sv_undef;
    if (self->server_revision) {
        char text[64];
        int len = snprintf(text, sizeof(text), "%u.%u.%u",
                           self->server_version_major,
                           self->server_version_minor,
                           self->server_version_patch);
        /* snprintf reports the untruncated length; clamp to what fits. */
        if (len >= (int)sizeof(text))
            len = (int)sizeof(text) - 1;
        RETVAL = newSVpvn(text, len);
    }
}
OUTPUT:
    RETVAL
2121              
UV
server_revision(EV::ClickHouse self)
CODE:
    /* Stored server revision, widened to UV. */
    RETVAL = (UV)self->server_revision;
OUTPUT:
    RETVAL
2130              
void
skip_pending(EV::ClickHouse self)
CODE:
{
    /* Order matters: first cancel queued + in-flight callbacks (this
     * delivers errors to them), then drop the socket if a request was on
     * the wire. Whether one was on the wire has to be sampled before
     * cancelling, because cancel_pending zeroes send_count. */
    const int request_on_wire = (self->send_count > 0);

    /* A true return from cancel_pending means self must not be touched
     * again (presumably it was freed - confirm against cancel_pending). */
    if (cancel_pending(self, "skipped"))
        return;

    CLEAR_INSERT(self);
    CLEAR_STR(self->insert_err);

    if (request_on_wire) {
        /* Last statement, so whether this freed self is irrelevant. */
        (void)cleanup_connection(self);
    }
}
2145              
void
_set_protocol(EV::ClickHouse self, int proto)
CODE:
    /* Internal setter: stores the protocol selector as given; the value
     * is not validated here. */
    self->protocol = proto;
2152              
void
_set_compress(EV::ClickHouse self, int val)
CODE:
    /* Internal setter: stores the compression flag as given. */
    self->compress = val;
2159              
void
_set_session_id(EV::ClickHouse self, const char *sid)
CODE:
    /* Internal setter: release the previous session id, then keep a
     * private copy of the new one. */
    CLEAR_STR(self->session_id);
    self->session_id = safe_strdup(sid);
2167              
void
_set_query_log_comment(EV::ClickHouse self, const char *cmt)
CODE:
    /* Internal setter: release the previous comment string, then keep a
     * private copy of the new one. */
    CLEAR_STR(self->query_log_comment);
    self->query_log_comment = safe_strdup(cmt);
2175              
void
_set_host(EV::ClickHouse self, const char *host, unsigned int port)
CODE:
    /* Internal setter: replace the stored host (private copy) and port.
     *
     * reconnect_attempts is deliberately left untouched. Resetting it on
     * every host change would restart the reconnect_max_attempts budget
     * on each failover step and so defeat the limit. Leaving it alone
     * also lets the backoff keep widening across the rotation, which is
     * the desired behaviour when every server is unreachable. */
    CLEAR_STR(self->host);
    self->host = safe_strdup(host);
    self->port = port;
2188              
void
_set_dns_pending(EV::ClickHouse self, int v)
CODE:
    /* Internal setter: any non-zero v is stored as 1, zero as 0. */
    if (v)
        self->dns_pending = 1;
    else
        self->dns_pending = 0;
2195              
int
_take_dns_pending(EV::ClickHouse self)
CODE:
{
    /* Read-and-clear of dns_pending in one call. Only the first call
     * after the flag was set sees it set; this lets the Perl DNS
     * callback notice a finish that happened during resolution and skip
     * the post-DNS connect. */
    int was_pending = self->dns_pending;

    self->dns_pending = 0;
    RETVAL = was_pending;
}
OUTPUT:
    RETVAL
2208              
void
_set_connect_timeout(EV::ClickHouse self, NV val)
CODE:
    /* Internal setter: stores the connect timeout as given (units are
     * presumably seconds, as elsewhere with libev - confirm). */
    self->connect_timeout = val;
2215              
void
_set_tls(EV::ClickHouse self, int val)
CODE:
#ifdef HAVE_OPENSSL
    /* OpenSSL build: just record the requested flag. */
    self->tls_enabled = val;
#else
    /* Built without OpenSSL: asking for TLS is fatal, turning it off is
     * a no-op. */
    if (val) {
        croak("TLS support not compiled in (OpenSSL not found)");
    }
#endif
2224              
void
_set_settings(EV::ClickHouse self, SV *href)
CODE:
{
    /* Internal setter: install href's hash as the default settings.
     * Croaks unless href is a reference to a hash. The hash itself is
     * shared (refcount bumped), not copied. */
    SV *target = SvROK(href) ? SvRV(href) : NULL;

    if (target == NULL || SvTYPE(target) != SVt_PVHV)
        croak("settings must be a HASH reference");

    CLEAR_SV(self->default_settings);
    self->default_settings = (HV *)SvREFCNT_inc(target);
}
2234              
SV *
on_disconnect(EV::ClickHouse self, SV *handler = NULL)
CODE:
{
    /* Accessor for the on_disconnect slot. All work is delegated to
     * handler_accessor, which is told whether a handler argument was
     * actually passed (items > 1); its result is returned unchanged. */
    const int handler_given = (items > 1);

    RETVAL = handler_accessor(&self->on_disconnect, handler, handler_given);
}
OUTPUT:
    RETVAL
2243              
SV *
on_query_complete(EV::ClickHouse self, SV *handler = NULL)
CODE:
{
    /* Accessor for the on_query_complete slot. All work is delegated to
     * handler_accessor, which is told whether a handler argument was
     * actually passed (items > 1); its result is returned unchanged. */
    const int handler_given = (items > 1);

    RETVAL = handler_accessor(&self->on_query_complete, handler, handler_given);
}
OUTPUT:
    RETVAL
2252              
SV *
on_query_start(EV::ClickHouse self, SV *handler = NULL)
CODE:
{
    /* Accessor for the on_query_start slot. All work is delegated to
     * handler_accessor, which is told whether a handler argument was
     * actually passed (items > 1); its result is returned unchanged. */
    const int handler_given = (items > 1);

    RETVAL = handler_accessor(&self->on_query_start, handler, handler_given);
}
OUTPUT:
    RETVAL
2261              
SV *
on_log(EV::ClickHouse self, SV *handler = NULL)
CODE:
{
    /* Accessor for the on_log slot. All work is delegated to
     * handler_accessor, which is told whether a handler argument was
     * actually passed (items > 1); its result is returned unchanged. */
    const int handler_given = (items > 1);

    RETVAL = handler_accessor(&self->on_log, handler, handler_given);
}
OUTPUT:
    RETVAL
2270              
SV *
last_tls_error(EV::ClickHouse self)
CODE:
    /* Copy of the stored TLS error text, or undef when none is stored. */
    if (self->last_tls_error)
        RETVAL = newSVpv(self->last_tls_error, 0);
    else
        RETVAL = &PL_sv_undef;
OUTPUT:
    RETVAL
2279              
SV *
on_failover(EV::ClickHouse self, SV *handler = NULL)
CODE:
{
    /* Accessor for the on_failover slot. All work is delegated to
     * handler_accessor, which is told whether a handler argument was
     * actually passed (items > 1); its result is returned unchanged. */
    const int handler_given = (items > 1);

    RETVAL = handler_accessor(&self->on_failover, handler, handler_given);
}
OUTPUT:
    RETVAL
2288              
void
_set_failover(EV::ClickHouse self, SV *hosts_av_ref, unsigned int default_port)
CODE:
{
    /* Replace the failover host list.
     *
     *   hosts_av_ref  undef (clears the list) or an arrayref of strings,
     *                 each "host", "host:port" or "[ipv6]:port".
     *   default_port  port used for entries that carry no usable port.
     *
     * Croaks if hosts_av_ref is defined but not an array reference. An
     * empty array also just clears the list. Undefined array elements
     * become an empty host with the default port.
     *
     * Entries are parsed here, once, so the hot emit_error path does not
     * have to.
     *
     * Parsing rules:
     *   - "[addr]" / "[addr]:port": the brackets are stripped.
     *   - exactly one ':' and no brackets: split into host and port.
     *   - several ':' and no brackets: taken as a bare IPv6 literal, the
     *     whole string is the host.
     *   - a port must start with a decimal number in 1..65535 (trailing
     *     junk after the digits is tolerated); anything else falls back
     *     to default_port instead of yielding port 0 or a wrapped value.
     *
     * failover_n only ever counts fully initialised entries, so if
     * stringifying an element croaks part-way through, the struct still
     * describes a consistent (shorter) list. */
    AV *hosts;
    SSize_t n;
    SSize_t i;

    failover_free_hosts(self);
    if (!SvOK(hosts_av_ref)) return;
    if (!(SvROK(hosts_av_ref) && SvTYPE(SvRV(hosts_av_ref)) == SVt_PVAV))
        croak("_set_failover: hosts must be an arrayref");
    hosts = (AV *)SvRV(hosts_av_ref);
    n = av_top_index(hosts) + 1;
    if (n <= 0) return;

    Newx(self->failover_hosts, n, char *);
    Newx(self->failover_ports, n, unsigned int);
    self->failover_n = 0;
    self->failover_idx = 0;
    self->failover_default_port = default_port;

    for (i = 0; i < n; i++) {
        SV **e = av_fetch(hosts, i, 0);
        unsigned int port = default_port;
        const char *port_str = NULL;
        char *host = NULL;

        if (e && SvOK(*e)) {
            STRLEN sl;
            const char *s = SvPV(*e, sl);   /* may croak (overload/tie) */
            const char *end = s + sl;
            const char *rbracket = NULL;
            const char *colon = NULL;

            if (sl > 0 && s[0] == '[')
                rbracket = (const char *)memchr(s, ']', sl);

            if (rbracket) {
                /* "[addr]" or "[addr]:port" */
                size_t hl = (size_t)(rbracket - s) - 1;
                Newx(host, hl + 1, char);
                memcpy(host, s + 1, hl);
                host[hl] = '\0';
                if (rbracket + 1 < end && rbracket[1] == ':')
                    port_str = rbracket + 2;
            } else {
                colon = (const char *)memchr(s, ':', sl);
                if (colon
                    && memchr(colon + 1, ':', (size_t)(end - (colon + 1))) == NULL) {
                    /* exactly one colon: "host:port" */
                    size_t hl = (size_t)(colon - s);
                    Newx(host, hl + 1, char);
                    memcpy(host, s, hl);
                    host[hl] = '\0';
                    port_str = colon + 1;
                } else {
                    /* plain host name, or a bare IPv6 literal */
                    host = safe_strdup(s);
                }
            }

            if (port_str) {
                /* port_str points into the SvPV buffer; like the atoi()
                 * it replaces, this relies on that buffer being
                 * NUL-terminated. */
                char *stop = NULL;
                unsigned long v = strtoul(port_str, &stop, 10);
                if (stop != port_str && v >= 1 && v <= 65535)
                    port = (unsigned int)v;
            }
        } else {
            host = safe_strdup("");
        }

        self->failover_hosts[i] = host;
        self->failover_ports[i] = port;
        self->failover_n = (int)(i + 1);
    }
}
2341              
SV *
server_timezone(EV::ClickHouse self)
CODE:
    /* Copy of the stored server timezone string, or undef when none is
     * stored. */
    if (self->server_timezone)
        RETVAL = newSVpv(self->server_timezone, 0);
    else
        RETVAL = &PL_sv_undef;
OUTPUT:
    RETVAL
2351              
void
_set_tls_skip_verify(EV::ClickHouse self, int val)
CODE:
    /* Internal setter: stores the skip-verify flag as given. */
    self->tls_skip_verify = val;
2358              
void
_set_query_timeout(EV::ClickHouse self, NV val)
CODE:
    /* Internal setter: stores the query timeout as given; no range
     * check is applied here. */
    self->query_timeout = val;
2365              
void
_set_max_recv_buffer(EV::ClickHouse self, UV val)
CODE:
    /* Internal setter: stores the receive-buffer limit, converted from
     * UV to size_t. */
    self->max_recv_buffer = (size_t)val;
2372              
void
_set_http_basic_auth(EV::ClickHouse self, int v)
CODE:
    /* Internal setter: any non-zero v is stored as 1, zero as 0. */
    if (v)
        self->http_basic_auth = 1;
    else
        self->http_basic_auth = 0;
2379              
void
_set_max_query_size(EV::ClickHouse self, UV val)
CODE:
    /* Internal setter: stores the query-size limit, converted from UV
     * to size_t. */
    self->max_query_size = (size_t)val;
2386              
void
_set_auto_reconnect(EV::ClickHouse self, int val)
CODE:
    /* Internal setter: stores the auto-reconnect flag as given. */
    self->auto_reconnect = val;
2393              
void
_set_decode_flags(EV::ClickHouse self, unsigned int flags)
CODE:
    /* Internal setter: stores the decode flag bits as a uint32_t. */
    self->decode_flags = (uint32_t)flags;
2400              
SV *
column_names(EV::ClickHouse self)
CODE:
    /* New reference to the stored column-name container (shared with
     * the connection, not copied), or undef when there is none. */
    if (self->native_col_names)
        RETVAL = newRV_inc((SV*)self->native_col_names);
    else
        RETVAL = &PL_sv_undef;
OUTPUT:
    RETVAL
2410              
SV *
last_query_id(EV::ClickHouse self)
CODE:
    /* Copy of the stored last query id, or undef when none is stored. */
    if (self->last_query_id)
        RETVAL = newSVpv(self->last_query_id, 0);
    else
        RETVAL = &PL_sv_undef;
OUTPUT:
    RETVAL
2420              
SV *
last_error_code(EV::ClickHouse self)
CODE:
    /* Stored last error code as a fresh integer SV. */
    RETVAL = newSViv(self->last_error_code);
OUTPUT:
    RETVAL
2429              
SV *
column_types(EV::ClickHouse self)
CODE:
    /* New reference to the stored column-type container (shared with
     * the connection, not copied), or undef when there is none. */
    if (self->native_col_types)
        RETVAL = newRV_inc((SV*)self->native_col_types);
    else
        RETVAL = &PL_sv_undef;
OUTPUT:
    RETVAL
2439              
SV *
last_totals(EV::ClickHouse self)
CODE:
    /* New reference to the stored totals value (shared, not copied), or
     * undef when there is none. */
    if (self->native_totals)
        RETVAL = newRV_inc((SV*)self->native_totals);
    else
        RETVAL = &PL_sv_undef;
OUTPUT:
    RETVAL
2449              
SV *
last_extremes(EV::ClickHouse self)
CODE:
    /* New reference to the stored extremes value (shared, not copied),
     * or undef when there is none. */
    if (self->native_extremes)
        RETVAL = newRV_inc((SV*)self->native_extremes);
    else
        RETVAL = &PL_sv_undef;
OUTPUT:
    RETVAL
2459              
SV *
profile_rows_before_limit(EV::ClickHouse self)
CODE:
    /* Stored profile_rows_before_limit counter as a fresh unsigned SV. */
    RETVAL = newSVuv(self->profile_rows_before_limit);
OUTPUT:
    RETVAL
2468              
SV *
profile_rows(EV::ClickHouse self)
CODE:
    /* Stored profile_rows counter as a fresh unsigned SV. */
    RETVAL = newSVuv(self->profile_rows);
OUTPUT:
    RETVAL
2477              
SV *
profile_bytes(EV::ClickHouse self)
CODE:
    /* Stored profile_bytes counter as a fresh unsigned SV. */
    RETVAL = newSVuv(self->profile_bytes);
OUTPUT:
    RETVAL
2486              
SV *
on_trace(EV::ClickHouse self, SV *handler = NULL)
CODE:
{
    /* Accessor for the on_trace slot. All work is delegated to
     * handler_accessor, which is told whether a handler argument was
     * actually passed (items > 1); its result is returned unchanged. */
    const int handler_given = (items > 1);

    RETVAL = handler_accessor(&self->on_trace, handler, handler_given);
}
OUTPUT:
    RETVAL
2495              
void
_set_keepalive(EV::ClickHouse self, double val)
CODE:
    /* Internal setter: stores the keepalive value as given. */
    self->keepalive = val;
2502              
void
_set_reconnect_delay(EV::ClickHouse self, double val)
CODE:
    /* Internal setter: stores the reconnect delay as given. */
    self->reconnect_delay = val;
2509              
void
_set_reconnect_max_delay(EV::ClickHouse self, double val)
CODE:
    /* Internal setter: stores the reconnect delay ceiling as given. */
    self->reconnect_max_delay = val;
2516              
void
_set_reconnect_jitter(EV::ClickHouse self, double val)
CODE:
    /* Internal setter: negative jitter is clamped to zero; any other
     * value is stored as given. */
    if (val < 0)
        val = 0;
    self->reconnect_jitter = val;
2523              
void
_set_reconnect_max_attempts(EV::ClickHouse self, int val)
CODE:
    /* Internal setter: stores the reconnect attempt limit as given. */
    self->reconnect_max_attempts = val;
2530              
void
_set_progress_period(EV::ClickHouse self, double val)
CODE:
    /* Internal setter: stores the progress period as given. */
    self->progress_period = val;
2537              
void
drain(EV::ClickHouse self, SV *cb)
CODE:
{
    /* Register cb to run once nothing is pending. Croaks unless cb is a
     * code reference. Any previously registered drain callback is
     * dropped first. When there is already nothing pending and nothing
     * queued, cb is fired right away instead of being stored. */
    const int is_code_ref = SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV;

    if (!is_code_ref)
        croak("drain callback must be a CODE reference");

    CLEAR_SV(self->on_drain);

    if (self->pending_count != 0 || !ngx_queue_empty(&self->send_queue)) {
        /* Work outstanding: keep the callback for later. */
        self->on_drain = SvREFCNT_inc(cb);
    } else {
        /* Idle already: fire now. Nothing follows, so the result of
         * fire_zero_arg_cb is not needed. */
        (void)fire_zero_arg_cb(self, cb, "drain");
    }
}
2552              
void
cancel(EV::ClickHouse self)
CODE:
{
    /* Cancel the request currently on the wire, if any (send_count > 0).
     *
     *   native: append a CLIENT_CANCEL packet to the send buffer and
     *           start writing; the connection stays up.
     *   HTTP:   cancel by closing the connection, then reconnect if
     *           auto_reconnect is on and a host is known.
     *
     * Does nothing when no request is on the wire. */
    if (self->send_count > 0) {
        if (self->protocol == PROTO_NATIVE) {
            native_buf_t cancel_pkt;

            nbuf_init(&cancel_pkt);
            nbuf_varuint(&cancel_pkt, CLIENT_CANCEL);

            /* Grow the send buffer, then append the packet bytes. */
            ensure_send_cap(self, self->send_len + cancel_pkt.len);
            Copy(cancel_pkt.data, self->send_buf + self->send_len,
                 cancel_pkt.len, char);
            self->send_len += cancel_pkt.len;
            Safefree(cancel_pkt.data);

            start_writing(self);
            /* The server still has to answer with EndOfStream or an
             * Exception; that is awaited as usual. */
        } else if (self->protocol == PROTO_HTTP) {
            int gen_before;

            CLEAR_SV(self->native_rows);
            gen_before = self->connect_gen;

            /* Each step below can run user callbacks. Stop as soon as
             * one reports true, or the connection generation changed
             * underneath us. */
            if (cancel_pending(self, "query cancelled"))
                return;
            if (self->connect_gen != gen_before)
                return;
            if (cleanup_connection(self))
                return;     /* on_disconnect freed self */

            if (self->auto_reconnect && self->host)
                schedule_reconnect(self);
        }
    }
}
2579              
2580             # --- XS-resident hot-path helpers (Streamer, Pool, Iterator, breaker) ---
2581             # Each takes the Perl object hash directly; they read/write hash slots
2582             # from C and call back into Perl only for the cold paths (_flush,
2583             # on_high_water; EV::run is invoked via the C API).
2584              
SV *
_streamer_push_row(SV *self_sv, SV *row)
CODE:
{
    /* Append one row to a streamer's buffer and trigger flush /
     * high-water handling.
     *
     *   self_sv  the streamer object (must be a hash reference).
     *   row      either a hash reference (converted to an array of
     *            values ordered by the streamer's "columns" slot;
     *            missing keys become undef) or anything else, which is
     *            copied and buffered as is.
     *
     * Slots read from the streamer hash: buffer (arrayref, required),
     * columns (arrayref, required for hash rows), batch_size (default
     * 10000), high_water, high_water_active, on_high_water, in_flight.
     *
     * Croaks if self_sv is not a hash ref, the buffer slot is missing,
     * or a hash row is pushed without a columns slot.
     *
     * Returns self_sv itself so calls can be chained. */
    if (!(SvROK(self_sv) && SvTYPE(SvRV(self_sv)) == SVt_PVHV))
        croak("_streamer_push_row: self must be a hash ref");
    HV *self = (HV*)SvRV(self_sv);

    SV **buf_p = hv_fetchs(self, "buffer", 0);
    if (!buf_p || !SvROK(*buf_p) || SvTYPE(SvRV(*buf_p)) != SVt_PVAV)
        croak("_streamer_push_row: buffer slot missing");
    AV *buffer = (AV*)SvRV(*buf_p);

    SV *to_push;
    if (SvROK(row) && SvTYPE(SvRV(row)) == SVt_PVHV) {
        SV **cols_p = hv_fetchs(self, "columns", 0);
        if (!cols_p || !SvROK(*cols_p) || SvTYPE(SvRV(*cols_p)) != SVt_PVAV)
            croak("push_row(\\%%hash) requires columns => [...] at insert_streamer creation");
        AV *cols = (AV*)SvRV(*cols_p);
        HV *h = (HV*)SvRV(row);
        SSize_t n = av_top_index(cols) + 1;
        AV *out = newAV();
        av_extend(out, n - 1);
        for (SSize_t i = 0; i < n; i++) {
            SV **col = av_fetch(cols, i, 0);
            /* Look the column up by SV, not by raw bytes: hv_fetch with
             * a positive length treats the key as non-UTF-8, so a
             * UTF-8-flagged (non-ASCII) column name would never match
             * the key in the row hash. hv_fetch_ent honours the flag. */
            HE *he = col ? hv_fetch_ent(h, *col, 0, 0) : NULL;
            av_push(out, he ? newSVsv(HeVAL(he)) : newSV(0));
        }
        to_push = newRV_noinc((SV*)out);
    } else {
        to_push = newSVsv(row);
    }
    av_push(buffer, to_push);
    SSize_t buf_n = av_top_index(buffer) + 1;

    /* Decide about flush and high-water BEFORE calling into Perl, based
     * on the buffer size right after this push. */
    SV **bs_p = hv_fetchs(self, "batch_size", 0);
    SSize_t batch = bs_p ? SvIV(*bs_p) : 10000;
    int need_flush = buf_n >= batch;

    int need_hw = 0;
    SV **hw_p = hv_fetchs(self, "high_water", 0);
    SSize_t hw = hw_p ? SvIV(*hw_p) : 0;
    if (hw && buf_n >= hw) {
        /* high_water_active latches, so the callback fires once per
         * crossing rather than on every push above the mark. */
        SV **act_p = hv_fetchs(self, "high_water_active", 0);
        if (!act_p || !SvTRUE(*act_p)) {
            (void)hv_stores(self, "high_water_active", newSViv(1));
            need_hw = 1;
        }
    }

    /* G_EVAL on both dispatches: insert() reachable from _flush can
     * croak ("not connected" etc.); the user's on_high_water is also
     * untrusted. Match the rest of the codebase's exception policy. */
    if (need_flush) {
        dSP;
        ENTER; SAVETMPS;
        PUSHMARK(SP);
        XPUSHs(self_sv);
        PUTBACK;
        call_method("_flush", G_DISCARD | G_EVAL);
        WARN_AND_CLEAR_ERRSV("Streamer _flush");
        FREETMPS; LEAVE;
    }

    if (need_hw) {
        SV **cb_p = hv_fetchs(self, "on_high_water", 0);
        if (cb_p && SvROK(*cb_p) && SvTYPE(SvRV(*cb_p)) == SVt_PVCV) {
            SV **inflight_p = hv_fetchs(self, "in_flight", 0);
            IV inflight = inflight_p ? SvIV(*inflight_p) : 0;
            dSP;
            ENTER; SAVETMPS;
            PUSHMARK(SP);
            /* Callback arguments: (buffered row count, in-flight count). */
            mXPUSHi(buf_n);
            mXPUSHi(inflight);
            PUTBACK;
            call_sv(*cb_p, G_DISCARD | G_EVAL);
            WARN_AND_CLEAR_ERRSV("on_high_water");
            FREETMPS; LEAVE;
        }
    }

    /* Return self to allow chaining ($s->push_row(...)->push_row(...))
     * without allocating a fresh RV per call. */
    RETVAL = SvREFCNT_inc(self_sv);
}
OUTPUT:
    RETVAL
2675              
SV *
_pool_pick(SV *pool_sv)
CODE:
{
    /* Pick the least-loaded connection from a pool.
     *
     *   pool_sv  the pool object (hash reference). Slots read: conns
     *            (arrayref of EV::ClickHouse objects, required),
     *            cb_thresh / cb_state (circuit-breaker data), _pinned
     *            (hash keyed by refaddr), idx (tie-break counter,
     *            updated here).
     *
     * Load is each connection's pending_count; ties are broken
     * round-robin via the idx counter. Returns the chosen element of
     * conns (same SV, refcount bumped).
     *
     * Croaks if pool_sv is not a hash ref, conns is missing or empty, or
     * no entry is a live EV::ClickHouse object. */
    if (!(SvROK(pool_sv) && SvTYPE(SvRV(pool_sv)) == SVt_PVHV))
        croak("_pool_pick: pool must be a hash ref");
    HV *pool = (HV*)SvRV(pool_sv);

    SV **conns_p = hv_fetchs(pool, "conns", 0);
    if (!conns_p || !SvROK(*conns_p) || SvTYPE(SvRV(*conns_p)) != SVt_PVAV)
        croak("_pool_pick: conns slot missing");
    AV *conns = (AV*)SvRV(*conns_p);
    SSize_t n = av_top_index(conns) + 1;
    if (n <= 0) croak("_pool_pick: empty pool");

    /* Circuit-breaker state is only consulted when a threshold is set. */
    SV **thresh_p = hv_fetchs(pool, "cb_thresh", 0);
    IV thresh = thresh_p ? SvIV(*thresh_p) : 0;
    AV *cb_state = NULL;
    double now = 0;
    if (thresh > 0) {
        SV **cs_p = hv_fetchs(pool, "cb_state", 0);
        if (cs_p && SvROK(*cs_p) && SvTYPE(SvRV(*cs_p)) == SVt_PVAV)
            cb_state = (AV*)SvRV(*cs_p);
        now = ev_time();
    }
    /* with_session "pinned" members get round-robined LAST — they're
     * still selectable but only as a fallback. _pinned is a refaddr-
     * keyed hash maintained by Pool::with_session in the .pm. */
    HV *pinned = NULL;
    {
        SV **p_p = hv_fetchs(pool, "_pinned", 0);
        if (p_p && SvROK(*p_p) && SvTYPE(SvRV(*p_p)) == SVt_PVHV)
            pinned = (HV*)SvRV(*p_p);
    }

    /* Indices of all candidates sharing the current minimum load. Small
     * pools use the stack array; larger ones get a heap array that must
     * be freed on every exit path below. */
    int stack_ties[64];
    int *ties = stack_ties;
    int ties_cap = 64;
    if (n > ties_cap) {
        Newx(ties, n, int);
        ties_cap = (int)n;
    }

    int best = -1;
    int best_n = 0;
    int n_ties = 0;
    /* 3 passes: live+unpinned, live (incl pinned), all members (fallback). */
    for (int pass = 0; pass < 3; pass++) {
        for (SSize_t i = 0; i < n; i++) {
            if (pass < 2 && cb_state) {
                /* Skip members whose breaker is still open. */
                SV **slot_sv = av_fetch(cb_state, i, 0);
                if (slot_sv && SvROK(*slot_sv) && SvTYPE(SvRV(*slot_sv)) == SVt_PVHV) {
                    HV *slot = (HV*)SvRV(*slot_sv);
                    SV **du_p = hv_fetchs(slot, "dead_until", 0);
                    if (du_p && SvNV(*du_p) > now) continue;
                }
            }
            SV **csv = av_fetch(conns, i, 0);
            if (!csv || !SvROK(*csv) || !sv_isa(*csv, "EV::ClickHouse")) continue;
            ev_clickhouse_t *ch = INT2PTR(ev_clickhouse_t*, SvIV(SvRV(*csv)));
            if (ch->magic != EV_CH_MAGIC) continue; /* freed mid-callback */
            if (pass == 0 && pinned) {
                /* Match Pool::with_session's `refaddr($conn)` key —
                 * that's the address of the referent SV (SvRV(*csv)),
                 * not the struct pointer. Format it as a full-width UV
                 * (PTR2UV / UVuf): `unsigned long` is only 32 bits on
                 * LLP64 platforms and would truncate the address there,
                 * so the key would never match. */
                char key[32];
                int klen = snprintf(key, sizeof(key), "%" UVuf,
                                    (UV)PTR2UV(SvRV(*csv)));
                if (hv_exists(pinned, key, klen)) continue;
            }
            int pc = ch->pending_count;
            if (best < 0 || pc < best_n) {
                /* New minimum: restart the tie list with this entry. */
                best_n = pc;
                best = (int)i;
                ties[0] = (int)i;
                n_ties = 1;
            } else if (pc == best_n) {
                if (n_ties < ties_cap) ties[n_ties++] = (int)i;
            }
        }
        if (best >= 0) break;
    }

    /* Defensive: if every entry was filtered out (sv_isa mismatch /
     * magic mismatch / external corruption) all three passes leave
     * best=-1. Free the heap ties array before croaking. */
    if (best < 0) {
        if (ties != stack_ties) Safefree(ties);
        croak("_pool_pick: no valid EV::ClickHouse entries in pool");
    }

    int picked;
    if (n_ties == 1) {
        picked = ties[0];
    } else {
        SV **idx_p = hv_fetchs(pool, "idx", 0);
        IV idx = idx_p ? SvIV(*idx_p) : 0;
        /* The slot lives in a user-visible hash; a negative value would
         * make idx % n_ties negative and index before ties[0]. */
        if (idx < 0) idx = 0;
        picked = ties[idx % n_ties];
        /* Store a monotonic counter, not (idx+1) % n_ties: n_ties varies
         * per call, so a mod-n_ties write would pin idx into the smallest
         * recent tie-set and starve higher tie indices under fluctuating
         * load. The mod is applied at read time above. Wrap to 0 rather
         * than overflow at IV_MAX. */
        (void)hv_stores(pool, "idx", newSViv(idx < IV_MAX ? idx + 1 : 0));
    }
    if (ties != stack_ties) Safefree(ties);

    SV **csv = av_fetch(conns, picked, 0);
    /* SvIV on the idx slot can run magic; make sure the entry is still
     * there before dereferencing it. */
    if (!csv || !*csv)
        croak("_pool_pick: picked pool entry disappeared");
    RETVAL = SvREFCNT_inc(*csv);
}
OUTPUT:
    RETVAL
2787              
SV *
_iterator_next(SV *self_sv, SV *timeout_sv = &PL_sv_undef)
CODE:
{
    /* Hand out the next buffered batch of an iterator, running the event
     * loop while none is available.
     *
     *   self_sv     the iterator object (hash reference). Slots read:
     *               ch (EV::ClickHouse, required), batches (arrayref,
     *               required), done.
     *   timeout_sv  optional; a positive number arms a one-shot timer
     *               for that long around the loop run. Undef or a
     *               non-positive value means no timer.
     *
     * Returns the first element shifted off batches, or undef when the
     * iterator is done with nothing buffered, or when the loop run ended
     * without a batch arriving.
     *
     * Croaks if self_sv is not a hash ref, or the ch / batches slots are
     * missing or of the wrong kind. */
    if (!SvROK(self_sv) || SvTYPE(SvRV(self_sv)) != SVt_PVHV)
        croak("_iterator_next: self must be a hash ref");
    HV *iter = (HV*)SvRV(self_sv);

    SV **conn_p = hv_fetchs(iter, "ch", 0);
    if (!conn_p || !SvROK(*conn_p) || !sv_isa(*conn_p, "EV::ClickHouse"))
        croak("_iterator_next: ch slot is not an EV::ClickHouse");
    ev_clickhouse_t *conn = INT2PTR(ev_clickhouse_t*, SvIV(SvRV(*conn_p)));
    struct ev_loop *loop = conn->loop ? conn->loop : EV_DEFAULT;

    SV **queue_p = hv_fetchs(iter, "batches", 0);
    if (!queue_p || !SvROK(*queue_p) || SvTYPE(SvRV(*queue_p)) != SVt_PVAV)
        croak("_iterator_next: batches slot missing");
    AV *batches = (AV*)SvRV(*queue_p);

    /* Only a defined, strictly positive timeout counts. */
    double timeout = 0;
    if (SvOK(timeout_sv)) {
        double requested = SvNV(timeout_sv);
        if (requested > 0)
            timeout = requested;
    }
    /* Absolute deadline on the loop clock; 0 means "no deadline". */
    double deadline = timeout > 0 ? ev_now(loop) + timeout : 0;

    ev_timer tmo;
    int timer_on = 0;
    SV *result = &PL_sv_undef;

    while (av_top_index(batches) < 0) {
        SV **done_p = hv_fetchs(iter, "done", 0);
        if (done_p && SvTRUE(*done_p))
            break;

        if (deadline) {
            double remaining = deadline - ev_now(loop);
            if (remaining <= 0) {
                /* Deadline already passed: give up with undef. */
                if (timer_on) {
                    ev_timer_stop(loop, &tmo);
                    timer_on = 0;
                }
                goto finished;
            }
            ev_timer_init(&tmo, iter_timeout_cb, remaining, 0);
            ev_timer_start(loop, &tmo);
            timer_on = 1;
        }

        ev_run(loop, 0);

        /* The timer lives on this stack frame: always disarm it before
         * going any further. */
        if (timer_on) {
            ev_timer_stop(loop, &tmo);
            timer_on = 0;
        }

        if (av_top_index(batches) >= 0)
            continue;           /* a batch arrived; the loop test exits */

        /* Loop run ended with nothing buffered: that is only acceptable
         * if the iterator finished in the meantime. */
        done_p = hv_fetchs(iter, "done", 0);
        if (!done_p || !SvTRUE(*done_p))
            goto finished;
        break;
    }

    if (av_top_index(batches) >= 0) {
        SV *batch = av_shift(batches);
        if (batch)
            result = batch;
    }

  finished:
    RETVAL = result;
}
OUTPUT:
    RETVAL
2853              
2854             void
2855             _breaker_observe(SV *slot_sv, SV *err_sv, IV threshold, NV cooldown)
2856             CODE:
2857             {
                 /* Record the outcome of one operation in a per-slot failure-tracking
                  * ("breaker") hash.
                  *
                  * slot_sv   - hash ref holding the counters; anything else makes the
                  *             call a silent no-op
                  * err_sv    - truthy means the operation failed, falsy means success
                  * threshold - failure count at which the slot is marked dead;
                  *             values <= 0 disable marking
                  * cooldown  - seconds added to ev_time() to form "dead_until"
                  *
                  * On failure "fails" is incremented (a missing key counts as 0) and,
                  * once it reaches threshold, "dead_until" is set to now + cooldown.
                  * On success both "fails" and "dead_until" are reset to 0.
                  *
                  * The early exit uses XSRETURN_EMPTY instead of a bare `return`: a
                  * plain return from an XSUB body leaves the Perl stack pointer on the
                  * incoming arguments, so a caller in scalar or list context would
                  * see them as return values. */
2858 0 0         if (!(SvROK(slot_sv) && SvTYPE(SvRV(slot_sv)) == SVt_PVHV)) XSRETURN_EMPTY;
    0          
2859 0           HV *slot = (HV*)SvRV(slot_sv);
2860 0 0         if (SvTRUE(err_sv)) {
2861 0           SV **fails_p = hv_fetchs(slot, "fails", 0);
2862 0 0         IV fails = (fails_p ? SvIV(*fails_p) : 0) + 1;
2863 0           (void)hv_stores(slot, "fails", newSViv(fails));
                 /* Every failure at or beyond the threshold refreshes dead_until. */
2864 0 0         if (threshold > 0 && fails >= threshold) {
    0          
2865 0           (void)hv_stores(slot, "dead_until",
2866             newSVnv(ev_time() + cooldown));
2867             }
2868             } else {
                 /* Success clears the failure count and any dead-until mark. */
2869 0           (void)hv_stores(slot, "fails", newSViv(0));
2870 0           (void)hv_stores(slot, "dead_until", newSViv(0));
2871             }
2872             }