File Coverage

Memcached.xs
Criterion Covered Total %
statement 7 1246 0.5
branch 3 940 0.3
condition n/a
subroutine n/a
pod n/a
total 10 2186 0.4


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include "ppport.h"
5              
6             #include "EVAPI.h"
7             #include "ngx-queue.h"
8              
9             #include
10             #include
11             #include
12             #include
13             #include
14             #include
15             #include
16             #include
17             #include
18             #include
19              
20             /* ================================================================
21             * Constants
22             * ================================================================ */
23              
24             #define MC_MAGIC_ALIVE 0xCAFEBEEF
25             #define MC_MAGIC_FREED 0xDEADCAFE
26              
27             #define MC_REQ_MAGIC 0x80
28             #define MC_RES_MAGIC 0x81
29             #define MC_HEADER_SIZE 24
30              
31             /* Opcodes */
32             #define MC_OP_GET 0x00
33             #define MC_OP_SET 0x01
34             #define MC_OP_ADD 0x02
35             #define MC_OP_REPLACE 0x03
36             #define MC_OP_DELETE 0x04
37             #define MC_OP_INCR 0x05
38             #define MC_OP_DECR 0x06
39             #define MC_OP_QUIT 0x07
40             #define MC_OP_FLUSH 0x08
41             #define MC_OP_NOOP 0x0A
42             #define MC_OP_VERSION 0x0B
43             #define MC_OP_GETKQ 0x0D
44             #define MC_OP_APPEND 0x0E
45             #define MC_OP_PREPEND 0x0F
46             #define MC_OP_STAT 0x10
47             #define MC_OP_SETQ 0x11
48             #define MC_OP_FLUSHQ 0x18
49             #define MC_OP_TOUCH 0x1C
50             #define MC_OP_GAT 0x1D
51             #define MC_OP_SASL_LIST_MECHS 0x20
52             #define MC_OP_SASL_AUTH 0x21
53             #define MC_OP_SASL_STEP 0x22
54              
55             /* Status codes */
56             #define MC_STATUS_OK 0x0000
57             #define MC_STATUS_KEY_NOT_FOUND 0x0001
58             #define MC_STATUS_KEY_EXISTS 0x0002
59             #define MC_STATUS_VALUE_TOO_LARGE 0x0003
60             #define MC_STATUS_INVALID_ARGS 0x0004
61             #define MC_STATUS_NOT_STORED 0x0005
62             #define MC_STATUS_NON_NUMERIC 0x0006
63             #define MC_STATUS_AUTH_ERROR 0x0020
64             #define MC_STATUS_AUTH_CONTINUE 0x0021
65             #define MC_STATUS_UNKNOWN_CMD 0x0081
66             #define MC_STATUS_OUT_OF_MEMORY 0x0082
67              
68             #define BUF_INIT_SIZE 16384
69             #define MC_MAX_KEY_LEN 250
70              
71             /* Callback command types */
72             #define CB_CMD_GET 0 /* get - return value only */
73             #define CB_CMD_GETS 1 /* gets - return {value, flags, cas} */
74             #define CB_CMD_STORE 2 /* set/add/replace/append/prepend - return 1 */
75             #define CB_CMD_DELETE 3 /* delete - return 1 */
76             #define CB_CMD_ARITH 4 /* incr/decr - return new value */
77             #define CB_CMD_TOUCH 5 /* touch - return 1 */
78             #define CB_CMD_GAT 6 /* gat - return value */
79             #define CB_CMD_GATS 7 /* gats - return {value, flags, cas} */
80             #define CB_CMD_VERSION 8 /* version - return string */
81             #define CB_CMD_NOOP 9 /* noop - return 1 */
82             #define CB_CMD_FLUSH 10 /* flush - return 1 */
83             #define CB_CMD_STATS 11 /* stats - accumulate, return hash */
84             #define CB_CMD_MGET_ENTRY 12 /* individual GETKQ in mget */
85             #define CB_CMD_MGET_FENCE 13 /* NOOP fence in mget */
86             #define CB_CMD_QUIT 14 /* quit */
87             #define CB_CMD_MGETS_ENTRY 15 /* individual GETKQ in mgets (full info) */
88             #define CB_CMD_MGETS_FENCE 16 /* NOOP fence in mgets */
89             #define CB_CMD_SASL_LIST 17 /* sasl_list_mechs - return string */
90             #define CB_CMD_SASL_AUTH 18 /* sasl_auth - return 1 on success */
91              
92             #define CLEAR_HANDLER(field) \
93             do { if (NULL != (field)) { SvREFCNT_dec(field); (field) = NULL; } } while(0)
94              
95             #define MC_CROAK_UNLESS_CONNECTED(self) \
96             do { \
97             if (!(self)->connected && !(self)->connecting && \
98             !((self)->reconnect && (self)->reconnect_timer_active)) \
99             croak("not connected"); \
100             } while(0)
101              
102             /* ================================================================
103             * Type declarations
104             * ================================================================ */
105              
106             typedef struct ev_mc_s ev_mc_t;
107             typedef struct ev_mc_cb_s ev_mc_cb_t;
108             typedef struct ev_mc_wait_s ev_mc_wait_t;
109              
110             typedef ev_mc_t* EV__Memcached;
111             typedef struct ev_loop* EV__Loop;
112              
113             /* ================================================================
114             * Data structures
115             * ================================================================ */
116              
117             struct ev_mc_cb_s {
118             SV *cb; /* Perl callback (NULL for fire-and-forget / mget entries) */
119             ngx_queue_t queue;
120             uint32_t opaque;
121             int cmd; /* CB_CMD_* */
122             int quiet; /* 1 = quiet variant, may not get response */
123             int counted; /* 1 = contributes to pending_count */
124             int skipped;
125             HV *stats_hv; /* for CB_CMD_STATS: accumulated hash */
126             HV *mget_results; /* for CB_CMD_MGET_ENTRY: shared hash (borrowed) */
127             /* for CB_CMD_MGET_FENCE: owned hash */
128             };
129              
130             struct ev_mc_wait_s {
131             char *packet;
132             size_t packet_len;
133             SV *cb;
134             int cmd;
135             int quiet;
136             uint32_t opaque;
137             HV *stats_hv;
138             HV *mget_results; /* borrowed for MGET_ENTRY, owned for MGET_FENCE */
139             int counted;
140             int no_response; /* 1: fire-and-forget, drain to wbuf only, no cb_queue entry */
141             ngx_queue_t queue;
142             ev_tstamp queued_at;
143             };
144              
145             struct ev_mc_s {
146             unsigned int magic;
147             struct ev_loop *loop;
148             int fd;
149             int connected;
150             int connecting;
151              
152             /* IO watchers */
153             ev_io rio, wio;
154             int reading, writing;
155              
156             /* Buffers */
157             char *rbuf;
158             size_t rbuf_len, rbuf_cap;
159             char *wbuf;
160             size_t wbuf_len, wbuf_off, wbuf_cap;
161              
162             /* Callbacks */
163             SV *on_error;
164             SV *on_connect;
165             SV *on_disconnect;
166              
167             /* Command queue */
168             ngx_queue_t cb_queue;
169             ngx_queue_t wait_queue;
170             int pending_count;
171             int waiting_count;
172             int max_pending; /* 0 = unlimited */
173             uint32_t next_opaque;
174              
175             /* Reconnection */
176             char *host;
177             int port;
178             char *path;
179             int reconnect;
180             int reconnect_delay_ms;
181             int max_reconnect_attempts;
182             int reconnect_attempts;
183             ev_timer reconnect_timer;
184             int reconnect_timer_active;
185             int intentional_disconnect;
186             int resume_waiting_on_reconnect;
187              
188             /* Timeouts */
189             int connect_timeout_ms;
190             ev_timer connect_timer;
191             int connect_timer_active;
192             int command_timeout_ms;
193             ev_timer cmd_timer;
194             int cmd_timer_active;
195              
196             /* Flow control */
197             int waiting_timeout_ms;
198             ev_timer waiting_timer;
199             int waiting_timer_active;
200              
201             /* Safety */
202             int callback_depth;
203             int in_cb_cleanup;
204             int in_wait_cleanup;
205              
206             /* Options */
207             int priority;
208             int keepalive;
209              
210             /* SASL auth */
211             char *username;
212             char *password;
213             };
214              
215             /* ================================================================
216             * Shared error strings (initialized in BOOT)
217             * ================================================================ */
218              
219             static SV *err_skipped = NULL;
220             static SV *err_disconnected = NULL;
221             static SV *err_waiting_timeout = NULL;
222              
223             /* ================================================================
224             * Forward declarations
225             * ================================================================ */
226              
227             static void io_cb(EV_P_ ev_io *w, int revents);
228             static void reconnect_timer_cb(EV_P_ ev_timer *w, int revents);
229             static void waiting_timer_cb(EV_P_ ev_timer *w, int revents);
230             static void connect_timeout_cb(EV_P_ ev_timer *w, int revents);
231             static void cmd_timeout_cb(EV_P_ ev_timer *w, int revents);
232             static void arm_cmd_timer(ev_mc_t *self);
233             static void disarm_cmd_timer(ev_mc_t *self);
234             static uint32_t mc_enqueue_cmd(pTHX_ ev_mc_t *self,
235             uint8_t opcode, const char *key, STRLEN key_len,
236             const char *value, STRLEN value_len,
237             const char *extras, uint8_t extras_len,
238             uint64_t cas, int cmd, int quiet, SV *cb);
239             static void start_reading(ev_mc_t *self);
240             static void stop_reading(ev_mc_t *self);
241             static void start_writing(ev_mc_t *self);
242             static void stop_writing(ev_mc_t *self);
243             static void start_connect(pTHX_ ev_mc_t *self);
244             static void cleanup_connection(pTHX_ ev_mc_t *self);
245             static void emit_error(pTHX_ ev_mc_t *self, const char *msg);
246             static void handle_disconnect(pTHX_ ev_mc_t *self, const char *reason);
247             static void schedule_reconnect(pTHX_ ev_mc_t *self);
248             static void apply_keepalive(ev_mc_t *self);
249             static void report_connect_error(pTHX_ ev_mc_t *self, const char *errbuf);
250             static void finish_connect_success(pTHX_ ev_mc_t *self);
251             static void mc_send_sasl_auth(pTHX_ ev_mc_t *self, SV *cb);
252             static void stop_connect_timer(ev_mc_t *self);
253             static void stop_reconnect_timer(ev_mc_t *self);
254             static void stop_waiting_timer(ev_mc_t *self);
255             static void send_next_waiting(pTHX_ ev_mc_t *self);
256             static int check_destroyed(ev_mc_t *self);
257             static void cancel_pending(pTHX_ ev_mc_t *self, SV *err_sv);
258             static void cancel_waiting(pTHX_ ev_mc_t *self, SV *err_sv);
259              
260             /* ================================================================
261             * Binary protocol helpers (portable, no unaligned access)
262             * ================================================================ */
263              
264 0           static void mc_write_u16(char *buf, uint16_t val) {
265 0           val = htons(val);
266 0           memcpy(buf, &val, 2);
267 0           }
268              
269 0           static void mc_write_u32(char *buf, uint32_t val) {
270 0           val = htonl(val);
271 0           memcpy(buf, &val, 4);
272 0           }
273              
274 0           static void mc_write_u64(char *buf, uint64_t val) {
275 0           uint32_t hi = htonl((uint32_t)(val >> 32));
276 0           uint32_t lo = htonl((uint32_t)(val & 0xFFFFFFFF));
277 0           memcpy(buf, &hi, 4);
278 0           memcpy(buf + 4, &lo, 4);
279 0           }
280              
281 0           static uint16_t mc_read_u16(const char *buf) {
282             uint16_t val;
283 0           memcpy(&val, buf, 2);
284 0           return ntohs(val);
285             }
286              
287 0           static uint32_t mc_read_u32(const char *buf) {
288             uint32_t val;
289 0           memcpy(&val, buf, 4);
290 0           return ntohl(val);
291             }
292              
293 0           static uint64_t mc_read_u64(const char *buf) {
294             uint32_t hi, lo;
295 0           memcpy(&hi, buf, 4);
296 0           memcpy(&lo, buf + 4, 4);
297 0           return ((uint64_t)ntohl(hi) << 32) | ntohl(lo);
298             }
299              
300 0           static void mc_encode_header(char *buf, uint8_t opcode, uint16_t key_len,
301             uint8_t extras_len, uint32_t body_len, uint32_t opaque, uint64_t cas)
302             {
303 0           buf[0] = MC_REQ_MAGIC;
304 0           buf[1] = opcode;
305 0           mc_write_u16(buf + 2, key_len);
306 0           buf[4] = extras_len;
307 0           buf[5] = 0; /* data_type = raw */
308 0           mc_write_u16(buf + 6, 0); /* vbucket / reserved */
309 0           mc_write_u32(buf + 8, body_len);
310 0           mc_write_u32(buf + 12, opaque);
311 0           mc_write_u64(buf + 16, cas);
312 0           }
313              
314             /* ================================================================
315             * Status code to string
316             * ================================================================ */
317              
318 0           static const char* mc_status_str(uint16_t status) {
319 0           switch (status) {
320 0           case MC_STATUS_OK: return "OK";
321 0           case MC_STATUS_KEY_NOT_FOUND: return "NOT_FOUND";
322 0           case MC_STATUS_KEY_EXISTS: return "EXISTS";
323 0           case MC_STATUS_VALUE_TOO_LARGE: return "VALUE_TOO_LARGE";
324 0           case MC_STATUS_INVALID_ARGS: return "INVALID_ARGUMENTS";
325 0           case MC_STATUS_NOT_STORED: return "NOT_STORED";
326 0           case MC_STATUS_NON_NUMERIC: return "NON_NUMERIC_VALUE";
327 0           case MC_STATUS_AUTH_ERROR: return "AUTH_ERROR";
328 0           case MC_STATUS_AUTH_CONTINUE: return "AUTH_CONTINUE";
329 0           case MC_STATUS_UNKNOWN_CMD: return "UNKNOWN_COMMAND";
330 0           case MC_STATUS_OUT_OF_MEMORY: return "OUT_OF_MEMORY";
331 0           default: return "UNKNOWN_ERROR";
332             }
333             }
334              
335             /* ================================================================
336             * Buffer management
337             * ================================================================ */
338              
339 0           static void buf_ensure_write(ev_mc_t *self, size_t needed) {
340             /* Compact first: reclaim any already-sent prefix. The capacity check
341             below short-circuits the "compaction alone sufficed" case. */
342 0 0         if (self->wbuf_off > 0) {
343 0           size_t live = self->wbuf_len - self->wbuf_off;
344 0           memmove(self->wbuf, self->wbuf + self->wbuf_off, live);
345 0           self->wbuf_len = live;
346 0           self->wbuf_off = 0;
347             }
348 0           size_t total = self->wbuf_len + needed;
349 0 0         if (self->wbuf_cap >= total) return;
350 0 0         size_t new_cap = self->wbuf_cap ? self->wbuf_cap : BUF_INIT_SIZE;
351 0 0         while (new_cap < total) new_cap *= 2;
352 0           Renew(self->wbuf, new_cap, char);
353 0           self->wbuf_cap = new_cap;
354             }
355              
356 0           static void buf_ensure_read(ev_mc_t *self, size_t needed) {
357 0           size_t total = self->rbuf_len + needed;
358 0 0         if (self->rbuf_cap >= total) return;
359 0 0         size_t new_cap = self->rbuf_cap ? self->rbuf_cap : BUF_INIT_SIZE;
360 0 0         while (new_cap < total) new_cap *= 2;
361 0           Renew(self->rbuf, new_cap, char);
362 0           self->rbuf_cap = new_cap;
363             }
364              
365 0           static void buf_append_write(ev_mc_t *self, const char *data, size_t len) {
366 0           buf_ensure_write(self, len);
367 0           memcpy(self->wbuf + self->wbuf_len, data, len);
368 0           self->wbuf_len += len;
369 0           }
370              
371             /* ================================================================
372             * Watcher helpers
373             * ================================================================ */
374              
375 0           static void start_reading(ev_mc_t *self) {
376 0 0         if (!self->reading && self->fd >= 0) {
    0          
377 0           ev_io_start(self->loop, &self->rio);
378 0           self->reading = 1;
379             }
380 0           }
381              
382 0           static void stop_reading(ev_mc_t *self) {
383 0 0         if (self->reading) {
384 0           ev_io_stop(self->loop, &self->rio);
385 0           self->reading = 0;
386             }
387 0           }
388              
389 0           static void start_writing(ev_mc_t *self) {
390 0 0         if (!self->writing && self->fd >= 0) {
    0          
391 0           ev_io_start(self->loop, &self->wio);
392 0           self->writing = 1;
393             }
394 0           }
395              
396 0           static void stop_writing(ev_mc_t *self) {
397 0 0         if (self->writing) {
398 0           ev_io_stop(self->loop, &self->wio);
399 0           self->writing = 0;
400             }
401 0           }
402              
403 0           static int check_destroyed(ev_mc_t *self) {
404 0 0         if (self->magic == MC_MAGIC_FREED && self->callback_depth == 0) {
    0          
405 0           Safefree(self);
406 0           return 1;
407             }
408 0           return 0;
409             }
410              
411             /* ================================================================
412             * Callback invocation
413             * ================================================================ */
414              
415             /* Invoke Perl callback with two arguments.
416             * result and error are newly created SVs (refcount=1) that become mortal.
417             * Pass NULL for undef. */
418 0           static void invoke_cb(pTHX_ ev_mc_t *self, SV *cb, SV *result, SV *error) {
419 0 0         if (!cb) return;
420              
421 0           self->callback_depth++;
422              
423 0           dSP;
424 0           ENTER;
425 0           SAVETMPS;
426 0 0         PUSHMARK(SP);
427 0 0         EXTEND(SP, 2);
428 0 0         if (result)
429 0           mPUSHs(result);
430             else
431 0           PUSHs(&PL_sv_undef);
432 0 0         if (error)
433 0           mPUSHs(error);
434             else
435 0           PUSHs(&PL_sv_undef);
436 0           PUTBACK;
437 0           call_sv(cb, G_DISCARD | G_EVAL);
438 0 0         if (SvTRUE(ERRSV)) {
    0          
439 0 0         warn("EV::Memcached: callback error: %s", SvPV_nolen(ERRSV));
440 0 0         sv_setsv(ERRSV, &PL_sv_undef);
441             }
442 0 0         FREETMPS;
443 0           LEAVE;
444              
445 0           self->callback_depth--;
446             }
447              
448             /* Invoke a user handler with at most one mortal arg; catch exceptions
449             so a die in user code can't unwind through libev. */
450 0           static void invoke_handler(pTHX_ ev_mc_t *self, SV *cb, SV *arg, const char *label) {
451 0 0         if (NULL == cb) { if (arg) SvREFCNT_dec(arg); return; }
    0          
452              
453 0           self->callback_depth++;
454              
455 0           dSP;
456 0           ENTER;
457 0           SAVETMPS;
458 0 0         PUSHMARK(SP);
459 0 0         if (arg) XPUSHs(sv_2mortal(arg));
    0          
460 0           PUTBACK;
461 0           call_sv(cb, G_DISCARD | G_EVAL);
462 0 0         if (SvTRUE(ERRSV)) {
    0          
463 0 0         warn("EV::Memcached: %s callback error: %s", label, SvPV_nolen(ERRSV));
464 0 0         sv_setsv(ERRSV, &PL_sv_undef);
465             }
466 0 0         FREETMPS;
467 0           LEAVE;
468              
469 0           self->callback_depth--;
470             }
471              
472 0           static void emit_error(pTHX_ ev_mc_t *self, const char *msg) {
473 0           invoke_handler(aTHX_ self, self->on_error, newSVpv(msg, 0), "on_error");
474 0           }
475              
476 0           static void emit_connect(pTHX_ ev_mc_t *self) {
477 0           invoke_handler(aTHX_ self, self->on_connect, NULL, "on_connect");
478 0           }
479              
480 0           static void emit_disconnect(pTHX_ ev_mc_t *self) {
481 0           invoke_handler(aTHX_ self, self->on_disconnect, NULL, "on_disconnect");
482 0           }
483              
484 0           static void apply_keepalive(ev_mc_t *self) {
485 0 0         if (self->keepalive <= 0 || self->path) return;
    0          
486 0           int one = 1;
487 0           setsockopt(self->fd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
488             #ifdef TCP_KEEPIDLE
489 0           setsockopt(self->fd, IPPROTO_TCP, TCP_KEEPIDLE,
490 0           &self->keepalive, sizeof(self->keepalive));
491             #endif
492             }
493              
494             /* Common tail for synchronous connect-failure paths in start_connect:
495             emit error, run pending callbacks, and arm reconnect if configured.
496             Caller returns immediately after invoking. */
497 0           static void report_connect_error(pTHX_ ev_mc_t *self, const char *errbuf) {
498 0           self->callback_depth++;
499 0           emit_error(aTHX_ self, errbuf);
500 0           self->callback_depth--;
501 0 0         if (check_destroyed(self)) return;
502 0 0         if (!self->intentional_disconnect && self->reconnect)
    0          
503 0           schedule_reconnect(aTHX_ self);
504             }
505              
506             /* Shared post-connect-success path used by both on_connect_complete (after
507             async EINPROGRESS resolves) and start_connect (when connect(2) returns
508             immediately). Caller must already have set self->connected = 1 and
509             stopped/initialized the io watchers as required by its path. */
510 0           static void finish_connect_success(pTHX_ ev_mc_t *self) {
511 0           self->reconnect_attempts = 0;
512              
513 0           start_reading(self);
514 0           apply_keepalive(self);
515              
516 0           mc_send_sasl_auth(aTHX_ self, NULL);
517              
518 0           emit_connect(aTHX_ self);
519 0 0         if (check_destroyed(self)) return;
520              
521             /* Drain wait_queue immediately unless we are waiting for SASL_AUTH
522             to complete; the SASL response handler calls send_next_waiting. */
523 0 0         if (!self->username || !self->password)
    0          
524 0           send_next_waiting(aTHX_ self);
525             }
526              
527             /* ================================================================
528             * Callback entry management
529             * ================================================================ */
530              
531 0           static ev_mc_cb_t* alloc_cbt(void) {
532             ev_mc_cb_t *cbt;
533 0           Newxz(cbt, 1, ev_mc_cb_t);
534 0           return cbt;
535             }
536              
537 0           static void cleanup_cbt(pTHX_ ev_mc_cb_t *cbt) {
538 0 0         CLEAR_HANDLER(cbt->cb);
539 0 0         if (cbt->stats_hv) {
540 0           SvREFCNT_dec((SV*)cbt->stats_hv);
541 0           cbt->stats_hv = NULL;
542             }
543 0 0         if ((cbt->cmd == CB_CMD_MGET_FENCE || cbt->cmd == CB_CMD_MGETS_FENCE) && cbt->mget_results) {
    0          
    0          
544 0           SvREFCNT_dec((SV*)cbt->mget_results);
545 0           cbt->mget_results = NULL;
546             }
547             /* MGET_ENTRY has borrowed ref - don't decrement */
548 0           Safefree(cbt);
549 0           }
550              
551 0           static void cleanup_wait(pTHX_ ev_mc_wait_t *wt) {
552 0 0         CLEAR_HANDLER(wt->cb);
553 0 0         if (wt->packet) { Safefree(wt->packet); wt->packet = NULL; }
554 0 0         if (wt->stats_hv) {
555 0           SvREFCNT_dec((SV*)wt->stats_hv);
556 0           wt->stats_hv = NULL;
557             }
558 0 0         if ((wt->cmd == CB_CMD_MGET_FENCE || wt->cmd == CB_CMD_MGETS_FENCE) && wt->mget_results) {
    0          
    0          
559 0           SvREFCNT_dec((SV*)wt->mget_results);
560 0           wt->mget_results = NULL;
561             }
562 0           Safefree(wt);
563 0           }
564              
565             /* ================================================================
566             * Cancel pending/waiting commands (on disconnect)
567             * ================================================================ */
568              
569             /* mark_skipped: if true, set cbt->skipped=1 and invoke ALL callbacks (skip_pending behavior).
570             * if false, only invoke non-skipped callbacks (cancel_pending behavior).
571             *
572             * Callers MUST bump callback_depth before calling this (and call
573             * check_destroyed after) — DESTROY sets magic=FREED before running
574             * us, so an unconditional check_destroyed here would Safefree(self)
575             * mid-loop. The depth bump pins self until the caller is done. */
576 0           static void cancel_pending_impl(pTHX_ ev_mc_t *self, SV *err_sv, int mark_skipped) {
577 0 0         if (self->in_cb_cleanup) return;
578 0           self->in_cb_cleanup = 1;
579              
580 0 0         while (!ngx_queue_empty(&self->cb_queue)) {
581 0           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
582 0           ev_mc_cb_t *cbt = ngx_queue_data(q, ev_mc_cb_t, queue);
583 0           ngx_queue_remove(q);
584 0 0         if (cbt->counted) self->pending_count--;
585              
586 0           int already_skipped = cbt->skipped;
587 0 0         if (mark_skipped) cbt->skipped = 1;
588 0 0         if (cbt->cb && !already_skipped) {
    0          
589 0           invoke_cb(aTHX_ self, cbt->cb, NULL, newSVsv(err_sv));
590 0 0         if (self->magic == MC_MAGIC_FREED) {
591 0           cleanup_cbt(aTHX_ cbt);
592 0           self->in_cb_cleanup = 0;
593 0           return;
594             }
595             }
596 0           cleanup_cbt(aTHX_ cbt);
597             }
598 0           self->pending_count = 0;
599 0           self->in_cb_cleanup = 0;
600             }
601              
602 0           static void cancel_pending(pTHX_ ev_mc_t *self, SV *err_sv) {
603 0           cancel_pending_impl(aTHX_ self, err_sv, 0);
604 0           }
605              
606 0           static void cancel_waiting(pTHX_ ev_mc_t *self, SV *err_sv) {
607 0 0         if (self->in_wait_cleanup) return;
608 0           self->in_wait_cleanup = 1;
609              
610 0 0         while (!ngx_queue_empty(&self->wait_queue)) {
611 0           ngx_queue_t *q = ngx_queue_head(&self->wait_queue);
612 0           ev_mc_wait_t *wt = ngx_queue_data(q, ev_mc_wait_t, queue);
613 0           ngx_queue_remove(q);
614 0           self->waiting_count--;
615              
616 0 0         if (wt->cb) {
617 0           invoke_cb(aTHX_ self, wt->cb, NULL, newSVsv(err_sv));
618 0 0         if (self->magic == MC_MAGIC_FREED) {
619 0           cleanup_wait(aTHX_ wt);
620 0           self->in_wait_cleanup = 0;
621             /* Caller bumps depth before calling us (disconnect XS,
622             skip_waiting XS, or io_cb path), so check_destroyed
623             here would prematurely free during DESTROY where
624             magic is already FREED before we run. */
625 0           return;
626             }
627             }
628 0           cleanup_wait(aTHX_ wt);
629             }
630 0           self->waiting_count = 0;
631 0           self->in_wait_cleanup = 0;
632             }
633              
634             /* ================================================================
635             * Connection cleanup and disconnect handling
636             * ================================================================ */
637              
638 0           static void cleanup_connection(pTHX_ ev_mc_t *self) {
639 0           stop_reading(self);
640 0           stop_writing(self);
641              
642 0           stop_connect_timer(self);
643 0           disarm_cmd_timer(self);
644 0           stop_waiting_timer(self);
645              
646 0 0         if (self->fd >= 0) {
647 0           close(self->fd);
648 0           self->fd = -1;
649             }
650              
651 0           self->connected = 0;
652 0           self->connecting = 0;
653 0           self->rbuf_len = 0;
654 0           self->wbuf_len = 0;
655 0           self->wbuf_off = 0;
656 0           }
657              
658 0           static void handle_disconnect(pTHX_ ev_mc_t *self, const char *reason) {
659 0           int was_connected = self->connected;
660              
661 0           cleanup_connection(aTHX_ self);
662              
663             /* Cancel pending commands */
664 0           cancel_pending(aTHX_ self, err_disconnected);
665 0 0         if (self->magic == MC_MAGIC_FREED) return;
666              
667 0 0         if (!self->resume_waiting_on_reconnect) {
668 0           cancel_waiting(aTHX_ self, err_disconnected);
669 0 0         if (self->magic == MC_MAGIC_FREED) return;
670             }
671              
672 0 0         if (was_connected) {
673 0           emit_disconnect(aTHX_ self);
674 0 0         if (check_destroyed(self)) return;
675             }
676              
677 0 0         if (reason) {
678 0           emit_error(aTHX_ self, reason);
679 0 0         if (check_destroyed(self)) return;
680             }
681              
682 0 0         if (!self->intentional_disconnect && self->reconnect) {
    0          
683 0           schedule_reconnect(aTHX_ self);
684             }
685             }
686              
687             /* ================================================================
688             * Reconnection
689             * ================================================================ */
690              
691 0           static void schedule_reconnect(pTHX_ ev_mc_t *self) {
692 0 0         if (self->reconnect_timer_active) return;
693 0 0         if (self->max_reconnect_attempts > 0 &&
694 0 0         self->reconnect_attempts >= self->max_reconnect_attempts) {
695 0           emit_error(aTHX_ self, "max reconnect attempts reached");
696 0           return;
697             }
698              
699 0           self->reconnect_attempts++;
700              
701             /* Always defer through a timer (even with delay==0) so that an
702             immediate connect failure cannot recurse:
703             schedule_reconnect -> start_connect -> report_connect_error ->
704             schedule_reconnect -> ... blowing the C stack. */
705 0           ev_tstamp delay = (ev_tstamp)self->reconnect_delay_ms / 1000.0;
706 0 0         if (delay < 0) delay = 0;
707 0           ev_timer_init(&self->reconnect_timer, reconnect_timer_cb, delay, 0.0);
708 0           self->reconnect_timer.data = (void *)self;
709 0           ev_timer_start(self->loop, &self->reconnect_timer);
710 0           self->reconnect_timer_active = 1;
711             }
712              
713 0           static void arm_cmd_timer(ev_mc_t *self) {
714 0 0         if (self->command_timeout_ms <= 0) return;
715 0 0         if (!self->connected) return;
716 0           ev_tstamp timeout = (ev_tstamp)self->command_timeout_ms / 1000.0;
717 0 0         if (self->cmd_timer_active) {
718             /* Adjust repeat in-place; ev_timer_again restarts using the new value.
719             ev_timer_set is forbidden on an active watcher per libev docs. */
720 0           self->cmd_timer.repeat = timeout;
721 0           ev_timer_again(self->loop, &self->cmd_timer);
722             } else {
723 0           ev_timer_init(&self->cmd_timer, cmd_timeout_cb, timeout, timeout);
724 0           self->cmd_timer.data = (void *)self;
725 0           ev_timer_start(self->loop, &self->cmd_timer);
726 0           self->cmd_timer_active = 1;
727             }
728             }
729              
730 0           static void disarm_cmd_timer(ev_mc_t *self) {
731 0 0         if (self->cmd_timer_active) {
732 0           ev_timer_stop(self->loop, &self->cmd_timer);
733 0           self->cmd_timer_active = 0;
734             }
735 0           }
736              
737 0           static void stop_connect_timer(ev_mc_t *self) {
738 0 0         if (self->connect_timer_active) {
739 0           ev_timer_stop(self->loop, &self->connect_timer);
740 0           self->connect_timer_active = 0;
741             }
742 0           }
743              
744 0           static void stop_reconnect_timer(ev_mc_t *self) {
745 0 0         if (self->reconnect_timer_active) {
746 0           ev_timer_stop(self->loop, &self->reconnect_timer);
747 0           self->reconnect_timer_active = 0;
748             }
749 0           }
750              
751 0           static void stop_waiting_timer(ev_mc_t *self) {
752 0 0         if (self->waiting_timer_active) {
753 0           ev_timer_stop(self->loop, &self->waiting_timer);
754 0           self->waiting_timer_active = 0;
755             }
756 0           }
757              
758 0           static void cmd_timeout_cb(EV_P_ ev_timer *w, int revents) {
759 0           ev_mc_t *self = (ev_mc_t *)w->data;
760             (void)loop; (void)revents;
761              
762 0 0         if (self->magic != MC_MAGIC_ALIVE) return;
763              
764 0 0         if (ngx_queue_empty(&self->cb_queue)) {
765 0           disarm_cmd_timer(self);
766 0           return;
767             }
768              
769             /* Stop the repeating timer before disconnect */
770 0           disarm_cmd_timer(self);
771 0           self->callback_depth++;
772 0           handle_disconnect(aTHX_ self, "command timeout");
773 0           self->callback_depth--;
774 0           check_destroyed(self);
775             }
776              
777             /* Send SASL PLAIN auth. If cb is NULL, creates an internal callback
778             * that disconnects on auth failure. */
779 0           static void mc_send_sasl_auth(pTHX_ ev_mc_t *self, SV *cb) {
780 0 0         if (!self->username || !self->password) return;
    0          
781              
782 0           size_t ulen = strlen(self->username);
783 0           size_t plen = strlen(self->password);
784 0           size_t vlen = 1 + ulen + 1 + plen;
785             char *authdata;
786 0           Newx(authdata, vlen, char);
787 0           authdata[0] = '\0';
788 0           memcpy(authdata + 1, self->username, ulen);
789 0           authdata[1 + ulen] = '\0';
790 0           memcpy(authdata + 2 + ulen, self->password, plen);
791              
792 0           mc_enqueue_cmd(aTHX_ self, MC_OP_SASL_AUTH, "PLAIN", 5,
793             authdata, vlen, NULL, 0, 0, CB_CMD_SASL_AUTH, 0, cb);
794 0           Safefree(authdata);
795             }
796              
797 0           static void connect_timeout_cb(EV_P_ ev_timer *w, int revents) {
798 0           ev_mc_t *self = (ev_mc_t *)w->data;
799             (void)loop; (void)revents;
800              
801 0 0         if (self->magic != MC_MAGIC_ALIVE) return;
802              
803 0           self->connect_timer_active = 0;
804 0           self->callback_depth++;
805 0           handle_disconnect(aTHX_ self, "connect timeout");
806 0           self->callback_depth--;
807 0           check_destroyed(self);
808             }
809              
810 0           static void reconnect_timer_cb(EV_P_ ev_timer *w, int revents) {
811 0           ev_mc_t *self = (ev_mc_t *)w->data;
812             (void)loop; (void)revents;
813              
814 0 0         if (self->magic != MC_MAGIC_ALIVE) return;
815              
816 0           self->reconnect_timer_active = 0;
817 0           self->callback_depth++;
818 0           start_connect(aTHX_ self);
819 0           self->callback_depth--;
820 0           check_destroyed(self);
821             }
822              
823             /* ================================================================
824             * Flow control: waiting timer
825             * ================================================================ */
826              
827             static void schedule_waiting_timer(ev_mc_t *self);
828              
829 0           static void expire_waiting_commands(pTHX_ ev_mc_t *self) {
830 0           ev_tstamp now = ev_now(self->loop);
831 0           ev_tstamp timeout = (ev_tstamp)self->waiting_timeout_ms / 1000.0;
832              
833 0 0         while (!ngx_queue_empty(&self->wait_queue)) {
834 0           ngx_queue_t *q = ngx_queue_head(&self->wait_queue);
835 0           ev_mc_wait_t *wt = ngx_queue_data(q, ev_mc_wait_t, queue);
836              
837 0 0         if (wt->queued_at + timeout > now) break; /* not expired yet */
838              
839 0           ngx_queue_remove(q);
840 0           self->waiting_count--;
841              
842 0 0         if (wt->cb) {
843 0           invoke_cb(aTHX_ self, wt->cb, NULL, newSVsv(err_waiting_timeout));
844 0 0         if (self->magic == MC_MAGIC_FREED) {
845 0           cleanup_wait(aTHX_ wt);
846 0           return;
847             }
848             }
849 0           cleanup_wait(aTHX_ wt);
850             }
851             }
852              
853 0           static void waiting_timer_cb(EV_P_ ev_timer *w, int revents) {
854 0           ev_mc_t *self = (ev_mc_t *)w->data;
855             (void)loop; (void)revents;
856              
857 0 0         if (self->magic != MC_MAGIC_ALIVE) return;
858              
859 0           self->waiting_timer_active = 0;
860 0           self->callback_depth++;
861 0           expire_waiting_commands(aTHX_ self);
862 0           self->callback_depth--;
863 0 0         if (check_destroyed(self)) return;
864              
865 0 0         if (!ngx_queue_empty(&self->wait_queue) && self->waiting_timeout_ms > 0)
    0          
866 0           schedule_waiting_timer(self);
867             }
868              
869 0           static void schedule_waiting_timer(ev_mc_t *self) {
870 0 0         if (self->waiting_timer_active) return;
871 0 0         if (self->waiting_timeout_ms <= 0) return;
872 0 0         if (ngx_queue_empty(&self->wait_queue)) return;
873              
874 0           ngx_queue_t *q = ngx_queue_head(&self->wait_queue);
875 0           ev_mc_wait_t *wt = ngx_queue_data(q, ev_mc_wait_t, queue);
876 0           ev_tstamp timeout = (ev_tstamp)self->waiting_timeout_ms / 1000.0;
877 0           ev_tstamp delay = (wt->queued_at + timeout) - ev_now(self->loop);
878 0 0         if (delay < 0.0) delay = 0.0;
879              
880 0           ev_timer_init(&self->waiting_timer, waiting_timer_cb, delay, 0.0);
881 0           self->waiting_timer.data = (void *)self;
882 0           ev_timer_start(self->loop, &self->waiting_timer);
883 0           self->waiting_timer_active = 1;
884             }
885              
886             /* ================================================================
887             * Send next waiting command
888             * ================================================================ */
889              
890 0           static void send_next_waiting(pTHX_ ev_mc_t *self) {
891 0 0         while (!ngx_queue_empty(&self->wait_queue) && self->connected) {
    0          
892 0           ngx_queue_t *q = ngx_queue_head(&self->wait_queue);
893 0           ev_mc_wait_t *wt = ngx_queue_data(q, ev_mc_wait_t, queue);
894              
895             /* max_pending gates only counted entries; mc_enqueue_cmd uses the
896             same exception (|| !counted). Fire-and-forget (no_response) and
897             mget GETKQ entries are uncounted and must not be blocked. */
898 0 0         if (self->max_pending > 0 && self->pending_count >= self->max_pending
    0          
899 0 0         && !wt->no_response && wt->counted)
    0          
900 0           break;
901              
902 0           ngx_queue_remove(q);
903 0           self->waiting_count--;
904              
905             /* Append packet to write buffer */
906 0           buf_append_write(self, wt->packet, wt->packet_len);
907              
908 0 0         if (!wt->no_response) {
909 0           ev_mc_cb_t *cbt = alloc_cbt();
910 0           cbt->cb = wt->cb; wt->cb = NULL;
911 0           cbt->opaque = wt->opaque;
912 0           cbt->cmd = wt->cmd;
913 0           cbt->quiet = wt->quiet;
914 0           cbt->counted = wt->counted;
915 0           cbt->stats_hv = wt->stats_hv; wt->stats_hv = NULL;
916 0           cbt->mget_results = wt->mget_results; wt->mget_results = NULL;
917 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
918 0 0         if (cbt->counted) self->pending_count++;
919 0           arm_cmd_timer(self);
920             }
921              
922 0           Safefree(wt->packet);
923 0           Safefree(wt);
924              
925 0           start_writing(self);
926             }
927 0           }
928              
929             /* ================================================================
930             * Build and enqueue a command
931             * ================================================================ */
932              
933             /* Allocate the next opaque, skipping 0 across wraparound (0 is reserved
934             * for fire-and-forget commands so error responses don't get matched to
935             * the wrong cb_queue head). */
936 0           static uint32_t mc_next_opaque(ev_mc_t *self) {
937 0           uint32_t op = self->next_opaque++;
938 0 0         if (self->next_opaque == 0) self->next_opaque = 1;
939 0           return op;
940             }
941              
942             /* Encode a complete request packet (header + extras + key + value) into
943             * the caller-supplied buffer p, which must be at least
944             * MC_HEADER_SIZE + extras_len + key_len + value_len bytes. */
945 0           static void mc_pack(char *p, uint8_t opcode,
946             const char *key, uint16_t key_len,
947             const char *value, uint32_t value_len,
948             const char *extras, uint8_t extras_len,
949             uint32_t opaque, uint64_t cas)
950             {
951 0           uint32_t body_len = (uint32_t)extras_len + (uint32_t)key_len + value_len;
952 0           mc_encode_header(p, opcode, key_len, extras_len, body_len, opaque, cas);
953 0           p += MC_HEADER_SIZE;
954 0 0         if (extras_len > 0) { memcpy(p, extras, extras_len); p += extras_len; }
955 0 0         if (key_len > 0) { memcpy(p, key, key_len); p += key_len; }
956 0 0         if (value_len > 0) { memcpy(p, value, value_len); }
957 0           }
958              
959             /* Fire-and-forget using a quiet opcode (SETQ/FLUSHQ/...). No cb_queue
960             * entry; opaque is always 0 (server only responds on error and that
961             * response is silently discarded by handle_response_packet). */
962 0           static void mc_fire_and_forget(pTHX_ ev_mc_t *self,
963             uint8_t opcode, const char *key, STRLEN key_len,
964             const char *value, STRLEN value_len,
965             const char *extras, uint8_t extras_len,
966             uint64_t cas)
967             {
968 0 0         if (key_len > MC_MAX_KEY_LEN)
969 0           croak("key too long (%d bytes, max %d)", (int)key_len, MC_MAX_KEY_LEN);
970              
971 0           uint32_t body_len = extras_len + (uint32_t)key_len + (uint32_t)value_len;
972 0           size_t packet_len = MC_HEADER_SIZE + body_len;
973              
974 0 0         if (self->connected) {
975 0           buf_ensure_write(self, packet_len);
976 0           mc_pack(self->wbuf + self->wbuf_len, opcode,
977 0           key, (uint16_t)key_len, value, (uint32_t)value_len,
978             extras, extras_len, 0, cas);
979 0           self->wbuf_len += packet_len;
980 0           start_writing(self);
981             } else {
982             /* Not yet connected (or reconnecting): queue and drain after connect. */
983             ev_mc_wait_t *wt;
984 0           Newxz(wt, 1, ev_mc_wait_t);
985 0           Newx(wt->packet, packet_len, char);
986 0           wt->packet_len = packet_len;
987 0           mc_pack(wt->packet, opcode,
988 0           key, (uint16_t)key_len, value, (uint32_t)value_len,
989             extras, extras_len, 0, cas);
990 0           wt->no_response = 1;
991 0           wt->queued_at = ev_now(self->loop);
992 0           ngx_queue_insert_tail(&self->wait_queue, &wt->queue);
993 0           self->waiting_count++;
994 0 0         if (self->waiting_timeout_ms > 0)
995 0           schedule_waiting_timer(self);
996             }
997 0           }
998              
999             /* Build a binary protocol packet and enqueue for sending.
1000             * Returns the assigned opaque. */
1001 0           static uint32_t mc_enqueue_cmd(pTHX_ ev_mc_t *self,
1002             uint8_t opcode, const char *key, STRLEN key_len,
1003             const char *value, STRLEN value_len,
1004             const char *extras, uint8_t extras_len,
1005             uint64_t cas, int cmd, int quiet, SV *cb)
1006             {
1007 0 0         if (key_len > MC_MAX_KEY_LEN)
1008 0           croak("key too long (%d bytes, max %d)", (int)key_len, MC_MAX_KEY_LEN);
1009              
1010 0           uint32_t opaque = mc_next_opaque(self);
1011 0           uint32_t body_len = extras_len + (uint32_t)key_len + (uint32_t)value_len;
1012 0           size_t packet_len = MC_HEADER_SIZE + body_len;
1013              
1014 0 0         int counted = (cmd != CB_CMD_MGET_ENTRY && cmd != CB_CMD_MGETS_ENTRY);
    0          
1015              
1016             /* Check if we can send immediately */
1017 0 0         int can_send = self->connected &&
1018 0 0         (self->max_pending <= 0 || self->pending_count < self->max_pending || !counted);
    0          
    0          
1019              
1020 0 0         if (can_send) {
1021 0           buf_ensure_write(self, packet_len);
1022 0           mc_pack(self->wbuf + self->wbuf_len, opcode,
1023 0           key, (uint16_t)key_len, value, (uint32_t)value_len,
1024             extras, extras_len, opaque, cas);
1025 0           self->wbuf_len += packet_len;
1026              
1027 0           ev_mc_cb_t *cbt = alloc_cbt();
1028 0 0         if (cb) { cbt->cb = newSVsv(cb); }
1029 0           cbt->opaque = opaque;
1030 0           cbt->cmd = cmd;
1031 0           cbt->quiet = quiet;
1032 0           cbt->counted = counted;
1033 0 0         if (cmd == CB_CMD_STATS) {
1034 0           cbt->stats_hv = newHV();
1035             }
1036 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
1037 0 0         if (counted) self->pending_count++;
1038              
1039 0           arm_cmd_timer(self);
1040 0           start_writing(self);
1041             } else {
1042             /* Queue for later */
1043             ev_mc_wait_t *wt;
1044 0           Newxz(wt, 1, ev_mc_wait_t);
1045 0           Newx(wt->packet, packet_len, char);
1046 0           wt->packet_len = packet_len;
1047 0           mc_pack(wt->packet, opcode,
1048 0           key, (uint16_t)key_len, value, (uint32_t)value_len,
1049             extras, extras_len, opaque, cas);
1050              
1051 0 0         if (cb) { wt->cb = newSVsv(cb); }
1052 0           wt->opaque = opaque;
1053 0           wt->cmd = cmd;
1054 0           wt->quiet = quiet;
1055 0           wt->counted = counted;
1056 0           wt->queued_at = ev_now(self->loop);
1057 0 0         if (cmd == CB_CMD_STATS) {
1058 0           wt->stats_hv = newHV();
1059             }
1060              
1061 0           ngx_queue_insert_tail(&self->wait_queue, &wt->queue);
1062 0           self->waiting_count++;
1063              
1064 0 0         if (self->waiting_timeout_ms > 0)
1065 0           schedule_waiting_timer(self);
1066             }
1067              
1068 0           return opaque;
1069             }
1070              
1071             /* ================================================================
1072             * Multi-get: GETKQ + NOOP fence
1073             * ================================================================ */
1074              
1075             /* full_info: 0 = mget (key=>value), 1 = mgets (key=>{value,flags,cas}) */
1076 0           static void mc_enqueue_mget(pTHX_ ev_mc_t *self, AV *keys_av, SV *cb, int full_info) {
1077 0           SSize_t count = av_len(keys_av) + 1;
1078 0 0         if (count <= 0) {
1079 0 0         if (cb) {
1080 0           HV *hv = newHV();
1081 0           self->callback_depth++;
1082 0           invoke_cb(aTHX_ self, cb, newRV_noinc((SV*)hv), NULL);
1083 0           self->callback_depth--;
1084 0           check_destroyed(self);
1085             }
1086 0           return;
1087             }
1088              
1089 0 0         int entry_cmd = full_info ? CB_CMD_MGETS_ENTRY : CB_CMD_MGET_ENTRY;
1090 0 0         int fence_cmd = full_info ? CB_CMD_MGETS_FENCE : CB_CMD_MGET_FENCE;
1091              
1092             SSize_t i;
1093              
1094             /* Pre-validate all key lengths before mutating wbuf / queues / refcounts.
1095             Croaking mid-loop would leave half-built GETKQ packets queued without
1096             a NOOP fence, corrupting opaque ordering on the wire. */
1097 0 0         for (i = 0; i < count; i++) {
1098 0           SV **sv = av_fetch(keys_av, i, 0);
1099 0 0         if (!sv || !SvOK(*sv)) continue;
    0          
1100             STRLEN key_len;
1101 0           (void)SvPV(*sv, key_len);
1102 0 0         if (key_len > MC_MAX_KEY_LEN)
1103 0           croak("mget key too long (%d bytes, max %d)", (int)key_len, MC_MAX_KEY_LEN);
1104             }
1105              
1106 0           HV *results = newHV();
1107              
1108 0 0         for (i = 0; i < count; i++) {
1109 0           SV **sv = av_fetch(keys_av, i, 0);
1110 0 0         if (!sv || !SvOK(*sv)) continue;
    0          
1111              
1112             STRLEN key_len;
1113 0           const char *key = SvPV(*sv, key_len);
1114              
1115 0           uint32_t opaque = mc_next_opaque(self);
1116 0           uint32_t body_len = (uint32_t)key_len;
1117 0           size_t packet_len = MC_HEADER_SIZE + body_len;
1118              
1119 0           int can_send = self->connected;
1120              
1121 0 0         if (can_send) {
1122 0           buf_ensure_write(self, packet_len);
1123 0           char *p = self->wbuf + self->wbuf_len;
1124 0           mc_encode_header(p, MC_OP_GETKQ, (uint16_t)key_len, 0, body_len, opaque, 0);
1125 0           memcpy(p + MC_HEADER_SIZE, key, key_len);
1126 0           self->wbuf_len += packet_len;
1127              
1128 0           ev_mc_cb_t *cbt = alloc_cbt();
1129 0           cbt->opaque = opaque;
1130 0           cbt->cmd = entry_cmd;
1131 0           cbt->quiet = 1;
1132 0           cbt->counted = 0;
1133 0           cbt->mget_results = results;
1134 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
1135             } else {
1136             ev_mc_wait_t *wt;
1137 0           Newxz(wt, 1, ev_mc_wait_t);
1138 0           Newx(wt->packet, packet_len, char);
1139 0           wt->packet_len = packet_len;
1140 0           mc_encode_header(wt->packet, MC_OP_GETKQ, (uint16_t)key_len, 0, body_len, opaque, 0);
1141 0           memcpy(wt->packet + MC_HEADER_SIZE, key, key_len);
1142 0           wt->opaque = opaque;
1143 0           wt->cmd = entry_cmd;
1144 0           wt->quiet = 1;
1145 0           wt->counted = 0;
1146 0           wt->mget_results = results;
1147 0           wt->queued_at = ev_now(self->loop);
1148 0           ngx_queue_insert_tail(&self->wait_queue, &wt->queue);
1149 0           self->waiting_count++;
1150             }
1151             }
1152              
1153             /* NOOP fence */
1154             {
1155 0           uint32_t opaque = mc_next_opaque(self);
1156 0           size_t packet_len = MC_HEADER_SIZE;
1157 0           int can_send = self->connected;
1158              
1159 0 0         if (can_send) {
1160 0           buf_ensure_write(self, packet_len);
1161 0           char *p = self->wbuf + self->wbuf_len;
1162 0           mc_encode_header(p, MC_OP_NOOP, 0, 0, 0, opaque, 0);
1163 0           self->wbuf_len += packet_len;
1164              
1165 0           ev_mc_cb_t *cbt = alloc_cbt();
1166 0 0         if (cb) { cbt->cb = newSVsv(cb); }
1167 0           cbt->opaque = opaque;
1168 0           cbt->cmd = fence_cmd;
1169 0           cbt->quiet = 0;
1170 0           cbt->counted = 1;
1171 0           cbt->mget_results = results;
1172 0           SvREFCNT_inc_simple_void_NN((SV*)results);
1173 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
1174 0           self->pending_count++;
1175              
1176 0           arm_cmd_timer(self);
1177 0           start_writing(self);
1178             } else {
1179             ev_mc_wait_t *wt;
1180 0           Newxz(wt, 1, ev_mc_wait_t);
1181 0           Newx(wt->packet, packet_len, char);
1182 0           wt->packet_len = packet_len;
1183 0           mc_encode_header(wt->packet, MC_OP_NOOP, 0, 0, 0, opaque, 0);
1184 0 0         if (cb) { wt->cb = newSVsv(cb); }
1185 0           wt->opaque = opaque;
1186 0           wt->cmd = fence_cmd;
1187 0           wt->quiet = 0;
1188 0           wt->counted = 1;
1189 0           wt->mget_results = results; /* owned ref */
1190 0           SvREFCNT_inc_simple_void_NN((SV*)results);
1191 0           wt->queued_at = ev_now(self->loop);
1192 0           ngx_queue_insert_tail(&self->wait_queue, &wt->queue);
1193 0           self->waiting_count++;
1194             }
1195             }
1196              
1197             /* results starts with refcnt=1, fence owns +1 = total 2.
1198             * Drop our initial ref, fence now owns it (refcnt=1) */
1199 0           SvREFCNT_dec((SV*)results);
1200              
1201 0 0         if (self->waiting_timeout_ms > 0)
1202 0           schedule_waiting_timer(self);
1203             }
1204              
1205             /* ================================================================
1206             * Response processing
1207             * ================================================================ */
1208              
1209             /* Drain quiet entries before the given opaque.
1210             * Quiet entries that got no response are "quiet success":
1211             * - MGET_ENTRY: miss (nothing to do)
1212             * - Other quiet commands: invoke callback with success */
1213 0           static void drain_quiet_before(pTHX_ ev_mc_t *self, uint32_t opaque) {
1214 0 0         while (!ngx_queue_empty(&self->cb_queue)) {
1215 0           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
1216 0           ev_mc_cb_t *cbt = ngx_queue_data(q, ev_mc_cb_t, queue);
1217              
1218             /* Use signed comparison to handle wraparound */
1219 0 0         if ((int32_t)(cbt->opaque - opaque) >= 0) break;
1220 0 0         if (!cbt->quiet) break; /* non-quiet should always get response */
1221              
1222 0           ngx_queue_remove(q);
1223 0 0         if (cbt->counted) self->pending_count--;
1224              
1225             /* Quiet success: for storage = stored, for MGET_ENTRY = miss */
1226 0 0         if (cbt->cb && !cbt->skipped &&
    0          
1227 0 0         cbt->cmd != CB_CMD_MGET_ENTRY && cbt->cmd != CB_CMD_MGETS_ENTRY) {
    0          
1228 0           invoke_cb(aTHX_ self, cbt->cb, newSViv(1), NULL);
1229 0 0         if (self->magic == MC_MAGIC_FREED) {
1230 0           cleanup_cbt(aTHX_ cbt);
1231 0           return;
1232             }
1233             }
1234 0           cleanup_cbt(aTHX_ cbt);
1235             }
1236             }
1237              
1238 0           static void handle_response_packet(pTHX_ ev_mc_t *self) {
1239 0           const char *pkt = self->rbuf;
1240 0           uint16_t key_len = mc_read_u16(pkt + 2);
1241 0           uint8_t extras_len = (uint8_t)pkt[4];
1242 0           uint16_t status = mc_read_u16(pkt + 6);
1243 0           uint32_t body_len = mc_read_u32(pkt + 8);
1244 0           uint32_t opaque = mc_read_u32(pkt + 12);
1245 0           uint64_t cas = mc_read_u64(pkt + 16);
1246              
1247 0           const char *body = pkt + MC_HEADER_SIZE;
1248              
1249 0 0         if ((uint32_t)extras_len + key_len > body_len) {
1250 0           handle_disconnect(aTHX_ self, "malformed response: body too short");
1251 0           return;
1252             }
1253              
1254 0           const char *extras = body;
1255 0           const char *key_ptr = body + extras_len;
1256 0           const char *value_ptr = body + extras_len + key_len;
1257 0           uint32_t value_len = body_len - extras_len - key_len;
1258              
1259             /* Drain quiet entries that were skipped (no response) */
1260 0           drain_quiet_before(aTHX_ self, opaque);
1261 0 0         if (self->magic == MC_MAGIC_FREED) return;
1262              
1263             /* Find matching callback entry */
1264 0 0         if (ngx_queue_empty(&self->cb_queue)) {
1265             /* Stray response (e.g., quiet opcode error) — discard */
1266 0           return;
1267             }
1268              
1269 0           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
1270 0           ev_mc_cb_t *cbt = ngx_queue_data(q, ev_mc_cb_t, queue);
1271              
1272 0 0         if (cbt->opaque != opaque) {
1273             /* opaque=0 is fire-and-forget error response — discard silently */
1274 0 0         if (opaque == 0) return;
1275             char errbuf[128];
1276 0           snprintf(errbuf, sizeof(errbuf),
1277             "protocol error: expected opaque %u, got %u", cbt->opaque, opaque);
1278 0           handle_disconnect(aTHX_ self, errbuf);
1279 0           return;
1280             }
1281              
1282             /* STAT accumulation: don't remove from queue until terminator */
1283 0 0         if (cbt->cmd == CB_CMD_STATS && status == MC_STATUS_OK && key_len > 0) {
    0          
    0          
1284 0 0         if (cbt->stats_hv) {
1285 0           hv_store(cbt->stats_hv, key_ptr, key_len,
1286             newSVpvn(value_ptr, value_len), 0);
1287             }
1288 0           return; /* wait for more stats or terminator */
1289             }
1290              
1291             /* Remove from queue */
1292 0           ngx_queue_remove(q);
1293 0 0         if (cbt->counted) self->pending_count--;
1294              
1295             /* Handle error status */
1296 0 0         if (status != MC_STATUS_OK) {
1297             /* GET/GETS/GAT/GATS miss: return (undef, undef) */
1298 0 0         if (status == MC_STATUS_KEY_NOT_FOUND &&
1299 0 0         (cbt->cmd == CB_CMD_GET || cbt->cmd == CB_CMD_GETS ||
    0          
1300 0 0         cbt->cmd == CB_CMD_GAT || cbt->cmd == CB_CMD_GATS)) {
    0          
1301 0 0         if (cbt->cb && !cbt->skipped)
    0          
1302 0           invoke_cb(aTHX_ self, cbt->cb, NULL, NULL);
1303             }
1304 0 0         else if (cbt->cmd == CB_CMD_MGET_ENTRY || cbt->cmd == CB_CMD_MGETS_ENTRY) {
    0          
1305             /* mget miss — nothing to add */
1306             }
1307             /* SASL auth failure: disconnect (auto-auth) or report to callback */
1308 0 0         else if (cbt->cmd == CB_CMD_SASL_AUTH) {
1309 0 0         if (cbt->cb && !cbt->skipped) {
    0          
1310 0           const char *errstr = mc_status_str(status);
1311 0 0         if (value_len > 0)
1312 0           invoke_cb(aTHX_ self, cbt->cb, NULL,
1313             newSVpvf("%s: %.*s", errstr, (int)value_len, value_ptr));
1314             else
1315 0           invoke_cb(aTHX_ self, cbt->cb, NULL, newSVpv(errstr, 0));
1316 0 0         } else if (!cbt->cb) {
1317             /* Auto-auth failed — disconnect with error */
1318             char errbuf[256];
1319 0 0         if (value_len > 0)
1320 0           snprintf(errbuf, sizeof(errbuf), "SASL auth failed: %.*s",
1321             (int)value_len, value_ptr);
1322             else
1323 0           snprintf(errbuf, sizeof(errbuf), "SASL auth failed: %s",
1324             mc_status_str(status));
1325 0           cleanup_cbt(aTHX_ cbt);
1326 0           handle_disconnect(aTHX_ self, errbuf);
1327 0           return;
1328             }
1329             }
1330             else {
1331             /* Real error */
1332 0 0         if (cbt->cb && !cbt->skipped) {
    0          
1333 0           const char *errstr = mc_status_str(status);
1334             /* Include server error message if present */
1335 0 0         if (value_len > 0)
1336 0           invoke_cb(aTHX_ self, cbt->cb, NULL,
1337             newSVpvf("%s: %.*s", errstr, (int)value_len, value_ptr));
1338             else
1339 0           invoke_cb(aTHX_ self, cbt->cb, NULL, newSVpv(errstr, 0));
1340             }
1341             }
1342 0 0         if (self->magic != MC_MAGIC_FREED) {
1343 0           cleanup_cbt(aTHX_ cbt);
1344 0           send_next_waiting(aTHX_ self);
1345             } else {
1346 0           cleanup_cbt(aTHX_ cbt);
1347             }
1348 0           return;
1349             }
1350              
1351             /* Handle success */
1352 0 0         if (cbt->skipped) {
1353 0           cleanup_cbt(aTHX_ cbt);
1354 0           send_next_waiting(aTHX_ self);
1355 0           return;
1356             }
1357              
1358 0           switch (cbt->cmd) {
1359 0           case CB_CMD_GET:
1360             case CB_CMD_GAT:
1361 0 0         if (cbt->cb)
1362 0           invoke_cb(aTHX_ self, cbt->cb, newSVpvn(value_ptr, value_len), NULL);
1363 0           break;
1364              
1365 0           case CB_CMD_GETS:
1366             case CB_CMD_GATS:
1367 0 0         if (cbt->cb) {
1368 0           HV *hv = newHV();
1369 0           hv_stores(hv, "value", newSVpvn(value_ptr, value_len));
1370 0 0         if (extras_len >= 4)
1371 0           hv_stores(hv, "flags", newSVuv(mc_read_u32(extras)));
1372 0           hv_stores(hv, "cas", newSVuv(cas));
1373 0           invoke_cb(aTHX_ self, cbt->cb, newRV_noinc((SV*)hv), NULL);
1374             }
1375 0           break;
1376              
1377 0           case CB_CMD_STORE:
1378             case CB_CMD_DELETE:
1379             case CB_CMD_TOUCH:
1380             case CB_CMD_FLUSH:
1381             case CB_CMD_NOOP:
1382             case CB_CMD_QUIT:
1383 0 0         if (cbt->cb)
1384 0           invoke_cb(aTHX_ self, cbt->cb, newSViv(1), NULL);
1385 0           break;
1386              
1387 0           case CB_CMD_ARITH:
1388 0 0         if (cbt->cb) {
1389 0 0         if (value_len >= 8) {
1390 0           uint64_t new_val = mc_read_u64(value_ptr);
1391 0           invoke_cb(aTHX_ self, cbt->cb, newSVuv((UV)new_val), NULL);
1392             } else {
1393 0           invoke_cb(aTHX_ self, cbt->cb, NULL,
1394             newSVpv("malformed ARITH response", 0));
1395             }
1396             }
1397 0           break;
1398              
1399 0           case CB_CMD_VERSION:
1400             case CB_CMD_SASL_LIST:
1401 0 0         if (cbt->cb)
1402 0           invoke_cb(aTHX_ self, cbt->cb, newSVpvn(value_ptr, value_len), NULL);
1403 0           break;
1404              
1405 0           case CB_CMD_SASL_AUTH:
1406 0 0         if (cbt->cb)
1407 0           invoke_cb(aTHX_ self, cbt->cb, newSViv(1), NULL);
1408             /* Auto-auth (cb==NULL): wait queue is drained by the common
1409             send_next_waiting call at the end of this switch. */
1410 0           break;
1411              
1412 0           case CB_CMD_STATS:
1413             /* Terminator (key_len==0): deliver accumulated stats */
1414 0 0         if (cbt->cb && cbt->stats_hv) {
    0          
1415 0           SV *rv = newRV_noinc((SV*)cbt->stats_hv);
1416 0           cbt->stats_hv = NULL; /* transferred ownership */
1417 0           invoke_cb(aTHX_ self, cbt->cb, rv, NULL);
1418             }
1419 0           break;
1420              
1421 0           case CB_CMD_MGET_ENTRY:
1422 0 0         if (cbt->mget_results && key_len > 0) {
    0          
1423 0           hv_store(cbt->mget_results, key_ptr, key_len,
1424             newSVpvn(value_ptr, value_len), 0);
1425             }
1426 0           break;
1427              
1428 0           case CB_CMD_MGETS_ENTRY:
1429 0 0         if (cbt->mget_results && key_len > 0) {
    0          
1430 0           HV *info = newHV();
1431 0           hv_stores(info, "value", newSVpvn(value_ptr, value_len));
1432 0 0         if (extras_len >= 4)
1433 0           hv_stores(info, "flags", newSVuv(mc_read_u32(extras)));
1434 0           hv_stores(info, "cas", newSVuv(cas));
1435 0           hv_store(cbt->mget_results, key_ptr, key_len,
1436             newRV_noinc((SV*)info), 0);
1437             }
1438 0           break;
1439              
1440 0           case CB_CMD_MGET_FENCE:
1441             case CB_CMD_MGETS_FENCE:
1442 0 0         if (cbt->cb && cbt->mget_results) {
    0          
1443 0           SV *rv = newRV_noinc((SV*)cbt->mget_results);
1444 0           cbt->mget_results = NULL;
1445 0           invoke_cb(aTHX_ self, cbt->cb, rv, NULL);
1446             }
1447 0           break;
1448             }
1449              
1450 0 0         if (self->magic != MC_MAGIC_FREED) {
1451 0           cleanup_cbt(aTHX_ cbt);
1452 0           send_next_waiting(aTHX_ self);
1453             } else {
1454 0           cleanup_cbt(aTHX_ cbt);
1455             }
1456             }
1457              
1458 0           static void process_responses(pTHX_ ev_mc_t *self) {
1459 0           int processed = 0;
1460 0 0         while (self->rbuf_len >= MC_HEADER_SIZE) {
1461             /* Verify magic byte */
1462 0 0         if ((uint8_t)self->rbuf[0] != MC_RES_MAGIC) {
1463 0           handle_disconnect(aTHX_ self, "invalid response magic byte");
1464 0           return;
1465             }
1466              
1467 0           uint32_t body_len = mc_read_u32(self->rbuf + 8);
1468             /* Sanity bound: memcached's hard limit is 128 MB; cap at 256 MB to
1469             catch garbage/MITM responses without rejecting any real reply.
1470             Also avoids size_t overflow on 32-bit perls when MC_HEADER_SIZE
1471             is added below. */
1472 0 0         if (body_len > 0x10000000u) {
1473 0           handle_disconnect(aTHX_ self, "response body too large");
1474 0           return;
1475             }
1476 0           size_t total_len = (size_t)MC_HEADER_SIZE + body_len;
1477              
1478 0 0         if (self->rbuf_len < total_len) break; /* incomplete packet */
1479              
1480 0           handle_response_packet(aTHX_ self);
1481 0 0         if (self->magic == MC_MAGIC_FREED) return;
1482 0 0         if (!self->connected) return; /* disconnect() called from callback */
1483              
1484             /* Consume from buffer */
1485 0           self->rbuf_len -= total_len;
1486 0 0         if (self->rbuf_len > 0)
1487 0           memmove(self->rbuf, self->rbuf + total_len, self->rbuf_len);
1488 0           processed++;
1489             }
1490              
1491             /* Reset command timeout on activity */
1492 0 0         if (processed > 0 && self->cmd_timer_active) {
    0          
1493 0 0         if (ngx_queue_empty(&self->cb_queue))
1494 0           disarm_cmd_timer(self);
1495             else
1496 0           arm_cmd_timer(self);
1497             }
1498             }
1499              
1500             /* ================================================================
1501             * IO callbacks
1502             * ================================================================ */
1503              
1504 0           static void on_readable(pTHX_ ev_mc_t *self) {
1505 0           buf_ensure_read(self, 4096);
1506 0           ssize_t n = read(self->fd, self->rbuf + self->rbuf_len,
1507 0           self->rbuf_cap - self->rbuf_len);
1508 0 0         if (n > 0) {
1509 0           self->rbuf_len += n;
1510 0           process_responses(aTHX_ self);
1511 0 0         } else if (n == 0) {
1512 0           handle_disconnect(aTHX_ self, "connection closed by server");
1513             } else {
1514 0 0         if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
    0          
    0          
1515 0           return;
1516             char errbuf[128];
1517 0           snprintf(errbuf, sizeof(errbuf), "read error: %s", strerror(errno));
1518 0           handle_disconnect(aTHX_ self, errbuf);
1519             }
1520             }
1521              
1522 0           static int try_write(ev_mc_t *self) {
1523 0 0         while (self->wbuf_off < self->wbuf_len) {
1524 0           ssize_t n = write(self->fd, self->wbuf + self->wbuf_off,
1525 0           self->wbuf_len - self->wbuf_off);
1526 0 0         if (n > 0) {
1527 0           self->wbuf_off += n;
1528 0 0         } else if (n < 0) {
1529 0 0         if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
    0          
    0          
1530 0           return 0; /* try again later */
1531 0           return -1; /* error */
1532             }
1533             }
1534             /* All written */
1535 0           self->wbuf_len = 0;
1536 0           self->wbuf_off = 0;
1537 0           stop_writing(self);
1538 0           return 1;
1539             }
1540              
1541 0           static void on_connect_complete(pTHX_ ev_mc_t *self) {
1542 0           int err = 0;
1543 0           socklen_t len = sizeof(err);
1544 0 0         if (getsockopt(self->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0 || err != 0) {
    0          
1545             char errbuf[128];
1546 0           snprintf(errbuf, sizeof(errbuf), "connect failed: %s",
1547 0 0         strerror(err ? err : errno));
1548 0           close(self->fd);
1549 0           self->fd = -1;
1550 0           self->connecting = 0;
1551 0           stop_writing(self);
1552 0           stop_connect_timer(self);
1553              
1554 0           report_connect_error(aTHX_ self, errbuf);
1555 0           return;
1556             }
1557              
1558 0           self->connecting = 0;
1559 0           self->connected = 1;
1560              
1561 0           stop_writing(self);
1562 0           stop_connect_timer(self);
1563              
1564 0           finish_connect_success(aTHX_ self);
1565             }
1566              
1567 0           static void io_cb(EV_P_ ev_io *w, int revents) {
1568 0           ev_mc_t *self = (ev_mc_t *)w->data;
1569             (void)loop;
1570              
1571 0 0         if (self->magic != MC_MAGIC_ALIVE) return;
1572              
1573 0           self->callback_depth++;
1574              
1575 0 0         if (self->connecting) {
1576 0 0         if (revents & EV_WRITE) {
1577 0           on_connect_complete(aTHX_ self);
1578             }
1579 0           self->callback_depth--;
1580 0           check_destroyed(self);
1581 0           return;
1582             }
1583              
1584 0 0         if (revents & EV_READ) {
1585 0           on_readable(aTHX_ self);
1586 0 0         if (self->magic != MC_MAGIC_ALIVE) {
1587 0           self->callback_depth--;
1588 0           check_destroyed(self);
1589 0           return;
1590             }
1591             }
1592              
1593 0 0         if (revents & EV_WRITE) {
1594 0           int rv = try_write(self);
1595 0 0         if (rv < 0) {
1596             char errbuf[128];
1597 0           snprintf(errbuf, sizeof(errbuf), "write error: %s", strerror(errno));
1598 0           handle_disconnect(aTHX_ self, errbuf);
1599             }
1600             }
1601              
1602 0           self->callback_depth--;
1603 0           check_destroyed(self);
1604             }
1605              
1606             /* ================================================================
1607             * Connection
1608             * ================================================================ */
1609              
1610 0           static void start_connect(pTHX_ ev_mc_t *self) {
1611             int fd, ret;
1612              
1613 0 0         if (self->path) {
1614             /* Unix socket */
1615             struct sockaddr_un addr;
1616 0           memset(&addr, 0, sizeof(addr));
1617 0           addr.sun_family = AF_UNIX;
1618 0 0         if (strlen(self->path) >= sizeof(addr.sun_path)) {
1619 0           emit_error(aTHX_ self, "unix socket path too long");
1620 0           return;
1621             }
1622 0           strncpy(addr.sun_path, self->path, sizeof(addr.sun_path) - 1);
1623              
1624 0           fd = socket(AF_UNIX, SOCK_STREAM, 0);
1625 0 0         if (fd < 0) {
1626             char errbuf[128];
1627 0           snprintf(errbuf, sizeof(errbuf), "socket: %s", strerror(errno));
1628 0           report_connect_error(aTHX_ self, errbuf);
1629 0           return;
1630             }
1631              
1632             /* non-blocking */
1633             {
1634 0           int fl = fcntl(fd, F_GETFL);
1635 0 0         if (fl < 0 || fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0) {
    0          
1636 0           close(fd);
1637 0           report_connect_error(aTHX_ self, "fcntl O_NONBLOCK failed");
1638 0           return;
1639             }
1640             }
1641              
1642 0           self->fd = fd;
1643 0           ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
1644             } else {
1645             /* TCP */
1646 0           struct addrinfo hints, *res = NULL;
1647             char port_str[16];
1648              
1649 0           snprintf(port_str, sizeof(port_str), "%d", self->port);
1650 0           memset(&hints, 0, sizeof(hints));
1651 0           hints.ai_family = AF_UNSPEC;
1652 0           hints.ai_socktype = SOCK_STREAM;
1653              
1654 0           ret = getaddrinfo(self->host, port_str, &hints, &res);
1655 0 0         if (ret != 0) {
1656             char errbuf[256];
1657 0           snprintf(errbuf, sizeof(errbuf), "getaddrinfo: %s", gai_strerror(ret));
1658 0           report_connect_error(aTHX_ self, errbuf);
1659 0           return;
1660             }
1661              
1662 0           fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
1663 0 0         if (fd < 0) {
1664 0           freeaddrinfo(res);
1665             char errbuf[128];
1666 0           snprintf(errbuf, sizeof(errbuf), "socket: %s", strerror(errno));
1667 0           report_connect_error(aTHX_ self, errbuf);
1668 0           return;
1669             }
1670              
1671             /* non-blocking */
1672             {
1673 0           int fl = fcntl(fd, F_GETFL);
1674 0 0         if (fl < 0 || fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0) {
    0          
1675 0           freeaddrinfo(res);
1676 0           close(fd);
1677 0           report_connect_error(aTHX_ self, "fcntl O_NONBLOCK failed");
1678 0           return;
1679             }
1680             }
1681              
1682             /* TCP_NODELAY */
1683             {
1684 0           int one = 1;
1685 0           setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
1686             }
1687              
1688 0           self->fd = fd;
1689 0           ret = connect(fd, res->ai_addr, res->ai_addrlen);
1690 0           freeaddrinfo(res);
1691             }
1692              
1693 0 0         if (ret == 0) {
1694             /* Connected immediately (localhost) */
1695 0           self->connected = 1;
1696 0           ev_io_init(&self->rio, io_cb, self->fd, EV_READ);
1697 0           self->rio.data = (void *)self;
1698 0           ev_io_init(&self->wio, io_cb, self->fd, EV_WRITE);
1699 0           self->wio.data = (void *)self;
1700 0           ev_set_priority(&self->rio, self->priority);
1701 0           ev_set_priority(&self->wio, self->priority);
1702              
1703 0           finish_connect_success(aTHX_ self);
1704 0           return;
1705             }
1706              
1707 0 0         if (errno != EINPROGRESS) {
1708             char errbuf[128];
1709 0           snprintf(errbuf, sizeof(errbuf), "connect: %s", strerror(errno));
1710 0           close(self->fd);
1711 0           self->fd = -1;
1712 0           report_connect_error(aTHX_ self, errbuf);
1713 0           return;
1714             }
1715              
1716             /* In progress - wait for writability */
1717 0           self->connecting = 1;
1718 0           ev_io_init(&self->rio, io_cb, self->fd, EV_READ);
1719 0           self->rio.data = (void *)self;
1720 0           ev_io_init(&self->wio, io_cb, self->fd, EV_WRITE);
1721 0           self->wio.data = (void *)self;
1722 0           ev_set_priority(&self->rio, self->priority);
1723 0           ev_set_priority(&self->wio, self->priority);
1724 0           start_writing(self);
1725              
1726 0 0         if (self->connect_timeout_ms > 0) {
1727 0           ev_tstamp delay = (ev_tstamp)self->connect_timeout_ms / 1000.0;
1728 0           ev_timer_init(&self->connect_timer, connect_timeout_cb, delay, 0.0);
1729 0           self->connect_timer.data = (void *)self;
1730 0           ev_timer_start(self->loop, &self->connect_timer);
1731 0           self->connect_timer_active = 1;
1732             }
1733             }
1734              
1735             /* ================================================================
1736             * XS interface
1737             * ================================================================ */
1738              
1739             MODULE = EV::Memcached PACKAGE = EV::Memcached
1740              
1741             BOOT:
1742             {
1743 8 50         I_EV_API("EV::Memcached");
    50          
    50          
1744 8           err_skipped = newSVpvs("skipped");
1745 8           SvREADONLY_on(err_skipped);
1746 8           err_disconnected = newSVpvs("disconnected");
1747 8           SvREADONLY_on(err_disconnected);
1748 8           err_waiting_timeout = newSVpvs("waiting timeout");
1749 8           SvREADONLY_on(err_waiting_timeout);
1750             }
1751              
1752             EV::Memcached
1753             new(char *class, ...)
1754             CODE:
1755             {
1756             PERL_UNUSED_VAR(class);
1757 0 0         if ((items - 1) % 2 != 0) croak("odd number of arguments");
1758              
1759 0           Newxz(RETVAL, 1, ev_mc_t);
1760 0           RETVAL->magic = MC_MAGIC_ALIVE;
1761 0           RETVAL->fd = -1;
1762 0           RETVAL->port = 11211;
1763 0           RETVAL->next_opaque = 1; /* reserve 0 for fire-and-forget quiet ops */
1764 0           ngx_queue_init(&RETVAL->cb_queue);
1765 0           ngx_queue_init(&RETVAL->wait_queue);
1766 0           Newx(RETVAL->rbuf, BUF_INIT_SIZE, char);
1767 0           RETVAL->rbuf_cap = BUF_INIT_SIZE;
1768 0           Newx(RETVAL->wbuf, BUF_INIT_SIZE, char);
1769 0           RETVAL->wbuf_cap = BUF_INIT_SIZE;
1770              
1771             /* Default error handler: warn. Callback exceptions are caught by
1772             G_EVAL in emit_error, so a `die` would be demoted to a warning
1773             anyway — emit it directly and avoid the double prefix. */
1774 0           RETVAL->on_error = eval_pv("sub { warn \"EV::Memcached error: @_\\n\" }", TRUE);
1775 0           SvREFCNT_inc_simple_void_NN(RETVAL->on_error);
1776              
1777             /* Parse options */
1778 0           SV *host_sv = NULL, *path_sv = NULL;
1779 0           int port = 11211;
1780 0           int do_reconnect = 0, reconnect_delay = 1000, max_reconnect_attempts = 0;
1781 0           RETVAL->loop = EV_DEFAULT;
1782             int i;
1783              
1784 0 0         for (i = 1; i < items; i += 2) {
1785 0           const char *k = SvPV_nolen(ST(i));
1786 0           SV *v = ST(i + 1);
1787              
1788 0 0         if (strEQ(k, "host")) host_sv = v;
1789 0 0         else if (strEQ(k, "port")) port = SvIV(v);
1790 0 0         else if (strEQ(k, "path")) path_sv = v;
1791 0 0         else if (strEQ(k, "on_error")) {
1792 0 0         CLEAR_HANDLER(RETVAL->on_error);
1793 0 0         if (SvOK(v) && SvROK(v)) RETVAL->on_error = newSVsv(v);
    0          
1794             }
1795 0 0         else if (strEQ(k, "on_connect")) {
1796 0 0         if (SvOK(v) && SvROK(v)) RETVAL->on_connect = newSVsv(v);
    0          
1797             }
1798 0 0         else if (strEQ(k, "on_disconnect")) {
1799 0 0         if (SvOK(v) && SvROK(v)) RETVAL->on_disconnect = newSVsv(v);
    0          
1800             }
1801 0 0         else if (strEQ(k, "max_pending")) RETVAL->max_pending = SvIV(v);
1802 0 0         else if (strEQ(k, "waiting_timeout")) RETVAL->waiting_timeout_ms = SvIV(v);
1803 0 0         else if (strEQ(k, "connect_timeout")) RETVAL->connect_timeout_ms = SvIV(v);
1804 0 0         else if (strEQ(k, "command_timeout")) RETVAL->command_timeout_ms = SvIV(v);
1805 0 0         else if (strEQ(k, "resume_waiting_on_reconnect")) RETVAL->resume_waiting_on_reconnect = SvTRUE(v) ? 1 : 0;
1806 0 0         else if (strEQ(k, "priority")) RETVAL->priority = SvIV(v);
1807 0 0         else if (strEQ(k, "keepalive")) RETVAL->keepalive = SvIV(v);
1808 0 0         else if (strEQ(k, "reconnect")) do_reconnect = SvTRUE(v) ? 1 : 0;
1809 0 0         else if (strEQ(k, "reconnect_delay")) reconnect_delay = SvIV(v);
1810 0 0         else if (strEQ(k, "max_reconnect_attempts")) max_reconnect_attempts = SvIV(v);
1811 0 0         else if (strEQ(k, "username")) {
1812 0 0         if (SvOK(v)) RETVAL->username = savepv(SvPV_nolen(v));
1813             }
1814 0 0         else if (strEQ(k, "password")) {
1815 0 0         if (SvOK(v)) RETVAL->password = savepv(SvPV_nolen(v));
1816             }
1817 0 0         else if (strEQ(k, "loop")) {
1818 0           RETVAL->loop = (struct ev_loop *)SvPVX(SvRV(v));
1819             }
1820             }
1821              
1822 0 0         if (host_sv && path_sv) {
    0          
1823 0           Safefree(RETVAL->rbuf);
1824 0           Safefree(RETVAL->wbuf);
1825 0 0         CLEAR_HANDLER(RETVAL->on_error);
1826 0 0         CLEAR_HANDLER(RETVAL->on_connect);
1827 0 0         CLEAR_HANDLER(RETVAL->on_disconnect);
1828 0 0         if (RETVAL->username) Safefree(RETVAL->username);
1829 0 0         if (RETVAL->password) Safefree(RETVAL->password);
1830 0           Safefree(RETVAL);
1831 0           croak("cannot specify both 'host' and 'path'");
1832             }
1833              
1834 0           RETVAL->port = port;
1835 0 0         if (do_reconnect) {
1836 0           RETVAL->reconnect = 1;
1837 0           RETVAL->reconnect_delay_ms = reconnect_delay >= 0 ? reconnect_delay : 0;
1838 0           RETVAL->max_reconnect_attempts = max_reconnect_attempts >= 0 ? max_reconnect_attempts : 0;
1839             }
1840              
1841 0 0         if (host_sv && SvOK(host_sv)) {
    0          
1842 0           RETVAL->host = savepv(SvPV_nolen(host_sv));
1843 0           start_connect(aTHX_ RETVAL);
1844             }
1845 0 0         else if (path_sv && SvOK(path_sv)) {
    0          
1846 0           RETVAL->path = savepv(SvPV_nolen(path_sv));
1847 0           start_connect(aTHX_ RETVAL);
1848             }
1849             }
1850             OUTPUT:
1851             RETVAL
1852              
1853             void
1854             DESTROY(EV::Memcached self)
1855             CODE:
1856             {
1857 0 0         if (self->magic == MC_MAGIC_FREED) return;
1858              
1859             /* If we're inside a callback, defer destruction.
1860             * check_destroyed() in io_cb/timer_cb will Safefree after unwind. */
1861 0 0         if (self->callback_depth > 0) {
1862 0           self->magic = MC_MAGIC_FREED;
1863              
1864             /* Stop watchers */
1865 0           stop_reading(self);
1866 0           stop_writing(self);
1867 0           stop_connect_timer(self);
1868 0           disarm_cmd_timer(self);
1869 0           stop_reconnect_timer(self);
1870 0           stop_waiting_timer(self);
1871 0 0         if (self->fd >= 0) { close(self->fd); self->fd = -1; }
1872              
1873             /* Clean up queues without invoking Perl callbacks */
1874 0 0         while (!ngx_queue_empty(&self->cb_queue)) {
1875 0           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
1876 0           ev_mc_cb_t *cbt = ngx_queue_data(q, ev_mc_cb_t, queue);
1877 0           ngx_queue_remove(q);
1878 0           cleanup_cbt(aTHX_ cbt);
1879             }
1880 0 0         while (!ngx_queue_empty(&self->wait_queue)) {
1881 0           ngx_queue_t *q = ngx_queue_head(&self->wait_queue);
1882 0           ev_mc_wait_t *wt = ngx_queue_data(q, ev_mc_wait_t, queue);
1883 0           ngx_queue_remove(q);
1884 0           cleanup_wait(aTHX_ wt);
1885             }
1886 0           self->pending_count = 0;
1887 0           self->waiting_count = 0;
1888              
1889 0 0         CLEAR_HANDLER(self->on_error);
1890 0 0         CLEAR_HANDLER(self->on_connect);
1891 0 0         CLEAR_HANDLER(self->on_disconnect);
1892 0           Safefree(self->host); self->host = NULL;
1893 0           Safefree(self->path); self->path = NULL;
1894 0           Safefree(self->username); self->username = NULL;
1895 0           Safefree(self->password); self->password = NULL;
1896 0           Safefree(self->rbuf); self->rbuf = NULL;
1897 0           Safefree(self->wbuf); self->wbuf = NULL;
1898 0           return;
1899             }
1900              
1901             /* Stop timers */
1902 0           stop_connect_timer(self);
1903 0           disarm_cmd_timer(self);
1904 0           stop_reconnect_timer(self);
1905 0           stop_waiting_timer(self);
1906              
1907 0           cleanup_connection(aTHX_ self);
1908              
1909             /* Cancel pending/waiting with disconnected error.
1910             Don't pre-set magic=FREED here: cancel_pending_impl bails its
1911             loop on FREED, which would leak every entry past the first if
1912             any callback was set. Bump callback_depth so any nested DESTROY
1913             (e.g. a callback drops a separate strong ref) takes the deferred
1914             path at the top of this function. */
1915 0 0         if (!PL_dirty) {
1916 0           self->callback_depth++;
1917 0           cancel_pending(aTHX_ self, err_disconnected);
1918 0           cancel_waiting(aTHX_ self, err_disconnected);
1919 0           self->callback_depth--;
1920             } else {
1921             /* Global destruction: free entries without calling Perl */
1922 0 0         while (!ngx_queue_empty(&self->cb_queue)) {
1923 0           ngx_queue_t *q = ngx_queue_head(&self->cb_queue);
1924 0           ev_mc_cb_t *cbt = ngx_queue_data(q, ev_mc_cb_t, queue);
1925 0           ngx_queue_remove(q);
1926 0           cleanup_cbt(aTHX_ cbt);
1927             }
1928 0 0         while (!ngx_queue_empty(&self->wait_queue)) {
1929 0           ngx_queue_t *q = ngx_queue_head(&self->wait_queue);
1930 0           ev_mc_wait_t *wt = ngx_queue_data(q, ev_mc_wait_t, queue);
1931 0           ngx_queue_remove(q);
1932 0           cleanup_wait(aTHX_ wt);
1933             }
1934             }
1935              
1936 0           self->magic = MC_MAGIC_FREED;
1937              
1938 0 0         CLEAR_HANDLER(self->on_error);
1939 0 0         CLEAR_HANDLER(self->on_connect);
1940 0 0         CLEAR_HANDLER(self->on_disconnect);
1941              
1942 0           Safefree(self->host);
1943 0           Safefree(self->path);
1944 0           Safefree(self->username);
1945 0           Safefree(self->password);
1946 0           Safefree(self->rbuf);
1947 0           Safefree(self->wbuf);
1948              
1949 0           Safefree(self);
1950             }
1951              
1952             void
1953             connect(EV::Memcached self, const char *host, int port = 11211)
1954             CODE:
1955             {
1956 0 0         if (self->connected || self->connecting)
    0          
1957 0           croak("already connected");
1958              
1959             /* An auto-reconnect timer may be pending; cancel it to avoid a
1960             second start_connect when the timer fires. */
1961 0           stop_reconnect_timer(self);
1962              
1963 0           Safefree(self->host);
1964 0           self->host = savepv(host);
1965 0           self->port = port;
1966 0           Safefree(self->path); self->path = NULL;
1967 0           self->intentional_disconnect = 0;
1968              
1969 0           start_connect(aTHX_ self);
1970             }
1971              
1972             void
1973             connect_unix(EV::Memcached self, const char *path)
1974             CODE:
1975             {
1976 0 0         if (self->connected || self->connecting)
    0          
1977 0           croak("already connected");
1978              
1979 0           stop_reconnect_timer(self);
1980              
1981 0           Safefree(self->path);
1982 0           self->path = savepv(path);
1983 0           Safefree(self->host); self->host = NULL;
1984 0           self->intentional_disconnect = 0;
1985              
1986 0           start_connect(aTHX_ self);
1987             }
1988              
1989             void
1990             disconnect(EV::Memcached self)
1991             CODE:
1992             {
1993 0           self->intentional_disconnect = 1;
1994              
1995 0           stop_reconnect_timer(self);
1996              
1997             /* Pin the object across pending-callback dispatch so that an
1998             `undef $mc` from a callback defers DESTROY rather than freeing
1999             us mid-call; check_destroyed below handles the deferred free. */
2000 0           self->callback_depth++;
2001 0 0         if (self->connected || self->connecting) {
    0          
2002 0           handle_disconnect(aTHX_ self, NULL);
2003             } else {
2004 0           cancel_waiting(aTHX_ self, err_disconnected);
2005             }
2006 0           self->callback_depth--;
2007 0           check_destroyed(self);
2008             }
2009              
2010             int
2011             is_connected(EV::Memcached self)
2012             CODE:
2013 0 0         RETVAL = self->connected || self->connecting;
    0          
    0          
2014             OUTPUT:
2015             RETVAL
2016              
2017             SV *
2018             on_error(EV::Memcached self, ...)
2019             CODE:
2020             {
2021 0 0         if (items > 1) {
2022 0 0         CLEAR_HANDLER(self->on_error);
2023 0 0         if (SvOK(ST(1)) && SvROK(ST(1)) && SvTYPE(SvRV(ST(1))) == SVt_PVCV) {
    0          
    0          
2024 0           self->on_error = newSVsv(ST(1));
2025             }
2026             }
2027 0 0         RETVAL = self->on_error ? newSVsv(self->on_error) : &PL_sv_undef;
2028             }
2029             OUTPUT:
2030             RETVAL
2031              
2032             SV *
2033             on_connect(EV::Memcached self, ...)
2034             CODE:
2035             {
2036 0 0         if (items > 1) {
2037 0 0         CLEAR_HANDLER(self->on_connect);
2038 0 0         if (SvOK(ST(1)) && SvROK(ST(1)) && SvTYPE(SvRV(ST(1))) == SVt_PVCV) {
    0          
    0          
2039 0           self->on_connect = newSVsv(ST(1));
2040             }
2041             }
2042 0 0         RETVAL = self->on_connect ? newSVsv(self->on_connect) : &PL_sv_undef;
2043             }
2044             OUTPUT:
2045             RETVAL
2046              
2047             SV *
2048             on_disconnect(EV::Memcached self, ...)
2049             CODE:
2050             {
2051 0 0         if (items > 1) {
2052 0 0         CLEAR_HANDLER(self->on_disconnect);
2053 0 0         if (SvOK(ST(1)) && SvROK(ST(1)) && SvTYPE(SvRV(ST(1))) == SVt_PVCV) {
    0          
    0          
2054 0           self->on_disconnect = newSVsv(ST(1));
2055             }
2056             }
2057 0 0         RETVAL = self->on_disconnect ? newSVsv(self->on_disconnect) : &PL_sv_undef;
2058             }
2059             OUTPUT:
2060             RETVAL
2061              
2062             void
2063             set(EV::Memcached self, SV *key_sv, SV *value_sv, ...)
2064             ALIAS:
2065             add = 1
2066             replace = 2
2067             CODE:
2068             {
2069             static const uint8_t opcodes[] = { MC_OP_SET, MC_OP_ADD, MC_OP_REPLACE };
2070 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2071              
2072             STRLEN key_len, value_len;
2073 0           const char *key = SvPV(key_sv, key_len);
2074 0           const char *value = SvPV(value_sv, value_len);
2075              
2076 0           int extra = items - 3;
2077 0           SV *cb = NULL;
2078 0 0         if (extra > 0 && SvROK(ST(items-1)) && SvTYPE(SvRV(ST(items-1))) == SVt_PVCV) {
    0          
    0          
2079 0           cb = ST(items-1);
2080 0           extra--;
2081             }
2082 0 0         UV expiry = extra > 0 ? SvUV(ST(3)) : 0;
2083 0 0         UV flags = extra > 1 ? SvUV(ST(4)) : 0;
2084              
2085             char extras[8];
2086 0           mc_write_u32(extras, (uint32_t)flags);
2087 0           mc_write_u32(extras + 4, (uint32_t)expiry);
2088              
2089             /* SET fire-and-forget uses SETQ (quiet) — server suppresses response.
2090             * ADD/REPLACE can fail, so they always use normal opcodes. */
2091 0 0         if (!cb && ix == 0) {
    0          
2092 0           mc_fire_and_forget(aTHX_ self, MC_OP_SETQ, key, key_len, value, value_len,
2093             extras, 8, 0);
2094             } else {
2095 0           mc_enqueue_cmd(aTHX_ self, opcodes[ix], key, key_len, value, value_len,
2096             extras, 8, 0, CB_CMD_STORE, 0, cb);
2097             }
2098             }
2099              
2100             void
2101             cas(EV::Memcached self, SV *key_sv, SV *value_sv, SV *cas_sv, ...)
2102             CODE:
2103             {
2104 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2105              
2106             STRLEN key_len, value_len;
2107 0           const char *key = SvPV(key_sv, key_len);
2108 0           const char *value = SvPV(value_sv, value_len);
2109 0           uint64_t cas_val = SvUV(cas_sv);
2110              
2111 0           int extra = items - 4;
2112 0           SV *cb = NULL;
2113 0 0         if (extra > 0 && SvROK(ST(items-1)) && SvTYPE(SvRV(ST(items-1))) == SVt_PVCV) {
    0          
    0          
2114 0           cb = ST(items-1);
2115 0           extra--;
2116             }
2117 0 0         UV expiry = extra > 0 ? SvUV(ST(4)) : 0;
2118 0 0         UV flags = extra > 1 ? SvUV(ST(5)) : 0;
2119              
2120             char extras[8];
2121 0           mc_write_u32(extras, (uint32_t)flags);
2122 0           mc_write_u32(extras + 4, (uint32_t)expiry);
2123              
2124 0           mc_enqueue_cmd(aTHX_ self, MC_OP_SET, key, key_len, value, value_len,
2125             extras, 8, cas_val, CB_CMD_STORE, 0, cb);
2126             }
2127              
2128             void
2129             get(EV::Memcached self, SV *key_sv, SV *cb_sv = &PL_sv_undef)
2130             ALIAS:
2131             gets = 1
2132             CODE:
2133             {
2134 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2135              
2136             STRLEN key_len;
2137 0           const char *key = SvPV(key_sv, key_len);
2138 0 0         SV *cb = (SvOK(cb_sv) && SvROK(cb_sv)) ? cb_sv : NULL;
    0          
2139              
2140 0           mc_enqueue_cmd(aTHX_ self, MC_OP_GET, key, key_len, NULL, 0,
2141             NULL, 0, 0, ix == 0 ? CB_CMD_GET : CB_CMD_GETS, 0, cb);
2142             }
2143              
2144             void
2145             delete(EV::Memcached self, SV *key_sv, SV *cb_sv = &PL_sv_undef)
2146             CODE:
2147             {
2148 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2149              
2150             STRLEN key_len;
2151 0           const char *key = SvPV(key_sv, key_len);
2152 0 0         SV *cb = (SvOK(cb_sv) && SvROK(cb_sv)) ? cb_sv : NULL;
    0          
2153              
2154 0           mc_enqueue_cmd(aTHX_ self, MC_OP_DELETE, key, key_len, NULL, 0,
2155             NULL, 0, 0, CB_CMD_DELETE, 0, cb);
2156             }
2157              
2158             void
2159             incr(EV::Memcached self, SV *key_sv, ...)
2160             ALIAS:
2161             decr = 1
2162             CODE:
2163             {
2164 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2165              
2166             STRLEN key_len;
2167 0           const char *key = SvPV(key_sv, key_len);
2168              
2169 0           int extra = items - 2;
2170 0           SV *cb = NULL;
2171 0 0         if (extra > 0 && SvROK(ST(items-1)) && SvTYPE(SvRV(ST(items-1))) == SVt_PVCV) {
    0          
    0          
2172 0           cb = ST(items-1);
2173 0           extra--;
2174             }
2175 0 0         UV delta = extra > 0 ? SvUV(ST(2)) : 1;
2176 0 0         UV initial = extra > 1 ? SvUV(ST(3)) : 0;
2177 0 0         UV expiry = extra > 2 ? SvUV(ST(4)) : 0xFFFFFFFF;
2178              
2179             char extras[20];
2180 0           mc_write_u64(extras, (uint64_t)delta);
2181 0           mc_write_u64(extras + 8, (uint64_t)initial);
2182 0           mc_write_u32(extras + 16, (uint32_t)expiry);
2183              
2184 0 0         mc_enqueue_cmd(aTHX_ self, ix == 0 ? MC_OP_INCR : MC_OP_DECR,
2185             key, key_len, NULL, 0, extras, 20, 0, CB_CMD_ARITH, 0, cb);
2186             }
2187              
2188             void
2189             append(EV::Memcached self, SV *key_sv, SV *value_sv, SV *cb_sv = &PL_sv_undef)
2190             ALIAS:
2191             prepend = 1
2192             CODE:
2193             {
2194 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2195              
2196             STRLEN key_len, value_len;
2197 0           const char *key = SvPV(key_sv, key_len);
2198 0           const char *value = SvPV(value_sv, value_len);
2199 0 0         SV *cb = (SvOK(cb_sv) && SvROK(cb_sv)) ? cb_sv : NULL;
    0          
2200              
2201 0 0         mc_enqueue_cmd(aTHX_ self, ix == 0 ? MC_OP_APPEND : MC_OP_PREPEND,
2202             key, key_len, value, value_len, NULL, 0, 0, CB_CMD_STORE, 0, cb);
2203             }
2204              
2205             void
2206             touch(EV::Memcached self, SV *key_sv, UV expiry, SV *cb_sv = &PL_sv_undef)
2207             ALIAS:
2208             gat = 1
2209             gats = 2
2210             CODE:
2211             {
2212 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2213              
2214             STRLEN key_len;
2215 0           const char *key = SvPV(key_sv, key_len);
2216 0 0         SV *cb = (SvOK(cb_sv) && SvROK(cb_sv)) ? cb_sv : NULL;
    0          
2217              
2218             char extras[4];
2219 0           mc_write_u32(extras, (uint32_t)expiry);
2220              
2221             static const int cmds[] = { CB_CMD_TOUCH, CB_CMD_GAT, CB_CMD_GATS };
2222             static const uint8_t ops[] = { MC_OP_TOUCH, MC_OP_GAT, MC_OP_GAT };
2223              
2224 0           mc_enqueue_cmd(aTHX_ self, ops[ix], key, key_len, NULL, 0,
2225 0           extras, 4, 0, cmds[ix], 0, cb);
2226             }
2227              
2228             void
2229             flush(EV::Memcached self, ...)
2230             CODE:
2231             {
2232 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2233              
2234 0           int extra = items - 1;
2235 0           SV *cb = NULL;
2236 0 0         if (extra > 0 && SvROK(ST(items-1)) && SvTYPE(SvRV(ST(items-1))) == SVt_PVCV) {
    0          
    0          
2237 0           cb = ST(items-1);
2238 0           extra--;
2239             }
2240 0 0         UV expiry = extra > 0 ? SvUV(ST(1)) : 0;
2241              
2242             char extras[4];
2243 0           const char *xp = NULL;
2244 0           uint8_t xl = 0;
2245 0 0         if (expiry > 0) {
2246 0           mc_write_u32(extras, (uint32_t)expiry);
2247 0           xp = extras;
2248 0           xl = 4;
2249             }
2250              
2251 0 0         if (cb)
2252 0           mc_enqueue_cmd(aTHX_ self, MC_OP_FLUSH, NULL, 0, NULL, 0,
2253             xp, xl, 0, CB_CMD_FLUSH, 0, cb);
2254             else
2255 0           mc_fire_and_forget(aTHX_ self, MC_OP_FLUSHQ, NULL, 0, NULL, 0,
2256             xp, xl, 0);
2257             }
2258              
2259             void
2260             version(EV::Memcached self, SV *cb_sv = &PL_sv_undef)
2261             ALIAS:
2262             noop = 1
2263             quit = 2
2264             CODE:
2265             {
2266 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2267              
2268 0 0         SV *cb = (SvOK(cb_sv) && SvROK(cb_sv)) ? cb_sv : NULL;
    0          
2269              
2270             static const uint8_t ops[] = { MC_OP_VERSION, MC_OP_NOOP, MC_OP_QUIT };
2271             static const int cmds[] = { CB_CMD_VERSION, CB_CMD_NOOP, CB_CMD_QUIT };
2272              
2273 0           mc_enqueue_cmd(aTHX_ self, ops[ix], NULL, 0, NULL, 0,
2274 0           NULL, 0, 0, cmds[ix], 0, cb);
2275             }
2276              
2277             void
2278             stats(EV::Memcached self, ...)
2279             CODE:
2280             {
2281 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2282              
2283 0           int extra = items - 1;
2284 0           SV *cb = NULL;
2285 0 0         if (extra > 0 && SvROK(ST(items-1)) && SvTYPE(SvRV(ST(items-1))) == SVt_PVCV) {
    0          
    0          
2286 0           cb = ST(items-1);
2287 0           extra--;
2288             }
2289              
2290 0           STRLEN name_len = 0;
2291 0           const char *name = NULL;
2292 0 0         if (extra > 0 && SvOK(ST(1))) {
    0          
2293 0           name = SvPV(ST(1), name_len);
2294             }
2295              
2296 0           mc_enqueue_cmd(aTHX_ self, MC_OP_STAT, name, name_len, NULL, 0,
2297             NULL, 0, 0, CB_CMD_STATS, 0, cb);
2298             }
2299              
2300             void
2301             mget(EV::Memcached self, SV *keys_sv, SV *cb_sv = &PL_sv_undef)
2302             ALIAS:
2303             mgets = 1
2304             CODE:
2305             {
2306 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2307              
2308 0 0         if (!SvROK(keys_sv) || SvTYPE(SvRV(keys_sv)) != SVt_PVAV)
    0          
2309 0           croak("mget requires an array reference");
2310              
2311 0 0         SV *cb = (SvOK(cb_sv) && SvROK(cb_sv)) ? cb_sv : NULL;
    0          
2312 0           AV *keys_av = (AV *)SvRV(keys_sv);
2313              
2314 0           mc_enqueue_mget(aTHX_ self, keys_av, cb, ix);
2315             }
2316              
2317             void
2318             sasl_auth(EV::Memcached self, SV *user_sv, SV *pass_sv, SV *cb_sv = &PL_sv_undef)
2319             CODE:
2320             {
2321 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2322              
2323             STRLEN ulen, plen;
2324 0           const char *user = SvPV(user_sv, ulen);
2325 0           const char *pass = SvPV(pass_sv, plen);
2326 0 0         SV *cb = (SvOK(cb_sv) && SvROK(cb_sv)) ? cb_sv : NULL;
    0          
2327              
2328 0           size_t vlen = 1 + ulen + 1 + plen;
2329             char *authdata;
2330 0           Newx(authdata, vlen, char);
2331 0           authdata[0] = '\0';
2332 0           memcpy(authdata + 1, user, ulen);
2333 0           authdata[1 + ulen] = '\0';
2334 0           memcpy(authdata + 2 + ulen, pass, plen);
2335              
2336 0           mc_enqueue_cmd(aTHX_ self, MC_OP_SASL_AUTH, "PLAIN", 5, authdata, vlen,
2337             NULL, 0, 0, CB_CMD_SASL_AUTH, 0, cb);
2338 0           Safefree(authdata);
2339             }
2340              
2341             void
2342             sasl_list_mechs(EV::Memcached self, SV *cb_sv = &PL_sv_undef)
2343             CODE:
2344             {
2345 0 0         MC_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2346              
2347 0 0         SV *cb = (SvOK(cb_sv) && SvROK(cb_sv)) ? cb_sv : NULL;
    0          
2348              
2349 0           mc_enqueue_cmd(aTHX_ self, MC_OP_SASL_LIST_MECHS, NULL, 0, NULL, 0,
2350             NULL, 0, 0, CB_CMD_SASL_LIST, 0, cb);
2351             }
2352              
2353             int
2354             pending_count(EV::Memcached self)
2355             CODE:
2356 0 0         RETVAL = self->pending_count;
2357             OUTPUT:
2358             RETVAL
2359              
2360             int
2361             waiting_count(EV::Memcached self)
2362             CODE:
2363 0 0         RETVAL = self->waiting_count;
2364             OUTPUT:
2365             RETVAL
2366              
2367             int
2368             max_pending(EV::Memcached self, ...)
2369             CODE:
2370             {
2371 0 0         if (items > 1) {
2372 0           int old = self->max_pending;
2373 0           self->max_pending = SvIV(ST(1));
2374 0 0         if (self->max_pending < 0) self->max_pending = 0;
2375             /* If limit increased, drain waiting queue */
2376 0 0         if (self->connected && self->max_pending > old)
    0          
2377 0           send_next_waiting(aTHX_ self);
2378             }
2379 0 0         RETVAL = self->max_pending;
2380             }
2381             OUTPUT:
2382             RETVAL
2383              
2384             int
2385             waiting_timeout(EV::Memcached self, ...)
2386             CODE:
2387             {
2388 0 0         if (items > 1) {
2389 0           self->waiting_timeout_ms = SvIV(ST(1));
2390 0 0         if (self->waiting_timeout_ms < 0) self->waiting_timeout_ms = 0;
2391             }
2392 0 0         RETVAL = self->waiting_timeout_ms;
2393             }
2394             OUTPUT:
2395             RETVAL
2396              
2397             int
2398             resume_waiting_on_reconnect(EV::Memcached self, ...)
2399             CODE:
2400             {
2401 0 0         if (items > 1) {
2402 0           self->resume_waiting_on_reconnect = SvTRUE(ST(1)) ? 1 : 0;
2403             }
2404 0 0         RETVAL = self->resume_waiting_on_reconnect;
2405             }
2406             OUTPUT:
2407             RETVAL
2408              
2409             int
2410             connect_timeout(EV::Memcached self, ...)
2411             CODE:
2412             {
2413 0 0         if (items > 1) {
2414 0           self->connect_timeout_ms = SvIV(ST(1));
2415 0 0         if (self->connect_timeout_ms < 0) self->connect_timeout_ms = 0;
2416             }
2417 0 0         RETVAL = self->connect_timeout_ms;
2418             }
2419             OUTPUT:
2420             RETVAL
2421              
2422             int
2423             command_timeout(EV::Memcached self, ...)
2424             CODE:
2425             {
2426 0 0         if (items > 1) {
2427 0           self->command_timeout_ms = SvIV(ST(1));
2428 0 0         if (self->command_timeout_ms < 0) self->command_timeout_ms = 0;
2429 0 0         if (self->command_timeout_ms == 0)
2430 0           disarm_cmd_timer(self);
2431 0 0         else if (!ngx_queue_empty(&self->cb_queue))
2432 0           arm_cmd_timer(self);
2433             }
2434 0 0         RETVAL = self->command_timeout_ms;
2435             }
2436             OUTPUT:
2437             RETVAL
2438              
2439             void
2440             reconnect(EV::Memcached self, int enable, int delay_ms = 1000, int max_attempts = 0)
2441             CODE:
2442             {
2443 0           self->reconnect = enable ? 1 : 0;
2444 0           self->reconnect_delay_ms = delay_ms >= 0 ? delay_ms : 0;
2445 0           self->max_reconnect_attempts = max_attempts >= 0 ? max_attempts : 0;
2446 0 0         if (!enable) {
2447 0           self->reconnect_attempts = 0;
2448 0           stop_reconnect_timer(self);
2449             }
2450             }
2451              
2452             int
2453             reconnect_enabled(EV::Memcached self)
2454             CODE:
2455 0 0         RETVAL = self->reconnect;
2456             OUTPUT:
2457             RETVAL
2458              
2459             int
2460             priority(EV::Memcached self, ...)
2461             CODE:
2462             {
2463 0 0         if (items > 1) {
2464 0           self->priority = SvIV(ST(1));
2465 0 0         if (self->priority < -2) self->priority = -2;
2466 0 0         if (self->priority > 2) self->priority = 2;
2467             /* Apply to active watchers */
2468 0 0         if (self->reading) {
2469 0           ev_io_stop(self->loop, &self->rio);
2470 0           ev_set_priority(&self->rio, self->priority);
2471 0           ev_io_start(self->loop, &self->rio);
2472             } else {
2473 0           ev_set_priority(&self->rio, self->priority);
2474             }
2475 0 0         if (self->writing) {
2476 0           ev_io_stop(self->loop, &self->wio);
2477 0           ev_set_priority(&self->wio, self->priority);
2478 0           ev_io_start(self->loop, &self->wio);
2479             } else {
2480 0           ev_set_priority(&self->wio, self->priority);
2481             }
2482             }
2483 0 0         RETVAL = self->priority;
2484             }
2485             OUTPUT:
2486             RETVAL
2487              
2488             int
2489             keepalive(EV::Memcached self, ...)
2490             CODE:
2491             {
2492 0 0         if (items > 1) {
2493 0           self->keepalive = SvIV(ST(1));
2494 0 0         if (self->keepalive < 0) self->keepalive = 0;
2495 0 0         if (self->connected && self->fd >= 0)
    0          
2496 0           apply_keepalive(self);
2497             }
2498 0 0         RETVAL = self->keepalive;
2499             }
2500             OUTPUT:
2501             RETVAL
2502              
2503             void
2504             skip_pending(EV::Memcached self)
2505             CODE:
2506             {
2507 0           self->callback_depth++;
2508 0           cancel_pending_impl(aTHX_ self, err_skipped, 1);
2509 0           self->callback_depth--;
2510 0           check_destroyed(self);
2511             }
2512              
2513             void
2514             skip_waiting(EV::Memcached self)
2515             CODE:
2516             {
2517 0           self->callback_depth++;
2518 0           cancel_waiting(aTHX_ self, err_skipped);
2519 0           self->callback_depth--;
2520 0           check_destroyed(self);
2521             }