File Coverage

Kafka.xs
Criterion Covered Total %
statement 1121 2768 40.5
branch 460 1922 23.9
condition n/a
subroutine n/a
pod n/a
total 1581 4690 33.7


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include "ppport.h"
5              
6             #include "EVAPI.h"
7             #include "ngx-queue.h"
8              
9             #include
10             #include
11             #include
12             #include
13             #include
14             #include
15             #include
16             #include
17             #include
18             #include
19              
20             #ifdef HAVE_OPENSSL
21             #include
22             #include
23             #include
24             #include
25             #include
26             #include
27             #include
28             #include
29             #include
30             #endif
31              
32             #ifdef HAVE_LZ4
33             #include
34             #endif
35              
36             #ifdef HAVE_ZLIB
37             #include
38             #endif
39              
40             #ifdef HAVE_ZSTD
41             #include
42             #endif
43              
44             #ifdef HAVE_SNAPPY
45             #include
46             #endif
47              
48             /* ================================================================
49             * Constants
50             * ================================================================ */
51              
52             #define KF_MAGIC_ALIVE 0xCAFEBEEF
53             #define KF_MAGIC_FREED 0xDEADCAFE
54              
55             #define KF_BUF_INIT 16384
56              
57             /* True during Perl global destruction — calling user callbacks then is unsafe
58             * because closed-over SVs may already have been freed. */
59             #ifdef PERL_PHASE_DESTRUCT
60             # define KF_IN_GLOBAL_DESTRUCT() (PL_phase == PERL_PHASE_DESTRUCT)
61             #else
62             # define KF_IN_GLOBAL_DESTRUCT() (PL_dirty)
63             #endif
64              
65             /* Connection states */
66             #define CONN_DISCONNECTED 0
67             #define CONN_CONNECTING 1
68             #define CONN_TLS_HANDSHAKE 2
69             #define CONN_SASL_HANDSHAKE 3
70             #define CONN_SASL_AUTH 4
71             #define CONN_API_VERSIONS 5
72             #define CONN_READY 6
73              
74             /* Kafka API keys */
75             #define API_PRODUCE 0
76             #define API_FETCH 1
77             #define API_LIST_OFFSETS 2
78             #define API_METADATA 3
79             #define API_OFFSET_COMMIT 8
80             #define API_OFFSET_FETCH 9
81             #define API_FIND_COORDINATOR 10
82             #define API_JOIN_GROUP 11
83             #define API_HEARTBEAT 12
84             #define API_LEAVE_GROUP 13
85             #define API_SYNC_GROUP 14
86             #define API_SASL_HANDSHAKE 17
87             #define API_API_VERSIONS 18
88             #define API_CREATE_TOPICS 19
89             #define API_DELETE_TOPICS 20
90             #define API_INIT_PRODUCER_ID 22
91             #define API_ADD_PARTITIONS_TXN 24
92             #define API_END_TXN 26
93             #define API_TXN_OFFSET_COMMIT 28
94             #define API_SASL_AUTHENTICATE 36
95              
96             #define API_VERSIONS_MAX_KEY 64
97              
98             /* Compression types — Kafka wire codes. */
99             #define COMPRESS_NONE 0
100             #define COMPRESS_GZIP 1
101             #define COMPRESS_SNAPPY 2
102             #define COMPRESS_LZ4 3
103             #define COMPRESS_ZSTD 4
104              
105             #define CLEAR_HANDLER(field) \
106             do { if (NULL != (field)) { SvREFCNT_dec(field); (field) = NULL; } } while(0)
107              
108             /* ================================================================
109             * Type declarations
110             * ================================================================ */
111              
112             typedef struct kf_buf_s kf_buf_t;
113             typedef struct ev_kafka_conn_s ev_kafka_conn_t;
114             typedef struct ev_kafka_conn_cb_s ev_kafka_conn_cb_t;
115              
116             typedef ev_kafka_conn_t* EV__Kafka__Conn;
117              
118             /* ================================================================
119             * Dynamic buffer
120             * ================================================================ */
121              
122             struct kf_buf_s {
123             char *data;
124             size_t len;
125             size_t cap;
126             };
127              
128 205           static void kf_buf_init(kf_buf_t *b) {
129 205           Newx(b->data, 256, char);
130 205           b->len = 0;
131 205           b->cap = 256;
132 205           }
133              
134 1675           static void kf_buf_grow(kf_buf_t *b, size_t need) {
135 1675 100         if (b->cap >= need) return;
136 19           size_t newcap = b->cap * 2;
137 19 100         if (newcap < need) newcap = need;
138 19           Renew(b->data, newcap, char);
139 19           b->cap = newcap;
140             }
141              
142 205           static void kf_buf_free(kf_buf_t *b) {
143 205 50         if (b->data) { Safefree(b->data); b->data = NULL; }
144 205           b->len = 0;
145 205           b->cap = 0;
146 205           }
147              
148 458           static void kf_buf_append(kf_buf_t *b, const char *data, size_t len) {
149 458           kf_buf_grow(b, b->len + len);
150 458           Copy(data, b->data + b->len, len, char);
151 458           b->len += len;
152 458           }
153              
154 157           static void kf_buf_append_i8(kf_buf_t *b, int8_t val) {
155 157           kf_buf_grow(b, b->len + 1);
156 157           b->data[b->len++] = val;
157 157           }
158              
159 35           static void kf_buf_append_i16(kf_buf_t *b, int16_t val) {
160 35           kf_buf_grow(b, b->len + 2);
161 35           uint16_t v = htons((uint16_t)val);
162 35           memcpy(b->data + b->len, &v, 2);
163 35           b->len += 2;
164 35           }
165              
166 97           static void kf_buf_append_i32(kf_buf_t *b, int32_t val) {
167 97           kf_buf_grow(b, b->len + 4);
168 97           uint32_t v = htonl((uint32_t)val);
169 97           memcpy(b->data + b->len, &v, 4);
170 97           b->len += 4;
171 97           }
172              
173 64           static void kf_buf_append_i64(kf_buf_t *b, int64_t val) {
174 64           kf_buf_grow(b, b->len + 8);
175 64           uint32_t hi = htonl((uint32_t)((uint64_t)val >> 32));
176 64           uint32_t lo = htonl((uint32_t)((uint64_t)val & 0xFFFFFFFF));
177 64           memcpy(b->data + b->len, &hi, 4);
178 64           memcpy(b->data + b->len + 4, &lo, 4);
179 64           b->len += 8;
180 64           }
181              
182             /* Kafka STRING: INT16 length + bytes. NULL → -1 length */
183 0           static void kf_buf_append_string(kf_buf_t *b, const char *s, int16_t len) {
184 0           kf_buf_append_i16(b, len);
185 0 0         if (len > 0) kf_buf_append(b, s, len);
186 0           }
187              
188 1           static void kf_buf_append_nullable_string(kf_buf_t *b, const char *s, int16_t len) {
189 1 50         if (!s) {
190 0           kf_buf_append_i16(b, -1);
191             } else {
192 1           kf_buf_append_i16(b, len);
193 1 50         if (len > 0) kf_buf_append(b, s, len);
194             }
195 1           }
196              
197             /* Kafka BYTES: INT32 length + bytes */
198 0           static void kf_buf_append_bytes(kf_buf_t *b, const char *s, int32_t len) {
199 0           kf_buf_append_i32(b, len);
200 0 0         if (len > 0) kf_buf_append(b, s, len);
201 0           }
202              
203 0           static void kf_buf_append_nullable_bytes(kf_buf_t *b, const char *s, int32_t len) {
204 0 0         if (!s) {
205 0           kf_buf_append_i32(b, -1);
206             } else {
207 0           kf_buf_append_i32(b, len);
208 0 0         if (len > 0) kf_buf_append(b, s, len);
209             }
210 0           }
211              
212             /* Unsigned varint (ZigZag for signed is not used in Kafka framing) */
213 864           static void kf_buf_append_uvarint(kf_buf_t *b, uint64_t val) {
214 864           kf_buf_grow(b, b->len + 10);
215 885 100         while (val >= 0x80) {
216 21           b->data[b->len++] = (char)((val & 0x7F) | 0x80);
217 21           val >>= 7;
218             }
219 864           b->data[b->len++] = (char)val;
220 864           }
221              
222             /* Signed varint (ZigZag encoding) */
223 864           static void kf_buf_append_varint(kf_buf_t *b, int64_t val) {
224 864           uint64_t uval = ((uint64_t)val << 1) ^ (uint64_t)(val >> 63);
225 864           kf_buf_append_uvarint(b, uval);
226 864           }
227              
228             /* Compact string: uvarint(len+1) + bytes. NULL → 0 */
229 0           static void kf_buf_append_compact_string(kf_buf_t *b, const char *s, int32_t len) {
230 0 0         if (!s) {
231 0           kf_buf_append_uvarint(b, 0);
232             } else {
233 0           kf_buf_append_uvarint(b, (uint64_t)len + 1);
234 0 0         if (len > 0) kf_buf_append(b, s, len);
235             }
236 0           }
237              
238             /* Tagged fields: just 0 = no tagged fields */
239 0           static void kf_buf_append_tagged_fields(kf_buf_t *b) {
240 0           kf_buf_append_uvarint(b, 0);
241 0           }
242              
243             /* ================================================================
244             * Wire read helpers (big-endian)
245             * ================================================================ */
246              
247 46           static int16_t kf_read_i16(const char *buf) {
248             uint16_t v;
249 46           memcpy(&v, buf, 2);
250 46           return (int16_t)ntohs(v);
251             }
252              
253 74           static int32_t kf_read_i32(const char *buf) {
254             uint32_t v;
255 74           memcpy(&v, buf, 4);
256 74           return (int32_t)ntohl(v);
257             }
258              
259 32           static int64_t kf_read_i64(const char *buf) {
260             uint32_t hi, lo;
261 32           memcpy(&hi, buf, 4);
262 32           memcpy(&lo, buf + 4, 4);
263 32           return ((int64_t)ntohl(hi) << 32) | (uint32_t)ntohl(lo);
264             }
265              
266             /* Read unsigned varint, returns bytes consumed or -1 on error */
267 846           static int kf_read_uvarint(const char *buf, const char *end, uint64_t *out) {
268 846           uint64_t val = 0;
269 846           int shift = 0;
270 846           const char *p = buf;
271 867 50         while (p < end) {
272 867           uint8_t b = (uint8_t)*p++;
273 867           val |= ((uint64_t)(b & 0x7F)) << shift;
274 867 100         if (!(b & 0x80)) {
275 846           *out = val;
276 846           return (int)(p - buf);
277             }
278 21           shift += 7;
279 21 50         if (shift >= 64) return -1;
280             }
281 0           return -1; /* incomplete */
282             }
283              
284             /* Read signed varint (ZigZag) */
285 846           static int kf_read_varint(const char *buf, const char *end, int64_t *out) {
286             uint64_t uval;
287 846           int n = kf_read_uvarint(buf, end, &uval);
288 846 50         if (n < 0) return n;
289 846           *out = (int64_t)((uval >> 1) ^ -(int64_t)(uval & 1));
290 846           return n;
291             }
292              
293             /* Read Kafka STRING: i16 len + bytes. Returns pointer into buf, sets *len. Returns bytes consumed. */
294 13           static int kf_read_string(const char *buf, const char *end, const char **out, int16_t *slen) {
295 13 50         if (end - buf < 2) return -1;
296 13           int16_t len = kf_read_i16(buf);
297 13 100         if (len < 0) { /* nullable null */
298 1           *out = NULL;
299 1           *slen = 0;
300 1           return 2;
301             }
302 12 50         if (end - buf < 2 + len) return -1;
303 12           *out = buf + 2;
304 12           *slen = len;
305 12           return 2 + len;
306             }
307              
308             /* Read compact string: uvarint(len+1) + bytes */
309 0           static int kf_read_compact_string(const char *buf, const char *end, const char **out, int32_t *slen) {
310             uint64_t raw;
311 0           int n = kf_read_uvarint(buf, end, &raw);
312 0 0         if (n < 0) return -1;
313 0 0         if (raw == 0) {
314 0           *out = NULL;
315 0           *slen = 0;
316 0           return n;
317             }
318 0           int32_t len = (int32_t)(raw - 1);
319 0 0         if (end - buf - n < len) return -1;
320 0           *out = buf + n;
321 0           *slen = len;
322 0           return n + len;
323             }
324              
325             /* Skip tagged fields */
326 0           static int kf_skip_tagged_fields(const char *buf, const char *end) {
327             uint64_t count;
328 0           int n = kf_read_uvarint(buf, end, &count);
329 0 0         if (n < 0) return -1;
330 0           const char *p = buf + n;
331             uint64_t i;
332 0 0         for (i = 0; i < count; i++) {
333             uint64_t tag;
334 0           int tn = kf_read_uvarint(p, end, &tag);
335 0 0         if (tn < 0) return -1;
336 0           p += tn;
337             uint64_t dlen;
338 0           int dn = kf_read_uvarint(p, end, &dlen);
339 0 0         if (dn < 0) return -1;
340 0           p += dn;
341 0 0         if ((uint64_t)(end - p) < dlen) return -1;
342 0           p += dlen;
343             }
344 0           return (int)(p - buf);
345             }
346              
347             /* ================================================================
348             * CRC32C (software implementation)
349             * ================================================================ */
350              
351             static uint32_t crc32c_table[256];
352             static int crc32c_table_inited = 0;
353              
354 21           static void crc32c_init_table(void) {
355             uint32_t i, j;
356 5397 100         for (i = 0; i < 256; i++) {
357 5376           uint32_t crc = i;
358 48384 100         for (j = 0; j < 8; j++) {
359 43008 100         if (crc & 1)
360 21504           crc = (crc >> 1) ^ 0x82F63B78;
361             else
362 21504           crc >>= 1;
363             }
364 5376           crc32c_table[i] = crc;
365             }
366 21           crc32c_table_inited = 1;
367 21           }
368              
369 37           static uint32_t crc32c(const char *buf, size_t len) {
370 37           uint32_t crc = 0xFFFFFFFF;
371             size_t i;
372 37 50         if (!crc32c_table_inited) crc32c_init_table();
373 205109 100         for (i = 0; i < len; i++)
374 205072           crc = crc32c_table[(crc ^ (uint8_t)buf[i]) & 0xFF] ^ (crc >> 8);
375 37           return crc ^ 0xFFFFFFFF;
376             }
377              
378             /* ================================================================
379             * Connection callback struct
380             * ================================================================ */
381              
382             struct ev_kafka_conn_cb_s {
383             SV *cb;
384             ngx_queue_t queue;
385             int32_t correlation_id;
386             int16_t api_key;
387             int16_t api_version;
388             int internal; /* 1 = internal handshake cb, don't invoke Perl */
389             };
390              
391             /* ================================================================
392             * Broker connection struct (Layer 1)
393             * ================================================================ */
394              
395             struct ev_kafka_conn_s {
396             unsigned int magic;
397             struct ev_loop *loop;
398             int fd;
399             int state;
400              
401             /* EV watchers */
402             ev_io rio, wio;
403             ev_timer timer;
404             int reading, writing, timing;
405              
406             /* TLS */
407             #ifdef HAVE_OPENSSL
408             SSL_CTX *ssl_ctx;
409             SSL *ssl;
410             #endif
411             int tls_enabled;
412             char *tls_ca_file;
413             int tls_skip_verify;
414              
415             /* Connection params */
416             char *host;
417             int port;
418              
419             /* SASL */
420             char *sasl_mechanism;
421             char *sasl_username;
422             char *sasl_password;
423              
424             /* SCRAM state */
425             int scram_step;
426             char *scram_nonce;
427             char *scram_client_first;
428             size_t scram_client_first_len;
429             /* For server-final-message signature verification (RFC 5802). */
430             unsigned char scram_server_key[64];
431             int scram_server_key_len;
432             char *scram_auth_message;
433             size_t scram_auth_message_len;
434              
435             /* Buffers */
436             char *rbuf;
437             size_t rbuf_len, rbuf_cap;
438             char *wbuf;
439             size_t wbuf_len, wbuf_off, wbuf_cap;
440              
441             /* Request/response correlation */
442             ngx_queue_t cb_queue;
443             int32_t next_correlation_id;
444             int pending_count;
445              
446             /* Client identity */
447             char *client_id;
448             int client_id_len;
449              
450             /* API version negotiation */
451             int16_t api_versions[API_VERSIONS_MAX_KEY];
452             int api_versions_known;
453              
454             /* Event handlers */
455             SV *on_error;
456             SV *on_connect;
457             SV *on_disconnect;
458              
459             /* Reconnection */
460             int auto_reconnect;
461             int reconnect_delay_ms;
462             ev_timer reconnect_timer;
463             int reconnect_timing;
464             int intentional_disconnect;
465              
466             /* Safety */
467             int callback_depth;
468             };
469              
470             /* ================================================================
471             * Forward declarations
472             * ================================================================ */
473              
474             static void conn_io_cb(EV_P_ ev_io *w, int revents);
475             static void conn_timer_cb(EV_P_ ev_timer *w, int revents);
476             static void conn_reconnect_timer_cb(EV_P_ ev_timer *w, int revents);
477             static void conn_start_reading(ev_kafka_conn_t *self);
478             static void conn_stop_reading(ev_kafka_conn_t *self);
479             static void conn_start_writing(ev_kafka_conn_t *self);
480             static void conn_stop_writing(ev_kafka_conn_t *self);
481             static void conn_emit_error(pTHX_ ev_kafka_conn_t *self, const char *msg);
482             static void conn_cleanup(pTHX_ ev_kafka_conn_t *self);
483             static int conn_check_destroyed(ev_kafka_conn_t *self);
484             static void conn_cancel_pending(pTHX_ ev_kafka_conn_t *self, const char *err);
485             static void conn_on_connect_done(pTHX_ ev_kafka_conn_t *self);
486             static void conn_process_responses(pTHX_ ev_kafka_conn_t *self);
487             static void conn_schedule_reconnect(pTHX_ ev_kafka_conn_t *self);
488             static int32_t conn_send_request(pTHX_ ev_kafka_conn_t *self,
489             int16_t api_key, int16_t api_version, kf_buf_t *body, SV *cb,
490             int internal, int no_response);
491             static void conn_send_api_versions(pTHX_ ev_kafka_conn_t *self);
492             static void conn_start_connect(pTHX_ ev_kafka_conn_t *self,
493             const char *host, int port, double timeout);
494             static void conn_send_sasl_handshake(pTHX_ ev_kafka_conn_t *self);
495             static void conn_parse_sasl_handshake_response(pTHX_ ev_kafka_conn_t *self,
496             const char *data, size_t len);
497             static void conn_send_sasl_authenticate(pTHX_ ev_kafka_conn_t *self);
498             static void conn_parse_sasl_authenticate_response(pTHX_ ev_kafka_conn_t *self,
499             const char *data, size_t len);
500              
501             /* Response parsers */
502             static SV* conn_parse_metadata_response(pTHX_ ev_kafka_conn_t *self,
503             int16_t version, const char *data, size_t len);
504             static SV* conn_parse_produce_response(pTHX_ ev_kafka_conn_t *self,
505             int16_t version, const char *data, size_t len);
506             static SV* conn_parse_fetch_response(pTHX_ ev_kafka_conn_t *self,
507             int16_t version, const char *data, size_t len);
508             static SV* conn_parse_list_offsets_response(pTHX_ ev_kafka_conn_t *self,
509             int16_t version, const char *data, size_t len);
510             static SV* conn_parse_find_coordinator_response(pTHX_ ev_kafka_conn_t *self,
511             int16_t version, const char *data, size_t len);
512             static SV* conn_parse_join_group_response(pTHX_ ev_kafka_conn_t *self,
513             int16_t version, const char *data, size_t len);
514             static SV* conn_parse_sync_group_response(pTHX_ ev_kafka_conn_t *self,
515             int16_t version, const char *data, size_t len);
516             static SV* conn_parse_heartbeat_response(pTHX_ ev_kafka_conn_t *self,
517             int16_t version, const char *data, size_t len);
518             static SV* conn_parse_offset_commit_response(pTHX_ ev_kafka_conn_t *self,
519             int16_t version, const char *data, size_t len);
520             static SV* conn_parse_offset_fetch_response(pTHX_ ev_kafka_conn_t *self,
521             int16_t version, const char *data, size_t len);
522             static SV* conn_parse_leave_group_response(pTHX_ ev_kafka_conn_t *self,
523             int16_t version, const char *data, size_t len);
524             static SV* conn_parse_create_topics_response(pTHX_ ev_kafka_conn_t *self,
525             int16_t version, const char *data, size_t len);
526             static SV* conn_parse_delete_topics_response(pTHX_ ev_kafka_conn_t *self,
527             int16_t version, const char *data, size_t len);
528             static SV* conn_parse_init_producer_id_response(pTHX_ ev_kafka_conn_t *self,
529             int16_t version, const char *data, size_t len);
530             static SV* conn_parse_add_partitions_to_txn_response(pTHX_ ev_kafka_conn_t *self,
531             int16_t version, const char *data, size_t len);
532             static SV* conn_parse_end_txn_response(pTHX_ ev_kafka_conn_t *self,
533             int16_t version, const char *data, size_t len);
534             static SV* conn_parse_txn_offset_commit_response(pTHX_ ev_kafka_conn_t *self,
535             int16_t version, const char *data, size_t len);
536              
537             #ifdef HAVE_OPENSSL
538             static int is_ip_literal(const char *host);
539             #endif
540              
541             /* ================================================================
542             * I/O helpers (with optional TLS)
543             * ================================================================ */
544              
545 1           static ssize_t kf_io_read(ev_kafka_conn_t *self, void *buf, size_t len) {
546             #ifdef HAVE_OPENSSL
547 1 50         if (self->ssl) {
548 0 0         int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;
549 0           ERR_clear_error();
550 0           int ret = SSL_read(self->ssl, buf, ssl_len);
551 0 0         if (ret <= 0) {
552 0           int err = SSL_get_error(self->ssl, ret);
553 0 0         if (err == SSL_ERROR_WANT_READ) {
554 0           errno = EAGAIN;
555 0           return -1;
556             }
557 0 0         if (err == SSL_ERROR_WANT_WRITE) {
558 0           conn_start_writing(self);
559 0           errno = EAGAIN;
560 0           return -1;
561             }
562 0 0         if (err == SSL_ERROR_ZERO_RETURN) return 0;
563 0           ERR_clear_error();
564 0           errno = EIO;
565 0           return -1;
566             }
567 0           return ret;
568             }
569             #endif
570 1           return read(self->fd, buf, len);
571             }
572              
573 1           static ssize_t kf_io_write(ev_kafka_conn_t *self, const void *buf, size_t len) {
574             #ifdef HAVE_OPENSSL
575 1 50         if (self->ssl) {
576 0 0         int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;
577 0           ERR_clear_error();
578 0           int ret = SSL_write(self->ssl, buf, ssl_len);
579 0 0         if (ret <= 0) {
580 0           int err = SSL_get_error(self->ssl, ret);
581 0 0         if (err == SSL_ERROR_WANT_WRITE) {
582 0           errno = EAGAIN;
583 0           return -1;
584             }
585 0 0         if (err == SSL_ERROR_WANT_READ) {
586 0           conn_start_reading(self);
587 0           errno = EAGAIN;
588 0           return -1;
589             }
590 0           ERR_clear_error();
591 0           errno = EIO;
592 0           return -1;
593             }
594 0           return ret;
595             }
596             #endif
597 1           return write(self->fd, buf, len);
598             }
599              
600             /* ================================================================
601             * Buffer management
602             * ================================================================ */
603              
604 1           static void conn_ensure_rbuf(ev_kafka_conn_t *self, size_t need) {
605 1 50         if (self->rbuf_cap >= need) return;
606 0           size_t newcap = self->rbuf_cap * 2;
607 0 0         if (newcap < need) newcap = need;
608 0           Renew(self->rbuf, newcap, char);
609 0           self->rbuf_cap = newcap;
610             }
611              
612 1           static void conn_ensure_wbuf(ev_kafka_conn_t *self, size_t need) {
613 1 50         if (self->wbuf_cap >= need) return;
614 0           size_t newcap = self->wbuf_cap * 2;
615 0 0         if (newcap < need) newcap = need;
616 0           Renew(self->wbuf, newcap, char);
617 0           self->wbuf_cap = newcap;
618             }
619              
620             /* ================================================================
621             * Watcher control
622             * ================================================================ */
623              
624 1           static void conn_start_reading(ev_kafka_conn_t *self) {
625 1 50         if (!self->reading && self->fd >= 0) {
    50          
626 1           ev_io_start(self->loop, &self->rio);
627 1           self->reading = 1;
628             }
629 1           }
630              
631 9           static void conn_stop_reading(ev_kafka_conn_t *self) {
632 9 100         if (self->reading) {
633 1           ev_io_stop(self->loop, &self->rio);
634 1           self->reading = 0;
635             }
636 9           }
637              
638 3           static void conn_start_writing(ev_kafka_conn_t *self) {
639 3 50         if (!self->writing && self->fd >= 0) {
    50          
640 3           ev_io_start(self->loop, &self->wio);
641 3           self->writing = 1;
642             }
643 3           }
644              
645 12           static void conn_stop_writing(ev_kafka_conn_t *self) {
646 12 100         if (self->writing) {
647 3           ev_io_stop(self->loop, &self->wio);
648 3           self->writing = 0;
649             }
650 12           }
651              
652             /* ================================================================
653             * Callback invocation helpers
654             * ================================================================ */
655              
656 1           static void conn_emit_error(pTHX_ ev_kafka_conn_t *self, const char *msg) {
657 1 50         if (!self->on_error)
658 0           croak("EV::Kafka::Conn: %s", msg);
659              
660 1           self->callback_depth++;
661             {
662 1           dSP;
663 1           ENTER;
664 1           SAVETMPS;
665 1 50         PUSHMARK(SP);
666 1 50         XPUSHs(sv_2mortal(newSVpv(msg, 0)));
667 1           PUTBACK;
668 1           call_sv(self->on_error, G_DISCARD | G_EVAL);
669 1 50         if (SvTRUE(ERRSV)) {
    50          
670 0 0         warn("EV::Kafka::Conn: on_error callback error: %s", SvPV_nolen(ERRSV));
671 0 0         sv_setsv(ERRSV, &PL_sv_undef);
672             }
673 1 50         FREETMPS;
674 1           LEAVE;
675             }
676 1           self->callback_depth--;
677 1           }
678              
679 1           static void conn_emit_connect(pTHX_ ev_kafka_conn_t *self) {
680 1 50         if (!self->on_connect) return;
681              
682 1           self->callback_depth++;
683             {
684 1           dSP;
685 1           ENTER;
686 1           SAVETMPS;
687 1 50         PUSHMARK(SP);
688 1           PUTBACK;
689 1           call_sv(self->on_connect, G_DISCARD | G_EVAL);
690 1 50         if (SvTRUE(ERRSV)) {
    50          
691 0 0         warn("EV::Kafka::Conn: on_connect callback error: %s", SvPV_nolen(ERRSV));
692 0 0         sv_setsv(ERRSV, &PL_sv_undef);
693             }
694 1 50         FREETMPS;
695 1           LEAVE;
696             }
697 1           self->callback_depth--;
698             }
699              
700 2           static void conn_emit_disconnect(pTHX_ ev_kafka_conn_t *self) {
701 2 50         if (!self->on_disconnect) return;
702              
703 0           self->callback_depth++;
704             {
705 0           dSP;
706 0           ENTER;
707 0           SAVETMPS;
708 0 0         PUSHMARK(SP);
709 0           PUTBACK;
710 0           call_sv(self->on_disconnect, G_DISCARD | G_EVAL);
711 0 0         if (SvTRUE(ERRSV)) {
    0          
712 0 0         warn("EV::Kafka::Conn: on_disconnect callback error: %s", SvPV_nolen(ERRSV));
713 0 0         sv_setsv(ERRSV, &PL_sv_undef);
714             }
715 0 0         FREETMPS;
716 0           LEAVE;
717             }
718 0           self->callback_depth--;
719             }
720              
721             /* Invoke a command callback: cb->(result, error) */
722 0           static void conn_invoke_cb(pTHX_ ev_kafka_conn_t *self, SV *cb, SV *result, SV *error) {
723 0 0         if (!cb) return;
724              
725 0           self->callback_depth++;
726             {
727 0           dSP;
728 0           ENTER;
729 0           SAVETMPS;
730 0 0         PUSHMARK(SP);
731 0 0         EXTEND(SP, 2);
732 0 0         PUSHs(result ? result : &PL_sv_undef);
733 0 0         PUSHs(error ? error : &PL_sv_undef);
734 0           PUTBACK;
735 0           call_sv(cb, G_DISCARD | G_EVAL);
736 0 0         if (SvTRUE(ERRSV)) {
    0          
737 0 0         warn("EV::Kafka::Conn: callback error: %s", SvPV_nolen(ERRSV));
738 0 0         sv_setsv(ERRSV, &PL_sv_undef);
739             }
740 0 0         FREETMPS;
741 0           LEAVE;
742             }
743 0           self->callback_depth--;
744             }
745              
746 6           static int conn_check_destroyed(ev_kafka_conn_t *self) {
747 6           return self->magic != KF_MAGIC_ALIVE;
748             }
749              
750             /* ================================================================
751             * Connection cleanup
752             * ================================================================ */
753              
754 9           static void conn_cleanup(pTHX_ ev_kafka_conn_t *self) {
755 9           conn_stop_reading(self);
756 9           conn_stop_writing(self);
757 9 50         if (self->timing) {
758 0           ev_timer_stop(self->loop, &self->timer);
759 0           self->timing = 0;
760             }
761             #ifdef HAVE_OPENSSL
762 9 50         if (self->ssl) {
763 0           SSL_free(self->ssl);
764 0           self->ssl = NULL;
765             }
766 9 50         if (self->ssl_ctx) {
767 0           SSL_CTX_free(self->ssl_ctx);
768 0           self->ssl_ctx = NULL;
769             }
770             #endif
771 9 100         if (self->fd >= 0) {
772 2           close(self->fd);
773 2           self->fd = -1;
774             }
775 9           self->state = CONN_DISCONNECTED;
776 9           }
777              
778 9           static void conn_cancel_pending(pTHX_ ev_kafka_conn_t *self, const char *err) {
779 9           int in_destruct = KF_IN_GLOBAL_DESTRUCT();
780 9 50         SV *err_sv = in_destruct ? NULL : newSVpv(err, 0);
781             ngx_queue_t *q;
782              
783 9 50         while (!ngx_queue_empty(&self->cb_queue)) {
784 0           q = ngx_queue_head(&self->cb_queue);
785 0           ngx_queue_remove(q);
786 0           ev_kafka_conn_cb_t *cbt = ngx_queue_data(q, ev_kafka_conn_cb_t, queue);
787 0           self->pending_count--;
788 0 0         if (cbt->cb && !cbt->internal && !in_destruct) {
    0          
    0          
789 0           conn_invoke_cb(aTHX_ self, cbt->cb, NULL, sv_2mortal(newSVsv(err_sv)));
790 0 0         if (conn_check_destroyed(self)) {
791 0           SvREFCNT_dec(cbt->cb);
792 0           Safefree(cbt);
793 0           SvREFCNT_dec(err_sv);
794 0           return;
795             }
796             }
797 0 0         if (cbt->cb) SvREFCNT_dec(cbt->cb);
798 0           Safefree(cbt);
799             }
800 9 50         if (err_sv) SvREFCNT_dec(err_sv);
801             }
802              
803 2           static void conn_handle_disconnect(pTHX_ ev_kafka_conn_t *self, const char *reason) {
804 2           conn_cleanup(aTHX_ self);
805              
806 2           conn_emit_disconnect(aTHX_ self);
807 2 50         if (conn_check_destroyed(self)) return;
808              
809 2           conn_cancel_pending(aTHX_ self, reason);
810 2 50         if (conn_check_destroyed(self)) return;
811              
812 2 100         if (!self->intentional_disconnect && self->auto_reconnect) {
    50          
813 0           conn_schedule_reconnect(aTHX_ self);
814             }
815             }
816              
817             /* ================================================================
818             * Reconnection
819             * ================================================================ */
820              
821 0           static void conn_reconnect_timer_cb(EV_P_ ev_timer *w, int revents) {
822 0           ev_kafka_conn_t *self = (ev_kafka_conn_t *)((char *)w - offsetof(ev_kafka_conn_t, reconnect_timer));
823             dTHX;
824             (void)loop;
825             (void)revents;
826              
827 0           self->reconnect_timing = 0;
828 0 0         if (self->magic != KF_MAGIC_ALIVE) return;
829 0 0         if (self->state != CONN_DISCONNECTED) return;
830 0 0         if (!self->host) return;
831              
832 0           conn_start_connect(aTHX_ self, self->host, self->port, 10.0);
833             }
834              
835 0           static void conn_schedule_reconnect(pTHX_ ev_kafka_conn_t *self) {
836 0 0         if (self->reconnect_timing) return;
837 0           double delay = self->reconnect_delay_ms / 1000.0;
838 0 0         if (delay < 0.01) delay = 1.0;
839 0           ev_timer_init(&self->reconnect_timer, conn_reconnect_timer_cb, delay, 0.0);
840 0           ev_timer_start(self->loop, &self->reconnect_timer);
841 0           self->reconnect_timing = 1;
842             }
843              
844             /* ================================================================
845             * Request framing
846             * ================================================================ */
847              
848             /* Build request header + body, append to wbuf, enqueue callback.
849             * For api_version >= threshold, uses compact header (v2).
850             * ApiVersions v3+ uses flexible (v1) request header.
851             * Returns correlation_id.
852             */
853 1           static int32_t conn_send_request(pTHX_ ev_kafka_conn_t *self,
854             int16_t api_key, int16_t api_version, kf_buf_t *body, SV *cb,
855             int internal, int no_response)
856             {
857 1           int32_t corr_id = self->next_correlation_id++;
858             kf_buf_t hdr;
859 1           kf_buf_init(&hdr);
860              
861             /* Request header v1 (non-flexible): api_key, api_version, correlation_id, client_id */
862             /* Request header v2 (flexible): same + tagged_fields */
863             /* ApiVersions v3+ uses header v2 (flexible) */
864 1 50         int flexible = (api_key == API_API_VERSIONS && api_version >= 3);
    50          
865              
866 1           kf_buf_append_i16(&hdr, api_key);
867 1           kf_buf_append_i16(&hdr, api_version);
868 1           kf_buf_append_i32(&hdr, corr_id);
869              
870 1 50         if (flexible) {
871             /* compact string for client_id */
872 0           kf_buf_append_compact_string(&hdr, self->client_id, self->client_id_len);
873 0           kf_buf_append_tagged_fields(&hdr);
874             } else {
875 1           kf_buf_append_nullable_string(&hdr, self->client_id, self->client_id_len);
876             }
877              
878             /* Total size = header + body */
879 1           size_t raw_size = hdr.len + body->len;
880 1 50         if (raw_size > (size_t)INT32_MAX) {
881 0           kf_buf_free(&hdr);
882 0           croak("request too large");
883             }
884 1           int32_t total_size = (int32_t)raw_size;
885              
886             /* Compact wbuf if sent prefix wastes significant space */
887 1 50         if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) {
    0          
888 0           self->wbuf_len -= self->wbuf_off;
889 0 0         if (self->wbuf_len > 0)
890 0           memmove(self->wbuf, self->wbuf + self->wbuf_off, self->wbuf_len);
891 0           self->wbuf_off = 0;
892             }
893              
894             /* Append to wbuf: [size:i32][header][body] */
895 1           conn_ensure_wbuf(self, self->wbuf_len + 4 + total_size);
896             {
897 1           uint32_t sz = htonl((uint32_t)total_size);
898 1           memcpy(self->wbuf + self->wbuf_len, &sz, 4);
899 1           self->wbuf_len += 4;
900             }
901 1           Copy(hdr.data, self->wbuf + self->wbuf_len, hdr.len, char);
902 1           self->wbuf_len += hdr.len;
903 1           Copy(body->data, self->wbuf + self->wbuf_len, body->len, char);
904 1           self->wbuf_len += body->len;
905              
906 1           kf_buf_free(&hdr);
907              
908             /* Enqueue callback (skip for no-response requests like acks=0) */
909 1 50         if (!no_response) {
910             ev_kafka_conn_cb_t *cbt;
911 1           Newxz(cbt, 1, ev_kafka_conn_cb_t);
912 1           cbt->correlation_id = corr_id;
913 1           cbt->api_key = api_key;
914 1           cbt->api_version = api_version;
915 1           cbt->internal = internal;
916 1 50         if (cb) {
917 0           cbt->cb = cb;
918 0           SvREFCNT_inc(cb);
919             }
920 1           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
921 1           self->pending_count++;
922             }
923              
924 1           conn_start_writing(self);
925              
926 1           return corr_id;
927             }
928              
929             /* ================================================================
930             * ApiVersions (API 18)
931             * ================================================================ */
932              
933 1           static void conn_send_api_versions(pTHX_ ev_kafka_conn_t *self) {
934             kf_buf_t body;
935 1           kf_buf_init(&body);
936              
937             /* ApiVersions v0: empty body, most compatible */
938 1           self->state = CONN_API_VERSIONS;
939 1           conn_send_request(aTHX_ self, API_API_VERSIONS, 0, &body, NULL, 1, 0);
940 1           kf_buf_free(&body);
941 1           }
942              
943 1           static void conn_parse_api_versions_response(pTHX_ ev_kafka_conn_t *self,
944             const char *data, size_t len)
945             {
946 1           const char *p = data;
947 1           const char *end = data + len;
948             int i;
949              
950             /* Initialize all as unsupported */
951 65 100         for (i = 0; i < API_VERSIONS_MAX_KEY; i++)
952 64           self->api_versions[i] = -1;
953              
954 1 50         if (end - p < 2) goto err;
955 1           int16_t error_code = kf_read_i16(p); p += 2;
956 1 50         if (error_code != 0) {
957             char errbuf[128];
958 0           snprintf(errbuf, sizeof(errbuf), "ApiVersions error: %d", error_code);
959 0           conn_emit_error(aTHX_ self, errbuf);
960 0 0         if (conn_check_destroyed(self)) return;
961 0           conn_handle_disconnect(aTHX_ self, errbuf);
962 0           return;
963             }
964              
965             /* v0 response: array(api_key:i16, min_version:i16, max_version:i16) */
966 1 50         if (end - p < 4) goto err;
967 1           int32_t count = kf_read_i32(p); p += 4;
968              
969 3 100         for (i = 0; i < count; i++) {
970 2 50         if (end - p < 6) goto err;
971 2           int16_t key = kf_read_i16(p); p += 2;
972 2           /* int16_t min_version = kf_read_i16(p); */ p += 2;
973 2           int16_t max_version = kf_read_i16(p); p += 2;
974              
975 2 50         if (key >= 0 && key < API_VERSIONS_MAX_KEY)
    50          
976 2           self->api_versions[key] = max_version;
977             }
978              
979 1           self->api_versions_known = 1;
980              
981             /* Handshake complete (or continue to SASL) */
982 1 50         if (self->sasl_mechanism) {
983 0           self->state = CONN_SASL_HANDSHAKE;
984 0           conn_send_sasl_handshake(aTHX_ self);
985             } else {
986 1           self->state = CONN_READY;
987 1           conn_emit_connect(aTHX_ self);
988             }
989 1           return;
990              
991 0           err:
992 0           conn_emit_error(aTHX_ self, "malformed ApiVersions response");
993 0 0         if (conn_check_destroyed(self)) return;
994 0           conn_handle_disconnect(aTHX_ self, "malformed ApiVersions response");
995             }
996              
997             /* ================================================================
998             * SASL Handshake (API 17) + Authenticate (API 36)
999             * ================================================================ */
1000              
1001 0           static void conn_send_sasl_handshake(pTHX_ ev_kafka_conn_t *self) {
1002             kf_buf_t body;
1003 0           kf_buf_init(&body);
1004              
1005             /* SaslHandshake v1: mechanism (STRING) */
1006 0           STRLEN mech_len = strlen(self->sasl_mechanism);
1007 0           kf_buf_append_string(&body, self->sasl_mechanism, (int16_t)mech_len);
1008              
1009 0           self->state = CONN_SASL_HANDSHAKE;
1010 0           conn_send_request(aTHX_ self, API_SASL_HANDSHAKE, 1, &body, NULL, 1, 0);
1011 0           kf_buf_free(&body);
1012 0           }
1013              
1014 0           static void conn_parse_sasl_handshake_response(pTHX_ ev_kafka_conn_t *self,
1015             const char *data, size_t len)
1016             {
1017 0           const char *p = data;
1018 0           const char *end = data + len;
1019              
1020 0 0         if (end - p < 2) goto err;
1021 0           int16_t error_code = kf_read_i16(p); p += 2;
1022              
1023             /* skip mechanisms array */
1024 0 0         if (end - p < 4) goto err;
1025 0           int32_t count = kf_read_i32(p); p += 4;
1026             int32_t i;
1027 0 0         for (i = 0; i < count; i++) {
1028             const char *s; int16_t slen;
1029 0           int n = kf_read_string(p, end, &s, &slen);
1030 0 0         if (n < 0) goto err;
1031 0           p += n;
1032             }
1033              
1034 0 0         if (error_code != 0) {
1035 0           conn_emit_error(aTHX_ self, "SASL handshake failed: mechanism not supported");
1036 0 0         if (conn_check_destroyed(self)) return;
1037 0           conn_handle_disconnect(aTHX_ self, "SASL handshake failed");
1038 0           return;
1039             }
1040              
1041             /* Proceed to authenticate */
1042 0           conn_send_sasl_authenticate(aTHX_ self);
1043 0           return;
1044              
1045 0           err:
1046 0           conn_emit_error(aTHX_ self, "malformed SaslHandshake response");
1047 0 0         if (conn_check_destroyed(self)) return;
1048 0           conn_handle_disconnect(aTHX_ self, "malformed SaslHandshake response");
1049             }
1050              
1051             /* SCRAM state for multi-step SASL */
1052             #define SCRAM_STEP_CLIENT_FIRST 0
1053             #define SCRAM_STEP_CLIENT_FINAL 1
1054             #define SCRAM_STEP_DONE 2
1055              
1056 0           static void conn_send_sasl_authenticate(pTHX_ ev_kafka_conn_t *self) {
1057             kf_buf_t body;
1058 0           kf_buf_init(&body);
1059              
1060 0 0         if (self->sasl_mechanism && strcmp(self->sasl_mechanism, "PLAIN") == 0) {
    0          
1061             /* PLAIN: \0username\0password */
1062 0 0         STRLEN ulen = self->sasl_username ? strlen(self->sasl_username) : 0;
1063 0 0         STRLEN plen = self->sasl_password ? strlen(self->sasl_password) : 0;
1064 0           int32_t auth_len = 1 + (int32_t)ulen + 1 + (int32_t)plen;
1065              
1066 0           kf_buf_append_i32(&body, auth_len);
1067 0           kf_buf_append_i8(&body, 0);
1068 0 0         if (ulen > 0) kf_buf_append(&body, self->sasl_username, ulen);
1069 0           kf_buf_append_i8(&body, 0);
1070 0 0         if (plen > 0) kf_buf_append(&body, self->sasl_password, plen);
1071             }
1072             #ifdef HAVE_OPENSSL
1073 0 0         else if (self->sasl_mechanism &&
1074 0 0         (strcmp(self->sasl_mechanism, "SCRAM-SHA-256") == 0 ||
1075 0 0         strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0)) {
1076             /* SCRAM client-first-message: n,,n=,r= */
1077             char nonce[33];
1078             {
1079             unsigned char rnd[16];
1080             int i;
1081 0           RAND_bytes(rnd, 16);
1082 0 0         for (i = 0; i < 16; i++)
1083 0           snprintf(nonce + i*2, 3, "%02x", rnd[i]);
1084 0           nonce[32] = '\0';
1085             }
1086              
1087             /* save nonce for later steps */
1088 0 0         if (self->scram_nonce) Safefree(self->scram_nonce);
1089 0           self->scram_nonce = savepv(nonce);
1090 0           self->scram_step = SCRAM_STEP_CLIENT_FIRST;
1091              
1092 0 0         if (!self->sasl_username) {
1093 0           conn_emit_error(aTHX_ self, "SCRAM: username required");
1094 0           kf_buf_free(&body);
1095 0           return;
1096             }
1097             kf_buf_t msg;
1098 0           kf_buf_init(&msg);
1099 0           kf_buf_append(&msg, "n,,n=", 5);
1100 0           kf_buf_append(&msg, self->sasl_username, strlen(self->sasl_username));
1101 0           kf_buf_append(&msg, ",r=", 3);
1102 0           kf_buf_append(&msg, nonce, 32);
1103              
1104             /* save client-first-message-bare for later binding */
1105 0 0         if (self->scram_client_first) Safefree(self->scram_client_first);
1106             /* bare = after "n,," */
1107 0           self->scram_client_first = savepvn(msg.data + 3, msg.len - 3);
1108 0           self->scram_client_first_len = msg.len - 3;
1109              
1110 0           kf_buf_append_i32(&body, (int32_t)msg.len);
1111 0           kf_buf_append(&body, msg.data, msg.len);
1112 0           kf_buf_free(&msg);
1113             }
1114             #endif
1115             else {
1116 0           conn_emit_error(aTHX_ self, "unsupported SASL mechanism");
1117 0           kf_buf_free(&body);
1118 0           return;
1119             }
1120              
1121 0           self->state = CONN_SASL_AUTH;
1122             {
1123 0           int16_t ver = self->api_versions[API_SASL_AUTHENTICATE];
1124 0 0         if (ver < 0) ver = 1;
1125 0 0         if (ver > 2) ver = 2;
1126 0           conn_send_request(aTHX_ self, API_SASL_AUTHENTICATE, ver, &body, NULL, 1, 0);
1127             }
1128 0           kf_buf_free(&body);
1129             }
1130              
1131 0           static void conn_parse_sasl_authenticate_response(pTHX_ ev_kafka_conn_t *self,
1132             const char *data, size_t len)
1133             {
1134 0           const char *p = data;
1135 0           const char *end = data + len;
1136              
1137 0 0         if (end - p < 2) goto err;
1138 0           int16_t error_code = kf_read_i16(p); p += 2;
1139              
1140 0           const char *errmsg_str = NULL;
1141 0           int16_t errmsg_len = 0;
1142             {
1143 0           int n = kf_read_string(p, end, &errmsg_str, &errmsg_len);
1144 0 0         if (n < 0) goto err;
1145 0           p += n;
1146             }
1147              
1148             /* auth_bytes */
1149 0           const char *auth_data = NULL;
1150 0           int32_t auth_data_len = 0;
1151 0 0         if (end - p >= 4) {
1152 0           auth_data_len = kf_read_i32(p); p += 4;
1153 0 0         if (auth_data_len > 0 && end - p >= auth_data_len) {
    0          
1154 0           auth_data = p;
1155 0           p += auth_data_len;
1156             }
1157             }
1158              
1159 0 0         if (error_code != 0) {
1160             char errbuf[512];
1161 0 0         if (errmsg_str && errmsg_len > 0)
    0          
1162 0           snprintf(errbuf, sizeof(errbuf), "SASL auth failed: %.*s", (int)errmsg_len, errmsg_str);
1163             else
1164 0           snprintf(errbuf, sizeof(errbuf), "SASL auth failed: error %d", error_code);
1165 0           conn_emit_error(aTHX_ self, errbuf);
1166 0 0         if (conn_check_destroyed(self)) return;
1167 0           conn_handle_disconnect(aTHX_ self, "SASL auth failed");
1168 0           return;
1169             }
1170              
1171             #ifdef HAVE_OPENSSL
1172             /* SCRAM multi-step handling */
1173 0 0         if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) {
    0          
    0          
1174             /* Server-first-message: r=,s=,i= */
1175             /* Parse server response, compute proof, send client-final */
1176 0           const char *server_nonce = NULL;
1177 0           size_t server_nonce_len = 0;
1178 0           const char *salt_b64 = NULL;
1179 0           size_t salt_b64_len = 0;
1180 0           int iterations = 0;
1181             {
1182 0           const char *sp = auth_data;
1183 0           const char *se = auth_data + auth_data_len;
1184 0 0         while (sp < se) {
1185 0 0         if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') {
    0          
    0          
1186 0           sp += 2; server_nonce = sp;
1187 0 0         while (sp < se && *sp != ',') sp++;
    0          
1188 0           server_nonce_len = sp - server_nonce;
1189 0 0         } else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') {
    0          
    0          
1190 0           sp += 2; salt_b64 = sp;
1191 0 0         while (sp < se && *sp != ',') sp++;
    0          
1192 0           salt_b64_len = sp - salt_b64;
1193 0 0         } else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') {
    0          
    0          
1194 0           sp += 2;
1195 0           iterations = atoi(sp);
1196 0 0         while (sp < se && *sp != ',') sp++;
    0          
1197             }
1198 0 0         if (sp < se && *sp == ',') sp++;
    0          
