File Coverage

lib/Kafka/Consumer.pm
Criterion Covered Total %
statement 97 135 71.8
branch 44 82 53.6
condition 70 161 43.4
subroutine 16 22 72.7
pod 9 9 100.0
total 236 409 57.7


line stmt bran cond sub pod time code
1             package Kafka::Consumer;
2              
3             =head1 NAME
4              
5             Kafka::Consumer - Perl interface for Kafka consumer client.
6              
7             =head1 VERSION
8              
9             This documentation refers to C<Kafka::Consumer> version 1.08 .
10              
11             =cut
12              
13 6     6   4326 use 5.010;
  6         37  
14 6     6   31 use strict;
  6         10  
  6         122  
15 6     6   25 use warnings;
  6         38  
  6         271  
16              
17             our $VERSION = 'v1.08';
18              
19 6     6   41 use Carp;
  6         23  
  6         434  
20 6         396 use Params::Util qw(
21             _INSTANCE
22             _NONNEGINT
23             _NUMBER
24             _POSINT
25             _STRING
26 6     6   39 );
  6         8  
27 6         379 use Scalar::Util::Numeric qw(
28             isint
29 6     6   35 );
  6         19  
30              
31 6         1079 use Kafka qw(
32             $BITS64
33             $DEFAULT_MAX_BYTES
34             $DEFAULT_MAX_NUMBER_OF_OFFSETS
35             $DEFAULT_MAX_WAIT_TIME
36             %ERROR
37             $ERROR_CANNOT_GET_METADATA
38             $ERROR_METADATA_ATTRIBUTES
39             $ERROR_MISMATCH_ARGUMENT
40             $ERROR_NOT_BINARY_STRING
41             $ERROR_PARTITION_DOES_NOT_MATCH
42             $ERROR_TOPIC_DOES_NOT_MATCH
43             $MESSAGE_SIZE_OVERHEAD
44             $MIN_BYTES_RESPOND_IMMEDIATELY
45             $RECEIVE_LATEST_OFFSETS
46             $RECEIVE_EARLIEST_OFFSET
47 6     6   46 );
  6         12  
48 6     6   50 use Kafka::Exceptions;
  6         12  
  6         297  
49 6         740 use Kafka::Internals qw(
50             $APIKEY_FETCH
51             $APIKEY_OFFSET
52             $APIKEY_OFFSETCOMMIT
53             $APIKEY_OFFSETFETCH
54             $MAX_INT32
55             _get_CorrelationId
56             _isbig
57             format_message
58 6     6   34 );
  6         11  
59 6     6   41 use Kafka::Connection;
  6         12  
  6         460  
60 6     6   1905 use Kafka::Message;
  6         13  
  6         12466  
