File Coverage

Kafka.xs
Criterion Covered Total %
statement 259 2664 9.7
branch 101 1778 5.6
condition n/a
subroutine n/a
pod n/a
total 360 4442 8.1


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include "ppport.h"
5              
6             #include "EVAPI.h"
7             #include "ngx-queue.h"
8              
9             #include
10             #include
11             #include
12             #include
13             #include
14             #include
15             #include
16             #include
17             #include
18             #include
19              
20             #ifdef HAVE_OPENSSL
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #endif
31              
32             #ifdef HAVE_LZ4
33             #include
34             #endif
35              
36             #ifdef HAVE_ZLIB
37             #include
38             #endif
39              
40             /* ================================================================
41             * Constants
42             * ================================================================ */
43              
44             #define KF_MAGIC_ALIVE 0xCAFEBEEF
45             #define KF_MAGIC_FREED 0xDEADCAFE
46              
47             #define KF_BUF_INIT 16384
48              
49             /* Connection states */
50             #define CONN_DISCONNECTED 0
51             #define CONN_CONNECTING 1
52             #define CONN_TLS_HANDSHAKE 2
53             #define CONN_SASL_HANDSHAKE 3
54             #define CONN_SASL_AUTH 4
55             #define CONN_API_VERSIONS 5
56             #define CONN_READY 6
57              
58             /* Kafka API keys */
59             #define API_PRODUCE 0
60             #define API_FETCH 1
61             #define API_LIST_OFFSETS 2
62             #define API_METADATA 3
63             #define API_OFFSET_COMMIT 8
64             #define API_OFFSET_FETCH 9
65             #define API_FIND_COORDINATOR 10
66             #define API_JOIN_GROUP 11
67             #define API_HEARTBEAT 12
68             #define API_LEAVE_GROUP 13
69             #define API_SYNC_GROUP 14
70             #define API_DESCRIBE_GROUPS 15
71             #define API_LIST_GROUPS 16
72             #define API_SASL_HANDSHAKE 17
73             #define API_API_VERSIONS 18
74             #define API_CREATE_TOPICS 19
75             #define API_DELETE_TOPICS 20
76             #define API_INIT_PRODUCER_ID 22
77             #define API_ADD_PARTITIONS_TXN 24
78             #define API_END_TXN 26
79             #define API_TXN_OFFSET_COMMIT 28
80             #define API_SASL_AUTHENTICATE 36
81              
82             #define API_VERSIONS_MAX_KEY 64
83              
84             /* Compression types */
85             #define COMPRESS_NONE 0
86             #define COMPRESS_GZIP 1
87             #define COMPRESS_SNAPPY 2
88             #define COMPRESS_LZ4 3
89             #define COMPRESS_ZSTD 4
90              
91             #define CLEAR_HANDLER(field) \
92             do { if (NULL != (field)) { SvREFCNT_dec(field); (field) = NULL; } } while(0)
93              
94             /* ================================================================
95             * Type declarations
96             * ================================================================ */
97              
98             typedef struct kf_buf_s kf_buf_t;
99             typedef struct ev_kafka_conn_s ev_kafka_conn_t;
100             typedef struct ev_kafka_conn_cb_s ev_kafka_conn_cb_t;
101             typedef struct kf_topic_meta_s kf_topic_meta_t;
102             typedef struct kf_partition_meta_s kf_partition_meta_t;
103             typedef struct kf_consumer_group_s kf_consumer_group_t;
104             typedef struct ev_kafka_s ev_kafka_t;
105              
106             typedef ev_kafka_conn_t* EV__Kafka__Conn;
107             typedef ev_kafka_t* EV__Kafka;
108              
109             /* ================================================================
110             * Dynamic buffer
111             * ================================================================ */
112              
113             struct kf_buf_s {
114             char *data;
115             size_t len;
116             size_t cap;
117             };
118              
119 0           static void kf_buf_init(kf_buf_t *b) {
120 0           Newx(b->data, 256, char);
121 0           b->len = 0;
122 0           b->cap = 256;
123 0           }
124              
125 0           static void kf_buf_grow(kf_buf_t *b, size_t need) {
126 0 0         if (b->cap >= need) return;
127 0           size_t newcap = b->cap * 2;
128 0 0         if (newcap < need) newcap = need;
129 0           Renew(b->data, newcap, char);
130 0           b->cap = newcap;
131             }
132              
133 0           static void kf_buf_free(kf_buf_t *b) {
134 0 0         if (b->data) { Safefree(b->data); b->data = NULL; }
135 0           b->len = 0;
136 0           b->cap = 0;
137 0           }
138              
139 0           static void kf_buf_append(kf_buf_t *b, const char *data, size_t len) {
140 0           kf_buf_grow(b, b->len + len);
141 0           Copy(data, b->data + b->len, len, char);
142 0           b->len += len;
143 0           }
144              
145 0           static void kf_buf_append_i8(kf_buf_t *b, int8_t val) {
146 0           kf_buf_grow(b, b->len + 1);
147 0           b->data[b->len++] = val;
148 0           }
149              
150 0           static void kf_buf_append_i16(kf_buf_t *b, int16_t val) {
151 0           kf_buf_grow(b, b->len + 2);
152 0           uint16_t v = htons((uint16_t)val);
153 0           memcpy(b->data + b->len, &v, 2);
154 0           b->len += 2;
155 0           }
156              
157 0           static void kf_buf_append_i32(kf_buf_t *b, int32_t val) {
158 0           kf_buf_grow(b, b->len + 4);
159 0           uint32_t v = htonl((uint32_t)val);
160 0           memcpy(b->data + b->len, &v, 4);
161 0           b->len += 4;
162 0           }
163              
164 0           static void kf_buf_append_i64(kf_buf_t *b, int64_t val) {
165 0           kf_buf_grow(b, b->len + 8);
166 0           uint32_t hi = htonl((uint32_t)((uint64_t)val >> 32));
167 0           uint32_t lo = htonl((uint32_t)((uint64_t)val & 0xFFFFFFFF));
168 0           memcpy(b->data + b->len, &hi, 4);
169 0           memcpy(b->data + b->len + 4, &lo, 4);
170 0           b->len += 8;
171 0           }
172              
173             /* Kafka STRING: INT16 length + bytes. NULL → -1 length */
174 0           static void kf_buf_append_string(kf_buf_t *b, const char *s, int16_t len) {
175 0           kf_buf_append_i16(b, len);
176 0 0         if (len > 0) kf_buf_append(b, s, len);
177 0           }
178              
179 0           static void kf_buf_append_nullable_string(kf_buf_t *b, const char *s, int16_t len) {
180 0 0         if (!s) {
181 0           kf_buf_append_i16(b, -1);
182             } else {
183 0           kf_buf_append_i16(b, len);
184 0 0         if (len > 0) kf_buf_append(b, s, len);
185             }
186 0           }
187              
188             /* Kafka BYTES: INT32 length + bytes */
189 0           static void kf_buf_append_bytes(kf_buf_t *b, const char *s, int32_t len) {
190 0           kf_buf_append_i32(b, len);
191 0 0         if (len > 0) kf_buf_append(b, s, len);
192 0           }
193              
194 0           static void kf_buf_append_nullable_bytes(kf_buf_t *b, const char *s, int32_t len) {
195 0 0         if (!s) {
196 0           kf_buf_append_i32(b, -1);
197             } else {
198 0           kf_buf_append_i32(b, len);
199 0 0         if (len > 0) kf_buf_append(b, s, len);
200             }
201 0           }
202              
203             /* Unsigned varint (ZigZag for signed is not used in Kafka framing) */
204 0           static void kf_buf_append_uvarint(kf_buf_t *b, uint64_t val) {
205 0           kf_buf_grow(b, b->len + 10);
206 0 0         while (val >= 0x80) {
207 0           b->data[b->len++] = (char)((val & 0x7F) | 0x80);
208 0           val >>= 7;
209             }
210 0           b->data[b->len++] = (char)val;
211 0           }
212              
213             /* Signed varint (ZigZag encoding) */
214 0           static void kf_buf_append_varint(kf_buf_t *b, int64_t val) {
215 0           uint64_t uval = ((uint64_t)val << 1) ^ (uint64_t)(val >> 63);
216 0           kf_buf_append_uvarint(b, uval);
217 0           }
218              
219             /* Compact string: uvarint(len+1) + bytes. NULL → 0 */
220 0           static void kf_buf_append_compact_string(kf_buf_t *b, const char *s, int32_t len) {
221 0 0         if (!s) {
222 0           kf_buf_append_uvarint(b, 0);
223             } else {
224 0           kf_buf_append_uvarint(b, (uint64_t)len + 1);
225 0 0         if (len > 0) kf_buf_append(b, s, len);
226             }
227 0           }
228              
229             /* Tagged fields: just 0 = no tagged fields */
230 0           static void kf_buf_append_tagged_fields(kf_buf_t *b) {
231 0           kf_buf_append_uvarint(b, 0);
232 0           }
233              
234             /* ================================================================
235             * Wire read helpers (big-endian)
236             * ================================================================ */
237              
238 0           static int16_t kf_read_i16(const char *buf) {
239             uint16_t v;
240 0           memcpy(&v, buf, 2);
241 0           return (int16_t)ntohs(v);
242             }
243              
244 0           static int32_t kf_read_i32(const char *buf) {
245             uint32_t v;
246 0           memcpy(&v, buf, 4);
247 0           return (int32_t)ntohl(v);
248             }
249              
250 0           static int64_t kf_read_i64(const char *buf) {
251             uint32_t hi, lo;
252 0           memcpy(&hi, buf, 4);
253 0           memcpy(&lo, buf + 4, 4);
254 0           return ((int64_t)ntohl(hi) << 32) | (uint32_t)ntohl(lo);
255             }
256              
257             /* Read unsigned varint, returns bytes consumed or -1 on error */
258 0           static int kf_read_uvarint(const char *buf, const char *end, uint64_t *out) {
259 0           uint64_t val = 0;
260 0           int shift = 0;
261 0           const char *p = buf;
262 0 0         while (p < end) {
263 0           uint8_t b = (uint8_t)*p++;
264 0           val |= ((uint64_t)(b & 0x7F)) << shift;
265 0 0         if (!(b & 0x80)) {
266 0           *out = val;
267 0           return (int)(p - buf);
268             }
269 0           shift += 7;
270 0 0         if (shift >= 64) return -1;
271             }
272 0           return -1; /* incomplete */
273             }
274              
275             /* Read signed varint (ZigZag) */
276 0           static int kf_read_varint(const char *buf, const char *end, int64_t *out) {
277             uint64_t uval;
278 0           int n = kf_read_uvarint(buf, end, &uval);
279 0 0         if (n < 0) return n;
280 0           *out = (int64_t)((uval >> 1) ^ -(int64_t)(uval & 1));
281 0           return n;
282             }
283              
284             /* Read Kafka STRING: i16 len + bytes. Returns pointer into buf, sets *len. Returns bytes consumed. */
285 0           static int kf_read_string(const char *buf, const char *end, const char **out, int16_t *slen) {
286 0 0         if (end - buf < 2) return -1;
287 0           int16_t len = kf_read_i16(buf);
288 0 0         if (len < 0) { /* nullable null */
289 0           *out = NULL;
290 0           *slen = 0;
291 0           return 2;
292             }
293 0 0         if (end - buf < 2 + len) return -1;
294 0           *out = buf + 2;
295 0           *slen = len;
296 0           return 2 + len;
297             }
298              
299             /* Read compact string: uvarint(len+1) + bytes */
300 0           static int kf_read_compact_string(const char *buf, const char *end, const char **out, int32_t *slen) {
301             uint64_t raw;
302 0           int n = kf_read_uvarint(buf, end, &raw);
303 0 0         if (n < 0) return -1;
304 0 0         if (raw == 0) {
305 0           *out = NULL;
306 0           *slen = 0;
307 0           return n;
308             }
309 0           int32_t len = (int32_t)(raw - 1);
310 0 0         if (end - buf - n < len) return -1;
311 0           *out = buf + n;
312 0           *slen = len;
313 0           return n + len;
314             }
315              
316             /* Skip tagged fields */
317 0           static int kf_skip_tagged_fields(const char *buf, const char *end) {
318             uint64_t count;
319 0           int n = kf_read_uvarint(buf, end, &count);
320 0 0         if (n < 0) return -1;
321 0           const char *p = buf + n;
322             uint64_t i;
323 0 0         for (i = 0; i < count; i++) {
324             uint64_t tag;
325 0           int tn = kf_read_uvarint(p, end, &tag);
326 0 0         if (tn < 0) return -1;
327 0           p += tn;
328             uint64_t dlen;
329 0           int dn = kf_read_uvarint(p, end, &dlen);
330 0 0         if (dn < 0) return -1;
331 0           p += dn;
332 0 0         if ((uint64_t)(end - p) < dlen) return -1;
333 0           p += dlen;
334             }
335 0           return (int)(p - buf);
336             }
337              
338             /* ================================================================
339             * CRC32C (software implementation)
340             * ================================================================ */
341              
342             static uint32_t crc32c_table[256];
343             static int crc32c_table_inited = 0;
344              
345 15           static void crc32c_init_table(void) {
346             uint32_t i, j;
347 3855 100         for (i = 0; i < 256; i++) {
348 3840           uint32_t crc = i;
349 34560 100         for (j = 0; j < 8; j++) {
350 30720 100         if (crc & 1)
351 15360           crc = (crc >> 1) ^ 0x82F63B78;
352             else
353 15360           crc >>= 1;
354             }
355 3840           crc32c_table[i] = crc;
356             }
357 15           crc32c_table_inited = 1;
358 15           }
359              
360 7           static uint32_t crc32c(const char *buf, size_t len) {
361 7           uint32_t crc = 0xFFFFFFFF;
362             size_t i;
363 7 50         if (!crc32c_table_inited) crc32c_init_table();
364 31 100         for (i = 0; i < len; i++)
365 24           crc = crc32c_table[(crc ^ (uint8_t)buf[i]) & 0xFF] ^ (crc >> 8);
366 7           return crc ^ 0xFFFFFFFF;
367             }
368              
369             /* ================================================================
370             * Connection callback struct
371             * ================================================================ */
372              
373             struct ev_kafka_conn_cb_s {
374             SV *cb;
375             ngx_queue_t queue;
376             int32_t correlation_id;
377             int16_t api_key;
378             int16_t api_version;
379             int internal; /* 1 = internal handshake cb, don't invoke Perl */
380             };
381              
382             /* ================================================================
383             * Broker connection struct (Layer 1)
384             * ================================================================ */
385              
386             struct ev_kafka_conn_s {
387             unsigned int magic;
388             struct ev_loop *loop;
389             int fd;
390             int state;
391              
392             /* EV watchers */
393             ev_io rio, wio;
394             ev_timer timer;
395             int reading, writing, timing;
396              
397             /* TLS */
398             #ifdef HAVE_OPENSSL
399             SSL_CTX *ssl_ctx;
400             SSL *ssl;
401             #endif
402             int tls_enabled;
403             char *tls_ca_file;
404             int tls_skip_verify;
405              
406             /* Connection params */
407             char *host;
408             int port;
409             int node_id;
410              
411             /* SASL */
412             char *sasl_mechanism;
413             char *sasl_username;
414             char *sasl_password;
415              
416             /* SCRAM state */
417             int scram_step;
418             char *scram_nonce;
419             char *scram_client_first;
420             size_t scram_client_first_len;
421              
422             /* Buffers */
423             char *rbuf;
424             size_t rbuf_len, rbuf_cap;
425             char *wbuf;
426             size_t wbuf_len, wbuf_off, wbuf_cap;
427              
428             /* Request/response correlation */
429             ngx_queue_t cb_queue;
430             int32_t next_correlation_id;
431             int pending_count;
432              
433             /* Client identity */
434             char *client_id;
435             int client_id_len;
436              
437             /* API version negotiation */
438             int16_t api_versions[API_VERSIONS_MAX_KEY];
439             int api_versions_known;
440              
441             /* Event handlers */
442             SV *on_error;
443             SV *on_connect;
444             SV *on_disconnect;
445              
446             /* Reconnection */
447             int auto_reconnect;
448             int reconnect_delay_ms;
449             ev_timer reconnect_timer;
450             int reconnect_timing;
451             int intentional_disconnect;
452              
453             /* Safety */
454             int callback_depth;
455              
456             /* Back-pointer to cluster */
457             ev_kafka_t *cluster;
458             ngx_queue_t cluster_queue;
459             };
460              
461             /* ================================================================
462             * Forward declarations
463             * ================================================================ */
464              
465             static void conn_io_cb(EV_P_ ev_io *w, int revents);
466             static void conn_timer_cb(EV_P_ ev_timer *w, int revents);
467             static void conn_reconnect_timer_cb(EV_P_ ev_timer *w, int revents);
468             static void conn_start_reading(ev_kafka_conn_t *self);
469             static void conn_stop_reading(ev_kafka_conn_t *self);
470             static void conn_start_writing(ev_kafka_conn_t *self);
471             static void conn_stop_writing(ev_kafka_conn_t *self);
472             static void conn_emit_error(pTHX_ ev_kafka_conn_t *self, const char *msg);
473             static void conn_cleanup(pTHX_ ev_kafka_conn_t *self);
474             static int conn_check_destroyed(ev_kafka_conn_t *self);
475             static void conn_cancel_pending(pTHX_ ev_kafka_conn_t *self, const char *err);
476             static void conn_on_connect_done(pTHX_ ev_kafka_conn_t *self);
477             static void conn_process_responses(pTHX_ ev_kafka_conn_t *self);
478             static void conn_schedule_reconnect(pTHX_ ev_kafka_conn_t *self);
479             static int32_t conn_send_request(pTHX_ ev_kafka_conn_t *self,
480             int16_t api_key, int16_t api_version, kf_buf_t *body, SV *cb,
481             int internal, int no_response);
482             static void conn_send_api_versions(pTHX_ ev_kafka_conn_t *self);
483             static void conn_start_connect(pTHX_ ev_kafka_conn_t *self,
484             const char *host, int port, double timeout);
485             static void conn_send_sasl_handshake(pTHX_ ev_kafka_conn_t *self);
486             static void conn_parse_sasl_handshake_response(pTHX_ ev_kafka_conn_t *self,
487             const char *data, size_t len);
488             static void conn_send_sasl_authenticate(pTHX_ ev_kafka_conn_t *self);
489             static void conn_parse_sasl_authenticate_response(pTHX_ ev_kafka_conn_t *self,
490             const char *data, size_t len);
491              
492             /* Response parsers */
493             static SV* conn_parse_metadata_response(pTHX_ ev_kafka_conn_t *self,
494             int16_t version, const char *data, size_t len);
495             static SV* conn_parse_produce_response(pTHX_ ev_kafka_conn_t *self,
496             int16_t version, const char *data, size_t len);
497             static SV* conn_parse_fetch_response(pTHX_ ev_kafka_conn_t *self,
498             int16_t version, const char *data, size_t len);
499             static SV* conn_parse_list_offsets_response(pTHX_ ev_kafka_conn_t *self,
500             int16_t version, const char *data, size_t len);
501             static SV* conn_parse_find_coordinator_response(pTHX_ ev_kafka_conn_t *self,
502             int16_t version, const char *data, size_t len);
503             static SV* conn_parse_join_group_response(pTHX_ ev_kafka_conn_t *self,
504             int16_t version, const char *data, size_t len);
505             static SV* conn_parse_sync_group_response(pTHX_ ev_kafka_conn_t *self,
506             int16_t version, const char *data, size_t len);
507             static SV* conn_parse_heartbeat_response(pTHX_ ev_kafka_conn_t *self,
508             int16_t version, const char *data, size_t len);
509             static SV* conn_parse_offset_commit_response(pTHX_ ev_kafka_conn_t *self,
510             int16_t version, const char *data, size_t len);
511             static SV* conn_parse_offset_fetch_response(pTHX_ ev_kafka_conn_t *self,
512             int16_t version, const char *data, size_t len);
513             static SV* conn_parse_leave_group_response(pTHX_ ev_kafka_conn_t *self,
514             int16_t version, const char *data, size_t len);
515             static SV* conn_parse_create_topics_response(pTHX_ ev_kafka_conn_t *self,
516             int16_t version, const char *data, size_t len);
517             static SV* conn_parse_delete_topics_response(pTHX_ ev_kafka_conn_t *self,
518             int16_t version, const char *data, size_t len);
519             static SV* conn_parse_init_producer_id_response(pTHX_ ev_kafka_conn_t *self,
520             int16_t version, const char *data, size_t len);
521             static SV* conn_parse_add_partitions_to_txn_response(pTHX_ ev_kafka_conn_t *self,
522             int16_t version, const char *data, size_t len);
523             static SV* conn_parse_end_txn_response(pTHX_ ev_kafka_conn_t *self,
524             int16_t version, const char *data, size_t len);
525             static SV* conn_parse_txn_offset_commit_response(pTHX_ ev_kafka_conn_t *self,
526             int16_t version, const char *data, size_t len);
527              
528             #ifdef HAVE_OPENSSL
529             static int is_ip_literal(const char *host);
530             #endif
531              
532             /* ================================================================
533             * I/O helpers (with optional TLS)
534             * ================================================================ */
535              
536 0           static ssize_t kf_io_read(ev_kafka_conn_t *self, void *buf, size_t len) {
537             #ifdef HAVE_OPENSSL
538 0 0         if (self->ssl) {
539 0 0         int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;
540 0           int ret = SSL_read(self->ssl, buf, ssl_len);
541 0 0         if (ret <= 0) {
542 0           int err = SSL_get_error(self->ssl, ret);
543 0 0         if (err == SSL_ERROR_WANT_READ) {
544 0           errno = EAGAIN;
545 0           return -1;
546             }
547 0 0         if (err == SSL_ERROR_WANT_WRITE) {
548 0           conn_start_writing(self);
549 0           errno = EAGAIN;
550 0           return -1;
551             }
552 0 0         if (err == SSL_ERROR_ZERO_RETURN) return 0;
553 0           errno = EIO;
554 0           return -1;
555             }
556 0           return ret;
557             }
558             #endif
559 0           return read(self->fd, buf, len);
560             }
561              
562 0           static ssize_t kf_io_write(ev_kafka_conn_t *self, const void *buf, size_t len) {
563             #ifdef HAVE_OPENSSL
564 0 0         if (self->ssl) {
565 0 0         int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;
566 0           int ret = SSL_write(self->ssl, buf, ssl_len);
567 0 0         if (ret <= 0) {
568 0           int err = SSL_get_error(self->ssl, ret);
569 0 0         if (err == SSL_ERROR_WANT_WRITE) {
570 0           errno = EAGAIN;
571 0           return -1;
572             }
573 0 0         if (err == SSL_ERROR_WANT_READ) {
574 0           conn_start_reading(self);
575 0           errno = EAGAIN;
576 0           return -1;
577             }
578 0           errno = EIO;
579 0           return -1;
580             }
581 0           return ret;
582             }
583             #endif
584 0           return write(self->fd, buf, len);
585             }
586              
587             /* ================================================================
588             * Buffer management
589             * ================================================================ */
590              
591 0           static void conn_ensure_rbuf(ev_kafka_conn_t *self, size_t need) {
592 0 0         if (self->rbuf_cap >= need) return;
593 0           size_t newcap = self->rbuf_cap * 2;
594 0 0         if (newcap < need) newcap = need;
595 0           Renew(self->rbuf, newcap, char);
596 0           self->rbuf_cap = newcap;
597             }
598              
599 0           static void conn_ensure_wbuf(ev_kafka_conn_t *self, size_t need) {
600 0 0         if (self->wbuf_cap >= need) return;
601 0           size_t newcap = self->wbuf_cap * 2;
602 0 0         if (newcap < need) newcap = need;
603 0           Renew(self->wbuf, newcap, char);
604 0           self->wbuf_cap = newcap;
605             }
606              
607             /* ================================================================
608             * Watcher control
609             * ================================================================ */
610              
611 0           static void conn_start_reading(ev_kafka_conn_t *self) {
612 0 0         if (!self->reading && self->fd >= 0) {
    0          
613 0           ev_io_start(self->loop, &self->rio);
614 0           self->reading = 1;
615             }
616 0           }
617              
618 7           static void conn_stop_reading(ev_kafka_conn_t *self) {
619 7 50         if (self->reading) {
620 0           ev_io_stop(self->loop, &self->rio);
621 0           self->reading = 0;
622             }
623 7           }
624              
625 1           static void conn_start_writing(ev_kafka_conn_t *self) {
626 1 50         if (!self->writing && self->fd >= 0) {
    50          
627 1           ev_io_start(self->loop, &self->wio);
628 1           self->writing = 1;
629             }
630 1           }
631              
632 8           static void conn_stop_writing(ev_kafka_conn_t *self) {
633 8 100         if (self->writing) {
634 1           ev_io_stop(self->loop, &self->wio);
635 1           self->writing = 0;
636             }
637 8           }
638              
639             /* ================================================================
640             * Callback invocation helpers
641             * ================================================================ */
642              
643 1           static void conn_emit_error(pTHX_ ev_kafka_conn_t *self, const char *msg) {
644 1 50         if (!self->on_error)
645 0           croak("EV::Kafka::Conn: %s", msg);
646              
647 1           self->callback_depth++;
648             {
649 1           dSP;
650 1           ENTER;
651 1           SAVETMPS;
652 1 50         PUSHMARK(SP);
653 1 50         XPUSHs(sv_2mortal(newSVpv(msg, 0)));
654 1           PUTBACK;
655 1           call_sv(self->on_error, G_DISCARD | G_EVAL);
656 1 50         if (SvTRUE(ERRSV)) {
    50          
657 0 0         warn("EV::Kafka::Conn: on_error callback error: %s", SvPV_nolen(ERRSV));
658 0 0         sv_setsv(ERRSV, &PL_sv_undef);
659             }
660 1 50         FREETMPS;
661 1           LEAVE;
662             }
663 1           self->callback_depth--;
664 1           }
665              
666 0           static void conn_emit_connect(pTHX_ ev_kafka_conn_t *self) {
667 0 0         if (!self->on_connect) return;
668              
669 0           self->callback_depth++;
670             {
671 0           dSP;
672 0           ENTER;
673 0           SAVETMPS;
674 0 0         PUSHMARK(SP);
675 0           PUTBACK;
676 0           call_sv(self->on_connect, G_DISCARD | G_EVAL);
677 0 0         if (SvTRUE(ERRSV)) {
    0          
678 0 0         warn("EV::Kafka::Conn: on_connect callback error: %s", SvPV_nolen(ERRSV));
679 0 0         sv_setsv(ERRSV, &PL_sv_undef);
680             }
681 0 0         FREETMPS;
682 0           LEAVE;
683             }
684 0           self->callback_depth--;
685             }
686              
687 1           static void conn_emit_disconnect(pTHX_ ev_kafka_conn_t *self) {
688 1 50         if (!self->on_disconnect) return;
689              
690 0           self->callback_depth++;
691             {
692 0           dSP;
693 0           ENTER;
694 0           SAVETMPS;
695 0 0         PUSHMARK(SP);
696 0           PUTBACK;
697 0           call_sv(self->on_disconnect, G_DISCARD | G_EVAL);
698 0 0         if (SvTRUE(ERRSV)) {
    0          
699 0 0         warn("EV::Kafka::Conn: on_disconnect callback error: %s", SvPV_nolen(ERRSV));
700 0 0         sv_setsv(ERRSV, &PL_sv_undef);
701             }
702 0 0         FREETMPS;
703 0           LEAVE;
704             }
705 0           self->callback_depth--;
706             }
707              
708             /* Invoke a command callback: cb->(result, error) */
709 0           static void conn_invoke_cb(pTHX_ ev_kafka_conn_t *self, SV *cb, SV *result, SV *error) {
710 0 0         if (!cb) return;
711              
712 0           self->callback_depth++;
713             {
714 0           dSP;
715 0           ENTER;
716 0           SAVETMPS;
717 0 0         PUSHMARK(SP);
718 0 0         EXTEND(SP, 2);
719 0 0         PUSHs(result ? result : &PL_sv_undef);
720 0 0         PUSHs(error ? error : &PL_sv_undef);
721 0           PUTBACK;
722 0           call_sv(cb, G_DISCARD | G_EVAL);
723 0 0         if (SvTRUE(ERRSV)) {
    0          
724 0 0         warn("EV::Kafka::Conn: callback error: %s", SvPV_nolen(ERRSV));
725 0 0         sv_setsv(ERRSV, &PL_sv_undef);
726             }
727 0 0         FREETMPS;
728 0           LEAVE;
729             }
730 0           self->callback_depth--;
731             }
732              
733 3           static int conn_check_destroyed(ev_kafka_conn_t *self) {
734 3           return self->magic != KF_MAGIC_ALIVE;
735             }
736              
737             /* ================================================================
738             * Connection cleanup
739             * ================================================================ */
740              
741 7           static void conn_cleanup(pTHX_ ev_kafka_conn_t *self) {
742 7           conn_stop_reading(self);
743 7           conn_stop_writing(self);
744 7 50         if (self->timing) {
745 0           ev_timer_stop(self->loop, &self->timer);
746 0           self->timing = 0;
747             }
748             #ifdef HAVE_OPENSSL
749 7 50         if (self->ssl) {
750 0           SSL_free(self->ssl);
751 0           self->ssl = NULL;
752             }
753 7 50         if (self->ssl_ctx) {
754 0           SSL_CTX_free(self->ssl_ctx);
755 0           self->ssl_ctx = NULL;
756             }
757             #endif
758 7 100         if (self->fd >= 0) {
759 1           close(self->fd);
760 1           self->fd = -1;
761             }
762 7           self->state = CONN_DISCONNECTED;
763 7           }
764              
765 7           static void conn_cancel_pending(pTHX_ ev_kafka_conn_t *self, const char *err) {
766 7           SV *err_sv = newSVpv(err, 0);
767             ngx_queue_t *q;
768              
769 7 50         while (!ngx_queue_empty(&self->cb_queue)) {
770 0           q = ngx_queue_head(&self->cb_queue);
771 0           ngx_queue_remove(q);
772 0           ev_kafka_conn_cb_t *cbt = ngx_queue_data(q, ev_kafka_conn_cb_t, queue);
773 0           self->pending_count--;
774 0 0         if (cbt->cb && !cbt->internal) {
    0          
775 0           conn_invoke_cb(aTHX_ self, cbt->cb, NULL, sv_2mortal(newSVsv(err_sv)));
776 0 0         if (conn_check_destroyed(self)) {
777 0           SvREFCNT_dec(cbt->cb);
778 0           Safefree(cbt);
779 0           SvREFCNT_dec(err_sv);
780 0           return;
781             }
782             }
783 0 0         if (cbt->cb) SvREFCNT_dec(cbt->cb);
784 0           Safefree(cbt);
785             }
786 7           SvREFCNT_dec(err_sv);
787             }
788              
789 1           static void conn_handle_disconnect(pTHX_ ev_kafka_conn_t *self, const char *reason) {
790 1           conn_cleanup(aTHX_ self);
791              
792 1           conn_emit_disconnect(aTHX_ self);
793 1 50         if (conn_check_destroyed(self)) return;
794              
795 1           conn_cancel_pending(aTHX_ self, reason);
796 1 50         if (conn_check_destroyed(self)) return;
797              
798 1 50         if (!self->intentional_disconnect && self->auto_reconnect) {
    50          
799 0           conn_schedule_reconnect(aTHX_ self);
800             }
801             }
802              
803             /* ================================================================
804             * Reconnection
805             * ================================================================ */
806              
807 0           static void conn_reconnect_timer_cb(EV_P_ ev_timer *w, int revents) {
808 0           ev_kafka_conn_t *self = (ev_kafka_conn_t *)((char *)w - offsetof(ev_kafka_conn_t, reconnect_timer));
809             dTHX;
810             (void)loop;
811             (void)revents;
812              
813 0           self->reconnect_timing = 0;
814 0 0         if (self->magic != KF_MAGIC_ALIVE) return;
815 0 0         if (self->state != CONN_DISCONNECTED) return;
816 0 0         if (!self->host) return;
817              
818 0           conn_start_connect(aTHX_ self, self->host, self->port, 10.0);
819             }
820              
821 0           static void conn_schedule_reconnect(pTHX_ ev_kafka_conn_t *self) {
822 0 0         if (self->reconnect_timing) return;
823 0           double delay = self->reconnect_delay_ms / 1000.0;
824 0 0         if (delay < 0.01) delay = 1.0;
825 0           ev_timer_init(&self->reconnect_timer, conn_reconnect_timer_cb, delay, 0.0);
826 0           ev_timer_start(self->loop, &self->reconnect_timer);
827 0           self->reconnect_timing = 1;
828             }
829              
830             /* ================================================================
831             * Request framing
832             * ================================================================ */
833              
834             /* Build request header + body, append to wbuf, enqueue callback.
835             * For api_version >= threshold, uses compact header (v2).
836             * ApiVersions v3+ uses flexible (v1) request header.
837             * Returns correlation_id.
838             */
839 0           static int32_t conn_send_request(pTHX_ ev_kafka_conn_t *self,
840             int16_t api_key, int16_t api_version, kf_buf_t *body, SV *cb,
841             int internal, int no_response)
842             {
843 0           int32_t corr_id = self->next_correlation_id++;
844             kf_buf_t hdr;
845 0           kf_buf_init(&hdr);
846              
847             /* Request header v1 (non-flexible): api_key, api_version, correlation_id, client_id */
848             /* Request header v2 (flexible): same + tagged_fields */
849             /* ApiVersions v3+ uses header v2 (flexible) */
850 0 0         int flexible = (api_key == API_API_VERSIONS && api_version >= 3);
    0          
851              
852 0           kf_buf_append_i16(&hdr, api_key);
853 0           kf_buf_append_i16(&hdr, api_version);
854 0           kf_buf_append_i32(&hdr, corr_id);
855              
856 0 0         if (flexible) {
857             /* compact string for client_id */
858 0           kf_buf_append_compact_string(&hdr, self->client_id, self->client_id_len);
859 0           kf_buf_append_tagged_fields(&hdr);
860             } else {
861 0           kf_buf_append_nullable_string(&hdr, self->client_id, self->client_id_len);
862             }
863              
864             /* Total size = header + body */
865 0           size_t raw_size = hdr.len + body->len;
866 0 0         if (raw_size > (size_t)INT32_MAX) {
867 0           kf_buf_free(&hdr);
868 0           croak("request too large");
869             }
870 0           int32_t total_size = (int32_t)raw_size;
871              
872             /* Compact wbuf if sent prefix wastes significant space */
873 0 0         if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) {
    0          
874 0           self->wbuf_len -= self->wbuf_off;
875 0 0         if (self->wbuf_len > 0)
876 0           memmove(self->wbuf, self->wbuf + self->wbuf_off, self->wbuf_len);
877 0           self->wbuf_off = 0;
878             }
879              
880             /* Append to wbuf: [size:i32][header][body] */
881 0           conn_ensure_wbuf(self, self->wbuf_len + 4 + total_size);
882             {
883 0           uint32_t sz = htonl((uint32_t)total_size);
884 0           memcpy(self->wbuf + self->wbuf_len, &sz, 4);
885 0           self->wbuf_len += 4;
886             }
887 0           Copy(hdr.data, self->wbuf + self->wbuf_len, hdr.len, char);
888 0           self->wbuf_len += hdr.len;
889 0           Copy(body->data, self->wbuf + self->wbuf_len, body->len, char);
890 0           self->wbuf_len += body->len;
891              
892 0           kf_buf_free(&hdr);
893              
894             /* Enqueue callback (skip for no-response requests like acks=0) */
895 0 0         if (!no_response) {
896             ev_kafka_conn_cb_t *cbt;
897 0           Newxz(cbt, 1, ev_kafka_conn_cb_t);
898 0           cbt->correlation_id = corr_id;
899 0           cbt->api_key = api_key;
900 0           cbt->api_version = api_version;
901 0           cbt->internal = internal;
902 0 0         if (cb) {
903 0           cbt->cb = cb;
904 0           SvREFCNT_inc(cb);
905             }
906 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
907 0           self->pending_count++;
908             }
909              
910 0           conn_start_writing(self);
911              
912 0           return corr_id;
913             }
914              
915             /* ================================================================
916             * ApiVersions (API 18)
917             * ================================================================ */
918              
919 0           static void conn_send_api_versions(pTHX_ ev_kafka_conn_t *self) {
920             kf_buf_t body;
921 0           kf_buf_init(&body);
922              
923             /* ApiVersions v0: empty body, most compatible */
924 0           self->state = CONN_API_VERSIONS;
925 0           conn_send_request(aTHX_ self, API_API_VERSIONS, 0, &body, NULL, 1, 0);
926 0           kf_buf_free(&body);
927 0           }
928              
929 0           static void conn_parse_api_versions_response(pTHX_ ev_kafka_conn_t *self,
930             const char *data, size_t len)
931             {
932 0           const char *p = data;
933 0           const char *end = data + len;
934             int i;
935              
936             /* Initialize all as unsupported */
937 0 0         for (i = 0; i < API_VERSIONS_MAX_KEY; i++)
938 0           self->api_versions[i] = -1;
939              
940 0 0         if (end - p < 2) goto err;
941 0           int16_t error_code = kf_read_i16(p); p += 2;
942 0 0         if (error_code != 0) {
943             char errbuf[128];
944 0           snprintf(errbuf, sizeof(errbuf), "ApiVersions error: %d", error_code);
945 0           conn_emit_error(aTHX_ self, errbuf);
946 0 0         if (conn_check_destroyed(self)) return;
947 0           conn_handle_disconnect(aTHX_ self, errbuf);
948 0           return;
949             }
950              
951             /* v0 response: array(api_key:i16, min_version:i16, max_version:i16) */
952 0 0         if (end - p < 4) goto err;
953 0           int32_t count = kf_read_i32(p); p += 4;
954              
955 0 0         for (i = 0; i < count; i++) {
956 0 0         if (end - p < 6) goto err;
957 0           int16_t key = kf_read_i16(p); p += 2;
958 0           /* int16_t min_version = kf_read_i16(p); */ p += 2;
959 0           int16_t max_version = kf_read_i16(p); p += 2;
960              
961 0 0         if (key >= 0 && key < API_VERSIONS_MAX_KEY)
    0          
962 0           self->api_versions[key] = max_version;
963             }
964              
965 0           self->api_versions_known = 1;
966              
967             /* Handshake complete (or continue to SASL) */
968 0 0         if (self->sasl_mechanism) {
969 0           self->state = CONN_SASL_HANDSHAKE;
970 0           conn_send_sasl_handshake(aTHX_ self);
971             } else {
972 0           self->state = CONN_READY;
973 0           conn_emit_connect(aTHX_ self);
974             }
975 0           return;
976              
977 0           err:
978 0           conn_emit_error(aTHX_ self, "malformed ApiVersions response");
979 0 0         if (conn_check_destroyed(self)) return;
980 0           conn_handle_disconnect(aTHX_ self, "malformed ApiVersions response");
981             }
982              
983             /* ================================================================
984             * SASL Handshake (API 17) + Authenticate (API 36)
985             * ================================================================ */
986              
987 0           static void conn_send_sasl_handshake(pTHX_ ev_kafka_conn_t *self) {
988             kf_buf_t body;
989 0           kf_buf_init(&body);
990              
991             /* SaslHandshake v1: mechanism (STRING) */
992 0           STRLEN mech_len = strlen(self->sasl_mechanism);
993 0           kf_buf_append_string(&body, self->sasl_mechanism, (int16_t)mech_len);
994              
995 0           self->state = CONN_SASL_HANDSHAKE;
996 0           conn_send_request(aTHX_ self, API_SASL_HANDSHAKE, 1, &body, NULL, 1, 0);
997 0           kf_buf_free(&body);
998 0           }
999              
1000 0           static void conn_parse_sasl_handshake_response(pTHX_ ev_kafka_conn_t *self,
1001             const char *data, size_t len)
1002             {
1003 0           const char *p = data;
1004 0           const char *end = data + len;
1005              
1006 0 0         if (end - p < 2) goto err;
1007 0           int16_t error_code = kf_read_i16(p); p += 2;
1008              
1009             /* skip mechanisms array */
1010 0 0         if (end - p < 4) goto err;
1011 0           int32_t count = kf_read_i32(p); p += 4;
1012             int32_t i;
1013 0 0         for (i = 0; i < count; i++) {
1014             const char *s; int16_t slen;
1015 0           int n = kf_read_string(p, end, &s, &slen);
1016 0 0         if (n < 0) goto err;
1017 0           p += n;
1018             }
1019              
1020 0 0         if (error_code != 0) {
1021 0           conn_emit_error(aTHX_ self, "SASL handshake failed: mechanism not supported");
1022 0 0         if (conn_check_destroyed(self)) return;
1023 0           conn_handle_disconnect(aTHX_ self, "SASL handshake failed");
1024 0           return;
1025             }
1026              
1027             /* Proceed to authenticate */
1028 0           conn_send_sasl_authenticate(aTHX_ self);
1029 0           return;
1030              
1031 0           err:
1032 0           conn_emit_error(aTHX_ self, "malformed SaslHandshake response");
1033 0 0         if (conn_check_destroyed(self)) return;
1034 0           conn_handle_disconnect(aTHX_ self, "malformed SaslHandshake response");
1035             }
1036              
1037             /* SCRAM state for multi-step SASL */
1038             #define SCRAM_STEP_CLIENT_FIRST 0
1039             #define SCRAM_STEP_CLIENT_FINAL 1
1040             #define SCRAM_STEP_DONE 2
1041              
1042 0           static void conn_send_sasl_authenticate(pTHX_ ev_kafka_conn_t *self) {
1043             kf_buf_t body;
1044 0           kf_buf_init(&body);
1045              
1046 0 0         if (self->sasl_mechanism && strcmp(self->sasl_mechanism, "PLAIN") == 0) {
    0          
1047             /* PLAIN: \0username\0password */
1048 0 0         STRLEN ulen = self->sasl_username ? strlen(self->sasl_username) : 0;
1049 0 0         STRLEN plen = self->sasl_password ? strlen(self->sasl_password) : 0;
1050 0           int32_t auth_len = 1 + (int32_t)ulen + 1 + (int32_t)plen;
1051              
1052 0           kf_buf_append_i32(&body, auth_len);
1053 0           kf_buf_append_i8(&body, 0);
1054 0 0         if (ulen > 0) kf_buf_append(&body, self->sasl_username, ulen);
1055 0           kf_buf_append_i8(&body, 0);
1056 0 0         if (plen > 0) kf_buf_append(&body, self->sasl_password, plen);
1057             }
1058             #ifdef HAVE_OPENSSL
1059 0 0         else if (self->sasl_mechanism &&
1060 0 0         (strcmp(self->sasl_mechanism, "SCRAM-SHA-256") == 0 ||
1061 0 0         strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0)) {
1062             /* SCRAM client-first-message: n,,n=,r= */
1063             char nonce[33];
1064             {
1065             unsigned char rnd[16];
1066             int i;
1067 0           RAND_bytes(rnd, 16);
1068 0 0         for (i = 0; i < 16; i++)
1069 0           snprintf(nonce + i*2, 3, "%02x", rnd[i]);
1070 0           nonce[32] = '\0';
1071             }
1072              
1073             /* save nonce for later steps */
1074 0 0         if (self->scram_nonce) Safefree(self->scram_nonce);
1075 0           self->scram_nonce = savepv(nonce);
1076 0           self->scram_step = SCRAM_STEP_CLIENT_FIRST;
1077              
1078             kf_buf_t msg;
1079 0           kf_buf_init(&msg);
1080 0           kf_buf_append(&msg, "n,,n=", 5);
1081 0           kf_buf_append(&msg, self->sasl_username, strlen(self->sasl_username));
1082 0           kf_buf_append(&msg, ",r=", 3);
1083 0           kf_buf_append(&msg, nonce, 32);
1084              
1085             /* save client-first-message-bare for later binding */
1086 0 0         if (self->scram_client_first) Safefree(self->scram_client_first);
1087             /* bare = after "n,," */
1088 0           self->scram_client_first = savepvn(msg.data + 3, msg.len - 3);
1089 0           self->scram_client_first_len = msg.len - 3;
1090              
1091 0           kf_buf_append_i32(&body, (int32_t)msg.len);
1092 0           kf_buf_append(&body, msg.data, msg.len);
1093 0           kf_buf_free(&msg);
1094             }
1095             #endif
1096             else {
1097 0           conn_emit_error(aTHX_ self, "unsupported SASL mechanism");
1098 0           kf_buf_free(&body);
1099 0           return;
1100             }
1101              
1102 0           self->state = CONN_SASL_AUTH;
1103 0           conn_send_request(aTHX_ self, API_SASL_AUTHENTICATE, 2, &body, NULL, 1, 0);
1104 0           kf_buf_free(&body);
1105             }
1106              
1107 0           static void conn_parse_sasl_authenticate_response(pTHX_ ev_kafka_conn_t *self,
1108             const char *data, size_t len)
1109             {
1110 0           const char *p = data;
1111 0           const char *end = data + len;
1112              
1113 0 0         if (end - p < 2) goto err;
1114 0           int16_t error_code = kf_read_i16(p); p += 2;
1115              
1116 0           const char *errmsg_str = NULL;
1117 0           int16_t errmsg_len = 0;
1118             {
1119 0           int n = kf_read_string(p, end, &errmsg_str, &errmsg_len);
1120 0 0         if (n < 0) goto err;
1121 0           p += n;
1122             }
1123              
1124             /* auth_bytes */
1125 0           const char *auth_data = NULL;
1126 0           int32_t auth_data_len = 0;
1127 0 0         if (end - p >= 4) {
1128 0           auth_data_len = kf_read_i32(p); p += 4;
1129 0 0         if (auth_data_len > 0 && end - p >= auth_data_len) {
    0          
1130 0           auth_data = p;
1131 0           p += auth_data_len;
1132             }
1133             }
1134              
1135 0 0         if (error_code != 0) {
1136             char errbuf[512];
1137 0 0         if (errmsg_str && errmsg_len > 0)
    0          
1138 0           snprintf(errbuf, sizeof(errbuf), "SASL auth failed: %.*s", (int)errmsg_len, errmsg_str);
1139             else
1140 0           snprintf(errbuf, sizeof(errbuf), "SASL auth failed: error %d", error_code);
1141 0           conn_emit_error(aTHX_ self, errbuf);
1142 0 0         if (conn_check_destroyed(self)) return;
1143 0           conn_handle_disconnect(aTHX_ self, "SASL auth failed");
1144 0           return;
1145             }
1146              
1147             #ifdef HAVE_OPENSSL
1148             /* SCRAM multi-step handling */
1149 0 0         if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) {
    0          
    0          
1150             /* Server-first-message: r=,s=,i= */
1151             /* Parse server response, compute proof, send client-final */
1152 0           const char *server_nonce = NULL;
1153 0           size_t server_nonce_len = 0;
1154 0           const char *salt_b64 = NULL;
1155 0           size_t salt_b64_len = 0;
1156 0           int iterations = 0;
1157             {
1158 0           const char *sp = auth_data;
1159 0           const char *se = auth_data + auth_data_len;
1160 0 0         while (sp < se) {
1161 0 0         if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') {
    0          
    0          
1162 0           sp += 2; server_nonce = sp;
1163 0 0         while (sp < se && *sp != ',') sp++;
    0          
1164 0           server_nonce_len = sp - server_nonce;
1165 0 0         } else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') {
    0          
    0          
1166 0           sp += 2; salt_b64 = sp;
1167 0 0         while (sp < se && *sp != ',') sp++;
    0          
1168 0           salt_b64_len = sp - salt_b64;
1169 0 0         } else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') {
    0          
    0          
1170 0           sp += 2;
1171 0           iterations = atoi(sp);
1172 0 0         while (sp < se && *sp != ',') sp++;
    0          
1173             }
1174 0 0         if (sp < se && *sp == ',') sp++;
    0          
