File Coverage

blib/lib/Kafka/Librd.pm
Criterion Covered Total %
statement 156 156 100.0
branch n/a
condition n/a
subroutine 7 7 100.0
pod 1 1 100.0
total 164 164 100.0


line stmt bran cond sub pod time code
1             package Kafka::Librd;
2              
3 5     5   518476 use strict;
  5         50  
  5         153  
4 5     5   25 use warnings;
  5         10  
  5         147  
5 5     5   26 use XSLoader;
  5         9  
  5         94  
6 5     5   2434 use Exporter::Lite;
  5         3854  
  5         28  
7              
8             our $VERSION = '0.18_01';
9             XSLoader::load('Kafka::Librd', $VERSION);
10             $VERSION =~ tr/_//d;
11              
12             our @EXPORT_OK;
13              
14             =head1 NAME
15              
16             Kafka::Librd - bindings for librdkafka
17              
18             =head1 SYNOPSIS
19              
20             use Kafka::Librd;
21              
22             my $kafka = Kafka::Librd->new(
23             Kafka::Librd::RD_KAFKA_CONSUMER,
24             {
25             "group.id" => 'consumer_id',
26             },
27             );
28             $kafka->brokers_add('server1:9092,server2:9092');
29             $kafka->subscribe( \@topics );
30             while (1) {
31             my $msg = $kafka->consumer_poll(1000);
32             if ($msg) {
33             if ( $msg->err ) {
34             say "Error: ", Kafka::Librd::Error::to_string($err);
35             }
36             else {
37             say $msg->payload;
38             }
39             }
40             }
41              
42              
43             =head1 DESCRIPTION
44              
45             This module provides perl bindings for librdkafka.
46              
47             =head1 METHODS
48              
49             =cut
50              
51             =head2 new
52              
53             $kafka = $class->new($type, \%config)
54              
55             Create a new instance. $type can be either C or
56             C. Config is a hash with configuration parameters as
57             described in
58             L,
59             additionally it may include C key, with a hash containing
60             default topic configuration properties.
61              
62             =cut
63              
64             sub new {
65 6     6 1 2905 my ( $class, $type, $params ) = @_;
66 6         7158 return _new( $type, $params );
67             }
68              
69             {
70             my $errors = Kafka::Librd::Error::rd_kafka_get_err_descs();
71 5     5   727 no strict 'refs';
  5         11  
  5         692  
72             for ( keys %$errors ) {
73 1     1   990 *{__PACKAGE__ . "::RD_KAFKA_RESP_ERR_$_"} = eval "sub { $errors->{$_} }";
  1         960  
  1         960  
  1         934  
  1         6  
  1         6  
  1         1020  
  1         5  
  1         1003  
  2         12  
  1         6  
  1         6  
  2         1511  
  1         6  
  1         948  
  1         9  
  1         1015  
  1         973  
  1         6  
  1         958  
  1         1013  
  1         7  
  1         969  
  1         6  
  1         6  
  1         955  
  1         958  
  1         1003  
  1         6  
  1         974  
  1         7  
  1         7  
  1         7  
  1         8  
  1         995  
  1         934  
  1         934  
  1         966  
  1         953  
  1         6  
  1         933  
  2         12  
  1         1001  
  1         993  
  1         5  
  1         5  
  1         6  
  1         953  
  1         955  
  1         5  
  1         954  
  1         7  
  1         969  
  1         1495  
  1         5  
  1         950  
  1         7  
  1         959  
  1         930  
  1         958  
  1         6  
  1         6  
  1         7  
  1         7  
  1         7  
  3         1150  
  1         7  
  1         6  
  1         951  
  1         6  
  1         967  
  1         5  
  1         6  
  1         1009  
  1         1011  
  1         965  
  1         9  
  1         958  
  1         1007  
  1         5  
  1         6  
  1         956  
  1         6  
  1         6  
  1         960  
  1         6  
  1         6  
  1         959  
  1         6  
  1         6  
  1         5  
  1         956  
  1         6  
  1         955  
  1         958  
  1         946  
  1         5  
  1         8  
  1         959  
  1         8  
  1         948  
  1         955  
  1         1006  
  1         6  
  1         1018  
  1         6  
  1         955  
  1         5  
  1         6  
  1         968  
  1         964  
  1         934  
  1         6  
  1         6  
  1         1014  
  1         962  
  1         5  
  1         968  
  1         10  
  1         932  
  1         8  
  1         968  
  1         951  
  1         1108  
  2         12  
  1         956  
  1         6  
  2         960  
  1         1022  
  1         7  
  1         8  
  1         965  
  1         6  
  1         965  
  1         956  
  1         5  
  1         951  
  1         957  
  1         5  
74             push @EXPORT_OK, "RD_KAFKA_RESP_ERR_$_";
75             }
76             }
77              
78             =head2 brokers_add
79              
80             $cnt = $kafka->brokers_add($brokers)
81              
82             add one or more brokers to the list of initial bootstrap brokers. I<$brokers>
83             is a comma separated list of brokers in the format C<[proto://]host[:port]>.
84              
85             =head2 subscribe
86              
87             $err = $kafka->subscribe(\@topics)
88              
89             subscribe to the list of topics using balanced consumer groups.
90              
91             =head2 unsubscribe
92              
93             $err = $kafka->unsubscribe
94              
95             unsubscribe from the current subscription set
96              
97             =head2 subscription
98              
99             $tplist = $kafka->subscription
100              
101             return current subscriptions. Subscription returned as a reference to array of
102             hashes with the following fields: C, C, C, C.
103              
104             =head2 assign
105              
106             $err = $kafka->assign(\@tplist)
107              
108             assign partitions to consume. C<@tplist> is an array of hashes with
109             C and C fields set.
110              
111             =head2 assignment
112              
113             $tplist = $kafka->assignment
114              
115             return current assignment. Result returned in the same way as for
116             L.
117              
118             =head2 consumer_poll
119              
120             $msg = $kafka->consumer_poll($timeout_ms)
121              
122             poll for messages or events. If any message or event received, returns
123             L object. If C<<$msg->err>> for returned object is zero
124             (RD_KAFKA_RESP_ERR_NO_ERROR), then it is a proper message, otherwise it is an
125             event or an error.
126              
127             =head2 commit
128              
129             $err = $kafka->commit(\@tplist, $async)
130              
131             commit offsets to the broker. C<@tplist> is an array of hashes
132             with the following keys: C, C, C, C. If
133             @topic_partition_list is missing or undef, then current partition assignment
134             is used instead. If C<$async> is 1, then method returns immediately, if it is
135             0 or missing then method blocks until offsets are committed.
136              
137             =head2 commit_message
138              
139             $err = $kafka->commit_message($msg, $async)
140              
141             commit message's offset for the message's partition. C<$async> same as for
142             L.
143              
144             =head2 committed
145              
146             $tplist = $kafka->committed(\@tplist, $timeout_ms)
147              
148             retrieve committed offsets for topics and partitions specified in C<@tplist>,
149             which is an array of hashes with C and C fields. Returned
150             C<$tplist> contains a copy of the input list with added C fields.
151              
152             =head2 position
153              
154             $tplist = $kafka->position(\@tplist)
155              
156             retrieve current offsets for topics and partitions specified in C<@tplist>,
157             which is an array of hashes with C and C fields. Returned
158             C<$tplist> contains a copy of the input list with added C fields.
159              
160             =head2 consumer_close
161              
162             $err = $kafka->consumer_close
163              
164             close down the consumer
165              
166             =head2 topic
167              
168             $topic = $kafka->topic($name, \%config)
169              
170             Return a L object, that can be used to produce
171             messages.
172              
173             If an error occurs during creation of the topic, C is returned. In such
174             case use L to obtain the corresponding error
175             code!
176              
177             =head2 outq_len
178              
179             $len = $kafka->outq_len
180              
181             return the current out queue length.
182              
183             =head2 flush
184              
185             $kafka->flush($timeout_ms)
186              
187             wait until all outstanding produce requests, et.al, are completed.
188              
189             =head2 destroy
190              
191             $kafka->destroy
192              
193             destroy kafka handle
194              
195             =head2 dump
196              
197             $kafka->dump
198              
199             dump internal state of kafka handle to stdout, only useful for debugging
200              
201             =head1 Kafka::Librd::Topic
202              
203             This class maps to C structure from librdkafka and represents
204             topic. It should be created with L method of Kafka::Librd object. It
205             provides the following method:
206              
207             =head2 produce
208              
209             $status = $topic->produce($partition, $msgflags, $payload, $key)
210              
211             produce a message for the topic. I<$msgflags> can be RD_KAFKA_MSG_F_BLOCK in
212             the future, but currently it should be set to 0, RD_KAFKA_MSG_F_COPY and
213             RD_KAFKA_MSG_F_FREE must not be used, internally RD_KAFKA_MSG_F_COPY is always
214             set.
215              
216             The returned status is -1 in case of an error, otherwise 0. The error code can
217             be retrieved using the L function.
218              
219             =head2 destroy
220              
221             $topic->destroy
222              
223             destroy topic handle
224              
225             =head1 Kafka::Librd::Message
226              
227             This class maps to C structure from librdkafka and
228             represents message or event. Objects of this class have the following methods:
229              
230             =head2 err
231              
232             return error code from the message
233              
234             =head2 topic
235              
236             return topic name
237              
238             =head2 partition
239              
240             return partition number
241              
242             =head2 offset
243              
244             return offset. Note, that the value is truncated to 32 bit if your perl doesn't
245             support 64 bit integers.
246              
247             =head2 key
248              
249             return message key
250              
251             =head2 payload
252              
253             return message payload
254              
255             =head2 timestamp(\$tstype)
256              
257             return message timestamp (milliseconds since UNIX epoch)
258              
259             The $tstype argument is optional, and if present, it should be a
260             scalar reference. It will be filled with one of the following values:
261              
262             =over
263              
264             =item
265              
266             C
267              
268             =item
269              
270             C
271              
272             =item
273              
274             C
275              
276             =back
277              
278             =head1 Kafka::Librd::Error
279              
280             =head2 Kafka::Librd::Error::to_string
281              
282             my $error_message = Kafka::Librd::Error::to_string($err)
283              
284             Convert an error code into a human-readable error description. Use this for
285             error codes returned by L and
286             L.
287              
288             =head2 Kafka::Librd::Error::last_error
289              
290             my $err = Kafka::Librd::Error::last_error
291              
292             Retrieve the last error state set by function calls L and L.
293             This function should be called immediately after those functions, since they
294             store error information globally.
295              
296             =cut
297              
298             package Kafka::Librd;
299             package Kafka::Librd::Topic;
300             package Kafka::Librd::Message;
301             package Kafka::Librd::Error;
302              
303             1;
304              
305             __END__