File Coverage

xs/proto_native_parse.c
Criterion Covered Total %
statement 0 637 0.0
branch 0 658 0.0
condition n/a
subroutine n/a
pod n/a
total 0 1295 0.0


line stmt bran cond sub pod time code
1             /* --- Native protocol response parser --- */
2              
3             /*
4             * Skip block info fields (revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO).
5             * Returns 1 on success, 0 if need more data.
6             */
7 0           static int skip_block_info(const char *buf, size_t len, size_t *pos) {
8 0           for (;;) {
9             uint64_t field_num;
10 0           int rc = read_varuint(buf, len, pos, &field_num);
11 0 0         if (rc == 0) return 0;
12 0 0         if (rc < 0) return -1;
13 0 0         if (field_num == 0) return 1; /* end marker */
14 0 0         if (field_num == 1) {
15             /* is_overflows: UInt8 */
16             uint8_t dummy;
17 0           rc = read_u8(buf, len, pos, &dummy);
18 0 0         if (rc <= 0) return rc;
19 0 0         } else if (field_num == 2) {
20             /* bucket_num: Int32 */
21             int32_t dummy;
22 0           rc = read_i32(buf, len, pos, &dummy);
23 0 0         if (rc <= 0) return rc;
24             } else {
25 0           return -1; /* protocol error */
26             }
27             }
28             }
29              
30             /*
31             * Try to parse one server packet from recv_buf.
32             * Returns:
33             * 1 = packet consumed, continue reading
34             * 0 = need more data
35             * -1 = error (message in *errmsg, caller must Safefree)
36             * 2 = EndOfStream
37             * 3 = Pong
38             * 4 = Hello parsed (self->server_* fields populated)
39             */
40              
41             /* Parse a Data block (table_name + optional-LZ4-chain + block_info + columns)
42             * and decode-and-discard each column. Used by SERVER_LOG and SERVER_PROFILE_EVENTS,
43             * which carry a block in the same wire format as SERVER_DATA but whose contents
44             * the client does not surface to Perl.
45             *
46             * `lz4_optional` (used by PROFILE_EVENTS only): if set, fall back to parsing
47             * the body uncompressed when LZ4 hard-fails — older servers occasionally send
48             * profile_events uncompressed even on a compressed connection.
49             *
50             * Returns 1 on success (advances *outer_pos past the consumed bytes),
51             * 0 on need-more-data,
52             * -1 on hard error (sets *errmsg). */
53 0           static int parse_and_discard_block(ev_clickhouse_t *self,
54             const char *buf, size_t len, size_t *outer_pos,
55             const char *kind, int lz4_optional,
56             char **errmsg) {
57 0           size_t pos = *outer_pos;
58             int rc;
59 0           char *decompressed = NULL;
60             const char *bbuf;
61             size_t blen, bpos;
62             char errbuf[64];
63              
64 0           rc = skip_native_string(buf, len, &pos);
65 0 0         if (rc == 0) return 0;
66 0 0         if (rc < 0) {
67 0           snprintf(errbuf, sizeof(errbuf), "malformed %s block", kind);
68 0           *errmsg = safe_strdup(errbuf);
69 0           return -1;
70             }
71              
72             #ifdef HAVE_LZ4
73             if (self->compress) {
74             int need_more = 0;
75             const char *lz4_err = NULL;
76             decompressed = ch_lz4_decompress_chain(buf, len, &pos, &blen,
77             &need_more, &lz4_err);
78             if (!decompressed) {
79             if (need_more) return 0;
80             if (!lz4_optional) {
81             snprintf(errbuf, sizeof(errbuf), "%s: LZ4 decompression failed", kind);
82             *errmsg = safe_strdup(lz4_err ? lz4_err : errbuf);
83             return -1;
84             }
85             /* fall through to uncompressed parsing */
86             }
87             }
88             if (decompressed) {
89             bbuf = decompressed;
90             bpos = 0;
91             } else
92             #endif
93             {
94 0           bbuf = buf;
95 0           blen = len;
96 0           bpos = pos;
97             }
98              
99             #define _BAIL(rc_val) do { \
100             if (decompressed) Safefree(decompressed); \
101             if (rc_val < 0) { snprintf(errbuf, sizeof(errbuf), "malformed %s block", kind); *errmsg = safe_strdup(errbuf); } \
102             return rc_val; \
103             } while (0)
104              
105 0 0         if (self->server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
106 0           rc = skip_block_info(bbuf, blen, &bpos);
107 0 0         if (rc <= 0) _BAIL(rc);
    0          
    0          
108             }
109             {
110             uint64_t nc, nr, c;
111 0           rc = read_varuint(bbuf, blen, &bpos, &nc);
112 0 0         if (rc <= 0) _BAIL(rc);
    0          
    0          
113 0           rc = read_varuint(bbuf, blen, &bpos, &nr);
114 0 0         if (rc <= 0) _BAIL(rc);
    0          
    0          
115              
116 0 0         for (c = 0; c < nc; c++) {
117             const char *ctype;
118             size_t ctype_len;
119 0           rc = skip_native_string(bbuf, blen, &bpos);
120 0 0         if (rc <= 0) _BAIL(rc);
    0          
    0          
121 0           rc = read_native_string_ref(bbuf, blen, &bpos, &ctype, &ctype_len);
122 0 0         if (rc <= 0) _BAIL(rc);
    0          
    0          
123             /* custom serialization flag (revision >= 54446) */
124 0 0         if (bpos >= blen) _BAIL(0);
    0          
125 0 0         if ((uint8_t)bbuf[bpos]) {
126 0 0         if (decompressed) Safefree(decompressed);
127 0           *errmsg = safe_strdup("custom serialization not supported");
128 0           return -1;
129             }
130 0           bpos++;
131 0 0         if (nr > 0) {
132 0           col_type_t *ct = parse_col_type(ctype, ctype_len);
133 0           int col_err = 0;
134 0           SV **vals = decode_column(bbuf, blen, &bpos, nr, ct, &col_err, 0);
135 0 0         if (!vals) {
136 0           free_col_type(ct);
137 0 0         if (col_err || decompressed) {
    0          
138 0 0         if (decompressed) Safefree(decompressed);
139 0           snprintf(errbuf, sizeof(errbuf), "malformed %s block", kind);
140 0           *errmsg = safe_strdup(errbuf);
141 0           return -1;
142             }
143 0           return 0;
144             }
145             {
146             uint64_t j;
147 0 0         for (j = 0; j < nr; j++) SvREFCNT_dec(vals[j]);
148             }
149 0           Safefree(vals);
150 0           free_col_type(ct);
151             }
152             }
153             }
154              
155             #undef _BAIL
156              
157 0 0         if (!decompressed) pos = bpos;
158 0 0         if (decompressed) Safefree(decompressed);
159 0           *outer_pos = pos;
160 0           return 1;
161             }
162              
163             /* Like parse_and_discard_block, but for each row of the block invokes
164             * `cb` with one hashref keyed by column name. Used by on_log.
165             * Caller has already verified cb is non-NULL. */
166 0           static int parse_and_emit_log_block(ev_clickhouse_t *self,
167             const char *buf, size_t len, size_t *outer_pos,
168             SV *cb, char **errmsg) {
169 0           size_t pos = *outer_pos;
170             int rc;
171 0           char *decompressed = NULL;
172             const char *bbuf;
173             size_t blen, bpos;
174              
175 0           rc = skip_native_string(buf, len, &pos);
176 0 0         if (rc == 0) return 0;
177 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed log block"); return -1; }
178              
179             #ifdef HAVE_LZ4
180             if (self->compress) {
181             int need_more = 0;
182             const char *lz4_err = NULL;
183             decompressed = ch_lz4_decompress_chain(buf, len, &pos, &blen,
184             &need_more, &lz4_err);
185             if (!decompressed) {
186             if (need_more) return 0;
187             /* server log frames are not always compressed even with
188             * compress=1 negotiated — fall through to raw parsing. */
189             }
190             }
191             if (decompressed) { bbuf = decompressed; bpos = 0; }
192             else
193             #endif
194 0           { bbuf = buf; blen = len; bpos = pos; }
195              
196             #define _BAIL_LOG(rc_val) do { \
197             if (decompressed) Safefree(decompressed); \
198             if (rc_val < 0) { *errmsg = safe_strdup("malformed log block"); } \
199             return rc_val; \
200             } while (0)
201              
202 0 0         if (self->server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
203 0           rc = skip_block_info(bbuf, blen, &bpos);
204 0 0         if (rc <= 0) _BAIL_LOG(rc);
    0          
    0          
205             }
206             uint64_t nc, nr;
207 0           rc = read_varuint(bbuf, blen, &bpos, &nc);
208 0 0         if (rc <= 0) _BAIL_LOG(rc);
    0          
    0          
