File Coverage

Kafka.xs
Criterion Covered Total %
statement 1121 2770 40.4
branch 461 1932 23.8
condition n/a
subroutine n/a
pod n/a
total 1582 4702 33.6


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include "ppport.h"
5              
6             #include "EVAPI.h"
7             #include "ngx-queue.h"
8              
9             #include
10             #include
11             #include
12             #include
13             #include
14             #include
15             #include
16             #include
17             #include
18             #include
19              
20             #ifdef HAVE_OPENSSL
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #endif
31              
32             #ifdef HAVE_LZ4
33             #include
34             #endif
35              
36             #ifdef HAVE_ZLIB
37             #include
38             #endif
39              
40             #ifdef HAVE_ZSTD
41             #include
42             #endif
43              
44             #ifdef HAVE_SNAPPY
45             #include
46             #endif
47              
48             /* ================================================================
49             * Constants
50             * ================================================================ */
51              
52             #define KF_MAGIC_ALIVE 0xCAFEBEEF
53             #define KF_MAGIC_FREED 0xDEADCAFE
54              
55             #define KF_BUF_INIT 16384
56              
57             /* True during Perl global destruction — calling user callbacks then is unsafe
58             * because closed-over SVs may already have been freed. */
59             #ifdef PERL_PHASE_DESTRUCT
60             # define KF_IN_GLOBAL_DESTRUCT() (PL_phase == PERL_PHASE_DESTRUCT)
61             #else
62             # define KF_IN_GLOBAL_DESTRUCT() (PL_dirty)
63             #endif
64              
65             /* Connection states */
66             #define CONN_DISCONNECTED 0
67             #define CONN_CONNECTING 1
68             #define CONN_TLS_HANDSHAKE 2
69             #define CONN_SASL_HANDSHAKE 3
70             #define CONN_SASL_AUTH 4
71             #define CONN_API_VERSIONS 5
72             #define CONN_READY 6
73              
74             /* Kafka API keys */
75             #define API_PRODUCE 0
76             #define API_FETCH 1
77             #define API_LIST_OFFSETS 2
78             #define API_METADATA 3
79             #define API_OFFSET_COMMIT 8
80             #define API_OFFSET_FETCH 9
81             #define API_FIND_COORDINATOR 10
82             #define API_JOIN_GROUP 11
83             #define API_HEARTBEAT 12
84             #define API_LEAVE_GROUP 13
85             #define API_SYNC_GROUP 14
86             #define API_SASL_HANDSHAKE 17
87             #define API_API_VERSIONS 18
88             #define API_CREATE_TOPICS 19
89             #define API_DELETE_TOPICS 20
90             #define API_INIT_PRODUCER_ID 22
91             #define API_ADD_PARTITIONS_TXN 24
92             #define API_END_TXN 26
93             #define API_TXN_OFFSET_COMMIT 28
94             #define API_SASL_AUTHENTICATE 36
95              
96             #define API_VERSIONS_MAX_KEY 64
97              
98             /* Compression types — Kafka wire codes. */
99             #define COMPRESS_NONE 0
100             #define COMPRESS_GZIP 1
101             #define COMPRESS_SNAPPY 2
102             #define COMPRESS_LZ4 3
103             #define COMPRESS_ZSTD 4
104              
105             #define CLEAR_HANDLER(field) \
106             do { if (NULL != (field)) { SvREFCNT_dec(field); (field) = NULL; } } while(0)
107              
108             /* ================================================================
109             * Type declarations
110             * ================================================================ */
111              
112             typedef struct kf_buf_s kf_buf_t;
113             typedef struct ev_kafka_conn_s ev_kafka_conn_t;
114             typedef struct ev_kafka_conn_cb_s ev_kafka_conn_cb_t;
115              
116             typedef ev_kafka_conn_t* EV__Kafka__Conn;
117              
118             /* ================================================================
119             * Dynamic buffer
120             * ================================================================ */
121              
122             struct kf_buf_s {
123             char *data;
124             size_t len;
125             size_t cap;
126             };
127              
128 205           static void kf_buf_init(kf_buf_t *b) {
129 205           Newx(b->data, 256, char);
130 205           b->len = 0;
131 205           b->cap = 256;
132 205           }
133              
134 1675           static void kf_buf_grow(kf_buf_t *b, size_t need) {
135 1675 100         if (b->cap >= need) return;
136 19           size_t newcap = b->cap * 2;
137 19 100         if (newcap < need) newcap = need;
138 19           Renew(b->data, newcap, char);
139 19           b->cap = newcap;
140             }
141              
142 205           static void kf_buf_free(kf_buf_t *b) {
143 205 50         if (b->data) { Safefree(b->data); b->data = NULL; }
144 205           b->len = 0;
145 205           b->cap = 0;
146 205           }
147              
148 458           static void kf_buf_append(kf_buf_t *b, const char *data, size_t len) {
149 458           kf_buf_grow(b, b->len + len);
150 458           Copy(data, b->data + b->len, len, char);
151 458           b->len += len;
152 458           }
153              
154 157           static void kf_buf_append_i8(kf_buf_t *b, int8_t val) {
155 157           kf_buf_grow(b, b->len + 1);
156 157           b->data[b->len++] = val;
157 157           }
158              
159 35           static void kf_buf_append_i16(kf_buf_t *b, int16_t val) {
160 35           kf_buf_grow(b, b->len + 2);
161 35           uint16_t v = htons((uint16_t)val);
162 35           memcpy(b->data + b->len, &v, 2);
163 35           b->len += 2;
164 35           }
165              
166 97           static void kf_buf_append_i32(kf_buf_t *b, int32_t val) {
167 97           kf_buf_grow(b, b->len + 4);
168 97           uint32_t v = htonl((uint32_t)val);
169 97           memcpy(b->data + b->len, &v, 4);
170 97           b->len += 4;
171 97           }
172              
173 64           static void kf_buf_append_i64(kf_buf_t *b, int64_t val) {
174 64           kf_buf_grow(b, b->len + 8);
175 64           uint32_t hi = htonl((uint32_t)((uint64_t)val >> 32));
176 64           uint32_t lo = htonl((uint32_t)((uint64_t)val & 0xFFFFFFFF));
177 64           memcpy(b->data + b->len, &hi, 4);
178 64           memcpy(b->data + b->len + 4, &lo, 4);
179 64           b->len += 8;
180 64           }
181              
182             /* Kafka STRING: INT16 length + bytes. NULL → -1 length */
183 0           static void kf_buf_append_string(kf_buf_t *b, const char *s, int16_t len) {
184 0           kf_buf_append_i16(b, len);
185 0 0         if (len > 0) kf_buf_append(b, s, len);
186 0           }
187              
188 1           static void kf_buf_append_nullable_string(kf_buf_t *b, const char *s, int16_t len) {
189 1 50         if (!s) {
190 0           kf_buf_append_i16(b, -1);
191             } else {
192 1           kf_buf_append_i16(b, len);
193 1 50         if (len > 0) kf_buf_append(b, s, len);
194             }
195 1           }
196              
197             /* Kafka BYTES: INT32 length + bytes */
198 0           static void kf_buf_append_bytes(kf_buf_t *b, const char *s, int32_t len) {
199 0           kf_buf_append_i32(b, len);
200 0 0         if (len > 0) kf_buf_append(b, s, len);
201 0           }
202              
203 0           static void kf_buf_append_nullable_bytes(kf_buf_t *b, const char *s, int32_t len) {
204 0 0         if (!s) {
205 0           kf_buf_append_i32(b, -1);
206             } else {
207 0           kf_buf_append_i32(b, len);
208 0 0         if (len > 0) kf_buf_append(b, s, len);
209             }
210 0           }
211              
212             /* Unsigned varint (ZigZag for signed is not used in Kafka framing) */
213 864           static void kf_buf_append_uvarint(kf_buf_t *b, uint64_t val) {
214 864           kf_buf_grow(b, b->len + 10);
215 885 100         while (val >= 0x80) {
216 21           b->data[b->len++] = (char)((val & 0x7F) | 0x80);
217 21           val >>= 7;
218             }
219 864           b->data[b->len++] = (char)val;
220 864           }
221              
222             /* Signed varint (ZigZag encoding) */
223 864           static void kf_buf_append_varint(kf_buf_t *b, int64_t val) {
224 864           uint64_t uval = ((uint64_t)val << 1) ^ (uint64_t)(val >> 63);
225 864           kf_buf_append_uvarint(b, uval);
226 864           }
227              
228             /* Compact string: uvarint(len+1) + bytes. NULL → 0 */
229 0           static void kf_buf_append_compact_string(kf_buf_t *b, const char *s, int32_t len) {
230 0 0         if (!s) {
231 0           kf_buf_append_uvarint(b, 0);
232             } else {
233 0           kf_buf_append_uvarint(b, (uint64_t)len + 1);
234 0 0         if (len > 0) kf_buf_append(b, s, len);
235             }
236 0           }
237              
238             /* Tagged fields: just 0 = no tagged fields */
239 0           static void kf_buf_append_tagged_fields(kf_buf_t *b) {
240 0           kf_buf_append_uvarint(b, 0);
241 0           }
242              
243             /* ================================================================
244             * Wire read helpers (big-endian)
245             * ================================================================ */
246              
247 46           static int16_t kf_read_i16(const char *buf) {
248             uint16_t v;
249 46           memcpy(&v, buf, 2);
250 46           return (int16_t)ntohs(v);
251             }
252              
253 74           static int32_t kf_read_i32(const char *buf) {
254             uint32_t v;
255 74           memcpy(&v, buf, 4);
256 74           return (int32_t)ntohl(v);
257             }
258              
259 32           static int64_t kf_read_i64(const char *buf) {
260             uint32_t hi, lo;
261 32           memcpy(&hi, buf, 4);
262 32           memcpy(&lo, buf + 4, 4);
263 32           return ((int64_t)ntohl(hi) << 32) | (uint32_t)ntohl(lo);
264             }
265              
266             /* Read unsigned varint, returns bytes consumed or -1 on error */
267 846           static int kf_read_uvarint(const char *buf, const char *end, uint64_t *out) {
268 846           uint64_t val = 0;
269 846           int shift = 0;
270 846           const char *p = buf;
271 867 50         while (p < end) {
272 867           uint8_t b = (uint8_t)*p++;
273 867           val |= ((uint64_t)(b & 0x7F)) << shift;
274 867 100         if (!(b & 0x80)) {
275 846           *out = val;
276 846           return (int)(p - buf);
277             }
278 21           shift += 7;
279 21 50         if (shift >= 64) return -1;
280             }
281 0           return -1; /* incomplete */
282             }
283              
284             /* Read signed varint (ZigZag) */
285 846           static int kf_read_varint(const char *buf, const char *end, int64_t *out) {
286             uint64_t uval;
287 846           int n = kf_read_uvarint(buf, end, &uval);
288 846 50         if (n < 0) return n;
289 846           *out = (int64_t)((uval >> 1) ^ -(int64_t)(uval & 1));
290 846           return n;
291             }
292              
293             /* Read Kafka STRING: i16 len + bytes. Returns pointer into buf, sets *len. Returns bytes consumed. */
294 13           static int kf_read_string(const char *buf, const char *end, const char **out, int16_t *slen) {
295 13 50         if (end - buf < 2) return -1;
296 13           int16_t len = kf_read_i16(buf);
297 13 100         if (len < 0) { /* nullable null */
298 1           *out = NULL;
299 1           *slen = 0;
300 1           return 2;
301             }
302 12 50         if (end - buf < 2 + len) return -1;
303 12           *out = buf + 2;
304 12           *slen = len;
305 12           return 2 + len;
306             }
307              
308             /* Read compact string: uvarint(len+1) + bytes */
309 0           static int kf_read_compact_string(const char *buf, const char *end, const char **out, int32_t *slen) {
310             uint64_t raw;
311 0           int n = kf_read_uvarint(buf, end, &raw);
312 0 0         if (n < 0) return -1;
313 0 0         if (raw == 0) {
314 0           *out = NULL;
315 0           *slen = 0;
316 0           return n;
317             }
318 0           int32_t len = (int32_t)(raw - 1);
319 0 0         if (end - buf - n < len) return -1;
320 0           *out = buf + n;
321 0           *slen = len;
322 0           return n + len;
323             }
324              
325             /* Skip tagged fields */
326 0           static int kf_skip_tagged_fields(const char *buf, const char *end) {
327             uint64_t count;
328 0           int n = kf_read_uvarint(buf, end, &count);
329 0 0         if (n < 0) return -1;
330 0           const char *p = buf + n;
331             uint64_t i;
332 0 0         for (i = 0; i < count; i++) {
333             uint64_t tag;
334 0           int tn = kf_read_uvarint(p, end, &tag);
335 0 0         if (tn < 0) return -1;
336 0           p += tn;
337             uint64_t dlen;
338 0           int dn = kf_read_uvarint(p, end, &dlen);
339 0 0         if (dn < 0) return -1;
340 0           p += dn;
341 0 0         if ((uint64_t)(end - p) < dlen) return -1;
342 0           p += dlen;
343             }
344 0           return (int)(p - buf);
345             }
346              
347             /* ================================================================
348             * CRC32C (software implementation)
349             * ================================================================ */
350              
351             static uint32_t crc32c_table[256];
352             static int crc32c_table_inited = 0;
353              
354 21           static void crc32c_init_table(void) {
355             uint32_t i, j;
356 5397 100         for (i = 0; i < 256; i++) {
357 5376           uint32_t crc = i;
358 48384 100         for (j = 0; j < 8; j++) {
359 43008 100         if (crc & 1)
360 21504           crc = (crc >> 1) ^ 0x82F63B78;
361             else
362 21504           crc >>= 1;
363             }
364 5376           crc32c_table[i] = crc;
365             }
366 21           crc32c_table_inited = 1;
367 21           }
368              
369 37           static uint32_t crc32c(const char *buf, size_t len) {
370 37           uint32_t crc = 0xFFFFFFFF;
371             size_t i;
372 37 50         if (!crc32c_table_inited) crc32c_init_table();
373 205109 100         for (i = 0; i < len; i++)
374 205072           crc = crc32c_table[(crc ^ (uint8_t)buf[i]) & 0xFF] ^ (crc >> 8);
375 37           return crc ^ 0xFFFFFFFF;
376             }
377              
378             /* ================================================================
379             * Connection callback struct
380             * ================================================================ */
381              
382             struct ev_kafka_conn_cb_s {
383             SV *cb;
384             ngx_queue_t queue;
385             int32_t correlation_id;
386             int16_t api_key;
387             int16_t api_version;
388             int internal; /* 1 = internal handshake cb, don't invoke Perl */
389             };
390              
391             /* ================================================================
392             * Broker connection struct (Layer 1)
393             * ================================================================ */
394              
395             struct ev_kafka_conn_s {
396             unsigned int magic;
397             struct ev_loop *loop;
398             int fd;
399             int state;
400              
401             /* EV watchers */
402             ev_io rio, wio;
403             ev_timer timer;
404             int reading, writing, timing;
405              
406             /* TLS */
407             #ifdef HAVE_OPENSSL
408             SSL_CTX *ssl_ctx;
409             SSL *ssl;
410             #endif
411             int tls_enabled;
412             char *tls_ca_file;
413             int tls_skip_verify;
414              
415             /* Connection params */
416             char *host;
417             int port;
418              
419             /* SASL */
420             char *sasl_mechanism;
421             char *sasl_username;
422             char *sasl_password;
423              
424             /* SCRAM state */
425             int scram_step;
426             char *scram_nonce;
427             char *scram_client_first;
428             size_t scram_client_first_len;
429             /* For server-final-message signature verification (RFC 5802). */
430             unsigned char scram_server_key[64];
431             int scram_server_key_len;
432             char *scram_auth_message;
433             size_t scram_auth_message_len;
434              
435             /* Buffers */
436             char *rbuf;
437             size_t rbuf_len, rbuf_cap;
438             char *wbuf;
439             size_t wbuf_len, wbuf_off, wbuf_cap;
440              
441             /* Request/response correlation */
442             ngx_queue_t cb_queue;
443             int32_t next_correlation_id;
444             int pending_count;
445              
446             /* Client identity */
447             char *client_id;
448             int client_id_len;
449              
450             /* API version negotiation */
451             int16_t api_versions[API_VERSIONS_MAX_KEY];
452             int api_versions_known;
453              
454             /* Event handlers */
455             SV *on_error;
456             SV *on_connect;
457             SV *on_disconnect;
458              
459             /* Reconnection */
460             int auto_reconnect;
461             int reconnect_delay_ms;
462             ev_timer reconnect_timer;
463             int reconnect_timing;
464             int intentional_disconnect;
465              
466             /* Safety */
467             int callback_depth;
468             };
469              
470             /* ================================================================
471             * Forward declarations
472             * ================================================================ */
473              
474             static void conn_io_cb(EV_P_ ev_io *w, int revents);
475             static void conn_timer_cb(EV_P_ ev_timer *w, int revents);
476             static void conn_reconnect_timer_cb(EV_P_ ev_timer *w, int revents);
477             static void conn_start_reading(ev_kafka_conn_t *self);
478             static void conn_stop_reading(ev_kafka_conn_t *self);
479             static void conn_start_writing(ev_kafka_conn_t *self);
480             static void conn_stop_writing(ev_kafka_conn_t *self);
481             static void conn_emit_error(pTHX_ ev_kafka_conn_t *self, const char *msg);
482             static void conn_cleanup(pTHX_ ev_kafka_conn_t *self);
483             static int conn_check_destroyed(ev_kafka_conn_t *self);
484             static void conn_cancel_pending(pTHX_ ev_kafka_conn_t *self, const char *err);
485             static void conn_on_connect_done(pTHX_ ev_kafka_conn_t *self);
486             static void conn_process_responses(pTHX_ ev_kafka_conn_t *self);
487             static void conn_schedule_reconnect(pTHX_ ev_kafka_conn_t *self);
488             static int32_t conn_send_request(pTHX_ ev_kafka_conn_t *self,
489             int16_t api_key, int16_t api_version, kf_buf_t *body, SV *cb,
490             int internal, int no_response);
491             static void conn_send_api_versions(pTHX_ ev_kafka_conn_t *self);
492             static void conn_start_connect(pTHX_ ev_kafka_conn_t *self,
493             const char *host, int port, double timeout);
494             static void conn_send_sasl_handshake(pTHX_ ev_kafka_conn_t *self);
495             static void conn_parse_sasl_handshake_response(pTHX_ ev_kafka_conn_t *self,
496             const char *data, size_t len);
497             static void conn_send_sasl_authenticate(pTHX_ ev_kafka_conn_t *self);
498             static void conn_parse_sasl_authenticate_response(pTHX_ ev_kafka_conn_t *self,
499             const char *data, size_t len);
500              
501             /* Response parsers */
502             static SV* conn_parse_metadata_response(pTHX_ ev_kafka_conn_t *self,
503             int16_t version, const char *data, size_t len);
504             static SV* conn_parse_produce_response(pTHX_ ev_kafka_conn_t *self,
505             int16_t version, const char *data, size_t len);
506             static SV* conn_parse_fetch_response(pTHX_ ev_kafka_conn_t *self,
507             int16_t version, const char *data, size_t len);
508             static SV* conn_parse_list_offsets_response(pTHX_ ev_kafka_conn_t *self,
509             int16_t version, const char *data, size_t len);
510             static SV* conn_parse_find_coordinator_response(pTHX_ ev_kafka_conn_t *self,
511             int16_t version, const char *data, size_t len);
512             static SV* conn_parse_join_group_response(pTHX_ ev_kafka_conn_t *self,
513             int16_t version, const char *data, size_t len);
514             static SV* conn_parse_sync_group_response(pTHX_ ev_kafka_conn_t *self,
515             int16_t version, const char *data, size_t len);
516             static SV* conn_parse_heartbeat_response(pTHX_ ev_kafka_conn_t *self,
517             int16_t version, const char *data, size_t len);
518             static SV* conn_parse_offset_commit_response(pTHX_ ev_kafka_conn_t *self,
519             int16_t version, const char *data, size_t len);
520             static SV* conn_parse_offset_fetch_response(pTHX_ ev_kafka_conn_t *self,
521             int16_t version, const char *data, size_t len);
522             static SV* conn_parse_leave_group_response(pTHX_ ev_kafka_conn_t *self,
523             int16_t version, const char *data, size_t len);
524             static SV* conn_parse_create_topics_response(pTHX_ ev_kafka_conn_t *self,
525             int16_t version, const char *data, size_t len);
526             static SV* conn_parse_delete_topics_response(pTHX_ ev_kafka_conn_t *self,
527             int16_t version, const char *data, size_t len);
528             static SV* conn_parse_init_producer_id_response(pTHX_ ev_kafka_conn_t *self,
529             int16_t version, const char *data, size_t len);
530             static SV* conn_parse_add_partitions_to_txn_response(pTHX_ ev_kafka_conn_t *self,
531             int16_t version, const char *data, size_t len);
532             static SV* conn_parse_end_txn_response(pTHX_ ev_kafka_conn_t *self,
533             int16_t version, const char *data, size_t len);
534             static SV* conn_parse_txn_offset_commit_response(pTHX_ ev_kafka_conn_t *self,
535             int16_t version, const char *data, size_t len);
536              
537             #ifdef HAVE_OPENSSL
538             static int is_ip_literal(const char *host);
539             #endif
540              
541             /* ================================================================
542             * I/O helpers (with optional TLS)
543             * ================================================================ */
544              
545 1           static ssize_t kf_io_read(ev_kafka_conn_t *self, void *buf, size_t len) {
546             #ifdef HAVE_OPENSSL
547 1 50         if (self->ssl) {
548 0 0         int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;
549 0           ERR_clear_error();
550 0           int ret = SSL_read(self->ssl, buf, ssl_len);
551 0 0         if (ret <= 0) {
552 0           int err = SSL_get_error(self->ssl, ret);
553 0 0         if (err == SSL_ERROR_WANT_READ) {
554 0           errno = EAGAIN;
555 0           return -1;
556             }
557 0 0         if (err == SSL_ERROR_WANT_WRITE) {
558 0           conn_start_writing(self);
559 0           errno = EAGAIN;
560 0           return -1;
561             }
562 0 0         if (err == SSL_ERROR_ZERO_RETURN) return 0;
563 0           ERR_clear_error();
564 0           errno = EIO;
565 0           return -1;
566             }
567 0           return ret;
568             }
569             #endif
570 1           return read(self->fd, buf, len);
571             }
572              
573 1           static ssize_t kf_io_write(ev_kafka_conn_t *self, const void *buf, size_t len) {
574             #ifdef HAVE_OPENSSL
575 1 50         if (self->ssl) {
576 0 0         int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;
577 0           ERR_clear_error();
578 0           int ret = SSL_write(self->ssl, buf, ssl_len);
579 0 0         if (ret <= 0) {
580 0           int err = SSL_get_error(self->ssl, ret);
581 0 0         if (err == SSL_ERROR_WANT_WRITE) {
582 0           errno = EAGAIN;
583 0           return -1;
584             }
585 0 0         if (err == SSL_ERROR_WANT_READ) {
586 0           conn_start_reading(self);
587 0           errno = EAGAIN;
588 0           return -1;
589             }
590 0           ERR_clear_error();
591 0           errno = EIO;
592 0           return -1;
593             }
594 0           return ret;
595             }
596             #endif
597 1           return write(self->fd, buf, len);
598             }
599              
600             /* ================================================================
601             * Buffer management
602             * ================================================================ */
603              
604 1           static void conn_ensure_rbuf(ev_kafka_conn_t *self, size_t need) {
605 1 50         if (self->rbuf_cap >= need) return;
606 0           size_t newcap = self->rbuf_cap * 2;
607 0 0         if (newcap < need) newcap = need;
608 0           Renew(self->rbuf, newcap, char);
609 0           self->rbuf_cap = newcap;
610             }
611              
612 1           static void conn_ensure_wbuf(ev_kafka_conn_t *self, size_t need) {
613 1 50         if (self->wbuf_cap >= need) return;
614 0           size_t newcap = self->wbuf_cap * 2;
615 0 0         if (newcap < need) newcap = need;
616 0           Renew(self->wbuf, newcap, char);
617 0           self->wbuf_cap = newcap;
618             }
619              
620             /* ================================================================
621             * Watcher control
622             * ================================================================ */
623              
624 1           static void conn_start_reading(ev_kafka_conn_t *self) {
625 1 50         if (!self->reading && self->fd >= 0) {
    50          
626 1           ev_io_start(self->loop, &self->rio);
627 1           self->reading = 1;
628             }
629 1           }
630              
631 9           static void conn_stop_reading(ev_kafka_conn_t *self) {
632 9 100         if (self->reading) {
633 1           ev_io_stop(self->loop, &self->rio);
634 1           self->reading = 0;
635             }
636 9           }
637              
638 3           static void conn_start_writing(ev_kafka_conn_t *self) {
639 3 50         if (!self->writing && self->fd >= 0) {
    50          
640 3           ev_io_start(self->loop, &self->wio);
641 3           self->writing = 1;
642             }
643 3           }
644              
645 12           static void conn_stop_writing(ev_kafka_conn_t *self) {
646 12 100         if (self->writing) {
647 3           ev_io_stop(self->loop, &self->wio);
648 3           self->writing = 0;
649             }
650 12           }
651              
652             /* ================================================================
653             * Callback invocation helpers
654             * ================================================================ */
655              
656 1           static void conn_emit_error(pTHX_ ev_kafka_conn_t *self, const char *msg) {
657 1 50         if (!self->on_error)
658 0           croak("EV::Kafka::Conn: %s", msg);
659              
660 1           self->callback_depth++;
661             {
662 1           dSP;
663 1           ENTER;
664 1           SAVETMPS;
665 1 50         PUSHMARK(SP);
666 1 50         XPUSHs(sv_2mortal(newSVpv(msg, 0)));
667 1           PUTBACK;
668 1           call_sv(self->on_error, G_DISCARD | G_EVAL);
669 1 50         if (SvTRUE(ERRSV)) {
    50          
670 0 0         warn("EV::Kafka::Conn: on_error callback error: %s", SvPV_nolen(ERRSV));
671 0 0         sv_setsv(ERRSV, &PL_sv_undef);
672             }
673 1 50         FREETMPS;
674 1           LEAVE;
675             }
676 1           self->callback_depth--;
677 1           }
678              
679 1           static void conn_emit_connect(pTHX_ ev_kafka_conn_t *self) {
680 1 50         if (!self->on_connect) return;
681              
682 1           self->callback_depth++;
683             {
684 1           dSP;
685 1           ENTER;
686 1           SAVETMPS;
687 1 50         PUSHMARK(SP);
688 1           PUTBACK;
689 1           call_sv(self->on_connect, G_DISCARD | G_EVAL);
690 1 50         if (SvTRUE(ERRSV)) {
    50          
691 0 0         warn("EV::Kafka::Conn: on_connect callback error: %s", SvPV_nolen(ERRSV));
692 0 0         sv_setsv(ERRSV, &PL_sv_undef);
693             }
694 1 50         FREETMPS;
695 1           LEAVE;
696             }
697 1           self->callback_depth--;
698             }
699              
700 2           static void conn_emit_disconnect(pTHX_ ev_kafka_conn_t *self) {
701 2 50         if (!self->on_disconnect) return;
702              
703 0           self->callback_depth++;
704             {
705 0           dSP;
706 0           ENTER;
707 0           SAVETMPS;
708 0 0         PUSHMARK(SP);
709 0           PUTBACK;
710 0           call_sv(self->on_disconnect, G_DISCARD | G_EVAL);
711 0 0         if (SvTRUE(ERRSV)) {
    0          
712 0 0         warn("EV::Kafka::Conn: on_disconnect callback error: %s", SvPV_nolen(ERRSV));
713 0 0         sv_setsv(ERRSV, &PL_sv_undef);
714             }
715 0 0         FREETMPS;
716 0           LEAVE;
717             }
718 0           self->callback_depth--;
719             }
720              
721             /* Invoke a command callback: cb->(result, error) */
722 0           static void conn_invoke_cb(pTHX_ ev_kafka_conn_t *self, SV *cb, SV *result, SV *error) {
723 0 0         if (!cb) return;
724              
725 0           self->callback_depth++;
726             {
727 0           dSP;
728 0           ENTER;
729 0           SAVETMPS;
730 0 0         PUSHMARK(SP);
731 0 0         EXTEND(SP, 2);
732 0 0         PUSHs(result ? result : &PL_sv_undef);
733 0 0         PUSHs(error ? error : &PL_sv_undef);
734 0           PUTBACK;
735 0           call_sv(cb, G_DISCARD | G_EVAL);
736 0 0         if (SvTRUE(ERRSV)) {
    0          
737 0 0         warn("EV::Kafka::Conn: callback error: %s", SvPV_nolen(ERRSV));
738 0 0         sv_setsv(ERRSV, &PL_sv_undef);
739             }
740 0 0         FREETMPS;
741 0           LEAVE;
742             }
743 0           self->callback_depth--;
744             }
745              
746 6           static int conn_check_destroyed(ev_kafka_conn_t *self) {
747 6           return self->magic != KF_MAGIC_ALIVE;
748             }
749              
750             /* ================================================================
751             * Connection cleanup
752             * ================================================================ */
753              
754 9           static void conn_cleanup(pTHX_ ev_kafka_conn_t *self) {
755 9           conn_stop_reading(self);
756 9           conn_stop_writing(self);
757 9 50         if (self->timing) {
758 0           ev_timer_stop(self->loop, &self->timer);
759 0           self->timing = 0;
760             }
761             #ifdef HAVE_OPENSSL
762 9 50         if (self->ssl) {
763 0           SSL_free(self->ssl);
764 0           self->ssl = NULL;
765             }
766 9 50         if (self->ssl_ctx) {
767 0           SSL_CTX_free(self->ssl_ctx);
768 0           self->ssl_ctx = NULL;
769             }
770             #endif
771 9 100         if (self->fd >= 0) {
772 2           close(self->fd);
773 2           self->fd = -1;
774             }
775 9           self->state = CONN_DISCONNECTED;
776 9           }
777              
778 9           static void conn_cancel_pending(pTHX_ ev_kafka_conn_t *self, const char *err) {
779 9           int in_destruct = KF_IN_GLOBAL_DESTRUCT();
780 9 50         SV *err_sv = in_destruct ? NULL : newSVpv(err, 0);
781             ngx_queue_t *q;
782              
783 9 50         while (!ngx_queue_empty(&self->cb_queue)) {
784 0           q = ngx_queue_head(&self->cb_queue);
785 0           ngx_queue_remove(q);
786 0           ev_kafka_conn_cb_t *cbt = ngx_queue_data(q, ev_kafka_conn_cb_t, queue);
787 0           self->pending_count--;
788 0 0         if (cbt->cb && !cbt->internal && !in_destruct) {
    0          
    0          
789 0           conn_invoke_cb(aTHX_ self, cbt->cb, NULL, sv_2mortal(newSVsv(err_sv)));
790 0 0         if (conn_check_destroyed(self)) {
791 0           SvREFCNT_dec(cbt->cb);
792 0           Safefree(cbt);
793 0           SvREFCNT_dec(err_sv);
794 0           return;
795             }
796             }
797 0 0         if (cbt->cb) SvREFCNT_dec(cbt->cb);
798 0           Safefree(cbt);
799             }
800 9 50         if (err_sv) SvREFCNT_dec(err_sv);
801             }
802              
803 2           static void conn_handle_disconnect(pTHX_ ev_kafka_conn_t *self, const char *reason) {
804 2           conn_cleanup(aTHX_ self);
805              
806 2           conn_emit_disconnect(aTHX_ self);
807 2 50         if (conn_check_destroyed(self)) return;
808              
809 2           conn_cancel_pending(aTHX_ self, reason);
810 2 50         if (conn_check_destroyed(self)) return;
811              
812 2 100         if (!self->intentional_disconnect && self->auto_reconnect) {
    50          
813 0           conn_schedule_reconnect(aTHX_ self);
814             }
815             }
816              
817             /* ================================================================
818             * Reconnection
819             * ================================================================ */
820              
821 0           static void conn_reconnect_timer_cb(EV_P_ ev_timer *w, int revents) {
822 0           ev_kafka_conn_t *self = (ev_kafka_conn_t *)((char *)w - offsetof(ev_kafka_conn_t, reconnect_timer));
823             dTHX;
824             (void)loop;
825             (void)revents;
826              
827 0           self->reconnect_timing = 0;
828 0 0         if (self->magic != KF_MAGIC_ALIVE) return;
829 0 0         if (self->state != CONN_DISCONNECTED) return;
830 0 0         if (!self->host) return;
831              
832 0           conn_start_connect(aTHX_ self, self->host, self->port, 10.0);
833             }
834              
835 0           static void conn_schedule_reconnect(pTHX_ ev_kafka_conn_t *self) {
836 0 0         if (self->reconnect_timing) return;
837 0           double delay = self->reconnect_delay_ms / 1000.0;
838 0 0         if (delay < 0.01) delay = 1.0;
839 0           ev_timer_init(&self->reconnect_timer, conn_reconnect_timer_cb, delay, 0.0);
840 0           ev_timer_start(self->loop, &self->reconnect_timer);
841 0           self->reconnect_timing = 1;
842             }
843              
844             /* ================================================================
845             * Request framing
846             * ================================================================ */
847              
848             /* Build request header + body, append to wbuf, enqueue callback.
849             * For api_version >= threshold, uses compact header (v2).
850             * ApiVersions v3+ uses flexible (v1) request header.
851             * Returns correlation_id.
852             */
853 1           static int32_t conn_send_request(pTHX_ ev_kafka_conn_t *self,
854             int16_t api_key, int16_t api_version, kf_buf_t *body, SV *cb,
855             int internal, int no_response)
856             {
857 1           int32_t corr_id = self->next_correlation_id++;
858             kf_buf_t hdr;
859 1           kf_buf_init(&hdr);
860              
861             /* Request header v1 (non-flexible): api_key, api_version, correlation_id, client_id */
862             /* Request header v2 (flexible): same + tagged_fields */
863             /* ApiVersions v3+ uses header v2 (flexible) */
864 1 50         int flexible = (api_key == API_API_VERSIONS && api_version >= 3);
    50          
865              
866 1           kf_buf_append_i16(&hdr, api_key);
867 1           kf_buf_append_i16(&hdr, api_version);
868 1           kf_buf_append_i32(&hdr, corr_id);
869              
870 1 50         if (flexible) {
871             /* compact string for client_id */
872 0           kf_buf_append_compact_string(&hdr, self->client_id, self->client_id_len);
873 0           kf_buf_append_tagged_fields(&hdr);
874             } else {
875 1           kf_buf_append_nullable_string(&hdr, self->client_id, self->client_id_len);
876             }
877              
878             /* Total size = header + body */
879 1           size_t raw_size = hdr.len + body->len;
880 1 50         if (raw_size > (size_t)INT32_MAX) {
881 0           kf_buf_free(&hdr);
882 0           croak("request too large");
883             }
884 1           int32_t total_size = (int32_t)raw_size;
885              
886             /* Compact wbuf if sent prefix wastes significant space */
887 1 50         if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) {
    0          
888 0           self->wbuf_len -= self->wbuf_off;
889 0 0         if (self->wbuf_len > 0)
890 0           memmove(self->wbuf, self->wbuf + self->wbuf_off, self->wbuf_len);
891 0           self->wbuf_off = 0;
892             }
893              
894             /* Append to wbuf: [size:i32][header][body] */
895 1           conn_ensure_wbuf(self, self->wbuf_len + 4 + total_size);
896             {
897 1           uint32_t sz = htonl((uint32_t)total_size);
898 1           memcpy(self->wbuf + self->wbuf_len, &sz, 4);
899 1           self->wbuf_len += 4;
900             }
901 1           Copy(hdr.data, self->wbuf + self->wbuf_len, hdr.len, char);
902 1           self->wbuf_len += hdr.len;
903 1           Copy(body->data, self->wbuf + self->wbuf_len, body->len, char);
904 1           self->wbuf_len += body->len;
905              
906 1           kf_buf_free(&hdr);
907              
908             /* Enqueue callback (skip for no-response requests like acks=0) */
909 1 50         if (!no_response) {
910             ev_kafka_conn_cb_t *cbt;
911 1           Newxz(cbt, 1, ev_kafka_conn_cb_t);
912 1           cbt->correlation_id = corr_id;
913 1           cbt->api_key = api_key;
914 1           cbt->api_version = api_version;
915 1           cbt->internal = internal;
916 1 50         if (cb) {
917 0           cbt->cb = cb;
918 0           SvREFCNT_inc(cb);
919             }
920 1           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
921 1           self->pending_count++;
922             }
923              
924 1           conn_start_writing(self);
925              
926 1           return corr_id;
927             }
928              
929             /* ================================================================
930             * ApiVersions (API 18)
931             * ================================================================ */
932              
933 1           static void conn_send_api_versions(pTHX_ ev_kafka_conn_t *self) {
934             kf_buf_t body;
935 1           kf_buf_init(&body);
936              
937             /* ApiVersions v0: empty body, most compatible */
938 1           self->state = CONN_API_VERSIONS;
939 1           conn_send_request(aTHX_ self, API_API_VERSIONS, 0, &body, NULL, 1, 0);
940 1           kf_buf_free(&body);
941 1           }
942              
943 1           static void conn_parse_api_versions_response(pTHX_ ev_kafka_conn_t *self,
944             const char *data, size_t len)
945             {
946 1           const char *p = data;
947 1           const char *end = data + len;
948             int i;
949              
950             /* Initialize all as unsupported */
951 65 100         for (i = 0; i < API_VERSIONS_MAX_KEY; i++)
952 64           self->api_versions[i] = -1;
953              
954 1 50         if (end - p < 2) goto err;
955 1           int16_t error_code = kf_read_i16(p); p += 2;
956 1 50         if (error_code != 0) {
957             char errbuf[128];
958 0           snprintf(errbuf, sizeof(errbuf), "ApiVersions error: %d", error_code);
959 0           conn_emit_error(aTHX_ self, errbuf);
960 0 0         if (conn_check_destroyed(self)) return;
961 0           conn_handle_disconnect(aTHX_ self, errbuf);
962 0           return;
963             }
964              
965             /* v0 response: array(api_key:i16, min_version:i16, max_version:i16) */
966 1 50         if (end - p < 4) goto err;
967 1           int32_t count = kf_read_i32(p); p += 4;
968              
969 3 100         for (i = 0; i < count; i++) {
970 2 50         if (end - p < 6) goto err;
971 2           int16_t key = kf_read_i16(p); p += 2;
972 2           /* int16_t min_version = kf_read_i16(p); */ p += 2;
973 2           int16_t max_version = kf_read_i16(p); p += 2;
974              
975 2 50         if (key >= 0 && key < API_VERSIONS_MAX_KEY)
    50          
976 2           self->api_versions[key] = max_version;
977             }
978              
979 1           self->api_versions_known = 1;
980              
981             /* Handshake complete (or continue to SASL) */
982 1 50         if (self->sasl_mechanism) {
983 0           self->state = CONN_SASL_HANDSHAKE;
984 0           conn_send_sasl_handshake(aTHX_ self);
985             } else {
986 1           self->state = CONN_READY;
987 1           conn_emit_connect(aTHX_ self);
988             }
989 1           return;
990              
991 0           err:
992 0           conn_emit_error(aTHX_ self, "malformed ApiVersions response");
993 0 0         if (conn_check_destroyed(self)) return;
994 0           conn_handle_disconnect(aTHX_ self, "malformed ApiVersions response");
995             }
996              
997             /* ================================================================
998             * SASL Handshake (API 17) + Authenticate (API 36)
999             * ================================================================ */
1000              
1001 0           static void conn_send_sasl_handshake(pTHX_ ev_kafka_conn_t *self) {
1002             kf_buf_t body;
1003 0           kf_buf_init(&body);
1004              
1005             /* SaslHandshake v1: mechanism (STRING) */
1006 0           STRLEN mech_len = strlen(self->sasl_mechanism);
1007 0           kf_buf_append_string(&body, self->sasl_mechanism, (int16_t)mech_len);
1008              
1009 0           self->state = CONN_SASL_HANDSHAKE;
1010 0           conn_send_request(aTHX_ self, API_SASL_HANDSHAKE, 1, &body, NULL, 1, 0);
1011 0           kf_buf_free(&body);
1012 0           }
1013              
1014 0           static void conn_parse_sasl_handshake_response(pTHX_ ev_kafka_conn_t *self,
1015             const char *data, size_t len)
1016             {
1017 0           const char *p = data;
1018 0           const char *end = data + len;
1019              
1020 0 0         if (end - p < 2) goto err;
1021 0           int16_t error_code = kf_read_i16(p); p += 2;
1022              
1023             /* skip mechanisms array */
1024 0 0         if (end - p < 4) goto err;
1025 0           int32_t count = kf_read_i32(p); p += 4;
1026             int32_t i;
1027 0 0         for (i = 0; i < count; i++) {
1028             const char *s; int16_t slen;
1029 0           int n = kf_read_string(p, end, &s, &slen);
1030 0 0         if (n < 0) goto err;
1031 0           p += n;
1032             }
1033              
1034 0 0         if (error_code != 0) {
1035 0           conn_emit_error(aTHX_ self, "SASL handshake failed: mechanism not supported");
1036 0 0         if (conn_check_destroyed(self)) return;
1037 0           conn_handle_disconnect(aTHX_ self, "SASL handshake failed");
1038 0           return;
1039             }
1040              
1041             /* Proceed to authenticate */
1042 0           conn_send_sasl_authenticate(aTHX_ self);
1043 0           return;
1044              
1045 0           err:
1046 0           conn_emit_error(aTHX_ self, "malformed SaslHandshake response");
1047 0 0         if (conn_check_destroyed(self)) return;
1048 0           conn_handle_disconnect(aTHX_ self, "malformed SaslHandshake response");
1049             }
1050              
1051             /* SCRAM state for multi-step SASL */
1052             #define SCRAM_STEP_CLIENT_FIRST 0
1053             #define SCRAM_STEP_CLIENT_FINAL 1
1054             #define SCRAM_STEP_DONE 2
1055              
1056 0           static void conn_send_sasl_authenticate(pTHX_ ev_kafka_conn_t *self) {
1057             kf_buf_t body;
1058 0           kf_buf_init(&body);
1059              
1060 0 0         if (self->sasl_mechanism && strcmp(self->sasl_mechanism, "PLAIN") == 0) {
    0          
1061             /* PLAIN: \0username\0password */
1062 0 0         STRLEN ulen = self->sasl_username ? strlen(self->sasl_username) : 0;
1063 0 0         STRLEN plen = self->sasl_password ? strlen(self->sasl_password) : 0;
1064 0           int32_t auth_len = 1 + (int32_t)ulen + 1 + (int32_t)plen;
1065              
1066 0           kf_buf_append_i32(&body, auth_len);
1067 0           kf_buf_append_i8(&body, 0);
1068 0 0         if (ulen > 0) kf_buf_append(&body, self->sasl_username, ulen);
1069 0           kf_buf_append_i8(&body, 0);
1070 0 0         if (plen > 0) kf_buf_append(&body, self->sasl_password, plen);
1071             }
1072             #ifdef HAVE_OPENSSL
1073 0 0         else if (self->sasl_mechanism &&
1074 0 0         (strcmp(self->sasl_mechanism, "SCRAM-SHA-256") == 0 ||
1075 0 0         strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0)) {
1076             /* SCRAM client-first-message: n,,n=,r= */
1077             char nonce[33];
1078             {
1079             unsigned char rnd[16];
1080             int i;
1081 0           RAND_bytes(rnd, 16);
1082 0 0         for (i = 0; i < 16; i++)
1083 0           snprintf(nonce + i*2, 3, "%02x", rnd[i]);
1084 0           nonce[32] = '\0';
1085             }
1086              
1087             /* save nonce for later steps */
1088 0 0         if (self->scram_nonce) Safefree(self->scram_nonce);
1089 0           self->scram_nonce = savepv(nonce);
1090 0           self->scram_step = SCRAM_STEP_CLIENT_FIRST;
1091              
1092 0 0         if (!self->sasl_username) {
1093 0           conn_emit_error(aTHX_ self, "SCRAM: username required");
1094 0           kf_buf_free(&body);
1095 0           return;
1096             }
1097             kf_buf_t msg;
1098 0           kf_buf_init(&msg);
1099 0           kf_buf_append(&msg, "n,,n=", 5);
1100 0           kf_buf_append(&msg, self->sasl_username, strlen(self->sasl_username));
1101 0           kf_buf_append(&msg, ",r=", 3);
1102 0           kf_buf_append(&msg, nonce, 32);
1103              
1104             /* save client-first-message-bare for later binding */
1105 0 0         if (self->scram_client_first) Safefree(self->scram_client_first);
1106             /* bare = after "n,," */
1107 0           self->scram_client_first = savepvn(msg.data + 3, msg.len - 3);
1108 0           self->scram_client_first_len = msg.len - 3;
1109              
1110 0           kf_buf_append_i32(&body, (int32_t)msg.len);
1111 0           kf_buf_append(&body, msg.data, msg.len);
1112 0           kf_buf_free(&msg);
1113             }
1114             #endif
1115             else {
1116 0           conn_emit_error(aTHX_ self, "unsupported SASL mechanism");
1117 0           kf_buf_free(&body);
1118 0           return;
1119             }
1120              
1121 0           self->state = CONN_SASL_AUTH;
1122             {
1123 0           int16_t ver = self->api_versions[API_SASL_AUTHENTICATE];
1124 0 0         if (ver < 0) ver = 1;
1125 0 0         if (ver > 2) ver = 2;
1126 0           conn_send_request(aTHX_ self, API_SASL_AUTHENTICATE, ver, &body, NULL, 1, 0);
1127             }
1128 0           kf_buf_free(&body);
1129             }
1130              
1131 0           static void conn_parse_sasl_authenticate_response(pTHX_ ev_kafka_conn_t *self,
1132             const char *data, size_t len)
1133             {
1134 0           const char *p = data;
1135 0           const char *end = data + len;
1136              
1137 0 0         if (end - p < 2) goto err;
1138 0           int16_t error_code = kf_read_i16(p); p += 2;
1139              
1140 0           const char *errmsg_str = NULL;
1141 0           int16_t errmsg_len = 0;
1142             {
1143 0           int n = kf_read_string(p, end, &errmsg_str, &errmsg_len);
1144 0 0         if (n < 0) goto err;
1145 0           p += n;
1146             }
1147              
1148             /* auth_bytes */
1149 0           const char *auth_data = NULL;
1150 0           int32_t auth_data_len = 0;
1151 0 0         if (end - p >= 4) {
1152 0           auth_data_len = kf_read_i32(p); p += 4;
1153 0 0         if (auth_data_len > 0 && end - p >= auth_data_len) {
    0          
1154 0           auth_data = p;
1155 0           p += auth_data_len;
1156             }
1157             }
1158              
1159 0 0         if (error_code != 0) {
1160             char errbuf[512];
1161 0 0         if (errmsg_str && errmsg_len > 0)
    0          
1162 0           snprintf(errbuf, sizeof(errbuf), "SASL auth failed: %.*s", (int)errmsg_len, errmsg_str);
1163             else
1164 0           snprintf(errbuf, sizeof(errbuf), "SASL auth failed: error %d", error_code);
1165 0           conn_emit_error(aTHX_ self, errbuf);
1166 0 0         if (conn_check_destroyed(self)) return;
1167 0           conn_handle_disconnect(aTHX_ self, "SASL auth failed");
1168 0           return;
1169             }
1170              
1171             #ifdef HAVE_OPENSSL
1172             /* SCRAM multi-step handling */
1173 0 0         if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) {
    0          
    0          
1174             /* Server-first-message: r=,s=,i= */
1175             /* Parse server response, compute proof, send client-final */
1176 0           const char *server_nonce = NULL;
1177 0           size_t server_nonce_len = 0;
1178 0           const char *salt_b64 = NULL;
1179 0           size_t salt_b64_len = 0;
1180 0           int iterations = 0;
1181             {
1182 0           const char *sp = auth_data;
1183 0           const char *se = auth_data + auth_data_len;
1184 0 0         while (sp < se) {
1185 0 0         if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') {
    0          
    0          
1186 0           sp += 2; server_nonce = sp;
1187 0 0         while (sp < se && *sp != ',') sp++;
    0          
1188 0           server_nonce_len = sp - server_nonce;
1189 0 0         } else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') {
    0          
    0          
1190 0           sp += 2; salt_b64 = sp;
1191 0 0         while (sp < se && *sp != ',') sp++;
    0          
1192 0           salt_b64_len = sp - salt_b64;
1193 0 0         } else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') {
    0          
    0          
1194 0           sp += 2;
1195 0           iterations = atoi(sp);
1196 0 0         while (sp < se && *sp != ',') sp++;
    0          
1197             }
1198 0 0         if (sp < se && *sp == ',') sp++;
    0          
