| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
#define PERL_NO_GET_CONTEXT |
|
2
|
|
|
|
|
|
|
#include "EXTERN.h" |
|
3
|
|
|
|
|
|
|
#include "perl.h" |
|
4
|
|
|
|
|
|
|
#include "XSUB.h" |
|
5
|
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
#include |
|
7
|
|
|
|
|
|
|
#include |
|
8
|
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
#if IVSIZE < 8 |
|
10
|
|
|
|
|
|
|
#error "ClickHouse::Encoder requires a 64-bit Perl (IVSIZE >= 8)" |
|
11
|
|
|
|
|
|
|
#endif |
|
12
|
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
#include "types.h" |
|
14
|
|
|
|
|
|
|
#include "buffer.h" |
|
15
|
|
|
|
|
|
|
#include "encode.h" |
|
16
|
|
|
|
|
|
|
#include "decode.h" |
|
17
|
|
|
|
|
|
|
#include "runtime.h" |
|
18
|
|
|
|
|
|
|
#include "cityhash.h" |
|
19
|
|
|
|
|
|
|
|
|
20
|
|
|
|
|
|
|
/* Read one varint from an SV's byte buffer starting at *off; advances |
|
21
|
|
|
|
|
|
|
* *off past the consumed bytes. Croaks with TCP-style messages on |
|
22
|
|
|
|
|
|
|
* truncation or overflow. Used by the ClickHouse::Encoder::TCP XSUBs |
|
23
|
|
|
|
|
|
|
* which operate on offsets into a Perl scalar rather than the (p,end) |
|
24
|
|
|
|
|
|
|
* cursor pair used by the internal native decoder. */ |
|
25
|
86
|
|
|
|
|
|
static UV tcp_read_varint(pTHX_ const unsigned char *p, UV buf_len, |
|
26
|
|
|
|
|
|
|
UV *off) { |
|
27
|
86
|
|
|
|
|
|
UV v = 0; |
|
28
|
86
|
|
|
|
|
|
int shift = 0; |
|
29
|
16
|
|
|
|
|
|
while (1) { |
|
30
|
102
|
100
|
|
|
|
|
if (*off >= buf_len) |
|
31
|
6
|
|
|
|
|
|
croak("varint: truncated at offset %lu", (unsigned long)*off); |
|
32
|
96
|
|
|
|
|
|
unsigned char b = p[(*off)++]; |
|
33
|
96
|
|
|
|
|
|
v |= ((UV)(b & 0x7f)) << shift; |
|
34
|
96
|
100
|
|
|
|
|
if (!(b & 0x80)) break; |
|
35
|
16
|
|
|
|
|
|
shift += 7; |
|
36
|
16
|
50
|
|
|
|
|
if (shift >= 64) croak("varint exceeds 64 bits"); |
|
37
|
|
|
|
|
|
|
} |
|
38
|
80
|
|
|
|
|
|
return v; |
|
39
|
|
|
|
|
|
|
} |
|
40
|
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
MODULE = ClickHouse::Encoder PACKAGE = ClickHouse::Encoder |
|
42
|
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
SV* |
|
44
|
|
|
|
|
|
|
new(class, ...) |
|
45
|
|
|
|
|
|
|
const char *class |
|
46
|
|
|
|
|
|
|
CODE: |
|
47
|
|
|
|
|
|
|
{ |
|
48
|
1521
|
|
|
|
|
|
Encoder *enc = NULL; |
|
49
|
|
|
|
|
|
|
AV *cols_av; |
|
50
|
|
|
|
|
|
|
SSize_t i, n; |
|
51
|
1521
|
|
|
|
|
|
SV *cols_sv = NULL; |
|
52
|
|
|
|
|
|
|
|
|
53
|
1521
|
50
|
|
|
|
|
if (items % 2 == 0) |
|
54
|
0
|
|
|
|
|
|
croak("Expected key-value pairs"); |
|
55
|
|
|
|
|
|
|
|
|
56
|
3041
|
100
|
|
|
|
|
for (i = 1; i < items; i += 2) { |
|
57
|
|
|
|
|
|
|
STRLEN klen; |
|
58
|
1520
|
|
|
|
|
|
const char *key = SvPV(ST(i), klen); |
|
59
|
1520
|
50
|
|
|
|
|
if (klen == 7 && memcmp(key, "columns", 7) == 0) |
|
|
|
50
|
|
|
|
|
|
|
60
|
1520
|
|
|
|
|
|
cols_sv = ST(i+1); |
|
61
|
|
|
|
|
|
|
} |
|
62
|
|
|
|
|
|
|
|
|
63
|
1521
|
100
|
|
|
|
|
if (!cols_sv || !SvROK(cols_sv) || SvTYPE(SvRV(cols_sv)) != SVt_PVAV) |
|
|
|
100
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
64
|
2
|
|
|
|
|
|
croak("columns required and must be arrayref"); |
|
65
|
|
|
|
|
|
|
|
|
66
|
1519
|
|
|
|
|
|
cols_av = (AV*)SvRV(cols_sv); |
|
67
|
1519
|
|
|
|
|
|
n = av_len(cols_av) + 1; |
|
68
|
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
/* Wrap allocation in our own ENTER/LEAVE so the cleanup destructor fires |
|
70
|
|
|
|
|
|
|
* before this XSUB returns (XSUBs don't get implicit ENTER/LEAVE). */ |
|
71
|
1519
|
|
|
|
|
|
ENTER; |
|
72
|
1519
|
|
|
|
|
|
Newxz(enc, 1, Encoder); |
|
73
|
1519
|
|
|
|
|
|
SAVEDESTRUCTOR_X(cleanup_encoder_slot, &enc); |
|
74
|
|
|
|
|
|
|
|
|
75
|
1519
|
50
|
|
|
|
|
Newxz(enc->columns, n, Column); |
|
76
|
1519
|
|
|
|
|
|
enc->num_columns = n; |
|
77
|
|
|
|
|
|
|
|
|
78
|
5696
|
100
|
|
|
|
|
for (i = 0; i < n; i++) { |
|
79
|
4205
|
|
|
|
|
|
SV **col_sv = av_fetch(cols_av, i, 0); |
|
80
|
|
|
|
|
|
|
AV *col_av; |
|
81
|
|
|
|
|
|
|
SV **name_sv, **type_sv; |
|
82
|
|
|
|
|
|
|
STRLEN len; |
|
83
|
|
|
|
|
|
|
const char *s; |
|
84
|
|
|
|
|
|
|
|
|
85
|
4205
|
50
|
|
|
|
|
if (!col_sv || !SvROK(*col_sv) || SvTYPE(SvRV(*col_sv)) != SVt_PVAV) |
|
|
|
50
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
86
|
0
|
|
|
|
|
|
croak("Column must be [name, type]"); |
|
87
|
|
|
|
|
|
|
|
|
88
|
4205
|
|
|
|
|
|
col_av = (AV*)SvRV(*col_sv); |
|
89
|
4205
|
|
|
|
|
|
name_sv = av_fetch(col_av, 0, 0); |
|
90
|
4205
|
|
|
|
|
|
type_sv = av_fetch(col_av, 1, 0); |
|
91
|
|
|
|
|
|
|
|
|
92
|
4205
|
50
|
|
|
|
|
if (!name_sv || !type_sv) |
|
|
|
100
|
|
|
|
|
|
|
93
|
2
|
|
|
|
|
|
croak("Column must be [name, type]"); |
|
94
|
|
|
|
|
|
|
|
|
95
|
4203
|
|
|
|
|
|
s = SvPV(*name_sv, len); |
|
96
|
4203
|
|
|
|
|
|
Newx(enc->columns[i].name, len + 1, char); |
|
97
|
4203
|
|
|
|
|
|
memcpy(enc->columns[i].name, s, len); |
|
98
|
4203
|
|
|
|
|
|
enc->columns[i].name[len] = 0; |
|
99
|
4203
|
|
|
|
|
|
enc->columns[i].name_len = len; |
|
100
|
|
|
|
|
|
|
|
|
101
|
4203
|
|
|
|
|
|
s = SvPV(*type_sv, len); |
|
102
|
4203
|
|
|
|
|
|
Newx(enc->columns[i].type_str, len + 1, char); |
|
103
|
4203
|
|
|
|
|
|
memcpy(enc->columns[i].type_str, s, len); |
|
104
|
4203
|
|
|
|
|
|
enc->columns[i].type_str[len] = 0; |
|
105
|
4203
|
|
|
|
|
|
enc->columns[i].type_len = len; |
|
106
|
|
|
|
|
|
|
|
|
107
|
4203
|
|
|
|
|
|
enc->columns[i].type = parse_type(aTHX_ enc->columns[i].type_str, len); |
|
108
|
|
|
|
|
|
|
} |
|
109
|
|
|
|
|
|
|
|
|
110
|
1491
|
|
|
|
|
|
RETVAL = newSV(0); |
|
111
|
1491
|
|
|
|
|
|
sv_setref_pv(RETVAL, class, (void*)enc); |
|
112
|
1491
|
|
|
|
|
|
enc = NULL; /* Disarm: the SV's DESTROY now owns the encoder. */ |
|
113
|
1491
|
|
|
|
|
|
LEAVE; |
|
114
|
|
|
|
|
|
|
} |
|
115
|
|
|
|
|
|
|
OUTPUT: |
|
116
|
|
|
|
|
|
|
RETVAL |
|
117
|
|
|
|
|
|
|
|
|
118
|
|
|
|
|
|
|
SV* |
|
119
|
|
|
|
|
|
|
encode(self, rows) |
|
120
|
|
|
|
|
|
|
SV *self |
|
121
|
|
|
|
|
|
|
SV *rows |
|
122
|
|
|
|
|
|
|
CODE: |
|
123
|
|
|
|
|
|
|
{ |
|
124
|
|
|
|
|
|
|
Encoder *enc; |
|
125
|
|
|
|
|
|
|
Buffer buf; |
|
126
|
|
|
|
|
|
|
|
|
127
|
2053
|
50
|
|
|
|
|
if (!sv_isobject(self)) croak("Not an object"); |
|
128
|
2053
|
100
|
|
|
|
|
if (!SvROK(rows) || SvTYPE(SvRV(rows)) != SVt_PVAV) |
|
|
|
50
|
|
|
|
|
|
|
129
|
2
|
|
|
|
|
|
croak("rows must be arrayref"); |
|
130
|
|
|
|
|
|
|
|
|
131
|
2051
|
|
|
|
|
|
enc = INT2PTR(Encoder*, SvIV(SvRV(self))); |
|
132
|
2051
|
|
|
|
|
|
buf_init(aTHX_ &buf); |
|
133
|
2051
|
|
|
|
|
|
do_encode(aTHX_ enc, (AV *)SvRV(rows), &buf); |
|
134
|
1989
|
|
|
|
|
|
RETVAL = newSVpvn(buf.ptr, buf.len); |
|
135
|
|
|
|
|
|
|
} |
|
136
|
|
|
|
|
|
|
OUTPUT: |
|
137
|
|
|
|
|
|
|
RETVAL |
|
138
|
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
void |
|
140
|
|
|
|
|
|
|
encode_into(self, target_ref, rows) |
|
141
|
|
|
|
|
|
|
SV *self |
|
142
|
|
|
|
|
|
|
SV *target_ref |
|
143
|
|
|
|
|
|
|
SV *rows |
|
144
|
|
|
|
|
|
|
CODE: |
|
145
|
|
|
|
|
|
|
{ |
|
146
|
|
|
|
|
|
|
Encoder *enc; |
|
147
|
|
|
|
|
|
|
Buffer buf; |
|
148
|
|
|
|
|
|
|
SV *target; |
|
149
|
|
|
|
|
|
|
|
|
150
|
4
|
50
|
|
|
|
|
if (!sv_isobject(self)) croak("Not an object"); |
|
151
|
4
|
100
|
|
|
|
|
if (!SvROK(target_ref)) |
|
152
|
2
|
|
|
|
|
|
croak("encode_into: first argument must be a scalar reference"); |
|
153
|
2
|
50
|
|
|
|
|
if (!SvROK(rows) || SvTYPE(SvRV(rows)) != SVt_PVAV) |
|
|
|
50
|
|
|
|
|
|
|
154
|
0
|
|
|
|
|
|
croak("rows must be arrayref"); |
|
155
|
|
|
|
|
|
|
|
|
156
|
2
|
|
|
|
|
|
target = SvRV(target_ref); |
|
157
|
2
|
|
|
|
|
|
enc = INT2PTR(Encoder*, SvIV(SvRV(self))); |
|
158
|
|
|
|
|
|
|
|
|
159
|
2
|
|
|
|
|
|
buf_init(aTHX_ &buf); |
|
160
|
2
|
|
|
|
|
|
do_encode(aTHX_ enc, (AV *)SvRV(rows), &buf); |
|
161
|
|
|
|
|
|
|
|
|
162
|
2
|
50
|
|
|
|
|
if (!SvOK(target)) sv_setpvn(target, "", 0); |
|
163
|
2
|
|
|
|
|
|
sv_catpvn(target, buf.ptr, buf.len); |
|
164
|
|
|
|
|
|
|
} |
|
165
|
|
|
|
|
|
|
|
|
166
|
|
|
|
|
|
|
void |
|
167
|
|
|
|
|
|
|
encode_to_handle(self, fh, rows) |
|
168
|
|
|
|
|
|
|
SV *self |
|
169
|
|
|
|
|
|
|
SV *fh |
|
170
|
|
|
|
|
|
|
SV *rows |
|
171
|
|
|
|
|
|
|
CODE: |
|
172
|
|
|
|
|
|
|
{ |
|
173
|
|
|
|
|
|
|
Encoder *enc; |
|
174
|
|
|
|
|
|
|
Buffer buf; |
|
175
|
|
|
|
|
|
|
IO *io; |
|
176
|
|
|
|
|
|
|
PerlIO *pio; |
|
177
|
|
|
|
|
|
|
|
|
178
|
5
|
50
|
|
|
|
|
if (!sv_isobject(self)) croak("Not an object"); |
|
179
|
5
|
50
|
|
|
|
|
if (!SvROK(rows) || SvTYPE(SvRV(rows)) != SVt_PVAV) |
|
|
|
50
|
|
|
|
|
|
|
180
|
0
|
|
|
|
|
|
croak("rows must be arrayref"); |
|
181
|
5
|
|
|
|
|
|
io = sv_2io(fh); |
|
182
|
5
|
50
|
|
|
|
|
if (!io) croak("encode_to_handle: not a filehandle"); |
|
183
|
5
|
|
|
|
|
|
pio = IoOFP(io); |
|
184
|
5
|
100
|
|
|
|
|
if (!pio) croak("encode_to_handle: filehandle not open for writing"); |
|
185
|
|
|
|
|
|
|
|
|
186
|
4
|
|
|
|
|
|
enc = INT2PTR(Encoder*, SvIV(SvRV(self))); |
|
187
|
4
|
|
|
|
|
|
buf_init(aTHX_ &buf); |
|
188
|
4
|
|
|
|
|
|
do_encode(aTHX_ enc, (AV *)SvRV(rows), &buf); |
|
189
|
|
|
|
|
|
|
|
|
190
|
4
|
100
|
|
|
|
|
if (PerlIO_write(pio, buf.ptr, buf.len) != (SSize_t)buf.len) |
|
191
|
1
|
|
|
|
|
|
croak("encode_to_handle: short write: %s", strerror(errno)); |
|
192
|
|
|
|
|
|
|
} |
|
193
|
|
|
|
|
|
|
|
|
194
|
|
|
|
|
|
|
SV* |
|
195
|
|
|
|
|
|
|
encode_columns(self, cols_hv) |
|
196
|
|
|
|
|
|
|
SV *self |
|
197
|
|
|
|
|
|
|
SV *cols_hv |
|
198
|
|
|
|
|
|
|
CODE: |
|
199
|
|
|
|
|
|
|
{ |
|
200
|
|
|
|
|
|
|
/* Column-oriented input: cols_hv is a hashref { name => [v0, v1, ...] } |
|
201
|
|
|
|
|
|
|
* with one array per column, all the same length. Skips the row->column |
|
202
|
|
|
|
|
|
|
* permutation step that encode() does for arrayref-of-arrayref input. */ |
|
203
|
|
|
|
|
|
|
Encoder *enc; |
|
204
|
|
|
|
|
|
|
HV *cols; |
|
205
|
|
|
|
|
|
|
Buffer buf; |
|
206
|
8
|
|
|
|
|
|
SSize_t num_rows = -1; |
|
207
|
|
|
|
|
|
|
int c; |
|
208
|
|
|
|
|
|
|
|
|
209
|
8
|
50
|
|
|
|
|
if (!sv_isobject(self)) croak("Not an object"); |
|
210
|
8
|
100
|
|
|
|
|
if (!SvROK(cols_hv) || SvTYPE(SvRV(cols_hv)) != SVt_PVHV) |
|
|
|
50
|
|
|
|
|
|
|
211
|
1
|
|
|
|
|
|
croak("encode_columns: arg must be hashref { name => [...] }"); |
|
212
|
|
|
|
|
|
|
|
|
213
|
7
|
|
|
|
|
|
cols = (HV *)SvRV(cols_hv); |
|
214
|
7
|
|
|
|
|
|
enc = INT2PTR(Encoder*, SvIV(SvRV(self))); |
|
215
|
|
|
|
|
|
|
|
|
216
|
|
|
|
|
|
|
/* Validate: every declared column must be present and same length. */ |
|
217
|
16
|
100
|
|
|
|
|
for (c = 0; c < enc->num_columns; c++) { |
|
218
|
14
|
|
|
|
|
|
Column *col = &enc->columns[c]; |
|
219
|
14
|
|
|
|
|
|
SV **slot = hv_fetch(cols, col->name, col->name_len, 0); |
|
220
|
14
|
100
|
|
|
|
|
if (!slot || !SvOK(*slot)) |
|
|
|
50
|
|
|
|
|
|
|
221
|
2
|
|
|
|
|
|
croak("encode_columns: missing column '%.*s'", |
|
222
|
|
|
|
|
|
|
(int)col->name_len, col->name); |
|
223
|
12
|
100
|
|
|
|
|
if (!SvROK(*slot) || SvTYPE(SvRV(*slot)) != SVt_PVAV) |
|
|
|
50
|
|
|
|
|
|
|
224
|
1
|
|
|
|
|
|
croak("encode_columns: column '%.*s' must be an arrayref", |
|
225
|
|
|
|
|
|
|
(int)col->name_len, col->name); |
|
226
|
11
|
|
|
|
|
|
AV *av = (AV *)SvRV(*slot); |
|
227
|
11
|
|
|
|
|
|
SSize_t n = av_len(av) + 1; |
|
228
|
11
|
100
|
|
|
|
|
if (num_rows == -1) num_rows = n; |
|
229
|
5
|
100
|
|
|
|
|
else if (n != num_rows) |
|
230
|
2
|
|
|
|
|
|
croak("encode_columns: column '%.*s' has %" IVdf " rows, " |
|
231
|
|
|
|
|
|
|
"expected %" IVdf, (int)col->name_len, col->name, |
|
232
|
|
|
|
|
|
|
(IV)n, (IV)num_rows); |
|
233
|
|
|
|
|
|
|
} |
|
234
|
2
|
50
|
|
|
|
|
if (num_rows == -1) num_rows = 0; |
|
235
|
|
|
|
|
|
|
|
|
236
|
2
|
|
|
|
|
|
buf_init(aTHX_ &buf); |
|
237
|
2
|
|
|
|
|
|
buf_varint(aTHX_ &buf, enc->num_columns); |
|
238
|
2
|
|
|
|
|
|
buf_varint(aTHX_ &buf, num_rows); |
|
239
|
|
|
|
|
|
|
|
|
240
|
2
|
|
|
|
|
|
SV **col_values = alloc_sv_array(aTHX_ num_rows); |
|
241
|
7
|
100
|
|
|
|
|
for (c = 0; c < enc->num_columns; c++) { |
|
242
|
5
|
|
|
|
|
|
Column *col = &enc->columns[c]; |
|
243
|
5
|
|
|
|
|
|
SV **slot = hv_fetch(cols, col->name, col->name_len, 0); |
|
244
|
5
|
|
|
|
|
|
AV *av = (AV *)SvRV(*slot); |
|
245
|
|
|
|
|
|
|
SSize_t r; |
|
246
|
|
|
|
|
|
|
|
|
247
|
5
|
|
|
|
|
|
buf_string(aTHX_ &buf, col->name, col->name_len); |
|
248
|
5
|
|
|
|
|
|
buf_string(aTHX_ &buf, col->type_str, col->type_len); |
|
249
|
|
|
|
|
|
|
|
|
250
|
20
|
100
|
|
|
|
|
for (r = 0; r < num_rows; r++) { |
|
251
|
15
|
|
|
|
|
|
SV **e = av_fetch(av, r, 0); |
|
252
|
15
|
50
|
|
|
|
|
if (e) { |
|
253
|
|
|
|
|
|
|
/* Same magic-realization step as do_encode's gather: |
|
254
|
|
|
|
|
|
|
* tied per-column AVs return magical placeholders that |
|
255
|
|
|
|
|
|
|
* downstream SvIV / SvPV won't see until SvGETMAGIC. */ |
|
256
|
15
|
|
|
|
|
|
SvGETMAGIC(*e); |
|
257
|
15
|
|
|
|
|
|
col_values[r] = *e; |
|
258
|
|
|
|
|
|
|
} else { |
|
259
|
0
|
|
|
|
|
|
col_values[r] = &PL_sv_undef; |
|
260
|
|
|
|
|
|
|
} |
|
261
|
|
|
|
|
|
|
} |
|
262
|
5
|
|
|
|
|
|
encode_column(aTHX_ &buf, col_values, num_rows, col->type); |
|
263
|
|
|
|
|
|
|
} |
|
264
|
|
|
|
|
|
|
|
|
265
|
2
|
|
|
|
|
|
RETVAL = newSVpvn(buf.ptr, buf.len); |
|
266
|
|
|
|
|
|
|
} |
|
267
|
|
|
|
|
|
|
OUTPUT: |
|
268
|
|
|
|
|
|
|
RETVAL |
|
269
|
|
|
|
|
|
|
|
|
270
|
|
|
|
|
|
|
void |
|
271
|
|
|
|
|
|
|
stream(self, iter, writer, ...) |
|
272
|
|
|
|
|
|
|
SV *self |
|
273
|
|
|
|
|
|
|
SV *iter |
|
274
|
|
|
|
|
|
|
SV *writer |
|
275
|
|
|
|
|
|
|
CODE: |
|
276
|
|
|
|
|
|
|
{ |
|
277
|
|
|
|
|
|
|
/* XS-side iterator loop: pull rows via call_sv, batch them in an AV, |
|
278
|
|
|
|
|
|
|
* encode and call writer on threshold. Saves per-row Perl method |
|
279
|
|
|
|
|
|
|
* dispatch compared to driving the loop from Perl. */ |
|
280
|
|
|
|
|
|
|
Encoder *enc; |
|
281
|
2
|
|
|
|
|
|
int batch_size = 10000; |
|
282
|
|
|
|
|
|
|
AV *batch; |
|
283
|
|
|
|
|
|
|
int n; |
|
284
|
|
|
|
|
|
|
|
|
285
|
2
|
50
|
|
|
|
|
if (!sv_isobject(self)) croak("Not an object"); |
|
286
|
2
|
50
|
|
|
|
|
if (!SvROK(iter) || SvTYPE(SvRV(iter)) != SVt_PVCV) |
|
|
|
50
|
|
|
|
|
|
|
287
|
0
|
|
|
|
|
|
croak("stream: iter must be a coderef"); |
|
288
|
2
|
50
|
|
|
|
|
if (!SvROK(writer) || SvTYPE(SvRV(writer)) != SVt_PVCV) |
|
|
|
50
|
|
|
|
|
|
|
289
|
0
|
|
|
|
|
|
croak("stream: writer must be a coderef"); |
|
290
|
|
|
|
|
|
|
|
|
291
|
|
|
|
|
|
|
/* Optional named arg: batch_size => N */ |
|
292
|
2
|
50
|
|
|
|
|
if (items > 3) { |
|
293
|
|
|
|
|
|
|
int i; |
|
294
|
4
|
100
|
|
|
|
|
for (i = 3; i < items - 1; i += 2) { |
|
295
|
|
|
|
|
|
|
STRLEN klen; |
|
296
|
2
|
|
|
|
|
|
const char *key = SvPV(ST(i), klen); |
|
297
|
2
|
50
|
|
|
|
|
if (klen == 10 && memcmp(key, "batch_size", 10) == 0) |
|
|
|
50
|
|
|
|
|
|
|
298
|
2
|
|
|
|
|
|
batch_size = (int)SvIV(ST(i+1)); |
|
299
|
|
|
|
|
|
|
} |
|
300
|
2
|
50
|
|
|
|
|
if (batch_size < 1) batch_size = 1; |
|
301
|
|
|
|
|
|
|
} |
|
302
|
|
|
|
|
|
|
|
|
303
|
2
|
|
|
|
|
|
enc = INT2PTR(Encoder*, SvIV(SvRV(self))); |
|
304
|
2
|
|
|
|
|
|
batch = (AV *)sv_2mortal((SV *)newAV()); |
|
305
|
|
|
|
|
|
|
|
|
306
|
50025
|
|
|
|
|
|
for (;;) { |
|
307
|
50027
|
|
|
|
|
|
dSP; |
|
308
|
50027
|
|
|
|
|
|
ENTER; SAVETMPS; |
|
309
|
50027
|
50
|
|
|
|
|
PUSHMARK(SP); PUTBACK; |
|
310
|
50027
|
|
|
|
|
|
n = call_sv(iter, G_SCALAR); |
|
311
|
50027
|
|
|
|
|
|
SPAGAIN; |
|
312
|
50027
|
50
|
|
|
|
|
SV *row = (n > 0) ? POPs : &PL_sv_undef; |
|
313
|
50027
|
|
|
|
|
|
int got = SvOK(row); |
|
314
|
50027
|
100
|
|
|
|
|
if (got) SvREFCNT_inc(row); |
|
315
|
50027
|
|
|
|
|
|
PUTBACK; |
|
316
|
50027
|
50
|
|
|
|
|
FREETMPS; LEAVE; |
|
317
|
|
|
|
|
|
|
|
|
318
|
50027
|
100
|
|
|
|
|
if (!got) break; |
|
319
|
50025
|
|
|
|
|
|
av_push(batch, row); |
|
320
|
50025
|
100
|
|
|
|
|
if (av_len(batch) + 1 >= batch_size) { |
|
321
|
52
|
|
|
|
|
|
encode_and_emit(aTHX_ enc, batch, writer); |
|
322
|
52
|
|
|
|
|
|
av_clear(batch); |
|
323
|
|
|
|
|
|
|
} |
|
324
|
|
|
|
|
|
|
} |
|
325
|
|
|
|
|
|
|
|
|
326
|
2
|
100
|
|
|
|
|
if (av_len(batch) >= 0) |
|
327
|
1
|
|
|
|
|
|
encode_and_emit(aTHX_ enc, batch, writer); |
|
328
|
|
|
|
|
|
|
} |
|
329
|
|
|
|
|
|
|
|
|
330
|
|
|
|
|
|
|
SV* |
|
331
|
|
|
|
|
|
|
streamer(self, writer, ...) |
|
332
|
|
|
|
|
|
|
SV *self |
|
333
|
|
|
|
|
|
|
SV *writer |
|
334
|
|
|
|
|
|
|
CODE: |
|
335
|
|
|
|
|
|
|
{ |
|
336
|
|
|
|
|
|
|
/* Constructor for ClickHouse::Encoder::Streamer. */ |
|
337
|
|
|
|
|
|
|
Streamer *s; |
|
338
|
19
|
|
|
|
|
|
int batch_size = 10000; |
|
339
|
19
|
|
|
|
|
|
const char *compress_mode = NULL; |
|
340
|
19
|
|
|
|
|
|
STRLEN compress_mode_len = 0; |
|
341
|
19
|
|
|
|
|
|
SV *hasher_sv = NULL; |
|
342
|
|
|
|
|
|
|
|
|
343
|
19
|
50
|
|
|
|
|
if (!sv_isobject(self)) croak("Not an object"); |
|
344
|
19
|
50
|
|
|
|
|
if (!SvROK(writer) || SvTYPE(SvRV(writer)) != SVt_PVCV) |
|
|
|
50
|
|
|
|
|
|
|
345
|
0
|
|
|
|
|
|
croak("streamer: writer must be a coderef"); |
|
346
|
|
|
|
|
|
|
|
|
347
|
19
|
50
|
|
|
|
|
if (items > 2) { |
|
348
|
|
|
|
|
|
|
int i; |
|
349
|
42
|
100
|
|
|
|
|
for (i = 2; i < items - 1; i += 2) { |
|
350
|
|
|
|
|
|
|
STRLEN klen; |
|
351
|
23
|
|
|
|
|
|
const char *key = SvPV(ST(i), klen); |
|
352
|
23
|
100
|
|
|
|
|
if (klen == 10 && memcmp(key, "batch_size", 10) == 0) { |
|
|
|
50
|
|
|
|
|
|
|
353
|
19
|
|
|
|
|
|
batch_size = (int)SvIV(ST(i+1)); |
|
354
|
|
|
|
|
|
|
} |
|
355
|
4
|
100
|
|
|
|
|
else if (klen == 8 && memcmp(key, "compress", 8) == 0 |
|
|
|
50
|
|
|
|
|
|
|
356
|
3
|
50
|
|
|
|
|
&& SvOK(ST(i+1))) { |
|
357
|
3
|
|
|
|
|
|
compress_mode = SvPV(ST(i+1), compress_mode_len); |
|
358
|
|
|
|
|
|
|
} |
|
359
|
1
|
50
|
|
|
|
|
else if (klen == 6 && memcmp(key, "hasher", 6) == 0 |
|
|
|
50
|
|
|
|
|
|
|
360
|
1
|
50
|
|
|
|
|
&& SvROK(ST(i+1)) |
|
361
|
1
|
50
|
|
|
|
|
&& SvTYPE(SvRV(ST(i+1))) == SVt_PVCV) { |
|
362
|
1
|
|
|
|
|
|
hasher_sv = ST(i+1); |
|
363
|
|
|
|
|
|
|
} |
|
364
|
|
|
|
|
|
|
} |
|
365
|
19
|
50
|
|
|
|
|
if (batch_size < 1) batch_size = 1; |
|
366
|
|
|
|
|
|
|
} |
|
367
|
|
|
|
|
|
|
|
|
368
|
|
|
|
|
|
|
/* Construct under our own ENTER/LEAVE so a croak partway through |
|
369
|
|
|
|
|
|
|
* (e.g. OOM in newAV) frees the partially-built struct. */ |
|
370
|
19
|
|
|
|
|
|
ENTER; |
|
371
|
19
|
|
|
|
|
|
Newxz(s, 1, Streamer); |
|
372
|
19
|
|
|
|
|
|
SAVEDESTRUCTOR_X(cleanup_streamer_slot, &s); |
|
373
|
|
|
|
|
|
|
|
|
374
|
|
|
|
|
|
|
/* Hold an owned RV pointing at the same blessed inner SV, so the |
|
375
|
|
|
|
|
|
|
* encoder survives even if the user's $enc goes out of scope. */ |
|
376
|
19
|
|
|
|
|
|
s->enc_sv = newRV_inc(SvRV(self)); |
|
377
|
19
|
|
|
|
|
|
s->enc = INT2PTR(Encoder*, SvIV(SvRV(s->enc_sv))); |
|
378
|
19
|
|
|
|
|
|
s->writer = newSVsv(writer); |
|
379
|
19
|
|
|
|
|
|
s->buffer = newAV(); |
|
380
|
19
|
|
|
|
|
|
s->batch_size = batch_size; |
|
381
|
|
|
|
|
|
|
|
|
382
|
|
|
|
|
|
|
/* Copy the compress mode into Streamer-owned memory. NULL / "none" / |
|
383
|
|
|
|
|
|
|
* "raw" are all treated as "no compression" by streamer_flush. */ |
|
384
|
19
|
100
|
|
|
|
|
if (compress_mode |
|
385
|
3
|
50
|
|
|
|
|
&& !(compress_mode_len == 0) |
|
386
|
3
|
100
|
|
|
|
|
&& !(compress_mode_len == 4 && memcmp(compress_mode, "none", 4) == 0) |
|
|
|
50
|
|
|
|
|
|
|
387
|
2
|
50
|
|
|
|
|
&& !(compress_mode_len == 3 && memcmp(compress_mode, "raw", 3) == 0)) { |
|
|
|
50
|
|
|
|
|
|
|
388
|
2
|
|
|
|
|
|
Newx(s->compress_mode, compress_mode_len + 1, char); |
|
389
|
2
|
|
|
|
|
|
memcpy(s->compress_mode, compress_mode, compress_mode_len); |
|
390
|
2
|
|
|
|
|
|
s->compress_mode[compress_mode_len] = '\0'; |
|
391
|
|
|
|
|
|
|
} |
|
392
|
19
|
100
|
|
|
|
|
if (hasher_sv) s->hasher_sv = newSVsv(hasher_sv); |
|
393
|
|
|
|
|
|
|
|
|
394
|
19
|
|
|
|
|
|
RETVAL = newSV(0); |
|
395
|
19
|
|
|
|
|
|
sv_setref_pv(RETVAL, "ClickHouse::Encoder::Streamer", (void *)s); |
|
396
|
19
|
|
|
|
|
|
s = NULL; /* disarm: SV's DESTROY now owns the streamer */ |
|
397
|
19
|
|
|
|
|
|
LEAVE; |
|
398
|
|
|
|
|
|
|
} |
|
399
|
|
|
|
|
|
|
OUTPUT: |
|
400
|
|
|
|
|
|
|
RETVAL |
|
401
|
|
|
|
|
|
|
|
|
402
|
|
|
|
|
|
|
SV* |
|
403
|
|
|
|
|
|
|
_columns(self) |
|
404
|
|
|
|
|
|
|
SV *self |
|
405
|
|
|
|
|
|
|
CODE: |
|
406
|
|
|
|
|
|
|
{ |
|
407
|
|
|
|
|
|
|
Encoder *enc; |
|
408
|
|
|
|
|
|
|
AV *cols; |
|
409
|
|
|
|
|
|
|
int i; |
|
410
|
|
|
|
|
|
|
|
|
411
|
40
|
50
|
|
|
|
|
if (!sv_isobject(self)) |
|
412
|
0
|
|
|
|
|
|
croak("Not an object"); |
|
413
|
|
|
|
|
|
|
|
|
414
|
40
|
|
|
|
|
|
enc = INT2PTR(Encoder*, SvIV(SvRV(self))); |
|
415
|
40
|
|
|
|
|
|
cols = newAV(); |
|
416
|
|
|
|
|
|
|
|
|
417
|
118
|
100
|
|
|
|
|
for (i = 0; i < enc->num_columns; i++) { |
|
418
|
78
|
|
|
|
|
|
AV *col = newAV(); |
|
419
|
78
|
|
|
|
|
|
av_push(col, newSVpvn(enc->columns[i].name, enc->columns[i].name_len)); |
|
420
|
78
|
|
|
|
|
|
av_push(col, newSVpvn(enc->columns[i].type_str, enc->columns[i].type_len)); |
|
421
|
78
|
|
|
|
|
|
av_push(cols, newRV_noinc((SV*)col)); |
|
422
|
|
|
|
|
|
|
} |
|
423
|
|
|
|
|
|
|
|
|
424
|
40
|
|
|
|
|
|
RETVAL = newRV_noinc((SV*)cols); |
|
425
|
|
|
|
|
|
|
} |
|
426
|
|
|
|
|
|
|
OUTPUT: |
|
427
|
|
|
|
|
|
|
RETVAL |
|
428
|
|
|
|
|
|
|
|
|
429
|
|
|
|
|
|
|
void |
|
430
|
|
|
|
|
|
|
DESTROY(self) |
|
431
|
|
|
|
|
|
|
SV *self |
|
432
|
|
|
|
|
|
|
CODE: |
|
433
|
|
|
|
|
|
|
{ |
|
434
|
|
|
|
|
|
|
Encoder *enc; |
|
435
|
1491
|
50
|
|
|
|
|
if (!sv_isobject(self)) return; |
|
436
|
1491
|
|
|
|
|
|
enc = INT2PTR(Encoder*, SvIV(SvRV(self))); |
|
437
|
1491
|
|
|
|
|
|
free_encoder(aTHX_ enc); |
|
438
|
|
|
|
|
|
|
} |
|
439
|
|
|
|
|
|
|
|
|
440
|
|
|
|
|
|
|
SV * |
|
441
|
|
|
|
|
|
|
decode_block(class, bytes, ...) |
|
442
|
|
|
|
|
|
|
SV *class |
|
443
|
|
|
|
|
|
|
SV *bytes |
|
444
|
|
|
|
|
|
|
CODE: |
|
445
|
|
|
|
|
|
|
{ |
|
446
|
|
|
|
|
|
|
PERL_UNUSED_VAR(class); |
|
447
|
|
|
|
|
|
|
/* Optional 3rd arg: byte offset into the input. Lets callers walk |
|
448
|
|
|
|
|
|
|
* a concatenated multi-block stream in O(N) instead of O(N*K) - the |
|
449
|
|
|
|
|
|
|
* Perl-side decode_blocks wrapper relies on this to avoid substr |
|
450
|
|
|
|
|
|
|
* copies on each iteration. |
|
451
|
|
|
|
|
|
|
* |
|
452
|
|
|
|
|
|
|
* Optional 4th arg: hashref of column names to KEEP. When provided, |
|
453
|
|
|
|
|
|
|
* columns whose name isn't in the set get a placeholder values |
|
454
|
|
|
|
|
|
|
* arrayref (one undef per row) and the wire bytes are still consumed |
|
455
|
|
|
|
|
|
|
* for that column's data, but no SVs are allocated for the values. |
|
456
|
|
|
|
|
|
|
* Memory win on wide SELECT * when caller wants a few columns. */ |
|
457
|
227
|
|
|
|
|
|
UV start_offset = 0; |
|
458
|
227
|
|
|
|
|
|
HV *keep_set = NULL; |
|
459
|
227
|
100
|
|
|
|
|
if (items >= 3) { |
|
460
|
71
|
|
|
|
|
|
IV signed_off = SvIV(ST(2)); |
|
461
|
71
|
100
|
|
|
|
|
if (signed_off < 0) |
|
462
|
1
|
|
|
|
|
|
croak("decode_block: offset must be non-negative (got %" |
|
463
|
|
|
|
|
|
|
IVdf ")", signed_off); |
|
464
|
70
|
|
|
|
|
|
start_offset = (UV)signed_off; |
|
465
|
|
|
|
|
|
|
} |
|
466
|
226
|
100
|
|
|
|
|
if (items >= 4 && SvOK(ST(3))) { |
|
|
|
100
|
|
|
|
|
|
|
467
|
25
|
100
|
|
|
|
|
if (!SvROK(ST(3)) || SvTYPE(SvRV(ST(3))) != SVt_PVHV) |
|
|
|
100
|
|
|
|
|
|
|
468
|
2
|
100
|
|
|
|
|
croak("decode_block: columns filter must be a hashref " |
|
469
|
|
|
|
|
|
|
"(got %s)", |
|
470
|
|
|
|
|
|
|
SvROK(ST(3)) ? sv_reftype(SvRV(ST(3)), 0) |
|
471
|
|
|
|
|
|
|
: "non-reference"); |
|
472
|
23
|
|
|
|
|
|
keep_set = (HV *)SvRV(ST(3)); |
|
473
|
|
|
|
|
|
|
} |
|
474
|
|
|
|
|
|
|
const unsigned char *start, *p, *end; |
|
475
|
|
|
|
|
|
|
UV ncols, nrows; |
|
476
|
224
|
|
|
|
|
|
decode_block_prologue(aTHX_ bytes, start_offset, "decode_block", |
|
477
|
|
|
|
|
|
|
&start, &p, &end, &ncols, &nrows); |
|
478
|
|
|
|
|
|
|
|
|
479
|
|
|
|
|
|
|
/* Mortalize cols so a mid-loop croak (from decode_column, |
|
480
|
|
|
|
|
|
|
* dec_lenpfx_string, etc.) reclaims it instead of leaking. We |
|
481
|
|
|
|
|
|
|
* SvREFCNT_inc when transferring ownership to the result HV. */ |
|
482
|
220
|
|
|
|
|
|
AV *cols = (AV *)sv_2mortal((SV *)newAV()); |
|
483
|
220
|
50
|
|
|
|
|
if (ncols > 0) av_extend(cols, ncols - 1); |
|
484
|
|
|
|
|
|
|
UV c; |
|
485
|
1520
|
100
|
|
|
|
|
for (c = 0; c < ncols; c++) { |
|
486
|
|
|
|
|
|
|
const char *name; STRLEN name_len; |
|
487
|
|
|
|
|
|
|
const char *tstr; STRLEN tlen; |
|
488
|
1330
|
|
|
|
|
|
dec_lenpfx_string(aTHX_ &p, end, &name, &name_len); |
|
489
|
1330
|
|
|
|
|
|
dec_lenpfx_string(aTHX_ &p, end, &tstr, &tlen); |
|
490
|
|
|
|
|
|
|
/* parse_type uses heap-slot SAVEDESTRUCTOR_X for cleanup on |
|
491
|
|
|
|
|
|
|
* croak; on success the slot is disarmed. We're in the XSUB's |
|
492
|
|
|
|
|
|
|
* implicit save scope, so a croak from any nested decode |
|
493
|
|
|
|
|
|
|
* unwinds the type back through this cleanup. */ |
|
494
|
1320
|
|
|
|
|
|
TypeInfo *t = parse_type(aTHX_ tstr, tlen); |
|
495
|
|
|
|
|
|
|
|
|
496
|
1320
|
|
|
|
|
|
int keep = 1; |
|
497
|
1320
|
100
|
|
|
|
|
if (keep_set && !hv_exists(keep_set, name, name_len)) |
|
|
|
100
|
|
|
|
|
|
|
498
|
524
|
|
|
|
|
|
keep = 0; |
|
499
|
|
|
|
|
|
|
|
|
500
|
|
|
|
|
|
|
SV *values; |
|
501
|
1320
|
100
|
|
|
|
|
if (keep) { |
|
502
|
796
|
|
|
|
|
|
values = decode_column(aTHX_ &p, end, t, (SSize_t)nrows); |
|
503
|
|
|
|
|
|
|
} else { |
|
504
|
|
|
|
|
|
|
/* Decode then discard - we still must consume the wire |
|
505
|
|
|
|
|
|
|
* bytes to keep the cursor aligned for the next column. |
|
506
|
|
|
|
|
|
|
* The AV is freed immediately so peak memory is one |
|
507
|
|
|
|
|
|
|
* column's values, not the full block's. */ |
|
508
|
524
|
|
|
|
|
|
SV *tmp = decode_column(aTHX_ &p, end, t, (SSize_t)nrows); |
|
509
|
515
|
|
|
|
|
|
SvREFCNT_dec(tmp); |
|
510
|
515
|
|
|
|
|
|
AV *placeholder = newAV(); |
|
511
|
515
|
50
|
|
|
|
|
if (nrows > 0) { |
|
512
|
515
|
|
|
|
|
|
av_extend(placeholder, nrows - 1); |
|
513
|
|
|
|
|
|
|
SSize_t r; |
|
514
|
50060
|
100
|
|
|
|
|
for (r = 0; r < (SSize_t)nrows; r++) |
|
515
|
49545
|
|
|
|
|
|
av_store(placeholder, r, newSV(0)); |
|
516
|
|
|
|
|
|
|
} |
|
517
|
515
|
|
|
|
|
|
values = newRV_noinc((SV *)placeholder); |
|
518
|
|
|
|
|
|
|
} |
|
519
|
|
|
|
|
|
|
/* Free this column's TypeInfo eagerly to avoid piling them up |
|
520
|
|
|
|
|
|
|
* on the save stack for wide blocks. */ |
|
521
|
1300
|
|
|
|
|
|
free_typeinfo(aTHX_ t); |
|
522
|
|
|
|
|
|
|
|
|
523
|
1300
|
|
|
|
|
|
HV *col_hv = newHV(); |
|
524
|
1300
|
|
|
|
|
|
(void)hv_stores(col_hv, "name", newSVpvn(name, name_len)); |
|
525
|
1300
|
|
|
|
|
|
(void)hv_stores(col_hv, "type", newSVpvn(tstr, tlen)); |
|
526
|
1300
|
|
|
|
|
|
(void)hv_stores(col_hv, "values", values); |
|
527
|
1300
|
100
|
|
|
|
|
if (!keep) (void)hv_stores(col_hv, "skipped", newSViv(1)); |
|
528
|
1300
|
|
|
|
|
|
av_store(cols, c, newRV_noinc((SV *)col_hv)); |
|
529
|
|
|
|
|
|
|
} |
|
530
|
|
|
|
|
|
|
|
|
531
|
190
|
|
|
|
|
|
HV *result = newHV(); |
|
532
|
190
|
|
|
|
|
|
(void)hv_stores(result, "ncols", newSVuv(ncols)); |
|
533
|
190
|
|
|
|
|
|
(void)hv_stores(result, "nrows", newSVuv(nrows)); |
|
534
|
|
|
|
|
|
|
/* Transfer ownership out of the mortal: bump refcount, then the |
|
535
|
|
|
|
|
|
|
* mortal-stack cleanup at scope exit drops one back, leaving the |
|
536
|
|
|
|
|
|
|
* net refcount at 1 (owned by `result`). */ |
|
537
|
190
|
|
|
|
|
|
(void)hv_stores(result, "columns", |
|
538
|
|
|
|
|
|
|
newRV_inc((SV *)cols)); |
|
539
|
190
|
|
|
|
|
|
(void)hv_stores(result, "consumed", newSVuv((UV)(p - start))); |
|
540
|
190
|
|
|
|
|
|
RETVAL = newRV_noinc((SV *)result); |
|
541
|
|
|
|
|
|
|
} |
|
542
|
|
|
|
|
|
|
OUTPUT: RETVAL |
|
543
|
|
|
|
|
|
|
|
|
544
|
|
|
|
|
|
|
# Row-oriented decoder: same wire walk as decode_block, but values are |
|
545
|
|
|
|
|
|
|
# distributed into row-major arrayrefs as each column is decoded, then |
|
546
|
|
|
|
|
|
|
# the per-column AV is freed. Peak memory holds one column's values |
|
547
|
|
|
|
|
|
|
# plus all row AVs (vs decode_rows-via-Perl which holds both |
|
548
|
|
|
|
|
|
|
# representations + does the transpose in Perl). |
|
549
|
|
|
|
|
|
|
SV * |
|
550
|
|
|
|
|
|
|
decode_block_rows(class, bytes, ...) |
|
551
|
|
|
|
|
|
|
SV *class |
|
552
|
|
|
|
|
|
|
SV *bytes |
|
553
|
|
|
|
|
|
|
CODE: |
|
554
|
|
|
|
|
|
|
{ |
|
555
|
|
|
|
|
|
|
PERL_UNUSED_VAR(class); |
|
556
|
6
|
|
|
|
|
|
UV start_offset = 0; |
|
557
|
6
|
100
|
|
|
|
|
if (items >= 3) { |
|
558
|
5
|
|
|
|
|
|
IV signed_off = SvIV(ST(2)); |
|
559
|
5
|
50
|
|
|
|
|
if (signed_off < 0) |
|
560
|
0
|
|
|
|
|
|
croak("decode_block_rows: offset must be non-negative " |
|
561
|
|
|
|
|
|
|
"(got %" IVdf ")", signed_off); |
|
562
|
5
|
|
|
|
|
|
start_offset = (UV)signed_off; |
|
563
|
|
|
|
|
|
|
} |
|
564
|
|
|
|
|
|
|
const unsigned char *start, *p, *end; |
|
565
|
|
|
|
|
|
|
UV ncols, nrows; |
|
566
|
6
|
|
|
|
|
|
decode_block_prologue(aTHX_ bytes, start_offset, "decode_block_rows", |
|
567
|
|
|
|
|
|
|
&start, &p, &end, &ncols, &nrows); |
|
568
|
|
|
|
|
|
|
|
|
569
|
|
|
|
|
|
|
/* Mortalize so a mid-loop croak unwinds and reclaims these |
|
570
|
|
|
|
|
|
|
* (and, via the row RVs they hold, all populated row AVs). |
|
571
|
|
|
|
|
|
|
* SvREFCNT_inc-via-newRV_inc transfers them to the result HV |
|
572
|
|
|
|
|
|
|
* on the success path. */ |
|
573
|
5
|
|
|
|
|
|
AV *names = (AV *)sv_2mortal((SV *)newAV()); |
|
574
|
5
|
|
|
|
|
|
AV *types = (AV *)sv_2mortal((SV *)newAV()); |
|
575
|
5
|
|
|
|
|
|
AV *rows = (AV *)sv_2mortal((SV *)newAV()); |
|
576
|
5
|
50
|
|
|
|
|
if (ncols > 0) { av_extend(names, ncols - 1); av_extend(types, ncols - 1); } |
|
577
|
5
|
50
|
|
|
|
|
if (nrows > 0) av_extend(rows, nrows - 1); |
|
578
|
|
|
|
|
|
|
|
|
579
|
|
|
|
|
|
|
/* Pre-create row AVs - we'll fill column c into row_av[c] as we |
|
580
|
|
|
|
|
|
|
* decode each column. Stash AV pointers for fast access. */ |
|
581
|
5
|
|
|
|
|
|
AV **row_avs = NULL; |
|
582
|
5
|
50
|
|
|
|
|
if (nrows > 0) { |
|
583
|
5
|
50
|
|
|
|
|
Newx(row_avs, nrows, AV *); |
|
584
|
5
|
|
|
|
|
|
SAVEFREEPV(row_avs); |
|
585
|
|
|
|
|
|
|
UV r; |
|
586
|
17
|
100
|
|
|
|
|
for (r = 0; r < nrows; r++) { |
|
587
|
12
|
|
|
|
|
|
AV *row_av = newAV(); |
|
588
|
12
|
50
|
|
|
|
|
if (ncols > 0) av_extend(row_av, ncols - 1); |
|
589
|
12
|
|
|
|
|
|
row_avs[r] = row_av; |
|
590
|
12
|
|
|
|
|
|
av_store(rows, r, newRV_noinc((SV *)row_av)); |
|
591
|
|
|
|
|
|
|
} |
|
592
|
|
|
|
|
|
|
} |
|
593
|
|
|
|
|
|
|
|
|
594
|
|
|
|
|
|
|
UV c; |
|
595
|
27
|
100
|
|
|
|
|
for (c = 0; c < ncols; c++) { |
|
596
|
|
|
|
|
|
|
const char *name; STRLEN name_len; |
|
597
|
|
|
|
|
|
|
const char *tstr; STRLEN tlen; |
|
598
|
22
|
|
|
|
|
|
dec_lenpfx_string(aTHX_ &p, end, &name, &name_len); |
|
599
|
22
|
|
|
|
|
|
dec_lenpfx_string(aTHX_ &p, end, &tstr, &tlen); |
|
600
|
22
|
|
|
|
|
|
TypeInfo *t = parse_type(aTHX_ tstr, tlen); |
|
601
|
22
|
|
|
|
|
|
SV *col_rv = decode_column(aTHX_ &p, end, t, (SSize_t)nrows); |
|
602
|
22
|
|
|
|
|
|
free_typeinfo(aTHX_ t); |
|
603
|
|
|
|
|
|
|
|
|
604
|
22
|
|
|
|
|
|
AV *col_av = (AV *)SvRV(col_rv); |
|
605
|
|
|
|
|
|
|
UV r; |
|
606
|
69
|
100
|
|
|
|
|
for (r = 0; r < nrows; r++) { |
|
607
|
47
|
|
|
|
|
|
SV **e = av_fetch(col_av, r, 0); |
|
608
|
47
|
50
|
|
|
|
|
av_store(row_avs[r], c, e ? SvREFCNT_inc(*e) : newSV(0)); |
|
609
|
|
|
|
|
|
|
} |
|
610
|
|
|
|
|
|
|
/* Free the column AV eagerly; we no longer need it. */ |
|
611
|
22
|
|
|
|
|
|
SvREFCNT_dec(col_rv); |
|
612
|
|
|
|
|
|
|
|
|
613
|
22
|
|
|
|
|
|
av_store(names, c, newSVpvn(name, name_len)); |
|
614
|
22
|
|
|
|
|
|
av_store(types, c, newSVpvn(tstr, tlen)); |
|
615
|
|
|
|
|
|
|
} |
|
616
|
|
|
|
|
|
|
|
|
617
|
5
|
|
|
|
|
|
HV *result = newHV(); |
|
618
|
5
|
|
|
|
|
|
(void)hv_stores(result, "ncols", newSVuv(ncols)); |
|
619
|
5
|
|
|
|
|
|
(void)hv_stores(result, "nrows", newSVuv(nrows)); |
|
620
|
|
|
|
|
|
|
/* Transfer ownership out of the mortal stack: newRV_inc bumps |
|
621
|
|
|
|
|
|
|
* the AV refcount so when the mortal cleanup drops one, the net |
|
622
|
|
|
|
|
|
|
* count stays at 1 (owned by `result`). */ |
|
623
|
5
|
|
|
|
|
|
(void)hv_stores(result, "names", newRV_inc((SV *)names)); |
|
624
|
5
|
|
|
|
|
|
(void)hv_stores(result, "types", newRV_inc((SV *)types)); |
|
625
|
5
|
|
|
|
|
|
(void)hv_stores(result, "rows", newRV_inc((SV *)rows)); |
|
626
|
5
|
|
|
|
|
|
(void)hv_stores(result, "consumed", newSVuv((UV)(p - start))); |
|
627
|
5
|
|
|
|
|
|
RETVAL = newRV_noinc((SV *)result); |
|
628
|
|
|
|
|
|
|
} |
|
629
|
|
|
|
|
|
|
OUTPUT: RETVAL |
|
630
|
|
|
|
|
|
|
|
|
631
|
|
|
|
|
|
|
# CityHash128 in the "cityhash102" variant used by ClickHouse's
# compressed-block prefix. Returns a 16-byte string (8 bytes low,
# then 8 bytes high; matches the wire layout CH expects). Exposed
# so compress_native_block can default its `hasher` to this XSUB
# instead of forcing every caller to supply one.
# Accepted call shapes:
#   ClickHouse::Encoder->_cityhash128($bytes)   class method
#   $obj->_cityhash128($bytes)                  object method
#   $hasher->($bytes)                           plain coderef / function
SV *
_cityhash128(class_or_bytes, ...)
    SV *class_or_bytes
  CODE:
  {
    SV *input_sv;
    /* With two or more arguments and an invocant in the first slot
     * (a class-name string or a blessed reference), the payload is
     * ST(1). Otherwise the first argument is the payload itself (the
     * coderef shape, items == 1). Blessed invocants are tested
     * explicitly: without that, $obj->_cityhash128($bytes) fell into
     * the else-branch and hashed the stringified object reference
     * instead of $bytes. */
    if (items >= 2
        && (sv_isobject(class_or_bytes)
            || (SvPOK(class_or_bytes) && !SvROK(class_or_bytes)))) {
        input_sv = ST(1);
    } else {
        input_sv = class_or_bytes;
    }
    STRLEN len;
    /* SvPVbyte: hash the byte form of the payload. */
    const char *s = SvPVbyte(input_sv, len);
    unsigned char out[16];
    cityhash128_v102(s, (size_t)len, out);
    RETVAL = newSVpvn((const char *)out, sizeof out);
  }
  OUTPUT:
    RETVAL
