Branch Coverage

Kafka.xs
Criterion Covered Total %
branch 461 1932 23.8


line true false branch
135 1656 19 if (b->cap >= need) return;
137 6 13 if (newcap < need) newcap = need;
143 205 0 if (b->data) { Safefree(b->data); b->data = NULL; }
185 0 0 if (len > 0) kf_buf_append(b, s, len);
189 0 1 if (!s) {
193 1 0 if (len > 0) kf_buf_append(b, s, len);
200 0 0 if (len > 0) kf_buf_append(b, s, len);
204 0 0 if (!s) {
208 0 0 if (len > 0) kf_buf_append(b, s, len);
215 21 864 while (val >= 0x80) {
230 0 0 if (!s) {
234 0 0 if (len > 0) kf_buf_append(b, s, len);
271 867 0 while (p < end) {
274 846 21 if (!(b & 0x80)) {
279 0 21 if (shift >= 64) return -1;
288 0 846 if (n < 0) return n;
295 0 13 if (end - buf < 2) return -1;
297 1 12 if (len < 0) { /* nullable null */
302 0 12 if (end - buf < 2 + len) return -1;
312 0 0 if (n < 0) return -1;
313 0 0 if (raw == 0) {
319 0 0 if (end - buf - n < len) return -1;
329 0 0 if (n < 0) return -1;
332 0 0 for (i = 0; i < count; i++) {
335 0 0 if (tn < 0) return -1;
339 0 0 if (dn < 0) return -1;
341 0 0 if ((uint64_t)(end - p) < dlen) return -1;
356 5376 21 for (i = 0; i < 256; i++) {
358 43008 5376 for (j = 0; j < 8; j++) {
359 21504 21504 if (crc & 1)
372 0 37 if (!crc32c_table_inited) crc32c_init_table();
373 205072 37 for (i = 0; i < len; i++)
547 0 1 if (self->ssl) {
548 0 0 int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;
551 0 0 if (ret <= 0) {
553 0 0 if (err == SSL_ERROR_WANT_READ) {
557 0 0 if (err == SSL_ERROR_WANT_WRITE) {
562 0 0 if (err == SSL_ERROR_ZERO_RETURN) return 0;
575 0 1 if (self->ssl) {
576 0 0 int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;
579 0 0 if (ret <= 0) {
581 0 0 if (err == SSL_ERROR_WANT_WRITE) {
585 0 0 if (err == SSL_ERROR_WANT_READ) {
605 1 0 if (self->rbuf_cap >= need) return;
607 0 0 if (newcap < need) newcap = need;
613 1 0 if (self->wbuf_cap >= need) return;
615 0 0 if (newcap < need) newcap = need;
625 1 0 if (!self->reading && self->fd >= 0) {
1 0 if (!self->reading && self->fd >= 0) {
632 1 8 if (self->reading) {
639 3 0 if (!self->writing && self->fd >= 0) {
3 0 if (!self->writing && self->fd >= 0) {
646 3 9 if (self->writing) {
657 0 1 if (!self->on_error)
665 0 1 PUSHMARK(SP);
666 0 1 XPUSHs(sv_2mortal(newSVpv(msg, 0)));
669 1 0 if (SvTRUE(ERRSV)) {
0 1 if (SvTRUE(ERRSV)) {
670 0 0 warn("EV::Kafka::Conn: on_error callback error: %s", SvPV_nolen(ERRSV));
671 0 0 sv_setsv(ERRSV, &PL_sv_undef);
673 1 0 FREETMPS;
680 0 1 if (!self->on_connect) return;
687 0 1 PUSHMARK(SP);
690 1 0 if (SvTRUE(ERRSV)) {
0 1 if (SvTRUE(ERRSV)) {
691 0 0 warn("EV::Kafka::Conn: on_connect callback error: %s", SvPV_nolen(ERRSV));
692 0 0 sv_setsv(ERRSV, &PL_sv_undef);
694 0 1 FREETMPS;
701 2 0 if (!self->on_disconnect) return;
708 0 0 PUSHMARK(SP);
711 0 0 if (SvTRUE(ERRSV)) {
0 0 if (SvTRUE(ERRSV)) {
712 0 0 warn("EV::Kafka::Conn: on_disconnect callback error: %s", SvPV_nolen(ERRSV));
713 0 0 sv_setsv(ERRSV, &PL_sv_undef);
715 0 0 FREETMPS;
723 0 0 if (!cb) return;
730 0 0 PUSHMARK(SP);
731 0 0 EXTEND(SP, 2);
732 0 0 PUSHs(result ? result : &PL_sv_undef);
733 0 0 PUSHs(error ? error : &PL_sv_undef);
736 0 0 if (SvTRUE(ERRSV)) {
0 0 if (SvTRUE(ERRSV)) {
737 0 0 warn("EV::Kafka::Conn: callback error: %s", SvPV_nolen(ERRSV));
738 0 0 sv_setsv(ERRSV, &PL_sv_undef);
740 0 0 FREETMPS;
757 0 9 if (self->timing) {
762 0 9 if (self->ssl) {
766 0 9 if (self->ssl_ctx) {
771 2 7 if (self->fd >= 0) {
780 9 0 SV *err_sv = in_destruct ? NULL : newSVpv(err, 0);
783 0 9 while (!ngx_queue_empty(&self->cb_queue)) {
788 0 0 if (cbt->cb && !cbt->internal && !in_destruct) {
0 0 if (cbt->cb && !cbt->internal && !in_destruct) {
0 0 if (cbt->cb && !cbt->internal && !in_destruct) {
790 0 0 if (conn_check_destroyed(self)) {
797 0 0 if (cbt->cb) SvREFCNT_dec(cbt->cb);
800 9 0 if (err_sv) SvREFCNT_dec(err_sv);
807 0 2 if (conn_check_destroyed(self)) return;
810 0 2 if (conn_check_destroyed(self)) return;
812 1 1 if (!self->intentional_disconnect && self->auto_reconnect) {
0 1 if (!self->intentional_disconnect && self->auto_reconnect) {
828 0 0 if (self->magic != KF_MAGIC_ALIVE) return;
829 0 0 if (self->state != CONN_DISCONNECTED) return;
830 0 0 if (!self->host) return;
836 0 0 if (self->reconnect_timing) return;
838 0 0 if (delay < 0.01) delay = 1.0;
864 1 0 int flexible = (api_key == API_API_VERSIONS && api_version >= 3);
0 1 int flexible = (api_key == API_API_VERSIONS && api_version >= 3);
870 0 1 if (flexible) {
880 0 1 if (raw_size > (size_t)INT32_MAX) {
887 0 1 if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) {
0 0 if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) {
889 0 0 if (self->wbuf_len > 0)
909 1 0 if (!no_response) {
916 0 1 if (cb) {
951 64 1 for (i = 0; i < API_VERSIONS_MAX_KEY; i++)
954 0 1 if (end - p < 2) goto err;
956 0 1 if (error_code != 0) {
960 0 0 if (conn_check_destroyed(self)) return;
966 0 1 if (end - p < 4) goto err;
969 2 1 for (i = 0; i < count; i++) {
970 0 2 if (end - p < 6) goto err;
975 2 0 if (key >= 0 && key < API_VERSIONS_MAX_KEY)
2 0 if (key >= 0 && key < API_VERSIONS_MAX_KEY)
982 0 1 if (self->sasl_mechanism) {
993 0 0 if (conn_check_destroyed(self)) return;
1020 0 0 if (end - p < 2) goto err;
1024 0 0 if (end - p < 4) goto err;
1027 0 0 for (i = 0; i < count; i++) {
1030 0 0 if (n < 0) goto err;
1034 0 0 if (error_code != 0) {
1036 0 0 if (conn_check_destroyed(self)) return;
1047 0 0 if (conn_check_destroyed(self)) return;
1060 0 0 if (self->sasl_mechanism && strcmp(self->sasl_mechanism, "PLAIN") == 0) {
0 0 if (self->sasl_mechanism && strcmp(self->sasl_mechanism, "PLAIN") == 0) {
1062 0 0 STRLEN ulen = self->sasl_username ? strlen(self->sasl_username) : 0;
1063 0 0 STRLEN plen = self->sasl_password ? strlen(self->sasl_password) : 0;
1068 0 0 if (ulen > 0) kf_buf_append(&body, self->sasl_username, ulen);
1070 0 0 if (plen > 0) kf_buf_append(&body, self->sasl_password, plen);
1073 0 0 else if (self->sasl_mechanism &&
1074 0 0 (strcmp(self->sasl_mechanism, "SCRAM-SHA-256") == 0 ||
1075 0 0 strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0)) {
1082 0 0 for (i = 0; i < 16; i++)
1088 0 0 if (self->scram_nonce) Safefree(self->scram_nonce);
1092 0 0 if (!self->sasl_username) {
1105 0 0 if (self->scram_client_first) Safefree(self->scram_client_first);
1124 0 0 if (ver < 0) ver = 1;
1125 0 0 if (ver > 2) ver = 2;
1137 0 0 if (end - p < 2) goto err;
1144 0 0 if (n < 0) goto err;
1151 0 0 if (end - p >= 4) {
1153 0 0 if (auth_data_len > 0 && end - p >= auth_data_len) {
0 0 if (auth_data_len > 0 && end - p >= auth_data_len) {
1159 0 0 if (error_code != 0) {
1161 0 0 if (errmsg_str && errmsg_len > 0)
0 0 if (errmsg_str && errmsg_len > 0)
1166 0 0 if (conn_check_destroyed(self)) return;
1173 0 0 if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) {
0 0 if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) {
0 0 if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FIRST && auth_data) {
1184 0 0 while (sp < se) {
1185 0 0 if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') {
0 0 if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') {
0 0 if (sp + 2 <= se && sp[0] == 'r' && sp[1] == '=') {
1187 0 0 while (sp < se && *sp != ',') sp++;
0 0 while (sp < se && *sp != ',') sp++;
1189 0 0 } else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') {
0 0 } else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') {
0 0 } else if (sp + 2 <= se && sp[0] == 's' && sp[1] == '=') {
1191 0 0 while (sp < se && *sp != ',') sp++;
0 0 while (sp < se && *sp != ',') sp++;
1193 0 0 } else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') {
0 0 } else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') {
0 0 } else if (sp + 2 <= se && sp[0] == 'i' && sp[1] == '=') {
1196 0 0 while (sp < se && *sp != ',') sp++;
0 0 while (sp < se && *sp != ',') sp++;
1198 0 0 if (sp < se && *sp == ',') sp++;
0 0 if (sp < se && *sp == ',') sp++;
1203 0 0 if (!server_nonce || !salt_b64 || iterations <= 0) {
0 0 if (!server_nonce || !salt_b64 || iterations <= 0) {
0 0 if (!server_nonce || !salt_b64 || iterations <= 0) {
1205 0 0 if (conn_check_destroyed(self)) return;
1211 0 0 if (server_nonce_len < 32 ||
1212 0 0 memcmp(server_nonce, self->scram_nonce, 32) != 0) {
1214 0 0 if (conn_check_destroyed(self)) return;
1220 0 0 const EVP_MD *md = is_sha512 ? EVP_sha512() : EVP_sha256();
1221 0 0 int digest_len = is_sha512 ? 64 : 32;
1233 0 0 if (salt_len <= 0) {
1235 0 0 if (conn_check_destroyed(self)) return;
1292 0 0 for (di = 0; di < digest_len; di++)
1296 0 0 if (self->scram_auth_message) Safefree(self->scram_auth_message);
1313 0 0 int plen = bptr->length < 255 ? (int)bptr->length : 255;
1338 0 0 if (ver < 0) ver = 1;
1339 0 0 if (ver > 2) ver = 2;
1347 0 0 if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FINAL) {
0 0 if (self->sasl_mechanism && self->scram_step == SCRAM_STEP_CLIENT_FINAL) {
1350 0 0 if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') {
0 0 if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') {
0 0 if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') {
0 0 if (!auth_data || auth_data_len < 2 || auth_data[0] != 'v' || auth_data[1] != '=') {
1352 0 0 if (conn_check_destroyed(self)) return;
1361 0 0 if (strcmp(self->sasl_mechanism, "SCRAM-SHA-256") == 0) md = EVP_sha256();
1362 0 0 else if (strcmp(self->sasl_mechanism, "SCRAM-SHA-512") == 0) md = EVP_sha512();
1363 0 0 if (!md || self->scram_server_key_len <= 0 || !self->scram_auth_message) {
0 0 if (!md || self->scram_server_key_len <= 0 || !self->scram_auth_message) {
0 0 if (!md || self->scram_server_key_len <= 0 || !self->scram_auth_message) {
1365 0 0 if (conn_check_destroyed(self)) return;
1388 0 0 && (CRYPTO_memcmp(recv_sig, expected, expected_len) == 0);
0 0 && (CRYPTO_memcmp(recv_sig, expected, expected_len) == 0);
1393 0 0 if (self->scram_auth_message) {
1401 0 0 if (!ok) {
1403 0 0 if (conn_check_destroyed(self)) return;
1419 0 0 if (conn_check_destroyed(self)) return;
1431 1 0 if (cbt->internal) {
1448 0 0 if (!cbt->cb) return;
1567 140 1 if (key) {
1575 140 1 if (value) {
1583 1 140 if (headers && HvUSEDKEYS(headers) > 0) {
0 1 if (headers && HvUSEDKEYS(headers) > 0) {
1 0 if (headers && HvUSEDKEYS(headers) > 0) {
1584 0 1 kf_buf_append_varint(&rec, (int64_t)HvUSEDKEYS(headers));
1587 2 1 while ((entry = hv_iternext(headers))) {
1619 141 16 for (i = 0; i < count; i++) {
1621 141 0 if (!elem || !SvROK(*elem) || SvTYPE(SvRV(*elem)) != SVt_PVHV)
141 0 if (!elem || !SvROK(*elem) || SvTYPE(SvRV(*elem)) != SVt_PVHV)
0 141 if (!elem || !SvROK(*elem) || SvTYPE(SvRV(*elem)) != SVt_PVHV)
1628 141 16 for (i = 0; i < count; i++) {
1637 141 0 if (key_sv && SvOK(*key_sv)) key = SvPV(*key_sv, key_len);
140 1 if (key_sv && SvOK(*key_sv)) key = SvPV(*key_sv, key_len);
1638 141 0 if (val_sv && SvOK(*val_sv)) val = SvPV(*val_sv, val_len);
140 1 if (val_sv && SvOK(*val_sv)) val = SvPV(*val_sv, val_len);
1639 1 140 if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV)
1 0 if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV)
1 0 if (hdr_sv && SvROK(*hdr_sv) && SvTYPE(SvRV(*hdr_sv)) == SVt_PVHV)
1648 0 16 if (is_transactional) attrs |= 0x10; /* bit 4 = isTransactional */
1677 2 14 if (compression == COMPRESS_GZIP) {
1686 2 0 if (zinit == Z_OK) {
1691 2 0 if (deflate(&zs, Z_FINISH) == Z_STREAM_END) zok = 1;
1694 2 0 if (zok) {
1785 0 2 if (flexible) {
1787 0 0 if (end - p < 4) goto done;
1793 0 0 if (n < 0) goto done;
1798 0 0 for (i = 0; i < broker_count; i++) {
1800 0 0 if (end - p < 4) goto done;
1806 0 0 if (n < 0) goto done;
1808 0 0 hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
0 0 hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
1810 0 0 if (end - p < 4) goto done;
1817 0 0 if (n < 0) goto done;
1822 0 0 if (n < 0) goto done;
1831 0 0 if (n < 0) goto done;
1835 0 0 if (end - p < 4) goto done;
1841 0 0 if (n < 0) goto done;
1845 0 0 for (i = 0; i < topic_count; i++) {
1847 0 0 if (end - p < 2) goto done;
1853 0 0 if (n < 0) goto done;
1855 0 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
1858 0 0 if (version >= 10) {
1859 0 0 if (end - p < 16) goto done;
1864 0 0 if (end - p < 1) goto done;
1869 0 0 if (n < 0) goto done;
1875 0 0 for (j = 0; j < part_count; j++) {
1877 0 0 if (end - p < 2) goto done;
1881 0 0 if (end - p < 4) goto done;
1885 0 0 if (end - p < 4) goto done;
1890 0 0 if (version >= 7) {
1891 0 0 if (end - p < 4) goto done;
1897 0 0 if (n < 0) goto done;
1900 0 0 if (rcount < 0 || rcount > 65536) goto done;
0 0 if (rcount < 0 || rcount > 65536) goto done;
1901 0 0 if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
1906 0 0 if (n < 0) goto done;
1909 0 0 if (rcount < 0 || rcount > 65536) goto done;
0 0 if (rcount < 0 || rcount > 65536) goto done;
1910 0 0 if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
1914 0 0 if (version >= 5) {
1916 0 0 if (n < 0) goto done;
1919 0 0 if (rcount < 0 || rcount > 65536) goto done;
0 0 if (rcount < 0 || rcount > 65536) goto done;
1920 0 0 if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
1926 0 0 if (n < 0) goto done;
1934 0 0 if (version >= 8) {
1935 0 0 if (end - p < 4) goto done;
1941 0 0 if (n < 0) goto done;
1949 0 2 if (version >= 3) {
1950 0 0 if (end - p < 4) goto done;
1955 0 2 if (end - p < 4) goto done;
1957 2 0 if (broker_count < 0 || broker_count > 65536) goto done;
0 2 if (broker_count < 0 || broker_count > 65536) goto done;
1959 2 1 for (i = 0; i < broker_count; i++) {
1961 1 1 if (end - p < 4) goto done;
1967 0 1 if (n < 0) goto done;
1969 1 0 hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
1 0 hv_store(bh, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
1971 0 1 if (end - p < 4) goto done;
1976 0 1 if (version >= 1) {
1979 0 0 if (n < 0) goto done;
1987 0 1 if (version >= 2) {
1990 0 0 if (n < 0) goto done;
1995 0 1 if (version >= 1) {
1996 0 0 if (end - p < 4) goto done;
2002 0 1 if (end - p < 4) goto done;
2004 1 0 if (topic_count < 0 || topic_count > 1000000) goto done;
0 1 if (topic_count < 0 || topic_count > 1000000) goto done;
2005 1 1 for (i = 0; i < topic_count; i++) {
2007 0 1 if (end - p < 2) goto done;
2013 0 1 if (n < 0) goto done;
2015 1 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
1 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2018 0 1 if (version >= 1) {
2019 0 0 if (end - p < 1) goto done;
2024 0 1 if (end - p < 4) goto done;
2026 1 0 if (part_count < 0 || part_count > 1000000) goto done;
0 1 if (part_count < 0 || part_count > 1000000) goto done;
2029 1 1 for (j = 0; j < part_count; j++) {
2031 0 1 if (end - p < 2) goto done;
2035 0 1 if (end - p < 4) goto done;
2039 0 1 if (end - p < 4) goto done;
2044 0 1 if (version >= 7) {
2045 0 0 if (end - p < 4) goto done;
2050 0 1 if (end - p < 4) goto done;
2052 1 0 if (rcount < 0 || rcount > 65536) goto done;
0 1 if (rcount < 0 || rcount > 65536) goto done;
2053 0 1 if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
2057 0 1 if (end - p < 4) goto done;
2059 1 0 if (rcount < 0 || rcount > 65536) goto done;
0 1 if (rcount < 0 || rcount > 65536) goto done;
2060 0 1 if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
2064 0 1 if (version >= 5) {
2065 0 0 if (end - p < 4) goto done;
2067 0 0 if (rcount < 0 || rcount > 65536) goto done;
0 0 if (rcount < 0 || rcount > 65536) goto done;
2068 0 0 if (end - p < (ptrdiff_t)((int64_t)rcount * 4)) goto done;
2099 0 1 if (end - p < 4) goto done;
2103 1 1 for (i = 0; i < topic_count; i++) {
2107 0 1 if (n < 0) goto done;
2109 1 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
1 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2112 0 1 if (end - p < 4) goto done;
2117 1 1 for (j = 0; j < part_count; j++) {
2119 0 1 if (end - p < 4) goto done;
2123 0 1 if (end - p < 2) goto done;
2127 0 1 if (end - p < 8) goto done;
2132 0 1 if (version >= 2) {
2133 0 0 if (end - p < 8) goto done;
2138 0 1 if (version >= 5) {
2139 0 0 if (end - p < 8) goto done;
2150 1 0 if (version >= 1 && end - p >= 4) {
0 0 if (version >= 1 && end - p >= 4) {
2173 0 16 if (end - p < 12) return -1;
2175 16 0 if (out_base_offset) *out_base_offset = base_offset;
2178 1 15 if (end - p < batch_length) return -1;
2181 0 15 if (batch_end - p < 9) return -1;
2184 1 14 if (magic != 2) return -1; /* only support magic=2 (current format) */
2187 1 13 if (crc32c(p, (size_t)(batch_end - p)) != expected_crc) return -1;
2189 0 13 if (batch_end - p < 36) return -1;
2199 0 13 if (batch_end - p < 4) return -1;
2207 7 6 if (compression_type != COMPRESS_NONE && batch_end > p) {
7 0 if (compression_type != COMPRESS_NONE && batch_end > p) {
2210 6 1 if (decomp_cap < 4096) decomp_cap = 4096;
2213 1 6 if (compression_type == COMPRESS_GZIP) {
2215 1 1 while (!zok && decomp_cap < 64 * 1024 * 1024) {
1 0 while (!zok && decomp_cap < 64 * 1024 * 1024) {
2220 0 1 if (zinit != Z_OK) {
2232 1 0 if (zret == Z_STREAM_END) {
2236 0 0 } else if (zret == Z_BUF_ERROR || zret == Z_OK) {
0 0 } else if (zret == Z_BUF_ERROR || zret == Z_OK) {
2315 138 13 for (i = 0; i < record_count; i++) {
2318 0 138 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2320 0 138 if (rec_end - rp < rec_len) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (rec_end - rp < rec_len) { if (decompressed) Safefree(decompressed); return -1; }
2323 0 138 if (this_rec_end - rp < 1) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (this_rec_end - rp < 1) { if (decompressed) Safefree(decompressed); return -1; }
2328 0 138 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2333 0 138 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2339 0 138 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2342 137 1 if (key_len >= 0) {
2343 0 137 if (this_rec_end - rp < key_len) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (this_rec_end - rp < key_len) { if (decompressed) Safefree(decompressed); return -1; }
2351 0 138 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2354 137 1 if (val_len >= 0) {
2355 0 137 if (this_rec_end - rp < val_len) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (this_rec_end - rp < val_len) { if (decompressed) Safefree(decompressed); return -1; }
2363 0 138 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { if (decompressed) Safefree(decompressed); return -1; }
2366 1 137 if (hdr_count > 0) {
2369 2 1 for (h = 0; h < hdr_count; h++) {
2372 0 2 if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
2375 2 0 if (hk_len < 0 || this_rec_end - rp < hk_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
0 2 if (hk_len < 0 || this_rec_end - rp < hk_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
0 0 if (hk_len < 0 || this_rec_end - rp < hk_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
2380 0 2 if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
0 0 if (n < 0) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
2383 2 0 if (hv_len >= 0) {
2384 0 2 if (this_rec_end - rp < hv_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
0 0 if (this_rec_end - rp < hv_len) { SvREFCNT_dec((SV*)hdr_hv); if (decompressed) Safefree(decompressed); return -1; }
2388 2 0 hv_store(hdr_hv, hk_data, (I32)hk_len,
2396 137 1 hv_store(rec_hv, "key", 3,
2398 137 1 hv_store(rec_hv, "value", 5,
2400 1 137 if (hdr_hv)
2408 1 12 if (decompressed) Safefree(decompressed);
2425 0 0 if (version >= 1) {
2426 0 0 if (end - p < 4) goto done;
2432 0 0 if (version >= 7) {
2433 0 0 if (end - p < 2) goto done;
2438 0 0 if (version >= 7) {
2439 0 0 if (end - p < 4) goto done;
2444 0 0 if (end - p < 4) goto done;
2448 0 0 for (i = 0; i < topic_count; i++) {
2453 0 0 if (n < 0) goto done;
2455 0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2458 0 0 if (end - p < 4) goto done;
2463 0 0 for (j = 0; j < part_count; j++) {
2466 0 0 if (end - p < 4) goto done;
2470 0 0 if (end - p < 2) goto done;
2474 0 0 if (end - p < 8) goto done;
2479 0 0 if (version >= 4) {
2480 0 0 if (end - p < 8) goto done;
2486 0 0 if (version >= 5) {
2487 0 0 if (end - p < 8) goto done;
2492 0 0 if (version >= 4) {
2493 0 0 if (end - p < 4) goto done;
2496 0 0 for (at = 0; at < at_count; at++) {
2497 0 0 if (end - p < 16) goto done;
2503 0 0 if (end - p < 4) goto done;
2507 0 0 if (records_size > 0 && end - p >= records_size) {
0 0 if (records_size > 0 && end - p >= records_size) {
2512 0 0 while (rp < rend && rend - rp >= 12) {
0 0 while (rp < rend && rend - rp >= 12) {
2515 0 0 if (bl < 0 || rend - rp < 12 + bl) break;
0 0 if (bl < 0 || rend - rp < 12 + bl) break;
2521 0 0 } else if (records_size > 0) {
2551 0 0 if (version >= 2) {
2552 0 0 if (end - p < 4) goto done;
2557 0 0 if (end - p < 4) goto done;
2561 0 0 for (i = 0; i < topic_count; i++) {
2565 0 0 if (n < 0) goto done;
2567 0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2569 0 0 if (end - p < 4) goto done;
2574 0 0 for (j = 0; j < part_count; j++) {
2576 0 0 if (end - p < 4) goto done;
2580 0 0 if (end - p < 2) goto done;
2584 0 0 if (version >= 1) {
2585 0 0 if (end - p < 8) goto done;
2590 0 0 if (end - p < 8) goto done;
2595 0 0 if (version >= 4) {
2596 0 0 if (end - p < 4) goto done;
2622 0 1 if (version >= 1) {
2623 0 0 if (end - p < 4) goto done;
2627 0 1 if (end - p < 2) goto done;
2632 0 1 if (version >= 1) {
2635 0 0 if (n < 0) goto done;
2637 0 0 if (emsg && elen > 0)
0 0 if (emsg && elen > 0)
2641 0 1 if (end - p < 4) goto done;
2647 0 1 if (n < 0) goto done;
2649 1 0 hv_store(result, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
1 0 hv_store(result, "host", 4, newSVpvn(host ? host : "", host ? hlen : 0), 0);
2651 0 1 if (end - p < 4) goto done;
2670 0 1 if (version >= 2) {
2671 0 0 if (end - p < 4) goto done;
2675 0 1 if (end - p < 2) goto done;
2679 0 1 if (end - p < 4) goto done;
2687 0 1 if (n < 0) goto done;
2689 1 0 if (proto)
2694 0 1 if (n < 0) goto done;
2696 1 0 if (leader)
2703 0 1 if (n < 0) goto done;
2705 1 0 if (member_id)
2709 0 1 if (end - p < 4) goto done;
2713 1 1 for (i = 0; i < mcount; i++) {
2718 0 1 if (n < 0) goto done;
2720 1 0 if (mid)
2724 0 1 if (version >= 5) {
2727 0 0 if (n < 0) goto done;
2732 0 1 if (end - p < 4) goto done;
2734 0 1 if (mdlen > 0) {
2735 0 0 if (end - p < mdlen) goto done;
2758 0 1 if (version >= 1) {
2759 0 0 if (end - p < 4) goto done;
2763 0 1 if (end - p < 2) goto done;
2768 0 1 if (end - p < 4) goto done;
2770 0 1 if (alen > 0 && end - p >= alen) {
0 1 if (alen > 0 && end - p >= alen) {
2788 0 1 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
2789 1 0 if (end - p >= 2) {
2808 0 0 if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
2810 0 0 if (end - p < 4) goto done;
2813 0 0 for (i = 0; i < tc; i++) {
2817 0 0 if (n < 0) goto done;
2819 0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2821 0 0 if (end - p < 4) goto done;
2825 0 0 for (j = 0; j < pc; j++) {
2827 0 0 if (end - p < 6) goto done;
2854 0 1 if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 3 && end - p >= 4) p += 4; /* throttle_time_ms */
2856 0 1 if (end - p < 4) goto done;
2859 1 1 for (i = 0; i < tc; i++) {
2863 0 1 if (n < 0) goto done;
2865 1 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
1 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2867 0 1 if (end - p < 4) goto done;
2871 1 1 for (j = 0; j < pc; j++) {
2873 0 1 if (end - p < 4) goto done;
2877 0 1 if (end - p < 8) goto done;
2882 0 1 if (version >= 5 && end - p >= 4) p += 4;
0 0 if (version >= 5 && end - p >= 4) p += 4;
2886 0 1 if (n < 0) goto done;
2889 0 1 if (end - p < 2) goto done;
2900 1 0 if (version >= 2 && end - p >= 2) {
0 0 if (version >= 2 && end - p >= 2) {
2919 0 1 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
2920 1 0 if (end - p >= 2) {
2939 0 1 if (version >= 2 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 2 && end - p >= 4) p += 4; /* throttle_time_ms */
2941 0 1 if (end - p < 4) goto done;
2944 1 1 for (i = 0; i < tc; i++) {
2948 0 1 if (n < 0) goto done;
2950 1 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
1 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2952 0 1 if (end - p < 2) goto done;
2957 0 1 if (version >= 1) {
2960 0 0 if (n < 0) goto done;
2962 0 0 if (emsg && elen > 0)
0 0 if (emsg && elen > 0)
2985 0 1 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
2987 0 1 if (end - p < 4) goto done;
2990 1 1 for (i = 0; i < tc; i++) {
2994 0 1 if (n < 0) goto done;
2996 1 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
1 0 hv_store(th, "name", 4, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
2998 0 1 if (end - p < 2) goto done;
3019 1 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
1 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
3021 0 1 if (end - p < 2) goto done;
3025 0 1 if (end - p < 8) goto done;
3029 0 1 if (end - p < 2) goto done;
3048 0 1 if (end - p < 4) goto done;
3051 0 1 if (end - p < 4) goto done;
3054 1 1 for (i = 0; i < tc; i++) {
3058 0 1 if (n < 0) goto done;
3060 1 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
1 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
3062 0 1 if (end - p < 4) goto done;
3066 1 1 for (j = 0; j < pc; j++) {
3068 0 1 if (end - p < 6) goto done;
3093 1 0 if (end - p >= 4) p += 4; /* throttle_time_ms */
3094 1 0 if (end - p >= 2) {
3113 0 1 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
0 0 if (version >= 1 && end - p >= 4) p += 4; /* throttle_time_ms */
3115 0 1 if (end - p < 4) goto done;
3118 0 1 for (i = 0; i < tc; i++) {
3122 0 0 if (n < 0) goto done;
3124 0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
0 0 hv_store(th, "topic", 5, newSVpvn(tname ? tname : "", tname ? tnlen : 0), 0);
3126 0 0 if (end - p < 4) goto done;
3130 0 0 for (j = 0; j < pc; j++) {
3132 0 0 if (end - p < 6) goto done;
3153 1 1 while (self->rbuf_len >= 4) {
3155 1 0 if (msg_size < 0 || msg_size > 256 * 1024 * 1024) {
0 1 if (msg_size < 0 || msg_size > 256 * 1024 * 1024) {
3157 0 0 if (conn_check_destroyed(self)) return;
3162 0 1 if (self->rbuf_len < (size_t)(4 + msg_size))
3168 0 1 if (msg_size < 4) {
3170 0 0 if (conn_check_destroyed(self)) return;
3181 1 0 if (!ngx_queue_empty(&self->cb_queue)) {
3184 0 1 if (cbt->correlation_id != corr_id) {
3191 0 0 if (conn_check_destroyed(self)) return;
3201 1 0 if (cbt) {
3203 0 1 if (cbt->cb) SvREFCNT_dec(cbt->cb);
3205 0 1 if (conn_check_destroyed(self)) return;
3210 1 0 if (self->rbuf_len > 0)
3224 4 0 if (!self || self->magic != KF_MAGIC_ALIVE) return;
0 4 if (!self || self->magic != KF_MAGIC_ALIVE) return;
3227 2 2 if (self->state == CONN_CONNECTING) {
3231 2 0 if (self->timing) {
3237 0 2 if (getsockopt(self->fd, SOL_SOCKET, SO_ERROR, &err, &errlen) < 0)
3239 1 1 if (err != 0) {
3243 0 1 if (conn_check_destroyed(self)) return;
3254 0 2 if (self->state == CONN_TLS_HANDSHAKE) {
3257 0 0 if (ret == 1) {
3267 0 0 if (err == SSL_ERROR_WANT_READ) {
3270 0 0 } else if (err == SSL_ERROR_WANT_WRITE) {
3276 0 0 if (e) {
3284 0 0 if (conn_check_destroyed(self)) return;
3292 1 1 if (revents & EV_WRITE) {
3293 1 1 while (self->wbuf_off < self->wbuf_len) {
3296 0 1 if (n < 0) {
3297 0 0 if (errno == EAGAIN || errno == EWOULDBLOCK) break;
0 0 if (errno == EAGAIN || errno == EWOULDBLOCK) break;
3299 0 0 if (conn_check_destroyed(self)) return;
3303 0 1 if (n == 0) {
3310 1 0 if (self->wbuf_off >= self->wbuf_len) {
3318 1 1 if (revents & EV_READ) {
3322 0 1 if (n < 0) {
3323 0 0 if (errno == EAGAIN || errno == EWOULDBLOCK) return;
0 0 if (errno == EAGAIN || errno == EWOULDBLOCK) return;
3325 0 0 if (conn_check_destroyed(self)) return;
3329 0 1 if (n == 0) {
3350 0 0 if (self->magic != KF_MAGIC_ALIVE) return;
3353 0 0 if (conn_check_destroyed(self)) return;
3374 0 1 if (self->tls_enabled) {
3376 0 0 if (!self->ssl_ctx) {
3378 0 0 if (conn_check_destroyed(self)) return;
3383 0 0 if (self->tls_skip_verify)
3388 0 0 if (self->tls_ca_file) {
3389 0 0 if (SSL_CTX_load_verify_locations(self->ssl_ctx, self->tls_ca_file, NULL) != 1) {
3391 0 0 if (conn_check_destroyed(self)) return;
3398 0 0 if (!self->ssl) {
3400 0 0 if (conn_check_destroyed(self)) return;
3406 0 0 if (!is_ip_literal(self->host))
3409 0 0 if (!self->tls_skip_verify) {
3412 0 0 if (is_ip_literal(self->host))
3421 0 0 if (ret == 1) {
3428 0 0 if (err == SSL_ERROR_WANT_READ) {
3430 0 0 } else if (err == SSL_ERROR_WANT_WRITE) {
3435 0 0 if (e) {
3443 0 0 if (conn_check_destroyed(self)) return;
3462 0 2 if (self->state != CONN_DISCONNECTED) {
3467 2 0 if (host != self->host) {
3468 0 2 if (self->host) Safefree(self->host);
3486 0 2 if (gai_err != 0) {
3490 0 2 if (gai_err != 0) {
3497 2 0 for (rp = res; rp; rp = rp->ai_next) {
3499 0 2 if (fd < 0) continue;
3511 0 2 if (ret == 0) {
3526 2 0 if (errno == EINPROGRESS) {
3538 2 0 if (timeout > 0) {
3569 0 7 if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop"))
0 0 if (SvOK(loop_sv) && sv_derived_from(loop_sv, "EV::Loop"))
3598 448 7 for (i = 0; i < API_VERSIONS_MAX_KEY; i++)
3613 0 7 if (self->magic != KF_MAGIC_ALIVE) return;
3621 0 7 if (self->reconnect_timing) {
3626 2 5 CLEAR_HANDLER(self->on_error);
3627 1 6 CLEAR_HANDLER(self->on_connect);
3628 0 7 CLEAR_HANDLER(self->on_disconnect);
3630 2 5 if (self->host) Safefree(self->host);
3631 7 0 if (self->client_id) Safefree(self->client_id);
3632 0 7 if (self->sasl_mechanism) Safefree(self->sasl_mechanism);
3633 0 7 if (self->sasl_username) Safefree(self->sasl_username);
3634 0 7 if (self->sasl_password) Safefree(self->sasl_password);
3635 0 7 if (self->scram_nonce) Safefree(self->scram_nonce);
3636 0 7 if (self->scram_client_first) Safefree(self->scram_client_first);
3639 0 7 if (self->scram_auth_message) {
3644 0 7 if (self->tls_ca_file) Safefree(self->tls_ca_file);
3645 7 0 if (self->rbuf) Safefree(self->rbuf);
3646 7 0 if (self->wbuf) Safefree(self->wbuf);
3663 0 1 if (self->reconnect_timing) {
3673 0 3 RETVAL = (self->state == CONN_READY) ? 1 : 0;
3680 0 0 RETVAL = self->state;
3687 0 0 RETVAL = self->pending_count;
3695 0 2 CLEAR_HANDLER(self->on_error);
3696 2 0 if (cb && SvOK(cb)) {
2 0 if (cb && SvOK(cb)) {
3705 0 1 CLEAR_HANDLER(self->on_connect);
3706 1 0 if (cb && SvOK(cb)) {
1 0 if (cb && SvOK(cb)) {
3715 0 0 CLEAR_HANDLER(self->on_disconnect);
3716 0 0 if (cb && SvOK(cb)) {
0 0 if (cb && SvOK(cb)) {
3725 0 0 if (id) {
3726 0 0 if (self->client_id) Safefree(self->client_id);
3737 0 0 if (self->tls_ca_file) { Safefree(self->tls_ca_file); self->tls_ca_file = NULL; }
3738 0 0 if (ca_file) self->tls_ca_file = savepv(ca_file);
3746 0 0 if (self->sasl_mechanism) { Safefree(self->sasl_mechanism); self->sasl_mechanism = NULL; }
3747 0 0 if (self->sasl_username) { Safefree(self->sasl_username); self->sasl_username = NULL; }
3748 0 0 if (self->sasl_password) { Safefree(self->sasl_password); self->sasl_password = NULL; }
3749 0 0 if (SvOK(ST(1))) {
3751 0 0 if (username) self->sasl_username = savepv(username);
3752 0 0 if (password) self->sasl_password = savepv(password);
3768 1 0 if (self->state != CONN_READY)
3776 0 0 if (ver < 0) ver = 1;
3777 0 0 if (ver > 4) ver = 4;
3779 0 0 if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) {
0 0 if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) {
0 0 if (SvOK(topics_sv) && SvROK(topics_sv) && SvTYPE(SvRV(topics_sv)) == SVt_PVAV) {
3783 0 0 for (i = 0; i < count; i++) {
3794 0 0 if (ver >= 4)
3805 0 0 if (!self->api_versions_known)
3810 0 0 for (i = 0; i < API_VERSIONS_MAX_KEY; i++) {
3811 0 0 if (self->api_versions[i] >= 0) {
3817 0 0 EXTEND(SP, 1);
3826 1 0 if (self->state != CONN_READY)
3833 0 0 if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
0 0 if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
0 0 if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
3843 0 0 if (opts) {
3845 0 0 if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v))
0 0 if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v))
3847 0 0 if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v))
0 0 if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v))
3849 0 0 if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v))
0 0 if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v))
3857 0 0 if (ver < 0) ver = 4;
3858 0 0 if (ver > 7) ver = 7;
3868 0 0 if (ver >= 3)
3872 0 0 if (ver >= 4)
3876 0 0 if (ver >= 7) {
3893 0 0 if (ver >= 5)
3900 0 0 if (ver >= 7)
3911 0 0 if (self->state != CONN_READY)
3916 0 0 if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVHV)
0 0 if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVHV)
3922 0 0 if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
0 0 if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
0 0 if (arg1 && SvROK(arg1) && SvTYPE(SvRV(arg1)) == SVt_PVHV) {
3932 0 0 if (opts) {
3934 0 0 if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v))
0 0 if ((v = hv_fetchs(opts, "max_bytes", 0)) && SvOK(*v))
3936 0 0 if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v))
0 0 if ((v = hv_fetchs(opts, "max_wait_ms", 0)) && SvOK(*v))
3938 0 0 if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v))
0 0 if ((v = hv_fetchs(opts, "min_bytes", 0)) && SvOK(*v))
3943 0 0 if (ver < 0) ver = 4;
3944 0 0 if (ver > 7) ver = 7;
3952 0 0 if (ver >= 3)
3954 0 0 if (ver >= 4)
3956 0 0 if (ver >= 7) {
3962 0 0 kf_buf_append_i32(&body, (int32_t)HvUSEDKEYS(topics_hv));
3966 0 0 while ((entry = hv_iternext(topics_hv))) {
3972 0 0 if (!SvROK(parts_sv) || SvTYPE(SvRV(parts_sv)) != SVt_PVAV)
0 0 if (!SvROK(parts_sv) || SvTYPE(SvRV(parts_sv)) != SVt_PVAV)
3978 0 0 for (i = 0; i < pc; i++) {
3980 0 0 if (!elem || !SvROK(*elem))
0 0 if (!elem || !SvROK(*elem))
3985 0 0 int32_t pid = pid_sv ? (int32_t)SvIV(*pid_sv) : 0;
3989 0 0 int64_t offset = off_sv ? (int64_t)SvIV(*off_sv) : 0;
3992 0 0 if (ver >= 5)
3999 0 0 if (ver >= 7)
4010 0 0 if (self->state != CONN_READY)
4013 0 0 if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV)
0 0 if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV)
4025 0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
4028 0 0 if ((tmp = hv_fetch(opts, "acks", 4, 0)))
4030 0 0 if ((tmp = hv_fetch(opts, "transactional_id", 16, 0)) && SvOK(*tmp))
0 0 if ((tmp = hv_fetch(opts, "transactional_id", 16, 0)) && SvOK(*tmp))
4032 0 0 if ((tmp = hv_fetch(opts, "compression", 11, 0))) {
4035 0 0 if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
0 0 if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
4040 0 0 else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
0 0 else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
4050 0 0 if ((tmp = hv_fetch(opts, "producer_id", 11, 0)))
4052 0 0 if ((tmp = hv_fetch(opts, "producer_epoch", 14, 0)))
4054 0 0 if ((tmp = hv_fetch(opts, "base_sequence", 13, 0)))
4056 0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
4071 0 0 if (ver < 0) ver = 3;
4072 0 0 if (ver > 7) ver = 7;
4079 0 0 if (ver >= 3)
4080 0 0 kf_buf_append_nullable_string(&body, txn_id, txn_id ? (int16_t)txn_id_len : 0);
4092 0 0 conn_send_request(aTHX_ self, API_PRODUCE, ver, &body,
4093 0 0 (cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0);
4103 0 0 if (self->state != CONN_READY)
4111 0 0 if (ver < 0) ver = 1;
4112 0 0 if (ver > 5) ver = 5;
4120 0 0 if (ver >= 2)
4132 0 0 if (ver >= 4)
4145 1 0 if (self->state != CONN_READY)
4155 0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
4158 0 0 if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV)
0 0 if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV)
0 0 if ((tmp = hv_fetch(opts, "headers", 7, 0)) && SvROK(*tmp) && SvTYPE(SvRV(*tmp)) == SVt_PVHV)
4160 0 0 if ((tmp = hv_fetch(opts, "acks", 4, 0)))
4162 0 0 if ((tmp = hv_fetch(opts, "compression", 11, 0))) {
4165 0 0 if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
0 0 if (clen == 4 && memcmp(cstr, "none", 4) == 0) compression = COMPRESS_NONE;
4170 0 0 else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
0 0 else if (clen == 4 && memcmp(cstr, "gzip", 4) == 0) compression = COMPRESS_GZIP;
4180 0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
0 0 } else if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVCV) {
4188 0 0 if (SvOK(key_sv))
4193 0 0 if (SvOK(value_sv))
4200 0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV)
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV)
0 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV)
4202 0 0 if (ts_tmp && SvOK(*ts_tmp)) {
0 0 if (ts_tmp && SvOK(*ts_tmp)) {
4216 0 0 if (key) hv_stores(rec, "key", newSVpvn(key, key_len));
4217 0 0 if (value) hv_stores(rec, "value", newSVpvn(value, value_len));
4218 0 0 if (headers) hv_stores(rec, "headers", newRV_inc((SV*)headers));
4228 0 0 if (ver < 0) ver = 3;
4229 0 0 if (ver > 7) ver = 7;
4237 0 0 if (ver >= 3)
4255 0 0 conn_send_request(aTHX_ self, API_PRODUCE, ver, &body,
4256 0 0 (cb && SvOK(cb)) ? cb : NULL, 0, (acks == 0) ? 1 : 0);
4266 0 0 if (self->state != CONN_READY)
4270 0 0 if (ver < 0) ver = 0;
4271 0 0 if (ver > 2) ver = 2;
4280 0 0 if (ver >= 1)
4291 0 0 if (self->state != CONN_READY)
4295 0 0 if (ver < 0) ver = 1;
4296 0 0 if (ver > 5) ver = 5;
4306 0 0 if (ver >= 1)
4313 0 0 if (ver >= 5) {
4314 0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
4341 0 0 for (i = 0; i < tc; i++) {
4360 0 0 if (self->state != CONN_READY)
4364 0 0 if (ver < 0) ver = 0;
4365 0 0 if (ver > 3) ver = 3;
4378 0 0 if (ver >= 3) {
4379 0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
4389 0 0 if (SvOK(assignments_sv) && SvROK(assignments_sv)
0 0 if (SvOK(assignments_sv) && SvROK(assignments_sv)
4390 0 0 && SvTYPE(SvRV(assignments_sv)) == SVt_PVAV) {
4395 0 0 for (i = 0; i < ac; i++) {
4397 0 0 if (!elem || !SvROK(*elem)) continue;
0 0 if (!elem || !SvROK(*elem)) continue;
4401 0 0 if (!mid_sv) continue;
4407 0 0 if (!data_sv) { kf_buf_append_bytes(&body, NULL, 0); continue; }
4424 0 0 if (self->state != CONN_READY)
4428 0 0 if (ver < 0) ver = 0;
4429 0 0 if (ver > 4) ver = 4;
4442 0 0 if (ver >= 3) {
4443 0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
0 0 if (group_instance_id_sv && SvOK(group_instance_id_sv)) {
4460 0 0 if (self->state != CONN_READY)
4464 0 0 if (ver < 0) ver = 2;
4465 0 0 if (ver > 7) ver = 7;
4474 0 0 if (ver >= 1)
4478 0 0 if (ver >= 1) {
4484 0 0 if (ver >= 7)
4492 0 0 for (i = 0; i < tc; i++) {
4494 0 0 if (!elem || !SvROK(*elem)) continue;
0 0 if (!elem || !SvROK(*elem)) continue;
4497 0 0 if (!tname_sv) continue;
4503 0 0 if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
0 0 if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
4508 0 0 for (j = 0; j < pc; j++) {
4510 0 0 if (!pelem || !SvROK(*pelem)) continue;
0 0 if (!pelem || !SvROK(*pelem)) continue;
4514 0 0 kf_buf_append_i32(&body, pid_sv ? (int32_t)SvIV(*pid_sv) : 0);
4517 0 0 kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0);
4520 0 0 if (ver >= 6)
4536 0 0 if (self->state != CONN_READY)
4540 0 0 if (ver < 0) ver = 1;
4541 0 0 if (ver > 5) ver = 5;
4554 0 0 for (i = 0; i < tc; i++) {
4556 0 0 if (!elem || !SvROK(*elem)) continue;
0 0 if (!elem || !SvROK(*elem)) continue;
4559 0 0 if (!tname_sv) continue;
4565 0 0 if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
0 0 if (!parts_sv || !SvROK(*parts_sv)) { kf_buf_append_i32(&body, 0); continue; }
4570 0 0 for (j = 0; j < pc; j++) {
4572 0 0 kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0);
4584 0 0 if (self->state != CONN_READY)
4588 0 0 if (ver < 0) ver = 0;
4589 0 0 if (ver > 3) ver = 3;
4608 0 0 if (self->state != CONN_READY)
4612 0 0 if (ver < 0) ver = 0;
4613 0 0 if (ver > 4) ver = 4;
4622 0 0 for (i = 0; i < tc; i++) {
4624 0 0 if (!elem || !SvROK(*elem)) continue;
0 0 if (!elem || !SvROK(*elem)) continue;
4628 0 0 if (!name_sv) continue;
4634 0 0 int32_t num_partitions = np_sv ? (int32_t)SvIV(*np_sv) : 1;
4638 0 0 int16_t replication_factor = rf_sv ? (int16_t)SvIV(*rf_sv) : 1;
4651 0 0 if (ver >= 1)
4662 0 0 if (self->state != CONN_READY)
4666 0 0 if (ver < 0) ver = 0;
4667 0 0 if (ver > 3) ver = 3;
4676 0 0 for (i = 0; i < tc; i++) {
4678 0 0 if (!elem) continue;
4694 0 0 if (self->state != CONN_READY)
4698 0 0 if (ver < 0) ver = 0;
4699 0 0 if (ver > 1) ver = 1;
4704 0 0 if (SvOK(transactional_id_sv)) {
4715 0 0 if (ver >= 2) {
4728 0 0 if (self->state != CONN_READY)
4732 0 0 if (ver < 0) ver = 0;
4733 0 0 if (ver > 1) ver = 1;
4746 0 0 if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVAV)
0 0 if (!SvROK(topics_sv) || SvTYPE(SvRV(topics_sv)) != SVt_PVAV)
4752 0 0 for (i = 0; i < tc; i++) {
4754 0 0 if (!elem || !SvROK(*elem)) croak("add_partitions_to_txn: bad element");
0 0 if (!elem || !SvROK(*elem)) croak("add_partitions_to_txn: bad element");
4757 0 0 if (!tname_sv) croak("add_partitions_to_txn: missing topic");
4763 0 0 if (!parts_sv || !SvROK(*parts_sv)) croak("add_partitions_to_txn: missing partitions");
0 0 if (!parts_sv || !SvROK(*parts_sv)) croak("add_partitions_to_txn: missing partitions");
4767 0 0 for (j = 0; j < pc; j++) {
4769 0 0 kf_buf_append_i32(&body, pelem ? (int32_t)SvIV(*pelem) : 0);
4781 0 0 if (self->state != CONN_READY)
4785 0 0 if (ver < 0) ver = 0;
4786 0 0 if (ver > 1) ver = 1;
4807 0 0 if (self->state != CONN_READY)
4811 0 0 if (ver < 0) ver = 0;
4812 0 0 if (ver > 3) ver = 3;
4829 0 0 if (ver >= 3) {
4837 0 0 if (!SvROK(offsets_sv) || SvTYPE(SvRV(offsets_sv)) != SVt_PVAV)
0 0 if (!SvROK(offsets_sv) || SvTYPE(SvRV(offsets_sv)) != SVt_PVAV)
4843 0 0 for (i = 0; i < tc; i++) {
4845 0 0 if (!elem || !SvROK(*elem)) croak("txn_offset_commit: bad element");
0 0 if (!elem || !SvROK(*elem)) croak("txn_offset_commit: bad element");
4849 0 0 if (!tname_sv) croak("txn_offset_commit: missing topic");
4855 0 0 if (!parts_sv || !SvROK(*parts_sv)) croak("txn_offset_commit: missing partitions");
0 0 if (!parts_sv || !SvROK(*parts_sv)) croak("txn_offset_commit: missing partitions");
4860 0 0 for (j = 0; j < pc; j++) {
4862 0 0 if (!pelem || !SvROK(*pelem)) croak("txn_offset_commit: bad partition");
0 0 if (!pelem || !SvROK(*pelem)) croak("txn_offset_commit: bad partition");
4866 0 0 kf_buf_append_i32(&body, ppid_sv ? (int32_t)SvIV(*ppid_sv) : 0);
4869 0 0 kf_buf_append_i64(&body, off_sv ? (int64_t)SvIV(*off_sv) : 0);
4872 0 0 if (ver >= 2)
4896 259 16 while (remaining >= 4) {
4918 9 7 RETVAL = (int)(h & 0x7FFFFFFF);
4969 0 12 EXTEND(SP, 1);
5010 0 14 if (n < 0) RETVAL = &PL_sv_undef;
5024 16 0 if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV)
0 16 if (!SvROK(records_sv) || SvTYPE(SvRV(records_sv)) != SVt_PVAV)
5034 11 5 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
11 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
11 0 if (opts_sv && SvROK(opts_sv) && SvTYPE(SvRV(opts_sv)) == SVt_PVHV) {
5037 10 1 if ((v = hv_fetchs(opts, "compression", 0)) && SvOK(*v))
10 0 if ((v = hv_fetchs(opts, "compression", 0)) && SvOK(*v))
5039 1 10 if ((v = hv_fetchs(opts, "producer_id", 0)) && SvOK(*v))
1 0 if ((v = hv_fetchs(opts, "producer_id", 0)) && SvOK(*v))
5041 1 10 if ((v = hv_fetchs(opts, "producer_epoch", 0)) && SvOK(*v))
1 0 if ((v = hv_fetchs(opts, "producer_epoch", 0)) && SvOK(*v))
5043 1 10 if ((v = hv_fetchs(opts, "base_sequence", 0)) && SvOK(*v))
1 0 if ((v = hv_fetchs(opts, "base_sequence", 0)) && SvOK(*v))
5045 0 11 if ((v = hv_fetchs(opts, "is_transactional", 0)) && SvOK(*v))
0 0 if ((v = hv_fetchs(opts, "is_transactional", 0)) && SvOK(*v))
5047 0 11 if ((v = hv_fetchs(opts, "timestamp", 0)) && SvOK(*v))
0 0 if ((v = hv_fetchs(opts, "timestamp", 0)) && SvOK(*v))
5074 2 13 if (strcmp(api, "metadata") == 0) {
5076 1 12 } else if (strcmp(api, "produce") == 0) {
5078 0 12 } else if (strcmp(api, "fetch") == 0) {
5080 0 12 } else if (strcmp(api, "list_offsets") == 0) {
5082 1 11 } else if (strcmp(api, "find_coordinator") == 0) {
5084 1 10 } else if (strcmp(api, "join_group") == 0) {
5086 1 9 } else if (strcmp(api, "sync_group") == 0) {
5088 1 8 } else if (strcmp(api, "heartbeat") == 0) {
5090 0 8 } else if (strcmp(api, "offset_commit") == 0) {
5092 1 7 } else if (strcmp(api, "offset_fetch") == 0) {
5094 1 6 } else if (strcmp(api, "leave_group") == 0) {
5096 1 5 } else if (strcmp(api, "create_topics") == 0) {
5098 1 4 } else if (strcmp(api, "delete_topics") == 0) {
5100 1 3 } else if (strcmp(api, "init_producer_id") == 0) {
5102 1 2 } else if (strcmp(api, "add_partitions_to_txn") == 0) {
5104 1 1 } else if (strcmp(api, "end_txn") == 0) {
5106 1 0 } else if (strcmp(api, "txn_offset_commit") == 0) {
5111 15 0 RETVAL = result ? SvREFCNT_inc(result) : &PL_sv_undef;
5127 3 13 if (n < 0) {
5139 0 21 I_EV_API("EV::Kafka");
21 0 I_EV_API("EV::Kafka");
0 21 I_EV_API("EV::Kafka");