File Coverage

Redis.xs
Criterion Covered Total %
statement 91 881 10.3
branch 54 724 7.4
condition n/a
subroutine n/a
pod n/a
total 145 1605 9.0


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include "ppport.h"
5              
6             #include "EVAPI.h"
7              
8             #include "hiredis.h"
9             #include "async.h"
10             #include "libev_adapter.h"
11             #include "ngx-queue.h"
12              
13             #ifdef EV_REDIS_SSL
14             #include "hiredis_ssl.h"
15             #endif
16              
17             typedef struct ev_redis_s ev_redis_t;
18             typedef struct ev_redis_cb_s ev_redis_cb_t;
19             typedef struct ev_redis_wait_s ev_redis_wait_t;
20              
21             typedef ev_redis_t* EV__Redis;
22             typedef struct ev_loop* EV__Loop;
23              
24             #define EV_REDIS_MAGIC 0xDEADBEEF
25             #define EV_REDIS_FREED 0xFEEDFACE
26              
27             #define CLEAR_HANDLER(field) \
28             do { if (NULL != (field)) { SvREFCNT_dec(field); (field) = NULL; } } while(0)
29              
30             struct ev_redis_s {
31             unsigned int magic; /* Set to EV_REDIS_MAGIC when alive */
32             struct ev_loop* loop;
33             redisAsyncContext* ac;
34             SV* error_handler;
35             SV* connect_handler;
36             SV* disconnect_handler;
37             SV* push_handler;
38             struct timeval* connect_timeout;
39             struct timeval* command_timeout;
40             ngx_queue_t cb_queue;
41             ngx_queue_t wait_queue;
42             int pending_count;
43             int waiting_count;
44             int max_pending; /* 0 = unlimited */
45             ev_redis_cb_t* current_cb; /* callback currently executing */
46             int resume_waiting_on_reconnect; /* keep waiting queue on disconnect */
47             int waiting_timeout_ms; /* max ms in waiting queue, 0 = unlimited */
48             ev_timer waiting_timer;
49             int waiting_timer_active;
50              
51             /* Reconnect settings */
52             char* host;
53             int port;
54             char* path;
55             int reconnect; /* 0 = disabled, 1 = enabled */
56             int reconnect_delay_ms; /* delay between reconnect attempts */
57             int max_reconnect_attempts; /* 0 = unlimited */
58             int reconnect_attempts; /* current attempt count */
59             ev_timer reconnect_timer;
60             int reconnect_timer_active;
61             int intentional_disconnect; /* set before explicit disconnect() */
62             int priority; /* libev watcher priority, default 0 */
63             int in_cb_cleanup; /* prevent re-entrant cb_queue modification */
64             int in_wait_cleanup; /* prevent re-entrant wait_queue modification */
65             int callback_depth; /* nesting depth of C-level callbacks invoking Perl code */
66             int keepalive; /* TCP keepalive interval in seconds, 0 = disabled */
67             int prefer_ipv4; /* prefer IPv4 DNS resolution */
68             int prefer_ipv6; /* prefer IPv6 DNS resolution */
69             char* source_addr; /* local address to bind to */
70             unsigned int tcp_user_timeout; /* TCP_USER_TIMEOUT in ms, 0 = OS default */
71             int cloexec; /* set SOCK_CLOEXEC on socket */
72             int reuseaddr; /* set SO_REUSEADDR on socket */
73             redisAsyncContext* ac_saved; /* saved ac pointer for deferred disconnect cleanup */
74             #ifdef EV_REDIS_SSL
75             redisSSLContext* ssl_ctx;
76             #endif
77             };
78              
79             struct ev_redis_cb_s {
80             SV* cb;
81             ngx_queue_t queue;
82             int persist;
83             int skipped;
84             int sub_count; /* subscription channels remaining (for persistent commands) */
85             };
86              
87             struct ev_redis_wait_s {
88             char** argv;
89             size_t* argvlen;
90             int argc;
91             SV* cb;
92             int persist;
93             ngx_queue_t queue;
94             ev_tstamp queued_at;
95             };
96              
97             /* Shared error strings (initialized in BOOT) */
98             static SV* err_skipped = NULL;
99             static SV* err_waiting_timeout = NULL;
100             static SV* err_disconnected = NULL;
101              
102             /* Check for unsubscribe-family commands. These are persistent (stay in cb_queue)
103             * but hiredis ignores their callbacks — replies go through the subscribe callback. */
104 0           static int is_unsubscribe_command(const char* cmd) {
105 0           char c = cmd[0];
106 0 0         if (c == 'u' || c == 'U') return (0 == strcasecmp(cmd, "unsubscribe"));
    0          
107 0 0         if (c == 'p' || c == 'P') return (0 == strcasecmp(cmd, "punsubscribe"));
    0          
108 0 0         if (c == 's' || c == 'S') return (0 == strcasecmp(cmd, "sunsubscribe"));
    0          
109 0           return 0;
110             }
111              
112 0           static int is_persistent_command(const char* cmd) {
113 0           char c = cmd[0];
114              
115 0 0         if (c == 's' || c == 'S') {
    0          
116 0 0         if (0 == strcasecmp(cmd, "subscribe")) return 1;
117 0 0         if (0 == strcasecmp(cmd, "ssubscribe")) return 1;
118 0 0         if (0 == strcasecmp(cmd, "sunsubscribe")) return 1;
119 0           return 0;
120             }
121 0 0         if (c == 'u' || c == 'U') {
    0          
122 0           return (0 == strcasecmp(cmd, "unsubscribe"));
123             }
124 0 0         if (c == 'p' || c == 'P') {
    0          
125 0 0         if (0 == strcasecmp(cmd, "psubscribe")) return 1;
126 0 0         if (0 == strcasecmp(cmd, "punsubscribe")) return 1;
127 0           return 0;
128             }
129 0 0         if (c == 'm' || c == 'M') {
    0          
130 0           return (0 == strcasecmp(cmd, "monitor"));
131             }
132              
133 0           return 0;
134             }
135              
136             /* Detect unsubscribe-type replies that indicate end of a subscription channel.
137             * Format: [type_string, channel, remaining_count]
138             * Returns 1 if the reply is an unsubscribe/punsubscribe/sunsubscribe message. */
139 0           static int is_unsub_reply(redisReply* reply) {
140             const char* s;
141              
142 0 0         if (reply->type != REDIS_REPLY_ARRAY && reply->type != REDIS_REPLY_PUSH) return 0;
    0          
143 0 0         if (reply->elements < 3) return 0;
144 0 0         if (NULL == reply->element[0]) return 0;
145 0 0         if (reply->element[0]->type != REDIS_REPLY_STRING &&
146 0 0         reply->element[0]->type != REDIS_REPLY_STATUS) return 0;
147              
148 0           s = reply->element[0]->str;
149 0 0         if (s[0] == 'u' || s[0] == 'U') return (0 == strcasecmp(s, "unsubscribe"));
    0          
150 0 0         if (s[0] == 'p' || s[0] == 'P') return (0 == strcasecmp(s, "punsubscribe"));
    0          
151 0 0         if (s[0] == 's' || s[0] == 'S') return (0 == strcasecmp(s, "sunsubscribe"));
    0          
152 0           return 0;
153             }
154              
155 0           static void emit_error(EV__Redis self, SV* error) {
156 0 0         if (NULL == self->error_handler) return;
157              
158 0           dSP;
159              
160 0           ENTER;
161 0           SAVETMPS;
162              
163 0 0         PUSHMARK(SP);
164 0 0         XPUSHs(error);
165 0           PUTBACK;
166              
167 0           call_sv(self->error_handler, G_DISCARD | G_EVAL);
168 0 0         if (SvTRUE(ERRSV)) {
    0          
169 0 0         warn("EV::Redis: exception in error handler: %s", SvPV_nolen(ERRSV));
170             }
171              
172 0 0         FREETMPS;
173 0           LEAVE;
174             }
175              
176 0           static void emit_error_str(EV__Redis self, const char* error) {
177 0 0         if (NULL == self->error_handler) return;
178 0           emit_error(self, sv_2mortal(newSVpv(error, 0)));
179             }
180              
181 0           static void invoke_callback_error(SV* cb, SV* error_sv) {
182 0           dSP;
183 0           ENTER;
184 0           SAVETMPS;
185 0 0         PUSHMARK(SP);
186 0 0         EXTEND(SP, 2);
187 0           PUSHs(&PL_sv_undef);
188 0           PUSHs(error_sv);
189 0           PUTBACK;
190 0           call_sv(cb, G_DISCARD | G_EVAL);
191 0 0         if (SvTRUE(ERRSV)) {
    0          
192 0 0         warn("EV::Redis: exception in command callback: %s", SvPV_nolen(ERRSV));
193             }
194 0 0         FREETMPS;
195 0           LEAVE;
196 0           }
197              
198             /* Check if DESTROY was called during a callback and deferred Safefree.
199             * Call after decrementing callback_depth. Returns 1 if self was freed
200             * (caller MUST NOT access self afterward). */
201 0           static int check_destroyed(EV__Redis self) {
202 0 0         if (self->magic == EV_REDIS_FREED &&
203 0 0         self->callback_depth == 0 &&
204 0 0         self->current_cb == NULL) {
205 0           Safefree(self);
206 0           return 1;
207             }
208 0           return 0;
209             }
210              
211             /* Free C-allocated fields (used by both PL_dirty and normal DESTROY paths) */
212 6           static void free_c_fields(EV__Redis self) {
213 6 50         if (NULL != self->host) { Safefree(self->host); self->host = NULL; }
214 6 50         if (NULL != self->path) { Safefree(self->path); self->path = NULL; }
215 6 50         if (NULL != self->source_addr) { Safefree(self->source_addr); self->source_addr = NULL; }
216 6 50         if (NULL != self->connect_timeout) { Safefree(self->connect_timeout); self->connect_timeout = NULL; }
217 6 50         if (NULL != self->command_timeout) { Safefree(self->command_timeout); self->command_timeout = NULL; }
218             #ifdef EV_REDIS_SSL
219 6 100         if (NULL != self->ssl_ctx) { redisFreeSSLContext(self->ssl_ctx); self->ssl_ctx = NULL; }
220             #endif
221 6           }
222              
223 6           static void stop_waiting_timer(EV__Redis self) {
224 6 50         if (self->waiting_timer_active && NULL != self->loop && !PL_dirty) {
    0          
    0          
225 0           ev_timer_stop(self->loop, &self->waiting_timer);
226 0           self->waiting_timer_active = 0;
227             }
228 6           }
229              
230 6           static void stop_reconnect_timer(EV__Redis self) {
231 6 50         if (self->reconnect_timer_active && NULL != self->loop && !PL_dirty) {
    0          
    0          
232 0           ev_timer_stop(self->loop, &self->reconnect_timer);
233 0           self->reconnect_timer_active = 0;
234             }
235 6           }
236              
237             /* Maximum timeout: ~23 days (fits safely in 32-bit calculations) */
238             #define MAX_TIMEOUT_MS 2000000000
239              
240 0           static void validate_timeout_ms(IV ms, const char* name) {
241 0 0         if (ms < 0) croak("%s must be non-negative", name);
242 0 0         if (ms > MAX_TIMEOUT_MS) croak("%s too large (max %d ms)", name, MAX_TIMEOUT_MS);
243 0           }
244              
245 0           static SV* timeout_accessor(struct timeval** tv_ptr, SV* timeout_ms, const char* name) {
246 0 0         if (NULL != timeout_ms && SvOK(timeout_ms)) {
    0          
247 0           IV ms = SvIV(timeout_ms);
248 0           validate_timeout_ms(ms, name);
249 0 0         if (NULL == *tv_ptr) {
250 0           Newx(*tv_ptr, 1, struct timeval);
251             }
252 0           (*tv_ptr)->tv_sec = (long)(ms / 1000);
253 0           (*tv_ptr)->tv_usec = (long)((ms % 1000) * 1000);
254             }
255              
256 0 0         if (NULL != *tv_ptr) {
257 0           return newSViv((IV)(*tv_ptr)->tv_sec * 1000 + (*tv_ptr)->tv_usec / 1000);
258             }
259 0           return &PL_sv_undef;
260             }
261              
262             /* Helper to set/clear a callback handler field.
263             * If called without handler (items == 1), clears the handler.
264             * If called with handler, sets it (or clears if handler is undef/not CODE).
265             * Returns the current handler (with refcount incremented) or undef. */
266 6           static SV* handler_accessor(SV** handler_ptr, SV* handler, int has_handler_arg) {
267             /* Clear existing handler first - both no-arg calls and set calls clear first */
268 6 50         if (NULL != *handler_ptr) {
269 0           SvREFCNT_dec(*handler_ptr);
270 0           *handler_ptr = NULL;
271             }
272              
273             /* If a handler argument was provided and it's a valid CODE ref, set it */
274 6 50         if (has_handler_arg && NULL != handler && SvOK(handler) && SvROK(handler) &&
    50          
    50          
    50          
275 6 50         SvTYPE(SvRV(handler)) == SVt_PVCV) {
276 6           *handler_ptr = SvREFCNT_inc(handler);
277             }
278              
279 6           return (NULL != *handler_ptr)
280 6           ? SvREFCNT_inc(*handler_ptr)
281 12 50         : &PL_sv_undef;
282             }
283              
284             /* Uses in_cb_cleanup flag to prevent re-entrant queue modification from
285             * user callbacks (e.g., if callback calls skip_pending). */
286 6           static void remove_cb_queue_sv(EV__Redis self, SV* error_sv) {
287             ngx_queue_t* q;
288             ev_redis_cb_t* cbt;
289              
290 6 50         if (self->in_cb_cleanup) {
291 0           return;
292             }
293              
294 6           self->in_cb_cleanup = 1;
295              
296             /* Use while loop with re-fetch of head each iteration.
297             * This is safe against re-entrant modifications because we
298             * re-check the queue state after each callback invocation. */
299 6 50         while (!ngx_queue_empty(&self->cb_queue)) {
300 0           q = ngx_queue_head(&self->cb_queue);
301 0           cbt = ngx_queue_data(q, ev_redis_cb_t, queue);
302              
303 0 0         if (cbt == self->current_cb) {
304             /* Skip current_cb - it is owned by an in-flight reply_cb. */
305 0 0         if (ngx_queue_next(q) == ngx_queue_sentinel(&self->cb_queue)) {
306 0           break;
307             }
308 0           q = ngx_queue_next(q);
309 0           cbt = ngx_queue_data(q, ev_redis_cb_t, queue);
310             }
311              
312 0           ngx_queue_remove(q);
313 0 0         if (!cbt->persist) self->pending_count--;
314              
315 0 0         if (NULL != cbt->cb) {
316 0 0         if (NULL != error_sv) {
317 0           invoke_callback_error(cbt->cb, error_sv);
318             }
319 0           SvREFCNT_dec(cbt->cb);
320             }
321 0           Safefree(cbt);
322             }
323              
324 6           self->in_cb_cleanup = 0;
325             }
326              
327 0           static void free_wait_entry(ev_redis_wait_t* wt) {
328             int i;
329 0 0         for (i = 0; i < wt->argc; i++) {
330 0           Safefree(wt->argv[i]);
331             }
332 0           Safefree(wt->argv);
333 0           Safefree(wt->argvlen);
334 0 0         if (NULL != wt->cb) {
335 0           SvREFCNT_dec(wt->cb);
336             }
337 0           Safefree(wt);
338 0           }
339              
340             /* Uses in_wait_cleanup flag to prevent re-entrant queue modification. */
341 6           static void clear_wait_queue_sv(EV__Redis self, SV* error_sv) {
342             ngx_queue_t* q;
343             ev_redis_wait_t* wt;
344              
345 6 50         if (self->in_wait_cleanup) {
346 0           return;
347             }
348              
349             /* Protect against re-entrancy: if a callback invokes skip_waiting() or
350             * skip_pending(), they should no-op since we're already clearing. */
351 6           self->in_wait_cleanup = 1;
352              
353 6 50         while (!ngx_queue_empty(&self->wait_queue)) {
354 0           q = ngx_queue_head(&self->wait_queue);
355 0           wt = ngx_queue_data(q, ev_redis_wait_t, queue);
356 0           ngx_queue_remove(q);
357 0           self->waiting_count--;
358              
359 0 0         if (NULL != error_sv && NULL != wt->cb) {
    0          
360 0           invoke_callback_error(wt->cb, error_sv);
361             }
362              
363 0           free_wait_entry(wt);
364             }
365              
366 6           self->in_wait_cleanup = 0;
367             }
368              
369             /* Forward declarations */
370             static void pre_connect_common(EV__Redis self, redisOptions* opts);
371             static int post_connect_setup(EV__Redis self, const char* err_prefix);
372             static void do_reconnect(EV__Redis self);
373             static void send_next_waiting(EV__Redis self);
374             static void schedule_waiting_timer(EV__Redis self);
375             static void expire_waiting_commands(EV__Redis self);
376             static void schedule_reconnect(EV__Redis self);
377             static void EV__redis_connect_cb(redisAsyncContext* c, int status);
378             static void EV__redis_disconnect_cb(const redisAsyncContext* c, int status);
379             static void EV__redis_push_cb(redisAsyncContext* ac, void* reply_ptr);
380             static SV* EV__redis_decode_reply(redisReply* reply);
381              
382 0           static void clear_connection_params(EV__Redis self) {
383 0 0         if (NULL != self->host) {
384 0           Safefree(self->host);
385 0           self->host = NULL;
386             }
387 0 0         if (NULL != self->path) {
388 0           Safefree(self->path);
389 0           self->path = NULL;
390             }
391 0           }
392              
393 0           static void reconnect_timer_cb(EV_P_ ev_timer* w, int revents) {
394 0           EV__Redis self = (EV__Redis)w->data;
395              
396             (void)loop;
397             (void)revents;
398              
399 0 0         if (NULL == self || self->magic != EV_REDIS_MAGIC) return;
    0          
400              
401 0           self->reconnect_timer_active = 0;
402 0           self->callback_depth++;
403 0           do_reconnect(self);
404 0           self->callback_depth--;
405 0 0         if (check_destroyed(self)) return;
406             }
407              
408 0           static void schedule_reconnect(EV__Redis self) {
409             ev_tstamp delay;
410              
411 0 0         if (!self->reconnect) return;
412 0 0         if (self->intentional_disconnect) return;
413 0 0         if (NULL == self->loop) return;
414 0           stop_reconnect_timer(self);
415 0 0         if (self->max_reconnect_attempts > 0 &&
416 0 0         self->reconnect_attempts >= self->max_reconnect_attempts) {
417             /* Clear waiting queue that was preserved for reconnect - reconnect has
418             * permanently failed, so these commands will never be sent. */
419 0           clear_wait_queue_sv(self, sv_2mortal(newSVpv("reconnect error: max attempts reached", 0)));
420 0           stop_waiting_timer(self);
421 0           emit_error_str(self, "reconnect error: max attempts reached");
422 0           return;
423             }
424              
425 0           self->reconnect_attempts++;
426 0           delay = self->reconnect_delay_ms / 1000.0;
427              
428 0           ev_timer_init(&self->reconnect_timer, reconnect_timer_cb, delay, 0);
429 0           self->reconnect_timer.data = (void*)self;
430 0           ev_timer_start(self->loop, &self->reconnect_timer);
431 0           self->reconnect_timer_active = 1;
432             }
433              
434             /* Expire waiting commands that have exceeded waiting_timeout.
435             * Uses head-refetch iteration pattern which is safe against re-entrant
436             * queue modification (e.g., if a callback calls skip_waiting). */
437 0           static void expire_waiting_commands(EV__Redis self) {
438             ngx_queue_t* q;
439             ev_redis_wait_t* wt;
440             ev_tstamp now;
441             ev_tstamp timeout;
442              
443 0           now = ev_now(self->loop);
444             /* Capture timeout at start - callbacks may modify self->waiting_timeout_ms
445             * and we need consistent behavior for the entire batch. */
446 0           timeout = self->waiting_timeout_ms / 1000.0;
447              
448             /* Use while loop with re-fetch of head each iteration.
449             * This is safe against re-entrant modifications. */
450 0 0         while (!ngx_queue_empty(&self->wait_queue)) {
451 0           q = ngx_queue_head(&self->wait_queue);
452 0           wt = ngx_queue_data(q, ev_redis_wait_t, queue);
453              
454 0 0         if (now - wt->queued_at >= timeout) {
455 0           ngx_queue_remove(q);
456 0           self->waiting_count--;
457              
458 0 0         if (NULL != wt->cb) {
459 0           invoke_callback_error(wt->cb, err_waiting_timeout);
460             }
461              
462 0           free_wait_entry(wt);
463             }
464             else {
465             /* Queue is FIFO with monotonically increasing queued_at times.
466             * If this entry hasn't expired, neither have any following entries. */
467 0           break;
468             }
469             }
470 0           }
471              
472 0           static void waiting_timer_cb(EV_P_ ev_timer* w, int revents) {
473 0           EV__Redis self = (EV__Redis)w->data;
474              
475             (void)loop;
476             (void)revents;
477              
478 0 0         if (NULL == self || self->magic != EV_REDIS_MAGIC) return;
    0          
479              
480 0           self->waiting_timer_active = 0;
481 0           self->callback_depth++;
482 0           expire_waiting_commands(self);
483 0           schedule_waiting_timer(self);
484 0           self->callback_depth--;
485 0 0         if (check_destroyed(self)) return;
486             }
487              
488 0           static void schedule_waiting_timer(EV__Redis self) {
489             ngx_queue_t* q;
490             ev_redis_wait_t* wt;
491             ev_tstamp now, expires_at, delay;
492              
493             /* Use helper which includes NULL loop check */
494 0           stop_waiting_timer(self);
495              
496 0 0         if (NULL == self->loop) return;
497 0 0         if (self->waiting_timeout_ms <= 0) return;
498 0 0         if (ngx_queue_empty(&self->wait_queue)) return;
499              
500 0           q = ngx_queue_head(&self->wait_queue);
501 0           wt = ngx_queue_data(q, ev_redis_wait_t, queue);
502              
503 0           now = ev_now(self->loop);
504 0           expires_at = wt->queued_at + self->waiting_timeout_ms / 1000.0;
505 0           delay = expires_at - now;
506 0 0         if (delay < 0) delay = 0;
507              
508 0           ev_timer_init(&self->waiting_timer, waiting_timer_cb, delay, 0);
509 0           self->waiting_timer.data = (void*)self;
510 0           ev_timer_start(self->loop, &self->waiting_timer);
511 0           self->waiting_timer_active = 1;
512             }
513              
514 0           static void do_reconnect(EV__Redis self) {
515             redisOptions opts;
516 0           memset(&opts, 0, sizeof(opts));
517              
518 0 0         if (NULL == self->loop) {
519             /* Object is being destroyed */
520 0           return;
521             }
522              
523 0 0         if (NULL != self->ac) {
524             /* Already connected or connecting */
525 0           return;
526             }
527              
528 0           self->intentional_disconnect = 0;
529 0           pre_connect_common(self, &opts);
530              
531 0 0         if (NULL != self->path) {
532 0           REDIS_OPTIONS_SET_UNIX(&opts, self->path);
533             }
534 0 0         else if (NULL != self->host) {
535 0           REDIS_OPTIONS_SET_TCP(&opts, self->host, self->port);
536             }
537             else {
538 0           emit_error_str(self, "reconnect error: no connection parameters");
539 0           return;
540             }
541              
542 0           self->ac = redisAsyncConnectWithOptions(&opts);
543 0 0         if (NULL == self->ac) {
544 0           emit_error_str(self, "reconnect error: cannot allocate memory");
545 0           schedule_reconnect(self);
546 0           return;
547             }
548              
549 0 0         if (REDIS_OK != post_connect_setup(self, "reconnect error")) {
550 0           schedule_reconnect(self);
551 0           return;
552             }
553             }
554              
555 0           static void EV__redis_connect_cb(redisAsyncContext* c, int status) {
556 0           EV__Redis self = (EV__Redis)c->data;
557              
558 0 0         if (NULL == self || self->magic != EV_REDIS_MAGIC) return;
    0          
559              
560 0           self->callback_depth++;
561              
562 0 0         if (REDIS_OK != status) {
563 0           self->ac = NULL;
564 0 0         emit_error_str(self, c->errstr[0] ? c->errstr : "connect failed");
565 0 0         if (!self->reconnect || !self->resume_waiting_on_reconnect
    0          
566 0 0         || self->intentional_disconnect) {
567 0 0         clear_wait_queue_sv(self, sv_2mortal(newSVpv(
568             c->errstr[0] ? c->errstr : "connect failed", 0)));
569 0           stop_waiting_timer(self);
570             }
571 0           schedule_reconnect(self);
572             }
573             else {
574 0           self->reconnect_attempts = 0;
575              
576 0 0         if (NULL != self->connect_handler) {
577 0           dSP;
578              
579 0           ENTER;
580 0           SAVETMPS;
581              
582 0 0         PUSHMARK(SP);
583 0           PUTBACK;
584              
585 0           call_sv(self->connect_handler, G_DISCARD | G_EVAL);
586 0 0         if (SvTRUE(ERRSV)) {
    0          
587 0 0         warn("EV::Redis: exception in connect handler: %s", SvPV_nolen(ERRSV));
588             }
589              
590 0 0         FREETMPS;
591 0           LEAVE;
592             }
593              
594 0           send_next_waiting(self);
595             }
596              
597 0           self->callback_depth--;
598 0           check_destroyed(self);
599             }
600              
601 0           static void EV__redis_disconnect_cb(const redisAsyncContext* c, int status) {
602 0           EV__Redis self = (EV__Redis)c->data;
603             SV* error_sv;
604 0           int should_reconnect = 0;
605             int was_intentional;
606             int will_reconnect;
607              
608 0 0         if (NULL == self || self->magic != EV_REDIS_MAGIC) return;
    0          
609              
610             /* Stale disconnect callback: user already established a new connection
611             * (e.g., called disconnect() then connect() before the old deferred
612             * disconnect fired). Old pending callbacks were already processed by
613             * reply_cb. Skip all cleanup to avoid clobbering the new connection.
614             * Clear ac_saved if it points to this old context to prevent dangling. */
615 0 0         if (self->ac != NULL && self->ac != c) {
    0          
616 0 0         if (self->ac_saved == c) self->ac_saved = NULL;
617 0           return;
618             }
619              
620 0           was_intentional = self->intentional_disconnect;
621 0           self->intentional_disconnect = 0;
622              
623 0           self->ac = NULL;
624 0           self->ac_saved = NULL; /* disconnect callback fired normally */
625 0           self->callback_depth++;
626              
627 0 0         if (REDIS_OK == status) {
628 0           error_sv = err_disconnected;
629             }
630             else {
631 0 0         error_sv = sv_2mortal(newSVpv(
632             c->errstr[0] ? c->errstr : "disconnected", 0));
633 0 0         emit_error_str(self, c->errstr[0] ? c->errstr : "disconnected");
634 0 0         if (!was_intentional) {
635 0           should_reconnect = 1;
636             }
637             }
638              
639 0 0         if (NULL != self->disconnect_handler) {
640 0           dSP;
641              
642 0           ENTER;
643 0           SAVETMPS;
644              
645 0 0         PUSHMARK(SP);
646 0           PUTBACK;
647              
648 0           call_sv(self->disconnect_handler, G_DISCARD | G_EVAL);
649 0 0         if (SvTRUE(ERRSV)) {
    0          
650 0 0         warn("EV::Redis: exception in disconnect handler: %s", SvPV_nolen(ERRSV));
651             }
652              
653 0 0         FREETMPS;
654 0           LEAVE;
655              
656             /* Re-check: user's handler might have called connect() or reconnect()
657             * establishing a new ac. If so, skip clearing cb_queue to avoid
658             * freeing new commands. ac_saved is also already handled or NULL. */
659 0 0         if (self->ac != NULL && self->ac != c) {
    0          
660 0           self->callback_depth--;
661 0           check_destroyed(self);
662 0           return;
663             }
664             }
665              
666 0           remove_cb_queue_sv(self, error_sv);
667              
668             /* Clear waiting queue unless we will actually reconnect and
669             * resume_waiting_on_reconnect is enabled. */
670 0 0         will_reconnect = should_reconnect && !self->intentional_disconnect && self->reconnect;
    0          
    0          
671 0 0         if (!self->resume_waiting_on_reconnect || was_intentional || !will_reconnect) {
    0          
    0          
672 0           clear_wait_queue_sv(self, error_sv);
673 0           stop_waiting_timer(self);
674             }
675              
676 0 0         if (will_reconnect) {
677 0           schedule_reconnect(self);
678             }
679              
680 0           self->callback_depth--;
681 0           check_destroyed(self);
682             }
683              
684 0           static void EV__redis_push_cb(redisAsyncContext* ac, void* reply_ptr) {
685 0           EV__Redis self = (EV__Redis)ac->data;
686 0           redisReply* reply = (redisReply*)reply_ptr;
687              
688 0 0         if (NULL == self || self->magic != EV_REDIS_MAGIC) return;
    0          
689 0 0         if (NULL == self->push_handler || NULL == reply) return;
    0          
690              
691 0           self->callback_depth++;
692              
693             {
694 0           dSP;
695              
696 0           ENTER;
697 0           SAVETMPS;
698              
699 0 0         PUSHMARK(SP);
700 0 0         XPUSHs(sv_2mortal(EV__redis_decode_reply(reply)));
701 0           PUTBACK;
702              
703 0           call_sv(self->push_handler, G_DISCARD | G_EVAL);
704 0 0         if (SvTRUE(ERRSV)) {
    0          
705 0 0         warn("EV::Redis: exception in push handler: %s", SvPV_nolen(ERRSV));
706             }
707              
708 0 0         FREETMPS;
709 0           LEAVE;
710             }
711              
712 0           self->callback_depth--;
713 0           check_destroyed(self);
714             }
715              
716 0           static void pre_connect_common(EV__Redis self, redisOptions* opts) {
717 0 0         if (NULL != self->connect_timeout) {
718 0           opts->connect_timeout = self->connect_timeout;
719             }
720 0 0         if (NULL != self->command_timeout) {
721 0           opts->command_timeout = self->command_timeout;
722             }
723 0 0         if (self->prefer_ipv4) {
724 0           opts->options |= REDIS_OPT_PREFER_IPV4;
725             }
726 0 0         else if (self->prefer_ipv6) {
727 0           opts->options |= REDIS_OPT_PREFER_IPV6;
728             }
729 0 0         if (self->cloexec) {
730 0           opts->options |= REDIS_OPT_SET_SOCK_CLOEXEC;
731             }
732 0 0         if (self->reuseaddr) {
733 0           opts->options |= REDIS_OPT_REUSEADDR;
734             }
735 0 0         if (NULL != self->source_addr && NULL == self->path) {
    0          
736 0           opts->endpoint.tcp.source_addr = self->source_addr;
737             }
738 0           }
739              
740             /* Set up a newly allocated redisAsyncContext: SSL, keepalive, libev, callbacks.
741             * On failure: frees ac, nulls self->ac, emits error with err_prefix. */
742 0           static int post_connect_setup(EV__Redis self, const char* err_prefix) {
743 0           self->ac_saved = NULL;
744 0           self->ac->data = (void*)self;
745              
746             #ifdef EV_REDIS_SSL
747 0 0         if (NULL != self->ssl_ctx) {
748 0 0         if (REDIS_OK != redisInitiateSSLWithContext(&self->ac->c, self->ssl_ctx)) {
749 0 0         SV* err = sv_2mortal(newSVpvf("%s: SSL initiation failed: %s",
750             err_prefix, self->ac->errstr[0] ? self->ac->errstr : "unknown error"));
751 0           redisAsyncFree(self->ac);
752 0           self->ac = NULL;
753 0           emit_error(self, err);
754 0           return REDIS_ERR;
755             }
756             }
757             #endif
758              
759 0 0         if (self->keepalive > 0) {
760 0           redisEnableKeepAliveWithInterval(&self->ac->c, self->keepalive);
761             }
762 0 0         if (self->tcp_user_timeout > 0) {
763 0           redisSetTcpUserTimeout(&self->ac->c, self->tcp_user_timeout);
764             }
765              
766 0 0         if (REDIS_OK != redisLibevAttach(self->loop, self->ac)) {
767 0           SV* err = sv_2mortal(newSVpvf("%s: cannot attach libev", err_prefix));
768 0           redisAsyncFree(self->ac);
769 0           self->ac = NULL;
770 0           emit_error(self, err);
771 0           return REDIS_ERR;
772             }
773              
774 0 0         if (self->priority != 0) {
775 0           redisLibevSetPriority(self->ac, self->priority);
776             }
777              
778 0           redisAsyncSetConnectCallbackNC(self->ac, EV__redis_connect_cb);
779 0           redisAsyncSetDisconnectCallback(self->ac, EV__redis_disconnect_cb);
780 0 0         if (NULL != self->push_handler) {
781 0           redisAsyncSetPushCallback(self->ac, EV__redis_push_cb);
782             }
783              
784 0 0         if (self->ac->err) {
785 0           SV* err = sv_2mortal(newSVpvf("%s: %s", err_prefix, self->ac->errstr));
786 0           redisAsyncFree(self->ac);
787 0           self->ac = NULL;
788 0           emit_error(self, err);
789 0           return REDIS_ERR;
790             }
791              
792 0           return REDIS_OK;
793             }
794              
795 0           static SV* EV__redis_decode_reply(redisReply* reply) {
796             SV* res;
797              
798 0           switch (reply->type) {
799 0           case REDIS_REPLY_STRING:
800             case REDIS_REPLY_ERROR:
801             case REDIS_REPLY_STATUS:
802             case REDIS_REPLY_BIGNUM:
803             case REDIS_REPLY_VERB:
804 0           res = newSVpvn(reply->str, reply->len);
805 0           break;
806              
807 0           case REDIS_REPLY_INTEGER:
808 0           res = newSViv(reply->integer);
809 0           break;
810              
811 0           case REDIS_REPLY_DOUBLE:
812 0           res = newSVnv(reply->dval);
813 0           break;
814              
815 0           case REDIS_REPLY_BOOL:
816 0           res = newSViv(reply->integer ? 1 : 0);
817 0           break;
818              
819 0           case REDIS_REPLY_NIL:
820 0           res = newSV(0);
821 0           break;
822              
823 0           case REDIS_REPLY_ARRAY:
824             case REDIS_REPLY_MAP:
825             case REDIS_REPLY_SET:
826             case REDIS_REPLY_ATTR:
827             case REDIS_REPLY_PUSH: {
828 0           AV* av = newAV();
829             size_t i;
830 0 0         if (reply->elements > 0) {
831 0           av_extend(av, (SSize_t)(reply->elements - 1));
832 0 0         for (i = 0; i < reply->elements; i++) {
833 0 0         if (NULL != reply->element[i]) {
834 0           av_push(av, EV__redis_decode_reply(reply->element[i]));
835             }
836             else {
837 0           av_push(av, newSV(0));
838             }
839             }
840             }
841 0           res = newRV_noinc((SV*)av);
842 0           break;
843             }
844              
845 0           default:
846             /* Unknown type, return undef */
847 0           res = newSV(0);
848 0           break;
849             }
850              
851 0           return res;
852             }
853              
854 0           static void EV__redis_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
855 0           EV__Redis self = (EV__Redis)c->data;
856             ev_redis_cb_t* cbt;
857             SV* sv_reply;
858             SV* sv_err;
859              
860 0           cbt = (ev_redis_cb_t*)privdata;
861              
862 0 0         if (cbt->skipped) {
863 0 0         if (!cbt->persist || NULL == reply) {
    0          
864             /* Multi-channel persistent: hiredis fires once per channel with
865             * same cbt. Decrement sub_count, free only on last call. */
866 0 0         if (cbt->persist && NULL == reply && cbt->sub_count > 1) {
    0          
    0          
867 0           cbt->sub_count--;
868 0           return;
869             }
870 0           Safefree(cbt);
871             }
872 0 0         else if (cbt->persist && reply != NULL && is_unsub_reply((redisReply*)reply)) {
    0          
    0          
873 0           cbt->sub_count--;
874 0 0         if (cbt->sub_count <= 0) {
875 0           Safefree(cbt);
876             }
877             }
878 0           return;
879             }
880              
881             /* self is NULL when DESTROY nulled ac->data (deferred free inside
882             * REDIS_IN_CALLBACK) or during PL_dirty. Still invoke the callback
883             * with a disconnect error so users can clean up resources. The hiredis
884             * context (c) is still alive here — safe to read c->errstr.
885             * cb may be NULL during PL_dirty where we pre-null it.
886             * For persistent commands (multi-channel subscribe), hiredis fires
887             * reply_cb once per channel with the same cbt. Invoke the callback
888             * only once (null cb after), use sub_count to track when to free. */
889 0 0         if (self == NULL) {
890 0 0         if (NULL != cbt->cb) {
891 0           invoke_callback_error(cbt->cb,
892 0 0         sv_2mortal(newSVpv(c->errstr[0] ? c->errstr : "disconnected", 0)));
893 0           SvREFCNT_dec(cbt->cb);
894 0           cbt->cb = NULL;
895             }
896 0 0         if (cbt->persist && reply == NULL && cbt->sub_count > 1) {
    0          
    0          
897 0           cbt->sub_count--;
898 0           return;
899             }
900 0           Safefree(cbt);
901 0           return;
902             }
903              
904             /* If self is marked as freed (during DESTROY), we still invoke the
905             * callback with an error, but skip any self->field access afterward.
906             * For persistent commands, don't free cbt here — leave it in the queue
907             * for remove_cb_queue_sv to clean up after redisAsyncFree returns. */
908 0 0         if (self->magic == EV_REDIS_FREED) {
909 0 0         if (NULL != cbt->cb) {
910 0           self->callback_depth++;
911 0 0         invoke_callback_error(cbt->cb, sv_2mortal(newSVpv(c->errstr[0] ? c->errstr : "disconnected", 0)));
912 0           self->callback_depth--;
913 0           SvREFCNT_dec(cbt->cb);
914 0           cbt->cb = NULL;
915             }
916 0 0         if (!cbt->persist) {
917 0           ngx_queue_remove(&cbt->queue);
918 0           Safefree(cbt);
919             }
920 0           check_destroyed(self);
921 0           return;
922             }
923              
924             /* Unknown magic - memory corruption, skip.
925             * Don't touch queue pointers (self's memory may be garbage).
926             * Always decrement refcount since callback will never be invoked again. */
927 0 0         if (self->magic != EV_REDIS_MAGIC) {
928 0 0         if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
929 0           Safefree(cbt);
930 0           return;
931             }
932              
933 0           self->current_cb = cbt;
934 0           self->callback_depth++;
935              
936 0 0         if (NULL != cbt->cb) {
937 0 0         if (NULL == reply) {
938 0 0         sv_err = sv_2mortal(newSVpv(
939             c->errstr[0] ? c->errstr : "disconnected", 0));
940 0           invoke_callback_error(cbt->cb, sv_err);
941             }
942             else {
943 0           dSP;
944              
945 0           ENTER;
946 0           SAVETMPS;
947              
948 0 0         PUSHMARK(SP);
949 0 0         EXTEND(SP, 2);
950 0           sv_reply = sv_2mortal(EV__redis_decode_reply((redisReply*)reply));
951 0 0         if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
952 0           PUSHs(&PL_sv_undef);
953 0           PUSHs(sv_reply);
954             }
955             else {
956 0           PUSHs(sv_reply);
957             }
958 0           PUTBACK;
959              
960 0           call_sv(cbt->cb, G_DISCARD | G_EVAL);
961 0 0         if (SvTRUE(ERRSV)) {
    0          
962 0 0         warn("EV::Redis: exception in command callback: %s", SvPV_nolen(ERRSV));
963             }
964              
965 0 0         FREETMPS;
966 0           LEAVE;
967             }
968             }
969              
970 0           self->callback_depth--;
971 0           self->current_cb = NULL;
972              
973             /* If DESTROY was called during our callback (e.g., user undef'd $redis),
974             * self->magic is EV_REDIS_FREED but self is still valid (DESTROY defers
975             * Safefree when callback_depth > 0). Complete cleanup here.
976             * For persistent commands (multi-channel subscribe), hiredis will fire
977             * reply_cb again for remaining channels via __redisAsyncFree. Null the
978             * callback to prevent double invocation, but leave cbt alive so those
979             * later calls see it and can track sub_count for proper cleanup. */
980 0 0         if (self->magic == EV_REDIS_FREED) {
981 0 0         if (NULL != cbt->cb) {
982 0           SvREFCNT_dec(cbt->cb);
983 0           cbt->cb = NULL;
984             }
985 0 0         if (!cbt->persist) {
986 0           Safefree(cbt);
987             }
988 0           check_destroyed(self);
989 0           return;
990             }
991              
992 0 0         if (cbt->skipped) {
993             /* Defensive check: handles edge case where callback is marked skipped
994             * during its own execution (e.g., via reentrant event loop where a
995             * nested callback overwrites current_cb, allowing skip_pending to
996             * process this callback). ngx_queue_remove is safe here due to
997             * ngx_queue_init in skip_pending. Don't decrement pending_count since
998             * skip_pending already did when it set skipped=1. */
999 0           ngx_queue_remove(&cbt->queue);
1000             /* For persistent commands (e.g., SUBSCRIBE), hiredis fires reply_cb
1001             * once per subscribed channel during disconnect. Only free cbt on the
1002             * last channel to prevent use-after-free. */
1003 0 0         if (cbt->persist && cbt->sub_count > 1) {
    0          
1004 0           cbt->sub_count--;
1005 0           return;
1006             }
1007 0           Safefree(cbt);
1008 0           self->callback_depth++;
1009 0           send_next_waiting(self);
1010 0           self->callback_depth--;
1011 0           check_destroyed(self);
1012 0           return;
1013             }
1014              
1015             /* Detect end of persistent subscription: when all channels from a
1016             * SUBSCRIBE command have been unsubscribed, hiredis removes its internal
1017             * callback entry. Clean up our cbt to prevent orphaned queue entries. */
1018 0 0         if (cbt->persist && reply != NULL && is_unsub_reply((redisReply*)reply)) {
    0          
    0          
1019 0           cbt->sub_count--;
1020 0 0         if (cbt->sub_count <= 0) {
1021             /* All channels unsubscribed — persistent commands are not counted
1022             * in pending_count, so don't decrement it. */
1023 0           ngx_queue_remove(&cbt->queue);
1024 0           self->callback_depth++;
1025 0 0         if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
1026 0           Safefree(cbt);
1027 0           self->callback_depth--;
1028 0           check_destroyed(self);
1029 0           return;
1030             }
1031             }
1032              
1033             /* Connection teardown with active subscription: hiredis fires reply_cb
1034             * once per subscribed channel (from dict iteration in __redisAsyncFree).
1035             * Track sub_count and remove from queue on last channel to prevent
1036             * disconnect_cb's remove_cb_queue_sv from invoking the callback again. */
1037 0 0         if (cbt->persist && NULL == reply) {
    0          
1038 0 0         if (cbt->sub_count > 1) {
1039 0           cbt->sub_count--;
1040             } else {
1041 0           ngx_queue_remove(&cbt->queue);
1042 0           self->callback_depth++;
1043 0 0         if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
1044 0           Safefree(cbt);
1045 0           self->callback_depth--;
1046 0           check_destroyed(self);
1047             }
1048 0           return;
1049             }
1050              
1051 0 0         if (0 == cbt->persist) {
1052             /* Remove from queue BEFORE SvREFCNT_dec. The SvREFCNT_dec may free a
1053             * closure that holds the last reference to this object, triggering
1054             * DESTROY. If cbt is still in the queue, DESTROY's remove_cb_queue_sv
1055             * would double-free it. Wrapping in callback_depth defers DESTROY's
1056             * Safefree(self) so we can safely access self afterward. */
1057 0           ngx_queue_remove(&cbt->queue);
1058 0           self->pending_count--;
1059 0           self->callback_depth++;
1060 0 0         if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
1061 0           Safefree(cbt);
1062             /* Don't drain waiting queue when reply is NULL (connection dying) —
1063             * disconnect_cb will handle reconnect and wait queue preservation. */
1064 0 0         if (reply != NULL) {
1065 0           send_next_waiting(self);
1066             }
1067 0           self->callback_depth--;
1068 0           check_destroyed(self);
1069             }
1070             }
1071              
1072             /* Submit a cbt (already in cb_queue) to Redis. On failure, removes cbt from
1073             * queue, invokes error callback, and frees cbt. Returns REDIS_OK or REDIS_ERR. */
1074 0           static int submit_to_redis(EV__Redis self, ev_redis_cb_t* cbt,
1075             int argc, const char** argv, const size_t* argvlen)
1076             {
1077 0           redisCallbackFn* fn = EV__redis_reply_cb;
1078 0           void* privdata = (void*)cbt;
1079 0           const char* cmd = argv[0];
1080              
1081             /* Hiredis does not store callbacks for unsubscribe commands — replies are
1082             * routed through the original subscribe callback. Pass NULL so hiredis
1083             * doesn't hold a dangling reference; we clean up our cbt below. */
1084 0 0         if (cbt->persist && is_unsubscribe_command(cmd)) {
    0          
1085 0           fn = NULL;
1086 0           privdata = NULL;
1087             }
1088              
1089 0           int r = redisAsyncCommandArgv(
1090             self->ac, fn, privdata,
1091             argc, argv, argvlen
1092             );
1093              
1094 0 0         if (REDIS_OK != r) {
1095 0           ngx_queue_remove(&cbt->queue);
1096 0 0         if (!cbt->persist) self->pending_count--;
1097              
1098 0 0         if (NULL != cbt->cb) {
1099 0 0         invoke_callback_error(cbt->cb, sv_2mortal(newSVpv(
    0          
1100             (self->ac && self->ac->errstr[0]) ? self->ac->errstr : "command failed", 0)));
1101 0           SvREFCNT_dec(cbt->cb);
1102             }
1103 0           Safefree(cbt);
1104 0 0         } else if (fn == NULL) {
1105             /* Successfully sent an unsubscribe command. Since hiredis won't
1106             * call us back, we must clean up our tracking 'cbt' now. */
1107 0           ngx_queue_remove(&cbt->queue);
1108 0 0         if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
1109 0           Safefree(cbt);
1110             }
1111              
1112 0           return r;
1113             }
1114              
1115             /* Send waiting commands to Redis. Uses iterative loop instead of recursion
1116             * to avoid stack overflow when many commands fail consecutively. */
1117 0           static void send_next_waiting(EV__Redis self) {
1118             ngx_queue_t* q;
1119             ev_redis_wait_t* wt;
1120             ev_redis_cb_t* cbt;
1121              
1122             while (1) {
1123             /* Check preconditions each iteration - they may change after callbacks */
1124 0 0         if (NULL == self->ac || self->intentional_disconnect) return;
    0          
1125 0 0         if (ngx_queue_empty(&self->wait_queue)) return;
1126 0 0         if (self->max_pending > 0 && self->pending_count >= self->max_pending) return;
    0          
1127              
1128 0           q = ngx_queue_head(&self->wait_queue);
1129 0           wt = ngx_queue_data(q, ev_redis_wait_t, queue);
1130 0           ngx_queue_remove(q);
1131 0           self->waiting_count--;
1132              
1133 0           Newx(cbt, 1, ev_redis_cb_t);
1134 0           cbt->cb = wt->cb;
1135 0           wt->cb = NULL;
1136 0           cbt->skipped = 0;
1137 0           cbt->persist = wt->persist;
1138 0 0         cbt->sub_count = wt->persist ? wt->argc - 1 : 0;
1139 0           ngx_queue_init(&cbt->queue);
1140 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
1141 0 0         if (!cbt->persist) self->pending_count++;
1142              
1143             /* Ignore submit_to_redis return: on failure it invokes cbt's error
1144             * callback and frees cbt, so the loop continues to try the next entry. */
1145 0           (void)submit_to_redis(self, cbt, wt->argc,
1146 0           (const char**)wt->argv, wt->argvlen);
1147 0           free_wait_entry(wt);
1148             }
1149             }
1150              
1151             MODULE = EV::Redis PACKAGE = EV::Redis
1152              
1153             BOOT:
1154             {
1155 24 50         I_EV_API("EV::Redis");
    50          
    50          
1156              
1157             /* Initialize shared error strings */
1158 24           err_skipped = newSVpvs_share("skipped");
1159 24           SvREADONLY_on(err_skipped);
1160              
1161 24           err_waiting_timeout = newSVpvs_share("waiting timeout");
1162 24           SvREADONLY_on(err_waiting_timeout);
1163              
1164 24           err_disconnected = newSVpvs_share("disconnected");
1165 24           SvREADONLY_on(err_disconnected);
1166             #ifdef EV_REDIS_SSL
1167 24           redisInitOpenSSL();
1168             #endif
1169             }
1170              
1171             EV::Redis
1172             _new(char* class, EV::Loop loop);
1173             CODE:
1174             {
1175             PERL_UNUSED_VAR(class);
1176 6           Newxz(RETVAL, 1, ev_redis_t);
1177 6           RETVAL->magic = EV_REDIS_MAGIC;
1178 6           ngx_queue_init(&RETVAL->cb_queue);
1179 6           ngx_queue_init(&RETVAL->wait_queue);
1180 6           RETVAL->loop = loop;
1181 6           RETVAL->cloexec = 1;
1182             }
1183             OUTPUT:
1184             RETVAL
1185              
1186             void
1187             DESTROY(EV::Redis self);
1188             CODE:
1189             {
1190             redisAsyncContext* ac_to_free;
1191 6           int skip_cb_cleanup = 0;
1192              
1193             /* Check for use-after-free: if magic number is wrong, this object
1194             * was already freed and memory is being reused. Skip cleanup. */
1195 6 50         if (self->magic != EV_REDIS_MAGIC) {
1196 0 0         if (self->magic == EV_REDIS_FREED) {
1197             /* Already destroyed - this is a double-free at Perl level */
1198 0           return;
1199             }
1200             /* Unknown magic - memory corruption or uninitialized */
1201 0           return;
1202             }
1203              
1204             /* Mark as freed FIRST to prevent re-entrant DESTROY */
1205 6           self->magic = EV_REDIS_FREED;
1206              
1207             /* Stop timers BEFORE PL_dirty check. Timer callbacks have self as data
1208             * pointer, so we must stop them before freeing self to prevent UAF.
1209             * The stop helpers check for NULL loop, so this is safe even if loop
1210             * is already destroyed. */
1211 6           stop_reconnect_timer(self);
1212 6           stop_waiting_timer(self);
1213              
1214             /* During global destruction (PL_dirty), the EV loop and other Perl
1215             * objects may already be destroyed. Clean up hiredis and our own memory
1216             * but don't invoke Perl-level handlers.
1217             * CRITICAL: We must call redisAsyncFree to stop the libev adapter's
1218             * watchers and free the redisAsyncContext. Without this, the adapter's
1219             * ev_io/ev_timer watchers remain registered in the EV loop with dangling
1220             * data pointers, causing SEGV during process cleanup. */
1221 6 50         if (PL_dirty) {
1222 0 0         if (NULL != self->ac) {
1223             /* Null cb_queue callbacks before redisAsyncFree to prevent
1224             * SvREFCNT_dec on potentially-freed SVs during global destruction.
1225             * (Same pattern as wait_queue below.) */
1226             {
1227             ngx_queue_t* q;
1228 0           for (q = ngx_queue_head(&self->cb_queue);
1229 0 0         q != ngx_queue_sentinel(&self->cb_queue);
1230 0           q = ngx_queue_next(q)) {
1231 0           ev_redis_cb_t* cbt = ngx_queue_data(q, ev_redis_cb_t, queue);
1232 0           cbt->cb = NULL;
1233             }
1234             }
1235 0           self->ac->data = NULL; /* prevent callbacks from accessing self */
1236 0           redisAsyncFree(self->ac);
1237 0           self->ac = NULL;
1238             }
1239 0 0         if (NULL != self->ac_saved) {
1240 0           self->ac_saved->data = NULL;
1241 0           self->ac_saved = NULL;
1242             }
1243 0           free_c_fields(self);
1244             /* Free wait_queue C memory (skip Perl callbacks during global destruction) */
1245 0 0         while (!ngx_queue_empty(&self->wait_queue)) {
1246 0           ngx_queue_t* q = ngx_queue_head(&self->wait_queue);
1247 0           ev_redis_wait_t* wt = ngx_queue_data(q, ev_redis_wait_t, queue);
1248 0           ngx_queue_remove(q);
1249 0           wt->cb = NULL; /* skip SvREFCNT_dec during PL_dirty */
1250 0           free_wait_entry(wt);
1251             }
1252 0           Safefree(self);
1253 0           return;
1254             }
1255              
1256 6           self->reconnect = 0;
1257              
1258             /* CRITICAL: Set self->ac to NULL BEFORE calling redisAsyncFree.
1259             * redisAsyncFree triggers reply callbacks, which call send_next_waiting,
1260             * which checks self->ac != NULL before issuing commands. If we don't
1261             * clear self->ac first, send_next_waiting will try to call
1262             * redisAsyncCommandArgv during the teardown, causing heap corruption. */
1263 6           self->loop = NULL;
1264 6           ac_to_free = self->ac;
1265 6           self->ac = NULL;
1266 6 50         if (NULL != ac_to_free) {
1267             /* If inside a hiredis callback (REDIS_IN_CALLBACK), redisAsyncFree
1268             * will be deferred. hiredis will fire pending reply callbacks later
1269             * via __redisAsyncFree. NULL ac->data so those callbacks see NULL self
1270             * and handle cleanup without accessing freed memory. */
1271 0 0         if (ac_to_free->c.flags & REDIS_IN_CALLBACK) {
1272 0           ac_to_free->data = NULL;
1273 0           skip_cb_cleanup = 1;
1274             }
1275              
1276             /* Protect against premature free in disconnect_cb if triggered synchronously */
1277 0           self->callback_depth++;
1278 0           redisAsyncFree(ac_to_free);
1279 0           self->callback_depth--;
1280             }
1281             /* If disconnect() was called from inside a callback, ac_saved points to
1282             * the deferred async context. NULL its data pointer to prevent the
1283             * deferred disconnect callback from accessing freed self. */
1284 6 50         if (self->ac_saved != NULL) {
1285 0           self->ac_saved->data = NULL;
1286 0           self->ac_saved = NULL;
1287 0           skip_cb_cleanup = 1;
1288             }
1289 6 50         CLEAR_HANDLER(self->error_handler);
1290 6 50         CLEAR_HANDLER(self->connect_handler);
1291 6 50         CLEAR_HANDLER(self->disconnect_handler);
1292 6 50         CLEAR_HANDLER(self->push_handler);
1293 6           free_c_fields(self);
1294              
1295 6 50         if (!self->in_wait_cleanup) {
1296 6           clear_wait_queue_sv(self, err_disconnected);
1297             }
1298 6 50         if (!skip_cb_cleanup && !self->in_cb_cleanup) {
    50          
1299             /* Safe to free cbts ourselves — hiredis has no deferred references. */
1300 6           remove_cb_queue_sv(self, NULL);
1301             }
1302             /* else: hiredis still holds references to our cbts (deferred free/disconnect).
1303             * reply_cb will handle cbt cleanup when called with self == NULL. */
1304              
1305             /* Defer Safefree if inside a callback — check_destroyed() handles it */
1306 6 50         if (self->current_cb == NULL && self->callback_depth == 0) {
    50          
1307 6           Safefree(self);
1308             }
1309             }
1310              
1311             void
1312             connect(EV::Redis self, char* hostname, int port = 6379);
1313             CODE:
1314             {
1315             redisOptions opts;
1316              
1317 0 0         if (NULL != self->ac) {
1318 0           croak("already connected");
1319             }
1320              
1321 0           self->intentional_disconnect = 0;
1322 0           self->reconnect_attempts = 0;
1323 0           clear_connection_params(self);
1324 0           self->host = savepv(hostname);
1325 0           self->port = port;
1326              
1327 0           memset(&opts, 0, sizeof(opts));
1328 0           pre_connect_common(self, &opts);
1329 0           REDIS_OPTIONS_SET_TCP(&opts, hostname, port);
1330 0           self->ac = redisAsyncConnectWithOptions(&opts);
1331 0 0         if (NULL == self->ac) {
1332 0           croak("connect error: cannot allocate memory");
1333             }
1334              
1335 0           (void)post_connect_setup(self, "connect error");
1336             }
1337              
1338             void
1339             connect_unix(EV::Redis self, const char* path);
1340             CODE:
1341             {
1342             redisOptions opts;
1343              
1344 0 0         if (NULL != self->ac) {
1345 0           croak("already connected");
1346             }
1347              
1348 0           self->intentional_disconnect = 0;
1349 0           self->reconnect_attempts = 0;
1350 0           clear_connection_params(self);
1351 0           self->path = savepv(path);
1352              
1353 0           memset(&opts, 0, sizeof(opts));
1354 0           pre_connect_common(self, &opts);
1355 0           REDIS_OPTIONS_SET_UNIX(&opts, path);
1356 0           self->ac = redisAsyncConnectWithOptions(&opts);
1357 0 0         if (NULL == self->ac) {
1358 0           croak("connect error: cannot allocate memory");
1359             }
1360              
1361 0           (void)post_connect_setup(self, "connect error");
1362             }
1363              
1364             void
1365             disconnect(EV::Redis self);
1366             CODE:
1367             {
1368             /* Stop any pending reconnect timer on explicit disconnect */
1369 0           self->intentional_disconnect = 1;
1370 0           stop_reconnect_timer(self);
1371 0           self->reconnect_attempts = 0;
1372              
1373 0 0         if (NULL == self->ac) {
1374             /* Already disconnected — still stop waiting timer and clear
1375             * wait queue (e.g., resume_waiting_on_reconnect kept them alive
1376             * after a connection drop, but user now explicitly disconnects). */
1377 0           stop_waiting_timer(self);
1378 0 0         if (!ngx_queue_empty(&self->wait_queue)) {
1379 0           self->callback_depth++;
1380 0           clear_wait_queue_sv(self, err_disconnected);
1381 0           self->callback_depth--;
1382 0           check_destroyed(self);
1383             }
1384 0           return;
1385             }
1386             /* Save ac pointer for deferred disconnect: when inside a hiredis
1387             * callback, redisAsyncDisconnect only sets REDIS_DISCONNECTING and
1388             * returns. DESTROY needs ac_saved to NULL ac->data if the Perl object
1389             * is freed before the deferred disconnect completes.
1390             * Only set when REDIS_IN_CALLBACK: in the synchronous path,
1391             * disconnect_cb fires during redisAsyncDisconnect and clears ac_saved;
1392             * but if DESTROY fires nested during that processing (SvREFCNT_dec
1393             * dropping last ref), it would NULL ac->data, causing disconnect_cb to
1394             * skip cleanup, leaving ac_saved dangling after ac is freed. */
1395 0 0         if (self->ac->c.flags & REDIS_IN_CALLBACK) {
1396 0           self->ac_saved = self->ac;
1397             }
1398             /* Protect against Safefree(self) if disconnect_cb fires synchronously
1399             * and user's on_disconnect handler drops the last Perl reference. */
1400 0           self->callback_depth++;
1401 0           redisAsyncDisconnect(self->ac);
1402 0           self->ac = NULL;
1403 0           self->callback_depth--;
1404 0 0         if (check_destroyed(self)) return;
1405             }
1406              
1407             int
1408             is_connected(EV::Redis self);
1409             CODE:
1410             {
1411 0 0         RETVAL = (NULL != self->ac) ? 1 : 0;
1412             }
1413             OUTPUT:
1414             RETVAL
1415              
1416             SV*
1417             connect_timeout(EV::Redis self, SV* timeout_ms = NULL);
1418             CODE:
1419             {
1420 0           RETVAL = timeout_accessor(&self->connect_timeout, timeout_ms, "connect_timeout");
1421             }
1422             OUTPUT:
1423             RETVAL
1424              
1425             SV*
1426             command_timeout(EV::Redis self, SV* timeout_ms = NULL);
1427             CODE:
1428             {
1429 0           RETVAL = timeout_accessor(&self->command_timeout, timeout_ms, "command_timeout");
1430             /* Apply to active connection immediately */
1431 0 0         if (NULL != timeout_ms && SvOK(timeout_ms) && NULL != self->ac && NULL != self->command_timeout) {
    0          
    0          
    0          
1432 0           redisAsyncSetTimeout(self->ac, *self->command_timeout);
1433             }
1434             }
1435             OUTPUT:
1436             RETVAL
1437              
1438             SV*
1439             on_error(EV::Redis self, SV* handler = NULL);
1440             CODE:
1441             {
1442 6           RETVAL = handler_accessor(&self->error_handler, handler, items > 1);
1443             }
1444             OUTPUT:
1445             RETVAL
1446              
1447             SV*
1448             on_connect(EV::Redis self, SV* handler = NULL);
1449             CODE:
1450             {
1451 0           RETVAL = handler_accessor(&self->connect_handler, handler, items > 1);
1452             }
1453             OUTPUT:
1454             RETVAL
1455              
1456             SV*
1457             on_disconnect(EV::Redis self, SV* handler = NULL);
1458             CODE:
1459             {
1460 0           RETVAL = handler_accessor(&self->disconnect_handler, handler, items > 1);
1461             }
1462             OUTPUT:
1463             RETVAL
1464              
1465             SV*
1466             on_push(EV::Redis self, SV* handler = NULL);
1467             CODE:
1468             {
1469 0           RETVAL = handler_accessor(&self->push_handler, handler, items > 1);
1470             /* Sync push callback with hiredis if connected */
1471 0 0         if (NULL != self->ac) {
1472 0 0         if (NULL != self->push_handler) {
1473 0           redisAsyncSetPushCallback(self->ac, EV__redis_push_cb);
1474             } else {
1475 0           redisAsyncSetPushCallback(self->ac, NULL);
1476             }
1477             }
1478             }
1479             OUTPUT:
1480             RETVAL
1481              
1482             int
1483             command(EV::Redis self, ...);
1484             PREINIT:
1485             SV* cb;
1486             char** argv;
1487             size_t* argvlen;
1488             STRLEN len;
1489             int argc, i, persist;
1490             ev_redis_cb_t* cbt;
1491             ev_redis_wait_t* wt;
1492             char* p;
1493             CODE:
1494             {
1495 1 50         if (items < 2) {
1496 0           croak("Usage: command(\"command\", ..., [$callback])");
1497             }
1498              
1499 1           cb = ST(items - 1);
1500 1 50         if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
    50          
1501 1           argc = items - 2; /* last arg is callback */
1502             }
1503             else {
1504 0           cb = NULL; /* fire-and-forget: no callback */
1505 0           argc = items - 1;
1506             }
1507              
1508 1 50         if (argc < 1) {
1509 0           croak("Usage: command(\"command\", ..., [$callback])");
1510             }
1511              
1512 1 50         if (NULL == self->ac) {
1513 1 50         if (!self->reconnect_timer_active) {
1514 1           croak("connection required before calling command");
1515             }
1516             /* Reconnect in progress — fall through to queue in wait_queue */
1517             }
1518 0           Newx(argv, argc, char*);
1519 0           SAVEFREEPV(argv);
1520 0           Newx(argvlen, argc, size_t);
1521 0           SAVEFREEPV(argvlen);
1522              
1523 0 0         for (i = 0; i < argc; i++) {
1524 0           argv[i] = SvPV(ST(i + 1), len);
1525 0           argvlen[i] = len;
1526             }
1527              
1528 0           persist = is_persistent_command(argv[0]);
1529              
1530 0 0         if (NULL == self->ac ||
1531 0 0         (self->max_pending > 0 && self->pending_count >= self->max_pending)) {
    0          
1532 0           Newx(wt, 1, ev_redis_wait_t);
1533 0           Newx(wt->argv, argc, char*);
1534 0           Newx(wt->argvlen, argc, size_t);
1535 0 0         for (i = 0; i < argc; i++) {
1536 0           Newx(p, argvlen[i] + 1, char);
1537 0           Copy(argv[i], p, argvlen[i], char);
1538 0           p[argvlen[i]] = '\0';
1539 0           wt->argv[i] = p;
1540 0           wt->argvlen[i] = argvlen[i];
1541             }
1542 0           wt->argc = argc;
1543 0           wt->cb = SvREFCNT_inc(cb);
1544 0           wt->persist = persist;
1545 0           wt->queued_at = ev_now(self->loop);
1546 0           ngx_queue_init(&wt->queue);
1547 0           ngx_queue_insert_tail(&self->wait_queue, &wt->queue);
1548 0           self->waiting_count++;
1549 0           schedule_waiting_timer(self);
1550 0           RETVAL = REDIS_OK;
1551             }
1552             else {
1553 0           Newx(cbt, 1, ev_redis_cb_t);
1554 0           cbt->cb = SvREFCNT_inc(cb);
1555 0           cbt->skipped = 0;
1556 0           cbt->persist = persist;
1557 0 0         cbt->sub_count = persist ? argc - 1 : 0;
1558 0           ngx_queue_init(&cbt->queue);
1559 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
1560 0 0         if (!persist) self->pending_count++;
1561              
1562 0           RETVAL = submit_to_redis(self, cbt,
1563             argc, (const char**)argv, argvlen);
1564             }
1565             }
1566             OUTPUT:
1567             RETVAL
1568              
1569             void
1570             reconnect(EV::Redis self, int enable, int delay_ms = 1000, int max_attempts = 0);
1571             CODE:
1572             {
1573 0           validate_timeout_ms(delay_ms, "reconnect_delay");
1574 0           self->reconnect = enable ? 1 : 0;
1575 0           self->reconnect_delay_ms = delay_ms;
1576 0           self->max_reconnect_attempts = max_attempts >= 0 ? max_attempts : 0;
1577 0           self->reconnect_attempts = 0;
1578              
1579 0 0         if (!enable) {
1580 0           stop_reconnect_timer(self);
1581             }
1582             }
1583              
1584             int
1585             reconnect_enabled(EV::Redis self);
1586             CODE:
1587             {
1588 0 0         RETVAL = self->reconnect;
1589             }
1590             OUTPUT:
1591             RETVAL
1592              
1593             int
1594             pending_count(EV::Redis self);
1595             CODE:
1596             {
1597 0 0         RETVAL = self->pending_count;
1598             }
1599             OUTPUT:
1600             RETVAL
1601              
1602             int
1603             waiting_count(EV::Redis self);
1604             CODE:
1605             {
1606 0 0         RETVAL = self->waiting_count;
1607             }
1608             OUTPUT:
1609             RETVAL
1610              
1611             int
1612             max_pending(EV::Redis self, SV* limit = NULL);
1613             CODE:
1614             {
1615 0 0         if (NULL != limit && SvOK(limit)) {
    0          
1616 0           int val = SvIV(limit);
1617 0 0         if (val < 0) {
1618 0           croak("max_pending must be non-negative");
1619             }
1620 0           self->max_pending = val;
1621              
1622             /* When limit is increased or removed, send waiting commands.
1623             * callback_depth protects against DESTROY if a failed command's
1624             * error callback drops the last Perl reference to self. */
1625 0           self->callback_depth++;
1626 0           send_next_waiting(self);
1627 0           self->callback_depth--;
1628 0 0         if (check_destroyed(self)) XSRETURN_IV(0);
1629             }
1630 0 0         RETVAL = self->max_pending;
1631             }
1632             OUTPUT:
1633             RETVAL
1634              
1635             SV*
1636             waiting_timeout(EV::Redis self, SV* timeout_ms = NULL);
1637             CODE:
1638             {
1639 0 0         if (NULL != timeout_ms && SvOK(timeout_ms)) {
    0          
1640 0           IV ms = SvIV(timeout_ms);
1641 0           validate_timeout_ms(ms, "waiting_timeout");
1642 0           self->waiting_timeout_ms = (int)ms;
1643 0           schedule_waiting_timer(self);
1644             }
1645              
1646 0           RETVAL = newSViv((IV)self->waiting_timeout_ms);
1647             }
1648             OUTPUT:
1649             RETVAL
1650              
1651             int
1652             resume_waiting_on_reconnect(EV::Redis self, SV* value = NULL);
1653             CODE:
1654             {
1655 0 0         if (NULL != value && SvOK(value)) {
    0          
1656 0           self->resume_waiting_on_reconnect = SvTRUE(value) ? 1 : 0;
1657             }
1658 0 0         RETVAL = self->resume_waiting_on_reconnect;
1659             }
1660             OUTPUT:
1661             RETVAL
1662              
1663             int
1664             priority(EV::Redis self, SV* value = NULL);
1665             CODE:
1666             {
1667 0 0         if (NULL != value && SvOK(value)) {
    0          
1668 0           int prio = SvIV(value);
1669 0 0         if (prio < EV_MINPRI) prio = EV_MINPRI;
1670 0 0         if (prio > EV_MAXPRI) prio = EV_MAXPRI;
1671 0           self->priority = prio;
1672 0 0         if (NULL != self->ac) {
1673 0           redisLibevSetPriority(self->ac, prio);
1674             }
1675             }
1676 0 0         RETVAL = self->priority;
1677             }
1678             OUTPUT:
1679             RETVAL
1680              
1681             int
1682             keepalive(EV::Redis self, SV* value = NULL);
1683             CODE:
1684             {
1685 0 0         if (NULL != value && SvOK(value)) {
    0          
1686 0           int interval = SvIV(value);
1687 0 0         if (interval < 0) croak("keepalive interval must be non-negative");
1688 0 0         if (interval > MAX_TIMEOUT_MS / 1000) croak("keepalive interval too large");
1689 0           self->keepalive = interval;
1690 0 0         if (NULL != self->ac && interval > 0) {
    0          
1691 0           redisEnableKeepAliveWithInterval(&self->ac->c, interval);
1692             }
1693             }
1694 0 0         RETVAL = self->keepalive;
1695             }
1696             OUTPUT:
1697             RETVAL
1698              
1699             int
1700             prefer_ipv4(EV::Redis self, SV* value = NULL);
1701             CODE:
1702             {
1703 0 0         if (NULL != value && SvOK(value)) {
    0          
1704 0           self->prefer_ipv4 = SvTRUE(value) ? 1 : 0;
1705 0 0         if (self->prefer_ipv4) self->prefer_ipv6 = 0;
1706             }
1707 0 0         RETVAL = self->prefer_ipv4;
1708             }
1709             OUTPUT:
1710             RETVAL
1711              
1712             int
1713             prefer_ipv6(EV::Redis self, SV* value = NULL);
1714             CODE:
1715             {
1716 0 0         if (NULL != value && SvOK(value)) {
    0          
1717 0           self->prefer_ipv6 = SvTRUE(value) ? 1 : 0;
1718 0 0         if (self->prefer_ipv6) self->prefer_ipv4 = 0;
1719             }
1720 0 0         RETVAL = self->prefer_ipv6;
1721             }
1722             OUTPUT:
1723             RETVAL
1724              
1725             SV*
1726             source_addr(EV::Redis self, SV* value = NULL);
1727             CODE:
1728             {
1729 0 0         if (items > 1) {
1730 0 0         if (NULL != self->source_addr) {
1731 0           Safefree(self->source_addr);
1732 0           self->source_addr = NULL;
1733             }
1734 0 0         if (NULL != value && SvOK(value)) {
    0          
1735 0           self->source_addr = savepv(SvPV_nolen(value));
1736             }
1737             }
1738 0 0         if (NULL != self->source_addr) {
1739 0           RETVAL = newSVpv(self->source_addr, 0);
1740             } else {
1741 0           RETVAL = &PL_sv_undef;
1742             }
1743             }
1744             OUTPUT:
1745             RETVAL
1746              
1747             unsigned int
1748             tcp_user_timeout(EV::Redis self, SV* value = NULL);
1749             CODE:
1750             {
1751 0 0         if (NULL != value && SvOK(value)) {
    0          
1752 0           IV ms = SvIV(value);
1753 0           validate_timeout_ms(ms, "tcp_user_timeout");
1754 0           self->tcp_user_timeout = (unsigned int)ms;
1755             }
1756 0 0         RETVAL = self->tcp_user_timeout;
1757             }
1758             OUTPUT:
1759             RETVAL
1760              
1761             int
1762             cloexec(EV::Redis self, SV* value = NULL);
1763             CODE:
1764             {
1765 0 0         if (NULL != value && SvOK(value)) {
    0          
1766 0           self->cloexec = SvTRUE(value) ? 1 : 0;
1767             }
1768 0 0         RETVAL = self->cloexec;
1769             }
1770             OUTPUT:
1771             RETVAL
1772              
1773             int
1774             reuseaddr(EV::Redis self, SV* value = NULL);
1775             CODE:
1776             {
1777 0 0         if (NULL != value && SvOK(value)) {
    0          
1778 0           self->reuseaddr = SvTRUE(value) ? 1 : 0;
1779             }
1780 0 0         RETVAL = self->reuseaddr;
1781             }
1782             OUTPUT:
1783             RETVAL
1784              
1785             void
1786             skip_waiting(EV::Redis self);
1787             CODE:
1788             {
1789             /* Protect self from destruction during queue iteration */
1790 0           self->callback_depth++;
1791              
1792             /* If cleanup is already in progress (e.g., during expire_waiting_commands
1793             * or disconnect callback), don't modify the wait_queue. */
1794 0 0         if (self->in_wait_cleanup) {
1795 0           self->callback_depth--;
1796 0           check_destroyed(self);
1797 0           return;
1798             }
1799              
1800 0           clear_wait_queue_sv(self, err_skipped);
1801 0           stop_waiting_timer(self);
1802              
1803 0           self->callback_depth--;
1804 0           check_destroyed(self);
1805             }
1806              
1807             void
1808             skip_pending(EV::Redis self);
1809             CODE:
1810             {
1811             ngx_queue_t local_queue;
1812             ngx_queue_t* q;
1813             ev_redis_cb_t* cbt;
1814              
1815             /* Protect self from destruction during queue iteration */
1816 0           self->callback_depth++;
1817              
1818             /* Always attempt to clear waiting queue (handles its own re-entrancy) */
1819 0           clear_wait_queue_sv(self, err_skipped);
1820 0           stop_waiting_timer(self);
1821              
1822             /* If cb_queue cleanup is already in progress, stop here. */
1823 0 0         if (self->in_cb_cleanup) {
1824 0           self->callback_depth--;
1825 0           check_destroyed(self);
1826 0           return;
1827             }
1828              
1829             /* Protect cb_queue iteration from re-entrancy. If a user callback
1830             * calls skip_pending() again, the in_cb_cleanup check above will return. */
1831 0           self->in_cb_cleanup = 1;
1832              
1833 0           ngx_queue_init(&local_queue);
1834 0 0         while (!ngx_queue_empty(&self->cb_queue)) {
1835 0           q = ngx_queue_head(&self->cb_queue);
1836 0           cbt = ngx_queue_data(q, ev_redis_cb_t, queue);
1837              
1838 0 0         if (cbt == self->current_cb) {
1839             /* If current_cb is at head — if it's the only item, we're done */
1840 0 0         if (ngx_queue_next(q) == ngx_queue_sentinel(&self->cb_queue)) {
1841 0           break;
1842             }
1843 0           q = ngx_queue_next(q);
1844 0           cbt = ngx_queue_data(q, ev_redis_cb_t, queue);
1845             }
1846              
1847 0           ngx_queue_remove(q);
1848 0           ngx_queue_insert_tail(&local_queue, q);
1849             }
1850              
1851 0 0         while (!ngx_queue_empty(&local_queue)) {
1852 0 0         if (self->magic == EV_REDIS_FREED) {
1853 0           break;
1854             }
1855              
1856 0           q = ngx_queue_head(&local_queue);
1857 0           cbt = ngx_queue_data(q, ev_redis_cb_t, queue);
1858 0           ngx_queue_remove(q);
1859              
1860             /* Mark as skipped FIRST to prevent double callback invocation if
1861             * invoke_callback_error re-enters the event loop. */
1862 0           cbt->skipped = 1;
1863              
1864             /* Re-initialize queue node so any subsequent remove (from reply_cb's
1865             * skipped path on re-entry) is safe. */
1866 0           ngx_queue_init(q);
1867 0 0         if (!cbt->persist) self->pending_count--;
1868              
1869             /* Save and clear callback BEFORE invoking — if the user callback
1870             * re-enters and a Redis reply arrives, reply_cb sees skipped=1
1871             * and frees cbt. Clearing cb first avoids use-after-free. */
1872 0 0         if (NULL != cbt->cb) {
1873 0           SV* cb_to_invoke = cbt->cb;
1874 0           cbt->cb = NULL;
1875              
1876 0           invoke_callback_error(cb_to_invoke, err_skipped);
1877 0           SvREFCNT_dec(cb_to_invoke);
1878             }
1879             }
1880              
1881 0           self->in_cb_cleanup = 0;
1882              
1883 0           self->callback_depth--;
1884 0           check_destroyed(self);
1885             }
1886              
1887             int
1888             has_ssl(char* class);
1889             CODE:
1890             {
1891             PERL_UNUSED_VAR(class);
1892             #ifdef EV_REDIS_SSL
1893 4 50         RETVAL = 1;
1894             #else
1895             RETVAL = 0;
1896             #endif
1897             }
1898             OUTPUT:
1899             RETVAL
1900              
1901             #ifdef EV_REDIS_SSL
1902              
1903             void
1904             _setup_ssl_context(EV::Redis self, SV* cacert, SV* capath, SV* cert, SV* key, SV* server_name, int verify = 1);
1905             CODE:
1906             {
1907 4           redisSSLContextError ssl_error = REDIS_SSL_CTX_NONE;
1908             redisSSLOptions ssl_opts;
1909              
1910 4           memset(&ssl_opts, 0, sizeof(ssl_opts));
1911 4 100         ssl_opts.cacert_filename = (SvOK(cacert)) ? SvPV_nolen(cacert) : NULL;
1912 4 100         ssl_opts.capath = (SvOK(capath)) ? SvPV_nolen(capath) : NULL;
1913 4 100         ssl_opts.cert_filename = (SvOK(cert)) ? SvPV_nolen(cert) : NULL;
1914 4 50         ssl_opts.private_key_filename = (SvOK(key)) ? SvPV_nolen(key) : NULL;
1915 4 50         ssl_opts.server_name = (SvOK(server_name)) ? SvPV_nolen(server_name) : NULL;
1916 4           ssl_opts.verify_mode = verify ? REDIS_SSL_VERIFY_PEER : REDIS_SSL_VERIFY_NONE;
1917              
1918 4 50         if (NULL != self->ssl_ctx) {
1919 0           redisFreeSSLContext(self->ssl_ctx);
1920 0           self->ssl_ctx = NULL;
1921             }
1922              
1923 4           self->ssl_ctx = redisCreateSSLContextWithOptions(&ssl_opts, &ssl_error);
1924              
1925 4 100         if (NULL == self->ssl_ctx) {
1926 2           croak("SSL context creation failed: %s", redisSSLContextGetError(ssl_error));
1927             }
1928             }
1929              
1930             #endif