Branch Coverage

Kafka.xs
Criterion Covered Total %
branch 101 1778 5.6


line true false branch
126 0 0 if (b->cap >= need) return;
128 0 0 if (newcap < need) newcap = need;
134 0 0 if (b->data) { Safefree(b->data); b->data = NULL; }
176 0 0 if (len > 0) kf_buf_append(b, s, len);
180 0 0 if (!s) {
184 0 0 if (len > 0) kf_buf_append(b, s, len);
191 0 0 if (len > 0) kf_buf_append(b, s, len);
195 0 0 if (!s) {
199 0 0 if (len > 0) kf_buf_append(b, s, len);
206 0 0 while (val >= 0x80) {
221 0 0 if (!s) {
225 0 0 if (len > 0) kf_buf_append(b, s, len);
262 0 0 while (p < end) {
265 0 0 if (!(b & 0x80)) {
270 0 0 if (shift >= 64) return -1;
279 0 0 if (n < 0) return n;
286 0 0 if (end - buf < 2) return -1;
288 0 0 if (len < 0) { /* nullable null */
293 0 0 if (end - buf < 2 + len) return -1;
303 0 0 if (n < 0) return -1;
304 0 0 if (raw == 0) {
310 0 0 if (end - buf - n < len) return -1;
320 0 0 if (n < 0) return -1;
323 0 0 for (i = 0; i < count; i++) {
326 0 0 if (tn < 0) return -1;
330 0 0 if (dn < 0) return -1;
332 0 0 if ((uint64_t)(end - p) < dlen) return -1;
347 3840 15 for (i = 0; i < 256; i++) {
349 30720 3840 for (j = 0; j < 8; j++) {
350 15360 15360 if (crc & 1)
363 0 7 if (!crc32c_table_inited) crc32c_init_table();
364 24 7 for (i = 0; i < len; i++)
538 0 0 if (self->ssl) {
539 0 0 int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;
541 0 0 if (ret <= 0) {
543 0 0 if (err == SSL_ERROR_WANT_READ) {
547 0 0 if (err == SSL_ERROR_WANT_WRITE) {
552 0 0 if (err == SSL_ERROR_ZERO_RETURN) return 0;
564 0 0 if (self->ssl) {
565 0 0 int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;
567 0 0 if (ret <= 0) {
569 0 0 if (err == SSL_ERROR_WANT_WRITE) {
573 0 0 if (err == SSL_ERROR_WANT_READ) {
592 0 0 if (self->rbuf_cap >= need) return;
594 0 0 if (newcap < need) newcap = need;
600 0 0 if (self->wbuf_cap >= need) return;
602 0 0 if (newcap < need) newcap = need;
612 0 0 if (!self->reading && self->fd >= 0) {
0 0 if (!self->reading && self->fd >= 0) {
619 0 7 if (self->reading) {
626 1 0 if (!self->writing && self->fd >= 0) {
1 0 if (!self->writing && self->fd >= 0) {
633 1 7 if (self->writing) {
644 0 1 if (!self->on_error)
652 0 1 PUSHMARK(SP);
653 0 1 XPUSHs(sv_2mortal(newSVpv(msg, 0)));
656 1 0 if (SvTRUE(ERRSV)) {
0 1 if (SvTRUE(ERRSV)) {
657 0 0 warn("EV::Kafka::Conn: on_error callback error: %s", SvPV_nolen(ERRSV));
658 0 0 sv_setsv(ERRSV, &PL_sv_undef);
660 1 0 FREETMPS;
667 0 0 if (!self->on_connect) return;
674 0 0 PUSHMARK(SP);
677 0 0 if (SvTRUE(ERRSV)) {
0 0 if (SvTRUE(ERRSV)) {
678 0 0 warn("EV::Kafka::Conn: on_connect callback error: %s", SvPV_nolen(ERRSV));
679 0 0 sv_setsv(ERRSV, &PL_sv_undef);
681 0 0 FREETMPS;
688 1 0 if (!self->on_disconnect) return;
695 0 0 PUSHMARK(SP);
698 0 0 if (SvTRUE(ERRSV)) {
0 0 if (SvTRUE(ERRSV)) {
699 0 0 warn("EV::Kafka::Conn: on_disconnect callback error: %s", SvPV_nolen(ERRSV));
700 0 0 sv_setsv(ERRSV, &PL_sv_undef);
702 0 0 FREETMPS;
710 0 0 if (!cb) return;
717 0 0 PUSHMARK(SP);
718 0 0 EXTEND(SP, 2);
719 0 0 PUSHs(result ? result : &PL_sv_undef);
720 0 0 PUSHs(error ? error : &PL_sv_undef);
723 0 0 if (SvTRUE(ERRSV)) {
0 0 if (SvTRUE(ERRSV)) {
724 0 0 warn("EV::Kafka::Conn: callback error: %s", SvPV_nolen(ERRSV));
725 0 0 sv_setsv(ERRSV, &PL_sv_undef);
727 0 0 FREETMPS;
744 0 7 if (self->timing) {
749 0 7 if (self->ssl) {
753 0 7 if (self->ssl_ctx) {
758 1 6 if (self->fd >= 0) {
769 0 7 while (!ngx_queue_empty(&self->cb_queue)) {
774 0 0 if (cbt->cb && !cbt->internal) {
0 0 if (cbt->cb && !cbt->internal) {
776 0 0 if (conn_check_destroyed(self)) {
783 0 0 if (cbt->cb) SvREFCNT_dec(cbt->cb);
793 0 1 if (conn_check_destroyed(self)) return;
796 0 1 if (conn_check_destroyed(self)) return;
798 1 0 if (!self->intentional_disconnect && self->auto_reconnect) {
0 1 if (!self->intentional_disconnect && self->auto_reconnect) {
814 0 0 if (self->magic != KF_MAGIC_ALIVE) return;
815 0 0 if (self->state != CONN_DISCONNECTED) return;
816 0 0 if (!self->host) return;
822 0 0 if (self->reconnect_timing) return;
824 0 0 if (delay < 0.01) delay = 1.0;
850 0 0 int flexible = (api_key == API_API_VERSIONS && api_version >= 3);
0 0 int flexible = (api_key == API_API_VERSIONS && api_version >= 3);
856 0 0 if (flexible) {
866 0 0 if (raw_size > (size_t)INT32_MAX) {
873 0 0 if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) {
0 0 if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) {
875 0 0 if (self->wbuf_len > 0)
895 0 0 if (!no_response) {
902 0 0 if (cb) {
937 0 0 for (i = 0; i < API_VERSIONS_MAX_KEY; i++)
940 0 0 if (end - p < 2) goto err;
942 0 0 if (error_code != 0) {
946 0 0 if (conn_check_destroyed(self)) return;
952 0 0 if (end - p < 4) goto err;
955 0 0 for (i = 0; i < count; i++) {
956 0 0 if (end - p < 6) goto err;
961 0 0 if (key >= 0 && key < API_VERSIONS_MAX_KEY)
0 0 if (key >= 0 && key < API_VERSIONS_MAX_KEY)
968 0 0 if (self->sasl_mechanism) {
979 0 0 if (conn_check_destroyed(self)) return;
1006 0 0 if (end - p < 2) goto err;
1010 0 0 if (end - p < 4) goto err;
1013 0 0 for (i = 0; i < count; i++) {
1016 0 0 if (n < 0) goto err;
1020 0 0 if (error_code != 0) {
1022 0 0 if (conn_check_destroyed(self)) return;
1033 0 0 if (conn_check_destroyed(self)) return;
1046 0 0 if (self->sasl_mechanism && strcmp(self->sasl_mechanism, "PLAIN") == 0) {
0 0 if (self->sasl_mechanism && strcmp(self->sasl_mechanism, "PLAIN") == 0) {
1048 0 0 STRLEN ulen = self->sasl_username ? strlen(self->sasl_username) : 0;
1049 0 0 STRLEN plen = self->sasl_password ? strlen(self->sasl_password) : 0;
1054 0 0 if (ulen > 0) kf_buf_append(&body, self->sasl_username, ulen);
1056 0 0 if (plen > 0) kf_buf_append(&body, self->sasl_password, plen);
1059 0 0 else if (self->sasl_mechanism &&
1060 0 0 (strcmp(self->sasl_mechanism, "SCRAM-SHA-256") == 0 ||
1061 0 0 strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0)) {
1068 0 0 for (i = 0; i < 16; i++)
1074 0 0 if (self->scram_nonce) Safefree(self->scram_nonce);
1086 0 0 if (self->scram_client_first) Safefree(self->scram_client_first);
1113 0 0 if (end - p < 2) goto err;
1120 0 0 if (n < 0) goto err;
1127 0 0 if (end - p >= 4) {
1129 0 0 if (auth_data_len > 0 && end - p >= auth_data_len) {
0 0 if (auth_data_len > 0 && end - p >= auth_data_len) {
1135 0 0 if (error_code != 0) {
1137 0 0 if (errmsg_str && errmsg_len > 0)
0 0 if (errmsg_str && errmsg_len > 0)
1142 0 0 if (conn_check_destroyed(self)) return;
1149 0 0 if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) {
0 0 if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) {
0 0 if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) {
1160 0 0 while (sp < se) {
1161 0 0 if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') {
0 0 if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') {
0 0 if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') {
1163 0 0 while (sp < se && *sp != ',') sp++;
0 0 while (sp < se && *sp != ',') sp++;
1165 0 0 } else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') {
0 0 } else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') {
0 0 } else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') {
1167 0 0 while (sp < se && *sp != ',') sp++;
0 0 while (sp < se && *sp != ',') sp++;
1169 0 0 } else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') {
0 0 } else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') {
0 0 } else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') {
1172 0 0 while (sp < se && *sp != ',') sp++;
0 0 while (sp < se && *sp != ',') sp++;
1174 0 0 if (sp < se && *sp == ',') sp++;
0 0 if (sp < se && *sp == ',') sp++;
1179 0 0 if (!server_nonce || !salt_b64 || iterations <= 0) {
0 0 if (!server_nonce || !salt_b64 || iterations <= 0) {
0 0 if (!server_nonce || !salt_b64 || iterations <= 0) {
1181 0 0 if (conn_check_destroyed(self)) return;
1187 0 0 if (server_nonce_len < 32 ||
1188 0 0 memcmp(server_nonce, self->scram_nonce, 32) != 0) {
1190 0 0 if (conn_check_destroyed(self)) return;
1196 0 0 const EVP_MD *md = is_sha512 ? EVP_sha512() : EVP_sha256();
1197 0 0 int digest_len = is_sha512 ? 64 : 32;
1209 0 0 if (salt_len <= 0) {
1211 0 0 if (conn_check_destroyed(self)) return;
1260 0 0 for (di = 0; di < digest_len; di++)
1276 0 0 int plen = bptr->length < 255 ? (int)bptr->length : 255;
1305 0 0 if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FINAL) {
0 0 if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FINAL) {
1319 0 0 if (conn_check_destroyed(self)) return;
1331 0 0 if (cbt->internal) {
1348 0 0 if (!cbt->cb) return;
1467 0 0 if (key) {
1475 0 0 if (value) {
1483 0 0 if (headers && HvUSEDKEYS(headers) > 0) {
0 0 if (headers && HvUSEDKEYS(headers) > 0) {
0 0 if (headers && HvUSEDKEYS(headers) > 0) {
1484 0 0 kf_buf_append_varint(&rec, (int64_t)HvUSEDKEYS(headers));
1487 0 0 while ((entry = hv_iternext(headers))) {
1523 0 0 for (i = 0; i < count; i++) {
1525 0 0 if (!elem || !SvROK(*elem))
0 0 if (!elem || !SvROK(*elem))
1534 0 0 if (key_sv && SvOK(*key_sv)) key = SvPV(*key_sv, key_len);
0 0 if (key_sv && SvOK(*key_sv)) key = SvPV(*key_sv, key_len);
1535 0 0 if (val_sv && SvOK(*val_sv)) val = SvPV(*val_sv, val_len);
0 0 if (val_sv && SvOK(*val_sv)) val = SvPV(*val_sv, val_len);
1536 0 0 if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV)
0 0 if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV)
0 0 if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV)
1545 0 0 if (is_transactional) attrs |= 0x10; /* bit 4 = isTransactional */
1574 0 0 if (compression == COMPRESS_GZIP) {
1583 0 0 if (zinit == Z_OK) {
1588 0 0 if (deflate(&zs, Z_FINISH) == Z_STREAM_END) zok = 1;
1591 0 0 if (zok) {
1668 0 0 if (compression == COMPRESS_GZIP) {
1677 0 0 if (zinit == Z_OK) {
1682 0 0 if (deflate(&zs, Z_FINISH) == Z_STREAM_END) zok = 1;
1685 0 0 if (zok) {
1739 0 0 if (flexible) {
1741 0 0 if (end - p < 4) goto done;
1747 0 0 if (n < 0) goto done;
1752 0 0 for (i = 0; i < broker_count; i++) {
1754 0 0 if (end - p < 4) goto done;
1760 0 0 if (n < 0) goto done;
1762 0 0 hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
0 0 hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
1764 0 0 if (end - p < 4) goto done;
1771 0 0 if (n < 0) goto done;
1776 0 0 if (n < 0) goto done;
1785 0 0 if (n < 0) goto done;
1789 0 0 if (end - p < 4) goto done;
1795 0 0 if (n < 0) goto done;
1799 0 0 for (i = 0; i < topic_count; i++) {
1801 0 0 if (end - p < 2) goto done;
1807 0 0 if (n < 0) goto done;
1809 0 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
1812 0 0 if (version >= 10) {
1813 0 0 if (end - p < 16) goto done;
1818 0 0 if (end - p < 1) goto done;
1823 0 0 if (n < 0) goto done;
1829 0 0 for (j = 0; j < part_count; j++) {
1831 0 0 if (end - p < 2) goto done;
1835 0 0 if (end - p < 4) goto done;
1839 0 0 if (end - p < 4) goto done;
1844 0 0 if (version >= 7) {
1845 0 0 if (end - p < 4) goto done;
1851 0 0 if (n < 0) goto done;
1854 0 0 if (end - p < (ptrdiff_t)(rcount * 4)) goto done;
1859 0 0 if (n < 0) goto done;
1862 0 0 if (end - p < (ptrdiff_t)(rcount * 4)) goto done;
1866 0 0 if (version >= 5) {
1868 0 0 if (n < 0) goto done;
1871 0 0 if (end - p < (ptrdiff_t)(rcount * 4)) goto done;
1877 0 0 if (n < 0) goto done;
1885 0 0 if (version >= 8) {
1886 0 0 if (end - p < 4) goto done;
1892 0 0 if (n < 0) goto done;
1900 0 0 if (version >= 3) {
1901 0 0 if (end - p < 4) goto done;
1906 0 0 if (end - p < 4) goto done;
1909 0 0 for (i = 0; i < broker_count; i++) {
1911 0 0 if (end - p < 4) goto done;
1917 0 0 if (n < 0) goto done;
1919 0 0 hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
0 0 hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
1921 0 0 if (end - p < 4) goto done;
1926 0 0 if (version >= 1) {
1929 0 0 if (n < 0) goto done;
1937 0 0 if (version >= 2) {
1940 0 0 if (n < 0) goto done;
1945 0 0 if (version >= 1) {
1946 0 0 if (end - p < 4) goto done;
1952 0 0 if (end - p < 4) goto done;
1954 0 0 for (i = 0; i < topic_count; i++) {
1956 0 0 if (end - p < 2) goto done;
1962 0 0 if (n < 0) goto done;
1964 0 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
1967 0 0 if (version >= 1) {
1968 0 0 if (end - p < 1) goto done;
1973 0 0 if (end - p < 4) goto done;
1977 0 0 for (j = 0; j < part_count; j++) {
1979 0 0 if (end - p < 2) goto done;
1983 0 0 if (end - p < 4) goto done;
1987 0 0 if (end - p < 4) goto done;
1992 0 0 if (version >= 7) {
1993 0 0 if (end - p < 4) goto done;
1998 0 0 if (end - p < 4) goto done;
2000 0 0 if (end - p < (ptrdiff_t)(rcount * 4)) goto done;
2004 0 0 if (end - p < 4) goto done;
2006 0 0 if (end - p < (ptrdiff_t)(rcount * 4)) goto done;
2010 0 0 if (version >= 5) {
2011 0 0 if (end - p < 4) goto done;
2013 0 0 if (end - p < (ptrdiff_t)(rcount * 4)) goto done;
2045 0 0 if (end - p < 4) goto done;
2049 0 0 for (i = 0; i < topic_count; i++) {
2053 0 0 if (n < 0) goto done;
2055 0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2058 0 0 if (end - p < 4) goto done;
2063 0 0 for (j = 0; j < part_count; j++) {
2065 0 0 if (end - p < 4) goto done;
2069 0 0 if (end - p < 2) goto done;
2073 0 0 if (end - p < 8) goto done;
2078 0 0 if (version >= 2) {
2079 0 0 if (end - p < 8) goto done;
2084 0 0 if (version >= 5) {
2085 0 0 if (end - p < 8) goto done;
2096 0 0 if (version >= 1 && end - p >= 4) {
0 0 if (version >= 1 && end - p >= 4) {
2119 0 0 if (end - p < 12) return -1;
2121 0 0 if (out_base_offset) *out_base_offset = base_offset;
2124 0 0 if (end - p < batch_length) return -1;
2127 0 0 if (batch_end - p < 9) return -1;
2130 0 0 if (magic != 2) return -1; /* only support magic=2 (current format) */
2133 0 0 if (batch_end - p < 36) return -1;
2143 0 0 if (batch_end - p < 4) return -1;
2151 0 0 if (compression_type != COMPRESS_NONE && batch_end > p) {
0 0 if (compression_type != COMPRESS_NONE && batch_end > p) {
2154 0 0 if (decomp_cap < 4096) decomp_cap = 4096;
2157 0 0 if (compression_type == COMPRESS_GZIP) {
2159 0 0 while (!zok && decomp_cap < 64 * 1024 * 1024) {
0 0 while (!zok && decomp_cap < 64 * 1024 * 1024) {
2164 0 0 if (zinit != Z_OK) {
2176 0 0 if (zret == Z_STREAM_END) {
2180 0 0 } else if (zret == Z_BUF_ERROR || zret == Z_OK) {
0 0 } else if (zret == Z_BUF_ERROR || zret == Z_OK) {
2210 0 0 for (i = 0; i < record_count; i++) {
2213 0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2215 0 0 if (rec_end - rp < rec_len) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (rec_end - rp < rec_len) { if (decompressed) Safefree(decompressed); return -1; }
2218 0 0 if (this_rec_end - rp < 1) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (this_rec_end - rp < 1) { if (decompressed) Safefree(decompressed); return -1; }
2223 0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2228 0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2234 0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2237 0 0 if (key_len >= 0) {
2238 0 0 if (this_rec_end - rp < key_len) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (this_rec_end - rp < key_len) { if (decompressed) Safefree(decompressed); return -1; }
2246 0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2249 0 0 if (val_len >= 0) {
2250 0 0 if (this_rec_end - rp < val_len) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (this_rec_end - rp < val_len) { if (decompressed) Safefree(decompressed); return -1; }
2258 0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2261 0 0 if (hdr_count > 0) {
2264 0 0 for (h = 0; h < hdr_count; h++) {
2267 0 0 if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
2270 0 0 if (this_rec_end - rp < hk_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
0 0 if (this_rec_end - rp < hk_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
2275 0 0 if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
2278 0 0 if (hv_len >= 0) {
2279 0 0 if (this_rec_end - rp < hv_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
0 0 if (this_rec_end - rp < hv_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
2283 0 0 hv_store(hdr_hv, hk_data, (I32)hk_len,
2291 0 0 hv_store(rec_hv, "key", 3,
2293 0 0 hv_store(rec_hv, "value", 5,
2295 0 0 if (hdr_hv)
2303 0 0 if (decompressed) Safefree(decompressed);
2320 0 0 if (version >= 1) {
2321 0 0 if (end - p < 4) goto done;
2327 0 0 if (version >= 7) {
2328 0 0 if (end - p < 2) goto done;
2333 0 0 if (version >= 7) {
2334 0 0 if (end - p < 4) goto done;
2339 0 0 if (end - p < 4) goto done;
2343 0 0 for (i = 0; i < topic_count; i++) {
2348 0 0 if (n < 0) goto done;
2350 0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2353 0 0 if (end - p < 4) goto done;
2358 0 0 for (j = 0; j < part_count; j++) {
2361 0 0 if (end - p < 4) goto done;
2365 0 0 if (end - p < 2) goto done;
2369 0 0 if (end - p < 8) goto done;
2374 0 0 if (version >= 4) {
2375 0 0 if (end - p < 8) goto done;
2381 0 0 if (version >= 5) {
2382 0 0 if (end - p < 8) goto done;
2387 0 0 if (version >= 4) {
2388 0 0 if (end - p < 4) goto done;
2391 0 0 for (at = 0; at < at_count; at++) {
2392 0 0 if (end - p < 16) goto done;
2398 0 0 if (end - p < 4) goto done;
2402 0 0 if (records_size > 0 && end - p >= records_size) {
0 0 if (records_size > 0 && end - p >= records_size) {
2407 0 0 while (rp < rend && rend - rp >= 12) {
0 0 while (rp < rend && rend - rp >= 12) {
2410 0 0 if (bl < 0 || rend - rp < 12 + bl) break;
0 0 if (bl < 0 || rend - rp < 12 + bl) break;
2416 0 0 } else if (records_size > 0) {
2446 0 0 if (version >= 2) {
2447 0 0 if (end - p < 4) goto done;
2452 0 0 if (end - p < 4) goto done;
2456 0 0 for (i = 0; i < topic_count; i++) {
2460 0 0 if (n < 0) goto done;
2462 0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2464 0 0 if (end - p < 4) goto done;
2469 0 0 for (j = 0; j < part_count; j++) {
2471 0 0 if (end - p < 4) goto done;
2475 0 0 if (end - p < 2) goto done;
2479 0 0 if (version >= 1) {
2480 0 0 if (end - p < 8) goto done;
2485 0 0 if (end - p < 8) goto done;
2490 0 0 if (version >= 4) {
2491 0 0 if (end - p < 4) goto done;
2517 0 0 if (version >= 1) {
2518 0 0 if (end - p < 4) goto done;
2522 0 0 if (end - p < 2) goto done;
2527 0 0 if (version >= 1) {
2530 0 0 if (n < 0) goto done;
2532 0 0 if (emsg && elen > 0)
0 0 if (emsg && elen > 0)
2536 0 0 if (end - p < 4) goto done;
2542 0 0 if (n < 0) goto done;
2544 0 0 hv_store(result, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
0 0 hv_store(result, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
2546 0 0 if (end - p < 4) goto done;
2565 0 0 if (version >= 2) {
2566 0 0 if (end - p < 4) goto done;
2570 0 0 if (end - p < 2) goto done;
2574 0 0 if (end - p < 4) goto done;
2582 0 0 if (n < 0) goto done;
2584 0 0 if (proto)
2589 0 0 if (n < 0) goto done;
2591 0 0 if (leader)
2598 0 0 if (n < 0) goto done;
2600 0 0 if (member_id)
2604 0 0 if (end - p < 4) goto done;
2608 0 0 for (i = 0; i < mcount; i++) {
2613 0 0 if (n < 0) goto done;
2615 0 0 if (mid)
2619 0 0 if (version >= 5) {
2622 0 0 if (n < 0) goto done;
2627 0 0 if (end - p < 4) goto done;
2629 0 0 if (mdlen > 0) {
2630 0 0 if (end - p < mdlen) goto done;
2653 0 0 if (version >= 1) {
2654 0 0 if (end - p < 4) goto done;
2658 0 0 if (end - p < 2) goto done;
2663 0 0 if (end - p < 4) goto done;
2665 0 0 if (alen > 0 && end - p >= alen) {
0 0 if (alen > 0 && end - p >= alen) {
2683 0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
2684 0 0 if (end - p >= 2) {
2703 0 0 if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
2705 0 0 if (end - p < 4) goto done;
2708 0 0 for (i = 0; i < tc; i++) {
2712 0 0 if (n < 0) goto done;
2714 0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2716 0 0 if (end - p < 4) goto done;
2720 0 0 for (j = 0; j < pc; j++) {
2722 0 0 if (end - p < 6) goto done;
2749 0 0 if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
2751 0 0 if (end - p < 4) goto done;
2754 0 0 for (i = 0; i < tc; i++) {
2758 0 0 if (n < 0) goto done;
2760 0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2762 0 0 if (end - p < 4) goto done;
2766 0 0 for (j = 0; j < pc; j++) {
2768 0 0 if (end - p < 4) goto done;
2772 0 0 if (end - p < 8) goto done;
2777 0 0 if (version >= 5 && end - p >= 4) p += 4;
0 0 if (version >= 5 && end - p >= 4) p += 4;
2781 0 0 if (n < 0) goto done;
2784 0 0 if (end - p < 2) goto done;
2795 0 0 if (version >= 2 && end - p >= 2) {
0 0 if (version >= 2 && end - p >= 2) {
2814 0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
2815 0 0 if (end - p >= 2) {
2834 0 0 if (version >= 2 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 2 && end - p >= 4) p += 4; /* throttle_time_ms */
2836 0 0 if (end - p < 4) goto done;
2839 0 0 for (i = 0; i < tc; i++) {
2843 0 0 if (n < 0) goto done;
2845 0 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2847 0 0 if (end - p < 2) goto done;
2852 0 0 if (version >= 1) {
2855 0 0 if (n < 0) goto done;
2857 0 0 if (emsg && elen > 0)
0 0 if (emsg && elen > 0)
2880 0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
2882 0 0 if (end - p < 4) goto done;
2885 0 0 for (i = 0; i < tc; i++) {
2889 0 0 if (n < 0) goto done;
2891 0 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2893 0 0 if (end - p < 2) goto done;
2914 0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
2916 0 0 if (end - p < 2) goto done;
2920 0 0 if (end - p < 8) goto done;
2924 0 0 if (end - p < 2) goto done;
2943 0 0 if (end - p < 4) goto done;
2946 0 0 if (end - p < 4) goto done;
2949 0 0 for (i = 0; i < tc; i++) {
2953 0 0 if (n < 0) goto done;
2955 0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2957 0 0 if (end - p < 4) goto done;
2961 0 0 for (j = 0; j < pc; j++) {
2963 0 0 if (end - p < 6) goto done;
2988 0 0 if (end - p >= 4) p += 4; /* throttle_time_ms */
2989 0 0 if (end - p >= 2) {
3008 0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
3010 0 0 if (end - p < 4) goto done;
3013 0 0 for (i = 0; i < tc; i++) {
3017 0 0 if (n < 0) goto done;
3019 0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
3021 0 0 if (end - p < 4) goto done;
3025 0 0 for (j = 0; j < pc; j++) {
3027 0 0 if (end - p < 6) goto done;
3048 0 0 while (self->rbuf_len >= 4) {
3050 0 0 if (msg_size < 0 || msg_size > 256 * 1024 * 1024) {
0 0 if (msg_size < 0 || msg_size > 256 * 1024 * 1024) {
3052 0 0 if (conn_check_destroyed(self)) return;
3057 0 0 if (self->rbuf_len < (size_t)(4 + msg_size))
3063 0 0 if (msg_size < 4) {
3065 0 0 if (conn_check_destroyed(self)) return;
3076 0 0 if (!ngx_queue_empty(&self->cb_queue)) {
3079 0 0 if (cbt->correlation_id != corr_id) {
3086 0 0 if (conn_check_destroyed(self)) return;
3096 0 0 if (cbt) {
3098 0 0 if (cbt->cb) SvREFCNT_dec(cbt->cb);
3100 0 0 if (conn_check_destroyed(self)) return;
3105 0 0 if (self->rbuf_len > 0)
3119 1 0 if (!self || self->magic != KF_MAGIC_ALIVE) return;
0 1 if (!self || self->magic != KF_MAGIC_ALIVE) return;
3122 1 0 if (self->state == CONN_CONNECTING) {
3126 1 0 if (self->timing) {
3132 0 1 if (getsockopt(self->fd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0)
3134 1 0 if (err != 0) {
3138 0 1 if (conn_check_destroyed(self)) return;
3149 0 0 if (self->state == CONN_TLS_HANDSHAKE) {
3151 0 0 if (ret == 1) {
3161 0 0 if (err == SSL_ERROR_WANT_READ) {
3164 0 0 } else if (err == SSL_ERROR_WANT_WRITE) {
3169 0 0 if (conn_check_destroyed(self)) return;
3177 0 0 if (revents & EV_WRITE) {
3178 0 0 while (self->wbuf_off < self->wbuf_len) {
3181 0 0 if (n < 0) {
3182 0 0 if (errno == EAGAIN || errno == EWOULDBLOCK) break;
0 0 if (errno == EAGAIN || errno == EWOULDBLOCK) break;
3184 0 0 if (conn_check_destroyed(self)) return;
3188 0 0 if (n == 0) {
3195 0 0 if (self->wbuf_off >= self->wbuf_len) {
3203 0 0 if (revents & EV_READ) {
3207 0 0 if (n < 0) {
3208 0 0 if (errno == EAGAIN || errno == EWOULDBLOCK) return;
0 0 if (errno == EAGAIN || errno == EWOULDBLOCK) return;
3210 0 0 if (conn_check_destroyed(self)) return;
3214 0 0 if (n == 0) {
3235 0 0 if (self->magic != KF_MAGIC_ALIVE) return;
3238 0 0 if (conn_check_destroyed(self)) return;
3259 0 0 if (self->tls_enabled) {
3261 0 0 if (!self->ssl_ctx) {
3263 0 0 if (conn_check_destroyed(self)) return;
3268 0 0 if (self->tls_skip_verify)
3273 0 0 if (self->tls_ca_file) {
3274 0 0 if (SSL_CTX_load_verify_locations(self->ssl_ctx, self->tls_ca_file, NULL) != 1) {
3276 0 0 if (conn_check_destroyed(self)) return;
3283 0 0 if (!self->ssl) {
3285 0 0 if (conn_check_destroyed(self)) return;
3291 0 0 if (!is_ip_literal(self->host))
3294 0 0 if (!self->tls_skip_verify) {
3297 0 0 if (is_ip_literal(self->host))
3305 0 0 if (ret == 1) {
3312 0 0 if (err == SSL_ERROR_WANT_READ) {
3314 0 0 } else if (err == SSL_ERROR_WANT_WRITE) {
3318 0 0 if (conn_check_destroyed(self)) return;
3337 0 1 if (self->state != CONN_DISCONNECTED) {
3342 1 0 if (host != self->host) {
3343 0 1 if (self->host) Safefree(self->host);
3356 0 1 if (gai_err != 0) {
3363 1 0 for (rp = res; rp; rp = rp->ai_next) {
3365 0 1 if (fd < 0) continue;
3377 0 1 if (ret == 0) {
3392 1 0 if (errno == EINPROGRESS) {
3404 1 0 if (timeout > 0) {
3528 0 6 if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop"))
0 0 if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop"))
3558 384 6 for (i = 0; i < API_VERSIONS_MAX_KEY; i++)
3573 0 6 if (self->magic != KF_MAGIC_ALIVE) return;
3581 0 6 if (self->reconnect_timing) {
3586 1 5 CLEAR_HANDLER(self->on_error);
3587 0 6 CLEAR_HANDLER(self->on_connect);
3588 0 6 CLEAR_HANDLER(self->on_disconnect);
3590 1 5 if (self->host) Safefree(self->host);
3591 6 0 if (self->client_id) Safefree(self->client_id);
3592 0 6 if (self->sasl_mechanism) Safefree(self->sasl_mechanism);
3593 0 6 if (self->sasl_username) Safefree(self->sasl_username);
3594 0 6 if (self->sasl_password) Safefree(self->sasl_password);
3595 0 6 if (self->scram_nonce) Safefree(self->scram_nonce);
3596 0 6 if (self->scram_client_first) Safefree(self->scram_client_first);
3597 0 6 if (self->tls_ca_file) Safefree(self->tls_ca_file);
3598 6 0 if (self->rbuf) Safefree(self->rbuf);
3599 6 0 if (self->wbuf) Safefree(self->wbuf);
3616 0 0 if (self->reconnect_timing) {
3626 0 2 RETVAL = (self->state == CONN_READY) ? 1 : 0;
3633 0 0 RETVAL = self->state;
3640 0 0 RETVAL = self->pending_count;
3648 0 1 CLEAR_HANDLER(self->on_error);
3649 1 0 if (cb && SvOK(cb)) {
1 0 if (cb && SvOK(cb)) {
3658 0 0 CLEAR_HANDLER(self->on_connect);
3659 0 0 if (cb && SvOK(cb)) {
0 0 if (cb && SvOK(cb)) {
3668 0 0 CLEAR_HANDLER(self->on_disconnect);
3669 0 0 if (cb && SvOK(cb)) {
0 0 if (cb && SvOK(cb)) {
3678 0 0 if (id) {
3679 0 0 if (self->client_id) Safefree(self->client_id);
3690 0 0 if (self->tls_ca_file) { Safefree(self->tls_ca_file); self->tls_ca_file = NULL; }
3691 0 0 if (ca_file) self->tls_ca_file = savepv(ca_file);
3699 0 0 if (self->sasl_mechanism) { Safefree(self->sasl_mechanism); self->sasl_mechanism = NULL; }
3700 0 0 if (self->sasl_username) { Safefree(self->sasl_username); self->sasl_username = NULL; }
3701 0 0 if (self->sasl_password) { Safefree(self->sasl_password); self->sasl_password = NULL; }
3702 0 0 if (SvOK(ST(1))) {
3704 0 0 if (username) self->sasl_username = savepv(username);
3705 0 0 if (password) self->sasl_password = savepv(password);
3721 1 0 if (self->state != CONN_READY)
3729 0 0 if (ver < 0) ver = 1;
3730 0 0 if (ver > 4) ver = 4;
3732 0 0 if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) {
0 0 if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) {
0 0 if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) {
3736 0 0 for (i = 0; i < count; i++) {
3747 0 0 if (ver >= 4)
3758 0 0 if (!self->api_versions_known)
3763 0 0 for (i = 0; i < API_VERSIONS_MAX_KEY; i++) {
3764 0 0 if (self->api_versions[i] >= 0) {
3770 0 0 EXTEND(SP, 1);
3779 1 0 if (self->state != CONN_READY)
3786 0 0 if (ver < 0) ver = 4;
3787 0 0 if (ver > 7) ver = 7;
3797 0 0 if (ver >= 3)
3801 0 0 if (ver >= 4)
3805 0 0 if (ver >= 7) {
3822 0 0 if (ver >= 5)
3829 0 0 if (ver >= 7)
3840 0 0 if (self->state != CONN_READY)
3844 0 0 if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVHV)
0 0 if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVHV)
3849 0 0 if (ver < 0) ver = 4;
3850 0 0 if (ver > 7) ver = 7;
3858 0 0 if (ver >= 3)
3860 0 0 if (ver >= 4)
3862 0 0 if (ver >= 7) {
3868 0 0 kf_buf_append_i32(&body, (int32_t)HvUSEDKEYS(topics_hv));
3872 0 0 while ((entry = hv_iternext(topics_hv))) {
3878 0 0 if (!SvROK(parts_sv) || SvTYPE(SvRV(parts_sv)) != SVt_PVAV)
0 0 if (!SvROK(parts_sv) || SvTYPE(SvRV(parts_sv)) != SVt_PVAV)
3884 0 0 for (i = 0; i < pc; i++) {
3886 0 0 if (!elem || !SvROK(*elem))
0 0 if (!elem || !SvROK(*elem))
3891 0 0 int32_t pid = pid_sv ? (int32_t)SvIV(*pid_sv) : 0;
3895 0 0 int64_t offset = off_sv ? (int64_t)SvIV(*off_sv) : 0;
3898 0 0 if (ver >= 5)
3905 0 0 if (ver >= 7)
3916 0 0 if (self->state != CONN_READY)
3919 0 0 if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV)
0 0 if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV)
3931 0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
3934 0 0 if ((tmp = hv_fetch(opts, "acks", 4, 0)))
3936 0 0 if ((tmp = hv_fetch(opts, "transactional_id", 16, 0)) && SvOK(*tmp))
0 0 if ((tmp = hv_fetch(opts, "transactional_id", 16, 0)) && SvOK(*tmp))
3938 0 0 if ((tmp = hv_fetch(opts, "compression", 11, 0))) {
3941 0 0 if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
0 0 if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
3946 0 0 else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
0 0 else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
3950 0 0 if ((tmp = hv_fetch(opts, "producer_id", 11, 0)))
3952 0 0 if ((tmp = hv_fetch(opts, "producer_epoch", 14, 0)))
3954 0 0 if ((tmp = hv_fetch(opts, "base_sequence", 13, 0)))
3956 0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
3971 0 0 if (ver < 0) ver = 3;
3972 0 0 if (ver > 7) ver = 7;
3979 0 0 if (ver >= 3)
3980 0 0 kf_buf_append_nullable_string(&body, txn_id, txn_id ? (int16_t)txn_id_len : 0);
3992 0 0 conn_send_request(aTHX_ self, API_PRODUCE, ver, &body,
3993 0 0 (cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0);
4003 0 0 if (self->state != CONN_READY)
4011 0 0 if (ver < 0) ver = 1;
4012 0 0 if (ver > 5) ver = 5;
4020 0 0 if (ver >= 2)
4032 0 0 if (ver >= 4)
4045 1 0 if (self->state != CONN_READY)
4055 0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
4058 0 0 if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV)
0 0 if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV)
0 0 if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV)
4060 0 0 if ((tmp = hv_fetch(opts, "acks", 4, 0)))
4062 0 0 if ((tmp = hv_fetch(opts, "compression", 11, 0))) {
4065 0 0 if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
0 0 if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
4070 0 0 else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
0 0 else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
4074 0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
4082 0 0 if (SvOK(key_sv))
4087 0 0 if (SvOK(value_sv))
4094 0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV)
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV)
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV)
4096 0 0 if (ts_tmp && SvOK(*ts_tmp)) {
0 0 if (ts_tmp && SvOK(*ts_tmp)) {
4110 0 0 if (ver < 0) ver = 3;
4111 0 0 if (ver > 7) ver = 7;
4119 0 0 if (ver >= 3)
4137 0 0 conn_send_request(aTHX_ self, API_PRODUCE, ver, &body,
4138 0 0 (cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0);
4148 0 0 if (self->state != CONN_READY)
4152 0 0 if (ver < 0) ver = 0;
4153 0 0 if (ver > 2) ver = 2;
4162 0 0 if (ver >= 1)
4173 0 0 if (self->state != CONN_READY)
4177 0 0 if (ver < 0) ver = 1;
4178 0 0 if (ver > 5) ver = 5;
4188 0 0 if (ver >= 1)
4195 0 0 if (ver >= 5) {
4196 0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
4223 0 0 for (i = 0; i < tc; i++) {
4242 0 0 if (self->state != CONN_READY)
4246 0 0 if (ver < 0) ver = 0;
4247 0 0 if (ver > 3) ver = 3;
4260 0 0 if (ver >= 3) {
4261 0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
4271 0 0 if (SvOK(assignments_sv) && SvROK(assignments_sv)
0 0 if (SvOK(assignments_sv) && SvROK(assignments_sv)
4272 0 0 && SvTYPE(SvRV(assignments_sv)) == SVt_PVAV) {
4277 0 0 for (i = 0; i < ac; i++) {
4279 0 0 if (!elem || !SvROK(*elem)) continue;
0 0 if (!elem || !SvROK(*elem)) continue;
4283 0 0 if (!mid_sv) continue;
4289 0 0 if (!data_sv) { kf_buf_append_bytes(&body, NULL, 0); continue; }
4306 0 0 if (self->state != CONN_READY)
4310 0 0 if (ver < 0) ver = 0;
4311 0 0 if (ver > 4) ver = 4;
4324 0 0 if (ver >= 3) {
4325 0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
4342 0 0 if (self->state != CONN_READY)
4346 0 0 if (ver < 0) ver = 2;
4347 0 0 if (ver > 7) ver = 7;
4356 0 0 if (ver >= 1)
4360 0 0 if (ver >= 1) {
4366 0 0 if (ver >= 7)
4374 0 0 for (i = 0; i < tc; i++) {
4376 0 0 if (!elem || !SvROK(*elem)) continue;
0 0 if (!elem || !SvROK(*elem)) continue;
4379 0 0 if (!tname_sv) continue;
4385 0 0 if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
0 0 if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
4390 0 0 for (j = 0; j < pc; j++) {
4392 0 0 if (!pelem || !SvROK(*pelem)) continue;
0 0 if (!pelem || !SvROK(*pelem)) continue;
4396 0 0 kf_buf_append_i32(&body, pid_sv ? (int32_t)SvIV(*pid_sv) : 0);
4399 0 0 kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0);
4402 0 0 if (ver >= 6)
4418 0 0 if (self->state != CONN_READY)
4422 0 0 if (ver < 0) ver = 1;
4423 0 0 if (ver > 5) ver = 5;
4436 0 0 for (i = 0; i < tc; i++) {
4438 0 0 if (!elem || !SvROK(*elem)) continue;
0 0 if (!elem || !SvROK(*elem)) continue;
4441 0 0 if (!tname_sv) continue;
4447 0 0 if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
0 0 if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
4452 0 0 for (j = 0; j < pc; j++) {
4454 0 0 kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0);
4466 0 0 if (self->state != CONN_READY)
4470 0 0 if (ver < 0) ver = 0;
4471 0 0 if (ver > 3) ver = 3;
4490 0 0 if (self->state != CONN_READY)
4494 0 0 if (ver < 0) ver = 0;
4495 0 0 if (ver > 4) ver = 4;
4504 0 0 for (i = 0; i < tc; i++) {
4506 0 0 if (!elem || !SvROK(*elem)) continue;
0 0 if (!elem || !SvROK(*elem)) continue;
4510 0 0 if (!name_sv) continue;
4516 0 0 int32_t num_partitions = np_sv ? (int32_t)SvIV(*np_sv) : 1;
4520 0 0 int16_t replication_factor = rf_sv ? (int16_t)SvIV(*rf_sv) : 1;
4533 0 0 if (ver >= 1)
4544 0 0 if (self->state != CONN_READY)
4548 0 0 if (ver < 0) ver = 0;
4549 0 0 if (ver > 3) ver = 3;
4558 0 0 for (i = 0; i < tc; i++) {
4560 0 0 if (!elem) continue;
4576 0 0 if (self->state != CONN_READY)
4580 0 0 if (ver < 0) ver = 0;
4581 0 0 if (ver > 1) ver = 1;
4586 0 0 if (SvOK(transactional_id_sv)) {
4597 0 0 if (ver >= 2) {
4610 0 0 if (self->state != CONN_READY)
4614 0 0 if (ver < 0) ver = 0;
4615 0 0 if (ver > 1) ver = 1;
4628 0 0 if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVAV)
0 0 if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVAV)
4634 0 0 for (i = 0; i < tc; i++) {
4636 0 0 if (!elem || !SvROK(*elem)) croak("add_partitions_to_txn: bad element");
0 0 if (!elem || !SvROK(*elem)) croak("add_partitions_to_txn: bad element");
4639 0 0 if (!tname_sv) croak("add_partitions_to_txn: missing topic");
4645 0 0 if (!parts_sv || !SvROK(*parts_sv)) croak("add_partitions_to_txn: missing partitions");
0 0 if (!parts_sv || !SvROK(*parts_sv)) croak("add_partitions_to_txn: missing partitions");
4649 0 0 for (j = 0; j < pc; j++) {
4651 0 0 kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0);
4663 0 0 if (self->state != CONN_READY)
4667 0 0 if (ver < 0) ver = 0;
4668 0 0 if (ver > 1) ver = 1;
4689 0 0 if (self->state != CONN_READY)
4693 0 0 if (ver < 0) ver = 0;
4694 0 0 if (ver > 3) ver = 3;
4711 0 0 if (ver >= 3) {
4719 0 0 if (!SvROK(offsets_sv) || SvTYPE(SvRV(offsets_sv)) != SVt_PVAV)
0 0 if (!SvROK(offsets_sv) || SvTYPE(SvRV(offsets_sv)) != SVt_PVAV)
4725 0 0 for (i = 0; i < tc; i++) {
4727 0 0 if (!elem || !SvROK(*elem)) croak("txn_offset_commit: bad element");
0 0 if (!elem || !SvROK(*elem)) croak("txn_offset_commit: bad element");
4731 0 0 if (!tname_sv) croak("txn_offset_commit: missing topic");
4737 0 0 if (!parts_sv || !SvROK(*parts_sv)) croak("txn_offset_commit: missing partitions");
0 0 if (!parts_sv || !SvROK(*parts_sv)) croak("txn_offset_commit: missing partitions");
4742 0 0 for (j = 0; j < pc; j++) {
4744 0 0 if (!pelem || !SvROK(*pelem)) croak("txn_offset_commit: bad partition");
0 0 if (!pelem || !SvROK(*pelem)) croak("txn_offset_commit: bad partition");
4748 0 0 kf_buf_append_i32(&body, ppid_sv ? (int32_t)SvIV(*ppid_sv) : 0);
4751 0 0 kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0);
4754 0 0 if (ver >= 2)
4774 0 6 if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop"))
0 0 if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop"))
4814 0 6 if (self->magic != KF_MAGIC_ALIVE) return;
4818 0 6 while (!ngx_queue_empty(&self->brokers)) {
4829 0 6 while (!ngx_queue_empty(&self->topics)) {
4833 0 0 if (tm->name) Safefree(tm->name);
4834 0 0 if (tm->partitions) Safefree(tm->partitions);
4838 0 6 if (self->metadata_timing) {
4842 0 6 if (self->linger_timing) {
4846 0 6 if (self->fetch_timing) {
4852 0 6 if (self->bootstrap_hosts) {
4854 0 0 for (i = 0; i < self->bootstrap_count; i++)
4855 0 0 if (self->bootstrap_hosts[i]) Safefree(self->bootstrap_hosts[i]);
4858 0 6 if (self->bootstrap_ports) Safefree(self->bootstrap_ports);
4861 0 6 if (self->group) {
4862 0 0 if (self->group->heartbeat_timing)
4864 0 0 if (self->group->group_id) Safefree(self->group->group_id);
4865 0 0 if (self->group->member_id) Safefree(self->group->member_id);
4866 0 0 CLEAR_HANDLER(self->group->on_assign);
4867 0 0 CLEAR_HANDLER(self->group->on_revoke);
4868 0 0 if (self->group->subscriptions) SvREFCNT_dec((SV*)self->group->subscriptions);
4872 0 6 CLEAR_HANDLER(self->on_error);
4873 0 6 CLEAR_HANDLER(self->on_connect);
4874 0 6 CLEAR_HANDLER(self->on_message);
4875 0 6 CLEAR_HANDLER(self->partitioner);
4877 6 0 if (self->client_id) Safefree(self->client_id);
4878 0 6 if (self->sasl_mechanism) Safefree(self->sasl_mechanism);
4879 0 6 if (self->sasl_username) Safefree(self->sasl_username);
4880 0 6 if (self->sasl_password) Safefree(self->sasl_password);
4881 0 6 if (self->tls_ca_file) Safefree(self->tls_ca_file);
4897 259 16 while (remaining >= 4) {
4919 9 7 RETVAL = (int)(h & 0x7FFFFFFF);
4965 0 10 EXTEND(SP, 1);
4972 0 15 I_EV_API("EV::Kafka");
15 0 I_EV_API("EV::Kafka");
0 15 I_EV_API("EV::Kafka");