1175 0           else sp++;
1176             }
1177             }
1178              
1179 0 0         if (!server_nonce || !salt_b64 || iterations <= 0) {
    0          
    0          
1180 0           conn_emit_error(aTHX_ self, "SCRAM: malformed server-first-message");
1181 0 0         if (conn_check_destroyed(self)) return;
1182 0           conn_handle_disconnect(aTHX_ self, "SCRAM auth failed");
1183 0           return;
1184             }
1185              
1186             /* RFC 5802: server nonce must start with client nonce */
1187 0 0         if (server_nonce_len < 32 ||
1188 0 0         memcmp(server_nonce, self->scram_nonce, 32) != 0) {
1189 0           conn_emit_error(aTHX_ self, "SCRAM: server nonce mismatch");
1190 0 0         if (conn_check_destroyed(self)) return;
1191 0           conn_handle_disconnect(aTHX_ self, "SCRAM auth failed");
1192 0           return;
1193             }
1194              
1195 0           int is_sha512 = (strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0);
1196 0 0         const EVP_MD *md = is_sha512 ? EVP_sha512() : EVP_sha256();
1197 0 0         int digest_len = is_sha512 ? 64 : 32;
1198              
1199             /* Decode salt from base64 */
1200             unsigned char salt[128];
1201             int salt_len;
1202             {
1203 0           BIO *b64 = BIO_new(BIO_f_base64());
1204 0           BIO *bmem = BIO_new_mem_buf(salt_b64, (int)salt_b64_len);
1205 0           bmem = BIO_push(b64, bmem);
1206 0           BIO_set_flags(bmem, BIO_FLAGS_BASE64_NO_NL);
1207 0           salt_len = BIO_read(bmem, salt, sizeof(salt));
1208 0           BIO_free_all(bmem);
1209 0 0         if (salt_len <= 0) {
1210 0           conn_emit_error(aTHX_ self, "SCRAM: bad salt");
1211 0 0         if (conn_check_destroyed(self)) return;
1212 0           conn_handle_disconnect(aTHX_ self, "SCRAM auth failed");
1213 0           return;
1214             }
1215             }
1216              
1217             /* SaltedPassword = Hi(password, salt, iterations) using PBKDF2 */
1218             unsigned char salted_password[64];
1219 0           PKCS5_PBKDF2_HMAC(self->sasl_password, strlen(self->sasl_password),
1220             salt, salt_len, iterations, md, digest_len, salted_password);
1221              
1222             /* ClientKey = HMAC(SaltedPassword, "Client Key") */
1223             unsigned char client_key[64];
1224 0           unsigned int ck_len = digest_len;
1225 0           HMAC(md, salted_password, digest_len,
1226             (unsigned char *)"Client Key", 10, client_key, &ck_len);
1227              
1228             /* StoredKey = H(ClientKey) */
1229             unsigned char stored_key[64];
1230             {
1231 0           EVP_MD_CTX *ctx = EVP_MD_CTX_new();
1232             unsigned int sk_len;
1233 0           EVP_DigestInit_ex(ctx, md, NULL);
1234 0           EVP_DigestUpdate(ctx, client_key, digest_len);
1235 0           EVP_DigestFinal_ex(ctx, stored_key, &sk_len);
1236 0           EVP_MD_CTX_free(ctx);
1237             }
1238              
1239             /* AuthMessage = client-first-bare + "," + server-first + "," + client-final-without-proof */
1240 0           char channel_binding_b64[] = "biws"; /* base64("n,,") */
1241             kf_buf_t auth_msg;
1242 0           kf_buf_init(&auth_msg);
1243 0           kf_buf_append(&auth_msg, self->scram_client_first, self->scram_client_first_len);
1244 0           kf_buf_append(&auth_msg, ",", 1);
1245 0           kf_buf_append(&auth_msg, auth_data, auth_data_len);
1246 0           kf_buf_append(&auth_msg, ",c=", 3);
1247 0           kf_buf_append(&auth_msg, channel_binding_b64, 4);
1248 0           kf_buf_append(&auth_msg, ",r=", 3);
1249 0           kf_buf_append(&auth_msg, server_nonce, server_nonce_len);
1250              
1251             /* ClientSignature = HMAC(StoredKey, AuthMessage) */
1252             unsigned char client_sig[64];
1253 0           unsigned int cs_len = digest_len;
1254 0           HMAC(md, stored_key, digest_len,
1255 0           (unsigned char *)auth_msg.data, auth_msg.len, client_sig, &cs_len);
1256              
1257             /* ClientProof = ClientKey XOR ClientSignature */
1258             unsigned char proof[64];
1259             int di;
1260 0 0         for (di = 0; di < digest_len; di++)
1261 0           proof[di] = client_key[di] ^ client_sig[di];
1262              
1263 0           kf_buf_free(&auth_msg);
1264              
1265             /* Base64 encode proof */
1266             char proof_b64[256];
1267             {
1268 0           BIO *b64 = BIO_new(BIO_f_base64());
1269 0           BIO *bmem = BIO_new(BIO_s_mem());
1270 0           b64 = BIO_push(b64, bmem);
1271 0           BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
1272 0           BIO_write(b64, proof, digest_len);
1273 0           BIO_flush(b64);
1274             BUF_MEM *bptr;
1275 0           BIO_get_mem_ptr(b64, &bptr);
1276 0 0         int plen = bptr->length < 255 ? (int)bptr->length : 255;
1277 0           memcpy(proof_b64, bptr->data, plen);
1278 0           proof_b64[plen] = '\0';
1279 0           BIO_free_all(b64);
1280             }
1281              
1282             /* Build client-final-message: c=biws,r=,p= */
1283             kf_buf_t final_msg;
1284 0           kf_buf_init(&final_msg);
1285 0           kf_buf_append(&final_msg, "c=", 2);
1286 0           kf_buf_append(&final_msg, channel_binding_b64, 4);
1287 0           kf_buf_append(&final_msg, ",r=", 3);
1288 0           kf_buf_append(&final_msg, server_nonce, server_nonce_len);
1289 0           kf_buf_append(&final_msg, ",p=", 3);
1290 0           kf_buf_append(&final_msg, proof_b64, strlen(proof_b64));
1291              
1292             /* Send client-final via SaslAuthenticate */
1293             kf_buf_t body;
1294 0           kf_buf_init(&body);
1295 0           kf_buf_append_i32(&body, (int32_t)final_msg.len);
1296 0           kf_buf_append(&body, final_msg.data, final_msg.len);
1297              
1298 0           self->scram_step = SCRAM_STEP_CLIENT_FINAL;
1299 0           conn_send_request(aTHX_ self, API_SASL_AUTHENTICATE, 2, &body, NULL, 1, 0);
1300 0           kf_buf_free(&body);
1301 0           kf_buf_free(&final_msg);
1302 0           return;
1303             }
1304              
1305 0 0         if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FINAL) {
    0          
1306             /* Server-final-message: v= — we just verify no error */
1307 0           self->scram_step = SCRAM_STEP_DONE;
1308             /* fall through to CONN_READY */
1309             }
1310             #endif
1311              
1312             /* Auth success — connection is ready */
1313 0           self->state = CONN_READY;
1314 0           conn_emit_connect(aTHX_ self);
1315 0           return;
1316              
1317 0           err:
1318 0           conn_emit_error(aTHX_ self, "malformed SaslAuthenticate response");
1319 0 0         if (conn_check_destroyed(self)) return;
1320 0           conn_handle_disconnect(aTHX_ self, "malformed SaslAuthenticate response");
1321             }
1322              
1323             /* ================================================================
1324             * Response dispatch
1325             * ================================================================ */
1326              
1327 0           static void conn_dispatch_response(pTHX_ ev_kafka_conn_t *self,
1328             ev_kafka_conn_cb_t *cbt, const char *data, size_t len)
1329             {
1330             /* Internal handshake responses */
1331 0 0         if (cbt->internal) {
1332 0           switch (cbt->api_key) {
1333 0           case API_API_VERSIONS:
1334 0           conn_parse_api_versions_response(aTHX_ self, data, len);
1335 0           break;
1336 0           case API_SASL_HANDSHAKE:
1337 0           conn_parse_sasl_handshake_response(aTHX_ self, data, len);
1338 0           break;
1339 0           case API_SASL_AUTHENTICATE:
1340 0           conn_parse_sasl_authenticate_response(aTHX_ self, data, len);
1341 0           break;
1342 0           default: break;
1343             }
1344 0           return;
1345             }
1346              
1347             /* User-facing responses: parse and invoke callback */
1348 0 0         if (!cbt->cb) return;
1349              
1350 0           switch (cbt->api_key) {
1351 0           case API_METADATA: {
1352             /* Parse metadata response and return as Perl hash */
1353 0           SV *result = conn_parse_metadata_response(aTHX_ self, cbt->api_version, data, len);
1354 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1355 0           break;
1356             }
1357 0           case API_PRODUCE: {
1358 0           SV *result = conn_parse_produce_response(aTHX_ self, cbt->api_version, data, len);
1359 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1360 0           break;
1361             }
1362 0           case API_FETCH: {
1363 0           SV *result = conn_parse_fetch_response(aTHX_ self, cbt->api_version, data, len);
1364 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1365 0           break;
1366             }
1367 0           case API_LIST_OFFSETS: {
1368 0           SV *result = conn_parse_list_offsets_response(aTHX_ self, cbt->api_version, data, len);
1369 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1370 0           break;
1371             }
1372 0           case API_FIND_COORDINATOR: {
1373 0           SV *result = conn_parse_find_coordinator_response(aTHX_ self, cbt->api_version, data, len);
1374 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1375 0           break;
1376             }
1377 0           case API_JOIN_GROUP: {
1378 0           SV *result = conn_parse_join_group_response(aTHX_ self, cbt->api_version, data, len);
1379 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1380 0           break;
1381             }
1382 0           case API_SYNC_GROUP: {
1383 0           SV *result = conn_parse_sync_group_response(aTHX_ self, cbt->api_version, data, len);
1384 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1385 0           break;
1386             }
1387 0           case API_HEARTBEAT: {
1388 0           SV *result = conn_parse_heartbeat_response(aTHX_ self, cbt->api_version, data, len);
1389 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1390 0           break;
1391             }
1392 0           case API_OFFSET_COMMIT: {
1393 0           SV *result = conn_parse_offset_commit_response(aTHX_ self, cbt->api_version, data, len);
1394 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1395 0           break;
1396             }
1397 0           case API_OFFSET_FETCH: {
1398 0           SV *result = conn_parse_offset_fetch_response(aTHX_ self, cbt->api_version, data, len);
1399 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1400 0           break;
1401             }
1402 0           case API_LEAVE_GROUP: {
1403 0           SV *result = conn_parse_leave_group_response(aTHX_ self, cbt->api_version, data, len);
1404 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1405 0           break;
1406             }
1407 0           case API_CREATE_TOPICS: {
1408 0           SV *result = conn_parse_create_topics_response(aTHX_ self, cbt->api_version, data, len);
1409 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1410 0           break;
1411             }
1412 0           case API_DELETE_TOPICS: {
1413 0           SV *result = conn_parse_delete_topics_response(aTHX_ self, cbt->api_version, data, len);
1414 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1415 0           break;
1416             }
1417 0           case API_INIT_PRODUCER_ID: {
1418 0           SV *result = conn_parse_init_producer_id_response(aTHX_ self, cbt->api_version, data, len);
1419 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1420 0           break;
1421             }
1422 0           case API_ADD_PARTITIONS_TXN: {
1423 0           SV *result = conn_parse_add_partitions_to_txn_response(aTHX_ self, cbt->api_version, data, len);
1424 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1425 0           break;
1426             }
1427 0           case API_END_TXN: {
1428 0           SV *result = conn_parse_end_txn_response(aTHX_ self, cbt->api_version, data, len);
1429 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1430 0           break;
1431             }
1432 0           case API_TXN_OFFSET_COMMIT: {
1433 0           SV *result = conn_parse_txn_offset_commit_response(aTHX_ self, cbt->api_version, data, len);
1434 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1435 0           break;
1436             }
1437 0           default: {
1438             /* Unknown API — return raw bytes */
1439 0           SV *result = newSVpvn(data, len);
1440 0           conn_invoke_cb(aTHX_ self, cbt->cb, sv_2mortal(result), NULL);
1441 0           break;
1442             }
1443             }
1444             }
1445              
1446             /* ================================================================
1447             * RecordBatch encoder (for Produce requests)
1448             * ================================================================ */
1449              
1450             /* Build a single Record (within a RecordBatch).
1451             * Format: length(varint), attributes(i8=0), timestampDelta(varint),
1452             * offsetDelta(varint), key(varint-bytes), value(varint-bytes),
1453             * headers(varint array of {key:varint-str, value:varint-bytes})
1454             */
1455 0           static void kf_encode_record(kf_buf_t *b, int offset_delta, int64_t ts_delta,
1456             const char *key, STRLEN key_len, const char *value, STRLEN value_len,
1457             HV *headers)
1458             {
1459             kf_buf_t rec;
1460 0           kf_buf_init(&rec);
1461              
1462 0           kf_buf_append_i8(&rec, 0); /* attributes */
1463 0           kf_buf_append_varint(&rec, ts_delta);
1464 0           kf_buf_append_varint(&rec, offset_delta);
1465              
1466             /* key */
1467 0 0         if (key) {
1468 0           kf_buf_append_varint(&rec, (int64_t)key_len);
1469 0           kf_buf_append(&rec, key, key_len);
1470             } else {
1471 0           kf_buf_append_varint(&rec, -1);
1472             }
1473              
1474             /* value */
1475 0 0         if (value) {
1476 0           kf_buf_append_varint(&rec, (int64_t)value_len);
1477 0           kf_buf_append(&rec, value, value_len);
1478             } else {
1479 0           kf_buf_append_varint(&rec, -1);
1480             }
1481              
1482             /* headers */
1483 0 0         if (headers && HvUSEDKEYS(headers) > 0) {
    0          
    0          
1484 0 0         kf_buf_append_varint(&rec, (int64_t)HvUSEDKEYS(headers));
1485             HE *entry;
1486 0           hv_iterinit(headers);
1487 0 0         while ((entry = hv_iternext(headers))) {
1488             I32 hklen;
1489 0           const char *hkey = hv_iterkey(entry, &hklen);
1490 0           SV *hval = hv_iterval(headers, entry);
1491             STRLEN hvlen;
1492 0           const char *hvstr = SvPV(hval, hvlen);
1493              
1494 0           kf_buf_append_varint(&rec, (int64_t)hklen);
1495 0           kf_buf_append(&rec, hkey, hklen);
1496 0           kf_buf_append_varint(&rec, (int64_t)hvlen);
1497 0           kf_buf_append(&rec, hvstr, hvlen);
1498             }
1499             } else {
1500 0           kf_buf_append_varint(&rec, 0); /* 0 headers */
1501             }
1502              
1503             /* Write record: length(varint) + body */
1504 0           kf_buf_append_varint(b, (int64_t)rec.len);
1505 0           kf_buf_append(b, rec.data, rec.len);
1506 0           kf_buf_free(&rec);
1507 0           }
1508              
1509             /* Build a RecordBatch containing a single record.
1510             * Returns the complete RecordBatch bytes in *out.
1511             * Caller must kf_buf_free(out) when done.
1512             */
1513             /* Multi-record batch encoder with producer ID support */
1514 0           static void kf_encode_record_batch_multi(pTHX_ kf_buf_t *out,
1515             AV *records_av, int64_t timestamp, int compression,
1516             int64_t producer_id, int16_t producer_epoch, int32_t base_sequence,
1517             int is_transactional)
1518             {
1519             kf_buf_t records;
1520 0           kf_buf_init(&records);
1521 0           SSize_t i, count = av_len(records_av) + 1;
1522              
1523 0 0         for (i = 0; i < count; i++) {
1524 0           SV **elem = av_fetch(records_av, i, 0);
1525 0 0         if (!elem || !SvROK(*elem))
    0          
1526 0           croak("produce_batch: record element must be a hashref");
1527 0           HV *rh = (HV*)SvRV(*elem);
1528 0           SV **key_sv = hv_fetch(rh, "key", 3, 0);
1529 0           SV **val_sv = hv_fetch(rh, "value", 5, 0);
1530 0           SV **hdr_sv = hv_fetch(rh, "headers", 7, 0);
1531 0           const char *key = NULL; STRLEN key_len = 0;
1532 0           const char *val = NULL; STRLEN val_len = 0;
1533 0           HV *hdrs = NULL;
1534 0 0         if (key_sv && SvOK(*key_sv)) key = SvPV(*key_sv, key_len);
    0          
1535 0 0         if (val_sv && SvOK(*val_sv)) val = SvPV(*val_sv, val_len);
    0          
1536 0 0         if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV)
    0          
    0          
1537 0           hdrs = (HV*)SvRV(*hdr_sv);
1538 0           kf_encode_record(&records, (int)i, 0, key, key_len, val, val_len, hdrs);
1539             }
1540              
1541             kf_buf_t inner;
1542 0           kf_buf_init(&inner);
1543              
1544 0           int16_t attrs = (int16_t)(compression & 0x07);
1545 0 0         if (is_transactional) attrs |= 0x10; /* bit 4 = isTransactional */
1546 0           kf_buf_append_i16(&inner, attrs);
1547 0           kf_buf_append_i32(&inner, (int32_t)(count - 1)); /* lastOffsetDelta */
1548 0           kf_buf_append_i64(&inner, timestamp);
1549 0           kf_buf_append_i64(&inner, timestamp);
1550 0           kf_buf_append_i64(&inner, producer_id);
1551 0           kf_buf_append_i16(&inner, producer_epoch);
1552 0           kf_buf_append_i32(&inner, base_sequence);
1553              
1554             #ifdef HAVE_LZ4
1555             if (compression == COMPRESS_LZ4) {
1556             int max_compressed = LZ4_compressBound((int)records.len);
1557             char *compressed;
1558             Newx(compressed, max_compressed, char);
1559             int clen = LZ4_compress_default(records.data, compressed,
1560             (int)records.len, max_compressed);
1561             if (clen > 0) {
1562             kf_buf_append_i32(&inner, (int32_t)count);
1563             kf_buf_append(&inner, compressed, clen);
1564             } else {
1565             uint16_t zero = 0;
1566             memcpy(inner.data, &zero, 2);
1567             kf_buf_append_i32(&inner, (int32_t)count);
1568             kf_buf_append(&inner, records.data, records.len);
1569             }
1570             Safefree(compressed);
1571             } else
1572             #endif
1573             #ifdef HAVE_ZLIB
1574 0 0         if (compression == COMPRESS_GZIP) {
1575 0           size_t dest_cap = compressBound((uLong)records.len) + 32;
1576             char *compressed;
1577 0           Newx(compressed, dest_cap, char);
1578             z_stream zs;
1579 0           Zero(&zs, 1, z_stream);
1580 0           int zinit = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
1581             MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY);
1582 0           int zok = 0;
1583 0 0         if (zinit == Z_OK) {
1584 0           zs.next_in = (Bytef *)records.data;
1585 0           zs.avail_in = (uInt)records.len;
1586 0           zs.next_out = (Bytef *)compressed;
1587 0           zs.avail_out = (uInt)dest_cap;
1588 0 0         if (deflate(&zs, Z_FINISH) == Z_STREAM_END) zok = 1;
1589 0           deflateEnd(&zs);
1590             }
1591 0 0         if (zok) {
1592 0           kf_buf_append_i32(&inner, (int32_t)count);
1593 0           kf_buf_append(&inner, compressed, zs.total_out);
1594             } else {
1595 0           uint16_t zero = 0;
1596 0           memcpy(inner.data, &zero, 2);
1597 0           kf_buf_append_i32(&inner, (int32_t)count);
1598 0           kf_buf_append(&inner, records.data, records.len);
1599             }
1600 0           Safefree(compressed);
1601             } else
1602             #endif
1603             {
1604             (void)compression;
1605 0           kf_buf_append_i32(&inner, (int32_t)count);
1606 0           kf_buf_append(&inner, records.data, records.len);
1607             }
1608              
1609 0           uint32_t crc_val = crc32c(inner.data, inner.len);
1610 0           int32_t batch_length = 4 + 1 + 4 + (int32_t)inner.len;
1611              
1612 0           kf_buf_init(out);
1613 0           kf_buf_append_i64(out, 0);
1614 0           kf_buf_append_i32(out, batch_length);
1615 0           kf_buf_append_i32(out, 0);
1616 0           kf_buf_append_i8(out, 2);
1617 0           kf_buf_append_i32(out, (int32_t)crc_val);
1618 0           kf_buf_append(out, inner.data, inner.len);
1619              
1620 0           kf_buf_free(&inner);
1621 0           kf_buf_free(&records);
1622 0           }
1623              
1624             /* Single-record convenience wrapper */
1625 0           static void kf_encode_record_batch(kf_buf_t *out,
1626             const char *key, STRLEN key_len,
1627             const char *value, STRLEN value_len,
1628             HV *headers, int64_t timestamp, int compression)
1629             {
1630             kf_buf_t records;
1631 0           kf_buf_init(&records);
1632 0           kf_encode_record(&records, 0, 0, key, key_len, value, value_len, headers);
1633              
1634             kf_buf_t inner;
1635 0           kf_buf_init(&inner);
1636              
1637 0           int16_t attrs = (int16_t)(compression & 0x07);
1638 0           kf_buf_append_i16(&inner, attrs);
1639 0           kf_buf_append_i32(&inner, 0); /* lastOffsetDelta */
1640 0           kf_buf_append_i64(&inner, timestamp);
1641 0           kf_buf_append_i64(&inner, timestamp);
1642 0           kf_buf_append_i64(&inner, -1); /* producerId */
1643 0           kf_buf_append_i16(&inner, -1); /* producerEpoch */
1644 0           kf_buf_append_i32(&inner, -1); /* baseSequence */
1645              
1646             #ifdef HAVE_LZ4
1647             if (compression == COMPRESS_LZ4) {
1648             int max_compressed = LZ4_compressBound((int)records.len);
1649             char *compressed;
1650             Newx(compressed, max_compressed, char);
1651             int clen = LZ4_compress_default(records.data, compressed,
1652             (int)records.len, max_compressed);
1653             if (clen > 0) {
1654             kf_buf_append_i32(&inner, 1); /* records count */
1655             kf_buf_append(&inner, compressed, clen);
1656             } else {
1657             /* fallback to uncompressed */
1658             /* rewrite attrs to 0 */
1659             uint16_t zero = 0;
1660             memcpy(inner.data, &zero, 2);
1661             kf_buf_append_i32(&inner, 1);
1662             kf_buf_append(&inner, records.data, records.len);
1663             }
1664             Safefree(compressed);
1665             } else
1666             #endif
1667             #ifdef HAVE_ZLIB
1668 0 0         if (compression == COMPRESS_GZIP) {
1669 0           size_t dest_cap = compressBound((uLong)records.len) + 32;
1670             char *compressed;
1671 0           Newx(compressed, dest_cap, char);
1672             z_stream zs;
1673 0           Zero(&zs, 1, z_stream);
1674 0           int zinit = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
1675             MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY);
1676 0           int zok = 0;
1677 0 0         if (zinit == Z_OK) {
1678 0           zs.next_in = (Bytef *)records.data;
1679 0           zs.avail_in = (uInt)records.len;
1680 0           zs.next_out = (Bytef *)compressed;
1681 0           zs.avail_out = (uInt)dest_cap;
1682 0 0         if (deflate(&zs, Z_FINISH) == Z_STREAM_END) zok = 1;
1683 0           deflateEnd(&zs);
1684             }
1685 0 0         if (zok) {
1686 0           kf_buf_append_i32(&inner, 1);
1687 0           kf_buf_append(&inner, compressed, zs.total_out);
1688             } else {
1689 0           uint16_t zero = 0;
1690 0           memcpy(inner.data, &zero, 2);
1691 0           kf_buf_append_i32(&inner, 1);
1692 0           kf_buf_append(&inner, records.data, records.len);
1693             }
1694 0           Safefree(compressed);
1695             } else
1696             #endif
1697             {
1698             (void)compression;
1699 0           kf_buf_append_i32(&inner, 1); /* records count */
1700 0           kf_buf_append(&inner, records.data, records.len);
1701             }
1702              
1703 0           uint32_t crc_val = crc32c(inner.data, inner.len);
1704              
1705 0           int32_t batch_length = 4 + 1 + 4 + (int32_t)inner.len;
1706              
1707 0           kf_buf_init(out);
1708 0           kf_buf_append_i64(out, 0); /* baseOffset = 0 */
1709 0           kf_buf_append_i32(out, batch_length);
1710 0           kf_buf_append_i32(out, 0); /* partitionLeaderEpoch */
1711 0           kf_buf_append_i8(out, 2); /* magic = 2 */
1712 0           kf_buf_append_i32(out, (int32_t)crc_val); /* CRC32C */
1713 0           kf_buf_append(out, inner.data, inner.len);
1714              
1715 0           kf_buf_free(&inner);
1716 0           kf_buf_free(&records);
1717 0           }
1718              
1719             /* ================================================================
1720             * Response parsers
1721             * ================================================================ */
1722              
1723             /* Metadata response parser (API 3) */
1724 0           static SV* conn_parse_metadata_response(pTHX_ ev_kafka_conn_t *self,
1725             int16_t version, const char *data, size_t len)
1726             {
1727 0           const char *p = data;
1728 0           const char *end = data + len;
1729 0           HV *result = newHV();
1730 0           AV *brokers_av = newAV();
1731 0           AV *topics_av = newAV();
1732             int n;
1733              
1734             (void)self;
1735              
1736             /* For v9+ (flexible versions), fields use compact encoding */
1737 0           int flexible = (version >= 9);
1738              
1739 0 0         if (flexible) {
1740             /* throttle_time_ms */
1741 0 0         if (end - p < 4) goto done;
1742 0           /* int32_t throttle = kf_read_i32(p); */ p += 4;
1743              
1744             /* brokers: compact array */
1745             uint64_t raw;
1746 0           n = kf_read_uvarint(p, end, &raw);
1747 0 0         if (n < 0) goto done;
1748 0           p += n;
1749 0           int32_t broker_count = (int32_t)(raw - 1);
1750             int32_t i;
1751              
1752 0 0         for (i = 0; i < broker_count; i++) {
1753 0           HV *bh = newHV();
1754 0 0         if (end - p < 4) goto done;
1755 0           int32_t nid = kf_read_i32(p); p += 4;
1756 0           hv_store(bh, "node_id", 7, newSViv(nid), 0);
1757              
1758             const char *host; int32_t hlen;
1759 0           n = kf_read_compact_string(p, end, &host, &hlen);
1760 0 0         if (n < 0) goto done;
1761 0           p += n;
1762 0 0         hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
    0          
1763              
1764 0 0         if (end - p < 4) goto done;
1765 0           int32_t port = kf_read_i32(p); p += 4;
1766 0           hv_store(bh, "port", 4, newSViv(port), 0);
1767              
1768             /* rack: compact nullable string */
1769             const char *rack; int32_t rlen;
1770 0           n = kf_read_compact_string(p, end, &rack, &rlen);
1771 0 0         if (n < 0) goto done;
1772 0           p += n;
1773              
1774             /* tagged fields */
1775 0           n = kf_skip_tagged_fields(p, end);
1776 0 0         if (n < 0) goto done;
1777 0           p += n;
1778              
1779 0           av_push(brokers_av, newRV_noinc((SV*)bh));
1780             }
1781              
1782             /* cluster_id */
1783             const char *cid; int32_t cidlen;
1784 0           n = kf_read_compact_string(p, end, &cid, &cidlen);
1785 0 0         if (n < 0) goto done;
1786 0           p += n;
1787              
1788             /* controller_id */
1789 0 0         if (end - p < 4) goto done;
1790 0           int32_t controller_id = kf_read_i32(p); p += 4;
1791 0           hv_store(result, "controller_id", 13, newSViv(controller_id), 0);
1792              
1793             /* topics: compact array */
1794 0           n = kf_read_uvarint(p, end, &raw);
1795 0 0         if (n < 0) goto done;
1796 0           p += n;
1797 0           int32_t topic_count = (int32_t)(raw - 1);
1798              
1799 0 0         for (i = 0; i < topic_count; i++) {
1800 0           HV *th = newHV();
1801 0 0         if (end - p < 2) goto done;
1802 0           int16_t terr = kf_read_i16(p); p += 2;
1803 0           hv_store(th, "error_code", 10, newSViv(terr), 0);
1804              
1805             const char *tname; int32_t tnlen;
1806 0           n = kf_read_compact_string(p, end, &tname, &tnlen);
1807 0 0         if (n < 0) goto done;
1808 0           p += n;
1809 0 0         hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
1810              
1811             /* topic_id (UUID, 16 bytes) — v10+ */
1812 0 0         if (version >= 10) {
1813 0 0         if (end - p < 16) goto done;
1814 0           p += 16;
1815             }
1816              
1817             /* is_internal */
1818 0 0         if (end - p < 1) goto done;
1819 0           p += 1;
1820              
1821             /* partitions: compact array */
1822 0           n = kf_read_uvarint(p, end, &raw);
1823 0 0         if (n < 0) goto done;
1824 0           p += n;
1825 0           int32_t part_count = (int32_t)(raw - 1);
1826 0           AV *parts_av = newAV();
1827             int32_t j;
1828              
1829 0 0         for (j = 0; j < part_count; j++) {
1830 0           HV *ph = newHV();
1831 0 0         if (end - p < 2) goto done;
1832 0           int16_t perr = kf_read_i16(p); p += 2;
1833 0           hv_store(ph, "error_code", 10, newSViv(perr), 0);
1834              
1835 0 0         if (end - p < 4) goto done;
1836 0           int32_t pid = kf_read_i32(p); p += 4;
1837 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
1838              
1839 0 0         if (end - p < 4) goto done;
1840 0           int32_t leader = kf_read_i32(p); p += 4;
1841 0           hv_store(ph, "leader", 6, newSViv(leader), 0);
1842              
1843             /* leader_epoch — v7+ */
1844 0 0         if (version >= 7) {
1845 0 0         if (end - p < 4) goto done;
1846 0           p += 4;
1847             }
1848              
1849             /* replicas: compact array of i32 */
1850 0           n = kf_read_uvarint(p, end, &raw);
1851 0 0         if (n < 0) goto done;
1852 0           p += n;
1853 0           int32_t rcount = (int32_t)(raw - 1);
1854 0 0         if (end - p < (ptrdiff_t)(rcount * 4)) goto done;
1855 0           p += rcount * 4;
1856              
1857             /* isr: compact array of i32 */
1858 0           n = kf_read_uvarint(p, end, &raw);
1859 0 0         if (n < 0) goto done;
1860 0           p += n;
1861 0           rcount = (int32_t)(raw - 1);
1862 0 0         if (end - p < (ptrdiff_t)(rcount * 4)) goto done;
1863 0           p += rcount * 4;
1864              
1865             /* offline_replicas: compact array of i32 — v5+ */
1866 0 0         if (version >= 5) {
1867 0           n = kf_read_uvarint(p, end, &raw);
1868 0 0         if (n < 0) goto done;
1869 0           p += n;
1870 0           rcount = (int32_t)(raw - 1);
1871 0 0         if (end - p < (ptrdiff_t)(rcount * 4)) goto done;
1872 0           p += rcount * 4;
1873             }
1874              
1875             /* tagged fields */
1876 0           n = kf_skip_tagged_fields(p, end);
1877 0 0         if (n < 0) goto done;
1878 0           p += n;
1879              
1880 0           av_push(parts_av, newRV_noinc((SV*)ph));
1881             }
1882 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
1883              
1884             /* topic authorized operations — v8+ */
1885 0 0         if (version >= 8) {
1886 0 0         if (end - p < 4) goto done;
1887 0           p += 4;
1888             }
1889              
1890             /* tagged fields */
1891 0           n = kf_skip_tagged_fields(p, end);
1892 0 0         if (n < 0) goto done;
1893 0           p += n;
1894              
1895 0           av_push(topics_av, newRV_noinc((SV*)th));
1896             }
1897             } else {
1898             /* Non-flexible (v0-v8) — use classic STRING/ARRAY encoding */
1899             /* throttle_time_ms (v3+) */
1900 0 0         if (version >= 3) {
1901 0 0         if (end - p < 4) goto done;
1902 0           p += 4;
1903             }
1904              
1905             /* brokers array */
1906 0 0         if (end - p < 4) goto done;
1907 0           int32_t broker_count = kf_read_i32(p); p += 4;
1908             int32_t i;
1909 0 0         for (i = 0; i < broker_count; i++) {
1910 0           HV *bh = newHV();
1911 0 0         if (end - p < 4) goto done;
1912 0           int32_t nid = kf_read_i32(p); p += 4;
1913 0           hv_store(bh, "node_id", 7, newSViv(nid), 0);
1914              
1915             const char *host; int16_t hlen;
1916 0           n = kf_read_string(p, end, &host, &hlen);
1917 0 0         if (n < 0) goto done;
1918 0           p += n;
1919 0 0         hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
    0          
1920              
1921 0 0         if (end - p < 4) goto done;
1922 0           int32_t port = kf_read_i32(p); p += 4;
1923 0           hv_store(bh, "port", 4, newSViv(port), 0);
1924              
1925             /* rack (v1+) */
1926 0 0         if (version >= 1) {
1927             const char *r; int16_t rlen;
1928 0           n = kf_read_string(p, end, &r, &rlen);
1929 0 0         if (n < 0) goto done;
1930 0           p += n;
1931             }
1932              
1933 0           av_push(brokers_av, newRV_noinc((SV*)bh));
1934             }
1935              
1936             /* cluster_id (v2+) */
1937 0 0         if (version >= 2) {
1938             const char *cid; int16_t cidlen;
1939 0           n = kf_read_string(p, end, &cid, &cidlen);
1940 0 0         if (n < 0) goto done;
1941 0           p += n;
1942             }
1943              
1944             /* controller_id (v1+) */
1945 0 0         if (version >= 1) {
1946 0 0         if (end - p < 4) goto done;
1947 0           int32_t cid = kf_read_i32(p); p += 4;
1948 0           hv_store(result, "controller_id", 13, newSViv(cid), 0);
1949             }
1950              
1951             /* topics array */
1952 0 0         if (end - p < 4) goto done;
1953 0           int32_t topic_count = kf_read_i32(p); p += 4;
1954 0 0         for (i = 0; i < topic_count; i++) {
1955 0           HV *th = newHV();
1956 0 0         if (end - p < 2) goto done;
1957 0           int16_t terr = kf_read_i16(p); p += 2;
1958 0           hv_store(th, "error_code", 10, newSViv(terr), 0);
1959              
1960             const char *tname; int16_t tnlen;
1961 0           n = kf_read_string(p, end, &tname, &tnlen);
1962 0 0         if (n < 0) goto done;
1963 0           p += n;
1964 0 0         hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
1965              
1966             /* is_internal (v1+) */
1967 0 0         if (version >= 1) {
1968 0 0         if (end - p < 1) goto done;
1969 0           p += 1;
1970             }
1971              
1972             /* partitions */
1973 0 0         if (end - p < 4) goto done;
1974 0           int32_t part_count = kf_read_i32(p); p += 4;
1975 0           AV *parts_av = newAV();
1976             int32_t j;
1977 0 0         for (j = 0; j < part_count; j++) {
1978 0           HV *ph = newHV();
1979 0 0         if (end - p < 2) goto done;
1980 0           int16_t perr = kf_read_i16(p); p += 2;
1981 0           hv_store(ph, "error_code", 10, newSViv(perr), 0);
1982              
1983 0 0         if (end - p < 4) goto done;
1984 0           int32_t pid = kf_read_i32(p); p += 4;
1985 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
1986              
1987 0 0         if (end - p < 4) goto done;
1988 0           int32_t leader = kf_read_i32(p); p += 4;
1989 0           hv_store(ph, "leader", 6, newSViv(leader), 0);
1990              
1991             /* leader_epoch (v7+) */
1992 0 0         if (version >= 7) {
1993 0 0         if (end - p < 4) goto done;
1994 0           p += 4;
1995             }
1996              
1997             /* replicas */
1998 0 0         if (end - p < 4) goto done;
1999 0           int32_t rcount = kf_read_i32(p); p += 4;
2000 0 0         if (end - p < (ptrdiff_t)(rcount * 4)) goto done;
2001 0           p += rcount * 4;
2002              
2003             /* isr */
2004 0 0         if (end - p < 4) goto done;
2005 0           rcount = kf_read_i32(p); p += 4;
2006 0 0         if (end - p < (ptrdiff_t)(rcount * 4)) goto done;
2007 0           p += rcount * 4;
2008              
2009             /* offline_replicas (v5+) */
2010 0 0         if (version >= 5) {
2011 0 0         if (end - p < 4) goto done;
2012 0           rcount = kf_read_i32(p); p += 4;
2013 0 0         if (end - p < (ptrdiff_t)(rcount * 4)) goto done;
2014 0           p += rcount * 4;
2015             }
2016              
2017 0           av_push(parts_av, newRV_noinc((SV*)ph));
2018             }
2019 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2020              
2021 0           av_push(topics_av, newRV_noinc((SV*)th));
2022             }
2023             }
2024              
2025 0           done:
2026 0           hv_store(result, "brokers", 7, newRV_noinc((SV*)brokers_av), 0);
2027 0           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2028 0           return sv_2mortal(newRV_noinc((SV*)result));
2029             }
2030              
2031             /* Stub parsers — return raw bytes wrapped in hash with error_code */
2032             /* Produce response parser (API 0, v0-v7) */
2033 0           static SV* conn_parse_produce_response(pTHX_ ev_kafka_conn_t *self,
2034             int16_t version, const char *data, size_t len)
2035             {
2036 0           const char *p = data;
2037 0           const char *end = data + len;
2038 0           HV *result = newHV();
2039 0           AV *topics_av = newAV();
2040             int n;
2041              
2042             (void)self;
2043              
2044             /* responses: ARRAY */
2045 0 0         if (end - p < 4) goto done;
2046 0           int32_t topic_count = kf_read_i32(p); p += 4;
2047             int32_t i;
2048              
2049 0 0         for (i = 0; i < topic_count; i++) {
2050 0           HV *th = newHV();
2051             const char *tname; int16_t tnlen;
2052 0           n = kf_read_string(p, end, &tname, &tnlen);
2053 0 0         if (n < 0) goto done;
2054 0           p += n;
2055 0 0         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
2056              
2057             /* partitions: ARRAY */
2058 0 0         if (end - p < 4) goto done;
2059 0           int32_t part_count = kf_read_i32(p); p += 4;
2060 0           AV *parts_av = newAV();
2061             int32_t j;
2062              
2063 0 0         for (j = 0; j < part_count; j++) {
2064 0           HV *ph = newHV();
2065 0 0         if (end - p < 4) goto done;
2066 0           int32_t pid = kf_read_i32(p); p += 4;
2067 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
2068              
2069 0 0         if (end - p < 2) goto done;
2070 0           int16_t err = kf_read_i16(p); p += 2;
2071 0           hv_store(ph, "error_code", 10, newSViv(err), 0);
2072              
2073 0 0         if (end - p < 8) goto done;
2074 0           int64_t base_offset = kf_read_i64(p); p += 8;
2075 0           hv_store(ph, "base_offset", 11, newSViv(base_offset), 0);
2076              
2077             /* log_append_time (v2+) */
2078 0 0         if (version >= 2) {
2079 0 0         if (end - p < 8) goto done;
2080 0           p += 8;
2081             }
2082              
2083             /* log_start_offset (v5+) */
2084 0 0         if (version >= 5) {
2085 0 0         if (end - p < 8) goto done;
2086 0           p += 8;
2087             }
2088              
2089 0           av_push(parts_av, newRV_noinc((SV*)ph));
2090             }
2091 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2092 0           av_push(topics_av, newRV_noinc((SV*)th));
2093             }
2094              
2095             /* throttle_time_ms (v1+) */
2096 0 0         if (version >= 1 && end - p >= 4) {
    0          
2097 0           int32_t throttle = kf_read_i32(p); p += 4;
2098 0           hv_store(result, "throttle_time_ms", 16, newSViv(throttle), 0);
2099             }
2100              
2101 0           done:
2102 0           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2103 0           return sv_2mortal(newRV_noinc((SV*)result));
2104             }
2105              
2106             /* ================================================================
2107             * RecordBatch decoder (for Fetch responses)
2108             * ================================================================ */
2109              
2110             /* Decode records from a RecordBatch, push them as hashrefs onto records_av.
2111             * Returns number of records decoded, or -1 on error. */
2112 0           static int kf_decode_record_batch(pTHX_ const char *data, size_t len,
2113             AV *records_av, int64_t *out_base_offset)
2114             {
2115 0           const char *p = data;
2116 0           const char *end = data + len;
2117             int n;
2118              
2119 0 0         if (end - p < 12) return -1;
2120 0           int64_t base_offset = kf_read_i64(p); p += 8;
2121 0 0         if (out_base_offset) *out_base_offset = base_offset;
2122 0           int32_t batch_length = kf_read_i32(p); p += 4;
2123              
2124 0 0         if (end - p < batch_length) return -1;
2125 0           const char *batch_end = p + batch_length;
2126              
2127 0 0         if (batch_end - p < 9) return -1;
2128 0           /* int32_t partition_leader_epoch = kf_read_i32(p); */ p += 4;
2129 0           int8_t magic = (int8_t)*p; p += 1;
2130 0 0         if (magic != 2) return -1; /* only support magic=2 (current format) */
2131 0           /* int32_t crc = kf_read_i32(p); */ p += 4; /* skip CRC check for speed */
2132              
2133 0 0         if (batch_end - p < 36) return -1;
2134 0           int16_t attributes = kf_read_i16(p); p += 2;
2135 0           int compression_type = attributes & 0x07;
2136 0           /* int32_t last_offset_delta = kf_read_i32(p); */ p += 4;
2137 0           int64_t first_timestamp = kf_read_i64(p); p += 8;
2138 0           /* int64_t max_timestamp = kf_read_i64(p); */ p += 8;
2139 0           /* int64_t producer_id = kf_read_i64(p); */ p += 8;
2140 0           /* int16_t producer_epoch = kf_read_i16(p); */ p += 2;
2141 0           /* int32_t base_sequence = kf_read_i32(p); */ p += 4;
2142              
2143 0 0         if (batch_end - p < 4) return -1;
2144 0           int32_t record_count = kf_read_i32(p); p += 4;
2145              
2146             /* Decompress if needed */
2147 0           const char *rec_data = p;
2148 0           const char *rec_end = batch_end;
2149 0           char *decompressed = NULL;
2150              
2151 0 0         if (compression_type != COMPRESS_NONE && batch_end > p) {
    0          
2152 0           size_t compressed_len = batch_end - p;
2153 0           size_t decomp_cap = compressed_len * 4;
2154 0 0         if (decomp_cap < 4096) decomp_cap = 4096;
2155              
2156             #ifdef HAVE_ZLIB
2157 0 0         if (compression_type == COMPRESS_GZIP) {
2158 0           int zok = 0;
2159 0 0         while (!zok && decomp_cap < 64 * 1024 * 1024) {
    0          
2160 0           Newx(decompressed, decomp_cap, char);
2161             z_stream zs;
2162 0           Zero(&zs, 1, z_stream);
2163 0           int zinit = inflateInit2(&zs, MAX_WBITS + 16);
2164 0 0         if (zinit != Z_OK) {
2165 0           Safefree(decompressed);
2166 0           decompressed = NULL;
2167 0           break;
2168             }
2169 0           zs.next_in = (Bytef *)p;
2170 0           zs.avail_in = (uInt)compressed_len;
2171 0           zs.next_out = (Bytef *)decompressed;
2172 0           zs.avail_out = (uInt)decomp_cap;
2173 0           int zret = inflate(&zs, Z_FINISH);
2174 0           size_t dest_len = zs.total_out;
2175 0           inflateEnd(&zs);
2176 0 0         if (zret == Z_STREAM_END) {
2177 0           rec_data = decompressed;
2178 0           rec_end = decompressed + dest_len;
2179 0           zok = 1;
2180 0 0         } else if (zret == Z_BUF_ERROR || zret == Z_OK) {
    0          
2181 0           Safefree(decompressed);
2182 0           decompressed = NULL;
2183 0           decomp_cap *= 2;
2184             } else {
2185 0           Safefree(decompressed);
2186 0           decompressed = NULL;
2187 0           break;
2188             }
2189             }
2190             }
2191             #endif
2192             #ifdef HAVE_LZ4
2193             if (compression_type == COMPRESS_LZ4) {
2194             Newx(decompressed, decomp_cap, char);
2195             int dlen = LZ4_decompress_safe(p, decompressed,
2196             (int)compressed_len, (int)decomp_cap);
2197             if (dlen > 0) {
2198             rec_data = decompressed;
2199             rec_end = decompressed + dlen;
2200             } else {
2201             Safefree(decompressed);
2202             decompressed = NULL;
2203             }
2204             }
2205             #endif
2206             }
2207              
2208 0           const char *rp = rec_data;
2209             int32_t i;
2210 0 0         for (i = 0; i < record_count; i++) {
2211             int64_t rec_len;
2212 0           n = kf_read_varint(rp, rec_end, &rec_len);
2213 0 0         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2214 0           rp += n;
2215 0 0         if (rec_end - rp < rec_len) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2216 0           const char *this_rec_end = rp + rec_len;
2217              
2218 0 0         if (this_rec_end - rp < 1) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2219 0           /* int8_t rec_attrs = (int8_t)*rp; */ rp += 1;
2220              
2221             int64_t ts_delta;
2222 0           n = kf_read_varint(rp, this_rec_end, &ts_delta);
2223 0 0         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2224 0           rp += n;
2225              
2226             int64_t offset_delta;
2227 0           n = kf_read_varint(rp, this_rec_end, &offset_delta);
2228 0 0         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2229 0           rp += n;
2230              
2231             /* key */
2232             int64_t key_len;
2233 0           n = kf_read_varint(rp, this_rec_end, &key_len);
2234 0 0         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2235 0           rp += n;
2236 0           const char *key_data = NULL;
2237 0 0         if (key_len >= 0) {
2238 0 0         if (this_rec_end - rp < key_len) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2239 0           key_data = rp;
2240 0           rp += key_len;
2241             }
2242              
2243             /* value */
2244             int64_t val_len;
2245 0           n = kf_read_varint(rp, this_rec_end, &val_len);
2246 0 0         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2247 0           rp += n;
2248 0           const char *val_data = NULL;
2249 0 0         if (val_len >= 0) {
2250 0 0         if (this_rec_end - rp < val_len) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2251 0           val_data = rp;
2252 0           rp += val_len;
2253             }
2254              
2255             /* headers */
2256             int64_t hdr_count;
2257 0           n = kf_read_varint(rp, this_rec_end, &hdr_count);
2258 0 0         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2259 0           rp += n;
2260 0           HV *hdr_hv = NULL;
2261 0 0         if (hdr_count > 0) {
2262 0           hdr_hv = newHV();
2263             int64_t h;
2264 0 0         for (h = 0; h < hdr_count; h++) {
2265             int64_t hk_len;
2266 0           n = kf_read_varint(rp, this_rec_end, &hk_len);
2267 0 0         if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
    0          
2268 0           rp += n;
2269 0           const char *hk_data = rp;
2270 0 0         if (this_rec_end - rp < hk_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
    0          
2271 0           rp += hk_len;
2272              
2273             int64_t hv_len;
2274 0           n = kf_read_varint(rp, this_rec_end, &hv_len);
2275 0 0         if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
    0          
2276 0           rp += n;
2277 0           const char *hv_data = rp;
2278 0 0         if (hv_len >= 0) {
2279 0 0         if (this_rec_end - rp < hv_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
    0          
2280 0           rp += hv_len;
2281             }
2282              
2283 0 0         hv_store(hdr_hv, hk_data, (I32)hk_len,
2284             hv_len >= 0 ? newSVpvn(hv_data, (STRLEN)hv_len) : newSV(0), 0);
2285             }
2286             }
2287              
2288 0           HV *rec_hv = newHV();
2289 0           hv_store(rec_hv, "offset", 6, newSViv(base_offset + offset_delta), 0);
2290 0           hv_store(rec_hv, "timestamp", 9, newSViv(first_timestamp + ts_delta), 0);
2291 0 0         hv_store(rec_hv, "key", 3,
2292             key_data ? newSVpvn(key_data, (STRLEN)key_len) : newSV(0), 0);
2293 0 0         hv_store(rec_hv, "value", 5,
2294             val_data ? newSVpvn(val_data, (STRLEN)val_len) : newSV(0), 0);
2295 0 0         if (hdr_hv)
2296 0           hv_store(rec_hv, "headers", 7, newRV_noinc((SV*)hdr_hv), 0);
2297              
2298 0           av_push(records_av, newRV_noinc((SV*)rec_hv));
2299              
2300 0           rp = this_rec_end; /* skip any remaining bytes in the record */
2301             }
2302              
2303 0 0         if (decompressed) Safefree(decompressed);
2304 0           return record_count;
2305             }
2306              
2307             /* Fetch response parser (API 1, v4-v7 non-flexible) */
2308 0           static SV* conn_parse_fetch_response(pTHX_ ev_kafka_conn_t *self,
2309             int16_t version, const char *data, size_t len)
2310             {
2311 0           const char *p = data;
2312 0           const char *end = data + len;
2313 0           HV *result = newHV();
2314 0           AV *topics_av = newAV();
2315             int n;
2316              
2317             (void)self;
2318              
2319             /* throttle_time_ms (v1+) */
2320 0 0         if (version >= 1) {
2321 0 0         if (end - p < 4) goto done;
2322 0           int32_t throttle = kf_read_i32(p); p += 4;
2323 0           hv_store(result, "throttle_time_ms", 16, newSViv(throttle), 0);
2324             }
2325              
2326             /* error_code (v7+) */
2327 0 0         if (version >= 7) {
2328 0 0         if (end - p < 2) goto done;
2329 0           p += 2;
2330             }
2331              
2332             /* session_id (v7+) */
2333 0 0         if (version >= 7) {
2334 0 0         if (end - p < 4) goto done;
2335 0           p += 4;
2336             }
2337              
2338             /* responses: ARRAY */
2339 0 0         if (end - p < 4) goto done;
2340 0           int32_t topic_count = kf_read_i32(p); p += 4;
2341             int32_t i;
2342              
2343 0 0         for (i = 0; i < topic_count; i++) {
2344 0           HV *th = newHV();
2345              
2346             const char *tname; int16_t tnlen;
2347 0           n = kf_read_string(p, end, &tname, &tnlen);
2348 0 0         if (n < 0) goto done;
2349 0           p += n;
2350 0 0         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
2351              
2352             /* partitions: ARRAY */
2353 0 0         if (end - p < 4) goto done;
2354 0           int32_t part_count = kf_read_i32(p); p += 4;
2355 0           AV *parts_av = newAV();
2356             int32_t j;
2357              
2358 0 0         for (j = 0; j < part_count; j++) {
2359 0           HV *ph = newHV();
2360              
2361 0 0         if (end - p < 4) goto done;
2362 0           int32_t pid = kf_read_i32(p); p += 4;
2363 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
2364              
2365 0 0         if (end - p < 2) goto done;
2366 0           int16_t err = kf_read_i16(p); p += 2;
2367 0           hv_store(ph, "error_code", 10, newSViv(err), 0);
2368              
2369 0 0         if (end - p < 8) goto done;
2370 0           int64_t hw = kf_read_i64(p); p += 8;
2371 0           hv_store(ph, "high_watermark", 14, newSViv(hw), 0);
2372              
2373             /* last_stable_offset (v4+) */
2374 0 0         if (version >= 4) {
2375 0 0         if (end - p < 8) goto done;
2376 0           int64_t lso = kf_read_i64(p); p += 8;
2377 0           hv_store(ph, "last_stable_offset", 18, newSViv(lso), 0);
2378             }
2379              
2380             /* log_start_offset (v5+) */
2381 0 0         if (version >= 5) {
2382 0 0         if (end - p < 8) goto done;
2383 0           p += 8;
2384             }
2385              
2386             /* aborted_transactions (v4+) */
2387 0 0         if (version >= 4) {
2388 0 0         if (end - p < 4) goto done;
2389 0           int32_t at_count = kf_read_i32(p); p += 4;
2390             int32_t at;
2391 0 0         for (at = 0; at < at_count; at++) {
2392 0 0         if (end - p < 16) goto done;
2393 0           p += 16; /* producer_id(i64) + first_offset(i64) */
2394             }
2395             }
2396              
2397             /* record_set: BYTES (records data) */
2398 0 0         if (end - p < 4) goto done;
2399 0           int32_t records_size = kf_read_i32(p); p += 4;
2400              
2401 0           AV *records_av = newAV();
2402 0 0         if (records_size > 0 && end - p >= records_size) {
    0          
2403 0           const char *rp = p;
2404 0           const char *rend = p + records_size;
2405              
2406             /* May contain multiple RecordBatches */
2407 0 0         while (rp < rend && rend - rp >= 12) {
    0          
2408             int64_t bo;
2409 0           int32_t bl = kf_read_i32(rp + 8);
2410 0 0         if (bl < 0 || rend - rp < 12 + bl) break;
    0          
2411 0           kf_decode_record_batch(aTHX_ rp, 12 + (size_t)bl, records_av, &bo);
2412 0           rp += 12 + bl;
2413             }
2414              
2415 0           p += records_size;
2416 0 0         } else if (records_size > 0) {
2417 0           p = end; /* truncated */
2418             }
2419              
2420 0           hv_store(ph, "records", 7, newRV_noinc((SV*)records_av), 0);
2421 0           av_push(parts_av, newRV_noinc((SV*)ph));
2422             }
2423              
2424 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2425 0           av_push(topics_av, newRV_noinc((SV*)th));
2426             }
2427              
2428 0           done:
2429 0           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2430 0           return sv_2mortal(newRV_noinc((SV*)result));
2431             }
2432              
2433             /* ListOffsets response parser (API 2, v1+) */
2434 0           static SV* conn_parse_list_offsets_response(pTHX_ ev_kafka_conn_t *self,
2435             int16_t version, const char *data, size_t len)
2436             {
2437 0           const char *p = data;
2438 0           const char *end = data + len;
2439 0           HV *result = newHV();
2440 0           AV *topics_av = newAV();
2441             int n;
2442              
2443             (void)self;
2444              
2445             /* throttle_time_ms (v2+) */
2446 0 0         if (version >= 2) {
2447 0 0         if (end - p < 4) goto done;
2448 0           p += 4;
2449             }
2450              
2451             /* topics: ARRAY */
2452 0 0         if (end - p < 4) goto done;
2453 0           int32_t topic_count = kf_read_i32(p); p += 4;
2454             int32_t i;
2455              
2456 0 0         for (i = 0; i < topic_count; i++) {
2457 0           HV *th = newHV();
2458             const char *tname; int16_t tnlen;
2459 0           n = kf_read_string(p, end, &tname, &tnlen);
2460 0 0         if (n < 0) goto done;
2461 0           p += n;
2462 0 0         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
2463              
2464 0 0         if (end - p < 4) goto done;
2465 0           int32_t part_count = kf_read_i32(p); p += 4;
2466 0           AV *parts_av = newAV();
2467             int32_t j;
2468              
2469 0 0         for (j = 0; j < part_count; j++) {
2470 0           HV *ph = newHV();
2471 0 0         if (end - p < 4) goto done;
2472 0           int32_t pid = kf_read_i32(p); p += 4;
2473 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
2474              
2475 0 0         if (end - p < 2) goto done;
2476 0           int16_t err = kf_read_i16(p); p += 2;
2477 0           hv_store(ph, "error_code", 10, newSViv(err), 0);
2478              
2479 0 0         if (version >= 1) {
2480 0 0         if (end - p < 8) goto done;
2481 0           int64_t ts = kf_read_i64(p); p += 8;
2482 0           hv_store(ph, "timestamp", 9, newSViv(ts), 0);
2483             }
2484              
2485 0 0         if (end - p < 8) goto done;
2486 0           int64_t offset = kf_read_i64(p); p += 8;
2487 0           hv_store(ph, "offset", 6, newSViv(offset), 0);
2488              
2489             /* leader_epoch (v4+) */
2490 0 0         if (version >= 4) {
2491 0 0         if (end - p < 4) goto done;
2492 0           p += 4;
2493             }
2494              
2495 0           av_push(parts_av, newRV_noinc((SV*)ph));
2496             }
2497 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2498 0           av_push(topics_av, newRV_noinc((SV*)th));
2499             }
2500              
2501 0           done:
2502 0           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2503 0           return sv_2mortal(newRV_noinc((SV*)result));
2504             }
2505              
2506             /* FindCoordinator response parser (API 10, v0-v3) */
2507 0           static SV* conn_parse_find_coordinator_response(pTHX_ ev_kafka_conn_t *self,
2508             int16_t version, const char *data, size_t len)
2509             {
2510 0           const char *p = data;
2511 0           const char *end = data + len;
2512 0           HV *result = newHV();
2513             int n;
2514             (void)self;
2515              
2516             /* throttle_time_ms (v1+) */
2517 0 0         if (version >= 1) {
2518 0 0         if (end - p < 4) goto done;
2519 0           p += 4;
2520             }
2521              
2522 0 0         if (end - p < 2) goto done;
2523 0           int16_t err = kf_read_i16(p); p += 2;
2524 0           hv_store(result, "error_code", 10, newSViv(err), 0);
2525              
2526             /* error_message (v1+) */
2527 0 0         if (version >= 1) {
2528             const char *emsg; int16_t elen;
2529 0           n = kf_read_string(p, end, &emsg, &elen);
2530 0 0         if (n < 0) goto done;
2531 0           p += n;
2532 0 0         if (emsg && elen > 0)
    0          
2533 0           hv_store(result, "error_message", 13, newSVpvn(emsg, elen), 0);
2534             }
2535              
2536 0 0         if (end - p < 4) goto done;
2537 0           int32_t nid = kf_read_i32(p); p += 4;
2538 0           hv_store(result, "node_id", 7, newSViv(nid), 0);
2539              
2540             const char *host; int16_t hlen;
2541 0           n = kf_read_string(p, end, &host, &hlen);
2542 0 0         if (n < 0) goto done;
2543 0           p += n;
2544 0 0         hv_store(result, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
    0          
2545              
2546 0 0         if (end - p < 4) goto done;
2547 0           int32_t port = kf_read_i32(p); p += 4;
2548 0           hv_store(result, "port", 4, newSViv(port), 0);
2549              
2550 0           done:
2551 0           return sv_2mortal(newRV_noinc((SV*)result));
2552             }
2553              
2554             /* JoinGroup response parser (API 11, v0-v5) */
2555 0           static SV* conn_parse_join_group_response(pTHX_ ev_kafka_conn_t *self,
2556             int16_t version, const char *data, size_t len)
2557             {
2558 0           const char *p = data;
2559 0           const char *end = data + len;
2560 0           HV *result = newHV();
2561             int n;
2562             (void)self;
2563              
2564             /* throttle_time_ms (v2+) */
2565 0 0         if (version >= 2) {
2566 0 0         if (end - p < 4) goto done;
2567 0           p += 4;
2568             }
2569              
2570 0 0         if (end - p < 2) goto done;
2571 0           int16_t err = kf_read_i16(p); p += 2;
2572 0           hv_store(result, "error_code", 10, newSViv(err), 0);
2573              
2574 0 0         if (end - p < 4) goto done;
2575 0           int32_t gen = kf_read_i32(p); p += 4;
2576 0           hv_store(result, "generation_id", 13, newSViv(gen), 0);
2577              
2578             /* protocol_type (v7+) — skip for now, we use v5 max */
2579              
2580             const char *proto; int16_t plen;
2581 0           n = kf_read_string(p, end, &proto, &plen);
2582 0 0         if (n < 0) goto done;
2583 0           p += n;
2584 0 0         if (proto)
2585 0           hv_store(result, "protocol_name", 13, newSVpvn(proto, plen), 0);
2586              
2587             const char *leader; int16_t llen;
2588 0           n = kf_read_string(p, end, &leader, &llen);
2589 0 0         if (n < 0) goto done;
2590 0           p += n;
2591 0 0         if (leader)
2592 0           hv_store(result, "leader", 6, newSVpvn(leader, llen), 0);
2593              
2594             /* skip_assignment (v9+) — not applicable */
2595              
2596             const char *member_id; int16_t mlen;
2597 0           n = kf_read_string(p, end, &member_id, &mlen);
2598 0 0         if (n < 0) goto done;
2599 0           p += n;
2600 0 0         if (member_id)
2601 0           hv_store(result, "member_id", 9, newSVpvn(member_id, mlen), 0);
2602              
2603             /* members array */
2604 0 0         if (end - p < 4) goto done;
2605 0           int32_t mcount = kf_read_i32(p); p += 4;
2606 0           AV *members_av = newAV();
2607             int32_t i;
2608 0 0         for (i = 0; i < mcount; i++) {
2609 0           HV *mh = newHV();
2610              
2611             const char *mid; int16_t midlen;
2612 0           n = kf_read_string(p, end, &mid, &midlen);
2613 0 0         if (n < 0) goto done;
2614 0           p += n;
2615 0 0         if (mid)
2616 0           hv_store(mh, "member_id", 9, newSVpvn(mid, midlen), 0);
2617              
2618             /* group_instance_id (v5+) */
2619 0 0         if (version >= 5) {
2620             const char *gi; int16_t gilen;
2621 0           n = kf_read_string(p, end, &gi, &gilen);
2622 0 0         if (n < 0) goto done;
2623 0           p += n;
2624             }
2625              
2626             /* metadata: BYTES */
2627 0 0         if (end - p < 4) goto done;
2628 0           int32_t mdlen = kf_read_i32(p); p += 4;
2629 0 0         if (mdlen > 0) {
2630 0 0         if (end - p < mdlen) goto done;
2631 0           hv_store(mh, "metadata", 8, newSVpvn(p, mdlen), 0);
2632 0           p += mdlen;
2633             }
2634              
2635 0           av_push(members_av, newRV_noinc((SV*)mh));
2636             }
2637 0           hv_store(result, "members", 7, newRV_noinc((SV*)members_av), 0);
2638              
2639 0           done:
2640 0           return sv_2mortal(newRV_noinc((SV*)result));
2641             }
2642              
2643             /* SyncGroup response parser (API 14, v0-v3) */
2644 0           static SV* conn_parse_sync_group_response(pTHX_ ev_kafka_conn_t *self,
2645             int16_t version, const char *data, size_t len)
2646             {
2647 0           const char *p = data;
2648 0           const char *end = data + len;
2649 0           HV *result = newHV();
2650             (void)self;
2651              
2652             /* throttle_time_ms (v1+) */
2653 0 0         if (version >= 1) {
2654 0 0         if (end - p < 4) goto done;
2655 0           p += 4;
2656             }
2657              
2658 0 0         if (end - p < 2) goto done;
2659 0           int16_t err = kf_read_i16(p); p += 2;
2660 0           hv_store(result, "error_code", 10, newSViv(err), 0);
2661              
2662             /* assignment: BYTES */
2663 0 0         if (end - p < 4) goto done;
2664 0           int32_t alen = kf_read_i32(p); p += 4;
2665 0 0         if (alen > 0 && end - p >= alen) {
    0          
2666 0           hv_store(result, "assignment", 10, newSVpvn(p, alen), 0);
2667 0           p += alen;
2668             }
2669              
2670 0           done:
2671 0           return sv_2mortal(newRV_noinc((SV*)result));
2672             }
2673              
2674             /* Heartbeat response parser (API 12) */
2675 0           static SV* conn_parse_heartbeat_response(pTHX_ ev_kafka_conn_t *self,
2676             int16_t version, const char *data, size_t len)
2677             {
2678 0           const char *p = data;
2679 0           const char *end = data + len;
2680 0           HV *result = newHV();
2681             (void)self;
2682              
2683 0 0         if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2684 0 0         if (end - p >= 2) {
2685 0           int16_t err = kf_read_i16(p); p += 2;
2686 0           hv_store(result, "error_code", 10, newSViv(err), 0);
2687             }
2688              
2689 0           return sv_2mortal(newRV_noinc((SV*)result));
2690             }
2691              
2692             /* OffsetCommit response parser (API 8, v0-v7) */
2693 0           static SV* conn_parse_offset_commit_response(pTHX_ ev_kafka_conn_t *self,
2694             int16_t version, const char *data, size_t len)
2695             {
2696 0           const char *p = data;
2697 0           const char *end = data + len;
2698 0           HV *result = newHV();
2699 0           AV *topics_av = newAV();
2700             int n;
2701             (void)self;
2702              
2703 0 0         if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2704              
2705 0 0         if (end - p < 4) goto done;
2706 0           int32_t tc = kf_read_i32(p); p += 4;
2707             int32_t i;
2708 0 0         for (i = 0; i < tc; i++) {
2709 0           HV *th = newHV();
2710             const char *tname; int16_t tnlen;
2711 0           n = kf_read_string(p, end, &tname, &tnlen);
2712 0 0         if (n < 0) goto done;
2713 0           p += n;
2714 0 0         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
2715              
2716 0 0         if (end - p < 4) goto done;
2717 0           int32_t pc = kf_read_i32(p); p += 4;
2718 0           AV *parts_av = newAV();
2719             int32_t j;
2720 0 0         for (j = 0; j < pc; j++) {
2721 0           HV *ph = newHV();
2722 0 0         if (end - p < 6) goto done;
2723 0           int32_t pid = kf_read_i32(p); p += 4;
2724 0           int16_t err = kf_read_i16(p); p += 2;
2725 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
2726 0           hv_store(ph, "error_code", 10, newSViv(err), 0);
2727 0           av_push(parts_av, newRV_noinc((SV*)ph));
2728             }
2729 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2730 0           av_push(topics_av, newRV_noinc((SV*)th));
2731             }
2732              
2733 0           done:
2734 0           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2735 0           return sv_2mortal(newRV_noinc((SV*)result));
2736             }
2737              
2738             /* OffsetFetch response parser (API 9, v0-v5) */
2739 0           static SV* conn_parse_offset_fetch_response(pTHX_ ev_kafka_conn_t *self,
2740             int16_t version, const char *data, size_t len)
2741             {
2742 0           const char *p = data;
2743 0           const char *end = data + len;
2744 0           HV *result = newHV();
2745 0           AV *topics_av = newAV();
2746             int n;
2747             (void)self;
2748              
2749 0 0         if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2750              
2751 0 0         if (end - p < 4) goto done;
2752 0           int32_t tc = kf_read_i32(p); p += 4;
2753             int32_t i;
2754 0 0         for (i = 0; i < tc; i++) {
2755 0           HV *th = newHV();
2756             const char *tname; int16_t tnlen;
2757 0           n = kf_read_string(p, end, &tname, &tnlen);
2758 0 0         if (n < 0) goto done;
2759 0           p += n;
2760 0 0         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
2761              
2762 0 0         if (end - p < 4) goto done;
2763 0           int32_t pc = kf_read_i32(p); p += 4;
2764 0           AV *parts_av = newAV();
2765             int32_t j;
2766 0 0         for (j = 0; j < pc; j++) {
2767 0           HV *ph = newHV();
2768 0 0         if (end - p < 4) goto done;
2769 0           int32_t pid = kf_read_i32(p); p += 4;
2770 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
2771              
2772 0 0         if (end - p < 8) goto done;
2773 0           int64_t offset = kf_read_i64(p); p += 8;
2774 0           hv_store(ph, "offset", 6, newSViv(offset), 0);
2775              
2776             /* leader_epoch (v5+) */
2777 0 0         if (version >= 5 && end - p >= 4) p += 4;
    0          
2778              
2779             const char *meta_str; int16_t meta_len;
2780 0           n = kf_read_string(p, end, &meta_str, &meta_len);
2781 0 0         if (n < 0) goto done;
2782 0           p += n;
2783              
2784 0 0         if (end - p < 2) goto done;
2785 0           int16_t err = kf_read_i16(p); p += 2;
2786 0           hv_store(ph, "error_code", 10, newSViv(err), 0);
2787              
2788 0           av_push(parts_av, newRV_noinc((SV*)ph));
2789             }
2790 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2791 0           av_push(topics_av, newRV_noinc((SV*)th));
2792             }
2793              
2794             /* error_code (v2+) */
2795 0 0         if (version >= 2 && end - p >= 2) {
    0          
2796 0           int16_t err = kf_read_i16(p); p += 2;
2797 0           hv_store(result, "error_code", 10, newSViv(err), 0);
2798             }
2799              
2800 0           done:
2801 0           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2802 0           return sv_2mortal(newRV_noinc((SV*)result));
2803             }
2804              
2805             /* LeaveGroup response parser (API 13, v0-v3) */
2806 0           static SV* conn_parse_leave_group_response(pTHX_ ev_kafka_conn_t *self,
2807             int16_t version, const char *data, size_t len)
2808             {
2809 0           const char *p = data;
2810 0           const char *end = data + len;
2811 0           HV *result = newHV();
2812             (void)self;
2813              
2814 0 0         if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2815 0 0         if (end - p >= 2) {
2816 0           int16_t err = kf_read_i16(p); p += 2;
2817 0           hv_store(result, "error_code", 10, newSViv(err), 0);
2818             }
2819              
2820 0           return sv_2mortal(newRV_noinc((SV*)result));
2821             }
2822              
2823             /* CreateTopics response parser (API 19, v0-v4) */
2824 0           static SV* conn_parse_create_topics_response(pTHX_ ev_kafka_conn_t *self,
2825             int16_t version, const char *data, size_t len)
2826             {
2827 0           const char *p = data;
2828 0           const char *end = data + len;
2829 0           HV *result = newHV();
2830 0           AV *topics_av = newAV();
2831             int n;
2832             (void)self;
2833              
2834 0 0         if (version >= 2 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2835              
2836 0 0         if (end - p < 4) goto done;
2837 0           int32_t tc = kf_read_i32(p); p += 4;
2838             int32_t i;
2839 0 0         for (i = 0; i < tc; i++) {
2840 0           HV *th = newHV();
2841             const char *tname; int16_t tnlen;
2842 0           n = kf_read_string(p, end, &tname, &tnlen);
2843 0 0         if (n < 0) goto done;
2844 0           p += n;
2845 0 0         hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
2846              
2847 0 0         if (end - p < 2) goto done;
2848 0           int16_t err = kf_read_i16(p); p += 2;
2849 0           hv_store(th, "error_code", 10, newSViv(err), 0);
2850              
2851             /* error_message (v1+) */
2852 0 0         if (version >= 1) {
2853             const char *emsg; int16_t elen;
2854 0           n = kf_read_string(p, end, &emsg, &elen);
2855 0 0         if (n < 0) goto done;
2856 0           p += n;
2857 0 0         if (emsg && elen > 0)
    0          
2858 0           hv_store(th, "error_message", 13, newSVpvn(emsg, elen), 0);
2859             }
2860              
2861 0           av_push(topics_av, newRV_noinc((SV*)th));
2862             }
2863              
2864 0           done:
2865 0           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2866 0           return sv_2mortal(newRV_noinc((SV*)result));
2867             }
2868              
2869             /* DeleteTopics response parser (API 20, v0-v3) */
2870 0           static SV* conn_parse_delete_topics_response(pTHX_ ev_kafka_conn_t *self,
2871             int16_t version, const char *data, size_t len)
2872             {
2873 0           const char *p = data;
2874 0           const char *end = data + len;
2875 0           HV *result = newHV();
2876 0           AV *topics_av = newAV();
2877             int n;
2878             (void)self;
2879              
2880 0 0         if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2881              
2882 0 0         if (end - p < 4) goto done;
2883 0           int32_t tc = kf_read_i32(p); p += 4;
2884             int32_t i;
2885 0 0         for (i = 0; i < tc; i++) {
2886 0           HV *th = newHV();
2887             const char *tname; int16_t tnlen;
2888 0           n = kf_read_string(p, end, &tname, &tnlen);
2889 0 0         if (n < 0) goto done;
2890 0           p += n;
2891 0 0         hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
2892              
2893 0 0         if (end - p < 2) goto done;
2894 0           int16_t err = kf_read_i16(p); p += 2;
2895 0           hv_store(th, "error_code", 10, newSViv(err), 0);
2896              
2897 0           av_push(topics_av, newRV_noinc((SV*)th));
2898             }
2899              
2900 0           done:
2901 0           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2902 0           return sv_2mortal(newRV_noinc((SV*)result));
2903             }
2904              
2905             /* InitProducerId response parser (API 22, v0-v2) */
2906 0           static SV* conn_parse_init_producer_id_response(pTHX_ ev_kafka_conn_t *self,
2907             int16_t version, const char *data, size_t len)
2908             {
2909 0           const char *p = data;
2910 0           const char *end = data + len;
2911 0           HV *result = newHV();
2912             (void)self;
2913              
2914 0 0         if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2915              
2916 0 0         if (end - p < 2) goto done;
2917 0           int16_t err = kf_read_i16(p); p += 2;
2918 0           hv_store(result, "error_code", 10, newSViv(err), 0);
2919              
2920 0 0         if (end - p < 8) goto done;
2921 0           int64_t producer_id = kf_read_i64(p); p += 8;
2922 0           hv_store(result, "producer_id", 11, newSViv(producer_id), 0);
2923              
2924 0 0         if (end - p < 2) goto done;
2925 0           int16_t producer_epoch = kf_read_i16(p); p += 2;
2926 0           hv_store(result, "producer_epoch", 14, newSViv(producer_epoch), 0);
2927              
2928 0           done:
2929 0           return sv_2mortal(newRV_noinc((SV*)result));
2930             }
2931              
2932             /* AddPartitionsToTxn response parser (API 24, v0-v1) */
2933 0           static SV* conn_parse_add_partitions_to_txn_response(pTHX_ ev_kafka_conn_t *self,
2934             int16_t version, const char *data, size_t len)
2935             {
2936 0           const char *p = data;
2937 0           const char *end = data + len;
2938 0           HV *result = newHV();
2939 0           AV *topics_av = newAV();
2940             int n;
2941             (void)self; (void)version;
2942              
2943 0 0         if (end - p < 4) goto done;
2944 0           p += 4; /* throttle_time_ms */
2945              
2946 0 0         if (end - p < 4) goto done;
2947 0           int32_t tc = kf_read_i32(p); p += 4;
2948             int32_t i;
2949 0 0         for (i = 0; i < tc; i++) {
2950 0           HV *th = newHV();
2951             const char *tname; int16_t tnlen;
2952 0           n = kf_read_string(p, end, &tname, &tnlen);
2953 0 0         if (n < 0) goto done;
2954 0           p += n;
2955 0 0         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
2956              
2957 0 0         if (end - p < 4) goto done;
2958 0           int32_t pc = kf_read_i32(p); p += 4;
2959 0           AV *parts_av = newAV();
2960             int32_t j;
2961 0 0         for (j = 0; j < pc; j++) {
2962 0           HV *ph = newHV();
2963 0 0         if (end - p < 6) goto done;
2964 0           int32_t pid = kf_read_i32(p); p += 4;
2965 0           int16_t err = kf_read_i16(p); p += 2;
2966 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
2967 0           hv_store(ph, "error_code", 10, newSViv(err), 0);
2968 0           av_push(parts_av, newRV_noinc((SV*)ph));
2969             }
2970 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2971 0           av_push(topics_av, newRV_noinc((SV*)th));
2972             }
2973              
2974 0           done:
2975 0           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2976 0           return sv_2mortal(newRV_noinc((SV*)result));
2977             }
2978              
2979             /* EndTxn response parser (API 26, v0-v1) */
2980 0           static SV* conn_parse_end_txn_response(pTHX_ ev_kafka_conn_t *self,
2981             int16_t version, const char *data, size_t len)
2982             {
2983 0           const char *p = data;
2984 0           const char *end = data + len;
2985 0           HV *result = newHV();
2986             (void)self; (void)version;
2987              
2988 0 0         if (end - p >= 4) p += 4; /* throttle_time_ms */
2989 0 0         if (end - p >= 2) {
2990 0           int16_t err = kf_read_i16(p); p += 2;
2991 0           hv_store(result, "error_code", 10, newSViv(err), 0);
2992             }
2993              
2994 0           return sv_2mortal(newRV_noinc((SV*)result));
2995             }
2996              
2997             /* TxnOffsetCommit response parser (API 28, v0-v2) */
2998 0           static SV* conn_parse_txn_offset_commit_response(pTHX_ ev_kafka_conn_t *self,
2999             int16_t version, const char *data, size_t len)
3000             {
3001 0           const char *p = data;
3002 0           const char *end = data + len;
3003 0           HV *result = newHV();
3004 0           AV *topics_av = newAV();
3005             int n;
3006             (void)self;
3007              
3008 0 0         if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
3009              
3010 0 0         if (end - p < 4) goto done;
3011 0           int32_t tc = kf_read_i32(p); p += 4;
3012             int32_t i;
3013 0 0         for (i = 0; i < tc; i++) {
3014 0           HV *th = newHV();
3015             const char *tname; int16_t tnlen;
3016 0           n = kf_read_string(p, end, &tname, &tnlen);
3017 0 0         if (n < 0) goto done;
3018 0           p += n;
3019 0 0         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
3020              
3021 0 0         if (end - p < 4) goto done;
3022 0           int32_t pc = kf_read_i32(p); p += 4;
3023 0           AV *parts_av = newAV();
3024             int32_t j;
3025 0 0         for (j = 0; j < pc; j++) {
3026 0           HV *ph = newHV();
3027 0 0         if (end - p < 6) goto done;
3028 0           int32_t pid = kf_read_i32(p); p += 4;
3029 0           int16_t err = kf_read_i16(p); p += 2;
3030 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
3031 0           hv_store(ph, "error_code", 10, newSViv(err), 0);
3032 0           av_push(parts_av, newRV_noinc((SV*)ph));
3033             }
3034 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
3035 0           av_push(topics_av, newRV_noinc((SV*)th));
3036             }
3037              
3038 0           done:
3039 0           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
3040 0           return sv_2mortal(newRV_noinc((SV*)result));
3041             }
3042              
3043             /* ================================================================
3044             * Response processing loop
3045             * ================================================================ */
3046              
3047 0           static void conn_process_responses(pTHX_ ev_kafka_conn_t *self) {
3048 0 0         while (self->rbuf_len >= 4) {
3049 0           int32_t msg_size = kf_read_i32(self->rbuf);
3050 0 0         if (msg_size < 0 || msg_size > 256 * 1024 * 1024) {
    0          
3051 0           conn_emit_error(aTHX_ self, "invalid response size");
3052 0 0         if (conn_check_destroyed(self)) return;
3053 0           conn_handle_disconnect(aTHX_ self, "invalid response size");
3054 0           return;
3055             }
3056              
3057 0 0         if (self->rbuf_len < (size_t)(4 + msg_size))
3058 0           break; /* incomplete response */
3059              
3060 0           const char *msg = self->rbuf + 4;
3061              
3062             /* correlation_id is first 4 bytes of message */
3063 0 0         if (msg_size < 4) {
3064 0           conn_emit_error(aTHX_ self, "response too short");
3065 0 0         if (conn_check_destroyed(self)) return;
3066 0           conn_handle_disconnect(aTHX_ self, "response too short");
3067 0           return;
3068             }
3069              
3070 0           int32_t corr_id = kf_read_i32(msg);
3071 0           const char *payload = msg + 4;
3072 0           size_t payload_len = (size_t)msg_size - 4;
3073              
3074             /* Find matching callback (should be head of queue — Kafka guarantees ordering) */
3075 0           ev_kafka_conn_cb_t *cbt = NULL;
3076 0 0         if (!ngx_queue_empty(&self->cb_queue)) {
3077 0           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
3078 0           cbt = ngx_queue_data(q, ev_kafka_conn_cb_t, queue);
3079 0 0         if (cbt->correlation_id != corr_id) {
3080             /* Out of order — shouldn't happen with Kafka, but handle gracefully */
3081             char errbuf[128];
3082 0           snprintf(errbuf, sizeof(errbuf),
3083             "correlation ID mismatch: expected %d, got %d",
3084             cbt->correlation_id, corr_id);
3085 0           conn_emit_error(aTHX_ self, errbuf);
3086 0 0         if (conn_check_destroyed(self)) return;
3087 0           conn_handle_disconnect(aTHX_ self, "correlation ID mismatch");
3088 0           return;
3089             }
3090 0           ngx_queue_remove(q);
3091 0           self->pending_count--;
3092             }
3093              
3094             /* Dispatch BEFORE compacting — payload points into rbuf */
3095 0           size_t consumed = 4 + (size_t)msg_size;
3096 0 0         if (cbt) {
3097 0           conn_dispatch_response(aTHX_ self, cbt, payload, payload_len);
3098 0 0         if (cbt->cb) SvREFCNT_dec(cbt->cb);
3099 0           Safefree(cbt);
3100 0 0         if (conn_check_destroyed(self)) return;
3101             }
3102              
3103             /* Compact rbuf after dispatch */
3104 0           self->rbuf_len -= consumed;
3105 0 0         if (self->rbuf_len > 0)
3106 0           memmove(self->rbuf, self->rbuf + consumed, self->rbuf_len);
3107             }
3108             }
3109              
3110             /* ================================================================
3111             * I/O callback
3112             * ================================================================ */
3113              
3114 1           static void conn_io_cb(EV_P_ ev_io *w, int revents) {
3115 1           ev_kafka_conn_t *self = (ev_kafka_conn_t *)w->data;
3116             dTHX;
3117             (void)loop;
3118              
3119 1 50         if (!self || self->magic != KF_MAGIC_ALIVE) return;
    50          
3120              
3121             /* TCP connect in progress */
3122 1 50         if (self->state == CONN_CONNECTING) {
3123 1           int err = 0;
3124 1           socklen_t errlen = sizeof(err);
3125              
3126 1 50         if (self->timing) {
3127 1           ev_timer_stop(self->loop, &self->timer);
3128 1           self->timing = 0;
3129             }
3130 1           conn_stop_writing(self);
3131              
3132 1 50         if (getsockopt(self->fd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0)
3133 0           err = errno;
3134 1 50         if (err != 0) {
3135             char errbuf[256];
3136 1           snprintf(errbuf, sizeof(errbuf), "connect: %s", strerror(err));
3137 1           conn_emit_error(aTHX_ self, errbuf);
3138 1 50         if (conn_check_destroyed(self)) return;
3139 1           conn_handle_disconnect(aTHX_ self, errbuf);
3140 1           return;
3141             }
3142              
3143 0           conn_on_connect_done(aTHX_ self);
3144 0           return;
3145             }
3146              
3147             #ifdef HAVE_OPENSSL
3148             /* TLS handshake in progress */
3149 0 0         if (self->state == CONN_TLS_HANDSHAKE) {
3150 0           int ret = SSL_connect(self->ssl);
3151 0 0         if (ret == 1) {
3152 0           conn_stop_reading(self);
3153 0           conn_stop_writing(self);
3154              
3155             /* TLS done — proceed to ApiVersions (or SASL if needed) */
3156 0           conn_send_api_versions(aTHX_ self);
3157 0           conn_start_reading(self);
3158 0           return;
3159             }
3160 0           int err = SSL_get_error(self->ssl, ret);
3161 0 0         if (err == SSL_ERROR_WANT_READ) {
3162 0           conn_stop_writing(self);
3163 0           conn_start_reading(self);
3164 0 0         } else if (err == SSL_ERROR_WANT_WRITE) {
3165 0           conn_stop_reading(self);
3166 0           conn_start_writing(self);
3167             } else {
3168 0           conn_emit_error(aTHX_ self, "SSL_connect failed");
3169 0 0         if (conn_check_destroyed(self)) return;
3170 0           conn_handle_disconnect(aTHX_ self, "SSL_connect failed");
3171             }
3172 0           return;
3173             }
3174             #endif
3175              
3176             /* Write */
3177 0 0         if (revents & EV_WRITE) {
3178 0 0         while (self->wbuf_off < self->wbuf_len) {
3179 0           ssize_t n = kf_io_write(self, self->wbuf + self->wbuf_off,
3180 0           self->wbuf_len - self->wbuf_off);
3181 0 0         if (n < 0) {
3182 0 0         if (errno == EAGAIN || errno == EWOULDBLOCK) break;
    0          
3183 0           conn_emit_error(aTHX_ self, strerror(errno));
3184 0 0         if (conn_check_destroyed(self)) return;
3185 0           conn_handle_disconnect(aTHX_ self, "write error");
3186 0           return;
3187             }
3188 0 0         if (n == 0) {
3189 0           conn_handle_disconnect(aTHX_ self, "connection closed");
3190 0           return;
3191             }
3192 0           self->wbuf_off += n;
3193             }
3194              
3195 0 0         if (self->wbuf_off >= self->wbuf_len) {
3196 0           self->wbuf_off = 0;
3197 0           self->wbuf_len = 0;
3198 0           conn_stop_writing(self);
3199             }
3200             }
3201              
3202             /* Read */
3203 0 0         if (revents & EV_READ) {
3204 0           conn_ensure_rbuf(self, self->rbuf_len + 8192);
3205 0           ssize_t n = kf_io_read(self, self->rbuf + self->rbuf_len,
3206 0           self->rbuf_cap - self->rbuf_len);
3207 0 0         if (n < 0) {
3208 0 0         if (errno == EAGAIN || errno == EWOULDBLOCK) return;
    0          
3209 0           conn_emit_error(aTHX_ self, strerror(errno));
3210 0 0         if (conn_check_destroyed(self)) return;
3211 0           conn_handle_disconnect(aTHX_ self, "read error");
3212 0           return;
3213             }
3214 0 0         if (n == 0) {
3215 0           conn_handle_disconnect(aTHX_ self, "connection closed by broker");
3216 0           return;
3217             }
3218 0           self->rbuf_len += n;
3219              
3220 0           conn_process_responses(aTHX_ self);
3221             }
3222             }
3223              
3224             /* ================================================================
3225             * Connect timer (timeout)
3226             * ================================================================ */
3227              
3228 0           static void conn_timer_cb(EV_P_ ev_timer *w, int revents) {
3229 0           ev_kafka_conn_t *self = (ev_kafka_conn_t *)w->data;
3230             dTHX;
3231             (void)loop;
3232             (void)revents;
3233              
3234 0           self->timing = 0;
3235 0 0         if (self->magic != KF_MAGIC_ALIVE) return;
3236              
3237 0           conn_emit_error(aTHX_ self, "connect timeout");
3238 0 0         if (conn_check_destroyed(self)) return;
3239 0           conn_handle_disconnect(aTHX_ self, "connect timeout");
3240             }
3241              
3242             /* ================================================================
3243             * TCP connect + handshake initiation
3244             * ================================================================ */
3245              
3246             #ifdef HAVE_OPENSSL
3247 0           static int is_ip_literal(const char *host) {
3248             struct in_addr addr4;
3249             struct in6_addr addr6;
3250 0           return (inet_pton(AF_INET, host, &addr4) == 1 ||
3251 0           inet_pton(AF_INET6, host, &addr6) == 1);
3252             }
3253             #endif
3254              
3255 0           static void conn_on_connect_done(pTHX_ ev_kafka_conn_t *self) {
3256             /* TCP connected — set up I/O watchers if not already */
3257              
3258             #ifdef HAVE_OPENSSL
3259 0 0         if (self->tls_enabled) {
3260 0           self->ssl_ctx = SSL_CTX_new(TLS_client_method());
3261 0 0         if (!self->ssl_ctx) {
3262 0           conn_emit_error(aTHX_ self, "SSL_CTX_new failed");
3263 0 0         if (conn_check_destroyed(self)) return;
3264 0           conn_handle_disconnect(aTHX_ self, "SSL_CTX_new failed");
3265 0           return;
3266             }
3267 0           SSL_CTX_set_default_verify_paths(self->ssl_ctx);
3268 0 0         if (self->tls_skip_verify)
3269 0           SSL_CTX_set_verify(self->ssl_ctx, SSL_VERIFY_NONE, NULL);
3270             else
3271 0           SSL_CTX_set_verify(self->ssl_ctx, SSL_VERIFY_PEER, NULL);
3272              
3273 0 0         if (self->tls_ca_file) {
3274 0 0         if (SSL_CTX_load_verify_locations(self->ssl_ctx, self->tls_ca_file, NULL) != 1) {
3275 0           conn_emit_error(aTHX_ self, "SSL_CTX_load_verify_locations failed");
3276 0 0         if (conn_check_destroyed(self)) return;
3277 0           conn_handle_disconnect(aTHX_ self, "SSL_CTX_load_verify_locations failed");
3278 0           return;
3279             }
3280             }
3281              
3282 0           self->ssl = SSL_new(self->ssl_ctx);
3283 0 0         if (!self->ssl) {
3284 0           conn_emit_error(aTHX_ self, "SSL_new failed");
3285 0 0         if (conn_check_destroyed(self)) return;
3286 0           conn_handle_disconnect(aTHX_ self, "SSL_new failed");
3287 0           return;
3288             }
3289 0           SSL_set_fd(self->ssl, self->fd);
3290              
3291 0 0         if (!is_ip_literal(self->host))
3292 0           SSL_set_tlsext_host_name(self->ssl, self->host);
3293              
3294 0 0         if (!self->tls_skip_verify) {
3295 0           X509_VERIFY_PARAM *param = SSL_get0_param(self->ssl);
3296 0           X509_VERIFY_PARAM_set_hostflags(param, X509_CHECK_FLAG_NO_PARTIAL_WILDCARDS);
3297 0 0         if (is_ip_literal(self->host))
3298 0           X509_VERIFY_PARAM_set1_ip_asc(param, self->host);
3299             else
3300 0           X509_VERIFY_PARAM_set1_host(param, self->host, 0);
3301             }
3302              
3303 0           self->state = CONN_TLS_HANDSHAKE;
3304 0           int ret = SSL_connect(self->ssl);
3305 0 0         if (ret == 1) {
3306             /* Immediate success */
3307 0           conn_send_api_versions(aTHX_ self);
3308 0           conn_start_reading(self);
3309 0           return;
3310             }
3311 0           int err = SSL_get_error(self->ssl, ret);
3312 0 0         if (err == SSL_ERROR_WANT_READ) {
3313 0           conn_start_reading(self);
3314 0 0         } else if (err == SSL_ERROR_WANT_WRITE) {
3315 0           conn_start_writing(self);
3316             } else {
3317 0           conn_emit_error(aTHX_ self, "SSL_connect failed");
3318 0 0         if (conn_check_destroyed(self)) return;
3319 0           conn_handle_disconnect(aTHX_ self, "SSL_connect failed");
3320             }
3321 0           return;
3322             }
3323             #endif
3324              
3325             /* No TLS — send ApiVersions directly */
3326 0           conn_send_api_versions(aTHX_ self);
3327 0           conn_start_reading(self);
3328             }
3329              
3330 1           static void conn_start_connect(pTHX_ ev_kafka_conn_t *self,
3331             const char *host, int port, double timeout)
3332             {
3333             struct addrinfo hints, *res, *rp;
3334             char port_str[16];
3335 1           int fd = -1;
3336              
3337 1 50         if (self->state != CONN_DISCONNECTED) {
3338 0           conn_cleanup(aTHX_ self);
3339             }
3340              
3341             /* Save host/port (skip if already pointing to same string, e.g. reconnect) */
3342 1 50         if (host != self->host) {
3343 1 50         if (self->host) Safefree(self->host);
3344 1           self->host = savepv(host);
3345             }
3346 1           self->port = port;
3347 1           self->intentional_disconnect = 0;
3348              
3349 1           snprintf(port_str, sizeof(port_str), "%d", port);
3350              
3351 1           memset(&hints, 0, sizeof(hints));
3352 1           hints.ai_family = AF_UNSPEC;
3353 1           hints.ai_socktype = SOCK_STREAM;
3354              
3355 1           int gai_err = getaddrinfo(host, port_str, &hints, &res);
3356 1 50         if (gai_err != 0) {
3357             char errbuf[256];
3358 0           snprintf(errbuf, sizeof(errbuf), "resolve: %s", gai_strerror(gai_err));
3359 0           conn_emit_error(aTHX_ self, errbuf);
3360 0           return;
3361             }
3362              
3363 1 50         for (rp = res; rp; rp = rp->ai_next) {
3364 1           fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
3365 1 50         if (fd < 0) continue;
3366              
3367             /* Non-blocking */
3368 1           fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
3369              
3370             /* TCP_NODELAY */
3371             {
3372 1           int one = 1;
3373 1           setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
3374             }
3375              
3376 1           int ret = connect(fd, rp->ai_addr, rp->ai_addrlen);
3377 1 50         if (ret == 0) {
3378             /* Immediate connect */
3379 0           self->fd = fd;
3380 0           self->state = CONN_CONNECTING; /* will be advanced in on_connect_done */
3381 0           freeaddrinfo(res);
3382              
3383 0           ev_io_init(&self->rio, conn_io_cb, fd, EV_READ);
3384 0           self->rio.data = (void *)self;
3385 0           ev_io_init(&self->wio, conn_io_cb, fd, EV_WRITE);
3386 0           self->wio.data = (void *)self;
3387              
3388 0           conn_on_connect_done(aTHX_ self);
3389 0           return;
3390             }
3391              
3392 1 50         if (errno == EINPROGRESS) {
3393 1           self->fd = fd;
3394 1           self->state = CONN_CONNECTING;
3395 1           freeaddrinfo(res);
3396              
3397 1           ev_io_init(&self->rio, conn_io_cb, fd, EV_READ);
3398 1           self->rio.data = (void *)self;
3399 1           ev_io_init(&self->wio, conn_io_cb, fd, EV_WRITE);
3400 1           self->wio.data = (void *)self;
3401              
3402 1           conn_start_writing(self); /* wait for connect to complete */
3403              
3404 1 50         if (timeout > 0) {
3405 1           ev_timer_init(&self->timer, conn_timer_cb, timeout, 0.0);
3406 1           self->timer.data = (void *)self;
3407 1           ev_timer_start(self->loop, &self->timer);
3408 1           self->timing = 1;
3409             }
3410 1           return;
3411             }
3412              
3413 0           close(fd);
3414             }
3415              
3416 0           freeaddrinfo(res);
3417 0           conn_emit_error(aTHX_ self, "connect: all addresses failed");
3418             }
3419              
3420             /* ================================================================
3421             * Cluster client struct (Layer 2) — skeleton
3422             * ================================================================ */
3423              
3424             struct kf_partition_meta_s {
3425             int32_t partition_id;
3426             int32_t leader_id;
3427             int16_t error_code;
3428             };
3429              
3430             struct kf_topic_meta_s {
3431             char *name;
3432             int name_len;
3433             int16_t error_code;
3434             int num_partitions;
3435             kf_partition_meta_t *partitions;
3436             ngx_queue_t queue;
3437             };
3438              
3439             struct kf_consumer_group_s {
3440             char *group_id;
3441             char *member_id;
3442             int32_t generation_id;
3443             int state;
3444             int32_t coordinator_node_id;
3445             ev_kafka_conn_t *coordinator;
3446             ev_timer heartbeat_timer;
3447             int heartbeat_timing;
3448             double heartbeat_interval;
3449             double session_timeout;
3450             double rebalance_timeout;
3451             SV *on_assign;
3452             SV *on_revoke;
3453             AV *subscriptions;
3454             };
3455              
3456             struct ev_kafka_s {
3457             unsigned int magic;
3458             struct ev_loop *loop;
3459              
3460             ngx_queue_t brokers;
3461             int broker_count;
3462              
3463             char **bootstrap_hosts;
3464             int *bootstrap_ports;
3465             int bootstrap_count;
3466              
3467             ngx_queue_t topics;
3468             ev_timer metadata_timer;
3469             int metadata_timing;
3470             double metadata_refresh_interval;
3471             int metadata_pending;
3472              
3473             char *client_id;
3474             int client_id_len;
3475              
3476             int tls_enabled;
3477             char *tls_ca_file;
3478             int tls_skip_verify;
3479             char *sasl_mechanism;
3480             char *sasl_username;
3481             char *sasl_password;
3482              
3483             /* Producer */
3484             ngx_queue_t produce_batches;
3485             ev_timer linger_timer;
3486             int linger_timing;
3487             double linger_ms;
3488             int batch_size;
3489             int16_t acks;
3490             int32_t max_request_size;
3491             SV *partitioner;
3492             int rr_counter;
3493              
3494             /* Consumer */
3495             ngx_queue_t consume_partitions;
3496             SV *on_message;
3497             int32_t fetch_max_bytes;
3498             int32_t fetch_min_bytes;
3499             int32_t fetch_max_wait_ms;
3500             ev_timer fetch_timer;
3501             int fetch_timing;
3502              
3503             /* Consumer group */
3504             kf_consumer_group_t *group;
3505              
3506             /* Event handlers */
3507             SV *on_error;
3508             SV *on_connect;
3509              
3510             int callback_depth;
3511             };
3512              
3513             /* ================================================================
3514             * XS INTERFACE
3515             * ================================================================ */
3516              
3517             MODULE = EV::Kafka PACKAGE = EV::Kafka::Conn
3518              
3519             PROTOTYPES: DISABLE
3520              
3521             EV::Kafka::Conn
3522             _new(char *cls, SV *loop_sv)
3523             CODE:
3524             {
3525             struct ev_loop *loop;
3526             ev_kafka_conn_t *self;
3527              
3528 6 50         if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop"))
    0          
3529 0           loop = (struct ev_loop *)SvIV(SvRV(loop_sv));
3530             else
3531 6           loop = EV_DEFAULT;
3532              
3533 6           Newxz(self, 1, ev_kafka_conn_t);
3534 6           self->magic = KF_MAGIC_ALIVE;
3535 6           self->loop = loop;
3536 6           self->fd = -1;
3537 6           self->state = CONN_DISCONNECTED;
3538 6           self->node_id = -1;
3539 6           self->next_correlation_id = 1;
3540              
3541 6           Newx(self->rbuf, KF_BUF_INIT, char);
3542 6           self->rbuf_cap = KF_BUF_INIT;
3543 6           self->rbuf_len = 0;
3544 6           Newx(self->wbuf, KF_BUF_INIT, char);
3545 6           self->wbuf_cap = KF_BUF_INIT;
3546 6           self->wbuf_len = 0;
3547 6           self->wbuf_off = 0;
3548              
3549 6           ngx_queue_init(&self->cb_queue);
3550              
3551             /* Default client_id */
3552 6           self->client_id = savepv("ev-kafka");
3553 6           self->client_id_len = 8;
3554              
3555             /* Default: no API versions known */
3556             {
3557             int i;
3558 390 100         for (i = 0; i < API_VERSIONS_MAX_KEY; i++)
3559 384           self->api_versions[i] = -1;
3560             }
3561              
3562 6           self->reconnect_delay_ms = 1000;
3563              
3564 6           RETVAL = self;
3565             }
3566             OUTPUT:
3567             RETVAL
3568              
3569             void
3570             DESTROY(EV::Kafka::Conn self)
3571             CODE:
3572             {
3573 6 50         if (self->magic != KF_MAGIC_ALIVE) return;
3574              
3575 6           self->intentional_disconnect = 1;
3576 6           conn_cleanup(aTHX_ self);
3577 6           conn_cancel_pending(aTHX_ self, "destroyed");
3578              
3579 6           self->magic = KF_MAGIC_FREED;
3580              
3581 6 50         if (self->reconnect_timing) {
3582 0           ev_timer_stop(self->loop, &self->reconnect_timer);
3583 0           self->reconnect_timing = 0;
3584             }
3585              
3586 6 100         CLEAR_HANDLER(self->on_error);
3587 6 50         CLEAR_HANDLER(self->on_connect);
3588 6 50         CLEAR_HANDLER(self->on_disconnect);
3589              
3590 6 100         if (self->host) Safefree(self->host);
3591 6 50         if (self->client_id) Safefree(self->client_id);
3592 6 50         if (self->sasl_mechanism) Safefree(self->sasl_mechanism);
3593 6 50         if (self->sasl_username) Safefree(self->sasl_username);
3594 6 50         if (self->sasl_password) Safefree(self->sasl_password);
3595 6 50         if (self->scram_nonce) Safefree(self->scram_nonce);
3596 6 50         if (self->scram_client_first) Safefree(self->scram_client_first);
3597 6 50         if (self->tls_ca_file) Safefree(self->tls_ca_file);
3598 6 50         if (self->rbuf) Safefree(self->rbuf);
3599 6 50         if (self->wbuf) Safefree(self->wbuf);
3600              
3601 6           Safefree(self);
3602             }
3603              
3604             void
3605             connect(EV::Kafka::Conn self, const char *host, int port, double timeout = 0)
3606             CODE:
3607             {
3608 1           conn_start_connect(aTHX_ self, host, port, timeout);
3609             }
3610              
3611             void
3612             disconnect(EV::Kafka::Conn self)
3613             CODE:
3614             {
3615 0           self->intentional_disconnect = 1;
3616 0 0         if (self->reconnect_timing) {
3617 0           ev_timer_stop(self->loop, &self->reconnect_timer);
3618 0           self->reconnect_timing = 0;
3619             }
3620 0           conn_handle_disconnect(aTHX_ self, "disconnected");
3621             }
3622              
3623             int
3624             connected(EV::Kafka::Conn self)
3625             CODE:
3626 2 50         RETVAL = (self->state == CONN_READY) ? 1 : 0;
3627             OUTPUT:
3628             RETVAL
3629              
3630             int
3631             state(EV::Kafka::Conn self)
3632             CODE:
3633 0 0         RETVAL = self->state;
3634             OUTPUT:
3635             RETVAL
3636              
3637             int
3638             pending(EV::Kafka::Conn self)
3639             CODE:
3640 0 0         RETVAL = self->pending_count;
3641             OUTPUT:
3642             RETVAL
3643              
3644             void
3645             on_error(EV::Kafka::Conn self, SV *cb = NULL)
3646             CODE:
3647             {
3648 1 50         CLEAR_HANDLER(self->on_error);
3649 1 50         if (cb && SvOK(cb)) {
    50          
3650 1           self->on_error = newSVsv(cb);
3651             }
3652             }
3653              
3654             void
3655             on_connect(EV::Kafka::Conn self, SV *cb = NULL)
3656             CODE:
3657             {
3658 0 0         CLEAR_HANDLER(self->on_connect);
3659 0 0         if (cb && SvOK(cb)) {
    0          
3660 0           self->on_connect = newSVsv(cb);
3661             }
3662             }
3663              
3664             void
3665             on_disconnect(EV::Kafka::Conn self, SV *cb = NULL)
3666             CODE:
3667             {
3668 0 0         CLEAR_HANDLER(self->on_disconnect);
3669 0 0         if (cb && SvOK(cb)) {
    0          
3670 0           self->on_disconnect = newSVsv(cb);
3671             }
3672             }
3673              
3674             void
3675             client_id(EV::Kafka::Conn self, const char *id = NULL)
3676             CODE:
3677             {
3678 0 0         if (id) {
3679 0 0         if (self->client_id) Safefree(self->client_id);
3680 0           self->client_id = savepv(id);
3681 0           self->client_id_len = strlen(id);
3682             }
3683             }
3684              
3685             void
3686             tls(EV::Kafka::Conn self, int enable, const char *ca_file = NULL, int skip_verify = 0)
3687             CODE:
3688             {
3689 0           self->tls_enabled = enable;
3690 0 0         if (self->tls_ca_file) { Safefree(self->tls_ca_file); self->tls_ca_file = NULL; }
3691 0 0         if (ca_file) self->tls_ca_file = savepv(ca_file);
3692 0           self->tls_skip_verify = skip_verify;
3693             }
3694              
3695             void
3696             sasl(EV::Kafka::Conn self, const char *mechanism, const char *username = NULL, const char *password = NULL)
3697             CODE:
3698             {
3699 0 0         if (self->sasl_mechanism) { Safefree(self->sasl_mechanism); self->sasl_mechanism = NULL; }
3700 0 0         if (self->sasl_username) { Safefree(self->sasl_username); self->sasl_username = NULL; }
3701 0 0         if (self->sasl_password) { Safefree(self->sasl_password); self->sasl_password = NULL; }
3702 0 0         if (SvOK(ST(1))) {
3703 0           self->sasl_mechanism = savepv(mechanism);
3704 0 0         if (username) self->sasl_username = savepv(username);
3705 0 0         if (password) self->sasl_password = savepv(password);
3706             }
3707             }
3708              
3709             void
3710             auto_reconnect(EV::Kafka::Conn self, int enable, int delay_ms = 1000)
3711             CODE:
3712             {
3713 0           self->auto_reconnect = enable;
3714 0           self->reconnect_delay_ms = delay_ms;
3715             }
3716              
3717             void
3718             metadata(EV::Kafka::Conn self, SV *topics_sv, SV *cb)
3719             CODE:
3720             {
3721 1 50         if (self->state != CONN_READY)
3722 1           croak("not connected");
3723              
3724             kf_buf_t body;
3725 0           kf_buf_init(&body);
3726              
3727             /* Metadata v1-v4 (non-flexible) */
3728 0           int16_t ver = self->api_versions[API_METADATA];
3729 0 0         if (ver < 0) ver = 1;
3730 0 0         if (ver > 4) ver = 4;
3731              
3732 0 0         if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) {
    0          
    0          
3733 0           AV *topics = (AV*)SvRV(topics_sv);
3734 0           SSize_t i, count = av_len(topics) + 1;
3735 0           kf_buf_append_i32(&body, (int32_t)count);
3736 0 0         for (i = 0; i < count; i++) {
3737 0           SV **elem = av_fetch(topics, i, 0);
3738             STRLEN tlen;
3739 0           const char *tname = SvPV(*elem, tlen);
3740 0           kf_buf_append_string(&body, tname, (int16_t)tlen);
3741             }
3742             } else {
3743 0           kf_buf_append_i32(&body, -1); /* null array = all topics */
3744             }
3745              
3746             /* allow_auto_topic_creation (v4+) */
3747 0 0         if (ver >= 4)
3748 0           kf_buf_append_i8(&body, 1);
3749              
3750 0           conn_send_request(aTHX_ self, API_METADATA, ver, &body, cb, 0, 0);
3751 0           kf_buf_free(&body);
3752             }
3753              
3754             void
3755             api_versions(EV::Kafka::Conn self)
3756             PPCODE:
3757             {
3758 0 0         if (!self->api_versions_known)
3759 0           XSRETURN_UNDEF;
3760              
3761 0           HV *hv = newHV();
3762             int i;
3763 0 0         for (i = 0; i < API_VERSIONS_MAX_KEY; i++) {
3764 0 0         if (self->api_versions[i] >= 0) {
3765             char key[8];
3766 0           int klen = snprintf(key, sizeof(key), "%d", i);
3767 0           hv_store(hv, key, klen, newSViv(self->api_versions[i]), 0);
3768             }
3769             }
3770 0 0         EXTEND(SP, 1);
3771 0           mPUSHs(newRV_noinc((SV*)hv));
3772 0           XSRETURN(1);
3773             }
3774              
3775             void
3776             fetch(EV::Kafka::Conn self, const char *topic, int partition, SV *offset_sv, SV *cb, int max_bytes = 1048576)
3777             CODE:
3778             {
3779 1 50         if (self->state != CONN_READY)
3780 1           croak("not connected");
3781              
3782 0           int64_t offset = SvIV(offset_sv);
3783 0           STRLEN topic_len = strlen(topic);
3784              
3785 0           int16_t ver = self->api_versions[API_FETCH];
3786 0 0         if (ver < 0) ver = 4;
3787 0 0         if (ver > 7) ver = 7;
3788              
3789             kf_buf_t body;
3790 0           kf_buf_init(&body);
3791              
3792 0           kf_buf_append_i32(&body, -1); /* replica_id = -1 (consumer) */
3793 0           kf_buf_append_i32(&body, 500); /* max_wait_ms */
3794 0           kf_buf_append_i32(&body, 1); /* min_bytes */
3795              
3796             /* max_bytes (v3+) */
3797 0 0         if (ver >= 3)
3798 0           kf_buf_append_i32(&body, max_bytes);
3799              
3800             /* isolation_level (v4+) */
3801 0 0         if (ver >= 4)
3802 0           kf_buf_append_i8(&body, 0); /* READ_UNCOMMITTED */
3803              
3804             /* session_id + session_epoch (v7+) */
3805 0 0         if (ver >= 7) {
3806 0           kf_buf_append_i32(&body, 0); /* session_id */
3807 0           kf_buf_append_i32(&body, -1); /* session_epoch */
3808             }
3809              
3810             /* topics: ARRAY(1) */
3811 0           kf_buf_append_i32(&body, 1);
3812 0           kf_buf_append_string(&body, topic, (int16_t)topic_len);
3813              
3814             /* partitions: ARRAY(1) */
3815 0           kf_buf_append_i32(&body, 1);
3816 0           kf_buf_append_i32(&body, (int32_t)partition);
3817              
3818             /* fetch_offset */
3819 0           kf_buf_append_i64(&body, offset);
3820              
3821             /* log_start_offset (v5+) */
3822 0 0         if (ver >= 5)
3823 0           kf_buf_append_i64(&body, -1);
3824              
3825             /* partition_max_bytes */
3826 0           kf_buf_append_i32(&body, max_bytes);
3827              
3828             /* forgotten_topics_data (v7+) */
3829 0 0         if (ver >= 7)
3830 0           kf_buf_append_i32(&body, 0); /* empty array */
3831              
3832 0           conn_send_request(aTHX_ self, API_FETCH, ver, &body, cb, 0, 0);
3833 0           kf_buf_free(&body);
3834             }
3835              
3836             void
3837             fetch_multi(EV::Kafka::Conn self, SV *topics_sv, SV *cb, int max_bytes = 1048576)
3838             CODE:
3839             {
3840 0 0         if (self->state != CONN_READY)
3841 0           croak("not connected");
3842              
3843             /* topics_sv: { topic => [{partition => N, offset => N}, ...], ... } */
3844 0 0         if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVHV)
    0          
3845 0           croak("fetch_multi: expected hashref");
3846 0           HV *topics_hv = (HV*)SvRV(topics_sv);
3847              
3848 0           int16_t ver = self->api_versions[API_FETCH];
3849 0 0         if (ver < 0) ver = 4;
3850 0 0         if (ver > 7) ver = 7;
3851              
3852             kf_buf_t body;
3853 0           kf_buf_init(&body);
3854              
3855 0           kf_buf_append_i32(&body, -1); /* replica_id */
3856 0           kf_buf_append_i32(&body, 500); /* max_wait_ms */
3857 0           kf_buf_append_i32(&body, 1); /* min_bytes */
3858 0 0         if (ver >= 3)
3859 0           kf_buf_append_i32(&body, max_bytes);
3860 0 0         if (ver >= 4)
3861 0           kf_buf_append_i8(&body, 0); /* isolation_level */
3862 0 0         if (ver >= 7) {
3863 0           kf_buf_append_i32(&body, 0); /* session_id */
3864 0           kf_buf_append_i32(&body, -1); /* session_epoch */
3865             }
3866              
3867             /* topics array */
3868 0 0         kf_buf_append_i32(&body, (int32_t)HvUSEDKEYS(topics_hv));
3869              
3870 0           hv_iterinit(topics_hv);
3871             HE *entry;
3872 0 0         while ((entry = hv_iternext(topics_hv))) {
3873             I32 tlen;
3874 0           const char *tname = hv_iterkey(entry, &tlen);
3875 0           kf_buf_append_string(&body, tname, (int16_t)tlen);
3876              
3877 0           SV *parts_sv = hv_iterval(topics_hv, entry);
3878 0 0         if (!SvROK(parts_sv) || SvTYPE(SvRV(parts_sv)) != SVt_PVAV)
    0          
3879 0           croak("fetch_multi: value must be an arrayref");
3880 0           AV *parts_av = (AV*)SvRV(parts_sv);
3881 0           SSize_t i, pc = av_len(parts_av) + 1;
3882 0           kf_buf_append_i32(&body, (int32_t)pc);
3883              
3884 0 0         for (i = 0; i < pc; i++) {
3885 0           SV **elem = av_fetch(parts_av, i, 0);
3886 0 0         if (!elem || !SvROK(*elem))
    0          
3887 0           croak("fetch_multi: partition entry must be a hashref");
3888 0           HV *ph = (HV*)SvRV(*elem);
3889              
3890 0           SV **pid_sv = hv_fetch(ph, "partition", 9, 0);
3891 0 0         int32_t pid = pid_sv ? (int32_t)SvIV(*pid_sv) : 0;
3892 0           kf_buf_append_i32(&body, pid);
3893              
3894 0           SV **off_sv = hv_fetch(ph, "offset", 6, 0);
3895 0 0         int64_t offset = off_sv ? (int64_t)SvIV(*off_sv) : 0;
3896 0           kf_buf_append_i64(&body, offset);
3897              
3898 0 0         if (ver >= 5)
3899 0           kf_buf_append_i64(&body, -1); /* log_start_offset */
3900              
3901 0           kf_buf_append_i32(&body, max_bytes); /* partition_max_bytes */
3902             }
3903             }
3904              
3905 0 0         if (ver >= 7)
3906 0           kf_buf_append_i32(&body, 0); /* forgotten_topics_data */
3907              
3908 0           conn_send_request(aTHX_ self, API_FETCH, ver, &body, cb, 0, 0);
3909 0           kf_buf_free(&body);
3910             }
3911              
3912             void
3913             produce_batch(EV::Kafka::Conn self, const char *topic, int partition, SV *records_sv, SV *opts_sv = NULL, SV *cb = NULL)
3914             CODE:
3915             {
3916 0 0         if (self->state != CONN_READY)
3917 0           croak("not connected");
3918              
3919 0 0         if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV)
    0          
3920 0           croak("produce_batch: expected arrayref of records");
3921 0           AV *records_av = (AV*)SvRV(records_sv);
3922              
3923 0           int16_t acks = 1;
3924 0           int compression = COMPRESS_NONE;
3925 0           int64_t producer_id = -1;
3926 0           int16_t producer_epoch = -1;
3927 0           int32_t base_sequence = -1;
3928 0           const char *txn_id = NULL;
3929 0           STRLEN txn_id_len = 0;
3930              
3931 0 0         if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
    0          
    0          
3932 0           HV *opts = (HV*)SvRV(opts_sv);
3933             SV **tmp;
3934 0 0         if ((tmp = hv_fetch(opts, "acks", 4, 0)))
3935 0           acks = (int16_t)SvIV(*tmp);
3936 0 0         if ((tmp = hv_fetch(opts, "transactional_id", 16, 0)) && SvOK(*tmp))
    0          
3937 0           txn_id = SvPV(*tmp, txn_id_len);
3938 0 0         if ((tmp = hv_fetch(opts, "compression", 11, 0))) {
3939             STRLEN clen;
3940 0           const char *cstr = SvPV(*tmp, clen);
3941 0 0         if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
    0          
3942             #ifdef HAVE_LZ4
3943             else if (clen == 3 && memcmp(cstr, "lz4", 3) == 0) compression = COMPRESS_LZ4;
3944             #endif
3945             #ifdef HAVE_ZLIB
3946 0 0         else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
    0          
3947             #endif
3948 0           else croak("unsupported compression: %.*s", (int)clen, cstr);
3949             }
3950 0 0         if ((tmp = hv_fetch(opts, "producer_id", 11, 0)))
3951 0           producer_id = (int64_t)SvIV(*tmp);
3952 0 0         if ((tmp = hv_fetch(opts, "producer_epoch", 14, 0)))
3953 0           producer_epoch = (int16_t)SvIV(*tmp);
3954 0 0         if ((tmp = hv_fetch(opts, "base_sequence", 13, 0)))
3955 0           base_sequence = (int32_t)SvIV(*tmp);
3956 0 0         } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
    0          
    0          
3957 0           cb = opts_sv;
3958 0           opts_sv = NULL;
3959             }
3960              
3961             struct timeval tv;
3962 0           gettimeofday(&tv, NULL);
3963 0           int64_t timestamp = (int64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
3964              
3965             kf_buf_t batch;
3966 0           kf_encode_record_batch_multi(aTHX_ &batch, records_av, timestamp,
3967             compression, producer_id, producer_epoch, base_sequence,
3968             txn_id != NULL ? 1 : 0);
3969              
3970 0           int16_t ver = self->api_versions[API_PRODUCE];
3971 0 0         if (ver < 0) ver = 3;
3972 0 0         if (ver > 7) ver = 7;
3973              
3974 0           STRLEN topic_len = strlen(topic);
3975              
3976             kf_buf_t body;
3977 0           kf_buf_init(&body);
3978              
3979 0 0         if (ver >= 3)
3980 0 0         kf_buf_append_nullable_string(&body, txn_id, txn_id ? (int16_t)txn_id_len : 0);
3981              
3982 0           kf_buf_append_i16(&body, acks);
3983 0           kf_buf_append_i32(&body, 30000);
3984              
3985 0           kf_buf_append_i32(&body, 1);
3986 0           kf_buf_append_string(&body, topic, (int16_t)topic_len);
3987 0           kf_buf_append_i32(&body, 1);
3988 0           kf_buf_append_i32(&body, (int32_t)partition);
3989 0           kf_buf_append_i32(&body, (int32_t)batch.len);
3990 0           kf_buf_append(&body, batch.data, batch.len);
3991              
3992 0 0         conn_send_request(aTHX_ self, API_PRODUCE, ver, &body,
3993 0 0         (cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0);
3994              
3995 0           kf_buf_free(&body);
3996 0           kf_buf_free(&batch);
3997             }
3998              
3999             void
4000             list_offsets(EV::Kafka::Conn self, const char *topic, int partition, SV *timestamp_sv, SV *cb)
4001             CODE:
4002             {
4003 0 0         if (self->state != CONN_READY)
4004 0           croak("not connected");
4005              
4006 0           int64_t timestamp = SvIV(timestamp_sv);
4007 0           STRLEN topic_len = strlen(topic);
4008              
4009             /* -2 = earliest, -1 = latest */
4010 0           int16_t ver = self->api_versions[API_LIST_OFFSETS];
4011 0 0         if (ver < 0) ver = 1;
4012 0 0         if (ver > 5) ver = 5;
4013              
4014             kf_buf_t body;
4015 0           kf_buf_init(&body);
4016              
4017 0           kf_buf_append_i32(&body, -1); /* replica_id */
4018              
4019             /* isolation_level (v2+) */
4020 0 0         if (ver >= 2)
4021 0           kf_buf_append_i8(&body, 0);
4022              
4023             /* topics: ARRAY(1) */
4024 0           kf_buf_append_i32(&body, 1);
4025 0           kf_buf_append_string(&body, topic, (int16_t)topic_len);
4026              
4027             /* partitions: ARRAY(1) */
4028 0           kf_buf_append_i32(&body, 1);
4029 0           kf_buf_append_i32(&body, (int32_t)partition);
4030              
4031             /* current_leader_epoch (v4+) */
4032 0 0         if (ver >= 4)
4033 0           kf_buf_append_i32(&body, -1);
4034              
4035 0           kf_buf_append_i64(&body, timestamp);
4036              
4037 0           conn_send_request(aTHX_ self, API_LIST_OFFSETS, ver, &body, cb, 0, 0);
4038 0           kf_buf_free(&body);
4039             }
4040              
4041             void
4042             produce(EV::Kafka::Conn self, const char *topic, int partition, SV *key_sv, SV *value_sv, SV *opts_sv = NULL, SV *cb = NULL)
4043             CODE:
4044             {
4045 1 50         if (self->state != CONN_READY)
4046 1           croak("not connected");
4047              
4048             /* Handle optional opts hash: produce($topic, $part, $key, $val, \%opts, $cb)
4049             * or produce($topic, $part, $key, $val, $cb)
4050             */
4051 0           HV *headers = NULL;
4052 0           int16_t acks = 1;
4053 0           int compression = COMPRESS_NONE;
4054              
4055 0 0         if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
    0          
    0          
4056 0           HV *opts = (HV*)SvRV(opts_sv);
4057             SV **tmp;
4058 0 0         if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV)
    0          
    0          
4059 0           headers = (HV*)SvRV(*tmp);
4060 0 0         if ((tmp = hv_fetch(opts, "acks", 4, 0)))
4061 0           acks = (int16_t)SvIV(*tmp);
4062 0 0         if ((tmp = hv_fetch(opts, "compression", 11, 0))) {
4063             STRLEN clen;
4064 0           const char *cstr = SvPV(*tmp, clen);
4065 0 0         if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
    0          
4066             #ifdef HAVE_LZ4
4067             else if (clen == 3 && memcmp(cstr, "lz4", 3) == 0) compression = COMPRESS_LZ4;
4068             #endif
4069             #ifdef HAVE_ZLIB
4070 0 0         else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
    0          
4071             #endif
4072 0           else croak("unsupported compression: %.*s", (int)clen, cstr);
4073             }
4074 0 0         } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
    0          
    0          
4075             /* opts_sv is actually the callback */
4076 0           cb = opts_sv;
4077 0           opts_sv = NULL;
4078             }
4079              
4080 0           const char *key = NULL;
4081 0           STRLEN key_len = 0;
4082 0 0         if (SvOK(key_sv))
4083 0           key = SvPV(key_sv, key_len);
4084              
4085 0           const char *value = NULL;
4086 0           STRLEN value_len = 0;
4087 0 0         if (SvOK(value_sv))
4088 0           value = SvPV(value_sv, value_len);
4089              
4090             /* Build RecordBatch */
4091             int64_t timestamp;
4092             {
4093 0           SV **ts_tmp = NULL;
4094 0 0         if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV)
    0          
    0          
