File Coverage

xs/proto_native_parse.c
Criterion Covered Total %
statement 0 646 0.0
branch 0 670 0.0
condition n/a
subroutine n/a
pod n/a
total 0 1316 0.0


line stmt bran cond sub pod time code
1             /* --- Native protocol response parser --- */
2              
/*
 * Consume the block-info section that precedes a block's column data
 * (sent when revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO).
 *
 * The section is a run of (field number, value) pairs closed by field
 * number 0. Only field 1 (is_overflows, UInt8) and field 2 (bucket_num,
 * Int32) are recognised; their values are read and thrown away. `pos` is
 * handed to the read_* helpers, which are expected to advance it.
 *
 * Returns  1 once the end marker has been read,
 *          0 when a reader reports that more data is needed,
 *         -1 on a protocol error (bad field-number varuint or an unknown
 *            field number); a non-positive result from read_u8 / read_i32
 *            is passed through unchanged.
 */
static int skip_block_info(const char *buf, size_t len, size_t *pos) {
    for (;;) {
        uint64_t tag;
        int status = read_varuint(buf, len, pos, &tag);

        if (status == 0) return 0;
        if (status < 0) return -1;

        switch (tag) {
        case 0:
            /* end marker */
            return 1;
        case 1: {
            /* is_overflows: UInt8 */
            uint8_t ignored;
            status = read_u8(buf, len, pos, &ignored);
            break;
        }
        case 2: {
            /* bucket_num: Int32 */
            int32_t ignored;
            status = read_i32(buf, len, pos, &ignored);
            break;
        }
        default:
            /* protocol error */
            return -1;
        }

        /* Shared check for the value read of field 1 / field 2. */
        if (status <= 0) return status;
    }
}
29              
30             /*
31             * Try to parse one server packet from recv_buf.
32             * Returns:
33             * 1 = packet consumed, continue reading
34             * 0 = need more data
35             * -1 = error (message in *errmsg, caller must Safefree)
36             * 2 = EndOfStream
37             * 3 = Pong
38             * 4 = Hello parsed (self->server_* fields populated)
39             */
40              
41             /* Parse a Data block (table_name + optional-LZ4-chain + block_info + columns)
42             * and decode-and-discard each column. Used by SERVER_LOG and SERVER_PROFILE_EVENTS,
43             * which carry a block in the same wire format as SERVER_DATA but whose contents
44             * the client does not surface to Perl.
45             *
46             * `lz4_optional` (used by PROFILE_EVENTS only): if set, fall back to parsing
47             * the body uncompressed when LZ4 hard-fails — older servers occasionally send
48             * profile_events uncompressed even on a compressed connection.
49             *
50             * Returns 1 on success (advances *outer_pos past the consumed bytes),
51             * 0 on need-more-data,
52             * -1 on hard error (sets *errmsg). */
53 0           static int parse_and_discard_block(ev_clickhouse_t *self,
54             const char *buf, size_t len, size_t *outer_pos,
55             const char *kind, int lz4_optional,
56             char **errmsg) {
57 0           size_t pos = *outer_pos;
58             int rc;
59 0           char *decompressed = NULL;
60             const char *bbuf;
61             size_t blen, bpos;
62             char errbuf[64];
63              
64 0           rc = skip_native_string(buf, len, &pos);
65 0 0         if (rc == 0) return 0;
66 0 0         if (rc < 0) {
67 0           snprintf(errbuf, sizeof(errbuf), "malformed %s block", kind);
68 0           *errmsg = safe_strdup(errbuf);
69 0           return -1;
70             }
71              
72             #ifdef HAVE_LZ4
73             if (self->compress) {
74             int need_more = 0;
75             const char *lz4_err = NULL;
76             decompressed = ch_lz4_decompress_chain(buf, len, &pos, &blen,
77             &need_more, &lz4_err);
78             if (!decompressed) {
79             if (need_more) return 0;
80             if (!lz4_optional) {
81             snprintf(errbuf, sizeof(errbuf), "%s: LZ4 decompression failed", kind);
82             *errmsg = safe_strdup(lz4_err ? lz4_err : errbuf);
83             return -1;
84             }
85             /* fall through to uncompressed parsing */
86             }
87             }
88             if (decompressed) {
89             bbuf = decompressed;
90             bpos = 0;
91             } else
92             #endif
93             {
94 0           bbuf = buf;
95 0           blen = len;
96 0           bpos = pos;
97             }
98              
99             #define _BAIL(rc_val) do { \
100             if (decompressed) Safefree(decompressed); \
101             if (rc_val < 0) { snprintf(errbuf, sizeof(errbuf), "malformed %s block", kind); *errmsg = safe_strdup(errbuf); } \
102             return rc_val; \
103             } while (0)
104              
105 0 0         if (self->server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
106 0           rc = skip_block_info(bbuf, blen, &bpos);
107 0 0         if (rc <= 0) _BAIL(rc);
    0          
    0          
108             }
109             {
110             uint64_t nc, nr, c;
111 0           rc = read_varuint(bbuf, blen, &bpos, &nc);
112 0 0         if (rc <= 0) _BAIL(rc);
    0          
    0          
113 0           rc = read_varuint(bbuf, blen, &bpos, &nr);
114 0 0         if (rc <= 0) _BAIL(rc);
    0          
    0          
115              
116 0 0         for (c = 0; c < nc; c++) {
117             const char *ctype;
118             size_t ctype_len;
119 0           rc = skip_native_string(bbuf, blen, &bpos);
120 0 0         if (rc <= 0) _BAIL(rc);
    0          
    0          
121 0           rc = read_native_string_ref(bbuf, blen, &bpos, &ctype, &ctype_len);
122 0 0         if (rc <= 0) _BAIL(rc);
    0          
    0          
123             /* custom serialization flag (revision >= 54446) */
124 0 0         if (bpos >= blen) _BAIL(0);
    0          
125 0 0         if ((uint8_t)bbuf[bpos]) {
126 0 0         if (decompressed) Safefree(decompressed);
127 0           *errmsg = safe_strdup("custom serialization not supported");
128 0           return -1;
129             }
130 0           bpos++;
131 0 0         if (nr > 0) {
132 0           col_type_t *ct = parse_col_type(ctype, ctype_len);
133 0           int col_err = 0;
134 0           SV **vals = decode_column(bbuf, blen, &bpos, nr, ct, &col_err, 0);
135 0 0         if (!vals) {
136 0           free_col_type(ct);
137 0 0         if (col_err || decompressed) {
    0          
138 0 0         if (decompressed) Safefree(decompressed);
139 0           snprintf(errbuf, sizeof(errbuf), "malformed %s block", kind);
140 0           *errmsg = safe_strdup(errbuf);
141 0           return -1;
142             }
143 0           return 0;
144             }
145             {
146             uint64_t j;
147 0 0         for (j = 0; j < nr; j++) SvREFCNT_dec(vals[j]);
148             }
149 0           Safefree(vals);
150 0           free_col_type(ct);
151             }
152             }
153             }
154              
155             #undef _BAIL
156              
157 0 0         if (!decompressed) pos = bpos;
158 0 0         if (decompressed) Safefree(decompressed);
159 0           *outer_pos = pos;
160 0           return 1;
161             }
162              
163             /* Like parse_and_discard_block, but for each row of the block invokes
164             * `cb` with one hashref keyed by column name. Used by on_log.
165             * Caller has already verified cb is non-NULL. */
166 0           static int parse_and_emit_log_block(ev_clickhouse_t *self,
167             const char *buf, size_t len, size_t *outer_pos,
168             SV *cb, char **errmsg) {
169 0           size_t pos = *outer_pos;
170             int rc;
171 0           char *decompressed = NULL;
172             const char *bbuf;
173             size_t blen, bpos;
174              
175 0           rc = skip_native_string(buf, len, &pos);
176 0 0         if (rc == 0) return 0;
177 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed log block"); return -1; }
178              
179             #ifdef HAVE_LZ4
180             if (self->compress) {
181             int need_more = 0;
182             const char *lz4_err = NULL;
183             decompressed = ch_lz4_decompress_chain(buf, len, &pos, &blen,
184             &need_more, &lz4_err);
185             if (!decompressed) {
186             if (need_more) return 0;
187             /* server log frames are not always compressed even with
188             * compress=1 negotiated — fall through to raw parsing. */
189             }
190             }
191             if (decompressed) { bbuf = decompressed; bpos = 0; }
192             else
193             #endif
194 0           { bbuf = buf; blen = len; bpos = pos; }
195              
196             #define _BAIL_LOG(rc_val) do { \
197             if (decompressed) Safefree(decompressed); \
198             if (rc_val < 0) { *errmsg = safe_strdup("malformed log block"); } \
199             return rc_val; \
200             } while (0)
201              
202 0 0         if (self->server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
203 0           rc = skip_block_info(bbuf, blen, &bpos);
204 0 0         if (rc <= 0) _BAIL_LOG(rc);
    0          
    0          
205             }
206             uint64_t nc, nr;
207 0           rc = read_varuint(bbuf, blen, &bpos, &nc);
208 0 0         if (rc <= 0) _BAIL_LOG(rc);
    0          
    0          
