File Coverage

MariaDB.xs
Criterion Covered Total %
statement 94 1536 6.1
branch 47 1252 3.7
condition n/a
subroutine n/a
pod n/a
total 141 2788 5.0


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4              
5             #include "EVAPI.h"
6             #include
7              
8             #include "ngx_queue.h"
9              
10             typedef struct ev_mariadb_s ev_mariadb_t;
11             typedef struct ev_mariadb_cb_s ev_mariadb_cb_t;
12             typedef struct ev_mariadb_send_s ev_mariadb_send_t;
13             typedef struct ev_mariadb_stmt_s ev_mariadb_stmt_t;
14              
15             typedef ev_mariadb_t* EV__MariaDB;
16             typedef struct ev_loop* EV__Loop;
17              
18             #define EV_MARIADB_MAGIC 0xDEADBEEF
19             #define EV_MARIADB_FREED 0xFEEDFACE
20              
21             enum ev_mariadb_state {
22             STATE_IDLE,
23             STATE_CONNECTING,
24             STATE_SEND,
25             STATE_READ_RESULT,
26             STATE_STORE_RESULT,
27             STATE_NEXT_RESULT,
28             STATE_PING,
29             STATE_CHANGE_USER,
30             STATE_SELECT_DB,
31             STATE_RESET_CONNECTION,
32             STATE_SET_CHARSET,
33             STATE_COMMIT,
34             STATE_ROLLBACK,
35             STATE_AUTOCOMMIT,
36             STATE_STMT_PREPARE, /* boundary: states >= here block query queueing */
37             STATE_STMT_EXECUTE,
38             STATE_STMT_STORE,
39             STATE_STMT_CLOSE,
40             STATE_STMT_RESET,
41             STATE_STMT_SEND_LONG_DATA,
42             STATE_REAL_QUERY,
43             STATE_STREAM_FETCH,
44             STATE_CLOSE,
45             };
46              
47             struct ev_mariadb_s {
48             unsigned int magic;
49             struct ev_loop *loop;
50             MYSQL *conn;
51              
52             ev_io rio, wio;
53             ev_timer timer;
54             int reading, writing, timing;
55             int fd;
56              
57             enum ev_mariadb_state state;
58             char *host, *user, *password, *database, *unix_socket;
59             unsigned int port;
60              
61             ngx_queue_t cb_queue; /* callbacks for sent queries (awaiting results) */
62             ngx_queue_t send_queue; /* queries waiting to be sent */
63             int pending_count; /* total: send_queue + cb_queue */
64             int send_count; /* queries sent, results not yet read */
65             int draining; /* draining multi-result extras */
66              
67             /* current operation context */
68             int op_ret;
69             MYSQL_RES *op_result;
70             MYSQL_STMT *op_stmt;
71             ev_mariadb_stmt_t *op_stmt_ctx; /* per-stmt wrapper for bind_params cleanup */
72             ev_mariadb_stmt_t *stmt_list; /* all allocated stmt wrappers */
73             MYSQL *op_conn_ret;
74             my_bool op_bool_ret;
75             MYSQL_ROW op_row; /* for streaming fetch_row result */
76             SV *stream_cb; /* streaming per-row callback */
77             char *op_data_ptr; /* copied data buffer for send_long_data */
78              
79             SV *on_connect;
80             SV *on_error;
81              
82             int callback_depth;
83             pid_t connect_pid; /* PID at connect time, for fork detection */
84              
85             /* connection options (applied before mysql_real_connect_start) */
86             unsigned int connect_timeout;
87             unsigned int read_timeout;
88             unsigned int write_timeout;
89             int compress;
90             int multi_statements;
91             int found_rows;
92             char *charset;
93             char *init_command;
94             char *ssl_key;
95             char *ssl_cert;
96             char *ssl_ca;
97             char *ssl_capath;
98             char *ssl_cipher;
99             int ssl_verify_server_cert;
100             int utf8; /* auto-flag result strings as UTF-8 */
101             unsigned long client_flags;
102             };
103              
104             struct ev_mariadb_cb_s {
105             SV *cb;
106             ngx_queue_t queue;
107             };
108              
109             struct ev_mariadb_send_s {
110             char *sql;
111             unsigned long sql_len;
112             SV *cb;
113             ngx_queue_t queue;
114             };
115              
116             struct ev_mariadb_stmt_s {
117             MYSQL_STMT *stmt;
118             MYSQL_BIND *bind_params;
119             int bind_param_count;
120             int closed; /* invalidated by cleanup_connection */
121             ev_mariadb_stmt_t *next; /* linked list of all stmts on this connection */
122             };
123              
124             #define MAX_PIPELINE_DEPTH 64
125             #define MAX_FREELIST_DEPTH 64
126              
127             #define COPY_ERROR(buf, src) do { \
128             strncpy((buf), (src), sizeof(buf) - 1); \
129             (buf)[sizeof(buf) - 1] = '\0'; \
130             } while (0)
131              
132             #define SET_STR_OPTION(field) do { \
133             if (self->field) Safefree(self->field); \
134             self->field = SvOK(value) ? safe_strdup(SvPV_nolen(value)) : NULL; \
135             } while (0)
136              
137             #define IS_FORKED(self) \
138             ((self)->connect_pid != 0 && (self)->connect_pid != getpid())
139              
140             #define CHECK_READY(self, cb) \
141             do { \
142             if (NULL == (self)->conn || (self)->state == STATE_CONNECTING) \
143             croak("not connected"); \
144             if (IS_FORKED(self)) \
145             croak("connection not valid after fork"); \
146             if ((self)->state != STATE_IDLE) \
147             croak("another operation is in progress"); \
148             if ((self)->send_count > 0) \
149             croak("cannot start operation while pipeline results are pending"); \
150             if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV)) \
151             croak("callback must be a CODE reference"); \
152             } while (0)
153              
154             #define CHECK_STMT(ctx) \
155             do { \
156             if (!(ctx)) \
157             croak("invalid statement handle"); \
158             if ((ctx)->closed) \
159             croak("statement handle is no longer valid (connection was reset)"); \
160             } while (0)
161              
162             static void io_cb(EV_P_ ev_io *w, int revents);
163             static void timer_cb(EV_P_ ev_timer *w, int revents);
164             static void continue_operation(ev_mariadb_t *self, int events);
165             static void pipeline_advance(ev_mariadb_t *self);
166             static void on_next_result_done(ev_mariadb_t *self);
167             static void start_reading(ev_mariadb_t *self);
168             static void stop_reading(ev_mariadb_t *self);
169             static void start_writing(ev_mariadb_t *self);
170             static void stop_writing(ev_mariadb_t *self);
171             static void free_stmt_bind_params(ev_mariadb_stmt_t *ctx);
172             static int is_utf8_charset(unsigned int charsetnr);
173             static void start_timer(ev_mariadb_t *self);
174             static void stop_timer(ev_mariadb_t *self);
175             static void update_watchers(ev_mariadb_t *self, int status);
176             static void emit_error(ev_mariadb_t *self, const char *msg);
177             static void cleanup_connection(ev_mariadb_t *self);
178             static int check_destroyed(ev_mariadb_t *self);
179             static void drain_multi_result(ev_mariadb_t *self);
180             static AV* build_field_names(MYSQL *conn, MYSQL_FIELD *fields, unsigned int ncols);
181             static void on_real_query_done(ev_mariadb_t *self);
182             static void on_stream_fetch_done(ev_mariadb_t *self);
183             static void on_close_done(ev_mariadb_t *self);
184              
185 0           static void maybe_pipeline(ev_mariadb_t *self) {
186 0 0         if (self->state == STATE_IDLE && !ngx_queue_empty(&self->send_queue))
    0          
187 0           pipeline_advance(self);
188 0 0         else if (self->state == STATE_IDLE) {
189 0           stop_reading(self);
190 0           stop_writing(self);
191 0           stop_timer(self);
192             }
193 0           }
194              
195             /* --- freelist for cb_queue entries --- */
196              
197             static ev_mariadb_cb_t *cbt_freelist = NULL;
198             static int cbt_freelist_size = 0;
199              
200 0           static ev_mariadb_cb_t* alloc_cbt(void) {
201             ev_mariadb_cb_t *cbt;
202 0 0         if (cbt_freelist) {
203 0           cbt = cbt_freelist;
204 0           cbt_freelist = *(ev_mariadb_cb_t **)cbt;
205 0           cbt_freelist_size--;
206             } else {
207 0           Newx(cbt, 1, ev_mariadb_cb_t);
208             }
209 0           return cbt;
210             }
211              
212 0           static void release_cbt(ev_mariadb_cb_t *cbt) {
213 0 0         if (cbt_freelist_size >= MAX_FREELIST_DEPTH) {
214 0           Safefree(cbt);
215 0           return;
216             }
217 0           *(ev_mariadb_cb_t **)cbt = cbt_freelist;
218 0           cbt_freelist = cbt;
219 0           cbt_freelist_size++;
220             }
221              
222             /* --- freelist for send_queue entries --- */
223              
224             static ev_mariadb_send_t *send_freelist = NULL;
225             static int send_freelist_size = 0;
226              
227 0           static ev_mariadb_send_t* alloc_send(void) {
228             ev_mariadb_send_t *s;
229 0 0         if (send_freelist) {
230 0           s = send_freelist;
231 0           send_freelist = *(ev_mariadb_send_t **)s;
232 0           send_freelist_size--;
233             } else {
234 0           Newx(s, 1, ev_mariadb_send_t);
235             }
236 0           return s;
237             }
238              
239 0           static void release_send(ev_mariadb_send_t *s) {
240 0 0         if (send_freelist_size >= MAX_FREELIST_DEPTH) {
241 0           Safefree(s);
242 0           return;
243             }
244 0           *(ev_mariadb_send_t **)s = send_freelist;
245 0           send_freelist = s;
246 0           send_freelist_size++;
247             }
248              
249 0           static void push_send(ev_mariadb_t *self, const char *sql, STRLEN sql_len, SV *cb) {
250             char *sql_copy;
251             ev_mariadb_send_t *s;
252 0           Newx(sql_copy, sql_len + 1, char);
253 0           Copy(sql, sql_copy, sql_len + 1, char);
254              
255 0           s = alloc_send();
256 0           s->sql = sql_copy;
257 0           s->sql_len = (unsigned long)sql_len;
258 0           s->cb = SvREFCNT_inc(cb);
259 0           ngx_queue_insert_tail(&self->send_queue, &s->queue);
260 0           self->pending_count++;
261 0           }
262              
263 2           static void drain_queues_silent(ev_mariadb_t *self) {
264 2 50         while (!ngx_queue_empty(&self->send_queue)) {
265 0           ngx_queue_t *q = ngx_queue_head(&self->send_queue);
266 0           ev_mariadb_send_t *send = ngx_queue_data(q, ev_mariadb_send_t, queue);
267 0           ngx_queue_remove(q);
268 0           Safefree(send->sql);
269 0           SvREFCNT_dec(send->cb);
270 0           release_send(send);
271 0           self->pending_count--;
272             }
273 2 50         while (!ngx_queue_empty(&self->cb_queue)) {
274 0           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
275 0           ev_mariadb_cb_t *cbt = ngx_queue_data(q, ev_mariadb_cb_t, queue);
276 0           ngx_queue_remove(q);
277 0           SvREFCNT_dec(cbt->cb);
278 0           release_cbt(cbt);
279 0           self->pending_count--;
280             }
281 2           }
282              
283             /* --- watcher helpers --- */
284              
285 0           static void start_reading(ev_mariadb_t *self) {
286 0 0         if (!self->reading && self->fd >= 0) {
    0          
287 0           ev_io_start(self->loop, &self->rio);
288 0           self->reading = 1;
289             }
290 0           }
291              
292 2           static void stop_reading(ev_mariadb_t *self) {
293 2 50         if (self->reading) {
294 0           ev_io_stop(self->loop, &self->rio);
295 0           self->reading = 0;
296             }
297 2           }
298              
299 0           static void start_writing(ev_mariadb_t *self) {
300 0 0         if (!self->writing && self->fd >= 0) {
    0          
301 0           ev_io_start(self->loop, &self->wio);
302 0           self->writing = 1;
303             }
304 0           }
305              
306 2           static void stop_writing(ev_mariadb_t *self) {
307 2 50         if (self->writing) {
308 0           ev_io_stop(self->loop, &self->wio);
309 0           self->writing = 0;
310             }
311 2           }
312              
313 0           static void start_timer(ev_mariadb_t *self) {
314 0 0         if (!self->timing) {
315 0           unsigned int ms = mysql_get_timeout_value_ms(self->conn);
316 0 0         if (ms > 0) {
317 0           ev_timer_set(&self->timer, ms / 1000.0, 0.0);
318 0           ev_timer_start(self->loop, &self->timer);
319 0           self->timing = 1;
320             }
321             }
322 0           }
323              
324 2           static void stop_timer(ev_mariadb_t *self) {
325 2 50         if (self->timing) {
326 0           ev_timer_stop(self->loop, &self->timer);
327 0           self->timing = 0;
328             }
329 2           }
330              
331 0           static void update_watchers(ev_mariadb_t *self, int status) {
332 0 0         if (status & (MYSQL_WAIT_READ | MYSQL_WAIT_EXCEPT)) start_reading(self); else stop_reading(self);
333 0 0         if (status & MYSQL_WAIT_WRITE) start_writing(self); else stop_writing(self);
334 0 0         if (status & MYSQL_WAIT_TIMEOUT) start_timer(self); else stop_timer(self);
335 0           }
336              
337 0           static void init_io_watchers(ev_mariadb_t *self) {
338 0           ev_io_init(&self->rio, io_cb, self->fd, EV_READ);
339 0           self->rio.data = (void *)self;
340 0           ev_io_init(&self->wio, io_cb, self->fd, EV_WRITE);
341 0           self->wio.data = (void *)self;
342 0           }
343              
344 0           static int check_destroyed(ev_mariadb_t *self) {
345 0 0         if (self->magic == EV_MARIADB_FREED &&
346 0 0         self->callback_depth == 0) {
347 0           Safefree(self);
348 0           return 1;
349             }
350 0           return 0;
351             }
352              
353 0           static void emit_error(ev_mariadb_t *self, const char *msg) {
354             SV *cb;
355 0 0         if (NULL == self->on_error) return;
356 0           cb = sv_2mortal(SvREFCNT_inc_simple_NN(self->on_error));
357              
358 0           dSP;
359 0           ENTER;
360 0           SAVETMPS;
361 0 0         PUSHMARK(SP);
362 0 0         XPUSHs(sv_2mortal(newSVpv(msg, 0)));
363 0           PUTBACK;
364              
365 0           call_sv(cb, G_DISCARD | G_EVAL);
366 0 0         if (SvTRUE(ERRSV)) {
    0          
367 0 0         warn("EV::MariaDB: exception in error handler: %s", SvPV_nolen(ERRSV));
368 0 0         sv_setpvn(ERRSV, "", 0);
369             }
370              
371 0 0         FREETMPS;
372 0           LEAVE;
373             }
374              
375             /* Pop and return the head callback from cb_queue. Caller must SvREFCNT_dec. */
376 0           static SV* pop_cb(ev_mariadb_t *self) {
377             ngx_queue_t *q;
378             ev_mariadb_cb_t *cbt;
379             SV *cb;
380              
381 0 0         if (ngx_queue_empty(&self->cb_queue)) return NULL;
382              
383 0           q = ngx_queue_head(&self->cb_queue);
384 0           cbt = ngx_queue_data(q, ev_mariadb_cb_t, queue);
385              
386 0           cb = cbt->cb;
387 0           ngx_queue_remove(q);
388 0           self->pending_count--;
389 0           release_cbt(cbt);
390              
391 0           return cb;
392             }
393              
394             /* Invoke a callback with (undef, errmsg). Decrements cb refcount. */
395 0           static void invoke_error_cb(SV *cb, const char *errmsg) {
396 0           dSP;
397 0           ENTER;
398 0           SAVETMPS;
399 0 0         PUSHMARK(SP);
400 0           PUSHs(&PL_sv_undef);
401 0           PUSHs(sv_2mortal(newSVpv(errmsg, 0)));
402 0           PUTBACK;
403 0           call_sv(cb, G_DISCARD | G_EVAL);
404 0 0         if (SvTRUE(ERRSV)) {
    0          
405 0 0         warn("EV::MariaDB: exception in callback: %s", SvPV_nolen(ERRSV));
406 0 0         sv_setpvn(ERRSV, "", 0);
407             }
408 0           SvREFCNT_dec(cb);
409 0 0         FREETMPS;
410 0           LEAVE;
411 0           }
412              
413             /* Invoke a callback with (undef) — EOF signal. Decrements cb refcount. */
414 0           static void invoke_eof_cb(SV *cb) {
415 0           dSP;
416 0           ENTER;
417 0           SAVETMPS;
418 0 0         PUSHMARK(SP);
419 0           PUSHs(&PL_sv_undef);
420 0           PUTBACK;
421 0           call_sv(cb, G_DISCARD | G_EVAL);
422 0 0         if (SvTRUE(ERRSV)) {
    0          
423 0 0         warn("EV::MariaDB: exception in stream callback: %s", SvPV_nolen(ERRSV));
424 0 0         sv_setpvn(ERRSV, "", 0);
425             }
426 0           SvREFCNT_dec(cb);
427 0 0         FREETMPS;
428 0           LEAVE;
429 0           }
430              
431             /* Invoke a callback SV with args already on the stack. Decrements refcount. */
432 0           static void invoke_cb(SV *cb) {
433 0           call_sv(cb, G_DISCARD | G_EVAL);
434 0 0         if (SvTRUE(ERRSV)) {
    0          
435 0 0         warn("EV::MariaDB: exception in callback: %s", SvPV_nolen(ERRSV));
436 0 0         sv_setpvn(ERRSV, "", 0);
437             }
438 0           SvREFCNT_dec(cb);
439 0           }
440              
441             /* Returns 1 if self was freed (caller must not touch self). */
442 0           static int deliver_result(ev_mariadb_t *self) {
443 0           MYSQL_RES *res = self->op_result;
444 0           SV *cb = pop_cb(self);
445              
446 0 0         if (cb == NULL) {
447 0 0         if (res) {
448 0           mysql_free_result(res);
449 0           self->op_result = NULL;
450             }
451 0           return 0;
452             }
453              
454 0           self->callback_depth++;
455              
456             {
457 0           dSP;
458 0           ENTER;
459 0           SAVETMPS;
460 0 0         PUSHMARK(SP);
461              
462 0 0         if (res == NULL && mysql_field_count(self->conn) > 0) {
    0          
463 0           PUSHs(&PL_sv_undef);
464 0           PUSHs(sv_2mortal(newSVpv(mysql_error(self->conn), 0)));
465             }
466 0 0         else if (res != NULL) {
467 0           my_ulonglong nrows = mysql_num_rows(res);
468 0           unsigned int ncols = mysql_num_fields(res);
469 0           MYSQL_FIELD *fields = mysql_fetch_fields(res);
470 0           AV *rows = newAV();
471             AV *fnames;
472             MYSQL_ROW row;
473             unsigned long *lengths;
474              
475 0 0         if (nrows > 0) av_extend(rows, (SSize_t)(nrows < (my_ulonglong)SSize_t_MAX ? nrows - 1 : SSize_t_MAX));
    0          
476 0 0         while ((row = mysql_fetch_row(res)) != NULL) {
477 0           AV *r = newAV();
478             unsigned int c;
479 0           lengths = mysql_fetch_lengths(res);
480 0 0         if (ncols > 0) av_extend(r, ncols - 1);
481 0 0         for (c = 0; c < ncols; c++) {
482 0 0         if (row[c] == NULL) {
483 0           av_push(r, newSV(0));
484             } else {
485 0           SV *val = newSVpvn(row[c], lengths[c]);
486 0 0         if (self->utf8 && is_utf8_charset(fields[c].charsetnr))
    0          
487 0           SvUTF8_on(val);
488 0           av_push(r, val);
489             }
490             }
491 0           av_push(rows, newRV_noinc((SV*)r));
492             }
493             /* build field names before freeing result */
494 0           fnames = build_field_names(self->conn, fields, ncols);
495 0           mysql_free_result(res);
496 0           self->op_result = NULL;
497 0           PUSHs(sv_2mortal(newRV_noinc((SV*)rows)));
498 0           PUSHs(&PL_sv_undef);
499 0           PUSHs(sv_2mortal(newRV_noinc((SV*)fnames)));
500             }
501             else {
502 0           my_ulonglong affected = mysql_affected_rows(self->conn);
503 0           PUSHs(sv_2mortal(newSVuv((UV)affected)));
504             }
505              
506 0           PUTBACK;
507 0           invoke_cb(cb);
508 0 0         FREETMPS;
509 0           LEAVE;
510             }
511              
512 0           self->callback_depth--;
513 0           return check_destroyed(self);
514             }
515              
516             /* Returns 1 if self was freed. */
517 0           static int deliver_error(ev_mariadb_t *self, const char *errmsg) {
518 0           SV *cb = pop_cb(self);
519 0 0         if (cb == NULL) return 0;
520              
521 0           self->callback_depth++;
522 0           invoke_error_cb(cb, errmsg);
523 0           self->callback_depth--;
524 0           return check_destroyed(self);
525             }
526              
527             /* Returns 1 if self was freed. Caller passes a non-mortal SV; ownership is
528             transferred — deliver_value will sv_2mortal inside its SAVETMPS scope so
529             FREETMPS correctly frees it (avoids mortal leak when sv_2mortal is called
530             before SAVETMPS). */
531 0           static int deliver_value(ev_mariadb_t *self, SV *val) {
532 0           SV *cb = pop_cb(self);
533 0 0         if (cb == NULL) {
534 0           SvREFCNT_dec(val);
535 0           return 0;
536             }
537              
538 0           self->callback_depth++;
539              
540             {
541 0           dSP;
542 0           ENTER;
543 0           SAVETMPS;
544 0 0         PUSHMARK(SP);
545 0           PUSHs(sv_2mortal(val));
546 0           PUTBACK;
547 0           invoke_cb(cb);
548 0 0         FREETMPS;
549 0           LEAVE;
550             }
551              
552 0           self->callback_depth--;
553 0           return check_destroyed(self);
554             }
555              
556 0           static void cleanup_connection(ev_mariadb_t *self) {
557 0           int saved_fd = self->fd;
558 0 0         int is_fork = IS_FORKED(self);
    0          
559              
560 0           stop_reading(self);
561 0           stop_writing(self);
562 0           stop_timer(self);
563 0           self->fd = -1;
564 0           self->state = STATE_IDLE;
565 0           self->send_count = 0;
566 0           self->draining = 0;
567              
568 0 0         if (self->op_data_ptr) {
569 0           Safefree(self->op_data_ptr);
570 0           self->op_data_ptr = NULL;
571             }
572              
573 0           self->op_stmt_ctx = NULL;
574              
575             /* Invalidate all tracked stmt wrappers (don't free — user may hold handles) */
576             {
577 0           ev_mariadb_stmt_t *ctx = self->stmt_list;
578 0 0         while (ctx) {
579 0           free_stmt_bind_params(ctx);
580 0 0         if (!is_fork && ctx->stmt) {
    0          
581 0 0         if (ctx->stmt == self->op_stmt)
582 0           self->op_stmt = NULL; /* avoid double-close below */
583 0           mysql_stmt_close(ctx->stmt);
584             }
585 0           ctx->stmt = NULL;
586 0           ctx->closed = 1;
587 0           ctx = ctx->next;
588             }
589             /* keep stmt_list linked for close_stmt to unlink+free later */
590             }
591              
592 0 0         if (is_fork) {
593 0           self->op_result = NULL;
594 0           self->op_stmt = NULL;
595 0           self->conn = NULL;
596             } else {
597 0 0         if (self->op_result && saved_fd >= 0)
    0          
598 0           shutdown(saved_fd, SHUT_RDWR);
599             /* Close op_stmt if not tracked in stmt_list (e.g., in-flight prepare) */
600 0 0         if (self->op_stmt) {
601 0           mysql_stmt_close(self->op_stmt);
602             }
603 0           self->op_stmt = NULL;
604 0 0         if (self->op_result) {
605 0           mysql_free_result(self->op_result);
606 0           self->op_result = NULL;
607             }
608 0 0         if (self->conn) {
609 0           MYSQL *conn = self->conn;
610 0           self->conn = NULL;
611 0           mysql_close(conn);
612             }
613             }
614 0           }
615              
616 2           static void cancel_pending(ev_mariadb_t *self, const char *errmsg) {
617             ngx_queue_t local_send;
618             ngx_queue_t local_cb;
619              
620 2           ngx_queue_init(&local_send);
621 2           ngx_queue_init(&local_cb);
622              
623 2 50         if (!ngx_queue_empty(&self->send_queue)) {
624 0           ngx_queue_add(&local_send, &self->send_queue);
625 0           ngx_queue_init(&self->send_queue);
626             }
627              
628 2 50         if (!ngx_queue_empty(&self->cb_queue)) {
629 0           ngx_queue_add(&local_cb, &self->cb_queue);
630 0           ngx_queue_init(&self->cb_queue);
631             }
632              
633 2           self->send_count = 0;
634 2           self->callback_depth++;
635              
636             /* cancel active stream if any */
637 2 50         if (self->stream_cb) {
638 0           SV *cb = self->stream_cb;
639 0           self->stream_cb = NULL;
640 0           self->pending_count--;
641 0           invoke_error_cb(cb, errmsg);
642             }
643              
644             /* cancel unsent queries */
645 2 50         while (!ngx_queue_empty(&local_send)) {
646 0           ngx_queue_t *q = ngx_queue_head(&local_send);
647 0           ev_mariadb_send_t *send = ngx_queue_data(q, ev_mariadb_send_t, queue);
648 0           SV *cb = send->cb;
649 0           ngx_queue_remove(q);
650 0           Safefree(send->sql);
651 0           release_send(send);
652 0           self->pending_count--;
653              
654 0           invoke_error_cb(cb, errmsg);
655             }
656              
657             /* cancel sent-but-not-received */
658 2 50         while (!ngx_queue_empty(&local_cb)) {
659 0           ngx_queue_t *q = ngx_queue_head(&local_cb);
660 0           ev_mariadb_cb_t *cbt = ngx_queue_data(q, ev_mariadb_cb_t, queue);
661 0           SV *cb = cbt->cb;
662 0           ngx_queue_remove(q);
663 0           self->pending_count--;
664 0           release_cbt(cbt);
665              
666 0           invoke_error_cb(cb, errmsg);
667             }
668              
669 2           self->callback_depth--;
670 2           }
671              
672 0           static void push_cb(ev_mariadb_t *self, SV *cb) {
673 0           ev_mariadb_cb_t *cbt = alloc_cbt();
674 0           cbt->cb = SvREFCNT_inc(cb);
675 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
676 0           self->pending_count++;
677 0           }
678              
679             /* Push cb to cb_queue, transferring ownership (no extra SvREFCNT_inc). */
680 0           static void push_cb_owned(ev_mariadb_t *self, SV *cb) {
681 0           ev_mariadb_cb_t *cbt = alloc_cbt();
682 0           cbt->cb = cb;
683 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
684             /* pending_count was incremented by push_send; ownership transfers from send_queue to cb_queue */
685 0           }
686              
687 2           static SV* handler_accessor(SV **slot, SV *handler, int has_arg) {
688 2 50         if (has_arg) {
689 2 50         if (NULL != *slot) {
690 0           SvREFCNT_dec(*slot);
691 0           *slot = NULL;
692             }
693 2 50         if (NULL != handler && SvOK(handler) &&
    50          
694 2 50         SvROK(handler) && SvTYPE(SvRV(handler)) == SVt_PVCV) {
    50          
695 2           *slot = SvREFCNT_inc(handler);
696             }
697             }
698              
699 2           return (NULL != *slot)
700 2           ? SvREFCNT_inc(*slot)
701 4 50         : &PL_sv_undef;
702             }
703              
704             /* --- Multi-result drain --- */
705              
706 0           static void drain_multi_result(ev_mariadb_t *self) {
707             int status;
708              
709 0           self->draining = 1;
710 0           status = mysql_next_result_start(&self->op_ret, self->conn);
711 0 0         if (status != 0) {
712 0           self->state = STATE_NEXT_RESULT;
713 0           update_watchers(self, status);
714 0           return;
715             }
716 0           on_next_result_done(self);
717             }
718              
719 0           static void on_next_result_done(ev_mariadb_t *self) {
720             int status;
721              
722             for (;;) {
723 0 0         if (self->op_ret > 0) {
724             /* error in secondary result set — report and stop draining */
725 0           self->draining = 0;
726 0           self->state = STATE_IDLE;
727 0           self->callback_depth++;
728 0           emit_error(self, mysql_error(self->conn));
729 0           self->callback_depth--;
730 0 0         if (check_destroyed(self)) return;
731 0 0         if (self->state != STATE_IDLE) return;
732 0 0         if (self->conn && mysql_more_results(self->conn)) {
    0          
733 0           drain_multi_result(self);
734 0           return;
735             }
736 0           pipeline_advance(self);
737 0           return;
738             }
739              
740 0 0         if (self->op_ret == -1) {
741             /* no more results */
742 0           break;
743             }
744              
745             /* op_ret == 0: another result set is available */
746 0 0         if (mysql_field_count(self->conn) > 0) {
747 0           status = mysql_store_result_start(&self->op_result, self->conn);
748 0 0         if (status != 0) {
749 0           self->state = STATE_STORE_RESULT;
750 0           update_watchers(self, status);
751 0           return;
752             }
753             /* synchronous store — free and continue drain */
754 0 0         if (self->op_result) {
755 0           mysql_free_result(self->op_result);
756 0           self->op_result = NULL;
757             }
758             }
759              
760             /* DML result or sync store done — check for more */
761 0 0         if (!mysql_more_results(self->conn))
762 0           break;
763              
764 0           status = mysql_next_result_start(&self->op_ret, self->conn);
765 0 0         if (status != 0) {
766 0           self->state = STATE_NEXT_RESULT;
767 0           update_watchers(self, status);
768 0           return;
769             }
770             /* synchronous completion — loop instead of recursing */
771             }
772              
773 0           self->draining = 0;
774 0           self->state = STATE_IDLE;
775 0           pipeline_advance(self);
776             }
777              
778             /* --- Pipeline: send_query + read_query_result state machine --- */
779              
780 0           static void handle_send_failure(ev_mariadb_t *self, ev_mariadb_send_t *send) {
781             char errbuf[512];
782 0           SV *cb = send->cb;
783 0           Safefree(send->sql);
784 0           release_send(send);
785 0           self->pending_count--;
786              
787 0           COPY_ERROR(errbuf, mysql_error(self->conn));
788              
789 0           self->state = STATE_IDLE;
790 0           self->callback_depth++;
791 0           invoke_error_cb(cb, errbuf);
792 0           self->callback_depth--;
793 0 0         if (check_destroyed(self)) return;
794 0 0         if (self->state != STATE_IDLE) return;
795 0           cancel_pending(self, "send failed");
796 0 0         if (check_destroyed(self)) return;
797 0 0         if (self->state != STATE_IDLE) return;
798 0           drain_queues_silent(self);
799 0           cleanup_connection(self);
800             }
801              
802 0           static void on_send_done(ev_mariadb_t *self) {
803 0           ngx_queue_t *q = ngx_queue_head(&self->send_queue);
804 0           ev_mariadb_send_t *send = ngx_queue_data(q, ev_mariadb_send_t, queue);
805 0           ngx_queue_remove(q);
806              
807 0 0         if (self->op_ret != 0) {
808 0           handle_send_failure(self, send);
809 0           return;
810             }
811              
812 0           push_cb_owned(self, send->cb);
813 0           Safefree(send->sql);
814 0           release_send(send);
815 0           self->send_count++;
816              
817 0           self->state = STATE_IDLE;
818 0           pipeline_advance(self);
819             }
820              
821 0           static void on_read_result_done(ev_mariadb_t *self) {
822             int status;
823              
824 0 0         if (self->op_bool_ret != 0) {
825             /* query returned an error */
826 0           self->send_count--;
827 0           self->state = STATE_IDLE;
828 0 0         if (deliver_error(self, mysql_error(self->conn))) return;
829 0 0         if (self->state != STATE_IDLE) return;
830             /* drain any remaining result sets */
831 0 0         if (self->conn && mysql_more_results(self->conn)) {
    0          
832 0           drain_multi_result(self);
833 0           return;
834             }
835 0           pipeline_advance(self);
836 0           return;
837             }
838              
839 0 0         if (mysql_field_count(self->conn) > 0) {
840             /* has result set — store it */
841 0           status = mysql_store_result_start(&self->op_result, self->conn);
842 0 0         if (status != 0) {
843 0           self->state = STATE_STORE_RESULT;
844 0           update_watchers(self, status);
845 0           return;
846             }
847             /* synchronous store completion — fall through below */
848             }
849              
850             /* DML or synchronous store_result completion */
851             {
852 0           self->send_count--;
853 0           self->state = STATE_IDLE;
854 0 0         if (deliver_result(self)) return;
855 0 0         if (self->state != STATE_IDLE) return;
856             /* drain any extra result sets from multi-result queries */
857 0 0         if (self->conn && mysql_more_results(self->conn)) {
    0          
858 0           drain_multi_result(self);
859 0           return;
860             }
861 0           pipeline_advance(self);
862             }
863             }
864              
865             /* Called when store_result_cont completes (pipeline text query path) */
866 0           static void on_store_result_done(ev_mariadb_t *self) {
867 0 0         if (self->draining) {
868 0           MYSQL_RES *res = self->op_result;
869 0           self->op_result = NULL;
870             /* draining multi-result: free and continue */
871 0 0         if (res) mysql_free_result(res);
872 0 0         if (self->conn && mysql_more_results(self->conn)) {
    0          
873 0           int status = mysql_next_result_start(&self->op_ret, self->conn);
874 0 0         if (status != 0) {
875 0           self->state = STATE_NEXT_RESULT;
876 0           update_watchers(self, status);
877 0           return;
878             }
879 0           on_next_result_done(self);
880 0           return;
881             }
882 0           self->draining = 0;
883 0           self->state = STATE_IDLE;
884 0           pipeline_advance(self);
885 0           return;
886             }
887              
888 0           self->send_count--;
889 0           self->state = STATE_IDLE;
890              
891 0 0         if (deliver_result(self)) return;
892 0 0         if (self->state != STATE_IDLE) return;
893             /* drain any extra result sets */
894 0 0         if (self->conn && mysql_more_results(self->conn)) {
    0          
895 0           drain_multi_result(self);
896 0           return;
897             }
898 0           pipeline_advance(self);
899             }
900              
901             /*
902             * Pipeline orchestrator. Called when state == IDLE.
903             * Phase 1: send all queued queries via mysql_send_query.
904             * Phase 2: read next result via mysql_read_query_result.
905             */
906 0           static void pipeline_advance(ev_mariadb_t *self) {
907             int status;
908              
909             /* Ensure clean watcher state — previous operations may have left
910             * watchers active after completing synchronously within their
911             * done handlers. Without this, subsequent operations that need
912             * the same watcher direction would skip ev_io_start. */
913 0           stop_reading(self);
914 0           stop_writing(self);
915 0           stop_timer(self);
916              
917 0           send_phase:
918             /* Phase 1: send up to MAX_PIPELINE_DEPTH queries */
919 0 0         while (!ngx_queue_empty(&self->send_queue) &&
920 0 0         self->send_count < MAX_PIPELINE_DEPTH) {
921 0           ngx_queue_t *q = ngx_queue_head(&self->send_queue);
922 0           ev_mariadb_send_t *send = ngx_queue_data(q, ev_mariadb_send_t, queue);
923              
924 0           status = mysql_send_query_start(&self->op_ret, self->conn,
925 0           send->sql, send->sql_len);
926              
927 0 0         if (status != 0) {
928             /* need async IO to finish sending */
929 0           self->state = STATE_SEND;
930 0           update_watchers(self, status);
931 0           return;
932             }
933              
934             /* synchronous send completion */
935 0           ngx_queue_remove(q);
936              
937 0 0         if (self->op_ret != 0) {
938 0           handle_send_failure(self, send);
939 0           return;
940             }
941              
942 0           push_cb_owned(self, send->cb);
943 0           Safefree(send->sql);
944 0           release_send(send);
945 0           self->send_count++;
946             }
947              
948             /* Phase 2: read next result */
949 0 0         while (self->send_count > 0) {
950 0           status = mysql_read_query_result_start(&self->op_bool_ret, self->conn);
951              
952 0 0         if (status != 0) {
953 0           self->state = STATE_READ_RESULT;
954 0           update_watchers(self, status);
955 0           return;
956             }
957              
958             /* synchronous read completion */
959 0 0         if (self->op_bool_ret != 0) {
960             /* query error */
961 0           self->send_count--;
962 0 0         if (deliver_error(self, mysql_error(self->conn))) return;
963             /* callback may have started a new operation */
964 0 0         if (self->state != STATE_IDLE) return;
965 0 0         if (self->conn && mysql_more_results(self->conn)) {
    0          
966 0           drain_multi_result(self);
967 0           return;
968             }
969 0 0         if (!ngx_queue_empty(&self->send_queue)) goto send_phase;
970 0           continue;
971             }
972              
973 0 0         if (mysql_field_count(self->conn) > 0) {
974             /* has result set */
975 0           status = mysql_store_result_start(&self->op_result, self->conn);
976 0 0         if (status != 0) {
977 0           self->state = STATE_STORE_RESULT;
978 0           update_watchers(self, status);
979 0           return;
980             }
981             }
982              
983             {
984 0           self->send_count--;
985 0 0         if (deliver_result(self)) return;
986             /* callback may have started a new operation */
987 0 0         if (self->state != STATE_IDLE) return;
988 0 0         if (self->conn && mysql_more_results(self->conn)) {
    0          
989 0           drain_multi_result(self);
990 0           return;
991             }
992             /* loop back to check send_queue (callback may have queued more) */
993 0 0         if (!ngx_queue_empty(&self->send_queue)) goto send_phase;
994             }
995             }
996              
997 0           self->state = STATE_IDLE;
998             }
999              
1000             /* --- Connection --- */
1001              
1002 0           static void on_connect_done(ev_mariadb_t *self) {
1003 0           self->state = STATE_IDLE;
1004              
1005 0 0         if (self->op_conn_ret == NULL) {
1006             char errbuf[512];
1007 0           COPY_ERROR(errbuf, mysql_error(self->conn));
1008 0           self->callback_depth++;
1009 0           emit_error(self, errbuf);
1010 0           self->callback_depth--;
1011 0 0         if (check_destroyed(self)) return;
1012             /* on_error handler may have called reset(), starting a new connection;
1013             if state is no longer IDLE, don't touch the new connection */
1014 0 0         if (self->state != STATE_IDLE) return;
1015 0           cancel_pending(self, errbuf);
1016 0 0         if (check_destroyed(self)) return;
1017 0 0         if (self->state != STATE_IDLE) return;
1018 0           drain_queues_silent(self);
1019 0           cleanup_connection(self);
1020 0           return;
1021             }
1022              
1023             /* connected — reinit watchers for normal IO */
1024 0           stop_reading(self);
1025 0           stop_writing(self);
1026 0           stop_timer(self);
1027              
1028 0           self->fd = mysql_get_socket(self->conn);
1029              
1030 0 0         if (self->fd < 0) {
1031 0           self->callback_depth++;
1032 0           emit_error(self, "mysql_get_socket returned invalid fd");
1033 0           self->callback_depth--;
1034 0 0         if (check_destroyed(self)) return;
1035 0 0         if (self->state != STATE_IDLE) return;
1036 0           cancel_pending(self, "invalid fd");
1037 0 0         if (check_destroyed(self)) return;
1038 0 0         if (self->state != STATE_IDLE) return;
1039 0           drain_queues_silent(self);
1040 0           cleanup_connection(self);
1041 0           return;
1042             }
1043              
1044 0           init_io_watchers(self);
1045              
1046 0 0         if (NULL != self->on_connect) {
1047 0           SV *cb = sv_2mortal(SvREFCNT_inc_simple_NN(self->on_connect));
1048 0           self->callback_depth++;
1049              
1050             {
1051 0           dSP;
1052 0           ENTER;
1053 0           SAVETMPS;
1054 0 0         PUSHMARK(SP);
1055 0           PUTBACK;
1056              
1057 0           call_sv(cb, G_DISCARD | G_EVAL);
1058 0 0         if (SvTRUE(ERRSV)) {
    0          
1059 0 0         warn("EV::MariaDB: exception in connect handler: %s", SvPV_nolen(ERRSV));
1060 0 0         sv_setpvn(ERRSV, "", 0);
1061             }
1062              
1063 0 0         FREETMPS;
1064 0           LEAVE;
1065             }
1066              
1067 0           self->callback_depth--;
1068 0 0         if (check_destroyed(self)) return;
1069             }
1070              
1071             /* start pipeline if queries were queued during connection */
1072 0           maybe_pipeline(self);
1073             }
1074              
1075             /* --- Prepared statements --- */
1076              
1077 0           static void on_stmt_prepare_done(ev_mariadb_t *self) {
1078 0           MYSQL_STMT *stmt = self->op_stmt;
1079 0           self->op_stmt = NULL;
1080 0           self->state = STATE_IDLE;
1081              
1082 0 0         if (self->op_ret != 0) {
1083             char errbuf[512];
1084 0           COPY_ERROR(errbuf, mysql_stmt_error(stmt));
1085 0           mysql_stmt_close(stmt);
1086 0 0         if (deliver_error(self, errbuf)) return;
1087 0           maybe_pipeline(self);
1088 0           return;
1089             }
1090              
1091             {
1092             ev_mariadb_stmt_t *ctx;
1093 0           Newxz(ctx, 1, ev_mariadb_stmt_t);
1094 0           ctx->stmt = stmt;
1095 0           ctx->next = self->stmt_list;
1096 0           self->stmt_list = ctx;
1097 0 0         if (deliver_value(self, newSViv(PTR2IV(ctx)))) return;
1098             }
1099 0           maybe_pipeline(self);
1100             }
1101              
1102 0           static void on_stmt_execute_done(ev_mariadb_t *self) {
1103 0           MYSQL_STMT *stmt = self->op_stmt;
1104 0           self->op_stmt = NULL;
1105 0           self->state = STATE_IDLE;
1106              
1107 0 0         if (self->op_ret != 0) {
1108 0 0         if (deliver_error(self, mysql_stmt_error(stmt))) return;
1109 0           maybe_pipeline(self);
1110 0           return;
1111             }
1112              
1113             {
1114             MYSQL_RES *meta;
1115             SV *cb;
1116 0           meta = mysql_stmt_result_metadata(stmt);
1117 0           self->op_result = meta;
1118 0           cb = pop_cb(self);
1119 0 0         if (cb == NULL) {
1120 0 0         if (self->op_result) {
1121 0           mysql_free_result(self->op_result);
1122 0           self->op_result = NULL;
1123             }
1124 0           maybe_pipeline(self);
1125 0           return;
1126             }
1127              
1128 0           self->callback_depth++;
1129             {
1130 0           dSP;
1131 0           ENTER;
1132 0           SAVETMPS;
1133 0 0         PUSHMARK(SP);
1134              
1135 0 0         if (self->op_result == NULL) {
1136 0           my_ulonglong affected = mysql_stmt_affected_rows(stmt);
1137 0           PUSHs(sv_2mortal(newSVuv((UV)affected)));
1138             }
1139             else {
1140 0           unsigned int ncols = mysql_num_fields(self->op_result);
1141 0           MYSQL_FIELD *fields = mysql_fetch_fields(self->op_result);
1142             MYSQL_BIND *bind;
1143             unsigned long *lengths;
1144             my_bool *is_null;
1145             char **buffers;
1146             unsigned int c;
1147 0           AV *rows = newAV();
1148             int fetch_ret;
1149              
1150 0           Newxz(bind, ncols, MYSQL_BIND);
1151 0           SAVEFREEPV(bind);
1152 0           Newx(lengths, ncols, unsigned long);
1153 0           SAVEFREEPV(lengths);
1154 0           Newx(is_null, ncols, my_bool);
1155 0           SAVEFREEPV(is_null);
1156 0           Newx(buffers, ncols, char *);
1157 0           SAVEFREEPV(buffers);
1158              
1159 0 0         for (c = 0; c < ncols; c++) {
1160 0           unsigned long buflen = fields[c].max_length;
1161 0 0         if (buflen < 256) buflen = 256;
1162 0           Newx(buffers[c], buflen, char);
1163 0           SAVEFREEPV(buffers[c]);
1164 0           bind[c].buffer_type = MYSQL_TYPE_STRING;
1165 0           bind[c].buffer = buffers[c];
1166 0           bind[c].buffer_length = buflen;
1167 0           bind[c].length = &lengths[c];
1168 0           bind[c].is_null = &is_null[c];
1169             }
1170              
1171 0 0         if (mysql_stmt_bind_result(stmt, bind)) {
1172 0           mysql_stmt_free_result(stmt);
1173 0           mysql_free_result(self->op_result);
1174 0           self->op_result = NULL;
1175 0           SvREFCNT_dec((SV*)rows);
1176              
1177 0           PUSHs(&PL_sv_undef);
1178 0           PUSHs(sv_2mortal(newSVpv(mysql_stmt_error(stmt), 0)));
1179 0           goto invoke;
1180             }
1181              
1182 0 0         while ((fetch_ret = mysql_stmt_fetch(stmt)) == 0) {
1183 0           AV *row = newAV();
1184 0 0         if (ncols > 0) av_extend(row, ncols - 1);
1185 0 0         for (c = 0; c < ncols; c++) {
1186 0 0         if (is_null[c]) {
1187 0           av_push(row, newSV(0));
1188             } else {
1189 0           SV *val = newSVpvn(buffers[c], lengths[c]);
1190 0 0         if (self->utf8 && is_utf8_charset(fields[c].charsetnr))
    0          
1191 0           SvUTF8_on(val);
1192 0           av_push(row, val);
1193             }
1194             }
1195 0           av_push(rows, newRV_noinc((SV*)row));
1196             }
1197              
1198 0 0         if (fetch_ret != 0 && fetch_ret != MYSQL_NO_DATA) {
    0          
1199 0           mysql_free_result(self->op_result);
1200 0           self->op_result = NULL;
1201 0           mysql_stmt_free_result(stmt);
1202 0           SvREFCNT_dec((SV*)rows);
1203 0           PUSHs(&PL_sv_undef);
1204 0 0         if (fetch_ret == MYSQL_DATA_TRUNCATED)
1205 0           PUSHs(sv_2mortal(newSVpvs("data truncated")));
1206             else
1207 0           PUSHs(sv_2mortal(newSVpv(mysql_stmt_error(stmt), 0)));
1208             } else {
1209 0           AV *fnames = build_field_names(self->conn, fields, ncols);
1210 0           mysql_free_result(self->op_result);
1211 0           self->op_result = NULL;
1212 0           mysql_stmt_free_result(stmt);
1213 0           PUSHs(sv_2mortal(newRV_noinc((SV*)rows)));
1214 0           PUSHs(&PL_sv_undef);
1215 0           PUSHs(sv_2mortal(newRV_noinc((SV*)fnames)));
1216             }
1217             }
1218              
1219 0           invoke:
1220 0           PUTBACK;
1221 0           invoke_cb(cb);
1222 0 0         FREETMPS;
1223 0           LEAVE;
1224             }
1225 0           self->callback_depth--;
1226 0 0         if (check_destroyed(self)) return;
1227 0           maybe_pipeline(self);
1228             }
1229             }
1230              
1231 0           static void on_stmt_store_done(ev_mariadb_t *self) {
1232 0 0         if (self->op_ret != 0) {
1233 0           MYSQL_STMT *stmt = self->op_stmt;
1234 0           self->op_stmt = NULL;
1235 0           self->state = STATE_IDLE;
1236 0 0         if (deliver_error(self, mysql_stmt_error(stmt))) return;
1237 0           maybe_pipeline(self);
1238 0           return;
1239             }
1240 0           on_stmt_execute_done(self);
1241             }
1242              
1243 0           static void unlink_stmt(ev_mariadb_t *self, ev_mariadb_stmt_t *ctx) {
1244 0           ev_mariadb_stmt_t **pp = &self->stmt_list;
1245 0 0         while (*pp) {
1246 0 0         if (*pp == ctx) { *pp = ctx->next; return; }
1247 0           pp = &(*pp)->next;
1248             }
1249             }
1250              
1251 0           static void on_stmt_close_done(ev_mariadb_t *self) {
1252 0           self->op_stmt = NULL; /* stmt freed by mysql_stmt_close */
1253 0 0         if (self->op_stmt_ctx) {
1254 0           unlink_stmt(self, self->op_stmt_ctx);
1255 0           Safefree(self->op_stmt_ctx);
1256 0           self->op_stmt_ctx = NULL;
1257             }
1258 0           self->state = STATE_IDLE;
1259 0 0         if (self->op_bool_ret != 0) {
1260 0 0         if (deliver_error(self, mysql_error(self->conn))) return;
1261             } else {
1262 0 0         if (deliver_value(self, newSViv(1))) return;
1263             }
1264 0           maybe_pipeline(self);
1265             }
1266              
1267 0           static void on_stmt_reset_done(ev_mariadb_t *self) {
1268 0           MYSQL_STMT *stmt = self->op_stmt;
1269 0           self->op_stmt = NULL;
1270 0           self->state = STATE_IDLE;
1271 0 0         if (self->op_bool_ret != 0) {
1272 0 0         if (deliver_error(self, mysql_stmt_error(stmt))) return;
1273             } else {
1274 0 0         if (deliver_value(self, newSViv(1))) return;
1275             }
1276 0           maybe_pipeline(self);
1277             }
1278              
1279             /* --- Async utility operation done handler --- */
1280              
1281 0           static void on_utility_done(ev_mariadb_t *self, int failed) {
1282 0           self->state = STATE_IDLE;
1283 0 0         if (failed) {
1284 0 0         if (deliver_error(self, mysql_error(self->conn))) return;
1285             } else {
1286 0 0         if (deliver_value(self, newSViv(1))) return;
1287             }
1288 0           maybe_pipeline(self);
1289             }
1290              
1291 0           static void free_stmt_bind_params(ev_mariadb_stmt_t *ctx) {
1292             int i;
1293 0 0         if (!ctx->bind_params) return;
1294 0 0         for (i = 0; i < ctx->bind_param_count; i++) {
1295 0           Safefree(ctx->bind_params[i].buffer);
1296             }
1297 0           Safefree(ctx->bind_params);
1298 0           ctx->bind_params = NULL;
1299 0           ctx->bind_param_count = 0;
1300             }
1301              
1302 0           static void transition_to_stmt_store(ev_mariadb_t *self) {
1303             int status;
1304 0           self->op_stmt_ctx = NULL; /* bind_params live in wrapper, not freed here */
1305 0 0         if (self->op_ret != 0) {
1306 0           on_stmt_execute_done(self);
1307             } else {
1308 0           self->state = STATE_STMT_STORE;
1309 0           status = mysql_stmt_store_result_start(&self->op_ret, self->op_stmt);
1310 0 0         if (status == 0) {
1311 0           on_stmt_store_done(self);
1312             } else {
1313 0           update_watchers(self, status);
1314             }
1315             }
1316 0           }
1317              
1318             /* --- Streaming query (mysql_use_result + fetch_row) --- */
1319              
1320 0           static void stream_error(ev_mariadb_t *self) {
1321 0           SV *cb = self->stream_cb;
1322             char errbuf[512];
1323 0           COPY_ERROR(errbuf, mysql_error(self->conn));
1324 0           self->stream_cb = NULL;
1325 0           self->state = STATE_IDLE;
1326 0           self->pending_count--;
1327 0           self->callback_depth++;
1328 0           invoke_error_cb(cb, errbuf);
1329 0           self->callback_depth--;
1330 0           }
1331              
1332 0           static void on_real_query_done(ev_mariadb_t *self) {
1333             int status;
1334              
1335 0 0         if (self->op_ret != 0) {
1336 0           stream_error(self);
1337 0 0         if (check_destroyed(self)) return;
1338 0           maybe_pipeline(self);
1339 0           return;
1340             }
1341              
1342 0           self->op_result = mysql_use_result(self->conn);
1343 0 0         if (!self->op_result) {
1344 0           const char *err = mysql_error(self->conn);
1345 0 0         if (err && err[0]) {
    0          
1346 0           stream_error(self);
1347 0 0         if (check_destroyed(self)) return;
1348             } else {
1349             /* DML or no-result query: deliver EOF to stream callback */
1350 0           SV *cb = self->stream_cb;
1351 0           self->stream_cb = NULL;
1352 0           self->state = STATE_IDLE;
1353 0           self->pending_count--;
1354 0           self->callback_depth++;
1355 0           invoke_eof_cb(cb);
1356 0           self->callback_depth--;
1357 0 0         if (check_destroyed(self)) return;
1358             }
1359 0           maybe_pipeline(self);
1360 0           return;
1361             }
1362              
1363 0           status = mysql_fetch_row_start(&self->op_row, self->op_result);
1364 0 0         if (status == 0) {
1365 0           on_stream_fetch_done(self);
1366             } else {
1367 0           self->state = STATE_STREAM_FETCH;
1368 0           update_watchers(self, status);
1369             }
1370             }
1371              
1372 0           static void on_stream_fetch_done(ev_mariadb_t *self) {
1373 0 0         MYSQL_FIELD *fields = (self->utf8 && self->op_result)
1374 0 0         ? mysql_fetch_fields(self->op_result) : NULL;
1375              
1376 0           for (;;) {
1377 0           MYSQL_ROW row = self->op_row;
1378              
1379 0 0         if (row == NULL) break;
1380              
1381             /* deliver row to callback */
1382             {
1383 0           unsigned int ncols = mysql_num_fields(self->op_result);
1384 0           unsigned long *lengths = mysql_fetch_lengths(self->op_result);
1385 0           AV *r = newAV();
1386             unsigned int c;
1387              
1388 0 0         if (ncols > 0) av_extend(r, ncols - 1);
1389 0 0         for (c = 0; c < ncols; c++) {
1390 0 0         if (row[c] == NULL) {
1391 0           av_push(r, newSV(0));
1392             } else {
1393 0           SV *val = newSVpvn(row[c], lengths[c]);
1394 0 0         if (fields && is_utf8_charset(fields[c].charsetnr))
    0          
1395 0           SvUTF8_on(val);
1396 0           av_push(r, val);
1397             }
1398             }
1399              
1400             /* Detach stream_cb before invoking: prevents cancel_pending
1401             from double-firing the callback if finish() is called inside */
1402             {
1403 0           SV *saved_cb = self->stream_cb;
1404 0           self->stream_cb = NULL;
1405 0           self->pending_count--;
1406              
1407 0           self->callback_depth++;
1408             {
1409 0           SV *cb = sv_2mortal(SvREFCNT_inc_simple_NN(saved_cb));
1410 0           dSP;
1411 0           ENTER;
1412 0           SAVETMPS;
1413 0 0         PUSHMARK(SP);
1414 0           PUSHs(sv_2mortal(newRV_noinc((SV*)r)));
1415 0           PUTBACK;
1416 0           call_sv(cb, G_DISCARD | G_EVAL);
1417 0 0         if (SvTRUE(ERRSV)) {
    0          
1418 0 0         warn("EV::MariaDB: exception in stream callback: %s", SvPV_nolen(ERRSV));
1419 0 0         sv_setpvn(ERRSV, "", 0);
1420             }
1421 0 0         FREETMPS;
1422 0           LEAVE;
1423             }
1424 0           self->callback_depth--;
1425 0 0         if (check_destroyed(self)) {
1426 0           SvREFCNT_dec(saved_cb);
1427 0           return;
1428             }
1429              
1430             /* Was stream cancelled (finish/reset from callback)? */
1431 0 0         if (self->stream_cb != NULL) {
1432             /* callback started a new stream — don't touch */
1433 0           SvREFCNT_dec(saved_cb);
1434 0 0         } else if (self->conn == NULL || self->op_result == NULL) {
    0          
1435             /* connection torn down — stream is over */
1436 0           SvREFCNT_dec(saved_cb);
1437 0           return;
1438             } else {
1439             /* normal: restore for next row */
1440 0           self->stream_cb = saved_cb;
1441 0           self->pending_count++;
1442             }
1443             }
1444             }
1445              
1446             /* fetch next row */
1447             {
1448 0           int status = mysql_fetch_row_start(&self->op_row, self->op_result);
1449 0 0         if (status != 0) {
1450 0           self->state = STATE_STREAM_FETCH;
1451 0           update_watchers(self, status);
1452 0           return;
1453             }
1454             }
1455             }
1456              
1457             /* row == NULL: EOF or error */
1458             {
1459 0           SV *cb = self->stream_cb;
1460             char errbuf[512];
1461 0           const char *err = mysql_error(self->conn);
1462 0 0         int has_error = (err && err[0]);
    0          
1463 0 0         if (has_error) COPY_ERROR(errbuf, err);
1464              
1465 0           self->stream_cb = NULL;
1466 0           mysql_free_result(self->op_result);
1467 0           self->op_result = NULL;
1468 0           self->state = STATE_IDLE;
1469 0           self->pending_count--;
1470              
1471 0           self->callback_depth++;
1472 0 0         if (has_error) {
1473 0           invoke_error_cb(cb, errbuf);
1474             } else {
1475 0           invoke_eof_cb(cb);
1476             }
1477 0           self->callback_depth--;
1478 0 0         if (check_destroyed(self)) return;
1479             /* drain any secondary result sets from multi_statements */
1480 0 0         if (self->conn && mysql_more_results(self->conn)) {
    0          
1481 0           drain_multi_result(self);
1482 0           return;
1483             }
1484 0           maybe_pipeline(self);
1485             }
1486             }
1487              
1488             /* --- Async close --- */
1489              
1490 0           static void on_close_done(ev_mariadb_t *self) {
1491             SV *cb;
1492              
1493             /* conn was freed by mysql_close */
1494 0           self->conn = NULL;
1495 0           self->fd = -1;
1496 0           stop_reading(self);
1497 0           stop_writing(self);
1498 0           stop_timer(self);
1499 0           self->state = STATE_IDLE;
1500              
1501             /* pop our callback before cancel_pending fires errors for remaining items */
1502 0           cb = pop_cb(self);
1503              
1504             /* cancel any items queued from within a callback before close_async */
1505 0           cancel_pending(self, "connection closed");
1506 0 0         if (check_destroyed(self)) {
1507 0           SvREFCNT_dec(cb);
1508 0           return;
1509             }
1510              
1511 0 0         if (cb) {
1512 0           SV *val = newSViv(1);
1513 0           self->callback_depth++;
1514             {
1515 0           dSP;
1516 0           ENTER;
1517 0           SAVETMPS;
1518 0 0         PUSHMARK(SP);
1519 0           PUSHs(sv_2mortal(val));
1520 0           PUTBACK;
1521 0           invoke_cb(cb);
1522 0 0         FREETMPS;
1523 0           LEAVE;
1524             }
1525 0           self->callback_depth--;
1526 0 0         if (check_destroyed(self)) return;
1527             }
1528             /* no maybe_pipeline: conn is NULL, nothing can be sent */
1529             }
1530              
1531             /* --- Send long data done --- */
1532              
1533 0           static void on_send_long_data_done(ev_mariadb_t *self) {
1534 0           MYSQL_STMT *stmt = self->op_stmt;
1535              
1536 0 0         if (self->op_data_ptr) {
1537 0           Safefree(self->op_data_ptr);
1538 0           self->op_data_ptr = NULL;
1539             }
1540 0           self->op_stmt = NULL;
1541 0           self->state = STATE_IDLE;
1542              
1543 0 0         if (self->op_bool_ret != 0) {
1544 0 0         if (deliver_error(self, mysql_stmt_error(stmt))) return;
1545             } else {
1546 0 0         if (deliver_value(self, newSViv(1))) return;
1547             }
1548 0           maybe_pipeline(self);
1549             }
1550              
1551             /* --- Main continuation dispatcher --- */
1552              
1553 0           static void continue_operation(ev_mariadb_t *self, int events) {
1554             int status;
1555              
1556 0           switch (self->state) {
1557 0           case STATE_CONNECTING:
1558 0           status = mysql_real_connect_cont(&self->op_conn_ret, self->conn, events);
1559 0 0         if (status == 0) {
1560 0           on_connect_done(self);
1561             } else {
1562 0           update_watchers(self, status);
1563             }
1564 0           break;
1565              
1566 0           case STATE_SEND:
1567 0           status = mysql_send_query_cont(&self->op_ret, self->conn, events);
1568 0 0         if (status == 0) {
1569 0           on_send_done(self);
1570             } else {
1571 0           update_watchers(self, status);
1572             }
1573 0           break;
1574              
1575 0           case STATE_READ_RESULT:
1576 0           status = mysql_read_query_result_cont(&self->op_bool_ret, self->conn, events);
1577 0 0         if (status == 0) {
1578 0           on_read_result_done(self);
1579             } else {
1580 0           update_watchers(self, status);
1581             }
1582 0           break;
1583              
1584 0           case STATE_STORE_RESULT:
1585 0           status = mysql_store_result_cont(&self->op_result, self->conn, events);
1586 0 0         if (status == 0) {
1587 0           on_store_result_done(self);
1588             } else {
1589 0           update_watchers(self, status);
1590             }
1591 0           break;
1592              
1593 0           case STATE_NEXT_RESULT:
1594 0           status = mysql_next_result_cont(&self->op_ret, self->conn, events);
1595 0 0         if (status == 0) {
1596 0           on_next_result_done(self);
1597             } else {
1598 0           update_watchers(self, status);
1599             }
1600 0           break;
1601              
1602 0           case STATE_STMT_PREPARE:
1603 0           status = mysql_stmt_prepare_cont(&self->op_ret, self->op_stmt, events);
1604 0 0         if (status == 0) {
1605 0           on_stmt_prepare_done(self);
1606             } else {
1607 0           update_watchers(self, status);
1608             }
1609 0           break;
1610              
1611 0           case STATE_STMT_EXECUTE:
1612 0           status = mysql_stmt_execute_cont(&self->op_ret, self->op_stmt, events);
1613 0 0         if (status == 0) {
1614 0           transition_to_stmt_store(self);
1615             } else {
1616 0           update_watchers(self, status);
1617             }
1618 0           break;
1619              
1620 0           case STATE_STMT_STORE:
1621 0           status = mysql_stmt_store_result_cont(&self->op_ret, self->op_stmt, events);
1622 0 0         if (status == 0) {
1623 0           on_stmt_store_done(self);
1624             } else {
1625 0           update_watchers(self, status);
1626             }
1627 0           break;
1628              
1629 0           case STATE_STMT_CLOSE:
1630 0           status = mysql_stmt_close_cont(&self->op_bool_ret, self->op_stmt, events);
1631 0 0         if (status == 0) {
1632 0           on_stmt_close_done(self);
1633             } else {
1634 0           update_watchers(self, status);
1635             }
1636 0           break;
1637              
1638 0           case STATE_STMT_RESET:
1639 0           status = mysql_stmt_reset_cont(&self->op_bool_ret, self->op_stmt, events);
1640 0 0         if (status == 0) {
1641 0           on_stmt_reset_done(self);
1642             } else {
1643 0           update_watchers(self, status);
1644             }
1645 0           break;
1646              
1647 0           case STATE_PING:
1648 0           status = mysql_ping_cont(&self->op_ret, self->conn, events);
1649 0 0         if (status == 0) {
1650 0           on_utility_done(self, self->op_ret != 0);
1651             } else {
1652 0           update_watchers(self, status);
1653             }
1654 0           break;
1655              
1656 0           case STATE_CHANGE_USER:
1657 0           status = mysql_change_user_cont(&self->op_bool_ret, self->conn, events);
1658 0 0         if (status == 0) {
1659 0           on_utility_done(self, self->op_bool_ret != 0);
1660             } else {
1661 0           update_watchers(self, status);
1662             }
1663 0           break;
1664              
1665 0           case STATE_SELECT_DB:
1666 0           status = mysql_select_db_cont(&self->op_ret, self->conn, events);
1667 0 0         if (status == 0) {
1668 0           on_utility_done(self, self->op_ret != 0);
1669             } else {
1670 0           update_watchers(self, status);
1671             }
1672 0           break;
1673              
1674 0           case STATE_RESET_CONNECTION:
1675 0           status = mysql_reset_connection_cont(&self->op_ret, self->conn, events);
1676 0 0         if (status == 0) {
1677 0           on_utility_done(self, self->op_ret != 0);
1678             } else {
1679 0           update_watchers(self, status);
1680             }
1681 0           break;
1682              
1683 0           case STATE_SET_CHARSET:
1684 0           status = mysql_set_character_set_cont(&self->op_ret, self->conn, events);
1685 0 0         if (status == 0) {
1686 0           on_utility_done(self, self->op_ret != 0);
1687             } else {
1688 0           update_watchers(self, status);
1689             }
1690 0           break;
1691              
1692 0           case STATE_COMMIT:
1693 0           status = mysql_commit_cont(&self->op_bool_ret, self->conn, events);
1694 0 0         if (status == 0) {
1695 0           on_utility_done(self, self->op_bool_ret != 0);
1696             } else {
1697 0           update_watchers(self, status);
1698             }
1699 0           break;
1700              
1701 0           case STATE_ROLLBACK:
1702 0           status = mysql_rollback_cont(&self->op_bool_ret, self->conn, events);
1703 0 0         if (status == 0) {
1704 0           on_utility_done(self, self->op_bool_ret != 0);
1705             } else {
1706 0           update_watchers(self, status);
1707             }
1708 0           break;
1709              
1710 0           case STATE_AUTOCOMMIT:
1711 0           status = mysql_autocommit_cont(&self->op_bool_ret, self->conn, events);
1712 0 0         if (status == 0) {
1713 0           on_utility_done(self, self->op_bool_ret != 0);
1714             } else {
1715 0           update_watchers(self, status);
1716             }
1717 0           break;
1718              
1719 0           case STATE_STMT_SEND_LONG_DATA:
1720 0           status = mysql_stmt_send_long_data_cont(&self->op_bool_ret, self->op_stmt, events);
1721 0 0         if (status == 0) {
1722 0           on_send_long_data_done(self);
1723             } else {
1724 0           update_watchers(self, status);
1725             }
1726 0           break;
1727              
1728 0           case STATE_REAL_QUERY:
1729 0           status = mysql_real_query_cont(&self->op_ret, self->conn, events);
1730 0 0         if (status == 0) {
1731 0           on_real_query_done(self);
1732             } else {
1733 0           update_watchers(self, status);
1734             }
1735 0           break;
1736              
1737 0           case STATE_STREAM_FETCH:
1738 0           status = mysql_fetch_row_cont(&self->op_row, self->op_result, events);
1739 0 0         if (status == 0) {
1740 0           on_stream_fetch_done(self);
1741             } else {
1742 0           update_watchers(self, status);
1743             }
1744 0           break;
1745              
1746 0           case STATE_CLOSE:
1747 0           status = mysql_close_cont(self->conn, events);
1748 0 0         if (status == 0) {
1749 0           on_close_done(self);
1750             } else {
1751 0           update_watchers(self, status);
1752             }
1753 0           break;
1754              
1755 0           default:
1756 0           warn("EV::MariaDB: unexpected state %d in continue_operation", self->state);
1757 0           break;
1758             }
1759 0           }
1760              
1761 0           static void io_cb(EV_P_ ev_io *w, int revents) {
1762 0           ev_mariadb_t *self = (ev_mariadb_t *)w->data;
1763 0           int events = 0;
1764             (void)loop;
1765              
1766 0 0         if (self == NULL || self->magic != EV_MARIADB_MAGIC) return;
    0          
1767 0 0         if (NULL == self->conn) return;
1768              
1769 0 0         if (revents & EV_READ) events |= MYSQL_WAIT_READ | MYSQL_WAIT_EXCEPT;
1770 0 0         if (revents & EV_WRITE) events |= MYSQL_WAIT_WRITE;
1771              
1772 0           continue_operation(self, events);
1773             }
1774              
1775 0           static void timer_cb(EV_P_ ev_timer *w, int revents) {
1776 0           ev_mariadb_t *self = (ev_mariadb_t *)w->data;
1777             (void)loop;
1778             (void)revents;
1779              
1780 0 0         if (self == NULL || self->magic != EV_MARIADB_MAGIC) return;
    0          
1781 0 0         if (NULL == self->conn) return;
1782              
1783 0           self->timing = 0;
1784 0           continue_operation(self, MYSQL_WAIT_TIMEOUT);
1785             }
1786              
1787 0           static char* safe_strdup(const char *s) {
1788             char *d;
1789             size_t len;
1790 0 0         if (!s) return NULL;
1791 0           len = strlen(s);
1792 0           Newx(d, len + 1, char);
1793 0           Copy(s, d, len + 1, char);
1794 0           return d;
1795             }
1796              
1797 2           static void free_connect_strings(ev_mariadb_t *self) {
1798 2 50         if (self->host) { Safefree(self->host); self->host = NULL; }
1799 2 50         if (self->user) { Safefree(self->user); self->user = NULL; }
1800 2 50         if (self->password) { Safefree(self->password); self->password = NULL; }
1801 2 50         if (self->database) { Safefree(self->database); self->database = NULL; }
1802 2 50         if (self->unix_socket) { Safefree(self->unix_socket); self->unix_socket = NULL; }
1803 2           }
1804              
1805 2           static void free_option_strings(ev_mariadb_t *self) {
1806 2 50         if (self->charset) { Safefree(self->charset); self->charset = NULL; }
1807 2 50         if (self->init_command) { Safefree(self->init_command); self->init_command = NULL; }
1808 2 50         if (self->ssl_key) { Safefree(self->ssl_key); self->ssl_key = NULL; }
1809 2 50         if (self->ssl_cert) { Safefree(self->ssl_cert); self->ssl_cert = NULL; }
1810 2 50         if (self->ssl_ca) { Safefree(self->ssl_ca); self->ssl_ca = NULL; }
1811 2 50         if (self->ssl_capath) { Safefree(self->ssl_capath); self->ssl_capath = NULL; }
1812 2 50         if (self->ssl_cipher) { Safefree(self->ssl_cipher); self->ssl_cipher = NULL; }
1813 2           }
1814              
1815 0           static void apply_options(ev_mariadb_t *self) {
1816 0           MYSQL *conn = self->conn;
1817 0           unsigned long flags = self->client_flags;
1818              
1819 0 0         if (self->connect_timeout > 0)
1820 0           mysql_options(conn, MYSQL_OPT_CONNECT_TIMEOUT, &self->connect_timeout);
1821 0 0         if (self->read_timeout > 0)
1822 0           mysql_options(conn, MYSQL_OPT_READ_TIMEOUT, &self->read_timeout);
1823 0 0         if (self->write_timeout > 0)
1824 0           mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &self->write_timeout);
1825 0 0         if (self->compress)
1826 0           mysql_options(conn, MYSQL_OPT_COMPRESS, NULL);
1827 0 0         if (self->charset)
1828 0           mysql_options(conn, MYSQL_SET_CHARSET_NAME, self->charset);
1829 0 0         if (self->init_command)
1830 0           mysql_options(conn, MYSQL_INIT_COMMAND, self->init_command);
1831 0 0         if (self->ssl_ca || self->ssl_capath || self->ssl_cert || self->ssl_key || self->ssl_cipher)
    0          
    0          
    0          
    0          
