Branch Coverage

Kafka.xs
Criterion Covered Total %
branch 460 1922 23.9


line true false branch
135 1656 19 if (b->cap >= need) return;
137 6 13 if (newcap < need) newcap = need;
143 205 0 if (b->data) { Safefree(b->data); b->data = NULL; }
185 0 0 if (len > 0) kf_buf_append(b, s, len);
189 0 1 if (!s) {
193 1 0 if (len > 0) kf_buf_append(b, s, len);
200 0 0 if (len > 0) kf_buf_append(b, s, len);
204 0 0 if (!s) {
208 0 0 if (len > 0) kf_buf_append(b, s, len);
215 21 864 while (val >= 0x80) {
230 0 0 if (!s) {
234 0 0 if (len > 0) kf_buf_append(b, s, len);
271 867 0 while (p < end) {
274 846 21 if (!(b & 0x80)) {
279 0 21 if (shift >= 64) return -1;
288 0 846 if (n < 0) return n;
295 0 13 if (end - buf < 2) return -1;
297 1 12 if (len < 0) { /* nullable null */
302 0 12 if (end - buf < 2 + len) return -1;
312 0 0 if (n < 0) return -1;
313 0 0 if (raw == 0) {
319 0 0 if (end - buf - n < len) return -1;
329 0 0 if (n < 0) return -1;
332 0 0 for (i = 0; i < count; i++) {
335 0 0 if (tn < 0) return -1;
339 0 0 if (dn < 0) return -1;
341 0 0 if ((uint64_t)(end - p) < dlen) return -1;
356 5376 21 for (i = 0; i < 256; i++) {
358 43008 5376 for (j = 0; j < 8; j++) {
359 21504 21504 if (crc & 1)
372 0 37 if (!crc32c_table_inited) crc32c_init_table();
373 205072 37 for (i = 0; i < len; i++)
547 0 1 if (self->ssl) {
548 0 0 int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;
551 0 0 if (ret <= 0) {
553 0 0 if (err == SSL_ERROR_WANT_READ) {
557 0 0 if (err == SSL_ERROR_WANT_WRITE) {
562 0 0 if (err == SSL_ERROR_ZERO_RETURN) return 0;
575 0 1 if (self->ssl) {
576 0 0 int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;
579 0 0 if (ret <= 0) {
581 0 0 if (err == SSL_ERROR_WANT_WRITE) {
585 0 0 if (err == SSL_ERROR_WANT_READ) {
605 1 0 if (self->rbuf_cap >= need) return;
607 0 0 if (newcap < need) newcap = need;
613 1 0 if (self->wbuf_cap >= need) return;
615 0 0 if (newcap < need) newcap = need;
625 1 0 if (!self->reading && self->fd >= 0) {
1 0 if (!self->reading && self->fd >= 0) {
632 1 8 if (self->reading) {
639 3 0 if (!self->writing && self->fd >= 0) {
3 0 if (!self->writing && self->fd >= 0) {
646 3 9 if (self->writing) {
657 0 1 if (!self->on_error)
665 0 1 PUSHMARK(SP);
666 0 1 XPUSHs(sv_2mortal(newSVpv(msg, 0)));
669 1 0 if (SvTRUE(ERRSV)) {
0 1 if (SvTRUE(ERRSV)) {
670 0 0 warn("EV::Kafka::Conn: on_error callback error: %s", SvPV_nolen(ERRSV));
671 0 0 sv_setsv(ERRSV, &PL_sv_undef);
673 1 0 FREETMPS;
680 0 1 if (!self->on_connect) return;
687 0 1 PUSHMARK(SP);
690 1 0 if (SvTRUE(ERRSV)) {
0 1 if (SvTRUE(ERRSV)) {
691 0 0 warn("EV::Kafka::Conn: on_connect callback error: %s", SvPV_nolen(ERRSV));
692 0 0 sv_setsv(ERRSV, &PL_sv_undef);
694 0 1 FREETMPS;
701 2 0 if (!self->on_disconnect) return;
708 0 0 PUSHMARK(SP);
711 0 0 if (SvTRUE(ERRSV)) {
0 0 if (SvTRUE(ERRSV)) {
712 0 0 warn("EV::Kafka::Conn: on_disconnect callback error: %s", SvPV_nolen(ERRSV));
713 0 0 sv_setsv(ERRSV, &PL_sv_undef);
715 0 0 FREETMPS;
723 0 0 if (!cb) return;
730 0 0 PUSHMARK(SP);
731 0 0 EXTEND(SP, 2);
732 0 0 PUSHs(result ? result : &PL_sv_undef);
733 0 0 PUSHs(error ? error : &PL_sv_undef);
736 0 0 if (SvTRUE(ERRSV)) {
0 0 if (SvTRUE(ERRSV)) {
737 0 0 warn("EV::Kafka::Conn: callback error: %s", SvPV_nolen(ERRSV));
738 0 0 sv_setsv(ERRSV, &PL_sv_undef);
740 0 0 FREETMPS;
757 0 9 if (self->timing) {
762 0 9 if (self->ssl) {
766 0 9 if (self->ssl_ctx) {
771 2 7 if (self->fd >= 0) {
780 9 0 SV *err_sv = in_destruct ? NULL : newSVpv(err, 0);
783 0 9 while (!ngx_queue_empty(&self->cb_queue)) {
788 0 0 if (cbt->cb && !cbt->internal && !in_destruct) {
0 0 if (cbt->cb && !cbt->internal && !in_destruct) {
0 0 if (cbt->cb && !cbt->internal && !in_destruct) {
790 0 0 if (conn_check_destroyed(self)) {
797 0 0 if (cbt->cb) SvREFCNT_dec(cbt->cb);
800 9 0 if (err_sv) SvREFCNT_dec(err_sv);
807 0 2 if (conn_check_destroyed(self)) return;
810 0 2 if (conn_check_destroyed(self)) return;
812 1 1 if (!self->intentional_disconnect && self->auto_reconnect) {
0 1 if (!self->intentional_disconnect && self->auto_reconnect) {
828 0 0 if (self->magic != KF_MAGIC_ALIVE) return;
829 0 0 if (self->state != CONN_DISCONNECTED) return;
830 0 0 if (!self->host) return;
836 0 0 if (self->reconnect_timing) return;
838 0 0 if (delay < 0.01) delay = 1.0;
864 1 0 int flexible = (api_key == API_API_VERSIONS && api_version >= 3);
0 1 int flexible = (api_key == API_API_VERSIONS && api_version >= 3);
870 0 1 if (flexible) {
880 0 1 if (raw_size > (size_t)INT32_MAX) {
887 0 1 if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) {
0 0 if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) {
889 0 0 if (self->wbuf_len > 0)
909 1 0 if (!no_response) {
916 0 1 if (cb) {
951 64 1 for (i = 0; i < API_VERSIONS_MAX_KEY; i++)
954 0 1 if (end - p < 2) goto err;
956 0 1 if (error_code != 0) {
960 0 0 if (conn_check_destroyed(self)) return;
966 0 1 if (end - p < 4) goto err;
969 2 1 for (i = 0; i < count; i++) {
970 0 2 if (end - p < 6) goto err;
975 2 0 if (key >= 0 && key < API_VERSIONS_MAX_KEY)
2 0 if (key >= 0 && key < API_VERSIONS_MAX_KEY)
982 0 1 if (self->sasl_mechanism) {
993 0 0 if (conn_check_destroyed(self)) return;
1020 0 0 if (end - p < 2) goto err;
1024 0 0 if (end - p < 4) goto err;
1027 0 0 for (i = 0; i < count; i++) {
1030 0 0 if (n < 0) goto err;
1034 0 0 if (error_code != 0) {
1036 0 0 if (conn_check_destroyed(self)) return;
1047 0 0 if (conn_check_destroyed(self)) return;
1060 0 0 if (self->sasl_mechanism && strcmp(self->sasl_mechanism, "PLAIN") == 0) {
0 0 if (self->sasl_mechanism && strcmp(self->sasl_mechanism, "PLAIN") == 0) {
1062 0 0 STRLEN ulen = self->sasl_username ? strlen(self->sasl_username) : 0;
1063 0 0 STRLEN plen = self->sasl_password ? strlen(self->sasl_password) : 0;
1068 0 0 if (ulen > 0) kf_buf_append(&body, self->sasl_username, ulen);
1070 0 0 if (plen > 0) kf_buf_append(&body, self->sasl_password, plen);
1073 0 0 else if (self->sasl_mechanism &&
1074 0 0 (strcmp(self->sasl_mechanism, "SCRAM-SHA-256") == 0 ||
1075 0 0 strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0)) {
1082 0 0 for (i = 0; i < 16; i++)
1088 0 0 if (self->scram_nonce) Safefree(self->scram_nonce);
1092 0 0 if (!self->sasl_username) {
1105 0 0 if (self->scram_client_first) Safefree(self->scram_client_first);
1124 0 0 if (ver < 0) ver = 1;
1125 0 0 if (ver > 2) ver = 2;
1137 0 0 if (end - p < 2) goto err;
1144 0 0 if (n < 0) goto err;
1151 0 0 if (end - p >= 4) {
1153 0 0 if (auth_data_len > 0 && end - p >= auth_data_len) {
0 0 if (auth_data_len > 0 && end - p >= auth_data_len) {
1159 0 0 if (error_code != 0) {
1161 0 0 if (errmsg_str && errmsg_len > 0)
0 0 if (errmsg_str && errmsg_len > 0)
1166 0 0 if (conn_check_destroyed(self)) return;
1173 0 0 if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) {
0 0 if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) {
0 0 if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) {
1184 0 0 while (sp < se) {
1185 0 0 if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') {
0 0 if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') {
0 0 if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') {
1187 0 0 while (sp < se && *sp != ',') sp++;
0 0 while (sp < se && *sp != ',') sp++;
1189 0 0 } else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') {
0 0 } else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') {
0 0 } else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') {
1191 0 0 while (sp < se && *sp != ',') sp++;
0 0 while (sp < se && *sp != ',') sp++;
1193 0 0 } else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') {
0 0 } else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') {
0 0 } else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') {
1196 0 0 while (sp < se && *sp != ',') sp++;
0 0 while (sp < se && *sp != ',') sp++;
1198 0 0 if (sp < se && *sp == ',') sp++;
0 0 if (sp < se && *sp == ',') sp++;
1203 0 0 if (!server_nonce || !salt_b64 || iterations <= 0) {
0 0 if (!server_nonce || !salt_b64 || iterations <= 0) {
0 0 if (!server_nonce || !salt_b64 || iterations <= 0) {
1205 0 0 if (conn_check_destroyed(self)) return;
1211 0 0 if (server_nonce_len < 32 ||
1212 0 0 memcmp(server_nonce, self->scram_nonce, 32) != 0) {
1214 0 0 if (conn_check_destroyed(self)) return;
1220 0 0 const EVP_MD *md = is_sha512 ? EVP_sha512() : EVP_sha256();
1221 0 0 int digest_len = is_sha512 ? 64 : 32;
1233 0 0 if (salt_len <= 0) {
1235 0 0 if (conn_check_destroyed(self)) return;
1292 0 0 for (di = 0; di < digest_len; di++)
1296 0 0 if (self->scram_auth_message) Safefree(self->scram_auth_message);
1313 0 0 int plen = bptr->length < 255 ? (int)bptr->length : 255;
1338 0 0 if (ver < 0) ver = 1;
1339 0 0 if (ver > 2) ver = 2;
1347 0 0 if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FINAL) {
0 0 if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FINAL) {
1350 0 0 if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') {
0 0 if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') {
0 0 if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') {
0 0 if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') {
1352 0 0 if (conn_check_destroyed(self)) return;
1361 0 0 if (strcmp(self->sasl_mechanism, "SCRAM-SHA-256") == 0) md = EVP_sha256();
1362 0 0 else if (strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0) md = EVP_sha512();
1363 0 0 if (!md || self->scram_server_key_len <= 0 || !self->scram_auth_message) {
0 0 if (!md || self->scram_server_key_len <= 0 || !self->scram_auth_message) {
0 0 if (!md || self->scram_server_key_len <= 0 || !self->scram_auth_message) {
1365 0 0 if (conn_check_destroyed(self)) return;
1388 0 0 && (CRYPTO_memcmp(recv_sig, expected, expected_len) == 0);
0 0 && (CRYPTO_memcmp(recv_sig, expected, expected_len) == 0);
1393 0 0 if (self->scram_auth_message) {
1401 0 0 if (!ok) {
1403 0 0 if (conn_check_destroyed(self)) return;
1419 0 0 if (conn_check_destroyed(self)) return;
1431 1 0 if (cbt->internal) {
1448 0 0 if (!cbt->cb) return;
1567 140 1 if (key) {
1575 140 1 if (value) {
1583 1 140 if (headers && HvUSEDKEYS(headers) > 0) {
0 1 if (headers && HvUSEDKEYS(headers) > 0) {
1 0 if (headers && HvUSEDKEYS(headers) > 0) {
1584 0 1 kf_buf_append_varint(&rec, (int64_t)HvUSEDKEYS(headers));
1587 2 1 while ((entry = hv_iternext(headers))) {
1619 141 16 for (i = 0; i < count; i++) {
1621 141 0 if (!elem || !SvROK(*elem) || SvTYPE(SvRV(*elem)) != SVt_PVHV)
141 0 if (!elem || !SvROK(*elem) || SvTYPE(SvRV(*elem)) != SVt_PVHV)
0 141 if (!elem || !SvROK(*elem) || SvTYPE(SvRV(*elem)) != SVt_PVHV)
1628 141 16 for (i = 0; i < count; i++) {
1637 141 0 if (key_sv && SvOK(*key_sv)) key = SvPV(*key_sv, key_len);
140 1 if (key_sv && SvOK(*key_sv)) key = SvPV(*key_sv, key_len);
1638 141 0 if (val_sv && SvOK(*val_sv)) val = SvPV(*val_sv, val_len);
140 1 if (val_sv && SvOK(*val_sv)) val = SvPV(*val_sv, val_len);
1639 1 140 if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV)
1 0 if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV)
1 0 if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV)
1648 0 16 if (is_transactional) attrs |= 0x10; /* bit 4 = isTransactional */
1677 2 14 if (compression == COMPRESS_GZIP) {
1686 2 0 if (zinit == Z_OK) {
1691 2 0 if (deflate(&zs, Z_FINISH) == Z_STREAM_END) zok = 1;
1694 2 0 if (zok) {
1785 0 2 if (flexible) {
1787 0 0 if (end - p < 4) goto done;
1793 0 0 if (n < 0) goto done;
1798 0 0 for (i = 0; i < broker_count; i++) {
1800 0 0 if (end - p < 4) goto done;
1806 0 0 if (n < 0) goto done;
1808 0 0 hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
0 0 hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
1810 0 0 if (end - p < 4) goto done;
1817 0 0 if (n < 0) goto done;
1822 0 0 if (n < 0) goto done;
1831 0 0 if (n < 0) goto done;
1835 0 0 if (end - p < 4) goto done;
1841 0 0 if (n < 0) goto done;
1845 0 0 for (i = 0; i < topic_count; i++) {
1847 0 0 if (end - p < 2) goto done;
1853 0 0 if (n < 0) goto done;
1855 0 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
1858 0 0 if (version >= 10) {
1859 0 0 if (end - p < 16) goto done;
1864 0 0 if (end - p < 1) goto done;
1869 0 0 if (n < 0) goto done;
1875 0 0 for (j = 0; j < part_count; j++) {
1877 0 0 if (end - p < 2) goto done;
1881 0 0 if (end - p < 4) goto done;
1885 0 0 if (end - p < 4) goto done;
1890 0 0 if (version >= 7) {
1891 0 0 if (end - p < 4) goto done;
1897 0 0 if (n < 0) goto done;
1900 0 0 if (rcount < 0 || rcount > 65536) goto done;
0 0 if (rcount < 0 || rcount > 65536) goto done;
1901 0 0 if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
1906 0 0 if (n < 0) goto done;
1909 0 0 if (rcount < 0 || rcount > 65536) goto done;
0 0 if (rcount < 0 || rcount > 65536) goto done;
1910 0 0 if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
1914 0 0 if (version >= 5) {
1916 0 0 if (n < 0) goto done;
1919 0 0 if (end - p < (ptrdiff_t)(rcount * 4)) goto done;
1925 0 0 if (n < 0) goto done;
1933 0 0 if (version >= 8) {
1934 0 0 if (end - p < 4) goto done;
1940 0 0 if (n < 0) goto done;
1948 0 2 if (version >= 3) {
1949 0 0 if (end - p < 4) goto done;
1954 0 2 if (end - p < 4) goto done;
1956 2 0 if (broker_count < 0 || broker_count > 65536) goto done;
0 2 if (broker_count < 0 || broker_count > 65536) goto done;
1958 2 1 for (i = 0; i < broker_count; i++) {
1960 1 1 if (end - p < 4) goto done;
1966 0 1 if (n < 0) goto done;
1968 1 0 hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
1 0 hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
1970 0 1 if (end - p < 4) goto done;
1975 0 1 if (version >= 1) {
1978 0 0 if (n < 0) goto done;
1986 0 1 if (version >= 2) {
1989 0 0 if (n < 0) goto done;
1994 0 1 if (version >= 1) {
1995 0 0 if (end - p < 4) goto done;
2001 0 1 if (end - p < 4) goto done;
2003 1 0 if (topic_count < 0 || topic_count > 1000000) goto done;
0 1 if (topic_count < 0 || topic_count > 1000000) goto done;
2004 1 1 for (i = 0; i < topic_count; i++) {
2006 0 1 if (end - p < 2) goto done;
2012 0 1 if (n < 0) goto done;
2014 1 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
1 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2017 0 1 if (version >= 1) {
2018 0 0 if (end - p < 1) goto done;
2023 0 1 if (end - p < 4) goto done;
2025 1 0 if (part_count < 0 || part_count > 1000000) goto done;
0 1 if (part_count < 0 || part_count > 1000000) goto done;
2028 1 1 for (j = 0; j < part_count; j++) {
2030 0 1 if (end - p < 2) goto done;
2034 0 1 if (end - p < 4) goto done;
2038 0 1 if (end - p < 4) goto done;
2043 0 1 if (version >= 7) {
2044 0 0 if (end - p < 4) goto done;
2049 0 1 if (end - p < 4) goto done;
2051 1 0 if (rcount < 0 || rcount > 65536) goto done;
0 1 if (rcount < 0 || rcount > 65536) goto done;
2052 0 1 if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
2056 0 1 if (end - p < 4) goto done;
2058 1 0 if (rcount < 0 || rcount > 65536) goto done;
0 1 if (rcount < 0 || rcount > 65536) goto done;
2059 0 1 if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
2063 0 1 if (version >= 5) {
2064 0 0 if (end - p < 4) goto done;
2066 0 0 if (end - p < (ptrdiff_t)(rcount * 4)) goto done;
2097 0 1 if (end - p < 4) goto done;
2101 1 1 for (i = 0; i < topic_count; i++) {
2105 0 1 if (n < 0) goto done;
2107 1 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
1 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2110 0 1 if (end - p < 4) goto done;
2115 1 1 for (j = 0; j < part_count; j++) {
2117 0 1 if (end - p < 4) goto done;
2121 0 1 if (end - p < 2) goto done;
2125 0 1 if (end - p < 8) goto done;
2130 0 1 if (version >= 2) {
2131 0 0 if (end - p < 8) goto done;
2136 0 1 if (version >= 5) {
2137 0 0 if (end - p < 8) goto done;
2148 1 0 if (version >= 1 && end - p >= 4) {
0 0 if (version >= 1 && end - p >= 4) {
2171 0 16 if (end - p < 12) return -1;
2173 16 0 if (out_base_offset) *out_base_offset = base_offset;
2176 1 15 if (end - p < batch_length) return -1;
2179 0 15 if (batch_end - p < 9) return -1;
2182 1 14 if (magic != 2) return -1; /* only support magic=2 (current format) */
2185 1 13 if (crc32c(p, (size_t)(batch_end - p)) != expected_crc) return -1;
2187 0 13 if (batch_end - p < 36) return -1;
2197 0 13 if (batch_end - p < 4) return -1;
2205 7 6 if (compression_type != COMPRESS_NONE && batch_end > p) {
7 0 if (compression_type != COMPRESS_NONE && batch_end > p) {
2208 6 1 if (decomp_cap < 4096) decomp_cap = 4096;
2211 1 6 if (compression_type == COMPRESS_GZIP) {
2213 1 1 while (!zok && decomp_cap < 64 * 1024 * 1024) {
1 0 while (!zok && decomp_cap < 64 * 1024 * 1024) {
2218 0 1 if (zinit != Z_OK) {
2230 1 0 if (zret == Z_STREAM_END) {
2234 0 0 } else if (zret == Z_BUF_ERROR || zret == Z_OK) {
0 0 } else if (zret == Z_BUF_ERROR || zret == Z_OK) {
2313 138 13 for (i = 0; i < record_count; i++) {
2316 0 138 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2318 0 138 if (rec_end - rp < rec_len) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (rec_end - rp < rec_len) { if (decompressed) Safefree(decompressed); return -1; }
2321 0 138 if (this_rec_end - rp < 1) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (this_rec_end - rp < 1) { if (decompressed) Safefree(decompressed); return -1; }
2326 0 138 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2331 0 138 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2337 0 138 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2340 137 1 if (key_len >= 0) {
2341 0 137 if (this_rec_end - rp < key_len) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (this_rec_end - rp < key_len) { if (decompressed) Safefree(decompressed); return -1; }
2349 0 138 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2352 137 1 if (val_len >= 0) {
2353 0 137 if (this_rec_end - rp < val_len) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (this_rec_end - rp < val_len) { if (decompressed) Safefree(decompressed); return -1; }
2361 0 138 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2364 1 137 if (hdr_count > 0) {
2367 2 1 for (h = 0; h < hdr_count; h++) {
2370 0 2 if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
2373 0 2 if (this_rec_end - rp < hk_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
0 0 if (this_rec_end - rp < hk_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
2378 0 2 if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
2381 2 0 if (hv_len >= 0) {
2382 0 2 if (this_rec_end - rp < hv_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
0 0 if (this_rec_end - rp < hv_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
2386 2 0 hv_store(hdr_hv, hk_data, (I32)hk_len,
2394 137 1 hv_store(rec_hv, "key", 3,
2396 137 1 hv_store(rec_hv, "value", 5,
2398 1 137 if (hdr_hv)
2406 1 12 if (decompressed) Safefree(decompressed);
2423 0 0 if (version >= 1) {
2424 0 0 if (end - p < 4) goto done;
2430 0 0 if (version >= 7) {
2431 0 0 if (end - p < 2) goto done;
2436 0 0 if (version >= 7) {
2437 0 0 if (end - p < 4) goto done;
2442 0 0 if (end - p < 4) goto done;
2446 0 0 for (i = 0; i < topic_count; i++) {
2451 0 0 if (n < 0) goto done;
2453 0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2456 0 0 if (end - p < 4) goto done;
2461 0 0 for (j = 0; j < part_count; j++) {
2464 0 0 if (end - p < 4) goto done;
2468 0 0 if (end - p < 2) goto done;
2472 0 0 if (end - p < 8) goto done;
2477 0 0 if (version >= 4) {
2478 0 0 if (end - p < 8) goto done;
2484 0 0 if (version >= 5) {
2485 0 0 if (end - p < 8) goto done;
2490 0 0 if (version >= 4) {
2491 0 0 if (end - p < 4) goto done;
2494 0 0 for (at = 0; at < at_count; at++) {
2495 0 0 if (end - p < 16) goto done;
2501 0 0 if (end - p < 4) goto done;
2505 0 0 if (records_size > 0 && end - p >= records_size) {
0 0 if (records_size > 0 && end - p >= records_size) {
2510 0 0 while (rp < rend && rend - rp >= 12) {
0 0 while (rp < rend && rend - rp >= 12) {
2513 0 0 if (bl < 0 || rend - rp < 12 + bl) break;
0 0 if (bl < 0 || rend - rp < 12 + bl) break;
2519 0 0 } else if (records_size > 0) {
2549 0 0 if (version >= 2) {
2550 0 0 if (end - p < 4) goto done;
2555 0 0 if (end - p < 4) goto done;
2559 0 0 for (i = 0; i < topic_count; i++) {
2563 0 0 if (n < 0) goto done;
2565 0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2567 0 0 if (end - p < 4) goto done;
2572 0 0 for (j = 0; j < part_count; j++) {
2574 0 0 if (end - p < 4) goto done;
2578 0 0 if (end - p < 2) goto done;
2582 0 0 if (version >= 1) {
2583 0 0 if (end - p < 8) goto done;
2588 0 0 if (end - p < 8) goto done;
2593 0 0 if (version >= 4) {
2594 0 0 if (end - p < 4) goto done;
2620 0 1 if (version >= 1) {
2621 0 0 if (end - p < 4) goto done;
2625 0 1 if (end - p < 2) goto done;
2630 0 1 if (version >= 1) {
2633 0 0 if (n < 0) goto done;
2635 0 0 if (emsg && elen > 0)
0 0 if (emsg && elen > 0)
2639 0 1 if (end - p < 4) goto done;
2645 0 1 if (n < 0) goto done;
2647 1 0 hv_store(result, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
1 0 hv_store(result, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
2649 0 1 if (end - p < 4) goto done;
2668 0 1 if (version >= 2) {
2669 0 0 if (end - p < 4) goto done;
2673 0 1 if (end - p < 2) goto done;
2677 0 1 if (end - p < 4) goto done;
2685 0 1 if (n < 0) goto done;
2687 1 0 if (proto)
2692 0 1 if (n < 0) goto done;
2694 1 0 if (leader)
2701 0 1 if (n < 0) goto done;
2703 1 0 if (member_id)
2707 0 1 if (end - p < 4) goto done;
2711 1 1 for (i = 0; i < mcount; i++) {
2716 0 1 if (n < 0) goto done;
2718 1 0 if (mid)
2722 0 1 if (version >= 5) {
2725 0 0 if (n < 0) goto done;
2730 0 1 if (end - p < 4) goto done;
2732 0 1 if (mdlen > 0) {
2733 0 0 if (end - p < mdlen) goto done;
2756 0 1 if (version >= 1) {
2757 0 0 if (end - p < 4) goto done;
2761 0 1 if (end - p < 2) goto done;
2766 0 1 if (end - p < 4) goto done;
2768 0 1 if (alen > 0 && end - p >= alen) {
0 1 if (alen > 0 && end - p >= alen) {
2786 0 1 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
2787 1 0 if (end - p >= 2) {
2806 0 0 if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
2808 0 0 if (end - p < 4) goto done;
2811 0 0 for (i = 0; i < tc; i++) {
2815 0 0 if (n < 0) goto done;
2817 0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2819 0 0 if (end - p < 4) goto done;
2823 0 0 for (j = 0; j < pc; j++) {
2825 0 0 if (end - p < 6) goto done;
2852 0 1 if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
2854 0 1 if (end - p < 4) goto done;
2857 1 1 for (i = 0; i < tc; i++) {
2861 0 1 if (n < 0) goto done;
2863 1 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
1 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2865 0 1 if (end - p < 4) goto done;
2869 1 1 for (j = 0; j < pc; j++) {
2871 0 1 if (end - p < 4) goto done;
2875 0 1 if (end - p < 8) goto done;
2880 0 1 if (version >= 5 && end - p >= 4) p += 4;
0 0 if (version >= 5 && end - p >= 4) p += 4;
2884 0 1 if (n < 0) goto done;
2887 0 1 if (end - p < 2) goto done;
2898 1 0 if (version >= 2 && end - p >= 2) {
0 0 if (version >= 2 && end - p >= 2) {
2917 0 1 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
2918 1 0 if (end - p >= 2) {
2937 0 1 if (version >= 2 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 2 && end - p >= 4) p += 4; /* throttle_time_ms */
2939 0 1 if (end - p < 4) goto done;
2942 1 1 for (i = 0; i < tc; i++) {
2946 0 1 if (n < 0) goto done;
2948 1 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
1 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2950 0 1 if (end - p < 2) goto done;
2955 0 1 if (version >= 1) {
2958 0 0 if (n < 0) goto done;
2960 0 0 if (emsg && elen > 0)
0 0 if (emsg && elen > 0)
2983 0 1 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
2985 0 1 if (end - p < 4) goto done;
2988 1 1 for (i = 0; i < tc; i++) {
2992 0 1 if (n < 0) goto done;
2994 1 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
1 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2996 0 1 if (end - p < 2) goto done;
3017 1 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
1 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
3019 0 1 if (end - p < 2) goto done;
3023 0 1 if (end - p < 8) goto done;
3027 0 1 if (end - p < 2) goto done;
3046 0 1 if (end - p < 4) goto done;
3049 0 1 if (end - p < 4) goto done;
3052 1 1 for (i = 0; i < tc; i++) {
3056 0 1 if (n < 0) goto done;
3058 1 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
1 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
3060 0 1 if (end - p < 4) goto done;
3064 1 1 for (j = 0; j < pc; j++) {
3066 0 1 if (end - p < 6) goto done;
3091 1 0 if (end - p >= 4) p += 4; /* throttle_time_ms */
3092 1 0 if (end - p >= 2) {
3111 0 1 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
3113 0 1 if (end - p < 4) goto done;
3116 0 1 for (i = 0; i < tc; i++) {
3120 0 0 if (n < 0) goto done;
3122 0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
3124 0 0 if (end - p < 4) goto done;
3128 0 0 for (j = 0; j < pc; j++) {
3130 0 0 if (end - p < 6) goto done;
3151 1 1 while (self->rbuf_len >= 4) {
3153 1 0 if (msg_size < 0 || msg_size > 256 * 1024 * 1024) {
0 1 if (msg_size < 0 || msg_size > 256 * 1024 * 1024) {
3155 0 0 if (conn_check_destroyed(self)) return;
3160 0 1 if (self->rbuf_len < (size_t)(4 + msg_size))
3166 0 1 if (msg_size < 4) {
3168 0 0 if (conn_check_destroyed(self)) return;
3179 1 0 if (!ngx_queue_empty(&self->cb_queue)) {
3182 0 1 if (cbt->correlation_id != corr_id) {
3189 0 0 if (conn_check_destroyed(self)) return;
3199 1 0 if (cbt) {
3201 0 1 if (cbt->cb) SvREFCNT_dec(cbt->cb);
3203 0 1 if (conn_check_destroyed(self)) return;
3208 1 0 if (self->rbuf_len > 0)
3222 4 0 if (!self || self->magic != KF_MAGIC_ALIVE) return;
0 4 if (!self || self->magic != KF_MAGIC_ALIVE) return;
3225 2 2 if (self->state == CONN_CONNECTING) {
3229 2 0 if (self->timing) {
3235 0 2 if (getsockopt(self->fd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0)
3237 1 1 if (err != 0) {
3241 0 1 if (conn_check_destroyed(self)) return;
3252 0 2 if (self->state == CONN_TLS_HANDSHAKE) {
3255 0 0 if (ret == 1) {
3265 0 0 if (err == SSL_ERROR_WANT_READ) {
3268 0 0 } else if (err == SSL_ERROR_WANT_WRITE) {
3274 0 0 if (e) {
3282 0 0 if (conn_check_destroyed(self)) return;
3290 1 1 if (revents & EV_WRITE) {
3291 1 1 while (self->wbuf_off < self->wbuf_len) {
3294 0 1 if (n < 0) {
3295 0 0 if (errno == EAGAIN || errno == EWOULDBLOCK) break;
0 0 if (errno == EAGAIN || errno == EWOULDBLOCK) break;
3297 0 0 if (conn_check_destroyed(self)) return;
3301 0 1 if (n == 0) {
3308 1 0 if (self->wbuf_off >= self->wbuf_len) {
3316 1 1 if (revents & EV_READ) {
3320 0 1 if (n < 0) {
3321 0 0 if (errno == EAGAIN || errno == EWOULDBLOCK) return;
0 0 if (errno == EAGAIN || errno == EWOULDBLOCK) return;
3323 0 0 if (conn_check_destroyed(self)) return;
3327 0 1 if (n == 0) {
3348 0 0 if (self->magic != KF_MAGIC_ALIVE) return;
3351 0 0 if (conn_check_destroyed(self)) return;
3372 0 1 if (self->tls_enabled) {
3374 0 0 if (!self->ssl_ctx) {
3376 0 0 if (conn_check_destroyed(self)) return;
3381 0 0 if (self->tls_skip_verify)
3386 0 0 if (self->tls_ca_file) {
3387 0 0 if (SSL_CTX_load_verify_locations(self->ssl_ctx, self->tls_ca_file, NULL) != 1) {
3389 0 0 if (conn_check_destroyed(self)) return;
3396 0 0 if (!self->ssl) {
3398 0 0 if (conn_check_destroyed(self)) return;
3404 0 0 if (!is_ip_literal(self->host))
3407 0 0 if (!self->tls_skip_verify) {
3410 0 0 if (is_ip_literal(self->host))
3419 0 0 if (ret == 1) {
3426 0 0 if (err == SSL_ERROR_WANT_READ) {
3428 0 0 } else if (err == SSL_ERROR_WANT_WRITE) {
3433 0 0 if (e) {
3441 0 0 if (conn_check_destroyed(self)) return;
3460 0 2 if (self->state != CONN_DISCONNECTED) {
3465 2 0 if (host != self->host) {
3466 0 2 if (self->host) Safefree(self->host);
3484 0 2 if (gai_err != 0) {
3488 0 2 if (gai_err != 0) {
3495 2 0 for (rp = res; rp; rp = rp->ai_next) {
3497 0 2 if (fd < 0) continue;
3509 0 2 if (ret == 0) {
3524 2 0 if (errno == EINPROGRESS) {
3536 2 0 if (timeout > 0) {
3567 0 7 if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop"))
0 0 if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop"))
3596 448 7 for (i = 0; i < API_VERSIONS_MAX_KEY; i++)
3611 0 7 if (self->magic != KF_MAGIC_ALIVE) return;
3619 0 7 if (self->reconnect_timing) {
3624 2 5 CLEAR_HANDLER(self->on_error);
3625 1 6 CLEAR_HANDLER(self->on_connect);
3626 0 7 CLEAR_HANDLER(self->on_disconnect);
3628 2 5 if (self->host) Safefree(self->host);
3629 7 0 if (self->client_id) Safefree(self->client_id);
3630 0 7 if (self->sasl_mechanism) Safefree(self->sasl_mechanism);
3631 0 7 if (self->sasl_username) Safefree(self->sasl_username);
3632 0 7 if (self->sasl_password) Safefree(self->sasl_password);
3633 0 7 if (self->scram_nonce) Safefree(self->scram_nonce);
3634 0 7 if (self->scram_client_first) Safefree(self->scram_client_first);
3637 0 7 if (self->scram_auth_message) {
3642 0 7 if (self->tls_ca_file) Safefree(self->tls_ca_file);
3643 7 0 if (self->rbuf) Safefree(self->rbuf);
3644 7 0 if (self->wbuf) Safefree(self->wbuf);
3661 0 1 if (self->reconnect_timing) {
3671 0 3 RETVAL = (self->state == CONN_READY) ? 1 : 0;
3678 0 0 RETVAL = self->state;
3685 0 0 RETVAL = self->pending_count;
3693 0 2 CLEAR_HANDLER(self->on_error);
3694 2 0 if (cb && SvOK(cb)) {
2 0 if (cb && SvOK(cb)) {
3703 0 1 CLEAR_HANDLER(self->on_connect);
3704 1 0 if (cb && SvOK(cb)) {
1 0 if (cb && SvOK(cb)) {
3713 0 0 CLEAR_HANDLER(self->on_disconnect);
3714 0 0 if (cb && SvOK(cb)) {
0 0 if (cb && SvOK(cb)) {
3723 0 0 if (id) {
3724 0 0 if (self->client_id) Safefree(self->client_id);
3735 0 0 if (self->tls_ca_file) { Safefree(self->tls_ca_file); self->tls_ca_file = NULL; }
3736 0 0 if (ca_file) self->tls_ca_file = savepv(ca_file);
3744 0 0 if (self->sasl_mechanism) { Safefree(self->sasl_mechanism); self->sasl_mechanism = NULL; }
3745 0 0 if (self->sasl_username) { Safefree(self->sasl_username); self->sasl_username = NULL; }
3746 0 0 if (self->sasl_password) { Safefree(self->sasl_password); self->sasl_password = NULL; }
3747 0 0 if (SvOK(ST(1))) {
3749 0 0 if (username) self->sasl_username = savepv(username);
3750 0 0 if (password) self->sasl_password = savepv(password);
3766 1 0 if (self->state != CONN_READY)
3774 0 0 if (ver < 0) ver = 1;
3775 0 0 if (ver > 4) ver = 4;
3777 0 0 if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) {
0 0 if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) {
0 0 if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) {
3781 0 0 for (i = 0; i < count; i++) {
3792 0 0 if (ver >= 4)
3803 0 0 if (!self->api_versions_known)
3808 0 0 for (i = 0; i < API_VERSIONS_MAX_KEY; i++) {
3809 0 0 if (self->api_versions[i] >= 0) {
3815 0 0 EXTEND(SP, 1);
3824 1 0 if (self->state != CONN_READY)
3831 0 0 if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
0 0 if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
0 0 if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
3841 0 0 if (opts) {
3843 0 0 if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v))
0 0 if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v))
3845 0 0 if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v))
0 0 if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v))
3847 0 0 if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v))
0 0 if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v))
3855 0 0 if (ver < 0) ver = 4;
3856 0 0 if (ver > 7) ver = 7;
3866 0 0 if (ver >= 3)
3870 0 0 if (ver >= 4)
3874 0 0 if (ver >= 7) {
3891 0 0 if (ver >= 5)
3898 0 0 if (ver >= 7)
3909 0 0 if (self->state != CONN_READY)
3914 0 0 if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVHV)
0 0 if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVHV)
3920 0 0 if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
0 0 if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
0 0 if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
3930 0 0 if (opts) {
3932 0 0 if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v))
0 0 if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v))
3934 0 0 if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v))
0 0 if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v))
3936 0 0 if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v))
0 0 if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v))
3941 0 0 if (ver < 0) ver = 4;
3942 0 0 if (ver > 7) ver = 7;
3950 0 0 if (ver >= 3)
3952 0 0 if (ver >= 4)
3954 0 0 if (ver >= 7) {
3960 0 0 kf_buf_append_i32(&body, (int32_t)HvUSEDKEYS(topics_hv));
3964 0 0 while ((entry = hv_iternext(topics_hv))) {
3970 0 0 if (!SvROK(parts_sv) || SvTYPE(SvRV(parts_sv)) != SVt_PVAV)
0 0 if (!SvROK(parts_sv) || SvTYPE(SvRV(parts_sv)) != SVt_PVAV)
3976 0 0 for (i = 0; i < pc; i++) {
3978 0 0 if (!elem || !SvROK(*elem))
0 0 if (!elem || !SvROK(*elem))
3983 0 0 int32_t pid = pid_sv ? (int32_t)SvIV(*pid_sv) : 0;
3987 0 0 int64_t offset = off_sv ? (int64_t)SvIV(*off_sv) : 0;
3990 0 0 if (ver >= 5)
3997 0 0 if (ver >= 7)
4008 0 0 if (self->state != CONN_READY)
4011 0 0 if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV)
0 0 if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV)
4023 0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
4026 0 0 if ((tmp = hv_fetch(opts, "acks", 4, 0)))
4028 0 0 if ((tmp = hv_fetch(opts, "transactional_id", 16, 0)) && SvOK(*tmp))
0 0 if ((tmp = hv_fetch(opts, "transactional_id", 16, 0)) && SvOK(*tmp))
4030 0 0 if ((tmp = hv_fetch(opts, "compression", 11, 0))) {
4033 0 0 if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
0 0 if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
4038 0 0 else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
0 0 else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
4048 0 0 if ((tmp = hv_fetch(opts, "producer_id", 11, 0)))
4050 0 0 if ((tmp = hv_fetch(opts, "producer_epoch", 14, 0)))
4052 0 0 if ((tmp = hv_fetch(opts, "base_sequence", 13, 0)))
4054 0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
4069 0 0 if (ver < 0) ver = 3;
4070 0 0 if (ver > 7) ver = 7;
4077 0 0 if (ver >= 3)
4078 0 0 kf_buf_append_nullable_string(&body, txn_id, txn_id ? (int16_t)txn_id_len : 0);
4090 0 0 conn_send_request(aTHX_ self, API_PRODUCE, ver, &body,
4091 0 0 (cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0);
4101 0 0 if (self->state != CONN_READY)
4109 0 0 if (ver < 0) ver = 1;
4110 0 0 if (ver > 5) ver = 5;
4118 0 0 if (ver >= 2)
4130 0 0 if (ver >= 4)
4143 1 0 if (self->state != CONN_READY)
4153 0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
4156 0 0 if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV)
0 0 if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV)
0 0 if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV)
4158 0 0 if ((tmp = hv_fetch(opts, "acks", 4, 0)))
4160 0 0 if ((tmp = hv_fetch(opts, "compression", 11, 0))) {
4163 0 0 if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
0 0 if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
4168 0 0 else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
0 0 else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
4178 0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
4186 0 0 if (SvOK(key_sv))
4191 0 0 if (SvOK(value_sv))
4198 0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV)
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV)
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV)
4200 0 0 if (ts_tmp && SvOK(*ts_tmp)) {
0 0 if (ts_tmp && SvOK(*ts_tmp)) {
4214 0 0 if (key) hv_stores(rec, "key", newSVpvn(key, key_len));
4215 0 0 if (value) hv_stores(rec, "value", newSVpvn(value, value_len));
4216 0 0 if (headers) hv_stores(rec, "headers", newRV_inc((SV*)headers));
4226 0 0 if (ver < 0) ver = 3;
4227 0 0 if (ver > 7) ver = 7;
4235 0 0 if (ver >= 3)
4253 0 0 conn_send_request(aTHX_ self, API_PRODUCE, ver, &body,
4254 0 0 (cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0);
4264 0 0 if (self->state != CONN_READY)
4268 0 0 if (ver < 0) ver = 0;
4269 0 0 if (ver > 2) ver = 2;
4278 0 0 if (ver >= 1)
4289 0 0 if (self->state != CONN_READY)
4293 0 0 if (ver < 0) ver = 1;
4294 0 0 if (ver > 5) ver = 5;
4304 0 0 if (ver >= 1)
4311 0 0 if (ver >= 5) {
4312 0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
4339 0 0 for (i = 0; i < tc; i++) {
4358 0 0 if (self->state != CONN_READY)
4362 0 0 if (ver < 0) ver = 0;
4363 0 0 if (ver > 3) ver = 3;
4376 0 0 if (ver >= 3) {
4377 0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
4387 0 0 if (SvOK(assignments_sv) && SvROK(assignments_sv)
0 0 if (SvOK(assignments_sv) && SvROK(assignments_sv)
4388 0 0 && SvTYPE(SvRV(assignments_sv)) == SVt_PVAV) {
4393 0 0 for (i = 0; i < ac; i++) {
4395 0 0 if (!elem || !SvROK(*elem)) continue;
0 0 if (!elem || !SvROK(*elem)) continue;
4399 0 0 if (!mid_sv) continue;
4405 0 0 if (!data_sv) { kf_buf_append_bytes(&body, NULL, 0); continue; }
4422 0 0 if (self->state != CONN_READY)
4426 0 0 if (ver < 0) ver = 0;
4427 0 0 if (ver > 4) ver = 4;
4440 0 0 if (ver >= 3) {
4441 0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
4458 0 0 if (self->state != CONN_READY)
4462 0 0 if (ver < 0) ver = 2;
4463 0 0 if (ver > 7) ver = 7;
4472 0 0 if (ver >= 1)
4476 0 0 if (ver >= 1) {
4482 0 0 if (ver >= 7)
4490 0 0 for (i = 0; i < tc; i++) {
4492 0 0 if (!elem || !SvROK(*elem)) continue;
0 0 if (!elem || !SvROK(*elem)) continue;
4495 0 0 if (!tname_sv) continue;
4501 0 0 if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
0 0 if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
4506 0 0 for (j = 0; j < pc; j++) {
4508 0 0 if (!pelem || !SvROK(*pelem)) continue;
0 0 if (!pelem || !SvROK(*pelem)) continue;
4512 0 0 kf_buf_append_i32(&body, pid_sv ? (int32_t)SvIV(*pid_sv) : 0);
4515 0 0 kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0);
4518 0 0 if (ver >= 6)
4534 0 0 if (self->state != CONN_READY)
4538 0 0 if (ver < 0) ver = 1;
4539 0 0 if (ver > 5) ver = 5;
4552 0 0 for (i = 0; i < tc; i++) {
4554 0 0 if (!elem || !SvROK(*elem)) continue;
0 0 if (!elem || !SvROK(*elem)) continue;
4557 0 0 if (!tname_sv) continue;
4563 0 0 if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
0 0 if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
4568 0 0 for (j = 0; j < pc; j++) {
4570 0 0 kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0);
4582 0 0 if (self->state != CONN_READY)
4586 0 0 if (ver < 0) ver = 0;
4587 0 0 if (ver > 3) ver = 3;
4606 0 0 if (self->state != CONN_READY)
4610 0 0 if (ver < 0) ver = 0;
4611 0 0 if (ver > 4) ver = 4;
4620 0 0 for (i = 0; i < tc; i++) {
4622 0 0 if (!elem || !SvROK(*elem)) continue;
0 0 if (!elem || !SvROK(*elem)) continue;
4626 0 0 if (!name_sv) continue;
4632 0 0 int32_t num_partitions = np_sv ? (int32_t)SvIV(*np_sv) : 1;
4636 0 0 int16_t replication_factor = rf_sv ? (int16_t)SvIV(*rf_sv) : 1;
4649 0 0 if (ver >= 1)
4660 0 0 if (self->state != CONN_READY)
4664 0 0 if (ver < 0) ver = 0;
4665 0 0 if (ver > 3) ver = 3;
4674 0 0 for (i = 0; i < tc; i++) {
4676 0 0 if (!elem) continue;
4692 0 0 if (self->state != CONN_READY)
4696 0 0 if (ver < 0) ver = 0;
4697 0 0 if (ver > 1) ver = 1;
4702 0 0 if (SvOK(transactional_id_sv)) {
4713 0 0 if (ver >= 2) {
4726 0 0 if (self->state != CONN_READY)
4730 0 0 if (ver < 0) ver = 0;
4731 0 0 if (ver > 1) ver = 1;
4744 0 0 if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVAV)
0 0 if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVAV)
4750 0 0 for (i = 0; i < tc; i++) {
4752 0 0 if (!elem || !SvROK(*elem)) croak("add_partitions_to_txn: bad element");
0 0 if (!elem || !SvROK(*elem)) croak("add_partitions_to_txn: bad element");
4755 0 0 if (!tname_sv) croak("add_partitions_to_txn: missing topic");
4761 0 0 if (!parts_sv || !SvROK(*parts_sv)) croak("add_partitions_to_txn: missing partitions");
0 0 if (!parts_sv || !SvROK(*parts_sv)) croak("add_partitions_to_txn: missing partitions");
4765 0 0 for (j = 0; j < pc; j++) {
4767 0 0 kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0);
4779 0 0 if (self->state != CONN_READY)
4783 0 0 if (ver < 0) ver = 0;
4784 0 0 if (ver > 1) ver = 1;
4805 0 0 if (self->state != CONN_READY)
4809 0 0 if (ver < 0) ver = 0;
4810 0 0 if (ver > 3) ver = 3;
4827 0 0 if (ver >= 3) {
4835 0 0 if (!SvROK(offsets_sv) || SvTYPE(SvRV(offsets_sv)) != SVt_PVAV)
0 0 if (!SvROK(offsets_sv) || SvTYPE(SvRV(offsets_sv)) != SVt_PVAV)
4841 0 0 for (i = 0; i < tc; i++) {
4843 0 0 if (!elem || !SvROK(*elem)) croak("txn_offset_commit: bad element");
0 0 if (!elem || !SvROK(*elem)) croak("txn_offset_commit: bad element");
4847 0 0 if (!tname_sv) croak("txn_offset_commit: missing topic");
4853 0 0 if (!parts_sv || !SvROK(*parts_sv)) croak("txn_offset_commit: missing partitions");
0 0 if (!parts_sv || !SvROK(*parts_sv)) croak("txn_offset_commit: missing partitions");
4858 0 0 for (j = 0; j < pc; j++) {
4860 0 0 if (!pelem || !SvROK(*pelem)) croak("txn_offset_commit: bad partition");
0 0 if (!pelem || !SvROK(*pelem)) croak("txn_offset_commit: bad partition");
4864 0 0 kf_buf_append_i32(&body, ppid_sv ? (int32_t)SvIV(*ppid_sv) : 0);
4867 0 0 kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0);
4870 0 0 if (ver >= 2)
4894 259 16 while (remaining >= 4) {
4916 9 7 RETVAL = (int)(h & 0x7FFFFFFF);
4967 0 12 EXTEND(SP, 1);
5008 0 14 if (n < 0) RETVAL = &PL_sv_undef;
5022 16 0 if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV)
0 16 if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV)
5032 11 5 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
11 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
11 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
5035 10 1 if ((v = hv_fetchs(opts, "compression", 0)) && SvOK(*v))
10 0 if ((v = hv_fetchs(opts, "compression", 0)) && SvOK(*v))
5037 1 10 if ((v = hv_fetchs(opts, "producer_id", 0)) && SvOK(*v))
1 0 if ((v = hv_fetchs(opts, "producer_id", 0)) && SvOK(*v))
5039 1 10 if ((v = hv_fetchs(opts, "producer_epoch", 0)) && SvOK(*v))
1 0 if ((v = hv_fetchs(opts, "producer_epoch", 0)) && SvOK(*v))
5041 1 10 if ((v = hv_fetchs(opts, "base_sequence", 0)) && SvOK(*v))
1 0 if ((v = hv_fetchs(opts, "base_sequence", 0)) && SvOK(*v))
5043 0 11 if ((v = hv_fetchs(opts, "is_transactional", 0)) && SvOK(*v))
0 0 if ((v = hv_fetchs(opts, "is_transactional", 0)) && SvOK(*v))
5045 0 11 if ((v = hv_fetchs(opts, "timestamp", 0)) && SvOK(*v))
0 0 if ((v = hv_fetchs(opts, "timestamp", 0)) && SvOK(*v))
5072 2 13 if (strcmp(api, "metadata") == 0) {
5074 1 12 } else if (strcmp(api, "produce") == 0) {
5076 0 12 } else if (strcmp(api, "fetch") == 0) {
5078 0 12 } else if (strcmp(api, "list_offsets") == 0) {
5080 1 11 } else if (strcmp(api, "find_coordinator") == 0) {
5082 1 10 } else if (strcmp(api, "join_group") == 0) {
5084 1 9 } else if (strcmp(api, "sync_group") == 0) {
5086 1 8 } else if (strcmp(api, "heartbeat") == 0) {
5088 0 8 } else if (strcmp(api, "offset_commit") == 0) {
5090 1 7 } else if (strcmp(api, "offset_fetch") == 0) {
5092 1 6 } else if (strcmp(api, "leave_group") == 0) {
5094 1 5 } else if (strcmp(api, "create_topics") == 0) {
5096 1 4 } else if (strcmp(api, "delete_topics") == 0) {
5098 1 3 } else if (strcmp(api, "init_producer_id") == 0) {
5100 1 2 } else if (strcmp(api, "add_partitions_to_txn") == 0) {
5102 1 1 } else if (strcmp(api, "end_txn") == 0) {
5104 1 0 } else if (strcmp(api, "txn_offset_commit") == 0) {
5109 15 0 RETVAL = result ? SvREFCNT_inc(result) : &PL_sv_undef;
5125 3 13 if (n < 0) {
5137 0 21 I_EV_API("EV::Kafka");
21 0 I_EV_API("EV::Kafka");
0 21 I_EV_API("EV::Kafka");