File Coverage

blib/lib/Kafka/Librd.pm
Criterion Covered Total %
statement 156 156 100.0
branch n/a
condition n/a
subroutine 7 7 100.0
pod 1 1 100.0
total 164 164 100.0


line stmt bran cond sub pod time code
1             package Kafka::Librd;
2              
3 5     5   433177 use strict;
  5         43  
  5         139  
4 5     5   21 use warnings;
  5         8  
  5         109  
5 5     5   20 use XSLoader;
  5         8  
  5         78  
6 5     5   2153 use Exporter::Lite;
  5         3117  
  5         25  
7              
8             our $VERSION = '0.20';
9             XSLoader::load('Kafka::Librd', $VERSION);
10              
11             our @EXPORT_OK;
12              
13             =head1 NAME
14              
15             Kafka::Librd - bindings for librdkafka
16              
17             =head1 SYNOPSIS
18              
19             use Kafka::Librd;
20              
21             my $kafka = Kafka::Librd->new(
22             Kafka::Librd::RD_KAFKA_CONSUMER,
23             {
24             "group.id" => 'consumer_id',
25             },
26             );
27             $kafka->brokers_add('server1:9092,server2:9092');
28             $kafka->subscribe( \@topics );
29             while (1) {
30             my $msg = $kafka->consumer_poll(1000);
31             if ($msg) {
32             if ( $msg->err ) {
33             say "Error: ", Kafka::Librd::Error::to_string($err);
34             }
35             else {
36             say $msg->payload;
37             }
38             }
39             }
40              
41              
42             =head1 DESCRIPTION
43              
44             This module provides perl bindings for librdkafka.
45              
46             =head1 METHODS
47              
48             =cut
49              
50             =head2 new
51              
52             $kafka = $class->new($type, \%config)
53              
54             Create a new instance. $type can be either C or
55             C. Config is a hash with configuration parameters as
56             described in
57             L,
58             additionally it may include C key, with a hash containing
59             default topic configuration properties.
60              
61             =cut
62              
63             sub new {
64 6     6 1 2378 my ( $class, $type, $params ) = @_;
65 6         5940 return _new( $type, $params );
66             }
67              
68             {
69             my $errors = Kafka::Librd::Error::rd_kafka_get_err_descs();
70 5     5   564 no strict 'refs';
  5         10  
  5         580  
71             for ( keys %$errors ) {
72 1     1   4 *{__PACKAGE__ . "::RD_KAFKA_RESP_ERR_$_"} = eval "sub { $errors->{$_} }";
  1         5  
  1         5  
  1         4  
  1         5  
  1         4  
  1         5  
  1         5  
  1         5  
  1         5  
  1         5  
  1         4  
  1         5  
  1         4  
  1         5  
  1         6  
  1         5  
  1         5  
  1         5  
  1         5  
  1         4  
  1         4  
  1         5  
  1         5  
  1         796  
  1         5  
  1         5  
  1         4  
  1         6  
  1         6  
  1         5  
  1         5  
  1         4  
  1         4  
  1         4  
  1         5  
  1         5  
  1         5  
  1         4  
  1         5  
  1         5  
  1         4  
  1         5  
  2         9  
  3         1018  
  1         5  
  1         4  
  2         788  
  1         5  
  1         5  
  1         4  
  1         4  
  1         5  
  1         5  
  1         5  
  1         5  
  1         4  
  1         4  
  1         5  
  1         5  
  1         1239  
  1         5  
  1         5  
  1         4  
  1         5  
  1         5  
  1         5  
  1         5  
  1         6  
  1         4  
  1         5  
  1         4  
  1         1089  
  1         4  
  1         5  
  1         4  
  1         4  
  2         10  
  1         5  
  1         5  
  1         6  
  1         5  
  1         4  
  1         5  
  1         5  
  1         6  
  1         5  
  1         5  
  1         5  
  1         4  
  1         5  
  1         5  
  1         5  
  1         5  
  1         5  
  1         4  
  1         6  
  1         5  
  1         5  
  1         5  
  1         4  
  1         6  
  2         127  
  1         5  
  1         5  
  1         5  
  1         1143  
  1         5  
  1         6  
  1         5  
  1         6  
  1         5  
  1         5  
  1         4  
  1         5  
  1         5  
  1         6  
  1         5  
  1         6  
  1         4  
  1         5  
  1         5  
  1         5  
  1         4  
  1         791  
  1         6  
  1         4  
  1         6  
  1         816  
  1         4  
  1         5  
  2         12  
  1         4  
  1         7  
  1         5  
  1         5  
  1         5  
  1         5  
  1         6  
73             push @EXPORT_OK, "RD_KAFKA_RESP_ERR_$_";
74             }
75             }
76              
77             =head2 brokers_add
78              
79             $cnt = $kafka->brokers_add($brokers)
80              
81             add one or more brokers to the list of initial bootstrap brokers. I<$brokers>
82             is a comma separated list of brokers in the format C<[proto://]host[:port]>.
83              
84             =head2 subscribe
85              
86             $err = $kafka->subscribe(\@topics)
87              
88             subscribe to the list of topics using balanced consumer groups.
89              
90             =head2 unsubscribe
91              
92             $err = $kafka->unsubscribe
93              
94             unsubscribe from the current subscription set
95              
96             =head2 subscription
97              
98             $tplist = $kafka->subscription
99              
100             return current subscriptions. Subscription returned as a reference to array of
101             hashes with the following fields: C, C, C, C.
102              
103             =head2 assign
104              
105             $err = $kafka->assign(\@tplist)
106              
107             assign partitions to consume. C<@tplist> is an array of hashes with
108             C and C fields set.
109              
110             =head2 assignment
111              
112             $tplist = $kafka->assignment
113              
114             return current assignment. Result returned in the same way as for
115             L.
116              
117             =head2 consumer_poll
118              
119             $msg = $kafka->consumer_poll($timeout_ms)
120              
121             poll for messages or events. If any message or event received, returns
122             L object. If C<<$msg->err>> for returned object is zero
123             (RD_KAFKA_RESP_ERR_NO_ERROR), then it is a proper message, otherwise it is an
124             event or an error.
125              
126             =head2 commit
127              
128             $err = $kafka->commit(\@tplist, $async)
129              
130             commit offsets to the broker. C<@tplist> is an array of hashes
131             with the following keys: C, C, C, C. If
132             @topic_partition_list is missing or undef, then current partition assignment
133             is used instead. If C<$async> is 1, then method returns immediately, if it is
134             0 or missing then method blocks until offsets are committed.
135              
136             =head2 commit_message
137              
138             $err = $kafka->commit_message($msg, $async)
139              
140             commit message's offset for the message's partition. C<$async> same as for
141             L.
142              
143             =head2 committed
144              
145             $tplist = $kafka->committed(\@tplist, $timeout_ms)
146              
147             retrieve committed offsets for topics and partitions specified in C<@tplist>,
148             which is an array of hashes with C and C fields. Returned
149             C<$tplist> contains a copy of the input list with added C fields.
150              
151             =head2 position
152              
153             $tplist = $kafka->position(\@tplist)
154              
155             retrieve current offsets for topics and partitions specified in C<@tplist>,
156             which is an array of hashes with C and C fields. Returned
157             C<$tplist> contains a copy of the input list with added C fields.
158              
159             =head2 consumer_close
160              
161             $err = $kafka->consumer_close
162              
163             close down the consumer
164              
165             =head2 topic
166              
167             $topic = $kafka->topic($name, \%config)
168              
169             Return a L object, that can be used to produce
170             messages.
171              
172             If an error occurs during creation of the topic, C is returned. In such
173             case use L to obtain the corresponding error
174             code!
175              
176             =head2 outq_len
177              
178             $len = $kafka->outq_len
179              
180             return the current out queue length.
181              
182             =head2 flush
183              
184             $kafka->flush($timeout_ms)
185              
186             wait until all outstanding produce requests, et.al, are completed.
187              
188             =head2 destroy
189              
190             $kafka->destroy
191              
192             destroy kafka handle
193              
194             =head2 dump
195              
196             $kafka->dump
197              
198             dump internal state of kafka handle to stdout, only useful for debugging
199              
200             =head1 Kafka::Librd::Topic
201              
202             This class maps to C structure from librdkafka and represents
203             topic. It should be created with L method of Kafka::Librd object. It
204             provides the following method:
205              
206             =head2 produce
207              
208             $status = $topic->produce($partition, $msgflags, $payload, $key)
209              
210             produce a message for the topic. I<$msgflags> can be RD_KAFKA_MSG_F_BLOCK in
211             the future, but currently it should be set to 0, RD_KAFKA_MSG_F_COPY and
212             RD_KAFKA_MSG_F_FREE must not be used, internally RD_KAFKA_MSG_F_COPY is always
213             set.
214              
215             The returned status is -1 in case of an error, otherwise 0. The error code can
216             be retrieved using the L function.
217              
218             =head2 destroy
219              
220             $topic->destroy
221              
222             destroy topic handle
223              
224             =head1 Kafka::Librd::Message
225              
226             This class maps to C structure from librdkafka and
227             represents message or event. Objects of this class have the following methods:
228              
229             =head2 err
230              
231             return error code from the message
232              
233             =head2 topic
234              
235             return topic name
236              
237             =head2 partition
238              
239             return partition number
240              
241             =head2 offset
242              
243             return offset. Note, that the value is truncated to 32 bit if your perl doesn't
244             support 64 bit integers.
245              
246             =head2 key
247              
248             return message key
249              
250             =head2 payload
251              
252             return message payload
253              
254             =head2 timestamp(\$tstype)
255              
256             return message timestamp (milliseconds since UNIX epoch)
257              
258             The $tstype argument is optional, and if present, it should be a
259             scalar reference. It will be filled with one of the following values:
260              
261             =over
262              
263             =item
264              
265             C
266              
267             =item
268              
269             C
270              
271             =item
272              
273             C
274              
275             =back
276              
277             =head1 Kafka::Librd::Error
278              
279             =head2 Kafka::Librd::Error::to_string
280              
281             my $error_message = Kafka::Librd::Error::to_string($err)
282              
283             Convert an error code into a human-readable error description. Use this for
284             error codes returned by L and
285             L.
286              
287             =head2 Kafka::Librd::Error::last_error
288              
289             my $err = Kafka::Librd::Error::last_error
290              
291             Retrieve the last error state set by function calls L and L.
292             This function should be called immediately after those functions, since they
293             store error information globally.
294              
295             =cut
296              
297             package Kafka::Librd;
298             package Kafka::Librd::Topic;
299             package Kafka::Librd::Message;
300             package Kafka::Librd::Error;
301              
302             1;
303              
304             __END__