1832 0           mysql_ssl_set(conn, self->ssl_key, self->ssl_cert, self->ssl_ca, self->ssl_capath, self->ssl_cipher);
1833 0 0         if (self->ssl_verify_server_cert) {
1834 0           my_bool val = 1;
1835 0           mysql_options(conn, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, &val);
1836             }
1837 0 0         if (self->multi_statements)
1838 0           flags |= CLIENT_MULTI_STATEMENTS | CLIENT_MULTI_RESULTS;
1839 0 0         if (self->found_rows)
1840 0           flags |= CLIENT_FOUND_ROWS;
1841              
1842 0           self->client_flags = flags;
1843 0           }
1844              
1845 0           static int is_utf8_charset(unsigned int charsetnr) {
1846             /* utf8mb3 collations: 33, 83, 192-211 */
1847             /* utf8mb4 collations: 45, 46, 224-247, 255, 256-309 */
1848             /* MariaDB 10.10+ uca1400: utf8mb3 2048-2303, utf8mb4 2304-2559 */
1849             /* (2560+ are ucs2/utf16/utf32 uca1400 — NOT UTF-8) */
1850 0 0         return charsetnr == 33 || charsetnr == 83
1851 0 0         || (charsetnr >= 192 && charsetnr <= 211)
    0          
1852 0 0         || charsetnr == 45 || charsetnr == 46
    0          
1853 0 0         || (charsetnr >= 224 && charsetnr <= 247)
    0          
1854 0 0         || charsetnr == 255
1855 0 0         || (charsetnr >= 256 && charsetnr <= 309)
    0          
1856 0 0         || (charsetnr >= 2048 && charsetnr <= 2559);
    0          
    0          
1857             }
1858              
1859 0           static int conn_charset_is_utf8(MYSQL *conn) {
1860             const char *cs;
1861 0 0         if (!conn) return 0;
1862 0           cs = mysql_character_set_name(conn);
1863 0 0         return (cs && strncmp(cs, "utf8", 4) == 0);
    0          
1864             }
1865              
1866 0           static AV* build_field_names(MYSQL *conn, MYSQL_FIELD *fields, unsigned int ncols) {
1867 0           AV *fnames = newAV();
1868             unsigned int i;
1869 0           int utf8_names = conn_charset_is_utf8(conn);
1870 0 0         if (ncols > 0) av_extend(fnames, ncols - 1);
1871 0 0         for (i = 0; i < ncols; i++) {
1872 0           SV *name = newSVpvn(fields[i].name, fields[i].name_length);
1873 0 0         if (utf8_names)
1874 0           SvUTF8_on(name);
1875 0           av_push(fnames, name);
1876             }
1877 0           return fnames;
1878             }
1879              
1880 0           static void setup_bind_params(ev_mariadb_stmt_t *ctx, AV *params) {
1881 0           int nparams = (int)(av_len(params) + 1);
1882             MYSQL_BIND *bp;
1883             int i;
1884              
1885             {
1886 0           unsigned long expected = mysql_stmt_param_count(ctx->stmt);
1887 0 0         if ((unsigned long)nparams != expected)
1888 0           croak("parameter count mismatch: got %d, expected %lu", nparams, expected);
1889             }
1890              
1891 0 0         if (nparams == 0) return;
1892              
1893 0           free_stmt_bind_params(ctx);
1894              
1895 0           Newxz(bp, nparams, MYSQL_BIND);
1896 0           ctx->bind_params = bp;
1897 0           ctx->bind_param_count = nparams;
1898              
1899 0 0         for (i = 0; i < nparams; i++) {
1900 0           SV **svp = av_fetch(params, i, 0);
1901 0 0         if (svp && SvOK(*svp)) {
    0          
1902 0 0         if (SvIOK(*svp)) {
1903             long long *val;
1904 0           Newx(val, 1, long long);
1905 0 0         if (SvIsUV(*svp)) {
1906 0           *val = (long long)(unsigned long long)SvUV(*svp);
1907 0           bp[i].is_unsigned = 1;
1908             } else {
1909 0           *val = (long long)SvIV(*svp);
1910 0           bp[i].is_unsigned = 0;
1911             }
1912 0           bp[i].buffer_type = MYSQL_TYPE_LONGLONG;
1913 0           bp[i].buffer = (void *)val;
1914 0 0         } else if (SvNOK(*svp)) {
1915             double *val;
1916 0           Newx(val, 1, double);
1917 0           *val = (double)SvNV(*svp);
1918 0           bp[i].buffer_type = MYSQL_TYPE_DOUBLE;
1919 0           bp[i].buffer = (void *)val;
1920             } else {
1921             STRLEN len;
1922 0           const char *s = SvPV(*svp, len);
1923             char *copy;
1924 0           Newx(copy, len + 1, char);
1925 0           Copy(s, copy, len, char);
1926 0           copy[len] = '\0';
1927 0           bp[i].buffer_type = MYSQL_TYPE_STRING;
1928 0           bp[i].buffer = (void *)copy;
1929 0           bp[i].buffer_length = (unsigned long)len;
1930 0           bp[i].length = &bp[i].buffer_length;
1931             }
1932             } else {
1933 0           bp[i].buffer_type = MYSQL_TYPE_NULL;
1934             }
1935             }
1936              
1937 0 0         if (mysql_stmt_bind_param(ctx->stmt, bp)) {
1938             char errbuf[512];
1939 0           COPY_ERROR(errbuf, mysql_stmt_error(ctx->stmt));
1940 0           free_stmt_bind_params(ctx);
1941 0           croak("%s", errbuf);
1942             }
1943             }
1944              
1945 0           static void start_connect(ev_mariadb_t *self) {
1946             int status;
1947              
1948 0           self->conn = mysql_init(NULL);
1949 0 0         if (NULL == self->conn) {
1950 0           croak("mysql_init failed");
1951             }
1952              
1953 0           self->connect_pid = getpid();
1954 0           mysql_options(self->conn, MYSQL_OPT_NONBLOCK, 0);
1955 0           apply_options(self);
1956              
1957 0           self->state = STATE_CONNECTING;
1958 0           status = mysql_real_connect_start(&self->op_conn_ret, self->conn,
1959 0           self->host, self->user, self->password, self->database,
1960 0           self->port, self->unix_socket, self->client_flags);
1961              
1962 0 0         if (status == 0) {
1963 0           on_connect_done(self);
1964             } else {
1965 0           self->fd = mysql_get_socket(self->conn);
1966 0 0         if (self->fd < 0) {
1967 0           mysql_close(self->conn);
1968 0           self->conn = NULL;
1969 0           self->state = STATE_IDLE;
1970 0           croak("mysql_get_socket returned invalid fd");
1971             }
1972              
1973 0           init_io_watchers(self);
1974              
1975 0           update_watchers(self, status);
1976             }
1977 0           }
1978              
1979             MODULE = EV::MariaDB PACKAGE = EV::MariaDB
1980              
1981             BOOT:
1982             {
1983 15 50         I_EV_API("EV::MariaDB");
    50          
    50          
1984             }
1985              
1986             EV::MariaDB
1987             _new(char *class, EV::Loop loop)
1988             CODE:
1989             {
1990             PERL_UNUSED_VAR(class);
1991 2           Newxz(RETVAL, 1, ev_mariadb_t);
1992 2           RETVAL->magic = EV_MARIADB_MAGIC;
1993 2           RETVAL->loop = loop;
1994 2           RETVAL->fd = -1;
1995 2           RETVAL->state = STATE_IDLE;
1996 2           ngx_queue_init(&RETVAL->cb_queue);
1997 2           ngx_queue_init(&RETVAL->send_queue);
1998              
1999 2           ev_init(&RETVAL->timer, timer_cb);
2000 2           RETVAL->timer.data = (void *)RETVAL;
2001             }
2002             OUTPUT:
2003             RETVAL
2004              
2005             void
2006             DESTROY(EV::MariaDB self)
2007             CODE:
2008             {
2009 2 50         if (self->magic != EV_MARIADB_MAGIC)
2010 0           return;
2011              
2012 2           self->magic = EV_MARIADB_FREED;
2013              
2014 2           stop_reading(self);
2015 2           stop_writing(self);
2016 2           stop_timer(self);
2017              
2018 2 50         if (PL_dirty) {
2019             /* global destruction — free C resources only; skip SvREFCNT_dec
2020             to avoid cascading destructors in torn-down interpreter */
2021 0 0         int is_fork = IS_FORKED(self);
    0          
2022 0 0         while (!ngx_queue_empty(&self->send_queue)) {
2023 0           ngx_queue_t *q = ngx_queue_head(&self->send_queue);
2024 0           ev_mariadb_send_t *s = ngx_queue_data(q, ev_mariadb_send_t, queue);
2025 0           ngx_queue_remove(q);
2026 0           Safefree(s->sql);
2027 0           Safefree(s);
2028             }
2029 0 0         while (!ngx_queue_empty(&self->cb_queue)) {
2030 0           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
2031 0           ev_mariadb_cb_t *cbt = ngx_queue_data(q, ev_mariadb_cb_t, queue);
2032 0           ngx_queue_remove(q);
2033 0           Safefree(cbt);
2034             }
2035 0 0         if (self->op_data_ptr) Safefree(self->op_data_ptr);
2036 0           self->stream_cb = NULL;
2037             {
2038 0           ev_mariadb_stmt_t *ctx = self->stmt_list;
2039 0 0         while (ctx) {
2040 0           ev_mariadb_stmt_t *next = ctx->next;
2041 0           free_stmt_bind_params(ctx);
2042 0 0         if (!is_fork && ctx->stmt) mysql_stmt_close(ctx->stmt);
    0          
2043 0           Safefree(ctx);
2044 0           ctx = next;
2045             }
2046             }
2047 0 0         if (!is_fork) {
2048 0 0         if (self->op_result) mysql_free_result(self->op_result);
2049 0 0         if (self->conn) mysql_close(self->conn);
2050             }
2051 0           free_connect_strings(self);
2052 0           free_option_strings(self);
2053 0           Safefree(self);
2054 0           return;
2055             }
2056              
2057 2           cancel_pending(self, "object destroyed");
2058              
2059             /* safety net: callbacks during cancel_pending could re-queue entries */
2060 2           drain_queues_silent(self);
2061              
2062 2 50         if (self->op_data_ptr) {
2063 0           Safefree(self->op_data_ptr);
2064 0           self->op_data_ptr = NULL;
2065             }
2066              
2067             /* Free all tracked stmt wrappers and connection resources */
2068             {
2069 2 50         int is_fork = IS_FORKED(self);
    0          
2070 2           ev_mariadb_stmt_t *ctx = self->stmt_list;
2071 2 50         while (ctx) {
2072 0           ev_mariadb_stmt_t *next = ctx->next;
2073 0           free_stmt_bind_params(ctx);
2074 0 0         if (!is_fork && ctx->stmt) mysql_stmt_close(ctx->stmt);
    0          
2075 0           Safefree(ctx);
2076 0           ctx = next;
2077             }
2078 2           self->stmt_list = NULL;
2079             /* Close op_stmt if not tracked in stmt_list (e.g., in-flight prepare) */
2080 2 50         if (!is_fork && self->op_stmt && !self->op_stmt_ctx) {
    50          
    0          
2081 0           mysql_stmt_close(self->op_stmt);
2082             }
2083 2           self->op_stmt = NULL;
2084 2           self->op_stmt_ctx = NULL;
2085              
2086 2 50         if (is_fork) {
2087 0           self->op_result = NULL;
2088 0           self->conn = NULL;
2089             } else {
2090 2 50         if (self->op_result) {
2091 0           mysql_free_result(self->op_result);
2092 0           self->op_result = NULL;
2093             }
2094             {
2095 2           MYSQL *conn = self->conn;
2096 2           self->conn = NULL;
2097 2 50         if (conn) mysql_close(conn);
2098             }
2099             }
2100 2           self->loop = NULL;
2101 2           self->fd = -1;
2102             }
2103              
2104 2 50         if (NULL != self->on_connect) {
2105 0           SvREFCNT_dec(self->on_connect);
2106 0           self->on_connect = NULL;
2107             }
2108 2 50         if (NULL != self->on_error) {
2109 2           SvREFCNT_dec(self->on_error);
2110 2           self->on_error = NULL;
2111             }
2112 2           free_connect_strings(self);
2113 2           free_option_strings(self);
2114              
2115             /* deferred free: check_destroyed will Safefree when depth hits 0 */
2116 2 50         if (self->callback_depth == 0)
2117 2           Safefree(self);
2118             }
2119              
2120             void
2121             connect(EV::MariaDB self, const char *host, const char *user, const char *password, const char *database, unsigned int port = 3306, SV *unix_socket_sv = NULL)
2122             CODE:
2123             {
2124 0 0         if (NULL != self->conn) {
2125 0 0         croak(self->state == STATE_CONNECTING
2126             ? "connection already in progress"
2127             : "already connected");
2128             }
2129              
2130 0           free_connect_strings(self);
2131              
2132 0           self->host = safe_strdup(host);
2133 0           self->user = safe_strdup(user);
2134 0           self->password = safe_strdup(password);
2135 0           self->database = safe_strdup(database);
2136 0           self->port = port;
2137 0           self->unix_socket = NULL;
2138              
2139 0 0         if (unix_socket_sv && SvOK(unix_socket_sv)) {
    0          
2140 0           self->unix_socket = safe_strdup(SvPV_nolen(unix_socket_sv));
2141             }
2142              
2143 0           start_connect(self);
2144             }
2145              
2146             void
2147             reset(EV::MariaDB self)
2148             CODE:
2149             {
2150 0 0         if (NULL == self->host) {
2151 0           croak("no previous connection to reset");
2152             }
2153              
2154 0           cleanup_connection(self);
2155 0           cancel_pending(self, "connection reset");
2156 0 0         if (check_destroyed(self)) return;
2157 0 0         if (self->state != STATE_IDLE) return;
2158              
2159 0           start_connect(self);
2160             }
2161              
2162             void
2163             finish(EV::MariaDB self)
2164             CODE:
2165             {
2166 0           cleanup_connection(self);
2167 0           cancel_pending(self, "connection finished");
2168 0 0         if (check_destroyed(self)) return;
2169             }
2170              
2171             SV*
2172             on_connect(EV::MariaDB self, SV *handler = NULL)
2173             CODE:
2174             {
2175 0           RETVAL = handler_accessor(&self->on_connect, handler, items > 1);
2176             }
2177             OUTPUT:
2178             RETVAL
2179              
2180             SV*
2181             on_error(EV::MariaDB self, SV *handler = NULL)
2182             CODE:
2183             {
2184 2           RETVAL = handler_accessor(&self->on_error, handler, items > 1);
2185             }
2186             OUTPUT:
2187             RETVAL
2188              
2189             void
2190             query(EV::MariaDB self, SV *sql_sv, SV *cb)
2191             CODE:
2192             {
2193             STRLEN sql_len;
2194             const char *sql;
2195              
2196 0 0         if (NULL == self->conn) {
2197 0           croak("not connected");
2198             }
2199 0 0         if (IS_FORKED(self)) {
    0          
2200 0           croak("connection not valid after fork");
2201             }
2202 0 0         if (self->state >= STATE_STMT_PREPARE) {
2203 0           croak("cannot queue query: exclusive operation in progress");
2204             }
2205 0 0         if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV)) {
    0          
2206 0           croak("callback must be a CODE reference");
2207             }
2208              
2209 0           sql = SvPV(sql_sv, sql_len);
2210 0           push_send(self, sql, sql_len, cb);
2211              
2212 0 0         if (self->state == STATE_IDLE && self->callback_depth == 0)
    0          
