| line |
true |
false |
branch |
|
85
|
0 |
0 |
if ($c->connected) |
|
87
|
0 |
1 |
$conn && $conn->connected ? : |
|
93
|
0 |
0 |
if $cfg->{'conns'}{$node_id} |
|
96
|
0 |
0 |
unless $info |
|
104
|
0 |
0 |
if $weak |
|
115
|
0 |
0 |
if ($cfg->{'tls'}) |
|
118
|
0 |
0 |
if ($cfg->{'sasl'}) |
|
123
|
0 |
0 |
if $weak_cfg and $weak_cfg->{'on_error'} |
|
134
|
0 |
0 |
if ($idx >= @bs) |
|
136
|
0 |
0 |
if $cfg->{'on_error'} |
|
151
|
0 |
0 |
if $cfg->{'on_error'} |
|
176
|
0 |
0 |
if $t->{'error_code'} |
|
186
|
0 |
1 |
if $cfg->{'meta_pending'} |
|
190
|
1 |
0 |
unless ($conn) |
|
195
|
0 |
0 |
if ($err) |
|
196
|
0 |
0 |
if $cfg->{'on_error'} |
|
204
|
0 |
0 |
if ($cfg->{'bootstrap_conn'} and $meta->{'brokers'} and @{$meta->{'brokers'};}) |
|
209
|
0 |
0 |
if ($cfg->{'idempotent'} || $cfg->{'transactional_id'} and $cfg->{'producer_id'} < 0) { } |
|
212
|
0 |
0 |
if $cb |
|
213
|
0 |
0 |
if $cfg->{'on_connect'} |
|
218
|
0 |
0 |
if $cb |
|
219
|
0 |
0 |
if $cfg->{'on_connect'} |
|
228
|
0 |
0 |
if $cfg->{'meta_pending'} |
|
232
|
0 |
0 |
unless ($conn) |
|
237
|
0 |
0 |
if ($err) |
|
238
|
0 |
0 |
if $cfg->{'on_error'} |
|
247
|
0 |
0 |
if ($t->{'name'} eq $topic and not $t->{'error_code'} and @{$t->{'partitions'} // [];}) |
|
253
|
0 |
0 |
if ($topic_ok) { } |
|
273
|
0 |
0 |
if (not $err and $res and not $res->{'error_code'}) { } |
|
278
|
0 |
0 |
if $cfg->{'on_error'} |
|
280
|
0 |
0 |
if $cb |
|
284
|
0 |
0 |
if ($cfg->{'transactional_id'}) { } |
|
287
|
0 |
0 |
if $cb |
|
|
0 |
0 |
unless ($conn) |
|
291
|
0 |
0 |
if ($err or $res->{'error_code'} and $res->{'error_code'} != 0) |
|
300
|
0 |
0 |
if ($txn_conn and $txn_conn->connected) { } |
|
317
|
0 |
0 |
if $cb |
|
|
0 |
0 |
unless ($conn) |
|
327
|
0 |
0 |
if (defined $op->{'node_id'} and $op->{'node_id'} == $node_id) { } |
|
342
|
0 |
0 |
if (defined $op->{'node_id'}) { } |
|
344
|
0 |
0 |
if ($conn and $conn->connected) { } |
|
362
|
0 |
0 |
unless my $meta = $self->{'cfg'}{'meta'} |
|
364
|
0 |
0 |
if $t->{'name'} eq $topic |
|
372
|
0 |
0 |
unless $np > 0 |
|
375
|
0 |
0 |
if ($cfg->{'partitioner'}) |
|
378
|
0 |
0 |
if (defined $key and length $key) |
|
391
|
0 |
0 |
if (ref $a eq 'CODE') { } |
|
|
0 |
0 |
elsif (ref $a eq 'HASH') { } |
|
398
|
1 |
0 |
unless ($cfg->{'meta'}) |
|
402
|
1 |
0 |
unless $cfg->{'meta_pending'} |
|
408
|
0 |
0 |
exists $opts{'partition'} ? : |
|
411
|
0 |
0 |
unless (defined $leader_id) |
|
416
|
0 |
0 |
unless $cfg->{'meta_pending'} |
|
421
|
0 |
0 |
unless ($conn and $conn->connected) |
|
432
|
0 |
0 |
if $opts{'headers'} |
|
442
|
0 |
0 |
if ($batch_bytes >= $cfg->{'batch_size'}) { } |
|
|
0 |
0 |
elsif (not $cfg->{'_linger_active'}) { } |
|
450
|
0 |
0 |
if $weak |
|
460
|
0 |
0 |
unless $batch and @$batch |
|
466
|
0 |
0 |
if $cfg->{'compression'} |
|
467
|
0 |
0 |
if $cfg->{'_txn_active'} |
|
469
|
0 |
0 |
if (defined $cfg->{'producer_id'} and $cfg->{'producer_id'} >= 0) |
|
477
|
0 |
0 |
if $cfg->{'_txn_active'} |
|
486
|
0 |
0 |
if (not $err and $result and ref $result->{'topics'} eq "ARRAY") |
|
490
|
0 |
0 |
if $ec == 6 or $ec == 15 or $ec == 16 |
|
495
|
0 |
0 |
if ($retriable and ($cfg->{'_batch_retries'}{$bkey} // 0) > 0) |
|
497
|
0 |
0 |
if defined $saved_seq |
|
498
|
0 |
0 |
if (exists $cfg->{'batches'}{$bkey}) { } |
|
503
|
0 |
0 |
unless $cfg->{'meta_pending'} |
|
513
|
0 |
0 |
if $cb |
|
525
|
0 |
0 |
unless (defined $leader_id) |
|
527
|
0 |
0 |
unless ($conn and $conn->connected) |
|
531
|
0 |
0 |
if ($skipped and keys %{$$cfg{"batches"};}) |
|
536
|
0 |
0 |
if $weak |
|
544
|
0 |
0 |
if $cb and not $remaining |
|
548
|
0 |
0 |
ref $msg eq 'ARRAY' ? : |
|
549
|
0 |
0 |
if ($acks0) { } |
|
555
|
0 |
0 |
if $err |
|
556
|
0 |
0 |
if (--$remaining <= 0 and $cb) |
|
557
|
0 |
0 |
@errors ? : |
|
562
|
0 |
0 |
@errors ? : |
|
|
0 |
0 |
if $cb and $acks0 |
|
578
|
0 |
0 |
unless $conn and $conn->connected |
|
582
|
0 |
0 |
if ($cfg->{'bootstrap_conn'} and $cfg->{'bootstrap_conn'}->connected and not $seen{${$cfg->{'bootstrap_conn'};}}) |
|
586
|
0 |
0 |
if ($pending == 0) |
|
587
|
0 |
0 |
if $cb |
|
595
|
0 |
0 |
unless $c and $c->connected |
|
601
|
0 |
0 |
if $cfg->{'bootstrap_conn'} and $cfg->{'bootstrap_conn'}->connected and not $s{${$cfg->{'bootstrap_conn'};}} |
|
602
|
0 |
0 |
if ($p == 0) |
|
604
|
0 |
0 |
if $cb |
|
622
|
0 |
0 |
if ($a->{'topic'} eq $topic and $a->{'partition'} == $partition) |
|
623
|
0 |
0 |
if ($offset_or_ts >= 0) { } |
|
625
|
0 |
0 |
if $cb |
|
629
|
0 |
0 |
defined $leader_id ? : |
|
630
|
0 |
0 |
if ($conn and $conn->connected) { } |
|
633
|
0 |
0 |
if (not $err and $res) |
|
635
|
0 |
0 |
if defined $off |
|
637
|
0 |
0 |
if $cb |
|
640
|
0 |
0 |
if $cb |
|
646
|
0 |
0 |
if $cb |
|
652
|
0 |
0 |
unless @{$cfg->{'assignments'};} |
|
654
|
0 |
0 |
unless ($cfg->{'meta'}) |
|
658
|
0 |
0 |
unless $cfg->{'meta_pending'} |
|
666
|
0 |
0 |
unless defined $leader_id |
|
677
|
0 |
0 |
unless $conn and $conn->connected |
|
697
|
0 |
0 |
if (not $err and $result and ref $result->{'topics'} eq "ARRAY") |
|
702
|
0 |
0 |
if ($cfg->{'on_message'}) |
|
710
|
0 |
0 |
if (@$records) |
|
712
|
0 |
0 |
if $a |
|
718
|
0 |
0 |
if $cb and $dispatched <= 0 |
|
721
|
0 |
0 |
if $cb and not $dispatched |
|
729
|
0 |
0 |
if $cb and not $np |
|
736
|
0 |
0 |
defined $leader_id ? : |
|
737
|
0 |
0 |
unless ($conn and $conn->connected) |
|
739
|
0 |
0 |
if $cb and --$remaining <= 0 |
|
747
|
0 |
0 |
if (not $err and $res and ref $res->{'topics'} eq "ARRAY") |
|
749
|
0 |
0 |
if ($ts == -2) { } |
|
752
|
0 |
0 |
if (++$pdone == 2) |
|
754
|
0 |
0 |
if $cb and --$remaining <= 0 |
|
765
|
0 |
0 |
if $cb and not @assignments |
|
772
|
0 |
0 |
defined $leader_id ? : |
|
773
|
0 |
0 |
if ($conn and $conn->connected) { } |
|
777
|
0 |
0 |
if (not $err and $res and ref $res->{'topics'} eq "ARRAY") |
|
785
|
0 |
0 |
if $cb and --$remaining <= 0 |
|
789
|
0 |
0 |
if $cb and --$remaining <= 0 |
|
795
|
0 |
0 |
if ref $_[0] |
|
808
|
0 |
0 |
if ($args[0] =~ /^(group_id|group_instance_id|on_assign|on_revoke|session_timeout|rebalance_timeout|heartbeat_interval|auto_commit|auto_offset_reset)$/) { } |
|
817
|
0 |
0 |
unless my $group_id = $opts{'group_id'} |
|
838
|
0 |
0 |
unless ($cfg->{'meta'}) |
|
842
|
0 |
0 |
unless $cfg->{'meta_pending'} |
|
852
|
0 |
0 |
unless my $g = $cfg->{'group'} |
|
855
|
0 |
0 |
unless $conn |
|
860
|
0 |
0 |
if ($err or $res->{'error_code'}) |
|
862
|
0 |
0 |
if $cfg->{'on_error'} |
|
876
|
0 |
0 |
if ($coord->connected) { } |
|
890
|
0 |
0 |
unless my $g = $cfg->{'group'} |
|
891
|
0 |
0 |
unless my $coord = $g->{'coordinator'} |
|
898
|
0 |
0 |
if ($err) |
|
899
|
0 |
0 |
if $cfg->{'on_error'} |
|
903
|
0 |
0 |
if ($res->{'error_code'} == 15 or $res->{'error_code'} == 16) |
|
908
|
0 |
0 |
if ($res->{'error_code'} == 27) |
|
913
|
0 |
0 |
if ($res->{'error_code'} == 79) |
|
915
|
0 |
0 |
if $res->{'member_id'} |
|
919
|
0 |
0 |
if ($res->{'error_code'}) |
|
920
|
0 |
0 |
if $cfg->{'on_error'} |
|
930
|
0 |
0 |
if ($is_leader and $res->{'members'} and @{$res->{'members'};}) |
|
944
|
0 |
7 |
unless my $meta = $cfg->{'meta'} |
|
949
|
0 |
11 |
unless grep {$_ eq $tname;} @$topics |
|
954
|
0 |
16 |
unless $EV::Kafka::Client::a->{'topic'} cmp $EV::Kafka::Client::b->{'topic'} |
|
964
|
1 |
6 |
@all_parts % $nm ? : |
|
971
|
30 |
30 |
if $_->{'topic'} eq $p->{'topic'} |
|
|
10 |
0 |
if (grep {$_->{'partition'} == $p->{'partition'} if $_->{'topic'} eq $p->{'topic'};} @all_parts) |
|
972
|
8 |
2 |
if (scalar @{$member_parts{$mid};} < $max_per) |
|
986
|
10 |
42 |
if (scalar @{$member_parts{$mid};} < $min_count) |
|
1029
|
0 |
0 |
unless my $g = $cfg->{'group'} |
|
1030
|
0 |
0 |
unless my $coord = $g->{'coordinator'} |
|
1035
|
0 |
0 |
if ($err) |
|
1036
|
0 |
0 |
if $cfg->{'on_error'} |
|
1039
|
0 |
0 |
if ($res->{'error_code'} == 27) |
|
1044
|
0 |
0 |
if ($res->{'error_code'}) |
|
1045
|
0 |
0 |
if $cfg->{'on_error'} |
|
1053
|
0 |
0 |
if ($dlen >= 6) |
|
1057
|
0 |
0 |
unless $off + 2 <= $dlen |
|
1059
|
0 |
0 |
unless $off + $tlen <= $dlen |
|
1061
|
0 |
0 |
unless $off + 4 <= $dlen |
|
1064
|
0 |
0 |
unless $off + 4 <= $dlen |
|
1066
|
0 |
0 |
($g->{'auto_offset_reset'} // 'earliest') eq 'latest' ? : |
|
1081
|
0 |
0 |
if $g->{'on_assign'} |
|
1099
|
0 |
0 |
unless my $g = $cfg->{'group'} |
|
1101
|
0 |
0 |
unless $coord and $coord->connected and @$assignments |
|
1115
|
0 |
0 |
if (not $err and $res and ref $res->{'topics'} eq "ARRAY") |
|
1118
|
0 |
0 |
if $p->{'error_code'} |
|
1119
|
0 |
0 |
if $p->{'offset'} < 0 |
|
1121
|
0 |
0 |
if ($a->{'topic'} eq $t->{'topic'} and $a->{'partition'} == $p->{'partition'}) |
|
1131
|
0 |
0 |
if (@need_offsets) { } |
|
1135
|
0 |
0 |
defined $leader_id ? : |
|
1136
|
0 |
0 |
if ($lconn and $lconn->connected) { } |
|
1139
|
0 |
0 |
if (not $lerr and $lres and ref $lres->{'topics'} eq "ARRAY") |
|
1142
|
0 |
0 |
unless $lp->{'error_code'} |
|
1147
|
0 |
0 |
if $remaining <= 0 |
|
1152
|
0 |
0 |
if $remaining <= 0 |
|
1164
|
0 |
0 |
unless my $g = $cfg->{'group'} |
|
1167
|
0 |
0 |
unless $g->{'state'} eq "stable" |
|
1169
|
0 |
0 |
unless $coord and $coord->connected |
|
1173
|
0 |
0 |
if ($err) |
|
1174
|
0 |
0 |
if ($res and $res->{'error_code'} == 27) |
|
1177
|
0 |
0 |
if $g->{'on_revoke'} |
|
1188
|
4 |
0 |
unless my $g = $self->{'cfg'}{'group'} |
|
1195
|
0 |
0 |
if $cfg->{'fetch_active'} |
|
1199
|
0 |
0 |
unless $cfg->{'fetch_active'} |
|
1215
|
0 |
0 |
if $cb |
|
|
0 |
0 |
unless ($g) |
|
1217
|
0 |
0 |
if $cb |
|
|
0 |
0 |
unless ($coord and $coord->connected) |
|
1233
|
0 |
0 |
if $cb |
|
|
0 |
0 |
unless (@topics) |
|
1237
|
0 |
0 |
if $cb |
|
1251
|
0 |
0 |
if ($g and $g->{'coordinator'} and $g->{'coordinator'}->connected and $g->{'member_id'}) { } |
|
1255
|
0 |
0 |
if $cb |
|
1260
|
0 |
0 |
if $cb |
|
1264
|
0 |
0 |
if ($g and $g->{'auto_commit'}) { } |
|
1276
|
0 |
0 |
unless $cfg->{'transactional_id'} |
|
1277
|
0 |
0 |
unless defined $cfg->{'producer_id'} and $cfg->{'producer_id'} >= 0 |
|
1286
|
0 |
0 |
if $conn and $conn->connected |
|
1293
|
0 |
0 |
unless $cfg->{'_txn_active'} |
|
1295
|
0 |
0 |
if $cfg->{'_txn_partitions'}{$key}++ |
|
1298
|
0 |
0 |
unless $conn |
|
1311
|
0 |
0 |
unless $cfg->{'_txn_active'} |
|
1316
|
0 |
0 |
if $cb |
|
|
0 |
0 |
unless ($conn) |
|
1323
|
0 |
0 |
if $cb |
|
1331
|
0 |
0 |
unless $cfg->{'_txn_active'} |
|
1332
|
0 |
0 |
unless $cfg->{'transactional_id'} |
|
1348
|
0 |
0 |
unless (@topics) |
|
1349
|
0 |
0 |
if $cb |
|
1354
|
0 |
0 |
if $cb |
|
|
0 |
0 |
unless ($conn) |
|
1357
|
0 |
0 |
$g ? : |
|
1358
|
0 |
0 |
$g ? : |
|
1366
|
0 |
0 |
if $cb |
|
1374
|
0 |
0 |
unless $cfg->{'_txn_active'} |
|
1382
|
0 |
0 |
if $cb |
|
|
0 |
0 |
unless ($conn) |
|
1389
|
0 |
0 |
if $cb |
|
1395
|
0 |
4 |
unless my $cfg = $self->{'cfg'} |
|
1401
|
0 |
0 |
if $conn and $conn->connected |
|
1403
|
0 |
4 |
if ($cfg->{'bootstrap_conn'}) |
|
1405
|
0 |
0 |
if $cfg->{'bootstrap_conn'}->connected |
|
1409
|
0 |
4 |
if $cb |
|
1414
|
0 |
4 |
unless $self and $self->{'cfg'} |