File Coverage

blib/lib/EV/Kafka.pm
Criterion Covered Total %
statement 208 1046 19.8
branch 38 518 7.3
condition 38 416 9.1
subroutine 21 92 22.8
pod 1 1 100.0
total 306 2073 14.7


line stmt bran cond sub pod time code
1             package EV::Kafka;
2 21     21   1989063 use strict;
  21         35  
  21         623  
3 21     21   63 use warnings;
  21         83  
  21         881  
4 21     21   4647 use EV;
  21         20180  
  21         530  
5              
6             BEGIN {
7 21     21   80 use XSLoader;
  21         51  
  21         751  
8 21     21   68 our $VERSION = '0.02';
9 21         53782 XSLoader::load __PACKAGE__, $VERSION;
10             }
11              
12             sub new {
13 12     12 1 535049 my ($class, %opts) = @_;
14              
15 12         24 my $loop = delete $opts{loop}; # passed through to per-conn _new on demand
16              
17             # Parse brokers
18 12   100     37 my $brokers = delete $opts{brokers} // '127.0.0.1:9092';
19 12         18 my @bootstrap;
20 12         33 for my $b (split /,/, $brokers) {
21 14         80 $b =~ s/^\s+|\s+$//g;
22 14         36 my ($h, $p) = split /:/, $b, 2;
23 14   100     29 $p //= 9092;
24 14         48 push @bootstrap, [$h, $p + 0];
25             }
26              
27             # Store config
28             my $cfg = {
29             bootstrap => \@bootstrap,
30             client_id => delete $opts{client_id} // 'ev-kafka',
31             tls => delete $opts{tls} // 0,
32             tls_ca_file => delete $opts{tls_ca_file},
33             tls_skip_verify => delete $opts{tls_skip_verify} // 0,
34             sasl => delete $opts{sasl},
35 0     0   0 on_error => delete $opts{on_error} // sub { die "EV::Kafka: @_\n" },
36             on_connect => delete $opts{on_connect},
37             on_message => delete $opts{on_message},
38             acks => delete $opts{acks} // -1,
39             linger_ms => delete $opts{linger_ms} // 5,
40             batch_size => delete $opts{batch_size} // 16384,
41             partitioner => delete $opts{partitioner},
42             compression => delete $opts{compression}, # 'lz4', 'gzip', or undef
43             idempotent => delete $opts{idempotent} // 0,
44             transactional_id => delete $opts{transactional_id}, # enables transactions
45             fetch_max_wait_ms => delete $opts{fetch_max_wait_ms} // 500,
46             fetch_max_bytes => delete $opts{fetch_max_bytes} // 1048576,
47             fetch_min_bytes => delete $opts{fetch_min_bytes} // 1,
48 12   50     439 metadata_refresh => delete $opts{metadata_refresh} // 300,
      100        
      50        
      66        
      100        
      50        
      50        
      50        
      50        
      50        
      50        
      50        
49             };
50              
51             # Internal state
52 12         26 $cfg->{conns} = {}; # node_id => EV::Kafka::Conn
53 12         29 $cfg->{meta} = undef; # latest metadata response
54 12         19 $cfg->{leaders} = {}; # "topic:partition" => node_id
55 12         19 $cfg->{broker_map}= {}; # node_id => {host, port}
56 12         19 $cfg->{connected} = 0;
57 12         17 $cfg->{meta_pending} = 0;
58 12         12 $cfg->{pending_ops} = []; # ops waiting for metadata
59              
60             # Producer state
61 12         20 $cfg->{batches} = {}; # "topic:partition" => [{rec, cb}]
62 12         17 $cfg->{next_sequence} = {}; # "topic:partition" => next sequence number
63 12         15 $cfg->{producer_id} = -1;
64 12         16 $cfg->{producer_epoch} = -1;
65 12         22 $cfg->{rr_counter} = 0;
66              
67             # Consumer state
68 12         40 $cfg->{assignments} = []; # [{topic, partition, offset}]
69 12         16 $cfg->{fetch_active} = 0;
70 12         15 $cfg->{group} = undef;
71              
72 12         47 my $self = bless { cfg => $cfg, loop => $loop }, "${class}::Client";
73              
74             # Warn on credentials over plaintext.
75 12 100 100     43 if ($cfg->{sasl} && !$cfg->{tls}) {
76 1   50     2 my $mech = $cfg->{sasl}{mechanism} // '';
77 1 50 33     5 if ($mech eq 'PLAIN' || $mech =~ /^SCRAM-/) {
78 1         13 warn "EV::Kafka: SASL $mech configured without TLS — "
79             . "credentials will be sent over plaintext\n";
80             }
81             }
82              
83 12         52 return $self;
84             }
85              
86             package EV::Kafka::Client;
87 21     21   163 use EV;
  21         28  
  21         582  
88 21     21   87 use Scalar::Util 'weaken';
  21         51  
  21         202361  