1199 0           else sp++;
1200             }
1201             }
1202              
1203 0 0         if (!server_nonce || !salt_b64 || iterations <= 0) {
    0          
    0          
1204 0           conn_emit_error(aTHX_ self, "SCRAM: malformed server-first-message");
1205 0 0         if (conn_check_destroyed(self)) return;
1206 0           conn_handle_disconnect(aTHX_ self, "SCRAM auth failed");
1207 0           return;
1208             }
1209              
1210             /* RFC 5802: server nonce must start with client nonce */
1211 0 0         if (server_nonce_len < 32 ||
1212 0 0         memcmp(server_nonce, self->scram_nonce, 32) != 0) {
1213 0           conn_emit_error(aTHX_ self, "SCRAM: server nonce mismatch");
1214 0 0         if (conn_check_destroyed(self)) return;
1215 0           conn_handle_disconnect(aTHX_ self, "SCRAM auth failed");
1216 0           return;
1217             }
1218              
1219 0           int is_sha512 = (strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0);
1220 0 0         const EVP_MD *md = is_sha512 ? EVP_sha512() : EVP_sha256();
1221 0 0         int digest_len = is_sha512 ? 64 : 32;
1222              
1223             /* Decode salt from base64 */
1224             unsigned char salt[128];
1225             int salt_len;
1226             {
1227 0           BIO *b64 = BIO_new(BIO_f_base64());
1228 0           BIO *bmem = BIO_new_mem_buf(salt_b64, (int)salt_b64_len);
1229 0           bmem = BIO_push(b64, bmem);
1230 0           BIO_set_flags(bmem, BIO_FLAGS_BASE64_NO_NL);
1231 0           salt_len = BIO_read(bmem, salt, sizeof(salt));
1232 0           BIO_free_all(bmem);
1233 0 0         if (salt_len <= 0) {
1234 0           conn_emit_error(aTHX_ self, "SCRAM: bad salt");
1235 0 0         if (conn_check_destroyed(self)) return;
1236 0           conn_handle_disconnect(aTHX_ self, "SCRAM auth failed");
1237 0           return;
1238             }
1239             }
1240              
1241             /* SaltedPassword = Hi(password, salt, iterations) using PBKDF2 */
1242             unsigned char salted_password[64];
1243 0           PKCS5_PBKDF2_HMAC(self->sasl_password, strlen(self->sasl_password),
1244             salt, salt_len, iterations, md, digest_len, salted_password);
1245              
1246             /* ClientKey = HMAC(SaltedPassword, "Client Key") */
1247             unsigned char client_key[64];
1248 0           unsigned int ck_len = digest_len;
1249 0           HMAC(md, salted_password, digest_len,
1250             (unsigned char *)"Client Key", 10, client_key, &ck_len);
1251              
1252             /* ServerKey = HMAC(SaltedPassword, "Server Key") — saved for
1253             * server-final-message verification. */
1254 0           unsigned int sk_hmac_len = digest_len;
1255 0           HMAC(md, salted_password, digest_len,
1256             (unsigned char *)"Server Key", 10,
1257 0           self->scram_server_key, &sk_hmac_len);
1258 0           self->scram_server_key_len = (int)sk_hmac_len;
1259              
1260             /* StoredKey = H(ClientKey) */
1261             unsigned char stored_key[64];
1262             {
1263 0           EVP_MD_CTX *ctx = EVP_MD_CTX_new();
1264             unsigned int sk_len;
1265 0           EVP_DigestInit_ex(ctx, md, NULL);
1266 0           EVP_DigestUpdate(ctx, client_key, digest_len);
1267 0           EVP_DigestFinal_ex(ctx, stored_key, &sk_len);
1268 0           EVP_MD_CTX_free(ctx);
1269             }
1270              
1271             /* AuthMessage = client-first-bare + "," + server-first + "," + client-final-without-proof */
1272 0           char channel_binding_b64[] = "biws"; /* base64("n,,") */
1273             kf_buf_t auth_msg;
1274 0           kf_buf_init(&auth_msg);
1275 0           kf_buf_append(&auth_msg, self->scram_client_first, self->scram_client_first_len);
1276 0           kf_buf_append(&auth_msg, ",", 1);
1277 0           kf_buf_append(&auth_msg, auth_data, auth_data_len);
1278 0           kf_buf_append(&auth_msg, ",c=", 3);
1279 0           kf_buf_append(&auth_msg, channel_binding_b64, 4);
1280 0           kf_buf_append(&auth_msg, ",r=", 3);
1281 0           kf_buf_append(&auth_msg, server_nonce, server_nonce_len);
1282              
1283             /* ClientSignature = HMAC(StoredKey, AuthMessage) */
1284             unsigned char client_sig[64];
1285 0           unsigned int cs_len = digest_len;
1286 0           HMAC(md, stored_key, digest_len,
1287 0           (unsigned char *)auth_msg.data, auth_msg.len, client_sig, &cs_len);
1288              
1289             /* ClientProof = ClientKey XOR ClientSignature */
1290             unsigned char proof[64];
1291             int di;
1292 0 0         for (di = 0; di < digest_len; di++)
1293 0           proof[di] = client_key[di] ^ client_sig[di];
1294              
1295             /* Save AuthMessage for server-signature verification at step 2. */
1296 0 0         if (self->scram_auth_message) Safefree(self->scram_auth_message);
1297 0           self->scram_auth_message = savepvn(auth_msg.data, auth_msg.len);
1298 0           self->scram_auth_message_len = auth_msg.len;
1299              
1300 0           kf_buf_free(&auth_msg);
1301              
1302             /* Base64 encode proof */
1303             char proof_b64[256];
1304             {
1305 0           BIO *b64 = BIO_new(BIO_f_base64());
1306 0           BIO *bmem = BIO_new(BIO_s_mem());
1307 0           b64 = BIO_push(b64, bmem);
1308 0           BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
1309 0           BIO_write(b64, proof, digest_len);
1310 0           BIO_flush(b64);
1311             BUF_MEM *bptr;
1312 0           BIO_get_mem_ptr(b64, &bptr);
1313 0 0         int plen = bptr->length < 255 ? (int)bptr->length : 255;
1314 0           memcpy(proof_b64, bptr->data, plen);
1315 0           proof_b64[plen] = '\0';
1316 0           BIO_free_all(b64);
1317             }
1318              
1319             /* Build client-final-message: c=biws,r=,p= */
1320             kf_buf_t final_msg;
1321 0           kf_buf_init(&final_msg);
1322 0           kf_buf_append(&final_msg, "c=", 2);
1323 0           kf_buf_append(&final_msg, channel_binding_b64, 4);
1324 0           kf_buf_append(&final_msg, ",r=", 3);
1325 0           kf_buf_append(&final_msg, server_nonce, server_nonce_len);
1326 0           kf_buf_append(&final_msg, ",p=", 3);
1327 0           kf_buf_append(&final_msg, proof_b64, strlen(proof_b64));
1328              
1329             /* Send client-final via SaslAuthenticate */
1330             kf_buf_t body;
1331 0           kf_buf_init(&body);
1332 0           kf_buf_append_i32(&body, (int32_t)final_msg.len);
1333 0           kf_buf_append(&body, final_msg.data, final_msg.len);
1334              
1335 0           self->scram_step = SCRAM_STEP_CLIENT_FINAL;
1336             {
1337 0           int16_t ver = self->api_versions[API_SASL_AUTHENTICATE];
1338 0 0         if (ver < 0) ver = 1;
1339 0 0         if (ver > 2) ver = 2;
1340 0           conn_send_request(aTHX_ self, API_SASL_AUTHENTICATE, ver, &body, NULL, 1, 0);
1341             }
1342 0           kf_buf_free(&body);
1343 0           kf_buf_free(&final_msg);
1344 0           return;
1345             }
1346              
1347 0 0         if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FINAL) {
    0          
1348             /* Server-final-message: v=
1349             * ServerSignature = HMAC(ServerKey, AuthMessage) — RFC 5802. */
1350 0 0         if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') {
    0          
    0          
    0          
1351 0           conn_emit_error(aTHX_ self, "SCRAM: missing server signature");
1352 0 0         if (conn_check_destroyed(self)) return;
1353 0           conn_handle_disconnect(aTHX_ self, "SCRAM verify failed");
1354 0           return;
1355             }
1356 0           const char *recv_b64 = auth_data + 2;
1357 0           int recv_b64_len = (int)(auth_data_len - 2);
1358              
1359             /* Compute expected server signature. */
1360 0           const EVP_MD *md = NULL;
1361 0 0         if (strcmp(self->sasl_mechanism, "SCRAM-SHA-256") == 0) md = EVP_sha256();
1362 0 0         else if (strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0) md = EVP_sha512();
1363 0 0         if (!md || self->scram_server_key_len <= 0 || !self->scram_auth_message) {
    0          
    0          
1364 0           conn_emit_error(aTHX_ self, "SCRAM: missing state for verification");
1365 0 0         if (conn_check_destroyed(self)) return;
1366 0           conn_handle_disconnect(aTHX_ self, "SCRAM verify failed");
1367 0           return;
1368             }
1369             unsigned char expected[64];
1370 0           unsigned int expected_len = (unsigned int)self->scram_server_key_len;
1371 0           HMAC(md, self->scram_server_key, self->scram_server_key_len,
1372 0           (unsigned char *)self->scram_auth_message,
1373             self->scram_auth_message_len, expected, &expected_len);
1374              
1375             /* Decode the received base64 signature. */
1376             unsigned char recv_sig[64];
1377 0           int recv_sig_len = 0;
1378             {
1379 0           BIO *b64 = BIO_new(BIO_f_base64());
1380 0           BIO *bmem = BIO_new_mem_buf(recv_b64, recv_b64_len);
1381 0           BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
1382 0           b64 = BIO_push(b64, bmem);
1383 0           recv_sig_len = BIO_read(b64, recv_sig, (int)sizeof(recv_sig));
1384 0           BIO_free_all(b64);
1385             }
1386              
1387 0           int ok = (recv_sig_len == (int)expected_len)
1388 0 0         && (CRYPTO_memcmp(recv_sig, expected, expected_len) == 0);
    0          
