| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
package EV::Kafka; |
|
2
|
15
|
|
|
15
|
|
1545727
|
use strict; |
|
|
15
|
|
|
|
|
22
|
|
|
|
15
|
|
|
|
|
540
|
|
|
3
|
15
|
|
|
15
|
|
58
|
use warnings; |
|
|
15
|
|
|
|
|
78
|
|
|
|
15
|
|
|
|
|
716
|
|
|
4
|
15
|
|
|
15
|
|
3333
|
use EV; |
|
|
15
|
|
|
|
|
13911
|
|
|
|
15
|
|
|
|
|
361
|
|
|
5
|
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
BEGIN { |
|
7
|
15
|
|
|
15
|
|
63
|
use XSLoader; |
|
|
15
|
|
|
|
|
41
|
|
|
|
15
|
|
|
|
|
676
|
|
|
8
|
15
|
|
|
15
|
|
40
|
our $VERSION = '0.01'; |
|
9
|
15
|
|
|
|
|
39675
|
XSLoader::load __PACKAGE__, $VERSION; |
|
10
|
|
|
|
|
|
|
} |
|
11
|
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
sub new { |
|
13
|
5
|
|
|
5
|
1
|
580625
|
my ($class, %opts) = @_; |
|
14
|
|
|
|
|
|
|
|
|
15
|
5
|
|
|
|
|
14
|
my $loop = delete $opts{loop}; |
|
16
|
5
|
|
|
|
|
52
|
my $self = $class->_new($loop); |
|
17
|
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
# Parse brokers |
|
19
|
5
|
|
100
|
|
|
23
|
my $brokers = delete $opts{brokers} // '127.0.0.1:9092'; |
|
20
|
5
|
|
|
|
|
14
|
my @bootstrap; |
|
21
|
5
|
|
|
|
|
20
|
for my $b (split /,/, $brokers) { |
|
22
|
7
|
|
|
|
|
46
|
$b =~ s/^\s+|\s+$//g; |
|
23
|
7
|
|
|
|
|
22
|
my ($h, $p) = split /:/, $b, 2; |
|
24
|
7
|
|
100
|
|
|
38
|
$p //= 9092; |
|
25
|
7
|
|
|
|
|
32
|
push @bootstrap, [$h, $p + 0]; |
|
26
|
|
|
|
|
|
|
} |
|
27
|
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
# Store config |
|
29
|
|
|
|
|
|
|
my $cfg = { |
|
30
|
|
|
|
|
|
|
bootstrap => \@bootstrap, |
|
31
|
|
|
|
|
|
|
client_id => delete $opts{client_id} // 'ev-kafka', |
|
32
|
|
|
|
|
|
|
tls => delete $opts{tls} // 0, |
|
33
|
|
|
|
|
|
|
tls_ca_file => delete $opts{tls_ca_file}, |
|
34
|
|
|
|
|
|
|
tls_skip_verify => delete $opts{tls_skip_verify} // 0, |
|
35
|
|
|
|
|
|
|
sasl => delete $opts{sasl}, |
|
36
|
0
|
|
|
0
|
|
0
|
on_error => delete $opts{on_error} // sub { die "EV::Kafka: @_\n" }, |
|
37
|
|
|
|
|
|
|
on_connect => delete $opts{on_connect}, |
|
38
|
|
|
|
|
|
|
on_message => delete $opts{on_message}, |
|
39
|
|
|
|
|
|
|
acks => delete $opts{acks} // -1, |
|
40
|
|
|
|
|
|
|
linger_ms => delete $opts{linger_ms} // 5, |
|
41
|
|
|
|
|
|
|
batch_size => delete $opts{batch_size} // 16384, |
|
42
|
|
|
|
|
|
|
partitioner => delete $opts{partitioner}, |
|
43
|
|
|
|
|
|
|
compression => delete $opts{compression}, # 'lz4', 'gzip', or undef |
|
44
|
|
|
|
|
|
|
idempotent => delete $opts{idempotent} // 0, |
|
45
|
|
|
|
|
|
|
transactional_id => delete $opts{transactional_id}, # enables transactions |
|
46
|
|
|
|
|
|
|
fetch_max_wait_ms => delete $opts{fetch_max_wait_ms} // 500, |
|
47
|
|
|
|
|
|
|
fetch_max_bytes => delete $opts{fetch_max_bytes} // 1048576, |
|
48
|
|
|
|
|
|
|
fetch_min_bytes => delete $opts{fetch_min_bytes} // 1, |
|
49
|
5
|
|
50
|
|
|
333
|
metadata_refresh => delete $opts{metadata_refresh} // 300, |
|
|
|
|
50
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
50
|
|
|
|
|
|
|
}; |
|
51
|
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
# Internal state |
|
53
|
5
|
|
|
|
|
16
|
$cfg->{conns} = {}; # node_id => EV::Kafka::Conn |
|
54
|
5
|
|
|
|
|
18
|
$cfg->{meta} = undef; # latest metadata response |
|
55
|
5
|
|
|
|
|
12
|
$cfg->{leaders} = {}; # "topic:partition" => node_id |
|
56
|
5
|
|
|
|
|
13
|
$cfg->{broker_map}= {}; # node_id => {host, port} |
|
57
|
5
|
|
|
|
|
8
|
$cfg->{connected} = 0; |
|
58
|
5
|
|
|
|
|
11
|
$cfg->{meta_pending} = 0; |
|
59
|
5
|
|
|
|
|
9
|
$cfg->{pending_ops} = []; # ops waiting for metadata |
|
60
|
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
# Producer state |
|
62
|
5
|
|
|
|
|
12
|
$cfg->{batches} = {}; # "topic:partition" => [{rec, cb}] |
|
63
|
5
|
|
|
|
|
8
|
$cfg->{next_sequence} = {}; # "topic:partition" => next sequence number |
|
64
|
5
|
|
|
|
|
12
|
$cfg->{producer_id} = -1; |
|
65
|
5
|
|
|
|
|
23
|
$cfg->{producer_epoch} = -1; |
|
66
|
5
|
|
|
|
|
13
|
$cfg->{rr_counter} = 0; |
|
67
|
|
|
|
|
|
|
|
|
68
|
|
|
|
|
|
|
# Consumer state |
|
69
|
5
|
|
|
|
|
8
|
$cfg->{assignments} = []; # [{topic, partition, offset}] |
|
70
|
5
|
|
|
|
|
9
|
$cfg->{fetch_active} = 0; |
|
71
|
5
|
|
|
|
|
10
|
$cfg->{group} = undef; |
|
72
|
|
|
|
|
|
|
|
|
73
|
5
|
|
|
|
|
44
|
bless { xs => $self, cfg => $cfg }, "${class}::Client"; |
|
74
|
|
|
|
|
|
|
} |
|
75
|
|
|
|
|
|
|
|
|
76
|
|
|
|
|
|
|
package EV::Kafka::Client; |
|
77
|
15
|
|
|
15
|
|
109
|
use EV; |
|
|
15
|
|
|
|
|
20
|
|
|
|
15
|
|
|
|
|
351
|
|
|
78
|
15
|
|
|
15
|
|
61
|
use Scalar::Util 'weaken'; |
|
|
15
|
|
|
|
|
30
|
|
|
|
15
|
|
|
|
|
139464
|
|
|
79
|
|
|
|
|
|
|
|
|
80
|
|
|
|
|
|
|
sub _any_conn { |
|
81
|
1
|
|
|
1
|
|
2
|
my ($self) = @_; |
|
82
|
1
|
|
|
|
|
3
|
my $cfg = $self->{cfg}; |
|
83
|
1
|
|
|
|
|
2
|
my $conn = $cfg->{bootstrap_conn}; |
|
84
|
1
|
|
|
|
|
2
|
for my $c (values %{$cfg->{conns}}) { |
|
|
1
|
|
|
|
|
5
|
|
|
85
|
0
|
0
|
|
|
|
0
|
if ($c->connected) { $conn = $c; last } |
|
|
0
|
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
86
|
|
|
|
|
|
|
} |
|
87
|
1
|
50
|
33
|
|
|
7
|
return ($conn && $conn->connected) ? $conn : undef; |
|
88
|
|
|
|
|
|
|
} |
|
89
|
|
|
|
|
|
|
|
|
90
|
|
|
|
|
|
|
sub _get_or_create_conn { |
|
91
|
0
|
|
|
0
|
|
0
|
my ($self, $node_id) = @_; |
|
92
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
93
|
0
|
0
|
|
|
|
0
|
return $cfg->{conns}{$node_id} if $cfg->{conns}{$node_id}; |
|
94
|
|
|
|
|
|
|
|
|
95
|
0
|
|
|
|
|
0
|
my $info = $cfg->{broker_map}{$node_id}; |
|
96
|
0
|
0
|
|
|
|
0
|
return undef unless $info; |
|
97
|
|
|
|
|
|
|
|
|
98
|
0
|
|
|
|
|
0
|
my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef); |
|
99
|
0
|
|
|
|
|
0
|
$self->_configure_conn($conn); |
|
100
|
|
|
|
|
|
|
|
|
101
|
0
|
|
|
|
|
0
|
$cfg->{conns}{$node_id} = $conn; |
|
102
|
0
|
|
|
|
|
0
|
weaken(my $weak = $self); |
|
103
|
|
|
|
|
|
|
$conn->on_connect(sub { |
|
104
|
0
|
0
|
|
0
|
|
0
|
$weak->_drain_pending_for($node_id) if $weak; |
|
105
|
0
|
|
|
|
|
0
|
}); |
|
106
|
0
|
|
|
|
|
0
|
$conn->connect($info->{host}, $info->{port}, 10.0); |
|
107
|
|
|
|
|
|
|
|
|
108
|
0
|
|
|
|
|
0
|
return $conn; |
|
109
|
|
|
|
|
|
|
} |
|
110
|
|
|
|
|
|
|
|
|
111
|
|
|
|
|
|
|
sub _configure_conn { |
|
112
|
0
|
|
|
0
|
|
0
|
my ($self, $conn) = @_; |
|
113
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
114
|
0
|
|
|
|
|
0
|
$conn->client_id($cfg->{client_id}); |
|
115
|
0
|
0
|
|
|
|
0
|
if ($cfg->{tls}) { |
|
116
|
0
|
|
|
|
|
0
|
$conn->tls(1, $cfg->{tls_ca_file}, $cfg->{tls_skip_verify}); |
|
117
|
|
|
|
|
|
|
} |
|
118
|
0
|
0
|
|
|
|
0
|
if ($cfg->{sasl}) { |
|
119
|
0
|
|
|
|
|
0
|
$conn->sasl($cfg->{sasl}{mechanism}, $cfg->{sasl}{username}, $cfg->{sasl}{password}); |
|
120
|
|
|
|
|
|
|
} |
|
121
|
0
|
|
|
|
|
0
|
weaken(my $weak_cfg = $cfg); |
|
122
|
|
|
|
|
|
|
$conn->on_error(sub { |
|
123
|
0
|
0
|
0
|
0
|
|
0
|
$weak_cfg->{on_error}->($_[0]) if $weak_cfg && $weak_cfg->{on_error}; |
|
124
|
0
|
|
|
|
|
0
|
}); |
|
125
|
|
|
|
|
|
|
} |
|
126
|
|
|
|
|
|
|
|
|
127
|
|
|
|
|
|
|
sub _bootstrap_connect { |
|
128
|
0
|
|
|
0
|
|
0
|
my ($self, $cb) = @_; |
|
129
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
130
|
0
|
|
|
|
|
0
|
my @bs = @{$cfg->{bootstrap}}; |
|
|
0
|
|
|
|
|
0
|
|
|
131
|
0
|
|
|
|
|
0
|
my $idx = 0; |
|
132
|
|
|
|
|
|
|
|
|
133
|
0
|
|
|
|
|
0
|
my $try; $try = sub { |
|
134
|
0
|
0
|
|
0
|
|
0
|
if ($idx >= @bs) { |
|
135
|
0
|
|
|
|
|
0
|
undef $try; |
|
136
|
0
|
0
|
|
|
|
0
|
$cfg->{on_error}->("all bootstrap brokers unreachable") if $cfg->{on_error}; |
|
137
|
0
|
|
|
|
|
0
|
return; |
|
138
|
|
|
|
|
|
|
} |
|
139
|
0
|
|
|
|
|
0
|
my ($host, $port) = @{$bs[$idx++]}; |
|
|
0
|
|
|
|
|
0
|
|
|
140
|
0
|
|
|
|
|
0
|
my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef); |
|
141
|
0
|
|
|
|
|
0
|
$self->_configure_conn($conn); |
|
142
|
|
|
|
|
|
|
|
|
143
|
|
|
|
|
|
|
$conn->on_error(sub { |
|
144
|
|
|
|
|
|
|
# try next broker |
|
145
|
0
|
|
|
|
|
0
|
$try->(); |
|
146
|
0
|
|
|
|
|
0
|
}); |
|
147
|
|
|
|
|
|
|
|
|
148
|
|
|
|
|
|
|
$conn->on_connect(sub { |
|
149
|
0
|
|
|
|
|
0
|
undef $try; # break self-reference cycle |
|
150
|
|
|
|
|
|
|
$conn->on_error(sub { |
|
151
|
0
|
0
|
|
|
|
0
|
$cfg->{on_error}->($_[0]) if $cfg->{on_error}; |
|
152
|
0
|
|
|
|
|
0
|
}); |
|
153
|
0
|
|
|
|
|
0
|
$cfg->{bootstrap_conn} = $conn; |
|
154
|
0
|
|
|
|
|
0
|
$cfg->{connected} = 1; |
|
155
|
0
|
|
|
|
|
0
|
$self->_refresh_metadata($cb); |
|
156
|
0
|
|
|
|
|
0
|
}); |
|
157
|
|
|
|
|
|
|
|
|
158
|
0
|
|
|
|
|
0
|
$conn->connect($host, $port, 10.0); |
|
159
|
0
|
|
|
|
|
0
|
}; |
|
160
|
0
|
|
|
|
|
0
|
$try->(); |
|
161
|
|
|
|
|
|
|
} |
|
162
|
|
|
|
|
|
|
|
|
163
|
|
|
|
|
|
|
sub connect { |
|
164
|
0
|
|
|
0
|
|
0
|
my ($self, $cb) = @_; |
|
165
|
0
|
|
|
|
|
0
|
$self->_bootstrap_connect($cb); |
|
166
|
|
|
|
|
|
|
} |
|
167
|
|
|
|
|
|
|
|
|
168
|
|
|
|
|
|
|
sub _merge_metadata { |
|
169
|
0
|
|
|
0
|
|
0
|
my ($cfg, $meta) = @_; |
|
170
|
0
|
|
0
|
|
|
0
|
for my $b (@{$meta->{brokers} // []}) { |
|
|
0
|
|
|
|
|
0
|
|
|
171
|
|
|
|
|
|
|
$cfg->{broker_map}{$b->{node_id}} = { |
|
172
|
|
|
|
|
|
|
host => $b->{host}, port => $b->{port} |
|
173
|
0
|
|
|
|
|
0
|
}; |
|
174
|
|
|
|
|
|
|
} |
|
175
|
0
|
|
0
|
|
|
0
|
for my $t (@{$meta->{topics} // []}) { |
|
|
0
|
|
|
|
|
0
|
|
|
176
|
0
|
0
|
|
|
|
0
|
next if $t->{error_code}; |
|
177
|
0
|
|
0
|
|
|
0
|
for my $p (@{$t->{partitions} // []}) { |
|
|
0
|
|
|
|
|
0
|
|
|
178
|
0
|
|
|
|
|
0
|
$cfg->{leaders}{"$t->{name}:$p->{partition}"} = $p->{leader}; |
|
179
|
|
|
|
|
|
|
} |
|
180
|
|
|
|
|
|
|
} |
|
181
|
|
|
|
|
|
|
} |
|
182
|
|
|
|
|
|
|
|
|
183
|
|
|
|
|
|
|
sub _refresh_metadata { |
|
184
|
1
|
|
|
1
|
|
3
|
my ($self, $cb) = @_; |
|
185
|
1
|
|
|
|
|
2
|
my $cfg = $self->{cfg}; |
|
186
|
1
|
50
|
|
|
|
5
|
return if $cfg->{meta_pending}; |
|
187
|
1
|
|
|
|
|
2
|
$cfg->{meta_pending} = 1; |
|
188
|
|
|
|
|
|
|
|
|
189
|
1
|
|
|
|
|
4
|
my $conn = $self->_any_conn; |
|
190
|
1
|
50
|
|
|
|
3
|
unless ($conn) { $cfg->{meta_pending} = 0; return } |
|
|
1
|
|
|
|
|
3
|
|
|
|
1
|
|
|
|
|
2
|
|
|
191
|
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
$conn->metadata(undef, sub { |
|
193
|
0
|
|
|
0
|
|
0
|
my ($meta, $err) = @_; |
|
194
|
0
|
|
|
|
|
0
|
$cfg->{meta_pending} = 0; |
|
195
|
0
|
0
|
|
|
|
0
|
if ($err) { |
|
196
|
0
|
0
|
|
|
|
0
|
$cfg->{on_error}->("metadata: $err") if $cfg->{on_error}; |
|
197
|
0
|
|
|
|
|
0
|
return; |
|
198
|
|
|
|
|
|
|
} |
|
199
|
|
|
|
|
|
|
|
|
200
|
0
|
|
|
|
|
0
|
$cfg->{meta} = $meta; |
|
201
|
0
|
|
|
|
|
0
|
_merge_metadata($cfg, $meta); |
|
202
|
|
|
|
|
|
|
|
|
203
|
|
|
|
|
|
|
# assign bootstrap_conn a node_id if possible |
|
204
|
0
|
0
|
0
|
|
|
0
|
if ($cfg->{bootstrap_conn} && $meta->{brokers} && @{$meta->{brokers}}) { |
|
|
0
|
|
0
|
|
|
0
|
|
|
205
|
0
|
|
|
|
|
0
|
my $binfo = $meta->{brokers}[0]; |
|
206
|
0
|
|
0
|
|
|
0
|
$cfg->{conns}{$binfo->{node_id}} //= $cfg->{bootstrap_conn}; |
|
207
|
|
|
|
|
|
|
} |
|
208
|
|
|
|
|
|
|
|
|
209
|
0
|
0
|
0
|
|
|
0
|
if (($cfg->{idempotent} || $cfg->{transactional_id}) && $cfg->{producer_id} < 0) { |
|
|
|
|
0
|
|
|
|
|
|
210
|
|
|
|
|
|
|
$self->_init_idempotent(sub { |
|
211
|
0
|
|
|
|
|
0
|
$self->_drain_all_pending; |
|
212
|
0
|
0
|
|
|
|
0
|
$cb->($meta) if $cb; |
|
213
|
0
|
0
|
|
|
|
0
|
$cfg->{on_connect}->() if $cfg->{on_connect}; |
|
214
|
0
|
|
|
|
|
0
|
$cfg->{on_connect} = undef; |
|
215
|
0
|
|
|
|
|
0
|
}); |
|
216
|
|
|
|
|
|
|
} else { |
|
217
|
0
|
|
|
|
|
0
|
$self->_drain_all_pending; |
|
218
|
0
|
0
|
|
|
|
0
|
$cb->($meta) if $cb; |
|
219
|
0
|
0
|
|
|
|
0
|
$cfg->{on_connect}->() if $cfg->{on_connect}; |
|
220
|
0
|
|
|
|
|
0
|
$cfg->{on_connect} = undef; |
|
221
|
|
|
|
|
|
|
} |
|
222
|
0
|
|
|
|
|
0
|
}); |
|
223
|
|
|
|
|
|
|
} |
|
224
|
|
|
|
|
|
|
|
|
225
|
|
|
|
|
|
|
sub _refresh_metadata_for_topic { |
|
226
|
0
|
|
|
0
|
|
0
|
my ($self, $topic) = @_; |
|
227
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
228
|
0
|
0
|
|
|
|
0
|
return if $cfg->{meta_pending}; |
|
229
|
0
|
|
|
|
|
0
|
$cfg->{meta_pending} = 1; |
|
230
|
|
|
|
|
|
|
|
|
231
|
0
|
|
|
|
|
0
|
my $conn = $self->_any_conn; |
|
232
|
0
|
0
|
|
|
|
0
|
unless ($conn) { $cfg->{meta_pending} = 0; return } |
|
|
0
|
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
233
|
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
$conn->metadata([$topic], sub { |
|
235
|
0
|
|
|
0
|
|
0
|
my ($meta, $err) = @_; |
|
236
|
0
|
|
|
|
|
0
|
$cfg->{meta_pending} = 0; |
|
237
|
0
|
0
|
|
|
|
0
|
if ($err) { |
|
238
|
0
|
0
|
|
|
|
0
|
$cfg->{on_error}->("metadata: $err") if $cfg->{on_error}; |
|
239
|
0
|
|
|
|
|
0
|
return; |
|
240
|
|
|
|
|
|
|
} |
|
241
|
|
|
|
|
|
|
|
|
242
|
0
|
|
|
|
|
0
|
_merge_metadata($cfg, $meta); |
|
243
|
|
|
|
|
|
|
|
|
244
|
|
|
|
|
|
|
# if topic still has error, retry after delay |
|
245
|
0
|
|
|
|
|
0
|
my $topic_ok = 0; |
|
246
|
0
|
|
0
|
|
|
0
|
for my $t (@{$meta->{topics} // []}) { |
|
|
0
|
|
|
|
|
0
|
|
|
247
|
0
|
0
|
0
|
|
|
0
|
if ($t->{name} eq $topic && !$t->{error_code} && @{$t->{partitions} // []}) { |
|
|
0
|
|
0
|
|
|
0
|
|
|
|
|
|
0
|
|
|
|
|
|
248
|
0
|
|
|
|
|
0
|
$topic_ok = 1; |
|
249
|
0
|
|
|
|
|
0
|
last; |
|
250
|
|
|
|
|
|
|
} |
|
251
|
|
|
|
|
|
|
} |
|
252
|
|
|
|
|
|
|
|
|
253
|
0
|
0
|
|
|
|
0
|
if ($topic_ok) { |
|
254
|
0
|
|
|
|
|
0
|
$self->_drain_all_pending; |
|
255
|
|
|
|
|
|
|
} else { |
|
256
|
|
|
|
|
|
|
# retry after short delay (topic being created) |
|
257
|
0
|
|
|
|
|
0
|
my $t; $t = EV::timer 0.5, 0, sub { |
|
258
|
0
|
|
|
|
|
0
|
undef $t; |
|
259
|
0
|
|
|
|
|
0
|
$self->_refresh_metadata_for_topic($topic); |
|
260
|
0
|
|
|
|
|
0
|
}; |
|
261
|
|
|
|
|
|
|
} |
|
262
|
0
|
|
|
|
|
0
|
}); |
|
263
|
|
|
|
|
|
|
} |
|
264
|
|
|
|
|
|
|
|
|
265
|
|
|
|
|
|
|
sub _init_idempotent { |
|
266
|
0
|
|
|
0
|
|
0
|
my ($self, $cb) = @_; |
|
267
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
268
|
|
|
|
|
|
|
|
|
269
|
|
|
|
|
|
|
my $do_init = sub { |
|
270
|
0
|
|
|
0
|
|
0
|
my ($conn) = @_; |
|
271
|
|
|
|
|
|
|
$conn->init_producer_id($cfg->{transactional_id}, 30000, sub { |
|
272
|
0
|
|
|
|
|
0
|
my ($res, $err) = @_; |
|
273
|
0
|
0
|
0
|
|
|
0
|
if (!$err && $res && !$res->{error_code}) { |
|
|
|
|
0
|
|
|
|
|
|
274
|
0
|
|
|
|
|
0
|
$cfg->{producer_id} = $res->{producer_id}; |
|
275
|
0
|
|
|
|
|
0
|
$cfg->{producer_epoch} = $res->{producer_epoch}; |
|
276
|
|
|
|
|
|
|
} else { |
|
277
|
0
|
|
0
|
|
|
0
|
my $msg = $err || "InitProducerId error: " . ($res->{error_code} // '?'); |
|
278
|
0
|
0
|
|
|
|
0
|
$cfg->{on_error}->($msg) if $cfg->{on_error}; |
|
279
|
|
|
|
|
|
|
} |
|
280
|
0
|
0
|
|
|
|
0
|
$cb->() if $cb; |
|
281
|
0
|
|
|
|
|
0
|
}); |
|
282
|
0
|
|
|
|
|
0
|
}; |
|
283
|
|
|
|
|
|
|
|
|
284
|
0
|
0
|
|
|
|
0
|
if ($cfg->{transactional_id}) { |
|
285
|
|
|
|
|
|
|
# transactional: find transaction coordinator first |
|
286
|
0
|
|
|
|
|
0
|
my $conn = $self->_any_conn; |
|
287
|
0
|
0
|
|
|
|
0
|
unless ($conn) { $cb->() if $cb; return } |
|
|
0
|
0
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
288
|
|
|
|
|
|
|
|
|
289
|
|
|
|
|
|
|
my $on_coord = sub { |
|
290
|
0
|
|
|
0
|
|
0
|
my ($res, $err) = @_; |
|
291
|
0
|
0
|
0
|
|
|
0
|
if ($err || ($res->{error_code} && $res->{error_code} != 0)) { |
|
|
|
|
0
|
|
|
|
|
|
292
|
0
|
|
|
|
|
0
|
$do_init->($conn); |
|
293
|
0
|
|
|
|
|
0
|
return; |
|
294
|
|
|
|
|
|
|
} |
|
295
|
|
|
|
|
|
|
# connect to the transaction coordinator |
|
296
|
|
|
|
|
|
|
$cfg->{broker_map}{$res->{node_id}} = { |
|
297
|
|
|
|
|
|
|
host => $res->{host}, port => $res->{port} |
|
298
|
0
|
|
|
|
|
0
|
}; |
|
299
|
0
|
|
|
|
|
0
|
my $txn_conn = $self->_get_or_create_conn($res->{node_id}); |
|
300
|
0
|
0
|
0
|
|
|
0
|
if ($txn_conn && $txn_conn->connected) { |
|
301
|
0
|
|
|
|
|
0
|
$cfg->{_txn_coordinator} = $txn_conn; |
|
302
|
0
|
|
|
|
|
0
|
$do_init->($txn_conn); |
|
303
|
|
|
|
|
|
|
} else { |
|
304
|
0
|
|
|
|
|
0
|
push @{$cfg->{pending_ops}}, { |
|
305
|
|
|
|
|
|
|
node_id => $res->{node_id}, |
|
306
|
|
|
|
|
|
|
run => sub { |
|
307
|
0
|
|
|
|
|
0
|
$cfg->{_txn_coordinator} = $self->_get_or_create_conn($res->{node_id}); |
|
308
|
0
|
|
0
|
|
|
0
|
$do_init->($cfg->{_txn_coordinator} || $conn); |
|
309
|
|
|
|
|
|
|
}, |
|
310
|
0
|
|
|
|
|
0
|
}; |
|
311
|
|
|
|
|
|
|
} |
|
312
|
0
|
|
|
|
|
0
|
}; |
|
313
|
0
|
|
|
|
|
0
|
$conn->find_coordinator($cfg->{transactional_id}, $on_coord, 1); |
|
314
|
|
|
|
|
|
|
} else { |
|
315
|
|
|
|
|
|
|
# idempotent only: any broker works |
|
316
|
0
|
|
|
|
|
0
|
my $conn = $self->_any_conn; |
|
317
|
0
|
0
|
|
|
|
0
|
unless ($conn) { $cb->() if $cb; return } |
|
|
0
|
0
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
318
|
0
|
|
|
|
|
0
|
$do_init->($conn); |
|
319
|
|
|
|
|
|
|
} |
|
320
|
|
|
|
|
|
|
} |
|
321
|
|
|
|
|
|
|
|
|
322
|
|
|
|
|
|
|
sub _drain_pending_for { |
|
323
|
0
|
|
|
0
|
|
0
|
my ($self, $node_id) = @_; |
|
324
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
325
|
0
|
|
|
|
|
0
|
my @remaining; |
|
326
|
0
|
|
|
|
|
0
|
for my $op (@{$cfg->{pending_ops}}) { |
|
|
0
|
|
|
|
|
0
|
|
|
327
|
0
|
0
|
0
|
|
|
0
|
if (defined $op->{node_id} && $op->{node_id} == $node_id) { |
|
328
|
0
|
|
|
|
|
0
|
$op->{run}->(); |
|
329
|
|
|
|
|
|
|
} else { |
|
330
|
0
|
|
|
|
|
0
|
push @remaining, $op; |
|
331
|
|
|
|
|
|
|
} |
|
332
|
|
|
|
|
|
|
} |
|
333
|
0
|
|
|
|
|
0
|
$cfg->{pending_ops} = \@remaining; |
|
334
|
|
|
|
|
|
|
} |
|
335
|
|
|
|
|
|
|
|
|
336
|
|
|
|
|
|
|
sub _drain_all_pending { |
|
337
|
0
|
|
|
0
|
|
0
|
my ($self) = @_; |
|
338
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
339
|
0
|
|
|
|
|
0
|
my @ops = @{$cfg->{pending_ops}}; |
|
|
0
|
|
|
|
|
0
|
|
|
340
|
0
|
|
|
|
|
0
|
$cfg->{pending_ops} = []; |
|
341
|
0
|
|
|
|
|
0
|
for my $op (@ops) { |
|
342
|
0
|
0
|
|
|
|
0
|
if (defined $op->{node_id}) { |
|
343
|
0
|
|
|
|
|
0
|
my $conn = $self->_get_or_create_conn($op->{node_id}); |
|
344
|
0
|
0
|
0
|
|
|
0
|
if ($conn && $conn->connected) { |
|
345
|
0
|
|
|
|
|
0
|
$op->{run}->(); |
|
346
|
|
|
|
|
|
|
} else { |
|
347
|
0
|
|
|
|
|
0
|
push @{$cfg->{pending_ops}}, $op; |
|
|
0
|
|
|
|
|
0
|
|
|
348
|
|
|
|
|
|
|
} |
|
349
|
|
|
|
|
|
|
} else { |
|
350
|
0
|
|
|
|
|
0
|
$op->{run}->(); |
|
351
|
|
|
|
|
|
|
} |
|
352
|
|
|
|
|
|
|
} |
|
353
|
|
|
|
|
|
|
} |
|
354
|
|
|
|
|
|
|
|
|
355
|
|
|
|
|
|
|
sub _get_leader { |
|
356
|
0
|
|
|
0
|
|
0
|
my ($self, $topic, $partition) = @_; |
|
357
|
0
|
|
|
|
|
0
|
return $self->{cfg}{leaders}{"$topic:$partition"}; |
|
358
|
|
|
|
|
|
|
} |
|
359
|
|
|
|
|
|
|
|
|
360
|
|
|
|
|
|
|
sub _num_partitions { |
|
361
|
0
|
|
|
0
|
|
0
|
my ($self, $topic) = @_; |
|
362
|
0
|
0
|
|
|
|
0
|
my $meta = $self->{cfg}{meta} or return 0; |
|
363
|
0
|
|
0
|
|
|
0
|
for my $t (@{$meta->{topics} // []}) { |
|
|
0
|
|
|
|
|
0
|
|
|
364
|
0
|
0
|
|
|
|
0
|
return scalar @{$t->{partitions}} if $t->{name} eq $topic; |
|
|
0
|
|
|
|
|
0
|
|
|
365
|
|
|
|
|
|
|
} |
|
366
|
0
|
|
|
|
|
0
|
return 0; |
|
367
|
|
|
|
|
|
|
} |
|
368
|
|
|
|
|
|
|
|
|
369
|
|
|
|
|
|
|
sub _select_partition { |
|
370
|
0
|
|
|
0
|
|
0
|
my ($self, $topic, $key) = @_; |
|
371
|
0
|
|
|
|
|
0
|
my $np = $self->_num_partitions($topic); |
|
372
|
0
|
0
|
|
|
|
0
|
return 0 unless $np > 0; |
|
373
|
|
|
|
|
|
|
|
|
374
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
375
|
0
|
0
|
|
|
|
0
|
if ($cfg->{partitioner}) { |
|
376
|
0
|
|
|
|
|
0
|
return $cfg->{partitioner}->($topic, $key, $np); |
|
377
|
|
|
|
|
|
|
} |
|
378
|
0
|
0
|
0
|
|
|
0
|
if (defined $key && length $key) { |
|
379
|
0
|
|
|
|
|
0
|
return EV::Kafka::_murmur2($key) % $np; |
|
380
|
|
|
|
|
|
|
} |
|
381
|
0
|
|
|
|
|
0
|
return $cfg->{rr_counter}++ % $np; |
|
382
|
|
|
|
|
|
|
} |
|
383
|
|
|
|
|
|
|
|
|
384
|
|
|
|
|
|
|
# --- Producer --- |
|
385
|
|
|
|
|
|
|
|
|
386
|
|
|
|
|
|
|
sub produce { |
|
387
|
1
|
|
|
1
|
|
26
|
my ($self, $topic, $key, $value, @rest) = @_; |
|
388
|
1
|
|
|
|
|
3
|
my $cb; |
|
389
|
|
|
|
|
|
|
my %opts; |
|
390
|
1
|
|
|
|
|
4
|
for my $a (@rest) { |
|
391
|
0
|
0
|
|
|
|
0
|
if (ref $a eq 'CODE') { $cb = $a } |
|
|
0
|
0
|
|
|
|
0
|
|
|
392
|
0
|
|
|
|
|
0
|
elsif (ref $a eq 'HASH') { %opts = %$a } |
|
393
|
|
|
|
|
|
|
} |
|
394
|
|
|
|
|
|
|
|
|
395
|
1
|
|
|
|
|
8
|
my $cfg = $self->{cfg}; |
|
396
|
|
|
|
|
|
|
|
|
397
|
|
|
|
|
|
|
# ensure we have metadata |
|
398
|
1
|
50
|
|
|
|
5
|
unless ($cfg->{meta}) { |
|
399
|
1
|
|
|
|
|
10
|
push @{$cfg->{pending_ops}}, { |
|
400
|
0
|
|
|
0
|
|
0
|
run => sub { $self->produce($topic, $key, $value, @rest) }, |
|
401
|
1
|
|
|
|
|
3
|
}; |
|
402
|
1
|
50
|
|
|
|
8
|
$self->_refresh_metadata unless $cfg->{meta_pending}; |
|
403
|
1
|
|
|
|
|
4
|
return; |
|
404
|
|
|
|
|
|
|
} |
|
405
|
|
|
|
|
|
|
|
|
406
|
|
|
|
|
|
|
my $partition = exists $opts{partition} |
|
407
|
|
|
|
|
|
|
? $opts{partition} |
|
408
|
0
|
0
|
|
|
|
0
|
: $self->_select_partition($topic, $key); |
|
409
|
|
|
|
|
|
|
|
|
410
|
0
|
|
|
|
|
0
|
my $leader_id = $self->_get_leader($topic, $partition); |
|
411
|
0
|
0
|
|
|
|
0
|
unless (defined $leader_id) { |
|
412
|
|
|
|
|
|
|
# topic/partition unknown — request metadata for this topic to trigger auto-creation |
|
413
|
0
|
|
|
|
|
0
|
push @{$cfg->{pending_ops}}, { |
|
414
|
0
|
|
|
0
|
|
0
|
run => sub { $self->produce($topic, $key, $value, @rest) }, |
|
415
|
0
|
|
|
|
|
0
|
}; |
|
416
|
0
|
0
|
|
|
|
0
|
$self->_refresh_metadata_for_topic($topic) unless $cfg->{meta_pending}; |
|
417
|
0
|
|
|
|
|
0
|
return; |
|
418
|
|
|
|
|
|
|
} |
|
419
|
|
|
|
|
|
|
|
|
420
|
0
|
|
|
|
|
0
|
my $conn = $self->_get_or_create_conn($leader_id); |
|
421
|
0
|
0
|
0
|
|
|
0
|
unless ($conn && $conn->connected) { |
|
422
|
0
|
|
|
|
|
0
|
push @{$cfg->{pending_ops}}, { |
|
423
|
|
|
|
|
|
|
node_id => $leader_id, |
|
424
|
0
|
|
|
0
|
|
0
|
run => sub { $self->produce($topic, $key, $value, @rest) }, |
|
425
|
0
|
|
|
|
|
0
|
}; |
|
426
|
0
|
|
|
|
|
0
|
return; |
|
427
|
|
|
|
|
|
|
} |
|
428
|
|
|
|
|
|
|
|
|
429
|
|
|
|
|
|
|
# Accumulate into batch |
|
430
|
0
|
|
|
|
|
0
|
my $bkey = "$topic:$partition"; |
|
431
|
0
|
|
|
|
|
0
|
my $rec = { key => $key, value => $value }; |
|
432
|
0
|
0
|
|
|
|
0
|
$rec->{headers} = $opts{headers} if $opts{headers}; |
|
433
|
0
|
|
0
|
|
|
0
|
push @{$cfg->{batches}{$bkey} //= []}, { rec => $rec, cb => $cb }; |
|
|
0
|
|
|
|
|
0
|
|
|
434
|
|
|
|
|
|
|
|
|
435
|
|
|
|
|
|
|
# Check batch size threshold |
|
436
|
0
|
|
|
|
|
0
|
my $batch = $cfg->{batches}{$bkey}; |
|
437
|
0
|
|
|
|
|
0
|
my $batch_bytes = 0; |
|
438
|
0
|
|
|
|
|
0
|
for my $b (@$batch) { |
|
439
|
0
|
|
0
|
|
|
0
|
$batch_bytes += length($b->{rec}{value} // '') + length($b->{rec}{key} // '') + 20; |
|
|
|
|
0
|
|
|
|
|
|
440
|
|
|
|
|
|
|
} |
|
441
|
|
|
|
|
|
|
|
|
442
|
0
|
0
|
|
|
|
0
|
if ($batch_bytes >= $cfg->{batch_size}) { |
|
|
|
0
|
|
|
|
|
|
|
443
|
0
|
|
|
|
|
0
|
$self->_flush_batch($topic, $partition, $conn); |
|
444
|
|
|
|
|
|
|
} elsif (!$cfg->{_linger_active}) { |
|
445
|
|
|
|
|
|
|
# start linger timer |
|
446
|
0
|
|
|
|
|
0
|
$cfg->{_linger_active} = 1; |
|
447
|
0
|
|
|
|
|
0
|
weaken(my $weak = $self); |
|
448
|
|
|
|
|
|
|
$cfg->{_linger_timer} = EV::timer $cfg->{linger_ms} / 1000.0, 0, sub { |
|
449
|
0
|
|
|
0
|
|
0
|
$cfg->{_linger_active} = 0; |
|
450
|
0
|
0
|
|
|
|
0
|
$weak->_flush_all_batches if $weak; |
|
451
|
0
|
|
|
|
|
0
|
}; |
|
452
|
|
|
|
|
|
|
} |
|
453
|
|
|
|
|
|
|
} |
|
454
|
|
|
|
|
|
|
|
|
455
|
|
|
|
|
|
|
sub _flush_batch { |
|
456
|
0
|
|
|
0
|
|
0
|
my ($self, $topic, $partition, $conn) = @_; |
|
457
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
458
|
0
|
|
|
|
|
0
|
my $bkey = "$topic:$partition"; |
|
459
|
0
|
|
|
|
|
0
|
my $batch = delete $cfg->{batches}{$bkey}; |
|
460
|
0
|
0
|
0
|
|
|
0
|
return unless $batch && @$batch; |
|
461
|
|
|
|
|
|
|
|
|
462
|
0
|
|
|
|
|
0
|
my @records = map { $_->{rec} } @$batch; |
|
|
0
|
|
|
|
|
0
|
|
|
463
|
0
|
|
|
|
|
0
|
my @cbs = map { $_->{cb} } @$batch; |
|
|
0
|
|
|
|
|
0
|
|
|
464
|
|
|
|
|
|
|
|
|
465
|
0
|
|
|
|
|
0
|
my %popts = (acks => $cfg->{acks}); |
|
466
|
0
|
0
|
|
|
|
0
|
$popts{compression} = $cfg->{compression} if $cfg->{compression}; |
|
467
|
0
|
0
|
|
|
|
0
|
$popts{transactional_id} = $cfg->{transactional_id} if $cfg->{_txn_active}; |
|
468
|
0
|
|
|
|
|
0
|
my $saved_seq; |
|
469
|
0
|
0
|
0
|
|
|
0
|
if (defined $cfg->{producer_id} && $cfg->{producer_id} >= 0) { |
|
470
|
0
|
|
|
|
|
0
|
$popts{producer_id} = $cfg->{producer_id}; |
|
471
|
0
|
|
|
|
|
0
|
$popts{producer_epoch} = $cfg->{producer_epoch}; |
|
472
|
0
|
|
0
|
|
|
0
|
$saved_seq = $cfg->{next_sequence}{$bkey} // 0; |
|
473
|
0
|
|
|
|
|
0
|
$popts{base_sequence} = $saved_seq; |
|
474
|
0
|
|
|
|
|
0
|
$cfg->{next_sequence}{$bkey} = $saved_seq + scalar @records; |
|
475
|
|
|
|
|
|
|
} |
|
476
|
|
|
|
|
|
|
|
|
477
|
0
|
0
|
|
|
|
0
|
$self->_add_txn_partition($topic, $partition) if $cfg->{_txn_active}; |
|
478
|
|
|
|
|
|
|
|
|
479
|
|
|
|
|
|
|
# retry count persists on the batch across re-queues |
|
480
|
0
|
|
0
|
|
|
0
|
$cfg->{_batch_retries}{$bkey} //= 3; |
|
481
|
|
|
|
|
|
|
|
|
482
|
|
|
|
|
|
|
$conn->produce_batch($topic, $partition, \@records, \%popts, sub { |
|
483
|
0
|
|
|
0
|
|
0
|
my ($result, $err) = @_; |
|
484
|
|
|
|
|
|
|
|
|
485
|
0
|
|
|
|
|
0
|
my $retriable = 0; |
|
486
|
0
|
0
|
0
|
|
|
0
|
if (!$err && $result && ref $result->{topics} eq 'ARRAY') { |
|
|
|
|
0
|
|
|
|
|
|
487
|
0
|
|
|
|
|
0
|
for my $t (@{$result->{topics}}) { |
|
|
0
|
|
|
|
|
0
|
|
|
488
|
0
|
|
0
|
|
|
0
|
for my $p (@{$t->{partitions} // []}) { |
|
|
0
|
|
|
|
|
0
|
|
|
489
|
0
|
|
0
|
|
|
0
|
my $ec = $p->{error_code} // 0; |
|
490
|
0
|
0
|
0
|
|
|
0
|
$retriable = $ec if $ec == 6 || $ec == 15 || $ec == 16; |
|
|
|
|
0
|
|
|
|
|
|
491
|
|
|
|
|
|
|
} |
|
492
|
|
|
|
|
|
|
} |
|
493
|
|
|
|
|
|
|
} |
|
494
|
|
|
|
|
|
|
|
|
495
|
0
|
0
|
0
|
|
|
0
|
if ($retriable && ($cfg->{_batch_retries}{$bkey} // 0) > 0) { |
|
|
|
|
0
|
|
|
|
|
|
496
|
0
|
|
|
|
|
0
|
$cfg->{_batch_retries}{$bkey}--; |
|
497
|
0
|
0
|
|
|
|
0
|
$cfg->{next_sequence}{$bkey} = $saved_seq if defined $saved_seq; |
|
498
|
0
|
0
|
|
|
|
0
|
if (exists $cfg->{batches}{$bkey}) { |
|
499
|
0
|
|
|
|
|
0
|
unshift @{$cfg->{batches}{$bkey}}, @$batch; |
|
|
0
|
|
|
|
|
0
|
|
|
500
|
|
|
|
|
|
|
} else { |
|
501
|
0
|
|
|
|
|
0
|
$cfg->{batches}{$bkey} = $batch; |
|
502
|
|
|
|
|
|
|
} |
|
503
|
0
|
0
|
|
|
|
0
|
$self->_refresh_metadata unless $cfg->{meta_pending}; |
|
504
|
0
|
|
|
|
|
0
|
my $rt; $rt = EV::timer 0.5, 0, sub { |
|
505
|
0
|
|
|
|
|
0
|
undef $rt; |
|
506
|
0
|
|
|
|
|
0
|
$self->_flush_all_batches; |
|
507
|
0
|
|
|
|
|
0
|
}; |
|
508
|
0
|
|
|
|
|
0
|
return; |
|
509
|
|
|
|
|
|
|
} |
|
510
|
0
|
|
|
|
|
0
|
delete $cfg->{_batch_retries}{$bkey}; |
|
511
|
|
|
|
|
|
|
|
|
512
|
0
|
|
|
|
|
0
|
for my $cb (@cbs) { |
|
513
|
0
|
0
|
|
|
|
0
|
$cb->($result, $err) if $cb; |
|
514
|
|
|
|
|
|
|
} |
|
515
|
0
|
|
|
|
|
0
|
}); |
|
516
|
|
|
|
|
|
|
} |
|
517
|
|
|
|
|
|
|
|
|
518
|
|
|
|
|
|
|
sub _flush_all_batches { |
|
519
|
0
|
|
|
0
|
|
0
|
my ($self) = @_; |
|
520
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
521
|
0
|
|
|
|
|
0
|
my $skipped = 0; |
|
522
|
0
|
|
|
|
|
0
|
for my $bkey (keys %{$cfg->{batches}}) { |
|
|
0
|
|
|
|
|
0
|
|
|
523
|
0
|
|
|
|
|
0
|
my ($topic, $partition) = split /:/, $bkey, 2; |
|
524
|
0
|
|
|
|
|
0
|
my $leader_id = $self->_get_leader($topic, $partition); |
|
525
|
0
|
0
|
|
|
|
0
|
unless (defined $leader_id) { $skipped++; next } |
|
|
0
|
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
526
|
0
|
|
|
|
|
0
|
my $conn = $self->_get_or_create_conn($leader_id); |
|
527
|
0
|
0
|
0
|
|
|
0
|
unless ($conn && $conn->connected) { $skipped++; next } |
|
|
0
|
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
528
|
0
|
|
|
|
|
0
|
$self->_flush_batch($topic, $partition, $conn); |
|
529
|
|
|
|
|
|
|
} |
|
530
|
|
|
|
|
|
|
# re-arm timer if batches were skipped (connection not yet ready) |
|
531
|
0
|
0
|
0
|
|
|
0
|
if ($skipped && keys %{$cfg->{batches}}) { |
|
|
0
|
|
|
|
|
0
|
|
|
532
|
0
|
|
|
|
|
0
|
$cfg->{_linger_active} = 1; |
|
533
|
0
|
|
|
|
|
0
|
weaken(my $weak = $self); |
|
534
|
|
|
|
|
|
|
$cfg->{_linger_timer} = EV::timer 0.1, 0, sub { |
|
535
|
0
|
|
|
0
|
|
0
|
$cfg->{_linger_active} = 0; |
|
536
|
0
|
0
|
|
|
|
0
|
$weak->_flush_all_batches if $weak; |
|
537
|
0
|
|
|
|
|
0
|
}; |
|
538
|
|
|
|
|
|
|
} |
|
539
|
|
|
|
|
|
|
} |
|
540
|
|
|
|
|
|
|
|
|
541
|
|
|
|
|
|
|
sub produce_many { |
|
542
|
0
|
|
|
0
|
|
0
|
my ($self, $messages, $cb) = @_; |
|
543
|
0
|
|
|
|
|
0
|
my $remaining = scalar @$messages; |
|
544
|
0
|
0
|
0
|
|
|
0
|
return $cb->() if $cb && !$remaining; |
|
545
|
0
|
|
|
|
|
0
|
my @errors; |
|
546
|
0
|
|
|
|
|
0
|
my $acks0 = ($self->{cfg}{acks} == 0); |
|
547
|
0
|
|
|
|
|
0
|
for my $msg (@$messages) { |
|
548
|
0
|
0
|
|
|
|
0
|
my ($topic, $key, $value, @rest) = ref $msg eq 'ARRAY' ? @$msg : @{$msg}{qw(topic key value)}; |
|
|
0
|
|
|
|
|
0
|
|
|
549
|
0
|
0
|
|
|
|
0
|
if ($acks0) { |
|
550
|
0
|
|
|
|
|
0
|
$self->produce($topic, $key, $value, @rest); |
|
551
|
0
|
|
|
|
|
0
|
--$remaining; |
|
552
|
|
|
|
|
|
|
} else { |
|
553
|
|
|
|
|
|
|
$self->produce($topic, $key, $value, @rest, sub { |
|
554
|
0
|
|
|
0
|
|
0
|
my ($result, $err) = @_; |
|
555
|
0
|
0
|
|
|
|
0
|
push @errors, $err if $err; |
|
556
|
0
|
0
|
0
|
|
|
0
|
if (--$remaining <= 0 && $cb) { |
|
557
|
0
|
0
|
|
|
|
0
|
$cb->(@errors ? \@errors : ()); |
|
558
|
|
|
|
|
|
|
} |
|
559
|
0
|
|
|
|
|
0
|
}); |
|
560
|
|
|
|
|
|
|
} |
|
561
|
|
|
|
|
|
|
} |
|
562
|
0
|
0
|
0
|
|
|
0
|
$cb->(@errors ? \@errors : ()) if $cb && $acks0; |
|
|
|
0
|
|
|
|
|
|
|
563
|
|
|
|
|
|
|
} |
|
564
|
|
|
|
|
|
|
|
|
565
|
|
|
|
|
|
|
sub flush { |
|
566
|
0
|
|
|
0
|
|
0
|
my ($self, $cb) = @_; |
|
567
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
568
|
|
|
|
|
|
|
|
|
569
|
|
|
|
|
|
|
# flush any accumulated linger batches first |
|
570
|
0
|
|
|
|
|
0
|
$self->_flush_all_batches; |
|
571
|
0
|
|
|
|
|
0
|
undef $cfg->{_linger_timer}; |
|
572
|
0
|
|
|
|
|
0
|
$cfg->{_linger_active} = 0; |
|
573
|
|
|
|
|
|
|
|
|
574
|
|
|
|
|
|
|
# wait for all in-flight produce callbacks across all connections |
|
575
|
0
|
|
|
|
|
0
|
my $pending = 0; |
|
576
|
0
|
|
|
|
|
0
|
my %seen; |
|
577
|
0
|
|
0
|
|
|
0
|
for my $conn (values %{$cfg->{conns} // {}}) { |
|
|
0
|
|
|
|
|
0
|
|
|
578
|
0
|
0
|
0
|
|
|
0
|
next unless $conn && $conn->connected; |
|
579
|
0
|
|
|
|
|
0
|
$pending += $conn->pending; |
|
580
|
0
|
|
|
|
|
0
|
$seen{$$conn} = 1; |
|
581
|
|
|
|
|
|
|
} |
|
582
|
0
|
0
|
0
|
|
|
0
|
if ($cfg->{bootstrap_conn} && $cfg->{bootstrap_conn}->connected |
|
|
|
|
0
|
|
|
|
|
|
583
|
0
|
|
|
|
|
0
|
&& !$seen{${$cfg->{bootstrap_conn}}}) { |
|
584
|
0
|
|
|
|
|
0
|
$pending += $cfg->{bootstrap_conn}->pending; |
|
585
|
|
|
|
|
|
|
} |
|
586
|
0
|
0
|
|
|
|
0
|
if ($pending == 0) { |
|
587
|
0
|
0
|
|
|
|
0
|
$cb->() if $cb; |
|
588
|
0
|
|
|
|
|
0
|
return; |
|
589
|
|
|
|
|
|
|
} |
|
590
|
|
|
|
|
|
|
# poll until pending drains |
|
591
|
0
|
|
|
|
|
0
|
my $check; $check = EV::timer 0, 0.01, sub { |
|
592
|
0
|
|
|
0
|
|
0
|
my $p = 0; |
|
593
|
0
|
|
|
|
|
0
|
my %s; |
|
594
|
0
|
|
0
|
|
|
0
|
for my $c (values %{$cfg->{conns} // {}}) { |
|
|
0
|
|
|
|
|
0
|
|
|
595
|
0
|
0
|
0
|
|
|
0
|
next unless $c && $c->connected; |
|
596
|
0
|
|
|
|
|
0
|
$p += $c->pending; |
|
597
|
0
|
|
|
|
|
0
|
$s{$$c} = 1; |
|
598
|
|
|
|
|
|
|
} |
|
599
|
|
|
|
|
|
|
$p += $cfg->{bootstrap_conn}->pending |
|
600
|
|
|
|
|
|
|
if $cfg->{bootstrap_conn} && $cfg->{bootstrap_conn}->connected |
|
601
|
0
|
0
|
0
|
|
|
0
|
&& !$s{${$cfg->{bootstrap_conn}}}; |
|
|
0
|
|
0
|
|
|
0
|
|
|
602
|
0
|
0
|
|
|
|
0
|
if ($p == 0) { |
|
603
|
0
|
|
|
|
|
0
|
undef $check; |
|
604
|
0
|
0
|
|
|
|
0
|
$cb->() if $cb; |
|
605
|
|
|
|
|
|
|
} |
|
606
|
0
|
|
|
|
|
0
|
}; |
|
607
|
0
|
|
|
|
|
0
|
$cfg->{_flush_timer} = $check; |
|
608
|
|
|
|
|
|
|
} |
|
609
|
|
|
|
|
|
|
|
|
610
|
|
|
|
|
|
|
# --- Consumer --- |
|
611
|
|
|
|
|
|
|
|
|
612
|
|
|
|
|
|
|
sub assign { |
|
613
|
0
|
|
|
0
|
|
0
|
my ($self, $partitions) = @_; |
|
614
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
615
|
0
|
|
|
|
|
0
|
$cfg->{assignments} = $partitions; |
|
616
|
|
|
|
|
|
|
} |
|
617
|
|
|
|
|
|
|
|
|
618
|
|
|
|
|
|
|
sub seek { |
|
619
|
0
|
|
|
0
|
|
0
|
my ($self, $topic, $partition, $offset_or_ts, $cb) = @_; |
|
620
|
|
|
|
|
|
|
# offset_or_ts: integer offset, or -1 (latest), -2 (earliest) |
|
621
|
0
|
|
|
|
|
0
|
for my $a (@{$self->{cfg}{assignments}}) { |
|
|
0
|
|
|
|
|
0
|
|
|
622
|
0
|
0
|
0
|
|
|
0
|
if ($a->{topic} eq $topic && $a->{partition} == $partition) { |
|
623
|
0
|
0
|
|
|
|
0
|
if ($offset_or_ts >= 0) { |
|
624
|
0
|
|
|
|
|
0
|
$a->{offset} = $offset_or_ts; |
|
625
|
0
|
0
|
|
|
|
0
|
$cb->() if $cb; |
|
626
|
|
|
|
|
|
|
} else { |
|
627
|
|
|
|
|
|
|
# resolve via list_offsets |
|
628
|
0
|
|
|
|
|
0
|
my $leader_id = $self->_get_leader($topic, $partition); |
|
629
|
0
|
0
|
|
|
|
0
|
my $conn = defined($leader_id) ? $self->_get_or_create_conn($leader_id) : undef; |
|
630
|
0
|
0
|
0
|
|
|
0
|
if ($conn && $conn->connected) { |
|
631
|
|
|
|
|
|
|
$conn->list_offsets($topic, $partition, $offset_or_ts, sub { |
|
632
|
0
|
|
|
0
|
|
0
|
my ($res, $err) = @_; |
|
633
|
0
|
0
|
0
|
|
|
0
|
if (!$err && $res) { |
|
634
|
0
|
|
|
|
|
0
|
my $off = $res->{topics}[0]{partitions}[0]{offset}; |
|
635
|
0
|
0
|
|
|
|
0
|
$a->{offset} = $off if defined $off; |
|
636
|
|
|
|
|
|
|
} |
|
637
|
0
|
0
|
|
|
|
0
|
$cb->() if $cb; |
|
638
|
0
|
|
|
|
|
0
|
}); |
|
639
|
|
|
|
|
|
|
} else { |
|
640
|
0
|
0
|
|
|
|
0
|
$cb->() if $cb; |
|
641
|
|
|
|
|
|
|
} |
|
642
|
|
|
|
|
|
|
} |
|
643
|
0
|
|
|
|
|
0
|
return; |
|
644
|
|
|
|
|
|
|
} |
|
645
|
|
|
|
|
|
|
} |
|
646
|
0
|
0
|
|
|
|
0
|
$cb->() if $cb; |
|
647
|
|
|
|
|
|
|
} |
|
648
|
|
|
|
|
|
|
|
|
649
|
|
|
|
|
|
|
sub poll { |
|
650
|
0
|
|
|
0
|
|
0
|
my ($self, $cb) = @_; |
|
651
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
652
|
0
|
0
|
|
|
|
0
|
return unless @{$cfg->{assignments}}; |
|
|
0
|
|
|
|
|
0
|
|
|
653
|
|
|
|
|
|
|
|
|
654
|
0
|
0
|
|
|
|
0
|
unless ($cfg->{meta}) { |
|
655
|
0
|
|
|
|
|
0
|
push @{$cfg->{pending_ops}}, { |
|
656
|
0
|
|
|
0
|
|
0
|
run => sub { $self->poll($cb) }, |
|
657
|
0
|
|
|
|
|
0
|
}; |
|
658
|
0
|
0
|
|
|
|
0
|
$self->_refresh_metadata unless $cfg->{meta_pending}; |
|
659
|
0
|
|
|
|
|
0
|
return; |
|
660
|
|
|
|
|
|
|
} |
|
661
|
|
|
|
|
|
|
|
|
662
|
|
|
|
|
|
|
# Group assignments by leader for multi-partition fetch |
|
663
|
0
|
|
|
|
|
0
|
my %by_leader; # leader_id => { topic => [{partition, offset, assign_ref}] } |
|
664
|
0
|
|
|
|
|
0
|
for my $a (@{$cfg->{assignments}}) { |
|
|
0
|
|
|
|
|
0
|
|
|
665
|
0
|
|
|
|
|
0
|
my $leader_id = $self->_get_leader($a->{topic}, $a->{partition}); |
|
666
|
0
|
0
|
|
|
|
0
|
next unless defined $leader_id; |
|
667
|
0
|
|
|
|
|
0
|
push @{$by_leader{$leader_id}{$a->{topic}}}, { |
|
668
|
|
|
|
|
|
|
partition => $a->{partition}, |
|
669
|
|
|
|
|
|
|
offset => $a->{offset}, |
|
670
|
0
|
|
|
|
|
0
|
_assign => $a, |
|
671
|
|
|
|
|
|
|
}; |
|
672
|
|
|
|
|
|
|
} |
|
673
|
|
|
|
|
|
|
|
|
674
|
0
|
|
|
|
|
0
|
my $dispatched = 0; |
|
675
|
0
|
|
|
|
|
0
|
for my $leader_id (keys %by_leader) { |
|
676
|
0
|
|
|
|
|
0
|
my $conn = $self->_get_or_create_conn($leader_id); |
|
677
|
0
|
0
|
0
|
|
|
0
|
next unless $conn && $conn->connected; |
|
678
|
0
|
|
|
|
|
0
|
$dispatched++; |
|
679
|
|
|
|
|
|
|
|
|
680
|
|
|
|
|
|
|
# build fetch_multi argument: {topic => [{partition, offset}]} |
|
681
|
0
|
|
|
|
|
0
|
my %fetch_arg; |
|
682
|
|
|
|
|
|
|
my %assign_map; # "topic:partition" => assignment ref |
|
683
|
0
|
|
|
|
|
0
|
for my $topic (keys %{$by_leader{$leader_id}}) { |
|
|
0
|
|
|
|
|
0
|
|
|
684
|
0
|
|
|
|
|
0
|
for my $p (@{$by_leader{$leader_id}{$topic}}) { |
|
|
0
|
|
|
|
|
0
|
|
|
685
|
0
|
|
|
|
|
0
|
push @{$fetch_arg{$topic}}, { |
|
686
|
|
|
|
|
|
|
partition => $p->{partition}, |
|
687
|
|
|
|
|
|
|
offset => $p->{offset}, |
|
688
|
0
|
|
|
|
|
0
|
}; |
|
689
|
0
|
|
|
|
|
0
|
$assign_map{"$topic:$p->{partition}"} = $p->{_assign}; |
|
690
|
|
|
|
|
|
|
} |
|
691
|
|
|
|
|
|
|
} |
|
692
|
|
|
|
|
|
|
|
|
693
|
|
|
|
|
|
|
$conn->fetch_multi(\%fetch_arg, sub { |
|
694
|
0
|
|
|
0
|
|
0
|
my ($result, $err) = @_; |
|
695
|
0
|
|
|
|
|
0
|
$dispatched--; |
|
696
|
|
|
|
|
|
|
|
|
697
|
0
|
0
|
0
|
|
|
0
|
if (!$err && $result && ref $result->{topics} eq 'ARRAY') { |
|
|
|
|
0
|
|
|
|
|
|
698
|
0
|
|
|
|
|
0
|
for my $t (@{$result->{topics}}) { |
|
|
0
|
|
|
|
|
0
|
|
|
699
|
0
|
|
0
|
|
|
0
|
for my $p (@{$t->{partitions} // []}) { |
|
|
0
|
|
|
|
|
0
|
|
|
700
|
0
|
|
0
|
|
|
0
|
my $records = $p->{records} // []; |
|
701
|
0
|
|
|
|
|
0
|
for my $r (@$records) { |
|
702
|
0
|
0
|
|
|
|
0
|
if ($cfg->{on_message}) { |
|
703
|
|
|
|
|
|
|
$cfg->{on_message}->( |
|
704
|
|
|
|
|
|
|
$t->{topic}, $p->{partition}, |
|
705
|
|
|
|
|
|
|
$r->{offset}, $r->{key}, $r->{value}, |
|
706
|
|
|
|
|
|
|
$r->{headers} |
|
707
|
0
|
|
|
|
|
0
|
); |
|
708
|
|
|
|
|
|
|
} |
|
709
|
|
|
|
|
|
|
} |
|
710
|
0
|
0
|
|
|
|
0
|
if (@$records) { |
|
711
|
0
|
|
|
|
|
0
|
my $a = $assign_map{"$t->{topic}:$p->{partition}"}; |
|
712
|
0
|
0
|
|
|
|
0
|
$a->{offset} = $records->[-1]{offset} + 1 if $a; |
|
713
|
|
|
|
|
|
|
} |
|
714
|
|
|
|
|
|
|
} |
|
715
|
|
|
|
|
|
|
} |
|
716
|
|
|
|
|
|
|
} |
|
717
|
|
|
|
|
|
|
|
|
718
|
0
|
0
|
0
|
|
|
0
|
$cb->() if $cb && $dispatched <= 0; |
|
719
|
0
|
|
|
|
|
0
|
}); |
|
720
|
|
|
|
|
|
|
} |
|
721
|
0
|
0
|
0
|
|
|
0
|
$cb->() if $cb && !$dispatched; |
|
722
|
|
|
|
|
|
|
} |
|
723
|
|
|
|
|
|
|
|
|
724
|
|
|
|
|
|
|
sub offsets_for { |
|
725
|
0
|
|
|
0
|
|
0
|
my ($self, $topic, $cb) = @_; |
|
726
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
727
|
|
|
|
|
|
|
|
|
728
|
0
|
|
|
|
|
0
|
my $np = $self->_num_partitions($topic); |
|
729
|
0
|
0
|
0
|
|
|
0
|
return $cb->({}) if $cb && !$np; |
|
730
|
|
|
|
|
|
|
|
|
731
|
0
|
|
|
|
|
0
|
my $result = {}; |
|
732
|
0
|
|
|
|
|
0
|
my $remaining = $np; |
|
733
|
0
|
|
|
|
|
0
|
for my $p (0..$np-1) { |
|
734
|
0
|
|
|
|
|
0
|
my $pid = $p; |
|
735
|
0
|
|
|
|
|
0
|
my $leader_id = $self->_get_leader($topic, $pid); |
|
736
|
0
|
0
|
|
|
|
0
|
my $conn = defined($leader_id) ? $self->_get_or_create_conn($leader_id) : undef; |
|
737
|
0
|
0
|
0
|
|
|
0
|
unless ($conn && $conn->connected) { |
|
738
|
0
|
|
|
|
|
0
|
$result->{$pid} = {}; |
|
739
|
0
|
0
|
0
|
|
|
0
|
$cb->($result) if $cb && --$remaining <= 0; |
|
740
|
0
|
|
|
|
|
0
|
next; |
|
741
|
|
|
|
|
|
|
} |
|
742
|
0
|
|
|
|
|
0
|
my %pdata; |
|
743
|
0
|
|
|
|
|
0
|
my $pdone = 0; |
|
744
|
0
|
|
|
|
|
0
|
for my $ts (-2, -1) { |
|
745
|
|
|
|
|
|
|
$conn->list_offsets($topic, $pid, $ts, sub { |
|
746
|
0
|
|
|
0
|
|
0
|
my ($res, $err) = @_; |
|
747
|
0
|
0
|
0
|
|
|
0
|
if (!$err && $res && ref $res->{topics} eq 'ARRAY') { |
|
|
|
|
0
|
|
|
|
|
|
748
|
0
|
|
|
|
|
0
|
my $off = $res->{topics}[0]{partitions}[0]{offset}; |
|
749
|
0
|
0
|
|
|
|
0
|
if ($ts == -2) { $pdata{earliest} = $off } |
|
|
0
|
|
|
|
|
0
|
|
|
750
|
0
|
|
|
|
|
0
|
else { $pdata{latest} = $off } |
|
751
|
|
|
|
|
|
|
} |
|
752
|
0
|
0
|
|
|
|
0
|
if (++$pdone == 2) { |
|
753
|
0
|
|
|
|
|
0
|
$result->{$pid} = \%pdata; |
|
754
|
0
|
0
|
0
|
|
|
0
|
$cb->($result) if $cb && --$remaining <= 0; |
|
755
|
|
|
|
|
|
|
} |
|
756
|
0
|
|
|
|
|
0
|
}); |
|
757
|
|
|
|
|
|
|
} |
|
758
|
|
|
|
|
|
|
} |
|
759
|
|
|
|
|
|
|
} |
|
760
|
|
|
|
|
|
|
|
|
761
|
|
|
|
|
|
|
sub lag { |
|
762
|
0
|
|
|
0
|
|
0
|
my ($self, $cb) = @_; |
|
763
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
764
|
0
|
|
0
|
|
|
0
|
my @assignments = @{$cfg->{assignments} // []}; |
|
|
0
|
|
|
|
|
0
|
|
|
765
|
0
|
0
|
0
|
|
|
0
|
return $cb->({}) if $cb && !@assignments; |
|
766
|
|
|
|
|
|
|
|
|
767
|
0
|
|
|
|
|
0
|
my $result = {}; |
|
768
|
0
|
|
|
|
|
0
|
my $remaining = scalar @assignments; |
|
769
|
0
|
|
|
|
|
0
|
for my $a (@assignments) { |
|
770
|
0
|
|
|
|
|
0
|
my $key = "$a->{topic}:$a->{partition}"; |
|
771
|
0
|
|
|
|
|
0
|
my $leader_id = $self->_get_leader($a->{topic}, $a->{partition}); |
|
772
|
0
|
0
|
|
|
|
0
|
my $conn = defined($leader_id) ? $self->_get_or_create_conn($leader_id) : undef; |
|
773
|
0
|
0
|
0
|
|
|
0
|
if ($conn && $conn->connected) { |
|
774
|
|
|
|
|
|
|
$conn->list_offsets($a->{topic}, $a->{partition}, -1, sub { |
|
775
|
0
|
|
|
0
|
|
0
|
my ($res, $err) = @_; |
|
776
|
0
|
|
|
|
|
0
|
my $hw = 0; |
|
777
|
0
|
0
|
0
|
|
|
0
|
if (!$err && $res && ref $res->{topics} eq 'ARRAY') { |
|
|
|
|
0
|
|
|
|
|
|
778
|
0
|
|
0
|
|
|
0
|
$hw = $res->{topics}[0]{partitions}[0]{offset} // 0; |
|
779
|
|
|
|
|
|
|
} |
|
780
|
|
|
|
|
|
|
$result->{$key} = { |
|
781
|
|
|
|
|
|
|
current => $a->{offset}, |
|
782
|
|
|
|
|
|
|
latest => $hw, |
|
783
|
|
|
|
|
|
|
lag => $hw - $a->{offset}, |
|
784
|
0
|
|
|
|
|
0
|
}; |
|
785
|
0
|
0
|
0
|
|
|
0
|
$cb->($result) if $cb && --$remaining <= 0; |
|
786
|
0
|
|
|
|
|
0
|
}); |
|
787
|
|
|
|
|
|
|
} else { |
|
788
|
0
|
|
|
|
|
0
|
$result->{$key} = { current => $a->{offset}, latest => 0, lag => 0 }; |
|
789
|
0
|
0
|
0
|
|
|
0
|
$cb->($result) if $cb && --$remaining <= 0; |
|
790
|
|
|
|
|
|
|
} |
|
791
|
|
|
|
|
|
|
} |
|
792
|
|
|
|
|
|
|
} |
|
793
|
|
|
|
|
|
|
|
|
794
|
|
|
|
|
|
|
sub error_name { |
|
795
|
0
|
0
|
|
0
|
|
0
|
shift if ref $_[0]; # allow $kafka->error_name or EV::Kafka::Client::error_name |
|
796
|
0
|
|
|
|
|
0
|
return EV::Kafka::_error_name($_[0]); |
|
797
|
|
|
|
|
|
|
} |
|
798
|
|
|
|
|
|
|
|
|
799
|
|
|
|
|
|
|
# --- Consumer Group --- |
|
800
|
|
|
|
|
|
|
|
|
801
|
|
|
|
|
|
|
sub subscribe { |
|
802
|
0
|
|
|
0
|
|
0
|
my ($self, @args) = @_; |
|
803
|
0
|
|
|
|
|
0
|
my @topics; |
|
804
|
|
|
|
|
|
|
my %opts; |
|
805
|
|
|
|
|
|
|
|
|
806
|
|
|
|
|
|
|
# subscribe('topic1', 'topic2', group_id => 'g', ...) |
|
807
|
0
|
|
|
|
|
0
|
while (@args) { |
|
808
|
0
|
0
|
|
|
|
0
|
if ($args[0] =~ /^(group_id|group_instance_id|on_assign|on_revoke|session_timeout|rebalance_timeout|heartbeat_interval|auto_commit|auto_offset_reset)$/) { |
|
809
|
0
|
|
|
|
|
0
|
my $k = shift @args; |
|
810
|
0
|
|
|
|
|
0
|
$opts{$k} = shift @args; |
|
811
|
|
|
|
|
|
|
} else { |
|
812
|
0
|
|
|
|
|
0
|
push @topics, shift @args; |
|
813
|
|
|
|
|
|
|
} |
|
814
|
|
|
|
|
|
|
} |
|
815
|
|
|
|
|
|
|
|
|
816
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
817
|
0
|
0
|
|
|
|
0
|
my $group_id = $opts{group_id} or die "group_id required"; |
|
818
|
|
|
|
|
|
|
|
|
819
|
|
|
|
|
|
|
$cfg->{group} = { |
|
820
|
|
|
|
|
|
|
group_id => $group_id, |
|
821
|
|
|
|
|
|
|
member_id => '', |
|
822
|
|
|
|
|
|
|
generation => -1, |
|
823
|
|
|
|
|
|
|
topics => \@topics, |
|
824
|
|
|
|
|
|
|
on_assign => $opts{on_assign}, |
|
825
|
|
|
|
|
|
|
on_revoke => $opts{on_revoke}, |
|
826
|
|
|
|
|
|
|
session_timeout => $opts{session_timeout} // 30000, |
|
827
|
|
|
|
|
|
|
rebalance_timeout => $opts{rebalance_timeout} // 60000, |
|
828
|
|
|
|
|
|
|
heartbeat_interval => $opts{heartbeat_interval} // 3, |
|
829
|
|
|
|
|
|
|
auto_commit => $opts{auto_commit} // 1, |
|
830
|
|
|
|
|
|
|
auto_offset_reset => $opts{auto_offset_reset} // 'earliest', |
|
831
|
|
|
|
|
|
|
group_instance_id => $opts{group_instance_id}, |
|
832
|
0
|
|
0
|
|
|
0
|
coordinator => undef, |
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
833
|
|
|
|
|
|
|
heartbeat_timer => undef, |
|
834
|
|
|
|
|
|
|
state => 'init', |
|
835
|
|
|
|
|
|
|
}; |
|
836
|
|
|
|
|
|
|
|
|
837
|
|
|
|
|
|
|
# Step 1: ensure we have metadata |
|
838
|
0
|
0
|
|
|
|
0
|
unless ($cfg->{meta}) { |
|
839
|
0
|
|
|
|
|
0
|
push @{$cfg->{pending_ops}}, { |
|
840
|
0
|
|
|
0
|
|
0
|
run => sub { $self->_group_start }, |
|
841
|
0
|
|
|
|
|
0
|
}; |
|
842
|
0
|
0
|
|
|
|
0
|
$self->_refresh_metadata unless $cfg->{meta_pending}; |
|
843
|
0
|
|
|
|
|
0
|
return; |
|
844
|
|
|
|
|
|
|
} |
|
845
|
|
|
|
|
|
|
|
|
846
|
0
|
|
|
|
|
0
|
$self->_group_start; |
|
847
|
|
|
|
|
|
|
} |
|
848
|
|
|
|
|
|
|
|
|
849
|
|
|
|
|
|
|
sub _group_start { |
|
850
|
0
|
|
|
0
|
|
0
|
my ($self) = @_; |
|
851
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
852
|
0
|
0
|
|
|
|
0
|
my $g = $cfg->{group} or return; |
|
853
|
|
|
|
|
|
|
|
|
854
|
0
|
|
|
|
|
0
|
my $conn = $self->_any_conn; |
|
855
|
0
|
0
|
|
|
|
0
|
return unless $conn; |
|
856
|
|
|
|
|
|
|
|
|
857
|
0
|
|
|
|
|
0
|
$g->{state} = 'finding'; |
|
858
|
|
|
|
|
|
|
$conn->find_coordinator($g->{group_id}, sub { |
|
859
|
0
|
|
|
0
|
|
0
|
my ($res, $err) = @_; |
|
860
|
0
|
0
|
0
|
|
|
0
|
if ($err || $res->{error_code}) { |
|
861
|
0
|
|
0
|
|
|
0
|
my $msg = $err || "FindCoordinator error: $res->{error_code}"; |
|
862
|
0
|
0
|
|
|
|
0
|
$cfg->{on_error}->($msg) if $cfg->{on_error}; |
|
863
|
|
|
|
|
|
|
# retry after delay |
|
864
|
0
|
|
|
|
|
0
|
my $t; $t = EV::timer 1, 0, sub { undef $t; $self->_group_start }; |
|
|
0
|
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
865
|
0
|
|
|
|
|
0
|
return; |
|
866
|
|
|
|
|
|
|
} |
|
867
|
|
|
|
|
|
|
|
|
868
|
|
|
|
|
|
|
# Store coordinator info and connect |
|
869
|
|
|
|
|
|
|
$cfg->{broker_map}{$res->{node_id}} = { |
|
870
|
|
|
|
|
|
|
host => $res->{host}, port => $res->{port} |
|
871
|
0
|
|
|
|
|
0
|
}; |
|
872
|
0
|
|
|
|
|
0
|
my $coord = $self->_get_or_create_conn($res->{node_id}); |
|
873
|
0
|
|
|
|
|
0
|
$g->{coordinator} = $coord; |
|
874
|
0
|
|
|
|
|
0
|
$g->{coordinator_id} = $res->{node_id}; |
|
875
|
|
|
|
|
|
|
|
|
876
|
0
|
0
|
|
|
|
0
|
if ($coord->connected) { |
|
877
|
0
|
|
|
|
|
0
|
$self->_group_join; |
|
878
|
|
|
|
|
|
|
} else { |
|
879
|
0
|
|
|
|
|
0
|
push @{$cfg->{pending_ops}}, { |
|
880
|
|
|
|
|
|
|
node_id => $res->{node_id}, |
|
881
|
0
|
|
|
|
|
0
|
run => sub { $self->_group_join }, |
|
882
|
0
|
|
|
|
|
0
|
}; |
|
883
|
|
|
|
|
|
|
} |
|
884
|
0
|
|
|
|
|
0
|
}); |
|
885
|
|
|
|
|
|
|
} |
|
886
|
|
|
|
|
|
|
|
|
887
|
|
|
|
|
|
|
sub _group_join { |
|
888
|
0
|
|
|
0
|
|
0
|
my ($self) = @_; |
|
889
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
890
|
0
|
0
|
|
|
|
0
|
my $g = $cfg->{group} or return; |
|
891
|
0
|
0
|
|
|
|
0
|
my $coord = $g->{coordinator} or return; |
|
892
|
|
|
|
|
|
|
|
|
893
|
0
|
|
|
|
|
0
|
$g->{state} = 'joining'; |
|
894
|
|
|
|
|
|
|
$coord->join_group( |
|
895
|
|
|
|
|
|
|
$g->{group_id}, $g->{member_id}, |
|
896
|
|
|
|
|
|
|
$g->{topics}, sub { |
|
897
|
0
|
|
|
0
|
|
0
|
my ($res, $err) = @_; |
|
898
|
0
|
0
|
|
|
|
0
|
if ($err) { |
|
899
|
0
|
0
|
|
|
|
0
|
$cfg->{on_error}->("JoinGroup: $err") if $cfg->{on_error}; |
|
900
|
0
|
|
|
|
|
0
|
return; |
|
901
|
|
|
|
|
|
|
} |
|
902
|
|
|
|
|
|
|
|
|
903
|
0
|
0
|
0
|
|
|
0
|
if ($res->{error_code} == 15 || $res->{error_code} == 16) { |
|
904
|
|
|
|
|
|
|
# COORDINATOR_NOT_AVAILABLE / NOT_COORDINATOR — re-discover |
|
905
|
0
|
|
|
|
|
0
|
my $t; $t = EV::timer 1, 0, sub { undef $t; $self->_group_start }; |
|
|
0
|
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
906
|
0
|
|
|
|
|
0
|
return; |
|
907
|
|
|
|
|
|
|
} |
|
908
|
0
|
0
|
|
|
|
0
|
if ($res->{error_code} == 27) { |
|
909
|
|
|
|
|
|
|
# REBALANCE_IN_PROGRESS — retry |
|
910
|
0
|
|
|
|
|
0
|
my $t; $t = EV::timer 1, 0, sub { undef $t; $self->_group_join }; |
|
|
0
|
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
911
|
0
|
|
|
|
|
0
|
return; |
|
912
|
|
|
|
|
|
|
} |
|
913
|
0
|
0
|
|
|
|
0
|
if ($res->{error_code} == 79) { |
|
914
|
|
|
|
|
|
|
# MEMBER_ID_REQUIRED — retry with assigned member_id |
|
915
|
0
|
0
|
|
|
|
0
|
$g->{member_id} = $res->{member_id} if $res->{member_id}; |
|
916
|
0
|
|
|
|
|
0
|
$self->_group_join; |
|
917
|
0
|
|
|
|
|
0
|
return; |
|
918
|
|
|
|
|
|
|
} |
|
919
|
0
|
0
|
|
|
|
0
|
if ($res->{error_code}) { |
|
920
|
0
|
0
|
|
|
|
0
|
$cfg->{on_error}->("JoinGroup error: $res->{error_code}") if $cfg->{on_error}; |
|
921
|
0
|
|
|
|
|
0
|
return; |
|
922
|
|
|
|
|
|
|
} |
|
923
|
|
|
|
|
|
|
|
|
924
|
0
|
|
|
|
|
0
|
$g->{member_id} = $res->{member_id}; |
|
925
|
0
|
|
|
|
|
0
|
$g->{generation} = $res->{generation_id}; |
|
926
|
0
|
|
|
|
|
0
|
my $is_leader = ($res->{leader} eq $res->{member_id}); |
|
927
|
|
|
|
|
|
|
|
|
928
|
|
|
|
|
|
|
# Build assignments (if leader) |
|
929
|
0
|
|
|
|
|
0
|
my $assignments = []; |
|
930
|
0
|
0
|
0
|
|
|
0
|
if ($is_leader && $res->{members} && @{$res->{members}}) { |
|
|
0
|
|
0
|
|
|
0
|
|
|
931
|
0
|
|
|
|
|
0
|
$assignments = $self->_assign_partitions($res->{members}, $g->{topics}); |
|
932
|
|
|
|
|
|
|
} |
|
933
|
|
|
|
|
|
|
|
|
934
|
0
|
|
|
|
|
0
|
$self->_group_sync($assignments); |
|
935
|
|
|
|
|
|
|
}, |
|
936
|
|
|
|
|
|
|
$g->{session_timeout}, $g->{rebalance_timeout}, |
|
937
|
|
|
|
|
|
|
$g->{group_instance_id} |
|
938
|
0
|
|
|
|
|
0
|
); |
|
939
|
|
|
|
|
|
|
} |
|
940
|
|
|
|
|
|
|
|
|
941
|
|
|
|
|
|
|
sub _assign_partitions { |
|
942
|
7
|
|
|
7
|
|
10224
|
my ($self, $members, $topics) = @_; |
|
943
|
7
|
|
|
|
|
14
|
my $cfg = $self->{cfg}; |
|
944
|
7
|
50
|
|
|
|
24
|
my $meta = $cfg->{meta} or return []; |
|
945
|
|
|
|
|
|
|
|
|
946
|
7
|
|
|
|
|
7
|
my @all_parts; |
|
947
|
7
|
|
50
|
|
|
9
|
for my $t (@{$meta->{topics} // []}) { |
|
|
7
|
|
|
|
|
19
|
|
|
948
|
11
|
|
|
|
|
17
|
my $tname = $t->{name}; |
|
949
|
11
|
50
|
|
|
|
17
|
next unless grep { $_ eq $tname } @$topics; |
|
|
19
|
|
|
|
|
36
|
|
|
950
|
11
|
|
50
|
|
|
12
|
for my $p (@{$t->{partitions} // []}) { |
|
|
11
|
|
|
|
|
20
|
|
|
951
|
36
|
|
|
|
|
77
|
push @all_parts, { topic => $tname, partition => $p->{partition} }; |
|
952
|
|
|
|
|
|
|
} |
|
953
|
|
|
|
|
|
|
} |
|
954
|
7
|
50
|
|
|
|
31
|
@all_parts = sort { $a->{topic} cmp $b->{topic} || $a->{partition} <=> $b->{partition} } @all_parts; |
|
|
48
|
|
|
|
|
113
|
|
|
955
|
|
|
|
|
|
|
|
|
956
|
7
|
|
|
|
|
15
|
my @member_ids = sort map { $_->{member_id} } @$members; |
|
|
14
|
|
|
|
|
39
|
|
|
957
|
7
|
|
|
|
|
13
|
my $nm = scalar @member_ids; |
|
958
|
|
|
|
|
|
|
|
|
959
|
|
|
|
|
|
|
# Sticky assignment: preserve previous assignments where possible |
|
960
|
7
|
|
100
|
|
|
19
|
my $prev = $cfg->{_prev_assignments} // {}; |
|
961
|
7
|
|
|
|
|
10
|
my %member_parts; # member_id => [@parts] |
|
962
|
|
|
|
|
|
|
my %assigned; # "topic:partition" => 1 |
|
963
|
|
|
|
|
|
|
|
|
964
|
7
|
100
|
|
|
|
48
|
my $max_per = int(@all_parts / $nm) + ((@all_parts % $nm) ? 1 : 0); |
|
965
|
|
|
|
|
|
|
|
|
966
|
|
|
|
|
|
|
# Step 1: keep valid previous assignments (but cap at max_per) |
|
967
|
7
|
|
|
|
|
16
|
for my $mid (@member_ids) { |
|
968
|
14
|
|
|
|
|
25
|
$member_parts{$mid} = []; |
|
969
|
14
|
|
100
|
|
|
16
|
for my $p (@{$prev->{$mid} // []}) { |
|
|
14
|
|
|
|
|
38
|
|
|
970
|
10
|
|
|
|
|
12
|
my $key = "$p->{topic}:$p->{partition}"; |
|
971
|
10
|
100
|
|
|
|
10
|
if (grep { $_->{topic} eq $p->{topic} && $_->{partition} == $p->{partition} } @all_parts) { |
|
|
60
|
50
|
|
|
|
128
|
|
|
972
|
10
|
100
|
|
|
|
10
|
if (scalar @{$member_parts{$mid}} < $max_per) { |
|
|
10
|
|
|
|
|
14
|
|
|
973
|
8
|
|
|
|
|
5
|
push @{$member_parts{$mid}}, $p; |
|
|
8
|
|
|
|
|
9
|
|
|
974
|
8
|
|
|
|
|
14
|
$assigned{$key} = 1; |
|
975
|
|
|
|
|
|
|
} |
|
976
|
|
|
|
|
|
|
} |
|
977
|
|
|
|
|
|
|
} |
|
978
|
|
|
|
|
|
|
} |
|
979
|
|
|
|
|
|
|
|
|
980
|
|
|
|
|
|
|
# Step 2: distribute unassigned partitions to least-loaded members |
|
981
|
7
|
|
|
|
|
12
|
my @unassigned = grep { !$assigned{"$_->{topic}:$_->{partition}"} } @all_parts; |
|
|
36
|
|
|
|
|
70
|
|
|
982
|
7
|
|
|
|
|
13
|
for my $p (@unassigned) { |
|
983
|
28
|
|
|
|
|
27
|
my $min_mid = $member_ids[0]; |
|
984
|
28
|
|
|
|
|
25
|
my $min_count = scalar @{$member_parts{$min_mid}}; |
|
|
28
|
|
|
|
|
33
|
|
|
985
|
28
|
|
|
|
|
25
|
for my $mid (@member_ids) { |
|
986
|
52
|
100
|
|
|
|
35
|
if (scalar @{$member_parts{$mid}} < $min_count) { |
|
|
52
|
|
|
|
|
75
|
|
|
987
|
10
|
|
|
|
|
8
|
$min_count = scalar @{$member_parts{$mid}}; |
|
|
10
|
|
|
|
|
12
|
|
|
988
|
10
|
|
|
|
|
18
|
$min_mid = $mid; |
|
989
|
|
|
|
|
|
|
} |
|
990
|
|
|
|
|
|
|
} |
|
991
|
28
|
|
|
|
|
22
|
push @{$member_parts{$min_mid}}, $p; |
|
|
28
|
|
|
|
|
36
|
|
|
992
|
|
|
|
|
|
|
} |
|
993
|
|
|
|
|
|
|
|
|
994
|
|
|
|
|
|
|
# Save for next rebalance |
|
995
|
7
|
|
|
|
|
19
|
$cfg->{_prev_assignments} = { %member_parts }; |
|
996
|
|
|
|
|
|
|
|
|
997
|
|
|
|
|
|
|
# Encode assignments |
|
998
|
7
|
|
|
|
|
12
|
my @assignments; |
|
999
|
7
|
|
|
|
|
9
|
for my $mid (@member_ids) { |
|
1000
|
14
|
|
|
|
|
16
|
my %by_topic; |
|
1001
|
14
|
|
|
|
|
12
|
for my $p (@{$member_parts{$mid}}) { |
|
|
14
|
|
|
|
|
15
|
|
|
1002
|
36
|
|
|
|
|
22
|
push @{$by_topic{$p->{topic}}}, $p->{partition}; |
|
|
36
|
|
|
|
|
68
|
|
|
1003
|
|
|
|
|
|
|
} |
|
1004
|
|
|
|
|
|
|
|
|
1005
|
14
|
|
|
|
|
16
|
my $buf = ''; |
|
1006
|
14
|
|
|
|
|
17
|
$buf .= pack('n', 0); # version |
|
1007
|
14
|
|
|
|
|
35
|
$buf .= pack('N', scalar keys %by_topic); |
|
1008
|
14
|
|
|
|
|
28
|
for my $t (sort keys %by_topic) { |
|
1009
|
20
|
|
|
|
|
29
|
$buf .= pack('n', length($t)) . $t; |
|
1010
|
20
|
|
|
|
|
22
|
$buf .= pack('N', scalar @{$by_topic{$t}}); |
|
|
20
|
|
|
|
|
45
|
|
|
1011
|
20
|
|
|
|
|
19
|
for my $pid (@{$by_topic{$t}}) { |
|
|
20
|
|
|
|
|
24
|
|
|
1012
|
36
|
|
|
|
|
55
|
$buf .= pack('N', $pid); |
|
1013
|
|
|
|
|
|
|
} |
|
1014
|
|
|
|
|
|
|
} |
|
1015
|
14
|
|
|
|
|
16
|
$buf .= pack('N', -1); # user_data = null |
|
1016
|
|
|
|
|
|
|
|
|
1017
|
14
|
|
|
|
|
43
|
push @assignments, { |
|
1018
|
|
|
|
|
|
|
member_id => $mid, |
|
1019
|
|
|
|
|
|
|
assignment => $buf, |
|
1020
|
|
|
|
|
|
|
}; |
|
1021
|
|
|
|
|
|
|
} |
|
1022
|
|
|
|
|
|
|
|
|
1023
|
7
|
|
|
|
|
46
|
return \@assignments; |
|
1024
|
|
|
|
|
|
|
} |
|
1025
|
|
|
|
|
|
|
|
|
1026
|
|
|
|
|
|
|
sub _group_sync { |
|
1027
|
0
|
|
|
0
|
|
0
|
my ($self, $assignments) = @_; |
|
1028
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
1029
|
0
|
0
|
|
|
|
0
|
my $g = $cfg->{group} or return; |
|
1030
|
0
|
0
|
|
|
|
0
|
my $coord = $g->{coordinator} or return; |
|
1031
|
|
|
|
|
|
|
|
|
1032
|
0
|
|
|
|
|
0
|
$g->{state} = 'syncing'; |
|
1033
|
|
|
|
|
|
|
my $sync_cb = sub { |
|
1034
|
0
|
|
|
0
|
|
0
|
my ($res, $err) = @_; |
|
1035
|
0
|
0
|
|
|
|
0
|
if ($err) { |
|
1036
|
0
|
0
|
|
|
|
0
|
$cfg->{on_error}->("SyncGroup: $err") if $cfg->{on_error}; |
|
1037
|
0
|
|
|
|
|
0
|
return; |
|
1038
|
|
|
|
|
|
|
} |
|
1039
|
0
|
0
|
|
|
|
0
|
if ($res->{error_code} == 27) { |
|
1040
|
|
|
|
|
|
|
# REBALANCE_IN_PROGRESS — rejoin |
|
1041
|
0
|
|
|
|
|
0
|
my $t; $t = EV::timer 1, 0, sub { undef $t; $self->_group_join }; |
|
|
0
|
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
1042
|
0
|
|
|
|
|
0
|
return; |
|
1043
|
|
|
|
|
|
|
} |
|
1044
|
0
|
0
|
|
|
|
0
|
if ($res->{error_code}) { |
|
1045
|
0
|
0
|
|
|
|
0
|
$cfg->{on_error}->("SyncGroup error: $res->{error_code}") if $cfg->{on_error}; |
|
1046
|
0
|
|
|
|
|
0
|
return; |
|
1047
|
|
|
|
|
|
|
} |
|
1048
|
|
|
|
|
|
|
|
|
1049
|
|
|
|
|
|
|
# Decode assignment |
|
1050
|
0
|
|
0
|
|
|
0
|
my $data = $res->{assignment} // ''; |
|
1051
|
0
|
|
|
|
|
0
|
my $dlen = length $data; |
|
1052
|
0
|
|
|
|
|
0
|
my @my_assignments; |
|
1053
|
0
|
0
|
|
|
|
0
|
if ($dlen >= 6) { |
|
1054
|
0
|
|
|
|
|
0
|
my $off = 2; # skip version |
|
1055
|
0
|
|
|
|
|
0
|
my $tc = unpack('N', substr($data, $off, 4)); $off += 4; |
|
|
0
|
|
|
|
|
0
|
|
|
1056
|
0
|
|
|
|
|
0
|
for my $i (0..$tc-1) { |
|
1057
|
0
|
0
|
|
|
|
0
|
last unless $off + 2 <= $dlen; |
|
1058
|
0
|
|
|
|
|
0
|
my $tlen = unpack('n', substr($data, $off, 2)); $off += 2; |
|
|
0
|
|
|
|
|
0
|
|
|
1059
|
0
|
0
|
|
|
|
0
|
last unless $off + $tlen <= $dlen; |
|
1060
|
0
|
|
|
|
|
0
|
my $tname = substr($data, $off, $tlen); $off += $tlen; |
|
|
0
|
|
|
|
|
0
|
|
|
1061
|
0
|
0
|
|
|
|
0
|
last unless $off + 4 <= $dlen; |
|
1062
|
0
|
|
|
|
|
0
|
my $pc = unpack('N', substr($data, $off, 4)); $off += 4; |
|
|
0
|
|
|
|
|
0
|
|
|
1063
|
0
|
|
|
|
|
0
|
for my $j (0..$pc-1) { |
|
1064
|
0
|
0
|
|
|
|
0
|
last unless $off + 4 <= $dlen; |
|
1065
|
0
|
|
|
|
|
0
|
my $pid = unpack('N', substr($data, $off, 4)); $off += 4; |
|
|
0
|
|
|
|
|
0
|
|
|
1066
|
0
|
0
|
0
|
|
|
0
|
my $reset = ($g->{auto_offset_reset} // 'earliest') eq 'latest' ? -1 : -2; |
|
1067
|
0
|
|
|
|
|
0
|
push @my_assignments, { |
|
1068
|
|
|
|
|
|
|
topic => $tname, partition => $pid, offset => $reset |
|
1069
|
|
|
|
|
|
|
}; |
|
1070
|
|
|
|
|
|
|
} |
|
1071
|
|
|
|
|
|
|
} |
|
1072
|
|
|
|
|
|
|
} |
|
1073
|
|
|
|
|
|
|
|
|
1074
|
0
|
|
|
|
|
0
|
$g->{state} = 'stable'; |
|
1075
|
|
|
|
|
|
|
|
|
1076
|
|
|
|
|
|
|
# Fetch committed offsets, then start consuming |
|
1077
|
|
|
|
|
|
|
$self->_fetch_committed_offsets(\@my_assignments, sub { |
|
1078
|
0
|
|
|
|
|
0
|
$cfg->{assignments} = \@my_assignments; |
|
1079
|
|
|
|
|
|
|
|
|
1080
|
|
|
|
|
|
|
# Fire on_assign |
|
1081
|
0
|
0
|
|
|
|
0
|
$g->{on_assign}->(\@my_assignments) if $g->{on_assign}; |
|
1082
|
|
|
|
|
|
|
|
|
1083
|
|
|
|
|
|
|
# Start heartbeat |
|
1084
|
0
|
|
|
|
|
0
|
$self->_start_heartbeat; |
|
1085
|
|
|
|
|
|
|
|
|
1086
|
|
|
|
|
|
|
# Start fetch loop |
|
1087
|
0
|
|
|
|
|
0
|
$self->_start_fetch_loop; |
|
1088
|
0
|
|
|
|
|
0
|
}); |
|
1089
|
0
|
|
|
|
|
0
|
}; |
|
1090
|
|
|
|
|
|
|
$coord->sync_group( |
|
1091
|
|
|
|
|
|
|
$g->{group_id}, $g->{generation}, $g->{member_id}, |
|
1092
|
|
|
|
|
|
|
$assignments, $sync_cb, $g->{group_instance_id} |
|
1093
|
0
|
|
|
|
|
0
|
); |
|
1094
|
|
|
|
|
|
|
} |
|
1095
|
|
|
|
|
|
|
|
|
1096
|
|
|
|
|
|
|
sub _fetch_committed_offsets { |
|
1097
|
0
|
|
|
0
|
|
0
|
my ($self, $assignments, $cb) = @_; |
|
1098
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
1099
|
0
|
0
|
|
|
|
0
|
my $g = $cfg->{group} or return $cb->(); |
|
1100
|
0
|
|
|
|
|
0
|
my $coord = $g->{coordinator}; |
|
1101
|
0
|
0
|
0
|
|
|
0
|
return $cb->() unless $coord && $coord->connected && @$assignments; |
|
|
|
|
0
|
|
|
|
|
|
1102
|
|
|
|
|
|
|
|
|
1103
|
|
|
|
|
|
|
# Build topics array for offset_fetch |
|
1104
|
0
|
|
|
|
|
0
|
my %by_topic; |
|
1105
|
0
|
|
|
|
|
0
|
for my $a (@$assignments) { |
|
1106
|
0
|
|
|
|
|
0
|
push @{$by_topic{$a->{topic}}}, $a->{partition}; |
|
|
0
|
|
|
|
|
0
|
|
|
1107
|
|
|
|
|
|
|
} |
|
1108
|
0
|
|
|
|
|
0
|
my @topics; |
|
1109
|
0
|
|
|
|
|
0
|
for my $t (sort keys %by_topic) { |
|
1110
|
0
|
|
|
|
|
0
|
push @topics, { topic => $t, partitions => $by_topic{$t} }; |
|
1111
|
|
|
|
|
|
|
} |
|
1112
|
|
|
|
|
|
|
|
|
1113
|
|
|
|
|
|
|
$coord->offset_fetch($g->{group_id}, \@topics, sub { |
|
1114
|
0
|
|
|
0
|
|
0
|
my ($res, $err) = @_; |
|
1115
|
0
|
0
|
0
|
|
|
0
|
if (!$err && $res && ref $res->{topics} eq 'ARRAY') { |
|
|
|
|
0
|
|
|
|
|
|
1116
|
0
|
|
|
|
|
0
|
for my $t (@{$res->{topics}}) { |
|
|
0
|
|
|
|
|
0
|
|
|
1117
|
0
|
|
0
|
|
|
0
|
for my $p (@{$t->{partitions} // []}) { |
|
|
0
|
|
|
|
|
0
|
|
|
1118
|
0
|
0
|
|
|
|
0
|
next if $p->{error_code}; |
|
1119
|
0
|
0
|
|
|
|
0
|
next if $p->{offset} < 0; # no committed offset |
|
1120
|
0
|
|
|
|
|
0
|
for my $a (@$assignments) { |
|
1121
|
0
|
0
|
0
|
|
|
0
|
if ($a->{topic} eq $t->{topic} && $a->{partition} == $p->{partition}) { |
|
1122
|
0
|
|
|
|
|
0
|
$a->{offset} = $p->{offset}; |
|
1123
|
|
|
|
|
|
|
} |
|
1124
|
|
|
|
|
|
|
} |
|
1125
|
|
|
|
|
|
|
} |
|
1126
|
|
|
|
|
|
|
} |
|
1127
|
|
|
|
|
|
|
} |
|
1128
|
|
|
|
|
|
|
|
|
1129
|
|
|
|
|
|
|
# For partitions with unresolved offset (-2=earliest, -1=latest), resolve via ListOffsets |
|
1130
|
0
|
|
|
|
|
0
|
my @need_offsets = grep { $_->{offset} < 0 } @$assignments; |
|
|
0
|
|
|
|
|
0
|
|
|
1131
|
0
|
0
|
|
|
|
0
|
if (@need_offsets) { |
|
1132
|
0
|
|
|
|
|
0
|
my $remaining = scalar @need_offsets; |
|
1133
|
0
|
|
|
|
|
0
|
for my $a (@need_offsets) { |
|
1134
|
0
|
|
|
|
|
0
|
my $leader_id = $self->_get_leader($a->{topic}, $a->{partition}); |
|
1135
|
0
|
0
|
|
|
|
0
|
my $lconn = defined($leader_id) ? $self->_get_or_create_conn($leader_id) : undef; |
|
1136
|
0
|
0
|
0
|
|
|
0
|
if ($lconn && $lconn->connected) { |
|
1137
|
|
|
|
|
|
|
$lconn->list_offsets($a->{topic}, $a->{partition}, $a->{offset}, sub { |
|
1138
|
0
|
|
|
|
|
0
|
my ($lres, $lerr) = @_; |
|
1139
|
0
|
0
|
0
|
|
|
0
|
if (!$lerr && $lres && ref $lres->{topics} eq 'ARRAY') { |
|
|
|
|
0
|
|
|
|
|
|
1140
|
0
|
|
|
|
|
0
|
for my $lt (@{$lres->{topics}}) { |
|
|
0
|
|
|
|
|
0
|
|
|
1141
|
0
|
|
0
|
|
|
0
|
for my $lp (@{$lt->{partitions} // []}) { |
|
|
0
|
|
|
|
|
0
|
|
|
1142
|
0
|
0
|
|
|
|
0
|
$a->{offset} = $lp->{offset} if !$lp->{error_code}; |
|
1143
|
|
|
|
|
|
|
} |
|
1144
|
|
|
|
|
|
|
} |
|
1145
|
|
|
|
|
|
|
} |
|
1146
|
0
|
|
|
|
|
0
|
$remaining--; |
|
1147
|
0
|
0
|
|
|
|
0
|
$cb->() if $remaining <= 0; |
|
1148
|
0
|
|
|
|
|
0
|
}); |
|
1149
|
|
|
|
|
|
|
} else { |
|
1150
|
0
|
|
|
|
|
0
|
$a->{offset} = 0; |
|
1151
|
0
|
|
|
|
|
0
|
$remaining--; |
|
1152
|
0
|
0
|
|
|
|
0
|
$cb->() if $remaining <= 0; |
|
1153
|
|
|
|
|
|
|
} |
|
1154
|
|
|
|
|
|
|
} |
|
1155
|
|
|
|
|
|
|
} else { |
|
1156
|
0
|
|
|
|
|
0
|
$cb->(); |
|
1157
|
|
|
|
|
|
|
} |
|
1158
|
0
|
|
|
|
|
0
|
}); |
|
1159
|
|
|
|
|
|
|
} |
|
1160
|
|
|
|
|
|
|
|
|
1161
|
|
|
|
|
|
|
sub _start_heartbeat { |
|
1162
|
0
|
|
|
0
|
|
0
|
my ($self) = @_; |
|
1163
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
1164
|
0
|
0
|
|
|
|
0
|
my $g = $cfg->{group} or return; |
|
1165
|
|
|
|
|
|
|
|
|
1166
|
|
|
|
|
|
|
$g->{heartbeat_timer} = EV::timer $g->{heartbeat_interval}, $g->{heartbeat_interval}, sub { |
|
1167
|
0
|
0
|
|
0
|
|
0
|
return unless $g->{state} eq 'stable'; |
|
1168
|
0
|
|
|
|
|
0
|
my $coord = $g->{coordinator}; |
|
1169
|
0
|
0
|
0
|
|
|
0
|
return unless $coord && $coord->connected; |
|
1170
|
|
|
|
|
|
|
|
|
1171
|
|
|
|
|
|
|
$coord->heartbeat($g->{group_id}, $g->{generation}, $g->{member_id}, sub { |
|
1172
|
0
|
|
|
|
|
0
|
my ($res, $err) = @_; |
|
1173
|
0
|
0
|
|
|
|
0
|
if ($err) { return } |
|
|
0
|
|
|
|
|
0
|
|
|
1174
|
0
|
0
|
0
|
|
|
0
|
if ($res && $res->{error_code} == 27) { |
|
1175
|
|
|
|
|
|
|
# REBALANCE_IN_PROGRESS |
|
1176
|
0
|
|
|
|
|
0
|
$g->{state} = 'rebalancing'; |
|
1177
|
0
|
0
|
|
|
|
0
|
$g->{on_revoke}->($cfg->{assignments}) if $g->{on_revoke}; |
|
1178
|
0
|
|
|
|
|
0
|
$self->_stop_heartbeat; |
|
1179
|
0
|
|
|
|
|
0
|
$self->_stop_fetch_loop; |
|
1180
|
0
|
|
|
|
|
0
|
$self->_group_join; |
|
1181
|
|
|
|
|
|
|
} |
|
1182
|
0
|
|
|
|
|
0
|
}, $g->{group_instance_id}); |
|
1183
|
0
|
|
|
|
|
0
|
}; |
|
1184
|
|
|
|
|
|
|
} |
|
1185
|
|
|
|
|
|
|
|
|
1186
|
|
|
|
|
|
|
sub _stop_heartbeat { |
|
1187
|
4
|
|
|
4
|
|
8
|
my ($self) = @_; |
|
1188
|
4
|
50
|
|
|
|
14
|
my $g = $self->{cfg}{group} or return; |
|
1189
|
0
|
|
|
|
|
0
|
undef $g->{heartbeat_timer}; |
|
1190
|
|
|
|
|
|
|
} |
|
1191
|
|
|
|
|
|
|
|
|
1192
|
|
|
|
|
|
|
sub _start_fetch_loop { |
|
1193
|
0
|
|
|
0
|
|
0
|
my ($self) = @_; |
|
1194
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
1195
|
0
|
0
|
|
|
|
0
|
return if $cfg->{fetch_active}; |
|
1196
|
0
|
|
|
|
|
0
|
$cfg->{fetch_active} = 1; |
|
1197
|
|
|
|
|
|
|
|
|
1198
|
|
|
|
|
|
|
$cfg->{fetch_timer} = EV::timer 0, 0.1, sub { |
|
1199
|
0
|
0
|
|
0
|
|
0
|
return unless $cfg->{fetch_active}; |
|
1200
|
0
|
|
|
|
|
0
|
$self->poll; |
|
1201
|
0
|
|
|
|
|
0
|
}; |
|
1202
|
|
|
|
|
|
|
} |
|
1203
|
|
|
|
|
|
|
|
|
1204
|
|
|
|
|
|
|
sub _stop_fetch_loop { |
|
1205
|
4
|
|
|
4
|
|
9
|
my ($self) = @_; |
|
1206
|
4
|
|
|
|
|
8
|
my $cfg = $self->{cfg}; |
|
1207
|
4
|
|
|
|
|
7
|
$cfg->{fetch_active} = 0; |
|
1208
|
4
|
|
|
|
|
9
|
undef $cfg->{fetch_timer}; |
|
1209
|
|
|
|
|
|
|
} |
|
1210
|
|
|
|
|
|
|
|
|
1211
|
|
|
|
|
|
|
sub commit { |
|
1212
|
0
|
|
|
0
|
|
0
|
my ($self, $cb) = @_; |
|
1213
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
1214
|
0
|
|
|
|
|
0
|
my $g = $cfg->{group}; |
|
1215
|
0
|
0
|
|
|
|
0
|
unless ($g) { $cb->() if $cb; return } |
|
|
0
|
0
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
1216
|
0
|
|
|
|
|
0
|
my $coord = $g->{coordinator}; |
|
1217
|
0
|
0
|
0
|
|
|
0
|
unless ($coord && $coord->connected) { $cb->() if $cb; return } |
|
|
0
|
0
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
1218
|
|
|
|
|
|
|
|
|
1219
|
|
|
|
|
|
|
# Build offset commit data from current assignments |
|
1220
|
0
|
|
|
|
|
0
|
my %by_topic; |
|
1221
|
0
|
|
0
|
|
|
0
|
for my $a (@{$cfg->{assignments} // []}) { |
|
|
0
|
|
|
|
|
0
|
|
|
1222
|
0
|
|
|
|
|
0
|
push @{$by_topic{$a->{topic}}}, { |
|
1223
|
|
|
|
|
|
|
partition => $a->{partition}, |
|
1224
|
|
|
|
|
|
|
offset => $a->{offset}, |
|
1225
|
0
|
|
|
|
|
0
|
}; |
|
1226
|
|
|
|
|
|
|
} |
|
1227
|
|
|
|
|
|
|
|
|
1228
|
0
|
|
|
|
|
0
|
my @topics; |
|
1229
|
0
|
|
|
|
|
0
|
for my $t (sort keys %by_topic) { |
|
1230
|
0
|
|
|
|
|
0
|
push @topics, { topic => $t, partitions => $by_topic{$t} }; |
|
1231
|
|
|
|
|
|
|
} |
|
1232
|
|
|
|
|
|
|
|
|
1233
|
0
|
0
|
|
|
|
0
|
if (!@topics) { $cb->() if $cb; return } |
|
|
0
|
0
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
1234
|
|
|
|
|
|
|
|
|
1235
|
|
|
|
|
|
|
$coord->offset_commit($g->{group_id}, $g->{generation}, $g->{member_id}, \@topics, sub { |
|
1236
|
0
|
|
|
0
|
|
0
|
my ($res, $err) = @_; |
|
1237
|
0
|
0
|
|
|
|
0
|
$cb->($err) if $cb; |
|
1238
|
0
|
|
|
|
|
0
|
}); |
|
1239
|
|
|
|
|
|
|
} |
|
1240
|
|
|
|
|
|
|
|
|
1241
|
|
|
|
|
|
|
sub unsubscribe { |
|
1242
|
0
|
|
|
0
|
|
0
|
my ($self, $cb) = @_; |
|
1243
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
1244
|
0
|
|
|
|
|
0
|
my $g = $cfg->{group}; |
|
1245
|
|
|
|
|
|
|
|
|
1246
|
0
|
|
|
|
|
0
|
$self->_stop_heartbeat; |
|
1247
|
0
|
|
|
|
|
0
|
$self->_stop_fetch_loop; |
|
1248
|
|
|
|
|
|
|
|
|
1249
|
|
|
|
|
|
|
my $finish = sub { |
|
1250
|
|
|
|
|
|
|
# send LeaveGroup to coordinator for fast rebalance |
|
1251
|
0
|
0
|
0
|
0
|
|
0
|
if ($g && $g->{coordinator} && $g->{coordinator}->connected && $g->{member_id}) { |
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
1252
|
|
|
|
|
|
|
$g->{coordinator}->leave_group($g->{group_id}, $g->{member_id}, sub { |
|
1253
|
0
|
|
|
|
|
0
|
$cfg->{assignments} = []; |
|
1254
|
0
|
|
|
|
|
0
|
$cfg->{group} = undef; |
|
1255
|
0
|
0
|
|
|
|
0
|
$cb->() if $cb; |
|
1256
|
0
|
|
|
|
|
0
|
}); |
|
1257
|
|
|
|
|
|
|
} else { |
|
1258
|
0
|
|
|
|
|
0
|
$cfg->{assignments} = []; |
|
1259
|
0
|
|
|
|
|
0
|
$cfg->{group} = undef; |
|
1260
|
0
|
0
|
|
|
|
0
|
$cb->() if $cb; |
|
1261
|
|
|
|
|
|
|
} |
|
1262
|
0
|
|
|
|
|
0
|
}; |
|
1263
|
|
|
|
|
|
|
|
|
1264
|
0
|
0
|
0
|
|
|
0
|
if ($g && $g->{auto_commit}) { |
|
1265
|
0
|
|
|
0
|
|
0
|
$self->commit(sub { $finish->() }); |
|
|
0
|
|
|
|
|
0
|
|
|
1266
|
|
|
|
|
|
|
} else { |
|
1267
|
0
|
|
|
|
|
0
|
$finish->(); |
|
1268
|
|
|
|
|
|
|
} |
|
1269
|
|
|
|
|
|
|
} |
|
1270
|
|
|
|
|
|
|
|
|
1271
|
|
|
|
|
|
|
# --- Transactions --- |
|
1272
|
|
|
|
|
|
|
|
|
1273
|
|
|
|
|
|
|
sub begin_transaction { |
|
1274
|
0
|
|
|
0
|
|
0
|
my ($self) = @_; |
|
1275
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
1276
|
0
|
0
|
|
|
|
0
|
die "transactional_id required" unless $cfg->{transactional_id}; |
|
1277
|
0
|
0
|
0
|
|
|
0
|
die "producer_id not initialized" unless defined $cfg->{producer_id} && $cfg->{producer_id} >= 0; |
|
1278
|
0
|
|
|
|
|
0
|
$cfg->{_txn_active} = 1; |
|
1279
|
0
|
|
|
|
|
0
|
$cfg->{_txn_partitions} = {}; # "topic:partition" => 1 |
|
1280
|
|
|
|
|
|
|
} |
|
1281
|
|
|
|
|
|
|
|
|
1282
|
|
|
|
|
|
|
sub _txn_conn { |
|
1283
|
0
|
|
|
0
|
|
0
|
my ($self) = @_; |
|
1284
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
1285
|
0
|
|
|
|
|
0
|
my $conn = $cfg->{_txn_coordinator}; |
|
1286
|
0
|
0
|
0
|
|
|
0
|
return $conn if $conn && $conn->connected; |
|
1287
|
0
|
|
|
|
|
0
|
return $self->_any_conn; |
|
1288
|
|
|
|
|
|
|
} |
|
1289
|
|
|
|
|
|
|
|
|
1290
|
|
|
|
|
|
|
sub _add_txn_partition { |
|
1291
|
0
|
|
|
0
|
|
0
|
my ($self, $topic, $partition) = @_; |
|
1292
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
1293
|
0
|
0
|
|
|
|
0
|
return unless $cfg->{_txn_active}; |
|
1294
|
0
|
|
|
|
|
0
|
my $key = "$topic:$partition"; |
|
1295
|
0
|
0
|
|
|
|
0
|
return if $cfg->{_txn_partitions}{$key}++; |
|
1296
|
|
|
|
|
|
|
|
|
1297
|
0
|
|
|
|
|
0
|
my $conn = $self->_txn_conn; |
|
1298
|
0
|
0
|
|
|
|
0
|
return unless $conn; |
|
1299
|
|
|
|
|
|
|
|
|
1300
|
|
|
|
|
|
|
$conn->add_partitions_to_txn( |
|
1301
|
|
|
|
|
|
|
$cfg->{transactional_id}, $cfg->{producer_id}, |
|
1302
|
|
|
|
|
|
|
$cfg->{producer_epoch}, |
|
1303
|
|
|
|
|
|
|
[{ topic => $topic, partitions => [$partition] }], |
|
1304
|
|
|
|
0
|
|
|
sub {} |
|
1305
|
0
|
|
|
|
|
0
|
); |
|
1306
|
|
|
|
|
|
|
} |
|
1307
|
|
|
|
|
|
|
|
|
1308
|
|
|
|
|
|
|
sub commit_transaction { |
|
1309
|
0
|
|
|
0
|
|
0
|
my ($self, $cb) = @_; |
|
1310
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
1311
|
0
|
0
|
|
|
|
0
|
die "no active transaction" unless $cfg->{_txn_active}; |
|
1312
|
|
|
|
|
|
|
|
|
1313
|
|
|
|
|
|
|
# flush all pending batches first |
|
1314
|
|
|
|
|
|
|
$self->flush(sub { |
|
1315
|
0
|
|
|
0
|
|
0
|
my $conn = $self->_txn_conn; |
|
1316
|
0
|
0
|
|
|
|
0
|
unless ($conn) { $cb->(undef) if $cb; return } |
|
|
0
|
0
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
1317
|
|
|
|
|
|
|
|
|
1318
|
|
|
|
|
|
|
$conn->end_txn($cfg->{transactional_id}, $cfg->{producer_id}, |
|
1319
|
|
|
|
|
|
|
$cfg->{producer_epoch}, 1, sub { |
|
1320
|
0
|
|
|
|
|
0
|
my ($res, $err) = @_; |
|
1321
|
0
|
|
|
|
|
0
|
$cfg->{_txn_active} = 0; |
|
1322
|
0
|
|
|
|
|
0
|
$cfg->{_txn_partitions} = {}; |
|
1323
|
0
|
0
|
|
|
|
0
|
$cb->($res, $err) if $cb; |
|
1324
|
0
|
|
|
|
|
0
|
}); |
|
1325
|
0
|
|
|
|
|
0
|
}); |
|
1326
|
|
|
|
|
|
|
} |
|
1327
|
|
|
|
|
|
|
|
|
1328
|
|
|
|
|
|
|
sub send_offsets_to_transaction { |
|
1329
|
0
|
|
|
0
|
|
0
|
my ($self, $group_id, $cb) = @_; |
|
1330
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
1331
|
0
|
0
|
|
|
|
0
|
die "no active transaction" unless $cfg->{_txn_active}; |
|
1332
|
0
|
0
|
|
|
|
0
|
die "transactional_id required" unless $cfg->{transactional_id}; |
|
1333
|
|
|
|
|
|
|
|
|
1334
|
|
|
|
|
|
|
# gather current consumer offsets |
|
1335
|
0
|
|
|
|
|
0
|
my %by_topic; |
|
1336
|
0
|
|
0
|
|
|
0
|
for my $a (@{$cfg->{assignments} // []}) { |
|
|
0
|
|
|
|
|
0
|
|
|
1337
|
0
|
|
|
|
|
0
|
push @{$by_topic{$a->{topic}}}, { |
|
1338
|
|
|
|
|
|
|
partition => $a->{partition}, |
|
1339
|
|
|
|
|
|
|
offset => $a->{offset}, |
|
1340
|
0
|
|
|
|
|
0
|
}; |
|
1341
|
|
|
|
|
|
|
} |
|
1342
|
|
|
|
|
|
|
|
|
1343
|
0
|
|
|
|
|
0
|
my @topics; |
|
1344
|
0
|
|
|
|
|
0
|
for my $t (sort keys %by_topic) { |
|
1345
|
0
|
|
|
|
|
0
|
push @topics, { topic => $t, partitions => $by_topic{$t} }; |
|
1346
|
|
|
|
|
|
|
} |
|
1347
|
|
|
|
|
|
|
|
|
1348
|
0
|
0
|
|
|
|
0
|
unless (@topics) { |
|
1349
|
0
|
0
|
|
|
|
0
|
$cb->() if $cb; |
|
1350
|
0
|
|
|
|
|
0
|
return; |
|
1351
|
|
|
|
|
|
|
} |
|
1352
|
|
|
|
|
|
|
|
|
1353
|
0
|
|
|
|
|
0
|
my $conn = $self->_txn_conn; |
|
1354
|
0
|
0
|
|
|
|
0
|
unless ($conn) { $cb->() if $cb; return } |
|
|
0
|
0
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
1355
|
|
|
|
|
|
|
|
|
1356
|
0
|
|
|
|
|
0
|
my $g = $cfg->{group}; |
|
1357
|
0
|
0
|
|
|
|
0
|
my $generation = $g ? $g->{generation} : -1; |
|
1358
|
0
|
0
|
0
|
|
|
0
|
my $member_id = $g ? ($g->{member_id} // '') : ''; |
|
1359
|
|
|
|
|
|
|
|
|
1360
|
|
|
|
|
|
|
$conn->txn_offset_commit( |
|
1361
|
|
|
|
|
|
|
$cfg->{transactional_id}, $group_id, |
|
1362
|
|
|
|
|
|
|
$cfg->{producer_id}, $cfg->{producer_epoch}, |
|
1363
|
|
|
|
|
|
|
$generation, $member_id, |
|
1364
|
|
|
|
|
|
|
\@topics, sub { |
|
1365
|
0
|
|
|
0
|
|
0
|
my ($res, $err) = @_; |
|
1366
|
0
|
0
|
|
|
|
0
|
$cb->($res, $err) if $cb; |
|
1367
|
|
|
|
|
|
|
} |
|
1368
|
0
|
|
|
|
|
0
|
); |
|
1369
|
|
|
|
|
|
|
} |
|
1370
|
|
|
|
|
|
|
|
|
1371
|
|
|
|
|
|
|
sub abort_transaction { |
|
1372
|
0
|
|
|
0
|
|
0
|
my ($self, $cb) = @_; |
|
1373
|
0
|
|
|
|
|
0
|
my $cfg = $self->{cfg}; |
|
1374
|
0
|
0
|
|
|
|
0
|
die "no active transaction" unless $cfg->{_txn_active}; |
|
1375
|
|
|
|
|
|
|
|
|
1376
|
|
|
|
|
|
|
# discard unsent batches — they must not reach the broker after abort |
|
1377
|
0
|
|
|
|
|
0
|
$cfg->{batches} = {}; |
|
1378
|
0
|
|
|
|
|
0
|
undef $cfg->{_linger_timer}; |
|
1379
|
0
|
|
|
|
|
0
|
$cfg->{_linger_active} = 0; |
|
1380
|
|
|
|
|
|
|
|
|
1381
|
0
|
|
|
|
|
0
|
my $conn = $self->_txn_conn; |
|
1382
|
0
|
0
|
|
|
|
0
|
unless ($conn) { $cb->(undef) if $cb; return } |
|
|
0
|
0
|
|
|
|
0
|
|
|
|
0
|
|
|
|
|
0
|
|
|
1383
|
|
|
|
|
|
|
|
|
1384
|
|
|
|
|
|
|
$conn->end_txn($cfg->{transactional_id}, $cfg->{producer_id}, |
|
1385
|
|
|
|
|
|
|
$cfg->{producer_epoch}, 0, sub { |
|
1386
|
0
|
|
|
0
|
|
0
|
my ($res, $err) = @_; |
|
1387
|
0
|
|
|
|
|
0
|
$cfg->{_txn_active} = 0; |
|
1388
|
0
|
|
|
|
|
0
|
$cfg->{_txn_partitions} = {}; |
|
1389
|
0
|
0
|
|
|
|
0
|
$cb->($res, $err) if $cb; |
|
1390
|
0
|
|
|
|
|
0
|
}); |
|
1391
|
|
|
|
|
|
|
} |
|
1392
|
|
|
|
|
|
|
|
|
1393
|
|
|
|
|
|
|
sub close { |
|
1394
|
4
|
|
|
4
|
|
8
|
my ($self, $cb) = @_; |
|
1395
|
4
|
50
|
|
|
|
10
|
my $cfg = $self->{cfg} or return; |
|
1396
|
|
|
|
|
|
|
|
|
1397
|
4
|
|
|
|
|
17
|
$self->_stop_heartbeat; |
|
1398
|
4
|
|
|
|
|
16
|
$self->_stop_fetch_loop; |
|
1399
|
|
|
|
|
|
|
|
|
1400
|
4
|
|
50
|
|
|
5
|
for my $conn (values %{$cfg->{conns} // {}}) { |
|
|
4
|
|
|
|
|
19
|
|
|
1401
|
0
|
0
|
0
|
|
|
0
|
eval { $conn->disconnect if $conn && $conn->connected }; |
|
|
0
|
|
|
|
|
0
|
|
|
1402
|
|
|
|
|
|
|
} |
|
1403
|
4
|
50
|
|
|
|
10
|
if ($cfg->{bootstrap_conn}) { |
|
1404
|
0
|
|
|
|
|
0
|
eval { $cfg->{bootstrap_conn}->disconnect |
|
1405
|
0
|
0
|
|
|
|
0
|
if $cfg->{bootstrap_conn}->connected }; |
|
1406
|
|
|
|
|
|
|
} |
|
1407
|
4
|
|
|
|
|
12
|
$cfg->{conns} = {}; |
|
1408
|
4
|
|
|
|
|
6
|
$cfg->{bootstrap_conn} = undef; |
|
1409
|
4
|
50
|
|
|
|
179
|
$cb->() if $cb; |
|
1410
|
|
|
|
|
|
|
} |
|
1411
|
|
|
|
|
|
|
|
|
1412
|
|
|
|
|
|
|
sub DESTROY { |
|
1413
|
4
|
|
|
4
|
|
10219
|
my $self = shift; |
|
1414
|
4
|
50
|
33
|
|
|
39
|
return unless $self && $self->{cfg}; |
|
1415
|
4
|
|
|
|
|
17
|
$self->close; |
|
1416
|
|
|
|
|
|
|
} |
|
1417
|
|
|
|
|
|
|
|
|
1418
|
|
|
|
|
|
|
package EV::Kafka::Conn; |
|
1419
|
|
|
|
|
|
|
|
|
1420
|
|
|
|
|
|
|
1; |
|
1421
|
|
|
|
|
|
|
|
|
1422
|
|
|
|
|
|
|
=head1 NAME |
|
1423
|
|
|
|
|
|
|
|
|
1424
|
|
|
|
|
|
|
EV::Kafka - High-performance asynchronous Kafka/Redpanda client using EV |
|
1425
|
|
|
|
|
|
|
|
|
1426
|
|
|
|
|
|
|
=head1 SYNOPSIS |
|
1427
|
|
|
|
|
|
|
|
|
1428
|
|
|
|
|
|
|
use EV::Kafka; |
|
1429
|
|
|
|
|
|
|
|
|
1430
|
|
|
|
|
|
|
my $kafka = EV::Kafka->new( |
|
1431
|
|
|
|
|
|
|
brokers => '127.0.0.1:9092', |
|
1432
|
|
|
|
|
|
|
acks => -1, |
|
1433
|
|
|
|
|
|
|
on_error => sub { warn "kafka: @_" }, |
|
1434
|
|
|
|
|
|
|
on_message => sub { |
|
1435
|
|
|
|
|
|
|
my ($topic, $partition, $offset, $key, $value, $headers) = @_; |
|
1436
|
|
|
|
|
|
|
print "$topic:$partition @ $offset $key = $value\n"; |
|
1437
|
|
|
|
|
|
|
}, |
|
1438
|
|
|
|
|
|
|
); |
|
1439
|
|
|
|
|
|
|
|
|
1440
|
|
|
|
|
|
|
# Producer |
|
1441
|
|
|
|
|
|
|
$kafka->connect(sub { |
|
1442
|
|
|
|
|
|
|
$kafka->produce('my-topic', 'key', 'value', sub { |
|
1443
|
|
|
|
|
|
|
my ($result, $err) = @_; |
|
1444
|
|
|
|
|
|
|
say "produced at offset " . $result->{topics}[0]{partitions}[0]{base_offset}; |
|
1445
|
|
|
|
|
|
|
}); |
|
1446
|
|
|
|
|
|
|
}); |
|
1447
|
|
|
|
|
|
|
|
|
1448
|
|
|
|
|
|
|
# Consumer (manual assignment) |
|
1449
|
|
|
|
|
|
|
$kafka->assign([{ topic => 'my-topic', partition => 0, offset => 0 }]); |
|
1450
|
|
|
|
|
|
|
my $poll = EV::timer 0, 0.1, sub { $kafka->poll }; |
|
1451
|
|
|
|
|
|
|
|
|
1452
|
|
|
|
|
|
|
# Consumer group |
|
1453
|
|
|
|
|
|
|
$kafka->subscribe('my-topic', |
|
1454
|
|
|
|
|
|
|
group_id => 'my-group', |
|
1455
|
|
|
|
|
|
|
on_assign => sub { ... }, |
|
1456
|
|
|
|
|
|
|
on_revoke => sub { ... }, |
|
1457
|
|
|
|
|
|
|
); |
|
1458
|
|
|
|
|
|
|
|
|
1459
|
|
|
|
|
|
|
EV::run; |
|
1460
|
|
|
|
|
|
|
|
|
1461
|
|
|
|
|
|
|
=head1 DESCRIPTION |
|
1462
|
|
|
|
|
|
|
|
|
1463
|
|
|
|
|
|
|
EV::Kafka is a high-performance asynchronous Kafka client that implements |
|
1464
|
|
|
|
|
|
|
the Kafka binary protocol in XS with L event loop integration. It |
|
1465
|
|
|
|
|
|
|
targets Redpanda and Apache Kafka (protocol version 0.11+). |
|
1466
|
|
|
|
|
|
|
|
|
1467
|
|
|
|
|
|
|
Two-layer architecture: |
|
1468
|
|
|
|
|
|
|
|
|
1469
|
|
|
|
|
|
|
=over |
|
1470
|
|
|
|
|
|
|
|
|
1471
|
|
|
|
|
|
|
=item * B (XS) -- single broker TCP connection with |
|
1472
|
|
|
|
|
|
|
protocol encoding/decoding, correlation ID matching, pipelining, |
|
1473
|
|
|
|
|
|
|
optional TLS and SASL/PLAIN authentication. |
|
1474
|
|
|
|
|
|
|
|
|
1475
|
|
|
|
|
|
|
=item * B (Perl) -- cluster management with metadata |
|
1476
|
|
|
|
|
|
|
discovery, broker connection pooling, partition leader routing, producer |
|
1477
|
|
|
|
|
|
|
with key-based partitioning, consumer with manual assignment or consumer |
|
1478
|
|
|
|
|
|
|
groups. |
|
1479
|
|
|
|
|
|
|
|
|
1480
|
|
|
|
|
|
|
=back |
|
1481
|
|
|
|
|
|
|
|
|
1482
|
|
|
|
|
|
|
Features: |
|
1483
|
|
|
|
|
|
|
|
|
1484
|
|
|
|
|
|
|
=over |
|
1485
|
|
|
|
|
|
|
|
|
1486
|
|
|
|
|
|
|
=item * Binary protocol implemented in pure XS (no librdkafka dependency) |
|
1487
|
|
|
|
|
|
|
|
|
1488
|
|
|
|
|
|
|
=item * Automatic request pipelining per broker connection |
|
1489
|
|
|
|
|
|
|
|
|
1490
|
|
|
|
|
|
|
=item * Metadata-driven partition leader routing |
|
1491
|
|
|
|
|
|
|
|
|
1492
|
|
|
|
|
|
|
=item * Producer: acks modes (-1/0/1), key-based partitioning (murmur2), |
|
1493
|
|
|
|
|
|
|
headers, fire-and-forget (acks=0) |
|
1494
|
|
|
|
|
|
|
|
|
1495
|
|
|
|
|
|
|
=item * Consumer: manual partition assignment, offset tracking, poll-based |
|
1496
|
|
|
|
|
|
|
message delivery |
|
1497
|
|
|
|
|
|
|
|
|
1498
|
|
|
|
|
|
|
=item * Consumer groups: JoinGroup/SyncGroup/Heartbeat, sticky |
|
1499
|
|
|
|
|
|
|
partition assignment, offset commit/fetch, automatic rebalancing |
|
1500
|
|
|
|
|
|
|
|
|
1501
|
|
|
|
|
|
|
=item * TLS (OpenSSL) and SASL/PLAIN authentication |
|
1502
|
|
|
|
|
|
|
|
|
1503
|
|
|
|
|
|
|
=item * Automatic reconnection at the connection layer |
|
1504
|
|
|
|
|
|
|
|
|
1505
|
|
|
|
|
|
|
=item * Bootstrap broker failover (tries all listed brokers) |
|
1506
|
|
|
|
|
|
|
|
|
1507
|
|
|
|
|
|
|
=back |
|
1508
|
|
|
|
|
|
|
|
|
1509
|
|
|
|
|
|
|
=head1 ANYEVENT INTEGRATION |
|
1510
|
|
|
|
|
|
|
|
|
1511
|
|
|
|
|
|
|
L has EV as one of its backends, so EV::Kafka can be used |
|
1512
|
|
|
|
|
|
|
in AnyEvent applications seamlessly. |
|
1513
|
|
|
|
|
|
|
|
|
1514
|
|
|
|
|
|
|
=head1 NO UTF-8 SUPPORT |
|
1515
|
|
|
|
|
|
|
|
|
1516
|
|
|
|
|
|
|
This module handles all values as bytes. Encode your UTF-8 strings |
|
1517
|
|
|
|
|
|
|
before passing them: |
|
1518
|
|
|
|
|
|
|
|
|
1519
|
|
|
|
|
|
|
use Encode; |
|
1520
|
|
|
|
|
|
|
|
|
1521
|
|
|
|
|
|
|
$kafka->produce($topic, $key, encode_utf8($val), sub { ... }); |
|
1522
|
|
|
|
|
|
|
|
|
1523
|
|
|
|
|
|
|
=head1 CLUSTER CLIENT METHODS |
|
1524
|
|
|
|
|
|
|
|
|
1525
|
|
|
|
|
|
|
=head2 new(%options) |
|
1526
|
|
|
|
|
|
|
|
|
1527
|
|
|
|
|
|
|
Create a new EV::Kafka client. Returns a blessed C |
|
1528
|
|
|
|
|
|
|
object. |
|
1529
|
|
|
|
|
|
|
|
|
1530
|
|
|
|
|
|
|
my $kafka = EV::Kafka->new( |
|
1531
|
|
|
|
|
|
|
brokers => '10.0.0.1:9092,10.0.0.2:9092', |
|
1532
|
|
|
|
|
|
|
acks => -1, |
|
1533
|
|
|
|
|
|
|
on_error => sub { warn @_ }, |
|
1534
|
|
|
|
|
|
|
); |
|
1535
|
|
|
|
|
|
|
|
|
1536
|
|
|
|
|
|
|
Options: |
|
1537
|
|
|
|
|
|
|
|
|
1538
|
|
|
|
|
|
|
=over |
|
1539
|
|
|
|
|
|
|
|
|
1540
|
|
|
|
|
|
|
=item brokers => 'Str' |
|
1541
|
|
|
|
|
|
|
|
|
1542
|
|
|
|
|
|
|
Comma-separated list of bootstrap broker addresses (host:port). |
|
1543
|
|
|
|
|
|
|
Default: C<127.0.0.1:9092>. |
|
1544
|
|
|
|
|
|
|
|
|
1545
|
|
|
|
|
|
|
=item client_id => 'Str' (default 'ev-kafka') |
|
1546
|
|
|
|
|
|
|
|
|
1547
|
|
|
|
|
|
|
Client identifier sent to brokers. |
|
1548
|
|
|
|
|
|
|
|
|
1549
|
|
|
|
|
|
|
=item tls => Bool |
|
1550
|
|
|
|
|
|
|
|
|
1551
|
|
|
|
|
|
|
Enable TLS encryption. |
|
1552
|
|
|
|
|
|
|
|
|
1553
|
|
|
|
|
|
|
=item tls_ca_file => 'Str' |
|
1554
|
|
|
|
|
|
|
|
|
1555
|
|
|
|
|
|
|
Path to CA certificate file for TLS verification. |
|
1556
|
|
|
|
|
|
|
|
|
1557
|
|
|
|
|
|
|
=item tls_skip_verify => Bool |
|
1558
|
|
|
|
|
|
|
|
|
1559
|
|
|
|
|
|
|
Skip TLS certificate verification. |
|
1560
|
|
|
|
|
|
|
|
|
1561
|
|
|
|
|
|
|
=item sasl => \%opts |
|
1562
|
|
|
|
|
|
|
|
|
1563
|
|
|
|
|
|
|
Enable SASL authentication. Supports PLAIN mechanism: |
|
1564
|
|
|
|
|
|
|
|
|
1565
|
|
|
|
|
|
|
sasl => { mechanism => 'PLAIN', username => 'user', password => 'pass' } |
|
1566
|
|
|
|
|
|
|
|
|
1567
|
|
|
|
|
|
|
=item acks => Int (default -1) |
|
1568
|
|
|
|
|
|
|
|
|
1569
|
|
|
|
|
|
|
Producer acknowledgment mode. C<-1> = all in-sync replicas, C<0> = no |
|
1570
|
|
|
|
|
|
|
acknowledgment (fire-and-forget), C<1> = leader only. |
|
1571
|
|
|
|
|
|
|
|
|
1572
|
|
|
|
|
|
|
=item linger_ms => Int (default 5) |
|
1573
|
|
|
|
|
|
|
|
|
1574
|
|
|
|
|
|
|
Time in milliseconds to accumulate records before flushing a batch. |
|
1575
|
|
|
|
|
|
|
Lower values reduce latency; higher values improve throughput. |
|
1576
|
|
|
|
|
|
|
|
|
1577
|
|
|
|
|
|
|
=item batch_size => Int (default 16384) |
|
1578
|
|
|
|
|
|
|
|
|
1579
|
|
|
|
|
|
|
Maximum batch size in bytes before a batch is flushed immediately. |
|
1580
|
|
|
|
|
|
|
|
|
1581
|
|
|
|
|
|
|
=item compression => 'Str' |
|
1582
|
|
|
|
|
|
|
|
|
1583
|
|
|
|
|
|
|
Compression type for produce batches: C<'lz4'> (requires liblz4), |
|
1584
|
|
|
|
|
|
|
C<'gzip'> (requires zlib), or C for none. |
|
1585
|
|
|
|
|
|
|
|
|
1586
|
|
|
|
|
|
|
=item idempotent => Bool (default 0) |
|
1587
|
|
|
|
|
|
|
|
|
1588
|
|
|
|
|
|
|
Enable idempotent producer. Calls C on connect and |
|
1589
|
|
|
|
|
|
|
sets producer_id/epoch/sequence in each RecordBatch for exactly-once |
|
1590
|
|
|
|
|
|
|
delivery (broker-side deduplication). |
|
1591
|
|
|
|
|
|
|
|
|
1592
|
|
|
|
|
|
|
=item transactional_id => 'Str' |
|
1593
|
|
|
|
|
|
|
|
|
1594
|
|
|
|
|
|
|
Enable transactional producer. Implies idempotent. Required for |
|
1595
|
|
|
|
|
|
|
C/C/C |
|
1596
|
|
|
|
|
|
|
and C (full EOS). |
|
1597
|
|
|
|
|
|
|
|
|
1598
|
|
|
|
|
|
|
=item partitioner => $cb->($topic, $key, $num_partitions) |
|
1599
|
|
|
|
|
|
|
|
|
1600
|
|
|
|
|
|
|
Custom partition selection function. Default: murmur2 hash of key, |
|
1601
|
|
|
|
|
|
|
or round-robin for null keys. |
|
1602
|
|
|
|
|
|
|
|
|
1603
|
|
|
|
|
|
|
=item on_error => $cb->($errstr) |
|
1604
|
|
|
|
|
|
|
|
|
1605
|
|
|
|
|
|
|
Error callback. Default: C. |
|
1606
|
|
|
|
|
|
|
|
|
1607
|
|
|
|
|
|
|
=item on_connect => $cb->() |
|
1608
|
|
|
|
|
|
|
|
|
1609
|
|
|
|
|
|
|
Called once after initial metadata fetch completes. |
|
1610
|
|
|
|
|
|
|
|
|
1611
|
|
|
|
|
|
|
=item on_message => $cb->($topic, $partition, $offset, $key, $value, $headers) |
|
1612
|
|
|
|
|
|
|
|
|
1613
|
|
|
|
|
|
|
Message delivery callback for consumer operations. |
|
1614
|
|
|
|
|
|
|
|
|
1615
|
|
|
|
|
|
|
=item fetch_max_wait_ms => Int (default 500) |
|
1616
|
|
|
|
|
|
|
|
|
1617
|
|
|
|
|
|
|
Maximum time the broker waits for C of data. |
|
1618
|
|
|
|
|
|
|
|
|
1619
|
|
|
|
|
|
|
=item fetch_max_bytes => Int (default 1048576) |
|
1620
|
|
|
|
|
|
|
|
|
1621
|
|
|
|
|
|
|
Maximum bytes per fetch response. |
|
1622
|
|
|
|
|
|
|
|
|
1623
|
|
|
|
|
|
|
=item fetch_min_bytes => Int (default 1) |
|
1624
|
|
|
|
|
|
|
|
|
1625
|
|
|
|
|
|
|
Minimum bytes before the broker responds to a fetch. |
|
1626
|
|
|
|
|
|
|
|
|
1627
|
|
|
|
|
|
|
=item metadata_refresh => Int (default 300) |
|
1628
|
|
|
|
|
|
|
|
|
1629
|
|
|
|
|
|
|
Metadata refresh interval in seconds (reserved, not yet wired). |
|
1630
|
|
|
|
|
|
|
|
|
1631
|
|
|
|
|
|
|
=item loop => EV::Loop |
|
1632
|
|
|
|
|
|
|
|
|
1633
|
|
|
|
|
|
|
EV loop to use. Default: C. |
|
1634
|
|
|
|
|
|
|
|
|
1635
|
|
|
|
|
|
|
=back |
|
1636
|
|
|
|
|
|
|
|
|
1637
|
|
|
|
|
|
|
=head2 connect($cb) |
|
1638
|
|
|
|
|
|
|
|
|
1639
|
|
|
|
|
|
|
Connect to the cluster. Connects to the first available bootstrap |
|
1640
|
|
|
|
|
|
|
broker, fetches cluster metadata, then fires C<$cb->($metadata)>. |
|
1641
|
|
|
|
|
|
|
|
|
1642
|
|
|
|
|
|
|
$kafka->connect(sub { |
|
1643
|
|
|
|
|
|
|
my $meta = shift; |
|
1644
|
|
|
|
|
|
|
# $meta->{brokers}, $meta->{topics} |
|
1645
|
|
|
|
|
|
|
}); |
|
1646
|
|
|
|
|
|
|
|
|
1647
|
|
|
|
|
|
|
=head2 produce($topic, $key, $value, [\%opts,] [$cb]) |
|
1648
|
|
|
|
|
|
|
|
|
1649
|
|
|
|
|
|
|
Produce a message. Routes to the correct partition leader automatically. |
|
1650
|
|
|
|
|
|
|
|
|
1651
|
|
|
|
|
|
|
# with callback (acks=1 or acks=-1) |
|
1652
|
|
|
|
|
|
|
$kafka->produce('topic', 'key', 'value', sub { |
|
1653
|
|
|
|
|
|
|
my ($result, $err) = @_; |
|
1654
|
|
|
|
|
|
|
}); |
|
1655
|
|
|
|
|
|
|
|
|
1656
|
|
|
|
|
|
|
# with headers |
|
1657
|
|
|
|
|
|
|
$kafka->produce('topic', 'key', 'value', |
|
1658
|
|
|
|
|
|
|
{ headers => { 'h1' => 'v1' } }, sub { ... }); |
|
1659
|
|
|
|
|
|
|
|
|
1660
|
|
|
|
|
|
|
# fire-and-forget (acks=0) |
|
1661
|
|
|
|
|
|
|
$kafka->produce('topic', 'key', 'value'); |
|
1662
|
|
|
|
|
|
|
|
|
1663
|
|
|
|
|
|
|
# explicit partition |
|
1664
|
|
|
|
|
|
|
$kafka->produce('topic', 'key', 'value', |
|
1665
|
|
|
|
|
|
|
{ partition => 3 }, sub { ... }); |
|
1666
|
|
|
|
|
|
|
|
|
1667
|
|
|
|
|
|
|
=head2 produce_many(\@messages, $cb) |
|
1668
|
|
|
|
|
|
|
|
|
1669
|
|
|
|
|
|
|
Produce multiple messages with a single completion callback. Each |
|
1670
|
|
|
|
|
|
|
message is an arrayref C<[$topic, $key, $value]> or a hashref |
|
1671
|
|
|
|
|
|
|
C<{topic, key, value}>. C<$cb> fires when all messages are acknowledged. |
|
1672
|
|
|
|
|
|
|
|
|
1673
|
|
|
|
|
|
|
$kafka->produce_many([ |
|
1674
|
|
|
|
|
|
|
['my-topic', 'k1', 'v1'], |
|
1675
|
|
|
|
|
|
|
['my-topic', 'k2', 'v2'], |
|
1676
|
|
|
|
|
|
|
], sub { |
|
1677
|
|
|
|
|
|
|
my $errors = shift; |
|
1678
|
|
|
|
|
|
|
warn "some failed: @$errors" if $errors; |
|
1679
|
|
|
|
|
|
|
}); |
|
1680
|
|
|
|
|
|
|
|
|
1681
|
|
|
|
|
|
|
=head2 flush([$cb]) |
|
1682
|
|
|
|
|
|
|
|
|
1683
|
|
|
|
|
|
|
Flush all accumulated produce batches and wait for all in-flight |
|
1684
|
|
|
|
|
|
|
requests to complete. C<$cb> fires when all pending responses have |
|
1685
|
|
|
|
|
|
|
been received. |
|
1686
|
|
|
|
|
|
|
|
|
1687
|
|
|
|
|
|
|
=head2 assign(\@partitions) |
|
1688
|
|
|
|
|
|
|
|
|
1689
|
|
|
|
|
|
|
Manually assign partitions for consuming. |
|
1690
|
|
|
|
|
|
|
|
|
1691
|
|
|
|
|
|
|
$kafka->assign([ |
|
1692
|
|
|
|
|
|
|
{ topic => 'my-topic', partition => 0, offset => 0 }, |
|
1693
|
|
|
|
|
|
|
{ topic => 'my-topic', partition => 1, offset => 100 }, |
|
1694
|
|
|
|
|
|
|
]); |
|
1695
|
|
|
|
|
|
|
|
|
1696
|
|
|
|
|
|
|
=head2 seek($topic, $partition, $offset, [$cb]) |
|
1697
|
|
|
|
|
|
|
|
|
1698
|
|
|
|
|
|
|
Seek a partition to a specific offset. Use C<-2> for earliest, C<-1> |
|
1699
|
|
|
|
|
|
|
for latest. Updates the assignment in-place. |
|
1700
|
|
|
|
|
|
|
|
|
1701
|
|
|
|
|
|
|
$kafka->seek('my-topic', 0, -1, sub { print "at latest\n" }); |
|
1702
|
|
|
|
|
|
|
|
|
1703
|
|
|
|
|
|
|
=head2 offsets_for($topic, $cb) |
|
1704
|
|
|
|
|
|
|
|
|
1705
|
|
|
|
|
|
|
Get earliest and latest offsets for all partitions of a topic. |
|
1706
|
|
|
|
|
|
|
|
|
1707
|
|
|
|
|
|
|
$kafka->offsets_for('my-topic', sub { |
|
1708
|
|
|
|
|
|
|
my $offsets = shift; |
|
1709
|
|
|
|
|
|
|
# { 0 => { earliest => 0, latest => 42 }, 1 => ... } |
|
1710
|
|
|
|
|
|
|
}); |
|
1711
|
|
|
|
|
|
|
|
|
1712
|
|
|
|
|
|
|
=head2 lag($cb) |
|
1713
|
|
|
|
|
|
|
|
|
1714
|
|
|
|
|
|
|
Get consumer lag for all assigned partitions. |
|
1715
|
|
|
|
|
|
|
|
|
1716
|
|
|
|
|
|
|
$kafka->lag(sub { |
|
1717
|
|
|
|
|
|
|
my $lag = shift; |
|
1718
|
|
|
|
|
|
|
# { "topic:0" => { current => 10, latest => 42, lag => 32 } } |
|
1719
|
|
|
|
|
|
|
}); |
|
1720
|
|
|
|
|
|
|
|
|
1721
|
|
|
|
|
|
|
=head2 error_name($code) |
|
1722
|
|
|
|
|
|
|
|
|
1723
|
|
|
|
|
|
|
Convert a Kafka numeric error code to its name. |
|
1724
|
|
|
|
|
|
|
|
|
1725
|
|
|
|
|
|
|
EV::Kafka::Client::error_name(3) # "UNKNOWN_TOPIC_OR_PARTITION" |
|
1726
|
|
|
|
|
|
|
|
|
1727
|
|
|
|
|
|
|
=head2 poll([$cb]) |
|
1728
|
|
|
|
|
|
|
|
|
1729
|
|
|
|
|
|
|
Fetch messages from assigned partitions. Calls C for each |
|
1730
|
|
|
|
|
|
|
received record. C<$cb> fires when all fetch responses have arrived. |
|
1731
|
|
|
|
|
|
|
|
|
1732
|
|
|
|
|
|
|
my $timer = EV::timer 0, 0.1, sub { $kafka->poll }; |
|
1733
|
|
|
|
|
|
|
|
|
1734
|
|
|
|
|
|
|
=head2 subscribe($topic, ..., %opts) |
|
1735
|
|
|
|
|
|
|
|
|
1736
|
|
|
|
|
|
|
Join a consumer group and subscribe to topics. The group protocol |
|
1737
|
|
|
|
|
|
|
handles partition assignment automatically. |
|
1738
|
|
|
|
|
|
|
|
|
1739
|
|
|
|
|
|
|
$kafka->subscribe('topic-a', 'topic-b', |
|
1740
|
|
|
|
|
|
|
group_id => 'my-group', |
|
1741
|
|
|
|
|
|
|
session_timeout => 30000, # ms |
|
1742
|
|
|
|
|
|
|
rebalance_timeout => 60000, # ms |
|
1743
|
|
|
|
|
|
|
heartbeat_interval => 3, # seconds |
|
1744
|
|
|
|
|
|
|
auto_commit => 1, # commit on unsubscribe (default) |
|
1745
|
|
|
|
|
|
|
auto_offset_reset => 'earliest', # or 'latest' |
|
1746
|
|
|
|
|
|
|
group_instance_id => 'pod-abc', # KIP-345 static membership |
|
1747
|
|
|
|
|
|
|
on_assign => sub { |
|
1748
|
|
|
|
|
|
|
my $partitions = shift; |
|
1749
|
|
|
|
|
|
|
# [{topic, partition, offset}, ...] |
|
1750
|
|
|
|
|
|
|
}, |
|
1751
|
|
|
|
|
|
|
on_revoke => sub { |
|
1752
|
|
|
|
|
|
|
my $partitions = shift; |
|
1753
|
|
|
|
|
|
|
}, |
|
1754
|
|
|
|
|
|
|
); |
|
1755
|
|
|
|
|
|
|
|
|
1756
|
|
|
|
|
|
|
=head2 commit([$cb]) |
|
1757
|
|
|
|
|
|
|
|
|
1758
|
|
|
|
|
|
|
Commit current consumer offsets to the group coordinator. |
|
1759
|
|
|
|
|
|
|
|
|
1760
|
|
|
|
|
|
|
$kafka->commit(sub { |
|
1761
|
|
|
|
|
|
|
my $err = shift; |
|
1762
|
|
|
|
|
|
|
warn "commit failed: $err" if $err; |
|
1763
|
|
|
|
|
|
|
}); |
|
1764
|
|
|
|
|
|
|
|
|
1765
|
|
|
|
|
|
|
=head2 unsubscribe([$cb]) |
|
1766
|
|
|
|
|
|
|
|
|
1767
|
|
|
|
|
|
|
Leave the consumer group (sends LeaveGroup for fast rebalance), |
|
1768
|
|
|
|
|
|
|
stop heartbeat and fetch loop. If C is enabled, |
|
1769
|
|
|
|
|
|
|
commits offsets before leaving. |
|
1770
|
|
|
|
|
|
|
|
|
1771
|
|
|
|
|
|
|
=head2 begin_transaction |
|
1772
|
|
|
|
|
|
|
|
|
1773
|
|
|
|
|
|
|
Start a transaction. Requires C in constructor. |
|
1774
|
|
|
|
|
|
|
|
|
1775
|
|
|
|
|
|
|
=head2 send_offsets_to_transaction($group_id, [$cb]) |
|
1776
|
|
|
|
|
|
|
|
|
1777
|
|
|
|
|
|
|
Commit consumer offsets within the current transaction via |
|
1778
|
|
|
|
|
|
|
C. This is the key step for exactly-once |
|
1779
|
|
|
|
|
|
|
consume-process-produce pipelines. |
|
1780
|
|
|
|
|
|
|
|
|
1781
|
|
|
|
|
|
|
$kafka->send_offsets_to_transaction('my-group', sub { |
|
1782
|
|
|
|
|
|
|
my ($result, $err) = @_; |
|
1783
|
|
|
|
|
|
|
}); |
|
1784
|
|
|
|
|
|
|
|
|
1785
|
|
|
|
|
|
|
=head2 commit_transaction([$cb]) |
|
1786
|
|
|
|
|
|
|
|
|
1787
|
|
|
|
|
|
|
Commit the current transaction. All produced messages and offset |
|
1788
|
|
|
|
|
|
|
commits within the transaction become visible atomically. |
|
1789
|
|
|
|
|
|
|
|
|
1790
|
|
|
|
|
|
|
=head2 abort_transaction([$cb]) |
|
1791
|
|
|
|
|
|
|
|
|
1792
|
|
|
|
|
|
|
Abort the current transaction. All produced messages are discarded |
|
1793
|
|
|
|
|
|
|
and offset commits are rolled back. |
|
1794
|
|
|
|
|
|
|
|
|
1795
|
|
|
|
|
|
|
=head2 close([$cb]) |
|
1796
|
|
|
|
|
|
|
|
|
1797
|
|
|
|
|
|
|
Graceful shutdown: stop timers, disconnect all broker connections. |
|
1798
|
|
|
|
|
|
|
|
|
1799
|
|
|
|
|
|
|
$kafka->close(sub { EV::break }); |
|
1800
|
|
|
|
|
|
|
|
|
1801
|
|
|
|
|
|
|
=head1 LOW-LEVEL CONNECTION METHODS |
|
1802
|
|
|
|
|
|
|
|
|
1803
|
|
|
|
|
|
|
C provides direct access to a single broker connection. |
|
1804
|
|
|
|
|
|
|
Useful for custom protocols, debugging, or when cluster-level routing |
|
1805
|
|
|
|
|
|
|
is not needed. |
|
1806
|
|
|
|
|
|
|
|
|
1807
|
|
|
|
|
|
|
my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef); |
|
1808
|
|
|
|
|
|
|
$conn->on_error(sub { warn @_ }); |
|
1809
|
|
|
|
|
|
|
$conn->on_connect(sub { ... }); |
|
1810
|
|
|
|
|
|
|
$conn->connect('127.0.0.1', 9092, 5.0); |
|
1811
|
|
|
|
|
|
|
|
|
1812
|
|
|
|
|
|
|
=head2 connect($host, $port, [$timeout]) |
|
1813
|
|
|
|
|
|
|
|
|
1814
|
|
|
|
|
|
|
Connect to a broker. Timeout in seconds (0 = no timeout). |
|
1815
|
|
|
|
|
|
|
|
|
1816
|
|
|
|
|
|
|
=head2 disconnect |
|
1817
|
|
|
|
|
|
|
|
|
1818
|
|
|
|
|
|
|
Disconnect from broker. |
|
1819
|
|
|
|
|
|
|
|
|
1820
|
|
|
|
|
|
|
=head2 connected |
|
1821
|
|
|
|
|
|
|
|
|
1822
|
|
|
|
|
|
|
Returns true if the connection is ready (ApiVersions handshake complete). |
|
1823
|
|
|
|
|
|
|
|
|
1824
|
|
|
|
|
|
|
=head2 metadata(\@topics, $cb) |
|
1825
|
|
|
|
|
|
|
|
|
1826
|
|
|
|
|
|
|
Request cluster metadata. Pass C for all topics. |
|
1827
|
|
|
|
|
|
|
|
|
1828
|
|
|
|
|
|
|
$conn->metadata(['my-topic'], sub { |
|
1829
|
|
|
|
|
|
|
my ($result, $err) = @_; |
|
1830
|
|
|
|
|
|
|
# $result->{brokers}, $result->{topics} |
|
1831
|
|
|
|
|
|
|
}); |
|
1832
|
|
|
|
|
|
|
|
|
1833
|
|
|
|
|
|
|
=head2 produce($topic, $partition, $key, $value, [\%opts,] [$cb]) |
|
1834
|
|
|
|
|
|
|
|
|
1835
|
|
|
|
|
|
|
Produce a message to a specific partition. |
|
1836
|
|
|
|
|
|
|
|
|
1837
|
|
|
|
|
|
|
$conn->produce('topic', 0, 'key', 'value', sub { |
|
1838
|
|
|
|
|
|
|
my ($result, $err) = @_; |
|
1839
|
|
|
|
|
|
|
}); |
|
1840
|
|
|
|
|
|
|
|
|
1841
|
|
|
|
|
|
|
Options: C (default 1), C (hashref), C |
|
1842
|
|
|
|
|
|
|
(epoch ms, default now), C (C<'none'>, C<'lz4'>; requires |
|
1843
|
|
|
|
|
|
|
LZ4 at build time). |
|
1844
|
|
|
|
|
|
|
|
|
1845
|
|
|
|
|
|
|
=head2 produce_batch($topic, $partition, \@records, [\%opts,] [$cb]) |
|
1846
|
|
|
|
|
|
|
|
|
1847
|
|
|
|
|
|
|
Produce multiple records in a single RecordBatch. Each record is |
|
1848
|
|
|
|
|
|
|
C<{key, value, headers}>. Options: C, C, |
|
1849
|
|
|
|
|
|
|
C, C, C. |
|
1850
|
|
|
|
|
|
|
|
|
1851
|
|
|
|
|
|
|
$conn->produce_batch('topic', 0, [ |
|
1852
|
|
|
|
|
|
|
{ key => 'k1', value => 'v1' }, |
|
1853
|
|
|
|
|
|
|
{ key => 'k2', value => 'v2' }, |
|
1854
|
|
|
|
|
|
|
], sub { my ($result, $err) = @_ }); |
|
1855
|
|
|
|
|
|
|
|
|
1856
|
|
|
|
|
|
|
=head2 fetch($topic, $partition, $offset, $cb, [$max_bytes]) |
|
1857
|
|
|
|
|
|
|
|
|
1858
|
|
|
|
|
|
|
Fetch messages from a partition starting at C<$offset>. |
|
1859
|
|
|
|
|
|
|
|
|
1860
|
|
|
|
|
|
|
$conn->fetch('topic', 0, 0, sub { |
|
1861
|
|
|
|
|
|
|
my ($result, $err) = @_; |
|
1862
|
|
|
|
|
|
|
for my $rec (@{ $result->{topics}[0]{partitions}[0]{records} }) { |
|
1863
|
|
|
|
|
|
|
printf "offset=%d key=%s value=%s\n", |
|
1864
|
|
|
|
|
|
|
$rec->{offset}, $rec->{key}, $rec->{value}; |
|
1865
|
|
|
|
|
|
|
} |
|
1866
|
|
|
|
|
|
|
}); |
|
1867
|
|
|
|
|
|
|
|
|
1868
|
|
|
|
|
|
|
=head2 fetch_multi(\%topics, $cb, [$max_bytes]) |
|
1869
|
|
|
|
|
|
|
|
|
1870
|
|
|
|
|
|
|
Multi-partition fetch in a single request. Groups multiple |
|
1871
|
|
|
|
|
|
|
topic-partitions into one Fetch call to the broker. |
|
1872
|
|
|
|
|
|
|
|
|
1873
|
|
|
|
|
|
|
$conn->fetch_multi({ |
|
1874
|
|
|
|
|
|
|
'topic-a' => [{ partition => 0, offset => 10 }, |
|
1875
|
|
|
|
|
|
|
{ partition => 1, offset => 20 }], |
|
1876
|
|
|
|
|
|
|
'topic-b' => [{ partition => 0, offset => 0 }], |
|
1877
|
|
|
|
|
|
|
}, sub { my ($result, $err) = @_ }); |
|
1878
|
|
|
|
|
|
|
|
|
1879
|
|
|
|
|
|
|
Used internally by C to batch fetches by broker leader. |
|
1880
|
|
|
|
|
|
|
|
|
1881
|
|
|
|
|
|
|
=head2 list_offsets($topic, $partition, $timestamp, $cb) |
|
1882
|
|
|
|
|
|
|
|
|
1883
|
|
|
|
|
|
|
Get offsets by timestamp. Use C<-2> for earliest, C<-1> for latest. |
|
1884
|
|
|
|
|
|
|
|
|
1885
|
|
|
|
|
|
|
=head2 find_coordinator($key, $cb, [$key_type]) |
|
1886
|
|
|
|
|
|
|
|
|
1887
|
|
|
|
|
|
|
Find the coordinator broker. C<$key_type>: 0=group (default), |
|
1888
|
|
|
|
|
|
|
1=transaction. |
|
1889
|
|
|
|
|
|
|
|
|
1890
|
|
|
|
|
|
|
=head2 join_group($group_id, $member_id, \@topics, $cb, [$session_timeout_ms, $rebalance_timeout_ms, $group_instance_id]) |
|
1891
|
|
|
|
|
|
|
|
|
1892
|
|
|
|
|
|
|
Join a consumer group. Pass C<$group_instance_id> for KIP-345 static |
|
1893
|
|
|
|
|
|
|
membership. |
|
1894
|
|
|
|
|
|
|
|
|
1895
|
|
|
|
|
|
|
=head2 sync_group($group_id, $generation_id, $member_id, \@assignments, $cb, [$group_instance_id]) |
|
1896
|
|
|
|
|
|
|
|
|
1897
|
|
|
|
|
|
|
Synchronize group state after join. |
|
1898
|
|
|
|
|
|
|
|
|
1899
|
|
|
|
|
|
|
=head2 heartbeat($group_id, $generation_id, $member_id, $cb, [$group_instance_id]) |
|
1900
|
|
|
|
|
|
|
|
|
1901
|
|
|
|
|
|
|
Send heartbeat to group coordinator. |
|
1902
|
|
|
|
|
|
|
|
|
1903
|
|
|
|
|
|
|
=head2 offset_commit($group_id, $generation_id, $member_id, \@offsets, $cb) |
|
1904
|
|
|
|
|
|
|
|
|
1905
|
|
|
|
|
|
|
Commit consumer offsets. |
|
1906
|
|
|
|
|
|
|
|
|
1907
|
|
|
|
|
|
|
=head2 offset_fetch($group_id, \@topics, $cb) |
|
1908
|
|
|
|
|
|
|
|
|
1909
|
|
|
|
|
|
|
Fetch committed offsets for a consumer group. |
|
1910
|
|
|
|
|
|
|
|
|
1911
|
|
|
|
|
|
|
=head2 api_versions |
|
1912
|
|
|
|
|
|
|
|
|
1913
|
|
|
|
|
|
|
Returns a hashref of supported API keys to max versions, or undef |
|
1914
|
|
|
|
|
|
|
if not yet negotiated. |
|
1915
|
|
|
|
|
|
|
|
|
1916
|
|
|
|
|
|
|
my $vers = $conn->api_versions; |
|
1917
|
|
|
|
|
|
|
# { 0 => 7, 1 => 11, 3 => 8, ... } |
|
1918
|
|
|
|
|
|
|
|
|
1919
|
|
|
|
|
|
|
=head2 on_error([$cb]) |
|
1920
|
|
|
|
|
|
|
|
|
1921
|
|
|
|
|
|
|
=head2 on_connect([$cb]) |
|
1922
|
|
|
|
|
|
|
|
|
1923
|
|
|
|
|
|
|
=head2 on_disconnect([$cb]) |
|
1924
|
|
|
|
|
|
|
|
|
1925
|
|
|
|
|
|
|
Set handler callbacks. Pass C to clear. |
|
1926
|
|
|
|
|
|
|
|
|
1927
|
|
|
|
|
|
|
=head2 client_id($id) |
|
1928
|
|
|
|
|
|
|
|
|
1929
|
|
|
|
|
|
|
Set the client identifier. |
|
1930
|
|
|
|
|
|
|
|
|
1931
|
|
|
|
|
|
|
=head2 tls($enable, [$ca_file, $skip_verify]) |
|
1932
|
|
|
|
|
|
|
|
|
1933
|
|
|
|
|
|
|
Configure TLS. |
|
1934
|
|
|
|
|
|
|
|
|
1935
|
|
|
|
|
|
|
=head2 sasl($mechanism, [$username, $password]) |
|
1936
|
|
|
|
|
|
|
|
|
1937
|
|
|
|
|
|
|
Configure SASL authentication. |
|
1938
|
|
|
|
|
|
|
|
|
1939
|
|
|
|
|
|
|
=head2 auto_reconnect($enable, [$delay_ms]) |
|
1940
|
|
|
|
|
|
|
|
|
1941
|
|
|
|
|
|
|
Enable automatic reconnection with delay in milliseconds (default 1000). |
|
1942
|
|
|
|
|
|
|
|
|
1943
|
|
|
|
|
|
|
=head2 leave_group($group_id, $member_id, $cb) |
|
1944
|
|
|
|
|
|
|
|
|
1945
|
|
|
|
|
|
|
Send LeaveGroup to coordinator for fast partition rebalance. |
|
1946
|
|
|
|
|
|
|
|
|
1947
|
|
|
|
|
|
|
=head2 create_topics(\@topics, $timeout_ms, $cb) |
|
1948
|
|
|
|
|
|
|
|
|
1949
|
|
|
|
|
|
|
Create topics. Each element: C<{name, num_partitions, replication_factor}>. |
|
1950
|
|
|
|
|
|
|
|
|
1951
|
|
|
|
|
|
|
$conn->create_topics( |
|
1952
|
|
|
|
|
|
|
[{ name => 'new-topic', num_partitions => 3, replication_factor => 1 }], |
|
1953
|
|
|
|
|
|
|
5000, sub { my ($res, $err) = @_ } |
|
1954
|
|
|
|
|
|
|
); |
|
1955
|
|
|
|
|
|
|
|
|
1956
|
|
|
|
|
|
|
=head2 delete_topics(\@topic_names, $timeout_ms, $cb) |
|
1957
|
|
|
|
|
|
|
|
|
1958
|
|
|
|
|
|
|
Delete topics by name. |
|
1959
|
|
|
|
|
|
|
|
|
1960
|
|
|
|
|
|
|
=head2 init_producer_id($transactional_id, $txn_timeout_ms, $cb) |
|
1961
|
|
|
|
|
|
|
|
|
1962
|
|
|
|
|
|
|
Initialize a producer ID for idempotent/transactional produce. |
|
1963
|
|
|
|
|
|
|
Pass C for non-transactional idempotent producer. |
|
1964
|
|
|
|
|
|
|
|
|
1965
|
|
|
|
|
|
|
=head2 add_partitions_to_txn($txn_id, $producer_id, $epoch, \@topics, $cb) |
|
1966
|
|
|
|
|
|
|
|
|
1967
|
|
|
|
|
|
|
Register partitions with the transaction coordinator. |
|
1968
|
|
|
|
|
|
|
|
|
1969
|
|
|
|
|
|
|
=head2 end_txn($txn_id, $producer_id, $epoch, $committed, $cb) |
|
1970
|
|
|
|
|
|
|
|
|
1971
|
|
|
|
|
|
|
Commit (C<$committed=1>) or abort (C<$committed=0>) a transaction. |
|
1972
|
|
|
|
|
|
|
|
|
1973
|
|
|
|
|
|
|
=head2 txn_offset_commit($txn_id, $group_id, $producer_id, $epoch, $generation, $member_id, \@offsets, $cb) |
|
1974
|
|
|
|
|
|
|
|
|
1975
|
|
|
|
|
|
|
Commit consumer offsets within a transaction (API 28). |
|
1976
|
|
|
|
|
|
|
|
|
1977
|
|
|
|
|
|
|
=head2 pending |
|
1978
|
|
|
|
|
|
|
|
|
1979
|
|
|
|
|
|
|
Number of requests awaiting broker response. |
|
1980
|
|
|
|
|
|
|
|
|
1981
|
|
|
|
|
|
|
=head2 state |
|
1982
|
|
|
|
|
|
|
|
|
1983
|
|
|
|
|
|
|
Connection state as integer (0=disconnected, 6=ready). |
|
1984
|
|
|
|
|
|
|
|
|
1985
|
|
|
|
|
|
|
=head1 UTILITY FUNCTIONS |
|
1986
|
|
|
|
|
|
|
|
|
1987
|
|
|
|
|
|
|
=head2 EV::Kafka::_murmur2($key) |
|
1988
|
|
|
|
|
|
|
|
|
1989
|
|
|
|
|
|
|
Kafka-compatible murmur2 hash. Returns a non-negative 31-bit integer. |
|
1990
|
|
|
|
|
|
|
|
|
1991
|
|
|
|
|
|
|
=head2 EV::Kafka::_crc32c($data) |
|
1992
|
|
|
|
|
|
|
|
|
1993
|
|
|
|
|
|
|
CRC32C checksum (Castagnoli). Used internally for RecordBatch integrity. |
|
1994
|
|
|
|
|
|
|
|
|
1995
|
|
|
|
|
|
|
=head2 EV::Kafka::_error_name($code) |
|
1996
|
|
|
|
|
|
|
|
|
1997
|
|
|
|
|
|
|
Convert Kafka error code to string name. |
|
1998
|
|
|
|
|
|
|
|
|
1999
|
|
|
|
|
|
|
=head1 RESULT STRUCTURES |
|
2000
|
|
|
|
|
|
|
|
|
2001
|
|
|
|
|
|
|
=head2 Produce result |
|
2002
|
|
|
|
|
|
|
|
|
2003
|
|
|
|
|
|
|
$result = { |
|
2004
|
|
|
|
|
|
|
topics => [{ |
|
2005
|
|
|
|
|
|
|
topic => 'name', |
|
2006
|
|
|
|
|
|
|
partitions => [{ |
|
2007
|
|
|
|
|
|
|
partition => 0, |
|
2008
|
|
|
|
|
|
|
error_code => 0, |
|
2009
|
|
|
|
|
|
|
base_offset => 42, |
|
2010
|
|
|
|
|
|
|
}], |
|
2011
|
|
|
|
|
|
|
}], |
|
2012
|
|
|
|
|
|
|
}; |
|
2013
|
|
|
|
|
|
|
|
|
2014
|
|
|
|
|
|
|
=head2 Fetch result |
|
2015
|
|
|
|
|
|
|
|
|
2016
|
|
|
|
|
|
|
$result = { |
|
2017
|
|
|
|
|
|
|
topics => [{ |
|
2018
|
|
|
|
|
|
|
topic => 'name', |
|
2019
|
|
|
|
|
|
|
partitions => [{ |
|
2020
|
|
|
|
|
|
|
partition => 0, |
|
2021
|
|
|
|
|
|
|
error_code => 0, |
|
2022
|
|
|
|
|
|
|
high_watermark => 100, |
|
2023
|
|
|
|
|
|
|
records => [{ |
|
2024
|
|
|
|
|
|
|
offset => 42, |
|
2025
|
|
|
|
|
|
|
timestamp => 1712345678000, |
|
2026
|
|
|
|
|
|
|
key => 'key', # or undef |
|
2027
|
|
|
|
|
|
|
value => 'value', # or undef |
|
2028
|
|
|
|
|
|
|
headers => { h => 'v' }, # if present |
|
2029
|
|
|
|
|
|
|
}], |
|
2030
|
|
|
|
|
|
|
}], |
|
2031
|
|
|
|
|
|
|
}], |
|
2032
|
|
|
|
|
|
|
}; |
|
2033
|
|
|
|
|
|
|
|
|
2034
|
|
|
|
|
|
|
=head2 Metadata result |
|
2035
|
|
|
|
|
|
|
|
|
2036
|
|
|
|
|
|
|
$result = { |
|
2037
|
|
|
|
|
|
|
controller_id => 0, |
|
2038
|
|
|
|
|
|
|
brokers => [{ node_id => 0, host => '10.0.0.1', port => 9092 }], |
|
2039
|
|
|
|
|
|
|
topics => [{ |
|
2040
|
|
|
|
|
|
|
name => 'topic', |
|
2041
|
|
|
|
|
|
|
error_code => 0, |
|
2042
|
|
|
|
|
|
|
partitions => [{ |
|
2043
|
|
|
|
|
|
|
partition => 0, |
|
2044
|
|
|
|
|
|
|
leader => 0, |
|
2045
|
|
|
|
|
|
|
error_code => 0, |
|
2046
|
|
|
|
|
|
|
}], |
|
2047
|
|
|
|
|
|
|
}], |
|
2048
|
|
|
|
|
|
|
}; |
|
2049
|
|
|
|
|
|
|
|
|
2050
|
|
|
|
|
|
|
=head1 ERROR HANDLING |
|
2051
|
|
|
|
|
|
|
|
|
2052
|
|
|
|
|
|
|
Errors are delivered through two channels: |
|
2053
|
|
|
|
|
|
|
|
|
2054
|
|
|
|
|
|
|
=over |
|
2055
|
|
|
|
|
|
|
|
|
2056
|
|
|
|
|
|
|
=item B fire the C callback (or |
|
2057
|
|
|
|
|
|
|
C if none set). These include connection refused, DNS failure, |
|
2058
|
|
|
|
|
|
|
TLS errors, SASL auth failure, and protocol violations. |
|
2059
|
|
|
|
|
|
|
|
|
2060
|
|
|
|
|
|
|
=item B are delivered as the second argument to |
|
2061
|
|
|
|
|
|
|
the request callback: C<$cb-E($result, $error)>. If C<$error> is |
|
2062
|
|
|
|
|
|
|
defined, C<$result> may be undef. |
|
2063
|
|
|
|
|
|
|
|
|
2064
|
|
|
|
|
|
|
=back |
|
2065
|
|
|
|
|
|
|
|
|
2066
|
|
|
|
|
|
|
Within result structures, per-partition C fields use Kafka |
|
2067
|
|
|
|
|
|
|
numeric codes: |
|
2068
|
|
|
|
|
|
|
|
|
2069
|
|
|
|
|
|
|
0 No error |
|
2070
|
|
|
|
|
|
|
1 OFFSET_OUT_OF_RANGE |
|
2071
|
|
|
|
|
|
|
3 UNKNOWN_TOPIC_OR_PARTITION |
|
2072
|
|
|
|
|
|
|
6 NOT_LEADER_OR_FOLLOWER |
|
2073
|
|
|
|
|
|
|
15 COORDINATOR_NOT_AVAILABLE |
|
2074
|
|
|
|
|
|
|
16 NOT_COORDINATOR |
|
2075
|
|
|
|
|
|
|
25 UNKNOWN_MEMBER_ID |
|
2076
|
|
|
|
|
|
|
27 REBALANCE_IN_PROGRESS |
|
2077
|
|
|
|
|
|
|
36 TOPIC_ALREADY_EXISTS |
|
2078
|
|
|
|
|
|
|
79 MEMBER_ID_REQUIRED |
|
2079
|
|
|
|
|
|
|
|
|
2080
|
|
|
|
|
|
|
When a broker disconnects mid-flight, all pending callbacks receive |
|
2081
|
|
|
|
|
|
|
C<(undef, "connection closed by broker")> or C<(undef, "disconnected")>. |
|
2082
|
|
|
|
|
|
|
|
|
2083
|
|
|
|
|
|
|
=head1 ENVIRONMENT VARIABLES |
|
2084
|
|
|
|
|
|
|
|
|
2085
|
|
|
|
|
|
|
These are used by tests and examples (not by the module itself): |
|
2086
|
|
|
|
|
|
|
|
|
2087
|
|
|
|
|
|
|
TEST_KAFKA_BROKER broker address for tests (host:port) |
|
2088
|
|
|
|
|
|
|
KAFKA_BROKER broker address for examples |
|
2089
|
|
|
|
|
|
|
KAFKA_HOST broker hostname for low-level examples |
|
2090
|
|
|
|
|
|
|
KAFKA_PORT broker port for low-level examples |
|
2091
|
|
|
|
|
|
|
KAFKA_TOPIC topic name for examples |
|
2092
|
|
|
|
|
|
|
KAFKA_GROUP_ID consumer group for examples |
|
2093
|
|
|
|
|
|
|
KAFKA_LIMIT message limit for consume example |
|
2094
|
|
|
|
|
|
|
KAFKA_COUNT message count for fire-and-forget |
|
2095
|
|
|
|
|
|
|
BENCH_BROKER broker for benchmarks |
|
2096
|
|
|
|
|
|
|
BENCH_MESSAGES message count for benchmarks |
|
2097
|
|
|
|
|
|
|
BENCH_VALUE_SIZE value size in bytes for benchmarks |
|
2098
|
|
|
|
|
|
|
BENCH_TOPIC topic name for benchmarks |
|
2099
|
|
|
|
|
|
|
|
|
2100
|
|
|
|
|
|
|
=head1 QUICK START |
|
2101
|
|
|
|
|
|
|
|
|
2102
|
|
|
|
|
|
|
Minimal producer + consumer lifecycle: |
|
2103
|
|
|
|
|
|
|
|
|
2104
|
|
|
|
|
|
|
use EV; |
|
2105
|
|
|
|
|
|
|
use EV::Kafka; |
|
2106
|
|
|
|
|
|
|
|
|
2107
|
|
|
|
|
|
|
my $kafka = EV::Kafka->new( |
|
2108
|
|
|
|
|
|
|
brokers => '127.0.0.1:9092', |
|
2109
|
|
|
|
|
|
|
acks => 1, |
|
2110
|
|
|
|
|
|
|
on_error => sub { warn "kafka: @_\n" }, |
|
2111
|
|
|
|
|
|
|
on_message => sub { |
|
2112
|
|
|
|
|
|
|
my ($topic, $part, $offset, $key, $value) = @_; |
|
2113
|
|
|
|
|
|
|
print "got: $key=$value\n"; |
|
2114
|
|
|
|
|
|
|
}, |
|
2115
|
|
|
|
|
|
|
); |
|
2116
|
|
|
|
|
|
|
|
|
2117
|
|
|
|
|
|
|
$kafka->connect(sub { |
|
2118
|
|
|
|
|
|
|
# produce |
|
2119
|
|
|
|
|
|
|
$kafka->produce('test', 'k1', 'hello', sub { |
|
2120
|
|
|
|
|
|
|
print "produced\n"; |
|
2121
|
|
|
|
|
|
|
|
|
2122
|
|
|
|
|
|
|
# consume from the beginning |
|
2123
|
|
|
|
|
|
|
$kafka->assign([{topic=>'test', partition=>0, offset=>0}]); |
|
2124
|
|
|
|
|
|
|
$kafka->seek('test', 0, -2, sub { |
|
2125
|
|
|
|
|
|
|
my $t = EV::timer 0, 0.1, sub { $kafka->poll }; |
|
2126
|
|
|
|
|
|
|
$kafka->{cfg}{_t} = $t; |
|
2127
|
|
|
|
|
|
|
}); |
|
2128
|
|
|
|
|
|
|
}); |
|
2129
|
|
|
|
|
|
|
}); |
|
2130
|
|
|
|
|
|
|
|
|
2131
|
|
|
|
|
|
|
EV::run; |
|
2132
|
|
|
|
|
|
|
|
|
2133
|
|
|
|
|
|
|
=head1 COOKBOOK |
|
2134
|
|
|
|
|
|
|
|
|
2135
|
|
|
|
|
|
|
=head2 Produce JSON with headers |
|
2136
|
|
|
|
|
|
|
|
|
2137
|
|
|
|
|
|
|
use JSON::PP; |
|
2138
|
|
|
|
|
|
|
my $json = JSON::PP->new->utf8; |
|
2139
|
|
|
|
|
|
|
|
|
2140
|
|
|
|
|
|
|
$kafka->produce('events', 'user-42', |
|
2141
|
|
|
|
|
|
|
$json->encode({ action => 'click', page => '/home' }), |
|
2142
|
|
|
|
|
|
|
{ headers => { 'content-type' => 'application/json' } }, |
|
2143
|
|
|
|
|
|
|
sub { ... } |
|
2144
|
|
|
|
|
|
|
); |
|
2145
|
|
|
|
|
|
|
|
|
2146
|
|
|
|
|
|
|
=head2 Consume from latest offset only |
|
2147
|
|
|
|
|
|
|
|
|
2148
|
|
|
|
|
|
|
$kafka->subscribe('live-feed', |
|
2149
|
|
|
|
|
|
|
group_id => 'realtime', |
|
2150
|
|
|
|
|
|
|
auto_offset_reset => 'latest', |
|
2151
|
|
|
|
|
|
|
on_assign => sub { print "ready\n" }, |
|
2152
|
|
|
|
|
|
|
); |
|
2153
|
|
|
|
|
|
|
|
|
2154
|
|
|
|
|
|
|
=head2 Graceful shutdown |
|
2155
|
|
|
|
|
|
|
|
|
2156
|
|
|
|
|
|
|
$SIG{INT} = sub { |
|
2157
|
|
|
|
|
|
|
$kafka->commit(sub { |
|
2158
|
|
|
|
|
|
|
$kafka->unsubscribe(sub { |
|
2159
|
|
|
|
|
|
|
$kafka->close(sub { EV::break }); |
|
2160
|
|
|
|
|
|
|
}); |
|
2161
|
|
|
|
|
|
|
}); |
|
2162
|
|
|
|
|
|
|
}; |
|
2163
|
|
|
|
|
|
|
|
|
2164
|
|
|
|
|
|
|
=head2 At-least-once processing |
|
2165
|
|
|
|
|
|
|
|
|
2166
|
|
|
|
|
|
|
$kafka->subscribe('jobs', |
|
2167
|
|
|
|
|
|
|
group_id => 'workers', |
|
2168
|
|
|
|
|
|
|
auto_commit => 0, |
|
2169
|
|
|
|
|
|
|
); |
|
2170
|
|
|
|
|
|
|
|
|
2171
|
|
|
|
|
|
|
# in on_message: process, then commit |
|
2172
|
|
|
|
|
|
|
on_message => sub { |
|
2173
|
|
|
|
|
|
|
process($_[4]); |
|
2174
|
|
|
|
|
|
|
$kafka->commit if ++$count % 100 == 0; |
|
2175
|
|
|
|
|
|
|
}, |
|
2176
|
|
|
|
|
|
|
|
|
2177
|
|
|
|
|
|
|
=head2 Batch produce |
|
2178
|
|
|
|
|
|
|
|
|
2179
|
|
|
|
|
|
|
$kafka->produce_many([ |
|
2180
|
|
|
|
|
|
|
['events', 'k1', 'v1'], |
|
2181
|
|
|
|
|
|
|
['events', 'k2', 'v2'], |
|
2182
|
|
|
|
|
|
|
['events', 'k3', 'v3'], |
|
2183
|
|
|
|
|
|
|
], sub { |
|
2184
|
|
|
|
|
|
|
my $errs = shift; |
|
2185
|
|
|
|
|
|
|
print $errs ? "some failed\n" : "all done\n"; |
|
2186
|
|
|
|
|
|
|
}); |
|
2187
|
|
|
|
|
|
|
|
|
2188
|
|
|
|
|
|
|
=head2 Exactly-once stream processing (EOS) |
|
2189
|
|
|
|
|
|
|
|
|
2190
|
|
|
|
|
|
|
my $kafka = EV::Kafka->new( |
|
2191
|
|
|
|
|
|
|
brokers => '...', |
|
2192
|
|
|
|
|
|
|
transactional_id => 'my-eos-app', |
|
2193
|
|
|
|
|
|
|
acks => -1, |
|
2194
|
|
|
|
|
|
|
on_message => sub { |
|
2195
|
|
|
|
|
|
|
my ($t, $p, $off, $key, $value) = @_; |
|
2196
|
|
|
|
|
|
|
my $result = process($value); |
|
2197
|
|
|
|
|
|
|
$kafka->produce('output-topic', $key, $result); |
|
2198
|
|
|
|
|
|
|
}, |
|
2199
|
|
|
|
|
|
|
); |
|
2200
|
|
|
|
|
|
|
|
|
2201
|
|
|
|
|
|
|
# consume-process-produce loop: |
|
2202
|
|
|
|
|
|
|
$kafka->begin_transaction; |
|
2203
|
|
|
|
|
|
|
$kafka->poll(sub { |
|
2204
|
|
|
|
|
|
|
$kafka->send_offsets_to_transaction('my-group', sub { |
|
2205
|
|
|
|
|
|
|
$kafka->commit_transaction(sub { |
|
2206
|
|
|
|
|
|
|
$kafka->begin_transaction; # next transaction |
|
2207
|
|
|
|
|
|
|
}); |
|
2208
|
|
|
|
|
|
|
}); |
|
2209
|
|
|
|
|
|
|
}); |
|
2210
|
|
|
|
|
|
|
|
|
2211
|
|
|
|
|
|
|
=head2 Topic administration |
|
2212
|
|
|
|
|
|
|
|
|
2213
|
|
|
|
|
|
|
my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef); |
|
2214
|
|
|
|
|
|
|
$conn->on_connect(sub { |
|
2215
|
|
|
|
|
|
|
$conn->create_topics( |
|
2216
|
|
|
|
|
|
|
[{ name => 'new-topic', num_partitions => 6, replication_factor => 3 }], |
|
2217
|
|
|
|
|
|
|
10000, sub { ... } |
|
2218
|
|
|
|
|
|
|
); |
|
2219
|
|
|
|
|
|
|
}); |
|
2220
|
|
|
|
|
|
|
|
|
2221
|
|
|
|
|
|
|
=head1 BENCHMARKS |
|
2222
|
|
|
|
|
|
|
|
|
2223
|
|
|
|
|
|
|
Measured on Linux with TCP loopback to Redpanda, 100-byte values, |
|
2224
|
|
|
|
|
|
|
Perl 5.40.2, 50K messages (C): |
|
2225
|
|
|
|
|
|
|
|
|
2226
|
|
|
|
|
|
|
Pipeline produce (acks=1) 68K msg/sec 7.4 MB/s |
|
2227
|
|
|
|
|
|
|
Fire-and-forget (acks=0) 100K msg/sec 11.0 MB/s |
|
2228
|
|
|
|
|
|
|
Fetch throughput 31K msg/sec 3.4 MB/s |
|
2229
|
|
|
|
|
|
|
Sequential round-trip 19K msg/sec 54 us avg latency |
|
2230
|
|
|
|
|
|
|
Metadata request 25K req/sec 41 us avg latency |
|
2231
|
|
|
|
|
|
|
|
|
2232
|
|
|
|
|
|
|
Throughput by value size (pipelined, acks=1): |
|
2233
|
|
|
|
|
|
|
|
|
2234
|
|
|
|
|
|
|
10 bytes 61K msg/sec 0.9 MB/s |
|
2235
|
|
|
|
|
|
|
100 bytes 68K msg/sec 7.4 MB/s |
|
2236
|
|
|
|
|
|
|
1000 bytes 50K msg/sec 50.2 MB/s |
|
2237
|
|
|
|
|
|
|
10000 bytes 18K msg/sec 178.5 MB/s |
|
2238
|
|
|
|
|
|
|
|
|
2239
|
|
|
|
|
|
|
Pipeline produce throughput is limited by Perl callback overhead per |
|
2240
|
|
|
|
|
|
|
message. Fire-and-forget mode (C) skips the response cycle |
|
2241
|
|
|
|
|
|
|
entirely, reaching ~100K msg/sec. Sequential round-trip (one produce, |
|
2242
|
|
|
|
|
|
|
wait for ack, repeat) measures raw broker latency at ~54 microseconds. |
|
2243
|
|
|
|
|
|
|
|
|
2244
|
|
|
|
|
|
|
The fetch path is sequential (fetch, process, fetch again) which |
|
2245
|
|
|
|
|
|
|
introduces one round-trip per batch. With larger C and |
|
2246
|
|
|
|
|
|
|
dense topics, fetch throughput increases proportionally. |
|
2247
|
|
|
|
|
|
|
|
|
2248
|
|
|
|
|
|
|
Run C for throughput results. Set |
|
2249
|
|
|
|
|
|
|
C, C, C, and |
|
2250
|
|
|
|
|
|
|
C to customize. |
|
2251
|
|
|
|
|
|
|
|
|
2252
|
|
|
|
|
|
|
Run C for a latency histogram with percentiles |
|
2253
|
|
|
|
|
|
|
(min, avg, median, p90, p95, p99, max). |
|
2254
|
|
|
|
|
|
|
|
|
2255
|
|
|
|
|
|
|
=head1 KAFKA PROTOCOL |
|
2256
|
|
|
|
|
|
|
|
|
2257
|
|
|
|
|
|
|
This module implements the Kafka binary protocol directly in XS. |
|
2258
|
|
|
|
|
|
|
All integers are big-endian. Requests use a 4-byte size prefix |
|
2259
|
|
|
|
|
|
|
followed by a header (API key, version, correlation ID, client ID) |
|
2260
|
|
|
|
|
|
|
and a version-specific body. |
|
2261
|
|
|
|
|
|
|
|
|
2262
|
|
|
|
|
|
|
Responses are matched to requests by correlation ID. The broker |
|
2263
|
|
|
|
|
|
|
guarantees FIFO ordering per connection, so the response queue is |
|
2264
|
|
|
|
|
|
|
a simple FIFO. |
|
2265
|
|
|
|
|
|
|
|
|
2266
|
|
|
|
|
|
|
RecordBatch encoding (magic=2) is used for produce. CRC32C covers |
|
2267
|
|
|
|
|
|
|
the batch from attributes through the last record. Records use |
|
2268
|
|
|
|
|
|
|
ZigZag-encoded varints for lengths and deltas. |
|
2269
|
|
|
|
|
|
|
|
|
2270
|
|
|
|
|
|
|
The connection handshake sends ApiVersions (v0) on connect to |
|
2271
|
|
|
|
|
|
|
discover supported protocol versions. SASL authentication uses |
|
2272
|
|
|
|
|
|
|
SaslHandshake (v1) + SaslAuthenticate (v2) with PLAIN mechanism. |
|
2273
|
|
|
|
|
|
|
|
|
2274
|
|
|
|
|
|
|
Consumer group protocol uses sticky partition assignment with |
|
2275
|
|
|
|
|
|
|
MEMBER_ID_REQUIRED (error 79) retry per KIP-394. |
|
2276
|
|
|
|
|
|
|
|
|
2277
|
|
|
|
|
|
|
Non-flexible API versions are used throughout (capped below the |
|
2278
|
|
|
|
|
|
|
flexible-version threshold for each API) to avoid the compact |
|
2279
|
|
|
|
|
|
|
encoding complexity. |
|
2280
|
|
|
|
|
|
|
|
|
2281
|
|
|
|
|
|
|
=head1 LIMITATIONS |
|
2282
|
|
|
|
|
|
|
|
|
2283
|
|
|
|
|
|
|
=over |
|
2284
|
|
|
|
|
|
|
|
|
2285
|
|
|
|
|
|
|
=item * B -- supported when built with |
|
2286
|
|
|
|
|
|
|
liblz4 and zlib. snappy and zstd are not implemented. |
|
2287
|
|
|
|
|
|
|
|
|
2288
|
|
|
|
|
|
|
=item * B -- C, |
|
2289
|
|
|
|
|
|
|
C, C, |
|
2290
|
|
|
|
|
|
|
C provide full exactly-once stream processing. |
|
2291
|
|
|
|
|
|
|
C, C, C, C |
|
2292
|
|
|
|
|
|
|
are all wired. Requires C in constructor. |
|
2293
|
|
|
|
|
|
|
|
|
2294
|
|
|
|
|
|
|
=item * B -- SASL/PLAIN and SCRAM-SHA-256/512 |
|
2295
|
|
|
|
|
|
|
are supported. GSSAPI (Kerberos) and OAUTHBEARER are not implemented. |
|
2296
|
|
|
|
|
|
|
|
|
2297
|
|
|
|
|
|
|
=item * B -- assignments are preserved |
|
2298
|
|
|
|
|
|
|
across rebalances where possible. New partitions are distributed to |
|
2299
|
|
|
|
|
|
|
the least-loaded member. Overloaded members shed excess partitions. |
|
2300
|
|
|
|
|
|
|
|
|
2301
|
|
|
|
|
|
|
=item * B -- C is called |
|
2302
|
|
|
|
|
|
|
synchronously in C. For fully non-blocking |
|
2303
|
|
|
|
|
|
|
operation, use IP addresses instead of hostnames. |
|
2304
|
|
|
|
|
|
|
|
|
2305
|
|
|
|
|
|
|
=item * B -- all API versions are capped |
|
2306
|
|
|
|
|
|
|
below the flexible-version threshold to avoid compact string/array |
|
2307
|
|
|
|
|
|
|
encoding. This limits interoperability with very new protocol features |
|
2308
|
|
|
|
|
|
|
but works with all Kafka 0.11+ and Redpanda brokers. |
|
2309
|
|
|
|
|
|
|
|
|
2310
|
|
|
|
|
|
|
=item * B -- transient errors (NOT_LEADER, |
|
2311
|
|
|
|
|
|
|
COORDINATOR_NOT_AVAILABLE) trigger metadata refresh and up to 3 |
|
2312
|
|
|
|
|
|
|
retries with backoff. Non-retriable errors are surfaced to the |
|
2313
|
|
|
|
|
|
|
callback immediately. |
|
2314
|
|
|
|
|
|
|
|
|
2315
|
|
|
|
|
|
|
=back |
|
2316
|
|
|
|
|
|
|
|
|
2317
|
|
|
|
|
|
|
=head1 AUTHOR |
|
2318
|
|
|
|
|
|
|
|
|
2319
|
|
|
|
|
|
|
vividsnow |
|
2320
|
|
|
|
|
|
|
|
|
2321
|
|
|
|
|
|
|
=head1 LICENSE |
|
2322
|
|
|
|
|
|
|
|
|
2323
|
|
|
|
|
|
|
This library is free software; you can redistribute it and/or modify it |
|
2324
|
|
|
|
|
|
|
under the same terms as Perl itself. |
|
2325
|
|
|
|
|
|
|
|
|
2326
|
|
|
|
|
|
|
=cut |
|
2327
|
|
|
|
|
|
|
|