File Coverage

lib/Kafka/Producer.pm
Criterion Covered Total %
statement 76 91 83.5
branch 38 64 59.3
condition 48 82 58.5
subroutine 14 14 100.0
pod 2 2 100.0
total 178 253 70.3


line stmt bran cond sub pod time code
1             package Kafka::Producer;
2              
3             =head1 NAME
4              
5             Kafka::Producer - Perl interface for Kafka producer client.
6              
7             =head1 VERSION
8              
9             This documentation refers to C<Kafka::Producer> version 1.08 .
10              
11             =cut
12              
13              
14              
15 7     7   53208 use 5.010;
  7         24  
16 7     7   37 use strict;
  7         24  
  7         149  
17 7     7   32 use warnings;
  7         12  
  7         328  
18              
19              
20             our $VERSION = 'v1.08';
21              
22              
23 7     7   73 use Carp;
  7         25  
  7         479  
24 7         437 use Params::Util qw(
25             _ARRAY
26             _INSTANCE
27             _NONNEGINT
28             _NUMBER
29             _STRING
30             _POSINT
31 7     7   61 );
  7         27  
32 7         349 use Scalar::Util qw(
33             blessed
34 7     7   44 );
  7         12  
35 7         412 use Scalar::Util::Numeric qw(
36             isint
37 7     7   46 );
  7         10  
38              
39 7         1132 use Kafka qw(
40             %ERROR
41             $COMPRESSION_GZIP
42             $COMPRESSION_NONE
43             $COMPRESSION_SNAPPY
44             $COMPRESSION_LZ4
45             $ERROR_CANNOT_GET_METADATA
46             $ERROR_MISMATCH_ARGUMENT
47             $ERROR_NOT_BINARY_STRING
48             $REQUEST_TIMEOUT
49             $NOT_SEND_ANY_RESPONSE
50             $WAIT_WRITTEN_TO_LOCAL_LOG
51             $BLOCK_UNTIL_IS_COMMITTED
52 7     7   45 );
  7         13  
53 7     7   50 use Kafka::Connection;
  7         12  
  7         574  
54 7     7   48 use Kafka::Exceptions;
  7         19  
  7         344  
55 7         8516 use Kafka::Internals qw(
56             $APIKEY_PRODUCE
57             $MAX_CORRELATIONID
58             $MAX_INT16
59             $MAX_INT32
60             $PRODUCER_ANY_OFFSET
61             _get_CorrelationId
62             format_message
63 7     7   44 );
  7         14  