1389              
1390             /* Wipe ServerKey/AuthMessage now that verification is done. */
1391 0           OPENSSL_cleanse(self->scram_server_key, sizeof(self->scram_server_key));
1392 0           self->scram_server_key_len = 0;
1393 0 0         if (self->scram_auth_message) {
1394 0           OPENSSL_cleanse(self->scram_auth_message,
1395             self->scram_auth_message_len);
1396 0           Safefree(self->scram_auth_message);
1397 0           self->scram_auth_message = NULL;
1398 0           self->scram_auth_message_len = 0;
1399             }
1400              
1401 0 0         if (!ok) {
1402 0           conn_emit_error(aTHX_ self, "SCRAM: server signature mismatch");
1403 0 0         if (conn_check_destroyed(self)) return;
1404 0           conn_handle_disconnect(aTHX_ self, "SCRAM verify failed");
1405 0           return;
1406             }
1407 0           self->scram_step = SCRAM_STEP_DONE;
1408             /* fall through to CONN_READY */
1409             }
1410             #endif
1411              
1412             /* Auth success — connection is ready */
1413 0           self->state = CONN_READY;
1414 0           conn_emit_connect(aTHX_ self);
1415 0           return;
1416              
1417 0           err:
1418 0           conn_emit_error(aTHX_ self, "malformed SaslAuthenticate response");
1419 0 0         if (conn_check_destroyed(self)) return;
1420 0           conn_handle_disconnect(aTHX_ self, "malformed SaslAuthenticate response");
1421             }
1422              
1423             /* ================================================================
1424             * Response dispatch
1425             * ================================================================ */
1426              
1427 1           static void conn_dispatch_response(pTHX_ ev_kafka_conn_t *self,
1428             ev_kafka_conn_cb_t *cbt, const char *data, size_t len)
1429             {
1430             /* Internal handshake responses */
1431 1 50         if (cbt->internal) {
1432 1           switch (cbt->api_key) {
1433 1           case API_API_VERSIONS:
1434 1           conn_parse_api_versions_response(aTHX_ self, data, len);
1435 1           break;
1436 0           case API_SASL_HANDSHAKE:
1437 0           conn_parse_sasl_handshake_response(aTHX_ self, data, len);
1438 0           break;
1439 0           case API_SASL_AUTHENTICATE:
1440 0           conn_parse_sasl_authenticate_response(aTHX_ self, data, len);
1441 0           break;
1442 0           default: break;
1443             }
1444 1           return;
1445             }
1446              
1447             /* User-facing responses: parse and invoke callback */
1448 0 0         if (!cbt->cb) return;
1449              
1450 0           switch (cbt->api_key) {
1451 0           case API_METADATA: {
1452             /* Parse metadata response and return as Perl hash */
1453 0           SV *result = conn_parse_metadata_response(aTHX_ self, cbt->api_version, data, len);
1454 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1455 0           break;
1456             }
1457 0           case API_PRODUCE: {
1458 0           SV *result = conn_parse_produce_response(aTHX_ self, cbt->api_version, data, len);
1459 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1460 0           break;
1461             }
1462 0           case API_FETCH: {
1463 0           SV *result = conn_parse_fetch_response(aTHX_ self, cbt->api_version, data, len);
1464 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1465 0           break;
1466             }
1467 0           case API_LIST_OFFSETS: {
1468 0           SV *result = conn_parse_list_offsets_response(aTHX_ self, cbt->api_version, data, len);
1469 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1470 0           break;
1471             }
1472 0           case API_FIND_COORDINATOR: {
1473 0           SV *result = conn_parse_find_coordinator_response(aTHX_ self, cbt->api_version, data, len);
1474 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1475 0           break;
1476             }
1477 0           case API_JOIN_GROUP: {
1478 0           SV *result = conn_parse_join_group_response(aTHX_ self, cbt->api_version, data, len);
1479 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1480 0           break;
1481             }
1482 0           case API_SYNC_GROUP: {
1483 0           SV *result = conn_parse_sync_group_response(aTHX_ self, cbt->api_version, data, len);
1484 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1485 0           break;
1486             }
1487 0           case API_HEARTBEAT: {
1488 0           SV *result = conn_parse_heartbeat_response(aTHX_ self, cbt->api_version, data, len);
1489 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1490 0           break;
1491             }
1492 0           case API_OFFSET_COMMIT: {
1493 0           SV *result = conn_parse_offset_commit_response(aTHX_ self, cbt->api_version, data, len);
1494 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1495 0           break;
1496             }
1497 0           case API_OFFSET_FETCH: {
1498 0           SV *result = conn_parse_offset_fetch_response(aTHX_ self, cbt->api_version, data, len);
1499 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1500 0           break;
1501             }
1502 0           case API_LEAVE_GROUP: {
1503 0           SV *result = conn_parse_leave_group_response(aTHX_ self, cbt->api_version, data, len);
1504 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1505 0           break;
1506             }
1507 0           case API_CREATE_TOPICS: {
1508 0           SV *result = conn_parse_create_topics_response(aTHX_ self, cbt->api_version, data, len);
1509 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1510 0           break;
1511             }
1512 0           case API_DELETE_TOPICS: {
1513 0           SV *result = conn_parse_delete_topics_response(aTHX_ self, cbt->api_version, data, len);
1514 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1515 0           break;
1516             }
1517 0           case API_INIT_PRODUCER_ID: {
1518 0           SV *result = conn_parse_init_producer_id_response(aTHX_ self, cbt->api_version, data, len);
1519 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1520 0           break;
1521             }
1522 0           case API_ADD_PARTITIONS_TXN: {
1523 0           SV *result = conn_parse_add_partitions_to_txn_response(aTHX_ self, cbt->api_version, data, len);
1524 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1525 0           break;
1526             }
1527 0           case API_END_TXN: {
1528 0           SV *result = conn_parse_end_txn_response(aTHX_ self, cbt->api_version, data, len);
1529 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1530 0           break;
1531             }
1532 0           case API_TXN_OFFSET_COMMIT: {
1533 0           SV *result = conn_parse_txn_offset_commit_response(aTHX_ self, cbt->api_version, data, len);
1534 0           conn_invoke_cb(aTHX_ self, cbt->cb, result, NULL);
1535 0           break;
1536             }
1537 0           default: {
1538             /* Unknown API — return raw bytes */
1539 0           SV *result = newSVpvn(data, len);
1540 0           conn_invoke_cb(aTHX_ self, cbt->cb, sv_2mortal(result), NULL);
1541 0           break;
1542             }
1543             }
1544             }
1545              
1546             /* ================================================================
1547             * RecordBatch encoder (for Produce requests)
1548             * ================================================================ */
1549              
1550             /* Build a single Record (within a RecordBatch).
1551             * Format: length(varint), attributes(i8=0), timestampDelta(varint),
1552             * offsetDelta(varint), key(varint-bytes), value(varint-bytes),
1553             * headers(varint array of {key:varint-str, value:varint-bytes})
1554             */
1555 141           static void kf_encode_record(kf_buf_t *b, int offset_delta, int64_t ts_delta,
1556             const char *key, STRLEN key_len, const char *value, STRLEN value_len,
1557             HV *headers)
1558             {
1559             kf_buf_t rec;
1560 141           kf_buf_init(&rec);
1561              
1562 141           kf_buf_append_i8(&rec, 0); /* attributes */
1563 141           kf_buf_append_varint(&rec, ts_delta);
1564 141           kf_buf_append_varint(&rec, offset_delta);
1565              
1566             /* key */
1567 141 100         if (key) {
1568 140           kf_buf_append_varint(&rec, (int64_t)key_len);
1569 140           kf_buf_append(&rec, key, key_len);
1570             } else {
1571 1           kf_buf_append_varint(&rec, -1);
1572             }
1573              
1574             /* value */
1575 141 100         if (value) {
1576 140           kf_buf_append_varint(&rec, (int64_t)value_len);
1577 140           kf_buf_append(&rec, value, value_len);
1578             } else {
1579 1           kf_buf_append_varint(&rec, -1);
1580             }
1581              
1582             /* headers */
1583 142 100         if (headers && HvUSEDKEYS(headers) > 0) {
    50          
    50          
1584 1 50         kf_buf_append_varint(&rec, (int64_t)HvUSEDKEYS(headers));
1585             HE *entry;
1586 1           hv_iterinit(headers);
1587 3 100         while ((entry = hv_iternext(headers))) {
1588             I32 hklen;
1589 2           const char *hkey = hv_iterkey(entry, &hklen);
1590 2           SV *hval = hv_iterval(headers, entry);
1591             STRLEN hvlen;
1592 2           const char *hvstr = SvPV(hval, hvlen);
1593              
1594 2           kf_buf_append_varint(&rec, (int64_t)hklen);
1595 2           kf_buf_append(&rec, hkey, hklen);
1596 2           kf_buf_append_varint(&rec, (int64_t)hvlen);
1597 2           kf_buf_append(&rec, hvstr, hvlen);
1598             }
1599             } else {
1600 140           kf_buf_append_varint(&rec, 0); /* 0 headers */
1601             }
1602              
1603             /* Write record: length(varint) + body */
1604 141           kf_buf_append_varint(b, (int64_t)rec.len);
1605 141           kf_buf_append(b, rec.data, rec.len);
1606 141           kf_buf_free(&rec);
1607 141           }
1608              
1609             /* Build a RecordBatch from one or more records, with optional
1610             * compression and producer-ID/transactional state. Caller frees *out. */
1611 16           static void kf_encode_record_batch_multi(pTHX_ kf_buf_t *out,
1612             AV *records_av, int64_t timestamp, int compression,
1613             int64_t producer_id, int16_t producer_epoch, int32_t base_sequence,
1614             int is_transactional)
1615             {
1616 16           SSize_t i, count = av_len(records_av) + 1;
1617              
1618             /* Validate up front so croak() doesn't longjmp over kf_buf_free below. */
1619 157 100         for (i = 0; i < count; i++) {
1620 141           SV **elem = av_fetch(records_av, i, 0);
1621 141 50         if (!elem || !SvROK(*elem) || SvTYPE(SvRV(*elem)) != SVt_PVHV)
    50          
    50          
1622 0           croak("produce_batch: record element must be a hashref");
1623             }
1624              
1625             kf_buf_t records;
1626 16           kf_buf_init(&records);
1627              
1628 157 100         for (i = 0; i < count; i++) {
1629 141           SV **elem = av_fetch(records_av, i, 0);
1630 141           HV *rh = (HV*)SvRV(*elem);
1631 141           SV **key_sv = hv_fetch(rh, "key", 3, 0);
1632 141           SV **val_sv = hv_fetch(rh, "value", 5, 0);
1633 141           SV **hdr_sv = hv_fetch(rh, "headers", 7, 0);
1634 141           const char *key = NULL; STRLEN key_len = 0;
1635 141           const char *val = NULL; STRLEN val_len = 0;
1636 141           HV *hdrs = NULL;
1637 141 50         if (key_sv && SvOK(*key_sv)) key = SvPV(*key_sv, key_len);
    100          
1638 141 50         if (val_sv && SvOK(*val_sv)) val = SvPV(*val_sv, val_len);
    100          
1639 141 100         if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV)
    50          
    50          
1640 1           hdrs = (HV*)SvRV(*hdr_sv);
1641 141           kf_encode_record(&records, (int)i, 0, key, key_len, val, val_len, hdrs);
1642             }
1643              
1644             kf_buf_t inner;
1645 16           kf_buf_init(&inner);
1646              
1647 16           int16_t attrs = (int16_t)(compression & 0x07);
1648 16 50         if (is_transactional) attrs |= 0x10; /* bit 4 = isTransactional */
1649 16           kf_buf_append_i16(&inner, attrs);
1650 16           kf_buf_append_i32(&inner, (int32_t)(count - 1)); /* lastOffsetDelta */
1651 16           kf_buf_append_i64(&inner, timestamp);
1652 16           kf_buf_append_i64(&inner, timestamp);
1653 16           kf_buf_append_i64(&inner, producer_id);
1654 16           kf_buf_append_i16(&inner, producer_epoch);
1655 16           kf_buf_append_i32(&inner, base_sequence);
1656              
1657             #ifdef HAVE_LZ4
1658             if (compression == COMPRESS_LZ4) {
1659             int max_compressed = LZ4_compressBound((int)records.len);
1660             char *compressed;
1661             Newx(compressed, max_compressed, char);
1662             int clen = LZ4_compress_default(records.data, compressed,
1663             (int)records.len, max_compressed);
1664             if (clen > 0) {
1665             kf_buf_append_i32(&inner, (int32_t)count);
1666             kf_buf_append(&inner, compressed, clen);
1667             } else {
1668             uint16_t zero = 0;
1669             memcpy(inner.data, &zero, 2);
1670             kf_buf_append_i32(&inner, (int32_t)count);
1671             kf_buf_append(&inner, records.data, records.len);
1672             }
1673             Safefree(compressed);
1674             } else
1675             #endif
1676             #ifdef HAVE_ZLIB
1677 16 100         if (compression == COMPRESS_GZIP) {
1678 2           size_t dest_cap = compressBound((uLong)records.len) + 32;
1679             char *compressed;
1680 2           Newx(compressed, dest_cap, char);
1681             z_stream zs;
1682 2           Zero(&zs, 1, z_stream);
1683 2           int zinit = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
1684             MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY);
1685 2           int zok = 0;
1686 2 50         if (zinit == Z_OK) {
1687 2           zs.next_in = (Bytef *)records.data;
1688 2           zs.avail_in = (uInt)records.len;
1689 2           zs.next_out = (Bytef *)compressed;
1690 2           zs.avail_out = (uInt)dest_cap;
1691 2 50         if (deflate(&zs, Z_FINISH) == Z_STREAM_END) zok = 1;
1692 2           deflateEnd(&zs);
1693             }
1694 2 50         if (zok) {
1695 2           kf_buf_append_i32(&inner, (int32_t)count);
1696 2           kf_buf_append(&inner, compressed, zs.total_out);
1697             } else {
1698 0           uint16_t zero = 0;
1699 0           memcpy(inner.data, &zero, 2);
1700 0           kf_buf_append_i32(&inner, (int32_t)count);
1701 0           kf_buf_append(&inner, records.data, records.len);
1702             }
1703 2           Safefree(compressed);
1704             } else
1705             #endif
1706             #ifdef HAVE_ZSTD
1707             if (compression == COMPRESS_ZSTD) {
1708             size_t dest_cap = ZSTD_compressBound(records.len);
1709             char *compressed;
1710             Newx(compressed, dest_cap, char);
1711             size_t clen = ZSTD_compress(compressed, dest_cap,
1712             records.data, records.len, ZSTD_CLEVEL_DEFAULT);
1713             if (!ZSTD_isError(clen)) {
1714             kf_buf_append_i32(&inner, (int32_t)count);
1715             kf_buf_append(&inner, compressed, clen);
1716             } else {
1717             uint16_t zero = 0;
1718             memcpy(inner.data, &zero, 2);
1719             kf_buf_append_i32(&inner, (int32_t)count);
1720             kf_buf_append(&inner, records.data, records.len);
1721             }
1722             Safefree(compressed);
1723             } else
1724             #endif
1725             #ifdef HAVE_SNAPPY
1726             if (compression == COMPRESS_SNAPPY) {
1727             size_t dest_cap = snappy_max_compressed_length(records.len);
1728             char *compressed;
1729             Newx(compressed, dest_cap, char);
1730             size_t clen = dest_cap;
1731             if (snappy_compress(records.data, records.len, compressed, &clen)
1732             == SNAPPY_OK) {
1733             kf_buf_append_i32(&inner, (int32_t)count);
1734             kf_buf_append(&inner, compressed, clen);
1735             } else {
1736             uint16_t zero = 0;
1737             memcpy(inner.data, &zero, 2);
1738             kf_buf_append_i32(&inner, (int32_t)count);
1739             kf_buf_append(&inner, records.data, records.len);
1740             }
1741             Safefree(compressed);
1742             } else
1743             #endif
1744             {
1745             (void)compression;
1746 14           kf_buf_append_i32(&inner, (int32_t)count);
1747 14           kf_buf_append(&inner, records.data, records.len);
1748             }
1749              
1750 16           uint32_t crc_val = crc32c(inner.data, inner.len);
1751 16           int32_t batch_length = 4 + 1 + 4 + (int32_t)inner.len;
1752              
1753 16           kf_buf_init(out);
1754 16           kf_buf_append_i64(out, 0);
1755 16           kf_buf_append_i32(out, batch_length);
1756 16           kf_buf_append_i32(out, 0);
1757 16           kf_buf_append_i8(out, 2);
1758 16           kf_buf_append_i32(out, (int32_t)crc_val);
1759 16           kf_buf_append(out, inner.data, inner.len);
1760              
1761 16           kf_buf_free(&inner);
1762 16           kf_buf_free(&records);
1763 16           }
1764              
1765             /* ================================================================
1766             * Response parsers
1767             * ================================================================ */
1768              
1769             /* Metadata response parser (API 3) */
1770 2           static SV* conn_parse_metadata_response(pTHX_ ev_kafka_conn_t *self,
1771             int16_t version, const char *data, size_t len)
1772             {
1773 2           const char *p = data;
1774 2           const char *end = data + len;
1775 2           HV *result = newHV();
1776 2           AV *brokers_av = newAV();
1777 2           AV *topics_av = newAV();
1778             int n;
1779              
1780             (void)self;
1781              
1782             /* For v9+ (flexible versions), fields use compact encoding */
1783 2           int flexible = (version >= 9);
1784              
1785 2 50         if (flexible) {
1786             /* throttle_time_ms */
1787 0 0         if (end - p < 4) goto done;
1788 0           /* int32_t throttle = kf_read_i32(p); */ p += 4;
1789              
1790             /* brokers: compact array */
1791             uint64_t raw;
1792 0           n = kf_read_uvarint(p, end, &raw);
1793 0 0         if (n < 0) goto done;
1794 0           p += n;
1795 0           int32_t broker_count = (int32_t)(raw - 1);
1796             int32_t i;
1797              
1798 0 0         for (i = 0; i < broker_count; i++) {
1799 0           HV *bh = newHV();
1800 0 0         if (end - p < 4) goto done;
1801 0           int32_t nid = kf_read_i32(p); p += 4;
1802 0           hv_store(bh, "node_id", 7, newSViv(nid), 0);
1803              
1804             const char *host; int32_t hlen;
1805 0           n = kf_read_compact_string(p, end, &host, &hlen);
1806 0 0         if (n < 0) goto done;
1807 0           p += n;
1808 0 0         hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
    0          
1809              
1810 0 0         if (end - p < 4) goto done;
1811 0           int32_t port = kf_read_i32(p); p += 4;
1812 0           hv_store(bh, "port", 4, newSViv(port), 0);
1813              
1814             /* rack: compact nullable string */
1815             const char *rack; int32_t rlen;
1816 0           n = kf_read_compact_string(p, end, &rack, &rlen);
1817 0 0         if (n < 0) goto done;
1818 0           p += n;
1819              
1820             /* tagged fields */
1821 0           n = kf_skip_tagged_fields(p, end);
1822 0 0         if (n < 0) goto done;
1823 0           p += n;
1824              
1825 0           av_push(brokers_av, newRV_noinc((SV*)bh));
1826             }
1827              
1828             /* cluster_id */
1829             const char *cid; int32_t cidlen;
1830 0           n = kf_read_compact_string(p, end, &cid, &cidlen);
1831 0 0         if (n < 0) goto done;
1832 0           p += n;
1833              
1834             /* controller_id */
1835 0 0         if (end - p < 4) goto done;
1836 0           int32_t controller_id = kf_read_i32(p); p += 4;
1837 0           hv_store(result, "controller_id", 13, newSViv(controller_id), 0);
1838              
1839             /* topics: compact array */
1840 0           n = kf_read_uvarint(p, end, &raw);
1841 0 0         if (n < 0) goto done;
1842 0           p += n;
1843 0           int32_t topic_count = (int32_t)(raw - 1);
1844              
1845 0 0         for (i = 0; i < topic_count; i++) {
1846 0           HV *th = newHV();
1847 0 0         if (end - p < 2) goto done;
1848 0           int16_t terr = kf_read_i16(p); p += 2;
1849 0           hv_store(th, "error_code", 10, newSViv(terr), 0);
1850              
1851             const char *tname; int32_t tnlen;
1852 0           n = kf_read_compact_string(p, end, &tname, &tnlen);
1853 0 0         if (n < 0) goto done;
1854 0           p += n;
1855 0 0         hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
1856              
1857             /* topic_id (UUID, 16 bytes) — v10+ */
1858 0 0         if (version >= 10) {
1859 0 0         if (end - p < 16) goto done;
1860 0           p += 16;
1861             }
1862              
1863             /* is_internal */
1864 0 0         if (end - p < 1) goto done;
1865 0           p += 1;
1866              
1867             /* partitions: compact array */
1868 0           n = kf_read_uvarint(p, end, &raw);
1869 0 0         if (n < 0) goto done;
1870 0           p += n;
1871 0           int32_t part_count = (int32_t)(raw - 1);
1872 0           AV *parts_av = newAV();
1873             int32_t j;
1874              
1875 0 0         for (j = 0; j < part_count; j++) {
1876 0           HV *ph = newHV();
1877 0 0         if (end - p < 2) goto done;
1878 0           int16_t perr = kf_read_i16(p); p += 2;
1879 0           hv_store(ph, "error_code", 10, newSViv(perr), 0);
1880              
1881 0 0         if (end - p < 4) goto done;
1882 0           int32_t pid = kf_read_i32(p); p += 4;
1883 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
1884              
1885 0 0         if (end - p < 4) goto done;
1886 0           int32_t leader = kf_read_i32(p); p += 4;
1887 0           hv_store(ph, "leader", 6, newSViv(leader), 0);
1888              
1889             /* leader_epoch — v7+ */
1890 0 0         if (version >= 7) {
1891 0 0         if (end - p < 4) goto done;
1892 0           p += 4;
1893             }
1894              
1895             /* replicas: compact array of i32 */
1896 0           n = kf_read_uvarint(p, end, &raw);
1897 0 0         if (n < 0) goto done;
1898 0           p += n;
1899 0           int32_t rcount = (int32_t)(raw - 1);
1900 0 0         if (rcount < 0 || rcount > 65536) goto done;
    0          
1901 0 0         if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
1902 0           p += (int64_t)rcount * 4;
1903              
1904             /* isr: compact array of i32 */
1905 0           n = kf_read_uvarint(p, end, &raw);
1906 0 0         if (n < 0) goto done;
1907 0           p += n;
1908 0           rcount = (int32_t)(raw - 1);
1909 0 0         if (rcount < 0 || rcount > 65536) goto done;
    0          
1910 0 0         if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
1911 0           p += (int64_t)rcount * 4;
1912              
1913             /* offline_replicas: compact array of i32 — v5+ */
1914 0 0         if (version >= 5) {
1915 0           n = kf_read_uvarint(p, end, &raw);
1916 0 0         if (n < 0) goto done;
1917 0           p += n;
1918 0           rcount = (int32_t)(raw - 1);
1919 0 0         if (end - p < (ptrdiff_t)(rcount * 4)) goto done;
1920 0           p += rcount * 4;
1921             }
1922              
1923             /* tagged fields */
1924 0           n = kf_skip_tagged_fields(p, end);
1925 0 0         if (n < 0) goto done;
1926 0           p += n;
1927              
1928 0           av_push(parts_av, newRV_noinc((SV*)ph));
1929             }
1930 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
1931              
1932             /* topic authorized operations — v8+ */
1933 0 0         if (version >= 8) {
1934 0 0         if (end - p < 4) goto done;
1935 0           p += 4;
1936             }
1937              
1938             /* tagged fields */
1939 0           n = kf_skip_tagged_fields(p, end);
1940 0 0         if (n < 0) goto done;
1941 0           p += n;
1942              
1943 0           av_push(topics_av, newRV_noinc((SV*)th));
1944             }
1945             } else {
1946             /* Non-flexible (v0-v8) — use classic STRING/ARRAY encoding */
1947             /* throttle_time_ms (v3+) */
1948 2 50         if (version >= 3) {
1949 0 0         if (end - p < 4) goto done;
1950 0           p += 4;
1951             }
1952              
1953             /* brokers array */
1954 2 50         if (end - p < 4) goto done;
1955 2           int32_t broker_count = kf_read_i32(p); p += 4;
1956 2 50         if (broker_count < 0 || broker_count > 65536) goto done;
    50          
1957             int32_t i;
1958 3 100         for (i = 0; i < broker_count; i++) {
1959 2           HV *bh = newHV();
1960 2 100         if (end - p < 4) goto done;
1961 1           int32_t nid = kf_read_i32(p); p += 4;
1962 1           hv_store(bh, "node_id", 7, newSViv(nid), 0);
1963              
1964             const char *host; int16_t hlen;
1965 1           n = kf_read_string(p, end, &host, &hlen);
1966 1 50         if (n < 0) goto done;
1967 1           p += n;
1968 1 50         hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
    50          
1969              
1970 1 50         if (end - p < 4) goto done;
1971 1           int32_t port = kf_read_i32(p); p += 4;
1972 1           hv_store(bh, "port", 4, newSViv(port), 0);
1973              
1974             /* rack (v1+) */
1975 1 50         if (version >= 1) {
1976             const char *r; int16_t rlen;
1977 0           n = kf_read_string(p, end, &r, &rlen);
1978 0 0         if (n < 0) goto done;
1979 0           p += n;
1980             }
1981              
1982 1           av_push(brokers_av, newRV_noinc((SV*)bh));
1983             }
1984              
1985             /* cluster_id (v2+) */
1986 1 50         if (version >= 2) {
1987             const char *cid; int16_t cidlen;
1988 0           n = kf_read_string(p, end, &cid, &cidlen);
1989 0 0         if (n < 0) goto done;
1990 0           p += n;
1991             }
1992              
1993             /* controller_id (v1+) */
1994 1 50         if (version >= 1) {
1995 0 0         if (end - p < 4) goto done;
1996 0           int32_t cid = kf_read_i32(p); p += 4;
1997 0           hv_store(result, "controller_id", 13, newSViv(cid), 0);
1998             }
1999              
2000             /* topics array */
2001 1 50         if (end - p < 4) goto done;
2002 1           int32_t topic_count = kf_read_i32(p); p += 4;
2003 1 50         if (topic_count < 0 || topic_count > 1000000) goto done;
    50          
2004 2 100         for (i = 0; i < topic_count; i++) {
2005 1           HV *th = newHV();
2006 1 50         if (end - p < 2) goto done;
2007 1           int16_t terr = kf_read_i16(p); p += 2;
2008 1           hv_store(th, "error_code", 10, newSViv(terr), 0);
2009              
2010             const char *tname; int16_t tnlen;
2011 1           n = kf_read_string(p, end, &tname, &tnlen);
2012 1 50         if (n < 0) goto done;
2013 1           p += n;
2014 1 50         hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    50          
2015              
2016             /* is_internal (v1+) */
2017 1 50         if (version >= 1) {
2018 0 0         if (end - p < 1) goto done;
2019 0           p += 1;
2020             }
2021              
2022             /* partitions */
2023 1 50         if (end - p < 4) goto done;
2024 1           int32_t part_count = kf_read_i32(p); p += 4;
2025 1 50         if (part_count < 0 || part_count > 1000000) goto done;
    50          
2026 1           AV *parts_av = newAV();
2027             int32_t j;
2028 2 100         for (j = 0; j < part_count; j++) {
2029 1           HV *ph = newHV();
2030 1 50         if (end - p < 2) goto done;
2031 1           int16_t perr = kf_read_i16(p); p += 2;
2032 1           hv_store(ph, "error_code", 10, newSViv(perr), 0);
2033              
2034 1 50         if (end - p < 4) goto done;
2035 1           int32_t pid = kf_read_i32(p); p += 4;
2036 1           hv_store(ph, "partition", 9, newSViv(pid), 0);
2037              
2038 1 50         if (end - p < 4) goto done;
2039 1           int32_t leader = kf_read_i32(p); p += 4;
2040 1           hv_store(ph, "leader", 6, newSViv(leader), 0);
2041              
2042             /* leader_epoch (v7+) */
2043 1 50         if (version >= 7) {
2044 0 0         if (end - p < 4) goto done;
2045 0           p += 4;
2046             }
2047              
2048             /* replicas */
2049 1 50         if (end - p < 4) goto done;
2050 1           int32_t rcount = kf_read_i32(p); p += 4;
2051 1 50         if (rcount < 0 || rcount > 65536) goto done;
    50          
2052 1 50         if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
2053 1           p += (int64_t)rcount * 4;
2054              
2055             /* isr */
2056 1 50         if (end - p < 4) goto done;
2057 1           rcount = kf_read_i32(p); p += 4;
2058 1 50         if (rcount < 0 || rcount > 65536) goto done;
    50          
2059 1 50         if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
2060 1           p += (int64_t)rcount * 4;
2061              
2062             /* offline_replicas (v5+) */
2063 1 50         if (version >= 5) {
2064 0 0         if (end - p < 4) goto done;
2065 0           rcount = kf_read_i32(p); p += 4;
2066 0 0         if (end - p < (ptrdiff_t)(rcount * 4)) goto done;
2067 0           p += rcount * 4;
2068             }
2069              
2070 1           av_push(parts_av, newRV_noinc((SV*)ph));
2071             }
2072 1           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2073              
2074 1           av_push(topics_av, newRV_noinc((SV*)th));
2075             }
2076             }
2077              
2078 1           done:
2079 2           hv_store(result, "brokers", 7, newRV_noinc((SV*)brokers_av), 0);
2080 2           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2081 2           return sv_2mortal(newRV_noinc((SV*)result));
2082             }
2083              
2084             /* Produce response parser (API 0, v0-v7) */
2085 1           static SV* conn_parse_produce_response(pTHX_ ev_kafka_conn_t *self,
2086             int16_t version, const char *data, size_t len)
2087             {
2088 1           const char *p = data;
2089 1           const char *end = data + len;
2090 1           HV *result = newHV();
2091 1           AV *topics_av = newAV();
2092             int n;
2093              
2094             (void)self;
2095              
2096             /* responses: ARRAY */
2097 1 50         if (end - p < 4) goto done;
2098 1           int32_t topic_count = kf_read_i32(p); p += 4;
2099             int32_t i;
2100              
2101 2 100         for (i = 0; i < topic_count; i++) {
2102 1           HV *th = newHV();
2103             const char *tname; int16_t tnlen;
2104 1           n = kf_read_string(p, end, &tname, &tnlen);
2105 1 50         if (n < 0) goto done;
2106 1           p += n;
2107 1 50         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    50          
2108              
2109             /* partitions: ARRAY */
2110 1 50         if (end - p < 4) goto done;
2111 1           int32_t part_count = kf_read_i32(p); p += 4;
2112 1           AV *parts_av = newAV();
2113             int32_t j;
2114              
2115 2 100         for (j = 0; j < part_count; j++) {
2116 1           HV *ph = newHV();
2117 1 50         if (end - p < 4) goto done;
2118 1           int32_t pid = kf_read_i32(p); p += 4;
2119 1           hv_store(ph, "partition", 9, newSViv(pid), 0);
2120              
2121 1 50         if (end - p < 2) goto done;
2122 1           int16_t err = kf_read_i16(p); p += 2;
2123 1           hv_store(ph, "error_code", 10, newSViv(err), 0);
2124              
2125 1 50         if (end - p < 8) goto done;
2126 1           int64_t base_offset = kf_read_i64(p); p += 8;
2127 1           hv_store(ph, "base_offset", 11, newSViv(base_offset), 0);
2128              
2129             /* log_append_time (v2+) */
2130 1 50         if (version >= 2) {
2131 0 0         if (end - p < 8) goto done;
2132 0           p += 8;
2133             }
2134              
2135             /* log_start_offset (v5+) */
2136 1 50         if (version >= 5) {
2137 0 0         if (end - p < 8) goto done;
2138 0           p += 8;
2139             }
2140              
2141 1           av_push(parts_av, newRV_noinc((SV*)ph));
2142             }
2143 1           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2144 1           av_push(topics_av, newRV_noinc((SV*)th));
2145             }
2146              
2147             /* throttle_time_ms (v1+) */
2148 1 50         if (version >= 1 && end - p >= 4) {
    0          
2149 0           int32_t throttle = kf_read_i32(p); p += 4;
2150 0           hv_store(result, "throttle_time_ms", 16, newSViv(throttle), 0);
2151             }
2152              
2153 1           done:
2154 1           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2155 1           return sv_2mortal(newRV_noinc((SV*)result));
2156             }
2157              
2158             /* ================================================================
2159             * RecordBatch decoder (for Fetch responses)
2160             * ================================================================ */
2161              
2162             /* Decode records from a RecordBatch, push them as hashrefs onto records_av.
2163             * Returns number of records decoded, or -1 on error. */
2164 16           static int kf_decode_record_batch(pTHX_ const char *data, size_t len,
2165             AV *records_av, int64_t *out_base_offset)
2166             {
2167 16           const char *p = data;
2168 16           const char *end = data + len;
2169             int n;
2170              
2171 16 50         if (end - p < 12) return -1;
2172 16           int64_t base_offset = kf_read_i64(p); p += 8;
2173 16 50         if (out_base_offset) *out_base_offset = base_offset;
2174 16           int32_t batch_length = kf_read_i32(p); p += 4;
2175              
2176 16 100         if (end - p < batch_length) return -1;
2177 15           const char *batch_end = p + batch_length;
2178              
2179 15 50         if (batch_end - p < 9) return -1;
2180 15           /* int32_t partition_leader_epoch = kf_read_i32(p); */ p += 4;
2181 15           int8_t magic = (int8_t)*p; p += 1;
2182 15 100         if (magic != 2) return -1; /* only support magic=2 (current format) */
2183 14           uint32_t expected_crc = (uint32_t)kf_read_i32(p); p += 4;
2184             /* CRC32C covers the bytes from attributes to end of batch. */
2185 14 100         if (crc32c(p, (size_t)(batch_end - p)) != expected_crc) return -1;
2186              
2187 13 50         if (batch_end - p < 36) return -1;
2188 13           int16_t attributes = kf_read_i16(p); p += 2;
2189 13           int compression_type = attributes & 0x07;
2190 13           /* int32_t last_offset_delta = kf_read_i32(p); */ p += 4;
2191 13           int64_t first_timestamp = kf_read_i64(p); p += 8;
2192 13           /* int64_t max_timestamp = kf_read_i64(p); */ p += 8;
2193 13           /* int64_t producer_id = kf_read_i64(p); */ p += 8;
2194 13           /* int16_t producer_epoch = kf_read_i16(p); */ p += 2;
2195 13           /* int32_t base_sequence = kf_read_i32(p); */ p += 4;
2196              
2197 13 50         if (batch_end - p < 4) return -1;
2198 13           int32_t record_count = kf_read_i32(p); p += 4;
2199              
2200             /* Decompress if needed */
2201 13           const char *rec_data = p;
2202 13           const char *rec_end = batch_end;
2203 13           char *decompressed = NULL;
2204              
2205 13 100         if (compression_type != COMPRESS_NONE && batch_end > p) {
    50          
2206 7           size_t compressed_len = batch_end - p;
2207 7           size_t decomp_cap = compressed_len * 4;
2208 7 100         if (decomp_cap < 4096) decomp_cap = 4096;
2209              
2210             #ifdef HAVE_ZLIB
2211 7 100         if (compression_type == COMPRESS_GZIP) {
2212 1           int zok = 0;
2213 2 100         while (!zok && decomp_cap < 64 * 1024 * 1024) {
    50          
2214 1           Newx(decompressed, decomp_cap, char);
2215             z_stream zs;
2216 1           Zero(&zs, 1, z_stream);
2217 1           int zinit = inflateInit2(&zs, MAX_WBITS + 16);
2218 1 50         if (zinit != Z_OK) {
2219 0           Safefree(decompressed);
2220 0           decompressed = NULL;
2221 0           break;
2222             }
2223 1           zs.next_in = (Bytef *)p;
2224 1           zs.avail_in = (uInt)compressed_len;
2225 1           zs.next_out = (Bytef *)decompressed;
2226 1           zs.avail_out = (uInt)decomp_cap;
2227 1           int zret = inflate(&zs, Z_FINISH);
2228 1           size_t dest_len = zs.total_out;
2229 1           inflateEnd(&zs);
2230 1 50         if (zret == Z_STREAM_END) {
2231 1           rec_data = decompressed;
2232 1           rec_end = decompressed + dest_len;
2233 1           zok = 1;
2234 0 0         } else if (zret == Z_BUF_ERROR || zret == Z_OK) {
    0          
2235 0           Safefree(decompressed);
2236 0           decompressed = NULL;
2237 0           decomp_cap *= 2;
2238             } else {
2239 0           Safefree(decompressed);
2240 0           decompressed = NULL;
2241 0           break;
2242             }
2243             }
2244             }
2245             #endif
2246             #ifdef HAVE_LZ4
2247             if (compression_type == COMPRESS_LZ4) {
2248             int dlen = -1;
2249             while (decomp_cap < 64 * 1024 * 1024) {
2250             Newx(decompressed, decomp_cap, char);
2251             dlen = LZ4_decompress_safe(p, decompressed,
2252             (int)compressed_len, (int)decomp_cap);
2253             if (dlen >= 0) break;
2254             /* Negative return can mean either malformed input or
2255             * insufficient output buffer — LZ4 doesn't distinguish.
2256             * Grow and retry; if it's malformed we'll bail at the cap. */
2257             Safefree(decompressed);
2258             decompressed = NULL;
2259             decomp_cap *= 2;
2260             }
2261             if (dlen > 0) {
2262             rec_data = decompressed;
2263             rec_end = decompressed + dlen;
2264             } else if (decompressed) {
2265             Safefree(decompressed);
2266             decompressed = NULL;
2267             }
2268             }
2269             #endif
2270             #ifdef HAVE_ZSTD
2271             if (compression_type == COMPRESS_ZSTD) {
2272             unsigned long long expected =
2273             ZSTD_getFrameContentSize(p, compressed_len);
2274             size_t dlen;
2275             if (expected != ZSTD_CONTENTSIZE_ERROR
2276             && expected != ZSTD_CONTENTSIZE_UNKNOWN
2277             && expected <= 64ULL * 1024 * 1024) {
2278             Newx(decompressed, (size_t)expected, char);
2279             dlen = ZSTD_decompress(decompressed, (size_t)expected,
2280             p, compressed_len);
2281             if (!ZSTD_isError(dlen)) {
2282             rec_data = decompressed;
2283             rec_end = decompressed + dlen;
2284             } else {
2285             Safefree(decompressed);
2286             decompressed = NULL;
2287             }
2288             }
2289             }
2290             #endif
2291             #ifdef HAVE_SNAPPY
2292             if (compression_type == COMPRESS_SNAPPY) {
2293             size_t dlen;
2294             if (snappy_uncompressed_length(p, compressed_len, &dlen)
2295             == SNAPPY_OK
2296             && dlen <= 64UL * 1024 * 1024) {
2297             Newx(decompressed, dlen, char);
2298             if (snappy_uncompress(p, compressed_len,
2299             decompressed, &dlen) == SNAPPY_OK) {
2300             rec_data = decompressed;
2301             rec_end = decompressed + dlen;
2302             } else {
2303             Safefree(decompressed);
2304             decompressed = NULL;
2305             }
2306             }
2307             }
2308             #endif
2309             }
2310              
2311 13           const char *rp = rec_data;
2312             int32_t i;
2313 151 100         for (i = 0; i < record_count; i++) {
2314             int64_t rec_len;
2315 138           n = kf_read_varint(rp, rec_end, &rec_len);
2316 138 50         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2317 138           rp += n;
2318 138 50         if (rec_end - rp < rec_len) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2319 138           const char *this_rec_end = rp + rec_len;
2320              
2321 138 50         if (this_rec_end - rp < 1) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2322 138           /* int8_t rec_attrs = (int8_t)*rp; */ rp += 1;
2323              
2324             int64_t ts_delta;
2325 138           n = kf_read_varint(rp, this_rec_end, &ts_delta);
2326 138 50         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2327 138           rp += n;
2328              
2329             int64_t offset_delta;
2330 138           n = kf_read_varint(rp, this_rec_end, &offset_delta);
2331 138 50         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2332 138           rp += n;
2333              
2334             /* key */
2335             int64_t key_len;
2336 138           n = kf_read_varint(rp, this_rec_end, &key_len);
2337 138 50         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2338 138           rp += n;
2339 138           const char *key_data = NULL;
2340 138 100         if (key_len >= 0) {
2341 137 50         if (this_rec_end - rp < key_len) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2342 137           key_data = rp;
2343 137           rp += key_len;
2344             }
2345              
2346             /* value */
2347             int64_t val_len;
2348 138           n = kf_read_varint(rp, this_rec_end, &val_len);
2349 138 50         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2350 138           rp += n;
2351 138           const char *val_data = NULL;
2352 138 100         if (val_len >= 0) {
2353 137 50         if (this_rec_end - rp < val_len) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2354 137           val_data = rp;
2355 137           rp += val_len;
2356             }
2357              
2358             /* headers */
2359             int64_t hdr_count;
2360 138           n = kf_read_varint(rp, this_rec_end, &hdr_count);
2361 138 50         if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
    0          
2362 138           rp += n;
2363 138           HV *hdr_hv = NULL;
2364 138 100         if (hdr_count > 0) {
2365 1           hdr_hv = newHV();
2366             int64_t h;
2367 3 100         for (h = 0; h < hdr_count; h++) {
2368             int64_t hk_len;
2369 2           n = kf_read_varint(rp, this_rec_end, &hk_len);
2370 2 50         if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
    0          
2371 2           rp += n;
2372 2           const char *hk_data = rp;
2373 2 50         if (this_rec_end - rp < hk_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
    0          
2374 2           rp += hk_len;
2375              
2376             int64_t hv_len;
2377 2           n = kf_read_varint(rp, this_rec_end, &hv_len);
2378 2 50         if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
    0          
2379 2           rp += n;
2380 2           const char *hv_data = rp;
2381 2 50         if (hv_len >= 0) {
2382 2 50         if (this_rec_end - rp < hv_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
    0          
2383 2           rp += hv_len;
2384             }
2385              
2386 2 50         hv_store(hdr_hv, hk_data, (I32)hk_len,
2387             hv_len >= 0 ? newSVpvn(hv_data, (STRLEN)hv_len) : newSV(0), 0);
2388             }
2389             }
2390              
2391 138           HV *rec_hv = newHV();
2392 138           hv_store(rec_hv, "offset", 6, newSViv(base_offset + offset_delta), 0);
2393 138           hv_store(rec_hv, "timestamp", 9, newSViv(first_timestamp + ts_delta), 0);
2394 138 100         hv_store(rec_hv, "key", 3,
2395             key_data ? newSVpvn(key_data, (STRLEN)key_len) : newSV(0), 0);
2396 138 100         hv_store(rec_hv, "value", 5,
2397             val_data ? newSVpvn(val_data, (STRLEN)val_len) : newSV(0), 0);
2398 138 100         if (hdr_hv)
2399 1           hv_store(rec_hv, "headers", 7, newRV_noinc((SV*)hdr_hv), 0);
2400              
2401 138           av_push(records_av, newRV_noinc((SV*)rec_hv));
2402              
2403 138           rp = this_rec_end; /* skip any remaining bytes in the record */
2404             }
2405              
2406 13 100         if (decompressed) Safefree(decompressed);
2407 13           return record_count;
2408             }
2409              
2410             /* Fetch response parser (API 1, v4-v7 non-flexible) */
2411 0           static SV* conn_parse_fetch_response(pTHX_ ev_kafka_conn_t *self,
2412             int16_t version, const char *data, size_t len)
2413             {
2414 0           const char *p = data;
2415 0           const char *end = data + len;
2416 0           HV *result = newHV();
2417 0           AV *topics_av = newAV();
2418             int n;
2419              
2420             (void)self;
2421              
2422             /* throttle_time_ms (v1+) */
2423 0 0         if (version >= 1) {
2424 0 0         if (end - p < 4) goto done;
2425 0           int32_t throttle = kf_read_i32(p); p += 4;
2426 0           hv_store(result, "throttle_time_ms", 16, newSViv(throttle), 0);
2427             }
2428              
2429             /* error_code (v7+) */
2430 0 0         if (version >= 7) {
2431 0 0         if (end - p < 2) goto done;
2432 0           p += 2;
2433             }
2434              
2435             /* session_id (v7+) */
2436 0 0         if (version >= 7) {
2437 0 0         if (end - p < 4) goto done;
2438 0           p += 4;
2439             }
2440              
2441             /* responses: ARRAY */
2442 0 0         if (end - p < 4) goto done;
2443 0           int32_t topic_count = kf_read_i32(p); p += 4;
2444             int32_t i;
2445              
2446 0 0         for (i = 0; i < topic_count; i++) {
2447 0           HV *th = newHV();
2448              
2449             const char *tname; int16_t tnlen;
2450 0           n = kf_read_string(p, end, &tname, &tnlen);
2451 0 0         if (n < 0) goto done;
2452 0           p += n;
2453 0 0         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
2454              
2455             /* partitions: ARRAY */
2456 0 0         if (end - p < 4) goto done;
2457 0           int32_t part_count = kf_read_i32(p); p += 4;
2458 0           AV *parts_av = newAV();
2459             int32_t j;
2460              
2461 0 0         for (j = 0; j < part_count; j++) {
2462 0           HV *ph = newHV();
2463              
2464 0 0         if (end - p < 4) goto done;
2465 0           int32_t pid = kf_read_i32(p); p += 4;
2466 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
2467              
2468 0 0         if (end - p < 2) goto done;
2469 0           int16_t err = kf_read_i16(p); p += 2;
2470 0           hv_store(ph, "error_code", 10, newSViv(err), 0);
2471              
2472 0 0         if (end - p < 8) goto done;
2473 0           int64_t hw = kf_read_i64(p); p += 8;
2474 0           hv_store(ph, "high_watermark", 14, newSViv(hw), 0);
2475              
2476             /* last_stable_offset (v4+) */
2477 0 0         if (version >= 4) {
2478 0 0         if (end - p < 8) goto done;
2479 0           int64_t lso = kf_read_i64(p); p += 8;
2480 0           hv_store(ph, "last_stable_offset", 18, newSViv(lso), 0);
2481             }
2482              
2483             /* log_start_offset (v5+) */
2484 0 0         if (version >= 5) {
2485 0 0         if (end - p < 8) goto done;
2486 0           p += 8;
2487             }
2488              
2489             /* aborted_transactions (v4+) */
2490 0 0         if (version >= 4) {
2491 0 0         if (end - p < 4) goto done;
2492 0           int32_t at_count = kf_read_i32(p); p += 4;
2493             int32_t at;
2494 0 0         for (at = 0; at < at_count; at++) {
2495 0 0         if (end - p < 16) goto done;
2496 0           p += 16; /* producer_id(i64) + first_offset(i64) */
2497             }
2498             }
2499              
2500             /* record_set: BYTES (records data) */
2501 0 0         if (end - p < 4) goto done;
2502 0           int32_t records_size = kf_read_i32(p); p += 4;
2503              
2504 0           AV *records_av = newAV();
2505 0 0         if (records_size > 0 && end - p >= records_size) {
    0          
2506 0           const char *rp = p;
2507 0           const char *rend = p + records_size;
2508              
2509             /* May contain multiple RecordBatches */
2510 0 0         while (rp < rend && rend - rp >= 12) {
    0          
2511             int64_t bo;
2512 0           int32_t bl = kf_read_i32(rp + 8);
2513 0 0         if (bl < 0 || rend - rp < 12 + bl) break;
    0          
2514 0           kf_decode_record_batch(aTHX_ rp, 12 + (size_t)bl, records_av, &bo);
2515 0           rp += 12 + bl;
2516             }
2517              
2518 0           p += records_size;
2519 0 0         } else if (records_size > 0) {
2520 0           p = end; /* truncated */
2521             }
2522              
2523 0           hv_store(ph, "records", 7, newRV_noinc((SV*)records_av), 0);
2524 0           av_push(parts_av, newRV_noinc((SV*)ph));
2525             }
2526              
2527 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2528 0           av_push(topics_av, newRV_noinc((SV*)th));
2529             }
2530              
2531 0           done:
2532 0           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2533 0           return sv_2mortal(newRV_noinc((SV*)result));
2534             }
2535              
2536             /* ListOffsets response parser (API 2, v1+) */
2537 0           static SV* conn_parse_list_offsets_response(pTHX_ ev_kafka_conn_t *self,
2538             int16_t version, const char *data, size_t len)
2539             {
2540 0           const char *p = data;
2541 0           const char *end = data + len;
2542 0           HV *result = newHV();
2543 0           AV *topics_av = newAV();
2544             int n;
2545              
2546             (void)self;
2547              
2548             /* throttle_time_ms (v2+) */
2549 0 0         if (version >= 2) {
2550 0 0         if (end - p < 4) goto done;
2551 0           p += 4;
2552             }
2553              
2554             /* topics: ARRAY */
2555 0 0         if (end - p < 4) goto done;
2556 0           int32_t topic_count = kf_read_i32(p); p += 4;
2557             int32_t i;
2558              
2559 0 0         for (i = 0; i < topic_count; i++) {
2560 0           HV *th = newHV();
2561             const char *tname; int16_t tnlen;
2562 0           n = kf_read_string(p, end, &tname, &tnlen);
2563 0 0         if (n < 0) goto done;
2564 0           p += n;
2565 0 0         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
2566              
2567 0 0         if (end - p < 4) goto done;
2568 0           int32_t part_count = kf_read_i32(p); p += 4;
2569 0           AV *parts_av = newAV();
2570             int32_t j;
2571              
2572 0 0         for (j = 0; j < part_count; j++) {
2573 0           HV *ph = newHV();
2574 0 0         if (end - p < 4) goto done;
2575 0           int32_t pid = kf_read_i32(p); p += 4;
2576 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
2577              
2578 0 0         if (end - p < 2) goto done;
2579 0           int16_t err = kf_read_i16(p); p += 2;
2580 0           hv_store(ph, "error_code", 10, newSViv(err), 0);
2581              
2582 0 0         if (version >= 1) {
2583 0 0         if (end - p < 8) goto done;
2584 0           int64_t ts = kf_read_i64(p); p += 8;
2585 0           hv_store(ph, "timestamp", 9, newSViv(ts), 0);
2586             }
2587              
2588 0 0         if (end - p < 8) goto done;
2589 0           int64_t offset = kf_read_i64(p); p += 8;
2590 0           hv_store(ph, "offset", 6, newSViv(offset), 0);
2591              
2592             /* leader_epoch (v4+) */
2593 0 0         if (version >= 4) {
2594 0 0         if (end - p < 4) goto done;
2595 0           p += 4;
2596             }
2597              
2598 0           av_push(parts_av, newRV_noinc((SV*)ph));
2599             }
2600 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2601 0           av_push(topics_av, newRV_noinc((SV*)th));
2602             }
2603              
2604 0           done:
2605 0           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2606 0           return sv_2mortal(newRV_noinc((SV*)result));
2607             }
2608              
2609             /* FindCoordinator response parser (API 10, v0-v3) */
2610 1           static SV* conn_parse_find_coordinator_response(pTHX_ ev_kafka_conn_t *self,
2611             int16_t version, const char *data, size_t len)
2612             {
2613 1           const char *p = data;
2614 1           const char *end = data + len;
2615 1           HV *result = newHV();
2616             int n;
2617             (void)self;
2618              
2619             /* throttle_time_ms (v1+) */
2620 1 50         if (version >= 1) {
2621 0 0         if (end - p < 4) goto done;
2622 0           p += 4;
2623             }
2624              
2625 1 50         if (end - p < 2) goto done;
2626 1           int16_t err = kf_read_i16(p); p += 2;
2627 1           hv_store(result, "error_code", 10, newSViv(err), 0);
2628              
2629             /* error_message (v1+) */
2630 1 50         if (version >= 1) {
2631             const char *emsg; int16_t elen;
2632 0           n = kf_read_string(p, end, &emsg, &elen);
2633 0 0         if (n < 0) goto done;
2634 0           p += n;
2635 0 0         if (emsg && elen > 0)
    0          
2636 0           hv_store(result, "error_message", 13, newSVpvn(emsg, elen), 0);
2637             }
2638              
2639 1 50         if (end - p < 4) goto done;
2640 1           int32_t nid = kf_read_i32(p); p += 4;
2641 1           hv_store(result, "node_id", 7, newSViv(nid), 0);
2642              
2643             const char *host; int16_t hlen;
2644 1           n = kf_read_string(p, end, &host, &hlen);
2645 1 50         if (n < 0) goto done;
2646 1           p += n;
2647 1 50         hv_store(result, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
    50          
2648              
2649 1 50         if (end - p < 4) goto done;
2650 1           int32_t port = kf_read_i32(p); p += 4;
2651 1           hv_store(result, "port", 4, newSViv(port), 0);
2652              
2653 1           done:
2654 1           return sv_2mortal(newRV_noinc((SV*)result));
2655             }
2656              
2657             /* JoinGroup response parser (API 11, v0-v5) */
2658 1           static SV* conn_parse_join_group_response(pTHX_ ev_kafka_conn_t *self,
2659             int16_t version, const char *data, size_t len)
2660             {
2661 1           const char *p = data;
2662 1           const char *end = data + len;
2663 1           HV *result = newHV();
2664             int n;
2665             (void)self;
2666              
2667             /* throttle_time_ms (v2+) */
2668 1 50         if (version >= 2) {
2669 0 0         if (end - p < 4) goto done;
2670 0           p += 4;
2671             }
2672              
2673 1 50         if (end - p < 2) goto done;
2674 1           int16_t err = kf_read_i16(p); p += 2;
2675 1           hv_store(result, "error_code", 10, newSViv(err), 0);
2676              
2677 1 50         if (end - p < 4) goto done;
2678 1           int32_t gen = kf_read_i32(p); p += 4;
2679 1           hv_store(result, "generation_id", 13, newSViv(gen), 0);
2680              
2681             /* protocol_type (v7+) — skip for now, we use v5 max */
2682              
2683             const char *proto; int16_t plen;
2684 1           n = kf_read_string(p, end, &proto, &plen);
2685 1 50         if (n < 0) goto done;
2686 1           p += n;
2687 1 50         if (proto)
2688 1           hv_store(result, "protocol_name", 13, newSVpvn(proto, plen), 0);
2689              
2690             const char *leader; int16_t llen;
2691 1           n = kf_read_string(p, end, &leader, &llen);
2692 1 50         if (n < 0) goto done;
2693 1           p += n;
2694 1 50         if (leader)
2695 1           hv_store(result, "leader", 6, newSVpvn(leader, llen), 0);
2696              
2697             /* skip_assignment (v9+) — not applicable */
2698              
2699             const char *member_id; int16_t mlen;
2700 1           n = kf_read_string(p, end, &member_id, &mlen);
2701 1 50         if (n < 0) goto done;
2702 1           p += n;
2703 1 50         if (member_id)
2704 1           hv_store(result, "member_id", 9, newSVpvn(member_id, mlen), 0);
2705              
2706             /* members array */
2707 1 50         if (end - p < 4) goto done;
2708 1           int32_t mcount = kf_read_i32(p); p += 4;
2709 1           AV *members_av = newAV();
2710             int32_t i;
2711 2 100         for (i = 0; i < mcount; i++) {
2712 1           HV *mh = newHV();
2713              
2714             const char *mid; int16_t midlen;
2715 1           n = kf_read_string(p, end, &mid, &midlen);
2716 1 50         if (n < 0) goto done;
2717 1           p += n;
2718 1 50         if (mid)
2719 1           hv_store(mh, "member_id", 9, newSVpvn(mid, midlen), 0);
2720              
2721             /* group_instance_id (v5+) */
2722 1 50         if (version >= 5) {
2723             const char *gi; int16_t gilen;
2724 0           n = kf_read_string(p, end, &gi, &gilen);
2725 0 0         if (n < 0) goto done;
2726 0           p += n;
2727             }
2728              
2729             /* metadata: BYTES */
2730 1 50         if (end - p < 4) goto done;
2731 1           int32_t mdlen = kf_read_i32(p); p += 4;
2732 1 50         if (mdlen > 0) {
2733 0 0         if (end - p < mdlen) goto done;
2734 0           hv_store(mh, "metadata", 8, newSVpvn(p, mdlen), 0);
2735 0           p += mdlen;
2736             }
2737              
2738 1           av_push(members_av, newRV_noinc((SV*)mh));
2739             }
2740 1           hv_store(result, "members", 7, newRV_noinc((SV*)members_av), 0);
2741              
2742 1           done:
2743 1           return sv_2mortal(newRV_noinc((SV*)result));
2744             }
2745              
2746             /* SyncGroup response parser (API 14, v0-v3) */
2747 1           static SV* conn_parse_sync_group_response(pTHX_ ev_kafka_conn_t *self,
2748             int16_t version, const char *data, size_t len)
2749             {
2750 1           const char *p = data;
2751 1           const char *end = data + len;
2752 1           HV *result = newHV();
2753             (void)self;
2754              
2755             /* throttle_time_ms (v1+) */
2756 1 50         if (version >= 1) {
2757 0 0         if (end - p < 4) goto done;
2758 0           p += 4;
2759             }
2760              
2761 1 50         if (end - p < 2) goto done;
2762 1           int16_t err = kf_read_i16(p); p += 2;
2763 1           hv_store(result, "error_code", 10, newSViv(err), 0);
2764              
2765             /* assignment: BYTES */
2766 1 50         if (end - p < 4) goto done;
2767 1           int32_t alen = kf_read_i32(p); p += 4;
2768 1 50         if (alen > 0 && end - p >= alen) {
    50          
2769 1           hv_store(result, "assignment", 10, newSVpvn(p, alen), 0);
2770 1           p += alen;
2771             }
2772              
2773 0           done:
2774 1           return sv_2mortal(newRV_noinc((SV*)result));
2775             }
2776              
2777             /* Heartbeat response parser (API 12) */
2778 1           static SV* conn_parse_heartbeat_response(pTHX_ ev_kafka_conn_t *self,
2779             int16_t version, const char *data, size_t len)
2780             {
2781 1           const char *p = data;
2782 1           const char *end = data + len;
2783 1           HV *result = newHV();
2784             (void)self;
2785              
2786 1 50         if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2787 1 50         if (end - p >= 2) {
2788 1           int16_t err = kf_read_i16(p); p += 2;
2789 1           hv_store(result, "error_code", 10, newSViv(err), 0);
2790             }
2791              
2792 1           return sv_2mortal(newRV_noinc((SV*)result));
2793             }
2794              
2795             /* OffsetCommit response parser (API 8, v0-v7) */
2796 0           static SV* conn_parse_offset_commit_response(pTHX_ ev_kafka_conn_t *self,
2797             int16_t version, const char *data, size_t len)
2798             {
2799 0           const char *p = data;
2800 0           const char *end = data + len;
2801 0           HV *result = newHV();
2802 0           AV *topics_av = newAV();
2803             int n;
2804             (void)self;
2805              
2806 0 0         if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2807              
2808 0 0         if (end - p < 4) goto done;
2809 0           int32_t tc = kf_read_i32(p); p += 4;
2810             int32_t i;
2811 0 0         for (i = 0; i < tc; i++) {
2812 0           HV *th = newHV();
2813             const char *tname; int16_t tnlen;
2814 0           n = kf_read_string(p, end, &tname, &tnlen);
2815 0 0         if (n < 0) goto done;
2816 0           p += n;
2817 0 0         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
2818              
2819 0 0         if (end - p < 4) goto done;
2820 0           int32_t pc = kf_read_i32(p); p += 4;
2821 0           AV *parts_av = newAV();
2822             int32_t j;
2823 0 0         for (j = 0; j < pc; j++) {
2824 0           HV *ph = newHV();
2825 0 0         if (end - p < 6) goto done;
2826 0           int32_t pid = kf_read_i32(p); p += 4;
2827 0           int16_t err = kf_read_i16(p); p += 2;
2828 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
2829 0           hv_store(ph, "error_code", 10, newSViv(err), 0);
2830 0           av_push(parts_av, newRV_noinc((SV*)ph));
2831             }
2832 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2833 0           av_push(topics_av, newRV_noinc((SV*)th));
2834             }
2835              
2836 0           done:
2837 0           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2838 0           return sv_2mortal(newRV_noinc((SV*)result));
2839             }
2840              
2841             /* OffsetFetch response parser (API 9, v0-v5) */
2842 1           static SV* conn_parse_offset_fetch_response(pTHX_ ev_kafka_conn_t *self,
2843             int16_t version, const char *data, size_t len)
2844             {
2845 1           const char *p = data;
2846 1           const char *end = data + len;
2847 1           HV *result = newHV();
2848 1           AV *topics_av = newAV();
2849             int n;
2850             (void)self;
2851              
2852 1 50         if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2853              
2854 1 50         if (end - p < 4) goto done;
2855 1           int32_t tc = kf_read_i32(p); p += 4;
2856             int32_t i;
2857 2 100         for (i = 0; i < tc; i++) {
2858 1           HV *th = newHV();
2859             const char *tname; int16_t tnlen;
2860 1           n = kf_read_string(p, end, &tname, &tnlen);
2861 1 50         if (n < 0) goto done;
2862 1           p += n;
2863 1 50         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    50          
2864              
2865 1 50         if (end - p < 4) goto done;
2866 1           int32_t pc = kf_read_i32(p); p += 4;
2867 1           AV *parts_av = newAV();
2868             int32_t j;
2869 2 100         for (j = 0; j < pc; j++) {
2870 1           HV *ph = newHV();
2871 1 50         if (end - p < 4) goto done;
2872 1           int32_t pid = kf_read_i32(p); p += 4;
2873 1           hv_store(ph, "partition", 9, newSViv(pid), 0);
2874              
2875 1 50         if (end - p < 8) goto done;
2876 1           int64_t offset = kf_read_i64(p); p += 8;
2877 1           hv_store(ph, "offset", 6, newSViv(offset), 0);
2878              
2879             /* leader_epoch (v5+) */
2880 1 50         if (version >= 5 && end - p >= 4) p += 4;
    0          
2881              
2882             const char *meta_str; int16_t meta_len;
2883 1           n = kf_read_string(p, end, &meta_str, &meta_len);
2884 1 50         if (n < 0) goto done;
2885 1           p += n;
2886              
2887 1 50         if (end - p < 2) goto done;
2888 1           int16_t err = kf_read_i16(p); p += 2;
2889 1           hv_store(ph, "error_code", 10, newSViv(err), 0);
2890              
2891 1           av_push(parts_av, newRV_noinc((SV*)ph));
2892             }
2893 1           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
2894 1           av_push(topics_av, newRV_noinc((SV*)th));
2895             }
2896              
2897             /* error_code (v2+) */
2898 1 50         if (version >= 2 && end - p >= 2) {
    0          
2899 0           int16_t err = kf_read_i16(p); p += 2;
2900 0           hv_store(result, "error_code", 10, newSViv(err), 0);
2901             }
2902              
2903 1           done:
2904 1           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2905 1           return sv_2mortal(newRV_noinc((SV*)result));
2906             }
2907              
2908             /* LeaveGroup response parser (API 13, v0-v3) */
2909 1           static SV* conn_parse_leave_group_response(pTHX_ ev_kafka_conn_t *self,
2910             int16_t version, const char *data, size_t len)
2911             {
2912 1           const char *p = data;
2913 1           const char *end = data + len;
2914 1           HV *result = newHV();
2915             (void)self;
2916              
2917 1 50         if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2918 1 50         if (end - p >= 2) {
2919 1           int16_t err = kf_read_i16(p); p += 2;
2920 1           hv_store(result, "error_code", 10, newSViv(err), 0);
2921             }
2922              
2923 1           return sv_2mortal(newRV_noinc((SV*)result));
2924             }
2925              
2926             /* CreateTopics response parser (API 19, v0-v4) */
2927 1           static SV* conn_parse_create_topics_response(pTHX_ ev_kafka_conn_t *self,
2928             int16_t version, const char *data, size_t len)
2929             {
2930 1           const char *p = data;
2931 1           const char *end = data + len;
2932 1           HV *result = newHV();
2933 1           AV *topics_av = newAV();
2934             int n;
2935             (void)self;
2936              
2937 1 50         if (version >= 2 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2938              
2939 1 50         if (end - p < 4) goto done;
2940 1           int32_t tc = kf_read_i32(p); p += 4;
2941             int32_t i;
2942 2 100         for (i = 0; i < tc; i++) {
2943 1           HV *th = newHV();
2944             const char *tname; int16_t tnlen;
2945 1           n = kf_read_string(p, end, &tname, &tnlen);
2946 1 50         if (n < 0) goto done;
2947 1           p += n;
2948 1 50         hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    50          
2949              
2950 1 50         if (end - p < 2) goto done;
2951 1           int16_t err = kf_read_i16(p); p += 2;
2952 1           hv_store(th, "error_code", 10, newSViv(err), 0);
2953              
2954             /* error_message (v1+) */
2955 1 50         if (version >= 1) {
2956             const char *emsg; int16_t elen;
2957 0           n = kf_read_string(p, end, &emsg, &elen);
2958 0 0         if (n < 0) goto done;
2959 0           p += n;
2960 0 0         if (emsg && elen > 0)
    0          
2961 0           hv_store(th, "error_message", 13, newSVpvn(emsg, elen), 0);
2962             }
2963              
2964 1           av_push(topics_av, newRV_noinc((SV*)th));
2965             }
2966              
2967 1           done:
2968 1           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
2969 1           return sv_2mortal(newRV_noinc((SV*)result));
2970             }
2971              
2972             /* DeleteTopics response parser (API 20, v0-v3) */
2973 1           static SV* conn_parse_delete_topics_response(pTHX_ ev_kafka_conn_t *self,
2974             int16_t version, const char *data, size_t len)
2975             {
2976 1           const char *p = data;
2977 1           const char *end = data + len;
2978 1           HV *result = newHV();
2979 1           AV *topics_av = newAV();
2980             int n;
2981             (void)self;
2982              
2983 1 50         if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
2984              
2985 1 50         if (end - p < 4) goto done;
2986 1           int32_t tc = kf_read_i32(p); p += 4;
2987             int32_t i;
2988 2 100         for (i = 0; i < tc; i++) {
2989 1           HV *th = newHV();
2990             const char *tname; int16_t tnlen;
2991 1           n = kf_read_string(p, end, &tname, &tnlen);
2992 1 50         if (n < 0) goto done;
2993 1           p += n;
2994 1 50         hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    50          
2995              
2996 1 50         if (end - p < 2) goto done;
2997 1           int16_t err = kf_read_i16(p); p += 2;
2998 1           hv_store(th, "error_code", 10, newSViv(err), 0);
2999              
3000 1           av_push(topics_av, newRV_noinc((SV*)th));
3001             }
3002              
3003 1           done:
3004 1           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
3005 1           return sv_2mortal(newRV_noinc((SV*)result));
3006             }
3007              
3008             /* InitProducerId response parser (API 22, v0-v2) */
3009 1           static SV* conn_parse_init_producer_id_response(pTHX_ ev_kafka_conn_t *self,
3010             int16_t version, const char *data, size_t len)
3011             {
3012 1           const char *p = data;
3013 1           const char *end = data + len;
3014 1           HV *result = newHV();
3015             (void)self;
3016              
3017 1 50         if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
    50          
3018              
3019 1 50         if (end - p < 2) goto done;
3020 1           int16_t err = kf_read_i16(p); p += 2;
3021 1           hv_store(result, "error_code", 10, newSViv(err), 0);
3022              
3023 1 50         if (end - p < 8) goto done;
3024 1           int64_t producer_id = kf_read_i64(p); p += 8;
3025 1           hv_store(result, "producer_id", 11, newSViv(producer_id), 0);
3026              
3027 1 50         if (end - p < 2) goto done;
3028 1           int16_t producer_epoch = kf_read_i16(p); p += 2;
3029 1           hv_store(result, "producer_epoch", 14, newSViv(producer_epoch), 0);
3030              
3031 1           done:
3032 1           return sv_2mortal(newRV_noinc((SV*)result));
3033             }
3034              
3035             /* AddPartitionsToTxn response parser (API 24, v0-v1) */
3036 1           static SV* conn_parse_add_partitions_to_txn_response(pTHX_ ev_kafka_conn_t *self,
3037             int16_t version, const char *data, size_t len)
3038             {
3039 1           const char *p = data;
3040 1           const char *end = data + len;
3041 1           HV *result = newHV();
3042 1           AV *topics_av = newAV();
3043             int n;
3044             (void)self; (void)version;
3045              
3046 1 50         if (end - p < 4) goto done;
3047 1           p += 4; /* throttle_time_ms */
3048              
3049 1 50         if (end - p < 4) goto done;
3050 1           int32_t tc = kf_read_i32(p); p += 4;
3051             int32_t i;
3052 2 100         for (i = 0; i < tc; i++) {
3053 1           HV *th = newHV();
3054             const char *tname; int16_t tnlen;
3055 1           n = kf_read_string(p, end, &tname, &tnlen);
3056 1 50         if (n < 0) goto done;
3057 1           p += n;
3058 1 50         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    50          
3059              
3060 1 50         if (end - p < 4) goto done;
3061 1           int32_t pc = kf_read_i32(p); p += 4;
3062 1           AV *parts_av = newAV();
3063             int32_t j;
3064 2 100         for (j = 0; j < pc; j++) {
3065 1           HV *ph = newHV();
3066 1 50         if (end - p < 6) goto done;
3067 1           int32_t pid = kf_read_i32(p); p += 4;
3068 1           int16_t err = kf_read_i16(p); p += 2;
3069 1           hv_store(ph, "partition", 9, newSViv(pid), 0);
3070 1           hv_store(ph, "error_code", 10, newSViv(err), 0);
3071 1           av_push(parts_av, newRV_noinc((SV*)ph));
3072             }
3073 1           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
3074 1           av_push(topics_av, newRV_noinc((SV*)th));
3075             }
3076              
3077 1           done:
3078 1           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
3079 1           return sv_2mortal(newRV_noinc((SV*)result));
3080             }
3081              
3082             /* EndTxn response parser (API 26, v0-v1) */
3083 1           static SV* conn_parse_end_txn_response(pTHX_ ev_kafka_conn_t *self,
3084             int16_t version, const char *data, size_t len)
3085             {
3086 1           const char *p = data;
3087 1           const char *end = data + len;
3088 1           HV *result = newHV();
3089             (void)self; (void)version;
3090              
3091 1 50         if (end - p >= 4) p += 4; /* throttle_time_ms */
3092 1 50         if (end - p >= 2) {
3093 1           int16_t err = kf_read_i16(p); p += 2;
3094 1           hv_store(result, "error_code", 10, newSViv(err), 0);
3095             }
3096              
3097 1           return sv_2mortal(newRV_noinc((SV*)result));
3098             }
3099              
3100             /* TxnOffsetCommit response parser (API 28, v0-v2) */
3101 1           static SV* conn_parse_txn_offset_commit_response(pTHX_ ev_kafka_conn_t *self,
3102             int16_t version, const char *data, size_t len)
3103             {
3104 1           const char *p = data;
3105 1           const char *end = data + len;
3106 1           HV *result = newHV();
3107 1           AV *topics_av = newAV();
3108             int n;
3109             (void)self;
3110              
3111 1 50         if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
    0          
3112              
3113 1 50         if (end - p < 4) goto done;
3114 1           int32_t tc = kf_read_i32(p); p += 4;
3115             int32_t i;
3116 1 50         for (i = 0; i < tc; i++) {
3117 0           HV *th = newHV();
3118             const char *tname; int16_t tnlen;
3119 0           n = kf_read_string(p, end, &tname, &tnlen);
3120 0 0         if (n < 0) goto done;
3121 0           p += n;
3122 0 0         hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
    0          
3123              
3124 0 0         if (end - p < 4) goto done;
3125 0           int32_t pc = kf_read_i32(p); p += 4;
3126 0           AV *parts_av = newAV();
3127             int32_t j;
3128 0 0         for (j = 0; j < pc; j++) {
3129 0           HV *ph = newHV();
3130 0 0         if (end - p < 6) goto done;
3131 0           int32_t pid = kf_read_i32(p); p += 4;
3132 0           int16_t err = kf_read_i16(p); p += 2;
3133 0           hv_store(ph, "partition", 9, newSViv(pid), 0);
3134 0           hv_store(ph, "error_code", 10, newSViv(err), 0);
3135 0           av_push(parts_av, newRV_noinc((SV*)ph));
3136             }
3137 0           hv_store(th, "partitions", 10, newRV_noinc((SV*)parts_av), 0);
3138 0           av_push(topics_av, newRV_noinc((SV*)th));
3139             }
3140              
3141 1           done:
3142 1           hv_store(result, "topics", 6, newRV_noinc((SV*)topics_av), 0);
3143 1           return sv_2mortal(newRV_noinc((SV*)result));
3144             }
3145              
3146             /* ================================================================
3147             * Response processing loop
3148             * ================================================================ */
3149              
3150 1           static void conn_process_responses(pTHX_ ev_kafka_conn_t *self) {
3151 3 100         while (self->rbuf_len >= 4) {
3152 1           int32_t msg_size = kf_read_i32(self->rbuf);
3153 1 50         if (msg_size < 0 || msg_size > 256 * 1024 * 1024) {
    50          
3154 0           conn_emit_error(aTHX_ self, "invalid response size");
3155 0 0         if (conn_check_destroyed(self)) return;
3156 0           conn_handle_disconnect(aTHX_ self, "invalid response size");
3157 0           return;
3158             }
3159              
3160 1 50         if (self->rbuf_len < (size_t)(4 + msg_size))
3161 0           break; /* incomplete response */
3162              
3163 1           const char *msg = self->rbuf + 4;
3164              
3165             /* correlation_id is first 4 bytes of message */
3166 1 50         if (msg_size < 4) {
3167 0           conn_emit_error(aTHX_ self, "response too short");
3168 0 0         if (conn_check_destroyed(self)) return;
3169 0           conn_handle_disconnect(aTHX_ self, "response too short");
3170 0           return;
3171             }
3172              
3173 1           int32_t corr_id = kf_read_i32(msg);
3174 1           const char *payload = msg + 4;
3175 1           size_t payload_len = (size_t)msg_size - 4;
3176              
3177             /* Find matching callback (should be head of queue — Kafka guarantees ordering) */
3178 1           ev_kafka_conn_cb_t *cbt = NULL;
3179 1 50         if (!ngx_queue_empty(&self->cb_queue)) {
3180 1           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
3181 1           cbt = ngx_queue_data(q, ev_kafka_conn_cb_t, queue);
3182 1 50         if (cbt->correlation_id != corr_id) {
3183             /* Out of order — shouldn't happen with Kafka, but handle gracefully */
3184             char errbuf[128];
3185 0           snprintf(errbuf, sizeof(errbuf),
3186             "correlation ID mismatch: expected %d, got %d",
3187             cbt->correlation_id, corr_id);
3188 0           conn_emit_error(aTHX_ self, errbuf);
3189 0 0         if (conn_check_destroyed(self)) return;
3190 0           conn_handle_disconnect(aTHX_ self, "correlation ID mismatch");
3191 0           return;
3192             }
3193 1           ngx_queue_remove(q);
3194 1           self->pending_count--;
3195             }
3196              
3197             /* Dispatch BEFORE compacting — payload points into rbuf */
3198 1           size_t consumed = 4 + (size_t)msg_size;
3199 1 50         if (cbt) {
3200 1           conn_dispatch_response(aTHX_ self, cbt, payload, payload_len);
3201 1 50         if (cbt->cb) SvREFCNT_dec(cbt->cb);
3202 1           Safefree(cbt);
3203 1 50         if (conn_check_destroyed(self)) return;
3204             }
3205              
3206             /* Compact rbuf after dispatch */
3207 1           self->rbuf_len -= consumed;
3208 1 50         if (self->rbuf_len > 0)
3209 0           memmove(self->rbuf, self->rbuf + consumed, self->rbuf_len);
3210             }
3211             }
3212              
3213             /* ================================================================
3214             * I/O callback
3215             * ================================================================ */
3216              
3217 4           static void conn_io_cb(EV_P_ ev_io *w, int revents) {
3218 4           ev_kafka_conn_t *self = (ev_kafka_conn_t *)w->data;
3219             dTHX;
3220             (void)loop;
3221              
3222 4 50         if (!self || self->magic != KF_MAGIC_ALIVE) return;
    50          
3223              
3224             /* TCP connect in progress */
3225 4 100         if (self->state == CONN_CONNECTING) {
3226 2           int err = 0;
3227 2           socklen_t errlen = sizeof(err);
3228              
3229 2 50         if (self->timing) {
3230 2           ev_timer_stop(self->loop, &self->timer);
3231 2           self->timing = 0;
3232             }
3233 2           conn_stop_writing(self);
3234              
3235 2 50         if (getsockopt(self->fd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0)
3236 0           err = errno;
3237 2 100         if (err != 0) {
3238             char errbuf[256];
3239 1           snprintf(errbuf, sizeof(errbuf), "connect: %s", strerror(err));
3240 1           conn_emit_error(aTHX_ self, errbuf);
3241 1 50         if (conn_check_destroyed(self)) return;
3242 1           conn_handle_disconnect(aTHX_ self, errbuf);
3243 1           return;
3244             }
3245              
3246 1           conn_on_connect_done(aTHX_ self);
3247 1           return;
3248             }
3249              
3250             #ifdef HAVE_OPENSSL
3251             /* TLS handshake in progress */
3252 2 50         if (self->state == CONN_TLS_HANDSHAKE) {
3253 0           ERR_clear_error();
3254 0           int ret = SSL_connect(self->ssl);
3255 0 0         if (ret == 1) {
3256 0           conn_stop_reading(self);
3257 0           conn_stop_writing(self);
3258              
3259             /* TLS done — proceed to ApiVersions (or SASL if needed) */
3260 0           conn_send_api_versions(aTHX_ self);
3261 0           conn_start_reading(self);
3262 0           return;
3263             }
3264 0           int err = SSL_get_error(self->ssl, ret);
3265 0 0         if (err == SSL_ERROR_WANT_READ) {
3266 0           conn_stop_writing(self);
3267 0           conn_start_reading(self);
3268 0 0         } else if (err == SSL_ERROR_WANT_WRITE) {
3269 0           conn_stop_reading(self);
3270 0           conn_start_writing(self);
3271             } else {
3272             char errbuf[256];
3273 0           unsigned long e = ERR_peek_last_error();
3274 0 0         if (e) {
3275 0           snprintf(errbuf, sizeof(errbuf), "SSL_connect failed: %s",
3276             ERR_reason_error_string(e));
3277             } else {
3278 0           snprintf(errbuf, sizeof(errbuf), "SSL_connect failed (err=%d)", err);
3279             }
3280 0           ERR_clear_error();
3281 0           conn_emit_error(aTHX_ self, errbuf);
3282 0 0         if (conn_check_destroyed(self)) return;
3283 0           conn_handle_disconnect(aTHX_ self, errbuf);
3284             }
3285 0           return;
3286             }
3287             #endif
3288              
3289             /* Write */
3290 2 100         if (revents & EV_WRITE) {
3291 2 100         while (self->wbuf_off < self->wbuf_len) {
3292 1           ssize_t n = kf_io_write(self, self->wbuf + self->wbuf_off,
3293 1           self->wbuf_len - self->wbuf_off);
3294 1 50         if (n < 0) {
3295 0 0         if (errno == EAGAIN || errno == EWOULDBLOCK) break;
    0          
3296 0           conn_emit_error(aTHX_ self, strerror(errno));
3297 0 0         if (conn_check_destroyed(self)) return;
3298 0           conn_handle_disconnect(aTHX_ self, "write error");
3299 0           return;
3300             }
3301 1 50         if (n == 0) {
3302 0           conn_handle_disconnect(aTHX_ self, "connection closed");
3303 0           return;
3304             }
3305 1           self->wbuf_off += n;
3306             }
3307              
3308 1 50         if (self->wbuf_off >= self->wbuf_len) {
3309 1           self->wbuf_off = 0;
3310 1           self->wbuf_len = 0;
3311 1           conn_stop_writing(self);
3312             }
3313             }
3314              
3315             /* Read */
3316 2 100         if (revents & EV_READ) {
3317 1           conn_ensure_rbuf(self, self->rbuf_len + 8192);
3318 1           ssize_t n = kf_io_read(self, self->rbuf + self->rbuf_len,
3319 1           self->rbuf_cap - self->rbuf_len);
3320 1 50         if (n < 0) {
3321 0 0         if (errno == EAGAIN || errno == EWOULDBLOCK) return;
    0          
3322 0           conn_emit_error(aTHX_ self, strerror(errno));
3323 0 0         if (conn_check_destroyed(self)) return;
3324 0           conn_handle_disconnect(aTHX_ self, "read error");
3325 0           return;
3326             }
3327 1 50         if (n == 0) {
3328 0           conn_handle_disconnect(aTHX_ self, "connection closed by broker");
3329 0           return;
3330             }
3331 1           self->rbuf_len += n;
3332              
3333 1           conn_process_responses(aTHX_ self);
3334             }
3335             }
3336              
3337             /* ================================================================
3338             * Connect timer (timeout)
3339             * ================================================================ */
3340              
3341 0           static void conn_timer_cb(EV_P_ ev_timer *w, int revents) {
3342 0           ev_kafka_conn_t *self = (ev_kafka_conn_t *)w->data;
3343             dTHX;
3344             (void)loop;
3345             (void)revents;
3346              
3347 0           self->timing = 0;
3348 0 0         if (self->magic != KF_MAGIC_ALIVE) return;
3349              
3350 0           conn_emit_error(aTHX_ self, "connect timeout");
3351 0 0         if (conn_check_destroyed(self)) return;
3352 0           conn_handle_disconnect(aTHX_ self, "connect timeout");
3353             }
3354              
3355             /* ================================================================
3356             * TCP connect + handshake initiation
3357             * ================================================================ */
3358              
3359             #ifdef HAVE_OPENSSL
3360 0           static int is_ip_literal(const char *host) {
3361             struct in_addr addr4;
3362             struct in6_addr addr6;
3363 0           return (inet_pton(AF_INET, host, &addr4) == 1 ||
3364 0           inet_pton(AF_INET6, host, &addr6) == 1);
3365             }
3366             #endif
3367              
3368 1           static void conn_on_connect_done(pTHX_ ev_kafka_conn_t *self) {
3369             /* TCP connected — set up I/O watchers if not already */
3370              
3371             #ifdef HAVE_OPENSSL
3372 1 50         if (self->tls_enabled) {
3373 0           self->ssl_ctx = SSL_CTX_new(TLS_client_method());
3374 0 0         if (!self->ssl_ctx) {
3375 0           conn_emit_error(aTHX_ self, "SSL_CTX_new failed");
3376 0 0         if (conn_check_destroyed(self)) return;
3377 0           conn_handle_disconnect(aTHX_ self, "SSL_CTX_new failed");
3378 0           return;
3379             }
3380 0           SSL_CTX_set_default_verify_paths(self->ssl_ctx);
3381 0 0         if (self->tls_skip_verify)
3382 0           SSL_CTX_set_verify(self->ssl_ctx, SSL_VERIFY_NONE, NULL);
3383             else
3384 0           SSL_CTX_set_verify(self->ssl_ctx, SSL_VERIFY_PEER, NULL);
3385              
3386 0 0         if (self->tls_ca_file) {
3387 0 0         if (SSL_CTX_load_verify_locations(self->ssl_ctx, self->tls_ca_file, NULL) != 1) {
3388 0           conn_emit_error(aTHX_ self, "SSL_CTX_load_verify_locations failed");
3389 0 0         if (conn_check_destroyed(self)) return;
3390 0           conn_handle_disconnect(aTHX_ self, "SSL_CTX_load_verify_locations failed");
3391 0           return;
3392             }
3393             }
3394              
3395 0           self->ssl = SSL_new(self->ssl_ctx);
3396 0 0         if (!self->ssl) {
3397 0           conn_emit_error(aTHX_ self, "SSL_new failed");
3398 0 0         if (conn_check_destroyed(self)) return;
3399 0           conn_handle_disconnect(aTHX_ self, "SSL_new failed");
3400 0           return;
3401             }
3402 0           SSL_set_fd(self->ssl, self->fd);
3403              
3404 0 0         if (!is_ip_literal(self->host))
3405 0           SSL_set_tlsext_host_name(self->ssl, self->host);
3406              
3407 0 0         if (!self->tls_skip_verify) {
3408 0           X509_VERIFY_PARAM *param = SSL_get0_param(self->ssl);
3409 0           X509_VERIFY_PARAM_set_hostflags(param, X509_CHECK_FLAG_NO_PARTIAL_WILDCARDS);
3410 0 0         if (is_ip_literal(self->host))
3411 0           X509_VERIFY_PARAM_set1_ip_asc(param, self->host);
3412             else
3413 0           X509_VERIFY_PARAM_set1_host(param, self->host, 0);
3414             }
3415              
3416 0           self->state = CONN_TLS_HANDSHAKE;
3417 0           ERR_clear_error();
3418 0           int ret = SSL_connect(self->ssl);
3419 0 0         if (ret == 1) {
3420             /* Immediate success */
3421 0           conn_send_api_versions(aTHX_ self);
3422 0           conn_start_reading(self);
3423 0           return;
3424             }
3425 0           int err = SSL_get_error(self->ssl, ret);
3426 0 0         if (err == SSL_ERROR_WANT_READ) {
3427 0           conn_start_reading(self);
3428 0 0         } else if (err == SSL_ERROR_WANT_WRITE) {
3429 0           conn_start_writing(self);
3430             } else {
3431             char errbuf[256];
3432 0           unsigned long e = ERR_peek_last_error();
3433 0 0         if (e) {
3434 0           snprintf(errbuf, sizeof(errbuf), "SSL_connect failed: %s",
3435             ERR_reason_error_string(e));
3436             } else {
3437 0           snprintf(errbuf, sizeof(errbuf), "SSL_connect failed (err=%d)", err);
3438             }
3439 0           ERR_clear_error();
3440 0           conn_emit_error(aTHX_ self, errbuf);
3441 0 0         if (conn_check_destroyed(self)) return;
3442 0           conn_handle_disconnect(aTHX_ self, errbuf);
3443             }
3444 0           return;
3445             }
3446             #endif
3447              
3448             /* No TLS — send ApiVersions directly */
3449 1           conn_send_api_versions(aTHX_ self);
3450 1           conn_start_reading(self);
3451             }
3452              
3453 2           static void conn_start_connect(pTHX_ ev_kafka_conn_t *self,
3454             const char *host, int port, double timeout)
3455             {
3456             struct addrinfo hints, *res, *rp;
3457             char port_str[16];
3458 2           int fd = -1;
3459              
3460 2 50         if (self->state != CONN_DISCONNECTED) {
3461 0           conn_cleanup(aTHX_ self);
3462             }
3463              
3464             /* Save host/port (skip if already pointing to same string, e.g. reconnect) */
3465 2 50         if (host != self->host) {
3466 2 50         if (self->host) Safefree(self->host);
3467 2           self->host = savepv(host);
3468             }
3469 2           self->port = port;
3470 2           self->intentional_disconnect = 0;
3471              
3472 2           snprintf(port_str, sizeof(port_str), "%d", port);
3473              
3474 2           memset(&hints, 0, sizeof(hints));
3475 2           hints.ai_family = AF_UNSPEC;
3476 2           hints.ai_socktype = SOCK_STREAM;
3477              
3478             /* Fast path: if host is a numeric IP literal, AI_NUMERICHOST avoids any
3479             * DNS resolver work and returns synchronously without blocking the EV
3480             * loop. For non-literal hostnames getaddrinfo still blocks; users that
3481             * need fully-async DNS should resolve in Perl-land before connecting. */
3482 2           hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
3483 2           int gai_err = getaddrinfo(host, port_str, &hints, &res);
3484 2 50         if (gai_err != 0) {
3485 0           hints.ai_flags = 0;
3486 0           gai_err = getaddrinfo(host, port_str, &hints, &res);
3487             }
3488 2 50         if (gai_err != 0) {
3489             char errbuf[256];
3490 0           snprintf(errbuf, sizeof(errbuf), "resolve: %s", gai_strerror(gai_err));
3491 0           conn_emit_error(aTHX_ self, errbuf);
3492 0           return;
3493             }
3494              
3495 2 50         for (rp = res; rp; rp = rp->ai_next) {
3496 2           fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
3497 2 50         if (fd < 0) continue;
3498              
3499             /* Non-blocking */
3500 2           fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
3501              
3502             /* TCP_NODELAY */
3503             {
3504 2           int one = 1;
3505 2           setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
3506             }
3507              
3508 2           int ret = connect(fd, rp->ai_addr, rp->ai_addrlen);
3509 2 50         if (ret == 0) {
3510             /* Immediate connect */
3511 0           self->fd = fd;
3512 0           self->state = CONN_CONNECTING; /* will be advanced in on_connect_done */
3513 0           freeaddrinfo(res);
3514              
3515 0           ev_io_init(&self->rio, conn_io_cb, fd, EV_READ);
3516 0           self->rio.data = (void *)self;
3517 0           ev_io_init(&self->wio, conn_io_cb, fd, EV_WRITE);
3518 0           self->wio.data = (void *)self;
3519              
3520 0           conn_on_connect_done(aTHX_ self);
3521 0           return;
3522             }
3523              
3524 2 50         if (errno == EINPROGRESS) {
3525 2           self->fd = fd;
3526 2           self->state = CONN_CONNECTING;
3527 2           freeaddrinfo(res);
3528              
3529 2           ev_io_init(&self->rio, conn_io_cb, fd, EV_READ);
3530 2           self->rio.data = (void *)self;
3531 2           ev_io_init(&self->wio, conn_io_cb, fd, EV_WRITE);
3532 2           self->wio.data = (void *)self;
3533              
3534 2           conn_start_writing(self); /* wait for connect to complete */
3535              
3536 2 50         if (timeout > 0) {
3537 2           ev_timer_init(&self->timer, conn_timer_cb, timeout, 0.0);
3538 2           self->timer.data = (void *)self;
3539 2           ev_timer_start(self->loop, &self->timer);
3540 2           self->timing = 1;
3541             }
3542 2           return;
3543             }
3544              
3545 0           close(fd);
3546             }
3547              
3548 0           freeaddrinfo(res);
3549 0           conn_emit_error(aTHX_ self, "connect: all addresses failed");
3550             }
3551              
3552             /* ================================================================
3553             * XS INTERFACE
3554             * ================================================================ */
3555              
3556             MODULE = EV::Kafka PACKAGE = EV::Kafka::Conn
3557              
3558             PROTOTYPES: DISABLE
3559              
3560             EV::Kafka::Conn
3561             _new(char *cls, SV *loop_sv)
3562             CODE:
3563             {
3564             struct ev_loop *loop;
3565             ev_kafka_conn_t *self;
3566              
3567 7 50         if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop"))
    0          
3568 0           loop = (struct ev_loop *)SvIV(SvRV(loop_sv));
3569             else
3570 7           loop = EV_DEFAULT;
3571              
3572 7           Newxz(self, 1, ev_kafka_conn_t);
3573 7           self->magic = KF_MAGIC_ALIVE;
3574 7           self->loop = loop;
3575 7           self->fd = -1;
3576 7           self->state = CONN_DISCONNECTED;
3577 7           self->next_correlation_id = 1;
3578              
3579 7           Newx(self->rbuf, KF_BUF_INIT, char);
3580 7           self->rbuf_cap = KF_BUF_INIT;
3581 7           self->rbuf_len = 0;
3582 7           Newx(self->wbuf, KF_BUF_INIT, char);
3583 7           self->wbuf_cap = KF_BUF_INIT;
3584 7           self->wbuf_len = 0;
3585 7           self->wbuf_off = 0;
3586              
3587 7           ngx_queue_init(&self->cb_queue);
3588              
3589             /* Default client_id */
3590 7           self->client_id = savepv("ev-kafka");
3591 7           self->client_id_len = 8;
3592              
3593             /* Default: no API versions known */
3594             {
3595             int i;
3596 455 100         for (i = 0; i < API_VERSIONS_MAX_KEY; i++)
3597 448           self->api_versions[i] = -1;
3598             }
3599              
3600 7           self->reconnect_delay_ms = 1000;
3601              
3602 7           RETVAL = self;
3603             }
3604             OUTPUT:
3605             RETVAL
3606              
3607             void
3608             DESTROY(EV::Kafka::Conn self)
3609             CODE:
3610             {
3611 7 50         if (self->magic != KF_MAGIC_ALIVE) return;
3612              
3613 7           self->intentional_disconnect = 1;
3614 7           conn_cleanup(aTHX_ self);
3615 7           conn_cancel_pending(aTHX_ self, "destroyed");
3616              
3617 7           self->magic = KF_MAGIC_FREED;
3618              
3619 7 50         if (self->reconnect_timing) {
3620 0           ev_timer_stop(self->loop, &self->reconnect_timer);
3621 0           self->reconnect_timing = 0;
3622             }
3623              
3624 7 100         CLEAR_HANDLER(self->on_error);
3625 7 100         CLEAR_HANDLER(self->on_connect);
3626 7 50         CLEAR_HANDLER(self->on_disconnect);
3627              
3628 7 100         if (self->host) Safefree(self->host);
3629 7 50         if (self->client_id) Safefree(self->client_id);
3630 7 50         if (self->sasl_mechanism) Safefree(self->sasl_mechanism);
3631 7 50         if (self->sasl_username) Safefree(self->sasl_username);
3632 7 50         if (self->sasl_password) Safefree(self->sasl_password);
3633 7 50         if (self->scram_nonce) Safefree(self->scram_nonce);
3634 7 50         if (self->scram_client_first) Safefree(self->scram_client_first);
3635             #ifdef HAVE_OPENSSL
3636 7           OPENSSL_cleanse(self->scram_server_key, sizeof(self->scram_server_key));
3637 7 50         if (self->scram_auth_message) {
3638 0           OPENSSL_cleanse(self->scram_auth_message, self->scram_auth_message_len);
3639 0           Safefree(self->scram_auth_message);
3640             }
3641             #endif
3642 7 50         if (self->tls_ca_file) Safefree(self->tls_ca_file);
3643 7 50         if (self->rbuf) Safefree(self->rbuf);
3644 7 50         if (self->wbuf) Safefree(self->wbuf);
3645              
3646 7           Safefree(self);
3647             }
3648              
3649             void
3650             connect(EV::Kafka::Conn self, const char *host, int port, double timeout = 0)
3651             CODE:
3652             {
3653 2           conn_start_connect(aTHX_ self, host, port, timeout);
3654             }
3655              
3656             void
3657             disconnect(EV::Kafka::Conn self)
3658             CODE:
3659             {
3660 1           self->intentional_disconnect = 1;
3661 1 50         if (self->reconnect_timing) {
3662 0           ev_timer_stop(self->loop, &self->reconnect_timer);
3663 0           self->reconnect_timing = 0;
3664             }
3665 1           conn_handle_disconnect(aTHX_ self, "disconnected");
3666             }
3667              
3668             int
3669             connected(EV::Kafka::Conn self)
3670             CODE:
3671 3 50         RETVAL = (self->state == CONN_READY) ? 1 : 0;
3672             OUTPUT:
3673             RETVAL
3674              
3675             int
3676             state(EV::Kafka::Conn self)
3677             CODE:
3678 0 0         RETVAL = self->state;
3679             OUTPUT:
3680             RETVAL
3681              
3682             int
3683             pending(EV::Kafka::Conn self)
3684             CODE:
3685 0 0         RETVAL = self->pending_count;
3686             OUTPUT:
3687             RETVAL
3688              
3689             void
3690             on_error(EV::Kafka::Conn self, SV *cb = NULL)
3691             CODE:
3692             {
3693 2 50         CLEAR_HANDLER(self->on_error);
3694 2 50         if (cb && SvOK(cb)) {
    50          
3695 2           self->on_error = newSVsv(cb);
3696             }
3697             }
3698              
3699             void
3700             on_connect(EV::Kafka::Conn self, SV *cb = NULL)
3701             CODE:
3702             {
3703 1 50         CLEAR_HANDLER(self->on_connect);
3704 1 50         if (cb && SvOK(cb)) {
    50          
3705 1           self->on_connect = newSVsv(cb);
3706             }
3707             }
3708              
3709             void
3710             on_disconnect(EV::Kafka::Conn self, SV *cb = NULL)
3711             CODE:
3712             {
3713 0 0         CLEAR_HANDLER(self->on_disconnect);
3714 0 0         if (cb && SvOK(cb)) {
    0          
3715 0           self->on_disconnect = newSVsv(cb);
3716             }
3717             }
3718              
3719             void
3720             client_id(EV::Kafka::Conn self, const char *id = NULL)
3721             CODE:
3722             {
3723 0 0         if (id) {
3724 0 0         if (self->client_id) Safefree(self->client_id);
3725 0           self->client_id = savepv(id);
3726 0           self->client_id_len = strlen(id);
3727             }
3728             }
3729              
3730             void
3731             tls(EV::Kafka::Conn self, int enable, const char *ca_file = NULL, int skip_verify = 0)
3732             CODE:
3733             {
3734 0           self->tls_enabled = enable;
3735 0 0         if (self->tls_ca_file) { Safefree(self->tls_ca_file); self->tls_ca_file = NULL; }
3736 0 0         if (ca_file) self->tls_ca_file = savepv(ca_file);
3737 0           self->tls_skip_verify = skip_verify;
3738             }
3739              
3740             void
3741             sasl(EV::Kafka::Conn self, const char *mechanism, const char *username = NULL, const char *password = NULL)
3742             CODE:
3743             {
3744 0 0         if (self->sasl_mechanism) { Safefree(self->sasl_mechanism); self->sasl_mechanism = NULL; }
3745 0 0         if (self->sasl_username) { Safefree(self->sasl_username); self->sasl_username = NULL; }
3746 0 0         if (self->sasl_password) { Safefree(self->sasl_password); self->sasl_password = NULL; }
3747 0 0         if (SvOK(ST(1))) {
3748 0           self->sasl_mechanism = savepv(mechanism);
3749 0 0         if (username) self->sasl_username = savepv(username);
3750 0 0         if (password) self->sasl_password = savepv(password);
3751             }
3752             }
3753              
3754             void
3755             auto_reconnect(EV::Kafka::Conn self, int enable, int delay_ms = 1000)
3756             CODE:
3757             {
3758 0           self->auto_reconnect = enable;
3759 0           self->reconnect_delay_ms = delay_ms;
3760             }
3761              
3762             void
3763             metadata(EV::Kafka::Conn self, SV *topics_sv, SV *cb)
3764             CODE:
3765             {
3766 1 50         if (self->state != CONN_READY)
3767 1           croak("not connected");
3768              
3769             kf_buf_t body;
3770 0           kf_buf_init(&body);
3771              
3772             /* Metadata v1-v4 (non-flexible) */
3773 0           int16_t ver = self->api_versions[API_METADATA];
3774 0 0         if (ver < 0) ver = 1;
3775 0 0         if (ver > 4) ver = 4;
3776              
3777 0 0         if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) {
    0          
    0          
3778 0           AV *topics = (AV*)SvRV(topics_sv);
3779 0           SSize_t i, count = av_len(topics) + 1;
3780 0           kf_buf_append_i32(&body, (int32_t)count);
3781 0 0         for (i = 0; i < count; i++) {
3782 0           SV **elem = av_fetch(topics, i, 0);
3783             STRLEN tlen;
3784 0           const char *tname = SvPV(*elem, tlen);
3785 0           kf_buf_append_string(&body, tname, (int16_t)tlen);
3786             }
3787             } else {
3788 0           kf_buf_append_i32(&body, -1); /* null array = all topics */
3789             }
3790              
3791             /* allow_auto_topic_creation (v4+) */
3792 0 0         if (ver >= 4)
3793 0           kf_buf_append_i8(&body, 1);
3794              
3795 0           conn_send_request(aTHX_ self, API_METADATA, ver, &body, cb, 0, 0);
3796 0           kf_buf_free(&body);
3797             }
3798              
3799             void
3800             api_versions(EV::Kafka::Conn self)
3801             PPCODE:
3802             {
3803 0 0         if (!self->api_versions_known)
3804 0           XSRETURN_UNDEF;
3805              
3806 0           HV *hv = newHV();
3807             int i;
3808 0 0         for (i = 0; i < API_VERSIONS_MAX_KEY; i++) {
3809 0 0         if (self->api_versions[i] >= 0) {
3810             char key[8];
3811 0           int klen = snprintf(key, sizeof(key), "%d", i);
3812 0           hv_store(hv, key, klen, newSViv(self->api_versions[i]), 0);
3813             }
3814             }
3815 0 0         EXTEND(SP, 1);
3816 0           mPUSHs(newRV_noinc((SV*)hv));
3817 0           XSRETURN(1);
3818             }
3819              
3820             void
3821             fetch(EV::Kafka::Conn self, const char *topic, int partition, SV *offset_sv, SV *arg1 = NULL, SV *arg2 = NULL)
3822             CODE:
3823             {
3824 1 50         if (self->state != CONN_READY)
3825 1           croak("not connected");
3826              
3827             /* Accept either fetch(..., $cb) or fetch(..., \%opts, $cb).
3828             * $opts may set max_bytes, max_wait_ms, min_bytes. */
3829 0           SV *cb = NULL;
3830 0           HV *opts = NULL;
3831 0 0         if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
    0          
    0          
3832 0           opts = (HV*)SvRV(arg1);
3833 0           cb = arg2;
3834             } else {
3835 0           cb = arg1;
3836             }
3837              
3838 0           int32_t max_bytes = 1048576;
3839 0           int32_t max_wait_ms = 500;
3840 0           int32_t min_bytes = 1;
3841 0 0         if (opts) {
3842             SV **v;
3843 0 0         if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v))
    0          
3844 0           max_bytes = (int32_t)SvIV(*v);
3845 0 0         if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v))
    0          
3846 0           max_wait_ms = (int32_t)SvIV(*v);
3847 0 0         if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v))
    0          
