File Coverage

Redis.xs
Criterion Covered Total %
statement 91 886 10.2
branch 54 728 7.4
condition n/a
subroutine n/a
pod n/a
total 145 1614 8.9


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include "ppport.h"
5              
6             #include "EVAPI.h"
7              
8             #include "hiredis.h"
9             #include "async.h"
10             #include "libev_adapter.h"
11             #include "ngx-queue.h"
12              
13             #ifdef EV_REDIS_SSL
14             #include "hiredis_ssl.h"
15             #endif
16              
17             typedef struct ev_redis_s ev_redis_t;
18             typedef struct ev_redis_cb_s ev_redis_cb_t;
19             typedef struct ev_redis_wait_s ev_redis_wait_t;
20              
21             typedef ev_redis_t* EV__Redis;
22             typedef struct ev_loop* EV__Loop;
23              
24             #define EV_REDIS_MAGIC 0xDEADBEEF
25             #define EV_REDIS_FREED 0xFEEDFACE
26              
27             #define CLEAR_HANDLER(field) \
28             do { if (NULL != (field)) { SvREFCNT_dec(field); (field) = NULL; } } while(0)
29              
30             struct ev_redis_s {
31             unsigned int magic; /* Set to EV_REDIS_MAGIC when alive */
32             struct ev_loop* loop;
33             redisAsyncContext* ac;
34             SV* error_handler;
35             SV* connect_handler;
36             SV* disconnect_handler;
37             SV* push_handler;
38             struct timeval* connect_timeout;
39             struct timeval* command_timeout;
40             ngx_queue_t cb_queue;
41             ngx_queue_t wait_queue;
42             int pending_count;
43             int waiting_count;
44             int max_pending; /* 0 = unlimited */
45             ev_redis_cb_t* current_cb; /* callback currently executing */
46             int resume_waiting_on_reconnect; /* keep waiting queue on disconnect */
47             int waiting_timeout_ms; /* max ms in waiting queue, 0 = unlimited */
48             ev_timer waiting_timer;
49             int waiting_timer_active;
50              
51             /* Reconnect settings */
52             char* host;
53             int port;
54             char* path;
55             int reconnect; /* 0 = disabled, 1 = enabled */
56             int reconnect_delay_ms; /* delay between reconnect attempts */
57             int max_reconnect_attempts; /* 0 = unlimited */
58             int reconnect_attempts; /* current attempt count */
59             ev_timer reconnect_timer;
60             int reconnect_timer_active;
61             int intentional_disconnect; /* set before explicit disconnect() */
62             int priority; /* libev watcher priority, default 0 */
63             int in_cb_cleanup; /* prevent re-entrant cb_queue modification */
64             int in_wait_cleanup; /* prevent re-entrant wait_queue modification */
65             int callback_depth; /* nesting depth of C-level callbacks invoking Perl code */
66             int keepalive; /* TCP keepalive interval in seconds, 0 = disabled */
67             int prefer_ipv4; /* prefer IPv4 DNS resolution */
68             int prefer_ipv6; /* prefer IPv6 DNS resolution */
69             char* source_addr; /* local address to bind to */
70             unsigned int tcp_user_timeout; /* TCP_USER_TIMEOUT in ms, 0 = OS default */
71             int cloexec; /* set SOCK_CLOEXEC on socket */
72             int reuseaddr; /* set SO_REUSEADDR on socket */
73             redisAsyncContext* ac_saved; /* saved ac pointer for deferred disconnect cleanup */
74             #ifdef EV_REDIS_SSL
75             redisSSLContext* ssl_ctx;
76             #endif
77             };
78              
79             struct ev_redis_cb_s {
80             SV* cb;
81             ngx_queue_t queue;
82             int persist;
83             int skipped;
84             int sub_count; /* subscription channels remaining (for persistent commands) */
85             };
86              
87             struct ev_redis_wait_s {
88             char** argv;
89             size_t* argvlen;
90             int argc;
91             SV* cb;
92             int persist;
93             ngx_queue_t queue;
94             ev_tstamp queued_at;
95             };
96              
97             /* Shared error strings (initialized in BOOT) */
98             static SV* err_skipped = NULL;
99             static SV* err_waiting_timeout = NULL;
100             static SV* err_disconnected = NULL;
101              
102             /* Check for unsubscribe-family commands. These are persistent (stay in cb_queue)
103             * but hiredis ignores their callbacks — replies go through the subscribe callback. */
104 0           static int is_unsubscribe_command(const char* cmd) {
105 0           char c = cmd[0];
106 0 0         if (c == 'u' || c == 'U') return (0 == strcasecmp(cmd, "unsubscribe"));
    0          
107 0 0         if (c == 'p' || c == 'P') return (0 == strcasecmp(cmd, "punsubscribe"));
    0          
108 0 0         if (c == 's' || c == 'S') return (0 == strcasecmp(cmd, "sunsubscribe"));
    0          
109 0           return 0;
110             }
111              
112 0           static int is_persistent_command(const char* cmd) {
113 0           char c = cmd[0];
114              
115 0 0         if (c == 's' || c == 'S') {
    0          
116 0 0         if (0 == strcasecmp(cmd, "subscribe")) return 1;
117 0 0         if (0 == strcasecmp(cmd, "ssubscribe")) return 1;
118 0 0         if (0 == strcasecmp(cmd, "sunsubscribe")) return 1;
119 0           return 0;
120             }
121 0 0         if (c == 'u' || c == 'U') {
    0          
122 0           return (0 == strcasecmp(cmd, "unsubscribe"));
123             }
124 0 0         if (c == 'p' || c == 'P') {
    0          
125 0 0         if (0 == strcasecmp(cmd, "psubscribe")) return 1;
126 0 0         if (0 == strcasecmp(cmd, "punsubscribe")) return 1;
127 0           return 0;
128             }
129 0 0         if (c == 'm' || c == 'M') {
    0          
130 0           return (0 == strcasecmp(cmd, "monitor"));
131             }
132              
133 0           return 0;
134             }
135              
136             /* Detect unsubscribe-type replies that indicate end of a subscription channel.
137             * Format: [type_string, channel, remaining_count]
138             * Returns 1 if the reply is an unsubscribe/punsubscribe/sunsubscribe message. */
139 0           static int is_unsub_reply(redisReply* reply) {
140             const char* s;
141              
142 0 0         if (reply->type != REDIS_REPLY_ARRAY && reply->type != REDIS_REPLY_PUSH) return 0;
    0          
143 0 0         if (reply->elements < 3) return 0;
144 0 0         if (NULL == reply->element[0]) return 0;
145 0 0         if (reply->element[0]->type != REDIS_REPLY_STRING &&
146 0 0         reply->element[0]->type != REDIS_REPLY_STATUS) return 0;
147              
148 0           s = reply->element[0]->str;
149 0 0         if (s[0] == 'u' || s[0] == 'U') return (0 == strcasecmp(s, "unsubscribe"));
    0          
150 0 0         if (s[0] == 'p' || s[0] == 'P') return (0 == strcasecmp(s, "punsubscribe"));
    0          
151 0 0         if (s[0] == 's' || s[0] == 'S') return (0 == strcasecmp(s, "sunsubscribe"));
    0          
152 0           return 0;
153             }
154              
155 0           static void emit_error(EV__Redis self, SV* error) {
156 0 0         if (NULL == self->error_handler) return;
157              
158 0           dSP;
159              
160 0           ENTER;
161 0           SAVETMPS;
162              
163 0 0         PUSHMARK(SP);
164 0 0         XPUSHs(error);
165 0           PUTBACK;
166              
167 0           call_sv(self->error_handler, G_DISCARD | G_EVAL);
168 0 0         if (SvTRUE(ERRSV)) {
    0          
169 0 0         warn("EV::Redis: exception in error handler: %s", SvPV_nolen(ERRSV));
170             }
171              
172 0 0         FREETMPS;
173 0           LEAVE;
174             }
175              
176 0           static void emit_error_str(EV__Redis self, const char* error) {
177 0 0         if (NULL == self->error_handler) return;
178 0           emit_error(self, sv_2mortal(newSVpv(error, 0)));
179             }
180              
181 0           static void invoke_callback_error(SV* cb, SV* error_sv) {
182 0           dSP;
183 0           ENTER;
184 0           SAVETMPS;
185 0 0         PUSHMARK(SP);
186 0 0         EXTEND(SP, 2);
187 0           PUSHs(&PL_sv_undef);
188 0           PUSHs(error_sv);
189 0           PUTBACK;
190 0           call_sv(cb, G_DISCARD | G_EVAL);
191 0 0         if (SvTRUE(ERRSV)) {
    0          
192 0 0         warn("EV::Redis: exception in command callback: %s", SvPV_nolen(ERRSV));
193             }
194 0 0         FREETMPS;
195 0           LEAVE;
196 0           }
197              
198             /* Check if DESTROY was called during a callback and deferred Safefree.
199             * Call after decrementing callback_depth. Returns 1 if self was freed
200             * (caller MUST NOT access self afterward). */
201 0           static int check_destroyed(EV__Redis self) {
202 0 0         if (self->magic == EV_REDIS_FREED &&
203 0 0         self->callback_depth == 0 &&
204 0 0         self->current_cb == NULL) {
205 0           Safefree(self);
206 0           return 1;
207             }
208 0           return 0;
209             }
210              
211             /* Free C-allocated fields (used by both PL_dirty and normal DESTROY paths) */
212 6           static void free_c_fields(EV__Redis self) {
213 6 50         if (NULL != self->host) { Safefree(self->host); self->host = NULL; }
214 6 50         if (NULL != self->path) { Safefree(self->path); self->path = NULL; }
215 6 50         if (NULL != self->source_addr) { Safefree(self->source_addr); self->source_addr = NULL; }
216 6 50         if (NULL != self->connect_timeout) { Safefree(self->connect_timeout); self->connect_timeout = NULL; }
217 6 50         if (NULL != self->command_timeout) { Safefree(self->command_timeout); self->command_timeout = NULL; }
218             #ifdef EV_REDIS_SSL
219 6 100         if (NULL != self->ssl_ctx) { redisFreeSSLContext(self->ssl_ctx); self->ssl_ctx = NULL; }
220             #endif
221 6           }
222              
223 6           static void stop_waiting_timer(EV__Redis self) {
224 6 50         if (self->waiting_timer_active && NULL != self->loop && !PL_dirty) {
    0          
    0          
225 0           ev_timer_stop(self->loop, &self->waiting_timer);
226 0           self->waiting_timer_active = 0;
227             }
228 6           }
229              
230 6           static void stop_reconnect_timer(EV__Redis self) {
231 6 50         if (self->reconnect_timer_active && NULL != self->loop && !PL_dirty) {
    0          
    0          
232 0           ev_timer_stop(self->loop, &self->reconnect_timer);
233 0           self->reconnect_timer_active = 0;
234             }
235 6           }
236              
237             /* Maximum timeout: ~23 days (fits safely in 32-bit calculations) */
238             #define MAX_TIMEOUT_MS 2000000000
239              
240 0           static void validate_timeout_ms(IV ms, const char* name) {
241 0 0         if (ms < 0) croak("%s must be non-negative", name);
242 0 0         if (ms > MAX_TIMEOUT_MS) croak("%s too large (max %d ms)", name, MAX_TIMEOUT_MS);
243 0           }
244              
245 0           static SV* timeout_accessor(struct timeval** tv_ptr, SV* timeout_ms, const char* name) {
246 0 0         if (NULL != timeout_ms && SvOK(timeout_ms)) {
    0          
247 0           IV ms = SvIV(timeout_ms);
248 0           validate_timeout_ms(ms, name);
249 0 0         if (NULL == *tv_ptr) {
250 0           Newx(*tv_ptr, 1, struct timeval);
251             }
252 0           (*tv_ptr)->tv_sec = (long)(ms / 1000);
253 0           (*tv_ptr)->tv_usec = (long)((ms % 1000) * 1000);
254             }
255              
256 0 0         if (NULL != *tv_ptr) {
257 0           return newSViv((IV)(*tv_ptr)->tv_sec * 1000 + (*tv_ptr)->tv_usec / 1000);
258             }
259 0           return &PL_sv_undef;
260             }
261              
262             /* Helper to set/clear a callback handler field.
263             * If called without handler (items == 1), clears the handler.
264             * If called with handler, sets it (or clears if handler is undef/not CODE).
265             * Returns the current handler (with refcount incremented) or undef. */
266 6           static SV* handler_accessor(SV** handler_ptr, SV* handler, int has_handler_arg) {
267             /* Clear existing handler first - both no-arg calls and set calls clear first */
268 6 50         if (NULL != *handler_ptr) {
269 0           SvREFCNT_dec(*handler_ptr);
270 0           *handler_ptr = NULL;
271             }
272              
273             /* If a handler argument was provided and it's a valid CODE ref, set it */
274 6 50         if (has_handler_arg && NULL != handler && SvOK(handler) && SvROK(handler) &&
    50          
    50          
    50          
275 6 50         SvTYPE(SvRV(handler)) == SVt_PVCV) {
276 6           *handler_ptr = SvREFCNT_inc(handler);
277             }
278              
279 6           return (NULL != *handler_ptr)
280 6           ? SvREFCNT_inc(*handler_ptr)
281 12 50         : &PL_sv_undef;
282             }
283              
284             /* Uses in_cb_cleanup flag to prevent re-entrant queue modification from
285             * user callbacks (e.g., if callback calls skip_pending). */
286 6           static void remove_cb_queue_sv(EV__Redis self, SV* error_sv) {
287             ngx_queue_t* q;
288             ev_redis_cb_t* cbt;
289              
290 6 50         if (self->in_cb_cleanup) {
291 0           return;
292             }
293              
294 6           self->in_cb_cleanup = 1;
295              
296             /* Use while loop with re-fetch of head each iteration.
297             * This is safe against re-entrant modifications because we
298             * re-check the queue state after each callback invocation. */
299 6 50         while (!ngx_queue_empty(&self->cb_queue)) {
300 0           q = ngx_queue_head(&self->cb_queue);
301 0           cbt = ngx_queue_data(q, ev_redis_cb_t, queue);
302              
303 0 0         if (cbt == self->current_cb) {
304             /* Skip current_cb - it is owned by an in-flight reply_cb. */
305 0 0         if (ngx_queue_next(q) == ngx_queue_sentinel(&self->cb_queue)) {
306 0           break;
307             }
308 0           q = ngx_queue_next(q);
309 0           cbt = ngx_queue_data(q, ev_redis_cb_t, queue);
310             }
311              
312 0           ngx_queue_remove(q);
313 0 0         if (!cbt->persist) self->pending_count--;
314              
315 0 0         if (NULL != cbt->cb) {
316 0 0         if (NULL != error_sv) {
317 0           invoke_callback_error(cbt->cb, error_sv);
318             }
319 0           SvREFCNT_dec(cbt->cb);
320             }
321 0           Safefree(cbt);
322             }
323              
324 6           self->in_cb_cleanup = 0;
325             }
326              
327 0           static void free_wait_entry(ev_redis_wait_t* wt) {
328             int i;
329 0 0         for (i = 0; i < wt->argc; i++) {
330 0           Safefree(wt->argv[i]);
331             }
332 0           Safefree(wt->argv);
333 0           Safefree(wt->argvlen);
334 0 0         if (NULL != wt->cb) {
335 0           SvREFCNT_dec(wt->cb);
336             }
337 0           Safefree(wt);
338 0           }
339              
340             /* Uses in_wait_cleanup flag to prevent re-entrant queue modification. */
341 6           static void clear_wait_queue_sv(EV__Redis self, SV* error_sv) {
342             ngx_queue_t* q;
343             ev_redis_wait_t* wt;
344              
345 6 50         if (self->in_wait_cleanup) {
346 0           return;
347             }
348              
349             /* Protect against re-entrancy: if a callback invokes skip_waiting() or
350             * skip_pending(), they should no-op since we're already clearing. */
351 6           self->in_wait_cleanup = 1;
352              
353 6 50         while (!ngx_queue_empty(&self->wait_queue)) {
354 0           q = ngx_queue_head(&self->wait_queue);
355 0           wt = ngx_queue_data(q, ev_redis_wait_t, queue);
356 0           ngx_queue_remove(q);
357 0           self->waiting_count--;
358              
359 0 0         if (NULL != error_sv && NULL != wt->cb) {
    0          
360 0           invoke_callback_error(wt->cb, error_sv);
361             }
362              
363 0           free_wait_entry(wt);
364             }
365              
366 6           self->in_wait_cleanup = 0;
367             }
368              
369             /* Forward declarations */
370             static void pre_connect_common(EV__Redis self, redisOptions* opts);
371             static int post_connect_setup(EV__Redis self, const char* err_prefix);
372             static void do_reconnect(EV__Redis self);
373             static void send_next_waiting(EV__Redis self);
374             static void schedule_waiting_timer(EV__Redis self);
375             static void expire_waiting_commands(EV__Redis self);
376             static void schedule_reconnect(EV__Redis self);
377             static void EV__redis_connect_cb(redisAsyncContext* c, int status);
378             static void EV__redis_disconnect_cb(const redisAsyncContext* c, int status);
379             static void EV__redis_push_cb(redisAsyncContext* ac, void* reply_ptr);
380             static SV* EV__redis_decode_reply(redisReply* reply);
381             /* Recursion limit for nested array/map/set replies. Bounds C-stack growth
382             * when decoding maliciously deep replies from an untrusted server. */
383             #define EV_REDIS_MAX_REPLY_DEPTH 512
384             static SV* decode_reply_depth(redisReply* reply, int depth);
385              
386 0           static void clear_connection_params(EV__Redis self) {
387 0 0         if (NULL != self->host) { Safefree(self->host); self->host = NULL; }
388 0 0         if (NULL != self->path) { Safefree(self->path); self->path = NULL; }
389 0           }
390              
391 0           static void reconnect_timer_cb(EV_P_ ev_timer* w, int revents) {
392 0           EV__Redis self = (EV__Redis)w->data;
393              
394             (void)loop;
395             (void)revents;
396              
397 0 0         if (NULL == self || self->magic != EV_REDIS_MAGIC) return;
    0          
398              
399 0           self->reconnect_timer_active = 0;
400 0           self->callback_depth++;
401 0           do_reconnect(self);
402 0           self->callback_depth--;
403 0 0         if (check_destroyed(self)) return;
404             }
405              
406 0           static void schedule_reconnect(EV__Redis self) {
407             ev_tstamp delay;
408              
409 0 0         if (!self->reconnect) return;
410 0 0         if (self->intentional_disconnect) return;
411 0 0         if (NULL == self->loop) return;
412 0           stop_reconnect_timer(self);
413 0 0         if (self->max_reconnect_attempts > 0 &&
414 0 0         self->reconnect_attempts >= self->max_reconnect_attempts) {
415             /* Clear waiting queue that was preserved for reconnect - reconnect has
416             * permanently failed, so these commands will never be sent. */
417 0           clear_wait_queue_sv(self, sv_2mortal(newSVpv("reconnect error: max attempts reached", 0)));
418 0           stop_waiting_timer(self);
419 0           emit_error_str(self, "reconnect error: max attempts reached");
420 0           return;
421             }
422              
423 0           self->reconnect_attempts++;
424 0           delay = self->reconnect_delay_ms / 1000.0;
425              
426 0           ev_timer_init(&self->reconnect_timer, reconnect_timer_cb, delay, 0);
427 0           self->reconnect_timer.data = (void*)self;
428 0           ev_timer_start(self->loop, &self->reconnect_timer);
429 0           self->reconnect_timer_active = 1;
430             }
431              
432             /* Expire waiting commands that have exceeded waiting_timeout.
433             * Uses head-refetch iteration pattern which is safe against re-entrant
434             * queue modification (e.g., if a callback calls skip_waiting). */
435 0           static void expire_waiting_commands(EV__Redis self) {
436             ngx_queue_t* q;
437             ev_redis_wait_t* wt;
438             ev_tstamp now;
439             ev_tstamp timeout;
440              
441 0           now = ev_now(self->loop);
442             /* Capture timeout at start - callbacks may modify self->waiting_timeout_ms
443             * and we need consistent behavior for the entire batch. */
444 0           timeout = self->waiting_timeout_ms / 1000.0;
445              
446             /* Use while loop with re-fetch of head each iteration.
447             * This is safe against re-entrant modifications. */
448 0 0         while (!ngx_queue_empty(&self->wait_queue)) {
449 0           q = ngx_queue_head(&self->wait_queue);
450 0           wt = ngx_queue_data(q, ev_redis_wait_t, queue);
451              
452 0 0         if (now - wt->queued_at >= timeout) {
453 0           ngx_queue_remove(q);
454 0           self->waiting_count--;
455              
456 0 0         if (NULL != wt->cb) {
457 0           invoke_callback_error(wt->cb, err_waiting_timeout);
458             }
459              
460 0           free_wait_entry(wt);
461             }
462             else {
463             /* Queue is FIFO with monotonically increasing queued_at times.
464             * If this entry hasn't expired, neither have any following entries. */
465 0           break;
466             }
467             }
468 0           }
469              
470 0           static void waiting_timer_cb(EV_P_ ev_timer* w, int revents) {
471 0           EV__Redis self = (EV__Redis)w->data;
472              
473             (void)loop;
474             (void)revents;
475              
476 0 0         if (NULL == self || self->magic != EV_REDIS_MAGIC) return;
    0          
477              
478 0           self->waiting_timer_active = 0;
479 0           self->callback_depth++;
480 0           expire_waiting_commands(self);
481 0           schedule_waiting_timer(self);
482 0           self->callback_depth--;
483 0 0         if (check_destroyed(self)) return;
484             }
485              
486 0           static void schedule_waiting_timer(EV__Redis self) {
487             ngx_queue_t* q;
488             ev_redis_wait_t* wt;
489             ev_tstamp now, expires_at, delay;
490              
491             /* Use helper which includes NULL loop check */
492 0           stop_waiting_timer(self);
493              
494 0 0         if (NULL == self->loop) return;
495 0 0         if (self->waiting_timeout_ms <= 0) return;
496 0 0         if (ngx_queue_empty(&self->wait_queue)) return;
497              
498 0           q = ngx_queue_head(&self->wait_queue);
499 0           wt = ngx_queue_data(q, ev_redis_wait_t, queue);
500              
501 0           now = ev_now(self->loop);
502 0           expires_at = wt->queued_at + self->waiting_timeout_ms / 1000.0;
503 0           delay = expires_at - now;
504 0 0         if (delay < 0) delay = 0;
505              
506 0           ev_timer_init(&self->waiting_timer, waiting_timer_cb, delay, 0);
507 0           self->waiting_timer.data = (void*)self;
508 0           ev_timer_start(self->loop, &self->waiting_timer);
509 0           self->waiting_timer_active = 1;
510             }
511              
512 0           static void do_reconnect(EV__Redis self) {
513             redisOptions opts;
514 0           memset(&opts, 0, sizeof(opts));
515              
516 0 0         if (NULL == self->loop) {
517             /* Object is being destroyed */
518 0           return;
519             }
520              
521 0 0         if (NULL != self->ac) {
522             /* Already connected or connecting */
523 0           return;
524             }
525              
526 0           self->intentional_disconnect = 0;
527 0           pre_connect_common(self, &opts);
528              
529 0 0         if (NULL != self->path) {
530 0           REDIS_OPTIONS_SET_UNIX(&opts, self->path);
531             }
532 0 0         else if (NULL != self->host) {
533 0           REDIS_OPTIONS_SET_TCP(&opts, self->host, self->port);
534             }
535             else {
536 0           emit_error_str(self, "reconnect error: no connection parameters");
537 0           return;
538             }
539              
540 0           self->ac = redisAsyncConnectWithOptions(&opts);
541 0 0         if (NULL == self->ac) {
542 0           emit_error_str(self, "reconnect error: cannot allocate memory");
543 0           schedule_reconnect(self);
544 0           return;
545             }
546              
547 0 0         if (REDIS_OK != post_connect_setup(self, "reconnect error")) {
548 0           schedule_reconnect(self);
549 0           return;
550             }
551             }
552              
553 0           static void EV__redis_connect_cb(redisAsyncContext* c, int status) {
554 0           EV__Redis self = (EV__Redis)c->data;
555              
556 0 0         if (NULL == self || self->magic != EV_REDIS_MAGIC) return;
    0          
557              
558 0           self->callback_depth++;
559              
560 0 0         if (REDIS_OK != status) {
561 0           self->ac = NULL;
562 0 0         emit_error_str(self, c->errstr[0] ? c->errstr : "connect failed");
563 0 0         if (!self->reconnect || !self->resume_waiting_on_reconnect
    0          
564 0 0         || self->intentional_disconnect) {
565 0 0         clear_wait_queue_sv(self, sv_2mortal(newSVpv(
566             c->errstr[0] ? c->errstr : "connect failed", 0)));
567 0           stop_waiting_timer(self);
568             }
569 0           schedule_reconnect(self);
570             }
571             else {
572 0           self->reconnect_attempts = 0;
573              
574 0 0         if (NULL != self->connect_handler) {
575 0           dSP;
576              
577 0           ENTER;
578 0           SAVETMPS;
579              
580 0 0         PUSHMARK(SP);
581 0           PUTBACK;
582              
583 0           call_sv(self->connect_handler, G_DISCARD | G_EVAL);
584 0 0         if (SvTRUE(ERRSV)) {
    0          
585 0 0         warn("EV::Redis: exception in connect handler: %s", SvPV_nolen(ERRSV));
586             }
587              
588 0 0         FREETMPS;
589 0           LEAVE;
590             }
591              
592 0           send_next_waiting(self);
593             }
594              
595 0           self->callback_depth--;
596 0           check_destroyed(self);
597             }
598              
599 0           static void EV__redis_disconnect_cb(const redisAsyncContext* c, int status) {
600 0           EV__Redis self = (EV__Redis)c->data;
601             SV* error_sv;
602 0           int should_reconnect = 0;
603             int was_intentional;
604             int will_reconnect;
605              
606 0 0         if (NULL == self || self->magic != EV_REDIS_MAGIC) return;
    0          
607              
608             /* Stale disconnect callback: user already established a new connection
609             * (e.g., called disconnect() then connect() before the old deferred
610             * disconnect fired). Old pending callbacks were already processed by
611             * reply_cb. Skip all cleanup to avoid clobbering the new connection.
612             * Clear ac_saved if it points to this old context to prevent dangling. */
613 0 0         if (self->ac != NULL && self->ac != c) {
    0          
614 0 0         if (self->ac_saved == c) self->ac_saved = NULL;
615 0           return;
616             }
617              
618 0           was_intentional = self->intentional_disconnect;
619 0           self->intentional_disconnect = 0;
620              
621 0           self->ac = NULL;
622 0           self->ac_saved = NULL; /* disconnect callback fired normally */
623 0           self->callback_depth++;
624              
625 0 0         if (REDIS_OK == status) {
626 0           error_sv = err_disconnected;
627             }
628             else {
629 0 0         error_sv = sv_2mortal(newSVpv(
630             c->errstr[0] ? c->errstr : "disconnected", 0));
631 0 0         emit_error_str(self, c->errstr[0] ? c->errstr : "disconnected");
632 0 0         if (!was_intentional) {
633 0           should_reconnect = 1;
634             }
635             }
636              
637 0 0         if (NULL != self->disconnect_handler) {
638 0           dSP;
639              
640 0           ENTER;
641 0           SAVETMPS;
642              
643 0 0         PUSHMARK(SP);
644 0           PUTBACK;
645              
646 0           call_sv(self->disconnect_handler, G_DISCARD | G_EVAL);
647 0 0         if (SvTRUE(ERRSV)) {
    0          
648 0 0         warn("EV::Redis: exception in disconnect handler: %s", SvPV_nolen(ERRSV));
649             }
650              
651 0 0         FREETMPS;
652 0           LEAVE;
653              
654             /* Re-check: user's handler might have called connect() or reconnect()
655             * establishing a new ac. If so, skip clearing cb_queue to avoid
656             * freeing new commands; still honour resume_waiting_on_reconnect=0
657             * by clearing the old wait queue (its entries belong to the prior
658             * connection, not the new one). ac_saved is already handled or NULL. */
659 0 0         if (self->ac != NULL && self->ac != c) {
    0          
660 0 0         if (!self->resume_waiting_on_reconnect) {
661 0           clear_wait_queue_sv(self, error_sv);
662 0           stop_waiting_timer(self);
663             }
664 0           self->callback_depth--;
665 0           check_destroyed(self);
666 0           return;
667             }
668             }
669              
670 0           remove_cb_queue_sv(self, error_sv);
671              
672 0 0         will_reconnect = should_reconnect && !self->intentional_disconnect && self->reconnect;
    0          
    0          
673 0 0         if (!self->resume_waiting_on_reconnect || was_intentional || !will_reconnect) {
    0          
    0          
674 0           clear_wait_queue_sv(self, error_sv);
675 0           stop_waiting_timer(self);
676             }
677              
678 0 0         if (will_reconnect) {
679 0           schedule_reconnect(self);
680             }
681              
682 0           self->callback_depth--;
683 0           check_destroyed(self);
684             }
685              
686 0           static void EV__redis_push_cb(redisAsyncContext* ac, void* reply_ptr) {
687 0           EV__Redis self = (EV__Redis)ac->data;
688 0           redisReply* reply = (redisReply*)reply_ptr;
689              
690 0 0         if (NULL == self || self->magic != EV_REDIS_MAGIC) return;
    0          
691 0 0         if (NULL == self->push_handler || NULL == reply) return;
    0          
692              
693 0           self->callback_depth++;
694              
695             {
696 0           dSP;
697              
698 0           ENTER;
699 0           SAVETMPS;
700              
701 0 0         PUSHMARK(SP);
702 0 0         XPUSHs(sv_2mortal(EV__redis_decode_reply(reply)));
703 0           PUTBACK;
704              
705 0           call_sv(self->push_handler, G_DISCARD | G_EVAL);
706 0 0         if (SvTRUE(ERRSV)) {
    0          
707 0 0         warn("EV::Redis: exception in push handler: %s", SvPV_nolen(ERRSV));
708             }
709              
710 0 0         FREETMPS;
711 0           LEAVE;
712             }
713              
714 0           self->callback_depth--;
715 0           check_destroyed(self);
716             }
717              
718 0           static void pre_connect_common(EV__Redis self, redisOptions* opts) {
719 0 0         if (NULL != self->connect_timeout) {
720 0           opts->connect_timeout = self->connect_timeout;
721             }
722 0 0         if (NULL != self->command_timeout) {
723 0           opts->command_timeout = self->command_timeout;
724             }
725 0 0         if (self->prefer_ipv4) {
726 0           opts->options |= REDIS_OPT_PREFER_IPV4;
727             }
728 0 0         else if (self->prefer_ipv6) {
729 0           opts->options |= REDIS_OPT_PREFER_IPV6;
730             }
731 0 0         if (self->cloexec) {
732 0           opts->options |= REDIS_OPT_SET_SOCK_CLOEXEC;
733             }
734 0 0         if (self->reuseaddr) {
735 0           opts->options |= REDIS_OPT_REUSEADDR;
736             }
737 0 0         if (NULL != self->source_addr && NULL == self->path) {
    0          
738 0           opts->endpoint.tcp.source_addr = self->source_addr;
739             }
740 0           }
741              
742             /* Set up a newly allocated redisAsyncContext: SSL, keepalive, libev, callbacks.
743             * On failure: frees ac, nulls self->ac, emits error with err_prefix. */
744 0           static int post_connect_setup(EV__Redis self, const char* err_prefix) {
745 0           self->ac_saved = NULL;
746 0           self->ac->data = (void*)self;
747              
748             #ifdef EV_REDIS_SSL
749 0 0         if (NULL != self->ssl_ctx) {
750 0 0         if (REDIS_OK != redisInitiateSSLWithContext(&self->ac->c, self->ssl_ctx)) {
751 0 0         SV* err = sv_2mortal(newSVpvf("%s: SSL initiation failed: %s",
752             err_prefix, self->ac->errstr[0] ? self->ac->errstr : "unknown error"));
753 0           redisAsyncFree(self->ac);
754 0           self->ac = NULL;
755 0           emit_error(self, err);
756 0           return REDIS_ERR;
757             }
758             }
759             #endif
760              
761 0 0         if (self->keepalive > 0) {
762 0           redisEnableKeepAliveWithInterval(&self->ac->c, self->keepalive);
763             }
764 0 0         if (self->tcp_user_timeout > 0) {
765 0           redisSetTcpUserTimeout(&self->ac->c, self->tcp_user_timeout);
766             }
767              
768 0 0         if (REDIS_OK != redisLibevAttach(self->loop, self->ac)) {
769 0           SV* err = sv_2mortal(newSVpvf("%s: cannot attach libev", err_prefix));
770 0           redisAsyncFree(self->ac);
771 0           self->ac = NULL;
772 0           emit_error(self, err);
773 0           return REDIS_ERR;
774             }
775              
776 0 0         if (self->priority != 0) {
777 0           redisLibevSetPriority(self->ac, self->priority);
778             }
779              
780 0           redisAsyncSetConnectCallbackNC(self->ac, EV__redis_connect_cb);
781 0           redisAsyncSetDisconnectCallback(self->ac, EV__redis_disconnect_cb);
782 0 0         if (NULL != self->push_handler) {
783 0           redisAsyncSetPushCallback(self->ac, EV__redis_push_cb);
784             }
785              
786 0 0         if (self->ac->err) {
787 0           SV* err = sv_2mortal(newSVpvf("%s: %s", err_prefix, self->ac->errstr));
788 0           redisAsyncFree(self->ac);
789 0           self->ac = NULL;
790 0           emit_error(self, err);
791 0           return REDIS_ERR;
792             }
793              
794 0           return REDIS_OK;
795             }
796              
797 0           static SV* decode_reply_depth(redisReply* reply, int depth) {
798             SV* res;
799              
800 0           switch (reply->type) {
801 0           case REDIS_REPLY_STRING:
802             case REDIS_REPLY_ERROR:
803             case REDIS_REPLY_STATUS:
804             case REDIS_REPLY_BIGNUM:
805             case REDIS_REPLY_VERB:
806 0           res = newSVpvn(reply->str, reply->len);
807 0           break;
808              
809 0           case REDIS_REPLY_INTEGER:
810 0           res = newSViv(reply->integer);
811 0           break;
812              
813 0           case REDIS_REPLY_DOUBLE:
814 0           res = newSVnv(reply->dval);
815 0           break;
816              
817 0           case REDIS_REPLY_BOOL:
818 0           res = newSViv(reply->integer ? 1 : 0);
819 0           break;
820              
821 0           case REDIS_REPLY_NIL:
822 0           res = newSV(0);
823 0           break;
824              
825 0           case REDIS_REPLY_ARRAY:
826             case REDIS_REPLY_MAP:
827             case REDIS_REPLY_SET:
828             case REDIS_REPLY_ATTR:
829             case REDIS_REPLY_PUSH: {
830 0           AV* av = newAV();
831             size_t i;
832 0 0         if (depth >= EV_REDIS_MAX_REPLY_DEPTH) {
833             /* Stop recursing: an empty array placeholder bounds C-stack
834             * usage against a hostile server replying with deep nesting. */
835 0           res = newRV_noinc((SV*)av);
836 0           break;
837             }
838 0 0         if (reply->elements > 0) {
839 0           av_extend(av, (SSize_t)(reply->elements - 1));
840 0 0         for (i = 0; i < reply->elements; i++) {
841 0 0         if (NULL != reply->element[i]) {
842 0           av_push(av, decode_reply_depth(reply->element[i], depth + 1));
843             }
844             else {
845 0           av_push(av, newSV(0));
846             }
847             }
848             }
849 0           res = newRV_noinc((SV*)av);
850 0           break;
851             }
852              
853 0           default:
854 0           res = newSV(0);
855 0           break;
856             }
857              
858 0           return res;
859             }
860              
861 0           static SV* EV__redis_decode_reply(redisReply* reply) {
862 0           return decode_reply_depth(reply, 0);
863             }
864              
865 0           static void EV__redis_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
866 0           EV__Redis self = (EV__Redis)c->data;
867             ev_redis_cb_t* cbt;
868             SV* sv_reply;
869             SV* sv_err;
870              
871 0           cbt = (ev_redis_cb_t*)privdata;
872              
873 0 0         if (cbt->skipped) {
874 0 0         if (!cbt->persist || NULL == reply) {
    0          
875             /* Multi-channel persistent: hiredis fires once per channel with
876             * same cbt. Decrement sub_count, free only on last call. */
877 0 0         if (cbt->persist && NULL == reply && cbt->sub_count > 1) {
    0          
    0          
878 0           cbt->sub_count--;
879 0           return;
880             }
881 0           Safefree(cbt);
882             }
883 0 0         else if (cbt->persist && reply != NULL && is_unsub_reply((redisReply*)reply)) {
    0          
    0          
884 0           cbt->sub_count--;
885 0 0         if (cbt->sub_count <= 0) {
886 0           Safefree(cbt);
887             }
888             }
889 0           return;
890             }
891              
892             /* self is NULL when DESTROY nulled ac->data (deferred free inside
893             * REDIS_IN_CALLBACK) or during PL_dirty. Still invoke the callback
894             * with a disconnect error so users can clean up resources. The hiredis
895             * context (c) is still alive here — safe to read c->errstr.
896             * cb may be NULL during PL_dirty where we pre-null it.
897             * For persistent commands (multi-channel subscribe), hiredis fires
898             * reply_cb once per channel with the same cbt. Invoke the callback
899             * only once (null cb after), use sub_count to track when to free. */
900 0 0         if (self == NULL) {
901 0 0         if (NULL != cbt->cb) {
902 0           invoke_callback_error(cbt->cb,
903 0 0         sv_2mortal(newSVpv(c->errstr[0] ? c->errstr : "disconnected", 0)));
904 0           SvREFCNT_dec(cbt->cb);
905 0           cbt->cb = NULL;
906             }
907 0 0         if (cbt->persist && reply == NULL && cbt->sub_count > 1) {
    0          
    0          
908 0           cbt->sub_count--;
909 0           return;
910             }
911 0           Safefree(cbt);
912 0           return;
913             }
914              
915             /* If self is marked as freed (during DESTROY), we still invoke the
916             * callback with an error, but skip any self->field access afterward.
917             * For persistent commands, don't free cbt here — leave it in the queue
918             * for remove_cb_queue_sv to clean up after redisAsyncFree returns. */
919 0 0         if (self->magic == EV_REDIS_FREED) {
920 0 0         if (NULL != cbt->cb) {
921 0           self->callback_depth++;
922 0 0         invoke_callback_error(cbt->cb, sv_2mortal(newSVpv(c->errstr[0] ? c->errstr : "disconnected", 0)));
923 0           self->callback_depth--;
924 0           SvREFCNT_dec(cbt->cb);
925 0           cbt->cb = NULL;
926             }
927 0 0         if (!cbt->persist) {
928 0           ngx_queue_remove(&cbt->queue);
929 0           Safefree(cbt);
930             }
931 0           check_destroyed(self);
932 0           return;
933             }
934              
935             /* Unknown magic - memory corruption, skip.
936             * Don't touch queue pointers (self's memory may be garbage).
937             * Always decrement refcount since callback will never be invoked again. */
938 0 0         if (self->magic != EV_REDIS_MAGIC) {
939 0 0         if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
940 0           Safefree(cbt);
941 0           return;
942             }
943              
944 0           self->current_cb = cbt;
945 0           self->callback_depth++;
946              
947 0 0         if (NULL != cbt->cb) {
948 0 0         if (NULL == reply) {
949 0 0         sv_err = sv_2mortal(newSVpv(
950             c->errstr[0] ? c->errstr : "disconnected", 0));
951 0           invoke_callback_error(cbt->cb, sv_err);
952             }
953             else {
954 0           dSP;
955              
956 0           ENTER;
957 0           SAVETMPS;
958              
959 0 0         PUSHMARK(SP);
960 0 0         EXTEND(SP, 2);
961 0           sv_reply = sv_2mortal(EV__redis_decode_reply((redisReply*)reply));
962 0 0         if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
963 0           PUSHs(&PL_sv_undef);
964 0           PUSHs(sv_reply);
965             }
966             else {
967 0           PUSHs(sv_reply);
968             }
969 0           PUTBACK;
970              
971 0           call_sv(cbt->cb, G_DISCARD | G_EVAL);
972 0 0         if (SvTRUE(ERRSV)) {
    0          
973 0 0         warn("EV::Redis: exception in command callback: %s", SvPV_nolen(ERRSV));
974             }
975              
976 0 0         FREETMPS;
977 0           LEAVE;
978             }
979             }
980              
981 0           self->callback_depth--;
982 0           self->current_cb = NULL;
983              
984             /* If DESTROY was called during our callback (e.g., user undef'd $redis),
985             * self->magic is EV_REDIS_FREED but self is still valid (DESTROY defers
986             * Safefree when callback_depth > 0). Complete cleanup here.
987             * For persistent commands (multi-channel subscribe), hiredis will fire
988             * reply_cb again for remaining channels via __redisAsyncFree. Null the
989             * callback to prevent double invocation, but leave cbt alive so those
990             * later calls see it and can track sub_count for proper cleanup. */
991 0 0         if (self->magic == EV_REDIS_FREED) {
992 0 0         if (NULL != cbt->cb) {
993 0           SvREFCNT_dec(cbt->cb);
994 0           cbt->cb = NULL;
995             }
996 0 0         if (!cbt->persist) {
997 0           Safefree(cbt);
998             }
999 0           check_destroyed(self);
1000 0           return;
1001             }
1002              
1003 0 0         if (cbt->skipped) {
1004             /* Defensive check: handles edge case where callback is marked skipped
1005             * during its own execution (e.g., via reentrant event loop where a
1006             * nested callback overwrites current_cb, allowing skip_pending to
1007             * process this callback). ngx_queue_remove is safe here due to
1008             * ngx_queue_init in skip_pending. Don't decrement pending_count since
1009             * skip_pending already did when it set skipped=1. */
1010 0           ngx_queue_remove(&cbt->queue);
1011             /* For persistent commands (e.g., SUBSCRIBE), hiredis fires reply_cb
1012             * once per subscribed channel during disconnect. Only free cbt on the
1013             * last channel to prevent use-after-free. */
1014 0 0         if (cbt->persist && cbt->sub_count > 1) {
    0          
1015 0           cbt->sub_count--;
1016 0           return;
1017             }
1018 0           Safefree(cbt);
1019 0           self->callback_depth++;
1020 0           send_next_waiting(self);
1021 0           self->callback_depth--;
1022 0           check_destroyed(self);
1023 0           return;
1024             }
1025              
1026             /* Detect end of persistent subscription: when all channels from a
1027             * SUBSCRIBE command have been unsubscribed, hiredis removes its internal
1028             * callback entry. Clean up our cbt to prevent orphaned queue entries. */
1029 0 0         if (cbt->persist && reply != NULL && is_unsub_reply((redisReply*)reply)) {
    0          
    0          
1030 0           cbt->sub_count--;
1031 0 0         if (cbt->sub_count <= 0) {
1032             /* All channels unsubscribed — persistent commands are not counted
1033             * in pending_count, so don't decrement it. */
1034 0           ngx_queue_remove(&cbt->queue);
1035 0           self->callback_depth++;
1036 0 0         if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
1037 0           Safefree(cbt);
1038 0           self->callback_depth--;
1039 0           check_destroyed(self);
1040 0           return;
1041             }
1042             }
1043              
1044             /* Connection teardown with active subscription: hiredis fires reply_cb
1045             * once per subscribed channel (from dict iteration in __redisAsyncFree).
1046             * Track sub_count and remove from queue on last channel to prevent
1047             * disconnect_cb's remove_cb_queue_sv from invoking the callback again. */
1048 0 0         if (cbt->persist && NULL == reply) {
    0          
1049 0 0         if (cbt->sub_count > 1) {
1050 0           cbt->sub_count--;
1051             } else {
1052 0           ngx_queue_remove(&cbt->queue);
1053 0           self->callback_depth++;
1054 0 0         if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
1055 0           Safefree(cbt);
1056 0           self->callback_depth--;
1057 0           check_destroyed(self);
1058             }
1059 0           return;
1060             }
1061              
1062 0 0         if (0 == cbt->persist) {
1063             /* Remove from queue BEFORE SvREFCNT_dec. The SvREFCNT_dec may free a
1064             * closure that holds the last reference to this object, triggering
1065             * DESTROY. If cbt is still in the queue, DESTROY's remove_cb_queue_sv
1066             * would double-free it. Wrapping in callback_depth defers DESTROY's
1067             * Safefree(self) so we can safely access self afterward. */
1068 0           ngx_queue_remove(&cbt->queue);
1069 0           self->pending_count--;
1070 0           self->callback_depth++;
1071 0 0         if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
1072 0           Safefree(cbt);
1073             /* Don't drain waiting queue when reply is NULL (connection dying) —
1074             * disconnect_cb will handle reconnect and wait queue preservation. */
1075 0 0         if (reply != NULL) {
1076 0           send_next_waiting(self);
1077             }
1078 0           self->callback_depth--;
1079 0           check_destroyed(self);
1080             }
1081             }
1082              
1083             /* Submit a cbt (already in cb_queue) to Redis. On failure, removes cbt from
1084             * queue, invokes error callback, and frees cbt. Returns REDIS_OK or REDIS_ERR. */
1085 0           static int submit_to_redis(EV__Redis self, ev_redis_cb_t* cbt,
1086             int argc, const char** argv, const size_t* argvlen)
1087             {
1088 0           redisCallbackFn* fn = EV__redis_reply_cb;
1089 0           void* privdata = (void*)cbt;
1090 0           const char* cmd = argv[0];
1091              
1092             /* Hiredis does not store callbacks for unsubscribe commands — replies are
1093             * routed through the original subscribe callback. Pass NULL so hiredis
1094             * doesn't hold a dangling reference; we clean up our cbt below. */
1095 0 0         if (cbt->persist && is_unsubscribe_command(cmd)) {
    0          
1096 0           fn = NULL;
1097 0           privdata = NULL;
1098             }
1099              
1100 0           int r = redisAsyncCommandArgv(
1101             self->ac, fn, privdata,
1102             argc, argv, argvlen
1103             );
1104              
1105 0 0         if (REDIS_OK != r) {
1106 0           ngx_queue_remove(&cbt->queue);
1107 0 0         if (!cbt->persist) self->pending_count--;
1108              
1109 0 0         if (NULL != cbt->cb) {
1110 0 0         invoke_callback_error(cbt->cb, sv_2mortal(newSVpv(
    0          
1111             (self->ac && self->ac->errstr[0]) ? self->ac->errstr : "command failed", 0)));
1112 0           SvREFCNT_dec(cbt->cb);
1113             }
1114 0           Safefree(cbt);
1115 0 0         } else if (fn == NULL) {
1116             /* Successfully sent an unsubscribe command. Since hiredis won't
1117             * call us back, we must clean up our tracking 'cbt' now. */
1118 0           ngx_queue_remove(&cbt->queue);
1119 0 0         if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
1120 0           Safefree(cbt);
1121             }
1122              
1123 0           return r;
1124             }
1125              
1126             /* Send waiting commands to Redis. Uses iterative loop instead of recursion
1127             * to avoid stack overflow when many commands fail consecutively. */
1128 0           static void send_next_waiting(EV__Redis self) {
1129             ngx_queue_t* q;
1130             ev_redis_wait_t* wt;
1131             ev_redis_cb_t* cbt;
1132              
1133             while (1) {
1134             /* Check preconditions each iteration - they may change after callbacks */
1135 0 0         if (NULL == self->ac || self->intentional_disconnect) return;
    0          
1136 0 0         if (ngx_queue_empty(&self->wait_queue)) return;
1137 0 0         if (self->max_pending > 0 && self->pending_count >= self->max_pending) return;
    0          
1138              
1139 0           q = ngx_queue_head(&self->wait_queue);
1140 0           wt = ngx_queue_data(q, ev_redis_wait_t, queue);
1141 0           ngx_queue_remove(q);
1142 0           self->waiting_count--;
1143              
1144 0           Newx(cbt, 1, ev_redis_cb_t);
1145 0           cbt->cb = wt->cb;
1146 0           wt->cb = NULL;
1147 0           cbt->skipped = 0;
1148 0           cbt->persist = wt->persist;
1149 0 0         cbt->sub_count = wt->persist ? wt->argc - 1 : 0;
1150 0           ngx_queue_init(&cbt->queue);
1151 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
1152 0 0         if (!cbt->persist) self->pending_count++;
1153              
1154             /* Ignore submit_to_redis return: on failure it invokes cbt's error
1155             * callback and frees cbt, so the loop continues to try the next entry. */
1156 0           (void)submit_to_redis(self, cbt, wt->argc,
1157 0           (const char**)wt->argv, wt->argvlen);
1158 0           free_wait_entry(wt);
1159             }
1160             }
1161              
1162             MODULE = EV::Redis PACKAGE = EV::Redis
1163              
BOOT:
{
    I_EV_API("EV::Redis");

    /* Error strings shared by every object; created once at load time and
     * marked read-only. */
    err_skipped         = newSVpvs_share("skipped");
    err_waiting_timeout = newSVpvs_share("waiting timeout");
    err_disconnected    = newSVpvs_share("disconnected");

    SvREADONLY_on(err_skipped);
    SvREADONLY_on(err_waiting_timeout);
    SvREADONLY_on(err_disconnected);
#ifdef EV_REDIS_SSL
    /* One-time OpenSSL initialisation for hiredis SSL support. */
    redisInitOpenSSL();
#endif
}
1181              
EV::Redis
_new(char* class, EV::Loop loop);
CODE:
{
    /* Allocate a zero-filled object, mark it alive, start with empty
     * callback and wait queues, bind it to the given loop and enable
     * close-on-exec by default. The class argument is not used. */
    ev_redis_t* obj;

    PERL_UNUSED_VAR(class);
    Newxz(obj, 1, ev_redis_t);
    obj->magic = EV_REDIS_MAGIC;
    obj->loop = loop;
    obj->cloexec = 1;
    ngx_queue_init(&obj->cb_queue);
    ngx_queue_init(&obj->wait_queue);
    RETVAL = obj;
}
OUTPUT:
    RETVAL