1199 0           else sp++;
1200             }
1201             }
1202              
1203 0 0         if (!server_nonce || !salt_b64 || iterations <= 0) {
    0          
    0          
1204 0           conn_emit_error(aTHX_ self, "SCRAM: malformed server-first-message");
1205 0 0         if (conn_check_destroyed(self)) return;
1206 0           conn_handle_disconnect(aTHX_ self, "SCRAM auth failed");
1207 0           return;
1208             }
1209              
1210             /* RFC 5802: server nonce must start with client nonce */
1211 0 0         if (server_nonce_len < 32 ||
1212 0 0         memcmp(server_nonce, self->scram_nonce, 32) != 0) {
1213 0           conn_emit_error(aTHX_ self, "SCRAM: server nonce mismatch");
1214 0 0         if (conn_check_destroyed(self)) return;
1215 0           conn_handle_disconnect(aTHX_ self, "SCRAM auth failed");
1216 0           return;
1217             }
1218              
1219 0           int is_sha512 = (strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0);
1220 0 0         const EVP_MD *md = is_sha512 ? EVP_sha512() : EVP_sha256();
1221 0 0         int digest_len = is_sha512 ? 64 : 32;
1222              
1223             /* Decode salt from base64 */
1224             unsigned char salt[128];
1225             int salt_len;
1226             {
1227 0           BIO *b64 = BIO_new(BIO_f_base64());
1228 0           BIO *bmem = BIO_new_mem_buf(salt_b64, (int)salt_b64_len);
1229 0           bmem = BIO_push(b64, bmem);
1230 0           BIO_set_flags(bmem, BIO_FLAGS_BASE64_NO_NL);
1231 0           salt_len = BIO_read(bmem, salt, sizeof(salt));
1232 0           BIO_free_all(bmem);
1233 0 0         if (salt_len <= 0) {
1234 0           conn_emit_error(aTHX_ self, "SCRAM: bad salt");
1235 0 0         if (conn_check_destroyed(self)) return;
1236 0           conn_handle_disconnect(aTHX_ self, "SCRAM auth failed");
1237 0           return;
1238             }
1239             }
1240              
1241             /* SaltedPassword = Hi(password, salt, iterations) using PBKDF2 */
1242             unsigned char salted_password[64];
1243 0           PKCS5_PBKDF2_HMAC(self->sasl_password, strlen(self->sasl_password),
1244             salt, salt_len, iterations, md, digest_len, salted_password);
1245              
1246             /* ClientKey = HMAC(SaltedPassword, "Client Key") */
1247             unsigned char client_key[64];
1248 0           unsigned int ck_len = digest_len;
1249 0           HMAC(md, salted_password, digest_len,
1250             (unsigned char *)"Client Key", 10, client_key, &ck_len);
1251              
1252             /* ServerKey = HMAC(SaltedPassword, "Server Key") — saved for
1253             * server-final-message verification. */
1254 0           unsigned int sk_hmac_len = digest_len;
1255 0           HMAC(md, salted_password, digest_len,
1256             (unsigned char *)"Server Key", 10,
1257 0           self->scram_server_key, &sk_hmac_len);
1258 0           self->scram_server_key_len = (int)sk_hmac_len;
1259              
1260             /* StoredKey = H(ClientKey) */
1261             unsigned char stored_key[64];
1262             {
1263 0           EVP_MD_CTX *ctx = EVP_MD_CTX_new();
1264             unsigned int sk_len;
1265 0           EVP_DigestInit_ex(ctx, md, NULL);
1266 0           EVP_DigestUpdate(ctx, client_key, digest_len);
1267 0           EVP_DigestFinal_ex(ctx, stored_key, &sk_len);
1268 0           EVP_MD_CTX_free(ctx);
1269             }
1270              
1271             /* AuthMessage = client-first-bare + "," + server-first + "," + client-final-without-proof */
1272 0           char channel_binding_b64[] = "biws"; /* base64("n,,") */
1273             kf_buf_t auth_msg;
1274 0           kf_buf_init(&auth_msg);
1275 0           kf_buf_append(&auth_msg, self->scram_client_first, self->scram_client_first_len);
1276 0           kf_buf_append(&auth_msg, ",", 1);
1277 0           kf_buf_append(&auth_msg, auth_data, auth_data_len);
1278 0           kf_buf_append(&auth_msg, ",c=", 3);
1279 0           kf_buf_append(&auth_msg, channel_binding_b64, 4);
1280 0           kf_buf_append(&auth_msg, ",r=", 3);
1281 0           kf_buf_append(&auth_msg, server_nonce, server_nonce_len);
1282              
1283             /* ClientSignature = HMAC(StoredKey, AuthMessage) */
1284             unsigned char client_sig[64];
1285 0           unsigned int cs_len = digest_len;
1286 0           HMAC(md, stored_key, digest_len,
1287 0           (unsigned char *)auth_msg.data, auth_msg.len, client_sig, &cs_len);
1288              
1289             /* ClientProof = ClientKey XOR ClientSignature */
1290             unsigned char proof[64];
1291             int di;
1292 0 0         for (di = 0; di < digest_len; di++)
1293 0           proof[di] = client_key[di] ^ client_sig[di];
1294              
1295             /* Save AuthMessage for server-signature verification at step 2. */
1296 0 0         if (self->scram_auth_message) Safefree(self->scram_auth_message);
1297 0           self->scram_auth_message = savepvn(auth_msg.data, auth_msg.len);
1298 0           self->scram_auth_message_len = auth_msg.len;
1299              
1300 0           kf_buf_free(&auth_msg);
1301              
1302             /* Base64 encode proof */
1303             char proof_b64[256];
1304             {
1305 0           BIO *b64 = BIO_new(BIO_f_base64());
1306 0           BIO *bmem = BIO_new(BIO_s_mem());
1307 0           b64 = BIO_push(b64, bmem);
1308 0           BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
1309 0           BIO_write(b64, proof, digest_len);
1310 0           BIO_flush(b64);
1311             BUF_MEM *bptr;
1312 0           BIO_get_mem_ptr(b64, &bptr);
1313 0 0         int plen = bptr->length < 255 ? (int)bptr->length : 255;
1314 0           memcpy(proof_b64, bptr->data, plen);
1315 0           proof_b64[plen] = '\0';
1316 0           BIO_free_all(b64);
1317             }
1318              
1319             /* Build client-final-message: c=biws,r=,p= */
1320             kf_buf_t final_msg;
1321 0           kf_buf_init(&final_msg);
1322 0           kf_buf_append(&final_msg, "c=", 2);
1323 0           kf_buf_append(&final_msg, channel_binding_b64, 4);
1324 0           kf_buf_append(&final_msg, ",r=", 3);
1325 0           kf_buf_append(&final_msg, server_nonce, server_nonce_len);
1326 0           kf_buf_append(&final_msg, ",p=", 3);
1327 0           kf_buf_append(&final_msg, proof_b64, strlen(proof_b64));
1328              
1329             /* Send client-final via SaslAuthenticate */
1330             kf_buf_t body;
1331 0           kf_buf_init(&body);
1332 0           kf_buf_append_i32(&body, (int32_t)final_msg.len);
1333 0           kf_buf_append(&body, final_msg.data, final_msg.len);
1334              
1335 0           self->scram_step = SCRAM_STEP_CLIENT_FINAL;
1336             {
1337 0           int16_t ver = self->api_versions[API_SASL_AUTHENTICATE];
1338 0 0         if (ver < 0) ver = 1;
1339 0 0         if (ver > 2) ver = 2;
1340 0           conn_send_request(aTHX_ self, API_SASL_AUTHENTICATE, ver, &body, NULL, 1, 0);
1341             }
1342 0           kf_buf_free(&body);
1343 0           kf_buf_free(&final_msg);
1344 0           return;
1345             }
1346              
1347 0 0         if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FINAL) {
    0          
1348             /* Server-final-message: v=
1349             * ServerSignature = HMAC(ServerKey, AuthMessage) — RFC 5802. */
1350 0 0         if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') {
    0          
    0          
    0          
1351 0           conn_emit_error(aTHX_ self, "SCRAM: missing server signature");
1352 0 0         if (conn_check_destroyed(self)) return;
1353 0           conn_handle_disconnect(aTHX_ self, "SCRAM verify failed");
1354 0           return;
1355             }
1356 0           const char *recv_b64 = auth_data + 2;
1357 0           int recv_b64_len = (int)(auth_data_len - 2);
1358              
1359             /* Compute expected server signature. */
1360 0           const EVP_MD *md = NULL;
1361 0 0         if (strcmp(self->sasl_mechanism, "SCRAM-SHA-256") == 0) md = EVP_sha256();
1362 0 0         else if (strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0) md = EVP_sha512();
1363 0 0         if (!md || self->scram_server_key_len <= 0 || !self->scram_auth_message) {
    0          
    0          
1364 0           conn_emit_error(aTHX_ self, "SCRAM: missing state for verification");
1365 0 0         if (conn_check_destroyed(self)) return;
1366 0           conn_handle_disconnect(aTHX_ self, "SCRAM verify failed");
1367 0           return;
1368             }
1369             unsigned char expected[64];
1370 0           unsigned int expected_len = (unsigned int)self->scram_server_key_len;
1371 0           HMAC(md, self->scram_server_key, self->scram_server_key_len,
1372 0           (unsigned char *)self->scram_auth_message,
1373             self->scram_auth_message_len, expected, &expected_len);
1374              
1375             /* Decode the received base64 signature. */
1376             unsigned char recv_sig[64];
1377 0           int recv_sig_len = 0;
1378             {
1379 0           BIO *b64 = BIO_new(BIO_f_base64());
1380 0           BIO *bmem = BIO_new_mem_buf(recv_b64, recv_b64_len);
1381 0           BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
1382 0           b64 = BIO_push(b64, bmem);
1383 0           recv_sig_len = BIO_read(b64, recv_sig, (int)sizeof(recv_sig));
1384 0           BIO_free_all(b64);
1385             }
1386              
1387 0           int ok = (recv_sig_len == (int)expected_len)
1388 0 0         && (CRYPTO_memcmp(recv_sig, expected, expected_len) == 0);
    0          