|
659
|
|
|
|
|
|
|
|
|
660
|
|
|
|
|
|
|
MODULE = ClickHouse::Encoder PACKAGE = ClickHouse::Encoder::Streamer |
|
661
|
|
|
|
|
|
|
|
|
662
|
|
|
|
|
|
|
void
push_row(self, row)
    SV *self
    SV *row
  CODE:
  {
    /* Append one row to the streamer's pending buffer, then call
     * streamer_flush once the buffer holds batch_size rows or more.
     * Croaks with "Not an object" unless self is a blessed reference.
     *
     * The row is buffered as a fresh copy (newSVsv), not as the
     * argument SV itself. XSUB arguments are aliases of the caller's
     * scalars, so SvREFCNT_inc(row) would buffer the caller's variable:
     * with `$r = [...]; $s->push_row($r);` in a loop where $r is
     * declared outside it, every buffered slot would be the same SV and
     * all rows would equal whatever $r held last. A copied array ref
     * still points at the same underlying array, which matches
     * `push @buffer, $row` in pure Perl. */
    Streamer *s;
    if (!sv_isobject(self)) croak("Not an object");
    s = INT2PTR(Streamer *, SvIV(SvRV(self)));
    av_push(s->buffer, newSVsv(row));
    /* av_len is the last index, so +1 gives the buffered row count. */
    if (av_len(s->buffer) + 1 >= s->batch_size)
        streamer_flush(aTHX_ s);
  }