1196              
void
DESTROY(EV::Redis self);
CODE:
{
    /* Tear down the object: stop timers, release the hiredis context,
     * drop the Perl handlers, fail queued commands, and free self unless a
     * callback is still running (then check_destroyed() frees it later). */
    redisAsyncContext* doomed_ac;
    int hiredis_owns_cbts = 0;

    /* Anything other than the live magic means this object was already
     * destroyed (EV_REDIS_FREED) or its memory is not a valid object;
     * in both cases there is nothing safe to clean up. */
    if (self->magic != EV_REDIS_MAGIC) {
        return;
    }

    /* Flip the magic first so a re-entrant DESTROY returns above. */
    self->magic = EV_REDIS_FREED;

    /* Timer callbacks carry self as their data pointer, so both timers are
     * stopped before anything can free self. This happens ahead of the
     * PL_dirty branch; the stop helpers tolerate a NULL loop. */
    stop_reconnect_timer(self);
    stop_waiting_timer(self);

    /* Global destruction: the loop and other Perl objects may already be
     * gone, so no Perl-level handler is invoked. redisAsyncFree must still
     * run so that the libev adapter's watchers are stopped and the context
     * is released; otherwise watchers with dangling data pointers remain
     * registered in the loop. */
    if (PL_dirty) {
        if (NULL != self->ac) {
            /* Forget the callbacks of queued commands first so that no
             * SvREFCNT_dec is attempted on SVs that may already be freed. */
            ngx_queue_t* node = ngx_queue_head(&self->cb_queue);
            while (node != ngx_queue_sentinel(&self->cb_queue)) {
                ev_redis_cb_t* entry = ngx_queue_data(node, ev_redis_cb_t, queue);
                entry->cb = NULL;
                node = ngx_queue_next(node);
            }
            self->ac->data = NULL; /* callbacks must not see self */
            redisAsyncFree(self->ac);
            self->ac = NULL;
        }
        if (NULL != self->ac_saved) {
            self->ac_saved->data = NULL;
            self->ac_saved = NULL;
        }
        free_c_fields(self);
        /* Release the C memory of waiting commands without touching their
         * Perl callbacks. */
        while (!ngx_queue_empty(&self->wait_queue)) {
            ngx_queue_t* first = ngx_queue_head(&self->wait_queue);
            ev_redis_wait_t* pending = ngx_queue_data(first, ev_redis_wait_t, queue);
            ngx_queue_remove(first);
            pending->cb = NULL; /* no SvREFCNT_dec during PL_dirty */
            free_wait_entry(pending);
        }
        Safefree(self);
        return;
    }

    self->reconnect = 0;

    /* self->ac must be NULL BEFORE redisAsyncFree is called: freeing the
     * context fires reply callbacks, which reach send_next_waiting, and that
     * function only refrains from issuing new commands when self->ac is
     * NULL. */
    self->loop = NULL;
    doomed_ac = self->ac;
    self->ac = NULL;
    if (NULL != doomed_ac) {
        /* Inside a hiredis callback the free is deferred and the pending
         * reply callbacks run later. Clearing ac->data makes them take the
         * "no owning object" path instead of touching freed memory. */
        if (doomed_ac->c.flags & REDIS_IN_CALLBACK) {
            doomed_ac->data = NULL;
            hiredis_owns_cbts = 1;
        }

        /* Raised so that a disconnect callback running synchronously inside
         * redisAsyncFree cannot free self underneath us. */
        self->callback_depth++;
        redisAsyncFree(doomed_ac);
        self->callback_depth--;
    }
    /* A disconnect() issued from inside a callback left its context in
     * ac_saved. Detach it so the deferred disconnect callback does not
     * reach this object. */
    if (NULL != self->ac_saved) {
        self->ac_saved->data = NULL;
        self->ac_saved = NULL;
        hiredis_owns_cbts = 1;
    }
    CLEAR_HANDLER(self->error_handler);
    CLEAR_HANDLER(self->connect_handler);
    CLEAR_HANDLER(self->disconnect_handler);
    CLEAR_HANDLER(self->push_handler);
    free_c_fields(self);

    if (!self->in_wait_cleanup) {
        clear_wait_queue_sv(self, err_disconnected);
    }
    if (!hiredis_owns_cbts && !self->in_cb_cleanup) {
        /* hiredis holds no deferred references, so the tracking entries
         * can be released here. */
        remove_cb_queue_sv(self, NULL);
    }
    /* Otherwise hiredis still references the entries; the reply callback
     * releases them when it is called without an owning object. */

    /* Free now only when no callback is in progress; otherwise the release
     * is left to check_destroyed(). */
    if (NULL == self->current_cb && 0 == self->callback_depth) {
        Safefree(self);
    }
}
1321              
void
connect(EV::Redis self, char* hostname, int port = 6379);
CODE:
{
    /* Start an asynchronous TCP connection to hostname:port.
     * Croaks when a connection already exists or the context cannot be
     * allocated. Later setup failures are reported by post_connect_setup()
     * through the error handler, so its return value is not needed here. */
    redisOptions opts;

    if (self->ac != NULL) {
        croak("already connected");
    }

    /* Fresh connection attempt: reset reconnect bookkeeping and remember
     * the new target in place of any previous one. */
    self->reconnect_attempts = 0;
    self->intentional_disconnect = 0;
    clear_connection_params(self);
    self->host = savepv(hostname);
    self->port = port;

    memset(&opts, 0, sizeof(opts));
    pre_connect_common(self, &opts);
    REDIS_OPTIONS_SET_TCP(&opts, hostname, port);

    self->ac = redisAsyncConnectWithOptions(&opts);
    if (self->ac == NULL) {
        croak("connect error: cannot allocate memory");
    }

    (void)post_connect_setup(self, "connect error");
}
1348              
void
connect_unix(EV::Redis self, const char* path);
CODE:
{
    /* Start an asynchronous connection over the UNIX socket at path.
     * Croaks when a connection already exists or the context cannot be
     * allocated. Later setup failures are reported by post_connect_setup()
     * through the error handler, so its return value is not needed here. */
    redisOptions opts;

    if (self->ac != NULL) {
        croak("already connected");
    }

    /* Fresh connection attempt: reset reconnect bookkeeping and remember
     * the new target in place of any previous one. */
    self->reconnect_attempts = 0;
    self->intentional_disconnect = 0;
    clear_connection_params(self);
    self->path = savepv(path);

    memset(&opts, 0, sizeof(opts));
    pre_connect_common(self, &opts);
    REDIS_OPTIONS_SET_UNIX(&opts, path);

    self->ac = redisAsyncConnectWithOptions(&opts);
    if (self->ac == NULL) {
        croak("connect error: cannot allocate memory");
    }

    (void)post_connect_setup(self, "connect error");
}
1374              
void
disconnect(EV::Redis self);
CODE:
{
    /* Explicitly close the connection. Marks the disconnect as intentional,
     * cancels any scheduled reconnect and resets the attempt counter.
     * Always returns an empty list. */
    self->intentional_disconnect = 1;
    stop_reconnect_timer(self);
    self->reconnect_attempts = 0;

    if (NULL == self->ac) {
        /* Already disconnected — still stop the waiting timer and fail any
         * commands left in the wait queue (e.g. kept alive by
         * resume_waiting_on_reconnect after a connection drop). */
        stop_waiting_timer(self);
        if (!ngx_queue_empty(&self->wait_queue)) {
            self->callback_depth++;
            clear_wait_queue_sv(self, err_disconnected);
            self->callback_depth--;
            check_destroyed(self);
        }
        /* XSRETURN_EMPTY rather than a bare return: a bare return leaves
         * the argument stack untouched, so the invocant would come back
         * to the caller as a return value on this path only. */
        XSRETURN_EMPTY;
    }

    /* Remember the context for a deferred disconnect: inside a hiredis
     * callback redisAsyncDisconnect only flags the context and returns, and
     * DESTROY needs ac_saved to clear ac->data if the Perl object goes away
     * before the disconnect completes. Outside a callback the disconnect
     * callback runs synchronously, so ac_saved is intentionally left unset
     * there to avoid a dangling pointer once the context has been freed. */
    if (self->ac->c.flags & REDIS_IN_CALLBACK) {
        self->ac_saved = self->ac;
    }

    /* Raised so self is not freed if the disconnect callback fires
     * synchronously and the on_disconnect handler drops the last Perl
     * reference. */
    self->callback_depth++;
    redisAsyncDisconnect(self->ac);
    self->ac = NULL;
    self->callback_depth--;
    /* Nothing follows, so the result of check_destroyed() is not needed. */
    (void)check_destroyed(self);
}
1417              
int
is_connected(EV::Redis self);
CODE:
{
    /* Reports 1 while an async context is attached to this object,
     * 0 otherwise. */
    RETVAL = (self->ac != NULL);
}
OUTPUT:
    RETVAL
