File Coverage

MariaDB.xs
Criterion Covered Total %
statement 104 1561 6.6
branch 37 1196 3.0
condition n/a
subroutine n/a
pod n/a
total 141 2757 5.1


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4              
5             #include "EVAPI.h"
6             #include
7              
8             #include "ngx_queue.h"
9              
10             typedef struct ev_mariadb_s ev_mariadb_t;
11             typedef struct ev_mariadb_cb_s ev_mariadb_cb_t;
12             typedef struct ev_mariadb_send_s ev_mariadb_send_t;
13             typedef struct ev_mariadb_stmt_s ev_mariadb_stmt_t;
14              
15             typedef ev_mariadb_t* EV__MariaDB;
16             typedef struct ev_loop* EV__Loop;
17              
18             #define EV_MARIADB_MAGIC 0xDEADBEEF
19             #define EV_MARIADB_FREED 0xFEEDFACE
20              
21             enum ev_mariadb_state {
22             STATE_IDLE,
23             STATE_CONNECTING,
24             STATE_SEND,
25             STATE_READ_RESULT,
26             STATE_STORE_RESULT,
27             STATE_NEXT_RESULT,
28             STATE_PING,
29             STATE_CHANGE_USER,
30             STATE_SELECT_DB,
31             STATE_RESET_CONNECTION,
32             STATE_SET_CHARSET,
33             STATE_COMMIT,
34             STATE_ROLLBACK,
35             STATE_AUTOCOMMIT,
36             STATE_STMT_PREPARE, /* boundary: states >= here block query queueing */
37             STATE_STMT_EXECUTE,
38             STATE_STMT_STORE,
39             STATE_STMT_CLOSE,
40             STATE_STMT_RESET,
41             STATE_STMT_SEND_LONG_DATA,
42             STATE_REAL_QUERY,
43             STATE_STREAM_FETCH,
44             STATE_CLOSE,
45             };
46              
47             struct ev_mariadb_s {
48             unsigned int magic;
49             struct ev_loop *loop;
50             MYSQL *conn;
51              
52             ev_io rio, wio;
53             ev_timer timer;
54             int reading, writing, timing;
55             int fd;
56              
57             enum ev_mariadb_state state;
58             char *host, *user, *password, *database, *unix_socket;
59             unsigned int port;
60              
61             ngx_queue_t cb_queue; /* callbacks for sent queries (awaiting results) */
62             ngx_queue_t send_queue; /* queries waiting to be sent */
63             int pending_count; /* total: send_queue + cb_queue */
64             int send_count; /* queries sent, results not yet read */
65             int draining; /* draining multi-result extras */
66              
67             /* current operation context */
68             int op_ret;
69             MYSQL_RES *op_result;
70             MYSQL_STMT *op_stmt;
71             ev_mariadb_stmt_t *op_stmt_ctx; /* per-stmt wrapper for bind_params cleanup */
72             ev_mariadb_stmt_t *stmt_list; /* all allocated stmt wrappers */
73             MYSQL *op_conn_ret;
74             my_bool op_bool_ret;
75             MYSQL_ROW op_row; /* for streaming fetch_row result */
76             SV *stream_cb; /* streaming per-row callback */
77             char *op_data_ptr; /* copied data buffer for send_long_data */
78              
79             SV *on_connect;
80             SV *on_error;
81              
82             int callback_depth;
83             pid_t connect_pid; /* PID at connect time, for fork detection */
84              
85             /* connection options (applied before mysql_real_connect_start) */
86             unsigned int connect_timeout;
87             unsigned int read_timeout;
88             unsigned int write_timeout;
89             int compress;
90             int multi_statements;
91             int found_rows;
92             char *charset;
93             char *init_command;
94             char *ssl_key;
95             char *ssl_cert;
96             char *ssl_ca;
97             char *ssl_capath;
98             char *ssl_cipher;
99             int ssl_verify_server_cert;
100             int utf8; /* auto-flag result strings as UTF-8 */
101             unsigned long client_flags;
102              
103             /* Pending cache values for in-flight change_user/select_db/set_charset.
104             Committed by on_utility_done on success, freed on failure. The strings
105             are also passed to the libmariadb start function so they remain valid
106             across the async op. */
107             char *pending_user;
108             char *pending_password;
109             char *pending_database;
110             char *pending_charset;
111             };
112              
113             struct ev_mariadb_cb_s {
114             SV *cb;
115             ngx_queue_t queue;
116             };
117              
118             struct ev_mariadb_send_s {
119             char *sql;
120             unsigned long sql_len;
121             SV *cb;
122             ngx_queue_t queue;
123             };
124              
125             struct ev_mariadb_stmt_s {
126             MYSQL_STMT *stmt;
127             MYSQL_BIND *bind_params;
128             int bind_param_count;
129             int closed; /* invalidated by cleanup_connection */
130             ev_mariadb_stmt_t *next; /* linked list of all stmts on this connection */
131             };
132              
133             #define MAX_PIPELINE_DEPTH 64
134             #define MAX_FREELIST_DEPTH 64
135              
136             #define COPY_ERROR(buf, src) do { \
137             strncpy((buf), (src), sizeof(buf) - 1); \
138             (buf)[sizeof(buf) - 1] = '\0'; \
139             } while (0)
140              
141             #define SET_STR_OPTION(field) do { \
142             if (self->field) Safefree(self->field); \
143             self->field = SvOK(value) ? safe_strdup(SvPV_nolen(value)) : NULL; \
144             } while (0)
145              
146             #define IS_FORKED(self) \
147             ((self)->connect_pid != 0 && (self)->connect_pid != getpid())
148              
149             /* Safefree is NULL-safe; the NULL assignment matters for callers where
150             self survives the call (cleanup_connection, reconnect via reset()). */
151             #define FREE_STR(p) do { Safefree(p); (p) = NULL; } while (0)
152              
153             #define CHECK_READY(self, cb) \
154             do { \
155             if (NULL == (self)->conn || (self)->state == STATE_CONNECTING) \
156             croak("not connected"); \
157             if (IS_FORKED(self)) \
158             croak("connection not valid after fork"); \
159             if ((self)->state != STATE_IDLE) \
160             croak("another operation is in progress"); \
161             if ((self)->send_count > 0) \
162             croak("cannot start operation while pipeline results are pending"); \
163             if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV)) \
164             croak("callback must be a CODE reference"); \
165             } while (0)
166              
167             #define CHECK_STMT(ctx) \
168             do { \
169             if (!(ctx)) \
170             croak("invalid statement handle"); \
171             if ((ctx)->closed) \
172             croak("statement handle is no longer valid (connection was reset)"); \
173             } while (0)
174              
175             static void io_cb(EV_P_ ev_io *w, int revents);
176             static void timer_cb(EV_P_ ev_timer *w, int revents);
177             static void continue_operation(ev_mariadb_t *self, int events);
178             static void pipeline_advance(ev_mariadb_t *self);
179             static void on_next_result_done(ev_mariadb_t *self);
180             static void start_reading(ev_mariadb_t *self);
181             static void stop_reading(ev_mariadb_t *self);
182             static void start_writing(ev_mariadb_t *self);
183             static void stop_writing(ev_mariadb_t *self);
184             static void free_stmt_bind_params(ev_mariadb_stmt_t *ctx);
185             static int is_utf8_charset(unsigned int charsetnr);
186             static void start_timer(ev_mariadb_t *self);
187             static void stop_timer(ev_mariadb_t *self);
188             static void update_watchers(ev_mariadb_t *self, int status);
189             static void emit_error(ev_mariadb_t *self, const char *msg);
190             static void cleanup_connection(ev_mariadb_t *self);
191             static int check_destroyed(ev_mariadb_t *self);
192             static void drain_multi_result(ev_mariadb_t *self);
193             static AV* build_field_names(MYSQL *conn, MYSQL_FIELD *fields, unsigned int ncols);
194             static void on_real_query_done(ev_mariadb_t *self);
195             static void on_stream_fetch_done(ev_mariadb_t *self);
196             static void on_close_done(ev_mariadb_t *self);
197              
198 0           static void maybe_pipeline(ev_mariadb_t *self) {
199 0 0         if (self->state != STATE_IDLE) return;
200 0 0         if (!ngx_queue_empty(&self->send_queue)) {
201 0           pipeline_advance(self);
202 0           return;
203             }
204 0           stop_reading(self);
205 0           stop_writing(self);
206 0           stop_timer(self);
207             }
208              
209             /* --- freelist for cb_queue entries --- */
210              
211             static ev_mariadb_cb_t *cbt_freelist = NULL;
212             static int cbt_freelist_size = 0;
213              
214 0           static ev_mariadb_cb_t* alloc_cbt(void) {
215             ev_mariadb_cb_t *cbt;
216 0 0         if (cbt_freelist) {
217 0           cbt = cbt_freelist;
218 0           cbt_freelist = *(ev_mariadb_cb_t **)cbt;
219 0           cbt_freelist_size--;
220             } else {
221 0           Newx(cbt, 1, ev_mariadb_cb_t);
222             }
223 0           return cbt;
224             }
225              
226 0           static void release_cbt(ev_mariadb_cb_t *cbt) {
227 0 0         if (cbt_freelist_size >= MAX_FREELIST_DEPTH) {
228 0           Safefree(cbt);
229 0           return;
230             }
231 0           *(ev_mariadb_cb_t **)cbt = cbt_freelist;
232 0           cbt_freelist = cbt;
233 0           cbt_freelist_size++;
234             }
235              
236             /* --- freelist for send_queue entries --- */
237              
238             static ev_mariadb_send_t *send_freelist = NULL;
239             static int send_freelist_size = 0;
240              
241 0           static ev_mariadb_send_t* alloc_send(void) {
242             ev_mariadb_send_t *s;
243 0 0         if (send_freelist) {
244 0           s = send_freelist;
245 0           send_freelist = *(ev_mariadb_send_t **)s;
246 0           send_freelist_size--;
247             } else {
248 0           Newx(s, 1, ev_mariadb_send_t);
249             }
250 0           return s;
251             }
252              
253 0           static void release_send(ev_mariadb_send_t *s) {
254 0 0         if (send_freelist_size >= MAX_FREELIST_DEPTH) {
255 0           Safefree(s);
256 0           return;
257             }
258 0           *(ev_mariadb_send_t **)s = send_freelist;
259 0           send_freelist = s;
260 0           send_freelist_size++;
261             }
262              
263             /* Drain freelists at interpreter shutdown so Valgrind reports zero leaks. */
264 15           static void drain_freelists(pTHX_ void *unused) {
265             PERL_UNUSED_VAR(unused);
266 15 50         while (cbt_freelist) {
267 0           ev_mariadb_cb_t *next = *(ev_mariadb_cb_t **)cbt_freelist;
268 0           Safefree(cbt_freelist);
269 0           cbt_freelist = next;
270             }
271 15           cbt_freelist_size = 0;
272 15 50         while (send_freelist) {
273 0           ev_mariadb_send_t *next = *(ev_mariadb_send_t **)send_freelist;
274 0           Safefree(send_freelist);
275 0           send_freelist = next;
276             }
277 15           send_freelist_size = 0;
278 15           }
279              
280 0           static void push_send(ev_mariadb_t *self, const char *sql, STRLEN sql_len, SV *cb) {
281             char *sql_copy;
282             ev_mariadb_send_t *s;
283 0           Newx(sql_copy, sql_len + 1, char);
284 0           Copy(sql, sql_copy, sql_len + 1, char);
285              
286 0           s = alloc_send();
287 0           s->sql = sql_copy;
288 0           s->sql_len = (unsigned long)sql_len;
289 0           s->cb = SvREFCNT_inc(cb);
290 0           ngx_queue_insert_tail(&self->send_queue, &s->queue);
291 0           self->pending_count++;
292 0           }
293              
294 2           static void drain_queues_silent(ev_mariadb_t *self) {
295 2 50         while (!ngx_queue_empty(&self->send_queue)) {
296 0           ngx_queue_t *q = ngx_queue_head(&self->send_queue);
297 0           ev_mariadb_send_t *send = ngx_queue_data(q, ev_mariadb_send_t, queue);
298 0           ngx_queue_remove(q);
299 0           Safefree(send->sql);
300 0           SvREFCNT_dec(send->cb);
301 0           release_send(send);
302 0           self->pending_count--;
303             }
304 2 50         while (!ngx_queue_empty(&self->cb_queue)) {
305 0           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
306 0           ev_mariadb_cb_t *cbt = ngx_queue_data(q, ev_mariadb_cb_t, queue);
307 0           ngx_queue_remove(q);
308 0           SvREFCNT_dec(cbt->cb);
309 0           release_cbt(cbt);
310 0           self->pending_count--;
311             }
312 2           }
313              
314             /* --- watcher helpers --- */
315              
316 0           static void start_reading(ev_mariadb_t *self) {
317 0 0         if (!self->reading && self->fd >= 0) {
    0          
318 0           ev_io_start(self->loop, &self->rio);
319 0           self->reading = 1;
320             }
321 0           }
322              
323 2           static void stop_reading(ev_mariadb_t *self) {
324 2 50         if (self->reading) {
325 0           ev_io_stop(self->loop, &self->rio);
326 0           self->reading = 0;
327             }
328 2           }
329              
330 0           static void start_writing(ev_mariadb_t *self) {
331 0 0         if (!self->writing && self->fd >= 0) {
    0          
332 0           ev_io_start(self->loop, &self->wio);
333 0           self->writing = 1;
334             }
335 0           }
336              
337 2           static void stop_writing(ev_mariadb_t *self) {
338 2 50         if (self->writing) {
339 0           ev_io_stop(self->loop, &self->wio);
340 0           self->writing = 0;
341             }
342 2           }
343              
344 0           static void start_timer(ev_mariadb_t *self) {
345             /* libmariadb may report a fresh timeout on each _cont; always re-arm.
346             ms == 0 means "fire as soon as possible" — use 1ms so the timer fires
347             on the next loop iteration rather than stalling. */
348 0           unsigned int ms = mysql_get_timeout_value_ms(self->conn);
349 0 0         if (ms == 0) ms = 1;
350 0 0         if (self->timing) ev_timer_stop(self->loop, &self->timer);
351 0           ev_timer_set(&self->timer, ms / 1000.0, 0.0);
352 0           ev_timer_start(self->loop, &self->timer);
353 0           self->timing = 1;
354 0           }
355              
356 2           static void stop_timer(ev_mariadb_t *self) {
357 2 50         if (self->timing) {
358 0           ev_timer_stop(self->loop, &self->timer);
359 0           self->timing = 0;
360             }
361 2           }
362              
363 0           static void update_watchers(ev_mariadb_t *self, int status) {
364 0 0         if (status & (MYSQL_WAIT_READ | MYSQL_WAIT_EXCEPT)) start_reading(self); else stop_reading(self);
365 0 0         if (status & MYSQL_WAIT_WRITE) start_writing(self); else stop_writing(self);
366 0 0         if (status & MYSQL_WAIT_TIMEOUT) start_timer(self); else stop_timer(self);
367 0           }
368              
369 0           static void init_io_watchers(ev_mariadb_t *self) {
370 0           ev_io_init(&self->rio, io_cb, self->fd, EV_READ);
371 0           self->rio.data = (void *)self;
372 0           ev_io_init(&self->wio, io_cb, self->fd, EV_WRITE);
373 0           self->wio.data = (void *)self;
374 0           }
375              
376 0           static int check_destroyed(ev_mariadb_t *self) {
377 0 0         if (self->magic == EV_MARIADB_FREED &&
378 0 0         self->callback_depth == 0) {
379 0           Safefree(self);
380 0           return 1;
381             }
382 0           return 0;
383             }
384              
385 0           static void emit_error(ev_mariadb_t *self, const char *msg) {
386             SV *cb;
387 0 0         if (NULL == self->on_error) return;
388 0           cb = sv_2mortal(SvREFCNT_inc_simple_NN(self->on_error));
389              
390 0           dSP;
391 0           ENTER;
392 0           SAVETMPS;
393 0 0         PUSHMARK(SP);
394 0 0         XPUSHs(sv_2mortal(newSVpv(msg, 0)));
395 0           PUTBACK;
396              
397 0           call_sv(cb, G_DISCARD | G_EVAL);
398 0 0         if (SvTRUE(ERRSV)) {
    0          
399 0 0         warn("EV::MariaDB: exception in error handler: %s", SvPV_nolen(ERRSV));
400 0 0         sv_setpvn(ERRSV, "", 0);
401             }
402              
403 0 0         FREETMPS;
404 0           LEAVE;
405             }
406              
407             /* Pop and return the head callback from cb_queue. Caller must SvREFCNT_dec. */
408 0           static SV* pop_cb(ev_mariadb_t *self) {
409             ngx_queue_t *q;
410             ev_mariadb_cb_t *cbt;
411             SV *cb;
412              
413 0 0         if (ngx_queue_empty(&self->cb_queue)) return NULL;
414              
415 0           q = ngx_queue_head(&self->cb_queue);
416 0           cbt = ngx_queue_data(q, ev_mariadb_cb_t, queue);
417              
418 0           cb = cbt->cb;
419 0           ngx_queue_remove(q);
420 0           self->pending_count--;
421 0           release_cbt(cbt);
422              
423 0           return cb;
424             }
425              
426             /* Invoke a callback with (undef, errmsg). Decrements cb refcount. */
427 0           static void invoke_error_cb(SV *cb, const char *errmsg) {
428 0           dSP;
429 0           ENTER;
430 0           SAVETMPS;
431 0 0         PUSHMARK(SP);
432 0           PUSHs(&PL_sv_undef);
433 0           PUSHs(sv_2mortal(newSVpv(errmsg, 0)));
434 0           PUTBACK;
435 0           call_sv(cb, G_DISCARD | G_EVAL);
436 0 0         if (SvTRUE(ERRSV)) {
    0          
437 0 0         warn("EV::MariaDB: exception in callback: %s", SvPV_nolen(ERRSV));
438 0 0         sv_setpvn(ERRSV, "", 0);
439             }
440 0           SvREFCNT_dec(cb);
441 0 0         FREETMPS;
442 0           LEAVE;
443 0           }
444              
445             /* Invoke a callback with (undef) — EOF signal. Decrements cb refcount. */
446 0           static void invoke_eof_cb(SV *cb) {
447 0           dSP;
448 0           ENTER;
449 0           SAVETMPS;
450 0 0         PUSHMARK(SP);
451 0           PUSHs(&PL_sv_undef);
452 0           PUTBACK;
453 0           call_sv(cb, G_DISCARD | G_EVAL);
454 0 0         if (SvTRUE(ERRSV)) {
    0          
455 0 0         warn("EV::MariaDB: exception in stream callback: %s", SvPV_nolen(ERRSV));
456 0 0         sv_setpvn(ERRSV, "", 0);
457             }
458 0           SvREFCNT_dec(cb);
459 0 0         FREETMPS;
460 0           LEAVE;
461 0           }
462              
463             /* Invoke a callback SV with args already on the stack. Decrements refcount. */
464 0           static void invoke_cb(SV *cb) {
465 0           call_sv(cb, G_DISCARD | G_EVAL);
466 0 0         if (SvTRUE(ERRSV)) {
    0          
467 0 0         warn("EV::MariaDB: exception in callback: %s", SvPV_nolen(ERRSV));
468 0 0         sv_setpvn(ERRSV, "", 0);
469             }
470 0           SvREFCNT_dec(cb);
471 0           }
472              
473             /* Returns 1 if self was freed (caller must not touch self). */
474 0           static int deliver_result(ev_mariadb_t *self) {
475 0           MYSQL_RES *res = self->op_result;
476 0           SV *cb = pop_cb(self);
477              
478 0 0         if (cb == NULL) {
479 0 0         if (res) {
480 0           mysql_free_result(res);
481 0           self->op_result = NULL;
482             }
483 0           return 0;
484             }
485              
486 0           self->callback_depth++;
487              
488             {
489 0           dSP;
490 0           ENTER;
491 0           SAVETMPS;
492 0 0         PUSHMARK(SP);
493              
494 0 0         if (res == NULL && mysql_field_count(self->conn) > 0) {
    0          
495 0           PUSHs(&PL_sv_undef);
496 0           PUSHs(sv_2mortal(newSVpv(mysql_error(self->conn), 0)));
497             }
498 0 0         else if (res != NULL) {
499 0           my_ulonglong nrows = mysql_num_rows(res);
500 0           unsigned int ncols = mysql_num_fields(res);
501 0           MYSQL_FIELD *fields = mysql_fetch_fields(res);
502 0           AV *rows = newAV();
503             AV *fnames;
504             MYSQL_ROW row;
505             unsigned long *lengths;
506              
507 0 0         if (nrows > 0) av_extend(rows, (SSize_t)(nrows < (my_ulonglong)SSize_t_MAX ? nrows - 1 : SSize_t_MAX));
    0          
508 0 0         while ((row = mysql_fetch_row(res)) != NULL) {
509 0           AV *r = newAV();
510             unsigned int c;
511 0           lengths = mysql_fetch_lengths(res);
512 0 0         if (ncols > 0) av_extend(r, ncols - 1);
513 0 0         for (c = 0; c < ncols; c++) {
514 0 0         if (row[c] == NULL) {
515 0           av_push(r, newSV(0));
516             } else {
517 0           SV *val = newSVpvn(row[c], lengths[c]);
518 0 0         if (self->utf8 && is_utf8_charset(fields[c].charsetnr))
    0          
519 0           SvUTF8_on(val);
520 0           av_push(r, val);
521             }
522             }
523 0           av_push(rows, newRV_noinc((SV*)r));
524             }
525             /* build field names before freeing result */
526 0           fnames = build_field_names(self->conn, fields, ncols);
527 0           mysql_free_result(res);
528 0           self->op_result = NULL;
529 0           PUSHs(sv_2mortal(newRV_noinc((SV*)rows)));
530 0           PUSHs(&PL_sv_undef);
531 0           PUSHs(sv_2mortal(newRV_noinc((SV*)fnames)));
532             }
533             else {
534 0           my_ulonglong affected = mysql_affected_rows(self->conn);
535 0           PUSHs(sv_2mortal(newSVuv((UV)affected)));
536             }
537              
538 0           PUTBACK;
539 0           invoke_cb(cb);
540 0 0         FREETMPS;
541 0           LEAVE;
542             }
543              
544 0           self->callback_depth--;
545 0           return check_destroyed(self);
546             }
547              
548             /* Returns 1 if self was freed. */
549 0           static int deliver_error(ev_mariadb_t *self, const char *errmsg) {
550 0           SV *cb = pop_cb(self);
551 0 0         if (cb == NULL) return 0;
552              
553 0           self->callback_depth++;
554 0           invoke_error_cb(cb, errmsg);
555 0           self->callback_depth--;
556 0           return check_destroyed(self);
557             }
558              
559             /* Returns 1 if self was freed. Caller passes a non-mortal SV; ownership is
560             transferred — deliver_value will sv_2mortal inside its SAVETMPS scope so
561             FREETMPS correctly frees it (avoids mortal leak when sv_2mortal is called
562             before SAVETMPS). */
563 0           static int deliver_value(ev_mariadb_t *self, SV *val) {
564 0           SV *cb = pop_cb(self);
565 0 0         if (cb == NULL) {
566 0           SvREFCNT_dec(val);
567 0           return 0;
568             }
569              
570 0           self->callback_depth++;
571              
572             {
573 0           dSP;
574 0           ENTER;
575 0           SAVETMPS;
576 0 0         PUSHMARK(SP);
577 0           PUSHs(sv_2mortal(val));
578 0           PUTBACK;
579 0           invoke_cb(cb);
580 0 0         FREETMPS;
581 0           LEAVE;
582             }
583              
584 0           self->callback_depth--;
585 0           return check_destroyed(self);
586             }
587              
588 0           static void cleanup_connection(ev_mariadb_t *self) {
589 0           int saved_fd = self->fd;
590 0 0         int is_fork = IS_FORKED(self);
    0          
591              
592 0           stop_reading(self);
593 0           stop_writing(self);
594 0           stop_timer(self);
595 0           self->fd = -1;
596 0           self->state = STATE_IDLE;
597 0           self->send_count = 0;
598 0           self->draining = 0;
599              
600 0 0         if (self->op_data_ptr) {
601 0           Safefree(self->op_data_ptr);
602 0           self->op_data_ptr = NULL;
603             }
604              
605             /* Discard any pending utility-op cache update — the op was cancelled. */
606 0           FREE_STR(self->pending_user);
607 0           FREE_STR(self->pending_password);
608 0           FREE_STR(self->pending_database);
609 0           FREE_STR(self->pending_charset);
610              
611 0           self->op_stmt_ctx = NULL;
612              
613             /* Invalidate all tracked stmt wrappers (don't free — user may hold handles) */
614             {
615 0           ev_mariadb_stmt_t *ctx = self->stmt_list;
616 0 0         while (ctx) {
617 0           free_stmt_bind_params(ctx);
618 0 0         if (!is_fork && ctx->stmt) {
    0          
619 0 0         if (ctx->stmt == self->op_stmt)
620 0           self->op_stmt = NULL; /* avoid double-close below */
621 0           mysql_stmt_close(ctx->stmt);
622             }
623 0           ctx->stmt = NULL;
624 0           ctx->closed = 1;
625 0           ctx = ctx->next;
626             }
627             /* keep stmt_list linked for close_stmt to unlink+free later */
628             }
629              
630 0 0         if (is_fork) {
631 0           self->op_result = NULL;
632 0           self->op_stmt = NULL;
633 0           self->conn = NULL;
634             } else {
635 0 0         if (self->op_result && saved_fd >= 0)
    0          
636 0           shutdown(saved_fd, SHUT_RDWR);
637             /* Close op_stmt if not tracked in stmt_list (e.g., in-flight prepare) */
638 0 0         if (self->op_stmt) {
639 0           mysql_stmt_close(self->op_stmt);
640             }
641 0           self->op_stmt = NULL;
642 0 0         if (self->op_result) {
643 0           mysql_free_result(self->op_result);
644 0           self->op_result = NULL;
645             }
646 0 0         if (self->conn) {
647 0           MYSQL *conn = self->conn;
648 0           self->conn = NULL;
649 0           mysql_close(conn);
650             }
651             }
652 0           }
653              
654 2           static void cancel_pending(ev_mariadb_t *self, const char *errmsg) {
655             ngx_queue_t local_send;
656             ngx_queue_t local_cb;
657              
658 2           ngx_queue_init(&local_send);
659 2           ngx_queue_init(&local_cb);
660              
661 2 50         if (!ngx_queue_empty(&self->send_queue)) {
662 0           ngx_queue_add(&local_send, &self->send_queue);
663 0           ngx_queue_init(&self->send_queue);
664             }
665              
666 2 50         if (!ngx_queue_empty(&self->cb_queue)) {
667 0           ngx_queue_add(&local_cb, &self->cb_queue);
668 0           ngx_queue_init(&self->cb_queue);
669             }
670              
671 2           self->send_count = 0;
672 2           self->callback_depth++;
673              
674             /* cancel active stream if any */
675 2 50         if (self->stream_cb) {
676 0           SV *cb = self->stream_cb;
677 0           self->stream_cb = NULL;
678 0           self->pending_count--;
679 0           invoke_error_cb(cb, errmsg);
680             }
681              
682             /* cancel unsent queries */
683 2 50         while (!ngx_queue_empty(&local_send)) {
684 0           ngx_queue_t *q = ngx_queue_head(&local_send);
685 0           ev_mariadb_send_t *send = ngx_queue_data(q, ev_mariadb_send_t, queue);
686 0           SV *cb = send->cb;
687 0           ngx_queue_remove(q);
688 0           Safefree(send->sql);
689 0           release_send(send);
690 0           self->pending_count--;
691              
692 0           invoke_error_cb(cb, errmsg);
693             }
694              
695             /* cancel sent-but-not-received */
696 2 50         while (!ngx_queue_empty(&local_cb)) {
697 0           ngx_queue_t *q = ngx_queue_head(&local_cb);
698 0           ev_mariadb_cb_t *cbt = ngx_queue_data(q, ev_mariadb_cb_t, queue);
699 0           SV *cb = cbt->cb;
700 0           ngx_queue_remove(q);
701 0           self->pending_count--;
702 0           release_cbt(cbt);
703              
704 0           invoke_error_cb(cb, errmsg);
705             }
706              
707 2           self->callback_depth--;
708 2           }
709              
710 0           static void push_cb(ev_mariadb_t *self, SV *cb) {
711 0           ev_mariadb_cb_t *cbt = alloc_cbt();
712 0           cbt->cb = SvREFCNT_inc(cb);
713 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
714 0           self->pending_count++;
715 0           }
716              
717             /* Push cb to cb_queue, transferring ownership (no extra SvREFCNT_inc). */
718 0           static void push_cb_owned(ev_mariadb_t *self, SV *cb) {
719 0           ev_mariadb_cb_t *cbt = alloc_cbt();
720 0           cbt->cb = cb;
721 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
722             /* pending_count was incremented by push_send; ownership transfers from send_queue to cb_queue */
723 0           }
724              
725 2           static SV* handler_accessor(SV **slot, SV *handler, int has_arg) {
726 2 50         if (has_arg) {
727 2 50         if (NULL != *slot) {
728 0           SvREFCNT_dec(*slot);
729 0           *slot = NULL;
730             }
731 2 50         if (NULL != handler && SvOK(handler) &&
    50          
732 2 50         SvROK(handler) && SvTYPE(SvRV(handler)) == SVt_PVCV) {
    50          
733 2           *slot = SvREFCNT_inc(handler);
734             }
735             }
736              
737 2           return (NULL != *slot)
738 2           ? SvREFCNT_inc(*slot)
739 4 50         : &PL_sv_undef;
740             }
741              
742             /* --- Multi-result drain --- */
743              
744 0           static void drain_multi_result(ev_mariadb_t *self) {
745             int status;
746              
747 0           self->draining = 1;
748 0           status = mysql_next_result_start(&self->op_ret, self->conn);
749 0 0         if (status != 0) {
750 0           self->state = STATE_NEXT_RESULT;
751 0           update_watchers(self, status);
752 0           return;
753             }
754 0           on_next_result_done(self);
755             }
756              
757 0           static void on_next_result_done(ev_mariadb_t *self) {
758             int status;
759              
760             for (;;) {
761 0 0         if (self->op_ret > 0) {
762             /* error in secondary result set — report and stop draining */
763 0           self->draining = 0;
764 0           self->state = STATE_IDLE;
765 0           self->callback_depth++;
766 0           emit_error(self, mysql_error(self->conn));
767 0           self->callback_depth--;
768 0 0         if (check_destroyed(self)) return;
769 0 0         if (self->state != STATE_IDLE) return;
770 0 0         if (self->conn && mysql_more_results(self->conn)) {
    0          
771 0           drain_multi_result(self);
772 0           return;
773             }
774 0           pipeline_advance(self);
775 0           return;
776             }
777              
778 0 0         if (self->op_ret == -1) {
779             /* no more results */
780 0           break;
781             }
782              
783             /* op_ret == 0: another result set is available */
784 0 0         if (mysql_field_count(self->conn) > 0) {
785 0           status = mysql_store_result_start(&self->op_result, self->conn);
786 0 0         if (status != 0) {
787 0           self->state = STATE_STORE_RESULT;
788 0           update_watchers(self, status);
789 0           return;
790             }
791             /* synchronous store — free and continue drain */
792 0 0         if (self->op_result) {
793 0           mysql_free_result(self->op_result);
794 0           self->op_result = NULL;
795             }
796             }
797              
798             /* DML result or sync store done — check for more */
799 0 0         if (!mysql_more_results(self->conn))
800 0           break;
801              
802 0           status = mysql_next_result_start(&self->op_ret, self->conn);
803 0 0         if (status != 0) {
804 0           self->state = STATE_NEXT_RESULT;
805 0           update_watchers(self, status);
806 0           return;
807             }
808             /* synchronous completion — loop instead of recursing */
809             }
810              
811 0           self->draining = 0;
812 0           self->state = STATE_IDLE;
813 0           pipeline_advance(self);
814             }
815              
816             /* --- Pipeline: send_query + read_query_result state machine --- */
817              
818 0           static void handle_send_failure(ev_mariadb_t *self, ev_mariadb_send_t *send) {
819             char errbuf[512];
820 0           SV *cb = send->cb;
821 0           Safefree(send->sql);
822 0           release_send(send);
823 0           self->pending_count--;
824              
825 0           COPY_ERROR(errbuf, mysql_error(self->conn));
826              
827 0           self->state = STATE_IDLE;
828 0           self->callback_depth++;
829 0           invoke_error_cb(cb, errbuf);
830 0           self->callback_depth--;
831 0 0         if (check_destroyed(self)) return;
832 0 0         if (self->state != STATE_IDLE) return;
833 0           cancel_pending(self, "send failed");
834 0 0         if (check_destroyed(self)) return;
835 0 0         if (self->state != STATE_IDLE) return;
836 0           drain_queues_silent(self);
837 0           cleanup_connection(self);
838             }
839              
840 0           static void on_send_done(ev_mariadb_t *self) {
841 0           ngx_queue_t *q = ngx_queue_head(&self->send_queue);
842 0           ev_mariadb_send_t *send = ngx_queue_data(q, ev_mariadb_send_t, queue);
843 0           ngx_queue_remove(q);
844              
845 0 0         if (self->op_ret != 0) {
846 0           handle_send_failure(self, send);
847 0           return;
848             }
849              
850 0           push_cb_owned(self, send->cb);
851 0           Safefree(send->sql);
852 0           release_send(send);
853 0           self->send_count++;
854              
855 0           self->state = STATE_IDLE;
856 0           pipeline_advance(self);
857             }
858              
859 0           static void on_read_result_done(ev_mariadb_t *self) {
860             int status;
861              
862 0 0         if (self->op_bool_ret != 0) {
863             /* query returned an error */
864 0           self->send_count--;
865 0           self->state = STATE_IDLE;
866 0 0         if (deliver_error(self, mysql_error(self->conn))) return;
867 0 0         if (self->state != STATE_IDLE) return;
868             /* drain any remaining result sets */
869 0 0         if (self->conn && mysql_more_results(self->conn)) {
    0          
870 0           drain_multi_result(self);
871 0           return;
872             }
873 0           pipeline_advance(self);
874 0           return;
875             }
876              
877 0 0         if (mysql_field_count(self->conn) > 0) {
878             /* has result set — store it */
879 0           status = mysql_store_result_start(&self->op_result, self->conn);
880 0 0         if (status != 0) {
881 0           self->state = STATE_STORE_RESULT;
882 0           update_watchers(self, status);
883 0           return;
884             }
885             /* synchronous store completion — fall through below */
886             }
887              
888             /* DML or synchronous store_result completion */
889             {
890 0           self->send_count--;
891 0           self->state = STATE_IDLE;
892 0 0         if (deliver_result(self)) return;
893 0 0         if (self->state != STATE_IDLE) return;
894             /* drain any extra result sets from multi-result queries */
895 0 0         if (self->conn && mysql_more_results(self->conn)) {
    0          
896 0           drain_multi_result(self);
897 0           return;
898             }
899 0           pipeline_advance(self);
900             }
901             }
902              
903             /* Called when store_result_cont completes (pipeline text query path) */
904 0           static void on_store_result_done(ev_mariadb_t *self) {
905 0 0         if (self->draining) {
906 0           MYSQL_RES *res = self->op_result;
907 0           self->op_result = NULL;
908             /* draining multi-result: free and continue */
909 0 0         if (res) mysql_free_result(res);
910 0 0         if (self->conn && mysql_more_results(self->conn)) {
    0          
911 0           int status = mysql_next_result_start(&self->op_ret, self->conn);
912 0 0         if (status != 0) {
913 0           self->state = STATE_NEXT_RESULT;
914 0           update_watchers(self, status);
915 0           return;
916             }
917 0           on_next_result_done(self);
918 0           return;
919             }
920 0           self->draining = 0;
921 0           self->state = STATE_IDLE;
922 0           pipeline_advance(self);
923 0           return;
924             }
925              
926 0           self->send_count--;
927 0           self->state = STATE_IDLE;
928              
929 0 0         if (deliver_result(self)) return;
930 0 0         if (self->state != STATE_IDLE) return;
931             /* drain any extra result sets */
932 0 0         if (self->conn && mysql_more_results(self->conn)) {
    0          
933 0           drain_multi_result(self);
934 0           return;
935             }
936 0           pipeline_advance(self);
937             }
938              
939             /*
940             * Pipeline orchestrator. Called when state == IDLE.
941             * Phase 1: send all queued queries via mysql_send_query.
942             * Phase 2: read next result via mysql_read_query_result.
943             */
944 0           static void pipeline_advance(ev_mariadb_t *self) {
945             int status;
946              
947             /* Ensure clean watcher state — previous operations may have left
948             * watchers active after completing synchronously within their
949             * done handlers. Without this, subsequent operations that need
950             * the same watcher direction would skip ev_io_start. */
951 0           stop_reading(self);
952 0           stop_writing(self);
953 0           stop_timer(self);
954              
955 0           send_phase:
956             /* Phase 1: send up to MAX_PIPELINE_DEPTH queries */
957 0 0         while (!ngx_queue_empty(&self->send_queue) &&
958 0 0         self->send_count < MAX_PIPELINE_DEPTH) {
959 0           ngx_queue_t *q = ngx_queue_head(&self->send_queue);
960 0           ev_mariadb_send_t *send = ngx_queue_data(q, ev_mariadb_send_t, queue);
961              
962 0           status = mysql_send_query_start(&self->op_ret, self->conn,
963 0           send->sql, send->sql_len);
964              
965 0 0         if (status != 0) {
966             /* need async IO to finish sending */
967 0           self->state = STATE_SEND;
968 0           update_watchers(self, status);
969 0           return;
970             }
971              
972             /* synchronous send completion */
973 0           ngx_queue_remove(q);
974              
975 0 0         if (self->op_ret != 0) {
976 0           handle_send_failure(self, send);
977 0           return;
978             }
979              
980 0           push_cb_owned(self, send->cb);
981 0           Safefree(send->sql);
982 0           release_send(send);
983 0           self->send_count++;
984             }
985              
986             /* Phase 2: read next result */
987 0 0         while (self->send_count > 0) {
988 0           status = mysql_read_query_result_start(&self->op_bool_ret, self->conn);
989              
990 0 0         if (status != 0) {
991 0           self->state = STATE_READ_RESULT;
992 0           update_watchers(self, status);
993 0           return;
994             }
995              
996             /* synchronous read completion */
997 0 0         if (self->op_bool_ret != 0) {
998             /* query error */
999 0           self->send_count--;
1000 0 0         if (deliver_error(self, mysql_error(self->conn))) return;
1001             /* callback may have started a new operation */
1002 0 0         if (self->state != STATE_IDLE) return;
1003 0 0         if (self->conn && mysql_more_results(self->conn)) {
    0          
1004 0           drain_multi_result(self);
1005 0           return;
1006             }
1007 0 0         if (!ngx_queue_empty(&self->send_queue)) goto send_phase;
1008 0           continue;
1009             }
1010              
1011 0 0         if (mysql_field_count(self->conn) > 0) {
1012             /* has result set */
1013 0           status = mysql_store_result_start(&self->op_result, self->conn);
1014 0 0         if (status != 0) {
1015 0           self->state = STATE_STORE_RESULT;
1016 0           update_watchers(self, status);
1017 0           return;
1018             }
1019             }
1020              
1021             {
1022 0           self->send_count--;
1023 0 0         if (deliver_result(self)) return;
1024             /* callback may have started a new operation */
1025 0 0         if (self->state != STATE_IDLE) return;
1026 0 0         if (self->conn && mysql_more_results(self->conn)) {
    0          
1027 0           drain_multi_result(self);
1028 0           return;
1029             }
1030             /* loop back to check send_queue (callback may have queued more) */
1031 0 0         if (!ngx_queue_empty(&self->send_queue)) goto send_phase;
1032             }
1033             }
1034              
1035 0           self->state = STATE_IDLE;
1036             }
1037              
1038             /* --- Connection --- */
1039              
1040 0           static void on_connect_done(ev_mariadb_t *self) {
1041 0           self->state = STATE_IDLE;
1042              
1043 0 0         if (self->op_conn_ret == NULL) {
1044             char errbuf[512];
1045 0           COPY_ERROR(errbuf, mysql_error(self->conn));
1046 0           self->callback_depth++;
1047 0           emit_error(self, errbuf);
1048 0           self->callback_depth--;
1049 0 0         if (check_destroyed(self)) return;
1050             /* on_error handler may have called reset(), starting a new connection;
1051             if state is no longer IDLE, don't touch the new connection */
1052 0 0         if (self->state != STATE_IDLE) return;
1053 0           cancel_pending(self, errbuf);
1054 0 0         if (check_destroyed(self)) return;
1055 0 0         if (self->state != STATE_IDLE) return;
1056 0           drain_queues_silent(self);
1057 0           cleanup_connection(self);
1058 0           return;
1059             }
1060              
1061             /* connected — reinit watchers for normal IO */
1062 0           stop_reading(self);
1063 0           stop_writing(self);
1064 0           stop_timer(self);
1065              
1066 0           self->fd = mysql_get_socket(self->conn);
1067              
1068 0 0         if (self->fd < 0) {
1069 0           self->callback_depth++;
1070 0           emit_error(self, "mysql_get_socket returned invalid fd");
1071 0           self->callback_depth--;
1072 0 0         if (check_destroyed(self)) return;
1073 0 0         if (self->state != STATE_IDLE) return;
1074 0           cancel_pending(self, "invalid fd");
1075 0 0         if (check_destroyed(self)) return;
1076 0 0         if (self->state != STATE_IDLE) return;
1077 0           drain_queues_silent(self);
1078 0           cleanup_connection(self);
1079 0           return;
1080             }
1081              
1082 0           init_io_watchers(self);
1083              
1084 0 0         if (NULL != self->on_connect) {
1085 0           SV *cb = sv_2mortal(SvREFCNT_inc_simple_NN(self->on_connect));
1086 0           self->callback_depth++;
1087              
1088             {
1089 0           dSP;
1090 0           ENTER;
1091 0           SAVETMPS;
1092 0 0         PUSHMARK(SP);
1093 0           PUTBACK;
1094              
1095 0           call_sv(cb, G_DISCARD | G_EVAL);
1096 0 0         if (SvTRUE(ERRSV)) {
    0          
1097 0 0         warn("EV::MariaDB: exception in connect handler: %s", SvPV_nolen(ERRSV));
1098 0 0         sv_setpvn(ERRSV, "", 0);
1099             }
1100              
1101 0 0         FREETMPS;
1102 0           LEAVE;
1103             }
1104              
1105 0           self->callback_depth--;
1106 0 0         if (check_destroyed(self)) return;
1107             }
1108              
1109             /* start pipeline if queries were queued during connection */
1110 0           maybe_pipeline(self);
1111             }
1112              
1113             /* --- Prepared statements --- */
1114              
1115 0           static void on_stmt_prepare_done(ev_mariadb_t *self) {
1116 0           MYSQL_STMT *stmt = self->op_stmt;
1117 0           self->op_stmt = NULL;
1118 0           self->state = STATE_IDLE;
1119              
1120 0 0         if (self->op_ret != 0) {
1121             char errbuf[512];
1122 0           COPY_ERROR(errbuf, mysql_stmt_error(stmt));
1123 0           mysql_stmt_close(stmt);
1124 0 0         if (deliver_error(self, errbuf)) return;
1125 0           maybe_pipeline(self);
1126 0           return;
1127             }
1128              
1129             {
1130             ev_mariadb_stmt_t *ctx;
1131 0           Newxz(ctx, 1, ev_mariadb_stmt_t);
1132 0           ctx->stmt = stmt;
1133 0           ctx->next = self->stmt_list;
1134 0           self->stmt_list = ctx;
1135 0 0         if (deliver_value(self, newSViv(PTR2IV(ctx)))) return;
1136             }
1137 0           maybe_pipeline(self);
1138             }
1139              
1140 0           static void on_stmt_execute_done(ev_mariadb_t *self) {
1141 0           MYSQL_STMT *stmt = self->op_stmt;
1142 0           self->op_stmt = NULL;
1143 0           self->state = STATE_IDLE;
1144              
1145 0 0         if (self->op_ret != 0) {
1146 0 0         if (deliver_error(self, mysql_stmt_error(stmt))) return;
1147 0           maybe_pipeline(self);
1148 0           return;
1149             }
1150              
1151             {
1152             MYSQL_RES *meta;
1153             SV *cb;
1154 0           meta = mysql_stmt_result_metadata(stmt);
1155 0           self->op_result = meta;
1156 0           cb = pop_cb(self);
1157 0 0         if (cb == NULL) {
1158 0 0         if (self->op_result) {
1159 0           mysql_stmt_free_result(stmt);
1160 0           mysql_free_result(self->op_result);
1161 0           self->op_result = NULL;
1162             }
1163 0           maybe_pipeline(self);
1164 0           return;
1165             }
1166              
1167 0           self->callback_depth++;
1168             {
1169 0           dSP;
1170 0           ENTER;
1171 0           SAVETMPS;
1172 0 0         PUSHMARK(SP);
1173              
1174 0 0         if (self->op_result == NULL) {
1175 0           my_ulonglong affected = mysql_stmt_affected_rows(stmt);
1176 0           PUSHs(sv_2mortal(newSVuv((UV)affected)));
1177             }
1178             else {
1179 0           unsigned int ncols = mysql_num_fields(self->op_result);
1180 0           MYSQL_FIELD *fields = mysql_fetch_fields(self->op_result);
1181             MYSQL_BIND *bind;
1182             unsigned long *lengths;
1183             my_bool *is_null;
1184             char **buffers;
1185             unsigned int c;
1186 0           AV *rows = newAV();
1187             int fetch_ret;
1188              
1189 0           Newxz(bind, ncols, MYSQL_BIND);
1190 0           SAVEFREEPV(bind);
1191 0           Newx(lengths, ncols, unsigned long);
1192 0           SAVEFREEPV(lengths);
1193 0           Newx(is_null, ncols, my_bool);
1194 0           SAVEFREEPV(is_null);
1195 0           Newx(buffers, ncols, char *);
1196 0           SAVEFREEPV(buffers);
1197              
1198 0 0         for (c = 0; c < ncols; c++) {
1199 0           unsigned long buflen = fields[c].max_length;
1200 0 0         if (buflen < 256) buflen = 256;
1201 0           Newx(buffers[c], buflen, char);
1202 0           SAVEFREEPV(buffers[c]);
1203 0           bind[c].buffer_type = MYSQL_TYPE_STRING;
1204 0           bind[c].buffer = buffers[c];
1205 0           bind[c].buffer_length = buflen;
1206 0           bind[c].length = &lengths[c];
1207 0           bind[c].is_null = &is_null[c];
1208             }
1209              
1210 0 0         if (mysql_stmt_bind_result(stmt, bind)) {
1211 0           mysql_stmt_free_result(stmt);
1212 0           mysql_free_result(self->op_result);
1213 0           self->op_result = NULL;
1214 0           SvREFCNT_dec((SV*)rows);
1215              
1216 0           PUSHs(&PL_sv_undef);
1217 0           PUSHs(sv_2mortal(newSVpv(mysql_stmt_error(stmt), 0)));
1218 0           goto invoke;
1219             }
1220              
1221 0 0         while ((fetch_ret = mysql_stmt_fetch(stmt)) == 0) {
1222 0           AV *row = newAV();
1223 0 0         if (ncols > 0) av_extend(row, ncols - 1);
1224 0 0         for (c = 0; c < ncols; c++) {
1225 0 0         if (is_null[c]) {
1226 0           av_push(row, newSV(0));
1227             } else {
1228 0           SV *val = newSVpvn(buffers[c], lengths[c]);
1229 0 0         if (self->utf8 && is_utf8_charset(fields[c].charsetnr))
    0          
1230 0           SvUTF8_on(val);
1231 0           av_push(row, val);
1232             }
1233             }
1234 0           av_push(rows, newRV_noinc((SV*)row));
1235             }
1236              
1237 0 0         if (fetch_ret != 0 && fetch_ret != MYSQL_NO_DATA) {
    0          
1238 0           mysql_free_result(self->op_result);
1239 0           self->op_result = NULL;
1240 0           mysql_stmt_free_result(stmt);
1241 0           SvREFCNT_dec((SV*)rows);
1242 0           PUSHs(&PL_sv_undef);
1243 0 0         if (fetch_ret == MYSQL_DATA_TRUNCATED)
1244 0           PUSHs(sv_2mortal(newSVpvs("data truncated")));
1245             else
1246 0           PUSHs(sv_2mortal(newSVpv(mysql_stmt_error(stmt), 0)));
1247             } else {
1248 0           AV *fnames = build_field_names(self->conn, fields, ncols);
1249 0           mysql_free_result(self->op_result);
1250 0           self->op_result = NULL;
1251 0           mysql_stmt_free_result(stmt);
1252 0           PUSHs(sv_2mortal(newRV_noinc((SV*)rows)));
1253 0           PUSHs(&PL_sv_undef);
1254 0           PUSHs(sv_2mortal(newRV_noinc((SV*)fnames)));
1255             }
1256             }
1257              
1258 0           invoke:
1259 0           PUTBACK;
1260 0           invoke_cb(cb);
1261 0 0         FREETMPS;
1262 0           LEAVE;
1263             }
1264 0           self->callback_depth--;
1265 0 0         if (check_destroyed(self)) return;
1266 0           maybe_pipeline(self);
1267             }
1268             }
1269              
1270 0           static void on_stmt_store_done(ev_mariadb_t *self) {
1271 0 0         if (self->op_ret != 0) {
1272 0           MYSQL_STMT *stmt = self->op_stmt;
1273 0           self->op_stmt = NULL;
1274 0           self->state = STATE_IDLE;
1275 0 0         if (deliver_error(self, mysql_stmt_error(stmt))) return;
1276 0           maybe_pipeline(self);
1277 0           return;
1278             }
1279 0           on_stmt_execute_done(self);
1280             }
1281              
1282 0           static void unlink_stmt(ev_mariadb_t *self, ev_mariadb_stmt_t *ctx) {
1283 0           ev_mariadb_stmt_t **pp = &self->stmt_list;
1284 0 0         while (*pp) {
1285 0 0         if (*pp == ctx) { *pp = ctx->next; return; }
1286 0           pp = &(*pp)->next;
1287             }
1288             }
1289              
1290 0           static void on_stmt_close_done(ev_mariadb_t *self) {
1291 0           self->op_stmt = NULL; /* stmt freed by mysql_stmt_close */
1292 0 0         if (self->op_stmt_ctx) {
1293 0           unlink_stmt(self, self->op_stmt_ctx);
1294 0           Safefree(self->op_stmt_ctx);
1295 0           self->op_stmt_ctx = NULL;
1296             }
1297 0           self->state = STATE_IDLE;
1298 0 0         if (self->op_bool_ret != 0) {
1299 0 0         if (deliver_error(self, mysql_error(self->conn))) return;
1300             } else {
1301 0 0         if (deliver_value(self, newSViv(1))) return;
1302             }
1303 0           maybe_pipeline(self);
1304             }
1305              
1306 0           static void on_stmt_reset_done(ev_mariadb_t *self) {
1307 0           MYSQL_STMT *stmt = self->op_stmt;
1308 0           self->op_stmt = NULL;
1309 0           self->state = STATE_IDLE;
1310 0 0         if (self->op_bool_ret != 0) {
1311 0 0         if (deliver_error(self, mysql_stmt_error(stmt))) return;
1312             } else {
1313 0 0         if (deliver_value(self, newSViv(1))) return;
1314             }
1315 0           maybe_pipeline(self);
1316             }
1317              
1318             /* --- Async utility operation done handler --- */
1319              
1320             /* mysql_reset_connection / mysql_change_user drop all server-side prepared
1321             statements; invalidate every tracked wrapper so user code can't keep using
1322             them. Closes the local libmariadb stmt state and marks closed=1, mirroring
1323             cleanup_connection's pattern but without touching the live connection. */
1324 0           static void invalidate_all_stmts(ev_mariadb_t *self) {
1325 0           ev_mariadb_stmt_t *ctx = self->stmt_list;
1326 0 0         while (ctx) {
1327 0           free_stmt_bind_params(ctx);
1328 0 0         if (ctx->stmt) {
1329 0           mysql_stmt_close(ctx->stmt);
1330 0           ctx->stmt = NULL;
1331             }
1332 0           ctx->closed = 1;
1333 0           ctx = ctx->next;
1334             }
1335 0           self->op_stmt = NULL;
1336 0           self->op_stmt_ctx = NULL;
1337 0           }
1338              
1339             /* Commit (success) or discard (failure) any pending utility-op cache update. */
1340 0           static void resolve_pending_cache(ev_mariadb_t *self, int failed) {
1341 0 0         if (failed) {
1342 0           FREE_STR(self->pending_user);
1343 0           FREE_STR(self->pending_password);
1344 0           FREE_STR(self->pending_database);
1345 0           FREE_STR(self->pending_charset);
1346 0           return;
1347             }
1348 0 0         if (self->pending_user) {
1349 0           Safefree(self->user);
1350 0           self->user = self->pending_user;
1351 0           self->pending_user = NULL;
1352             }
1353 0 0         if (self->pending_password) {
1354 0           Safefree(self->password);
1355 0           self->password = self->pending_password;
1356 0           self->pending_password = NULL;
1357             }
1358 0 0         if (self->pending_database) {
1359 0           Safefree(self->database);
1360 0           self->database = self->pending_database;
1361 0           self->pending_database = NULL;
1362             }
1363 0 0         if (self->pending_charset) {
1364 0           Safefree(self->charset);
1365 0           self->charset = self->pending_charset;
1366 0           self->pending_charset = NULL;
1367             }
1368             }
1369              
1370 0           static void on_utility_done(ev_mariadb_t *self, int failed) {
1371 0           enum ev_mariadb_state prev = self->state;
1372 0           self->state = STATE_IDLE;
1373 0           resolve_pending_cache(self, failed);
1374 0 0         if (!failed && (prev == STATE_RESET_CONNECTION || prev == STATE_CHANGE_USER))
    0          
    0          
1375 0           invalidate_all_stmts(self);
1376 0 0         if (failed) {
1377 0 0         if (deliver_error(self, mysql_error(self->conn))) return;
1378             } else {
1379 0 0         if (deliver_value(self, newSViv(1))) return;
1380             }
1381 0           maybe_pipeline(self);
1382             }
1383              
1384             /* Trailing dispatch shared by every utility op start/cont site:
1385             completed → on_utility_done; otherwise → arm watchers. */
1386 0           static void dispatch_utility(ev_mariadb_t *self, int status, int failed) {
1387 0 0         if (status == 0) on_utility_done(self, failed);
1388 0           else update_watchers(self, status);
1389 0           }
1390              
1391 0           static void free_stmt_bind_params(ev_mariadb_stmt_t *ctx) {
1392             int i;
1393 0 0         if (!ctx->bind_params) return;
1394 0 0         for (i = 0; i < ctx->bind_param_count; i++) {
1395 0           Safefree(ctx->bind_params[i].buffer);
1396             }
1397 0           Safefree(ctx->bind_params);
1398 0           ctx->bind_params = NULL;
1399 0           ctx->bind_param_count = 0;
1400             }
1401              
1402 0           static void transition_to_stmt_store(ev_mariadb_t *self) {
1403             int status;
1404 0           self->op_stmt_ctx = NULL; /* bind_params live in wrapper, not freed here */
1405 0 0         if (self->op_ret != 0) {
1406 0           on_stmt_execute_done(self);
1407             } else {
1408 0           self->state = STATE_STMT_STORE;
1409 0           status = mysql_stmt_store_result_start(&self->op_ret, self->op_stmt);
1410 0 0         if (status == 0) {
1411 0           on_stmt_store_done(self);
1412             } else {
1413 0           update_watchers(self, status);
1414             }
1415             }
1416 0           }
1417              
1418             /* --- Streaming query (mysql_use_result + fetch_row) --- */
1419              
1420 0           static void stream_error(ev_mariadb_t *self) {
1421 0           SV *cb = self->stream_cb;
1422             char errbuf[512];
1423 0           COPY_ERROR(errbuf, mysql_error(self->conn));
1424 0           self->stream_cb = NULL;
1425 0           self->state = STATE_IDLE;
1426 0           self->pending_count--;
1427 0           self->callback_depth++;
1428 0           invoke_error_cb(cb, errbuf);
1429 0           self->callback_depth--;
1430 0           }
1431              
1432 0           static void on_real_query_done(ev_mariadb_t *self) {
1433             int status;
1434              
1435 0 0         if (self->op_ret != 0) {
1436 0           stream_error(self);
1437 0 0         if (check_destroyed(self)) return;
1438 0           maybe_pipeline(self);
1439 0           return;
1440             }
1441              
1442 0           self->op_result = mysql_use_result(self->conn);
1443 0 0         if (!self->op_result) {
1444 0           const char *err = mysql_error(self->conn);
1445 0 0         if (err && err[0]) {
    0          
1446 0           stream_error(self);
1447 0 0         if (check_destroyed(self)) return;
1448             } else {
1449             /* DML or no-result query: deliver EOF to stream callback */
1450 0           SV *cb = self->stream_cb;
1451 0           self->stream_cb = NULL;
1452 0           self->state = STATE_IDLE;
1453 0           self->pending_count--;
1454 0           self->callback_depth++;
1455 0           invoke_eof_cb(cb);
1456 0           self->callback_depth--;
1457 0 0         if (check_destroyed(self)) return;
1458             }
1459 0           maybe_pipeline(self);
1460 0           return;
1461             }
1462              
1463 0           status = mysql_fetch_row_start(&self->op_row, self->op_result);
1464 0 0         if (status == 0) {
1465 0           on_stream_fetch_done(self);
1466             } else {
1467 0           self->state = STATE_STREAM_FETCH;
1468 0           update_watchers(self, status);
1469             }
1470             }
1471              
1472 0           static void on_stream_fetch_done(ev_mariadb_t *self) {
1473 0 0         MYSQL_FIELD *fields = (self->utf8 && self->op_result)
1474 0 0         ? mysql_fetch_fields(self->op_result) : NULL;
1475              
1476 0           for (;;) {
1477 0           MYSQL_ROW row = self->op_row;
1478              
1479 0 0         if (row == NULL) break;
1480              
1481             /* deliver row to callback */
1482             {
1483 0           unsigned int ncols = mysql_num_fields(self->op_result);
1484 0           unsigned long *lengths = mysql_fetch_lengths(self->op_result);
1485 0           AV *r = newAV();
1486             unsigned int c;
1487              
1488 0 0         if (ncols > 0) av_extend(r, ncols - 1);
1489 0 0         for (c = 0; c < ncols; c++) {
1490 0 0         if (row[c] == NULL) {
1491 0           av_push(r, newSV(0));
1492             } else {
1493 0           SV *val = newSVpvn(row[c], lengths[c]);
1494 0 0         if (fields && is_utf8_charset(fields[c].charsetnr))
    0          
1495 0           SvUTF8_on(val);
1496 0           av_push(r, val);
1497             }
1498             }
1499              
1500             /* Detach stream_cb before invoking: prevents cancel_pending
1501             from double-firing the callback if finish() is called inside */
1502             {
1503 0           SV *saved_cb = self->stream_cb;
1504 0           self->stream_cb = NULL;
1505 0           self->pending_count--;
1506              
1507 0           self->callback_depth++;
1508             {
1509 0           SV *cb = sv_2mortal(SvREFCNT_inc_simple_NN(saved_cb));
1510 0           dSP;
1511 0           ENTER;
1512 0           SAVETMPS;
1513 0 0         PUSHMARK(SP);
1514 0           PUSHs(sv_2mortal(newRV_noinc((SV*)r)));
1515 0           PUTBACK;
1516 0           call_sv(cb, G_DISCARD | G_EVAL);
1517 0 0         if (SvTRUE(ERRSV)) {
    0          
1518 0 0         warn("EV::MariaDB: exception in stream callback: %s", SvPV_nolen(ERRSV));
1519 0 0         sv_setpvn(ERRSV, "", 0);
1520             }
1521 0 0         FREETMPS;
1522 0           LEAVE;
1523             }
1524 0           self->callback_depth--;
1525 0 0         if (check_destroyed(self)) {
1526 0           SvREFCNT_dec(saved_cb);
1527 0           return;
1528             }
1529              
1530             /* Was stream cancelled (finish/reset from callback)? */
1531 0 0         if (self->stream_cb != NULL) {
1532             /* Defence: unreachable via the public API — query_stream's
1533             CHECK_READY rejects the only way stream_cb could be
1534             reset while we're inside on_stream_fetch_done. Drop the
1535             saved cb so the (hypothetical) new stream owns the slot. */
1536 0           SvREFCNT_dec(saved_cb);
1537 0 0         } else if (self->conn == NULL || self->op_result == NULL) {
    0          
1538             /* connection torn down — stream is over */
1539 0           SvREFCNT_dec(saved_cb);
1540 0           return;
1541             } else {
1542             /* normal: restore for next row */
1543 0           self->stream_cb = saved_cb;
1544 0           self->pending_count++;
1545             }
1546             }
1547             }
1548              
1549             /* fetch next row */
1550             {
1551 0           int status = mysql_fetch_row_start(&self->op_row, self->op_result);
1552 0 0         if (status != 0) {
1553 0           self->state = STATE_STREAM_FETCH;
1554 0           update_watchers(self, status);
1555 0           return;
1556             }
1557             }
1558             }
1559              
1560             /* row == NULL: EOF or error */
1561             {
1562 0           SV *cb = self->stream_cb;
1563             char errbuf[512];
1564 0           const char *err = mysql_error(self->conn);
1565 0 0         int has_error = (err && err[0]);
    0          
1566 0 0         if (has_error) COPY_ERROR(errbuf, err);
1567              
1568 0           self->stream_cb = NULL;
1569 0           mysql_free_result(self->op_result);
1570 0           self->op_result = NULL;
1571 0           self->state = STATE_IDLE;
1572 0           self->pending_count--;
1573              
1574 0           self->callback_depth++;
1575 0 0         if (has_error) {
1576 0           invoke_error_cb(cb, errbuf);
1577             } else {
1578 0           invoke_eof_cb(cb);
1579             }
1580 0           self->callback_depth--;
1581 0 0         if (check_destroyed(self)) return;
1582             /* drain any secondary result sets from multi_statements */
1583 0 0         if (self->conn && mysql_more_results(self->conn)) {
    0          
1584 0           drain_multi_result(self);
1585 0           return;
1586             }
1587 0           maybe_pipeline(self);
1588             }
1589             }
1590              
1591             /* --- Async close --- */
1592              
1593 0           static void on_close_done(ev_mariadb_t *self) {
1594             SV *cb;
1595              
1596             /* conn was freed by mysql_close */
1597 0           self->conn = NULL;
1598 0           self->fd = -1;
1599 0           stop_reading(self);
1600 0           stop_writing(self);
1601 0           stop_timer(self);
1602 0           self->state = STATE_IDLE;
1603              
1604             /* pop our callback before cancel_pending fires errors for remaining items */
1605 0           cb = pop_cb(self);
1606              
1607             /* cancel any items queued from within a callback before close_async */
1608 0           cancel_pending(self, "connection closed");
1609 0 0         if (check_destroyed(self)) {
1610 0           SvREFCNT_dec(cb);
1611 0           return;
1612             }
1613              
1614 0 0         if (cb) {
1615 0           SV *val = newSViv(1);
1616 0           self->callback_depth++;
1617             {
1618 0           dSP;
1619 0           ENTER;
1620 0           SAVETMPS;
1621 0 0         PUSHMARK(SP);
1622 0           PUSHs(sv_2mortal(val));
1623 0           PUTBACK;
1624 0           invoke_cb(cb);
1625 0 0         FREETMPS;
1626 0           LEAVE;
1627             }
1628 0           self->callback_depth--;
1629 0 0         if (check_destroyed(self)) return;
1630             }
1631             /* no maybe_pipeline: conn is NULL, nothing can be sent */
1632             }
1633              
1634             /* --- Send long data done --- */
1635              
1636 0           static void on_send_long_data_done(ev_mariadb_t *self) {
1637 0           MYSQL_STMT *stmt = self->op_stmt;
1638              
1639 0 0         if (self->op_data_ptr) {
1640 0           Safefree(self->op_data_ptr);
1641 0           self->op_data_ptr = NULL;
1642             }
1643 0           self->op_stmt = NULL;
1644 0           self->state = STATE_IDLE;
1645              
1646 0 0         if (self->op_bool_ret != 0) {
1647 0 0         if (deliver_error(self, mysql_stmt_error(stmt))) return;
1648             } else {
1649 0 0         if (deliver_value(self, newSViv(1))) return;
1650             }
1651 0           maybe_pipeline(self);
1652             }
1653              
1654             /* --- Main continuation dispatcher --- */
1655              
1656 0           static void continue_operation(ev_mariadb_t *self, int events) {
1657             int status;
1658              
1659 0           switch (self->state) {
1660 0           case STATE_CONNECTING:
1661 0           status = mysql_real_connect_cont(&self->op_conn_ret, self->conn, events);
1662 0 0         if (status == 0) {
1663 0           on_connect_done(self);
1664             } else {
1665 0           update_watchers(self, status);
1666             }
1667 0           break;
1668              
1669 0           case STATE_SEND:
1670 0           status = mysql_send_query_cont(&self->op_ret, self->conn, events);
1671 0 0         if (status == 0) {
1672 0           on_send_done(self);
1673             } else {
1674 0           update_watchers(self, status);
1675             }
1676 0           break;
1677              
1678 0           case STATE_READ_RESULT:
1679 0           status = mysql_read_query_result_cont(&self->op_bool_ret, self->conn, events);
1680 0 0         if (status == 0) {
1681 0           on_read_result_done(self);
1682             } else {
1683 0           update_watchers(self, status);
1684             }
1685 0           break;
1686              
1687 0           case STATE_STORE_RESULT:
1688 0           status = mysql_store_result_cont(&self->op_result, self->conn, events);
1689 0 0         if (status == 0) {
1690 0           on_store_result_done(self);
1691             } else {
1692 0           update_watchers(self, status);
1693             }
1694 0           break;
1695              
1696 0           case STATE_NEXT_RESULT:
1697 0           status = mysql_next_result_cont(&self->op_ret, self->conn, events);
1698 0 0         if (status == 0) {
1699 0           on_next_result_done(self);
1700             } else {
1701 0           update_watchers(self, status);
1702             }
1703 0           break;
1704              
1705 0           case STATE_STMT_PREPARE:
1706 0           status = mysql_stmt_prepare_cont(&self->op_ret, self->op_stmt, events);
1707 0 0         if (status == 0) {
1708 0           on_stmt_prepare_done(self);
1709             } else {
1710 0           update_watchers(self, status);
1711             }
1712 0           break;
1713              
1714 0           case STATE_STMT_EXECUTE:
1715 0           status = mysql_stmt_execute_cont(&self->op_ret, self->op_stmt, events);
1716 0 0         if (status == 0) {
1717 0           transition_to_stmt_store(self);
1718             } else {
1719 0           update_watchers(self, status);
1720             }
1721 0           break;
1722              
1723 0           case STATE_STMT_STORE:
1724 0           status = mysql_stmt_store_result_cont(&self->op_ret, self->op_stmt, events);
1725 0 0         if (status == 0) {
1726 0           on_stmt_store_done(self);
1727             } else {
1728 0           update_watchers(self, status);
1729             }
1730 0           break;
1731              
1732 0           case STATE_STMT_CLOSE:
1733 0           status = mysql_stmt_close_cont(&self->op_bool_ret, self->op_stmt, events);
1734 0 0         if (status == 0) {
1735 0           on_stmt_close_done(self);
1736             } else {
1737 0           update_watchers(self, status);
1738             }
1739 0           break;
1740              
1741 0           case STATE_STMT_RESET:
1742 0           status = mysql_stmt_reset_cont(&self->op_bool_ret, self->op_stmt, events);
1743 0 0         if (status == 0) {
1744 0           on_stmt_reset_done(self);
1745             } else {
1746 0           update_watchers(self, status);
1747             }
1748 0           break;
1749              
1750 0           case STATE_PING:
1751 0           status = mysql_ping_cont(&self->op_ret, self->conn, events);
1752 0           dispatch_utility(self, status, self->op_ret != 0);
1753 0           break;
1754              
1755 0           case STATE_CHANGE_USER:
1756 0           status = mysql_change_user_cont(&self->op_bool_ret, self->conn, events);
1757 0           dispatch_utility(self, status, self->op_bool_ret != 0);
1758 0           break;
1759              
1760 0           case STATE_SELECT_DB:
1761 0           status = mysql_select_db_cont(&self->op_ret, self->conn, events);
1762 0           dispatch_utility(self, status, self->op_ret != 0);
1763 0           break;
1764              
1765 0           case STATE_RESET_CONNECTION:
1766 0           status = mysql_reset_connection_cont(&self->op_ret, self->conn, events);
1767 0           dispatch_utility(self, status, self->op_ret != 0);
1768 0           break;
1769              
1770 0           case STATE_SET_CHARSET:
1771 0           status = mysql_set_character_set_cont(&self->op_ret, self->conn, events);
1772 0           dispatch_utility(self, status, self->op_ret != 0);
1773 0           break;
1774              
1775 0           case STATE_COMMIT:
1776 0           status = mysql_commit_cont(&self->op_bool_ret, self->conn, events);
1777 0           dispatch_utility(self, status, self->op_bool_ret != 0);
1778 0           break;
1779              
1780 0           case STATE_ROLLBACK:
1781 0           status = mysql_rollback_cont(&self->op_bool_ret, self->conn, events);
1782 0           dispatch_utility(self, status, self->op_bool_ret != 0);
1783 0           break;
1784              
1785 0           case STATE_AUTOCOMMIT:
1786 0           status = mysql_autocommit_cont(&self->op_bool_ret, self->conn, events);
1787 0           dispatch_utility(self, status, self->op_bool_ret != 0);
1788 0           break;
1789              
1790 0           case STATE_STMT_SEND_LONG_DATA:
1791 0           status = mysql_stmt_send_long_data_cont(&self->op_bool_ret, self->op_stmt, events);
1792 0 0         if (status == 0) {
1793 0           on_send_long_data_done(self);
1794             } else {
1795 0           update_watchers(self, status);
1796             }
1797 0           break;
1798              
1799 0           case STATE_REAL_QUERY:
1800 0           status = mysql_real_query_cont(&self->op_ret, self->conn, events);
1801 0 0         if (status == 0) {
1802 0           on_real_query_done(self);
1803             } else {
1804 0           update_watchers(self, status);
1805             }
1806 0           break;
1807              
1808 0           case STATE_STREAM_FETCH:
1809 0           status = mysql_fetch_row_cont(&self->op_row, self->op_result, events);
1810 0 0         if (status == 0) {
1811 0           on_stream_fetch_done(self);
1812             } else {
1813 0           update_watchers(self, status);
1814             }
1815 0           break;
1816              
1817 0           case STATE_CLOSE:
1818 0           status = mysql_close_cont(self->conn, events);
1819 0 0         if (status == 0) {
1820 0           on_close_done(self);
1821             } else {
1822 0           update_watchers(self, status);
1823             }
1824 0           break;
1825              
1826 0           default:
1827 0           warn("EV::MariaDB: unexpected state %d in continue_operation", self->state);
1828 0           break;
1829             }
1830 0           }
1831              
1832 0           static void io_cb(EV_P_ ev_io *w, int revents) {
1833 0           ev_mariadb_t *self = (ev_mariadb_t *)w->data;
1834 0           int events = 0;
1835             (void)loop;
1836              
1837 0 0         if (self->magic != EV_MARIADB_MAGIC) return;
1838 0 0         if (NULL == self->conn) return;
1839              
1840 0 0         if (revents & EV_READ) events |= MYSQL_WAIT_READ | MYSQL_WAIT_EXCEPT;
1841 0 0         if (revents & EV_WRITE) events |= MYSQL_WAIT_WRITE;
1842              
1843 0           continue_operation(self, events);
1844             }
1845              
1846 0           static void timer_cb(EV_P_ ev_timer *w, int revents) {
1847 0           ev_mariadb_t *self = (ev_mariadb_t *)w->data;
1848             (void)loop;
1849             (void)revents;
1850              
1851 0 0         if (self->magic != EV_MARIADB_MAGIC) return;
1852 0 0         if (NULL == self->conn) return;
1853              
1854 0           self->timing = 0;
1855 0           continue_operation(self, MYSQL_WAIT_TIMEOUT);
1856             }
1857              
1858 0           static char* safe_strdup(const char *s) {
1859             char *d;
1860             size_t len;
1861 0 0         if (!s) return NULL;
1862 0           len = strlen(s);
1863 0           Newx(d, len + 1, char);
1864 0           Copy(s, d, len + 1, char);
1865 0           return d;
1866             }
1867              
1868             /* newSVpv on a non-empty string, &PL_sv_undef otherwise. Used by accessors
1869             that wrap a libmariadb getter returning a NUL-terminated string or NULL. */
1870 0           static SV* str_or_undef(const char *s) {
1871 0 0         return (s && s[0]) ? newSVpv(s, 0) : &PL_sv_undef;
    0          
1872             }
1873              
1874             /* Free *slot then replace it with a fresh copy of src (or NULL). */
1875 0           static void replace_str(char **slot, const char *src) {
1876 0           Safefree(*slot);
1877 0           *slot = safe_strdup(src);
1878 0           }
1879              
1880 2           static void free_connect_strings(ev_mariadb_t *self) {
1881 2           FREE_STR(self->host);
1882 2           FREE_STR(self->user);
1883 2           FREE_STR(self->password);
1884 2           FREE_STR(self->database);
1885 2           FREE_STR(self->unix_socket);
1886 2           }
1887              
1888 2           static void free_option_strings(ev_mariadb_t *self) {
1889 2           FREE_STR(self->charset);
1890 2           FREE_STR(self->init_command);
1891 2           FREE_STR(self->ssl_key);
1892 2           FREE_STR(self->ssl_cert);
1893 2           FREE_STR(self->ssl_ca);
1894 2           FREE_STR(self->ssl_capath);
1895 2           FREE_STR(self->ssl_cipher);
1896 2           }
1897              
1898 0           static void apply_options(ev_mariadb_t *self) {
1899 0           MYSQL *conn = self->conn;
1900 0           unsigned long flags = self->client_flags;
1901              
1902 0 0         if (self->connect_timeout > 0)
1903 0           mysql_options(conn, MYSQL_OPT_CONNECT_TIMEOUT, &self->connect_timeout);
1904 0 0         if (self->read_timeout > 0)
1905 0           mysql_options(conn, MYSQL_OPT_READ_TIMEOUT, &self->read_timeout);
1906 0 0         if (self->write_timeout > 0)
1907 0           mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &self->write_timeout);
1908 0 0         if (self->compress)
1909 0           mysql_options(conn, MYSQL_OPT_COMPRESS, NULL);
1910 0 0         if (self->charset)
1911 0           mysql_options(conn, MYSQL_SET_CHARSET_NAME, self->charset);
1912 0 0         if (self->init_command)
1913 0           mysql_options(conn, MYSQL_INIT_COMMAND, self->init_command);
1914 0 0         if (self->ssl_ca || self->ssl_capath || self->ssl_cert || self->ssl_key || self->ssl_cipher)
    0          
    0          
    0          
    0          
