| line |
true |
false |
branch |
|
333
|
0 |
0 |
if (!self->reading && self->fd >= 0) { |
|
|
0 |
0 |
if (!self->reading && self->fd >= 0) { |
|
340
|
0 |
0 |
if (self->reading) { |
|
347
|
0 |
0 |
if (!self->writing && self->fd >= 0) { |
|
|
0 |
0 |
if (!self->writing && self->fd >= 0) { |
|
354
|
0 |
0 |
if (self->writing) { |
|
361
|
0 |
0 |
if (self->timing) { |
|
368
|
0 |
0 |
if (self->magic == EV_CH_FREED && self->callback_depth == 0) { |
|
|
0 |
0 |
if (self->magic == EV_CH_FREED && self->callback_depth == 0) { |
|
379
|
0 |
0 |
if (self->failover_hosts) { |
|
380
|
0 |
0 |
for (int i = 0; i < self->failover_n; i++) |
|
381
|
0 |
0 |
if (self->failover_hosts[i]) Safefree(self->failover_hosts[i]); |
|
385
|
0 |
0 |
if (self->failover_ports) { |
|
396
|
0 |
0 |
if (self->on_failover) { |
|
411
|
0 |
0 |
if (!msg) return 0; |
|
412
|
0 |
0 |
for (const char **k = KEYWORDS; *k; k++) { |
|
417
|
0 |
0 |
for (const char *q = p; *q; q++) { |
|
418
|
0 |
0 |
if (strncasecmp(q, *k, kl) == 0) { m = q; break; } |
|
420
|
0 |
0 |
if (!m) break; |
|
421
|
0 |
0 |
int boundary_l = (m == msg) || !isalnum((unsigned char)m[-1]); |
|
|
0 |
0 |
int boundary_l = (m == msg) || !isalnum((unsigned char)m[-1]); |
|
423
|
0 |
0 |
if (boundary_l && boundary_r) return 1; |
|
|
0 |
0 |
if (boundary_l && boundary_r) return 1; |
|
433
|
0 |
0 |
if (!self->failover_hosts || self->failover_n <= 0) return; |
|
|
0 |
0 |
if (!self->failover_hosts || self->failover_n <= 0) return; |
|
434
|
0 |
0 |
if (!failover_msg_match(msg)) return; |
|
435
|
0 |
0 |
char *old_host = self->host ? safe_strdup(self->host) : NULL; |
|
438
|
0 |
0 |
CLEAR_STR(self->host); |
|
441
|
0 |
0 |
if (self->on_failover) { |
|
444
|
0 |
0 |
PUSHMARK(SP); |
|
445
|
0 |
0 |
XPUSHs(old_host ? sv_2mortal(newSVpv(old_host, 0)) : &PL_sv_undef); |
|
|
0 |
0 |
XPUSHs(old_host ? sv_2mortal(newSVpv(old_host, 0)) : &PL_sv_undef); |
|
446
|
0 |
0 |
mXPUSHu(old_port); |
|
447
|
0 |
0 |
XPUSHs(sv_2mortal(newSVpv(self->host, 0))); |
|
448
|
0 |
0 |
mXPUSHu(self->port); |
|
449
|
0 |
0 |
XPUSHs(sv_2mortal(newSVpv(msg ? msg : "", 0))); |
|
|
0 |
0 |
XPUSHs(sv_2mortal(newSVpv(msg ? msg : "", 0))); |
|
452
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("on_failover"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("on_failover"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("on_failover"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("on_failover"); |
|
453
|
0 |
0 |
FREETMPS; LEAVE; |
|
455
|
0 |
0 |
if (old_host) Safefree(old_host); |
|
465
|
0 |
0 |
if (!self->on_error) goto done; |
|
470
|
0 |
0 |
PUSHMARK(SP); |
|
471
|
0 |
0 |
XPUSHs(sv_2mortal(newSVpv(msg, 0))); |
|
475
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("error handler"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("error handler"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("error handler"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("error handler"); |
|
477
|
0 |
0 |
FREETMPS; |
|
493
|
0 |
0 |
if (check_destroyed(self)) return 1; |
|
494
|
0 |
0 |
if (self->connect_gen != gen) return 0; |
|
495
|
0 |
0 |
if (cancel_pending(self, cancel_msg)) return 1; |
|
496
|
0 |
0 |
if (self->connect_gen != gen) return 0; |
|
502
|
0 |
0 |
if (teardown_io_error(self, msg, msg)) return 1; |
|
505
|
0 |
0 |
if (self->connect_gen == gen && self->auto_reconnect && self->host) |
|
|
0 |
0 |
if (self->connect_gen == gen && self->auto_reconnect && self->host) |
|
|
0 |
0 |
if (self->connect_gen == gen && self->auto_reconnect && self->host) |
|
519
|
0 |
0 |
PUSHMARK(SP); |
|
522
|
0 |
0 |
if (SvTRUE(ERRSV)) { |
|
|
0 |
0 |
if (SvTRUE(ERRSV)) { |
|
523
|
0 |
0 |
warn("EV::ClickHouse: exception in %s handler: %s", |
|
525
|
0 |
0 |
sv_setsv(ERRSV, &PL_sv_undef); |
|
527
|
0 |
0 |
FREETMPS; |
|
537
|
0 |
0 |
if (!self->on_trace) return; |
|
550
|
0 |
0 |
PUSHMARK(SP); |
|
551
|
0 |
0 |
XPUSHs(sv_2mortal(newSVpv(buf, 0))); |
|
554
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("trace handler"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("trace handler"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("trace handler"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("trace handler"); |
|
555
|
0 |
0 |
FREETMPS; |
|
569
|
0 |
0 |
if (out_on_complete) *out_on_complete = NULL; |
|
570
|
0 |
0 |
if (ngx_queue_empty(&self->cb_queue)) return NULL; |
|
576
|
0 |
0 |
CLEAR_SV(cbt->on_data); |
|
577
|
0 |
0 |
if (out_on_complete) { |
|
581
|
0 |
0 |
CLEAR_SV(cbt->on_complete); |
|
594
|
0 |
0 |
if (ngx_queue_empty(&self->cb_queue)) return NULL; |
|
603
|
0 |
0 |
if (ngx_queue_empty(&self->cb_queue)) return 0; |
|
611
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("callback"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("callback"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("callback"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("callback"); |
|
620
|
0 |
0 |
PUSHMARK(SP); |
|
625
|
0 |
0 |
FREETMPS; |
|
637
|
0 |
0 |
SV *target = override ? override : self->on_query_complete; |
|
638
|
0 |
0 |
if (!target) return; |
|
640
|
0 |
0 |
? ev_now(self->loop) - self->query_start_time : 0.0; |
|
645
|
0 |
0 |
PUSHMARK(SP); |
|
646
|
0 |
0 |
EXTEND(SP, 6); |
|
647
|
0 |
0 |
PUSHs(self->last_query_id |
|
653
|
0 |
0 |
PUSHs(errmsg ? sv_2mortal(newSVpv(errmsg, 0)) : &PL_sv_undef); |
|
656
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("on_query_complete"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("on_query_complete"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("on_query_complete"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("on_query_complete"); |
|
657
|
0 |
0 |
FREETMPS; LEAVE; |
|
673
|
0 |
0 |
if (cb == NULL) { |
|
675
|
0 |
0 |
if (oqc) SvREFCNT_dec(oqc); |
|
680
|
0 |
0 |
if (!IS_KEEPALIVE_CB(cb)) fire_on_query_complete_ex(self, errmsg, oqc); |
|
|
0 |
0 |
if (!IS_KEEPALIVE_CB(cb)) fire_on_query_complete_ex(self, errmsg, oqc); |
|
682
|
0 |
0 |
if (oqc) SvREFCNT_dec(oqc); |
|
691
|
0 |
0 |
if (cb == NULL) { |
|
692
|
0 |
0 |
if (rows) SvREFCNT_dec((SV*)rows); |
|
694
|
0 |
0 |
if (oqc) SvREFCNT_dec(oqc); |
|
699
|
0 |
0 |
if (!IS_KEEPALIVE_CB(cb)) fire_on_query_complete_ex(self, NULL, oqc); |
|
|
0 |
0 |
if (!IS_KEEPALIVE_CB(cb)) fire_on_query_complete_ex(self, NULL, oqc); |
|
704
|
0 |
0 |
PUSHMARK(SP); |
|
705
|
0 |
0 |
PUSHs(rows ? sv_2mortal(newRV_noinc((SV*)rows)) : &PL_sv_undef); |
|
708
|
0 |
0 |
FREETMPS; |
|
711
|
0 |
0 |
if (oqc) SvREFCNT_dec(oqc); |
|
720
|
0 |
0 |
if (cb == NULL) { |
|
722
|
0 |
0 |
if (oqc) SvREFCNT_dec(oqc); |
|
727
|
0 |
0 |
if (!IS_KEEPALIVE_CB(cb)) fire_on_query_complete_ex(self, NULL, oqc); |
|
|
0 |
0 |
if (!IS_KEEPALIVE_CB(cb)) fire_on_query_complete_ex(self, NULL, oqc); |
|
732
|
0 |
0 |
PUSHMARK(SP); |
|
736
|
0 |
0 |
FREETMPS; |
|
739
|
0 |
0 |
if (oqc) SvREFCNT_dec(oqc); |
|
755
|
0 |
0 |
if (deliver_error(self, deliver_msg)) return 1; |
|
756
|
0 |
0 |
if (self->connect_gen != gen) return 0; |
|
757
|
0 |
0 |
if (cancel_pending(self, cancel_msg)) return 1; |
|
758
|
0 |
0 |
if (self->connect_gen != gen) return 0; |
|
771
|
0 |
0 |
cbt->on_data = on_data ? SvREFCNT_inc(on_data) : NULL; |
|
778
|
0 |
0 |
if (has_arg) { |
|
779
|
0 |
0 |
CLEAR_SV(*slot); |
|
780
|
0 |
0 |
if (handler && SvROK(handler) && SvTYPE(SvRV(handler)) == SVt_PVCV) { |
|
|
0 |
0 |
if (handler && SvROK(handler) && SvTYPE(SvRV(handler)) == SVt_PVCV) { |
|
|
0 |
0 |
if (handler && SvROK(handler) && SvTYPE(SvRV(handler)) == SVt_PVCV) { |
|
784
|
0 |
0 |
return *slot ? SvREFCNT_inc(*slot) : &PL_sv_undef; |
|
790
|
0 |
0 |
if (!s) return NULL; |
|
802
|
0 |
0 |
if (!cmt) return 0; |
|
812
|
0 |
0 |
if (!s) return 0; |
|
813
|
0 |
0 |
for (; *s; s++) |
|
814
|
0 |
0 |
if (*s == '\r' || *s == '\n') return 1; |
|
|
0 |
0 |
if (*s == '\r' || *s == '\n') return 1; |
|
824
|
0 |
0 |
if (n >= self->recv_len) { self->recv_len = 0; return; } |
|
852
|
0 |
0 |
if (was_connected) emit_trace(self, "disconnect"); |
|
859
|
0 |
0 |
if (self->ssl) { |
|
864
|
0 |
0 |
if (self->ssl_ctx) { |
|
870
|
0 |
0 |
if (self->fd >= 0) { |
|
885
|
0 |
0 |
CLEAR_SV(self->native_rows); |
|
886
|
0 |
0 |
CLEAR_SV(self->native_col_names); |
|
887
|
0 |
0 |
CLEAR_SV(self->native_col_types); |
|
888
|
0 |
0 |
CLEAR_SV(self->native_totals); |
|
889
|
0 |
0 |
CLEAR_SV(self->native_extremes); |
|
891
|
0 |
0 |
CLEAR_INSERT(self); |
|
|
0 |
0 |
CLEAR_INSERT(self); |
|
892
|
0 |
0 |
CLEAR_STR(self->insert_err); |
|
898
|
0 |
0 |
if (was_connected && self->on_disconnect) |
|
|
0 |
0 |
if (was_connected && self->on_disconnect) |
|
917
|
0 |
0 |
if (!IS_KEEPALIVE_CB(cb)) |
|
|
0 |
0 |
if (!IS_KEEPALIVE_CB(cb)) |
|
919
|
0 |
0 |
if (oqc) SvREFCNT_dec(oqc); |
|
929
|
0 |
0 |
while (!ngx_queue_empty(&self->cb_queue)) { |
|
932
|
0 |
0 |
if (cb == NULL) break; |
|
934
|
0 |
0 |
if (self->magic != EV_CH_MAGIC) break; |
|
943
|
0 |
0 |
while (!ngx_queue_empty(&self->send_queue)) { |
|
951
|
0 |
0 |
CLEAR_INSERT(send); |
|
|
0 |
0 |
CLEAR_INSERT(send); |
|
952
|
0 |
0 |
CLEAR_SV(send->on_data); |
|
957
|
0 |
0 |
if (self->magic != EV_CH_MAGIC) break; |
|
969
|
0 |
0 |
if (self->ssl) { |
|
970
|
0 |
0 |
int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len; |
|
972
|
0 |
0 |
if (ret <= 0) { |
|
974
|
0 |
0 |
if (err == SSL_ERROR_WANT_READ) { |
|
978
|
0 |
0 |
if (err == SSL_ERROR_WANT_WRITE) { |
|
983
|
0 |
0 |
if (err == SSL_ERROR_ZERO_RETURN) return 0; |
|
995
|
0 |
0 |
if (self->ssl) { |
|
996
|
0 |
0 |
int ssl_len = (len > (size_t)INT_MAX) ? INT_MAX : (int)len; |
|
998
|
0 |
0 |
if (ret <= 0) { |
|
1000
|
0 |
0 |
if (err == SSL_ERROR_WANT_WRITE) { |
|
1004
|
0 |
0 |
if (err == SSL_ERROR_WANT_READ) { |
|
1021
|
0 |
0 |
if (self->recv_cap >= need) return; |
|
1022
|
0 |
0 |
if (need > SIZE_MAX / 2) croak("recv buffer overflow"); |
|
1024
|
0 |
0 |
if (newcap < need) newcap = need; |
|
1030
|
0 |
0 |
if (self->send_cap >= need) return; |
|
1031
|
0 |
0 |
if (need > SIZE_MAX / 2) croak("send buffer overflow"); |
|
1033
|
0 |
0 |
if (newcap < need) newcap = need; |
|
1055
|
0 |
0 |
if (need > SIZE_MAX - b->len) croak("native buffer overflow"); |
|
1056
|
0 |
0 |
if (b->len + need > b->cap) { |
|
1057
|
0 |
0 |
while (b->len + need > b->cap) { |
|
1058
|
0 |
0 |
if (b->cap > SIZE_MAX / 2) croak("native buffer overflow"); |
|
1073
|
0 |
0 |
while (n >= 0x80) { |
|
1086
|
0 |
0 |
nbuf_string(b, s, s ? strlen(s) : 0); |
|
1112
|
0 |
0 |
for (i = 0; i < len; i++) if (s[i] == '\'' || s[i] == '\\') esc++; |
|
|
0 |
0 |
for (i = 0; i < len; i++) if (s[i] == '\'' || s[i] == '\\') esc++; |
|
|
0 |
0 |
for (i = 0; i < len; i++) if (s[i] == '\'' || s[i] == '\\') esc++; |
|
1116
|
0 |
0 |
if (esc == 0) { |
|
1120
|
0 |
0 |
for (i = 0; i < len; i++) { |
|
1121
|
0 |
0 |
if (s[i] == '\'' || s[i] == '\\') b->data[b->len++] = '\\'; |
|
|
0 |
0 |
if (s[i] == '\'' || s[i] == '\\') b->data[b->len++] = '\\'; |
|
1135
|
0 |
0 |
while (p < len) { |
|
1138
|
0 |
0 |
if (!(byte & 0x80)) { |
|
1144
|
0 |
0 |
if (shift >= 64) return -1; |
|
1154
|
0 |
0 |
if (rc <= 0) { *pos = saved; return rc; } |
|
1155
|
0 |
0 |
if (slen > len - *pos) { *pos = saved; return 0; } |
|
1159
|
0 |
0 |
if (out_len) *out_len = (size_t)slen; |
|
1170
|
0 |
0 |
if (rc <= 0) { *pos = saved; return rc; } |
|
1171
|
0 |
0 |
if (slen > len - *pos) { *pos = saved; return 0; } |
|
1179
|
0 |
0 |
if (*pos + 1 > len) return 0; |
|
1186
|
0 |
0 |
if (*pos + 4 > len) return 0; |
|
1197
|
0 |
0 |
if (rc <= 0) { *pos = saved; return rc; } |
|
1198
|
0 |
0 |
if (slen > len - *pos) { *pos = saved; return 0; } |
|
1209
|
0 |
0 |
for (i = 0; i < src_len; i++) { |
|
1211
|
0 |
0 |
if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || |
|
|
0 |
0 |
if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || |
|
|
0 |
0 |
if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || |
|
|
0 |
0 |
if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || |
|
|
0 |
0 |
if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || |
|
1212
|
0 |
0 |
(c >= '0' && c <= '9') || c == '-' || c == '_' || c == '.' || c == '~') { |
|
|
0 |
0 |
(c >= '0' && c <= '9') || c == '-' || c == '_' || c == '.' || c == '~') { |
|
|
0 |
0 |
(c >= '0' && c <= '9') || c == '-' || c == '_' || c == '.' || c == '~') { |
|
|
0 |
0 |
(c >= '0' && c <= '9') || c == '-' || c == '_' || c == '.' || c == '~') { |
|
|
0 |
0 |
(c >= '0' && c <= '9') || c == '-' || c == '_' || c == '.' || c == '~') { |
|
1227
|
0 |
0 |
return (klen == 3 && memcmp(key, "raw", 3) == 0) |
|
1228
|
0 |
0 |
|| (klen == 8 && memcmp(key, "query_id", 8) == 0) |
|
|
0 |
0 |
|| (klen == 8 && memcmp(key, "query_id", 8) == 0) |
|
1229
|
0 |
0 |
|| (klen == 7 && memcmp(key, "on_data", 7) == 0) |
|
|
0 |
0 |
|| (klen == 7 && memcmp(key, "on_data", 7) == 0) |
|
1230
|
0 |
0 |
|| (klen == 13 && memcmp(key, "query_timeout", 13) == 0) |
|
|
0 |
0 |
|| (klen == 13 && memcmp(key, "query_timeout", 13) == 0) |
|
1231
|
0 |
0 |
|| (klen == 6 && memcmp(key, "params", 6) == 0) |
|
|
0 |
0 |
|| (klen == 6 && memcmp(key, "params", 6) == 0) |
|
1232
|
0 |
0 |
|| (klen == 8 && memcmp(key, "external", 8) == 0) |
|
|
0 |
0 |
|| (klen == 8 && memcmp(key, "external", 8) == 0) |
|
1233
|
0 |
0 |
|| (klen == 17 && memcmp(key, "on_query_complete", 17) == 0); |
|
|
0 |
0 |
|| (klen == 17 && memcmp(key, "on_query_complete", 17) == 0); |
|
|
0 |
0 |
|| (klen == 17 && memcmp(key, "on_query_complete", 17) == 0); |
|
1240
|
0 |
0 |
if (overrides) { |
|
1242
|
0 |
0 |
while ((entry = hv_iternext(overrides))) { |
|
1246
|
0 |
0 |
if (is_client_only_key(key, klen)) continue; |
|
1251
|
0 |
0 |
if (defaults) { |
|
1253
|
0 |
0 |
while ((entry = hv_iternext(defaults))) { |
|
1257
|
0 |
0 |
if (overrides && hv_exists(overrides, key, klen)) |
|
|
0 |
0 |
if (overrides && hv_exists(overrides, key, klen)) |
|
1259
|
0 |
0 |
if (is_client_only_key(key, klen)) continue; |
|
1279
|
0 |
0 |
if (overrides) { |
|
1281
|
0 |
0 |
while ((entry = hv_iternext(overrides))) { |
|
1286
|
0 |
0 |
if (klen == 8 && memcmp(key, "query_id", 8) == 0) { |
|
|
0 |
0 |
if (klen == 8 && memcmp(key, "query_id", 8) == 0) { |
|
1291
|
0 |
0 |
if (is_client_only_key(key, klen)) continue; |
|
1299
|
0 |
0 |
if (defaults) { |
|
1301
|
0 |
0 |
while ((entry = hv_iternext(defaults))) { |
|
1306
|
0 |
0 |
if (overrides && hv_exists(overrides, key, klen)) |
|
|
0 |
0 |
if (overrides && hv_exists(overrides, key, klen)) |
|
1308
|
0 |
0 |
if (klen == 8 && memcmp(key, "query_id", 8) == 0) { |
|
|
0 |
0 |
if (klen == 8 && memcmp(key, "query_id", 8) == 0) { |
|
1309
|
0 |
0 |
if (!*query_id_out) { |
|
1315
|
0 |
0 |
if (is_client_only_key(key, klen)) continue; |
|
1330
|
0 |
0 |
if (overrides) { |
|
1332
|
0 |
0 |
while ((entry = hv_iternext(overrides))) { |
|
1337
|
0 |
0 |
if (klen <= 6 || memcmp(key, "param_", 6) != 0) continue; |
|
|
0 |
0 |
if (klen <= 6 || memcmp(key, "param_", 6) != 0) continue; |
|
1344
|
0 |
0 |
if (defaults) { |
|
1346
|
0 |
0 |
while ((entry = hv_iternext(defaults))) { |
|
1351
|
0 |
0 |
if (klen <= 6 || memcmp(key, "param_", 6) != 0) continue; |
|
|
0 |
0 |
if (klen <= 6 || memcmp(key, "param_", 6) != 0) continue; |
|
1352
|
0 |
0 |
if (overrides && hv_exists(overrides, key, klen)) continue; |
|
|
0 |
0 |
if (overrides && hv_exists(overrides, key, klen)) continue; |
|
1368
|
0 |
0 |
if (overrides) { |
|
1370
|
0 |
0 |
while ((entry = hv_iternext(overrides))) { |
|
1375
|
0 |
0 |
if (is_client_only_key(key, klen)) continue; |
|
1377
|
0 |
0 |
if (klen > 6 && memcmp(key, "param_", 6) == 0) continue; |
|
|
0 |
0 |
if (klen > 6 && memcmp(key, "param_", 6) == 0) continue; |
|
1385
|
0 |
0 |
if (defaults) { |
|
1387
|
0 |
0 |
while ((entry = hv_iternext(defaults))) { |
|
1392
|
0 |
0 |
if (overrides && hv_exists(overrides, key, klen)) |
|
|
0 |
0 |
if (overrides && hv_exists(overrides, key, klen)) |
|
1394
|
0 |
0 |
if (is_client_only_key(key, klen)) continue; |
|
1395
|
0 |
0 |
if (klen > 6 && memcmp(key, "param_", 6) == 0) continue; |
|
|
0 |
0 |
if (klen > 6 && memcmp(key, "param_", 6) == 0) continue; |
|
1423
|
0 |
48 |
I_EV_API("EV::ClickHouse"); |
|
|
48 |
0 |
I_EV_API("EV::ClickHouse"); |
|
|
0 |
48 |
I_EV_API("EV::ClickHouse"); |
|
1461
|
0 |
0 |
if (self->magic != EV_CH_MAGIC) return; |
|
1467
|
0 |
0 |
if (self->reconnect_timing) { |
|
1472
|
0 |
0 |
if (PL_dirty) { |
|
1474
|
0 |
0 |
while (!ngx_queue_empty(&self->send_queue)) { |
|
1479
|
0 |
0 |
CLEAR_INSERT(send); |
|
|
0 |
0 |
CLEAR_INSERT(send); |
|
1480
|
0 |
0 |
CLEAR_SV(send->on_data); |
|
1481
|
0 |
0 |
CLEAR_SV(send->on_complete); |
|
1485
|
0 |
0 |
while (!ngx_queue_empty(&self->cb_queue)) { |
|
1489
|
0 |
0 |
CLEAR_SV(cbt->on_data); |
|
1490
|
0 |
0 |
CLEAR_SV(cbt->on_complete); |
|
1495
|
0 |
0 |
#ifdef HAVE_OPENSSL |
|
1496
|
0 |
0 |
if (self->ssl) { SSL_free(self->ssl); self->ssl = NULL; } |
|
1497
|
0 |
0 |
if (self->ssl_ctx) { SSL_CTX_free(self->ssl_ctx); self->ssl_ctx = NULL; } |
|
1498
|
0 |
0 |
#endif |
|
|
0 |
0 |
#endif |
|
|
0 |
0 |
#endif |
|
|
0 |
0 |
#endif |
|
|
0 |
0 |
#endif |
|
|
0 |
0 |
#endif |
|
|
0 |
0 |
#endif |
|
|
0 |
0 |
#endif |
|
|
0 |
0 |
#endif |
|
1499
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
|
0 |
0 |
if (self->fd >= 0) close(self->fd); |
|
1501
|
0 |
0 |
CLEAR_PERSISTENT_STATE(self); |
|
|
0 |
0 |
CLEAR_PERSISTENT_STATE(self); |
|
1507
|
0 |
0 |
} |
|
1518
|
0 |
0 |
stop_timing(self); |
|
1523
|
0 |
0 |
} |
|
1528
|
0 |
0 |
SSL_free(self->ssl); |
|
1533
|
0 |
0 |
self->ssl_ctx = NULL; |
|
1541
|
0 |
0 |
|
|
|
0 |
0 |
|
|
|
0 |
0 |
|
|
|
0 |
0 |
|
|
|
0 |
0 |
|
|
|
0 |
0 |
|
|
|
0 |
0 |
|
|
|
0 |
0 |
|
|
|
0 |
0 |
|
|
1542
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
|
0 |
0 |
self->loop = NULL; |
|
1545
|
0 |
0 |
CLEAR_CONNECTION_HANDLERS(self); |
|
|
0 |
0 |
CLEAR_CONNECTION_HANDLERS(self); |
|
1548
|
0 |
0 |
lc_free_dicts(self); |
|
1562
|
0 |
0 |
CLEAR_STR(self->tls_ca_file); |
|
1570
|
0 |
0 |
CLEAR_STR(self->tls_cert_file); |
|
1578
|
0 |
0 |
CLEAR_STR(self->tls_key_file); |
|
1586
|
0 |
0 |
if (self->connected || self->connecting) croak("already connected"); |
|
|
0 |
0 |
if (self->connected || self->connecting) croak("already connected"); |
|
1587
|
0 |
0 |
if (has_http_unsafe_chars(host) || has_http_unsafe_chars(user) || |
|
1588
|
0 |
0 |
has_http_unsafe_chars(password) || has_http_unsafe_chars(database)) |
|
1591
|
0 |
0 |
CLEAR_STR(self->host); |
|
1592
|
0 |
0 |
CLEAR_STR(self->user); |
|
1593
|
0 |
0 |
CLEAR_STR(self->password); |
|
1594
|
0 |
0 |
CLEAR_STR(self->database); |
|
1609
|
0 |
0 |
if (!self->host) croak("no previous connection to reset"); |
|
1610
|
0 |
0 |
if (cancel_pending(self, "connection reset")) return; |
|
1611
|
0 |
0 |
if (cleanup_connection(self)) return; /* on_disconnect freed self */ |
|
1619
|
0 |
0 |
if (cancel_pending(self, "connection finished")) return; |
|
1636
|
0 |
0 |
if (items == 3) { |
|
1638
|
0 |
0 |
} else if (items == 4) { |
|
1639
|
0 |
0 |
if (!(SvROK(ST(2)) && SvTYPE(SvRV(ST(2))) == SVt_PVHV)) |
|
|
0 |
0 |
if (!(SvROK(ST(2)) && SvTYPE(SvRV(ST(2))) == SVt_PVHV)) |
|
1647
|
0 |
0 |
if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected"); |
|
|
0 |
0 |
if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected"); |
|
|
0 |
0 |
if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected"); |
|
1648
|
0 |
0 |
if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV)) |
|
|
0 |
0 |
if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV)) |
|
1651
|
0 |
0 |
if (self->protocol == PROTO_NATIVE && (self->insert_data || self->insert_av)) |
|
|
0 |
0 |
if (self->protocol == PROTO_NATIVE && (self->insert_data || self->insert_av)) |
|
|
0 |
0 |
if (self->protocol == PROTO_NATIVE && (self->insert_data || self->insert_av)) |
|
1659
|
0 |
0 |
if (settings) { |
|
1661
|
0 |
0 |
if (settings_copy) settings = settings_copy; |
|
1663
|
0 |
0 |
if (svp) raw = SvTRUE(*svp) ? 1 : 0; |
|
1665
|
0 |
0 |
if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV) |
|
|
0 |
0 |
if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV) |
|
|
0 |
0 |
if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV) |
|
1668
|
0 |
0 |
if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV) |
|
|
0 |
0 |
if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV) |
|
|
0 |
0 |
if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV) |
|
1671
|
0 |
0 |
if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVHV |
|
|
0 |
0 |
if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVHV |
|
|
0 |
0 |
if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVHV |
|
1672
|
0 |
0 |
&& HvKEYS((HV *)SvRV(*svp)) > 0) |
|
|
0 |
0 |
&& HvKEYS((HV *)SvRV(*svp)) > 0) |
|
1678
|
0 |
0 |
if (self->max_query_size > 0 && sql_len > self->max_query_size) { |
|
|
0 |
0 |
if (self->max_query_size > 0 && sql_len > self->max_query_size) { |
|
1679
|
0 |
0 |
if (settings_copy) SvREFCNT_dec((SV*)settings_copy); |
|
1684
|
0 |
0 |
if (raw && self->protocol == PROTO_NATIVE) { |
|
|
0 |
0 |
if (raw && self->protocol == PROTO_NATIVE) { |
|
1685
|
0 |
0 |
if (settings_copy) SvREFCNT_dec((SV*)settings_copy); |
|
1689
|
0 |
0 |
if (on_data_sv && self->protocol == PROTO_HTTP) { |
|
|
0 |
0 |
if (on_data_sv && self->protocol == PROTO_HTTP) { |
|
1690
|
0 |
0 |
if (settings_copy) SvREFCNT_dec((SV*)settings_copy); |
|
1694
|
0 |
0 |
if (external && self->protocol == PROTO_HTTP) { |
|
|
0 |
0 |
if (external && self->protocol == PROTO_HTTP) { |
|
1695
|
0 |
0 |
if (settings_copy) SvREFCNT_dec((SV*)settings_copy); |
|
1703
|
0 |
0 |
if (self->query_log_comment) { |
|
1713
|
0 |
0 |
if (self->protocol == PROTO_HTTP) { |
|
1720
|
0 |
0 |
if (external) { |
|
1725
|
0 |
0 |
if (!ext_data) { |
|
1726
|
0 |
0 |
if (settings_copy) SvREFCNT_dec((SV*)settings_copy); |
|
1727
|
0 |
0 |
if (qlc_sql) Safefree(qlc_sql); |
|
1734
|
0 |
0 |
if (ext_data) Safefree(ext_data); |
|
1736
|
0 |
0 |
if (qlc_sql) Safefree(qlc_sql); |
|
1742
|
0 |
0 |
if (on_data_sv) s->on_data = SvREFCNT_inc(on_data_sv); |
|
1743
|
0 |
0 |
if (on_complete_sv) s->on_complete = SvREFCNT_inc(on_complete_sv); |
|
1744
|
0 |
0 |
if (settings) send_apply_settings(s, settings); |
|
1746
|
0 |
0 |
if (settings_copy) SvREFCNT_dec((SV*)settings_copy); |
|
1764
|
0 |
0 |
if (items == 4) { |
|
1766
|
0 |
0 |
} else if (items == 5) { |
|
1767
|
0 |
0 |
if (!(SvROK(ST(3)) && SvTYPE(SvRV(ST(3))) == SVt_PVHV)) |
|
|
0 |
0 |
if (!(SvROK(ST(3)) && SvTYPE(SvRV(ST(3))) == SVt_PVHV)) |
|
1775
|
0 |
0 |
if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected"); |
|
|
0 |
0 |
if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected"); |
|
|
0 |
0 |
if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected"); |
|
1776
|
0 |
0 |
if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV)) |
|
|
0 |
0 |
if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV)) |
|
1782
|
0 |
0 |
if (SvROK(data_sv) && SvTYPE(SvRV(data_sv)) == SVt_PVAV) { |
|
|
0 |
0 |
if (SvROK(data_sv) && SvTYPE(SvRV(data_sv)) == SVt_PVAV) { |
|
1788
|
0 |
0 |
if (self->protocol == PROTO_NATIVE && (self->insert_data || self->insert_av)) |
|
|
0 |
0 |
if (self->protocol == PROTO_NATIVE && (self->insert_data || self->insert_av)) |
|
|
0 |
0 |
if (self->protocol == PROTO_NATIVE && (self->insert_data || self->insert_av)) |
|
1796
|
0 |
0 |
if (self->protocol == PROTO_HTTP) { |
|
1797
|
0 |
0 |
if (data_is_av) { |
|
1808
|
0 |
0 |
if (settings) { |
|
1814
|
0 |
0 |
if (idem && SvTRUE(*idem)) { |
|
|
0 |
0 |
if (idem && SvTRUE(*idem)) { |
|
1815
|
0 |
0 |
if (!settings_copy) { |
|
1823
|
0 |
0 |
if (SvPOK(*idem) || SvIOK(*idem) || SvNOK(*idem)) { |
|
|
0 |
0 |
if (SvPOK(*idem) || SvIOK(*idem) || SvNOK(*idem)) { |
|
|
0 |
0 |
if (SvPOK(*idem) || SvIOK(*idem) || SvNOK(*idem)) { |
|
1827
|
0 |
0 |
if (!(tlen == 1 && tstr[0] == '1')) generate = 0; |
|
|
0 |
0 |
if (!(tlen == 1 && tstr[0] == '1')) generate = 0; |
|
1829
|
0 |
0 |
if (generate) { |
|
1844
|
0 |
0 |
if (async && SvTRUE(*async)) { |
|
|
0 |
0 |
if (async && SvTRUE(*async)) { |
|
1845
|
0 |
0 |
if (!settings_copy) settings_copy = newHVhv(settings); |
|
1847
|
0 |
0 |
if (!hv_exists(settings_copy, "wait_for_async_insert", 21)) |
|
1851
|
0 |
0 |
if (settings_copy) settings = settings_copy; |
|
1860
|
0 |
0 |
: 0; |
|
1869
|
0 |
0 |
if (self->protocol == PROTO_HTTP) { |
|
1874
|
0 |
0 |
if (tsv_buf) Safefree(tsv_buf); |
|
1888
|
0 |
0 |
if (settings) send_apply_settings(s, settings); |
|
1892
|
0 |
0 |
if (settings) { |
|
1894
|
0 |
0 |
if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV) |
|
|
0 |
0 |
if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV) |
|
|
0 |
0 |
if (svp && SvROK(*svp) && SvTYPE(SvRV(*svp)) == SVt_PVCV) |
|
1897
|
0 |
0 |
if (settings_copy) SvREFCNT_dec((SV*)settings_copy); |
|
1902
|
0 |
0 |
if (self->protocol == PROTO_NATIVE) { |
|
1903
|
0 |
0 |
if (data_is_av) { |
|
1908
|
0 |
0 |
Newx(s->insert_data, data_len > 0 ? data_len : 1, char); |
|
1909
|
0 |
0 |
if (data_len > 0) |
|
1927
|
0 |
0 |
if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected"); |
|
|
0 |
0 |
if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected"); |
|
|
0 |
0 |
if (!self->connected && !self->connecting && !self->dns_pending) croak("not connected"); |
|
1928
|
0 |
0 |
if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV)) |
|
|
0 |
0 |
if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV)) |
|
1931
|
0 |
0 |
if (self->protocol == PROTO_HTTP) { |
|
1975
|
0 |
0 |
RETVAL = self->connected ? 1 : 0; |
|
1984
|
0 |
0 |
RETVAL = self->pending_count; |
|
2001
|
0 |
0 |
if (!ngx_queue_empty(&self->cb_queue)) { |
|
2004
|
0 |
0 |
(void)hv_stores(h, "query_id", |
|
2009
|
0 |
0 |
? ev_now(self->loop) - self->query_start_time : 0.0; |
|
2015
|
0 |
0 |
q != ngx_queue_sentinel(&self->send_queue); |
|
2020
|
0 |
0 |
(void)hv_stores(h, "query_id", |
|
2050
|
0 |
0 |
(void)hv_stores(h, "protocol", newSVpv(self->protocol == PROTO_NATIVE ? "native" : "http", 0)); |
|
2053
|
0 |
0 |
(void)hv_stores(h, "host", |
|
2068
|
0 |
0 |
RETVAL = self->host ? newSVpv(self->host, 0) : &PL_sv_undef; |
|
2077
|
0 |
0 |
RETVAL = self->port; |
|
2086
|
0 |
0 |
if (self->server_name) { |
|
2094
|
0 |
0 |
if (n >= (int)sizeof(buf)) n = (int)sizeof(buf) - 1; |
|
2107
|
0 |
0 |
if (self->server_revision) { |
|
2113
|
0 |
0 |
if (n >= (int)sizeof(buf)) n = (int)sizeof(buf) - 1; |
|
2126
|
0 |
0 |
RETVAL = (UV)self->server_revision; |
|
2139
|
0 |
0 |
if (cancel_pending(self, "skipped")) return; |
|
2140
|
0 |
0 |
CLEAR_INSERT(self); |
|
|
0 |
0 |
CLEAR_INSERT(self); |
|
2141
|
0 |
0 |
CLEAR_STR(self->insert_err); |
|
2142
|
0 |
0 |
if (had_inflight) |
|
2164
|
0 |
0 |
CLEAR_STR(self->session_id); |
|
2172
|
0 |
0 |
CLEAR_STR(self->query_log_comment); |
|
2180
|
0 |
0 |
CLEAR_STR(self->host); |
|
2204
|
0 |
0 |
self->dns_pending = 0; |
|
2229
|
0 |
0 |
if (!(SvROK(href) && SvTYPE(SvRV(href)) == SVt_PVHV)) |
|
|
0 |
0 |
if (!(SvROK(href) && SvTYPE(SvRV(href)) == SVt_PVHV)) |
|
2231
|
0 |
0 |
CLEAR_SV(self->default_settings); |
|
2275
|
0 |
0 |
RETVAL = self->last_tls_error ? newSVpv(self->last_tls_error, 0) : &PL_sv_undef; |
|
2294
|
0 |
0 |
if (!SvOK(hosts_av_ref)) return; |
|
2295
|
0 |
0 |
if (!(SvROK(hosts_av_ref) && SvTYPE(SvRV(hosts_av_ref)) == SVt_PVAV)) |
|
|
0 |
0 |
if (!(SvROK(hosts_av_ref) && SvTYPE(SvRV(hosts_av_ref)) == SVt_PVAV)) |
|
2298
|
0 |
0 |
SSize_t n = av_top_index(hosts) + 1; |
|
2299
|
0 |
0 |
if (n <= 0) return; |
|
2300
|
0 |
0 |
Newx(self->failover_hosts, n, char *); |
|
2301
|
0 |
0 |
Newx(self->failover_ports, n, unsigned int); |
|
2307
|
0 |
0 |
for (SSize_t i = 0; i < n; i++) { |
|
2309
|
0 |
0 |
if (!e || !SvOK(*e)) { |
|
|
0 |
0 |
if (!e || !SvOK(*e)) { |
|
2319
|
0 |
0 |
if (sl > 0 && s[0] == '[') { |
|
|
0 |
0 |
if (sl > 0 && s[0] == '[') { |
|
2321
|
0 |
0 |
if (close) { |
|
2325
|
0 |
0 |
if (close + 1 < s + sl && close[1] == ':') |
|
|
0 |
0 |
if (close + 1 < s + sl && close[1] == ':') |
|
2329
|
0 |
0 |
if (!host && (colon = (const char *)memchr(s, ':', sl))) { |
|
|
0 |
0 |
if (!host && (colon = (const char *)memchr(s, ':', sl))) { |
|
2336
|
0 |
0 |
if (!host) host = safe_strdup(s); |
|
2347
|
0 |
0 |
: &PL_sv_undef; |
|
2406
|
0 |
0 |
: &PL_sv_undef; |
|
2416
|
0 |
0 |
: &PL_sv_undef; |
|
2435
|
0 |
0 |
: &PL_sv_undef; |
|
2445
|
0 |
0 |
: &PL_sv_undef; |
|
2455
|
0 |
0 |
: &PL_sv_undef; |
|
2521
|
0 |
0 |
self->reconnect_jitter = val < 0 ? 0 : val; |
|
2542
|
0 |
0 |
if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV)) |
|
|
0 |
0 |
if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV)) |
|
2544
|
0 |
0 |
CLEAR_SV(self->on_drain); |
|
2545
|
0 |
0 |
if (self->pending_count == 0 && ngx_queue_empty(&self->send_queue)) { |
|
|
0 |
0 |
if (self->pending_count == 0 && ngx_queue_empty(&self->send_queue)) { |
|
2557
|
0 |
0 |
if (self->protocol == PROTO_NATIVE && self->send_count > 0) { |
|
|
0 |
0 |
if (self->protocol == PROTO_NATIVE && self->send_count > 0) { |
|
2568
|
0 |
0 |
} else if (self->protocol == PROTO_HTTP && self->send_count > 0) { |
|
|
0 |
0 |
} else if (self->protocol == PROTO_HTTP && self->send_count > 0) { |
|
2570
|
0 |
0 |
CLEAR_SV(self->native_rows); |
|
2572
|
0 |
0 |
if (cancel_pending(self, "query cancelled")) return; |
|
2573
|
0 |
0 |
if (self->connect_gen != gen) return; |
|
2574
|
0 |
0 |
if (cleanup_connection(self)) return; /* on_disconnect freed self */ |
|
2575
|
0 |
0 |
if (self->auto_reconnect && self->host) |
|
|
0 |
0 |
if (self->auto_reconnect && self->host) |
|
2589
|
0 |
0 |
if (!(SvROK(self_sv) && SvTYPE(SvRV(self_sv)) == SVt_PVHV)) |
|
|
0 |
0 |
if (!(SvROK(self_sv) && SvTYPE(SvRV(self_sv)) == SVt_PVHV)) |
|
2594
|
0 |
0 |
if (!buf_p || !SvROK(*buf_p) || SvTYPE(SvRV(*buf_p)) != SVt_PVAV) |
|
|
0 |
0 |
if (!buf_p || !SvROK(*buf_p) || SvTYPE(SvRV(*buf_p)) != SVt_PVAV) |
|
|
0 |
0 |
if (!buf_p || !SvROK(*buf_p) || SvTYPE(SvRV(*buf_p)) != SVt_PVAV) |
|
2599
|
0 |
0 |
if (SvROK(row) && SvTYPE(SvRV(row)) == SVt_PVHV) { |
|
|
0 |
0 |
if (SvROK(row) && SvTYPE(SvRV(row)) == SVt_PVHV) { |
|
2601
|
0 |
0 |
if (!cols_p || !SvROK(*cols_p) || SvTYPE(SvRV(*cols_p)) != SVt_PVAV) |
|
|
0 |
0 |
if (!cols_p || !SvROK(*cols_p) || SvTYPE(SvRV(*cols_p)) != SVt_PVAV) |
|
|
0 |
0 |
if (!cols_p || !SvROK(*cols_p) || SvTYPE(SvRV(*cols_p)) != SVt_PVAV) |
|
2605
|
0 |
0 |
SSize_t n = av_top_index(cols) + 1; |
|
2608
|
0 |
0 |
for (SSize_t i = 0; i < n; i++) { |
|
2610
|
0 |
0 |
if (!col) { av_push(out, newSV(0)); continue; } |
|
2614
|
0 |
0 |
av_push(out, vp ? newSVsv(*vp) : newSV(0)); |
|
2621
|
0 |
0 |
SSize_t buf_n = av_top_index(buffer) + 1; |
|
2624
|
0 |
0 |
SSize_t batch = bs_p ? SvIV(*bs_p) : 10000; |
|
2629
|
0 |
0 |
SSize_t hw = hw_p ? SvIV(*hw_p) : 0; |
|
2630
|
0 |
0 |
if (hw && buf_n >= hw) { |
|
|
0 |
0 |
if (hw && buf_n >= hw) { |
|
2632
|
0 |
0 |
if (!act_p || !SvTRUE(*act_p)) { |
|
|
0 |
0 |
if (!act_p || !SvTRUE(*act_p)) { |
|
2641
|
0 |
0 |
if (need_flush) { |
|
2644
|
0 |
0 |
PUSHMARK(SP); |
|
2645
|
0 |
0 |
XPUSHs(self_sv); |
|
2648
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("Streamer _flush"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("Streamer _flush"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("Streamer _flush"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("Streamer _flush"); |
|
2649
|
0 |
0 |
FREETMPS; LEAVE; |
|
2652
|
0 |
0 |
if (need_hw) { |
|
2654
|
0 |
0 |
if (cb_p && SvROK(*cb_p) && SvTYPE(SvRV(*cb_p)) == SVt_PVCV) { |
|
|
0 |
0 |
if (cb_p && SvROK(*cb_p) && SvTYPE(SvRV(*cb_p)) == SVt_PVCV) { |
|
|
0 |
0 |
if (cb_p && SvROK(*cb_p) && SvTYPE(SvRV(*cb_p)) == SVt_PVCV) { |
|
2656
|
0 |
0 |
IV inflight = inflight_p ? SvIV(*inflight_p) : 0; |
|
2659
|
0 |
0 |
PUSHMARK(SP); |
|
2660
|
0 |
0 |
mXPUSHi(buf_n); |
|
2661
|
0 |
0 |
mXPUSHi(inflight); |
|
2664
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("on_high_water"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("on_high_water"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("on_high_water"); |
|
|
0 |
0 |
WARN_AND_CLEAR_ERRSV("on_high_water"); |
|
2665
|
0 |
0 |
FREETMPS; LEAVE; |
|
2680
|
0 |
0 |
if (!(SvROK(pool_sv) && SvTYPE(SvRV(pool_sv)) == SVt_PVHV)) |
|
|
0 |
0 |
if (!(SvROK(pool_sv) && SvTYPE(SvRV(pool_sv)) == SVt_PVHV)) |
|
2685
|
0 |
0 |
if (!conns_p || !SvROK(*conns_p) || SvTYPE(SvRV(*conns_p)) != SVt_PVAV) |
|
|
0 |
0 |
if (!conns_p || !SvROK(*conns_p) || SvTYPE(SvRV(*conns_p)) != SVt_PVAV) |
|
|
0 |
0 |
if (!conns_p || !SvROK(*conns_p) || SvTYPE(SvRV(*conns_p)) != SVt_PVAV) |
|
2688
|
0 |
0 |
SSize_t n = av_top_index(conns) + 1; |
|
2689
|
0 |
0 |
if (n <= 0) croak("_pool_pick: empty pool"); |
|
2692
|
0 |
0 |
IV thresh = thresh_p ? SvIV(*thresh_p) : 0; |
|
2695
|
0 |
0 |
if (thresh > 0) { |
|
2697
|
0 |
0 |
if (cs_p && SvROK(*cs_p) && SvTYPE(SvRV(*cs_p)) == SVt_PVAV) |
|
|
0 |
0 |
if (cs_p && SvROK(*cs_p) && SvTYPE(SvRV(*cs_p)) == SVt_PVAV) |
|
|
0 |
0 |
if (cs_p && SvROK(*cs_p) && SvTYPE(SvRV(*cs_p)) == SVt_PVAV) |
|
2707
|
0 |
0 |
if (p_p && SvROK(*p_p) && SvTYPE(SvRV(*p_p)) == SVt_PVHV) |
|
|
0 |
0 |
if (p_p && SvROK(*p_p) && SvTYPE(SvRV(*p_p)) == SVt_PVHV) |
|
|
0 |
0 |
if (p_p && SvROK(*p_p) && SvTYPE(SvRV(*p_p)) == SVt_PVHV) |
|
2714
|
0 |
0 |
if (n > ties_cap) { |
|
2715
|
0 |
0 |
Newx(ties, n, int); |
|
2723
|
0 |
0 |
for (int pass = 0; pass < 3; pass++) { |
|
2724
|
0 |
0 |
for (SSize_t i = 0; i < n; i++) { |
|
2725
|
0 |
0 |
if (pass < 2 && cb_state) { |
|
|
0 |
0 |
if (pass < 2 && cb_state) { |
|
2727
|
0 |
0 |
if (slot_sv && SvROK(*slot_sv) && SvTYPE(SvRV(*slot_sv)) == SVt_PVHV) { |
|
|
0 |
0 |
if (slot_sv && SvROK(*slot_sv) && SvTYPE(SvRV(*slot_sv)) == SVt_PVHV) { |
|
|
0 |
0 |
if (slot_sv && SvROK(*slot_sv) && SvTYPE(SvRV(*slot_sv)) == SVt_PVHV) { |
|
2730
|
0 |
0 |
if (du_p && SvNV(*du_p) > now) continue; |
|
|
0 |
0 |
if (du_p && SvNV(*du_p) > now) continue; |
|
2734
|
0 |
0 |
if (!csv || !SvROK(*csv) || !sv_isa(*csv, "EV::ClickHouse")) continue; |
|
|
0 |
0 |
if (!csv || !SvROK(*csv) || !sv_isa(*csv, "EV::ClickHouse")) continue; |
|
|
0 |
0 |
if (!csv || !SvROK(*csv) || !sv_isa(*csv, "EV::ClickHouse")) continue; |
|
2736
|
0 |
0 |
if (ch->magic != EV_CH_MAGIC) continue; /* freed mid-callback */ |
|
2737
|
0 |
0 |
if (pass == 0 && pinned) { |
|
|
0 |
0 |
if (pass == 0 && pinned) { |
|
2744
|
0 |
0 |
if (hv_exists(pinned, key, klen)) continue; |
|
2747
|
0 |
0 |
if (best < 0 || pc < best_n) { |
|
|
0 |
0 |
if (best < 0 || pc < best_n) { |
|
2752
|
0 |
0 |
} else if (pc == best_n) { |
|
2753
|
0 |
0 |
if (n_ties < ties_cap) ties[n_ties++] = (int)i; |
|
2756
|
0 |
0 |
if (best >= 0) break; |
|
2762
|
0 |
0 |
if (best < 0) { |
|
2763
|
0 |
0 |
if (ties != stack_ties) Safefree(ties); |
|
2768
|
0 |
0 |
if (n_ties == 1) { |
|
2772
|
0 |
0 |
IV idx = idx_p ? SvIV(*idx_p) : 0; |
|
2780
|
0 |
0 |
if (ties != stack_ties) Safefree(ties); |
|
2792
|
0 |
0 |
if (!(SvROK(self_sv) && SvTYPE(SvRV(self_sv)) == SVt_PVHV)) |
|
|
0 |
0 |
if (!(SvROK(self_sv) && SvTYPE(SvRV(self_sv)) == SVt_PVHV)) |
|
2796
|
0 |
0 |
if (!ch_p || !SvROK(*ch_p) || !sv_isa(*ch_p, "EV::ClickHouse")) |
|
|
0 |
0 |
if (!ch_p || !SvROK(*ch_p) || !sv_isa(*ch_p, "EV::ClickHouse")) |
|
|
0 |
0 |
if (!ch_p || !SvROK(*ch_p) || !sv_isa(*ch_p, "EV::ClickHouse")) |
|
2799
|
0 |
0 |
struct ev_loop *loop = ch->loop ? ch->loop : EV_DEFAULT; |
|
2802
|
0 |
0 |
if (!batches_p || !SvROK(*batches_p) || SvTYPE(SvRV(*batches_p)) != SVt_PVAV) |
|
|
0 |
0 |
if (!batches_p || !SvROK(*batches_p) || SvTYPE(SvRV(*batches_p)) != SVt_PVAV) |
|
|
0 |
0 |
if (!batches_p || !SvROK(*batches_p) || SvTYPE(SvRV(*batches_p)) != SVt_PVAV) |
|
2807
|
0 |
0 |
if (SvOK(timeout_sv)) { |
|
2809
|
0 |
0 |
if (t > 0) timeout = t; |
|
2811
|
0 |
0 |
double expires = timeout > 0 ? ev_now(loop) + timeout : 0; |
|
2816
|
0 |
0 |
while (av_top_index(batches) < 0) { |
|
|
0 |
0 |
while (av_top_index(batches) < 0) { |
|
2818
|
0 |
0 |
if (done_p && SvTRUE(*done_p)) break; |
|
|
0 |
0 |
if (done_p && SvTRUE(*done_p)) break; |
|
2819
|
0 |
0 |
if (expires) { |
|
2821
|
0 |
0 |
if (left <= 0) { |
|
2822
|
0 |
0 |
if (armed) { ev_timer_stop(loop, &to); armed = 0; } |
|
2831
|
0 |
0 |
if (armed) { ev_timer_stop(loop, &to); armed = 0; } |
|
2832
|
0 |
0 |
if (av_top_index(batches) < 0) { |
|
|
0 |
0 |
if (av_top_index(batches) < 0) { |
|
2834
|
0 |
0 |
if (!dp2 || !SvTRUE(*dp2)) { |
|
|
0 |
0 |
if (!dp2 || !SvTRUE(*dp2)) { |
|
2842
|
0 |
0 |
if (av_top_index(batches) >= 0) { |
|
|
0 |
0 |
if (av_top_index(batches) >= 0) { |
|
2844
|
0 |
0 |
RETVAL = b ? b : &PL_sv_undef; |
|
2858
|
0 |
0 |
if (!(SvROK(slot_sv) && SvTYPE(SvRV(slot_sv)) == SVt_PVHV)) return; |
|
|
0 |
0 |
if (!(SvROK(slot_sv) && SvTYPE(SvRV(slot_sv)) == SVt_PVHV)) return; |
|
2860
|
0 |
0 |
if (SvTRUE(err_sv)) { |
|
2862
|
0 |
0 |
IV fails = (fails_p ? SvIV(*fails_p) : 0) + 1; |
|
2864
|
0 |
0 |
if (threshold > 0 && fails >= threshold) { |
|
|
0 |
0 |
if (threshold > 0 && fails >= threshold) { |