line |
true |
false |
branch |
448
|
8 |
224 |
unless defined $self->{'host'} and $self->{'host'} eq '' || defined &_STRING($self->{'host'}) and not utf8::is_utf8($self->{'host'}) |
450
|
15 |
209 |
unless &_POSINT($self->{'port'}) |
452
|
0 |
199 |
unless not defined $self->{'timeout'} or defined &_NUMBER($self->{'timeout'}) and int 1000 * $self->{'timeout'} >= 1 and int $self->{'timeout'} * 1000 <= $Kafka::Connection::MAX_INT32 |
454
|
16 |
183 |
unless &_ARRAY0($self->{'broker_list'}) |
456
|
15 |
168 |
unless &_POSINT($self->{'SEND_MAX_ATTEMPTS'}) |
458
|
15 |
153 |
unless &_POSINT($self->{'RETRY_BACKOFF'}) |
460
|
0 |
153 |
unless defined &_NONNEGINT($self->{'MaxLoggedErrors'}) |
463
|
2 |
2 |
unless not defined $ip_version or defined &_NONNEGINT($ip_version) and $ip_version == $Kafka::Connection::IP_V4 || $ip_version == $Kafka::Connection::IP_V6 |
501
|
150 |
1 |
$self->{'host'} ? : |
502
|
19 |
150 |
unless $self->_is_like_server($server) |
514
|
1 |
131 |
unless keys %$IO_cache |
545
|
0 |
10124 |
unless defined $server_metadata |
550
|
10028 |
96 |
if defined $server_metadata->{'_api_versions'} |
557
|
96 |
0 |
if $self->{'dont_load_supported_api_versions'} |
572
|
0 |
0 |
if (defined $error) |
573
|
0 |
0 |
if (&blessed($error) and $error->isa('Kafka::Exception')) { } |
574
|
0 |
0 |
if ($error->code == $Kafka::Connection::ERROR_MISMATCH_ARGUMENT) |
595
|
0 |
0 |
if $version > $implemented_max_version |
597
|
0 |
0 |
if $version < $kafka_min_version |
617
|
0 |
0 |
if $self->debug_level |
628
|
0 |
0 |
unless $self->_connectIO($broker) |
630
|
0 |
0 |
unless my $sent = $self->_sendIO($broker, $encoded_request) |
635
|
0 |
0 |
unless ($encoded_response_ref) |
641
|
0 |
0 |
if $self->debug_level |
645
|
0 |
0 |
unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId |
652
|
0 |
0 |
unless $ErrorCode == $Kafka::Connection::ERROR_NO_ERROR |
690
|
0 |
1 |
unless not defined $topic or $topic eq '' || defined &_STRING($topic) and not utf8::is_utf8($topic) |
693
|
0 |
1 |
unless $self->_update_metadata($topic) |
698
|
1 |
0 |
if (defined $topic) { } |
717
|
38 |
5 |
unless $self->_is_like_server($server) |
729
|
19 |
3 |
unless $self->_is_like_server($server) |
732
|
0 |
3 |
unless $self->get_known_servers |
737
|
1 |
2 |
unless exists $io_cache->{$server} |
739
|
2 |
0 |
if (my $io = $self->_connectIO($server)) { } |
750
|
19 |
14 |
unless $self->_is_like_server($server) |
755
|
8 |
6 |
unless (exists $io_cache->{$server} and $io = $io_cache->{$server}{'IO'}) |
825
|
52 |
10046 |
if (not %{$self->{'_metadata'};} or not $self->{'AutoCreateTopicsEnable'} and defined $topic_name and not exists $self->{'_metadata'}{$topic_name}) |
829
|
2 |
49 |
unless $self->_update_metadata($topic_name) |
835
|
0 |
10095 |
unless exists $request->{'CorrelationId'} |
837
|
0 |
10095 |
if $self->debug_level |
855
|
10146 |
0 |
if ($host_to_send_to eq 'leader') { } |
|
0 |
0 |
elsif ($host_to_send_to eq 'group_coordinator') { } |
858
|
0 |
10146 |
unless defined $leader |
861
|
0 |
10146 |
unless ($server) |
868
|
0 |
0 |
if (not %{$self->{'_group_coordinators'};} and defined $group_id) |
873
|
0 |
0 |
unless ($server) |
883
|
10142 |
4 |
if ($self->_connectIO($server)) { } |
891
|
10124 |
18 |
unless (defined $request->{'ApiVersion'}) |
894
|
0 |
10124 |
unless ($self->_is_IO_connected($server)) |
902
|
2 |
10140 |
unless ($self->_sendIO($server, $encoded_request)) |
904
|
2 |
0 |
$io_error ? : |
910
|
4 |
0 |
$io_error ? : |
913
|
6 |
10140 |
if ($ErrorCode != $Kafka::Connection::ERROR_NO_ERROR) |
916
|
0 |
6 |
if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and not $ErrorCode == $Kafka::Connection::ERROR_CANNOT_BIND || $ErrorCode == $Kafka::Connection::ERROR_NO_CONNECTION) |
925
|
5009 |
5131 |
if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and $request->{'RequiredAcks'} == $Kafka::Connection::NOT_SEND_ANY_RESPONSE) { } |
944
|
2 |
5129 |
unless ($encoded_response_ref) |
945
|
1 |
1 |
if ($api_key == $Kafka::Connection::APIKEY_PRODUCE) { } |
960
|
5128 |
1 |
if (length $$encoded_response_ref > 4) { } |
964
|
0 |
5128 |
if $self->debug_level |
976
|
0 |
10137 |
unless $response->{'CorrelationId'} == $request->{'CorrelationId'} |
978
|
21 |
10116 |
$api_key == $Kafka::Connection::APIKEY_OFFSET ? : |
982
|
10053 |
84 |
if $ErrorCode == $Kafka::Connection::ERROR_NO_ERROR |
984
|
0 |
84 |
if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and $ErrorCode == $Kafka::Connection::ERROR_REQUEST_TIMED_OUT) |
992
|
64 |
20 |
if (exists $RETRY_ON_ERRORS{$ErrorCode}) |
1004
|
0 |
0 |
$ErrorCode == $Kafka::Connection::ERROR_NO_ERROR ? : |
|
0 |
71 |
if $self->debug_level |
1010
|
3 |
68 |
unless $self->_update_metadata($topic_name) |
1014
|
0 |
68 |
if ($host_to_send_to eq 'group_coordinator') |
1020
|
17 |
0 |
if ($ErrorCode) { } |
1021
|
16 |
1 |
$partition_data ? : |
1050
|
0 |
3 |
unless defined $topic and $topic eq '' || defined &_STRING($topic) and not utf8::is_utf8($topic) |
1052
|
0 |
3 |
unless defined $partition and &isint($partition) and $partition >= 0 |
1055
|
0 |
3 |
unless (%{$self->{'_metadata'};}) |
1056
|
0 |
0 |
unless $self->_update_metadata($topic) |
1072
|
0 |
3 |
unless ($self->is_server_known($server)) |
1110
|
3 |
1 |
if (my $error = $self->_io_error($server)) |
1157
|
0 |
372 |
if $host and $host =~ /^\[(.+)\]$/ |
1169
|
0 |
71 |
unless my $max_logged_errors = $self->{'MaxLoggedErrors'} |
1172
|
0 |
71 |
if scalar @{$self->{'_nonfatal_errors'};} == $max_logged_errors |
1175
|
0 |
0 |
defined $error_code && exists $Kafka::Connection::ERROR{$error_code} ? : |
1182
|
0 |
71 |
if $self->debug_level |
1199
|
118 |
109 |
if (defined $NodeId and $NodeId == $node_id) |
1217
|
213 |
53 |
if (defined $server_data->{'NodeId'}) { } |
1218
|
135 |
78 |
if ($server_data->{'IO'}) { } |
1242
|
0 |
0 |
if $self->debug_level |
1253
|
0 |
0 |
if $self->_connectIO($broker) and $self->_sendIO($broker, $encoded_request) and $encoded_response_ref = $self->_receiveIO($broker) |
1258
|
0 |
0 |
unless ($encoded_response_ref) |
1264
|
0 |
0 |
if $self->debug_level |
1268
|
0 |
0 |
unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId |
1272
|
0 |
0 |
if $decoded_response->{'ErrorCode'} |
1299
|
0 |
124 |
if $self->debug_level |
1310
|
118 |
11 |
if $self->_connectIO($broker) and $self->_sendIO($broker, $encoded_request) and $encoded_response_ref = $self->_receiveIO($broker) |
1315
|
5 |
118 |
unless ($encoded_response_ref) |
1321
|
0 |
118 |
if $self->debug_level |
1325
|
0 |
118 |
unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId |
1329
|
0 |
118 |
unless (&_ARRAY($decoded_response->{'Broker'})) |
1330
|
0 |
0 |
if ($self->{'AutoCreateTopicsEnable'}) { } |
1367
|
0 |
118 |
if ($ErrorCode = $topic_metadata->{'ErrorCode'}) != $Kafka::Connection::ERROR_NO_ERROR |
1372
|
0 |
118 |
if ($ErrorCode = $partition_metadata->{'ErrorCode'}) != $Kafka::Connection::ERROR_NO_ERROR and $ErrorCode != $Kafka::Connection::ERROR_REPLICA_NOT_AVAILABLE |
1384
|
0 |
118 |
if ($ErrorCode != $Kafka::Connection::ERROR_NO_ERROR) |
1385
|
0 |
0 |
if (exists $RETRY_ON_ERRORS{$ErrorCode}) { } |
1389
|
0 |
0 |
defined $partition ? : |
1404
|
0 |
0 |
if $is_recursive_call |
1413
|
0 |
0 |
if $self->debug_level |
1416
|
0 |
0 |
if $self->_update_metadata($topic, 1) |
1419
|
0 |
0 |
defined $partition ? : |
1428
|
0 |
654 |
if is_ipv6($host) |
1437
|
16 |
4 |
if ($server_data->{'IO'}) |
1442
|
20 |
0 |
if (&blessed($error) and $error->isa('Kafka::Exception')) { } |
1443
|
1 |
19 |
if ($error->code == $Kafka::Connection::ERROR_MISMATCH_ARGUMENT or $error->code == $Kafka::Connection::ERROR_INCOMPATIBLE_HOST_IP_VERSION) |
1458
|
19 |
0 |
if (my $server_data = $self->{'_IO_cache'}{$server}) |
1466
|
0 |
10124 |
unless my $server_data = $self->{'_IO_cache'}{$server} |
1474
|
0 |
10278 |
unless my $server_data = $self->{'_IO_cache'}{$server} |
1477
|
114 |
10164 |
unless ($server_data->{'IO'}) |
1491
|
4 |
110 |
if (defined $error) |
1502
|
0 |
15524 |
unless my $server_data = $self->{'_IO_cache'}{$server} |
1507
|
0 |
15524 |
unless $server_data->{'IO'} |
1523
|
11 |
10261 |
if (defined $error) |
1538
|
5247 |
0 |
if ($response_ref and length $$response_ref == 4) |
1547
|
5 |
5247 |
if (defined $error) |
1558
|
24 |
0 |
if (my $server_data = $self->{'_IO_cache'}{$server}) |
1559
|
13 |
11 |
if (my $io = $server_data->{'IO'}) |
1561
|
13 |
0 |
unless $keep_error |
1573
|
40 |
222 |
unless (defined $server and defined &_STRING($server) and not utf8::is_utf8($server)) |
1582
|
50 |
172 |
unless (is_hostname($host) || is_ipv4($host) || is_ipv6($host) and $port) |