61              
62             if ( !$BITS64 ) { eval 'use Kafka::Int64; 1;' or die "Cannot load Kafka::Int64 : $@"; } ## no critic
63              
64             =head1 SYNOPSIS
65              
66             use 5.010;
67             use strict;
68             use warnings;
69              
70             use Scalar::Util qw(
71             blessed
72             );
73             use Try::Tiny;
74              
75             use Kafka qw(
76             $DEFAULT_MAX_BYTES
77             $DEFAULT_MAX_NUMBER_OF_OFFSETS
78             $RECEIVE_EARLIEST_OFFSET
79             );
80             use Kafka::Connection;
81             use Kafka::Consumer;
82              
83             my ( $connection, $consumer );
84             try {
85              
86             #-- Connection
87             $connection = Kafka::Connection->new( host => 'localhost' );
88              
89             #-- Consumer
90             $consumer = Kafka::Consumer->new( Connection => $connection );
91              
92             # Get a valid offset before the given time
93             my $offsets = $consumer->offset_before_time(
94             'mytopic', # topic
95             0, # partition
96             (time()-3600) * 1000, # time
97             );
98              
99             if ( @$offsets ) {
100             say "Received offset: $_" foreach @$offsets;
101             } else {
102             warn "Error: Offsets are not received\n";
103             }
104              
105             # Consuming messages
106             my $messages = $consumer->fetch(
107             'mytopic', # topic
108             0, # partition
109             0, # offset
110             $DEFAULT_MAX_BYTES # Maximum size of MESSAGE(s) to receive
111             );
112              
113             if ( $messages ) {
114             foreach my $message ( @$messages ) {
115             if ( $message->valid ) {
116             say 'payload : ', $message->payload;
117             say 'key : ', $message->key;
118             say 'offset : ', $message->offset;
119             say 'next_offset: ', $message->next_offset;
120             } else {
121             say 'error : ', $message->error;
122             }
123             }
124             }
125              
126             } catch {
127             my $error = $_;
128             if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
129             warn 'Error: (', $error->code, ') ', $error->message, "\n";
130             exit;
131             } else {
132             die $error;
133             }
134             };
135              
136             # Closes the consumer and cleans up
137             undef $consumer;
138             $connection->close;
139             undef $connection;
140              
141             =head1 DESCRIPTION
142              
143             Kafka consumer API is implemented by C<Kafka::Consumer> class.
144              
145             The main features of the C<Kafka::Consumer> class are:
146              
147             =over 3
148              
149             =item *
150              
151             Provides an object-oriented API for consuming messages.
152              
153             =item *
154              
155             Provides Kafka FETCH and OFFSETS requests.
156              
157             =item *
158              
159             Supports parsing the Apache Kafka 0.9+ Wire Format protocol.
160              
161             =item *
162              
163             Works with 64-bit elements of the Kafka Wire Format protocol
164             on 32 bit systems.
165              
166             =back
167              
168             The Kafka consumer response returns ARRAY references for C<offsets> and
169             C<fetch> methods.
170              
171             Array returned by C<offsets> contains offset integers.
172              
173             Array returned by C<fetch> contains objects of L<Kafka::Message|Kafka::Message> class.
174              
175             =cut
176              
177             #-- constructor ----------------------------------------------------------------
178              
179             =head2 CONSTRUCTOR
180              
181             =head3 C<new>
182              
183             Creates a new consumer client object. Returns the created C<Kafka::Consumer> object.
184              
185             C<new()> takes arguments in key-value pairs. The following arguments are recognized:
186              
187             =over 3
188              
189             =item C<Connection =E<gt> $connection>
190              
191             C<$connection> is the L<Kafka::Connection|Kafka::Connection> object responsible for communication with
192             the Apache Kafka cluster.
193              
194             =item C<ClientId =E<gt> $client_id>
195              
196             This is a user supplied identifier (string) for the client application.
197              
198             C<ClientId> will be auto-assigned if not passed in when creating the L<Kafka::Consumer|Kafka::Consumer> object.
199              
200             =item C<MaxWaitTime =E<gt> $max_time>
201              
202             The maximum amount of time (seconds, may be fractional) to wait when no sufficient data is available at the time the
203             request was issued.
204              
205             Optional, default is C<$DEFAULT_MAX_WAIT_TIME>.
206              
207             C<$DEFAULT_MAX_WAIT_TIME> is the default time that can be imported from the
208             L<Kafka|Kafka> module.
209              
210             The C<$max_time> must be a positive number.
211              
212             =item C<MinBytes =E<gt> $min_bytes>
213              
214             The minimum number of bytes of messages that must be available to give a response.
215             If the client sets this to C<$MIN_BYTES_RESPOND_IMMEDIATELY> the server will always respond
216             immediately. If it is set to C<$MIN_BYTES_RESPOND_HAS_DATA>, the server will respond as soon
217             as at least one partition has at least 1 byte of data or the specified timeout occurs.
218             Setting higher values in combination with the bigger timeouts allows reading larger chunks of data.
219              
220             Optional, int32 signed integer, default is C<$MIN_BYTES_RESPOND_IMMEDIATELY>.
221              
222             C<$MIN_BYTES_RESPOND_IMMEDIATELY>, C<$MIN_BYTES_RESPOND_HAS_DATA> are the defaults that
223             can be imported from the L<Kafka|Kafka> module.
224              
225             The C<$min_bytes> must be a non-negative int32 signed integer.
226              
227             =item C<MaxBytes =E<gt> $max_bytes>
228              
229             The maximum bytes to include in the message set for this partition.
230              
231             Optional, int32 signed integer, default = C<$DEFAULT_MAX_BYTES> (1_000_000).
232              
233             The C<$max_bytes> must be more than C<$MESSAGE_SIZE_OVERHEAD>
234             (size of protocol overhead - data added by Kafka wire protocol to each message).
235              
236             C<$DEFAULT_MAX_BYTES>, C<$MESSAGE_SIZE_OVERHEAD>
237             are the defaults that can be imported from the L<Kafka|Kafka> module.
238              
239             =item C<MaxNumberOfOffsets =E<gt> $max_number>
240              
241             Limit the number of offsets returned by Kafka.
242              
243             That is a non-negative integer.
244              
245             Optional, int32 signed integer, default = C<$DEFAULT_MAX_NUMBER_OF_OFFSETS> (100).
246              
247             C<$DEFAULT_MAX_NUMBER_OF_OFFSETS>
248             is the default that can be imported from the L<Kafka|Kafka> module.
249              
250             =back
251              
252             =cut
253             sub new {
254 179     179 1 230895 my ( $class, %params ) = @_;
255              
256 179         814 my $self = bless {
257             Connection => undef,
258             ClientId => undef,
259             MaxWaitTime => $DEFAULT_MAX_WAIT_TIME,
260             MinBytes => $MIN_BYTES_RESPOND_IMMEDIATELY,
261             MaxBytes => $DEFAULT_MAX_BYTES,
262             MaxNumberOfOffsets => $DEFAULT_MAX_NUMBER_OF_OFFSETS,
263             ApiVersion => undef, # undef - allows consumer to choose newest supported
264             }, $class;
265              
266 179         510 foreach my $p ( keys %params ) {
267 594 50       884 if( exists $self->{ $p } ) {
268 594         800 $self->{ $p } = $params{ $p };
269             }
270             else {
271 0         0 $self->_error( $ERROR_MISMATCH_ARGUMENT, $p );
272             }
273             }
274              
275 179   100     791 $self->{ClientId} //= 'consumer';
276              
277             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'Connection' )
278 179 100       1238 unless _INSTANCE( $self->{Connection}, 'Kafka::Connection' );
279             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'ClientId' )
280 159 100 66     704 unless ( $self->{ClientId} eq q{} || defined( _STRING( $self->{ClientId} ) ) );
281             $self->_error( $ERROR_NOT_BINARY_STRING, 'ClientId' )
282 152 100       349 if utf8::is_utf8( $self->{ClientId} );
283             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'MaxWaitTime (%s)', $self->{MaxWaitTime} ) )
284 151 100 100     1059 unless defined( $self->{MaxWaitTime} ) && defined _NUMBER( $self->{MaxWaitTime} ) && int( $self->{MaxWaitTime} * 1000 ) >= 1 && int( $self->{MaxWaitTime} * 1000 ) <= $MAX_INT32;
      100        
      66        
