File Coverage

lib/Kafka/Connection.pm
Criterion Covered Total %
statement 406 528 76.8
branch 157 288 54.5
condition 79 166 47.5
subroutine 58 64 90.6
pod 12 12 100.0
total 712 1058 67.3


line stmt bran cond sub pod time code
1             package Kafka::Connection;
2              
3             =head1 NAME
4              
5             Kafka::Connection - Object interface to connect to a kafka cluster.
6              
7             =head1 VERSION
8              
9             This documentation refers to C<Kafka::Connection> version 1.08 .
10              
11             =cut
12              
13 9     9   404668 use 5.010;
  9         40  
14 9     9   49 use strict;
  9         15  
  9         181  
15 9     9   39 use warnings;
  9         15  
  9         482  
16              
17             our $DEBUG = 0;
18              
19             our $VERSION = 'v1.08';
20              
21 9         531 use Exporter qw(
22             import
23 9     9   51 );
  9         13  
24             our @EXPORT = qw(
25             %RETRY_ON_ERRORS
26             );
27              
28 9     9   4566 use Authen::SCRAM::Client;
  9         2057769  
  9         392  
29 9     9   82 use Encode qw/encode/;
  9         17  
  9         641  
30 9         534 use Data::Validate::Domain qw(
31             is_hostname
32 9     9   2844 );
  9         68232  
33 9         632 use Data::Validate::IP qw(
34             is_ipv4
35             is_ipv6
36 9     9   3042 );
  9         181252  
37 9     9   575 use Const::Fast;
  9         1123  
  9         89  
38 9         565 use List::Util qw(
39             shuffle
40 9     9   657 );
  9         30  
41 9         658 use Params::Util qw(
42             _ARRAY
43             _ARRAY0
44             _HASH
45             _NONNEGINT
46             _NUMBER
47             _POSINT
48             _STRING
49 9     9   2492 );
  9         10888  
50 9         376 use Scalar::Util qw(
51             blessed
52 9     9   57 );
  9         18  
53 9         552 use Scalar::Util::Numeric qw(
54             isint
55 9     9   4350 );
  9         5467  
56 9         340 use Storable qw(
57             dclone
58 9     9   65 );
  9         17  
59 9     9   2299 use Time::HiRes ();
  9         5609  
  9         216  
60 9     9   51 use Try::Tiny;
  9         13  
  9         972  
61              
62 9         3450 use Kafka qw(
63             %ERROR
64              
65             $ERROR_NO_ERROR
66             $ERROR_UNKNOWN
67             $ERROR_OFFSET_OUT_OF_RANGE
68             $ERROR_INVALID_MESSAGE
69             $ERROR_UNKNOWN_TOPIC_OR_PARTITION
70             $ERROR_INVALID_FETCH_SIZE
71             $ERROR_LEADER_NOT_AVAILABLE
72             $ERROR_NOT_LEADER_FOR_PARTITION
73             $ERROR_REQUEST_TIMED_OUT
74             $ERROR_BROKER_NOT_AVAILABLE
75             $ERROR_REPLICA_NOT_AVAILABLE
76             $ERROR_MESSAGE_TOO_LARGE
77             $ERROR_STALE_CONTROLLER_EPOCH
78             $ERROR_NETWORK_EXCEPTION
79             $ERROR_GROUP_LOAD_IN_PROGRESS
80             $ERROR_OFFSET_METADATA_TOO_LARGE
81             $ERROR_GROUP_COORDINATOR_NOT_AVAILABLE
82             $ERROR_NOT_COORDINATOR_FOR_GROUP
83             $ERROR_NOT_ENOUGH_REPLICAS
84             $ERROR_NOT_ENOUGH_REPLICAS_AFTER_APPEND
85             $ERROR_REBALANCE_IN_PROGRESS
86             $ERROR_UNSUPPORTED_VERSION
87              
88             $ERROR_CANNOT_BIND
89             $ERROR_CANNOT_GET_METADATA
90             $ERROR_CANNOT_RECV
91             $ERROR_CANNOT_SEND
92             $ERROR_LEADER_NOT_FOUND
93             $ERROR_GROUP_COORDINATOR_NOT_FOUND
94             $ERROR_MISMATCH_ARGUMENT
95             $ERROR_NOT_BINARY_STRING
96             $ERROR_MISMATCH_CORRELATIONID
97             $ERROR_NO_KNOWN_BROKERS
98             $ERROR_RESPONSEMESSAGE_NOT_RECEIVED
99             $ERROR_SEND_NO_ACK
100             $ERROR_UNKNOWN_APIKEY
101             $ERROR_INCOMPATIBLE_HOST_IP_VERSION
102             $ERROR_NO_CONNECTION
103              
104             $IP_V4
105             $IP_V6
106             $KAFKA_SERVER_PORT
107             $NOT_SEND_ANY_RESPONSE
108             $REQUEST_TIMEOUT
109             $RETRY_BACKOFF
110             $SEND_MAX_ATTEMPTS
111 9     9   558 );
  9         28  
112              
113 9     9   2238 use Kafka::Exceptions;
  9         21  
  9         606  
114 9         1397 use Kafka::Internals qw(
115             $APIKEY_FETCH
116             $APIKEY_METADATA
117             $APIKEY_OFFSET
118             $APIKEY_PRODUCE
119             $APIKEY_FINDCOORDINATOR
120             $APIKEY_APIVERSIONS
121             $APIKEY_OFFSETCOMMIT
122             $APIKEY_OFFSETFETCH
123             $APIKEY_SASLHANDSHAKE
124             $MAX_CORRELATIONID
125             $MAX_INT32
126             debug_level
127             _get_CorrelationId
128             format_message
129 9     9   65 );
  9         15  
130 9     9   2549 use Kafka::IO;
  9         22  
  9         307  
131 9     9   4511 use Kafka::IO::Async;
  9         24  
  9         543  
132 9         63456 use Kafka::Protocol qw(
133             $BAD_OFFSET
134             $IMPLEMENTED_APIVERSIONS
135             decode_fetch_response
136             decode_metadata_response
137             decode_offset_response
138             decode_produce_response
139             decode_api_versions_response
140             decode_find_coordinator_response
141             decode_offsetcommit_response
142             decode_offsetfetch_response
143             decode_saslhandshake_response
144             encode_fetch_request
145             encode_metadata_request
146             encode_offset_request
147             encode_produce_request
148             encode_api_versions_request
149             encode_find_coordinator_request
150             encode_offsetcommit_request
151             encode_offsetfetch_request
152             encode_saslhandshake_request
153 9     9   6326 );
  9         22  
154              
155             =head1 SYNOPSIS
156              
157             use 5.010;
158             use strict;
159             use warnings;
160              
161             use Scalar::Util qw(
162             blessed
163             );
164             use Try::Tiny;
165              
166             # A simple example of Kafka::Connection usage:
167             use Kafka::Connection;
168              
169             # connect to local cluster with the defaults
170             my $connection;
171             try {
172             $connection = Kafka::Connection->new( host => 'localhost' );
173             } catch {
174             my $error = $_;
175             if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
176             warn $error->message, "\n", $error->trace->as_string, "\n";
177             exit;
178             } else {
179             die $error;
180             }
181             };
182              
183             # Closes the connection and cleans up
184             $connection->close;
185             undef $connection;
186              
187             =head1 DESCRIPTION
188              
189             The main features of the C<Kafka::Connection> class are:
190              
191             =over 3
192              
193             =item *
194              
195             Provides API for communication with Kafka 0.9+ cluster.
196              
197             =item *
198              
199             Performs requests encoding and responses decoding, provides automatic
200             selection or promotion of a leader server from Kafka cluster.
201              
202             =item *
203              
204             Provides information about Kafka cluster.
205              
206             =back
207              
208             =cut
209              
210             my %protocol = (
211             "$APIKEY_PRODUCE" => {
212             decode => \&decode_produce_response,
213             encode => \&encode_produce_request,
214             },
215             "$APIKEY_FETCH" => {
216             decode => \&decode_fetch_response,
217             encode => \&encode_fetch_request,
218             },
219             "$APIKEY_OFFSET" => {
220             decode => \&decode_offset_response,
221             encode => \&encode_offset_request,
222             },
223             "$APIKEY_METADATA" => {
224             decode => \&decode_metadata_response,
225             encode => \&encode_metadata_request,
226             },
227             "$APIKEY_APIVERSIONS" => {
228             decode => \&decode_api_versions_response,
229             encode => \&encode_api_versions_request,
230             },
231             "$APIKEY_FINDCOORDINATOR" => {
232             decode => \&decode_find_coordinator_response,
233             encode => \&encode_find_coordinator_request,
234             },
235             "$APIKEY_OFFSETCOMMIT" => {
236             decode => \&decode_offsetcommit_response,
237             encode => \&encode_offsetcommit_request,
238             },
239             "$APIKEY_OFFSETFETCH" => {
240             decode => \&decode_offsetfetch_response,
241             encode => \&encode_offsetfetch_request,
242             },
243             "$APIKEY_SASLHANDSHAKE" => {
244             decode => \&decode_saslhandshake_response,
245             encode => \&encode_saslhandshake_request,
246             },
247             );
248              
249             =head2 EXPORT
250              
251             The following constants are available for export
252              
253             =cut
254              
255             =head3 C<%RETRY_ON_ERRORS>
256              
257             These are non-fatal errors which, when they happen, cause a refresh of meta-data from Kafka followed by
258             another attempt to fetch data.
259              
260             =cut
261             # When any of the following errors happens, a possible change in meta-data on server is expected.
262             const our %RETRY_ON_ERRORS => (
263             # $ERROR_NO_ERROR => 1, # 0 - No error
264             $ERROR_UNKNOWN => 1, # -1 - An unexpected server error
265             # $ERROR_OFFSET_OUT_OF_RANGE => 1, # 1 - The requested offset is not within the range of offsets maintained by the server
266             $ERROR_INVALID_MESSAGE => 1, # 2 - Retriable - This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt
267             $ERROR_UNKNOWN_TOPIC_OR_PARTITION => 1, # 3 - Retriable - This server does not host this topic-partition
268             # $ERROR_INVALID_FETCH_SIZE => 1, # 4 - The requested fetch size is invalid
269             $ERROR_LEADER_NOT_AVAILABLE => 1, # 5 - Retriable - Unable to write due to ongoing Kafka leader selection
270             $ERROR_NOT_LEADER_FOR_PARTITION => 1, # 6 - Retriable - Server is not a leader for partition
271             $ERROR_REQUEST_TIMED_OUT => 1, # 7 - Retriable - Request time-out
272             $ERROR_BROKER_NOT_AVAILABLE => 1, # 8 - Broker is not available
273             $ERROR_REPLICA_NOT_AVAILABLE => 1, # 9 - Replica not available
274             # $ERROR_MESSAGE_TOO_LARGE => 1, # 10 - The request included a message larger than the max message size the server will accept
275             $ERROR_STALE_CONTROLLER_EPOCH => 1, # 11 - The controller moved to another broker
276             # $ERROR_OFFSET_METADATA_TOO_LARGE => 1, # 12 - The metadata field of the offset request was too large
277             $ERROR_NETWORK_EXCEPTION => 1, # 13 Retriable - The server disconnected before a response was received
278             $ERROR_GROUP_LOAD_IN_PROGRESS => 1, # 14 - Retriable - The coordinator is loading and hence can't process requests for this group
279             $ERROR_GROUP_COORDINATOR_NOT_AVAILABLE => 1, # 15 - Retriable - The group coordinator is not available
280             $ERROR_NOT_COORDINATOR_FOR_GROUP => 1, # 16 - Retriable - This is not the correct coordinator for this group
281              
282             # $ERROR_INVALID_TOPIC_EXCEPTION => 1, # 17 - The request attempted to perform an operation on an invalid topic
283             # $ERROR_RECORD_LIST_TOO_LARGE => 1, # 18 - The request included message batch larger than the configured segment size on the server
284             $ERROR_NOT_ENOUGH_REPLICAS => 1, # 19 - Retriable - Messages are rejected since there are fewer in-sync replicas than required
285             $ERROR_NOT_ENOUGH_REPLICAS_AFTER_APPEND => 1, # 20 - Retriable - Messages are written to the log, but to fewer in-sync replicas than required
286             # $ERROR_INVALID_REQUIRED_ACKS => 1, # 21 - Produce request specified an invalid value for required acks
287             # $ERROR_ILLEGAL_GENERATION => 1, # 22 - Specified group generation id is not valid
288             # $ERROR_INCONSISTENT_GROUP_PROTOCOL => 1, # 23 - The group member's supported protocols are incompatible with those of existing members
289             # $ERROR_INVALID_GROUP_ID => 1, # 24 - The configured groupId is invalid
290             # $ERROR_UNKNOWN_MEMBER_ID => 1, # 25 - The coordinator is not aware of this member
291             # $ERROR_INVALID_SESSION_TIMEOUT => 1, # 26 - The session timeout is not within the range allowed by the broker (as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)
292             $ERROR_REBALANCE_IN_PROGRESS => 1, # 27 - The group is rebalancing, so a rejoin is needed
293             # $ERROR_INVALID_COMMIT_OFFSET_SIZE => 1, # 28 - The committing offset data size is not valid
294             # $ERROR_TOPIC_AUTHORIZATION_FAILED => 1, # 29 - Not authorized to access topics: Topic authorization failed
295             # $ERROR_GROUP_AUTHORIZATION_FAILED => 1, # 30 - Not authorized to access group: Group authorization failed
296             # $ERROR_CLUSTER_AUTHORIZATION_FAILED => 1, # 31 - Cluster authorization failed
297             # $ERROR_INVALID_TIMESTAMP => 1, # 32 - The timestamp of the message is out of acceptable range
298             # $ERROR_UNSUPPORTED_SASL_MECHANISM => 1, # 33 - The broker does not support the requested SASL mechanism
299             # $ERROR_ILLEGAL_SASL_STATE => 1, # 34 - Request is not valid given the current SASL state
300             # $ERROR_UNSUPPORTED_VERSION => 1, # 35 - The version of API is not supported
301             $ERROR_NO_CONNECTION => 1, # may be disconnected due to idle timeout etc.
302             );
303              
304             #-- constructor ----------------------------------------------------------------
305              
306             =head2 CONSTRUCTOR
307              
308             =head3 C<new>
309              
310             Creates C<Kafka::Connection> object for interaction with Kafka cluster.
311             Returns created C<Kafka::Connection> object.
312              
313             C<new()> takes arguments in key-value pairs. The following arguments are currently recognized:
314              
315             =over 3
316              
317             =item C<host =E<gt> $host>
318              
319             C<$host> is any Apache Kafka cluster host to connect to. It can be a hostname or the
320             IP-address in the "xx.xx.xx.xx" form.
321              
322             Optional. Either C<host> or C<broker_list> must be supplied.
323              
324             WARNING:
325              
326             Make sure that you always connect to brokers using EXACTLY the same address or host name
327             as specified in broker configuration (host.name in server.properties).
328             Avoid using default value (when host.name is commented out) in server.properties - always use explicit value instead.
329              
330             =item C<port =E<gt> $port>
331              
332             Optional, default = C<$KAFKA_SERVER_PORT>.
333              
334             C<$port> is the attribute denoting the port number of the service we want to
335             access (Apache Kafka service). C<$port> should be an integer number.
336              
337             C<$KAFKA_SERVER_PORT> is the default Apache Kafka server port constant (C<9092>) that can
338             be imported from the L<Kafka|Kafka> module.
339              
340             =item C<broker_list =E<gt> $broker_list>
341              
342             Optional, C<$broker_list> is a reference to array of the host:port or [IPv6_host]:port strings, defining the list
343             of Kafka servers. This list will be used to locate the new leader if the server specified
344             via C<host =E<gt> $host> and C<port =E<gt> $port> arguments becomes unavailable. Either C<host>
345             or C<broker_list> must be supplied.
346              
347             =item C<ip_version =E<gt> $ip_version>
348              
349             Specifies the IP version used to interpret the passed IP address and to resolve the host name.
350              
351             Optional, undefined by default, which works in the following way: version of IP address
352             is detected automatically, host name is resolved into IPv4 address.
353              
354             See description of L<$IP_V4|Kafka::IO/$IP_V4>, L<$IP_V6|Kafka::IO/$IP_V6>
355             in C<Kafka> L<EXPORT|Kafka/EXPORT>.
356              
357             =item C<timeout =E<gt> $timeout>
358              
359             Optional, default = C<$Kafka::REQUEST_TIMEOUT>.
360              
361             C<$timeout> specifies how long we wait for the remote server to respond.
362             C<$timeout> is in seconds; it can be a positive integer or a floating-point number, and its value in milliseconds must be at least 1 and not bigger than the maximum int32 positive integer.
363              
364             Special behavior when C<timeout> is set to C<undef>:
365              
366             =back
367              
368             =over 3
369              
370             =item *
371              
372             Alarms are not used internally (namely when performing C<gethostbyname>).
373              
374             =item *
375              
376             Default C<$REQUEST_TIMEOUT> is used for the rest of IO operations.
377              
378             =back
379              
380             =over 3
381              
382             =item C<SEND_MAX_ATTEMPTS =E<gt> $attempts>
383              
384             Optional, int32 signed integer, default = C<$Kafka::SEND_MAX_ATTEMPTS> .
385              
386             In some circumstances (leader is temporarily unavailable, outdated metadata, etc) we may fail to send a message.
387             This property specifies the maximum number of attempts to send a message.
388             The C<$attempts> should be an integer number.
389              
390             =item C<RETRY_BACKOFF =E<gt> $backoff>
391              
392             Optional, default = C<$Kafka::RETRY_BACKOFF> .
393              
394             Since leader election takes a bit of time, this property specifies the amount of time,
395             in milliseconds, that the producer waits before refreshing the metadata.
396             The C<$backoff> should be an integer number.
397              
398             =item C<AutoCreateTopicsEnable =E<gt> $mode>
399              
400             Optional, default value is 0 (false).
401              
402             Kafka BUG "[KAFKA-1124]" (Fixed in Kafka 0.8.2):
403             I<AutoCreateTopicsEnable> controls how this module handles the first access to non-existent topic
404             when C<auto.create.topics.enable> in server configuration is C<true>.
405             If I<AutoCreateTopicsEnable> is false (default),
406             the first access to non-existent topic produces an exception;
407             however, the topic is created and next attempts to access it will succeed.
408              
409             If I<AutoCreateTopicsEnable> is true, this module waits
410             (according to the C<SEND_MAX_ATTEMPTS> and C<RETRY_BACKOFF> properties)
411             until the topic is created,
412             to avoid errors on the first access to non-existent topic.
413              
414             If C<auto.create.topics.enable> in server configuration is C<false>, this setting has no effect.
415              
416             =item C<MaxLoggedErrors =E<gt> $number>
417              
418             Optional, default value is 100.
419              
420             Defines maximum number of last non-fatal errors that we keep in log. Use method L</nonfatal_errors> to
421             access those errors.
422              
423             =item C<dont_load_supported_api_versions =E<gt> $boolean>
424              
425             Optional, default value is 0 (false).
426              
427             If set to false, when communicating with a broker, the client will
428             automatically try to find out the best version numbers to use for each of the
429             API endpoints.
430              
431             If set to true, the client will always use
432             C<$Kafka::Protocol::DEFAULT_APIVERSION> as API version.
433              
434             WARNING: API versions are supported starting from Kafka 0.10. Set this parameter to true
435             if you're connecting to 0.9.
436              
437             =back
438              
439             =cut
440              
441             my %Param_mapping = (
442             'timeout' => 'Timeout',
443             );
444              
445             sub new {
446 234     234 1 148422 my ( $class, %params ) = @_;
447              
448 234         2165 my $self = bless {
449             host => q{},
450             port => $KAFKA_SERVER_PORT,
451             broker_list => [],
452             Timeout => $REQUEST_TIMEOUT,
453             async => 0,
454             ip_version => undef,
455             SEND_MAX_ATTEMPTS => $SEND_MAX_ATTEMPTS,
456             RETRY_BACKOFF => $RETRY_BACKOFF,
457             AutoCreateTopicsEnable => 0,
458             MaxLoggedErrors => 100,
459             dont_load_supported_api_versions => 0,
460             sasl_username => undef,
461             sasl_password => undef,
462             sasl_mechanizm => undef,
463             }, $class;
464              
465 234         729 foreach my $p ( keys %params ) {
466 1001   66     2526 my $attr = $Param_mapping{ $p } // $p;
467 1001 50       1471 if( exists $self->{ $attr } ) {
468 1001         1736 $self->{ $attr } = $params{ $p };
469             }
470             else {
471 0         0 $self->_error( $ERROR_MISMATCH_ARGUMENT, $p );
472             }
473             }
474              
475             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'host' )
476 234 100 100     1671 unless defined( $self->{host} ) && ( $self->{host} eq q{} || defined( _STRING( $self->{host} ) ) );
      100        
477             $self->_error( $ERROR_NOT_BINARY_STRING, 'host' )
478 226 100       690 if utf8::is_utf8( $self->{host} );
479             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'port' )
480 225 100       6335 unless _POSINT( $self->{port} );
481             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'Timeout (%s)', $self->{Timeout} ) )
482 210 50 66     3493 unless !defined( $self->{Timeout} ) || ( defined _NUMBER( $self->{Timeout} ) && int( 1000 * $self->{Timeout} ) >= 1 && int( $self->{Timeout} * 1000 ) <= $MAX_INT32 );
      66        
      33        
483             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'broker_list' )
484 200 100       645 unless _ARRAY0( $self->{broker_list} );
485             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'SEND_MAX_ATTEMPTS' )
486 184 100       3512 unless _POSINT( $self->{SEND_MAX_ATTEMPTS} );
487             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'RETRY_BACKOFF' )
488 169 100       3899 unless _POSINT( $self->{RETRY_BACKOFF} );
489             $self->_error( $ERROR_MISMATCH_ARGUMENT, 'MaxLoggedErrors' )
490 154 50       3556 unless defined( _NONNEGINT( $self->{MaxLoggedErrors} ) );
491              
492 154         1222 my $ip_version = $self->{ip_version};
493 154 100 66     441 $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( 'ip_version (%s)', $ip_version ) )
      66        
      66        
494             unless ( !defined( $ip_version ) || ( defined( _NONNEGINT( $ip_version ) ) && ( $ip_version == $IP_V4 || $ip_version == $IP_V6 ) ) );
495              
496 152         343 $self->{_metadata} = {}; # {
497             # TopicName => {
498             # Partition => {
499             # 'Leader' => ...,
500             # 'Replicas' => [
501             # ...,
502             # ],
503             # 'Isr' => [
504             # ...,
505             # ],
506             # },
507             # ...,
508             # },
509             # ...,
510             # }
511 152         485 $self->{_leaders} = {}; # {
512             # NodeId => host:port or [IPv6_host]:port,
513             # ...,
514             # }
515 152         261 $self->{_group_coordinators} = {}; # {
516             # GroupId => host:port or [IPv6_host]:port,
517             # ...,
518             # }
519 152         252 $self->{_nonfatal_errors} = [];
520 152         274 my $IO_cache = $self->{_IO_cache} = {}; # host:port or [IPv6_host]:port => {
521             # 'NodeId' => ...,
522             # 'IO' => ...,
523             # 'timeout' => ...,
524             # 'host' => ...,
525             # 'port' => ...,
526             # 'error' => ...,
527             # },
528             # ...,
529              
530             # init IO cache
531 152 100       540 foreach my $server ( ( $self->{host} ? $self->_build_server_name( $self->{host}, $self->{port} ) : (), @{ $self->{broker_list} } ) ) {
  152         384  
532 170 100       423 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'bad host:port or broker_list element' )
533             unless $self->_is_like_server( $server );
534 151         336 my ( $host, $port ) = _split_host_port( $server );
535 151         360 my $correct_server = $self->_build_server_name( $host, $port );
536 151         790 $IO_cache->{ $correct_server } = {
537             NodeId => undef,
538             IO => undef,
539             host => $host,
540             port => $port,
541             };
542             }
543              
544 133 100       399 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'server is not specified' )
545             unless keys( %$IO_cache );
546              
547 132         1133 return $self;
548             }
549              
550             #-- public attributes ----------------------------------------------------------
551              
552             =head2 METHODS
553              
554             The following methods are defined for the C<Kafka::Connection> class:
555              
556             =cut
557              
558             #-- public methods -------------------------------------------------------------
559              
560             =head3 C<get_known_servers>
561              
562             Returns the list of known Kafka servers (in host:port or [IPv6_host]:port format).
563              
564             =cut
565             sub get_known_servers {
566 146     146 1 2984 my ( $self ) = @_;
567              
568 146         204 return keys %{ $self->{_IO_cache} };
  146         846  
569             }
570              
571             sub _get_api_versions {
572 10124     10124   11659 my ( $self, $server ) = @_;
573              
574 10124         10713 my $server_metadata = $self->{_IO_cache}->{$server};
575 10124 50       12013 defined $server_metadata
576             or die "Fatal error: server '$server' is unknown in IO cache, which should not happen";
577              
578             # if we have cached data, just use it
579             defined $server_metadata->{_api_versions}
580 10124 100       19642 and return $server_metadata->{_api_versions};
581              
582             # no cached data. Initialize empty one
583 96         285 my $server_api_versions = $server_metadata->{_api_versions} = {};
584              
585             # use empty data if client doesn't want to detect API versions
586             $self->{dont_load_supported_api_versions}
587 96 50       427 and return $server_api_versions;
588              
589             # call the server and try to get the supported API versions
590 0         0 my $api_versions = [];
591 0         0 my $error;
592             try {
593             # The ApiVersions API endpoint is only supported on Kafka versions >
594             # 0.10.0.0 so this call may fail. We simply ignore this failure and
595             # carry on.
596 0     0   0 $api_versions = $self->_get_supported_api_versions( $server );
597             }
598             catch {
599 0     0   0 $error = $_;
600 0         0 };
601              
602 0 0       0 if( defined $error ) {
603 0 0 0     0 if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
604 0 0       0 if( $error->code == $ERROR_MISMATCH_ARGUMENT ) {
605             # rethrow known fatal errors
606 0         0 die $error;
607             }
608 0         0 $self->_remember_nonfatal_error( $error->code, $error, $server );
609             } else {
610 0         0 die $error;
611             }
612             }
613              
614 0         0 foreach my $element (@$api_versions) {
615             # we want to choose which api version to use for each API call. We
616             # try to use the max version that the server supports, with
617             # fallback to the max version the protocol implements. If it's
618             # lower than the min version the kafka server supports, we set it
619             # to -1. If this API endpoint is called, it'll die.
620 0         0 my $kafka_min_version = $element->{MinVersion};
621 0         0 my $kafka_max_version = $element->{MaxVersion};
622 0         0 my $api_key = $element->{ApiKey};
623 0   0     0 my $implemented_max_version = $IMPLEMENTED_APIVERSIONS->{$api_key} // -1;
624 0         0 my $version = $kafka_max_version;
625 0 0       0 $version > $implemented_max_version
626             and $version = $implemented_max_version;
627 0 0       0 $version < $kafka_min_version
628             and $version = -1;
629 0         0 $server_api_versions->{$api_key} = $version;
630             }
631              
632 0         0 return $server_api_versions;
633             }
634              
635             # Returns the list of supported API versions. *Warning*,
636             # this call works only against Kafka 0.10.0.0 and later
637              
638             sub _get_supported_api_versions {
639 0     0   0 my ( $self, $broker ) = @_;
640              
641 0         0 my $CorrelationId = _get_CorrelationId();
642 0         0 my $decoded_request = {
643             CorrelationId => $CorrelationId,
644             ClientId => q{},
645             ApiVersion => 0,
646             };
647 0 0       0 say STDERR format_message( '[%s] apiversions request: %s',
648             scalar( localtime ),
649             $decoded_request,
650             ) if $self->debug_level;
651 0         0 my $encoded_request = $protocol{ $APIKEY_APIVERSIONS }->{encode}->( $decoded_request );
652              
653 0         0 my $encoded_response_ref;
654              
655             # receive apiversions. We use a code block because it's actually a loop where
656             # you can do last.
657             {
658 0 0       0 $self->_connectIO( $broker )
  0         0  
659             or last;
660 0 0       0 my $sent = $self->_sendIO( $broker, $encoded_request )
661             or last;
662 0         0 $encoded_response_ref = $self->_receiveIO( $broker );
663             }
664              
665 0 0       0 unless ( $encoded_response_ref ) {
666             # NOTE: it is possible to repeat the operation here
667 0         0 $self->_error( $ERROR_CANNOT_RECV );
668             }
669              
670 0         0 my $decoded_response = $protocol{ $APIKEY_APIVERSIONS }->{decode}->( $encoded_response_ref );
671 0 0       0 say STDERR format_message( '[%s] apiversions response: %s',
672             scalar( localtime ),
673             $decoded_response,
674             ) if $self->debug_level;
675 0 0 0     0 ( defined( $decoded_response->{CorrelationId} ) && $decoded_response->{CorrelationId} == $CorrelationId )
676             # FATAL error
677             or $self->_error( $ERROR_MISMATCH_CORRELATIONID );
678 0         0 my $ErrorCode = $decoded_response->{ErrorCode};
679              
680             # we asked a Kafka < 0.10 (in this case the call is not
681             # implemented) and it dies
682 0 0       0 $ErrorCode == $ERROR_NO_ERROR
683             or $self->_error($ErrorCode);
684              
685 0         0 my $api_versions = $decoded_response->{ApiVersions};
686 0         0 return $api_versions;
687             }
688              
689             =head3 C<sasl_auth( $broker, Username =E<gt> $username, Password =E<gt> $password )>
690              
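691             Authenticates on C<$broker>. Connection must be established in advance.
692             C<$username> and C<$password> are the username and password for
693             SASL PLAIN/SCRAM authentication respectively. The optional C<Mechanizm> argument selects the SASL mechanism: C<PLAIN> (default) or C<SCRAM-*>.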
694              
695             =cut
696             sub sasl_auth {
697 1     1 1 5 my ($self, $broker, %p) = @_;
698 1         4 my ($username, $password, $mechanizm) = ($p{Username}, $p{Password}, $p{Mechanizm});
699 1   50     12 $mechanizm ||= 'PLAIN';
700 1 50 33     14 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'Username' )
      33        
701             unless defined( $username ) && defined( _STRING( $username ) ) && !utf8::is_utf8( $username );
702 1 50 33     10 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'Password' )
      33        
703             unless defined( $password ) && defined( _STRING( $password ) ) && !utf8::is_utf8( $password );
704              
705 1         7 my $decoded_request = {
706             CorrelationId => Kafka::Internals->_get_CorrelationId(),
707             ClientId => q{},
708             ApiVersion => 0,
709             Mechanism => $mechanizm,
710             };
711              
712 1         10 my $encoded_request = $protocol{ $APIKEY_SASLHANDSHAKE }->{encode}->( $decoded_request );
713              
714 1 50 33     7 return unless $self->_sendIO( $broker, $encoded_request ) &&
715             ( my $encoded_response_ref = $self->_receiveIO( $broker ) );
716              
717 1 50       9 if ($mechanizm =~ /SCRAM-(.*)$/) {
    0          
718 1         5 my $digest = $1;
719 1         12 my $client = Authen::SCRAM::Client->new(
720             digest => $digest,
721             username => $username,
722             password => $password,
723             );
724 1         9536 my $first_msg = encode("utf-8", $client->first_msg());
725 1         5002 my $client_first = pack "l>a*", length($first_msg), $first_msg;
726 1 50       8 return unless $self->_sendIO( $broker, $client_first);
727              
728 1         5 $encoded_response_ref = $self->_receiveIO( $broker );
729 1 50       4 return unless $encoded_response_ref;
730 1         5 my ($sasl_first_resp_len, $server_first) = unpack "l>a*", $$encoded_response_ref;
731 1         6 my $final_msg = encode("utf-8", $client->final_msg( $server_first ));
732 1         56990 my $client_final = pack "l>a*", length($final_msg), $final_msg;
733 1 50       7 return unless $self->_sendIO( $broker, $client_final );
734 1         6 $encoded_response_ref = $self->_receiveIO( $broker );
735 1 50       15 return unless $encoded_response_ref;
736 1         9 my ($sasl_final_resp_len, $server_final) = unpack "l>a*", $$encoded_response_ref;
737 1         7 return $client->validate( $server_final );
738             } elsif ($mechanizm eq 'PLAIN') {
739 0         0 my $msg = $username . "\0" . $username . "\0" . $password;
740              
741 0         0 my $encoded_sasl_req = pack( 'l>a'.length($msg), length($msg), $msg );
742              
743 0         0 $self->_sendIO( $broker, $encoded_sasl_req );
744              
745 0         0 my ($server_data, $io) = $self->_server_data_IO($broker);
746 0         0 my $encoded_sasl_resp_len_ref = $io->try_receive( 4 ); # receive resp msg size (actually is 0)
747 0 0       0 return 0 unless $$encoded_sasl_resp_len_ref;
748              
749 0         0 my $sasl_resp_len = unpack 'l>', $$encoded_sasl_resp_len_ref;
750 0 0       0 return 1 unless $sasl_resp_len;
751 0         0 $io->receive( $sasl_resp_len );
752 0         0 return 1;
753             } else {
754 0         0 die "Unsupported mechanizm $mechanizm";
755             }
756             }
757              
758             =head3 C<get_metadata( $topic )>
759              
760             If C<$topic> is present, it must be a non-false string of non-zero length.
761              
762             If C<$topic> is absent, this method returns metadata for all topics.
763              
764             Updates kafka cluster's metadata description and returns the hash reference to metadata,
765             which can be schematically described as:
766              
767             {
768             TopicName => {
769             Partition => {
770             'Leader' => ...,
771             'Replicas' => [
772             ...,
773             ],
774             'Isr' => [
775             ...,
776             ],
777             },
778             ...,
779             },
780             ...,
781             }
782              
783             Consult Kafka "Wire protocol" documentation for more details about metadata structure.
784              
785             =cut
786             sub get_metadata {
787 2     2 1 1195 my ( $self, $topic ) = @_;
788              
789 2 50 33     19 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      66        
790             unless !defined( $topic ) || ( $topic eq '' || defined( _STRING( $topic ) ) );
791 2 50       9 $self->_error( $ERROR_NOT_BINARY_STRING, 'topic' )
792             if utf8::is_utf8( $topic );
793              
794 2 0       11 $self->_update_metadata( $topic )
    50          
795             # FATAL error
796             or $self->_error( $ERROR_CANNOT_GET_METADATA, $topic ? format_message( "topic='%s'", $topic ) : '' );
797              
798 2         6 my $clone;
799 2 100       9 if ( defined $topic ) {
800             $clone = {
801 1         91 $topic => dclone( $self->{_metadata}->{ $topic } )
802             };
803             } else {
804 1         87 $clone = dclone( $self->{_metadata} );
805             }
806              
807 2         12 return $clone;
808             }
809              
810             =head3 C<is_server_known( $server )>
811              
812             Returns true if C<$server> (host:port or [IPv6_host]:port) is known in the cluster.
813              
814             =cut
815             sub is_server_known {
816 43     43 1 1005 my ( $self, $server ) = @_;
817              
818 43 100       87 $self->_error( $ERROR_MISMATCH_ARGUMENT )
819             unless $self->_is_like_server( $server );
820              
821 5         20 return exists $self->{_IO_cache}->{ $server };
822             }
823              
824             # Returns true, if known C<$server> (host:port or [IPv6_host]:port) is accessible.
825             # Checks the accessibility of the server.
826             # This is evil: opens and closes a NEW connection immediately so do not use unless there is a strong reason for it.
827             sub _is_server_alive {
828 22     22   2285 my ( $self, $server ) = @_;
829              
830 22 100       46 $self->_error( $ERROR_MISMATCH_ARGUMENT )
831             unless $self->_is_like_server( $server );
832              
833 3 50       10 $self->_error( $ERROR_NO_KNOWN_BROKERS, 'has not yet received the metadata?' )
834             unless $self->get_known_servers;
835              
836 3         7 my $io_cache = $self->{_IO_cache};
837             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( "Unknown server '%s' (is not found in the metadata)", $server ) )
838 3 100       10 unless exists( $io_cache->{ $server } );
839              
840 2 50       6 if ( my $io = $self->_connectIO( $server ) ) {
841 2         8 return $io->_is_alive;
842             } else {
843 0         0 return;
844             }
845             }
846              
847             # this is evil, do not use unless there is a very strong reason for it
848             sub _is_server_connected {
849 33     33   1496 my ( $self, $server ) = @_;
850              
851 33 100       67 $self->_error( $ERROR_MISMATCH_ARGUMENT )
852             unless $self->_is_like_server( $server );
853              
854 14         30 my $io_cache = $self->{_IO_cache};
855 14         19 my $io;
856 14 100 66     52 unless ( exists( $io_cache->{ $server } ) && ( $io = $io_cache->{ $server }->{IO} ) ) {
857 8         22 return;
858             }
859              
860 6         20 return $io->_is_alive;
861             }
862              
863             =head3 C<receive_response_to_request( $request, $compression_codec )>
864              
865             =over 3
866              
867             =item C<$request>
868              
869             C<$request> is a reference to the hash representing
870             the structure of the request.
871              
872             This method encodes C<$request>, passes it to the leader of the cluster, receives the reply,
873             decodes it and returns it in the form of a hash reference.
874              
875             =back
876              
877             WARNING:
878              
879             =over 3
880              
881             =item *
882              
883             This method should be considered private and should not be called by an end user.
884              
885             =item *
886              
887             In order to achieve better performance, this method does not perform arguments validation.
888              
889             =back
890              
891             =over 3
892              
893             =item C<$compression_codec>
894              
895             Optional.
896              
897             C<$compression_codec> sets the required type of C<$messages> compression,
898             if the compression is desirable.
899              
900             Supported codecs:
901             L<$COMPRESSION_NONE|Kafka/$COMPRESSION_NONE>,
902             L<$COMPRESSION_GZIP|Kafka/$COMPRESSION_GZIP>,
903             L<$COMPRESSION_SNAPPY|Kafka/$COMPRESSION_SNAPPY>,
904             L<$COMPRESSION_LZ4|Kafka/$COMPRESSION_LZ4>.
905              
906             NOTE: $COMPRESSION_LZ4 requires Kafka 0.10 or higher, as the initial implementation of LZ4 in Kafka did not follow the standard LZ4 framing specification.
907              
908              
909             =back
910              
911             =cut
912             sub receive_response_to_request {
913 10098     10098 1 19858 my ( $self, $request, $compression_codec, $response_timeout ) = @_;
914              
915 10098         13155 my $api_key = $request->{ApiKey};
916              
917 10098   50     22914 my $host_to_send_to = $request->{__send_to__} // 'leader';
918              
919             # WARNING: The current version of the module is limited to the following:
920             # it supports queries with only one combination of topic + partition (first and only).
921              
922 10098         11798 my $topic_data = $request->{topics}->[0];
923 10098         11258 my $topic_name = $topic_data->{TopicName};
924 10098         10717 my $partition = $topic_data->{partitions}->[0]->{Partition};
925              
926 10098 100 33     8906 if (
      33        
      66        
927 10098         47831 !%{ $self->{_metadata} } # the first request
928             || ( !$self->{AutoCreateTopicsEnable} && defined( $topic_name ) && !exists( $self->{_metadata}->{ $topic_name } ) )
929             ) {
930 52 100       144 $self->_update_metadata( $topic_name ) # hash metadata could be updated
931             # FATAL error
932             or $self->_error( $ERROR_CANNOT_GET_METADATA, format_message( "topic='%s'", $topic_name ), request => $request )
933             ;
934             }
935              
936 10095 50       18072 $request->{CorrelationId} = _get_CorrelationId() unless exists $request->{CorrelationId};
937              
938 10095 50       18461 say STDERR format_message( '[%s] compression_codec=%s, request=%s',
939             scalar( localtime ),
940             $compression_codec,
941             $request,
942             ) if $self->debug_level;
943              
944 10095         11397 my( $ErrorCode, $partition_data, $io_error );
945              
946 10095         10077 my $attempt = 0;
947             # we save the original api version of the request, because in the attempt
948             # loop we might be trying different brokers which may support different api
949             # versions.
950 10095         10406 my $original_request_api_version = $request->{ApiVersion};
951 10095   50     20284 ATTEMPT: while ( ++$attempt <= ( $self->{SEND_MAX_ATTEMPTS} // 1 ) ) {
952 10146         10975 $ErrorCode = $ERROR_NO_ERROR;
953 10146         10305 undef $io_error;
954              
955 10146         8607 my $server;
956 10146 50       13500 if ($host_to_send_to eq 'leader') {
    0          
957             # hash metadata could be updated
958 10146         13331 my $leader = $self->{_metadata}->{ $topic_name }->{ $partition }->{Leader};
959 10146 50       16163 next ATTEMPT unless defined $leader;
960              
961 10146         11786 $server = $self->{_leaders}->{ $leader };
962 10146 50       13213 unless ( $server ) {
963 0         0 $ErrorCode = $ERROR_LEADER_NOT_FOUND;
964 0         0 $self->_remember_nonfatal_error( $ErrorCode, $ERROR{ $ErrorCode }, $server, $topic_name, $partition );
965 0         0 next ATTEMPT;
966             }
967             } elsif ( $host_to_send_to eq 'group_coordinator') {
968 0         0 my $group_id = $request->{GroupId};
969 0 0 0     0 if ( !%{ $self->{_group_coordinators} } && defined $group_id) {
  0         0  
970             # first request
971 0         0 $self->_update_group_coordinators($group_id);
972             }
973 0         0 $server = $self->{_group_coordinators}->{$group_id};
974 0 0       0 unless ( $server ) {
975 0         0 $ErrorCode = $ERROR_GROUP_COORDINATOR_NOT_FOUND;
976 0         0 $self->_remember_nonfatal_error( $ErrorCode, $ERROR{ $ErrorCode }, $server, $topic_name, $partition );
977 0         0 next ATTEMPT;
978             }
979             } else {
980 0         0 die "__send_to__ must be either 'leader', 'group_coordinator', or void (will default to 'leader')";
981             }
982              
983             # Send a request to the server
984 10146 100       16323 if ( $self->_connectIO( $server ) ) {
985             # we can connect to this server, so let's detect the api versions
986             # it supports and use them, except if the request forces us
987             # to use an api version. Warning, the version might end up being
988             # undef if detection against the Kafka server failed, or if
989             # dont_load_supported_api_versions is true. However the Encoder
990             # code knows how to handle it.
991 10142         11957 $request->{ApiVersion} = $original_request_api_version;
992 10142 100       14270 unless( defined $request->{ApiVersion} ) {
993 10124         15198 $request->{ApiVersion} = $self->_get_api_versions( $server )->{ $api_key };
994             # API versions request may fail and the server may be disconnected
995 10124 50       16536 unless( $self->_is_IO_connected( $server ) ) {
996             # this attempt does not count, assuming that _get_api_versions will not try to get them from failing broker again
997 0         0 redo ATTEMPT;
998             }
999             }
1000              
1001 10142         23360 my $encoded_request = $protocol{ $api_key }->{encode}->( $request, $compression_codec );
1002              
1003 10142 100       18975 unless ( $self->_sendIO( $server, $encoded_request ) ) {
1004 2         9 $io_error = $self->_io_error( $server );
1005 2 50       13 $ErrorCode = $io_error ? $io_error->code : $ERROR_CANNOT_SEND;
1006 2         62 $self->_closeIO( $server, 1 );
1007             }
1008             }
1009             else {
1010 4         16 $io_error = $self->_io_error( $server );
1011 4 50       23 $ErrorCode = $io_error ? $io_error->code : $ERROR_CANNOT_BIND;
1012             }
1013              
1014 10146 100       15866 if ( $ErrorCode != $ERROR_NO_ERROR ) {
1015             # could not send request due to non-fatal IO error (fatal errors should be thrown by connectIO/sendIO already)
1016 6         17 $self->_remember_nonfatal_error( $ErrorCode, $self->_io_error( $server ), $server, $topic_name, $partition );
1017 6 50 33     35 if( $api_key == $APIKEY_PRODUCE && !( $ErrorCode == $ERROR_CANNOT_BIND || $ErrorCode == $ERROR_NO_CONNECTION ) ) {
      66        
1018             # do not retry failed produce requests which may have sent some data already
1019 0         0 $ErrorCode = $ERROR_CANNOT_SEND;
1020 0         0 last ATTEMPT;
1021             }
1022 6         23 next ATTEMPT;
1023             }
1024              
1025 10140         10003 my $response;
1026 10140 100 100     26890 if ( $api_key == $APIKEY_PRODUCE && $request->{RequiredAcks} == $NOT_SEND_ANY_RESPONSE ) {
1027             # Do not receive a response, self-forming own response
1028             $response = {
1029             CorrelationId => $request->{CorrelationId},
1030 5009         18812 topics => [
1031             {
1032             TopicName => $topic_name,
1033             partitions => [
1034             {
1035             Partition => $partition,
1036             ErrorCode => 0,
1037             Offset => $BAD_OFFSET,
1038             },
1039             ],
1040             },
1041             ],
1042             };
1043             } else {
1044 5131         10228 my $encoded_response_ref = $self->_receiveIO( $server, $response_timeout );
1045 5131 100       7222 unless ( $encoded_response_ref ) {
1046 2 100       7 if ( $api_key == $APIKEY_PRODUCE ) {
1047             # WARNING: Unfortunately, the sent package (one or more messages) does not have a unique identifier
1048             # and there is no way to verify the delivery of data
1049 1         2 $ErrorCode = $ERROR_SEND_NO_ACK;
1050              
1051             # Should not be allowed to re-send data on the next attempt
1052             # FATAL error
1053 1         8 $self->_error( $ErrorCode, "no ack for request", io_error => $self->_io_error( $server ), request => $request );
1054 0         0 last ATTEMPT;
1055             } else {
1056 1         7 $ErrorCode = $ERROR_CANNOT_RECV;
1057 1         4 $self->_remember_nonfatal_error( $ErrorCode, $self->_io_error( $server ), $server, $topic_name, $partition );
1058 1         4 next ATTEMPT;
1059             }
1060             }
1061 5129 100       7934 if ( length( $$encoded_response_ref ) > 4 ) { # MessageSize => int32
1062             # we also pass the api version that was used for the request,
1063             # so that we know how to decode the response
1064 5128         12947 $response = $protocol{ $api_key }->{decode}->( $encoded_response_ref, $request->{ApiVersion} );
1065 5128 50       12756 say STDERR format_message( '[%s] response: %s',
1066             scalar( localtime ),
1067             $response,
1068             ) if $self->debug_level;
1069             } else {
1070 1         6 $self->_error( $ERROR_RESPONSEMESSAGE_NOT_RECEIVED, format_message("response length=%s", length( $$encoded_response_ref ) ), io_error => $self->_io_error( $server ), request => $request );
1071             }
1072             }
1073              
1074             # FATAL error if correlation id does not match
1075             $self->_error( $ERROR_MISMATCH_CORRELATIONID, "$response->{CorrelationId} != $request->{CorrelationId}", request => $request, response => $response )
1076             unless $response->{CorrelationId} == $request->{CorrelationId}
1077 10137 50       17887 ;
1078 10137         11108 $topic_data = $response->{topics}->[0];
1079 10137 100       18428 $partition_data = $topic_data->{ $api_key == $APIKEY_OFFSET ? 'PartitionOffsets' : 'partitions' }->[0];
1080              
1081 10137         10476 $ErrorCode = $partition_data->{ErrorCode};
1082              
1083 10137 100       29601 return $response if $ErrorCode == $ERROR_NO_ERROR; # success
1084              
1085 84 50 33     487 if( $api_key == $APIKEY_PRODUCE && $ErrorCode == $ERROR_REQUEST_TIMED_OUT ) {
1086             # special case: produce request timed out so we did not get expected ACK and should not retry sending request again
1087             # Should not be allowed to re-send data on the next attempt
1088             # FATAL error
1089 0         0 $self->_error( $ERROR_SEND_NO_ACK, format_message( "topic='%s', partition=%s response error: %s", $topic_name, $partition, $ErrorCode ), request => $request, response => $response );
1090 0         0 last ATTEMPT;
1091             }
1092              
1093 84 100       303 if ( exists $RETRY_ON_ERRORS{ $ErrorCode } ) {
1094 64         366 $self->_remember_nonfatal_error( $ErrorCode, $ERROR{ $ErrorCode }, $server, $topic_name, $partition );
1095 64         320 next ATTEMPT;
1096             }
1097              
1098             # FATAL error
1099 20         60 $self->_error( $ErrorCode, format_message( "topic='%s', partition=%s", $topic_name, $partition ), request => $request );
1100             } continue {
1101             # Expect possible changes in the situation, such as restoration of the connection
1102             say STDERR format_message( '[%s] sleeping for %d ms before making request attempt #%d (%s)',
1103             scalar( localtime ),
1104             $self->{RETRY_BACKOFF},
1105 71 0       236 $attempt + 1,
    50          
1106             $ErrorCode == $ERROR_NO_ERROR ? 'refreshing metadata' : "ErrorCode ${ErrorCode}",
1107             ) if $self->debug_level;
1108              
1109 71         28212495 Time::HiRes::sleep( $self->{RETRY_BACKOFF} / 1000 );
1110              
1111 71 100 33     2227 $self->_update_metadata( $topic_name )
1112             # FATAL error
1113             or $self->_error( $ErrorCode || $ERROR_CANNOT_GET_METADATA, format_message( "topic='%s', partition=%s", $topic_name, $partition ), request => $request )
1114             ;
1115 68 50       510 if ( $host_to_send_to eq 'group_coordinator') {
1116             $self->_update_group_coordinators($request->{GroupId})
1117 0         0 }
1118             }
1119              
1120             # FATAL error
1121 17 50       66 if ( $ErrorCode ) {
1122 17 100       155 $self->_error( $ErrorCode, format_message( "topic='%s'%s", $topic_data->{TopicName}, $partition_data ? ", partition = ".$partition_data->{Partition} : '' ), request => $request, io_error => $io_error );
1123             } else {
1124 0         0 $self->_error( $ERROR_UNKNOWN_TOPIC_OR_PARTITION, format_message( "topic='%s', partition=%s", $topic_name, $partition ), request => $request, io_error => $io_error );
1125             }
1126              
1127 0         0 return;
1128             }
1129              
1130             =head3 C<exists_topic_partition( $topic, $partition )>
1131              
1132             Returns true if the metadata contains information about specified combination of topic and partition.
1133             Otherwise returns false.
1134              
1135             C<exists_topic_partition()> takes the following arguments:
1136              
1137             =over 3
1138              
1139             =item C<$topic>
1140              
1141             The C<$topic> must be a normal non-false string of non-zero length.
1142              
1143             =item C<$partition>
1144              
1145             =back
1146              
1147             =cut
1148             sub exists_topic_partition {
1149 3     3 1 657 my ( $self, $topic, $partition ) = @_;
1150              
1151 3 50 33     28 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'topic' )
      33        
1152             unless defined( $topic ) && ( $topic eq q{} || defined( _STRING( $topic ) ) );
1153 3 50       10 $self->_error( $ERROR_NOT_BINARY_STRING, 'topic' )
1154             if utf8::is_utf8( $topic );
1155 3 50 33     21 $self->_error( $ERROR_MISMATCH_ARGUMENT, 'partition' )
      33        
1156             unless defined( $partition ) && isint( $partition ) && $partition >= 0;
1157              
1158 3 50       6 unless ( %{ $self->{_metadata} } ) { # the first request
  3         9  
1159 0 0       0 $self->_update_metadata( $topic ) # hash metadata could be updated
1160             # FATAL error
1161             or $self->_error( $ERROR_CANNOT_GET_METADATA, format_message( "topic='%s'", $topic ) );
1162             }
1163              
1164 3         17 return exists $self->{_metadata}->{ $topic }->{ $partition };
1165             }
1166              
1167             =head3 C<close_connection( $server )>
1168              
1169             Closes connection with C<$server> (defined as host:port or [IPv6_host]:port).
1170              
1171             =cut
1172             sub close_connection {
1173 22     22 1 2295 my ( $self, $server ) = @_;
1174              
1175 22 50       50 unless ( $self->is_server_known( $server ) ) {
1176 0         0 return;
1177             }
1178              
1179 3         10 $self->_closeIO( $server );
1180 3         12 return 1;
1181             }
1182              
1183             =head3 C<close>
1184              
1185             Closes connection with all known Kafka servers.
1186              
1187             =cut
1188             sub close {
1189 7     7 1 47292 my ( $self ) = @_;
1190              
1191 7         25 foreach my $server ( $self->get_known_servers ) {
1192 19         39 $self->_closeIO( $server );
1193             }
1194              
1195 7         160 return;
1196             }
1197              
1198             =head3 C<cluster_errors>
1199              
1200             Returns a reference to a hash.
1201              
1202             Each hash key is the identifier of the server (host:port or [IPv6_host]:port), and the value is the last communication error
1203             with that server.
1204              
1205             An empty hash is returned if there were no communication errors.
1206              
1207             =cut
1208             sub cluster_errors {
1209 2     2 1 1956 my ( $self ) = @_;
1210              
1211 2         5 my %errors;
1212 2         8 foreach my $server ( $self->get_known_servers ) {
1213 4 100       12 if ( my $error = $self->_io_error( $server ) ) {
1214 3         28 $errors{ $server } = $error;
1215             }
1216             }
1217              
1218 2         11 return \%errors;
1219             }
1220              
1221             =head3 C<nonfatal_errors>
1222              
1223             Returns a reference to an array of the last non-fatal errors.
1224              
1225             Maximum number of entries is set using C<MaxLoggedErrors> parameter of L<constructor|/new>.
1226              
1227             A reference to the empty array is returned if there were no non-fatal errors or parameter C<MaxLoggedErrors>
1228             is set to 0.
1229              
1230             =cut
1231             sub nonfatal_errors {
1232 81     81 1 103806 my ( $self ) = @_;
1233              
1234 81         341 return $self->{_nonfatal_errors};
1235             }
1236              
1237             =head3 C<clear_nonfatals>
1238              
1239             Clears an array of the last non-fatal errors.
1240              
1241             A reference to the empty array is returned because there are no non-fatal errors now.
1242              
1243             =cut
1244             sub clear_nonfatals {
1245 0     0 1 0 my ( $self ) = @_;
1246              
1247 0         0 @{ $self->{_nonfatal_errors} } = ();
  0         0  
1248              
1249 0         0 return $self->{_nonfatal_errors};
1250             }
1251              
1252             #-- private attributes ---------------------------------------------------------
1253              
1254             #-- private functions ----------------------------------------------------------
1255              
1256             sub _split_host_port {
1257 374     374   563 my ( $server ) = @_;
1258              
1259 374         1509 my ( $host, $port ) = $server=~ /^(.+):(\d+)$/;
1260 374 50 66     1279 $host = $1 if $host && $host =~ /^\[(.+)\]$/;
1261              
1262 374         887 return( $host, $port );
1263             }
1264              
1265             #-- private methods ------------------------------------------------------------
1266              
1267             # Remember non-fatal error
1268             sub _remember_nonfatal_error {
1269 71     71   294 my ( $self, $error_code, $error, $server, $topic, $partition ) = @_;
1270              
1271             my $max_logged_errors = $self->{MaxLoggedErrors}
1272 71 50       250 or return;
1273              
1274 0         0 shift( @{ $self->{_nonfatal_errors} } )
1275 71 50       134 if scalar( @{ $self->{_nonfatal_errors} } ) == $max_logged_errors;
  71         222  
1276             my $msg = format_message( "[%s] Non-fatal error: %s (ErrorCode %s, server '%s', topic '%s', partition %s)",
1277             scalar( localtime ),
1278 71 0 0     5797 $error // ( defined( $error_code ) && exists( $ERROR{ $error_code } ) ? $ERROR{ $error_code } : '<undef>' ),
      33        
      50        
1279             $error_code // 'IO error',
1280             $server,
1281             $topic,
1282             $partition,
1283             );
1284              
1285 71 50       309 say STDERR $msg
1286             if $self->debug_level;
1287              
1288 71         131 push @{ $self->{_nonfatal_errors} }, $msg;
  71         255  
1289              
1290 71         191 return $msg;
1291             }
1292              
1293             # Returns identifier of the cluster leader (host:port or [IPv6_host]:port)
1294             sub _find_leader_server {
1295 119     119   294 my ( $self, $node_id ) = @_;
1296              
1297 119         159 my $leader_server;
1298 119         215 my $IO_cache = $self->{_IO_cache};
1299 119         148 my $NodeId;
1300 119         421 foreach my $server ( keys %$IO_cache ) {
1301 251         316 $NodeId = $IO_cache->{ $server }->{NodeId};
1302 251 100 66     874 if ( defined( $NodeId ) && $NodeId == $node_id ) {
1303 119         207 $leader_server = $server;
1304 119         202 last;
1305             }
1306             }
1307              
1308 119         562 return $leader_server;
1309             }
1310              
1311             # Form a list of servers to attempt querying of the metadata
1312             sub _get_interviewed_servers {
1313 125     125   317 my ( $self ) = @_;
1314              
1315 125         239 my ( @priority, @secondary, @rest );
1316 125         301 my $IO_cache = $self->{_IO_cache};
1317 125         179 my $server_data;
1318 125         514 foreach my $server ( $self->get_known_servers ) {
1319 267         424 $server_data = $IO_cache->{ $server };
1320 267 100       712 if ( defined $server_data->{NodeId} ) {
1321 213 100       580 if ( $server_data->{IO} ) {
1322 135         312 push @priority, $server;
1323             } else {
1324 78         290 push @secondary, $server;
1325             }
1326             } else {
1327 54         110 push @rest, $server;
1328             }
1329             }
1330              
1331 125         1021 return( shuffle( @priority ), shuffle( @secondary ), shuffle( @rest ) );
1332             }
1333              
1334             # Refresh group coordinator for given group
1335             sub _update_group_coordinators {
1336 0     0   0 my ($self, $group_id) = @_;
1337              
1338 0         0 my $CorrelationId = _get_CorrelationId();
1339 0         0 my $decoded_request = {
1340             CorrelationId => $CorrelationId,
1341             ClientId => q{},
1342             CoordinatorKey => $group_id,
1343             CoordinatorType => 0, # type is group
1344             };
1345 0 0       0 say STDERR format_message( '[%s] group coordinators request: %s',
1346             scalar( localtime ),
1347             $decoded_request,
1348             ) if $self->debug_level;
1349 0         0 my $encoded_request = $protocol{ $APIKEY_FINDCOORDINATOR }->{encode}->( $decoded_request );
1350              
1351 0         0 my $encoded_response_ref;
1352 0         0 my @brokers = $self->_get_interviewed_servers;
1353              
1354             # receive coordinator data
1355 0         0 foreach my $broker ( @brokers ) {
1356 0 0 0     0 last if $self->_connectIO( $broker )
      0        
1357             && $self->_sendIO( $broker, $encoded_request )
1358             && ( $encoded_response_ref = $self->_receiveIO( $broker ) );
1359             }
1360              
1361 0 0       0 unless ( $encoded_response_ref ) {
1362             # NOTE: it is possible to repeat the operation here
1363 0         0 return;
1364             }
1365              
1366 0         0 my $decoded_response = $protocol{ $APIKEY_FINDCOORDINATOR }->{decode}->( $encoded_response_ref );
1367 0 0       0 say STDERR format_message( '[%s] group coordinators: %s',
1368             scalar( localtime ),
1369             $decoded_response,
1370             ) if $self->debug_level;
1371 0 0 0     0 ( defined( $decoded_response->{CorrelationId} ) && $decoded_response->{CorrelationId} == $CorrelationId )
1372             # FATAL error
1373             or $self->_error( $ERROR_MISMATCH_CORRELATIONID );
1374             $decoded_response->{ErrorCode}
1375 0 0       0 and $self->_error( $decoded_response->{ErrorCode} );
1376              
1377 0         0 my $IO_cache = $self->{_IO_cache};
1378 0         0 my $server = $self->_build_server_name( @{ $decoded_response }{ 'Host', 'Port' } );
  0         0  
1379             $IO_cache->{ $server } = { # can add new servers
1380             IO => $IO_cache->{ $server }->{IO}, # IO or undef
1381             NodeId => $decoded_response->{NodeId},
1382             host => $decoded_response->{Host},
1383             port => $decoded_response->{Port},
1384 0         0 };
1385 0         0 $self->{_group_coordinators}->{ $group_id } = $server;
1386              
1387 0         0 return 1;
1388             }
1389              
1390             # Refresh metadata for given topic
1391             sub _update_metadata {
1392 125     125   882 my ( $self, $topic, $is_recursive_call ) = @_;
1393              
1394 125         990 my $CorrelationId = _get_CorrelationId();
1395 125   66     1975 my $decoded_request = {
1396             CorrelationId => $CorrelationId,
1397             ClientId => q{},
1398             topics => [
1399             $topic // (),
1400             ],
1401             };
1402 125 50       662 say STDERR format_message( '[%s] metadata request: %s',
1403             scalar( localtime ),
1404             $decoded_request,
1405             ) if $self->debug_level;
1406 125         1114 my $encoded_request = $protocol{ $APIKEY_METADATA }->{encode}->( $decoded_request );
1407              
1408 125         275 my $encoded_response_ref;
1409 125         529 my @brokers = $self->_get_interviewed_servers;
1410              
1411             # receive metadata
1412 125         290 foreach my $broker ( @brokers ) {
1413 131 100 66     560 last if $self->_connectIO( $broker )
      100        
1414             && $self->_sendIO( $broker, $encoded_request )
1415             && ( $encoded_response_ref = $self->_receiveIO( $broker ) );
1416             }
1417              
1418 124 100       365 unless ( $encoded_response_ref ) {
1419             # NOTE: it is possible to repeat the operation here
1420 5         58 return;
1421             }
1422              
1423 119         589 my $decoded_response = $protocol{ $APIKEY_METADATA }->{decode}->( $encoded_response_ref );
1424 119 50       389 say STDERR format_message( '[%s] metadata response: %s',
1425             scalar( localtime ),
1426             $decoded_response,
1427             ) if $self->debug_level;
1428 119 50 33     612 ( defined( $decoded_response->{CorrelationId} ) && $decoded_response->{CorrelationId} == $CorrelationId )
1429             # FATAL error
1430             or $self->_error( $ERROR_MISMATCH_CORRELATIONID );
1431              
1432 119 50       549 unless ( _ARRAY( $decoded_response->{Broker} ) ) {
1433 0 0       0 if ( $self->{AutoCreateTopicsEnable} ) {
1434 0         0 return $self->_attempt_update_metadata( $is_recursive_call, $topic, undef, $ERROR_NO_KNOWN_BROKERS );
1435             } else {
1436             # FATAL error
1437 0         0 $self->_error( $ERROR_NO_KNOWN_BROKERS, format_message( "topic='%s'", $topic ) );
1438             }
1439             }
1440              
1441 119         206 my $IO_cache = $self->{_IO_cache};
1442              
1443             # Clear the previous information about the NodeId in the IO cache
1444 119         594 $IO_cache->{ $_ }->{NodeId} = undef for @brokers;
1445              
1446             # In the IO cache update/add obtained server information
1447 119         214 foreach my $received_broker ( @{ $decoded_response->{Broker} } ) {
  119         315  
1448 357         482 my $server = $self->_build_server_name( @{ $received_broker }{ 'Host', 'Port' } );
  357         861  
1449             $IO_cache->{ $server } = { # can add new servers
1450             IO => $IO_cache->{ $server }->{IO}, # IO or undef
1451             NodeId => $received_broker->{NodeId},
1452             host => $received_broker->{Host},
1453             port => $received_broker->{Port},
1454 357         2084 };
1455             }
1456              
1457             #NOTE: IO cache does not remove server that's missing in metadata
1458              
1459             # Collect the received metadata
1460 119         308 my $received_metadata = {};
1461 119         253 my $leaders = {};
1462              
1463 119         267 my $ErrorCode = $ERROR_NO_ERROR;
1464 119         265 my( $TopicName, $partition );
1465             METADATA_CREATION:
1466 119         190 foreach my $topic_metadata ( @{ $decoded_response->{TopicMetadata} } ) {
  119         255  
1467 119         204 $TopicName = $topic_metadata->{TopicName};
1468 119         231 undef $partition;
1469             last METADATA_CREATION
1470 119 50       370 if ( $ErrorCode = $topic_metadata->{ErrorCode} ) != $ERROR_NO_ERROR;
1471              
1472 119         155 foreach my $partition_metadata ( @{ $topic_metadata->{PartitionMetadata} } ) {
  119         278  
1473 119         240 $partition = $partition_metadata->{Partition};
1474             last METADATA_CREATION
1475 119 50 33     332 if ( $ErrorCode = $partition_metadata->{ErrorCode} ) != $ERROR_NO_ERROR
1476             && $ErrorCode != $ERROR_REPLICA_NOT_AVAILABLE;
1477 119         195 $ErrorCode = $ERROR_NO_ERROR;
1478              
1479 119         439 my $received_partition_data = $received_metadata->{ $TopicName }->{ $partition } = {};
1480 119         385 my $leader = $received_partition_data->{Leader} = $partition_metadata->{Leader};
1481 119         206 $received_partition_data->{Replicas} = [ @{ $partition_metadata->{Replicas} } ];
  119         306  
1482 119         194 $received_partition_data->{Isr} = [ @{ $partition_metadata->{Isr} } ];
  119         287  
1483              
1484 119         339 $leaders->{ $leader } = $self->_find_leader_server( $leader );
1485             }
1486             }
1487 119 50       355 if ( $ErrorCode != $ERROR_NO_ERROR ) {
1488 0 0       0 if ( exists $RETRY_ON_ERRORS{ $ErrorCode } ) {
1489 0         0 return $self->_attempt_update_metadata( $is_recursive_call, $TopicName, $partition, $ErrorCode );
1490             } else {
1491             # FATAL error
1492 0 0       0 $self->_error( $ErrorCode, format_message( "topic='%s'%s", $TopicName, defined( $partition ) ? ", partition=$partition" : '' ) );
1493             }
1494             }
1495              
1496             # Update metadata for received topics
1497 119         208 $self->{_metadata}->{ $_ } = $received_metadata->{ $_ } foreach keys %{ $received_metadata };
  119         711  
1498 119         290 $self->{_leaders}->{ $_ } = $leaders->{ $_ } foreach keys %{ $leaders };
  119         511  
1499              
1500 119         1153 return 1;
1501             }
1502              
1503             # trying to get the metadata without error
1504             sub _attempt_update_metadata {
1505 0     0   0 my ( $self, $is_recursive_call, $topic, $partition, $error_code ) = @_;
1506              
1507 0 0       0 return if $is_recursive_call;
1508 0         0 $self->_remember_nonfatal_error( $error_code, $ERROR{ $error_code }, undef, $topic, $partition );
1509              
1510 0         0 my $attempts = $self->{SEND_MAX_ATTEMPTS};
1511             ATTEMPTS:
1512 0         0 while ( $attempts-- ) {
1513             say STDERR format_message( '[%s] sleeping for %d ms before making update metadata attempt #%d',
1514             scalar( localtime ),
1515             $self->{RETRY_BACKOFF},
1516 0 0       0 $self->{SEND_MAX_ATTEMPTS} - $attempts + 1,
1517             ) if $self->debug_level;
1518 0         0 Time::HiRes::sleep( $self->{RETRY_BACKOFF} / 1000 );
1519 0 0       0 return( 1 ) if $self->_update_metadata( $topic, 1 );
1520             }
1521             # FATAL error
1522 0 0       0 $self->_error( $error_code, format_message( "topic='%s'%s", $topic, defined( $partition ) ? ", partition=$partition" : '' ) );
1523              
1524 0         0 return;
1525             }
1526              
1527             # forms server identifier using supplied $host, $port
1528             sub _build_server_name {
1529 659     659   1233 my ( $self, $host, $port ) = @_;
1530              
1531 659 50       2004 $host = "[$host]" if is_ipv6( $host );
1532              
1533 659         10988 return "$host:$port";
1534             }
1535              
1536             # remembers error communicating with the server
1537             sub _on_io_error {
1538 20     20   56 my ( $self, $server_data, $error ) = @_;
1539 20         43 $server_data->{error} = $error;
1540 20 100       68 if( $server_data->{IO} ) {
1541 16         102 $server_data->{IO}->close;
1542 16         237 $server_data->{IO} = undef;
1543             }
1544              
1545 20 50 33     179 if( blessed( $error ) && $error->isa('Kafka::Exception') ) {
1546 20 100 66     507 if( $error->code == $ERROR_MISMATCH_ARGUMENT || $error->code == $ERROR_INCOMPATIBLE_HOST_IP_VERSION ) {
1547             # rethrow known fatal errors
1548 1         23 die $error;
1549             }
1550             } else {
1551             # rethrow all unknown errors
1552 0         0 die $error;
1553             }
1554              
1555 19         560 return;
1556             }
1557              
1558             sub _io_error {
1559 19     19   127 my( $self, $server ) = @_;
1560 19         32 my $error;
1561 19 50       50 if( my $server_data = $self->{_IO_cache}->{ $server } ) {
1562 19         32 $error = $server_data->{error};
1563             }
1564 19         72 return $error;
1565             }
1566              
1567             sub _is_IO_connected {
1568 10124     10124   12302 my ( $self, $server ) = @_;
1569 10124 50       14572 my $server_data = $self->{_IO_cache}->{ $server } or return;
1570 10124         16612 return $server_data->{IO};
1571             }
1572              
1573             # connects to a server (host:port or [IPv6_host]:port)
1574             sub _connectIO {
1575 10279     10279   12753 my ( $self, $server ) = @_;
1576              
1577 10279 50       15591 my $server_data = $self->{_IO_cache}->{ $server }
1578             or $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( "Unknown server '%s' (is not found in the metadata)", $server ) )
1579             ;
1580 10279 100       14683 unless( $server_data->{IO} ) {
1581 115         163 my $error;
1582 115 50       275 my $io_class = $self->{async} ? 'Kafka::IO::Async' : 'Kafka::IO';
1583             try {
1584             $server_data->{IO} = $io_class->new(
1585             host => $server_data->{host},
1586             port => $server_data->{port},
1587             timeout => $self->{Timeout},
1588             ip_version => $self->{ip_version},
1589 115     115   8667 );
1590 111         7509 $server_data->{error} = undef;
1591             } catch {
1592 4     4   5424 $error = $_;
1593 115         936 };
1594              
1595 115 100       1877 if( defined $error ) {
1596 4         114 $self->_on_io_error( $server_data, $error );
1597 4         14 return;
1598             }
1599 111 100 66     351 if ( defined $self->{sasl_username} && defined $self->{sasl_password} ) {
1600 1 50       5 unless ( $self->sasl_auth($server, Username => $self->{sasl_username}, Password => $self->{sasl_password}, Mechanizm => $self->{sasl_mechanizm}) ) {
1601 0         0 $self->_on_io_error( $server_data, 'Auth failed' );
1602 0         0 return;
1603             }
1604             }
1605             }
1606              
1607 10275         17616 return $server_data->{IO};
1608             }
1609              
1610             sub _server_data_IO {
1611 15532     15532   17485 my ( $self, $server ) = @_;
1612 15532 50       29735 my $server_data = $self->{_IO_cache}->{ $server }
1613             or $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( "Unknown server '%s' (is not found in the metadata)", $server ) )
1614             ;
1615             $self->_error( $ERROR_MISMATCH_ARGUMENT, format_message( "Server '%s' is not connected", $server ) )
1616             unless $server_data->{IO}
1617 15532 50       22888 ;
1618 15532         24589 return ( $server_data, $server_data->{IO} );
1619             }
1620              
1621             # Send encoded request ($encoded_request) to server ($server)
1622             sub _sendIO {
1623 10276     10276   14080 my ( $self, $server, $encoded_request ) = @_;
1624 10276         16218 my( $server_data, $io ) = $self->_server_data_IO( $server );
1625 10276         12489 my $sent;
1626             my $error;
1627             try {
1628 10276     10276   361114 $sent = $io->send( $encoded_request );
1629             } catch {
1630 11     11   16082 $error = $_;
1631 10276         48794 };
1632              
1633 10276 100       316460 if( defined $error ) {
1634 11         53 $self->_on_io_error( $server_data, $error );
1635             }
1636              
1637 10275         26593 return $sent;
1638             }
1639              
1640             # Receive response from a given server
1641             sub _receiveIO {
1642 5256     5256   8197 my ( $self, $server, $response_timeout ) = @_;
1643 5256         8257 my( $server_data, $io ) = $self->_server_data_IO( $server );
1644 5256         5850 my $response_ref;
1645             my $error;
1646             try {
1647 5256     5256   179215 $response_ref = $io->receive( 4, $response_timeout ); # response header must arrive within request-specific timeout if provided
1648 5251 50 33     25147 if ( $response_ref && length( $$response_ref ) == 4 ) {
1649             # received 4-byte response header with response size; try receiving the rest
1650 5251         15274 my $message_body_ref = $io->receive( unpack( 'l>', $$response_ref ) );
1651 5251         24284 $$response_ref .= $$message_body_ref;
1652             }
1653             } catch {
1654 5     5   6125 $error = $_;
1655 5256         25175 };
1656              
1657 5256 100       60282 if( defined $error ) {
1658 5         19 $self->_on_io_error( $server_data, $error );
1659             }
1660              
1661 5256         8471 return $response_ref;
1662             }
1663              
1664             # Close connection to $server
1665             sub _closeIO {
1666 24     24   46 my ( $self, $server, $keep_error ) = @_;
1667              
1668 24 50       65 if ( my $server_data = $self->{_IO_cache}->{ $server } ) {
1669 24 100       61 if ( my $io = $server_data->{IO} ) {
1670 13         46 $io->close;
1671 13 50       140 $server_data->{error} = undef unless $keep_error;
1672 13         59 $server_data->{IO} = undef;
1673             }
1674             }
1675              
1676 24         38 return;
1677             }
1678              
1679             # check validity of an argument to match host:port format
1680             sub _is_like_server {
1681 268     268   483 my ( $self, $server ) = @_;
1682              
1683 268 100 100     1597 unless (
      100        
1684             defined( $server )
1685             && defined( _STRING( $server ) )
1686             && !utf8::is_utf8( $server )
1687             ) {
1688 45         164 return;
1689             }
1690              
1691 223         468 my ( $host, $port ) = _split_host_port( $server );
1692 223 100 66     765 unless ( ( is_hostname( $host ) || is_ipv4( $host ) || is_ipv6( $host ) ) && $port ) {
      66        
1693 50         1792 return;
1694             }
1695              
1696 173         10722 return $server;
1697             }
1698              
1699             # Handler for errors
1700             sub _error {
1701 226     226   668 my $self = shift;
1702 226         714 Kafka::Exception::Connection->throw( throw_args( @_ ) );
1703             }
1704              
1705             1;
1706              
1707             __END__
1708              
1709             =head1 DIAGNOSTICS
1710              
1711             When an error is detected, an exception represented by an object of the L<Kafka::Exception::Connection|Kafka::Exception::Connection> class
1712             is thrown (see L<Kafka::Exceptions|Kafka::Exceptions>).
1713              
1714             L<code|Kafka::Exceptions/code> and a more descriptive L<message|Kafka::Exceptions/message> provide
1715             information about the exception. Consult the documentation of L<Kafka::Exceptions|Kafka::Exceptions>
1716             for the list of all available methods.
1717              
1718             Here is the list of possible error messages that C<Kafka::Connection> may produce:
1719              
1720             =over 3
1721              
1722             =item C<Invalid argument>
1723              
1724             An invalid argument was provided to the C<new> L<constructor|/CONSTRUCTOR> or to another L<method|/METHODS>.
1725              
1726             =item C<Cannot send>
1727              
1728             Request cannot be sent to Kafka.
1729              
1730             =item C<Cannot receive>
1731              
1732             Response cannot be received from Kafka.
1733              
1734             =item C<Cannot bind>
1735              
1736             A successful TCP connection cannot be established on given host and port.
1737              
1738             =item C<Cannot get metadata>
1739              
1740             Error detected during parsing of response from Kafka.
1741              
1742             =item C<Leader not found>
1743              
1744             Failed to locate leader of Kafka cluster.
1745              
1746             =item C<Mismatch CorrelationId>
1747              
1748             Mismatch of C<CorrelationId> of request and response.
1749              
1750             =item C<There are no known brokers>
1751              
1752             Failed to locate cluster broker.
1753              
1754             =item C<Cannot get metadata>
1755              
1756             Received metadata is incorrect or missing.
1757              
1758             =back
1759              
1760             =head2 Debug mode
1761              
1762             Debug output can be enabled by passing desired level via environment variable
1763             using one of the following ways:
1764              
1765             C<PERL_KAFKA_DEBUG=1> - debug is enabled for the whole L<Kafka|Kafka> package.
1766              
1767             C<PERL_KAFKA_DEBUG=Connection:1> - enable debug for C<Kafka::Connection> only.
1768              
1769             C<Kafka::Connection> prints to C<STDERR> information about non-fatal errors,
1770             re-connection attempts and such when debug level is set to 1 or higher.
1771              
1772             =head1 SEE ALSO
1773              
1774             The basic operation of the Kafka package modules:
1775              
1776             L<Kafka|Kafka> - constants and messages used by the Kafka package modules.
1777              
1778             L<Kafka::Connection|Kafka::Connection> - interface to connect to a Kafka cluster.
1779              
1780             L<Kafka::Producer|Kafka::Producer> - interface for producing client.
1781              
1782             L<Kafka::Consumer|Kafka::Consumer> - interface for consuming client.
1783              
1784             L<Kafka::Message|Kafka::Message> - interface to access Kafka message
1785             properties.
1786              
1787             L<Kafka::Int64|Kafka::Int64> - functions to work with 64 bit elements of the
1788             protocol on 32 bit systems.
1789              
1790             L<Kafka::Protocol|Kafka::Protocol> - functions to process messages in the
1791             Apache Kafka protocol.
1792              
1793             L<Kafka::IO|Kafka::IO> - low-level interface for communication with Kafka server.
1794              
1795             L<Kafka::Exceptions|Kafka::Exceptions> - module designated to handle Kafka exceptions.
1796              
1797             L<Kafka::Internals|Kafka::Internals> - internal constants and functions used
1798             by several package modules.
1799              
1800             A wealth of detail about Apache Kafka and the Kafka Protocol:
1801              
1802             Main page at L<http://kafka.apache.org/>
1803              
1804             Kafka Protocol at L<https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol>
1805              
1806             =head1 SOURCE CODE
1807              
1808             Kafka package is hosted on GitHub:
1809             L<https://github.com/TrackingSoft/Kafka>
1810              
1811             =head1 AUTHOR
1812              
1813             Sergey Gladkov
1814              
1815             Please use GitHub project link above to report problems or contact authors.
1816              
1817             =head1 CONTRIBUTORS
1818              
1819             Alexander Solovey
1820              
1821             Jeremy Jordan
1822              
1823             Sergiy Zuban
1824              
1825             Vlad Marchenko
1826              
1827             Damien Krotkine
1828              
1829             Greg Franklin
1830              
1831             =head1 COPYRIGHT AND LICENSE
1832              
1833             Copyright (C) 2012-2017 by TrackingSoft LLC.
1834              
1835             This package is free software; you can redistribute it and/or modify it under
1836             the same terms as Perl itself. See I<perlartistic> at
1837             L<http://dev.perl.org/licenses/artistic.html>.
1838              
1839             This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
1840             without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
1841             PARTICULAR PURPOSE.
1842              
1843             =cut
1844