Branch Coverage

blib/lib/EV/Kafka.pm
Criterion Covered Total %
branch 22 460 4.7


line true false branch
85 0 0 if ($c->connected)
87 0 1 $conn && $conn->connected ? :
93 0 0 if $cfg->{'conns'}{$node_id}
96 0 0 unless $info
104 0 0 if $weak
115 0 0 if ($cfg->{'tls'})
118 0 0 if ($cfg->{'sasl'})
123 0 0 if $weak_cfg and $weak_cfg->{'on_error'}
134 0 0 if ($idx >= @bs)
136 0 0 if $cfg->{'on_error'}
151 0 0 if $cfg->{'on_error'}
176 0 0 if $t->{'error_code'}
186 0 1 if $cfg->{'meta_pending'}
190 1 0 unless ($conn)
195 0 0 if ($err)
196 0 0 if $cfg->{'on_error'}
204 0 0 if ($cfg->{'bootstrap_conn'} and $meta->{'brokers'} and @{$meta->{'brokers'};})
209 0 0 if ($cfg->{'idempotent'} || $cfg->{'transactional_id'} and $cfg->{'producer_id'} < 0) { }
212 0 0 if $cb
213 0 0 if $cfg->{'on_connect'}
218 0 0 if $cb
219 0 0 if $cfg->{'on_connect'}
228 0 0 if $cfg->{'meta_pending'}
232 0 0 unless ($conn)
237 0 0 if ($err)
238 0 0 if $cfg->{'on_error'}
247 0 0 if ($t->{'name'} eq $topic and not $t->{'error_code'} and @{$t->{'partitions'} // [];})
253 0 0 if ($topic_ok) { }
273 0 0 if (not $err and $res and not $res->{'error_code'}) { }
278 0 0 if $cfg->{'on_error'}
280 0 0 if $cb
284 0 0 if ($cfg->{'transactional_id'}) { }
287 0 0 if $cb
0 0 unless ($conn)
291 0 0 if ($err or $res->{'error_code'} and $res->{'error_code'} != 0)
300 0 0 if ($txn_conn and $txn_conn->connected) { }
317 0 0 if $cb
0 0 unless ($conn)
327 0 0 if (defined $op->{'node_id'} and $op->{'node_id'} == $node_id) { }
342 0 0 if (defined $op->{'node_id'}) { }
344 0 0 if ($conn and $conn->connected) { }
362 0 0 unless my $meta = $self->{'cfg'}{'meta'}
364 0 0 if $t->{'name'} eq $topic
372 0 0 unless $np > 0
375 0 0 if ($cfg->{'partitioner'})
378 0 0 if (defined $key and length $key)
391 0 0 if (ref $a eq 'CODE') { }
0 0 elsif (ref $a eq 'HASH') { }
398 1 0 unless ($cfg->{'meta'})
402 1 0 unless $cfg->{'meta_pending'}
408 0 0 exists $opts{'partition'} ? :
411 0 0 unless (defined $leader_id)
416 0 0 unless $cfg->{'meta_pending'}
421 0 0 unless ($conn and $conn->connected)
432 0 0 if $opts{'headers'}
442 0 0 if ($batch_bytes >= $cfg->{'batch_size'}) { }
0 0 elsif (not $cfg->{'_linger_active'}) { }
450 0 0 if $weak
460 0 0 unless $batch and @$batch
466 0 0 if $cfg->{'compression'}
467 0 0 if $cfg->{'_txn_active'}
469 0 0 if (defined $cfg->{'producer_id'} and $cfg->{'producer_id'} >= 0)
477 0 0 if $cfg->{'_txn_active'}
486 0 0 if (not $err and $result and ref $result->{'topics'} eq "ARRAY")
490 0 0 if $ec == 6 or $ec == 15 or $ec == 16
495 0 0 if ($retriable and ($cfg->{'_batch_retries'}{$bkey} // 0) > 0)
497 0 0 if defined $saved_seq
498 0 0 if (exists $cfg->{'batches'}{$bkey}) { }
503 0 0 unless $cfg->{'meta_pending'}
513 0 0 if $cb
525 0 0 unless (defined $leader_id)
527 0 0 unless ($conn and $conn->connected)
531 0 0 if ($skipped and keys %{$$cfg{"batches"};})
536 0 0 if $weak
544 0 0 if $cb and not $remaining
548 0 0 ref $msg eq 'ARRAY' ? :
549 0 0 if ($acks0) { }
555 0 0 if $err
556 0 0 if (--$remaining <= 0 and $cb)
557 0 0 @errors ? :
562 0 0 @errors ? :
0 0 if $cb and $acks0
578 0 0 unless $conn and $conn->connected
582 0 0 if ($cfg->{'bootstrap_conn'} and $cfg->{'bootstrap_conn'}->connected and not $seen{${$cfg->{'bootstrap_conn'};}})
586 0 0 if ($pending == 0)
587 0 0 if $cb
595 0 0 unless $c and $c->connected
601 0 0 if $cfg->{'bootstrap_conn'} and $cfg->{'bootstrap_conn'}->connected and not $s{${$cfg->{'bootstrap_conn'};}}
602 0 0 if ($p == 0)
604 0 0 if $cb
622 0 0 if ($a->{'topic'} eq $topic and $a->{'partition'} == $partition)
623 0 0 if ($offset_or_ts >= 0) { }
625 0 0 if $cb
629 0 0 defined $leader_id ? :
630 0 0 if ($conn and $conn->connected) { }
633 0 0 if (not $err and $res)
635 0 0 if defined $off
637 0 0 if $cb
640 0 0 if $cb
646 0 0 if $cb
652 0 0 unless @{$cfg->{'assignments'};}
654 0 0 unless ($cfg->{'meta'})
658 0 0 unless $cfg->{'meta_pending'}
666 0 0 unless defined $leader_id
677 0 0 unless $conn and $conn->connected
697 0 0 if (not $err and $result and ref $result->{'topics'} eq "ARRAY")
702 0 0 if ($cfg->{'on_message'})
710 0 0 if (@$records)
712 0 0 if $a
718 0 0 if $cb and $dispatched <= 0
721 0 0 if $cb and not $dispatched
729 0 0 if $cb and not $np
736 0 0 defined $leader_id ? :
737 0 0 unless ($conn and $conn->connected)
739 0 0 if $cb and --$remaining <= 0
747 0 0 if (not $err and $res and ref $res->{'topics'} eq "ARRAY")
749 0 0 if ($ts == -2) { }
752 0 0 if (++$pdone == 2)
754 0 0 if $cb and --$remaining <= 0
765 0 0 if $cb and not @assignments
772 0 0 defined $leader_id ? :
773 0 0 if ($conn and $conn->connected) { }
777 0 0 if (not $err and $res and ref $res->{'topics'} eq "ARRAY")
785 0 0 if $cb and --$remaining <= 0
789 0 0 if $cb and --$remaining <= 0
795 0 0 if ref $_[0]
808 0 0 if ($args[0] =~ /^(group_id|group_instance_id|on_assign|on_revoke|session_timeout|rebalance_timeout|heartbeat_interval|auto_commit|auto_offset_reset)$/) { }
817 0 0 unless my $group_id = $opts{'group_id'}
838 0 0 unless ($cfg->{'meta'})
842 0 0 unless $cfg->{'meta_pending'}
852 0 0 unless my $g = $cfg->{'group'}
855 0 0 unless $conn
860 0 0 if ($err or $res->{'error_code'})
862 0 0 if $cfg->{'on_error'}
876 0 0 if ($coord->connected) { }
890 0 0 unless my $g = $cfg->{'group'}
891 0 0 unless my $coord = $g->{'coordinator'}
898 0 0 if ($err)
899 0 0 if $cfg->{'on_error'}
903 0 0 if ($res->{'error_code'} == 15 or $res->{'error_code'} == 16)
908 0 0 if ($res->{'error_code'} == 27)
913 0 0 if ($res->{'error_code'} == 79)
915 0 0 if $res->{'member_id'}
919 0 0 if ($res->{'error_code'})
920 0 0 if $cfg->{'on_error'}
930 0 0 if ($is_leader and $res->{'members'} and @{$res->{'members'};})
944 0 7 unless my $meta = $cfg->{'meta'}
949 0 11 unless grep {$_ eq $tname;} @$topics
954 0 16 unless $EV::Kafka::Client::a->{'topic'} cmp $EV::Kafka::Client::b->{'topic'}
964 1 6 @all_parts % $nm ? :
971 30 30 if $_->{'topic'} eq $p->{'topic'}
10 0 if (grep {$_->{'partition'} == $p->{'partition'} if $_->{'topic'} eq $p->{'topic'};} @all_parts)
972 8 2 if (scalar @{$member_parts{$mid};} < $max_per)
986 10 42 if (scalar @{$member_parts{$mid};} < $min_count)
1029 0 0 unless my $g = $cfg->{'group'}
1030 0 0 unless my $coord = $g->{'coordinator'}
1035 0 0 if ($err)
1036 0 0 if $cfg->{'on_error'}
1039 0 0 if ($res->{'error_code'} == 27)
1044 0 0 if ($res->{'error_code'})
1045 0 0 if $cfg->{'on_error'}
1053 0 0 if ($dlen >= 6)
1057 0 0 unless $off + 2 <= $dlen
1059 0 0 unless $off + $tlen <= $dlen
1061 0 0 unless $off + 4 <= $dlen
1064 0 0 unless $off + 4 <= $dlen
1066 0 0 ($g->{'auto_offset_reset'} // 'earliest') eq 'latest' ? :
1081 0 0 if $g->{'on_assign'}
1099 0 0 unless my $g = $cfg->{'group'}
1101 0 0 unless $coord and $coord->connected and @$assignments
1115 0 0 if (not $err and $res and ref $res->{'topics'} eq "ARRAY")
1118 0 0 if $p->{'error_code'}
1119 0 0 if $p->{'offset'} < 0
1121 0 0 if ($a->{'topic'} eq $t->{'topic'} and $a->{'partition'} == $p->{'partition'})
1131 0 0 if (@need_offsets) { }
1135 0 0 defined $leader_id ? :
1136 0 0 if ($lconn and $lconn->connected) { }
1139 0 0 if (not $lerr and $lres and ref $lres->{'topics'} eq "ARRAY")
1142 0 0 unless $lp->{'error_code'}
1147 0 0 if $remaining <= 0
1152 0 0 if $remaining <= 0
1164 0 0 unless my $g = $cfg->{'group'}
1167 0 0 unless $g->{'state'} eq "stable"
1169 0 0 unless $coord and $coord->connected
1173 0 0 if ($err)
1174 0 0 if ($res and $res->{'error_code'} == 27)
1177 0 0 if $g->{'on_revoke'}
1188 4 0 unless my $g = $self->{'cfg'}{'group'}
1195 0 0 if $cfg->{'fetch_active'}
1199 0 0 unless $cfg->{'fetch_active'}
1215 0 0 if $cb
0 0 unless ($g)
1217 0 0 if $cb
0 0 unless ($coord and $coord->connected)
1233 0 0 if $cb
0 0 unless (@topics)
1237 0 0 if $cb
1251 0 0 if ($g and $g->{'coordinator'} and $g->{'coordinator'}->connected and $g->{'member_id'}) { }
1255 0 0 if $cb
1260 0 0 if $cb
1264 0 0 if ($g and $g->{'auto_commit'}) { }
1276 0 0 unless $cfg->{'transactional_id'}
1277 0 0 unless defined $cfg->{'producer_id'} and $cfg->{'producer_id'} >= 0
1286 0 0 if $conn and $conn->connected
1293 0 0 unless $cfg->{'_txn_active'}
1295 0 0 if $cfg->{'_txn_partitions'}{$key}++
1298 0 0 unless $conn
1311 0 0 unless $cfg->{'_txn_active'}
1316 0 0 if $cb
0 0 unless ($conn)
1323 0 0 if $cb
1331 0 0 unless $cfg->{'_txn_active'}
1332 0 0 unless $cfg->{'transactional_id'}
1348 0 0 unless (@topics)
1349 0 0 if $cb
1354 0 0 if $cb
0 0 unless ($conn)
1357 0 0 $g ? :
1358 0 0 $g ? :
1366 0 0 if $cb
1374 0 0 unless $cfg->{'_txn_active'}
1382 0 0 if $cb
0 0 unless ($conn)
1389 0 0 if $cb
1395 0 4 unless my $cfg = $self->{'cfg'}
1401 0 0 if $conn and $conn->connected
1403 0 4 if ($cfg->{'bootstrap_conn'})
1405 0 0 if $cfg->{'bootstrap_conn'}->connected
1409 0 4 if $cb
1414 0 4 unless $self and $self->{'cfg'}