285             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'MinBytes (%s)', $self->{MinBytes} ) )
286 139 50 66     428 unless ( _isbig( $self->{MinBytes} ) ? ( $self->{MinBytes} >= 0 ) : defined( _NONNEGINT( $self->{MinBytes} ) ) ) && $self->{MinBytes} <= $MAX_INT32;
    100          
287             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'MaxBytes (%s)', $self->{MaxBytes} ) )
288 126 50 100     1419 unless ( _isbig( $self->{MaxBytes} ) ? ( $self->{MaxBytes} > 0 ) : _POSINT( $self->{MaxBytes} ) ) && $self->{MaxBytes} >= $MESSAGE_SIZE_OVERHEAD && $self->{MaxBytes} <= $MAX_INT32;
    100 66        
289             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'MaxNumberOfOffsets (%s)', $self->{MaxNumberOfOffsets} ) )
290 110 100 66     2788 unless defined( _POSINT( $self->{MaxNumberOfOffsets} ) ) && $self->{MaxNumberOfOffsets} <= $MAX_INT32;
291              
292 95         1043 return $self;
293             }
294              
295             #-- public attributes ----------------------------------------------------------
296              
297             =head2 METHODS
298              
299             The following methods are defined for the C<Kafka::Consumer> class:
300              
301             =cut
302              
303             #-- public methods -------------------------------------------------------------
304              
305             =head3 C<fetch( $topic, $partition, $start_offset, $max_size )>
306              
307             Get a list of messages to consume one by one up to C<$max_size> bytes.
308              
309             Returns the reference to array of the L<Kafka::Message|Kafka::Message> objects.
310              
311             C<fetch()> takes the following arguments:
312              
313             =over 3
314              
315             =item C<$topic>
316              
317             The C<$topic> must be a normal non-false string of non-zero length.
318              
319             =item C<$partition>
320              
321             The C<$partition> must be a non-negative integer.
322              
323             =item C<$start_offset>
324              
325             Offset in topic and partition to start from (64-bit integer).
326              
327             The argument must be a non-negative integer. The argument may be a
328             L<Math::BigInt|Math::BigInt> integer on 32-bit system.
329              
330             =item C<$max_size>
331              
332             C<$max_size> is the maximum size of the messages set to return. The argument
333             must be a positive int32 signed integer.
334              
335             The maximum size of a request limited by C<MAX_SOCKET_REQUEST_BYTES> that
336             can be imported from L<Kafka|Kafka> module.
337              
338             =back
339              
340             =cut
341             sub fetch {
342 5055     5055 1 140350 my ( $self, $topic, $partition, $start_offset, $max_size, $_return_all, $api_version ) = @_;
343             # Special argument: $_return_all - return redundant messages sent out of a compressed package posts
344              
345 5055 100 66     22159 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      66        
346             unless defined( $topic ) && ( $topic eq q{} || defined( _STRING( $topic ) ) );
347 5048 100       9133 $self->_error( $ERROR_NOT_BINARY_STRING, 'topic' )
348             if utf8::is_utf8( $topic );
349 5047 100 66     21777 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' )
      66        
350             unless defined( $partition ) && isint( $partition ) && $partition >= 0;
351 5037 100 66     11714 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'offset' )
      100        
352             unless defined( $start_offset ) && ( ( _isbig( $start_offset ) && $start_offset >= 0 ) || defined( _NONNEGINT( $start_offset ) ) );
353 5024 100 66     41879 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'max_size (%s)', $max_size ) )
      100        
      66        
      66        