64              
65              
66              
67             =head1 SYNOPSIS
68              
69             use 5.010;
70             use strict;
71             use warnings;
72              
73             use Scalar::Util qw(
74             blessed
75             );
76             use Try::Tiny;
77              
78             use Kafka::Connection;
79             use Kafka::Producer;
80              
81             my ( $connection, $producer );
82             try {
83              
84             #-- Connection
85             $connection = Kafka::Connection->new( host => 'localhost' );
86              
87             #-- Producer
88             $producer = Kafka::Producer->new( Connection => $connection );
89              
90             # Sending a single message
91             my $response = $producer->send(
92             'mytopic', # topic
93             0, # partition
94             'Single message' # message
95             );
96              
97             # Sending a series of messages
98             $response = $producer->send(
99             'mytopic', # topic
100             0, # partition
101             [ # messages
102             'The first message',
103             'The second message',
104             'The third message',
105             ]
106             );
107              
108             } catch {
109             my $error = $_;
110             if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
111             warn 'Error: (', $error->code, ') ', $error->message, "\n";
112             exit;
113             } else {
114             die $error;
115             }
116             };
117              
118             # Closes the producer and cleans up
119             undef $producer;
120             $connection->close;
121             undef $connection;
122              
123             =head1 DESCRIPTION
124              
125             Kafka producer API is implemented by C<Kafka::Producer> class.
126              
127             The main features of the C<Kafka::Producer> class are:
128              
129             =over 3
130              
131             =item *
132              
133             Provides object-oriented API for producing messages.
134              
135             =item *
136              
137             Provides Kafka PRODUCE requests.
138              
139             =back
140              
141             =cut
142              
143             my %known_compression_codecs = map { $_ => 1 } (
144             $COMPRESSION_NONE,
145             $COMPRESSION_GZIP,
146             $COMPRESSION_SNAPPY,
147             $COMPRESSION_LZ4,
148             );
149              
150             #-- constructor ----------------------------------------------------------------
151              
152             =head2 CONSTRUCTOR
153              
154             =head3 C<new>
155              
156             Creates new producer client object.
157              
158             C<new()> takes arguments in key-value pairs. The following arguments are currently recognized:
159              
160             =over 3
161              
162             =item C<Connection =E<gt> $connection>
163              
164             C<$connection> is the L<Kafka::Connection|Kafka::Connection> object responsible for communication with
165             the Apache Kafka cluster.
166              
167             =item C<ClientId =E<gt> $client_id>
168              
169             This is a user supplied identifier (string) for the client application.
170              
171             If C<ClientId> is not passed to constructor, its value will be automatically assigned
172             (to string C<'producer'>).
173              
174             =item C<RequiredAcks =E<gt> $acks>
175              
176             The C<$acks> should be an int16 signed integer.
177              
178             Indicates how many acknowledgements the servers should receive before responding to the request.
179              
180             If it is C<$NOT_SEND_ANY_RESPONSE> the server does not send any response.
181              
182             If it is C<$WAIT_WRITTEN_TO_LOCAL_LOG>, (default)
183             the server will wait until the data is written to the local log before sending a response.
184              
185             If it is C<$BLOCK_UNTIL_IS_COMMITTED>
186             the server will block until the message is committed by all in sync replicas before sending a response.
187              
188             C<$NOT_SEND_ANY_RESPONSE>, C<$WAIT_WRITTEN_TO_LOCAL_LOG>, C<$BLOCK_UNTIL_IS_COMMITTED>
189             can be imported from the L<Kafka|Kafka> module.
190              
191             =item C<Timeout =E<gt> $timeout>
192              
193             This provides a maximum time the server can await the receipt
194             of the number of acknowledgements in C<RequiredAcks>.
195              
196             The C<$timeout> in seconds, could be any integer or floating-point type not bigger than int32 positive integer.
197              
198             Optional, default = C<$REQUEST_TIMEOUT>.
199              
200             C<$REQUEST_TIMEOUT> is the default timeout that can be imported from the
201             L<Kafka|Kafka> module.
202              
203             =back
204              
205             =cut
206             sub new {
207 107     107 1 134951 my ( $class, %params ) = @_;
208              
209 107         408 my $self = bless {
210             Connection => undef,
211             ClientId => undef,
212             RequiredAcks => $WAIT_WRITTEN_TO_LOCAL_LOG,
213             Timeout => undef,
214             }, $class;
215              
216 107         321 foreach my $p ( keys %params ) {
217 249 50       431 if( exists $self->{ $p } ) {
218 249         402 $self->{ $p } = $params{ $p };
219             }
220             else {
221 0         0 $self->_error( $ERROR_MISMATCH_ARGUMENT, $p );
222             }
223             }
224              
225 107   100     394 $self->{ClientId} //= 'producer';
226              
227             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'Connection' )
228 107 100       788 unless _INSTANCE( $self->{Connection}, 'Kafka::Connection' );
229             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'ClientId' )
230 87 100 66     427 unless ( $self->{ClientId} eq '' || defined( _STRING( $self->{ClientId} ) ) );
231             $self->_error( $ERROR_NOT_BINARY_STRING, 'ClientId' )
232 80 100       189 if utf8::is_utf8( $self->{ClientId} );
233              
234             # Use connection timeout if not provided explicitly
235 79   33     299 $self->{Timeout} //= $self->{Connection}->{Timeout} //= $REQUEST_TIMEOUT;
      66        
236             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'Timeout (%s)', $self->{Timeout} ) )
237 79 50 66     472 unless defined _NUMBER( $self->{Timeout} ) && int( $self->{Timeout} * 1000 ) >= 1 && int( $self->{Timeout} * 1000 ) <= $MAX_INT32;
      66        
