File Coverage

xs/proto_http.c
Criterion Covered Total %
statement 0 285 0.0
branch 0 238 0.0
condition n/a
subroutine n/a
pod n/a
total 0 523 0.0


line stmt bran cond sub pod time code
1             /* --- HTTP request building --- */
2              
/* Minimal RFC 4648 base64 encoder, used only for HTTP basic auth.
 *
 * Writes ((src_len + 2) / 3) * 4 output characters followed by a NUL into
 * dst; the caller must provide a buffer of at least that size plus one.
 * Returns the number of characters written, not counting the NUL. */
static size_t base64_encode(const unsigned char *src, size_t src_len, char *dst) {
    static const char ALPHA[] =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    const unsigned char *in = src;
    size_t remaining = src_len;
    char *out = dst;

    /* Full 3-byte groups become 4 output characters each. */
    while (remaining >= 3) {
        *out++ = ALPHA[in[0] >> 2];
        *out++ = ALPHA[((in[0] & 0x03) << 4) | (in[1] >> 4)];
        *out++ = ALPHA[((in[1] & 0x0f) << 2) | (in[2] >> 6)];
        *out++ = ALPHA[in[2] & 0x3f];
        in += 3;
        remaining -= 3;
    }

    /* A 1- or 2-byte tail is padded with '=' up to a 4-character group. */
    switch (remaining) {
    case 1:
        *out++ = ALPHA[in[0] >> 2];
        *out++ = ALPHA[(in[0] & 0x03) << 4];
        *out++ = '=';
        *out++ = '=';
        break;
    case 2:
        *out++ = ALPHA[in[0] >> 2];
        *out++ = ALPHA[((in[0] & 0x03) << 4) | (in[1] >> 4)];
        *out++ = ALPHA[(in[1] & 0x0f) << 2];
        *out++ = '=';
        break;
    default:
        break;
    }

    *out = '\0';
    return (size_t)(out - dst);
}
29              
30             /*
31             * Build HTTP POST request. Used for both SELECT (sql in body, url_sql=NULL)
32             * and INSERT (url_sql="INSERT ... FORMAT TabSeparated" in URL, data in body).
33             * Returns malloc'd buffer with full request.
34             */
35 0           static char* build_http_post_request(ev_clickhouse_t *self,
36             const char *url_sql, size_t url_sql_len,
37             const char *body_data, size_t body_data_len,
38             HV *defaults, HV *overrides,
39             size_t *req_len) {
40             char *req;
41             size_t req_cap;
42 0           size_t pos = 0;
43 0           char *body = NULL;
44 0           size_t body_len = body_data_len;
45 0           const char *content_encoding = NULL;
46              
47             /* compress body if requested */
48 0 0         if (self->compress && body_data_len > 0) {
    0          
49             size_t gz_len;
50 0           body = gzip_compress(body_data, body_data_len, &gz_len);
51 0 0         if (body) {
52 0           body_len = gz_len;
53 0           content_encoding = "Content-Encoding: gzip\r\n";
54             }
55             }
56              
57             /* build URL params (dynamically allocated) */
58 0           const char *query_id = NULL;
59 0           STRLEN query_id_len = 0;
60 0           size_t params_cap = 128
61 0 0         + (self->database ? strlen(self->database) * 3 : 0)
62 0 0         + (self->session_id ? strlen(self->session_id) * 3 : 0)
63 0           + url_sql_len * 3
64 0           + settings_url_params_size(defaults, overrides);
65             char *params;
66 0           size_t plen = 0;
67 0           Newx(params, params_cap, char);
68 0 0         if (self->database) {
69 0           plen = (size_t)snprintf(params, params_cap, "?database=");
70 0           plen += url_encode(self->database, strlen(self->database), params + plen);
71 0           plen += (size_t)snprintf(params + plen, params_cap - plen, "&wait_end_of_query=1");
72             } else {
73 0           plen = (size_t)snprintf(params, params_cap, "?wait_end_of_query=1");
74             }
75 0 0         if (self->session_id) {
76 0           plen += (size_t)snprintf(params + plen, params_cap - plen, "&session_id=");
77 0           plen += url_encode(self->session_id, strlen(self->session_id), params + plen);
78             }
79 0 0         if (url_sql) {
80 0           plen += (size_t)snprintf(params + plen, params_cap - plen, "&query=");
81 0           plen += url_encode(url_sql, url_sql_len, params + plen);
82             }
83 0           plen = append_settings_url_params(params, plen,
84             defaults, overrides,
85             &query_id, &query_id_len);
86 0 0         if (query_id) {
87 0           size_t need = plen + 10 + query_id_len * 3 + 1;
88 0 0         if (need > params_cap) {
89 0           params_cap = need;
90 0           Renew(params, params_cap, char);
91             }
92 0           plen += (size_t)snprintf(params + plen, params_cap - plen, "&query_id=");
93 0           plen += url_encode(query_id, query_id_len, params + plen);
94             }
95 0           params[plen] = '\0';
96              
97             /* user/password are quoted as-is in the X-ClickHouse-* form, or
98             * base64-expanded ((n+2)/3)*4 in the Basic auth form. The 4/3 factor
99             * is enough either way. */
100 0           req_cap = 512 + body_len + plen
101 0 0         + (self->host ? strlen(self->host) : 0)
102 0 0         + (self->user ? strlen(self->user) * 2 : 0)
103 0 0         + (self->password ? strlen(self->password) * 2 : 0);
104 0           Newx(req, req_cap, char);
105              
106             /* request line + headers */
107 0           pos += snprintf(req + pos, req_cap - pos,
108             "POST /%s HTTP/1.1\r\n", params);
109 0           Safefree(params);
110 0           pos += snprintf(req + pos, req_cap - pos,
111             "Host: %s:%u\r\n", self->host, self->port);
112 0 0         if (self->http_basic_auth && self->user) {
    0          
113             /* "user:pass" → base64. Single allocation: [cred][b64]. */
114 0           size_t ul = strlen(self->user);
115 0 0         size_t pl = self->password ? strlen(self->password) : 0;
116 0           size_t cred_len = ul + 1 + pl;
117 0           size_t b64_cap = ((cred_len + 2) / 3) * 4 + 1;
118             char *buf;
119 0           Newx(buf, cred_len + b64_cap, char);
120 0           memcpy(buf, self->user, ul);
121 0           buf[ul] = ':';
122 0 0         if (pl) memcpy(buf + ul + 1, self->password, pl);
123 0           base64_encode((const unsigned char *)buf, cred_len, buf + cred_len);
124 0           pos += snprintf(req + pos, req_cap - pos,
125             "Authorization: Basic %s\r\n", buf + cred_len);
126 0           Safefree(buf);
127             } else {
128 0 0         if (self->user)
129 0           pos += snprintf(req + pos, req_cap - pos,
130             "X-ClickHouse-User: %s\r\n", self->user);
131 0 0         if (self->password && self->password[0])
    0          
132 0           pos += snprintf(req + pos, req_cap - pos,
133             "X-ClickHouse-Key: %s\r\n", self->password);
134             }
135 0           pos += snprintf(req + pos, req_cap - pos, "Connection: keep-alive\r\n");
136 0 0         if (self->compress)
137 0           pos += snprintf(req + pos, req_cap - pos, "Accept-Encoding: gzip\r\n");
138 0 0         if (content_encoding)
139 0           pos += snprintf(req + pos, req_cap - pos, "%s", content_encoding);
140 0           pos += snprintf(req + pos, req_cap - pos,
141             "Content-Length: %lu\r\n\r\n", (unsigned long)body_len);
142              
143             /* body */
144 0 0         if (body_len > 0) {
145 0 0         if (pos + body_len > req_cap) {
146 0           req_cap = pos + body_len + 1;
147 0           Renew(req, req_cap, char);
148             }
149 0 0         Copy(body ? body : body_data, req + pos, body_len, char);
150 0           pos += body_len;
151             }
152              
153 0 0         if (body) Safefree(body);
154              
155 0           *req_len = pos;
156 0           return req;
157             }
158              
159             /* Build HTTP GET /ping request */
160 0           static char* build_http_ping_request(ev_clickhouse_t *self, size_t *req_len) {
161             char *req;
162 0 0         size_t req_cap = 128 + (self->host ? strlen(self->host) : 0);
163 0           size_t pos = 0;
164              
165 0           Newx(req, req_cap, char);
166 0           pos = snprintf(req, req_cap,
167             "GET /ping HTTP/1.1\r\n"
168             "Host: %s:%u\r\n"
169             "Connection: keep-alive\r\n\r\n",
170             self->host, self->port);
171 0 0         if (pos >= req_cap) pos = req_cap - 1;
172 0           *req_len = pos;
173 0           return req;
174             }
175              
176             /* --- HTTP response parsing --- */
177              
/* Length-bounded unsigned decimal parser.
 *
 * Reads digits from p, stopping at the first non-digit OR after `len` bytes,
 * whichever comes first. Used to parse X-ClickHouse-Summary values out of a
 * non-NUL-terminated header buffer without letting strtoull scan into bytes
 * past the header on malformed input.
 *
 * Returns the parsed value, 0 when p does not start with a digit, and
 * UINT64_MAX (saturated) when the digit run does not fit in 64 bits instead
 * of silently wrapping around. */
