File Coverage

blib/lib/Metabrik/Client/Kafka.pm
Criterion Covered Total %
statement 12 174 6.9
branch 0 66 0.0
condition 0 17 0.0
subroutine 4 19 21.0
pod 1 14 7.1
total 17 290 5.8


line stmt bran cond sub pod time code
1             #
2             # $Id$
3             #
4             # client::kafka Brik
5             #
6             package Metabrik::Client::Kafka;
7 1     1   761 use strict;
  1         3  
  1         30  
8 1     1   5 use warnings;
  1         2  
  1         26  
9              
10 1     1   6 use base qw(Metabrik::Shell::Command);
  1         3  
  1         325  
11              
12             sub brik_properties {
13             return {
14 0     0 1   revision => '$Revision$',
15             tags => [ qw(unstable) ],
16             author => 'GomoR ',
17             license => 'http://opensource.org/licenses/BSD-3-Clause',
18             attributes => {
19             host => [ qw(host_list) ],
20             host_zookeeper => [ qw(host) ],
21             max_fetch_size => [ qw(size) ],
22             rtimeout => [ qw(seconds_float) ],
23             retry => [ qw(count) ],
24             retry_backoff => [ qw(milliseconds) ],
25             _broker => [ qw(INTERNAL) ],
26             _kc => [ qw(INTERNAL) ],
27             _kcli => [ qw(INTERNAL) ],
28             },
29             attributes_default => {
30             host => [ qw(localhost:9092) ],
31             host_zookeeper => 'localhost',
32             max_fetch_size => 20000000,
33             rtimeout => 3,
34             retry => 5,
35             retry_backoff => 1000,
36             },
37             commands => {
38             create_connection => [ qw(host|OPTIONAL) ],
39             create_producer => [ ],
40             create_consumer => [ ],
41             send => [ qw(topic partition messages) ],
42             loop_consumer_fetch => [ qw(topic partition|OPTIONAL) ],
43             close => [ ],
44             create_topic => [ qw(topic replication_factor|OPTIONAL partitions|OPTIONAL) ],
45             alter_topic => [ qw(topic replication_factor|OPTIONAL partitions|OPTIONAL) ],
46             delete_topic => [ qw(topic) ],
47             list_topics => [ ],
48             describe_topic => [ qw(topic) ],
49             run_console_producer => [ qw(topic) ],
50             run_console_consumer => [ qw(topic) ],
51             },
52             require_modules => {
53             'List::Util' => [ qw(shuffle) ],
54             'Kafka' => [ ],
55             'Kafka::Connection' => [ ],
56             'Kafka::Producer' => [ ],
57             'Kafka::Consumer' => [ ],
58             },
59             require_binaries => {
60             },
61             optional_binaries => {
62             },
63             need_packages => {
64             freebsd => [ qw(p5-Tree-Trie) ],
65             },
66             };
67             }
68              
69             sub create_connection {
70 0     0 0   my $self = shift;
71 0           my ($host) = @_;
72              
73 0   0       $host ||= $self->host;
74 0 0         $self->brik_help_run_undef_arg('create_connection', $host) or return;
75 0 0         $self->brik_help_run_invalid_arg('create_connection', $host, 'ARRAY') or return;
76 0 0         $self->brik_help_run_empty_array_arg('create_connection', $host) or return;
77              
78             # Patch fonction to disable utf8 stuff, it fails strangely.
79             {
80 1     1   8 no warnings 'redefine';
  1         1  
  1         1889  
  0            
81              
82             *Kafka::Connection::_is_like_server = sub {
83 0     0     my ($self, $server) = @_;
84              
85 0 0 0       unless(
      0        
86             defined($server)
87             && defined(Kafka::Connection::_STRING($server))
88             #&& !utf8::is_utf8($server) # this sucks.
89             && $server =~ /^[^:]+:\d+$/
90             ) {
91 0           return;
92             }
93              
94 0           return $server;
95 0           };
96             };
97              
98 0           my $rtimeout = $self->rtimeout;
99 0           my $send_max_attempts = $self->retry;
100 0           my $retry_backoff = $self->retry_backoff;
101              
102             # Cause Kafka will connect to the first working broker.
103             # By randomizing, different processes will use different brokers.
104 0           my @list = List::Util::shuffle(@$host);
105              
106 0           my $broker = $list[0]; # We take the first, as it is now randomized.
107 0           $self->_broker($broker);
108              
109 0           my $kc;
110 0           eval {
111 0           $kc = Kafka::Connection->new(
112             broker_list => [ $broker ],
113             timeout => $rtimeout,
114             SEND_MAX_ATTEMPTS => $send_max_attempts,
115             RETRY_BACKOFF => $retry_backoff,
116             );
117             };
118 0 0         if ($@) {
119 0           chomp($@);
120 0           my $str_list = join(',', @list);
121 0           return $self->log->error("create_connection: failed with list [$str_list]: [$@]");
122             }
123              
124 0           return $self->_kc($kc);
125             }
126              
127             sub create_producer {
128 0     0 0   my $self = shift;
129 0           my ($host) = @_;
130              
131 0 0         my $kc = $self->create_connection or return;
132              
133             # Doc:
134             # https://kafka.apache.org/documentation/#acks
135              
136 0           my $kp;
137 0           eval {
138 0           $kp = Kafka::Producer->new(
139             Connection => $kc,
140             #RequiredAcks => $Kafka::WAIT_WRITTEN_TO_LOCAL_LOG, # 1, default
141             RequiredAcks => $Kafka::BLOCK_UNTIL_IS_COMMITTED, # -1, best
142             #RequiredAcks => $Kafka::NOT_SEND_ANY_RESPONSE, # 0
143             );
144             };
145 0 0         if ($@) {
146 0           chomp($@);
147 0           return $self->log->error("create_producer: failed [$@]");
148             }
149              
150 0           return $self->_kcli($kp);
151             }
152              
153             sub create_consumer {
154 0     0 0   my $self = shift;
155 0           my ($host) = @_;
156              
157 0 0         my $kc = $self->create_connection or return;
158              
159 0           my $kco;
160 0           eval {
161 0           $kco = Kafka::Consumer->new(Connection => $kc);
162             };
163 0 0         if ($@) {
164 0           chomp($@);
165 0           return $self->log->error("create_consumer: failed [$@]");
166             }
167              
168 0           return $self->_kcli($kco);
169             }
170              
171             sub send {
172 0     0 0   my $self = shift;
173 0           my ($topic, $partition, $messages) = @_;
174              
175 0           my $kcli = $self->_kcli;
176 0 0         $self->brik_help_run_undef_arg('create_producer', $kcli) or return;
177              
178 0 0         $self->brik_help_run_undef_arg('send', $topic) or return;
179 0 0         $self->brik_help_run_undef_arg('send', $partition) or return;
180 0 0         $self->brik_help_run_undef_arg('send', $messages) or return;
181 0 0         $self->brik_help_run_invalid_arg('send', $messages, 'ARRAY', 'SCALAR') or return;
182              
183 0           my $broker = $self->_broker;
184              
185 0           my $r;
186 0           eval {
187 0           $r = $kcli->send($topic, $partition, $messages);
188             };
189 0 0         if ($@) {
190 0           chomp($@);
191              
192             # Response $r looks like the following. We should use ErrorCode instead of regexes.
193             # {
194             # CorrelationId => -1608629279,
195             # Throttle_Time_Ms => 0,
196             # topics => [ {
197             # partitions => [
198             # { ErrorCode => 0, Log_Append_Time => -1, Offset => 0, Partition => 0 },
199             # ],
200             # TopicName => "test",
201             # } ],
202             # }
203              
204 0           my $no_ack_for_request = 'No acknowledgement for sent request';
205 0           my $cant_connect = 'Cannot connect to broker';
206 0           my $cant_get_metadata = 'Cannot get metadata';
207 0           my $unable_to_write = 'Unable to write due to ongoing Kafka leader selection';
208 0           my $no_known_broker = 'There are no known brokers';
209 0           my $too_big = 'Message is too big';
210 0           my $invalid_arg_messages = 'Invalid argument: messages';
211 0           my $err = $@;
212 0 0         if ($@ =~ m{^$no_ack_for_request}i) {
    0          
    0          
    0          
    0          
    0          
    0          
213 0           $err = $no_ack_for_request;
214             }
215             elsif ($@ =~ m{^$cant_connect}i) {
216 0           $err = $cant_connect;
217             }
218             elsif ($@ =~ m{^$cant_get_metadata}i) {
219 0           $err = $cant_get_metadata;
220             }
221             elsif ($@ =~ m{^$unable_to_write}i) {
222 0           $err = $unable_to_write;
223             }
224             elsif ($@ =~ m{^$no_known_broker}i) {
225 0           $err = $no_known_broker;
226             }
227             elsif ($@ =~ m{^$too_big}i) {
228 0           $err = $too_big;
229             }
230             elsif ($@ =~ m{^$invalid_arg_messages}i) {
231 0           $err = $invalid_arg_messages;
232             }
233              
234 0           my $broker = $self->_broker;
235              
236 0           return $self->log->error("send: fail for broker [$broker]: [$err]");
237             }
238              
239 0           return $r;
240             }
241              
242             sub loop_consumer_fetch {
243 0     0 0   my $self = shift;
244 0           my ($topic, $partition) = @_;
245              
246 0           my $kcli = $self->_kcli;
247 0 0         $self->brik_help_run_undef_arg('create_consumer', $kcli) or return;
248 0 0         $self->brik_help_run_undef_arg('loop_consumer_fetch', $topic) or return;
249              
250 0   0       $partition ||= 0;
251              
252 0           my $offsets = $kcli->offsets(
253             $topic,
254             $partition,
255             $Kafka::RECEIVE_EARLIEST_OFFSET, # time
256             $Kafka::DEFAULT_MAX_NUMBER_OF_OFFSETS, # max_number
257             );
258              
259 0           for (@$offsets) {
260 0           print "Received offset: $_\n";
261             }
262              
263 0           my $messages = $kcli->fetch(
264             $topic,
265             $partition,
266             0, # offset
267             $self->max_fetch_size, # Maximum size of MESSAGE(s) to receive
268             );
269 0           for my $message (@$messages) {
270 0 0         if ($message->valid) {
271 0           print 'payload : ', $message->payload, "\n";
272 0           print 'key : ', $message->key, "\n";
273 0           print 'offset : ', $message->offset, "\n";
274 0           print 'next_offset: ', $message->next_offset, "\n";
275             }
276             else {
277 0           print 'error : ', $message->error, "\n";
278             }
279             }
280              
281 0           return 1;
282             }
283              
284             sub close {
285 0     0 0   my $self = shift;
286              
287 0 0         if ($self->_kcli) {
288 0           $self->_kcli(undef);
289             }
290              
291 0 0         if ($self->_kc) {
292 0           $self->_kc->close;
293 0           $self->_kc(undef);
294             }
295              
296 0           return 1;
297             }
298              
299             sub create_topic {
300 0     0 0   my $self = shift;
301 0           my ($topic, $rf, $partitions) = @_;
302              
303 0   0       $rf ||= 1;
304 0   0       $partitions ||= 1;
305 0 0         $self->brik_help_run_undef_arg('create_topic', $topic) or return;
306              
307 0           my $basedir = $ENV{HOME}."/metabrik/server-kafka/kafka";
308 0           my $host = $self->host_zookeeper;
309              
310 0           my $cmd = "$basedir/bin/kafka-topics.sh --create --zookeeper $host:2181 ".
311             "--replication-factor $rf --partitions $partitions --topic $topic";
312              
313 0           $self->log->verbose("create_topic: cmd[$cmd]");
314              
315 0           return $self->execute($cmd);
316             }
317              
318             sub alter_topic {
319 0     0 0   my $self = shift;
320 0           my ($topic, $partitions) = @_;
321              
322 0   0       $partitions ||= 1;
323 0 0         $self->brik_help_run_undef_arg('alter_topic', $topic) or return;
324              
325 0           my $basedir = $ENV{HOME}."/metabrik/server-kafka/kafka";
326 0           my $host = $self->host_zookeeper;
327              
328 0           my $cmd = "$basedir/bin/kafka-topics.sh --alter --zookeeper $host:2181 ".
329             "--partitions $partitions --topic $topic";
330              
331 0           $self->log->verbose("alter_topic: cmd[$cmd]");
332              
333 0           return $self->execute($cmd);
334             }
335              
336             sub delete_topic {
337 0     0 0   my $self = shift;
338 0           my ($topic) = @_;
339              
340 0 0         $self->brik_help_run_undef_arg('delete_topic', $topic) or return;
341              
342 0           my $basedir = $ENV{HOME}."/metabrik/server-kafka/kafka";
343 0           my $host = $self->host_zookeeper;
344              
345 0           my $cmd = "$basedir/bin/kafka-topics.sh --delete --if-exists --zookeeper $host:2181 ".
346             "--topic $topic";
347              
348 0           $self->log->verbose("delete_topic: cmd[$cmd]");
349              
350 0           return $self->execute($cmd);
351             }
352              
353             sub list_topics {
354 0     0 0   my $self = shift;
355              
356 0           my $basedir = $ENV{HOME}."/metabrik/server-kafka/kafka";
357 0           my $host = $self->host_zookeeper;
358              
359 0           my $cmd = "$basedir/bin/kafka-topics.sh --list --zookeeper $host:2181";
360              
361 0           $self->log->verbose("list_topics: cmd[$cmd]");
362              
363 0           return $self->execute($cmd);
364             }
365              
366             sub describe_topic {
367 0     0 0   my $self = shift;
368 0           my ($topic) = @_;
369              
370 0 0         $self->brik_help_run_undef_arg('describe_topic', $topic) or return;
371              
372 0           my $basedir = $ENV{HOME}."/metabrik/server-kafka/kafka";
373 0           my $host = $self->host_zookeeper;
374              
375 0           my $cmd = "$basedir/bin/kafka-topics.sh --describe --zookeeper $host:2181 --topic $topic";
376              
377 0           $self->log->verbose("describe_topic: cmd[$cmd]");
378              
379 0           return $self->execute($cmd);
380             }
381              
382             # https://stackoverflow.com/questions/16284399/purge-kafka-queue
383             # kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000
384             # Wait, then restore previous retention.ms
385             #sub purge_topic {
386             #}
387              
388             sub run_console_producer {
389 0     0 0   my $self = shift;
390 0           my ($topic) = @_;
391              
392 0 0         $self->brik_help_run_undef_arg('run_console_producer', $topic) or return;
393              
394 0           my $basedir = $ENV{HOME}."/metabrik/server-kafka/kafka";
395 0           my $host = $self->host;
396              
397 0           my $cmd = "$basedir/bin/kafka-console-producer.sh --broker-list $host:9092 --topic $topic";
398              
399 0           $self->log->verbose("run_console_producer: cmd[$cmd]");
400              
401 0           return $self->execute($cmd);
402             }
403              
404             sub run_console_consumer {
405 0     0 0   my $self = shift;
406 0           my ($topic) = @_;
407              
408 0 0         $self->brik_help_run_undef_arg('run_console_consumer', $topic) or return;
409              
410 0           my $basedir = $ENV{HOME}."/metabrik/server-kafka/kafka";
411 0           my $host = $self->host;
412              
413 0           my $cmd = "$basedir/bin/kafka-console-consumer.sh --bootstrap-server $host:9092 ".
414             "--topic $topic --from-beginning";
415              
416 0           $self->log->verbose("run_console_consumer: cmd[$cmd]");
417              
418 0           return $self->execute($cmd);
419             }
420              
421             1;
422              
423             __END__