| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
#define PERL_NO_GET_CONTEXT |
|
2
|
|
|
|
|
|
|
#include "EXTERN.h" |
|
3
|
|
|
|
|
|
|
#include "perl.h" |
|
4
|
|
|
|
|
|
|
#include "XSUB.h" |
|
5
|
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
#include "types.h" |
|
7
|
|
|
|
|
|
|
#include "buffer.h" |
|
8
|
|
|
|
|
|
|
#include "encode.h" |
|
9
|
|
|
|
|
|
|
#include "runtime.h" |
|
10
|
|
|
|
|
|
|
|
|
11
|
2138
|
|
|
|
|
|
void do_encode(pTHX_ Encoder *enc, AV *rows_av, Buffer *buf) { |
|
12
|
2138
|
|
|
|
|
|
SSize_t num_rows = av_len(rows_av) + 1; |
|
13
|
|
|
|
|
|
|
SSize_t r, c; |
|
14
|
|
|
|
|
|
|
|
|
15
|
|
|
|
|
|
|
/* Validate every row up front and cache the dereffed AV*. Caching |
|
16
|
|
|
|
|
|
|
* avoids a second av_fetch per row in the column loop and removes |
|
17
|
|
|
|
|
|
|
* any concern about tied / magic arrays returning different values |
|
18
|
|
|
|
|
|
|
* across passes. */ |
|
19
|
2138
|
|
|
|
|
|
AV **row_avs = NULL; |
|
20
|
2138
|
100
|
|
|
|
|
if (num_rows > 0) { |
|
21
|
2127
|
50
|
|
|
|
|
Newx(row_avs, num_rows, AV*); |
|
22
|
2127
|
|
|
|
|
|
SAVEFREEPV(row_avs); |
|
23
|
|
|
|
|
|
|
} |
|
24
|
465658
|
100
|
|
|
|
|
for (r = 0; r < num_rows; r++) { |
|
25
|
463526
|
|
|
|
|
|
SV **row_sv = av_fetch(rows_av, r, 0); |
|
26
|
|
|
|
|
|
|
AV *row_av; |
|
27
|
|
|
|
|
|
|
SSize_t row_cols; |
|
28
|
463526
|
50
|
|
|
|
|
if (!row_sv) |
|
29
|
0
|
|
|
|
|
|
croak("Row %" IVdf " must be arrayref", (IV)r); |
|
30
|
|
|
|
|
|
|
/* Materialize any magic on the fetched SV - for tied AVs the |
|
31
|
|
|
|
|
|
|
* FETCH result is delivered through magic and SvROK / SvTYPE |
|
32
|
|
|
|
|
|
|
* see the unrealized magical placeholder until SvGETMAGIC has |
|
33
|
|
|
|
|
|
|
* been called. Common in DBI-driven ingest (fetchall_arrayref |
|
34
|
|
|
|
|
|
|
* may return magical AVs at either level). */ |
|
35
|
463526
|
|
|
|
|
|
SvGETMAGIC(*row_sv); |
|
36
|
463526
|
100
|
|
|
|
|
if (!SvROK(*row_sv) || SvTYPE(SvRV(*row_sv)) != SVt_PVAV) |
|
|
|
50
|
|
|
|
|
|
|
37
|
3
|
|
|
|
|
|
croak("Row %" IVdf " must be arrayref", (IV)r); |
|
38
|
463523
|
|
|
|
|
|
row_av = (AV*)SvRV(*row_sv); |
|
39
|
463523
|
|
|
|
|
|
row_cols = av_len(row_av) + 1; |
|
40
|
463523
|
100
|
|
|
|
|
if (row_cols != enc->num_columns) |
|
41
|
3
|
|
|
|
|
|
croak("Row %" IVdf " has %" IVdf " columns, expected %d", |
|
42
|
|
|
|
|
|
|
(IV)r, (IV)row_cols, enc->num_columns); |
|
43
|
463520
|
|
|
|
|
|
row_avs[r] = row_av; |
|
44
|
|
|
|
|
|
|
} |
|
45
|
|
|
|
|
|
|
|
|
46
|
2132
|
|
|
|
|
|
buf_varint(aTHX_ buf, enc->num_columns); |
|
47
|
2132
|
|
|
|
|
|
buf_varint(aTHX_ buf, num_rows); |
|
48
|
|
|
|
|
|
|
|
|
49
|
2132
|
|
|
|
|
|
SV **col_values = alloc_sv_array(aTHX_ num_rows); |
|
50
|
|
|
|
|
|
|
|
|
51
|
7778
|
100
|
|
|
|
|
for (c = 0; c < enc->num_columns; c++) { |
|
52
|
5702
|
|
|
|
|
|
Column *col = &enc->columns[c]; |
|
53
|
5702
|
|
|
|
|
|
buf_string(aTHX_ buf, col->name, col->name_len); |
|
54
|
5702
|
|
|
|
|
|
buf_string(aTHX_ buf, col->type_str, col->type_len); |
|
55
|
738020
|
100
|
|
|
|
|
for (r = 0; r < num_rows; r++) { |
|
56
|
732318
|
|
|
|
|
|
SV **val_sv = av_fetch(row_avs[r], c, 0); |
|
57
|
732318
|
50
|
|
|
|
|
if (val_sv) { |
|
58
|
|
|
|
|
|
|
/* Same magic-realization step as the row-level loop: |
|
59
|
|
|
|
|
|
|
* tied inner AVs return magical placeholders whose |
|
60
|
|
|
|
|
|
|
* SvPVbyte / SvIV / SvNV downstream won't pick up the |
|
61
|
|
|
|
|
|
|
* real value until SvGETMAGIC has fired. */ |
|
62
|
732318
|
|
|
|
|
|
SvGETMAGIC(*val_sv); |
|
63
|
732318
|
|
|
|
|
|
col_values[r] = *val_sv; |
|
64
|
|
|
|
|
|
|
} else { |
|
65
|
0
|
|
|
|
|
|
col_values[r] = &PL_sv_undef; |
|
66
|
|
|
|
|
|
|
} |
|
67
|
|
|
|
|
|
|
} |
|
68
|
5702
|
|
|
|
|
|
encode_column(aTHX_ buf, col_values, num_rows, col->type); |
|
69
|
|
|
|
|
|
|
} |
|
70
|
2076
|
|
|
|
|
|
} |
|
71
|
|
|
|
|
|
|
|
|
72
|
|
|
|
|
|
|
/* Route `bytes` through ClickHouse::Encoder->compress_native_block |
|
73
|
|
|
|
|
|
|
* and return a freshly-allocated SV with the compressed-block-framed |
|
74
|
|
|
|
|
|
|
* result. Caller takes ownership (refcount 1) and is responsible for |
|
75
|
|
|
|
|
|
|
* mortalizing or freeing. mode == NULL / "none" / "raw" -> just bumps |
|
76
|
|
|
|
|
|
|
* the input's refcount and returns it. */ |
|
77
|
4
|
|
|
|
|
|
static SV *maybe_compress(pTHX_ SV *bytes, |
|
78
|
|
|
|
|
|
|
const char *mode, SV *hasher_sv) { |
|
79
|
4
|
50
|
|
|
|
|
if (!mode || strcmp(mode, "none") == 0 || strcmp(mode, "raw") == 0) |
|
|
|
50
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
80
|
0
|
|
|
|
|
|
return SvREFCNT_inc(bytes); |
|
81
|
4
|
|
|
|
|
|
dSP; |
|
82
|
4
|
|
|
|
|
|
ENTER; |
|
83
|
4
|
|
|
|
|
|
SAVETMPS; |
|
84
|
4
|
50
|
|
|
|
|
PUSHMARK(SP); |
|
85
|
4
|
50
|
|
|
|
|
XPUSHs(sv_2mortal(newSVpvs("ClickHouse::Encoder"))); |
|
86
|
4
|
50
|
|
|
|
|
XPUSHs(bytes); |
|
87
|
4
|
50
|
|
|
|
|
XPUSHs(sv_2mortal(newSVpvs("mode"))); |
|
88
|
4
|
50
|
|
|
|
|
XPUSHs(sv_2mortal(newSVpv(mode, 0))); |
|
89
|
4
|
100
|
|
|
|
|
if (hasher_sv) { |
|
90
|
1
|
50
|
|
|
|
|
XPUSHs(sv_2mortal(newSVpvs("hasher"))); |
|
91
|
1
|
50
|
|
|
|
|
XPUSHs(hasher_sv); |
|
92
|
|
|
|
|
|
|
} |
|
93
|
4
|
|
|
|
|
|
PUTBACK; |
|
94
|
4
|
|
|
|
|
|
int n = call_method("compress_native_block", G_SCALAR); |
|
95
|
4
|
|
|
|
|
|
SPAGAIN; |
|
96
|
4
|
|
|
|
|
|
SV *out = NULL; |
|
97
|
4
|
50
|
|
|
|
|
if (n == 1) { |
|
98
|
|
|
|
|
|
|
/* POPs gives a mortal SV; copy its body via newSVsv into a |
|
99
|
|
|
|
|
|
|
* non-mortal SV that survives the FREETMPS below. */ |
|
100
|
4
|
|
|
|
|
|
out = newSVsv(POPs); |
|
101
|
|
|
|
|
|
|
} |
|
102
|
4
|
|
|
|
|
|
PUTBACK; |
|
103
|
4
|
50
|
|
|
|
|
FREETMPS; |
|
104
|
4
|
|
|
|
|
|
LEAVE; |
|
105
|
4
|
50
|
|
|
|
|
if (!out) croak("streamer: compress_native_block returned nothing"); |
|
106
|
4
|
|
|
|
|
|
return out; |
|
107
|
|
|
|
|
|
|
} |
|
108
|
|
|
|
|
|
|
|
|
109
|
77
|
|
|
|
|
|
void encode_and_emit(pTHX_ Encoder *enc, AV *batch, SV *writer) { |
|
110
|
77
|
|
|
|
|
|
dSP; |
|
111
|
77
|
|
|
|
|
|
ENTER; |
|
112
|
77
|
|
|
|
|
|
SAVETMPS; |
|
113
|
|
|
|
|
|
|
|
|
114
|
|
|
|
|
|
|
Buffer buf; |
|
115
|
77
|
|
|
|
|
|
buf_init(aTHX_ &buf); |
|
116
|
77
|
|
|
|
|
|
do_encode(aTHX_ enc, batch, &buf); |
|
117
|
|
|
|
|
|
|
|
|
118
|
77
|
|
|
|
|
|
SV *bytes = sv_2mortal(newSVpvn(buf.ptr, buf.len)); |
|
119
|
|
|
|
|
|
|
|
|
120
|
77
|
50
|
|
|
|
|
PUSHMARK(SP); |
|
121
|
77
|
50
|
|
|
|
|
XPUSHs(bytes); |
|
122
|
77
|
|
|
|
|
|
PUTBACK; |
|
123
|
77
|
|
|
|
|
|
call_sv(writer, G_VOID | G_DISCARD); |
|
124
|
|
|
|
|
|
|
|
|
125
|
74
|
50
|
|
|
|
|
FREETMPS; |
|
126
|
74
|
|
|
|
|
|
LEAVE; |
|
127
|
74
|
|
|
|
|
|
} |
|
128
|
|
|
|
|
|
|
|
|
129
|
34
|
|
|
|
|
|
void streamer_flush(pTHX_ Streamer *s) { |
|
130
|
34
|
100
|
|
|
|
|
if (av_len(s->buffer) < 0) return; |
|
131
|
|
|
|
|
|
|
/* Swap in a fresh buffer BEFORE the (potentially croaking) emit so that |
|
132
|
|
|
|
|
|
|
* if the writer dies and the user catches it via eval{}, the streamer |
|
133
|
|
|
|
|
|
|
* is left in a clean state instead of replaying the failed batch's rows |
|
134
|
|
|
|
|
|
|
* on the next push. The old buffer is mortalized so it's reclaimed |
|
135
|
|
|
|
|
|
|
* either way. */ |
|
136
|
28
|
|
|
|
|
|
AV *batch = s->buffer; |
|
137
|
28
|
|
|
|
|
|
s->buffer = newAV(); |
|
138
|
28
|
|
|
|
|
|
sv_2mortal((SV *)batch); |
|
139
|
|
|
|
|
|
|
|
|
140
|
|
|
|
|
|
|
/* If compression is enabled, build the bytes locally then route them |
|
141
|
|
|
|
|
|
|
* through compress_native_block before invoking the user's writer. |
|
142
|
|
|
|
|
|
|
* Otherwise fall through to encode_and_emit's direct call. */ |
|
143
|
28
|
100
|
|
|
|
|
if (s->compress_mode) { |
|
144
|
4
|
|
|
|
|
|
dSP; |
|
145
|
4
|
|
|
|
|
|
ENTER; |
|
146
|
4
|
|
|
|
|
|
SAVETMPS; |
|
147
|
|
|
|
|
|
|
Buffer buf; |
|
148
|
4
|
|
|
|
|
|
buf_init(aTHX_ &buf); |
|
149
|
4
|
|
|
|
|
|
do_encode(aTHX_ s->enc, batch, &buf); |
|
150
|
4
|
|
|
|
|
|
SV *bytes = sv_2mortal(newSVpvn(buf.ptr, buf.len)); |
|
151
|
|
|
|
|
|
|
/* maybe_compress returns an owned SV (refcount 1); mortalize it |
|
152
|
|
|
|
|
|
|
* here so it's cleaned up after the writer call. */ |
|
153
|
4
|
|
|
|
|
|
SV *out = sv_2mortal( |
|
154
|
|
|
|
|
|
|
maybe_compress(aTHX_ bytes, s->compress_mode, s->hasher_sv)); |
|
155
|
4
|
50
|
|
|
|
|
PUSHMARK(SP); |
|
156
|
4
|
50
|
|
|
|
|
XPUSHs(out); |
|
157
|
4
|
|
|
|
|
|
PUTBACK; |
|
158
|
4
|
|
|
|
|
|
call_sv(s->writer, G_VOID | G_DISCARD); |
|
159
|
4
|
50
|
|
|
|
|
FREETMPS; |
|
160
|
4
|
|
|
|
|
|
LEAVE; |
|
161
|
4
|
|
|
|
|
|
return; |
|
162
|
|
|
|
|
|
|
} |
|
163
|
24
|
|
|
|
|
|
encode_and_emit(aTHX_ s->enc, batch, s->writer); |
|
164
|
|
|
|
|
|
|
} |
|
165
|
|
|
|
|
|
|
|
|
166
|
19
|
|
|
|
|
|
void free_streamer(pTHX_ Streamer *s) { |
|
167
|
19
|
50
|
|
|
|
|
if (!s) return; |
|
168
|
19
|
50
|
|
|
|
|
if (s->enc_sv) SvREFCNT_dec(s->enc_sv); |
|
169
|
19
|
50
|
|
|
|
|
if (s->writer) SvREFCNT_dec(s->writer); |
|
170
|
19
|
50
|
|
|
|
|
if (s->buffer) SvREFCNT_dec((SV *)s->buffer); |
|
171
|
19
|
100
|
|
|
|
|
if (s->compress_mode) Safefree(s->compress_mode); |
|
172
|
19
|
100
|
|
|
|
|
if (s->hasher_sv) SvREFCNT_dec(s->hasher_sv); |
|
173
|
19
|
|
|
|
|
|
Safefree(s); |
|
174
|
|
|
|
|
|
|
} |
|
175
|
|
|
|
|
|
|
|
|
176
|
19
|
|
|
|
|
|
void cleanup_streamer_slot(pTHX_ void *p) { |
|
177
|
19
|
|
|
|
|
|
Streamer **slot = (Streamer **)p; |
|
178
|
19
|
50
|
|
|
|
|
if (*slot) free_streamer(aTHX_ *slot); |
|
179
|
19
|
|
|
|
|
|
} |
|
180
|
|
|
|
|
|
|
|
|
181
|
1519
|
|
|
|
|
|
void free_encoder(pTHX_ Encoder *enc) { |
|
182
|
|
|
|
|
|
|
int i; |
|
183
|
1519
|
50
|
|
|
|
|
if (!enc) return; |
|
184
|
1519
|
50
|
|
|
|
|
if (enc->columns) { |
|
185
|
5724
|
100
|
|
|
|
|
for (i = 0; i < enc->num_columns; i++) { |
|
186
|
4205
|
100
|
|
|
|
|
if (enc->columns[i].name) Safefree(enc->columns[i].name); |
|
187
|
4205
|
100
|
|
|
|
|
if (enc->columns[i].type_str) Safefree(enc->columns[i].type_str); |
|
188
|
4205
|
100
|
|
|
|
|
if (enc->columns[i].type) free_typeinfo(aTHX_ enc->columns[i].type); |
|
189
|
|
|
|
|
|
|
} |
|
190
|
1519
|
|
|
|
|
|
Safefree(enc->columns); |
|
191
|
|
|
|
|
|
|
} |
|
192
|
1519
|
|
|
|
|
|
Safefree(enc); |
|
193
|
|
|
|
|
|
|
} |
|
194
|
|
|
|
|
|
|
|
|
195
|
1519
|
|
|
|
|
|
void cleanup_encoder_slot(pTHX_ void *p) { |
|
196
|
1519
|
|
|
|
|
|
Encoder **slot = (Encoder **)p; |
|
197
|
1519
|
100
|
|
|
|
|
if (*slot) free_encoder(aTHX_ *slot); |
|
198
|
1519
|
|
|
|
|
|
} |