| line |
true |
false |
branch |
|
135
|
1656 |
19 |
if (b->cap >= need) return; |
|
137
|
6 |
13 |
if (newcap < need) newcap = need; |
|
143
|
205 |
0 |
if (b->data) { Safefree(b->data); b->data = NULL; } |
|
185
|
0 |
0 |
if (len > 0) kf_buf_append(b, s, len); |
|
189
|
0 |
1 |
if (!s) { |
|
193
|
1 |
0 |
if (len > 0) kf_buf_append(b, s, len); |
|
200
|
0 |
0 |
if (len > 0) kf_buf_append(b, s, len); |
|
204
|
0 |
0 |
if (!s) { |
|
208
|
0 |
0 |
if (len > 0) kf_buf_append(b, s, len); |
|
215
|
21 |
864 |
while (val >= 0x80) { |
|
230
|
0 |
0 |
if (!s) { |
|
234
|
0 |
0 |
if (len > 0) kf_buf_append(b, s, len); |
|
271
|
867 |
0 |
while (p < end) { |
|
274
|
846 |
21 |
if (!(b & 0x80)) { |
|
279
|
0 |
21 |
if (shift >= 64) return -1; |
|
288
|
0 |
846 |
if (n < 0) return n; |
|
295
|
0 |
13 |
if (end - buf < 2) return -1; |
|
297
|
1 |
12 |
if (len < 0) { /* nullable null */ |
|
302
|
0 |
12 |
if (end - buf < 2 + len) return -1; |
|
312
|
0 |
0 |
if (n < 0) return -1; |
|
313
|
0 |
0 |
if (raw == 0) { |
|
319
|
0 |
0 |
if (end - buf - n < len) return -1; |
|
329
|
0 |
0 |
if (n < 0) return -1; |
|
332
|
0 |
0 |
for (i = 0; i < count; i++) { |
|
335
|
0 |
0 |
if (tn < 0) return -1; |
|
339
|
0 |
0 |
if (dn < 0) return -1; |
|
341
|
0 |
0 |
if ((uint64_t)(end - p) < dlen) return -1; |
|
356
|
5376 |
21 |
for (i = 0; i < 256; i++) { |
|
358
|
43008 |
5376 |
for (j = 0; j < 8; j++) { |
|
359
|
21504 |
21504 |
if (crc & 1) |
|
372
|
0 |
37 |
if (!crc32c_table_inited) crc32c_init_table(); |
|
373
|
205072 |
37 |
for (i = 0; i < len; i++) |
|
547
|
0 |
1 |
if (self->ssl) { |
|
548
|
0 |
0 |
int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len; |
|
551
|
0 |
0 |
if (ret <= 0) { |
|
553
|
0 |
0 |
if (err == SSL_ERROR_WANT_READ) { |
|
557
|
0 |
0 |
if (err == SSL_ERROR_WANT_WRITE) { |
|
562
|
0 |
0 |
if (err == SSL_ERROR_ZERO_RETURN) return 0; |
|
575
|
0 |
1 |
if (self->ssl) { |
|
576
|
0 |
0 |
int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len; |
|
579
|
0 |
0 |
if (ret <= 0) { |
|
581
|
0 |
0 |
if (err == SSL_ERROR_WANT_WRITE) { |
|
585
|
0 |
0 |
if (err == SSL_ERROR_WANT_READ) { |
|
605
|
1 |
0 |
if (self->rbuf_cap >= need) return; |
|
607
|
0 |
0 |
if (newcap < need) newcap = need; |
|
613
|
1 |
0 |
if (self->wbuf_cap >= need) return; |
|
615
|
0 |
0 |
if (newcap < need) newcap = need; |
|
625
|
1 |
0 |
if (!self->reading && self->fd >= 0) { |
|
|
1 |
0 |
if (!self->reading && self->fd >= 0) { |
|
632
|
1 |
8 |
if (self->reading) { |
|
639
|
3 |
0 |
if (!self->writing && self->fd >= 0) { |
|
|
3 |
0 |
if (!self->writing && self->fd >= 0) { |
|
646
|
3 |
9 |
if (self->writing) { |
|
657
|
0 |
1 |
if (!self->on_error) |
|
665
|
0 |
1 |
PUSHMARK(SP); |
|
666
|
0 |
1 |
XPUSHs(sv_2mortal(newSVpv(msg, 0))); |
|
669
|
1 |
0 |
if (SvTRUE(ERRSV)) { |
|
|
0 |
1 |
if (SvTRUE(ERRSV)) { |
|
670
|
0 |
0 |
warn("EV::Kafka::Conn: on_error callback error: %s", SvPV_nolen(ERRSV)); |
|
671
|
0 |
0 |
sv_setsv(ERRSV, &PL_sv_undef); |
|
673
|
1 |
0 |
FREETMPS; |
|
680
|
0 |
1 |
if (!self->on_connect) return; |
|
687
|
0 |
1 |
PUSHMARK(SP); |
|
690
|
1 |
0 |
if (SvTRUE(ERRSV)) { |
|
|
0 |
1 |
if (SvTRUE(ERRSV)) { |
|
691
|
0 |
0 |
warn("EV::Kafka::Conn: on_connect callback error: %s", SvPV_nolen(ERRSV)); |
|
692
|
0 |
0 |
sv_setsv(ERRSV, &PL_sv_undef); |
|
694
|
0 |
1 |
FREETMPS; |
|
701
|
2 |
0 |
if (!self->on_disconnect) return; |
|
708
|
0 |
0 |
PUSHMARK(SP); |
|
711
|
0 |
0 |
if (SvTRUE(ERRSV)) { |
|
|
0 |
0 |
if (SvTRUE(ERRSV)) { |
|
712
|
0 |
0 |
warn("EV::Kafka::Conn: on_disconnect callback error: %s", SvPV_nolen(ERRSV)); |
|
713
|
0 |
0 |
sv_setsv(ERRSV, &PL_sv_undef); |
|
715
|
0 |
0 |
FREETMPS; |
|
723
|
0 |
0 |
if (!cb) return; |
|
730
|
0 |
0 |
PUSHMARK(SP); |
|
731
|
0 |
0 |
EXTEND(SP, 2); |
|
732
|
0 |
0 |
PUSHs(result ? result : &PL_sv_undef); |
|
733
|
0 |
0 |
PUSHs(error ? error : &PL_sv_undef); |
|
736
|
0 |
0 |
if (SvTRUE(ERRSV)) { |
|
|
0 |
0 |
if (SvTRUE(ERRSV)) { |
|
737
|
0 |
0 |
warn("EV::Kafka::Conn: callback error: %s", SvPV_nolen(ERRSV)); |
|
738
|
0 |
0 |
sv_setsv(ERRSV, &PL_sv_undef); |
|
740
|
0 |
0 |
FREETMPS; |
|
757
|
0 |
9 |
if (self->timing) { |
|
762
|
0 |
9 |
if (self->ssl) { |
|
766
|
0 |
9 |
if (self->ssl_ctx) { |
|
771
|
2 |
7 |
if (self->fd >= 0) { |
|
780
|
9 |
0 |
SV *err_sv = in_destruct ? NULL : newSVpv(err, 0); |
|
783
|
0 |
9 |
while (!ngx_queue_empty(&self->cb_queue)) { |
|
788
|
0 |
0 |
if (cbt->cb && !cbt->internal && !in_destruct) { |
|
|
0 |
0 |
if (cbt->cb && !cbt->internal && !in_destruct) { |
|
|
0 |
0 |
if (cbt->cb && !cbt->internal && !in_destruct) { |
|
790
|
0 |
0 |
if (conn_check_destroyed(self)) { |
|
797
|
0 |
0 |
if (cbt->cb) SvREFCNT_dec(cbt->cb); |
|
800
|
9 |
0 |
if (err_sv) SvREFCNT_dec(err_sv); |
|
807
|
0 |
2 |
if (conn_check_destroyed(self)) return; |
|
810
|
0 |
2 |
if (conn_check_destroyed(self)) return; |
|
812
|
1 |
1 |
if (!self->intentional_disconnect && self->auto_reconnect) { |
|
|
0 |
1 |
if (!self->intentional_disconnect && self->auto_reconnect) { |
|
828
|
0 |
0 |
if (self->magic != KF_MAGIC_ALIVE) return; |
|
829
|
0 |
0 |
if (self->state != CONN_DISCONNECTED) return; |
|
830
|
0 |
0 |
if (!self->host) return; |
|
836
|
0 |
0 |
if (self->reconnect_timing) return; |
|
838
|
0 |
0 |
if (delay < 0.01) delay = 1.0; |
|
864
|
1 |
0 |
int flexible = (api_key == API_API_VERSIONS && api_version >= 3); |
|
|
0 |
1 |
int flexible = (api_key == API_API_VERSIONS && api_version >= 3); |
|
870
|
0 |
1 |
if (flexible) { |
|
880
|
0 |
1 |
if (raw_size > (size_t)INT32_MAX) { |
|
887
|
0 |
1 |
if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) { |
|
|
0 |
0 |
if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) { |
|
889
|
0 |
0 |
if (self->wbuf_len > 0) |
|
909
|
1 |
0 |
if (!no_response) { |
|
916
|
0 |
1 |
if (cb) { |
|
951
|
64 |
1 |
for (i = 0; i < API_VERSIONS_MAX_KEY; i++) |
|
954
|
0 |
1 |
if (end - p < 2) goto err; |
|
956
|
0 |
1 |
if (error_code != 0) { |
|
960
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
966
|
0 |
1 |
if (end - p < 4) goto err; |
|
969
|
2 |
1 |
for (i = 0; i < count; i++) { |
|
970
|
0 |
2 |
if (end - p < 6) goto err; |
|
975
|
2 |
0 |
if (key >= 0 && key < API_VERSIONS_MAX_KEY) |
|
|
2 |
0 |
if (key >= 0 && key < API_VERSIONS_MAX_KEY) |
|
982
|
0 |
1 |
if (self->sasl_mechanism) { |
|
993
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1020
|
0 |
0 |
if (end - p < 2) goto err; |
|
1024
|
0 |
0 |
if (end - p < 4) goto err; |
|
1027
|
0 |
0 |
for (i = 0; i < count; i++) { |
|
1030
|
0 |
0 |
if (n < 0) goto err; |
|
1034
|
0 |
0 |
if (error_code != 0) { |
|
1036
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1047
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1060
|
0 |
0 |
if (self->sasl_mechanism && strcmp(self->sasl_mechanism, "PLAIN") == 0) { |
|
|
0 |
0 |
if (self->sasl_mechanism && strcmp(self->sasl_mechanism, "PLAIN") == 0) { |
|
1062
|
0 |
0 |
STRLEN ulen = self->sasl_username ? strlen(self->sasl_username) : 0; |
|
1063
|
0 |
0 |
STRLEN plen = self->sasl_password ? strlen(self->sasl_password) : 0; |
|
1068
|
0 |
0 |
if (ulen > 0) kf_buf_append(&body, self->sasl_username, ulen); |
|
1070
|
0 |
0 |
if (plen > 0) kf_buf_append(&body, self->sasl_password, plen); |
|
1073
|
0 |
0 |
else if (self->sasl_mechanism && |
|
1074
|
0 |
0 |
(strcmp(self->sasl_mechanism, "SCRAM-SHA-256") == 0 || |
|
1075
|
0 |
0 |
strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0)) { |
|
1082
|
0 |
0 |
for (i = 0; i < 16; i++) |
|
1088
|
0 |
0 |
if (self->scram_nonce) Safefree(self->scram_nonce); |
|
1092
|
0 |
0 |
if (!self->sasl_username) { |
|
1105
|
0 |
0 |
if (self->scram_client_first) Safefree(self->scram_client_first); |
|
1124
|
0 |
0 |
if (ver < 0) ver = 1; |
|
1125
|
0 |
0 |
if (ver > 2) ver = 2; |
|
1137
|
0 |
0 |
if (end - p < 2) goto err; |
|
1144
|
0 |
0 |
if (n < 0) goto err; |
|
1151
|
0 |
0 |
if (end - p >= 4) { |
|
1153
|
0 |
0 |
if (auth_data_len > 0 && end - p >= auth_data_len) { |
|
|
0 |
0 |
if (auth_data_len > 0 && end - p >= auth_data_len) { |
|
1159
|
0 |
0 |
if (error_code != 0) { |
|
1161
|
0 |
0 |
if (errmsg_str && errmsg_len > 0) |
|
|
0 |
0 |
if (errmsg_str && errmsg_len > 0) |
|
1166
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1173
|
0 |
0 |
if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) { |
|
|
0 |
0 |
if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) { |
|
|
0 |
0 |
if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) { |
|
1184
|
0 |
0 |
while (sp < se) { |
|
1185
|
0 |
0 |
if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') { |
|
|
0 |
0 |
if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') { |
|
|
0 |
0 |
if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') { |
|
1187
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
1189
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') { |
|
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') { |
|
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') { |
|
1191
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
1193
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') { |
|
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') { |
|
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') { |
|
1196
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
1198
|
0 |
0 |
if (sp < se && *sp == ',') sp++; |
|
|
0 |
0 |
if (sp < se && *sp == ',') sp++; |
|
1203
|
0 |
0 |
if (!server_nonce || !salt_b64 || iterations <= 0) { |
|
|
0 |
0 |
if (!server_nonce || !salt_b64 || iterations <= 0) { |
|
|
0 |
0 |
if (!server_nonce || !salt_b64 || iterations <= 0) { |
|
1205
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1211
|
0 |
0 |
if (server_nonce_len < 32 || |
|
1212
|
0 |
0 |
memcmp(server_nonce, self->scram_nonce, 32) != 0) { |
|
1214
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1220
|
0 |
0 |
const EVP_MD *md = is_sha512 ? EVP_sha512() : EVP_sha256(); |
|
1221
|
0 |
0 |
int digest_len = is_sha512 ? 64 : 32; |
|
1233
|
0 |
0 |
if (salt_len <= 0) { |
|
1235
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1292
|
0 |
0 |
for (di = 0; di < digest_len; di++) |
|
1296
|
0 |
0 |
if (self->scram_auth_message) Safefree(self->scram_auth_message); |
|
1313
|
0 |
0 |
int plen = bptr->length < 255 ? (int)bptr->length : 255; |
|
1338
|
0 |
0 |
if (ver < 0) ver = 1; |
|
1339
|
0 |
0 |
if (ver > 2) ver = 2; |
|
1347
|
0 |
0 |
if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FINAL) { |
|
|
0 |
0 |
if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FINAL) { |
|
1350
|
0 |
0 |
if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') { |
|
|
0 |
0 |
if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') { |
|
|
0 |
0 |
if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') { |
|
|
0 |
0 |
if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') { |
|
1352
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1361
|
0 |
0 |
if (strcmp(self->sasl_mechanism, "SCRAM-SHA-256") == 0) md = EVP_sha256(); |
|
1362
|
0 |
0 |
else if (strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0) md = EVP_sha512(); |
|
1363
|
0 |
0 |
if (!md || self->scram_server_key_len <= 0 || !self->scram_auth_message) { |
|
|
0 |
0 |
if (!md || self->scram_server_key_len <= 0 || !self->scram_auth_message) { |
|
|
0 |
0 |
if (!md || self->scram_server_key_len <= 0 || !self->scram_auth_message) { |
|
1365
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1388
|
0 |
0 |
&& (CRYPTO_memcmp(recv_sig, expected, expected_len) == 0); |
|
|
0 |
0 |
&& (CRYPTO_memcmp(recv_sig, expected, expected_len) == 0); |
|
1393
|
0 |
0 |
if (self->scram_auth_message) { |
|
1401
|
0 |
0 |
if (!ok) { |
|
1403
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1419
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1431
|
1 |
0 |
if (cbt->internal) { |
|
1448
|
0 |
0 |
if (!cbt->cb) return; |
|
1567
|
140 |
1 |
if (key) { |
|
1575
|
140 |
1 |
if (value) { |
|
1583
|
1 |
140 |
if (headers && HvUSEDKEYS(headers) > 0) { |
|
|
0 |
1 |
if (headers && HvUSEDKEYS(headers) > 0) { |
|
|
1 |
0 |
if (headers && HvUSEDKEYS(headers) > 0) { |
|
1584
|
0 |
1 |
kf_buf_append_varint(&rec, (int64_t)HvUSEDKEYS(headers)); |
|
1587
|
2 |
1 |
while ((entry = hv_iternext(headers))) { |
|
1619
|
141 |
16 |
for (i = 0; i < count; i++) { |
|
1621
|
141 |
0 |
if (!elem || !SvROK(*elem) || SvTYPE(SvRV(*elem)) != SVt_PVHV) |
|
|
141 |
0 |
if (!elem || !SvROK(*elem) || SvTYPE(SvRV(*elem)) != SVt_PVHV) |
|
|
0 |
141 |
if (!elem || !SvROK(*elem) || SvTYPE(SvRV(*elem)) != SVt_PVHV) |
|
1628
|
141 |
16 |
for (i = 0; i < count; i++) { |
|
1637
|
141 |
0 |
if (key_sv && SvOK(*key_sv)) key = SvPV(*key_sv, key_len); |
|
|
140 |
1 |
if (key_sv && SvOK(*key_sv)) key = SvPV(*key_sv, key_len); |
|
1638
|
141 |
0 |
if (val_sv && SvOK(*val_sv)) val = SvPV(*val_sv, val_len); |
|
|
140 |
1 |
if (val_sv && SvOK(*val_sv)) val = SvPV(*val_sv, val_len); |
|
1639
|
1 |
140 |
if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV) |
|
|
1 |
0 |
if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV) |
|
|
1 |
0 |
if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV) |
|
1648
|
0 |
16 |
if (is_transactional) attrs |= 0x10; /* bit 4 = isTransactional */ |
|
1677
|
2 |
14 |
if (compression == COMPRESS_GZIP) { |
|
1686
|
2 |
0 |
if (zinit == Z_OK) { |
|
1691
|
2 |
0 |
if (deflate(&zs, Z_FINISH) == Z_STREAM_END) zok = 1; |
|
1694
|
2 |
0 |
if (zok) { |
|
1785
|
0 |
2 |
if (flexible) { |
|
1787
|
0 |
0 |
if (end - p < 4) goto done; |
|
1793
|
0 |
0 |
if (n < 0) goto done; |
|
1798
|
0 |
0 |
for (i = 0; i < broker_count; i++) { |
|
1800
|
0 |
0 |
if (end - p < 4) goto done; |
|
1806
|
0 |
0 |
if (n < 0) goto done; |
|
1808
|
0 |
0 |
hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
|
0 |
0 |
hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
1810
|
0 |
0 |
if (end - p < 4) goto done; |
|
1817
|
0 |
0 |
if (n < 0) goto done; |
|
1822
|
0 |
0 |
if (n < 0) goto done; |
|
1831
|
0 |
0 |
if (n < 0) goto done; |
|
1835
|
0 |
0 |
if (end - p < 4) goto done; |
|
1841
|
0 |
0 |
if (n < 0) goto done; |
|
1845
|
0 |
0 |
for (i = 0; i < topic_count; i++) { |
|
1847
|
0 |
0 |
if (end - p < 2) goto done; |
|
1853
|
0 |
0 |
if (n < 0) goto done; |
|
1855
|
0 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
1858
|
0 |
0 |
if (version >= 10) { |
|
1859
|
0 |
0 |
if (end - p < 16) goto done; |
|
1864
|
0 |
0 |
if (end - p < 1) goto done; |
|
1869
|
0 |
0 |
if (n < 0) goto done; |
|
1875
|
0 |
0 |
for (j = 0; j < part_count; j++) { |
|
1877
|
0 |
0 |
if (end - p < 2) goto done; |
|
1881
|
0 |
0 |
if (end - p < 4) goto done; |
|
1885
|
0 |
0 |
if (end - p < 4) goto done; |
|
1890
|
0 |
0 |
if (version >= 7) { |
|
1891
|
0 |
0 |
if (end - p < 4) goto done; |
|
1897
|
0 |
0 |
if (n < 0) goto done; |
|
1900
|
0 |
0 |
if (rcount < 0 || rcount > 65536) goto done; |
|
|
0 |
0 |
if (rcount < 0 || rcount > 65536) goto done; |
|
1901
|
0 |
0 |
if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done; |
|
1906
|
0 |
0 |
if (n < 0) goto done; |
|
1909
|
0 |
0 |
if (rcount < 0 || rcount > 65536) goto done; |
|
|
0 |
0 |
if (rcount < 0 || rcount > 65536) goto done; |
|
1910
|
0 |
0 |
if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done; |
|
1914
|
0 |
0 |
if (version >= 5) { |
|
1916
|
0 |
0 |
if (n < 0) goto done; |
|
1919
|
0 |
0 |
if (rcount < 0 || rcount > 65536) goto done; |
|
|
0 |
0 |
if (rcount < 0 || rcount > 65536) goto done; |
|
1920
|
0 |
0 |
if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done; |
|
1926
|
0 |
0 |
if (n < 0) goto done; |
|
1934
|
0 |
0 |
if (version >= 8) { |
|
1935
|
0 |
0 |
if (end - p < 4) goto done; |
|
1941
|
0 |
0 |
if (n < 0) goto done; |
|
1949
|
0 |
2 |
if (version >= 3) { |
|
1950
|
0 |
0 |
if (end - p < 4) goto done; |
|
1955
|
0 |
2 |
if (end - p < 4) goto done; |
|
1957
|
2 |
0 |
if (broker_count < 0 || broker_count > 65536) goto done; |
|
|
0 |
2 |
if (broker_count < 0 || broker_count > 65536) goto done; |
|
1959
|
2 |
1 |
for (i = 0; i < broker_count; i++) { |
|
1961
|
1 |
1 |
if (end - p < 4) goto done; |
|
1967
|
0 |
1 |
if (n < 0) goto done; |
|
1969
|
1 |
0 |
hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
|
1 |
0 |
hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
1971
|
0 |
1 |
if (end - p < 4) goto done; |
|
1976
|
0 |
1 |
if (version >= 1) { |
|
1979
|
0 |
0 |
if (n < 0) goto done; |
|
1987
|
0 |
1 |
if (version >= 2) { |
|
1990
|
0 |
0 |
if (n < 0) goto done; |
|
1995
|
0 |
1 |
if (version >= 1) { |
|
1996
|
0 |
0 |
if (end - p < 4) goto done; |
|
2002
|
0 |
1 |
if (end - p < 4) goto done; |
|
2004
|
1 |
0 |
if (topic_count < 0 || topic_count > 1000000) goto done; |
|
|
0 |
1 |
if (topic_count < 0 || topic_count > 1000000) goto done; |
|
2005
|
1 |
1 |
for (i = 0; i < topic_count; i++) { |
|
2007
|
0 |
1 |
if (end - p < 2) goto done; |
|
2013
|
0 |
1 |
if (n < 0) goto done; |
|
2015
|
1 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
1 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2018
|
0 |
1 |
if (version >= 1) { |
|
2019
|
0 |
0 |
if (end - p < 1) goto done; |
|
2024
|
0 |
1 |
if (end - p < 4) goto done; |
|
2026
|
1 |
0 |
if (part_count < 0 || part_count > 1000000) goto done; |
|
|
0 |
1 |
if (part_count < 0 || part_count > 1000000) goto done; |
|
2029
|
1 |
1 |
for (j = 0; j < part_count; j++) { |
|
2031
|
0 |
1 |
if (end - p < 2) goto done; |
|
2035
|
0 |
1 |
if (end - p < 4) goto done; |
|
2039
|
0 |
1 |
if (end - p < 4) goto done; |
|
2044
|
0 |
1 |
if (version >= 7) { |
|
2045
|
0 |
0 |
if (end - p < 4) goto done; |
|
2050
|
0 |
1 |
if (end - p < 4) goto done; |
|
2052
|
1 |
0 |
if (rcount < 0 || rcount > 65536) goto done; |
|
|
0 |
1 |
if (rcount < 0 || rcount > 65536) goto done; |
|
2053
|
0 |
1 |
if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done; |
|
2057
|
0 |
1 |
if (end - p < 4) goto done; |
|
2059
|
1 |
0 |
if (rcount < 0 || rcount > 65536) goto done; |
|
|
0 |
1 |
if (rcount < 0 || rcount > 65536) goto done; |
|
2060
|
0 |
1 |
if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done; |
|
2064
|
0 |
1 |
if (version >= 5) { |
|
2065
|
0 |
0 |
if (end - p < 4) goto done; |
|
2067
|
0 |
0 |
if (rcount < 0 || rcount > 65536) goto done; |
|
|
0 |
0 |
if (rcount < 0 || rcount > 65536) goto done; |
|
2068
|
0 |
0 |
if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done; |
|
2099
|
0 |
1 |
if (end - p < 4) goto done; |
|
2103
|
1 |
1 |
for (i = 0; i < topic_count; i++) { |
|
2107
|
0 |
1 |
if (n < 0) goto done; |
|
2109
|
1 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
1 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2112
|
0 |
1 |
if (end - p < 4) goto done; |
|
2117
|
1 |
1 |
for (j = 0; j < part_count; j++) { |
|
2119
|
0 |
1 |
if (end - p < 4) goto done; |
|
2123
|
0 |
1 |
if (end - p < 2) goto done; |
|
2127
|
0 |
1 |
if (end - p < 8) goto done; |
|
2132
|
0 |
1 |
if (version >= 2) { |
|
2133
|
0 |
0 |
if (end - p < 8) goto done; |
|
2138
|
0 |
1 |
if (version >= 5) { |
|
2139
|
0 |
0 |
if (end - p < 8) goto done; |
|
2150
|
1 |
0 |
if (version >= 1 && end - p >= 4) { |
|
|
0 |
0 |
if (version >= 1 && end - p >= 4) { |
|
2173
|
0 |
16 |
if (end - p < 12) return -1; |
|
2175
|
16 |
0 |
if (out_base_offset) *out_base_offset = base_offset; |
|
2178
|
1 |
15 |
if (end - p < batch_length) return -1; |
|
2181
|
0 |
15 |
if (batch_end - p < 9) return -1; |
|
2184
|
1 |
14 |
if (magic != 2) return -1; /* only support magic=2 (current format) */ |
|
2187
|
1 |
13 |
if (crc32c(p, (size_t)(batch_end - p)) != expected_crc) return -1; |
|
2189
|
0 |
13 |
if (batch_end - p < 36) return -1; |
|
2199
|
0 |
13 |
if (batch_end - p < 4) return -1; |
|
2207
|
7 |
6 |
if (compression_type != COMPRESS_NONE && batch_end > p) { |
|
|
7 |
0 |
if (compression_type != COMPRESS_NONE && batch_end > p) { |
|
2210
|
6 |
1 |
if (decomp_cap < 4096) decomp_cap = 4096; |
|
2213
|
1 |
6 |
if (compression_type == COMPRESS_GZIP) { |
|
2215
|
1 |
1 |
while (!zok && decomp_cap < 64 * 1024 * 1024) { |
|
|
1 |
0 |
while (!zok && decomp_cap < 64 * 1024 * 1024) { |
|
2220
|
0 |
1 |
if (zinit != Z_OK) { |
|
2232
|
1 |
0 |
if (zret == Z_STREAM_END) { |
|
2236
|
0 |
0 |
} else if (zret == Z_BUF_ERROR || zret == Z_OK) { |
|
|
0 |
0 |
} else if (zret == Z_BUF_ERROR || zret == Z_OK) { |
|
2315
|
138 |
13 |
for (i = 0; i < record_count; i++) { |
|
2318
|
0 |
138 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2320
|
0 |
138 |
if (rec_end - rp < rec_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (rec_end - rp < rec_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
2323
|
0 |
138 |
if (this_rec_end - rp < 1) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (this_rec_end - rp < 1) { if (decompressed) Safefree(decompressed); return -1; } |
|
2328
|
0 |
138 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2333
|
0 |
138 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2339
|
0 |
138 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2342
|
137 |
1 |
if (key_len >= 0) { |
|
2343
|
0 |
137 |
if (this_rec_end - rp < key_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (this_rec_end - rp < key_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
2351
|
0 |
138 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2354
|
137 |
1 |
if (val_len >= 0) { |
|
2355
|
0 |
137 |
if (this_rec_end - rp < val_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (this_rec_end - rp < val_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
2363
|
0 |
138 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2366
|
1 |
137 |
if (hdr_count > 0) { |
|
2369
|
2 |
1 |
for (h = 0; h < hdr_count; h++) { |
|
2372
|
0 |
2 |
if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
2375
|
2 |
0 |
if (hk_len < 0 || this_rec_end - rp < hk_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
2 |
if (hk_len < 0 || this_rec_end - rp < hk_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (hk_len < 0 || this_rec_end - rp < hk_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
2380
|
0 |
2 |
if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
2383
|
2 |
0 |
if (hv_len >= 0) { |
|
2384
|
0 |
2 |
if (this_rec_end - rp < hv_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (this_rec_end - rp < hv_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
2388
|
2 |
0 |
hv_store(hdr_hv, hk_data, (I32)hk_len, |
|
2396
|
137 |
1 |
hv_store(rec_hv, "key", 3, |
|
2398
|
137 |
1 |
hv_store(rec_hv, "value", 5, |
|
2400
|
1 |
137 |
if (hdr_hv) |
|
2408
|
1 |
12 |
if (decompressed) Safefree(decompressed); |
|
2425
|
0 |
0 |
if (version >= 1) { |
|
2426
|
0 |
0 |
if (end - p < 4) goto done; |
|
2432
|
0 |
0 |
if (version >= 7) { |
|
2433
|
0 |
0 |
if (end - p < 2) goto done; |
|
2438
|
0 |
0 |
if (version >= 7) { |
|
2439
|
0 |
0 |
if (end - p < 4) goto done; |
|
2444
|
0 |
0 |
if (end - p < 4) goto done; |
|
2448
|
0 |
0 |
for (i = 0; i < topic_count; i++) { |
|
2453
|
0 |
0 |
if (n < 0) goto done; |
|
2455
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2458
|
0 |
0 |
if (end - p < 4) goto done; |
|
2463
|
0 |
0 |
for (j = 0; j < part_count; j++) { |
|
2466
|
0 |
0 |
if (end - p < 4) goto done; |
|
2470
|
0 |
0 |
if (end - p < 2) goto done; |
|
2474
|
0 |
0 |
if (end - p < 8) goto done; |
|
2479
|
0 |
0 |
if (version >= 4) { |
|
2480
|
0 |
0 |
if (end - p < 8) goto done; |
|
2486
|
0 |
0 |
if (version >= 5) { |
|
2487
|
0 |
0 |
if (end - p < 8) goto done; |
|
2492
|
0 |
0 |
if (version >= 4) { |
|
2493
|
0 |
0 |
if (end - p < 4) goto done; |
|
2496
|
0 |
0 |
for (at = 0; at < at_count; at++) { |
|
2497
|
0 |
0 |
if (end - p < 16) goto done; |
|
2503
|
0 |
0 |
if (end - p < 4) goto done; |
|
2507
|
0 |
0 |
if (records_size > 0 && end - p >= records_size) { |
|
|
0 |
0 |
if (records_size > 0 && end - p >= records_size) { |
|
2512
|
0 |
0 |
while (rp < rend && rend - rp >= 12) { |
|
|
0 |
0 |
while (rp < rend && rend - rp >= 12) { |
|
2515
|
0 |
0 |
if (bl < 0 || rend - rp < 12 + bl) break; |
|
|
0 |
0 |
if (bl < 0 || rend - rp < 12 + bl) break; |
|
2521
|
0 |
0 |
} else if (records_size > 0) { |
|
2551
|
0 |
0 |
if (version >= 2) { |
|
2552
|
0 |
0 |
if (end - p < 4) goto done; |
|
2557
|
0 |
0 |
if (end - p < 4) goto done; |
|
2561
|
0 |
0 |
for (i = 0; i < topic_count; i++) { |
|
2565
|
0 |
0 |
if (n < 0) goto done; |
|
2567
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2569
|
0 |
0 |
if (end - p < 4) goto done; |
|
2574
|
0 |
0 |
for (j = 0; j < part_count; j++) { |
|
2576
|
0 |
0 |
if (end - p < 4) goto done; |
|
2580
|
0 |
0 |
if (end - p < 2) goto done; |
|
2584
|
0 |
0 |
if (version >= 1) { |
|
2585
|
0 |
0 |
if (end - p < 8) goto done; |
|
2590
|
0 |
0 |
if (end - p < 8) goto done; |
|
2595
|
0 |
0 |
if (version >= 4) { |
|
2596
|
0 |
0 |
if (end - p < 4) goto done; |
|
2622
|
0 |
1 |
if (version >= 1) { |
|
2623
|
0 |
0 |
if (end - p < 4) goto done; |
|
2627
|
0 |
1 |
if (end - p < 2) goto done; |
|
2632
|
0 |
1 |
if (version >= 1) { |
|
2635
|
0 |
0 |
if (n < 0) goto done; |
|
2637
|
0 |
0 |
if (emsg && elen > 0) |
|
|
0 |
0 |
if (emsg && elen > 0) |
|
2641
|
0 |
1 |
if (end - p < 4) goto done; |
|
2647
|
0 |
1 |
if (n < 0) goto done; |
|
2649
|
1 |
0 |
hv_store(result, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
|
1 |
0 |
hv_store(result, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
2651
|
0 |
1 |
if (end - p < 4) goto done; |
|
2670
|
0 |
1 |
if (version >= 2) { |
|
2671
|
0 |
0 |
if (end - p < 4) goto done; |
|
2675
|
0 |
1 |
if (end - p < 2) goto done; |
|
2679
|
0 |
1 |
if (end - p < 4) goto done; |
|
2687
|
0 |
1 |
if (n < 0) goto done; |
|
2689
|
1 |
0 |
if (proto) |
|
2694
|
0 |
1 |
if (n < 0) goto done; |
|
2696
|
1 |
0 |
if (leader) |
|
2703
|
0 |
1 |
if (n < 0) goto done; |
|
2705
|
1 |
0 |
if (member_id) |
|
2709
|
0 |
1 |
if (end - p < 4) goto done; |
|
2713
|
1 |
1 |
for (i = 0; i < mcount; i++) { |
|
2718
|
0 |
1 |
if (n < 0) goto done; |
|
2720
|
1 |
0 |
if (mid) |
|
2724
|
0 |
1 |
if (version >= 5) { |
|
2727
|
0 |
0 |
if (n < 0) goto done; |
|
2732
|
0 |
1 |
if (end - p < 4) goto done; |
|
2734
|
0 |
1 |
if (mdlen > 0) { |
|
2735
|
0 |
0 |
if (end - p < mdlen) goto done; |
|
2758
|
0 |
1 |
if (version >= 1) { |
|
2759
|
0 |
0 |
if (end - p < 4) goto done; |
|
2763
|
0 |
1 |
if (end - p < 2) goto done; |
|
2768
|
0 |
1 |
if (end - p < 4) goto done; |
|
2770
|
0 |
1 |
if (alen > 0 && end - p >= alen) { |
|
|
0 |
1 |
if (alen > 0 && end - p >= alen) { |
|
2788
|
0 |
1 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2789
|
1 |
0 |
if (end - p >= 2) { |
|
2808
|
0 |
0 |
if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2810
|
0 |
0 |
if (end - p < 4) goto done; |
|
2813
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
2817
|
0 |
0 |
if (n < 0) goto done; |
|
2819
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2821
|
0 |
0 |
if (end - p < 4) goto done; |
|
2825
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
2827
|
0 |
0 |
if (end - p < 6) goto done; |
|
2854
|
0 |
1 |
if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2856
|
0 |
1 |
if (end - p < 4) goto done; |
|
2859
|
1 |
1 |
for (i = 0; i < tc; i++) { |
|
2863
|
0 |
1 |
if (n < 0) goto done; |
|
2865
|
1 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
1 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2867
|
0 |
1 |
if (end - p < 4) goto done; |
|
2871
|
1 |
1 |
for (j = 0; j < pc; j++) { |
|
2873
|
0 |
1 |
if (end - p < 4) goto done; |
|
2877
|
0 |
1 |
if (end - p < 8) goto done; |
|
2882
|
0 |
1 |
if (version >= 5 && end - p >= 4) p += 4; |
|
|
0 |
0 |
if (version >= 5 && end - p >= 4) p += 4; |
|
2886
|
0 |
1 |
if (n < 0) goto done; |
|
2889
|
0 |
1 |
if (end - p < 2) goto done; |
|
2900
|
1 |
0 |
if (version >= 2 && end - p >= 2) { |
|
|
0 |
0 |
if (version >= 2 && end - p >= 2) { |
|
2919
|
0 |
1 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2920
|
1 |
0 |
if (end - p >= 2) { |
|
2939
|
0 |
1 |
if (version >= 2 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 2 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2941
|
0 |
1 |
if (end - p < 4) goto done; |
|
2944
|
1 |
1 |
for (i = 0; i < tc; i++) { |
|
2948
|
0 |
1 |
if (n < 0) goto done; |
|
2950
|
1 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
1 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2952
|
0 |
1 |
if (end - p < 2) goto done; |
|
2957
|
0 |
1 |
if (version >= 1) { |
|
2960
|
0 |
0 |
if (n < 0) goto done; |
|
2962
|
0 |
0 |
if (emsg && elen > 0) |
|
|
0 |
0 |
if (emsg && elen > 0) |
|
2985
|
0 |
1 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2987
|
0 |
1 |
if (end - p < 4) goto done; |
|
2990
|
1 |
1 |
for (i = 0; i < tc; i++) { |
|
2994
|
0 |
1 |
if (n < 0) goto done; |
|
2996
|
1 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
1 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2998
|
0 |
1 |
if (end - p < 2) goto done; |
|
3019
|
1 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
1 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
3021
|
0 |
1 |
if (end - p < 2) goto done; |
|
3025
|
0 |
1 |
if (end - p < 8) goto done; |
|
3029
|
0 |
1 |
if (end - p < 2) goto done; |
|
3048
|
0 |
1 |
if (end - p < 4) goto done; |
|
3051
|
0 |
1 |
if (end - p < 4) goto done; |
|
3054
|
1 |
1 |
for (i = 0; i < tc; i++) { |
|
3058
|
0 |
1 |
if (n < 0) goto done; |
|
3060
|
1 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
1 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
3062
|
0 |
1 |
if (end - p < 4) goto done; |
|
3066
|
1 |
1 |
for (j = 0; j < pc; j++) { |
|
3068
|
0 |
1 |
if (end - p < 6) goto done; |
|
3093
|
1 |
0 |
if (end - p >= 4) p += 4; /* throttle_time_ms */ |
|
3094
|
1 |
0 |
if (end - p >= 2) { |
|
3113
|
0 |
1 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
3115
|
0 |
1 |
if (end - p < 4) goto done; |
|
3118
|
0 |
1 |
for (i = 0; i < tc; i++) { |
|
3122
|
0 |
0 |
if (n < 0) goto done; |
|
3124
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
3126
|
0 |
0 |
if (end - p < 4) goto done; |
|
3130
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
3132
|
0 |
0 |
if (end - p < 6) goto done; |
|
3153
|
1 |
1 |
while (self->rbuf_len >= 4) { |
|
3155
|
1 |
0 |
if (msg_size < 0 || msg_size > 256 * 1024 * 1024) { |
|
|
0 |
1 |
if (msg_size < 0 || msg_size > 256 * 1024 * 1024) { |
|
3157
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3162
|
0 |
1 |
if (self->rbuf_len < (size_t)(4 + msg_size)) |
|
3168
|
0 |
1 |
if (msg_size < 4) { |
|
3170
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3181
|
1 |
0 |
if (!ngx_queue_empty(&self->cb_queue)) { |
|
3184
|
0 |
1 |
if (cbt->correlation_id != corr_id) { |
|
3191
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3201
|
1 |
0 |
if (cbt) { |
|
3203
|
0 |
1 |
if (cbt->cb) SvREFCNT_dec(cbt->cb); |
|
3205
|
0 |
1 |
if (conn_check_destroyed(self)) return; |
|
3210
|
1 |
0 |
if (self->rbuf_len > 0) |
|
3224
|
4 |
0 |
if (!self || self->magic != KF_MAGIC_ALIVE) return; |
|
|
0 |
4 |
if (!self || self->magic != KF_MAGIC_ALIVE) return; |
|
3227
|
2 |
2 |
if (self->state == CONN_CONNECTING) { |
|
3231
|
2 |
0 |
if (self->timing) { |
|
3237
|
0 |
2 |
if (getsockopt(self->fd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) |
|
3239
|
1 |
1 |
if (err != 0) { |
|
3243
|
0 |
1 |
if (conn_check_destroyed(self)) return; |
|
3254
|
0 |
2 |
if (self->state == CONN_TLS_HANDSHAKE) { |
|
3257
|
0 |
0 |
if (ret == 1) { |
|
3267
|
0 |
0 |
if (err == SSL_ERROR_WANT_READ) { |
|
3270
|
0 |
0 |
} else if (err == SSL_ERROR_WANT_WRITE) { |
|
3276
|
0 |
0 |
if (e) { |
|
3284
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3292
|
1 |
1 |
if (revents & EV_WRITE) { |
|
3293
|
1 |
1 |
while (self->wbuf_off < self->wbuf_len) { |
|
3296
|
0 |
1 |
if (n < 0) { |
|
3297
|
0 |
0 |
if (errno == EAGAIN || errno == EWOULDBLOCK) break; |
|
|
0 |
0 |
if (errno == EAGAIN || errno == EWOULDBLOCK) break; |
|
3299
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3303
|
0 |
1 |
if (n == 0) { |
|
3310
|
1 |
0 |
if (self->wbuf_off >= self->wbuf_len) { |
|
3318
|
1 |
1 |
if (revents & EV_READ) { |
|
3322
|
0 |
1 |
if (n < 0) { |
|
3323
|
0 |
0 |
if (errno == EAGAIN || errno == EWOULDBLOCK) return; |
|
|
0 |
0 |
if (errno == EAGAIN || errno == EWOULDBLOCK) return; |
|
3325
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3329
|
0 |
1 |
if (n == 0) { |
|
3350
|
0 |
0 |
if (self->magic != KF_MAGIC_ALIVE) return; |
|
3353
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3374
|
0 |
1 |
if (self->tls_enabled) { |
|
3376
|
0 |
0 |
if (!self->ssl_ctx) { |
|
3378
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3383
|
0 |
0 |
if (self->tls_skip_verify) |
|
3388
|
0 |
0 |
if (self->tls_ca_file) { |
|
3389
|
0 |
0 |
if (SSL_CTX_load_verify_locations(self->ssl_ctx, self->tls_ca_file, NULL) != 1) { |
|
3391
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3398
|
0 |
0 |
if (!self->ssl) { |
|
3400
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3406
|
0 |
0 |
if (!is_ip_literal(self->host)) |
|
3409
|
0 |
0 |
if (!self->tls_skip_verify) { |
|
3412
|
0 |
0 |
if (is_ip_literal(self->host)) |
|
3421
|
0 |
0 |
if (ret == 1) { |
|
3428
|
0 |
0 |
if (err == SSL_ERROR_WANT_READ) { |
|
3430
|
0 |
0 |
} else if (err == SSL_ERROR_WANT_WRITE) { |
|
3435
|
0 |
0 |
if (e) { |
|
3443
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3462
|
0 |
2 |
if (self->state != CONN_DISCONNECTED) { |
|
3467
|
2 |
0 |
if (host != self->host) { |
|
3468
|
0 |
2 |
if (self->host) Safefree(self->host); |
|
3486
|
0 |
2 |
if (gai_err != 0) { |
|
3490
|
0 |
2 |
if (gai_err != 0) { |
|
3497
|
2 |
0 |
for (rp = res; rp; rp = rp->ai_next) { |
|
3499
|
0 |
2 |
if (fd < 0) continue; |
|
3511
|
0 |
2 |
if (ret == 0) { |
|
3526
|
2 |
0 |
if (errno == EINPROGRESS) { |
|
3538
|
2 |
0 |
if (timeout > 0) { |
|
3569
|
0 |
7 |
if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop")) |
|
|
0 |
0 |
if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop")) |
|
3598
|
448 |
7 |
for (i = 0; i < API_VERSIONS_MAX_KEY; i++) |
|
3613
|
0 |
7 |
if (self->magic != KF_MAGIC_ALIVE) return; |
|
3621
|
0 |
7 |
if (self->reconnect_timing) { |
|
3626
|
2 |
5 |
CLEAR_HANDLER(self->on_error); |
|
3627
|
1 |
6 |
CLEAR_HANDLER(self->on_connect); |
|
3628
|
0 |
7 |
CLEAR_HANDLER(self->on_disconnect); |
|
3630
|
2 |
5 |
if (self->host) Safefree(self->host); |
|
3631
|
7 |
0 |
if (self->client_id) Safefree(self->client_id); |
|
3632
|
0 |
7 |
if (self->sasl_mechanism) Safefree(self->sasl_mechanism); |
|
3633
|
0 |
7 |
if (self->sasl_username) Safefree(self->sasl_username); |
|
3634
|
0 |
7 |
if (self->sasl_password) Safefree(self->sasl_password); |
|
3635
|
0 |
7 |
if (self->scram_nonce) Safefree(self->scram_nonce); |
|
3636
|
0 |
7 |
if (self->scram_client_first) Safefree(self->scram_client_first); |
|
3639
|
0 |
7 |
if (self->scram_auth_message) { |
|
3644
|
0 |
7 |
if (self->tls_ca_file) Safefree(self->tls_ca_file); |
|
3645
|
7 |
0 |
if (self->rbuf) Safefree(self->rbuf); |
|
3646
|
7 |
0 |
if (self->wbuf) Safefree(self->wbuf); |
|
3663
|
0 |
1 |
if (self->reconnect_timing) { |
|
3673
|
0 |
3 |
RETVAL = (self->state == CONN_READY) ? 1 : 0; |
|
3680
|
0 |
0 |
RETVAL = self->state; |
|
3687
|
0 |
0 |
RETVAL = self->pending_count; |
|
3695
|
0 |
2 |
CLEAR_HANDLER(self->on_error); |
|
3696
|
2 |
0 |
if (cb && SvOK(cb)) { |
|
|
2 |
0 |
if (cb && SvOK(cb)) { |
|
3705
|
0 |
1 |
CLEAR_HANDLER(self->on_connect); |
|
3706
|
1 |
0 |
if (cb && SvOK(cb)) { |
|
|
1 |
0 |
if (cb && SvOK(cb)) { |
|
3715
|
0 |
0 |
CLEAR_HANDLER(self->on_disconnect); |
|
3716
|
0 |
0 |
if (cb && SvOK(cb)) { |
|
|
0 |
0 |
if (cb && SvOK(cb)) { |
|
3725
|
0 |
0 |
if (id) { |
|
3726
|
0 |
0 |
if (self->client_id) Safefree(self->client_id); |
|
3737
|
0 |
0 |
if (self->tls_ca_file) { Safefree(self->tls_ca_file); self->tls_ca_file = NULL; } |
|
3738
|
0 |
0 |
if (ca_file) self->tls_ca_file = savepv(ca_file); |
|
3746
|
0 |
0 |
if (self->sasl_mechanism) { Safefree(self->sasl_mechanism); self->sasl_mechanism = NULL; } |
|
3747
|
0 |
0 |
if (self->sasl_username) { Safefree(self->sasl_username); self->sasl_username = NULL; } |
|
3748
|
0 |
0 |
if (self->sasl_password) { Safefree(self->sasl_password); self->sasl_password = NULL; } |
|
3749
|
0 |
0 |
if (SvOK(ST(1))) { |
|
3751
|
0 |
0 |
if (username) self->sasl_username = savepv(username); |
|
3752
|
0 |
0 |
if (password) self->sasl_password = savepv(password); |
|
3768
|
1 |
0 |
if (self->state != CONN_READY) |
|
3776
|
0 |
0 |
if (ver < 0) ver = 1; |
|
3777
|
0 |
0 |
if (ver > 4) ver = 4; |
|
3779
|
0 |
0 |
if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) { |
|
|
0 |
0 |
if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) { |
|
|
0 |
0 |
if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) { |
|
3783
|
0 |
0 |
for (i = 0; i < count; i++) { |
|
3794
|
0 |
0 |
if (ver >= 4) |
|
3805
|
0 |
0 |
if (!self->api_versions_known) |
|
3810
|
0 |
0 |
for (i = 0; i < API_VERSIONS_MAX_KEY; i++) { |
|
3811
|
0 |
0 |
if (self->api_versions[i] >= 0) { |
|
3817
|
0 |
0 |
EXTEND(SP, 1); |
|
3826
|
1 |
0 |
if (self->state != CONN_READY) |
|
3833
|
0 |
0 |
if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) { |
|
|
0 |
0 |
if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) { |
|
|
0 |
0 |
if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) { |
|
3843
|
0 |
0 |
if (opts) { |
|
3845
|
0 |
0 |
if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v)) |
|
|
0 |
0 |
if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v)) |
|
3847
|
0 |
0 |
if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v)) |
|
|
0 |
0 |
if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v)) |
|
3849
|
0 |
0 |
if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v)) |
|
|
0 |
0 |
if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v)) |
|
3857
|
0 |
0 |
if (ver < 0) ver = 4; |
|
3858
|
0 |
0 |
if (ver > 7) ver = 7; |
|
3868
|
0 |
0 |
if (ver >= 3) |
|
3872
|
0 |
0 |
if (ver >= 4) |
|
3876
|
0 |
0 |
if (ver >= 7) { |
|
3893
|
0 |
0 |
if (ver >= 5) |
|
3900
|
0 |
0 |
if (ver >= 7) |
|
3911
|
0 |
0 |
if (self->state != CONN_READY) |
|
3916
|
0 |
0 |
if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVHV) |
|
|
0 |
0 |
if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVHV) |
|
3922
|
0 |
0 |
if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) { |
|
|
0 |
0 |
if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) { |
|
|
0 |
0 |
if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) { |
|
3932
|
0 |
0 |
if (opts) { |
|
3934
|
0 |
0 |
if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v)) |
|
|
0 |
0 |
if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v)) |
|
3936
|
0 |
0 |
if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v)) |
|
|
0 |
0 |
if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v)) |
|
3938
|
0 |
0 |
if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v)) |
|
|
0 |
0 |
if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v)) |
|
3943
|
0 |
0 |
if (ver < 0) ver = 4; |
|
3944
|
0 |
0 |
if (ver > 7) ver = 7; |
|
3952
|
0 |
0 |
if (ver >= 3) |
|
3954
|
0 |
0 |
if (ver >= 4) |
|
3956
|
0 |
0 |
if (ver >= 7) { |
|
3962
|
0 |
0 |
kf_buf_append_i32(&body, (int32_t)HvUSEDKEYS(topics_hv)); |
|
3966
|
0 |
0 |
while ((entry = hv_iternext(topics_hv))) { |
|
3972
|
0 |
0 |
if (!SvROK(parts_sv) || SvTYPE(SvRV(parts_sv)) != SVt_PVAV) |
|
|
0 |
0 |
if (!SvROK(parts_sv) || SvTYPE(SvRV(parts_sv)) != SVt_PVAV) |
|
3978
|
0 |
0 |
for (i = 0; i < pc; i++) { |
|
3980
|
0 |
0 |
if (!elem || !SvROK(*elem)) |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) |
|
3985
|
0 |
0 |
int32_t pid = pid_sv ? (int32_t)SvIV(*pid_sv) : 0; |
|
3989
|
0 |
0 |
int64_t offset = off_sv ? (int64_t)SvIV(*off_sv) : 0; |
|
3992
|
0 |
0 |
if (ver >= 5) |
|
3999
|
0 |
0 |
if (ver >= 7) |
|
4010
|
0 |
0 |
if (self->state != CONN_READY) |
|
4013
|
0 |
0 |
if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV) |
|
|
0 |
0 |
if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV) |
|
4025
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
4028
|
0 |
0 |
if ((tmp = hv_fetch(opts, "acks", 4, 0))) |
|
4030
|
0 |
0 |
if ((tmp = hv_fetch(opts, "transactional_id", 16, 0)) && SvOK(*tmp)) |
|
|
0 |
0 |
if ((tmp = hv_fetch(opts, "transactional_id", 16, 0)) && SvOK(*tmp)) |
|
4032
|
0 |
0 |
if ((tmp = hv_fetch(opts, "compression", 11, 0))) { |
|
4035
|
0 |
0 |
if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE; |
|
|
0 |
0 |
if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE; |
|
4040
|
0 |
0 |
else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP; |
|
|
0 |
0 |
else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP; |
|
4050
|
0 |
0 |
if ((tmp = hv_fetch(opts, "producer_id", 11, 0))) |
|
4052
|
0 |
0 |
if ((tmp = hv_fetch(opts, "producer_epoch", 14, 0))) |
|
4054
|
0 |
0 |
if ((tmp = hv_fetch(opts, "base_sequence", 13, 0))) |
|
4056
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
4071
|
0 |
0 |
if (ver < 0) ver = 3; |
|
4072
|
0 |
0 |
if (ver > 7) ver = 7; |
|
4079
|
0 |
0 |
if (ver >= 3) |
|
4080
|
0 |
0 |
kf_buf_append_nullable_string(&body, txn_id, txn_id ? (int16_t)txn_id_len : 0); |
|
4092
|
0 |
0 |
conn_send_request(aTHX_ self, API_PRODUCE, ver, &body, |
|
4093
|
0 |
0 |
(cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0); |
|
4103
|
0 |
0 |
if (self->state != CONN_READY) |
|
4111
|
0 |
0 |
if (ver < 0) ver = 1; |
|
4112
|
0 |
0 |
if (ver > 5) ver = 5; |
|
4120
|
0 |
0 |
if (ver >= 2) |
|
4132
|
0 |
0 |
if (ver >= 4) |
|
4145
|
1 |
0 |
if (self->state != CONN_READY) |
|
4155
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
4158
|
0 |
0 |
if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV) |
|
|
0 |
0 |
if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV) |
|
|
0 |
0 |
if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV) |
|
4160
|
0 |
0 |
if ((tmp = hv_fetch(opts, "acks", 4, 0))) |
|
4162
|
0 |
0 |
if ((tmp = hv_fetch(opts, "compression", 11, 0))) { |
|
4165
|
0 |
0 |
if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE; |
|
|
0 |
0 |
if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE; |
|
4170
|
0 |
0 |
else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP; |
|
|
0 |
0 |
else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP; |
|
4180
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
4188
|
0 |
0 |
if (SvOK(key_sv)) |
|
4193
|
0 |
0 |
if (SvOK(value_sv)) |
|
4200
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) |
|
4202
|
0 |
0 |
if (ts_tmp && SvOK(*ts_tmp)) { |
|
|
0 |
0 |
if (ts_tmp && SvOK(*ts_tmp)) { |
|
4216
|
0 |
0 |
if (key) hv_stores(rec, "key", newSVpvn(key, key_len)); |
|
4217
|
0 |
0 |
if (value) hv_stores(rec, "value", newSVpvn(value, value_len)); |
|
4218
|
0 |
0 |
if (headers) hv_stores(rec, "headers", newRV_inc((SV*)headers)); |
|
4228
|
0 |
0 |
if (ver < 0) ver = 3; |
|
4229
|
0 |
0 |
if (ver > 7) ver = 7; |
|
4237
|
0 |
0 |
if (ver >= 3) |
|
4255
|
0 |
0 |
conn_send_request(aTHX_ self, API_PRODUCE, ver, &body, |
|
4256
|
0 |
0 |
(cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0); |
|
4266
|
0 |
0 |
if (self->state != CONN_READY) |
|
4270
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4271
|
0 |
0 |
if (ver > 2) ver = 2; |
|
4280
|
0 |
0 |
if (ver >= 1) |
|
4291
|
0 |
0 |
if (self->state != CONN_READY) |
|
4295
|
0 |
0 |
if (ver < 0) ver = 1; |
|
4296
|
0 |
0 |
if (ver > 5) ver = 5; |
|
4306
|
0 |
0 |
if (ver >= 1) |
|
4313
|
0 |
0 |
if (ver >= 5) { |
|
4314
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
4341
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4360
|
0 |
0 |
if (self->state != CONN_READY) |
|
4364
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4365
|
0 |
0 |
if (ver > 3) ver = 3; |
|
4378
|
0 |
0 |
if (ver >= 3) { |
|
4379
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
4389
|
0 |
0 |
if (SvOK(assignments_sv) && SvROK(assignments_sv) |
|
|
0 |
0 |
if (SvOK(assignments_sv) && SvROK(assignments_sv) |
|
4390
|
0 |
0 |
&& SvTYPE(SvRV(assignments_sv)) == SVt_PVAV) { |
|
4395
|
0 |
0 |
for (i = 0; i < ac; i++) { |
|
4397
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
4401
|
0 |
0 |
if (!mid_sv) continue; |
|
4407
|
0 |
0 |
if (!data_sv) { kf_buf_append_bytes(&body, NULL, 0); continue; } |
|
4424
|
0 |
0 |
if (self->state != CONN_READY) |
|
4428
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4429
|
0 |
0 |
if (ver > 4) ver = 4; |
|
4442
|
0 |
0 |
if (ver >= 3) { |
|
4443
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
4460
|
0 |
0 |
if (self->state != CONN_READY) |
|
4464
|
0 |
0 |
if (ver < 0) ver = 2; |
|
4465
|
0 |
0 |
if (ver > 7) ver = 7; |
|
4474
|
0 |
0 |
if (ver >= 1) |
|
4478
|
0 |
0 |
if (ver >= 1) { |
|
4484
|
0 |
0 |
if (ver >= 7) |
|
4492
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4494
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
4497
|
0 |
0 |
if (!tname_sv) continue; |
|
4503
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; } |
|
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; } |
|
4508
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
4510
|
0 |
0 |
if (!pelem || !SvROK(*pelem)) continue; |
|
|
0 |
0 |
if (!pelem || !SvROK(*pelem)) continue; |
|
4514
|
0 |
0 |
kf_buf_append_i32(&body, pid_sv ? (int32_t)SvIV(*pid_sv) : 0); |
|
4517
|
0 |
0 |
kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0); |
|
4520
|
0 |
0 |
if (ver >= 6) |
|
4536
|
0 |
0 |
if (self->state != CONN_READY) |
|
4540
|
0 |
0 |
if (ver < 0) ver = 1; |
|
4541
|
0 |
0 |
if (ver > 5) ver = 5; |
|
4554
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4556
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
4559
|
0 |
0 |
if (!tname_sv) continue; |
|
4565
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; } |
|
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; } |
|
4570
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
4572
|
0 |
0 |
kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0); |
|
4584
|
0 |
0 |
if (self->state != CONN_READY) |
|
4588
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4589
|
0 |
0 |
if (ver > 3) ver = 3; |
|
4608
|
0 |
0 |
if (self->state != CONN_READY) |
|
4612
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4613
|
0 |
0 |
if (ver > 4) ver = 4; |
|
4622
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4624
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
4628
|
0 |
0 |
if (!name_sv) continue; |
|
4634
|
0 |
0 |
int32_t num_partitions = np_sv ? (int32_t)SvIV(*np_sv) : 1; |
|
4638
|
0 |
0 |
int16_t replication_factor = rf_sv ? (int16_t)SvIV(*rf_sv) : 1; |
|
4651
|
0 |
0 |
if (ver >= 1) |
|
4662
|
0 |
0 |
if (self->state != CONN_READY) |
|
4666
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4667
|
0 |
0 |
if (ver > 3) ver = 3; |
|
4676
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4678
|
0 |
0 |
if (!elem) continue; |
|
4694
|
0 |
0 |
if (self->state != CONN_READY) |
|
4698
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4699
|
0 |
0 |
if (ver > 1) ver = 1; |
|
4704
|
0 |
0 |
if (SvOK(transactional_id_sv)) { |
|
4715
|
0 |
0 |
if (ver >= 2) { |
|
4728
|
0 |
0 |
if (self->state != CONN_READY) |
|
4732
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4733
|
0 |
0 |
if (ver > 1) ver = 1; |
|
4746
|
0 |
0 |
if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVAV) |
|
|
0 |
0 |
if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVAV) |
|
4752
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4754
|
0 |
0 |
if (!elem || !SvROK(*elem)) croak("add_partitions_to_txn: bad element"); |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) croak("add_partitions_to_txn: bad element"); |
|
4757
|
0 |
0 |
if (!tname_sv) croak("add_partitions_to_txn: missing topic"); |
|
4763
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) croak("add_partitions_to_txn: missing partitions"); |
|
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) croak("add_partitions_to_txn: missing partitions"); |
|
4767
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
4769
|
0 |
0 |
kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0); |
|
4781
|
0 |
0 |
if (self->state != CONN_READY) |
|
4785
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4786
|
0 |
0 |
if (ver > 1) ver = 1; |
|
4807
|
0 |
0 |
if (self->state != CONN_READY) |
|
4811
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4812
|
0 |
0 |
if (ver > 3) ver = 3; |
|
4829
|
0 |
0 |
if (ver >= 3) { |
|
4837
|
0 |
0 |
if (!SvROK(offsets_sv) || SvTYPE(SvRV(offsets_sv)) != SVt_PVAV) |
|
|
0 |
0 |
if (!SvROK(offsets_sv) || SvTYPE(SvRV(offsets_sv)) != SVt_PVAV) |
|
4843
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4845
|
0 |
0 |
if (!elem || !SvROK(*elem)) croak("txn_offset_commit: bad element"); |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) croak("txn_offset_commit: bad element"); |
|
4849
|
0 |
0 |
if (!tname_sv) croak("txn_offset_commit: missing topic"); |
|
4855
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) croak("txn_offset_commit: missing partitions"); |
|
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) croak("txn_offset_commit: missing partitions"); |
|
4860
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
4862
|
0 |
0 |
if (!pelem || !SvROK(*pelem)) croak("txn_offset_commit: bad partition"); |
|
|
0 |
0 |
if (!pelem || !SvROK(*pelem)) croak("txn_offset_commit: bad partition"); |
|
4866
|
0 |
0 |
kf_buf_append_i32(&body, ppid_sv ? (int32_t)SvIV(*ppid_sv) : 0); |
|
4869
|
0 |
0 |
kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0); |
|
4872
|
0 |
0 |
if (ver >= 2) |
|
4896
|
259 |
16 |
while (remaining >= 4) { |
|
4918
|
9 |
7 |
RETVAL = (int)(h & 0x7FFFFFFF); |
|
4969
|
0 |
12 |
EXTEND(SP, 1); |
|
5010
|
0 |
14 |
if (n < 0) RETVAL = &PL_sv_undef; |
|
5024
|
16 |
0 |
if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV) |
|
|
0 |
16 |
if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV) |
|
5034
|
11 |
5 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
|
11 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
|
11 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
5037
|
10 |
1 |
if ((v = hv_fetchs(opts, "compression", 0)) && SvOK(*v)) |
|
|
10 |
0 |
if ((v = hv_fetchs(opts, "compression", 0)) && SvOK(*v)) |
|
5039
|
1 |
10 |
if ((v = hv_fetchs(opts, "producer_id", 0)) && SvOK(*v)) |
|
|
1 |
0 |
if ((v = hv_fetchs(opts, "producer_id", 0)) && SvOK(*v)) |
|
5041
|
1 |
10 |
if ((v = hv_fetchs(opts, "producer_epoch", 0)) && SvOK(*v)) |
|
|
1 |
0 |
if ((v = hv_fetchs(opts, "producer_epoch", 0)) && SvOK(*v)) |
|
5043
|
1 |
10 |
if ((v = hv_fetchs(opts, "base_sequence", 0)) && SvOK(*v)) |
|
|
1 |
0 |
if ((v = hv_fetchs(opts, "base_sequence", 0)) && SvOK(*v)) |
|
5045
|
0 |
11 |
if ((v = hv_fetchs(opts, "is_transactional", 0)) && SvOK(*v)) |
|
|
0 |
0 |
if ((v = hv_fetchs(opts, "is_transactional", 0)) && SvOK(*v)) |
|
5047
|
0 |
11 |
if ((v = hv_fetchs(opts, "timestamp", 0)) && SvOK(*v)) |
|
|
0 |
0 |
if ((v = hv_fetchs(opts, "timestamp", 0)) && SvOK(*v)) |
|
5074
|
2 |
13 |
if (strcmp(api, "metadata") == 0) { |
|
5076
|
1 |
12 |
} else if (strcmp(api, "produce") == 0) { |
|
5078
|
0 |
12 |
} else if (strcmp(api, "fetch") == 0) { |
|
5080
|
0 |
12 |
} else if (strcmp(api, "list_offsets") == 0) { |
|
5082
|
1 |
11 |
} else if (strcmp(api, "find_coordinator") == 0) { |
|
5084
|
1 |
10 |
} else if (strcmp(api, "join_group") == 0) { |
|
5086
|
1 |
9 |
} else if (strcmp(api, "sync_group") == 0) { |
|
5088
|
1 |
8 |
} else if (strcmp(api, "heartbeat") == 0) { |
|
5090
|
0 |
8 |
} else if (strcmp(api, "offset_commit") == 0) { |
|
5092
|
1 |
7 |
} else if (strcmp(api, "offset_fetch") == 0) { |
|
5094
|
1 |
6 |
} else if (strcmp(api, "leave_group") == 0) { |
|
5096
|
1 |
5 |
} else if (strcmp(api, "create_topics") == 0) { |
|
5098
|
1 |
4 |
} else if (strcmp(api, "delete_topics") == 0) { |
|
5100
|
1 |
3 |
} else if (strcmp(api, "init_producer_id") == 0) { |
|
5102
|
1 |
2 |
} else if (strcmp(api, "add_partitions_to_txn") == 0) { |
|
5104
|
1 |
1 |
} else if (strcmp(api, "end_txn") == 0) { |
|
5106
|
1 |
0 |
} else if (strcmp(api, "txn_offset_commit") == 0) { |
|
5111
|
15 |
0 |
RETVAL = result ? SvREFCNT_inc(result) : &PL_sv_undef; |
|
5127
|
3 |
13 |
if (n < 0) { |
|
5139
|
0 |
21 |
I_EV_API("EV::Kafka"); |
|
|
21 |
0 |
I_EV_API("EV::Kafka"); |
|
|
0 |
21 |
I_EV_API("EV::Kafka"); |