238              
239 70         130 my $required_acks = $self->{RequiredAcks};
240 70 100 66     572 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'RequiredAcks' )
      66        
      100        
241             unless
242             defined( $required_acks )
243             && isint( $required_acks )
244             && (
245             $required_acks == $NOT_SEND_ANY_RESPONSE
246             || $required_acks == $WAIT_WRITTEN_TO_LOCAL_LOG
247             || $required_acks == $BLOCK_UNTIL_IS_COMMITTED
248             )
249             ;
250              
251 60         251 return $self;
252             }
253              
254             #-- public attributes ----------------------------------------------------------
255              
256             =head2 METHODS
257              
258             The following methods are defined for the C<Kafka::Producer> class:
259              
260             =cut
261              
262             #-- public methods -------------------------------------------------------------
263              
264             =head3 C<send( $topic, $partition, $messages, $keys, $compression_codec )>
265              
266             Sends a messages on a L<Kafka::Connection|Kafka::Connection> object.
267              
268             Returns a non-blank value (a reference to a hash with server response description)
269             if the message is successfully sent.
270              
271             C<send()> takes the following arguments:
272              
273             =over 3
274              
275             =item C<$topic>
276              
277             The C<$topic> must be a normal non-false string of non-zero length.
278              
279             =item C<$partition>
280              
281             The C<$partition> must be a non-negative integer.
282              
283             =item C<$messages>
284              
285             The C<$messages> is an arbitrary amount of data (a simple data string or
286             a reference to an array of the data strings).
287              
288             =item C<$keys>
289              
290             The C<$keys> are optional message keys, for partitioning with each message,
291             so the consumer knows the partitioning key.
292             This argument should be either a single string (common key for all messages),
293             or an array of strings with length matching messages array.
294              
295             =item C<$compression_codec>
296              
297             Optional.
298              
299             C<$compression_codec> sets the required type of C<$messages> compression,
300             if the compression is desirable.
301              
302             Supported codecs:
303             C<$COMPRESSION_NONE>,
304             C<$COMPRESSION_GZIP>,
305             C<$COMPRESSION_SNAPPY>,
306             C<$COMPRESSION_LZ4>.
307             The defaults that can be imported from the L<Kafka|Kafka> module.
308              
309             =item C<$timestamps>
310              
311             Optional.
312              
313             This is the timestamps of the C<$messages>.
314              
315             This argument should be either a single number (common timestamp for all messages),
316             or an array of integers with length matching messages array.
317              
318             Unit is milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)).
319              
320             B<WARNING>: timestamps supported since Kafka 0.10.0.
321              
322              
323             Do not use C<$Kafka::SEND_MAX_ATTEMPTS> in C<Kafka::Producer-<gt>send> request to prevent duplicates.
324              
325             =back
326              
327             =cut
328             sub send {
329 5070     5070 1 87817 my ( $self, $topic, $partition, $messages, $keys, $compression_codec, $timestamps ) = @_;
330              
331 5070 100 66     20039 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      66        
332             unless defined( $topic ) && ( $topic eq '' || defined( _STRING( $topic ) ) );
333 5063 100       8563 $self->_error( $ERROR_NOT_BINARY_STRING, 'topic' )
334             if utf8::is_utf8( $topic );
335 5062 100 66     21170 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' )
      66        
336             unless defined( $partition ) && isint( $partition ) && $partition >= 0;
337 5052 100 100     15888 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'messages' )
338             unless defined( _STRING( $messages ) ) || _ARRAY( $messages );
339 5047 100 100     7665 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'keys' )
      66        
340             unless ( !defined( $keys ) || defined( _STRING( $keys ) ) || _ARRAY( $keys ) );
341             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'compression_codec' )
342 5041 100 100     5949 unless ( !defined( $compression_codec ) || $known_compression_codecs{ $compression_codec } );
343 5029 0 33     6752 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'timestamps' )
      33        
344             unless ( !defined( $timestamps ) || defined( _POSINT( $timestamps ) ) || _ARRAY( $timestamps ) );
345              
346 5029 100       7436 $messages = [ $messages ] unless ref( $messages );
347 5029         6252 foreach my $message ( @$messages ) {
348 10042 100 66     27925 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'message = %s', $message ) )
      100        