3848 0           min_bytes = (int32_t)SvIV(*v);
3849             }
3850              
3851 0           int64_t offset = SvIV(offset_sv);
3852 0           STRLEN topic_len = strlen(topic);
3853              
3854 0           int16_t ver = self->api_versions[API_FETCH];
3855 0 0         if (ver < 0) ver = 4;
3856 0 0         if (ver > 7) ver = 7;
3857              
3858             kf_buf_t body;
3859 0           kf_buf_init(&body);
3860              
3861 0           kf_buf_append_i32(&body, -1); /* replica_id = -1 (consumer) */
3862 0           kf_buf_append_i32(&body, max_wait_ms);
3863 0           kf_buf_append_i32(&body, min_bytes);
3864              
3865             /* max_bytes (v3+) */
3866 0 0         if (ver >= 3)
3867 0           kf_buf_append_i32(&body, max_bytes);
3868              
3869             /* isolation_level (v4+) */
3870 0 0         if (ver >= 4)
3871 0           kf_buf_append_i8(&body, 0); /* READ_UNCOMMITTED */
3872              
3873             /* session_id + session_epoch (v7+) */
3874 0 0         if (ver >= 7) {
3875 0           kf_buf_append_i32(&body, 0); /* session_id */
3876 0           kf_buf_append_i32(&body, -1); /* session_epoch */
3877             }
3878              
3879             /* topics: ARRAY(1) */
3880 0           kf_buf_append_i32(&body, 1);
3881 0           kf_buf_append_string(&body, topic, (int16_t)topic_len);
3882              
3883             /* partitions: ARRAY(1) */
3884 0           kf_buf_append_i32(&body, 1);
3885 0           kf_buf_append_i32(&body, (int32_t)partition);
3886              
3887             /* fetch_offset */
3888 0           kf_buf_append_i64(&body, offset);
3889              
3890             /* log_start_offset (v5+) */
3891 0 0         if (ver >= 5)
3892 0           kf_buf_append_i64(&body, -1);
3893              
3894             /* partition_max_bytes */
3895 0           kf_buf_append_i32(&body, max_bytes);
3896              
3897             /* forgotten_topics_data (v7+) */
3898 0 0         if (ver >= 7)
3899 0           kf_buf_append_i32(&body, 0); /* empty array */
3900              
3901 0           conn_send_request(aTHX_ self, API_FETCH, ver, &body, cb, 0, 0);
3902 0           kf_buf_free(&body);
3903             }
3904              
3905             void
3906             fetch_multi(EV::Kafka::Conn self, SV *topics_sv, SV *arg1 = NULL, SV *arg2 = NULL)
3907             CODE:
3908             {
3909 0 0         if (self->state != CONN_READY)
3910 0           croak("not connected");
3911              
3912             /* topics_sv: { topic => [{partition => N, offset => N}, ...], ... }
3913             * accept fetch_multi(\%topics, $cb) or fetch_multi(\%topics, \%opts, $cb). */
3914 0 0         if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVHV)
    0          