1426              
SV*
connect_timeout(EV::Redis self, SV* timeout_ms = NULL);
CODE:
{
    /* Combined getter/setter for the connect timeout; all of the work is
     * delegated to timeout_accessor(), which receives the storage slot,
     * the optional new value and the option name. */
    struct timeval** slot = &self->connect_timeout;

    RETVAL = timeout_accessor(slot, timeout_ms, "connect_timeout");
}
OUTPUT:
    RETVAL
1435              
SV*
command_timeout(EV::Redis self, SV* timeout_ms = NULL);
CODE:
{
    /* Combined getter/setter for the command timeout. The stored value is
     * handled by timeout_accessor(); when a defined value was supplied and
     * a connection exists, the new timeout is also pushed to hiredis. */
    RETVAL = timeout_accessor(&self->command_timeout, timeout_ms, "command_timeout");

    if (NULL != timeout_ms && SvOK(timeout_ms)) {
        /* Apply to the active connection right away */
        if (NULL != self->ac && NULL != self->command_timeout) {
            redisAsyncSetTimeout(self->ac, *self->command_timeout);
        }
    }
}
OUTPUT:
    RETVAL
1448              
SV*
on_error(EV::Redis self, SV* handler = NULL);
CODE:
{
    /* Getter/setter for the error handler. Setter mode is selected by the
     * presence of a second argument, not by its value. */
    const int has_arg = (items > 1);

    RETVAL = handler_accessor(&self->error_handler, handler, has_arg);
}
OUTPUT:
    RETVAL
