Branch Coverage

lib/Kafka/Connection.pm
Criterion Covered Total %
branch 157 288 54.5


line true false branch
467 1001 0 if (exists $self->{$attr}) { }
476 8 226 unless defined $self->{'host'} and $self->{'host'} eq '' || defined &_STRING($self->{'host'})
478 1 225 if utf8::is_utf8($self->{'host'})
480 15 210 unless &_POSINT($self->{'port'})
482 0 200 unless not defined $self->{'Timeout'} or defined &_NUMBER($self->{'Timeout'}) and int 1000 * $self->{'Timeout'} >= 1 and int $self->{'Timeout'} * 1000 <= $Kafka::Connection::MAX_INT32
484 16 184 unless &_ARRAY0($self->{'broker_list'})
486 15 169 unless &_POSINT($self->{'SEND_MAX_ATTEMPTS'})
488 15 154 unless &_POSINT($self->{'RETRY_BACKOFF'})
490 0 154 unless defined &_NONNEGINT($self->{'MaxLoggedErrors'})
493 2 2 unless not defined $ip_version or defined &_NONNEGINT($ip_version) and $ip_version == $Kafka::Connection::IP_V4 || $ip_version == $Kafka::Connection::IP_V6
531 151 1 $self->{'host'} ? :
532 19 151 unless $self->_is_like_server($server)
544 1 132 unless keys %$IO_cache
575 0 10124 unless defined $server_metadata
580 10028 96 if defined $server_metadata->{'_api_versions'}
587 96 0 if $self->{'dont_load_supported_api_versions'}
602 0 0 if (defined $error)
603 0 0 if (&blessed($error) and $error->isa('Kafka::Exception')) { }
604 0 0 if ($error->code == $Kafka::Connection::ERROR_MISMATCH_ARGUMENT)
625 0 0 if $version > $implemented_max_version
627 0 0 if $version < $kafka_min_version
647 0 0 if $self->debug_level
658 0 0 unless $self->_connectIO($broker)
660 0 0 unless my $sent = $self->_sendIO($broker, $encoded_request)
665 0 0 unless ($encoded_response_ref)
671 0 0 if $self->debug_level
675 0 0 unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId
682 0 0 unless $ErrorCode == $Kafka::Connection::ERROR_NO_ERROR
700 0 1 unless defined $username and defined &_STRING($username) and not utf8::is_utf8($username)
702 0 1 unless defined $password and defined &_STRING($password) and not utf8::is_utf8($password)
714 0 1 unless $self->_sendIO($broker, $encoded_request) and my $encoded_response_ref = $self->_receiveIO($broker)
717 1 0 if ($mechanizm =~ /SCRAM-(.*)$/) { }
0 0 elsif ($mechanizm eq 'PLAIN') { }
726 0 1 unless $self->_sendIO($broker, $client_first)
729 0 1 unless $encoded_response_ref
733 0 1 unless $self->_sendIO($broker, $client_final)
735 0 1 unless $encoded_response_ref
747 0 0 unless $$encoded_sasl_resp_len_ref
750 0 0 unless $sasl_resp_len
789 0 1 unless not defined $topic or ($topic eq '' or defined &_STRING($topic))
791 0 2 if utf8::is_utf8($topic)
794 0 0 $topic ? :
0 2 unless $self->_update_metadata($topic)
799 1 1 if (defined $topic) { }
818 38 5 unless $self->_is_like_server($server)
830 19 3 unless $self->_is_like_server($server)
833 0 3 unless $self->get_known_servers
838 1 2 unless exists $io_cache->{$server}
840 2 0 if (my $io = $self->_connectIO($server)) { }
851 19 14 unless $self->_is_like_server($server)
856 8 6 unless (exists $io_cache->{$server} and $io = $io_cache->{$server}{'IO'})
926 52 10046 if (not %{$self->{'_metadata'};} or not $self->{'AutoCreateTopicsEnable'} and defined $topic_name and not exists $self->{'_metadata'}{$topic_name})
930 2 49 unless $self->_update_metadata($topic_name)
936 0 10095 unless exists $request->{'CorrelationId'}
938 0 10095 if $self->debug_level
956 10146 0 if ($host_to_send_to eq 'leader') { }
0 0 elsif ($host_to_send_to eq 'group_coordinator') { }
959 0 10146 unless defined $leader
962 0 10146 unless ($server)
969 0 0 if (not %{$self->{'_group_coordinators'};} and defined $group_id)
974 0 0 unless ($server)
984 10142 4 if ($self->_connectIO($server)) { }
992 10124 18 unless (defined $request->{'ApiVersion'})
995 0 10124 unless ($self->_is_IO_connected($server))
1003 2 10140 unless ($self->_sendIO($server, $encoded_request))
1005 2 0 $io_error ? :
1011 4 0 $io_error ? :
1014 6 10140 if ($ErrorCode != $Kafka::Connection::ERROR_NO_ERROR)
1017 0 6 if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and not $ErrorCode == $Kafka::Connection::ERROR_CANNOT_BIND || $ErrorCode == $Kafka::Connection::ERROR_NO_CONNECTION)
1026 5009 5131 if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and $request->{'RequiredAcks'} == $Kafka::Connection::NOT_SEND_ANY_RESPONSE) { }
1045 2 5129 unless ($encoded_response_ref)
1046 1 1 if ($api_key == $Kafka::Connection::APIKEY_PRODUCE) { }
1061 5128 1 if (length $$encoded_response_ref > 4) { }
1065 0 5128 if $self->debug_level
1077 0 10137 unless $response->{'CorrelationId'} == $request->{'CorrelationId'}
1079 21 10116 $api_key == $Kafka::Connection::APIKEY_OFFSET ? :
1083 10053 84 if $ErrorCode == $Kafka::Connection::ERROR_NO_ERROR
1085 0 84 if ($api_key == $Kafka::Connection::APIKEY_PRODUCE and $ErrorCode == $Kafka::Connection::ERROR_REQUEST_TIMED_OUT)
1093 64 20 if (exists $RETRY_ON_ERRORS{$ErrorCode})
1105 0 0 $ErrorCode == $Kafka::Connection::ERROR_NO_ERROR ? :
0 71 if $self->debug_level
1111 3 68 unless $self->_update_metadata($topic_name)
1115 0 68 if ($host_to_send_to eq 'group_coordinator')
1121 17 0 if ($ErrorCode) { }
1122 16 1 $partition_data ? :
1151 0 3 unless defined $topic and $topic eq '' || defined &_STRING($topic)
1153 0 3 if utf8::is_utf8($topic)
1155 0 3 unless defined $partition and &isint($partition) and $partition >= 0
1158 0 3 unless (%{$self->{'_metadata'};})
1159 0 0 unless $self->_update_metadata($topic)
1175 0 3 unless ($self->is_server_known($server))
1213 3 1 if (my $error = $self->_io_error($server))
1260 0 374 if $host and $host =~ /^\[(.+)\]$/
1272 0 71 unless my $max_logged_errors = $self->{'MaxLoggedErrors'}
1275 0 71 if scalar @{$self->{'_nonfatal_errors'};} == $max_logged_errors
1278 0 0 defined $error_code && exists $Kafka::Connection::ERROR{$error_code} ? :
1285 0 71 if $self->debug_level
1302 119 132 if (defined $NodeId and $NodeId == $node_id)
1320 213 54 if (defined $server_data->{'NodeId'}) { }
1321 135 78 if ($server_data->{'IO'}) { }
1345 0 0 if $self->debug_level
1356 0 0 if $self->_connectIO($broker) and $self->_sendIO($broker, $encoded_request) and $encoded_response_ref = $self->_receiveIO($broker)
1361 0 0 unless ($encoded_response_ref)
1367 0 0 if $self->debug_level
1371 0 0 unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId
1375 0 0 if $decoded_response->{'ErrorCode'}
1402 0 125 if $self->debug_level
1413 119 11 if $self->_connectIO($broker) and $self->_sendIO($broker, $encoded_request) and $encoded_response_ref = $self->_receiveIO($broker)
1418 5 119 unless ($encoded_response_ref)
1424 0 119 if $self->debug_level
1428 0 119 unless defined $decoded_response->{'CorrelationId'} and $decoded_response->{'CorrelationId'} == $CorrelationId
1432 0 119 unless (&_ARRAY($decoded_response->{'Broker'}))
1433 0 0 if ($self->{'AutoCreateTopicsEnable'}) { }
1470 0 119 if ($ErrorCode = $topic_metadata->{'ErrorCode'}) != $Kafka::Connection::ERROR_NO_ERROR
1475 0 119 if ($ErrorCode = $partition_metadata->{'ErrorCode'}) != $Kafka::Connection::ERROR_NO_ERROR and $ErrorCode != $Kafka::Connection::ERROR_REPLICA_NOT_AVAILABLE
1487 0 119 if ($ErrorCode != $Kafka::Connection::ERROR_NO_ERROR)
1488 0 0 if (exists $RETRY_ON_ERRORS{$ErrorCode}) { }
1492 0 0 defined $partition ? :
1507 0 0 if $is_recursive_call
1516 0 0 if $self->debug_level
1519 0 0 if $self->_update_metadata($topic, 1)
1522 0 0 defined $partition ? :
1531 0 659 if is_ipv6($host)
1540 16 4 if ($server_data->{'IO'})
1545 20 0 if (&blessed($error) and $error->isa('Kafka::Exception')) { }
1546 1 19 if ($error->code == $Kafka::Connection::ERROR_MISMATCH_ARGUMENT or $error->code == $Kafka::Connection::ERROR_INCOMPATIBLE_HOST_IP_VERSION)
1561 19 0 if (my $server_data = $self->{'_IO_cache'}{$server})
1569 0 10124 unless my $server_data = $self->{'_IO_cache'}{$server}
1577 0 10279 unless my $server_data = $self->{'_IO_cache'}{$server}
1580 115 10164 unless ($server_data->{'IO'})
1582 0 115 $self->{'async'} ? :
1595 4 111 if (defined $error)
1599 1 110 if (defined $self->{'sasl_username'} and defined $self->{'sasl_password'})
1600 0 1 unless ($self->sasl_auth($server, 'Username', $self->{'sasl_username'}, 'Password', $self->{'sasl_password'}, 'Mechanizm', $self->{'sasl_mechanizm'}))
1612 0 15532 unless my $server_data = $self->{'_IO_cache'}{$server}
1617 0 15532 unless $server_data->{'IO'}
1633 11 10265 if (defined $error)
1648 5251 0 if ($response_ref and length $$response_ref == 4)
1657 5 5251 if (defined $error)
1668 24 0 if (my $server_data = $self->{'_IO_cache'}{$server})
1669 13 11 if (my $io = $server_data->{'IO'})
1671 13 0 unless $keep_error
1683 40 223 unless (defined $server and defined &_STRING($server) and not utf8::is_utf8($server))
1692 50 173 unless (is_hostname($host) || is_ipv4($host) || is_ipv6($host) and $port)