File Coverage

Memcached.xs
Criterion Covered Total %
statement 7 1244 0.5
branch 3 938 0.3
condition n/a
subroutine n/a
pod n/a
total 10 2182 0.4


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include "ppport.h"
5              
6             #include "EVAPI.h"
7             #include "ngx-queue.h"
8              
9             #include
10             #include
11             #include
12             #include
13             #include
14             #include
15             #include
16             #include
17             #include
18             #include
19              
20             /* ================================================================
21             * Constants
22             * ================================================================ */
23              
24             #define MC_MAGIC_ALIVE 0xCAFEBEEF
25             #define MC_MAGIC_FREED 0xDEADCAFE
26              
27             #define MC_REQ_MAGIC 0x80
28             #define MC_RES_MAGIC 0x81
29             #define MC_HEADER_SIZE 24
30              
31             /* Opcodes */
32             #define MC_OP_GET 0x00
33             #define MC_OP_SET 0x01
34             #define MC_OP_ADD 0x02
35             #define MC_OP_REPLACE 0x03
36             #define MC_OP_DELETE 0x04
37             #define MC_OP_INCR 0x05
38             #define MC_OP_DECR 0x06
39             #define MC_OP_QUIT 0x07
40             #define MC_OP_FLUSH 0x08
41             #define MC_OP_NOOP 0x0A
42             #define MC_OP_VERSION 0x0B
43             #define MC_OP_GETKQ 0x0D
44             #define MC_OP_APPEND 0x0E
45             #define MC_OP_PREPEND 0x0F
46             #define MC_OP_STAT 0x10
47             #define MC_OP_SETQ 0x11
48             #define MC_OP_FLUSHQ 0x18
49             #define MC_OP_TOUCH 0x1C
50             #define MC_OP_GAT 0x1D
51             #define MC_OP_SASL_LIST_MECHS 0x20
52             #define MC_OP_SASL_AUTH 0x21
53             #define MC_OP_SASL_STEP 0x22
54              
55             /* Status codes */
56             #define MC_STATUS_OK 0x0000
57             #define MC_STATUS_KEY_NOT_FOUND 0x0001
58             #define MC_STATUS_KEY_EXISTS 0x0002
59             #define MC_STATUS_VALUE_TOO_LARGE 0x0003
60             #define MC_STATUS_INVALID_ARGS 0x0004
61             #define MC_STATUS_NOT_STORED 0x0005
62             #define MC_STATUS_NON_NUMERIC 0x0006
63             #define MC_STATUS_AUTH_ERROR 0x0020
64             #define MC_STATUS_AUTH_CONTINUE 0x0021
65             #define MC_STATUS_UNKNOWN_CMD 0x0081
66             #define MC_STATUS_OUT_OF_MEMORY 0x0082
67              
68             #define BUF_INIT_SIZE 16384
69             #define MC_MAX_KEY_LEN 250
70              
71             /* Callback command types */
72             #define CB_CMD_GET 0 /* get - return value only */
73             #define CB_CMD_GETS 1 /* gets - return {value, flags, cas} */
74             #define CB_CMD_STORE 2 /* set/add/replace/append/prepend - return 1 */
75             #define CB_CMD_DELETE 3 /* delete - return 1 */
76             #define CB_CMD_ARITH 4 /* incr/decr - return new value */
77             #define CB_CMD_TOUCH 5 /* touch - return 1 */
78             #define CB_CMD_GAT 6 /* gat - return value */
79             #define CB_CMD_GATS 7 /* gats - return {value, flags, cas} */
80             #define CB_CMD_VERSION 8 /* version - return string */
81             #define CB_CMD_NOOP 9 /* noop - return 1 */
82             #define CB_CMD_FLUSH 10 /* flush - return 1 */
83             #define CB_CMD_STATS 11 /* stats - accumulate, return hash */
84             #define CB_CMD_MGET_ENTRY 12 /* individual GETKQ in mget */
85             #define CB_CMD_MGET_FENCE 13 /* NOOP fence in mget */
86             #define CB_CMD_QUIT 14 /* quit */
87             #define CB_CMD_MGETS_ENTRY 15 /* individual GETKQ in mgets (full info) */
88             #define CB_CMD_MGETS_FENCE 16 /* NOOP fence in mgets */
89             #define CB_CMD_SASL_LIST 17 /* sasl_list_mechs - return string */
90             #define CB_CMD_SASL_AUTH 18 /* sasl_auth - return 1 on success */
91              
92             #define CLEAR_HANDLER(field) \
93             do { if (NULL != (field)) { SvREFCNT_dec(field); (field) = NULL; } } while(0)
94              
95             #define MC_CROAK_UNLESS_CONNECTED(self) \
96             do { \
97             if (!(self)->connected && !(self)->connecting && \
98             !((self)->reconnect && (self)->reconnect_timer_active)) \
99             croak("not connected"); \
100             } while(0)
101              
102             /* ================================================================
103             * Type declarations
104             * ================================================================ */
105              
106             typedef struct ev_mc_s ev_mc_t;
107             typedef struct ev_mc_cb_s ev_mc_cb_t;
108             typedef struct ev_mc_wait_s ev_mc_wait_t;
109              
110             typedef ev_mc_t* EV__Memcached;
111             typedef struct ev_loop* EV__Loop;
112              
113             /* ================================================================
114             * Data structures
115             * ================================================================ */
116              
117             struct ev_mc_cb_s {
118             SV *cb; /* Perl callback (NULL for fire-and-forget / mget entries) */
119             ngx_queue_t queue;
120             uint32_t opaque;
121             int cmd; /* CB_CMD_* */
122             int quiet; /* 1 = quiet variant, may not get response */
123             int counted; /* 1 = contributes to pending_count */
124             int skipped;
125             HV *stats_hv; /* for CB_CMD_STATS: accumulated hash */
126             HV *mget_results; /* for CB_CMD_MGET_ENTRY: shared hash (borrowed) */
127             /* for CB_CMD_MGET_FENCE: owned hash */
128             };
129              
130             struct ev_mc_wait_s {
131             char *packet;
132             size_t packet_len;
133             SV *cb;
134             int cmd;
135             int quiet;
136             uint32_t opaque;
137             HV *stats_hv;
138             HV *mget_results; /* borrowed for MGET_ENTRY, owned for MGET_FENCE */
139             int counted;
140             int no_response; /* 1: fire-and-forget, drain to wbuf only, no cb_queue entry */
141             ngx_queue_t queue;
142             ev_tstamp queued_at;
143             };
144              
145             struct ev_mc_s {
146             unsigned int magic;
147             struct ev_loop *loop;
148             int fd;
149             int connected;
150             int connecting;
151              
152             /* IO watchers */
153             ev_io rio, wio;
154             int reading, writing;
155              
156             /* Buffers */
157             char *rbuf;
158             size_t rbuf_len, rbuf_cap;
159             char *wbuf;
160             size_t wbuf_len, wbuf_off, wbuf_cap;
161              
162             /* Callbacks */
163             SV *on_error;
164             SV *on_connect;
165             SV *on_disconnect;
166              
167             /* Command queue */
168             ngx_queue_t cb_queue;
169             ngx_queue_t wait_queue;
170             int pending_count;
171             int waiting_count;
172             int max_pending; /* 0 = unlimited */
173             uint32_t next_opaque;
174              
175             /* Reconnection */
176             char *host;
177             int port;
178             char *path;
179             int reconnect;
180             int reconnect_delay_ms;
181             int max_reconnect_attempts;
182             int reconnect_attempts;
183             ev_timer reconnect_timer;
184             int reconnect_timer_active;
185             int intentional_disconnect;
186             int resume_waiting_on_reconnect;
187              
188             /* Timeouts */
189             int connect_timeout_ms;
190             ev_timer connect_timer;
191             int connect_timer_active;
192             int command_timeout_ms;
193             ev_timer cmd_timer;
194             int cmd_timer_active;
195              
196             /* Flow control */
197             int waiting_timeout_ms;
198             ev_timer waiting_timer;
199             int waiting_timer_active;
200              
201             /* Safety */
202             int callback_depth;
203             int in_cb_cleanup;
204             int in_wait_cleanup;
205              
206             /* Options */
207             int priority;
208             int keepalive;
209              
210             /* SASL auth */
211             char *username;
212             char *password;
213             };
214              
215             /* ================================================================
216             * Shared error strings (initialized in BOOT)
217             * ================================================================ */
218              
219             static SV *err_skipped = NULL;
220             static SV *err_disconnected = NULL;
221             static SV *err_waiting_timeout = NULL;
222              
223             /* ================================================================
224             * Forward declarations
225             * ================================================================ */
226              
227             static void io_cb(EV_P_ ev_io *w, int revents);
228             static void reconnect_timer_cb(EV_P_ ev_timer *w, int revents);
229             static void waiting_timer_cb(EV_P_ ev_timer *w, int revents);
230             static void connect_timeout_cb(EV_P_ ev_timer *w, int revents);
231             static void cmd_timeout_cb(EV_P_ ev_timer *w, int revents);
232             static void arm_cmd_timer(ev_mc_t *self);
233             static void disarm_cmd_timer(ev_mc_t *self);
234             static uint32_t mc_enqueue_cmd(pTHX_ ev_mc_t *self,
235             uint8_t opcode, const char *key, STRLEN key_len,
236             const char *value, STRLEN value_len,
237             const char *extras, uint8_t extras_len,
238             uint64_t cas, int cmd, int quiet, SV *cb);
239             static void start_reading(ev_mc_t *self);
240             static void stop_reading(ev_mc_t *self);
241             static void start_writing(ev_mc_t *self);
242             static void stop_writing(ev_mc_t *self);
243             static void start_connect(pTHX_ ev_mc_t *self);
244             static void cleanup_connection(pTHX_ ev_mc_t *self);
245             static void emit_error(pTHX_ ev_mc_t *self, const char *msg);
246             static void handle_disconnect(pTHX_ ev_mc_t *self, const char *reason);
247             static void schedule_reconnect(pTHX_ ev_mc_t *self);
248             static void apply_keepalive(ev_mc_t *self);
249             static void report_connect_error(pTHX_ ev_mc_t *self, const char *errbuf);
250             static void finish_connect_success(pTHX_ ev_mc_t *self);
251             static void mc_send_sasl_auth(pTHX_ ev_mc_t *self, SV *cb);
252             static void stop_connect_timer(ev_mc_t *self);
253             static void stop_reconnect_timer(ev_mc_t *self);
254             static void stop_waiting_timer(ev_mc_t *self);
255             static void send_next_waiting(pTHX_ ev_mc_t *self);
256             static int check_destroyed(ev_mc_t *self);
257             static void cancel_pending(pTHX_ ev_mc_t *self, SV *err_sv);
258             static void cancel_waiting(pTHX_ ev_mc_t *self, SV *err_sv);
259              
260             /* ================================================================
261             * Binary protocol helpers (portable, no unaligned access)
262             * ================================================================ */
263              
264 0           static void mc_write_u16(char *buf, uint16_t val) {
265 0           val = htons(val);
266 0           memcpy(buf, &val, 2);
267 0           }
268              
269 0           static void mc_write_u32(char *buf, uint32_t val) {
270 0           val = htonl(val);
271 0           memcpy(buf, &val, 4);
272 0           }
273              
274 0           static void mc_write_u64(char *buf, uint64_t val) {
275 0           uint32_t hi = htonl((uint32_t)(val >> 32));
276 0           uint32_t lo = htonl((uint32_t)(val & 0xFFFFFFFF));
277 0           memcpy(buf, &hi, 4);
278 0           memcpy(buf + 4, &lo, 4);
279 0           }
280              
281 0           static uint16_t mc_read_u16(const char *buf) {
282             uint16_t val;
283 0           memcpy(&val, buf, 2);
284 0           return ntohs(val);
285             }
286              
287 0           static uint32_t mc_read_u32(const char *buf) {
288             uint32_t val;
289 0           memcpy(&val, buf, 4);
290 0           return ntohl(val);
291             }
292              
293 0           static uint64_t mc_read_u64(const char *buf) {
294             uint32_t hi, lo;
295 0           memcpy(&hi, buf, 4);
296 0           memcpy(&lo, buf + 4, 4);
297 0           return ((uint64_t)ntohl(hi) << 32) | ntohl(lo);
298             }
299              
300 0           static void mc_encode_header(char *buf, uint8_t opcode, uint16_t key_len,
301             uint8_t extras_len, uint32_t body_len, uint32_t opaque, uint64_t cas)
302             {
303 0           buf[0] = MC_REQ_MAGIC;
304 0           buf[1] = opcode;
305 0           mc_write_u16(buf + 2, key_len);
306 0           buf[4] = extras_len;
307 0           buf[5] = 0; /* data_type = raw */
308 0           mc_write_u16(buf + 6, 0); /* vbucket / reserved */
309 0           mc_write_u32(buf + 8, body_len);
310 0           mc_write_u32(buf + 12, opaque);
311 0           mc_write_u64(buf + 16, cas);
312 0           }
313              
314             /* ================================================================
315             * Status code to string
316             * ================================================================ */
317              
318 0           static const char* mc_status_str(uint16_t status) {
319 0           switch (status) {
320 0           case MC_STATUS_OK: return "OK";
321 0           case MC_STATUS_KEY_NOT_FOUND: return "NOT_FOUND";
322 0           case MC_STATUS_KEY_EXISTS: return "EXISTS";
323 0           case MC_STATUS_VALUE_TOO_LARGE: return "VALUE_TOO_LARGE";
324 0           case MC_STATUS_INVALID_ARGS: return "INVALID_ARGUMENTS";
325 0           case MC_STATUS_NOT_STORED: return "NOT_STORED";
326 0           case MC_STATUS_NON_NUMERIC: return "NON_NUMERIC_VALUE";
327 0           case MC_STATUS_AUTH_ERROR: return "AUTH_ERROR";
328 0           case MC_STATUS_AUTH_CONTINUE: return "AUTH_CONTINUE";
329 0           case MC_STATUS_UNKNOWN_CMD: return "UNKNOWN_COMMAND";
330 0           case MC_STATUS_OUT_OF_MEMORY: return "OUT_OF_MEMORY";
331 0           default: return "UNKNOWN_ERROR";
332             }
333             }
334              
335             /* ================================================================
336             * Buffer management
337             * ================================================================ */
338              
339 0           static void buf_ensure_write(ev_mc_t *self, size_t needed) {
340             /* Compact first: reclaim any already-sent prefix. The capacity check
341             below short-circuits the "compaction alone sufficed" case. */
342 0 0         if (self->wbuf_off > 0) {
343 0           size_t live = self->wbuf_len - self->wbuf_off;
344 0           memmove(self->wbuf, self->wbuf + self->wbuf_off, live);
345 0           self->wbuf_len = live;
346 0           self->wbuf_off = 0;
347             }
348 0           size_t total = self->wbuf_len + needed;
349 0 0         if (self->wbuf_cap >= total) return;
350 0 0         size_t new_cap = self->wbuf_cap ? self->wbuf_cap : BUF_INIT_SIZE;
351 0 0         while (new_cap < total) new_cap *= 2;
352 0           Renew(self->wbuf, new_cap, char);
353 0           self->wbuf_cap = new_cap;
354             }
355              
356 0           static void buf_ensure_read(ev_mc_t *self, size_t needed) {
357 0           size_t total = self->rbuf_len + needed;
358 0 0         if (self->rbuf_cap >= total) return;
359 0 0         size_t new_cap = self->rbuf_cap ? self->rbuf_cap : BUF_INIT_SIZE;
360 0 0         while (new_cap < total) new_cap *= 2;
361 0           Renew(self->rbuf, new_cap, char);
362 0           self->rbuf_cap = new_cap;
363             }
364              
365 0           static void buf_append_write(ev_mc_t *self, const char *data, size_t len) {
366 0           buf_ensure_write(self, len);
367 0           memcpy(self->wbuf + self->wbuf_len, data, len);
368 0           self->wbuf_len += len;
369 0           }
370              
371             /* ================================================================
372             * Watcher helpers
373             * ================================================================ */
374              
375 0           static void start_reading(ev_mc_t *self) {
376 0 0         if (!self->reading && self->fd >= 0) {
    0          
377 0           ev_io_start(self->loop, &self->rio);
378 0           self->reading = 1;
379             }
380 0           }
381              
382 0           static void stop_reading(ev_mc_t *self) {
383 0 0         if (self->reading) {
384 0           ev_io_stop(self->loop, &self->rio);
385 0           self->reading = 0;
386             }
387 0           }
388              
389 0           static void start_writing(ev_mc_t *self) {
390 0 0         if (!self->writing && self->fd >= 0) {
    0          
391 0           ev_io_start(self->loop, &self->wio);
392 0           self->writing = 1;
393             }
394 0           }
395              
396 0           static void stop_writing(ev_mc_t *self) {
397 0 0         if (self->writing) {
398 0           ev_io_stop(self->loop, &self->wio);
399 0           self->writing = 0;
400             }
401 0           }
402              
403 0           static int check_destroyed(ev_mc_t *self) {
404 0 0         if (self->magic == MC_MAGIC_FREED && self->callback_depth == 0) {
    0          
405 0           Safefree(self);
406 0           return 1;
407             }
408 0           return 0;
409             }
410              
411             /* ================================================================
412             * Callback invocation
413             * ================================================================ */
414              
415             /* Invoke Perl callback with two arguments.
416             * result and error are newly created SVs (refcount=1) that become mortal.
417             * Pass NULL for undef. */
418 0           static void invoke_cb(pTHX_ ev_mc_t *self, SV *cb, SV *result, SV *error) {
419 0 0         if (!cb) return;
420              
421 0           self->callback_depth++;
422              
423 0           dSP;
424 0           ENTER;
425 0           SAVETMPS;
426 0 0         PUSHMARK(SP);
427 0 0         EXTEND(SP, 2);
428 0 0         if (result)
429 0           mPUSHs(result);
430             else
431 0           PUSHs(&PL_sv_undef);
432 0 0         if (error)
433 0           mPUSHs(error);
434             else
435 0           PUSHs(&PL_sv_undef);
436 0           PUTBACK;
437 0           call_sv(cb, G_DISCARD | G_EVAL);
438 0 0         if (SvTRUE(ERRSV)) {
    0          
439 0 0         warn("EV::Memcached: callback error: %s", SvPV_nolen(ERRSV));
440 0 0         sv_setsv(ERRSV, &PL_sv_undef);
441             }
442 0 0         FREETMPS;
443 0           LEAVE;
444              
445 0           self->callback_depth--;
446             }
447              
448             /* Invoke a user handler with at most one mortal arg; catch exceptions
449             so a die in user code can't unwind through libev. */
450 0           static void invoke_handler(pTHX_ ev_mc_t *self, SV *cb, SV *arg, const char *label) {
451 0 0         if (NULL == cb) { if (arg) SvREFCNT_dec(arg); return; }
    0          
452              
453 0           self->callback_depth++;
454              
455 0           dSP;
456 0           ENTER;
457 0           SAVETMPS;
458 0 0         PUSHMARK(SP);
459 0 0         if (arg) XPUSHs(sv_2mortal(arg));
    0          
460 0           PUTBACK;
461 0           call_sv(cb, G_DISCARD | G_EVAL);
462 0 0         if (SvTRUE(ERRSV)) {
    0          
463 0 0         warn("EV::Memcached: %s callback error: %s", label, SvPV_nolen(ERRSV));
464 0 0         sv_setsv(ERRSV, &PL_sv_undef);
465             }
466 0 0         FREETMPS;
467 0           LEAVE;
468              
469 0           self->callback_depth--;
470             }
471              
472 0           static void emit_error(pTHX_ ev_mc_t *self, const char *msg) {
473 0           invoke_handler(aTHX_ self, self->on_error, newSVpv(msg, 0), "on_error");
474 0           }
475              
476 0           static void emit_connect(pTHX_ ev_mc_t *self) {
477 0           invoke_handler(aTHX_ self, self->on_connect, NULL, "on_connect");
478 0           }
479              
480 0           static void emit_disconnect(pTHX_ ev_mc_t *self) {
481 0           invoke_handler(aTHX_ self, self->on_disconnect, NULL, "on_disconnect");
482 0           }
483              
484 0           static void apply_keepalive(ev_mc_t *self) {
485 0 0         if (self->keepalive <= 0 || self->path) return;
    0          
486 0           int one = 1;
487 0           setsockopt(self->fd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
488             #ifdef TCP_KEEPIDLE
489 0           setsockopt(self->fd, IPPROTO_TCP, TCP_KEEPIDLE,
490 0           &self->keepalive, sizeof(self->keepalive));
491             #endif
492             }
493              
494             /* Common tail for synchronous connect-failure paths in start_connect:
495             emit error, run pending callbacks, and arm reconnect if configured.
496             Caller returns immediately after invoking. */
497 0           static void report_connect_error(pTHX_ ev_mc_t *self, const char *errbuf) {
498 0           self->callback_depth++;
499 0           emit_error(aTHX_ self, errbuf);
500 0           self->callback_depth--;
501 0 0         if (check_destroyed(self)) return;
502 0 0         if (!self->intentional_disconnect && self->reconnect)
    0          
503 0           schedule_reconnect(aTHX_ self);
504             }
505              
506             /* Shared post-connect-success path used by both on_connect_complete (after
507             async EINPROGRESS resolves) and start_connect (when connect(2) returns
508             immediately). Caller must already have set self->connected = 1 and
509             stopped/initialized the io watchers as required by its path. */
510 0           static void finish_connect_success(pTHX_ ev_mc_t *self) {
511 0           self->reconnect_attempts = 0;
512              
513 0           start_reading(self);
514 0           apply_keepalive(self);
515              
516 0           mc_send_sasl_auth(aTHX_ self, NULL);
517              
518 0           emit_connect(aTHX_ self);
519 0 0         if (check_destroyed(self)) return;
520              
521             /* Drain wait_queue immediately unless we are waiting for SASL_AUTH
522             to complete; the SASL response handler calls send_next_waiting. */
523 0 0         if (!self->username || !self->password)
    0          
524 0           send_next_waiting(aTHX_ self);
525             }
526              
527             /* ================================================================
528             * Callback entry management
529             * ================================================================ */
530              
531 0           static ev_mc_cb_t* alloc_cbt(void) {
532             ev_mc_cb_t *cbt;
533 0           Newxz(cbt, 1, ev_mc_cb_t);
534 0           return cbt;
535             }
536              
537 0           static void cleanup_cbt(pTHX_ ev_mc_cb_t *cbt) {
538 0 0         CLEAR_HANDLER(cbt->cb);
539 0 0         if (cbt->stats_hv) {
540 0           SvREFCNT_dec((SV*)cbt->stats_hv);
541 0           cbt->stats_hv = NULL;
542             }
543 0 0         if ((cbt->cmd == CB_CMD_MGET_FENCE || cbt->cmd == CB_CMD_MGETS_FENCE) && cbt->mget_results) {
    0          
    0          
544 0           SvREFCNT_dec((SV*)cbt->mget_results);
545 0           cbt->mget_results = NULL;
546             }
547             /* MGET_ENTRY has borrowed ref - don't decrement */
548 0           Safefree(cbt);
549 0           }
550              
551 0           static void cleanup_wait(pTHX_ ev_mc_wait_t *wt) {
552 0 0         CLEAR_HANDLER(wt->cb);
553 0 0         if (wt->packet) { Safefree(wt->packet); wt->packet = NULL; }
554 0 0         if (wt->stats_hv) {
555 0           SvREFCNT_dec((SV*)wt->stats_hv);
556 0           wt->stats_hv = NULL;
557             }
558 0 0         if ((wt->cmd == CB_CMD_MGET_FENCE || wt->cmd == CB_CMD_MGETS_FENCE) && wt->mget_results) {
    0          
    0          
559 0           SvREFCNT_dec((SV*)wt->mget_results);
560 0           wt->mget_results = NULL;
561             }
562 0           Safefree(wt);
563 0           }
564              
565             /* ================================================================
566             * Cancel pending/waiting commands (on disconnect)
567             * ================================================================ */
568              
569             /* mark_skipped: if true, set cbt->skipped=1 and invoke ALL callbacks (skip_pending behavior).
570             * if false, only invoke non-skipped callbacks (cancel_pending behavior).
571             *
572             * Callers MUST bump callback_depth before calling this (and call
573             * check_destroyed after) — DESTROY sets magic=FREED before running
574             * us, so an unconditional check_destroyed here would Safefree(self)
575             * mid-loop. The depth bump pins self until the caller is done. */
576 0           static void cancel_pending_impl(pTHX_ ev_mc_t *self, SV *err_sv, int mark_skipped) {
577 0 0         if (self->in_cb_cleanup) return;
578 0           self->in_cb_cleanup = 1;
579              
580 0 0         while (!ngx_queue_empty(&self->cb_queue)) {
581 0           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
582 0           ev_mc_cb_t *cbt = ngx_queue_data(q, ev_mc_cb_t, queue);
583 0           ngx_queue_remove(q);
584 0 0         if (cbt->counted) self->pending_count--;
585              
586 0           int already_skipped = cbt->skipped;
587 0 0         if (mark_skipped) cbt->skipped = 1;
588 0 0         if (cbt->cb && !already_skipped) {
    0          
589 0           invoke_cb(aTHX_ self, cbt->cb, NULL, newSVsv(err_sv));
590 0 0         if (self->magic == MC_MAGIC_FREED) {
591 0           cleanup_cbt(aTHX_ cbt);
592 0           self->in_cb_cleanup = 0;
593 0           return;
594             }
595             }
596 0           cleanup_cbt(aTHX_ cbt);
597             }
598 0           self->pending_count = 0;
599 0           self->in_cb_cleanup = 0;
600             }
601              
602 0           static void cancel_pending(pTHX_ ev_mc_t *self, SV *err_sv) {
603 0           cancel_pending_impl(aTHX_ self, err_sv, 0);
604 0           }
605              
606 0           static void cancel_waiting(pTHX_ ev_mc_t *self, SV *err_sv) {
607 0 0         if (self->in_wait_cleanup) return;
608 0           self->in_wait_cleanup = 1;
609              
610 0 0         while (!ngx_queue_empty(&self->wait_queue)) {
611 0           ngx_queue_t *q = ngx_queue_head(&self->wait_queue);
612 0           ev_mc_wait_t *wt = ngx_queue_data(q, ev_mc_wait_t, queue);
613 0           ngx_queue_remove(q);
614 0           self->waiting_count--;
615              
616 0 0         if (wt->cb) {
617 0           invoke_cb(aTHX_ self, wt->cb, NULL, newSVsv(err_sv));
618 0 0         if (self->magic == MC_MAGIC_FREED) {
619 0           cleanup_wait(aTHX_ wt);
620 0           self->in_wait_cleanup = 0;
621             /* Caller bumps depth before calling us (disconnect XS,
622             skip_waiting XS, or io_cb path), so check_destroyed
623             here would prematurely free during DESTROY where
624             magic is already FREED before we run. */
625 0           return;
626             }
627             }
628 0           cleanup_wait(aTHX_ wt);
629             }
630 0           self->waiting_count = 0;
631 0           self->in_wait_cleanup = 0;
632             }
633              
634             /* ================================================================
635             * Connection cleanup and disconnect handling
636             * ================================================================ */
637              
638 0           static void cleanup_connection(pTHX_ ev_mc_t *self) {
639 0           stop_reading(self);
640 0           stop_writing(self);
641              
642 0           stop_connect_timer(self);
643 0           disarm_cmd_timer(self);
644 0           stop_waiting_timer(self);
645              
646 0 0         if (self->fd >= 0) {
647 0           close(self->fd);
648 0           self->fd = -1;
649             }
650              
651 0           self->connected = 0;
652 0           self->connecting = 0;
653 0           self->rbuf_len = 0;
654 0           self->wbuf_len = 0;
655 0           self->wbuf_off = 0;
656 0           }
657              
658 0           static void handle_disconnect(pTHX_ ev_mc_t *self, const char *reason) {
659 0           int was_connected = self->connected;
660              
661 0           cleanup_connection(aTHX_ self);
662              
663             /* Cancel pending commands */
664 0           cancel_pending(aTHX_ self, err_disconnected);
665 0 0         if (self->magic == MC_MAGIC_FREED) return;
666              
667 0 0         if (!self->resume_waiting_on_reconnect) {
668 0           cancel_waiting(aTHX_ self, err_disconnected);
669 0 0         if (self->magic == MC_MAGIC_FREED) return;
670             }
671              
672 0 0         if (was_connected) {
673 0           emit_disconnect(aTHX_ self);
674 0 0         if (check_destroyed(self)) return;
675             }
676              
677 0 0         if (reason) {
678 0           emit_error(aTHX_ self, reason);
679 0 0         if (check_destroyed(self)) return;
680             }
681              
682 0 0         if (!self->intentional_disconnect && self->reconnect) {
    0          
683 0           schedule_reconnect(aTHX_ self);
684             }
685             }
686              
687             /* ================================================================
688             * Reconnection
689             * ================================================================ */
690              
691 0           static void schedule_reconnect(pTHX_ ev_mc_t *self) {
692 0 0         if (self->reconnect_timer_active) return;
693 0 0         if (self->max_reconnect_attempts > 0 &&
694 0 0         self->reconnect_attempts >= self->max_reconnect_attempts) {
695 0           emit_error(aTHX_ self, "max reconnect attempts reached");
696 0           return;
697             }
698              
699 0           self->reconnect_attempts++;
700              
701             /* Always defer through a timer (even with delay==0) so that an
702             immediate connect failure cannot recurse:
703             schedule_reconnect -> start_connect -> report_connect_error ->
704             schedule_reconnect -> ... blowing the C stack. */
705 0           ev_tstamp delay = (ev_tstamp)self->reconnect_delay_ms / 1000.0;
706 0 0         if (delay < 0) delay = 0;
707 0           ev_timer_init(&self->reconnect_timer, reconnect_timer_cb, delay, 0.0);
708 0           self->reconnect_timer.data = (void *)self;
709 0           ev_timer_start(self->loop, &self->reconnect_timer);
710 0           self->reconnect_timer_active = 1;
711             }
712              
713 0           static void arm_cmd_timer(ev_mc_t *self) {
714 0 0         if (self->command_timeout_ms <= 0) return;
715 0 0         if (!self->connected) return;
716 0           ev_tstamp timeout = (ev_tstamp)self->command_timeout_ms / 1000.0;
717 0 0         if (self->cmd_timer_active) {
718             /* Adjust repeat in-place; ev_timer_again restarts using the new value.
719             ev_timer_set is forbidden on an active watcher per libev docs. */
720 0           self->cmd_timer.repeat = timeout;
721 0           ev_timer_again(self->loop, &self->cmd_timer);
722             } else {
723 0           ev_timer_init(&self->cmd_timer, cmd_timeout_cb, timeout, timeout);
724 0           self->cmd_timer.data = (void *)self;
725 0           ev_timer_start(self->loop, &self->cmd_timer);
726 0           self->cmd_timer_active = 1;
727             }
728             }
729              
730 0           static void disarm_cmd_timer(ev_mc_t *self) {
731 0 0         if (self->cmd_timer_active) {
732 0           ev_timer_stop(self->loop, &self->cmd_timer);
733 0           self->cmd_timer_active = 0;
734             }
735 0           }
736              
737 0           static void stop_connect_timer(ev_mc_t *self) {
738 0 0         if (self->connect_timer_active) {
739 0           ev_timer_stop(self->loop, &self->connect_timer);
740 0           self->connect_timer_active = 0;
741             }
742 0           }
743              
744 0           static void stop_reconnect_timer(ev_mc_t *self) {
745 0 0         if (self->reconnect_timer_active) {
746 0           ev_timer_stop(self->loop, &self->reconnect_timer);
747 0           self->reconnect_timer_active = 0;
748             }
749 0           }
750              
751 0           static void stop_waiting_timer(ev_mc_t *self) {
752 0 0         if (self->waiting_timer_active) {
753 0           ev_timer_stop(self->loop, &self->waiting_timer);
754 0           self->waiting_timer_active = 0;
755             }
756 0           }
757              
758 0           static void cmd_timeout_cb(EV_P_ ev_timer *w, int revents) {
759 0           ev_mc_t *self = (ev_mc_t *)w->data;
760             (void)loop; (void)revents;
761              
762 0 0         if (self->magic != MC_MAGIC_ALIVE) return;
763              
764 0 0         if (ngx_queue_empty(&self->cb_queue)) {
765 0           disarm_cmd_timer(self);
766 0           return;
767             }
768              
769             /* Stop the repeating timer before disconnect */
770 0           disarm_cmd_timer(self);
771 0           self->callback_depth++;
772 0           handle_disconnect(aTHX_ self, "command timeout");
773 0           self->callback_depth--;
774 0           check_destroyed(self);
775             }
776              
777             /* Send SASL PLAIN auth. If cb is NULL, creates an internal callback
778             * that disconnects on auth failure. */
779 0           static void mc_send_sasl_auth(pTHX_ ev_mc_t *self, SV *cb) {
780 0 0         if (!self->username || !self->password) return;
    0          
781              
782 0           size_t ulen = strlen(self->username);
783 0           size_t plen = strlen(self->password);
784 0           size_t vlen = 1 + ulen + 1 + plen;
785             char *authdata;
786 0           Newx(authdata, vlen, char);
787 0           authdata[0] = '\0';
788 0           memcpy(authdata + 1, self->username, ulen);
789 0           authdata[1 + ulen] = '\0';
790 0           memcpy(authdata + 2 + ulen, self->password, plen);
791              
792 0           mc_enqueue_cmd(aTHX_ self, MC_OP_SASL_AUTH, "PLAIN", 5,
793             authdata, vlen, NULL, 0, 0, CB_CMD_SASL_AUTH, 0, cb);
794 0           Safefree(authdata);
795             }
796              
797 0           static void connect_timeout_cb(EV_P_ ev_timer *w, int revents) {
798 0           ev_mc_t *self = (ev_mc_t *)w->data;
799             (void)loop; (void)revents;
800              
801 0 0         if (self->magic != MC_MAGIC_ALIVE) return;
802              
803 0           self->connect_timer_active = 0;
804 0           self->callback_depth++;
805 0           handle_disconnect(aTHX_ self, "connect timeout");
806 0           self->callback_depth--;
807 0           check_destroyed(self);
808             }
809              
810 0           static void reconnect_timer_cb(EV_P_ ev_timer *w, int revents) {
811 0           ev_mc_t *self = (ev_mc_t *)w->data;
812             (void)loop; (void)revents;
813              
814 0 0         if (self->magic != MC_MAGIC_ALIVE) return;
815              
816 0           self->reconnect_timer_active = 0;
817 0           self->callback_depth++;
818 0           start_connect(aTHX_ self);
819 0           self->callback_depth--;
820 0           check_destroyed(self);
821             }
822              
823             /* ================================================================
824             * Flow control: waiting timer
825             * ================================================================ */
826              
827             static void schedule_waiting_timer(ev_mc_t *self);
828              
829 0           static void expire_waiting_commands(pTHX_ ev_mc_t *self) {
830 0           ev_tstamp now = ev_now(self->loop);
831 0           ev_tstamp timeout = (ev_tstamp)self->waiting_timeout_ms / 1000.0;
832              
833 0 0         while (!ngx_queue_empty(&self->wait_queue)) {
834 0           ngx_queue_t *q = ngx_queue_head(&self->wait_queue);
835 0           ev_mc_wait_t *wt = ngx_queue_data(q, ev_mc_wait_t, queue);
836              
837 0 0         if (wt->queued_at + timeout > now) break; /* not expired yet */
838              
839 0           ngx_queue_remove(q);
840 0           self->waiting_count--;
841              
842 0 0         if (wt->cb) {
843 0           invoke_cb(aTHX_ self, wt->cb, NULL, newSVsv(err_waiting_timeout));
844 0 0         if (self->magic == MC_MAGIC_FREED) {
845 0           cleanup_wait(aTHX_ wt);
846 0           return;
847             }
848             }
849 0           cleanup_wait(aTHX_ wt);
850             }
851             }
852              
853 0           static void waiting_timer_cb(EV_P_ ev_timer *w, int revents) {
854 0           ev_mc_t *self = (ev_mc_t *)w->data;
855             (void)loop; (void)revents;
856              
857 0 0         if (self->magic != MC_MAGIC_ALIVE) return;
858              
859 0           self->waiting_timer_active = 0;
860 0           self->callback_depth++;
861 0           expire_waiting_commands(aTHX_ self);
862 0           self->callback_depth--;
863 0 0         if (check_destroyed(self)) return;
864              
865 0 0         if (!ngx_queue_empty(&self->wait_queue) && self->waiting_timeout_ms > 0)
    0          
866 0           schedule_waiting_timer(self);
867             }
868              
869 0           static void schedule_waiting_timer(ev_mc_t *self) {
870 0 0         if (self->waiting_timer_active) return;
871 0 0         if (self->waiting_timeout_ms <= 0) return;
872 0 0         if (ngx_queue_empty(&self->wait_queue)) return;
873              
874 0           ngx_queue_t *q = ngx_queue_head(&self->wait_queue);
875 0           ev_mc_wait_t *wt = ngx_queue_data(q, ev_mc_wait_t, queue);
876 0           ev_tstamp timeout = (ev_tstamp)self->waiting_timeout_ms / 1000.0;
877 0           ev_tstamp delay = (wt->queued_at + timeout) - ev_now(self->loop);
878 0 0         if (delay < 0.0) delay = 0.0;
879              
880 0           ev_timer_init(&self->waiting_timer, waiting_timer_cb, delay, 0.0);
881 0           self->waiting_timer.data = (void *)self;
882 0           ev_timer_start(self->loop, &self->waiting_timer);
883 0           self->waiting_timer_active = 1;
884             }
885              
886             /* ================================================================
887             * Send next waiting command
888             * ================================================================ */
889              
890 0           static void send_next_waiting(pTHX_ ev_mc_t *self) {
891 0 0         while (!ngx_queue_empty(&self->wait_queue) && self->connected) {
    0          
892 0           ngx_queue_t *q = ngx_queue_head(&self->wait_queue);
893 0           ev_mc_wait_t *wt = ngx_queue_data(q, ev_mc_wait_t, queue);
894              
895             /* max_pending gates only counted entries; mc_enqueue_cmd uses the
896             same exception (|| !counted). Fire-and-forget (no_response) and
897             mget GETKQ entries are uncounted and must not be blocked. */
898 0 0         if (self->max_pending > 0 && self->pending_count >= self->max_pending
    0          
899 0 0         && !wt->no_response && wt->counted)
    0          
900 0           break;
901              
902 0           ngx_queue_remove(q);
903 0           self->waiting_count--;
904              
905             /* Append packet to write buffer */
906 0           buf_append_write(self, wt->packet, wt->packet_len);
907              
908 0 0         if (!wt->no_response) {
909 0           ev_mc_cb_t *cbt = alloc_cbt();
910 0           cbt->cb = wt->cb; wt->cb = NULL;
911 0           cbt->opaque = wt->opaque;
912 0           cbt->cmd = wt->cmd;
913 0           cbt->quiet = wt->quiet;
914 0           cbt->counted = wt->counted;
915 0           cbt->stats_hv = wt->stats_hv; wt->stats_hv = NULL;
916 0           cbt->mget_results = wt->mget_results; wt->mget_results = NULL;
917 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
918 0 0         if (cbt->counted) self->pending_count++;
919 0           arm_cmd_timer(self);
920             }
921              
922 0           Safefree(wt->packet);
923 0           Safefree(wt);
924              
925 0           start_writing(self);
926             }
927 0           }
928              
929             /* ================================================================
930             * Build and enqueue a command
931             * ================================================================ */
932              
933             /* Allocate the next opaque, skipping 0 across wraparound (0 is reserved
934             * for fire-and-forget commands so error responses don't get matched to
935             * the wrong cb_queue head). */
936 0           static uint32_t mc_next_opaque(ev_mc_t *self) {
937 0           uint32_t op = self->next_opaque++;
938 0 0         if (self->next_opaque == 0) self->next_opaque = 1;
939 0           return op;
940             }
941              
942             /* Encode a complete request packet (header + extras + key + value) into
943             * the caller-supplied buffer p, which must be at least
944             * MC_HEADER_SIZE + extras_len + key_len + value_len bytes. */
945 0           static void mc_pack(char *p, uint8_t opcode,
946             const char *key, uint16_t key_len,
947             const char *value, uint32_t value_len,
948             const char *extras, uint8_t extras_len,
949             uint32_t opaque, uint64_t cas)
950             {
951 0           uint32_t body_len = (uint32_t)extras_len + (uint32_t)key_len + value_len;
952 0           mc_encode_header(p, opcode, key_len, extras_len, body_len, opaque, cas);
953 0           p += MC_HEADER_SIZE;
954 0 0         if (extras_len > 0) { memcpy(p, extras, extras_len); p += extras_len; }
955 0 0         if (key_len > 0) { memcpy(p, key, key_len); p += key_len; }
956 0 0         if (value_len > 0) { memcpy(p, value, value_len); }
957 0           }
958              
959             /* Fire-and-forget using a quiet opcode (SETQ/FLUSHQ/...). No cb_queue
960             * entry; opaque is always 0 (server only responds on error and that
961             * response is silently discarded by handle_response_packet). */
962 0           static void mc_fire_and_forget(pTHX_ ev_mc_t *self,
963             uint8_t opcode, const char *key, STRLEN key_len,
964             const char *value, STRLEN value_len,
965             const char *extras, uint8_t extras_len,
966             uint64_t cas)
967             {
968 0 0         if (key_len > MC_MAX_KEY_LEN)
969 0           croak("key too long (%d bytes, max %d)", (int)key_len, MC_MAX_KEY_LEN);
970              
971 0           uint32_t body_len = extras_len + (uint32_t)key_len + (uint32_t)value_len;
972 0           size_t packet_len = MC_HEADER_SIZE + body_len;
973              
974 0 0         if (self->connected) {
975 0           buf_ensure_write(self, packet_len);
976 0           mc_pack(self->wbuf + self->wbuf_len, opcode,
977 0           key, (uint16_t)key_len, value, (uint32_t)value_len,
978             extras, extras_len, 0, cas);
979 0           self->wbuf_len += packet_len;
980 0           start_writing(self);
981             } else {
982             /* Not yet connected (or reconnecting): queue and drain after connect. */
983             ev_mc_wait_t *wt;
984 0           Newxz(wt, 1, ev_mc_wait_t);
985 0           Newx(wt->packet, packet_len, char);
986 0           wt->packet_len = packet_len;
987 0           mc_pack(wt->packet, opcode,
988 0           key, (uint16_t)key_len, value, (uint32_t)value_len,
989             extras, extras_len, 0, cas);
990 0           wt->no_response = 1;
991 0           wt->queued_at = ev_now(self->loop);
992 0           ngx_queue_insert_tail(&self->wait_queue, &wt->queue);
993 0           self->waiting_count++;
994 0 0         if (self->waiting_timeout_ms > 0)
995 0           schedule_waiting_timer(self);
996             }
997 0           }
998              
999             /* Build a binary protocol packet and enqueue for sending.
1000             * Returns the assigned opaque. */
1001 0           static uint32_t mc_enqueue_cmd(pTHX_ ev_mc_t *self,
1002             uint8_t opcode, const char *key, STRLEN key_len,
1003             const char *value, STRLEN value_len,
1004             const char *extras, uint8_t extras_len,
1005             uint64_t cas, int cmd, int quiet, SV *cb)
1006             {
1007 0 0         if (key_len > MC_MAX_KEY_LEN)
1008 0           croak("key too long (%d bytes, max %d)", (int)key_len, MC_MAX_KEY_LEN);
1009              
1010 0           uint32_t opaque = mc_next_opaque(self);
1011 0           uint32_t body_len = extras_len + (uint32_t)key_len + (uint32_t)value_len;
1012 0           size_t packet_len = MC_HEADER_SIZE + body_len;
1013              
1014 0 0         int counted = (cmd != CB_CMD_MGET_ENTRY && cmd != CB_CMD_MGETS_ENTRY);
    0          
1015              
1016             /* Check if we can send immediately */
1017 0 0         int can_send = self->connected &&
1018 0 0         (self->max_pending <= 0 || self->pending_count < self->max_pending || !counted);
    0          
    0          
1019              
1020 0 0         if (can_send) {
1021 0           buf_ensure_write(self, packet_len);
1022 0           mc_pack(self->wbuf + self->wbuf_len, opcode,
1023 0           key, (uint16_t)key_len, value, (uint32_t)value_len,
1024             extras, extras_len, opaque, cas);
1025 0           self->wbuf_len += packet_len;
1026              
1027 0           ev_mc_cb_t *cbt = alloc_cbt();
1028 0 0         if (cb) { cbt->cb = newSVsv(cb); }
1029 0           cbt->opaque = opaque;
1030 0           cbt->cmd = cmd;
1031 0           cbt->quiet = quiet;
1032 0           cbt->counted = counted;
1033 0 0         if (cmd == CB_CMD_STATS) {
1034 0           cbt->stats_hv = newHV();
1035             }
1036 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
1037 0 0         if (counted) self->pending_count++;
1038              
1039 0           arm_cmd_timer(self);
1040 0           start_writing(self);
1041             } else {
1042             /* Queue for later */
1043             ev_mc_wait_t *wt;
1044 0           Newxz(wt, 1, ev_mc_wait_t);
1045 0           Newx(wt->packet, packet_len, char);
1046 0           wt->packet_len = packet_len;
1047 0           mc_pack(wt->packet, opcode,
1048 0           key, (uint16_t)key_len, value, (uint32_t)value_len,
1049             extras, extras_len, opaque, cas);
1050              
1051 0 0         if (cb) { wt->cb = newSVsv(cb); }
1052 0           wt->opaque = opaque;
1053 0           wt->cmd = cmd;
1054 0           wt->quiet = quiet;
1055 0           wt->counted = counted;
1056 0           wt->queued_at = ev_now(self->loop);
1057 0 0         if (cmd == CB_CMD_STATS) {
1058 0           wt->stats_hv = newHV();
1059             }
1060              
1061 0           ngx_queue_insert_tail(&self->wait_queue, &wt->queue);
1062 0           self->waiting_count++;
1063              
1064 0 0         if (self->waiting_timeout_ms > 0)
1065 0           schedule_waiting_timer(self);
1066             }
1067              
1068 0           return opaque;
1069             }
1070              
1071             /* ================================================================
1072             * Multi-get: GETKQ + NOOP fence
1073             * ================================================================ */
1074              
1075             /* full_info: 0 = mget (key=>value), 1 = mgets (key=>{value,flags,cas}) */
1076 0           static void mc_enqueue_mget(pTHX_ ev_mc_t *self, AV *keys_av, SV *cb, int full_info) {
1077 0           SSize_t count = av_len(keys_av) + 1;
1078 0 0         if (count <= 0) {
1079 0 0         if (cb) {
1080 0           HV *hv = newHV();
1081 0           self->callback_depth++;
1082 0           invoke_cb(aTHX_ self, cb, newRV_noinc((SV*)hv), NULL);
1083 0           self->callback_depth--;
1084 0           check_destroyed(self);
1085             }
1086 0           return;
1087             }
1088              
1089 0 0         int entry_cmd = full_info ? CB_CMD_MGETS_ENTRY : CB_CMD_MGET_ENTRY;
1090 0 0         int fence_cmd = full_info ? CB_CMD_MGETS_FENCE : CB_CMD_MGET_FENCE;
1091              
1092             SSize_t i;
1093              
1094             /* Pre-validate all key lengths before mutating wbuf / queues / refcounts.
1095             Croaking mid-loop would leave half-built GETKQ packets queued without
1096             a NOOP fence, corrupting opaque ordering on the wire. */
1097 0 0         for (i = 0; i < count; i++) {
1098 0           SV **sv = av_fetch(keys_av, i, 0);
1099 0 0         if (!sv || !SvOK(*sv)) continue;
    0          
1100             STRLEN key_len;
1101 0           (void)SvPV(*sv, key_len);
1102 0 0         if (key_len > MC_MAX_KEY_LEN)
1103 0           croak("mget key too long (%d bytes, max %d)", (int)key_len, MC_MAX_KEY_LEN);
1104             }
1105              
1106 0           HV *results = newHV();
1107              
1108 0 0         for (i = 0; i < count; i++) {
1109 0           SV **sv = av_fetch(keys_av, i, 0);
1110 0 0         if (!sv || !SvOK(*sv)) continue;
    0          
1111              
1112             STRLEN key_len;
1113 0           const char *key = SvPV(*sv, key_len);
1114              
1115 0           uint32_t opaque = mc_next_opaque(self);
1116 0           uint32_t body_len = (uint32_t)key_len;
1117 0           size_t packet_len = MC_HEADER_SIZE + body_len;
1118              
1119 0           int can_send = self->connected;
1120              
1121 0 0         if (can_send) {
1122 0           buf_ensure_write(self, packet_len);
1123 0           char *p = self->wbuf + self->wbuf_len;
1124 0           mc_encode_header(p, MC_OP_GETKQ, (uint16_t)key_len, 0, body_len, opaque, 0);
1125 0           memcpy(p + MC_HEADER_SIZE, key, key_len);
1126 0           self->wbuf_len += packet_len;
1127              
1128 0           ev_mc_cb_t *cbt = alloc_cbt();
1129 0           cbt->opaque = opaque;
1130 0           cbt->cmd = entry_cmd;
1131 0           cbt->quiet = 1;
1132 0           cbt->counted = 0;
1133 0           cbt->mget_results = results;
1134 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
1135             } else {
1136             ev_mc_wait_t *wt;
1137 0           Newxz(wt, 1, ev_mc_wait_t);
1138 0           Newx(wt->packet, packet_len, char);
1139 0           wt->packet_len = packet_len;
1140 0           mc_encode_header(wt->packet, MC_OP_GETKQ, (uint16_t)key_len, 0, body_len, opaque, 0);
1141 0           memcpy(wt->packet + MC_HEADER_SIZE, key, key_len);
1142 0           wt->opaque = opaque;
1143 0           wt->cmd = entry_cmd;
1144 0           wt->quiet = 1;
1145 0           wt->counted = 0;
1146 0           wt->mget_results = results;
1147 0           wt->queued_at = ev_now(self->loop);
1148 0           ngx_queue_insert_tail(&self->wait_queue, &wt->queue);
1149 0           self->waiting_count++;
1150             }
1151             }
1152              
1153             /* NOOP fence */
1154             {
1155 0           uint32_t opaque = mc_next_opaque(self);
1156 0           size_t packet_len = MC_HEADER_SIZE;
1157 0           int can_send = self->connected;
1158              
1159 0 0         if (can_send) {
1160 0           buf_ensure_write(self, packet_len);
1161 0           char *p = self->wbuf + self->wbuf_len;
1162 0           mc_encode_header(p, MC_OP_NOOP, 0, 0, 0, opaque, 0);
1163 0           self->wbuf_len += packet_len;
1164              
1165 0           ev_mc_cb_t *cbt = alloc_cbt();
1166 0 0         if (cb) { cbt->cb = newSVsv(cb); }
1167 0           cbt->opaque = opaque;
1168 0           cbt->cmd = fence_cmd;
1169 0           cbt->quiet = 0;
1170 0           cbt->counted = 1;
1171 0           cbt->mget_results = results;
1172 0           SvREFCNT_inc_simple_void_NN((SV*)results);
1173 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
1174 0           self->pending_count++;
1175              
1176 0           arm_cmd_timer(self);
1177 0           start_writing(self);
1178             } else {
1179             ev_mc_wait_t *wt;
1180 0           Newxz(wt, 1, ev_mc_wait_t);
1181 0           Newx(wt->packet, packet_len, char);
1182 0           wt->packet_len = packet_len;
1183 0           mc_encode_header(wt->packet, MC_OP_NOOP, 0, 0, 0, opaque, 0);
1184 0 0         if (cb) { wt->cb = newSVsv(cb); }
1185 0           wt->opaque = opaque;
1186 0           wt->cmd = fence_cmd;
1187 0           wt->quiet = 0;
1188 0           wt->counted = 1;
1189 0           wt->mget_results = results; /* owned ref */
1190 0           SvREFCNT_inc_simple_void_NN((SV*)results);
1191 0           wt->queued_at = ev_now(self->loop);
1192 0           ngx_queue_insert_tail(&self->wait_queue, &wt->queue);
1193 0           self->waiting_count++;
1194             }
1195             }
1196              
1197             /* results starts with refcnt=1, fence owns +1 = total 2.
1198             * Drop our initial ref, fence now owns it (refcnt=1) */
1199 0           SvREFCNT_dec((SV*)results);
1200              
1201 0 0         if (self->waiting_timeout_ms > 0)
1202 0           schedule_waiting_timer(self);
1203             }
1204              
1205             /* ================================================================
1206             * Response processing
1207             * ================================================================ */
1208              
1209             /* Drain quiet entries before the given opaque.
1210             * Quiet entries that got no response are "quiet success":
1211             * - MGET_ENTRY: miss (nothing to do)
1212             * - Other quiet commands: invoke callback with success */
1213 0           static void drain_quiet_before(pTHX_ ev_mc_t *self, uint32_t opaque) {
1214 0 0         while (!ngx_queue_empty(&self->cb_queue)) {
1215 0           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
1216 0           ev_mc_cb_t *cbt = ngx_queue_data(q, ev_mc_cb_t, queue);
1217              
1218             /* Use signed comparison to handle wraparound */
1219 0 0         if ((int32_t)(cbt->opaque - opaque) >= 0) break;
1220 0 0         if (!cbt->quiet) break; /* non-quiet should always get response */
1221              
1222 0           ngx_queue_remove(q);
1223 0 0         if (cbt->counted) self->pending_count--;
1224              
1225             /* Quiet success: for storage = stored, for MGET_ENTRY = miss */
1226 0 0         if (cbt->cb && !cbt->skipped &&
    0          
1227 0 0         cbt->cmd != CB_CMD_MGET_ENTRY && cbt->cmd != CB_CMD_MGETS_ENTRY) {
    0          
1228 0           invoke_cb(aTHX_ self, cbt->cb, newSViv(1), NULL);
1229 0 0         if (self->magic == MC_MAGIC_FREED) {
1230 0           cleanup_cbt(aTHX_ cbt);
1231 0           return;
1232             }
1233             }
1234 0           cleanup_cbt(aTHX_ cbt);
1235             }
1236             }
1237              
1238 0           static void handle_response_packet(pTHX_ ev_mc_t *self) {
1239 0           const char *pkt = self->rbuf;
1240 0           uint16_t key_len = mc_read_u16(pkt + 2);
1241 0           uint8_t extras_len = (uint8_t)pkt[4];
1242 0           uint16_t status = mc_read_u16(pkt + 6);
1243 0           uint32_t body_len = mc_read_u32(pkt + 8);
1244 0           uint32_t opaque = mc_read_u32(pkt + 12);
1245 0           uint64_t cas = mc_read_u64(pkt + 16);
1246              
1247 0           const char *body = pkt + MC_HEADER_SIZE;
1248              
1249 0 0         if ((uint32_t)extras_len + key_len > body_len) {
1250 0           handle_disconnect(aTHX_ self, "malformed response: body too short");
1251 0           return;
1252             }
1253              
1254 0           const char *extras = body;
1255 0           const char *key_ptr = body + extras_len;
1256 0           const char *value_ptr = body + extras_len + key_len;
1257 0           uint32_t value_len = body_len - extras_len - key_len;
1258              
1259             /* Drain quiet entries that were skipped (no response) */
1260 0           drain_quiet_before(aTHX_ self, opaque);
1261 0 0         if (self->magic == MC_MAGIC_FREED) return;
1262              
1263             /* Find matching callback entry */
1264 0 0         if (ngx_queue_empty(&self->cb_queue)) {
1265             /* Stray response (e.g., quiet opcode error) — discard */
1266 0           return;
1267             }
1268              
1269 0           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
1270 0           ev_mc_cb_t *cbt = ngx_queue_data(q, ev_mc_cb_t, queue);
1271              
1272 0 0         if (cbt->opaque != opaque) {
1273             /* opaque=0 is fire-and-forget error response — discard silently */
1274 0 0         if (opaque == 0) return;
1275             char errbuf[128];
1276 0           snprintf(errbuf, sizeof(errbuf),
1277             "protocol error: expected opaque %u, got %u", cbt->opaque, opaque);
1278 0           handle_disconnect(aTHX_ self, errbuf);
1279 0           return;
1280             }
1281              
1282             /* STAT accumulation: don't remove from queue until terminator */
1283 0 0         if (cbt->cmd == CB_CMD_STATS && status == MC_STATUS_OK && key_len > 0) {
    0          
    0          
1284 0 0         if (cbt->stats_hv) {
1285 0           hv_store(cbt->stats_hv, key_ptr, key_len,
1286             newSVpvn(value_ptr, value_len), 0);
1287             }
1288 0           return; /* wait for more stats or terminator */
1289             }
1290              
1291             /* Remove from queue */
1292 0           ngx_queue_remove(q);
1293 0 0         if (cbt->counted) self->pending_count--;
1294              
1295             /* Handle error status */
1296 0 0         if (status != MC_STATUS_OK) {
1297             /* GET/GETS/GAT/GATS miss: return (undef, undef) */
1298 0 0         if (status == MC_STATUS_KEY_NOT_FOUND &&
1299 0 0         (cbt->cmd == CB_CMD_GET || cbt->cmd == CB_CMD_GETS ||
    0          
1300 0 0         cbt->cmd == CB_CMD_GAT || cbt->cmd == CB_CMD_GATS)) {
    0          
1301 0 0         if (cbt->cb && !cbt->skipped)
    0          
1302 0           invoke_cb(aTHX_ self, cbt->cb, NULL, NULL);
1303             }
1304 0 0         else if (cbt->cmd == CB_CMD_MGET_ENTRY || cbt->cmd == CB_CMD_MGETS_ENTRY) {
    0          
1305             /* mget miss — nothing to add */
1306             }
1307             /* SASL auth failure: disconnect (auto-auth) or report to callback */
1308 0 0         else if (cbt->cmd == CB_CMD_SASL_AUTH) {
1309 0 0         if (cbt->cb && !cbt->skipped) {
    0          
1310 0           const char *errstr = mc_status_str(status);
1311 0 0         if (value_len > 0)
1312 0           invoke_cb(aTHX_ self, cbt->cb, NULL,
1313             newSVpvf("%s: %.*s", errstr, (int)value_len, value_ptr));
1314             else
1315 0           invoke_cb(aTHX_ self, cbt->cb, NULL, newSVpv(errstr, 0));
1316 0 0         } else if (!cbt->cb) {
1317             /* Auto-auth failed — disconnect with error */
1318             char errbuf[256];
1319 0 0         if (value_len > 0)
1320 0           snprintf(errbuf, sizeof(errbuf), "SASL auth failed: %.*s",
1321             (int)value_len, value_ptr);
1322             else
1323 0           snprintf(errbuf, sizeof(errbuf), "SASL auth failed: %s",
1324             mc_status_str(status));
1325 0           cleanup_cbt(aTHX_ cbt);
1326 0           handle_disconnect(aTHX_ self, errbuf);
1327 0           return;
1328             }
1329             }
1330             else {
1331             /* Real error */
1332 0 0         if (cbt->cb && !cbt->skipped) {
    0          
1333 0           const char *errstr = mc_status_str(status);
1334             /* Include server error message if present */
1335 0 0         if (value_len > 0)
1336 0           invoke_cb(aTHX_ self, cbt->cb, NULL,
1337             newSVpvf("%s: %.*s", errstr, (int)value_len, value_ptr));
1338             else
1339 0           invoke_cb(aTHX_ self, cbt->cb, NULL, newSVpv(errstr, 0));
1340             }
1341             }
1342 0 0         if (self->magic != MC_MAGIC_FREED) {
1343 0           cleanup_cbt(aTHX_ cbt);
1344 0           send_next_waiting(aTHX_ self);
1345             } else {
1346 0           cleanup_cbt(aTHX_ cbt);
1347             }
1348 0           return;
1349             }
1350              
1351             /* Handle success */
1352 0 0         if (cbt->skipped) {
1353 0           cleanup_cbt(aTHX_ cbt);
1354 0           send_next_waiting(aTHX_ self);
1355 0           return;
1356             }
1357              
1358 0           switch (cbt->cmd) {
1359 0           case CB_CMD_GET:
1360             case CB_CMD_GAT:
1361 0 0         if (cbt->cb)
1362 0           invoke_cb(aTHX_ self, cbt->cb, newSVpvn(value_ptr, value_len), NULL);
1363 0           break;
1364              
1365 0           case CB_CMD_GETS:
1366             case CB_CMD_GATS:
1367 0 0         if (cbt->cb) {
1368 0           HV *hv = newHV();
1369 0           hv_stores(hv, "value", newSVpvn(value_ptr, value_len));
1370 0 0         if (extras_len >= 4)
1371 0           hv_stores(hv, "flags", newSVuv(mc_read_u32(extras)));
1372 0           hv_stores(hv, "cas", newSVuv(cas));
1373 0           invoke_cb(aTHX_ self, cbt->cb, newRV_noinc((SV*)hv), NULL);
1374             }
1375 0           break;
1376              
1377 0           case CB_CMD_STORE:
1378             case CB_CMD_DELETE:
1379             case CB_CMD_TOUCH:
1380             case CB_CMD_FLUSH:
1381             case CB_CMD_NOOP:
1382             case CB_CMD_QUIT:
1383 0 0         if (cbt->cb)
1384 0           invoke_cb(aTHX_ self, cbt->cb, newSViv(1), NULL);
1385 0           break;
1386              
1387 0           case CB_CMD_ARITH:
1388 0 0         if (cbt->cb) {
1389 0           uint64_t new_val = mc_read_u64(value_ptr);
1390 0           invoke_cb(aTHX_ self, cbt->cb, newSVuv((UV)new_val), NULL);
1391             }
1392 0           break;
1393              
1394 0           case CB_CMD_VERSION:
1395             case CB_CMD_SASL_LIST:
1396 0 0         if (cbt->cb)
1397 0           invoke_cb(aTHX_ self, cbt->cb, newSVpvn(value_ptr, value_len), NULL);
1398 0           break;
1399              
1400 0           case CB_CMD_SASL_AUTH:
1401 0 0         if (cbt->cb)
1402 0           invoke_cb(aTHX_ self, cbt->cb, newSViv(1), NULL);
1403             /* Auto-auth (cb==NULL): wait queue is drained by the common
1404             send_next_waiting call at the end of this switch. */
1405 0           break;
1406              
1407 0           case CB_CMD_STATS:
1408             /* Terminator (key_len==0): deliver accumulated stats */
1409 0 0         if (cbt->cb && cbt->stats_hv) {
    0          
1410 0           SV *rv = newRV_noinc((SV*)cbt->stats_hv);
1411 0           cbt->stats_hv = NULL; /* transferred ownership */
1412 0           invoke_cb(aTHX_ self, cbt->cb, rv, NULL);
1413             }
1414 0           break;
1415              
1416 0           case CB_CMD_MGET_ENTRY:
1417 0 0         if (cbt->mget_results && key_len > 0) {
    0          
1418 0           hv_store(cbt->mget_results, key_ptr, key_len,
1419             newSVpvn(value_ptr, value_len), 0);
1420             }
1421 0           break;
1422              
1423 0           case CB_CMD_MGETS_ENTRY:
1424 0 0         if (cbt->mget_results && key_len > 0) {
    0          
1425 0           HV *info = newHV();
1426 0           hv_stores(info, "value", newSVpvn(value_ptr, value_len));
1427 0 0         if (extras_len >= 4)
1428 0           hv_stores(info, "flags", newSVuv(mc_read_u32(extras)));
1429 0           hv_stores(info, "cas", newSVuv(cas));
1430 0           hv_store(cbt->mget_results, key_ptr, key_len,
1431             newRV_noinc((SV*)info), 0);
1432             }
1433 0           break;
1434              
1435 0           case CB_CMD_MGET_FENCE:
1436             case CB_CMD_MGETS_FENCE:
1437 0 0         if (cbt->cb && cbt->mget_results) {
    0          
1438 0           SV *rv = newRV_noinc((SV*)cbt->mget_results);
1439 0           cbt->mget_results = NULL;
1440 0           invoke_cb(aTHX_ self, cbt->cb, rv, NULL);
1441             }
1442 0           break;
1443             }
1444              
1445 0 0         if (self->magic != MC_MAGIC_FREED) {
1446 0           cleanup_cbt(aTHX_ cbt);
1447 0           send_next_waiting(aTHX_ self);
1448             } else {
1449 0           cleanup_cbt(aTHX_ cbt);
1450             }
1451             }
1452              
1453 0           static void process_responses(pTHX_ ev_mc_t *self) {
1454 0           int processed = 0;
1455 0 0         while (self->rbuf_len >= MC_HEADER_SIZE) {
1456             /* Verify magic byte */
1457 0 0         if ((uint8_t)self->rbuf[0] != MC_RES_MAGIC) {
1458 0           handle_disconnect(aTHX_ self, "invalid response magic byte");
1459 0           return;
1460             }
1461              
1462 0           uint32_t body_len = mc_read_u32(self->rbuf + 8);
1463             /* Sanity bound: memcached's hard limit is 128 MB; cap at 256 MB to
1464             catch garbage/MITM responses without rejecting any real reply.
1465             Also avoids size_t overflow on 32-bit perls when MC_HEADER_SIZE
1466             is added below. */
1467 0 0         if (body_len > 0x10000000u) {
1468 0           handle_disconnect(aTHX_ self, "response body too large");
1469 0           return;
1470             }
1471 0           size_t total_len = (size_t)MC_HEADER_SIZE + body_len;
1472              
1473 0 0         if (self->rbuf_len < total_len) break; /* incomplete packet */
1474              
1475 0           handle_response_packet(aTHX_ self);
1476 0 0         if (self->magic == MC_MAGIC_FREED) return;
1477 0 0         if (!self->connected) return; /* disconnect() called from callback */
1478              
1479             /* Consume from buffer */
1480 0           self->rbuf_len -= total_len;
1481 0 0         if (self->rbuf_len > 0)
1482 0           memmove(self->rbuf, self->rbuf + total_len, self->rbuf_len);
1483 0           processed++;
1484             }
1485              
1486             /* Reset command timeout on activity */
1487 0 0         if (processed > 0 && self->cmd_timer_active) {
    0          
1488 0 0         if (ngx_queue_empty(&self->cb_queue))
1489 0           disarm_cmd_timer(self);
1490             else
1491 0           arm_cmd_timer(self);
1492             }
1493             }
1494              
1495             /* ================================================================
1496             * IO callbacks
1497             * ================================================================ */
1498              
1499 0           static void on_readable(pTHX_ ev_mc_t *self) {
1500 0           buf_ensure_read(self, 4096);
1501 0           ssize_t n = read(self->fd, self->rbuf + self->rbuf_len,
1502 0           self->rbuf_cap - self->rbuf_len);
1503 0 0         if (n > 0) {
1504 0           self->rbuf_len += n;
1505 0           process_responses(aTHX_ self);
1506 0 0         } else if (n == 0) {
1507 0           handle_disconnect(aTHX_ self, "connection closed by server");
1508             } else {
1509 0 0         if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
    0          
    0          
1510 0           return;
1511             char errbuf[128];
1512 0           snprintf(errbuf, sizeof(errbuf), "read error: %s", strerror(errno));
1513 0           handle_disconnect(aTHX_ self, errbuf);
1514             }
1515             }
1516              
1517 0           static int try_write(ev_mc_t *self) {
1518 0 0         while (self->wbuf_off < self->wbuf_len) {
1519 0           ssize_t n = write(self->fd, self->wbuf + self->wbuf_off,
1520 0           self->wbuf_len - self->wbuf_off);
1521 0 0         if (n > 0) {
1522 0           self->wbuf_off += n;
1523 0 0         } else if (n < 0) {
1524 0 0         if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
    0          
    0          
1525 0           return 0; /* try again later */
1526 0           return -1; /* error */
1527             }
1528             }
1529             /* All written */
1530 0           self->wbuf_len = 0;
1531 0           self->wbuf_off = 0;
1532 0           stop_writing(self);
1533 0           return 1;
1534             }
1535              
1536 0           static void on_connect_complete(pTHX_ ev_mc_t *self) {
1537 0           int err = 0;
1538 0           socklen_t len = sizeof(err);
1539 0 0         if (getsockopt(self->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0 || err != 0) {
    0          
1540             char errbuf[128];
1541 0           snprintf(errbuf, sizeof(errbuf), "connect failed: %s",
1542 0 0         strerror(err ? err : errno));
1543 0           close(self->fd);
1544 0           self->fd = -1;
1545 0           self->connecting = 0;
1546 0           stop_writing(self);
1547 0           stop_connect_timer(self);
1548              
1549 0           report_connect_error(aTHX_ self, errbuf);
1550 0           return;
1551             }
1552              
1553 0           self->connecting = 0;
1554 0           self->connected = 1;
1555              
1556 0           stop_writing(self);
1557 0           stop_connect_timer(self);
1558              
1559 0           finish_connect_success(aTHX_ self);
1560             }
1561              
1562 0           static void io_cb(EV_P_ ev_io *w, int revents) {
1563 0           ev_mc_t *self = (ev_mc_t *)w->data;
1564             (void)loop;
1565              
1566 0 0         if (self->magic != MC_MAGIC_ALIVE) return;
1567              
1568 0           self->callback_depth++;
1569              
1570 0 0         if (self->connecting) {
1571 0 0         if (revents & EV_WRITE) {
1572 0           on_connect_complete(aTHX_ self);
1573             }
1574 0           self->callback_depth--;
1575 0           check_destroyed(self);
1576 0           return;
1577             }
1578              
1579 0 0         if (revents & EV_READ) {
1580 0           on_readable(aTHX_ self);
1581 0 0         if (self->magic != MC_MAGIC_ALIVE) {
1582 0           self->callback_depth--;
1583 0           check_destroyed(self);
1584 0           return;
1585             }
1586             }
1587              
1588 0 0         if (revents & EV_WRITE) {
1589 0           int rv = try_write(self);
1590 0 0         if (rv < 0) {
1591             char errbuf[128];
1592 0           snprintf(errbuf, sizeof(errbuf), "write error: %s", strerror(errno));
1593 0           handle_disconnect(aTHX_ self, errbuf);
1594             }
1595             }
1596              
1597 0           self->callback_depth--;
1598 0           check_destroyed(self);
1599             }
1600              
1601             /* ================================================================
1602             * Connection
1603             * ================================================================ */
1604              
1605 0           static void start_connect(pTHX_ ev_mc_t *self) {
1606             int fd, ret;
1607              
1608 0 0         if (self->path) {
1609             /* Unix socket */
1610             struct sockaddr_un addr;
1611 0           memset(&addr, 0, sizeof(addr));
1612 0           addr.sun_family = AF_UNIX;
1613 0 0         if (strlen(self->path) >= sizeof(addr.sun_path)) {
1614 0           emit_error(aTHX_ self, "unix socket path too long");
1615 0           return;
1616             }
1617 0           strncpy(addr.sun_path, self->path, sizeof(addr.sun_path) - 1);
1618              
1619 0           fd = socket(AF_UNIX, SOCK_STREAM, 0);
1620 0 0         if (fd < 0) {
1621             char errbuf[128];
1622 0           snprintf(errbuf, sizeof(errbuf), "socket: %s", strerror(errno));
1623 0           emit_error(aTHX_ self, errbuf);
1624 0           return;
1625             }
1626              
1627             /* non-blocking */
1628             {
1629 0           int fl = fcntl(fd, F_GETFL);
1630 0 0         if (fl < 0 || fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0) {
    0          
1631 0           close(fd);
1632 0           emit_error(aTHX_ self, "fcntl O_NONBLOCK failed");
1633 0           return;
1634             }
1635             }
1636              
1637 0           self->fd = fd;
1638 0           ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
1639             } else {
1640             /* TCP */
1641 0           struct addrinfo hints, *res = NULL;
1642             char port_str[16];
1643              
1644 0           snprintf(port_str, sizeof(port_str), "%d", self->port);
1645 0           memset(&hints, 0, sizeof(hints));
1646 0           hints.ai_family = AF_UNSPEC;
1647 0           hints.ai_socktype = SOCK_STREAM;
1648              
1649 0           ret = getaddrinfo(self->host, port_str, &hints, &res);
1650 0 0         if (ret != 0) {
1651             char errbuf[256];
1652 0           snprintf(errbuf, sizeof(errbuf), "getaddrinfo: %s", gai_strerror(ret));
1653 0           report_connect_error(aTHX_ self, errbuf);
1654 0           return;
1655             }
1656              
1657 0           fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
1658 0 0         if (fd < 0) {
1659 0           freeaddrinfo(res);
1660             char errbuf[128];
1661 0           snprintf(errbuf, sizeof(errbuf), "socket: %s", strerror(errno));
1662 0           report_connect_error(aTHX_ self, errbuf);
1663 0           return;
1664             }
1665              
1666             /* non-blocking */
1667             {
1668 0           int fl = fcntl(fd, F_GETFL);
1669 0 0         if (fl < 0 || fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0) {
    0          
1670 0           freeaddrinfo(res);
1671 0           close(fd);
1672 0           emit_error(aTHX_ self, "fcntl O_NONBLOCK failed");
1673 0           return;
1674             }
1675             }
1676              
1677             /* TCP_NODELAY */
1678             {
1679 0           int one = 1;
1680 0           setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
1681             }
1682              
1683 0           self->fd = fd;
1684 0           ret = connect(fd, res->ai_addr, res->ai_addrlen);
1685 0           freeaddrinfo(res);
1686             }
1687              
1688 0 0         if (ret == 0) {
1689             /* Connected immediately (localhost) */
1690 0           self->connected = 1;
1691 0           ev_io_init(&self->rio, io_cb, self->fd, EV_READ);
1692 0           self->rio.data = (void *)self;
1693 0           ev_io_init(&self->wio, io_cb, self->fd, EV_WRITE);
1694 0           self->wio.data = (void *)self;
1695 0           ev_set_priority(&self->rio, self->priority);
1696 0           ev_set_priority(&self->wio, self->priority);
1697              
1698 0           finish_connect_success(aTHX_ self);
1699 0           return;
1700             }
1701              
1702 0 0         if (errno != EINPROGRESS) {
1703             char errbuf[128];
1704 0           snprintf(errbuf, sizeof(errbuf), "connect: %s", strerror(errno));
1705 0           close(self->fd);
1706 0           self->fd = -1;
1707 0           report_connect_error(aTHX_ self, errbuf);
1708 0           return;
1709             }
1710              
1711             /* In progress - wait for writability */
1712 0           self->connecting = 1;
1713 0           ev_io_init(&self->rio, io_cb, self->fd, EV_READ);
1714 0           self->rio.data = (void *)self;
1715 0           ev_io_init(&self->wio, io_cb, self->fd, EV_WRITE);
1716 0           self->wio.data = (void *)self;
1717 0           ev_set_priority(&self->rio, self->priority);
1718 0           ev_set_priority(&self->wio, self->priority);
1719 0           start_writing(self);
1720              
1721 0 0         if (self->connect_timeout_ms > 0) {
1722 0           ev_tstamp delay = (ev_tstamp)self->connect_timeout_ms / 1000.0;
1723 0           ev_timer_init(&self->connect_timer, connect_timeout_cb, delay, 0.0);
1724 0           self->connect_timer.data = (void *)self;
1725 0           ev_timer_start(self->loop, &self->connect_timer);
1726 0           self->connect_timer_active = 1;
1727             }
1728             }
1729              
1730             /* ================================================================
1731             * XS interface
1732             * ================================================================ */
1733              
1734             MODULE = EV::Memcached PACKAGE = EV::Memcached
1735              
1736             BOOT:
1737             {
1738 8 50         I_EV_API("EV::Memcached");
    50          
    50          
1739 8           err_skipped = newSVpvs("skipped");
1740 8           SvREADONLY_on(err_skipped);
1741 8           err_disconnected = newSVpvs("disconnected");
1742 8           SvREADONLY_on(err_disconnected);
1743 8           err_waiting_timeout = newSVpvs("waiting timeout");
1744 8           SvREADONLY_on(err_waiting_timeout);
1745             }
1746              
1747             EV::Memcached
1748             new(char *class, ...)
1749             CODE:
1750             {
1751             PERL_UNUSED_VAR(class);
1752 0 0         if ((items - 1) % 2 != 0) croak("odd number of arguments");
1753              
1754 0           Newxz(RETVAL, 1, ev_mc_t);
1755 0           RETVAL->magic = MC_MAGIC_ALIVE;
1756 0           RETVAL->fd = -1;
1757 0           RETVAL->port = 11211;
1758 0           RETVAL->next_opaque = 1; /* reserve 0 for fire-and-forget quiet ops */
1759 0           ngx_queue_init(&RETVAL->cb_queue);
1760 0           ngx_queue_init(&RETVAL->wait_queue);
1761 0           Newx(RETVAL->rbuf, BUF_INIT_SIZE, char);
1762 0           RETVAL->rbuf_cap = BUF_INIT_SIZE;
1763 0           Newx(RETVAL->wbuf, BUF_INIT_SIZE, char);
1764 0           RETVAL->wbuf_cap = BUF_INIT_SIZE;
1765              
1766             /* Default error handler: warn. Callback exceptions are caught by
1767             G_EVAL in emit_error, so a `die` would be demoted to a warning
1768             anyway — emit it directly and avoid the double prefix. */
1769 0           RETVAL->on_error = eval_pv("sub { warn \"EV::Memcached error: @_\\n\" }", TRUE);
1770 0           SvREFCNT_inc_simple_void_NN(RETVAL->on_error);
1771              
1772             /* Parse options */
1773 0           SV *host_sv = NULL, *path_sv = NULL;
1774 0           int port = 11211;
1775 0           int do_reconnect = 0, reconnect_delay = 1000, max_reconnect_attempts = 0;
1776 0           RETVAL->loop = EV_DEFAULT;
1777             int i;
1778              
1779 0 0         for (i = 1; i < items; i += 2) {
1780 0           const char *k = SvPV_nolen(ST(i));
1781 0           SV *v = ST(i + 1);
1782              
1783 0 0         if (strEQ(k, "host")) host_sv = v;
1784 0 0         else if (strEQ(k, "port")) port = SvIV(v);
1785 0 0         else if (strEQ(k, "path")) path_sv = v;
1786 0 0         else if (strEQ(k, "on_error")) {
1787 0 0         CLEAR_HANDLER(RETVAL->on_error);
1788 0 0         if (SvOK(v) && SvROK(v)) RETVAL->on_error = newSVsv(v);
    0          
1789             }
1790 0 0         else if (strEQ(k, "on_connect")) {
1791 0 0         if (SvOK(v) && SvROK(v)) RETVAL->on_connect = newSVsv(v);
    0          
1792             }
1793 0 0         else if (strEQ(k, "on_disconnect")) {
1794 0 0         if (SvOK(v) && SvROK(v)) RETVAL->on_disconnect = newSVsv(v);
    0          
1795             }
1796 0 0         else if (strEQ(k, "max_pending")) RETVAL->max_pending = SvIV(v);
1797 0 0         else if (strEQ(k, "waiting_timeout")) RETVAL->waiting_timeout_ms = SvIV(v);
1798 0 0         else if (strEQ(k, "connect_timeout")) RETVAL->connect_timeout_ms = SvIV(v);
1799 0 0         else if (strEQ(k, "command_timeout")) RETVAL->command_timeout_ms = SvIV(v);
1800 0 0         else if (strEQ(k, "resume_waiting_on_reconnect")) RETVAL->resume_waiting_on_reconnect = SvTRUE(v) ? 1 : 0;
1801 0 0         else if (strEQ(k, "priority")) RETVAL->priority = SvIV(v);
1802 0 0         else if (strEQ(k, "keepalive")) RETVAL->keepalive = SvIV(v);
1803 0 0         else if (strEQ(k, "reconnect")) do_reconnect = SvTRUE(v) ? 1 : 0;
1804 0 0         else if (strEQ(k, "reconnect_delay")) reconnect_delay = SvIV(v);
1805 0 0         else if (strEQ(k, "max_reconnect_attempts")) max_reconnect_attempts = SvIV(v);
1806 0 0         else if (strEQ(k, "username")) {
1807 0 0         if (SvOK(v)) RETVAL->username = savepv(SvPV_nolen(v));
1808             }
1809 0 0         else if (strEQ(k, "password")) {
1810 0 0         if (SvOK(v)) RETVAL->password = savepv(SvPV_nolen(v));
1811             }
1812 0 0         else if (strEQ(k, "loop")) {
1813 0           RETVAL->loop = (struct ev_loop *)SvPVX(SvRV(v));
1814             }
1815             }
1816              
1817 0 0         if (host_sv && path_sv) {
    0          
1818 0           Safefree(RETVAL->rbuf);
1819 0           Safefree(RETVAL->wbuf);
1820 0 0         CLEAR_HANDLER(RETVAL->on_error);
1821 0 0         CLEAR_HANDLER(RETVAL->on_connect);
1822 0 0         CLEAR_HANDLER(RETVAL->on_disconnect);
1823 0 0         if (RETVAL->username) Safefree(RETVAL->username);
1824 0 0         if (RETVAL->password) Safefree(RETVAL->password);
1825 0           Safefree(RETVAL);
1826 0           croak("cannot specify both 'host' and 'path'");
1827             }
1828              
1829 0           RETVAL->port = port;
1830 0 0         if (do_reconnect) {
1831 0           RETVAL->reconnect = 1;
1832 0           RETVAL->reconnect_delay_ms = reconnect_delay >= 0 ? reconnect_delay : 0;
1833 0           RETVAL->max_reconnect_attempts = max_reconnect_attempts >= 0 ? max_reconnect_attempts : 0;
1834             }
1835              
1836 0 0         if (host_sv && SvOK(host_sv)) {
    0          
1837 0           RETVAL->host = savepv(SvPV_nolen(host_sv));
1838 0           start_connect(aTHX_ RETVAL);
1839             }
1840 0 0         else if (path_sv && SvOK(path_sv)) {
    0          
1841 0           RETVAL->path = savepv(SvPV_nolen(path_sv));
1842 0           start_connect(aTHX_ RETVAL);
1843             }
1844             }
1845             OUTPUT:
1846             RETVAL
1847              
1848             void
1849             DESTROY(EV::Memcached self)
1850             CODE:
1851             {
1852 0 0         if (self->magic == MC_MAGIC_FREED) return;
1853              
1854             /* If we're inside a callback, defer destruction.
1855             * check_destroyed() in io_cb/timer_cb will Safefree after unwind. */
1856 0 0         if (self->callback_depth > 0) {
1857 0           self->magic = MC_MAGIC_FREED;
1858              
1859             /* Stop watchers */
1860 0           stop_reading(self);
1861 0           stop_writing(self);
1862 0           stop_connect_timer(self);
1863 0           disarm_cmd_timer(self);
1864 0           stop_reconnect_timer(self);
1865 0           stop_waiting_timer(self);
1866 0 0         if (self->fd >= 0) { close(self->fd); self->fd = -1; }
1867              
1868             /* Clean up queues without invoking Perl callbacks */
1869 0 0         while (!ngx_queue_empty(&self->cb_queue)) {
1870 0           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
1871 0           ev_mc_cb_t *cbt = ngx_queue_data(q, ev_mc_cb_t, queue);
1872 0           ngx_queue_remove(q);
1873 0           cleanup_cbt(aTHX_ cbt);
1874             }
1875 0 0         while (!ngx_queue_empty(&self->wait_queue)) {
1876 0           ngx_queue_t *q = ngx_queue_head(&self->wait_queue);
1877 0           ev_mc_wait_t *wt = ngx_queue_data(q, ev_mc_wait_t, queue);
1878 0           ngx_queue_remove(q);
1879 0           cleanup_wait(aTHX_ wt);
1880             }
1881 0           self->pending_count = 0;
1882 0           self->waiting_count = 0;
1883              
1884 0 0         CLEAR_HANDLER(self->on_error);
1885 0 0         CLEAR_HANDLER(self->on_connect);
1886 0 0         CLEAR_HANDLER(self->on_disconnect);
1887 0           Safefree(self->host); self->host = NULL;
1888 0           Safefree(self->path); self->path = NULL;
1889 0           Safefree(self->username); self->username = NULL;
1890 0           Safefree(self->password); self->password = NULL;
1891 0           Safefree(self->rbuf); self->rbuf = NULL;
1892 0           Safefree(self->wbuf); self->wbuf = NULL;
1893 0           return;
1894             }
1895              
1896             /* Stop timers */
1897 0           stop_connect_timer(self);
1898 0           disarm_cmd_timer(self);
1899 0           stop_reconnect_timer(self);
1900 0           stop_waiting_timer(self);
1901              
1902 0           cleanup_connection(aTHX_ self);
1903              
1904             /* Cancel pending/waiting with disconnected error.
1905             Don't pre-set magic=FREED here: cancel_pending_impl bails its
1906             loop on FREED, which would leak every entry past the first if
1907             any callback was set. Bump callback_depth so any nested DESTROY
1908             (e.g. a callback drops a separate strong ref) takes the deferred
1909             path at the top of this function. */
1910 0 0         if (!PL_dirty) {
1911 0           self->callback_depth++;
1912 0           cancel_pending(aTHX_ self, err_disconnected);
1913 0           cancel_waiting(aTHX_ self, err_disconnected);
1914 0           self->callback_depth--;
1915             } else {
1916             /* Global destruction: free entries without calling Perl */
1917 0 0         while (!ngx_queue_empty(&self->cb_queue)) {
1918 0           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
1919 0           ev_mc_cb_t *cbt = ngx_queue_data(q, ev_mc_cb_t, queue);
1920 0           ngx_queue_remove(q);
1921 0           cleanup_cbt(aTHX_ cbt);
1922             }
1923 0 0         while (!ngx_queue_empty(&self->wait_queue)) {
1924 0           ngx_queue_t *q = ngx_queue_head(&self->wait_queue);
1925 0           ev_mc_wait_t *wt = ngx_queue_data(q, ev_mc_wait_t, queue);
1926 0           ngx_queue_remove(q);
1927 0           cleanup_wait(aTHX_ wt);
1928             }
1929             }
1930              
1931 0           self->magic = MC_MAGIC_FREED;
1932              
1933 0 0         CLEAR_HANDLER(self->on_error);
1934 0 0         CLEAR_HANDLER(self->on_connect);
1935 0 0         CLEAR_HANDLER(self->on_disconnect);
1936              
1937 0           Safefree(self->host);
1938 0           Safefree(self->path);
1939 0           Safefree(self->username);
1940 0           Safefree(self->password);
1941 0           Safefree(self->rbuf);
1942 0           Safefree(self->wbuf);
1943              
1944 0           Safefree(self);
1945             }
1946              
1947             void
1948             connect(EV::Memcached self, const char *host, int port = 11211)
1949             CODE:
1950             {
1951 0 0         if (self->connected || self->connecting)
    0          
1952 0           croak("already connected");
1953              
1954             /* An auto-reconnect timer may be pending; cancel it to avoid a
1955             second start_connect when the timer fires. */
1956 0           stop_reconnect_timer(self);
1957              
1958 0           Safefree(self->host);
1959 0           self->host = savepv(host);
1960 0           self->port = port;
1961 0           Safefree(self->path); self->path = NULL;
1962 0           self->intentional_disconnect = 0;
1963              
1964 0           start_connect(aTHX_ self);
1965             }
1966              
1967             void
1968             connect_unix(EV::Memcached self, const char *path)
1969             CODE:
1970             {
1971 0 0         if (self->connected || self->connecting)
    0          
1972 0           croak("already connected");
1973              
1974 0           stop_reconnect_timer(self);
1975              
1976 0           Safefree(self->path);
1977 0           self->path = savepv(path);
1978 0           Safefree(self->host); self->host = NULL;
1979 0           self->intentional_disconnect = 0;
1980              
1981 0           start_connect(aTHX_ self);
1982             }
1983              
1984             void
1985             disconnect(EV::Memcached self)
1986             CODE:
1987             {
1988 0           self->intentional_disconnect = 1;
1989              
1990 0           stop_reconnect_timer(self);
1991              
1992             /* Pin the object across pending-callback dispatch so that an
1993             `undef $mc` from a callback defers DESTROY rather than freeing
1994             us mid-call; check_destroyed below handles the deferred free. */
1995 0           self->callback_depth++;
1996 0 0         if (self->connected || self->connecting) {
    0          
1997 0           handle_disconnect(aTHX_ self, NULL);
1998             } else {
1999 0           cancel_waiting(aTHX_ self, err_disconnected);
2000             }
2001 0           self->callback_depth--;
2002 0           check_destroyed(self);
2003             }
2004              
2005             int
2006             is_connected(EV::Memcached self)
2007             CODE:
2008 0 0         RETVAL = self->connected || self->connecting;
    0          
    0          
2009             OUTPUT:
2010             RETVAL
2011              
2012             SV *
2013             on_error(EV::Memcached self, ...)
2014             CODE:
2015             {
2016 0 0         if (items > 1) {
2017 0 0         CLEAR_HANDLER(self->on_error);
2018 0 0         if (SvOK(ST(1)) && SvROK(ST(1)) && SvTYPE(SvRV(ST(1))) == SVt_PVCV) {
    0          
    0          
2019 0           self->on_error = newSVsv(ST(1));
2020             }
2021             }
2022 0 0         RETVAL = self->on_error ? newSVsv(self->on_error) : &PL_sv_undef;
2023             }
2024             OUTPUT:
2025             RETVAL
2026              
2027             SV *
2028             on_connect(EV::Memcached self, ...)
2029             CODE:
2030             {
2031 0 0         if (items > 1) {
2032 0 0         CLEAR_HANDLER(self->on_connect);
2033 0 0         if (SvOK(ST(1)) && SvROK(ST(1)) && SvTYPE(SvRV(ST(1))) == SVt_PVCV) {
    0          
    0          
2034 0           self->on_connect = newSVsv(ST(1));
2035             }
2036             }
2037 0 0         RETVAL = self->on_connect ? newSVsv(self->on_connect) : &PL_sv_undef;
2038             }
2039             OUTPUT:
2040             RETVAL
2041              
2042             SV *
2043             on_disconnect(EV::Memcached self, ...)
2044             CODE:
2045             {
2046 0 0         if (items > 1) {
2047 0 0         CLEAR_HANDLER(self->on_disconnect);
2048 0 0         if (SvOK(ST(1)) && SvROK(ST(1)) && SvTYPE(SvRV(ST(1))) == SVt_PVCV) {
    0          
    0          
2049 0           self->on_disconnect = newSVsv(ST(1));
2050             }
2051             }
2052 0 0         RETVAL = self->on_disconnect ? newSVsv(self->on_disconnect) : &PL_sv_undef;
2053             }
2054             OUTPUT:
2055             RETVAL
2056              
2057             void
2058             set(EV::Memcached self, SV *key_sv, SV *value_sv, ...)
2059             ALIAS:
2060             add = 1
2061             replace = 2
2062             CODE:
2063             {
2064             static const uint8_t opcodes[] = { MC_OP_SET, MC_OP_ADD, MC_OP_REPLACE };
2065 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2066              
2067             STRLEN key_len, value_len;
2068 0           const char *key = SvPV(key_sv, key_len);
2069 0           const char *value = SvPV(value_sv, value_len);
2070              
2071 0           int extra = items - 3;
2072 0           SV *cb = NULL;
2073 0 0         if (extra > 0 && SvROK(ST(items-1)) && SvTYPE(SvRV(ST(items-1))) == SVt_PVCV) {
    0          
    0          
2074 0           cb = ST(items-1);
2075 0           extra--;
2076             }
2077 0 0         UV expiry = extra > 0 ? SvUV(ST(3)) : 0;
2078 0 0         UV flags = extra > 1 ? SvUV(ST(4)) : 0;
2079              
2080             char extras[8];
2081 0           mc_write_u32(extras, (uint32_t)flags);
2082 0           mc_write_u32(extras + 4, (uint32_t)expiry);
2083              
2084             /* SET fire-and-forget uses SETQ (quiet) — server suppresses response.
2085             * ADD/REPLACE can fail, so they always use normal opcodes. */
2086 0 0         if (!cb && ix == 0) {
    0          
2087 0           mc_fire_and_forget(aTHX_ self, MC_OP_SETQ, key, key_len, value, value_len,
2088             extras, 8, 0);
2089             } else {
2090 0           mc_enqueue_cmd(aTHX_ self, opcodes[ix], key, key_len, value, value_len,
2091             extras, 8, 0, CB_CMD_STORE, 0, cb);
2092             }
2093             }
2094              
2095             void
2096             cas(EV::Memcached self, SV *key_sv, SV *value_sv, SV *cas_sv, ...)
2097             CODE:
2098             {
2099 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2100              
2101             STRLEN key_len, value_len;
2102 0           const char *key = SvPV(key_sv, key_len);
2103 0           const char *value = SvPV(value_sv, value_len);
2104 0           uint64_t cas_val = SvUV(cas_sv);
2105              
2106 0           int extra = items - 4;
2107 0           SV *cb = NULL;
2108 0 0         if (extra > 0 && SvROK(ST(items-1)) && SvTYPE(SvRV(ST(items-1))) == SVt_PVCV) {
    0          
    0          
2109 0           cb = ST(items-1);
2110 0           extra--;
2111             }
2112 0 0         UV expiry = extra > 0 ? SvUV(ST(4)) : 0;
2113 0 0         UV flags = extra > 1 ? SvUV(ST(5)) : 0;
2114              
2115             char extras[8];
2116 0           mc_write_u32(extras, (uint32_t)flags);
2117 0           mc_write_u32(extras + 4, (uint32_t)expiry);
2118              
2119 0           mc_enqueue_cmd(aTHX_ self, MC_OP_SET, key, key_len, value, value_len,
2120             extras, 8, cas_val, CB_CMD_STORE, 0, cb);
2121             }
2122              
2123             void
2124             get(EV::Memcached self, SV *key_sv, SV *cb_sv = &PL_sv_undef)
2125             ALIAS:
2126             gets = 1
2127             CODE:
2128             {
2129 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2130              
2131             STRLEN key_len;
2132 0           const char *key = SvPV(key_sv, key_len);
2133 0 0         SV *cb = (SvOK(cb_sv) && SvROK(cb_sv)) ? cb_sv : NULL;
    0          
2134              
2135 0           mc_enqueue_cmd(aTHX_ self, MC_OP_GET, key, key_len, NULL, 0,
2136             NULL, 0, 0, ix == 0 ? CB_CMD_GET : CB_CMD_GETS, 0, cb);
2137             }
2138              
2139             void
2140             delete(EV::Memcached self, SV *key_sv, SV *cb_sv = &PL_sv_undef)
2141             CODE:
2142             {
2143 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2144              
2145             STRLEN key_len;
2146 0           const char *key = SvPV(key_sv, key_len);
2147 0 0         SV *cb = (SvOK(cb_sv) && SvROK(cb_sv)) ? cb_sv : NULL;
    0          
2148              
2149 0           mc_enqueue_cmd(aTHX_ self, MC_OP_DELETE, key, key_len, NULL, 0,
2150             NULL, 0, 0, CB_CMD_DELETE, 0, cb);
2151             }
2152              
2153             void
2154             incr(EV::Memcached self, SV *key_sv, ...)
2155             ALIAS:
2156             decr = 1
2157             CODE:
2158             {
2159 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2160              
2161             STRLEN key_len;
2162 0           const char *key = SvPV(key_sv, key_len);
2163              
2164 0           int extra = items - 2;
2165 0           SV *cb = NULL;
2166 0 0         if (extra > 0 && SvROK(ST(items-1)) && SvTYPE(SvRV(ST(items-1))) == SVt_PVCV) {
    0          
    0          
2167 0           cb = ST(items-1);
2168 0           extra--;
2169             }
2170 0 0         UV delta = extra > 0 ? SvUV(ST(2)) : 1;
2171 0 0         UV initial = extra > 1 ? SvUV(ST(3)) : 0;
2172 0 0         UV expiry = extra > 2 ? SvUV(ST(4)) : 0xFFFFFFFF;
2173              
2174             char extras[20];
2175 0           mc_write_u64(extras, (uint64_t)delta);
2176 0           mc_write_u64(extras + 8, (uint64_t)initial);
2177 0           mc_write_u32(extras + 16, (uint32_t)expiry);
2178              
2179 0 0         mc_enqueue_cmd(aTHX_ self, ix == 0 ? MC_OP_INCR : MC_OP_DECR,
2180             key, key_len, NULL, 0, extras, 20, 0, CB_CMD_ARITH, 0, cb);
2181             }
2182              
2183             void
2184             append(EV::Memcached self, SV *key_sv, SV *value_sv, SV *cb_sv = &PL_sv_undef)
2185             ALIAS:
2186             prepend = 1
2187             CODE:
2188             {
2189 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2190              
2191             STRLEN key_len, value_len;
2192 0           const char *key = SvPV(key_sv, key_len);
2193 0           const char *value = SvPV(value_sv, value_len);
2194 0 0         SV *cb = (SvOK(cb_sv) && SvROK(cb_sv)) ? cb_sv : NULL;
    0          
2195              
2196 0 0         mc_enqueue_cmd(aTHX_ self, ix == 0 ? MC_OP_APPEND : MC_OP_PREPEND,
2197             key, key_len, value, value_len, NULL, 0, 0, CB_CMD_STORE, 0, cb);
2198             }
2199              
2200             void
2201             touch(EV::Memcached self, SV *key_sv, UV expiry, SV *cb_sv = &PL_sv_undef)
2202             ALIAS:
2203             gat = 1
2204             gats = 2
2205             CODE:
2206             {
2207 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2208              
2209             STRLEN key_len;
2210 0           const char *key = SvPV(key_sv, key_len);
2211 0 0         SV *cb = (SvOK(cb_sv) && SvROK(cb_sv)) ? cb_sv : NULL;
    0          
2212              
2213             char extras[4];
2214 0           mc_write_u32(extras, (uint32_t)expiry);
2215              
2216             static const int cmds[] = { CB_CMD_TOUCH, CB_CMD_GAT, CB_CMD_GATS };
2217             static const uint8_t ops[] = { MC_OP_TOUCH, MC_OP_GAT, MC_OP_GAT };
2218              
2219 0           mc_enqueue_cmd(aTHX_ self, ops[ix], key, key_len, NULL, 0,
2220 0           extras, 4, 0, cmds[ix], 0, cb);
2221             }
2222              
2223             void
2224             flush(EV::Memcached self, ...)
2225             CODE:
2226             {
2227 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2228              
2229 0           int extra = items - 1;
2230 0           SV *cb = NULL;
2231 0 0         if (extra > 0 && SvROK(ST(items-1)) && SvTYPE(SvRV(ST(items-1))) == SVt_PVCV) {
    0          
    0          
2232 0           cb = ST(items-1);
2233 0           extra--;
2234             }
2235 0 0         UV expiry = extra > 0 ? SvUV(ST(1)) : 0;
2236              
2237             char extras[4];
2238 0           const char *xp = NULL;
2239 0           uint8_t xl = 0;
2240 0 0         if (expiry > 0) {
2241 0           mc_write_u32(extras, (uint32_t)expiry);
2242 0           xp = extras;
2243 0           xl = 4;
2244             }
2245              
2246 0 0         if (cb)
2247 0           mc_enqueue_cmd(aTHX_ self, MC_OP_FLUSH, NULL, 0, NULL, 0,
2248             xp, xl, 0, CB_CMD_FLUSH, 0, cb);
2249             else
2250 0           mc_fire_and_forget(aTHX_ self, MC_OP_FLUSHQ, NULL, 0, NULL, 0,
2251             xp, xl, 0);
2252             }
2253              
2254             void
2255             version(EV::Memcached self, SV *cb_sv = &PL_sv_undef)
2256             ALIAS:
2257             noop = 1
2258             quit = 2
2259             CODE:
2260             {
2261 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2262              
2263 0 0         SV *cb = (SvOK(cb_sv) && SvROK(cb_sv)) ? cb_sv : NULL;
    0          
2264              
2265             static const uint8_t ops[] = { MC_OP_VERSION, MC_OP_NOOP, MC_OP_QUIT };
2266             static const int cmds[] = { CB_CMD_VERSION, CB_CMD_NOOP, CB_CMD_QUIT };
2267              
2268 0           mc_enqueue_cmd(aTHX_ self, ops[ix], NULL, 0, NULL, 0,
2269 0           NULL, 0, 0, cmds[ix], 0, cb);
2270             }
2271              
2272             void
2273             stats(EV::Memcached self, ...)
2274             CODE:
2275             {
2276 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2277              
2278 0           int extra = items - 1;
2279 0           SV *cb = NULL;
2280 0 0         if (extra > 0 && SvROK(ST(items-1)) && SvTYPE(SvRV(ST(items-1))) == SVt_PVCV) {
    0          
    0          
2281 0           cb = ST(items-1);
2282 0           extra--;
2283             }
2284              
2285 0           STRLEN name_len = 0;
2286 0           const char *name = NULL;
2287 0 0         if (extra > 0 && SvOK(ST(1))) {
    0          
2288 0           name = SvPV(ST(1), name_len);
2289             }
2290              
2291 0           mc_enqueue_cmd(aTHX_ self, MC_OP_STAT, name, name_len, NULL, 0,
2292             NULL, 0, 0, CB_CMD_STATS, 0, cb);
2293             }
2294              
2295             void
2296             mget(EV::Memcached self, SV *keys_sv, SV *cb_sv = &PL_sv_undef)
2297             ALIAS:
2298             mgets = 1
2299             CODE:
2300             {
2301 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2302              
2303 0 0         if (!SvROK(keys_sv) || SvTYPE(SvRV(keys_sv)) != SVt_PVAV)
    0          
2304 0           croak("mget requires an array reference");
2305              
2306 0 0         SV *cb = (SvOK(cb_sv) && SvROK(cb_sv)) ? cb_sv : NULL;
    0          
2307 0           AV *keys_av = (AV *)SvRV(keys_sv);
2308              
2309 0           mc_enqueue_mget(aTHX_ self, keys_av, cb, ix);
2310             }
2311              
2312             void
2313             sasl_auth(EV::Memcached self, SV *user_sv, SV *pass_sv, SV *cb_sv = &PL_sv_undef)
2314             CODE:
2315             {
2316 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2317              
2318             STRLEN ulen, plen;
2319 0           const char *user = SvPV(user_sv, ulen);
2320 0           const char *pass = SvPV(pass_sv, plen);
2321 0 0         SV *cb = (SvOK(cb_sv) && SvROK(cb_sv)) ? cb_sv : NULL;
    0          
2322              
2323 0           size_t vlen = 1 + ulen + 1 + plen;
2324             char *authdata;
2325 0           Newx(authdata, vlen, char);
2326 0           authdata[0] = '\0';
2327 0           memcpy(authdata + 1, user, ulen);
2328 0           authdata[1 + ulen] = '\0';
2329 0           memcpy(authdata + 2 + ulen, pass, plen);
2330              
2331 0           mc_enqueue_cmd(aTHX_ self, MC_OP_SASL_AUTH, "PLAIN", 5, authdata, vlen,
2332             NULL, 0, 0, CB_CMD_SASL_AUTH, 0, cb);
2333 0           Safefree(authdata);
2334             }
2335              
2336             void
2337             sasl_list_mechs(EV::Memcached self, SV *cb_sv = &PL_sv_undef)
2338             CODE:
2339             {
2340 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2341              
2342 0 0         SV *cb = (SvOK(cb_sv) && SvROK(cb_sv)) ? cb_sv : NULL;
    0          
2343              
2344 0           mc_enqueue_cmd(aTHX_ self, MC_OP_SASL_LIST_MECHS, NULL, 0, NULL, 0,
2345             NULL, 0, 0, CB_CMD_SASL_LIST, 0, cb);
2346             }
2347              
2348             int
2349             pending_count(EV::Memcached self)
2350             CODE:
2351 0 0         RETVAL = self->pending_count;
2352             OUTPUT:
2353             RETVAL
2354              
2355             int
2356             waiting_count(EV::Memcached self)
2357             CODE:
2358 0 0         RETVAL = self->waiting_count;
2359             OUTPUT:
2360             RETVAL
2361              
2362             int
2363             max_pending(EV::Memcached self, ...)
2364             CODE:
2365             {
2366 0 0         if (items > 1) {
2367 0           int old = self->max_pending;
2368 0           self->max_pending = SvIV(ST(1));
2369 0 0         if (self->max_pending < 0) self->max_pending = 0;
2370             /* If limit increased, drain waiting queue */
2371 0 0         if (self->connected && self->max_pending > old)
    0          
2372 0           send_next_waiting(aTHX_ self);
2373             }
2374 0 0         RETVAL = self->max_pending;
2375             }
2376             OUTPUT:
2377             RETVAL
2378              
2379             int
2380             waiting_timeout(EV::Memcached self, ...)
2381             CODE:
2382             {
2383 0 0         if (items > 1) {
2384 0           self->waiting_timeout_ms = SvIV(ST(1));
2385 0 0         if (self->waiting_timeout_ms < 0) self->waiting_timeout_ms = 0;
2386             }
2387 0 0         RETVAL = self->waiting_timeout_ms;
2388             }
2389             OUTPUT:
2390             RETVAL
2391              
2392             int
2393             resume_waiting_on_reconnect(EV::Memcached self, ...)
2394             CODE:
2395             {
2396 0 0         if (items > 1) {
2397 0           self->resume_waiting_on_reconnect = SvTRUE(ST(1)) ? 1 : 0;
2398             }
2399 0 0         RETVAL = self->resume_waiting_on_reconnect;
2400             }
2401             OUTPUT:
2402             RETVAL
2403              
2404             int
2405             connect_timeout(EV::Memcached self, ...)
2406             CODE:
2407             {
2408 0 0         if (items > 1) {
2409 0           self->connect_timeout_ms = SvIV(ST(1));
2410 0 0         if (self->connect_timeout_ms < 0) self->connect_timeout_ms = 0;
2411             }
2412 0 0         RETVAL = self->connect_timeout_ms;
2413             }
2414             OUTPUT:
2415             RETVAL
2416              
2417             int
2418             command_timeout(EV::Memcached self, ...)
2419             CODE:
2420             {
2421 0 0         if (items > 1) {
2422 0           self->command_timeout_ms = SvIV(ST(1));
2423 0 0         if (self->command_timeout_ms < 0) self->command_timeout_ms = 0;
2424 0 0         if (self->command_timeout_ms == 0)
2425 0           disarm_cmd_timer(self);
2426 0 0         else if (!ngx_queue_empty(&self->cb_queue))
2427 0           arm_cmd_timer(self);
2428             }
2429 0 0         RETVAL = self->command_timeout_ms;
2430             }
2431             OUTPUT:
2432             RETVAL
2433              
2434             void
2435             reconnect(EV::Memcached self, int enable, int delay_ms = 1000, int max_attempts = 0)
2436             CODE:
2437             {
2438 0           self->reconnect = enable ? 1 : 0;
2439 0           self->reconnect_delay_ms = delay_ms >= 0 ? delay_ms : 0;
2440 0           self->max_reconnect_attempts = max_attempts >= 0 ? max_attempts : 0;
2441 0 0         if (!enable) {
2442 0           self->reconnect_attempts = 0;
2443 0           stop_reconnect_timer(self);
2444             }
2445             }
2446              
2447             int
2448             reconnect_enabled(EV::Memcached self)
2449             CODE:
2450 0 0         RETVAL = self->reconnect;
2451             OUTPUT:
2452             RETVAL
2453              
2454             int
2455             priority(EV::Memcached self, ...)
2456             CODE:
2457             {
2458 0 0         if (items > 1) {
2459 0           self->priority = SvIV(ST(1));
2460 0 0         if (self->priority < -2) self->priority = -2;
2461 0 0         if (self->priority > 2) self->priority = 2;
2462             /* Apply to active watchers */
2463 0 0         if (self->reading) {
2464 0           ev_io_stop(self->loop, &self->rio);
2465 0           ev_set_priority(&self->rio, self->priority);
2466 0           ev_io_start(self->loop, &self->rio);
2467             } else {
2468 0           ev_set_priority(&self->rio, self->priority);
2469             }
2470 0 0         if (self->writing) {
2471 0           ev_io_stop(self->loop, &self->wio);
2472 0           ev_set_priority(&self->wio, self->priority);
2473 0           ev_io_start(self->loop, &self->wio);
2474             } else {
2475 0           ev_set_priority(&self->wio, self->priority);
2476             }
2477             }
2478 0 0         RETVAL = self->priority;
2479             }
2480             OUTPUT:
2481             RETVAL
2482              
2483             int
2484             keepalive(EV::Memcached self, ...)
2485             CODE:
2486             {
2487 0 0         if (items > 1) {
2488 0           self->keepalive = SvIV(ST(1));
2489 0 0         if (self->keepalive < 0) self->keepalive = 0;
2490 0 0         if (self->connected && self->fd >= 0)
    0          
2491 0           apply_keepalive(self);
2492             }
2493 0 0         RETVAL = self->keepalive;
2494             }
2495             OUTPUT:
2496             RETVAL
2497              
2498             void
2499             skip_pending(EV::Memcached self)
2500             CODE:
2501             {
2502 0           self->callback_depth++;
2503 0           cancel_pending_impl(aTHX_ self, err_skipped, 1);
2504 0           self->callback_depth--;
2505 0           check_destroyed(self);
2506             }
2507              
2508             void
2509             skip_waiting(EV::Memcached self)
2510             CODE:
2511             {
2512 0           self->callback_depth++;
2513 0           cancel_waiting(aTHX_ self, err_skipped);
2514 0           self->callback_depth--;
2515 0           check_destroyed(self);
2516             }