File Coverage

c/pgconn.c
Criterion Covered Total %
statement 0 469 0.0
branch 0 534 0.0
condition n/a
subroutine n/a
pod n/a
total 0 1003 0.0


line stmt bran cond sub pod time code
1             #define FUPG_CACHE 1
2             #define FUPG_TEXT_PARAMS 2
3             #define FUPG_TEXT_RESULTS 4
4             #define FUPG_TEXT (FUPG_TEXT_PARAMS|FUPG_TEXT_RESULTS)
5              
6 0 0         KHASHL_MAP_INIT(KH_LOCAL, fupg_records, fupg_records, Oid, fupg_record *, kh_hash_uint32, kh_eq_generic);
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
7              
8             typedef struct fupg_prep fupg_prep;
9             struct fupg_prep {
10             khint_t hash; /* Cached kh_hash_str() of the query */
11             int ref; /* How many active $st objects are using this */
12             UV name;
13             fupg_prep *next, *prev; /* FIFO list for the LRU, only if ref=0 */
14             char *query;
15             PGresult *describe;
16             };
17              
18             #define fupg_prep_hash(p) ((p)->hash)
19             #define fupg_prep_eq(a, b) ((a)->hash == (b)->hash && strcmp((a)->query, (b)->query) == 0)
20 0 0         KHASHL_SET_INIT(KH_LOCAL, fupg_prepared, fupg_prepared, fupg_prep *, fupg_prep_hash, fupg_prep_eq);
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
21              
22 0           static void fupg_prep_destroy(fupg_prep *p) {
23 0           PQclear(p->describe);
24 0           safefree(p->query);
25 0           safefree(p);
26 0           }
27              
28             typedef struct {
29             const fupg_type *send, *recv;
30             SV *sendcb, *recvcb;
31             } fupg_override;
32              
33             #define fupg_name_hash(v) kh_hash_str((v).n)
34             #define fupg_name_eq(a,b) kh_eq_str((a).n, (b).n)
35 0 0         KHASHL_MAP_INIT(KH_LOCAL, fupg_oid_overrides, fupg_oid_overrides, Oid, fupg_override, kh_hash_uint32, kh_eq_generic);
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
36 0 0         KHASHL_MAP_INIT(KH_LOCAL, fupg_name_overrides, fupg_name_overrides, fupg_name, fupg_override, fupg_name_hash, fupg_name_eq);
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
  0 0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