1457              
SV*
on_connect(EV::Redis self, SV* handler = NULL);
CODE:
{
    /* Getter/setter for the connect handler. Setter mode is selected by
     * the presence of a second argument, not by its value. */
    const int has_arg = (items > 1);

    RETVAL = handler_accessor(&self->connect_handler, handler, has_arg);
}
OUTPUT:
    RETVAL
1466              
SV*
on_disconnect(EV::Redis self, SV* handler = NULL);
CODE:
{
    /* Getter/setter for the disconnect handler. Setter mode is selected by
     * the presence of a second argument, not by its value. */
    const int has_arg = (items > 1);

    RETVAL = handler_accessor(&self->disconnect_handler, handler, has_arg);
}
OUTPUT:
    RETVAL
1475              
SV*
on_push(EV::Redis self, SV* handler = NULL);
CODE:
{
    /* Getter/setter for the push handler. */
    RETVAL = handler_accessor(&self->push_handler, handler, items > 1);

    /* Keep hiredis in sync while connected: install our push trampoline
     * when a handler is stored, remove it when none is. */
    if (NULL != self->ac) {
        redisAsyncSetPushCallback(
            self->ac,
            (NULL != self->push_handler) ? EV__redis_push_cb : NULL);
    }
}
OUTPUT:
    RETVAL
