File Coverage

c/pgst.c
Criterion Covered Total %
statement 0 395 0.0
branch 0 256 0.0
condition n/a
subroutine n/a
pod n/a
total 0 651 0.0


line stmt bran cond sub pod time code
1             typedef struct {
2             /* Set on creation */
3             SV *self; /* (unused, but whatever) */
4             fupg_conn *conn; /* has a refcnt on conn->self */
5             UV cookie;
6             char *query;
7             SV **bind;
8             int nbind;
9             int stflags;
10              
11             /* Set during prepare */
12             int prepared;
13             char name[32];
14             fupg_prep *prep;
15             double preptime;
16             PGresult *describe; /* shared with prep->describe if prep is set */
17              
18             /* Set during execute */
19             int nfields;
20             const char **param_values; /* Points into conn->buf or st->bind SVs, may be invalid after exec */
21             int *param_lengths;
22             int *param_formats;
23             double exectime;
24             fupg_tio send;
25             fupg_tio *recv;
26             PGresult *result;
27             } fupg_st;
28              
29              
30 0           static void fupg_tracecb(pTHX_ fupg_st *st) {
31 0 0         if (!st->conn->trace) return;
32 0           dSP;
33              
34 0           ENTER;
35 0           SAVETMPS;
36              
37 0 0         PUSHMARK(SP);
38 0 0         mXPUSHs(sv_bless(newRV_inc(st->self), gv_stashpv("FU::Pg::st", GV_ADD)));
39 0           PUTBACK;
40              
41 0           call_sv(st->conn->trace, G_DISCARD);
42 0 0         FREETMPS;
43 0           LEAVE;
44             }
45              
46 0           static SV *fupg_exec(pTHX_ fupg_conn *c, const char *sql) {
47             struct timespec t_start;
48 0           clock_gettime(CLOCK_MONOTONIC, &t_start);
49 0           PGresult *r = PQexec(c->conn, sql);
50             struct timespec t_end;
51 0           clock_gettime(CLOCK_MONOTONIC, &t_end);
52              
53 0 0         if (!r) fupg_conn_croak(c, "exec");
54 0 0         switch (PQresultStatus(r)) {
55 0           case PGRES_EMPTY_QUERY:
56             case PGRES_COMMAND_OK:
57 0           case PGRES_TUPLES_OK: break;
58 0           default: fupg_result_croak(r, "exec", sql);
59             }
60              
61 0           SV *ret = fupg_exec_result(aTHX_ r);
62 0 0         if (c->trace) {
63 0           fupg_st *st = safecalloc(1, sizeof(*st));
64 0           st->conn = c;
65 0           SvREFCNT_inc(c->self);
66 0           st->cookie = c->cookie;
67 0           st->query = savepv(sql);
68 0           st->stflags = c->stflags;
69 0           st->result = r;
70 0           st->exectime = fu_timediff(&t_end, &t_start);
71 0           fu_selfobj(st, "FU::Pg::st");
72 0           fupg_tracecb(aTHX_ st);
73             } else {
74 0           PQclear(r);
75             }
76 0           return ret;
77             }
78              
79 0           static SV *fupg_sql(pTHX_ fupg_conn *c, int stflags, const char *query, I32 ax, I32 argc) {
80 0           fupg_st *st = safecalloc(1, sizeof(fupg_st));
81 0           st->conn = c;
82 0           st->cookie = c->cookie;
83 0           st->stflags = stflags;
84 0           SvREFCNT_inc(c->self);
85              
86 0           st->query = savepv(query);
87 0 0         if (argc > 2) {
88 0           st->bind = safemalloc((argc-2) * sizeof(SV *));
89             I32 i;
90 0 0         for (i=2; i < argc; i++) {
91 0           st->bind[st->nbind] = newSV(0);
92 0           sv_setsv(st->bind[st->nbind], ST(i));
93 0           st->nbind++;
94             }
95             }
96              
97 0           return fu_selfobj(st, "FU::Pg::st");
98             }
99              
100 0           static void fupg_st_destroy(pTHX_ fupg_st *st) {
101             int i;
102              
103 0 0         if (st->prep) {
104 0           fupg_prepared_unref(st->conn, st->prep);
105 0 0         } else if (st->prepared) {
106 0           PQclear(st->describe);
107 0           PQclear(PQclosePrepared(st->conn->conn, st->name));
108             }
109              
110 0           safefree(st->query);
111 0 0         for (i=0; i < st->nbind; i++) SvREFCNT_dec(st->bind[i]);
112 0           safefree(st->bind);
113 0           safefree(st->param_values);
114 0           safefree(st->param_lengths);
115 0           safefree(st->param_formats);
116 0 0         if (st->recv) for (i=0; infields; i++) fupg_tio_free(st->recv + i);
    0          
117 0           fupg_tio_free(&st->send);
118 0           safefree(st->recv);
119 0           PQclear(st->result);
120 0           SvREFCNT_dec(st->conn->self);
121 0           safefree(st);
122 0           }
123              
124 0           static void fupg_st_prepare(pTHX_ fupg_st *st) {
125 0 0         if (st->describe) return;
126 0 0         if (st->prepared) fu_confess("invalid attempt to re-prepare invalid statement");
127 0 0         if (st->result) fu_confess("invalid attempt to prepare already executed statement");
128              
129 0 0         if (st->stflags & FUPG_CACHE)
130 0           st->prep = fupg_prepared_ref(aTHX_ st->conn, st->query);
131 0 0         if (st->prep && st->prep->describe) {
    0          
132 0           snprintf(st->name, sizeof(st->name), "fupg%"UVuf, st->prep->name);
133 0           st->describe = st->prep->describe;
134 0           st->prepared = 1;
135 0           return;
136             }
137              
138 0           st->conn->prep_counter++;
139 0 0         if (st->prep) st->prep->name = st->conn->prep_counter;
140 0           snprintf(st->name, sizeof(st->name), "fupg%"UVuf, st->conn->prep_counter);
141              
142             struct timespec t_start;
143 0           clock_gettime(CLOCK_MONOTONIC, &t_start);
144              
145             /* Send prepare + describe in a pipeline to avoid a double round-trip with the server */
146 0           PQenterPipelineMode(st->conn->conn);
147 0           PQsendPrepare(st->conn->conn, st->name, st->query, 0, NULL);
148 0           PQsendDescribePrepared(st->conn->conn, st->name);
149 0           PQpipelineSync(st->conn->conn);
150 0           PGresult *prep = PQgetResult(st->conn->conn); PQgetResult(st->conn->conn); /* NULL */
151 0           PGresult *desc = PQgetResult(st->conn->conn); PQgetResult(st->conn->conn); /* NULL */
152 0           PGresult *sync = PQgetResult(st->conn->conn);
153 0           PQexitPipelineMode(st->conn->conn);
154              
155 0 0         if (!prep) {
156 0           PQclear(desc); PQclear(sync);
157 0           fupg_conn_croak(st->conn , "prepare");
158             }
159 0 0         if (PQresultStatus(prep) != PGRES_COMMAND_OK) {
160 0           PQclear(desc); PQclear(sync);
161 0           fupg_result_croak(prep, "prepare", st->query);
162             }
163 0           PQclear(prep);
164 0           st->prepared = 1;
165              
166             struct timespec t_end;
167 0           clock_gettime(CLOCK_MONOTONIC, &t_end);
168 0           st->preptime = fu_timediff(&t_end, &t_start);
169              
170 0 0         if (!desc) {
171 0           PQclear(sync);
172 0           fupg_conn_croak(st->conn , "prepare");
173             }
174 0 0         if (PQresultStatus(desc) != PGRES_COMMAND_OK) {
175 0           PQclear(sync);
176 0           fupg_result_croak(desc, "prepare", st->query);
177             }
178 0 0         if (st->prep) st->prep->describe = desc;
179 0           st->describe = desc;
180              
181 0 0         if (!sync) fupg_conn_croak(st->conn , "prepare");
182 0 0         if (PQresultStatus(sync) != PGRES_PIPELINE_SYNC)
183 0           fupg_result_croak(sync, "prepare", st->query);
184 0           PQclear(sync);
185             }
186              
187 0           static SV *fupg_st_param_types(pTHX_ fupg_st *st) {
188 0 0         if (st->result && !st->describe)
    0          
189 0           return sv_2mortal(newRV_noinc((SV *)newAV()));
190 0           fupg_st_prepare(aTHX_ st);
191 0           int i, nparams = PQnparams(st->describe);
192 0 0         AV *av = nparams == 0 ? newAV() : newAV_alloc_x(nparams);
193 0 0         for (i=0; i
194 0           av_push_simple(av, newSViv(PQparamtype(st->describe, i)));
195 0           return sv_2mortal(newRV_noinc((SV *)av));
196             }
197              
198 0           static SV *fupg_st_param_values(pTHX_ fupg_st *st) {
199             int i;
200 0 0         AV *av = st->nbind == 0 ? newAV() : newAV_alloc_x(st->nbind);
201 0 0         for (i=0; inbind; i++)
202 0           av_push_simple(av, SvREFCNT_inc(st->bind[i]));
203 0           return sv_2mortal(newRV_noinc((SV *)av));
204             }
205              
206 0           static SV *fupg_st_columns(pTHX_ fupg_st *st) {
207 0           PGresult *r = st->result;
208 0 0         if (!r) {
209 0           fupg_st_prepare(aTHX_ st);
210 0           r = st->describe;
211             }
212 0           int i, nfields = PQnfields(r);
213 0 0         AV *av = nfields == 0 ? newAV() : newAV_alloc_x(nfields);
214 0 0         for (i=0; i
215 0           HV *hv = newHV();
216 0           const char *name = PQfname(r, i);
217 0           hv_stores(hv, "name", newSVpvn_utf8(name, strlen(name), 1));
218 0           hv_stores(hv, "oid", newSViv(PQftype(r, i)));
219 0           int tmod = PQfmod(r, i);
220 0 0         if (tmod >= 0) hv_stores(hv, "typemod", newSViv(tmod));
221 0           av_push_simple(av, newRV_noinc((SV *)hv));
222             }
223 0           return sv_2mortal(newRV_noinc((SV *)av));
224             }
225              
226 0           static void fupg_params_setup(pTHX_ fupg_st *st, int *refresh_done) {
227             int i;
228 0           st->param_values = safecalloc(st->nbind, sizeof(*st->param_values));
229 0 0         if (st->stflags & FUPG_TEXT_PARAMS) {
230 0 0         for (i=0; inbind; i++)
231 0 0         st->param_values[i] = !SvOK(st->bind[i]) ? NULL : SvPVutf8_nolen(st->bind[i]);
232 0           return;
233             }
234              
235 0           fustr *buf = &st->conn->buf;
236 0 0         buf->cur = fustr_start(buf);
237 0           st->param_lengths = safecalloc(st->nbind, sizeof(*st->param_lengths));
238 0           st->param_formats = safecalloc(st->nbind, sizeof(*st->param_formats));
239 0           size_t off = 0;
240 0 0         for (i=0; inbind; i++) {
241 0 0         if (!SvOK(st->bind[i])) {
242 0           st->param_values[i] = NULL;
243 0           continue;
244             }
245 0           fupg_tio_setup(aTHX_ st->conn, &st->send,
246 0           FUPGT_SEND | (st->stflags & FUPG_TEXT_PARAMS ? FUPGT_TEXT : 0),
247 0           PQparamtype(st->describe, i), refresh_done);
248 0 0         off = fustr_len(buf);
249 0           st->send.send(aTHX_ &st->send, st->bind[i], buf);
250 0           fupg_tio_free(&st->send);
251 0           memset(&st->send, 0, sizeof(st->send));
252              
253 0 0         st->param_lengths[i] = fustr_len(buf) - off;
254 0           st->param_formats[i] = 1;
255 0           st->param_values[i] = "";
256             /* Don't write param_values here, the buffer may be invalidated when writing the next param */
257             }
258 0           off = 0;
259 0 0         buf->cur = fustr_start(buf);
260 0 0         for (i=0; inbind; i++) {
261 0 0         if (st->param_values[i]) {
262 0           st->param_values[i] = buf->cur + off;
263 0           off += st->param_lengths[i];
264             }
265             }
266             }
267              
268 0           static void fupg_st_execute(pTHX_ fupg_st *st) {
269             /* Disallow fetching the results more than once. I don't see a reason why
270             * someone would need that and disallowing it leaves room for fetching the
271             * results in a streaming fashion without breaking API compat. */
272 0 0         if (st->result) fu_confess("Invalid attempt to execute statement multiple times");
273              
274             /* Whether we can do a direct call or need to prepare first */
275 0 0         int direct = !st->describe && (st->nbind == 0 || st->stflags & FUPG_TEXT_PARAMS) && !(st->stflags & FUPG_CACHE);
    0          
    0          
    0          
276 0 0         if (!direct) {
277 0           fupg_st_prepare(aTHX_ st);
278 0 0         if (PQnparams(st->describe) != st->nbind)
279 0           fu_confess("Statement expects %d bind parameters but %d were given", PQnparams(st->describe), st->nbind);
280             }
281 0           int refresh_done = 0;
282 0           fupg_params_setup(aTHX_ st, &refresh_done);
283              
284             /* I'm not super fond of this approach. Storing the full query results in a
285             * PGresult involves unnecessary parsing, memory allocation and copying.
286             * The wire protocol is sufficiently simple that I could parse the query
287             * results directly from the network buffers without much additional code,
288             * and that would be much more efficient. Alas, libpq doesn't let me do
289             * that.
290             * There is the option of fetching results in chunked mode, but from what I
291             * gather that just saves a bit of memory in exchange for more and smaller
292             * malloc()/free()'s. Performance-wise, it probably won't be much of an
293             * improvement */
294             struct timespec t_start;
295 0           clock_gettime(CLOCK_MONOTONIC, &t_start);
296 0           PGresult *r = direct ? PQexecParams(st->conn->conn,
297 0           st->query, st->nbind, NULL,
298 0           (const char * const *)st->param_values,
299 0           st->param_lengths, st->param_formats,
300 0           st->stflags & FUPG_TEXT_RESULTS ? 0 : 1
301 0 0         ) : PQexecPrepared(st->conn->conn,
302 0           st->name, st->nbind,
303 0           (const char * const *)st->param_values,
304 0           st->param_lengths, st->param_formats,
305 0           st->stflags & FUPG_TEXT_RESULTS ? 0 : 1
306             );
307             struct timespec t_end;
308 0           clock_gettime(CLOCK_MONOTONIC, &t_end);
309 0           st->exectime = fu_timediff(&t_end, &t_start);
310              
311 0 0         if (!r) fupg_conn_croak(st->conn , "exec");
312 0 0         switch (PQresultStatus(r)) {
313 0           case PGRES_COMMAND_OK:
314 0           case PGRES_TUPLES_OK: break;
315 0           default: fupg_result_croak(r, "exec", st->query);
316             }
317 0           st->result = r;
318              
319 0           st->nfields = PQnfields(r);
320 0           st->recv = safecalloc(st->nfields, sizeof(*st->recv));
321             int i;
322 0 0         for (i=0; infields; i++)
323 0           fupg_tio_setup(aTHX_ st->conn, st->recv + i,
324 0           FUPGT_RECV | (st->stflags & FUPG_TEXT_RESULTS ? FUPGT_TEXT : 0),
325 0           PQftype(st->result, i), &refresh_done);
326              
327 0           fupg_tracecb(aTHX_ st);
328 0           }
329              
330 0           static SV *fupg_st_getval(pTHX_ fupg_st *st, int row, int col) {
331 0           PGresult *r = st->result;
332 0 0         if (PQgetisnull(r, row, col)) return newSV(0);
333 0           const fupg_tio *ctx = st->recv+col;
334 0           return ctx->recv(aTHX_ ctx, PQgetvalue(r, row, col), PQgetlength(r, row, col));
335             }
336              
337 0           static void fupg_st_check_dupcols(pTHX_ fupg_st *st, int start) {
338 0           PGresult *r = st->result;
339 0           HV *hv = newHV();
340 0           sv_2mortal((SV *)hv);
341 0           int i, nfields = PQnfields(r);
342 0 0         for (i=start; i
343 0           const char *key = PQfname(r, i);
344 0           int len = -strlen(key);
345 0 0         if (hv_exists(hv, key, len))
346 0           fu_confess("Query returns multiple columns with the same name ('%s')", key);
347 0           hv_store(hv, key, len, &PL_sv_yes, 0);
348             }
349 0           }
350              
351              
352              
353              
354             /* Result fetching */
355              
356 0           static SV *fupg_st_exec(pTHX_ fupg_st *st) {
357 0           fupg_st_execute(aTHX_ st);
358 0           return fupg_exec_result(aTHX_ st->result);
359             }
360              
361 0           static SV *fupg_st_val(pTHX_ fupg_st *st) {
362 0           fupg_st_execute(aTHX_ st);
363 0 0         if (st->nfields > 1) fu_confess("Invalid use of $st->val() on query returning more than one column");
364 0 0         if (st->nfields == 0) fu_confess("Invalid use of $st->val() on query returning no data");
365 0 0         if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->val() on query returning more than one row");
366 0 0         SV *sv = PQntuples(st->result) == 0 ? newSV(0) : fupg_st_getval(aTHX_ st, 0, 0);
367 0           return sv_2mortal(sv);
368             }
369              
370 0           static I32 fupg_st_rowl(pTHX_ fupg_st *st, I32 ax) {
371 0           dSP;
372 0           fupg_st_execute(aTHX_ st);
373 0 0         if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowl() on query returning more than one row");
374 0 0         int nfields = PQntuples(st->result) == 0 ? 0 : st->nfields;
375 0 0         if (GIMME_V != G_LIST) {
376 0           ST(0) = sv_2mortal(newSViv(nfields));
377 0           return 1;
378             }
379 0           (void)POPs;
380 0 0         EXTEND(SP, nfields);
    0          
381             int i;
382 0 0         for (i=0; i
383 0           return nfields;
384             }
385              
386 0           static SV *fupg_st_rowa(pTHX_ fupg_st *st) {
387 0           fupg_st_execute(aTHX_ st);
388 0 0         if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowl() on query returning more than one row");
389 0 0         if (PQntuples(st->result) == 0) return &PL_sv_undef;
390 0 0         AV *av = st->nfields == 0 ? newAV() : newAV_alloc_x(st->nfields);
391 0           SV *sv = sv_2mortal(newRV_noinc((SV *)av));
392             int i;
393 0 0         for (i=0; infields; i++) av_push_simple(av, fupg_st_getval(aTHX_ st, 0, i));
394 0           return sv;
395             }
396              
397 0           static SV *fupg_st_rowh(pTHX_ fupg_st *st) {
398 0           fupg_st_execute(aTHX_ st);
399 0           fupg_st_check_dupcols(aTHX_ st, 0);
400 0 0         if (PQntuples(st->result) > 1) fu_confess("Invalid use of $st->rowh() on query returning more than one row");
401 0 0         if (PQntuples(st->result) == 0) return &PL_sv_undef;
402 0           HV *hv = newHV();
403 0           SV *sv = sv_2mortal(newRV_noinc((SV *)hv));
404             int i;
405 0 0         for (i=0; infields; i++) {
406 0           const char *key = PQfname(st->result, i);
407 0           hv_store(hv, key, -strlen(key), fupg_st_getval(aTHX_ st, 0, i), 0);
408             }
409 0           return sv;
410             }
411              
412 0           static SV *fupg_st_alla(pTHX_ fupg_st *st) {
413 0           fupg_st_execute(aTHX_ st);
414 0           int i, j, nrows = PQntuples(st->result);
415 0 0         AV *av = nrows == 0 ? newAV() : newAV_alloc_x(nrows);
416 0           SV *sv = sv_2mortal(newRV_noinc((SV *)av));
417 0 0         for (i=0; i
418 0 0         AV *row = st->nfields == 0 ? newAV() : newAV_alloc_x(st->nfields);
419 0           av_push_simple(av, newRV_noinc((SV *)row));
420 0 0         for (j=0; jnfields; j++)
421 0           av_push_simple(row, fupg_st_getval(aTHX_ st, i, j));
422             }
423 0           return sv;
424             }
425              
426 0           static SV *fupg_st_allh(pTHX_ fupg_st *st) {
427 0           fupg_st_execute(aTHX_ st);
428 0           fupg_st_check_dupcols(aTHX_ st, 0);
429 0           int i, j, nrows = PQntuples(st->result);
430 0 0         AV *av = nrows == 0 ? newAV() : newAV_alloc_x(nrows);
431 0           SV *sv = sv_2mortal(newRV_noinc((SV *)av));
432 0 0         for (i=0; i
433 0           HV *row = newHV();
434 0           av_push_simple(av, newRV_noinc((SV *)row));
435 0 0         for (j=0; jnfields; j++) {
436 0           const char *key = PQfname(st->result, j);
437 0           hv_store(row, key, -strlen(key), fupg_st_getval(aTHX_ st, i, j), 0);
438             }
439             }
440 0           return sv;
441             }
442              
443 0           static SV *fupg_st_flat(pTHX_ fupg_st *st) {
444 0           fupg_st_execute(aTHX_ st);
445 0           int i, j, nrows = PQntuples(st->result);
446 0 0         AV *av = nrows == 0 || st->nfields == 0 ? newAV() : newAV_alloc_x(nrows * st->nfields);
    0          
447 0           SV *sv = sv_2mortal(newRV_noinc((SV *)av));
448 0 0         for (i=0; i
449 0 0         for (j=0; jnfields; j++)
450 0           av_push_simple(av, fupg_st_getval(aTHX_ st, i, j));
451             }
452 0           return sv;
453             }
454              
455 0           static SV *fupg_st_kvv(pTHX_ fupg_st *st) {
456 0           fupg_st_execute(aTHX_ st);
457 0 0         if (st->nfields > 2) fu_confess("Invalid use of $st->kvv() on query returning more than two columns");
458 0 0         if (st->nfields == 0) fu_confess("Invalid use of $st->kvv() on query returning no data");
459 0           int i, nrows = PQntuples(st->result);
460 0           HV *hv = newHV();
461 0           SV *sv = sv_2mortal(newRV_noinc((SV *)hv));
462 0 0         for (i=0; i
463 0           SAVETMPS;
464 0           SV *key = sv_2mortal(fupg_st_getval(aTHX_ st, i, 0));
465 0 0         if (hv_exists_ent(hv, key, 0)) fu_confess("Key '%s' is duplicated in $st->kvv() query results", SvPV_nolen(key));
466 0 0         hv_store_ent(hv, key, st->nfields == 1 ? newSV_true() : fupg_st_getval(aTHX_ st, i, 1), 0);
467 0 0         FREETMPS;
468             }
469 0           return sv;
470             }
471              
472 0           static SV *fupg_st_kva(pTHX_ fupg_st *st) {
473 0           fupg_st_execute(aTHX_ st);
474 0 0         if (st->nfields == 0) fu_confess("Invalid use of $st->kva() on query returning no data");
475 0           int i, j, nrows = PQntuples(st->result);
476 0           HV *hv = newHV();
477 0           SV *sv = sv_2mortal(newRV_noinc((SV *)hv));
478 0 0         for (i=0; i
479 0           SAVETMPS;
480 0           SV *key = sv_2mortal(fupg_st_getval(aTHX_ st, i, 0));
481 0 0         if (hv_exists_ent(hv, key, 0)) fu_confess("Key '%s' is duplicated in $st->kva() query results", SvPV_nolen(key));
482 0 0         AV *row = st->nfields == 1 ? newAV() : newAV_alloc_x(st->nfields-1);
483 0           hv_store_ent(hv, key, newRV_noinc((SV *)row), 0);
484 0 0         FREETMPS;
485 0 0         for (j=1; jnfields; j++)
486 0           av_push_simple(row, fupg_st_getval(aTHX_ st, i, j));
487             }
488 0           return sv;
489             }
490              
491 0           static SV *fupg_st_kvh(pTHX_ fupg_st *st) {
492 0           fupg_st_execute(aTHX_ st);
493 0           fupg_st_check_dupcols(aTHX_ st, 1);
494 0 0         if (st->nfields == 0) fu_confess("Invalid use of $st->kvh() on query returning no data");
495 0           int i, j, nrows = PQntuples(st->result);
496 0           HV *hv = newHV();
497 0           SV *sv = sv_2mortal(newRV_noinc((SV *)hv));
498 0 0         for (i=0; i
499 0           SAVETMPS;
500 0           SV *key = sv_2mortal(fupg_st_getval(aTHX_ st, i, 0));
501 0 0         if (hv_exists_ent(hv, key, 0)) fu_confess("Key '%s' is duplicated in $st->kvh() query results", SvPV_nolen(key));
502 0           HV *row = newHV();
503 0           hv_store_ent(hv, key, newRV_noinc((SV *)row), 0);
504 0 0         FREETMPS;
505 0 0         for (j=1; jnfields; j++) {
506 0           const char *key = PQfname(st->result, j);
507 0           hv_store(row, key, -strlen(key), fupg_st_getval(aTHX_ st, i, j), 0);
508             }
509             }
510 0           return sv;
511             }
512              
513              
514              
515              
516             /* COPY support */
517              
518             typedef struct {
519             SV *self;
520             fupg_conn *conn;
521             char in;
522             char bin;
523             char rddone;
524             char closed;
525             } fupg_copy;
526              
527 0           static SV *fupg_copy_exec(pTHX_ fupg_conn *c, const char *sql) {
528 0           PGresult *r = PQexec(c->conn, sql);
529              
530 0 0         if (!r) fupg_conn_croak(c, "exec");
531 0           int s = PQresultStatus(r);
532 0 0         switch (s) {
533 0           case PGRES_COPY_OUT:
534             case PGRES_COPY_IN:
535 0           break;
536 0           default: fupg_result_croak(r, "exec", sql);
537             }
538              
539 0           fupg_copy *copy = safecalloc(1, sizeof(fupg_copy));
540 0           copy->conn = c;
541 0           SvREFCNT_inc(c->self);
542 0           copy->bin = !!PQbinaryTuples(r);
543 0           copy->in = s == PGRES_COPY_IN;
544 0           PQclear(r);
545 0           return fu_selfobj(copy, "FU::Pg::copy");
546             }
547              
548 0           static void fupg_copy_write(pTHX_ fupg_copy *c, SV *data) {
549             STRLEN len;
550 0 0         const char *buf = c->bin ? SvPVbyte(data, len) : SvPVutf8(data, len);
551 0 0         if (PQputCopyData(c->conn->conn, buf, len) < 0) fupg_conn_croak(c->conn, "copy");
552 0           }
553              
554 0           static SV *fupg_copy_read(pTHX_ fupg_copy *c, int discard) {
555 0           char *buf = NULL;
556 0           int len = PQgetCopyData(c->conn->conn, &buf, 0);
557 0 0         if (len == -1) {
558 0           c->rddone = 1;
559 0           return &PL_sv_undef;
560 0 0         } else if (len < 0) {
561 0 0         if (discard) c->rddone = 1;
562 0           else fupg_conn_croak(c->conn, "copy");
563             }
564 0 0         SV *r = discard ? &PL_sv_undef : newSVpvn_flags(buf, len, SVs_TEMP | (c->bin ? 0 : SVf_UTF8));
    0          
565 0           PQfreemem(buf);
566 0           return r;
567             }
568              
569 0           static void fupg_copy_close(pTHX_ fupg_copy *c, int ignerror) {
570 0 0         if (c->closed) return;
571 0           c->closed = 1; /* Mark as closed even on error, a second attempt won't help anyway */
572              
573 0 0         if (c->in && PQputCopyEnd(c->conn->conn, NULL) < 0 && !ignerror)
    0          
    0          
574 0           fupg_conn_croak(c->conn, "copyEnd");
575              
576 0 0         while (!c->in && !c->rddone) fupg_copy_read(aTHX_ c, 1);
    0          
577              
578 0           PGresult *r = PQgetResult(c->conn->conn);
579 0 0         if (!ignerror && !r) fupg_conn_croak(c->conn, "copyEnd");
    0          
580 0 0         if (!ignerror && PQresultStatus(r) != PGRES_COMMAND_OK) fupg_result_croak(r, "copy", "");
    0          
581 0           PQclear(r);
582              
583 0 0         while ((r = PQgetResult(c->conn->conn))) PQclear(r);
584             }
585              
586 0           static void fupg_copy_destroy(pTHX_ fupg_copy *c) {
587 0           fupg_copy_close(aTHX_ c, 1);
588 0           SvREFCNT_dec(c->conn->self);
589 0           safefree(c);
590 0           }