| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
/* --- Native protocol response parser --- */ |
|
2
|
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
/*
 * Consume the block-info section of a block
 * (sent when revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO).
 *
 * The section is a sequence of (field number, value) pairs terminated by
 * field number 0. Field 1 is followed by a UInt8 (is_overflows) and field 2
 * by an Int32 (bucket_num); both values are read and thrown away.
 *
 * Returns 1 once the end marker has been consumed, 0 if a read helper
 * reports that more data is needed, and a negative value on error (-1 when
 * the field number cannot be read or is unknown; otherwise the negative
 * code of the value-reading helper is passed through).
 */
static int skip_block_info(const char *buf, size_t len, size_t *pos) {
    int status;

    while (1) {
        uint64_t field_id;

        status = read_varuint(buf, len, pos, &field_id);
        if (status < 0) return -1;
        if (status == 0) return 0;

        switch (field_id) {
        case 0:
            /* end marker */
            return 1;
        case 1: {
            /* is_overflows: UInt8 */
            uint8_t ignored_u8;
            status = read_u8(buf, len, pos, &ignored_u8);
            break;
        }
        case 2: {
            /* bucket_num: Int32 */
            int32_t ignored_i32;
            status = read_i32(buf, len, pos, &ignored_i32);
            break;
        }
        default:
            /* protocol error */
            return -1;
        }

        /* Both value readers share the same failure handling. */
        if (status <= 0) return status;
    }
}
|
29
|
|
|
|
|
|
|
|
|
30
|
|
|
|
|
|
|
/* |
|
31
|
|
|
|
|
|
|
* Try to parse one server packet from recv_buf. |
|
32
|
|
|
|
|
|
|
* Returns: |
|
33
|
|
|
|
|
|
|
* 1 = packet consumed, continue reading |
|
34
|
|
|
|
|
|
|
* 0 = need more data |
|
35
|
|
|
|
|
|
|
* -1 = error (message in *errmsg, caller must Safefree) |
|
36
|
|
|
|
|
|
|
* 2 = EndOfStream |
|
37
|
|
|
|
|
|
|
* 3 = Pong |
|
38
|
|
|
|
|
|
|
* 4 = Hello parsed (self->server_* fields populated) |
|
39
|
|
|
|
|
|
|
*/ |
|
40
|
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
/* Parse a Data block (table_name + optional-LZ4-chain + block_info + columns) |
|
42
|
|
|
|
|
|
|
* and decode-and-discard each column. Used by SERVER_LOG and SERVER_PROFILE_EVENTS, |
|
43
|
|
|
|
|
|
|
* which carry a block in the same wire format as SERVER_DATA but whose contents |
|
44
|
|
|
|
|
|
|
* the client does not surface to Perl. |
|
45
|
|
|
|
|
|
|
* |
|
46
|
|
|
|
|
|
|
* `lz4_optional` (used by PROFILE_EVENTS only): if set, fall back to parsing |
|
47
|
|
|
|
|
|
|
* the body uncompressed when LZ4 hard-fails — older servers occasionally send |
|
48
|
|
|
|
|
|
|
* profile_events uncompressed even on a compressed connection. |
|
49
|
|
|
|
|
|
|
* |
|
50
|
|
|
|
|
|
|
* Returns 1 on success (advances *outer_pos past the consumed bytes), |
|
51
|
|
|
|
|
|
|
* 0 on need-more-data, |
|
52
|
|
|
|
|
|
|
* -1 on hard error (sets *errmsg). */ |
|
53
|
0
|
|
|
|
|
|
static int parse_and_discard_block(ev_clickhouse_t *self, |
|
54
|
|
|
|
|
|
|
const char *buf, size_t len, size_t *outer_pos, |
|
55
|
|
|
|
|
|
|
const char *kind, int lz4_optional, |
|
56
|
|
|
|
|
|
|
char **errmsg) { |
|
57
|
0
|
|
|
|
|
|
size_t pos = *outer_pos; |
|
58
|
|
|
|
|
|
|
int rc; |
|
59
|
0
|
|
|
|
|
|
char *decompressed = NULL; |
|
60
|
|
|
|
|
|
|
const char *bbuf; |
|
61
|
|
|
|
|
|
|
size_t blen, bpos; |
|
62
|
|
|
|
|
|
|
char errbuf[64]; |
|
63
|
|
|
|
|
|
|
|
|
64
|
0
|
|
|
|
|
|
rc = skip_native_string(buf, len, &pos); |
|
65
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
66
|
0
|
0
|
|
|
|
|
if (rc < 0) { |
|
67
|
0
|
|
|
|
|
|
snprintf(errbuf, sizeof(errbuf), "malformed %s block", kind); |
|
68
|
0
|
|
|
|
|
|
*errmsg = safe_strdup(errbuf); |
|
69
|
0
|
|
|
|
|
|
return -1; |
|
70
|
|
|
|
|
|
|
} |
|
71
|
|
|
|
|
|
|
|
|
72
|
|
|
|
|
|
|
#ifdef HAVE_LZ4 |
|
73
|
|
|
|
|
|
|
if (self->compress) { |
|
74
|
|
|
|
|
|
|
int need_more = 0; |
|
75
|
|
|
|
|
|
|
const char *lz4_err = NULL; |
|
76
|
|
|
|
|
|
|
decompressed = ch_lz4_decompress_chain(buf, len, &pos, &blen, |
|
77
|
|
|
|
|
|
|
&need_more, &lz4_err); |
|
78
|
|
|
|
|
|
|
if (!decompressed) { |
|
79
|
|
|
|
|
|
|
if (need_more) return 0; |
|
80
|
|
|
|
|
|
|
if (!lz4_optional) { |
|
81
|
|
|
|
|
|
|
snprintf(errbuf, sizeof(errbuf), "%s: LZ4 decompression failed", kind); |
|
82
|
|
|
|
|
|
|
*errmsg = safe_strdup(lz4_err ? lz4_err : errbuf); |
|
83
|
|
|
|
|
|
|
return -1; |
|
84
|
|
|
|
|
|
|
} |
|
85
|
|
|
|
|
|
|
/* fall through to uncompressed parsing */ |
|
86
|
|
|
|
|
|
|
} |
|
87
|
|
|
|
|
|
|
} |
|
88
|
|
|
|
|
|
|
if (decompressed) { |
|
89
|
|
|
|
|
|
|
bbuf = decompressed; |
|
90
|
|
|
|
|
|
|
bpos = 0; |
|
91
|
|
|
|
|
|
|
} else |
|
92
|
|
|
|
|
|
|
#endif |
|
93
|
|
|
|
|
|
|
{ |
|
94
|
0
|
|
|
|
|
|
bbuf = buf; |
|
95
|
0
|
|
|
|
|
|
blen = len; |
|
96
|
0
|
|
|
|
|
|
bpos = pos; |
|
97
|
|
|
|
|
|
|
} |
|
98
|
|
|
|
|
|
|
|
|
99
|
|
|
|
|
|
|
#define _BAIL(rc_val) do { \ |
|
100
|
|
|
|
|
|
|
if (decompressed) Safefree(decompressed); \ |
|
101
|
|
|
|
|
|
|
if (rc_val < 0) { snprintf(errbuf, sizeof(errbuf), "malformed %s block", kind); *errmsg = safe_strdup(errbuf); } \ |
|
102
|
|
|
|
|
|
|
return rc_val; \ |
|
103
|
|
|
|
|
|
|
} while (0) |
|
104
|
|
|
|
|
|
|
|
|
105
|
0
|
0
|
|
|
|
|
if (self->server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) { |
|
106
|
0
|
|
|
|
|
|
rc = skip_block_info(bbuf, blen, &bpos); |
|
107
|
0
|
0
|
|
|
|
|
if (rc <= 0) _BAIL(rc); |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
108
|
|
|
|
|
|
|
} |
|
109
|
|
|
|
|
|
|
{ |
|
110
|
|
|
|
|
|
|
uint64_t nc, nr, c; |
|
111
|
0
|
|
|
|
|
|
rc = read_varuint(bbuf, blen, &bpos, &nc); |
|
112
|
0
|
0
|
|
|
|
|
if (rc <= 0) _BAIL(rc); |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
113
|
0
|
|
|
|
|
|
rc = read_varuint(bbuf, blen, &bpos, &nr); |
|
114
|
0
|
0
|
|
|
|
|
if (rc <= 0) _BAIL(rc); |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
115
|
|
|
|
|
|
|
|
|
116
|
0
|
0
|
|
|
|
|
for (c = 0; c < nc; c++) { |
|
117
|
|
|
|
|
|
|
const char *ctype; |
|
118
|
|
|
|
|
|
|
size_t ctype_len; |
|
119
|
0
|
|
|
|
|
|
rc = skip_native_string(bbuf, blen, &bpos); |
|
120
|
0
|
0
|
|
|
|
|
if (rc <= 0) _BAIL(rc); |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
121
|
0
|
|
|
|
|
|
rc = read_native_string_ref(bbuf, blen, &bpos, &ctype, &ctype_len); |
|
122
|
0
|
0
|
|
|
|
|
if (rc <= 0) _BAIL(rc); |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
123
|
|
|
|
|
|
|
/* custom serialization flag (revision >= 54446) */ |
|
124
|
0
|
0
|
|
|
|
|
if (bpos >= blen) _BAIL(0); |
|
|
|
0
|
|
|
|
|
|
|
125
|
0
|
0
|
|
|
|
|
if ((uint8_t)bbuf[bpos]) { |
|
126
|
0
|
0
|
|
|
|
|
if (decompressed) Safefree(decompressed); |
|
127
|
0
|
|
|
|
|
|
*errmsg = safe_strdup("custom serialization not supported"); |
|
128
|
0
|
|
|
|
|
|
return -1; |
|
129
|
|
|
|
|
|
|
} |
|
130
|
0
|
|
|
|
|
|
bpos++; |
|
131
|
0
|
0
|
|
|
|
|
if (nr > 0) { |
|
132
|
0
|
|
|
|
|
|
col_type_t *ct = parse_col_type(ctype, ctype_len); |
|
133
|
0
|
|
|
|
|
|
int col_err = 0; |
|
134
|
0
|
|
|
|
|
|
SV **vals = decode_column(bbuf, blen, &bpos, nr, ct, &col_err, 0); |
|
135
|
0
|
0
|
|
|
|
|
if (!vals) { |
|
136
|
0
|
|
|
|
|
|
free_col_type(ct); |
|
137
|
0
|
0
|
|
|
|
|
if (col_err || decompressed) { |
|
|
|
0
|
|
|
|
|
|
|
138
|
0
|
0
|
|
|
|
|
if (decompressed) Safefree(decompressed); |
|
139
|
0
|
|
|
|
|
|
snprintf(errbuf, sizeof(errbuf), "malformed %s block", kind); |
|
140
|
0
|
|
|
|
|
|
*errmsg = safe_strdup(errbuf); |
|
141
|
0
|
|
|
|
|
|
return -1; |
|
142
|
|
|
|
|
|
|
} |
|
143
|
0
|
|
|
|
|
|
return 0; |
|
144
|
|
|
|
|
|
|
} |
|
145
|
|
|
|
|
|
|
{ |
|
146
|
|
|
|
|
|
|
uint64_t j; |
|
147
|
0
|
0
|
|
|
|
|
for (j = 0; j < nr; j++) SvREFCNT_dec(vals[j]); |
|
148
|
|
|
|
|
|
|
} |
|
149
|
0
|
|
|
|
|
|
Safefree(vals); |
|
150
|
0
|
|
|
|
|
|
free_col_type(ct); |
|
151
|
|
|
|
|
|
|
} |
|
152
|
|
|
|
|
|
|
} |
|
153
|
|
|
|
|
|
|
} |
|
154
|
|
|
|
|
|
|
|
|
155
|
|
|
|
|
|
|
#undef _BAIL |
|
156
|
|
|
|
|
|
|
|
|
157
|
0
|
0
|
|
|
|
|
if (!decompressed) pos = bpos; |
|
158
|
0
|
0
|
|
|
|
|
if (decompressed) Safefree(decompressed); |
|
159
|
0
|
|
|
|
|
|
*outer_pos = pos; |
|
160
|
0
|
|
|
|
|
|
return 1; |
|
161
|
|
|
|
|
|
|
} |
|
162
|
|
|
|
|
|
|
|
|
163
|
|
|
|
|
|
|
/* Like parse_and_discard_block, but for each row of the block invokes |
|
164
|
|
|
|
|
|
|
* `cb` with one hashref keyed by column name. Used by on_log. |
|
165
|
|
|
|
|
|
|
* Caller has already verified cb is non-NULL. */ |
|
166
|
0
|
|
|
|
|
|
static int parse_and_emit_log_block(ev_clickhouse_t *self, |
|
167
|
|
|
|
|
|
|
const char *buf, size_t len, size_t *outer_pos, |
|
168
|
|
|
|
|
|
|
SV *cb, char **errmsg) { |
|
169
|
0
|
|
|
|
|
|
size_t pos = *outer_pos; |
|
170
|
|
|
|
|
|
|
int rc; |
|
171
|
0
|
|
|
|
|
|
char *decompressed = NULL; |
|
172
|
|
|
|
|
|
|
const char *bbuf; |
|
173
|
|
|
|
|
|
|
size_t blen, bpos; |
|
174
|
|
|
|
|
|
|
|
|
175
|
0
|
|
|
|
|
|
rc = skip_native_string(buf, len, &pos); |
|
176
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
177
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed log block"); return -1; } |
|
178
|
|
|
|
|
|
|
|
|
179
|
|
|
|
|
|
|
#ifdef HAVE_LZ4 |
|
180
|
|
|
|
|
|
|
if (self->compress) { |
|
181
|
|
|
|
|
|
|
int need_more = 0; |
|
182
|
|
|
|
|
|
|
const char *lz4_err = NULL; |
|
183
|
|
|
|
|
|
|
decompressed = ch_lz4_decompress_chain(buf, len, &pos, &blen, |
|
184
|
|
|
|
|
|
|
&need_more, &lz4_err); |
|
185
|
|
|
|
|
|
|
if (!decompressed) { |
|
186
|
|
|
|
|
|
|
if (need_more) return 0; |
|
187
|
|
|
|
|
|
|
/* server log frames are not always compressed even with |
|
188
|
|
|
|
|
|
|
* compress=1 negotiated — fall through to raw parsing. */ |
|
189
|
|
|
|
|
|
|
} |
|
190
|
|
|
|
|
|
|
} |
|
191
|
|
|
|
|
|
|
if (decompressed) { bbuf = decompressed; bpos = 0; } |
|
192
|
|
|
|
|
|
|
else |
|
193
|
|
|
|
|
|
|
#endif |
|
194
|
0
|
|
|
|
|
|
{ bbuf = buf; blen = len; bpos = pos; } |
|
195
|
|
|
|
|
|
|
|
|
196
|
|
|
|
|
|
|
#define _BAIL_LOG(rc_val) do { \ |
|
197
|
|
|
|
|
|
|
if (decompressed) Safefree(decompressed); \ |
|
198
|
|
|
|
|
|
|
if (rc_val < 0) { *errmsg = safe_strdup("malformed log block"); } \ |
|
199
|
|
|
|
|
|
|
return rc_val; \ |
|
200
|
|
|
|
|
|
|
} while (0) |
|
201
|
|
|
|
|
|
|
|
|
202
|
0
|
0
|
|
|
|
|
if (self->server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) { |
|
203
|
0
|
|
|
|
|
|
rc = skip_block_info(bbuf, blen, &bpos); |
|
204
|
0
|
0
|
|
|
|
|
if (rc <= 0) _BAIL_LOG(rc); |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
205
|
|
|
|
|
|
|
} |
|
206
|
|
|
|
|
|
|
uint64_t nc, nr; |
|
207
|
0
|
|
|
|
|
|
rc = read_varuint(bbuf, blen, &bpos, &nc); |
|
208
|
0
|
0
|
|
|
|
|
if (rc <= 0) _BAIL_LOG(rc); |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
209
|
0
|
|
|
|
|
|
rc = read_varuint(bbuf, blen, &bpos, &nr); |
|
210
|
0
|
0
|
|
|
|
|
if (rc <= 0) _BAIL_LOG(rc); |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
211
|
|
|
|
|
|
|
|
|
212
|
|
|
|
|
|
|
/* Collect column name + decoded values, then assemble per-row HVs. */ |
|
213
|
0
|
|
|
|
|
|
char **names = NULL; |
|
214
|
0
|
|
|
|
|
|
SV ***data = NULL; /* data[col][row] */ |
|
215
|
0
|
0
|
|
|
|
|
if (nc > 0) { |
|
216
|
0
|
0
|
|
|
|
|
Newxz(names, nc, char *); |
|
217
|
0
|
0
|
|
|
|
|
Newxz(data, nc, SV **); |
|
218
|
|
|
|
|
|
|
} |
|
219
|
|
|
|
|
|
|
/* err_seen: -1 = malformed, 0 = success / need-more, +1 = all columns |
|
220
|
|
|
|
|
|
|
* parsed cleanly. We distinguish "loop completed all nc columns" from |
|
221
|
|
|
|
|
|
|
* "loop broke early needing more data" via the explicit flag rather |
|
222
|
|
|
|
|
|
|
* than inspecting bpos, since bpos always advances past the header. */ |
|
223
|
0
|
|
|
|
|
|
int err_seen = 1; |
|
224
|
0
|
0
|
|
|
|
|
for (uint64_t c = 0; c < nc; c++) { |
|
225
|
|
|
|
|
|
|
const char *cname; size_t cname_len; |
|
226
|
0
|
|
|
|
|
|
rc = read_native_string_ref(bbuf, blen, &bpos, &cname, &cname_len); |
|
227
|
0
|
0
|
|
|
|
|
if (rc <= 0) { err_seen = rc < 0 ? -1 : 0; break; } |
|
|
|
0
|
|
|
|
|
|
|
228
|
0
|
|
|
|
|
|
Newx(names[c], cname_len + 1, char); |
|
229
|
0
|
|
|
|
|
|
Copy(cname, names[c], cname_len, char); |
|
230
|
0
|
|
|
|
|
|
names[c][cname_len] = '\0'; |
|
231
|
|
|
|
|
|
|
|
|
232
|
|
|
|
|
|
|
const char *ctype; size_t ctype_len; |
|
233
|
0
|
|
|
|
|
|
rc = read_native_string_ref(bbuf, blen, &bpos, &ctype, &ctype_len); |
|
234
|
0
|
0
|
|
|
|
|
if (rc <= 0) { err_seen = rc < 0 ? -1 : 0; break; } |
|
|
|
0
|
|
|
|
|
|
|
235
|
0
|
0
|
|
|
|
|
if (bpos >= blen) { err_seen = 0; break; } |
|
236
|
0
|
0
|
|
|
|
|
if ((uint8_t)bbuf[bpos]) { err_seen = -1; break; } |
|
237
|
0
|
|
|
|
|
|
bpos++; |
|
238
|
0
|
0
|
|
|
|
|
if (nr > 0) { |
|
239
|
0
|
|
|
|
|
|
col_type_t *ct = parse_col_type(ctype, ctype_len); |
|
240
|
0
|
|
|
|
|
|
int col_err = 0; |
|
241
|
0
|
|
|
|
|
|
SV **vals = decode_column(bbuf, blen, &bpos, nr, ct, &col_err, 0); |
|
242
|
0
|
|
|
|
|
|
free_col_type(ct); |
|
243
|
0
|
0
|
|
|
|
|
if (!vals) { err_seen = col_err ? -1 : 0; break; } |
|
|
|
0
|
|
|
|
|
|
|
244
|
0
|
|
|
|
|
|
data[c] = vals; |
|
245
|
|
|
|
|
|
|
} |
|
246
|
|
|
|
|
|
|
} |
|
247
|
|
|
|
|
|
|
|
|
248
|
0
|
|
|
|
|
|
int destroyed_during_cb = 0; |
|
249
|
0
|
0
|
|
|
|
|
if (err_seen == 1) { |
|
250
|
|
|
|
|
|
|
/* Pin cb across the loop: an on_log handler that calls |
|
251
|
|
|
|
|
|
|
* $ch->on_log(undef) would otherwise free the CV mid-iteration. */ |
|
252
|
0
|
|
|
|
|
|
SvREFCNT_inc(cb); |
|
253
|
0
|
|
|
|
|
|
self->callback_depth++; |
|
254
|
0
|
0
|
|
|
|
|
for (uint64_t r = 0; r < nr; r++) { |
|
255
|
0
|
|
|
|
|
|
HV *row = newHV(); |
|
256
|
0
|
0
|
|
|
|
|
for (uint64_t c = 0; c < nc; c++) { |
|
257
|
0
|
|
|
|
|
|
SV *v = data[c][r]; |
|
258
|
0
|
|
|
|
|
|
SvREFCNT_inc(v); /* hv_store consumes one ref */ |
|
259
|
0
|
|
|
|
|
|
(void)hv_store(row, names[c], strlen(names[c]), v, 0); |
|
260
|
|
|
|
|
|
|
} |
|
261
|
0
|
|
|
|
|
|
dSP; |
|
262
|
0
|
0
|
|
|
|
|
ENTER; SAVETMPS; PUSHMARK(SP); |
|
263
|
0
|
0
|
|
|
|
|
EXTEND(SP, 1); |
|
264
|
0
|
|
|
|
|
|
PUSHs(sv_2mortal(newRV_noinc((SV *)row))); |
|
265
|
0
|
|
|
|
|
|
PUTBACK; |
|
266
|
0
|
|
|
|
|
|
call_sv(cb, G_EVAL | G_VOID | G_DISCARD); |
|
267
|
0
|
0
|
|
|
|
|
WARN_AND_CLEAR_ERRSV("on_log"); |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
268
|
0
|
0
|
|
|
|
|
FREETMPS; LEAVE; |
|
269
|
0
|
0
|
|
|
|
|
if (self->magic == EV_CH_FREED) { destroyed_during_cb = 1; break; } |
|
270
|
|
|
|
|
|
|
} |
|
271
|
|
|
|
|
|
|
/* Drop the cb pin BEFORE callback_depth-- : dropping the last ref |
|
272
|
|
|
|
|
|
|
* to the on_log CV (a handler may have reassigned on_log) can free |
|
273
|
|
|
|
|
|
|
* a closure that captured $ch, triggering DESTROY. Doing it while |
|
274
|
|
|
|
|
|
|
* callback_depth is still raised keeps Safefree(self) deferred so |
|
275
|
|
|
|
|
|
|
* the check_destroyed below can detect and finalize it. */ |
|
276
|
0
|
|
|
|
|
|
SvREFCNT_dec(cb); |
|
277
|
0
|
|
|
|
|
|
self->callback_depth--; |
|
278
|
|
|
|
|
|
|
} |
|
279
|
|
|
|
|
|
|
(void)destroyed_during_cb; /* loop-break flag only; freed state is |
|
280
|
|
|
|
|
|
|
* authoritatively reported by check_destroyed */ |
|
281
|
|
|
|
|
|
|
|
|
282
|
|
|
|
|
|
|
/* Free locally-collected SVs + names regardless of self's state. */ |
|
283
|
0
|
0
|
|
|
|
|
for (uint64_t c = 0; c < nc; c++) { |
|
284
|
0
|
0
|
|
|
|
|
if (data && data[c]) { |
|
|
|
0
|
|
|
|
|
|
|
285
|
0
|
0
|
|
|
|
|
for (uint64_t r = 0; r < nr; r++) SvREFCNT_dec(data[c][r]); |
|
286
|
0
|
|
|
|
|
|
Safefree(data[c]); |
|
287
|
|
|
|
|
|
|
} |
|
288
|
0
|
0
|
|
|
|
|
if (names && names[c]) Safefree(names[c]); |
|
|
|
0
|
|
|
|
|
|
|
289
|
|
|
|
|
|
|
} |
|
290
|
0
|
0
|
|
|
|
|
if (data) Safefree(data); |
|
291
|
0
|
0
|
|
|
|
|
if (names) Safefree(names); |
|
292
|
|
|
|
|
|
|
|
|
293
|
|
|
|
|
|
|
#undef _BAIL_LOG |
|
294
|
|
|
|
|
|
|
|
|
295
|
|
|
|
|
|
|
/* Single cleanup point: decompressed always needs freeing if allocated, |
|
296
|
|
|
|
|
|
|
* regardless of which branch we return from. */ |
|
297
|
|
|
|
|
|
|
int ret; |
|
298
|
|
|
|
|
|
|
/* on_log — or dropping the pinned cb above — may have freed self. |
|
299
|
|
|
|
|
|
|
* check_destroyed finalizes any deferred Safefree and reports it. */ |
|
300
|
0
|
0
|
|
|
|
|
if (check_destroyed(self)) { |
|
301
|
0
|
|
|
|
|
|
ret = -2; |
|
302
|
0
|
0
|
|
|
|
|
} else if (err_seen < 0) { |
|
303
|
0
|
|
|
|
|
|
*errmsg = safe_strdup("malformed log block"); |
|
304
|
0
|
|
|
|
|
|
ret = -1; |
|
305
|
0
|
0
|
|
|
|
|
} else if (err_seen == 0) { /* need more data — do not advance outer_pos */ |
|
306
|
0
|
|
|
|
|
|
ret = 0; |
|
307
|
|
|
|
|
|
|
} else { |
|
308
|
|
|
|
|
|
|
/* For the uncompressed path bpos has been advanced inside the outer |
|
309
|
|
|
|
|
|
|
* buf; copy it back to pos. For the LZ4 path pos was already advanced |
|
310
|
|
|
|
|
|
|
* past the chain by ch_lz4_decompress_chain, and bpos is an offset |
|
311
|
|
|
|
|
|
|
* into the freed decompressed buffer — don't touch pos. */ |
|
312
|
0
|
0
|
|
|
|
|
if (!decompressed) pos = bpos; |
|
313
|
0
|
|
|
|
|
|
*outer_pos = pos; |
|
314
|
0
|
|
|
|
|
|
ret = 1; |
|
315
|
|
|
|
|
|
|
} |
|
316
|
0
|
0
|
|
|
|
|
if (decompressed) Safefree(decompressed); |
|
317
|
0
|
|
|
|
|
|
return ret; |
|
318
|
|
|
|
|
|
|
} |
|
319
|
|
|
|
|
|
|
|
|
320
|
|
|
|
|
|
|
/* Dispatch the on_progress callback with five UInt values. Returns -2 if |
|
321
|
|
|
|
|
|
|
* the handler freed self, 0 otherwise. Shared between SERVER_PROGRESS |
|
322
|
|
|
|
|
|
|
* (per-packet) and EndOfStream (flush of any uncoalesced accumulator). */ |
|
323
|
0
|
|
|
|
|
|
static int fire_progress_cb(ev_clickhouse_t *self, const uint64_t pp[5]) { |
|
324
|
|
|
|
|
|
|
int i; |
|
325
|
0
|
|
|
|
|
|
dSP; |
|
326
|
0
|
|
|
|
|
|
self->callback_depth++; |
|
327
|
0
|
|
|
|
|
|
ENTER; SAVETMPS; |
|
328
|
0
|
0
|
|
|
|
|
PUSHMARK(SP); |
|
329
|
0
|
0
|
|
|
|
|
EXTEND(SP, 5); |
|
330
|
0
|
0
|
|
|
|
|
for (i = 0; i < 5; i++) PUSHs(sv_2mortal(newSVuv(pp[i]))); |
|
331
|
0
|
|
|
|
|
|
PUTBACK; |
|
332
|
0
|
|
|
|
|
|
call_sv(self->on_progress, G_DISCARD | G_EVAL); |
|
333
|
0
|
0
|
|
|
|
|
WARN_AND_CLEAR_ERRSV("progress handler"); |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
334
|
0
|
0
|
|
|
|
|
FREETMPS; LEAVE; |
|
335
|
0
|
|
|
|
|
|
self->callback_depth--; |
|
336
|
0
|
0
|
|
|
|
|
return check_destroyed(self) ? -2 : 0; |
|
337
|
|
|
|
|
|
|
} |
|
338
|
|
|
|
|
|
|
|
|
339
|
0
|
|
|
|
|
|
static int parse_native_packet(ev_clickhouse_t *self, char **errmsg) { |
|
340
|
0
|
|
|
|
|
|
const char *buf = self->recv_buf; |
|
341
|
0
|
|
|
|
|
|
size_t len = self->recv_len; |
|
342
|
0
|
|
|
|
|
|
size_t pos = 0; |
|
343
|
|
|
|
|
|
|
uint64_t ptype; |
|
344
|
|
|
|
|
|
|
int rc; |
|
345
|
|
|
|
|
|
|
|
|
346
|
0
|
|
|
|
|
|
rc = read_varuint(buf, len, &pos, &ptype); |
|
347
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
348
|
0
|
0
|
|
|
|
|
if (rc < 0) { |
|
349
|
0
|
|
|
|
|
|
*errmsg = safe_strdup("malformed packet type"); |
|
350
|
0
|
|
|
|
|
|
return -1; |
|
351
|
|
|
|
|
|
|
} |
|
352
|
|
|
|
|
|
|
|
|
353
|
0
|
|
|
|
|
|
switch ((int)ptype) { |
|
354
|
|
|
|
|
|
|
|
|
355
|
0
|
|
|
|
|
|
case SERVER_HELLO: { |
|
356
|
0
|
|
|
|
|
|
char *sname = NULL; |
|
357
|
|
|
|
|
|
|
uint64_t major, minor, revision; |
|
358
|
|
|
|
|
|
|
|
|
359
|
0
|
|
|
|
|
|
rc = read_native_string_alloc(buf, len, &pos, &sname, NULL); |
|
360
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
361
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed server name"); return -1; } |
|
362
|
|
|
|
|
|
|
|
|
363
|
0
|
|
|
|
|
|
rc = read_varuint(buf, len, &pos, &major); |
|
364
|
0
|
0
|
|
|
|
|
if (rc == 0) { Safefree(sname); return 0; } |
|
365
|
0
|
0
|
|
|
|
|
if (rc < 0) { Safefree(sname); *errmsg = safe_strdup("malformed server version major"); return -1; } |
|
366
|
|
|
|
|
|
|
|
|
367
|
0
|
|
|
|
|
|
rc = read_varuint(buf, len, &pos, &minor); |
|
368
|
0
|
0
|
|
|
|
|
if (rc == 0) { Safefree(sname); return 0; } |
|
369
|
0
|
0
|
|
|
|
|
if (rc < 0) { Safefree(sname); *errmsg = safe_strdup("malformed server version minor"); return -1; } |
|
370
|
|
|
|
|
|
|
|
|
371
|
0
|
|
|
|
|
|
rc = read_varuint(buf, len, &pos, &revision); |
|
372
|
0
|
0
|
|
|
|
|
if (rc == 0) { Safefree(sname); return 0; } |
|
373
|
0
|
0
|
|
|
|
|
if (rc < 0) { Safefree(sname); *errmsg = safe_strdup("malformed server revision"); return -1; } |
|
374
|
|
|
|
|
|
|
|
|
375
|
0
|
0
|
|
|
|
|
CLEAR_STR(self->server_name); |
|
376
|
0
|
|
|
|
|
|
self->server_name = sname; |
|
377
|
0
|
|
|
|
|
|
self->server_version_major = (unsigned int)major; |
|
378
|
0
|
|
|
|
|
|
self->server_version_minor = (unsigned int)minor; |
|
379
|
0
|
|
|
|
|
|
self->server_revision = (unsigned int)revision; |
|
380
|
|
|
|
|
|
|
|
|
381
|
|
|
|
|
|
|
/* The server emits these fields per the negotiated revision = |
|
382
|
|
|
|
|
|
|
* min(server_rev, our advertised CH_CLIENT_REVISION). Gate on |
|
383
|
|
|
|
|
|
|
* server_revision so we don't read garbage from older servers. */ |
|
384
|
0
|
0
|
|
|
|
|
if (self->server_revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) { |
|
385
|
0
|
|
|
|
|
|
char *tz = NULL; |
|
386
|
0
|
|
|
|
|
|
rc = read_native_string_alloc(buf, len, &pos, &tz, NULL); |
|
387
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
388
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed timezone"); return -1; } |
|
389
|
0
|
0
|
|
|
|
|
CLEAR_STR(self->server_timezone); |
|
390
|
0
|
|
|
|
|
|
self->server_timezone = tz; |
|
391
|
|
|
|
|
|
|
} |
|
392
|
|
|
|
|
|
|
|
|
393
|
0
|
0
|
|
|
|
|
if (self->server_revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME) { |
|
394
|
0
|
|
|
|
|
|
char *dn = NULL; |
|
395
|
0
|
|
|
|
|
|
rc = read_native_string_alloc(buf, len, &pos, &dn, NULL); |
|
396
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
397
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed display name"); return -1; } |
|
398
|
0
|
0
|
|
|
|
|
CLEAR_STR(self->server_display_name); |
|
399
|
0
|
|
|
|
|
|
self->server_display_name = dn; |
|
400
|
|
|
|
|
|
|
} |
|
401
|
|
|
|
|
|
|
|
|
402
|
0
|
0
|
|
|
|
|
if (self->server_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) { |
|
403
|
|
|
|
|
|
|
uint64_t patch; |
|
404
|
0
|
|
|
|
|
|
rc = read_varuint(buf, len, &pos, &patch); |
|
405
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
406
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed version patch"); return -1; } |
|
407
|
0
|
|
|
|
|
|
self->server_version_patch = (unsigned int)patch; |
|
408
|
|
|
|
|
|
|
} |
|
409
|
|
|
|
|
|
|
|
|
410
|
|
|
|
|
|
|
/* consume from recv_buf */ |
|
411
|
0
|
|
|
|
|
|
recv_consume(self, pos); |
|
412
|
0
|
|
|
|
|
|
return 4; |
|
413
|
|
|
|
|
|
|
} |
|
414
|
|
|
|
|
|
|
|
|
415
|
0
|
|
|
|
|
|
case SERVER_DATA: |
|
416
|
|
|
|
|
|
|
case SERVER_TOTALS: |
|
417
|
|
|
|
|
|
|
case SERVER_EXTREMES: { |
|
418
|
|
|
|
|
|
|
uint64_t num_cols, num_rows; |
|
419
|
|
|
|
|
|
|
const char *dbuf; /* data buffer (may point to decompressed data) */ |
|
420
|
|
|
|
|
|
|
size_t dlen, dpos; |
|
421
|
0
|
|
|
|
|
|
char *decompressed = NULL; |
|
422
|
|
|
|
|
|
|
|
|
423
|
|
|
|
|
|
|
/* table name — outside compression */ |
|
424
|
0
|
|
|
|
|
|
rc = skip_native_string(buf, len, &pos); |
|
425
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
426
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed table name"); return -1; } |
|
427
|
|
|
|
|
|
|
|
|
428
|
|
|
|
|
|
|
#ifdef HAVE_LZ4 |
|
429
|
|
|
|
|
|
|
if (self->compress) { |
|
430
|
|
|
|
|
|
|
/* Decompress the block body — may span multiple LZ4 sub-blocks. */ |
|
431
|
|
|
|
|
|
|
int need_more = 0; |
|
432
|
|
|
|
|
|
|
const char *lz4_err = NULL; |
|
433
|
|
|
|
|
|
|
decompressed = ch_lz4_decompress_chain(buf, len, &pos, &dlen, |
|
434
|
|
|
|
|
|
|
&need_more, &lz4_err); |
|
435
|
|
|
|
|
|
|
if (!decompressed) { |
|
436
|
|
|
|
|
|
|
if (need_more) return 0; |
|
437
|
|
|
|
|
|
|
*errmsg = safe_strdup(lz4_err ? lz4_err : "LZ4 decompression failed"); |
|
438
|
|
|
|
|
|
|
return -1; |
|
439
|
|
|
|
|
|
|
} |
|
440
|
|
|
|
|
|
|
dbuf = decompressed; |
|
441
|
|
|
|
|
|
|
dpos = 0; |
|
442
|
|
|
|
|
|
|
} else |
|
443
|
|
|
|
|
|
|
#endif |
|
444
|
|
|
|
|
|
|
{ |
|
445
|
0
|
|
|
|
|
|
dbuf = buf; |
|
446
|
0
|
|
|
|
|
|
dlen = len; |
|
447
|
0
|
|
|
|
|
|
dpos = pos; |
|
448
|
|
|
|
|
|
|
} |
|
449
|
|
|
|
|
|
|
|
|
450
|
|
|
|
|
|
|
/* block info */ |
|
451
|
0
|
0
|
|
|
|
|
if (self->server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) { |
|
452
|
0
|
|
|
|
|
|
rc = skip_block_info(dbuf, dlen, &dpos); |
|
453
|
0
|
0
|
|
|
|
|
if (rc == 0) { |
|
454
|
0
|
0
|
|
|
|
|
if (decompressed) { Safefree(decompressed); *errmsg = safe_strdup("truncated compressed block"); return -1; } |
|
455
|
0
|
|
|
|
|
|
return 0; |
|
456
|
|
|
|
|
|
|
} |
|
457
|
0
|
0
|
|
|
|
|
if (rc < 0) { if (decompressed) Safefree(decompressed); *errmsg = safe_strdup("malformed block info"); return -1; } |
|
|
|
0
|
|
|
|
|
|
|
458
|
|
|
|
|
|
|
} |
|
459
|
|
|
|
|
|
|
|
|
460
|
0
|
|
|
|
|
|
rc = read_varuint(dbuf, dlen, &dpos, &num_cols); |
|
461
|
0
|
0
|
|
|
|
|
if (rc == 0) { |
|
462
|
0
|
0
|
|
|
|
|
if (decompressed) { Safefree(decompressed); *errmsg = safe_strdup("truncated compressed block"); return -1; } |
|
463
|
0
|
|
|
|
|
|
return 0; |
|
464
|
|
|
|
|
|
|
} |
|
465
|
0
|
0
|
|
|
|
|
if (rc < 0) { if (decompressed) Safefree(decompressed); *errmsg = safe_strdup("malformed num_cols"); return -1; } |
|
|
|
0
|
|
|
|
|
|
|
466
|
|
|
|
|
|
|
|
|
467
|
0
|
|
|
|
|
|
rc = read_varuint(dbuf, dlen, &dpos, &num_rows); |
|
468
|
0
|
0
|
|
|
|
|
if (rc == 0) { |
|
469
|
0
|
0
|
|
|
|
|
if (decompressed) { Safefree(decompressed); *errmsg = safe_strdup("truncated compressed block"); return -1; } |
|
470
|
0
|
|
|
|
|
|
return 0; |
|
471
|
|
|
|
|
|
|
} |
|
472
|
0
|
0
|
|
|
|
|
if (rc < 0) { if (decompressed) Safefree(decompressed); *errmsg = safe_strdup("malformed num_rows"); return -1; } |
|
|
|
0
|
|
|
|
|
|
|
473
|
|
|
|
|
|
|
|
|
474
|
|
|
|
|
|
|
/* Empty data block — skip or handle column names/types */ |
|
475
|
0
|
0
|
|
|
|
|
if (num_rows == 0) { |
|
476
|
|
|
|
|
|
|
/* INSERT two-phase: server sent sample block with column structure */ |
|
477
|
0
|
0
|
|
|
|
|
if (self->native_state == NATIVE_WAIT_INSERT_META |
|
478
|
0
|
0
|
|
|
|
|
&& (self->insert_data || self->insert_av) && num_cols > 0) { |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
479
|
|
|
|
|
|
|
const char **cnames; |
|
480
|
|
|
|
|
|
|
size_t *cname_lens; |
|
481
|
|
|
|
|
|
|
const char **ctypes_str; |
|
482
|
|
|
|
|
|
|
size_t *ctype_lens; |
|
483
|
|
|
|
|
|
|
col_type_t **ctypes; |
|
484
|
|
|
|
|
|
|
char *data_pkt; |
|
485
|
|
|
|
|
|
|
size_t data_pkt_len; |
|
486
|
|
|
|
|
|
|
uint64_t c; |
|
487
|
0
|
|
|
|
|
|
int meta_hard = 0; /* 1 = hard error, 0 = need more */ |
|
488
|
0
|
|
|
|
|
|
const char *meta_err = NULL; /* non-NULL → goto meta_cleanup */ |
|
489
|
|
|
|
|
|
|
|
|
490
|
0
|
0
|
|
|
|
|
Newxz(cnames, num_cols, const char *); |
|
491
|
0
|
0
|
|
|
|
|
Newxz(cname_lens, num_cols, size_t); |
|
492
|
0
|
0
|
|
|
|
|
Newxz(ctypes_str, num_cols, const char *); |
|
493
|
0
|
0
|
|
|
|
|
Newxz(ctype_lens, num_cols, size_t); |
|
494
|
0
|
0
|
|
|
|
|
Newxz(ctypes, num_cols, col_type_t *); |
|
495
|
|
|
|
|
|
|
|
|
496
|
0
|
0
|
|
|
|
|
for (c = 0; c < num_cols; c++) { |
|
497
|
0
|
|
|
|
|
|
rc = read_native_string_ref(dbuf, dlen, &dpos, |
|
498
|
0
|
|
|
|
|
|
&cnames[c], &cname_lens[c]); |
|
499
|
0
|
0
|
|
|
|
|
if (rc <= 0) { |
|
500
|
0
|
0
|
|
|
|
|
meta_hard = (rc < 0 || decompressed != NULL); |
|
|
|
0
|
|
|
|
|
|
|
501
|
0
|
|
|
|
|
|
meta_err = "malformed cname"; |
|
502
|
0
|
|
|
|
|
|
goto meta_cleanup; |
|
503
|
|
|
|
|
|
|
} |
|
504
|
0
|
|
|
|
|
|
rc = read_native_string_ref(dbuf, dlen, &dpos, |
|
505
|
0
|
|
|
|
|
|
&ctypes_str[c], &ctype_lens[c]); |
|
506
|
0
|
0
|
|
|
|
|
if (rc <= 0) { |
|
507
|
0
|
0
|
|
|
|
|
meta_hard = (rc < 0 || decompressed != NULL); |
|
|
|
0
|
|
|
|
|
|
|
508
|
0
|
|
|
|
|
|
meta_err = "malformed ctype"; |
|
509
|
0
|
|
|
|
|
|
goto meta_cleanup; |
|
510
|
|
|
|
|
|
|
} |
|
511
|
0
|
|
|
|
|
|
ctypes[c] = parse_col_type(ctypes_str[c], ctype_lens[c]); |
|
512
|
|
|
|
|
|
|
|
|
513
|
|
|
|
|
|
|
/* custom serialization flag (revision >= 54446) */ |
|
514
|
0
|
0
|
|
|
|
|
if (dpos >= dlen) { |
|
515
|
0
|
|
|
|
|
|
meta_hard = (decompressed != NULL); |
|
516
|
0
|
|
|
|
|
|
meta_err = "truncated custom_ser"; |
|
517
|
0
|
|
|
|
|
|
goto meta_cleanup; |
|
518
|
|
|
|
|
|
|
} |
|
519
|
0
|
0
|
|
|
|
|
if ((uint8_t)dbuf[dpos]) { |
|
520
|
0
|
|
|
|
|
|
meta_hard = 1; |
|
521
|
0
|
|
|
|
|
|
meta_err = "custom serialization not supported"; |
|
522
|
0
|
|
|
|
|
|
goto meta_cleanup; |
|
523
|
|
|
|
|
|
|
} |
|
524
|
0
|
|
|
|
|
|
dpos++; |
|
525
|
|
|
|
|
|
|
} |
|
526
|
0
|
|
|
|
|
|
goto meta_ok; |
|
527
|
|
|
|
|
|
|
|
|
528
|
0
|
|
|
|
|
|
meta_cleanup: |
|
529
|
0
|
0
|
|
|
|
|
for (c = 0; c < num_cols; c++) if (ctypes[c]) free_col_type(ctypes[c]); |
|
|
|
0
|
|
|
|
|
|
|
530
|
0
|
|
|
|
|
|
Safefree(cnames); Safefree(cname_lens); |
|
531
|
0
|
|
|
|
|
|
Safefree(ctypes_str); Safefree(ctype_lens); |
|
532
|
0
|
|
|
|
|
|
Safefree(ctypes); |
|
533
|
0
|
0
|
|
|
|
|
if (decompressed) Safefree(decompressed); |
|
534
|
0
|
0
|
|
|
|
|
if (meta_hard) { *errmsg = safe_strdup(meta_err); return -1; } |
|
535
|
0
|
|
|
|
|
|
return 0; |
|
536
|
|
|
|
|
|
|
|
|
537
|
0
|
|
|
|
|
|
meta_ok: ; |
|
538
|
|
|
|
|
|
|
|
|
539
|
|
|
|
|
|
|
/* Build binary data block from stored data */ |
|
540
|
0
|
0
|
|
|
|
|
if (self->insert_av) { |
|
541
|
0
|
|
|
|
|
|
data_pkt = build_native_insert_data_from_av(aTHX_ self, |
|
542
|
0
|
|
|
|
|
|
(AV *)SvRV(self->insert_av), |
|
543
|
|
|
|
|
|
|
cnames, cname_lens, ctypes_str, ctype_lens, |
|
544
|
|
|
|
|
|
|
ctypes, (int)num_cols, &data_pkt_len); |
|
545
|
|
|
|
|
|
|
} else { |
|
546
|
0
|
|
|
|
|
|
data_pkt = build_native_insert_data(self, |
|
547
|
0
|
|
|
|
|
|
self->insert_data, self->insert_data_len, |
|
548
|
|
|
|
|
|
|
cnames, cname_lens, ctypes_str, ctype_lens, |
|
549
|
|
|
|
|
|
|
ctypes, (int)num_cols, &data_pkt_len); |
|
550
|
|
|
|
|
|
|
} |
|
551
|
|
|
|
|
|
|
|
|
552
|
0
|
0
|
|
|
|
|
for (c = 0; c < num_cols; c++) |
|
553
|
0
|
|
|
|
|
|
free_col_type(ctypes[c]); |
|
554
|
0
|
|
|
|
|
|
Safefree(cnames); Safefree(cname_lens); |
|
555
|
0
|
|
|
|
|
|
Safefree(ctypes_str); Safefree(ctype_lens); |
|
556
|
0
|
|
|
|
|
|
Safefree(ctypes); |
|
557
|
|
|
|
|
|
|
|
|
558
|
|
|
|
|
|
|
{ |
|
559
|
|
|
|
|
|
|
/* Check encode-failure sentinel before freeing insert data */ |
|
560
|
0
|
0
|
|
|
|
|
int encode_failed = (!data_pkt && data_pkt_len == (size_t)-1); |
|
|
|
0
|
|
|
|
|
|
|
561
|
|
|
|
|
|
|
|
|
562
|
|
|
|
|
|
|
/* Free stored INSERT data */ |
|
563
|
0
|
0
|
|
|
|
|
CLEAR_INSERT(self); |
|
|
|
0
|
|
|
|
|
|
|
564
|
|
|
|
|
|
|
|
|
565
|
0
|
0
|
|
|
|
|
if (decompressed) Safefree(decompressed); |
|
566
|
0
|
|
|
|
|
|
else pos = dpos; |
|
567
|
0
|
|
|
|
|
|
recv_consume(self, pos); |
|
568
|
|
|
|
|
|
|
|
|
569
|
0
|
0
|
|
|
|
|
if (!data_pkt) { |
|
570
|
|
|
|
|
|
|
/* Send empty Data block to complete the INSERT protocol */ |
|
571
|
|
|
|
|
|
|
native_buf_t fallback; |
|
572
|
0
|
|
|
|
|
|
nbuf_init(&fallback); |
|
573
|
0
|
|
|
|
|
|
nbuf_empty_data_block(&fallback, self->compress); |
|
574
|
0
|
|
|
|
|
|
data_pkt = fallback.data; |
|
575
|
0
|
|
|
|
|
|
data_pkt_len = fallback.len; |
|
576
|
0
|
0
|
|
|
|
|
if (encode_failed) |
|
577
|
0
|
|
|
|
|
|
self->insert_err = safe_strdup( |
|
578
|
|
|
|
|
|
|
"native INSERT encoding failed (unsupported type)"); |
|
579
|
|
|
|
|
|
|
} |
|
580
|
|
|
|
|
|
|
} |
|
581
|
|
|
|
|
|
|
|
|
582
|
|
|
|
|
|
|
/* Send the data block — write to send_buf and start writing */ |
|
583
|
0
|
|
|
|
|
|
self->native_state = NATIVE_WAIT_RESULT; |
|
584
|
0
|
|
|
|
|
|
send_replace(self, data_pkt, data_pkt_len); |
|
585
|
0
|
0
|
|
|
|
|
if (try_write(self)) return -2; |
|
586
|
0
|
|
|
|
|
|
return 1; |
|
587
|
|
|
|
|
|
|
} |
|
588
|
|
|
|
|
|
|
|
|
589
|
|
|
|
|
|
|
/* INSERT two-phase with 0-column sample block: free data, |
|
590
|
|
|
|
|
|
|
* send empty Data block, transition to WAIT_RESULT */ |
|
591
|
0
|
0
|
|
|
|
|
if (self->native_state == NATIVE_WAIT_INSERT_META |
|
592
|
0
|
0
|
|
|
|
|
&& (self->insert_data || self->insert_av) && num_cols == 0) { |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
593
|
|
|
|
|
|
|
native_buf_t fallback; |
|
594
|
|
|
|
|
|
|
|
|
595
|
0
|
0
|
|
|
|
|
CLEAR_INSERT(self); |
|
|
|
0
|
|
|
|
|
|
|
596
|
|
|
|
|
|
|
|
|
597
|
0
|
0
|
|
|
|
|
if (decompressed) Safefree(decompressed); |
|
598
|
0
|
|
|
|
|
|
else pos = dpos; |
|
599
|
0
|
|
|
|
|
|
recv_consume(self, pos); |
|
600
|
|
|
|
|
|
|
|
|
601
|
0
|
|
|
|
|
|
nbuf_init(&fallback); |
|
602
|
0
|
|
|
|
|
|
nbuf_empty_data_block(&fallback, self->compress); |
|
603
|
0
|
|
|
|
|
|
self->native_state = NATIVE_WAIT_RESULT; |
|
604
|
0
|
|
|
|
|
|
send_replace(self, fallback.data, fallback.len); |
|
605
|
0
|
|
|
|
|
|
self->insert_err = safe_strdup( |
|
606
|
|
|
|
|
|
|
"INSERT failed: server sent 0-column sample block"); |
|
607
|
0
|
0
|
|
|
|
|
if (try_write(self)) return -2; |
|
608
|
0
|
|
|
|
|
|
return 1; |
|
609
|
|
|
|
|
|
|
} |
|
610
|
|
|
|
|
|
|
|
|
611
|
|
|
|
|
|
|
/* Normal empty block — capture column names/types so callers can |
|
612
|
|
|
|
|
|
|
* inspect schema after a zero-row SELECT. Reset only when we have |
|
613
|
|
|
|
|
|
|
* columns to record so the terminating empty block (num_cols=0) |
|
614
|
|
|
|
|
|
|
* doesn't wipe the schema we already captured; this also drops |
|
615
|
|
|
|
|
|
|
* any partial state from a need-more retry. */ |
|
616
|
|
|
|
|
|
|
{ |
|
617
|
|
|
|
|
|
|
uint64_t c; |
|
618
|
0
|
0
|
|
|
|
|
if (num_cols > 0) { |
|
619
|
0
|
0
|
|
|
|
|
CLEAR_SV(self->native_col_names); |
|
620
|
0
|
0
|
|
|
|
|
CLEAR_SV(self->native_col_types); |
|
621
|
0
|
|
|
|
|
|
self->native_col_names = newAV(); |
|
622
|
0
|
|
|
|
|
|
self->native_col_types = newAV(); |
|
623
|
|
|
|
|
|
|
} |
|
624
|
0
|
0
|
|
|
|
|
for (c = 0; c < num_cols; c++) { |
|
625
|
|
|
|
|
|
|
const char *cname, *ctype; |
|
626
|
|
|
|
|
|
|
size_t cname_len, ctype_len; |
|
627
|
|
|
|
|
|
|
|
|
628
|
0
|
|
|
|
|
|
rc = read_native_string_ref(dbuf, dlen, &dpos, &cname, &cname_len); |
|
629
|
0
|
0
|
|
|
|
|
if (rc <= 0) { |
|
630
|
0
|
0
|
|
|
|
|
if (decompressed) Safefree(decompressed); |
|
631
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed cname"); return -1; } |
|
632
|
0
|
|
|
|
|
|
return 0; |
|
633
|
|
|
|
|
|
|
} |
|
634
|
0
|
|
|
|
|
|
av_push(self->native_col_names, newSVpvn(cname, cname_len)); |
|
635
|
|
|
|
|
|
|
|
|
636
|
0
|
|
|
|
|
|
rc = read_native_string_ref(dbuf, dlen, &dpos, &ctype, &ctype_len); |
|
637
|
0
|
0
|
|
|
|
|
if (rc <= 0) { |
|
638
|
0
|
0
|
|
|
|
|
if (decompressed) Safefree(decompressed); |
|
639
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed ctype"); return -1; } |
|
640
|
0
|
|
|
|
|
|
return 0; |
|
641
|
|
|
|
|
|
|
} |
|
642
|
0
|
|
|
|
|
|
av_push(self->native_col_types, newSVpvn(ctype, ctype_len)); |
|
643
|
|
|
|
|
|
|
|
|
644
|
|
|
|
|
|
|
/* custom serialization flag (revision >= 54446) */ |
|
645
|
0
|
0
|
|
|
|
|
if (dpos >= dlen) { |
|
646
|
0
|
0
|
|
|
|
|
if (decompressed) Safefree(decompressed); |
|
647
|
0
|
|
|
|
|
|
return 0; |
|
648
|
|
|
|
|
|
|
} |
|
649
|
0
|
0
|
|
|
|
|
if ((uint8_t)dbuf[dpos]) { |
|
650
|
0
|
0
|
|
|
|
|
if (decompressed) Safefree(decompressed); |
|
651
|
0
|
|
|
|
|
|
*errmsg = safe_strdup("custom serialization not supported"); |
|
652
|
0
|
|
|
|
|
|
return -1; |
|
653
|
|
|
|
|
|
|
} |
|
654
|
0
|
|
|
|
|
|
dpos++; |
|
655
|
|
|
|
|
|
|
} |
|
656
|
|
|
|
|
|
|
} |
|
657
|
0
|
0
|
|
|
|
|
if (decompressed) Safefree(decompressed); |
|
658
|
0
|
|
|
|
|
|
else pos = dpos; /* uncompressed: advance pos to match dpos */ |
|
659
|
0
|
|
|
|
|
|
recv_consume(self, pos); |
|
660
|
0
|
|
|
|
|
|
return 1; |
|
661
|
|
|
|
|
|
|
} |
|
662
|
|
|
|
|
|
|
|
|
663
|
|
|
|
|
|
|
/* Decode columns and convert to rows */ |
|
664
|
|
|
|
|
|
|
{ |
|
665
|
0
|
|
|
|
|
|
SV ***columns = NULL; |
|
666
|
0
|
|
|
|
|
|
col_type_t **col_types = NULL; |
|
667
|
0
|
|
|
|
|
|
const char **cnames = NULL; |
|
668
|
0
|
|
|
|
|
|
size_t *cname_lens = NULL; |
|
669
|
|
|
|
|
|
|
uint64_t c, r; |
|
670
|
0
|
|
|
|
|
|
int named = (self->decode_flags & DECODE_NAMED_ROWS) ? 1 : 0; |
|
671
|
|
|
|
|
|
|
|
|
672
|
0
|
0
|
|
|
|
|
Newxz(columns, num_cols, SV**); |
|
673
|
0
|
0
|
|
|
|
|
Newxz(col_types, num_cols, col_type_t*); |
|
674
|
0
|
0
|
|
|
|
|
if (named) { |
|
675
|
0
|
0
|
|
|
|
|
Newxz(cnames, num_cols, const char *); |
|
676
|
0
|
0
|
|
|
|
|
Newx(cname_lens, num_cols, size_t); |
|
677
|
|
|
|
|
|
|
} |
|
678
|
|
|
|
|
|
|
|
|
679
|
0
|
0
|
|
|
|
|
for (c = 0; c < num_cols; c++) { |
|
680
|
|
|
|
|
|
|
const char *cname, *ctype; |
|
681
|
|
|
|
|
|
|
size_t cname_len, ctype_len; |
|
682
|
|
|
|
|
|
|
|
|
683
|
0
|
|
|
|
|
|
rc = read_native_string_ref(dbuf, dlen, &dpos, &cname, &cname_len); |
|
684
|
0
|
0
|
|
|
|
|
if (rc == 0) { |
|
685
|
0
|
0
|
|
|
|
|
if (decompressed) { *errmsg = safe_strdup("truncated cname"); goto data_error; } |
|
686
|
0
|
|
|
|
|
|
goto data_need_more; |
|
687
|
|
|
|
|
|
|
} |
|
688
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed cname"); goto data_error; } |
|
689
|
|
|
|
|
|
|
|
|
690
|
0
|
0
|
|
|
|
|
if (named) { |
|
691
|
0
|
|
|
|
|
|
cnames[c] = cname; |
|
692
|
0
|
|
|
|
|
|
cname_lens[c] = cname_len; |
|
693
|
|
|
|
|
|
|
} |
|
694
|
|
|
|
|
|
|
|
|
695
|
|
|
|
|
|
|
/* Allocate AVs on first column of first data block. Reset |
|
696
|
|
|
|
|
|
|
* any partial state from a need-more retry: pipeline_advance |
|
697
|
|
|
|
|
|
|
* cleared at dispatch, but if we returned 0 mid-loop the AVs |
|
698
|
|
|
|
|
|
|
* may now hold a partial column list. */ |
|
699
|
0
|
0
|
|
|
|
|
if (c == 0) { |
|
700
|
0
|
0
|
|
|
|
|
CLEAR_SV(self->native_col_names); |
|
701
|
0
|
0
|
|
|
|
|
CLEAR_SV(self->native_col_types); |
|
702
|
0
|
|
|
|
|
|
self->native_col_names = newAV(); |
|
703
|
0
|
|
|
|
|
|
self->native_col_types = newAV(); |
|
704
|
|
|
|
|
|
|
} |
|
705
|
0
|
|
|
|
|
|
av_push(self->native_col_names, newSVpvn(cname, cname_len)); |
|
706
|
|
|
|
|
|
|
|
|
707
|
0
|
|
|
|
|
|
rc = read_native_string_ref(dbuf, dlen, &dpos, &ctype, &ctype_len); |
|
708
|
0
|
0
|
|
|
|
|
if (rc == 0) { |
|
709
|
0
|
0
|
|
|
|
|
if (decompressed) { *errmsg = safe_strdup("truncated ctype"); goto data_error; } |
|
710
|
0
|
|
|
|
|
|
goto data_need_more; |
|
711
|
|
|
|
|
|
|
} |
|
712
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed ctype"); goto data_error; } |
|
713
|
|
|
|
|
|
|
|
|
714
|
0
|
|
|
|
|
|
col_types[c] = parse_col_type(ctype, ctype_len); |
|
715
|
0
|
|
|
|
|
|
av_push(self->native_col_types, newSVpvn(ctype, ctype_len)); |
|
716
|
|
|
|
|
|
|
|
|
717
|
|
|
|
|
|
|
/* custom serialization flag (revision >= 54446) */ |
|
718
|
0
|
0
|
|
|
|
|
if (dpos >= dlen) { |
|
719
|
0
|
0
|
|
|
|
|
if (decompressed) { *errmsg = safe_strdup("truncated custom_ser"); goto data_error; } |
|
720
|
0
|
|
|
|
|
|
goto data_need_more; |
|
721
|
|
|
|
|
|
|
} |
|
722
|
0
|
0
|
|
|
|
|
if ((uint8_t)dbuf[dpos]) { |
|
723
|
0
|
|
|
|
|
|
*errmsg = safe_strdup("custom serialization not supported"); |
|
724
|
0
|
|
|
|
|
|
goto data_error; |
|
725
|
|
|
|
|
|
|
} |
|
726
|
0
|
|
|
|
|
|
dpos++; |
|
727
|
|
|
|
|
|
|
|
|
728
|
|
|
|
|
|
|
/* Allocate LC dict state on first column of first block */ |
|
729
|
0
|
0
|
|
|
|
|
if (c == 0 && !self->lc_dicts) { |
|
|
|
0
|
|
|
|
|
|
|
730
|
0
|
0
|
|
|
|
|
Newxz(self->lc_dicts, num_cols, SV**); |
|
731
|
0
|
0
|
|
|
|
|
Newxz(self->lc_dict_sizes, num_cols, uint64_t); |
|
732
|
0
|
|
|
|
|
|
self->lc_num_cols = (int)num_cols; |
|
733
|
|
|
|
|
|
|
} |
|
734
|
|
|
|
|
|
|
|
|
735
|
|
|
|
|
|
|
{ |
|
736
|
0
|
|
|
|
|
|
int col_err = 0; |
|
737
|
0
|
|
|
|
|
|
columns[c] = decode_column_ex(dbuf, dlen, &dpos, num_rows, col_types[c], &col_err, self->decode_flags, self, (int)c); |
|
738
|
0
|
0
|
|
|
|
|
if (!columns[c]) { |
|
739
|
0
|
0
|
|
|
|
|
if (col_err || decompressed) { |
|
|
|
0
|
|
|
|
|
|
|
740
|
0
|
|
|
|
|
|
*errmsg = safe_strdup("decode_column failed"); |
|
741
|
0
|
|
|
|
|
|
goto data_error; |
|
742
|
|
|
|
|
|
|
} |
|
743
|
0
|
|
|
|
|
|
goto data_need_more; |
|
744
|
|
|
|
|
|
|
} |
|
745
|
|
|
|
|
|
|
} |
|
746
|
|
|
|
|
|
|
} |
|
747
|
|
|
|
|
|
|
|
|
748
|
|
|
|
|
|
|
/* Convert column-oriented to row-oriented */ |
|
749
|
|
|
|
|
|
|
{ |
|
750
|
|
|
|
|
|
|
AV **target; |
|
751
|
0
|
0
|
|
|
|
|
if (ptype == SERVER_TOTALS) { |
|
752
|
0
|
0
|
|
|
|
|
if (!self->native_totals) self->native_totals = newAV(); |
|
753
|
0
|
|
|
|
|
|
target = &self->native_totals; |
|
754
|
0
|
0
|
|
|
|
|
} else if (ptype == SERVER_EXTREMES) { |
|
755
|
0
|
0
|
|
|
|
|
if (!self->native_extremes) self->native_extremes = newAV(); |
|
756
|
0
|
|
|
|
|
|
target = &self->native_extremes; |
|
757
|
|
|
|
|
|
|
} else { |
|
758
|
0
|
0
|
|
|
|
|
if (!self->native_rows) self->native_rows = newAV(); |
|
759
|
0
|
|
|
|
|
|
target = &self->native_rows; |
|
760
|
|
|
|
|
|
|
} |
|
761
|
|
|
|
|
|
|
|
|
762
|
0
|
0
|
|
|
|
|
if (named) { |
|
763
|
0
|
0
|
|
|
|
|
for (r = 0; r < num_rows; r++) { |
|
764
|
0
|
|
|
|
|
|
HV *hv = newHV(); |
|
765
|
0
|
0
|
|
|
|
|
for (c = 0; c < num_cols; c++) { |
|
766
|
0
|
0
|
|
|
|
|
if (!hv_store(hv, cnames[c], cname_lens[c], columns[c][r], 0)) |
|
767
|
0
|
|
|
|
|
|
SvREFCNT_dec(columns[c][r]); |
|
768
|
|
|
|
|
|
|
} |
|
769
|
0
|
|
|
|
|
|
av_push(*target, newRV_noinc((SV*)hv)); |
|
770
|
|
|
|
|
|
|
} |
|
771
|
|
|
|
|
|
|
} else { |
|
772
|
0
|
0
|
|
|
|
|
for (r = 0; r < num_rows; r++) { |
|
773
|
0
|
|
|
|
|
|
AV *row = newAV(); |
|
774
|
0
|
0
|
|
|
|
|
if (num_cols > 0) |
|
775
|
0
|
|
|
|
|
|
av_extend(row, num_cols - 1); |
|
776
|
0
|
0
|
|
|
|
|
for (c = 0; c < num_cols; c++) { |
|
777
|
0
|
|
|
|
|
|
av_push(row, columns[c][r]); |
|
778
|
|
|
|
|
|
|
} |
|
779
|
0
|
|
|
|
|
|
av_push(*target, newRV_noinc((SV*)row)); |
|
780
|
|
|
|
|
|
|
} |
|
781
|
|
|
|
|
|
|
} |
|
782
|
|
|
|
|
|
|
} |
|
783
|
|
|
|
|
|
|
|
|
784
|
|
|
|
|
|
|
/* Fire on_data streaming callback if set (only for DATA, not TOTALS/EXTREMES) */ |
|
785
|
|
|
|
|
|
|
{ |
|
786
|
0
|
0
|
|
|
|
|
SV *on_data = (ptype == SERVER_DATA) ? peek_cb_on_data(self) : NULL; |
|
787
|
0
|
0
|
|
|
|
|
if (on_data && self->native_rows) { |
|
|
|
0
|
|
|
|
|
|
|
788
|
|
|
|
|
|
|
/* Hold a reference across call_sv: a reentrant |
|
789
|
|
|
|
|
|
|
* skip_pending() / cancel() in the handler would |
|
790
|
|
|
|
|
|
|
* otherwise pop the cb_queue entry and free this |
|
791
|
|
|
|
|
|
|
* callback while we're still invoking it. */ |
|
792
|
0
|
|
|
|
|
|
SvREFCNT_inc(on_data); |
|
793
|
0
|
|
|
|
|
|
self->callback_depth++; |
|
794
|
|
|
|
|
|
|
{ |
|
795
|
0
|
|
|
|
|
|
dSP; |
|
796
|
0
|
|
|
|
|
|
ENTER; SAVETMPS; |
|
797
|
0
|
0
|
|
|
|
|
PUSHMARK(SP); |
|
798
|
0
|
|
|
|
|
|
PUSHs(sv_2mortal(newRV_inc((SV*)self->native_rows))); |
|
799
|
0
|
|
|
|
|
|
PUTBACK; |
|
800
|
0
|
|
|
|
|
|
call_sv(on_data, G_DISCARD | G_EVAL); |
|
801
|
0
|
0
|
|
|
|
|
WARN_AND_CLEAR_ERRSV("on_data handler"); |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
802
|
0
|
0
|
|
|
|
|
FREETMPS; LEAVE; |
|
803
|
|
|
|
|
|
|
} |
|
804
|
|
|
|
|
|
|
/* Decrement on_data BEFORE callback_depth-- so a DESTROY |
|
805
|
|
|
|
|
|
|
* triggered by the dec (closure holding last $ch ref) |
|
806
|
|
|
|
|
|
|
* still sees callback_depth > 0 and defers Safefree(self). */ |
|
807
|
0
|
|
|
|
|
|
SvREFCNT_dec(on_data); |
|
808
|
0
|
|
|
|
|
|
self->callback_depth--; |
|
809
|
|
|
|
|
|
|
/* Clear accumulated rows for next block */ |
|
810
|
0
|
0
|
|
|
|
|
CLEAR_SV(self->native_rows); |
|
811
|
0
|
0
|
|
|
|
|
if (check_destroyed(self)) { |
|
812
|
0
|
0
|
|
|
|
|
if (cnames) Safefree(cnames); |
|
813
|
0
|
0
|
|
|
|
|
if (cname_lens) Safefree(cname_lens); |
|
814
|
0
|
0
|
|
|
|
|
for (c = 0; c < num_cols; c++) { |
|
815
|
0
|
|
|
|
|
|
Safefree(columns[c]); |
|
816
|
0
|
|
|
|
|
|
free_col_type(col_types[c]); |
|
817
|
|
|
|
|
|
|
} |
|
818
|
0
|
|
|
|
|
|
Safefree(columns); Safefree(col_types); |
|
819
|
0
|
0
|
|
|
|
|
if (decompressed) Safefree(decompressed); |
|
820
|
0
|
|
|
|
|
|
return -2; |
|
821
|
|
|
|
|
|
|
} |
|
822
|
|
|
|
|
|
|
} |
|
823
|
|
|
|
|
|
|
} |
|
824
|
|
|
|
|
|
|
|
|
825
|
|
|
|
|
|
|
/* Cleanup column arrays (SVs moved to rows, don't dec refcnt) */ |
|
826
|
0
|
0
|
|
|
|
|
for (c = 0; c < num_cols; c++) { |
|
827
|
0
|
|
|
|
|
|
Safefree(columns[c]); |
|
828
|
0
|
|
|
|
|
|
free_col_type(col_types[c]); |
|
829
|
|
|
|
|
|
|
} |
|
830
|
0
|
|
|
|
|
|
Safefree(columns); |
|
831
|
0
|
|
|
|
|
|
Safefree(col_types); |
|
832
|
0
|
0
|
|
|
|
|
if (cnames) Safefree(cnames); |
|
833
|
0
|
0
|
|
|
|
|
if (cname_lens) Safefree(cname_lens); |
|
834
|
0
|
0
|
|
|
|
|
if (decompressed) Safefree(decompressed); |
|
835
|
0
|
|
|
|
|
|
else pos = dpos; /* uncompressed: advance pos to match dpos */ |
|
836
|
|
|
|
|
|
|
|
|
837
|
|
|
|
|
|
|
/* Consume from recv_buf */ |
|
838
|
0
|
|
|
|
|
|
recv_consume(self, pos); |
|
839
|
0
|
|
|
|
|
|
return 1; |
|
840
|
|
|
|
|
|
|
|
|
841
|
0
|
|
|
|
|
|
data_error: |
|
842
|
0
|
|
|
|
|
|
data_need_more: |
|
843
|
|
|
|
|
|
|
/* Cleanup partial decode */ |
|
844
|
0
|
0
|
|
|
|
|
for (c = 0; c < num_cols; c++) { |
|
845
|
0
|
0
|
|
|
|
|
if (columns[c]) { |
|
846
|
|
|
|
|
|
|
uint64_t j; |
|
847
|
0
|
0
|
|
|
|
|
for (j = 0; j < num_rows; j++) { |
|
848
|
0
|
0
|
|
|
|
|
if (columns[c][j]) SvREFCNT_dec(columns[c][j]); |
|
849
|
|
|
|
|
|
|
} |
|
850
|
0
|
|
|
|
|
|
Safefree(columns[c]); |
|
851
|
|
|
|
|
|
|
} |
|
852
|
0
|
0
|
|
|
|
|
if (col_types[c]) free_col_type(col_types[c]); |
|
853
|
|
|
|
|
|
|
} |
|
854
|
0
|
|
|
|
|
|
Safefree(columns); |
|
855
|
0
|
|
|
|
|
|
Safefree(col_types); |
|
856
|
0
|
0
|
|
|
|
|
if (cnames) Safefree(cnames); |
|
857
|
0
|
0
|
|
|
|
|
if (cname_lens) Safefree(cname_lens); |
|
858
|
0
|
0
|
|
|
|
|
if (decompressed) Safefree(decompressed); |
|
859
|
0
|
0
|
|
|
|
|
if (*errmsg) { |
|
860
|
|
|
|
|
|
|
/* data_error: flush recv_buf — data is malformed, cannot resume */ |
|
861
|
0
|
|
|
|
|
|
self->recv_len = 0; |
|
862
|
0
|
|
|
|
|
|
return -1; |
|
863
|
|
|
|
|
|
|
} |
|
864
|
0
|
|
|
|
|
|
return 0; |
|
865
|
|
|
|
|
|
|
} |
|
866
|
|
|
|
|
|
|
} |
|
867
|
|
|
|
|
|
|
|
|
868
|
0
|
|
|
|
|
|
case SERVER_EXCEPTION: { |
|
869
|
|
|
|
|
|
|
/* code: Int32, name: String, message: String, |
|
870
|
|
|
|
|
|
|
* stack_trace: String, has_nested: UInt8 */ |
|
871
|
|
|
|
|
|
|
int32_t code; |
|
872
|
|
|
|
|
|
|
const char *name, *msg, *stack; |
|
873
|
|
|
|
|
|
|
size_t name_len, msg_len, stack_len; |
|
874
|
|
|
|
|
|
|
uint8_t has_nested; |
|
875
|
|
|
|
|
|
|
char *err; |
|
876
|
|
|
|
|
|
|
|
|
877
|
|
|
|
|
|
|
/* We just read the top-level exception */ |
|
878
|
0
|
|
|
|
|
|
rc = read_i32(buf, len, &pos, &code); |
|
879
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
880
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed exception code"); return -1; } |
|
881
|
|
|
|
|
|
|
|
|
882
|
0
|
|
|
|
|
|
rc = read_native_string_ref(buf, len, &pos, &name, &name_len); |
|
883
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
884
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed exception name"); return -1; } |
|
885
|
|
|
|
|
|
|
|
|
886
|
0
|
|
|
|
|
|
rc = read_native_string_ref(buf, len, &pos, &msg, &msg_len); |
|
887
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
888
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed exception message"); return -1; } |
|
889
|
|
|
|
|
|
|
|
|
890
|
0
|
|
|
|
|
|
rc = read_native_string_ref(buf, len, &pos, &stack, &stack_len); |
|
891
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
892
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed exception stack"); return -1; } |
|
893
|
|
|
|
|
|
|
|
|
894
|
0
|
|
|
|
|
|
rc = read_u8(buf, len, &pos, &has_nested); |
|
895
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
896
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed exception has_nested"); return -1; } |
|
897
|
|
|
|
|
|
|
|
|
898
|
|
|
|
|
|
|
/* Skip nested exceptions — keep the top-level code, not the innermost. */ |
|
899
|
0
|
0
|
|
|
|
|
while (has_nested) { |
|
900
|
|
|
|
|
|
|
int32_t nested_code; |
|
901
|
|
|
|
|
|
|
int i; |
|
902
|
0
|
|
|
|
|
|
rc = read_i32(buf, len, &pos, &nested_code); |
|
903
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
904
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed nested exception"); return -1; } |
|
905
|
|
|
|
|
|
|
/* name, message, stack_trace */ |
|
906
|
0
|
0
|
|
|
|
|
for (i = 0; i < 3; i++) { |
|
907
|
0
|
|
|
|
|
|
rc = skip_native_string(buf, len, &pos); |
|
908
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
909
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed nested exception"); return -1; } |
|
910
|
|
|
|
|
|
|
} |
|
911
|
0
|
|
|
|
|
|
rc = read_u8(buf, len, &pos, &has_nested); |
|
912
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
913
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed nested exception"); return -1; } |
|
914
|
|
|
|
|
|
|
} |
|
915
|
|
|
|
|
|
|
|
|
916
|
0
|
|
|
|
|
|
self->last_error_code = code; |
|
917
|
|
|
|
|
|
|
|
|
918
|
0
|
|
|
|
|
|
Newx(err, msg_len + name_len + 64, char); |
|
919
|
0
|
|
|
|
|
|
snprintf(err, msg_len + name_len + 64, "Code: %d. %.*s: %.*s", |
|
920
|
|
|
|
|
|
|
(int)code, (int)name_len, name, (int)msg_len, msg); |
|
921
|
|
|
|
|
|
|
|
|
922
|
0
|
|
|
|
|
|
recv_consume(self, pos); |
|
923
|
|
|
|
|
|
|
|
|
924
|
0
|
|
|
|
|
|
*errmsg = err; |
|
925
|
0
|
|
|
|
|
|
return -1; |
|
926
|
|
|
|
|
|
|
} |
|
927
|
|
|
|
|
|
|
|
|
928
|
0
|
|
|
|
|
|
case SERVER_PROGRESS: { |
|
929
|
|
|
|
|
|
|
/* rows, bytes, total_rows, written_rows (>=54420), written_bytes (>=54420) */ |
|
930
|
0
|
|
|
|
|
|
uint64_t pp[5] = {0}; |
|
931
|
0
|
0
|
|
|
|
|
int n = (self->server_revision >= DBMS_MIN_REVISION_WITH_PROGRESS_WRITES) ? 5 : 3; |
|
932
|
|
|
|
|
|
|
int i; |
|
933
|
0
|
0
|
|
|
|
|
for (i = 0; i < n; i++) { |
|
934
|
0
|
|
|
|
|
|
rc = read_varuint(buf, len, &pos, &pp[i]); |
|
935
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
936
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed progress packet"); return -1; } |
|
937
|
|
|
|
|
|
|
} |
|
938
|
|
|
|
|
|
|
|
|
939
|
0
|
|
|
|
|
|
recv_consume(self, pos); |
|
940
|
|
|
|
|
|
|
|
|
941
|
0
|
0
|
|
|
|
|
if (self->on_progress) { |
|
942
|
|
|
|
|
|
|
/* Coalesce when progress_period is set: the protocol sends |
|
943
|
|
|
|
|
|
|
* incremental deltas, so accumulate and fire at the cadence. */ |
|
944
|
0
|
0
|
|
|
|
|
if (self->progress_period > 0) { |
|
945
|
0
|
0
|
|
|
|
|
for (i = 0; i < 5; i++) self->progress_acc[i] += pp[i]; |
|
946
|
0
|
|
|
|
|
|
double now = ev_now(self->loop); |
|
947
|
0
|
0
|
|
|
|
|
if (now - self->progress_last < self->progress_period) |
|
948
|
0
|
|
|
|
|
|
return 1; |
|
949
|
0
|
|
|
|
|
|
self->progress_last = now; |
|
950
|
0
|
0
|
|
|
|
|
for (i = 0; i < 5; i++) { pp[i] = self->progress_acc[i]; self->progress_acc[i] = 0; } |
|
951
|
|
|
|
|
|
|
} |
|
952
|
|
|
|
|
|
|
|
|
953
|
0
|
0
|
|
|
|
|
if (fire_progress_cb(self, pp) < 0) return -2; |
|
954
|
|
|
|
|
|
|
} |
|
955
|
|
|
|
|
|
|
|
|
956
|
0
|
|
|
|
|
|
return 1; |
|
957
|
|
|
|
|
|
|
} |
|
958
|
|
|
|
|
|
|
|
|
959
|
0
|
|
|
|
|
|
case SERVER_PROFILE_INFO: { |
|
960
|
|
|
|
|
|
|
/* rows, blocks, bytes, applied_limit, rows_before_limit, calc_rows_before_limit */ |
|
961
|
|
|
|
|
|
|
uint64_t pi[6]; |
|
962
|
|
|
|
|
|
|
int i; |
|
963
|
0
|
0
|
|
|
|
|
for (i = 0; i < 6; i++) { |
|
964
|
0
|
|
|
|
|
|
rc = read_varuint(buf, len, &pos, &pi[i]); |
|
965
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
966
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed profile_info packet"); return -1; } |
|
967
|
|
|
|
|
|
|
} |
|
968
|
0
|
|
|
|
|
|
self->profile_rows = pi[0]; |
|
969
|
0
|
|
|
|
|
|
self->profile_bytes = pi[2]; |
|
970
|
0
|
|
|
|
|
|
self->profile_rows_before_limit = pi[4]; |
|
971
|
0
|
|
|
|
|
|
recv_consume(self, pos); |
|
972
|
0
|
|
|
|
|
|
return 1; |
|
973
|
|
|
|
|
|
|
} |
|
974
|
|
|
|
|
|
|
|
|
975
|
0
|
|
|
|
|
|
case SERVER_TABLE_COLUMNS: { |
|
976
|
|
|
|
|
|
|
/* Format: string(table_name) + string(column_description) */ |
|
977
|
|
|
|
|
|
|
int i; |
|
978
|
0
|
0
|
|
|
|
|
for (i = 0; i < 2; i++) { |
|
979
|
0
|
|
|
|
|
|
rc = skip_native_string(buf, len, &pos); |
|
980
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
981
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed table_columns packet"); return -1; } |
|
982
|
|
|
|
|
|
|
} |
|
983
|
0
|
|
|
|
|
|
recv_consume(self, pos); |
|
984
|
0
|
|
|
|
|
|
return 1; |
|
985
|
|
|
|
|
|
|
} |
|
986
|
|
|
|
|
|
|
|
|
987
|
0
|
|
|
|
|
|
case SERVER_LOG: { |
|
988
|
0
|
0
|
|
|
|
|
if (self->on_log) { |
|
989
|
0
|
|
|
|
|
|
rc = parse_and_emit_log_block(self, buf, len, &pos, self->on_log, errmsg); |
|
990
|
|
|
|
|
|
|
} else { |
|
991
|
0
|
|
|
|
|
|
rc = parse_and_discard_block(self, buf, len, &pos, "server log", 0, errmsg); |
|
992
|
|
|
|
|
|
|
} |
|
993
|
0
|
0
|
|
|
|
|
if (rc <= 0) return rc; |
|
994
|
0
|
|
|
|
|
|
recv_consume(self, pos); |
|
995
|
0
|
|
|
|
|
|
return 1; |
|
996
|
|
|
|
|
|
|
} |
|
997
|
|
|
|
|
|
|
|
|
998
|
0
|
|
|
|
|
|
case SERVER_PROFILE_EVENTS: { |
|
999
|
0
|
|
|
|
|
|
rc = parse_and_discard_block(self, buf, len, &pos, "profile_events", 1, errmsg); |
|
1000
|
0
|
0
|
|
|
|
|
if (rc <= 0) return rc; |
|
1001
|
0
|
|
|
|
|
|
recv_consume(self, pos); |
|
1002
|
0
|
|
|
|
|
|
return 1; |
|
1003
|
|
|
|
|
|
|
} |
|
1004
|
|
|
|
|
|
|
|
|
1005
|
0
|
|
|
|
|
|
case SERVER_TIMEZONE_UPDATE: { |
|
1006
|
|
|
|
|
|
|
/* Server-side session timezone changed mid-query. Format: string(tz). */ |
|
1007
|
0
|
|
|
|
|
|
char *tz = NULL; |
|
1008
|
0
|
|
|
|
|
|
rc = read_native_string_alloc(buf, len, &pos, &tz, NULL); |
|
1009
|
0
|
0
|
|
|
|
|
if (rc == 0) return 0; |
|
1010
|
0
|
0
|
|
|
|
|
if (rc < 0) { *errmsg = safe_strdup("malformed timezone_update packet"); return -1; } |
|
1011
|
0
|
0
|
|
|
|
|
CLEAR_STR(self->server_timezone); |
|
1012
|
0
|
|
|
|
|
|
self->server_timezone = tz; |
|
1013
|
0
|
|
|
|
|
|
recv_consume(self, pos); |
|
1014
|
0
|
|
|
|
|
|
return 1; |
|
1015
|
|
|
|
|
|
|
} |
|
1016
|
|
|
|
|
|
|
|
|
1017
|
0
|
|
|
|
|
|
case SERVER_PONG: |
|
1018
|
0
|
|
|
|
|
|
recv_consume(self, pos); |
|
1019
|
0
|
|
|
|
|
|
return 3; |
|
1020
|
|
|
|
|
|
|
|
|
1021
|
0
|
|
|
|
|
|
case SERVER_END_OF_STREAM: |
|
1022
|
0
|
|
|
|
|
|
recv_consume(self, pos); |
|
1023
|
0
|
|
|
|
|
|
return 2; |
|
1024
|
|
|
|
|
|
|
|
|
1025
|
0
|
|
|
|
|
|
default: { |
|
1026
|
|
|
|
|
|
|
/* Unknown packet type */ |
|
1027
|
|
|
|
|
|
|
char err[64]; |
|
1028
|
0
|
|
|
|
|
|
snprintf(err, sizeof(err), "unknown server packet type: %llu", |
|
1029
|
|
|
|
|
|
|
(unsigned long long)ptype); |
|
1030
|
0
|
|
|
|
|
|
*errmsg = safe_strdup(err); |
|
1031
|
0
|
|
|
|
|
|
self->recv_len = 0; |
|
1032
|
0
|
|
|
|
|
|
return -1; |
|
1033
|
|
|
|
|
|
|
} |
|
1034
|
|
|
|
|
|
|
} |
|
1035
|
|
|
|
|
|
|
} |
|
1036
|
|
|
|
|
|
|
|
|
1037
|
|
|
|
|
|
|
/* |
|
1038
|
|
|
|
|
|
|
* Process native protocol responses from recv_buf. |
|
1039
|
|
|
|
|
|
|
* Called from on_readable when protocol == PROTO_NATIVE. |
|
1040
|
|
|
|
|
|
|
*/ |
|
1041
|
0
|
|
|
|
|
|
static void process_native_response(ev_clickhouse_t *self) { |
|
1042
|
0
|
0
|
|
|
|
|
while (self->recv_len > 0 && self->magic == EV_CH_MAGIC) { |
|
|
|
0
|
|
|
|
|
|
|
1043
|
0
|
|
|
|
|
|
char *errmsg = NULL; |
|
1044
|
|
|
|
|
|
|
int rc; |
|
1045
|
0
|
|
|
|
|
|
rc = parse_native_packet(self, &errmsg); |
|
1046
|
|
|
|
|
|
|
|
|
1047
|
0
|
0
|
|
|
|
|
if (rc == 0) { |
|
1048
|
|
|
|
|
|
|
/* need more data */ |
|
1049
|
0
|
|
|
|
|
|
return; |
|
1050
|
|
|
|
|
|
|
} |
|
1051
|
|
|
|
|
|
|
|
|
1052
|
0
|
0
|
|
|
|
|
if (rc == -2) { |
|
1053
|
|
|
|
|
|
|
/* object destroyed inside callback */ |
|
1054
|
0
|
|
|
|
|
|
return; |
|
1055
|
|
|
|
|
|
|
} |
|
1056
|
|
|
|
|
|
|
|
|
1057
|
0
|
0
|
|
|
|
|
if (rc == 4) { |
|
1058
|
|
|
|
|
|
|
/* ServerHello received — send addendum (revision >= 54458) */ |
|
1059
|
0
|
0
|
|
|
|
|
if (self->native_state == NATIVE_WAIT_HELLO) { |
|
1060
|
|
|
|
|
|
|
/* Addendum: quota_key (only if server supports it) */ |
|
1061
|
0
|
0
|
|
|
|
|
if (self->server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM) { |
|
1062
|
|
|
|
|
|
|
native_buf_t ab; |
|
1063
|
0
|
|
|
|
|
|
nbuf_init(&ab); |
|
1064
|
0
|
|
|
|
|
|
nbuf_cstring(&ab, ""); /* quota_key */ |
|
1065
|
0
|
|
|
|
|
|
send_replace(self, ab.data, ab.len); |
|
1066
|
0
|
0
|
|
|
|
|
if (try_write(self)) return; |
|
1067
|
0
|
0
|
|
|
|
|
if (self->send_pos < self->send_len) { |
|
1068
|
|
|
|
|
|
|
/* Addendum partially written (EAGAIN); io_cb finishes |
|
1069
|
|
|
|
|
|
|
* connect once the buffer drains, otherwise pipeline_advance |
|
1070
|
|
|
|
|
|
|
* could overwrite the unsent tail with a queued query. */ |
|
1071
|
0
|
|
|
|
|
|
self->pending_addendum_finish = 1; |
|
1072
|
0
|
|
|
|
|
|
return; |
|
1073
|
|
|
|
|
|
|
} |
|
1074
|
|
|
|
|
|
|
} |
|
1075
|
0
|
|
|
|
|
|
self->native_state = NATIVE_IDLE; |
|
1076
|
0
|
0
|
|
|
|
|
if (finish_connect(self)) return; |
|
1077
|
|
|
|
|
|
|
} |
|
1078
|
|
|
|
|
|
|
/* pipeline_advance -> try_write may free self; no data |
|
1079
|
|
|
|
|
|
|
* in recv_buf for the just-dispatched request yet */ |
|
1080
|
0
|
|
|
|
|
|
return; |
|
1081
|
|
|
|
|
|
|
} |
|
1082
|
|
|
|
|
|
|
|
|
1083
|
0
|
0
|
|
|
|
|
if (rc == -1) { |
|
1084
|
|
|
|
|
|
|
/* error */ |
|
1085
|
0
|
0
|
|
|
|
|
if (self->native_state == NATIVE_WAIT_HELLO) { |
|
1086
|
|
|
|
|
|
|
/* Skip auto_reconnect on a malformed ServerHello: the peer |
|
1087
|
|
|
|
|
|
|
* isn't a ClickHouse server, so retrying just spins. |
|
1088
|
|
|
|
|
|
|
* (connect_timeout still uses fail_connection, which retries.) */ |
|
1089
|
0
|
|
|
|
|
|
teardown_io_error(self, errmsg, "connection failed"); |
|
1090
|
0
|
|
|
|
|
|
Safefree(errmsg); |
|
1091
|
0
|
|
|
|
|
|
return; |
|
1092
|
|
|
|
|
|
|
} |
|
1093
|
|
|
|
|
|
|
|
|
1094
|
|
|
|
|
|
|
/* Stop query timeout timer */ |
|
1095
|
0
|
|
|
|
|
|
stop_timing(self); |
|
1096
|
|
|
|
|
|
|
|
|
1097
|
|
|
|
|
|
|
/* Query error — deliver to callback */ |
|
1098
|
0
|
0
|
|
|
|
|
CLEAR_SV(self->native_rows); |
|
1099
|
0
|
0
|
|
|
|
|
CLEAR_INSERT(self); |
|
|
|
0
|
|
|
|
|
|
|
1100
|
0
|
0
|
|
|
|
|
CLEAR_STR(self->insert_err); |
|
1101
|
0
|
|
|
|
|
|
self->native_state = NATIVE_IDLE; |
|
1102
|
0
|
|
|
|
|
|
self->recv_len = 0; /* flush malformed data */ |
|
1103
|
0
|
0
|
|
|
|
|
if (self->send_count > 0) self->send_count--; |
|
1104
|
0
|
|
|
|
|
|
lc_free_dicts(self); |
|
1105
|
0
|
|
|
|
|
|
int destroyed = deliver_error(self, errmsg); |
|
1106
|
0
|
|
|
|
|
|
Safefree(errmsg); |
|
1107
|
0
|
0
|
|
|
|
|
if (destroyed) return; |
|
1108
|
|
|
|
|
|
|
|
|
1109
|
|
|
|
|
|
|
/* advance pipeline — may free self via try_write error */ |
|
1110
|
0
|
|
|
|
|
|
pipeline_advance(self); |
|
1111
|
0
|
|
|
|
|
|
return; |
|
1112
|
|
|
|
|
|
|
} |
|
1113
|
|
|
|
|
|
|
|
|
1114
|
0
|
0
|
|
|
|
|
if (rc == 2) { |
|
1115
|
|
|
|
|
|
|
/* EndOfStream — deliver accumulated rows or deferred error */ |
|
1116
|
0
|
|
|
|
|
|
stop_timing(self); |
|
1117
|
0
|
|
|
|
|
|
self->native_state = NATIVE_IDLE; |
|
1118
|
0
|
0
|
|
|
|
|
if (self->send_count > 0) self->send_count--; |
|
1119
|
0
|
|
|
|
|
|
lc_free_dicts(self); |
|
1120
|
|
|
|
|
|
|
|
|
1121
|
|
|
|
|
|
|
/* Flush any uncoalesced progress accumulated since the last |
|
1122
|
|
|
|
|
|
|
* fire so users instrumenting via on_progress see the full |
|
1123
|
|
|
|
|
|
|
* total when the query completes within one progress_period |
|
1124
|
|
|
|
|
|
|
* of the last fire. */ |
|
1125
|
0
|
0
|
|
|
|
|
if (self->on_progress && self->progress_period > 0) { |
|
|
|
0
|
|
|
|
|
|
|
1126
|
0
|
|
|
|
|
|
int any = 0, i; |
|
1127
|
0
|
0
|
|
|
|
|
for (i = 0; i < 5; i++) if (self->progress_acc[i]) { any = 1; break; } |
|
|
|
0
|
|
|
|
|
|
|
1128
|
0
|
0
|
|
|
|
|
if (any) { |
|
1129
|
|
|
|
|
|
|
uint64_t pp[5]; |
|
1130
|
0
|
|
|
|
|
|
memcpy(pp, self->progress_acc, sizeof(pp)); |
|
1131
|
0
|
|
|
|
|
|
memset(self->progress_acc, 0, sizeof(self->progress_acc)); |
|
1132
|
0
|
0
|
|
|
|
|
if (fire_progress_cb(self, pp) < 0) return; |
|
1133
|
|
|
|
|
|
|
} |
|
1134
|
|
|
|
|
|
|
} |
|
1135
|
|
|
|
|
|
|
|
|
1136
|
0
|
0
|
|
|
|
|
if (self->insert_err) { |
|
1137
|
0
|
|
|
|
|
|
char *err = self->insert_err; |
|
1138
|
0
|
|
|
|
|
|
self->insert_err = NULL; |
|
1139
|
0
|
0
|
|
|
|
|
CLEAR_SV(self->native_rows); |
|
1140
|
0
|
|
|
|
|
|
int destroyed = deliver_error(self, err); |
|
1141
|
0
|
|
|
|
|
|
Safefree(err); |
|
1142
|
0
|
0
|
|
|
|
|
if (destroyed) return; |
|
1143
|
|
|
|
|
|
|
} else { |
|
1144
|
0
|
|
|
|
|
|
AV *rows = self->native_rows; |
|
1145
|
0
|
|
|
|
|
|
self->native_rows = NULL; |
|
1146
|
0
|
0
|
|
|
|
|
if (deliver_rows(self, rows)) return; |
|
1147
|
|
|
|
|
|
|
} |
|
1148
|
|
|
|
|
|
|
|
|
1149
|
|
|
|
|
|
|
/* advance pipeline — may free self via try_write error */ |
|
1150
|
0
|
|
|
|
|
|
pipeline_advance(self); |
|
1151
|
0
|
|
|
|
|
|
return; |
|
1152
|
|
|
|
|
|
|
} |
|
1153
|
|
|
|
|
|
|
|
|
1154
|
0
|
0
|
|
|
|
|
if (rc == 3) { |
|
1155
|
|
|
|
|
|
|
/* Pong — ack a keepalive ping, or deliver to user's ping() cb */ |
|
1156
|
0
|
|
|
|
|
|
self->native_state = NATIVE_IDLE; |
|
1157
|
0
|
0
|
|
|
|
|
if (self->ka_in_flight > 0) { |
|
1158
|
|
|
|
|
|
|
/* Keepalive ack: not tied to send_count or any user cb */ |
|
1159
|
0
|
|
|
|
|
|
self->ka_in_flight--; |
|
1160
|
0
|
|
|
|
|
|
continue; |
|
1161
|
|
|
|
|
|
|
} |
|
1162
|
0
|
|
|
|
|
|
stop_timing(self); |
|
1163
|
0
|
0
|
|
|
|
|
if (self->send_count > 0) self->send_count--; |
|
1164
|
0
|
|
|
|
|
|
AV *rows = newAV(); |
|
1165
|
0
|
0
|
|
|
|
|
if (deliver_rows(self, rows)) return; |
|
1166
|
0
|
|
|
|
|
|
pipeline_advance(self); |
|
1167
|
0
|
|
|
|
|
|
return; |
|
1168
|
|
|
|
|
|
|
} |
|
1169
|
|
|
|
|
|
|
|
|
1170
|
|
|
|
|
|
|
/* rc == 1: Data/Progress/ProfileInfo — continue reading */ |
|
1171
|
|
|
|
|
|
|
} |
|
1172
|
|
|
|
|
|
|
} |
|
1173
|
|
|
|
|
|
|
|