File Coverage

blib/lib/EV/Kafka.pm
Criterion Covered Total %
statement 156 937 16.6
branch 22 460 4.7
condition 26 349 7.4
subroutine 16 88 18.1
pod 1 1 100.0
total 221 1835 12.0


line stmt bran cond sub pod time code
1             package EV::Kafka;
2 15     15   1545727 use strict;
  15         22  
  15         540  
3 15     15   58 use warnings;
  15         78  
  15         716  
4 15     15   3333 use EV;
  15         13911  
  15         361  
5              
6             BEGIN {
7 15     15   63 use XSLoader;
  15         41  
  15         676  
8 15     15   40 our $VERSION = '0.01';
9 15         39675 XSLoader::load __PACKAGE__, $VERSION;
10             }
11              
12             sub new {
13 5     5 1 580625 my ($class, %opts) = @_;
14              
15 5         14 my $loop = delete $opts{loop};
16 5         52 my $self = $class->_new($loop);
17              
18             # Parse brokers
19 5   100     23 my $brokers = delete $opts{brokers} // '127.0.0.1:9092';
20 5         14 my @bootstrap;
21 5         20 for my $b (split /,/, $brokers) {
22 7         46 $b =~ s/^\s+|\s+$//g;
23 7         22 my ($h, $p) = split /:/, $b, 2;
24 7   100     38 $p //= 9092;
25 7         32 push @bootstrap, [$h, $p + 0];
26             }
27              
28             # Store config
29             my $cfg = {
30             bootstrap => \@bootstrap,
31             client_id => delete $opts{client_id} // 'ev-kafka',
32             tls => delete $opts{tls} // 0,
33             tls_ca_file => delete $opts{tls_ca_file},
34             tls_skip_verify => delete $opts{tls_skip_verify} // 0,
35             sasl => delete $opts{sasl},
36 0     0   0 on_error => delete $opts{on_error} // sub { die "EV::Kafka: @_\n" },
37             on_connect => delete $opts{on_connect},
38             on_message => delete $opts{on_message},
39             acks => delete $opts{acks} // -1,
40             linger_ms => delete $opts{linger_ms} // 5,
41             batch_size => delete $opts{batch_size} // 16384,
42             partitioner => delete $opts{partitioner},
43             compression => delete $opts{compression}, # 'lz4', 'gzip', or undef
44             idempotent => delete $opts{idempotent} // 0,
45             transactional_id => delete $opts{transactional_id}, # enables transactions
46             fetch_max_wait_ms => delete $opts{fetch_max_wait_ms} // 500,
47             fetch_max_bytes => delete $opts{fetch_max_bytes} // 1048576,
48             fetch_min_bytes => delete $opts{fetch_min_bytes} // 1,
49 5   50     333 metadata_refresh => delete $opts{metadata_refresh} // 300,
      50        
      50        
      33        
      100        
      50        
      50        
      50        
      50        
      50        
      50        
      50        
50             };
51              
52             # Internal state
53 5         16 $cfg->{conns} = {}; # node_id => EV::Kafka::Conn
54 5         18 $cfg->{meta} = undef; # latest metadata response
55 5         12 $cfg->{leaders} = {}; # "topic:partition" => node_id
56 5         13 $cfg->{broker_map}= {}; # node_id => {host, port}
57 5         8 $cfg->{connected} = 0;
58 5         11 $cfg->{meta_pending} = 0;
59 5         9 $cfg->{pending_ops} = []; # ops waiting for metadata
60              
61             # Producer state
62 5         12 $cfg->{batches} = {}; # "topic:partition" => [{rec, cb}]
63 5         8 $cfg->{next_sequence} = {}; # "topic:partition" => next sequence number
64 5         12 $cfg->{producer_id} = -1;
65 5         23 $cfg->{producer_epoch} = -1;
66 5         13 $cfg->{rr_counter} = 0;
67              
68             # Consumer state
69 5         8 $cfg->{assignments} = []; # [{topic, partition, offset}]
70 5         9 $cfg->{fetch_active} = 0;
71 5         10 $cfg->{group} = undef;
72              
73 5         44 bless { xs => $self, cfg => $cfg }, "${class}::Client";
74             }
75              
76             package EV::Kafka::Client;
77 15     15   109 use EV;
  15         20  
  15         351  
78 15     15   61 use Scalar::Util 'weaken';
  15         30  
  15         139464  