4095 0           ts_tmp = hv_fetch((HV*)SvRV(opts_sv), "timestamp", 9, 0);
4096 0 0         if (ts_tmp && SvOK(*ts_tmp)) {
    0          
4097 0           timestamp = (int64_t)SvIV(*ts_tmp);
4098             } else {
4099             struct timeval tv;
4100 0           gettimeofday(&tv, NULL);
4101 0           timestamp = (int64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
4102             }
4103             }
4104              
4105             kf_buf_t batch;
4106 0           kf_encode_record_batch(&batch, key, key_len, value, value_len, headers, timestamp, compression);
4107              
4108             /* Use Produce version — cap at v7 */
4109 0           int16_t ver = self->api_versions[API_PRODUCE];
4110 0 0         if (ver < 0) ver = 3;
4111 0 0         if (ver > 7) ver = 7;
4112              
4113 0           STRLEN topic_len = strlen(topic);
4114              
4115             kf_buf_t body;
4116 0           kf_buf_init(&body);
4117              
4118             /* transactional_id (v3+): nullable string = null */
4119 0 0         if (ver >= 3)
4120 0           kf_buf_append_nullable_string(&body, NULL, 0);
4121              
4122 0           kf_buf_append_i16(&body, acks); /* acks */
4123 0           kf_buf_append_i32(&body, 30000); /* timeout_ms = 30s */
4124              
4125             /* topic_data: ARRAY(1) */
4126 0           kf_buf_append_i32(&body, 1); /* 1 topic */
4127 0           kf_buf_append_string(&body, topic, (int16_t)topic_len);
4128              
4129             /* partition_data: ARRAY(1) */
4130 0           kf_buf_append_i32(&body, 1); /* 1 partition */
4131 0           kf_buf_append_i32(&body, partition);
4132              
4133             /* record_set: BYTES (i32 length + record_batch) */
4134 0           kf_buf_append_i32(&body, (int32_t)batch.len);
4135 0           kf_buf_append(&body, batch.data, batch.len);
4136              
4137 0 0         conn_send_request(aTHX_ self, API_PRODUCE, ver, &body,
4138 0 0         (cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0);
4139              
4140 0           kf_buf_free(&body);
4141 0           kf_buf_free(&batch);
4142             }
4143              
4144             void
4145             find_coordinator(EV::Kafka::Conn self, const char *group_id, SV *cb, int key_type = 0)
4146             CODE:
4147             {
4148 0 0         if (self->state != CONN_READY)
4149 0           croak("not connected");
4150              
4151 0           int16_t ver = self->api_versions[API_FIND_COORDINATOR];
4152 0 0         if (ver < 0) ver = 0;
4153 0 0         if (ver > 2) ver = 2;
4154              
4155             kf_buf_t body;
4156 0           kf_buf_init(&body);
4157              
4158 0           STRLEN glen = strlen(group_id);
4159 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4160              
4161             /* key_type (v1+): 0=group, 1=transaction */
4162 0 0         if (ver >= 1)
4163 0           kf_buf_append_i8(&body, (int8_t)key_type);
4164              
4165 0           conn_send_request(aTHX_ self, API_FIND_COORDINATOR, ver, &body, cb, 0, 0);
4166 0           kf_buf_free(&body);
4167             }
4168              
4169             void
4170             join_group(EV::Kafka::Conn self, const char *group_id, const char *member_id, SV *topics_sv, SV *cb, int session_timeout_ms = 30000, int rebalance_timeout_ms = 60000, SV *group_instance_id_sv = NULL)
4171             CODE:
4172             {
4173 0 0         if (self->state != CONN_READY)
4174 0           croak("not connected");
4175              
4176 0           int16_t ver = self->api_versions[API_JOIN_GROUP];
4177 0 0         if (ver < 0) ver = 1;
4178 0 0         if (ver > 5) ver = 5;
4179              
4180             kf_buf_t body;
4181 0           kf_buf_init(&body);
4182              
4183 0           STRLEN glen = strlen(group_id);
4184 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4185 0           kf_buf_append_i32(&body, session_timeout_ms);
4186              
4187             /* rebalance_timeout_ms (v1+) */
4188 0 0         if (ver >= 1)
4189 0           kf_buf_append_i32(&body, rebalance_timeout_ms);
4190              
4191 0           STRLEN mlen = strlen(member_id);
4192 0           kf_buf_append_string(&body, member_id, (int16_t)mlen);
4193              
4194             /* group_instance_id (v5+) */
4195 0 0         if (ver >= 5) {
4196 0 0         if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
    0          
4197             STRLEN gilen;
4198 0           const char *gi = SvPV(group_instance_id_sv, gilen);
4199 0           kf_buf_append_nullable_string(&body, gi, (int16_t)gilen);
4200             } else {
4201 0           kf_buf_append_nullable_string(&body, NULL, 0);
4202             }
4203             }
4204              
4205             /* protocol_type = "consumer" */
4206 0           kf_buf_append_string(&body, "consumer", 8);
4207              
4208             /* protocols: ARRAY(1) */
4209 0           kf_buf_append_i32(&body, 1);
4210              
4211             /* protocol name = "sticky" */
4212 0           kf_buf_append_string(&body, "sticky", 6);
4213              
4214             /* protocol metadata (ConsumerProtocol subscription) */
4215             /* Version:0, Topics:array, UserData:null */
4216             kf_buf_t meta;
4217 0           kf_buf_init(&meta);
4218 0           kf_buf_append_i16(&meta, 0); /* version */
4219              
4220 0           AV *topics = (AV*)SvRV(topics_sv);
4221 0           SSize_t i, tc = av_len(topics) + 1;
4222 0           kf_buf_append_i32(&meta, (int32_t)tc);
4223 0 0         for (i = 0; i < tc; i++) {
4224 0           SV **elem = av_fetch(topics, i, 0);
4225             STRLEN tlen;
4226 0           const char *tname = SvPV(*elem, tlen);
4227 0           kf_buf_append_string(&meta, tname, (int16_t)tlen);
4228             }
4229 0           kf_buf_append_nullable_bytes(&meta, NULL, 0); /* user_data = null */
4230              
4231 0           kf_buf_append_bytes(&body, meta.data, (int32_t)meta.len);
4232 0           kf_buf_free(&meta);
4233              
4234 0           conn_send_request(aTHX_ self, API_JOIN_GROUP, ver, &body, cb, 0, 0);
4235 0           kf_buf_free(&body);
4236             }
4237              
4238             void
4239             sync_group(EV::Kafka::Conn self, const char *group_id, int generation_id, const char *member_id, SV *assignments_sv, SV *cb, SV *group_instance_id_sv = NULL)
4240             CODE:
4241             {
4242 0 0         if (self->state != CONN_READY)
4243 0           croak("not connected");
4244              
4245 0           int16_t ver = self->api_versions[API_SYNC_GROUP];
4246 0 0         if (ver < 0) ver = 0;
4247 0 0         if (ver > 3) ver = 3;
4248              
4249             kf_buf_t body;
4250 0           kf_buf_init(&body);
4251              
4252 0           STRLEN glen = strlen(group_id);
4253 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4254 0           kf_buf_append_i32(&body, generation_id);
4255              
4256 0           STRLEN mlen = strlen(member_id);
4257 0           kf_buf_append_string(&body, member_id, (int16_t)mlen);
4258              
4259             /* group_instance_id (v3+) */
4260 0 0         if (ver >= 3) {
4261 0 0         if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
    0          
4262             STRLEN gilen;
4263 0           const char *gi = SvPV(group_instance_id_sv, gilen);
4264 0           kf_buf_append_nullable_string(&body, gi, (int16_t)gilen);
4265             } else {
4266 0           kf_buf_append_nullable_string(&body, NULL, 0);
4267             }
4268             }
4269              
4270             /* assignments: ARRAY of {member_id, assignment_bytes} */
4271 0 0         if (SvOK(assignments_sv) && SvROK(assignments_sv)
    0          
4272 0 0         && SvTYPE(SvRV(assignments_sv)) == SVt_PVAV) {
4273 0           AV *assigns = (AV*)SvRV(assignments_sv);
4274 0           SSize_t i, ac = av_len(assigns) + 1;
4275 0           kf_buf_append_i32(&body, (int32_t)ac);
4276              
4277 0 0         for (i = 0; i < ac; i++) {
4278 0           SV **elem = av_fetch(assigns, i, 0);
4279 0 0         if (!elem || !SvROK(*elem)) continue;
    0          
4280 0           HV *ah = (HV*)SvRV(*elem);
4281              
4282 0           SV **mid_sv = hv_fetch(ah, "member_id", 9, 0);
4283 0 0         if (!mid_sv) continue;
4284             STRLEN mid_len;
4285 0           const char *mid = SvPV(*mid_sv, mid_len);
4286 0           kf_buf_append_string(&body, mid, (int16_t)mid_len);
4287              
4288 0           SV **data_sv = hv_fetch(ah, "assignment", 10, 0);
4289 0 0         if (!data_sv) { kf_buf_append_bytes(&body, NULL, 0); continue; }
4290             STRLEN dlen;
4291 0           const char *ddata = SvPV(*data_sv, dlen);
4292 0           kf_buf_append_bytes(&body, ddata, (int32_t)dlen);
4293             }
4294             } else {
4295 0           kf_buf_append_i32(&body, 0); /* empty array */
4296             }
4297              
4298 0           conn_send_request(aTHX_ self, API_SYNC_GROUP, ver, &body, cb, 0, 0);
4299 0           kf_buf_free(&body);
4300             }
4301              
4302             void
4303             heartbeat(EV::Kafka::Conn self, const char *group_id, int generation_id, const char *member_id, SV *cb, SV *group_instance_id_sv = NULL)
4304             CODE:
4305             {
4306 0 0         if (self->state != CONN_READY)
4307 0           croak("not connected");
4308              
4309 0           int16_t ver = self->api_versions[API_HEARTBEAT];
4310 0 0         if (ver < 0) ver = 0;
4311 0 0         if (ver > 4) ver = 4;
4312              
4313             kf_buf_t body;
4314 0           kf_buf_init(&body);
4315              
4316 0           STRLEN glen = strlen(group_id);
4317 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4318 0           kf_buf_append_i32(&body, generation_id);
4319              
4320 0           STRLEN mlen = strlen(member_id);
4321 0           kf_buf_append_string(&body, member_id, (int16_t)mlen);
4322              
4323             /* group_instance_id (v3+) */
4324 0 0         if (ver >= 3) {
4325 0 0         if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
    0          
4326             STRLEN gilen;
4327 0           const char *gi = SvPV(group_instance_id_sv, gilen);
4328 0           kf_buf_append_nullable_string(&body, gi, (int16_t)gilen);
4329             } else {
4330 0           kf_buf_append_nullable_string(&body, NULL, 0);
4331             }
4332             }
4333              
4334 0           conn_send_request(aTHX_ self, API_HEARTBEAT, ver, &body, cb, 0, 0);
4335 0           kf_buf_free(&body);
4336             }
4337              
4338             void
4339             offset_commit(EV::Kafka::Conn self, const char *group_id, int generation_id, const char *member_id, SV *offsets_sv, SV *cb)
4340             CODE:
4341             {
4342 0 0         if (self->state != CONN_READY)
4343 0           croak("not connected");
4344              
4345 0           int16_t ver = self->api_versions[API_OFFSET_COMMIT];
4346 0 0         if (ver < 0) ver = 2;
4347 0 0         if (ver > 7) ver = 7;
4348              
4349             kf_buf_t body;
4350 0           kf_buf_init(&body);
4351              
4352 0           STRLEN glen = strlen(group_id);
4353 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4354              
4355             /* generation_id (v1+) */
4356 0 0         if (ver >= 1)
4357 0           kf_buf_append_i32(&body, generation_id);
4358              
4359             /* member_id (v1+) */
4360 0 0         if (ver >= 1) {
4361 0           STRLEN mlen = strlen(member_id);
4362 0           kf_buf_append_string(&body, member_id, (int16_t)mlen);
4363             }
4364              
4365             /* group_instance_id (v7+): null */
4366 0 0         if (ver >= 7)
4367 0           kf_buf_append_nullable_string(&body, NULL, 0);
4368              
4369             /* topics: ARRAY of {topic, partitions: [{partition, committed_offset, metadata}]} */
4370 0           AV *topics = (AV*)SvRV(offsets_sv);
4371 0           SSize_t i, tc = av_len(topics) + 1;
4372 0           kf_buf_append_i32(&body, (int32_t)tc);
4373              
4374 0 0         for (i = 0; i < tc; i++) {
4375 0           SV **elem = av_fetch(topics, i, 0);
4376 0 0         if (!elem || !SvROK(*elem)) continue;
    0          
4377 0           HV *th = (HV*)SvRV(*elem);
4378 0           SV **tname_sv = hv_fetch(th, "topic", 5, 0);
4379 0 0         if (!tname_sv) continue;
4380             STRLEN tnlen;
4381 0           const char *tname = SvPV(*tname_sv, tnlen);
4382 0           kf_buf_append_string(&body, tname, (int16_t)tnlen);
4383              
4384 0           SV **parts_sv = hv_fetch(th, "partitions", 10, 0);
4385 0 0         if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
    0          
4386 0           AV *parts = (AV*)SvRV(*parts_sv);
4387 0           SSize_t j, pc = av_len(parts) + 1;
4388 0           kf_buf_append_i32(&body, (int32_t)pc);
4389              
4390 0 0         for (j = 0; j < pc; j++) {
4391 0           SV **pelem = av_fetch(parts, j, 0);
4392 0 0         if (!pelem || !SvROK(*pelem)) continue;
    0          
4393 0           HV *ph = (HV*)SvRV(*pelem);
4394              
4395 0           SV **pid_sv = hv_fetch(ph, "partition", 9, 0);
4396 0 0         kf_buf_append_i32(&body, pid_sv ? (int32_t)SvIV(*pid_sv) : 0);
4397              
4398 0           SV **off_sv = hv_fetch(ph, "offset", 6, 0);
4399 0 0         kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0);
4400              
4401             /* leader_epoch (v6+) */
4402 0 0         if (ver >= 6)
4403 0           kf_buf_append_i32(&body, -1);
4404              
4405             /* metadata: nullable string = empty */
4406 0           kf_buf_append_nullable_string(&body, "", 0);
4407             }
4408             }
4409              
4410 0           conn_send_request(aTHX_ self, API_OFFSET_COMMIT, ver, &body, cb, 0, 0);
4411 0           kf_buf_free(&body);
4412             }
4413              
4414             void
4415             offset_fetch(EV::Kafka::Conn self, const char *group_id, SV *topics_sv, SV *cb)
4416             CODE:
4417             {
4418 0 0         if (self->state != CONN_READY)
4419 0           croak("not connected");
4420              
4421 0           int16_t ver = self->api_versions[API_OFFSET_FETCH];
4422 0 0         if (ver < 0) ver = 1;
4423 0 0         if (ver > 5) ver = 5;
4424              
4425             kf_buf_t body;
4426 0           kf_buf_init(&body);
4427              
4428 0           STRLEN glen = strlen(group_id);
4429 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4430              
4431             /* topics: ARRAY */
4432 0           AV *topics = (AV*)SvRV(topics_sv);
4433 0           SSize_t i, tc = av_len(topics) + 1;
4434 0           kf_buf_append_i32(&body, (int32_t)tc);
4435              
4436 0 0         for (i = 0; i < tc; i++) {
4437 0           SV **elem = av_fetch(topics, i, 0);
4438 0 0         if (!elem || !SvROK(*elem)) continue;
    0          
4439 0           HV *th = (HV*)SvRV(*elem);
4440 0           SV **tname_sv = hv_fetch(th, "topic", 5, 0);
4441 0 0         if (!tname_sv) continue;
4442             STRLEN tnlen;
4443 0           const char *tname = SvPV(*tname_sv, tnlen);
4444 0           kf_buf_append_string(&body, tname, (int16_t)tnlen);
4445              
4446 0           SV **parts_sv = hv_fetch(th, "partitions", 10, 0);
4447 0 0         if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
    0          
4448 0           AV *parts = (AV*)SvRV(*parts_sv);
4449 0           SSize_t j, pc = av_len(parts) + 1;
4450 0           kf_buf_append_i32(&body, (int32_t)pc);
4451              
4452 0 0         for (j = 0; j < pc; j++) {
4453 0           SV **pelem = av_fetch(parts, j, 0);
4454 0 0         kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0);
4455             }
4456             }
4457              
4458 0           conn_send_request(aTHX_ self, API_OFFSET_FETCH, ver, &body, cb, 0, 0);
4459 0           kf_buf_free(&body);
4460             }
4461              
4462             void
4463             leave_group(EV::Kafka::Conn self, const char *group_id, const char *member_id, SV *cb)
4464             CODE:
4465             {
4466 0 0         if (self->state != CONN_READY)
4467 0           croak("not connected");
4468              
4469 0           int16_t ver = self->api_versions[API_LEAVE_GROUP];
4470 0 0         if (ver < 0) ver = 0;
4471 0 0         if (ver > 3) ver = 3;
4472              
4473             kf_buf_t body;
4474 0           kf_buf_init(&body);
4475              
4476 0           STRLEN glen = strlen(group_id);
4477 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4478              
4479 0           STRLEN mlen = strlen(member_id);
4480 0           kf_buf_append_string(&body, member_id, (int16_t)mlen);
4481              
4482 0           conn_send_request(aTHX_ self, API_LEAVE_GROUP, ver, &body, cb, 0, 0);
4483 0           kf_buf_free(&body);
4484             }
4485              
4486             void
4487             create_topics(EV::Kafka::Conn self, SV *topics_sv, int timeout_ms, SV *cb)
4488             CODE:
4489             {
4490 0 0         if (self->state != CONN_READY)
4491 0           croak("not connected");
4492              
4493 0           int16_t ver = self->api_versions[API_CREATE_TOPICS];
4494 0 0         if (ver < 0) ver = 0;
4495 0 0         if (ver > 4) ver = 4;
4496              
4497             kf_buf_t body;
4498 0           kf_buf_init(&body);
4499              
4500 0           AV *topics = (AV*)SvRV(topics_sv);
4501 0           SSize_t i, tc = av_len(topics) + 1;
4502 0           kf_buf_append_i32(&body, (int32_t)tc);
4503              
4504 0 0         for (i = 0; i < tc; i++) {
4505 0           SV **elem = av_fetch(topics, i, 0);
4506 0 0         if (!elem || !SvROK(*elem)) continue;
    0          
4507 0           HV *th = (HV*)SvRV(*elem);
4508              
4509 0           SV **name_sv = hv_fetch(th, "name", 4, 0);
4510 0 0         if (!name_sv) continue;
4511             STRLEN nlen;
4512 0           const char *name = SvPV(*name_sv, nlen);
4513 0           kf_buf_append_string(&body, name, (int16_t)nlen);
4514              
4515 0           SV **np_sv = hv_fetch(th, "num_partitions", 14, 0);
4516 0 0         int32_t num_partitions = np_sv ? (int32_t)SvIV(*np_sv) : 1;
4517 0           kf_buf_append_i32(&body, num_partitions);
4518              
4519 0           SV **rf_sv = hv_fetch(th, "replication_factor", 18, 0);
4520 0 0         int16_t replication_factor = rf_sv ? (int16_t)SvIV(*rf_sv) : 1;
4521 0           kf_buf_append_i16(&body, replication_factor);
4522              
4523             /* assignments: empty array */
4524 0           kf_buf_append_i32(&body, 0);
4525              
4526             /* configs: empty array */
4527 0           kf_buf_append_i32(&body, 0);
4528             }
4529              
4530 0           kf_buf_append_i32(&body, timeout_ms);
4531              
4532             /* validate_only (v1+) */
4533 0 0         if (ver >= 1)
4534 0           kf_buf_append_i8(&body, 0);
4535              
4536 0           conn_send_request(aTHX_ self, API_CREATE_TOPICS, ver, &body, cb, 0, 0);
4537 0           kf_buf_free(&body);
4538             }
4539              
4540             void
4541             delete_topics(EV::Kafka::Conn self, SV *topics_sv, int timeout_ms, SV *cb)
4542             CODE:
4543             {
4544 0 0         if (self->state != CONN_READY)
4545 0           croak("not connected");
4546              
4547 0           int16_t ver = self->api_versions[API_DELETE_TOPICS];
4548 0 0         if (ver < 0) ver = 0;
4549 0 0         if (ver > 3) ver = 3;
4550              
4551             kf_buf_t body;
4552 0           kf_buf_init(&body);
4553              
4554 0           AV *topics = (AV*)SvRV(topics_sv);
4555 0           SSize_t i, tc = av_len(topics) + 1;
4556 0           kf_buf_append_i32(&body, (int32_t)tc);
4557              
4558 0 0         for (i = 0; i < tc; i++) {
4559 0           SV **elem = av_fetch(topics, i, 0);
4560 0 0         if (!elem) continue;
4561             STRLEN tlen;
4562 0           const char *tname = SvPV(*elem, tlen);
4563 0           kf_buf_append_string(&body, tname, (int16_t)tlen);
4564             }
4565              
4566 0           kf_buf_append_i32(&body, timeout_ms);
4567              
4568 0           conn_send_request(aTHX_ self, API_DELETE_TOPICS, ver, &body, cb, 0, 0);
4569 0           kf_buf_free(&body);
4570             }
4571              
4572             void
4573             init_producer_id(EV::Kafka::Conn self, SV *transactional_id_sv, int txn_timeout_ms, SV *cb)
4574             CODE:
4575             {
4576 0 0         if (self->state != CONN_READY)
4577 0           croak("not connected");
4578              
4579 0           int16_t ver = self->api_versions[API_INIT_PRODUCER_ID];
4580 0 0         if (ver < 0) ver = 0;
4581 0 0         if (ver > 1) ver = 1;
4582              
4583             kf_buf_t body;
4584 0           kf_buf_init(&body);
4585              
4586 0 0         if (SvOK(transactional_id_sv)) {
4587             STRLEN tlen;
4588 0           const char *tid = SvPV(transactional_id_sv, tlen);
4589 0           kf_buf_append_string(&body, tid, (int16_t)tlen);
4590             } else {
4591 0           kf_buf_append_nullable_string(&body, NULL, 0);
4592             }
4593              
4594 0           kf_buf_append_i32(&body, txn_timeout_ms);
4595              
4596             /* v2+: producer_id(i64=-1), producer_epoch(i16=-1) */
4597 0 0         if (ver >= 2) {
4598 0           kf_buf_append_i64(&body, -1);
4599 0           kf_buf_append_i16(&body, -1);
4600             }
4601              
4602 0           conn_send_request(aTHX_ self, API_INIT_PRODUCER_ID, ver, &body, cb, 0, 0);
4603 0           kf_buf_free(&body);
4604             }
4605              
4606             void
4607             add_partitions_to_txn(EV::Kafka::Conn self, const char *transactional_id, SV *producer_id_sv, int producer_epoch, SV *topics_sv, SV *cb)
4608             CODE:
4609             {
4610 0 0         if (self->state != CONN_READY)
4611 0           croak("not connected");
4612              
4613 0           int16_t ver = self->api_versions[API_ADD_PARTITIONS_TXN];
4614 0 0         if (ver < 0) ver = 0;
4615 0 0         if (ver > 1) ver = 1;
4616              
4617 0           int64_t pid = SvIV(producer_id_sv);
4618              
4619             kf_buf_t body;
4620 0           kf_buf_init(&body);
4621              
4622 0           STRLEN tid_len = strlen(transactional_id);
4623 0           kf_buf_append_string(&body, transactional_id, (int16_t)tid_len);
4624 0           kf_buf_append_i64(&body, pid);
4625 0           kf_buf_append_i16(&body, (int16_t)producer_epoch);
4626              
4627             /* topics: ARRAY of {topic, partitions: ARRAY(i32)} */
4628 0 0         if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVAV)
    0          