1492              
int
command(EV::Redis self, ...);
PREINIT:
    SV* callback;
    char** args;
    size_t* arg_lens;
    STRLEN cur_len;
    int nargs, idx, is_persistent;
    ev_redis_cb_t* entry;
    ev_redis_wait_t* waiter;
    char* dup;
CODE:
{
    /* Sends (or queues) one Redis command.
     *
     * Arguments after self are the command words; a trailing CODE
     * reference, if present, is the reply callback. Without one the
     * command is fire-and-forget. The command goes to the wait queue when
     * there is no connection but a reconnect is pending, or when the
     * max_pending limit is reached; otherwise it is submitted right away.
     * Returns REDIS_OK when queued, else whatever submit_to_redis() returns. */
    if (items < 2) {
        croak("Usage: command(\"command\", ..., [$callback])");
    }

    callback = ST(items - 1);
    if (!SvROK(callback) || SvTYPE(SvRV(callback)) != SVt_PVCV) {
        /* Last argument is data, not a callback */
        callback = NULL;
        nargs = items - 1;
    }
    else {
        nargs = items - 2;
    }

    if (nargs < 1) {
        croak("Usage: command(\"command\", ..., [$callback])");
    }

    /* No connection is only acceptable while a reconnect is scheduled;
     * in that case the command ends up in the wait queue below. */
    if (NULL == self->ac && !self->reconnect_timer_active) {
        croak("connection required before calling command");
    }

    /* Scratch arrays are released by the save stack, including on croak */
    Newx(args, nargs, char*);
    SAVEFREEPV(args);
    Newx(arg_lens, nargs, size_t);
    SAVEFREEPV(arg_lens);

    for (idx = 0; idx < nargs; idx++) {
        args[idx] = SvPV(ST(idx + 1), cur_len);
        arg_lens[idx] = cur_len;
    }

    is_persistent = is_persistent_command(args[0]);

    if (NULL != self->ac &&
        !(self->max_pending > 0 && self->pending_count >= self->max_pending)) {
        /* Connected and under the limit: register the callback entry and
         * hand the command to hiredis. */
        Newx(entry, 1, ev_redis_cb_t);
        entry->cb = SvREFCNT_inc(callback);
        entry->skipped = 0;
        entry->persist = is_persistent;
        if (is_persistent) {
            entry->sub_count = nargs - 1;
        }
        else {
            entry->sub_count = 0;
        }
        ngx_queue_init(&entry->queue);
        ngx_queue_insert_tail(&self->cb_queue, &entry->queue);
        if (!is_persistent) {
            self->pending_count++;
        }

        RETVAL = submit_to_redis(self, entry,
            nargs, (const char**)args, arg_lens);
    }
    else {
        /* Park the command: the argument bytes are borrowed from the
         * caller's SVs, so each one is copied into its own buffer. */
        Newx(waiter, 1, ev_redis_wait_t);
        Newx(waiter->argv, nargs, char*);
        Newx(waiter->argvlen, nargs, size_t);
        for (idx = 0; idx < nargs; idx++) {
            Newx(dup, arg_lens[idx] + 1, char);
            Copy(args[idx], dup, arg_lens[idx], char);
            dup[arg_lens[idx]] = '\0';
            waiter->argv[idx] = dup;
            waiter->argvlen[idx] = arg_lens[idx];
        }
        waiter->argc = nargs;
        waiter->cb = SvREFCNT_inc(callback);
        waiter->persist = is_persistent;

        /* command() may run outside an ev_run iteration, where the cached
         * loop time is stale; refresh it so queued_at shares a time base
         * with the later expiry check. */
        ev_now_update(self->loop);
        waiter->queued_at = ev_now(self->loop);

        ngx_queue_init(&waiter->queue);
        ngx_queue_insert_tail(&self->wait_queue, &waiter->queue);
        self->waiting_count++;
        schedule_waiting_timer(self);
        RETVAL = REDIS_OK;
    }
}
OUTPUT:
    RETVAL
