line |
true |
false |
branch |
467
|
1001 |
0 |
if (exists $self->{$attr}) { }
|
476
|
8 |
226 |
unless defined $self->{'host'} and $self->{'host'} eq '' || defined &_STRING($self->{'host'})
|
478
|
1 |
225 |
if utf8::is_utf8($self->{'host'})
|
480
|
15 |
210 |
unless &_POSINT($self->{'port'})
|
482
|
0 |
200 |
unless not defined $self->{'Timeout'} or defined &_NUMBER($self->{'Timeout'}) and int 1000 * $self->{'Timeout'} >= 1 and int $self->{'Timeout'} * 1000 <= $Kafka::Connection::MAX_INT32
|
484
|
16 |
184 |
unless &_ARRAY0($self->{'broker_list'})
|
486
|
15 |
169 |
unless &_POSINT($self->{'SEND_MAX_ATTEMPTS'})
|
488
|
15 |
154 |
unless &_POSINT($self->{'RETRY_BACKOFF'})
|
490
|
0 |
154 |
unless defined &_NONNEGINT($self->{'MaxLoggedErrors'})
|
493
|
2 |
2 |
unless not defined $ip_version or defined &_NONNEGINT($ip_version) and $ip_version == $Kafka::Connection::IP_V4 || $ip_version == $Kafka::Connection::IP_V6
|
531
|
151 |
1 |
$self->{'host'} ? :
|
532
|
19 |
151 |
unless $self->_is_like_server($server)
|
544
|
1 |
132 |
unless keys %$IO_cache
|
575
|
0 |
10124 |
unless defined $server_metadata
|
580
|
10028 |
96 |
if defined $server_metadata->{'_api_versions'}
|
587
|
96 |
0 |
if $self->{'dont_load_supported_api_versions'}
|
602
|
0 |
0 |
if (defined $error)
|
603
|
0 |
0 |
if (&blessed($error) and $error->isa('Kafka::Exception')) { }
|
604
|
0 |
0 |
if ($error->code == $Kafka::Connection::ERROR_MISMATCH_ARGUMENT)
|
625
|
0 |
0 |
if $version > $implemented_max_version
|
627
|
0 |
0 |
if $version < $kafka_min_version
|
647
|
0 |
0 |
if $self->debug_level
|
658
|
0 |
0 |
unless $self->_connectIO($broker)
|
660
|
0 |
0 |
unless my $sent = $self->_sendIO($broker, $encoded_request)
|
665
|
0 |
0 |
unless ($encoded_response_ref)
|
671
|
0 |
0 |
if $self->debug_level
|
675
|
0 |
0 |
unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId
|
682
|
0 |
0 |
unless $ErrorCode == $Kafka::Connection::ERROR_NO_ERROR
|
700
|
0 |
1 |
unless defined $username and defined &_STRING($username) and not utf8::is_utf8($username)
|
702
|
0 |
1 |
unless defined $password and defined &_STRING($password) and not utf8::is_utf8($password)
|
714
|
0 |
1 |
unless $self->_sendIO($broker, $encoded_request) and my $encoded_response_ref = $self->_receiveIO($broker)
|
717
|
1 |
0 |
if ($mechanizm =~ /SCRAM-(.*)$/) { }
|
|
0 |
0 |
elsif ($mechanizm eq 'PLAIN') { }
|
726
|
0 |
1 |
unless $self->_sendIO($broker, $client_first)
|
729
|
0 |
1 |
unless $encoded_response_ref
|
733
|
0 |
1 |
unless $self->_sendIO($broker, $client_final)
|
735
|
0 |
1 |
unless $encoded_response_ref
|
747
|
0 |
0 |
unless $$encoded_sasl_resp_len_ref
|
750
|
0 |
0 |
unless $sasl_resp_len
|
789
|
0 |
1 |
unless not defined $topic or ($topic eq '' or defined &_STRING($topic))
|
791
|
0 |
2 |
if utf8::is_utf8($topic)
|
794
|
0 |
0 |
$topic ? :
|
|
0 |
2 |
unless $self->_update_metadata($topic)
|
799
|
1 |
1 |
if (defined $topic) { }
|
818
|
38 |
5 |
unless $self->_is_like_server($server)
|
830
|
19 |
3 |
unless $self->_is_like_server($server)
|
833
|
0 |
3 |
unless $self->get_known_servers
|
838
|
1 |
2 |
unless exists $io_cache->{$server}
|
840
|
2 |
0 |
if (my $io = $self->_connectIO($server)) { }
|
851
|
19 |
14 |
unless $self->_is_like_server($server)
|
856
|
8 |
6 |
unless (exists $io_cache->{$server} and $io = $io_cache->{$server}{'IO'})
|
926
|
52 |
10046 |
if (not %{$self->{'_metadata'};} or not $self->{'AutoCreateTopicsEnable'} and defined $topic_name and not exists $self->{'_metadata'}{$topic_name})
|
930
|
2 |
49 |
unless $self->_update_metadata($topic_name)
|
936
|
0 |
10095 |
unless exists $request->{'CorrelationId'}
|
938
|
0 |
10095 |
if $self->debug_level
|
956
|
10146 |
0 |
if ($host_to_send_to eq 'leader') { }
|
|
0 |
0 |
elsif ($host_to_send_to eq 'group_coordinator') { }
|
959
|
0 |
10146 |
unless defined $leader
|
962
|
0 |
10146 |
unless ($server)
|
969
|
0 |
0 |
if (not %{$self->{'_group_coordinators'};} and defined $group_id)
|
974
|
0 |
0 |
unless ($server)
|
984
|
10142 |
4 |
if ($self->_connectIO($server)) { }
|
992
|
10124 |
18 |
unless (defined $request->{'ApiVersion'})
|
995
|
0 |
10124 |
unless ($self->_is_IO_connected($server))
|
1003
|
2 |
10140 |
unless ($self->_sendIO($server, $encoded_request))
|
1005
|
2 |
0 |
$io_error ? :
|
1011
|
4 |
0 |
$io_error ? :
|
1014
|
6 |
10140 |
if ($ErrorCode != $Kafka::Connection::ERROR_NO_ERROR)
|
1017
|
0 |
6 |
if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and not $ErrorCode == $Kafka::Connection::ERROR_CANNOT_BIND || $ErrorCode == $Kafka::Connection::ERROR_NO_CONNECTION)
|
1026
|
5009 |
5131 |
if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and $request->{'RequiredAcks'} == $Kafka::Connection::NOT_SEND_ANY_RESPONSE) { }
|
1045
|
2 |
5129 |
unless ($encoded_response_ref)
|
1046
|
1 |
1 |
if ($api_key == $Kafka::Connection::APIKEY_PRODUCE) { }
|
1061
|
5128 |
1 |
if (length $$encoded_response_ref > 4) { }
|
1065
|
0 |
5128 |
if $self->debug_level
|
1077
|
0 |
10137 |
unless $response->{'CorrelationId'} == $request->{'CorrelationId'}
|
1079
|
21 |
10116 |
$api_key == $Kafka::Connection::APIKEY_OFFSET ? :
|
1083
|
10053 |
84 |
if $ErrorCode == $Kafka::Connection::ERROR_NO_ERROR
|
1085
|
0 |
84 |
if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and $ErrorCode == $Kafka::Connection::ERROR_REQUEST_TIMED_OUT)
|
1093
|
64 |
20 |
if (exists $RETRY_ON_ERRORS{$ErrorCode})
|
1105
|
0 |
0 |
$ErrorCode == $Kafka::Connection::ERROR_NO_ERROR ? :
|
|
0 |
71 |
if $self->debug_level
|
1111
|
3 |
68 |
unless $self->_update_metadata($topic_name)
|
1115
|
0 |
68 |
if ($host_to_send_to eq 'group_coordinator')
|
1121
|
17 |
0 |
if ($ErrorCode) { }
|
1122
|
16 |
1 |
$partition_data ? :
|
1151
|
0 |
3 |
unless defined $topic and $topic eq '' || defined &_STRING($topic)
|
1153
|
0 |
3 |
if utf8::is_utf8($topic)
|
1155
|
0 |
3 |
unless defined $partition and &isint($partition) and $partition >= 0
|
1158
|
0 |
3 |
unless (%{$self->{'_metadata'};})
|
1159
|
0 |
0 |
unless $self->_update_metadata($topic)
|
1175
|
0 |
3 |
unless ($self->is_server_known($server))
|
1213
|
3 |
1 |
if (my $error = $self->_io_error($server))
|
1260
|
0 |
374 |
if $host and $host =~ /^\[(.+)\]$/
|
1272
|
0 |
71 |
unless my $max_logged_errors = $self->{'MaxLoggedErrors'}
|
1275
|
0 |
71 |
if scalar @{$self->{'_nonfatal_errors'};} == $max_logged_errors
|
1278
|
0 |
0 |
defined $error_code && exists $Kafka::Connection::ERROR{$error_code} ? :
|
1285
|
0 |
71 |
if $self->debug_level
|
1302
|
119 |
132 |
if (defined $NodeId and $NodeId == $node_id)
|
1320
|
213 |
54 |
if (defined $server_data->{'NodeId'}) { }
|
1321
|
135 |
78 |
if ($server_data->{'IO'}) { }
|
1345
|
0 |
0 |
if $self->debug_level
|
1356
|
0 |
0 |
if $self->_connectIO($broker) and $self->_sendIO($broker, $encoded_request) and $encoded_response_ref = $self->_receiveIO($broker)
|
1361
|
0 |
0 |
unless ($encoded_response_ref)
|
1367
|
0 |
0 |
if $self->debug_level
|
1371
|
0 |
0 |
unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId
|
1375
|
0 |
0 |
if $decoded_response->{'ErrorCode'}
|
1402
|
0 |
125 |
if $self->debug_level
|
1413
|
119 |
11 |
if $self->_connectIO($broker) and $self->_sendIO($broker, $encoded_request) and $encoded_response_ref = $self->_receiveIO($broker)
|
1418
|
5 |
119 |
unless ($encoded_response_ref)
|
1424
|
0 |
119 |
if $self->debug_level
|
1428
|
0 |
119 |
unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId
|
1432
|
0 |
119 |
unless (&_ARRAY($decoded_response->{'Broker'}))
|
1433
|
0 |
0 |
if ($self->{'AutoCreateTopicsEnable'}) { }
|
1470
|
0 |
119 |
if ($ErrorCode = $topic_metadata->{'ErrorCode'}) != $Kafka::Connection::ERROR_NO_ERROR
|
1475
|
0 |
119 |
if ($ErrorCode = $partition_metadata->{'ErrorCode'}) != $Kafka::Connection::ERROR_NO_ERROR and $ErrorCode != $Kafka::Connection::ERROR_REPLICA_NOT_AVAILABLE
|
1487
|
0 |
119 |
if ($ErrorCode != $Kafka::Connection::ERROR_NO_ERROR)
|
1488
|
0 |
0 |
if (exists $RETRY_ON_ERRORS{$ErrorCode}) { }
|
1492
|
0 |
0 |
defined $partition ? :
|
1507
|
0 |
0 |
if $is_recursive_call
|
1516
|
0 |
0 |
if $self->debug_level
|
1519
|
0 |
0 |
if $self->_update_metadata($topic, 1)
|
1522
|
0 |
0 |
defined $partition ? :
|
1531
|
0 |
659 |
if is_ipv6($host)
|
1540
|
16 |
4 |
if ($server_data->{'IO'})
|
1545
|
20 |
0 |
if (&blessed($error) and $error->isa('Kafka::Exception')) { }
|
1546
|
1 |
19 |
if ($error->code == $Kafka::Connection::ERROR_MISMATCH_ARGUMENT or $error->code == $Kafka::Connection::ERROR_INCOMPATIBLE_HOST_IP_VERSION)
|
1561
|
19 |
0 |
if (my $server_data = $self->{'_IO_cache'}{$server})
|
1569
|
0 |
10124 |
unless my $server_data = $self->{'_IO_cache'}{$server}
|
1577
|
0 |
10279 |
unless my $server_data = $self->{'_IO_cache'}{$server}
|
1580
|
115 |
10164 |
unless ($server_data->{'IO'})
|
1582
|
0 |
115 |
$self->{'async'} ? :
|
1595
|
4 |
111 |
if (defined $error)
|
1599
|
1 |
110 |
if (defined $self->{'sasl_username'} and defined $self->{'sasl_password'})
|
1600
|
0 |
1 |
unless ($self->sasl_auth($server, 'Username', $self->{'sasl_username'}, 'Password', $self->{'sasl_password'}, 'Mechanizm', $self->{'sasl_mechanizm'}))
|
1612
|
0 |
15532 |
unless my $server_data = $self->{'_IO_cache'}{$server}
|
1617
|
0 |
15532 |
unless $server_data->{'IO'}
|
1633
|
11 |
10265 |
if (defined $error)
|
1648
|
5251 |
0 |
if ($response_ref and length $$response_ref == 4)
|
1657
|
5 |
5251 |
if (defined $error)
|
1668
|
24 |
0 |
if (my $server_data = $self->{'_IO_cache'}{$server})
|
1669
|
13 |
11 |
if (my $io = $server_data->{'IO'})
|
1671
|
13 |
0 |
unless $keep_error
|
1683
|
40 |
223 |
unless (defined $server and defined &_STRING($server) and not utf8::is_utf8($server))
|
1692
|
50 |
173 |
unless (is_hostname($host) || is_ipv4($host) || is_ipv6($host) and $port)
|