2213 0           pipeline_advance(self);
2214             }
2215              
2216             void
2217             prepare(EV::MariaDB self, SV *sql_sv, SV *cb)
2218             PREINIT:
2219             STRLEN sql_len;
2220             const char *sql;
2221             CODE:
2222             {
2223             int status;
2224             MYSQL_STMT *stmt;
2225              
2226 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2227              
2228 0           stmt = mysql_stmt_init(self->conn);
2229 0 0         if (NULL == stmt) {
2230 0           croak("mysql_stmt_init failed");
2231             }
2232             {
2233 0           my_bool update_max = 1;
2234 0           mysql_stmt_attr_set(stmt, STMT_ATTR_UPDATE_MAX_LENGTH, &update_max);
2235             }
2236              
2237 0           sql = SvPV(sql_sv, sql_len);
2238 0           push_cb(self, cb);
2239 0           self->op_stmt = stmt;
2240              
2241 0           self->state = STATE_STMT_PREPARE;
2242 0           status = mysql_stmt_prepare_start(&self->op_ret, stmt, sql, (unsigned long)sql_len);
2243 0 0         if (status == 0) {
2244 0           on_stmt_prepare_done(self);
2245             } else {
2246 0           update_watchers(self, status);
2247             }
2248             }
2249              
2250             void
2251             execute(EV::MariaDB self, IV stmt_iv, SV *params_ref, SV *cb)
2252             PREINIT:
2253             ev_mariadb_stmt_t *ctx;
2254             int status;
2255             CODE:
2256             {
2257 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2258              
2259 0           ctx = INT2PTR(ev_mariadb_stmt_t *, stmt_iv);
2260 0 0         CHECK_STMT(ctx);
    0          
2261              
2262 0 0         if (SvOK(params_ref)) {
2263 0 0         if (!SvROK(params_ref) || SvTYPE(SvRV(params_ref)) != SVt_PVAV)
    0          
2264 0           croak("params must be an ARRAY reference or undef");
2265 0           setup_bind_params(ctx, (AV *)SvRV(params_ref));
2266             }
2267              
2268 0           push_cb(self, cb);
2269 0           self->op_stmt = ctx->stmt;
2270 0           self->op_stmt_ctx = ctx;
2271              
2272 0           self->state = STATE_STMT_EXECUTE;
2273 0           status = mysql_stmt_execute_start(&self->op_ret, ctx->stmt);
2274 0 0         if (status == 0) {
2275 0           transition_to_stmt_store(self);
2276             } else {
2277 0           update_watchers(self, status);
2278             }
2279             }
2280              
2281             void
2282             bind_params(EV::MariaDB self, IV stmt_iv, SV *params_ref)
2283             PREINIT:
2284             ev_mariadb_stmt_t *ctx;
2285             CODE:
2286             {
2287 0 0         if (NULL == self->conn || self->state == STATE_CONNECTING)
    0          
2288 0           croak("not connected");
2289 0 0         if (IS_FORKED(self))
    0          
2290 0           croak("connection not valid after fork");
2291 0 0         if (self->state != STATE_IDLE)
2292 0           croak("another operation is in progress");
2293 0 0         if (!SvROK(params_ref) || SvTYPE(SvRV(params_ref)) != SVt_PVAV)
    0          
2294 0           croak("params must be an ARRAY reference");
2295              
2296 0           ctx = INT2PTR(ev_mariadb_stmt_t *, stmt_iv);
2297 0 0         CHECK_STMT(ctx);
    0          
2298 0           setup_bind_params(ctx, (AV *)SvRV(params_ref));
2299             }
2300              
2301             void
2302             close_stmt(EV::MariaDB self, IV stmt_iv, SV *cb)
2303             PREINIT:
2304             ev_mariadb_stmt_t *ctx;
2305             int status;
2306             CODE:
2307             {
2308 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2309              
2310 0           ctx = INT2PTR(ev_mariadb_stmt_t *, stmt_iv);
2311 0 0         if (ctx->closed) {
2312             /* already closed by cleanup_connection — just free the wrapper */
2313 0           unlink_stmt(self, ctx);
2314 0           Safefree(ctx);
2315 0           push_cb(self, cb);
2316 0 0         if (deliver_value(self, newSViv(1))) return;
2317 0           maybe_pipeline(self);
2318 0           return;
2319             }
2320 0           free_stmt_bind_params(ctx);
2321 0           push_cb(self, cb);
2322 0           self->op_stmt = ctx->stmt;
2323 0           self->op_stmt_ctx = ctx;
2324              
2325 0           self->state = STATE_STMT_CLOSE;
2326 0           status = mysql_stmt_close_start(&self->op_bool_ret, ctx->stmt);
2327 0 0         if (status == 0) {
2328 0           on_stmt_close_done(self);
2329             } else {
2330 0           update_watchers(self, status);
2331             }
2332             }
2333              
2334             void
2335             stmt_reset(EV::MariaDB self, IV stmt_iv, SV *cb)
2336             PREINIT:
2337             ev_mariadb_stmt_t *ctx;
2338             int status;
2339             CODE:
2340             {
2341 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2342              
2343 0           ctx = INT2PTR(ev_mariadb_stmt_t *, stmt_iv);
2344 0 0         CHECK_STMT(ctx);
    0          
2345 0           push_cb(self, cb);
2346 0           self->op_stmt = ctx->stmt;
2347              
2348 0           self->state = STATE_STMT_RESET;
2349 0           status = mysql_stmt_reset_start(&self->op_bool_ret, ctx->stmt);
2350 0 0         if (status == 0) {
2351 0           on_stmt_reset_done(self);
2352             } else {
2353 0           update_watchers(self, status);
2354             }
2355             }
2356              
2357             void
2358             ping(EV::MariaDB self, SV *cb)
2359             PREINIT:
2360             int status;
2361             CODE:
2362             {
2363 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2364              
2365 0           push_cb(self, cb);
2366              
2367 0           self->state = STATE_PING;
2368 0           status = mysql_ping_start(&self->op_ret, self->conn);
2369 0 0         if (status == 0) {
2370 0           on_utility_done(self, self->op_ret != 0);
2371             } else {
2372 0           update_watchers(self, status);
2373             }
2374             }
2375              
2376             void
2377             change_user(EV::MariaDB self, const char *user, const char *password, SV *db_sv, SV *cb)
2378             PREINIT:
2379             int status;
2380             const char *db;
2381             CODE:
2382             {
2383 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2384              
2385 0 0         db = (SvOK(db_sv)) ? SvPV_nolen(db_sv) : NULL;
2386              
2387             /* Update cached credentials so reset() reconnects with the new ones */
2388 0 0         if (self->user) Safefree(self->user);
2389 0           self->user = safe_strdup(user);
2390 0 0         if (self->password) Safefree(self->password);
2391 0           self->password = safe_strdup(password);
2392 0 0         if (db) {
2393 0 0         if (self->database) Safefree(self->database);
2394 0           self->database = safe_strdup(db);
2395             }
2396             /* if db is NULL (undef), keep self->database for reset() */
2397              
2398 0           push_cb(self, cb);
2399              
2400 0           self->state = STATE_CHANGE_USER;
2401 0           status = mysql_change_user_start(&self->op_bool_ret, self->conn, user, password, db);
2402 0 0         if (status == 0) {
2403 0           on_utility_done(self, self->op_bool_ret != 0);
2404             } else {
2405 0           update_watchers(self, status);
2406             }
2407             }
2408              
2409             void
2410             select_db(EV::MariaDB self, const char *db, SV *cb)
2411             PREINIT:
2412             int status;
2413             CODE:
2414             {
2415 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2416              
2417             /* Update cached database so reset() reconnects to the right one */
2418 0 0         if (self->database) Safefree(self->database);
2419 0           self->database = safe_strdup(db);
2420              
2421 0           push_cb(self, cb);
2422              
2423 0           self->state = STATE_SELECT_DB;
2424 0           status = mysql_select_db_start(&self->op_ret, self->conn, db);
2425 0 0         if (status == 0) {
2426 0           on_utility_done(self, self->op_ret != 0);
2427             } else {
2428 0           update_watchers(self, status);
2429             }
2430             }
2431              
2432             void
2433             reset_connection(EV::MariaDB self, SV *cb)
2434             PREINIT:
2435             int status;
2436             CODE:
2437             {
2438 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2439              
2440 0           push_cb(self, cb);
2441              
2442 0           self->state = STATE_RESET_CONNECTION;
2443 0           status = mysql_reset_connection_start(&self->op_ret, self->conn);
2444 0 0         if (status == 0) {
2445 0           on_utility_done(self, self->op_ret != 0);
2446             } else {
2447 0           update_watchers(self, status);
2448             }
2449             }
2450              
2451             void
2452             set_charset(EV::MariaDB self, const char *charset, SV *cb)
2453             PREINIT:
2454             int status;
2455             CODE:
2456             {
2457 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2458              
2459             /* Update cached charset so reset() reconnects with the right one */
2460 0 0         if (self->charset) Safefree(self->charset);
2461 0           self->charset = safe_strdup(charset);
2462              
2463 0           push_cb(self, cb);
2464              
2465 0           self->state = STATE_SET_CHARSET;
2466 0           status = mysql_set_character_set_start(&self->op_ret, self->conn, charset);
2467 0 0         if (status == 0) {
2468 0           on_utility_done(self, self->op_ret != 0);
2469             } else {
2470 0           update_watchers(self, status);
2471             }
2472             }
2473              
2474             void
2475             commit(EV::MariaDB self, SV *cb)
2476             PREINIT:
2477             int status;
2478             CODE:
2479             {
2480 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2481              
2482 0           push_cb(self, cb);
2483              
2484 0           self->state = STATE_COMMIT;
2485 0           status = mysql_commit_start(&self->op_bool_ret, self->conn);
2486 0 0         if (status == 0) {
2487 0           on_utility_done(self, self->op_bool_ret != 0);
2488             } else {
2489 0           update_watchers(self, status);
2490             }
2491             }
2492              
2493             void
2494             rollback(EV::MariaDB self, SV *cb)
2495             PREINIT:
2496             int status;
2497             CODE:
2498             {
2499 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2500              
2501 0           push_cb(self, cb);
2502              
2503 0           self->state = STATE_ROLLBACK;
2504 0           status = mysql_rollback_start(&self->op_bool_ret, self->conn);
2505 0 0         if (status == 0) {
2506 0           on_utility_done(self, self->op_bool_ret != 0);
2507             } else {
2508 0           update_watchers(self, status);
2509             }
2510             }
2511              
2512             void
2513             autocommit(EV::MariaDB self, int mode, SV *cb)
2514             PREINIT:
2515             int status;
2516             CODE:
2517             {
2518 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2519              
2520 0           push_cb(self, cb);
2521              
2522 0           self->state = STATE_AUTOCOMMIT;
2523 0           status = mysql_autocommit_start(&self->op_bool_ret, self->conn, (my_bool)(mode ? 1 : 0));
2524 0 0         if (status == 0) {
2525 0           on_utility_done(self, self->op_bool_ret != 0);
2526             } else {
2527 0           update_watchers(self, status);
2528             }
2529             }
2530              
2531             void
2532             query_stream(EV::MariaDB self, SV *sql_sv, SV *cb)
2533             PREINIT:
2534             STRLEN sql_len;
2535             const char *sql;
2536             int status;
2537             CODE:
2538             {
2539 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2540              
2541 0           sql = SvPV(sql_sv, sql_len);
2542 0           self->stream_cb = SvREFCNT_inc(cb);
2543 0           self->pending_count++;
2544              
2545 0           self->state = STATE_REAL_QUERY;
2546 0           status = mysql_real_query_start(&self->op_ret, self->conn, sql, (unsigned long)sql_len);
2547 0 0         if (status == 0) {
2548 0           on_real_query_done(self);
2549             } else {
2550 0           update_watchers(self, status);
2551             }
2552             }
2553              
2554             void
2555             close_async(EV::MariaDB self, SV *cb)
2556             PREINIT:
2557             int status;
2558             CODE:
2559             {
2560 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2561              
2562 0           push_cb(self, cb);
2563              
2564 0           self->state = STATE_CLOSE;
2565 0           status = mysql_close_start(self->conn);
2566 0 0         if (status == 0) {
2567 0           on_close_done(self);
2568             } else {
2569 0           update_watchers(self, status);
2570             }
2571             }
2572              
2573             void
2574             send_long_data(EV::MariaDB self, IV stmt_iv, unsigned int param_idx, SV *data_sv, SV *cb)
2575             PREINIT:
2576             ev_mariadb_stmt_t *ctx;
2577             STRLEN data_len;
2578             char *data;
2579             int status;
2580             CODE:
2581             {
2582 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2583              
2584 0           ctx = INT2PTR(ev_mariadb_stmt_t *, stmt_iv);
2585 0 0         CHECK_STMT(ctx);
    0          
2586             {
2587 0           const char *src = SvPV(data_sv, data_len);
2588 0 0         Newx(data, data_len > 0 ? data_len : 1, char);
2589 0           Copy(src, data, data_len, char);
2590             }
2591 0           push_cb(self, cb);
2592 0           self->op_stmt = ctx->stmt;
2593 0           self->op_data_ptr = data;
2594              
2595 0           self->state = STATE_STMT_SEND_LONG_DATA;
2596 0           status = mysql_stmt_send_long_data_start(&self->op_bool_ret, ctx->stmt,
2597             param_idx, data, (unsigned long)data_len);
2598 0 0         if (status == 0) {
2599 0           on_send_long_data_done(self);
2600             } else {
2601 0           update_watchers(self, status);
2602             }
2603             }
2604              
2605             void
2606             _set_option(EV::MariaDB self, const char *key, SV *value)
2607             CODE:
2608             {
2609 0 0         if (strcmp(key, "connect_timeout") == 0) {
2610 0           self->connect_timeout = SvUV(value);
2611 0 0         } else if (strcmp(key, "read_timeout") == 0) {
2612 0           self->read_timeout = SvUV(value);
2613 0 0         } else if (strcmp(key, "write_timeout") == 0) {
2614 0           self->write_timeout = SvUV(value);
2615 0 0         } else if (strcmp(key, "compress") == 0) {
2616 0           self->compress = SvTRUE(value) ? 1 : 0;
2617 0 0         } else if (strcmp(key, "multi_statements") == 0) {
2618 0           self->multi_statements = SvTRUE(value) ? 1 : 0;
2619 0 0         } else if (strcmp(key, "found_rows") == 0) {
2620 0           self->found_rows = SvTRUE(value) ? 1 : 0;
2621 0 0         } else if (strcmp(key, "charset") == 0) {
2622 0 0         SET_STR_OPTION(charset);
    0          
2623 0 0         } else if (strcmp(key, "init_command") == 0) {
2624 0 0         SET_STR_OPTION(init_command);
    0          
2625 0 0         } else if (strcmp(key, "ssl_key") == 0) {
2626 0 0         SET_STR_OPTION(ssl_key);
    0          
2627 0 0         } else if (strcmp(key, "ssl_cert") == 0) {
2628 0 0         SET_STR_OPTION(ssl_cert);
    0          
2629 0 0         } else if (strcmp(key, "ssl_ca") == 0) {
2630 0 0         SET_STR_OPTION(ssl_ca);
    0          
2631 0 0         } else if (strcmp(key, "ssl_capath") == 0) {
2632 0 0         SET_STR_OPTION(ssl_capath);
    0          
2633 0 0         } else if (strcmp(key, "ssl_cipher") == 0) {
2634 0 0         SET_STR_OPTION(ssl_cipher);
    0          
2635 0 0         } else if (strcmp(key, "ssl_verify_server_cert") == 0) {
2636 0           self->ssl_verify_server_cert = SvTRUE(value) ? 1 : 0;
2637 0 0         } else if (strcmp(key, "utf8") == 0) {
2638 0           self->utf8 = SvTRUE(value) ? 1 : 0;
2639             } else {
2640 0           croak("unknown option: %s", key);
2641             }
2642             }
2643              
2644             int
2645             is_connected(EV::MariaDB self)
2646             CODE:
2647             {
2648 1 50         RETVAL = (NULL != self->conn && self->state != STATE_CONNECTING) ? 1 : 0;
    0          
    50          
2649             }
2650             OUTPUT:
2651             RETVAL
2652              
2653             SV*
2654             error_message(EV::MariaDB self)
2655             CODE:
2656             {
2657 0 0         if (NULL != self->conn) {
2658 0           const char *msg = mysql_error(self->conn);
2659 0 0         RETVAL = (msg && msg[0]) ? newSVpv(msg, 0) : &PL_sv_undef;
    0          
2660             }
2661             else {
2662 0           RETVAL = &PL_sv_undef;
2663             }
2664             }
2665             OUTPUT:
2666             RETVAL
2667              
2668             unsigned int
2669             error_number(EV::MariaDB self)
2670             CODE:
2671             {
2672 0 0         RETVAL = (NULL != self->conn) ? mysql_errno(self->conn) : 0;
    0          
2673             }
2674             OUTPUT:
2675             RETVAL
2676              
2677             SV*
2678             sqlstate(EV::MariaDB self)
2679             CODE:
2680             {
2681 0 0         if (NULL != self->conn) {
2682 0           const char *s = mysql_sqlstate(self->conn);
2683 0 0         RETVAL = (s && s[0]) ? newSVpv(s, 0) : &PL_sv_undef;
    0          
2684             }
2685             else {
2686 0           RETVAL = &PL_sv_undef;
2687             }
2688             }
2689             OUTPUT:
2690             RETVAL
2691              
2692             SV*
2693             insert_id(EV::MariaDB self)
2694             CODE:
2695             {
2696 0 0         if (NULL != self->conn) {
2697 0           my_ulonglong id = mysql_insert_id(self->conn);
2698 0           RETVAL = newSVuv((UV)id);
2699             }
2700             else {
2701 0           RETVAL = &PL_sv_undef;
2702             }
2703             }
2704             OUTPUT:
2705             RETVAL
2706              
2707             unsigned int
2708             warning_count(EV::MariaDB self)
2709             CODE:
2710             {
2711 0 0         RETVAL = (NULL != self->conn) ? mysql_warning_count(self->conn) : 0;
    0          
2712             }
2713             OUTPUT:
2714             RETVAL
2715              
2716             SV*
2717             info(EV::MariaDB self)
2718             CODE:
2719             {
2720 0 0         if (NULL != self->conn) {
2721 0           const char *i = mysql_info(self->conn);
2722 0 0         RETVAL = (i && i[0]) ? newSVpv(i, 0) : &PL_sv_undef;
    0          
2723             }
2724             else {
2725 0           RETVAL = &PL_sv_undef;
2726             }
2727             }
2728             OUTPUT:
2729             RETVAL
2730              
2731             UV
2732             server_version(EV::MariaDB self)
2733             CODE:
2734             {
2735 0 0         RETVAL = (NULL != self->conn) ? (UV)mysql_get_server_version(self->conn) : 0;
    0          
2736             }
2737             OUTPUT:
2738             RETVAL
2739              
2740             SV*
2741             server_info(EV::MariaDB self)
2742             CODE:
2743             {
2744 0 0         if (NULL != self->conn) {
2745 0           const char *info = mysql_get_server_info(self->conn);
2746 0 0         RETVAL = (info && info[0]) ? newSVpv(info, 0) : &PL_sv_undef;
    0          
2747             }
2748             else {
2749 0           RETVAL = &PL_sv_undef;
2750             }
2751             }
2752             OUTPUT:
2753             RETVAL
2754              
2755             UV
2756             thread_id(EV::MariaDB self)
2757             CODE:
2758             {
2759 0 0         RETVAL = (NULL != self->conn) ? (UV)mysql_thread_id(self->conn) : 0;
    0          
2760             }
2761             OUTPUT:
2762             RETVAL
2763              
2764             SV*
2765             host_info(EV::MariaDB self)
2766             CODE:
2767             {
2768 0 0         if (NULL != self->conn) {
2769 0           const char *info = mysql_get_host_info(self->conn);
2770 0 0         RETVAL = (info && info[0]) ? newSVpv(info, 0) : &PL_sv_undef;
    0          
2771             }
2772             else {
2773 0           RETVAL = &PL_sv_undef;
2774             }
2775             }
2776             OUTPUT:
2777             RETVAL
2778              
2779             SV*
2780             character_set_name(EV::MariaDB self)
2781             CODE:
2782             {
2783 0 0         if (NULL != self->conn) {
2784 0           const char *cs = mysql_character_set_name(self->conn);
2785 0 0         RETVAL = (cs && cs[0]) ? newSVpv(cs, 0) : &PL_sv_undef;
    0          
2786             }
2787             else {
2788 0           RETVAL = &PL_sv_undef;
2789             }
2790             }
2791             OUTPUT:
2792             RETVAL
2793              
2794             int
2795             socket(EV::MariaDB self)
2796             CODE:
2797             {
2798 0 0         RETVAL = (NULL != self->conn) ? (int)mysql_get_socket(self->conn) : -1;
    0          
2799             }
2800             OUTPUT:
2801             RETVAL
2802              
2803             SV*
2804             affected_rows(EV::MariaDB self)
2805             CODE:
2806             {
2807 0 0         if (NULL != self->conn) {
2808 0           my_ulonglong rows = mysql_affected_rows(self->conn);
2809 0 0         RETVAL = (rows == (my_ulonglong)-1) ? &PL_sv_undef : newSVuv((UV)rows);
2810             }
2811             else {
2812 0           RETVAL = &PL_sv_undef;
2813             }
2814             }
2815             OUTPUT:
2816             RETVAL
2817              
2818             SV*
2819             escape(EV::MariaDB self, SV *str)
2820             PREINIT:
2821             STRLEN len;
2822             const char *s;
2823             unsigned long elen;
2824             CODE:
2825             {
2826 0 0         if (NULL == self->conn || self->state == STATE_CONNECTING
    0          
2827 0 0         || self->state == STATE_CLOSE
2828 0 0         || IS_FORKED(self)) {
    0          
2829 0           croak("not connected");
2830             }
2831 0           s = SvPV(str, len);
2832 0 0         if (SvUTF8(str)) {
2833 0           const char *cs = mysql_character_set_name(self->conn);
2834 0 0         if (!cs || (strncmp(cs, "utf8", 4) != 0)) {
    0          
2835 0 0         warn("EV::MariaDB: escaping a UTF-8 string on a non-utf8 connection (%s) may cause corruption or injection vulnerabilities", cs ? cs : "unknown");
2836             }
2837             }
2838 0           RETVAL = newSV(len * 2 + 1);
2839 0           SvPOK_on(RETVAL);
2840 0           elen = mysql_real_escape_string(self->conn, SvPVX(RETVAL), s, (unsigned long)len);
2841 0 0         if (elen == (unsigned long)-1) {
2842 0           SvREFCNT_dec(RETVAL);
2843 0           croak("mysql_real_escape_string failed");
2844             }
2845 0           SvCUR_set(RETVAL, elen);
2846 0           *SvEND(RETVAL) = '\0';
2847             }
2848             OUTPUT:
2849             RETVAL
2850              
2851             int
2852             pending_count(EV::MariaDB self)
2853             CODE:
2854             {
2855 0 0         RETVAL = self->pending_count;
2856             }
2857             OUTPUT:
2858             RETVAL
2859              
2860             void
2861             skip_pending(EV::MariaDB self)
2862             CODE:
2863             {
2864 0 0         if (self->state != STATE_IDLE || self->send_count > 0) {
    0          
2865 0           cleanup_connection(self);
2866             }
2867 0           cancel_pending(self, "skipped");
2868 0 0         if (check_destroyed(self)) return;
2869             }
2870              
2871             UV
2872             lib_version(char *class)
2873             CODE:
2874             {
2875             PERL_UNUSED_VAR(class);
2876 1           RETVAL = (UV)mysql_get_client_version();
2877             }
2878             OUTPUT:
2879             RETVAL
2880              
2881             SV*
2882             lib_info(char *class)
2883             CODE:
2884             {
2885             PERL_UNUSED_VAR(class);
2886 0           RETVAL = newSVpv(mysql_get_client_info(), 0);
2887             }
2888             OUTPUT:
2889             RETVAL