static uint64_t parse_uint_within(const char *p, size_t len) {
    uint64_t n = 0;
    size_t i;

    for (i = 0; i < len; i++) {
        unsigned char c = (unsigned char)p[i];
        uint64_t digit;

        if (c < '0' || c > '9') break;
        digit = (uint64_t)(c - '0');
        /* n * 10 + digit would exceed UINT64_MAX: saturate */
        if (n > (UINT64_MAX - digit) / 10) return UINT64_MAX;
        n = n * 10 + digit;
    }
    return n;
}
192              
/* Locate the end of the HTTP header section (the first CR LF CR LF) in buf.
 * Returns the offset of the first byte after that sequence, or 0 when it
 * does not occur within the first `len` bytes. */
static size_t find_header_end(const char *buf, size_t len) {
    const char *p = buf;
    size_t left = len;

    /* slide a 4-byte window; stop once fewer than 4 bytes remain */
    for (; left >= 4; p++, left--) {
        if (p[0] != '\r')
            continue;
        if (p[1] == '\n' && p[2] == '\r' && p[3] == '\n')
            return (size_t)(p - buf) + 4;
    }
    return 0;
}
205              
/* Extract the ClickHouse error code from an HTTP error body of the form
 * "Code: NNN. ...".
 *
 * The body is treated as a length-bounded buffer that need not be
 * NUL-terminated: digits are read only within the first `len` bytes.
 *
 * Returns the code, or 0 when the body does not start with "Code: "
 * followed by at least one digit, or when the number does not fit in
 * int32_t. */