1584              
void
reconnect(EV::Redis self, int enable, int delay_ms = 1000, int max_attempts = 0);
CODE:
{
    /* Configures automatic reconnection.
     *   enable       - any non-zero value turns reconnection on
     *   delay_ms     - checked by validate_timeout_ms() before anything is
     *                  stored
     *   max_attempts - negative values are stored as 0
     * The attempt counter is always reset; disabling also stops a running
     * reconnect timer. */
    validate_timeout_ms(delay_ms, "reconnect_delay");

    self->reconnect = (enable != 0);
    self->reconnect_delay_ms = delay_ms;
    self->max_reconnect_attempts = (max_attempts < 0) ? 0 : max_attempts;
    self->reconnect_attempts = 0;

    if (0 == enable) {
        stop_reconnect_timer(self);
    }
}
1599              
int
reconnect_enabled(EV::Redis self);
CODE:
{
    /* Read-only view of the reconnect flag set by reconnect(). */
    const int enabled = self->reconnect;

    RETVAL = enabled;
}
OUTPUT:
    RETVAL
1608              
int
pending_count(EV::Redis self);
CODE:
{
    /* Read-only view of the pending_count counter. */
    const int count = self->pending_count;

    RETVAL = count;
}
OUTPUT:
    RETVAL
1617              
int
waiting_count(EV::Redis self);
CODE:
{
    /* Read-only view of the waiting_count counter. */
    const int count = self->waiting_count;

    RETVAL = count;
}
OUTPUT:
    RETVAL