1389              
1390             /* Wipe ServerKey/AuthMessage now that verification is done. */
1391 0           OPENSSL_cleanse(self->scram_server_key, sizeof(self->scram_server_key));
1392 0           self->scram_server_key_len = 0;
1393 0 0         if (self->scram_auth_message) {
1394 0           OPENSSL_cleanse(self->scram_auth_message,
1395             self->scram_auth_message_len);
1396 0           Safefree(self->scram_auth_message);
1397 0           self->scram_auth_message = NULL;
1398 0           self->scram_auth_message_len = 0;
1399             }
1400              
1401 0 0         if (!ok) {
1402 0           conn_emit_error(aTHX_ self, "SCRAM: server signature mismatch");
1403 0 0         if (conn_check_destroyed(self)) return;
1404 0           conn_handle_disconnect(aTHX_ self, "SCRAM verify failed");
1405 0           return;
1406             }
1407 0           self->scram_step = SCRAM_STEP_DONE;
1408             /* fall through to CONN_READY */
1409             }
1410             #endif
1411              
1412             /* Auth success — connection is ready */
1413 0           self->state = CONN_READY;
1414 0           conn_emit_connect(aTHX_ self);
1415 0           return;
1416              
1417 0           err:
1418 0           conn_emit_error(aTHX_ self, "malformed SaslAuthenticate response");
1419 0 0         if (conn_check_destroyed(self)) return;
1420 0           conn_handle_disconnect(aTHX_ self, "malformed SaslAuthenticate response");
1421             }
1422              
1423             /* ================================================================
1424             * Response dispatch
1425             * ================================================================ */
1426              
1427 1           static void conn_dispatch_response(pTHX_ ev_kafka_conn_t *self,
1428             ev_kafka_conn_cb_t *cbt, const char *data, size_t len)
1429             {
1430             /* Internal handshake responses */
1431 1 50         if (cbt->internal) {
1432 1           switch (cbt->api_key) {
1433 1           case API_API_VERSIONS:
1434 1           conn_parse_api_versions_response(aTHX_ self, data, len);
1435 1           break;
1436 0           case API_SASL_HANDSHAKE:
1437 0           conn_parse_sasl_handshake_response(aTHX_ self, data, len);
1438 0           break;
1439 0           case API_SASL_AUTHENTICATE:
1440 0           conn_parse_sasl_authenticate_response(aTHX_ self, data, len);
1441 0           break;
1442 0           default: break;
1443             }
1444 1           return;
1445             }
1446              
1447             /* User-facing responses: parse and invoke callback */
1448 0 0         if (!cbt->cb) return;
1449              
1450 0           switch (cbt->api_key) {
1451 0           case API_METADATA: {
1452             /* Parse metadata response and return as Perl hash */
1453 0           SV *result = conn_parse_metadata_response(aTHX_ self, cbt->api_version, data, len);
1454 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1455 0           break;
1456             }
1457 0           case API_PRODUCE: {
1458 0           SV *result = conn_parse_produce_response(aTHX_ self, cbt->api_version, data, len);
1459 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1460 0           break;
1461             }
1462 0           case API_FETCH: {
1463 0           SV *result = conn_parse_fetch_response(aTHX_ self, cbt->api_version, data, len);
1464 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1465 0           break;
1466             }
1467 0           case API_LIST_OFFSETS: {
1468 0           SV *result = conn_parse_list_offsets_response(aTHX_ self, cbt->api_version, data, len);
1469 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1470 0           break;
1471             }
1472 0           case API_FIND_COORDINATOR: {
1473 0           SV *result = conn_parse_find_coordinator_response(aTHX_ self, cbt->api_version, data, len);
1474 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1475 0           break;
1476             }
1477 0           case API_JOIN_GROUP: {
1478 0           SV *result = conn_parse_join_group_response(aTHX_ self, cbt->api_version, data, len);
1479 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1480 0           break;
1481             }
1482 0           case API_SYNC_GROUP: {
1483 0           SV *result = conn_parse_sync_group_response(aTHX_ self, cbt->api_version, data, len);
1484 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1485 0           break;
1486             }
1487 0           case API_HEARTBEAT: {
1488 0           SV *result = conn_parse_heartbeat_response(aTHX_ self, cbt->api_version, data, len);
1489 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1490 0           break;
1491             }
1492 0           case API_OFFSET_COMMIT: {
1493 0           SV *result = conn_parse_offset_commit_response(aTHX_ self, cbt->api_version, data, len);
1494 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1495 0           break;
1496             }
1497 0           case API_OFFSET_FETCH: {
1498 0           SV *result = conn_parse_offset_fetch_response(aTHX_ self, cbt->api_version, data, len);
1499 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1500 0           break;
1501             }
1502 0           case API_LEAVE_GROUP: {
1503 0           SV *result = conn_parse_leave_group_response(aTHX_ self, cbt->api_version, data, len);
1504 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1505 0           break;
1506             }
1507 0           case API_CREATE_TOPICS: {
1508 0           SV *result = conn_parse_create_topics_response(aTHX_ self, cbt->api_version, data, len);
1509 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1510 0           break;
1511             }
1512 0           case API_DELETE_TOPICS: {
1513 0           SV *result = conn_parse_delete_topics_response(aTHX_ self, cbt->api_version, data, len);
1514 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1515 0           break;
1516             }
1517 0           case API_INIT_PRODUCER_ID: {
1518 0           SV *result = conn_parse_init_producer_id_response(aTHX_ self, cbt->api_version, data, len);
1519 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1520 0           break;
1521             }
1522 0           case API_ADD_PARTITIONS_TXN: {
1523 0           SV *result = conn_parse_add_partitions_to_txn_response(aTHX_ self, cbt->api_version, data, len);
1524 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1525 0           break;
1526             }
1527 0           case API_END_TXN: {
1528 0           SV *result = conn_parse_end_txn_response(aTHX_ self, cbt->api_version, data, len);
1529 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1530 0           break;
1531             }
1532 0           case API_TXN_OFFSET_COMMIT: {
1533 0           SV *result = conn_parse_txn_offset_commit_response(aTHX_ self, cbt->api_version, data, len);
1534 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1535 0           break;
1536             }
1537 0           default: {
1538             /* Unknown API — return raw bytes */
1539 0           SV *result = newSVpvn(data, len);
1540 0           conn_invoke_cb(aTHX_ self, cbt->cb, sv_2mortal(result), NULL);
1541 0           break;
1542             }
1543             }
1544             }
1545              
1546             /* ================================================================
1547             * RecordBatch encoder (for Produce requests)
1548             * ================================================================ */
1549              
1550             /* Build a single Record (within a RecordBatch).
1551             * Format: length(varint), attributes(i8=0), timestampDelta(varint),
1552             * offsetDelta(varint), key(varint-bytes), value(varint-bytes),
1553             * headers(varint array of {key:varint-str, value:varint-bytes})
1554             */
1555 141           static void kf_encode_record(kf_buf_t *b, int offset_delta, int64_t ts_delta,
1556             const char *key, STRLEN key_len, const char *value, STRLEN value_len,
1557             HV *headers)
1558             {
1559             kf_buf_t rec;
1560 141           kf_buf_init(&rec);
1561              
1562 141           kf_buf_append_i8(&rec, 0); /* attributes */
1563 141           kf_buf_append_varint(&rec, ts_delta);
1564 141           kf_buf_append_varint(&rec, offset_delta);
1565              
1566             /* key */
1567 141 100         if (key) {
1568 140           kf_buf_append_varint(&rec, (int64_t)key_len);
1569 140           kf_buf_append(&rec, key, key_len);
1570             } else {
1571 1           kf_buf_append_varint(&rec, -1);
1572             }
1573              
1574             /* value */
1575 141 100         if (value) {
1576 140           kf_buf_append_varint(&rec, (int64_t)value_len);
1577 140           kf_buf_append(&rec, value, value_len);
1578             } else {
1579 1           kf_buf_append_varint(&rec, -1);
1580             }
1581              
1582             /* headers */
1583 142 100         if (headers && HvUSEDKEYS(headers) > 0) {
    50          
    50          
1584 1 50         kf_buf_append_varint(&rec, (int64_t)HvUSEDKEYS(headers));
1585             HE *entry;
1586 1           hv_iterinit(headers);
1587 3 100         while ((entry = hv_iternext(headers))) {
1588             I32 hklen;
1589 2           const char *hkey = hv_iterkey(entry, &hklen);
1590 2           SV *hval = hv_iterval(headers, entry);
1591             STRLEN hvlen;
1592 2           const char *hvstr = SvPV(hval, hvlen);
1593              
1594 2           kf_buf_append_varint(&rec, (int64_t)hklen);
1595 2           kf_buf_append(&rec, hkey, hklen);
1596 2           kf_buf_append_varint(&rec, (int64_t)hvlen);
1597 2           kf_buf_append(&rec, hvstr, hvlen);
1598             }
1599             } else {
1600 140           kf_buf_append_varint(&rec, 0); /* 0 headers */
1601             }
1602              
1603             /* Write record: length(varint) + body */
1604 141           kf_buf_append_varint(b, (int64_t)rec.len);
1605 141           kf_buf_append(b, rec.data, rec.len);
1606 141           kf_buf_free(&rec);
1607 141           }
1608              
1609             /* Build a RecordBatch from one or more records, with optional
1610             * compression and producer-ID/transactional state. Caller frees *out. */
1611 16           static void kf_encode_record_batch_multi(pTHX_ kf_buf_t *out,
1612             AV *records_av, int64_t timestamp, int compression,
1613             int64_t producer_id, int16_t producer_epoch, int32_t base_sequence,
1614             int is_transactional)
1615             {
1616 16           SSize_t i, count = av_len(records_av) + 1;
1617              
1618             /* Validate up front so croak() doesn't longjmp over kf_buf_free below. */
1619 157 100         for (i = 0; i < count; i++) {
1620 141           SV **elem = av_fetch(records_av, i, 0);
1621 141 50         if (!elem || !SvROK(*elem) || SvTYPE(SvRV(*elem)) != SVt_PVHV)
    50          
    50          
1622 0           croak("produce_batch: record element must be a hashref");
1623             }
1624              
1625             kf_buf_t records;
1626 16           kf_buf_init(&records);
1627              
1628 157 100         for (i = 0; i < count; i++) {
1629 141           SV **elem = av_fetch(records_av, i, 0);
1630 141           HV *rh = (HV*)SvRV(*elem);
1631 141           SV **key_sv = hv_fetch(rh, "key", 3, 0);
1632 141           SV **val_sv = hv_fetch(rh, "value", 5, 0);
1633 141           SV **hdr_sv = hv_fetch(rh, "headers", 7, 0);
1634 141           const char *key = NULL; STRLEN key_len = 0;
1635 141           const char *val = NULL; STRLEN val_len = 0;
1636 141           HV *hdrs = NULL;
1637 141 50         if (key_sv && SvOK(*key_sv)) key = SvPV(*key_sv, key_len);
    100          
1638 141 50         if (val_sv && SvOK(*val_sv)) val = SvPV(*val_sv, val_len);
    100          
1639 141 100         if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV)
    50          
    50          
1640 1           hdrs = (HV*)SvRV(*hdr_sv);
1641 141           kf_encode_record(&records, (int)i, 0, key, key_len, val, val_len, hdrs);
1642             }
1643              
1644             kf_buf_t inner;
1645 16           kf_buf_init(&inner);
1646              
1647 16           int16_t attrs = (int16_t)(compression & 0x07);
1648 16 50         if (is_transactional) attrs |= 0x10; /* bit 4 = isTransactional */
1649 16           kf_buf_append_i16(&inner, attrs);
1650 16           kf_buf_append_i32(&inner, (int32_t)(count - 1)); /* lastOffsetDelta */
1651 16           kf_buf_append_i64(&inner, timestamp);
1652 16           kf_buf_append_i64(&inner, timestamp);
1653 16           kf_buf_append_i64(&inner, producer_id);
1654 16           kf_buf_append_i16(&inner, producer_epoch);
1655 16           kf_buf_append_i32(&inner, base_sequence);
1656              
1657             #ifdef HAVE_LZ4
1658             if (compression == COMPRESS_LZ4) {
1659             int max_compressed = LZ4_compressBound((int)records.len);
1660             char *compressed;
1661             Newx(compressed, max_compressed, char);
1662             int clen = LZ4_compress_default(records.data, compressed,
1663             (int)records.len, max_compressed);
1664             if (clen > 0) {
1665             kf_buf_append_i32(&inner, (int32_t)count);
1666             kf_buf_append(&inner, compressed, clen);
1667             } else {
1668             uint16_t zero = 0;
1669             memcpy(inner.data, &zero, 2);
1670             kf_buf_append_i32(&inner, (int32_t)count);
1671             kf_buf_append(&inner, records.data, records.len);
1672             }
1673             Safefree(compressed);
1674             } else
1675             #endif
1676             #ifdef HAVE_ZLIB
1677 16 100         if (compression == COMPRESS_GZIP) {
1678 2           size_t dest_cap = compressBound((uLong)records.len) + 32;
1679             char *compressed;
1680 2           Newx(compressed, dest_cap, char);
1681             z_stream zs;
1682 2           Zero(&zs, 1, z_stream);
1683 2           int zinit = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
1684             MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY);
1685 2           int zok = 0;
1686 2 50         if (zinit == Z_OK) {
1687 2           zs.next_in = (Bytef *)records.data;
1688 2           zs.avail_in = (uInt)records.len;
1689 2           zs.next_out = (Bytef *)compressed;
1690 2           zs.avail_out = (uInt)dest_cap;
1691 2 50         if (deflate(&zs, Z_FINISH) == Z_STREAM_END) zok = 1;
1692 2           deflateEnd(&zs);
1693             }
1694 2 50         if (zok) {
1695 2           kf_buf_append_i32(&inner, (int32_t)count);
1696 2           kf_buf_append(&inner, compressed, zs.total_out);
1697             } else {
1698 0           uint16_t zero = 0;
1699 0           memcpy(inner.data, &zero, 2);
1700 0           kf_buf_append_i32(&inner, (int32_t)count);
1701 0           kf_buf_append(&inner, records.data, records.len);
1702             }
1703 2           Safefree(compressed);
1704             } else
1705             #endif
1706             #ifdef HAVE_ZSTD
1707             if (compression == COMPRESS_ZSTD) {
1708             size_t dest_cap = ZSTD_compressBound(records.len);
1709             char *compressed;
1710             Newx(compressed, dest_cap, char);
1711             size_t clen = ZSTD_compress(compressed, dest_cap,
1712             records.data, records.len, ZSTD_CLEVEL_DEFAULT);
1713             if (!ZSTD_isError(clen)) {
1714             kf_buf_append_i32(&inner, (int32_t)count);
1715             kf_buf_append(&inner, compressed, clen);
1716             } else {
1717             uint16_t zero = 0;
1718             memcpy(inner.data, &zero, 2);
1719             kf_buf_append_i32(&inner, (int32_t)count);
1720             kf_buf_append(&inner, records.data, records.len);
1721             }
1722             Safefree(compressed);
1723             } else
1724             #endif
1725             #ifdef HAVE_SNAPPY
1726             if (compression == COMPRESS_SNAPPY) {
1727             size_t dest_cap = snappy_max_compressed_length(records.len);
1728             char *compressed;
1729             Newx(compressed, dest_cap, char);
1730             size_t clen = dest_cap;
1731             if (snappy_compress(records.data, records.len, compressed, &clen)
1732             == SNAPPY_OK) {
1733             kf_buf_append_i32(&inner, (int32_t)count);
1734             kf_buf_append(&inner, compressed, clen);
1735             } else {
1736             uint16_t zero = 0;
1737             memcpy(inner.data, &zero, 2);
1738             kf_buf_append_i32(&inner, (int32_t)count);
1739             kf_buf_append(&inner, records.data, records.len);
1740             }
1741             Safefree(compressed);
1742             } else
1743             #endif
1744             {
1745             (void)compression;
1746 14           kf_buf_append_i32(&inner, (int32_t)count);
1747 14           kf_buf_append(&inner, records.data, records.len);
1748             }
1749              
1750 16           uint32_t crc_val = crc32c(inner.data, inner.len);
1751 16           int32_t batch_length = 4 + 1 + 4 + (int32_t)inner.len;
1752              
1753 16           kf_buf_init(out);
1754 16           kf_buf_append_i64(out, 0);
1755 16           kf_buf_append_i32(out, batch_length);
1756 16           kf_buf_append_i32(out, 0);
1757 16           kf_buf_append_i8(out, 2);
1758 16           kf_buf_append_i32(out, (int32_t)crc_val);
1759 16           kf_buf_append(out, inner.data, inner.len);
1760              
1761 16           kf_buf_free(&inner);
1762 16           kf_buf_free(&records);
1763 16           }
1764              
1765             /* ================================================================
1766             * Response parsers
1767             * ================================================================ */
1768              
1769             /* Metadata response parser (API 3) */
1770 2           static SV* conn_parse_metadata_response(pTHX_ ev_kafka_conn_t *self,
1771             int16_t version, const char *data, size_t len)
1772             {
1773 2           const char *p = data;
1774 2           const char *end = data + len;
1775 2           HV *result = newHV();
1776 2           AV *brokers_av = newAV();
1777 2           AV *topics_av = newAV();
1778             int n;
1779              
1780             (void)self;
1781              
1782             /* For v9+ (flexible versions), fields use compact encoding */
1783 2           int flexible = (version >= 9);
1784              
1785 2 50         if (flexible) {
1786             /* throttle_time_ms */
1787 0 0         if (end - p < 4) goto done;
1788 0           /* int32_t throttle = kf_read_i32(p); */ p += 4;
1789              
1790             /* brokers: compact array */
1791             uint64_t raw;
1792 0           n = kf_read_uvarint(p, end, &raw);
1793 0 0         if (n < 0) goto done;
1794 0           p += n;
1795 0           int32_t broker_count = (int32_t)(raw - 1);
1796             int32_t i;
1797              
1798 0 0         for (i = 0; i < broker_count; i++) {
1799 0           HV *bh = newHV();
1800 0 0         if (end - p < 4) goto done;
1801 0           int32_t nid = kf_read_i32(p); p += 4;
1802 0           hv_store(bh, "node_id", 7, newSViv(nid), 0);
1803              
1804             const char *host; int32_t hlen;
1805 0           n = kf_read_compact_string(p, end, &host, &hlen);
1806 0 0         if (n < 0) goto done;
1807 0           p += n;
1808 0 0         hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
    0          
1809              
1810 0 0         if (end - p < 4) goto done;
1811 0           int32_t port = kf_read_i32(p); p += 4;
1812 0           hv_store(bh, "port", 4, newSViv(port), 0);
1813              
1814             /* rack: compact nullable string */
1815             const char *rack; int32_t rlen;
1816 0           n = kf_read_compact_string(p, end, &rack, &rlen);
1817 0 0         if (n < 0) goto done;
1818 0           p += n;
1819              
1820             /* tagged fields */
1821 0           n = kf_skip_tagged_fields(p, end);
1822 0 0         if (n < 0) goto done;
1823 0           p += n;
1824              
1825 0           av_push(brokers_av, newRV_noinc((SV*)bh));
1826             }
1827              
1828             /* cluster_id */
1829             const char *cid; int32_t cidlen;
1830 0           n = kf_read_compact_string(p, end, &cid, &cidlen);
1831 0 0         if (n < 0) goto done;
1832 0           p += n;
1833              
1834             /* controller_id */
1835 0 0         if (end - p < 4) goto done;
1836 0           int32_t controller_id = kf_read_i32(p); p += 4;
1837 0           hv_store(result, "controller_id", 13, newSViv(controller_id), 0);
1838              
1839             /* topics: compact array */
1840 0           n = kf_read_uvarint(p, end, &raw);
1841 0 0         if (n < 0) goto done;
1842 0           p += n;
1843 0           int32_t topic_count = (int32_t)(raw - 1);
1844              
1845 0 0         for (i = 0; i < topic_count; i++) {
1846 0           HV *th = newHV();
1847 0 0         if (end - p < 2) goto done;
1848 0           int16_t terr = kf_read_i16(p); p += 2;
1849 0           hv_store(th, "error_code", 10, newSViv(terr), 0);
1850              
1851             const char *tname; int32_t tnlen;
1852 0           n = kf_read_compact_string(p, end, &tname, &tnlen);
1853 0 0         if (n < 0) goto done;
1854 0           p += n;
1855 0 0         hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
1856              
1857             /* topic_id (UUID, 16 bytes) — v10+ */
1858 0 0         if (version >= 10) {
1859 0 0         if (end - p < 16) goto done;
1860 0           p += 16;
1861             }
1862              
1863             /* is_internal */
1864 0 0         if (end - p < 1) goto done;
1865 0           p += 1;
1866              
1867             /* partitions: compact array */
1868 0           n = kf_read_uvarint(p, end, &raw);
1869 0 0         if (n < 0) goto done;
1870 0           p += n;
1871 0           int32_t part_count = (int32_t)(raw - 1);
1872 0           AV *parts_av = newAV();
1873             int32_t j;
1874              
1875 0 0         for (j = 0; j < part_count; j++) {
1876 0           HV *ph = newHV();
1877 0 0         if (end - p < 2) goto done;
1878 0           int16_t perr = kf_read_i16(p); p += 2;
1879 0           hv_store(ph, "error_code", 10, newSViv(perr), 0);
1880              
1881 0 0         if (end - p < 4) goto done;
1882 0           int32_t pid = kf_read_i32(p); p += 4;
1883 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
1884              
1885 0 0         if (end - p < 4) goto done;
1886 0           int32_t leader = kf_read_i32(p); p += 4;
1887 0           hv_store(ph, "leader", 6, newSViv(leader), 0);
1888              
1889             /* leader_epoch — v7+ */
1890 0 0         if (version >= 7) {
1891 0 0         if (end - p < 4) goto done;
1892 0           p += 4;
1893             }
1894              
1895             /* replicas: compact array of i32 */
1896 0           n = kf_read_uvarint(p, end, &raw);
1897 0 0         if (n < 0) goto done;
1898 0           p += n;
1899 0           int32_t rcount = (int32_t)(raw - 1);
1900 0 0         if (rcount < 0 || rcount > 65536) goto done;
    0          
1901 0 0         if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
1902 0           p += (int64_t)rcount * 4;
1903              
1904             /* isr: compact array of i32 */
1905 0           n = kf_read_uvarint(p, end, &raw);
1906 0 0         if (n < 0) goto done;
1907 0           p += n;
1908 0           rcount = (int32_t)(raw - 1);
1909 0 0         if (rcount < 0 || rcount > 65536) goto done;
    0          
1910 0 0         if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
1911 0           p += (int64_t)rcount * 4;
1912              
1913             /* offline_replicas: compact array of i32 — v5+ */
1914 0 0         if (version >= 5) {
1915 0           n = kf_read_uvarint(p, end, &raw);
1916 0 0         if (n < 0) goto done;
1917 0           p += n;
1918 0           rcount = (int32_t)(raw - 1);
1919 0 0         if (rcount < 0 || rcount > 65536) goto done;
    0          
1920 0 0         if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
1921 0           p += (int64_t)rcount * 4;
1922             }
1923              
1924             /* tagged fields */
1925 0           n = kf_skip_tagged_fields(p, end);
1926 0 0         if (n < 0) goto done;
1927 0           p += n;
1928              
1929 0           av_push(parts_av, newRV_noinc((SV*)ph));
1930             }
1931 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
1932              
1933             /* topic authorized operations — v8+ */
1934 0 0         if (version >= 8) {
1935 0 0         if (end - p < 4) goto done;
1936 0           p += 4;
1937             }
1938              
1939             /* tagged fields */
1940 0           n = kf_skip_tagged_fields(p, end);
1941 0 0         if (n < 0) goto done;
1942 0           p += n;
1943              
1944 0           av_push(topics_av, newRV_noinc((SV*)th));
1945             }
1946             } else {
1947             /* Non-flexible (v0-v8) — use classic STRING/ARRAY encoding */
1948             /* throttle_time_ms (v3+) */
1949 2 50         if (version >= 3) {
1950 0 0         if (end - p < 4) goto done;
1951 0           p += 4;
1952             }
1953              
1954             /* brokers array */
1955 2 50         if (end - p < 4) goto done;
1956 2           int32_t broker_count = kf_read_i32(p); p += 4;
1957 2 50         if (broker_count < 0 || broker_count > 65536) goto done;
    50          
1958             int32_t i;
1959 3 100         for (i = 0; i < broker_count; i++) {
1960 2           HV *bh = newHV();
1961 2 100         if (end - p < 4) goto done;
1962 1           int32_t nid = kf_read_i32(p); p += 4;
1963 1           hv_store(bh, "node_id", 7, newSViv(nid), 0);
1964              
1965             const char *host; int16_t hlen;
1966 1           n = kf_read_string(p, end, &host, &hlen);
1967 1 50         if (n < 0) goto done;
1968 1           p += n;
1969 1 50         hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
    50          
1970              
1971 1 50         if (end - p < 4) goto done;
1972 1           int32_t port = kf_read_i32(p); p += 4;
1973 1           hv_store(bh, "port", 4, newSViv(port), 0);
1974              
1975             /* rack (v1+) */
1976 1 50         if (version >= 1) {
1977             const char *r; int16_t rlen;
1978 0           n = kf_read_string(p, end, &r, &rlen);
1979 0 0         if (n < 0) goto done;
1980 0           p += n;
1981             }
1982              
1983 1           av_push(brokers_av, newRV_noinc((SV*)bh));
1984             }
1985              
1986             /* cluster_id (v2+) */
1987 1 50         if (version >= 2) {
1988             const char *cid; int16_t cidlen;
1989 0           n = kf_read_string(p, end, &cid, &cidlen);
1990 0 0         if (n < 0) goto done;
1991 0           p += n;
1992             }
1993              
1994             /* controller_id (v1+) */
1995 1 50         if (version >= 1) {
1996 0 0         if (end - p < 4) goto done;
1997 0           int32_t cid = kf_read_i32(p); p += 4;
1998 0           hv_store(result, "controller_id", 13, newSViv(cid), 0);
1999             }
2000              
2001             /* topics array */
2002 1 50         if (end - p < 4) goto done;
2003 1           int32_t topic_count = kf_read_i32(p); p += 4;
2004 1 50         if (topic_count < 0 || topic_count > 1000000) goto done;
    50          
2005 2 100         for (i = 0; i < topic_count; i++) {
2006 1           HV *th = newHV();
2007 1 50         if (end - p < 2) goto done;
2008 1           int16_t terr = kf_read_i16(p); p += 2;
2009 1           hv_store(th, "error_code", 10, newSViv(terr), 0);
2010              
2011             const char *tname; int16_t tnlen;
2012 1           n = kf_read_string(p, end, &tname, &tnlen);
2013 1 50         if (n < 0) goto done;
2014 1           p += n;
2015 1 50         hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    50          
2016              
2017             /* is_internal (v1+) */
2018 1 50         if (version >= 1) {
2019 0 0         if (end - p < 1) goto done;
2020 0           p += 1;
2021             }
2022              
2023             /* partitions */
2024 1 50         if (end - p < 4) goto done;
2025 1           int32_t part_count = kf_read_i32(p); p += 4;
2026 1 50         if (part_count < 0 || part_count > 1000000) goto done;
    50          
2027 1           AV *parts_av = newAV();
2028             int32_t j;
2029 2 100         for (j = 0; j < part_count; j++) {
2030 1           HV *ph = newHV();
2031 1 50         if (end - p < 2) goto done;
2032 1           int16_t perr = kf_read_i16(p); p += 2;
2033 1           hv_store(ph, "error_code", 10, newSViv(perr), 0);
2034              
2035 1 50         if (end - p < 4) goto done;
2036 1           int32_t pid = kf_read_i32(p); p += 4;
2037 1           hv_store(ph, "partition", 9, newSViv(pid), 0);
2038              
2039 1 50         if (end - p < 4) goto done;
2040 1           int32_t leader = kf_read_i32(p); p += 4;
2041 1           hv_store(ph, "leader", 6, newSViv(leader), 0);
2042              
2043             /* leader_epoch (v7+) */
2044 1 50         if (version >= 7) {
2045 0 0         if (end - p < 4) goto done;
2046 0           p += 4;
2047             }
2048              
2049             /* replicas */
2050 1 50         if (end - p < 4) goto done;
2051 1           int32_t rcount = kf_read_i32(p); p += 4;
2052 1 50         if (rcount < 0 || rcount > 65536) goto done;
    50          
2053 1 50         if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
2054 1           p += (int64_t)rcount * 4;
2055              
2056             /* isr */
2057 1 50         if (end - p < 4) goto done;
2058 1           rcount = kf_read_i32(p); p += 4;
2059 1 50         if (rcount < 0 || rcount > 65536) goto done;
    50          
2060 1 50         if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
2061 1           p += (int64_t)rcount * 4;
2062              
2063             /* offline_replicas (v5+) */
2064 1 50         if (version >= 5) {
2065 0 0         if (end - p < 4) goto done;
2066 0           rcount = kf_read_i32(p); p += 4;
2067 0 0         if (rcount < 0 || rcount > 65536) goto done;
    0          
2068 0 0         if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
2069 0           p += (int64_t)rcount * 4;
2070             }
2071              
2072 1           av_push(parts_av, newRV_noinc((SV*)ph));
2073             }
2074 1           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2075              
2076 1           av_push(topics_av, newRV_noinc((SV*)th));
2077             }
2078             }
2079              
2080 1           done:
2081 2           hv_store(result, "brokers", 7, newRV_noinc((SV*)brokers_av), 0);
2082 2           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2083 2           return sv_2mortal(newRV_noinc((SV*)result));
2084             }
2085              
2086             /* Produce response parser (API 0, v0-v7) */
2087 1           static SV* conn_parse_produce_response(pTHX_ ev_kafka_conn_t *self,
2088             int16_t version, const char *data, size_t len)
2089             {
2090 1           const char *p = data;
2091 1           const char *end = data + len;
2092 1           HV *result = newHV();
2093 1           AV *topics_av = newAV();
2094             int n;
2095              
2096             (void)self;
2097              
2098             /* responses: ARRAY */
2099 1 50         if (end - p < 4) goto done;
2100 1           int32_t topic_count = kf_read_i32(p); p += 4;
2101             int32_t i;
2102              
2103 2 100         for (i = 0; i < topic_count; i++) {
2104 1           HV *th = newHV();
2105             const char *tname; int16_t tnlen;
2106 1           n = kf_read_string(p, end, &tname, &tnlen);
2107 1 50         if (n < 0) goto done;
2108 1           p += n;
2109 1 50         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    50          
2110              
2111             /* partitions: ARRAY */
2112 1 50         if (end - p < 4) goto done;
2113 1           int32_t part_count = kf_read_i32(p); p += 4;
2114 1           AV *parts_av = newAV();
2115             int32_t j;
2116              
2117 2 100         for (j = 0; j < part_count; j++) {
2118 1           HV *ph = newHV();
2119 1 50         if (end - p < 4) goto done;
2120 1           int32_t pid = kf_read_i32(p); p += 4;
2121 1           hv_store(ph, "partition", 9, newSViv(pid), 0);
2122              
2123 1 50         if (end - p < 2) goto done;
2124 1           int16_t err = kf_read_i16(p); p += 2;
2125 1           hv_store(ph, "error_code", 10, newSViv(err), 0);
2126              
2127 1 50         if (end - p < 8) goto done;
2128 1           int64_t base_offset = kf_read_i64(p); p += 8;
2129 1           hv_store(ph, "base_offset", 11, newSViv(base_offset), 0);
2130              
2131             /* log_append_time (v2+) */
2132 1 50         if (version >= 2) {
2133 0 0         if (end - p < 8) goto done;
2134 0           p += 8;
2135             }
2136              
2137             /* log_start_offset (v5+) */
2138 1 50         if (version >= 5) {
2139 0 0         if (end - p < 8) goto done;
2140 0           p += 8;
2141             }
2142              
2143 1           av_push(parts_av, newRV_noinc((SV*)ph));
2144             }
2145 1           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2146 1           av_push(topics_av, newRV_noinc((SV*)th));
2147             }
2148              
2149             /* throttle_time_ms (v1+) */
2150 1 50         if (version >= 1 && end - p >= 4) {
    0          
2151 0           int32_t throttle = kf_read_i32(p); p += 4;
2152 0           hv_store(result, "throttle_time_ms", 16, newSViv(throttle), 0);
2153             }
2154              
2155 1           done:
2156 1           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2157 1           return sv_2mortal(newRV_noinc((SV*)result));
2158             }
2159              
2160             /* ================================================================
2161             * RecordBatch decoder (for Fetch responses)
2162             * ================================================================ */
2163              
2164             /* Decode records from a RecordBatch, push them as hashrefs onto records_av.
2165             * Returns number of records decoded, or -1 on error. */
2166 16           static int kf_decode_record_batch(pTHX_ const char *data, size_t len,
2167             AV *records_av, int64_t *out_base_offset)
2168             {
2169 16           const char *p = data;
2170 16           const char *end = data + len;
2171             int n;
2172              
2173 16 50         if (end - p < 12) return -1;
2174 16           int64_t base_offset = kf_read_i64(p); p += 8;
2175 16 50         if (out_base_offset) *out_base_offset = base_offset;
2176 16           int32_t batch_length = kf_read_i32(p); p += 4;
2177              
2178 16 100         if (end - p < batch_length) return -1;
2179 15           const char *batch_end = p + batch_length;
2180              
2181 15 50         if (batch_end - p < 9) return -1;
2182 15           /* int32_t partition_leader_epoch = kf_read_i32(p); */ p += 4;
2183 15           int8_t magic = (int8_t)*p; p += 1;
2184 15 100         if (magic != 2) return -1; /* only support magic=2 (current format) */
2185 14           uint32_t expected_crc = (uint32_t)kf_read_i32(p); p += 4;
2186             /* CRC32C covers the bytes from attributes to end of batch. */
2187 14 100         if (crc32c(p, (size_t)(batch_end - p)) != expected_crc) return -1;
2188              
2189 13 50         if (batch_end - p < 36) return -1;
2190 13           int16_t attributes = kf_read_i16(p); p += 2;
2191 13           int compression_type = attributes & 0x07;
2192 13           /* int32_t last_offset_delta = kf_read_i32(p); */ p += 4;
2193 13           int64_t first_timestamp = kf_read_i64(p); p += 8;
2194 13           /* int64_t max_timestamp = kf_read_i64(p); */ p += 8;
2195 13           /* int64_t producer_id = kf_read_i64(p); */ p += 8;
2196 13           /* int16_t producer_epoch = kf_read_i16(p); */ p += 2;
2197 13           /* int32_t base_sequence = kf_read_i32(p); */ p += 4;
2198              
2199 13 50         if (batch_end - p < 4) return -1;
2200 13           int32_t record_count = kf_read_i32(p); p += 4;
2201              
2202             /* Decompress if needed */
2203 13           const char *rec_data = p;
2204 13           const char *rec_end = batch_end;
2205 13           char *decompressed = NULL;
2206              
2207 13 100         if (compression_type != COMPRESS_NONE && batch_end > p) {
    50          
2208 7           size_t compressed_len = batch_end - p;
2209 7           size_t decomp_cap = compressed_len * 4;
2210 7 100         if (decomp_cap < 4096) decomp_cap = 4096;
2211              
2212             #ifdef HAVE_ZLIB
2213 7 100         if (compression_type == COMPRESS_GZIP) {
2214 1           int zok = 0;
2215 2 100         while (!zok && decomp_cap < 64 * 1024 * 1024) {
    50          
2216 1           Newx(decompressed, decomp_cap, char);
2217             z_stream zs;
2218 1           Zero(&zs, 1, z_stream);
2219 1           int zinit = inflateInit2(&zs, MAX_WBITS + 16);
2220 1 50         if (zinit != Z_OK) {
2221 0           Safefree(decompressed);
2222 0           decompressed = NULL;
2223 0           break;
2224             }
2225 1           zs.next_in = (Bytef *)p;
2226 1           zs.avail_in = (uInt)compressed_len;
2227 1           zs.next_out = (Bytef *)decompressed;
2228 1           zs.avail_out = (uInt)decomp_cap;
2229 1           int zret = inflate(&zs, Z_FINISH);
2230 1           size_t dest_len = zs.total_out;
2231 1           inflateEnd(&zs);
2232 1 50         if (zret == Z_STREAM_END) {
2233 1           rec_data = decompressed;
2234 1           rec_end = decompressed + dest_len;
2235 1           zok = 1;
2236 0 0         } else if (zret == Z_BUF_ERROR || zret == Z_OK) {
    0          
2237 0           Safefree(decompressed);
2238 0           decompressed = NULL;
2239 0           decomp_cap *= 2;
2240             } else {
2241 0           Safefree(decompressed);
2242 0           decompressed = NULL;
2243 0           break;
2244             }
2245             }
2246             }
2247             #endif
2248             #ifdef HAVE_LZ4
2249             if (compression_type == COMPRESS_LZ4) {
2250             int dlen = -1;
2251             while (decomp_cap < 64 * 1024 * 1024) {
2252             Newx(decompressed, decomp_cap, char);
2253             dlen = LZ4_decompress_safe(p, decompressed,
2254             (int)compressed_len, (int)decomp_cap);
2255             if (dlen >= 0) break;
2256             /* Negative return can mean either malformed input or
2257             * insufficient output buffer — LZ4 doesn't distinguish.
2258             * Grow and retry; if it's malformed we'll bail at the cap. */
2259             Safefree(decompressed);
2260             decompressed = NULL;
2261             decomp_cap *= 2;
2262             }
2263             if (dlen > 0) {
2264             rec_data = decompressed;
2265             rec_end = decompressed + dlen;
2266             } else if (decompressed) {
2267             Safefree(decompressed);
2268             decompressed = NULL;
2269             }
2270             }
2271             #endif
2272             #ifdef HAVE_ZSTD
2273             if (compression_type == COMPRESS_ZSTD) {
2274             unsigned long long expected =
2275             ZSTD_getFrameContentSize(p, compressed_len);
2276             size_t dlen;
2277             if (expected != ZSTD_CONTENTSIZE_ERROR
2278             && expected != ZSTD_CONTENTSIZE_UNKNOWN
2279             && expected <= 64ULL * 1024 * 1024) {
2280             Newx(decompressed, (size_t)expected, char);
2281             dlen = ZSTD_decompress(decompressed, (size_t)expected,
2282             p, compressed_len);
2283             if (!ZSTD_isError(dlen)) {
2284             rec_data = decompressed;
2285             rec_end = decompressed + dlen;
2286             } else {
2287             Safefree(decompressed);
2288             decompressed = NULL;
2289             }
2290             }
2291             }
2292             #endif
2293             #ifdef HAVE_SNAPPY
2294             if (compression_type == COMPRESS_SNAPPY) {
2295             size_t dlen;
2296             if (snappy_uncompressed_length(p, compressed_len, &dlen)
2297             == SNAPPY_OK
2298             && dlen <= 64UL * 1024 * 1024) {
2299             Newx(decompressed, dlen, char);
2300             if (snappy_uncompress(p, compressed_len,
2301             decompressed, &dlen) == SNAPPY_OK) {
2302             rec_data = decompressed;
2303             rec_end = decompressed + dlen;
2304             } else {
2305             Safefree(decompressed);
2306             decompressed = NULL;
2307             }
2308             }
2309             }
2310             #endif
2311             }
2312              
2313 13           const char *rp = rec_data;
2314             int32_t i;
2315 151 100         for (i = 0; i < record_count; i++) {
2316             int64_t rec_len;
2317 138           n = kf_read_varint(rp, rec_end, &rec_len);
2318 138 50         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2319 138           rp += n;
2320 138 50         if (rec_end - rp < rec_len) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2321 138           const char *this_rec_end = rp + rec_len;
2322              
2323 138 50         if (this_rec_end - rp < 1) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2324 138           /* int8_t rec_attrs = (int8_t)*rp; */ rp += 1;
2325              
2326             int64_t ts_delta;
2327 138           n = kf_read_varint(rp, this_rec_end, &ts_delta);
2328 138 50         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2329 138           rp += n;
2330              
2331             int64_t offset_delta;
2332 138           n = kf_read_varint(rp, this_rec_end, &offset_delta);
2333 138 50         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2334 138           rp += n;
2335              
2336             /* key */
2337             int64_t key_len;
2338 138           n = kf_read_varint(rp, this_rec_end, &key_len);
2339 138 50         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2340 138           rp += n;
2341 138           const char *key_data = NULL;
2342 138 100         if (key_len >= 0) {
2343 137 50         if (this_rec_end - rp < key_len) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2344 137           key_data = rp;
2345 137           rp += key_len;
2346             }
2347              
2348             /* value */
2349             int64_t val_len;
2350 138           n = kf_read_varint(rp, this_rec_end, &val_len);
2351 138 50         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2352 138           rp += n;
2353 138           const char *val_data = NULL;
2354 138 100         if (val_len >= 0) {
2355 137 50         if (this_rec_end - rp < val_len) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2356 137           val_data = rp;
2357 137           rp += val_len;
2358             }
2359              
2360             /* headers */
2361             int64_t hdr_count;
2362 138           n = kf_read_varint(rp, this_rec_end, &hdr_count);
2363 138 50         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2364 138           rp += n;
2365 138           HV *hdr_hv = NULL;
2366 138 100         if (hdr_count > 0) {
2367 1           hdr_hv = newHV();
2368             int64_t h;
2369 3 100         for (h = 0; h < hdr_count; h++) {
2370             int64_t hk_len;
2371 2           n = kf_read_varint(rp, this_rec_end, &hk_len);
2372 2 50         if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
    0          
2373 2           rp += n;
2374 2           const char *hk_data = rp;
2375 2 50         if (hk_len < 0 || this_rec_end - rp < hk_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
    50          
    0          
2376 2           rp += hk_len;
2377              
2378             int64_t hv_len;
2379 2           n = kf_read_varint(rp, this_rec_end, &hv_len);
2380 2 50         if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
    0          
2381 2           rp += n;
2382 2           const char *hv_data = rp;
2383 2 50         if (hv_len >= 0) {
2384 2 50         if (this_rec_end - rp < hv_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
    0          
2385 2           rp += hv_len;
2386             }
2387              
2388 2 50         hv_store(hdr_hv, hk_data, (I32)hk_len,
2389             hv_len >= 0 ? newSVpvn(hv_data, (STRLEN)hv_len) : newSV(0), 0);
2390             }
2391             }
2392              
2393 138           HV *rec_hv = newHV();
2394 138           hv_store(rec_hv, "offset", 6, newSViv(base_offset + offset_delta), 0);
2395 138           hv_store(rec_hv, "timestamp", 9, newSViv(first_timestamp + ts_delta), 0);
2396 138 100         hv_store(rec_hv, "key", 3,
2397             key_data ? newSVpvn(key_data, (STRLEN)key_len) : newSV(0), 0);
2398 138 100         hv_store(rec_hv, "value", 5,
2399             val_data ? newSVpvn(val_data, (STRLEN)val_len) : newSV(0), 0);
2400 138 100         if (hdr_hv)
2401 1           hv_store(rec_hv, "headers", 7, newRV_noinc((SV*)hdr_hv), 0);
2402              
2403 138           av_push(records_av, newRV_noinc((SV*)rec_hv));
2404              
2405 138           rp = this_rec_end; /* skip any remaining bytes in the record */
2406             }
2407              
2408 13 100         if (decompressed) Safefree(decompressed);
2409 13           return record_count;
2410             }
2411              
2412             /* Fetch response parser (API 1, v4-v7 non-flexible) */
2413 0           static SV* conn_parse_fetch_response(pTHX_ ev_kafka_conn_t *self,
2414             int16_t version, const char *data, size_t len)
2415             {
2416 0           const char *p = data;
2417 0           const char *end = data + len;
2418 0           HV *result = newHV();
2419 0           AV *topics_av = newAV();
2420             int n;
2421              
2422             (void)self;
2423              
2424             /* throttle_time_ms (v1+) */
2425 0 0         if (version >= 1) {
2426 0 0         if (end - p < 4) goto done;
2427 0           int32_t throttle = kf_read_i32(p); p += 4;
2428 0           hv_store(result, "throttle_time_ms", 16, newSViv(throttle), 0);
2429             }
2430              
2431             /* error_code (v7+) */
2432 0 0         if (version >= 7) {
2433 0 0         if (end - p < 2) goto done;
2434 0           p += 2;
2435             }
2436              
2437             /* session_id (v7+) */
2438 0 0         if (version >= 7) {
2439 0 0         if (end - p < 4) goto done;
2440 0           p += 4;
2441             }
2442              
2443             /* responses: ARRAY */
2444 0 0         if (end - p < 4) goto done;
2445 0           int32_t topic_count = kf_read_i32(p); p += 4;
2446             int32_t i;
2447              
2448 0 0         for (i = 0; i < topic_count; i++) {
2449 0           HV *th = newHV();
2450              
2451             const char *tname; int16_t tnlen;
2452 0           n = kf_read_string(p, end, &tname, &tnlen);
2453 0 0         if (n < 0) goto done;
2454 0           p += n;
2455 0 0         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
2456              
2457             /* partitions: ARRAY */
2458 0 0         if (end - p < 4) goto done;
2459 0           int32_t part_count = kf_read_i32(p); p += 4;
2460 0           AV *parts_av = newAV();
2461             int32_t j;
2462              
2463 0 0         for (j = 0; j < part_count; j++) {
2464 0           HV *ph = newHV();
2465              
2466 0 0         if (end - p < 4) goto done;
2467 0           int32_t pid = kf_read_i32(p); p += 4;
2468 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
2469              
2470 0 0         if (end - p < 2) goto done;
2471 0           int16_t err = kf_read_i16(p); p += 2;
2472 0           hv_store(ph, "error_code", 10, newSViv(err), 0);
2473              
2474 0 0         if (end - p < 8) goto done;
2475 0           int64_t hw = kf_read_i64(p); p += 8;
2476 0           hv_store(ph, "high_watermark", 14, newSViv(hw), 0);
2477              
2478             /* last_stable_offset (v4+) */
2479 0 0         if (version >= 4) {
2480 0 0         if (end - p < 8) goto done;
2481 0           int64_t lso = kf_read_i64(p); p += 8;
2482 0           hv_store(ph, "last_stable_offset", 18, newSViv(lso), 0);
2483             }
2484              
2485             /* log_start_offset (v5+) */
2486 0 0         if (version >= 5) {
2487 0 0         if (end - p < 8) goto done;
2488 0           p += 8;
2489             }
2490              
2491             /* aborted_transactions (v4+) */
2492 0 0         if (version >= 4) {
2493 0 0         if (end - p < 4) goto done;
2494 0           int32_t at_count = kf_read_i32(p); p += 4;
2495             int32_t at;
2496 0 0         for (at = 0; at < at_count; at++) {
2497 0 0         if (end - p < 16) goto done;
2498 0           p += 16; /* producer_id(i64) + first_offset(i64) */
2499             }
2500             }
2501              
2502             /* record_set: BYTES (records data) */
2503 0 0         if (end - p < 4) goto done;
2504 0           int32_t records_size = kf_read_i32(p); p += 4;
2505              
2506 0           AV *records_av = newAV();
2507 0 0         if (records_size > 0 && end - p >= records_size) {
    0          
2508 0           const char *rp = p;
2509 0           const char *rend = p + records_size;
2510              
2511             /* May contain multiple RecordBatches */
2512 0 0         while (rp < rend && rend - rp >= 12) {
    0          
2513             int64_t bo;
2514 0           int32_t bl = kf_read_i32(rp + 8);
2515 0 0         if (bl < 0 || rend - rp < 12 + bl) break;
    0          
2516 0           kf_decode_record_batch(aTHX_ rp, 12 + (size_t)bl, records_av, &bo);
2517 0           rp += 12 + bl;
2518             }
2519              
2520 0           p += records_size;
2521 0 0         } else if (records_size > 0) {
2522 0           p = end; /* truncated */
2523             }
2524              
2525 0           hv_store(ph, "records", 7, newRV_noinc((SV*)records_av), 0);
2526 0           av_push(parts_av, newRV_noinc((SV*)ph));
2527             }
2528              
2529 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2530 0           av_push(topics_av, newRV_noinc((SV*)th));
2531             }
2532              
2533 0           done:
2534 0           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2535 0           return sv_2mortal(newRV_noinc((SV*)result));
2536             }
2537              
2538             /* ListOffsets response parser (API 2, v1+) */
2539 0           static SV* conn_parse_list_offsets_response(pTHX_ ev_kafka_conn_t *self,
2540             int16_t version, const char *data, size_t len)
2541             {
2542 0           const char *p = data;
2543 0           const char *end = data + len;
2544 0           HV *result = newHV();
2545 0           AV *topics_av = newAV();
2546             int n;
2547              
2548             (void)self;
2549              
2550             /* throttle_time_ms (v2+) */
2551 0 0         if (version >= 2) {
2552 0 0         if (end - p < 4) goto done;
2553 0           p += 4;
2554             }
2555              
2556             /* topics: ARRAY */
2557 0 0         if (end - p < 4) goto done;
2558 0           int32_t topic_count = kf_read_i32(p); p += 4;
2559             int32_t i;
2560              
2561 0 0         for (i = 0; i < topic_count; i++) {
2562 0           HV *th = newHV();
2563             const char *tname; int16_t tnlen;
2564 0           n = kf_read_string(p, end, &tname, &tnlen);
2565 0 0         if (n < 0) goto done;
2566 0           p += n;
2567 0 0         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
2568              
2569 0 0         if (end - p < 4) goto done;
2570 0           int32_t part_count = kf_read_i32(p); p += 4;
2571 0           AV *parts_av = newAV();
2572             int32_t j;
2573              
2574 0 0         for (j = 0; j < part_count; j++) {
2575 0           HV *ph = newHV();
2576 0 0         if (end - p < 4) goto done;
2577 0           int32_t pid = kf_read_i32(p); p += 4;
2578 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
2579              
2580 0 0         if (end - p < 2) goto done;
2581 0           int16_t err = kf_read_i16(p); p += 2;
2582 0           hv_store(ph, "error_code", 10, newSViv(err), 0);
2583              
2584 0 0         if (version >= 1) {
2585 0 0         if (end - p < 8) goto done;
2586 0           int64_t ts = kf_read_i64(p); p += 8;
2587 0           hv_store(ph, "timestamp", 9, newSViv(ts), 0);
2588             }
2589              
2590 0 0         if (end - p < 8) goto done;
2591 0           int64_t offset = kf_read_i64(p); p += 8;
2592 0           hv_store(ph, "offset", 6, newSViv(offset), 0);
2593              
2594             /* leader_epoch (v4+) */
2595 0 0         if (version >= 4) {
2596 0 0         if (end - p < 4) goto done;
2597 0           p += 4;
2598             }
2599              
2600 0           av_push(parts_av, newRV_noinc((SV*)ph));
2601             }
2602 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2603 0           av_push(topics_av, newRV_noinc((SV*)th));
2604             }
2605              
2606 0           done:
2607 0           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2608 0           return sv_2mortal(newRV_noinc((SV*)result));
2609             }
2610              
2611             /* FindCoordinator response parser (API 10, v0-v3) */
2612 1           static SV* conn_parse_find_coordinator_response(pTHX_ ev_kafka_conn_t *self,
2613             int16_t version, const char *data, size_t len)
2614             {
2615 1           const char *p = data;
2616 1           const char *end = data + len;
2617 1           HV *result = newHV();
2618             int n;
2619             (void)self;
2620              
2621             /* throttle_time_ms (v1+) */
2622 1 50         if (version >= 1) {
2623 0 0         if (end - p < 4) goto done;
2624 0           p += 4;
2625             }
2626              
2627 1 50         if (end - p < 2) goto done;
2628 1           int16_t err = kf_read_i16(p); p += 2;
2629 1           hv_store(result, "error_code", 10, newSViv(err), 0);
2630              
2631             /* error_message (v1+) */
2632 1 50         if (version >= 1) {
2633             const char *emsg; int16_t elen;
2634 0           n = kf_read_string(p, end, &emsg, &elen);
2635 0 0         if (n < 0) goto done;
2636 0           p += n;
2637 0 0         if (emsg && elen > 0)
    0          
2638 0           hv_store(result, "error_message", 13, newSVpvn(emsg, elen), 0);
2639             }
2640              
2641 1 50         if (end - p < 4) goto done;
2642 1           int32_t nid = kf_read_i32(p); p += 4;
2643 1           hv_store(result, "node_id", 7, newSViv(nid), 0);
2644              
2645             const char *host; int16_t hlen;
2646 1           n = kf_read_string(p, end, &host, &hlen);
2647 1 50         if (n < 0) goto done;
2648 1           p += n;
2649 1 50         hv_store(result, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
    50          
2650              
2651 1 50         if (end - p < 4) goto done;
2652 1           int32_t port = kf_read_i32(p); p += 4;
2653 1           hv_store(result, "port", 4, newSViv(port), 0);
2654              
2655 1           done:
2656 1           return sv_2mortal(newRV_noinc((SV*)result));
2657             }
2658              
2659             /* JoinGroup response parser (API 11, v0-v5) */
2660 1           static SV* conn_parse_join_group_response(pTHX_ ev_kafka_conn_t *self,
2661             int16_t version, const char *data, size_t len)
2662             {
2663 1           const char *p = data;
2664 1           const char *end = data + len;
2665 1           HV *result = newHV();
2666             int n;
2667             (void)self;
2668              
2669             /* throttle_time_ms (v2+) */
2670 1 50         if (version >= 2) {
2671 0 0         if (end - p < 4) goto done;
2672 0           p += 4;
2673             }
2674              
2675 1 50         if (end - p < 2) goto done;
2676 1           int16_t err = kf_read_i16(p); p += 2;
2677 1           hv_store(result, "error_code", 10, newSViv(err), 0);
2678              
2679 1 50         if (end - p < 4) goto done;
2680 1           int32_t gen = kf_read_i32(p); p += 4;
2681 1           hv_store(result, "generation_id", 13, newSViv(gen), 0);
2682              
2683             /* protocol_type (v7+) — skip for now, we use v5 max */
2684              
2685             const char *proto; int16_t plen;
2686 1           n = kf_read_string(p, end, &proto, &plen);
2687 1 50         if (n < 0) goto done;
2688 1           p += n;
2689 1 50         if (proto)
2690 1           hv_store(result, "protocol_name", 13, newSVpvn(proto, plen), 0);
2691              
2692             const char *leader; int16_t llen;
2693 1           n = kf_read_string(p, end, &leader, &llen);
2694 1 50         if (n < 0) goto done;
2695 1           p += n;
2696 1 50         if (leader)
2697 1           hv_store(result, "leader", 6, newSVpvn(leader, llen), 0);
2698              
2699             /* skip_assignment (v9+) — not applicable */
2700              
2701             const char *member_id; int16_t mlen;
2702 1           n = kf_read_string(p, end, &member_id, &mlen);
2703 1 50         if (n < 0) goto done;
2704 1           p += n;
2705 1 50         if (member_id)
2706 1           hv_store(result, "member_id", 9, newSVpvn(member_id, mlen), 0);
2707              
2708             /* members array */
2709 1 50         if (end - p < 4) goto done;
2710 1           int32_t mcount = kf_read_i32(p); p += 4;
2711 1           AV *members_av = newAV();
2712             int32_t i;
2713 2 100         for (i = 0; i < mcount; i++) {
2714 1           HV *mh = newHV();
2715              
2716             const char *mid; int16_t midlen;
2717 1           n = kf_read_string(p, end, &mid, &midlen);
2718 1 50         if (n < 0) goto done;
2719 1           p += n;
2720 1 50         if (mid)
2721 1           hv_store(mh, "member_id", 9, newSVpvn(mid, midlen), 0);
2722              
2723             /* group_instance_id (v5+) */
2724 1 50         if (version >= 5) {
2725             const char *gi; int16_t gilen;
2726 0           n = kf_read_string(p, end, &gi, &gilen);
2727 0 0         if (n < 0) goto done;
2728 0           p += n;
2729             }
2730              
2731             /* metadata: BYTES */
2732 1 50         if (end - p < 4) goto done;
2733 1           int32_t mdlen = kf_read_i32(p); p += 4;
2734 1 50         if (mdlen > 0) {
2735 0 0         if (end - p < mdlen) goto done;
2736 0           hv_store(mh, "metadata", 8, newSVpvn(p, mdlen), 0);
2737 0           p += mdlen;
2738             }
2739              
2740 1           av_push(members_av, newRV_noinc((SV*)mh));
2741             }
2742 1           hv_store(result, "members", 7, newRV_noinc((SV*)members_av), 0);
2743              
2744 1           done:
2745 1           return sv_2mortal(newRV_noinc((SV*)result));
2746             }
2747              
2748             /* SyncGroup response parser (API 14, v0-v3) */
2749 1           static SV* conn_parse_sync_group_response(pTHX_ ev_kafka_conn_t *self,
2750             int16_t version, const char *data, size_t len)
2751             {
2752 1           const char *p = data;
2753 1           const char *end = data + len;
2754 1           HV *result = newHV();
2755             (void)self;
2756              
2757             /* throttle_time_ms (v1+) */
2758 1 50         if (version >= 1) {
2759 0 0         if (end - p < 4) goto done;
2760 0           p += 4;
2761             }
2762              
2763 1 50         if (end - p < 2) goto done;
2764 1           int16_t err = kf_read_i16(p); p += 2;
2765 1           hv_store(result, "error_code", 10, newSViv(err), 0);
2766              
2767             /* assignment: BYTES */
2768 1 50         if (end - p < 4) goto done;
2769 1           int32_t alen = kf_read_i32(p); p += 4;
2770 1 50         if (alen > 0 && end - p >= alen) {
    50          
2771 1           hv_store(result, "assignment", 10, newSVpvn(p, alen), 0);
2772 1           p += alen;
2773             }
2774              
2775 0           done:
2776 1           return sv_2mortal(newRV_noinc((SV*)result));
2777             }
2778              
2779             /* Heartbeat response parser (API 12) */
2780 1           static SV* conn_parse_heartbeat_response(pTHX_ ev_kafka_conn_t *self,
2781             int16_t version, const char *data, size_t len)
2782             {
2783 1           const char *p = data;
2784 1           const char *end = data + len;
2785 1           HV *result = newHV();
2786             (void)self;
2787              
2788 1 50         if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2789 1 50         if (end - p >= 2) {
2790 1           int16_t err = kf_read_i16(p); p += 2;
2791 1           hv_store(result, "error_code", 10, newSViv(err), 0);
2792             }
2793              
2794 1           return sv_2mortal(newRV_noinc((SV*)result));
2795             }
2796              
2797             /* OffsetCommit response parser (API 8, v0-v7) */
2798 0           static SV* conn_parse_offset_commit_response(pTHX_ ev_kafka_conn_t *self,
2799             int16_t version, const char *data, size_t len)
2800             {
2801 0           const char *p = data;
2802 0           const char *end = data + len;
2803 0           HV *result = newHV();
2804 0           AV *topics_av = newAV();
2805             int n;
2806             (void)self;
2807              
2808 0 0         if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2809              
2810 0 0         if (end - p < 4) goto done;
2811 0           int32_t tc = kf_read_i32(p); p += 4;
2812             int32_t i;
2813 0 0         for (i = 0; i < tc; i++) {
2814 0           HV *th = newHV();
2815             const char *tname; int16_t tnlen;
2816 0           n = kf_read_string(p, end, &tname, &tnlen);
2817 0 0         if (n < 0) goto done;
2818 0           p += n;
2819 0 0         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
2820              
2821 0 0         if (end - p < 4) goto done;
2822 0           int32_t pc = kf_read_i32(p); p += 4;
2823 0           AV *parts_av = newAV();
2824             int32_t j;
2825 0 0         for (j = 0; j < pc; j++) {
2826 0           HV *ph = newHV();
2827 0 0         if (end - p < 6) goto done;
2828 0           int32_t pid = kf_read_i32(p); p += 4;
2829 0           int16_t err = kf_read_i16(p); p += 2;
2830 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
2831 0           hv_store(ph, "error_code", 10, newSViv(err), 0);
2832 0           av_push(parts_av, newRV_noinc((SV*)ph));
2833             }
2834 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2835 0           av_push(topics_av, newRV_noinc((SV*)th));
2836             }
2837              
2838 0           done:
2839 0           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2840 0           return sv_2mortal(newRV_noinc((SV*)result));
2841             }
2842              
2843             /* OffsetFetch response parser (API 9, v0-v5) */
2844 1           static SV* conn_parse_offset_fetch_response(pTHX_ ev_kafka_conn_t *self,
2845             int16_t version, const char *data, size_t len)
2846             {
2847 1           const char *p = data;
2848 1           const char *end = data + len;
2849 1           HV *result = newHV();
2850 1           AV *topics_av = newAV();
2851             int n;
2852             (void)self;
2853              
2854 1 50         if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2855              
2856 1 50         if (end - p < 4) goto done;
2857 1           int32_t tc = kf_read_i32(p); p += 4;
2858             int32_t i;
2859 2 100         for (i = 0; i < tc; i++) {
2860 1           HV *th = newHV();
2861             const char *tname; int16_t tnlen;
2862 1           n = kf_read_string(p, end, &tname, &tnlen);
2863 1 50         if (n < 0) goto done;
2864 1           p += n;
2865 1 50         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    50          
2866              
2867 1 50         if (end - p < 4) goto done;
2868 1           int32_t pc = kf_read_i32(p); p += 4;
2869 1           AV *parts_av = newAV();
2870             int32_t j;
2871 2 100         for (j = 0; j < pc; j++) {
2872 1           HV *ph = newHV();
2873 1 50         if (end - p < 4) goto done;
2874 1           int32_t pid = kf_read_i32(p); p += 4;
2875 1           hv_store(ph, "partition", 9, newSViv(pid), 0);
2876              
2877 1 50         if (end - p < 8) goto done;
2878 1           int64_t offset = kf_read_i64(p); p += 8;
2879 1           hv_store(ph, "offset", 6, newSViv(offset), 0);
2880              
2881             /* leader_epoch (v5+) */
2882 1 50         if (version >= 5 && end - p >= 4) p += 4;
    0          
2883              
2884             const char *meta_str; int16_t meta_len;
2885 1           n = kf_read_string(p, end, &meta_str, &meta_len);
2886 1 50         if (n < 0) goto done;
2887 1           p += n;
2888              
2889 1 50         if (end - p < 2) goto done;
2890 1           int16_t err = kf_read_i16(p); p += 2;
2891 1           hv_store(ph, "error_code", 10, newSViv(err), 0);
2892              
2893 1           av_push(parts_av, newRV_noinc((SV*)ph));
2894             }
2895 1           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2896 1           av_push(topics_av, newRV_noinc((SV*)th));
2897             }
2898              
2899             /* error_code (v2+) */
2900 1 50         if (version >= 2 && end - p >= 2) {
    0          
2901 0           int16_t err = kf_read_i16(p); p += 2;
2902 0           hv_store(result, "error_code", 10, newSViv(err), 0);
2903             }
2904              
2905 1           done:
2906 1           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2907 1           return sv_2mortal(newRV_noinc((SV*)result));
2908             }
2909              
2910             /* LeaveGroup response parser (API 13, v0-v3) */
2911 1           static SV* conn_parse_leave_group_response(pTHX_ ev_kafka_conn_t *self,
2912             int16_t version, const char *data, size_t len)
2913             {
2914 1           const char *p = data;
2915 1           const char *end = data + len;
2916 1           HV *result = newHV();
2917             (void)self;
2918              
2919 1 50         if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2920 1 50         if (end - p >= 2) {
2921 1           int16_t err = kf_read_i16(p); p += 2;
2922 1           hv_store(result, "error_code", 10, newSViv(err), 0);
2923             }
2924              
2925 1           return sv_2mortal(newRV_noinc((SV*)result));
2926             }
2927              
2928             /* CreateTopics response parser (API 19, v0-v4) */
2929 1           static SV* conn_parse_create_topics_response(pTHX_ ev_kafka_conn_t *self,
2930             int16_t version, const char *data, size_t len)
2931             {
2932 1           const char *p = data;
2933 1           const char *end = data + len;
2934 1           HV *result = newHV();
2935 1           AV *topics_av = newAV();
2936             int n;
2937             (void)self;
2938              
2939 1 50         if (version >= 2 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2940              
2941 1 50         if (end - p < 4) goto done;
2942 1           int32_t tc = kf_read_i32(p); p += 4;
2943             int32_t i;
2944 2 100         for (i = 0; i < tc; i++) {
2945 1           HV *th = newHV();
2946             const char *tname; int16_t tnlen;
2947 1           n = kf_read_string(p, end, &tname, &tnlen);
2948 1 50         if (n < 0) goto done;
2949 1           p += n;
2950 1 50         hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    50          
2951              
2952 1 50         if (end - p < 2) goto done;
2953 1           int16_t err = kf_read_i16(p); p += 2;
2954 1           hv_store(th, "error_code", 10, newSViv(err), 0);
2955              
2956             /* error_message (v1+) */
2957 1 50         if (version >= 1) {
2958             const char *emsg; int16_t elen;
2959 0           n = kf_read_string(p, end, &emsg, &elen);
2960 0 0         if (n < 0) goto done;
2961 0           p += n;
2962 0 0         if (emsg && elen > 0)
    0          
2963 0           hv_store(th, "error_message", 13, newSVpvn(emsg, elen), 0);
2964             }
2965              
2966 1           av_push(topics_av, newRV_noinc((SV*)th));
2967             }
2968              
2969 1           done:
2970 1           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2971 1           return sv_2mortal(newRV_noinc((SV*)result));
2972             }
2973              
2974             /* DeleteTopics response parser (API 20, v0-v3) */
2975 1           static SV* conn_parse_delete_topics_response(pTHX_ ev_kafka_conn_t *self,
2976             int16_t version, const char *data, size_t len)
2977             {
2978 1           const char *p = data;
2979 1           const char *end = data + len;
2980 1           HV *result = newHV();
2981 1           AV *topics_av = newAV();
2982             int n;
2983             (void)self;
2984              
2985 1 50         if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2986              
2987 1 50         if (end - p < 4) goto done;
2988 1           int32_t tc = kf_read_i32(p); p += 4;
2989             int32_t i;
2990 2 100         for (i = 0; i < tc; i++) {
2991 1           HV *th = newHV();
2992             const char *tname; int16_t tnlen;
2993 1           n = kf_read_string(p, end, &tname, &tnlen);
2994 1 50         if (n < 0) goto done;
2995 1           p += n;
2996 1 50         hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    50          
2997              
2998 1 50         if (end - p < 2) goto done;
2999 1           int16_t err = kf_read_i16(p); p += 2;
3000 1           hv_store(th, "error_code", 10, newSViv(err), 0);
3001              
3002 1           av_push(topics_av, newRV_noinc((SV*)th));
3003             }
3004              
3005 1           done:
3006 1           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
3007 1           return sv_2mortal(newRV_noinc((SV*)result));
3008             }
3009              
3010             /* InitProducerId response parser (API 22, v0-v2) */
3011 1           static SV* conn_parse_init_producer_id_response(pTHX_ ev_kafka_conn_t *self,
3012             int16_t version, const char *data, size_t len)
3013             {
3014 1           const char *p = data;
3015 1           const char *end = data + len;
3016 1           HV *result = newHV();
3017             (void)self;
3018              
3019 1 50         if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
    50          
3020              
3021 1 50         if (end - p < 2) goto done;
3022 1           int16_t err = kf_read_i16(p); p += 2;
3023 1           hv_store(result, "error_code", 10, newSViv(err), 0);
3024              
3025 1 50         if (end - p < 8) goto done;
3026 1           int64_t producer_id = kf_read_i64(p); p += 8;
3027 1           hv_store(result, "producer_id", 11, newSViv(producer_id), 0);
3028              
3029 1 50         if (end - p < 2) goto done;
3030 1           int16_t producer_epoch = kf_read_i16(p); p += 2;
3031 1           hv_store(result, "producer_epoch", 14, newSViv(producer_epoch), 0);
3032              
3033 1           done:
3034 1           return sv_2mortal(newRV_noinc((SV*)result));
3035             }
3036              
3037             /* AddPartitionsToTxn response parser (API 24, v0-v1) */
3038 1           static SV* conn_parse_add_partitions_to_txn_response(pTHX_ ev_kafka_conn_t *self,
3039             int16_t version, const char *data, size_t len)
3040             {
3041 1           const char *p = data;
3042 1           const char *end = data + len;
3043 1           HV *result = newHV();
3044 1           AV *topics_av = newAV();
3045             int n;
3046             (void)self; (void)version;
3047              
3048 1 50         if (end - p < 4) goto done;
3049 1           p += 4; /* throttle_time_ms */
3050              
3051 1 50         if (end - p < 4) goto done;
3052 1           int32_t tc = kf_read_i32(p); p += 4;
3053             int32_t i;
3054 2 100         for (i = 0; i < tc; i++) {
3055 1           HV *th = newHV();
3056             const char *tname; int16_t tnlen;
3057 1           n = kf_read_string(p, end, &tname, &tnlen);
3058 1 50         if (n < 0) goto done;
3059 1           p += n;
3060 1 50         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    50          
3061              
3062 1 50         if (end - p < 4) goto done;
3063 1           int32_t pc = kf_read_i32(p); p += 4;
3064 1           AV *parts_av = newAV();
3065             int32_t j;
3066 2 100         for (j = 0; j < pc; j++) {
3067 1           HV *ph = newHV();
3068 1 50         if (end - p < 6) goto done;
3069 1           int32_t pid = kf_read_i32(p); p += 4;
3070 1           int16_t err = kf_read_i16(p); p += 2;
3071 1           hv_store(ph, "partition", 9, newSViv(pid), 0);
3072 1           hv_store(ph, "error_code", 10, newSViv(err), 0);
3073 1           av_push(parts_av, newRV_noinc((SV*)ph));
3074             }
3075 1           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
3076 1           av_push(topics_av, newRV_noinc((SV*)th));
3077             }
3078              
3079 1           done:
3080 1           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
3081 1           return sv_2mortal(newRV_noinc((SV*)result));
3082             }
3083              
3084             /* EndTxn response parser (API 26, v0-v1) */
3085 1           static SV* conn_parse_end_txn_response(pTHX_ ev_kafka_conn_t *self,
3086             int16_t version, const char *data, size_t len)
3087             {
3088 1           const char *p = data;
3089 1           const char *end = data + len;
3090 1           HV *result = newHV();
3091             (void)self; (void)version;
3092              
3093 1 50         if (end - p >= 4) p += 4; /* throttle_time_ms */
3094 1 50         if (end - p >= 2) {
3095 1           int16_t err = kf_read_i16(p); p += 2;
3096 1           hv_store(result, "error_code", 10, newSViv(err), 0);
3097             }
3098              
3099 1           return sv_2mortal(newRV_noinc((SV*)result));
3100             }
3101              
3102             /* TxnOffsetCommit response parser (API 28, v0-v2) */
3103 1           static SV* conn_parse_txn_offset_commit_response(pTHX_ ev_kafka_conn_t *self,
3104             int16_t version, const char *data, size_t len)
3105             {
3106 1           const char *p = data;
3107 1           const char *end = data + len;
3108 1           HV *result = newHV();
3109 1           AV *topics_av = newAV();
3110             int n;
3111             (void)self;
3112              
3113 1 50         if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
3114              
3115 1 50         if (end - p < 4) goto done;
3116 1           int32_t tc = kf_read_i32(p); p += 4;
3117             int32_t i;
3118 1 50         for (i = 0; i < tc; i++) {
3119 0           HV *th = newHV();
3120             const char *tname; int16_t tnlen;
3121 0           n = kf_read_string(p, end, &tname, &tnlen);
3122 0 0         if (n < 0) goto done;
3123 0           p += n;
3124 0 0         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
3125              
3126 0 0         if (end - p < 4) goto done;
3127 0           int32_t pc = kf_read_i32(p); p += 4;
3128 0           AV *parts_av = newAV();
3129             int32_t j;
3130 0 0         for (j = 0; j < pc; j++) {
3131 0           HV *ph = newHV();
3132 0 0         if (end - p < 6) goto done;
3133 0           int32_t pid = kf_read_i32(p); p += 4;
3134 0           int16_t err = kf_read_i16(p); p += 2;
3135 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
3136 0           hv_store(ph, "error_code", 10, newSViv(err), 0);
3137 0           av_push(parts_av, newRV_noinc((SV*)ph));
3138             }
3139 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
3140 0           av_push(topics_av, newRV_noinc((SV*)th));
3141             }
3142              
3143 1           done:
3144 1           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
3145 1           return sv_2mortal(newRV_noinc((SV*)result));
3146             }
3147              
3148             /* ================================================================
3149             * Response processing loop
3150             * ================================================================ */
3151              
3152 1           static void conn_process_responses(pTHX_ ev_kafka_conn_t *self) {
3153 3 100         while (self->rbuf_len >= 4) {
3154 1           int32_t msg_size = kf_read_i32(self->rbuf);
3155 1 50         if (msg_size < 0 || msg_size > 256 * 1024 * 1024) {
    50          
3156 0           conn_emit_error(aTHX_ self, "invalid response size");
3157 0 0         if (conn_check_destroyed(self)) return;
3158 0           conn_handle_disconnect(aTHX_ self, "invalid response size");
3159 0           return;
3160             }
3161              
3162 1 50         if (self->rbuf_len < (size_t)(4 + msg_size))
3163 0           break; /* incomplete response */
3164              
3165 1           const char *msg = self->rbuf + 4;
3166              
3167             /* correlation_id is first 4 bytes of message */
3168 1 50         if (msg_size < 4) {
3169 0           conn_emit_error(aTHX_ self, "response too short");
3170 0 0         if (conn_check_destroyed(self)) return;
3171 0           conn_handle_disconnect(aTHX_ self, "response too short");
3172 0           return;
3173             }
3174              
3175 1           int32_t corr_id = kf_read_i32(msg);
3176 1           const char *payload = msg + 4;
3177 1           size_t payload_len = (size_t)msg_size - 4;
3178              
3179             /* Find matching callback (should be head of queue — Kafka guarantees ordering) */
3180 1           ev_kafka_conn_cb_t *cbt = NULL;
3181 1 50         if (!ngx_queue_empty(&self->cb_queue)) {
3182 1           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
3183 1           cbt = ngx_queue_data(q, ev_kafka_conn_cb_t, queue);
3184 1 50         if (cbt->correlation_id != corr_id) {
3185             /* Out of order — shouldn't happen with Kafka, but handle gracefully */
3186             char errbuf[128];
3187 0           snprintf(errbuf, sizeof(errbuf),
3188             "correlation ID mismatch: expected %d, got %d",
3189             cbt->correlation_id, corr_id);
3190 0           conn_emit_error(aTHX_ self, errbuf);
3191 0 0         if (conn_check_destroyed(self)) return;
3192 0           conn_handle_disconnect(aTHX_ self, "correlation ID mismatch");
3193 0           return;
3194             }
3195 1           ngx_queue_remove(q);
3196 1           self->pending_count--;
3197             }
3198              
3199             /* Dispatch BEFORE compacting — payload points into rbuf */
3200 1           size_t consumed = 4 + (size_t)msg_size;
3201 1 50         if (cbt) {
3202 1           conn_dispatch_response(aTHX_ self, cbt, payload, payload_len);
3203 1 50         if (cbt->cb) SvREFCNT_dec(cbt->cb);
3204 1           Safefree(cbt);
3205 1 50         if (conn_check_destroyed(self)) return;
3206             }
3207              
3208             /* Compact rbuf after dispatch */
3209 1           self->rbuf_len -= consumed;
3210 1 50         if (self->rbuf_len > 0)
3211 0           memmove(self->rbuf, self->rbuf + consumed, self->rbuf_len);
3212             }
3213             }
3214              
3215             /* ================================================================
3216             * I/O callback
3217             * ================================================================ */
3218              
3219 4           static void conn_io_cb(EV_P_ ev_io *w, int revents) {
3220 4           ev_kafka_conn_t *self = (ev_kafka_conn_t *)w->data;
3221             dTHX;
3222             (void)loop;
3223              
3224 4 50         if (!self || self->magic != KF_MAGIC_ALIVE) return;
    50          
3225              
3226             /* TCP connect in progress */
3227 4 100         if (self->state == CONN_CONNECTING) {
3228 2           int err = 0;
3229 2           socklen_t errlen = sizeof(err);
3230              
3231 2 50         if (self->timing) {
3232 2           ev_timer_stop(self->loop, &self->timer);
3233 2           self->timing = 0;
3234             }
3235 2           conn_stop_writing(self);
3236              
3237 2 50         if (getsockopt(self->fd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0)
3238 0           err = errno;
3239 2 100         if (err != 0) {
3240             char errbuf[256];
3241 1           snprintf(errbuf, sizeof(errbuf), "connect: %s", strerror(err));
3242 1           conn_emit_error(aTHX_ self, errbuf);
3243 1 50         if (conn_check_destroyed(self)) return;
3244 1           conn_handle_disconnect(aTHX_ self, errbuf);
3245 1           return;
3246             }
3247              
3248 1           conn_on_connect_done(aTHX_ self);
3249 1           return;
3250             }
3251              
3252             #ifdef HAVE_OPENSSL
3253             /* TLS handshake in progress */
3254 2 50         if (self->state == CONN_TLS_HANDSHAKE) {
3255 0           ERR_clear_error();
3256 0           int ret = SSL_connect(self->ssl);
3257 0 0         if (ret == 1) {
3258 0           conn_stop_reading(self);
3259 0           conn_stop_writing(self);
3260              
3261             /* TLS done — proceed to ApiVersions (or SASL if needed) */
3262 0           conn_send_api_versions(aTHX_ self);
3263 0           conn_start_reading(self);
3264 0           return;
3265             }
3266 0           int err = SSL_get_error(self->ssl, ret);
3267 0 0         if (err == SSL_ERROR_WANT_READ) {
3268 0           conn_stop_writing(self);
3269 0           conn_start_reading(self);
3270 0 0         } else if (err == SSL_ERROR_WANT_WRITE) {
3271 0           conn_stop_reading(self);
3272 0           conn_start_writing(self);
3273             } else {
3274             char errbuf[256];
3275 0           unsigned long e = ERR_peek_last_error();
3276 0 0         if (e) {
3277 0           snprintf(errbuf, sizeof(errbuf), "SSL_connect failed: %s",
3278             ERR_reason_error_string(e));
3279             } else {
3280 0           snprintf(errbuf, sizeof(errbuf), "SSL_connect failed (err=%d)", err);
3281             }
3282 0           ERR_clear_error();
3283 0           conn_emit_error(aTHX_ self, errbuf);
3284 0 0         if (conn_check_destroyed(self)) return;
3285 0           conn_handle_disconnect(aTHX_ self, errbuf);
3286             }
3287 0           return;
3288             }
3289             #endif
3290              
3291             /* Write */
3292 2 100         if (revents & EV_WRITE) {
3293 2 100         while (self->wbuf_off < self->wbuf_len) {
3294 1           ssize_t n = kf_io_write(self, self->wbuf + self->wbuf_off,
3295 1           self->wbuf_len - self->wbuf_off);
3296 1 50         if (n < 0) {
3297 0 0         if (errno == EAGAIN || errno == EWOULDBLOCK) break;
    0          
3298 0           conn_emit_error(aTHX_ self, strerror(errno));
3299 0 0         if (conn_check_destroyed(self)) return;
3300 0           conn_handle_disconnect(aTHX_ self, "write error");
3301 0           return;
3302             }
3303 1 50         if (n == 0) {
3304 0           conn_handle_disconnect(aTHX_ self, "connection closed");
3305 0           return;
3306             }
3307 1           self->wbuf_off += n;
3308             }
3309              
3310 1 50         if (self->wbuf_off >= self->wbuf_len) {
3311 1           self->wbuf_off = 0;
3312 1           self->wbuf_len = 0;
3313 1           conn_stop_writing(self);
3314             }
3315             }
3316              
3317             /* Read */
3318 2 100         if (revents & EV_READ) {
3319 1           conn_ensure_rbuf(self, self->rbuf_len + 8192);
3320 1           ssize_t n = kf_io_read(self, self->rbuf + self->rbuf_len,
3321 1           self->rbuf_cap - self->rbuf_len);
3322 1 50         if (n < 0) {
3323 0 0         if (errno == EAGAIN || errno == EWOULDBLOCK) return;
    0          
3324 0           conn_emit_error(aTHX_ self, strerror(errno));
3325 0 0         if (conn_check_destroyed(self)) return;
3326 0           conn_handle_disconnect(aTHX_ self, "read error");
3327 0           return;
3328             }
3329 1 50         if (n == 0) {
3330 0           conn_handle_disconnect(aTHX_ self, "connection closed by broker");
3331 0           return;
3332             }
3333 1           self->rbuf_len += n;
3334              
3335 1           conn_process_responses(aTHX_ self);
3336             }
3337             }
3338              
3339             /* ================================================================
3340             * Connect timer (timeout)
3341             * ================================================================ */
3342              
3343 0           static void conn_timer_cb(EV_P_ ev_timer *w, int revents) {
3344 0           ev_kafka_conn_t *self = (ev_kafka_conn_t *)w->data;
3345             dTHX;
3346             (void)loop;
3347             (void)revents;
3348              
3349 0           self->timing = 0;
3350 0 0         if (self->magic != KF_MAGIC_ALIVE) return;
3351              
3352 0           conn_emit_error(aTHX_ self, "connect timeout");
3353 0 0         if (conn_check_destroyed(self)) return;
3354 0           conn_handle_disconnect(aTHX_ self, "connect timeout");
3355             }
3356              
3357             /* ================================================================
3358             * TCP connect + handshake initiation
3359             * ================================================================ */
3360              
3361             #ifdef HAVE_OPENSSL
3362 0           static int is_ip_literal(const char *host) {
3363             struct in_addr addr4;
3364             struct in6_addr addr6;
3365 0           return (inet_pton(AF_INET, host, &addr4) == 1 ||
3366 0           inet_pton(AF_INET6, host, &addr6) == 1);
3367             }
3368             #endif
3369              
3370 1           static void conn_on_connect_done(pTHX_ ev_kafka_conn_t *self) {
3371             /* TCP connected — set up I/O watchers if not already */
3372              
3373             #ifdef HAVE_OPENSSL
3374 1 50         if (self->tls_enabled) {
3375 0           self->ssl_ctx = SSL_CTX_new(TLS_client_method());
3376 0 0         if (!self->ssl_ctx) {
3377 0           conn_emit_error(aTHX_ self, "SSL_CTX_new failed");
3378 0 0         if (conn_check_destroyed(self)) return;
3379 0           conn_handle_disconnect(aTHX_ self, "SSL_CTX_new failed");
3380 0           return;
3381             }
3382 0           SSL_CTX_set_default_verify_paths(self->ssl_ctx);
3383 0 0         if (self->tls_skip_verify)
3384 0           SSL_CTX_set_verify(self->ssl_ctx, SSL_VERIFY_NONE, NULL);
3385             else
3386 0           SSL_CTX_set_verify(self->ssl_ctx, SSL_VERIFY_PEER, NULL);
3387              
3388 0 0         if (self->tls_ca_file) {
3389 0 0         if (SSL_CTX_load_verify_locations(self->ssl_ctx, self->tls_ca_file, NULL) != 1) {
3390 0           conn_emit_error(aTHX_ self, "SSL_CTX_load_verify_locations failed");
3391 0 0         if (conn_check_destroyed(self)) return;
3392 0           conn_handle_disconnect(aTHX_ self, "SSL_CTX_load_verify_locations failed");
3393 0           return;
3394             }
3395             }
3396              
3397 0           self->ssl = SSL_new(self->ssl_ctx);
3398 0 0         if (!self->ssl) {
3399 0           conn_emit_error(aTHX_ self, "SSL_new failed");
3400 0 0         if (conn_check_destroyed(self)) return;
3401 0           conn_handle_disconnect(aTHX_ self, "SSL_new failed");
3402 0           return;
3403             }
3404 0           SSL_set_fd(self->ssl, self->fd);
3405              
3406 0 0         if (!is_ip_literal(self->host))
3407 0           SSL_set_tlsext_host_name(self->ssl, self->host);
3408              
3409 0 0         if (!self->tls_skip_verify) {
3410 0           X509_VERIFY_PARAM *param = SSL_get0_param(self->ssl);
3411 0           X509_VERIFY_PARAM_set_hostflags(param, X509_CHECK_FLAG_NO_PARTIAL_WILDCARDS);
3412 0 0         if (is_ip_literal(self->host))
3413 0           X509_VERIFY_PARAM_set1_ip_asc(param, self->host);
3414             else
3415 0           X509_VERIFY_PARAM_set1_host(param, self->host, 0);
3416             }
3417              
3418 0           self->state = CONN_TLS_HANDSHAKE;
3419 0           ERR_clear_error();
3420 0           int ret = SSL_connect(self->ssl);
3421 0 0         if (ret == 1) {
3422             /* Immediate success */
3423 0           conn_send_api_versions(aTHX_ self);
3424 0           conn_start_reading(self);
3425 0           return;
3426             }
3427 0           int err = SSL_get_error(self->ssl, ret);
3428 0 0         if (err == SSL_ERROR_WANT_READ) {
3429 0           conn_start_reading(self);
3430 0 0         } else if (err == SSL_ERROR_WANT_WRITE) {
3431 0           conn_start_writing(self);
3432             } else {
3433             char errbuf[256];
3434 0           unsigned long e = ERR_peek_last_error();
3435 0 0         if (e) {
3436 0           snprintf(errbuf, sizeof(errbuf), "SSL_connect failed: %s",
3437             ERR_reason_error_string(e));
3438             } else {
3439 0           snprintf(errbuf, sizeof(errbuf), "SSL_connect failed (err=%d)", err);
3440             }
3441 0           ERR_clear_error();
3442 0           conn_emit_error(aTHX_ self, errbuf);
3443 0 0         if (conn_check_destroyed(self)) return;
3444 0           conn_handle_disconnect(aTHX_ self, errbuf);
3445             }
3446 0           return;
3447             }
3448             #endif
3449              
3450             /* No TLS — send ApiVersions directly */
3451 1           conn_send_api_versions(aTHX_ self);
3452 1           conn_start_reading(self);
3453             }
3454              
3455 2           static void conn_start_connect(pTHX_ ev_kafka_conn_t *self,
3456             const char *host, int port, double timeout)
3457             {
3458             struct addrinfo hints, *res, *rp;
3459             char port_str[16];
3460 2           int fd = -1;
3461              
3462 2 50         if (self->state != CONN_DISCONNECTED) {
3463 0           conn_cleanup(aTHX_ self);
3464             }
3465              
3466             /* Save host/port (skip if already pointing to same string, e.g. reconnect) */
3467 2 50         if (host != self->host) {
3468 2 50         if (self->host) Safefree(self->host);
3469 2           self->host = savepv(host);
3470             }
3471 2           self->port = port;
3472 2           self->intentional_disconnect = 0;
3473              
3474 2           snprintf(port_str, sizeof(port_str), "%d", port);
3475              
3476 2           memset(&hints, 0, sizeof(hints));
3477 2           hints.ai_family = AF_UNSPEC;
3478 2           hints.ai_socktype = SOCK_STREAM;
3479              
3480             /* Fast path: if host is a numeric IP literal, AI_NUMERICHOST avoids any
3481             * DNS resolver work and returns synchronously without blocking the EV
3482             * loop. For non-literal hostnames getaddrinfo still blocks; users that
3483             * need fully-async DNS should resolve in Perl-land before connecting. */
3484 2           hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
3485 2           int gai_err = getaddrinfo(host, port_str, &hints, &res);
3486 2 50         if (gai_err != 0) {
3487 0           hints.ai_flags = 0;
3488 0           gai_err = getaddrinfo(host, port_str, &hints, &res);
3489             }
3490 2 50         if (gai_err != 0) {
3491             char errbuf[256];
3492 0           snprintf(errbuf, sizeof(errbuf), "resolve: %s", gai_strerror(gai_err));
3493 0           conn_emit_error(aTHX_ self, errbuf);
3494 0           return;
3495             }
3496              
3497 2 50         for (rp = res; rp; rp = rp->ai_next) {
3498 2           fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
3499 2 50         if (fd < 0) continue;
3500              
3501             /* Non-blocking */
3502 2           fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
3503              
3504             /* TCP_NODELAY */
3505             {
3506 2           int one = 1;
3507 2           setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
3508             }
3509              
3510 2           int ret = connect(fd, rp->ai_addr, rp->ai_addrlen);
3511 2 50         if (ret == 0) {
3512             /* Immediate connect */
3513 0           self->fd = fd;
3514 0           self->state = CONN_CONNECTING; /* will be advanced in on_connect_done */
3515 0           freeaddrinfo(res);
3516              
3517 0           ev_io_init(&self->rio, conn_io_cb, fd, EV_READ);
3518 0           self->rio.data = (void *)self;
3519 0           ev_io_init(&self->wio, conn_io_cb, fd, EV_WRITE);
3520 0           self->wio.data = (void *)self;
3521              
3522 0           conn_on_connect_done(aTHX_ self);
3523 0           return;
3524             }
3525              
3526 2 50         if (errno == EINPROGRESS) {
3527 2           self->fd = fd;
3528 2           self->state = CONN_CONNECTING;
3529 2           freeaddrinfo(res);
3530              
3531 2           ev_io_init(&self->rio, conn_io_cb, fd, EV_READ);
3532 2           self->rio.data = (void *)self;
3533 2           ev_io_init(&self->wio, conn_io_cb, fd, EV_WRITE);
3534 2           self->wio.data = (void *)self;
3535              
3536 2           conn_start_writing(self); /* wait for connect to complete */
3537              
3538 2 50         if (timeout > 0) {
3539 2           ev_timer_init(&self->timer, conn_timer_cb, timeout, 0.0);
3540 2           self->timer.data = (void *)self;
3541 2           ev_timer_start(self->loop, &self->timer);
3542 2           self->timing = 1;
3543             }
3544 2           return;
3545             }
3546              
3547 0           close(fd);
3548             }
3549              
3550 0           freeaddrinfo(res);
3551 0           conn_emit_error(aTHX_ self, "connect: all addresses failed");
3552             }
3553              
3554             /* ================================================================
3555             * XS INTERFACE
3556             * ================================================================ */
3557              
3558             MODULE = EV::Kafka PACKAGE = EV::Kafka::Conn
3559              
3560             PROTOTYPES: DISABLE
3561              
3562             EV::Kafka::Conn
3563             _new(char *cls, SV *loop_sv)
3564             CODE:
3565             {
3566             struct ev_loop *loop;
3567             ev_kafka_conn_t *self;
3568              
3569 7 50         if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop"))
    0          
3570 0           loop = (struct ev_loop *)SvIV(SvRV(loop_sv));
3571             else
3572 7           loop = EV_DEFAULT;
3573              
3574 7           Newxz(self, 1, ev_kafka_conn_t);
3575 7           self->magic = KF_MAGIC_ALIVE;
3576 7           self->loop = loop;
3577 7           self->fd = -1;
3578 7           self->state = CONN_DISCONNECTED;
3579 7           self->next_correlation_id = 1;
3580              
3581 7           Newx(self->rbuf, KF_BUF_INIT, char);
3582 7           self->rbuf_cap = KF_BUF_INIT;
3583 7           self->rbuf_len = 0;
3584 7           Newx(self->wbuf, KF_BUF_INIT, char);
3585 7           self->wbuf_cap = KF_BUF_INIT;
3586 7           self->wbuf_len = 0;
3587 7           self->wbuf_off = 0;
3588              
3589 7           ngx_queue_init(&self->cb_queue);
3590              
3591             /* Default client_id */
3592 7           self->client_id = savepv("ev-kafka");
3593 7           self->client_id_len = 8;
3594              
3595             /* Default: no API versions known */
3596             {
3597             int i;
3598 455 100         for (i = 0; i < API_VERSIONS_MAX_KEY; i++)
3599 448           self->api_versions[i] = -1;
3600             }
3601              
3602 7           self->reconnect_delay_ms = 1000;
3603              
3604 7           RETVAL = self;
3605             }
3606             OUTPUT:
3607             RETVAL
3608              
3609             void
3610             DESTROY(EV::Kafka::Conn self)
3611             CODE:
3612             {
3613 7 50         if (self->magic != KF_MAGIC_ALIVE) return;
3614              
3615 7           self->intentional_disconnect = 1;
3616 7           conn_cleanup(aTHX_ self);
3617 7           conn_cancel_pending(aTHX_ self, "destroyed");
3618              
3619 7           self->magic = KF_MAGIC_FREED;
3620              
3621 7 50         if (self->reconnect_timing) {
3622 0           ev_timer_stop(self->loop, &self->reconnect_timer);
3623 0           self->reconnect_timing = 0;
3624             }
3625              
3626 7 100         CLEAR_HANDLER(self->on_error);
3627 7 100         CLEAR_HANDLER(self->on_connect);
3628 7 50         CLEAR_HANDLER(self->on_disconnect);
3629              
3630 7 100         if (self->host) Safefree(self->host);
3631 7 50         if (self->client_id) Safefree(self->client_id);
3632 7 50         if (self->sasl_mechanism) Safefree(self->sasl_mechanism);
3633 7 50         if (self->sasl_username) Safefree(self->sasl_username);
3634 7 50         if (self->sasl_password) Safefree(self->sasl_password);
3635 7 50         if (self->scram_nonce) Safefree(self->scram_nonce);
3636 7 50         if (self->scram_client_first) Safefree(self->scram_client_first);
3637             #ifdef HAVE_OPENSSL
3638 7           OPENSSL_cleanse(self->scram_server_key, sizeof(self->scram_server_key));
3639 7 50         if (self->scram_auth_message) {
3640 0           OPENSSL_cleanse(self->scram_auth_message, self->scram_auth_message_len);
3641 0           Safefree(self->scram_auth_message);
3642             }
3643             #endif
3644 7 50         if (self->tls_ca_file) Safefree(self->tls_ca_file);
3645 7 50         if (self->rbuf) Safefree(self->rbuf);
3646 7 50         if (self->wbuf) Safefree(self->wbuf);
3647              
3648 7           Safefree(self);
3649             }
3650              
3651             void
3652             connect(EV::Kafka::Conn self, const char *host, int port, double timeout = 0)
3653             CODE:
3654             {
3655 2           conn_start_connect(aTHX_ self, host, port, timeout);
3656             }
3657              
3658             void
3659             disconnect(EV::Kafka::Conn self)
3660             CODE:
3661             {
3662 1           self->intentional_disconnect = 1;
3663 1 50         if (self->reconnect_timing) {
3664 0           ev_timer_stop(self->loop, &self->reconnect_timer);
3665 0           self->reconnect_timing = 0;
3666             }
3667 1           conn_handle_disconnect(aTHX_ self, "disconnected");
3668             }
3669              
3670             int
3671             connected(EV::Kafka::Conn self)
3672             CODE:
3673 3 50         RETVAL = (self->state == CONN_READY) ? 1 : 0;
3674             OUTPUT:
3675             RETVAL
3676              
3677             int
3678             state(EV::Kafka::Conn self)
3679             CODE:
3680 0 0         RETVAL = self->state;
3681             OUTPUT:
3682             RETVAL
3683              
3684             int
3685             pending(EV::Kafka::Conn self)
3686             CODE:
3687 0 0         RETVAL = self->pending_count;
3688             OUTPUT:
3689             RETVAL
3690              
3691             void
3692             on_error(EV::Kafka::Conn self, SV *cb = NULL)
3693             CODE:
3694             {
3695 2 50         CLEAR_HANDLER(self->on_error);
3696 2 50         if (cb && SvOK(cb)) {
    50          
3697 2           self->on_error = newSVsv(cb);
3698             }
3699             }
3700              
3701             void
3702             on_connect(EV::Kafka::Conn self, SV *cb = NULL)
3703             CODE:
3704             {
3705 1 50         CLEAR_HANDLER(self->on_connect);
3706 1 50         if (cb && SvOK(cb)) {
    50          
3707 1           self->on_connect = newSVsv(cb);
3708             }
3709             }
3710              
3711             void
3712             on_disconnect(EV::Kafka::Conn self, SV *cb = NULL)
3713             CODE:
3714             {
3715 0 0         CLEAR_HANDLER(self->on_disconnect);
3716 0 0         if (cb && SvOK(cb)) {
    0          
3717 0           self->on_disconnect = newSVsv(cb);
3718             }
3719             }
3720              
3721             void
3722             client_id(EV::Kafka::Conn self, const char *id = NULL)
3723             CODE:
3724             {
3725 0 0         if (id) {
3726 0 0         if (self->client_id) Safefree(self->client_id);
3727 0           self->client_id = savepv(id);
3728 0           self->client_id_len = strlen(id);
3729             }
3730             }
3731              
3732             void
3733             tls(EV::Kafka::Conn self, int enable, const char *ca_file = NULL, int skip_verify = 0)
3734             CODE:
3735             {
3736 0           self->tls_enabled = enable;
3737 0 0         if (self->tls_ca_file) { Safefree(self->tls_ca_file); self->tls_ca_file = NULL; }
3738 0 0         if (ca_file) self->tls_ca_file = savepv(ca_file);
3739 0           self->tls_skip_verify = skip_verify;
3740             }
3741              
3742             void
3743             sasl(EV::Kafka::Conn self, const char *mechanism, const char *username = NULL, const char *password = NULL)
3744             CODE:
3745             {
3746 0 0         if (self->sasl_mechanism) { Safefree(self->sasl_mechanism); self->sasl_mechanism = NULL; }
3747 0 0         if (self->sasl_username) { Safefree(self->sasl_username); self->sasl_username = NULL; }
3748 0 0         if (self->sasl_password) { Safefree(self->sasl_password); self->sasl_password = NULL; }
3749 0 0         if (SvOK(ST(1))) {
3750 0           self->sasl_mechanism = savepv(mechanism);
3751 0 0         if (username) self->sasl_username = savepv(username);
3752 0 0         if (password) self->sasl_password = savepv(password);
3753             }
3754             }
3755              
3756             void
3757             auto_reconnect(EV::Kafka::Conn self, int enable, int delay_ms = 1000)
3758             CODE:
3759             {
3760 0           self->auto_reconnect = enable;
3761 0           self->reconnect_delay_ms = delay_ms;
3762             }
3763              
3764             void
3765             metadata(EV::Kafka::Conn self, SV *topics_sv, SV *cb)
3766             CODE:
3767             {
3768 1 50         if (self->state != CONN_READY)
3769 1           croak("not connected");
3770              
3771             kf_buf_t body;
3772 0           kf_buf_init(&body);
3773              
3774             /* Metadata v1-v4 (non-flexible) */
3775 0           int16_t ver = self->api_versions[API_METADATA];
3776 0 0         if (ver < 0) ver = 1;
3777 0 0         if (ver > 4) ver = 4;
3778              
3779 0 0         if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) {
    0          
    0          
3780 0           AV *topics = (AV*)SvRV(topics_sv);
3781 0           SSize_t i, count = av_len(topics) + 1;
3782 0           kf_buf_append_i32(&body, (int32_t)count);
3783 0 0         for (i = 0; i < count; i++) {
3784 0           SV **elem = av_fetch(topics, i, 0);
3785             STRLEN tlen;
3786 0           const char *tname = SvPV(*elem, tlen);
3787 0           kf_buf_append_string(&body, tname, (int16_t)tlen);
3788             }
3789             } else {
3790 0           kf_buf_append_i32(&body, -1); /* null array = all topics */
3791             }
3792              
3793             /* allow_auto_topic_creation (v4+) */
3794 0 0         if (ver >= 4)
3795 0           kf_buf_append_i8(&body, 1);
3796              
3797 0           conn_send_request(aTHX_ self, API_METADATA, ver, &body, cb, 0, 0);
3798 0           kf_buf_free(&body);
3799             }
3800              
3801             void
3802             api_versions(EV::Kafka::Conn self)
3803             PPCODE:
3804             {
3805 0 0         if (!self->api_versions_known)
3806 0           XSRETURN_UNDEF;
3807              
3808 0           HV *hv = newHV();
3809             int i;
3810 0 0         for (i = 0; i < API_VERSIONS_MAX_KEY; i++) {
3811 0 0         if (self->api_versions[i] >= 0) {
3812             char key[8];
3813 0           int klen = snprintf(key, sizeof(key), "%d", i);
3814 0           hv_store(hv, key, klen, newSViv(self->api_versions[i]), 0);
3815             }
3816             }
3817 0 0         EXTEND(SP, 1);
3818 0           mPUSHs(newRV_noinc((SV*)hv));
3819 0           XSRETURN(1);
3820             }
3821              
3822             void
3823             fetch(EV::Kafka::Conn self, const char *topic, int partition, SV *offset_sv, SV *arg1 = NULL, SV *arg2 = NULL)
3824             CODE:
3825             {
3826 1 50         if (self->state != CONN_READY)
3827 1           croak("not connected");
3828              
3829             /* Accept either fetch(..., $cb) or fetch(..., \%opts, $cb).
3830             * $opts may set max_bytes, max_wait_ms, min_bytes. */
3831 0           SV *cb = NULL;
3832 0           HV *opts = NULL;
3833 0 0         if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
    0          
    0          
3834 0           opts = (HV*)SvRV(arg1);
3835 0           cb = arg2;
3836             } else {
3837 0           cb = arg1;
3838             }
3839              
3840 0           int32_t max_bytes = 1048576;
3841 0           int32_t max_wait_ms = 500;
3842 0           int32_t min_bytes = 1;
3843 0 0         if (opts) {
3844             SV **v;
3845 0 0         if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v))
    0          
3846 0           max_bytes = (int32_t)SvIV(*v);
3847 0 0         if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v))
    0          
3848 0           max_wait_ms = (int32_t)SvIV(*v);
3849 0 0         if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v))
    0          
