File Coverage

encode.c
Criterion Covered Total %
statement 808 861 93.8
branch 557 698 79.8
condition n/a
subroutine n/a
pod n/a
total 1365 1559 87.5


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2             #include "EXTERN.h"
3             #include "perl.h"
4             #include "XSUB.h"
5              
6             #include
7             #include
8             #include
9             #include
10              
11             #ifdef _WIN32
12             # include
13             # include
14             #else
15             # include
16             # include
17             #endif
18              
19             #include "types.h"
20             #include "buffer.h"
21             #include "decimal.h"
22             #include "scalar_kind.h"
23             #include "datetime.h"
24             #include "json_kind.h"
25             #include "encode.h"
26              
27             /* Build a recursive NULL placeholder for `t`. Returns a mortal SV.
28             * Leaf types get a defined zero/empty SV (not &PL_sv_undef) so that the
29             * encoder doesn't warn about uninit values when filling padding bytes. */
30 36           static SV *make_null_placeholder(pTHX_ TypeInfo *t) {
31 36 100         if (t->code == T_ARRAY || t->code == T_MAP) {
    100          
32 8           AV *empty = newAV();
33 8           return sv_2mortal(newRV_noinc((SV*)empty));
34             }
35 28 100         if (t->code == T_TUPLE) {
36 6           AV *tuple = newAV();
37             int i;
38 17 100         for (i = 0; i < t->tuple_len; i++) {
39 11           SV *child = make_null_placeholder(aTHX_ t->tuple[i]);
40 11           av_push(tuple, SvREFCNT_inc(child));
41             }
42 6           return sv_2mortal(newRV_noinc((SV*)tuple));
43             }
44 22 50         if (t->code == T_LOWCARDINALITY)
45 0           return make_null_placeholder(aTHX_ t->inner);
46 22 50         if (t->code == T_VARIANT)
47             /* Variant's encoder treats undef as the NULL discriminator. */
48 0           return &PL_sv_undef;
49 22 100         if (t->code == T_STRING || t->code == T_FIXEDSTRING)
    50          
50 7           return sv_2mortal(newSVpvn("", 0));
51 15 50         if (t->code == T_UUID || t->code == T_IPV6)
    50          
52 0           return sv_2mortal(newSVpvn("\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 16));
53 15 50         if (t->code == T_FLOAT32 || t->code == T_FLOAT64 || t->code == T_BFLOAT16)
    50          
    50          
54 0           return sv_2mortal(newSVnv(0.0));
55             /* numeric / date / decimal / enum / bool / ipv4: a defined integer 0 is fine */
56 15           return sv_2mortal(newSViv(0));
57             }
58              
59 2020           static int enum_lookup_name(pTHX_ TypeInfo *t, const char *s, STRLEN slen, int16_t *out) {
60 2020           SV **psv = hv_fetch(t->enum_lookup, s, slen, 0);
61 2020 100         if (!psv) return 0;
62 2018           *out = (int16_t)SvIV(*psv);
63 2018           return 1;
64             }
65              
66 904828           static void encode_scalar(pTHX_ Buffer *b, SV *val, TypeInfo *t) {
67 904828           switch (t->code) {
68 11           case T_INT8: buf_byte(aTHX_ b,(uint8_t)(int8_t)SvIV(val)); break;
69 8           case T_INT16: buf_le16(aTHX_ b,(uint16_t)(int16_t)SvIV(val)); break;
70 12314           case T_INT32: buf_le32(aTHX_ b,(uint32_t)(int32_t)SvIV(val)); break;
71 14           case T_INT64: buf_le64(aTHX_ b,(uint64_t)(int64_t)SvIV(val)); break;
72 104           case T_UINT8: buf_byte(aTHX_ b,(uint8_t)SvUV(val)); break;
73 4           case T_UINT16: buf_le16(aTHX_ b,(uint16_t)SvUV(val)); break;
74 250515           case T_UINT32: buf_le32(aTHX_ b,(uint32_t)SvUV(val)); break;
75 41079           case T_UINT64: buf_le64(aTHX_ b,(uint64_t)SvUV(val)); break;
76 15           case T_FLOAT32: buf_lefloat(aTHX_ b,(float)SvNV(val)); break;
77 42936           case T_FLOAT64: buf_ledouble(aTHX_ b,SvNV(val)); break;
78 12           case T_BFLOAT16: {
79             /* BFloat16 = top 16 bits of the Float32 binary representation
80             * (truncation, no rounding -- matches ClickHouse server). */
81             union { float f; uint32_t u; } u;
82 12           u.f = (float)SvNV(val);
83 12           buf_le16(aTHX_ b, (uint16_t)(u.u >> 16));
84 12           break;
85             }
86 492501           case T_STRING: {
87 492501 100         if (!SvOK(val)) {
88 2           buf_varint(aTHX_ b,0);
89             } else {
90             STRLEN len;
91 492499           const char *s = SvPV(val, len);
92 492499           buf_string(aTHX_ b,s, len);
93             }
94 492501           break;
95             }
96 22           case T_FIXEDSTRING: {
97 22           buf_grow(aTHX_ b,t->param);
98 22 50         if (!SvOK(val)) {
99 0           memset(b->ptr + b->len, 0, t->param);
100 0           b->len += t->param;
101             } else {
102             STRLEN len;
103 22           const char *s = SvPV(val, len);
104 22 100         if (len >= (STRLEN)t->param) {
105 15           memcpy(b->ptr + b->len, s, t->param);
106 15           b->len += t->param;
107             } else {
108 7           memcpy(b->ptr + b->len, s, len);
109 7           memset(b->ptr + b->len + len, 0, t->param - len);
110 7           b->len += t->param;
111             }
112             }
113 22           break;
114             }
115 2027           case T_ENUM8: {
116 2027           int16_t v = 0;
117 2027 50         if (!SvOK(val)) {
118             /* zero */
119 2027 100         } else if (SvIOK(val) || SvNOK(val)) {
    50          
120 9           IV iv = SvIV(val);
121 9 100         if (iv < -128 || iv > 127)
    100          
122 3           croak("Enum8 value %" IVdf " out of range", iv);
123 6           v = (int16_t)iv;
124             } else {
125             STRLEN slen;
126 2018           const char *s = SvPV(val, slen);
127 2018 100         if (!enum_lookup_name(aTHX_ t, s, slen, &v))
128 2           croak("Unknown enum value: %.*s", (int)slen, s);
129             }
130 2022           buf_byte(aTHX_ b,(uint8_t)(int8_t)v);
131 2022           break;
132             }
133 6           case T_ENUM16: {
134 6           int16_t v = 0;
135 6 50         if (!SvOK(val)) {
136             /* zero */
137 6 100         } else if (SvIOK(val) || SvNOK(val)) {
    50          
138 4           IV iv = SvIV(val);
139 4 50         if (iv < -32768 || iv > 32767)
    100          
140 1           croak("Enum16 value %" IVdf " out of range", iv);
141 3           v = (int16_t)iv;
142             } else {
143             STRLEN slen;
144 2           const char *s = SvPV(val, slen);
145 2 50         if (!enum_lookup_name(aTHX_ t, s, slen, &v))
146 0           croak("Unknown enum value: %.*s", (int)slen, s);
147             }
148 5           buf_le16(aTHX_ b,(uint16_t)v);
149 5           break;
150             }
151 12           case T_DECIMAL32: {
152             int64_t v64;
153 12 100         if (looks_stringy(val)) {
154             STRLEN slen;
155 5           const char *s = SvPV(val, slen);
156 5 50         if (!parse_decimal_int64_str(s, slen, t->param, &v64))
157 0           croak("Invalid decimal string: %.*s", (int)slen, s);
158             } else {
159 7           double d = SvNV(val) * decimal_pow10(t->param);
160 7 50         if (!(d >= (double)INT32_MIN && d <= (double)INT32_MAX))
    50          
161 0           croak("Decimal32 overflow");
162 7           v64 = (int64_t)llround(d);
163             }
164 12 50         if (v64 < INT32_MIN || v64 > INT32_MAX)
    100          
165 2           croak("Decimal32 overflow");
166 10           buf_le32(aTHX_ b,(uint32_t)(int32_t)v64);
167 10           break;
168             }
169 2015           case T_DECIMAL64: {
170             int64_t v;
171 2015 100         if (looks_stringy(val)) {
172             STRLEN slen;
173 2008           const char *s = SvPV(val, slen);
174 2008 50         if (!parse_decimal_int64_str(s, slen, t->param, &v))
175 0           croak("Invalid decimal string: %.*s", (int)slen, s);
176             } else {
177 7           double d = SvNV(val) * decimal_pow10(t->param);
178             /* INT64_MAX (2^63 - 1) isn't exactly representable in double;
179             * the next representable double above it is 2^63 itself, so use
180             * a strict less-than against 2^63 to bracket the valid range. */
181 7 50         if (!(d > -9.223372036854776e18 && d < 9.223372036854776e18))
    100          
182 3           croak("Decimal64 overflow (or non-finite value)");
183 4           v = (int64_t)llround(d);
184             }
185 2012           buf_le64(aTHX_ b,(uint64_t)v);
186 2012           break;
187             }
188 16           case T_DECIMAL256: {
189 16           uint64_t limbs[4] = {0,0,0,0};
190 16 100         if (looks_stringy(val)) {
191             STRLEN slen;
192 13           const char *s = SvPV(val, slen);
193 13 100         if (!parse_decimal256_str(s, slen, t->param, limbs))
194 2           croak("Invalid decimal string: %.*s", (int)slen, s);
195             } else {
196             /* Float path through long double; lossy past ~18 digits. */
197 3           long double d = (long double)SvNV(val) * decimal_pow10l(t->param);
198 3 50         if (!isfinite(d))
199 0           croak("Decimal256: cannot encode non-finite value");
200 3           int neg = d < 0;
201 3 50         if (neg) d = -d;
202 3           d = roundl(d);
203 3           long double two64 = (long double)18446744073709551616.0L;
204 3           long double two255 = two64 * two64 * two64
205             * (long double)9223372036854775808.0L;
206             /* Reject values that would overflow signed 256-bit (i.e. set
207             * the sign bit, which CH would interpret as negative).
208             * Mirrors the >= 2^127 check in the Decimal128 path. */
209 3 50         if (d >= two255)
210 3           croak("Decimal256 overflow");
211             int j;
212 0 0         for (j = 0; j < 4 && d > 0; j++) {
    0          
213 0           limbs[j] = (uint64_t)fmodl(d, two64);
214 0           d = (d - limbs[j]) / two64;
215             }
216 0 0         if (neg) {
217 0 0         for (j = 0; j < 4; j++) limbs[j] = ~limbs[j];
218 0           add_digit_256(limbs, 1);
219             }
220             }
221 11           buf_le64(aTHX_ b, limbs[0]);
222 11           buf_le64(aTHX_ b, limbs[1]);
223 11           buf_le64(aTHX_ b, limbs[2]);
224 11           buf_le64(aTHX_ b, limbs[3]);
225 11           break;
226             }
227 23           case T_DECIMAL128: {
228             uint64_t lo, hi;
229 23 100         if (looks_stringy(val)) {
230             STRLEN slen;
231 20           const char *s = SvPV(val, slen);
232 20 100         if (!parse_decimal128_str(s, slen, t->param, &hi, &lo))
233 4           croak("Invalid decimal string: %.*s", (int)slen, s);
234             } else {
235 3           long double d = (long double)SvNV(val) * decimal_pow10l(t->param);
236 3 50         if (!isfinite(d))
237 0           croak("Decimal128: cannot encode non-finite value");
238 3           int neg = d < 0;
239 3 100         if (neg) d = -d;
240 3           d = roundl(d);
241 3           long double two64 = (long double)18446744073709551616.0L;
242 3           long double two127 = two64 * (long double)9223372036854775808.0L;
243 3 100         if (d >= two127)
244 1           croak("Decimal128 overflow");
245 2           hi = (uint64_t)(d / two64);
246 2           lo = (uint64_t)fmodl(d, two64);
247 2 100         if (neg) {
248 1           lo = ~lo + 1;
249 1 50         hi = ~hi + (lo == 0 ? 1 : 0);
250             }
251             }
252 18           buf_le64(aTHX_ b,lo);
253 18           buf_le64(aTHX_ b,hi);
254 18           break;
255             }
256 29           case T_DATE: {
257             uint16_t v;
258 29 50         if (!SvOK(val)) v = 0;
259 29 100         else if (SvIOK(val) || SvNOK(val)) v = (uint16_t)SvUV(val);
    50          
260             else {
261             STRLEN slen;
262 21           const char *s = SvPV(val, slen);
263 21 100         if (looks_like_int_str(s, slen)) v = (uint16_t)SvUV(val);
264 20           else v = (uint16_t)parse_date_string(aTHX_ s, slen);
265             }
266 20           buf_le16(aTHX_ b,v);
267 20           break;
268             }
269 7           case T_DATE32: {
270             int32_t v;
271 7 50         if (!SvOK(val)) v = 0;
272 7 100         else if (SvIOK(val) || SvNOK(val)) v = (int32_t)SvIV(val);
    50          
273             else {
274             STRLEN slen;
275 6           const char *s = SvPV(val, slen);
276 6 50         if (looks_like_int_str(s, slen)) v = (int32_t)SvIV(val);
277 6           else v = parse_date_string(aTHX_ s, slen);
278             }
279 7           buf_le32(aTHX_ b,(uint32_t)v);
280 7           break;
281             }
282 51085           case T_DATETIME: {
283             uint32_t v;
284 51085 50         if (!SvOK(val)) v = 0;
285 51085 100         else if (SvIOK(val) || SvNOK(val)) v = (uint32_t)SvUV(val);
    50          
286             else {
287             STRLEN slen;
288 15           const char *s = SvPV(val, slen);
289 15 100         if (looks_like_int_str(s, slen)) v = (uint32_t)SvUV(val);
290 14           else v = parse_datetime_string(aTHX_ s, slen);
291             }
292 51080           buf_le32(aTHX_ b,v);
293 51080           break;
294             }
295 20           case T_DATETIME64: {
296             int64_t v;
297 20 50         if (!SvOK(val)) v = 0;
298 20 100         else if (SvIOK(val)) v = (int64_t)SvIV(val);
299 17 100         else if (SvNOK(val)) v = dt64_double_to_int64(aTHX_ SvNV(val), t->param);
300             else {
301             STRLEN slen;
302 13           const char *s = SvPV(val, slen);
303 13 100         if (looks_like_int_str(s, slen))
304 2           v = (int64_t)SvIV(val);
305 11 100         else if (looks_like_number_str(s, slen))
306 3           v = dt64_double_to_int64(aTHX_ SvNV(val), t->param);
307             else
308 8           v = parse_datetime64_string(aTHX_ s, slen, t->param);
309             }
310 17           buf_le64(aTHX_ b,(uint64_t)v);
311 17           break;
312             }
313 10019           case T_BOOL: {
314 10019 100         uint8_t v = SvOK(val) && SvTRUE(val) ? 1 : 0;
    100          
315 10019           buf_byte(aTHX_ b, v);
316 10019           break;
317             }
318 12           case T_UUID: {
319             /* Wire format: two LE UInt64 halves with bytes reversed within
320             * each half. Standard input is "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx";
321             * also accept 16 raw bytes. */
322             uint8_t buf16[16];
323 12 50         if (!SvOK(val)) {
324 0           memset(buf16, 0, 16);
325             } else {
326             STRLEN slen;
327 12           const char *s = SvPV(val, slen);
328 12 100         if (slen == 16) {
329 1           memcpy(buf16, s, 16);
330 11 100         } else if (slen == 36) {
331             /* parse hex with dashes at positions 8, 13, 18, 23 */
332 10           int i, j = 0;
333 10 50         if (s[8] != '-' || s[13] != '-' || s[18] != '-' || s[23] != '-')
    50          
    50          
    50          
334 0           croak("Invalid UUID format: %.*s", (int)slen, s);
335 334 100         for (i = 0; i < 36; i++) {
336 325 100         if (i == 8 || i == 13 || i == 18 || i == 23) continue;
    100          
    100          
    100          
337 289           char c = s[i];
338 289 100         int nyb = (c >= '0' && c <= '9') ? c - '0'
339 598 50         : (c >= 'a' && c <= 'f') ? c - 'a' + 10
    50          
340 43 100         : (c >= 'A' && c <= 'F') ? c - 'A' + 10
    50          
341 2 50         : -1;
342 289 100         if (nyb < 0)
343 1           croak("Invalid UUID hex digit: %.*s", (int)slen, s);
344 288 100         if ((j & 1) == 0) buf16[j >> 1] = nyb << 4;
345 144           else buf16[j >> 1] |= nyb;
346 288           j++;
347             }
348             } else {
349 1           croak("UUID requires 36-char string or 16 raw bytes, got %d bytes",
350             (int)slen);
351             }
352             }
353             /* Reverse first half then second half. */
354             uint8_t out[16];
355             int i;
356 90 100         for (i = 0; i < 8; i++) out[i] = buf16[7 - i];
357 90 100         for (i = 0; i < 8; i++) out[8+i] = buf16[15 - i];
358 10           buf_append(aTHX_ b, (const char *)out, 16);
359 10           break;
360             }
361 14           case T_IPV4: {
362             uint32_t v;
363 14 50         if (!SvOK(val)) v = 0;
364 14 50         else if (SvIOK(val) || SvNOK(val)) v = (uint32_t)SvUV(val);
    50          
365             else {
366             STRLEN slen;
367 14           const char *s = SvPV(val, slen);
368 14 50         if (looks_like_int_str(s, slen)) {
369 0           v = (uint32_t)SvUV(val);
370             } else {
371             char tmp[INET_ADDRSTRLEN];
372 14 50         if (slen >= sizeof(tmp))
373 0           croak("IPv4 string too long: %.*s", (int)slen, s);
374 14           memcpy(tmp, s, slen);
375 14           tmp[slen] = 0;
376             struct in_addr addr;
377 14 100         if (inet_pton(AF_INET, tmp, &addr) != 1)
378 2           croak("Invalid IPv4 address: %s", tmp);
379             /* inet_pton stores in network byte order; ntohl gives
380             * the canonical host-order integer (e.g. 0x01020304
381             * for "1.2.3.4") on any host. buf_le32 then writes
382             * the LE wire bytes [04][03][02][01]. */
383 12           v = ntohl((uint32_t)addr.s_addr);
384             }
385             }
386 12           buf_le32(aTHX_ b, v);
387 12           break;
388             }
389 8           case T_IPV6: {
390             uint8_t out[16];
391 8 50         if (!SvOK(val)) {
392 0           memset(out, 0, 16);
393             } else {
394             STRLEN slen;
395 8           const char *s = SvPV(val, slen);
396 8 100         if (slen == 16) {
397 1           memcpy(out, s, 16);
398             } else {
399             char tmp[INET6_ADDRSTRLEN];
400 7 50         if (slen >= sizeof(tmp))
401 0           croak("IPv6 string too long: %.*s", (int)slen, s);
402 7           memcpy(tmp, s, slen);
403 7           tmp[slen] = 0;
404 7 100         if (inet_pton(AF_INET6, tmp, out) != 1)
405 1           croak("Invalid IPv6 address: %s", tmp);
406             }
407             }
408 7           buf_append(aTHX_ b, (const char *)out, 16);
409 7           break;
410             }
411 0           default:
412 0           croak("encode_scalar called on non-scalar type");
413             }
414 904785           }
415              
416 9228           static void encode_null_scalar(pTHX_ Buffer *b, TypeInfo *t) {
417 9228           switch (t->code) {
418 1           case T_INT8: case T_UINT8: case T_ENUM8: case T_BOOL:
419 1           buf_byte(aTHX_ b,0); break;
420 2           case T_INT16: case T_UINT16: case T_ENUM16: case T_DATE: case T_BFLOAT16:
421 2           buf_le16(aTHX_ b,0); break;
422 18           case T_INT32: case T_UINT32: case T_FLOAT32:
423             case T_DECIMAL32: case T_DATE32: case T_DATETIME: case T_IPV4:
424 18           buf_le32(aTHX_ b,0); break;
425 8201           case T_INT64: case T_UINT64: case T_FLOAT64:
426             case T_DECIMAL64: case T_DATETIME64:
427 8201           buf_le64(aTHX_ b,0); break;
428 0           case T_DECIMAL128: case T_UUID: case T_IPV6:
429 0           buf_le64(aTHX_ b,0); buf_le64(aTHX_ b,0); break;
430 0           case T_DECIMAL256:
431 0           buf_le64(aTHX_ b,0); buf_le64(aTHX_ b,0);
432 0           buf_le64(aTHX_ b,0); buf_le64(aTHX_ b,0); break;
433 1005           case T_STRING:
434 1005           buf_varint(aTHX_ b,0); break;
435 1           case T_FIXEDSTRING: {
436 1           buf_grow(aTHX_ b,t->param);
437 1           memset(b->ptr + b->len, 0, t->param);
438 1           b->len += t->param;
439 1           break;
440             }
441 0           default:
442 0           croak("encode_null_scalar called on complex type");
443             }
444 9228           }
445              
446 1241           static int is_simple_type(TypeInfo *t) {
447 1239 100         return t->code != T_ARRAY && t->code != T_TUPLE && t->code != T_NULLABLE
    50          
448 1234 100         && t->code != T_MAP && t->code != T_LOWCARDINALITY
    50          
449 1233 50         && t->code != T_VARIANT && t->code != T_JSON
    50          
450 2480 100         && t->code != T_DYNAMIC;
    50          
451             }
452              
453             /* Allocate an SV** array backed by a mortal SV (auto-freed on scope exit). */
454 4593           SV **alloc_sv_array(pTHX_ SSize_t n) {
455 4593 100         if (n <= 0) return NULL;
456 4580           STRLEN bytes = n * sizeof(SV*);
457 4580           SV *holder = sv_2mortal(newSV(bytes));
458 4580           SvPOK_only(holder);
459 4580           SvCUR_set(holder, bytes); /* keep the SV's claimed length in sync */
460 4580           return (SV**)SvPVX(holder);
461             }
462              
463             static int json_is_bool_ref(pTHX_ SV *val);
464              
465             /* For an arrayref leaf, infer the element kind. All elements must
466             * resolve to the same scalar kind (Bool/Float64/Int64/String) or be
467             * undef. Returns the matching JV_ARRAY_* kind, or -1 if heterogeneous
468             * / unsupported. NULL elements are accepted and encoded as zero-valued
469             * placeholders; CH represents them via the element type's Nullable
470             * wrapper if the user declares one - we use the bare type here. */
471 3409           static int json_classify_array(pTHX_ AV *av) {
472 3409           SSize_t n = av_len(av) + 1;
473 3409           int seen_kind = -1;
474             SSize_t i;
475 11441 100         for (i = 0; i < n; i++) {
476 8033           SV **e = av_fetch(av, i, 0);
477 8033 50         if (!e || !SvOK(*e)) continue;
    50          
478 8033 100         if (SvROK(*e)) {
479             /* recognized booleans only; nested arrays/hashes in array
480             * leaves are not yet supported */
481 620 50         if (json_is_bool_ref(aTHX_ *e)) {
482 620 100         if (seen_kind == -1) seen_kind = JV_BOOL;
483 386 50         else if (seen_kind != JV_BOOL) return -1;
484 620           continue;
485             }
486 0           return -1;
487             }
488             int leaf;
489             #ifdef SvIsBOOL
490 7413 50         if (SvIsBOOL(*e)) leaf = JV_BOOL;
491             else
492             #endif
493 7413 100         if (SvIOK(*e) && !SvNOK(*e)) leaf = JV_INT64;
    50          
494 5099 100         else if (SvNOK(*e)) {
495 1752           NV nv = SvNV(*e);
496 1752           leaf = (nv == (NV)(int64_t)nv
497 0 0         && nv >= (NV)INT64_MIN && nv <= (NV)INT64_MAX)
    0          
498 1752 50         ? JV_INT64 : JV_FLOAT64;
499             } else {
500 3347           leaf = JV_STRING;
501             }
502 7413 100         if (seen_kind == -1) seen_kind = leaf;
503 4464 100         else if (seen_kind != leaf) return -1;
504             }
505             /* all-NULL or empty: default to Array(Int64). The empty array
506             * doesn't carry an element type, so we pick one arbitrarily. If
507             * another row in the same path has a non-empty Array(T), the kind
508             * mask will then carry both Array(Int64) and Array(T); for a clean
509             * single-variant column the user can declare the column explicitly
510             * via a Variant(...) type. */
511 3408 100         if (seen_kind == -1) return JV_ARRAY_INT64;
512 3182           switch (seen_kind) {
513 234           case JV_BOOL: return JV_ARRAY_BOOL;
514 806           case JV_FLOAT64: return JV_ARRAY_FLOAT64;
515 811           case JV_INT64: return JV_ARRAY_INT64;
516 1331           case JV_STRING: return JV_ARRAY_STRING;
517 0           default: return -1;
518             }
519             }
520              
521 2219           static int json_pkg_is_bool(const char *pkg) {
522 2234 50         return pkg && (strcmp(pkg, "JSON::PP::Boolean") == 0
    100          
523 15 50         || strcmp(pkg, "Types::Serialiser::Boolean") == 0
524 15 100         || strcmp(pkg, "JSON::XS::Boolean") == 0
525 8 100         || strcmp(pkg, "Cpanel::JSON::XS::Boolean") == 0
526 1 50         || strcmp(pkg, "boolean") == 0);
527             }
528              
529             /* True if val is a blessed ref into one of the recognized boolean packages. */
530 12423           static int json_is_bool_ref(pTHX_ SV *val) {
531 12423 100         if (!(SvROK(val) && sv_isobject(val))) return 0;
    100          
532 2219           HV *stash = SvSTASH(SvRV(val));
533 2219 50         return stash && json_pkg_is_bool(HvNAME(stash));
    50          
    50          
    50          
    0          
    50          
    50          
    100          
534             }
535              
536             /* Classify a non-undef leaf SV for JSON encoding. Hash/array refs (other
537             * than recognized booleans) must be rejected by the caller before this. */
538 6184           static JsonValueKind json_classify_leaf(pTHX_ SV *val) {
539 6184 100         if (json_is_bool_ref(aTHX_ val)) return JV_BOOL;
540             #ifdef SvIsBOOL
541 5499 100         if (SvIsBOOL(val)) return JV_BOOL;
542             #endif
543 5493 100         if (SvIOK(val) && !SvNOK(val)) return JV_INT64;
    50          
544 3138 100         if (SvNOK(val)) {
545 907           NV n = SvNV(val);
546             /* NV holding an exact integer value (e.g. `1.0`) collapses to
547             * Int64 - matches CH's JSONEachRow type-inference. Users who
548             * need `1.0` to survive as Float64 on the wire should put the
549             * value through a float-typed Variant or an explicit Float64
550             * column. NaN/Inf fall through to Float64 because the integer
551             * round-trip equality check fails for them. */
552 907 50         if (n == (NV)(int64_t)n
553 0 0         && n >= (NV)INT64_MIN && n <= (NV)INT64_MAX)
    0          
554 0           return JV_INT64;
555 907           return JV_FLOAT64;
556             }
557 2231           return JV_STRING;
558             }
559              
560             /* Classify a (defined) JSON/Dynamic value SV. Dispatches arrayrefs to
561             * json_classify_array and everything else to json_classify_leaf, so the
562             * caller doesn't have to repeat the SvROK / bool-ref check. Returns the
563             * JsonValueKind, or -1 if `val` is an arrayref with heterogeneous
564             * elements. Caller has already ruled out hashref-non-bool refs. */
565 9593           static int json_classify_value(pTHX_ SV *val) {
566 9593 100         if (SvROK(val) && !json_is_bool_ref(aTHX_ val))
    100          
567 3409           return json_classify_array(aTHX_ (AV*)SvRV(val));
568 6184           return json_classify_leaf(aTHX_ val);
569             }
570              
571             /* Emit one element of an Array(T) Dynamic variant. Used by both T_JSON
572             * (per-path) and T_DYNAMIC (top-level) encode paths; `ev` may be undef
573             * in which case a zero-valued placeholder of the element type is written
574             * (CH represents nulls via Nullable wrappers, which we don't introduce
575             * inside Dynamic variant arrays). */
576 2004           static void json_emit_array_elem(pTHX_ Buffer *b, SV *ev, int k_match) {
577 2004           switch (k_match) {
578 153           case JV_ARRAY_BOOL: {
579 153 50         if (!SvOK(ev)) { buf_byte(aTHX_ b, 0); break; }
580 153 50         SV *bv = SvROK(ev) ? SvRV(ev) : ev;
581 153           buf_byte(aTHX_ b, SvTRUE(bv) ? 1 : 0);
582 153           break;
583             }
584 579           case JV_ARRAY_INT64:
585 579 50         buf_le64(aTHX_ b, SvOK(ev) ? (uint64_t)(int64_t)SvIV(ev) : 0);
586 579           break;
587 436           case JV_ARRAY_FLOAT64:
588 436 50         buf_ledouble(aTHX_ b, SvOK(ev) ? SvNV(ev) : 0.0);
589 436           break;
590 836           case JV_ARRAY_STRING: {
591 836 50         if (!SvOK(ev)) { buf_varint(aTHX_ b, 0); break; }
592             STRLEN sl;
593 836           const char *ss = SvPV(ev, sl);
594 836           buf_string(aTHX_ b, ss, sl);
595 836           break;
596             }
597             }
598 2004           }
599              
600             /* Emit one scalar value of a Dynamic variant. `val` is assumed defined
601             * (the disc loop has already routed undef rows to disc 0xff). */
602 1782           static void json_emit_scalar(pTHX_ Buffer *b, SV *val, int k_match) {
603 1782           switch (k_match) {
604 230           case JV_BOOL: {
605 230 100         SV *bv = SvROK(val) ? SvRV(val) : val;
606 230           buf_byte(aTHX_ b, SvTRUE(bv) ? 1 : 0);
607 230           break;
608             }
609 693           case JV_INT64:
610 693           buf_le64(aTHX_ b, (uint64_t)(int64_t)SvIV(val));
611 693           break;
612 209           case JV_FLOAT64:
613 209           buf_ledouble(aTHX_ b, SvNV(val));
614 209           break;
615 650           case JV_STRING: {
616             STRLEN sl;
617 650           const char *s = SvPV(val, sl);
618 650           buf_string(aTHX_ b, s, sl);
619 650           break;
620             }
621 0           default: break;
622             }
623 1782           }
624              
625             /* Recursively flatten a JSON value hash into a flat HV of dotted-path
626             * names. Allocated SVs are mortalized so the caller never has to free
627             * them; the only references stored in `out_flat` are to leaf SVs from
628             * the original hash (which the caller pins).
629             *
630             * If `stop_paths` is non-NULL, it's a HV of dotted-path names to treat
631             * as leaves: when we encounter a hash value at a path in the set, we
632             * store the hashref directly rather than recursing. Used by JSON typed
633             * paths whose declared inner type is a hash-shape (Map / Tuple). */
634 2010           static void flatten_json_hash(pTHX_ HV *src,
635             const char *prefix, STRLEN prefix_len,
636             HV *out_flat, HV *stop_paths) {
637 2010           hv_iterinit(src);
638             HE *he;
639 5316 100         while ((he = hv_iternext(src))) {
640             I32 klen;
641 3307           char *kstr = hv_iterkey(he, &klen);
642 3307           SV *vsv = hv_iterval(src, he);
643              
644 3307 100         STRLEN new_len = prefix_len + (prefix_len ? 1 : 0) + klen;
645 3307           SV *path_sv = sv_2mortal(newSV(new_len));
646 3307           SvPOK_only(path_sv);
647 3307           char *pbuf = SvPVX(path_sv);
648 3307 100         if (prefix_len) {
649 791           memcpy(pbuf, prefix, prefix_len);
650 791           pbuf[prefix_len] = '.';
651 791           memcpy(pbuf + prefix_len + 1, kstr, klen);
652             } else {
653 2516           memcpy(pbuf, kstr, klen);
654             }
655 3307           SvCUR_set(path_sv, new_len);
656              
657 3307           int stop_here = stop_paths
658 3307 100         && hv_exists(stop_paths, SvPVX(path_sv), new_len);
    100          
659              
660 3307 100         if (!stop_here
661 3303 100         && SvROK(vsv) && SvTYPE(SvRV(vsv)) == SVt_PVHV
    100          
662 444 50         && !json_is_bool_ref(aTHX_ vsv)) {
663             /* Reject blessed hashrefs that aren't recognized Booleans:
664             * a JSON value of `bless {}, "Custom"` shouldn't silently
665             * flatten as if it were a plain hash. */
666 444 100         if (sv_isobject(vsv))
667 1 50         croak("JSON column: opaque blessed hashref (package '%s') "
    50          
    50          
    0          
    50          
    50          
668             "is not a JSON value; only known Boolean classes "
669             "are accepted as object leaves",
670             HvNAME(SvSTASH(SvRV(vsv))));
671 443           flatten_json_hash(aTHX_ (HV*)SvRV(vsv),
672 443           SvPVX(path_sv), new_len, out_flat,
673             stop_paths);
674             } else {
675 2863           hv_store(out_flat, SvPVX(path_sv), new_len,
676             SvREFCNT_inc_simple_NN(vsv), 0);
677             }
678             }
679 2009           }
680              
681             /* (path, len) pair: lets the JSON encoder sort paths whose keys may
682             * contain embedded NUL bytes without falling back to strlen() after
683             * sort. Perl HV keys are NUL-terminated but the value bytes themselves
684             * are arbitrary, so we carry both together. */
685             typedef struct { char *path; STRLEN len; } PathEntry;
686              
687 5465           static int cmp_path_entry(const void *a, const void *b) {
688 5465           const PathEntry *pa = (const PathEntry *)a;
689 5465           const PathEntry *pb = (const PathEntry *)b;
690 5465           STRLEN n = pa->len < pb->len ? pa->len : pb->len;
691 5465           int r = memcmp(pa->path, pb->path, n);
692 5465 100         if (r) return r;
693 46 100         if (pa->len < pb->len) return -1;
694 25 50         if (pa->len > pb->len) return 1;
695 0           return 0;
696             }
697              
698 9169           void encode_column(pTHX_ Buffer *b, SV **values, SSize_t num_rows, TypeInfo *t) {
699             SSize_t r;
700              
701 9169           switch (t->code) {
702 5317           case T_INT8: case T_INT16: case T_INT32: case T_INT64:
703             case T_UINT8: case T_UINT16: case T_UINT32: case T_UINT64:
704             case T_FLOAT32: case T_FLOAT64: case T_BFLOAT16:
705             case T_STRING: case T_FIXEDSTRING:
706             case T_ENUM8: case T_ENUM16:
707             case T_DECIMAL32: case T_DECIMAL64: case T_DECIMAL128: case T_DECIMAL256:
708             case T_DATE: case T_DATE32: case T_DATETIME: case T_DATETIME64:
709             case T_BOOL: case T_UUID: case T_IPV4: case T_IPV6:
710 876263 100         for (r = 0; r < num_rows; r++) encode_scalar(aTHX_ b,values[r], t);
711 5274           break;
712              
713 1314           case T_ARRAY: {
714 1314           uint64_t offset = 0;
715 1314           SSize_t total_elems = 0;
716              
717 94519 100         for (r = 0; r < num_rows; r++) {
718 93206           SV *val = values[r];
719 93206 100         if (!SvROK(val) || SvTYPE(SvRV(val)) != SVt_PVAV)
    50          
720 1           croak("Expected arrayref for Array type");
721 93205           AV *av = (AV*)SvRV(val);
722 93205           SSize_t n = av_len(av) + 1;
723 93205           offset += (uint64_t)n;
724 93205           total_elems += n;
725 93205           buf_le64(aTHX_ b,offset);
726             }
727              
728 1313 100         if (total_elems > 0) {
729 1311           SV **all_elems = alloc_sv_array(aTHX_ total_elems);
730 1311           SSize_t idx = 0;
731 94514 100         for (r = 0; r < num_rows; r++) {
732 93203           AV *av = (AV*)SvRV(values[r]);
733 93203           SSize_t n = av_len(av) + 1;
734             SSize_t i;
735 368462 100         for (i = 0; i < n; i++) {
736 275259           SV **elem = av_fetch(av, i, 0);
737 275259 50         all_elems[idx++] = elem ? *elem : &PL_sv_undef;
738             }
739             }
740 1311           encode_column(aTHX_ b, all_elems, total_elems, t->inner);
741             }
742 1313           break;
743             }
744              
745 1034           case T_TUPLE: {
746             int i;
747 1034           SV **elem_values = alloc_sv_array(aTHX_ num_rows);
748              
749 3098 100         for (i = 0; i < t->tuple_len; i++) {
750 6211 100         for (r = 0; r < num_rows; r++) {
751 4147           SV *val = values[r];
752 4147 100         if (!SvROK(val))
753 1           croak("Expected arrayref or hashref for Tuple type");
754 4146           SV *rv = SvRV(val);
755 4146 100         if (SvTYPE(rv) == SVt_PVAV) {
756 4139           SV **elem = av_fetch((AV*)rv, i, 0);
757 4139 50         elem_values[r] = elem ? *elem : &PL_sv_undef;
758 7 50         } else if (SvTYPE(rv) == SVt_PVHV
759 7 100         && t->tuple_names && t->tuple_names[i]) {
    50          
760             /* Named-tuple input as hashref: look up by name. */
761 6           SV **elem = hv_fetch((HV*)rv, t->tuple_names[i],
762             strlen(t->tuple_names[i]), 0);
763 6 100         elem_values[r] = elem ? *elem : &PL_sv_undef;
764 1 50         } else if (SvTYPE(rv) == SVt_PVHV) {
765 1           croak("Tuple given as hashref but type is unnamed; "
766             "use arrayref or declare named Tuple(name Type, ...)");
767             } else {
768 0           croak("Expected arrayref or hashref for Tuple type");
769             }
770             }
771 2064           encode_column(aTHX_ b, elem_values, num_rows, t->tuple[i]);
772             }
773 1032           break;
774             }
775              
776 1241           case T_NULLABLE: {
777 44330 100         for (r = 0; r < num_rows; r++) {
778 43089           buf_byte(aTHX_ b,!SvOK(values[r]) ? 1 : 0);
779             }
780              
781 1241 100         if (is_simple_type(t->inner)) {
782 44300 100         for (r = 0; r < num_rows; r++) {
783 43067 100         if (!SvOK(values[r])) encode_null_scalar(aTHX_ b, t->inner);
784 33839           else encode_scalar(aTHX_ b,values[r], t->inner);
785             }
786             } else {
787 8           SV **inner_values = alloc_sv_array(aTHX_ num_rows);
788 30 100         for (r = 0; r < num_rows; r++) {
789 22 100         if (!SvOK(values[r]))
790 9           inner_values[r] = make_null_placeholder(aTHX_ t->inner);
791             else
792 13           inner_values[r] = values[r];
793             }
794 8           encode_column(aTHX_ b, inner_values, num_rows, t->inner);
795             }
796 1241           break;
797             }
798              
799 9           case T_MAP: {
800             /* Wire format is identical to Array(Tuple(K, V)). Each row's value
801             * is either a hashref or arrayref-of-pairs; normalize to a flat
802             * arrayref of [k, v] pairs and reuse the Array(Tuple) path. */
803 9           SV **norm = alloc_sv_array(aTHX_ num_rows);
804 25 100         for (r = 0; r < num_rows; r++) {
805 17           SV *val = values[r];
806 17 100         if (!SvROK(val))
807 1           croak("Expected hashref or arrayref for Map type");
808 16           SV *rv = SvRV(val);
809 16 100         if (SvTYPE(rv) == SVt_PVAV) {
810 7           norm[r] = val; /* already arrayref-of-pairs */
811 9 50         } else if (SvTYPE(rv) == SVt_PVHV) {
812 9           HV *hv = (HV *)rv;
813 9           AV *pairs = newAV();
814 9           hv_iterinit(hv);
815             HE *he;
816 22 100         while ((he = hv_iternext(hv))) {
817             I32 klen;
818 13           char *kstr = hv_iterkey(he, &klen);
819 13           SV *vsv = hv_iterval(hv, he);
820 13           AV *pair = newAV();
821 13           av_push(pair, newSVpvn(kstr, klen));
822 13           av_push(pair, SvREFCNT_inc(vsv));
823 13           av_push(pairs, newRV_noinc((SV *)pair));
824             }
825 9           norm[r] = sv_2mortal(newRV_noinc((SV *)pairs));
826             } else {
827 0           croak("Expected hashref or arrayref for Map type");
828             }
829             }
830              
831             /* Synthesize the Array(Tuple(K,V)) wire path. */
832 8           uint64_t offset = 0;
833 8           SSize_t total = 0;
834 24 100         for (r = 0; r < num_rows; r++) {
835 16           AV *av = (AV *)SvRV(norm[r]);
836 16           SSize_t n = av_len(av) + 1;
837 16           offset += (uint64_t)n;
838 16           total += n;
839 16           buf_le64(aTHX_ b, offset);
840             }
841 8 50         if (total > 0) {
842 8           SV **all = alloc_sv_array(aTHX_ total);
843 8           SSize_t idx = 0;
844 24 100         for (r = 0; r < num_rows; r++) {
845 16           AV *av = (AV *)SvRV(norm[r]);
846 16           SSize_t n = av_len(av) + 1;
847             SSize_t i;
848 33 100         for (i = 0; i < n; i++) {
849 17           SV **elem = av_fetch(av, i, 0);
850 17 50         all[idx++] = elem ? *elem : &PL_sv_undef;
851             }
852             }
853             /* Encode as Tuple(K, V): first all keys, then all values. */
854             int j;
855 24 100         for (j = 0; j < 2; j++) {
856 16           SV **col = alloc_sv_array(aTHX_ total);
857             SSize_t k;
858 50 100         for (k = 0; k < total; k++) {
859 34 50         if (!SvROK(all[k]) || SvTYPE(SvRV(all[k])) != SVt_PVAV)
    50          
860 0           croak("Map pair must be a 2-element arrayref");
861 34           AV *pair = (AV *)SvRV(all[k]);
862 34           SV **e = av_fetch(pair, j, 0);
863 34 50         col[k] = e ? *e : &PL_sv_undef;
864             }
865 16           encode_column(aTHX_ b, col, total, t->tuple[j]);
866             }
867             }
868 8           break;
869             }
870              
871 16           case T_VARIANT: {
872             /* Variant wire format (CH 24.1+):
873             * UInt64 LE mode = 0 (non-shared serialization)
874             * UInt8[N] wire discriminators (255 = NULL, else 0..n-1
875             * referring to the alphabetical-order position)
876             * one per variant, in alphabetical order of type
877             * names; each contains the values whose
878             * discriminator equals that variant's wire index
879             *
880             * The user passes [decl_idx, value] using declaration order;
881             * variant_decl_to_wire[d] / variant_wire_to_decl[w] handle
882             * the alphabetical reordering ClickHouse requires. */
883 16           int nvar = t->tuple_len;
884             #define VARIANT_NULL 255
885              
886 16           buf_le64(aTHX_ b, 0);
887              
888             int *counts; /* indexed by declaration idx */
889 16           Newxz(counts, nvar, int);
890 16           SAVEFREEPV(counts);
891             SV ***per_var; /* per_var[decl_idx] = values for that variant */
892 16           Newxz(per_var, nvar, SV **);
893 16           SAVEFREEPV(per_var);
894             int v;
895 50 100         for (v = 0; v < nvar; v++)
896 34           per_var[v] = alloc_sv_array(aTHX_ num_rows);
897              
898 43 100         for (r = 0; r < num_rows; r++) {
899 32           SV *val = values[r];
900 32 100         if (!SvOK(val)) {
901 12           buf_byte(aTHX_ b, VARIANT_NULL);
902 12           continue;
903             }
904 20 100         if (!SvROK(val) || SvTYPE(SvRV(val)) != SVt_PVAV)
    50          
905 2           croak("Variant value must be undef or [variant_idx, value]");
906 18           AV *av = (AV *)SvRV(val);
907 18 100         if (av_len(av) + 1 != 2)
908 1           croak("Variant value must be a 2-element [idx, value] pair");
909 17           SV **idx_sv = av_fetch(av, 0, 0);
910 17           SV **vsv = av_fetch(av, 1, 0);
911 17 50         IV idx = idx_sv ? SvIV(*idx_sv) : -1;
912 17 50         if (idx < 0 || idx >= nvar)
    100          
913 2           croak("Variant index %" IVdf " out of range (0..%d)",
914             idx, nvar - 1);
915 15           buf_byte(aTHX_ b, (uint8_t)t->variant_decl_to_wire[idx]);
916 15 50         per_var[idx][counts[idx]++] = vsv ? *vsv : &PL_sv_undef;
917             }
918              
919             /* Emit sub-columns in wire (alphabetical) order. */
920 35 100         for (v = 0; v < nvar; v++) {
921 24           int d = t->variant_wire_to_decl[v];
922 24           encode_column(aTHX_ b, per_var[d], counts[d], t->tuple[d]);
923             }
924             #undef VARIANT_NULL
925 11           break;
926             }
927              
928 21           case T_LOWCARDINALITY: {
929             /* LowCardinality block format (per ClickHouse Native protocol):
930             * UInt64 LE serialization version (= 1)
931             * UInt64 LE flags: HasAdditionalKeys(=1<<9) | index-type-bits(0..3)
932             * UInt64 LE dictionary keys count
933             * keys serialized using the inner type's column format
934             * UInt64 LE index count (= num_rows)
935             * packed UInt8/16/32/64 according to flags low byte
936             *
937             * For Nullable(T) the underlying inner type for serialization is
938             * the bare T; NULL is represented by reserving dictionary index 0
939             * (which always carries the "default" value) and the wire format
940             * for the actual non-NULL dict starts at index 1.
941             */
942 21           TypeInfo *inner = t->inner;
943 21           int is_null = (inner->code == T_NULLABLE);
944 21 100         TypeInfo *leaf = is_null ? inner->inner : inner;
945              
946 21           HV *seen = (HV *)sv_2mortal((SV *)newHV());
947 21           AV *dict_av = (AV *)sv_2mortal((SV *)newAV());
948              
949             /* For FixedString(N) inner, dedup by the canonical N-byte form
950             * (truncate or zero-pad) so inputs of different length that
951             * encode to the same wire bytes collapse into one dict slot. */
952 21 100         int fixed_n = (leaf->code == T_FIXEDSTRING) ? leaf->param : 0;
953 21           SV *canon_holder = NULL;
954 21 100         if (fixed_n > 0) {
955 1           canon_holder = sv_2mortal(newSV(fixed_n));
956 1           SvPOK_only(canon_holder);
957 1           SvCUR_set(canon_holder, fixed_n);
958             }
959              
960             /* Reserve dict slot 0. For Nullable inner this slot is the NULL
961             * sentinel and must not be reused for the literal default value;
962             * for non-Nullable it IS the default (empty / N-zero-bytes). */
963 21           av_push(dict_av, newSVpvn("", 0));
964 21 100         if (!is_null) {
965 14 100         if (fixed_n > 0) {
966 1           char *cz = SvPVX(canon_holder);
967 1           memset(cz, 0, fixed_n);
968 1           hv_store(seen, cz, fixed_n, newSViv(0), 0);
969             } else {
970 13           hv_store(seen, "", 0, newSViv(0), 0);
971             }
972             }
973              
974             /* Pre-allocate index buffer. */
975 21           STRLEN idx_buf_len = num_rows * sizeof(uint64_t);
976 21           SV *idx_sv = sv_2mortal(newSV(idx_buf_len));
977 21           SvPOK_only(idx_sv);
978 21           SvCUR_set(idx_sv, idx_buf_len);
979 21           uint64_t *indices = (uint64_t *)SvPVX(idx_sv);
980              
981 67423 100         for (r = 0; r < num_rows; r++) {
982 67402           SV *val = values[r];
983 67402 100         if (!SvOK(val)) {
984             /* Nullable: slot 0 is the NULL sentinel. Non-nullable:
985             * coerce undef to empty string (matches plain String
986             * behaviour) without invoking SvPV on undef, which
987             * would emit a "Use of uninitialized value" warning. */
988 12 100         if (is_null) { indices[r] = 0; continue; }
989 3           val = sv_2mortal(newSVpvn("", 0));
990             }
991             STRLEN slen;
992 67393           const char *s = SvPV(val, slen);
993             const char *key;
994             STRLEN klen;
995 67393 100         if (fixed_n > 0) {
996             /* Canonicalize to exactly N bytes: truncate or zero-pad. */
997 6           char *cz = SvPVX(canon_holder);
998 6 100         if (slen >= (STRLEN)fixed_n) {
999 5           memcpy(cz, s, fixed_n);
1000             } else {
1001 1           memcpy(cz, s, slen);
1002 1           memset(cz + slen, 0, fixed_n - slen);
1003             }
1004 6           key = cz;
1005 6           klen = fixed_n;
1006             } else {
1007 67387           key = s;
1008 67387           klen = slen;
1009             }
1010 67393           SV **slot_sv = hv_fetch(seen, key, klen, 0);
1011 67393 100         if (slot_sv) {
1012 721           indices[r] = (uint64_t)SvUV(*slot_sv);
1013             } else {
1014 66672           UV pos = (UV)(av_len(dict_av) + 1);
1015 66672           av_push(dict_av, newSVpvn(key, klen));
1016 66672           hv_store(seen, key, klen, newSVuv(pos), 0);
1017 66672           indices[r] = (uint64_t)pos;
1018             }
1019             }
1020              
1021 21           UV dict_count = av_len(dict_av) + 1;
1022             int idx_bytes;
1023             int idx_type;
1024 21 100         if (dict_count <= 256) { idx_type = 0; idx_bytes = 1; }
1025 4 50         else if (dict_count <= 65536) { idx_type = 1; idx_bytes = 2; }
1026 0 0         else if (dict_count <= 0xFFFFFFFF) { idx_type = 2; idx_bytes = 4; }
1027 0           else { idx_type = 3; idx_bytes = 8; }
1028              
1029 21           uint64_t flags = (uint64_t)idx_type | (1ULL << 9); /* HasAdditionalKeys */
1030              
1031 21           buf_le64(aTHX_ b, 1); /* version */
1032 21           buf_le64(aTHX_ b, flags);
1033 21           buf_le64(aTHX_ b, (uint64_t)dict_count);
1034              
1035             /* Serialize the dict using the leaf type. */
1036 21           SV **dict_vals = alloc_sv_array(aTHX_ (SSize_t)dict_count);
1037             SSize_t di;
1038 66714 100         for (di = 0; di < (SSize_t)dict_count; di++) {
1039 66693           SV **e = av_fetch(dict_av, di, 0);
1040 66693 50         dict_vals[di] = e ? *e : &PL_sv_undef;
1041             }
1042 21           encode_column(aTHX_ b, dict_vals, (SSize_t)dict_count, leaf);
1043              
1044 21           buf_le64(aTHX_ b, (uint64_t)num_rows);
1045              
1046 67423 100         for (r = 0; r < num_rows; r++) {
1047 67402           uint64_t v = indices[r];
1048 67402           switch (idx_bytes) {
1049 309           case 1: buf_byte(aTHX_ b, (uint8_t)v); break;
1050 67093           case 2: buf_le16(aTHX_ b, (uint16_t)v); break;
1051 0           case 4: buf_le32(aTHX_ b, (uint32_t)v); break;
1052 0           case 8: buf_le64(aTHX_ b, v); break;
1053             }
1054             }
1055 21           break;
1056             }
1057              
1058 211           case T_JSON: {
1059             /* Per doc/json-research/README.md (validated byte-for-byte
1060             * against ClickHouse 26.3). Wire layout (V1):
1061             * Object prefix: UInt64=0, varint K (twice for V1),
1062             * K * lenstr path -- K = dynamic paths
1063             * For each typed path (sorted): inner prefix (empty for simple types)
1064             * For each dynamic path: Dynamic prefix (UInt64=1, varint T x2,
1065             * T * lenstr type-name, UInt64=0)
1066             * SharedData prefix: empty for MAP version
1067             * For each typed path (sorted): inner column data
1068             * For each dynamic path: N disc bytes + per-variant data
1069             * Shared data: N * UInt64 LE zero
1070             *
1071             * t->tuple_len > 0 means JSON(name Type, ...) was declared:
1072             * those paths emit as regular columns, the rest as Dynamic.
1073             */
1074              
1075             /* Step 1: flatten each row's hashref to dotted-path leaves.
1076             * Typed paths whose declared inner type is itself a hash
1077             * shape (Map / Tuple) must NOT be recursed into during
1078             * flatten - the user's nested hash for those is the leaf
1079             * value, not a sub-object. Build a stop-set of those names
1080             * and pass it to flatten so it stops recursing at them. */
1081 211           HV *stop_paths = NULL;
1082 211 100         if (t->tuple_len > 0) {
1083 15           stop_paths = (HV*)sv_2mortal((SV*)newHV());
1084             int sk;
1085 35 100         for (sk = 0; sk < t->tuple_len; sk++) {
1086             /* Unwrap Nullable so JSON(x Nullable(Map(...))) also
1087             * stops at "x" rather than flattening the inner
1088             * hash and losing the user's data. */
1089 20           TypeInfo *inner = t->tuple[sk];
1090 20 100         if (inner->code == T_NULLABLE) inner = inner->inner;
1091 20 100         if (inner->code == T_MAP || inner->code == T_TUPLE) {
    50          
1092 3           STRLEN nlen = strlen(t->tuple_names[sk]);
1093 3           hv_store(stop_paths, t->tuple_names[sk],
1094             (I32)nlen, newSViv(1), 0);
1095             }
1096             }
1097             }
1098              
1099 211           HV **row_hvs = NULL;
1100 211 100         if (num_rows > 0) {
1101 209 50         Newxz(row_hvs, num_rows, HV*);
1102 209           SAVEFREEPV(row_hvs);
1103             }
1104 211           HV *all_paths = (HV*)sv_2mortal((SV*)newHV());
1105              
1106 1918 100         for (r = 0; r < num_rows; r++) {
1107 1709           SV *val = values[r];
1108 1709 100         if (!SvOK(val)) { row_hvs[r] = NULL; continue; }
1109 1568 100         if (!SvROK(val) || SvTYPE(SvRV(val)) != SVt_PVHV)
    50          
1110 1           croak("JSON column row %" IVdf
1111             ": must be hashref or undef", (IV)r);
1112 1567           HV *flat = (HV*)sv_2mortal((SV*)newHV());
1113 1567           flatten_json_hash(aTHX_ (HV*)SvRV(val), NULL, 0, flat,
1114             stop_paths);
1115 1566           row_hvs[r] = flat;
1116             }
1117              
1118             /* Step 1b: if typed paths are declared, extract their values
1119             * per row from each flat HV (and hv_delete to keep them out
1120             * of the dynamic-paths discovery below). For missing keys
1121             * we substitute a type-appropriate placeholder: undef for
1122             * Nullable/Variant, [] for Array/Map, "" for String, 0 for
1123             * numerics. The inner encoder then emits the default
1124             * bytes the same way it would for any other column. */
1125 209           SV ***typed_vals = NULL;
1126 209           int n_typed = t->tuple_len;
1127 209 100         if (n_typed > 0 && num_rows > 0) {
    100          
1128 14           Newxz(typed_vals, n_typed, SV **);
1129 14           SAVEFREEPV(typed_vals);
1130             int tp;
1131 32 100         for (tp = 0; tp < n_typed; tp++) {
1132 18           typed_vals[tp] = alloc_sv_array(aTHX_ num_rows);
1133 18           STRLEN nlen = strlen(t->tuple_names[tp]);
1134 36           SV *missing = (t->tuple[tp]->code == T_NULLABLE)
1135             ? &PL_sv_undef
1136 18 100         : make_null_placeholder(aTHX_ t->tuple[tp]);
1137 54 100         for (r = 0; r < num_rows; r++) {
1138 36 50         if (!row_hvs[r]) {
1139 0           typed_vals[tp][r] = missing;
1140 0           continue;
1141             }
1142 36           SV **e = hv_fetch(row_hvs[r], t->tuple_names[tp],
1143             (I32)nlen, 0);
1144 36 100         if (e) {
1145 30 100         typed_vals[tp][r] = SvOK(*e) ? *e : missing;
1146             /* hv_delete unconditionally on key presence
1147             * so an explicit `undef` value doesn't leak
1148             * into the dynamic-paths discovery below
1149             * (would double-emit the path). */
1150 30           hv_delete(row_hvs[r], t->tuple_names[tp],
1151             (I32)nlen, G_DISCARD);
1152             } else {
1153 6           typed_vals[tp][r] = missing;
1154             }
1155             }
1156             }
1157             }
1158              
1159             /* Step 1c: union remaining dynamic-path keys across rows. */
1160 1916 100         for (r = 0; r < num_rows; r++) {
1161 1707 100         if (!row_hvs[r]) continue;
1162 1566           hv_iterinit(row_hvs[r]);
1163             HE *he;
1164 4399 100         while ((he = hv_iternext(row_hvs[r]))) {
1165             I32 klen;
1166 2833           char *kstr = hv_iterkey(he, &klen);
1167 2833           SV **e = hv_fetch(row_hvs[r], kstr, klen, 0);
1168 2833 50         if (e && SvOK(*e))
    100          
1169 2622           hv_store(all_paths, kstr, klen, newSViv(1), 0);
1170             }
1171             }
1172              
1173             /* Step 2: collect & sort path names. Snapshot (ptr, len) into
1174             * a paired struct array so the sort preserves the original
1175             * length even if a key contains an embedded NUL byte (legal
1176             * in Perl HVs; strlen() after sort would silently truncate). */
1177 209           int num_paths = 0;
1178 209           char **paths = NULL;
1179 209           STRLEN *path_lens = NULL;
1180             {
1181 209 50         int total = (int)HvUSEDKEYS(all_paths);
1182 209 100         if (total > 0) {
1183 194           Newx(paths, total, char*);
1184 194           SAVEFREEPV(paths);
1185 194           Newx(path_lens, total, STRLEN);
1186 194           SAVEFREEPV(path_lens);
1187             PathEntry *pe;
1188 194           Newx(pe, total, PathEntry);
1189 194           SAVEFREEPV(pe);
1190 194           hv_iterinit(all_paths);
1191             HE *he;
1192 2237 100         while ((he = hv_iternext(all_paths))) {
1193             I32 klen;
1194 2043           char *kstr = hv_iterkey(he, &klen);
1195 2043           pe[num_paths].path = kstr;
1196 2043           pe[num_paths].len = (STRLEN)klen;
1197 2043           num_paths++;
1198             }
1199 194 100         if (num_paths > 1)
1200 171           qsort(pe, num_paths, sizeof(*pe), cmp_path_entry);
1201             int pi;
1202 2237 100         for (pi = 0; pi < num_paths; pi++) {
1203 2043           paths[pi] = pe[pi].path;
1204 2043           path_lens[pi] = pe[pi].len;
1205             }
1206             }
1207             }
1208              
1209             /* Step 3: per-path kind mask. Walk each leaf; scalars and
1210             * recognized bool refs classify directly; arrayrefs classify
1211             * via json_classify_array (homogeneous element kind). Hash
1212             * refs or heterogeneous arrays fail loud with location. */
1213 209           unsigned *kind_masks = NULL;
1214 209 100         if (num_paths > 0) {
1215 194           Newxz(kind_masks, num_paths, unsigned);
1216 194           SAVEFREEPV(kind_masks);
1217             int p;
1218 2236 100         for (p = 0; p < num_paths; p++) {
1219 23582 100         for (r = 0; r < num_rows; r++) {
1220 21540 100         if (!row_hvs[r]) continue;
1221 20139           SV **e = hv_fetch(row_hvs[r], paths[p],
1222             (I32)path_lens[p], 0);
1223 20139 100         if (!e || !SvOK(*e)) continue;
    100          
1224 2622 100         if (SvROK(*e) && !json_is_bool_ref(aTHX_ *e)
    100          
1225 849 50         && SvTYPE(SvRV(*e)) != SVt_PVAV)
1226 0           croak("JSON column row %" IVdf
1227             " path '%s': "
1228             "hash refs as leaves are not "
1229             "supported (already flattened); "
1230             "got %s",
1231             (IV)r, paths[p],
1232             sv_reftype(SvRV(*e), 0));
1233 2622           int k = json_classify_value(aTHX_ *e);
1234 2622 100         if (k < 0)
1235 1           croak("JSON column row %" IVdf " path '%s': "
1236             "heterogeneous or unsupported array "
1237             "(elements must all be int, float, "
1238             "bool, or string)",
1239             (IV)r, paths[p]);
1240 2621           kind_masks[p] |= 1u << k;
1241             }
1242             }
1243             }
1244              
1245             /* Step 4: emit Object structure prefix (V1). */
1246 208           buf_le64(aTHX_ b, 0); /* Object V1 */
1247 208           buf_varint(aTHX_ b, (UV)num_paths); /* max_dynamic_paths */
1248 208           buf_varint(aTHX_ b, (UV)num_paths); /* actual count */
1249             int p;
1250 2250 100         for (p = 0; p < num_paths; p++)
1251 2042           buf_string(aTHX_ b, paths[p], path_lens[p]);
1252              
1253             /* Per-path wire-slot tables. path_slots[p*5+s] is the lex-ordered
1254             * slot at index s (kind index or -1 for SharedVariant);
1255             * wire_slot_counts[p] is the slot count for path p. Computed
1256             * once, used by both prefix and data loops below. */
1257 208           int *path_slots = NULL;
1258 208           int *wire_slot_counts = NULL;
1259 208 100         if (num_paths > 0) {
1260 193           Newx(path_slots, num_paths * JSON_LEX_SLOTS, int);
1261 193           SAVEFREEPV(path_slots);
1262 193           Newx(wire_slot_counts, num_paths, int);
1263 193           SAVEFREEPV(wire_slot_counts);
1264 2235 100         for (p = 0; p < num_paths; p++)
1265 2042           wire_slot_counts[p] =
1266 2042           json_build_lex_table(kind_masks[p], path_slots + p*JSON_LEX_SLOTS);
1267             }
1268              
1269             /* Step 5: per-path Dynamic V1 prefix + Variant mode. */
1270 2250 100         for (p = 0; p < num_paths; p++) {
1271 2042           int wire_slots = wire_slot_counts[p];
1272 2042           int kc = wire_slots - 1; /* minus SharedVariant */
1273              
1274 2042           buf_le64(aTHX_ b, 1); /* Dynamic V1 */
1275 2042           buf_varint(aTHX_ b, (UV)kc); /* max_dynamic_types */
1276 2042           buf_varint(aTHX_ b, (UV)kc); /* actual count */
1277              
1278             /* User variant type names in lex order (skip SharedVariant). */
1279 2042           int *slots = path_slots + p*JSON_LEX_SLOTS;
1280             int s;
1281 6414 100         for (s = 0; s < wire_slots; s++) {
1282 4372           int k = slots[s];
1283 4372 100         if (k < 0) continue;
1284 2330           const char *nm = json_kind_type_name[k];
1285 2330           buf_string(aTHX_ b, nm, strlen(nm));
1286             }
1287 2042           buf_le64(aTHX_ b, 0); /* Variant BASIC */
1288             }
1289              
1290             /* Step 5b: emit typed-path column data (sorted by name).
1291             * Typed paths come before dynamic-path Variant data on the
1292             * wire; their inner types are simple-prefix so encode_column
1293             * emits only the column body. typed_vals is only allocated
1294             * when both n_typed and num_rows are positive; skip when
1295             * num_rows == 0 to avoid dereferencing a NULL outer array. */
1296 208 100         if (n_typed > 0 && num_rows > 0) {
    100          
1297             int tp;
1298 32 100         for (tp = 0; tp < n_typed; tp++)
1299 18           encode_column(aTHX_ b, typed_vals[tp], num_rows,
1300 18           t->tuple[tp]);
1301             }
1302              
1303             /* Step 6: per-path Variant data (discs + per-variant values).
1304             * For Array(T) variants the column's wire layout is:
1305             * N UInt64 LE offsets (cumulative element counts)
1306             *
1307             * We emit offsets and inner data in a single pass per variant. */
1308 2250 100         for (p = 0; p < num_paths; p++) {
1309 2042           int wire_slots = wire_slot_counts[p];
1310 2042           int *slots = path_slots + p*JSON_LEX_SLOTS;
1311              
1312             /* Discriminator byte per row. */
1313 23581 100         for (r = 0; r < num_rows; r++) {
1314 21539 100         if (!row_hvs[r]) { buf_byte(aTHX_ b, 0xff); continue; }
1315 20138           SV **e = hv_fetch(row_hvs[r], paths[p],
1316             (I32)path_lens[p], 0);
1317 20138 100         if (!e || !SvOK(*e)) { buf_byte(aTHX_ b, 0xff); continue; }
    100          
1318 2621           int k = json_classify_value(aTHX_ *e);
1319 2621           buf_byte(aTHX_ b,
1320 2621           (uint8_t)json_kind_disc_in(k, slots, wire_slots));
1321             }
1322              
1323             /* Per-variant data in lex order. SharedVariant has zero
1324             * rows in our encoder's output. */
1325             int s;
1326 6414 100         for (s = 0; s < wire_slots; s++) {
1327 4372           int k_match = slots[s];
1328 4372 100         if (k_match < 0) continue;
1329              
1330             /* Array(T) variants: first pass emits offsets (so the
1331             * downstream offset cursor is contiguous) and counts
1332             * total elements; second pass emits inner values. */
1333 2330           int is_array = (k_match >= JV_ARRAY_BOOL
1334 2330 50         && k_match <= JV_ARRAY_STRING);
    100          
1335 2330 100         if (is_array) {
1336 847           uint64_t offset = 0;
1337 9980 100         for (r = 0; r < num_rows; r++) {
1338 9133 100         if (!row_hvs[r]) continue;
1339 8573           SV **e = hv_fetch(row_hvs[r], paths[p],
1340             (I32)path_lens[p], 0);
1341 8573 100         if (!e || !SvOK(*e)) continue;
    100          
1342 855 100         if (json_classify_value(aTHX_ *e) != k_match)
1343 7           continue;
1344 848           AV *av = (AV*)SvRV(*e);
1345 848           SSize_t n = av_len(av) + 1;
1346 848           offset += (uint64_t)n;
1347 848           buf_le64(aTHX_ b, offset);
1348             }
1349             }
1350              
1351             /* Element-value pass (Array(T)) or scalar pass (T). */
1352 26734 100         for (r = 0; r < num_rows; r++) {
1353 24404 100         if (!row_hvs[r]) continue;
1354 22723           SV **e = hv_fetch(row_hvs[r], paths[p],
1355             (I32)path_lens[p], 0);
1356 22723 100         if (!e || !SvOK(*e)) continue;
    100          
1357 3483 100         if (json_classify_value(aTHX_ *e) != k_match) continue;
1358              
1359 2621 100         if (is_array) {
1360 848           AV *av = (AV*)SvRV(*e);
1361 848           SSize_t n = av_len(av) + 1, i;
1362 2845 100         for (i = 0; i < n; i++) {
1363 1997           SV **elem = av_fetch(av, i, 0);
1364 1997 50         SV *ev = (elem && SvOK(*elem))
1365 3994 50         ? *elem : &PL_sv_undef;
1366 1997           json_emit_array_elem(aTHX_ b, ev, k_match);
1367             }
1368             } else {
1369 1773           json_emit_scalar(aTHX_ b, *e, k_match);
1370             }
1371             }
1372             }
1373             }
1374              
1375             /* Step 7: shared data trailer (Array(Tuple(String,String))
1376             * with all rows empty -> N UInt64 LE zero offsets). */
1377 208 100         if (num_rows > 0) {
1378 206           STRLEN nbytes = (STRLEN)num_rows * 8;
1379 206           buf_grow(aTHX_ b, nbytes);
1380 206           memset(b->ptr + b->len, 0, nbytes);
1381 206           b->len += nbytes;
1382             }
1383 208           break;
1384             }
1385              
1386 6           case T_DYNAMIC: {
1387             /* Standalone Dynamic column: same wire format as one JSON
1388             * path's Dynamic sub-column (Dynamic V1 prefix + Variant
1389             * mode + Variant data) with no Object wrapping or shared
1390             * data trailer. Each row is a scalar / array / undef. */
1391 6           unsigned mask = 0;
1392 6           int *row_kinds = NULL;
1393 6 50         if (num_rows > 0) {
1394 6 50         Newx(row_kinds, num_rows, int);
1395 6           SAVEFREEPV(row_kinds);
1396             }
1397 23 100         for (r = 0; r < num_rows; r++) {
1398 18           SV *val = values[r];
1399 18 100         if (!SvOK(val)) { row_kinds[r] = -1; continue; }
1400 13 100         if (SvROK(val) && !json_is_bool_ref(aTHX_ val)
    100          
1401 4 100         && SvTYPE(SvRV(val)) != SVt_PVAV)
1402 1           croak("Dynamic row %" IVdf ": hash refs are not "
1403             "supported; use JSON column instead "
1404             "(got %s)", (IV)r, sv_reftype(SvRV(val), 0));
1405 12           int k = json_classify_value(aTHX_ val);
1406 12 50         if (k < 0)
1407 0           croak("Dynamic row %" IVdf ": heterogeneous or "
1408             "unsupported array", (IV)r);
1409 12           row_kinds[r] = k;
1410 12           mask |= 1u << k;
1411             }
1412              
1413             int slots[JSON_LEX_SLOTS];
1414 5           int wire_slots = json_build_lex_table(mask, slots);
1415 5           int kc = wire_slots - 1;
1416              
1417 5           buf_le64(aTHX_ b, 1); /* Dynamic V1 */
1418 5           buf_varint(aTHX_ b, (UV)kc); /* max_dynamic_types */
1419 5           buf_varint(aTHX_ b, (UV)kc); /* actual count */
1420             int s;
1421 22 100         for (s = 0; s < wire_slots; s++) {
1422 17           int k = slots[s];
1423 17 100         if (k < 0) continue;
1424 12           const char *nm = json_kind_type_name[k];
1425 12           buf_string(aTHX_ b, nm, strlen(nm));
1426             }
1427 5           buf_le64(aTHX_ b, 0); /* Variant BASIC */
1428              
1429             /* Discriminator byte per row. */
1430 22 100         for (r = 0; r < num_rows; r++) {
1431 17 100         if (row_kinds[r] < 0) { buf_byte(aTHX_ b, 0xff); continue; }
1432 12           buf_byte(aTHX_ b,
1433 12           (uint8_t)json_kind_disc_in(row_kinds[r],
1434             slots, wire_slots));
1435             }
1436              
1437             /* Per-variant data in lex order. */
1438 22 100         for (s = 0; s < wire_slots; s++) {
1439 17           int k_match = slots[s];
1440 17 100         if (k_match < 0) continue;
1441 12           int is_array = (k_match >= JV_ARRAY_BOOL
1442 12 50         && k_match <= JV_ARRAY_STRING);
    100          
1443              
1444 12 100         if (is_array) {
1445 3           uint64_t offset = 0;
1446 12 100         for (r = 0; r < num_rows; r++) {
1447 9 100         if (row_kinds[r] != k_match) continue;
1448 3           AV *av = (AV*)SvRV(values[r]);
1449 3           offset += (uint64_t)(av_len(av) + 1);
1450 3           buf_le64(aTHX_ b, offset);
1451             }
1452 12 100         for (r = 0; r < num_rows; r++) {
1453 9 100         if (row_kinds[r] != k_match) continue;
1454 3           AV *av = (AV*)SvRV(values[r]);
1455 3           SSize_t n = av_len(av) + 1, i;
1456 10 100         for (i = 0; i < n; i++) {
1457 7           SV **elem = av_fetch(av, i, 0);
1458 7 50         SV *ev = (elem && SvOK(*elem))
1459 14 50         ? *elem : &PL_sv_undef;
1460 7           json_emit_array_elem(aTHX_ b, ev, k_match);
1461             }
1462             }
1463             } else {
1464 44 100         for (r = 0; r < num_rows; r++) {
1465 35 100         if (row_kinds[r] != k_match) continue;
1466 9           json_emit_scalar(aTHX_ b, values[r], k_match);
1467             }
1468             }
1469             }
1470 5           break;
1471             }
1472              
1473 0           default:
1474 0           croak("encode_column: unhandled type code %d", t->code);
1475             }
1476 9113           }