3915 0           croak("fetch_multi: expected hashref");
3916 0           HV *topics_hv = (HV*)SvRV(topics_sv);
3917              
3918 0           SV *cb = NULL;
3919 0           HV *opts = NULL;
3920 0 0         if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
    0          
    0          
3921 0           opts = (HV*)SvRV(arg1);
3922 0           cb = arg2;
3923             } else {
3924 0           cb = arg1;
3925             }
3926              
3927 0           int32_t max_bytes = 1048576;
3928 0           int32_t max_wait_ms = 500;
3929 0           int32_t min_bytes = 1;
3930 0 0         if (opts) {
3931             SV **v;
3932 0 0         if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v))
    0          
3933 0           max_bytes = (int32_t)SvIV(*v);
3934 0 0         if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v))
    0          
3935 0           max_wait_ms = (int32_t)SvIV(*v);
3936 0 0         if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v))
    0          
3937 0           min_bytes = (int32_t)SvIV(*v);
3938             }
3939              
3940 0           int16_t ver = self->api_versions[API_FETCH];
3941 0 0         if (ver < 0) ver = 4;
3942 0 0         if (ver > 7) ver = 7;
3943              
3944             kf_buf_t body;
3945 0           kf_buf_init(&body);
3946              
3947 0           kf_buf_append_i32(&body, -1); /* replica_id */
3948 0           kf_buf_append_i32(&body, max_wait_ms);
3949 0           kf_buf_append_i32(&body, min_bytes);
3950 0 0         if (ver >= 3)
3951 0           kf_buf_append_i32(&body, max_bytes);
3952 0 0         if (ver >= 4)
3953 0           kf_buf_append_i8(&body, 0); /* isolation_level */
3954 0 0         if (ver >= 7) {
3955 0           kf_buf_append_i32(&body, 0); /* session_id */
3956 0           kf_buf_append_i32(&body, -1); /* session_epoch */
3957             }
3958              
3959             /* topics array */
3960 0 0         kf_buf_append_i32(&body, (int32_t)HvUSEDKEYS(topics_hv));
3961              
3962 0           hv_iterinit(topics_hv);
3963             HE *entry;
3964 0 0         while ((entry = hv_iternext(topics_hv))) {
3965             I32 tlen;
3966 0           const char *tname = hv_iterkey(entry, &tlen);
3967 0           kf_buf_append_string(&body, tname, (int16_t)tlen);
3968              
3969 0           SV *parts_sv = hv_iterval(topics_hv, entry);
3970 0 0         if (!SvROK(parts_sv) || SvTYPE(SvRV(parts_sv)) != SVt_PVAV)
    0          
3971 0           croak("fetch_multi: value must be an arrayref");
3972 0           AV *parts_av = (AV*)SvRV(parts_sv);
3973 0           SSize_t i, pc = av_len(parts_av) + 1;
3974 0           kf_buf_append_i32(&body, (int32_t)pc);
3975              
3976 0 0         for (i = 0; i < pc; i++) {
3977 0           SV **elem = av_fetch(parts_av, i, 0);
3978 0 0         if (!elem || !SvROK(*elem))
    0          
3979 0           croak("fetch_multi: partition entry must be a hashref");
3980 0           HV *ph = (HV*)SvRV(*elem);
3981              
3982 0           SV **pid_sv = hv_fetch(ph, "partition", 9, 0);
3983 0 0         int32_t pid = pid_sv ? (int32_t)SvIV(*pid_sv) : 0;
3984 0           kf_buf_append_i32(&body, pid);
3985              
3986 0           SV **off_sv = hv_fetch(ph, "offset", 6, 0);
3987 0 0         int64_t offset = off_sv ? (int64_t)SvIV(*off_sv) : 0;
3988 0           kf_buf_append_i64(&body, offset);
3989              
3990 0 0         if (ver >= 5)
3991 0           kf_buf_append_i64(&body, -1); /* log_start_offset */
3992              
3993 0           kf_buf_append_i32(&body, max_bytes); /* partition_max_bytes */
3994             }
3995             }
3996              
3997 0 0         if (ver >= 7)
3998 0           kf_buf_append_i32(&body, 0); /* forgotten_topics_data */
3999              
4000 0           conn_send_request(aTHX_ self, API_FETCH, ver, &body, cb, 0, 0);
4001 0           kf_buf_free(&body);
4002             }
4003              
4004             void
4005             produce_batch(EV::Kafka::Conn self, const char *topic, int partition, SV *records_sv, SV *opts_sv = NULL, SV *cb = NULL)
4006             CODE:
4007             {
4008 0 0         if (self->state != CONN_READY)
4009 0           croak("not connected");
4010              
4011 0 0         if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV)
    0          