209 0           rc = read_varuint(bbuf, blen, &bpos, &nr);
210 0 0         if (rc <= 0) _BAIL_LOG(rc);
    0          
    0          
211              
212             /* Collect column name + decoded values, then assemble per-row HVs. */
213 0           char **names = NULL;
214 0           SV ***data = NULL; /* data[col][row] */
215 0 0         if (nc > 0) {
216             /* A column occupies at least one wire byte (its name-length varint),
217             * so more columns than remaining bytes is malformed — reject before
218             * the allocation rather than letting Newxz attempt a huge size. */
219 0 0         if (nc > (uint64_t)(blen - bpos)) _BAIL_LOG(-1);
    0          
220 0 0         Newxz(names, nc, char *);
221 0 0         Newxz(data, nc, SV **);
222             }
223             /* err_seen: -1 = malformed, 0 = success / need-more, +1 = all columns
224             * parsed cleanly. We distinguish "loop completed all nc columns" from
225             * "loop broke early needing more data" via the explicit flag rather
226             * than inspecting bpos, since bpos always advances past the header. */
227 0           int err_seen = 1;
228 0 0         for (uint64_t c = 0; c < nc; c++) {
229             const char *cname; size_t cname_len;
230 0           rc = read_native_string_ref(bbuf, blen, &bpos, &cname, &cname_len);
231 0 0         if (rc <= 0) { err_seen = rc < 0 ? -1 : 0; break; }
    0          
232 0           Newx(names[c], cname_len + 1, char);
233 0           Copy(cname, names[c], cname_len, char);
234 0           names[c][cname_len] = '\0';
235              
236             const char *ctype; size_t ctype_len;
237 0           rc = read_native_string_ref(bbuf, blen, &bpos, &ctype, &ctype_len);
238 0 0         if (rc <= 0) { err_seen = rc < 0 ? -1 : 0; break; }
    0          
239 0 0         if (bpos >= blen) { err_seen = 0; break; }
240 0 0         if ((uint8_t)bbuf[bpos]) { err_seen = -1; break; }
241 0           bpos++;
242 0 0         if (nr > 0) {
243 0           col_type_t *ct = parse_col_type(ctype, ctype_len);
244 0           int col_err = 0;
245 0           SV **vals = decode_column(bbuf, blen, &bpos, nr, ct, &col_err, 0);
246 0           free_col_type(ct);
247 0 0         if (!vals) { err_seen = col_err ? -1 : 0; break; }
    0          
248 0           data[c] = vals;
249             }
250             }
251              
252 0           int destroyed_during_cb = 0;
253 0 0         if (err_seen == 1) {
254             /* Pin cb across the loop: an on_log handler that calls
255             * $ch->on_log(undef) would otherwise free the CV mid-iteration. */
256 0           SvREFCNT_inc(cb);
257 0           self->callback_depth++;
258 0 0         for (uint64_t r = 0; r < nr; r++) {
259 0           HV *row = newHV();
260 0 0         for (uint64_t c = 0; c < nc; c++) {
261 0           SV *v = data[c][r];
262 0           SvREFCNT_inc(v); /* hv_store consumes one ref */
263 0           (void)hv_store(row, names[c], strlen(names[c]), v, 0);
264             }
265 0           dSP;
266 0 0         ENTER; SAVETMPS; PUSHMARK(SP);
267 0 0         EXTEND(SP, 1);
268 0           PUSHs(sv_2mortal(newRV_noinc((SV *)row)));
269 0           PUTBACK;
270 0           call_sv(cb, G_EVAL | G_VOID | G_DISCARD);
271 0 0         WARN_AND_CLEAR_ERRSV("on_log");
    0          
    0          
    0          
272 0 0         FREETMPS; LEAVE;
273 0 0         if (self->magic == EV_CH_FREED) { destroyed_during_cb = 1; break; }
274             }
275             /* Drop the cb pin BEFORE callback_depth-- : dropping the last ref
276             * to the on_log CV (a handler may have reassigned on_log) can free
277             * a closure that captured $ch, triggering DESTROY. Doing it while
278             * callback_depth is still raised keeps Safefree(self) deferred so
279             * the check_destroyed below can detect and finalize it. */
280 0           SvREFCNT_dec(cb);
281 0           self->callback_depth--;
282             }
283             (void)destroyed_during_cb; /* loop-break flag only; freed state is
284             * authoritatively reported by check_destroyed */
285              
286             /* Free locally-collected SVs + names regardless of self's state. */
287 0 0         for (uint64_t c = 0; c < nc; c++) {
288 0 0         if (data && data[c]) {
    0          
289 0 0         for (uint64_t r = 0; r < nr; r++) SvREFCNT_dec(data[c][r]);
290 0           Safefree(data[c]);
291             }
292 0 0         if (names && names[c]) Safefree(names[c]);
    0          
293             }
294 0 0         if (data) Safefree(data);
295 0 0         if (names) Safefree(names);
296              
297             #undef _BAIL_LOG
298              
299             /* Single cleanup point: decompressed always needs freeing if allocated,
300             * regardless of which branch we return from. */
301             int ret;
302             /* on_log — or dropping the pinned cb above — may have freed self.
303             * check_destroyed finalizes any deferred Safefree and reports it. */
304 0 0         if (check_destroyed(self)) {
305 0           ret = -2;
306 0 0         } else if (err_seen < 0) {
307 0           *errmsg = safe_strdup("malformed log block");
308 0           ret = -1;
309 0 0         } else if (err_seen == 0) { /* need more data — do not advance outer_pos */
310 0           ret = 0;
311             } else {
312             /* For the uncompressed path bpos has been advanced inside the outer
313             * buf; copy it back to pos. For the LZ4 path pos was already advanced
314             * past the chain by ch_lz4_decompress_chain, and bpos is an offset
315             * into the freed decompressed buffer — don't touch pos. */
316 0 0         if (!decompressed) pos = bpos;
317 0           *outer_pos = pos;
318 0           ret = 1;
319             }
320 0 0         if (decompressed) Safefree(decompressed);
321 0           return ret;
322             }
323              
324             /* Dispatch the on_progress callback with five UInt values. Returns -2 if
325             * the handler freed self, 0 otherwise. Shared between SERVER_PROGRESS
326             * (per-packet) and EndOfStream (flush of any uncoalesced accumulator). */
327 0           static int fire_progress_cb(ev_clickhouse_t *self, const uint64_t pp[5]) {
328             int i;
329 0           dSP;
330 0           self->callback_depth++;
331 0           ENTER; SAVETMPS;
332 0 0         PUSHMARK(SP);
333 0 0         EXTEND(SP, 5);
334 0 0         for (i = 0; i < 5; i++) PUSHs(sv_2mortal(newSVuv(pp[i])));
335 0           PUTBACK;
336 0           call_sv(self->on_progress, G_DISCARD | G_EVAL);
337 0 0         WARN_AND_CLEAR_ERRSV("progress handler");
    0          
    0          
    0          
338 0 0         FREETMPS; LEAVE;
339 0           self->callback_depth--;
340 0 0         return check_destroyed(self) ? -2 : 0;
341             }
342              
343 0           static int parse_native_packet(ev_clickhouse_t *self, char **errmsg) {
344 0           const char *buf = self->recv_buf;
345 0           size_t len = self->recv_len;
346 0           size_t pos = 0;
347             uint64_t ptype;
348             int rc;
349              
350 0           rc = read_varuint(buf, len, &pos, &ptype);
351 0 0         if (rc == 0) return 0;
352 0 0         if (rc < 0) {
353 0           *errmsg = safe_strdup("malformed packet type");
354 0           return -1;
355             }
356              
357 0           switch ((int)ptype) {
358              
359 0           case SERVER_HELLO: {
360 0           char *sname = NULL;
361             uint64_t major, minor, revision;
362              
363 0           rc = read_native_string_alloc(buf, len, &pos, &sname, NULL);
364 0 0         if (rc == 0) return 0;
365 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed server name"); return -1; }
366              
367 0           rc = read_varuint(buf, len, &pos, &major);
368 0 0         if (rc == 0) { Safefree(sname); return 0; }
369 0 0         if (rc < 0) { Safefree(sname); *errmsg = safe_strdup("malformed server version major"); return -1; }
370              
371 0           rc = read_varuint(buf, len, &pos, &minor);
372 0 0         if (rc == 0) { Safefree(sname); return 0; }
373 0 0         if (rc < 0) { Safefree(sname); *errmsg = safe_strdup("malformed server version minor"); return -1; }
374              
375 0           rc = read_varuint(buf, len, &pos, &revision);
376 0 0         if (rc == 0) { Safefree(sname); return 0; }
377 0 0         if (rc < 0) { Safefree(sname); *errmsg = safe_strdup("malformed server revision"); return -1; }
378              
379 0 0         CLEAR_STR(self->server_name);
380 0           self->server_name = sname;
381 0           self->server_version_major = (unsigned int)major;
382 0           self->server_version_minor = (unsigned int)minor;
383 0           self->server_revision = (unsigned int)revision;
384              
385             /* The server emits these fields per the negotiated revision =
386             * min(server_rev, our advertised CH_CLIENT_REVISION). Gate on
387             * server_revision so we don't read garbage from older servers. */
388 0 0         if (self->server_revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) {
389 0           char *tz = NULL;
390 0           rc = read_native_string_alloc(buf, len, &pos, &tz, NULL);
391 0 0         if (rc == 0) return 0;
392 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed timezone"); return -1; }
393 0 0         CLEAR_STR(self->server_timezone);
394 0           self->server_timezone = tz;
395             }
396              
397 0 0         if (self->server_revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME) {
398 0           char *dn = NULL;
399 0           rc = read_native_string_alloc(buf, len, &pos, &dn, NULL);
400 0 0         if (rc == 0) return 0;
401 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed display name"); return -1; }
402 0 0         CLEAR_STR(self->server_display_name);
403 0           self->server_display_name = dn;
404             }
405              
406 0 0         if (self->server_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) {
407             uint64_t patch;
408 0           rc = read_varuint(buf, len, &pos, &patch);
409 0 0         if (rc == 0) return 0;
410 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed version patch"); return -1; }
411 0           self->server_version_patch = (unsigned int)patch;
412             }
413              
414             /* consume from recv_buf */
415 0           recv_consume(self, pos);
416 0           return 4;
417             }
418              
419 0           case SERVER_DATA:
420             case SERVER_TOTALS:
421             case SERVER_EXTREMES: {
422             uint64_t num_cols, num_rows;
423             const char *dbuf; /* data buffer (may point to decompressed data) */
424             size_t dlen, dpos;
425 0           char *decompressed = NULL;
426              
427             /* table name — outside compression */
428 0           rc = skip_native_string(buf, len, &pos);
429 0 0         if (rc == 0) return 0;
430 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed table name"); return -1; }
431              
432             #ifdef HAVE_LZ4
433             if (self->compress) {
434             /* Decompress the block body — may span multiple LZ4 sub-blocks. */
435             int need_more = 0;
436             const char *lz4_err = NULL;
437             decompressed = ch_lz4_decompress_chain(buf, len, &pos, &dlen,
438             &need_more, &lz4_err);
439             if (!decompressed) {
440             if (need_more) return 0;
441             *errmsg = safe_strdup(lz4_err ? lz4_err : "LZ4 decompression failed");
442             return -1;
443             }
444             dbuf = decompressed;
445             dpos = 0;
446             } else
447             #endif
448             {
449 0           dbuf = buf;
450 0           dlen = len;
451 0           dpos = pos;
452             }
453              
454             /* block info */
455 0 0         if (self->server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
456 0           rc = skip_block_info(dbuf, dlen, &dpos);
457 0 0         if (rc == 0) {
458 0 0         if (decompressed) { Safefree(decompressed); *errmsg = safe_strdup("truncated compressed block"); return -1; }
459 0           return 0;
460             }
461 0 0         if (rc < 0) { if (decompressed) Safefree(decompressed); *errmsg = safe_strdup("malformed block info"); return -1; }
    0          
462             }
463              
464 0           rc = read_varuint(dbuf, dlen, &dpos, &num_cols);
465 0 0         if (rc == 0) {
466 0 0         if (decompressed) { Safefree(decompressed); *errmsg = safe_strdup("truncated compressed block"); return -1; }
467 0           return 0;
468             }
469 0 0         if (rc < 0) { if (decompressed) Safefree(decompressed); *errmsg = safe_strdup("malformed num_cols"); return -1; }
    0          
470              
471 0           rc = read_varuint(dbuf, dlen, &dpos, &num_rows);
472 0 0         if (rc == 0) {
473 0 0         if (decompressed) { Safefree(decompressed); *errmsg = safe_strdup("truncated compressed block"); return -1; }
474 0           return 0;
475             }
476 0 0         if (rc < 0) { if (decompressed) Safefree(decompressed); *errmsg = safe_strdup("malformed num_rows"); return -1; }
    0          
477              
478             /* Empty data block — skip or handle column names/types */
479 0 0         if (num_rows == 0) {
480             /* INSERT two-phase: server sent sample block with column structure */
481 0 0         if (self->native_state == NATIVE_WAIT_INSERT_META
482 0 0         && (self->insert_data || self->insert_av) && num_cols > 0) {
    0          
    0          
483             const char **cnames;
484             size_t *cname_lens;
485             const char **ctypes_str;
486             size_t *ctype_lens;
487             col_type_t **ctypes;
488             char *data_pkt;
489             size_t data_pkt_len;
490             uint64_t c;
491 0           int meta_hard = 0; /* 1 = hard error, 0 = need more */
492 0           const char *meta_err = NULL; /* non-NULL → goto meta_cleanup */
493              
494             /* Bound the column count to the remaining wire bytes before
495             * allocating, same as the other block-decode sites. */
496 0 0         if (num_cols > (uint64_t)(dlen - dpos)) {
497 0 0         if (decompressed) Safefree(decompressed);
498 0           *errmsg = safe_strdup("too many columns");
499 0           return -1;
500             }
501 0 0         Newxz(cnames, num_cols, const char *);
502 0 0         Newxz(cname_lens, num_cols, size_t);
503 0 0         Newxz(ctypes_str, num_cols, const char *);
504 0 0         Newxz(ctype_lens, num_cols, size_t);
505 0 0         Newxz(ctypes, num_cols, col_type_t *);
506              
507 0 0         for (c = 0; c < num_cols; c++) {
508 0           rc = read_native_string_ref(dbuf, dlen, &dpos,
509 0           &cnames[c], &cname_lens[c]);
510 0 0         if (rc <= 0) {
511 0 0         meta_hard = (rc < 0 || decompressed != NULL);
    0          
512 0           meta_err = "malformed cname";
513 0           goto meta_cleanup;
514             }
515 0           rc = read_native_string_ref(dbuf, dlen, &dpos,
516 0           &ctypes_str[c], &ctype_lens[c]);
517 0 0         if (rc <= 0) {
518 0 0         meta_hard = (rc < 0 || decompressed != NULL);
    0          
519 0           meta_err = "malformed ctype";
520 0           goto meta_cleanup;
521             }
522 0           ctypes[c] = parse_col_type(ctypes_str[c], ctype_lens[c]);
523              
524             /* custom serialization flag (revision >= 54446) */
525 0 0         if (dpos >= dlen) {
526 0           meta_hard = (decompressed != NULL);
527 0           meta_err = "truncated custom_ser";
528 0           goto meta_cleanup;
529             }
530 0 0         if ((uint8_t)dbuf[dpos]) {
531 0           meta_hard = 1;
532 0           meta_err = "custom serialization not supported";
533 0           goto meta_cleanup;
534             }
535 0           dpos++;
536             }
537 0           goto meta_ok;
538              
539 0           meta_cleanup:
540 0 0         for (c = 0; c < num_cols; c++) if (ctypes[c]) free_col_type(ctypes[c]);
    0          
541 0           Safefree(cnames); Safefree(cname_lens);
542 0           Safefree(ctypes_str); Safefree(ctype_lens);
543 0           Safefree(ctypes);
544 0 0         if (decompressed) Safefree(decompressed);
545 0 0         if (meta_hard) { *errmsg = safe_strdup(meta_err); return -1; }
546 0           return 0;
547              
548 0           meta_ok: ;
549              
550             /* Build binary data block from stored data */
551 0 0         if (self->insert_av) {
552 0           data_pkt = build_native_insert_data_from_av(aTHX_ self,
553 0           (AV *)SvRV(self->insert_av),
554             cnames, cname_lens, ctypes_str, ctype_lens,
555             ctypes, (int)num_cols, &data_pkt_len);
556             } else {
557 0           data_pkt = build_native_insert_data(self,
558 0           self->insert_data, self->insert_data_len,
559             cnames, cname_lens, ctypes_str, ctype_lens,
560             ctypes, (int)num_cols, &data_pkt_len);
561             }
562              
563 0 0         for (c = 0; c < num_cols; c++)
564 0           free_col_type(ctypes[c]);
565 0           Safefree(cnames); Safefree(cname_lens);
566 0           Safefree(ctypes_str); Safefree(ctype_lens);
567 0           Safefree(ctypes);
568              
569             {
570             /* Check encode-failure sentinel before freeing insert data */
571 0 0         int encode_failed = (!data_pkt && data_pkt_len == (size_t)-1);
    0          
572              
573             /* Free stored INSERT data */
574 0 0         CLEAR_INSERT(self);
    0          
575              
576 0 0         if (decompressed) Safefree(decompressed);
577 0           else pos = dpos;
578 0           recv_consume(self, pos);
579              
580 0 0         if (!data_pkt) {
581             /* Send empty Data block to complete the INSERT protocol */
582             native_buf_t fallback;
583 0           nbuf_init(&fallback);
584 0           nbuf_empty_data_block(&fallback, self->compress);
585 0           data_pkt = fallback.data;
586 0           data_pkt_len = fallback.len;
587 0 0         if (encode_failed)
588 0           self->insert_err = safe_strdup(
589             "native INSERT encoding failed (unsupported type)");
590             }
591             }
592              
593             /* Send the data block — write to send_buf and start writing */
594 0           self->native_state = NATIVE_WAIT_RESULT;
595 0           send_replace(self, data_pkt, data_pkt_len);
596 0 0         if (try_write(self)) return -2;
597 0           return 1;
598             }
599              
600             /* INSERT two-phase with 0-column sample block: free data,
601             * send empty Data block, transition to WAIT_RESULT */
602 0 0         if (self->native_state == NATIVE_WAIT_INSERT_META
603 0 0         && (self->insert_data || self->insert_av) && num_cols == 0) {
    0          
    0          
604             native_buf_t fallback;
605              
606 0 0         CLEAR_INSERT(self);
    0          
607              
608 0 0         if (decompressed) Safefree(decompressed);
609 0           else pos = dpos;
610 0           recv_consume(self, pos);
611              
612 0           nbuf_init(&fallback);
613 0           nbuf_empty_data_block(&fallback, self->compress);
614 0           self->native_state = NATIVE_WAIT_RESULT;
615 0           send_replace(self, fallback.data, fallback.len);
616 0           self->insert_err = safe_strdup(
617             "INSERT failed: server sent 0-column sample block");
618 0 0         if (try_write(self)) return -2;
619 0           return 1;
620             }
621              
622             /* Normal empty block — capture column names/types so callers can
623             * inspect schema after a zero-row SELECT. Reset only when we have
624             * columns to record so the terminating empty block (num_cols=0)
625             * doesn't wipe the schema we already captured; this also drops
626             * any partial state from a need-more retry. */
627             {
628             uint64_t c;
629 0 0         if (num_cols > 0) {
630 0 0         CLEAR_SV(self->native_col_names);
631 0 0         CLEAR_SV(self->native_col_types);
632 0           self->native_col_names = newAV();
633 0           self->native_col_types = newAV();
634             }
635 0 0         for (c = 0; c < num_cols; c++) {
636             const char *cname, *ctype;
637             size_t cname_len, ctype_len;
638              
639 0           rc = read_native_string_ref(dbuf, dlen, &dpos, &cname, &cname_len);
640 0 0         if (rc <= 0) {
641 0 0         if (decompressed) Safefree(decompressed);
642 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed cname"); return -1; }
643 0           return 0;
644             }
645 0           av_push(self->native_col_names, newSVpvn(cname, cname_len));
646              
647 0           rc = read_native_string_ref(dbuf, dlen, &dpos, &ctype, &ctype_len);
648 0 0         if (rc <= 0) {
649 0 0         if (decompressed) Safefree(decompressed);
650 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed ctype"); return -1; }
651 0           return 0;
652             }
653 0           av_push(self->native_col_types, newSVpvn(ctype, ctype_len));
654              
655             /* custom serialization flag (revision >= 54446) */
656 0 0         if (dpos >= dlen) {
657 0 0         if (decompressed) Safefree(decompressed);
658 0           return 0;
659             }
660 0 0         if ((uint8_t)dbuf[dpos]) {
661 0 0         if (decompressed) Safefree(decompressed);
662 0           *errmsg = safe_strdup("custom serialization not supported");
663 0           return -1;
664             }
665 0           dpos++;
666             }
667             }
668 0 0         if (decompressed) Safefree(decompressed);
669 0           else pos = dpos; /* uncompressed: advance pos to match dpos */
670 0           recv_consume(self, pos);
671 0           return 1;
672             }
673              
674             /* Decode columns and convert to rows */
675             {
676 0           SV ***columns = NULL;
677 0           col_type_t **col_types = NULL;
678 0           const char **cnames = NULL;
679 0           size_t *cname_lens = NULL;
680             uint64_t c, r;
681 0           int named = (self->decode_flags & DECODE_NAMED_ROWS) ? 1 : 0;
682              
683             /* More columns than remaining wire bytes is malformed; reject
684             * before allocating so a bogus count can't drive a huge Newxz. */
685 0 0         if (num_cols > (uint64_t)(dlen - dpos)) {
686 0 0         if (decompressed) Safefree(decompressed);
687 0           *errmsg = safe_strdup("too many columns");
688 0           return -1;
689             }
690 0 0         Newxz(columns, num_cols, SV**);
691 0 0         Newxz(col_types, num_cols, col_type_t*);
692 0 0         if (named) {
693 0 0         Newxz(cnames, num_cols, const char *);
694 0 0         Newx(cname_lens, num_cols, size_t);
695             }
696              
697 0 0         for (c = 0; c < num_cols; c++) {
698             const char *cname, *ctype;
699             size_t cname_len, ctype_len;
700              
701 0           rc = read_native_string_ref(dbuf, dlen, &dpos, &cname, &cname_len);
702 0 0         if (rc == 0) {
703 0 0         if (decompressed) { *errmsg = safe_strdup("truncated cname"); goto data_error; }
704 0           goto data_need_more;
705             }
706 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed cname"); goto data_error; }
707              
708 0 0         if (named) {
709 0           cnames[c] = cname;
710 0           cname_lens[c] = cname_len;
711             }
712              
713             /* Allocate AVs on first column of first data block. Reset
714             * any partial state from a need-more retry: pipeline_advance
715             * cleared at dispatch, but if we returned 0 mid-loop the AVs
716             * may now hold a partial column list. */
717 0 0         if (c == 0) {
718 0 0         CLEAR_SV(self->native_col_names);
719 0 0         CLEAR_SV(self->native_col_types);
720 0           self->native_col_names = newAV();
721 0           self->native_col_types = newAV();
722             }
723 0           av_push(self->native_col_names, newSVpvn(cname, cname_len));
724              
725 0           rc = read_native_string_ref(dbuf, dlen, &dpos, &ctype, &ctype_len);
726 0 0         if (rc == 0) {
727 0 0         if (decompressed) { *errmsg = safe_strdup("truncated ctype"); goto data_error; }
728 0           goto data_need_more;
729             }
730 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed ctype"); goto data_error; }
731              
732 0           col_types[c] = parse_col_type(ctype, ctype_len);
733 0           av_push(self->native_col_types, newSVpvn(ctype, ctype_len));
734              
735             /* custom serialization flag (revision >= 54446) */
736 0 0         if (dpos >= dlen) {
737 0 0         if (decompressed) { *errmsg = safe_strdup("truncated custom_ser"); goto data_error; }
738 0           goto data_need_more;
739             }
740 0 0         if ((uint8_t)dbuf[dpos]) {
741 0           *errmsg = safe_strdup("custom serialization not supported");
742 0           goto data_error;
743             }
744 0           dpos++;
745              
746             /* Allocate LC dict state on first column of first block */
747 0 0         if (c == 0 && !self->lc_dicts) {
    0          
748 0 0         Newxz(self->lc_dicts, num_cols, SV**);
749 0 0         Newxz(self->lc_dict_sizes, num_cols, uint64_t);
750 0           self->lc_num_cols = (int)num_cols;
751             }
752              
753             {
754 0           int col_err = 0;
755 0           columns[c] = decode_column_ex(dbuf, dlen, &dpos, num_rows, col_types[c], &col_err, self->decode_flags, self, (int)c);
756 0 0         if (!columns[c]) {
757 0 0         if (col_err || decompressed) {
    0          
758 0           *errmsg = safe_strdup("decode_column failed");
759 0           goto data_error;
760             }
761 0           goto data_need_more;
762             }
763             }
764             }
765              
766             /* Convert column-oriented to row-oriented */
767             {
768             AV **target;
769 0 0         if (ptype == SERVER_TOTALS) {
770 0 0         if (!self->native_totals) self->native_totals = newAV();
771 0           target = &self->native_totals;
772 0 0         } else if (ptype == SERVER_EXTREMES) {
773 0 0         if (!self->native_extremes) self->native_extremes = newAV();
774 0           target = &self->native_extremes;
775             } else {
776 0 0         if (!self->native_rows) self->native_rows = newAV();
777 0           target = &self->native_rows;
778             }
779              
780 0 0         if (named) {
781 0 0         for (r = 0; r < num_rows; r++) {
782 0           HV *hv = newHV();
783 0 0         for (c = 0; c < num_cols; c++) {
784 0 0         if (!hv_store(hv, cnames[c], cname_lens[c], columns[c][r], 0))
785 0           SvREFCNT_dec(columns[c][r]);
786             }
787 0           av_push(*target, newRV_noinc((SV*)hv));
788             }
789             } else {
790 0 0         for (r = 0; r < num_rows; r++) {
791 0           AV *row = newAV();
792 0 0         if (num_cols > 0)
793 0           av_extend(row, num_cols - 1);
794 0 0         for (c = 0; c < num_cols; c++) {
795 0           av_push(row, columns[c][r]);
796             }
797 0           av_push(*target, newRV_noinc((SV*)row));
798             }
799             }
800             }
801              
802             /* Fire on_data streaming callback if set (only for DATA, not TOTALS/EXTREMES) */
803             {
804 0 0         SV *on_data = (ptype == SERVER_DATA) ? peek_cb_on_data(self) : NULL;
805 0 0         if (on_data && self->native_rows) {
    0          
806             /* Hold a reference across call_sv: a reentrant
807             * skip_pending() / cancel() in the handler would
808             * otherwise pop the cb_queue entry and free this
809             * callback while we're still invoking it. */
810 0           SvREFCNT_inc(on_data);
811 0           self->callback_depth++;
812             {
813 0           dSP;
814 0           ENTER; SAVETMPS;
815 0 0         PUSHMARK(SP);
816 0           PUSHs(sv_2mortal(newRV_inc((SV*)self->native_rows)));
817 0           PUTBACK;
818 0           call_sv(on_data, G_DISCARD | G_EVAL);
819 0 0         WARN_AND_CLEAR_ERRSV("on_data handler");
    0          
    0          
    0          
820 0 0         FREETMPS; LEAVE;
821             }
822             /* Decrement on_data BEFORE callback_depth-- so a DESTROY
823             * triggered by the dec (closure holding last $ch ref)
824             * still sees callback_depth > 0 and defers Safefree(self). */
825 0           SvREFCNT_dec(on_data);
826 0           self->callback_depth--;
827             /* Clear accumulated rows for next block */
828 0 0         CLEAR_SV(self->native_rows);
829 0 0         if (check_destroyed(self)) {
830 0 0         if (cnames) Safefree(cnames);
831 0 0         if (cname_lens) Safefree(cname_lens);
832 0 0         for (c = 0; c < num_cols; c++) {
833 0           Safefree(columns[c]);
834 0           free_col_type(col_types[c]);
835             }
836 0           Safefree(columns); Safefree(col_types);
837 0 0         if (decompressed) Safefree(decompressed);
838 0           return -2;
839             }
840             }
841             }
842              
843             /* Cleanup column arrays (SVs moved to rows, don't dec refcnt) */
844 0 0         for (c = 0; c < num_cols; c++) {
845 0           Safefree(columns[c]);
846 0           free_col_type(col_types[c]);
847             }
848 0           Safefree(columns);
849 0           Safefree(col_types);
850 0 0         if (cnames) Safefree(cnames);
851 0 0         if (cname_lens) Safefree(cname_lens);
852 0 0         if (decompressed) Safefree(decompressed);
853 0           else pos = dpos; /* uncompressed: advance pos to match dpos */
854              
855             /* Consume from recv_buf */
856 0           recv_consume(self, pos);
857 0           return 1;
858              
859 0           data_error:
860 0           data_need_more:
861             /* Cleanup partial decode */
862 0 0         for (c = 0; c < num_cols; c++) {
863 0 0         if (columns[c]) {
864             uint64_t j;
865 0 0         for (j = 0; j < num_rows; j++) {
866 0 0         if (columns[c][j]) SvREFCNT_dec(columns[c][j]);
867             }
868 0           Safefree(columns[c]);
869             }
870 0 0         if (col_types[c]) free_col_type(col_types[c]);
871             }
872 0           Safefree(columns);
873 0           Safefree(col_types);
874 0 0         if (cnames) Safefree(cnames);
875 0 0         if (cname_lens) Safefree(cname_lens);
876 0 0         if (decompressed) Safefree(decompressed);
877 0 0         if (*errmsg) {
878             /* data_error: flush recv_buf — data is malformed, cannot resume */
879 0           self->recv_len = 0;
880 0           return -1;
881             }
882 0           return 0;
883             }
884             }
885              
886 0           case SERVER_EXCEPTION: {
887             /* code: Int32, name: String, message: String,
888             * stack_trace: String, has_nested: UInt8 */
889             int32_t code;
890             const char *name, *msg, *stack;
891             size_t name_len, msg_len, stack_len;
892             uint8_t has_nested;
893             char *err;
894              
895             /* We just read the top-level exception */
896 0           rc = read_i32(buf, len, &pos, &code);
897 0 0         if (rc == 0) return 0;
898 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed exception code"); return -1; }
899              
900 0           rc = read_native_string_ref(buf, len, &pos, &name, &name_len);
901 0 0         if (rc == 0) return 0;
902 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed exception name"); return -1; }
903              
904 0           rc = read_native_string_ref(buf, len, &pos, &msg, &msg_len);
905 0 0         if (rc == 0) return 0;
906 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed exception message"); return -1; }
907              
908 0           rc = read_native_string_ref(buf, len, &pos, &stack, &stack_len);
909 0 0         if (rc == 0) return 0;
910 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed exception stack"); return -1; }
911              
912 0           rc = read_u8(buf, len, &pos, &has_nested);
913 0 0         if (rc == 0) return 0;
914 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed exception has_nested"); return -1; }
915              
916             /* Skip nested exceptions — keep the top-level code, not the innermost. */
917 0 0         while (has_nested) {
918             int32_t nested_code;
919             int i;
920 0           rc = read_i32(buf, len, &pos, &nested_code);
921 0 0         if (rc == 0) return 0;
922 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed nested exception"); return -1; }
923             /* name, message, stack_trace */
924 0 0         for (i = 0; i < 3; i++) {
925 0           rc = skip_native_string(buf, len, &pos);
926 0 0         if (rc == 0) return 0;
927 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed nested exception"); return -1; }
928             }
929 0           rc = read_u8(buf, len, &pos, &has_nested);
930 0 0         if (rc == 0) return 0;
931 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed nested exception"); return -1; }
932             }
933              
934 0           self->last_error_code = code;
935              
936 0           Newx(err, msg_len + name_len + 64, char);
937 0           snprintf(err, msg_len + name_len + 64, "Code: %d. %.*s: %.*s",
938             (int)code, (int)name_len, name, (int)msg_len, msg);
939              
940 0           recv_consume(self, pos);
941              
942 0           *errmsg = err;
943 0           return -1;
944             }
945              
946 0           case SERVER_PROGRESS: {
947             /* rows, bytes, total_rows, written_rows (>=54420), written_bytes (>=54420) */
948 0           uint64_t pp[5] = {0};
949 0 0         int n = (self->server_revision >= DBMS_MIN_REVISION_WITH_PROGRESS_WRITES) ? 5 : 3;
950             int i;
951 0 0         for (i = 0; i < n; i++) {
952 0           rc = read_varuint(buf, len, &pos, &pp[i]);
953 0 0         if (rc == 0) return 0;
954 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed progress packet"); return -1; }
955             }
956              
957 0           recv_consume(self, pos);
958              
959 0 0         if (self->on_progress) {
960             /* Coalesce when progress_period is set: the protocol sends
961             * incremental deltas, so accumulate and fire at the cadence. */
962 0 0         if (self->progress_period > 0) {
963 0 0         for (i = 0; i < 5; i++) self->progress_acc[i] += pp[i];
964 0           double now = ev_now(self->loop);
965 0 0         if (now - self->progress_last < self->progress_period)
966 0           return 1;
967 0           self->progress_last = now;
968 0 0         for (i = 0; i < 5; i++) { pp[i] = self->progress_acc[i]; self->progress_acc[i] = 0; }
969             }
970              
971 0 0         if (fire_progress_cb(self, pp) < 0) return -2;
972             }
973              
974 0           return 1;
975             }
976              
977 0           case SERVER_PROFILE_INFO: {
978             /* rows, blocks, bytes, applied_limit, rows_before_limit, calc_rows_before_limit */
979             uint64_t pi[6];
980             int i;
981 0 0         for (i = 0; i < 6; i++) {
982 0           rc = read_varuint(buf, len, &pos, &pi[i]);
983 0 0         if (rc == 0) return 0;
984 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed profile_info packet"); return -1; }
985             }
986 0           self->profile_rows = pi[0];
987 0           self->profile_bytes = pi[2];
988 0           self->profile_rows_before_limit = pi[4];
989 0           recv_consume(self, pos);
990 0           return 1;
991             }
992              
993 0           case SERVER_TABLE_COLUMNS: {
994             /* Format: string(table_name) + string(column_description) */
995             int i;
996 0 0         for (i = 0; i < 2; i++) {
997 0           rc = skip_native_string(buf, len, &pos);
998 0 0         if (rc == 0) return 0;
999 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed table_columns packet"); return -1; }
1000             }
1001 0           recv_consume(self, pos);
1002 0           return 1;
1003             }
1004              
1005 0           case SERVER_LOG: {
1006 0 0         if (self->on_log) {
1007 0           rc = parse_and_emit_log_block(self, buf, len, &pos, self->on_log, errmsg);
1008             } else {
1009 0           rc = parse_and_discard_block(self, buf, len, &pos, "server log", 0, errmsg);
1010             }
1011 0 0         if (rc <= 0) return rc;
1012 0           recv_consume(self, pos);
1013 0           return 1;
1014             }
1015              
1016 0           case SERVER_PROFILE_EVENTS: {
1017 0           rc = parse_and_discard_block(self, buf, len, &pos, "profile_events", 1, errmsg);
1018 0 0         if (rc <= 0) return rc;
1019 0           recv_consume(self, pos);
1020 0           return 1;
1021             }
1022              
1023 0           case SERVER_TIMEZONE_UPDATE: {
1024             /* Server-side session timezone changed mid-query. Format: string(tz). */
1025 0           char *tz = NULL;
1026 0           rc = read_native_string_alloc(buf, len, &pos, &tz, NULL);
1027 0 0         if (rc == 0) return 0;
1028 0 0         if (rc < 0) { *errmsg = safe_strdup("malformed timezone_update packet"); return -1; }
1029 0 0         CLEAR_STR(self->server_timezone);
1030 0           self->server_timezone = tz;
1031 0           recv_consume(self, pos);
1032 0           return 1;
1033             }
1034              
1035 0           case SERVER_PONG:
1036 0           recv_consume(self, pos);
1037 0           return 3;
1038              
1039 0           case SERVER_END_OF_STREAM:
1040 0           recv_consume(self, pos);
1041 0           return 2;
1042              
1043 0           default: {
1044             /* Unknown packet type */
1045             char err[64];
1046 0           snprintf(err, sizeof(err), "unknown server packet type: %llu",
1047             (unsigned long long)ptype);
1048 0           *errmsg = safe_strdup(err);
1049 0           self->recv_len = 0;
1050 0           return -1;
1051             }
1052             }
1053             }
1054              
1055             /*
1056             * Process native protocol responses from recv_buf.
1057             * Called from on_readable when protocol == PROTO_NATIVE.
                 *
                 * Calls parse_native_packet() repeatedly while recv_buf still holds
                 * bytes and self->magic equals EV_CH_MAGIC, and dispatches on the
                 * return code:
                 *    0  need more data: return and wait for more input
                 *   -2  object destroyed inside a callback: return immediately
                 *    4  ServerHello parsed: send the addendum if the server revision
                 *       requires it, then finish the connect
                 *   -1  error: the message is delivered, then Safefree'd here
                 *    2  EndOfStream: deliver accumulated rows or the deferred
                 *       insert error, then advance the pipeline
                 *    3  Pong: ack a keepalive ping, or complete a user ping
                 *    1  packet consumed: loop and parse the next packet
                 * Every branch other than rc == 1 and the keepalive Pong leaves
                 * the loop with a return.
1058             */
1059 0           static void process_native_response(ev_clickhouse_t *self) {
1060 0 0         while (self->recv_len > 0 && self->magic == EV_CH_MAGIC) {
    0          
                 /* Filled in by parse_native_packet when it returns -1; this
                 * function then owns the string and must Safefree it. */
1061 0           char *errmsg = NULL;
1062             int rc;
1063 0           rc = parse_native_packet(self, &errmsg);
1064              
1065 0 0         if (rc == 0) {
1066             /* need more data */
1067 0           return;
1068             }
1069              
1070 0 0         if (rc == -2) {
1071             /* object destroyed inside callback */
1072 0           return;
1073             }
1074              
1075 0 0         if (rc == 4) {
1076             /* ServerHello received — send addendum (revision >= 54458) */
1077 0 0         if (self->native_state == NATIVE_WAIT_HELLO) {
1078             /* Addendum: quota_key (only if server supports it) */
1079 0 0         if (self->server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM) {
                 /* NOTE(review): ab is not released anywhere in this function;
                 * presumably send_replace copies or takes ownership of
                 * ab.data — confirm against send_replace. */
1080             native_buf_t ab;
1081 0           nbuf_init(&ab);
1082 0           nbuf_cstring(&ab, ""); /* quota_key */
1083 0           send_replace(self, ab.data, ab.len);
                 /* NOTE(review): a non-zero try_write result is treated as
                 * "stop now"; presumably self was torn down — confirm. */
1084 0 0         if (try_write(self)) return;
1085 0 0         if (self->send_pos < self->send_len) {
1086             /* Addendum partially written (EAGAIN); io_cb finishes
1087             * connect once the buffer drains, otherwise pipeline_advance
1088             * could overwrite the unsent tail with a queued query. */
1089 0           self->pending_addendum_finish = 1;
1090 0           return;
1091             }
1092             }
1093 0           self->native_state = NATIVE_IDLE;
                 /* The result of finish_connect does not change control flow:
                 * the function returns right below either way. */
1094 0 0         if (finish_connect(self)) return;
1095             }
1096             /* pipeline_advance -> try_write may free self; no data
1097             * in recv_buf for the just-dispatched request yet */
1098 0           return;
1099             }
1100              
1101 0 0         if (rc == -1) {
1102             /* error */
1103 0 0         if (self->native_state == NATIVE_WAIT_HELLO) {
1104             /* Skip auto_reconnect on a malformed ServerHello: the peer
1105             * isn't a ClickHouse server, so retrying just spins.
1106             * (connect_timeout still uses fail_connection, which retries.) */
1107 0           teardown_io_error(self, errmsg, "connection failed");
1108 0           Safefree(errmsg);
1109 0           return;
1110             }
1111              
1112             /* Stop query timeout timer */
1113 0           stop_timing(self);
1114              
1115             /* Query error — deliver to callback */
                 /* Reset all per-query state before the callback runs. */
1116 0 0         CLEAR_SV(self->native_rows);
1117 0 0         CLEAR_INSERT(self);
    0          
1118 0 0         CLEAR_STR(self->insert_err);
1119 0           self->native_state = NATIVE_IDLE;
1120 0           self->recv_len = 0; /* flush malformed data */
1121 0 0         if (self->send_count > 0) self->send_count--;
1122 0           lc_free_dicts(self);
                 /* errmsg is freed before the destroyed check, so it is released
                 * on both paths; self is not touched again once deliver_error
                 * reports non-zero. */
1123 0           int destroyed = deliver_error(self, errmsg);
1124 0           Safefree(errmsg);
1125 0 0         if (destroyed) return;
1126              
1127             /* advance pipeline — may free self via try_write error */
1128 0           pipeline_advance(self);
1129 0           return;
1130             }
1131              
1132 0 0         if (rc == 2) {
1133             /* EndOfStream — deliver accumulated rows or deferred error */
1134 0           stop_timing(self);
1135 0           self->native_state = NATIVE_IDLE;
1136 0 0         if (self->send_count > 0) self->send_count--;
1137 0           lc_free_dicts(self);
1138              
1139             /* Flush any uncoalesced progress accumulated since the last
1140             * fire so users instrumenting via on_progress see the full
1141             * total when the query completes within one progress_period
1142             * of the last fire. */
1143 0 0         if (self->on_progress && self->progress_period > 0) {
    0          
1144 0           int any = 0, i;
1145 0 0         for (i = 0; i < 5; i++) if (self->progress_acc[i]) { any = 1; break; }
    0          
1146 0 0         if (any) {
                 /* Copy the counters out and zero the accumulator before the
                 * callback is invoked. */
1147             uint64_t pp[5];
1148 0           memcpy(pp, self->progress_acc, sizeof(pp));
1149 0           memset(self->progress_acc, 0, sizeof(self->progress_acc));
                 /* A negative result stops all further processing here. */
1150 0 0         if (fire_progress_cb(self, pp) < 0) return;
1151             }
1152             }
1153              
1154 0 0         if (self->insert_err) {
                 /* Detach the deferred error string from self before delivering
                 * it; the local copy is freed after deliver_error returns. */
1155 0           char *err = self->insert_err;
1156 0           self->insert_err = NULL;
1157 0 0         CLEAR_SV(self->native_rows);
1158 0           int destroyed = deliver_error(self, err);
1159 0           Safefree(err);
1160 0 0         if (destroyed) return;
1161             } else {
                 /* Detach the row array from self before handing it over.
                 * NOTE(review): rows can be NULL when no block was accumulated;
                 * presumably deliver_rows accepts NULL and takes ownership of
                 * the AV — confirm against deliver_rows. */
1162 0           AV *rows = self->native_rows;
1163 0           self->native_rows = NULL;
1164 0 0         if (deliver_rows(self, rows)) return;
1165             }
1166              
1167             /* advance pipeline — may free self via try_write error */
1168 0           pipeline_advance(self);
1169 0           return;
1170             }
1171              
1172 0 0         if (rc == 3) {
1173             /* Pong — ack a keepalive ping, or deliver to user's ping() cb */
1174 0           self->native_state = NATIVE_IDLE;
1175 0 0         if (self->ka_in_flight > 0) {
1176             /* Keepalive ack: not tied to send_count or any user cb */
1177 0           self->ka_in_flight--;
                 /* Re-check the loop condition and parse any further packet. */
1178 0           continue;
1179             }
1180 0           stop_timing(self);
1181 0 0         if (self->send_count > 0) self->send_count--;
                 /* A user ping completes with a fresh, empty row array.
                 * NOTE(review): ownership of rows presumably passes to
                 * deliver_rows — confirm. */
1182 0           AV *rows = newAV();
1183 0 0         if (deliver_rows(self, rows)) return;
1184 0           pipeline_advance(self);
1185 0           return;
1186             }
1187              
1188             /* rc == 1: Data/Progress/ProfileInfo — continue reading */
1189             }
1190             }
1191