| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
/* --- freelist for cb_queue entries + send_queue entries --- |
|
2
|
|
|
|
|
|
|
* |
|
3
|
|
|
|
|
|
|
* Both use a singly-linked-list-in-the-record-itself stash: the first |
|
4
|
|
|
|
|
|
|
* sizeof(void*) bytes of a released entry are repurposed as the "next |
|
5
|
|
|
|
|
|
|
* free" pointer. Reuse the same struct without rebuilding it from |
|
6
|
|
|
|
|
|
|
* scratch. |
|
7
|
|
|
|
|
|
|
* |
|
8
|
|
|
|
|
|
|
* Also lives here: the singleton sentinels (keepalive_noop_cb, |
|
9
|
|
|
|
|
|
|
* iter_timeout_cb) and the small helpers that munge per-query |
|
10
|
|
|
|
|
|
|
* settings into per-send fields. |
|
11
|
|
|
|
|
|
|
* |
|
12
|
|
|
|
|
|
|
* This file is #include'd from ClickHouse.xs as part of the single |
|
13
|
|
|
|
|
|
|
* translation unit; symbols stay file-local-to-the-TU. |
|
14
|
|
|
|
|
|
|
*/ |
|
15
|
|
|
|
|
|
|
|
|
16
|
|
|
|
|
|
|
/* Head of the singly linked list of released ev_ch_cb_t records (pushed by
 * release_cbt, popped by alloc_cbt); NULL means the list is empty. */
static ev_ch_cb_t *cbt_freelist = NULL; |
|
17
|
|
|
|
|
|
|
|
|
18
|
0
|
|
|
|
|
|
static ev_ch_cb_t* alloc_cbt(void) { |
|
19
|
|
|
|
|
|
|
ev_ch_cb_t *cbt; |
|
20
|
0
|
0
|
|
|
|
|
if (cbt_freelist) { |
|
21
|
0
|
|
|
|
|
|
cbt = cbt_freelist; |
|
22
|
0
|
|
|
|
|
|
cbt_freelist = *(ev_ch_cb_t **)cbt; |
|
23
|
|
|
|
|
|
|
} else { |
|
24
|
0
|
|
|
|
|
|
Newx(cbt, 1, ev_ch_cb_t); |
|
25
|
|
|
|
|
|
|
} |
|
26
|
|
|
|
|
|
|
/* Reset all fields — freelist may have stale values. */ |
|
27
|
0
|
|
|
|
|
|
cbt->cb = NULL; |
|
28
|
0
|
|
|
|
|
|
cbt->raw = 0; |
|
29
|
0
|
|
|
|
|
|
cbt->on_data = NULL; |
|
30
|
0
|
|
|
|
|
|
cbt->on_complete = NULL; |
|
31
|
0
|
|
|
|
|
|
cbt->query_timeout = 0; |
|
32
|
0
|
|
|
|
|
|
return cbt; |
|
33
|
|
|
|
|
|
|
} |
|
34
|
|
|
|
|
|
|
|
|
35
|
0
|
|
|
|
|
|
/* Return a callback-queue entry to cbt_freelist for later reuse.
 *
 * The entry is not freed: its leading pointer-sized bytes are overwritten
 * with the previous list head and the entry becomes the new head. Nothing
 * the entry's fields refer to is released here. */
static void release_cbt(ev_ch_cb_t *cbt) {
    ev_ch_cb_t **next_link = (ev_ch_cb_t **)cbt;

    *next_link = cbt_freelist;
    cbt_freelist = cbt;
}
|
39
|
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
/* Head of the singly linked list of released ev_ch_send_t records (pushed by
 * release_send, popped by alloc_send); NULL means the list is empty. */