static int32_t parse_ch_error_code(const char *body, size_t len) {
    static const char PREFIX[] = "Code: ";
    const size_t prefix_len = sizeof PREFIX - 1;
    int32_t code = 0;
    size_t i;

    if (len <= prefix_len || memcmp(body, PREFIX, prefix_len) != 0)
        return 0;

    for (i = prefix_len; i < len; i++) {
        unsigned char c = (unsigned char)body[i];
        int32_t digit;

        if (c < '0' || c > '9') break;
        digit = (int32_t)(c - '0');
        /* code * 10 + digit would overflow: not a usable error code */
        if (code > (INT32_MAX - digit) / 10) return 0;
        code = code * 10 + digit;
    }
    return code;
}
212              
213             /* Format an HTTP error response into a Newx-allocated "HTTP NNN: ..." message
214             * and update self->last_error_code. Body may be gzip-compressed. Caller must
215             * Safefree the returned pointer. */
216 0           static char* format_http_error(ev_clickhouse_t *self, int status,
217             const char *body, size_t body_len, int is_gzip) {
218             char *errmsg;
219 0           char *err_body = (char *)body;
220 0           size_t err_len = body_len;
221 0 0         if (is_gzip && body_len > 0) {
    0          
222             size_t dec_len;
223 0           char *dec = gzip_decompress(body, body_len, &dec_len);
224 0 0         if (dec) { err_body = dec; err_len = dec_len; }
225             }
226 0 0         while (err_len > 0 && (err_body[err_len-1] == '\n' || err_body[err_len-1] == '\r'))
    0          
    0          
227 0           err_len--;
228 0           self->last_error_code = parse_ch_error_code(err_body, err_len);
229 0           Newx(errmsg, err_len + 64, char);
230 0           snprintf(errmsg, err_len + 64, "HTTP %d: %.*s",
231             status, (int)err_len, err_body);
232 0 0         if (err_body != body) Safefree(err_body);
233 0           return errmsg;
234             }
235              
/* Parse the status code out of an HTTP status line ("HTTP/1.1 200 OK\r\n").
 *
 * Only the first `len` bytes of buf are examined; buf need not be
 * NUL-terminated.
 *
 * Returns:
 *   0       when the buffer contains no space at all (no status field),
 *   500     when the field after the first space is not a number in
 *           100..599 (malformed responses are treated as a server error),
 *   status  otherwise. */
