Branch Coverage

ClickHouse.xs
Criterion Covered Total %
branch 3 1330 0.2


line true false branch
333 0 0 if (!self->reading && self->fd >= 0) {
0 0 if (!self->reading && self->fd >= 0) {
340 0 0 if (self->reading) {
347 0 0 if (!self->writing && self->fd >= 0) {
0 0 if (!self->writing && self->fd >= 0) {
354 0 0 if (self->writing) {
361 0 0 if (self->timing) {
368 0 0 if (self->magic == EV_CH_FREED && self->callback_depth == 0) {
0 0 if (self->magic == EV_CH_FREED && self->callback_depth == 0) {
379 0 0 if (self->failover_hosts) {
380 0 0 for (int i = 0; i < self->failover_n; i++)
381 0 0 if (self->failover_hosts[i]) Safefree(self->failover_hosts[i]);
385 0 0 if (self->failover_ports) {
396 0 0 if (self->on_failover) {
411 0 0 if (!msg) return 0;
412 0 0 for (const char **k = KEYWORDS; *k; k++) {
417 0 0 for (const char *q = p; *q; q++) {
418 0 0 if (strncasecmp(q, *k, kl) == 0) { m = q; break; }
420 0 0 if (!m) break;
421 0 0 int boundary_l = (m == msg) || !isalnum((unsigned char)m[-1]);
0 0 int boundary_l = (m == msg) || !isalnum((unsigned char)m[-1]);
423 0 0 if (boundary_l && boundary_r) return 1;
0 0 if (boundary_l && boundary_r) return 1;
433 0 0 if (!self->failover_hosts || self->failover_n <= 0) return;
0 0 if (!self->failover_hosts || self->failover_n <= 0) return;
434 0 0 if (!failover_msg_match(msg)) return;
435 0 0 char *old_host = self->host ? safe_strdup(self->host) : NULL;
438 0 0 CLEAR_STR(self->host);
441 0 0 if (self->on_failover) {
444 0 0 PUSHMARK(SP);
445 0 0 XPUSHs(old_host ? sv_2mortal(newSVpv(old_host, 0)) : &PL_sv_undef);
0 0 XPUSHs(old_host ? sv_2mortal(newSVpv(old_host, 0)) : &PL_sv_undef);
446 0 0 mXPUSHu(old_port);
447 0 0 XPUSHs(sv_2mortal(newSVpv(self->host, 0)));
448 0 0 mXPUSHu(self->port);
449 0 0 XPUSHs(sv_2mortal(newSVpv(msg ? msg : "", 0)));
0 0 XPUSHs(sv_2mortal(newSVpv(msg ? msg : "", 0)));
452 0 0 WARN_AND_CLEAR_ERRSV("on_failover");
0 0 WARN_AND_CLEAR_ERRSV("on_failover");
0 0 WARN_AND_CLEAR_ERRSV("on_failover");
0 0 WARN_AND_CLEAR_ERRSV("on_failover");
453 0 0 FREETMPS; LEAVE;
455 0 0 if (old_host) Safefree(old_host);
465 0 0 if (!self->on_error) goto done;
470 0 0 PUSHMARK(SP);
471 0 0 XPUSHs(sv_2mortal(newSVpv(msg, 0)));
475 0 0 WARN_AND_CLEAR_ERRSV("error handler");
0 0 WARN_AND_CLEAR_ERRSV("error handler");
0 0 WARN_AND_CLEAR_ERRSV("error handler");
0 0 WARN_AND_CLEAR_ERRSV("error handler");
477 0 0 FREETMPS;
493 0 0 if (check_destroyed(self)) return 1;
494 0 0 if (self->connect_gen != gen) return 0;
495 0 0 if (cancel_pending(self, cancel_msg)) return 1;
496 0 0 if (self->connect_gen != gen) return 0;
502 0 0 if (teardown_io_error(self, msg, msg)) return 1;
505 0 0 if (self->connect_gen == gen && self->auto_reconnect && self->host)
0 0 if (self->connect_gen == gen && self->auto_reconnect && self->host)
0 0 if (self->connect_gen == gen && self->auto_reconnect && self->host)
519 0 0 PUSHMARK(SP);
522 0 0 if (SvTRUE(ERRSV)) {
0 0 if (SvTRUE(ERRSV)) {
523 0 0 warn("EV::ClickHouse: exception in %s handler: %s",
525 0 0 sv_setsv(ERRSV, &PL_sv_undef);
527 0 0 FREETMPS;
537 0 0 if (!self->on_trace) return;
550 0 0 PUSHMARK(SP);
551 0 0 XPUSHs(sv_2mortal(newSVpv(buf, 0)));
554 0 0 WARN_AND_CLEAR_ERRSV("trace handler");
0 0 WARN_AND_CLEAR_ERRSV("trace handler");
0 0 WARN_AND_CLEAR_ERRSV("trace handler");
0 0 WARN_AND_CLEAR_ERRSV("trace handler");
555 0 0 FREETMPS;
569 0 0 if (out_on_complete) *out_on_complete = NULL;
570 0 0 if (ngx_queue_empty(&self->cb_queue)) return NULL;
576 0 0 CLEAR_SV(cbt->on_data);
577 0 0 if (out_on_complete) {
581 0 0 CLEAR_SV(cbt->on_complete);
594 0 0 if (ngx_queue_empty(&self->cb_queue)) return NULL;
603 0 0 if (ngx_queue_empty(&self->cb_queue)) return 0;
611 0 0 WARN_AND_CLEAR_ERRSV("callback");
0 0 WARN_AND_CLEAR_ERRSV("callback");
0 0 WARN_AND_CLEAR_ERRSV("callback");
0 0 WARN_AND_CLEAR_ERRSV("callback");
620 0 0 PUSHMARK(SP);
625 0 0 FREETMPS;
637 0 0 SV *target = override ? override : self->on_query_complete;
638 0 0 if (!target) return;
640 0 0 ? ev_now(self->loop) - self->query_start_time : 0.0;
645 0 0 PUSHMARK(SP);
646 0 0 EXTEND(SP, 6);
647 0 0 PUSHs(self->last_query_id
653 0 0 PUSHs(errmsg ? sv_2mortal(newSVpv(errmsg, 0)) : &PL_sv_undef);
656 0 0 WARN_AND_CLEAR_ERRSV("on_query_complete");
0 0 WARN_AND_CLEAR_ERRSV("on_query_complete");
0 0 WARN_AND_CLEAR_ERRSV("on_query_complete");
0 0 WARN_AND_CLEAR_ERRSV("on_query_complete");
657 0 0 FREETMPS; LEAVE;
673 0 0 if (cb == NULL) {
675 0 0 if (oqc) SvREFCNT_dec(oqc);
680 0 0 if (!IS_KEEPALIVE_CB(cb)) fire_on_query_complete_ex(self, errmsg, oqc);
0 0 if (!IS_KEEPALIVE_CB(cb)) fire_on_query_complete_ex(self, errmsg, oqc);
682 0 0 if (oqc) SvREFCNT_dec(oqc);
691 0 0 if (cb == NULL) {
692 0 0 if (rows) SvREFCNT_dec((SV*)rows);
694 0 0 if (oqc) SvREFCNT_dec(oqc);
699 0 0 if (!IS_KEEPALIVE_CB(cb)) fire_on_query_complete_ex(self, NULL, oqc);
0 0 if (!IS_KEEPALIVE_CB(cb)) fire_on_query_complete_ex(self, NULL, oqc);
704 0 0 PUSHMARK(SP);
705 0 0 PUSHs(rows ? sv_2mortal(newRV_noinc((SV*)rows)) : &PL_sv_undef);
708 0 0 FREETMPS;
711 0 0 if (oqc) SvREFCNT_dec(oqc);
720 0 0 if (cb == NULL) {
722 0 0 if (oqc) SvREFCNT_dec(oqc);
727 0 0 if (!IS_KEEPALIVE_CB(cb)) fire_on_query_complete_ex(self, NULL, oqc);
0 0 if (!IS_KEEPALIVE_CB(cb)) fire_on_query_complete_ex(self, NULL, oqc);
732 0 0 PUSHMARK(SP);
736 0 0 FREETMPS;
739 0 0 if (oqc) SvREFCNT_dec(oqc);
755 0 0 if (deliver_error(self, deliver_msg)) return 1;
756 0 0 if (self->connect_gen != gen) return 0;
757 0 0 if (cancel_pending(self, cancel_msg)) return 1;
758 0 0 if (self->connect_gen != gen) return 0;
771 0 0 cbt->on_data = on_data ? SvREFCNT_inc(on_data) : NULL;
778 0 0 if (has_arg) {
779 0 0 CLEAR_SV(*slot);
780 0 0 if (handler && SvROK(handler) && SvTYPE(SvRV(handler)) == SVt_PVCV) {
0 0 if (handler && SvROK(handler) && SvTYPE(SvRV(handler)) == SVt_PVCV) {
0 0 if (handler && SvROK(handler) && SvTYPE(SvRV(handler)) == SVt_PVCV) {
784 0 0 return *slot ? SvREFCNT_inc(*slot) : &PL_sv_undef;
790 0 0 if (!s) return NULL;
802 0 0 if (!cmt) return 0;
812 0 0 if (!s) return 0;
813 0 0 for (; *s; s++)
814 0 0 if (*s == '\r' || *s == '\n') return 1;
0 0 if (*s == '\r' || *s == '\n') return 1;
824 0 0 if (n >= self->recv_len) { self->recv_len = 0; return; }
852 0 0 if (was_connected) emit_trace(self, "disconnect");
859 0 0 if (self->ssl) {
864 0 0 if (self->ssl_ctx) {
870 0 0 if (self->fd >= 0) {
885 0 0 CLEAR_SV(self->native_rows);
886 0 0 CLEAR_SV(self->native_col_names);
887 0 0 CLEAR_SV(self->native_col_types);
888 0 0 CLEAR_SV(self->native_totals);
889 0 0 CLEAR_SV(self->native_extremes);
891 0 0 CLEAR_INSERT(self);
0 0 CLEAR_INSERT(self);
892 0 0 CLEAR_STR(self->insert_err);
898 0 0 if (was_connected && self->on_disconnect)
0 0 if (was_connected && self->on_disconnect)
917 0 0 if (!IS_KEEPALIVE_CB(cb))
0 0 if (!IS_KEEPALIVE_CB(cb))
919 0 0 if (oqc) SvREFCNT_dec(oqc);
929 0 0 while (!ngx_queue_empty(&self->cb_queue)) {
932 0 0 if (cb == NULL) break;
934 0 0 if (self->magic != EV_CH_MAGIC) break;
943 0 0 while (!ngx_queue_empty(&self->send_queue)) {
951 0 0 CLEAR_INSERT(send);
0 0 CLEAR_INSERT(send);
952 0 0 CLEAR_SV(send->on_data);
957 0 0 if (self->magic != EV_CH_MAGIC) break;
969 0 0 if (self->ssl) {
970 0 0 int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;
972 0 0 if (ret <= 0) {
974 0 0 if (err == SSL_ERROR_WANT_READ) {
978 0 0 if (err == SSL_ERROR_WANT_WRITE) {
983 0 0 if (err == SSL_ERROR_ZERO_RETURN) return 0;
995 0 0 if (self->ssl) {
996 0 0 int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;
998 0 0 if (ret <= 0) {
1000 0 0 if (err == SSL_ERROR_WANT_WRITE) {
1004 0 0 if (err == SSL_ERROR_WANT_READ) {
1021 0 0 if (self->recv_cap >= need) return;
1022 0 0 if (need > SIZE_MAX / 2) croak("recv buffer overflow");
1024 0 0 if (newcap < need) newcap = need;
1030 0 0 if (self->send_cap >= need) return;
1031 0 0 if (need > SIZE_MAX / 2) croak("send buffer overflow");
1033 0 0 if (newcap < need) newcap = need;
1055 0 0 if (need > SIZE_MAX - b->len) croak("native buffer overflow");
1056 0 0 if (b->len + need > b->cap) {
1057 0 0 while (b->len + need > b->cap) {
1058 0 0 if (b->cap > SIZE_MAX / 2) croak("native buffer overflow");
1073 0 0 while (n >= 0x80) {
1086 0 0 nbuf_string(b, s, s ? strlen(s) : 0);
1112 0 0 for (i = 0; i < len; i++) if (s[i] == '\'' || s[i] == '\\') esc++;
0 0 for (i = 0; i < len; i++) if (s[i] == '\'' || s[i] == '\\') esc++;
0 0 for (i = 0; i < len; i++) if (s[i] == '\'' || s[i] == '\\') esc++;
1116 0 0 if (esc == 0) {
1120 0 0 for (i = 0; i < len; i++) {
1121 0 0 if (s[i] == '\'' || s[i] == '\\') b->data[b->len++] = '\\';
0 0 if (s[i] == '\'' || s[i] == '\\') b->data[b->len++] = '\\';
1135 0 0 while (p < len) {
1138 0 0 if (!(byte & 0x80)) {
1144 0 0 if (shift >= 64) return -1;
1154 0 0 if (rc <= 0) { *pos = saved; return rc; }
1155 0 0 if (slen > len - *pos) { *pos = saved; return 0; }
1159 0 0 if (out_len) *out_len = (size_t)slen;
1170 0 0 if (rc <= 0) { *pos = saved; return rc; }
1171 0 0 if (slen > len - *pos) { *pos = saved; return 0; }
1179 0 0 if (*pos + 1 > len) return 0;
1186 0 0 if (*pos + 4 > len) return 0;
1197 0 0 if (rc <= 0) { *pos = saved; return rc; }
1198 0 0 if (slen > len - *pos) { *pos = saved; return 0; }
1209 0 0 for (i = 0; i < src_len; i++) {
1211 0 0 if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') ||
0 0 if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') ||
0 0 if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') ||
0 0 if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') ||
0 0 if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') ||
1212 0 0 (c >= '0' && c <= '9') || c == '-' || c == '_' || c == '.' || c == '~') {
0 0 (c >= '0' && c <= '9') || c == '-' || c == '_' || c == '.' || c == '~') {
0 0 (c >= '0' && c <= '9') || c == '-' || c == '_' || c == '.' || c == '~') {
0 0 (c >= '0' && c <= '9') || c == '-' || c == '_' || c == '.' || c == '~') {
0 0 (c >= '0' && c <= '9') || c == '-' || c == '_' || c == '.' || c == '~') {
1227 0 0 return (klen == 3 && memcmp(key, "raw", 3) == 0)
1228 0 0 || (klen == 8 && memcmp(key, "query_id", 8) == 0)
0 0 || (klen == 8 && memcmp(key, "query_id", 8) == 0)
1229 0 0 || (klen == 7 && memcmp(key, "on_data", 7) == 0)
0 0 || (klen == 7 && memcmp(key, "on_data", 7) == 0)
1230 0 0 || (klen == 13 && memcmp(key, "query_timeout", 13) == 0)
0 0 || (klen == 13 && memcmp(key, "query_timeout", 13) == 0)
1231 0 0 || (klen == 6 && memcmp(key, "params", 6) == 0)
0 0 || (klen == 6 && memcmp(key, "params", 6) == 0)
1232 0 0 || (klen == 8 && memcmp(key, "external", 8) == 0)
0 0 || (klen == 8 && memcmp(key, "external", 8) == 0)
1233 0 0 || (klen == 17 && memcmp(key, "on_query_complete", 17) == 0);
0 0 || (klen == 17 && memcmp(key, "on_query_complete", 17) == 0);
0 0 || (klen == 17 && memcmp(key, "on_query_complete", 17) == 0);
1240 0 0 if (overrides) {
1242 0 0 while ((entry = hv_iternext(overrides))) {
1246 0 0 if (is_client_only_key(key, klen)) continue;
1251 0 0 if (defaults) {
1253 0 0 while ((entry = hv_iternext(defaults))) {
1257 0 0 if (overrides && hv_exists(overrides, key, klen))
0 0 if (overrides && hv_exists(overrides, key, klen))
1259 0 0 if (is_client_only_key(key, klen)) continue;
1279 0 0 if (overrides) {
1281 0 0 while ((entry = hv_iternext(overrides))) {
1286 0 0 if (klen == 8 && memcmp(key, "query_id", 8) == 0) {
0 0 if (klen == 8 && memcmp(key, "query_id", 8) == 0) {
1291 0 0 if (is_client_only_key(key, klen)) continue;
1299 0 0 if (defaults) {
1301 0 0 while ((entry = hv_iternext(defaults))) {
1306 0 0 if (overrides && hv_exists(overrides, key, klen))
0 0 if (overrides && hv_exists(overrides, key, klen))
1308 0 0 if (klen == 8 && memcmp(key, "query_id", 8) == 0) {
0 0 if (klen == 8 && memcmp(key, "query_id", 8) == 0) {
1309 0 0 if (!*query_id_out) {
1315 0 0 if (is_client_only_key(key, klen)) continue;
1330 0 0 if (overrides) {
1332 0 0 while ((entry = hv_iternext(overrides))) {
1337 0 0 if (klen <= 6 || memcmp(key, "param_", 6) != 0) continue;
0 0 if (klen <= 6 || memcmp(key, "param_", 6) != 0) continue;
1344 0 0 if (defaults) {
1346 0 0 while ((entry = hv_iternext(defaults))) {
1351 0 0 if (klen <= 6 || memcmp(key, "param_", 6) != 0) continue;
0 0 if (klen <= 6 || memcmp(key, "param_", 6) != 0) continue;
1352 0 0 if (overrides && hv_exists(overrides, key, klen)) continue;
0 0 if (overrides && hv_exists(overrides, key, klen)) continue;
1368 0 0 if (overrides) {
1370 0 0 while ((entry = hv_iternext(overrides))) {
1375 0 0 if (is_client_only_key(key, klen)) continue;
1377 0 0 if (klen > 6 && memcmp(key, "param_", 6) == 0) continue;
0 0 if (klen > 6 && memcmp(key, "param_", 6) == 0) continue;
1385 0 0 if (defaults) {
1387 0 0 while ((entry = hv_iternext(defaults))) {
1392 0 0 if (overrides && hv_exists(overrides, key, klen))
0 0 if (overrides && hv_exists(overrides, key, klen))
1394 0 0 if (is_client_only_key(key, klen)) continue;
1395 0 0 if (klen > 6 && memcmp(key, "param_", 6) == 0) continue;
0 0 if (klen > 6 && memcmp(key, "param_", 6) == 0) continue;
1423 0 48 I_EV_API("EV::ClickHouse");
48 0 I_EV_API("EV::ClickHouse");
0 48 I_EV_API("EV::ClickHouse");
1461 0 0 if (self->magic != EV_CH_MAGIC) return;
1467 0 0 if (self->reconnect_timing) {
1472 0 0 if (PL_dirty) {
1474 0 0 while (!ngx_queue_empty(&self->send_queue)) {
1479 0 0 CLEAR_INSERT(send);
0 0 CLEAR_INSERT(send);
1480 0 0 CLEAR_SV(send->on_data);
1481 0 0 CLEAR_SV(send->on_complete);
1485 0 0 while (!ngx_queue_empty(&self->cb_queue)) {
1489 0 0 CLEAR_SV(cbt->on_data);
1490 0 0 CLEAR_SV(cbt->on_complete);
1495 0 0 #ifdef HAVE_OPENSSL
1496 0 0 if (self->ssl) { SSL_free(self->ssl); self->ssl = NULL; }
1497 0 0 if (self->ssl_ctx) { SSL_CTX_free(self->ssl_ctx); self->ssl_ctx = NULL; }
1498 0 0 #endif
0 0 #endif
0 0 #endif
0 0 #endif
0 0 #endif
0 0 #endif
0 0 #endif
0 0 #endif
0 0 #endif
1499 0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
0 0 if (self->fd >= 0) close(self->fd);
1501 0 0 CLEAR_PERSISTENT_STATE(self);
0 0 CLEAR_PERSISTENT_STATE(self);
1507 0 0 }
1518 0 0 stop_timing(self);
1523 0 0 }
1528 0 0 SSL_free(self->ssl);
1533 0 0 self->ssl_ctx = NULL;
1541 0 0
0 0
0 0
0 0
0 0
0 0
0 0
0 0
0 0
1542 0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
0 0 self->loop = NULL;
1545 0 0 CLEAR_CONNECTION_HANDLERS(self);
0 0 CLEAR_CONNECTION_HANDLERS(self);
1548 0 0 lc_free_dicts(self);
1562 0 0 CLEAR_STR(self->tls_ca_file);
1570 0 0 CLEAR_STR(self->tls_cert_file);
1578 0 0 CLEAR_STR(self->tls_key_file);
1586 0 0 if (self->connected || self->connecting) croak("already connected");
0 0 if (self->connected || self->connecting) croak("already connected");
1587 0 0 if (has_http_unsafe_chars(host) || has_http_unsafe_chars(user) ||
1588 0 0 has_http_unsafe_chars(password) || has_http_unsafe_chars(database))
1591 0 0 CLEAR_STR(self->host);
1592 0 0 CLEAR_STR(self->user);
1593 0 0 CLEAR_STR(self->password);
1594 0 0 CLEAR_STR(self->database);
1609 0 0 if (!self->host) croak("no previous connection to reset");
1610 0 0 if (cancel_pending(self, "connection reset")) return;
1611 0 0 if (cleanup_connection(self)) return; /* on_disconnect freed self */
1619 0 0 if (cancel_pending(self, "connection finished")) return;
1636 0 0 if (items == 3) {
1638 0 0 } else if (items == 4) {
1639 0 0 if (!(SvROK(ST(2)) && SvTYPE(SvRV(ST(2))) == SVt_PVHV))
0 0 if (!(SvROK(ST(2)) && SvTYPE(SvRV(ST(2))) == SVt_PVHV))
1647 0 0 if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected");
0 0 if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected");
0 0 if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected");
1648 0 0 if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV))
0 0 if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV))
1651 0 0 if (self->protocol == PROTO_NATIVE && (self->insert_data || self->insert_av))
0 0 if (self->protocol == PROTO_NATIVE && (self->insert_data || self->insert_av))
0 0 if (self->protocol == PROTO_NATIVE && (self->insert_data || self->insert_av))
1659 0 0 if (settings) {
1661 0 0 if (settings_copy) settings = settings_copy;
1663 0 0 if (svp) raw = SvTRUE(*svp) ? 1 : 0;
1665 0 0 if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV)
0 0 if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV)
0 0 if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV)
1668 0 0 if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV)
0 0 if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV)
0 0 if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV)
1671 0 0 if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVHV
0 0 if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVHV
0 0 if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVHV
1672 0 0 && HvKEYS((HV *)SvRV(*svp)) > 0)
0 0 && HvKEYS((HV *)SvRV(*svp)) > 0)
1678 0 0 if (self->max_query_size > 0 && sql_len > self->max_query_size) {
0 0 if (self->max_query_size > 0 && sql_len > self->max_query_size) {
1679 0 0 if (settings_copy) SvREFCNT_dec((SV*)settings_copy);
1684 0 0 if (raw && self->protocol == PROTO_NATIVE) {
0 0 if (raw && self->protocol == PROTO_NATIVE) {
1685 0 0 if (settings_copy) SvREFCNT_dec((SV*)settings_copy);
1689 0 0 if (on_data_sv && self->protocol == PROTO_HTTP) {
0 0 if (on_data_sv && self->protocol == PROTO_HTTP) {
1690 0 0 if (settings_copy) SvREFCNT_dec((SV*)settings_copy);
1694 0 0 if (external && self->protocol == PROTO_HTTP) {
0 0 if (external && self->protocol == PROTO_HTTP) {
1695 0 0 if (settings_copy) SvREFCNT_dec((SV*)settings_copy);
1703 0 0 if (self->query_log_comment) {
1713 0 0 if (self->protocol == PROTO_HTTP) {
1720 0 0 if (external) {
1725 0 0 if (!ext_data) {
1726 0 0 if (settings_copy) SvREFCNT_dec((SV*)settings_copy);
1727 0 0 if (qlc_sql) Safefree(qlc_sql);
1734 0 0 if (ext_data) Safefree(ext_data);
1736 0 0 if (qlc_sql) Safefree(qlc_sql);
1742 0 0 if (on_data_sv) s->on_data = SvREFCNT_inc(on_data_sv);
1743 0 0 if (on_complete_sv) s->on_complete = SvREFCNT_inc(on_complete_sv);
1744 0 0 if (settings) send_apply_settings(s, settings);
1746 0 0 if (settings_copy) SvREFCNT_dec((SV*)settings_copy);
1764 0 0 if (items == 4) {
1766 0 0 } else if (items == 5) {
1767 0 0 if (!(SvROK(ST(3)) && SvTYPE(SvRV(ST(3))) == SVt_PVHV))
0 0 if (!(SvROK(ST(3)) && SvTYPE(SvRV(ST(3))) == SVt_PVHV))
1775 0 0 if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected");
0 0 if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected");
0 0 if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected");
1776 0 0 if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV))
0 0 if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV))
1782 0 0 if (SvROK(data_sv) && SvTYPE(SvRV(data_sv)) == SVt_PVAV) {
0 0 if (SvROK(data_sv) && SvTYPE(SvRV(data_sv)) == SVt_PVAV) {
1788 0 0 if (self->protocol == PROTO_NATIVE && (self->insert_data || self->insert_av))
0 0 if (self->protocol == PROTO_NATIVE && (self->insert_data || self->insert_av))
0 0 if (self->protocol == PROTO_NATIVE && (self->insert_data || self->insert_av))
1796 0 0 if (self->protocol == PROTO_HTTP) {
1797 0 0 if (data_is_av) {
1808 0 0 if (settings) {
1814 0 0 if (idem && SvTRUE(*idem)) {
0 0 if (idem && SvTRUE(*idem)) {
1815 0 0 if (!settings_copy) {
1823 0 0 if (SvPOK(*idem) || SvIOK(*idem) || SvNOK(*idem)) {
0 0 if (SvPOK(*idem) || SvIOK(*idem) || SvNOK(*idem)) {
0 0 if (SvPOK(*idem) || SvIOK(*idem) || SvNOK(*idem)) {
1827 0 0 if (!(tlen == 1 && tstr[0] == '1')) generate = 0;
0 0 if (!(tlen == 1 && tstr[0] == '1')) generate = 0;
1829 0 0 if (generate) {
1844 0 0 if (async && SvTRUE(*async)) {
0 0 if (async && SvTRUE(*async)) {
1845 0 0 if (!settings_copy) settings_copy = newHVhv(settings);
1847 0 0 if (!hv_exists(settings_copy, "wait_for_async_insert", 21))
1851 0 0 if (settings_copy) settings = settings_copy;
1860 0 0 : 0;
1869 0 0 if (self->protocol == PROTO_HTTP) {
1874 0 0 if (tsv_buf) Safefree(tsv_buf);
1888 0 0 if (settings) send_apply_settings(s, settings);
1892 0 0 if (settings) {
1894 0 0 if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV)
0 0 if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV)
0 0 if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV)
1897 0 0 if (settings_copy) SvREFCNT_dec((SV*)settings_copy);
1902 0 0 if (self->protocol == PROTO_NATIVE) {
1903 0 0 if (data_is_av) {
1908 0 0 Newx(s->insert_data, data_len > 0 ? data_len : 1, char);
1909 0 0 if (data_len > 0)
1927 0 0 if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected");
0 0 if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected");
0 0 if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected");
1928 0 0 if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV))
0 0 if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV))
1931 0 0 if (self->protocol == PROTO_HTTP) {
1975 0 0 RETVAL = self->connected ? 1 : 0;
1984 0 0 RETVAL = self->pending_count;
2001 0 0 if (!ngx_queue_empty(&self->cb_queue)) {
2004 0 0 (void)hv_stores(h, "query_id",
2009 0 0 ? ev_now(self->loop) - self->query_start_time : 0.0;
2015 0 0 q != ngx_queue_sentinel(&self->send_queue);
2020 0 0 (void)hv_stores(h, "query_id",
2050 0 0 (void)hv_stores(h, "protocol", newSVpv(self->protocol == PROTO_NATIVE ? "native" : "http", 0));
2053 0 0 (void)hv_stores(h, "host",
2068 0 0 RETVAL = self->host ? newSVpv(self->host, 0) : &PL_sv_undef;
2077 0 0 RETVAL = self->port;
2086 0 0 if (self->server_name) {
2094 0 0 if (n >= (int)sizeof(buf)) n = (int)sizeof(buf) - 1;
2107 0 0 if (self->server_revision) {
2113 0 0 if (n >= (int)sizeof(buf)) n = (int)sizeof(buf) - 1;
2126 0 0 RETVAL = (UV)self->server_revision;
2139 0 0 if (cancel_pending(self, "skipped")) return;
2140 0 0 CLEAR_INSERT(self);
0 0 CLEAR_INSERT(self);
2141 0 0 CLEAR_STR(self->insert_err);
2142 0 0 if (had_inflight)
2164 0 0 CLEAR_STR(self->session_id);
2172 0 0 CLEAR_STR(self->query_log_comment);
2180 0 0 CLEAR_STR(self->host);
2204 0 0 self->dns_pending = 0;
2229 0 0 if (!(SvROK(href) && SvTYPE(SvRV(href)) == SVt_PVHV))
0 0 if (!(SvROK(href) && SvTYPE(SvRV(href)) == SVt_PVHV))
2231 0 0 CLEAR_SV(self->default_settings);
2275 0 0 RETVAL = self->last_tls_error ? newSVpv(self->last_tls_error, 0) : &PL_sv_undef;
2294 0 0 if (!SvOK(hosts_av_ref)) return;
2295 0 0 if (!(SvROK(hosts_av_ref) && SvTYPE(SvRV(hosts_av_ref)) == SVt_PVAV))
0 0 if (!(SvROK(hosts_av_ref) && SvTYPE(SvRV(hosts_av_ref)) == SVt_PVAV))
2298 0 0 SSize_t n = av_top_index(hosts) + 1;
2299 0 0 if (n <= 0) return;
2300 0 0 Newx(self->failover_hosts, n, char *);
2301 0 0 Newx(self->failover_ports, n, unsigned int);
2307 0 0 for (SSize_t i = 0; i < n; i++) {
2309 0 0 if (!e || !SvOK(*e)) {
0 0 if (!e || !SvOK(*e)) {
2319 0 0 if (sl > 0 && s[0] == '[') {
0 0 if (sl > 0 && s[0] == '[') {
2321 0 0 if (close) {
2325 0 0 if (close + 1 < s + sl && close[1] == ':')
0 0 if (close + 1 < s + sl && close[1] == ':')
2329 0 0 if (!host && (colon = (const char *)memchr(s, ':', sl))) {
0 0 if (!host && (colon = (const char *)memchr(s, ':', sl))) {
2336 0 0 if (!host) host = safe_strdup(s);
2347 0 0 : &PL_sv_undef;
2406 0 0 : &PL_sv_undef;
2416 0 0 : &PL_sv_undef;
2435 0 0 : &PL_sv_undef;
2445 0 0 : &PL_sv_undef;
2455 0 0 : &PL_sv_undef;
2521 0 0 self->reconnect_jitter = val < 0 ? 0 : val;
2542 0 0 if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV))
0 0 if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV))
2544 0 0 CLEAR_SV(self->on_drain);
2545 0 0 if (self->pending_count == 0 && ngx_queue_empty(&self->send_queue)) {
0 0 if (self->pending_count == 0 && ngx_queue_empty(&self->send_queue)) {
2557 0 0 if (self->protocol == PROTO_NATIVE && self->send_count > 0) {
0 0 if (self->protocol == PROTO_NATIVE && self->send_count > 0) {
2568 0 0 } else if (self->protocol == PROTO_HTTP && self->send_count > 0) {
0 0 } else if (self->protocol == PROTO_HTTP && self->send_count > 0) {
2570 0 0 CLEAR_SV(self->native_rows);
2572 0 0 if (cancel_pending(self, "query cancelled")) return;
2573 0 0 if (self->connect_gen != gen) return;
2574 0 0 if (cleanup_connection(self)) return; /* on_disconnect freed self */
2575 0 0 if (self->auto_reconnect && self->host)
0 0 if (self->auto_reconnect && self->host)
2589 0 0 if (!(SvROK(self_sv) && SvTYPE(SvRV(self_sv)) == SVt_PVHV))
0 0 if (!(SvROK(self_sv) && SvTYPE(SvRV(self_sv)) == SVt_PVHV))
2594 0 0 if (!buf_p || !SvROK(*buf_p) || SvTYPE(SvRV(*buf_p)) != SVt_PVAV)
0 0 if (!buf_p || !SvROK(*buf_p) || SvTYPE(SvRV(*buf_p)) != SVt_PVAV)
0 0 if (!buf_p || !SvROK(*buf_p) || SvTYPE(SvRV(*buf_p)) != SVt_PVAV)
2599 0 0 if (SvROK(row) && SvTYPE(SvRV(row)) == SVt_PVHV) {
0 0 if (SvROK(row) && SvTYPE(SvRV(row)) == SVt_PVHV) {
2601 0 0 if (!cols_p || !SvROK(*cols_p) || SvTYPE(SvRV(*cols_p)) != SVt_PVAV)
0 0 if (!cols_p || !SvROK(*cols_p) || SvTYPE(SvRV(*cols_p)) != SVt_PVAV)
0 0 if (!cols_p || !SvROK(*cols_p) || SvTYPE(SvRV(*cols_p)) != SVt_PVAV)
2605 0 0 SSize_t n = av_top_index(cols) + 1;
2608 0 0 for (SSize_t i = 0; i < n; i++) {
2610 0 0 if (!col) { av_push(out, newSV(0)); continue; }
2614 0 0 av_push(out, vp ? newSVsv(*vp) : newSV(0));
2621 0 0 SSize_t buf_n = av_top_index(buffer) + 1;
2624 0 0 SSize_t batch = bs_p ? SvIV(*bs_p) : 10000;
2629 0 0 SSize_t hw = hw_p ? SvIV(*hw_p) : 0;
2630 0 0 if (hw && buf_n >= hw) {
0 0 if (hw && buf_n >= hw) {
2632 0 0 if (!act_p || !SvTRUE(*act_p)) {
0 0 if (!act_p || !SvTRUE(*act_p)) {
2641 0 0 if (need_flush) {
2644 0 0 PUSHMARK(SP);
2645 0 0 XPUSHs(self_sv);
2648 0 0 WARN_AND_CLEAR_ERRSV("Streamer _flush");
0 0 WARN_AND_CLEAR_ERRSV("Streamer _flush");
0 0 WARN_AND_CLEAR_ERRSV("Streamer _flush");
0 0 WARN_AND_CLEAR_ERRSV("Streamer _flush");
2649 0 0 FREETMPS; LEAVE;
2652 0 0 if (need_hw) {
2654 0 0 if (cb_p && SvROK(*cb_p) && SvTYPE(SvRV(*cb_p)) == SVt_PVCV) {
0 0 if (cb_p && SvROK(*cb_p) && SvTYPE(SvRV(*cb_p)) == SVt_PVCV) {
0 0 if (cb_p && SvROK(*cb_p) && SvTYPE(SvRV(*cb_p)) == SVt_PVCV) {
2656 0 0 IV inflight = inflight_p ? SvIV(*inflight_p) : 0;
2659 0 0 PUSHMARK(SP);
2660 0 0 mXPUSHi(buf_n);
2661 0 0 mXPUSHi(inflight);
2664 0 0 WARN_AND_CLEAR_ERRSV("on_high_water");
0 0 WARN_AND_CLEAR_ERRSV("on_high_water");
0 0 WARN_AND_CLEAR_ERRSV("on_high_water");
0 0 WARN_AND_CLEAR_ERRSV("on_high_water");
2665 0 0 FREETMPS; LEAVE;
2680 0 0 if (!(SvROK(pool_sv) && SvTYPE(SvRV(pool_sv)) == SVt_PVHV))
0 0 if (!(SvROK(pool_sv) && SvTYPE(SvRV(pool_sv)) == SVt_PVHV))
2685 0 0 if (!conns_p || !SvROK(*conns_p) || SvTYPE(SvRV(*conns_p)) != SVt_PVAV)
0 0 if (!conns_p || !SvROK(*conns_p) || SvTYPE(SvRV(*conns_p)) != SVt_PVAV)
0 0 if (!conns_p || !SvROK(*conns_p) || SvTYPE(SvRV(*conns_p)) != SVt_PVAV)
2688 0 0 SSize_t n = av_top_index(conns) + 1;
2689 0 0 if (n <= 0) croak("_pool_pick: empty pool");
2692 0 0 IV thresh = thresh_p ? SvIV(*thresh_p) : 0;
2695 0 0 if (thresh > 0) {
2697 0 0 if (cs_p && SvROK(*cs_p) && SvTYPE(SvRV(*cs_p)) == SVt_PVAV)
0 0 if (cs_p && SvROK(*cs_p) && SvTYPE(SvRV(*cs_p)) == SVt_PVAV)
0 0 if (cs_p && SvROK(*cs_p) && SvTYPE(SvRV(*cs_p)) == SVt_PVAV)
2707 0 0 if (p_p && SvROK(*p_p) && SvTYPE(SvRV(*p_p)) == SVt_PVHV)
0 0 if (p_p && SvROK(*p_p) && SvTYPE(SvRV(*p_p)) == SVt_PVHV)
0 0 if (p_p && SvROK(*p_p) && SvTYPE(SvRV(*p_p)) == SVt_PVHV)
2714 0 0 if (n > ties_cap) {
2715 0 0 Newx(ties, n, int);
2723 0 0 for (int pass = 0; pass < 3; pass++) {
2724 0 0 for (SSize_t i = 0; i < n; i++) {
2725 0 0 if (pass < 2 && cb_state) {
0 0 if (pass < 2 && cb_state) {
2727 0 0 if (slot_sv && SvROK(*slot_sv) && SvTYPE(SvRV(*slot_sv)) == SVt_PVHV) {
0 0 if (slot_sv && SvROK(*slot_sv) && SvTYPE(SvRV(*slot_sv)) == SVt_PVHV) {
0 0 if (slot_sv && SvROK(*slot_sv) && SvTYPE(SvRV(*slot_sv)) == SVt_PVHV) {
2730 0 0 if (du_p && SvNV(*du_p) > now) continue;
0 0 if (du_p && SvNV(*du_p) > now) continue;
2734 0 0 if (!csv || !SvROK(*csv) || !sv_isa(*csv, "EV::ClickHouse")) continue;
0 0 if (!csv || !SvROK(*csv) || !sv_isa(*csv, "EV::ClickHouse")) continue;
0 0 if (!csv || !SvROK(*csv) || !sv_isa(*csv, "EV::ClickHouse")) continue;
2736 0 0 if (ch->magic != EV_CH_MAGIC) continue; /* freed mid-callback */
2737 0 0 if (pass == 0 && pinned) {
0 0 if (pass == 0 && pinned) {
2744 0 0 if (hv_exists(pinned, key, klen)) continue;
2747 0 0 if (best < 0 || pc < best_n) {
0 0 if (best < 0 || pc < best_n) {
2752 0 0 } else if (pc == best_n) {
2753 0 0 if (n_ties < ties_cap) ties[n_ties++] = (int)i;
2756 0 0 if (best >= 0) break;
2762 0 0 if (best < 0) {
2763 0 0 if (ties != stack_ties) Safefree(ties);
2768 0 0 if (n_ties == 1) {
2772 0 0 IV idx = idx_p ? SvIV(*idx_p) : 0;
2780 0 0 if (ties != stack_ties) Safefree(ties);
2792 0 0 if (!(SvROK(self_sv) && SvTYPE(SvRV(self_sv)) == SVt_PVHV))
0 0 if (!(SvROK(self_sv) && SvTYPE(SvRV(self_sv)) == SVt_PVHV))
2796 0 0 if (!ch_p || !SvROK(*ch_p) || !sv_isa(*ch_p, "EV::ClickHouse"))
0 0 if (!ch_p || !SvROK(*ch_p) || !sv_isa(*ch_p, "EV::ClickHouse"))
0 0 if (!ch_p || !SvROK(*ch_p) || !sv_isa(*ch_p, "EV::ClickHouse"))
2799 0 0 struct ev_loop *loop = ch->loop ? ch->loop : EV_DEFAULT;
2802 0 0 if (!batches_p || !SvROK(*batches_p) || SvTYPE(SvRV(*batches_p)) != SVt_PVAV)
0 0 if (!batches_p || !SvROK(*batches_p) || SvTYPE(SvRV(*batches_p)) != SVt_PVAV)
0 0 if (!batches_p || !SvROK(*batches_p) || SvTYPE(SvRV(*batches_p)) != SVt_PVAV)
2807 0 0 if (SvOK(timeout_sv)) {
2809 0 0 if (t > 0) timeout = t;
2811 0 0 double expires = timeout > 0 ? ev_now(loop) + timeout : 0;
2816 0 0 while (av_top_index(batches) < 0) {
0 0 while (av_top_index(batches) < 0) {
2818 0 0 if (done_p && SvTRUE(*done_p)) break;
0 0 if (done_p && SvTRUE(*done_p)) break;
2819 0 0 if (expires) {
2821 0 0 if (left <= 0) {
2822 0 0 if (armed) { ev_timer_stop(loop, &to); armed = 0; }
2831 0 0 if (armed) { ev_timer_stop(loop, &to); armed = 0; }
2832 0 0 if (av_top_index(batches) < 0) {
0 0 if (av_top_index(batches) < 0) {
2834 0 0 if (!dp2 || !SvTRUE(*dp2)) {
0 0 if (!dp2 || !SvTRUE(*dp2)) {
2842 0 0 if (av_top_index(batches) >= 0) {
0 0 if (av_top_index(batches) >= 0) {
2844 0 0 RETVAL = b ? b : &PL_sv_undef;
2858 0 0 if (!(SvROK(slot_sv) && SvTYPE(SvRV(slot_sv)) == SVt_PVHV)) return;
0 0 if (!(SvROK(slot_sv) && SvTYPE(SvRV(slot_sv)) == SVt_PVHV)) return;
2860 0 0 if (SvTRUE(err_sv)) {
2862 0 0 IV fails = (fails_p ? SvIV(*fails_p) : 0) + 1;
2864 0 0 if (threshold > 0 && fails >= threshold) {
0 0 if (threshold > 0 && fails >= threshold) {