File Coverage

xs/types.c
Criterion Covered Total %
statement 0 1911 0.0
branch 0 1750 0.0
condition n/a
subroutine n/a
pod n/a
total 0 3661 0.0


line stmt bran cond sub pod time code
1             /* --- JSON column helpers (ported from Clickhouse::Encoder) ---
2             * ClickHouse's stable JSON / Object('json') type (24.8+). Native wire
3             * layout (V1+V2+V3) is documented in Clickhouse-Encoder/doc/json-research.
4             * The encoder accepts a Perl hashref per row (auto-flattened to dotted
5             * paths) and the decoder returns the same shape on the way back.
6             * Supported leaf kinds: Int64, Float64, Bool, String, Array().
7             */
8             typedef enum {
9             JV_ARRAY_BOOL = 0,
10             JV_ARRAY_FLOAT64,
11             JV_ARRAY_INT64,
12             JV_ARRAY_STRING,
13             JV_BOOL,
14             JV_FLOAT64,
15             JV_INT64,
16             JV_STRING,
17             JV_KIND_COUNT
18             } json_kind_t;
19              
20             /* Lex-sort position (0..N) for each kind in the variant list with
21             * "SharedVariant" inserted. Sorted order:
22             * "Array(Bool)" < "Array(Float64)" < "Array(Int64)" < "Array(String)"
23             * < "Bool" < "Float64" < "Int64" < "SharedVariant" < "String".
24             * SharedVariant takes lex pos 7; "String" follows it. */
25             static const int json_kind_to_lex_pos[JV_KIND_COUNT] = {
26             /* JV_ARRAY_BOOL */ 0,
27             /* JV_ARRAY_FLOAT64 */ 1,
28             /* JV_ARRAY_INT64 */ 2,
29             /* JV_ARRAY_STRING */ 3,
30             /* JV_BOOL */ 4,
31             /* JV_FLOAT64 */ 5,
32             /* JV_INT64 */ 6,
33             /* JV_STRING */ 8
34             };
35              
36             static const char * const json_kind_type_name[JV_KIND_COUNT] = {
37             "Array(Bool)", "Array(Float64)", "Array(Int64)", "Array(String)",
38             "Bool", "Float64", "Int64", "String"
39             };
40              
41             #define JSON_SHAREDVARIANT_LEX_POS 7
42             #define JSON_LEX_SLOTS 9
43              
44             /* Build the ordered wire-slot table: slots[disc] = kind (-1 for SharedVariant),
45             * skipping kinds absent from mask. Returns the total wire-slot count
46             * (present user kinds + 1 for the always-present SharedVariant slot). */
47 0           static int json_build_lex_table(unsigned mask, int slots[JSON_LEX_SLOTS]) {
48 0           int n = 0, lex;
49 0 0         for (lex = 0; lex < JSON_LEX_SLOTS; lex++) {
50 0 0         if (lex == JSON_SHAREDVARIANT_LEX_POS) { slots[n++] = -1; continue; }
51             int k;
52 0 0         for (k = 0; k < JV_KIND_COUNT; k++) {
53 0 0         if (json_kind_to_lex_pos[k] == lex && (mask & (1u << k))) {
    0          
54 0           slots[n++] = k;
55 0           break;
56             }
57             }
58             }
59 0           return n;
60             }
61              
62 0           static int json_kind_disc_in(int kind, const int slots[JSON_LEX_SLOTS], int n) {
63             int i;
64 0 0         for (i = 0; i < n; i++) if (slots[i] == kind) return i;
    0          
65 0           return -1;
66             }
67              
68 0           static int json_pkg_is_bool(const char *pkg) {
69 0 0         return pkg && (strcmp(pkg, "JSON::PP::Boolean") == 0
    0          
70 0 0         || strcmp(pkg, "Types::Serialiser::Boolean") == 0
71 0 0         || strcmp(pkg, "JSON::XS::Boolean") == 0
72 0 0         || strcmp(pkg, "Cpanel::JSON::XS::Boolean") == 0
73 0 0         || strcmp(pkg, "boolean") == 0);
74             }
75              
76 0           static int json_is_bool_ref(pTHX_ SV *val) {
77 0 0         if (!(SvROK(val) && sv_isobject(val))) return 0;
    0          
78 0           HV *stash = SvSTASH(SvRV(val));
79 0 0         return stash && json_pkg_is_bool(HvNAME(stash));
    0          
    0          
    0          
    0          
    0          
    0          
    0          
80             }
81              
82             /* For an arrayref leaf, infer the element kind. Returns the matching
83             * JV_ARRAY_* kind, or -1 if heterogeneous / unsupported. */
84 0           static int json_classify_array(pTHX_ AV *av) {
85 0           SSize_t n = av_len(av) + 1, i;
86 0           int seen_kind = -1;
87 0 0         for (i = 0; i < n; i++) {
88 0           SV **e = av_fetch(av, i, 0);
89 0 0         if (!e || !SvOK(*e)) continue;
    0          
90             int leaf;
91 0 0         if (SvROK(*e)) {
92 0 0         if (json_is_bool_ref(aTHX_ *e)) leaf = JV_BOOL;
93 0           else return -1; /* nested arrays/hashes not supported in array leaves */
94             }
95             #ifdef SvIsBOOL
96 0 0         else if (SvIsBOOL(*e)) leaf = JV_BOOL;
97             #endif
98 0 0         else if (SvIOK(*e) && !SvNOK(*e)) leaf = JV_INT64;
    0          
99 0 0         else if (SvNOK(*e)) {
100 0           NV nv = SvNV(*e);
101 0           leaf = (nv == (NV)(int64_t)nv
102 0 0         && nv >= (NV)INT64_MIN && nv <= (NV)INT64_MAX)
    0          
103 0 0         ? JV_INT64 : JV_FLOAT64;
104             } else {
105 0           leaf = JV_STRING;
106             }
107 0 0         if (seen_kind == -1) seen_kind = leaf;
108 0 0         else if (seen_kind != leaf) return -1;
109             }
110 0 0         if (seen_kind == -1) return JV_ARRAY_INT64; /* empty/all-null */
111 0           switch (seen_kind) {
112 0           case JV_BOOL: return JV_ARRAY_BOOL;
113 0           case JV_FLOAT64: return JV_ARRAY_FLOAT64;
114 0           case JV_INT64: return JV_ARRAY_INT64;
115 0           case JV_STRING: return JV_ARRAY_STRING;
116 0           default: return -1;
117             }
118             }
119              
120 0           static int json_classify_leaf(pTHX_ SV *val) {
121 0 0         if (json_is_bool_ref(aTHX_ val)) return JV_BOOL;
122             #ifdef SvIsBOOL
123 0 0         if (SvIsBOOL(val)) return JV_BOOL;
124             #endif
125 0 0         if (SvIOK(val) && !SvNOK(val)) return JV_INT64;
    0          
126 0 0         if (SvNOK(val)) {
127 0           NV n = SvNV(val);
128 0 0         if (n == (NV)(int64_t)n && n >= (NV)INT64_MIN && n <= (NV)INT64_MAX)
    0          
    0          
129 0           return JV_INT64;
130 0           return JV_FLOAT64;
131             }
132 0           return JV_STRING;
133             }
134              
135             /* Classify any JSON value SV. Routes arrayrefs through json_classify_array
136             * and scalars/Booleans through json_classify_leaf. Returns -1 for an
137             * unsupported reference (hashref, blessed non-Boolean) or for a
138             * heterogeneous array. */
139 0           static int json_classify_value(pTHX_ SV *val) {
140 0 0         if (SvROK(val) && !json_is_bool_ref(aTHX_ val)) {
    0          
141 0 0         if (SvTYPE(SvRV(val)) != SVt_PVAV) return -1;
142 0           return json_classify_array(aTHX_ (AV*)SvRV(val));
143             }
144 0           return json_classify_leaf(aTHX_ val);
145             }
146              
147 0           static int json_kind_from_type_name(const char *ts, size_t tl) {
148 0 0         if (tl == 4 && memcmp(ts, "Bool", 4) == 0) return JV_BOOL;
    0          
149 0 0         if (tl == 7 && memcmp(ts, "Float64", 7) == 0) return JV_FLOAT64;
    0          
150 0 0         if (tl == 5 && memcmp(ts, "Int64", 5) == 0) return JV_INT64;
    0          
151 0 0         if (tl == 6 && memcmp(ts, "String", 6) == 0) return JV_STRING;
    0          
152 0 0         if (tl == 11 && memcmp(ts, "Array(Bool)", 11) == 0) return JV_ARRAY_BOOL;
    0          
153 0 0         if (tl == 14 && memcmp(ts, "Array(Float64)", 14) == 0) return JV_ARRAY_FLOAT64;
    0          
154 0 0         if (tl == 12 && memcmp(ts, "Array(Int64)", 12) == 0) return JV_ARRAY_INT64;
    0          
155 0 0         if (tl == 13 && memcmp(ts, "Array(String)", 13) == 0) return JV_ARRAY_STRING;
    0          
156 0           return -1;
157             }
158              
159             /* Recursively flatten a JSON value hash into a flat HV of dotted-path names. */
160 0           static void flatten_json_hash(pTHX_ HV *src,
161             const char *prefix, STRLEN prefix_len,
162             HV *out_flat) {
163 0           hv_iterinit(src);
164             HE *he;
165 0 0         while ((he = hv_iternext(src))) {
166             I32 klen;
167 0           char *kstr = hv_iterkey(he, &klen);
168 0           SV *vsv = hv_iterval(src, he);
169 0 0         STRLEN new_len = prefix_len + (prefix_len ? 1 : 0) + klen;
170 0           SV *path_sv = sv_2mortal(newSV(new_len));
171 0           SvPOK_only(path_sv);
172 0           char *pbuf = SvPVX(path_sv);
173 0 0         if (prefix_len) {
174 0           memcpy(pbuf, prefix, prefix_len);
175 0           pbuf[prefix_len] = '.';
176 0           memcpy(pbuf + prefix_len + 1, kstr, klen);
177             } else {
178 0           memcpy(pbuf, kstr, klen);
179             }
180 0           pbuf[new_len] = '\0'; /* keep the POK SV contract NUL-terminated */
181 0           SvCUR_set(path_sv, new_len);
182 0 0         if (SvROK(vsv) && SvTYPE(SvRV(vsv)) == SVt_PVHV
    0          
183 0 0         && !json_is_bool_ref(aTHX_ vsv)) {
184 0 0         if (sv_isobject(vsv))
185 0 0         croak("JSON column: opaque blessed hashref (package '%s') "
    0          
    0          
    0          
    0          
    0          
186             "is not a JSON value; only known Boolean classes "
187             "are accepted as object leaves",
188             HvNAME(SvSTASH(SvRV(vsv))));
189 0           flatten_json_hash(aTHX_ (HV*)SvRV(vsv),
190 0           SvPVX(path_sv), new_len, out_flat);
191             } else {
192 0           (void)hv_store(out_flat, SvPVX(path_sv), new_len,
193             SvREFCNT_inc_simple_NN(vsv), 0);
194             }
195             }
196 0           }
197              
198             /* (path, len) pair for sorting paths whose keys may contain embedded NULs. */
199             typedef struct { char *path; STRLEN len; } json_path_entry_t;
200              
201 0           static int json_cmp_path_entry(const void *a, const void *b) {
202 0           const json_path_entry_t *pa = (const json_path_entry_t *)a;
203 0           const json_path_entry_t *pb = (const json_path_entry_t *)b;
204 0           STRLEN n = pa->len < pb->len ? pa->len : pb->len;
205 0           int r = memcmp(pa->path, pb->path, n);
206 0 0         if (r) return r;
207 0 0         if (pa->len < pb->len) return -1;
208 0 0         if (pa->len > pb->len) return 1;
209 0           return 0;
210             }
211              
212             /* Emit one element of an Array(T) Dynamic variant. */
213 0           static void json_emit_array_elem(pTHX_ native_buf_t *b, SV *ev, int k_match) {
214 0           switch (k_match) {
215 0           case JV_ARRAY_BOOL:
216 0 0         if (!SvOK(ev)) { nbuf_u8(b, 0); break; }
217 0 0         { SV *bv = SvROK(ev) ? SvRV(ev) : ev;
218 0           nbuf_u8(b, SvTRUE(bv) ? 1 : 0); break; }
219 0           case JV_ARRAY_INT64:
220 0 0         nbuf_le64(b, SvOK(ev) ? (uint64_t)(int64_t)SvIV(ev) : 0);
221 0           break;
222 0           case JV_ARRAY_FLOAT64:
223 0 0         nbuf_ledouble(b, SvOK(ev) ? SvNV(ev) : 0.0);
224 0           break;
225 0           case JV_ARRAY_STRING: {
226 0 0         if (!SvOK(ev)) { nbuf_varuint(b, 0); break; }
227             STRLEN sl;
228 0           const char *ss = SvPV(ev, sl);
229 0           nbuf_string(b, ss, sl);
230 0           break;
231             }
232             }
233 0           }
234              
235 0           static void json_emit_scalar(pTHX_ native_buf_t *b, SV *val, int k_match) {
236 0           switch (k_match) {
237 0           case JV_BOOL: {
238 0 0         SV *bv = SvROK(val) ? SvRV(val) : val;
239 0           nbuf_u8(b, SvTRUE(bv) ? 1 : 0);
240 0           break;
241             }
242 0           case JV_INT64:
243 0           nbuf_le64(b, (uint64_t)(int64_t)SvIV(val));
244 0           break;
245 0           case JV_FLOAT64:
246 0           nbuf_ledouble(b, SvNV(val));
247 0           break;
248 0           case JV_STRING: {
249             STRLEN sl;
250 0           const char *s = SvPV(val, sl);
251 0           nbuf_string(b, s, sl);
252 0           break;
253             }
254 0           default: break;
255             }
256 0           }
257              
258             /* --- Native protocol column decoder --- */
259              
260             /* Column type codes for decoding. */
261             enum {
262             CT_INT8, CT_INT16, CT_INT32, CT_INT64,
263             CT_UINT8, CT_UINT16, CT_UINT32, CT_UINT64,
264             CT_FLOAT32, CT_FLOAT64, CT_BFLOAT16,
265             CT_STRING, CT_FIXEDSTRING,
266             CT_ARRAY, CT_NULLABLE,
267             CT_DATE, CT_DATE32, CT_DATETIME, CT_DATETIME64,
268             CT_UUID, CT_ENUM8, CT_ENUM16,
269             CT_DECIMAL32, CT_DECIMAL64, CT_DECIMAL128, CT_DECIMAL256,
270             CT_LOWCARDINALITY, CT_NOTHING,
271             CT_BOOL, CT_IPV4, CT_IPV6,
272             CT_INT128, CT_UINT128,
273             CT_INT256, CT_UINT256,
274             CT_TUPLE, CT_MAP,
275             CT_JSON,
276             CT_VARIANT, CT_DYNAMIC,
277             CT_UNKNOWN
278             };
279              
280             typedef struct col_type_s col_type_t;
281             struct col_type_s {
282             int code;
283             int param; /* FixedString(N), DateTime64 precision, Decimal scale */
284             col_type_t *inner; /* Nullable, Array, LowCardinality */
285             col_type_t **inners; /* Tuple elements, Map key+value, JSON typed paths */
286             char **inner_names; /* JSON typed path names (NULL for other types) */
287             int num_inners;
288             char *type_str; /* full type string (for Enum label lookup) */
289             size_t type_str_len;
290             char *tz; /* timezone for DateTime/DateTime64 (NULL = UTC) */
291             };
292              
293 0           static void free_col_type(col_type_t *t) {
294             int i;
295 0 0         if (!t) return;
296 0 0         if (t->inner) free_col_type(t->inner);
297 0 0         if (t->inners) {
298 0 0         for (i = 0; i < t->num_inners; i++)
299 0           free_col_type(t->inners[i]);
300 0           Safefree(t->inners);
301             }
302 0 0         if (t->inner_names) {
303 0 0         for (i = 0; i < t->num_inners; i++)
304 0 0         if (t->inner_names[i]) Safefree(t->inner_names[i]);
305 0           Safefree(t->inner_names);
306             }
307 0 0         if (t->type_str) Safefree(t->type_str);
308 0 0         if (t->tz) Safefree(t->tz);
309 0           Safefree(t);
310             }
311              
312             static col_type_t* parse_col_type(const char *type, size_t len);
313              
314             /*
315             * Parse comma-separated type list inside Tuple(...) or Map(...).
316             * Handles nested parentheses correctly.
317             * Sets t->inners and t->num_inners.
318             */
319 0           static void parse_type_list(col_type_t *t, const char *inner, size_t inner_len) {
320 0           int depth = 0, count = 0;
321 0           size_t i, start = 0;
322              
323             /* Count elements */
324 0 0         for (i = 0; i <= inner_len; i++) {
325 0 0         if (i < inner_len && inner[i] == '(') depth++;
    0          
326 0 0         else if (i < inner_len && inner[i] == ')') depth--;
    0          
327 0 0         else if (i == inner_len || (inner[i] == ',' && depth == 0))
    0          
    0          
328 0           count++;
329             }
330              
331 0           Newxz(t->inners, count, col_type_t*);
332 0           t->num_inners = count;
333              
334             /* Parse each element */
335 0           count = 0;
336 0           depth = 0;
337 0           start = 0;
338 0 0         for (i = 0; i <= inner_len; i++) {
339 0 0         if (i < inner_len && inner[i] == '(') depth++;
    0          
340 0 0         else if (i < inner_len && inner[i] == ')') depth--;
    0          
341 0 0         else if (i == inner_len || (inner[i] == ',' && depth == 0)) {
    0          
    0          
342 0           size_t s = start, e = i;
343 0 0         while (s < e && inner[s] == ' ') s++;
    0          
344 0 0         while (e > s && inner[e-1] == ' ') e--;
    0          
345             /* Strip named tuple field prefix: "name Type" -> "Type" */
346             {
347             size_t sp;
348 0 0         for (sp = s; sp < e; sp++) {
349 0 0         if (inner[sp] == '(') break; /* type with parens, stop */
350 0 0         if (inner[sp] == ' ') { s = sp + 1; break; }
351             }
352             }
353 0           t->inners[count++] = parse_col_type(inner + s, e - s);
354 0           start = i + 1;
355             }
356             }
357 0           }
358              
359 0           static col_type_t* parse_col_type(const char *type, size_t len) {
360             col_type_t *t;
361 0           Newxz(t, 1, col_type_t);
362              
363 0 0         if (len == 4 && memcmp(type, "Int8", 4) == 0) t->code = CT_INT8;
    0          
364 0 0         else if (len == 5 && memcmp(type, "Int16", 5) == 0) t->code = CT_INT16;
    0          
365 0 0         else if (len == 5 && memcmp(type, "Int32", 5) == 0) t->code = CT_INT32;
    0          
366 0 0         else if (len == 5 && memcmp(type, "Int64", 5) == 0) t->code = CT_INT64;
    0          
367 0 0         else if (len > 8 && memcmp(type, "Interval", 8) == 0) t->code = CT_INT64;
    0          
368 0 0         else if (len == 5 && memcmp(type, "UInt8", 5) == 0) t->code = CT_UINT8;
    0          
369 0 0         else if (len == 6 && memcmp(type, "UInt16", 6) == 0) t->code = CT_UINT16;
    0          
370 0 0         else if (len == 6 && memcmp(type, "UInt32", 6) == 0) t->code = CT_UINT32;
    0          
371 0 0         else if (len == 6 && memcmp(type, "UInt64", 6) == 0) t->code = CT_UINT64;
    0          
372 0 0         else if (len == 7 && memcmp(type, "Float32", 7) == 0) t->code = CT_FLOAT32;
    0          
373 0 0         else if (len == 7 && memcmp(type, "Float64", 7) == 0) t->code = CT_FLOAT64;
    0          
374 0 0         else if (len == 6 && memcmp(type, "String", 6) == 0) t->code = CT_STRING;
    0          
375 0 0         else if (len > 12 && memcmp(type, "FixedString(", 12) == 0) {
    0          
376 0           t->code = CT_FIXEDSTRING;
377 0           t->param = atoi(type + 12);
378             }
379 0 0         else if (len > 6 && memcmp(type, "Array(", 6) == 0) {
    0          
380 0           t->code = CT_ARRAY;
381 0           t->inner = parse_col_type(type + 6, len - 7);
382             }
383 0 0         else if (len > 9 && memcmp(type, "Nullable(", 9) == 0) {
    0          
384 0           t->code = CT_NULLABLE;
385 0           t->inner = parse_col_type(type + 9, len - 10);
386             }
387 0 0         else if (len > 15 && memcmp(type, "LowCardinality(", 15) == 0) {
    0          
388 0           t->code = CT_LOWCARDINALITY;
389 0           t->inner = parse_col_type(type + 15, len - 16);
390             }
391 0 0         else if (len == 4 && memcmp(type, "Date", 4) == 0) t->code = CT_DATE;
    0          
392 0 0         else if (len == 6 && memcmp(type, "Date32", 6) == 0) t->code = CT_DATE32;
    0          
393 0 0         else if (len == 8 && memcmp(type, "DateTime", 8) == 0) t->code = CT_DATETIME;
    0          
394 0 0         else if (len > 9 && memcmp(type, "DateTime(", 9) == 0) {
    0          
395 0           t->code = CT_DATETIME;
396             /* DateTime('timezone') — extract timezone */
397 0           {
398 0           const char *q = memchr(type + 9, '\'', len - 9);
399 0 0         if (q) {
400 0           const char *qe = memchr(q + 1, '\'', type + len - q - 1);
401 0 0         if (qe && qe > q + 1) {
    0          
402 0           size_t tzlen = qe - q - 1;
403 0           Newx(t->tz, tzlen + 1, char);
404 0           Copy(q + 1, t->tz, tzlen, char);
405 0           t->tz[tzlen] = '\0';
406             }
407             }
408             }
409             }
410 0 0         else if (len > 11 && memcmp(type, "DateTime64(", 11) == 0) {
    0          
411 0           t->code = CT_DATETIME64;
412 0           t->param = atoi(type + 11);
413             /* DateTime64(N, 'timezone') — extract timezone */
414 0           {
415 0           const char *comma = memchr(type + 11, ',', len - 11);
416 0 0         if (comma) {
417 0           const char *q = memchr(comma, '\'', type + len - comma);
418 0 0         if (q) {
419 0           const char *qe = memchr(q + 1, '\'', type + len - q - 1);
420 0 0         if (qe && qe > q + 1) {
    0          
421 0           size_t tzlen = qe - q - 1;
422 0           Newx(t->tz, tzlen + 1, char);
423 0           Copy(q + 1, t->tz, tzlen, char);
424 0           t->tz[tzlen] = '\0';
425             }
426             }
427             }
428             }
429             }
430 0 0         else if (len == 4 && memcmp(type, "UUID", 4) == 0) t->code = CT_UUID;
    0          
431 0 0         else if (len > 6 && memcmp(type, "Enum8(", 6) == 0) {
    0          
432 0           t->code = CT_ENUM8;
433 0           Newx(t->type_str, len + 1, char);
434 0           Copy(type, t->type_str, len, char);
435 0           t->type_str[len] = '\0';
436 0           t->type_str_len = len;
437             }
438 0 0         else if (len > 7 && memcmp(type, "Enum16(", 7) == 0) {
    0          
439 0           t->code = CT_ENUM16;
440 0           Newx(t->type_str, len + 1, char);
441 0           Copy(type, t->type_str, len, char);
442 0           t->type_str[len] = '\0';
443 0           t->type_str_len = len;
444             }
445 0 0         else if (len > 10 && memcmp(type, "Decimal32(", 10) == 0) {
    0          
446 0           t->code = CT_DECIMAL32;
447 0           t->param = atoi(type + 10);
448             }
449 0 0         else if (len > 10 && memcmp(type, "Decimal64(", 10) == 0) {
    0          
450 0           t->code = CT_DECIMAL64;
451 0           t->param = atoi(type + 10);
452             }
453 0 0         else if (len > 11 && memcmp(type, "Decimal128(", 11) == 0) {
    0          
454 0           t->code = CT_DECIMAL128;
455 0           t->param = atoi(type + 11);
456             }
457 0 0         else if (len > 11 && memcmp(type, "Decimal256(", 11) == 0) {
    0          
458 0           t->code = CT_DECIMAL256;
459 0           t->param = atoi(type + 11);
460             }
461 0 0         else if (len > 8 && memcmp(type, "Decimal(", 8) == 0) {
    0          
462 0           int precision = atoi(type + 8);
463 0           const char *comma = memchr(type + 8, ',', len - 8);
464 0 0         t->param = comma ? atoi(comma + 1) : 0;
465 0 0         if (precision <= 9) t->code = CT_DECIMAL32;
466 0 0         else if (precision <= 18) t->code = CT_DECIMAL64;
467 0 0         else if (precision <= 38) t->code = CT_DECIMAL128;
468 0           else t->code = CT_DECIMAL256;
469             }
470 0 0         else if (len == 8 && memcmp(type, "BFloat16", 8) == 0) t->code = CT_BFLOAT16;
    0          
471             /* Variant(...) and Dynamic: recognised here so the schema parser
472             * doesn't choke. Wire-format decoding is NOT generic — selecting
473             * one produces a clean decode error from decode_column so the
474             * caller's response parsing doesn't desync. Use
475             * `SELECT toString(col) FROM …` server-side to read the value as
476             * its JSON representation, or `CAST(col AS String)`. */
477 0 0         else if (len > 8 && memcmp(type, "Variant(", 8) == 0) {
    0          
478 0           t->code = CT_VARIANT;
479             }
480 0 0         else if (len == 7 && memcmp(type, "Dynamic", 7) == 0) {
    0          
481 0           t->code = CT_DYNAMIC;
482             }
483 0 0         else if (len == 7 && memcmp(type, "Nothing", 7) == 0) t->code = CT_NOTHING;
    0          
484 0 0         else if (len == 4 && memcmp(type, "Bool", 4) == 0) t->code = CT_BOOL;
    0          
485 0 0         else if (len == 4 && memcmp(type, "IPv4", 4) == 0) t->code = CT_IPV4;
    0          
486 0 0         else if (len == 4 && memcmp(type, "IPv6", 4) == 0) t->code = CT_IPV6;
    0          
487 0 0         else if (len == 6 && memcmp(type, "Int128", 6) == 0) t->code = CT_INT128;
    0          
488 0 0         else if (len == 7 && memcmp(type, "UInt128", 7) == 0) t->code = CT_UINT128;
    0          
489 0 0         else if (len == 6 && memcmp(type, "Int256", 6) == 0) t->code = CT_INT256;
    0          
490 0 0         else if (len == 7 && memcmp(type, "UInt256", 7) == 0) t->code = CT_UINT256;
    0          
491 0 0         else if (len > 6 && memcmp(type, "Tuple(", 6) == 0) {
    0          
492 0           t->code = CT_TUPLE;
493 0           parse_type_list(t, type + 6, len - 7);
494             }
495 0 0         else if (len > 4 && memcmp(type, "Map(", 4) == 0) {
    0          
496 0           t->code = CT_MAP;
497 0           parse_type_list(t, type + 4, len - 5);
498             }
499 0 0         else if (len > 7 && memcmp(type, "Nested(", 7) == 0) {
    0          
500             /* Nested(name1 Type1, name2 Type2) = Array(Tuple(Type1, Type2)) */
501             col_type_t *tuple;
502 0           Newxz(tuple, 1, col_type_t);
503 0           tuple->code = CT_TUPLE;
504 0           parse_type_list(tuple, type + 7, len - 8);
505 0           t->code = CT_ARRAY;
506 0           t->inner = tuple;
507             }
508 0 0         else if ((len == 4 && memcmp(type, "JSON", 4) == 0)
    0          
509 0 0         || (len > 5 && memcmp(type, "JSON(", 5) == 0 && type[len-1] == ')')
    0          
    0          
510 0 0         || (len > 7 && memcmp(type, "Object(", 7) == 0)) {
    0          
511             /* ClickHouse stable JSON type (24.8+); Object('json') is the
512             * legacy spelling. JSON(name Type, ...) pins specific paths to
513             * concrete inner types ("typed paths"); those skip the
514             * Dynamic+Variant wrap and write as regular columns inline. */
515 0           t->code = CT_JSON;
516 0 0         if (len > 5 && type[4] == '(') {
    0          
517             /* parse "name Type, name Type, ..." */
518 0           const char *body = type + 5;
519 0           size_t blen = len - 6;
520 0           int idx = 0, depth = 0;
521 0           size_t start = 0, i;
522             typedef struct { size_t name_start, name_len, type_start, type_len; }
523             jp_bound_t;
524             jp_bound_t bounds[64];
525 0 0         for (i = 0; i <= blen; i++) {
526 0 0         char c = (i < blen) ? body[i] : ',';
527 0 0         if (c == '(') depth++;
528 0 0         else if (c == ')') depth--;
529 0 0         else if ((c == ',' && depth == 0) || i == blen) {
    0          
    0          
530 0           size_t ts = start, te = i;
531 0 0         while (ts < te && (body[ts]==' '||body[ts]=='\t')) ts++;
    0          
    0          
532 0 0         while (te > ts && (body[te-1]==' '||body[te-1]=='\t')) te--;
    0          
    0          
533 0 0         if (te > ts && idx < 64) {
    0          
534 0           size_t id = ts;
535 0 0         while (id < te && body[id] != ' ' && body[id] != '\t') id++;
    0          
    0          
536 0           size_t ws = id;
537 0 0         while (ws < te && (body[ws]==' '||body[ws]=='\t')) ws++;
    0          
    0          
538 0 0         if (id > ts && ws < te) {
    0          
539 0           bounds[idx].name_start = ts;
540 0           bounds[idx].name_len = id - ts;
541 0           bounds[idx].type_start = ws;
542 0           bounds[idx].type_len = te - ws;
543 0           idx++;
544             }
545             }
546 0           start = i + 1;
547             }
548             }
549 0 0         if (idx > 0) {
550             /* Sort by name (lex) — wire-order requirement. */
551             int j;
552 0 0         for (j = 1; j < idx; j++) {
553 0           int z = j;
554 0 0         while (z > 0) {
555 0           size_t la = bounds[z-1].name_len, lb = bounds[z].name_len;
556 0           size_t m = la < lb ? la : lb;
557 0           int cmp = memcmp(body + bounds[z-1].name_start,
558 0           body + bounds[z].name_start, m);
559 0 0         if (cmp == 0) cmp = (int)la - (int)lb;
560 0 0         if (cmp <= 0) break;
561 0           jp_bound_t tmp = bounds[z-1];
562 0           bounds[z-1] = bounds[z];
563 0           bounds[z] = tmp;
564 0           z--;
565             }
566             }
567 0           t->num_inners = idx;
568 0           Newxz(t->inners, idx, col_type_t *);
569 0           Newxz(t->inner_names, idx, char *);
570 0 0         for (j = 0; j < idx; j++) {
571 0           Newx(t->inner_names[j], bounds[j].name_len + 1, char);
572 0           memcpy(t->inner_names[j],
573 0           body + bounds[j].name_start, bounds[j].name_len);
574 0           t->inner_names[j][bounds[j].name_len] = '\0';
575 0           t->inners[j] = parse_col_type(
576 0           body + bounds[j].type_start, bounds[j].type_len);
577             }
578             }
579             }
580             }
581             /* Geo type aliases (per ClickHouse docs / Encoder layout) */
582 0 0         else if (len == 5 && memcmp(type, "Point", 5) == 0) {
    0          
583             /* Point = Tuple(Float64, Float64) */
584 0           t->code = CT_TUPLE;
585 0           parse_type_list(t, "Float64,Float64", 15);
586             }
587 0 0         else if (len == 4 && memcmp(type, "Ring", 4) == 0) {
    0          
588             /* Ring = Array(Point) */
589 0           t->code = CT_ARRAY;
590 0           t->inner = parse_col_type("Point", 5);
591             }
592 0 0         else if (len == 10 && memcmp(type, "LineString", 10) == 0) {
    0          
593             /* LineString = Array(Point) */
594 0           t->code = CT_ARRAY;
595 0           t->inner = parse_col_type("Point", 5);
596             }
597 0 0         else if (len == 15 && memcmp(type, "MultiLineString", 15) == 0) {
    0          
598             /* MultiLineString = Array(Array(Point)) */
599 0           t->code = CT_ARRAY;
600 0           t->inner = parse_col_type("Array(Point)", 12);
601             }
602 0 0         else if (len == 7 && memcmp(type, "Polygon", 7) == 0) {
    0          
603             /* Polygon = Array(Ring) */
604 0           t->code = CT_ARRAY;
605 0           t->inner = parse_col_type("Ring", 4);
606             }
607 0 0         else if (len == 12 && memcmp(type, "MultiPolygon", 12) == 0) {
    0          
608             /* MultiPolygon = Array(Polygon) */
609 0           t->code = CT_ARRAY;
610 0           t->inner = parse_col_type("Polygon", 7);
611             }
612 0 0         else if ((len > 25 && memcmp(type, "SimpleAggregateFunction(", 24) == 0)
    0          
613 0 0         || (len > 19 && memcmp(type, "AggregateFunction(", 18) == 0)) {
    0          
614             /* (Simple)AggregateFunction(func, Type...) — skip func, parse inner.
615             * For SAF this is exact. For full AggregateFunction the raw inner
616             * type matches simple aggregates (sum/min/max etc) but is wrong
617             * for complex states (quantile, uniqExact, ...) — for those, run
618             * finalizeAggregation(col) server-side and read the result. */
619 0 0         size_t off = (memcmp(type, "Simple", 6) == 0) ? 24 : 18;
620 0           const char *inner = type + off;
621 0           size_t inner_len = len - off - 1;
622             size_t ci;
623 0           int depth = 0;
624 0 0         for (ci = 0; ci < inner_len; ci++) {
625 0 0         if (inner[ci] == '(') depth++;
626 0 0         else if (inner[ci] == ')') depth--;
627 0 0         else if (inner[ci] == ',' && depth == 0) break;
    0          
628             }
629 0 0         if (ci < inner_len) {
630 0           ci++;
631 0 0         while (ci < inner_len && inner[ci] == ' ') ci++;
    0          
632 0           Safefree(t);
633 0           t = parse_col_type(inner + ci, inner_len - ci);
634             } else {
635 0           t->code = CT_UNKNOWN;
636             }
637             }
638             else {
639             /* Unknown type — treat as String (read raw bytes) */
640 0           t->code = CT_UNKNOWN;
641             }
642              
643 0           return t;
644             }
645              
646             /* Size in bytes for fixed-width types. Returns 0 for variable-width. */
647 0           static size_t col_type_fixed_size(col_type_t *t) {
648 0           switch (t->code) {
649 0           case CT_INT8: case CT_UINT8: case CT_ENUM8: case CT_BOOL: return 1;
650 0           case CT_INT16: case CT_UINT16: case CT_ENUM16:
651 0           case CT_DATE: case CT_BFLOAT16: return 2;
652 0           case CT_INT32: case CT_UINT32: case CT_FLOAT32:
653             case CT_DECIMAL32: case CT_DATE32: case CT_DATETIME:
654 0           case CT_IPV4: return 4;
655 0           case CT_INT64: case CT_UINT64: case CT_FLOAT64:
656 0           case CT_DECIMAL64: case CT_DATETIME64: return 8;
657 0           case CT_UUID: case CT_DECIMAL128:
658 0           case CT_INT128: case CT_UINT128: case CT_IPV6: return 16;
659 0           case CT_INT256: case CT_UINT256: case CT_DECIMAL256: return 32;
660 0           case CT_FIXEDSTRING: return (size_t)t->param;
661 0           default: return 0;
662             }
663             }
664              
665             /* --- Decode helper functions for opt-in type formatting --- */
666              
667             /* Convert days since Unix epoch to "YYYY-MM-DD".
668             * Cast to int64_t before multiply: 32-bit time_t platforms would otherwise
669             * overflow for any date past 2038. */
670 0           static SV* days_to_date_sv(int32_t days) {
671 0           time_t t = (time_t)((int64_t)days * 86400);
672             struct tm tm;
673             char buf[11];
674 0 0         if (!gmtime_r(&t, &tm)) return newSVpvn("0000-00-00", 10);
675 0           snprintf(buf, sizeof(buf), "%04d-%02d-%02d",
676 0           tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday);
677 0           return newSVpvn(buf, 10);
678             }
679              
680             /* Convert epoch seconds to "YYYY-MM-DD HH:MM:SS" in UTC. */
681 0           static SV* epoch_to_datetime_sv(uint32_t epoch) {
682 0           time_t t = (time_t)epoch;
683             struct tm tm;
684             char buf[20];
685 0 0         if (!gmtime_r(&t, &tm)) return newSVpvn("0000-00-00 00:00:00", 19);
686 0           snprintf(buf, sizeof(buf), "%04d-%02d-%02d %02d:%02d:%02d",
687 0           tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
688             tm.tm_hour, tm.tm_min, tm.tm_sec);
689 0           return newSVpvn(buf, 19);
690             }
691              
692             /* Format epoch in the caller's currently-active TZ (set via set_tz). */
693 0           static SV* epoch_to_datetime_sv_local(uint32_t epoch) {
694 0           time_t t = (time_t)epoch;
695             struct tm tm;
696             char buf[20];
697 0 0         if (!localtime_r(&t, &tm)) return newSVpvn("0000-00-00 00:00:00", 19);
698 0           snprintf(buf, sizeof(buf), "%04d-%02d-%02d %02d:%02d:%02d",
699 0           tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
700             tm.tm_hour, tm.tm_min, tm.tm_sec);
701 0           return newSVpvn(buf, 19);
702             }
703              
704             /* Convert DateTime64 to "YYYY-MM-DD HH:MM:SS.fff...", use_local=1 for localtime */
705 0           static SV* dt64_to_datetime_sv_ex(int64_t val, int precision, int use_local) {
706 0           int64_t scale = 1;
707             int p;
708             int64_t epoch, frac;
709             time_t t;
710             struct tm tm;
711             char buf[32];
712             int n;
713              
714 0 0         for (p = 0; p < precision; p++) scale *= 10;
715 0           epoch = val / scale;
716 0           frac = val % scale;
717 0 0         if (frac < 0) { epoch--; frac += scale; }
718              
719 0           t = (time_t)epoch;
720 0 0         if (use_local) {
721 0 0         if (!localtime_r(&t, &tm)) return newSVpvn("0000-00-00 00:00:00", 19);
722             } else {
723 0 0         if (!gmtime_r(&t, &tm)) return newSVpvn("0000-00-00 00:00:00", 19);
724             }
725 0           n = snprintf(buf, sizeof(buf), "%04d-%02d-%02d %02d:%02d:%02d",
726 0           tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
727             tm.tm_hour, tm.tm_min, tm.tm_sec);
728 0 0         if (precision > 0 && n < 30) {
    0          
729             char fracbuf[16];
730             int fi;
731 0           snprintf(fracbuf, sizeof(fracbuf), "%0*lld", precision, (long long)frac);
732 0           buf[n++] = '.';
733 0 0         for (fi = 0; fi < precision && n < 31; fi++)
    0          
734 0           buf[n++] = fracbuf[fi];
735             }
736 0           return newSVpvn(buf, n);
737             }
738              
739             /* Set TZ env var and tzset(); returns saved old TZ (caller frees).
740             * Safe because EV is single-threaded and the set_tz / decode / restore_tz
741             * window contains no Perl callback dispatch. */
742 0           static char* set_tz(const char *tz) {
743 0           char *saved = safe_strdup(getenv("TZ"));
744 0           setenv("TZ", tz, 1);
745 0           tzset();
746 0           return saved;
747             }
748              
749             /* Restore TZ from saved value (which may be NULL), then free saved */
750 0           static void restore_tz(char *saved) {
751 0 0         if (saved) {
752 0           setenv("TZ", saved, 1);
753 0           Safefree(saved);
754             } else {
755 0           unsetenv("TZ");
756             }
757 0           tzset();
758 0           }
759              
760             /* Compute 10^n as double */
761 0           static double pow10_int(int n) {
762 0           double r = 1.0;
763             int i;
764 0 0         for (i = 0; i < n; i++) r *= 10.0;
765 0           return r;
766             }
767              
768             /* Parse enum label for a given code from type string like "Enum8('a'=1,'b'=2)" */
769 0           static SV* enum_label_for_code(const char *type_str, size_t type_str_len, int code) {
770             /* Find the opening '(' */
771 0           const char *p = memchr(type_str, '(', type_str_len);
772             const char *end;
773 0 0         if (!p) return newSViv(code);
774 0           p++;
775 0           end = type_str + type_str_len - 1; /* skip closing ')' */
776              
777 0 0         while (p < end) {
778             /* Skip whitespace */
779 0 0         while (p < end && *p == ' ') p++;
    0          
780 0 0         if (p >= end || *p != '\'') break;
    0          
781 0           p++; /* skip opening quote */
782              
783             /* Read label (handle escaped quotes) */
784             {
785 0           const char *label_start = p;
786             size_t label_len;
787             int val;
788              
789 0 0         while (p < end && !(*p == '\'' && (p + 1 >= end || *(p+1) != '\''))) {
    0          
    0          
    0          
790 0 0         if (*p == '\'' && p + 1 < end && *(p+1) == '\'') { p += 2; continue; }
    0          
    0          
791 0           p++;
792             }
793 0           label_len = p - label_start;
794 0 0         if (p < end) p++; /* skip closing quote */
795              
796             /* Skip ' = ' */
797 0 0         while (p < end && (*p == ' ' || *p == '=')) p++;
    0          
    0          
798              
799             /* Read integer value */
800 0           val = (int)strtol(p, NULL, 10);
801              
802 0 0         if (val == code) return newSVpvn(label_start, label_len);
803              
804             /* Skip to next entry */
805 0 0         while (p < end && *p != ',') p++;
    0          
806 0 0         if (p < end) p++; /* skip comma */
807             }
808             }
809             /* Not found — return numeric code */
810 0           return newSViv(code);
811             }
812              
813             /*
814             * Decode a column of `nrows` values from the native binary format.
815             * Returns an array of SVs (one per row). Returns NULL on failure.
816             * Sets *decode_err=1 on definitive errors (vs needing more data).
817             * Advances *pos past the consumed bytes.
818             */
819              
820             #ifdef __SIZEOF_INT128__
821 0           static SV* int128_to_sv(const char *p, int is_signed) {
822             unsigned __int128 uv;
823             char dbuf[42];
824 0           int dlen = 0, neg = 0, k;
825 0 0         if (is_signed) {
826             __int128 sv;
827 0           memcpy(&sv, p, 16);
828 0           neg = sv < 0;
829 0 0         uv = neg ? -(unsigned __int128)sv : (unsigned __int128)sv;
830             } else {
831 0           memcpy(&uv, p, 16);
832             }
833             do {
834 0           dbuf[dlen++] = '0' + (int)(uv % 10);
835 0           uv /= 10;
836 0 0         } while (uv);
837 0 0         if (neg) dbuf[dlen++] = '-';
838 0 0         for (k = 0; k < dlen/2; k++) {
839 0           char tmp = dbuf[k]; dbuf[k] = dbuf[dlen-1-k]; dbuf[dlen-1-k] = tmp;
840             }
841 0           return newSVpvn(dbuf, dlen);
842             }
843             #endif
844              
845             /* Convert a 256-bit LE unsigned integer (as 4 x uint64_t) to decimal string.
846             * Works on all platforms (no __int128 required). */
847 0           static SV* uint256_to_sv(const char *p) {
848             /* Copy into 4 x uint64_t LE limbs: v[0] = lowest */
849             uint64_t v[4];
850             char dbuf[80];
851 0           int dlen = 0, k;
852              
853 0           memcpy(v, p, 32);
854              
855             /* Handle zero */
856 0 0         if (v[0] == 0 && v[1] == 0 && v[2] == 0 && v[3] == 0)
    0          
    0          
    0          
857 0           return newSVpvn("0", 1);
858              
859             /* Repeatedly divide by 10, collecting remainders */
860 0 0         while (v[0] || v[1] || v[2] || v[3]) {
    0          
    0          
    0          
861 0           uint64_t rem = 0;
862             int i;
863 0 0         for (i = 3; i >= 0; i--) {
864             #ifdef __SIZEOF_INT128__
865 0           unsigned __int128 cur = ((unsigned __int128)rem << 64) | v[i];
866 0           v[i] = (uint64_t)(cur / 10);
867 0           rem = (uint64_t)(cur % 10);
868             #else
869             /* Without 128-bit: split each 64-bit limb into hi32:lo32 */
870             uint64_t hi = (rem << 32) | (v[i] >> 32);
871             uint64_t q_hi = hi / 10;
872             uint64_t r_hi = hi % 10;
873             uint64_t lo = (r_hi << 32) | (v[i] & 0xFFFFFFFFULL);
874             uint64_t q_lo = lo / 10;
875             rem = lo % 10;
876             v[i] = (q_hi << 32) | q_lo;
877             #endif
878             }
879 0           dbuf[dlen++] = '0' + (int)rem;
880             }
881 0 0         for (k = 0; k < dlen/2; k++) {
882 0           char tmp = dbuf[k]; dbuf[k] = dbuf[dlen-1-k]; dbuf[dlen-1-k] = tmp;
883             }
884 0           return newSVpvn(dbuf, dlen);
885             }
886              
887 0           static SV* int256_to_sv(const char *p, int is_signed) {
888 0 0         if (is_signed && ((unsigned char)p[31] & 0x80)) {
    0          
889             /* Negative: two's complement negate, format, prepend '-' */
890             unsigned char neg[32];
891 0           int i, carry = 1;
892             SV *sv;
893             STRLEN svlen;
894             char *s;
895 0 0         for (i = 0; i < 32; i++) {
896 0           int b = (unsigned char)(~((unsigned char)p[i])) + carry;
897 0           neg[i] = (unsigned char)(b & 0xFF);
898 0           carry = b >> 8;
899             }
900 0           sv = uint256_to_sv((const char *)neg);
901             /* Prepend '-' */
902 0           s = SvPV(sv, svlen);
903             {
904 0           SV *result = newSV(svlen + 1);
905 0           SvPOK_on(result);
906 0           SvCUR_set(result, svlen + 1);
907 0           *SvPVX(result) = '-';
908 0           Copy(s, SvPVX(result) + 1, svlen, char);
909 0           SvPVX(result)[svlen + 1] = '\0';
910 0           SvREFCNT_dec(sv);
911 0           return result;
912             }
913             }
914 0           return uint256_to_sv(p);
915             }
916              
917             static SV** decode_column_ex(const char *buf, size_t len, size_t *pos,
918             uint64_t nrows, col_type_t *ct, int *decode_err,
919             uint32_t decode_flags, ev_clickhouse_t *lc_self,
920             int lc_col_idx);
921              
922 0           static SV** decode_column(const char *buf, size_t len, size_t *pos,
923             uint64_t nrows, col_type_t *ct, int *decode_err,
924             uint32_t decode_flags) {
925 0           return decode_column_ex(buf, len, pos, nrows, ct, decode_err, decode_flags, NULL, -1);
926             }
927              
928 0           static SV** decode_column_ex(const char *buf, size_t len, size_t *pos,
929             uint64_t nrows, col_type_t *ct, int *decode_err,
930             uint32_t decode_flags, ev_clickhouse_t *lc_self,
931             int lc_col_idx) {
932             SV **out;
933             uint64_t i;
934             size_t fsz;
935              
936 0 0         Newxz(out, nrows ? nrows : 1, SV*);
    0          
    0          
937              
938 0 0         if (ct->code == CT_NOTHING) {
939             /* Nothing type: 1 placeholder byte ('0') per row */
940 0 0         if (*pos > len || nrows > len - *pos) goto fail;
    0          
941 0           *pos += nrows;
942 0 0         for (i = 0; i < nrows; i++)
943 0           out[i] = newSV(0);
944 0           return out;
945             }
946              
947 0 0         if (ct->code == CT_VARIANT || ct->code == CT_DYNAMIC) {
    0          
948             /* The wire format is per-version and includes shared substream
949             * dispatch we don't replicate here. Set decode_err so the calling
950             * parser surfaces a clean error and tears the connection down,
951             * instead of trying to read garbage strings (which would desync
952             * every subsequent column). Recommend a server-side workaround. */
953 0 0         if (decode_err) *decode_err = 1;
954 0           goto fail;
955             }
956              
957 0 0         if (ct->code == CT_NULLABLE) {
958             /* null bitmap: nrows bytes of UInt8 */
959             uint8_t *nulls;
960             SV **inner;
961 0 0         if (*pos > len || nrows > len - *pos) goto fail;
    0          
962 0           Newx(nulls, nrows, uint8_t);
963 0           Copy(buf + *pos, nulls, nrows, uint8_t);
964 0           *pos += nrows;
965              
966             /* decode inner column */
967 0           inner = decode_column(buf, len, pos, nrows, ct->inner, decode_err, decode_flags);
968 0 0         if (!inner) { Safefree(nulls); goto fail; }
969              
970 0 0         for (i = 0; i < nrows; i++) {
971 0 0         if (nulls[i]) {
972 0           SvREFCNT_dec(inner[i]);
973 0           out[i] = newSV(0); /* undef */
974             } else {
975 0           out[i] = inner[i];
976             }
977             }
978 0           Safefree(nulls);
979 0           Safefree(inner);
980 0           return out;
981             }
982              
983 0 0         if (ct->code == CT_LOWCARDINALITY) {
984             /*
985             * LowCardinality wire format (all multi-byte integers are UInt64 LE):
986             * PREFIX: UInt64 key_version (1=SharedDicts, 2=SingleDict)
987             * DATA: UInt64 serialization_type (bits 0-7: index type,
988             * bit 8: NeedGlobalDictionary, bit 9: HasAdditionalKeys,
989             * bit 10: NeedUpdateDictionary)
990             * if NeedUpdateDictionary: UInt64 num_keys + dictionary data
991             * UInt64 num_indices + index data
992             */
993             uint64_t version, ser_type, num_keys, num_indices;
994 0           size_t saved = *pos;
995             int key_type;
996             size_t idx_size;
997 0           SV **dict = NULL;
998 0           int dict_borrowed = 0; /* 1 if dict points to lc_self storage */
999              
1000             /* key_version: UInt64 (from serializeBinaryBulkStatePrefix) */
1001 0 0         if (*pos + 8 > len) goto lc_fail;
1002 0           memcpy(&version, buf + *pos, 8); *pos += 8;
1003              
1004             /* serialization_type: UInt64 */
1005 0 0         if (*pos + 8 > len) goto lc_fail;
1006 0           memcpy(&ser_type, buf + *pos, 8); *pos += 8;
1007              
1008 0           key_type = (int)(ser_type & 0xFF);
1009             /* key_type: 0=UInt8, 1=UInt16, 2=UInt32, 3=UInt64 */
1010              
1011             /* Read dictionary if NeedUpdateDictionary (bit 10) */
1012 0 0         if (ser_type & (1ULL << 10)) {
1013 0 0         if (*pos + 8 > len) goto lc_fail;
1014 0           memcpy(&num_keys, buf + *pos, 8); *pos += 8;
1015              
1016 0           dict = decode_column(buf, len, pos, num_keys, ct->inner, decode_err, decode_flags);
1017 0 0         if (!dict) goto lc_fail;
1018             } else {
1019             /* NeedUpdateDictionary=0: reuse dictionary from prior block */
1020 0 0         if (lc_self && lc_col_idx >= 0 && lc_col_idx < lc_self->lc_num_cols
    0          
    0          
1021 0 0         && lc_self->lc_dicts[lc_col_idx]) {
1022 0           dict = lc_self->lc_dicts[lc_col_idx];
1023 0           num_keys = lc_self->lc_dict_sizes[lc_col_idx];
1024 0           dict_borrowed = 1;
1025             } else {
1026 0 0         if (decode_err) *decode_err = 1;
1027 0           goto lc_fail;
1028             }
1029             }
1030              
1031             /* Read indices: UInt64 num_indices + index data */
1032 0 0         if (*pos + 8 > len) goto lc_fail;
1033 0           memcpy(&num_indices, buf + *pos, 8); *pos += 8;
1034              
1035 0 0         idx_size = (key_type == 0) ? 1 : (key_type == 1) ? 2 :
    0          
    0          
1036             (key_type == 2) ? 4 : 8;
1037 0 0         if (num_indices != nrows) {
1038 0 0         if (decode_err) *decode_err = 1;
1039 0           goto lc_fail;
1040             }
1041 0 0         if (*pos > len || num_indices > (len - *pos) / idx_size) goto lc_fail;
    0          
1042              
1043             /* Store new dictionary for cross-block reuse (after validation) */
1044 0 0         if (!dict_borrowed && lc_self && lc_col_idx >= 0 && lc_col_idx < lc_self->lc_num_cols) {
    0          
    0          
    0          
1045 0 0         if (lc_self->lc_dicts[lc_col_idx]) {
1046             uint64_t di;
1047 0 0         for (di = 0; di < lc_self->lc_dict_sizes[lc_col_idx]; di++)
1048 0           SvREFCNT_dec(lc_self->lc_dicts[lc_col_idx][di]);
1049 0           Safefree(lc_self->lc_dicts[lc_col_idx]);
1050             }
1051             SV **dcopy;
1052 0 0         Newx(dcopy, num_keys > 0 ? num_keys : 1, SV*);
    0          
    0          
1053 0 0         for (i = 0; i < num_keys; i++)
1054 0           dcopy[i] = SvREFCNT_inc(dict[i]);
1055 0           lc_self->lc_dicts[lc_col_idx] = dcopy;
1056 0           lc_self->lc_dict_sizes[lc_col_idx] = num_keys;
1057             }
1058              
1059 0 0         for (i = 0; i < nrows; i++) {
1060 0           uint64_t idx = 0;
1061 0           memcpy(&idx, buf + *pos + i * idx_size, idx_size);
1062 0 0         if (dict && idx < num_keys) {
    0          
1063 0           out[i] = SvREFCNT_inc(dict[idx]);
1064             } else {
1065 0           out[i] = newSV(0); /* undef for missing dict entry */
1066             }
1067             }
1068 0           *pos += num_indices * idx_size;
1069              
1070 0 0         if (dict && !dict_borrowed) {
    0          
1071 0 0         for (i = 0; i < num_keys; i++) SvREFCNT_dec(dict[i]);
1072 0           Safefree(dict);
1073             }
1074 0           return out;
1075              
1076 0           lc_fail:
1077 0 0         if (dict && !dict_borrowed) {
    0          
1078 0 0         for (i = 0; i < num_keys; i++) SvREFCNT_dec(dict[i]);
1079 0           Safefree(dict);
1080             }
1081 0           *pos = saved;
1082 0           goto fail;
1083             }
1084              
1085 0 0         if (ct->code == CT_STRING) {
1086 0 0         for (i = 0; i < nrows; i++) {
1087             const char *s;
1088             size_t slen;
1089 0 0         if (read_native_string_ref(buf, len, pos, &s, &slen) <= 0) {
1090             /* clean up already-created SVs */
1091             uint64_t j;
1092 0 0         for (j = 0; j < i; j++) SvREFCNT_dec(out[j]);
1093 0           goto fail;
1094             }
1095 0           out[i] = newSVpvn(s, slen);
1096             }
1097 0           return out;
1098             }
1099              
1100 0 0         if (ct->code == CT_ARRAY) {
1101             /* offsets: nrows x UInt64 */
1102             uint64_t *offsets;
1103             SV **elems;
1104             uint64_t total, prev;
1105              
1106 0 0         if (*pos > len || nrows > (len - *pos) / 8) goto fail;
    0          
1107 0 0         Newx(offsets, nrows, uint64_t);
1108 0 0         Copy(buf + *pos, offsets, nrows, uint64_t);
1109 0           *pos += nrows * 8;
1110              
1111             /* validate offset monotonicity */
1112 0           prev = 0;
1113 0 0         for (i = 0; i < nrows; i++) {
1114 0 0         if (offsets[i] < prev) { Safefree(offsets); goto fail; }
1115 0           prev = offsets[i];
1116             }
1117              
1118 0 0         total = nrows > 0 ? offsets[nrows - 1] : 0;
1119              
1120             /* decode all inner elements */
1121 0           elems = decode_column(buf, len, pos, total, ct->inner, decode_err, decode_flags);
1122 0 0         if (!elems) { Safefree(offsets); goto fail; }
1123              
1124             /* build AV for each row */
1125 0           prev = 0;
1126 0 0         for (i = 0; i < nrows; i++) {
1127 0           uint64_t count = offsets[i] - prev;
1128 0           AV *av = newAV();
1129             uint64_t j;
1130 0 0         if (count > 0) av_extend(av, count - 1);
1131 0 0         for (j = 0; j < count; j++) {
1132 0           av_push(av, elems[prev + j]);
1133             }
1134 0           out[i] = newRV_noinc((SV*)av);
1135 0           prev = offsets[i];
1136             }
1137              
1138 0           Safefree(offsets);
1139 0           Safefree(elems);
1140 0           return out;
1141             }
1142              
1143 0 0         if (ct->code == CT_TUPLE) {
1144             /* Tuple: each element is a separate column, transpose to row arrays */
1145             SV ***cols;
1146             int j;
1147              
1148 0           Newxz(cols, ct->num_inners, SV**);
1149 0 0         for (j = 0; j < ct->num_inners; j++) {
1150 0           cols[j] = decode_column(buf, len, pos, nrows, ct->inners[j], decode_err, decode_flags);
1151 0 0         if (!cols[j]) {
1152             int k;
1153 0 0         for (k = 0; k < j; k++) {
1154 0 0         for (i = 0; i < nrows; i++) SvREFCNT_dec(cols[k][i]);
1155 0           Safefree(cols[k]);
1156             }
1157 0           Safefree(cols);
1158 0           goto fail;
1159             }
1160             }
1161              
1162 0 0         for (i = 0; i < nrows; i++) {
1163 0           AV *av = newAV();
1164 0           av_extend(av, ct->num_inners - 1);
1165 0 0         for (j = 0; j < ct->num_inners; j++)
1166 0           av_push(av, cols[j][i]);
1167 0           out[i] = newRV_noinc((SV*)av);
1168             }
1169              
1170 0 0         for (j = 0; j < ct->num_inners; j++) Safefree(cols[j]);
1171 0           Safefree(cols);
1172 0           return out;
1173             }
1174              
1175 0 0         if (ct->code == CT_JSON) {
1176             /* Wire layout (V1/V2/V3 supported); see JSON helpers + Encoder docs. */
1177 0           size_t saved = *pos;
1178             uint64_t obj_ver;
1179             uint64_t num_paths64;
1180 0           json_path_entry_t *jpe = NULL;
1181 0           int *path_kinds_buf = NULL; /* concatenated [path][kind_idx] */
1182 0           int *path_kind_count = NULL;
1183 0           int wire_slots_cleanup = 0; /* slot-count for var_avs cleanup */
1184 0           AV **var_avs = NULL;
1185 0           uint64_t *offs = NULL;
1186 0           AV *pending_inner = NULL;
1187             int p;
1188              
1189 0 0         if (*pos > len || len - *pos < 8) goto json_fail;
    0          
1190 0           memcpy(&obj_ver, buf + *pos, 8); *pos += 8;
1191 0 0         if (obj_ver != 0 && obj_ver != 2 && obj_ver != 4) goto json_fail;
    0          
    0          
1192 0 0         if (obj_ver == 0) {
1193             uint64_t dummy;
1194 0 0         if (read_varuint(buf, len, pos, &dummy) <= 0) goto json_fail;
1195             }
1196 0 0         if (read_varuint(buf, len, pos, &num_paths64) <= 0) goto json_fail;
1197 0 0         if (num_paths64 > (uint64_t)INT_MAX) goto json_fail;
1198 0           int num_paths = (int)num_paths64;
1199              
1200 0 0         if (num_paths > 0) {
1201 0           Newx(jpe, num_paths, json_path_entry_t);
1202 0           Newxz(path_kind_count, num_paths, int);
1203 0           Newxz(path_kinds_buf, num_paths * JSON_LEX_SLOTS, int);
1204             }
1205 0 0         for (p = 0; p < num_paths; p++) {
1206             const char *ps; size_t pl;
1207 0 0         if (read_native_string_ref(buf, len, pos, &ps, &pl) <= 0) goto json_fail;
1208 0           jpe[p].path = (char*)ps;
1209 0           jpe[p].len = pl;
1210             }
1211 0 0         if (obj_ver == 4) {
1212             uint64_t shared_ver, dummy;
1213 0 0         if (read_varuint(buf, len, pos, &shared_ver) <= 0) goto json_fail;
1214 0 0         if (shared_ver == 1 || shared_ver == 2)
    0          
1215 0 0         if (read_varuint(buf, len, pos, &dummy) <= 0) goto json_fail;
1216             }
1217              
1218 0 0         for (p = 0; p < num_paths; p++) {
1219             uint64_t dyn_ver, var_mode, ntypes;
1220 0 0         if (*pos > len || len - *pos < 8) goto json_fail;
    0          
1221 0           memcpy(&dyn_ver, buf + *pos, 8); *pos += 8;
1222 0 0         if (dyn_ver != 1 && dyn_ver != 2 && dyn_ver != 4) goto json_fail;
    0          
    0          
1223 0 0         if (dyn_ver == 1) {
1224             uint64_t dummy;
1225 0 0         if (read_varuint(buf, len, pos, &dummy) <= 0) goto json_fail;
1226             }
1227 0 0         if (read_varuint(buf, len, pos, &ntypes) <= 0) goto json_fail;
1228 0 0         if (ntypes > (uint64_t)JSON_LEX_SLOTS) goto json_fail;
1229 0           int *kinds = path_kinds_buf + p * JSON_LEX_SLOTS;
1230             uint64_t ti;
1231 0 0         for (ti = 0; ti < ntypes; ti++) {
1232             const char *ts; size_t tl;
1233 0 0         if (read_native_string_ref(buf, len, pos, &ts, &tl) <= 0) goto json_fail;
1234 0           int k = json_kind_from_type_name(ts, tl);
1235 0 0         if (k < 0) goto json_fail;
1236 0           kinds[ti] = k;
1237             }
1238 0           path_kind_count[p] = (int)ntypes;
1239 0 0         if (*pos > len || len - *pos < 8) goto json_fail;
    0          
1240 0           memcpy(&var_mode, buf + *pos, 8); *pos += 8;
1241 0 0         if (var_mode != 0) goto json_fail;
1242             }
1243              
1244             /* Allocate per-row hashref. */
1245 0 0         for (i = 0; i < nrows; i++) out[i] = newRV_noinc((SV*)newHV());
1246              
1247             /* Typed-path data first (in declaration order). For each typed path,
1248             * decode a regular column via inner type, then store per-row into
1249             * out[i]'s hashref under the typed-path name. */
1250 0 0         if (ct->num_inners > 0) {
1251             int tp;
1252 0 0         for (tp = 0; tp < ct->num_inners; tp++) {
1253 0           SV **tpcol = decode_column(buf, len, pos, (uint64_t)nrows,
1254 0           ct->inners[tp], decode_err, decode_flags);
1255 0 0         if (!tpcol) goto json_fail;
1256 0           size_t nlen = strlen(ct->inner_names[tp]);
1257 0 0         for (i = 0; i < nrows; i++) {
1258 0           HV *row_hv = (HV*)SvRV(out[i]);
1259 0 0         if (!hv_store(row_hv, ct->inner_names[tp], (I32)nlen,
1260             tpcol[i], 0))
1261 0           SvREFCNT_dec(tpcol[i]);
1262             }
1263 0           Safefree(tpcol);
1264             }
1265             }
1266              
1267 0 0         for (p = 0; p < num_paths; p++) {
1268 0 0         if (*pos > len || len - *pos < (size_t)nrows) goto json_fail;
    0          
1269 0           const unsigned char *discs = (const unsigned char *)(buf + *pos);
1270 0           *pos += (size_t)nrows;
1271              
1272 0           int nv = path_kind_count[p];
1273 0           int wire_slots = nv + 1;
1274             int slot_to_kind[JSON_LEX_SLOTS];
1275 0           unsigned mask = 0;
1276             int s, kk;
1277 0           int *kinds = path_kinds_buf + p * JSON_LEX_SLOTS;
1278 0 0         for (kk = 0; kk < nv; kk++) mask |= 1u << kinds[kk];
1279 0           (void)json_build_lex_table(mask, slot_to_kind);
1280              
1281 0           uint64_t var_counts[JSON_LEX_SLOTS] = {0};
1282             uint64_t r2;
1283 0 0         for (r2 = 0; r2 < (uint64_t)nrows; r2++) {
1284 0           unsigned char d = discs[r2];
1285 0 0         if (d == 0xff) continue;
1286 0 0         if (d >= wire_slots) goto json_fail;
1287 0           var_counts[d]++;
1288             }
1289              
1290 0           Newxz(var_avs, wire_slots, AV*);
1291 0           wire_slots_cleanup = wire_slots;
1292              
1293 0 0         for (s = 0; s < wire_slots; s++) {
1294 0           int kind = slot_to_kind[s];
1295 0           uint64_t nv_rows = var_counts[s];
1296 0           AV *sub = newAV();
1297 0           var_avs[s] = sub;
1298 0 0         if (kind < 0 || kind == JV_STRING) {
    0          
1299             /* SharedVariant (kind<0) takes the same String wire shape;
1300             * the encoder never routes rows there but the format still
1301             * allocates the slot. */
1302             uint64_t k;
1303 0 0         for (k = 0; k < nv_rows; k++) {
1304             const char *vs; size_t vl;
1305 0 0         if (read_native_string_ref(buf, len, pos, &vs, &vl) <= 0)
1306 0           goto json_fail;
1307 0           av_push(sub, newSVpvn(vs, vl));
1308             }
1309 0 0         } else if (kind == JV_INT64) {
1310 0 0         if (*pos > len || len - *pos < 8 * nv_rows) goto json_fail;
    0          
1311             uint64_t k;
1312 0 0         for (k = 0; k < nv_rows; k++) {
1313 0           int64_t v; memcpy(&v, buf + *pos, 8); *pos += 8;
1314 0           av_push(sub, newSViv((IV)v));
1315             }
1316 0 0         } else if (kind == JV_FLOAT64) {
1317 0 0         if (*pos > len || len - *pos < 8 * nv_rows) goto json_fail;
    0          
1318             uint64_t k;
1319 0 0         for (k = 0; k < nv_rows; k++) {
1320 0           double v; memcpy(&v, buf + *pos, 8); *pos += 8;
1321 0           av_push(sub, newSVnv(v));
1322             }
1323 0 0         } else if (kind == JV_BOOL) {
1324 0 0         if (*pos > len || len - *pos < nv_rows) goto json_fail;
    0          
1325             uint64_t k;
1326 0 0         for (k = 0; k < nv_rows; k++) {
1327 0           unsigned char b8 = (unsigned char)buf[(*pos)++];
1328 0           av_push(sub, newSViv(b8 ? 1 : 0));
1329             }
1330 0 0         } else if (kind >= JV_ARRAY_BOOL && kind <= JV_ARRAY_STRING) {
    0          
1331 0 0         if (nv_rows == 0) continue;
1332 0 0         if (*pos > len || len - *pos < 8 * nv_rows) goto json_fail;
    0          
1333 0 0         Newx(offs, nv_rows, uint64_t);
1334             uint64_t k;
1335 0 0         for (k = 0; k < nv_rows; k++) {
1336 0           memcpy(&offs[k], buf + *pos, 8); *pos += 8;
1337             }
1338 0           uint64_t prev = 0;
1339 0 0         for (k = 0; k < nv_rows; k++) {
1340 0           uint64_t cnt = offs[k] - prev;
1341 0           pending_inner = newAV();
1342 0 0         if (cnt > 0) av_extend(pending_inner, (SSize_t)cnt - 1);
1343             uint64_t j;
1344 0 0         for (j = 0; j < cnt; j++) {
1345 0           switch (kind) {
1346 0           case JV_ARRAY_BOOL:
1347 0 0         if (*pos >= len) goto json_fail;
1348 0           av_push(pending_inner,
1349             newSViv((unsigned char)buf[(*pos)++] ? 1 : 0));
1350 0           break;
1351 0           case JV_ARRAY_INT64: {
1352 0 0         if (*pos > len || len - *pos < 8) goto json_fail;
    0          
1353 0           int64_t v; memcpy(&v, buf + *pos, 8); *pos += 8;
1354 0           av_push(pending_inner, newSViv((IV)v));
1355 0           break;
1356             }
1357 0           case JV_ARRAY_FLOAT64: {
1358 0 0         if (*pos > len || len - *pos < 8) goto json_fail;
    0          
1359 0           double v; memcpy(&v, buf + *pos, 8); *pos += 8;
1360 0           av_push(pending_inner, newSVnv(v));
1361 0           break;
1362             }
1363 0           case JV_ARRAY_STRING: {
1364             const char *vs; size_t vl;
1365 0 0         if (read_native_string_ref(buf, len, pos, &vs, &vl) <= 0)
1366 0           goto json_fail;
1367 0           av_push(pending_inner, newSVpvn(vs, vl));
1368 0           break;
1369             }
1370             }
1371             }
1372 0           av_push(sub, newRV_noinc((SV*)pending_inner));
1373 0           pending_inner = NULL; /* now owned by sub */
1374 0           prev = offs[k];
1375             }
1376 0           Safefree(offs); offs = NULL;
1377             } else {
1378 0           goto json_fail;
1379             }
1380             }
1381              
1382             /* Distribute values into per-row hashes (dotted keys, unflattened later). */
1383 0           SSize_t cursors[JSON_LEX_SLOTS] = {0};
1384             uint64_t r3;
1385 0 0         for (r3 = 0; r3 < (uint64_t)nrows; r3++) {
1386 0           unsigned char d = discs[r3];
1387 0 0         if (d == 0xff) continue;
1388 0           SV **e = av_fetch(var_avs[d], cursors[d]++, 0);
1389 0 0         if (!e) continue;
1390 0           HV *row_hv = (HV*)SvRV(out[r3]);
1391 0           SV *vsv = SvREFCNT_inc(*e);
1392 0 0         if (!hv_store(row_hv, jpe[p].path, (I32)jpe[p].len, vsv, 0))
1393 0           SvREFCNT_dec(vsv);
1394             }
1395 0 0         for (s = 0; s < wire_slots; s++)
1396 0 0         if (var_avs[s]) SvREFCNT_dec((SV*)var_avs[s]);
1397 0           Safefree(var_avs); var_avs = NULL;
1398 0           wire_slots_cleanup = 0;
1399             }
1400              
1401             /* Trailing shared data: N UInt64 offsets, then if last>0 strings. */
1402 0 0         if (nrows > 0) {
1403 0 0         if (*pos > len || len - *pos < 8 * (size_t)nrows) goto json_fail;
    0          
1404             uint64_t last_offset;
1405 0           memcpy(&last_offset, buf + *pos + 8 * (nrows - 1), 8);
1406 0           *pos += 8 * (size_t)nrows;
1407 0 0         if (last_offset > 0) {
1408             uint64_t k;
1409 0 0         for (k = 0; k < 2 * last_offset; k++) {
1410             const char *s; size_t l;
1411 0 0         if (read_native_string_ref(buf, len, pos, &s, &l) <= 0) goto json_fail;
1412             }
1413             }
1414             }
1415              
1416             /* Unflatten dotted keys back into nested hashes. */
1417 0 0         for (i = 0; i < nrows; i++) {
1418 0           HV *row_hv = (HV*)SvRV(out[i]);
1419 0           AV *keys = (AV*)sv_2mortal((SV*)newAV());
1420 0           hv_iterinit(row_hv);
1421             HE *he;
1422 0 0         while ((he = hv_iternext(row_hv))) {
1423             I32 klen;
1424 0           char *kstr = hv_iterkey(he, &klen);
1425 0 0         if (memchr(kstr, '.', klen))
1426 0           av_push(keys, newSVpvn(kstr, klen));
1427             }
1428 0           SSize_t nk = av_len(keys) + 1, ki;
1429 0 0         for (ki = 0; ki < nk; ki++) {
1430 0           SV *ksv = *av_fetch(keys, ki, 0);
1431             STRLEN klen;
1432 0           const char *kstr = SvPV(ksv, klen);
1433 0           HV *cur = row_hv;
1434 0           STRLEN seg_start = 0, off;
1435 0           int conflict = 0;
1436 0 0         for (off = 0; off <= klen; off++) {
1437 0 0         if (off == klen || kstr[off] == '.') {
    0          
1438 0           const char *seg = kstr + seg_start;
1439 0           STRLEN slen = off - seg_start;
1440 0 0         if (off == klen) {
1441 0           SV **leaf = hv_fetch(row_hv, kstr, (I32)klen, 0);
1442 0 0         if (!leaf) { conflict = 1; break; }
1443 0           SV *val = SvREFCNT_inc(*leaf);
1444 0 0         if (!hv_store(cur, seg, (I32)slen, val, 0)) {
1445 0           SvREFCNT_dec(val);
1446 0           conflict = 1;
1447             }
1448             } else {
1449 0           SV **next = hv_fetch(cur, seg, (I32)slen, 0);
1450 0 0         if (next && SvROK(*next) && SvTYPE(SvRV(*next)) == SVt_PVHV) {
    0          
    0          
1451 0           cur = (HV*)SvRV(*next);
1452 0 0         } else if (next) {
1453 0           conflict = 1; break;
1454             } else {
1455 0           HV *new_hv = newHV();
1456 0           hv_store(cur, seg, (I32)slen,
1457             newRV_noinc((SV*)new_hv), 0);
1458 0           cur = new_hv;
1459             }
1460             }
1461 0           seg_start = off + 1;
1462             }
1463             }
1464 0 0         if (!conflict)
1465 0           (void)hv_delete(row_hv, kstr, (I32)klen, G_DISCARD);
1466             }
1467             }
1468              
1469 0 0         if (jpe) Safefree(jpe);
1470 0 0         if (path_kinds_buf) Safefree(path_kinds_buf);
1471 0 0         if (path_kind_count) Safefree(path_kind_count);
1472 0           return out;
1473              
1474 0           json_fail:
1475 0 0         if (pending_inner) SvREFCNT_dec((SV*)pending_inner);
1476 0 0         if (offs) Safefree(offs);
1477 0 0         if (var_avs) {
1478             int sx;
1479 0 0         for (sx = 0; sx < wire_slots_cleanup; sx++)
1480 0 0         if (var_avs[sx]) SvREFCNT_dec((SV*)var_avs[sx]);
1481 0           Safefree(var_avs);
1482             }
1483 0 0         if (jpe) Safefree(jpe);
1484 0 0         if (path_kinds_buf) Safefree(path_kinds_buf);
1485 0 0         if (path_kind_count) Safefree(path_kind_count);
1486 0 0         for (i = 0; i < nrows; i++) if (out[i]) SvREFCNT_dec(out[i]);
    0          
1487 0           *pos = saved;
1488 0 0         if (decode_err) *decode_err = 1;
1489 0           goto fail;
1490             }
1491              
1492 0 0         if (ct->code == CT_MAP) {
1493 0 0         if (ct->num_inners != 2) { if (decode_err) *decode_err = 1; goto fail; }
    0          
1494             /* Map(K,V): wire format same as Array — offsets + keys column + values column */
1495             uint64_t *offsets, total, prev;
1496             SV **keys_col, **vals_col;
1497              
1498 0 0         if (*pos > len || nrows > (len - *pos) / 8) goto fail;
    0          
1499 0 0         Newx(offsets, nrows, uint64_t);
1500 0 0         Copy(buf + *pos, offsets, nrows, uint64_t);
1501 0           *pos += nrows * 8;
1502              
1503             /* validate offset monotonicity */
1504 0           prev = 0;
1505 0 0         for (i = 0; i < nrows; i++) {
1506 0 0         if (offsets[i] < prev) { Safefree(offsets); goto fail; }
1507 0           prev = offsets[i];
1508             }
1509              
1510 0 0         total = nrows > 0 ? offsets[nrows - 1] : 0;
1511              
1512 0           keys_col = decode_column(buf, len, pos, total, ct->inners[0], decode_err, decode_flags);
1513 0 0         if (!keys_col) { Safefree(offsets); goto fail; }
1514              
1515 0           vals_col = decode_column(buf, len, pos, total, ct->inners[1], decode_err, decode_flags);
1516 0 0         if (!vals_col) {
1517 0 0         for (i = 0; i < total; i++) SvREFCNT_dec(keys_col[i]);
1518 0           Safefree(keys_col);
1519 0           Safefree(offsets);
1520 0           goto fail;
1521             }
1522              
1523 0           prev = 0;
1524 0 0         for (i = 0; i < nrows; i++) {
1525 0           uint64_t count = offsets[i] - prev;
1526 0           HV *hv = newHV();
1527             uint64_t j;
1528 0 0         for (j = 0; j < count; j++) {
1529             STRLEN klen;
1530 0           const char *kstr = SvPV(keys_col[prev + j], klen);
1531             {
1532 0           SV *val_sv = SvREFCNT_inc(vals_col[prev + j]);
1533 0 0         if (!hv_store(hv, kstr, klen, val_sv, 0))
1534 0           SvREFCNT_dec(val_sv);
1535             }
1536             }
1537 0           out[i] = newRV_noinc((SV*)hv);
1538 0           prev = offsets[i];
1539             }
1540              
1541 0 0         for (i = 0; i < total; i++) {
1542 0           SvREFCNT_dec(keys_col[i]);
1543 0           SvREFCNT_dec(vals_col[i]);
1544             }
1545 0           Safefree(keys_col);
1546 0           Safefree(vals_col);
1547 0           Safefree(offsets);
1548 0           return out;
1549             }
1550              
1551             /* Fixed-width types */
1552 0           fsz = col_type_fixed_size(ct);
1553 0 0         if (ct->code == CT_FIXEDSTRING && fsz == 0) {
    0          
1554             /* FixedString(0): 0 bytes per row, produce empty strings */
1555 0 0         for (i = 0; i < nrows; i++)
1556 0           out[i] = newSVpvn("", 0);
1557 0           return out;
1558             }
1559 0 0         if (fsz > 0) {
1560 0           char *saved_tz = NULL;
1561 0           int tz_set = 0;
1562              
1563 0 0         if (*pos > len || nrows > (len - *pos) / fsz) goto fail;
    0          
1564              
1565             /* Set timezone for DateTime/DateTime64 columns with explicit tz */
1566 0 0         if (ct->tz && (decode_flags & DECODE_DT_STR) &&
    0          
1567 0 0         (ct->code == CT_DATETIME || ct->code == CT_DATETIME64)) {
    0          
1568 0           saved_tz = set_tz(ct->tz);
1569 0           tz_set = 1;
1570             }
1571              
1572 0 0         for (i = 0; i < nrows; i++) {
1573 0           const char *p = buf + *pos + i * fsz;
1574 0           switch (ct->code) {
1575 0           case CT_INT8: out[i] = newSViv(*(int8_t*)p); break;
1576 0           case CT_INT16: { int16_t v; memcpy(&v, p, 2); out[i] = newSViv(v); break; }
1577 0           case CT_INT32: { int32_t v; memcpy(&v, p, 4); out[i] = newSViv(v); break; }
1578 0           case CT_INT64: { int64_t v; memcpy(&v, p, 8); out[i] = newSViv((IV)v); break; }
1579 0           case CT_UINT8: case CT_BOOL:
1580 0           out[i] = newSVuv(*(uint8_t*)p); break;
1581 0           case CT_UINT16: { uint16_t v; memcpy(&v, p, 2); out[i] = newSVuv(v); break; }
1582 0           case CT_UINT32: { uint32_t v; memcpy(&v, p, 4); out[i] = newSVuv(v); break; }
1583 0           case CT_UINT64: { uint64_t v; memcpy(&v, p, 8); out[i] = newSVuv((UV)v); break; }
1584 0           case CT_FLOAT32: { float v; memcpy(&v, p, 4); out[i] = newSVnv(v); break; }
1585 0           case CT_FLOAT64: { double v; memcpy(&v, p, 8); out[i] = newSVnv(v); break; }
1586 0           case CT_ENUM8:
1587 0 0         if (decode_flags & DECODE_ENUM_STR)
1588 0           out[i] = enum_label_for_code(ct->type_str, ct->type_str_len, *(int8_t*)p);
1589             else
1590 0           out[i] = newSViv(*(int8_t*)p);
1591 0           break;
1592 0           case CT_ENUM16: {
1593 0           int16_t v; memcpy(&v, p, 2);
1594 0 0         if (decode_flags & DECODE_ENUM_STR)
1595 0           out[i] = enum_label_for_code(ct->type_str, ct->type_str_len, v);
1596             else
1597 0           out[i] = newSViv(v);
1598 0           break;
1599             }
1600 0           case CT_DATE: {
1601 0           uint16_t v; memcpy(&v, p, 2);
1602 0 0         if (decode_flags & DECODE_DT_STR)
1603 0           out[i] = days_to_date_sv((int32_t)v);
1604             else
1605 0           out[i] = newSVuv(v);
1606 0           break;
1607             }
1608 0           case CT_DATE32: {
1609 0           int32_t v; memcpy(&v, p, 4);
1610 0 0         if (decode_flags & DECODE_DT_STR)
1611 0           out[i] = days_to_date_sv(v);
1612             else
1613 0           out[i] = newSViv(v);
1614 0           break;
1615             }
1616 0           case CT_DATETIME: {
1617 0           uint32_t v; memcpy(&v, p, 4);
1618 0 0         if (decode_flags & DECODE_DT_STR)
1619 0           out[i] = tz_set ? epoch_to_datetime_sv_local(v)
1620 0 0         : epoch_to_datetime_sv(v);
1621             else
1622 0           out[i] = newSVuv(v);
1623 0           break;
1624             }
1625 0           case CT_DATETIME64: {
1626 0           int64_t v; memcpy(&v, p, 8);
1627 0 0         if (decode_flags & DECODE_DT_STR)
1628 0           out[i] = dt64_to_datetime_sv_ex(v, ct->param, tz_set);
1629             else
1630 0           out[i] = newSViv((IV)v);
1631 0           break;
1632             }
1633 0           case CT_DECIMAL32: {
1634 0           int32_t v; memcpy(&v, p, 4);
1635 0 0         if (decode_flags & DECODE_DEC_SCALE)
1636 0           out[i] = newSVnv((double)v / pow10_int(ct->param));
1637             else
1638 0           out[i] = newSViv(v);
1639 0           break;
1640             }
1641 0           case CT_DECIMAL64: {
1642 0           int64_t v; memcpy(&v, p, 8);
1643 0 0         if (decode_flags & DECODE_DEC_SCALE)
1644 0           out[i] = newSVnv((double)v / pow10_int(ct->param));
1645             else
1646 0           out[i] = newSViv((IV)v);
1647 0           break;
1648             }
1649 0           case CT_DECIMAL128: {
1650             #ifdef __SIZEOF_INT128__
1651 0 0         if (decode_flags & DECODE_DEC_SCALE) {
1652             __int128 sv128;
1653 0           memcpy(&sv128, p, 16);
1654             /* Use long double for Decimal128 to preserve more precision */
1655 0           out[i] = newSVnv((NV)((long double)sv128 / (long double)pow10_int(ct->param)));
1656             } else {
1657 0           out[i] = int128_to_sv(p, 1);
1658             }
1659             #else
1660             out[i] = newSVpvn(p, 16);
1661             #endif
1662 0           break;
1663             }
1664 0           case CT_DECIMAL256: {
1665             /* No native int256; deliver raw 32 bytes (LE, signed) so
1666             * users can pass them to e.g. Math::BigInt. */
1667 0           out[i] = newSVpvn(p, 32);
1668 0           break;
1669             }
1670 0           case CT_BFLOAT16: {
1671             /* Top 16 bits of a Float32 (BE assembled into LE Float32). */
1672 0           uint8_t buf[4] = {0, 0, ((const uint8_t*)p)[0], ((const uint8_t*)p)[1]};
1673             float v;
1674 0           memcpy(&v, buf, 4);
1675 0           out[i] = newSVnv(v);
1676 0           break;
1677             }
1678 0           case CT_UUID: {
1679             /* UUID: two LE UInt64 halves, each reversed for display */
1680             char ustr[37];
1681 0           const unsigned char *u = (const unsigned char *)p;
1682 0           snprintf(ustr, sizeof(ustr),
1683             "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x",
1684 0           u[7],u[6],u[5],u[4],u[3],u[2],u[1],u[0],
1685 0           u[15],u[14],u[13],u[12],u[11],u[10],u[9],u[8]);
1686 0           out[i] = newSVpvn(ustr, 36);
1687 0           break;
1688             }
1689 0           case CT_IPV4: {
1690             /* UInt32 LE, MSB is first octet */
1691             uint32_t v;
1692             struct in_addr addr;
1693             char abuf[INET_ADDRSTRLEN];
1694 0           memcpy(&v, p, 4);
1695 0           addr.s_addr = htonl(v);
1696 0           inet_ntop(AF_INET, &addr, abuf, sizeof(abuf));
1697 0           out[i] = newSVpv(abuf, 0);
1698 0           break;
1699             }
1700 0           case CT_IPV6: {
1701             /* 16 bytes in network byte order */
1702             char abuf[INET6_ADDRSTRLEN];
1703 0           inet_ntop(AF_INET6, p, abuf, sizeof(abuf));
1704 0           out[i] = newSVpv(abuf, 0);
1705 0           break;
1706             }
1707 0           case CT_INT128: {
1708             #ifdef __SIZEOF_INT128__
1709 0           out[i] = int128_to_sv(p, 1);
1710             #else
1711             out[i] = newSVpvn(p, 16);
1712             #endif
1713 0           break;
1714             }
1715 0           case CT_UINT128: {
1716             #ifdef __SIZEOF_INT128__
1717 0           out[i] = int128_to_sv(p, 0);
1718             #else
1719             out[i] = newSVpvn(p, 16);
1720             #endif
1721 0           break;
1722             }
1723 0           case CT_INT256:
1724 0           out[i] = int256_to_sv(p, 1); break;
1725 0           case CT_UINT256:
1726 0           out[i] = int256_to_sv(p, 0); break;
1727 0           case CT_FIXEDSTRING: default:
1728 0           out[i] = newSVpvn(p, fsz); break;
1729             }
1730             }
1731 0 0         if (tz_set) restore_tz(saved_tz);
1732 0           *pos += nrows * fsz;
1733 0           return out;
1734             }
1735              
1736             /* CT_UNKNOWN: try reading as String */
1737 0 0         for (i = 0; i < nrows; i++) {
1738             const char *s;
1739             size_t slen;
1740 0 0         if (read_native_string_ref(buf, len, pos, &s, &slen) <= 0) {
1741             uint64_t j;
1742 0 0         for (j = 0; j < i; j++) SvREFCNT_dec(out[j]);
1743 0           goto fail;
1744             }
1745 0           out[i] = newSVpvn(s, slen);
1746             }
1747 0           return out;
1748              
1749 0           fail:
1750 0           Safefree(out);
1751 0           return NULL;
1752             }
1753              
1754             /* --- Native protocol column encoder (for INSERT) --- */
1755              
1756             /* Append a NUL-terminated copy of `s` (truncated to <64 bytes) into `tmp`.
1757             * Returns the truncated length. Used to give inet_pton a NUL-terminated
1758             * input regardless of the source's storage form. */
1759 0           static size_t copy_for_inet_pton(char tmp[64], const char *s, size_t slen) {
1760 0           size_t cplen = slen < 63 ? slen : 63;
1761 0           memcpy(tmp, s, cplen);
1762 0           tmp[cplen] = '\0';
1763 0           return cplen;
1764             }
1765              
1766             /* Parse an IPv4 address text into a host-order UInt32. Returns 0 on failure
1767             * (matches existing semantics: invalid inputs serialize as 0.0.0.0). */
1768 0           static uint32_t parse_ipv4_to_u32(const char *s, size_t slen) {
1769             struct in_addr addr;
1770             char tmp[64];
1771 0           copy_for_inet_pton(tmp, s, slen);
1772 0 0         if (inet_pton(AF_INET, tmp, &addr) == 1)
1773 0           return ntohl(addr.s_addr);
1774 0           return 0;
1775             }
1776              
1777             /* Parse an IPv6 address text into 16 network-order bytes. Zero-fills on
1778             * failure (matches existing semantics: invalid inputs serialize as ::). */
1779 0           static void parse_ipv6_to_bytes(const char *s, size_t slen, unsigned char addr[16]) {
1780             char tmp[64];
1781 0           copy_for_inet_pton(tmp, s, slen);
1782 0           memset(addr, 0, 16);
1783 0           inet_pton(AF_INET6, tmp, addr);
1784 0           }
1785              
1786             /* Write the LowCardinality version + serialization-type + num_keys prefix
1787             * for a trivial 1:1 dictionary encoding (every value becomes its own dict
1788             * entry). Returns idx_size via *out_idx_size for the caller's index-write loop. */
1789 0           static void lc_write_prefix(native_buf_t *b, uint64_t nrows, size_t *out_idx_size) {
1790             int key_type;
1791             size_t idx_size;
1792 0           uint64_t ser_type, version = 1, nk = nrows;
1793 0 0         if (nrows <= 0xFF) { key_type = 0; idx_size = 1; }
1794 0 0         else if (nrows <= 0xFFFF) { key_type = 1; idx_size = 2; }
1795 0           else { key_type = 2; idx_size = 4; }
1796             /* HasAdditionalKeys | NeedUpdateDictionary */
1797 0           ser_type = (uint64_t)key_type | (1ULL << 9) | (1ULL << 10);
1798 0           nbuf_append(b, (const char *)&version, 8);
1799 0           nbuf_append(b, (const char *)&ser_type, 8);
1800 0           nbuf_append(b, (const char *)&nk, 8);
1801 0           *out_idx_size = idx_size;
1802 0           }
1803              
1804             /* Write num_indices + identity-mapped indices [0..nrows-1] for a trivial
1805             * LowCardinality 1:1 dictionary. */
1806 0           static void lc_write_indices(native_buf_t *b, uint64_t nrows, size_t idx_size) {
1807 0           uint64_t i, ni = nrows;
1808 0           nbuf_append(b, (const char *)&ni, 8);
1809 0 0         for (i = 0; i < nrows; i++) {
1810 0 0         if (idx_size == 1) { uint8_t idx = (uint8_t)i; nbuf_append(b, (const char *)&idx, 1); }
1811 0 0         else if (idx_size == 2) { uint16_t idx = (uint16_t)i; nbuf_append(b, (const char *)&idx, 2); }
1812 0           else { uint32_t idx = (uint32_t)i; nbuf_append(b, (const char *)&idx, 4); }
1813             }
1814 0           }
1815              
1816             /* Parse "YYYY-MM-DD HH:MM:SS[.fff...]" into a DateTime64 scaled int64
1817             * (epoch * 10^precision + fractional). Caller has already verified that
1818             * `s` looks like a date prefix (len >= 10, s[4] == '-'). */
1819 0           static int64_t parse_datetime64_to_scaled(const char *s, size_t len, int precision) {
1820 0           uint32_t epoch = datetime_string_to_epoch(s, len);
1821 0           int64_t v = (int64_t)epoch;
1822             int sc;
1823 0 0         for (sc = 0; sc < precision; sc++) v *= 10;
1824 0 0         if (len >= 20 && s[19] == '.') {
    0          
1825 0           const char *fp = s + 20;
1826 0           const char *fe = s + len;
1827 0           int64_t frac = 0;
1828 0           int digits = 0;
1829 0 0         while (fp < fe && digits < precision) {
    0          
1830 0           frac = frac * 10 + (*fp - '0');
1831 0           fp++;
1832 0           digits++;
1833             }
1834 0 0         while (digits < precision) { frac *= 10; digits++; }
1835 0           v += frac;
1836             }
1837 0           return v;
1838             }
1839              
1840             /* Parse a decimal text representation ("[+-]?digits[.digits]") into a
1841             * scaled int64 raw value. `scale` is the number of fractional digits the
1842             * target Decimal type requires (extra fractional input is truncated). */
1843 0           static int64_t parse_decimal_to_raw(const char *p, int scale) {
1844 0           int neg = 0, frac_digits = 0, s;
1845             /* Accumulate unsigned: a value whose digit count exceeds the target
1846             * Decimal's precision would overflow a signed int64 (UB). Unsigned
1847             * arithmetic wraps with defined behavior; valid in-range input (total
1848             * significant digits <= 18 for Decimal64) is unaffected. Out-of-range
1849             * input yields a wrapped value, which the server then rejects. */
1850 0           uint64_t integer_part = 0, frac_part = 0, raw;
1851 0 0         if (*p == '-') { neg = 1; p++; }
1852 0 0         else if (*p == '+') p++;
1853 0 0         while (*p >= '0' && *p <= '9') { integer_part = integer_part * 10 + (uint64_t)(*p - '0'); p++; }
    0          
1854 0 0         if (*p == '.') {
1855 0           p++;
1856 0 0         while (*p >= '0' && *p <= '9' && frac_digits < scale) {
    0          
    0          
1857 0           frac_part = frac_part * 10 + (uint64_t)(*p - '0');
1858 0           p++;
1859 0           frac_digits++;
1860             }
1861             }
1862 0 0         for (s = frac_digits; s < scale; s++) frac_part *= 10;
1863 0 0         for (s = 0; s < scale; s++) integer_part *= 10;
1864 0           raw = integer_part + frac_part;
1865 0 0         return (int64_t)(neg ? (uint64_t)0 - raw : raw);
1866             }
1867              
1868             /* Parse a UUID string into 16 LE bytes (two LE UInt64 halves of the 128-bit
1869             * value). Zero-fills the output if the input is shorter than 36 bytes. */
1870 0           static void parse_uuid_to_le_bytes(const char *s, size_t slen, unsigned char ubytes[16]) {
1871 0 0         if (slen < 36) {
1872 0           memset(ubytes, 0, 16);
1873 0           return;
1874             }
1875             {
1876 0           unsigned char raw[16] = {0};
1877 0           int k = 0, j;
1878 0 0         for (j = 0; j < (int)slen && k < 32; j++) {
    0          
1879 0           char c = s[j];
1880             unsigned char nibble;
1881 0 0         if (c == '-') continue;
1882 0 0         if (c >= '0' && c <= '9') nibble = c - '0';
    0          
1883 0 0         else if (c >= 'a' && c <= 'f') nibble = 10 + c - 'a';
    0          
1884 0 0         else if (c >= 'A' && c <= 'F') nibble = 10 + c - 'A';
    0          
1885 0           else nibble = 0;
1886 0 0         if (k % 2 == 0) raw[k/2] = nibble << 4;
1887 0           else raw[k/2] |= nibble;
1888 0           k++;
1889             }
1890             /* Reverse each 8-byte half for LE storage */
1891 0 0         for (k = 0; k < 8; k++) ubytes[k] = raw[7 - k];
1892 0 0         for (k = 0; k < 8; k++) ubytes[8 + k] = raw[15 - k];
1893             }
1894             }
1895              
1896             /* TSV unescape: \\ → \, \n → newline, \t → tab, \0 → null byte */
1897 0           static size_t tsv_unescape(const char *src, size_t src_len, char *dst) {
1898 0           size_t i, j = 0;
1899 0 0         for (i = 0; i < src_len; i++) {
1900 0 0         if (src[i] == '\\' && i + 1 < src_len) {
    0          
1901 0           switch (src[i+1]) {
1902 0           case '\\': dst[j++] = '\\'; i++; break;
1903 0           case 'n': dst[j++] = '\n'; i++; break;
1904 0           case 't': dst[j++] = '\t'; i++; break;
1905 0           case '0': dst[j++] = '\0'; i++; break;
1906 0           case '\'': dst[j++] = '\''; i++; break;
1907 0           case 'b': dst[j++] = '\b'; i++; break;
1908 0           case 'r': dst[j++] = '\r'; i++; break;
1909 0           case 'a': dst[j++] = '\a'; i++; break;
1910 0           case 'f': dst[j++] = '\f'; i++; break;
1911 0           default: dst[j++] = src[i]; break;
1912             }
1913             } else {
1914 0           dst[j++] = src[i];
1915             }
1916             }
1917 0           return j;
1918             }
1919              
1920 0           static int is_tsv_null(const char *s, size_t len) {
1921 0 0         return len == 2 && s[0] == '\\' && s[1] == 'N';
    0          
    0          
1922             }
1923              
1924             /* TSV escape: inverse of tsv_unescape — appends escaped bytes to buffer */
1925 0           static void tsv_escape(native_buf_t *b, const char *s, size_t len) {
1926 0           size_t i, start = 0;
1927 0 0         for (i = 0; i < len; i++) {
1928 0           char esc = 0;
1929 0           switch (s[i]) {
1930 0           case '\\': esc = '\\'; break;
1931 0           case '\t': esc = 't'; break;
1932 0           case '\n': esc = 'n'; break;
1933 0           case '\0': esc = '0'; break;
1934 0           case '\b': esc = 'b'; break;
1935 0           case '\r': esc = 'r'; break;
1936 0           case '\a': esc = 'a'; break;
1937 0           case '\f': esc = 'f'; break;
1938             }
1939 0 0         if (esc) {
1940 0 0         if (i > start)
1941 0           nbuf_append(b, s + start, i - start);
1942 0           nbuf_grow(b, 2);
1943 0           b->data[b->len++] = '\\';
1944 0           b->data[b->len++] = esc;
1945 0           start = i + 1;
1946             }
1947             }
1948 0 0         if (start < len)
1949 0           nbuf_append(b, s + start, len - start);
1950 0           }
1951              
1952             /* Serialize an AV of AVs to TabSeparated format for HTTP INSERT.
1953             * Returns malloc'd buffer; caller must Safefree(). */
1954 0           static char* serialize_av_to_tsv(pTHX_ AV *rows, size_t *out_len) {
1955             native_buf_t b;
1956 0           SSize_t nrows = av_len(rows) + 1;
1957             SSize_t r;
1958              
1959 0           nbuf_init(&b);
1960              
1961 0 0         for (r = 0; r < nrows; r++) {
1962 0           SV **row_svp = av_fetch(rows, r, 0);
1963             AV *row;
1964             SSize_t ncols, c;
1965              
1966 0 0         if (!row_svp || !SvROK(*row_svp) ||
    0          
1967 0 0         SvTYPE(SvRV(*row_svp)) != SVt_PVAV) {
1968 0           Safefree(b.data);
1969 0           croak("insert data: row %" IVdf " is not an ARRAY ref", (IV)r);
1970             }
1971 0           row = (AV *)SvRV(*row_svp);
1972 0           ncols = av_len(row) + 1;
1973              
1974 0 0         for (c = 0; c < ncols; c++) {
1975 0           SV **val_svp = av_fetch(row, c, 0);
1976 0 0         if (c > 0)
1977 0           nbuf_u8(&b, '\t');
1978              
1979 0 0         if (!val_svp || !SvOK(*val_svp)) {
    0          
1980 0           nbuf_append(&b, "\\N", 2);
1981 0 0         } else if (SvROK(*val_svp)) {
1982             /* Nested AV/HV refs (Array/Tuple/Map columns) cannot be
1983             * round-tripped through TSV without column-type info from
1984             * the server, which the HTTP path doesn't have. Fail loudly
1985             * rather than silently sending ARRAY(0x...) garbage. */
1986 0           Safefree(b.data);
1987 0           croak("insert data: row %" IVdf " column %" IVdf " is a "
1988             "reference; nested Array/Tuple/Map columns require "
1989             "the native protocol", (IV)r, (IV)c);
1990             } else {
1991             STRLEN vlen;
1992 0           const char *v = SvPV(*val_svp, vlen);
1993 0           tsv_escape(&b, v, vlen);
1994             }
1995             }
1996 0           nbuf_u8(&b, '\n');
1997             }
1998              
1999 0           *out_len = b.len;
2000 0           return b.data;
2001             }
2002              
2003             /*
2004             * Encode a column of text values into native binary format.
2005             * Returns 1 on success, 0 if type is unsupported (caller falls back to inline SQL).
2006             */
2007 0           static int encode_column_text(native_buf_t *b,
2008             const char **values, size_t *value_lens,
2009             uint64_t nrows, col_type_t *ct) {
2010             uint64_t i;
2011              
2012 0           switch (ct->code) {
2013 0           case CT_INT8: case CT_ENUM8: {
2014 0 0         for (i = 0; i < nrows; i++) {
2015 0           int8_t v = (int8_t)strtol(values[i], NULL, 10);
2016 0           nbuf_append(b, (const char *)&v, 1);
2017             }
2018 0           return 1;
2019             }
2020 0           case CT_INT16: case CT_ENUM16: {
2021 0 0         for (i = 0; i < nrows; i++) {
2022 0           int16_t v = (int16_t)strtol(values[i], NULL, 10);
2023 0           nbuf_append(b, (const char *)&v, 2);
2024             }
2025 0           return 1;
2026             }
2027 0           case CT_INT32: case CT_DATE32: {
2028 0 0         for (i = 0; i < nrows; i++) {
2029             int32_t v;
2030 0 0         if (ct->code == CT_DATE32 && value_lens[i] >= 10
    0          
2031 0 0         && values[i][4] == '-')
2032 0           v = date_string_to_days(values[i], value_lens[i]);
2033             else
2034 0           v = (int32_t)strtol(values[i], NULL, 10);
2035 0           nbuf_append(b, (const char *)&v, 4);
2036             }
2037 0           return 1;
2038             }
2039 0           case CT_INT64: {
2040 0 0         for (i = 0; i < nrows; i++) {
2041 0           int64_t v = (int64_t)strtoll(values[i], NULL, 10);
2042 0           nbuf_append(b, (const char *)&v, 8);
2043             }
2044 0           return 1;
2045             }
2046 0           case CT_UINT8: case CT_BOOL: {
2047 0 0         for (i = 0; i < nrows; i++) {
2048 0           uint8_t v = (uint8_t)strtoul(values[i], NULL, 10);
2049 0           nbuf_append(b, (const char *)&v, 1);
2050             }
2051 0           return 1;
2052             }
2053 0           case CT_UINT16: case CT_DATE: {
2054 0 0         for (i = 0; i < nrows; i++) {
2055             uint16_t v;
2056 0 0         if (ct->code == CT_DATE && value_lens[i] >= 10
    0          
2057 0 0         && values[i][4] == '-')
2058 0           v = (uint16_t)date_string_to_days(values[i], value_lens[i]);
2059             else
2060 0           v = (uint16_t)strtoul(values[i], NULL, 10);
2061 0           nbuf_append(b, (const char *)&v, 2);
2062             }
2063 0           return 1;
2064             }
2065 0           case CT_UINT32: case CT_DATETIME: {
2066 0 0         for (i = 0; i < nrows; i++) {
2067             uint32_t v;
2068 0 0         if (ct->code == CT_DATETIME && value_lens[i] >= 10
    0          
2069 0 0         && values[i][4] == '-')
2070 0           v = datetime_string_to_epoch(values[i], value_lens[i]);
2071             else
2072 0           v = (uint32_t)strtoul(values[i], NULL, 10);
2073 0           nbuf_append(b, (const char *)&v, 4);
2074             }
2075 0           return 1;
2076             }
2077 0           case CT_UINT64: {
2078 0 0         for (i = 0; i < nrows; i++) {
2079 0           uint64_t v = (uint64_t)strtoull(values[i], NULL, 10);
2080 0           nbuf_append(b, (const char *)&v, 8);
2081             }
2082 0           return 1;
2083             }
2084 0           case CT_FLOAT32: {
2085 0 0         for (i = 0; i < nrows; i++) {
2086 0           float v = strtof(values[i], NULL);
2087 0           nbuf_append(b, (const char *)&v, 4);
2088             }
2089 0           return 1;
2090             }
2091 0           case CT_FLOAT64: {
2092 0 0         for (i = 0; i < nrows; i++) {
2093 0           double v = strtod(values[i], NULL);
2094 0           nbuf_append(b, (const char *)&v, 8);
2095             }
2096 0           return 1;
2097             }
2098 0           case CT_DATETIME64: {
2099 0 0         for (i = 0; i < nrows; i++) {
2100             int64_t v;
2101 0 0         if (value_lens[i] >= 10 && values[i][4] == '-')
    0          
2102 0           v = parse_datetime64_to_scaled(values[i], value_lens[i], ct->param);
2103             else
2104 0           v = (int64_t)strtoll(values[i], NULL, 10);
2105 0           nbuf_append(b, (const char *)&v, 8);
2106             }
2107 0           return 1;
2108             }
2109 0           case CT_DECIMAL32: {
2110 0 0         for (i = 0; i < nrows; i++) {
2111 0           int32_t v = (int32_t)parse_decimal_to_raw(values[i], ct->param);
2112 0           nbuf_append(b, (const char *)&v, 4);
2113             }
2114 0           return 1;
2115             }
2116 0           case CT_DECIMAL64: {
2117 0 0         for (i = 0; i < nrows; i++) {
2118 0           int64_t v = parse_decimal_to_raw(values[i], ct->param);
2119 0           nbuf_append(b, (const char *)&v, 8);
2120             }
2121 0           return 1;
2122             }
2123 0           case CT_DECIMAL256: {
2124             /* TSV input is the raw 32-byte LE buffer (same as encode_column_sv);
2125             * unescape backslash sequences first if present. tsv_unescape
2126             * writes up to value_lens[i] bytes, so size the temp buffer to
2127             * the input length (heap-alloc to avoid stack overflow on
2128             * malformed/oversize TSV fields). */
2129 0 0         for (i = 0; i < nrows; i++) {
2130 0           char buf[32] = {0};
2131 0 0         if (memchr(values[i], '\\', value_lens[i])) {
2132             char *tmp;
2133 0 0         Newx(tmp, value_lens[i] ? value_lens[i] : 1, char);
2134 0           size_t ulen = tsv_unescape(values[i], value_lens[i], tmp);
2135 0           memcpy(buf, tmp, ulen < 32 ? ulen : 32);
2136 0           Safefree(tmp);
2137             } else {
2138 0           memcpy(buf, values[i], value_lens[i] < 32 ? value_lens[i] : 32);
2139             }
2140 0           nbuf_append(b, buf, 32);
2141             }
2142 0           return 1;
2143             }
2144 0           case CT_BFLOAT16: {
2145 0 0         for (i = 0; i < nrows; i++) {
2146 0           float fv = (float)strtod(values[i], NULL);
2147 0           uint8_t b32[4]; memcpy(b32, &fv, 4);
2148 0           nbuf_append(b, (const char *)&b32[2], 2);
2149             }
2150 0           return 1;
2151             }
2152 0           case CT_STRING: {
2153 0 0         for (i = 0; i < nrows; i++) {
2154 0 0         if (memchr(values[i], '\\', value_lens[i])) {
2155             char *tmp;
2156             size_t ulen;
2157 0           Newx(tmp, value_lens[i], char);
2158 0           ulen = tsv_unescape(values[i], value_lens[i], tmp);
2159 0           nbuf_string(b, tmp, ulen);
2160 0           Safefree(tmp);
2161             } else {
2162 0           nbuf_string(b, values[i], value_lens[i]);
2163             }
2164             }
2165 0           return 1;
2166             }
2167 0           case CT_FIXEDSTRING: {
2168 0           size_t fsz = (size_t)ct->param;
2169 0 0         for (i = 0; i < nrows; i++) {
2170 0 0         if (memchr(values[i], '\\', value_lens[i])) {
2171             char *tmp;
2172 0           size_t tmp_sz = value_lens[i] > fsz ? value_lens[i] : fsz;
2173 0           Newxz(tmp, tmp_sz, char);
2174 0           (void)tsv_unescape(values[i], value_lens[i], tmp);
2175 0           nbuf_append(b, tmp, fsz);
2176 0           Safefree(tmp);
2177             } else {
2178 0           nbuf_grow(b, fsz);
2179             {
2180 0           size_t cplen = value_lens[i] < fsz ? value_lens[i] : fsz;
2181 0           memcpy(b->data + b->len, values[i], cplen);
2182 0 0         if (cplen < fsz)
2183 0           memset(b->data + b->len + cplen, 0, fsz - cplen);
2184             }
2185 0           b->len += fsz;
2186             }
2187             }
2188 0           return 1;
2189             }
2190 0           case CT_UUID: {
2191 0 0         for (i = 0; i < nrows; i++) {
2192             unsigned char ubytes[16];
2193 0           parse_uuid_to_le_bytes(values[i], value_lens[i], ubytes);
2194 0           nbuf_append(b, (const char *)ubytes, 16);
2195             }
2196 0           return 1;
2197             }
2198 0           case CT_IPV4: {
2199 0 0         for (i = 0; i < nrows; i++) {
2200 0           uint32_t v = parse_ipv4_to_u32(values[i], value_lens[i]);
2201 0           nbuf_append(b, (const char *)&v, 4);
2202             }
2203 0           return 1;
2204             }
2205 0           case CT_IPV6: {
2206 0 0         for (i = 0; i < nrows; i++) {
2207             unsigned char addr[16];
2208 0           parse_ipv6_to_bytes(values[i], value_lens[i], addr);
2209 0           nbuf_append(b, (const char *)addr, 16);
2210             }
2211 0           return 1;
2212             }
2213 0           case CT_NULLABLE: {
2214             /* null bitmap + inner column */
2215             uint8_t *nulls;
2216             const char **inner_vals;
2217             size_t *inner_lens;
2218             static const char zero_str[] = "0";
2219             static const char empty_str[] = "";
2220              
2221 0           Newx(nulls, nrows, uint8_t);
2222 0 0         Newxz(inner_vals, nrows, const char *);
2223 0 0         Newx(inner_lens, nrows, size_t);
2224              
2225 0 0         for (i = 0; i < nrows; i++) {
2226 0 0         if (is_tsv_null(values[i], value_lens[i])) {
2227 0           nulls[i] = 1;
2228             /* placeholder for null — use zero/empty depending on inner type */
2229 0 0         if (ct->inner->code == CT_STRING || ct->inner->code == CT_FIXEDSTRING) {
    0          
2230 0           inner_vals[i] = empty_str;
2231 0           inner_lens[i] = 0;
2232             } else {
2233 0           inner_vals[i] = zero_str;
2234 0           inner_lens[i] = 1;
2235             }
2236             } else {
2237 0           nulls[i] = 0;
2238 0           inner_vals[i] = values[i];
2239 0           inner_lens[i] = value_lens[i];
2240             }
2241             }
2242              
2243 0           nbuf_append(b, (const char *)nulls, nrows);
2244             {
2245 0           int rc = encode_column_text(b, inner_vals, inner_lens, nrows, ct->inner);
2246 0           Safefree(nulls);
2247 0           Safefree(inner_vals);
2248 0           Safefree(inner_lens);
2249 0           return rc;
2250             }
2251             }
2252 0           case CT_LOWCARDINALITY: {
2253             /* Trivial 1:1 dictionary: each value is its own dict entry. */
2254             size_t idx_size;
2255             native_buf_t dict_buf;
2256             int rc;
2257              
2258 0           lc_write_prefix(b, nrows, &idx_size);
2259 0           nbuf_init(&dict_buf);
2260 0           rc = encode_column_text(&dict_buf, values, value_lens, nrows, ct->inner);
2261 0 0         if (!rc) { Safefree(dict_buf.data); return 0; }
2262 0           nbuf_append(b, dict_buf.data, dict_buf.len);
2263 0           Safefree(dict_buf.data);
2264 0           lc_write_indices(b, nrows, idx_size);
2265 0           return 1;
2266             }
2267 0           default:
2268 0           return 0; /* unsupported type — fall back to inline SQL */
2269             }
2270             }
2271              
2272             /*
2273             * Encode a column of Perl SV values into native binary format.
2274             * Like encode_column_text() but takes SVs directly — no TSV parsing/unescaping.
2275             * Returns 1 on success, 0 if type is unsupported.
2276             */
2277 0           static int encode_column_sv(pTHX_ native_buf_t *b,
2278             SV **values, uint64_t nrows,
2279             col_type_t *ct) {
2280             uint64_t i;
2281              
2282 0           switch (ct->code) {
2283 0           case CT_INT8: case CT_ENUM8: {
2284 0 0         for (i = 0; i < nrows; i++) {
2285 0 0         int8_t v = SvIOK(values[i]) ? (int8_t)SvIV(values[i])
2286 0           : (int8_t)strtol(SvPV_nolen(values[i]), NULL, 10);
2287 0           nbuf_append(b, (const char *)&v, 1);
2288             }
2289 0           return 1;
2290             }
2291 0           case CT_INT16: case CT_ENUM16: {
2292 0 0         for (i = 0; i < nrows; i++) {
2293 0 0         int16_t v = SvIOK(values[i]) ? (int16_t)SvIV(values[i])
2294 0           : (int16_t)strtol(SvPV_nolen(values[i]), NULL, 10);
2295 0           nbuf_append(b, (const char *)&v, 2);
2296             }
2297 0           return 1;
2298             }
2299 0           case CT_INT32: case CT_DATE32: {
2300 0 0         for (i = 0; i < nrows; i++) {
2301             int32_t v;
2302 0 0         if (SvIOK(values[i])) {
2303 0           v = (int32_t)SvIV(values[i]);
2304             } else {
2305             STRLEN vlen;
2306 0           const char *s = SvPV(values[i], vlen);
2307 0 0         if (ct->code == CT_DATE32 && vlen >= 10 && s[4] == '-')
    0          
    0          
2308 0           v = date_string_to_days(s, vlen);
2309             else
2310 0           v = (int32_t)strtol(s, NULL, 10);
2311             }
2312 0           nbuf_append(b, (const char *)&v, 4);
2313             }
2314 0           return 1;
2315             }
2316 0           case CT_INT64: {
2317 0 0         for (i = 0; i < nrows; i++) {
2318 0           int64_t v = SvIOK(values[i]) ? (int64_t)SvIV(values[i])
2319 0 0         : (int64_t)strtoll(SvPV_nolen(values[i]), NULL, 10);
2320 0           nbuf_append(b, (const char *)&v, 8);
2321             }
2322 0           return 1;
2323             }
2324 0           case CT_UINT8: case CT_BOOL: {
2325 0 0         for (i = 0; i < nrows; i++) {
2326 0 0         uint8_t v = SvIOK(values[i]) ? (uint8_t)SvUV(values[i])
2327 0           : (uint8_t)strtoul(SvPV_nolen(values[i]), NULL, 10);
2328 0           nbuf_append(b, (const char *)&v, 1);
2329             }
2330 0           return 1;
2331             }
2332 0           case CT_UINT16: case CT_DATE: {
2333 0 0         for (i = 0; i < nrows; i++) {
2334             uint16_t v;
2335 0 0         if (SvIOK(values[i])) {
2336 0           v = (uint16_t)SvUV(values[i]);
2337             } else {
2338             STRLEN vlen;
2339 0           const char *s = SvPV(values[i], vlen);
2340 0 0         if (ct->code == CT_DATE && vlen >= 10 && s[4] == '-')
    0          
    0          
2341 0           v = (uint16_t)date_string_to_days(s, vlen);
2342             else
2343 0           v = (uint16_t)strtoul(s, NULL, 10);
2344             }
2345 0           nbuf_append(b, (const char *)&v, 2);
2346             }
2347 0           return 1;
2348             }
2349 0           case CT_UINT32: case CT_DATETIME: {
2350 0 0         for (i = 0; i < nrows; i++) {
2351             uint32_t v;
2352 0 0         if (SvIOK(values[i])) {
2353 0           v = (uint32_t)SvUV(values[i]);
2354             } else {
2355             STRLEN vlen;
2356 0           const char *s = SvPV(values[i], vlen);
2357 0 0         if (ct->code == CT_DATETIME && vlen >= 10 && s[4] == '-')
    0          
    0          
2358 0           v = datetime_string_to_epoch(s, vlen);
2359             else
2360 0           v = (uint32_t)strtoul(s, NULL, 10);
2361             }
2362 0           nbuf_append(b, (const char *)&v, 4);
2363             }
2364 0           return 1;
2365             }
2366 0           case CT_UINT64: {
2367 0 0         for (i = 0; i < nrows; i++) {
2368 0           uint64_t v = SvIOK(values[i]) ? (uint64_t)SvUV(values[i])
2369 0 0         : (uint64_t)strtoull(SvPV_nolen(values[i]), NULL, 10);
2370 0           nbuf_append(b, (const char *)&v, 8);
2371             }
2372 0           return 1;
2373             }
2374 0           case CT_FLOAT32: {
2375 0 0         for (i = 0; i < nrows; i++) {
2376 0           float v = SvNOK(values[i]) ? (float)SvNV(values[i])
2377 0 0         : strtof(SvPV_nolen(values[i]), NULL);
2378 0           nbuf_append(b, (const char *)&v, 4);
2379             }
2380 0           return 1;
2381             }
2382 0           case CT_FLOAT64: {
2383 0 0         for (i = 0; i < nrows; i++) {
2384 0           double v = SvNOK(values[i]) ? SvNV(values[i])
2385 0 0         : strtod(SvPV_nolen(values[i]), NULL);
2386 0           nbuf_append(b, (const char *)&v, 8);
2387             }
2388 0           return 1;
2389             }
2390 0           case CT_DATETIME64: {
2391 0 0         for (i = 0; i < nrows; i++) {
2392             int64_t v;
2393 0 0         if (SvIOK(values[i])) {
2394 0           v = (int64_t)SvIV(values[i]);
2395             } else {
2396             STRLEN vlen;
2397 0           const char *s = SvPV(values[i], vlen);
2398 0 0         if (vlen >= 10 && s[4] == '-')
    0          
2399 0           v = parse_datetime64_to_scaled(s, vlen, ct->param);
2400             else
2401 0           v = (int64_t)strtoll(s, NULL, 10);
2402             }
2403 0           nbuf_append(b, (const char *)&v, 8);
2404             }
2405 0           return 1;
2406             }
2407 0           case CT_DECIMAL32: {
2408 0 0         for (i = 0; i < nrows; i++) {
2409 0           int32_t v = (int32_t)parse_decimal_to_raw(SvPV_nolen(values[i]), ct->param);
2410 0           nbuf_append(b, (const char *)&v, 4);
2411             }
2412 0           return 1;
2413             }
2414 0           case CT_DECIMAL64: {
2415 0 0         for (i = 0; i < nrows; i++) {
2416 0           int64_t v = parse_decimal_to_raw(SvPV_nolen(values[i]), ct->param);
2417 0           nbuf_append(b, (const char *)&v, 8);
2418             }
2419 0           return 1;
2420             }
2421 0           case CT_DECIMAL256: {
2422             /* Caller passes raw 32 bytes (LE, signed) — same shape as the
2423             * decoder delivers. Useful with Math::BigInt::to_bytes(LE). */
2424 0 0         for (i = 0; i < nrows; i++) {
2425             STRLEN slen;
2426 0           const char *s = SvPV(values[i], slen);
2427 0           char buf[32] = {0};
2428 0           memcpy(buf, s, slen < 32 ? (size_t)slen : 32);
2429 0           nbuf_append(b, buf, 32);
2430             }
2431 0           return 1;
2432             }
2433 0           case CT_BFLOAT16: {
2434 0 0         for (i = 0; i < nrows; i++) {
2435 0           float fv = SvNOK(values[i]) ? (float)SvNV(values[i])
2436 0 0         : (float)strtod(SvPV_nolen(values[i]), NULL);
2437 0           uint8_t b32[4]; memcpy(b32, &fv, 4);
2438             /* Truncate to top 16 bits — bytes [2,3] of a LE Float32. */
2439 0           nbuf_append(b, (const char *)&b32[2], 2);
2440             }
2441 0           return 1;
2442             }
2443 0           case CT_STRING: {
2444 0 0         for (i = 0; i < nrows; i++) {
2445             STRLEN vlen;
2446 0           const char *v = SvPV(values[i], vlen);
2447 0           nbuf_string(b, v, vlen);
2448             }
2449 0           return 1;
2450             }
2451 0           case CT_FIXEDSTRING: {
2452 0           size_t fsz = (size_t)ct->param;
2453 0 0         for (i = 0; i < nrows; i++) {
2454             STRLEN vlen;
2455 0           const char *v = SvPV(values[i], vlen);
2456 0           size_t cplen = (size_t)vlen < fsz ? (size_t)vlen : fsz;
2457 0           nbuf_grow(b, fsz);
2458 0           memcpy(b->data + b->len, v, cplen);
2459 0 0         if (cplen < fsz)
2460 0           memset(b->data + b->len + cplen, 0, fsz - cplen);
2461 0           b->len += fsz;
2462             }
2463 0           return 1;
2464             }
2465 0           case CT_UUID: {
2466 0 0         for (i = 0; i < nrows; i++) {
2467             unsigned char ubytes[16];
2468             STRLEN slen;
2469 0           const char *s = SvPV(values[i], slen);
2470 0           parse_uuid_to_le_bytes(s, slen, ubytes);
2471 0           nbuf_append(b, (const char *)ubytes, 16);
2472             }
2473 0           return 1;
2474             }
2475 0           case CT_IPV4: {
2476 0 0         for (i = 0; i < nrows; i++) {
2477             STRLEN vlen;
2478 0           const char *s = SvPV(values[i], vlen);
2479 0           uint32_t v = parse_ipv4_to_u32(s, (size_t)vlen);
2480 0           nbuf_append(b, (const char *)&v, 4);
2481             }
2482 0           return 1;
2483             }
2484 0           case CT_IPV6: {
2485 0 0         for (i = 0; i < nrows; i++) {
2486             unsigned char addr[16];
2487             STRLEN vlen;
2488 0           const char *s = SvPV(values[i], vlen);
2489 0           parse_ipv6_to_bytes(s, (size_t)vlen, addr);
2490 0           nbuf_append(b, (const char *)addr, 16);
2491             }
2492 0           return 1;
2493             }
2494 0           case CT_NULLABLE: {
2495             uint8_t *nulls;
2496             SV **inner_vals;
2497             SV *zero_sv;
2498              
2499 0           Newx(nulls, nrows, uint8_t);
2500 0 0         Newx(inner_vals, nrows ? nrows : 1, SV *);
    0          
    0          
2501 0           zero_sv = newSViv(0);
2502              
2503 0 0         for (i = 0; i < nrows; i++) {
2504 0 0         if (!SvOK(values[i])) {
2505 0           nulls[i] = 1;
2506 0           inner_vals[i] = zero_sv;
2507             } else {
2508 0           nulls[i] = 0;
2509 0           inner_vals[i] = values[i];
2510             }
2511             }
2512              
2513 0           nbuf_append(b, (const char *)nulls, nrows);
2514             {
2515 0           int rc = encode_column_sv(aTHX_ b, inner_vals, nrows, ct->inner);
2516 0           Safefree(nulls);
2517 0           Safefree(inner_vals);
2518 0           SvREFCNT_dec(zero_sv);
2519 0           return rc;
2520             }
2521             }
2522 0           case CT_LOWCARDINALITY: {
2523             size_t idx_size;
2524             native_buf_t dict_buf;
2525             int rc;
2526              
2527 0           lc_write_prefix(b, nrows, &idx_size);
2528 0           nbuf_init(&dict_buf);
2529 0           rc = encode_column_sv(aTHX_ &dict_buf, values, nrows, ct->inner);
2530 0 0         if (!rc) { Safefree(dict_buf.data); return 0; }
2531 0           nbuf_append(b, dict_buf.data, dict_buf.len);
2532 0           Safefree(dict_buf.data);
2533 0           lc_write_indices(b, nrows, idx_size);
2534 0           return 1;
2535             }
2536 0           case CT_ARRAY: {
2537             /* Each value must be an AV ref. Wire format: offsets + flat inner data */
2538 0           uint64_t total = 0;
2539             uint64_t *offsets;
2540             SV **all_elems;
2541 0           uint64_t pos = 0;
2542             int rc;
2543              
2544 0 0         for (i = 0; i < nrows; i++) {
2545             AV *av;
2546 0 0         if (!SvROK(values[i]) || SvTYPE(SvRV(values[i])) != SVt_PVAV)
    0          
2547 0           return 0;
2548 0           av = (AV *)SvRV(values[i]);
2549 0 0         { SSize_t cnt = av_len(av) + 1; if (cnt > 0) total += (uint64_t)cnt; }
2550             }
2551              
2552 0 0         Newx(offsets, nrows, uint64_t);
2553 0 0         Newx(all_elems, total ? total : 1, SV *);
    0          
    0          
2554              
2555 0 0         for (i = 0; i < nrows; i++) {
2556 0           AV *av = (AV *)SvRV(values[i]);
2557 0           SSize_t n = av_len(av) + 1, j;
2558 0 0         for (j = 0; j < n; j++) {
2559 0           SV **ep = av_fetch(av, j, 0);
2560 0 0         all_elems[pos++] = ep ? *ep : &PL_sv_undef;
2561             }
2562 0           offsets[i] = pos;
2563             }
2564              
2565             /* write offsets as uint64 LE */
2566 0           nbuf_append(b, (const char *)offsets, nrows * 8);
2567 0           rc = encode_column_sv(aTHX_ b, all_elems, total, ct->inner);
2568 0           Safefree(offsets);
2569 0           Safefree(all_elems);
2570 0           return rc;
2571             }
2572 0           case CT_TUPLE: {
2573             /* Each value must be an AV ref with num_inners elements */
2574             int j;
2575 0 0         for (j = 0; j < ct->num_inners; j++) {
2576             SV **col_vals;
2577             int rc;
2578 0 0         Newx(col_vals, nrows ? nrows : 1, SV *);
    0          
    0          
2579 0 0         for (i = 0; i < nrows; i++) {
2580             AV *av;
2581             SV **ep;
2582 0 0         if (!SvROK(values[i]) || SvTYPE(SvRV(values[i])) != SVt_PVAV) {
    0          
2583 0           Safefree(col_vals);
2584 0           return 0;
2585             }
2586 0           av = (AV *)SvRV(values[i]);
2587 0           ep = av_fetch(av, j, 0);
2588 0 0         col_vals[i] = ep ? *ep : &PL_sv_undef;
2589             }
2590 0           rc = encode_column_sv(aTHX_ b, col_vals, nrows, ct->inners[j]);
2591 0           Safefree(col_vals);
2592 0 0         if (!rc) return 0;
2593             }
2594 0           return 1;
2595             }
2596 0           case CT_MAP: {
2597             /* Each value must be a hashref. Wire format: offsets + key column + value column */
2598 0           uint64_t total = 0;
2599             uint64_t *offsets;
2600             SV **all_keys, **all_vals;
2601 0           uint64_t pos = 0;
2602             int rc;
2603              
2604 0 0         if (ct->num_inners != 2) return 0;
2605              
2606 0 0         for (i = 0; i < nrows; i++) {
2607             HV *hv;
2608 0 0         if (!SvROK(values[i]) || SvTYPE(SvRV(values[i])) != SVt_PVHV)
    0          
2609 0           return 0;
2610 0           hv = (HV *)SvRV(values[i]);
2611 0 0         total += HvUSEDKEYS(hv);
2612             }
2613              
2614 0 0         Newx(offsets, nrows, uint64_t);
2615 0 0         Newx(all_keys, total ? total : 1, SV *);
    0          
    0          
2616 0 0         Newx(all_vals, total ? total : 1, SV *);
    0          
    0          
2617              
2618 0 0         for (i = 0; i < nrows; i++) {
2619 0           HV *hv = (HV *)SvRV(values[i]);
2620             HE *he;
2621 0           hv_iterinit(hv);
2622 0 0         while ((he = hv_iternext(hv))) {
2623 0           all_keys[pos] = hv_iterkeysv(he);
2624 0           all_vals[pos] = hv_iterval(hv, he);
2625 0           pos++;
2626             }
2627 0           offsets[i] = pos;
2628             }
2629              
2630 0           nbuf_append(b, (const char *)offsets, nrows * 8);
2631 0           rc = encode_column_sv(aTHX_ b, all_keys, total, ct->inners[0]);
2632 0 0         if (rc) rc = encode_column_sv(aTHX_ b, all_vals, total, ct->inners[1]);
2633 0           Safefree(offsets);
2634 0           Safefree(all_keys);
2635 0           Safefree(all_vals);
2636 0           return rc;
2637             }
2638 0           case CT_JSON: {
2639             /* Wire layout (V1) per Clickhouse-Encoder/doc/json-research:
2640             * Object prefix: UInt64=0, varint K (twice), K * lenstr path
2641             * Per-path Dynamic prefix: UInt64=1, varint T (twice),
2642             * T * lenstr type-name, UInt64=0
2643             * Per-path data: N disc bytes, then per-variant data in lex
2644             * order including a SharedVariant slot
2645             * Shared data trailer: N * UInt64 LE zero (no shared paths)
2646             */
2647 0           ENTER; SAVETMPS;
2648             int p;
2649 0           HV **row_hvs = NULL;
2650 0 0         if (nrows > 0) {
2651 0 0         Newxz(row_hvs, nrows, HV*);
2652 0           SAVEFREEPV(row_hvs);
2653             }
2654 0           HV *all_paths = (HV*)sv_2mortal((SV*)newHV());
2655              
2656 0 0         for (i = 0; i < nrows; i++) {
2657 0           SV *val = values[i];
2658 0 0         if (!SvOK(val)) { row_hvs[i] = NULL; continue; }
2659 0 0         if (!SvROK(val) || SvTYPE(SvRV(val)) != SVt_PVHV) {
    0          
2660 0 0         FREETMPS; LEAVE;
2661 0           return 0;
2662             }
2663 0           HV *flat = (HV*)sv_2mortal((SV*)newHV());
2664 0           flatten_json_hash(aTHX_ (HV*)SvRV(val), NULL, 0, flat);
2665 0           row_hvs[i] = flat;
2666             }
2667              
2668             /* Typed paths: extract per-row values from each flat HV (and
2669             * hv_delete to keep them out of the dynamic-paths discovery).
2670             * Missing keys substitute undef as a placeholder; the inner
2671             * encoder treats undef as the type's zero/default. */
2672 0 0         int n_typed = (ct->code == CT_JSON) ? ct->num_inners : 0;
2673 0           SV ***typed_vals = NULL;
2674 0 0         if (n_typed > 0 && nrows > 0) {
    0          
2675 0           Newxz(typed_vals, n_typed, SV **);
2676 0           SAVEFREEPV(typed_vals);
2677             int tp;
2678 0 0         for (tp = 0; tp < n_typed; tp++) {
2679 0 0         Newxz(typed_vals[tp], nrows, SV *);
2680 0           SAVEFREEPV(typed_vals[tp]);
2681 0           size_t nlen = strlen(ct->inner_names[tp]);
2682 0 0         for (i = 0; i < nrows; i++) {
2683 0 0         if (!row_hvs[i]) {
2684 0           typed_vals[tp][i] = &PL_sv_undef;
2685 0           continue;
2686             }
2687 0           SV **e = hv_fetch(row_hvs[i], ct->inner_names[tp],
2688             (I32)nlen, 0);
2689 0 0         if (e && SvOK(*e)) {
    0          
2690 0           typed_vals[tp][i] = SvREFCNT_inc(*e);
2691 0           sv_2mortal(typed_vals[tp][i]);
2692 0           hv_delete(row_hvs[i], ct->inner_names[tp],
2693             (I32)nlen, G_DISCARD);
2694             } else {
2695 0           typed_vals[tp][i] = &PL_sv_undef;
2696             }
2697             }
2698             }
2699             }
2700              
2701             /* Now collect dynamic-path keys (typed paths already removed). */
2702 0 0         for (i = 0; i < nrows; i++) {
2703 0 0         if (!row_hvs[i]) continue;
2704 0           hv_iterinit(row_hvs[i]);
2705             HE *he;
2706 0 0         while ((he = hv_iternext(row_hvs[i]))) {
2707             I32 klen;
2708 0           char *kstr = hv_iterkey(he, &klen);
2709 0           (void)hv_store(all_paths, kstr, klen, newSViv(1), 0);
2710             }
2711             }
2712              
2713 0 0         int num_paths = (int)HvUSEDKEYS(all_paths);
2714 0           json_path_entry_t *pe = NULL;
2715 0 0         if (num_paths > 0) {
2716 0           Newx(pe, num_paths, json_path_entry_t);
2717 0           SAVEFREEPV(pe);
2718 0           p = 0;
2719 0           hv_iterinit(all_paths);
2720             HE *he;
2721 0 0         while ((he = hv_iternext(all_paths))) {
2722             I32 klen;
2723 0           char *kstr = hv_iterkey(he, &klen);
2724 0           pe[p].path = kstr;
2725 0           pe[p].len = (STRLEN)klen;
2726 0           p++;
2727             }
2728 0 0         if (num_paths > 1)
2729 0           qsort(pe, num_paths, sizeof(*pe), json_cmp_path_entry);
2730             }
2731              
2732 0           unsigned *kind_masks = NULL;
2733 0 0         if (num_paths > 0) {
2734 0           Newxz(kind_masks, num_paths, unsigned);
2735 0           SAVEFREEPV(kind_masks);
2736 0 0         for (p = 0; p < num_paths; p++) {
2737 0 0         for (i = 0; i < nrows; i++) {
2738 0 0         if (!row_hvs[i]) continue;
2739 0           SV **e = hv_fetch(row_hvs[i], pe[p].path, (I32)pe[p].len, 0);
2740 0 0         if (!e || !SvOK(*e)) continue;
    0          
2741 0           int k = json_classify_value(aTHX_ *e);
2742 0 0         if (k < 0) { FREETMPS; LEAVE; return 0; }
    0          
2743 0           kind_masks[p] |= 1u << k;
2744             }
2745             }
2746             }
2747              
2748             /* Object structure prefix */
2749 0           nbuf_le64(b, 0); /* Object V1 */
2750 0           nbuf_varuint(b, (uint64_t)num_paths);
2751 0           nbuf_varuint(b, (uint64_t)num_paths);
2752 0 0         for (p = 0; p < num_paths; p++)
2753 0           nbuf_string(b, pe[p].path, pe[p].len);
2754              
2755 0           int *path_slots = NULL;
2756 0           int *wire_slot_counts = NULL;
2757 0 0         if (num_paths > 0) {
2758 0           Newx(path_slots, num_paths * JSON_LEX_SLOTS, int);
2759 0           SAVEFREEPV(path_slots);
2760 0           Newx(wire_slot_counts, num_paths, int);
2761 0           SAVEFREEPV(wire_slot_counts);
2762 0 0         for (p = 0; p < num_paths; p++)
2763 0           wire_slot_counts[p] = json_build_lex_table(
2764 0           kind_masks[p], path_slots + p*JSON_LEX_SLOTS);
2765             }
2766              
2767             /* Per-path Dynamic V1 prefix + Variant mode */
2768 0 0         for (p = 0; p < num_paths; p++) {
2769 0           int wire_slots = wire_slot_counts[p];
2770 0           int kc = wire_slots - 1;
2771 0           nbuf_le64(b, 1); /* Dynamic V1 */
2772 0           nbuf_varuint(b, (uint64_t)kc);
2773 0           nbuf_varuint(b, (uint64_t)kc);
2774 0           int *slots = path_slots + p*JSON_LEX_SLOTS;
2775             int s;
2776 0 0         for (s = 0; s < wire_slots; s++) {
2777 0           int k = slots[s];
2778 0 0         if (k < 0) continue;
2779 0           const char *nm = json_kind_type_name[k];
2780 0           nbuf_string(b, nm, strlen(nm));
2781             }
2782 0           nbuf_le64(b, 0); /* Variant BASIC */
2783             }
2784              
2785             /* Typed-path column data — comes BEFORE dynamic-path Variant data
2786             * on the wire. Emitted in declaration (sorted) order via the
2787             * inner type's encoder. */
2788 0 0         if (n_typed > 0) {
2789             int tp;
2790 0 0         for (tp = 0; tp < n_typed; tp++) {
2791 0           int rc = encode_column_sv(aTHX_ b, typed_vals[tp],
2792 0           nrows, ct->inners[tp]);
2793 0 0         if (rc != 1) { FREETMPS; LEAVE; return 0; }
    0          
2794             }
2795             }
2796              
2797             /* Per-path Variant data */
2798 0 0         for (p = 0; p < num_paths; p++) {
2799 0           int wire_slots = wire_slot_counts[p];
2800 0           int *slots = path_slots + p*JSON_LEX_SLOTS;
2801              
2802             /* Discriminator byte per row */
2803 0 0         for (i = 0; i < nrows; i++) {
2804 0 0         if (!row_hvs[i]) { nbuf_u8(b, 0xff); continue; }
2805 0           SV **e = hv_fetch(row_hvs[i], pe[p].path, (I32)pe[p].len, 0);
2806 0 0         if (!e || !SvOK(*e)) { nbuf_u8(b, 0xff); continue; }
    0          
2807 0           int k = json_classify_value(aTHX_ *e);
2808 0           nbuf_u8(b, (uint8_t)json_kind_disc_in(k, slots, wire_slots));
2809             }
2810              
2811             /* Per-variant data in lex order. SharedVariant has zero rows. */
2812             int s;
2813 0 0         for (s = 0; s < wire_slots; s++) {
2814 0           int k_match = slots[s];
2815 0 0         if (k_match < 0) continue;
2816 0           int is_array = (k_match >= JV_ARRAY_BOOL
2817 0 0         && k_match <= JV_ARRAY_STRING);
    0          
2818 0 0         if (is_array) {
2819 0           uint64_t offset = 0;
2820 0 0         for (i = 0; i < nrows; i++) {
2821 0 0         if (!row_hvs[i]) continue;
2822 0           SV **e = hv_fetch(row_hvs[i], pe[p].path,
2823             (I32)pe[p].len, 0);
2824 0 0         if (!e || !SvOK(*e)) continue;
    0          
2825 0 0         if (json_classify_value(aTHX_ *e) != k_match) continue;
2826 0           AV *av = (AV*)SvRV(*e);
2827 0           SSize_t n = av_len(av) + 1;
2828 0           offset += (uint64_t)n;
2829 0           nbuf_le64(b, offset);
2830             }
2831             }
2832 0 0         for (i = 0; i < nrows; i++) {
2833 0 0         if (!row_hvs[i]) continue;
2834 0           SV **e = hv_fetch(row_hvs[i], pe[p].path,
2835             (I32)pe[p].len, 0);
2836 0 0         if (!e || !SvOK(*e)) continue;
    0          
2837 0 0         if (json_classify_value(aTHX_ *e) != k_match) continue;
2838 0 0         if (is_array) {
2839 0           AV *av = (AV*)SvRV(*e);
2840 0           SSize_t n = av_len(av) + 1, j;
2841 0 0         for (j = 0; j < n; j++) {
2842 0           SV **elem = av_fetch(av, j, 0);
2843 0 0         SV *ev = (elem && SvOK(*elem))
2844 0 0         ? *elem : &PL_sv_undef;
2845 0           json_emit_array_elem(aTHX_ b, ev, k_match);
2846             }
2847             } else {
2848 0           json_emit_scalar(aTHX_ b, *e, k_match);
2849             }
2850             }
2851             }
2852             }
2853              
2854             /* Shared data trailer: N UInt64 LE zero offsets */
2855 0 0         if (nrows > 0) {
2856 0           size_t nbytes = (size_t)nrows * 8;
2857 0           nbuf_grow(b, nbytes);
2858 0           memset(b->data + b->len, 0, nbytes);
2859 0           b->len += nbytes;
2860             }
2861 0 0         FREETMPS; LEAVE;
2862 0           return 1;
2863             }
2864 0           default:
2865 0           return 0;
2866             }
2867             }
2868              
2869             /*
2870             * Wrap a filled Data block body into a CLIENT_DATA packet with optional LZ4 + empty trailing block.
2871             * Consumes body->data (frees it). Returns malloc'd packet, or NULL on failure.
2872             */
2873 0           static char* wrap_data_block(ev_clickhouse_t *self, native_buf_t *body, size_t *out_len) {
2874             native_buf_t pkt;
2875              
2876 0           nbuf_init(&pkt);
2877 0           nbuf_varuint(&pkt, CLIENT_DATA);
2878 0           nbuf_cstring(&pkt, ""); /* table name — outside compression */
2879              
2880             #ifdef HAVE_LZ4
2881             if (self->compress) {
2882             char *compressed;
2883             size_t comp_len;
2884             compressed = ch_lz4_compress(body->data, body->len, &comp_len);
2885             Safefree(body->data);
2886             body->data = NULL;
2887             if (compressed) {
2888             nbuf_append(&pkt, compressed, comp_len);
2889             Safefree(compressed);
2890             } else {
2891             Safefree(pkt.data);
2892             *out_len = 0;
2893             return NULL;
2894             }
2895             } else
2896             #endif
2897             {
2898 0           nbuf_append(&pkt, body->data, body->len);
2899 0           Safefree(body->data);
2900 0           body->data = NULL;
2901             }
2902              
2903 0           nbuf_empty_data_block(&pkt, self->compress);
2904              
2905 0           *out_len = pkt.len;
2906 0           return pkt.data;
2907             }
2908              
2909             /*
2910             * Build a native protocol Data block from TabSeparated text data.
2911             * col_names/col_types_str are string references into the sample block buffer.
2912             * Returns malloc'd packet data (CLIENT_DATA + block), or NULL on failure.
2913             */
2914 0           static char* build_native_insert_data(ev_clickhouse_t *self,
2915             const char *tsv_data, size_t tsv_len,
2916             const char **col_names, size_t *col_name_lens,
2917             const char **col_types_str, size_t *col_type_lens,
2918             col_type_t **col_types,
2919             int num_cols,
2920             size_t *out_len) {
2921             /* Parse TSV into rows and fields */
2922 0           int nrows = 0, max_rows = 64;
2923 0           const char **fields = NULL; /* flat array: fields[row * num_cols + col] */
2924 0           size_t *field_lens = NULL;
2925 0           const char *p = tsv_data;
2926 0           const char *end = tsv_data + tsv_len;
2927              
2928 0           Newxz(fields, max_rows * num_cols, const char *);
2929 0           Newx(field_lens, max_rows * num_cols, size_t);
2930              
2931 0 0         while (p < end) {
2932 0           const char *line_end = memchr(p, '\n', end - p);
2933 0 0         const char *line_limit = line_end ? line_end : end;
2934             int col;
2935              
2936             /* skip empty trailing line */
2937 0 0         if (p == line_limit) { p = line_limit + 1; continue; }
2938              
2939 0 0         if (nrows >= max_rows) {
2940 0 0         if (max_rows > INT_MAX / 2 ||
    0          
2941 0 0         (num_cols > 0 && max_rows * 2 > INT_MAX / num_cols)) {
2942 0           Safefree(fields);
2943 0           Safefree(field_lens);
2944 0           *out_len = 0;
2945 0           return NULL;
2946             }
2947 0           max_rows *= 2;
2948 0           Renew(fields, max_rows * num_cols, const char *);
2949 0           Renew(field_lens, max_rows * num_cols, size_t);
2950             }
2951              
2952             /* split line by tabs */
2953             {
2954 0           const char *fp = p;
2955 0 0         for (col = 0; col < num_cols; col++) {
2956             const char *tab;
2957 0 0         if (fp > line_limit) fp = line_limit;
2958 0 0         if (col < num_cols - 1) {
2959 0           tab = memchr(fp, '\t', line_limit - fp);
2960 0 0         if (!tab) tab = line_limit;
2961             } else {
2962 0           tab = line_limit;
2963             }
2964 0           fields[nrows * num_cols + col] = fp;
2965 0           field_lens[nrows * num_cols + col] = tab - fp;
2966 0           fp = tab + 1;
2967             }
2968             }
2969 0           nrows++;
2970 0           p = line_limit + 1;
2971             }
2972              
2973 0 0         if (nrows == 0) {
2974 0           Safefree(fields);
2975 0           Safefree(field_lens);
2976 0           *out_len = 0;
2977 0           return NULL;
2978             }
2979              
2980             /* Build the Data block body: block info + num_cols + num_rows + columns */
2981             {
2982             native_buf_t body;
2983             int col;
2984              
2985 0           nbuf_init(&body);
2986 0           nbuf_block_info(&body);
2987 0           nbuf_varuint(&body, (uint64_t)num_cols);
2988 0           nbuf_varuint(&body, (uint64_t)nrows);
2989              
2990             /* encode each column */
2991 0 0         for (col = 0; col < num_cols; col++) {
2992             const char **col_vals;
2993             size_t *col_vlens;
2994             int row;
2995              
2996             /* column name and type */
2997 0           nbuf_string(&body, col_names[col], col_name_lens[col]);
2998 0           nbuf_string(&body, col_types_str[col], col_type_lens[col]);
2999 0           nbuf_u8(&body, 0); /* has_custom_serialization = false */
3000              
3001             /* gather column values from row-major fields */
3002 0           Newxz(col_vals, nrows, const char *);
3003 0           Newx(col_vlens, nrows, size_t);
3004 0 0         for (row = 0; row < nrows; row++) {
3005 0           col_vals[row] = fields[row * num_cols + col];
3006 0           col_vlens[row] = field_lens[row * num_cols + col];
3007             }
3008              
3009 0 0         if (!encode_column_text(&body, col_vals, col_vlens,
3010 0           (uint64_t)nrows, col_types[col])) {
3011 0           Safefree(col_vals);
3012 0           Safefree(col_vlens);
3013 0           Safefree(body.data);
3014 0           Safefree(fields);
3015 0           Safefree(field_lens);
3016 0           *out_len = (size_t)-1; /* sentinel: encode failure */
3017 0           return NULL;
3018             }
3019 0           Safefree(col_vals);
3020 0           Safefree(col_vlens);
3021             }
3022              
3023 0           Safefree(fields);
3024 0           Safefree(field_lens);
3025              
3026 0           return wrap_data_block(self, &body, out_len);
3027             }
3028             }
3029              
3030             /*
3031             * Build a native protocol Data block from an AV of AV refs.
3032             * Like build_native_insert_data() but encodes SVs directly via encode_column_sv().
3033             */
3034 0           static char* build_native_insert_data_from_av(pTHX_ ev_clickhouse_t *self,
3035             AV *rows,
3036             const char **col_names, size_t *col_name_lens,
3037             const char **col_types_str, size_t *col_type_lens,
3038             col_type_t **col_types,
3039             int num_cols,
3040             size_t *out_len) {
3041 0           SSize_t nrows = av_len(rows) + 1;
3042             native_buf_t body;
3043             int col;
3044              
3045 0 0         if (nrows <= 0) {
3046 0           *out_len = 0;
3047 0           return NULL;
3048             }
3049              
3050 0           nbuf_init(&body);
3051 0           nbuf_block_info(&body);
3052 0           nbuf_varuint(&body, (uint64_t)num_cols);
3053 0           nbuf_varuint(&body, (uint64_t)nrows);
3054              
3055             /* encode each column */
3056 0 0         for (col = 0; col < num_cols; col++) {
3057             SV **col_vals;
3058             SSize_t row;
3059              
3060 0           nbuf_string(&body, col_names[col], col_name_lens[col]);
3061 0           nbuf_string(&body, col_types_str[col], col_type_lens[col]);
3062 0           nbuf_u8(&body, 0); /* has_custom_serialization = false */
3063              
3064             /* gather column values from row-major AV */
3065 0 0         Newx(col_vals, nrows, SV *);
3066 0 0         for (row = 0; row < nrows; row++) {
3067 0           SV **row_svp = av_fetch(rows, row, 0);
3068             AV *row_av;
3069             SV **val_svp;
3070              
3071 0 0         if (!row_svp || !SvROK(*row_svp) || SvTYPE(SvRV(*row_svp)) != SVt_PVAV) {
    0          
    0          
3072 0           Safefree(col_vals);
3073 0           Safefree(body.data);
3074 0           *out_len = (size_t)-1; /* sentinel: encode failure */
3075 0           return NULL;
3076             }
3077 0           row_av = (AV *)SvRV(*row_svp);
3078 0           val_svp = av_fetch(row_av, col, 0);
3079 0 0         col_vals[row] = (val_svp && *val_svp) ? *val_svp : &PL_sv_undef;
    0          
3080             }
3081              
3082 0 0         if (!encode_column_sv(aTHX_ &body, col_vals, (uint64_t)nrows, col_types[col])) {
3083 0           Safefree(col_vals);
3084 0           Safefree(body.data);
3085 0           *out_len = (size_t)-1; /* sentinel: encode failure */
3086 0           return NULL;
3087             }
3088 0           Safefree(col_vals);
3089             }
3090              
3091 0           return wrap_data_block(self, &body, out_len);
3092             }
3093              
3094             /*
3095             * Build the external-table Data packets for a native query. `external`
3096             * maps table-name => { structure => [name => type, ...], data => [[..],..] }.
3097             * Each entry becomes one CLIENT_DATA packet (named block, optionally LZ4)
3098             * to be spliced into the query stream before the terminating empty block.
3099             *
3100             * Returns a malloc'd buffer of the concatenated packets with *out_len set
3101             * (an empty 0-length buffer when `external` has no tables). On error
3102             * returns NULL and writes a message into errbuf. The caller must
3103             * Safefree the returned buffer.
3104             */
3105 0           static char* build_external_tables(pTHX_ ev_clickhouse_t *self, HV *external,
3106             size_t *out_len, char *errbuf,
3107             size_t errbuf_sz) {
3108             native_buf_t out;
3109             HE *he;
3110              
3111 0           *out_len = 0;
3112 0           nbuf_init(&out);
3113 0           hv_iterinit(external);
3114              
3115 0 0         while ((he = hv_iternext(external))) {
3116             I32 tnlen;
3117 0           const char *tname = hv_iterkey(he, &tnlen);
3118 0           SV *spec_sv = hv_iterval(external, he);
3119             HV *spec;
3120             SV **st, **dt;
3121             AV *structure, *rows;
3122             SSize_t sn, nrows, row;
3123             int ncols, col;
3124             native_buf_t body;
3125 0           body.data = NULL;
3126              
3127 0 0         if (!SvROK(spec_sv) || SvTYPE(SvRV(spec_sv)) != SVt_PVHV) {
    0          
3128 0           snprintf(errbuf, errbuf_sz, "external table '%s': spec must be a "
3129             "hashref { structure => [...], data => [...] }", tname);
3130 0           goto fail;
3131             }
3132 0           spec = (HV *)SvRV(spec_sv);
3133 0           st = hv_fetchs(spec, "structure", 0);
3134 0           dt = hv_fetchs(spec, "data", 0);
3135 0 0         if (!st || !SvROK(*st) || SvTYPE(SvRV(*st)) != SVt_PVAV) {
    0          
    0          
3136 0           snprintf(errbuf, errbuf_sz, "external table '%s': 'structure' must "
3137             "be an arrayref of name => type pairs", tname);
3138 0           goto fail;
3139             }
3140 0 0         if (!dt || !SvROK(*dt) || SvTYPE(SvRV(*dt)) != SVt_PVAV) {
    0          
    0          
3141 0           snprintf(errbuf, errbuf_sz, "external table '%s': 'data' must be "
3142             "an arrayref of rows", tname);
3143 0           goto fail;
3144             }
3145 0           structure = (AV *)SvRV(*st);
3146 0           rows = (AV *)SvRV(*dt);
3147 0           sn = av_len(structure) + 1;
3148 0 0         if (sn < 2 || (sn & 1)) {
    0          
3149 0           snprintf(errbuf, errbuf_sz, "external table '%s': 'structure' needs "
3150             "a non-empty, even list of name => type pairs", tname);
3151 0           goto fail;
3152             }
3153 0           ncols = (int)(sn / 2);
3154 0           nrows = av_len(rows) + 1;
3155 0 0         if (nrows < 0) nrows = 0;
3156              
3157             /* block body: block info + num_columns + num_rows + columns */
3158 0           nbuf_init(&body);
3159 0           nbuf_block_info(&body);
3160 0           nbuf_varuint(&body, (uint64_t)ncols);
3161 0           nbuf_varuint(&body, (uint64_t)nrows);
3162              
3163 0 0         for (col = 0; col < ncols; col++) {
3164 0           SV **nm = av_fetch(structure, col * 2, 0);
3165 0           SV **ty = av_fetch(structure, col * 2 + 1, 0);
3166             STRLEN cnl, ctl;
3167             const char *cname, *ctype;
3168             col_type_t *ct;
3169             SV **col_vals;
3170             int enc;
3171              
3172 0 0         if (!nm || !ty || !SvOK(*nm) || !SvOK(*ty)) {
    0          
    0          
    0          
3173 0           snprintf(errbuf, errbuf_sz, "external table '%s': structure "
3174             "name/type entries must be defined", tname);
3175 0           Safefree(body.data);
3176 0           goto fail;
3177             }
3178 0           cname = SvPV(*nm, cnl);
3179 0           ctype = SvPV(*ty, ctl);
3180 0           nbuf_string(&body, cname, cnl);
3181 0           nbuf_string(&body, ctype, ctl);
3182 0           nbuf_u8(&body, 0); /* has_custom_serialization = false */
3183              
3184 0           ct = parse_col_type(ctype, ctl);
3185              
3186             /* gather this column's values from the row-major data */
3187 0 0         Newx(col_vals, nrows > 0 ? nrows : 1, SV *);
    0          
    0          
3188 0 0         for (row = 0; row < nrows; row++) {
3189 0           SV **rsv = av_fetch(rows, row, 0);
3190             AV *rav;
3191             SV **vsv;
3192 0 0         if (!rsv || !SvROK(*rsv) || SvTYPE(SvRV(*rsv)) != SVt_PVAV) {
    0          
    0          
3193 0           snprintf(errbuf, errbuf_sz, "external table '%s': each data "
3194             "row must be an arrayref", tname);
3195 0           Safefree(col_vals);
3196 0           free_col_type(ct);
3197 0           Safefree(body.data);
3198 0           goto fail;
3199             }
3200 0           rav = (AV *)SvRV(*rsv);
3201 0           vsv = av_fetch(rav, col, 0);
3202 0 0         col_vals[row] = (vsv && *vsv) ? *vsv : &PL_sv_undef;
    0          
3203             }
3204              
3205 0           enc = encode_column_sv(aTHX_ &body, col_vals,
3206             (uint64_t)nrows, ct);
3207 0           Safefree(col_vals);
3208 0           free_col_type(ct);
3209 0 0         if (!enc) {
3210 0           snprintf(errbuf, errbuf_sz, "external table '%s': cannot encode "
3211             "column '%.*s' of type '%.*s'", tname,
3212             (int)cnl, cname, (int)ctl, ctype);
3213 0           Safefree(body.data);
3214 0           goto fail;
3215             }
3216             }
3217              
3218             /* [CLIENT_DATA] [table name] [block — optionally LZ4-compressed] */
3219 0           nbuf_varuint(&out, CLIENT_DATA);
3220 0           nbuf_string(&out, tname, (size_t)tnlen);
3221             #ifdef HAVE_LZ4
3222             if (self->compress) {
3223             char *comp;
3224             size_t comp_len;
3225             comp = ch_lz4_compress(body.data, body.len, &comp_len);
3226             Safefree(body.data);
3227             body.data = NULL;
3228             if (!comp) {
3229             snprintf(errbuf, errbuf_sz, "external table '%s': LZ4 "
3230             "compression failed", tname);
3231             goto fail;
3232             }
3233             nbuf_append(&out, comp, comp_len);
3234             Safefree(comp);
3235             } else
3236             #endif
3237             {
3238 0           nbuf_append(&out, body.data, body.len);
3239 0           Safefree(body.data);
3240 0           body.data = NULL;
3241             }
3242             }
3243              
3244 0           *out_len = out.len;
3245 0           return out.data;
3246              
3247 0           fail:
3248 0           Safefree(out.data);
3249 0           *out_len = 0;
3250 0           return NULL;
3251             }
3252