354             unless ( !defined( $max_size ) || ( ( _isbig( $max_size ) || _POSINT( $max_size ) ) && $max_size >= $MESSAGE_SIZE_OVERHEAD && $max_size <= $MAX_INT32 ) );
355              
356             my $request = {
357             ApiKey => $APIKEY_FETCH,
358             ApiVersion => $api_version // $self->{ApiVersion},
359             CorrelationId => _get_CorrelationId(),
360             ClientId => $self->{ClientId},
361             MaxWaitTime => int( $self->{MaxWaitTime} * 1000 ),
362             MinBytes => $self->{MinBytes},
363             MaxBytes => $max_size // $self->{MaxBytes},
364             topics => [
365             {
366             TopicName => $topic,
367             partitions => [
368             {
369             Partition => $partition,
370             FetchOffset => $start_offset,
371             MaxBytes => $max_size // $self->{MaxBytes},
372             },
373 5011   33     48123 ],
      66        
      66        
374             },
375             ],
376             };
377              
378 5011         13698 my $response = $self->{Connection}->receive_response_to_request( $request, undef, $self->{MaxWaitTime} );
379              
380 5009         7155 my $messages = [];
381 5009         4517 foreach my $received_topic ( @{ $response->{topics} } ) {
  5009         7611  
382             $received_topic->{TopicName} eq $topic
383 5009 50       7868 or $self->_error( $ERROR_TOPIC_DOES_NOT_MATCH, format_message( "'%s' ne '%s'", $topic, $received_topic->{TopicName} ) );
384 5009         4456 foreach my $received_partition ( @{ $received_topic->{partitions} } ) {
  5009         5177  
385             $received_partition->{Partition} == $partition
386 5009 50       7085 or $self->_error( $ERROR_PARTITION_DOES_NOT_MATCH, format_message( '%s != %s', $partition, $received_partition->{Partition} ) );
387 5009         4553 my $HighwaterMarkOffset = $received_partition->{HighwaterMarkOffset};
388 5009         3866 foreach my $Message ( @{ $received_partition->{MessageSet} } ) {
  5009         5445  
389 15018         15652 my $offset = $Message->{Offset};
390 15018         11511 my $next_offset;
391 15018 50       16271 if ( $BITS64 ) {
392 15018         13166 $next_offset += $offset + 1;
393             } else {
394 0         0 $offset = Kafka::Int64::intsum( $offset, 0 );
395 0         0 $next_offset = Kafka::Int64::intsum( $offset, 1 );
396             }
397              
398             # skip previous messages of a compressed package posts
399 15018 50 33     20290 next if $offset < $start_offset && !$_return_all;
400              
401 15018         12620 my $message_error = q{};
402             # According to Apache Kafka documentation:
403             # This byte holds metadata attributes about the message. The
404             # lowest 3 bits contain the compression codec used for the
405             # message. The fourth lowest bit represents the timestamp type.
406             # 0 stands for CreateTime and 1 stands for LogAppendTime. The
407             # producer should always set this bit to 0. (since 0.10.0).
408             # All other bits should be set to 0.
409 15018         13045 my $attributes = $Message->{Attributes};
410             # check that attributes is valid
411             $attributes & 0b11110000
412 15018 50       18411 and $message_error = $ERROR{ $ERROR_METADATA_ATTRIBUTES };
413              
414 15018 50       16775 if (my $compression_codec = $attributes & 0b00000111) {
415 0 0 0     0 unless ( $compression_codec == 1 # GZIP
416             || $compression_codec == 2 # Snappy
417             ) {
418 0         0 $message_error = $ERROR{ $ERROR_METADATA_ATTRIBUTES };
419             }
420             }
421              
422             push( @$messages, Kafka::Message->new( {
423             Attributes => $Message->{Attributes},
424             Timestamp => $Message->{Timestamp},
425             MagicByte => $Message->{MagicByte},
426             key => $Message->{Key},
427             payload => $Message->{Value},
428 15018         73794 offset => $offset,
429             next_offset => $next_offset,
430             error => $message_error,
431             valid => !$message_error,
432             HighwaterMarkOffset => $HighwaterMarkOffset,
433             } )
434             );
435             }
436             }
437             }
438              
439 5009         29998 return $messages;
440             }
441              
442             =head3 C<offset_at_time( $topic, $partition, $time )>
443              
444             Returns an offset, given a topic, partition and time.
445              
446             The returned offset is the earliest offset whose timestamp is greater than or
447             equal to the given timestamp. The return value is a HashRef, containing
448             C<timestamp> and C<offset> keys.
449              
450             B<WARNING>: this method requires Kafka 0.10.0, and messages with timestamps.
451             Check the configuration of the brokers or topic, specifically
452             C<message.timestamp.type>, and set it either to C<LogAppendTime> to have Kafka
453             automatically set messages timestamps based on the broker clock, or
454             C<CreateTime>, in which case the client populating your topic has to set the
455             timestamps when producing messages.
456              
457             C<offset_at_time()> takes the following arguments:
458              
459             =over 3
460              
461             =item C<$topic>
462              
463             The C<$topic> must be a normal non-false string of non-zero length.
464              
465             =item C<$partition>
466              
467             The C<$partition> must be a non-negative integer.
468              
469             =item C<$time>
470              
471             Get offsets before the given time (in milliseconds since UNIX Epoch).
472              
473             The argument must be a non-negative integer.
474              
475             The argument may be a L<Math::BigInt|Math::BigInt> integer on 32 bit
476             system.
477              
478             =back
479              
480             =cut
481              
482             sub offset_at_time {
483 0     0 1 0 my ( $self, $topic, $partition, $time ) = @_;
484              
485             # we don't accept special values for $time, we want a real timestamp
486 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'time' )
      0        
      0        