1626              
int
max_pending(EV::Redis self, SV* limit = NULL);
CODE:
{
    /* Getter/setter for the cap on in-flight commands (0 = unlimited).
     *
     * With a defined limit: validates it, stores it, then tries to flush
     * the wait queue since the new cap may allow more commands through.
     * Croaks on a negative value or one that does not fit in an int.
     * Returns the current limit, or 0 if the object was destroyed while
     * waiting commands were being flushed. */
    if (NULL != limit && SvOK(limit)) {
        /* Validate in IV space: narrowing to int first would let values
         * such as 2**32 wrap to 0 and silently mean "unlimited". */
        IV val = SvIV(limit);
        if (val < 0) {
            croak("max_pending must be non-negative");
        }
        if (val > (IV)PERL_INT_MAX) {
            croak("max_pending too large");
        }
        self->max_pending = (int)val;

        /* When limit is increased or removed, send waiting commands.
         * callback_depth protects against DESTROY if a failed command's
         * error callback drops the last Perl reference to self. */
        self->callback_depth++;
        send_next_waiting(self);
        self->callback_depth--;
        if (check_destroyed(self)) XSRETURN_IV(0);
    }
    RETVAL = self->max_pending;
}
OUTPUT:
    RETVAL
1650              
SV*
waiting_timeout(EV::Redis self, SV* timeout_ms = NULL);
CODE:
{
    /* Getter/setter for the wait-queue timeout in milliseconds. A defined
     * argument is checked by validate_timeout_ms(), stored, and the
     * waiting timer is rescheduled. Always returns the stored value as a
     * new integer SV. */
    const int is_setter = (NULL != timeout_ms && SvOK(timeout_ms));

    if (is_setter) {
        IV requested = SvIV(timeout_ms);

        validate_timeout_ms(requested, "waiting_timeout");
        self->waiting_timeout_ms = (int)requested;
        schedule_waiting_timer(self);
    }

    RETVAL = newSViv((IV)self->waiting_timeout_ms);
}
OUTPUT:
    RETVAL
1666              
int
resume_waiting_on_reconnect(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Boolean getter/setter. Only a defined argument changes the flag, so
     * passing undef acts as a plain read. */
    if (NULL != value && SvOK(value)) {
        if (SvTRUE(value)) {
            self->resume_waiting_on_reconnect = 1;
        }
        else {
            self->resume_waiting_on_reconnect = 0;
        }
    }
    RETVAL = self->resume_waiting_on_reconnect;
}
OUTPUT:
    RETVAL
1678              
int
priority(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Getter/setter for the libev watcher priority.
     *
     * A defined value is clamped to [EV_MINPRI, EV_MAXPRI], stored, and
     * applied to the live connection if one exists. Returns the stored
     * priority. */
    if (NULL != value && SvOK(value)) {
        /* Clamp in IV space: narrowing to int before clamping would let
         * very large magnitudes wrap to an unrelated in-range value. */
        IV prio = SvIV(value);
        if (prio < EV_MINPRI) prio = EV_MINPRI;
        if (prio > EV_MAXPRI) prio = EV_MAXPRI;
        self->priority = (int)prio;
        if (NULL != self->ac) {
            redisLibevSetPriority(self->ac, (int)prio);
        }
    }
    RETVAL = self->priority;
}
OUTPUT:
    RETVAL