3850 0           min_bytes = (int32_t)SvIV(*v);
3851             }
3852              
3853 0           int64_t offset = SvIV(offset_sv);
3854 0           STRLEN topic_len = strlen(topic);
3855              
3856 0           int16_t ver = self->api_versions[API_FETCH];
3857 0 0         if (ver < 0) ver = 4;
3858 0 0         if (ver > 7) ver = 7;
3859              
3860             kf_buf_t body;
3861 0           kf_buf_init(&body);
3862              
3863 0           kf_buf_append_i32(&body, -1); /* replica_id = -1 (consumer) */
3864 0           kf_buf_append_i32(&body, max_wait_ms);
3865 0           kf_buf_append_i32(&body, min_bytes);
3866              
3867             /* max_bytes (v3+) */
3868 0 0         if (ver >= 3)
3869 0           kf_buf_append_i32(&body, max_bytes);
3870              
3871             /* isolation_level (v4+) */
3872 0 0         if (ver >= 4)
3873 0           kf_buf_append_i8(&body, 0); /* READ_UNCOMMITTED */
3874              
3875             /* session_id + session_epoch (v7+) */
3876 0 0         if (ver >= 7) {
3877 0           kf_buf_append_i32(&body, 0); /* session_id */
3878 0           kf_buf_append_i32(&body, -1); /* session_epoch */
3879             }
3880              
3881             /* topics: ARRAY(1) */
3882 0           kf_buf_append_i32(&body, 1);
3883 0           kf_buf_append_string(&body, topic, (int16_t)topic_len);
3884              
3885             /* partitions: ARRAY(1) */
3886 0           kf_buf_append_i32(&body, 1);
3887 0           kf_buf_append_i32(&body, (int32_t)partition);
3888              
3889             /* fetch_offset */
3890 0           kf_buf_append_i64(&body, offset);
3891              
3892             /* log_start_offset (v5+) */
3893 0 0         if (ver >= 5)
3894 0           kf_buf_append_i64(&body, -1);
3895              
3896             /* partition_max_bytes */
3897 0           kf_buf_append_i32(&body, max_bytes);
3898              
3899             /* forgotten_topics_data (v7+) */
3900 0 0         if (ver >= 7)
3901 0           kf_buf_append_i32(&body, 0); /* empty array */
3902              
3903 0           conn_send_request(aTHX_ self, API_FETCH, ver, &body, cb, 0, 0);
3904 0           kf_buf_free(&body);
3905             }
3906              
3907             void
3908             fetch_multi(EV::Kafka::Conn self, SV *topics_sv, SV *arg1 = NULL, SV *arg2 = NULL)
3909             CODE:
3910             {
3911 0 0         if (self->state != CONN_READY)
3912 0           croak("not connected");
3913              
3914             /* topics_sv: { topic => [{partition => N, offset => N}, ...], ... }
3915             * accept fetch_multi(\%topics, $cb) or fetch_multi(\%topics, \%opts, $cb). */
3916 0 0         if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVHV)
    0          
