File Coverage

xs/proto_native_build.c
Criterion Covered Total %
statement 0 74 0.0
branch 0 20 0.0
condition n/a
subroutine n/a
pod n/a
total 0 94 0.0


line stmt bran cond sub pod time code
1             /* --- Native protocol packet builders --- */
2              
3 0           static char* build_native_hello(ev_clickhouse_t *self, size_t *out_len) {
4             native_buf_t b;
5 0           nbuf_init(&b);
6              
7 0           nbuf_varuint(&b, CLIENT_HELLO);
8 0           nbuf_cstring(&b, CH_CLIENT_NAME);
9 0           nbuf_varuint(&b, CH_CLIENT_VERSION_MAJOR);
10 0           nbuf_varuint(&b, CH_CLIENT_VERSION_MINOR);
11 0           nbuf_varuint(&b, CH_CLIENT_REVISION);
12 0 0         nbuf_cstring(&b, self->database ? self->database : "default");
13 0 0         nbuf_cstring(&b, self->user ? self->user : "default");
14 0 0         nbuf_cstring(&b, self->password ? self->password : "");
15              
16 0           *out_len = b.len;
17 0           return b.data;
18             }
19              
20 0           static char* build_native_ping(size_t *out_len) {
21             native_buf_t b;
22 0           nbuf_init(&b);
23 0           nbuf_varuint(&b, CLIENT_PING);
24 0           *out_len = b.len;
25 0           return b.data;
26             }
27              
28             /* Write a Data-block info header (field_num=1 is_overflows + field_num=2
29             * bucket_num=-1 + end marker). Required for revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO. */
30 0           static void nbuf_block_info(native_buf_t *b) {
31 0           int32_t bucket = -1;
32 0           nbuf_varuint(b, 1); /* field_num = 1 */
33 0           nbuf_u8(b, 0); /* is_overflows = false */
34 0           nbuf_varuint(b, 2); /* field_num = 2 */
35 0           nbuf_append(b, (const char *)&bucket, 4); /* bucket_num = -1 */
36 0           nbuf_varuint(b, 0); /* end of block info */
37 0           }
38              
39             /* Build an empty Data block (signals end of client data after Query) */
40 0           static void nbuf_empty_data_block(native_buf_t *b, int do_compress) {
41 0           nbuf_varuint(b, CLIENT_DATA);
42 0           nbuf_cstring(b, ""); /* table name — outside compression */
43              
44             /* block body: block info + num_cols + num_rows */
45             #ifdef HAVE_LZ4
46             if (do_compress) {
47             native_buf_t body;
48             char *compressed;
49             size_t comp_len;
50              
51             nbuf_init(&body);
52             nbuf_block_info(&body);
53             nbuf_varuint(&body, 0); /* num_columns = 0 */
54             nbuf_varuint(&body, 0); /* num_rows = 0 */
55              
56             compressed = ch_lz4_compress(body.data, body.len, &comp_len);
57             Safefree(body.data);
58             if (compressed) {
59             nbuf_append(b, compressed, comp_len);
60             Safefree(compressed);
61             return;
62             }
63             /* LZ4 failed (should never happen) — fall through to uncompressed */
64             }
65             #else
66             (void)do_compress;
67             #endif
68              
69 0           nbuf_block_info(b);
70 0           nbuf_varuint(b, 0); /* num_columns = 0 */
71 0           nbuf_varuint(b, 0); /* num_rows = 0 */
72 0           }
73              
74 0           static char* build_native_query(ev_clickhouse_t *self, const char *sql,
75             size_t sql_len, HV *defaults, HV *overrides,
76             const char *ext_data, size_t ext_len,
77             size_t *out_len) {
78             native_buf_t b;
79 0           const char *query_id = NULL;
80 0           STRLEN query_id_len = 0;
81 0           nbuf_init(&b);
82              
83             /* Pre-scan settings for query_id (needed before settings block) */
84             {
85             SV **svp;
86 0 0         if (overrides && (svp = hv_fetch(overrides, "query_id", 8, 0)))
    0          
87 0           query_id = SvPV(*svp, query_id_len);
88 0 0         else if (defaults && (svp = hv_fetch(defaults, "query_id", 8, 0)))
    0          
89 0           query_id = SvPV(*svp, query_id_len);
90             }
91              
92             /* Query packet */
93 0           nbuf_varuint(&b, CLIENT_QUERY);
94 0 0         nbuf_string(&b, query_id ? query_id : "", query_id_len);
95              
96             /* Client info — field order must match ClientInfo::read() */
97 0           nbuf_u8(&b, QUERY_INITIAL);
98 0           nbuf_cstring(&b, ""); /* initial_user */
99 0           nbuf_cstring(&b, ""); /* initial_query_id */
100 0           nbuf_cstring(&b, "[::ffff:127.0.0.1]:0"); /* initial_address */
101              
102             /* initial_query_start_time_microseconds (revision >= 54449) */
103             {
104 0           uint64_t zero64 = 0;
105 0           nbuf_append(&b, (const char *)&zero64, 8);
106             }
107              
108             /* iface_type: 1=TCP, os_user, client_hostname, client_name */
109 0           nbuf_u8(&b, 1);
110 0           nbuf_cstring(&b, ""); /* os_user */
111 0           nbuf_cstring(&b, ""); /* client_hostname */
112 0           nbuf_cstring(&b, CH_CLIENT_NAME);
113 0           nbuf_varuint(&b, CH_CLIENT_VERSION_MAJOR);
114 0           nbuf_varuint(&b, CH_CLIENT_VERSION_MINOR);
115 0           nbuf_varuint(&b, CH_CLIENT_REVISION);
116              
117             /* quota_key_in_client_info (always present, revision >= ~54060) */
118 0           nbuf_cstring(&b, "");
119              
120             /* distributed_depth (revision >= 54448) */
121 0           nbuf_varuint(&b, 0);
122              
123             /* version_patch (revision >= 54401) */
124 0           nbuf_varuint(&b, 0);
125              
126             /* OpenTelemetry trace context (revision >= 54442): no trace */
127 0           nbuf_u8(&b, 0);
128              
129             /* parallel_replicas (revision >= 54453) */
130 0           nbuf_varuint(&b, 0); /* collaborate_with_initiator */
131 0           nbuf_varuint(&b, 0); /* count_participating_replicas */
132 0           nbuf_varuint(&b, 0); /* number_of_current_replica */
133              
134             /* Settings (serialized as strings: revision >= 54429)
135             * Format: repeated (String name, UInt8 is_important, String value),
136             * terminated by empty name. */
137 0           write_native_settings(&b, defaults, overrides);
138 0           nbuf_cstring(&b, ""); /* empty name = end of settings */
139              
140             /* interserver_secret: empty string (revision >= 54441) */
141 0           nbuf_cstring(&b, "");
142              
143             /* state (stage), compression, query */
144 0           nbuf_varuint(&b, STAGE_COMPLETE);
145             #ifdef HAVE_LZ4
146             nbuf_varuint(&b, self->compress ? 1 : 0);
147             #else
148 0           nbuf_varuint(&b, 0);
149             #endif
150 0           nbuf_string(&b, sql, sql_len);
151              
152             /* Parameters block: param_* keys, terminated by empty name. */
153 0           write_native_params(&b, defaults, overrides);
154 0           nbuf_cstring(&b, ""); /* end of parameters */
155              
156             /* External tables: named Data packets the query can reference as
157             * tables, sent before the terminating empty block. */
158 0 0         if (ext_data && ext_len) nbuf_append(&b, ext_data, ext_len);
    0          
159              
160 0           nbuf_empty_data_block(&b, self->compress);
161              
162 0           *out_len = b.len;
163 0           return b.data;
164             }
165