File Coverage

Encoder.xs
Criterion Covered Total %
statement 333 347 95.9
branch 182 264 68.9
condition n/a
subroutine n/a
pod n/a
total 515 611 84.2


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2             #include "EXTERN.h"
3             #include "perl.h"
4             #include "XSUB.h"
5              
6             #include
7             #include
8              
9             #if IVSIZE < 8
10             #error "ClickHouse::Encoder requires a 64-bit Perl (IVSIZE >= 8)"
11             #endif
12              
13             #include "types.h"
14             #include "buffer.h"
15             #include "encode.h"
16             #include "decode.h"
17             #include "runtime.h"
18             #include "cityhash.h"
19              
20             /* Read one varint from an SV's byte buffer starting at *off; advances
21             * *off past the consumed bytes. Croaks with TCP-style messages on
22             * truncation or overflow. Used by the ClickHouse::Encoder::TCP XSUBs
23             * which operate on offsets into a Perl scalar rather than the (p,end)
24             * cursor pair used by the internal native decoder. */
25 86           static UV tcp_read_varint(pTHX_ const unsigned char *p, UV buf_len,
26             UV *off) {
27 86           UV v = 0;
28 86           int shift = 0;
29 16           while (1) {
30 102 100         if (*off >= buf_len)
31 6           croak("varint: truncated at offset %lu", (unsigned long)*off);
32 96           unsigned char b = p[(*off)++];
33 96           v |= ((UV)(b & 0x7f)) << shift;
34 96 100         if (!(b & 0x80)) break;
35 16           shift += 7;
36 16 50         if (shift >= 64) croak("varint exceeds 64 bits");
37             }
38 80           return v;
39             }
40              
41             MODULE = ClickHouse::Encoder PACKAGE = ClickHouse::Encoder
42              
43             SV*
44             new(class, ...)
45             const char *class
46             CODE:
47             {
48 1521           Encoder *enc = NULL;
49             AV *cols_av;
50             SSize_t i, n;
51 1521           SV *cols_sv = NULL;
52              
53 1521 50         if (items % 2 == 0)
54 0           croak("Expected key-value pairs");
55              
56 3041 100         for (i = 1; i < items; i += 2) {
57             STRLEN klen;
58 1520           const char *key = SvPV(ST(i), klen);
59 1520 50         if (klen == 7 && memcmp(key, "columns", 7) == 0)
    50          
60 1520           cols_sv = ST(i+1);
61             }
62              
63 1521 100         if (!cols_sv || !SvROK(cols_sv) || SvTYPE(SvRV(cols_sv)) != SVt_PVAV)
    100          
    50          
64 2           croak("columns required and must be arrayref");
65              
66 1519           cols_av = (AV*)SvRV(cols_sv);
67 1519           n = av_len(cols_av) + 1;
68              
69             /* Wrap allocation in our own ENTER/LEAVE so the cleanup destructor fires
70             * before this XSUB returns (XSUBs don't get implicit ENTER/LEAVE). */
71 1519           ENTER;
72 1519           Newxz(enc, 1, Encoder);
73 1519           SAVEDESTRUCTOR_X(cleanup_encoder_slot, &enc);
74              
75 1519 50         Newxz(enc->columns, n, Column);
76 1519           enc->num_columns = n;
77              
78 5696 100         for (i = 0; i < n; i++) {
79 4205           SV **col_sv = av_fetch(cols_av, i, 0);
80             AV *col_av;
81             SV **name_sv, **type_sv;
82             STRLEN len;
83             const char *s;
84              
85 4205 50         if (!col_sv || !SvROK(*col_sv) || SvTYPE(SvRV(*col_sv)) != SVt_PVAV)
    50          
    50          
86 0           croak("Column must be [name, type]");
87              
88 4205           col_av = (AV*)SvRV(*col_sv);
89 4205           name_sv = av_fetch(col_av, 0, 0);
90 4205           type_sv = av_fetch(col_av, 1, 0);
91              
92 4205 50         if (!name_sv || !type_sv)
    100          
93 2           croak("Column must be [name, type]");
94              
95 4203           s = SvPV(*name_sv, len);
96 4203           Newx(enc->columns[i].name, len + 1, char);
97 4203           memcpy(enc->columns[i].name, s, len);
98 4203           enc->columns[i].name[len] = 0;
99 4203           enc->columns[i].name_len = len;
100              
101 4203           s = SvPV(*type_sv, len);
102 4203           Newx(enc->columns[i].type_str, len + 1, char);
103 4203           memcpy(enc->columns[i].type_str, s, len);
104 4203           enc->columns[i].type_str[len] = 0;
105 4203           enc->columns[i].type_len = len;
106              
107 4203           enc->columns[i].type = parse_type(aTHX_ enc->columns[i].type_str, len);
108             }
109              
110 1491           RETVAL = newSV(0);
111 1491           sv_setref_pv(RETVAL, class, (void*)enc);
112 1491           enc = NULL; /* Disarm: the SV's DESTROY now owns the encoder. */
113 1491           LEAVE;
114             }
115             OUTPUT:
116             RETVAL
117              
118             SV*
119             encode(self, rows)
120             SV *self
121             SV *rows
122             CODE:
123             {
124             Encoder *enc;
125             Buffer buf;
126              
127 2053 50         if (!sv_isobject(self)) croak("Not an object");
128 2053 100         if (!SvROK(rows) || SvTYPE(SvRV(rows)) != SVt_PVAV)
    50          
129 2           croak("rows must be arrayref");
130              
131 2051           enc = INT2PTR(Encoder*, SvIV(SvRV(self)));
132 2051           buf_init(aTHX_ &buf);
133 2051           do_encode(aTHX_ enc, (AV *)SvRV(rows), &buf);
134 1989           RETVAL = newSVpvn(buf.ptr, buf.len);
135             }
136             OUTPUT:
137             RETVAL
138              
139             void
140             encode_into(self, target_ref, rows)
141             SV *self
142             SV *target_ref
143             SV *rows
144             CODE:
145             {
146             Encoder *enc;
147             Buffer buf;
148             SV *target;
149              
150 4 50         if (!sv_isobject(self)) croak("Not an object");
151 4 100         if (!SvROK(target_ref))
152 2           croak("encode_into: first argument must be a scalar reference");
153 2 50         if (!SvROK(rows) || SvTYPE(SvRV(rows)) != SVt_PVAV)
    50          
154 0           croak("rows must be arrayref");
155              
156 2           target = SvRV(target_ref);
157 2           enc = INT2PTR(Encoder*, SvIV(SvRV(self)));
158              
159 2           buf_init(aTHX_ &buf);
160 2           do_encode(aTHX_ enc, (AV *)SvRV(rows), &buf);
161              
162 2 50         if (!SvOK(target)) sv_setpvn(target, "", 0);
163 2           sv_catpvn(target, buf.ptr, buf.len);
164             }
165              
166             void
167             encode_to_handle(self, fh, rows)
168             SV *self
169             SV *fh
170             SV *rows
171             CODE:
172             {
173             Encoder *enc;
174             Buffer buf;
175             IO *io;
176             PerlIO *pio;
177              
178 5 50         if (!sv_isobject(self)) croak("Not an object");
179 5 50         if (!SvROK(rows) || SvTYPE(SvRV(rows)) != SVt_PVAV)
    50          
180 0           croak("rows must be arrayref");
181 5           io = sv_2io(fh);
182 5 50         if (!io) croak("encode_to_handle: not a filehandle");
183 5           pio = IoOFP(io);
184 5 100         if (!pio) croak("encode_to_handle: filehandle not open for writing");
185              
186 4           enc = INT2PTR(Encoder*, SvIV(SvRV(self)));
187 4           buf_init(aTHX_ &buf);
188 4           do_encode(aTHX_ enc, (AV *)SvRV(rows), &buf);
189              
190 4 100         if (PerlIO_write(pio, buf.ptr, buf.len) != (SSize_t)buf.len)
191 1           croak("encode_to_handle: short write: %s", strerror(errno));
192             }
193              
194             SV*
195             encode_columns(self, cols_hv)
196             SV *self
197             SV *cols_hv
198             CODE:
199             {
200             /* Column-oriented input: cols_hv is a hashref { name => [v0, v1, ...] }
201             * with one array per column, all the same length. Skips the row->column
202             * permutation step that encode() does for arrayref-of-arrayref input. */
203             Encoder *enc;
204             HV *cols;
205             Buffer buf;
206 8           SSize_t num_rows = -1;
207             int c;
208              
209 8 50         if (!sv_isobject(self)) croak("Not an object");
210 8 100         if (!SvROK(cols_hv) || SvTYPE(SvRV(cols_hv)) != SVt_PVHV)
    50          
211 1           croak("encode_columns: arg must be hashref { name => [...] }");
212              
213 7           cols = (HV *)SvRV(cols_hv);
214 7           enc = INT2PTR(Encoder*, SvIV(SvRV(self)));
215              
216             /* Validate: every declared column must be present and same length. */
217 16 100         for (c = 0; c < enc->num_columns; c++) {
218 14           Column *col = &enc->columns[c];
219 14           SV **slot = hv_fetch(cols, col->name, col->name_len, 0);
220 14 100         if (!slot || !SvOK(*slot))
    50          
221 2           croak("encode_columns: missing column '%.*s'",
222             (int)col->name_len, col->name);
223 12 100         if (!SvROK(*slot) || SvTYPE(SvRV(*slot)) != SVt_PVAV)
    50          
224 1           croak("encode_columns: column '%.*s' must be an arrayref",
225             (int)col->name_len, col->name);
226 11           AV *av = (AV *)SvRV(*slot);
227 11           SSize_t n = av_len(av) + 1;
228 11 100         if (num_rows == -1) num_rows = n;
229 5 100         else if (n != num_rows)
230 2           croak("encode_columns: column '%.*s' has %" IVdf " rows, "
231             "expected %" IVdf, (int)col->name_len, col->name,
232             (IV)n, (IV)num_rows);
233             }
234 2 50         if (num_rows == -1) num_rows = 0;
235              
236 2           buf_init(aTHX_ &buf);
237 2           buf_varint(aTHX_ &buf, enc->num_columns);
238 2           buf_varint(aTHX_ &buf, num_rows);
239              
240 2           SV **col_values = alloc_sv_array(aTHX_ num_rows);
241 7 100         for (c = 0; c < enc->num_columns; c++) {
242 5           Column *col = &enc->columns[c];
243 5           SV **slot = hv_fetch(cols, col->name, col->name_len, 0);
244 5           AV *av = (AV *)SvRV(*slot);
245             SSize_t r;
246              
247 5           buf_string(aTHX_ &buf, col->name, col->name_len);
248 5           buf_string(aTHX_ &buf, col->type_str, col->type_len);
249              
250 20 100         for (r = 0; r < num_rows; r++) {
251 15           SV **e = av_fetch(av, r, 0);
252 15 50         if (e) {
253             /* Same magic-realization step as do_encode's gather:
254             * tied per-column AVs return magical placeholders that
255             * downstream SvIV / SvPV won't see until SvGETMAGIC. */
256 15           SvGETMAGIC(*e);
257 15           col_values[r] = *e;
258             } else {
259 0           col_values[r] = &PL_sv_undef;
260             }
261             }
262 5           encode_column(aTHX_ &buf, col_values, num_rows, col->type);
263             }
264              
265 2           RETVAL = newSVpvn(buf.ptr, buf.len);
266             }
267             OUTPUT:
268             RETVAL
269              
270             void
271             stream(self, iter, writer, ...)
272             SV *self
273             SV *iter
274             SV *writer
275             CODE:
276             {
277             /* XS-side iterator loop: pull rows via call_sv, batch them in an AV,
278             * encode and call writer on threshold. Saves per-row Perl method
279             * dispatch compared to driving the loop from Perl. */
280             Encoder *enc;
281 2           int batch_size = 10000;
282             AV *batch;
283             int n;
284              
285 2 50         if (!sv_isobject(self)) croak("Not an object");
286 2 50         if (!SvROK(iter) || SvTYPE(SvRV(iter)) != SVt_PVCV)
    50          
287 0           croak("stream: iter must be a coderef");
288 2 50         if (!SvROK(writer) || SvTYPE(SvRV(writer)) != SVt_PVCV)
    50          
289 0           croak("stream: writer must be a coderef");
290              
291             /* Optional named arg: batch_size => N */
292 2 50         if (items > 3) {
293             int i;
294 4 100         for (i = 3; i < items - 1; i += 2) {
295             STRLEN klen;
296 2           const char *key = SvPV(ST(i), klen);
297 2 50         if (klen == 10 && memcmp(key, "batch_size", 10) == 0)
    50          
298 2           batch_size = (int)SvIV(ST(i+1));
299             }
300 2 50         if (batch_size < 1) batch_size = 1;
301             }
302              
303 2           enc = INT2PTR(Encoder*, SvIV(SvRV(self)));
304 2           batch = (AV *)sv_2mortal((SV *)newAV());
305              
306 50025           for (;;) {
307 50027           dSP;
308 50027           ENTER; SAVETMPS;
309 50027 50         PUSHMARK(SP); PUTBACK;
310 50027           n = call_sv(iter, G_SCALAR);
311 50027           SPAGAIN;
312 50027 50         SV *row = (n > 0) ? POPs : &PL_sv_undef;
313 50027           int got = SvOK(row);
314 50027 100         if (got) SvREFCNT_inc(row);
315 50027           PUTBACK;
316 50027 50         FREETMPS; LEAVE;
317              
318 50027 100         if (!got) break;
319 50025           av_push(batch, row);
320 50025 100         if (av_len(batch) + 1 >= batch_size) {
321 52           encode_and_emit(aTHX_ enc, batch, writer);
322 52           av_clear(batch);
323             }
324             }
325              
326 2 100         if (av_len(batch) >= 0)
327 1           encode_and_emit(aTHX_ enc, batch, writer);
328             }
329              
330             SV*
331             streamer(self, writer, ...)
332             SV *self
333             SV *writer
334             CODE:
335             {
336             /* Constructor for ClickHouse::Encoder::Streamer. */
337             Streamer *s;
338 19           int batch_size = 10000;
339 19           const char *compress_mode = NULL;
340 19           STRLEN compress_mode_len = 0;
341 19           SV *hasher_sv = NULL;
342              
343 19 50         if (!sv_isobject(self)) croak("Not an object");
344 19 50         if (!SvROK(writer) || SvTYPE(SvRV(writer)) != SVt_PVCV)
    50          
345 0           croak("streamer: writer must be a coderef");
346              
347 19 50         if (items > 2) {
348             int i;
349 42 100         for (i = 2; i < items - 1; i += 2) {
350             STRLEN klen;
351 23           const char *key = SvPV(ST(i), klen);
352 23 100         if (klen == 10 && memcmp(key, "batch_size", 10) == 0) {
    50          
353 19           batch_size = (int)SvIV(ST(i+1));
354             }
355 4 100         else if (klen == 8 && memcmp(key, "compress", 8) == 0
    50          
356 3 50         && SvOK(ST(i+1))) {
357 3           compress_mode = SvPV(ST(i+1), compress_mode_len);
358             }
359 1 50         else if (klen == 6 && memcmp(key, "hasher", 6) == 0
    50          
360 1 50         && SvROK(ST(i+1))
361 1 50         && SvTYPE(SvRV(ST(i+1))) == SVt_PVCV) {
362 1           hasher_sv = ST(i+1);
363             }
364             }
365 19 50         if (batch_size < 1) batch_size = 1;
366             }
367              
368             /* Construct under our own ENTER/LEAVE so a croak partway through
369             * (e.g. OOM in newAV) frees the partially-built struct. */
370 19           ENTER;
371 19           Newxz(s, 1, Streamer);
372 19           SAVEDESTRUCTOR_X(cleanup_streamer_slot, &s);
373              
374             /* Hold an owned RV pointing at the same blessed inner SV, so the
375             * encoder survives even if the user's $enc goes out of scope. */
376 19           s->enc_sv = newRV_inc(SvRV(self));
377 19           s->enc = INT2PTR(Encoder*, SvIV(SvRV(s->enc_sv)));
378 19           s->writer = newSVsv(writer);
379 19           s->buffer = newAV();
380 19           s->batch_size = batch_size;
381              
382             /* Copy the compress mode into Streamer-owned memory. NULL / "none" /
383             * "raw" are all treated as "no compression" by streamer_flush. */
384 19 100         if (compress_mode
385 3 50         && !(compress_mode_len == 0)
386 3 100         && !(compress_mode_len == 4 && memcmp(compress_mode, "none", 4) == 0)
    50          
387 2 50         && !(compress_mode_len == 3 && memcmp(compress_mode, "raw", 3) == 0)) {
    50          
388 2           Newx(s->compress_mode, compress_mode_len + 1, char);
389 2           memcpy(s->compress_mode, compress_mode, compress_mode_len);
390 2           s->compress_mode[compress_mode_len] = '\0';
391             }
392 19 100         if (hasher_sv) s->hasher_sv = newSVsv(hasher_sv);
393              
394 19           RETVAL = newSV(0);
395 19           sv_setref_pv(RETVAL, "ClickHouse::Encoder::Streamer", (void *)s);
396 19           s = NULL; /* disarm: SV's DESTROY now owns the streamer */
397 19           LEAVE;
398             }
399             OUTPUT:
400             RETVAL
401              
402             SV*
403             _columns(self)
404             SV *self
405             CODE:
406             {
407             Encoder *enc;
408             AV *cols;
409             int i;
410              
411 40 50         if (!sv_isobject(self))
412 0           croak("Not an object");
413              
414 40           enc = INT2PTR(Encoder*, SvIV(SvRV(self)));
415 40           cols = newAV();
416              
417 118 100         for (i = 0; i < enc->num_columns; i++) {
418 78           AV *col = newAV();
419 78           av_push(col, newSVpvn(enc->columns[i].name, enc->columns[i].name_len));
420 78           av_push(col, newSVpvn(enc->columns[i].type_str, enc->columns[i].type_len));
421 78           av_push(cols, newRV_noinc((SV*)col));
422             }
423              
424 40           RETVAL = newRV_noinc((SV*)cols);
425             }
426             OUTPUT:
427             RETVAL
428              
429             void
430             DESTROY(self)
431             SV *self
432             CODE:
433             {
434             Encoder *enc;
435 1491 50         if (!sv_isobject(self)) return;
436 1491           enc = INT2PTR(Encoder*, SvIV(SvRV(self)));
437 1491           free_encoder(aTHX_ enc);
438             }
439              
440             SV *
441             decode_block(class, bytes, ...)
442             SV *class
443             SV *bytes
444             CODE:
445             {
446             PERL_UNUSED_VAR(class);
447             /* Optional 3rd arg: byte offset into the input. Lets callers walk
448             * a concatenated multi-block stream in O(N) instead of O(N*K) - the
449             * Perl-side decode_blocks wrapper relies on this to avoid substr
450             * copies on each iteration.
451             *
452             * Optional 4th arg: hashref of column names to KEEP. When provided,
453             * columns whose name isn't in the set get a placeholder values
454             * arrayref (one undef per row) and the wire bytes are still consumed
455             * for that column's data, but no SVs are allocated for the values.
456             * Memory win on wide SELECT * when caller wants a few columns. */
457 227           UV start_offset = 0;
458 227           HV *keep_set = NULL;
459 227 100         if (items >= 3) {
460 71           IV signed_off = SvIV(ST(2));
461 71 100         if (signed_off < 0)
462 1           croak("decode_block: offset must be non-negative (got %"
463             IVdf ")", signed_off);
464 70           start_offset = (UV)signed_off;
465             }
466 226 100         if (items >= 4 && SvOK(ST(3))) {
    100          
467 25 100         if (!SvROK(ST(3)) || SvTYPE(SvRV(ST(3))) != SVt_PVHV)
    100          
468 2 100         croak("decode_block: columns filter must be a hashref "
469             "(got %s)",
470             SvROK(ST(3)) ? sv_reftype(SvRV(ST(3)), 0)
471             : "non-reference");
472 23           keep_set = (HV *)SvRV(ST(3));
473             }
474             const unsigned char *start, *p, *end;
475             UV ncols, nrows;
476 224           decode_block_prologue(aTHX_ bytes, start_offset, "decode_block",
477             &start, &p, &end, &ncols, &nrows);
478              
479             /* Mortalize cols so a mid-loop croak (from decode_column,
480             * dec_lenpfx_string, etc.) reclaims it instead of leaking. We
481             * SvREFCNT_inc when transferring ownership to the result HV. */
482 220           AV *cols = (AV *)sv_2mortal((SV *)newAV());
483 220 50         if (ncols > 0) av_extend(cols, ncols - 1);
484             UV c;
485 1520 100         for (c = 0; c < ncols; c++) {
486             const char *name; STRLEN name_len;
487             const char *tstr; STRLEN tlen;
488 1330           dec_lenpfx_string(aTHX_ &p, end, &name, &name_len);
489 1330           dec_lenpfx_string(aTHX_ &p, end, &tstr, &tlen);
490             /* parse_type uses heap-slot SAVEDESTRUCTOR_X for cleanup on
491             * croak; on success the slot is disarmed. We're in the XSUB's
492             * implicit save scope, so a croak from any nested decode
493             * unwinds the type back through this cleanup. */
494 1320           TypeInfo *t = parse_type(aTHX_ tstr, tlen);
495              
496 1320           int keep = 1;
497 1320 100         if (keep_set && !hv_exists(keep_set, name, name_len))
    100          
498 524           keep = 0;
499              
500             SV *values;
501 1320 100         if (keep) {
502 796           values = decode_column(aTHX_ &p, end, t, (SSize_t)nrows);
503             } else {
504             /* Decode then discard - we still must consume the wire
505             * bytes to keep the cursor aligned for the next column.
506             * The AV is freed immediately so peak memory is one
507             * column's values, not the full block's. */
508 524           SV *tmp = decode_column(aTHX_ &p, end, t, (SSize_t)nrows);
509 515           SvREFCNT_dec(tmp);
510 515           AV *placeholder = newAV();
511 515 50         if (nrows > 0) {
512 515           av_extend(placeholder, nrows - 1);
513             SSize_t r;
514 50060 100         for (r = 0; r < (SSize_t)nrows; r++)
515 49545           av_store(placeholder, r, newSV(0));
516             }
517 515           values = newRV_noinc((SV *)placeholder);
518             }
519             /* Free this column's TypeInfo eagerly to avoid piling them up
520             * on the save stack for wide blocks. */
521 1300           free_typeinfo(aTHX_ t);
522              
523 1300           HV *col_hv = newHV();
524 1300           (void)hv_stores(col_hv, "name", newSVpvn(name, name_len));
525 1300           (void)hv_stores(col_hv, "type", newSVpvn(tstr, tlen));
526 1300           (void)hv_stores(col_hv, "values", values);
527 1300 100         if (!keep) (void)hv_stores(col_hv, "skipped", newSViv(1));
528 1300           av_store(cols, c, newRV_noinc((SV *)col_hv));
529             }
530              
531 190           HV *result = newHV();
532 190           (void)hv_stores(result, "ncols", newSVuv(ncols));
533 190           (void)hv_stores(result, "nrows", newSVuv(nrows));
534             /* Transfer ownership out of the mortal: bump refcount, then the
535             * mortal-stack cleanup at scope exit drops one back, leaving the
536             * net refcount at 1 (owned by `result`). */
537 190           (void)hv_stores(result, "columns",
538             newRV_inc((SV *)cols));
539 190           (void)hv_stores(result, "consumed", newSVuv((UV)(p - start)));
540 190           RETVAL = newRV_noinc((SV *)result);
541             }
542             OUTPUT: RETVAL
543              
544             # Row-oriented decoder: same wire walk as decode_block, but values are
545             # distributed into row-major arrayrefs as each column is decoded, then
546             # the per-column AV is freed. Peak memory holds one column's values
547             # plus all row AVs (vs decode_rows-via-Perl which holds both
548             # representations + does the transpose in Perl).
549             SV *
550             decode_block_rows(class, bytes, ...)
551             SV *class
552             SV *bytes
553             CODE:
554             {
555             PERL_UNUSED_VAR(class);
556 6           UV start_offset = 0;
557 6 100         if (items >= 3) {
558 5           IV signed_off = SvIV(ST(2));
559 5 50         if (signed_off < 0)
560 0           croak("decode_block_rows: offset must be non-negative "
561             "(got %" IVdf ")", signed_off);
562 5           start_offset = (UV)signed_off;
563             }
564             const unsigned char *start, *p, *end;
565             UV ncols, nrows;
566 6           decode_block_prologue(aTHX_ bytes, start_offset, "decode_block_rows",
567             &start, &p, &end, &ncols, &nrows);
568              
569             /* Mortalize so a mid-loop croak unwinds and reclaims these
570             * (and, via the row RVs they hold, all populated row AVs).
571             * SvREFCNT_inc-via-newRV_inc transfers them to the result HV
572             * on the success path. */
573 5           AV *names = (AV *)sv_2mortal((SV *)newAV());
574 5           AV *types = (AV *)sv_2mortal((SV *)newAV());
575 5           AV *rows = (AV *)sv_2mortal((SV *)newAV());
576 5 50         if (ncols > 0) { av_extend(names, ncols - 1); av_extend(types, ncols - 1); }
577 5 50         if (nrows > 0) av_extend(rows, nrows - 1);
578              
579             /* Pre-create row AVs - we'll fill column c into row_av[c] as we
580             * decode each column. Stash AV pointers for fast access. */
581 5           AV **row_avs = NULL;
582 5 50         if (nrows > 0) {
583 5 50         Newx(row_avs, nrows, AV *);
584 5           SAVEFREEPV(row_avs);
585             UV r;
586 17 100         for (r = 0; r < nrows; r++) {
587 12           AV *row_av = newAV();
588 12 50         if (ncols > 0) av_extend(row_av, ncols - 1);
589 12           row_avs[r] = row_av;
590 12           av_store(rows, r, newRV_noinc((SV *)row_av));
591             }
592             }
593              
594             UV c;
595 27 100         for (c = 0; c < ncols; c++) {
596             const char *name; STRLEN name_len;
597             const char *tstr; STRLEN tlen;
598 22           dec_lenpfx_string(aTHX_ &p, end, &name, &name_len);
599 22           dec_lenpfx_string(aTHX_ &p, end, &tstr, &tlen);
600 22           TypeInfo *t = parse_type(aTHX_ tstr, tlen);
601 22           SV *col_rv = decode_column(aTHX_ &p, end, t, (SSize_t)nrows);
602 22           free_typeinfo(aTHX_ t);
603              
604 22           AV *col_av = (AV *)SvRV(col_rv);
605             UV r;
606 69 100         for (r = 0; r < nrows; r++) {
607 47           SV **e = av_fetch(col_av, r, 0);
608 47 50         av_store(row_avs[r], c, e ? SvREFCNT_inc(*e) : newSV(0));
609             }
610             /* Free the column AV eagerly; we no longer need it. */
611 22           SvREFCNT_dec(col_rv);
612              
613 22           av_store(names, c, newSVpvn(name, name_len));
614 22           av_store(types, c, newSVpvn(tstr, tlen));
615             }
616              
617 5           HV *result = newHV();
618 5           (void)hv_stores(result, "ncols", newSVuv(ncols));
619 5           (void)hv_stores(result, "nrows", newSVuv(nrows));
620             /* Transfer ownership out of the mortal stack: newRV_inc bumps
621             * the AV refcount so when the mortal cleanup drops one, the net
622             * count stays at 1 (owned by `result`). */
623 5           (void)hv_stores(result, "names", newRV_inc((SV *)names));
624 5           (void)hv_stores(result, "types", newRV_inc((SV *)types));
625 5           (void)hv_stores(result, "rows", newRV_inc((SV *)rows));
626 5           (void)hv_stores(result, "consumed", newSVuv((UV)(p - start)));
627 5           RETVAL = newRV_noinc((SV *)result);
628             }
629             OUTPUT: RETVAL
630              
631             # CityHash128 in the "cityhash102" variant used by ClickHouse's
632             # compressed-block prefix. Returns a 16-byte string (8 bytes low,
633             # then 8 bytes high; matches the wire layout CH expects). Exposed
634             # so compress_native_block can default its `hasher` to this XSUB
635             # instead of forcing every caller to supply one.
636             SV *
637             _cityhash128(class_or_bytes, ...)
638             SV *class_or_bytes
639             CODE:
640             {
641             SV *input_sv;
642             /* Accept both class-method ($class->_cityhash128($bytes)) and
643             * function-style (\&_cityhash128 passed as a hasher coderef)
644             * call shapes. A class-method call has items >= 2 and the first
645             * arg is a non-reference string (the class name); a coderef call
646             * has items == 1 and the first arg IS the bytes. */
647 40 50         if (items >= 2 && SvPOK(class_or_bytes) && !SvROK(class_or_bytes)) {
    0          
    0          
648 0           input_sv = ST(1);
649             } else {
650 40           input_sv = class_or_bytes;
651             }
652             STRLEN len;
653 40           const char *s = SvPVbyte(input_sv, len);
654             unsigned char out[16];
655 40           cityhash128_v102(s, (size_t)len, out);
656 40           RETVAL = newSVpvn((const char *)out, 16);
657             }
658             OUTPUT: RETVAL
659              
660             MODULE = ClickHouse::Encoder PACKAGE = ClickHouse::Encoder::Streamer
661              
662             void
663             push_row(self, row)
664             SV *self
665             SV *row
666             CODE:
667             {
668             Streamer *s;
669 98 50         if (!sv_isobject(self)) croak("Not an object");
670 98           s = INT2PTR(Streamer *, SvIV(SvRV(self)));
671 98           av_push(s->buffer, SvREFCNT_inc(row));
672 98 100         if (av_len(s->buffer) + 1 >= s->batch_size)
673 16           streamer_flush(aTHX_ s);
674             }
675              
676             void
677             finish(self)
678             SV *self
679             CODE:
680             {
681             Streamer *s;
682 18 50         if (!sv_isobject(self)) croak("Not an object");
683 18           s = INT2PTR(Streamer *, SvIV(SvRV(self)));
684 18           streamer_flush(aTHX_ s);
685             }
686              
687             void
688             reset(self)
689             SV *self
690             CODE:
691             {
692             /* Discard buffered rows without flushing -- useful for error recovery
693             * after an upstream failure when the in-flight batch should be dropped. */
694             Streamer *s;
695 3 50         if (!sv_isobject(self)) croak("Not an object");
696 3           s = INT2PTR(Streamer *, SvIV(SvRV(self)));
697 3 50         if (av_len(s->buffer) >= 0) av_clear(s->buffer);
698             }
699              
700             UV
701             buffered_count(self)
702             SV *self
703             CODE:
704             {
705             Streamer *s;
706 10 50         if (!sv_isobject(self)) croak("Not an object");
707 10           s = INT2PTR(Streamer *, SvIV(SvRV(self)));
708 10 50         RETVAL = (UV)(av_len(s->buffer) + 1);
709             }
710             OUTPUT:
711             RETVAL
712              
713             bool
714             is_empty(self)
715             SV *self
716             CODE:
717             {
718             Streamer *s;
719 6 50         if (!sv_isobject(self)) croak("Not an object");
720 6           s = INT2PTR(Streamer *, SvIV(SvRV(self)));
721 6 100         RETVAL = (av_len(s->buffer) < 0);
722             }
723             OUTPUT:
724             RETVAL
725              
726             void
727             DESTROY(self)
728             SV *self
729             CODE:
730             {
731             Streamer *s;
732 19 50         if (!sv_isobject(self)) return;
733 19           s = INT2PTR(Streamer *, SvIV(SvRV(self)));
734 19           free_streamer(aTHX_ s);
735             }
736              
737             MODULE = ClickHouse::Encoder PACKAGE = ClickHouse::Encoder::TCP
738              
739             # Pack an unsigned varint (LEB128 / CH varuint form). Used by the
740             # protocol-packet builders in ClickHouse::Encoder::TCP. Equivalent
741             # to the buf_varint() internal helper but exposed as a Perl callable
742             # so the pure-Perl module can produce wire-format bytes via XS rather
743             # than a per-byte chr/shift loop.
744             SV *
745             pack_varint(v)
746             UV v
747             CODE:
748             {
749             char buf[10];
750 122           int i = 0;
751 154 100         while (v >= 0x80) {
752 32           buf[i++] = (char)((v & 0x7f) | 0x80);
753 32           v >>= 7;
754             }
755 122           buf[i++] = (char)v;
756 122           RETVAL = newSVpvn(buf, i);
757             }
758             OUTPUT:
759             RETVAL
760              
761             # Unpack one varint from a byte string starting at the given offset.
762             # Returns (value, new_offset). Croaks on truncated input or a varint
763             # wider than 64 bits.
764             void
765             unpack_varint(bytes, offset)
766             SV *bytes
767             UV offset
768             PPCODE:
769             {
770             STRLEN len;
771 67           const unsigned char *p = (const unsigned char *)SvPVbyte(bytes, len);
772 67           UV v = tcp_read_varint(aTHX_ p, (UV)len, &offset);
773 62 50         EXTEND(SP, 2);
774 62           PUSHs(sv_2mortal(newSVuv(v)));
775 62           PUSHs(sv_2mortal(newSVuv(offset)));
776             }
777              
778             # Pack a length-prefixed string (varint length + UTF-8 bytes). The
779             # caller should pass Perl strings; we encode to UTF-8 explicitly so
780             # non-ASCII chars get the right byte length on the wire.
781             SV *
782             pack_string(s)
783             SV *s
784             CODE:
785             {
786             STRLEN len;
787             const char *p;
788 205 50         if (!SvOK(s)) {
789 0           p = ""; len = 0;
790             } else {
791             /* SvPVutf8 returns the UTF-8 byte form */
792 205           p = SvPVutf8(s, len);
793             }
794             /* Build prefix + bytes. */
795             char prefix[10];
796 205           int pi = 0;
797 205           UV vv = (UV)len;
798 205 50         while (vv >= 0x80) {
799 0           prefix[pi++] = (char)((vv & 0x7f) | 0x80);
800 0           vv >>= 7;
801             }
802 205           prefix[pi++] = (char)vv;
803 205           RETVAL = newSVpvn(prefix, pi);
804 205           sv_catpvn(RETVAL, p, len);
805             }
806             OUTPUT:
807             RETVAL
808              
809             # Unpack a length-prefixed string at the given offset. Returns (string,
810             # new_offset). String bytes are returned without decoding (caller
811             # decides UTF-8 vs binary).
812             void
813             unpack_string(bytes, offset)
814             SV *bytes
815             UV offset
816             PPCODE:
817             {
818             STRLEN buf_len;
819 19           const unsigned char *p = (const unsigned char *)SvPVbyte(bytes, buf_len);
820 19           UV slen = tcp_read_varint(aTHX_ p, (UV)buf_len, &offset);
821             /* Subtraction form: addition (offset + slen) can wrap UV_MAX when
822             * slen is attacker-controlled via a crafted varint. tcp_read_varint
823             * guarantees offset <= buf_len on return, so buf_len - offset is
824             * safe. */
825 18 100         if (slen > (UV)buf_len - offset)
826 1           croak("string: truncated at offset %lu (need %lu)",
827             (unsigned long)offset, (unsigned long)slen);
828 17           SV *out = newSVpvn((const char *)(p + offset), (STRLEN)slen);
829 17 50         EXTEND(SP, 2);
830 17           PUSHs(sv_2mortal(out));
831 17           PUSHs(sv_2mortal(newSVuv(offset + slen)));
832             }
833