4012 0           croak("produce_batch: expected arrayref of records");
4013 0           AV *records_av = (AV*)SvRV(records_sv);
4014              
4015 0           int16_t acks = 1;
4016 0           int compression = COMPRESS_NONE;
4017 0           int64_t producer_id = -1;
4018 0           int16_t producer_epoch = -1;
4019 0           int32_t base_sequence = -1;
4020 0           const char *txn_id = NULL;
4021 0           STRLEN txn_id_len = 0;
4022              
4023 0 0         if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
    0          
    0          
4024 0           HV *opts = (HV*)SvRV(opts_sv);
4025             SV **tmp;
4026 0 0         if ((tmp = hv_fetch(opts, "acks", 4, 0)))
4027 0           acks = (int16_t)SvIV(*tmp);
4028 0 0         if ((tmp = hv_fetch(opts, "transactional_id", 16, 0)) && SvOK(*tmp))
    0          
4029 0           txn_id = SvPV(*tmp, txn_id_len);
4030 0 0         if ((tmp = hv_fetch(opts, "compression", 11, 0))) {
4031             STRLEN clen;
4032 0           const char *cstr = SvPV(*tmp, clen);
4033 0 0         if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
    0          
4034             #ifdef HAVE_LZ4
4035             else if (clen == 3 && memcmp(cstr, "lz4", 3) == 0) compression = COMPRESS_LZ4;
4036             #endif
4037             #ifdef HAVE_ZLIB
4038 0 0         else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
    0          
4039             #endif
4040             #ifdef HAVE_ZSTD
4041             else if (clen == 4 && memcmp(cstr, "zstd", 4) == 0) compression = COMPRESS_ZSTD;
4042             #endif
4043             #ifdef HAVE_SNAPPY
4044             else if (clen == 6 && memcmp(cstr, "snappy", 6) == 0) compression = COMPRESS_SNAPPY;
4045             #endif
4046 0           else croak("unsupported compression: %.*s", (int)clen, cstr);
4047             }
4048 0 0         if ((tmp = hv_fetch(opts, "producer_id", 11, 0)))
4049 0           producer_id = (int64_t)SvIV(*tmp);
4050 0 0         if ((tmp = hv_fetch(opts, "producer_epoch", 14, 0)))
4051 0           producer_epoch = (int16_t)SvIV(*tmp);
4052 0 0         if ((tmp = hv_fetch(opts, "base_sequence", 13, 0)))
4053 0           base_sequence = (int32_t)SvIV(*tmp);
4054 0 0         } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
    0          
    0          