1915 0           mysql_ssl_set(conn, self->ssl_key, self->ssl_cert, self->ssl_ca, self->ssl_capath, self->ssl_cipher);
1916 0 0         if (self->ssl_verify_server_cert) {
1917 0           my_bool val = 1;
1918 0           mysql_options(conn, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, &val);
1919             }
1920 0 0         if (self->multi_statements)
1921 0           flags |= CLIENT_MULTI_STATEMENTS | CLIENT_MULTI_RESULTS;
1922 0 0         if (self->found_rows)
1923 0           flags |= CLIENT_FOUND_ROWS;
1924              
1925 0           self->client_flags = flags;
1926 0           }
1927              
1928 0           static int is_utf8_charset(unsigned int charsetnr) {
1929             /* utf8mb3 collations: 33, 83, 192-211 */
1930             /* utf8mb4 collations: 45, 46, 224-247, 255, 256-309 */
1931             /* MariaDB 10.10+ uca1400: utf8mb3 2048-2303, utf8mb4 2304-2559 */
1932             /* (2560+ are ucs2/utf16/utf32 uca1400 — NOT UTF-8) */
1933 0 0         return charsetnr == 33 || charsetnr == 83
1934 0 0         || (charsetnr >= 192 && charsetnr <= 211)
    0          
1935 0 0         || charsetnr == 45 || charsetnr == 46
    0          
1936 0 0         || (charsetnr >= 224 && charsetnr <= 247)
    0          
1937 0 0         || charsetnr == 255
1938 0 0         || (charsetnr >= 256 && charsetnr <= 309)
    0          
1939 0 0         || (charsetnr >= 2048 && charsetnr <= 2559);
    0          
    0          
1940             }
1941              
1942 0           static int conn_charset_is_utf8(MYSQL *conn) {
1943             const char *cs;
1944 0 0         if (!conn) return 0;
1945 0           cs = mysql_character_set_name(conn);
1946 0 0         return (cs && strncmp(cs, "utf8", 4) == 0);
    0          
1947             }
1948              
1949 0           static AV* build_field_names(MYSQL *conn, MYSQL_FIELD *fields, unsigned int ncols) {
1950 0           AV *fnames = newAV();
1951             unsigned int i;
1952 0           int utf8_names = conn_charset_is_utf8(conn);
1953 0 0         if (ncols > 0) av_extend(fnames, ncols - 1);
1954 0 0         for (i = 0; i < ncols; i++) {
1955 0           SV *name = newSVpvn(fields[i].name, fields[i].name_length);
1956 0 0         if (utf8_names)
1957 0           SvUTF8_on(name);
1958 0           av_push(fnames, name);
1959             }
1960 0           return fnames;
1961             }
1962              
1963 0           static void setup_bind_params(ev_mariadb_stmt_t *ctx, AV *params) {
1964 0           int nparams = (int)(av_len(params) + 1);
1965             MYSQL_BIND *bp;
1966             int i;
1967              
1968             {
1969 0           unsigned long expected = mysql_stmt_param_count(ctx->stmt);
1970 0 0         if ((unsigned long)nparams != expected)
1971 0           croak("parameter count mismatch: got %d, expected %lu", nparams, expected);
1972             }
1973              
1974 0 0         if (nparams == 0) return;
1975              
1976 0           free_stmt_bind_params(ctx);
1977              
1978 0           Newxz(bp, nparams, MYSQL_BIND);
1979 0           ctx->bind_params = bp;
1980 0           ctx->bind_param_count = nparams;
1981              
1982 0 0         for (i = 0; i < nparams; i++) {
1983 0           SV **svp = av_fetch(params, i, 0);
1984 0 0         if (svp && SvOK(*svp)) {
    0          
1985 0 0         if (SvIOK(*svp)) {
1986             long long *val;
1987 0           Newx(val, 1, long long);
1988 0 0         if (SvIsUV(*svp)) {
1989 0           *val = (long long)(unsigned long long)SvUV(*svp);
1990 0           bp[i].is_unsigned = 1;
1991             } else {
1992 0           *val = (long long)SvIV(*svp);
1993 0           bp[i].is_unsigned = 0;
1994             }
1995 0           bp[i].buffer_type = MYSQL_TYPE_LONGLONG;
1996 0           bp[i].buffer = (void *)val;
1997 0 0         } else if (SvNOK(*svp)) {
1998             double *val;
1999 0           Newx(val, 1, double);
2000 0           *val = (double)SvNV(*svp);
2001 0           bp[i].buffer_type = MYSQL_TYPE_DOUBLE;
2002 0           bp[i].buffer = (void *)val;
2003             } else {
2004             STRLEN len;
2005 0           const char *s = SvPV(*svp, len);
2006             char *copy;
2007 0           Newx(copy, len + 1, char);
2008 0           Copy(s, copy, len, char);
2009 0           copy[len] = '\0';
2010 0           bp[i].buffer_type = MYSQL_TYPE_STRING;
2011 0           bp[i].buffer = (void *)copy;
2012 0           bp[i].buffer_length = (unsigned long)len;
2013 0           bp[i].length = &bp[i].buffer_length;
2014             }
2015             } else {
2016 0           bp[i].buffer_type = MYSQL_TYPE_NULL;
2017             }
2018             }
2019              
2020 0 0         if (mysql_stmt_bind_param(ctx->stmt, bp)) {
2021             char errbuf[512];
2022 0           COPY_ERROR(errbuf, mysql_stmt_error(ctx->stmt));
2023 0           free_stmt_bind_params(ctx);
2024 0           croak("%s", errbuf);
2025             }
2026             }
2027              
2028 0           static void start_connect(ev_mariadb_t *self) {
2029             int status;
2030              
2031 0           self->conn = mysql_init(NULL);
2032 0 0         if (NULL == self->conn) {
2033 0           croak("mysql_init failed");
2034             }
2035              
2036 0           self->connect_pid = getpid();
2037 0           mysql_options(self->conn, MYSQL_OPT_NONBLOCK, 0);
2038 0           apply_options(self);
2039              
2040 0           self->state = STATE_CONNECTING;
2041 0           status = mysql_real_connect_start(&self->op_conn_ret, self->conn,
2042 0           self->host, self->user, self->password, self->database,
2043 0           self->port, self->unix_socket, self->client_flags);
2044              
2045 0 0         if (status == 0) {
2046 0           on_connect_done(self);
2047             } else {
2048 0           self->fd = mysql_get_socket(self->conn);
2049 0 0         if (self->fd < 0) {
2050 0           mysql_close(self->conn);
2051 0           self->conn = NULL;
2052 0           self->state = STATE_IDLE;
2053 0           croak("mysql_get_socket returned invalid fd");
2054             }
2055              
2056 0           init_io_watchers(self);
2057              
2058 0           update_watchers(self, status);
2059             }
2060 0           }
2061              
2062             MODULE = EV::MariaDB PACKAGE = EV::MariaDB
2063              
2064             BOOT:
2065             {
2066 15 50         I_EV_API("EV::MariaDB");
    50          
    50          
2067 15           call_atexit(drain_freelists, NULL);
2068             }
2069              
2070             EV::MariaDB
2071             _new(char *class, EV::Loop loop)
2072             CODE:
2073             {
2074             PERL_UNUSED_VAR(class);
2075 2           Newxz(RETVAL, 1, ev_mariadb_t);
2076 2           RETVAL->magic = EV_MARIADB_MAGIC;
2077 2           RETVAL->loop = loop;
2078 2           RETVAL->fd = -1;
2079 2           RETVAL->state = STATE_IDLE;
2080 2           ngx_queue_init(&RETVAL->cb_queue);
2081 2           ngx_queue_init(&RETVAL->send_queue);
2082              
2083 2           ev_init(&RETVAL->timer, timer_cb);
2084 2           RETVAL->timer.data = (void *)RETVAL;
2085             }
2086             OUTPUT:
2087             RETVAL
2088              
2089             void
2090             DESTROY(EV::MariaDB self)
2091             CODE:
2092             {
2093 2 50         if (self->magic != EV_MARIADB_MAGIC)
2094 0           return;
2095              
2096 2           self->magic = EV_MARIADB_FREED;
2097              
2098 2           stop_reading(self);
2099 2           stop_writing(self);
2100 2           stop_timer(self);
2101              
2102 2 50         if (PL_dirty) {
2103             /* global destruction — free C resources only; skip SvREFCNT_dec
2104             to avoid cascading destructors in torn-down interpreter */
2105 0 0         int is_fork = IS_FORKED(self);
    0          
2106 0 0         while (!ngx_queue_empty(&self->send_queue)) {
2107 0           ngx_queue_t *q = ngx_queue_head(&self->send_queue);
2108 0           ev_mariadb_send_t *s = ngx_queue_data(q, ev_mariadb_send_t, queue);
2109 0           ngx_queue_remove(q);
2110 0           Safefree(s->sql);
2111 0           Safefree(s);
2112             }
2113 0 0         while (!ngx_queue_empty(&self->cb_queue)) {
2114 0           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
2115 0           ev_mariadb_cb_t *cbt = ngx_queue_data(q, ev_mariadb_cb_t, queue);
2116 0           ngx_queue_remove(q);
2117 0           Safefree(cbt);
2118             }
2119 0 0         if (self->op_data_ptr) Safefree(self->op_data_ptr);
2120 0           Safefree(self->pending_user);
2121 0           Safefree(self->pending_password);
2122 0           Safefree(self->pending_database);
2123 0           Safefree(self->pending_charset);
2124 0           self->stream_cb = NULL;
2125             {
2126 0           ev_mariadb_stmt_t *ctx = self->stmt_list;
2127 0 0         while (ctx) {
2128 0           ev_mariadb_stmt_t *next = ctx->next;
2129 0           free_stmt_bind_params(ctx);
2130 0 0         if (!is_fork && ctx->stmt) {
    0          
2131 0 0         if (ctx->stmt == self->op_stmt) self->op_stmt = NULL;
2132 0           mysql_stmt_close(ctx->stmt);
2133             }
2134 0           Safefree(ctx);
2135 0           ctx = next;
2136             }
2137             }
2138 0 0         if (!is_fork) {
2139 0 0         if (self->op_result) mysql_free_result(self->op_result);
2140 0 0         if (self->conn) mysql_close(self->conn);
2141             }
2142 0           free_connect_strings(self);
2143 0           free_option_strings(self);
2144 0           Safefree(self);
2145 0           return;
2146             }
2147              
2148 2           cancel_pending(self, "object destroyed");
2149              
2150             /* safety net: callbacks during cancel_pending could re-queue entries */
2151 2           drain_queues_silent(self);
2152              
2153 2 50         if (self->op_data_ptr) {
2154 0           Safefree(self->op_data_ptr);
2155 0           self->op_data_ptr = NULL;
2156             }
2157              
2158 2           FREE_STR(self->pending_user);
2159 2           FREE_STR(self->pending_password);
2160 2           FREE_STR(self->pending_database);
2161 2           FREE_STR(self->pending_charset);
2162              
2163             /* Free all tracked stmt wrappers and connection resources */
2164             {
2165 2 50         int is_fork = IS_FORKED(self);
    0          
2166 2           ev_mariadb_stmt_t *ctx = self->stmt_list;
2167 2 50         while (ctx) {
2168 0           ev_mariadb_stmt_t *next = ctx->next;
2169 0           free_stmt_bind_params(ctx);
2170 0 0         if (!is_fork && ctx->stmt) {
    0          
2171             /* op_stmt may equal a tracked stmt (e.g. STATE_STMT_RESET,
2172             STATE_STMT_STORE, STATE_STMT_SEND_LONG_DATA where op_stmt_ctx
2173             is NULL); avoid the double-close below. */
2174 0 0         if (ctx->stmt == self->op_stmt) self->op_stmt = NULL;
2175 0           mysql_stmt_close(ctx->stmt);
2176             }
2177 0           Safefree(ctx);
2178 0           ctx = next;
2179             }
2180 2           self->stmt_list = NULL;
2181             /* Close op_stmt only if it was never linked into stmt_list (in-flight prepare). */
2182 2 50         if (!is_fork && self->op_stmt && !self->op_stmt_ctx) {
    50          
    0          
2183 0           mysql_stmt_close(self->op_stmt);
2184             }
2185 2           self->op_stmt = NULL;
2186 2           self->op_stmt_ctx = NULL;
2187              
2188 2 50         if (is_fork) {
2189 0           self->op_result = NULL;
2190 0           self->conn = NULL;
2191             } else {
2192 2 50         if (self->op_result) {
2193 0           mysql_free_result(self->op_result);
2194 0           self->op_result = NULL;
2195             }
2196             {
2197 2           MYSQL *conn = self->conn;
2198 2           self->conn = NULL;
2199 2 50         if (conn) mysql_close(conn);
2200             }
2201             }
2202             }
2203              
2204             /* Drop handler SVs before zeroing loop/fd so any re-entrant destructors
2205             see NULL handler fields (and a still-valid loop while unwinding). */
2206             {
2207             SV *cb;
2208 2 50         if ((cb = self->on_connect)) { self->on_connect = NULL; SvREFCNT_dec(cb); }
2209 2 50         if ((cb = self->on_error)) { self->on_error = NULL; SvREFCNT_dec(cb); }
2210             }
2211              
2212 2           self->loop = NULL;
2213 2           self->fd = -1;
2214              
2215 2           free_connect_strings(self);
2216 2           free_option_strings(self);
2217              
2218             /* deferred free: check_destroyed will Safefree when depth hits 0 */
2219 2 50         if (self->callback_depth == 0)
2220 2           Safefree(self);
2221             }
2222              
2223             void
2224             connect(EV::MariaDB self, const char *host, const char *user, const char *password, const char *database, unsigned int port = 3306, SV *unix_socket_sv = NULL)
2225             CODE:
2226             {
2227 0 0         if (NULL != self->conn) {
2228 0 0         croak(self->state == STATE_CONNECTING
2229             ? "connection already in progress"
2230             : "already connected");
2231             }
2232              
2233 0           free_connect_strings(self);
2234              
2235 0           self->host = safe_strdup(host);
2236 0           self->user = safe_strdup(user);
2237 0           self->password = safe_strdup(password);
2238 0           self->database = safe_strdup(database);
2239 0           self->port = port;
2240 0           self->unix_socket = NULL;
2241              
2242 0 0         if (unix_socket_sv && SvOK(unix_socket_sv)) {
    0          
2243 0           self->unix_socket = safe_strdup(SvPV_nolen(unix_socket_sv));
2244             }
2245              
2246 0           start_connect(self);
2247             }
2248              
2249             void
2250             reset(EV::MariaDB self)
2251             CODE:
2252             {
2253 0 0         if (NULL == self->host) {
2254 0           croak("no previous connection to reset");
2255             }
2256              
2257 0           cleanup_connection(self);
2258 0           cancel_pending(self, "connection reset");
2259 0 0         if (check_destroyed(self)) return;
2260 0 0         if (self->state != STATE_IDLE) return;
2261              
2262 0           start_connect(self);
2263             }
2264              
2265             void
2266             finish(EV::MariaDB self)
2267             CODE:
2268             {
2269 0           cleanup_connection(self);
2270 0           cancel_pending(self, "connection finished");
2271 0 0         if (check_destroyed(self)) return;
2272             }
2273              
2274             SV*
2275             on_connect(EV::MariaDB self, SV *handler = NULL)
2276             CODE:
2277             {
2278 0           RETVAL = handler_accessor(&self->on_connect, handler, items > 1);
2279             }
2280             OUTPUT:
2281             RETVAL
2282              
2283             SV*
2284             on_error(EV::MariaDB self, SV *handler = NULL)
2285             CODE:
2286             {
2287 2           RETVAL = handler_accessor(&self->on_error, handler, items > 1);
2288             }
2289             OUTPUT:
2290             RETVAL
2291              
2292             void
2293             query(EV::MariaDB self, SV *sql_sv, SV *cb)
2294             CODE:
2295             {
2296             STRLEN sql_len;
2297             const char *sql;
2298              
2299 0 0         if (NULL == self->conn) {
2300 0           croak("not connected");
2301             }
2302 0 0         if (IS_FORKED(self)) {
    0          
2303 0           croak("connection not valid after fork");
2304             }
2305 0 0         if (self->state >= STATE_STMT_PREPARE) {
2306 0           croak("cannot queue query: exclusive operation in progress");
2307             }
2308 0 0         if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV)) {
    0          
2309 0           croak("callback must be a CODE reference");
2310             }
2311              
2312 0           sql = SvPV(sql_sv, sql_len);
2313 0           push_send(self, sql, sql_len, cb);
2314              
2315 0 0         if (self->state == STATE_IDLE && self->callback_depth == 0)
    0          