3917 0           croak("fetch_multi: expected hashref");
3918 0           HV *topics_hv = (HV*)SvRV(topics_sv);
3919              
3920 0           SV *cb = NULL;
3921 0           HV *opts = NULL;
3922 0 0         if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
    0          
    0          
3923 0           opts = (HV*)SvRV(arg1);
3924 0           cb = arg2;
3925             } else {
3926 0           cb = arg1;
3927             }
3928              
3929 0           int32_t max_bytes = 1048576;
3930 0           int32_t max_wait_ms = 500;
3931 0           int32_t min_bytes = 1;
3932 0 0         if (opts) {
3933             SV **v;
3934 0 0         if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v))
    0          
3935 0           max_bytes = (int32_t)SvIV(*v);
3936 0 0         if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v))
    0          
3937 0           max_wait_ms = (int32_t)SvIV(*v);
3938 0 0         if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v))
    0          
3939 0           min_bytes = (int32_t)SvIV(*v);
3940             }
3941              
3942 0           int16_t ver = self->api_versions[API_FETCH];
3943 0 0         if (ver < 0) ver = 4;
3944 0 0         if (ver > 7) ver = 7;
3945              
3946             kf_buf_t body;
3947 0           kf_buf_init(&body);
3948              
3949 0           kf_buf_append_i32(&body, -1); /* replica_id */
3950 0           kf_buf_append_i32(&body, max_wait_ms);
3951 0           kf_buf_append_i32(&body, min_bytes);
3952 0 0         if (ver >= 3)
3953 0           kf_buf_append_i32(&body, max_bytes);
3954 0 0         if (ver >= 4)
3955 0           kf_buf_append_i8(&body, 0); /* isolation_level */
3956 0 0         if (ver >= 7) {
3957 0           kf_buf_append_i32(&body, 0); /* session_id */
3958 0           kf_buf_append_i32(&body, -1); /* session_epoch */
3959             }
3960              
3961             /* topics array */
3962 0 0         kf_buf_append_i32(&body, (int32_t)HvUSEDKEYS(topics_hv));
3963              
3964 0           hv_iterinit(topics_hv);
3965             HE *entry;
3966 0 0         while ((entry = hv_iternext(topics_hv))) {
3967             I32 tlen;
3968 0           const char *tname = hv_iterkey(entry, &tlen);
3969 0           kf_buf_append_string(&body, tname, (int16_t)tlen);
3970              
3971 0           SV *parts_sv = hv_iterval(topics_hv, entry);
3972 0 0         if (!SvROK(parts_sv) || SvTYPE(SvRV(parts_sv)) != SVt_PVAV)
    0          
3973 0           croak("fetch_multi: value must be an arrayref");
3974 0           AV *parts_av = (AV*)SvRV(parts_sv);
3975 0           SSize_t i, pc = av_len(parts_av) + 1;
3976 0           kf_buf_append_i32(&body, (int32_t)pc);
3977              
3978 0 0         for (i = 0; i < pc; i++) {
3979 0           SV **elem = av_fetch(parts_av, i, 0);
3980 0 0         if (!elem || !SvROK(*elem))
    0          
3981 0           croak("fetch_multi: partition entry must be a hashref");
3982 0           HV *ph = (HV*)SvRV(*elem);
3983              
3984 0           SV **pid_sv = hv_fetch(ph, "partition", 9, 0);
3985 0 0         int32_t pid = pid_sv ? (int32_t)SvIV(*pid_sv) : 0;
3986 0           kf_buf_append_i32(&body, pid);
3987              
3988 0           SV **off_sv = hv_fetch(ph, "offset", 6, 0);
3989 0 0         int64_t offset = off_sv ? (int64_t)SvIV(*off_sv) : 0;
3990 0           kf_buf_append_i64(&body, offset);
3991              
3992 0 0         if (ver >= 5)
3993 0           kf_buf_append_i64(&body, -1); /* log_start_offset */
3994              
3995 0           kf_buf_append_i32(&body, max_bytes); /* partition_max_bytes */
3996             }
3997             }
3998              
3999 0 0         if (ver >= 7)
4000 0           kf_buf_append_i32(&body, 0); /* forgotten_topics_data */
4001              
4002 0           conn_send_request(aTHX_ self, API_FETCH, ver, &body, cb, 0, 0);
4003 0           kf_buf_free(&body);
4004             }
4005              
4006             void
4007             produce_batch(EV::Kafka::Conn self, const char *topic, int partition, SV *records_sv, SV *opts_sv = NULL, SV *cb = NULL)
4008             CODE:
4009             {
4010 0 0         if (self->state != CONN_READY)
4011 0           croak("not connected");
4012              
4013 0 0         if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV)
    0          