487             unless defined( $time ) && ( _isbig( $time ) || isint( $time ) ) && $time >= 0;
488              
489             # no max_number, api version = 1
490 0         0 return $self->_query_offsets($topic, $partition, $time, undef, 1)->[0];
491             }
492              
493             =head3 C<offset_before_time( $topic, $partition, $time )>
494              
495             Returns an offset, given a topic, partition and time.
496              
497             The returned offset is an offset whose timestamp is guaranteed to be earlier
498             than the given timestamp. The return value is a Number.
499              
500             This method works with all versions of Kafka, and doesn't require messages with
501             timestamps.
502              
503             C<offset_before_time()> takes the following arguments:
504              
505             =over 3
506              
507             =item C<$topic>
508              
509             The C<$topic> must be a normal non-false string of non-zero length.
510              
511             =item C<$partition>
512              
513             The C<$partition> must be a non-negative integer.
514              
515             =item C<$time>
516              
517             Get offsets before the given time (in milliseconds since UNIX Epoch).
518              
519             The argument must be a non-negative integer.
520              
521             The argument may be a L<Math::BigInt|Math::BigInt> integer on 32 bit
522             system.
523              
524             =back
525              
526             =cut
527              
528             sub offset_before_time {
529 0     0 1 0 my ( $self, $topic, $partition, $time ) = @_;
530              
531             # we don't accept special values for $time, we want a real timestamp
532 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'time' )
      0        
      0        
533             unless defined( $time ) && ( _isbig( $time ) || isint( $time ) ) && $time >= 0;
534             # $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'max_number (%s)', $max_number ) )
535             # unless !defined( $max_number ) || ( _POSINT( $max_number ) && $max_number <= $MAX_INT32 );
536              
537             # max_number = 1, api version = 0
538 0         0 return $self->_query_offsets($topic, $partition, $time, 1, 0)->[0];
539             }
540              
541             =head3 C<offset_earliest( $topic, $partition )>
542              
543             Returns the earliest offset for a given topic and partition
544              
545             C<offset_earliest()> takes the following arguments:
546              
547             =over 3
548              
549             =item C<$topic>
550              
551             The C<$topic> must be a normal non-false string of non-zero length.
552              
553             =item C<$partition>
554              
555             The C<$partition> must be a non-negative integer.
556              
557             =back
558              
559             =cut
560              
561             sub offset_earliest {
562 0     0 1 0 my ( $self, $topic, $partition ) = @_;
563              
564             # max_number = 1, api version = 0
565 0         0 return $self->_query_offsets($topic, $partition, $RECEIVE_EARLIEST_OFFSET, 1, 0)->[0];
566             }
567              
568             =head3 C<offset_latest( $topic, $partition )>
569              
570             Returns the latest offset for a given topic and partition
571              
572             C<offset_latest()> takes the following arguments:
573              
574             =over 3
575              
576             =item C<$topic>
577              
578             The C<$topic> must be a normal non-false string of non-zero length.
579              
580             =item C<$partition>
581              
582             The C<$partition> must be a non-negative integer.
583              
584             =back
585              
586             =cut
587              
588             sub offset_latest {
589 0     0 1 0 my ( $self, $topic, $partition ) = @_;
590              
591             # max_number = 1, api version = 0
592 0         0 return $self->_query_offsets($topic, $partition, $RECEIVE_LATEST_OFFSETS, 1, 0)->[0];
593             }
594              
595             =head3 C<offsets( $topic, $partition, $time, $max_number )>
596              
597             B<WARNING>: This method is DEPRECATED, please use one of C<offset_at_time()>, C<offset_before_time()>, C<offset_earliest()>, C<offset_latest()>. It is kept for backward compatibility.
598              
599             Returns an ArrayRef of offsets
600              
601             C<offsets()> takes the following arguments:
602              
603             =over 3
604              
605             =item C<$topic>
606              
607             The C<$topic> must be a normal non-false string of non-zero length.
608              
609             =item C<$partition>
610              
611             The C<$partition> must be a non-negative integer.
612              
613             =item C<$time>
614              
615             Get offsets before the given time (in milliseconds since UNIX Epoch). It must
616             be a positive number. It may be a L<Math::BigInt|Math::BigInt> integer on 32
617             bit system.
618              
619             The special values C<$RECEIVE_LATEST_OFFSETS> (-1), C<$RECEIVE_EARLIEST_OFFSET>
620             (-2) are allowed. They can be imported from the L<Kafka|Kafka> module.
621              
622             =item C<$max_number>
623              
624             Maximum number of offsets to be returned
625              
626             =back
627              
628             =cut
629              
630             sub offsets {
631 63     63 1 2745309 my ( $self, $topic, $partition, $time, $max_number ) = @_;
632              
633 63 100 66     233 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'time' )
      66        
      100        