|
675
|
|
|
|
|
|
|
|
|
676
|
|
|
|
|
|
|
void
finish(self)
    SV *self
  CODE:
  {
    /* Hand any rows still buffered to streamer_flush. Whether an empty
     * buffer is a no-op is up to streamer_flush (not visible here).
     * Croaks with "Not an object" unless self is a blessed reference. */
    if (sv_isobject(self)) {
        Streamer *st = INT2PTR(Streamer *, SvIV(SvRV(self)));
        streamer_flush(aTHX_ st);
    } else {
        croak("Not an object");
    }
  }
|
686
|
|
|
|
|
|
|
|
|
687
|
|
|
|
|
|
|
void
reset(self)
    SV *self
  CODE:
  {
    /* Drop every buffered row without flushing it. Intended for error
     * recovery, when an upstream failure means the in-flight batch
     * should be thrown away rather than sent. */
    Streamer *st;
    if (!sv_isobject(self))
        croak("Not an object");
    st = INT2PTR(Streamer *, SvIV(SvRV(self)));
    /* Only call av_clear (which releases the buffered elements) when
     * at least one row is pending. */
    if (av_len(st->buffer) + 1 > 0)
        av_clear(st->buffer);
  }
|
699
|
|
|
|
|
|
|
|
|
700
|
|
|
|
|
|
|
UV
buffered_count(self)
    SV *self
  CODE:
  {
    /* Number of rows currently held in the streamer's buffer.
     * Croaks with "Not an object" unless self is a blessed reference. */
    Streamer *st;
    SSize_t last_index;
    if (!sv_isobject(self))
        croak("Not an object");
    st = INT2PTR(Streamer *, SvIV(SvRV(self)));
    last_index = av_len(st->buffer);    /* -1 for an empty buffer */
    RETVAL = (UV)(last_index + 1);
  }
  OUTPUT:
    RETVAL