4014 0           croak("produce_batch: expected arrayref of records");
4015 0           AV *records_av = (AV*)SvRV(records_sv);
4016              
4017 0           int16_t acks = 1;
4018 0           int compression = COMPRESS_NONE;
4019 0           int64_t producer_id = -1;
4020 0           int16_t producer_epoch = -1;
4021 0           int32_t base_sequence = -1;
4022 0           const char *txn_id = NULL;
4023 0           STRLEN txn_id_len = 0;
4024              
4025 0 0         if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
    0          
    0          
4026 0           HV *opts = (HV*)SvRV(opts_sv);
4027             SV **tmp;
4028 0 0         if ((tmp = hv_fetch(opts, "acks", 4, 0)))
4029 0           acks = (int16_t)SvIV(*tmp);
4030 0 0         if ((tmp = hv_fetch(opts, "transactional_id", 16, 0)) && SvOK(*tmp))
    0          
4031 0           txn_id = SvPV(*tmp, txn_id_len);
4032 0 0         if ((tmp = hv_fetch(opts, "compression", 11, 0))) {
4033             STRLEN clen;
4034 0           const char *cstr = SvPV(*tmp, clen);
4035 0 0         if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
    0          
4036             #ifdef HAVE_LZ4
4037             else if (clen == 3 && memcmp(cstr, "lz4", 3) == 0) compression = COMPRESS_LZ4;
4038             #endif
4039             #ifdef HAVE_ZLIB
4040 0 0         else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
    0          
4041             #endif
4042             #ifdef HAVE_ZSTD
4043             else if (clen == 4 && memcmp(cstr, "zstd", 4) == 0) compression = COMPRESS_ZSTD;
4044             #endif
4045             #ifdef HAVE_SNAPPY
4046             else if (clen == 6 && memcmp(cstr, "snappy", 6) == 0) compression = COMPRESS_SNAPPY;
4047             #endif
4048 0           else croak("unsupported compression: %.*s", (int)clen, cstr);
4049             }
4050 0 0         if ((tmp = hv_fetch(opts, "producer_id", 11, 0)))
4051 0           producer_id = (int64_t)SvIV(*tmp);
4052 0 0         if ((tmp = hv_fetch(opts, "producer_epoch", 14, 0)))
4053 0           producer_epoch = (int16_t)SvIV(*tmp);
4054 0 0         if ((tmp = hv_fetch(opts, "base_sequence", 13, 0)))
4055 0           base_sequence = (int32_t)SvIV(*tmp);
4056 0 0         } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
    0          
    0          
