File Coverage

xs/queues.c
Criterion Covered Total %
statement 0 72 0.0
branch 0 26 0.0
condition n/a
subroutine n/a
pod n/a
total 0 98 0.0


line stmt bran cond sub pod time code
1             /* --- freelist for cb_queue entries + send_queue entries ---
2             *
3             * Both use a singly-linked-list-in-the-record-itself stash: the first
4             * sizeof(void*) bytes of a released entry are repurposed as the "next
5             * free" pointer. Reuse the same struct without rebuilding it from
6             * scratch.
7             *
8             * Also lives here: the singleton sentinels (keepalive_noop_cb,
9             * iter_timeout_cb) and the small helpers that munge per-query
10             * settings into per-send fields.
11             *
12             * This file is #include'd from ClickHouse.xs as part of the single
13             * translation unit; symbols stay file-local-to-the-TU.
14             */
15              
16             static ev_ch_cb_t *cbt_freelist = NULL;
17              
18 0           static ev_ch_cb_t* alloc_cbt(void) {
19             ev_ch_cb_t *cbt;
20 0 0         if (cbt_freelist) {
21 0           cbt = cbt_freelist;
22 0           cbt_freelist = *(ev_ch_cb_t **)cbt;
23             } else {
24 0           Newx(cbt, 1, ev_ch_cb_t);
25             }
26             /* Reset all fields — freelist may have stale values. */
27 0           cbt->cb = NULL;
28 0           cbt->raw = 0;
29 0           cbt->on_data = NULL;
30 0           cbt->on_complete = NULL;
31 0           cbt->query_timeout = 0;
32 0           return cbt;
33             }
34              
35 0           static void release_cbt(ev_ch_cb_t *cbt) {
36 0           *(ev_ch_cb_t **)cbt = cbt_freelist;
37 0           cbt_freelist = cbt;
38 0           }
39              
40             static ev_ch_send_t *send_freelist = NULL;
41              
42             /* Iterator timeout watcher cb: just break the loop the iterator drove. */
43 0           static void iter_timeout_cb(EV_P_ ev_timer *w, int revents) {
44             (void)w; (void)revents;
45 0           ev_break(EV_A, EVBREAK_ONE);
46 0           }
47              
48             /* No-op CV reference used as the callback for HTTP keepalive pings;
49             * initialised once at BOOT and shared by all connections. */
50             static SV *keepalive_noop_cb = NULL;
51              
52 0           static ev_ch_send_t* alloc_send(void) {
53             ev_ch_send_t *s;
54 0 0         if (send_freelist) {
55 0           s = send_freelist;
56 0           send_freelist = *(ev_ch_send_t **)s;
57             } else {
58 0           Newx(s, 1, ev_ch_send_t);
59             }
60 0           s->data = NULL;
61 0           s->data_len = 0;
62 0           s->cb = NULL;
63 0           s->insert_data = NULL;
64 0           s->insert_data_len = 0;
65 0           s->insert_av = NULL;
66 0           s->raw = 0;
67 0           s->on_data = NULL;
68 0           s->on_complete = NULL;
69 0           s->query_timeout = 0;
70 0           s->query_id = NULL;
71 0           return s;
72             }
73              
74 0           static void release_send(ev_ch_send_t *s) {
75 0 0         CLEAR_STR(s->query_id);
76 0           *(ev_ch_send_t **)s = send_freelist;
77 0           send_freelist = s;
78 0           }
79              
80             /* Copy settings->{query_id} into s->query_id and apply query_timeout. */
81 0           static void send_apply_settings(ev_ch_send_t *s, HV *settings) {
82 0           SV **svp = hv_fetch(settings, "query_id", 8, 0);
83 0 0         if (svp && SvOK(*svp)) {
    0          
84             STRLEN qlen;
85 0           const char *qstr = SvPV(*svp, qlen);
86 0           Newx(s->query_id, qlen + 1, char);
87 0           Copy(qstr, s->query_id, qlen, char);
88 0           s->query_id[qlen] = '\0';
89             }
90 0           svp = hv_fetch(settings, "query_timeout", 13, 0);
91 0 0         if (svp && SvOK(*svp)) s->query_timeout = SvNV(*svp);
    0          
92 0           }
93              
94             /* If settings has params => { x => 1 }, return a new HV* copy with the
95             * param keys flattened to param_x => '1'. Caller owns the returned HV
96             * (SvREFCNT_dec it). Returns NULL if no params key — caller continues
97             * to use the original settings hashref. */
98 0           static HV* expand_params(pTHX_ HV *settings) {
99 0           SV **svp = hv_fetch(settings, "params", 6, 0);
100 0 0         if (!svp || !SvROK(*svp) || SvTYPE(SvRV(*svp)) != SVt_PVHV)
    0          
    0          
101 0           return NULL;
102 0           HV *phv = (HV *)SvRV(*svp);
103 0           HV *copy = newHVhv(settings);
104             HE *pe;
105 0           hv_iterinit(phv);
106 0 0         while ((pe = hv_iternext(phv))) {
107             I32 pklen;
108 0           char *pkey = hv_iterkey(pe, &pklen);
109 0           SV *pval = hv_iterval(phv, pe);
110             char *prefixed;
111 0           Newx(prefixed, pklen + 7, char);
112 0           Copy("param_", prefixed, 6, char);
113 0           Copy(pkey, prefixed + 6, pklen, char);
114 0           (void)hv_store(copy, prefixed, pklen + 6, newSVsv(pval), 0);
115 0           Safefree(prefixed);
116             }
117 0           return copy;
118             }
119              
120             /* Append send entry to queue and dispatch if idle. */
121 0           static void enqueue_send(ev_clickhouse_t *self, ev_ch_send_t *s) {
122 0           ngx_queue_insert_tail(&self->send_queue, &s->queue);
123 0           self->pending_count++;
124 0 0         if (self->connected && self->callback_depth == 0)
    0          
125 0           pipeline_advance(self);
126 0           }