|
712
|
|
|
|
|
|
|
|
|
713
|
|
|
|
|
|
|
bool
is_empty(self)
    SV *self
  CODE:
  {
    /* True when no rows are buffered. Croaks with "Not an object"
     * unless self is a blessed reference. */
    Streamer *st;
    if (!sv_isobject(self))
        croak("Not an object");
    st = INT2PTR(Streamer *, SvIV(SvRV(self)));
    /* av_len returns -1 for an empty array, so "no last index" means empty. */
    RETVAL = !(av_len(st->buffer) >= 0);
  }
  OUTPUT:
    RETVAL
|
725
|
|
|
|
|
|
|
|
|
726
|
|
|
|
|
|
|
void
DESTROY(self)
    SV *self
  CODE:
  {
    /* Release the C-side Streamer referenced by this object. Returns
     * quietly (no croak) when self is not a blessed reference.
     *
     * After free_streamer, the pointer slot inside the object is reset
     * to 0 (when writable), and a 0 pointer is skipped on entry. Without
     * this, a second DESTROY on the same object (an explicit
     * $obj->DESTROY followed by the implicit one, or a resurrected
     * object) would pass an already-freed pointer to free_streamer. */
    SV *inner;
    Streamer *s;
    if (!sv_isobject(self))
        XSRETURN_EMPTY;
    inner = SvRV(self);
    s = INT2PTR(Streamer *, SvIV(inner));
    if (!s)
        XSRETURN_EMPTY;
    free_streamer(aTHX_ s);
    if (!SvREADONLY(inner))
        sv_setiv(inner, 0);
  }
