File Coverage

runtime.c
Criterion Covered Total %
statement 115 118 97.4
branch 62 92 67.3
condition n/a
subroutine n/a
pod n/a
total 177 210 84.2


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2             #include "EXTERN.h"
3             #include "perl.h"
4             #include "XSUB.h"
5              
6             #include "types.h"
7             #include "buffer.h"
8             #include "encode.h"
9             #include "runtime.h"
10              
11 2138           void do_encode(pTHX_ Encoder *enc, AV *rows_av, Buffer *buf) {
12 2138           SSize_t num_rows = av_len(rows_av) + 1;
13             SSize_t r, c;
14              
15             /* Validate every row up front and cache the dereffed AV*. Caching
16             * avoids a second av_fetch per row in the column loop and removes
17             * any concern about tied / magic arrays returning different values
18             * across passes. */
19 2138           AV **row_avs = NULL;
20 2138 100         if (num_rows > 0) {
21 2127 50         Newx(row_avs, num_rows, AV*);
22 2127           SAVEFREEPV(row_avs);
23             }
24 465658 100         for (r = 0; r < num_rows; r++) {
25 463526           SV **row_sv = av_fetch(rows_av, r, 0);
26             AV *row_av;
27             SSize_t row_cols;
28 463526 50         if (!row_sv)
29 0           croak("Row %" IVdf " must be arrayref", (IV)r);
30             /* Materialize any magic on the fetched SV - for tied AVs the
31             * FETCH result is delivered through magic and SvROK / SvTYPE
32             * see the unrealized magical placeholder until SvGETMAGIC has
33             * been called. Common in DBI-driven ingest (fetchall_arrayref
34             * may return magical AVs at either level). */
35 463526           SvGETMAGIC(*row_sv);
36 463526 100         if (!SvROK(*row_sv) || SvTYPE(SvRV(*row_sv)) != SVt_PVAV)
    50          
37 3           croak("Row %" IVdf " must be arrayref", (IV)r);
38 463523           row_av = (AV*)SvRV(*row_sv);
39 463523           row_cols = av_len(row_av) + 1;
40 463523 100         if (row_cols != enc->num_columns)
41 3           croak("Row %" IVdf " has %" IVdf " columns, expected %d",
42             (IV)r, (IV)row_cols, enc->num_columns);
43 463520           row_avs[r] = row_av;
44             }
45              
46 2132           buf_varint(aTHX_ buf, enc->num_columns);
47 2132           buf_varint(aTHX_ buf, num_rows);
48              
49 2132           SV **col_values = alloc_sv_array(aTHX_ num_rows);
50              
51 7778 100         for (c = 0; c < enc->num_columns; c++) {
52 5702           Column *col = &enc->columns[c];
53 5702           buf_string(aTHX_ buf, col->name, col->name_len);
54 5702           buf_string(aTHX_ buf, col->type_str, col->type_len);
55 738020 100         for (r = 0; r < num_rows; r++) {
56 732318           SV **val_sv = av_fetch(row_avs[r], c, 0);
57 732318 50         if (val_sv) {
58             /* Same magic-realization step as the row-level loop:
59             * tied inner AVs return magical placeholders whose
60             * SvPVbyte / SvIV / SvNV downstream won't pick up the
61             * real value until SvGETMAGIC has fired. */
62 732318           SvGETMAGIC(*val_sv);
63 732318           col_values[r] = *val_sv;
64             } else {
65 0           col_values[r] = &PL_sv_undef;
66             }
67             }
68 5702           encode_column(aTHX_ buf, col_values, num_rows, col->type);
69             }
70 2076           }
71              
72             /* Route `bytes` through ClickHouse::Encoder->compress_native_block
73             * and return a freshly-allocated SV with the compressed-block-framed
74             * result. Caller takes ownership (refcount 1) and is responsible for
75             * mortalizing or freeing. mode == NULL / "none" / "raw" -> just bumps
76             * the input's refcount and returns it. */
77 4           static SV *maybe_compress(pTHX_ SV *bytes,
78             const char *mode, SV *hasher_sv) {
79 4 50         if (!mode || strcmp(mode, "none") == 0 || strcmp(mode, "raw") == 0)
    50          
    50          
80 0           return SvREFCNT_inc(bytes);
81 4           dSP;
82 4           ENTER;
83 4           SAVETMPS;
84 4 50         PUSHMARK(SP);
85 4 50         XPUSHs(sv_2mortal(newSVpvs("ClickHouse::Encoder")));
86 4 50         XPUSHs(bytes);
87 4 50         XPUSHs(sv_2mortal(newSVpvs("mode")));
88 4 50         XPUSHs(sv_2mortal(newSVpv(mode, 0)));
89 4 100         if (hasher_sv) {
90 1 50         XPUSHs(sv_2mortal(newSVpvs("hasher")));
91 1 50         XPUSHs(hasher_sv);
92             }
93 4           PUTBACK;
94 4           int n = call_method("compress_native_block", G_SCALAR);
95 4           SPAGAIN;
96 4           SV *out = NULL;
97 4 50         if (n == 1) {
98             /* POPs gives a mortal SV; copy its body via newSVsv into a
99             * non-mortal SV that survives the FREETMPS below. */
100 4           out = newSVsv(POPs);
101             }
102 4           PUTBACK;
103 4 50         FREETMPS;
104 4           LEAVE;
105 4 50         if (!out) croak("streamer: compress_native_block returned nothing");
106 4           return out;
107             }
108              
109 77           void encode_and_emit(pTHX_ Encoder *enc, AV *batch, SV *writer) {
110 77           dSP;
111 77           ENTER;
112 77           SAVETMPS;
113              
114             Buffer buf;
115 77           buf_init(aTHX_ &buf);
116 77           do_encode(aTHX_ enc, batch, &buf);
117              
118 77           SV *bytes = sv_2mortal(newSVpvn(buf.ptr, buf.len));
119              
120 77 50         PUSHMARK(SP);
121 77 50         XPUSHs(bytes);
122 77           PUTBACK;
123 77           call_sv(writer, G_VOID | G_DISCARD);
124              
125 74 50         FREETMPS;
126 74           LEAVE;
127 74           }
128              
129 34           void streamer_flush(pTHX_ Streamer *s) {
130 34 100         if (av_len(s->buffer) < 0) return;
131             /* Swap in a fresh buffer BEFORE the (potentially croaking) emit so that
132             * if the writer dies and the user catches it via eval{}, the streamer
133             * is left in a clean state instead of replaying the failed batch's rows
134             * on the next push. The old buffer is mortalized so it's reclaimed
135             * either way. */
136 28           AV *batch = s->buffer;
137 28           s->buffer = newAV();
138 28           sv_2mortal((SV *)batch);
139              
140             /* If compression is enabled, build the bytes locally then route them
141             * through compress_native_block before invoking the user's writer.
142             * Otherwise fall through to encode_and_emit's direct call. */
143 28 100         if (s->compress_mode) {
144 4           dSP;
145 4           ENTER;
146 4           SAVETMPS;
147             Buffer buf;
148 4           buf_init(aTHX_ &buf);
149 4           do_encode(aTHX_ s->enc, batch, &buf);
150 4           SV *bytes = sv_2mortal(newSVpvn(buf.ptr, buf.len));
151             /* maybe_compress returns an owned SV (refcount 1); mortalize it
152             * here so it's cleaned up after the writer call. */
153 4           SV *out = sv_2mortal(
154             maybe_compress(aTHX_ bytes, s->compress_mode, s->hasher_sv));
155 4 50         PUSHMARK(SP);
156 4 50         XPUSHs(out);
157 4           PUTBACK;
158 4           call_sv(s->writer, G_VOID | G_DISCARD);
159 4 50         FREETMPS;
160 4           LEAVE;
161 4           return;
162             }
163 24           encode_and_emit(aTHX_ s->enc, batch, s->writer);
164             }
165              
166 19           void free_streamer(pTHX_ Streamer *s) {
167 19 50         if (!s) return;
168 19 50         if (s->enc_sv) SvREFCNT_dec(s->enc_sv);
169 19 50         if (s->writer) SvREFCNT_dec(s->writer);
170 19 50         if (s->buffer) SvREFCNT_dec((SV *)s->buffer);
171 19 100         if (s->compress_mode) Safefree(s->compress_mode);
172 19 100         if (s->hasher_sv) SvREFCNT_dec(s->hasher_sv);
173 19           Safefree(s);
174             }
175              
176 19           void cleanup_streamer_slot(pTHX_ void *p) {
177 19           Streamer **slot = (Streamer **)p;
178 19 50         if (*slot) free_streamer(aTHX_ *slot);
179 19           }
180              
181 1519           void free_encoder(pTHX_ Encoder *enc) {
182             int i;
183 1519 50         if (!enc) return;
184 1519 50         if (enc->columns) {
185 5724 100         for (i = 0; i < enc->num_columns; i++) {
186 4205 100         if (enc->columns[i].name) Safefree(enc->columns[i].name);
187 4205 100         if (enc->columns[i].type_str) Safefree(enc->columns[i].type_str);
188 4205 100         if (enc->columns[i].type) free_typeinfo(aTHX_ enc->columns[i].type);
189             }
190 1519           Safefree(enc->columns);
191             }
192 1519           Safefree(enc);
193             }
194              
195 1519           void cleanup_encoder_slot(pTHX_ void *p) {
196 1519           Encoder **slot = (Encoder **)p;
197 1519 100         if (*slot) free_encoder(aTHX_ *slot);
198 1519           }