4055 0           cb = opts_sv;
4056 0           opts_sv = NULL;
4057             }
4058              
4059             struct timeval tv;
4060 0           gettimeofday(&tv, NULL);
4061 0           int64_t timestamp = (int64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
4062              
4063             kf_buf_t batch;
4064 0           kf_encode_record_batch_multi(aTHX_ &batch, records_av, timestamp,
4065             compression, producer_id, producer_epoch, base_sequence,
4066             txn_id != NULL ? 1 : 0);
4067              
4068 0           int16_t ver = self->api_versions[API_PRODUCE];
4069 0 0         if (ver < 0) ver = 3;
4070 0 0         if (ver > 7) ver = 7;
4071              
4072 0           STRLEN topic_len = strlen(topic);
4073              
4074             kf_buf_t body;
4075 0           kf_buf_init(&body);
4076              
4077 0 0         if (ver >= 3)
4078 0 0         kf_buf_append_nullable_string(&body, txn_id, txn_id ? (int16_t)txn_id_len : 0);
4079              
4080 0           kf_buf_append_i16(&body, acks);
4081 0           kf_buf_append_i32(&body, 30000);
4082              
4083 0           kf_buf_append_i32(&body, 1);
4084 0           kf_buf_append_string(&body, topic, (int16_t)topic_len);
4085 0           kf_buf_append_i32(&body, 1);
4086 0           kf_buf_append_i32(&body, (int32_t)partition);
4087 0           kf_buf_append_i32(&body, (int32_t)batch.len);
4088 0           kf_buf_append(&body, batch.data, batch.len);
4089              
4090 0 0         conn_send_request(aTHX_ self, API_PRODUCE, ver, &body,
4091 0 0         (cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0);
4092              
4093 0           kf_buf_free(&body);
4094 0           kf_buf_free(&batch);
4095             }
4096              
4097             void
4098             list_offsets(EV::Kafka::Conn self, const char *topic, int partition, SV *timestamp_sv, SV *cb)
4099             CODE:
4100             {
4101 0 0         if (self->state != CONN_READY)
4102 0           croak("not connected");
4103              
4104 0           int64_t timestamp = SvIV(timestamp_sv);
4105 0           STRLEN topic_len = strlen(topic);
4106              
4107             /* -2 = earliest, -1 = latest */
4108 0           int16_t ver = self->api_versions[API_LIST_OFFSETS];
4109 0 0         if (ver < 0) ver = 1;
4110 0 0         if (ver > 5) ver = 5;
4111              
4112             kf_buf_t body;
4113 0           kf_buf_init(&body);
4114              
4115 0           kf_buf_append_i32(&body, -1); /* replica_id */
4116              
4117             /* isolation_level (v2+) */
4118 0 0         if (ver >= 2)
4119 0           kf_buf_append_i8(&body, 0);
4120              
4121             /* topics: ARRAY(1) */
4122 0           kf_buf_append_i32(&body, 1);
4123 0           kf_buf_append_string(&body, topic, (int16_t)topic_len);
4124              
4125             /* partitions: ARRAY(1) */
4126 0           kf_buf_append_i32(&body, 1);
4127 0           kf_buf_append_i32(&body, (int32_t)partition);
4128              
4129             /* current_leader_epoch (v4+) */
4130 0 0         if (ver >= 4)
4131 0           kf_buf_append_i32(&body, -1);
4132              
4133 0           kf_buf_append_i64(&body, timestamp);
4134              
4135 0           conn_send_request(aTHX_ self, API_LIST_OFFSETS, ver, &body, cb, 0, 0);
4136 0           kf_buf_free(&body);
4137             }
4138              
4139             void
4140             produce(EV::Kafka::Conn self, const char *topic, int partition, SV *key_sv, SV *value_sv, SV *opts_sv = NULL, SV *cb = NULL)
4141             CODE:
4142             {
4143 1 50         if (self->state != CONN_READY)
4144 1           croak("not connected");
4145              
4146             /* Handle optional opts hash: produce($topic, $part, $key, $val, \%opts, $cb)
4147             * or produce($topic, $part, $key, $val, $cb)
4148             */
4149 0           HV *headers = NULL;
4150 0           int16_t acks = 1;
4151 0           int compression = COMPRESS_NONE;
4152              
4153 0 0         if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
    0          
    0          
4154 0           HV *opts = (HV*)SvRV(opts_sv);
4155             SV **tmp;
4156 0 0         if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV)
    0          
    0          
4157 0           headers = (HV*)SvRV(*tmp);
4158 0 0         if ((tmp = hv_fetch(opts, "acks", 4, 0)))
4159 0           acks = (int16_t)SvIV(*tmp);
4160 0 0         if ((tmp = hv_fetch(opts, "compression", 11, 0))) {
4161             STRLEN clen;
4162 0           const char *cstr = SvPV(*tmp, clen);
4163 0 0         if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
    0          
4164             #ifdef HAVE_LZ4
4165             else if (clen == 3 && memcmp(cstr, "lz4", 3) == 0) compression = COMPRESS_LZ4;
4166             #endif
4167             #ifdef HAVE_ZLIB
4168 0 0         else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
    0          
4169             #endif
4170             #ifdef HAVE_ZSTD
4171             else if (clen == 4 && memcmp(cstr, "zstd", 4) == 0) compression = COMPRESS_ZSTD;
4172             #endif
4173             #ifdef HAVE_SNAPPY
4174             else if (clen == 6 && memcmp(cstr, "snappy", 6) == 0) compression = COMPRESS_SNAPPY;
4175             #endif
4176 0           else croak("unsupported compression: %.*s", (int)clen, cstr);
4177             }
4178 0 0         } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
    0          
    0          
4179             /* opts_sv is actually the callback */
4180 0           cb = opts_sv;
4181 0           opts_sv = NULL;
4182             }
4183              
4184 0           const char *key = NULL;
4185 0           STRLEN key_len = 0;
4186 0 0         if (SvOK(key_sv))
4187 0           key = SvPV(key_sv, key_len);
4188              
4189 0           const char *value = NULL;
4190 0           STRLEN value_len = 0;
4191 0 0         if (SvOK(value_sv))
4192 0           value = SvPV(value_sv, value_len);
4193              
4194             /* Build RecordBatch */
4195             int64_t timestamp;
4196             {
4197 0           SV **ts_tmp = NULL;
4198 0 0         if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV)
    0          
    0          
4199 0           ts_tmp = hv_fetch((HV*)SvRV(opts_sv), "timestamp", 9, 0);
4200 0 0         if (ts_tmp && SvOK(*ts_tmp)) {
    0          
4201 0           timestamp = (int64_t)SvIV(*ts_tmp);
4202             } else {
4203             struct timeval tv;
4204 0           gettimeofday(&tv, NULL);
4205 0           timestamp = (int64_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
4206             }
4207             }
4208              
4209             /* Build a single-element AV and dispatch to the multi encoder so
4210             * the compression / CRC framing logic lives in one place. */
4211             kf_buf_t batch;
4212             {
4213 0           HV *rec = newHV();
4214 0 0         if (key) hv_stores(rec, "key", newSVpvn(key, key_len));
4215 0 0         if (value) hv_stores(rec, "value", newSVpvn(value, value_len));
4216 0 0         if (headers) hv_stores(rec, "headers", newRV_inc((SV*)headers));
4217 0           AV *records_av = newAV();
4218 0           av_push(records_av, newRV_noinc((SV*)rec));
4219 0           kf_encode_record_batch_multi(aTHX_ &batch, records_av, timestamp,
4220             compression, -1, -1, -1, 0);
4221 0           SvREFCNT_dec((SV*)records_av);
4222             }
4223              
4224             /* Use Produce version — cap at v7 */
4225 0           int16_t ver = self->api_versions[API_PRODUCE];
4226 0 0         if (ver < 0) ver = 3;
4227 0 0         if (ver > 7) ver = 7;
4228              
4229 0           STRLEN topic_len = strlen(topic);
4230              
4231             kf_buf_t body;
4232 0           kf_buf_init(&body);
4233              
4234             /* transactional_id (v3+): nullable string = null */
4235 0 0         if (ver >= 3)
4236 0           kf_buf_append_nullable_string(&body, NULL, 0);
4237              
4238 0           kf_buf_append_i16(&body, acks); /* acks */
4239 0           kf_buf_append_i32(&body, 30000); /* timeout_ms = 30s */
4240              
4241             /* topic_data: ARRAY(1) */
4242 0           kf_buf_append_i32(&body, 1); /* 1 topic */
4243 0           kf_buf_append_string(&body, topic, (int16_t)topic_len);
4244              
4245             /* partition_data: ARRAY(1) */
4246 0           kf_buf_append_i32(&body, 1); /* 1 partition */
4247 0           kf_buf_append_i32(&body, partition);
4248              
4249             /* record_set: BYTES (i32 length + record_batch) */
4250 0           kf_buf_append_i32(&body, (int32_t)batch.len);
4251 0           kf_buf_append(&body, batch.data, batch.len);
4252              
4253 0 0         conn_send_request(aTHX_ self, API_PRODUCE, ver, &body,
4254 0 0         (cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0);
4255              
4256 0           kf_buf_free(&body);
4257 0           kf_buf_free(&batch);
4258             }
4259              
4260             void
4261             find_coordinator(EV::Kafka::Conn self, const char *group_id, SV *cb, int key_type = 0)
4262             CODE:
4263             {
4264 0 0         if (self->state != CONN_READY)
4265 0           croak("not connected");
4266              
4267 0           int16_t ver = self->api_versions[API_FIND_COORDINATOR];
4268 0 0         if (ver < 0) ver = 0;
4269 0 0         if (ver > 2) ver = 2;
4270              
4271             kf_buf_t body;
4272 0           kf_buf_init(&body);
4273              
4274 0           STRLEN glen = strlen(group_id);
4275 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4276              
4277             /* key_type (v1+): 0=group, 1=transaction */
4278 0 0         if (ver >= 1)
4279 0           kf_buf_append_i8(&body, (int8_t)key_type);
4280              
4281 0           conn_send_request(aTHX_ self, API_FIND_COORDINATOR, ver, &body, cb, 0, 0);
4282 0           kf_buf_free(&body);
4283             }
4284              
4285             void
4286             join_group(EV::Kafka::Conn self, const char *group_id, const char *member_id, SV *topics_sv, SV *cb, int session_timeout_ms = 30000, int rebalance_timeout_ms = 60000, SV *group_instance_id_sv = NULL)
4287             CODE:
4288             {
4289 0 0         if (self->state != CONN_READY)
4290 0           croak("not connected");
4291              
4292 0           int16_t ver = self->api_versions[API_JOIN_GROUP];
4293 0 0         if (ver < 0) ver = 1;
4294 0 0         if (ver > 5) ver = 5;
4295              
4296             kf_buf_t body;
4297 0           kf_buf_init(&body);
4298              
4299 0           STRLEN glen = strlen(group_id);
4300 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4301 0           kf_buf_append_i32(&body, session_timeout_ms);
4302              
4303             /* rebalance_timeout_ms (v1+) */
4304 0 0         if (ver >= 1)
4305 0           kf_buf_append_i32(&body, rebalance_timeout_ms);
4306              
4307 0           STRLEN mlen = strlen(member_id);
4308 0           kf_buf_append_string(&body, member_id, (int16_t)mlen);
4309              
4310             /* group_instance_id (v5+) */
4311 0 0         if (ver >= 5) {
4312 0 0         if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
    0          
4313             STRLEN gilen;
4314 0           const char *gi = SvPV(group_instance_id_sv, gilen);
4315 0           kf_buf_append_nullable_string(&body, gi, (int16_t)gilen);
4316             } else {
4317 0           kf_buf_append_nullable_string(&body, NULL, 0);
4318             }
4319             }
4320              
4321             /* protocol_type = "consumer" */
4322 0           kf_buf_append_string(&body, "consumer", 8);
4323              
4324             /* protocols: ARRAY(1) */
4325 0           kf_buf_append_i32(&body, 1);
4326              
4327             /* protocol name = "sticky" */
4328 0           kf_buf_append_string(&body, "sticky", 6);
4329              
4330             /* protocol metadata (ConsumerProtocol subscription) */
4331             /* Version:0, Topics:array, UserData:null */
4332             kf_buf_t meta;
4333 0           kf_buf_init(&meta);
4334 0           kf_buf_append_i16(&meta, 0); /* version */
4335              
4336 0           AV *topics = (AV*)SvRV(topics_sv);
4337 0           SSize_t i, tc = av_len(topics) + 1;
4338 0           kf_buf_append_i32(&meta, (int32_t)tc);
4339 0 0         for (i = 0; i < tc; i++) {
4340 0           SV **elem = av_fetch(topics, i, 0);
4341             STRLEN tlen;
4342 0           const char *tname = SvPV(*elem, tlen);
4343 0           kf_buf_append_string(&meta, tname, (int16_t)tlen);
4344             }
4345 0           kf_buf_append_nullable_bytes(&meta, NULL, 0); /* user_data = null */
4346              
4347 0           kf_buf_append_bytes(&body, meta.data, (int32_t)meta.len);
4348 0           kf_buf_free(&meta);
4349              
4350 0           conn_send_request(aTHX_ self, API_JOIN_GROUP, ver, &body, cb, 0, 0);
4351 0           kf_buf_free(&body);
4352             }
4353              
4354             void
4355             sync_group(EV::Kafka::Conn self, const char *group_id, int generation_id, const char *member_id, SV *assignments_sv, SV *cb, SV *group_instance_id_sv = NULL)
4356             CODE:
4357             {
4358 0 0         if (self->state != CONN_READY)
4359 0           croak("not connected");
4360              
4361 0           int16_t ver = self->api_versions[API_SYNC_GROUP];
4362 0 0         if (ver < 0) ver = 0;
4363 0 0         if (ver > 3) ver = 3;
4364              
4365             kf_buf_t body;
4366 0           kf_buf_init(&body);
4367              
4368 0           STRLEN glen = strlen(group_id);
4369 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4370 0           kf_buf_append_i32(&body, generation_id);
4371              
4372 0           STRLEN mlen = strlen(member_id);
4373 0           kf_buf_append_string(&body, member_id, (int16_t)mlen);
4374              
4375             /* group_instance_id (v3+) */
4376 0 0         if (ver >= 3) {
4377 0 0         if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
    0          
4378             STRLEN gilen;
4379 0           const char *gi = SvPV(group_instance_id_sv, gilen);
4380 0           kf_buf_append_nullable_string(&body, gi, (int16_t)gilen);
4381             } else {
4382 0           kf_buf_append_nullable_string(&body, NULL, 0);
4383             }
4384             }
4385              
4386             /* assignments: ARRAY of {member_id, assignment_bytes} */
4387 0 0         if (SvOK(assignments_sv) && SvROK(assignments_sv)
    0          
4388 0 0         && SvTYPE(SvRV(assignments_sv)) == SVt_PVAV) {
4389 0           AV *assigns = (AV*)SvRV(assignments_sv);
4390 0           SSize_t i, ac = av_len(assigns) + 1;
4391 0           kf_buf_append_i32(&body, (int32_t)ac);
4392              
4393 0 0         for (i = 0; i < ac; i++) {
4394 0           SV **elem = av_fetch(assigns, i, 0);
4395 0 0         if (!elem || !SvROK(*elem)) continue;
    0          
4396 0           HV *ah = (HV*)SvRV(*elem);
4397              
4398 0           SV **mid_sv = hv_fetch(ah, "member_id", 9, 0);
4399 0 0         if (!mid_sv) continue;
4400             STRLEN mid_len;
4401 0           const char *mid = SvPV(*mid_sv, mid_len);
4402 0           kf_buf_append_string(&body, mid, (int16_t)mid_len);
4403              
4404 0           SV **data_sv = hv_fetch(ah, "assignment", 10, 0);
4405 0 0         if (!data_sv) { kf_buf_append_bytes(&body, NULL, 0); continue; }
4406             STRLEN dlen;
4407 0           const char *ddata = SvPV(*data_sv, dlen);
4408 0           kf_buf_append_bytes(&body, ddata, (int32_t)dlen);
4409             }
4410             } else {
4411 0           kf_buf_append_i32(&body, 0); /* empty array */
4412             }
4413              
4414 0           conn_send_request(aTHX_ self, API_SYNC_GROUP, ver, &body, cb, 0, 0);
4415 0           kf_buf_free(&body);
4416             }
4417              
4418             void
4419             heartbeat(EV::Kafka::Conn self, const char *group_id, int generation_id, const char *member_id, SV *cb, SV *group_instance_id_sv = NULL)
4420             CODE:
4421             {
4422 0 0         if (self->state != CONN_READY)
4423 0           croak("not connected");
4424              
4425 0           int16_t ver = self->api_versions[API_HEARTBEAT];
4426 0 0         if (ver < 0) ver = 0;
4427 0 0         if (ver > 4) ver = 4;
4428              
4429             kf_buf_t body;
4430 0           kf_buf_init(&body);
4431              
4432 0           STRLEN glen = strlen(group_id);
4433 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4434 0           kf_buf_append_i32(&body, generation_id);
4435              
4436 0           STRLEN mlen = strlen(member_id);
4437 0           kf_buf_append_string(&body, member_id, (int16_t)mlen);
4438              
4439             /* group_instance_id (v3+) */
4440 0 0         if (ver >= 3) {
4441 0 0         if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
    0          
4442             STRLEN gilen;
4443 0           const char *gi = SvPV(group_instance_id_sv, gilen);
4444 0           kf_buf_append_nullable_string(&body, gi, (int16_t)gilen);
4445             } else {
4446 0           kf_buf_append_nullable_string(&body, NULL, 0);
4447             }
4448             }
4449              
4450 0           conn_send_request(aTHX_ self, API_HEARTBEAT, ver, &body, cb, 0, 0);
4451 0           kf_buf_free(&body);
4452             }
4453              
4454             void
4455             offset_commit(EV::Kafka::Conn self, const char *group_id, int generation_id, const char *member_id, SV *offsets_sv, SV *cb)
4456             CODE:
4457             {
4458 0 0         if (self->state != CONN_READY)
4459 0           croak("not connected");
4460              
4461 0           int16_t ver = self->api_versions[API_OFFSET_COMMIT];
4462 0 0         if (ver < 0) ver = 2;
4463 0 0         if (ver > 7) ver = 7;
4464              
4465             kf_buf_t body;
4466 0           kf_buf_init(&body);
4467              
4468 0           STRLEN glen = strlen(group_id);
4469 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4470              
4471             /* generation_id (v1+) */
4472 0 0         if (ver >= 1)
4473 0           kf_buf_append_i32(&body, generation_id);
4474              
4475             /* member_id (v1+) */
4476 0 0         if (ver >= 1) {
4477 0           STRLEN mlen = strlen(member_id);
4478 0           kf_buf_append_string(&body, member_id, (int16_t)mlen);
4479             }
4480              
4481             /* group_instance_id (v7+): null */
4482 0 0         if (ver >= 7)
4483 0           kf_buf_append_nullable_string(&body, NULL, 0);
4484              
4485             /* topics: ARRAY of {topic, partitions: [{partition, committed_offset, metadata}]} */
4486 0           AV *topics = (AV*)SvRV(offsets_sv);
4487 0           SSize_t i, tc = av_len(topics) + 1;
4488 0           kf_buf_append_i32(&body, (int32_t)tc);
4489              
4490 0 0         for (i = 0; i < tc; i++) {
4491 0           SV **elem = av_fetch(topics, i, 0);
4492 0 0         if (!elem || !SvROK(*elem)) continue;
    0          
4493 0           HV *th = (HV*)SvRV(*elem);
4494 0           SV **tname_sv = hv_fetch(th, "topic", 5, 0);
4495 0 0         if (!tname_sv) continue;
4496             STRLEN tnlen;
4497 0           const char *tname = SvPV(*tname_sv, tnlen);
4498 0           kf_buf_append_string(&body, tname, (int16_t)tnlen);
4499              
4500 0           SV **parts_sv = hv_fetch(th, "partitions", 10, 0);
4501 0 0         if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
    0          
4502 0           AV *parts = (AV*)SvRV(*parts_sv);
4503 0           SSize_t j, pc = av_len(parts) + 1;
4504 0           kf_buf_append_i32(&body, (int32_t)pc);
4505              
4506 0 0         for (j = 0; j < pc; j++) {
4507 0           SV **pelem = av_fetch(parts, j, 0);
4508 0 0         if (!pelem || !SvROK(*pelem)) continue;
    0          
4509 0           HV *ph = (HV*)SvRV(*pelem);
4510              
4511 0           SV **pid_sv = hv_fetch(ph, "partition", 9, 0);
4512 0 0         kf_buf_append_i32(&body, pid_sv ? (int32_t)SvIV(*pid_sv) : 0);
4513              
4514 0           SV **off_sv = hv_fetch(ph, "offset", 6, 0);
4515 0 0         kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0);
4516              
4517             /* leader_epoch (v6+) */
4518 0 0         if (ver >= 6)
4519 0           kf_buf_append_i32(&body, -1);
4520              
4521             /* metadata: nullable string = empty */
4522 0           kf_buf_append_nullable_string(&body, "", 0);
4523             }
4524             }
4525              
4526 0           conn_send_request(aTHX_ self, API_OFFSET_COMMIT, ver, &body, cb, 0, 0);
4527 0           kf_buf_free(&body);
4528             }
4529              
4530             void
4531             offset_fetch(EV::Kafka::Conn self, const char *group_id, SV *topics_sv, SV *cb)
4532             CODE:
4533             {
4534 0 0         if (self->state != CONN_READY)
4535 0           croak("not connected");
4536              
4537 0           int16_t ver = self->api_versions[API_OFFSET_FETCH];
4538 0 0         if (ver < 0) ver = 1;
4539 0 0         if (ver > 5) ver = 5;
4540              
4541             kf_buf_t body;
4542 0           kf_buf_init(&body);
4543              
4544 0           STRLEN glen = strlen(group_id);
4545 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4546              
4547             /* topics: ARRAY */
4548 0           AV *topics = (AV*)SvRV(topics_sv);
4549 0           SSize_t i, tc = av_len(topics) + 1;
4550 0           kf_buf_append_i32(&body, (int32_t)tc);
4551              
4552 0 0         for (i = 0; i < tc; i++) {
4553 0           SV **elem = av_fetch(topics, i, 0);
4554 0 0         if (!elem || !SvROK(*elem)) continue;
    0          
4555 0           HV *th = (HV*)SvRV(*elem);
4556 0           SV **tname_sv = hv_fetch(th, "topic", 5, 0);
4557 0 0         if (!tname_sv) continue;
4558             STRLEN tnlen;
4559 0           const char *tname = SvPV(*tname_sv, tnlen);
4560 0           kf_buf_append_string(&body, tname, (int16_t)tnlen);
4561              
4562 0           SV **parts_sv = hv_fetch(th, "partitions", 10, 0);
4563 0 0         if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
    0          
4564 0           AV *parts = (AV*)SvRV(*parts_sv);
4565 0           SSize_t j, pc = av_len(parts) + 1;
4566 0           kf_buf_append_i32(&body, (int32_t)pc);
4567              
4568 0 0         for (j = 0; j < pc; j++) {
4569 0           SV **pelem = av_fetch(parts, j, 0);
4570 0 0         kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0);
4571             }
4572             }
4573              
4574 0           conn_send_request(aTHX_ self, API_OFFSET_FETCH, ver, &body, cb, 0, 0);
4575 0           kf_buf_free(&body);
4576             }
4577              
4578             void
4579             leave_group(EV::Kafka::Conn self, const char *group_id, const char *member_id, SV *cb)
4580             CODE:
4581             {
4582 0 0         if (self->state != CONN_READY)
4583 0           croak("not connected");
4584              
4585 0           int16_t ver = self->api_versions[API_LEAVE_GROUP];
4586 0 0         if (ver < 0) ver = 0;
4587 0 0         if (ver > 3) ver = 3;
4588              
4589             kf_buf_t body;
4590 0           kf_buf_init(&body);
4591              
4592 0           STRLEN glen = strlen(group_id);
4593 0           kf_buf_append_string(&body, group_id, (int16_t)glen);
4594              
4595 0           STRLEN mlen = strlen(member_id);
4596 0           kf_buf_append_string(&body, member_id, (int16_t)mlen);
4597              
4598 0           conn_send_request(aTHX_ self, API_LEAVE_GROUP, ver, &body, cb, 0, 0);
4599 0           kf_buf_free(&body);
4600             }
4601              
4602             void
4603             create_topics(EV::Kafka::Conn self, SV *topics_sv, int timeout_ms, SV *cb)
4604             CODE:
4605             {
4606 0 0         if (self->state != CONN_READY)
4607 0           croak("not connected");
4608              
4609 0           int16_t ver = self->api_versions[API_CREATE_TOPICS];
4610 0 0         if (ver < 0) ver = 0;
4611 0 0         if (ver > 4) ver = 4;
4612              
4613             kf_buf_t body;
4614 0           kf_buf_init(&body);
4615              
4616 0           AV *topics = (AV*)SvRV(topics_sv);
4617 0           SSize_t i, tc = av_len(topics) + 1;
4618 0           kf_buf_append_i32(&body, (int32_t)tc);
4619              
4620 0 0         for (i = 0; i < tc; i++) {
4621 0           SV **elem = av_fetch(topics, i, 0);
4622 0 0         if (!elem || !SvROK(*elem)) continue;
    0          
4623 0           HV *th = (HV*)SvRV(*elem);
4624              
4625 0           SV **name_sv = hv_fetch(th, "name", 4, 0);
4626 0 0         if (!name_sv) continue;
4627             STRLEN nlen;
4628 0           const char *name = SvPV(*name_sv, nlen);
4629 0           kf_buf_append_string(&body, name, (int16_t)nlen);
4630              
4631 0           SV **np_sv = hv_fetch(th, "num_partitions", 14, 0);
4632 0 0         int32_t num_partitions = np_sv ? (int32_t)SvIV(*np_sv) : 1;
4633 0           kf_buf_append_i32(&body, num_partitions);
4634              
4635 0           SV **rf_sv = hv_fetch(th, "replication_factor", 18, 0);
4636 0 0         int16_t replication_factor = rf_sv ? (int16_t)SvIV(*rf_sv) : 1;
4637 0           kf_buf_append_i16(&body, replication_factor);
4638              
4639             /* assignments: empty array */
4640 0           kf_buf_append_i32(&body, 0);
4641              
4642             /* configs: empty array */
4643 0           kf_buf_append_i32(&body, 0);
4644             }
4645              
4646 0           kf_buf_append_i32(&body, timeout_ms);
4647              
4648             /* validate_only (v1+) */
4649 0 0         if (ver >= 1)
4650 0           kf_buf_append_i8(&body, 0);
4651              
4652 0           conn_send_request(aTHX_ self, API_CREATE_TOPICS, ver, &body, cb, 0, 0);
4653 0           kf_buf_free(&body);
4654             }
4655              
4656             void
4657             delete_topics(EV::Kafka::Conn self, SV *topics_sv, int timeout_ms, SV *cb)
4658             CODE:
4659             {
4660 0 0         if (self->state != CONN_READY)
4661 0           croak("not connected");
4662              
4663 0           int16_t ver = self->api_versions[API_DELETE_TOPICS];
4664 0 0         if (ver < 0) ver = 0;
4665 0 0         if (ver > 3) ver = 3;
4666              
4667             kf_buf_t body;
4668 0           kf_buf_init(&body);
4669              
4670 0           AV *topics = (AV*)SvRV(topics_sv);
4671 0           SSize_t i, tc = av_len(topics) + 1;
4672 0           kf_buf_append_i32(&body, (int32_t)tc);
4673              
4674 0 0         for (i = 0; i < tc; i++) {
4675 0           SV **elem = av_fetch(topics, i, 0);
4676 0 0         if (!elem) continue;
4677             STRLEN tlen;
4678 0           const char *tname = SvPV(*elem, tlen);
4679 0           kf_buf_append_string(&body, tname, (int16_t)tlen);
4680             }
4681              
4682 0           kf_buf_append_i32(&body, timeout_ms);
4683              
4684 0           conn_send_request(aTHX_ self, API_DELETE_TOPICS, ver, &body, cb, 0, 0);
4685 0           kf_buf_free(&body);
4686             }
4687              
4688             void
4689             init_producer_id(EV::Kafka::Conn self, SV *transactional_id_sv, int txn_timeout_ms, SV *cb)
4690             CODE:
4691             {
4692 0 0         if (self->state != CONN_READY)
4693 0           croak("not connected");
4694              
4695 0           int16_t ver = self->api_versions[API_INIT_PRODUCER_ID];
4696 0 0         if (ver < 0) ver = 0;
4697 0 0         if (ver > 1) ver = 1;
4698              
4699             kf_buf_t body;
4700 0           kf_buf_init(&body);
4701              
4702 0 0         if (SvOK(transactional_id_sv)) {
4703             STRLEN tlen;
4704 0           const char *tid = SvPV(transactional_id_sv, tlen);
4705 0           kf_buf_append_string(&body, tid, (int16_t)tlen);
4706             } else {
4707 0           kf_buf_append_nullable_string(&body, NULL, 0);
4708             }
4709              
4710 0           kf_buf_append_i32(&body, txn_timeout_ms);
4711              
4712             /* v2+: producer_id(i64=-1), producer_epoch(i16=-1) */
4713 0 0         if (ver >= 2) {
4714 0           kf_buf_append_i64(&body, -1);
4715 0           kf_buf_append_i16(&body, -1);
4716             }
4717              
4718 0           conn_send_request(aTHX_ self, API_INIT_PRODUCER_ID, ver, &body, cb, 0, 0);
4719 0           kf_buf_free(&body);
4720             }
4721              
4722             void
4723             add_partitions_to_txn(EV::Kafka::Conn self, const char *transactional_id, SV *producer_id_sv, int producer_epoch, SV *topics_sv, SV *cb)
4724             CODE:
4725             {
4726 0 0         if (self->state != CONN_READY)
4727 0           croak("not connected");
4728              
4729 0           int16_t ver = self->api_versions[API_ADD_PARTITIONS_TXN];
4730 0 0         if (ver < 0) ver = 0;
4731 0 0         if (ver > 1) ver = 1;
4732              
4733 0           int64_t pid = SvIV(producer_id_sv);
4734              
4735             kf_buf_t body;
4736 0           kf_buf_init(&body);
4737              
4738 0           STRLEN tid_len = strlen(transactional_id);
4739 0           kf_buf_append_string(&body, transactional_id, (int16_t)tid_len);
4740 0           kf_buf_append_i64(&body, pid);
4741 0           kf_buf_append_i16(&body, (int16_t)producer_epoch);
4742              
4743             /* topics: ARRAY of {topic, partitions: ARRAY(i32)} */
4744 0 0         if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVAV)
    0          