634             unless defined( $time ) && ( _isbig( $time ) || isint( $time ) ) && $time >= $RECEIVE_EARLIEST_OFFSET;
635 52 100 66     1288 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'max_number (%s)', $max_number ) )
      66        
636             unless !defined( $max_number ) || ( _POSINT( $max_number ) && $max_number <= $MAX_INT32 );
637              
638 38         414 return $self->_query_offsets($topic, $partition, $time, $max_number, 0);
639             }
640              
641             sub _query_offsets {
642 38     38   103 my ( $self, $topic, $partition, $time, $max_number, $api_version ) = @_;
643              
644 38 100 66     217 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      66        
645             unless defined( $topic) && ( $topic eq q{} || defined( _STRING( $topic ) ) );
646 31 100       88 $self->_error( $ERROR_NOT_BINARY_STRING, 'topic' )
647             if utf8::is_utf8( $topic );
648 30 100 66     192 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' )
      66        
649             unless defined( $partition ) && isint( $partition ) && $partition >= 0;
650              
651 20         34 my $is_v1 = $api_version == 1;
652              
653             my $request = {
654             ApiKey => $APIKEY_OFFSET,
655             ApiVersion => $api_version,
656             CorrelationId => _get_CorrelationId(),
657             ClientId => $self->{ClientId},
658             topics => [
659             {
660             TopicName => $topic,
661             partitions => [
662             {
663             Partition => $partition,
664             Time => $time,
665             MaxNumberOfOffsets => $max_number // $self->{MaxNumberOfOffsets},
666             },
667 20   66     78 ],
668             },
669             ],
670             };
671              
672 20         88 my $response = $self->{Connection}->receive_response_to_request( $request );
673              
674 18         32 my $offsets = [];
675             # because we accepted only one topic and partition, we are sure that the
676             # response is all about this single topic and partition, so we can merge
677             # the offsets.
678 18 50       37 if ($is_v1) {
679 0         0 foreach my $received_topic ( @{ $response->{topics} } ) {
  0         0  
680 0         0 foreach my $partition_offsets ( @{ $received_topic->{PartitionOffsets} } ) {
  0         0  
681             push @$offsets, { timestamp => $partition_offsets->{Timestamp},
682 0         0 offset => $partition_offsets->{Offset} };
683             }
684             }
685             } else {
686 18         24 foreach my $received_topic ( @{ $response->{topics} } ) {
  18         37  
687 18         22 foreach my $partition_offsets ( @{ $received_topic->{PartitionOffsets} } ) {
  18         26  
688 18         18 push @$offsets, @{ $partition_offsets->{Offset} };
  18         35  
689             }
690             }
691             }
692              
693 18         93 return $offsets;
694             }
695              
696             =head3 C<commit_offsets( $topic, $partition, $offset, $group )>
697              
698             Commit offsets using the offset commit/fetch API.
699              
700             Returns a non-blank value (a reference to a hash with server response description)
701             if the message is successfully sent.
702              
703             C<commit_offsets()> takes the following arguments:
704              
705             =over 3
706              
707             =item C<$topic>
708              
709             The C<$topic> must be a normal non-false string of non-zero length.
710              
711             =item C<$partition>
712              
713             The C<$partition> must be a non-negative integer.
714              
715             =item C<$offset>
716              
717             Offset in topic and partition to commit.
718              
719             The argument must be a non-negative integer.
720              
721             The argument may be a L<Math::BigInt|Math::BigInt> integer on 32 bit
722             system.
723              
724             =item C<$group>
725              
726             The name of the consumer group
727              
728             The argument must be a normal non-false string of non-zero length.
729              
730             =back
731              
732             =cut
733             sub commit_offsets {
734 0     0 1 0 my ( $self, $topic, $partition, $offset, $group ) = @_;
                # Purpose: validate the arguments, build a single-topic /
                # single-partition request tagged with $APIKEY_OFFSETCOMMIT and
                # return whatever the connection's receive_response_to_request()
                # returns for it.
                #
                # Argument checks (each failure calls $self->_error with the
                # listed error code and the argument name):
                #   topic, group - must be defined and either the empty string or
                #                  accepted by _STRING ($ERROR_MISMATCH_ARGUMENT);
                #                  must not carry the UTF-8 flag
                #                  ($ERROR_NOT_BINARY_STRING).
                #   partition    - defined, isint() and >= 0.
                #   offset       - defined and either an _isbig() value >= 0 or
                #                  accepted by _NONNEGINT.
                #
                # NOTE(review): the empty string passes the topic/group checks,
                # while the POD asks for a string of non-zero length - confirm
                # which one is intended.
                # NOTE(review): _isbig() is defined elsewhere; presumably it
                # detects Math::BigInt objects (see the POD) - confirm.
735              
736              
737 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      0        
738             unless defined( $topic ) && ( $topic eq q{} || defined( _STRING( $topic ) ) );
739 0 0       0 $self->_error( $ERROR_NOT_BINARY_STRING, 'topic' )
740             if utf8::is_utf8( $topic );
741 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' )
      0        
742             unless defined( $partition ) && isint( $partition ) && $partition >= 0;
743 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'offset' )
      0        
