File Coverage

xs/codecs.c
Criterion Covered Total %
statement 0 120 0.0
branch 0 84 0.0
condition n/a
subroutine n/a
pod n/a
total 0 204 0.0


line stmt bran cond sub pod time code
1             /* --- Gzip compression/decompression --- */
2              
3             /* Compress data with gzip. Returns malloc'd buffer, sets *out_len. NULL on error. */
4 0           static char* gzip_compress(const char *data, size_t data_len, size_t *out_len) {
5             z_stream strm;
6             char *out;
7             size_t out_cap;
8             int ret;
9              
10 0 0         if (data_len > (size_t)UINT_MAX) return NULL;
11              
12 0           Zero(&strm, 1, z_stream);
13 0           ret = deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY);
14 0 0         if (ret != Z_OK) return NULL;
15              
16 0           out_cap = deflateBound(&strm, (uLong)data_len);
17 0 0         if (out_cap > (size_t)UINT_MAX) { deflateEnd(&strm); return NULL; }
18 0           Newx(out, out_cap, char);
19              
20 0           strm.next_in = (Bytef *)data;
21 0           strm.avail_in = (uInt)data_len;
22 0           strm.next_out = (Bytef *)out;
23 0           strm.avail_out = (uInt)out_cap;
24              
25 0           ret = deflate(&strm, Z_FINISH);
26 0 0         if (ret != Z_STREAM_END) {
27 0           Safefree(out);
28 0           deflateEnd(&strm);
29 0           return NULL;
30             }
31              
32 0           *out_len = strm.total_out;
33 0           deflateEnd(&strm);
34 0           return out;
35             }
36              
37             /* Decompress gzip data. Returns malloc'd buffer, sets *out_len. NULL on error. */
38 0           static char* gzip_decompress(const char *data, size_t data_len, size_t *out_len) {
39             z_stream strm;
40             char *out;
41             size_t out_cap;
42             int ret;
43              
44 0 0         if (data_len > (size_t)UINT_MAX) return NULL;
45              
46 0           Zero(&strm, 1, z_stream);
47 0           ret = inflateInit2(&strm, 15 + 16); /* auto-detect gzip */
48 0 0         if (ret != Z_OK) return NULL;
49              
50             /* Estimate 4x expansion, but clamp to CH_MAX_DECOMPRESS_SIZE so the
51             * initial allocation can never exceed the cap on its own. Comparing
52             * data_len against cap/4 (instead of multiplying first) also avoids a
53             * size_t overflow of data_len*4 on 32-bit. If the real output is
54             * larger than the cap, the doubling branch below trips the limit. */
55 0 0         if (data_len > CH_MAX_DECOMPRESS_SIZE / 4)
56 0           out_cap = CH_MAX_DECOMPRESS_SIZE;
57             else
58 0           out_cap = data_len * 4;
59 0 0         if (out_cap < 4096) out_cap = 4096;
60 0           Newx(out, out_cap, char);
61              
62 0           strm.next_in = (Bytef *)data;
63 0           strm.avail_in = (uInt)data_len;
64              
65 0           *out_len = 0;
66             do {
67 0 0         if (*out_len + 4096 > out_cap) {
68 0           out_cap *= 2;
69 0 0         if (out_cap > CH_MAX_DECOMPRESS_SIZE) {
70 0           Safefree(out);
71 0           inflateEnd(&strm);
72 0           return NULL;
73             }
74 0           Renew(out, out_cap, char);
75             }
76 0           strm.next_out = (Bytef *)(out + *out_len);
77 0           strm.avail_out = (uInt)(out_cap - *out_len);
78              
79 0           ret = inflate(&strm, Z_NO_FLUSH);
80 0 0         if (ret == Z_STREAM_ERROR || ret == Z_DATA_ERROR ||
    0          
    0          
81 0 0         ret == Z_MEM_ERROR || ret == Z_BUF_ERROR) {
82 0           Safefree(out);
83 0           inflateEnd(&strm);
84 0           return NULL;
85             }
86 0           *out_len = strm.total_out;
87 0 0         } while (ret != Z_STREAM_END);
88              
89 0           inflateEnd(&strm);
90 0           return out;
91             }
92              
93             #ifdef HAVE_LZ4
94              
95             /*
96             * Decompress a ClickHouse LZ4 compressed block.
97             * Input: compressed block starting at checksum (16 + 9 + payload bytes).
98             * Returns malloc'd buffer with decompressed data, sets *out_len.
99             * Returns NULL on error or if need more data (sets *need_more=1).
100             */
101             static char* ch_lz4_decompress(const char *data, size_t data_len,
102             size_t *out_len, size_t *consumed,
103             int *need_more, const char **err_reason) {
104             uint32_t compressed_with_header, uncompressed_size;
105             uint32_t payload_size;
106             uint8_t method;
107             char *out;
108             int ret;
109              
110             *need_more = 0;
111             *consumed = 0;
112             if (err_reason) *err_reason = NULL;
113              
114             /* Need at least checksum (16) + header (9) */
115             if (data_len < CH_CHECKSUM_SIZE + CH_COMPRESS_HEADER_SIZE) {
116             *need_more = 1;
117             return NULL;
118             }
119              
120             /* Read header fields (after 16-byte checksum) */
121             method = (uint8_t)data[CH_CHECKSUM_SIZE];
122             if (method != CH_LZ4_METHOD) {
123             if (err_reason) *err_reason = "unsupported compression method";
124             return NULL;
125             }
126             memcpy(&compressed_with_header, data + CH_CHECKSUM_SIZE + 1, 4);
127             memcpy(&uncompressed_size, data + CH_CHECKSUM_SIZE + 5, 4);
128              
129             if (uncompressed_size > CH_MAX_DECOMPRESS_SIZE) {
130             if (err_reason) *err_reason = "decompressed size exceeds 128 MB limit";
131             return NULL;
132             }
133              
134             if (compressed_with_header < CH_COMPRESS_HEADER_SIZE) {
135             if (err_reason) *err_reason = "compressed_with_header too small";
136             return NULL;
137             }
138              
139             payload_size = compressed_with_header - CH_COMPRESS_HEADER_SIZE;
140              
141             /* Need full block */
142             if (data_len < CH_CHECKSUM_SIZE + CH_COMPRESS_HEADER_SIZE + payload_size) {
143             *need_more = 1;
144             return NULL;
145             }
146              
147             /* Verify checksum */
148             {
149             ch_uint128_t expected, actual;
150             memcpy(&expected.lo, data, 8);
151             memcpy(&expected.hi, data + 8, 8);
152             actual = ch_city_hash128(data + CH_CHECKSUM_SIZE, compressed_with_header);
153             if (actual.lo != expected.lo || actual.hi != expected.hi) {
154             if (err_reason) *err_reason = "CityHash128 checksum mismatch";
155             return NULL;
156             }
157             }
158              
159             Newx(out, uncompressed_size, char);
160             ret = LZ4_decompress_safe(data + CH_CHECKSUM_SIZE + CH_COMPRESS_HEADER_SIZE,
161             out, (int)payload_size, (int)uncompressed_size);
162             if (ret < 0 || (uint32_t)ret != uncompressed_size) {
163             Safefree(out);
164             if (err_reason) *err_reason = "LZ4 decompression failed";
165             return NULL;
166             }
167              
168             *out_len = uncompressed_size;
169             *consumed = CH_CHECKSUM_SIZE + CH_COMPRESS_HEADER_SIZE + payload_size;
170             return out;
171             }
172              
173             /*
174             * Compress data into a ClickHouse LZ4 compressed block.
175             * Returns malloc'd buffer (checksum + header + LZ4 payload), sets *out_len.
176             */
177             static char* ch_lz4_compress(const char *data, size_t data_len, size_t *out_len) {
178             int max_compressed;
179             if (data_len > (size_t)INT_MAX) return NULL;
180             max_compressed = LZ4_compressBound((int)data_len);
181             char *out;
182             int compressed_size;
183             uint32_t compressed_with_header;
184             ch_uint128_t checksum;
185              
186             Newx(out, CH_CHECKSUM_SIZE + CH_COMPRESS_HEADER_SIZE + max_compressed, char);
187              
188             compressed_size = LZ4_compress_default(
189             data, out + CH_CHECKSUM_SIZE + CH_COMPRESS_HEADER_SIZE,
190             (int)data_len, max_compressed);
191              
192             if (compressed_size <= 0) {
193             Safefree(out);
194             return NULL;
195             }
196              
197             compressed_with_header = (uint32_t)compressed_size + CH_COMPRESS_HEADER_SIZE;
198              
199             /* Write header */
200             out[CH_CHECKSUM_SIZE] = (char)CH_LZ4_METHOD;
201             memcpy(out + CH_CHECKSUM_SIZE + 1, &compressed_with_header, 4);
202             { uint32_t uncomp = (uint32_t)data_len;
203             memcpy(out + CH_CHECKSUM_SIZE + 5, &uncomp, 4);
204             }
205              
206             /* Compute checksum over header + compressed data */
207             checksum = ch_city_hash128(out + CH_CHECKSUM_SIZE, compressed_with_header);
208             memcpy(out, &checksum.lo, 8);
209             memcpy(out + 8, &checksum.hi, 8);
210              
211             *out_len = CH_CHECKSUM_SIZE + CH_COMPRESS_HEADER_SIZE + compressed_size;
212             return out;
213             }
214              
215             /* Decompress one or more consecutive LZ4 sub-blocks starting at buf[*pos].
216             * Advances *pos past every consumed sub-block; *out_len is total decompressed
217             * size. Returns malloc'd buffer (caller Safefrees) on success, NULL on
218             * not-enough-data (sets *need_more=1) or on hard error (sets *err). */
219             static char* ch_lz4_decompress_chain(const char *buf, size_t len, size_t *pos,
220             size_t *out_len, int *need_more,
221             const char **err) {
222             size_t comp_consumed;
223             char *out;
224              
225             *need_more = 0;
226             *err = NULL;
227              
228             out = ch_lz4_decompress(buf + *pos, len - *pos, out_len,
229             &comp_consumed, need_more, err);
230             if (!out) return NULL;
231             *pos += comp_consumed;
232              
233             while (len - *pos >= CH_CHECKSUM_SIZE + CH_COMPRESS_HEADER_SIZE
234             && (uint8_t)buf[*pos + CH_CHECKSUM_SIZE] == CH_LZ4_METHOD) {
235             size_t extra_len, extra_consumed;
236             int extra_need_more = 0;
237             const char *extra_err = NULL;
238             char *extra = ch_lz4_decompress(buf + *pos, len - *pos, &extra_len,
239             &extra_consumed,
240             &extra_need_more, &extra_err);
241             if (!extra) {
242             Safefree(out);
243             if (extra_need_more) { *need_more = 1; return NULL; }
244             *err = extra_err;
245             return NULL;
246             }
247             Renew(out, *out_len + extra_len, char);
248             Copy(extra, out + *out_len, extra_len, char);
249             *out_len += extra_len;
250             *pos += extra_consumed;
251             Safefree(extra);
252             }
253             return out;
254             }
255              
256             #endif /* HAVE_LZ4 */
257              
258             /* --- Days-since-epoch calculation for Date encoding --- */
259              
260 0           static int32_t date_string_to_days(const char *s, size_t len) {
261             int year, month, day;
262 0 0         if (len >= 10 && s[4] == '-' && s[7] == '-') {
    0          
    0          
263 0           year = atoi(s);
264 0           month = atoi(s + 5);
265 0           day = atoi(s + 8);
266             /* civil_from_days algorithm (Howard Hinnant) */
267 0 0         if (month <= 2) { year--; month += 9; } else { month -= 3; }
268             {
269 0 0         int era = (year >= 0 ? year : year - 399) / 400;
270 0           unsigned yoe = (unsigned)(year - era * 400);
271 0           unsigned doy = (153 * (unsigned)month + 2) / 5 + (unsigned)day - 1;
272 0           unsigned doe = yoe * 365 + yoe/4 - yoe/100 + doy;
273 0           return (int32_t)(era * 146097 + (int)doe - 719468);
274             }
275             }
276             /* fallback: numeric value */
277 0           return (int32_t)strtol(s, NULL, 10);
278             }
279              
280 0           static uint32_t datetime_string_to_epoch(const char *s, size_t len) {
281 0           int hour = 0, min = 0, sec = 0;
282 0 0         if (len >= 10 && s[4] == '-' && s[7] == '-') {
    0          
    0          
283 0 0         if (len >= 19) {
284 0           hour = atoi(s + 11);
285 0           min = atoi(s + 14);
286 0           sec = atoi(s + 17);
287             }
288             {
289 0           int32_t days = date_string_to_days(s, 10);
290 0           return (uint32_t)((int64_t)days * 86400 + hour * 3600 + min * 60 + sec);
291             }
292             }
293 0           return (uint32_t)strtoul(s, NULL, 10);
294             }
295              
296             /* --- TabSeparated parser --- */
297              
298             /* Parse TabSeparated body into AV of AV. Handles \N -> undef, backslash escapes. */
299 0           static AV* parse_tab_separated(const char *data, size_t len) {
300 0           AV *rows = newAV();
301 0           const char *p = data;
302 0           const char *end = data + len;
303             const char *line_start;
304             AV *row;
305             char *buf;
306             size_t buf_len;
307              
308             /* pre-allocate scratch buffer for unescaping */
309 0           Newx(buf, len + 1, char);
310              
311 0 0         while (p < end) {
312             /* skip trailing empty line */
313 0 0         if (p + 1 == end && *p == '\n') break;
    0          
314              
315 0           row = newAV();
316 0           line_start = p;
317              
318 0 0         while (p <= end) {
319 0 0         int is_end_of_line = (p == end || *p == '\n');
    0          
320 0 0         int is_tab = (!is_end_of_line && *p == '\t');
    0          
321              
322 0 0         if (is_end_of_line || is_tab) {
    0          
323 0           const char *field_start = line_start;
324 0           size_t field_len = p - field_start;
325              
326             /* check for \N (NULL) */
327 0 0         if (field_len == 2 && field_start[0] == '\\' && field_start[1] == 'N') {
    0          
    0          
328 0           av_push(row, newSV(0));
329             } else {
330             /* unescape */
331 0           buf_len = 0;
332 0           const char *s = field_start;
333 0           const char *s_end = field_start + field_len;
334 0 0         while (s < s_end) {
335 0 0         if (*s == '\\' && s + 1 < s_end) {
    0          
336 0           s++;
337 0           switch (*s) {
338 0           case 'n': buf[buf_len++] = '\n'; break;
339 0           case 't': buf[buf_len++] = '\t'; break;
340 0           case '\\': buf[buf_len++] = '\\'; break;
341 0           case '\'': buf[buf_len++] = '\''; break;
342 0           case '0': buf[buf_len++] = '\0'; break;
343 0           case 'a': buf[buf_len++] = '\a'; break;
344 0           case 'b': buf[buf_len++] = '\b'; break;
345 0           case 'f': buf[buf_len++] = '\f'; break;
346 0           case 'r': buf[buf_len++] = '\r'; break;
347 0           default: buf[buf_len++] = '\\'; buf[buf_len++] = *s; break;
348             }
349 0           s++;
350             } else {
351 0           buf[buf_len++] = *s++;
352             }
353             }
354 0           av_push(row, newSVpvn(buf, buf_len));
355             }
356              
357 0 0         if (is_tab) {
358 0           p++;
359 0           line_start = p;
360             } else {
361 0 0         if (p < end) p++; /* skip \n */
362 0           break;
363             }
364             } else {
365 0           p++;
366             }
367             }
368 0           av_push(rows, newRV_noinc((SV*)row));
369             }
370              
371 0           Safefree(buf);
372 0           return rows;
373             }
374