4745 0           croak("add_partitions_to_txn: expected arrayref");
4746 0           AV *topics = (AV*)SvRV(topics_sv);
4747 0           SSize_t i, tc = av_len(topics) + 1;
4748 0           kf_buf_append_i32(&body, (int32_t)tc);
4749              
4750 0 0         for (i = 0; i < tc; i++) {
4751 0           SV **elem = av_fetch(topics, i, 0);
4752 0 0         if (!elem || !SvROK(*elem)) croak("add_partitions_to_txn: bad element");
    0          
4753 0           HV *th = (HV*)SvRV(*elem);
4754 0           SV **tname_sv = hv_fetch(th, "topic", 5, 0);
4755 0 0         if (!tname_sv) croak("add_partitions_to_txn: missing topic");
4756             STRLEN tnlen;
4757 0           const char *tname = SvPV(*tname_sv, tnlen);
4758 0           kf_buf_append_string(&body, tname, (int16_t)tnlen);
4759              
4760 0           SV **parts_sv = hv_fetch(th, "partitions", 10, 0);
4761 0 0         if (!parts_sv || !SvROK(*parts_sv)) croak("add_partitions_to_txn: missing partitions");
    0          
4762 0           AV *parts = (AV*)SvRV(*parts_sv);
4763 0           SSize_t j, pc = av_len(parts) + 1;
4764 0           kf_buf_append_i32(&body, (int32_t)pc);
4765 0 0         for (j = 0; j < pc; j++) {
4766 0           SV **pelem = av_fetch(parts, j, 0);
4767 0 0         kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0);
4768             }
4769             }
4770              
4771 0           conn_send_request(aTHX_ self, API_ADD_PARTITIONS_TXN, ver, &body, cb, 0, 0);
4772 0           kf_buf_free(&body);
4773             }
4774              
4775             void
4776             end_txn(EV::Kafka::Conn self, const char *transactional_id, SV *producer_id_sv, int producer_epoch, int committed, SV *cb)
4777             CODE:
4778             {
4779 0 0         if (self->state != CONN_READY)
4780 0           croak("not connected");
4781              
4782 0           int16_t ver = self->api_versions[API_END_TXN];
4783 0 0         if (ver < 0) ver = 0;
4784 0 0         if (ver > 1) ver = 1;
4785              
4786 0           int64_t pid = SvIV(producer_id_sv);
4787              
4788             kf_buf_t body;
4789 0           kf_buf_init(&body);
4790              
4791 0           STRLEN tid_len = strlen(transactional_id);
4792 0           kf_buf_append_string(&body, transactional_id, (int16_t)tid_len);
4793 0           kf_buf_append_i64(&body, pid);
4794 0           kf_buf_append_i16(&body, (int16_t)producer_epoch);
4795 0           kf_buf_append_i8(&body, committed ? 1 : 0);
4796              
4797 0           conn_send_request(aTHX_ self, API_END_TXN, ver, &body, cb, 0, 0);
4798 0           kf_buf_free(&body);
4799             }
4800              
4801             void
4802             txn_offset_commit(EV::Kafka::Conn self, const char *transactional_id, const char *group_id, SV *producer_id_sv, int producer_epoch, int generation_id, const char *member_id, SV *offsets_sv, SV *cb)
4803             CODE:
4804             {
4805 0 0         if (self->state != CONN_READY)
4806 0           croak("not connected");
4807              
4808 0           int16_t ver = self->api_versions[API_TXN_OFFSET_COMMIT];
4809 0 0         if (ver < 0) ver = 0;
4810 0 0         if (ver > 3) ver = 3;
4811              
4812 0           int64_t pid = SvIV(producer_id_sv);
4813              
4814             kf_buf_t body;
4815 0           kf_buf_init(&body);
4816              
4817 0           STRLEN tid_len = strlen(transactional_id);
4818 0           kf_buf_append_string(&body, transactional_id, (int16_t)tid_len);
4819              
4820 0           STRLEN gid_len = strlen(group_id);
4821 0           kf_buf_append_string(&body, group_id, (int16_t)gid_len);
4822              
4823 0           kf_buf_append_i64(&body, pid);
4824 0           kf_buf_append_i16(&body, (int16_t)producer_epoch);
4825              
4826             /* v3+: generation_id, member_id, group_instance_id */
4827 0 0         if (ver >= 3) {
4828 0           kf_buf_append_i32(&body, generation_id);
4829 0           STRLEN mid_len = strlen(member_id);
4830 0           kf_buf_append_string(&body, member_id, (int16_t)mid_len);
4831 0           kf_buf_append_nullable_string(&body, NULL, 0); /* group_instance_id */
4832             }
4833              
4834             /* offsets: ARRAY of {topic, partitions: [{partition, offset}]} */
4835 0 0         if (!SvROK(offsets_sv) || SvTYPE(SvRV(offsets_sv)) != SVt_PVAV)
    0          
4836 0           croak("txn_offset_commit: expected arrayref");
4837 0           AV *topics = (AV*)SvRV(offsets_sv);
4838 0           SSize_t i, tc = av_len(topics) + 1;
4839 0           kf_buf_append_i32(&body, (int32_t)tc);
4840              
4841 0 0         for (i = 0; i < tc; i++) {
4842 0           SV **elem = av_fetch(topics, i, 0);
4843 0 0         if (!elem || !SvROK(*elem)) croak("txn_offset_commit: bad element");
    0          
4844 0           HV *th = (HV*)SvRV(*elem);
4845              
4846 0           SV **tname_sv = hv_fetch(th, "topic", 5, 0);
4847 0 0         if (!tname_sv) croak("txn_offset_commit: missing topic");
4848             STRLEN tnlen;
4849 0           const char *tname = SvPV(*tname_sv, tnlen);
4850 0           kf_buf_append_string(&body, tname, (int16_t)tnlen);
4851              
4852 0           SV **parts_sv = hv_fetch(th, "partitions", 10, 0);
4853 0 0         if (!parts_sv || !SvROK(*parts_sv)) croak("txn_offset_commit: missing partitions");
    0          
4854 0           AV *parts = (AV*)SvRV(*parts_sv);
4855 0           SSize_t j, pc = av_len(parts) + 1;
4856 0           kf_buf_append_i32(&body, (int32_t)pc);
4857              
4858 0 0         for (j = 0; j < pc; j++) {
4859 0           SV **pelem = av_fetch(parts, j, 0);
4860 0 0         if (!pelem || !SvROK(*pelem)) croak("txn_offset_commit: bad partition");
    0          
4861 0           HV *ph = (HV*)SvRV(*pelem);
4862              
4863 0           SV **ppid_sv = hv_fetch(ph, "partition", 9, 0);
4864 0 0         kf_buf_append_i32(&body, ppid_sv ? (int32_t)SvIV(*ppid_sv) : 0);
4865              
4866 0           SV **off_sv = hv_fetch(ph, "offset", 6, 0);
4867 0 0         kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0);
4868              
4869             /* leader_epoch (v2+) */
4870 0 0         if (ver >= 2)
4871 0           kf_buf_append_i32(&body, -1);
4872              
4873 0           kf_buf_append_nullable_string(&body, "", 0); /* metadata */
4874             }
4875             }
4876              
4877 0           conn_send_request(aTHX_ self, API_TXN_OFFSET_COMMIT, ver, &body, cb, 0, 0);
4878 0           kf_buf_free(&body);
4879             }
4880              
4881             MODULE = EV::Kafka PACKAGE = EV::Kafka
4882              
4883             int
4884             _murmur2(SV *data_sv)
4885             CODE:
4886             {
4887             STRLEN len;
4888 16           const unsigned char *data = (const unsigned char *)SvPV(data_sv, len);
4889 16           uint32_t h = 0x9747b28c ^ (uint32_t)len;
4890 16           const uint32_t m = 0x5bd1e995;
4891 16           size_t i = 0;
4892 16           size_t remaining = len;
4893              
4894 275 100         while (remaining >= 4) {
4895             uint32_t k;
4896 259           memcpy(&k, data + i, 4); /* little-endian on x86, matches Java */
4897 259           k *= m;
4898 259           k ^= k >> 24;
4899 259           k *= m;
4900 259           h *= m;
4901 259           h ^= k;
4902 259           i += 4;
4903 259           remaining -= 4;
4904             }
4905              
4906 16           switch (remaining) {
4907 5           case 3: h ^= (uint32_t)data[i + 2] << 16; /* fallthrough */
4908 7           case 2: h ^= (uint32_t)data[i + 1] << 8; /* fallthrough */
4909 10           case 1: h ^= (uint32_t)data[i]; h *= m;
4910             }
4911              
4912 16           h ^= h >> 13;
4913 16           h *= m;
4914 16           h ^= h >> 15;
4915              
4916 16 100         RETVAL = (int)(h & 0x7FFFFFFF);
4917             }
4918             OUTPUT:
4919             RETVAL
4920              
4921             unsigned int
4922             _crc32c(SV *data_sv)
4923             CODE:
4924             {
4925             STRLEN len;
4926 7           const char *data = SvPV(data_sv, len);
4927 7           RETVAL = crc32c(data, len);
4928             }
4929             OUTPUT:
4930             RETVAL
4931              
4932             void
4933             _error_name(int code)
4934             PPCODE:
4935             {
4936 12           const char *name = NULL;
4937 12           switch (code) {
4938 1           case 0: name = "NONE"; break;
4939 1           case 1: name = "OFFSET_OUT_OF_RANGE"; break;
4940 0           case 2: name = "CORRUPT_MESSAGE"; break;
4941 1           case 3: name = "UNKNOWN_TOPIC_OR_PARTITION"; break;
4942 0           case 5: name = "LEADER_NOT_AVAILABLE"; break;
4943 1           case 6: name = "NOT_LEADER_OR_FOLLOWER"; break;
4944 0           case 7: name = "REQUEST_TIMED_OUT"; break;
4945 0           case 10: name = "MESSAGE_TOO_LARGE"; break;
4946 1           case 15: name = "COORDINATOR_NOT_AVAILABLE"; break;
4947 1           case 16: name = "NOT_COORDINATOR"; break;
4948 0           case 17: name = "INVALID_TOPIC_EXCEPTION"; break;
4949 0           case 19: name = "NOT_ENOUGH_REPLICAS"; break;
4950 0           case 20: name = "NOT_ENOUGH_REPLICAS_AFTER_APPEND"; break;
4951 0           case 22: name = "ILLEGAL_GENERATION"; break;
4952 0           case 25: name = "UNKNOWN_MEMBER_ID"; break;
4953 0           case 26: name = "INVALID_SESSION_TIMEOUT"; break;
4954 1           case 27: name = "REBALANCE_IN_PROGRESS"; break;
4955 0           case 35: name = "UNSUPPORTED_VERSION"; break;
4956 1           case 36: name = "TOPIC_ALREADY_EXISTS"; break;
4957 0           case 39: name = "REASSIGNMENT_IN_PROGRESS"; break;
4958 0           case 41: name = "NOT_CONTROLLER"; break;
4959 1           case 45: name = "OUT_OF_ORDER_SEQUENCE_NUMBER"; break;
4960 1           case 46: name = "DUPLICATE_SEQUENCE_NUMBER"; break;
4961 0           case 47: name = "INVALID_REPLICATION_FACTOR"; break;
4962 0           case 58: name = "SASL_AUTHENTICATION_FAILED"; break;
4963 0           case 72: name = "LISTENER_NOT_FOUND"; break;
4964 1           case 79: name = "MEMBER_ID_REQUIRED"; break;
4965 1           default: name = "UNKNOWN"; break;
4966             }
4967 12 50         EXTEND(SP, 1);
4968 12           mPUSHp(name, strlen(name));
4969 12           XSRETURN(1);
4970             }
4971              
4972             # ---- internal test helpers (not part of the public API) ----
4973              
4974             SV*
4975             _test_zigzag_i32(int v)
4976             CODE:
4977             {
4978 5           int32_t v32 = (int32_t)v;
4979 5           uint32_t z = (uint32_t)((v32 << 1) ^ (v32 >> 31));
4980 5           RETVAL = newSVuv(z);
4981             }
4982             OUTPUT:
4983             RETVAL
4984              
4985             SV*
4986             _test_zigzag_i64(SV *v_sv)
4987             CODE:
4988             {
4989 5           int64_t v = (int64_t)SvIV(v_sv);
4990 5           uint64_t z = (uint64_t)((v << 1) ^ (v >> 63));
4991 5           RETVAL = newSVuv(z);
4992             }
4993             OUTPUT:
4994             RETVAL
4995              
4996             # Encode then decode a varint; returns the round-tripped i64 or undef on
4997             # malformed input.
4998             SV*
4999             _test_varint_roundtrip(SV *v_sv)
5000             CODE:
5001             {
5002 14           int64_t v = (int64_t)SvIV(v_sv);
5003             kf_buf_t b;
5004 14           kf_buf_init(&b);
5005 14           kf_buf_append_varint(&b, v);
5006             int64_t out;
5007 14           int n = kf_read_varint(b.data, b.data + b.len, &out);
5008 14 50         if (n < 0) RETVAL = &PL_sv_undef;
5009 14           else RETVAL = newSViv((IV)out);
5010 14           kf_buf_free(&b);
5011             }
5012             OUTPUT:
5013             RETVAL
5014              
5015             # Encode a single-record RecordBatch v2 with given options.
5016             # opts: { compression => 0|1|3, producer_id, producer_epoch,
5017             # base_sequence, is_transactional, timestamp }.
5018             SV*
5019             _test_encode_batch(SV *records_sv, SV *opts_sv = NULL)
5020             CODE:
5021             {
5022 16 50         if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV)
    50          
5023 0           croak("_test_encode_batch: arrayref required");
5024 16           AV *records = (AV*)SvRV(records_sv);
5025              
5026 16           int compression = 0;
5027 16           int64_t producer_id = -1;
5028 16           int16_t producer_epoch = -1;
5029 16           int32_t base_sequence = -1;
5030 16           int is_txn = 0;
5031 16           int64_t ts = 0;
5032 16 100         if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
    50          
    50          
5033 11           HV *opts = (HV*)SvRV(opts_sv);
5034             SV **v;
5035 11 100         if ((v = hv_fetchs(opts, "compression", 0)) && SvOK(*v))
    50          
5036 10           compression = (int)SvIV(*v);
5037 11 100         if ((v = hv_fetchs(opts, "producer_id", 0)) && SvOK(*v))
    50          
5038 1           producer_id = (int64_t)SvIV(*v);
5039 11 100         if ((v = hv_fetchs(opts, "producer_epoch", 0)) && SvOK(*v))
    50          
5040 1           producer_epoch = (int16_t)SvIV(*v);
5041 11 100         if ((v = hv_fetchs(opts, "base_sequence", 0)) && SvOK(*v))
    50          
5042 1           base_sequence = (int32_t)SvIV(*v);
5043 11 50         if ((v = hv_fetchs(opts, "is_transactional", 0)) && SvOK(*v))
    0          
5044 0           is_txn = (int)SvIV(*v);
5045 11 50         if ((v = hv_fetchs(opts, "timestamp", 0)) && SvOK(*v))
    0          
5046 0           ts = (int64_t)SvIV(*v);
5047             }
5048              
5049             kf_buf_t out;
5050 16           kf_encode_record_batch_multi(aTHX_ &out, records, ts, compression,
5051             producer_id, producer_epoch, base_sequence, is_txn);
5052 16           RETVAL = newSVpvn(out.data, out.len);
5053 16           kf_buf_free(&out);
5054             }
5055             OUTPUT:
5056             RETVAL
5057              
5058             # Parse a captured response body of a known API. Returns the parsed hash
5059             # (same shape user callbacks see) or undef if the parser rejected the bytes.
5060             # api: 'metadata' | 'produce' | 'fetch' | 'list_offsets' | 'find_coordinator'.
5061             SV*
5062             _test_parse_response(const char *api, int version, SV *bytes_sv)
5063             CODE:
5064             {
5065             STRLEN len;
5066 15           const char *data = SvPV(bytes_sv, len);
5067             ev_kafka_conn_t stub;
5068 15           memset(&stub, 0, sizeof(stub));
5069 15           stub.magic = KF_MAGIC_ALIVE;
5070 15           stub.fd = -1;
5071 15           SV *result = NULL;
5072 15 100         if (strcmp(api, "metadata") == 0) {
5073 2           result = conn_parse_metadata_response(aTHX_ &stub, (int16_t)version, data, len);
5074 13 100         } else if (strcmp(api, "produce") == 0) {
5075 1           result = conn_parse_produce_response(aTHX_ &stub, (int16_t)version, data, len);
5076 12 50         } else if (strcmp(api, "fetch") == 0) {
5077 0           result = conn_parse_fetch_response(aTHX_ &stub, (int16_t)version, data, len);
5078 12 50         } else if (strcmp(api, "list_offsets") == 0) {
5079 0           result = conn_parse_list_offsets_response(aTHX_ &stub, (int16_t)version, data, len);
5080 12 100         } else if (strcmp(api, "find_coordinator") == 0) {
5081 1           result = conn_parse_find_coordinator_response(aTHX_ &stub, (int16_t)version, data, len);
5082 11 100         } else if (strcmp(api, "join_group") == 0) {
5083 1           result = conn_parse_join_group_response(aTHX_ &stub, (int16_t)version, data, len);
5084 10 100         } else if (strcmp(api, "sync_group") == 0) {
5085 1           result = conn_parse_sync_group_response(aTHX_ &stub, (int16_t)version, data, len);
5086 9 100         } else if (strcmp(api, "heartbeat") == 0) {
5087 1           result = conn_parse_heartbeat_response(aTHX_ &stub, (int16_t)version, data, len);
5088 8 50         } else if (strcmp(api, "offset_commit") == 0) {
5089 0           result = conn_parse_offset_commit_response(aTHX_ &stub, (int16_t)version, data, len);
5090 8 100         } else if (strcmp(api, "offset_fetch") == 0) {
5091 1           result = conn_parse_offset_fetch_response(aTHX_ &stub, (int16_t)version, data, len);
5092 7 100         } else if (strcmp(api, "leave_group") == 0) {
5093 1           result = conn_parse_leave_group_response(aTHX_ &stub, (int16_t)version, data, len);
5094 6 100         } else if (strcmp(api, "create_topics") == 0) {
5095 1           result = conn_parse_create_topics_response(aTHX_ &stub, (int16_t)version, data, len);
5096 5 100         } else if (strcmp(api, "delete_topics") == 0) {
5097 1           result = conn_parse_delete_topics_response(aTHX_ &stub, (int16_t)version, data, len);
5098 4 100         } else if (strcmp(api, "init_producer_id") == 0) {
5099 1           result = conn_parse_init_producer_id_response(aTHX_ &stub, (int16_t)version, data, len);
5100 3 100         } else if (strcmp(api, "add_partitions_to_txn") == 0) {
5101 1           result = conn_parse_add_partitions_to_txn_response(aTHX_ &stub, (int16_t)version, data, len);
5102 2 100         } else if (strcmp(api, "end_txn") == 0) {
5103 1           result = conn_parse_end_txn_response(aTHX_ &stub, (int16_t)version, data, len);
5104 1 50         } else if (strcmp(api, "txn_offset_commit") == 0) {
5105 1           result = conn_parse_txn_offset_commit_response(aTHX_ &stub, (int16_t)version, data, len);
5106             } else {
5107 0           croak("_test_parse_response: unknown api '%s'", api);
5108             }
5109 15 50         RETVAL = result ? SvREFCNT_inc(result) : &PL_sv_undef;
5110             }
5111             OUTPUT:
5112             RETVAL
5113              
5114             # Decode a RecordBatch v2 blob. Returns arrayref of {offset, key, value,
5115             # headers} on success, or undef on malformed input (CRC mismatch, etc.).
5116             SV*
5117             _test_decode_batch(SV *bytes_sv)
5118             CODE:
5119             {
5120             STRLEN len;
5121 16           const char *data = SvPV(bytes_sv, len);
5122 16           AV *records_av = newAV();
5123             int64_t base_offset;
5124 16           int n = kf_decode_record_batch(aTHX_ data, len, records_av, &base_offset);
5125 16 100         if (n < 0) {
5126 3           SvREFCNT_dec((SV*)records_av);
5127 3           RETVAL = &PL_sv_undef;
5128             } else {
5129 13           RETVAL = newRV_noinc((SV*)records_av);
5130             }
5131             }
5132             OUTPUT:
5133             RETVAL
5134              
5135             BOOT:
5136             {
5137 21 50         I_EV_API("EV::Kafka");
    50          
    50          
5138 21           crc32c_init_table();
5139             }