209 0           rc = read_varuint(bbuf, blen, &bpos, &nr);
210 0 0         if (rc <= 0) _BAIL_LOG(rc);
    0          
    0          
211              
212             /* Collect column name + decoded values, then assemble per-row HVs. */
213 0           char **names = NULL;
214 0           SV ***data = NULL; /* data[col][row] */
215 0 0         if (nc > 0) {
216 0 0         Newxz(names, nc, char *);
217 0 0         Newxz(data, nc, SV **);
218             }
219             /* err_seen: -1 = malformed, 0 = success / need-more, +1 = all columns
220             * parsed cleanly. We distinguish "loop completed all nc columns" from
221             * "loop broke early needing more data" via the explicit flag rather
222             * than inspecting bpos, since bpos always advances past the header. */
223 0           int err_seen = 1;
224 0 0         for (uint64_t c = 0; c < nc; c++) {
225             const char *cname; size_t cname_len;
226 0           rc = read_native_string_ref(bbuf, blen, &bpos, &cname, &cname_len);
227 0 0         if (rc <= 0) { err_seen = rc < 0 ? -1 : 0; break; }
    0          
228 0           Newx(names[c], cname_len + 1, char);
229 0           Copy(cname, names[c], cname_len, char);
230 0           names[c][cname_len] = '\0';
231              
232             const char *ctype; size_t ctype_len;
233 0           rc = read_native_string_ref(bbuf, blen, &bpos, &ctype, &ctype_len);
234 0 0         if (rc <= 0) { err_seen = rc < 0 ? -1 : 0; break; }
    0          
235 0 0         if (bpos >= blen) { err_seen = 0; break; }
236 0 0         if ((uint8_t)bbuf[bpos]) { err_seen = -1; break; }
237 0           bpos++;
238 0 0         if (nr > 0) {
239 0           col_type_t *ct = parse_col_type(ctype, ctype_len);
240 0           int col_err = 0;
241 0           SV **vals = decode_column(bbuf, blen, &bpos, nr, ct, &col_err, 0);
242 0           free_col_type(ct);
243 0 0         if (!vals) { err_seen = col_err ? -1 : 0; break; }
    0          
244 0           data[c] = vals;
245             }
246             }
247              
248 0           int destroyed_during_cb = 0;
249 0 0         if (err_seen == 1) {
250             /* Pin cb across the loop: an on_log handler that calls
251             * $ch->on_log(undef) would otherwise free the CV mid-iteration. */
252 0           SvREFCNT_inc(cb);
253 0           self->callback_depth++;
254 0 0         for (uint64_t r = 0; r < nr; r++) {
255 0           HV *row = newHV();
256 0 0         for (uint64_t c = 0; c < nc; c++) {
257 0           SV *v = data[c][r];
258 0           SvREFCNT_inc(v); /* hv_store consumes one ref */
259 0           (void)hv_store(row, names[c], strlen(names[c]), v, 0);
260             }
261 0           dSP;
262 0 0         ENTER; SAVETMPS; PUSHMARK(SP);
263 0 0         EXTEND(SP, 1);
264 0           PUSHs(sv_2mortal(newRV_noinc((SV *)row)));
265 0           PUTBACK;
266 0           call_sv(cb, G_EVAL | G_VOID | G_DISCARD);
267 0 0         WARN_AND_CLEAR_ERRSV("on_log");
    0          
    0          
    0          
268 0 0         FREETMPS; LEAVE;
269 0 0         if (self->magic == EV_CH_FREED) { destroyed_during_cb = 1; break; }
270             }
271             /* Drop the cb pin BEFORE callback_depth-- : dropping the last ref
272             * to the on_log CV (a handler may have reassigned on_log) can free
273             * a closure that captured $ch, triggering DESTROY. Doing it while
274             * callback_depth is still raised keeps Safefree(self) deferred so
275             * the check_destroyed below can detect and finalize it. */
276 0           SvREFCNT_dec(cb);
277 0           self->callback_depth--;
278             }
279             (void)destroyed_during_cb; /* loop-break flag only; freed state is
280             * authoritatively reported by check_destroyed */
281              
282             /* Free locally-collected SVs + names regardless of self's state. */
283 0 0         for (uint64_t c = 0; c < nc; c++) {
284 0 0         if (data && data[c]) {
    0          
285 0 0         for (uint64_t r = 0; r < nr; r++) SvREFCNT_dec(data[c][r]);
286 0           Safefree(data[c]);
287             }
288 0 0         if (names && names[c]) Safefree(names[c]);
    0          
289             }
290 0 0         if (data) Safefree(data);
291 0 0         if (names) Safefree(names);
292              
293             #undef _BAIL_LOG
294              
295             /* Single cleanup point: decompressed always needs freeing if allocated,
296             * regardless of which branch we return from. */
297             int ret;
298             /* on_log — or dropping the pinned cb above — may have freed self.
299             * check_destroyed finalizes any deferred Safefree and reports it. */
300 0 0         if (check_destroyed(self)) {
301 0           ret = -2;
302 0 0         } else if (err_seen < 0) {
303 0           *errmsg = safe_strdup("malformed log block");
304 0           ret = -1;
305 0 0         } else if (err_seen == 0) { /* need more data — do not advance outer_pos */
306 0           ret = 0;
307             } else {
308             /* For the uncompressed path bpos has been advanced inside the outer
309             * buf; copy it back to pos. For the LZ4 path pos was already advanced
310             * past the chain by ch_lz4_decompress_chain, and bpos is an offset
311             * into the freed decompressed buffer — don't touch pos. */
312 0 0         if (!decompressed) pos = bpos;
313 0           *outer_pos = pos;
314 0           ret = 1;
315             }
316 0 0         if (decompressed) Safefree(decompressed);
317 0           return ret;
318             }
319              
320             /* Dispatch the on_progress callback with five UInt values. Returns -2 if
321             * the handler freed self, 0 otherwise. Shared between SERVER_PROGRESS
322             * (per-packet) and EndOfStream (flush of any uncoalesced accumulator). */
323 0           static int fire_progress_cb(ev_clickhouse_t *self, const uint64_t pp[5]) {
324             int i;
325 0           dSP;
326 0           self->callback_depth++;
327 0           ENTER; SAVETMPS;
328 0 0         PUSHMARK(SP);
329 0 0         EXTEND(SP, 5);
330 0 0         for (i = 0; i < 5; i++) PUSHs(sv_2mortal(newSVuv(pp[i])));
331 0           PUTBACK;
332 0           call_sv(self->on_progress, G_DISCARD | G_EVAL);
333 0 0         WARN_AND_CLEAR_ERRSV("progress handler");
    0          
    0          
    0          
334 0 0         FREETMPS; LEAVE;
335 0           self->callback_depth--;
336 0 0         return check_destroyed(self) ? -2 : 0;
337             }
338              
339 0           static int parse_native_packet(ev_clickhouse_t *self, char **errmsg) {
340 0           const char *buf = self->recv_buf;
341 0           size_t len = self->recv_len;
342 0           size_t pos = 0;
343             uint64_t ptype;
344             int rc;
345              
346 0           rc = read_varuint(buf, len, &pos, &ptype);
347 0 0         if (rc == 0) return 0;
348 0 0         if (rc < 0) {
349 0           *errmsg = safe_strdup("malformed packet type");
350 0           return -1;
351             }
352              
353 0           switch ((int)ptype) {
354              
355 0           case SERVER_HELLO: {
356 0           char *sname = NULL;
357             uint64_t major, minor, revision;
358              
359 0           rc = read_native_string_alloc(buf, len, &pos, &sname, NULL);
360 0 0         if (rc == 0) return 0;
361 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed server name"); return -1; }
362              
363 0           rc = read_varuint(buf, len, &pos, &major);
364 0 0         if (rc == 0) { Safefree(sname); return 0; }
365 0 0         if (rc < 0) { Safefree(sname); *errmsg = safe_strdup("malformed server version major"); return -1; }
366              
367 0           rc = read_varuint(buf, len, &pos, &minor);
368 0 0         if (rc == 0) { Safefree(sname); return 0; }
369 0 0         if (rc < 0) { Safefree(sname); *errmsg = safe_strdup("malformed server version minor"); return -1; }
370              
371 0           rc = read_varuint(buf, len, &pos, &revision);
372 0 0         if (rc == 0) { Safefree(sname); return 0; }
373 0 0         if (rc < 0) { Safefree(sname); *errmsg = safe_strdup("malformed server revision"); return -1; }
374              
375 0 0         CLEAR_STR(self->server_name);
376 0           self->server_name = sname;
377 0           self->server_version_major = (unsigned int)major;
378 0           self->server_version_minor = (unsigned int)minor;
379 0           self->server_revision = (unsigned int)revision;
380              
381             /* The server emits these fields per the negotiated revision =
382             * min(server_rev, our advertised CH_CLIENT_REVISION). Gate on
383             * server_revision so we don't read garbage from older servers. */
384 0 0         if (self->server_revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) {
385 0           char *tz = NULL;
386 0           rc = read_native_string_alloc(buf, len, &pos, &tz, NULL);
387 0 0         if (rc == 0) return 0;
388 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed timezone"); return -1; }
389 0 0         CLEAR_STR(self->server_timezone);
390 0           self->server_timezone = tz;
391             }
392              
393 0 0         if (self->server_revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME) {
394 0           char *dn = NULL;
395 0           rc = read_native_string_alloc(buf, len, &pos, &dn, NULL);
396 0 0         if (rc == 0) return 0;
397 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed display name"); return -1; }
398 0 0         CLEAR_STR(self->server_display_name);
399 0           self->server_display_name = dn;
400             }
401              
402 0 0         if (self->server_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) {
403             uint64_t patch;
404 0           rc = read_varuint(buf, len, &pos, &patch);
405 0 0         if (rc == 0) return 0;
406 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed version patch"); return -1; }
407 0           self->server_version_patch = (unsigned int)patch;
408             }
409              
410             /* consume from recv_buf */
411 0           recv_consume(self, pos);
412 0           return 4;
413             }
414              
415 0           case SERVER_DATA:
416             case SERVER_TOTALS:
417             case SERVER_EXTREMES: {
418             uint64_t num_cols, num_rows;
419             const char *dbuf; /* data buffer (may point to decompressed data) */
420             size_t dlen, dpos;
421 0           char *decompressed = NULL;
422              
423             /* table name — outside compression */
424 0           rc = skip_native_string(buf, len, &pos);
425 0 0         if (rc == 0) return 0;
426 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed table name"); return -1; }
427              
428             #ifdef HAVE_LZ4
429             if (self->compress) {
430             /* Decompress the block body — may span multiple LZ4 sub-blocks. */
431             int need_more = 0;
432             const char *lz4_err = NULL;
433             decompressed = ch_lz4_decompress_chain(buf, len, &pos, &dlen,
434             &need_more, &lz4_err);
435             if (!decompressed) {
436             if (need_more) return 0;
437             *errmsg = safe_strdup(lz4_err ? lz4_err : "LZ4 decompression failed");
438             return -1;
439             }
440             dbuf = decompressed;
441             dpos = 0;
442             } else
443             #endif
444             {
445 0           dbuf = buf;
446 0           dlen = len;
447 0           dpos = pos;
448             }
449              
450             /* block info */
451 0 0         if (self->server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
452 0           rc = skip_block_info(dbuf, dlen, &dpos);
453 0 0         if (rc == 0) {
454 0 0         if (decompressed) { Safefree(decompressed); *errmsg = safe_strdup("truncated compressed block"); return -1; }
455 0           return 0;
456             }
457 0 0         if (rc < 0) { if (decompressed) Safefree(decompressed); *errmsg = safe_strdup("malformed block info"); return -1; }
    0          
458             }
459              
460 0           rc = read_varuint(dbuf, dlen, &dpos, &num_cols);
461 0 0         if (rc == 0) {
462 0 0         if (decompressed) { Safefree(decompressed); *errmsg = safe_strdup("truncated compressed block"); return -1; }
463 0           return 0;
464             }
465 0 0         if (rc < 0) { if (decompressed) Safefree(decompressed); *errmsg = safe_strdup("malformed num_cols"); return -1; }
    0          
466              
467 0           rc = read_varuint(dbuf, dlen, &dpos, &num_rows);
468 0 0         if (rc == 0) {
469 0 0         if (decompressed) { Safefree(decompressed); *errmsg = safe_strdup("truncated compressed block"); return -1; }
470 0           return 0;
471             }
472 0 0         if (rc < 0) { if (decompressed) Safefree(decompressed); *errmsg = safe_strdup("malformed num_rows"); return -1; }
    0          
473              
474             /* Empty data block — skip or handle column names/types */
475 0 0         if (num_rows == 0) {
476             /* INSERT two-phase: server sent sample block with column structure */
477 0 0         if (self->native_state == NATIVE_WAIT_INSERT_META
478 0 0         && (self->insert_data || self->insert_av) && num_cols > 0) {
    0          
    0          
479             const char **cnames;
480             size_t *cname_lens;
481             const char **ctypes_str;
482             size_t *ctype_lens;
483             col_type_t **ctypes;
484             char *data_pkt;
485             size_t data_pkt_len;
486             uint64_t c;
487 0           int meta_hard = 0; /* 1 = hard error, 0 = need more */
488 0           const char *meta_err = NULL; /* non-NULL → goto meta_cleanup */
489              
490 0 0         Newxz(cnames, num_cols, const char *);
491 0 0         Newxz(cname_lens, num_cols, size_t);
492 0 0         Newxz(ctypes_str, num_cols, const char *);
493 0 0         Newxz(ctype_lens, num_cols, size_t);
494 0 0         Newxz(ctypes, num_cols, col_type_t *);
495              
496 0 0         for (c = 0; c < num_cols; c++) {
497 0           rc = read_native_string_ref(dbuf, dlen, &dpos,
498 0           &cnames[c], &cname_lens[c]);
499 0 0         if (rc <= 0) {
500 0 0         meta_hard = (rc < 0 || decompressed != NULL);
    0          
501 0           meta_err = "malformed cname";
502 0           goto meta_cleanup;
503             }
504 0           rc = read_native_string_ref(dbuf, dlen, &dpos,
505 0           &ctypes_str[c], &ctype_lens[c]);
506 0 0         if (rc <= 0) {
507 0 0         meta_hard = (rc < 0 || decompressed != NULL);
    0          
508 0           meta_err = "malformed ctype";
509 0           goto meta_cleanup;
510             }
511 0           ctypes[c] = parse_col_type(ctypes_str[c], ctype_lens[c]);
512              
513             /* custom serialization flag (revision >= 54446) */
514 0 0         if (dpos >= dlen) {
515 0           meta_hard = (decompressed != NULL);
516 0           meta_err = "truncated custom_ser";
517 0           goto meta_cleanup;
518             }
519 0 0         if ((uint8_t)dbuf[dpos]) {
520 0           meta_hard = 1;
521 0           meta_err = "custom serialization not supported";
522 0           goto meta_cleanup;
523             }
524 0           dpos++;
525             }
526 0           goto meta_ok;
527              
528 0           meta_cleanup:
529 0 0         for (c = 0; c < num_cols; c++) if (ctypes[c]) free_col_type(ctypes[c]);
    0          
530 0           Safefree(cnames); Safefree(cname_lens);
531 0           Safefree(ctypes_str); Safefree(ctype_lens);
532 0           Safefree(ctypes);
533 0 0         if (decompressed) Safefree(decompressed);
534 0 0         if (meta_hard) { *errmsg = safe_strdup(meta_err); return -1; }
535 0           return 0;
536              
537 0           meta_ok: ;
538              
539             /* Build binary data block from stored data */
540 0 0         if (self->insert_av) {
541 0           data_pkt = build_native_insert_data_from_av(aTHX_ self,
542 0           (AV *)SvRV(self->insert_av),
543             cnames, cname_lens, ctypes_str, ctype_lens,
544             ctypes, (int)num_cols, &data_pkt_len);
545             } else {
546 0           data_pkt = build_native_insert_data(self,
547 0           self->insert_data, self->insert_data_len,
548             cnames, cname_lens, ctypes_str, ctype_lens,
549             ctypes, (int)num_cols, &data_pkt_len);
550             }
551              
552 0 0         for (c = 0; c < num_cols; c++)
553 0           free_col_type(ctypes[c]);
554 0           Safefree(cnames); Safefree(cname_lens);
555 0           Safefree(ctypes_str); Safefree(ctype_lens);
556 0           Safefree(ctypes);
557              
558             {
559             /* Check encode-failure sentinel before freeing insert data */
560 0 0         int encode_failed = (!data_pkt && data_pkt_len == (size_t)-1);
    0          
561              
562             /* Free stored INSERT data */
563 0 0         CLEAR_INSERT(self);
    0          
564              
565 0 0         if (decompressed) Safefree(decompressed);
566 0           else pos = dpos;
567 0           recv_consume(self, pos);
568              
569 0 0         if (!data_pkt) {
570             /* Send empty Data block to complete the INSERT protocol */
571             native_buf_t fallback;
572 0           nbuf_init(&fallback);
573 0           nbuf_empty_data_block(&fallback, self->compress);
574 0           data_pkt = fallback.data;
575 0           data_pkt_len = fallback.len;
576 0 0         if (encode_failed)
577 0           self->insert_err = safe_strdup(
578             "native INSERT encoding failed (unsupported type)");
579             }
580             }
581              
582             /* Send the data block — write to send_buf and start writing */
583 0           self->native_state = NATIVE_WAIT_RESULT;
584 0           send_replace(self, data_pkt, data_pkt_len);
585 0 0         if (try_write(self)) return -2;
586 0           return 1;
587             }
588              
589             /* INSERT two-phase with 0-column sample block: free data,
590             * send empty Data block, transition to WAIT_RESULT */
591 0 0         if (self->native_state == NATIVE_WAIT_INSERT_META
592 0 0         && (self->insert_data || self->insert_av) && num_cols == 0) {
    0          
    0          
593             native_buf_t fallback;
594              
595 0 0         CLEAR_INSERT(self);
    0          
596              
597 0 0         if (decompressed) Safefree(decompressed);
598 0           else pos = dpos;
599 0           recv_consume(self, pos);
600              
601 0           nbuf_init(&fallback);
602 0           nbuf_empty_data_block(&fallback, self->compress);
603 0           self->native_state = NATIVE_WAIT_RESULT;
604 0           send_replace(self, fallback.data, fallback.len);
605 0           self->insert_err = safe_strdup(
606             "INSERT failed: server sent 0-column sample block");
607 0 0         if (try_write(self)) return -2;
608 0           return 1;
609             }
610              
611             /* Normal empty block — capture column names/types so callers can
612             * inspect schema after a zero-row SELECT. Reset only when we have
613             * columns to record so the terminating empty block (num_cols=0)
614             * doesn't wipe the schema we already captured; this also drops
615             * any partial state from a need-more retry. */
616             {
617             uint64_t c;
618 0 0         if (num_cols > 0) {
619 0 0         CLEAR_SV(self->native_col_names);
620 0 0         CLEAR_SV(self->native_col_types);
621 0           self->native_col_names = newAV();
622 0           self->native_col_types = newAV();
623             }
624 0 0         for (c = 0; c < num_cols; c++) {
625             const char *cname, *ctype;
626             size_t cname_len, ctype_len;
627              
628 0           rc = read_native_string_ref(dbuf, dlen, &dpos, &cname, &cname_len);
629 0 0         if (rc <= 0) {
630 0 0         if (decompressed) Safefree(decompressed);
631 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed cname"); return -1; }
632 0           return 0;
633             }
634 0           av_push(self->native_col_names, newSVpvn(cname, cname_len));
635              
636 0           rc = read_native_string_ref(dbuf, dlen, &dpos, &ctype, &ctype_len);
637 0 0         if (rc <= 0) {
638 0 0         if (decompressed) Safefree(decompressed);
639 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed ctype"); return -1; }
640 0           return 0;
641             }
642 0           av_push(self->native_col_types, newSVpvn(ctype, ctype_len));
643              
644             /* custom serialization flag (revision >= 54446) */
645 0 0         if (dpos >= dlen) {
646 0 0         if (decompressed) Safefree(decompressed);
647 0           return 0;
648             }
649 0 0         if ((uint8_t)dbuf[dpos]) {
650 0 0         if (decompressed) Safefree(decompressed);
651 0           *errmsg = safe_strdup("custom serialization not supported");
652 0           return -1;
653             }
654 0           dpos++;
655             }
656             }
657 0 0         if (decompressed) Safefree(decompressed);
658 0           else pos = dpos; /* uncompressed: advance pos to match dpos */
659 0           recv_consume(self, pos);
660 0           return 1;
661             }
662              
663             /* Decode columns and convert to rows */
664             {
665 0           SV ***columns = NULL;
666 0           col_type_t **col_types = NULL;
667 0           const char **cnames = NULL;
668 0           size_t *cname_lens = NULL;
669             uint64_t c, r;
670 0           int named = (self->decode_flags & DECODE_NAMED_ROWS) ? 1 : 0;
671              
672 0 0         Newxz(columns, num_cols, SV**);
673 0 0         Newxz(col_types, num_cols, col_type_t*);
674 0 0         if (named) {
675 0 0         Newxz(cnames, num_cols, const char *);
676 0 0         Newx(cname_lens, num_cols, size_t);
677             }
678              
679 0 0         for (c = 0; c < num_cols; c++) {
680             const char *cname, *ctype;
681             size_t cname_len, ctype_len;
682              
683 0           rc = read_native_string_ref(dbuf, dlen, &dpos, &cname, &cname_len);
684 0 0         if (rc == 0) {
685 0 0         if (decompressed) { *errmsg = safe_strdup("truncated cname"); goto data_error; }
686 0           goto data_need_more;
687             }
688 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed cname"); goto data_error; }
689              
690 0 0         if (named) {
691 0           cnames[c] = cname;
692 0           cname_lens[c] = cname_len;
693             }
694              
695             /* Allocate AVs on first column of first data block. Reset
696             * any partial state from a need-more retry: pipeline_advance
697             * cleared at dispatch, but if we returned 0 mid-loop the AVs
698             * may now hold a partial column list. */
699 0 0         if (c == 0) {
700 0 0         CLEAR_SV(self->native_col_names);
701 0 0         CLEAR_SV(self->native_col_types);
702 0           self->native_col_names = newAV();
703 0           self->native_col_types = newAV();
704             }
705 0           av_push(self->native_col_names, newSVpvn(cname, cname_len));
706              
707 0           rc = read_native_string_ref(dbuf, dlen, &dpos, &ctype, &ctype_len);
708 0 0         if (rc == 0) {
709 0 0         if (decompressed) { *errmsg = safe_strdup("truncated ctype"); goto data_error; }
710 0           goto data_need_more;
711             }
712 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed ctype"); goto data_error; }
713              
714 0           col_types[c] = parse_col_type(ctype, ctype_len);
715 0           av_push(self->native_col_types, newSVpvn(ctype, ctype_len));
716              
717             /* custom serialization flag (revision >= 54446) */
718 0 0         if (dpos >= dlen) {
719 0 0         if (decompressed) { *errmsg = safe_strdup("truncated custom_ser"); goto data_error; }
720 0           goto data_need_more;
721             }
722 0 0         if ((uint8_t)dbuf[dpos]) {
723 0           *errmsg = safe_strdup("custom serialization not supported");
724 0           goto data_error;
725             }
726 0           dpos++;
727              
728             /* Allocate LC dict state on first column of first block */
729 0 0         if (c == 0 && !self->lc_dicts) {
    0          
730 0 0         Newxz(self->lc_dicts, num_cols, SV**);
731 0 0         Newxz(self->lc_dict_sizes, num_cols, uint64_t);
732 0           self->lc_num_cols = (int)num_cols;
733             }
734              
735             {
736 0           int col_err = 0;
737 0           columns[c] = decode_column_ex(dbuf, dlen, &dpos, num_rows, col_types[c], &col_err, self->decode_flags, self, (int)c);
738 0 0         if (!columns[c]) {
739 0 0         if (col_err || decompressed) {
    0          
740 0           *errmsg = safe_strdup("decode_column failed");
741 0           goto data_error;
742             }
743 0           goto data_need_more;
744             }
745             }
746             }
747              
748             /* Convert column-oriented to row-oriented */
749             {
750             AV **target;
751 0 0         if (ptype == SERVER_TOTALS) {
752 0 0         if (!self->native_totals) self->native_totals = newAV();
753 0           target = &self->native_totals;
754 0 0         } else if (ptype == SERVER_EXTREMES) {
755 0 0         if (!self->native_extremes) self->native_extremes = newAV();
756 0           target = &self->native_extremes;
757             } else {
758 0 0         if (!self->native_rows) self->native_rows = newAV();
759 0           target = &self->native_rows;
760             }
761              
762 0 0         if (named) {
763 0 0         for (r = 0; r < num_rows; r++) {
764 0           HV *hv = newHV();
765 0 0         for (c = 0; c < num_cols; c++) {
766 0 0         if (!hv_store(hv, cnames[c], cname_lens[c], columns[c][r], 0))
767 0           SvREFCNT_dec(columns[c][r]);
768             }
769 0           av_push(*target, newRV_noinc((SV*)hv));
770             }
771             } else {
772 0 0         for (r = 0; r < num_rows; r++) {
773 0           AV *row = newAV();
774 0 0         if (num_cols > 0)
775 0           av_extend(row, num_cols - 1);
776 0 0         for (c = 0; c < num_cols; c++) {
777 0           av_push(row, columns[c][r]);
778             }
779 0           av_push(*target, newRV_noinc((SV*)row));
780             }
781             }
782             }
783              
784             /* Fire on_data streaming callback if set (only for DATA, not TOTALS/EXTREMES) */
785             {
786 0 0         SV *on_data = (ptype == SERVER_DATA) ? peek_cb_on_data(self) : NULL;
787 0 0         if (on_data && self->native_rows) {
    0          
788             /* Hold a reference across call_sv: a reentrant
789             * skip_pending() / cancel() in the handler would
790             * otherwise pop the cb_queue entry and free this
791             * callback while we're still invoking it. */
792 0           SvREFCNT_inc(on_data);
793 0           self->callback_depth++;
794             {
795 0           dSP;
796 0           ENTER; SAVETMPS;
797 0 0         PUSHMARK(SP);
798 0           PUSHs(sv_2mortal(newRV_inc((SV*)self->native_rows)));
799 0           PUTBACK;
800 0           call_sv(on_data, G_DISCARD | G_EVAL);
801 0 0         WARN_AND_CLEAR_ERRSV("on_data handler");
    0          
    0          
    0          
802 0 0         FREETMPS; LEAVE;
803             }
804             /* Decrement on_data BEFORE callback_depth-- so a DESTROY
805             * triggered by the dec (closure holding last $ch ref)
806             * still sees callback_depth > 0 and defers Safefree(self). */
807 0           SvREFCNT_dec(on_data);
808 0           self->callback_depth--;
809             /* Clear accumulated rows for next block */
810 0 0         CLEAR_SV(self->native_rows);
811 0 0         if (check_destroyed(self)) {
812 0 0         if (cnames) Safefree(cnames);
813 0 0         if (cname_lens) Safefree(cname_lens);
814 0 0         for (c = 0; c < num_cols; c++) {
815 0           Safefree(columns[c]);
816 0           free_col_type(col_types[c]);
817             }
818 0           Safefree(columns); Safefree(col_types);
819 0 0         if (decompressed) Safefree(decompressed);
820 0           return -2;
821             }
822             }
823             }
824              
825             /* Cleanup column arrays (SVs moved to rows, don't dec refcnt) */
826 0 0         for (c = 0; c < num_cols; c++) {
827 0           Safefree(columns[c]);
828 0           free_col_type(col_types[c]);
829             }
830 0           Safefree(columns);
831 0           Safefree(col_types);
832 0 0         if (cnames) Safefree(cnames);
833 0 0         if (cname_lens) Safefree(cname_lens);
834 0 0         if (decompressed) Safefree(decompressed);
835 0           else pos = dpos; /* uncompressed: advance pos to match dpos */
836              
837             /* Consume from recv_buf */
838 0           recv_consume(self, pos);
839 0           return 1;
840              
841 0           data_error:
842 0           data_need_more:
843             /* Cleanup partial decode */
844 0 0         for (c = 0; c < num_cols; c++) {
845 0 0         if (columns[c]) {
846             uint64_t j;
847 0 0         for (j = 0; j < num_rows; j++) {
848 0 0         if (columns[c][j]) SvREFCNT_dec(columns[c][j]);
849             }
850 0           Safefree(columns[c]);
851             }
852 0 0         if (col_types[c]) free_col_type(col_types[c]);
853             }
854 0           Safefree(columns);
855 0           Safefree(col_types);
856 0 0         if (cnames) Safefree(cnames);
857 0 0         if (cname_lens) Safefree(cname_lens);
858 0 0         if (decompressed) Safefree(decompressed);
859 0 0         if (*errmsg) {
860             /* data_error: flush recv_buf — data is malformed, cannot resume */
861 0           self->recv_len = 0;
862 0           return -1;
863             }
864 0           return 0;
865             }
866             }
867              
868 0           case SERVER_EXCEPTION: {
869             /* code: Int32, name: String, message: String,
870             * stack_trace: String, has_nested: UInt8 */
871             int32_t code;
872             const char *name, *msg, *stack;
873             size_t name_len, msg_len, stack_len;
874             uint8_t has_nested;
875             char *err;
876              
877             /* We just read the top-level exception */
878 0           rc = read_i32(buf, len, &pos, &code);
879 0 0         if (rc == 0) return 0;
880 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed exception code"); return -1; }
881              
882 0           rc = read_native_string_ref(buf, len, &pos, &name, &name_len);
883 0 0         if (rc == 0) return 0;
884 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed exception name"); return -1; }
885              
886 0           rc = read_native_string_ref(buf, len, &pos, &msg, &msg_len);
887 0 0         if (rc == 0) return 0;
888 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed exception message"); return -1; }
889              
890 0           rc = read_native_string_ref(buf, len, &pos, &stack, &stack_len);
891 0 0         if (rc == 0) return 0;
892 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed exception stack"); return -1; }
893              
894 0           rc = read_u8(buf, len, &pos, &has_nested);
895 0 0         if (rc == 0) return 0;
896 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed exception has_nested"); return -1; }
897              
898             /* Skip nested exceptions — keep the top-level code, not the innermost. */
899 0 0         while (has_nested) {
900             int32_t nested_code;
901             int i;
902 0           rc = read_i32(buf, len, &pos, &nested_code);
903 0 0         if (rc == 0) return 0;
904 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed nested exception"); return -1; }
905             /* name, message, stack_trace */
906 0 0         for (i = 0; i < 3; i++) {
907 0           rc = skip_native_string(buf, len, &pos);
908 0 0         if (rc == 0) return 0;
909 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed nested exception"); return -1; }
910             }
911 0           rc = read_u8(buf, len, &pos, &has_nested);
912 0 0         if (rc == 0) return 0;
913 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed nested exception"); return -1; }
914             }
915              
916 0           self->last_error_code = code;
917              
918 0           Newx(err, msg_len + name_len + 64, char);
919 0           snprintf(err, msg_len + name_len + 64, "Code: %d. %.*s: %.*s",
920             (int)code, (int)name_len, name, (int)msg_len, msg);
921              
922 0           recv_consume(self, pos);
923              
924 0           *errmsg = err;
925 0           return -1;
926             }
927              
928 0           case SERVER_PROGRESS: {
929             /* rows, bytes, total_rows, written_rows (>=54420), written_bytes (>=54420) */
930 0           uint64_t pp[5] = {0};
931 0 0         int n = (self->server_revision >= DBMS_MIN_REVISION_WITH_PROGRESS_WRITES) ? 5 : 3;
932             int i;
933 0 0         for (i = 0; i < n; i++) {
934 0           rc = read_varuint(buf, len, &pos, &pp[i]);
935 0 0         if (rc == 0) return 0;
936 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed progress packet"); return -1; }
937             }
938              
939 0           recv_consume(self, pos);
940              
941 0 0         if (self->on_progress) {
942             /* Coalesce when progress_period is set: the protocol sends
943             * incremental deltas, so accumulate and fire at the cadence. */
944 0 0         if (self->progress_period > 0) {
945 0 0         for (i = 0; i < 5; i++) self->progress_acc[i] += pp[i];
946 0           double now = ev_now(self->loop);
947 0 0         if (now - self->progress_last < self->progress_period)
948 0           return 1;
949 0           self->progress_last = now;
950 0 0         for (i = 0; i < 5; i++) { pp[i] = self->progress_acc[i]; self->progress_acc[i] = 0; }
951             }
952              
953 0 0         if (fire_progress_cb(self, pp) < 0) return -2;
954             }
955              
956 0           return 1;
957             }
958              
959 0           case SERVER_PROFILE_INFO: {
960             /* rows, blocks, bytes, applied_limit, rows_before_limit, calc_rows_before_limit */
961             uint64_t pi[6];
962             int i;
963 0 0         for (i = 0; i < 6; i++) {
964 0           rc = read_varuint(buf, len, &pos, &pi[i]);
965 0 0         if (rc == 0) return 0;
966 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed profile_info packet"); return -1; }
967             }
968 0           self->profile_rows = pi[0];
969 0           self->profile_bytes = pi[2];
970 0           self->profile_rows_before_limit = pi[4];
971 0           recv_consume(self, pos);
972 0           return 1;
973             }
974              
975 0           case SERVER_TABLE_COLUMNS: {
976             /* Format: string(table_name) + string(column_description) */
977             int i;
978 0 0         for (i = 0; i < 2; i++) {
979 0           rc = skip_native_string(buf, len, &pos);
980 0 0         if (rc == 0) return 0;
981 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed table_columns packet"); return -1; }
982             }
983 0           recv_consume(self, pos);
984 0           return 1;
985             }
986              
987 0           case SERVER_LOG: {
988 0 0         if (self->on_log) {
989 0           rc = parse_and_emit_log_block(self, buf, len, &pos, self->on_log, errmsg);
990             } else {
991 0           rc = parse_and_discard_block(self, buf, len, &pos, "server log", 0, errmsg);
992             }
993 0 0         if (rc <= 0) return rc;
994 0           recv_consume(self, pos);
995 0           return 1;
996             }
997              
998 0           case SERVER_PROFILE_EVENTS: {
999 0           rc = parse_and_discard_block(self, buf, len, &pos, "profile_events", 1, errmsg);
1000 0 0         if (rc <= 0) return rc;
1001 0           recv_consume(self, pos);
1002 0           return 1;
1003             }
1004              
1005 0           case SERVER_TIMEZONE_UPDATE: {
1006             /* Server-side session timezone changed mid-query. Format: string(tz). */
1007 0           char *tz = NULL;
1008 0           rc = read_native_string_alloc(buf, len, &pos, &tz, NULL);
1009 0 0         if (rc == 0) return 0;
1010 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed timezone_update packet"); return -1; }
1011 0 0         CLEAR_STR(self->server_timezone);
1012 0           self->server_timezone = tz;
1013 0           recv_consume(self, pos);
1014 0           return 1;
1015             }
1016              
1017 0           case SERVER_PONG:
1018 0           recv_consume(self, pos);
1019 0           return 3;
1020              
1021 0           case SERVER_END_OF_STREAM:
1022 0           recv_consume(self, pos);
1023 0           return 2;
1024              
1025 0           default: {
1026             /* Unknown packet type */
1027             char err[64];
1028 0           snprintf(err, sizeof(err), "unknown server packet type: %llu",
1029             (unsigned long long)ptype);
1030 0           *errmsg = safe_strdup(err);
1031 0           self->recv_len = 0;
1032 0           return -1;
1033             }
1034             }
1035             }
1036              
1037             /*
1038             * Process native protocol responses from recv_buf.
1039             * Called from on_readable when protocol == PROTO_NATIVE.
1040             */
1041 0           static void process_native_response(ev_clickhouse_t *self) {
1042 0 0         while (self->recv_len > 0 && self->magic == EV_CH_MAGIC) {
    0          
1043 0           char *errmsg = NULL;
1044             int rc;
1045 0           rc = parse_native_packet(self, &errmsg);
1046              
1047 0 0         if (rc == 0) {
1048             /* need more data */
1049 0           return;
1050             }
1051              
1052 0 0         if (rc == -2) {
1053             /* object destroyed inside callback */
1054 0           return;
1055             }
1056              
1057 0 0         if (rc == 4) {
1058             /* ServerHello received — send addendum (revision >= 54458) */
1059 0 0         if (self->native_state == NATIVE_WAIT_HELLO) {
1060             /* Addendum: quota_key (only if server supports it) */
1061 0 0         if (self->server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM) {
1062             native_buf_t ab;
1063 0           nbuf_init(&ab);
1064 0           nbuf_cstring(&ab, ""); /* quota_key */
1065 0           send_replace(self, ab.data, ab.len);
1066 0 0         if (try_write(self)) return;
1067 0 0         if (self->send_pos < self->send_len) {
1068             /* Addendum partially written (EAGAIN); io_cb finishes
1069             * connect once the buffer drains, otherwise pipeline_advance
1070             * could overwrite the unsent tail with a queued query. */
1071 0           self->pending_addendum_finish = 1;
1072 0           return;
1073             }
1074             }
1075 0           self->native_state = NATIVE_IDLE;
1076 0 0         if (finish_connect(self)) return;
1077             }
1078             /* pipeline_advance -> try_write may free self; no data
1079             * in recv_buf for the just-dispatched request yet */
1080 0           return;
1081             }
1082              
1083 0 0         if (rc == -1) {
1084             /* error */
1085 0 0         if (self->native_state == NATIVE_WAIT_HELLO) {
1086             /* Skip auto_reconnect on a malformed ServerHello: the peer
1087             * isn't a ClickHouse server, so retrying just spins.
1088             * (connect_timeout still uses fail_connection, which retries.) */
1089 0           teardown_io_error(self, errmsg, "connection failed");
1090 0           Safefree(errmsg);
1091 0           return;
1092             }
1093              
1094             /* Stop query timeout timer */
1095 0           stop_timing(self);
1096              
1097             /* Query error — deliver to callback */
1098 0 0         CLEAR_SV(self->native_rows);
1099 0 0         CLEAR_INSERT(self);
    0          
1100 0 0         CLEAR_STR(self->insert_err);
1101 0           self->native_state = NATIVE_IDLE;
1102 0           self->recv_len = 0; /* flush malformed data */
1103 0 0         if (self->send_count > 0) self->send_count--;
1104 0           lc_free_dicts(self);
1105 0           int destroyed = deliver_error(self, errmsg);
1106 0           Safefree(errmsg);
1107 0 0         if (destroyed) return;
1108              
1109             /* advance pipeline — may free self via try_write error */
1110 0           pipeline_advance(self);
1111 0           return;
1112             }
1113              
1114 0 0         if (rc == 2) {
1115             /* EndOfStream — deliver accumulated rows or deferred error */
1116 0           stop_timing(self);
1117 0           self->native_state = NATIVE_IDLE;
1118 0 0         if (self->send_count > 0) self->send_count--;
1119 0           lc_free_dicts(self);
1120              
1121             /* Flush any uncoalesced progress accumulated since the last
1122             * fire so users instrumenting via on_progress see the full
1123             * total when the query completes within one progress_period
1124             * of the last fire. */
1125 0 0         if (self->on_progress && self->progress_period > 0) {
    0          
1126 0           int any = 0, i;
1127 0 0         for (i = 0; i < 5; i++) if (self->progress_acc[i]) { any = 1; break; }
    0          
1128 0 0         if (any) {
1129             uint64_t pp[5];
1130 0           memcpy(pp, self->progress_acc, sizeof(pp));
1131 0           memset(self->progress_acc, 0, sizeof(self->progress_acc));
1132 0 0         if (fire_progress_cb(self, pp) < 0) return;
1133             }
1134             }
1135              
1136 0 0         if (self->insert_err) {
1137 0           char *err = self->insert_err;
1138 0           self->insert_err = NULL;
1139 0 0         CLEAR_SV(self->native_rows);
1140 0           int destroyed = deliver_error(self, err);
1141 0           Safefree(err);
1142 0 0         if (destroyed) return;
1143             } else {
1144 0           AV *rows = self->native_rows;
1145 0           self->native_rows = NULL;
1146 0 0         if (deliver_rows(self, rows)) return;
1147             }
1148              
1149             /* advance pipeline — may free self via try_write error */
1150 0           pipeline_advance(self);
1151 0           return;
1152             }
1153              
1154 0 0         if (rc == 3) {
1155             /* Pong — ack a keepalive ping, or deliver to user's ping() cb */
1156 0           self->native_state = NATIVE_IDLE;
1157 0 0         if (self->ka_in_flight > 0) {
1158             /* Keepalive ack: not tied to send_count or any user cb */
1159 0           self->ka_in_flight--;
1160 0           continue;
1161             }
1162 0           stop_timing(self);
1163 0 0         if (self->send_count > 0) self->send_count--;
1164 0           AV *rows = newAV();
1165 0 0         if (deliver_rows(self, rows)) return;
1166 0           pipeline_advance(self);
1167 0           return;
1168             }
1169              
1170             /* rc == 1: Data/Progress/ProfileInfo — continue reading */
1171             }
1172             }
1173