static int parse_http_status(const char *buf, size_t len) {
    const char *p = buf;
    const char *end = buf + len;
    int status = 0;

    /* skip "HTTP/1.x" up to the separating space */
    while (p < end && *p != ' ') p++;
    if (p >= end) return 0;

    /* skip the separator (tolerating repeated spaces) */
    while (p < end && *p == ' ') p++;

    /* bounded digit parse; bail out as soon as the value leaves the valid
     * range so the accumulator can never overflow */
    while (p < end && *p >= '0' && *p <= '9') {
        status = status * 10 + (*p - '0');
        if (status > 599) return 500;
        p++;
    }

    if (status < 100) return 500; /* treat malformed as server error */
    return status;
}
252              
/* Find a header field by name (case-insensitive) in a CRLF-separated header
 * block.
 *
 *   headers/headers_len  header bytes; need not be NUL-terminated.
 *   name                 NUL-terminated field name without the colon.
 *   value_len            out: length of the value; written only on a match.
 *
 * Returns a pointer INTO `headers` at the start of the value, or NULL when
 * no line matches. Optional whitespace (spaces and horizontal tabs) around
 * the value is excluded. The first matching line wins. A line consisting of
 * only "Name:" with nothing after the colon is not reported as a match. */
static const char* find_header(const char *headers, size_t headers_len,
                               const char *name, size_t *value_len) {
    size_t name_len = strlen(name);
    const char *p = headers;
    const char *end = headers + headers_len;

    while (p < end) {
        /* a line runs up to the next CR (or the end of the buffer) */
        const char *line_end = p;
        while (line_end < end && *line_end != '\r') line_end++;

        if ((size_t)(line_end - p) > name_len + 1 && p[name_len] == ':') {
            size_t i = 0;
            while (i < name_len &&
                   tolower((unsigned char)p[i]) == tolower((unsigned char)name[i]))
                i++;

            if (i == name_len) {
                const char *val = p + name_len + 1;
                const char *val_end = line_end;

                /* trim optional whitespace on both sides of the value */
                while (val < val_end && (*val == ' ' || *val == '\t')) val++;
                while (val_end > val &&
                       (val_end[-1] == ' ' || val_end[-1] == '\t'))
                    val_end--;

                *value_len = (size_t)(val_end - val);
                return val;
            }
        }

        /* advance past \r\n; compare distances rather than forming a
         * pointer that could lie beyond one-past-the-end */
        if ((size_t)(end - line_end) < 2) break;
        p = line_end + 2;
    }
    return NULL;
}
287              
288             /* Parse a complete HTTP response from recv_buf. */
289 0           static void process_http_response(ev_clickhouse_t *self) {
290             size_t hdr_end;
291             int status;
292             const char *val;
293             size_t val_len;
294 0           size_t content_length = 0;
295 0           int chunked = 0;
296 0           int is_gzip = 0;
297             const char *body;
298             size_t body_len;
299 0           char *decoded = NULL;
300 0           size_t decoded_len = 0;
301 0           size_t decoded_cap = 0;
302              
303 0 0         if (self->recv_len == 0 || self->send_count == 0) return;
    0          
304              
305             /* find headers end */
306 0           hdr_end = find_header_end(self->recv_buf, self->recv_len);
307 0 0         if (hdr_end == 0) return; /* need more data */
308              
309             /* parse status */
310 0           status = parse_http_status(self->recv_buf, hdr_end);
311              
312             /* parse Content-Length */
313 0           val = find_header(self->recv_buf, hdr_end, "Content-Length", &val_len);
314 0 0         if (val) {
315 0           content_length = (size_t)strtoul(val, NULL, 10);
316             }
317              
318             /* check Transfer-Encoding: chunked */
319 0           val = find_header(self->recv_buf, hdr_end, "Transfer-Encoding", &val_len);
320 0 0         if (val && val_len >= 7 && strncasecmp(val, "chunked", 7) == 0) {
    0          
    0          
321 0           chunked = 1;
322             }
323              
324             /* check Content-Encoding: gzip */
325 0           val = find_header(self->recv_buf, hdr_end, "Content-Encoding", &val_len);
326 0 0         if (val && val_len >= 4 && strncasecmp(val, "gzip", 4) == 0) {
    0          
    0          
327 0           is_gzip = 1;
328             }
329              
330             /* parse X-ClickHouse-Summary: {"read_rows":"N","read_bytes":"N",...}
331             * to populate profile_rows/profile_bytes for HTTP, mirroring the
332             * native protocol's SERVER_PROFILE_INFO packet. */
333 0           val = find_header(self->recv_buf, hdr_end, "X-ClickHouse-Summary", &val_len);
334 0 0         if (val && val_len > 12) {
    0          
335             size_t i;
336 0 0         for (i = 0; i + 12 < val_len; i++) {
337 0 0         if (val[i] != '"') continue;
338             /* val isn't NUL-terminated (it points into recv_buf), so use
339             * parse_uint_within which takes an explicit length cap rather
340             * than scanning until a non-digit/NUL like strtoull would. */
341 0 0         if (i + 13 < val_len
342 0 0         && memcmp(val + i + 1, "read_rows\":\"", 12) == 0)
343 0           self->profile_rows = parse_uint_within(val + i + 13,
344 0           val_len - (i + 13));
345 0 0         else if (i + 14 < val_len
346 0 0         && memcmp(val + i + 1, "read_bytes\":\"", 13) == 0)
347 0           self->profile_bytes = parse_uint_within(val + i + 14,
348 0           val_len - (i + 14));
349             }
350             }
351              
352             size_t consumed;
353 0 0         if (chunked) {
354             /* decode chunked transfer encoding */
355 0           const char *cp = self->recv_buf + hdr_end;
356 0           const char *cp_end = self->recv_buf + self->recv_len;
357              
358             {
359 0           int chunked_complete = 0;
360 0 0         while (cp < cp_end) {
361             /* read chunk size */
362 0           const char *nl = cp;
363             unsigned long chunk_size;
364 0 0         while (nl < cp_end && *nl != '\r') nl++;
    0          
365 0 0         if (nl + 2 > cp_end) goto need_more; /* need more data */
366              
367 0           chunk_size = strtoul(cp, NULL, 16);
368 0           cp = nl + 2; /* skip \r\n */
369              
370 0 0         if (chunk_size == 0) {
371             /* terminal chunk; skip trailing \r\n */
372 0 0         if (cp + 2 > cp_end) goto need_more;
373 0           cp += 2;
374 0           chunked_complete = 1;
375 0           break;
376             }
377              
378 0 0         if ((size_t)(cp_end - cp) < 2
379 0 0         || chunk_size > (size_t)(cp_end - cp) - 2) goto need_more;
380              
381             /* guard against overflow and unbounded growth —
382             * close connection since remaining chunks would
383             * corrupt the stream for subsequent requests.
384             * Apply both the hard ceiling and the user's
385             * opt-in max_recv_buffer (when set) to the
386             * post-decode body size. */
387 0           size_t cap = CH_MAX_DECOMPRESS_SIZE;
388 0 0         if (self->max_recv_buffer > 0
389 0 0         && self->max_recv_buffer < cap)
390 0           cap = self->max_recv_buffer;
391 0 0         if (decoded_len + chunk_size < decoded_len
392 0 0         || decoded_len + chunk_size > cap) {
393 0 0         if (decoded) Safefree(decoded);
394 0           self->send_count--;
395 0           teardown_after_deliver(self,
396             "chunked response too large", "connection closed");
397 0           return;
398             }
399 0 0         if (decoded == NULL) {
400 0           decoded_cap = chunk_size + 256;
401 0           Newx(decoded, decoded_cap, char);
402 0 0         } else if (decoded_len + chunk_size > decoded_cap) {
403 0           decoded_cap = (decoded_len + chunk_size) * 2;
404 0           Renew(decoded, decoded_cap, char);
405             }
406 0           Copy(cp, decoded + decoded_len, chunk_size, char);
407 0           decoded_len += chunk_size;
408 0           cp += chunk_size + 2; /* skip chunk data + \r\n */
409             }
410              
411 0 0         if (!chunked_complete) goto need_more;
412             }
413              
414 0           body = decoded;
415 0           body_len = decoded_len;
416 0           consumed = cp - self->recv_buf;
417             } else {
418             /* Content-Length based */
419 0 0         if (self->recv_len < hdr_end + content_length) return; /* need more data */
420 0           body = self->recv_buf + hdr_end;
421 0           body_len = content_length;
422 0           consumed = hdr_end + content_length;
423             }
424              
425             /* deliver response (body, body_len, consumed are set; decoded may be NULL) */
426 0           self->send_count--;
427 0 0         if (status == 200) {
428 0           char *final_body = (char *)body;
429 0           size_t final_len = body_len;
430              
431 0 0         if (is_gzip && body_len > 0) {
    0          
432             size_t dec_len;
433 0           char *dec = gzip_decompress(body, body_len, &dec_len);
434 0 0         if (!dec) {
435 0 0         if (decoded) Safefree(decoded);
436 0           recv_consume(self, consumed);
437 0           int destroyed = deliver_error(self, "gzip decompression failed");
438 0 0         if (destroyed) return;
439 0           goto done;
440             }
441             /* Apply user's opt-in max_recv_buffer to the gzip-decoded
442             * body too — same trip-wire semantics as the chunked path
443             * and the on_readable raw-recv check. Tear the connection
444             * down on overflow so subsequent queries can't slip past
445             * the cap on the same socket (matches the chunked path's
446             * behaviour and the POD contract). */
447 0 0         if (self->max_recv_buffer > 0
448 0 0         && dec_len > self->max_recv_buffer) {
449 0           Safefree(dec);
450 0 0         if (decoded) Safefree(decoded);
451 0           teardown_after_deliver(self,
452             "gzip body exceeds max_recv_buffer", "connection closed");
453 0           return;
454             }
455 0           final_body = dec;
456 0           final_len = dec_len;
457             }
458              
459             {
460 0           int is_raw = peek_cb_raw(self);
461             int destroyed;
462 0 0         if (is_raw) {
463             /* raw mode — deliver body as scalar, skip TSV parsing */
464 0           destroyed = deliver_raw_body(self, final_body, final_len);
465             } else {
466 0           AV *rows = NULL;
467 0 0         if (final_len > 0)
468 0           rows = parse_tab_separated(final_body, final_len);
469 0           destroyed = deliver_rows(self, rows);
470             }
471 0 0         if (final_body != body) Safefree(final_body);
472 0 0         if (decoded) Safefree(decoded);
473 0           recv_consume(self, consumed);
474 0 0         if (destroyed) return;
475             }
476             } else {
477             /* error */
478 0           char *errmsg = format_http_error(self, status, body, body_len, is_gzip);
479 0 0         if (decoded) Safefree(decoded);
480 0           recv_consume(self, consumed);
481 0           int destroyed = deliver_error(self, errmsg);
482 0           Safefree(errmsg);
483 0 0         if (destroyed) return;
484             }
485              
486 0 0         if (self->magic != EV_CH_MAGIC) return;
487              
488 0           done:
489             /* Stop query timeout timer on response */
490 0           stop_timing(self);
491 0           pipeline_advance(self);
492 0           return;
493              
494 0           need_more:
495             /* incomplete response — keep reading */
496 0 0         if (decoded) Safefree(decoded);
497 0           return;
498             }
499