744             unless defined( $offset ) && ( ( _isbig( $offset ) && $offset >= 0 ) || defined( _NONNEGINT( $offset ) ) );
745 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'group' )
      0        
746             unless defined( $group ) && ( $group eq q{} || defined( _STRING( $group ) ) );
747 0 0       0 $self->_error( $ERROR_NOT_BINARY_STRING, 'group' )
748             if utf8::is_utf8( $group );
749              
                # The request carries exactly one topic with one partition entry.
                # Metadata is always sent as an empty string, and no ApiVersion
                # key is set in this hash.
                # NOTE(review): __send_to__ => 'group_coordinator' presumably
                # tells the connection layer which broker to address - confirm
                # against Kafka::Connection.
750             my $request = {
751             __send_to__ => 'group_coordinator',
752             ApiKey => $APIKEY_OFFSETCOMMIT,
753             CorrelationId => _get_CorrelationId(),
754             ClientId => $self->{ClientId},
755 0         0 GroupId => $group,
756             topics => [
757             {
758             TopicName => $topic,
759             partitions => [
760             {
761             Partition => $partition,
762             Offset => $offset,
763             Metadata => '',
764             },
765             ],
766             },
767             ],
768             };
769              
                # The response is handed back to the caller unmodified.
770 0         0 return $self->{Connection}->receive_response_to_request( $request );
771             }
772              
773             =head3 C<fetch_offsets( $topic, $partition, $group )>
774              
775             Fetch Committed offsets using the offset commit/fetch API.
776              
777             Returns a non-blank value (a reference to a hash with server response description)
778             if the message is successfully sent.
779              
780             C<fetch_offsets()> takes the following arguments:
781              
782             =over 3
783              
784             =item C<$topic>
785              
786             The C<$topic> must be a normal non-false string of non-zero length.
787              
788             =item C<$partition>
789              
790             The C<$partition> must be a non-negative integer.
791              
792             =item C<$group>
793              
794             The name of the consumer group
795              
796             The argument must be a normal non-false string of non-zero length.
797              
798             =back
799              
800             =cut
801             sub fetch_offsets {
802 0     0 1 0 my ( $self, $topic, $partition, $group ) = @_;
                # Purpose: validate the arguments, build a single-topic /
                # single-partition request tagged with $APIKEY_OFFSETFETCH and
                # return whatever the connection's receive_response_to_request()
                # returns for it.
                #
                # Argument checks (each failure calls $self->_error with the
                # listed error code and the argument name):
                #   topic, group - must be defined and either the empty string or
                #                  accepted by _STRING ($ERROR_MISMATCH_ARGUMENT);
                #                  must not carry the UTF-8 flag
                #                  ($ERROR_NOT_BINARY_STRING).
                #   partition    - defined, isint() and >= 0.
                #
                # NOTE(review): the empty string passes the topic/group checks,
                # while the POD asks for a string of non-zero length - confirm
                # which one is intended.
803              
804              
805 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      0        
806             unless defined( $topic ) && ( $topic eq q{} || defined( _STRING( $topic ) ) );
807 0 0       0 $self->_error( $ERROR_NOT_BINARY_STRING, 'topic' )
808             if utf8::is_utf8( $topic );
809 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' )
      0        
810             unless defined( $partition ) && isint( $partition ) && $partition >= 0;
811 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'group' )
      0        
812             unless defined( $group ) && ( $group eq q{} || defined( _STRING( $group ) ) );
813 0 0       0 $self->_error( $ERROR_NOT_BINARY_STRING, 'group' )
814             if utf8::is_utf8( $group );
815              
                # The request carries exactly one topic with one partition entry;
                # the partition entry holds only the partition number. No
                # ApiVersion key is set in this hash.
                # NOTE(review): __send_to__ => 'group_coordinator' presumably
                # tells the connection layer which broker to address - confirm
                # against Kafka::Connection.
816             my $request = {
817             __send_to__ => 'group_coordinator',
818             ApiKey => $APIKEY_OFFSETFETCH,
819             CorrelationId => _get_CorrelationId(),
820             ClientId => $self->{ClientId},
821 0         0 GroupId => $group,
822             topics => [
823             {
824             TopicName => $topic,
825             partitions => [
826             {
827             Partition => $partition,
828             },
829             ],
830             },
831             ],
832             };
833              
                # The response is handed back to the caller unmodified.