79              
80             sub _any_conn {
81 1     1   2 my ($self) = @_;
82 1         3 my $cfg = $self->{cfg};
83 1         2 my $conn = $cfg->{bootstrap_conn};
84 1         2 for my $c (values %{$cfg->{conns}}) {
  1         5  
85 0 0       0 if ($c->connected) { $conn = $c; last }
  0         0  
  0         0  
86             }
87 1 50 33     7 return ($conn && $conn->connected) ? $conn : undef;
88             }
89              
90             sub _get_or_create_conn {
91 0     0   0 my ($self, $node_id) = @_;
92 0         0 my $cfg = $self->{cfg};
93 0 0       0 return $cfg->{conns}{$node_id} if $cfg->{conns}{$node_id};
94              
95 0         0 my $info = $cfg->{broker_map}{$node_id};
96 0 0       0 return undef unless $info;
97              
98 0         0 my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
99 0         0 $self->_configure_conn($conn);
100              
101 0         0 $cfg->{conns}{$node_id} = $conn;
102 0         0 weaken(my $weak = $self);
103             $conn->on_connect(sub {
104 0 0   0   0 $weak->_drain_pending_for($node_id) if $weak;
105 0         0 });
106 0         0 $conn->connect($info->{host}, $info->{port}, 10.0);
107              
108 0         0 return $conn;
109             }
110              
111             sub _configure_conn {
112 0     0   0 my ($self, $conn) = @_;
113 0         0 my $cfg = $self->{cfg};
114 0         0 $conn->client_id($cfg->{client_id});
115 0 0       0 if ($cfg->{tls}) {
116 0         0 $conn->tls(1, $cfg->{tls_ca_file}, $cfg->{tls_skip_verify});
117             }
118 0 0       0 if ($cfg->{sasl}) {
119 0         0 $conn->sasl($cfg->{sasl}{mechanism}, $cfg->{sasl}{username}, $cfg->{sasl}{password});
120             }
121 0         0 weaken(my $weak_cfg = $cfg);
122             $conn->on_error(sub {
123 0 0 0 0   0 $weak_cfg->{on_error}->($_[0]) if $weak_cfg && $weak_cfg->{on_error};
124 0         0 });
125             }
126              
127             sub _bootstrap_connect {
128 0     0   0 my ($self, $cb) = @_;
129 0         0 my $cfg = $self->{cfg};
130 0         0 my @bs = @{$cfg->{bootstrap}};
  0         0  
131 0         0 my $idx = 0;
132              
133 0         0 my $try; $try = sub {
134 0 0   0   0 if ($idx >= @bs) {
135 0         0 undef $try;
136 0 0       0 $cfg->{on_error}->("all bootstrap brokers unreachable") if $cfg->{on_error};
137 0         0 return;
138             }
139 0         0 my ($host, $port) = @{$bs[$idx++]};
  0         0  
140 0         0 my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
141 0         0 $self->_configure_conn($conn);
142              
143             $conn->on_error(sub {
144             # try next broker
145 0         0 $try->();
146 0         0 });
147              
148             $conn->on_connect(sub {
149 0         0 undef $try; # break self-reference cycle
150             $conn->on_error(sub {
151 0 0       0 $cfg->{on_error}->($_[0]) if $cfg->{on_error};
152 0         0 });
153 0         0 $cfg->{bootstrap_conn} = $conn;
154 0         0 $cfg->{connected} = 1;
155 0         0 $self->_refresh_metadata($cb);
156 0         0 });
157              
158 0         0 $conn->connect($host, $port, 10.0);
159 0         0 };
160 0         0 $try->();
161             }
162              
163             sub connect {
164 0     0   0 my ($self, $cb) = @_;
165 0         0 $self->_bootstrap_connect($cb);
166             }
167              
168             sub _merge_metadata {
169 0     0   0 my ($cfg, $meta) = @_;
170 0   0     0 for my $b (@{$meta->{brokers} // []}) {
  0         0  
171             $cfg->{broker_map}{$b->{node_id}} = {
172             host => $b->{host}, port => $b->{port}
173 0         0 };
174             }
175 0   0     0 for my $t (@{$meta->{topics} // []}) {
  0         0  
176 0 0       0 next if $t->{error_code};
177 0   0     0 for my $p (@{$t->{partitions} // []}) {
  0         0  
178 0         0 $cfg->{leaders}{"$t->{name}:$p->{partition}"} = $p->{leader};
179             }
180             }
181             }
182              
183             sub _refresh_metadata {
184 1     1   3 my ($self, $cb) = @_;
185 1         2 my $cfg = $self->{cfg};
186 1 50       5 return if $cfg->{meta_pending};
187 1         2 $cfg->{meta_pending} = 1;
188              
189 1         4 my $conn = $self->_any_conn;
190 1 50       3 unless ($conn) { $cfg->{meta_pending} = 0; return }
  1         3  
  1         2  
191              
192             $conn->metadata(undef, sub {
193 0     0   0 my ($meta, $err) = @_;
194 0         0 $cfg->{meta_pending} = 0;
195 0 0       0 if ($err) {
196 0 0       0 $cfg->{on_error}->("metadata: $err") if $cfg->{on_error};
197 0         0 return;
198             }
199              
200 0         0 $cfg->{meta} = $meta;
201 0         0 _merge_metadata($cfg, $meta);
202              
203             # assign bootstrap_conn a node_id if possible
204 0 0 0     0 if ($cfg->{bootstrap_conn} && $meta->{brokers} && @{$meta->{brokers}}) {
  0   0     0  
205 0         0 my $binfo = $meta->{brokers}[0];
206 0   0     0 $cfg->{conns}{$binfo->{node_id}} //= $cfg->{bootstrap_conn};
207             }
208              
209 0 0 0     0 if (($cfg->{idempotent} || $cfg->{transactional_id}) && $cfg->{producer_id} < 0) {
      0        
210             $self->_init_idempotent(sub {
211 0         0 $self->_drain_all_pending;
212 0 0       0 $cb->($meta) if $cb;
213 0 0       0 $cfg->{on_connect}->() if $cfg->{on_connect};
214 0         0 $cfg->{on_connect} = undef;
215 0         0 });
216             } else {
217 0         0 $self->_drain_all_pending;
218 0 0       0 $cb->($meta) if $cb;
219 0 0       0 $cfg->{on_connect}->() if $cfg->{on_connect};
220 0         0 $cfg->{on_connect} = undef;
221             }
222 0         0 });
223             }
224              
225             sub _refresh_metadata_for_topic {
226 0     0   0 my ($self, $topic) = @_;
227 0         0 my $cfg = $self->{cfg};
228 0 0       0 return if $cfg->{meta_pending};
229 0         0 $cfg->{meta_pending} = 1;
230              
231 0         0 my $conn = $self->_any_conn;
232 0 0       0 unless ($conn) { $cfg->{meta_pending} = 0; return }
  0         0  
  0         0  
233              
234             $conn->metadata([$topic], sub {
235 0     0   0 my ($meta, $err) = @_;
236 0         0 $cfg->{meta_pending} = 0;
237 0 0       0 if ($err) {
238 0 0       0 $cfg->{on_error}->("metadata: $err") if $cfg->{on_error};
239 0         0 return;
240             }
241              
242 0         0 _merge_metadata($cfg, $meta);
243              
244             # if topic still has error, retry after delay
245 0         0 my $topic_ok = 0;
246 0   0     0 for my $t (@{$meta->{topics} // []}) {
  0         0  
247 0 0 0     0 if ($t->{name} eq $topic && !$t->{error_code} && @{$t->{partitions} // []}) {
  0   0     0  
      0        
248 0         0 $topic_ok = 1;
249 0         0 last;
250             }
251             }
252              
253 0 0       0 if ($topic_ok) {
254 0         0 $self->_drain_all_pending;
255             } else {
256             # retry after short delay (topic being created)
257 0         0 my $t; $t = EV::timer 0.5, 0, sub {
258 0         0 undef $t;
259 0         0 $self->_refresh_metadata_for_topic($topic);
260 0         0 };
261             }
262 0         0 });
263             }
264              
265             sub _init_idempotent {
266 0     0   0 my ($self, $cb) = @_;
267 0         0 my $cfg = $self->{cfg};
268              
269             my $do_init = sub {
270 0     0   0 my ($conn) = @_;
271             $conn->init_producer_id($cfg->{transactional_id}, 30000, sub {
272 0         0 my ($res, $err) = @_;
273 0 0 0     0 if (!$err && $res && !$res->{error_code}) {
      0        
274 0         0 $cfg->{producer_id} = $res->{producer_id};
275 0         0 $cfg->{producer_epoch} = $res->{producer_epoch};
276             } else {
277 0   0     0 my $msg = $err || "InitProducerId error: " . ($res->{error_code} // '?');
278 0 0       0 $cfg->{on_error}->($msg) if $cfg->{on_error};
279             }
280 0 0       0 $cb->() if $cb;
281 0         0 });
282 0         0 };
283              
284 0 0       0 if ($cfg->{transactional_id}) {
285             # transactional: find transaction coordinator first
286 0         0 my $conn = $self->_any_conn;
287 0 0       0 unless ($conn) { $cb->() if $cb; return }
  0 0       0  
  0         0  
288              
289             my $on_coord = sub {
290 0     0   0 my ($res, $err) = @_;
291 0 0 0     0 if ($err || ($res->{error_code} && $res->{error_code} != 0)) {
      0        
292 0         0 $do_init->($conn);
293 0         0 return;
294             }
295             # connect to the transaction coordinator
296             $cfg->{broker_map}{$res->{node_id}} = {
297             host => $res->{host}, port => $res->{port}
298 0         0 };
299 0         0 my $txn_conn = $self->_get_or_create_conn($res->{node_id});
300 0 0 0     0 if ($txn_conn && $txn_conn->connected) {
301 0         0 $cfg->{_txn_coordinator} = $txn_conn;
302 0         0 $do_init->($txn_conn);
303             } else {
304 0         0 push @{$cfg->{pending_ops}}, {
305             node_id => $res->{node_id},
306             run => sub {
307 0         0 $cfg->{_txn_coordinator} = $self->_get_or_create_conn($res->{node_id});
308 0   0     0 $do_init->($cfg->{_txn_coordinator} || $conn);
309             },
310 0         0 };
311             }
312 0         0 };
313 0         0 $conn->find_coordinator($cfg->{transactional_id}, $on_coord, 1);
314             } else {
315             # idempotent only: any broker works
316 0         0 my $conn = $self->_any_conn;
317 0 0       0 unless ($conn) { $cb->() if $cb; return }
  0 0       0  
  0         0  
318 0         0 $do_init->($conn);
319             }
320             }
321              
322             sub _drain_pending_for {
323 0     0   0 my ($self, $node_id) = @_;
324 0         0 my $cfg = $self->{cfg};
325 0         0 my @remaining;
326 0         0 for my $op (@{$cfg->{pending_ops}}) {
  0         0  
327 0 0 0     0 if (defined $op->{node_id} && $op->{node_id} == $node_id) {
328 0         0 $op->{run}->();
329             } else {
330 0         0 push @remaining, $op;
331             }
332             }
333 0         0 $cfg->{pending_ops} = \@remaining;
334             }
335              
336             sub _drain_all_pending {
337 0     0   0 my ($self) = @_;
338 0         0 my $cfg = $self->{cfg};
339 0         0 my @ops = @{$cfg->{pending_ops}};
  0         0  
340 0         0 $cfg->{pending_ops} = [];
341 0         0 for my $op (@ops) {
342 0 0       0 if (defined $op->{node_id}) {
343 0         0 my $conn = $self->_get_or_create_conn($op->{node_id});
344 0 0 0     0 if ($conn && $conn->connected) {
345 0         0 $op->{run}->();
346             } else {
347 0         0 push @{$cfg->{pending_ops}}, $op;
  0         0  
348             }
349             } else {
350 0         0 $op->{run}->();
351             }
352             }
353             }
354              
355             sub _get_leader {
356 0     0   0 my ($self, $topic, $partition) = @_;
357 0         0 return $self->{cfg}{leaders}{"$topic:$partition"};
358             }
359              
360             sub _num_partitions {
361 0     0   0 my ($self, $topic) = @_;
362 0 0       0 my $meta = $self->{cfg}{meta} or return 0;
363 0   0     0 for my $t (@{$meta->{topics} // []}) {
  0         0  
364 0 0       0 return scalar @{$t->{partitions}} if $t->{name} eq $topic;
  0         0  
365             }
366 0         0 return 0;
367             }
368              
369             sub _select_partition {
370 0     0   0 my ($self, $topic, $key) = @_;
371 0         0 my $np = $self->_num_partitions($topic);
372 0 0       0 return 0 unless $np > 0;
373              
374 0         0 my $cfg = $self->{cfg};
375 0 0       0 if ($cfg->{partitioner}) {
376 0         0 return $cfg->{partitioner}->($topic, $key, $np);
377             }
378 0 0 0     0 if (defined $key && length $key) {
379 0         0 return EV::Kafka::_murmur2($key) % $np;
380             }
381 0         0 return $cfg->{rr_counter}++ % $np;
382             }
383              
384             # --- Producer ---
385              
386             sub produce {
387 1     1   26 my ($self, $topic, $key, $value, @rest) = @_;
388 1         3 my $cb;
389             my %opts;
390 1         4 for my $a (@rest) {
391 0 0       0 if (ref $a eq 'CODE') { $cb = $a }
  0 0       0  
392 0         0 elsif (ref $a eq 'HASH') { %opts = %$a }
393             }
394              
395 1         8 my $cfg = $self->{cfg};
396              
397             # ensure we have metadata
398 1 50       5 unless ($cfg->{meta}) {
399 1         10 push @{$cfg->{pending_ops}}, {
400 0     0   0 run => sub { $self->produce($topic, $key, $value, @rest) },
401 1         3 };
402 1 50       8 $self->_refresh_metadata unless $cfg->{meta_pending};
403 1         4 return;
404             }
405              
406             my $partition = exists $opts{partition}
407             ? $opts{partition}
408 0 0       0 : $self->_select_partition($topic, $key);
409              
410 0         0 my $leader_id = $self->_get_leader($topic, $partition);
411 0 0       0 unless (defined $leader_id) {
412             # topic/partition unknown — request metadata for this topic to trigger auto-creation
413 0         0 push @{$cfg->{pending_ops}}, {
414 0     0   0 run => sub { $self->produce($topic, $key, $value, @rest) },
415 0         0 };
416 0 0       0 $self->_refresh_metadata_for_topic($topic) unless $cfg->{meta_pending};
417 0         0 return;
418             }
419              
420 0         0 my $conn = $self->_get_or_create_conn($leader_id);
421 0 0 0     0 unless ($conn && $conn->connected) {
422 0         0 push @{$cfg->{pending_ops}}, {
423             node_id => $leader_id,
424 0     0   0 run => sub { $self->produce($topic, $key, $value, @rest) },
425 0         0 };
426 0         0 return;
427             }
428              
429             # Accumulate into batch
430 0         0 my $bkey = "$topic:$partition";
431 0         0 my $rec = { key => $key, value => $value };
432 0 0       0 $rec->{headers} = $opts{headers} if $opts{headers};
433 0   0     0 push @{$cfg->{batches}{$bkey} //= []}, { rec => $rec, cb => $cb };
  0         0  
434              
435             # Check batch size threshold
436 0         0 my $batch = $cfg->{batches}{$bkey};
437 0         0 my $batch_bytes = 0;
438 0         0 for my $b (@$batch) {
439 0   0     0 $batch_bytes += length($b->{rec}{value} // '') + length($b->{rec}{key} // '') + 20;
      0        
440             }
441              
442 0 0       0 if ($batch_bytes >= $cfg->{batch_size}) {
    0          
443 0         0 $self->_flush_batch($topic, $partition, $conn);
444             } elsif (!$cfg->{_linger_active}) {
445             # start linger timer
446 0         0 $cfg->{_linger_active} = 1;
447 0         0 weaken(my $weak = $self);
448             $cfg->{_linger_timer} = EV::timer $cfg->{linger_ms} / 1000.0, 0, sub {
449 0     0   0 $cfg->{_linger_active} = 0;
450 0 0       0 $weak->_flush_all_batches if $weak;
451 0         0 };
452             }
453             }
454              
455             sub _flush_batch {
456 0     0   0 my ($self, $topic, $partition, $conn) = @_;
457 0         0 my $cfg = $self->{cfg};
458 0         0 my $bkey = "$topic:$partition";
459 0         0 my $batch = delete $cfg->{batches}{$bkey};
460 0 0 0     0 return unless $batch && @$batch;
461              
462 0         0 my @records = map { $_->{rec} } @$batch;
  0         0  
463 0         0 my @cbs = map { $_->{cb} } @$batch;
  0         0  
464              
465 0         0 my %popts = (acks => $cfg->{acks});
466 0 0       0 $popts{compression} = $cfg->{compression} if $cfg->{compression};
467 0 0       0 $popts{transactional_id} = $cfg->{transactional_id} if $cfg->{_txn_active};
468 0         0 my $saved_seq;
469 0 0 0     0 if (defined $cfg->{producer_id} && $cfg->{producer_id} >= 0) {
470 0         0 $popts{producer_id} = $cfg->{producer_id};
471 0         0 $popts{producer_epoch} = $cfg->{producer_epoch};
472 0   0     0 $saved_seq = $cfg->{next_sequence}{$bkey} // 0;
473 0         0 $popts{base_sequence} = $saved_seq;
474 0         0 $cfg->{next_sequence}{$bkey} = $saved_seq + scalar @records;
475             }
476              
477 0 0       0 $self->_add_txn_partition($topic, $partition) if $cfg->{_txn_active};
478              
479             # retry count persists on the batch across re-queues
480 0   0     0 $cfg->{_batch_retries}{$bkey} //= 3;
481              
482             $conn->produce_batch($topic, $partition, \@records, \%popts, sub {
483 0     0   0 my ($result, $err) = @_;
484              
485 0         0 my $retriable = 0;
486 0 0 0     0 if (!$err && $result && ref $result->{topics} eq 'ARRAY') {
      0        
487 0         0 for my $t (@{$result->{topics}}) {
  0         0  
488 0   0     0 for my $p (@{$t->{partitions} // []}) {
  0         0  
489 0   0     0 my $ec = $p->{error_code} // 0;
490 0 0 0     0 $retriable = $ec if $ec == 6 || $ec == 15 || $ec == 16;
      0        
491             }
492             }
493             }
494              
495 0 0 0     0 if ($retriable && ($cfg->{_batch_retries}{$bkey} // 0) > 0) {
      0        
496 0         0 $cfg->{_batch_retries}{$bkey}--;
497 0 0       0 $cfg->{next_sequence}{$bkey} = $saved_seq if defined $saved_seq;
498 0 0       0 if (exists $cfg->{batches}{$bkey}) {
499 0         0 unshift @{$cfg->{batches}{$bkey}}, @$batch;
  0         0  
500             } else {
501 0         0 $cfg->{batches}{$bkey} = $batch;
502             }
503 0 0       0 $self->_refresh_metadata unless $cfg->{meta_pending};
504 0         0 my $rt; $rt = EV::timer 0.5, 0, sub {
505 0         0 undef $rt;
506 0         0 $self->_flush_all_batches;
507 0         0 };
508 0         0 return;
509             }
510 0         0 delete $cfg->{_batch_retries}{$bkey};
511              
512 0         0 for my $cb (@cbs) {
513 0 0       0 $cb->($result, $err) if $cb;
514             }
515 0         0 });
516             }
517              
518             sub _flush_all_batches {
519 0     0   0 my ($self) = @_;
520 0         0 my $cfg = $self->{cfg};
521 0         0 my $skipped = 0;
522 0         0 for my $bkey (keys %{$cfg->{batches}}) {
  0         0  
523 0         0 my ($topic, $partition) = split /:/, $bkey, 2;
524 0         0 my $leader_id = $self->_get_leader($topic, $partition);
525 0 0       0 unless (defined $leader_id) { $skipped++; next }
  0         0  
  0         0  
526 0         0 my $conn = $self->_get_or_create_conn($leader_id);
527 0 0 0     0 unless ($conn && $conn->connected) { $skipped++; next }
  0         0  
  0         0  
528 0         0 $self->_flush_batch($topic, $partition, $conn);
529             }
530             # re-arm timer if batches were skipped (connection not yet ready)
531 0 0 0     0 if ($skipped && keys %{$cfg->{batches}}) {
  0         0  
532 0         0 $cfg->{_linger_active} = 1;
533 0         0 weaken(my $weak = $self);
534             $cfg->{_linger_timer} = EV::timer 0.1, 0, sub {
535 0     0   0 $cfg->{_linger_active} = 0;
536 0 0       0 $weak->_flush_all_batches if $weak;
537 0         0 };
538             }
539             }
540              
541             sub produce_many {
542 0     0   0 my ($self, $messages, $cb) = @_;
543 0         0 my $remaining = scalar @$messages;
544 0 0 0     0 return $cb->() if $cb && !$remaining;
545 0         0 my @errors;
546 0         0 my $acks0 = ($self->{cfg}{acks} == 0);
547 0         0 for my $msg (@$messages) {
548 0 0       0 my ($topic, $key, $value, @rest) = ref $msg eq 'ARRAY' ? @$msg : @{$msg}{qw(topic key value)};
  0         0  
549 0 0       0 if ($acks0) {
550 0         0 $self->produce($topic, $key, $value, @rest);
551 0         0 --$remaining;
552             } else {
553             $self->produce($topic, $key, $value, @rest, sub {
554 0     0   0 my ($result, $err) = @_;
555 0 0       0 push @errors, $err if $err;
556 0 0 0     0 if (--$remaining <= 0 && $cb) {
557 0 0       0 $cb->(@errors ? \@errors : ());
558             }
559 0         0 });
560             }
561             }
562 0 0 0     0 $cb->(@errors ? \@errors : ()) if $cb && $acks0;
    0          
563             }
564              
565             sub flush {
566 0     0   0 my ($self, $cb) = @_;
567 0         0 my $cfg = $self->{cfg};
568              
569             # flush any accumulated linger batches first
570 0         0 $self->_flush_all_batches;
571 0         0 undef $cfg->{_linger_timer};
572 0         0 $cfg->{_linger_active} = 0;
573              
574             # wait for all in-flight produce callbacks across all connections
575 0         0 my $pending = 0;
576 0         0 my %seen;
577 0   0     0 for my $conn (values %{$cfg->{conns} // {}}) {
  0         0  
578 0 0 0     0 next unless $conn && $conn->connected;
579 0         0 $pending += $conn->pending;
580 0         0 $seen{$$conn} = 1;
581             }
582 0 0 0     0 if ($cfg->{bootstrap_conn} && $cfg->{bootstrap_conn}->connected
      0        
583 0         0 && !$seen{${$cfg->{bootstrap_conn}}}) {
584 0         0 $pending += $cfg->{bootstrap_conn}->pending;
585             }
586 0 0       0 if ($pending == 0) {
587 0 0       0 $cb->() if $cb;
588 0         0 return;
589             }
590             # poll until pending drains
591 0         0 my $check; $check = EV::timer 0, 0.01, sub {
592 0     0   0 my $p = 0;
593 0         0 my %s;
594 0   0     0 for my $c (values %{$cfg->{conns} // {}}) {
  0         0  
595 0 0 0     0 next unless $c && $c->connected;
596 0         0 $p += $c->pending;
597 0         0 $s{$$c} = 1;
598             }
599             $p += $cfg->{bootstrap_conn}->pending
600             if $cfg->{bootstrap_conn} && $cfg->{bootstrap_conn}->connected
601 0 0 0     0 && !$s{${$cfg->{bootstrap_conn}}};
  0   0     0  
602 0 0       0 if ($p == 0) {
603 0         0 undef $check;
604 0 0       0 $cb->() if $cb;
605             }
606 0         0 };
607 0         0 $cfg->{_flush_timer} = $check;
608             }
609              
610             # --- Consumer ---
611              
612             sub assign {
613 0     0   0 my ($self, $partitions) = @_;
614 0         0 my $cfg = $self->{cfg};
615 0         0 $cfg->{assignments} = $partitions;
616             }
617              
618             sub seek {
619 0     0   0 my ($self, $topic, $partition, $offset_or_ts, $cb) = @_;
620             # offset_or_ts: integer offset, or -1 (latest), -2 (earliest)
621 0         0 for my $a (@{$self->{cfg}{assignments}}) {
  0         0  
622 0 0 0     0 if ($a->{topic} eq $topic && $a->{partition} == $partition) {
623 0 0       0 if ($offset_or_ts >= 0) {
624 0         0 $a->{offset} = $offset_or_ts;
625 0 0       0 $cb->() if $cb;
626             } else {
627             # resolve via list_offsets
628 0         0 my $leader_id = $self->_get_leader($topic, $partition);
629 0 0       0 my $conn = defined($leader_id) ? $self->_get_or_create_conn($leader_id) : undef;
630 0 0 0     0 if ($conn && $conn->connected) {
631             $conn->list_offsets($topic, $partition, $offset_or_ts, sub {
632 0     0   0 my ($res, $err) = @_;
633 0 0 0     0 if (!$err && $res) {
634 0         0 my $off = $res->{topics}[0]{partitions}[0]{offset};
635 0 0       0 $a->{offset} = $off if defined $off;
636             }
637 0 0       0 $cb->() if $cb;
638 0         0 });
639             } else {
640 0 0       0 $cb->() if $cb;
641             }
642             }
643 0         0 return;
644             }
645             }
646 0 0       0 $cb->() if $cb;
647             }
648              
649             sub poll {
650 0     0   0 my ($self, $cb) = @_;
651 0         0 my $cfg = $self->{cfg};
652 0 0       0 return unless @{$cfg->{assignments}};
  0         0  
653              
654 0 0       0 unless ($cfg->{meta}) {
655 0         0 push @{$cfg->{pending_ops}}, {
656 0     0   0 run => sub { $self->poll($cb) },
657 0         0 };
658 0 0       0 $self->_refresh_metadata unless $cfg->{meta_pending};
659 0         0 return;
660             }
661              
662             # Group assignments by leader for multi-partition fetch
663 0         0 my %by_leader; # leader_id => { topic => [{partition, offset, assign_ref}] }
664 0         0 for my $a (@{$cfg->{assignments}}) {
  0         0  
665 0         0 my $leader_id = $self->_get_leader($a->{topic}, $a->{partition});
666 0 0       0 next unless defined $leader_id;
667 0         0 push @{$by_leader{$leader_id}{$a->{topic}}}, {
668             partition => $a->{partition},
669             offset => $a->{offset},
670 0         0 _assign => $a,
671             };
672             }
673              
674 0         0 my $dispatched = 0;
675 0         0 for my $leader_id (keys %by_leader) {
676 0         0 my $conn = $self->_get_or_create_conn($leader_id);
677 0 0 0     0 next unless $conn && $conn->connected;
678 0         0 $dispatched++;
679              
680             # build fetch_multi argument: {topic => [{partition, offset}]}
681 0         0 my %fetch_arg;
682             my %assign_map; # "topic:partition" => assignment ref
683 0         0 for my $topic (keys %{$by_leader{$leader_id}}) {
  0         0  
684 0         0 for my $p (@{$by_leader{$leader_id}{$topic}}) {
  0         0  
685 0         0 push @{$fetch_arg{$topic}}, {
686             partition => $p->{partition},
687             offset => $p->{offset},
688 0         0 };
689 0         0 $assign_map{"$topic:$p->{partition}"} = $p->{_assign};
690             }
691             }
692              
693             $conn->fetch_multi(\%fetch_arg, sub {
694 0     0   0 my ($result, $err) = @_;
695 0         0 $dispatched--;
696              
697 0 0 0     0 if (!$err && $result && ref $result->{topics} eq 'ARRAY') {
      0        
698 0         0 for my $t (@{$result->{topics}}) {
  0         0  
699 0   0     0 for my $p (@{$t->{partitions} // []}) {
  0         0  
700 0   0     0 my $records = $p->{records} // [];
701 0         0 for my $r (@$records) {
702 0 0       0 if ($cfg->{on_message}) {
703             $cfg->{on_message}->(
704             $t->{topic}, $p->{partition},
705             $r->{offset}, $r->{key}, $r->{value},
706             $r->{headers}
707 0         0 );
708             }
709             }
710 0 0       0 if (@$records) {
711 0         0 my $a = $assign_map{"$t->{topic}:$p->{partition}"};
712 0 0       0 $a->{offset} = $records->[-1]{offset} + 1 if $a;
713             }
714             }
715             }
716             }
717              
718 0 0 0     0 $cb->() if $cb && $dispatched <= 0;
719 0         0 });
720             }
721 0 0 0     0 $cb->() if $cb && !$dispatched;
722             }
723              
724             sub offsets_for {
725 0     0   0 my ($self, $topic, $cb) = @_;
726 0         0 my $cfg = $self->{cfg};
727              
728 0         0 my $np = $self->_num_partitions($topic);
729 0 0 0     0 return $cb->({}) if $cb && !$np;
730              
731 0         0 my $result = {};
732 0         0 my $remaining = $np;
733 0         0 for my $p (0..$np-1) {
734 0         0 my $pid = $p;
735 0         0 my $leader_id = $self->_get_leader($topic, $pid);
736 0 0       0 my $conn = defined($leader_id) ? $self->_get_or_create_conn($leader_id) : undef;
737 0 0 0     0 unless ($conn && $conn->connected) {
738 0         0 $result->{$pid} = {};
739 0 0 0     0 $cb->($result) if $cb && --$remaining <= 0;
740 0         0 next;
741             }
742 0         0 my %pdata;
743 0         0 my $pdone = 0;
744 0         0 for my $ts (-2, -1) {
745             $conn->list_offsets($topic, $pid, $ts, sub {
746 0     0   0 my ($res, $err) = @_;
747 0 0 0     0 if (!$err && $res && ref $res->{topics} eq 'ARRAY') {
      0        
748 0         0 my $off = $res->{topics}[0]{partitions}[0]{offset};
749 0 0       0 if ($ts == -2) { $pdata{earliest} = $off }
  0         0  
750 0         0 else { $pdata{latest} = $off }
751             }
752 0 0       0 if (++$pdone == 2) {
753 0         0 $result->{$pid} = \%pdata;
754 0 0 0     0 $cb->($result) if $cb && --$remaining <= 0;
755             }
756 0         0 });
757             }
758             }
759             }
760              
761             sub lag {
762 0     0   0 my ($self, $cb) = @_;
763 0         0 my $cfg = $self->{cfg};
764 0   0     0 my @assignments = @{$cfg->{assignments} // []};
  0         0  
765 0 0 0     0 return $cb->({}) if $cb && !@assignments;
766              
767 0         0 my $result = {};
768 0         0 my $remaining = scalar @assignments;
769 0         0 for my $a (@assignments) {
770 0         0 my $key = "$a->{topic}:$a->{partition}";
771 0         0 my $leader_id = $self->_get_leader($a->{topic}, $a->{partition});
772 0 0       0 my $conn = defined($leader_id) ? $self->_get_or_create_conn($leader_id) : undef;
773 0 0 0     0 if ($conn && $conn->connected) {
774             $conn->list_offsets($a->{topic}, $a->{partition}, -1, sub {
775 0     0   0 my ($res, $err) = @_;
776 0         0 my $hw = 0;
777 0 0 0     0 if (!$err && $res && ref $res->{topics} eq 'ARRAY') {
      0        
778 0   0     0 $hw = $res->{topics}[0]{partitions}[0]{offset} // 0;
779             }
780             $result->{$key} = {
781             current => $a->{offset},
782             latest => $hw,
783             lag => $hw - $a->{offset},
784 0         0 };
785 0 0 0     0 $cb->($result) if $cb && --$remaining <= 0;
786 0         0 });
787             } else {
788 0         0 $result->{$key} = { current => $a->{offset}, latest => 0, lag => 0 };
789 0 0 0     0 $cb->($result) if $cb && --$remaining <= 0;
790             }
791             }
792             }
793              
794             sub error_name {
795 0 0   0   0 shift if ref $_[0]; # allow $kafka->error_name or EV::Kafka::Client::error_name
796 0         0 return EV::Kafka::_error_name($_[0]);
797             }
798              
799             # --- Consumer Group ---
800              
801             sub subscribe {
802 0     0   0 my ($self, @args) = @_;
803 0         0 my @topics;
804             my %opts;
805              
806             # subscribe('topic1', 'topic2', group_id => 'g', ...)
807 0         0 while (@args) {
808 0 0       0 if ($args[0] =~ /^(group_id|group_instance_id|on_assign|on_revoke|session_timeout|rebalance_timeout|heartbeat_interval|auto_commit|auto_offset_reset)$/) {
809 0         0 my $k = shift @args;
810 0         0 $opts{$k} = shift @args;
811             } else {
812 0         0 push @topics, shift @args;
813             }
814             }
815              
816 0         0 my $cfg = $self->{cfg};
817 0 0       0 my $group_id = $opts{group_id} or die "group_id required";
818              
819             $cfg->{group} = {
820             group_id => $group_id,
821             member_id => '',
822             generation => -1,
823             topics => \@topics,
824             on_assign => $opts{on_assign},
825             on_revoke => $opts{on_revoke},
826             session_timeout => $opts{session_timeout} // 30000,
827             rebalance_timeout => $opts{rebalance_timeout} // 60000,
828             heartbeat_interval => $opts{heartbeat_interval} // 3,
829             auto_commit => $opts{auto_commit} // 1,
830             auto_offset_reset => $opts{auto_offset_reset} // 'earliest',
831             group_instance_id => $opts{group_instance_id},
832 0   0     0 coordinator => undef,
      0        
      0        
      0        
      0        
833             heartbeat_timer => undef,
834             state => 'init',
835             };
836              
837             # Step 1: ensure we have metadata
838 0 0       0 unless ($cfg->{meta}) {
839 0         0 push @{$cfg->{pending_ops}}, {
840 0     0   0 run => sub { $self->_group_start },
841 0         0 };
842 0 0       0 $self->_refresh_metadata unless $cfg->{meta_pending};
843 0         0 return;
844             }
845              
846 0         0 $self->_group_start;
847             }
848              
849             sub _group_start {
850 0     0   0 my ($self) = @_;
851 0         0 my $cfg = $self->{cfg};
852 0 0       0 my $g = $cfg->{group} or return;
853              
854 0         0 my $conn = $self->_any_conn;
855 0 0       0 return unless $conn;
856              
857 0         0 $g->{state} = 'finding';
858             $conn->find_coordinator($g->{group_id}, sub {
859 0     0   0 my ($res, $err) = @_;
860 0 0 0     0 if ($err || $res->{error_code}) {
861 0   0     0 my $msg = $err || "FindCoordinator error: $res->{error_code}";
862 0 0       0 $cfg->{on_error}->($msg) if $cfg->{on_error};
863             # retry after delay
864 0         0 my $t; $t = EV::timer 1, 0, sub { undef $t; $self->_group_start };
  0         0  
  0         0  
  0         0  
865 0         0 return;
866             }
867              
868             # Store coordinator info and connect
869             $cfg->{broker_map}{$res->{node_id}} = {
870             host => $res->{host}, port => $res->{port}
871 0         0 };
872 0         0 my $coord = $self->_get_or_create_conn($res->{node_id});
873 0         0 $g->{coordinator} = $coord;
874 0         0 $g->{coordinator_id} = $res->{node_id};
875              
876 0 0       0 if ($coord->connected) {
877 0         0 $self->_group_join;
878             } else {
879 0         0 push @{$cfg->{pending_ops}}, {
880             node_id => $res->{node_id},
881 0         0 run => sub { $self->_group_join },
882 0         0 };
883             }
884 0         0 });
885             }
886              
887             sub _group_join {
888 0     0   0 my ($self) = @_;
889 0         0 my $cfg = $self->{cfg};
890 0 0       0 my $g = $cfg->{group} or return;
891 0 0       0 my $coord = $g->{coordinator} or return;
892              
893 0         0 $g->{state} = 'joining';
894             $coord->join_group(
895             $g->{group_id}, $g->{member_id},
896             $g->{topics}, sub {
897 0     0   0 my ($res, $err) = @_;
898 0 0       0 if ($err) {
899 0 0       0 $cfg->{on_error}->("JoinGroup: $err") if $cfg->{on_error};
900 0         0 return;
901             }
902              
903 0 0 0     0 if ($res->{error_code} == 15 || $res->{error_code} == 16) {
904             # COORDINATOR_NOT_AVAILABLE / NOT_COORDINATOR — re-discover
905 0         0 my $t; $t = EV::timer 1, 0, sub { undef $t; $self->_group_start };
  0         0  
  0         0  
  0         0  
906 0         0 return;
907             }
908 0 0       0 if ($res->{error_code} == 27) {
909             # REBALANCE_IN_PROGRESS — retry
910 0         0 my $t; $t = EV::timer 1, 0, sub { undef $t; $self->_group_join };
  0         0  
  0         0  
  0         0  
911 0         0 return;
912             }
913 0 0       0 if ($res->{error_code} == 79) {
914             # MEMBER_ID_REQUIRED — retry with assigned member_id
915 0 0       0 $g->{member_id} = $res->{member_id} if $res->{member_id};
916 0         0 $self->_group_join;
917 0         0 return;
918             }
919 0 0       0 if ($res->{error_code}) {
920 0 0       0 $cfg->{on_error}->("JoinGroup error: $res->{error_code}") if $cfg->{on_error};
921 0         0 return;
922             }
923              
924 0         0 $g->{member_id} = $res->{member_id};
925 0         0 $g->{generation} = $res->{generation_id};
926 0         0 my $is_leader = ($res->{leader} eq $res->{member_id});
927              
928             # Build assignments (if leader)
929 0         0 my $assignments = [];
930 0 0 0     0 if ($is_leader && $res->{members} && @{$res->{members}}) {
  0   0     0  
931 0         0 $assignments = $self->_assign_partitions($res->{members}, $g->{topics});
932             }
933              
934 0         0 $self->_group_sync($assignments);
935             },
936             $g->{session_timeout}, $g->{rebalance_timeout},
937             $g->{group_instance_id}
938 0         0 );
939             }
940              
941             sub _assign_partitions {
942 7     7   10224 my ($self, $members, $topics) = @_;
943 7         14 my $cfg = $self->{cfg};
944 7 50       24 my $meta = $cfg->{meta} or return [];
945              
946 7         7 my @all_parts;
947 7   50     9 for my $t (@{$meta->{topics} // []}) {
  7         19  
948 11         17 my $tname = $t->{name};
949 11 50       17 next unless grep { $_ eq $tname } @$topics;
  19         36  
950 11   50     12 for my $p (@{$t->{partitions} // []}) {
  11         20  
951 36         77 push @all_parts, { topic => $tname, partition => $p->{partition} };
952             }
953             }
954 7 50       31 @all_parts = sort { $a->{topic} cmp $b->{topic} || $a->{partition} <=> $b->{partition} } @all_parts;
  48         113  
955              
956 7         15 my @member_ids = sort map { $_->{member_id} } @$members;
  14         39  
957 7         13 my $nm = scalar @member_ids;
958              
959             # Sticky assignment: preserve previous assignments where possible
960 7   100     19 my $prev = $cfg->{_prev_assignments} // {};
961 7         10 my %member_parts; # member_id => [@parts]
962             my %assigned; # "topic:partition" => 1
963              
964 7 100       48 my $max_per = int(@all_parts / $nm) + ((@all_parts % $nm) ? 1 : 0);
965              
966             # Step 1: keep valid previous assignments (but cap at max_per)
967 7         16 for my $mid (@member_ids) {
968 14         25 $member_parts{$mid} = [];
969 14   100     16 for my $p (@{$prev->{$mid} // []}) {
  14         38  
970 10         12 my $key = "$p->{topic}:$p->{partition}";
971 10 100       10 if (grep { $_->{topic} eq $p->{topic} && $_->{partition} == $p->{partition} } @all_parts) {
  60 50       128  
972 10 100       10 if (scalar @{$member_parts{$mid}} < $max_per) {
  10         14  
973 8         5 push @{$member_parts{$mid}}, $p;
  8         9  
974 8         14 $assigned{$key} = 1;
975             }
976             }
977             }
978             }
979              
980             # Step 2: distribute unassigned partitions to least-loaded members
981 7         12 my @unassigned = grep { !$assigned{"$_->{topic}:$_->{partition}"} } @all_parts;
  36         70  
982 7         13 for my $p (@unassigned) {
983 28         27 my $min_mid = $member_ids[0];
984 28         25 my $min_count = scalar @{$member_parts{$min_mid}};
  28         33  
985 28         25 for my $mid (@member_ids) {
986 52 100       35 if (scalar @{$member_parts{$mid}} < $min_count) {
  52         75  
987 10         8 $min_count = scalar @{$member_parts{$mid}};
  10         12  
988 10         18 $min_mid = $mid;
989             }
990             }
991 28         22 push @{$member_parts{$min_mid}}, $p;
  28         36  
992             }
993              
994             # Save for next rebalance
995 7         19 $cfg->{_prev_assignments} = { %member_parts };
996              
997             # Encode assignments
998 7         12 my @assignments;
999 7         9 for my $mid (@member_ids) {
1000 14         16 my %by_topic;
1001 14         12 for my $p (@{$member_parts{$mid}}) {
  14         15  
1002 36         22 push @{$by_topic{$p->{topic}}}, $p->{partition};
  36         68  
1003             }
1004              
1005 14         16 my $buf = '';
1006 14         17 $buf .= pack('n', 0); # version
1007 14         35 $buf .= pack('N', scalar keys %by_topic);
1008 14         28 for my $t (sort keys %by_topic) {
1009 20         29 $buf .= pack('n', length($t)) . $t;
1010 20         22 $buf .= pack('N', scalar @{$by_topic{$t}});
  20         45  
1011 20         19 for my $pid (@{$by_topic{$t}}) {
  20         24  
1012 36         55 $buf .= pack('N', $pid);
1013             }
1014             }
1015 14         16 $buf .= pack('N', -1); # user_data = null
1016              
1017 14         43 push @assignments, {
1018             member_id => $mid,
1019             assignment => $buf,
1020             };
1021             }
1022              
1023 7         46 return \@assignments;
1024             }
1025              
1026             sub _group_sync {
1027 0     0   0 my ($self, $assignments) = @_;
1028 0         0 my $cfg = $self->{cfg};
1029 0 0       0 my $g = $cfg->{group} or return;
1030 0 0       0 my $coord = $g->{coordinator} or return;
1031              
1032 0         0 $g->{state} = 'syncing';
1033             my $sync_cb = sub {
1034 0     0   0 my ($res, $err) = @_;
1035 0 0       0 if ($err) {
1036 0 0       0 $cfg->{on_error}->("SyncGroup: $err") if $cfg->{on_error};
1037 0         0 return;
1038             }
1039 0 0       0 if ($res->{error_code} == 27) {
1040             # REBALANCE_IN_PROGRESS — rejoin
1041 0         0 my $t; $t = EV::timer 1, 0, sub { undef $t; $self->_group_join };
  0         0  
  0         0  
  0         0  
1042 0         0 return;
1043             }
1044 0 0       0 if ($res->{error_code}) {
1045 0 0       0 $cfg->{on_error}->("SyncGroup error: $res->{error_code}") if $cfg->{on_error};
1046 0         0 return;
1047             }
1048              
1049             # Decode assignment
1050 0   0     0 my $data = $res->{assignment} // '';
1051 0         0 my $dlen = length $data;
1052 0         0 my @my_assignments;
1053 0 0       0 if ($dlen >= 6) {
1054 0         0 my $off = 2; # skip version
1055 0         0 my $tc = unpack('N', substr($data, $off, 4)); $off += 4;
  0         0  
1056 0         0 for my $i (0..$tc-1) {
1057 0 0       0 last unless $off + 2 <= $dlen;
1058 0         0 my $tlen = unpack('n', substr($data, $off, 2)); $off += 2;
  0         0  
1059 0 0       0 last unless $off + $tlen <= $dlen;
1060 0         0 my $tname = substr($data, $off, $tlen); $off += $tlen;
  0         0  
1061 0 0       0 last unless $off + 4 <= $dlen;
1062 0         0 my $pc = unpack('N', substr($data, $off, 4)); $off += 4;
  0         0  
1063 0         0 for my $j (0..$pc-1) {
1064 0 0       0 last unless $off + 4 <= $dlen;
1065 0         0 my $pid = unpack('N', substr($data, $off, 4)); $off += 4;
  0         0  
1066 0 0 0     0 my $reset = ($g->{auto_offset_reset} // 'earliest') eq 'latest' ? -1 : -2;
1067 0         0 push @my_assignments, {
1068             topic => $tname, partition => $pid, offset => $reset
1069             };
1070             }
1071             }
1072             }
1073              
1074 0         0 $g->{state} = 'stable';
1075              
1076             # Fetch committed offsets, then start consuming
1077             $self->_fetch_committed_offsets(\@my_assignments, sub {
1078 0         0 $cfg->{assignments} = \@my_assignments;
1079              
1080             # Fire on_assign
1081 0 0       0 $g->{on_assign}->(\@my_assignments) if $g->{on_assign};
1082              
1083             # Start heartbeat
1084 0         0 $self->_start_heartbeat;
1085              
1086             # Start fetch loop
1087 0         0 $self->_start_fetch_loop;
1088 0         0 });
1089 0         0 };
1090             $coord->sync_group(
1091             $g->{group_id}, $g->{generation}, $g->{member_id},
1092             $assignments, $sync_cb, $g->{group_instance_id}
1093 0         0 );
1094             }
1095              
1096             sub _fetch_committed_offsets {
1097 0     0   0 my ($self, $assignments, $cb) = @_;
1098 0         0 my $cfg = $self->{cfg};
1099 0 0       0 my $g = $cfg->{group} or return $cb->();
1100 0         0 my $coord = $g->{coordinator};
1101 0 0 0     0 return $cb->() unless $coord && $coord->connected && @$assignments;
      0        
1102              
1103             # Build topics array for offset_fetch
1104 0         0 my %by_topic;
1105 0         0 for my $a (@$assignments) {
1106 0         0 push @{$by_topic{$a->{topic}}}, $a->{partition};
  0         0  
1107             }
1108 0         0 my @topics;
1109 0         0 for my $t (sort keys %by_topic) {
1110 0         0 push @topics, { topic => $t, partitions => $by_topic{$t} };
1111             }
1112              
1113             $coord->offset_fetch($g->{group_id}, \@topics, sub {
1114 0     0   0 my ($res, $err) = @_;
1115 0 0 0     0 if (!$err && $res && ref $res->{topics} eq 'ARRAY') {
      0        
1116 0         0 for my $t (@{$res->{topics}}) {
  0         0  
1117 0   0     0 for my $p (@{$t->{partitions} // []}) {
  0         0  
1118 0 0       0 next if $p->{error_code};
1119 0 0       0 next if $p->{offset} < 0; # no committed offset
1120 0         0 for my $a (@$assignments) {
1121 0 0 0     0 if ($a->{topic} eq $t->{topic} && $a->{partition} == $p->{partition}) {
1122 0         0 $a->{offset} = $p->{offset};
1123             }
1124             }
1125             }
1126             }
1127             }
1128              
1129             # For partitions with unresolved offset (-2=earliest, -1=latest), resolve via ListOffsets
1130 0         0 my @need_offsets = grep { $_->{offset} < 0 } @$assignments;
  0         0  
1131 0 0       0 if (@need_offsets) {
1132 0         0 my $remaining = scalar @need_offsets;
1133 0         0 for my $a (@need_offsets) {
1134 0         0 my $leader_id = $self->_get_leader($a->{topic}, $a->{partition});
1135 0 0       0 my $lconn = defined($leader_id) ? $self->_get_or_create_conn($leader_id) : undef;
1136 0 0 0     0 if ($lconn && $lconn->connected) {
1137             $lconn->list_offsets($a->{topic}, $a->{partition}, $a->{offset}, sub {
1138 0         0 my ($lres, $lerr) = @_;
1139 0 0 0     0 if (!$lerr && $lres && ref $lres->{topics} eq 'ARRAY') {
      0        
1140 0         0 for my $lt (@{$lres->{topics}}) {
  0         0  
1141 0   0     0 for my $lp (@{$lt->{partitions} // []}) {
  0         0  
1142 0 0       0 $a->{offset} = $lp->{offset} if !$lp->{error_code};
1143             }
1144             }
1145             }
1146 0         0 $remaining--;
1147 0 0       0 $cb->() if $remaining <= 0;
1148 0         0 });
1149             } else {
1150 0         0 $a->{offset} = 0;
1151 0         0 $remaining--;
1152 0 0       0 $cb->() if $remaining <= 0;
1153             }
1154             }
1155             } else {
1156 0         0 $cb->();
1157             }
1158 0         0 });
1159             }
1160              
1161             sub _start_heartbeat {
1162 0     0   0 my ($self) = @_;
1163 0         0 my $cfg = $self->{cfg};
1164 0 0       0 my $g = $cfg->{group} or return;
1165              
1166             $g->{heartbeat_timer} = EV::timer $g->{heartbeat_interval}, $g->{heartbeat_interval}, sub {
1167 0 0   0   0 return unless $g->{state} eq 'stable';
1168 0         0 my $coord = $g->{coordinator};
1169 0 0 0     0 return unless $coord && $coord->connected;
1170              
1171             $coord->heartbeat($g->{group_id}, $g->{generation}, $g->{member_id}, sub {
1172 0         0 my ($res, $err) = @_;
1173 0 0       0 if ($err) { return }
  0         0  
1174 0 0 0     0 if ($res && $res->{error_code} == 27) {
1175             # REBALANCE_IN_PROGRESS
1176 0         0 $g->{state} = 'rebalancing';
1177 0 0       0 $g->{on_revoke}->($cfg->{assignments}) if $g->{on_revoke};
1178 0         0 $self->_stop_heartbeat;
1179 0         0 $self->_stop_fetch_loop;
1180 0         0 $self->_group_join;
1181             }
1182 0         0 }, $g->{group_instance_id});
1183 0         0 };
1184             }
1185              
1186             sub _stop_heartbeat {
1187 4     4   8 my ($self) = @_;
1188 4 50       14 my $g = $self->{cfg}{group} or return;
1189 0         0 undef $g->{heartbeat_timer};
1190             }
1191              
1192             sub _start_fetch_loop {
1193 0     0   0 my ($self) = @_;
1194 0         0 my $cfg = $self->{cfg};
1195 0 0       0 return if $cfg->{fetch_active};
1196 0         0 $cfg->{fetch_active} = 1;
1197              
1198             $cfg->{fetch_timer} = EV::timer 0, 0.1, sub {
1199 0 0   0   0 return unless $cfg->{fetch_active};
1200 0         0 $self->poll;
1201 0         0 };
1202             }
1203              
1204             sub _stop_fetch_loop {
1205 4     4   9 my ($self) = @_;
1206 4         8 my $cfg = $self->{cfg};
1207 4         7 $cfg->{fetch_active} = 0;
1208 4         9 undef $cfg->{fetch_timer};
1209             }
1210              
1211             sub commit {
1212 0     0   0 my ($self, $cb) = @_;
1213 0         0 my $cfg = $self->{cfg};
1214 0         0 my $g = $cfg->{group};
1215 0 0       0 unless ($g) { $cb->() if $cb; return }
  0 0       0  
  0         0  
1216 0         0 my $coord = $g->{coordinator};
1217 0 0 0     0 unless ($coord && $coord->connected) { $cb->() if $cb; return }
  0 0       0  
  0         0  
1218              
1219             # Build offset commit data from current assignments
1220 0         0 my %by_topic;
1221 0   0     0 for my $a (@{$cfg->{assignments} // []}) {
  0         0  
1222 0         0 push @{$by_topic{$a->{topic}}}, {
1223             partition => $a->{partition},
1224             offset => $a->{offset},
1225 0         0 };
1226             }
1227              
1228 0         0 my @topics;
1229 0         0 for my $t (sort keys %by_topic) {
1230 0         0 push @topics, { topic => $t, partitions => $by_topic{$t} };
1231             }
1232              
1233 0 0       0 if (!@topics) { $cb->() if $cb; return }
  0 0       0  
  0         0  
1234              
1235             $coord->offset_commit($g->{group_id}, $g->{generation}, $g->{member_id}, \@topics, sub {
1236 0     0   0 my ($res, $err) = @_;
1237 0 0       0 $cb->($err) if $cb;
1238 0         0 });
1239             }
1240              
1241             sub unsubscribe {
1242 0     0   0 my ($self, $cb) = @_;
1243 0         0 my $cfg = $self->{cfg};
1244 0         0 my $g = $cfg->{group};
1245              
1246 0         0 $self->_stop_heartbeat;
1247 0         0 $self->_stop_fetch_loop;
1248              
1249             my $finish = sub {
1250             # send LeaveGroup to coordinator for fast rebalance
1251 0 0 0 0   0 if ($g && $g->{coordinator} && $g->{coordinator}->connected && $g->{member_id}) {
      0        
      0        
1252             $g->{coordinator}->leave_group($g->{group_id}, $g->{member_id}, sub {
1253 0         0 $cfg->{assignments} = [];
1254 0         0 $cfg->{group} = undef;
1255 0 0       0 $cb->() if $cb;
1256 0         0 });
1257             } else {
1258 0         0 $cfg->{assignments} = [];
1259 0         0 $cfg->{group} = undef;
1260 0 0       0 $cb->() if $cb;
1261             }
1262 0         0 };
1263              
1264 0 0 0     0 if ($g && $g->{auto_commit}) {
1265 0     0   0 $self->commit(sub { $finish->() });
  0         0  
1266             } else {
1267 0         0 $finish->();
1268             }
1269             }
1270              
1271             # --- Transactions ---
1272              
1273             sub begin_transaction {
1274 0     0   0 my ($self) = @_;
1275 0         0 my $cfg = $self->{cfg};
1276 0 0       0 die "transactional_id required" unless $cfg->{transactional_id};
1277 0 0 0     0 die "producer_id not initialized" unless defined $cfg->{producer_id} && $cfg->{producer_id} >= 0;
1278 0         0 $cfg->{_txn_active} = 1;
1279 0         0 $cfg->{_txn_partitions} = {}; # "topic:partition" => 1
1280             }
1281              
1282             sub _txn_conn {
1283 0     0   0 my ($self) = @_;
1284 0         0 my $cfg = $self->{cfg};
1285 0         0 my $conn = $cfg->{_txn_coordinator};
1286 0 0 0     0 return $conn if $conn && $conn->connected;
1287 0         0 return $self->_any_conn;
1288             }
1289              
1290             sub _add_txn_partition {
1291 0     0   0 my ($self, $topic, $partition) = @_;
1292 0         0 my $cfg = $self->{cfg};
1293 0 0       0 return unless $cfg->{_txn_active};
1294 0         0 my $key = "$topic:$partition";
1295 0 0       0 return if $cfg->{_txn_partitions}{$key}++;
1296              
1297 0         0 my $conn = $self->_txn_conn;
1298 0 0       0 return unless $conn;
1299              
1300             $conn->add_partitions_to_txn(
1301             $cfg->{transactional_id}, $cfg->{producer_id},
1302             $cfg->{producer_epoch},
1303             [{ topic => $topic, partitions => [$partition] }],
1304       0     sub {}
1305 0         0 );
1306             }
1307              
1308             sub commit_transaction {
1309 0     0   0 my ($self, $cb) = @_;
1310 0         0 my $cfg = $self->{cfg};
1311 0 0       0 die "no active transaction" unless $cfg->{_txn_active};
1312              
1313             # flush all pending batches first
1314             $self->flush(sub {
1315 0     0   0 my $conn = $self->_txn_conn;
1316 0 0       0 unless ($conn) { $cb->(undef) if $cb; return }
  0 0       0  
  0         0  
1317              
1318             $conn->end_txn($cfg->{transactional_id}, $cfg->{producer_id},
1319             $cfg->{producer_epoch}, 1, sub {
1320 0         0 my ($res, $err) = @_;
1321 0         0 $cfg->{_txn_active} = 0;
1322 0         0 $cfg->{_txn_partitions} = {};
1323 0 0       0 $cb->($res, $err) if $cb;
1324 0         0 });
1325 0         0 });
1326             }
1327              
1328             sub send_offsets_to_transaction {
1329 0     0   0 my ($self, $group_id, $cb) = @_;
1330 0         0 my $cfg = $self->{cfg};
1331 0 0       0 die "no active transaction" unless $cfg->{_txn_active};
1332 0 0       0 die "transactional_id required" unless $cfg->{transactional_id};
1333              
1334             # gather current consumer offsets
1335 0         0 my %by_topic;
1336 0   0     0 for my $a (@{$cfg->{assignments} // []}) {
  0         0  
1337 0         0 push @{$by_topic{$a->{topic}}}, {
1338             partition => $a->{partition},
1339             offset => $a->{offset},
1340 0         0 };
1341             }
1342              
1343 0         0 my @topics;
1344 0         0 for my $t (sort keys %by_topic) {
1345 0         0 push @topics, { topic => $t, partitions => $by_topic{$t} };
1346             }
1347              
1348 0 0       0 unless (@topics) {
1349 0 0       0 $cb->() if $cb;
1350 0         0 return;
1351             }
1352              
1353 0         0 my $conn = $self->_txn_conn;
1354 0 0       0 unless ($conn) { $cb->() if $cb; return }
  0 0       0  
  0         0  
1355              
1356 0         0 my $g = $cfg->{group};
1357 0 0       0 my $generation = $g ? $g->{generation} : -1;
1358 0 0 0     0 my $member_id = $g ? ($g->{member_id} // '') : '';
1359              
1360             $conn->txn_offset_commit(
1361             $cfg->{transactional_id}, $group_id,
1362             $cfg->{producer_id}, $cfg->{producer_epoch},
1363             $generation, $member_id,
1364             \@topics, sub {
1365 0     0   0 my ($res, $err) = @_;
1366 0 0       0 $cb->($res, $err) if $cb;
1367             }
1368 0         0 );
1369             }
1370              
1371             sub abort_transaction {
1372 0     0   0 my ($self, $cb) = @_;
1373 0         0 my $cfg = $self->{cfg};
1374 0 0       0 die "no active transaction" unless $cfg->{_txn_active};
1375              
1376             # discard unsent batches — they must not reach the broker after abort
1377 0         0 $cfg->{batches} = {};
1378 0         0 undef $cfg->{_linger_timer};
1379 0         0 $cfg->{_linger_active} = 0;
1380              
1381 0         0 my $conn = $self->_txn_conn;
1382 0 0       0 unless ($conn) { $cb->(undef) if $cb; return }
  0 0       0  
  0         0  
1383              
1384             $conn->end_txn($cfg->{transactional_id}, $cfg->{producer_id},
1385             $cfg->{producer_epoch}, 0, sub {
1386 0     0   0 my ($res, $err) = @_;
1387 0         0 $cfg->{_txn_active} = 0;
1388 0         0 $cfg->{_txn_partitions} = {};
1389 0 0       0 $cb->($res, $err) if $cb;
1390 0         0 });
1391             }
1392              
1393             sub close {
1394 4     4   8 my ($self, $cb) = @_;
1395 4 50       10 my $cfg = $self->{cfg} or return;
1396              
1397 4         17 $self->_stop_heartbeat;
1398 4         16 $self->_stop_fetch_loop;
1399              
1400 4   50     5 for my $conn (values %{$cfg->{conns} // {}}) {
  4         19  
1401 0 0 0     0 eval { $conn->disconnect if $conn && $conn->connected };
  0         0  
1402             }
1403 4 50       10 if ($cfg->{bootstrap_conn}) {
1404 0         0 eval { $cfg->{bootstrap_conn}->disconnect
1405 0 0       0 if $cfg->{bootstrap_conn}->connected };
1406             }
1407 4         12 $cfg->{conns} = {};
1408 4         6 $cfg->{bootstrap_conn} = undef;
1409 4 50       179 $cb->() if $cb;
1410             }
1411              
1412             sub DESTROY {
1413 4     4   10219 my $self = shift;
1414 4 50 33     39 return unless $self && $self->{cfg};
1415 4         17 $self->close;
1416             }
1417              
1418             package EV::Kafka::Conn;
1419              
1420             1;
1421              
1422             =head1 NAME
1423              
1424             EV::Kafka - High-performance asynchronous Kafka/Redpanda client using EV
1425              
1426             =head1 SYNOPSIS
1427              
1428             use EV::Kafka;
1429              
1430             my $kafka = EV::Kafka->new(
1431             brokers => '127.0.0.1:9092',
1432             acks => -1,
1433             on_error => sub { warn "kafka: @_" },
1434             on_message => sub {
1435             my ($topic, $partition, $offset, $key, $value, $headers) = @_;
1436             print "$topic:$partition @ $offset $key = $value\n";
1437             },
1438             );
1439              
1440             # Producer
1441             $kafka->connect(sub {
1442             $kafka->produce('my-topic', 'key', 'value', sub {
1443             my ($result, $err) = @_;
1444             say "produced at offset " . $result->{topics}[0]{partitions}[0]{base_offset};
1445             });
1446             });
1447              
1448             # Consumer (manual assignment)
1449             $kafka->assign([{ topic => 'my-topic', partition => 0, offset => 0 }]);
1450             my $poll = EV::timer 0, 0.1, sub { $kafka->poll };
1451              
1452             # Consumer group
1453             $kafka->subscribe('my-topic',
1454             group_id => 'my-group',
1455             on_assign => sub { ... },
1456             on_revoke => sub { ... },
1457             );
1458              
1459             EV::run;
1460              
1461             =head1 DESCRIPTION
1462              
1463             EV::Kafka is a high-performance asynchronous Kafka client that implements
1464             the Kafka binary protocol in XS with L<EV> event loop integration. It
1465             targets Redpanda and Apache Kafka (protocol version 0.11+).
1466              
1467             Two-layer architecture:
1468              
1469             =over
1470              
1471             =item * B<EV::Kafka::Conn> (XS) -- single broker TCP connection with
1472             protocol encoding/decoding, correlation ID matching, pipelining,
1473             optional TLS and SASL/PLAIN authentication.
1474              
1475             =item * B<EV::Kafka::Client> (Perl) -- cluster management with metadata
1476             discovery, broker connection pooling, partition leader routing, producer
1477             with key-based partitioning, consumer with manual assignment or consumer
1478             groups.
1479              
1480             =back
1481              
1482             Features:
1483              
1484             =over
1485              
1486             =item * Binary protocol implemented in pure XS (no librdkafka dependency)
1487              
1488             =item * Automatic request pipelining per broker connection
1489              
1490             =item * Metadata-driven partition leader routing
1491              
1492             =item * Producer: acks modes (-1/0/1), key-based partitioning (murmur2),
1493             headers, fire-and-forget (acks=0)
1494              
1495             =item * Consumer: manual partition assignment, offset tracking, poll-based
1496             message delivery
1497              
1498             =item * Consumer groups: JoinGroup/SyncGroup/Heartbeat, sticky
1499             partition assignment, offset commit/fetch, automatic rebalancing
1500              
1501             =item * TLS (OpenSSL) and SASL/PLAIN authentication
1502              
1503             =item * Automatic reconnection at the connection layer
1504              
1505             =item * Bootstrap broker failover (tries all listed brokers)
1506              
1507             =back
1508              
1509             =head1 ANYEVENT INTEGRATION
1510              
1511             L<AnyEvent> has EV as one of its backends, so EV::Kafka can be used
1512             in AnyEvent applications seamlessly.
1513              
1514             =head1 NO UTF-8 SUPPORT
1515              
1516             This module handles all values as bytes. Encode your UTF-8 strings
1517             before passing them:
1518              
1519             use Encode;
1520              
1521             $kafka->produce($topic, $key, encode_utf8($val), sub { ... });
1522              
1523             =head1 CLUSTER CLIENT METHODS
1524              
1525             =head2 new(%options)
1526              
1527             Create a new EV::Kafka client. Returns a blessed C<EV::Kafka::Client>
1528             object.
1529              
1530             my $kafka = EV::Kafka->new(
1531             brokers => '10.0.0.1:9092,10.0.0.2:9092',
1532             acks => -1,
1533             on_error => sub { warn @_ },
1534             );
1535              
1536             Options:
1537              
1538             =over
1539              
1540             =item brokers => 'Str'
1541              
1542             Comma-separated list of bootstrap broker addresses (host:port).
1543             Default: C<127.0.0.1:9092>.
1544              
1545             =item client_id => 'Str' (default 'ev-kafka')
1546              
1547             Client identifier sent to brokers.
1548              
1549             =item tls => Bool
1550              
1551             Enable TLS encryption.
1552              
1553             =item tls_ca_file => 'Str'
1554              
1555             Path to CA certificate file for TLS verification.
1556              
1557             =item tls_skip_verify => Bool
1558              
1559             Skip TLS certificate verification.
1560              
1561             =item sasl => \%opts
1562              
1563             Enable SASL authentication. Supports PLAIN mechanism:
1564              
1565             sasl => { mechanism => 'PLAIN', username => 'user', password => 'pass' }
1566              
1567             =item acks => Int (default -1)
1568              
1569             Producer acknowledgment mode. C<-1> = all in-sync replicas, C<0> = no
1570             acknowledgment (fire-and-forget), C<1> = leader only.
1571              
1572             =item linger_ms => Int (default 5)
1573              
1574             Time in milliseconds to accumulate records before flushing a batch.
1575             Lower values reduce latency; higher values improve throughput.
1576              
1577             =item batch_size => Int (default 16384)
1578              
1579             Maximum batch size in bytes before a batch is flushed immediately.
1580              
1581             =item compression => 'Str'
1582              
1583             Compression type for produce batches: C<'lz4'> (requires liblz4),
1584             C<'gzip'> (requires zlib), or C<undef> for none.
1585              
1586             =item idempotent => Bool (default 0)
1587              
1588             Enable idempotent producer. Calls C<InitProducerId> on connect and
1589             sets producer_id/epoch/sequence in each RecordBatch for exactly-once
1590             delivery (broker-side deduplication).
1591              
1592             =item transactional_id => 'Str'
1593              
1594             Enable transactional producer. Implies idempotent. Required for
1595             C<begin_transaction>/C<commit_transaction>/C<abort_transaction>
1596             and C<send_offsets_to_transaction> (full EOS).
1597              
1598             =item partitioner => $cb->($topic, $key, $num_partitions)
1599              
1600             Custom partition selection function. Default: murmur2 hash of key,
1601             or round-robin for null keys.
1602              
1603             =item on_error => $cb->($errstr)
1604              
1605             Error callback. Default: C<die>.
1606              
1607             =item on_connect => $cb->()
1608              
1609             Called once after initial metadata fetch completes.
1610              
1611             =item on_message => $cb->($topic, $partition, $offset, $key, $value, $headers)
1612              
1613             Message delivery callback for consumer operations.
1614              
1615             =item fetch_max_wait_ms => Int (default 500)
1616              
1617             Maximum time the broker waits for C<fetch_min_bytes> of data.
1618              
1619             =item fetch_max_bytes => Int (default 1048576)
1620              
1621             Maximum bytes per fetch response.
1622              
1623             =item fetch_min_bytes => Int (default 1)
1624              
1625             Minimum bytes before the broker responds to a fetch.
1626              
1627             =item metadata_refresh => Int (default 300)
1628              
1629             Metadata refresh interval in seconds (reserved, not yet wired).
1630              
1631             =item loop => EV::Loop
1632              
1633             EV loop to use. Default: C<EV::default_loop>.
1634              
1635             =back
1636              
1637             =head2 connect($cb)
1638              
1639             Connect to the cluster. Connects to the first available bootstrap
1640             broker, fetches cluster metadata, then fires C<$cb->($metadata)>.
1641              
1642             $kafka->connect(sub {
1643             my $meta = shift;
1644             # $meta->{brokers}, $meta->{topics}
1645             });
1646              
1647             =head2 produce($topic, $key, $value, [\%opts,] [$cb])
1648              
1649             Produce a message. Routes to the correct partition leader automatically.
1650              
1651             # with callback (acks=1 or acks=-1)
1652             $kafka->produce('topic', 'key', 'value', sub {
1653             my ($result, $err) = @_;
1654             });
1655              
1656             # with headers
1657             $kafka->produce('topic', 'key', 'value',
1658             { headers => { 'h1' => 'v1' } }, sub { ... });
1659              
1660             # fire-and-forget (acks=0)
1661             $kafka->produce('topic', 'key', 'value');
1662              
1663             # explicit partition
1664             $kafka->produce('topic', 'key', 'value',
1665             { partition => 3 }, sub { ... });
1666              
1667             =head2 produce_many(\@messages, $cb)
1668              
1669             Produce multiple messages with a single completion callback. Each
1670             message is an arrayref C<[$topic, $key, $value]> or a hashref
1671             C<{topic, key, value}>. C<$cb> fires when all messages are acknowledged.
1672              
1673             $kafka->produce_many([
1674             ['my-topic', 'k1', 'v1'],
1675             ['my-topic', 'k2', 'v2'],
1676             ], sub {
1677             my $errors = shift;
1678             warn "some failed: @$errors" if $errors;
1679             });
1680              
1681             =head2 flush([$cb])
1682              
1683             Flush all accumulated produce batches and wait for all in-flight
1684             requests to complete. C<$cb> fires when all pending responses have
1685             been received.
1686              
1687             =head2 assign(\@partitions)
1688              
1689             Manually assign partitions for consuming.
1690              
1691             $kafka->assign([
1692             { topic => 'my-topic', partition => 0, offset => 0 },
1693             { topic => 'my-topic', partition => 1, offset => 100 },
1694             ]);
1695              
1696             =head2 seek($topic, $partition, $offset, [$cb])
1697              
1698             Seek a partition to a specific offset. Use C<-2> for earliest, C<-1>
1699             for latest. Updates the assignment in-place.
1700              
1701             $kafka->seek('my-topic', 0, -1, sub { print "at latest\n" });
1702              
1703             =head2 offsets_for($topic, $cb)
1704              
1705             Get earliest and latest offsets for all partitions of a topic.
1706              
1707             $kafka->offsets_for('my-topic', sub {
1708             my $offsets = shift;
1709             # { 0 => { earliest => 0, latest => 42 }, 1 => ... }
1710             });
1711              
1712             =head2 lag($cb)
1713              
1714             Get consumer lag for all assigned partitions.
1715              
1716             $kafka->lag(sub {
1717             my $lag = shift;
1718             # { "topic:0" => { current => 10, latest => 42, lag => 32 } }
1719             });
1720              
1721             =head2 error_name($code)
1722              
1723             Convert a Kafka numeric error code to its name.
1724              
1725             EV::Kafka::Client::error_name(3) # "UNKNOWN_TOPIC_OR_PARTITION"
1726              
1727             =head2 poll([$cb])
1728              
1729             Fetch messages from assigned partitions. Calls C<on_message> for each
1730             received record. C<$cb> fires when all fetch responses have arrived.
1731              
1732             my $timer = EV::timer 0, 0.1, sub { $kafka->poll };
1733              
1734             =head2 subscribe($topic, ..., %opts)
1735              
1736             Join a consumer group and subscribe to topics. The group protocol
1737             handles partition assignment automatically.
1738              
1739             $kafka->subscribe('topic-a', 'topic-b',
1740             group_id => 'my-group',
1741             session_timeout => 30000, # ms
1742             rebalance_timeout => 60000, # ms
1743             heartbeat_interval => 3, # seconds
1744             auto_commit => 1, # commit on unsubscribe (default)
1745             auto_offset_reset => 'earliest', # or 'latest'
1746             group_instance_id => 'pod-abc', # KIP-345 static membership
1747             on_assign => sub {
1748             my $partitions = shift;
1749             # [{topic, partition, offset}, ...]
1750             },
1751             on_revoke => sub {
1752             my $partitions = shift;
1753             },
1754             );
1755              
1756             =head2 commit([$cb])
1757              
1758             Commit current consumer offsets to the group coordinator.
1759              
1760             $kafka->commit(sub {
1761             my $err = shift;
1762             warn "commit failed: $err" if $err;
1763             });
1764              
1765             =head2 unsubscribe([$cb])
1766              
1767             Leave the consumer group (sends LeaveGroup for fast rebalance),
1768             stop heartbeat and fetch loop. If C<auto_commit> is enabled,
1769             commits offsets before leaving.
1770              
1771             =head2 begin_transaction
1772              
1773             Start a transaction. Requires C<transactional_id> in constructor.
1774              
1775             =head2 send_offsets_to_transaction($group_id, [$cb])
1776              
1777             Commit consumer offsets within the current transaction via
1778             C<TxnOffsetCommit>. This is the key step for exactly-once
1779             consume-process-produce pipelines.
1780              
1781             $kafka->send_offsets_to_transaction('my-group', sub {
1782             my ($result, $err) = @_;
1783             });
1784              
1785             =head2 commit_transaction([$cb])
1786              
1787             Commit the current transaction. All produced messages and offset
1788             commits within the transaction become visible atomically.
1789              
1790             =head2 abort_transaction([$cb])
1791              
1792             Abort the current transaction. All produced messages are discarded
1793             and offset commits are rolled back.
1794              
1795             =head2 close([$cb])
1796              
1797             Graceful shutdown: stop timers, disconnect all broker connections.
1798              
1799             $kafka->close(sub { EV::break });
1800              
1801             =head1 LOW-LEVEL CONNECTION METHODS
1802              
1803             C<EV::Kafka::Conn> provides direct access to a single broker connection.
1804             Useful for custom protocols, debugging, or when cluster-level routing
1805             is not needed.
1806              
1807             my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
1808             $conn->on_error(sub { warn @_ });
1809             $conn->on_connect(sub { ... });
1810             $conn->connect('127.0.0.1', 9092, 5.0);
1811              
1812             =head2 connect($host, $port, [$timeout])
1813              
1814             Connect to a broker. Timeout in seconds (0 = no timeout).
1815              
1816             =head2 disconnect
1817              
1818             Disconnect from broker.
1819              
1820             =head2 connected
1821              
1822             Returns true if the connection is ready (ApiVersions handshake complete).
1823              
1824             =head2 metadata(\@topics, $cb)
1825              
1826             Request cluster metadata. Pass C<undef> for all topics.
1827              
1828             $conn->metadata(['my-topic'], sub {
1829             my ($result, $err) = @_;
1830             # $result->{brokers}, $result->{topics}
1831             });
1832              
1833             =head2 produce($topic, $partition, $key, $value, [\%opts,] [$cb])
1834              
1835             Produce a message to a specific partition.
1836              
1837             $conn->produce('topic', 0, 'key', 'value', sub {
1838             my ($result, $err) = @_;
1839             });
1840              
1841             Options: C<acks> (default 1), C<headers> (hashref), C<timestamp>
1842             (epoch ms, default now), C<compression> (C<'none'>, C<'lz4'>; requires
1843             LZ4 at build time).
1844              
1845             =head2 produce_batch($topic, $partition, \@records, [\%opts,] [$cb])
1846              
1847             Produce multiple records in a single RecordBatch. Each record is
1848             C<{key, value, headers}>. Options: C<acks>, C<compression>,
1849             C<producer_id>, C<producer_epoch>, C<base_sequence>.
1850              
1851             $conn->produce_batch('topic', 0, [
1852             { key => 'k1', value => 'v1' },
1853             { key => 'k2', value => 'v2' },
1854             ], sub { my ($result, $err) = @_ });
1855              
1856             =head2 fetch($topic, $partition, $offset, $cb, [$max_bytes])
1857              
1858             Fetch messages from a partition starting at C<$offset>.
1859              
1860             $conn->fetch('topic', 0, 0, sub {
1861             my ($result, $err) = @_;
1862             for my $rec (@{ $result->{topics}[0]{partitions}[0]{records} }) {
1863             printf "offset=%d key=%s value=%s\n",
1864             $rec->{offset}, $rec->{key}, $rec->{value};
1865             }
1866             });
1867              
1868             =head2 fetch_multi(\%topics, $cb, [$max_bytes])
1869              
1870             Multi-partition fetch in a single request. Groups multiple
1871             topic-partitions into one Fetch call to the broker.
1872              
1873             $conn->fetch_multi({
1874             'topic-a' => [{ partition => 0, offset => 10 },
1875             { partition => 1, offset => 20 }],
1876             'topic-b' => [{ partition => 0, offset => 0 }],
1877             }, sub { my ($result, $err) = @_ });
1878              
1879             Used internally by C<poll()> to batch fetches by broker leader.
1880              
1881             =head2 list_offsets($topic, $partition, $timestamp, $cb)
1882              
1883             Get offsets by timestamp. Use C<-2> for earliest, C<-1> for latest.
1884              
1885             =head2 find_coordinator($key, $cb, [$key_type])
1886              
1887             Find the coordinator broker. C<$key_type>: 0=group (default),
1888             1=transaction.
1889              
1890             =head2 join_group($group_id, $member_id, \@topics, $cb, [$session_timeout_ms, $rebalance_timeout_ms, $group_instance_id])
1891              
1892             Join a consumer group. Pass C<$group_instance_id> for KIP-345 static
1893             membership.
1894              
1895             =head2 sync_group($group_id, $generation_id, $member_id, \@assignments, $cb, [$group_instance_id])
1896              
1897             Synchronize group state after join.
1898              
1899             =head2 heartbeat($group_id, $generation_id, $member_id, $cb, [$group_instance_id])
1900              
1901             Send heartbeat to group coordinator.
1902              
1903             =head2 offset_commit($group_id, $generation_id, $member_id, \@offsets, $cb)
1904              
1905             Commit consumer offsets.
1906              
1907             =head2 offset_fetch($group_id, \@topics, $cb)
1908              
1909             Fetch committed offsets for a consumer group.
1910              
1911             =head2 api_versions
1912              
1913             Returns a hashref of supported API keys to max versions, or undef
1914             if not yet negotiated.
1915              
1916             my $vers = $conn->api_versions;
1917             # { 0 => 7, 1 => 11, 3 => 8, ... }
1918              
1919             =head2 on_error([$cb])
1920              
1921             =head2 on_connect([$cb])
1922              
1923             =head2 on_disconnect([$cb])
1924              
1925             Set handler callbacks. Pass C<undef> to clear.
1926              
1927             =head2 client_id($id)
1928              
1929             Set the client identifier.
1930              
1931             =head2 tls($enable, [$ca_file, $skip_verify])
1932              
1933             Configure TLS.
1934              
1935             =head2 sasl($mechanism, [$username, $password])
1936              
1937             Configure SASL authentication.
1938              
1939             =head2 auto_reconnect($enable, [$delay_ms])
1940              
1941             Enable automatic reconnection with delay in milliseconds (default 1000).
1942              
1943             =head2 leave_group($group_id, $member_id, $cb)
1944              
1945             Send LeaveGroup to coordinator for fast partition rebalance.
1946              
1947             =head2 create_topics(\@topics, $timeout_ms, $cb)
1948              
1949             Create topics. Each element: C<{name, num_partitions, replication_factor}>.
1950              
1951             $conn->create_topics(
1952             [{ name => 'new-topic', num_partitions => 3, replication_factor => 1 }],
1953             5000, sub { my ($res, $err) = @_ }
1954             );
1955              
1956             =head2 delete_topics(\@topic_names, $timeout_ms, $cb)
1957              
1958             Delete topics by name.
1959              
1960             =head2 init_producer_id($transactional_id, $txn_timeout_ms, $cb)
1961              
1962             Initialize a producer ID for idempotent/transactional produce.
1963             Pass C<undef> for non-transactional idempotent producer.
1964              
1965             =head2 add_partitions_to_txn($txn_id, $producer_id, $epoch, \@topics, $cb)
1966              
1967             Register partitions with the transaction coordinator.
1968              
1969             =head2 end_txn($txn_id, $producer_id, $epoch, $committed, $cb)
1970              
1971             Commit (C<$committed=1>) or abort (C<$committed=0>) a transaction.
1972              
1973             =head2 txn_offset_commit($txn_id, $group_id, $producer_id, $epoch, $generation, $member_id, \@offsets, $cb)
1974              
1975             Commit consumer offsets within a transaction (API 28).
1976              
1977             =head2 pending
1978              
1979             Number of requests awaiting broker response.
1980              
1981             =head2 state
1982              
1983             Connection state as integer (0=disconnected, 6=ready).
1984              
1985             =head1 UTILITY FUNCTIONS
1986              
1987             =head2 EV::Kafka::_murmur2($key)
1988              
1989             Kafka-compatible murmur2 hash. Returns a non-negative 31-bit integer.
1990              
1991             =head2 EV::Kafka::_crc32c($data)
1992              
1993             CRC32C checksum (Castagnoli). Used internally for RecordBatch integrity.
1994              
1995             =head2 EV::Kafka::_error_name($code)
1996              
1997             Convert Kafka error code to string name.
1998              
1999             =head1 RESULT STRUCTURES
2000              
2001             =head2 Produce result
2002              
2003             $result = {
2004             topics => [{
2005             topic => 'name',
2006             partitions => [{
2007             partition => 0,
2008             error_code => 0,
2009             base_offset => 42,
2010             }],
2011             }],
2012             };
2013              
2014             =head2 Fetch result
2015              
2016             $result = {
2017             topics => [{
2018             topic => 'name',
2019             partitions => [{
2020             partition => 0,
2021             error_code => 0,
2022             high_watermark => 100,
2023             records => [{
2024             offset => 42,
2025             timestamp => 1712345678000,
2026             key => 'key', # or undef
2027             value => 'value', # or undef
2028             headers => { h => 'v' }, # if present
2029             }],
2030             }],
2031             }],
2032             };
2033              
2034             =head2 Metadata result
2035              
2036             $result = {
2037             controller_id => 0,
2038             brokers => [{ node_id => 0, host => '10.0.0.1', port => 9092 }],
2039             topics => [{
2040             name => 'topic',
2041             error_code => 0,
2042             partitions => [{
2043             partition => 0,
2044             leader => 0,
2045             error_code => 0,
2046             }],
2047             }],
2048             };
2049              
2050             =head1 ERROR HANDLING
2051              
2052             Errors are delivered through two channels:
2053              
2054             =over
2055              
2056             =item B<Connection-level errors> fire the C<on_error> callback (or
2057             C<die> if none set). These include connection refused, DNS failure,
2058             TLS errors, SASL auth failure, and protocol violations.
2059              
2060             =item B<Request-level errors> are delivered as the second argument to
2061             the request callback: C<$cb-E<gt>($result, $error)>. If C<$error> is
2062             defined, C<$result> may be undef.
2063              
2064             =back
2065              
2066             Within result structures, per-partition C<error_code> fields use Kafka
2067             numeric codes:
2068              
2069             0 No error
2070             1 OFFSET_OUT_OF_RANGE
2071             3 UNKNOWN_TOPIC_OR_PARTITION
2072             6 NOT_LEADER_OR_FOLLOWER
2073             15 COORDINATOR_NOT_AVAILABLE
2074             16 NOT_COORDINATOR
2075             25 UNKNOWN_MEMBER_ID
2076             27 REBALANCE_IN_PROGRESS
2077             36 TOPIC_ALREADY_EXISTS
2078             79 MEMBER_ID_REQUIRED
2079              
2080             When a broker disconnects mid-flight, all pending callbacks receive
2081             C<(undef, "connection closed by broker")> or C<(undef, "disconnected")>.
2082              
2083             =head1 ENVIRONMENT VARIABLES
2084              
2085             These are used by tests and examples (not by the module itself):
2086              
2087             TEST_KAFKA_BROKER broker address for tests (host:port)
2088             KAFKA_BROKER broker address for examples
2089             KAFKA_HOST broker hostname for low-level examples
2090             KAFKA_PORT broker port for low-level examples
2091             KAFKA_TOPIC topic name for examples
2092             KAFKA_GROUP_ID consumer group for examples
2093             KAFKA_LIMIT message limit for consume example
2094             KAFKA_COUNT message count for fire-and-forget
2095             BENCH_BROKER broker for benchmarks
2096             BENCH_MESSAGES message count for benchmarks
2097             BENCH_VALUE_SIZE value size in bytes for benchmarks
2098             BENCH_TOPIC topic name for benchmarks
2099              
2100             =head1 QUICK START
2101              
2102             Minimal producer + consumer lifecycle:
2103              
2104             use EV;
2105             use EV::Kafka;
2106              
2107             my $kafka = EV::Kafka->new(
2108             brokers => '127.0.0.1:9092',
2109             acks => 1,
2110             on_error => sub { warn "kafka: @_\n" },
2111             on_message => sub {
2112             my ($topic, $part, $offset, $key, $value) = @_;
2113             print "got: $key=$value\n";
2114             },
2115             );
2116              
2117             $kafka->connect(sub {
2118             # produce
2119             $kafka->produce('test', 'k1', 'hello', sub {
2120             print "produced\n";
2121              
2122             # consume from the beginning
2123             $kafka->assign([{topic=>'test', partition=>0, offset=>0}]);
2124             $kafka->seek('test', 0, -2, sub {
2125             my $t = EV::timer 0, 0.1, sub { $kafka->poll };
2126             $kafka->{cfg}{_t} = $t;
2127             });
2128             });
2129             });
2130              
2131             EV::run;
2132              
2133             =head1 COOKBOOK
2134              
2135             =head2 Produce JSON with headers
2136              
2137             use JSON::PP;
2138             my $json = JSON::PP->new->utf8;
2139              
2140             $kafka->produce('events', 'user-42',
2141             $json->encode({ action => 'click', page => '/home' }),
2142             { headers => { 'content-type' => 'application/json' } },
2143             sub { ... }
2144             );
2145              
2146             =head2 Consume from latest offset only
2147              
2148             $kafka->subscribe('live-feed',
2149             group_id => 'realtime',
2150             auto_offset_reset => 'latest',
2151             on_assign => sub { print "ready\n" },
2152             );
2153              
2154             =head2 Graceful shutdown
2155              
2156             $SIG{INT} = sub {
2157             $kafka->commit(sub {
2158             $kafka->unsubscribe(sub {
2159             $kafka->close(sub { EV::break });
2160             });
2161             });
2162             };
2163              
2164             =head2 At-least-once processing
2165              
2166             $kafka->subscribe('jobs',
2167             group_id => 'workers',
2168             auto_commit => 0,
2169             );
2170              
2171             # in on_message: process, then commit
2172             on_message => sub {
2173             process($_[4]);
2174             $kafka->commit if ++$count % 100 == 0;
2175             },
2176              
2177             =head2 Batch produce
2178              
2179             $kafka->produce_many([
2180             ['events', 'k1', 'v1'],
2181             ['events', 'k2', 'v2'],
2182             ['events', 'k3', 'v3'],
2183             ], sub {
2184             my $errs = shift;
2185             print $errs ? "some failed\n" : "all done\n";
2186             });
2187              
2188             =head2 Exactly-once stream processing (EOS)
2189              
2190             my $kafka = EV::Kafka->new(
2191             brokers => '...',
2192             transactional_id => 'my-eos-app',
2193             acks => -1,
2194             on_message => sub {
2195             my ($t, $p, $off, $key, $value) = @_;
2196             my $result = process($value);
2197             $kafka->produce('output-topic', $key, $result);
2198             },
2199             );
2200              
2201             # consume-process-produce loop:
2202             $kafka->begin_transaction;
2203             $kafka->poll(sub {
2204             $kafka->send_offsets_to_transaction('my-group', sub {
2205             $kafka->commit_transaction(sub {
2206             $kafka->begin_transaction; # next transaction
2207             });
2208             });
2209             });
2210              
2211             =head2 Topic administration
2212              
2213             my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
2214             $conn->on_connect(sub {
2215             $conn->create_topics(
2216             [{ name => 'new-topic', num_partitions => 6, replication_factor => 3 }],
2217             10000, sub { ... }
2218             );
2219             });
2220              
2221             =head1 BENCHMARKS
2222              
2223             Measured on Linux with TCP loopback to Redpanda, 100-byte values,
2224             Perl 5.40.2, 50K messages (C<bench/benchmark.pl>):
2225              
2226             Pipeline produce (acks=1) 68K msg/sec 7.4 MB/s
2227             Fire-and-forget (acks=0) 100K msg/sec 11.0 MB/s
2228             Fetch throughput 31K msg/sec 3.4 MB/s
2229             Sequential round-trip 19K msg/sec 54 us avg latency
2230             Metadata request 25K req/sec 41 us avg latency
2231              
2232             Throughput by value size (pipelined, acks=1):
2233              
2234             10 bytes 61K msg/sec 0.9 MB/s
2235             100 bytes 68K msg/sec 7.4 MB/s
2236             1000 bytes 50K msg/sec 50.2 MB/s
2237             10000 bytes 18K msg/sec 178.5 MB/s
2238              
2239             Pipeline produce throughput is limited by Perl callback overhead per
2240             message. Fire-and-forget mode (C<acks=0>) skips the response cycle
2241             entirely, reaching ~100K msg/sec. Sequential round-trip (one produce,
2242             wait for ack, repeat) measures raw broker latency at ~54 microseconds.
2243              
2244             The fetch path is sequential (fetch, process, fetch again) which
2245             introduces one round-trip per batch. With larger C<fetch_max_bytes> and
2246             dense topics, fetch throughput increases proportionally.
2247              
2248             Run C<perl bench/benchmark.pl> for throughput results. Set
2249             C<BENCH_BROKER>, C<BENCH_MESSAGES>, C<BENCH_VALUE_SIZE>, and
2250             C<BENCH_TOPIC> to customize.
2251              
2252             Run C<perl bench/latency.pl> for a latency histogram with percentiles
2253             (min, avg, median, p90, p95, p99, max).
2254              
2255             =head1 KAFKA PROTOCOL
2256              
2257             This module implements the Kafka binary protocol directly in XS.
2258             All integers are big-endian. Requests use a 4-byte size prefix
2259             followed by a header (API key, version, correlation ID, client ID)
2260             and a version-specific body.
2261              
2262             Responses are matched to requests by correlation ID. The broker
2263             guarantees FIFO ordering per connection, so the response queue is
2264             a simple FIFO.
2265              
2266             RecordBatch encoding (magic=2) is used for produce. CRC32C covers
2267             the batch from attributes through the last record. Records use
2268             ZigZag-encoded varints for lengths and deltas.
2269              
2270             The connection handshake sends ApiVersions (v0) on connect to
2271             discover supported protocol versions. SASL authentication uses
2272             SaslHandshake (v1) + SaslAuthenticate (v2) with the PLAIN or SCRAM-SHA-256/512 mechanisms.
2273              
2274             Consumer group protocol uses sticky partition assignment with
2275             MEMBER_ID_REQUIRED (error 79) retry per KIP-394.
2276              
2277             Non-flexible API versions are used throughout (capped below the
2278             flexible-version threshold for each API) to avoid the compact
2279             encoding complexity.
2280              
2281             =head1 LIMITATIONS
2282              
2283             =over
2284              
2285             =item * B<Compression> -- LZ4 and gzip are supported when built with
2286             liblz4 and zlib. snappy and zstd are not implemented.
2287              
2288             =item * B -- C,
2289             C, C,
2290             C provide full exactly-once stream processing.
2291             C, C, C, C
2292             are all wired. Requires C in constructor.
2293              
2294             =item * B<SASL mechanisms> -- SASL/PLAIN and SCRAM-SHA-256/512
2295             are supported. GSSAPI (Kerberos) and OAUTHBEARER are not implemented.
2296              
2297             =item * B<Sticky partition assignment> -- assignments are preserved
2298             across rebalances where possible. New partitions are distributed to
2299             the least-loaded member. Overloaded members shed excess partitions.
2300              
2301             =item * B<Blocking DNS resolution> -- hostnames are resolved
2302             synchronously when connecting. For fully non-blocking
2303             operation, use IP addresses instead of hostnames.
2304              
2305             =item * B<Non-flexible API versions only> -- all API versions are capped
2306             below the flexible-version threshold to avoid compact string/array
2307             encoding. This limits interoperability with very new protocol features
2308             but works with all Kafka 0.11+ and Redpanda brokers.
2309              
2310             =item * B<Bounded retries> -- transient errors (NOT_LEADER,
2311             COORDINATOR_NOT_AVAILABLE) trigger metadata refresh and up to 3
2312             retries with backoff. Non-retriable errors are surfaced to the
2313             callback immediately.
2314              
2315             =back
2316              
2317             =head1 AUTHOR
2318              
2319             vividsnow
2320              
2321             =head1 LICENSE
2322              
2323             This library is free software; you can redistribute it and/or modify it
2324             under the same terms as Perl itself.
2325              
2326             =cut
2327