1696              
int
keepalive(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Getter/setter for the TCP keepalive interval.
     *
     * A defined value must lie in [0, MAX_TIMEOUT_MS / 1000], otherwise
     * the call croaks. It is stored and, when positive and a connection
     * exists, enabled on the socket right away. Storing 0 does not touch
     * an already-connected socket. Returns the stored interval. */
    if (NULL != value && SvOK(value)) {
        /* Range-check in IV space: narrowing to int first would let
         * out-of-range values wrap into the accepted range. */
        IV interval = SvIV(value);
        if (interval < 0) croak("keepalive interval must be non-negative");
        if (interval > MAX_TIMEOUT_MS / 1000) croak("keepalive interval too large");
        self->keepalive = (int)interval;
        if (NULL != self->ac && interval > 0) {
            redisEnableKeepAliveWithInterval(&self->ac->c, (int)interval);
        }
    }
    RETVAL = self->keepalive;
}
OUTPUT:
    RETVAL
1714              
int
prefer_ipv4(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Boolean getter/setter. Turning the IPv4 preference on clears the
     * IPv6 preference; turning it off leaves prefer_ipv6 untouched. */
    if (NULL != value && SvOK(value)) {
        if (SvTRUE(value)) {
            self->prefer_ipv4 = 1;
            self->prefer_ipv6 = 0;
        }
        else {
            self->prefer_ipv4 = 0;
        }
    }
    RETVAL = self->prefer_ipv4;
}
OUTPUT:
    RETVAL
1727              
int
prefer_ipv6(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Boolean getter/setter. Turning the IPv6 preference on clears the
     * IPv4 preference; turning it off leaves prefer_ipv4 untouched. */
    if (NULL != value && SvOK(value)) {
        if (SvTRUE(value)) {
            self->prefer_ipv6 = 1;
            self->prefer_ipv4 = 0;
        }
        else {
            self->prefer_ipv6 = 0;
        }
    }
    RETVAL = self->prefer_ipv6;
}
OUTPUT:
    RETVAL
1740              
SV*
source_addr(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Getter/setter for the source address string. Any second argument
     * drops the stored copy first; a defined one then replaces it, so
     * passing undef clears the setting. Returns a new SV holding the
     * address, or undef when none is stored. */
    const int is_setter = (items > 1);

    if (is_setter && NULL != self->source_addr) {
        Safefree(self->source_addr);
        self->source_addr = NULL;
    }
    if (is_setter && NULL != value && SvOK(value)) {
        self->source_addr = savepv(SvPV_nolen(value));
    }

    RETVAL = (NULL != self->source_addr)
        ? newSVpv(self->source_addr, 0)
        : &PL_sv_undef;
}
OUTPUT:
    RETVAL
1762              
unsigned int
tcp_user_timeout(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Getter/setter for the tcp_user_timeout option. A defined argument
     * is checked by validate_timeout_ms() before being stored. */
    const int is_setter = (NULL != value && SvOK(value));

    if (is_setter) {
        IV requested = SvIV(value);

        validate_timeout_ms(requested, "tcp_user_timeout");
        self->tcp_user_timeout = (unsigned int)requested;
    }
    RETVAL = self->tcp_user_timeout;
}
OUTPUT:
    RETVAL
1776              
int
cloexec(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Boolean getter/setter for the cloexec option. Only a defined
     * argument changes the flag. */
    if (NULL != value && SvOK(value)) {
        if (SvTRUE(value)) {
            self->cloexec = 1;
        }
        else {
            self->cloexec = 0;
        }
    }
    RETVAL = self->cloexec;
}
OUTPUT:
    RETVAL
1788              
int
reuseaddr(EV::Redis self, SV* value = NULL);
CODE:
{
    /* Boolean getter/setter for the reuseaddr option. Only a defined
     * argument changes the flag. */
    if (NULL != value && SvOK(value)) {
        if (SvTRUE(value)) {
            self->reuseaddr = 1;
        }
        else {
            self->reuseaddr = 0;
        }
    }
    RETVAL = self->reuseaddr;
}
OUTPUT:
    RETVAL
1800              
void
skip_waiting(EV::Redis self);
CODE:
{
    /* Cancels every command in the wait queue with err_skipped and stops
     * the waiting timer. Does nothing when a wait-queue cleanup is
     * already running. Returns an empty list on every path. */

    /* Protect self from destruction during queue iteration */
    self->callback_depth++;

    /* If cleanup is already in progress (e.g., during expire_waiting_commands
     * or disconnect callback), don't modify the wait_queue. */
    if (self->in_wait_cleanup) {
        self->callback_depth--;
        check_destroyed(self);
        /* XSRETURN_EMPTY rather than a bare return: a bare return leaves
         * the incoming arguments on the Perl stack, so the caller would
         * receive self as the return value. */
        XSRETURN_EMPTY;
    }

    clear_wait_queue_sv(self, err_skipped);
    stop_waiting_timer(self);

    self->callback_depth--;
    check_destroyed(self);
}
1822              
void
skip_pending(EV::Redis self);
CODE:
{
    /* Cancels all queued work: the wait queue is cleared with err_skipped,
     * then every cb_queue entry except the one currently executing is
     * marked skipped and its callback invoked with err_skipped.
     * Returns an empty list on every path. */
    ngx_queue_t local_queue;
    ngx_queue_t* q;
    ev_redis_cb_t* cbt;

    /* Protect self from destruction during queue iteration */
    self->callback_depth++;

    /* Always attempt to clear waiting queue (handles its own re-entrancy) */
    clear_wait_queue_sv(self, err_skipped);
    stop_waiting_timer(self);

    /* If cb_queue cleanup is already in progress, stop here. */
    if (self->in_cb_cleanup) {
        self->callback_depth--;
        check_destroyed(self);
        /* XSRETURN_EMPTY rather than a bare return: a bare return leaves
         * the incoming arguments on the Perl stack, so the caller would
         * receive self as the return value. */
        XSRETURN_EMPTY;
    }

    /* Protect cb_queue iteration from re-entrancy. If a user callback
     * calls skip_pending() again, the in_cb_cleanup check above will return. */
    self->in_cb_cleanup = 1;

    /* Phase 1: move everything except current_cb onto a private list so
     * user callbacks cannot disturb the iteration. */
    ngx_queue_init(&local_queue);
    while (!ngx_queue_empty(&self->cb_queue)) {
        q = ngx_queue_head(&self->cb_queue);
        cbt = ngx_queue_data(q, ev_redis_cb_t, queue);

        if (cbt == self->current_cb) {
            /* current_cb is at head; if it's the only item, we're done */
            if (ngx_queue_next(q) == ngx_queue_sentinel(&self->cb_queue)) {
                break;
            }
            q = ngx_queue_next(q);
            cbt = ngx_queue_data(q, ev_redis_cb_t, queue);
        }

        ngx_queue_remove(q);
        ngx_queue_insert_tail(&local_queue, q);
    }

    /* Phase 2: notify each detached entry. */
    while (!ngx_queue_empty(&local_queue)) {
        /* A callback destroyed the object: stop invoking callbacks.
         * NOTE(review): entries still on local_queue at this point stay
         * linked to this stack-allocated list head; confirm the
         * reply/free path copes with that. */
        if (self->magic == EV_REDIS_FREED) {
            break;
        }

        q = ngx_queue_head(&local_queue);
        cbt = ngx_queue_data(q, ev_redis_cb_t, queue);
        ngx_queue_remove(q);

        /* Mark as skipped FIRST to prevent double callback invocation if
         * invoke_callback_error re-enters the event loop. */
        cbt->skipped = 1;

        /* Re-initialize queue node so any subsequent remove (from reply_cb's
         * skipped path on re-entry) is safe. */
        ngx_queue_init(q);
        if (!cbt->persist) self->pending_count--;

        /* Save and clear callback BEFORE invoking: if the user callback
         * re-enters and a Redis reply arrives, reply_cb sees skipped=1
         * and frees cbt. Clearing cb first avoids use-after-free. */
        if (NULL != cbt->cb) {
            SV* cb_to_invoke = cbt->cb;
            cbt->cb = NULL;

            invoke_callback_error(cb_to_invoke, err_skipped);
            SvREFCNT_dec(cb_to_invoke);
        }
    }

    self->in_cb_cleanup = 0;

    self->callback_depth--;
    check_destroyed(self);
}
1902              
int
has_ssl(char* class);
CODE:
{
    /* Class method: 1 when the module was compiled with EV_REDIS_SSL
     * defined, 0 otherwise. The invocant is ignored. */
    PERL_UNUSED_VAR(class);
    RETVAL = 0;
#ifdef EV_REDIS_SSL
    RETVAL = 1;
#endif
}
OUTPUT:
    RETVAL
1916              
1917             #ifdef EV_REDIS_SSL
1918              
void
_setup_ssl_context(EV::Redis self, SV* cacert, SV* capath, SV* cert, SV* key, SV* server_name, int verify = 1);
CODE:
{
    /* Builds a hiredis SSL context from the given options and stores it in
     * self->ssl_ctx, replacing any previous one. An undef SV leaves the
     * matching option unset. Croaks if hiredis cannot create the context;
     * the previous context has already been released by then. */
    redisSSLContextError err_code = REDIS_SSL_CTX_NONE;
    redisSSLOptions opts;

    /* Start from an all-zero option set, then fill in only what is defined */
    memset(&opts, 0, sizeof(opts));
    if (SvOK(cacert)) {
        opts.cacert_filename = SvPV_nolen(cacert);
    }
    if (SvOK(capath)) {
        opts.capath = SvPV_nolen(capath);
    }
    if (SvOK(cert)) {
        opts.cert_filename = SvPV_nolen(cert);
    }
    if (SvOK(key)) {
        opts.private_key_filename = SvPV_nolen(key);
    }
    if (SvOK(server_name)) {
        opts.server_name = SvPV_nolen(server_name);
    }

    if (verify) {
        opts.verify_mode = REDIS_SSL_VERIFY_PEER;
    }
    else {
        opts.verify_mode = REDIS_SSL_VERIFY_NONE;
    }

    /* Release the old context before building its replacement */
    if (NULL != self->ssl_ctx) {
        redisFreeSSLContext(self->ssl_ctx);
        self->ssl_ctx = NULL;
    }

    self->ssl_ctx = redisCreateSSLContextWithOptions(&opts, &err_code);
    if (NULL == self->ssl_ctx) {
        croak("SSL context creation failed: %s", redisSSLContextGetError(err_code));
    }
}
1945              
1946             #endif