static ev_ch_send_t *send_freelist = NULL; |
|
41
|
|
|
|
|
|
|
|
|
42
|
|
|
|
|
|
|
/* Iterator timeout watcher cb: just break the loop the iterator drove. */ |
|
43
|
0
|
|
|
|
|
|
static void iter_timeout_cb(EV_P_ ev_timer *w, int revents) { |
|
44
|
|
|
|
|
|
|
(void)w; (void)revents; |
|
45
|
0
|
|
|
|
|
|
ev_break(EV_A, EVBREAK_ONE); |
|
46
|
0
|
|
|
|
|
|
} |
|
47
|
|
|
|
|
|
|
|
|
48
|
|
|
|
|
|
|
/* No-op CV reference used as the callback for HTTP keepalive pings; |
|
49
|
|
|
|
|
|
|
* initialised once at BOOT and shared by all connections. */ |
|
50
|
|
|
|
|
|
|
/* Declared NULL here; the BOOT-time assignment is not part of this section. */
static SV *keepalive_noop_cb = NULL; |
|
51
|
|
|
|
|
|
|
|
|
52
|
0
|
|
|
|
|
|
static ev_ch_send_t* alloc_send(void) { |
|
53
|
|
|
|
|
|
|
ev_ch_send_t *s; |
|
54
|
0
|
0
|
|
|
|
|
if (send_freelist) { |
|
55
|
0
|
|
|
|
|
|
s = send_freelist; |
|
56
|
0
|
|
|
|
|
|
send_freelist = *(ev_ch_send_t **)s; |
|
57
|
|
|
|
|
|
|
} else { |
|
58
|
0
|
|
|
|
|
|
Newx(s, 1, ev_ch_send_t); |
|
59
|
|
|
|
|
|
|
} |
|
60
|
0
|
|
|
|
|
|
s->data = NULL; |
|
61
|
0
|
|
|
|
|
|
s->data_len = 0; |
|
62
|
0
|
|
|
|
|
|
s->cb = NULL; |
|
63
|
0
|
|
|
|
|
|
s->insert_data = NULL; |
|
64
|
0
|
|
|
|
|
|
s->insert_data_len = 0; |
|
65
|
0
|
|
|
|
|
|
s->insert_av = NULL; |
|
66
|
0
|
|
|
|
|
|
s->raw = 0; |
|
67
|
0
|
|
|
|
|
|
s->on_data = NULL; |
|
68
|
0
|
|
|
|
|
|
s->on_complete = NULL; |
|
69
|
0
|
|
|
|
|
|
s->query_timeout = 0; |
|
70
|
0
|
|
|
|
|
|
s->query_id = NULL; |
|
71
|
0
|
|
|
|
|
|
return s; |
|
72
|
|
|
|
|
|
|
} |
|
73
|
|
|
|
|
|
|
|
|
74
|
0
|
|
|
|
|
|
/* Return a send-queue entry to send_freelist for later reuse.
 *
 * query_id is passed through CLEAR_STR first (macro defined elsewhere;
 * presumably frees the string and NULLs the field -- confirm at its
 * definition). The entry's leading pointer-sized bytes are then
 * overwritten with the previous list head. No other field is released
 * here. */