4629 0           croak("add_partitions_to_txn: expected arrayref");
4630 0           AV *topics = (AV*)SvRV(topics_sv);
4631 0           SSize_t i, tc = av_len(topics) + 1;
4632 0           kf_buf_append_i32(&body, (int32_t)tc);
4633              
4634 0 0         for (i = 0; i < tc; i++) {
4635 0           SV **elem = av_fetch(topics, i, 0);
4636 0 0         if (!elem || !SvROK(*elem)) croak("add_partitions_to_txn: bad element");
    0          
4637 0           HV *th = (HV*)SvRV(*elem);
4638 0           SV **tname_sv = hv_fetch(th, "topic", 5, 0);
4639 0 0         if (!tname_sv) croak("add_partitions_to_txn: missing topic");
4640             STRLEN tnlen;
4641 0           const char *tname = SvPV(*tname_sv, tnlen);
4642 0           kf_buf_append_string(&body, tname, (int16_t)tnlen);
4643              
4644 0           SV **parts_sv = hv_fetch(th, "partitions", 10, 0);
4645 0 0         if (!parts_sv || !SvROK(*parts_sv)) croak("add_partitions_to_txn: missing partitions");
    0          
4646 0           AV *parts = (AV*)SvRV(*parts_sv);
4647 0           SSize_t j, pc = av_len(parts) + 1;
4648 0           kf_buf_append_i32(&body, (int32_t)pc);
4649 0 0         for (j = 0; j < pc; j++) {
4650 0           SV **pelem = av_fetch(parts, j, 0);
4651 0 0         kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0);
4652             }
4653             }
4654              
4655 0           conn_send_request(aTHX_ self, API_ADD_PARTITIONS_TXN, ver, &body, cb, 0, 0);
4656 0           kf_buf_free(&body);
4657             }
4658              
4659             void
4660             end_txn(EV::Kafka::Conn self, const char *transactional_id, SV *producer_id_sv, int producer_epoch, int committed, SV *cb)
4661             CODE:
4662             {
4663 0 0         if (self->state != CONN_READY)
4664 0           croak("not connected");
4665              
4666 0           int16_t ver = self->api_versions[API_END_TXN];
4667 0 0         if (ver < 0) ver = 0;
4668 0 0         if (ver > 1) ver = 1;
4669              
4670 0           int64_t pid = SvIV(producer_id_sv);
4671              
4672             kf_buf_t body;
4673 0           kf_buf_init(&body);
4674              
4675 0           STRLEN tid_len = strlen(transactional_id);
4676 0           kf_buf_append_string(&body, transactional_id, (int16_t)tid_len);
4677 0           kf_buf_append_i64(&body, pid);
4678 0           kf_buf_append_i16(&body, (int16_t)producer_epoch);
4679 0           kf_buf_append_i8(&body, committed ? 1 : 0);
4680              
4681 0           conn_send_request(aTHX_ self, API_END_TXN, ver, &body, cb, 0, 0);
4682 0           kf_buf_free(&body);
4683             }
4684              
4685             void
4686             txn_offset_commit(EV::Kafka::Conn self, const char *transactional_id, const char *group_id, SV *producer_id_sv, int producer_epoch, int generation_id, const char *member_id, SV *offsets_sv, SV *cb)
4687             CODE:
4688             {
4689 0 0         if (self->state != CONN_READY)
4690 0           croak("not connected");
4691              
4692 0           int16_t ver = self->api_versions[API_TXN_OFFSET_COMMIT];
4693 0 0         if (ver < 0) ver = 0;
4694 0 0         if (ver > 3) ver = 3;
4695              
4696 0           int64_t pid = SvIV(producer_id_sv);
4697              
4698             kf_buf_t body;
4699 0           kf_buf_init(&body);
4700              
4701 0           STRLEN tid_len = strlen(transactional_id);
4702 0           kf_buf_append_string(&body, transactional_id, (int16_t)tid_len);
4703              
4704 0           STRLEN gid_len = strlen(group_id);
4705 0           kf_buf_append_string(&body, group_id, (int16_t)gid_len);
4706              
4707 0           kf_buf_append_i64(&body, pid);
4708 0           kf_buf_append_i16(&body, (int16_t)producer_epoch);
4709              
4710             /* v3+: generation_id, member_id, group_instance_id */
4711 0 0         if (ver >= 3) {
4712 0           kf_buf_append_i32(&body, generation_id);
4713 0           STRLEN mid_len = strlen(member_id);
4714 0           kf_buf_append_string(&body, member_id, (int16_t)mid_len);
4715 0           kf_buf_append_nullable_string(&body, NULL, 0); /* group_instance_id */
4716             }
4717              
4718             /* offsets: ARRAY of {topic, partitions: [{partition, offset}]} */
4719 0 0         if (!SvROK(offsets_sv) || SvTYPE(SvRV(offsets_sv)) != SVt_PVAV)
    0          
4720 0           croak("txn_offset_commit: expected arrayref");
4721 0           AV *topics = (AV*)SvRV(offsets_sv);
4722 0           SSize_t i, tc = av_len(topics) + 1;
4723 0           kf_buf_append_i32(&body, (int32_t)tc);
4724              
4725 0 0         for (i = 0; i < tc; i++) {
4726 0           SV **elem = av_fetch(topics, i, 0);
4727 0 0         if (!elem || !SvROK(*elem)) croak("txn_offset_commit: bad element");
    0          
4728 0           HV *th = (HV*)SvRV(*elem);
4729              
4730 0           SV **tname_sv = hv_fetch(th, "topic", 5, 0);
4731 0 0         if (!tname_sv) croak("txn_offset_commit: missing topic");
4732             STRLEN tnlen;
4733 0           const char *tname = SvPV(*tname_sv, tnlen);
4734 0           kf_buf_append_string(&body, tname, (int16_t)tnlen);
4735              
4736 0           SV **parts_sv = hv_fetch(th, "partitions", 10, 0);
4737 0 0         if (!parts_sv || !SvROK(*parts_sv)) croak("txn_offset_commit: missing partitions");
    0          
4738 0           AV *parts = (AV*)SvRV(*parts_sv);
4739 0           SSize_t j, pc = av_len(parts) + 1;
4740 0           kf_buf_append_i32(&body, (int32_t)pc);
4741              
4742 0 0         for (j = 0; j < pc; j++) {
4743 0           SV **pelem = av_fetch(parts, j, 0);
4744 0 0         if (!pelem || !SvROK(*pelem)) croak("txn_offset_commit: bad partition");
    0          
4745 0           HV *ph = (HV*)SvRV(*pelem);
4746              
4747 0           SV **ppid_sv = hv_fetch(ph, "partition", 9, 0);
4748 0 0         kf_buf_append_i32(&body, ppid_sv ? (int32_t)SvIV(*ppid_sv) : 0);
4749              
4750 0           SV **off_sv = hv_fetch(ph, "offset", 6, 0);
4751 0 0         kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0);
4752              
4753             /* leader_epoch (v2+) */
4754 0 0         if (ver >= 2)
4755 0           kf_buf_append_i32(&body, -1);
4756              
4757 0           kf_buf_append_nullable_string(&body, "", 0); /* metadata */
4758             }
4759             }
4760              
4761 0           conn_send_request(aTHX_ self, API_TXN_OFFSET_COMMIT, ver, &body, cb, 0, 0);
4762 0           kf_buf_free(&body);
4763             }
4764              
4765             MODULE = EV::Kafka PACKAGE = EV::Kafka
4766              
4767             EV::Kafka
4768             _new(char *cls, SV *loop_sv)
4769             CODE:
4770             {
4771             struct ev_loop *loop;
4772             ev_kafka_t *self;
4773              
4774 6 50         if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop"))
    0          
4775 0           loop = (struct ev_loop *)SvIV(SvRV(loop_sv));
4776             else
4777 6           loop = EV_DEFAULT;
4778              
4779 6           Newxz(self, 1, ev_kafka_t);
4780 6           self->magic = KF_MAGIC_ALIVE;
4781 6           self->loop = loop;
4782              
4783 6           ngx_queue_init(&self->brokers);
4784 6           ngx_queue_init(&self->topics);
4785 6           ngx_queue_init(&self->produce_batches);
4786 6           ngx_queue_init(&self->consume_partitions);
4787              
4788 6           self->client_id = savepv("ev-kafka");
4789 6           self->client_id_len = 8;
4790              
4791             /* Producer defaults */
4792 6           self->linger_ms = 5.0;
4793 6           self->batch_size = 16384;
4794 6           self->acks = -1;
4795 6           self->max_request_size = 1048576;
4796              
4797             /* Consumer defaults */
4798 6           self->fetch_max_bytes = 1048576;
4799 6           self->fetch_min_bytes = 1;
4800 6           self->fetch_max_wait_ms = 500;
4801              
4802             /* Metadata defaults */
4803 6           self->metadata_refresh_interval = 300.0;
4804              
4805 6           RETVAL = self;
4806             }
4807             OUTPUT:
4808             RETVAL
4809              
4810             void
4811             DESTROY(EV::Kafka self)
4812             CODE:
4813             {
4814 6 50         if (self->magic != KF_MAGIC_ALIVE) return;
4815 6           self->magic = KF_MAGIC_FREED;
4816              
4817             /* Clean up broker connections */
4818 6 50         while (!ngx_queue_empty(&self->brokers)) {
4819 0           ngx_queue_t *q = ngx_queue_head(&self->brokers);
4820 0           ngx_queue_remove(q);
4821 0           ev_kafka_conn_t *conn = ngx_queue_data(q, ev_kafka_conn_t, cluster_queue);
4822 0           conn->cluster = NULL;
4823 0           conn->intentional_disconnect = 1;
4824 0           conn_cleanup(aTHX_ conn);
4825             /* Note: conn is owned by its Perl SV, will be freed by Perl GC */
4826             }
4827              
4828             /* Clean up topic metadata */
4829 6 50         while (!ngx_queue_empty(&self->topics)) {
4830 0           ngx_queue_t *q = ngx_queue_head(&self->topics);
4831 0           ngx_queue_remove(q);
4832 0           kf_topic_meta_t *tm = ngx_queue_data(q, kf_topic_meta_t, queue);
4833 0 0         if (tm->name) Safefree(tm->name);
4834 0 0         if (tm->partitions) Safefree(tm->partitions);
4835 0           Safefree(tm);
4836             }
4837              
4838 6 50         if (self->metadata_timing) {
4839 0           ev_timer_stop(self->loop, &self->metadata_timer);
4840 0           self->metadata_timing = 0;
4841             }
4842 6 50         if (self->linger_timing) {
4843 0           ev_timer_stop(self->loop, &self->linger_timer);
4844 0           self->linger_timing = 0;
4845             }
4846 6 50         if (self->fetch_timing) {
4847 0           ev_timer_stop(self->loop, &self->fetch_timer);
4848 0           self->fetch_timing = 0;
4849             }
4850              
4851             /* Free bootstrap */
4852 6 50         if (self->bootstrap_hosts) {
4853             int i;
4854 0 0         for (i = 0; i < self->bootstrap_count; i++)
4855 0 0         if (self->bootstrap_hosts[i]) Safefree(self->bootstrap_hosts[i]);
4856 0           Safefree(self->bootstrap_hosts);
4857             }
4858 6 50         if (self->bootstrap_ports) Safefree(self->bootstrap_ports);
4859              
4860             /* Free consumer group */
4861 6 50         if (self->group) {
4862 0 0         if (self->group->heartbeat_timing)
4863 0           ev_timer_stop(self->loop, &self->group->heartbeat_timer);
4864 0 0         if (self->group->group_id) Safefree(self->group->group_id);
4865 0 0         if (self->group->member_id) Safefree(self->group->member_id);
4866 0 0         CLEAR_HANDLER(self->group->on_assign);
4867 0 0         CLEAR_HANDLER(self->group->on_revoke);
4868 0 0         if (self->group->subscriptions) SvREFCNT_dec((SV*)self->group->subscriptions);
4869 0           Safefree(self->group);
4870             }
4871              
4872 6 50         CLEAR_HANDLER(self->on_error);
4873 6 50         CLEAR_HANDLER(self->on_connect);
4874 6 50         CLEAR_HANDLER(self->on_message);
4875 6 50         CLEAR_HANDLER(self->partitioner);
4876              
4877 6 50         if (self->client_id) Safefree(self->client_id);
4878 6 50         if (self->sasl_mechanism) Safefree(self->sasl_mechanism);
4879 6 50         if (self->sasl_username) Safefree(self->sasl_username);
4880 6 50         if (self->sasl_password) Safefree(self->sasl_password);
4881 6 50         if (self->tls_ca_file) Safefree(self->tls_ca_file);
4882              
4883 6           Safefree(self);
4884             }
4885              
4886             int
4887             _murmur2(SV *data_sv)
4888             CODE:
4889             {
4890             STRLEN len;
4891 16           const unsigned char *data = (const unsigned char *)SvPV(data_sv, len);
4892 16           uint32_t h = 0x9747b28c ^ (uint32_t)len;
4893 16           const uint32_t m = 0x5bd1e995;
4894 16           size_t i = 0;
4895 16           size_t remaining = len;
4896              
4897 275 100         while (remaining >= 4) {
4898             uint32_t k;
4899 259           memcpy(&k, data + i, 4); /* little-endian on x86, matches Java */
4900 259           k *= m;
4901 259           k ^= k >> 24;
4902 259           k *= m;
4903 259           h *= m;
4904 259           h ^= k;
4905 259           i += 4;
4906 259           remaining -= 4;
4907             }
4908              
4909 16           switch (remaining) {
4910 5           case 3: h ^= (uint32_t)data[i + 2] << 16; /* fallthrough */
4911 7           case 2: h ^= (uint32_t)data[i + 1] << 8; /* fallthrough */
4912 10           case 1: h ^= (uint32_t)data[i]; h *= m;
4913             }
4914              
4915 16           h ^= h >> 13;
4916 16           h *= m;
4917 16           h ^= h >> 15;
4918              
4919 16 100         RETVAL = (int)(h & 0x7FFFFFFF);
4920             }
4921             OUTPUT:
4922             RETVAL
4923              
4924             unsigned int
4925             _crc32c(SV *data_sv)
4926             CODE:
4927             {
4928             STRLEN len;
4929 7           const char *data = SvPV(data_sv, len);
4930 7           RETVAL = crc32c(data, len);
4931             }
4932             OUTPUT:
4933             RETVAL
4934              
4935             void
4936             _error_name(int code)
4937             PPCODE:
4938             {
4939 10           const char *name = NULL;
4940 10           switch (code) {
4941 1           case 0: name = "NONE"; break;
4942 1           case 1: name = "OFFSET_OUT_OF_RANGE"; break;
4943 0           case 2: name = "CORRUPT_MESSAGE"; break;
4944 1           case 3: name = "UNKNOWN_TOPIC_OR_PARTITION"; break;
4945 0           case 5: name = "LEADER_NOT_AVAILABLE"; break;
4946 1           case 6: name = "NOT_LEADER_OR_FOLLOWER"; break;
4947 0           case 7: name = "REQUEST_TIMED_OUT"; break;
4948 0           case 10: name = "MESSAGE_TOO_LARGE"; break;
4949 1           case 15: name = "COORDINATOR_NOT_AVAILABLE"; break;
4950 1           case 16: name = "NOT_COORDINATOR"; break;
4951 0           case 17: name = "INVALID_TOPIC_EXCEPTION"; break;
4952 0           case 22: name = "ILLEGAL_GENERATION"; break;
4953 0           case 25: name = "UNKNOWN_MEMBER_ID"; break;
4954 1           case 27: name = "REBALANCE_IN_PROGRESS"; break;
4955 0           case 35: name = "UNSUPPORTED_VERSION"; break;
4956 1           case 36: name = "TOPIC_ALREADY_EXISTS"; break;
4957 0           case 39: name = "REASSIGNMENT_IN_PROGRESS"; break;
4958 0           case 41: name = "NOT_CONTROLLER"; break;
4959 0           case 47: name = "INVALID_REPLICATION_FACTOR"; break;
4960 0           case 58: name = "SASL_AUTHENTICATION_FAILED"; break;
4961 0           case 72: name = "LISTENER_NOT_FOUND"; break;
4962 1           case 79: name = "MEMBER_ID_REQUIRED"; break;
4963 1           default: name = "UNKNOWN"; break;
4964             }
4965 10 50         EXTEND(SP, 1);
4966 10           mPUSHp(name, strlen(name));
4967 10           XSRETURN(1);
4968             }
4969              
4970             BOOT:
4971             {
4972 15 50         I_EV_API("EV::Kafka");
    50          
    50          
4973 15           crc32c_init_table();
4974             }