Branch Coverage

blib/lib/EV/Kafka.pm
Criterion Covered Total %
branch 38 518 7.3


line true false branch
75 1 11 if ($cfg->{'sasl'} and not $cfg->{'tls'})
77 1 0 if ($mech eq "PLAIN" or $mech =~ /^SCRAM-/)
95 0 0 if ($c->connected)
97 0 1 $conn && $conn->connected ? :
103 0 0 if $cfg->{'conns'}{$node_id}
106 0 0 unless $info
114 0 0 if $weak
125 0 0 if ($cfg->{'tls'})
128 0 0 if ($cfg->{'sasl'})
133 0 0 if $weak_cfg and $weak_cfg->{'on_error'}
144 0 0 if ($idx >= @bs)
146 0 0 if $cfg->{'on_error'}
155 0 0 if $try
165 0 0 if $cfg->{'on_error'}
190 0 0 if $t->{'error_code'}
200 0 1 if $cfg->{'meta_pending'}
204 1 0 unless ($conn)
209 0 0 if ($err)
210 0 0 if $cfg->{'on_error'}
218 0 0 if ($cfg->{'bootstrap_conn'} and $meta->{'brokers'} and @{$meta->{'brokers'};})
223 0 0 if ($cfg->{'idempotent'} || $cfg->{'transactional_id'} and $cfg->{'producer_id'} < 0) { }
226 0 0 if $cb
227 0 0 if $cfg->{'on_connect'}
232 0 0 if $cb
233 0 0 if $cfg->{'on_connect'}
245 0 0 if $cfg->{'_meta_timer'}
247 0 0 if $interval <= 0
250 0 0 unless $weak
251 0 0 unless $weak->{'cfg'}{'meta_pending'}
263 0 1 if $cfg->{'meta_pending'}
268 1 0 if ($tries > 10)
274 1 0 if ($op->{'topic'} and $op->{'topic'} eq $topic) { }
275 1 0 if $op->{'cb'}
281 1 0 if $cfg->{'on_error'}
288 0 0 unless ($conn)
293 0 0 if ($err)
294 0 0 if $cfg->{'on_error'}
302 0 0 if ($cfg->{'meta'} and ref $cfg->{'meta'}{'topics'} eq 'ARRAY') { }
315 0 0 if ($t->{'name'} eq $topic and not $t->{'error_code'} and @{$t->{'partitions'} // [];})
321 0 0 if ($topic_ok) { }
342 0 0 if (not $err and $res and not $res->{'error_code'}) { }
347 0 0 if $cfg->{'on_error'}
349 0 0 if $cb
353 0 0 if ($cfg->{'transactional_id'}) { }
356 0 0 if $cb
0 0 unless ($conn)
360 0 0 if ($err or $res->{'error_code'} and $res->{'error_code'} != 0)
369 0 0 if ($txn_conn and $txn_conn->connected) { }
386 0 0 if $cb
0 0 unless ($conn)
396 0 0 if (defined $op->{'node_id'} and $op->{'node_id'} == $node_id) { }
411 0 0 if (defined $op->{'node_id'}) { }
413 0 0 if ($conn and $conn->connected) { }
431 0 0 unless my $meta = $self->{'cfg'}{'meta'}
433 0 0 if $t->{'name'} eq $topic
441 0 0 unless $np > 0
444 0 0 if ($cfg->{'partitioner'})
447 0 0 if (defined $key and length $key)
460 0 0 if (ref $a eq 'CODE') { }
0 0 elsif (ref $a eq 'HASH') { }
467 1 0 unless ($cfg->{'meta'})
471 1 0 unless $cfg->{'meta_pending'}
477 0 0 exists $opts{'partition'} ? :
480 0 0 unless (defined $leader_id)
485 0 0 unless $cfg->{'meta_pending'}
490 0 0 unless ($conn and $conn->connected)
501 0 0 if $opts{'headers'}
511 0 0 if ($batch_bytes >= $cfg->{'batch_size'}) { }
0 0 elsif (not $cfg->{'_linger_active'}) { }
519 0 0 if $weak
533 0 0 if $idempotent and $cfg->{'_inflight'}{$bkey}
536 0 0 unless $batch and @$batch
542 0 0 if $cfg->{'compression'}
543 0 0 if $cfg->{'_txn_active'}
545 0 0 if ($idempotent)
554 0 0 if $cfg->{'_txn_active'}
562 0 0 if $idempotent
566 0 0 if (not $err and $result and ref $result->{'topics'} eq "ARRAY")
570 0 0 if $ec == 6 or $ec == 15 or $ec == 16
573 0 0 if $ec == 45 or $ec == 46
582 0 0 if ($idempotent and $fatal_seq and not $cfg->{'_txn_active'} and ($cfg->{'_epoch_retries'}{$bkey} //= 1) > 0)
588 0 0 if (exists $cfg->{'batches'}{$bkey}) { }
597 0 0 if $weak_self
598 0 0 if $weak_self
602 0 0 if ($retriable and ($cfg->{'_batch_retries'}{$bkey} // 0) > 0)
604 0 0 if defined $saved_seq
605 0 0 if (exists $cfg->{'batches'}{$bkey}) { }
610 0 0 if $weak_self and not $cfg->{'meta_pending'}
613 0 0 if $weak_self
623 0 0 if ($idempotent and not $err and defined $saved_seq)
626 0 0 if $next > $cur
631 0 0 if $cb
635 0 0 if ($idempotent and $weak_self and $cfg->{'batches'}{$bkey} and @{$cfg->{'batches'}{$bkey};})
649 1 0 unless (defined $leader_id)
651 0 0 unless ($conn and $conn->connected)
655 1 0 if ($skipped and keys %{$$cfg{"batches"};})
660 0 0 if $weak
668 1 1 if $cb and not $remaining
673 2 0 if (ref $msg eq 'ARRAY') { }
679 0 0 if %opts
681 2 0 if ($acks0) { }
686 0 0 if $err
687 0 0 if (--$remaining <= 0 and $cb)
688 0 0 @errors ? :
693 0 1 @errors ? :
1 0 if $cb and $acks0
711 0 0 unless $c and $c->connected
717 0 0 if $cfg->{'bootstrap_conn'} and $cfg->{'bootstrap_conn'}->connected and not $seen{${$cfg->{'bootstrap_conn'};}}
719 0 0 if $_
723 0 0 if (&$outstanding() == 0)
724 0 0 if $cb
728 0 0 if &$outstanding() > 0
730 0 0 if $cb
747 0 0 if ($a->{'topic'} eq $topic and $a->{'partition'} == $partition)
748 0 0 if ($offset_or_ts >= 0) { }
750 0 0 if $cb
754 0 0 defined $leader_id ? :
755 0 0 if ($conn and $conn->connected) { }
758 0 0 if (not $err and $res)
760 0 0 if defined $off
762 0 0 if $cb
765 0 0 if $cb
771 0 0 if $cb
777 0 0 unless @{$cfg->{'assignments'};}
779 0 0 unless ($cfg->{'meta'})
783 0 0 unless $cfg->{'meta_pending'}
791 0 0 unless defined $leader_id
802 0 0 unless $conn and $conn->connected
826 0 0 if (not $err and $result and ref $result->{'topics'} eq "ARRAY")
831 0 0 if ($cfg->{'on_message'})
839 0 0 if (@$records)
841 0 0 if $a
847 0 0 if $cb and $dispatched <= 0
850 0 0 if $cb and not $dispatched
858 0 0 if $cb and not $np
865 0 0 defined $leader_id ? :
866 0 0 unless ($conn and $conn->connected)
868 0 0 if $cb and --$remaining <= 0
876 0 0 if (not $err and $res and ref $res->{'topics'} eq "ARRAY")
878 0 0 if ($ts == -2) { }
881 0 0 if (++$pdone == 2)
883 0 0 if $cb and --$remaining <= 0
894 0 0 if $cb and not @assignments
901 0 0 defined $leader_id ? :
902 0 0 if ($conn and $conn->connected) { }
906 0 0 if (not $err and $res and ref $res->{'topics'} eq "ARRAY")
914 0 0 if $cb and --$remaining <= 0
918 0 0 if $cb and --$remaining <= 0
924 0 0 if ref $_[0]
937 0 0 if ($args[0] =~ /^(group_id|group_instance_id|on_assign|on_revoke|session_timeout|rebalance_timeout|heartbeat_interval|auto_commit|auto_offset_reset)$/) { }
946 0 0 unless my $group_id = $opts{'group_id'}
967 0 0 unless ($cfg->{'meta'})
971 0 0 unless $cfg->{'meta_pending'}
981 0 0 unless my $g = $cfg->{'group'}
984 0 0 unless $conn
989 0 0 if ($err or $res->{'error_code'})
991 0 0 if $cfg->{'on_error'}
1005 0 0 if ($coord->connected) { }
1019 0 0 unless my $g = $cfg->{'group'}
1020 0 0 unless my $coord = $g->{'coordinator'}
1027 0 0 if ($err)
1028 0 0 if $cfg->{'on_error'}
1032 0 0 if ($res->{'error_code'} == 15 or $res->{'error_code'} == 16)
1037 0 0 if ($res->{'error_code'} == 27)
1042 0 0 if ($res->{'error_code'} == 79)
1044 0 0 if $res->{'member_id'}
1048 0 0 if ($res->{'error_code'} == 22 or $res->{'error_code'} == 25)
1057 0 0 if ($res->{'error_code'})
1060 0 0 if $cfg->{'on_error'}
1071 0 0 if ($is_leader and $res->{'members'} and @{$res->{'members'};})
1085 0 7 unless my $meta = $cfg->{'meta'}
1090 0 11 unless grep {$_ eq $tname;} @$topics
1095 0 16 unless $EV::Kafka::Client::a->{'topic'} cmp $EV::Kafka::Client::b->{'topic'}
1105 1 6 @all_parts % $nm ? :
1112 30 30 if $_->{'topic'} eq $p->{'topic'}
10 0 if (grep {$_->{'partition'} == $p->{'partition'} if $_->{'topic'} eq $p->{'topic'};} @all_parts)
1113 8 2 if (scalar @{$member_parts{$mid};} < $max_per)
1127 10 42 if (scalar @{$member_parts{$mid};} < $min_count)
1170 0 0 unless my $g = $cfg->{'group'}
1171 0 0 unless my $coord = $g->{'coordinator'}
1176 0 0 if ($err)
1177 0 0 if $cfg->{'on_error'}
1180 0 0 if ($res->{'error_code'} == 27)
1185 0 0 if ($res->{'error_code'} == 22 or $res->{'error_code'} == 25)
1192 0 0 if ($res->{'error_code'})
1195 0 0 if $cfg->{'on_error'}
1203 0 0 if ($dlen >= 6)
1207 0 0 unless $off + 2 <= $dlen
1209 0 0 unless $off + $tlen <= $dlen
1211 0 0 unless $off + 4 <= $dlen
1214 0 0 unless $off + 4 <= $dlen
1216 0 0 ($g->{'auto_offset_reset'} // 'earliest') eq 'latest' ? :
1231 0 0 if $g->{'on_assign'}
1249 0 0 unless my $g = $cfg->{'group'}
1251 0 0 unless $coord and $coord->connected and @$assignments
1265 0 0 if (not $err and $res and ref $res->{'topics'} eq "ARRAY")
1268 0 0 if $p->{'error_code'}
1269 0 0 if $p->{'offset'} < 0
1271 0 0 if ($a->{'topic'} eq $t->{'topic'} and $a->{'partition'} == $p->{'partition'})
1281 0 0 if (@need_offsets) { }
1285 0 0 defined $leader_id ? :
1286 0 0 if ($lconn and $lconn->connected) { }
1289 0 0 if (not $lerr and $lres and ref $lres->{'topics'} eq "ARRAY")
1292 0 0 unless $lp->{'error_code'}
1297 0 0 if $remaining <= 0
1302 0 0 if $remaining <= 0
1314 0 0 unless my $g = $cfg->{'group'}
1317 0 0 unless $g->{'state'} eq "stable"
1319 0 0 unless $coord and $coord->connected
1323 0 0 if ($err)
1324 0 0 unless $res
1326 0 0 if ($ec == 27) { }
0 0 elsif ($ec == 22 or $ec == 25) { }
0 0 elsif ($ec == 15 or $ec == 16) { }
1329 0 0 if $g->{'on_revoke'}
1356 11 0 unless my $g = $self->{'cfg'}{'group'}
1363 0 0 if $cfg->{'fetch_active'}
1369 0 0 unless $weak and $cfg->{'fetch_active'}
1372 0 0 if $cfg->{'_fetch_in_flight'}
1390 0 0 if $cb
0 0 unless ($g)
1392 0 0 if $cb
0 0 unless ($coord and $coord->connected)
1408 0 0 if $cb
0 0 unless (@topics)
1412 0 0 if $cb
1426 0 0 if ($g and $g->{'coordinator'} and $g->{'coordinator'}->connected and $g->{'member_id'}) { }
1430 0 0 if $cb
1435 0 0 if $cb
1439 0 0 if ($g and $g->{'auto_commit'}) { }
1451 0 0 unless $cfg->{'transactional_id'}
1452 0 0 unless defined $cfg->{'producer_id'} and $cfg->{'producer_id'} >= 0
1461 0 0 if $conn and $conn->connected
1468 0 0 unless $cfg->{'_txn_active'}
1470 0 0 if $cfg->{'_txn_partitions'}{$key}++
1473 0 0 unless $conn
1486 0 0 unless $cfg->{'_txn_active'}
1491 0 0 if $cb
0 0 unless ($conn)
1498 0 0 if $cb
1506 0 0 unless $cfg->{'_txn_active'}
1507 0 0 unless $cfg->{'transactional_id'}
1523 0 0 unless (@topics)
1524 0 0 if $cb
1529 0 0 if $cb
0 0 unless ($conn)
1532 0 0 $g ? :
1533 0 0 $g ? :
1541 0 0 if $cb
1549 0 0 unless $cfg->{'_txn_active'}
1568 0 0 if $cb
0 0 unless ($conn)
1575 0 0 if $cb
1581 0 11 unless my $cfg = $self->{'cfg'}
1588 0 0 if $conn and $conn->connected
1590 0 11 if ($cfg->{'bootstrap_conn'})
1592 0 0 if $cfg->{'bootstrap_conn'}->connected
1596 0 11 if $cb
1601 0 11 unless $self and $self->{'cfg'}