static void release_send(ev_ch_send_t *s) {
    ev_ch_send_t **next_link;

    /* Must happen before the link overwrites the start of the record. */
    CLEAR_STR(s->query_id);

    next_link = (ev_ch_send_t **)s;
    *next_link = send_freelist;
    send_freelist = s;
}
|
79
|
|
|
|
|
|
|
|
|
80
|
|
|
|
|
|
|
/* Copy settings->{query_id} into s->query_id and apply query_timeout. */ |
|
81
|
0
|
|
|
|
|
|
static void send_apply_settings(ev_ch_send_t *s, HV *settings) { |
|
82
|
0
|
|
|
|
|
|
SV **svp = hv_fetch(settings, "query_id", 8, 0); |
|
83
|
0
|
0
|
|
|
|
|
if (svp && SvOK(*svp)) { |
|
|
|
0
|
|
|
|
|
|
|
84
|
|
|
|
|
|
|
STRLEN qlen; |
|
85
|
0
|
|
|
|
|
|
const char *qstr = SvPV(*svp, qlen); |
|
86
|
0
|
|
|
|
|
|
Newx(s->query_id, qlen + 1, char); |
|
87
|
0
|
|
|
|
|
|
Copy(qstr, s->query_id, qlen, char); |
|
88
|
0
|
|
|
|
|
|
s->query_id[qlen] = '\0'; |
|
89
|
|
|
|
|
|
|
} |
|
90
|
0
|
|
|
|
|
|
svp = hv_fetch(settings, "query_timeout", 13, 0); |
|
91
|
0
|
0
|
|
|
|
|
if (svp && SvOK(*svp)) s->query_timeout = SvNV(*svp); |
|
|
|
0
|
|
|
|
|
|
|
92
|
0
|
|
|
|
|
|
} |
|
93
|
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
/* If settings has params => { x => 1 }, return a new HV* copy with the |
|
95
|
|
|
|
|
|
|
* param keys flattened to param_x => '1'. Caller owns the returned HV |
|
96
|
|
|
|
|
|
|
* (SvREFCNT_dec it). Returns NULL if no params key — caller continues |
|
97
|
|
|
|
|
|
|
* to use the original settings hashref. */ |
|
98
|
0
|
|
|
|
|
|
static HV* expand_params(pTHX_ HV *settings) { |
|
99
|
0
|
|
|
|
|
|
SV **svp = hv_fetch(settings, "params", 6, 0); |
|
100
|
0
|
0
|
|
|
|
|
if (!svp || !SvROK(*svp) || SvTYPE(SvRV(*svp)) != SVt_PVHV) |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
101
|
0
|
|
|
|
|
|
return NULL; |
|
102
|
0
|
|
|
|
|
|
HV *phv = (HV *)SvRV(*svp); |
|
103
|
0
|
|
|
|
|
|
HV *copy = newHVhv(settings); |
|
104
|
|
|
|
|
|
|
HE *pe; |
|
105
|
0
|
|
|
|
|
|
hv_iterinit(phv); |
|
106
|
0
|
0
|
|
|
|
|
while ((pe = hv_iternext(phv))) { |
|
107
|
|
|
|
|
|
|
I32 pklen; |
|
108
|
0
|
|
|
|
|
|
char *pkey = hv_iterkey(pe, &pklen); |
|
109
|
0
|
|
|
|
|
|
SV *pval = hv_iterval(phv, pe); |
|
110
|
|
|
|
|
|
|
char *prefixed; |
|
111
|
0
|
|
|
|
|
|
Newx(prefixed, pklen + 7, char); |
|
112
|
0
|
|
|
|
|
|
Copy("param_", prefixed, 6, char); |
|
113
|
0
|
|
|
|
|
|
Copy(pkey, prefixed + 6, pklen, char); |
|
114
|
0
|
|
|
|
|
|
(void)hv_store(copy, prefixed, pklen + 6, newSVsv(pval), 0); |
|
115
|
0
|
|
|
|
|
|
Safefree(prefixed); |
|
116
|
|
|
|
|
|
|
} |
|
117
|
0
|
|
|
|
|
|
return copy; |
|
118
|
|
|
|
|
|
|
} |
|
119
|
|
|
|
|
|
|
|
|
120
|
|
|
|
|
|
|
/* Append send entry to queue and dispatch if idle. */ |
|
121
|
0
|
|
|
|
|
|
static void enqueue_send(ev_clickhouse_t *self, ev_ch_send_t *s) { |
|
122
|
0
|
|
|
|
|
|
ngx_queue_insert_tail(&self->send_queue, &s->queue); |
|
123
|
0
|
|
|
|
|
|
self->pending_count++; |
|
124
|
0
|
0
|
|
|
|
|
if (self->connected && self->callback_depth == 0) |
|
|
|
0
|
|
|
|
|
|
|
125
|
0
|
|
|
|
|
|
pipeline_advance(self); |
|
126
|
0
|
|
|
|
|
|
} |