37              
38              
39             typedef struct {
40             SV *self;
41             PGconn *conn;
42             SV *trace;
43             UV prep_counter;
44             UV cookie_counter;
45             UV cookie; /* currently active transaction object; 0 = none active */
46             int stflags;
47             int ntypes;
48             unsigned int prep_max;
49             unsigned int prep_cur; /* Number of prepared statements not associated with an active $st object */
50             fupg_type *types;
51             fupg_oid_overrides *oidtypes;
52             fupg_name_overrides *nametypes;
53             fupg_records *records;
54             fupg_prepared *prep_map;
55             fupg_prep *prep_head, *prep_tail; /* Inserted into head, removed at tail */
56             fustr buf; /* Scratch space for query params */
57             } fupg_conn;
58              
59              
60             typedef struct fupg_txn fupg_txn;
61             struct fupg_txn {
62             SV *self;
63             fupg_txn *parent;
64             fupg_conn *conn;
65             UV cookie; /* 0 means done */
66             int stflags;
67             char rollback_cmd[64];
68             };
69              
70              
71              
72              
73             /* Utilities */
74              
75 0           static SV *fupg_conn_errsv(PGconn *conn, const char *action) {
76             dTHX;
77 0           HV *hv = newHV();
78 0           hv_stores(hv, "action", newSVpv(action, 0));
79 0           hv_stores(hv, "severity", newSVpvs("FATAL")); /* Connection-related errors are always fatal */
80 0           hv_stores(hv, "message", newSVpv(PQerrorMessage(conn), 0));
81 0           return fu_croak_hv(hv, "FU::Pg::error", "FATAL: %s", PQerrorMessage(conn));
82             }
83              
84             __attribute__((noreturn))
85 0           static void fupg_conn_croak(fupg_conn *c, const char *action) {
86             dTHX;
87 0           croak_sv(fupg_conn_errsv(c->conn, action));
88             }
89              
90             /* Takes ownership of the PGresult and croaks. */
91             __attribute__((noreturn))
92 0           static void fupg_result_croak(PGresult *r, const char *action, const char *query) {
93             dTHX;
94 0           HV *hv = newHV();
95 0           hv_stores(hv, "action", newSVpv(action, 0));
96 0           char *s = PQresultErrorField(r, PG_DIAG_SEVERITY_NONLOCALIZED);
97 0 0         hv_stores(hv, "severity", newSVpv(s ? s : "FATAL", 0));
98 0 0         if (query) hv_stores(hv, "query", newSVpv(query, 0));
99              
100             /* If the PGresult is not an error, assume it's an unexpected resultStatus */
101 0           s = PQresultErrorField(r, PG_DIAG_MESSAGE_PRIMARY);
102 0 0         hv_stores(hv, "message", s ? newSVpv(s, 0) : newSVpvf("unexpected status code '%s'", PQresStatus(PQresultStatus(r))));
103              
104             /* I like the verbose error messages. Doesn't include anything that's not
105             * also fetched below, but saves me from having to do the formatting
106             * manually. */
107 0           char *verbose = NULL;
108 0 0         if (s) {
109 0           verbose = PQresultVerboseErrorMessage(r, PQERRORS_VERBOSE, PQSHOW_CONTEXT_ERRORS);
110 0 0         if (s) {
111 0           hv_stores(hv, "verbose_message", newSVpv(verbose, 0));
112 0           PQfreemem(verbose);
113             }
114             }
115              
116 0 0         if ((s = PQresultErrorField(r, PG_DIAG_MESSAGE_DETAIL))) hv_stores(hv, "detail", newSVpv(s, 0));
117 0 0         if ((s = PQresultErrorField(r, PG_DIAG_MESSAGE_HINT))) hv_stores(hv, "hint", newSVpv(s, 0));
118 0 0         if ((s = PQresultErrorField(r, PG_DIAG_STATEMENT_POSITION))) hv_stores(hv, "statement_position", newSVpv(s, 0));
119 0 0         if ((s = PQresultErrorField(r, PG_DIAG_INTERNAL_POSITION))) hv_stores(hv, "internal_position", newSVpv(s, 0));
120 0 0         if ((s = PQresultErrorField(r, PG_DIAG_INTERNAL_QUERY))) hv_stores(hv, "internal_query", newSVpv(s, 0));
121 0 0         if ((s = PQresultErrorField(r, PG_DIAG_CONTEXT))) hv_stores(hv, "context", newSVpv(s, 0));
122 0 0         if ((s = PQresultErrorField(r, PG_DIAG_SCHEMA_NAME))) hv_stores(hv, "schema_name", newSVpv(s, 0));
123 0 0         if ((s = PQresultErrorField(r, PG_DIAG_TABLE_NAME))) hv_stores(hv, "table_name", newSVpv(s, 0));
124 0 0         if ((s = PQresultErrorField(r, PG_DIAG_COLUMN_NAME))) hv_stores(hv, "column_name", newSVpv(s, 0));
125 0 0         if ((s = PQresultErrorField(r, PG_DIAG_DATATYPE_NAME))) hv_stores(hv, "datatype_name", newSVpv(s, 0));
126 0 0         if ((s = PQresultErrorField(r, PG_DIAG_CONSTRAINT_NAME))) hv_stores(hv, "constraint_name", newSVpv(s, 0));
127 0 0         if ((s = PQresultErrorField(r, PG_DIAG_SOURCE_FILE))) hv_stores(hv, "source_file", newSVpv(s, 0));
128 0 0         if ((s = PQresultErrorField(r, PG_DIAG_SOURCE_LINE))) hv_stores(hv, "source_line", newSVpv(s, 0));
129 0 0         if ((s = PQresultErrorField(r, PG_DIAG_SOURCE_FUNCTION))) hv_stores(hv, "source_function", newSVpv(s, 0));
130              
131 0           PQclear(r);
132 0 0         croak_sv(verbose
133             ? fu_croak_hv(hv, "FU::Pg::error", "%s", SvPV_nolen(*hv_fetchs(hv, "verbose_message", 0)))
134             : fu_croak_hv(hv, "FU::Pg::error", "%s: %s",
135             SvPV_nolen(*hv_fetchs(hv, "severity", 0)),
136             SvPV_nolen(*hv_fetchs(hv, "message", 0))
137             )
138             );
139             }
140              
141 0           static SV *fupg_exec_result(pTHX_ PGresult *r) {
142 0           SV *ret = &PL_sv_undef;
143 0           char *tup = PQcmdTuples(r);
144 0 0         if (tup && *tup) {
    0          
145 0           ret = sv_2mortal(newSVpv(tup, 0));
146 0           SvIV(ret);
147 0 0         SvIOK_only(ret);
148             }
149 0           return ret;
150             }
151              
152 0           static void fupg_exec_ok(fupg_conn *c, const char *sql) {
153 0           PGresult *r = PQexec(c->conn, sql);
154 0 0         if (!r) fupg_conn_croak(c, "exec");
155 0 0         if (PQresultStatus(r) != PGRES_COMMAND_OK) fupg_result_croak(r, "exec", sql);
156 0           PQclear(r);
157 0           }
158              
159              
160              
161              
162             /* Connection & transaction handling */
163              
164 0           static SV *fupg_connect(pTHX_ const char *str) {
165 0 0         if (!PQconnectdb) fupg_load();
166 0           PGconn *conn = PQconnectdb(str);
167 0 0         if (PQstatus(conn) != CONNECTION_OK) {
168 0           SV *sv = fupg_conn_errsv(conn, "connect");
169 0           PQfinish(conn);
170 0           croak_sv(sv);
171             }
172              
173 0           fupg_conn *c = safemalloc(sizeof(fupg_conn));
174 0           c->conn = conn;
175 0           c->trace = NULL;
176 0           c->prep_counter = c->cookie_counter = c->cookie = 0;
177 0           c->stflags = FUPG_CACHE;
178 0           c->ntypes = 0;
179 0           c->types = NULL;
180 0           c->records = fupg_records_init();
181 0           c->oidtypes = fupg_oid_overrides_init();
182 0           c->nametypes = fupg_name_overrides_init();
183 0           c->prep_cur = 0;
184 0           c->prep_max = 256;
185 0           c->prep_map = fupg_prepared_init();
186 0           c->prep_head = c->prep_tail = NULL;
187 0           fustr_init(&c->buf, NULL, SIZE_MAX);
188 0           return fu_selfobj(c, "FU::Pg::conn");
189             }
190              
191 0           static const char *fupg_conn_status(fupg_conn *c) {
192 0 0         if (PQstatus(c->conn) == CONNECTION_BAD) return "bad";
193 0           switch (PQtransactionStatus(c->conn)) {
194 0 0         case PQTRANS_IDLE: return c->cookie ? "txn_done" : "idle";
195 0           case PQTRANS_ACTIVE: return "active"; /* can't happen, we don't do async */
196 0           case PQTRANS_INTRANS: return "txn_idle";
197 0           case PQTRANS_INERROR: return "txn_error";
198 0           default: return "unknown";
199             }
200             }
201              
202 0           static void fupg_conn_disconnect(fupg_conn *c) {
203 0           PQfinish(c->conn);
204 0           c->conn = NULL;
205             /* We don't have an API to reconnect with the same $conn object, so no need
206             * to clean up the prepared statement cache at this point. */
207 0           }
208              
209 0           static void fupg_conn_destroy(pTHX_ fupg_conn *c) {
210 0           PQfinish(c->conn);
211 0 0         if (c->buf.sv) SvREFCNT_dec(c->buf.sv);
212 0           safefree(c->types);
213             khint_t k;
214              
215 0 0         kh_foreach(c->oidtypes, k) {
    0          
    0          
216 0           SvREFCNT_dec(kh_val(c->oidtypes, k).sendcb);
217 0           SvREFCNT_dec(kh_val(c->oidtypes, k).recvcb);
218             }
219 0           fupg_oid_overrides_destroy(c->oidtypes);
220              
221 0 0         kh_foreach(c->nametypes, k) {
    0          
    0          
222 0           SvREFCNT_dec(kh_val(c->nametypes, k).sendcb);
223 0           SvREFCNT_dec(kh_val(c->nametypes, k).recvcb);
224             }
225 0           fupg_name_overrides_destroy(c->nametypes);
226              
227 0 0         kh_foreach(c->records, k) safefree(kh_val(c->records, k));
    0          
    0          
228 0           fupg_records_destroy(c->records);
229              
230 0 0         kh_foreach(c->prep_map, k) fupg_prep_destroy(kh_key(c->prep_map, k));
    0          
    0          
231 0           fupg_prepared_destroy(c->prep_map);
232              
233 0           safefree(c);
234 0           }
235              
236 0           static SV *fupg_conn_txn(pTHX_ fupg_conn *c) {
237 0           fupg_exec_ok(c, "BEGIN");
238 0           fupg_txn *t = safecalloc(1, sizeof(fupg_txn));
239 0           t->conn = c;
240 0           t->cookie = c->cookie = ++c->cookie_counter;
241 0           t->stflags = c->stflags;
242 0           strcpy(t->rollback_cmd, "ROLLBACK");
243 0           SvREFCNT_inc(c->self);
244 0           return fu_selfobj(t, "FU::Pg::txn");
245             }
246              
247 0           static SV *fupg_txn_txn(pTHX_ fupg_txn *t) {
248             char cmd[64];
249 0           UV cookie = ++t->conn->cookie_counter;
250 0           snprintf(cmd, sizeof(cmd), "SAVEPOINT fupg_%"UVuf, cookie);
251 0           fupg_exec_ok(t->conn, cmd);
252              
253 0           fupg_txn *n = safecalloc(1, sizeof(fupg_txn));
254 0           n->conn = t->conn;
255 0           n->parent = t;
256 0           n->cookie = t->conn->cookie = cookie;
257 0           n->stflags = t->stflags;
258 0           snprintf(n->rollback_cmd, sizeof(n->rollback_cmd), "ROLLBACK TO SAVEPOINT fupg_%"UVuf, cookie);
259 0           SvREFCNT_inc(t->self);
260 0           return fu_selfobj(n, "FU::Pg::txn");
261             }
262              
263 0           static const char *fupg_txn_status(fupg_txn *t) {
264 0 0         if (PQstatus(t->conn->conn) == CONNECTION_BAD) return "bad";
265 0 0         if (!t->cookie) return "done";
266 0           int a = t->cookie == t->conn->cookie;
267 0           switch (PQtransactionStatus(t->conn->conn)) {
268 0           case PQTRANS_IDLE: return "done";
269 0           case PQTRANS_ACTIVE: return "active";
270 0 0         case PQTRANS_INTRANS: return a ? "idle" : "txn_idle";
271 0 0         case PQTRANS_INERROR: return a ? "error" : "txn_error";
272 0           default: return "unknown";
273             }
274             }
275              
276 0           static void fupg_txn_commit(fupg_txn *t) {
277             char cmd[64];
278 0 0         if (t->parent) snprintf(cmd, sizeof(cmd), "RELEASE SAVEPOINT fupg_%"UVuf, t->cookie);
279 0           else strcpy(cmd, "COMMIT");
280 0           t->cookie = 0;
281 0           fupg_exec_ok(t->conn, cmd);
282 0           }
283              
284 0           static void fupg_txn_rollback(fupg_txn *t) {
285 0           t->cookie = 0;
286 0           fupg_exec_ok(t->conn, t->rollback_cmd);
287 0           }
288              
289 0           static void fupg_txn_destroy(pTHX_ fupg_txn *t) {
290 0 0         if (t->cookie) {
291 0           PGresult *r = PQexec(t->conn->conn, t->rollback_cmd);
292             /* Can't really throw an error in DESTROY. If a rollback command fails,
293             * we're sufficiently screwed that the only sensible recourse is to
294             * disconnect and let any further operations throw an error. */
295 0 0         if (!r || PQresultStatus(r) != PGRES_COMMAND_OK)
    0          
296 0           fupg_conn_disconnect(t->conn);
297 0           PQclear(r);
298             }
299 0 0         if (t->parent) {
300 0           t->conn->cookie = t->parent->cookie;
301 0           SvREFCNT_dec(t->parent->self);
302             } else {
303 0           t->conn->cookie = 0;
304 0           SvREFCNT_dec(t->conn->self);
305             }
306 0           safefree(t);
307 0           }
308              
309              
310              
311              
312             /* Prepared statement caching */
313              
314 0           static void fupg_prepared_list_remove(fupg_conn *c, fupg_prep *p) {
315 0 0         if (p->next) p->next->prev = p->prev;
316 0 0         if (p->prev) p->prev->next = p->next;
317 0 0         if (c->prep_head == p) c->prep_head = p->next;
318 0 0         if (c->prep_tail == p) c->prep_tail = p->prev;
319 0           c->prep_cur--;
320 0           }
321              
322 0           static void fupg_prepared_list_unshift(fupg_conn *c, fupg_prep *p) {
323 0           p->next = c->prep_head;
324 0           p->prev = NULL;
325 0           c->prep_head = p;
326 0 0         if (p->next) p->next->prev = p;
327 0           else c->prep_tail = p;
328 0           c->prep_cur++;
329 0           }
330              
331 0           static void fupg_prepared_prune(fupg_conn *c) {
332 0 0         while (c->prep_cur > c->prep_max) {
333 0           fupg_prep *p = c->prep_tail;
334 0           fupg_prepared_list_remove(c, p);
335             assert(p->ref == 0);
336              
337 0           khint_t k = fupg_prepared_get(c->prep_map, p);
338             assert(k != kh_end(c->prep_map));
339 0           fupg_prepared_del(c->prep_map, k);
340              
341             char name[64];
342 0           snprintf(name, sizeof(name), "fupg%"UVuf, p->name);
343 0           PQclear(PQclosePrepared(c->conn, name));
344 0           fupg_prep_destroy(p);
345             }
346 0           }
347              
348             /* Fetch and ref a prepared statement, returns a new object if nothing was cached */
349 0           static fupg_prep *fupg_prepared_ref(pTHX_ fupg_conn *c, const char *query) {
350             fupg_prep prep;
351 0           prep.hash = kh_hash_str(query);
352 0           prep.query = (char *)query;
353 0           khint_t k = fupg_prepared_get(c->prep_map, &prep);
354             fupg_prep *p;
355              
356 0 0         if (k == kh_end(c->prep_map)) {
    0          
357 0           p = safecalloc(1, sizeof(*p));
358 0           p->hash = prep.hash;
359 0           p->query = savepv(query);
360 0           p->ref = 1;
361             int i;
362 0           fupg_prepared_put(c->prep_map, p, &i);
363              
364             } else {
365 0           p = kh_key(c->prep_map, k);
366 0 0         if (!p->ref++) fupg_prepared_list_remove(c, p);
367             }
368 0           return p;
369             }
370              
371 0           static void fupg_prepared_unref(fupg_conn *c, fupg_prep *p) {
372             assert(p->ref > 0);
373 0 0         if (!--p->ref) {
374 0           fupg_prepared_list_unshift(c, p);
375 0           fupg_prepared_prune(c);
376             }
377 0           }
378              
379              
380              
381              
382             /* Type handling */
383              
384 0           static const fupg_type *fupg_resolve_builtin(pTHX_ SV *name, SV **cb) {
385 0           SvGETMAGIC(name);
386 0           *cb = NULL;
387 0 0         if (!SvOK(name)) return NULL;
388              
389 0 0         if (SvROK(name)) {
390 0           SV *rv = SvRV(name);
391 0 0         if (SvTYPE(rv) == SVt_PVCV) {
392 0           *cb = SvREFCNT_inc(name);
393 0           return &fupg_type_perlcb;
394             }
395             }
396              
397             UV uv;
398 0           const char *pv = SvPV_nomg_nolen(name);
399 0 0         const fupg_type *t = grok_atoUV(pv, &uv, NULL) && uv <= (UV)UINT_MAX
400 0           ? fupg_builtin_byoid((Oid)uv)
401 0 0         : fupg_builtin_byname(pv);
402 0 0         if (!t) fu_confess("No builtin type found with oid or name '%s'", pv);
403 0           return t;
404             }
405              
406 0           static void fupg_set_type(pTHX_ fupg_conn *c, SV *name, SV *sendsv, SV *recvsv) {
407             fupg_override o;
408 0           o.send = fupg_resolve_builtin(aTHX_ sendsv, &o.sendcb);
409 0           o.recv = fupg_resolve_builtin(aTHX_ recvsv, &o.recvcb);
410 0 0         if ((o.send && o.send->send == fupg_send_array) || (o.recv && o.recv->recv == fupg_recv_array))
    0          
    0          
    0          
411 0           fu_confess("Cannot set a type to array, override the underlying element type instead");
412             /* Can't currently happen since we have no records in the builtin type
413             * list, but catch this just in case that changes. */
414 0 0         if ((o.send && o.send->send == fupg_send_record) || (o.recv && o.recv->recv == fupg_recv_record))
    0          
    0          
    0          
415 0           fu_confess("Cannot set a type to record");
416              
417             UV uv;
418             STRLEN len;
419 0           const char *pv = SvPV(name, len);
420             int k, absent;
421 0           fupg_override *so = NULL;
422 0 0         if (grok_atoUV(pv, &uv, NULL) && uv <= (UV)UINT_MAX) {
    0          
423 0           k = fupg_oid_overrides_put(c->oidtypes, (Oid)uv, &absent);
424 0           so = &kh_val(c->oidtypes, k);
425 0 0         } else if (len < sizeof(fupg_name)) {
426             fupg_name n;
427 0           strcpy(n.n, pv);
428 0           k = fupg_name_overrides_put(c->nametypes, n, &absent);
429 0           so = &kh_val(c->nametypes, k);
430             } else {
431 0           fu_confess("Invalid type oid or name '%s'", pv);
432             }
433 0 0         if (!absent) {
434 0           SvREFCNT_dec(so->sendcb);
435 0           SvREFCNT_dec(so->recvcb);
436             }
437 0           *so = o;
438 0           }
439              
440              
441             /* XXX: It feels a bit wasteful to load *all* types; even on an empty database
442             * that's ~55k of data, but it's easier and (potentially) faster than fetching
443             * each type seperately as we encounter them.
444             * Perhaps an easier optimization is to filter out all table-based composites
445             * and their array types by default, I've never seen anyone use those types for
446             * I/O and that would shrink the data by nearly a factor 5.
447             */
448 0           static void fupg_refresh_types(pTHX_ fupg_conn *c) {
449 0           safefree(c->types);
450 0           c->types = 0;
451 0           c->ntypes = 0;
452              
453 0           const char *sql =
454             "SELECT oid, typname, typtype"
455             ", CASE WHEN typtype = 'd' THEN typbasetype"
456             " WHEN typtype = 'c' THEN typrelid"
457             " WHEN typcategory = 'A' THEN typelem"
458             " ELSE 0 END"
459             " FROM pg_type"
460             " ORDER BY oid";
461 0           PGresult *r = PQexecParams(c->conn, sql, 0, NULL, NULL, NULL, NULL, 1);
462 0 0         if (!r) fupg_conn_croak(c, "exec");
463 0 0         if (PQresultStatus(r) != PGRES_TUPLES_OK) fupg_result_croak(r, "exec", sql);
464              
465 0           c->ntypes = PQntuples(r);
466 0           c->types = safecalloc(c->ntypes, sizeof(*c->types));
467             int i;
468 0 0         for (i=0; intypes; i++) {
469 0           fupg_type *t = c->types + i;
470 0           t->oid = fu_frombeU(32, PQgetvalue(r, i, 0));
471 0           snprintf(t->name.n, sizeof(t->name.n), "%s", PQgetvalue(r, i, 1));
472 0           char typ = *PQgetvalue(r, i, 2);
473 0           t->elemoid = fu_frombeU(32, PQgetvalue(r, i, 3));
474             const fupg_type *builtin;
475              
476 0 0         if (t->elemoid) {
477 0 0         if (typ == 'd') { /* domain */
478 0           t->send = fupg_send_domain;
479 0           t->recv = fupg_recv_domain;
480 0 0         } else if (typ == 'c') { /* composite type */
481 0           t->send = fupg_send_record;
482 0           t->recv = fupg_recv_record;
483             } else { /* array */
484 0           t->send = fupg_send_array;
485 0           t->recv = fupg_recv_array;
486             }
487 0 0         } else if (typ == 'e') {
488             /* enum, can use text send/recv */
489 0           t->send = fupg_send_text;
490 0           t->recv = fupg_recv_text;
491 0 0         } else if ((builtin = fupg_builtin_byoid(t->oid))) {
492 0           t->send = builtin->send;
493 0           t->recv = builtin->recv;
494 0 0         } else if ((builtin = fupg_dynoid_byname(t->name.n))) {
495 0           t->send = builtin->send;
496 0           t->recv = builtin->recv;
497             } else {
498             /* TODO: (multi)ranges */
499             }
500             }
501 0           PQclear(r);
502 0           }
503              
504 0           static const fupg_type *fupg_lookup_type(pTHX_ fupg_conn *c, int *refresh_done, Oid oid) {
505 0 0         if (oid == 0) return NULL;
506 0           const fupg_type *t = NULL;
507 0 0         if (c->types && (t = fupg_type_byoid(c->types, c->ntypes, oid))) return t;
    0          
508 0 0         if ((t = fupg_builtin_byoid(oid))) return t;
509 0 0         if (*refresh_done) return NULL;
510 0           *refresh_done = 1;
511 0           fupg_refresh_types(aTHX_ c);
512 0           return fupg_type_byoid(c->types, c->ntypes, oid);
513             }
514              
515              
516 0           static const fupg_record *fupg_lookup_record(fupg_conn *c, Oid oid) {
517 0           khint_t k = fupg_records_get(c->records, oid);
518 0 0         if (k != kh_end(c->records)) return kh_val(c->records, k);
    0          
519              
520 0           const char *sql =
521             "SELECT atttypid, attname"
522             " FROM pg_attribute"
523             " WHERE NOT attisdropped AND attnum > 0 AND attrelid = $1"
524             " ORDER BY attnum";
525             char buf[4];
526 0           fu_tobeU(32, buf, oid);
527 0           const char *abuf = buf;
528 0           int len = 4;
529 0           int format = 1;
530 0           PGresult *r = PQexecParams(c->conn, sql, 1, NULL, &abuf, &len, &format, 1);
531 0 0         if (!r) fupg_conn_croak(c, "exec");
532 0 0         if (PQresultStatus(r) != PGRES_TUPLES_OK) fupg_result_croak(r, "exec", sql);
533              
534 0           fupg_record *record = safemalloc(sizeof(*record) + PQntuples(r) * sizeof(*record->attrs));
535 0           record->nattrs = PQntuples(r);
536             int i;
537 0 0         for (i=0; inattrs; i++) {
538 0           record->attrs[i].oid = fu_frombeU(32, PQgetvalue(r, i, 0));
539 0           snprintf(record->attrs[i].name.n, sizeof(record->attrs->name.n), "%s", PQgetvalue(r, i, 1));
540             }
541 0           k = fupg_records_put(c->records, oid, &i);
542 0           kh_val(c->records, k) = record;
543 0           PQclear(r);
544 0           return record;
545             }
546              
547              
548             #define FUPGT_TEXT 1
549             #define FUPGT_SEND 2
550             #define FUPGT_RECV 4
551              
552 0           static const fupg_type *fupg_override_get(fupg_conn *c, int flags, Oid oid, const fupg_name *name, SV **cb) {
553             khint_t k;
554             fupg_override *o;
555 0 0         if (name == NULL) {
556 0           k = fupg_oid_overrides_get(c->oidtypes, oid);
557 0 0         o = k == kh_end(c->oidtypes) ? NULL : &kh_val(c->oidtypes, k);
    0          
558             } else {
559 0           k = fupg_name_overrides_get(c->nametypes, *name);
560 0 0         o = k == kh_end(c->nametypes) ? NULL : &kh_val(c->nametypes, k);
    0          
561             }
562 0 0         if (!o) return NULL;
563 0 0         *cb = flags & FUPGT_SEND ? o->sendcb : o->recvcb;
564 0 0         return flags & FUPGT_SEND ? o->send : o->recv;
565             }
566              
567 0           static void fupg_tio_setup(pTHX_ fupg_conn *conn, fupg_tio *tio, int flags, Oid oid, int *refresh_done) {
568 0           tio->oid = oid;
569 0 0         if (flags & FUPGT_TEXT) {
570 0           tio->name = "{textfmt}";
571 0           tio->send = fupg_send_text;
572 0           tio->recv = fupg_recv_text;
573 0           return;
574             }
575              
576             /* Minor wart? When the type is overridden by oid, its name in error
577             * messages will be that of the builtin type instead of the actual type. */
578              
579 0           SV *cb = NULL;
580             const fupg_type *e, *t;
581 0           e = t = fupg_override_get(conn, flags, oid, NULL, &cb);
582 0 0         if (!t) t = fupg_lookup_type(aTHX_ conn, refresh_done, oid);
583 0 0         if (!t) fu_confess("No type found with oid %u", oid);
584 0           tio->name = t->name.n;
585 0 0         if (!e && (e = fupg_override_get(conn, flags, 0, &t->name, &cb))) t = e;
    0          
586              
587 0 0         if (flags & FUPGT_SEND && !t->send) fu_confess("Unable to send type '%s' (oid %u)", tio->name, oid);
    0          
588 0 0         if (flags & FUPGT_RECV && !t->recv) fu_confess("Unable to receive type '%s' (oid %u)", tio->name, oid);
    0          
589              
590 0 0         if (flags & FUPGT_SEND ? t->send == fupg_send_domain : t->recv == fupg_recv_domain) {
    0          
591 0           e = fupg_lookup_type(aTHX_ conn, refresh_done, t->elemoid);
592 0 0         if (!e) fu_confess("Base type %u not found for domain '%s' (oid %u)", t->elemoid, tio->name, t->oid);
593 0           t = e;
594             }
595              
596 0           tio->send = t->send;
597 0           tio->recv = t->recv;
598              
599 0 0         if (flags & FUPGT_SEND ? tio->send == fupg_send_perlcb : tio->recv == fupg_recv_perlcb) {
    0          
600 0           tio->cb = cb;
601              
602 0 0         } else if (flags & FUPGT_SEND ? tio->send == fupg_send_array : tio->recv == fupg_recv_array) {
    0          
603 0           tio->arrayelem = safecalloc(1, sizeof(*tio->arrayelem));
604 0           fupg_tio_setup(aTHX_ conn, tio->arrayelem, flags, t->elemoid, refresh_done);
605              
606 0 0         } else if (flags & FUPGT_SEND ? tio->send == fupg_send_record : tio->recv == fupg_recv_record) {
    0          
607 0           tio->record.info = fupg_lookup_record(conn, t->elemoid);
608 0 0         if (!tio->record.info) fu_confess("Unable to find attributes for record type '%s' (oid %u, relid %u)", tio->name, t->oid, t->elemoid);
609 0           tio->record.tio = safecalloc(tio->record.info->nattrs, sizeof(*tio->record.tio));
610             int i;
611 0 0         for (i=0; irecord.info->nattrs; i++)
612 0           fupg_tio_setup(aTHX_ conn, tio->record.tio+i, flags, tio->record.info->attrs[i].oid, refresh_done);
613             }
614             }
615              
616 0           static void fupg_tio_free(fupg_tio *tio) {
617 0 0         if (!tio) return;
618             /* XXX: This assumes send/recv are the same types, at least for arrays & records */
619 0 0         if (tio->send == fupg_send_array) {
620 0           fupg_tio_free(tio->arrayelem);
621 0           safefree(tio->arrayelem);
622 0 0         } else if (tio->send == fupg_send_record) {
623             int i;
624 0 0         for (i=0; irecord.info->nattrs; i++)
625 0           fupg_tio_free(tio->record.tio+i);
626 0           safefree(tio->record.tio);
627             }
628             }
629              
630              
631              
632              
633 0           static SV *fupg_perl2bin(pTHX_ fupg_conn *conn, Oid oid, SV *sv) {
634 0           int refresh_done = 0;
635             fupg_tio tio;
636             fustr buf;
637 0           memset(&tio, 0, sizeof(tio));
638 0           fupg_tio_setup(aTHX_ conn, &tio, FUPGT_SEND, oid, &refresh_done);
639 0           fustr_init(&buf, sv_newmortal(), SIZE_MAX);
640 0           tio.send(aTHX_ &tio, sv, &buf); /* XXX: Leaks 'tio' on error */
641 0           fupg_tio_free(&tio);
642 0           return fustr_done(&buf);
643             }
644              
645 0           static SV *fupg_bin2perl(pTHX_ fupg_conn *conn, Oid oid, SV *sv) {
646 0           int refresh_done = 0;
647             fupg_tio tio;
648             STRLEN len;
649 0           const char *buf = SvPVbyte(sv, len);
650 0           memset(&tio, 0, sizeof(tio));
651 0           fupg_tio_setup(aTHX_ conn, &tio, FUPGT_RECV, oid, &refresh_done);
652 0           SV *r = tio.recv(aTHX_ &tio, buf, len); /* XXX: Leaks 'tio' on error */
653 0           fupg_tio_free(&tio);
654 0           return r;
655             }
656              
657              
658 0           static I32 fupg_bintext(pTHX_ fupg_conn *conn, int format, I32 ax, I32 argc) {
659 0           int vals = argc/2;
660              
661 0 0         if (argc == 1 || argc % 2 == 0) croak("Usage: $conn->%s(oid, data, ...)", format ? "text2bin" : "bin2text");
    0          
    0          
662 0 0         if (vals > 1 && GIMME_V != G_LIST) {
    0          
663 0           ST(0) = sv_2mortal(newSViv(vals));
664 0           return 1;
665             }
666              
667 0           Oid *paramtypes = safemalloc(vals * sizeof(*paramtypes));
668 0           const char **paramvalues = safemalloc(vals * sizeof(*paramvalues));
669 0           int *paramlengths = safemalloc(vals * sizeof(*paramlengths));
670 0           int *paramformats = safemalloc(vals * sizeof(*paramformats));
671              
672             fustr sql;
673 0           fustr_init(&sql, NULL, SIZE_MAX);
674 0           fustr_write(&sql, "SELECT ", 7);
675              
676             STRLEN len;
677             int i;
678 0 0         for (i=0; i
679 0           paramtypes[i] = SvIV(ST(i*2+1));
680 0 0         paramvalues[i] = format ? SvPVutf8(ST(i*2+2), len) : SvPVbyte(ST(i*2+2), len);
681 0           paramlengths[i] = len;
682 0           paramformats[i] = format ? 0 : 1;
683 0 0         if (i) fustr_write_ch(&sql, ',');
684 0           sql.cur -= 8 - sprintf(fustr_write_buf(&sql, 8), "$%d", i+1);
685             }
686 0           fustr_write_ch(&sql, 0);
687              
688 0 0         PGresult *r = PQexecParams(conn->conn, fustr_start(&sql), vals,
689             paramtypes, paramvalues, paramlengths, paramformats, format);
690 0           safefree(paramtypes);
691 0           safefree(paramvalues);
692 0           safefree(paramlengths);
693 0           safefree(paramformats);
694 0           SvREFCNT_dec(sql.sv);
695              
696 0 0         if (!r) fupg_conn_croak(conn, "exec");
697 0 0         if (PQresultStatus(r) != PGRES_TUPLES_OK) fupg_result_croak(r, "exec", sql.sv ? "SELECT $1, ..." : sql.sbuf);
    0          
698              
699             /* The stack is guaranteed to be large enough, since we received 1+2*vals arguments */
700 0 0         for (i=0; i
701 0 0         ST(i) = newSVpvn_flags(PQgetvalue(r, 0, i), PQgetlength(r, 0, i), SVs_TEMP | (format ? 0 : SVf_UTF8));
702              
703 0           PQclear(r);
704 0           return vals;
705             }