349             unless defined( $message ) && ( $message eq '' || defined( _STRING( $message ) ) );
350 10035 100       15071 $self->_error( $ERROR_NOT_BINARY_STRING, format_message( 'message = %s', $message ) )
351             if utf8::is_utf8( $message );
352             }
353              
354 5020         4376 my $common_key;
355              
356 5020 50       8489 if( _ARRAY( $keys ) ) {
    100          
357             # ensure that keys array maytches messages array
358 0 0       0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'keys' )
359             unless scalar( @$keys ) == scalar( @$messages );
360              
361 0         0 foreach my $key ( @$keys ) {
362 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'key = %s', $key ) )
      0        
363             unless !defined( $key ) || $key eq '' || ( defined( _STRING( $key ) ) );
364 0 0       0 $self->_error( $ERROR_NOT_BINARY_STRING, format_message( 'key = %s', $key ) )
365             if utf8::is_utf8( $key );
366             }
367             }
368             elsif( defined $keys ) {
369 1 50 33     11 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'key = %s', $keys ) )
370             unless $keys eq '' || defined( _STRING( $keys ) );
371 1 50       6 $self->_error( $ERROR_NOT_BINARY_STRING, format_message( 'key = %s', $keys ) )
372             if utf8::is_utf8( $keys );
373 0         0 $common_key = $keys;
374             }
375             else {
376 5019         5011 $common_key = '';
377             }
378              
379 5019         4531 my ($common_ts, $use_ts);
380              
381 5019 50       8566 if( _ARRAY( $timestamps ) ) {
    50          
382             # ensure that timestamps array maytches messages array
383 0 0       0 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'timestamps' )
384             unless scalar( @$timestamps ) == scalar( @$messages );
385              
386 0         0 foreach my $ts ( @$timestamps ) {
387 0 0 0     0 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'timestamp = %s', $ts ) )
388             unless !defined( $ts ) || defined( _POSINT( $ts ) );
389             }
390 0         0 $use_ts = 1;
391             }
392             elsif( defined $timestamps ) {
393 0 0       0 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'timestamp = %s', $timestamps ) )
394             unless defined( _POSINT( $timestamps ));
395 0         0 $common_ts = $timestamps;
396 0         0 $use_ts = 1;
397             }
398              
399 5019         5042 my $MessageSet = [];
400             my $request = {
401             ApiKey => $APIKEY_PRODUCE,
402             CorrelationId => _get_CorrelationId(),
403             ClientId => $self->{ClientId},
404             RequiredAcks => $self->{RequiredAcks},
405 5019         11037 Timeout => int( $self->{Timeout} * 1000 ),
406             topics => [
407             {
408             TopicName => $topic,
409             partitions => [
410             {
411             Partition => $partition,
412             MessageSet => $MessageSet,
413             },
414             ],
415             },
416             ],
417             };
418 5019 50 66     17136 if ( ( defined $compression_codec and $compression_codec == $COMPRESSION_LZ4 ) or defined $timestamps ) {
      33        
419 0         0 $request->{ApiVersion} = 2;
420             }
421              
422 5019         5212 my $key_index = 0;
423 5019         5147 foreach my $message ( @$messages ) {
424 10032 50 0     26044 push @$MessageSet, {
    0          
    50          
425             Offset => $PRODUCER_ANY_OFFSET,
426             Key => defined $common_key ? $common_key : ( $keys->[ $key_index ] // '' ),
427             Value => $message,
428             $use_ts ? ( Timestamp => defined $common_ts ? $common_ts : $timestamps->[$key_index] ) : (),
429             };
430 10032         10563 ++$key_index;
431             }
432              
433 5019         9967 my $result = $self->{Connection}->receive_response_to_request( $request, $compression_codec, $self->{Timeout} );
434 5017         21656 return $result;
435             }
436              
437             #-- private attributes ---------------------------------------------------------
438              
439             #-- private methods ------------------------------------------------------------
440              
441             # Handler for errors
442             sub _error {
443 98     98   122 my $self = shift;
444              
445 98         232 Kafka::Exception::Producer->throw( throw_args( @_ ) );
446              
447 0           return;
448             }
449              
450              
451              
452             1;
453              
454             __END__
455              
456             =head1 DIAGNOSTICS
457              
458             When error is detected, an exception, represented by object of C<Kafka::Exception::Producer> class,
459             is thrown (see L<Kafka::Exceptions|Kafka::Exceptions>).
460              
461             L<code|Kafka::Exceptions/code> and a more descriptive L<message|Kafka::Exceptions/message> provide
462             information about thrown exception. Consult documentation of the L<Kafka::Exceptions|Kafka::Exceptions>
463             for the list of all available methods.
464              
465             Authors suggest using of L<Try::Tiny|Try::Tiny>'s C<try> and C<catch> to handle exceptions while
466             working with L<Kafka|Kafka> package.
467              
468             =over 3
469              
470             =item C<Invalid argument>
471              
472             Invalid arguments were provided to a C<new>
473             L<constructor|/CONSTRUCTOR> or to other L<method|/METHODS>.
474              
475             =item C<Cannot send>
476              
477             Request cannot be sent.
478              
479             =item C<Cannot receive>
480              
481             Response cannot be received.
482              
483             =item C<Cannot bind>
484              
485             TCP connection cannot be established on a given host and port.
486              
487             =item C<Cannot get metadata>
488              
489             IO error is present, errors found in the structure of the reply or the reply contains a non-zero error codes.
490              
491             =item C<Description leader not found>
492              
493             Information about the server-leader is missing in metadata.
494              
495             =item C<Mismatch CorrelationId>
496              
497             C<CorrelationId> of response doesn't match one in request.
498              
499             =item C<There are no known brokers>
500              
501             Information about brokers in the cluster is missing.
502              
503             =item C<Cannot get metadata>
504              
505             Obtained metadata is incorrect or failed to obtain metadata.
506              
507             =back
508              
509             =head1 SEE ALSO
510              
511             The basic operation of the Kafka package modules:
512              
513             L<Kafka|Kafka> - constants and messages used by the Kafka package modules.
514              
515             L<Kafka::Connection|Kafka::Connection> - interface to connect to a Kafka cluster.
516              
517             L<Kafka::Producer|Kafka::Producer> - interface for producing client.
518              
519             L<Kafka::Consumer|Kafka::Consumer> - interface for consuming client.
520              
521             L<Kafka::Message|Kafka::Message> - interface to access Kafka message
522             properties.
523              
524             L<Kafka::Int64|Kafka::Int64> - functions to work with 64 bit elements of the
525             protocol on 32 bit systems.
526              
527             L<Kafka::Protocol|Kafka::Protocol> - functions to process messages in the
528             Apache Kafka's Protocol.
529              
530             L<Kafka::IO|Kafka::IO> - low-level interface for communication with Kafka server.
531              
532             L<Kafka::Exceptions|Kafka::Exceptions> - module designated to handle Kafka exceptions.
533              
534             L<Kafka::Internals|Kafka::Internals> - internal constants and functions used
535             by several package modules.
536              
537             A wealth of detail about the Apache Kafka and the Kafka Protocol:
538              
539             Main page at L<http://kafka.apache.org/>
540              
541             Kafka Protocol at L<https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol>
542              
543             =head1 SOURCE CODE
544              
545             Kafka package is hosted on GitHub:
546             L<https://github.com/TrackingSoft/Kafka>
547              
548             =head1 AUTHOR
549              
550             Sergey Gladkov
551              
552             Please use GitHub project link above to report problems or contact authors.
553              
554             =head1 CONTRIBUTORS
555              
556             Alexander Solovey
557              
558             Jeremy Jordan
559              
560             Sergiy Zuban
561              
562             Vlad Marchenko
563              
564             =head1 COPYRIGHT AND LICENSE
565              
566             Copyright (C) 2012-2017 by TrackingSoft LLC.
567              
568             This package is free software; you can redistribute it and/or modify it under
569             the same terms as Perl itself. See I<perlartistic> at
570             L<http://dev.perl.org/licenses/artistic.html>.
571              
572             This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
573             without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
574             PARTICULAR PURPOSE.
575              
576             =cut