834 0         0 return $self->{Connection}->receive_response_to_request( $request );
835             }
836              
837             #-- private attributes ---------------------------------------------------------
838              
839             #-- private methods ------------------------------------------------------------
840              
841             # Error handler shared by the methods of this class.
                # Drops the invocant, passes the remaining arguments (as used by
                # the callers above: an error code followed by a description such
                # as the offending argument name) through throw_args() and throws
                # the result as a Kafka::Exception::Consumer exception.
                # NOTE(review): throw_args() is not defined in the visible part of
                # this file; presumably it is imported from Kafka::Exceptions -
                # confirm.
842             sub _error {
843 171     171   336 my $self = shift;
844              
845 171         429 Kafka::Exception::Consumer->throw( throw_args( @_ ) );
846              
                # This coverage run records zero executions of the return below,
                # i.e. control did not come back from throw() in any of the 171
                # recorded calls.
847 0           return;
848             }
849              
850              
851              
852             1;
853              
854             __END__
855              
856             =head1 DIAGNOSTICS
857              
858             When an error is detected, an exception represented by an object of the C<Kafka::Exception::Consumer> class
859             is thrown (see L<Kafka::Exceptions|Kafka::Exceptions>).
860              
861             L<code|Kafka::Exceptions/code> and a more descriptive L<message|Kafka::Exceptions/message> provide
862             information about the thrown exception. Consult the documentation of L<Kafka::Exceptions|Kafka::Exceptions>
863             for the list of all available methods.
864              
865             The authors suggest using L<Try::Tiny|Try::Tiny>'s C<try> and C<catch> to handle exceptions while
866             working with the L<Kafka|Kafka> package.
867              
868             =over 3
869              
870             =item C<Invalid argument>
871              
872             Invalid argument passed to a C<new> L<constructor|/CONSTRUCTOR> or other L<method|/METHODS>.
873              
874             =item C<Cannot send>
875              
876             Request cannot be sent.
877              
878             =item C<Cannot receive>
879              
880             Response cannot be received.
881              
882             =item C<Cannot bind>
883              
884             TCP connection can't be established on the given host and port.
885              
886             =item C<Cannot get metadata>
887              
888             Failed to obtain metadata from Kafka servers.
889              
890             =item C<Leader not found>
891              
892             Missing information about server-leader in metadata.
893              
894             =item C<Mismatch CorrelationId>
895              
896             C<CorrelationId> of response doesn't match one in request.
897              
898             =item C<There are no known brokers>
899              
900             Resulting metadata has no information about cluster brokers.
901              
902             =item C<Cannot get metadata>
903              
904             Received metadata has incorrect internal structure.
905              
906             =back
907              
908             =head1 SEE ALSO
909              
910             The basic operation of the Kafka package modules:
911              
912             L<Kafka|Kafka> - constants and messages used by the Kafka package modules.
913              
914             L<Kafka::Connection|Kafka::Connection> - interface to connect to a Kafka cluster.
915              
916             L<Kafka::Producer|Kafka::Producer> - interface for producing client.
917              
918             L<Kafka::Consumer|Kafka::Consumer> - interface for consuming client.
919              
920             L<Kafka::Message|Kafka::Message> - interface to access Kafka message
921             properties.
922              
923             L<Kafka::Int64|Kafka::Int64> - functions to work with 64 bit elements of the
924             protocol on 32 bit systems.
925              
926             L<Kafka::Protocol|Kafka::Protocol> - functions to process messages in the
927             Apache Kafka's Protocol.
928              
929             L<Kafka::IO|Kafka::IO> - low-level interface for communication with Kafka server.
930              
931             L<Kafka::Exceptions|Kafka::Exceptions> - module designated to handle Kafka exceptions.
932              
933             L<Kafka::Internals|Kafka::Internals> - internal constants and functions used
934             by several package modules.
935              
936             A wealth of detail about Apache Kafka and the Kafka Protocol:
937              
938             Main page at L<http://kafka.apache.org/>
939              
940             Kafka Protocol at L<https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol>
941              
942             =head1 SOURCE CODE
943              
944             Kafka package is hosted on GitHub:
945             L<https://github.com/TrackingSoft/Kafka>
946              
947             =head1 AUTHOR
948              
949             Sergey Gladkov
950              
951             Please use GitHub project link above to report problems or contact authors.
952              
953             =head1 CONTRIBUTORS
954              
955             Alexander Solovey
956              
957             Jeremy Jordan
958              
959             Sergiy Zuban
960              
961             Vlad Marchenko
962              
963             Damien Krotkine
964              
965             Greg Franklin
966              
967             =head1 COPYRIGHT AND LICENSE
968              
969             Copyright (C) 2012-2017 by TrackingSoft LLC.
970              
971             This package is free software; you can redistribute it and/or modify it under
972             the same terms as Perl itself. See I<perlartistic> at
973             L<http://dev.perl.org/licenses/artistic.html>.
974              
975             This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
976             without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
977             PARTICULAR PURPOSE.
978              
979             =cut
980