File Coverage

xs/io.c
Criterion Covered Total %
statement 3 434 0.6
branch 0 348 0.0
condition n/a
subroutine n/a
pod n/a
total 3 782 0.3


line stmt bran cond sub pod time code
1             /* --- Async TCP connect, I/O dispatch, timers, keepalive, reconnect,
2             * pipeline advance, and OpenSSL one-time init. ---
3             *
4             * Forward decls for symbols defined later in this file but called from
5             * earlier in the same file (cross-file callers use the forwards in
6             * ClickHouse.xs).
7             */
8             static void io_cb(EV_P_ ev_io *w, int revents);
9             static void on_connect_done(ev_clickhouse_t *self);
10              
#ifdef HAVE_OPENSSL
/* Empty OpenSSL's per-thread error queue and remember the last code that
 * was dequeued as a human-readable string in self->last_tls_error.
 * If the queue was already empty the stored message is left as it was. */
static void capture_tls_error(ev_clickhouse_t *self) {
    char msg[256];
    unsigned long code;
    unsigned long newest = 0;

    /* ERR_get_error() yields 0 once nothing is left; keep the final
     * non-zero code seen before that. */
    for (code = ERR_get_error(); code != 0; code = ERR_get_error())
        newest = code;

    if (newest != 0) {
        ERR_error_string_n(newest, msg, sizeof(msg));
        CLEAR_STR(self->last_tls_error);
        self->last_tls_error = savepv(msg);
    }
}
#endif
25              
26 0           static void start_connect(ev_clickhouse_t *self) {
27 0           struct addrinfo hints, *res = NULL;
28             int fd, ret;
29             char port_str[16];
30              
31 0           self->connect_gen++;
32              
33 0           emit_trace(self, "connect %s:%u (%s)",
34             self->host, self->port,
35 0 0         self->protocol == PROTO_NATIVE ? "native" : "http");
36 0           snprintf(port_str, sizeof(port_str), "%u", self->port);
37              
38 0           Zero(&hints, 1, struct addrinfo);
39 0           hints.ai_family = AF_UNSPEC;
40 0           hints.ai_socktype = SOCK_STREAM;
41              
42 0           ret = getaddrinfo(self->host, port_str, &hints, &res);
43 0 0         if (ret != 0) {
44             char errbuf[256];
45 0           snprintf(errbuf, sizeof(errbuf), "getaddrinfo: %s", gai_strerror(ret));
46 0           fail_connection(self, errbuf);
47 0           return;
48             }
49              
50 0           fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
51 0 0         if (fd < 0) {
52 0           freeaddrinfo(res);
53 0           fail_connection(self, "socket() failed");
54 0           return;
55             }
56              
57             /* non-blocking */
58             {
59 0           int fl = fcntl(fd, F_GETFL);
60 0 0         if (fl < 0 || fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0) {
    0          
61 0           freeaddrinfo(res);
62 0           close(fd);
63 0           fail_connection(self, "fcntl O_NONBLOCK failed");
64 0           return;
65             }
66             }
67              
68             /* TCP_NODELAY */
69             {
70 0           int one = 1;
71 0           setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
72             }
73              
74 0           self->fd = fd;
75 0           self->connecting = 1;
76              
77 0           ret = connect(fd, res->ai_addr, res->ai_addrlen);
78 0           freeaddrinfo(res);
79              
80 0 0         if (ret == 0) {
81             /* connected immediately — connected=1 is deferred for native
82             * (until ServerHello) and TLS (until handshake completes) */
83 0           self->connecting = 0;
84 0 0         if (self->protocol != PROTO_NATIVE && !self->tls_enabled)
    0          
85 0           self->connected = 1;
86 0           ev_io_init(&self->rio, io_cb, self->fd, EV_READ);
87 0           self->rio.data = (void *)self;
88 0           ev_io_init(&self->wio, io_cb, self->fd, EV_WRITE);
89 0           self->wio.data = (void *)self;
90 0           on_connect_done(self);
91 0           return;
92             }
93              
94 0 0         if (errno != EINPROGRESS) {
95             char errbuf[256];
96 0           snprintf(errbuf, sizeof(errbuf), "connect: %s", strerror(errno));
97 0           close(fd);
98 0           self->fd = -1;
99 0           self->connecting = 0;
100 0           fail_connection(self, errbuf);
101 0           return;
102             }
103              
104             /* in progress — wait for writability */
105 0           ev_io_init(&self->rio, io_cb, self->fd, EV_READ);
106 0           self->rio.data = (void *)self;
107 0           ev_io_init(&self->wio, io_cb, self->fd, EV_WRITE);
108 0           self->wio.data = (void *)self;
109              
110 0           start_writing(self);
111              
112 0 0         if (self->connect_timeout > 0) {
113 0           ev_timer_set(&self->timer, (ev_tstamp)self->connect_timeout, 0.0);
114 0           ev_timer_start(self->loop, &self->timer);
115 0           self->timing = 1;
116             }
117             }
118              
119             /* Mark the connection as ready, fire on_connect, dispatch queued queries.
120             * Returns 1 if self was freed. */
121 0           static int finish_connect(ev_clickhouse_t *self) {
122 0           stop_timing(self);
123 0           self->connected = 1;
124 0 0         CLEAR_STR(self->last_tls_error); /* successful connect supersedes stale TLS error */
125 0           if (self->on_connect &&
126 0           fire_zero_arg_cb(self, self->on_connect, "connect")) return 1;
127 0 0         if (!ngx_queue_empty(&self->send_queue))
128 0           return pipeline_advance(self);
129 0           return 0;
130             }
131              
132 0           static void on_connect_done(ev_clickhouse_t *self) {
133 0           self->connecting = 0;
134 0           self->reconnect_attempts = 0;
135              
136 0           stop_writing(self);
137             /* Keep the connect_timeout timer armed across the TLS handshake and
138             * native ServerHello phases — finish_connect() stops it once the
139             * connection is fully ready to accept queries. */
140              
141             #ifdef HAVE_OPENSSL
142 0 0         if (self->tls_enabled) {
143             int ret;
144 0           self->ssl_ctx = SSL_CTX_new(TLS_client_method());
145 0 0         if (!self->ssl_ctx) {
146 0           fail_connection(self, "SSL_CTX_new failed");
147 0           return;
148             }
149 0           SSL_CTX_set_default_verify_paths(self->ssl_ctx);
150 0 0         if (self->tls_skip_verify)
151 0           SSL_CTX_set_verify(self->ssl_ctx, SSL_VERIFY_NONE, NULL);
152             else
153 0           SSL_CTX_set_verify(self->ssl_ctx, SSL_VERIFY_PEER, NULL);
154 0 0         if (self->tls_ca_file) {
155 0 0         if (SSL_CTX_load_verify_locations(self->ssl_ctx, self->tls_ca_file, NULL) != 1) {
156 0           capture_tls_error(self);
157 0           fail_connection(self, "SSL_CTX_load_verify_locations failed");
158 0           return;
159             }
160             }
161             /* Mutual TLS: load client certificate + private key when both
162             * are configured. SSL_CTX_check_private_key verifies that the
163             * private key matches the loaded certificate's public half. */
164 0 0         if (self->tls_cert_file && self->tls_key_file) {
    0          
165 0 0         if (SSL_CTX_use_certificate_chain_file(self->ssl_ctx, self->tls_cert_file) != 1) {
166 0           capture_tls_error(self);
167 0           fail_connection(self, "SSL_CTX_use_certificate_chain_file failed");
168 0           return;
169             }
170 0 0         if (SSL_CTX_use_PrivateKey_file(self->ssl_ctx, self->tls_key_file, SSL_FILETYPE_PEM) != 1) {
171 0           capture_tls_error(self);
172 0           fail_connection(self, "SSL_CTX_use_PrivateKey_file failed");
173 0           return;
174             }
175 0 0         if (SSL_CTX_check_private_key(self->ssl_ctx) != 1) {
176 0           capture_tls_error(self);
177 0           fail_connection(self, "TLS client cert / private key mismatch");
178 0           return;
179             }
180 0 0         } else if (self->tls_cert_file || self->tls_key_file) {
    0          
181 0           fail_connection(self, "tls_cert_file and tls_key_file must both be set");
182 0           return;
183             }
184 0           self->ssl = SSL_new(self->ssl_ctx);
185 0 0         if (!self->ssl) {
186 0           capture_tls_error(self);
187 0           fail_connection(self, "SSL_new failed");
188 0           return;
189             }
190 0           SSL_set_fd(self->ssl, self->fd);
191              
192 0           int host_is_ip = is_ip_literal(self->host);
193              
194             /* SNI must not be sent for IP address literals (RFC 6066 s3) */
195 0 0         if (!host_is_ip)
196 0           SSL_set_tlsext_host_name(self->ssl, self->host);
197              
198             /* Verify server certificate matches hostname or IP */
199 0 0         if (!self->tls_skip_verify) {
200 0           X509_VERIFY_PARAM *param = SSL_get0_param(self->ssl);
201 0           X509_VERIFY_PARAM_set_hostflags(param, X509_CHECK_FLAG_NO_PARTIAL_WILDCARDS);
202 0 0         if (host_is_ip)
203 0           X509_VERIFY_PARAM_set1_ip_asc(param, self->host);
204             else
205 0           X509_VERIFY_PARAM_set1_host(param, self->host, 0);
206             }
207              
208 0           ret = SSL_connect(self->ssl);
209 0 0         if (ret == 1) {
210             /* handshake done immediately */
211 0           goto handshake_done;
212             } else {
213 0           int err = SSL_get_error(self->ssl, ret);
214 0 0         if (err == SSL_ERROR_WANT_READ) {
215 0           start_reading(self);
216 0 0         } else if (err == SSL_ERROR_WANT_WRITE) {
217 0           start_writing(self);
218             } else {
219 0           capture_tls_error(self);
220 0           fail_connection(self, "SSL_connect failed");
221 0           return;
222             }
223             /* continue TLS handshake in io_cb */
224 0           return;
225             }
226             }
227 0           handshake_done:
228             #endif
229              
230 0 0         if (self->protocol == PROTO_NATIVE) {
231             /* Send ClientHello and wait for ServerHello */
232             size_t hello_len;
233 0           char *hello = build_native_hello(self, &hello_len);
234 0           send_replace(self, hello, hello_len);
235              
236 0           self->native_state = NATIVE_WAIT_HELLO;
237 0           start_writing(self);
238 0           return;
239             }
240              
241             /* HTTP protocol: connection is ready */
242 0           (void)finish_connect(self);
243             }
244              
245             /* --- I/O callbacks --- */
246              
247             /* Returns 1 if self was freed (caller must not access self). */
248 0           static int try_write(ev_clickhouse_t *self) {
249 0 0         while (self->send_pos < self->send_len) {
250 0           ssize_t n = ch_write(self, self->send_buf + self->send_pos,
251 0           self->send_len - self->send_pos);
252 0 0         if (n < 0) {
253 0 0         if (errno == EAGAIN || errno == EWOULDBLOCK) {
    0          
254 0           start_writing(self);
255 0           return 0;
256             }
257 0           return teardown_io_error(self, strerror(errno), "write error");
258             }
259 0 0         if (n == 0)
260 0           return teardown_io_error(self, "connection closed during write",
261             "connection closed");
262 0           self->send_pos += n;
263             }
264              
265             /* all sent */
266 0           stop_writing(self);
267 0           self->send_len = 0;
268 0           self->send_pos = 0;
269              
270             /* start reading responses */
271 0           start_reading(self);
272              
273             /* check if more to send */
274 0 0         if (!ngx_queue_empty(&self->send_queue))
275 0           return pipeline_advance(self);
276 0           return 0;
277             }
278              
279 0           static void on_readable(ev_clickhouse_t *self) {
280             ssize_t n;
281              
282 0           ensure_recv_cap(self, self->recv_len + 4096);
283 0           n = ch_read(self, self->recv_buf + self->recv_len,
284 0           self->recv_cap - self->recv_len);
285              
286 0 0         if (n < 0) {
287 0 0         if (errno == EAGAIN || errno == EWOULDBLOCK) return;
    0          
288 0           teardown_io_error(self, strerror(errno), "read error");
289 0           return;
290             }
291              
292 0 0         if (n == 0) {
293             /* connection closed — fire on_error and drain pending if we
294             * have an in-flight request or haven't finished handshake */
295 0 0         int had_inflight = (self->send_count > 0 || !self->connected);
    0          
296 0           int has_queued = !ngx_queue_empty(&self->send_queue);
297              
298 0 0         if (had_inflight) {
299 0           int gen = self->connect_gen;
300 0           emit_error(self, "connection closed by server");
301 0 0         if (check_destroyed(self)) return;
302 0 0         if (self->connect_gen != gen) return;
303             /* Only cancel in-flight cb_queue (irrecoverable).
304             * Keep send_queue if auto_reconnect — those haven't been sent yet. */
305 0 0         if (!self->auto_reconnect || !has_queued) {
    0          
306 0 0         if (cancel_pending(self, "connection closed")) return;
307             } else {
308             /* Cancel only the in-flight cb_queue entries */
309 0           self->callback_depth++;
310 0           drain_cb_queue(self, "connection closed");
311 0           self->callback_depth--;
312 0 0         if (check_destroyed(self)) return;
313             }
314 0 0         if (self->connect_gen != gen) return;
315             }
316 0 0         if (cleanup_connection(self)) return; /* on_disconnect freed self */
317              
318             /* Auto-reconnect if we have queued requests or flag is set */
319 0 0         if (self->auto_reconnect && self->host) {
    0          
320 0           schedule_reconnect(self);
321             }
322 0           return;
323             }
324              
325 0           self->recv_len += n;
326              
327             /* Defensive ceiling: a runaway response (server stuck in a bad state,
328             * pathological row sizes, or a header-injection attack) shouldn't be
329             * able to consume unbounded memory. Caller opts in via max_recv_buffer;
330             * 0 (default) keeps the historical behaviour of growing without limit
331             * up to CH_MAX_DECOMPRESS_SIZE on compressed paths. */
332 0 0         if (self->max_recv_buffer > 0 && self->recv_len > self->max_recv_buffer) {
    0          
333             char errmsg[80];
334 0           snprintf(errmsg, sizeof(errmsg),
335             "recv buffer %lu exceeded max_recv_buffer %lu",
336 0           (unsigned long)self->recv_len,
337 0           (unsigned long)self->max_recv_buffer);
338 0           teardown_io_error(self, errmsg, "recv buffer overflow");
339 0           return;
340             }
341              
342 0 0         if (self->protocol == PROTO_HTTP) {
343 0           process_http_response(self);
344             } else {
345 0           process_native_response(self);
346             }
347             }
348              
349 0           static void io_cb(EV_P_ ev_io *w, int revents) {
350 0           ev_clickhouse_t *self = (ev_clickhouse_t *)w->data;
351             (void)loop;
352              
353 0 0         if (self == NULL || self->magic != EV_CH_MAGIC) return;
    0          
354              
355 0 0         if (self->connecting) {
356             /* check connect result */
357 0           int err = 0;
358 0           socklen_t errlen = sizeof(err);
359              
360 0           stop_writing(self);
361             /* Don't stop the connect_timeout timer here — it must keep
362             * running across TLS handshake and native ServerHello stages.
363             * finish_connect() stops it once the connection is fully ready. */
364              
365 0 0         if (getsockopt(self->fd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0)
366 0           err = errno;
367 0 0         if (err != 0) {
368             char errbuf[256];
369 0           snprintf(errbuf, sizeof(errbuf), "connect: %s", strerror(err));
370 0           fail_connection(self, errbuf);
371 0           return;
372             }
373              
374 0           on_connect_done(self);
375 0           return;
376             }
377              
378             #ifdef HAVE_OPENSSL
379 0 0         if (self->ssl && !self->connected && self->native_state != NATIVE_WAIT_HELLO
    0          
    0          
380 0 0         && self->native_state != NATIVE_WAIT_RESULT
381 0 0         && self->native_state != NATIVE_WAIT_INSERT_META) {
382             /* TLS handshake in progress */
383 0           int ret = SSL_connect(self->ssl);
384 0 0         if (ret == 1) {
385 0           stop_reading(self);
386 0           stop_writing(self);
387              
388 0 0         if (self->protocol == PROTO_NATIVE) {
389             /* Send ClientHello over TLS, then wait for ServerHello */
390             size_t hello_len;
391 0           char *hello = build_native_hello(self, &hello_len);
392 0           send_replace(self, hello, hello_len);
393 0           self->native_state = NATIVE_WAIT_HELLO;
394 0           start_writing(self);
395 0           return;
396             }
397              
398             /* HTTP protocol: fire on_connect */
399 0           (void)finish_connect(self);
400 0           return;
401             } else {
402 0           int err = SSL_get_error(self->ssl, ret);
403 0           stop_reading(self);
404 0           stop_writing(self);
405 0 0         if (err == SSL_ERROR_WANT_READ) {
406 0           start_reading(self);
407 0 0         } else if (err == SSL_ERROR_WANT_WRITE) {
408 0           start_writing(self);
409             } else {
410 0           capture_tls_error(self);
411 0           fail_connection(self, "SSL handshake failed");
412             }
413 0           return;
414             }
415             }
416             #endif
417              
418 0 0         if (revents & EV_WRITE) {
419 0 0         if (try_write(self)) return;
420 0 0         if (self->fd < 0) return;
421 0 0         if (self->pending_addendum_finish && self->send_pos >= self->send_len) {
    0          
422 0           self->pending_addendum_finish = 0;
423 0           self->native_state = NATIVE_IDLE;
424 0 0         if (finish_connect(self)) return;
425             }
426             }
427              
428 0 0         if (revents & EV_READ) {
429 0           on_readable(self);
430             }
431             }
432              
433 0           static void timer_cb(EV_P_ ev_timer *w, int revents) {
434 0           ev_clickhouse_t *self = (ev_clickhouse_t *)w->data;
435             (void)loop;
436             (void)revents;
437              
438 0 0         if (self == NULL || self->magic != EV_CH_MAGIC) return;
    0          
439              
440 0           self->timing = 0;
441              
442             /* Treat any pre-`connected=1` timeout as a connect timeout — covers
443             * TCP connect, TLS handshake, and native ServerHello stages. */
444 0 0         if (!self->connected) {
445 0           stop_writing(self);
446 0           fail_connection(self, "connect timeout");
447             } else {
448             /* query timeout */
449 0 0         CLEAR_SV(self->native_rows);
450 0 0         CLEAR_SV(self->native_col_names);
451 0 0         CLEAR_SV(self->native_col_types);
452 0 0         CLEAR_SV(self->native_totals);
453 0 0         CLEAR_SV(self->native_extremes);
454 0           lc_free_dicts(self);
455 0 0         CLEAR_INSERT(self);
    0          
456 0 0         CLEAR_STR(self->insert_err);
457 0           self->native_state = NATIVE_IDLE;
458 0 0         if (self->send_count > 0) self->send_count--;
459              
460 0           int gen = self->connect_gen;
461             /* Must reconnect — server may still be processing */
462 0 0         if (teardown_after_deliver(self, "query timeout", "query timeout")) return;
463 0 0         if (self->connect_gen != gen) return;
464 0 0         if (self->auto_reconnect && self->host)
    0          
465 0           schedule_reconnect(self);
466             }
467             }
468              
469             /* --- Keepalive timer callback --- */
470              
471 0           static void ka_timer_cb(EV_P_ ev_timer *w, int revents) {
472 0           ev_clickhouse_t *self = (ev_clickhouse_t *)((char *)w -
473             offsetof(ev_clickhouse_t, ka_timer));
474             (void)revents;
475              
476 0 0         if (self->magic != EV_CH_MAGIC) return;
477 0 0         if (!self->connected || self->send_count > 0) return;
    0          
478              
479             /* Native: append PING bytes directly; the SERVER_PONG handler tracks
480             * ka_in_flight so no callback queue entry is needed. */
481 0 0         if (self->protocol == PROTO_NATIVE) {
482             size_t ping_len;
483 0           char *ping = build_native_ping(&ping_len);
484 0           ensure_send_cap(self, self->send_len + ping_len);
485 0           Copy(ping, self->send_buf + self->send_len, ping_len, char);
486 0           self->send_len += ping_len;
487 0           self->ka_in_flight++;
488 0           Safefree(ping);
489 0           start_writing(self);
490             } else {
491             /* HTTP: enqueue a real ping with a no-op callback. ClickHouse's
492             * HTTP server closes idle connections after a few seconds, so
493             * relying on TCP keepalive (kernel default ~2h) is not enough. */
494             size_t req_len;
495 0           char *req = build_http_ping_request(self, &req_len);
496 0           ev_ch_send_t *s = alloc_send();
497 0           s->data = req;
498 0           s->data_len = req_len;
499 0           s->cb = SvREFCNT_inc(keepalive_noop_cb);
500 0           enqueue_send(self, s);
501             }
502             }
503              
504 0           static void start_keepalive(ev_clickhouse_t *self) {
505 0 0         if (self->keepalive > 0 && !self->ka_timing && self->connected) {
    0          
    0          
506 0           ev_timer_init(&self->ka_timer, ka_timer_cb, self->keepalive, self->keepalive);
507 0           ev_timer_start(self->loop, &self->ka_timer);
508 0           self->ka_timing = 1;
509             }
510 0           }
511              
512 0           static void stop_keepalive(ev_clickhouse_t *self) {
513 0 0         if (self->ka_timing) {
514 0           ev_timer_stop(self->loop, &self->ka_timer);
515 0           self->ka_timing = 0;
516             }
517 0           }
518              
519             /* --- Reconnect with backoff --- */
520              
521 0           static void reconnect_timer_cb(EV_P_ ev_timer *w, int revents) {
522 0           ev_clickhouse_t *self = (ev_clickhouse_t *)((char *)w -
523             offsetof(ev_clickhouse_t, reconnect_timer));
524             (void)revents; (void)loop;
525 0           self->reconnect_timing = 0;
526 0 0         if (self->magic != EV_CH_MAGIC || self->connected || self->connecting) return;
    0          
    0          
527 0           start_connect(self);
528             }
529              
530 0           static void schedule_reconnect(ev_clickhouse_t *self) {
531 0 0         if (!self->auto_reconnect || !self->host || self->magic != EV_CH_MAGIC) return;
    0          
    0          
532 0 0         if (self->reconnect_max_attempts > 0
533 0 0         && self->reconnect_attempts >= self->reconnect_max_attempts) {
534             /* Give up so the user isn't trapped in an infinite loop on a
535             * permanent failure (bad host, wrong creds). Drain any queries
536             * still in send_queue first so their callbacks see the failure
537             * instead of being silently orphaned. */
538 0           emit_error(self, "max reconnect attempts exceeded");
539 0 0         if (check_destroyed(self)) return;
540 0           (void)cancel_pending(self, "max reconnect attempts exceeded");
541 0           return;
542             }
543             /* Always defer through ev_timer so a synchronous start_connect failure
544             * (e.g. getaddrinfo error) cannot cause unbounded fail_connection ->
545             * schedule_reconnect -> start_connect recursion on the C stack. A
546             * delay of 0 fires on the next event-loop iteration, not inline. */
547 0 0         double delay = self->reconnect_delay > 0 ? self->reconnect_delay : 0.0;
548             int i;
549 0 0         for (i = 0; i < self->reconnect_attempts && i < 20; i++)
    0          
550 0           delay *= 2;
551 0 0         if (self->reconnect_max_delay > 0 && delay > self->reconnect_max_delay)
    0          
552 0           delay = self->reconnect_max_delay;
553             /* Apply jitter AFTER the cap so a configured ceiling isn't silently
554             * exceeded. rand() / RAND_MAX is uniform in [0, 1]; clamping to the
555             * cap again keeps the worst case bounded. */
556 0 0         if (self->reconnect_jitter > 0 && delay > 0) {
    0          
557 0           double j = ((double)rand() / (double)RAND_MAX) * self->reconnect_jitter;
558 0           delay += delay * j;
559 0 0         if (self->reconnect_max_delay > 0 && delay > self->reconnect_max_delay)
    0          
560 0           delay = self->reconnect_max_delay;
561             }
562 0           self->reconnect_attempts++;
563 0 0         if (self->reconnect_timing) {
564 0           ev_timer_stop(self->loop, &self->reconnect_timer);
565 0           self->reconnect_timing = 0;
566             }
567 0           ev_timer_init(&self->reconnect_timer, reconnect_timer_cb, delay, 0);
568 0           ev_timer_start(self->loop, &self->reconnect_timer);
569 0           self->reconnect_timing = 1;
570             }
571              
572             /* Free LowCardinality cross-block dictionary state */
573 0           static void lc_free_dicts(ev_clickhouse_t *self) {
574 0 0         if (self->lc_dicts) {
575             int c;
576 0 0         for (c = 0; c < self->lc_num_cols; c++) {
577 0 0         if (self->lc_dicts[c]) {
578             uint64_t j;
579 0 0         for (j = 0; j < self->lc_dict_sizes[c]; j++)
580 0           SvREFCNT_dec(self->lc_dicts[c][j]);
581 0           Safefree(self->lc_dicts[c]);
582             }
583             }
584 0           Safefree(self->lc_dicts);
585 0           Safefree(self->lc_dict_sizes);
586 0           self->lc_dicts = NULL;
587 0           self->lc_dict_sizes = NULL;
588 0           self->lc_num_cols = 0;
589             }
590 0           }
591              
592             /* --- Pipeline orchestrator --- */
593              
594             /* Send one request at a time, wait for response, then send the next.
595             * Returns 1 if self was freed (caller must not access self). */
596 0           static int pipeline_advance(ev_clickhouse_t *self) {
597 0 0         if (!self->connected) return 0;
598              
599 0 0         if (self->send_count > 0) {
600 0           start_reading(self);
601 0           return 0;
602             }
603              
604             /* Check drain callback when all pending work is done */
605 0 0         if (ngx_queue_empty(&self->send_queue) && self->pending_count == 0
    0          
606 0 0         && self->on_drain) {
607 0           SV *drain_cb = self->on_drain;
608 0           self->on_drain = NULL;
609 0 0         if (fire_zero_arg_cb(self, drain_cb, "drain")) {
610 0           SvREFCNT_dec(drain_cb);
611 0           return 1;
612             }
613             /* Dropping the last ref to the drain CV can free a closure that
614             * captured $ch — DESTROY then runs. Guard with callback_depth so
615             * the free is deferred, then detect it before touching self. */
616 0           self->callback_depth++;
617 0           SvREFCNT_dec(drain_cb);
618 0           self->callback_depth--;
619 0 0         if (check_destroyed(self)) return 1;
620             }
621              
622             /* Restart keepalive timer when idle (start_keepalive is a no-op if already
623             * timing or if keepalive disabled) */
624 0 0         if (ngx_queue_empty(&self->send_queue) && self->pending_count == 0)
    0          
625 0           start_keepalive(self);
626              
627             /* send next request from queue */
628 0 0         if (!ngx_queue_empty(&self->send_queue)) {
629             /* Stop keepalive during active query */
630 0           stop_keepalive(self);
631 0           emit_trace(self, "dispatch query (pending=%d)", self->pending_count);
632              
633             /* on_trace is user code: it may have called finish()/reset() or
634             * dropped the last reference (DESTROY), any of which drains or
635             * frees the send queue. Re-validate the connection and re-derive
636             * the head entry AFTER the trace so we never touch a send struct
637             * that cancel_pending has already released to the freelist. */
638 0 0         if (self->magic != EV_CH_MAGIC || !self->connected
    0          
639 0 0         || ngx_queue_empty(&self->send_queue))
640 0           return check_destroyed(self);
641              
642 0           ngx_queue_t *q = ngx_queue_head(&self->send_queue);
643 0           ev_ch_send_t *send = ngx_queue_data(q, ev_ch_send_t, queue);
644              
645             /* set up send buffer */
646 0           ensure_send_cap(self, send->data_len);
647 0           Copy(send->data, self->send_buf, send->data_len, char);
648 0           self->send_len = send->data_len;
649 0           self->send_pos = 0;
650              
651             /* move cb to recv queue */
652 0           SV *dispatched_cb = send->cb;
653 0           ngx_queue_remove(q);
654 0           push_cb_owned_ex(self, send->cb, send->raw,
655             send->on_data, send->on_complete,
656             send->query_timeout);
657 0 0         CLEAR_SV(send->on_data);
658 0           send->on_complete = NULL; /* ownership transferred to cbt */
659             /* Track query_id + dispatch start time (used by on_query_complete). */
660 0 0         CLEAR_STR(self->last_query_id);
661 0 0         if (send->query_id) { self->last_query_id = send->query_id; send->query_id = NULL; }
662 0           self->query_start_time = ev_now(self->loop);
663              
664             /* on_query_start: fire with the resolved query_id, just before
665             * the write side runs. Suppressed for keepalive PINGs to match
666             * on_query_complete semantics. */
667 0 0         if (self->on_query_start && !IS_KEEPALIVE_CB(dispatched_cb)) {
    0          
    0          
668 0           dSP;
669 0 0         ENTER; SAVETMPS; PUSHMARK(SP);
670 0 0         EXTEND(SP, 1);
671 0 0         PUSHs(self->last_query_id
672             ? sv_2mortal(newSVpv(self->last_query_id, 0))
673             : &PL_sv_undef);
674 0           PUTBACK;
675 0           int gen_before = self->connect_gen;
676 0           self->callback_depth++;
677 0           call_sv(self->on_query_start, G_EVAL | G_VOID | G_DISCARD);
678 0           self->callback_depth--;
679 0 0         WARN_AND_CLEAR_ERRSV("on_query_start");
    0          
    0          
    0          
680 0 0         FREETMPS; LEAVE;
681             /* The handler may have torn the connection down: DESTROY
682             * (dropped the last ref), reset() (rotated it — connect_gen
683             * bumped), or finish() (closed it — connected cleared). In
684             * every case the cb was already delivered by cancel_pending;
685             * drop the local send entry (already dequeued, so cancel_pending
686             * never saw it) without dispatching it. Falling through on a
687             * !connected struct would re-arm a stray query-timeout timer
688             * and leave send_count stuck at 1. The `||` short-circuits
689             * before reading self->connect_gen when self has been freed. */
690 0           int destroyed = check_destroyed(self);
691 0 0         if (destroyed || self->connect_gen != gen_before
    0          
692 0 0         || !self->connected) {
693 0           Safefree(send->data);
694 0 0         CLEAR_INSERT(send);
    0          
695 0           release_send(send);
696 0           return destroyed ? 1 : 0;
697             }
698             }
699              
700             /* Clear per-query accumulated state so accessors don't return
701             * the previous query's data. native_rows is already NULL at
702             * EndOfStream; col_names/types are also cleared here so DDL
703             * (or any query that emits no DATA block) does not leave the
704             * previous SELECT's schema visible. */
705 0 0         CLEAR_SV(self->native_col_names);
706 0 0         CLEAR_SV(self->native_col_types);
707 0 0         CLEAR_SV(self->native_totals);
708 0 0         CLEAR_SV(self->native_extremes);
709 0           self->last_error_code = 0;
710 0           self->profile_rows = 0;
711 0           self->profile_bytes = 0;
712 0           self->profile_rows_before_limit = 0;
713 0 0         if (self->progress_period > 0) {
714 0           memset(self->progress_acc, 0, sizeof(self->progress_acc));
715 0           self->progress_last = 0.0;
716             }
717              
718             /* transfer deferred insert data from send entry to self */
719 0 0         if (send->insert_data) {
720 0           self->insert_data = send->insert_data;
721 0           self->insert_data_len = send->insert_data_len;
722 0           send->insert_data = NULL;
723             }
724 0 0         if (send->insert_av) {
725 0           self->insert_av = send->insert_av;
726 0           send->insert_av = NULL;
727             }
728              
729 0           Safefree(send->data);
730 0           double qt = send->query_timeout;
731 0           release_send(send);
732 0           self->send_count++;
733              
734             /* Start query timeout timer */
735 0 0         double timeout = qt > 0 ? qt : self->query_timeout;
736 0 0         if (timeout > 0 && !self->timing) {
    0          
737 0           ev_timer_set(&self->timer, (ev_tstamp)timeout, 0.0);
738 0           ev_timer_start(self->loop, &self->timer);
739 0           self->timing = 1;
740             }
741              
742 0 0         if (self->protocol == PROTO_NATIVE) {
743 0 0         if (self->insert_data || self->insert_av)
    0          
744 0           self->native_state = NATIVE_WAIT_INSERT_META;
745             else
746 0           self->native_state = NATIVE_WAIT_RESULT;
747             }
748              
749 0           return try_write(self);
750             }
751 0           return 0;
752             }
753              
754             /* --- OpenSSL init (must be in plain C, not inside XS BOOT) --- */
755              
/* One-time OpenSSL library initialisation.
 * OpenSSL >= 1.1.0 uses OPENSSL_init_ssl(); older releases use the legacy
 * init calls. Compiles to an empty function without HAVE_OPENSSL. */
static void ch_openssl_init(void) {
#if defined(HAVE_OPENSSL) && OPENSSL_VERSION_NUMBER >= 0x10100000L
    OPENSSL_init_ssl(0, NULL);
#elif defined(HAVE_OPENSSL)
    SSL_library_init();
    SSL_load_error_strings();
    OpenSSL_add_all_algorithms();
#endif
}