| line |
true |
false |
branch |
|
75
|
1 |
11 |
if ($cfg->{'sasl'} and not $cfg->{'tls'}) |
|
77
|
1 |
0 |
if ($mech eq "PLAIN" or $mech =~ /^SCRAM-/) |
|
95
|
0 |
0 |
if ($c->connected) |
|
97
|
0 |
1 |
$conn && $conn->connected ? : |
|
103
|
0 |
0 |
if $cfg->{'conns'}{$node_id} |
|
106
|
0 |
0 |
unless $info |
|
114
|
0 |
0 |
if $weak |
|
125
|
0 |
0 |
if ($cfg->{'tls'}) |
|
128
|
0 |
0 |
if ($cfg->{'sasl'}) |
|
133
|
0 |
0 |
if $weak_cfg and $weak_cfg->{'on_error'} |
|
144
|
0 |
0 |
if ($idx >= @bs) |
|
146
|
0 |
0 |
if $cfg->{'on_error'} |
|
155
|
0 |
0 |
if $try |
|
165
|
0 |
0 |
if $cfg->{'on_error'} |
|
190
|
0 |
0 |
if $t->{'error_code'} |
|
200
|
0 |
1 |
if $cfg->{'meta_pending'} |
|
204
|
1 |
0 |
unless ($conn) |
|
209
|
0 |
0 |
if ($err) |
|
210
|
0 |
0 |
if $cfg->{'on_error'} |
|
218
|
0 |
0 |
if ($cfg->{'bootstrap_conn'} and $meta->{'brokers'} and @{$meta->{'brokers'};}) |
|
223
|
0 |
0 |
if ($cfg->{'idempotent'} || $cfg->{'transactional_id'} and $cfg->{'producer_id'} < 0) { } |
|
226
|
0 |
0 |
if $cb |
|
227
|
0 |
0 |
if $cfg->{'on_connect'} |
|
232
|
0 |
0 |
if $cb |
|
233
|
0 |
0 |
if $cfg->{'on_connect'} |
|
245
|
0 |
0 |
if $cfg->{'_meta_timer'} |
|
247
|
0 |
0 |
if $interval <= 0 |
|
250
|
0 |
0 |
unless $weak |
|
251
|
0 |
0 |
unless $weak->{'cfg'}{'meta_pending'} |
|
263
|
0 |
1 |
if $cfg->{'meta_pending'} |
|
268
|
1 |
0 |
if ($tries > 10) |
|
274
|
1 |
0 |
if ($op->{'topic'} and $op->{'topic'} eq $topic) { } |
|
275
|
1 |
0 |
if $op->{'cb'} |
|
281
|
1 |
0 |
if $cfg->{'on_error'} |
|
288
|
0 |
0 |
unless ($conn) |
|
293
|
0 |
0 |
if ($err) |
|
294
|
0 |
0 |
if $cfg->{'on_error'} |
|
302
|
0 |
0 |
if ($cfg->{'meta'} and ref $cfg->{'meta'}{'topics'} eq 'ARRAY') { } |
|
315
|
0 |
0 |
if ($t->{'name'} eq $topic and not $t->{'error_code'} and @{$t->{'partitions'} // [];}) |
|
321
|
0 |
0 |
if ($topic_ok) { } |
|
342
|
0 |
0 |
if (not $err and $res and not $res->{'error_code'}) { } |
|
347
|
0 |
0 |
if $cfg->{'on_error'} |
|
349
|
0 |
0 |
if $cb |
|
353
|
0 |
0 |
if ($cfg->{'transactional_id'}) { } |
|
356
|
0 |
0 |
if $cb |
|
|
0 |
0 |
unless ($conn) |
|
360
|
0 |
0 |
if ($err or $res->{'error_code'} and $res->{'error_code'} != 0) |
|
369
|
0 |
0 |
if ($txn_conn and $txn_conn->connected) { } |
|
386
|
0 |
0 |
if $cb |
|
|
0 |
0 |
unless ($conn) |
|
396
|
0 |
0 |
if (defined $op->{'node_id'} and $op->{'node_id'} == $node_id) { } |
|
411
|
0 |
0 |
if (defined $op->{'node_id'}) { } |
|
413
|
0 |
0 |
if ($conn and $conn->connected) { } |
|
431
|
0 |
0 |
unless my $meta = $self->{'cfg'}{'meta'} |
|
433
|
0 |
0 |
if $t->{'name'} eq $topic |
|
441
|
0 |
0 |
unless $np > 0 |
|
444
|
0 |
0 |
if ($cfg->{'partitioner'}) |
|
447
|
0 |
0 |
if (defined $key and length $key) |
|
460
|
0 |
0 |
if (ref $a eq 'CODE') { } |
|
|
0 |
0 |
elsif (ref $a eq 'HASH') { } |
|
467
|
1 |
0 |
unless ($cfg->{'meta'}) |
|
471
|
1 |
0 |
unless $cfg->{'meta_pending'} |
|
477
|
0 |
0 |
exists $opts{'partition'} ? : |
|
480
|
0 |
0 |
unless (defined $leader_id) |
|
485
|
0 |
0 |
unless $cfg->{'meta_pending'} |
|
490
|
0 |
0 |
unless ($conn and $conn->connected) |
|
501
|
0 |
0 |
if $opts{'headers'} |
|
511
|
0 |
0 |
if ($batch_bytes >= $cfg->{'batch_size'}) { } |
|
|
0 |
0 |
elsif (not $cfg->{'_linger_active'}) { } |
|
519
|
0 |
0 |
if $weak |
|
533
|
0 |
0 |
if $idempotent and $cfg->{'_inflight'}{$bkey} |
|
536
|
0 |
0 |
unless $batch and @$batch |
|
542
|
0 |
0 |
if $cfg->{'compression'} |
|
543
|
0 |
0 |
if $cfg->{'_txn_active'} |
|
545
|
0 |
0 |
if ($idempotent) |
|
554
|
0 |
0 |
if $cfg->{'_txn_active'} |
|
562
|
0 |
0 |
if $idempotent |
|
566
|
0 |
0 |
if (not $err and $result and ref $result->{'topics'} eq "ARRAY") |
|
570
|
0 |
0 |
if $ec == 6 or $ec == 15 or $ec == 16 |
|
573
|
0 |
0 |
if $ec == 45 or $ec == 46 |
|
582
|
0 |
0 |
if ($idempotent and $fatal_seq and not $cfg->{'_txn_active'} and ($cfg->{'_epoch_retries'}{$bkey} //= 1) > 0) |
|
588
|
0 |
0 |
if (exists $cfg->{'batches'}{$bkey}) { } |
|
597
|
0 |
0 |
if $weak_self |
|
598
|
0 |
0 |
if $weak_self |
|
602
|
0 |
0 |
if ($retriable and ($cfg->{'_batch_retries'}{$bkey} // 0) > 0) |
|
604
|
0 |
0 |
if defined $saved_seq |
|
605
|
0 |
0 |
if (exists $cfg->{'batches'}{$bkey}) { } |
|
610
|
0 |
0 |
if $weak_self and not $cfg->{'meta_pending'} |
|
613
|
0 |
0 |
if $weak_self |
|
623
|
0 |
0 |
if ($idempotent and not $err and defined $saved_seq) |
|
626
|
0 |
0 |
if $next > $cur |
|
631
|
0 |
0 |
if $cb |
|
635
|
0 |
0 |
if ($idempotent and $weak_self and $cfg->{'batches'}{$bkey} and @{$cfg->{'batches'}{$bkey};}) |
|
649
|
1 |
0 |
unless (defined $leader_id) |
|
651
|
0 |
0 |
unless ($conn and $conn->connected) |
|
655
|
1 |
0 |
if ($skipped and keys %{$$cfg{"batches"};}) |
|
660
|
0 |
0 |
if $weak |
|
668
|
1 |
1 |
if $cb and not $remaining |
|
673
|
2 |
0 |
if (ref $msg eq 'ARRAY') { } |
|
679
|
0 |
0 |
if %opts |
|
681
|
2 |
0 |
if ($acks0) { } |
|
686
|
0 |
0 |
if $err |
|
687
|
0 |
0 |
if (--$remaining <= 0 and $cb) |
|
688
|
0 |
0 |
@errors ? : |
|
693
|
0 |
1 |
@errors ? : |
|
|
1 |
0 |
if $cb and $acks0 |
|
711
|
0 |
0 |
unless $c and $c->connected |
|
717
|
0 |
0 |
if $cfg->{'bootstrap_conn'} and $cfg->{'bootstrap_conn'}->connected and not $seen{${$cfg->{'bootstrap_conn'};}} |
|
719
|
0 |
0 |
if $_ |
|
723
|
0 |
0 |
if (&$outstanding() == 0) |
|
724
|
0 |
0 |
if $cb |
|
728
|
0 |
0 |
if &$outstanding() > 0 |
|
730
|
0 |
0 |
if $cb |
|
747
|
0 |
0 |
if ($a->{'topic'} eq $topic and $a->{'partition'} == $partition) |
|
748
|
0 |
0 |
if ($offset_or_ts >= 0) { } |
|
750
|
0 |
0 |
if $cb |
|
754
|
0 |
0 |
defined $leader_id ? : |
|
755
|
0 |
0 |
if ($conn and $conn->connected) { } |
|
758
|
0 |
0 |
if (not $err and $res) |
|
760
|
0 |
0 |
if defined $off |
|
762
|
0 |
0 |
if $cb |
|
765
|
0 |
0 |
if $cb |
|
771
|
0 |
0 |
if $cb |
|
777
|
0 |
0 |
unless @{$cfg->{'assignments'};} |
|
779
|
0 |
0 |
unless ($cfg->{'meta'}) |
|
783
|
0 |
0 |
unless $cfg->{'meta_pending'} |
|
791
|
0 |
0 |
unless defined $leader_id |
|
802
|
0 |
0 |
unless $conn and $conn->connected |
|
826
|
0 |
0 |
if (not $err and $result and ref $result->{'topics'} eq "ARRAY") |
|
831
|
0 |
0 |
if ($cfg->{'on_message'}) |
|
839
|
0 |
0 |
if (@$records) |
|
841
|
0 |
0 |
if $a |
|
847
|
0 |
0 |
if $cb and $dispatched <= 0 |
|
850
|
0 |
0 |
if $cb and not $dispatched |
|
858
|
0 |
0 |
if $cb and not $np |
|
865
|
0 |
0 |
defined $leader_id ? : |
|
866
|
0 |
0 |
unless ($conn and $conn->connected) |
|
868
|
0 |
0 |
if $cb and --$remaining <= 0 |
|
876
|
0 |
0 |
if (not $err and $res and ref $res->{'topics'} eq "ARRAY") |
|
878
|
0 |
0 |
if ($ts == -2) { } |
|
881
|
0 |
0 |
if (++$pdone == 2) |
|
883
|
0 |
0 |
if $cb and --$remaining <= 0 |
|
894
|
0 |
0 |
if $cb and not @assignments |
|
901
|
0 |
0 |
defined $leader_id ? : |
|
902
|
0 |
0 |
if ($conn and $conn->connected) { } |
|
906
|
0 |
0 |
if (not $err and $res and ref $res->{'topics'} eq "ARRAY") |
|
914
|
0 |
0 |
if $cb and --$remaining <= 0 |
|
918
|
0 |
0 |
if $cb and --$remaining <= 0 |
|
924
|
0 |
0 |
if ref $_[0] |
|
937
|
0 |
0 |
if ($args[0] =~ /^(group_id|group_instance_id|on_assign|on_revoke|session_timeout|rebalance_timeout|heartbeat_interval|auto_commit|auto_offset_reset)$/) { } |
|
946
|
0 |
0 |
unless my $group_id = $opts{'group_id'} |
|
967
|
0 |
0 |
unless ($cfg->{'meta'}) |
|
971
|
0 |
0 |
unless $cfg->{'meta_pending'} |
|
981
|
0 |
0 |
unless my $g = $cfg->{'group'} |
|
984
|
0 |
0 |
unless $conn |
|
989
|
0 |
0 |
if ($err or $res->{'error_code'}) |
|
991
|
0 |
0 |
if $cfg->{'on_error'} |
|
1005
|
0 |
0 |
if ($coord->connected) { } |
|
1019
|
0 |
0 |
unless my $g = $cfg->{'group'} |
|
1020
|
0 |
0 |
unless my $coord = $g->{'coordinator'} |
|
1027
|
0 |
0 |
if ($err) |
|
1028
|
0 |
0 |
if $cfg->{'on_error'} |
|
1032
|
0 |
0 |
if ($res->{'error_code'} == 15 or $res->{'error_code'} == 16) |
|
1037
|
0 |
0 |
if ($res->{'error_code'} == 27) |
|
1042
|
0 |
0 |
if ($res->{'error_code'} == 79) |
|
1044
|
0 |
0 |
if $res->{'member_id'} |
|
1048
|
0 |
0 |
if ($res->{'error_code'} == 22 or $res->{'error_code'} == 25) |
|
1057
|
0 |
0 |
if ($res->{'error_code'}) |
|
1060
|
0 |
0 |
if $cfg->{'on_error'} |
|
1071
|
0 |
0 |
if ($is_leader and $res->{'members'} and @{$res->{'members'};}) |
|
1085
|
0 |
7 |
unless my $meta = $cfg->{'meta'} |
|
1090
|
0 |
11 |
unless grep {$_ eq $tname;} @$topics |
|
1095
|
0 |
16 |
unless $EV::Kafka::Client::a->{'topic'} cmp $EV::Kafka::Client::b->{'topic'} |
|
1105
|
1 |
6 |
@all_parts % $nm ? : |
|
1112
|
30 |
30 |
if $_->{'topic'} eq $p->{'topic'} |
|
|
10 |
0 |
if (grep {$_->{'partition'} == $p->{'partition'} if $_->{'topic'} eq $p->{'topic'};} @all_parts) |
|
1113
|
8 |
2 |
if (scalar @{$member_parts{$mid};} < $max_per) |
|
1127
|
10 |
42 |
if (scalar @{$member_parts{$mid};} < $min_count) |
|
1170
|
0 |
0 |
unless my $g = $cfg->{'group'} |
|
1171
|
0 |
0 |
unless my $coord = $g->{'coordinator'} |
|
1176
|
0 |
0 |
if ($err) |
|
1177
|
0 |
0 |
if $cfg->{'on_error'} |
|
1180
|
0 |
0 |
if ($res->{'error_code'} == 27) |
|
1185
|
0 |
0 |
if ($res->{'error_code'} == 22 or $res->{'error_code'} == 25) |
|
1192
|
0 |
0 |
if ($res->{'error_code'}) |
|
1195
|
0 |
0 |
if $cfg->{'on_error'} |
|
1203
|
0 |
0 |
if ($dlen >= 6) |
|
1207
|
0 |
0 |
unless $off + 2 <= $dlen |
|
1209
|
0 |
0 |
unless $off + $tlen <= $dlen |
|
1211
|
0 |
0 |
unless $off + 4 <= $dlen |
|
1214
|
0 |
0 |
unless $off + 4 <= $dlen |
|
1216
|
0 |
0 |
($g->{'auto_offset_reset'} // 'earliest') eq 'latest' ? : |
|
1231
|
0 |
0 |
if $g->{'on_assign'} |
|
1249
|
0 |
0 |
unless my $g = $cfg->{'group'} |
|
1251
|
0 |
0 |
unless $coord and $coord->connected and @$assignments |
|
1265
|
0 |
0 |
if (not $err and $res and ref $res->{'topics'} eq "ARRAY") |
|
1268
|
0 |
0 |
if $p->{'error_code'} |
|
1269
|
0 |
0 |
if $p->{'offset'} < 0 |
|
1271
|
0 |
0 |
if ($a->{'topic'} eq $t->{'topic'} and $a->{'partition'} == $p->{'partition'}) |
|
1281
|
0 |
0 |
if (@need_offsets) { } |
|
1285
|
0 |
0 |
defined $leader_id ? : |
|
1286
|
0 |
0 |
if ($lconn and $lconn->connected) { } |
|
1289
|
0 |
0 |
if (not $lerr and $lres and ref $lres->{'topics'} eq "ARRAY") |
|
1292
|
0 |
0 |
unless $lp->{'error_code'} |
|
1297
|
0 |
0 |
if $remaining <= 0 |
|
1302
|
0 |
0 |
if $remaining <= 0 |
|
1314
|
0 |
0 |
unless my $g = $cfg->{'group'} |
|
1317
|
0 |
0 |
unless $g->{'state'} eq "stable" |
|
1319
|
0 |
0 |
unless $coord and $coord->connected |
|
1323
|
0 |
0 |
if ($err) |
|
1324
|
0 |
0 |
unless $res |
|
1326
|
0 |
0 |
if ($ec == 27) { } |
|
|
0 |
0 |
elsif ($ec == 22 or $ec == 25) { } |
|
|
0 |
0 |
elsif ($ec == 15 or $ec == 16) { } |
|
1329
|
0 |
0 |
if $g->{'on_revoke'} |
|
1356
|
11 |
0 |
unless my $g = $self->{'cfg'}{'group'} |
|
1363
|
0 |
0 |
if $cfg->{'fetch_active'} |
|
1369
|
0 |
0 |
unless $weak and $cfg->{'fetch_active'} |
|
1372
|
0 |
0 |
if $cfg->{'_fetch_in_flight'} |
|
1390
|
0 |
0 |
if $cb |
|
|
0 |
0 |
unless ($g) |
|
1392
|
0 |
0 |
if $cb |
|
|
0 |
0 |
unless ($coord and $coord->connected) |
|
1408
|
0 |
0 |
if $cb |
|
|
0 |
0 |
unless (@topics) |
|
1412
|
0 |
0 |
if $cb |
|
1426
|
0 |
0 |
if ($g and $g->{'coordinator'} and $g->{'coordinator'}->connected and $g->{'member_id'}) { } |
|
1430
|
0 |
0 |
if $cb |
|
1435
|
0 |
0 |
if $cb |
|
1439
|
0 |
0 |
if ($g and $g->{'auto_commit'}) { } |
|
1451
|
0 |
0 |
unless $cfg->{'transactional_id'} |
|
1452
|
0 |
0 |
unless defined $cfg->{'producer_id'} and $cfg->{'producer_id'} >= 0 |
|
1461
|
0 |
0 |
if $conn and $conn->connected |
|
1468
|
0 |
0 |
unless $cfg->{'_txn_active'} |
|
1470
|
0 |
0 |
if $cfg->{'_txn_partitions'}{$key}++ |
|
1473
|
0 |
0 |
unless $conn |
|
1486
|
0 |
0 |
unless $cfg->{'_txn_active'} |
|
1491
|
0 |
0 |
if $cb |
|
|
0 |
0 |
unless ($conn) |
|
1498
|
0 |
0 |
if $cb |
|
1506
|
0 |
0 |
unless $cfg->{'_txn_active'} |
|
1507
|
0 |
0 |
unless $cfg->{'transactional_id'} |
|
1523
|
0 |
0 |
unless (@topics) |
|
1524
|
0 |
0 |
if $cb |
|
1529
|
0 |
0 |
if $cb |
|
|
0 |
0 |
unless ($conn) |
|
1532
|
0 |
0 |
$g ? : |
|
1533
|
0 |
0 |
$g ? : |
|
1541
|
0 |
0 |
if $cb |
|
1549
|
0 |
0 |
unless $cfg->{'_txn_active'} |
|
1568
|
0 |
0 |
if $cb |
|
|
0 |
0 |
unless ($conn) |
|
1575
|
0 |
0 |
if $cb |
|
1581
|
0 |
11 |
unless my $cfg = $self->{'cfg'} |
|
1588
|
0 |
0 |
if $conn and $conn->connected |
|
1590
|
0 |
11 |
if ($cfg->{'bootstrap_conn'}) |
|
1592
|
0 |
0 |
if $cfg->{'bootstrap_conn'}->connected |
|
1596
|
0 |
11 |
if $cb |
|
1601
|
0 |
11 |
unless $self and $self->{'cfg'} |