4057 0           cb = opts_sv;
4058 0           opts_sv = NULL;
4059             }
4060              
4061             struct timeval tv;
4062 0           gettimeofday(&tv, NULL);
4063 0           int64_t timestamp = (int64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
4064              
4065             kf_buf_t batch;
4066 0           kf_encode_record_batch_multi(aTHX_ &batch, records_av, timestamp,
4067             compression, producer_id, producer_epoch, base_sequence,
4068             txn_id != NULL ? 1 : 0);
4069              
4070 0           int16_t ver = self->api_versions[API_PRODUCE];
4071 0 0         if (ver < 0) ver = 3;
4072 0 0         if (ver > 7) ver = 7;
4073              
4074 0           STRLEN topic_len = strlen(topic);
4075              
4076             kf_buf_t body;
4077 0           kf_buf_init(&body);
4078              
4079 0 0         if (ver >= 3)
4080 0 0         kf_buf_append_nullable_string(&body, txn_id, txn_id ? (int16_t)txn_id_len : 0);
4081              
4082 0           kf_buf_append_i16(&body, acks);
4083 0           kf_buf_append_i32(&body, 30000);
4084              
4085 0           kf_buf_append_i32(&body, 1);
4086 0           kf_buf_append_string(&body, topic, (int16_t)topic_len);
4087 0           kf_buf_append_i32(&body, 1);
4088 0           kf_buf_append_i32(&body, (int32_t)partition);
4089 0           kf_buf_append_i32(&body, (int32_t)batch.len);
4090 0           kf_buf_append(&body, batch.data, batch.len);
4091              
4092 0 0         conn_send_request(aTHX_ self, API_PRODUCE, ver, &body,
4093 0 0         (cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0);
4094              
4095 0           kf_buf_free(&body);
4096 0           kf_buf_free(&batch);
4097             }
4098              
4099             void
4100             list_offsets(EV::Kafka::Conn self, const char *topic, int partition, SV *timestamp_sv, SV *cb)
4101             CODE:
4102             {
4103 0 0         if (self->state != CONN_READY)
4104 0           croak("not connected");
4105              
4106 0           int64_t timestamp = SvIV(timestamp_sv);
4107 0           STRLEN topic_len = strlen(topic);
4108              
4109             /* -2 = earliest, -1 = latest */
4110 0           int16_t ver = self->api_versions[API_LIST_OFFSETS];
4111 0 0         if (ver < 0) ver = 1;
4112 0 0         if (ver > 5) ver = 5;
4113              
4114             kf_buf_t body;
4115 0           kf_buf_init(&body);
4116              
4117 0           kf_buf_append_i32(&body, -1); /* replica_id */
4118              
4119             /* isolation_level (v2+) */
4120 0 0         if (ver >= 2)
4121 0           kf_buf_append_i8(&body, 0);
4122              
4123             /* topics: ARRAY(1) */
4124 0           kf_buf_append_i32(&body, 1);
4125 0           kf_buf_append_string(&body, topic, (int16_t)topic_len);
4126              
4127             /* partitions: ARRAY(1) */
4128 0           kf_buf_append_i32(&body, 1);
4129 0           kf_buf_append_i32(&body, (int32_t)partition);
4130              
4131             /* current_leader_epoch (v4+) */
4132 0 0         if (ver >= 4)
4133 0           kf_buf_append_i32(&body, -1);
4134              
4135 0           kf_buf_append_i64(&body, timestamp);
4136              
4137 0           conn_send_request(aTHX_ self, API_LIST_OFFSETS, ver, &body, cb, 0, 0);
4138 0           kf_buf_free(&body);
4139             }
4140              
4141             void
4142             produce(EV::Kafka::Conn self, const char *topic, int partition, SV *key_sv, SV *value_sv, SV *opts_sv = NULL, SV *cb = NULL)
4143             CODE:
4144             {
4145 1 50         if (self->state != CONN_READY)
4146 1           croak("not connected");
4147              
4148             /* Handle optional opts hash: produce($topic, $part, $key, $val, \%opts, $cb)
4149             * or produce($topic, $part, $key, $val, $cb)
4150             */
4151 0           HV *headers = NULL;
4152 0           int16_t acks = 1;
4153 0           int compression = COMPRESS_NONE;
4154              
4155 0 0         if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
    0          
    0          
4156 0           HV *opts = (HV*)SvRV(opts_sv);
4157             SV **tmp;
4158 0 0         if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV)
    0          
    0          
4159 0           headers = (HV*)SvRV(*tmp);
4160 0 0         if ((tmp = hv_fetch(opts, "acks", 4, 0)))
4161 0           acks = (int16_t)SvIV(*tmp);
4162 0 0         if ((tmp = hv_fetch(opts, "compression", 11, 0))) {
4163             STRLEN clen;
4164 0           const char *cstr = SvPV(*tmp, clen);
4165 0 0         if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
    0          
4166             #ifdef HAVE_LZ4
4167             else if (clen == 3 && memcmp(cstr, "lz4", 3) == 0) compression = COMPRESS_LZ4;
4168             #endif
4169             #ifdef HAVE_ZLIB
4170 0 0         else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
    0          
4171             #endif
4172             #ifdef HAVE_ZSTD
4173             else if (clen == 4 && memcmp(cstr, "zstd", 4) == 0) compression = COMPRESS_ZSTD;
4174             #endif
4175             #ifdef HAVE_SNAPPY
4176             else if (clen == 6 && memcmp(cstr, "snappy", 6) == 0) compression = COMPRESS_SNAPPY;
4177             #endif
4178 0           else croak("unsupported compression: %.*s", (int)clen, cstr);
4179             }
4180 0 0         } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
    0          
    0          
4181             /* opts_sv is actually the callback */
4182 0           cb = opts_sv;
4183 0           opts_sv = NULL;
4184             }
4185              
4186 0           const char *key = NULL;
4187 0           STRLEN key_len = 0;
4188 0 0         if (SvOK(key_sv))
4189 0           key = SvPV(key_sv, key_len);
4190              
4191 0           const char *value = NULL;
4192 0           STRLEN value_len = 0;
4193 0 0         if (SvOK(value_sv))
4194 0           value = SvPV(value_sv, value_len);
4195              
4196             /* Build RecordBatch */
4197             int64_t timestamp;
4198             {
4199 0           SV **ts_tmp = NULL;
4200 0 0         if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV)
    0          
    0          
4201 0           ts_tmp = hv_fetch((HV*)SvRV(opts_sv), "timestamp", 9, 0);
4202 0 0         if (ts_tmp && SvOK(*ts_tmp)) {
    0          
4203 0           timestamp = (int64_t)SvIV(*ts_tmp);
4204             } else {
4205             struct timeval tv;
4206 0           gettimeofday(&tv, NULL);
4207 0           timestamp = (int64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
4208             }
4209             }
4210              
4211             /* Build a single-element AV and dispatch to the multi encoder so
4212             * the compression / CRC framing logic lives in one place. */
4213             kf_buf_t batch;
4214             {
4215 0           HV *rec = newHV();
4216 0 0         if (key) hv_stores(rec, "key", newSVpvn(key, key_len));
4217 0 0         if (value) hv_stores(rec, "value", newSVpvn(value, value_len));
4218 0 0         if (headers) hv_stores(rec, "headers", newRV_inc((SV*)headers));
4219 0           AV *records_av = newAV();
4220 0           av_push(records_av, newRV_noinc((SV*)rec));
4221 0           kf_encode_record_batch_multi(aTHX_ &batch, records_av, timestamp,
4222             compression, -1, -1, -1, 0);
4223 0           SvREFCNT_dec((SV*)records_av);
4224             }
4225              
4226             /* Use Produce version — cap at v7 */
4227 0           int16_t ver = self->api_versions[API_PRODUCE];
4228 0 0         if (ver < 0) ver = 3;
4229 0 0         if (ver > 7) ver = 7;
4230              
4231 0           STRLEN topic_len = strlen(topic);
4232              
4233             kf_buf_t body;
4234 0           kf_buf_init(&body);
4235              
4236             /* transactional_id (v3+): nullable string = null */
4237 0 0         if (ver >= 3)
4238 0           kf_buf_append_nullable_string(&body, NULL, 0);
4239              
4240 0           kf_buf_append_i16(&body, acks); /* acks */
4241 0           kf_buf_append_i32(&body, 30000); /* timeout_ms = 30s */
4242              
4243             /* topic_data: ARRAY(1) */
4244 0           kf_buf_append_i32(&body, 1); /* 1 topic */
4245 0           kf_buf_append_string(&body, topic, (int16_t)topic_len);
4246              
4247             /* partition_data: ARRAY(1) */
4248 0           kf_buf_append_i32(&body, 1); /* 1 partition */
4249 0           kf_buf_append_i32(&body, partition);
4250              
4251             /* record_set: BYTES (i32 length + record_batch) */
4252 0           kf_buf_append_i32(&body, (int32_t)batch.len);
4253 0           kf_buf_append(&body, batch.data, batch.len);
4254              
4255 0 0         conn_send_request(aTHX_ self, API_PRODUCE, ver, &body,
4256 0 0         (cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0);
4257              
4258 0           kf_buf_free(&body);
4259 0           kf_buf_free(&batch);
4260             }
4261              
4262             void
4263             find_coordinator(EV::Kafka::Conn self, const char *group_id, SV *cb, int key_type = 0)
4264             CODE:
4265             {
4266 0 0         if (self->state != CONN_READY)
4267 0           croak("not connected");
4268              
4269 0           int16_t ver = self->api_versions[API_FIND_COORDINATOR];
4270 0 0         if (ver < 0) ver = 0;
4271 0 0         if (ver > 2) ver = 2;
4272              
4273             kf_buf_t body;
4274 0           kf_buf_init(&body);
4275              
4276 0           STRLEN glen = strlen(group_id);
4277 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4278              
4279             /* key_type (v1+): 0=group, 1=transaction */
4280 0 0         if (ver >= 1)
4281 0           kf_buf_append_i8(&body, (int8_t)key_type);
4282              
4283 0           conn_send_request(aTHX_ self, API_FIND_COORDINATOR, ver, &body, cb, 0, 0);
4284 0           kf_buf_free(&body);
4285             }
4286              
4287             void
4288             join_group(EV::Kafka::Conn self, const char *group_id, const char *member_id, SV *topics_sv, SV *cb, int session_timeout_ms = 30000, int rebalance_timeout_ms = 60000, SV *group_instance_id_sv = NULL)
4289             CODE:
4290             {
4291 0 0         if (self->state != CONN_READY)
4292 0           croak("not connected");
4293              
4294 0           int16_t ver = self->api_versions[API_JOIN_GROUP];
4295 0 0         if (ver < 0) ver = 1;
4296 0 0         if (ver > 5) ver = 5;
4297              
4298             kf_buf_t body;
4299 0           kf_buf_init(&body);
4300              
4301 0           STRLEN glen = strlen(group_id);
4302 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4303 0           kf_buf_append_i32(&body, session_timeout_ms);
4304              
4305             /* rebalance_timeout_ms (v1+) */
4306 0 0         if (ver >= 1)
4307 0           kf_buf_append_i32(&body, rebalance_timeout_ms);
4308              
4309 0           STRLEN mlen = strlen(member_id);
4310 0           kf_buf_append_string(&body, member_id, (int16_t)mlen);
4311              
4312             /* group_instance_id (v5+) */
4313 0 0         if (ver >= 5) {
4314 0 0         if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
    0          
4315             STRLEN gilen;
4316 0           const char *gi = SvPV(group_instance_id_sv, gilen);
4317 0           kf_buf_append_nullable_string(&body, gi, (int16_t)gilen);
4318             } else {
4319 0           kf_buf_append_nullable_string(&body, NULL, 0);
4320             }
4321             }
4322              
4323             /* protocol_type = "consumer" */
4324 0           kf_buf_append_string(&body, "consumer", 8);
4325              
4326             /* protocols: ARRAY(1) */
4327 0           kf_buf_append_i32(&body, 1);
4328              
4329             /* protocol name = "sticky" */
4330 0           kf_buf_append_string(&body, "sticky", 6);
4331              
4332             /* protocol metadata (ConsumerProtocol subscription) */
4333             /* Version:0, Topics:array, UserData:null */
4334             kf_buf_t meta;
4335 0           kf_buf_init(&meta);
4336 0           kf_buf_append_i16(&meta, 0); /* version */
4337              
4338 0           AV *topics = (AV*)SvRV(topics_sv);
4339 0           SSize_t i, tc = av_len(topics) + 1;
4340 0           kf_buf_append_i32(&meta, (int32_t)tc);
4341 0 0         for (i = 0; i < tc; i++) {
4342 0           SV **elem = av_fetch(topics, i, 0);
4343             STRLEN tlen;
4344 0           const char *tname = SvPV(*elem, tlen);
4345 0           kf_buf_append_string(&meta, tname, (int16_t)tlen);
4346             }
4347 0           kf_buf_append_nullable_bytes(&meta, NULL, 0); /* user_data = null */
4348              
4349 0           kf_buf_append_bytes(&body, meta.data, (int32_t)meta.len);
4350 0           kf_buf_free(&meta);
4351              
4352 0           conn_send_request(aTHX_ self, API_JOIN_GROUP, ver, &body, cb, 0, 0);
4353 0           kf_buf_free(&body);
4354             }
4355              
4356             void
4357             sync_group(EV::Kafka::Conn self, const char *group_id, int generation_id, const char *member_id, SV *assignments_sv, SV *cb, SV *group_instance_id_sv = NULL)
4358             CODE:
4359             {
4360 0 0         if (self->state != CONN_READY)
4361 0           croak("not connected");
4362              
4363 0           int16_t ver = self->api_versions[API_SYNC_GROUP];
4364 0 0         if (ver < 0) ver = 0;
4365 0 0         if (ver > 3) ver = 3;
4366              
4367             kf_buf_t body;
4368 0           kf_buf_init(&body);
4369              
4370 0           STRLEN glen = strlen(group_id);
4371 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4372 0           kf_buf_append_i32(&body, generation_id);
4373              
4374 0           STRLEN mlen = strlen(member_id);
4375 0           kf_buf_append_string(&body, member_id, (int16_t)mlen);
4376              
4377             /* group_instance_id (v3+) */
4378 0 0         if (ver >= 3) {
4379 0 0         if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
    0          
4380             STRLEN gilen;
4381 0           const char *gi = SvPV(group_instance_id_sv, gilen);
4382 0           kf_buf_append_nullable_string(&body, gi, (int16_t)gilen);
4383             } else {
4384 0           kf_buf_append_nullable_string(&body, NULL, 0);
4385             }
4386             }
4387              
4388             /* assignments: ARRAY of {member_id, assignment_bytes} */
4389 0 0         if (SvOK(assignments_sv) && SvROK(assignments_sv)
    0          
4390 0 0         && SvTYPE(SvRV(assignments_sv)) == SVt_PVAV) {
4391 0           AV *assigns = (AV*)SvRV(assignments_sv);
4392 0           SSize_t i, ac = av_len(assigns) + 1;
4393 0           kf_buf_append_i32(&body, (int32_t)ac);
4394              
4395 0 0         for (i = 0; i < ac; i++) {
4396 0           SV **elem = av_fetch(assigns, i, 0);
4397 0 0         if (!elem || !SvROK(*elem)) continue;
    0          
4398 0           HV *ah = (HV*)SvRV(*elem);
4399              
4400 0           SV **mid_sv = hv_fetch(ah, "member_id", 9, 0);
4401 0 0         if (!mid_sv) continue;
4402             STRLEN mid_len;
4403 0           const char *mid = SvPV(*mid_sv, mid_len);
4404 0           kf_buf_append_string(&body, mid, (int16_t)mid_len);
4405              
4406 0           SV **data_sv = hv_fetch(ah, "assignment", 10, 0);
4407 0 0         if (!data_sv) { kf_buf_append_bytes(&body, NULL, 0); continue; }
4408             STRLEN dlen;
4409 0           const char *ddata = SvPV(*data_sv, dlen);
4410 0           kf_buf_append_bytes(&body, ddata, (int32_t)dlen);
4411             }
4412             } else {
4413 0           kf_buf_append_i32(&body, 0); /* empty array */
4414             }
4415              
4416 0           conn_send_request(aTHX_ self, API_SYNC_GROUP, ver, &body, cb, 0, 0);
4417 0           kf_buf_free(&body);
4418             }
4419              
4420             void
4421             heartbeat(EV::Kafka::Conn self, const char *group_id, int generation_id, const char *member_id, SV *cb, SV *group_instance_id_sv = NULL)
4422             CODE:
4423             {
4424 0 0         if (self->state != CONN_READY)
4425 0           croak("not connected");
4426              
4427 0           int16_t ver = self->api_versions[API_HEARTBEAT];
4428 0 0         if (ver < 0) ver = 0;
4429 0 0         if (ver > 4) ver = 4;
4430              
4431             kf_buf_t body;
4432 0           kf_buf_init(&body);
4433              
4434 0           STRLEN glen = strlen(group_id);
4435 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4436 0           kf_buf_append_i32(&body, generation_id);
4437              
4438 0           STRLEN mlen = strlen(member_id);
4439 0           kf_buf_append_string(&body, member_id, (int16_t)mlen);
4440              
4441             /* group_instance_id (v3+) */
4442 0 0         if (ver >= 3) {
4443 0 0         if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
    0          
4444             STRLEN gilen;
4445 0           const char *gi = SvPV(group_instance_id_sv, gilen);
4446 0           kf_buf_append_nullable_string(&body, gi, (int16_t)gilen);
4447             } else {
4448 0           kf_buf_append_nullable_string(&body, NULL, 0);
4449             }
4450             }
4451              
4452 0           conn_send_request(aTHX_ self, API_HEARTBEAT, ver, &body, cb, 0, 0);
4453 0           kf_buf_free(&body);
4454             }
4455              
4456             void
4457             offset_commit(EV::Kafka::Conn self, const char *group_id, int generation_id, const char *member_id, SV *offsets_sv, SV *cb)
4458             CODE:
4459             {
4460 0 0         if (self->state != CONN_READY)
4461 0           croak("not connected");
4462              
4463 0           int16_t ver = self->api_versions[API_OFFSET_COMMIT];
4464 0 0         if (ver < 0) ver = 2;
4465 0 0         if (ver > 7) ver = 7;
4466              
4467             kf_buf_t body;
4468 0           kf_buf_init(&body);
4469              
4470 0           STRLEN glen = strlen(group_id);
4471 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4472              
4473             /* generation_id (v1+) */
4474 0 0         if (ver >= 1)
4475 0           kf_buf_append_i32(&body, generation_id);
4476              
4477             /* member_id (v1+) */
4478 0 0         if (ver >= 1) {
4479 0           STRLEN mlen = strlen(member_id);
4480 0           kf_buf_append_string(&body, member_id, (int16_t)mlen);
4481             }
4482              
4483             /* group_instance_id (v7+): null */
4484 0 0         if (ver >= 7)
4485 0           kf_buf_append_nullable_string(&body, NULL, 0);
4486              
4487             /* topics: ARRAY of {topic, partitions: [{partition, committed_offset, metadata}]} */
4488 0           AV *topics = (AV*)SvRV(offsets_sv);
4489 0           SSize_t i, tc = av_len(topics) + 1;
4490 0           kf_buf_append_i32(&body, (int32_t)tc);
4491              
4492 0 0         for (i = 0; i < tc; i++) {
4493 0           SV **elem = av_fetch(topics, i, 0);
4494 0 0         if (!elem || !SvROK(*elem)) continue;
    0          
4495 0           HV *th = (HV*)SvRV(*elem);
4496 0           SV **tname_sv = hv_fetch(th, "topic", 5, 0);
4497 0 0         if (!tname_sv) continue;
4498             STRLEN tnlen;
4499 0           const char *tname = SvPV(*tname_sv, tnlen);
4500 0           kf_buf_append_string(&body, tname, (int16_t)tnlen);
4501              
4502 0           SV **parts_sv = hv_fetch(th, "partitions", 10, 0);
4503 0 0         if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
    0          
4504 0           AV *parts = (AV*)SvRV(*parts_sv);
4505 0           SSize_t j, pc = av_len(parts) + 1;
4506 0           kf_buf_append_i32(&body, (int32_t)pc);
4507              
4508 0 0         for (j = 0; j < pc; j++) {
4509 0           SV **pelem = av_fetch(parts, j, 0);
4510 0 0         if (!pelem || !SvROK(*pelem)) continue;
    0          
4511 0           HV *ph = (HV*)SvRV(*pelem);
4512              
4513 0           SV **pid_sv = hv_fetch(ph, "partition", 9, 0);
4514 0 0         kf_buf_append_i32(&body, pid_sv ? (int32_t)SvIV(*pid_sv) : 0);
4515              
4516 0           SV **off_sv = hv_fetch(ph, "offset", 6, 0);
4517 0 0         kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0);
4518              
4519             /* leader_epoch (v6+) */
4520 0 0         if (ver >= 6)
4521 0           kf_buf_append_i32(&body, -1);
4522              
4523             /* metadata: nullable string = empty */
4524 0           kf_buf_append_nullable_string(&body, "", 0);
4525             }
4526             }
4527              
4528 0           conn_send_request(aTHX_ self, API_OFFSET_COMMIT, ver, &body, cb, 0, 0);
4529 0           kf_buf_free(&body);
4530             }
4531              
4532             void
4533             offset_fetch(EV::Kafka::Conn self, const char *group_id, SV *topics_sv, SV *cb)
4534             CODE:
4535             {
4536 0 0         if (self->state != CONN_READY)
4537 0           croak("not connected");
4538              
4539 0           int16_t ver = self->api_versions[API_OFFSET_FETCH];
4540 0 0         if (ver < 0) ver = 1;
4541 0 0         if (ver > 5) ver = 5;
4542              
4543             kf_buf_t body;
4544 0           kf_buf_init(&body);
4545              
4546 0           STRLEN glen = strlen(group_id);
4547 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4548              
4549             /* topics: ARRAY */
4550 0           AV *topics = (AV*)SvRV(topics_sv);
4551 0           SSize_t i, tc = av_len(topics) + 1;
4552 0           kf_buf_append_i32(&body, (int32_t)tc);
4553              
4554 0 0         for (i = 0; i < tc; i++) {
4555 0           SV **elem = av_fetch(topics, i, 0);
4556 0 0         if (!elem || !SvROK(*elem)) continue;
    0          
4557 0           HV *th = (HV*)SvRV(*elem);
4558 0           SV **tname_sv = hv_fetch(th, "topic", 5, 0);
4559 0 0         if (!tname_sv) continue;
4560             STRLEN tnlen;
4561 0           const char *tname = SvPV(*tname_sv, tnlen);
4562 0           kf_buf_append_string(&body, tname, (int16_t)tnlen);
4563              
4564 0           SV **parts_sv = hv_fetch(th, "partitions", 10, 0);
4565 0 0         if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
    0          
4566 0           AV *parts = (AV*)SvRV(*parts_sv);
4567 0           SSize_t j, pc = av_len(parts) + 1;
4568 0           kf_buf_append_i32(&body, (int32_t)pc);
4569              
4570 0 0         for (j = 0; j < pc; j++) {
4571 0           SV **pelem = av_fetch(parts, j, 0);
4572 0 0         kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0);
4573             }
4574             }
4575              
4576 0           conn_send_request(aTHX_ self, API_OFFSET_FETCH, ver, &body, cb, 0, 0);
4577 0           kf_buf_free(&body);
4578             }
4579              
4580             void
4581             leave_group(EV::Kafka::Conn self, const char *group_id, const char *member_id, SV *cb)
4582             CODE:
4583             {
4584 0 0         if (self->state != CONN_READY)
4585 0           croak("not connected");
4586              
4587 0           int16_t ver = self->api_versions[API_LEAVE_GROUP];
4588 0 0         if (ver < 0) ver = 0;
4589 0 0         if (ver > 3) ver = 3;
4590              
4591             kf_buf_t body;
4592 0           kf_buf_init(&body);
4593              
4594 0           STRLEN glen = strlen(group_id);
4595 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4596              
4597 0           STRLEN mlen = strlen(member_id);
4598 0           kf_buf_append_string(&body, member_id, (int16_t)mlen);
4599              
4600 0           conn_send_request(aTHX_ self, API_LEAVE_GROUP, ver, &body, cb, 0, 0);
4601 0           kf_buf_free(&body);
4602             }
4603              
4604             void
4605             create_topics(EV::Kafka::Conn self, SV *topics_sv, int timeout_ms, SV *cb)
4606             CODE:
4607             {
4608 0 0         if (self->state != CONN_READY)
4609 0           croak("not connected");
4610              
4611 0           int16_t ver = self->api_versions[API_CREATE_TOPICS];
4612 0 0         if (ver < 0) ver = 0;
4613 0 0         if (ver > 4) ver = 4;
4614              
4615             kf_buf_t body;
4616 0           kf_buf_init(&body);
4617              
4618 0           AV *topics = (AV*)SvRV(topics_sv);
4619 0           SSize_t i, tc = av_len(topics) + 1;
4620 0           kf_buf_append_i32(&body, (int32_t)tc);
4621              
4622 0 0         for (i = 0; i < tc; i++) {
4623 0           SV **elem = av_fetch(topics, i, 0);
4624 0 0         if (!elem || !SvROK(*elem)) continue;
    0          
4625 0           HV *th = (HV*)SvRV(*elem);
4626              
4627 0           SV **name_sv = hv_fetch(th, "name", 4, 0);
4628 0 0         if (!name_sv) continue;
4629             STRLEN nlen;
4630 0           const char *name = SvPV(*name_sv, nlen);
4631 0           kf_buf_append_string(&body, name, (int16_t)nlen);
4632              
4633 0           SV **np_sv = hv_fetch(th, "num_partitions", 14, 0);
4634 0 0         int32_t num_partitions = np_sv ? (int32_t)SvIV(*np_sv) : 1;
4635 0           kf_buf_append_i32(&body, num_partitions);
4636              
4637 0           SV **rf_sv = hv_fetch(th, "replication_factor", 18, 0);
4638 0 0         int16_t replication_factor = rf_sv ? (int16_t)SvIV(*rf_sv) : 1;
4639 0           kf_buf_append_i16(&body, replication_factor);
4640              
4641             /* assignments: empty array */
4642 0           kf_buf_append_i32(&body, 0);
4643              
4644             /* configs: empty array */
4645 0           kf_buf_append_i32(&body, 0);
4646             }
4647              
4648 0           kf_buf_append_i32(&body, timeout_ms);
4649              
4650             /* validate_only (v1+) */
4651 0 0         if (ver >= 1)
4652 0           kf_buf_append_i8(&body, 0);
4653              
4654 0           conn_send_request(aTHX_ self, API_CREATE_TOPICS, ver, &body, cb, 0, 0);
4655 0           kf_buf_free(&body);
4656             }
4657              
4658             void
4659             delete_topics(EV::Kafka::Conn self, SV *topics_sv, int timeout_ms, SV *cb)
4660             CODE:
4661             {
4662 0 0         if (self->state != CONN_READY)
4663 0           croak("not connected");
4664              
4665 0           int16_t ver = self->api_versions[API_DELETE_TOPICS];
4666 0 0         if (ver < 0) ver = 0;
4667 0 0         if (ver > 3) ver = 3;
4668              
4669             kf_buf_t body;
4670 0           kf_buf_init(&body);
4671              
4672 0           AV *topics = (AV*)SvRV(topics_sv);
4673 0           SSize_t i, tc = av_len(topics) + 1;
4674 0           kf_buf_append_i32(&body, (int32_t)tc);
4675              
4676 0 0         for (i = 0; i < tc; i++) {
4677 0           SV **elem = av_fetch(topics, i, 0);
4678 0 0         if (!elem) continue;
4679             STRLEN tlen;
4680 0           const char *tname = SvPV(*elem, tlen);
4681 0           kf_buf_append_string(&body, tname, (int16_t)tlen);
4682             }
4683              
4684 0           kf_buf_append_i32(&body, timeout_ms);
4685              
4686 0           conn_send_request(aTHX_ self, API_DELETE_TOPICS, ver, &body, cb, 0, 0);
4687 0           kf_buf_free(&body);
4688             }
4689              
4690             void
4691             init_producer_id(EV::Kafka::Conn self, SV *transactional_id_sv, int txn_timeout_ms, SV *cb)
4692             CODE:
4693             {
4694 0 0         if (self->state != CONN_READY)
4695 0           croak("not connected");
4696              
4697 0           int16_t ver = self->api_versions[API_INIT_PRODUCER_ID];
4698 0 0         if (ver < 0) ver = 0;
4699 0 0         if (ver > 1) ver = 1;
4700              
4701             kf_buf_t body;
4702 0           kf_buf_init(&body);
4703              
4704 0 0         if (SvOK(transactional_id_sv)) {
4705             STRLEN tlen;
4706 0           const char *tid = SvPV(transactional_id_sv, tlen);
4707 0           kf_buf_append_string(&body, tid, (int16_t)tlen);
4708             } else {
4709 0           kf_buf_append_nullable_string(&body, NULL, 0);
4710             }
4711              
4712 0           kf_buf_append_i32(&body, txn_timeout_ms);
4713              
4714             /* v2+: producer_id(i64=-1), producer_epoch(i16=-1) */
4715 0 0         if (ver >= 2) {
4716 0           kf_buf_append_i64(&body, -1);
4717 0           kf_buf_append_i16(&body, -1);
4718             }
4719              
4720 0           conn_send_request(aTHX_ self, API_INIT_PRODUCER_ID, ver, &body, cb, 0, 0);
4721 0           kf_buf_free(&body);
4722             }
4723              
4724             void
4725             add_partitions_to_txn(EV::Kafka::Conn self, const char *transactional_id, SV *producer_id_sv, int producer_epoch, SV *topics_sv, SV *cb)
4726             CODE:
4727             {
4728 0 0         if (self->state != CONN_READY)
4729 0           croak("not connected");
4730              
4731 0           int16_t ver = self->api_versions[API_ADD_PARTITIONS_TXN];
4732 0 0         if (ver < 0) ver = 0;
4733 0 0         if (ver > 1) ver = 1;
4734              
4735 0           int64_t pid = SvIV(producer_id_sv);
4736              
4737             kf_buf_t body;
4738 0           kf_buf_init(&body);
4739              
4740 0           STRLEN tid_len = strlen(transactional_id);
4741 0           kf_buf_append_string(&body, transactional_id, (int16_t)tid_len);
4742 0           kf_buf_append_i64(&body, pid);
4743 0           kf_buf_append_i16(&body, (int16_t)producer_epoch);
4744              
4745             /* topics: ARRAY of {topic, partitions: ARRAY(i32)} */
4746 0 0         if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVAV)
    0          
