| line |
true |
false |
branch |
|
126
|
0 |
0 |
if (b->cap >= need) return; |
|
128
|
0 |
0 |
if (newcap < need) newcap = need; |
|
134
|
0 |
0 |
if (b->data) { Safefree(b->data); b->data = NULL; } |
|
176
|
0 |
0 |
if (len > 0) kf_buf_append(b, s, len); |
|
180
|
0 |
0 |
if (!s) { |
|
184
|
0 |
0 |
if (len > 0) kf_buf_append(b, s, len); |
|
191
|
0 |
0 |
if (len > 0) kf_buf_append(b, s, len); |
|
195
|
0 |
0 |
if (!s) { |
|
199
|
0 |
0 |
if (len > 0) kf_buf_append(b, s, len); |
|
206
|
0 |
0 |
while (val >= 0x80) { |
|
221
|
0 |
0 |
if (!s) { |
|
225
|
0 |
0 |
if (len > 0) kf_buf_append(b, s, len); |
|
262
|
0 |
0 |
while (p < end) { |
|
265
|
0 |
0 |
if (!(b & 0x80)) { |
|
270
|
0 |
0 |
if (shift >= 64) return -1; |
|
279
|
0 |
0 |
if (n < 0) return n; |
|
286
|
0 |
0 |
if (end - buf < 2) return -1; |
|
288
|
0 |
0 |
if (len < 0) { /* nullable null */ |
|
293
|
0 |
0 |
if (end - buf < 2 + len) return -1; |
|
303
|
0 |
0 |
if (n < 0) return -1; |
|
304
|
0 |
0 |
if (raw == 0) { |
|
310
|
0 |
0 |
if (end - buf - n < len) return -1; |
|
320
|
0 |
0 |
if (n < 0) return -1; |
|
323
|
0 |
0 |
for (i = 0; i < count; i++) { |
|
326
|
0 |
0 |
if (tn < 0) return -1; |
|
330
|
0 |
0 |
if (dn < 0) return -1; |
|
332
|
0 |
0 |
if ((uint64_t)(end - p) < dlen) return -1; |
|
347
|
3840 |
15 |
for (i = 0; i < 256; i++) { |
|
349
|
30720 |
3840 |
for (j = 0; j < 8; j++) { |
|
350
|
15360 |
15360 |
if (crc & 1) |
|
363
|
0 |
7 |
if (!crc32c_table_inited) crc32c_init_table(); |
|
364
|
24 |
7 |
for (i = 0; i < len; i++) |
|
538
|
0 |
0 |
if (self->ssl) { |
|
539
|
0 |
0 |
int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len; |
|
541
|
0 |
0 |
if (ret <= 0) { |
|
543
|
0 |
0 |
if (err == SSL_ERROR_WANT_READ) { |
|
547
|
0 |
0 |
if (err == SSL_ERROR_WANT_WRITE) { |
|
552
|
0 |
0 |
if (err == SSL_ERROR_ZERO_RETURN) return 0; |
|
564
|
0 |
0 |
if (self->ssl) { |
|
565
|
0 |
0 |
int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len; |
|
567
|
0 |
0 |
if (ret <= 0) { |
|
569
|
0 |
0 |
if (err == SSL_ERROR_WANT_WRITE) { |
|
573
|
0 |
0 |
if (err == SSL_ERROR_WANT_READ) { |
|
592
|
0 |
0 |
if (self->rbuf_cap >= need) return; |
|
594
|
0 |
0 |
if (newcap < need) newcap = need; |
|
600
|
0 |
0 |
if (self->wbuf_cap >= need) return; |
|
602
|
0 |
0 |
if (newcap < need) newcap = need; |
|
612
|
0 |
0 |
if (!self->reading && self->fd >= 0) { |
|
|
0 |
0 |
if (!self->reading && self->fd >= 0) { |
|
619
|
0 |
7 |
if (self->reading) { |
|
626
|
1 |
0 |
if (!self->writing && self->fd >= 0) { |
|
|
1 |
0 |
if (!self->writing && self->fd >= 0) { |
|
633
|
1 |
7 |
if (self->writing) { |
|
644
|
0 |
1 |
if (!self->on_error) |
|
652
|
0 |
1 |
PUSHMARK(SP); |
|
653
|
0 |
1 |
XPUSHs(sv_2mortal(newSVpv(msg, 0))); |
|
656
|
1 |
0 |
if (SvTRUE(ERRSV)) { |
|
|
0 |
1 |
if (SvTRUE(ERRSV)) { |
|
657
|
0 |
0 |
warn("EV::Kafka::Conn: on_error callback error: %s", SvPV_nolen(ERRSV)); |
|
658
|
0 |
0 |
sv_setsv(ERRSV, &PL_sv_undef); |
|
660
|
1 |
0 |
FREETMPS; |
|
667
|
0 |
0 |
if (!self->on_connect) return; |
|
674
|
0 |
0 |
PUSHMARK(SP); |
|
677
|
0 |
0 |
if (SvTRUE(ERRSV)) { |
|
|
0 |
0 |
if (SvTRUE(ERRSV)) { |
|
678
|
0 |
0 |
warn("EV::Kafka::Conn: on_connect callback error: %s", SvPV_nolen(ERRSV)); |
|
679
|
0 |
0 |
sv_setsv(ERRSV, &PL_sv_undef); |
|
681
|
0 |
0 |
FREETMPS; |
|
688
|
1 |
0 |
if (!self->on_disconnect) return; |
|
695
|
0 |
0 |
PUSHMARK(SP); |
|
698
|
0 |
0 |
if (SvTRUE(ERRSV)) { |
|
|
0 |
0 |
if (SvTRUE(ERRSV)) { |
|
699
|
0 |
0 |
warn("EV::Kafka::Conn: on_disconnect callback error: %s", SvPV_nolen(ERRSV)); |
|
700
|
0 |
0 |
sv_setsv(ERRSV, &PL_sv_undef); |
|
702
|
0 |
0 |
FREETMPS; |
|
710
|
0 |
0 |
if (!cb) return; |
|
717
|
0 |
0 |
PUSHMARK(SP); |
|
718
|
0 |
0 |
EXTEND(SP, 2); |
|
719
|
0 |
0 |
PUSHs(result ? result : &PL_sv_undef); |
|
720
|
0 |
0 |
PUSHs(error ? error : &PL_sv_undef); |
|
723
|
0 |
0 |
if (SvTRUE(ERRSV)) { |
|
|
0 |
0 |
if (SvTRUE(ERRSV)) { |
|
724
|
0 |
0 |
warn("EV::Kafka::Conn: callback error: %s", SvPV_nolen(ERRSV)); |
|
725
|
0 |
0 |
sv_setsv(ERRSV, &PL_sv_undef); |
|
727
|
0 |
0 |
FREETMPS; |
|
744
|
0 |
7 |
if (self->timing) { |
|
749
|
0 |
7 |
if (self->ssl) { |
|
753
|
0 |
7 |
if (self->ssl_ctx) { |
|
758
|
1 |
6 |
if (self->fd >= 0) { |
|
769
|
0 |
7 |
while (!ngx_queue_empty(&self->cb_queue)) { |
|
774
|
0 |
0 |
if (cbt->cb && !cbt->internal) { |
|
|
0 |
0 |
if (cbt->cb && !cbt->internal) { |
|
776
|
0 |
0 |
if (conn_check_destroyed(self)) { |
|
783
|
0 |
0 |
if (cbt->cb) SvREFCNT_dec(cbt->cb); |
|
793
|
0 |
1 |
if (conn_check_destroyed(self)) return; |
|
796
|
0 |
1 |
if (conn_check_destroyed(self)) return; |
|
798
|
1 |
0 |
if (!self->intentional_disconnect && self->auto_reconnect) { |
|
|
0 |
1 |
if (!self->intentional_disconnect && self->auto_reconnect) { |
|
814
|
0 |
0 |
if (self->magic != KF_MAGIC_ALIVE) return; |
|
815
|
0 |
0 |
if (self->state != CONN_DISCONNECTED) return; |
|
816
|
0 |
0 |
if (!self->host) return; |
|
822
|
0 |
0 |
if (self->reconnect_timing) return; |
|
824
|
0 |
0 |
if (delay < 0.01) delay = 1.0; |
|
850
|
0 |
0 |
int flexible = (api_key == API_API_VERSIONS && api_version >= 3); |
|
|
0 |
0 |
int flexible = (api_key == API_API_VERSIONS && api_version >= 3); |
|
856
|
0 |
0 |
if (flexible) { |
|
866
|
0 |
0 |
if (raw_size > (size_t)INT32_MAX) { |
|
873
|
0 |
0 |
if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) { |
|
|
0 |
0 |
if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) { |
|
875
|
0 |
0 |
if (self->wbuf_len > 0) |
|
895
|
0 |
0 |
if (!no_response) { |
|
902
|
0 |
0 |
if (cb) { |
|
937
|
0 |
0 |
for (i = 0; i < API_VERSIONS_MAX_KEY; i++) |
|
940
|
0 |
0 |
if (end - p < 2) goto err; |
|
942
|
0 |
0 |
if (error_code != 0) { |
|
946
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
952
|
0 |
0 |
if (end - p < 4) goto err; |
|
955
|
0 |
0 |
for (i = 0; i < count; i++) { |
|
956
|
0 |
0 |
if (end - p < 6) goto err; |
|
961
|
0 |
0 |
if (key >= 0 && key < API_VERSIONS_MAX_KEY) |
|
|
0 |
0 |
if (key >= 0 && key < API_VERSIONS_MAX_KEY) |
|
968
|
0 |
0 |
if (self->sasl_mechanism) { |
|
979
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1006
|
0 |
0 |
if (end - p < 2) goto err; |
|
1010
|
0 |
0 |
if (end - p < 4) goto err; |
|
1013
|
0 |
0 |
for (i = 0; i < count; i++) { |
|
1016
|
0 |
0 |
if (n < 0) goto err; |
|
1020
|
0 |
0 |
if (error_code != 0) { |
|
1022
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1033
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1046
|
0 |
0 |
if (self->sasl_mechanism && strcmp(self->sasl_mechanism, "PLAIN") == 0) { |
|
|
0 |
0 |
if (self->sasl_mechanism && strcmp(self->sasl_mechanism, "PLAIN") == 0) { |
|
1048
|
0 |
0 |
STRLEN ulen = self->sasl_username ? strlen(self->sasl_username) : 0; |
|
1049
|
0 |
0 |
STRLEN plen = self->sasl_password ? strlen(self->sasl_password) : 0; |
|
1054
|
0 |
0 |
if (ulen > 0) kf_buf_append(&body, self->sasl_username, ulen); |
|
1056
|
0 |
0 |
if (plen > 0) kf_buf_append(&body, self->sasl_password, plen); |
|
1059
|
0 |
0 |
else if (self->sasl_mechanism && |
|
1060
|
0 |
0 |
(strcmp(self->sasl_mechanism, "SCRAM-SHA-256") == 0 || |
|
1061
|
0 |
0 |
strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0)) { |
|
1068
|
0 |
0 |
for (i = 0; i < 16; i++) |
|
1074
|
0 |
0 |
if (self->scram_nonce) Safefree(self->scram_nonce); |
|
1086
|
0 |
0 |
if (self->scram_client_first) Safefree(self->scram_client_first); |
|
1113
|
0 |
0 |
if (end - p < 2) goto err; |
|
1120
|
0 |
0 |
if (n < 0) goto err; |
|
1127
|
0 |
0 |
if (end - p >= 4) { |
|
1129
|
0 |
0 |
if (auth_data_len > 0 && end - p >= auth_data_len) { |
|
|
0 |
0 |
if (auth_data_len > 0 && end - p >= auth_data_len) { |
|
1135
|
0 |
0 |
if (error_code != 0) { |
|
1137
|
0 |
0 |
if (errmsg_str && errmsg_len > 0) |
|
|
0 |
0 |
if (errmsg_str && errmsg_len > 0) |
|
1142
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1149
|
0 |
0 |
if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) { |
|
|
0 |
0 |
if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) { |
|
|
0 |
0 |
if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) { |
|
1160
|
0 |
0 |
while (sp < se) { |
|
1161
|
0 |
0 |
if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') { |
|
|
0 |
0 |
if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') { |
|
|
0 |
0 |
if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') { |
|
1163
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
1165
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') { |
|
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') { |
|
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') { |
|
1167
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
1169
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') { |
|
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') { |
|
|
0 |
0 |
} else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') { |
|
1172
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
|
0 |
0 |
while (sp < se && *sp != ',') sp++; |
|
1174
|
0 |
0 |
if (sp < se && *sp == ',') sp++; |
|
|
0 |
0 |
if (sp < se && *sp == ',') sp++; |
|
1179
|
0 |
0 |
if (!server_nonce || !salt_b64 || iterations <= 0) { |
|
|
0 |
0 |
if (!server_nonce || !salt_b64 || iterations <= 0) { |
|
|
0 |
0 |
if (!server_nonce || !salt_b64 || iterations <= 0) { |
|
1181
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1187
|
0 |
0 |
if (server_nonce_len < 32 || |
|
1188
|
0 |
0 |
memcmp(server_nonce, self->scram_nonce, 32) != 0) { |
|
1190
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1196
|
0 |
0 |
const EVP_MD *md = is_sha512 ? EVP_sha512() : EVP_sha256(); |
|
1197
|
0 |
0 |
int digest_len = is_sha512 ? 64 : 32; |
|
1209
|
0 |
0 |
if (salt_len <= 0) { |
|
1211
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1260
|
0 |
0 |
for (di = 0; di < digest_len; di++) |
|
1276
|
0 |
0 |
int plen = bptr->length < 255 ? (int)bptr->length : 255; |
|
1305
|
0 |
0 |
if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FINAL) { |
|
|
0 |
0 |
if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FINAL) { |
|
1319
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
1331
|
0 |
0 |
if (cbt->internal) { |
|
1348
|
0 |
0 |
if (!cbt->cb) return; |
|
1467
|
0 |
0 |
if (key) { |
|
1475
|
0 |
0 |
if (value) { |
|
1483
|
0 |
0 |
if (headers && HvUSEDKEYS(headers) > 0) { |
|
|
0 |
0 |
if (headers && HvUSEDKEYS(headers) > 0) { |
|
|
0 |
0 |
if (headers && HvUSEDKEYS(headers) > 0) { |
|
1484
|
0 |
0 |
kf_buf_append_varint(&rec, (int64_t)HvUSEDKEYS(headers)); |
|
1487
|
0 |
0 |
while ((entry = hv_iternext(headers))) { |
|
1523
|
0 |
0 |
for (i = 0; i < count; i++) { |
|
1525
|
0 |
0 |
if (!elem || !SvROK(*elem)) |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) |
|
1534
|
0 |
0 |
if (key_sv && SvOK(*key_sv)) key = SvPV(*key_sv, key_len); |
|
|
0 |
0 |
if (key_sv && SvOK(*key_sv)) key = SvPV(*key_sv, key_len); |
|
1535
|
0 |
0 |
if (val_sv && SvOK(*val_sv)) val = SvPV(*val_sv, val_len); |
|
|
0 |
0 |
if (val_sv && SvOK(*val_sv)) val = SvPV(*val_sv, val_len); |
|
1536
|
0 |
0 |
if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV) |
|
|
0 |
0 |
if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV) |
|
|
0 |
0 |
if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV) |
|
1545
|
0 |
0 |
if (is_transactional) attrs |= 0x10; /* bit 4 = isTransactional */ |
|
1574
|
0 |
0 |
if (compression == COMPRESS_GZIP) { |
|
1583
|
0 |
0 |
if (zinit == Z_OK) { |
|
1588
|
0 |
0 |
if (deflate(&zs, Z_FINISH) == Z_STREAM_END) zok = 1; |
|
1591
|
0 |
0 |
if (zok) { |
|
1668
|
0 |
0 |
if (compression == COMPRESS_GZIP) { |
|
1677
|
0 |
0 |
if (zinit == Z_OK) { |
|
1682
|
0 |
0 |
if (deflate(&zs, Z_FINISH) == Z_STREAM_END) zok = 1; |
|
1685
|
0 |
0 |
if (zok) { |
|
1739
|
0 |
0 |
if (flexible) { |
|
1741
|
0 |
0 |
if (end - p < 4) goto done; |
|
1747
|
0 |
0 |
if (n < 0) goto done; |
|
1752
|
0 |
0 |
for (i = 0; i < broker_count; i++) { |
|
1754
|
0 |
0 |
if (end - p < 4) goto done; |
|
1760
|
0 |
0 |
if (n < 0) goto done; |
|
1762
|
0 |
0 |
hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
|
0 |
0 |
hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
1764
|
0 |
0 |
if (end - p < 4) goto done; |
|
1771
|
0 |
0 |
if (n < 0) goto done; |
|
1776
|
0 |
0 |
if (n < 0) goto done; |
|
1785
|
0 |
0 |
if (n < 0) goto done; |
|
1789
|
0 |
0 |
if (end - p < 4) goto done; |
|
1795
|
0 |
0 |
if (n < 0) goto done; |
|
1799
|
0 |
0 |
for (i = 0; i < topic_count; i++) { |
|
1801
|
0 |
0 |
if (end - p < 2) goto done; |
|
1807
|
0 |
0 |
if (n < 0) goto done; |
|
1809
|
0 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
1812
|
0 |
0 |
if (version >= 10) { |
|
1813
|
0 |
0 |
if (end - p < 16) goto done; |
|
1818
|
0 |
0 |
if (end - p < 1) goto done; |
|
1823
|
0 |
0 |
if (n < 0) goto done; |
|
1829
|
0 |
0 |
for (j = 0; j < part_count; j++) { |
|
1831
|
0 |
0 |
if (end - p < 2) goto done; |
|
1835
|
0 |
0 |
if (end - p < 4) goto done; |
|
1839
|
0 |
0 |
if (end - p < 4) goto done; |
|
1844
|
0 |
0 |
if (version >= 7) { |
|
1845
|
0 |
0 |
if (end - p < 4) goto done; |
|
1851
|
0 |
0 |
if (n < 0) goto done; |
|
1854
|
0 |
0 |
if (end - p < (ptrdiff_t)(rcount * 4)) goto done; |
|
1859
|
0 |
0 |
if (n < 0) goto done; |
|
1862
|
0 |
0 |
if (end - p < (ptrdiff_t)(rcount * 4)) goto done; |
|
1866
|
0 |
0 |
if (version >= 5) { |
|
1868
|
0 |
0 |
if (n < 0) goto done; |
|
1871
|
0 |
0 |
if (end - p < (ptrdiff_t)(rcount * 4)) goto done; |
|
1877
|
0 |
0 |
if (n < 0) goto done; |
|
1885
|
0 |
0 |
if (version >= 8) { |
|
1886
|
0 |
0 |
if (end - p < 4) goto done; |
|
1892
|
0 |
0 |
if (n < 0) goto done; |
|
1900
|
0 |
0 |
if (version >= 3) { |
|
1901
|
0 |
0 |
if (end - p < 4) goto done; |
|
1906
|
0 |
0 |
if (end - p < 4) goto done; |
|
1909
|
0 |
0 |
for (i = 0; i < broker_count; i++) { |
|
1911
|
0 |
0 |
if (end - p < 4) goto done; |
|
1917
|
0 |
0 |
if (n < 0) goto done; |
|
1919
|
0 |
0 |
hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
|
0 |
0 |
hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
1921
|
0 |
0 |
if (end - p < 4) goto done; |
|
1926
|
0 |
0 |
if (version >= 1) { |
|
1929
|
0 |
0 |
if (n < 0) goto done; |
|
1937
|
0 |
0 |
if (version >= 2) { |
|
1940
|
0 |
0 |
if (n < 0) goto done; |
|
1945
|
0 |
0 |
if (version >= 1) { |
|
1946
|
0 |
0 |
if (end - p < 4) goto done; |
|
1952
|
0 |
0 |
if (end - p < 4) goto done; |
|
1954
|
0 |
0 |
for (i = 0; i < topic_count; i++) { |
|
1956
|
0 |
0 |
if (end - p < 2) goto done; |
|
1962
|
0 |
0 |
if (n < 0) goto done; |
|
1964
|
0 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
1967
|
0 |
0 |
if (version >= 1) { |
|
1968
|
0 |
0 |
if (end - p < 1) goto done; |
|
1973
|
0 |
0 |
if (end - p < 4) goto done; |
|
1977
|
0 |
0 |
for (j = 0; j < part_count; j++) { |
|
1979
|
0 |
0 |
if (end - p < 2) goto done; |
|
1983
|
0 |
0 |
if (end - p < 4) goto done; |
|
1987
|
0 |
0 |
if (end - p < 4) goto done; |
|
1992
|
0 |
0 |
if (version >= 7) { |
|
1993
|
0 |
0 |
if (end - p < 4) goto done; |
|
1998
|
0 |
0 |
if (end - p < 4) goto done; |
|
2000
|
0 |
0 |
if (end - p < (ptrdiff_t)(rcount * 4)) goto done; |
|
2004
|
0 |
0 |
if (end - p < 4) goto done; |
|
2006
|
0 |
0 |
if (end - p < (ptrdiff_t)(rcount * 4)) goto done; |
|
2010
|
0 |
0 |
if (version >= 5) { |
|
2011
|
0 |
0 |
if (end - p < 4) goto done; |
|
2013
|
0 |
0 |
if (end - p < (ptrdiff_t)(rcount * 4)) goto done; |
|
2045
|
0 |
0 |
if (end - p < 4) goto done; |
|
2049
|
0 |
0 |
for (i = 0; i < topic_count; i++) { |
|
2053
|
0 |
0 |
if (n < 0) goto done; |
|
2055
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2058
|
0 |
0 |
if (end - p < 4) goto done; |
|
2063
|
0 |
0 |
for (j = 0; j < part_count; j++) { |
|
2065
|
0 |
0 |
if (end - p < 4) goto done; |
|
2069
|
0 |
0 |
if (end - p < 2) goto done; |
|
2073
|
0 |
0 |
if (end - p < 8) goto done; |
|
2078
|
0 |
0 |
if (version >= 2) { |
|
2079
|
0 |
0 |
if (end - p < 8) goto done; |
|
2084
|
0 |
0 |
if (version >= 5) { |
|
2085
|
0 |
0 |
if (end - p < 8) goto done; |
|
2096
|
0 |
0 |
if (version >= 1 && end - p >= 4) { |
|
|
0 |
0 |
if (version >= 1 && end - p >= 4) { |
|
2119
|
0 |
0 |
if (end - p < 12) return -1; |
|
2121
|
0 |
0 |
if (out_base_offset) *out_base_offset = base_offset; |
|
2124
|
0 |
0 |
if (end - p < batch_length) return -1; |
|
2127
|
0 |
0 |
if (batch_end - p < 9) return -1; |
|
2130
|
0 |
0 |
if (magic != 2) return -1; /* only support magic=2 (current format) */ |
|
2133
|
0 |
0 |
if (batch_end - p < 36) return -1; |
|
2143
|
0 |
0 |
if (batch_end - p < 4) return -1; |
|
2151
|
0 |
0 |
if (compression_type != COMPRESS_NONE && batch_end > p) { |
|
|
0 |
0 |
if (compression_type != COMPRESS_NONE && batch_end > p) { |
|
2154
|
0 |
0 |
if (decomp_cap < 4096) decomp_cap = 4096; |
|
2157
|
0 |
0 |
if (compression_type == COMPRESS_GZIP) { |
|
2159
|
0 |
0 |
while (!zok && decomp_cap < 64 * 1024 * 1024) { |
|
|
0 |
0 |
while (!zok && decomp_cap < 64 * 1024 * 1024) { |
|
2164
|
0 |
0 |
if (zinit != Z_OK) { |
|
2176
|
0 |
0 |
if (zret == Z_STREAM_END) { |
|
2180
|
0 |
0 |
} else if (zret == Z_BUF_ERROR || zret == Z_OK) { |
|
|
0 |
0 |
} else if (zret == Z_BUF_ERROR || zret == Z_OK) { |
|
2210
|
0 |
0 |
for (i = 0; i < record_count; i++) { |
|
2213
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2215
|
0 |
0 |
if (rec_end - rp < rec_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (rec_end - rp < rec_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
2218
|
0 |
0 |
if (this_rec_end - rp < 1) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (this_rec_end - rp < 1) { if (decompressed) Safefree(decompressed); return -1; } |
|
2223
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2228
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2234
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2237
|
0 |
0 |
if (key_len >= 0) { |
|
2238
|
0 |
0 |
if (this_rec_end - rp < key_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (this_rec_end - rp < key_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
2246
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2249
|
0 |
0 |
if (val_len >= 0) { |
|
2250
|
0 |
0 |
if (this_rec_end - rp < val_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (this_rec_end - rp < val_len) { if (decompressed) Safefree(decompressed); return -1; } |
|
2258
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { if (decompressed) Safefree(decompressed); return -1; } |
|
2261
|
0 |
0 |
if (hdr_count > 0) { |
|
2264
|
0 |
0 |
for (h = 0; h < hdr_count; h++) { |
|
2267
|
0 |
0 |
if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
2270
|
0 |
0 |
if (this_rec_end - rp < hk_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (this_rec_end - rp < hk_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
2275
|
0 |
0 |
if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
2278
|
0 |
0 |
if (hv_len >= 0) { |
|
2279
|
0 |
0 |
if (this_rec_end - rp < hv_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
|
0 |
0 |
if (this_rec_end - rp < hv_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; } |
|
2283
|
0 |
0 |
hv_store(hdr_hv, hk_data, (I32)hk_len, |
|
2291
|
0 |
0 |
hv_store(rec_hv, "key", 3, |
|
2293
|
0 |
0 |
hv_store(rec_hv, "value", 5, |
|
2295
|
0 |
0 |
if (hdr_hv) |
|
2303
|
0 |
0 |
if (decompressed) Safefree(decompressed); |
|
2320
|
0 |
0 |
if (version >= 1) { |
|
2321
|
0 |
0 |
if (end - p < 4) goto done; |
|
2327
|
0 |
0 |
if (version >= 7) { |
|
2328
|
0 |
0 |
if (end - p < 2) goto done; |
|
2333
|
0 |
0 |
if (version >= 7) { |
|
2334
|
0 |
0 |
if (end - p < 4) goto done; |
|
2339
|
0 |
0 |
if (end - p < 4) goto done; |
|
2343
|
0 |
0 |
for (i = 0; i < topic_count; i++) { |
|
2348
|
0 |
0 |
if (n < 0) goto done; |
|
2350
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2353
|
0 |
0 |
if (end - p < 4) goto done; |
|
2358
|
0 |
0 |
for (j = 0; j < part_count; j++) { |
|
2361
|
0 |
0 |
if (end - p < 4) goto done; |
|
2365
|
0 |
0 |
if (end - p < 2) goto done; |
|
2369
|
0 |
0 |
if (end - p < 8) goto done; |
|
2374
|
0 |
0 |
if (version >= 4) { |
|
2375
|
0 |
0 |
if (end - p < 8) goto done; |
|
2381
|
0 |
0 |
if (version >= 5) { |
|
2382
|
0 |
0 |
if (end - p < 8) goto done; |
|
2387
|
0 |
0 |
if (version >= 4) { |
|
2388
|
0 |
0 |
if (end - p < 4) goto done; |
|
2391
|
0 |
0 |
for (at = 0; at < at_count; at++) { |
|
2392
|
0 |
0 |
if (end - p < 16) goto done; |
|
2398
|
0 |
0 |
if (end - p < 4) goto done; |
|
2402
|
0 |
0 |
if (records_size > 0 && end - p >= records_size) { |
|
|
0 |
0 |
if (records_size > 0 && end - p >= records_size) { |
|
2407
|
0 |
0 |
while (rp < rend && rend - rp >= 12) { |
|
|
0 |
0 |
while (rp < rend && rend - rp >= 12) { |
|
2410
|
0 |
0 |
if (bl < 0 || rend - rp < 12 + bl) break; |
|
|
0 |
0 |
if (bl < 0 || rend - rp < 12 + bl) break; |
|
2416
|
0 |
0 |
} else if (records_size > 0) { |
|
2446
|
0 |
0 |
if (version >= 2) { |
|
2447
|
0 |
0 |
if (end - p < 4) goto done; |
|
2452
|
0 |
0 |
if (end - p < 4) goto done; |
|
2456
|
0 |
0 |
for (i = 0; i < topic_count; i++) { |
|
2460
|
0 |
0 |
if (n < 0) goto done; |
|
2462
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2464
|
0 |
0 |
if (end - p < 4) goto done; |
|
2469
|
0 |
0 |
for (j = 0; j < part_count; j++) { |
|
2471
|
0 |
0 |
if (end - p < 4) goto done; |
|
2475
|
0 |
0 |
if (end - p < 2) goto done; |
|
2479
|
0 |
0 |
if (version >= 1) { |
|
2480
|
0 |
0 |
if (end - p < 8) goto done; |
|
2485
|
0 |
0 |
if (end - p < 8) goto done; |
|
2490
|
0 |
0 |
if (version >= 4) { |
|
2491
|
0 |
0 |
if (end - p < 4) goto done; |
|
2517
|
0 |
0 |
if (version >= 1) { |
|
2518
|
0 |
0 |
if (end - p < 4) goto done; |
|
2522
|
0 |
0 |
if (end - p < 2) goto done; |
|
2527
|
0 |
0 |
if (version >= 1) { |
|
2530
|
0 |
0 |
if (n < 0) goto done; |
|
2532
|
0 |
0 |
if (emsg && elen > 0) |
|
|
0 |
0 |
if (emsg && elen > 0) |
|
2536
|
0 |
0 |
if (end - p < 4) goto done; |
|
2542
|
0 |
0 |
if (n < 0) goto done; |
|
2544
|
0 |
0 |
hv_store(result, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
|
0 |
0 |
hv_store(result, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0); |
|
2546
|
0 |
0 |
if (end - p < 4) goto done; |
|
2565
|
0 |
0 |
if (version >= 2) { |
|
2566
|
0 |
0 |
if (end - p < 4) goto done; |
|
2570
|
0 |
0 |
if (end - p < 2) goto done; |
|
2574
|
0 |
0 |
if (end - p < 4) goto done; |
|
2582
|
0 |
0 |
if (n < 0) goto done; |
|
2584
|
0 |
0 |
if (proto) |
|
2589
|
0 |
0 |
if (n < 0) goto done; |
|
2591
|
0 |
0 |
if (leader) |
|
2598
|
0 |
0 |
if (n < 0) goto done; |
|
2600
|
0 |
0 |
if (member_id) |
|
2604
|
0 |
0 |
if (end - p < 4) goto done; |
|
2608
|
0 |
0 |
for (i = 0; i < mcount; i++) { |
|
2613
|
0 |
0 |
if (n < 0) goto done; |
|
2615
|
0 |
0 |
if (mid) |
|
2619
|
0 |
0 |
if (version >= 5) { |
|
2622
|
0 |
0 |
if (n < 0) goto done; |
|
2627
|
0 |
0 |
if (end - p < 4) goto done; |
|
2629
|
0 |
0 |
if (mdlen > 0) { |
|
2630
|
0 |
0 |
if (end - p < mdlen) goto done; |
|
2653
|
0 |
0 |
if (version >= 1) { |
|
2654
|
0 |
0 |
if (end - p < 4) goto done; |
|
2658
|
0 |
0 |
if (end - p < 2) goto done; |
|
2663
|
0 |
0 |
if (end - p < 4) goto done; |
|
2665
|
0 |
0 |
if (alen > 0 && end - p >= alen) { |
|
|
0 |
0 |
if (alen > 0 && end - p >= alen) { |
|
2683
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2684
|
0 |
0 |
if (end - p >= 2) { |
|
2703
|
0 |
0 |
if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2705
|
0 |
0 |
if (end - p < 4) goto done; |
|
2708
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
2712
|
0 |
0 |
if (n < 0) goto done; |
|
2714
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2716
|
0 |
0 |
if (end - p < 4) goto done; |
|
2720
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
2722
|
0 |
0 |
if (end - p < 6) goto done; |
|
2749
|
0 |
0 |
if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2751
|
0 |
0 |
if (end - p < 4) goto done; |
|
2754
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
2758
|
0 |
0 |
if (n < 0) goto done; |
|
2760
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2762
|
0 |
0 |
if (end - p < 4) goto done; |
|
2766
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
2768
|
0 |
0 |
if (end - p < 4) goto done; |
|
2772
|
0 |
0 |
if (end - p < 8) goto done; |
|
2777
|
0 |
0 |
if (version >= 5 && end - p >= 4) p += 4; |
|
|
0 |
0 |
if (version >= 5 && end - p >= 4) p += 4; |
|
2781
|
0 |
0 |
if (n < 0) goto done; |
|
2784
|
0 |
0 |
if (end - p < 2) goto done; |
|
2795
|
0 |
0 |
if (version >= 2 && end - p >= 2) { |
|
|
0 |
0 |
if (version >= 2 && end - p >= 2) { |
|
2814
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2815
|
0 |
0 |
if (end - p >= 2) { |
|
2834
|
0 |
0 |
if (version >= 2 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 2 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2836
|
0 |
0 |
if (end - p < 4) goto done; |
|
2839
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
2843
|
0 |
0 |
if (n < 0) goto done; |
|
2845
|
0 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2847
|
0 |
0 |
if (end - p < 2) goto done; |
|
2852
|
0 |
0 |
if (version >= 1) { |
|
2855
|
0 |
0 |
if (n < 0) goto done; |
|
2857
|
0 |
0 |
if (emsg && elen > 0) |
|
|
0 |
0 |
if (emsg && elen > 0) |
|
2880
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2882
|
0 |
0 |
if (end - p < 4) goto done; |
|
2885
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
2889
|
0 |
0 |
if (n < 0) goto done; |
|
2891
|
0 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2893
|
0 |
0 |
if (end - p < 2) goto done; |
|
2914
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2916
|
0 |
0 |
if (end - p < 2) goto done; |
|
2920
|
0 |
0 |
if (end - p < 8) goto done; |
|
2924
|
0 |
0 |
if (end - p < 2) goto done; |
|
2943
|
0 |
0 |
if (end - p < 4) goto done; |
|
2946
|
0 |
0 |
if (end - p < 4) goto done; |
|
2949
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
2953
|
0 |
0 |
if (n < 0) goto done; |
|
2955
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
2957
|
0 |
0 |
if (end - p < 4) goto done; |
|
2961
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
2963
|
0 |
0 |
if (end - p < 6) goto done; |
|
2988
|
0 |
0 |
if (end - p >= 4) p += 4; /* throttle_time_ms */ |
|
2989
|
0 |
0 |
if (end - p >= 2) { |
|
3008
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
|
0 |
0 |
if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */ |
|
3010
|
0 |
0 |
if (end - p < 4) goto done; |
|
3013
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
3017
|
0 |
0 |
if (n < 0) goto done; |
|
3019
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
|
0 |
0 |
hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0); |
|
3021
|
0 |
0 |
if (end - p < 4) goto done; |
|
3025
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
3027
|
0 |
0 |
if (end - p < 6) goto done; |
|
3048
|
0 |
0 |
while (self->rbuf_len >= 4) { |
|
3050
|
0 |
0 |
if (msg_size < 0 || msg_size > 256 * 1024 * 1024) { |
|
|
0 |
0 |
if (msg_size < 0 || msg_size > 256 * 1024 * 1024) { |
|
3052
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3057
|
0 |
0 |
if (self->rbuf_len < (size_t)(4 + msg_size)) |
|
3063
|
0 |
0 |
if (msg_size < 4) { |
|
3065
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3076
|
0 |
0 |
if (!ngx_queue_empty(&self->cb_queue)) { |
|
3079
|
0 |
0 |
if (cbt->correlation_id != corr_id) { |
|
3086
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3096
|
0 |
0 |
if (cbt) { |
|
3098
|
0 |
0 |
if (cbt->cb) SvREFCNT_dec(cbt->cb); |
|
3100
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3105
|
0 |
0 |
if (self->rbuf_len > 0) |
|
3119
|
1 |
0 |
if (!self || self->magic != KF_MAGIC_ALIVE) return; |
|
|
0 |
1 |
if (!self || self->magic != KF_MAGIC_ALIVE) return; |
|
3122
|
1 |
0 |
if (self->state == CONN_CONNECTING) { |
|
3126
|
1 |
0 |
if (self->timing) { |
|
3132
|
0 |
1 |
if (getsockopt(self->fd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0) |
|
3134
|
1 |
0 |
if (err != 0) { |
|
3138
|
0 |
1 |
if (conn_check_destroyed(self)) return; |
|
3149
|
0 |
0 |
if (self->state == CONN_TLS_HANDSHAKE) { |
|
3151
|
0 |
0 |
if (ret == 1) { |
|
3161
|
0 |
0 |
if (err == SSL_ERROR_WANT_READ) { |
|
3164
|
0 |
0 |
} else if (err == SSL_ERROR_WANT_WRITE) { |
|
3169
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3177
|
0 |
0 |
if (revents & EV_WRITE) { |
|
3178
|
0 |
0 |
while (self->wbuf_off < self->wbuf_len) { |
|
3181
|
0 |
0 |
if (n < 0) { |
|
3182
|
0 |
0 |
if (errno == EAGAIN || errno == EWOULDBLOCK) break; |
|
|
0 |
0 |
if (errno == EAGAIN || errno == EWOULDBLOCK) break; |
|
3184
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3188
|
0 |
0 |
if (n == 0) { |
|
3195
|
0 |
0 |
if (self->wbuf_off >= self->wbuf_len) { |
|
3203
|
0 |
0 |
if (revents & EV_READ) { |
|
3207
|
0 |
0 |
if (n < 0) { |
|
3208
|
0 |
0 |
if (errno == EAGAIN || errno == EWOULDBLOCK) return; |
|
|
0 |
0 |
if (errno == EAGAIN || errno == EWOULDBLOCK) return; |
|
3210
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3214
|
0 |
0 |
if (n == 0) { |
|
3235
|
0 |
0 |
if (self->magic != KF_MAGIC_ALIVE) return; |
|
3238
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3259
|
0 |
0 |
if (self->tls_enabled) { |
|
3261
|
0 |
0 |
if (!self->ssl_ctx) { |
|
3263
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3268
|
0 |
0 |
if (self->tls_skip_verify) |
|
3273
|
0 |
0 |
if (self->tls_ca_file) { |
|
3274
|
0 |
0 |
if (SSL_CTX_load_verify_locations(self->ssl_ctx, self->tls_ca_file, NULL) != 1) { |
|
3276
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3283
|
0 |
0 |
if (!self->ssl) { |
|
3285
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3291
|
0 |
0 |
if (!is_ip_literal(self->host)) |
|
3294
|
0 |
0 |
if (!self->tls_skip_verify) { |
|
3297
|
0 |
0 |
if (is_ip_literal(self->host)) |
|
3305
|
0 |
0 |
if (ret == 1) { |
|
3312
|
0 |
0 |
if (err == SSL_ERROR_WANT_READ) { |
|
3314
|
0 |
0 |
} else if (err == SSL_ERROR_WANT_WRITE) { |
|
3318
|
0 |
0 |
if (conn_check_destroyed(self)) return; |
|
3337
|
0 |
1 |
if (self->state != CONN_DISCONNECTED) { |
|
3342
|
1 |
0 |
if (host != self->host) { |
|
3343
|
0 |
1 |
if (self->host) Safefree(self->host); |
|
3356
|
0 |
1 |
if (gai_err != 0) { |
|
3363
|
1 |
0 |
for (rp = res; rp; rp = rp->ai_next) { |
|
3365
|
0 |
1 |
if (fd < 0) continue; |
|
3377
|
0 |
1 |
if (ret == 0) { |
|
3392
|
1 |
0 |
if (errno == EINPROGRESS) { |
|
3404
|
1 |
0 |
if (timeout > 0) { |
|
3528
|
0 |
6 |
if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop")) |
|
|
0 |
0 |
if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop")) |
|
3558
|
384 |
6 |
for (i = 0; i < API_VERSIONS_MAX_KEY; i++) |
|
3573
|
0 |
6 |
if (self->magic != KF_MAGIC_ALIVE) return; |
|
3581
|
0 |
6 |
if (self->reconnect_timing) { |
|
3586
|
1 |
5 |
CLEAR_HANDLER(self->on_error); |
|
3587
|
0 |
6 |
CLEAR_HANDLER(self->on_connect); |
|
3588
|
0 |
6 |
CLEAR_HANDLER(self->on_disconnect); |
|
3590
|
1 |
5 |
if (self->host) Safefree(self->host); |
|
3591
|
6 |
0 |
if (self->client_id) Safefree(self->client_id); |
|
3592
|
0 |
6 |
if (self->sasl_mechanism) Safefree(self->sasl_mechanism); |
|
3593
|
0 |
6 |
if (self->sasl_username) Safefree(self->sasl_username); |
|
3594
|
0 |
6 |
if (self->sasl_password) Safefree(self->sasl_password); |
|
3595
|
0 |
6 |
if (self->scram_nonce) Safefree(self->scram_nonce); |
|
3596
|
0 |
6 |
if (self->scram_client_first) Safefree(self->scram_client_first); |
|
3597
|
0 |
6 |
if (self->tls_ca_file) Safefree(self->tls_ca_file); |
|
3598
|
6 |
0 |
if (self->rbuf) Safefree(self->rbuf); |
|
3599
|
6 |
0 |
if (self->wbuf) Safefree(self->wbuf); |
|
3616
|
0 |
0 |
if (self->reconnect_timing) { |
|
3626
|
0 |
2 |
RETVAL = (self->state == CONN_READY) ? 1 : 0; |
|
3633
|
0 |
0 |
RETVAL = self->state; |
|
3640
|
0 |
0 |
RETVAL = self->pending_count; |
|
3648
|
0 |
1 |
CLEAR_HANDLER(self->on_error); |
|
3649
|
1 |
0 |
if (cb && SvOK(cb)) { |
|
|
1 |
0 |
if (cb && SvOK(cb)) { |
|
3658
|
0 |
0 |
CLEAR_HANDLER(self->on_connect); |
|
3659
|
0 |
0 |
if (cb && SvOK(cb)) { |
|
|
0 |
0 |
if (cb && SvOK(cb)) { |
|
3668
|
0 |
0 |
CLEAR_HANDLER(self->on_disconnect); |
|
3669
|
0 |
0 |
if (cb && SvOK(cb)) { |
|
|
0 |
0 |
if (cb && SvOK(cb)) { |
|
3678
|
0 |
0 |
if (id) { |
|
3679
|
0 |
0 |
if (self->client_id) Safefree(self->client_id); |
|
3690
|
0 |
0 |
if (self->tls_ca_file) { Safefree(self->tls_ca_file); self->tls_ca_file = NULL; } |
|
3691
|
0 |
0 |
if (ca_file) self->tls_ca_file = savepv(ca_file); |
|
3699
|
0 |
0 |
if (self->sasl_mechanism) { Safefree(self->sasl_mechanism); self->sasl_mechanism = NULL; } |
|
3700
|
0 |
0 |
if (self->sasl_username) { Safefree(self->sasl_username); self->sasl_username = NULL; } |
|
3701
|
0 |
0 |
if (self->sasl_password) { Safefree(self->sasl_password); self->sasl_password = NULL; } |
|
3702
|
0 |
0 |
if (SvOK(ST(1))) { |
|
3704
|
0 |
0 |
if (username) self->sasl_username = savepv(username); |
|
3705
|
0 |
0 |
if (password) self->sasl_password = savepv(password); |
|
3721
|
1 |
0 |
if (self->state != CONN_READY) |
|
3729
|
0 |
0 |
if (ver < 0) ver = 1; |
|
3730
|
0 |
0 |
if (ver > 4) ver = 4; |
|
3732
|
0 |
0 |
if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) { |
|
|
0 |
0 |
if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) { |
|
|
0 |
0 |
if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) { |
|
3736
|
0 |
0 |
for (i = 0; i < count; i++) { |
|
3747
|
0 |
0 |
if (ver >= 4) |
|
3758
|
0 |
0 |
if (!self->api_versions_known) |
|
3763
|
0 |
0 |
for (i = 0; i < API_VERSIONS_MAX_KEY; i++) { |
|
3764
|
0 |
0 |
if (self->api_versions[i] >= 0) { |
|
3770
|
0 |
0 |
EXTEND(SP, 1); |
|
3779
|
1 |
0 |
if (self->state != CONN_READY) |
|
3786
|
0 |
0 |
if (ver < 0) ver = 4; |
|
3787
|
0 |
0 |
if (ver > 7) ver = 7; |
|
3797
|
0 |
0 |
if (ver >= 3) |
|
3801
|
0 |
0 |
if (ver >= 4) |
|
3805
|
0 |
0 |
if (ver >= 7) { |
|
3822
|
0 |
0 |
if (ver >= 5) |
|
3829
|
0 |
0 |
if (ver >= 7) |
|
3840
|
0 |
0 |
if (self->state != CONN_READY) |
|
3844
|
0 |
0 |
if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVHV) |
|
|
0 |
0 |
if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVHV) |
|
3849
|
0 |
0 |
if (ver < 0) ver = 4; |
|
3850
|
0 |
0 |
if (ver > 7) ver = 7; |
|
3858
|
0 |
0 |
if (ver >= 3) |
|
3860
|
0 |
0 |
if (ver >= 4) |
|
3862
|
0 |
0 |
if (ver >= 7) { |
|
3868
|
0 |
0 |
kf_buf_append_i32(&body, (int32_t)HvUSEDKEYS(topics_hv)); |
|
3872
|
0 |
0 |
while ((entry = hv_iternext(topics_hv))) { |
|
3878
|
0 |
0 |
if (!SvROK(parts_sv) || SvTYPE(SvRV(parts_sv)) != SVt_PVAV) |
|
|
0 |
0 |
if (!SvROK(parts_sv) || SvTYPE(SvRV(parts_sv)) != SVt_PVAV) |
|
3884
|
0 |
0 |
for (i = 0; i < pc; i++) { |
|
3886
|
0 |
0 |
if (!elem || !SvROK(*elem)) |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) |
|
3891
|
0 |
0 |
int32_t pid = pid_sv ? (int32_t)SvIV(*pid_sv) : 0; |
|
3895
|
0 |
0 |
int64_t offset = off_sv ? (int64_t)SvIV(*off_sv) : 0; |
|
3898
|
0 |
0 |
if (ver >= 5) |
|
3905
|
0 |
0 |
if (ver >= 7) |
|
3916
|
0 |
0 |
if (self->state != CONN_READY) |
|
3919
|
0 |
0 |
if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV) |
|
|
0 |
0 |
if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV) |
|
3931
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
3934
|
0 |
0 |
if ((tmp = hv_fetch(opts, "acks", 4, 0))) |
|
3936
|
0 |
0 |
if ((tmp = hv_fetch(opts, "transactional_id", 16, 0)) && SvOK(*tmp)) |
|
|
0 |
0 |
if ((tmp = hv_fetch(opts, "transactional_id", 16, 0)) && SvOK(*tmp)) |
|
3938
|
0 |
0 |
if ((tmp = hv_fetch(opts, "compression", 11, 0))) { |
|
3941
|
0 |
0 |
if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE; |
|
|
0 |
0 |
if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE; |
|
3946
|
0 |
0 |
else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP; |
|
|
0 |
0 |
else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP; |
|
3950
|
0 |
0 |
if ((tmp = hv_fetch(opts, "producer_id", 11, 0))) |
|
3952
|
0 |
0 |
if ((tmp = hv_fetch(opts, "producer_epoch", 14, 0))) |
|
3954
|
0 |
0 |
if ((tmp = hv_fetch(opts, "base_sequence", 13, 0))) |
|
3956
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
3971
|
0 |
0 |
if (ver < 0) ver = 3; |
|
3972
|
0 |
0 |
if (ver > 7) ver = 7; |
|
3979
|
0 |
0 |
if (ver >= 3) |
|
3980
|
0 |
0 |
kf_buf_append_nullable_string(&body, txn_id, txn_id ? (int16_t)txn_id_len : 0); |
|
3992
|
0 |
0 |
conn_send_request(aTHX_ self, API_PRODUCE, ver, &body, |
|
3993
|
0 |
0 |
(cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0); |
|
4003
|
0 |
0 |
if (self->state != CONN_READY) |
|
4011
|
0 |
0 |
if (ver < 0) ver = 1; |
|
4012
|
0 |
0 |
if (ver > 5) ver = 5; |
|
4020
|
0 |
0 |
if (ver >= 2) |
|
4032
|
0 |
0 |
if (ver >= 4) |
|
4045
|
1 |
0 |
if (self->state != CONN_READY) |
|
4055
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) { |
|
4058
|
0 |
0 |
if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV) |
|
|
0 |
0 |
if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV) |
|
|
0 |
0 |
if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV) |
|
4060
|
0 |
0 |
if ((tmp = hv_fetch(opts, "acks", 4, 0))) |
|
4062
|
0 |
0 |
if ((tmp = hv_fetch(opts, "compression", 11, 0))) { |
|
4065
|
0 |
0 |
if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE; |
|
|
0 |
0 |
if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE; |
|
4070
|
0 |
0 |
else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP; |
|
|
0 |
0 |
else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP; |
|
4074
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
|
0 |
0 |
} else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) { |
|
4082
|
0 |
0 |
if (SvOK(key_sv)) |
|
4087
|
0 |
0 |
if (SvOK(value_sv)) |
|
4094
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) |
|
|
0 |
0 |
if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) |
|
4096
|
0 |
0 |
if (ts_tmp && SvOK(*ts_tmp)) { |
|
|
0 |
0 |
if (ts_tmp && SvOK(*ts_tmp)) { |
|
4110
|
0 |
0 |
if (ver < 0) ver = 3; |
|
4111
|
0 |
0 |
if (ver > 7) ver = 7; |
|
4119
|
0 |
0 |
if (ver >= 3) |
|
4137
|
0 |
0 |
conn_send_request(aTHX_ self, API_PRODUCE, ver, &body, |
|
4138
|
0 |
0 |
(cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0); |
|
4148
|
0 |
0 |
if (self->state != CONN_READY) |
|
4152
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4153
|
0 |
0 |
if (ver > 2) ver = 2; |
|
4162
|
0 |
0 |
if (ver >= 1) |
|
4173
|
0 |
0 |
if (self->state != CONN_READY) |
|
4177
|
0 |
0 |
if (ver < 0) ver = 1; |
|
4178
|
0 |
0 |
if (ver > 5) ver = 5; |
|
4188
|
0 |
0 |
if (ver >= 1) |
|
4195
|
0 |
0 |
if (ver >= 5) { |
|
4196
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
4223
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4242
|
0 |
0 |
if (self->state != CONN_READY) |
|
4246
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4247
|
0 |
0 |
if (ver > 3) ver = 3; |
|
4260
|
0 |
0 |
if (ver >= 3) { |
|
4261
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
4271
|
0 |
0 |
if (SvOK(assignments_sv) && SvROK(assignments_sv) |
|
|
0 |
0 |
if (SvOK(assignments_sv) && SvROK(assignments_sv) |
|
4272
|
0 |
0 |
&& SvTYPE(SvRV(assignments_sv)) == SVt_PVAV) { |
|
4277
|
0 |
0 |
for (i = 0; i < ac; i++) { |
|
4279
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
4283
|
0 |
0 |
if (!mid_sv) continue; |
|
4289
|
0 |
0 |
if (!data_sv) { kf_buf_append_bytes(&body, NULL, 0); continue; } |
|
4306
|
0 |
0 |
if (self->state != CONN_READY) |
|
4310
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4311
|
0 |
0 |
if (ver > 4) ver = 4; |
|
4324
|
0 |
0 |
if (ver >= 3) { |
|
4325
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
|
0 |
0 |
if (group_instance_id_sv && SvOK(group_instance_id_sv)) { |
|
4342
|
0 |
0 |
if (self->state != CONN_READY) |
|
4346
|
0 |
0 |
if (ver < 0) ver = 2; |
|
4347
|
0 |
0 |
if (ver > 7) ver = 7; |
|
4356
|
0 |
0 |
if (ver >= 1) |
|
4360
|
0 |
0 |
if (ver >= 1) { |
|
4366
|
0 |
0 |
if (ver >= 7) |
|
4374
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4376
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
4379
|
0 |
0 |
if (!tname_sv) continue; |
|
4385
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; } |
|
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; } |
|
4390
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
4392
|
0 |
0 |
if (!pelem || !SvROK(*pelem)) continue; |
|
|
0 |
0 |
if (!pelem || !SvROK(*pelem)) continue; |
|
4396
|
0 |
0 |
kf_buf_append_i32(&body, pid_sv ? (int32_t)SvIV(*pid_sv) : 0); |
|
4399
|
0 |
0 |
kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0); |
|
4402
|
0 |
0 |
if (ver >= 6) |
|
4418
|
0 |
0 |
if (self->state != CONN_READY) |
|
4422
|
0 |
0 |
if (ver < 0) ver = 1; |
|
4423
|
0 |
0 |
if (ver > 5) ver = 5; |
|
4436
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4438
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
4441
|
0 |
0 |
if (!tname_sv) continue; |
|
4447
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; } |
|
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; } |
|
4452
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
4454
|
0 |
0 |
kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0); |
|
4466
|
0 |
0 |
if (self->state != CONN_READY) |
|
4470
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4471
|
0 |
0 |
if (ver > 3) ver = 3; |
|
4490
|
0 |
0 |
if (self->state != CONN_READY) |
|
4494
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4495
|
0 |
0 |
if (ver > 4) ver = 4; |
|
4504
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4506
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) continue; |
|
4510
|
0 |
0 |
if (!name_sv) continue; |
|
4516
|
0 |
0 |
int32_t num_partitions = np_sv ? (int32_t)SvIV(*np_sv) : 1; |
|
4520
|
0 |
0 |
int16_t replication_factor = rf_sv ? (int16_t)SvIV(*rf_sv) : 1; |
|
4533
|
0 |
0 |
if (ver >= 1) |
|
4544
|
0 |
0 |
if (self->state != CONN_READY) |
|
4548
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4549
|
0 |
0 |
if (ver > 3) ver = 3; |
|
4558
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4560
|
0 |
0 |
if (!elem) continue; |
|
4576
|
0 |
0 |
if (self->state != CONN_READY) |
|
4580
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4581
|
0 |
0 |
if (ver > 1) ver = 1; |
|
4586
|
0 |
0 |
if (SvOK(transactional_id_sv)) { |
|
4597
|
0 |
0 |
if (ver >= 2) { |
|
4610
|
0 |
0 |
if (self->state != CONN_READY) |
|
4614
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4615
|
0 |
0 |
if (ver > 1) ver = 1; |
|
4628
|
0 |
0 |
if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVAV) |
|
|
0 |
0 |
if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVAV) |
|
4634
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4636
|
0 |
0 |
if (!elem || !SvROK(*elem)) croak("add_partitions_to_txn: bad element"); |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) croak("add_partitions_to_txn: bad element"); |
|
4639
|
0 |
0 |
if (!tname_sv) croak("add_partitions_to_txn: missing topic"); |
|
4645
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) croak("add_partitions_to_txn: missing partitions"); |
|
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) croak("add_partitions_to_txn: missing partitions"); |
|
4649
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
4651
|
0 |
0 |
kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0); |
|
4663
|
0 |
0 |
if (self->state != CONN_READY) |
|
4667
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4668
|
0 |
0 |
if (ver > 1) ver = 1; |
|
4689
|
0 |
0 |
if (self->state != CONN_READY) |
|
4693
|
0 |
0 |
if (ver < 0) ver = 0; |
|
4694
|
0 |
0 |
if (ver > 3) ver = 3; |
|
4711
|
0 |
0 |
if (ver >= 3) { |
|
4719
|
0 |
0 |
if (!SvROK(offsets_sv) || SvTYPE(SvRV(offsets_sv)) != SVt_PVAV) |
|
|
0 |
0 |
if (!SvROK(offsets_sv) || SvTYPE(SvRV(offsets_sv)) != SVt_PVAV) |
|
4725
|
0 |
0 |
for (i = 0; i < tc; i++) { |
|
4727
|
0 |
0 |
if (!elem || !SvROK(*elem)) croak("txn_offset_commit: bad element"); |
|
|
0 |
0 |
if (!elem || !SvROK(*elem)) croak("txn_offset_commit: bad element"); |
|
4731
|
0 |
0 |
if (!tname_sv) croak("txn_offset_commit: missing topic"); |
|
4737
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) croak("txn_offset_commit: missing partitions"); |
|
|
0 |
0 |
if (!parts_sv || !SvROK(*parts_sv)) croak("txn_offset_commit: missing partitions"); |
|
4742
|
0 |
0 |
for (j = 0; j < pc; j++) { |
|
4744
|
0 |
0 |
if (!pelem || !SvROK(*pelem)) croak("txn_offset_commit: bad partition"); |
|
|
0 |
0 |
if (!pelem || !SvROK(*pelem)) croak("txn_offset_commit: bad partition"); |
|
4748
|
0 |
0 |
kf_buf_append_i32(&body, ppid_sv ? (int32_t)SvIV(*ppid_sv) : 0); |
|
4751
|
0 |
0 |
kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0); |
|
4754
|
0 |
0 |
if (ver >= 2) |
|
4774
|
0 |
6 |
if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop")) |
|
|
0 |
0 |
if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop")) |
|
4814
|
0 |
6 |
if (self->magic != KF_MAGIC_ALIVE) return; |
|
4818
|
0 |
6 |
while (!ngx_queue_empty(&self->brokers)) { |
|
4829
|
0 |
6 |
while (!ngx_queue_empty(&self->topics)) { |
|
4833
|
0 |
0 |
if (tm->name) Safefree(tm->name); |
|
4834
|
0 |
0 |
if (tm->partitions) Safefree(tm->partitions); |
|
4838
|
0 |
6 |
if (self->metadata_timing) { |
|
4842
|
0 |
6 |
if (self->linger_timing) { |
|
4846
|
0 |
6 |
if (self->fetch_timing) { |
|
4852
|
0 |
6 |
if (self->bootstrap_hosts) { |
|
4854
|
0 |
0 |
for (i = 0; i < self->bootstrap_count; i++) |
|
4855
|
0 |
0 |
if (self->bootstrap_hosts[i]) Safefree(self->bootstrap_hosts[i]); |
|
4858
|
0 |
6 |
if (self->bootstrap_ports) Safefree(self->bootstrap_ports); |
|
4861
|
0 |
6 |
if (self->group) { |
|
4862
|
0 |
0 |
if (self->group->heartbeat_timing) |
|
4864
|
0 |
0 |
if (self->group->group_id) Safefree(self->group->group_id); |
|
4865
|
0 |
0 |
if (self->group->member_id) Safefree(self->group->member_id); |
|
4866
|
0 |
0 |
CLEAR_HANDLER(self->group->on_assign); |
|
4867
|
0 |
0 |
CLEAR_HANDLER(self->group->on_revoke); |
|
4868
|
0 |
0 |
if (self->group->subscriptions) SvREFCNT_dec((SV*)self->group->subscriptions); |
|
4872
|
0 |
6 |
CLEAR_HANDLER(self->on_error); |
|
4873
|
0 |
6 |
CLEAR_HANDLER(self->on_connect); |
|
4874
|
0 |
6 |
CLEAR_HANDLER(self->on_message); |
|
4875
|
0 |
6 |
CLEAR_HANDLER(self->partitioner); |
|
4877
|
6 |
0 |
if (self->client_id) Safefree(self->client_id); |
|
4878
|
0 |
6 |
if (self->sasl_mechanism) Safefree(self->sasl_mechanism); |
|
4879
|
0 |
6 |
if (self->sasl_username) Safefree(self->sasl_username); |
|
4880
|
0 |
6 |
if (self->sasl_password) Safefree(self->sasl_password); |
|
4881
|
0 |
6 |
if (self->tls_ca_file) Safefree(self->tls_ca_file); |
|
4897
|
259 |
16 |
while (remaining >= 4) { |
|
4919
|
9 |
7 |
RETVAL = (int)(h & 0x7FFFFFFF); |
|
4965
|
0 |
10 |
EXTEND(SP, 1); |
|
4972
|
0 |
15 |
I_EV_API("EV::Kafka"); |
|
|
15 |
0 |
I_EV_API("EV::Kafka"); |
|
|
0 |
15 |
I_EV_API("EV::Kafka"); |