2316 0           pipeline_advance(self);
2317             }
2318              
2319             void
2320             prepare(EV::MariaDB self, SV *sql_sv, SV *cb)
2321             PREINIT:
2322             STRLEN sql_len;
2323             const char *sql;
2324             CODE:
2325             {
2326             int status;
2327             MYSQL_STMT *stmt;
2328              
2329 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2330              
2331 0           stmt = mysql_stmt_init(self->conn);
2332 0 0         if (NULL == stmt) {
2333 0           croak("mysql_stmt_init failed");
2334             }
2335             {
2336 0           my_bool update_max = 1;
2337 0           mysql_stmt_attr_set(stmt, STMT_ATTR_UPDATE_MAX_LENGTH, &update_max);
2338             }
2339              
2340 0           sql = SvPV(sql_sv, sql_len);
2341 0           push_cb(self, cb);
2342 0           self->op_stmt = stmt;
2343              
2344 0           self->state = STATE_STMT_PREPARE;
2345 0           status = mysql_stmt_prepare_start(&self->op_ret, stmt, sql, (unsigned long)sql_len);
2346 0 0         if (status == 0) {
2347 0           on_stmt_prepare_done(self);
2348             } else {
2349 0           update_watchers(self, status);
2350             }
2351             }
2352              
2353             void
2354             execute(EV::MariaDB self, IV stmt_iv, SV *params_ref, SV *cb)
2355             PREINIT:
2356             ev_mariadb_stmt_t *ctx;
2357             int status;
2358             CODE:
2359             {
2360 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2361              
2362 0           ctx = INT2PTR(ev_mariadb_stmt_t *, stmt_iv);
2363 0 0         CHECK_STMT(ctx);
    0          
2364              
2365 0 0         if (SvOK(params_ref)) {
2366 0 0         if (!SvROK(params_ref) || SvTYPE(SvRV(params_ref)) != SVt_PVAV)
    0          
2367 0           croak("params must be an ARRAY reference or undef");
2368 0           setup_bind_params(ctx, (AV *)SvRV(params_ref));
2369             }
2370              
2371 0           push_cb(self, cb);
2372 0           self->op_stmt = ctx->stmt;
2373 0           self->op_stmt_ctx = ctx;
2374              
2375 0           self->state = STATE_STMT_EXECUTE;
2376 0           status = mysql_stmt_execute_start(&self->op_ret, ctx->stmt);
2377 0 0         if (status == 0) {
2378 0           transition_to_stmt_store(self);
2379             } else {
2380 0           update_watchers(self, status);
2381             }
2382             }
2383              
2384             void
2385             bind_params(EV::MariaDB self, IV stmt_iv, SV *params_ref)
2386             PREINIT:
2387             ev_mariadb_stmt_t *ctx;
2388             CODE:
2389             {
2390 0 0         if (NULL == self->conn || self->state == STATE_CONNECTING)
    0          
2391 0           croak("not connected");
2392 0 0         if (IS_FORKED(self))
    0          
2393 0           croak("connection not valid after fork");
2394 0 0         if (self->state != STATE_IDLE)
2395 0           croak("another operation is in progress");
2396 0 0         if (self->send_count > 0)
2397 0           croak("cannot bind while pipeline results are pending");
2398 0 0         if (!SvROK(params_ref) || SvTYPE(SvRV(params_ref)) != SVt_PVAV)
    0          
2399 0           croak("params must be an ARRAY reference");
2400              
2401 0           ctx = INT2PTR(ev_mariadb_stmt_t *, stmt_iv);
2402 0 0         CHECK_STMT(ctx);
    0          
2403 0           setup_bind_params(ctx, (AV *)SvRV(params_ref));
2404             }
2405              
2406             void
2407             close_stmt(EV::MariaDB self, IV stmt_iv, SV *cb)
2408             PREINIT:
2409             ev_mariadb_stmt_t *ctx;
2410             int status;
2411             CODE:
2412             {
2413 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2414              
2415 0           ctx = INT2PTR(ev_mariadb_stmt_t *, stmt_iv);
2416 0 0         if (!ctx) croak("invalid statement handle");
2417 0 0         if (ctx->closed) {
2418             /* already closed by cleanup_connection — just free the wrapper */
2419 0           unlink_stmt(self, ctx);
2420 0           Safefree(ctx);
2421 0           push_cb(self, cb);
2422 0 0         if (deliver_value(self, newSViv(1))) return;
2423 0           maybe_pipeline(self);
2424 0           return;
2425             }
2426 0           free_stmt_bind_params(ctx);
2427 0           push_cb(self, cb);
2428 0           self->op_stmt = ctx->stmt;
2429 0           self->op_stmt_ctx = ctx;
2430              
2431 0           self->state = STATE_STMT_CLOSE;
2432 0           status = mysql_stmt_close_start(&self->op_bool_ret, ctx->stmt);
2433 0 0         if (status == 0) {
2434 0           on_stmt_close_done(self);
2435             } else {
2436 0           update_watchers(self, status);
2437             }
2438             }
2439              
2440             void
2441             stmt_reset(EV::MariaDB self, IV stmt_iv, SV *cb)
2442             PREINIT:
2443             ev_mariadb_stmt_t *ctx;
2444             int status;
2445             CODE:
2446             {
2447 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2448              
2449 0           ctx = INT2PTR(ev_mariadb_stmt_t *, stmt_iv);
2450 0 0         CHECK_STMT(ctx);
    0          
2451 0           push_cb(self, cb);
2452 0           self->op_stmt = ctx->stmt;
2453              
2454 0           self->state = STATE_STMT_RESET;
2455 0           status = mysql_stmt_reset_start(&self->op_bool_ret, ctx->stmt);
2456 0 0         if (status == 0) {
2457 0           on_stmt_reset_done(self);
2458             } else {
2459 0           update_watchers(self, status);
2460             }
2461             }
2462              
2463             void
2464             ping(EV::MariaDB self, SV *cb)
2465             PREINIT:
2466             int status;
2467             CODE:
2468             {
2469 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2470              
2471 0           push_cb(self, cb);
2472              
2473 0           self->state = STATE_PING;
2474 0           status = mysql_ping_start(&self->op_ret, self->conn);
2475 0           dispatch_utility(self, status, self->op_ret != 0);
2476             }
2477              
2478             void
2479             change_user(EV::MariaDB self, const char *user, const char *password, SV *db_sv, SV *cb)
2480             PREINIT:
2481             int status;
2482             const char *db;
2483             CODE:
2484             {
2485 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2486              
2487 0 0         db = (SvOK(db_sv)) ? SvPV_nolen(db_sv) : NULL;
2488              
2489             /* Stash the new credentials; on_utility_done commits them on success or
2490             discards on failure so reset() never sees credentials we couldn't use.
2491             Pass the pending pointers to libmariadb so the strings live across the
2492             async start/cont sequence (Perl args expire when the XS body returns). */
2493 0           replace_str(&self->pending_user, user);
2494 0           replace_str(&self->pending_password, password);
2495 0 0         if (db) replace_str(&self->pending_database, db);
2496              
2497 0           push_cb(self, cb);
2498              
2499 0           self->state = STATE_CHANGE_USER;
2500 0           status = mysql_change_user_start(&self->op_bool_ret, self->conn,
2501 0           self->pending_user, self->pending_password, self->pending_database);
2502 0           dispatch_utility(self, status, self->op_bool_ret != 0);
2503             }
2504              
2505             void
2506             select_db(EV::MariaDB self, const char *db, SV *cb)
2507             PREINIT:
2508             int status;
2509             CODE:
2510             {
2511 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2512              
2513             /* Stash the new database; on_utility_done commits on success. */
2514 0           replace_str(&self->pending_database, db);
2515              
2516 0           push_cb(self, cb);
2517              
2518 0           self->state = STATE_SELECT_DB;
2519 0           status = mysql_select_db_start(&self->op_ret, self->conn, self->pending_database);
2520 0           dispatch_utility(self, status, self->op_ret != 0);
2521             }
2522              
2523             void
2524             reset_connection(EV::MariaDB self, SV *cb)
2525             PREINIT:
2526             int status;
2527             CODE:
2528             {
2529 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2530              
2531 0           push_cb(self, cb);
2532              
2533 0           self->state = STATE_RESET_CONNECTION;
2534 0           status = mysql_reset_connection_start(&self->op_ret, self->conn);
2535 0           dispatch_utility(self, status, self->op_ret != 0);
2536             }
2537              
2538             void
2539             set_charset(EV::MariaDB self, const char *charset, SV *cb)
2540             PREINIT:
2541             int status;
2542             CODE:
2543             {
2544 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2545              
2546             /* Stash the new charset; on_utility_done commits on success. */
2547 0           replace_str(&self->pending_charset, charset);
2548              
2549 0           push_cb(self, cb);
2550              
2551 0           self->state = STATE_SET_CHARSET;
2552 0           status = mysql_set_character_set_start(&self->op_ret, self->conn, self->pending_charset);
2553 0           dispatch_utility(self, status, self->op_ret != 0);
2554             }
2555              
2556             void
2557             commit(EV::MariaDB self, SV *cb)
2558             PREINIT:
2559             int status;
2560             CODE:
2561             {
2562 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2563              
2564 0           push_cb(self, cb);
2565              
2566 0           self->state = STATE_COMMIT;
2567 0           status = mysql_commit_start(&self->op_bool_ret, self->conn);
2568 0           dispatch_utility(self, status, self->op_bool_ret != 0);
2569             }
2570              
2571             void
2572             rollback(EV::MariaDB self, SV *cb)
2573             PREINIT:
2574             int status;
2575             CODE:
2576             {
2577 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2578              
2579 0           push_cb(self, cb);
2580              
2581 0           self->state = STATE_ROLLBACK;
2582 0           status = mysql_rollback_start(&self->op_bool_ret, self->conn);
2583 0           dispatch_utility(self, status, self->op_bool_ret != 0);
2584             }
2585              
2586             void
2587             autocommit(EV::MariaDB self, int mode, SV *cb)
2588             PREINIT:
2589             int status;
2590             CODE:
2591             {
2592 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2593              
2594 0           push_cb(self, cb);
2595              
2596 0           self->state = STATE_AUTOCOMMIT;
2597 0           status = mysql_autocommit_start(&self->op_bool_ret, self->conn, (my_bool)(mode ? 1 : 0));
2598 0           dispatch_utility(self, status, self->op_bool_ret != 0);
2599             }
2600              
2601             void
2602             query_stream(EV::MariaDB self, SV *sql_sv, SV *cb)
2603             PREINIT:
2604             STRLEN sql_len;
2605             const char *sql;
2606             int status;
2607             CODE:
2608             {
2609 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2610              
2611 0           sql = SvPV(sql_sv, sql_len);
2612 0           self->stream_cb = SvREFCNT_inc(cb);
2613 0           self->pending_count++;
2614              
2615 0           self->state = STATE_REAL_QUERY;
2616 0           status = mysql_real_query_start(&self->op_ret, self->conn, sql, (unsigned long)sql_len);
2617 0 0         if (status == 0) {
2618 0           on_real_query_done(self);
2619             } else {
2620 0           update_watchers(self, status);
2621             }
2622             }
2623              
2624             void
2625             close_async(EV::MariaDB self, SV *cb)
2626             PREINIT:
2627             int status;
2628             CODE:
2629             {
2630 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2631              
2632 0           push_cb(self, cb);
2633              
2634 0           self->state = STATE_CLOSE;
2635 0           status = mysql_close_start(self->conn);
2636 0 0         if (status == 0) {
2637 0           on_close_done(self);
2638             } else {
2639 0           update_watchers(self, status);
2640             }
2641             }
2642              
2643             void
2644             send_long_data(EV::MariaDB self, IV stmt_iv, unsigned int param_idx, SV *data_sv, SV *cb)
2645             PREINIT:
2646             ev_mariadb_stmt_t *ctx;
2647             STRLEN data_len;
2648             char *data;
2649             int status;
2650             CODE:
2651             {
2652 0 0         CHECK_READY(self, cb);
    0          
    0          
    0          
    0          
    0          
    0          
    0          
2653              
2654 0           ctx = INT2PTR(ev_mariadb_stmt_t *, stmt_iv);
2655 0 0         CHECK_STMT(ctx);
    0          
2656             {
2657 0           const char *src = SvPV(data_sv, data_len);
2658 0 0         Newx(data, data_len > 0 ? data_len : 1, char);
2659 0           Copy(src, data, data_len, char);
2660             }
2661 0           push_cb(self, cb);
2662 0           self->op_stmt = ctx->stmt;
2663 0           self->op_data_ptr = data;
2664              
2665 0           self->state = STATE_STMT_SEND_LONG_DATA;
2666 0           status = mysql_stmt_send_long_data_start(&self->op_bool_ret, ctx->stmt,
2667             param_idx, data, (unsigned long)data_len);
2668 0 0         if (status == 0) {
2669 0           on_send_long_data_done(self);
2670             } else {
2671 0           update_watchers(self, status);
2672             }
2673             }
2674              
2675             void
2676             _set_option(EV::MariaDB self, const char *key, SV *value)
2677             CODE:
2678             {
2679 0 0         if (strcmp(key, "connect_timeout") == 0) {
2680 0           self->connect_timeout = SvUV(value);
2681 0 0         } else if (strcmp(key, "read_timeout") == 0) {
2682 0           self->read_timeout = SvUV(value);
2683 0 0         } else if (strcmp(key, "write_timeout") == 0) {
2684 0           self->write_timeout = SvUV(value);
2685 0 0         } else if (strcmp(key, "compress") == 0) {
2686 0           self->compress = SvTRUE(value) ? 1 : 0;
2687 0 0         } else if (strcmp(key, "multi_statements") == 0) {
2688 0           self->multi_statements = SvTRUE(value) ? 1 : 0;
2689 0 0         } else if (strcmp(key, "found_rows") == 0) {
2690 0           self->found_rows = SvTRUE(value) ? 1 : 0;
2691 0 0         } else if (strcmp(key, "charset") == 0) {
2692 0 0         SET_STR_OPTION(charset);
    0          
2693 0 0         } else if (strcmp(key, "init_command") == 0) {
2694 0 0         SET_STR_OPTION(init_command);
    0          
2695 0 0         } else if (strcmp(key, "ssl_key") == 0) {
2696 0 0         SET_STR_OPTION(ssl_key);
    0          
2697 0 0         } else if (strcmp(key, "ssl_cert") == 0) {
2698 0 0         SET_STR_OPTION(ssl_cert);
    0          
2699 0 0         } else if (strcmp(key, "ssl_ca") == 0) {
2700 0 0         SET_STR_OPTION(ssl_ca);
    0          
2701 0 0         } else if (strcmp(key, "ssl_capath") == 0) {
2702 0 0         SET_STR_OPTION(ssl_capath);
    0          
2703 0 0         } else if (strcmp(key, "ssl_cipher") == 0) {
2704 0 0         SET_STR_OPTION(ssl_cipher);
    0          
2705 0 0         } else if (strcmp(key, "ssl_verify_server_cert") == 0) {
2706 0           self->ssl_verify_server_cert = SvTRUE(value) ? 1 : 0;
2707 0 0         } else if (strcmp(key, "utf8") == 0) {
2708 0           self->utf8 = SvTRUE(value) ? 1 : 0;
2709             } else {
2710 0           croak("unknown option: %s", key);
2711             }
2712             }
2713              
2714             int
2715             is_connected(EV::MariaDB self)
2716             CODE:
2717             {
2718 1 50         RETVAL = (NULL != self->conn && self->state != STATE_CONNECTING) ? 1 : 0;
    0          
    50          
2719             }
2720             OUTPUT:
2721             RETVAL
2722              
2723             SV*
2724             error_message(EV::MariaDB self)
2725             CODE:
2726             {
2727 0 0         RETVAL = self->conn ? str_or_undef(mysql_error(self->conn)) : &PL_sv_undef;
2728             }
2729             OUTPUT:
2730             RETVAL
2731              
2732             unsigned int
2733             error_number(EV::MariaDB self)
2734             CODE:
2735             {
2736 0 0         RETVAL = (NULL != self->conn) ? mysql_errno(self->conn) : 0;
    0          
2737             }
2738             OUTPUT:
2739             RETVAL
2740              
2741             SV*
2742             sqlstate(EV::MariaDB self)
2743             CODE:
2744             {
2745 0 0         RETVAL = self->conn ? str_or_undef(mysql_sqlstate(self->conn)) : &PL_sv_undef;
2746             }
2747             OUTPUT:
2748             RETVAL
2749              
2750             SV*
2751             insert_id(EV::MariaDB self)
2752             CODE:
2753             {
2754 0 0         if (NULL != self->conn) {
2755 0           my_ulonglong id = mysql_insert_id(self->conn);
2756 0           RETVAL = newSVuv((UV)id);
2757             }
2758             else {
2759 0           RETVAL = &PL_sv_undef;
2760             }
2761             }
2762             OUTPUT:
2763             RETVAL
2764              
2765             unsigned int
2766             warning_count(EV::MariaDB self)
2767             CODE:
2768             {
2769 0 0         RETVAL = (NULL != self->conn) ? mysql_warning_count(self->conn) : 0;
    0          
2770             }
2771             OUTPUT:
2772             RETVAL
2773              
2774             SV*
2775             info(EV::MariaDB self)
2776             CODE:
2777             {
2778 0 0         RETVAL = self->conn ? str_or_undef(mysql_info(self->conn)) : &PL_sv_undef;
2779             }
2780             OUTPUT:
2781             RETVAL
2782              
2783             UV
2784             server_version(EV::MariaDB self)
2785             CODE:
2786             {
2787 0 0         RETVAL = (NULL != self->conn) ? (UV)mysql_get_server_version(self->conn) : 0;
    0          
2788             }
2789             OUTPUT:
2790             RETVAL
2791              
2792             SV*
2793             server_info(EV::MariaDB self)
2794             CODE:
2795             {
2796 0 0         RETVAL = self->conn ? str_or_undef(mysql_get_server_info(self->conn)) : &PL_sv_undef;
2797             }
2798             OUTPUT:
2799             RETVAL
2800              
2801             UV
2802             thread_id(EV::MariaDB self)
2803             CODE:
2804             {
2805 0 0         RETVAL = (NULL != self->conn) ? (UV)mysql_thread_id(self->conn) : 0;
    0          
2806             }
2807             OUTPUT:
2808             RETVAL
2809              
2810             SV*
2811             host_info(EV::MariaDB self)
2812             CODE:
2813             {
2814 0 0         RETVAL = self->conn ? str_or_undef(mysql_get_host_info(self->conn)) : &PL_sv_undef;
2815             }
2816             OUTPUT:
2817             RETVAL
2818              
2819             SV*
2820             character_set_name(EV::MariaDB self)
2821             CODE:
2822             {
2823 0 0         RETVAL = self->conn ? str_or_undef(mysql_character_set_name(self->conn)) : &PL_sv_undef;
2824             }
2825             OUTPUT:
2826             RETVAL
2827              
2828             int
2829             socket(EV::MariaDB self)
2830             CODE:
2831             {
2832 0 0         RETVAL = (NULL != self->conn) ? (int)mysql_get_socket(self->conn) : -1;
    0          
2833             }
2834             OUTPUT:
2835             RETVAL
2836              
2837             SV*
2838             affected_rows(EV::MariaDB self)
2839             CODE:
2840             {
2841 0 0         if (NULL != self->conn) {
2842 0           my_ulonglong rows = mysql_affected_rows(self->conn);
2843 0 0         RETVAL = (rows == (my_ulonglong)-1) ? &PL_sv_undef : newSVuv((UV)rows);
2844             }
2845             else {
2846 0           RETVAL = &PL_sv_undef;
2847             }
2848             }
2849             OUTPUT:
2850             RETVAL
2851              
2852             SV*
2853             escape(EV::MariaDB self, SV *str)
2854             PREINIT:
2855             STRLEN len;
2856             const char *s;
2857             unsigned long elen;
2858             CODE:
2859             {
2860 0 0         if (NULL == self->conn || self->state == STATE_CONNECTING || IS_FORKED(self))
    0          
    0          
    0          
2861 0           croak("not connected");
2862 0 0         if (self->state == STATE_CLOSE)
2863 0           croak("connection is closing");
2864 0           s = SvPV(str, len);
2865 0 0         if (SvUTF8(str)) {
2866 0           const char *cs = mysql_character_set_name(self->conn);
2867 0 0         if (!cs || (strncmp(cs, "utf8", 4) != 0)) {
    0          
2868 0 0         warn("EV::MariaDB: escaping a UTF-8 string on a non-utf8 connection (%s) may cause corruption or injection vulnerabilities", cs ? cs : "unknown");
2869             }
2870             }
2871 0 0         if (len > (STRLEN)((~(STRLEN)0) / 2 - 1))
2872 0           croak("string too large to escape");
2873 0           RETVAL = newSV(len * 2 + 1);
2874 0           SvPOK_on(RETVAL);
2875 0           elen = mysql_real_escape_string(self->conn, SvPVX(RETVAL), s, (unsigned long)len);
2876 0 0         if (elen == (unsigned long)-1) {
2877 0           SvREFCNT_dec(RETVAL);
2878 0           croak("mysql_real_escape_string failed");
2879             }
2880 0           SvCUR_set(RETVAL, elen);
2881 0           *SvEND(RETVAL) = '\0';
2882             }
2883             OUTPUT:
2884             RETVAL
2885              
2886             int
2887             pending_count(EV::MariaDB self)
2888             CODE:
2889             {
2890 0 0         RETVAL = self->pending_count;
2891             }
2892             OUTPUT:
2893             RETVAL
2894              
2895             void
2896             skip_pending(EV::MariaDB self)
2897             CODE:
2898             {
2899 0 0         if (self->state != STATE_IDLE || self->send_count > 0) {
    0          
2900 0           cleanup_connection(self);
2901             }
2902 0           cancel_pending(self, "skipped");
2903 0 0         if (check_destroyed(self)) return;
2904             }
2905              
2906             UV
2907             lib_version(char *class)
2908             CODE:
2909             {
2910             PERL_UNUSED_VAR(class);
2911 2           RETVAL = (UV)mysql_get_client_version();
2912             }
2913             OUTPUT:
2914             RETVAL
2915              
2916             SV*
2917             lib_info(char *class)
2918             CODE:
2919             {
2920             PERL_UNUSED_VAR(class);
2921 1           RETVAL = newSVpv(mysql_get_client_info(), 0);
2922             }
2923             OUTPUT:
2924             RETVAL