| line |
true |
false |
branch |
|
135
|
1656 |
19 |
if (b->cap >= need) return; |
|
137
|
6 |
13 |
if (newcap < need) newcap = need; |
|
143
|
205 |
0 |
if (b->data) { Safefree(b->data); b->data = NULL; } |
|
185
|
0 |
0 |
if (len > 0) kf_buf_append(b, s, len); |
|
189
|
0 |
1 |
if (!s) { |
|
193
|
1 |
0 |
if (len > 0) kf_buf_append(b, s, len); |
|
200
|
0 |
0 |
if (len > 0) kf_buf_append(b, s, len); |
|
204
|
0 |
0 |
if (!s) { |
|
208
|
0 |
0 |
if (len > 0) kf_buf_append(b, s, len); |
|
215
|
21 |
864 |
while (val >= 0x80) { |
|
230
|
0 |
0 |
if (!s) { |
|
234
|
0 |
0 |
if (len > 0) kf_buf_append(b, s, len); |
|
271
|
867 |
0 |
while (p < end) { |
|
274
|
846 |
21 |
if (!(b & 0x80)) { |
|
279
|
0 |
21 |
if (shift >= 64) return -1; |
|
288
|
0 |
846 |
if (n < 0) return n; |
|
295
|
0 |
13 |
if (end - buf < 2) return -1; |
|
297
|
1 |
12 |
if (len < 0) { /* nullable null */ |
|
302
|
0 |
12 |
if (end - buf < 2 + len) return -1; |
|
312
|
0 |
0 |
if (n < 0) return -1; |
|
313
|
0 |
0 |
if (raw == 0) { |
|
319
|
0 |
0 |
if (end - buf - n < len) return -1; |
|
329
|
0 |
0 |
if (n < 0) return -1; |
|
332
|
0 |
0 |
for (i = 0; i < count; i++) { |
|
335
|
0 |
0 |
if (tn < 0) return -1; |
|
339
|
0 |
0 |
if (dn < 0) return -1; |
|
341
|
0 |
0 |
if ((uint64_t)(end - p) < dlen) return -1; |
|
356
|
5376 |
21 |
for (i = 0; i < 256; i++) { |
|
358
|
43008 |
5376 |
for (j = 0; j < 8; j++) { |
|
359
|
21504 |
21504 |
if (crc & 1) |
|
372
|
0 |
37 |
if (!crc32c_table_inited) crc32c_init_table(); |
|
373
|
205072 |
37 |
for (i = 0; i < len; i++) |
|
547
|
0 |
1 |
if (self->ssl) { |
|
548
|
0 |
0 |
int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len; |
|
551
|
0 |
0 |
if (ret <= 0) { |
|
553
|
0 |
0 |
if (err == SSL_ERROR_WANT_READ) { |
|
557
|
0 |
0 |
if (err == SSL_ERROR_WANT_WRITE) { |
|
562
|
0 |
0 |
if (err == SSL_ERROR_ZERO_RETURN) return 0; |
|
575
|
0 |
1 |
if (self->ssl) { |
|
576
|
0 |
0 |
int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len; |
|
579
|
0 |
0 |
if (ret <= 0) { |
|
581
|
0 |
0 |
if (err == SSL_ERROR_WANT_WRITE) { |
|
585
|
0 |
0 |
if (err == SSL_ERROR_WANT_READ) { |
|
605
|
1 |
0 |
if (self->rbuf_cap >= need) return; |
|
607
|
0 |
0 |
if (newcap < need) newcap = need; |
|
613
|
1 |
0 |
if (self->wbuf_cap >= need) return; |
|
615
|
0 |
0 |
if (newcap < need) newcap = need; |
|
625
|
1 |
0 |
if (!self->reading && self->fd >= 0) { |
|
|
1 |
0 |
if (!self->reading && self->fd >= 0) { |
|
632
|
1 |
8 |
if (self->reading) { |
|
639
|
3 |
0 |
if (!self->writing && self->fd >= 0) { |
|
|
3 |
0 |
if (!self->writing && self->fd >= 0) { |
|
646
|
3 |
9 |
if (self->writing) { |
|
657
|
0 |
1 |
if (!self->on_error) |
|
665
|
0 |
1 |
PUSHMARK(SP); |
|
666
|
0 |
1 |
XPUSHs(sv_2mortal(newSVpv(msg, 0))); |
|
669
|
1 |
0 |
if (SvTRUE(ERRSV)) { |
|
|
0 |
1 |
if (SvTRUE(ERRSV)) { |
|
670
|
0 |
0 |
warn("EV::Kafka::Conn: on_error callback error: %s", SvPV_nolen(ERRSV)); |
|
671
|
0 |
0 |
sv_setsv(ERRSV, &PL_sv_undef); |
|
673
|
1 |
0 |
FREETMPS; |
|
680
|
0 |
1 |
if (!self->on_connect) return; |
|
687
|
0 |
1 |
PUSHMARK(SP); |
|
690
|
1 |
0 |
if (SvTRUE(ERRSV)) { |
|
|
0 |
1 |
if (SvTRUE(ERRSV)) { |
|
691
|
0 |
0 |
warn("EV::Kafka::Conn: on_connect callback error: %s", SvPV_nolen(ERRSV)); |
|
692
|
0 |
0 |
sv_setsv(ERRSV, &PL_sv_undef); |
|
694
|
0 |
1 |
FREETMPS; |
|
701
|
2 |
0 |
if (!self->on_disconnect) return; |
|
708
|
0 |
0 |
PUSHMARK(SP); |
|
711
|
0 |
0 |
if (SvTRUE(ERRSV)) { |
|
|
0 |
0 |
if (SvTRUE(ERRSV)) { |
|
712
|
0 |
0 |
warn("EV::Kafka::Conn: on_disconnect callback error: %s", SvPV_nolen(ERRSV)); |
|
713
|
0 |
0 |
sv_setsv(ERRSV, &PL_sv_undef); |
|
715
|
0 |
0 |
FREETMPS; |
|
723
|
0 |
0 |
if (!cb) return; |
|
730
|
0 |
0 |
PUSHMARK(SP); |
|
731
|
0 |
0 |
EXTEND(SP, 2); |
|
732
|
0 |
0 |
PUSHs(result ? result : &PL_sv_undef); |
|
733
|
0 |
0 |
PUSHs(error ? error : &PL_sv_undef); |
|
736
|
0 |
0 |
if (SvTRUE(ERRSV)) { |
|
|
0 |
0 |
if (SvTRUE(ERRSV)) { |
|
737
|
0 |
0 |
warn("EV::Kafka::Conn: callback error: %s", SvPV_nolen(ERRSV)); |
|
738
|
0 |
0 |
sv_setsv(ERRSV, &PL_sv_undef); |
|
740
|
0 |
0 |
FREETMPS; |
|
757
|
0 |
9 |
if (self->timing) { |
|
762
|
0 |
9 |
if (self->ssl) { |
|
766
|
0 |
9 |
if (self->ssl_ctx) { |
|
771
|
2 |
7 |
if (self->fd >= 0) { |
|
780
|
9 |
0 |
SV *err_sv = in_destruct ? NULL : newSVpv(err, 0); |
|
783
|
0 |
9 |
while (!ngx_queue_empty(&self->cb_queue)) { |
|
788
|
0 |
0 |
if (cbt->cb && !cbt->internal && !in_destruct) { |
|
|
0 |
0 |
if (cbt->cb && !cbt->internal && !in_destruct) { |
|
|
0 |
0 |
if (cbt->cb && !cbt->internal && !in_destruct) { |
|
790
|
0 |
0 |
if (conn_check_destroyed(self)) { |
|
797
|
0 |
0 |
if (cbt->cb) SvREFCNT_dec(cbt->cb); |
|
800
|
9 |
0 |
if (err_sv) SvREFCNT_dec(err_sv); |
|
807
|
0 |
2 |
if (conn_check_destroyed(self)) return; |
|
810
|
0 |
2 |
if (conn_check_destroyed(self)) return; |
|
812
|
1 |
1 |
if (!self->intentional_disconnect && self->auto_reconnect) { |
|
|
0 |
1 |
if (!self->intentional_disconnect && self->auto_reconnect) { |
|
828
|
0 |
0 |
if (self->magic != KF_MAGIC_ALIVE) return; |
|
829
|
0 |
0 |
if (self->state != CONN_DISCONNECTED) return; |
|
830
|
0 |
0 |
if (!self->host) return; |
|
836
|
0 |
0 |
if (self->reconnect_timing) return; |
|
838
|
0 |
0 |
if (delay < 0.01) delay = 1.0; |
|
864
|
1 |
0 |
int flexible = (api_key == API_API_VERSIONS && api_version >= 3); |
|
|
0 |
1 |
int flexible = (api_key == API_API_VERSIONS && api_version >= 3); |
|
870
|
0 |
1 |
if (flexible) { |
|
880
|
0 |
1 |
if (raw_size > (size_t)INT32_MAX) { |
|
887
|
0 |
1 |
if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) { |
|
|
0 |
0 |
if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) { |
|
889
|
0 |
0 |
if (self->wbuf_len > 0) |
|
909
|
1 |
0 |
if (!no_response) { |
|
916
|
0 |
1 |
if (cb) { |
|
951
|
64 |
1 |
for (i = 0; i < API_VERSIONS_MAX_KEY; i++) |
|
954
|
0 |
1 |
if (end - p < 2) goto err; |
|
956
|
0 |
1 |
if (error_code != 0) { |
|
960
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
966
|
0 |
1 |
if (end - p < 4) goto err; |
|
969
|
2 |
1 |
for (i = 0; i < count; i++) { |
|
970
|
0 |
2 |
if (end - p < 6) goto err; |
|
975
|
2 |
0 |
if (key >= 0 && key < API_VERSIONS_MAX_KEY) |
|
|
2 |
0 |
if (key >= 0 && key < API_VERSIONS_MAX_KEY) |
|
982
|
0 |
1 |
if (self->sasl_mechanism) { |
|
993
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1020
|
0 |
0 |
if (end - p < 2) goto err; |
|
1024
|
0 |
0 |
if (end - p < 4) goto err; |
|
1027
|
0 |
0 |
for (i = 0; i < count; i++) { |
|
1030
|
0 |
0 |
if (n < 0) goto err; |
|
1034
|
0 |
0 |
if (error_code != 0) { |
|
1036
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1047
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1060
|
0 |
0 |
if (self->sasl_mechanism && strcmp(self->sasl_mechanism, "PLAIN") == 0) { |
|
|
0 |
0 |
if (self->sasl_mechanism && strcmp(self->sasl_mechanism, "PLAIN") == 0) { |
|
1062
|
0 |
0 |
STRLEN ulen = self->sasl_username ? strlen(self->sasl_username) : 0; |
|
1063
|
0 |
0 |
STRLEN plen = self->sasl_password ? strlen(self->sasl_password) : 0; |
|
1068
|
0 |
0 |
if (ulen > 0) kf_buf_append(&body, self->sasl_username, ulen); |
|
1070
|
0 |
0 |
if (plen > 0) kf_buf_append(&body, self->sasl_password, plen); |
|
1073
|
0 |
0 |
else if (self->sasl_mechanism && |
|
1074
|
0 |
0 |
(strcmp(self->sasl_mechanism, "SCRAM-SHA-256") == 0 || |
|
1075
|
0 |
0 |
strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0)) { |
|
1082
|
0 |
0 |
for (i = 0; i < 16; i++) |
|
1088
|
0 |
0 |
if (self->scram_nonce) Safefree(self->scram_nonce); |
|
1092
|
0 |
0 |
if (!self->sasl_username) { |
|
1105
|
0 |
0 |
if (self->scram_client_first) Safefree(self->scram_client_first); |
|
1124
|
0 |
0 |
if (ver < 0) ver = 1; |
|
1125
|
0 |
0 |
if (ver > 2) ver = 2; |
|
1137
|
0 |
0 |
if (end - p < 2) goto err; |
|
1144
|
0 |
0 |
if (n < 0) goto err; |
|
1151
|
0 |
0 |
if (end - p >= 4) { |
|
1153
|
0 |
0 |
if (auth_data_len > 0 && end - p >= auth_data_len) { |
|
|
0 |
0 |
if (auth_data_len > 0 && end - p >= auth_data_len) { |
|
1159
|
0 |
0 |
if (error_code != 0) { |
|
1161
|
0 |
0 |
if (errmsg_str && errmsg_len > 0) |
|
|
0 |
0 |
if (errmsg_str && errmsg_len > 0) |
|
1166
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1173
|
0 |
0 |
if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) { |
|
|
0 |
0 |
if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) { |
|
|
0 |
0 |
if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) { |
|
1184
|
0 |
0 |
while (sp < se) { |
|
1185
|
0 |
0 |
if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') { |
|
|
0 |
0 |
if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') { |
|
|
0 |
0 |
if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') { |
|
1187
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
1189
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') { |
|
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') { |
|
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') { |
|
1191
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
1193
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') { |
|
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') { |
|
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') { |
|
1196
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
1198
|
0 |
0 |
if (sp < se && *sp == ',') sp++; |
|
|
0 |
0 |
if (sp < se && *sp == ',') sp++; |
|
1203
|
0 |
0 |
if (!server_nonce || !salt_b64 || iterations <= 0) { |
|
|
0 |
0 |
if (!server_nonce || !salt_b64 || iterations <= 0) { |
|
|
0 |
0 |
if (!server_nonce || !salt_b64 || iterations <= 0) { |
|
1205
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1211
|
0 |
0 |
if (server_nonce_len < 32 || |
|
1212
|
0 |
0 |
memcmp(server_nonce, self->scram_nonce, 32) != 0) { |
|
1214
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1220
|
0 |
0 |
const EVP_MD *md = is_sha512 ? EVP_sha512() : EVP_sha256(); |
|
1221
|
0 |
0 |
int digest_len = is_sha512 ? 64 : 32; |
|
1233
|
0 |
0 |
if (salt_len <= 0) { |
|
1235
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1292
|
0 |
0 |
for (di = 0; di < digest_len; di++) |
|
1296
|
0 |
0 |
if (self->scram_auth_message) Safefree(self->scram_auth_message); |
|
1313
|
0 |
0 |
int plen = bptr->length < 255 ? (int)bptr->length : 255; |
|
1338
|
0 |
0 |
if (ver < 0) ver = 1; |
|
1339
|
0 |
0 |
if (ver > 2) ver = 2; |
|
1347
|
0 |
0 |
if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FINAL) { |
|
|
0 |
0 |
if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FINAL) { |
|
1350
|
0 |
0 |
if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') { |
|
|
0 |
0 |
if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') { |
|
|
0 |
0 |
if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') { |
|
|
0 |
0 |
if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') { |
|
1352
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1361
|
0 |
0 |
if (strcmp(self->sasl_mechanism, "SCRAM-SHA-256") == 0) md = EVP_sha256(); |
|
1362
|
0 |
0 |
else if (strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0) md = EVP_sha512(); |
|
1363
|
0 |
0 |
if (!md || self->scram_server_key_len <= 0 || !self->scram_auth_message) { |
|
|
0 |
0 |
if (!md || self->scram_server_key_len <= 0 || !self->scram_auth_message) { |
|
|
0 |
0 |
if (!md || self->scram_server_key_len <= 0 || !self->scram_auth_message) { |
|
1365
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1388
|
0 |
0 |
&& (CRYPTO_memcmp(recv_sig, expected, expected_len) == 0); |
|
|
0 |
0 |
&& (CRYPTO_memcmp(recv_sig, expected, expected_len) == 0); |
|
1393
|
0 |
0 |
if (self->scram_auth_message) { |
|
1401
|
0 |
0 |
if (!ok) { |
|
1403
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1419
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1431
|
1 |
0 |
if (cbt->internal) { |
|
1448
|
0 |
0 |
if (!cbt->cb) return; |
|
1567
|
140 |
1 |
if (key) { |
|
1575
|
140 |
1 |
if (value) { |
|
1583
|
1 |
140 |
if (headers && HvUSEDKEYS(headers) > 0) { |
|
|
0 |
1 |
if (headers && HvUSEDKEYS(headers) > 0) { |
|
|
1 |
0 |
if (headers && HvUSEDKEYS(headers) > 0) { |
|
1584
|
0 |
1 |
kf_buf_append_varint(&rec, (int64_t)HvUSEDKEYS(headers)); |
|
1587
|
2 |
1 |
while ((entry = hv_iternext(headers))) { |
|
1619
|
141 |
16 |
for (i = 0; i < count; i++) { |
|
1621
|
141 |
0 |
if (!elem || !SvROK(*elem) || SvTYPE(SvRV(*elem)) != SVt_PVHV) |
|
|
141 |
0 |
if (!elem || !SvROK(*elem) || SvTYPE(SvRV(*elem)) != SVt_PVHV) |
|
|
0 |
141 |
if (!elem || !SvROK(*elem) || SvTYPE(SvRV(*elem)) != SVt_PVHV) |
|
1628
|
141 |
16 |
for (i = 0; i < count; i++) { |
|
1637
|
141 |
0 |
if (key_sv && SvOK(*key_sv)) key = SvPV(*key_sv, key_len); |
|
|
140 |
1 |
if (key_sv && SvOK(*key_sv)) key = SvPV(*key_sv, key_len); |
|
1638
|
141 |
0 |
if (val_sv && SvOK(*val_sv)) val = SvPV(*val_sv, val_len); |
|
|
140 |
1 |
if (val_sv && SvOK(*val_sv)) val = SvPV(*val_sv, val_len); |
|
1639
|
1 |
140 |
if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV) |
|
|
1 |
0 |
if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV) |
|
|
1 |
0 |
if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV) |
|
1648
|
0 |
16 |
if (is_transactional) attrs |= 0x10; /* bit 4 = isTransactional */ |
|
1677
|
2 |
14 |
if (compression == COMPRESS_GZIP) { |
|
1686
|
2 |
0 |
if (zinit == Z_OK) { |
|
1691
|
2 |
0 |
if (deflate(&zs, Z_FINISH) == Z_STREAM_END) zok = 1; |
|
1694
|
2 |
0 |
if (zok) { |
|
1785
|
0 |
2 |
if (flexible) { |
|
1787
|
0 |
0 |
if (end - p < 4) goto done; |
|
1793
|
0 |
0 |
if (n < 0) goto done; |
|
1798
|
0 |
0 |
for (i = 0; i < broker_count; i++) { |
|
1800
|
0 |
0 |
if (end - p < 4) goto done; |
|
1806
|
0 |
0 |
if (n < 0) goto done; |
|
1808
|
0 |
0 |
hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
|
0 |
0 |
hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
1810
|
0 |
0 |
if (end - p < 4) goto done; |
|
1817
|
0 |
0 |
if (n < 0) goto done; |
|
1822
|
0 |
0 |
if (n < 0) goto done; |
|
1831
|
0 |
0 |
if (n < 0) goto done; |
|
1835
|
0 |
0 |
if (end - p < 4) goto done; |
|
1841
|
0 |
0 |
if (n < 0) goto done; |
|
1845
|
0 |
0 |
for (i = 0; i < topic_count; i++) { |
|
1847
|
0 |
0 |
if (end - p < 2) goto done; |
|
1853
|
0 |
0 |
if (n < 0) goto done; |
|
1855
|
0 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
1858
|
0 |
0 |
if (version >= 10) { |
|
1859
|
0 |
0 |
if (end - p < 16) goto done; |
|
1864
|
0 |
0 |
if (end - p < 1) goto done; |
|
1869
|
0 |
0 |
if (n < 0) goto done; |
|
1875
|
0 |
0 |
for (j = 0; j < part_count; j++) { |
|
1877
|
0 |
0 |
if (end - p < 2) goto done; |
|
1881
|
0 |
0 |
if (end - p < 4) goto done; |
|
1885
|
0 |
0 |
if (end - p < 4) goto done; |
|
1890
|
0 |
0 |
if (version >= 7) { |
|
1891
|
0 |
0 |
if (end - p < 4) goto done; |
|
1897
|
0 |
0 |
if (n < 0) goto done; |
|
1900
|
0 |
0 |
if (rcount < 0 || rcount > 65536) goto done; |
|
|
0 |
0 |
if (rcount < 0 || rcount > 65536) goto done; |
|
1901
|
0 |
0 |
if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done; |
|
1906
|
0 |
0 |
if (n < 0) goto done; |
|
1909
|
0 |
0 |
if (rcount < 0 || rcount > 65536) goto done; |
|
|
0 |
0 |
if (rcount < 0 || rcount > 65536) goto done; |
|
1910
|
0 |
0 |
if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done; |
|
1914
|
0 |
0 |
if (version >= 5) { |
|
1916
|
0 |
0 |
if (n < 0) goto done; |
|
1919
|
0 |
0 |
if (end - p < (ptrdiff_t)(rcount * 4)) goto done; |
|
1925
|
0 |
0 |
if (n < 0) goto done; |
|
1933
|
0 |
0 |
if (version >= 8) { |
|
1934
|
0 |
0 |
if (end - p < 4) goto done; |
|
1940
|
0 |
0 |
if (n < 0) goto done; |
|
1948
|
0 |
2 |
if (version >= 3) { |
|
1949
|
0 |
0 |
if (end - p < 4) goto done; |
|
1954
|
0 |
2 |
if (end - p < 4) goto done; |
|
1956
|
2 |
0 |
if (broker_count < 0 || broker_count > 65536) goto done; |
|
|
0 |
2 |
if (broker_count < 0 || broker_count > 65536) goto done; |
|
1958
|
2 |
1 |
for (i = 0; i < broker_count; i++) { |
|
1960
|
1 |
1 |
if (end - p < 4) goto done; |
|
1966
|
0 |
1 |
if (n < 0) goto done; |
|
1968
|
1 |
0 |
hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
|
1 |
0 |
hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
1970
|
0 |
1 |
if (end - p < 4) goto done; |
|
1975
|
0 |
1 |
if (version >= 1) { |
|
1978
|
0 |
0 |
if (n < 0) goto done; |
|
1986
|
0 |
1 |
if (version >= 2) { |
|
1989
|
0 |
0 |
if (n < 0) goto done; |
|
1994
|
0 |
1 |
if (version >= 1) { |
|
1995
|
0 |
0 |
if (end - p < 4) goto done; |
|
2001
|
0 |
1 |
if (end - p < 4) goto done; |
|
2003
|
1 |
0 |
if (topic_count < 0 || topic_count > 1000000) goto done; |
|
|
0 |
1 |
if (topic_count < 0 || topic_count > 1000000) goto done; |
|
2004
|
1 |
1 |
for (i = 0; i < topic_count; i++) { |
|
2006
|
0 |
1 |
if (end - p < 2) goto done; |
|
2012
|
0 |
1 |
if (n < 0) goto done; |
|
2014
|
1 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
1 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2017
|
0 |
1 |
if (version >= 1) { |
|
2018
|
0 |
0 |
if (end - p < 1) goto done; |
|
2023
|
0 |
1 |
if (end - p < 4) goto done; |
|
2025
|
1 |
0 |
if (part_count < 0 || part_count > 1000000) goto done; |
|
|
0 |
1 |
if (part_count < 0 || part_count > 1000000) goto done; |
|
2028
|
1 |
1 |
for (j = 0; j < part_count; j++) { |
|
2030
|
0 |
1 |
if (end - p < 2) goto done; |
|
2034
|
0 |
1 |
if (end - p < 4) goto done; |
|
2038
|
0 |
1 |
if (end - p < 4) goto done; |
|
2043
|
0 |
1 |
if (version >= 7) { |
|
2044
|
0 |
0 |
if (end - p < 4) goto done; |
|
2049
|
0 |
1 |
if (end - p < 4) goto done; |
|
2051
|
1 |
0 |
if (rcount < 0 || rcount > 65536) goto done; |
|
|
0 |
1 |
if (rcount < 0 || rcount > 65536) goto done; |
|
2052
|
0 |
1 |
if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done; |
|
2056
|
0 |
1 |
if (end - p < 4) goto done; |
|
2058
|
1 |
0 |
if (rcount < 0 || rcount > 65536) goto done; |
|
|
0 |
1 |
if (rcount < 0 || rcount > 65536) goto done; |
|
2059
|
0 |
1 |
if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done; |
|
2063
|
0 |
1 |
if (version >= 5) { |
|
2064
|
0 |
0 |
if (end - p < 4) goto done; |
|
2066
|
0 |
0 |
if (end - p < (ptrdiff_t)(rcount * 4)) goto done; |
|
2097
|
0 |
1 |
if (end - p < 4) goto done; |
|
2101
|
1 |
1 |
for (i = 0; i < topic_count; i++) { |
|
2105
|
0 |
1 |
if (n < 0) goto done; |
|
2107
|
1 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
1 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2110
|
0 |
1 |
if (end - p < 4) goto done; |
|
2115
|
1 |
1 |
for (j = 0; j < part_count; j++) { |
|
2117
|
0 |
1 |
if (end - p < 4) goto done; |
|
2121
|
0 |
1 |
if (end - p < 2) goto done; |
|
2125
|
0 |
1 |
if (end - p < 8) goto done; |
|
2130
|
0 |
1 |
if (version >= 2) { |
|
2131
|
0 |
0 |
if (end - p < 8) goto done; |
|
2136
|
0 |
1 |
if (version >= 5) { |
|
2137
|
0 |
0 |
if (end - p < 8) goto done; |
|
2148
|
1 |
0 |
if (version >= 1 && end - p >= 4) { |
|
|
0 |
0 |
if (version >= 1 && end - p >= 4) { |
|
2171
|
0 |
16 |
if (end - p < 12) return -1; |
|
2173
|
16 |
0 |
if (out_base_offset) *out_base_offset = base_offset; |
|
2176
|
1 |
15 |
if (end - p < batch_length) return -1; |
|
2179
|
0 |
15 |
if (batch_end - p < 9) return -1; |
|
2182
|
1 |
14 |
if (magic != 2) return -1; /* only support magic=2 (current format) */ |
|
2185
|
1 |
13 |
if (crc32c(p, (size_t)(batch_end - p)) != expected_crc) return -1; |
|
2187
|
0 |
13 |
if (batch_end - p < 36) return -1; |
|
2197
|
0 |
13 |
if (batch_end - p < 4) return -1; |
|
2205
|
7 |
6 |
if (compression_type != COMPRESS_NONE && batch_end > p) { |
|
|
7 |
0 |
if (compression_type != COMPRESS_NONE && batch_end > p) { |
|
2208
|
6 |
1 |
if (decomp_cap < 4096) decomp_cap = 4096; |
|
2211
|
1 |
6 |
if (compression_type == COMPRESS_GZIP) { |
|
2213
|
1 |
1 |
while (!zok && decomp_cap < 64 * 1024 * 1024) { |
|
|
1 |
0 |
while (!zok && decomp_cap < 64 * 1024 * 1024) { |
|
2218
|
0 |
1 |
if (zinit != Z_OK) { |
|
2230
|
1 |
0 |
if (zret == Z_STREAM_END) { |
|
2234
|
0 |
0 |
} else if (zret == Z_BUF_ERROR || zret == Z_OK) { |
|
|
0 |
0 |
} else if (zret == Z_BUF_ERROR || zret == Z_OK) { |
|
2313
|
138 |
13 |
for (i = 0; i < record_count; i++) { |
|
2316
|
0 |
138 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2318
|
0 |
138 |
if (rec_end - rp < rec_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (rec_end - rp < rec_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
2321
|
0 |
138 |
if (this_rec_end - rp < 1) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (this_rec_end - rp < 1) { if (decompressed) Safefree(decompressed); return -1; } |
|
2326
|
0 |
138 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2331
|
0 |
138 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2337
|
0 |
138 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2340
|
137 |
1 |
if (key_len >= 0) { |
|
2341
|
0 |
137 |
if (this_rec_end - rp < key_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (this_rec_end - rp < key_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
2349
|
0 |
138 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2352
|
137 |
1 |
if (val_len >= 0) { |
|
2353
|
0 |
137 |
if (this_rec_end - rp < val_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (this_rec_end - rp < val_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
2361
|
0 |
138 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2364
|
1 |
137 |
if (hdr_count > 0) { |
|
2367
|
2 |
1 |
for (h = 0; h < hdr_count; h++) { |
|
2370
|
0 |
2 |
if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
2373
|
0 |
2 |
if (this_rec_end - rp < hk_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (this_rec_end - rp < hk_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
2378
|
0 |
2 |
if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
2381
|
2 |
0 |
if (hv_len >= 0) { |
|
2382
|
0 |
2 |
if (this_rec_end - rp < hv_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (this_rec_end - rp < hv_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
2386
|
2 |
0 |
hv_store(hdr_hv, hk_data, (I32)hk_len, |
|
2394
|
137 |
1 |
hv_store(rec_hv, "key", 3, |
|
2396
|
137 |
1 |
hv_store(rec_hv, "value", 5, |
|
2398
|
1 |
137 |
if (hdr_hv) |
|
2406
|
1 |
12 |
if (decompressed) Safefree(decompressed); |
|
2423
|
0 |
0 |
if (version >= 1) { |
|
2424
|
0 |
0 |
if (end - p < 4) goto done; |
|
2430
|
0 |
0 |
if (version >= 7) { |
|
2431
|
0 |
0 |
if (end - p < 2) goto done; |
|
2436
|
0 |
0 |
if (version >= 7) { |
|
2437
|
0 |
0 |
if (end - p < 4) goto done; |
|
2442
|
0 |
0 |
if (end - p < 4) goto done; |
|
2446
|
0 |
0 |
for (i = 0; i < topic_count; i++) { |
|
2451
|
0 |
0 |
if (n < 0) goto done; |
|
2453
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2456
|
0 |
0 |
if (end - p < 4) goto done; |
|
2461
|
0 |
0 |
for (j = 0; j < part_count; j++) { |
|
2464
|
0 |
0 |
if (end - p < 4) goto done; |
|
2468
|
0 |
0 |
if (end - p < 2) goto done; |
|
2472
|
0 |
0 |
if (end - p < 8) goto done; |
|
2477
|
0 |
0 |
if (version >= 4) { |
|
2478
|
0 |
0 |
if (end - p < 8) goto done; |
|
2484
|
0 |
0 |
if (version >= 5) { |
|
2485
|
0 |
0 |
if (end - p < 8) goto done; |
|
2490
|
0 |
0 |
if (version >= 4) { |
|
2491
|
0 |
0 |
if (end - p < 4) goto done; |
|
2494
|
0 |
0 |
for (at = 0; at < at_count; at++) { |
|
2495
|
0 |
0 |
if (end - p < 16) goto done; |
|
2501
|
0 |
0 |
if (end - p < 4) goto done; |
|
2505
|
0 |
0 |
if (records_size > 0 && end - p >= records_size) { |
|
|
0 |
0 |
if (records_size > 0 && end - p >= records_size) { |
|
2510
|
0 |
0 |
while (rp < rend && rend - rp >= 12) { |
|
|
0 |
0 |
while (rp < rend && rend - rp >= 12) { |
|
2513
|
0 |
0 |
if (bl < 0 || rend - rp < 12 + bl) break; |
|
|
0 |
0 |
if (bl < 0 || rend - rp < 12 + bl) break; |
|
2519
|
0 |
0 |
} else if (records_size > 0) { |
|
2549
|
0 |
0 |
if (version >= 2) { |
|
2550
|
0 |
0 |
if (end - p < 4) goto done; |
|
2555
|
0 |
0 |
if (end - p < 4) goto done; |
|
2559
|
0 |
0 |
for (i = 0; i < topic_count; i++) { |
|
2563
|
0 |
0 |
if (n < 0) goto done; |
|
2565
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2567
|
0 |
0 |
if (end - p < 4) goto done; |
|
2572
|
0 |
0 |
for (j = 0; j < part_count; j++) { |
|
2574
|
0 |
0 |
if (end - p < 4) goto done; |
|
2578
|
0 |
0 |
if (end - p < 2) goto done; |
|
2582
|
0 |
0 |
if (version >= 1) { |
|
2583
|
0 |
0 |
if (end - p < 8) goto done; |
|
2588
|
0 |
0 |
if (end - p < 8) goto done; |
|
2593
|
0 |
0 |
if (version >= 4) { |
|
2594
|
0 |
0 |
if (end - p < 4) goto done; |
|
2620
|
0 |
1 |
if (version >= 1) { |
|
2621
|
0 |
0 |
if (end - p < 4) goto done; |
|
2625
|
0 |
1 |
if (end - p < 2) goto done; |
|
2630
|
0 |
1 |
if (version >= 1) { |
|
2633
|
0 |
0 |
if (n < 0) goto done; |
|
2635
|
0 |
0 |
if (emsg && elen > 0) |
|
|
0 |
0 |
if (emsg && elen > 0) |
|
2639
|
0 |
1 |
if (end - p < 4) goto done; |
|
2645
|
0 |
1 |
if (n < 0) goto done; |
|
2647
|
1 |
0 |
hv_store(result, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
|
1 |
0 |
hv_store(result, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
2649
|
0 |
1 |
if (end - p < 4) goto done; |
|
2668
|
0 |
1 |
if (version >= 2) { |
|
2669
|
0 |
0 |
if (end - p < 4) goto done; |
|
2673
|
0 |
1 |
if (end - p < 2) goto done; |
|
2677
|
0 |
1 |
if (end - p < 4) goto done; |
|
2685
|
0 |
1 |
if (n < 0) goto done; |
|
2687
|
1 |
0 |
if (proto) |
|
2692
|
0 |
1 |
if (n < 0) goto done; |
|
2694
|
1 |
0 |
if (leader) |
|
2701
|
0 |
1 |
if (n < 0) goto done; |
|
2703
|
1 |
0 |
if (member_id) |
|
2707
|
0 |
1 |
if (end - p < 4) goto done; |
|
2711
|
1 |
1 |
for (i = 0; i < mcount; i++) { |
|
2716
|
0 |
1 |
if (n < 0) goto done; |
|
2718
|
1 |
0 |
if (mid) |
|
2722
|
0 |
1 |
if (version >= 5) { |
|
2725
|
0 |
0 |
if (n < 0) goto done; |
|
2730
|
0 |
1 |
if (end - p < 4) goto done; |
|
2732
|
0 |
1 |
if (mdlen > 0) { |
|
2733
|
0 |
0 |
if (end - p < mdlen) goto done; |
|
2756
|
0 |
1 |
if (version >= 1) { |
|
2757
|
0 |
0 |
if (end - p < 4) goto done; |
|
2761
|
0 |
1 |
if (end - p < 2) goto done; |
|
2766
|
0 |
1 |
if (end - p < 4) goto done; |
|
2768
|
0 |
1 |
if (alen > 0 && end - p >= alen) { |
|
|
0 |
1 |
if (alen > 0 && end - p >= alen) { |
|
2786
|
0 |
1 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2787
|
1 |
0 |
if (end - p >= 2) { |
|
2806
|
0 |
0 |
if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2808
|
0 |
0 |
if (end - p < 4) goto done; |
|
2811
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
2815
|
0 |
0 |
if (n < 0) goto done; |
|
2817
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2819
|
0 |
0 |
if (end - p < 4) goto done; |
|
2823
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
2825
|
0 |
0 |
if (end - p < 6) goto done; |
|
2852
|
0 |
1 |
if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2854
|
0 |
1 |
if (end - p < 4) goto done; |
|
2857
|
1 |
1 |
for (i = 0; i < tc; i++) { |
|
2861
|
0 |
1 |
if (n < 0) goto done; |
|
2863
|
1 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
1 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2865
|
0 |
1 |
if (end - p < 4) goto done; |
|
2869
|
1 |
1 |
for (j = 0; j < pc; j++) { |
|
2871
|
0 |
1 |
if (end - p < 4) goto done; |
|
2875
|
0 |
1 |
if (end - p < 8) goto done; |
|
2880
|
0 |
1 |
if (version >= 5 && end - p >= 4) p += 4; |
|
|
0 |
0 |
if (version >= 5 && end - p >= 4) p += 4; |
|
2884
|
0 |
1 |
if (n < 0) goto done; |
|
2887
|
0 |
1 |
if (end - p < 2) goto done; |
|
2898
|
1 |
0 |
if (version >= 2 && end - p >= 2) { |
|
|
0 |
0 |
if (version >= 2 && end - p >= 2) { |
|
2917
|
0 |
1 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2918
|
1 |
0 |
if (end - p >= 2) { |
|
2937
|
0 |
1 |
if (version >= 2 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 2 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2939
|
0 |
1 |
if (end - p < 4) goto done; |
|
2942
|
1 |
1 |
for (i = 0; i < tc; i++) { |
|
2946
|
0 |
1 |
if (n < 0) goto done; |
|
2948
|
1 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
1 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2950
|
0 |
1 |
if (end - p < 2) goto done; |
|
2955
|
0 |
1 |
if (version >= 1) { |
|
2958
|
0 |
0 |
if (n < 0) goto done; |
|
2960
|
0 |
0 |
if (emsg && elen > 0) |
|
|
0 |
0 |
if (emsg && elen > 0) |
|
2983
|
0 |
1 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2985
|
0 |
1 |
if (end - p < 4) goto done; |
|
2988
|
1 |
1 |
for (i = 0; i < tc; i++) { |
|
2992
|
0 |
1 |
if (n < 0) goto done; |
|
2994
|
1 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
1 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2996
|
0 |
1 |
if (end - p < 2) goto done; |
|
3017
|
1 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
1 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
3019
|
0 |
1 |
if (end - p < 2) goto done; |
|
3023
|
0 |
1 |
if (end - p < 8) goto done; |
|
3027
|
0 |
1 |
if (end - p < 2) goto done; |
|
3046
|
0 |
1 |
if (end - p < 4) goto done; |
|
3049
|
0 |
1 |
if (end - p < 4) goto done; |
|
3052
|
1 |
1 |
for (i = 0; i < tc; i++) { |
|
3056
|
0 |
1 |
if (n < 0) goto done; |
|
3058
|
1 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
1 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
3060
|
0 |
1 |
if (end - p < 4) goto done; |
|
3064
|
1 |
1 |
for (j = 0; j < pc; j++) { |
|
3066
|
0 |
1 |
if (end - p < 6) goto done; |
|
3091
|
1 |
0 |
if (end - p >= 4) p += 4; /* throttle_time_ms */ |
|
3092
|
1 |
0 |
if (end - p >= 2) { |
|
3111
|
0 |
1 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
3113
|
0 |
1 |
if (end - p < 4) goto done; |
|
3116
|
0 |
1 |
for (i = 0; i < tc; i++) { |
|
3120
|
0 |
0 |
if (n < 0) goto done; |
|
3122
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
3124
|
0 |
0 |
if (end - p < 4) goto done; |
|
3128
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
3130
|
0 |
0 |
if (end - p < 6) goto done; |
|
3151
|
1 |
1 |
while (self->rbuf_len >= 4) { |
|
3153
|
1 |
0 |
if (msg_size < 0 || msg_size > 256 * 1024 * 1024) { |
|
|
0 |
1 |
if (msg_size < 0 || msg_size > 256 * 1024 * 1024) { |
|
3155
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3160
|
0 |
1 |
if (self->rbuf_len < (size_t)(4 + msg_size)) |
|
3166
|
0 |
1 |
if (msg_size < 4) { |
|
3168
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3179
|
1 |
0 |
if (!ngx_queue_empty(&self->cb_queue)) { |
|
3182
|
0 |
1 |
if (cbt->correlation_id != corr_id) { |
|
3189
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3199
|
1 |
0 |
if (cbt) { |
|
3201
|
0 |
1 |
if (cbt->cb) SvREFCNT_dec(cbt->cb); |
|
3203
|
0 |
1 |
if (conn_check_destroyed(self)) return; |
|
3208
|
1 |
0 |
if (self->rbuf_len > 0) |
|
3222
|
4 |
0 |
if (!self || self->magic != KF_MAGIC_ALIVE) return; |
|
|
0 |
4 |
if (!self || self->magic != KF_MAGIC_ALIVE) return; |
|
3225
|
2 |
2 |
if (self->state == CONN_CONNECTING) { |
|
3229
|
2 |
0 |
if (self->timing) { |
|
3235
|
0 |
2 |
if (getsockopt(self->fd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) |
|
3237
|
1 |
1 |
if (err != 0) { |
|
3241
|
0 |
1 |
if (conn_check_destroyed(self)) return; |
|
3252
|
0 |
2 |
if (self->state == CONN_TLS_HANDSHAKE) { |
|
3255
|
0 |
0 |
if (ret == 1) { |
|
3265
|
0 |
0 |
if (err == SSL_ERROR_WANT_READ) { |
|
3268
|
0 |
0 |
} else if (err == SSL_ERROR_WANT_WRITE) { |
|
3274
|
0 |
0 |
if (e) { |
|
3282
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3290
|
1 |
1 |
if (revents & EV_WRITE) { |
|
3291
|
1 |
1 |
while (self->wbuf_off < self->wbuf_len) { |
|
3294
|
0 |
1 |
if (n < 0) { |
|
3295
|
0 |
0 |
if (errno == EAGAIN || errno == EWOULDBLOCK) break; |
|
|
0 |
0 |
if (errno == EAGAIN || errno == EWOULDBLOCK) break; |
|
3297
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3301
|
0 |
1 |
if (n == 0) { |
|
3308
|
1 |
0 |
if (self->wbuf_off >= self->wbuf_len) { |
|
3316
|
1 |
1 |
if (revents & EV_READ) { |
|
3320
|
0 |
1 |
if (n < 0) { |
|
3321
|
0 |
0 |
if (errno == EAGAIN || errno == EWOULDBLOCK) return; |
|
|
0 |
0 |
if (errno == EAGAIN || errno == EWOULDBLOCK) return; |
|
3323
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3327
|
0 |
1 |
if (n == 0) { |
|
3348
|
0 |
0 |
if (self->magic != KF_MAGIC_ALIVE) return; |
|
3351
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3372
|
0 |
1 |
if (self->tls_enabled) { |
|
3374
|
0 |
0 |
if (!self->ssl_ctx) { |
|
3376
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3381
|
0 |
0 |
if (self->tls_skip_verify) |
|
3386
|
0 |
0 |
if (self->tls_ca_file) { |
|
3387
|
0 |
0 |
if (SSL_CTX_load_verify_locations(self->ssl_ctx, self->tls_ca_file, NULL) != 1) { |
|
3389
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3396
|
0 |
0 |
if (!self->ssl) { |
|
3398
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3404
|
0 |
0 |
if (!is_ip_literal(self->host)) |
|
3407
|
0 |
0 |
if (!self->tls_skip_verify) { |
|
3410
|
0 |
0 |
if (is_ip_literal(self->host)) |
|
3419
|
0 |
0 |
if (ret == 1) { |
|
3426
|
0 |
0 |
if (err == SSL_ERROR_WANT_READ) { |
|
3428
|
0 |
0 |
} else if (err == SSL_ERROR_WANT_WRITE) { |
|
3433
|
0 |
0 |
if (e) { |
|
3441
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3460
|
0 |
2 |
if (self->state != CONN_DISCONNECTED) { |
|
3465
|
2 |
0 |
if (host != self->host) { |
|
3466
|
0 |
2 |
if (self->host) Safefree(self->host); |
|
3484
|
0 |
2 |
if (gai_err != 0) { |
|
3488
|
0 |
2 |
if (gai_err != 0) { |
|
3495
|
2 |
0 |
for (rp = res; rp; rp = rp->ai_next) { |
|
3497
|
0 |
2 |
if (fd < 0) continue; |
|
3509
|
0 |
2 |
if (ret == 0) { |
|
3524
|
2 |
0 |
if (errno == EINPROGRESS) { |
|
3536
|
2 |
0 |
if (timeout > 0) { |
|
3567
|
0 |
7 |
if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop")) |
|
|
0 |
0 |
if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop")) |
|
3596
|
448 |
7 |
for (i = 0; i < API_VERSIONS_MAX_KEY; i++) |
|
3611
|
0 |
7 |
if (self->magic != KF_MAGIC_ALIVE) return; |
|
3619
|
0 |
7 |
if (self->reconnect_timing) { |
|
3624
|
2 |
5 |
CLEAR_HANDLER(self->on_error); |
|
3625
|
1 |
6 |
CLEAR_HANDLER(self->on_connect); |
|
3626
|
0 |
7 |
CLEAR_HANDLER(self->on_disconnect); |
|
3628
|
2 |
5 |
if (self->host) Safefree(self->host); |
|
3629
|
7 |
0 |
if (self->client_id) Safefree(self->client_id); |
|
3630
|
0 |
7 |
if (self->sasl_mechanism) Safefree(self->sasl_mechanism); |
|
3631
|
0 |
7 |
if (self->sasl_username) Safefree(self->sasl_username); |
|
3632
|
0 |
7 |
if (self->sasl_password) Safefree(self->sasl_password); |
|
3633
|
0 |
7 |
if (self->scram_nonce) Safefree(self->scram_nonce); |
|
3634
|
0 |
7 |
if (self->scram_client_first) Safefree(self->scram_client_first); |
|
3637
|
0 |
7 |
if (self->scram_auth_message) { |
|
3642
|
0 |
7 |
if (self->tls_ca_file) Safefree(self->tls_ca_file); |
|
3643
|
7 |
0 |
if (self->rbuf) Safefree(self->rbuf); |
|
3644
|
7 |
0 |
if (self->wbuf) Safefree(self->wbuf); |
|
3661
|
0 |
1 |
if (self->reconnect_timing) { |
|
3671
|
0 |
3 |
RETVAL = (self->state == CONN_READY) ? 1 : 0; |
|
3678
|
0 |
0 |
RETVAL = self->state; |
|
3685
|
0 |
0 |
RETVAL = self->pending_count; |
|
3693
|
0 |
2 |
CLEAR_HANDLER(self->on_error); |
|
3694
|
2 |
0 |
if (cb && SvOK(cb)) { |
|
|
2 |
0 |
if (cb && SvOK(cb)) { |
|
3703
|
0 |
1 |
CLEAR_HANDLER(self->on_connect); |
|
3704
|
1 |
0 |
if (cb && SvOK(cb)) { |
|
|
1 |
0 |
if (cb && SvOK(cb)) { |
|
3713
|
0 |
0 |
CLEAR_HANDLER(self->on_disconnect); |
|
3714
|
0 |
0 |
if (cb && SvOK(cb)) { |
|
|
0 |
0 |
if (cb && SvOK(cb)) { |
|
3723
|
0 |
0 |
if (id) { |
|
3724
|
0 |
0 |
if (self->client_id) Safefree(self->client_id); |
|
3735
|
0 |
0 |
if (self->tls_ca_file) { Safefree(self->tls_ca_file); self->tls_ca_file = NULL; } |
|
3736
|
0 |
0 |
if (ca_file) self->tls_ca_file = savepv(ca_file); |
|
3744
|
0 |
0 |
if (self->sasl_mechanism) { Safefree(self->sasl_mechanism); self->sasl_mechanism = NULL; } |
|
3745
|
0 |
0 |
if (self->sasl_username) { Safefree(self->sasl_username); self->sasl_username = NULL; } |
|
3746
|
0 |
0 |
if (self->sasl_password) { Safefree(self->sasl_password); self->sasl_password = NULL; } |
|
3747
|
0 |
0 |
if (SvOK(ST(1))) { |
|
3749
|
0 |
0 |
if (username) self->sasl_username = savepv(username); |
|
3750
|
0 |
0 |
if (password) self->sasl_password = savepv(password); |
|
3766
|
1 |
0 |
if (self->state != CONN_READY) |
|
3774
|
0 |
0 |
if (ver < 0) ver = 1; |
|
3775
|
0 |
0 |
if (ver > 4) ver = 4; |
|
3777
|
0 |
0 |
if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) { |
|
|
0 |
0 |
if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) { |
|
|
0 |
0 |
if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) { |
|
3781
|
0 |
0 |
for (i = 0; i < count; i++) { |
|
3792
|
0 |
0 |
if (ver >= 4) |
|
3803
|
0 |
0 |
if (!self->api_versions_known) |
|
3808
|
0 |
0 |
for (i = 0; i < API_VERSIONS_MAX_KEY; i++) { |
|
3809
|
0 |
0 |
if (self->api_versions[i] >= 0) { |
|
3815
|
0 |
0 |
EXTEND(SP, 1); |
|
3824
|
1 |
0 |
if (self->state != CONN_READY) |
|
3831
|
0 |
0 |
if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) { |
|
|
0 |
0 |
if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) { |
|
|
0 |
0 |
if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) { |
|
3841
|
0 |
0 |
if (opts) { |
|
3843
|
0 |
0 |
if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v)) |
|
|
0 |
0 |
if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v)) |
|
3845
|
0 |
0 |
if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v)) |
|
|
0 |
0 |
if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v)) |
|
3847
|
0 |
0 |
if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v)) |
|
|
0 |
0 |
if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v)) |
|
3855
|
0 |
0 |
if (ver < 0) ver = 4; |
|
3856
|
0 |
0 |
if (ver > 7) ver = 7; |
|
3866
|
0 |
0 |
if (ver >= 3) |
|
3870
|
0 |
0 |
if (ver >= 4) |
|
3874
|
0 |
0 |
if (ver >= 7) { |
|
3891
|
0 |
0 |
if (ver >= 5) |
|
3898
|
0 |
0 |
if (ver >= 7) |
|
3909
|
0 |
0 |
if (self->state != CONN_READY) |
|
3914
|
0 |
0 |
if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVHV) |
|
|
0 |
0 |
if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVHV) |
|
3920
|
0 |
0 |
if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) { |
|
|
0 |
0 |
if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) { |
|
|
0 |
0 |
if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) { |
|
3930
|
0 |
0 |
if (opts) { |
|
3932
|
0 |
0 |
if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v)) |
|
|
0 |
0 |
if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v)) |
|
3934
|
0 |
0 |
if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v)) |
|
|
0 |
0 |
if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v)) |
|
3936
|
0 |
0 |
if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v)) |
|
|
0 |
0 |
if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v)) |
|
3941
|
0 |
0 |
if (ver < 0) ver = 4; |
|
3942
|
0 |
0 |
if (ver > 7) ver = 7; |
|
3950
|
0 |
0 |
if (ver >= 3) |
|
3952
|
0 |
0 |
if (ver >= 4) |
|
3954
|
0 |
0 |
if (ver >= 7) { |
|
3960
|
0 |
0 |
kf_buf_append_i32(&body, (int32_t)HvUSEDKEYS(topics_hv)); |
|
3964
|
0 |
0 |
while ((entry = hv_iternext(topics_hv))) { |
|
3970
|
0 |
0 |
if (!SvROK(parts_sv) || SvTYPE(SvRV(parts_sv)) != SVt_PVAV) |
|
|
0 |
0 |
if (!SvROK(parts_sv) || SvTYPE(SvRV(parts_sv)) != SVt_PVAV) |
|
3976
|
0 |
0 |
for (i = 0; i < pc; i++) { |
|
3978
|
0 |
0 |
if (!elem || !SvROK(*elem)) |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) |
|
3983
|
0 |
0 |
int32_t pid = pid_sv ? (int32_t)SvIV(*pid_sv) : 0; |
|
3987
|
0 |
0 |
int64_t offset = off_sv ? (int64_t)SvIV(*off_sv) : 0; |
|
3990
|
0 |
0 |
if (ver >= 5) |
|
3997
|
0 |
0 |
if (ver >= 7) |
|
4008
|
0 |
0 |
if (self->state != CONN_READY) |
|
4011
|
0 |
0 |
if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV) |
|
|
0 |
0 |
if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV) |
|
4023
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
4026
|
0 |
0 |
if ((tmp = hv_fetch(opts, "acks", 4, 0))) |
|
4028
|
0 |
0 |
if ((tmp = hv_fetch(opts, "transactional_id", 16, 0)) && SvOK(*tmp)) |
|
|
0 |
0 |
if ((tmp = hv_fetch(opts, "transactional_id", 16, 0)) && SvOK(*tmp)) |
|
4030
|
0 |
0 |
if ((tmp = hv_fetch(opts, "compression", 11, 0))) { |
|
4033
|
0 |
0 |
if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE; |
|
|
0 |
0 |
if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE; |
|
4038
|
0 |
0 |
else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP; |
|
|
0 |
0 |
else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP; |
|
4048
|
0 |
0 |
if ((tmp = hv_fetch(opts, "producer_id", 11, 0))) |
|
4050
|
0 |
0 |
if ((tmp = hv_fetch(opts, "producer_epoch", 14, 0))) |
|
4052
|
0 |
0 |
if ((tmp = hv_fetch(opts, "base_sequence", 13, 0))) |
|
4054
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
4069
|
0 |
0 |
if (ver < 0) ver = 3; |
|
4070
|
0 |
0 |
if (ver > 7) ver = 7; |
|
4077
|
0 |
0 |
if (ver >= 3) |
|
4078
|
0 |
0 |
kf_buf_append_nullable_string(&body, txn_id, txn_id ? (int16_t)txn_id_len : 0); |
|
4090
|
0 |
0 |
conn_send_request(aTHX_ self, API_PRODUCE, ver, &body, |
|
4091
|
0 |
0 |
(cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0); |
|
4101
|
0 |
0 |
if (self->state != CONN_READY) |
|
4109
|
0 |
0 |
if (ver < 0) ver = 1; |
|
4110
|
0 |
0 |
if (ver > 5) ver = 5; |
|
4118
|
0 |
0 |
if (ver >= 2) |
|
4130
|
0 |
0 |
if (ver >= 4) |
|
4143
|
1 |
0 |
if (self->state != CONN_READY) |
|
4153
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
4156
|
0 |
0 |
if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV) |
|
|
0 |
0 |
if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV) |
|
|
0 |
0 |
if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV) |
|
4158
|
0 |
0 |
if ((tmp = hv_fetch(opts, "acks", 4, 0))) |
|
4160
|
0 |
0 |
if ((tmp = hv_fetch(opts, "compression", 11, 0))) { |
|
4163
|
0 |
0 |
if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE; |
|
|
0 |
0 |
if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE; |
|
4168
|
0 |
0 |
else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP; |
|
|
0 |
0 |
else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP; |
|
4178
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
4186
|
0 |
0 |
if (SvOK(key_sv)) |
|
4191
|
0 |
0 |
if (SvOK(value_sv)) |
|
4198
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) |
|
4200
|
0 |
0 |
if (ts_tmp && SvOK(*ts_tmp)) { |
|
|
0 |
0 |
if (ts_tmp && SvOK(*ts_tmp)) { |
|
4214
|
0 |
0 |
if (key) hv_stores(rec, "key", newSVpvn(key, key_len)); |
|
4215
|
0 |
0 |
if (value) hv_stores(rec, "value", newSVpvn(value, value_len)); |
|
4216
|
0 |
0 |
if (headers) hv_stores(rec, "headers", newRV_inc((SV*)headers)); |
|
4226
|
0 |
0 |
if (ver < 0) ver = 3; |
|
4227
|
0 |
0 |
if (ver > 7) ver = 7; |
|
4235
|
0 |
0 |
if (ver >= 3) |
|
4253
|
0 |
0 |
conn_send_request(aTHX_ self, API_PRODUCE, ver, &body, |
|
4254
|
0 |
0 |
(cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0); |
|
4264
|
0 |
0 |
if (self->state != CONN_READY) |
|
4268
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4269
|
0 |
0 |
if (ver > 2) ver = 2; |
|
4278
|
0 |
0 |
if (ver >= 1) |
|
4289
|
0 |
0 |
if (self->state != CONN_READY) |
|
4293
|
0 |
0 |
if (ver < 0) ver = 1; |
|
4294
|
0 |
0 |
if (ver > 5) ver = 5; |
|
4304
|
0 |
0 |
if (ver >= 1) |
|
4311
|
0 |
0 |
if (ver >= 5) { |
|
4312
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
4339
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4358
|
0 |
0 |
if (self->state != CONN_READY) |
|
4362
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4363
|
0 |
0 |
if (ver > 3) ver = 3; |
|
4376
|
0 |
0 |
if (ver >= 3) { |
|
4377
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
4387
|
0 |
0 |
if (SvOK(assignments_sv) && SvROK(assignments_sv) |
|
|
0 |
0 |
if (SvOK(assignments_sv) && SvROK(assignments_sv) |
|
4388
|
0 |
0 |
&& SvTYPE(SvRV(assignments_sv)) == SVt_PVAV) { |
|
4393
|
0 |
0 |
for (i = 0; i < ac; i++) { |
|
4395
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
4399
|
0 |
0 |
if (!mid_sv) continue; |
|
4405
|
0 |
0 |
if (!data_sv) { kf_buf_append_bytes(&body, NULL, 0); continue; } |
|
4422
|
0 |
0 |
if (self->state != CONN_READY) |
|
4426
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4427
|
0 |
0 |
if (ver > 4) ver = 4; |
|
4440
|
0 |
0 |
if (ver >= 3) { |
|
4441
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
4458
|
0 |
0 |
if (self->state != CONN_READY) |
|
4462
|
0 |
0 |
if (ver < 0) ver = 2; |
|
4463
|
0 |
0 |
if (ver > 7) ver = 7; |
|
4472
|
0 |
0 |
if (ver >= 1) |
|
4476
|
0 |
0 |
if (ver >= 1) { |
|
4482
|
0 |
0 |
if (ver >= 7) |
|
4490
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4492
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
4495
|
0 |
0 |
if (!tname_sv) continue; |
|
4501
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; } |
|
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; } |
|
4506
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
4508
|
0 |
0 |
if (!pelem || !SvROK(*pelem)) continue; |
|
|
0 |
0 |
if (!pelem || !SvROK(*pelem)) continue; |
|
4512
|
0 |
0 |
kf_buf_append_i32(&body, pid_sv ? (int32_t)SvIV(*pid_sv) : 0); |
|
4515
|
0 |
0 |
kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0); |
|
4518
|
0 |
0 |
if (ver >= 6) |
|
4534
|
0 |
0 |
if (self->state != CONN_READY) |
|
4538
|
0 |
0 |
if (ver < 0) ver = 1; |
|
4539
|
0 |
0 |
if (ver > 5) ver = 5; |
|
4552
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4554
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
4557
|
0 |
0 |
if (!tname_sv) continue; |
|
4563
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; } |
|
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; } |
|
4568
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
4570
|
0 |
0 |
kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0); |
|
4582
|
0 |
0 |
if (self->state != CONN_READY) |
|
4586
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4587
|
0 |
0 |
if (ver > 3) ver = 3; |
|
4606
|
0 |
0 |
if (self->state != CONN_READY) |
|
4610
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4611
|
0 |
0 |
if (ver > 4) ver = 4; |
|
4620
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4622
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
4626
|
0 |
0 |
if (!name_sv) continue; |
|
4632
|
0 |
0 |
int32_t num_partitions = np_sv ? (int32_t)SvIV(*np_sv) : 1; |
|
4636
|
0 |
0 |
int16_t replication_factor = rf_sv ? (int16_t)SvIV(*rf_sv) : 1; |
|
4649
|
0 |
0 |
if (ver >= 1) |
|
4660
|
0 |
0 |
if (self->state != CONN_READY) |
|
4664
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4665
|
0 |
0 |
if (ver > 3) ver = 3; |
|
4674
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4676
|
0 |
0 |
if (!elem) continue; |
|
4692
|
0 |
0 |
if (self->state != CONN_READY) |
|
4696
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4697
|
0 |
0 |
if (ver > 1) ver = 1; |
|
4702
|
0 |
0 |
if (SvOK(transactional_id_sv)) { |
|
4713
|
0 |
0 |
if (ver >= 2) { |
|
4726
|
0 |
0 |
if (self->state != CONN_READY) |
|
4730
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4731
|
0 |
0 |
if (ver > 1) ver = 1; |
|
4744
|
0 |
0 |
if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVAV) |
|
|
0 |
0 |
if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVAV) |
|
4750
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4752
|
0 |
0 |
if (!elem || !SvROK(*elem)) croak("add_partitions_to_txn: bad element"); |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) croak("add_partitions_to_txn: bad element"); |
|
4755
|
0 |
0 |
if (!tname_sv) croak("add_partitions_to_txn: missing topic"); |
|
4761
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) croak("add_partitions_to_txn: missing partitions"); |
|
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) croak("add_partitions_to_txn: missing partitions"); |
|
4765
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
4767
|
0 |
0 |
kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0); |
|
4779
|
0 |
0 |
if (self->state != CONN_READY) |
|
4783
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4784
|
0 |
0 |
if (ver > 1) ver = 1; |
|
4805
|
0 |
0 |
if (self->state != CONN_READY) |
|
4809
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4810
|
0 |
0 |
if (ver > 3) ver = 3; |
|
4827
|
0 |
0 |
if (ver >= 3) { |
|
4835
|
0 |
0 |
if (!SvROK(offsets_sv) || SvTYPE(SvRV(offsets_sv)) != SVt_PVAV) |
|
|
0 |
0 |
if (!SvROK(offsets_sv) || SvTYPE(SvRV(offsets_sv)) != SVt_PVAV) |
|
4841
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4843
|
0 |
0 |
if (!elem || !SvROK(*elem)) croak("txn_offset_commit: bad element"); |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) croak("txn_offset_commit: bad element"); |
|
4847
|
0 |
0 |
if (!tname_sv) croak("txn_offset_commit: missing topic"); |
|
4853
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) croak("txn_offset_commit: missing partitions"); |
|
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) croak("txn_offset_commit: missing partitions"); |
|
4858
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
4860
|
0 |
0 |
if (!pelem || !SvROK(*pelem)) croak("txn_offset_commit: bad partition"); |
|
|
0 |
0 |
if (!pelem || !SvROK(*pelem)) croak("txn_offset_commit: bad partition"); |
|
4864
|
0 |
0 |
kf_buf_append_i32(&body, ppid_sv ? (int32_t)SvIV(*ppid_sv) : 0); |
|
4867
|
0 |
0 |
kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0); |
|
4870
|
0 |
0 |
if (ver >= 2) |
|
4894
|
259 |
16 |
while (remaining >= 4) { |
|
4916
|
9 |
7 |
RETVAL = (int)(h & 0x7FFFFFFF); |
|
4967
|
0 |
12 |
EXTEND(SP, 1); |
|
5008
|
0 |
14 |
if (n < 0) RETVAL = &PL_sv_undef; |
|
5022
|
16 |
0 |
if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV) |
|
|
0 |
16 |
if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV) |
|
5032
|
11 |
5 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
|
11 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
|
11 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
5035
|
10 |
1 |
if ((v = hv_fetchs(opts, "compression", 0)) && SvOK(*v)) |
|
|
10 |
0 |
if ((v = hv_fetchs(opts, "compression", 0)) && SvOK(*v)) |
|
5037
|
1 |
10 |
if ((v = hv_fetchs(opts, "producer_id", 0)) && SvOK(*v)) |
|
|
1 |
0 |
if ((v = hv_fetchs(opts, "producer_id", 0)) && SvOK(*v)) |
|
5039
|
1 |
10 |
if ((v = hv_fetchs(opts, "producer_epoch", 0)) && SvOK(*v)) |
|
|
1 |
0 |
if ((v = hv_fetchs(opts, "producer_epoch", 0)) && SvOK(*v)) |
|
5041
|
1 |
10 |
if ((v = hv_fetchs(opts, "base_sequence", 0)) && SvOK(*v)) |
|
|
1 |
0 |
if ((v = hv_fetchs(opts, "base_sequence", 0)) && SvOK(*v)) |
|
5043
|
0 |
11 |
if ((v = hv_fetchs(opts, "is_transactional", 0)) && SvOK(*v)) |
|
|
0 |
0 |
if ((v = hv_fetchs(opts, "is_transactional", 0)) && SvOK(*v)) |
|
5045
|
0 |
11 |
if ((v = hv_fetchs(opts, "timestamp", 0)) && SvOK(*v)) |
|
|
0 |
0 |
if ((v = hv_fetchs(opts, "timestamp", 0)) && SvOK(*v)) |
|
5072
|
2 |
13 |
if (strcmp(api, "metadata") == 0) { |
|
5074
|
1 |
12 |
} else if (strcmp(api, "produce") == 0) { |
|
5076
|
0 |
12 |
} else if (strcmp(api, "fetch") == 0) { |
|
5078
|
0 |
12 |
} else if (strcmp(api, "list_offsets") == 0) { |
|
5080
|
1 |
11 |
} else if (strcmp(api, "find_coordinator") == 0) { |
|
5082
|
1 |
10 |
} else if (strcmp(api, "join_group") == 0) { |
|
5084
|
1 |
9 |
} else if (strcmp(api, "sync_group") == 0) { |
|
5086
|
1 |
8 |
} else if (strcmp(api, "heartbeat") == 0) { |
|
5088
|
0 |
8 |
} else if (strcmp(api, "offset_commit") == 0) { |
|
5090
|
1 |
7 |
} else if (strcmp(api, "offset_fetch") == 0) { |
|
5092
|
1 |
6 |
} else if (strcmp(api, "leave_group") == 0) { |
|
5094
|
1 |
5 |
} else if (strcmp(api, "create_topics") == 0) { |
|
5096
|
1 |
4 |
} else if (strcmp(api, "delete_topics") == 0) { |
|
5098
|
1 |
3 |
} else if (strcmp(api, "init_producer_id") == 0) { |
|
5100
|
1 |
2 |
} else if (strcmp(api, "add_partitions_to_txn") == 0) { |
|
5102
|
1 |
1 |
} else if (strcmp(api, "end_txn") == 0) { |
|
5104
|
1 |
0 |
} else if (strcmp(api, "txn_offset_commit") == 0) { |
|
5109
|
15 |
0 |
RETVAL = result ? SvREFCNT_inc(result) : &PL_sv_undef; |
|
5125
|
3 |
13 |
if (n < 0) { |
|
5137
|
0 |
21 |
I_EV_API("EV::Kafka"); |
|
|
21 |
0 |
I_EV_API("EV::Kafka"); |
|
|
0 |
21 |
I_EV_API("EV::Kafka"); |