4747 0           croak("add_partitions_to_txn: expected arrayref");
4748 0           AV *topics = (AV*)SvRV(topics_sv);
4749 0           SSize_t i, tc = av_len(topics) + 1;
4750 0           kf_buf_append_i32(&body, (int32_t)tc);
4751              
4752 0 0         for (i = 0; i < tc; i++) {
4753 0           SV **elem = av_fetch(topics, i, 0);
4754 0 0         if (!elem || !SvROK(*elem)) croak("add_partitions_to_txn: bad element");
    0          
4755 0           HV *th = (HV*)SvRV(*elem);
4756 0           SV **tname_sv = hv_fetch(th, "topic", 5, 0);
4757 0 0         if (!tname_sv) croak("add_partitions_to_txn: missing topic");
4758             STRLEN tnlen;
4759 0           const char *tname = SvPV(*tname_sv, tnlen);
4760 0           kf_buf_append_string(&body, tname, (int16_t)tnlen);
4761              
4762 0           SV **parts_sv = hv_fetch(th, "partitions", 10, 0);
4763 0 0         if (!parts_sv || !SvROK(*parts_sv)) croak("add_partitions_to_txn: missing partitions");
    0          
4764 0           AV *parts = (AV*)SvRV(*parts_sv);
4765 0           SSize_t j, pc = av_len(parts) + 1;
4766 0           kf_buf_append_i32(&body, (int32_t)pc);
4767 0 0         for (j = 0; j < pc; j++) {
4768 0           SV **pelem = av_fetch(parts, j, 0);
4769 0 0         kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0);
4770             }
4771             }
4772              
4773 0           conn_send_request(aTHX_ self, API_ADD_PARTITIONS_TXN, ver, &body, cb, 0, 0);
4774 0           kf_buf_free(&body);
4775             }
4776              
4777             void
4778             end_txn(EV::Kafka::Conn self, const char *transactional_id, SV *producer_id_sv, int producer_epoch, int committed, SV *cb)
4779             CODE:
4780             {
4781 0 0         if (self->state != CONN_READY)
4782 0           croak("not connected");
4783              
4784 0           int16_t ver = self->api_versions[API_END_TXN];
4785 0 0         if (ver < 0) ver = 0;
4786 0 0         if (ver > 1) ver = 1;
4787              
4788 0           int64_t pid = SvIV(producer_id_sv);
4789              
4790             kf_buf_t body;
4791 0           kf_buf_init(&body);
4792              
4793 0           STRLEN tid_len = strlen(transactional_id);
4794 0           kf_buf_append_string(&body, transactional_id, (int16_t)tid_len);
4795 0           kf_buf_append_i64(&body, pid);
4796 0           kf_buf_append_i16(&body, (int16_t)producer_epoch);
4797 0           kf_buf_append_i8(&body, committed ? 1 : 0);
4798              
4799 0           conn_send_request(aTHX_ self, API_END_TXN, ver, &body, cb, 0, 0);
4800 0           kf_buf_free(&body);
4801             }
4802              
4803             void
4804             txn_offset_commit(EV::Kafka::Conn self, const char *transactional_id, const char *group_id, SV *producer_id_sv, int producer_epoch, int generation_id, const char *member_id, SV *offsets_sv, SV *cb)
4805             CODE:
4806             {
4807 0 0         if (self->state != CONN_READY)
4808 0           croak("not connected");
4809              
4810 0           int16_t ver = self->api_versions[API_TXN_OFFSET_COMMIT];
4811 0 0         if (ver < 0) ver = 0;
4812 0 0         if (ver > 3) ver = 3;
4813              
4814 0           int64_t pid = SvIV(producer_id_sv);
4815              
4816             kf_buf_t body;
4817 0           kf_buf_init(&body);
4818              
4819 0           STRLEN tid_len = strlen(transactional_id);
4820 0           kf_buf_append_string(&body, transactional_id, (int16_t)tid_len);
4821              
4822 0           STRLEN gid_len = strlen(group_id);
4823 0           kf_buf_append_string(&body, group_id, (int16_t)gid_len);
4824              
4825 0           kf_buf_append_i64(&body, pid);
4826 0           kf_buf_append_i16(&body, (int16_t)producer_epoch);
4827              
4828             /* v3+: generation_id, member_id, group_instance_id */
4829 0 0         if (ver >= 3) {
4830 0           kf_buf_append_i32(&body, generation_id);
4831 0           STRLEN mid_len = strlen(member_id);
4832 0           kf_buf_append_string(&body, member_id, (int16_t)mid_len);
4833 0           kf_buf_append_nullable_string(&body, NULL, 0); /* group_instance_id */
4834             }
4835              
4836             /* offsets: ARRAY of {topic, partitions: [{partition, offset}]} */
4837 0 0         if (!SvROK(offsets_sv) || SvTYPE(SvRV(offsets_sv)) != SVt_PVAV)
    0          
4838 0           croak("txn_offset_commit: expected arrayref");
4839 0           AV *topics = (AV*)SvRV(offsets_sv);
4840 0           SSize_t i, tc = av_len(topics) + 1;
4841 0           kf_buf_append_i32(&body, (int32_t)tc);
4842              
4843 0 0         for (i = 0; i < tc; i++) {
4844 0           SV **elem = av_fetch(topics, i, 0);
4845 0 0         if (!elem || !SvROK(*elem)) croak("txn_offset_commit: bad element");
    0          
4846 0           HV *th = (HV*)SvRV(*elem);
4847              
4848 0           SV **tname_sv = hv_fetch(th, "topic", 5, 0);
4849 0 0         if (!tname_sv) croak("txn_offset_commit: missing topic");
4850             STRLEN tnlen;
4851 0           const char *tname = SvPV(*tname_sv, tnlen);
4852 0           kf_buf_append_string(&body, tname, (int16_t)tnlen);
4853              
4854 0           SV **parts_sv = hv_fetch(th, "partitions", 10, 0);
4855 0 0         if (!parts_sv || !SvROK(*parts_sv)) croak("txn_offset_commit: missing partitions");
    0          
4856 0           AV *parts = (AV*)SvRV(*parts_sv);
4857 0           SSize_t j, pc = av_len(parts) + 1;
4858 0           kf_buf_append_i32(&body, (int32_t)pc);
4859              
4860 0 0         for (j = 0; j < pc; j++) {
4861 0           SV **pelem = av_fetch(parts, j, 0);
4862 0 0         if (!pelem || !SvROK(*pelem)) croak("txn_offset_commit: bad partition");
    0          
4863 0           HV *ph = (HV*)SvRV(*pelem);
4864              
4865 0           SV **ppid_sv = hv_fetch(ph, "partition", 9, 0);
4866 0 0         kf_buf_append_i32(&body, ppid_sv ? (int32_t)SvIV(*ppid_sv) : 0);
4867              
4868 0           SV **off_sv = hv_fetch(ph, "offset", 6, 0);
4869 0 0         kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0);
4870              
4871             /* leader_epoch (v2+) */
4872 0 0         if (ver >= 2)
4873 0           kf_buf_append_i32(&body, -1);
4874              
4875 0           kf_buf_append_nullable_string(&body, "", 0); /* metadata */
4876             }
4877             }
4878              
4879 0           conn_send_request(aTHX_ self, API_TXN_OFFSET_COMMIT, ver, &body, cb, 0, 0);
4880 0           kf_buf_free(&body);
4881             }
4882              
4883             MODULE = EV::Kafka PACKAGE = EV::Kafka
4884              
4885             int
4886             _murmur2(SV *data_sv)
4887             CODE:
4888             {
4889             STRLEN len;
4890 16           const unsigned char *data = (const unsigned char *)SvPV(data_sv, len);
4891 16           uint32_t h = 0x9747b28c ^ (uint32_t)len;
4892 16           const uint32_t m = 0x5bd1e995;
4893 16           size_t i = 0;
4894 16           size_t remaining = len;
4895              
4896 275 100         while (remaining >= 4) {
4897             uint32_t k;
4898 259           memcpy(&k, data + i, 4); /* little-endian on x86, matches Java */
4899 259           k *= m;
4900 259           k ^= k >> 24;
4901 259           k *= m;
4902 259           h *= m;
4903 259           h ^= k;
4904 259           i += 4;
4905 259           remaining -= 4;
4906             }
4907              
4908 16           switch (remaining) {
4909 5           case 3: h ^= (uint32_t)data[i + 2] << 16; /* fallthrough */
4910 7           case 2: h ^= (uint32_t)data[i + 1] << 8; /* fallthrough */
4911 10           case 1: h ^= (uint32_t)data[i]; h *= m;
4912             }
4913              
4914 16           h ^= h >> 13;
4915 16           h *= m;
4916 16           h ^= h >> 15;
4917              
4918 16 100         RETVAL = (int)(h & 0x7FFFFFFF);
4919             }
4920             OUTPUT:
4921             RETVAL
4922              
4923             unsigned int
4924             _crc32c(SV *data_sv)
4925             CODE:
4926             {
4927             STRLEN len;
4928 7           const char *data = SvPV(data_sv, len);
4929 7           RETVAL = crc32c(data, len);
4930             }
4931             OUTPUT:
4932             RETVAL
4933              
4934             void
4935             _error_name(int code)
4936             PPCODE:
4937             {
4938 12           const char *name = NULL;
4939 12           switch (code) {
4940 1           case 0: name = "NONE"; break;
4941 1           case 1: name = "OFFSET_OUT_OF_RANGE"; break;
4942 0           case 2: name = "CORRUPT_MESSAGE"; break;
4943 1           case 3: name = "UNKNOWN_TOPIC_OR_PARTITION"; break;
4944 0           case 5: name = "LEADER_NOT_AVAILABLE"; break;
4945 1           case 6: name = "NOT_LEADER_OR_FOLLOWER"; break;
4946 0           case 7: name = "REQUEST_TIMED_OUT"; break;
4947 0           case 10: name = "MESSAGE_TOO_LARGE"; break;
4948 1           case 15: name = "COORDINATOR_NOT_AVAILABLE"; break;
4949 1           case 16: name = "NOT_COORDINATOR"; break;
4950 0           case 17: name = "INVALID_TOPIC_EXCEPTION"; break;
4951 0           case 19: name = "NOT_ENOUGH_REPLICAS"; break;
4952 0           case 20: name = "NOT_ENOUGH_REPLICAS_AFTER_APPEND"; break;
4953 0           case 22: name = "ILLEGAL_GENERATION"; break;
4954 0           case 25: name = "UNKNOWN_MEMBER_ID"; break;
4955 0           case 26: name = "INVALID_SESSION_TIMEOUT"; break;
4956 1           case 27: name = "REBALANCE_IN_PROGRESS"; break;
4957 0           case 35: name = "UNSUPPORTED_VERSION"; break;
4958 1           case 36: name = "TOPIC_ALREADY_EXISTS"; break;
4959 0           case 39: name = "REASSIGNMENT_IN_PROGRESS"; break;
4960 0           case 41: name = "NOT_CONTROLLER"; break;
4961 1           case 45: name = "OUT_OF_ORDER_SEQUENCE_NUMBER"; break;
4962 1           case 46: name = "DUPLICATE_SEQUENCE_NUMBER"; break;
4963 0           case 47: name = "INVALID_REPLICATION_FACTOR"; break;
4964 0           case 58: name = "SASL_AUTHENTICATION_FAILED"; break;
4965 0           case 72: name = "LISTENER_NOT_FOUND"; break;
4966 1           case 79: name = "MEMBER_ID_REQUIRED"; break;
4967 1           default: name = "UNKNOWN"; break;
4968             }
4969 12 50         EXTEND(SP, 1);
4970 12           mPUSHp(name, strlen(name));
4971 12           XSRETURN(1);
4972             }
4973              
4974             # ---- internal test helpers (not part of the public API) ----
4975              
4976             SV*
4977             _test_zigzag_i32(int v)
4978             CODE:
4979             {
4980 5           int32_t v32 = (int32_t)v;
4981 5           uint32_t z = (uint32_t)((v32 << 1) ^ (v32 >> 31));
4982 5           RETVAL = newSVuv(z);
4983             }
4984             OUTPUT:
4985             RETVAL
4986              
4987             SV*
4988             _test_zigzag_i64(SV *v_sv)
4989             CODE:
4990             {
4991 5           int64_t v = (int64_t)SvIV(v_sv);
4992 5           uint64_t z = (uint64_t)((v << 1) ^ (v >> 63));
4993 5           RETVAL = newSVuv(z);
4994             }
4995             OUTPUT:
4996             RETVAL
4997              
4998             # Encode then decode a varint; returns the round-tripped i64 or undef on
4999             # malformed input.
5000             SV*
5001             _test_varint_roundtrip(SV *v_sv)
5002             CODE:
5003             {
5004 14           int64_t v = (int64_t)SvIV(v_sv);
5005             kf_buf_t b;
5006 14           kf_buf_init(&b);
5007 14           kf_buf_append_varint(&b, v);
5008             int64_t out;
5009 14           int n = kf_read_varint(b.data, b.data + b.len, &out);
5010 14 50         if (n < 0) RETVAL = &PL_sv_undef;
5011 14           else RETVAL = newSViv((IV)out);
5012 14           kf_buf_free(&b);
5013             }
5014             OUTPUT:
5015             RETVAL
5016              
5017             # Encode a single-record RecordBatch v2 with given options.
5018             # opts: { compression => 0|1|3, producer_id, producer_epoch,
5019             # base_sequence, is_transactional, timestamp }.
5020             SV*
5021             _test_encode_batch(SV *records_sv, SV *opts_sv = NULL)
5022             CODE:
5023             {
5024 16 50         if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV)
    50          
5025 0           croak("_test_encode_batch: arrayref required");
5026 16           AV *records = (AV*)SvRV(records_sv);
5027              
5028 16           int compression = 0;
5029 16           int64_t producer_id = -1;
5030 16           int16_t producer_epoch = -1;
5031 16           int32_t base_sequence = -1;
5032 16           int is_txn = 0;
5033 16           int64_t ts = 0;
5034 16 100         if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
    50          
    50          
5035 11           HV *opts = (HV*)SvRV(opts_sv);
5036             SV **v;
5037 11 100         if ((v = hv_fetchs(opts, "compression", 0)) && SvOK(*v))
    50          
5038 10           compression = (int)SvIV(*v);
5039 11 100         if ((v = hv_fetchs(opts, "producer_id", 0)) && SvOK(*v))
    50          
5040 1           producer_id = (int64_t)SvIV(*v);
5041 11 100         if ((v = hv_fetchs(opts, "producer_epoch", 0)) && SvOK(*v))
    50          
5042 1           producer_epoch = (int16_t)SvIV(*v);
5043 11 100         if ((v = hv_fetchs(opts, "base_sequence", 0)) && SvOK(*v))
    50          
5044 1           base_sequence = (int32_t)SvIV(*v);
5045 11 50         if ((v = hv_fetchs(opts, "is_transactional", 0)) && SvOK(*v))
    0          
5046 0           is_txn = (int)SvIV(*v);
5047 11 50         if ((v = hv_fetchs(opts, "timestamp", 0)) && SvOK(*v))
    0          
5048 0           ts = (int64_t)SvIV(*v);
5049             }
5050              
5051             kf_buf_t out;
5052 16           kf_encode_record_batch_multi(aTHX_ &out, records, ts, compression,
5053             producer_id, producer_epoch, base_sequence, is_txn);
5054 16           RETVAL = newSVpvn(out.data, out.len);
5055 16           kf_buf_free(&out);
5056             }
5057             OUTPUT:
5058             RETVAL
5059              
5060             # Parse a captured response body of a known API. Returns the parsed hash
5061             # (same shape user callbacks see) or undef if the parser rejected the bytes.
5062             # api: 'metadata' | 'produce' | 'fetch' | 'list_offsets' | 'find_coordinator'.
5063             SV*
5064             _test_parse_response(const char *api, int version, SV *bytes_sv)
5065             CODE:
5066             {
5067             STRLEN len;
5068 15           const char *data = SvPV(bytes_sv, len);
5069             ev_kafka_conn_t stub;
5070 15           memset(&stub, 0, sizeof(stub));
5071 15           stub.magic = KF_MAGIC_ALIVE;
5072 15           stub.fd = -1;
5073 15           SV *result = NULL;
5074 15 100         if (strcmp(api, "metadata") == 0) {
5075 2           result = conn_parse_metadata_response(aTHX_ &stub, (int16_t)version, data, len);
5076 13 100         } else if (strcmp(api, "produce") == 0) {
5077 1           result = conn_parse_produce_response(aTHX_ &stub, (int16_t)version, data, len);
5078 12 50         } else if (strcmp(api, "fetch") == 0) {
5079 0           result = conn_parse_fetch_response(aTHX_ &stub, (int16_t)version, data, len);
5080 12 50         } else if (strcmp(api, "list_offsets") == 0) {
5081 0           result = conn_parse_list_offsets_response(aTHX_ &stub, (int16_t)version, data, len);
5082 12 100         } else if (strcmp(api, "find_coordinator") == 0) {
5083 1           result = conn_parse_find_coordinator_response(aTHX_ &stub, (int16_t)version, data, len);
5084 11 100         } else if (strcmp(api, "join_group") == 0) {
5085 1           result = conn_parse_join_group_response(aTHX_ &stub, (int16_t)version, data, len);
5086 10 100         } else if (strcmp(api, "sync_group") == 0) {
5087 1           result = conn_parse_sync_group_response(aTHX_ &stub, (int16_t)version, data, len);
5088 9 100         } else if (strcmp(api, "heartbeat") == 0) {
5089 1           result = conn_parse_heartbeat_response(aTHX_ &stub, (int16_t)version, data, len);
5090 8 50         } else if (strcmp(api, "offset_commit") == 0) {
5091 0           result = conn_parse_offset_commit_response(aTHX_ &stub, (int16_t)version, data, len);
5092 8 100         } else if (strcmp(api, "offset_fetch") == 0) {
5093 1           result = conn_parse_offset_fetch_response(aTHX_ &stub, (int16_t)version, data, len);
5094 7 100         } else if (strcmp(api, "leave_group") == 0) {
5095 1           result = conn_parse_leave_group_response(aTHX_ &stub, (int16_t)version, data, len);
5096 6 100         } else if (strcmp(api, "create_topics") == 0) {
5097 1           result = conn_parse_create_topics_response(aTHX_ &stub, (int16_t)version, data, len);
5098 5 100         } else if (strcmp(api, "delete_topics") == 0) {
5099 1           result = conn_parse_delete_topics_response(aTHX_ &stub, (int16_t)version, data, len);
5100 4 100         } else if (strcmp(api, "init_producer_id") == 0) {
5101 1           result = conn_parse_init_producer_id_response(aTHX_ &stub, (int16_t)version, data, len);
5102 3 100         } else if (strcmp(api, "add_partitions_to_txn") == 0) {
5103 1           result = conn_parse_add_partitions_to_txn_response(aTHX_ &stub, (int16_t)version, data, len);
5104 2 100         } else if (strcmp(api, "end_txn") == 0) {
5105 1           result = conn_parse_end_txn_response(aTHX_ &stub, (int16_t)version, data, len);
5106 1 50         } else if (strcmp(api, "txn_offset_commit") == 0) {
5107 1           result = conn_parse_txn_offset_commit_response(aTHX_ &stub, (int16_t)version, data, len);
5108             } else {
5109 0           croak("_test_parse_response: unknown api '%s'", api);
5110             }
5111 15 50         RETVAL = result ? SvREFCNT_inc(result) : &PL_sv_undef;
5112             }
5113             OUTPUT:
5114             RETVAL
5115              
5116             # Decode a RecordBatch v2 blob. Returns arrayref of {offset, key, value,
5117             # headers} on success, or undef on malformed input (CRC mismatch, etc.).
5118             SV*
5119             _test_decode_batch(SV *bytes_sv)
5120             CODE:
5121             {
5122             STRLEN len;
5123 16           const char *data = SvPV(bytes_sv, len);
5124 16           AV *records_av = newAV();
5125             int64_t base_offset;
5126 16           int n = kf_decode_record_batch(aTHX_ data, len, records_av, &base_offset);
5127 16 100         if (n < 0) {
5128 3           SvREFCNT_dec((SV*)records_av);
5129 3           RETVAL = &PL_sv_undef;
5130             } else {
5131 13           RETVAL = newRV_noinc((SV*)records_av);
5132             }
5133             }
5134             OUTPUT:
5135             RETVAL
5136              
5137             BOOT:
5138             {
5139 21 50         I_EV_API("EV::Kafka");
    50          
    50          
5140 21           crc32c_init_table();
5141             }