|
736
|
|
|
|
|
|
|
|
|
737
|
|
|
|
|
|
|
MODULE = ClickHouse::Encoder PACKAGE = ClickHouse::Encoder::TCP |
|
738
|
|
|
|
|
|
|
|
|
739
|
|
|
|
|
|
|
# Pack an unsigned varint (LEB128 / CH varuint form). Used by the
# protocol-packet builders in ClickHouse::Encoder::TCP. Equivalent
# to the buf_varint() internal helper but exposed as a Perl callable
# so the pure-Perl module can produce wire-format bytes via XS rather
# than a per-byte chr/shift loop.
SV *
pack_varint(v)
    UV v
  CODE:
  {
    /* Emit 7 payload bits per byte, lowest group first. The high bit
     * of a byte is set when more bytes follow. A 64-bit UV needs at
     * most 10 bytes. */
    char out[10];
    int n;
    for (n = 0; v >= 0x80; v >>= 7)
        out[n++] = (char)(0x80 | (v & 0x7f));
    out[n++] = (char)v;
    RETVAL = newSVpvn(out, n);
  }
  OUTPUT:
    RETVAL
|
760
|
|
|
|
|
|
|
|
|
761
|
|
|
|
|
|
|
# Unpack one varint from a byte string starting at the given offset.
# Returns (value, new_offset). Croaks on truncated input or a varint
# wider than 64 bits.
void
unpack_varint(bytes, offset)
    SV *bytes
    UV offset
  PPCODE:
  {
    STRLEN nbytes;
    const unsigned char *buf;
    UV value;
    buf = (const unsigned char *)SvPVbyte(bytes, nbytes);
    /* tcp_read_varint moves offset past the varint, or croaks. */
    value = tcp_read_varint(aTHX_ buf, (UV)nbytes, &offset);
    EXTEND(SP, 2);
    PUSHs(sv_2mortal(newSVuv(value)));
    PUSHs(sv_2mortal(newSVuv(offset)));
  }