89              
90             sub _any_conn {
91 1     1   2 my ($self) = @_;
92 1         1 my $cfg = $self->{cfg};
93 1         2 my $conn = $cfg->{bootstrap_conn};
94 1         1 for my $c (values %{$cfg->{conns}}) {
  1         3  
95 0 0       0 if ($c->connected) { $conn = $c; last }
  0         0  
  0         0  
96             }
97 1 50 33     3 return ($conn && $conn->connected) ? $conn : undef;
98             }
99              
100             sub _get_or_create_conn {
101 0     0   0 my ($self, $node_id) = @_;
102 0         0 my $cfg = $self->{cfg};
103 0 0       0 return $cfg->{conns}{$node_id} if $cfg->{conns}{$node_id};
104              
105 0         0 my $info = $cfg->{broker_map}{$node_id};
106 0 0       0 return undef unless $info;
107              
108 0         0 my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', $self->{loop});
109 0         0 $self->_configure_conn($conn);
110              
111 0         0 $cfg->{conns}{$node_id} = $conn;
112 0         0 weaken(my $weak = $self);
113             $conn->on_connect(sub {
114 0 0   0   0 $weak->_drain_pending_for($node_id) if $weak;
115 0         0 });
116 0         0 $conn->connect($info->{host}, $info->{port}, 10.0);
117              
118 0         0 return $conn;
119             }
120              
121             sub _configure_conn {
122 0     0   0 my ($self, $conn) = @_;
123 0         0 my $cfg = $self->{cfg};
124 0         0 $conn->client_id($cfg->{client_id});
125 0 0       0 if ($cfg->{tls}) {
126 0         0 $conn->tls(1, $cfg->{tls_ca_file}, $cfg->{tls_skip_verify});
127             }
128 0 0       0 if ($cfg->{sasl}) {
129 0         0 $conn->sasl($cfg->{sasl}{mechanism}, $cfg->{sasl}{username}, $cfg->{sasl}{password});
130             }
131 0         0 weaken(my $weak_cfg = $cfg);
132             $conn->on_error(sub {
133 0 0 0 0   0 $weak_cfg->{on_error}->($_[0]) if $weak_cfg && $weak_cfg->{on_error};
134 0         0 });
135             }
136              
137             sub _bootstrap_connect {
138 0     0   0 my ($self, $cb) = @_;
139 0         0 my $cfg = $self->{cfg};
140 0         0 my @bs = @{$cfg->{bootstrap}};
  0         0  
141 0         0 my $idx = 0;
142              
143 0         0 my $try; $try = sub {
144 0 0   0   0 if ($idx >= @bs) {
145 0         0 undef $try;
146 0 0       0 $cfg->{on_error}->("all bootstrap brokers unreachable") if $cfg->{on_error};
147 0         0 return;
148             }
149 0         0 my ($host, $port) = @{$bs[$idx++]};
  0         0  
150 0         0 my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', $self->{loop});
151 0         0 $self->_configure_conn($conn);
152              
153             $conn->on_error(sub {
154             # try next broker
155 0 0       0 $try->() if $try;
156 0         0 });
157              
158             # $conn must be captured strongly here so the conn struct stays
159             # alive between $try returning and on_connect firing. The cost is
160             # a benign cycle (conn -> on_connect CV -> $conn) cleared at
161             # client DESTROY when $cfg is torn down.
162             $conn->on_connect(sub {
163 0         0 undef $try; # break self-reference cycle
164             $conn->on_error(sub {
165 0 0       0 $cfg->{on_error}->($_[0]) if $cfg->{on_error};
166 0         0 });
167 0         0 $cfg->{bootstrap_conn} = $conn;
168 0         0 $cfg->{connected} = 1;
169 0         0 $self->_refresh_metadata($cb);
170 0         0 });
171              
172 0         0 $conn->connect($host, $port, 10.0);
173 0         0 };
174 0         0 $try->();
175             }
176              
177             sub connect {
178 0     0   0 my ($self, $cb) = @_;
179 0         0 $self->_bootstrap_connect($cb);
180             }
181              
182             sub _merge_metadata {
183 0     0   0 my ($cfg, $meta) = @_;
184 0   0     0 for my $b (@{$meta->{brokers} // []}) {
  0         0  
185             $cfg->{broker_map}{$b->{node_id}} = {
186             host => $b->{host}, port => $b->{port}
187 0         0 };
188             }
189 0   0     0 for my $t (@{$meta->{topics} // []}) {
  0         0  
190 0 0       0 next if $t->{error_code};
191 0   0     0 for my $p (@{$t->{partitions} // []}) {
  0         0  
192 0         0 $cfg->{leaders}{"$t->{name}:$p->{partition}"} = $p->{leader};
193             }
194             }
195             }
196              
197             sub _refresh_metadata {
198 1     1   3 my ($self, $cb) = @_;
199 1         1 my $cfg = $self->{cfg};
200 1 50       7 return if $cfg->{meta_pending};
201 1         2 $cfg->{meta_pending} = 1;
202              
203 1         2 my $conn = $self->_any_conn;
204 1 50       2 unless ($conn) { $cfg->{meta_pending} = 0; return }
  1         1  
  1         2  
205              
206             $conn->metadata(undef, sub {
207 0     0   0 my ($meta, $err) = @_;
208 0         0 $cfg->{meta_pending} = 0;
209 0 0       0 if ($err) {
210 0 0       0 $cfg->{on_error}->("metadata: $err") if $cfg->{on_error};
211 0         0 return;
212             }
213              
214 0         0 $cfg->{meta} = $meta;
215 0         0 _merge_metadata($cfg, $meta);
216              
217             # assign bootstrap_conn a node_id if possible
218 0 0 0     0 if ($cfg->{bootstrap_conn} && $meta->{brokers} && @{$meta->{brokers}}) {
  0   0     0  
219 0         0 my $binfo = $meta->{brokers}[0];
220 0   0     0 $cfg->{conns}{$binfo->{node_id}} //= $cfg->{bootstrap_conn};
221             }
222              
223 0 0 0     0 if (($cfg->{idempotent} || $cfg->{transactional_id}) && $cfg->{producer_id} < 0) {
      0        
224             $self->_init_idempotent(sub {
225 0         0 $self->_drain_all_pending;
226 0 0       0 $cb->($meta) if $cb;
227 0 0       0 $cfg->{on_connect}->() if $cfg->{on_connect};
228 0         0 $cfg->{on_connect} = undef;
229 0         0 });
230             } else {
231 0         0 $self->_drain_all_pending;
232 0 0       0 $cb->($meta) if $cb;
233 0 0       0 $cfg->{on_connect}->() if $cfg->{on_connect};
234 0         0 $cfg->{on_connect} = undef;
235             }
236              
237             # Arm periodic metadata refresh (after first successful fetch).
238 0         0 $self->_arm_metadata_timer;
239 0         0 });
240             }
241              
242             sub _arm_metadata_timer {
243 0     0   0 my ($self) = @_;
244 0         0 my $cfg = $self->{cfg};
245 0 0       0 return if $cfg->{_meta_timer};
246 0   0     0 my $interval = $cfg->{metadata_refresh} || 0;
247 0 0       0 return if $interval <= 0;
248 0         0 weaken(my $weak = $self);
249             $cfg->{_meta_timer} = EV::timer $interval, $interval, sub {
250 0 0   0   0 return unless $weak;
251 0 0       0 $weak->_refresh_metadata unless $weak->{cfg}{meta_pending};
252 0         0 };
253             }
254              
255             sub _disarm_metadata_timer {
256 11     11   16 my ($self) = @_;
257 11         20 undef $self->{cfg}{_meta_timer};
258             }
259              
260             sub _refresh_metadata_for_topic {
261 1     1   22 my ($self, $topic) = @_;
262 1         3 my $cfg = $self->{cfg};
263 1 50       3 return if $cfg->{meta_pending};
264              
265             # Bound retries so a permanently-unavailable topic can't accumulate
266             # produce ops in pending_ops indefinitely.
267 1         2 my $tries = ++$cfg->{_topic_meta_tries}{$topic};
268 1 50       2 if ($tries > 10) {
269 1         10 delete $cfg->{_topic_meta_tries}{$topic};
270 1         3 my $msg = "metadata: topic '$topic' unavailable after $tries tries";
271             # Drop any pending ops queued for this topic and report.
272 1         1 my @keep;
273 1         2 for my $op (@{$cfg->{pending_ops}}) {
  1         3  
274 1 50 33     9 if ($op->{topic} && $op->{topic} eq $topic) {
275 1 50       4 $op->{cb}->(undef, $msg) if $op->{cb};
276             } else {
277 0         0 push @keep, $op;
278             }
279             }
280 1         6 $cfg->{pending_ops} = \@keep;
281 1 50       4 $cfg->{on_error}->($msg) if $cfg->{on_error};
282 1         3 return;
283             }
284              
285 0         0 $cfg->{meta_pending} = 1;
286              
287 0         0 my $conn = $self->_any_conn;
288 0 0       0 unless ($conn) { $cfg->{meta_pending} = 0; return }
  0         0  
  0         0  
289              
290             $conn->metadata([$topic], sub {
291 0     0   0 my ($meta, $err) = @_;
292 0         0 $cfg->{meta_pending} = 0;
293 0 0       0 if ($err) {
294 0 0       0 $cfg->{on_error}->("metadata: $err") if $cfg->{on_error};
295 0         0 return;
296             }
297              
298 0         0 _merge_metadata($cfg, $meta);
299              
300             # Fold the per-topic response into $cfg->{meta} so _num_partitions
301             # (which reads from {meta}{topics}) sees auto-created topics.
302 0 0 0     0 if ($cfg->{meta} && ref $cfg->{meta}{topics} eq 'ARRAY') {
303 0         0 my %by_name = map { $_->{name} => $_ } @{$cfg->{meta}{topics}};
  0         0  
  0         0  
304 0   0     0 for my $t (@{$meta->{topics} // []}) {
  0         0  
305 0         0 $by_name{$t->{name}} = $t;
306             }
307 0         0 $cfg->{meta}{topics} = [values %by_name];
308             } else {
309 0         0 $cfg->{meta} = $meta;
310             }
311              
312             # if topic still has error, retry after delay
313 0         0 my $topic_ok = 0;
314 0   0     0 for my $t (@{$meta->{topics} // []}) {
  0         0  
315 0 0 0     0 if ($t->{name} eq $topic && !$t->{error_code} && @{$t->{partitions} // []}) {
  0   0     0  
      0        
316 0         0 $topic_ok = 1;
317 0         0 last;
318             }
319             }
320              
321 0 0       0 if ($topic_ok) {
322 0         0 delete $cfg->{_topic_meta_tries}{$topic};
323 0         0 $self->_drain_all_pending;
324             } else {
325             # retry after short delay (topic being created); bounded above.
326 0         0 my $t; $t = EV::timer 0.5, 0, sub {
327 0         0 undef $t;
328 0         0 $self->_refresh_metadata_for_topic($topic);
329 0         0 };
330             }
331 0         0 });
332             }
333              
334             sub _init_idempotent {
335 0     0   0 my ($self, $cb) = @_;
336 0         0 my $cfg = $self->{cfg};
337              
338             my $do_init = sub {
339 0     0   0 my ($conn) = @_;
340             $conn->init_producer_id($cfg->{transactional_id}, 30000, sub {
341 0         0 my ($res, $err) = @_;
342 0 0 0     0 if (!$err && $res && !$res->{error_code}) {
      0        
343 0         0 $cfg->{producer_id} = $res->{producer_id};
344 0         0 $cfg->{producer_epoch} = $res->{producer_epoch};
345             } else {
346 0   0     0 my $msg = $err || "InitProducerId error: " . ($res->{error_code} // '?');
347 0 0       0 $cfg->{on_error}->($msg) if $cfg->{on_error};
348             }
349 0 0       0 $cb->() if $cb;
350 0         0 });
351 0         0 };
352              
353 0 0       0 if ($cfg->{transactional_id}) {
354             # transactional: find transaction coordinator first
355 0         0 my $conn = $self->_any_conn;
356 0 0       0 unless ($conn) { $cb->() if $cb; return }
  0 0       0  
  0         0  
357              
358             my $on_coord = sub {
359 0     0   0 my ($res, $err) = @_;
360 0 0 0     0 if ($err || ($res->{error_code} && $res->{error_code} != 0)) {
      0        
361 0         0 $do_init->($conn);
362 0         0 return;
363             }
364             # connect to the transaction coordinator
365             $cfg->{broker_map}{$res->{node_id}} = {
366             host => $res->{host}, port => $res->{port}
367 0         0 };
368 0         0 my $txn_conn = $self->_get_or_create_conn($res->{node_id});
369 0 0 0     0 if ($txn_conn && $txn_conn->connected) {
370 0         0 $cfg->{_txn_coordinator} = $txn_conn;
371 0         0 $do_init->($txn_conn);
372             } else {
373 0         0 push @{$cfg->{pending_ops}}, {
374             node_id => $res->{node_id},
375             run => sub {
376 0         0 $cfg->{_txn_coordinator} = $self->_get_or_create_conn($res->{node_id});
377 0   0     0 $do_init->($cfg->{_txn_coordinator} || $conn);
378             },
379 0         0 };
380             }
381 0         0 };
382 0         0 $conn->find_coordinator($cfg->{transactional_id}, $on_coord, 1);
383             } else {
384             # idempotent only: any broker works
385 0         0 my $conn = $self->_any_conn;
386 0 0       0 unless ($conn) { $cb->() if $cb; return }
  0 0       0  
  0         0  
387 0         0 $do_init->($conn);
388             }
389             }
390              
391             sub _drain_pending_for {
392 0     0   0 my ($self, $node_id) = @_;
393 0         0 my $cfg = $self->{cfg};
394 0         0 my @remaining;
395 0         0 for my $op (@{$cfg->{pending_ops}}) {
  0         0  
396 0 0 0     0 if (defined $op->{node_id} && $op->{node_id} == $node_id) {
397 0         0 $op->{run}->();
398             } else {
399 0         0 push @remaining, $op;
400             }
401             }
402 0         0 $cfg->{pending_ops} = \@remaining;
403             }
404              
405             sub _drain_all_pending {
406 0     0   0 my ($self) = @_;
407 0         0 my $cfg = $self->{cfg};
408 0         0 my @ops = @{$cfg->{pending_ops}};
  0         0  
409 0         0 $cfg->{pending_ops} = [];
410 0         0 for my $op (@ops) {
411 0 0       0 if (defined $op->{node_id}) {
412 0         0 my $conn = $self->_get_or_create_conn($op->{node_id});
413 0 0 0     0 if ($conn && $conn->connected) {
414 0         0 $op->{run}->();
415             } else {
416 0         0 push @{$cfg->{pending_ops}}, $op;
  0         0  
417             }
418             } else {
419 0         0 $op->{run}->();
420             }
421             }
422             }
423              
424             sub _get_leader {
425 1     1   2 my ($self, $topic, $partition) = @_;
426 1         4 return $self->{cfg}{leaders}{"$topic:$partition"};
427             }
428              
429             sub _num_partitions {
430 0     0   0 my ($self, $topic) = @_;
431 0 0       0 my $meta = $self->{cfg}{meta} or return 0;
432 0   0     0 for my $t (@{$meta->{topics} // []}) {
  0         0  
433 0 0       0 return scalar @{$t->{partitions}} if $t->{name} eq $topic;
  0         0  
434             }
435 0         0 return 0;
436             }
437              
438             sub _select_partition {
439 0     0   0 my ($self, $topic, $key) = @_;
440 0         0 my $np = $self->_num_partitions($topic);
441 0 0       0 return 0 unless $np > 0;
442              
443 0         0 my $cfg = $self->{cfg};
444 0 0       0 if ($cfg->{partitioner}) {
445 0         0 return $cfg->{partitioner}->($topic, $key, $np);
446             }
447 0 0 0     0 if (defined $key && length $key) {
448 0         0 return EV::Kafka::_murmur2($key) % $np;
449             }
450 0         0 return $cfg->{rr_counter}++ % $np;
451             }
452              
453             # --- Producer ---
454              
455             sub produce {
456 1     1   8 my ($self, $topic, $key, $value, @rest) = @_;
457 1         2 my $cb;
458             my %opts;
459 1         2 for my $a (@rest) {
460 0 0       0 if (ref $a eq 'CODE') { $cb = $a }
  0 0       0  
461 0         0 elsif (ref $a eq 'HASH') { %opts = %$a }
462             }
463              
464 1         5 my $cfg = $self->{cfg};
465              
466             # ensure we have metadata
467 1 50       3 unless ($cfg->{meta}) {
468 1         26 push @{$cfg->{pending_ops}}, {
469 0     0   0 run => sub { $self->produce($topic, $key, $value, @rest) },
470 1         1 };
471 1 50       11 $self->_refresh_metadata unless $cfg->{meta_pending};
472 1         2 return;
473             }
474              
475             my $partition = exists $opts{partition}
476             ? $opts{partition}
477 0 0       0 : $self->_select_partition($topic, $key);
478              
479 0         0 my $leader_id = $self->_get_leader($topic, $partition);
480 0 0       0 unless (defined $leader_id) {
481             # topic/partition unknown — request metadata for this topic to trigger auto-creation
482 0         0 push @{$cfg->{pending_ops}}, {
483 0     0   0 run => sub { $self->produce($topic, $key, $value, @rest) },
484 0         0 };
485 0 0       0 $self->_refresh_metadata_for_topic($topic) unless $cfg->{meta_pending};
486 0         0 return;
487             }
488              
489 0         0 my $conn = $self->_get_or_create_conn($leader_id);
490 0 0 0     0 unless ($conn && $conn->connected) {
491 0         0 push @{$cfg->{pending_ops}}, {
492             node_id => $leader_id,
493 0     0   0 run => sub { $self->produce($topic, $key, $value, @rest) },
494 0         0 };
495 0         0 return;
496             }
497              
498             # Accumulate into batch
499 0         0 my $bkey = "$topic:$partition";
500 0         0 my $rec = { key => $key, value => $value };
501 0 0       0 $rec->{headers} = $opts{headers} if $opts{headers};
502 0   0     0 push @{$cfg->{batches}{$bkey} //= []}, { rec => $rec, cb => $cb };
  0         0  
503              
504             # Check batch size threshold
505 0         0 my $batch = $cfg->{batches}{$bkey};
506 0         0 my $batch_bytes = 0;
507 0         0 for my $b (@$batch) {
508 0   0     0 $batch_bytes += length($b->{rec}{value} // '') + length($b->{rec}{key} // '') + 20;
      0        
509             }
510              
511 0 0       0 if ($batch_bytes >= $cfg->{batch_size}) {
    0          
512 0         0 $self->_flush_batch($topic, $partition, $conn);
513             } elsif (!$cfg->{_linger_active}) {
514             # start linger timer
515 0         0 $cfg->{_linger_active} = 1;
516 0         0 weaken(my $weak = $self);
517             $cfg->{_linger_timer} = EV::timer $cfg->{linger_ms} / 1000.0, 0, sub {
518 0     0   0 $cfg->{_linger_active} = 0;
519 0 0       0 $weak->_flush_all_batches if $weak;
520 0         0 };
521             }
522             }
523              
524             sub _flush_batch {
525 0     0   0 my ($self, $topic, $partition, $conn) = @_;
526 0         0 my $cfg = $self->{cfg};
527 0         0 my $bkey = "$topic:$partition";
528 0   0     0 my $idempotent = defined $cfg->{producer_id} && $cfg->{producer_id} >= 0;
529              
530             # When idempotent, allow only one in-flight batch per partition.
531             # Two in-flight batches racing a retry can alias sequence numbers and
532             # trigger OutOfOrderSequenceNumber (error 45), which is non-retriable.
533 0 0 0     0 return if $idempotent && $cfg->{_inflight}{$bkey};
534              
535 0         0 my $batch = delete $cfg->{batches}{$bkey};
536 0 0 0     0 return unless $batch && @$batch;
537              
538 0         0 my @records = map { $_->{rec} } @$batch;
  0         0  
539 0         0 my @cbs = map { $_->{cb} } @$batch;
  0         0  
540              
541 0         0 my %popts = (acks => $cfg->{acks});
542 0 0       0 $popts{compression} = $cfg->{compression} if $cfg->{compression};
543 0 0       0 $popts{transactional_id} = $cfg->{transactional_id} if $cfg->{_txn_active};
544 0         0 my $saved_seq;
545 0 0       0 if ($idempotent) {
546 0         0 $popts{producer_id} = $cfg->{producer_id};
547 0         0 $popts{producer_epoch} = $cfg->{producer_epoch};
548 0   0     0 $saved_seq = $cfg->{next_sequence}{$bkey} // 0;
549 0         0 $popts{base_sequence} = $saved_seq;
550 0         0 $cfg->{next_sequence}{$bkey} = $saved_seq + scalar @records;
551 0         0 $cfg->{_inflight}{$bkey} = 1;
552             }
553              
554 0 0       0 $self->_add_txn_partition($topic, $partition) if $cfg->{_txn_active};
555              
556             # retry count persists on the batch across re-queues
557 0   0     0 $cfg->{_batch_retries}{$bkey} //= 3;
558              
559 0         0 weaken(my $weak_self = $self);
560             $conn->produce_batch($topic, $partition, \@records, \%popts, sub {
561 0     0   0 my ($result, $err) = @_;
562 0 0       0 delete $cfg->{_inflight}{$bkey} if $idempotent;
563              
564 0         0 my $retriable = 0;
565 0         0 my $fatal_seq = 0;
566 0 0 0     0 if (!$err && $result && ref $result->{topics} eq 'ARRAY') {
      0        
567 0         0 for my $t (@{$result->{topics}}) {
  0         0  
568 0   0     0 for my $p (@{$t->{partitions} // []}) {
  0         0  
569 0   0     0 my $ec = $p->{error_code} // 0;
570 0 0 0     0 $retriable = $ec if $ec == 6 || $ec == 15 || $ec == 16;
      0        
571             # 45 = OUT_OF_ORDER_SEQUENCE_NUMBER, 46 = DUPLICATE,
572             # 47 = INVALID_PRODUCER_EPOCH (not the validation one).
573 0 0 0     0 $fatal_seq = $ec if $ec == 45 || $ec == 46;
574             }
575             }
576             }
577              
578             # Idempotent fatal sequence error: bump producer epoch and retry
579             # the batch. This is the producer's only recovery path from these
580             # codes — they're non-retriable in the usual sense but require a
581             # fresh InitProducerId.
582 0 0 0     0 if ($idempotent && $fatal_seq && !$cfg->{_txn_active}
      0        
      0        
      0        
583             && ($cfg->{_epoch_retries}{$bkey} //= 1) > 0) {
584 0         0 $cfg->{_epoch_retries}{$bkey}--;
585             # rewind sequence and re-queue the batch
586 0         0 $cfg->{next_sequence} = {};
587 0         0 $cfg->{_acked_sequence} = {};
588 0 0       0 if (exists $cfg->{batches}{$bkey}) {
589 0         0 unshift @{$cfg->{batches}{$bkey}}, @$batch;
  0         0  
590             } else {
591 0         0 $cfg->{batches}{$bkey} = $batch;
592             }
593             # force re-init of producer_id/epoch then re-flush
594 0         0 $cfg->{producer_id} = -1;
595 0         0 $cfg->{producer_epoch} = -1;
596             $weak_self->_init_idempotent(sub {
597 0 0       0 $weak_self->_flush_all_batches if $weak_self;
598 0 0       0 }) if $weak_self;
599 0         0 return;
600             }
601              
602 0 0 0     0 if ($retriable && ($cfg->{_batch_retries}{$bkey} // 0) > 0) {
      0        
603 0         0 $cfg->{_batch_retries}{$bkey}--;
604 0 0       0 $cfg->{next_sequence}{$bkey} = $saved_seq if defined $saved_seq;
605 0 0       0 if (exists $cfg->{batches}{$bkey}) {
606 0         0 unshift @{$cfg->{batches}{$bkey}}, @$batch;
  0         0  
607             } else {
608 0         0 $cfg->{batches}{$bkey} = $batch;
609             }
610 0 0 0     0 $weak_self->_refresh_metadata if $weak_self && !$cfg->{meta_pending};
611 0         0 my $rt; $rt = EV::timer 0.5, 0, sub {
612 0         0 undef $rt;
613 0 0       0 $weak_self->_flush_all_batches if $weak_self;
614 0         0 };
615 0         0 return;
616             }
617 0         0 delete $cfg->{_batch_retries}{$bkey};
618              
619             # Track broker-acked sequence high-water mark for the partition so
620             # abort_transaction can roll back next_sequence and not leave gaps.
621             # Also restore the epoch-retry budget so a much-later 45/46 still
622             # gets a recovery attempt.
623 0 0 0     0 if ($idempotent && !$err && defined $saved_seq) {
      0        
624 0         0 my $next = $saved_seq + scalar @records;
625 0   0     0 my $cur = $cfg->{_acked_sequence}{$bkey} // 0;
626 0 0       0 $cfg->{_acked_sequence}{$bkey} = $next if $next > $cur;
627 0         0 delete $cfg->{_epoch_retries}{$bkey};
628             }
629              
630 0         0 for my $cb (@cbs) {
631 0 0       0 $cb->($result, $err) if $cb;
632             }
633              
634             # Idempotent: kick any batch that accumulated while we were in-flight.
635 0 0 0     0 if ($idempotent && $weak_self
      0        
      0        
636 0         0 && $cfg->{batches}{$bkey} && @{$cfg->{batches}{$bkey}}) {
637 0         0 $weak_self->_flush_all_batches;
638             }
639 0         0 });
640             }
641              
642             sub _flush_all_batches {
643 1     1   14 my ($self) = @_;
644 1         3 my $cfg = $self->{cfg};
645 1         1 my $skipped = 0;
646 1         2 for my $bkey (keys %{$cfg->{batches}}) {
  1         3  
647 1         3 my ($topic, $partition) = split /:/, $bkey, 2;
648 1         4 my $leader_id = $self->_get_leader($topic, $partition);
649 1 50       3 unless (defined $leader_id) { $skipped++; next }
  1         1  
  1         2  
650 0         0 my $conn = $self->_get_or_create_conn($leader_id);
651 0 0 0     0 unless ($conn && $conn->connected) { $skipped++; next }
  0         0  
  0         0  
652 0         0 $self->_flush_batch($topic, $partition, $conn);
653             }
654             # re-arm timer if batches were skipped (connection not yet ready)
655 1 50 33     3 if ($skipped && keys %{$cfg->{batches}}) {
  1         4  
656 1         2 $cfg->{_linger_active} = 1;
657 1         2 weaken(my $weak = $self);
658             $cfg->{_linger_timer} = EV::timer 0.1, 0, sub {
659 0     0   0 $cfg->{_linger_active} = 0;
660 0 0       0 $weak->_flush_all_batches if $weak;
661 1         23 };
662             }
663             }
664              
665             sub produce_many {
666 2     2   20 my ($self, $messages, $cb) = @_;
667 2         3 my $remaining = scalar @$messages;
668 2 100 66     21 return $cb->() if $cb && !$remaining;
669 1         2 my @errors;
670 1         6 my $acks0 = ($self->{cfg}{acks} == 0);
671 1         2 for my $msg (@$messages) {
672 2         4 my ($topic, $key, $value, @rest);
673 2 50       4 if (ref $msg eq 'ARRAY') {
674 2         5 ($topic, $key, $value, @rest) = @$msg;
675             } else {
676 0         0 ($topic, $key, $value) = @{$msg}{qw(topic key value)};
  0         0  
677 0         0 my %opts = %$msg;
678 0         0 delete @opts{qw(topic key value)};
679 0 0       0 push @rest, \%opts if %opts;
680             }
681 2 50       3 if ($acks0) {
682 2         4 $self->produce($topic, $key, $value, @rest);
683             } else {
684             $self->produce($topic, $key, $value, @rest, sub {
685 0     0   0 my ($result, $err) = @_;
686 0 0       0 push @errors, $err if $err;
687 0 0 0     0 if (--$remaining <= 0 && $cb) {
688 0 0       0 $cb->(@errors ? \@errors : ());
689             }
690 0         0 });
691             }
692             }
693 1 50 33     7 $cb->(@errors ? \@errors : ()) if $cb && $acks0;
    50          
694             }
695              
696             sub flush {
697 0     0   0 my ($self, $cb) = @_;
698 0         0 my $cfg = $self->{cfg};
699              
700             # flush any accumulated linger batches first
701 0         0 $self->_flush_all_batches;
702 0         0 undef $cfg->{_linger_timer};
703 0         0 $cfg->{_linger_active} = 0;
704              
705             # Drained when all of these reach zero: in-flight requests on every
706             # connection, pre-metadata pending_ops, gated/queued idempotent batches.
707             my $outstanding = sub {
708 0     0   0 my $p = 0;
709 0         0 my %seen;
710 0   0     0 for my $c (values %{$cfg->{conns} // {}}) {
  0         0  
711 0 0 0     0 next unless $c && $c->connected;
712 0         0 $p += $c->pending;
713 0         0 $seen{$$c} = 1;
714             }
715             $p += $cfg->{bootstrap_conn}->pending
716             if $cfg->{bootstrap_conn} && $cfg->{bootstrap_conn}->connected
717 0 0 0     0 && !$seen{${$cfg->{bootstrap_conn}}};
  0   0     0  
718 0   0     0 $p += scalar @{$cfg->{pending_ops} // []};
  0         0  
719 0 0 0     0 $p += scalar grep { $_ && @$_ } values %{$cfg->{batches} // {}};
  0         0  
  0         0  
720 0         0 return $p;
721 0         0 };
722              
723 0 0       0 if ($outstanding->() == 0) {
724 0 0       0 $cb->() if $cb;
725 0         0 return;
726             }
727 0         0 my $check; $check = EV::timer 0, 0.01, sub {
728 0 0   0   0 return if $outstanding->() > 0;
729 0         0 undef $check;
730 0 0       0 $cb->() if $cb;
731 0         0 };
732 0         0 $cfg->{_flush_timer} = $check;
733             }
734              
735             # --- Consumer ---
736              
737             sub assign {
738 0     0   0 my ($self, $partitions) = @_;
739 0         0 my $cfg = $self->{cfg};
740 0         0 $cfg->{assignments} = $partitions;
741             }
742              
743             sub seek {
744 0     0   0 my ($self, $topic, $partition, $offset_or_ts, $cb) = @_;
745             # offset_or_ts: integer offset, or -1 (latest), -2 (earliest)
746 0         0 for my $a (@{$self->{cfg}{assignments}}) {
  0         0  
747 0 0 0     0 if ($a->{topic} eq $topic && $a->{partition} == $partition) {
748 0 0       0 if ($offset_or_ts >= 0) {
749 0         0 $a->{offset} = $offset_or_ts;
750 0 0       0 $cb->() if $cb;
751             } else {
752             # resolve via list_offsets
753 0         0 my $leader_id = $self->_get_leader($topic, $partition);
754 0 0       0 my $conn = defined($leader_id) ? $self->_get_or_create_conn($leader_id) : undef;
755 0 0 0     0 if ($conn && $conn->connected) {
756             $conn->list_offsets($topic, $partition, $offset_or_ts, sub {
757 0     0   0 my ($res, $err) = @_;
758 0 0 0     0 if (!$err && $res) {
759 0         0 my $off = $res->{topics}[0]{partitions}[0]{offset};
760 0 0       0 $a->{offset} = $off if defined $off;
761             }
762 0 0       0 $cb->() if $cb;
763 0         0 });
764             } else {
765 0 0       0 $cb->() if $cb;
766             }
767             }
768 0         0 return;
769             }
770             }
771 0 0       0 $cb->() if $cb;
772             }
773              
774             sub poll {
775 0     0   0 my ($self, $cb) = @_;
776 0         0 my $cfg = $self->{cfg};
777 0 0       0 return unless @{$cfg->{assignments}};
  0         0  
778              
779 0 0       0 unless ($cfg->{meta}) {
780 0         0 push @{$cfg->{pending_ops}}, {
781 0     0   0 run => sub { $self->poll($cb) },
782 0         0 };
783 0 0       0 $self->_refresh_metadata unless $cfg->{meta_pending};
784 0         0 return;
785             }
786              
787             # Group assignments by leader for multi-partition fetch
788 0         0 my %by_leader; # leader_id => { topic => [{partition, offset, assign_ref}] }
789 0         0 for my $a (@{$cfg->{assignments}}) {
  0         0  
790 0         0 my $leader_id = $self->_get_leader($a->{topic}, $a->{partition});
791 0 0       0 next unless defined $leader_id;
792 0         0 push @{$by_leader{$leader_id}{$a->{topic}}}, {
793             partition => $a->{partition},
794             offset => $a->{offset},
795 0         0 _assign => $a,
796             };
797             }
798              
799 0         0 my $dispatched = 0;
800 0         0 for my $leader_id (keys %by_leader) {
801 0         0 my $conn = $self->_get_or_create_conn($leader_id);
802 0 0 0     0 next unless $conn && $conn->connected;
803 0         0 $dispatched++;
804              
805             # build fetch_multi argument: {topic => [{partition, offset}]}
806 0         0 my %fetch_arg;
807             my %assign_map; # "topic:partition" => assignment ref
808 0         0 for my $topic (keys %{$by_leader{$leader_id}}) {
  0         0  
809 0         0 for my $p (@{$by_leader{$leader_id}{$topic}}) {
  0         0  
810 0         0 push @{$fetch_arg{$topic}}, {
811             partition => $p->{partition},
812             offset => $p->{offset},
813 0         0 };
814 0         0 $assign_map{"$topic:$p->{partition}"} = $p->{_assign};
815             }
816             }
817              
818             $conn->fetch_multi(\%fetch_arg, {
819             max_bytes => $cfg->{fetch_max_bytes},
820             max_wait_ms => $cfg->{fetch_max_wait_ms},
821             min_bytes => $cfg->{fetch_min_bytes},
822             }, sub {
823 0     0   0 my ($result, $err) = @_;
824 0         0 $dispatched--;
825              
826 0 0 0     0 if (!$err && $result && ref $result->{topics} eq 'ARRAY') {
      0        
827 0         0 for my $t (@{$result->{topics}}) {
  0         0  
828 0   0     0 for my $p (@{$t->{partitions} // []}) {
  0         0  
829 0   0     0 my $records = $p->{records} // [];
830 0         0 for my $r (@$records) {
831 0 0       0 if ($cfg->{on_message}) {
832             $cfg->{on_message}->(
833             $t->{topic}, $p->{partition},
834             $r->{offset}, $r->{key}, $r->{value},
835             $r->{headers}
836 0         0 );
837             }
838             }
839 0 0       0 if (@$records) {
840 0         0 my $a = $assign_map{"$t->{topic}:$p->{partition}"};
841 0 0       0 $a->{offset} = $records->[-1]{offset} + 1 if $a;
842             }
843             }
844             }
845             }
846              
847 0 0 0     0 $cb->() if $cb && $dispatched <= 0;
848 0         0 });
849             }
850 0 0 0     0 $cb->() if $cb && !$dispatched;
851             }
852              
853             sub offsets_for {
854 0     0   0 my ($self, $topic, $cb) = @_;
855 0         0 my $cfg = $self->{cfg};
856              
857 0         0 my $np = $self->_num_partitions($topic);
858 0 0 0     0 return $cb->({}) if $cb && !$np;
859              
860 0         0 my $result = {};
861 0         0 my $remaining = $np;
862 0         0 for my $p (0..$np-1) {
863 0         0 my $pid = $p;
864 0         0 my $leader_id = $self->_get_leader($topic, $pid);
865 0 0       0 my $conn = defined($leader_id) ? $self->_get_or_create_conn($leader_id) : undef;
866 0 0 0     0 unless ($conn && $conn->connected) {
867 0         0 $result->{$pid} = {};
868 0 0 0     0 $cb->($result) if $cb && --$remaining <= 0;
869 0         0 next;
870             }
871 0         0 my %pdata;
872 0         0 my $pdone = 0;
873 0         0 for my $ts (-2, -1) {
874             $conn->list_offsets($topic, $pid, $ts, sub {
875 0     0   0 my ($res, $err) = @_;
876 0 0 0     0 if (!$err && $res && ref $res->{topics} eq 'ARRAY') {
      0        
877 0         0 my $off = $res->{topics}[0]{partitions}[0]{offset};
878 0 0       0 if ($ts == -2) { $pdata{earliest} = $off }
  0         0  
879 0         0 else { $pdata{latest} = $off }
880             }
881 0 0       0 if (++$pdone == 2) {
882 0         0 $result->{$pid} = \%pdata;
883 0 0 0     0 $cb->($result) if $cb && --$remaining <= 0;
884             }
885 0         0 });
886             }
887             }
888             }
889              
890             sub lag {
891 0     0   0 my ($self, $cb) = @_;
892 0         0 my $cfg = $self->{cfg};
893 0   0     0 my @assignments = @{$cfg->{assignments} // []};
  0         0  
894 0 0 0     0 return $cb->({}) if $cb && !@assignments;
895              
896 0         0 my $result = {};
897 0         0 my $remaining = scalar @assignments;
898 0         0 for my $a (@assignments) {
899 0         0 my $key = "$a->{topic}:$a->{partition}";
900 0         0 my $leader_id = $self->_get_leader($a->{topic}, $a->{partition});
901 0 0       0 my $conn = defined($leader_id) ? $self->_get_or_create_conn($leader_id) : undef;
902 0 0 0     0 if ($conn && $conn->connected) {
903             $conn->list_offsets($a->{topic}, $a->{partition}, -1, sub {
904 0     0   0 my ($res, $err) = @_;
905 0         0 my $hw = 0;
906 0 0 0     0 if (!$err && $res && ref $res->{topics} eq 'ARRAY') {
      0        
907 0   0     0 $hw = $res->{topics}[0]{partitions}[0]{offset} // 0;
908             }
909             $result->{$key} = {
910             current => $a->{offset},
911             latest => $hw,
912             lag => $hw - $a->{offset},
913 0         0 };
914 0 0 0     0 $cb->($result) if $cb && --$remaining <= 0;
915 0         0 });
916             } else {
917 0         0 $result->{$key} = { current => $a->{offset}, latest => 0, lag => 0 };
918 0 0 0     0 $cb->($result) if $cb && --$remaining <= 0;
919             }
920             }
921             }
922              
923             sub error_name {
924 0 0   0   0 shift if ref $_[0]; # allow $kafka->error_name or EV::Kafka::Client::error_name
925 0         0 return EV::Kafka::_error_name($_[0]);
926             }
927              
928             # --- Consumer Group ---
929              
930             sub subscribe {
931 0     0   0 my ($self, @args) = @_;
932 0         0 my @topics;
933             my %opts;
934              
935             # subscribe('topic1', 'topic2', group_id => 'g', ...)
936 0         0 while (@args) {
937 0 0       0 if ($args[0] =~ /^(group_id|group_instance_id|on_assign|on_revoke|session_timeout|rebalance_timeout|heartbeat_interval|auto_commit|auto_offset_reset)$/) {
938 0         0 my $k = shift @args;
939 0         0 $opts{$k} = shift @args;
940             } else {
941 0         0 push @topics, shift @args;
942             }
943             }
944              
945 0         0 my $cfg = $self->{cfg};
946 0 0       0 my $group_id = $opts{group_id} or die "group_id required";
947              
948             $cfg->{group} = {
949             group_id => $group_id,
950             member_id => '',
951             generation => -1,
952             topics => \@topics,
953             on_assign => $opts{on_assign},
954             on_revoke => $opts{on_revoke},
955             session_timeout => $opts{session_timeout} // 30000,
956             rebalance_timeout => $opts{rebalance_timeout} // 60000,
957             heartbeat_interval => $opts{heartbeat_interval} // 3,
958             auto_commit => $opts{auto_commit} // 1,
959             auto_offset_reset => $opts{auto_offset_reset} // 'earliest',
960             group_instance_id => $opts{group_instance_id},
961 0   0     0 coordinator => undef,
      0        
      0        
      0        
      0        
962             heartbeat_timer => undef,
963             state => 'init',
964             };
965              
966             # Step 1: ensure we have metadata
967 0 0       0 unless ($cfg->{meta}) {
968 0         0 push @{$cfg->{pending_ops}}, {
969 0     0   0 run => sub { $self->_group_start },
970 0         0 };
971 0 0       0 $self->_refresh_metadata unless $cfg->{meta_pending};
972 0         0 return;
973             }
974              
975 0         0 $self->_group_start;
976             }
977              
978             sub _group_start {
979 0     0   0 my ($self) = @_;
980 0         0 my $cfg = $self->{cfg};
981 0 0       0 my $g = $cfg->{group} or return;
982              
983 0         0 my $conn = $self->_any_conn;
984 0 0       0 return unless $conn;
985              
986 0         0 $g->{state} = 'finding';
987             $conn->find_coordinator($g->{group_id}, sub {
988 0     0   0 my ($res, $err) = @_;
989 0 0 0     0 if ($err || $res->{error_code}) {
990 0   0     0 my $msg = $err || "FindCoordinator error: $res->{error_code}";
991 0 0       0 $cfg->{on_error}->($msg) if $cfg->{on_error};
992             # retry after delay
993 0         0 my $t; $t = EV::timer 1, 0, sub { undef $t; $self->_group_start };
  0         0  
  0         0  
  0         0  
994 0         0 return;
995             }
996              
997             # Store coordinator info and connect
998             $cfg->{broker_map}{$res->{node_id}} = {
999             host => $res->{host}, port => $res->{port}
1000 0         0 };
1001 0         0 my $coord = $self->_get_or_create_conn($res->{node_id});
1002 0         0 $g->{coordinator} = $coord;
1003 0         0 $g->{coordinator_id} = $res->{node_id};
1004              
1005 0 0       0 if ($coord->connected) {
1006 0         0 $self->_group_join;
1007             } else {
1008 0         0 push @{$cfg->{pending_ops}}, {
1009             node_id => $res->{node_id},
1010 0         0 run => sub { $self->_group_join },
1011 0         0 };
1012             }
1013 0         0 });
1014             }
1015              
1016             sub _group_join {
1017 0     0   0 my ($self) = @_;
1018 0         0 my $cfg = $self->{cfg};
1019 0 0       0 my $g = $cfg->{group} or return;
1020 0 0       0 my $coord = $g->{coordinator} or return;
1021              
1022 0         0 $g->{state} = 'joining';
1023             $coord->join_group(
1024             $g->{group_id}, $g->{member_id},
1025             $g->{topics}, sub {
1026 0     0   0 my ($res, $err) = @_;
1027 0 0       0 if ($err) {
1028 0 0       0 $cfg->{on_error}->("JoinGroup: $err") if $cfg->{on_error};
1029 0         0 return;
1030             }
1031              
1032 0 0 0     0 if ($res->{error_code} == 15 || $res->{error_code} == 16) {
1033             # COORDINATOR_NOT_AVAILABLE / NOT_COORDINATOR — re-discover
1034 0         0 my $t; $t = EV::timer 1, 0, sub { undef $t; $self->_group_start };
  0         0  
  0         0  
  0         0  
1035 0         0 return;
1036             }
1037 0 0       0 if ($res->{error_code} == 27) {
1038             # REBALANCE_IN_PROGRESS — retry
1039 0         0 my $t; $t = EV::timer 1, 0, sub { undef $t; $self->_group_join };
  0         0  
  0         0  
  0         0  
1040 0         0 return;
1041             }
1042 0 0       0 if ($res->{error_code} == 79) {
1043             # MEMBER_ID_REQUIRED — retry with assigned member_id
1044 0 0       0 $g->{member_id} = $res->{member_id} if $res->{member_id};
1045 0         0 $self->_group_join;
1046 0         0 return;
1047             }
1048 0 0 0     0 if ($res->{error_code} == 22 || $res->{error_code} == 25) {
1049             # ILLEGAL_GENERATION / UNKNOWN_MEMBER_ID — broker-side
1050             # session expired or generation rolled. Reset state and
1051             # rejoin from scratch.
1052 0         0 $g->{member_id} = '';
1053 0         0 $g->{generation} = -1;
1054 0         0 my $t; $t = EV::timer 1, 0, sub { undef $t; $self->_group_start };
  0         0  
  0         0  
  0         0  
1055 0         0 return;
1056             }
1057 0 0       0 if ($res->{error_code}) {
1058             $cfg->{on_error}->(sprintf "JoinGroup: %s (code %d)",
1059             $self->error_name($res->{error_code}),
1060 0 0       0 $res->{error_code}) if $cfg->{on_error};
1061 0         0 return;
1062             }
1063              
1064 0         0 $g->{member_id} = $res->{member_id};
1065 0         0 $g->{generation} = $res->{generation_id};
1066             my $is_leader = defined($res->{leader}) && defined($res->{member_id})
1067 0   0     0 && $res->{leader} eq $res->{member_id};
1068              
1069             # Build assignments (if leader)
1070 0         0 my $assignments = [];
1071 0 0 0     0 if ($is_leader && $res->{members} && @{$res->{members}}) {
  0   0     0  
1072 0         0 $assignments = $self->_assign_partitions($res->{members}, $g->{topics});
1073             }
1074              
1075 0         0 $self->_group_sync($assignments);
1076             },
1077             $g->{session_timeout}, $g->{rebalance_timeout},
1078             $g->{group_instance_id}
1079 0         0 );
1080             }
1081              
1082             sub _assign_partitions {
1083 7     7   7782 my ($self, $members, $topics) = @_;
1084 7         15 my $cfg = $self->{cfg};
1085 7 50       17 my $meta = $cfg->{meta} or return [];
1086              
1087 7         10 my @all_parts;
1088 7   50     6 for my $t (@{$meta->{topics} // []}) {
  7         17  
1089 11         14 my $tname = $t->{name};
1090 11 50       16 next unless grep { $_ eq $tname } @$topics;
  19         36  
1091 11   50     11 for my $p (@{$t->{partitions} // []}) {
  11         18  
1092 36         69 push @all_parts, { topic => $tname, partition => $p->{partition} };
1093             }
1094             }
1095 7 50       24 @all_parts = sort { $a->{topic} cmp $b->{topic} || $a->{partition} <=> $b->{partition} } @all_parts;
  48         100  
1096              
1097 7         17 my @member_ids = sort map { $_->{member_id} } @$members;
  14         29  
1098 7         8 my $nm = scalar @member_ids;
1099              
1100             # Sticky assignment: preserve previous assignments where possible
1101 7   100     15 my $prev = $cfg->{_prev_assignments} // {};
1102 7         8 my %member_parts; # member_id => [@parts]
1103             my %assigned; # "topic:partition" => 1
1104              
1105 7 100       24 my $max_per = int(@all_parts / $nm) + ((@all_parts % $nm) ? 1 : 0);
1106              
1107             # Step 1: keep valid previous assignments (but cap at max_per)
1108 7         13 for my $mid (@member_ids) {
1109 14         20 $member_parts{$mid} = [];
1110 14   100     14 for my $p (@{$prev->{$mid} // []}) {
  14         33  
1111 10         13 my $key = "$p->{topic}:$p->{partition}";
1112 10 100       9 if (grep { $_->{topic} eq $p->{topic} && $_->{partition} == $p->{partition} } @all_parts) {
  60 50       129  
1113 10 100       9 if (scalar @{$member_parts{$mid}} < $max_per) {
  10         14  
1114 8         8 push @{$member_parts{$mid}}, $p;
  8         9  
1115 8         14 $assigned{$key} = 1;
1116             }
1117             }
1118             }
1119             }
1120              
1121             # Step 2: distribute unassigned partitions to least-loaded members
1122 7         8 my @unassigned = grep { !$assigned{"$_->{topic}:$_->{partition}"} } @all_parts;
  36         75  
1123 7         10 for my $p (@unassigned) {
1124 28         26 my $min_mid = $member_ids[0];
1125 28         24 my $min_count = scalar @{$member_parts{$min_mid}};
  28         31  
1126 28         58 for my $mid (@member_ids) {
1127 52 100       35 if (scalar @{$member_parts{$mid}} < $min_count) {
  52         74  
1128 10         7 $min_count = scalar @{$member_parts{$mid}};
  10         18  
1129 10         16 $min_mid = $mid;
1130             }
1131             }
1132 28         25 push @{$member_parts{$min_mid}}, $p;
  28         33  
1133             }
1134              
1135             # Save for next rebalance
1136 7         18 $cfg->{_prev_assignments} = { %member_parts };
1137              
1138             # Encode assignments
1139 7         9 my @assignments;
1140 7         9 for my $mid (@member_ids) {
1141 14         11 my %by_topic;
1142 14         14 for my $p (@{$member_parts{$mid}}) {
  14         17  
1143 36         29 push @{$by_topic{$p->{topic}}}, $p->{partition};
  36         59  
1144             }
1145              
1146 14         14 my $buf = '';
1147 14         16 $buf .= pack('n', 0); # version
1148 14         30 $buf .= pack('N', scalar keys %by_topic);
1149 14         26 for my $t (sort keys %by_topic) {
1150 20         27 $buf .= pack('n', length($t)) . $t;
1151 20         22 $buf .= pack('N', scalar @{$by_topic{$t}});
  20         24  
1152 20         22 for my $pid (@{$by_topic{$t}}) {
  20         20  
1153 36         53 $buf .= pack('N', $pid);
1154             }
1155             }
1156 14         14 $buf .= pack('N', -1); # user_data = null
1157              
1158 14         45 push @assignments, {
1159             member_id => $mid,
1160             assignment => $buf,
1161             };
1162             }
1163              
1164 7         38 return \@assignments;
1165             }
1166              
1167             sub _group_sync {
1168 0     0   0 my ($self, $assignments) = @_;
1169 0         0 my $cfg = $self->{cfg};
1170 0 0       0 my $g = $cfg->{group} or return;
1171 0 0       0 my $coord = $g->{coordinator} or return;
1172              
1173 0         0 $g->{state} = 'syncing';
1174             my $sync_cb = sub {
1175 0     0   0 my ($res, $err) = @_;
1176 0 0       0 if ($err) {
1177 0 0       0 $cfg->{on_error}->("SyncGroup: $err") if $cfg->{on_error};
1178 0         0 return;
1179             }
1180 0 0       0 if ($res->{error_code} == 27) {
1181             # REBALANCE_IN_PROGRESS — rejoin
1182 0         0 my $t; $t = EV::timer 1, 0, sub { undef $t; $self->_group_join };
  0         0  
  0         0  
  0         0  
1183 0         0 return;
1184             }
1185 0 0 0     0 if ($res->{error_code} == 22 || $res->{error_code} == 25) {
1186             # ILLEGAL_GENERATION / UNKNOWN_MEMBER_ID — start over.
1187 0         0 $g->{member_id} = '';
1188 0         0 $g->{generation} = -1;
1189 0         0 my $t; $t = EV::timer 1, 0, sub { undef $t; $self->_group_start };
  0         0  
  0         0  
  0         0  
1190 0         0 return;
1191             }
1192 0 0       0 if ($res->{error_code}) {
1193             $cfg->{on_error}->(sprintf "SyncGroup: %s (code %d)",
1194             $self->error_name($res->{error_code}),
1195 0 0       0 $res->{error_code}) if $cfg->{on_error};
1196 0         0 return;
1197             }
1198              
1199             # Decode assignment
1200 0   0     0 my $data = $res->{assignment} // '';
1201 0         0 my $dlen = length $data;
1202 0         0 my @my_assignments;
1203 0 0       0 if ($dlen >= 6) {
1204 0         0 my $off = 2; # skip version
1205 0         0 my $tc = unpack('N', substr($data, $off, 4)); $off += 4;
  0         0  
1206 0         0 for my $i (0..$tc-1) {
1207 0 0       0 last unless $off + 2 <= $dlen;
1208 0         0 my $tlen = unpack('n', substr($data, $off, 2)); $off += 2;
  0         0  
1209 0 0       0 last unless $off + $tlen <= $dlen;
1210 0         0 my $tname = substr($data, $off, $tlen); $off += $tlen;
  0         0  
1211 0 0       0 last unless $off + 4 <= $dlen;
1212 0         0 my $pc = unpack('N', substr($data, $off, 4)); $off += 4;
  0         0  
1213 0         0 for my $j (0..$pc-1) {
1214 0 0       0 last unless $off + 4 <= $dlen;
1215 0         0 my $pid = unpack('N', substr($data, $off, 4)); $off += 4;
  0         0  
1216 0 0 0     0 my $reset = ($g->{auto_offset_reset} // 'earliest') eq 'latest' ? -1 : -2;
1217 0         0 push @my_assignments, {
1218             topic => $tname, partition => $pid, offset => $reset
1219             };
1220             }
1221             }
1222             }
1223              
1224 0         0 $g->{state} = 'stable';
1225              
1226             # Fetch committed offsets, then start consuming
1227             $self->_fetch_committed_offsets(\@my_assignments, sub {
1228 0         0 $cfg->{assignments} = \@my_assignments;
1229              
1230             # Fire on_assign
1231 0 0       0 $g->{on_assign}->(\@my_assignments) if $g->{on_assign};
1232              
1233             # Start heartbeat
1234 0         0 $self->_start_heartbeat;
1235              
1236             # Start fetch loop
1237 0         0 $self->_start_fetch_loop;
1238 0         0 });
1239 0         0 };
1240             $coord->sync_group(
1241             $g->{group_id}, $g->{generation}, $g->{member_id},
1242             $assignments, $sync_cb, $g->{group_instance_id}
1243 0         0 );
1244             }
1245              
1246             sub _fetch_committed_offsets {
1247 0     0   0 my ($self, $assignments, $cb) = @_;
1248 0         0 my $cfg = $self->{cfg};
1249 0 0       0 my $g = $cfg->{group} or return $cb->();
1250 0         0 my $coord = $g->{coordinator};
1251 0 0 0     0 return $cb->() unless $coord && $coord->connected && @$assignments;
      0        
1252              
1253             # Build topics array for offset_fetch
1254 0         0 my %by_topic;
1255 0         0 for my $a (@$assignments) {
1256 0         0 push @{$by_topic{$a->{topic}}}, $a->{partition};
  0         0  
1257             }
1258 0         0 my @topics;
1259 0         0 for my $t (sort keys %by_topic) {
1260 0         0 push @topics, { topic => $t, partitions => $by_topic{$t} };
1261             }
1262              
1263             $coord->offset_fetch($g->{group_id}, \@topics, sub {
1264 0     0   0 my ($res, $err) = @_;
1265 0 0 0     0 if (!$err && $res && ref $res->{topics} eq 'ARRAY') {
      0        
1266 0         0 for my $t (@{$res->{topics}}) {
  0         0  
1267 0   0     0 for my $p (@{$t->{partitions} // []}) {
  0         0  
1268 0 0       0 next if $p->{error_code};
1269 0 0       0 next if $p->{offset} < 0; # no committed offset
1270 0         0 for my $a (@$assignments) {
1271 0 0 0     0 if ($a->{topic} eq $t->{topic} && $a->{partition} == $p->{partition}) {
1272 0         0 $a->{offset} = $p->{offset};
1273             }
1274             }
1275             }
1276             }
1277             }
1278              
1279             # For partitions with unresolved offset (-2=earliest, -1=latest), resolve via ListOffsets
1280 0         0 my @need_offsets = grep { $_->{offset} < 0 } @$assignments;
  0         0  
1281 0 0       0 if (@need_offsets) {
1282 0         0 my $remaining = scalar @need_offsets;
1283 0         0 for my $a (@need_offsets) {
1284 0         0 my $leader_id = $self->_get_leader($a->{topic}, $a->{partition});
1285 0 0       0 my $lconn = defined($leader_id) ? $self->_get_or_create_conn($leader_id) : undef;
1286 0 0 0     0 if ($lconn && $lconn->connected) {
1287             $lconn->list_offsets($a->{topic}, $a->{partition}, $a->{offset}, sub {
1288 0         0 my ($lres, $lerr) = @_;
1289 0 0 0     0 if (!$lerr && $lres && ref $lres->{topics} eq 'ARRAY') {
      0        
1290 0         0 for my $lt (@{$lres->{topics}}) {
  0         0  
1291 0   0     0 for my $lp (@{$lt->{partitions} // []}) {
  0         0  
1292 0 0       0 $a->{offset} = $lp->{offset} if !$lp->{error_code};
1293             }
1294             }
1295             }
1296 0         0 $remaining--;
1297 0 0       0 $cb->() if $remaining <= 0;
1298 0         0 });
1299             } else {
1300 0         0 $a->{offset} = 0;
1301 0         0 $remaining--;
1302 0 0       0 $cb->() if $remaining <= 0;
1303             }
1304             }
1305             } else {
1306 0         0 $cb->();
1307             }
1308 0         0 });
1309             }
1310              
1311             sub _start_heartbeat {
1312 0     0   0 my ($self) = @_;
1313 0         0 my $cfg = $self->{cfg};
1314 0 0       0 my $g = $cfg->{group} or return;
1315              
1316             $g->{heartbeat_timer} = EV::timer $g->{heartbeat_interval}, $g->{heartbeat_interval}, sub {
1317 0 0   0   0 return unless $g->{state} eq 'stable';
1318 0         0 my $coord = $g->{coordinator};
1319 0 0 0     0 return unless $coord && $coord->connected;
1320              
1321             $coord->heartbeat($g->{group_id}, $g->{generation}, $g->{member_id}, sub {
1322 0         0 my ($res, $err) = @_;
1323 0 0       0 if ($err) { return }
  0         0  
1324 0 0       0 return unless $res;
1325 0   0     0 my $ec = $res->{error_code} // 0;
1326 0 0 0     0 if ($ec == 27) {
    0 0        
    0          
1327             # REBALANCE_IN_PROGRESS
1328 0         0 $g->{state} = 'rebalancing';
1329 0 0       0 $g->{on_revoke}->($cfg->{assignments}) if $g->{on_revoke};
1330 0         0 $self->_stop_heartbeat;
1331 0         0 $self->_stop_fetch_loop;
1332 0         0 $self->_group_join;
1333             } elsif ($ec == 22 || $ec == 25) {
1334             # ILLEGAL_GENERATION / UNKNOWN_MEMBER_ID — start over.
1335 0         0 $g->{state} = 'rebalancing';
1336 0         0 $g->{member_id} = '';
1337 0         0 $g->{generation} = -1;
1338 0         0 $self->_stop_heartbeat;
1339 0         0 $self->_stop_fetch_loop;
1340 0         0 $self->_group_start;
1341             } elsif ($ec == 15 || $ec == 16) {
1342             # Coordinator moved — re-discover. Stop the fetch loop
1343             # too: assignments are stale until the new coordinator
1344             # confirms generation.
1345 0         0 $g->{state} = 'rebalancing';
1346 0         0 $self->_stop_heartbeat;
1347 0         0 $self->_stop_fetch_loop;
1348 0         0 $self->_group_start;
1349             }
1350 0         0 }, $g->{group_instance_id});
1351 0         0 };
1352             }
1353              
1354             sub _stop_heartbeat {
1355 11     11   29 my ($self) = @_;
1356 11 50       32 my $g = $self->{cfg}{group} or return;
1357 0         0 undef $g->{heartbeat_timer};
1358             }
1359              
1360             sub _start_fetch_loop {
1361 0     0   0 my ($self) = @_;
1362 0         0 my $cfg = $self->{cfg};
1363 0 0       0 return if $cfg->{fetch_active};
1364 0         0 $cfg->{fetch_active} = 1;
1365 0         0 $cfg->{_fetch_in_flight} = 0;
1366              
1367 0         0 weaken(my $weak = $self);
1368             $cfg->{fetch_timer} = EV::timer 0, 0.1, sub {
1369 0 0 0 0   0 return unless $weak && $cfg->{fetch_active};
1370             # Skip ticks while a prior poll round is still in flight to avoid
1371             # duplicate per-(topic,partition) requests landing on the broker.
1372 0 0       0 return if $cfg->{_fetch_in_flight};
1373 0         0 $cfg->{_fetch_in_flight} = 1;
1374 0         0 $weak->poll(sub { $cfg->{_fetch_in_flight} = 0 });
  0         0  
1375 0         0 };
1376             }
1377              
1378             sub _stop_fetch_loop {
1379 11     11   17 my ($self) = @_;
1380 11         18 my $cfg = $self->{cfg};
1381 11         16 $cfg->{fetch_active} = 0;
1382 11         17 $cfg->{_fetch_in_flight} = 0;
1383 11         17 undef $cfg->{fetch_timer};
1384             }
1385              
1386             sub commit {
1387 0     0   0 my ($self, $cb) = @_;
1388 0         0 my $cfg = $self->{cfg};
1389 0         0 my $g = $cfg->{group};
1390 0 0       0 unless ($g) { $cb->() if $cb; return }
  0 0       0  
  0         0  
1391 0         0 my $coord = $g->{coordinator};
1392 0 0 0     0 unless ($coord && $coord->connected) { $cb->() if $cb; return }
  0 0       0  
  0         0  
1393              
1394             # Build offset commit data from current assignments
1395 0         0 my %by_topic;
1396 0   0     0 for my $a (@{$cfg->{assignments} // []}) {
  0         0  
1397 0         0 push @{$by_topic{$a->{topic}}}, {
1398             partition => $a->{partition},
1399             offset => $a->{offset},
1400 0         0 };
1401             }
1402              
1403 0         0 my @topics;
1404 0         0 for my $t (sort keys %by_topic) {
1405 0         0 push @topics, { topic => $t, partitions => $by_topic{$t} };
1406             }
1407              
1408 0 0       0 if (!@topics) { $cb->() if $cb; return }
  0 0       0  
  0         0  
1409              
1410             $coord->offset_commit($g->{group_id}, $g->{generation}, $g->{member_id}, \@topics, sub {
1411 0     0   0 my ($res, $err) = @_;
1412 0 0       0 $cb->($err) if $cb;
1413 0         0 });
1414             }
1415              
1416             sub unsubscribe {
1417 0     0   0 my ($self, $cb) = @_;
1418 0         0 my $cfg = $self->{cfg};
1419 0         0 my $g = $cfg->{group};
1420              
1421 0         0 $self->_stop_heartbeat;
1422 0         0 $self->_stop_fetch_loop;
1423              
1424             my $finish = sub {
1425             # send LeaveGroup to coordinator for fast rebalance
1426 0 0 0 0   0 if ($g && $g->{coordinator} && $g->{coordinator}->connected && $g->{member_id}) {
      0        
      0        
1427             $g->{coordinator}->leave_group($g->{group_id}, $g->{member_id}, sub {
1428 0         0 $cfg->{assignments} = [];
1429 0         0 $cfg->{group} = undef;
1430 0 0       0 $cb->() if $cb;
1431 0         0 });
1432             } else {
1433 0         0 $cfg->{assignments} = [];
1434 0         0 $cfg->{group} = undef;
1435 0 0       0 $cb->() if $cb;
1436             }
1437 0         0 };
1438              
1439 0 0 0     0 if ($g && $g->{auto_commit}) {
1440 0     0   0 $self->commit(sub { $finish->() });
  0         0  
1441             } else {
1442 0         0 $finish->();
1443             }
1444             }
1445              
1446             # --- Transactions ---
1447              
1448             sub begin_transaction {
1449 0     0   0 my ($self) = @_;
1450 0         0 my $cfg = $self->{cfg};
1451 0 0       0 die "transactional_id required" unless $cfg->{transactional_id};
1452 0 0 0     0 die "producer_id not initialized" unless defined $cfg->{producer_id} && $cfg->{producer_id} >= 0;
1453 0         0 $cfg->{_txn_active} = 1;
1454 0         0 $cfg->{_txn_partitions} = {}; # "topic:partition" => 1
1455             }
1456              
1457             sub _txn_conn {
1458 0     0   0 my ($self) = @_;
1459 0         0 my $cfg = $self->{cfg};
1460 0         0 my $conn = $cfg->{_txn_coordinator};
1461 0 0 0     0 return $conn if $conn && $conn->connected;
1462 0         0 return $self->_any_conn;
1463             }
1464              
1465             sub _add_txn_partition {
1466 0     0   0 my ($self, $topic, $partition) = @_;
1467 0         0 my $cfg = $self->{cfg};
1468 0 0       0 return unless $cfg->{_txn_active};
1469 0         0 my $key = "$topic:$partition";
1470 0 0       0 return if $cfg->{_txn_partitions}{$key}++;
1471              
1472 0         0 my $conn = $self->_txn_conn;
1473 0 0       0 return unless $conn;
1474              
1475             $conn->add_partitions_to_txn(
1476             $cfg->{transactional_id}, $cfg->{producer_id},
1477             $cfg->{producer_epoch},
1478             [{ topic => $topic, partitions => [$partition] }],
1479       0     sub {}
1480 0         0 );
1481             }
1482              
1483             sub commit_transaction {
1484 0     0   0 my ($self, $cb) = @_;
1485 0         0 my $cfg = $self->{cfg};
1486 0 0       0 die "no active transaction" unless $cfg->{_txn_active};
1487              
1488             # flush all pending batches first
1489             $self->flush(sub {
1490 0     0   0 my $conn = $self->_txn_conn;
1491 0 0       0 unless ($conn) { $cb->(undef) if $cb; return }
  0 0       0  
  0         0  
1492              
1493             $conn->end_txn($cfg->{transactional_id}, $cfg->{producer_id},
1494             $cfg->{producer_epoch}, 1, sub {
1495 0         0 my ($res, $err) = @_;
1496 0         0 $cfg->{_txn_active} = 0;
1497 0         0 $cfg->{_txn_partitions} = {};
1498 0 0       0 $cb->($res, $err) if $cb;
1499 0         0 });
1500 0         0 });
1501             }
1502              
1503             sub send_offsets_to_transaction {
1504 0     0   0 my ($self, $group_id, $cb) = @_;
1505 0         0 my $cfg = $self->{cfg};
1506 0 0       0 die "no active transaction" unless $cfg->{_txn_active};
1507 0 0       0 die "transactional_id required" unless $cfg->{transactional_id};
1508              
1509             # gather current consumer offsets
1510 0         0 my %by_topic;
1511 0   0     0 for my $a (@{$cfg->{assignments} // []}) {
  0         0  
1512 0         0 push @{$by_topic{$a->{topic}}}, {
1513             partition => $a->{partition},
1514             offset => $a->{offset},
1515 0         0 };
1516             }
1517              
1518 0         0 my @topics;
1519 0         0 for my $t (sort keys %by_topic) {
1520 0         0 push @topics, { topic => $t, partitions => $by_topic{$t} };
1521             }
1522              
1523 0 0       0 unless (@topics) {
1524 0 0       0 $cb->() if $cb;
1525 0         0 return;
1526             }
1527              
1528 0         0 my $conn = $self->_txn_conn;
1529 0 0       0 unless ($conn) { $cb->() if $cb; return }
  0 0       0  
  0         0  
1530              
1531 0         0 my $g = $cfg->{group};
1532 0 0       0 my $generation = $g ? $g->{generation} : -1;
1533 0 0 0     0 my $member_id = $g ? ($g->{member_id} // '') : '';
1534              
1535             $conn->txn_offset_commit(
1536             $cfg->{transactional_id}, $group_id,
1537             $cfg->{producer_id}, $cfg->{producer_epoch},
1538             $generation, $member_id,
1539             \@topics, sub {
1540 0     0   0 my ($res, $err) = @_;
1541 0 0       0 $cb->($res, $err) if $cb;
1542             }
1543 0         0 );
1544             }
1545              
1546             sub abort_transaction {
1547 0     0   0 my ($self, $cb) = @_;
1548 0         0 my $cfg = $self->{cfg};
1549 0 0       0 die "no active transaction" unless $cfg->{_txn_active};
1550              
1551             # discard unsent batches — they must not reach the broker after abort
1552 0         0 $cfg->{batches} = {};
1553 0         0 undef $cfg->{_linger_timer};
1554 0         0 $cfg->{_linger_active} = 0;
1555             # Clear per-partition idempotent state. In-flight responses for the
1556             # aborted transaction may still arrive; their callbacks will run as
1557             # usual but must not block subsequent produce after begin_transaction.
1558 0         0 $cfg->{_inflight} = {};
1559 0         0 $cfg->{_batch_retries} = {};
1560             # Roll next_sequence back to the broker-acked high-water mark so
1561             # discarded reservations don't leave gaps that would trigger
1562             # OutOfOrderSequenceNumber on the next transaction.
1563 0         0 for my $bkey (keys %{$cfg->{next_sequence}}) {
  0         0  
1564 0   0     0 $cfg->{next_sequence}{$bkey} = $cfg->{_acked_sequence}{$bkey} // 0;
1565             }
1566              
1567 0         0 my $conn = $self->_txn_conn;
1568 0 0       0 unless ($conn) { $cb->(undef) if $cb; return }
  0 0       0  
  0         0  
1569              
1570             $conn->end_txn($cfg->{transactional_id}, $cfg->{producer_id},
1571             $cfg->{producer_epoch}, 0, sub {
1572 0     0   0 my ($res, $err) = @_;
1573 0         0 $cfg->{_txn_active} = 0;
1574 0         0 $cfg->{_txn_partitions} = {};
1575 0 0       0 $cb->($res, $err) if $cb;
1576 0         0 });
1577             }
1578              
1579             sub close {
1580 11     11   21 my ($self, $cb) = @_;
1581 11 50       20 my $cfg = $self->{cfg} or return;
1582              
1583 11         30 $self->_stop_heartbeat;
1584 11         22 $self->_stop_fetch_loop;
1585 11         31 $self->_disarm_metadata_timer;
1586              
1587 11   50     30 for my $conn (values %{$cfg->{conns} // {}}) {
  11         33  
1588 0 0 0     0 eval { $conn->disconnect if $conn && $conn->connected };
  0         0  
1589             }
1590 11 50       27 if ($cfg->{bootstrap_conn}) {
1591 0         0 eval { $cfg->{bootstrap_conn}->disconnect
1592 0 0       0 if $cfg->{bootstrap_conn}->connected };
1593             }
1594 11         20 $cfg->{conns} = {};
1595 11         17 $cfg->{bootstrap_conn} = undef;
1596 11 50       383 $cb->() if $cb;
1597             }
1598              
1599             sub DESTROY {
1600 11     11   9765 my $self = shift;
1601 11 50 33     60 return unless $self && $self->{cfg};
1602 11         29 $self->close;
1603             }
1604              
1605             package EV::Kafka::Conn;
1606              
1607             1;
1608              
1609             =head1 NAME
1610              
1611             EV::Kafka - High-performance asynchronous Kafka/Redpanda client using EV
1612              
1613             =head1 SYNOPSIS
1614              
1615             use EV::Kafka;
1616              
1617             my $kafka = EV::Kafka->new(
1618             brokers => '127.0.0.1:9092',
1619             acks => -1,
1620             on_error => sub { warn "kafka: @_" },
1621             on_message => sub {
1622             my ($topic, $partition, $offset, $key, $value, $headers) = @_;
1623             print "$topic:$partition @ $offset $key = $value\n";
1624             },
1625             );
1626              
1627             # Producer
1628             $kafka->connect(sub {
1629             $kafka->produce('my-topic', 'key', 'value', sub {
1630             my ($result, $err) = @_;
1631             my $off = $result->{topics}[0]{partitions}[0]{base_offset};
1632             print "produced at offset $off\n";
1633             });
1634             });
1635              
1636             # Consumer (manual assignment)
1637             $kafka->assign([{ topic => 'my-topic', partition => 0, offset => 0 }]);
1638             my $poll = EV::timer 0, 0.1, sub { $kafka->poll };
1639              
1640             # Consumer group
1641             $kafka->subscribe('my-topic',
1642             group_id => 'my-group',
1643             on_assign => sub { ... },
1644             on_revoke => sub { ... },
1645             );
1646              
1647             EV::run;
1648              
1649             =head1 DESCRIPTION
1650              
1651             EV::Kafka is a high-performance asynchronous Kafka client that implements
1652             the Kafka binary protocol in XS with L<EV> event loop integration. It
1653             targets Redpanda and Apache Kafka (protocol version 0.11+).
1654              
1655             Two-layer architecture:
1656              
1657             =over
1658              
1659             =item * B<EV::Kafka::Conn> (XS) -- single broker TCP connection with
1660             protocol encoding/decoding, correlation ID matching, pipelining,
1661             optional TLS and SASL (PLAIN, SCRAM-SHA-256/512) authentication.
1662              
1663             =item * B<EV::Kafka::Client> (Perl) -- cluster management with metadata
1664             discovery, broker connection pooling, partition leader routing, producer
1665             with key-based partitioning, consumer with manual assignment or consumer
1666             groups.
1667              
1668             =back
1669              
1670             Features:
1671              
1672             =over
1673              
1674             =item * Binary protocol implemented in pure XS (no librdkafka dependency)
1675              
1676             =item * Automatic request pipelining per broker connection
1677              
1678             =item * Metadata-driven partition leader routing
1679              
1680             =item * Producer: acks modes (-1/0/1), key-based partitioning (murmur2),
1681             headers, fire-and-forget (acks=0), idempotent producer with epoch-bump
1682             recovery, transactional / exactly-once stream processing
1683              
1684             =item * Consumer: manual partition assignment, offset tracking, poll-based
1685             message delivery; consumer groups with JoinGroup/SyncGroup/Heartbeat,
1686             sticky partition assignment, offset commit/fetch, automatic rebalancing,
1687             session-expiry recovery
1688              
1689             =item * Compression: lz4, gzip, zstd, snappy (each gated by build-time
1690             library detection)
1691              
1692             =item * TLS (OpenSSL) and SASL/PLAIN, SCRAM-SHA-256/512 (with full
1693             RFC 5802 server-signature verification)
1694              
1695             =item * Automatic reconnection at the connection layer; bootstrap-broker
1696             failover; periodic metadata refresh
1697              
1698             =back
1699              
1700             =head1 ANYEVENT INTEGRATION
1701              
1702             L<AnyEvent> has EV as one of its backends, so EV::Kafka can be used
1703             in AnyEvent applications seamlessly.
1704              
1705             =head1 NO UTF-8 SUPPORT
1706              
1707             This module handles all values as bytes. Encode your UTF-8 strings
1708             before passing them:
1709              
1710             use Encode;
1711              
1712             $kafka->produce($topic, $key, encode_utf8($val), sub { ... });
1713              
1714             =head1 CLUSTER CLIENT METHODS
1715              
1716             =head2 new(%options)
1717              
1718             Create a new EV::Kafka client. Returns a blessed C<EV::Kafka::Client>
1719             object.
1720              
1721             my $kafka = EV::Kafka->new(
1722             brokers => '10.0.0.1:9092,10.0.0.2:9092',
1723             acks => -1,
1724             on_error => sub { warn @_ },
1725             );
1726              
1727             Options:
1728              
1729             =over
1730              
1731             =item brokers => 'Str'
1732              
1733             Comma-separated list of bootstrap broker addresses (host:port).
1734             Default: C<127.0.0.1:9092>.
1735              
1736             =item client_id => 'Str' (default 'ev-kafka')
1737              
1738             Client identifier sent to brokers.
1739              
1740             =item tls => Bool
1741              
1742             Enable TLS encryption.
1743              
1744             =item tls_ca_file => 'Str'
1745              
1746             Path to CA certificate file for TLS verification.
1747              
1748             =item tls_skip_verify => Bool
1749              
1750             Skip TLS certificate verification.
1751              
1752             =item sasl => \%opts
1753              
1754             Enable SASL authentication. Supported mechanisms: C<PLAIN>,
1755             C<SCRAM-SHA-256>, C<SCRAM-SHA-512>.
1756              
1757             sasl => { mechanism => 'PLAIN', username => 'user', password => 'pass' }
1758              
1759             =item acks => Int (default -1)
1760              
1761             Producer acknowledgment mode. C<-1> = all in-sync replicas, C<0> = no
1762             acknowledgment (fire-and-forget), C<1> = leader only.
1763              
1764             =item linger_ms => Int (default 5)
1765              
1766             Time in milliseconds to accumulate records before flushing a batch.
1767             Lower values reduce latency; higher values improve throughput.
1768              
1769             =item batch_size => Int (default 16384)
1770              
1771             Maximum batch size in bytes before a batch is flushed immediately.
1772              
1773             =item compression => 'Str'
1774              
1775             Compression type for produce batches: C<'lz4'> (requires liblz4),
1776             C<'gzip'> (requires zlib), C<'zstd'> (requires libzstd),
1777             C<'snappy'> (requires libsnappy), or C<undef> for none.
1778              
1779             =item idempotent => Bool (default 0)
1780              
1781             Enable idempotent producer. Calls C<InitProducerId> on connect and
1782             sets producer_id/epoch/sequence in each RecordBatch for exactly-once
1783             delivery (broker-side deduplication). Only one batch per
1784             (topic, partition) is in-flight at a time when this is enabled, to
1785             prevent sequence-number aliasing on retry.
1786              
1787             =item transactional_id => 'Str'
1788              
1789             Enable transactional producer. Implies idempotent. Required for
1790             C<begin_transaction>/C<commit_transaction>/C<abort_transaction>
1791             and C<send_offsets_to_transaction> (full EOS).
1792              
1793             =item partitioner => $cb->($topic, $key, $num_partitions)
1794              
1795             Custom partition selection function. Default: murmur2 hash of key,
1796             or round-robin for null keys.
1797              
1798             =item on_error => $cb->($errstr)
1799              
1800             Error callback. Default: C<die>.
1801              
1802             =item on_connect => $cb->()
1803              
1804             Called once after initial metadata fetch completes.
1805              
1806             =item on_message => $cb->($topic, $partition, $offset, $key, $value, $headers)
1807              
1808             Message delivery callback for consumer operations.
1809              
1810             =item fetch_max_wait_ms => Int (default 500)
1811              
1812             Maximum time the broker waits to accumulate C<fetch_min_bytes> of data
1813             before returning a fetch response.
1814              
1815             =item fetch_max_bytes => Int (default 1048576)
1816              
1817             Maximum bytes per fetch response.
1818              
1819             =item fetch_min_bytes => Int (default 1)
1820              
1821             Minimum bytes before the broker responds to a fetch.
1822              
1823             =item metadata_refresh => Int (default 300)
1824              
1825             Periodic metadata refresh interval in seconds. Set to C<0> to disable.
1826             Refreshes happen in the background, so consumers and producers pick up
1827             leader changes without waiting for a request to fail first.
1828              
1829             =item loop => $ev_loop
1830              
1831             EV loop object to use. Default: C<EV::default_loop>.
1832              
1833             =back
1834              
1835             =head2 connect([$cb])
1836              
1837             Connect to the cluster. Connects to the first available bootstrap
1838             broker, fetches cluster metadata, then fires C<$cb-E<gt>($metadata)>.
1839             On bootstrap-broker failure the next address is tried; if all fail,
1840             the C<on_error> handler fires.
1841              
1842             $kafka->connect(sub {
1843             my $meta = shift;
1844             # $meta->{brokers}, $meta->{topics}
1845             });
1846              
1847             =head2 produce($topic, $key, $value, [\%opts,] [$cb])
1848              
1849             Produce a message. Routes to the correct partition leader automatically.
1850              
1851             # with callback (acks=1 or acks=-1)
1852             $kafka->produce('topic', 'key', 'value', sub {
1853             my ($result, $err) = @_;
1854             });
1855              
1856             # with headers
1857             $kafka->produce('topic', 'key', 'value',
1858             { headers => { 'h1' => 'v1' } }, sub { ... });
1859              
1860             # fire-and-forget (acks=0)
1861             $kafka->produce('topic', 'key', 'value');
1862              
1863             # explicit partition
1864             $kafka->produce('topic', 'key', 'value',
1865             { partition => 3 }, sub { ... });
1866              
1867             =head2 produce_many(\@messages, $cb)
1868              
1869             Produce multiple messages with a single completion callback. Each
1870             message is an arrayref C<[$topic, $key, $value]> or a hashref
1871             C<{topic, key, value}>. C<$cb> fires when all messages are acknowledged.
1872              
1873             $kafka->produce_many([
1874             ['my-topic', 'k1', 'v1'],
1875             ['my-topic', 'k2', 'v2'],
1876             ], sub {
1877             my $errors = shift;
1878             warn "some failed: @$errors" if $errors;
1879             });
1880              
1881             =head2 flush([$cb])
1882              
1883             Flush all accumulated produce batches and wait for all in-flight
1884             requests to complete. C<$cb> fires when all pending responses have
1885             been received.
1886              
1887             =head2 assign(\@partitions)
1888              
1889             Manually assign partitions for consuming.
1890              
1891             $kafka->assign([
1892             { topic => 'my-topic', partition => 0, offset => 0 },
1893             { topic => 'my-topic', partition => 1, offset => 100 },
1894             ]);
1895              
1896             =head2 seek($topic, $partition, $offset, [$cb])
1897              
1898             Seek a partition to a specific offset. Use C<-2> for earliest, C<-1>
1899             for latest. Updates the assignment in-place.
1900              
1901             $kafka->seek('my-topic', 0, -1, sub { print "at latest\n" });
1902              
1903             =head2 offsets_for($topic, $cb)
1904              
1905             Get earliest and latest offsets for all partitions of a topic.
1906              
1907             $kafka->offsets_for('my-topic', sub {
1908             my $offsets = shift;
1909             # { 0 => { earliest => 0, latest => 42 }, 1 => ... }
1910             });
1911              
1912             =head2 lag($cb)
1913              
1914             Get consumer lag for all assigned partitions.
1915              
1916             $kafka->lag(sub {
1917             my $lag = shift;
1918             # { "topic:0" => { current => 10, latest => 42, lag => 32 } }
1919             });
1920              
1921             =head2 error_name($code)
1922              
1923             Convert a Kafka numeric error code to its name. Callable as a method
1924             or class function.
1925              
1926             $kafka->error_name(3); # "UNKNOWN_TOPIC_OR_PARTITION"
1927             EV::Kafka::Client::error_name(3); # same
1928              
1929             =head2 poll([$cb])
1930              
1931             Fetch messages from assigned partitions. Calls C<on_message> for each
1932             received record. C<$cb> fires when all fetch responses have arrived.
1933              
1934             my $timer = EV::timer 0, 0.1, sub { $kafka->poll };
1935              
1936             =head2 subscribe(@topics, %opts)
1937              
1938             Join a consumer group and subscribe to one or more topics. The list
1939             of topic names comes first, followed by option key/value pairs. The
1940             group protocol handles partition assignment automatically.
1941              
1942             $kafka->subscribe('topic-a', 'topic-b',
1943             group_id => 'my-group',
1944             session_timeout => 30000, # ms
1945             rebalance_timeout => 60000, # ms
1946             heartbeat_interval => 3, # seconds
1947             auto_commit => 1, # commit on unsubscribe (default)
1948             auto_offset_reset => 'earliest', # or 'latest'
1949             group_instance_id => 'pod-abc', # KIP-345 static membership
1950             on_assign => sub {
1951             my $partitions = shift;
1952             # [{topic, partition, offset}, ...]
1953             },
1954             on_revoke => sub {
1955             my $partitions = shift;
1956             },
1957             );
1958              
1959             =head2 commit([$cb])
1960              
1961             Commit current consumer offsets to the group coordinator.
1962              
1963             $kafka->commit(sub {
1964             my $err = shift;
1965             warn "commit failed: $err" if $err;
1966             });
1967              
1968             =head2 unsubscribe([$cb])
1969              
1970             Leave the consumer group (sends LeaveGroup for fast rebalance),
1971             stop heartbeat and fetch loop. If C<auto_commit> is enabled,
1972             commits offsets before leaving.
1973              
1974             =head2 begin_transaction
1975              
1976             Start a transaction. Requires C<transactional_id> in constructor.
1977              
1978             =head2 send_offsets_to_transaction($group_id, [$cb])
1979              
1980             Commit consumer offsets within the current transaction via
1981             C<TxnOffsetCommit>. This is the key step for exactly-once
1982             consume-process-produce pipelines.
1983              
1984             $kafka->send_offsets_to_transaction('my-group', sub {
1985             my ($result, $err) = @_;
1986             });
1987              
1988             =head2 commit_transaction([$cb])
1989              
1990             Commit the current transaction. All produced messages and offset
1991             commits within the transaction become visible atomically.
1992              
1993             =head2 abort_transaction([$cb])
1994              
1995             Abort the current transaction. All produced messages are discarded
1996             and offset commits are rolled back.
1997              
1998             =head2 close([$cb])
1999              
2000             Graceful shutdown: stop timers, disconnect all broker connections.
2001              
2002             $kafka->close(sub { EV::break });
2003              
2004             =head1 LOW-LEVEL CONNECTION METHODS
2005              
2006             C<EV::Kafka::Conn> provides direct access to a single broker connection.
2007             Useful for custom protocols, debugging, or when cluster-level routing
2008             is not needed.
2009              
2010             my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
2011             $conn->on_error(sub { warn @_ });
2012             $conn->on_connect(sub { ... });
2013             $conn->connect('127.0.0.1', 9092, 5.0);
2014              
2015             =head2 connect($host, $port, [$timeout])
2016              
2017             Connect to a broker. Timeout in seconds (0 = no timeout).
2018              
2019             =head2 disconnect
2020              
2021             Disconnect from broker.
2022              
2023             =head2 connected
2024              
2025             Returns true if the connection is ready (ApiVersions handshake complete).
2026              
2027             =head2 metadata(\@topics, $cb)
2028              
2029             Request cluster metadata. Pass C<undef> for all topics.
2030              
2031             $conn->metadata(['my-topic'], sub {
2032             my ($result, $err) = @_;
2033             # $result->{brokers}, $result->{topics}
2034             });
2035              
2036             =head2 produce($topic, $partition, $key, $value, [\%opts,] [$cb])
2037              
2038             Produce a message to a specific partition.
2039              
2040             $conn->produce('topic', 0, 'key', 'value', sub {
2041             my ($result, $err) = @_;
2042             });
2043              
2044             Options: C<acks> (default 1), C<headers> (hashref), C<timestamp>
2045             (epoch ms, default now), C<compression> (C<'none'>, C<'lz4'>,
2046             C<'gzip'>, C<'zstd'>, C<'snappy'>; each requires its respective
2047             library at build time).
2048              
2049             =head2 produce_batch($topic, $partition, \@records, [\%opts,] [$cb])
2050              
2051             Produce multiple records in a single RecordBatch. Each record is
2052             C<{key, value, headers}>. Options: C, C,
2053             C, C, C.
2054              
2055             $conn->produce_batch('topic', 0, [
2056             { key => 'k1', value => 'v1' },
2057             { key => 'k2', value => 'v2' },
2058             ], sub { my ($result, $err) = @_ });
2059              
2060             =head2 fetch($topic, $partition, $offset, [\%opts,] $cb)
2061              
2062             Fetch messages from a partition starting at C<$offset>. C<%opts> may
2063             set C (per-partition cap, default 1 MiB), C
2064             (broker block-time, default 500), C (default 1).
2065              
2066             $conn->fetch('topic', 0, 0, sub {
2067             my ($result, $err) = @_;
2068             for my $rec (@{ $result->{topics}[0]{partitions}[0]{records} }) {
2069             printf "offset=%d key=%s value=%s\n",
2070             $rec->{offset}, $rec->{key}, $rec->{value};
2071             }
2072             });
2073              
2074             =head2 fetch_multi(\%topics, [\%opts,] $cb)
2075              
2076             Multi-partition fetch in a single request. Groups multiple
2077             topic-partitions into one Fetch call to the broker. C<%opts> accepts
2078             the same keys as C<fetch>.
2079              
2080             $conn->fetch_multi({
2081             'topic-a' => [{ partition => 0, offset => 10 },
2082             { partition => 1, offset => 20 }],
2083             'topic-b' => [{ partition => 0, offset => 0 }],
2084             }, sub { my ($result, $err) = @_ });
2085              
2086             Used internally by C<poll> to batch fetches by broker leader, with
2087             C/C/C taken from the cluster client
2088             config.
2089              
2090             =head2 list_offsets($topic, $partition, $timestamp, $cb)
2091              
2092             Get offsets by timestamp. Use C<-2> for earliest, C<-1> for latest.
2093              
2094             =head2 find_coordinator($key, $cb, [$key_type])
2095              
2096             Find the coordinator broker. C<$key_type>: 0=group (default),
2097             1=transaction.
2098              
2099             =head2 join_group($group_id, $member_id, \@topics, $cb, [$session_timeout_ms, $rebalance_timeout_ms, $group_instance_id])
2100              
2101             Join a consumer group. Pass C<$group_instance_id> for KIP-345 static
2102             membership.
2103              
2104             =head2 sync_group($group_id, $generation_id, $member_id, \@assignments, $cb, [$group_instance_id])
2105              
2106             Synchronize group state after join.
2107              
2108             =head2 heartbeat($group_id, $generation_id, $member_id, $cb, [$group_instance_id])
2109              
2110             Send heartbeat to group coordinator.
2111              
2112             =head2 offset_commit($group_id, $generation_id, $member_id, \@offsets, $cb)
2113              
2114             Commit consumer offsets.
2115              
2116             =head2 offset_fetch($group_id, \@topics, $cb)
2117              
2118             Fetch committed offsets for a consumer group.
2119              
2120             =head2 api_versions
2121              
2122             Returns a hashref of supported API keys to max versions, or undef
2123             if not yet negotiated.
2124              
2125             my $vers = $conn->api_versions;
2126             # { 0 => 7, 1 => 11, 3 => 8, ... }
2127              
2128             =head2 on_error([$cb]), on_connect([$cb]), on_disconnect([$cb])
2129              
2130             Set connection-level handler callbacks. Call with no argument or
2131             C<undef> to clear.
2132              
2133             =head2 client_id($id)
2134              
2135             Set the client identifier.
2136              
2137             =head2 tls($enable, [$ca_file, $skip_verify])
2138              
2139             Configure TLS.
2140              
2141             =head2 sasl($mechanism, [$username, $password])
2142              
2143             Configure SASL authentication.
2144              
2145             =head2 auto_reconnect($enable, [$delay_ms])
2146              
2147             Enable automatic reconnection with delay in milliseconds (default 1000).
2148              
2149             =head2 leave_group($group_id, $member_id, $cb)
2150              
2151             Send LeaveGroup to coordinator for fast partition rebalance.
2152              
2153             =head2 create_topics(\@topics, $timeout_ms, $cb)
2154              
2155             Create topics. Each element: C<{name, num_partitions, replication_factor}>.
2156              
2157             $conn->create_topics(
2158             [{ name => 'new-topic', num_partitions => 3, replication_factor => 1 }],
2159             5000, sub { my ($res, $err) = @_ }
2160             );
2161              
2162             =head2 delete_topics(\@topic_names, $timeout_ms, $cb)
2163              
2164             Delete topics by name.
2165              
2166             =head2 init_producer_id($transactional_id, $txn_timeout_ms, $cb)
2167              
2168             Initialize a producer ID for idempotent/transactional produce.
2169             Pass C<undef> for non-transactional idempotent producer.
2170              
2171             =head2 add_partitions_to_txn($txn_id, $producer_id, $epoch, \@topics, $cb)
2172              
2173             Register partitions with the transaction coordinator.
2174              
2175             =head2 end_txn($txn_id, $producer_id, $epoch, $committed, $cb)
2176              
2177             Commit (C<$committed=1>) or abort (C<$committed=0>) a transaction.
2178              
2179             =head2 txn_offset_commit($txn_id, $group_id, $producer_id, $epoch, $generation, $member_id, \@offsets, $cb)
2180              
2181             Commit consumer offsets within a transaction (API 28).
2182              
2183             =head2 pending
2184              
2185             Number of requests awaiting broker response.
2186              
2187             =head2 state
2188              
2189             Connection state as integer (0=disconnected, 6=ready).
2190              
2191             =head1 UTILITY FUNCTIONS
2192              
2193             =head2 EV::Kafka::_murmur2($key)
2194              
2195             Kafka-compatible murmur2 hash. Returns a non-negative 31-bit integer.
2196              
2197             =head2 EV::Kafka::_crc32c($data)
2198              
2199             CRC32C checksum (Castagnoli). Used internally for RecordBatch integrity.
2200              
2201             =head2 EV::Kafka::_error_name($code)
2202              
2203             Convert Kafka error code to string name.
2204              
2205             =head1 RESULT STRUCTURES
2206              
2207             =head2 Produce result
2208              
2209             $result = {
2210             topics => [{
2211             topic => 'name',
2212             partitions => [{
2213             partition => 0,
2214             error_code => 0,
2215             base_offset => 42,
2216             }],
2217             }],
2218             };
2219              
2220             =head2 Fetch result
2221              
2222             $result = {
2223             topics => [{
2224             topic => 'name',
2225             partitions => [{
2226             partition => 0,
2227             error_code => 0,
2228             high_watermark => 100,
2229             records => [{
2230             offset => 42,
2231             timestamp => 1712345678000,
2232             key => 'key', # or undef
2233             value => 'value', # or undef
2234             headers => { h => 'v' }, # if present
2235             }],
2236             }],
2237             }],
2238             };
2239              
2240             =head2 Metadata result
2241              
2242             $result = {
2243             controller_id => 0,
2244             brokers => [{ node_id => 0, host => '10.0.0.1', port => 9092 }],
2245             topics => [{
2246             name => 'topic',
2247             error_code => 0,
2248             partitions => [{
2249             partition => 0,
2250             leader => 0,
2251             error_code => 0,
2252             }],
2253             }],
2254             };
2255              
2256             =head1 ERROR HANDLING
2257              
2258             Errors are delivered through two channels:
2259              
2260             =over
2261              
2262             =item B<Connection-level errors> fire the C<on_error> callback (or
2263             C<die> if none set). These include connection refused, DNS failure,
2264             TLS errors, SASL auth failure, and protocol violations.
2265              
2266             =item B<Request-level errors> are delivered as the second argument to
2267             the request callback: C<< $cb->($result, $error) >>. If C<$error> is
2268             defined, C<$result> may be undef.
2269              
2270             =back
2271              
2272             Within result structures, per-partition C<error_code> fields use Kafka
2273             numeric codes:
2274              
2275             0 No error
2276             1 OFFSET_OUT_OF_RANGE
2277             3 UNKNOWN_TOPIC_OR_PARTITION
2278             6 NOT_LEADER_OR_FOLLOWER (retried by the producer)
2279             15 COORDINATOR_NOT_AVAILABLE (retried)
2280             16 NOT_COORDINATOR (retried)
2281             22 ILLEGAL_GENERATION (group rejoin)
2282             25 UNKNOWN_MEMBER_ID (group rejoin)
2283             27 REBALANCE_IN_PROGRESS (group rejoin)
2284             36 TOPIC_ALREADY_EXISTS
2285             45 OUT_OF_ORDER_SEQUENCE_NUMBER (idempotent: epoch bump)
2286             46 DUPLICATE_SEQUENCE_NUMBER (idempotent: epoch bump)
2287             79 MEMBER_ID_REQUIRED (group rejoin with assigned id)
2288              
2289             Use C<EV::Kafka::_error_name($code)> to look up the name of any code.
2290              
2291             When a broker disconnects mid-flight, all pending callbacks receive
2292             C<(undef, "connection closed by broker")> or C<(undef, "disconnected")>.
2293              
2294             =head1 ENVIRONMENT VARIABLES
2295              
2296             These are used by tests and examples (not by the module itself):
2297              
2298             TEST_KAFKA_BROKER broker address for tests (host:port)
2299             KAFKA_BROKER broker address for examples
2300             KAFKA_HOST broker hostname for low-level examples
2301             KAFKA_PORT broker port for low-level examples
2302             KAFKA_TOPIC topic name for examples
2303             KAFKA_GROUP_ID consumer group for examples
2304             KAFKA_LIMIT message limit for consume example
2305             KAFKA_COUNT message count for fire-and-forget
2306             BENCH_BROKER broker for benchmarks
2307             BENCH_MESSAGES message count for benchmarks
2308             BENCH_VALUE_SIZE value size in bytes for benchmarks
2309             BENCH_TOPIC topic name for benchmarks
2310              
2311             =head1 QUICK START
2312              
2313             Minimal producer + consumer lifecycle:
2314              
2315             use EV;
2316             use EV::Kafka;
2317              
2318             my $kafka = EV::Kafka->new(
2319             brokers => '127.0.0.1:9092',
2320             acks => 1,
2321             on_error => sub { warn "kafka: @_\n" },
2322             on_message => sub {
2323             my ($topic, $part, $offset, $key, $value) = @_;
2324             print "got: $key=$value\n";
2325             },
2326             );
2327              
2328             $kafka->connect(sub {
2329             # produce
2330             $kafka->produce('test', 'k1', 'hello', sub {
2331             print "produced\n";
2332              
2333             # consume from the beginning
2334             $kafka->assign([{topic=>'test', partition=>0, offset=>0}]);
2335             $kafka->seek('test', 0, -2, sub {
2336             my $t = EV::timer 0, 0.1, sub { $kafka->poll };
2337             $kafka->{cfg}{_t} = $t;
2338             });
2339             });
2340             });
2341              
2342             EV::run;
2343              
2344             =head1 COOKBOOK
2345              
2346             =head2 Produce JSON with headers
2347              
2348             use JSON::PP;
2349             my $json = JSON::PP->new->utf8;
2350              
2351             $kafka->produce('events', 'user-42',
2352             $json->encode({ action => 'click', page => '/home' }),
2353             { headers => { 'content-type' => 'application/json' } },
2354             sub { ... }
2355             );
2356              
2357             =head2 Consume from latest offset only
2358              
2359             $kafka->subscribe('live-feed',
2360             group_id => 'realtime',
2361             auto_offset_reset => 'latest',
2362             on_assign => sub { print "ready\n" },
2363             );
2364              
2365             =head2 Graceful shutdown
2366              
2367             $SIG{INT} = sub {
2368             $kafka->commit(sub {
2369             $kafka->unsubscribe(sub {
2370             $kafka->close(sub { EV::break });
2371             });
2372             });
2373             };
2374              
2375             =head2 At-least-once processing
2376              
2377             $kafka->subscribe('jobs',
2378             group_id => 'workers',
2379             auto_commit => 0,
2380             );
2381              
2382             # in on_message: process, then commit
2383             on_message => sub {
2384             process($_[4]);
2385             $kafka->commit if ++$count % 100 == 0;
2386             },
2387              
2388             =head2 Batch produce
2389              
2390             $kafka->produce_many([
2391             ['events', 'k1', 'v1'],
2392             ['events', 'k2', 'v2'],
2393             ['events', 'k3', 'v3'],
2394             ], sub {
2395             my $errs = shift;
2396             print $errs ? "some failed\n" : "all done\n";
2397             });
2398              
2399             =head2 Exactly-once stream processing (EOS)
2400              
2401             my $kafka = EV::Kafka->new(
2402             brokers => '...',
2403             transactional_id => 'my-eos-app',
2404             acks => -1,
2405             on_message => sub {
2406             my ($t, $p, $off, $key, $value) = @_;
2407             my $result = process($value);
2408             $kafka->produce('output-topic', $key, $result);
2409             },
2410             );
2411              
2412             # consume-process-produce loop:
2413             $kafka->begin_transaction;
2414             $kafka->poll(sub {
2415             $kafka->send_offsets_to_transaction('my-group', sub {
2416             $kafka->commit_transaction(sub {
2417             $kafka->begin_transaction; # next transaction
2418             });
2419             });
2420             });
2421              
2422             =head2 Topic administration
2423              
2424             my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
2425             $conn->on_connect(sub {
2426             $conn->create_topics(
2427             [{ name => 'new-topic', num_partitions => 6, replication_factor => 3 }],
2428             10000, sub { ... }
2429             );
2430             });
2431              
2432             =head1 BENCHMARKS
2433              
2434             Measured on Linux with TCP loopback to Redpanda, 100-byte values,
2435             Perl 5.40.2, 20K messages (C):
2436              
2437             Pipeline produce (acks=1) 100K msg/sec 11.0 MB/s
2438             Fire-and-forget (acks=0) 120K msg/sec 13.2 MB/s
2439             Sequential round-trip 24K msg/sec 42 us avg latency
2440             Metadata request 21K req/sec 47 us avg latency
2441              
2442             Throughput by value size (pipelined, acks=1):
2443              
2444             10 bytes 105K msg/sec 1.5 MB/s
2445             100 bytes 100K msg/sec 10.5 MB/s
2446             1000 bytes 70K msg/sec 70.7 MB/s
2447             10000 bytes 20K msg/sec 202.0 MB/s
2448              
2449             Latency histogram (20K round-trips, acks=1, C):
2450              
2451             median: 39 us p90: 59 us p95: 75 us p99: 122 us
2452              
2453             Pipeline produce throughput is limited by Perl callback overhead per
2454             message. Fire-and-forget mode (C<acks=0>) skips the response cycle
2455             entirely, reaching ~120K msg/sec. Sequential round-trip (one produce,
2456             wait for ack, repeat) measures raw broker latency around 39us median.
2457              
2458             The fetch path is sequential (fetch, process, fetch again), which
2459             introduces one round-trip per batch. With larger fetch sizes and
2460             dense topics, fetch throughput increases proportionally.
2461              
2462             Run the bundled throughput benchmark script for throughput results. Set
2463             C<BENCH_BROKER>, C<BENCH_MESSAGES>, C<BENCH_VALUE_SIZE>, and
2464             C<BENCH_TOPIC> to customize.
2465              
2466             Run the bundled latency benchmark script for a latency histogram with percentiles
2467             (min, avg, median, p90, p95, p99, max).
2468              
2469             =head1 KAFKA PROTOCOL
2470              
2471             This module implements the Kafka binary protocol directly in XS.
2472             All integers are big-endian. Requests use a 4-byte size prefix
2473             followed by a header (API key, version, correlation ID, client ID)
2474             and a version-specific body.
2475              
2476             Responses are matched to requests by correlation ID. The broker
2477             guarantees FIFO ordering per connection, so the response queue is
2478             a simple FIFO.
2479              
2480             RecordBatch encoding (magic=2) is used for produce. CRC32C covers
2481             the batch from attributes through the last record. Records use
2482             ZigZag-encoded varints for lengths and deltas.
2483              
2484             The connection handshake sends ApiVersions (v0) on connect to
2485             discover supported protocol versions. SASL authentication uses
2486             SaslHandshake (v1) + SaslAuthenticate (v2) with PLAIN mechanism.
2487              
2488             Consumer group protocol uses sticky partition assignment with
2489             MEMBER_ID_REQUIRED (error 79) retry per KIP-394.
2490              
2491             Non-flexible API versions are used throughout (capped below the
2492             flexible-version threshold for each API) to avoid the compact
2493             encoding complexity.
2494              
2495             =head1 LIMITATIONS
2496              
2497             =over
2498              
2499             =item * B<Blocking DNS resolution> -- numeric IPv4/IPv6 literals
2500             take a fast path (no resolver lookup) and never block. Non-literal
2501             hostnames call the system resolver synchronously, blocking the EV loop
2502             until the resolver responds. For fully non-blocking operation against
2503             named brokers, pre-resolve in Perl-land.
2504              
2505             =item * B<Limited SASL mechanisms> -- only SASL/PLAIN and
2506             SCRAM-SHA-256/512 are implemented.
2507              
2508             =item * B<No flexible API versions> -- all API versions are capped
2509             below the flexible-version threshold to avoid compact string/array
2510             encoding. Works with Kafka 0.11+ and Redpanda; loses access to a few
2511             newer protocol features.
2512              
2513             =item * B<Bounded retry handling> -- transient errors (NOT_LEADER,
2514             COORDINATOR_NOT_AVAILABLE) trigger metadata refresh and up to 3
2515             retries with backoff. Hard idempotent errors (OUT_OF_ORDER_SEQUENCE,
2516             DUPLICATE_SEQUENCE) trigger one InitProducerId-with-fresh-epoch
2517             recovery attempt. Other broker errors are surfaced to the callback
2518             immediately.
2519              
2520             =back
2521              
2522             =head1 AUTHOR
2523              
2524             vividsnow
2525              
2526             =head1 LICENSE
2527              
2528             This library is free software; you can redistribute it and/or modify it
2529             under the same terms as Perl itself.
2530              
2531             =cut
2532