|
777
|
|
|
|
|
|
|
|
|
778
|
|
|
|
|
|
|
# Pack a length-prefixed string (varint length + UTF-8 bytes). The
# caller should pass Perl strings; we encode to UTF-8 explicitly so
# non-ASCII chars get the right byte length on the wire.
SV *
pack_string(s)
    SV *s
  CODE:
  {
    STRLEN len = 0;
    const char *p = "";
    char prefix[10];
    int pi;
    UV remaining;
    /* undef packs as the empty string. Otherwise take the UTF-8 byte
     * form. perlapi documents that SvPVutf8 converts s itself to UTF-8
     * if necessary, so the caller's scalar may be upgraded in place. */
    if (SvOK(s))
        p = SvPVutf8(s, len);
    /* Varint length prefix: 7 bits per byte, continuation bit 0x80. */
    remaining = (UV)len;
    for (pi = 0; remaining >= 0x80; remaining >>= 7)
        prefix[pi++] = (char)(0x80 | (remaining & 0x7f));
    prefix[pi++] = (char)remaining;
    RETVAL = newSVpvn(prefix, pi);
    sv_catpvn(RETVAL, p, len);
  }
  OUTPUT:
    RETVAL
|
808
|
|
|
|
|
|
|
|
|
809
|
|
|
|
|
|
|
# Unpack a length-prefixed string at the given offset. Returns (string,
# new_offset). String bytes are returned without decoding (caller
# decides UTF-8 vs binary). Croaks if the varint length is truncated
# or if fewer than `length` bytes remain after it.
void
unpack_string(bytes, offset)
    SV *bytes
    UV offset
  PPCODE:
  {
    STRLEN buf_len;
    const unsigned char *p = (const unsigned char *)SvPVbyte(bytes, buf_len);
    UV slen = tcp_read_varint(aTHX_ p, (UV)buf_len, &offset);
    /* Subtraction form: the addition (offset + slen) can wrap UV_MAX
     * when slen is attacker-controlled via a crafted varint.
     * tcp_read_varint only advances offset after checking it against
     * buf_len, so offset <= buf_len here and buf_len - offset cannot
     * underflow.
     * UVuf instead of %lu with an (unsigned long) cast: on LLP64
     * platforms (e.g. Win64) unsigned long is 32 bits and would
     * truncate the reported values. */
    if (slen > (UV)buf_len - offset)
        croak("string: truncated at offset %" UVuf " (need %" UVuf ")",
              offset, slen);
    SV *out = newSVpvn((const char *)(p + offset), (STRLEN)slen);
    EXTEND(SP, 2);
    PUSHs(sv_2mortal(out));
    /* Cannot overflow: slen <= buf_len - offset was checked above. */
    PUSHs(sv_2mortal(newSVuv(offset + slen)));
  }
|
833
|
|
|
|
|
|
|
|