File Coverage

Redis.xs
Criterion Covered Total %
statement 91 885 10.2
branch 54 724 7.4
condition n/a
subroutine n/a
pod n/a
total 145 1609 9.0


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include "ppport.h"
5              
6             #include "EVAPI.h"
7              
8             #include "hiredis.h"
9             #include "async.h"
10             #include "libev_adapter.h"
11             #include "ngx-queue.h"
12              
13             #ifdef EV_REDIS_SSL
14             #include "hiredis_ssl.h"
15             #endif
16              
17             typedef struct ev_redis_s ev_redis_t;
18             typedef struct ev_redis_cb_s ev_redis_cb_t;
19             typedef struct ev_redis_wait_s ev_redis_wait_t;
20              
21             typedef ev_redis_t* EV__Redis;
22             typedef struct ev_loop* EV__Loop;
23              
24             #define EV_REDIS_MAGIC 0xDEADBEEF
25             #define EV_REDIS_FREED 0xFEEDFACE
26              
27             #define CLEAR_HANDLER(field) \
28             do { if (NULL != (field)) { SvREFCNT_dec(field); (field) = NULL; } } while(0)
29              
30             struct ev_redis_s {
31             unsigned int magic; /* Set to EV_REDIS_MAGIC when alive */
32             struct ev_loop* loop;
33             redisAsyncContext* ac;
34             SV* error_handler;
35             SV* connect_handler;
36             SV* disconnect_handler;
37             SV* push_handler;
38             struct timeval* connect_timeout;
39             struct timeval* command_timeout;
40             ngx_queue_t cb_queue;
41             ngx_queue_t wait_queue;
42             int pending_count;
43             int waiting_count;
44             int max_pending; /* 0 = unlimited */
45             ev_redis_cb_t* current_cb; /* callback currently executing */
46             int resume_waiting_on_reconnect; /* keep waiting queue on disconnect */
47             int waiting_timeout_ms; /* max ms in waiting queue, 0 = unlimited */
48             ev_timer waiting_timer;
49             int waiting_timer_active;
50              
51             /* Reconnect settings */
52             char* host;
53             int port;
54             char* path;
55             int reconnect; /* 0 = disabled, 1 = enabled */
56             int reconnect_delay_ms; /* delay between reconnect attempts */
57             int max_reconnect_attempts; /* 0 = unlimited */
58             int reconnect_attempts; /* current attempt count */
59             ev_timer reconnect_timer;
60             int reconnect_timer_active;
61             int intentional_disconnect; /* set before explicit disconnect() */
62             int priority; /* libev watcher priority, default 0 */
63             int in_cb_cleanup; /* prevent re-entrant cb_queue modification */
64             int in_wait_cleanup; /* prevent re-entrant wait_queue modification */
65             int callback_depth; /* nesting depth of C-level callbacks invoking Perl code */
66             int keepalive; /* TCP keepalive interval in seconds, 0 = disabled */
67             int prefer_ipv4; /* prefer IPv4 DNS resolution */
68             int prefer_ipv6; /* prefer IPv6 DNS resolution */
69             char* source_addr; /* local address to bind to */
70             unsigned int tcp_user_timeout; /* TCP_USER_TIMEOUT in ms, 0 = OS default */
71             int cloexec; /* set SOCK_CLOEXEC on socket */
72             int reuseaddr; /* set SO_REUSEADDR on socket */
73             redisAsyncContext* ac_saved; /* saved ac pointer for deferred disconnect cleanup */
74             #ifdef EV_REDIS_SSL
75             redisSSLContext* ssl_ctx;
76             #endif
77             };
78              
79             struct ev_redis_cb_s {
80             SV* cb;
81             ngx_queue_t queue;
82             int persist;
83             int skipped;
84             int sub_count; /* subscription channels remaining (for persistent commands) */
85             };
86              
87             struct ev_redis_wait_s {
88             char** argv;
89             size_t* argvlen;
90             int argc;
91             SV* cb;
92             int persist;
93             ngx_queue_t queue;
94             ev_tstamp queued_at;
95             };
96              
97             /* Shared error strings (initialized in BOOT) */
98             static SV* err_skipped = NULL;
99             static SV* err_waiting_timeout = NULL;
100             static SV* err_disconnected = NULL;
101              
102             /* Check for unsubscribe-family commands. These are persistent (stay in cb_queue)
103             * but hiredis ignores their callbacks — replies go through the subscribe callback. */
104 0           static int is_unsubscribe_command(const char* cmd) {
105 0           char c = cmd[0];
106 0 0         if (c == 'u' || c == 'U') return (0 == strcasecmp(cmd, "unsubscribe"));
    0          
107 0 0         if (c == 'p' || c == 'P') return (0 == strcasecmp(cmd, "punsubscribe"));
    0          
108 0 0         if (c == 's' || c == 'S') return (0 == strcasecmp(cmd, "sunsubscribe"));
    0          
109 0           return 0;
110             }
111              
112 0           static int is_persistent_command(const char* cmd) {
113 0           char c = cmd[0];
114              
115 0 0         if (c == 's' || c == 'S') {
    0          
116 0 0         if (0 == strcasecmp(cmd, "subscribe")) return 1;
117 0 0         if (0 == strcasecmp(cmd, "ssubscribe")) return 1;
118 0 0         if (0 == strcasecmp(cmd, "sunsubscribe")) return 1;
119 0           return 0;
120             }
121 0 0         if (c == 'u' || c == 'U') {
    0          
122 0           return (0 == strcasecmp(cmd, "unsubscribe"));
123             }
124 0 0         if (c == 'p' || c == 'P') {
    0          
125 0 0         if (0 == strcasecmp(cmd, "psubscribe")) return 1;
126 0 0         if (0 == strcasecmp(cmd, "punsubscribe")) return 1;
127 0           return 0;
128             }
129 0 0         if (c == 'm' || c == 'M') {
    0          
130 0           return (0 == strcasecmp(cmd, "monitor"));
131             }
132              
133 0           return 0;
134             }
135              
136             /* Detect unsubscribe-type replies that indicate end of a subscription channel.
137             * Format: [type_string, channel, remaining_count]
138             * Returns 1 if the reply is an unsubscribe/punsubscribe/sunsubscribe message. */
139 0           static int is_unsub_reply(redisReply* reply) {
140             const char* s;
141              
142 0 0         if (reply->type != REDIS_REPLY_ARRAY && reply->type != REDIS_REPLY_PUSH) return 0;
    0          
143 0 0         if (reply->elements < 3) return 0;
144 0 0         if (NULL == reply->element[0]) return 0;
145 0 0         if (reply->element[0]->type != REDIS_REPLY_STRING &&
146 0 0         reply->element[0]->type != REDIS_REPLY_STATUS) return 0;
147              
148 0           s = reply->element[0]->str;
149 0 0         if (s[0] == 'u' || s[0] == 'U') return (0 == strcasecmp(s, "unsubscribe"));
    0          
150 0 0         if (s[0] == 'p' || s[0] == 'P') return (0 == strcasecmp(s, "punsubscribe"));
    0          
151 0 0         if (s[0] == 's' || s[0] == 'S') return (0 == strcasecmp(s, "sunsubscribe"));
    0          
152 0           return 0;
153             }
154              
155 0           static void emit_error(EV__Redis self, SV* error) {
156 0 0         if (NULL == self->error_handler) return;
157              
158 0           dSP;
159              
160 0           ENTER;
161 0           SAVETMPS;
162              
163 0 0         PUSHMARK(SP);
164 0 0         XPUSHs(error);
165 0           PUTBACK;
166              
167 0           call_sv(self->error_handler, G_DISCARD | G_EVAL);
168 0 0         if (SvTRUE(ERRSV)) {
    0          
169 0 0         warn("EV::Redis: exception in error handler: %s", SvPV_nolen(ERRSV));
170             }
171              
172 0 0         FREETMPS;
173 0           LEAVE;
174             }
175              
176 0           static void emit_error_str(EV__Redis self, const char* error) {
177 0 0         if (NULL == self->error_handler) return;
178 0           emit_error(self, sv_2mortal(newSVpv(error, 0)));
179             }
180              
181 0           static void invoke_callback_error(SV* cb, SV* error_sv) {
182 0           dSP;
183 0           ENTER;
184 0           SAVETMPS;
185 0 0         PUSHMARK(SP);
186 0 0         EXTEND(SP, 2);
187 0           PUSHs(&PL_sv_undef);
188 0           PUSHs(error_sv);
189 0           PUTBACK;
190 0           call_sv(cb, G_DISCARD | G_EVAL);
191 0 0         if (SvTRUE(ERRSV)) {
    0          
192 0 0         warn("EV::Redis: exception in command callback: %s", SvPV_nolen(ERRSV));
193             }
194 0 0         FREETMPS;
195 0           LEAVE;
196 0           }
197              
198             /* Check if DESTROY was called during a callback and deferred Safefree.
199             * Call after decrementing callback_depth. Returns 1 if self was freed
200             * (caller MUST NOT access self afterward). */
201 0           static int check_destroyed(EV__Redis self) {
202 0 0         if (self->magic == EV_REDIS_FREED &&
203 0 0         self->callback_depth == 0 &&
204 0 0         self->current_cb == NULL) {
205 0           Safefree(self);
206 0           return 1;
207             }
208 0           return 0;
209             }
210              
211             /* Free C-allocated fields (used by both PL_dirty and normal DESTROY paths) */
212 6           static void free_c_fields(EV__Redis self) {
213 6 50         if (NULL != self->host) { Safefree(self->host); self->host = NULL; }
214 6 50         if (NULL != self->path) { Safefree(self->path); self->path = NULL; }
215 6 50         if (NULL != self->source_addr) { Safefree(self->source_addr); self->source_addr = NULL; }
216 6 50         if (NULL != self->connect_timeout) { Safefree(self->connect_timeout); self->connect_timeout = NULL; }
217 6 50         if (NULL != self->command_timeout) { Safefree(self->command_timeout); self->command_timeout = NULL; }
218             #ifdef EV_REDIS_SSL
219 6 100         if (NULL != self->ssl_ctx) { redisFreeSSLContext(self->ssl_ctx); self->ssl_ctx = NULL; }
220             #endif
221 6           }
222              
223 6           static void stop_waiting_timer(EV__Redis self) {
224 6 50         if (self->waiting_timer_active && NULL != self->loop && !PL_dirty) {
    0          
    0          
225 0           ev_timer_stop(self->loop, &self->waiting_timer);
226 0           self->waiting_timer_active = 0;
227             }
228 6           }
229              
230 6           static void stop_reconnect_timer(EV__Redis self) {
231 6 50         if (self->reconnect_timer_active && NULL != self->loop && !PL_dirty) {
    0          
    0          
232 0           ev_timer_stop(self->loop, &self->reconnect_timer);
233 0           self->reconnect_timer_active = 0;
234             }
235 6           }
236              
237             /* Maximum timeout: ~24 days (fits safely in 32-bit calculations) */
238             #define MAX_TIMEOUT_MS 2000000000
239              
240 0           static void validate_timeout_ms(IV ms, const char* name) {
241 0 0         if (ms < 0) croak("%s must be non-negative", name);
242 0 0         if (ms > MAX_TIMEOUT_MS) croak("%s too large (max %d ms)", name, MAX_TIMEOUT_MS);
243 0           }
244              
245 0           static SV* timeout_accessor(struct timeval** tv_ptr, SV* timeout_ms, const char* name) {
246 0 0         if (NULL != timeout_ms && SvOK(timeout_ms)) {
    0          
247 0           IV ms = SvIV(timeout_ms);
248 0           validate_timeout_ms(ms, name);
249 0 0         if (NULL == *tv_ptr) {
250 0           Newx(*tv_ptr, 1, struct timeval);
251             }
252 0           (*tv_ptr)->tv_sec = (long)(ms / 1000);
253 0           (*tv_ptr)->tv_usec = (long)((ms % 1000) * 1000);
254             }
255              
256 0 0         if (NULL != *tv_ptr) {
257 0           return newSViv((IV)(*tv_ptr)->tv_sec * 1000 + (*tv_ptr)->tv_usec / 1000);
258             }
259 0           return &PL_sv_undef;
260             }
261              
262             /* Helper to set/clear a callback handler field.
263             * If called without handler (items == 1), clears the handler.
264             * If called with handler, sets it (or clears if handler is undef/not CODE).
265             * Returns the current handler (with refcount incremented) or undef. */
266 6           static SV* handler_accessor(SV** handler_ptr, SV* handler, int has_handler_arg) {
267             /* Clear existing handler first - both no-arg calls and set calls clear first */
268 6 50         if (NULL != *handler_ptr) {
269 0           SvREFCNT_dec(*handler_ptr);
270 0           *handler_ptr = NULL;
271             }
272              
273             /* If a handler argument was provided and it's a valid CODE ref, set it */
274 6 50         if (has_handler_arg && NULL != handler && SvOK(handler) && SvROK(handler) &&
    50          
    50          
    50          
275 6 50         SvTYPE(SvRV(handler)) == SVt_PVCV) {
276 6           *handler_ptr = SvREFCNT_inc(handler);
277             }
278              
279 6           return (NULL != *handler_ptr)
280 6           ? SvREFCNT_inc(*handler_ptr)
281 12 50         : &PL_sv_undef;
282             }
283              
284             /* Uses in_cb_cleanup flag to prevent re-entrant queue modification from
285             * user callbacks (e.g., if callback calls skip_pending). */
286 6           static void remove_cb_queue_sv(EV__Redis self, SV* error_sv) {
287             ngx_queue_t* q;
288             ev_redis_cb_t* cbt;
289              
290 6 50         if (self->in_cb_cleanup) {
291 0           return;
292             }
293              
294 6           self->in_cb_cleanup = 1;
295              
296             /* Use while loop with re-fetch of head each iteration.
297             * This is safe against re-entrant modifications because we
298             * re-check the queue state after each callback invocation. */
299 6 50         while (!ngx_queue_empty(&self->cb_queue)) {
300 0           q = ngx_queue_head(&self->cb_queue);
301 0           cbt = ngx_queue_data(q, ev_redis_cb_t, queue);
302              
303 0 0         if (cbt == self->current_cb) {
304             /* Skip current_cb - check if it's the only item left */
305 0 0         if (ngx_queue_next(q) == ngx_queue_sentinel(&self->cb_queue)) {
306 0           break; /* Only current_cb remains, we're done */
307             }
308             /* Move to next item */
309 0           q = ngx_queue_next(q);
310 0           cbt = ngx_queue_data(q, ev_redis_cb_t, queue);
311             }
312              
313 0           ngx_queue_remove(q);
314 0 0         if (!cbt->persist) self->pending_count--;
315              
316 0 0         if (NULL != cbt->cb) {
317 0 0         if (NULL != error_sv) {
318 0           invoke_callback_error(cbt->cb, error_sv);
319             }
320 0           SvREFCNT_dec(cbt->cb);
321             }
322 0           Safefree(cbt);
323             }
324              
325 6           self->in_cb_cleanup = 0;
326             }
327              
328 0           static void free_wait_entry(ev_redis_wait_t* wt) {
329             int i;
330 0 0         for (i = 0; i < wt->argc; i++) {
331 0           Safefree(wt->argv[i]);
332             }
333 0           Safefree(wt->argv);
334 0           Safefree(wt->argvlen);
335 0 0         if (NULL != wt->cb) {
336 0           SvREFCNT_dec(wt->cb);
337             }
338 0           Safefree(wt);
339 0           }
340              
341             /* Uses in_wait_cleanup flag to prevent re-entrant queue modification. */
342 6           static void clear_wait_queue_sv(EV__Redis self, SV* error_sv) {
343             ngx_queue_t* q;
344             ev_redis_wait_t* wt;
345              
346 6 50         if (self->in_wait_cleanup) {
347 0           return;
348             }
349              
350             /* Protect against re-entrancy: if a callback invokes skip_waiting() or
351             * skip_pending(), they should no-op since we're already clearing. */
352 6           self->in_wait_cleanup = 1;
353              
354 6 50         while (!ngx_queue_empty(&self->wait_queue)) {
355 0           q = ngx_queue_head(&self->wait_queue);
356 0           wt = ngx_queue_data(q, ev_redis_wait_t, queue);
357 0           ngx_queue_remove(q);
358 0           self->waiting_count--;
359              
360 0 0         if (NULL != error_sv && NULL != wt->cb) {
    0          
361 0           invoke_callback_error(wt->cb, error_sv);
362             }
363              
364 0           free_wait_entry(wt);
365             }
366              
367 6           self->in_wait_cleanup = 0;
368             }
369              
370             /* Forward declarations */
371             static void pre_connect_common(EV__Redis self, redisOptions* opts);
372             static int post_connect_setup(EV__Redis self, const char* err_prefix);
373             static void do_reconnect(EV__Redis self);
374             static void send_next_waiting(EV__Redis self);
375             static void schedule_waiting_timer(EV__Redis self);
376             static void expire_waiting_commands(EV__Redis self);
377             static void schedule_reconnect(EV__Redis self);
378             static void EV__redis_connect_cb(redisAsyncContext* c, int status);
379             static void EV__redis_disconnect_cb(redisAsyncContext* c, int status);
380             static void EV__redis_push_cb(redisAsyncContext* ac, void* reply_ptr);
381             static SV* EV__redis_decode_reply(redisReply* reply);
382              
383 0           static void clear_connection_params(EV__Redis self) {
384 0 0         if (NULL != self->host) {
385 0           Safefree(self->host);
386 0           self->host = NULL;
387             }
388 0 0         if (NULL != self->path) {
389 0           Safefree(self->path);
390 0           self->path = NULL;
391             }
392 0           }
393              
394 0           static void reconnect_timer_cb(EV_P_ ev_timer* w, int revents) {
395 0           EV__Redis self = (EV__Redis)w->data;
396              
397             (void)loop;
398             (void)revents;
399              
400 0 0         if (NULL == self || self->magic != EV_REDIS_MAGIC) return;
    0          
401              
402 0           self->reconnect_timer_active = 0;
403 0           self->callback_depth++;
404 0           do_reconnect(self);
405 0           self->callback_depth--;
406 0 0         if (check_destroyed(self)) return;
407             }
408              
409 0           static void schedule_reconnect(EV__Redis self) {
410             ev_tstamp delay;
411              
412 0 0         if (!self->reconnect) return;
413 0 0         if (self->intentional_disconnect) return;
414 0 0         if (NULL == self->loop) return;
415 0           stop_reconnect_timer(self);
416 0 0         if (self->max_reconnect_attempts > 0 &&
417 0 0         self->reconnect_attempts >= self->max_reconnect_attempts) {
418             /* Clear waiting queue that was preserved for reconnect - reconnect has
419             * permanently failed, so these commands will never be sent. */
420 0           clear_wait_queue_sv(self, sv_2mortal(newSVpv("reconnect error: max attempts reached", 0)));
421 0           stop_waiting_timer(self);
422 0           emit_error_str(self, "reconnect error: max attempts reached");
423 0           return;
424             }
425              
426 0           self->reconnect_attempts++;
427 0           delay = self->reconnect_delay_ms / 1000.0;
428              
429 0           ev_timer_init(&self->reconnect_timer, reconnect_timer_cb, delay, 0);
430 0           self->reconnect_timer.data = (void*)self;
431 0           ev_timer_start(self->loop, &self->reconnect_timer);
432 0           self->reconnect_timer_active = 1;
433             }
434              
435             /* Expire waiting commands that have exceeded waiting_timeout.
436             * Uses head-refetch iteration pattern which is safe against re-entrant
437             * queue modification (e.g., if a callback calls skip_waiting). */
438 0           static void expire_waiting_commands(EV__Redis self) {
439             ngx_queue_t* q;
440             ev_redis_wait_t* wt;
441             ev_tstamp now;
442             ev_tstamp timeout;
443              
444 0           now = ev_now(self->loop);
445             /* Capture timeout at start - callbacks may modify self->waiting_timeout_ms
446             * and we need consistent behavior for the entire batch. */
447 0           timeout = self->waiting_timeout_ms / 1000.0;
448              
449             /* Use while loop with re-fetch of head each iteration.
450             * This is safe against re-entrant modifications. */
451 0 0         while (!ngx_queue_empty(&self->wait_queue)) {
452 0           q = ngx_queue_head(&self->wait_queue);
453 0           wt = ngx_queue_data(q, ev_redis_wait_t, queue);
454              
455 0 0         if (now - wt->queued_at >= timeout) {
456 0           ngx_queue_remove(q);
457 0           self->waiting_count--;
458              
459 0 0         if (NULL != wt->cb) {
460 0           invoke_callback_error(wt->cb, err_waiting_timeout);
461             }
462              
463 0           free_wait_entry(wt);
464             }
465             else {
466             /* Queue is FIFO with monotonically increasing queued_at times.
467             * If this entry hasn't expired, neither have any following entries. */
468 0           break;
469             }
470             }
471 0           }
472              
473 0           static void waiting_timer_cb(EV_P_ ev_timer* w, int revents) {
474 0           EV__Redis self = (EV__Redis)w->data;
475              
476             (void)loop;
477             (void)revents;
478              
479 0 0         if (NULL == self || self->magic != EV_REDIS_MAGIC) return;
    0          
480              
481 0           self->waiting_timer_active = 0;
482 0           self->callback_depth++;
483 0           expire_waiting_commands(self);
484 0           schedule_waiting_timer(self);
485 0           self->callback_depth--;
486 0 0         if (check_destroyed(self)) return;
487             }
488              
489 0           static void schedule_waiting_timer(EV__Redis self) {
490             ngx_queue_t* q;
491             ev_redis_wait_t* wt;
492             ev_tstamp now, expires_at, delay;
493              
494             /* Use helper which includes NULL loop check */
495 0           stop_waiting_timer(self);
496              
497 0 0         if (NULL == self->loop) return;
498 0 0         if (self->waiting_timeout_ms <= 0) return;
499 0 0         if (ngx_queue_empty(&self->wait_queue)) return;
500              
501 0           q = ngx_queue_head(&self->wait_queue);
502 0           wt = ngx_queue_data(q, ev_redis_wait_t, queue);
503              
504 0           now = ev_now(self->loop);
505 0           expires_at = wt->queued_at + self->waiting_timeout_ms / 1000.0;
506 0           delay = expires_at - now;
507 0 0         if (delay < 0) delay = 0;
508              
509 0           ev_timer_init(&self->waiting_timer, waiting_timer_cb, delay, 0);
510 0           self->waiting_timer.data = (void*)self;
511 0           ev_timer_start(self->loop, &self->waiting_timer);
512 0           self->waiting_timer_active = 1;
513             }
514              
515 0           static void do_reconnect(EV__Redis self) {
516             redisOptions opts;
517 0           memset(&opts, 0, sizeof(opts));
518              
519 0 0         if (NULL == self->loop) {
520             /* Object is being destroyed */
521 0           return;
522             }
523              
524 0 0         if (NULL != self->ac) {
525             /* Already connected or connecting */
526 0           return;
527             }
528              
529 0           self->intentional_disconnect = 0;
530 0           pre_connect_common(self, &opts);
531              
532 0 0         if (NULL != self->path) {
533 0           REDIS_OPTIONS_SET_UNIX(&opts, self->path);
534             }
535 0 0         else if (NULL != self->host) {
536 0           REDIS_OPTIONS_SET_TCP(&opts, self->host, self->port);
537             }
538             else {
539 0           emit_error_str(self, "reconnect error: no connection parameters");
540 0           return;
541             }
542              
543 0           self->ac = redisAsyncConnectWithOptions(&opts);
544 0 0         if (NULL == self->ac) {
545 0           emit_error_str(self, "reconnect error: cannot allocate memory");
546 0           schedule_reconnect(self);
547 0           return;
548             }
549              
550 0 0         if (REDIS_OK != post_connect_setup(self, "reconnect error")) {
551 0           schedule_reconnect(self);
552 0           return;
553             }
554             }
555              
556 0           static void EV__redis_connect_cb(redisAsyncContext* c, int status) {
557 0           EV__Redis self = (EV__Redis)c->data;
558              
559             /* Safety check: if self is NULL or already freed, skip callback */
560 0 0         if (self == NULL) return;
561 0 0         if (self->magic != EV_REDIS_MAGIC) return;
562              
563 0           self->callback_depth++;
564              
565 0 0         if (REDIS_OK != status) {
566 0           self->ac = NULL;
567 0 0         emit_error_str(self, c->errstr[0] ? c->errstr : "connect failed");
568 0 0         if (!self->reconnect || !self->resume_waiting_on_reconnect
    0          
569 0 0         || self->intentional_disconnect) {
570 0 0         clear_wait_queue_sv(self, sv_2mortal(newSVpv(
571             c->errstr[0] ? c->errstr : "connect failed", 0)));
572 0           stop_waiting_timer(self);
573             }
574 0           schedule_reconnect(self);
575             }
576             else {
577 0           self->reconnect_attempts = 0;
578              
579 0 0         if (NULL != self->connect_handler) {
580 0           dSP;
581              
582 0           ENTER;
583 0           SAVETMPS;
584              
585 0 0         PUSHMARK(SP);
586 0           PUTBACK;
587              
588 0           call_sv(self->connect_handler, G_DISCARD | G_EVAL);
589 0 0         if (SvTRUE(ERRSV)) {
    0          
590 0 0         warn("EV::Redis: exception in connect handler: %s", SvPV_nolen(ERRSV));
591             }
592              
593 0 0         FREETMPS;
594 0           LEAVE;
595             }
596              
597             /* Resume waiting commands if any. */
598 0           send_next_waiting(self);
599             }
600              
601 0           self->callback_depth--;
602 0           check_destroyed(self);
603             }
604              
605 0           static void EV__redis_disconnect_cb(redisAsyncContext* c, int status) {
606 0           EV__Redis self = (EV__Redis)c->data;
607             SV* error_sv;
608 0           int should_reconnect = 0;
609             int was_intentional;
610             int will_reconnect;
611              
612             /* Safety check: if self is NULL or already freed, skip callback */
613 0 0         if (self == NULL) return;
614 0 0         if (self->magic != EV_REDIS_MAGIC) return;
615              
616             /* Stale disconnect callback: user already established a new connection
617             * (e.g., called disconnect() then connect() before the old deferred
618             * disconnect fired). Old pending callbacks were already processed by
619             * reply_cb. Skip all cleanup to avoid clobbering the new connection.
620             * Clear ac_saved if it points to this old context to prevent dangling. */
621 0 0         if (self->ac != NULL && self->ac != c) {
    0          
622 0 0         if (self->ac_saved == c) self->ac_saved = NULL;
623 0           return;
624             }
625              
626 0           was_intentional = self->intentional_disconnect;
627 0           self->intentional_disconnect = 0;
628              
629 0           self->ac = NULL;
630 0           self->ac_saved = NULL; /* disconnect callback fired normally */
631 0           self->callback_depth++;
632              
633 0 0         if (REDIS_OK == status) {
634 0           error_sv = err_disconnected;
635             }
636             else {
637 0 0         error_sv = sv_2mortal(newSVpv(
638             c->errstr[0] ? c->errstr : "disconnected", 0));
639 0 0         emit_error_str(self, c->errstr[0] ? c->errstr : "disconnected");
640 0 0         if (!was_intentional) {
641 0           should_reconnect = 1;
642             }
643             }
644              
645 0 0         if (NULL != self->disconnect_handler) {
646 0           dSP;
647              
648 0           ENTER;
649 0           SAVETMPS;
650              
651 0 0         PUSHMARK(SP);
652 0           PUTBACK;
653              
654 0           call_sv(self->disconnect_handler, G_DISCARD | G_EVAL);
655 0 0         if (SvTRUE(ERRSV)) {
    0          
656 0 0         warn("EV::Redis: exception in disconnect handler: %s", SvPV_nolen(ERRSV));
657             }
658              
659 0 0         FREETMPS;
660 0           LEAVE;
661              
662             /* Re-check: user's handler might have called connect() or reconnect()
663             * establishing a new ac. If so, skip clearing cb_queue to avoid
664             * freeing new commands. ac_saved is also already handled or NULL. */
665 0 0         if (self->ac != NULL && self->ac != c) {
    0          
666 0           self->callback_depth--;
667 0           check_destroyed(self);
668 0           return;
669             }
670             }
671              
672 0           remove_cb_queue_sv(self, error_sv);
673              
674             /* Clear waiting queue unless we will actually reconnect and
675             * resume_waiting_on_reconnect is enabled. */
676 0 0         will_reconnect = should_reconnect && !self->intentional_disconnect && self->reconnect;
    0          
    0          
677 0 0         if (!self->resume_waiting_on_reconnect || was_intentional || !will_reconnect) {
    0          
    0          
678 0           clear_wait_queue_sv(self, error_sv);
679 0           stop_waiting_timer(self);
680             }
681              
682 0 0         if (will_reconnect) {
683 0           schedule_reconnect(self);
684             }
685              
686 0           self->callback_depth--;
687 0           check_destroyed(self);
688             }
689              
690 0           static void EV__redis_push_cb(redisAsyncContext* ac, void* reply_ptr) {
691 0           EV__Redis self = (EV__Redis)ac->data;
692 0           redisReply* reply = (redisReply*)reply_ptr;
693              
694 0 0         if (self == NULL) return;
695 0 0         if (self->magic != EV_REDIS_MAGIC) return;
696 0 0         if (NULL == self->push_handler) return;
697 0 0         if (NULL == reply) return;
698              
699 0           self->callback_depth++;
700              
701             {
702 0           dSP;
703              
704 0           ENTER;
705 0           SAVETMPS;
706              
707 0 0         PUSHMARK(SP);
708 0 0         XPUSHs(sv_2mortal(EV__redis_decode_reply(reply)));
709 0           PUTBACK;
710              
711 0           call_sv(self->push_handler, G_DISCARD | G_EVAL);
712 0 0         if (SvTRUE(ERRSV)) {
    0          
713 0 0         warn("EV::Redis: exception in push handler: %s", SvPV_nolen(ERRSV));
714             }
715              
716 0 0         FREETMPS;
717 0           LEAVE;
718             }
719              
720 0           self->callback_depth--;
721 0           check_destroyed(self);
722             }
723              
724 0           static void pre_connect_common(EV__Redis self, redisOptions* opts) {
725 0 0         if (NULL != self->connect_timeout) {
726 0           opts->connect_timeout = self->connect_timeout;
727             }
728 0 0         if (NULL != self->command_timeout) {
729 0           opts->command_timeout = self->command_timeout;
730             }
731 0 0         if (self->prefer_ipv4) {
732 0           opts->options |= REDIS_OPT_PREFER_IPV4;
733             }
734 0 0         else if (self->prefer_ipv6) {
735 0           opts->options |= REDIS_OPT_PREFER_IPV6;
736             }
737 0 0         if (self->cloexec) {
738 0           opts->options |= REDIS_OPT_SET_SOCK_CLOEXEC;
739             }
740 0 0         if (self->reuseaddr) {
741 0           opts->options |= REDIS_OPT_REUSEADDR;
742             }
743 0 0         if (NULL != self->source_addr && NULL == self->path) {
    0          
744 0           opts->endpoint.tcp.source_addr = self->source_addr;
745             }
746 0           }
747              
748             /* Set up a newly allocated redisAsyncContext: SSL, keepalive, libev, callbacks.
749             * On failure: frees ac, nulls self->ac, emits error with err_prefix. */
750 0           static int post_connect_setup(EV__Redis self, const char* err_prefix) {
751 0           self->ac_saved = NULL;
752 0           self->ac->data = (void*)self;
753              
754             #ifdef EV_REDIS_SSL
755 0 0         if (NULL != self->ssl_ctx) {
756 0 0         if (REDIS_OK != redisInitiateSSLWithContext(&self->ac->c, self->ssl_ctx)) {
757 0 0         SV* err = sv_2mortal(newSVpvf("%s: SSL initiation failed: %s",
758             err_prefix, self->ac->errstr[0] ? self->ac->errstr : "unknown error"));
759 0           redisAsyncFree(self->ac);
760 0           self->ac = NULL;
761 0           emit_error(self, err);
762 0           return REDIS_ERR;
763             }
764             }
765             #endif
766              
767 0 0         if (self->keepalive > 0) {
768 0           redisEnableKeepAliveWithInterval(&self->ac->c, self->keepalive);
769             }
770 0 0         if (self->tcp_user_timeout > 0) {
771 0           redisSetTcpUserTimeout(&self->ac->c, self->tcp_user_timeout);
772             }
773              
774 0 0         if (REDIS_OK != redisLibevAttach(self->loop, self->ac)) {
775 0           SV* err = sv_2mortal(newSVpvf("%s: cannot attach libev", err_prefix));
776 0           redisAsyncFree(self->ac);
777 0           self->ac = NULL;
778 0           emit_error(self, err);
779 0           return REDIS_ERR;
780             }
781              
782 0 0         if (self->priority != 0) {
783 0           redisLibevSetPriority(self->ac, self->priority);
784             }
785              
786 0           redisAsyncSetConnectCallbackNC(self->ac, EV__redis_connect_cb);
787 0           redisAsyncSetDisconnectCallback(self->ac, (redisDisconnectCallback*)EV__redis_disconnect_cb);
788 0 0         if (NULL != self->push_handler) {
789 0           redisAsyncSetPushCallback(self->ac, EV__redis_push_cb);
790             }
791              
792 0 0         if (self->ac->err) {
793 0           SV* err = sv_2mortal(newSVpvf("%s: %s", err_prefix, self->ac->errstr));
794 0           redisAsyncFree(self->ac);
795 0           self->ac = NULL;
796 0           emit_error(self, err);
797 0           return REDIS_ERR;
798             }
799              
800 0           return REDIS_OK;
801             }
802              
803 0           static SV* EV__redis_decode_reply(redisReply* reply) {
804             SV* res;
805              
806 0           switch (reply->type) {
807 0           case REDIS_REPLY_STRING:
808             case REDIS_REPLY_ERROR:
809             case REDIS_REPLY_STATUS:
810             case REDIS_REPLY_BIGNUM:
811             case REDIS_REPLY_VERB:
812 0           res = newSVpvn(reply->str, reply->len);
813 0           break;
814              
815 0           case REDIS_REPLY_INTEGER:
816 0           res = newSViv(reply->integer);
817 0           break;
818              
819 0           case REDIS_REPLY_DOUBLE:
820 0           res = newSVnv(reply->dval);
821 0           break;
822              
823 0           case REDIS_REPLY_BOOL:
824 0           res = newSViv(reply->integer ? 1 : 0);
825 0           break;
826              
827 0           case REDIS_REPLY_NIL:
828 0           res = newSV(0);
829 0           break;
830              
831 0           case REDIS_REPLY_ARRAY:
832             case REDIS_REPLY_MAP:
833             case REDIS_REPLY_SET:
834             case REDIS_REPLY_ATTR:
835             case REDIS_REPLY_PUSH: {
836 0           AV* av = newAV();
837             size_t i;
838 0 0         if (reply->elements > 0) {
839 0           av_extend(av, (SSize_t)(reply->elements - 1));
840 0 0         for (i = 0; i < reply->elements; i++) {
841 0 0         if (NULL != reply->element[i]) {
842 0           av_push(av, EV__redis_decode_reply(reply->element[i]));
843             }
844             else {
845 0           av_push(av, newSV(0));
846             }
847             }
848             }
849 0           res = newRV_noinc((SV*)av);
850 0           break;
851             }
852              
853 0           default:
854             /* Unknown type, return undef */
855 0           res = newSV(0);
856 0           break;
857             }
858              
859 0           return res;
860             }
861              
862 0           static void EV__redis_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
863 0           EV__Redis self = (EV__Redis)c->data;
864             ev_redis_cb_t* cbt;
865             SV* sv_reply;
866             SV* sv_err;
867              
868 0           cbt = (ev_redis_cb_t*)privdata;
869              
870 0 0         if (cbt->skipped) {
871 0 0         if (!cbt->persist || NULL == reply) {
    0          
872             /* Multi-channel persistent: hiredis fires once per channel with
873             * same cbt. Decrement sub_count, free only on last call. */
874 0 0         if (cbt->persist && NULL == reply && cbt->sub_count > 1) {
    0          
    0          
875 0           cbt->sub_count--;
876 0           return;
877             }
878 0           Safefree(cbt);
879             }
880 0 0         else if (cbt->persist && reply != NULL && is_unsub_reply((redisReply*)reply)) {
    0          
    0          
881 0           cbt->sub_count--;
882 0 0         if (cbt->sub_count <= 0) {
883 0           Safefree(cbt);
884             }
885             }
886 0           return;
887             }
888              
889             /* self is NULL when DESTROY nulled ac->data (deferred free inside
890             * REDIS_IN_CALLBACK) or during PL_dirty. Still invoke the callback
891             * with a disconnect error so users can clean up resources. The hiredis
892             * context (c) is still alive here — safe to read c->errstr.
893             * cb may be NULL during PL_dirty where we pre-null it.
894             * For persistent commands (multi-channel subscribe), hiredis fires
895             * reply_cb once per channel with the same cbt. Invoke the callback
896             * only once (null cb after), use sub_count to track when to free. */
897 0 0         if (self == NULL) {
898 0 0         if (NULL != cbt->cb) {
899 0           invoke_callback_error(cbt->cb,
900 0 0         sv_2mortal(newSVpv(c->errstr[0] ? c->errstr : "disconnected", 0)));
901 0           SvREFCNT_dec(cbt->cb);
902 0           cbt->cb = NULL;
903             }
904 0 0         if (cbt->persist && reply == NULL && cbt->sub_count > 1) {
    0          
    0          
905 0           cbt->sub_count--;
906 0           return;
907             }
908 0           Safefree(cbt);
909 0           return;
910             }
911              
912             /* If self is marked as freed (during DESTROY), we still invoke the
913             * callback with an error, but skip any self->field access afterward.
914             * For persistent commands, don't free cbt here — leave it in the queue
915             * for remove_cb_queue_sv to clean up after redisAsyncFree returns. */
916 0 0         if (self->magic == EV_REDIS_FREED) {
917 0 0         if (NULL != cbt->cb) {
918 0           self->callback_depth++;
919 0 0         invoke_callback_error(cbt->cb, sv_2mortal(newSVpv(c->errstr[0] ? c->errstr : "disconnected", 0)));
920 0           self->callback_depth--;
921 0           SvREFCNT_dec(cbt->cb);
922 0           cbt->cb = NULL;
923             }
924 0 0         if (!cbt->persist) {
925 0           ngx_queue_remove(&cbt->queue);
926 0           Safefree(cbt);
927             }
928 0           check_destroyed(self);
929 0           return;
930             }
931              
932             /* Unknown magic - memory corruption, skip.
933             * Don't touch queue pointers (self's memory may be garbage).
934             * Always decrement refcount since callback will never be invoked again. */
935 0 0         if (self->magic != EV_REDIS_MAGIC) {
936 0 0         if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
937 0           Safefree(cbt);
938 0           return;
939             }
940              
941 0           self->current_cb = cbt;
942 0           self->callback_depth++;
943              
944 0 0         if (NULL != cbt->cb) {
945 0 0         if (NULL == reply) {
946 0 0         sv_err = sv_2mortal(newSVpv(
947             c->errstr[0] ? c->errstr : "disconnected", 0));
948 0           invoke_callback_error(cbt->cb, sv_err);
949             }
950             else {
951 0           dSP;
952              
953 0           ENTER;
954 0           SAVETMPS;
955              
956 0 0         PUSHMARK(SP);
957 0 0         EXTEND(SP, 2);
958 0           sv_reply = sv_2mortal(EV__redis_decode_reply((redisReply*)reply));
959 0 0         if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
960 0           PUSHs(&PL_sv_undef);
961 0           PUSHs(sv_reply);
962             }
963             else {
964 0           PUSHs(sv_reply);
965             }
966 0           PUTBACK;
967              
968 0           call_sv(cbt->cb, G_DISCARD | G_EVAL);
969 0 0         if (SvTRUE(ERRSV)) {
    0          
970 0 0         warn("EV::Redis: exception in command callback: %s", SvPV_nolen(ERRSV));
971             }
972              
973 0 0         FREETMPS;
974 0           LEAVE;
975             }
976             }
977              
978 0           self->callback_depth--;
979 0           self->current_cb = NULL;
980              
981             /* If DESTROY was called during our callback (e.g., user undef'd $redis),
982             * self->magic is EV_REDIS_FREED but self is still valid (DESTROY defers
983             * Safefree when callback_depth > 0). Complete cleanup here.
984             * For persistent commands (multi-channel subscribe), hiredis will fire
985             * reply_cb again for remaining channels via __redisAsyncFree. Null the
986             * callback to prevent double invocation, but leave cbt alive so those
987             * later calls see it and can track sub_count for proper cleanup. */
988 0 0         if (self->magic == EV_REDIS_FREED) {
989 0 0         if (NULL != cbt->cb) {
990 0           SvREFCNT_dec(cbt->cb);
991 0           cbt->cb = NULL;
992             }
993 0 0         if (!cbt->persist) {
994 0           Safefree(cbt);
995             }
996 0           check_destroyed(self);
997 0           return;
998             }
999              
1000 0 0         if (cbt->skipped) {
1001             /* Defensive check: handles edge case where callback is marked skipped
1002             * during its own execution (e.g., via reentrant event loop where a
1003             * nested callback overwrites current_cb, allowing skip_pending to
1004             * process this callback). ngx_queue_remove is safe here due to
1005             * ngx_queue_init in skip_pending. Don't decrement pending_count since
1006             * skip_pending already did when it set skipped=1. */
1007 0           ngx_queue_remove(&cbt->queue);
1008             /* For persistent commands (e.g., SUBSCRIBE), hiredis fires reply_cb
1009             * once per subscribed channel during disconnect. Only free cbt on the
1010             * last channel to prevent use-after-free. */
1011 0 0         if (cbt->persist && cbt->sub_count > 1) {
    0          
1012 0           cbt->sub_count--;
1013 0           return;
1014             }
1015 0           Safefree(cbt);
1016 0           self->callback_depth++;
1017 0           send_next_waiting(self);
1018 0           self->callback_depth--;
1019 0           check_destroyed(self);
1020 0           return;
1021             }
1022              
1023             /* Detect end of persistent subscription: when all channels from a
1024             * SUBSCRIBE command have been unsubscribed, hiredis removes its internal
1025             * callback entry. Clean up our cbt to prevent orphaned queue entries. */
1026 0 0         if (cbt->persist && reply != NULL && is_unsub_reply((redisReply*)reply)) {
    0          
    0          
1027 0           cbt->sub_count--;
1028 0 0         if (cbt->sub_count <= 0) {
1029             /* All channels unsubscribed — persistent commands are not counted
1030             * in pending_count, so don't decrement it. */
1031 0           ngx_queue_remove(&cbt->queue);
1032 0           self->callback_depth++;
1033 0 0         if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
1034 0           Safefree(cbt);
1035 0           self->callback_depth--;
1036 0           check_destroyed(self);
1037 0           return;
1038             }
1039             }
1040              
1041             /* Connection teardown with active subscription: hiredis fires reply_cb
1042             * once per subscribed channel (from dict iteration in __redisAsyncFree).
1043             * Track sub_count and remove from queue on last channel to prevent
1044             * disconnect_cb's remove_cb_queue_sv from invoking the callback again. */
1045 0 0         if (cbt->persist && NULL == reply) {
    0          
1046 0 0         if (cbt->sub_count > 1) {
1047 0           cbt->sub_count--;
1048             } else {
1049 0           ngx_queue_remove(&cbt->queue);
1050 0           self->callback_depth++;
1051 0 0         if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
1052 0           Safefree(cbt);
1053 0           self->callback_depth--;
1054 0           check_destroyed(self);
1055             }
1056 0           return;
1057             }
1058              
1059 0 0         if (0 == cbt->persist) {
1060             /* Remove from queue BEFORE SvREFCNT_dec. The SvREFCNT_dec may free a
1061             * closure that holds the last reference to this object, triggering
1062             * DESTROY. If cbt is still in the queue, DESTROY's remove_cb_queue_sv
1063             * would double-free it. Wrapping in callback_depth defers DESTROY's
1064             * Safefree(self) so we can safely access self afterward. */
1065 0           ngx_queue_remove(&cbt->queue);
1066 0           self->pending_count--;
1067 0           self->callback_depth++;
1068 0 0         if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
1069 0           Safefree(cbt);
1070             /* Don't drain waiting queue when reply is NULL (connection dying) —
1071             * disconnect_cb will handle reconnect and wait queue preservation. */
1072 0 0         if (reply != NULL) {
1073 0           send_next_waiting(self);
1074             }
1075 0           self->callback_depth--;
1076 0           check_destroyed(self);
1077             }
1078             }
1079              
1080             /* Submit a cbt (already in cb_queue) to Redis. On failure, removes cbt from
1081             * queue, invokes error callback, and frees cbt. Returns REDIS_OK or REDIS_ERR. */
1082 0           static int submit_to_redis(EV__Redis self, ev_redis_cb_t* cbt,
1083             int argc, const char** argv, const size_t* argvlen)
1084             {
1085 0           redisCallbackFn* fn = EV__redis_reply_cb;
1086 0           void* privdata = (void*)cbt;
1087 0           const char* cmd = argv[0];
1088              
1089             /* Hiredis does not store callbacks for unsubscribe commands — replies are
1090             * routed through the original subscribe callback. Pass NULL so hiredis
1091             * doesn't hold a dangling reference; we clean up our cbt below. */
1092 0 0         if (cbt->persist && is_unsubscribe_command(cmd)) {
    0          
1093 0           fn = NULL;
1094 0           privdata = NULL;
1095             }
1096              
1097 0           int r = redisAsyncCommandArgv(
1098             self->ac, fn, privdata,
1099             argc, argv, argvlen
1100             );
1101              
1102 0 0         if (REDIS_OK != r) {
1103 0           ngx_queue_remove(&cbt->queue);
1104 0 0         if (!cbt->persist) self->pending_count--;
1105              
1106 0 0         if (NULL != cbt->cb) {
1107 0 0         invoke_callback_error(cbt->cb, sv_2mortal(newSVpv(
    0          
1108             (self->ac && self->ac->errstr[0]) ? self->ac->errstr : "command failed", 0)));
1109 0           SvREFCNT_dec(cbt->cb);
1110             }
1111 0           Safefree(cbt);
1112 0 0         } else if (fn == NULL) {
1113             /* Successfully sent an unsubscribe command. Since hiredis won't
1114             * call us back, we must clean up our tracking 'cbt' now. */
1115 0           ngx_queue_remove(&cbt->queue);
1116 0 0         if (NULL != cbt->cb) SvREFCNT_dec(cbt->cb);
1117 0           Safefree(cbt);
1118             }
1119              
1120 0           return r;
1121             }
1122              
1123             /* Send waiting commands to Redis. Uses iterative loop instead of recursion
1124             * to avoid stack overflow when many commands fail consecutively. */
1125 0           static void send_next_waiting(EV__Redis self) {
1126             ngx_queue_t* q;
1127             ev_redis_wait_t* wt;
1128             ev_redis_cb_t* cbt;
1129             int r;
1130              
1131             while (1) {
1132             /* Check preconditions each iteration - they may change after callbacks */
1133 0 0         if (NULL == self->ac || self->intentional_disconnect) return;
    0          
1134 0 0         if (ngx_queue_empty(&self->wait_queue)) return;
1135 0 0         if (self->max_pending > 0 && self->pending_count >= self->max_pending) return;
    0          
1136              
1137 0           q = ngx_queue_head(&self->wait_queue);
1138 0           wt = ngx_queue_data(q, ev_redis_wait_t, queue);
1139 0           ngx_queue_remove(q);
1140 0           self->waiting_count--;
1141              
1142 0           Newx(cbt, 1, ev_redis_cb_t);
1143 0           cbt->cb = wt->cb;
1144 0           wt->cb = NULL;
1145 0           cbt->skipped = 0;
1146 0           cbt->persist = wt->persist;
1147 0 0         cbt->sub_count = wt->persist ? wt->argc - 1 : 0;
1148 0           ngx_queue_init(&cbt->queue);
1149 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
1150 0 0         if (!cbt->persist) self->pending_count++;
1151              
1152 0           r = submit_to_redis(self, cbt, wt->argc,
1153 0           (const char**)wt->argv, wt->argvlen);
1154 0           free_wait_entry(wt);
1155              
1156             /* If command failed, loop to try next from queue.
1157             * If command succeeded, continue loop to fill more slots up to max_pending. */
1158             }
1159             }
1160              
1161             MODULE = EV::Redis PACKAGE = EV::Redis
1162              
1163             BOOT:
1164             {
1165 24 50         I_EV_API("EV::Redis");
    50          
    50          
1166              
1167             /* Initialize shared error strings */
1168 24           err_skipped = newSVpvs_share("skipped");
1169 24           SvREADONLY_on(err_skipped);
1170              
1171 24           err_waiting_timeout = newSVpvs_share("waiting timeout");
1172 24           SvREADONLY_on(err_waiting_timeout);
1173              
1174 24           err_disconnected = newSVpvs_share("disconnected");
1175 24           SvREADONLY_on(err_disconnected);
1176             #ifdef EV_REDIS_SSL
1177 24           redisInitOpenSSL();
1178             #endif
1179             }
1180              
1181             EV::Redis
1182             _new(char* class, EV::Loop loop);
1183             CODE:
1184             {
1185             PERL_UNUSED_VAR(class);
1186 6           Newxz(RETVAL, 1, ev_redis_t);
1187 6           RETVAL->magic = EV_REDIS_MAGIC;
1188 6           ngx_queue_init(&RETVAL->cb_queue);
1189 6           ngx_queue_init(&RETVAL->wait_queue);
1190 6           RETVAL->loop = loop;
1191 6           RETVAL->cloexec = 1;
1192             }
1193             OUTPUT:
1194             RETVAL
1195              
1196             void
1197             DESTROY(EV::Redis self);
1198             CODE:
1199             {
1200             redisAsyncContext* ac_to_free;
1201 6           int skip_cb_cleanup = 0;
1202              
1203             /* Check for use-after-free: if magic number is wrong, this object
1204             * was already freed and memory is being reused. Skip cleanup. */
1205 6 50         if (self->magic != EV_REDIS_MAGIC) {
1206 0 0         if (self->magic == EV_REDIS_FREED) {
1207             /* Already destroyed - this is a double-free at Perl level */
1208 0           return;
1209             }
1210             /* Unknown magic - memory corruption or uninitialized */
1211 0           return;
1212             }
1213              
1214             /* Mark as freed FIRST to prevent re-entrant DESTROY */
1215 6           self->magic = EV_REDIS_FREED;
1216              
1217             /* Stop timers BEFORE PL_dirty check. Timer callbacks have self as data
1218             * pointer, so we must stop them before freeing self to prevent UAF.
1219             * The stop helpers check for NULL loop, so this is safe even if loop
1220             * is already destroyed. */
1221 6           stop_reconnect_timer(self);
1222 6           stop_waiting_timer(self);
1223              
1224             /* During global destruction (PL_dirty), the EV loop and other Perl
1225             * objects may already be destroyed. Clean up hiredis and our own memory
1226             * but don't invoke Perl-level handlers.
1227             * CRITICAL: We must call redisAsyncFree to stop the libev adapter's
1228             * watchers and free the redisAsyncContext. Without this, the adapter's
1229             * ev_io/ev_timer watchers remain registered in the EV loop with dangling
1230             * data pointers, causing SEGV during process cleanup. */
1231 6 50         if (PL_dirty) {
1232 0 0         if (NULL != self->ac) {
1233             /* Null cb_queue callbacks before redisAsyncFree to prevent
1234             * SvREFCNT_dec on potentially-freed SVs during global destruction.
1235             * (Same pattern as wait_queue below.) */
1236             {
1237             ngx_queue_t* q;
1238 0           for (q = ngx_queue_head(&self->cb_queue);
1239 0 0         q != ngx_queue_sentinel(&self->cb_queue);
1240 0           q = ngx_queue_next(q)) {
1241 0           ev_redis_cb_t* cbt = ngx_queue_data(q, ev_redis_cb_t, queue);
1242 0           cbt->cb = NULL;
1243             }
1244             }
1245 0           self->ac->data = NULL; /* prevent callbacks from accessing self */
1246 0           redisAsyncFree(self->ac);
1247 0           self->ac = NULL;
1248             }
1249 0 0         if (NULL != self->ac_saved) {
1250 0           self->ac_saved->data = NULL;
1251 0           self->ac_saved = NULL;
1252             }
1253 0           free_c_fields(self);
1254             /* Free wait_queue C memory (skip Perl callbacks during global destruction) */
1255 0 0         while (!ngx_queue_empty(&self->wait_queue)) {
1256 0           ngx_queue_t* q = ngx_queue_head(&self->wait_queue);
1257 0           ev_redis_wait_t* wt = ngx_queue_data(q, ev_redis_wait_t, queue);
1258 0           ngx_queue_remove(q);
1259 0           wt->cb = NULL; /* skip SvREFCNT_dec during PL_dirty */
1260 0           free_wait_entry(wt);
1261             }
1262 0           Safefree(self);
1263 0           return;
1264             }
1265              
1266 6           self->reconnect = 0;
1267              
1268             /* CRITICAL: Set self->ac to NULL BEFORE calling redisAsyncFree.
1269             * redisAsyncFree triggers reply callbacks, which call send_next_waiting,
1270             * which checks self->ac != NULL before issuing commands. If we don't
1271             * clear self->ac first, send_next_waiting will try to call
1272             * redisAsyncCommandArgv during the teardown, causing heap corruption. */
1273 6           self->loop = NULL;
1274 6           ac_to_free = self->ac;
1275 6           self->ac = NULL;
1276 6 50         if (NULL != ac_to_free) {
1277             /* If inside a hiredis callback (REDIS_IN_CALLBACK), redisAsyncFree
1278             * will be deferred. hiredis will fire pending reply callbacks later
1279             * via __redisAsyncFree. NULL ac->data so those callbacks see NULL self
1280             * and handle cleanup without accessing freed memory. */
1281 0 0         if (ac_to_free->c.flags & REDIS_IN_CALLBACK) {
1282 0           ac_to_free->data = NULL;
1283 0           skip_cb_cleanup = 1;
1284             }
1285              
1286             /* Protect against premature free in disconnect_cb if triggered synchronously */
1287 0           self->callback_depth++;
1288 0           redisAsyncFree(ac_to_free);
1289 0           self->callback_depth--;
1290             }
1291             /* If disconnect() was called from inside a callback, ac_saved points to
1292             * the deferred async context. NULL its data pointer to prevent the
1293             * deferred disconnect callback from accessing freed self. */
1294 6 50         if (self->ac_saved != NULL) {
1295 0           self->ac_saved->data = NULL;
1296 0           self->ac_saved = NULL;
1297 0           skip_cb_cleanup = 1;
1298             }
1299 6 50         CLEAR_HANDLER(self->error_handler);
1300 6 50         CLEAR_HANDLER(self->connect_handler);
1301 6 50         CLEAR_HANDLER(self->disconnect_handler);
1302 6 50         CLEAR_HANDLER(self->push_handler);
1303 6           free_c_fields(self);
1304              
1305 6 50         if (!self->in_wait_cleanup) {
1306 6           clear_wait_queue_sv(self, err_disconnected);
1307             }
1308 6 50         if (!skip_cb_cleanup && !self->in_cb_cleanup) {
    50          
1309             /* Safe to free cbts ourselves — hiredis has no deferred references. */
1310 6           remove_cb_queue_sv(self, NULL);
1311             }
1312             /* else: hiredis still holds references to our cbts (deferred free/disconnect).
1313             * reply_cb will handle cbt cleanup when called with self == NULL. */
1314              
1315             /* Defer Safefree if inside a callback — check_destroyed() handles it */
1316 6 50         if (self->current_cb == NULL && self->callback_depth == 0) {
    50          
1317 6           Safefree(self);
1318             }
1319             }
1320              
1321             void
1322             connect(EV::Redis self, char* hostname, int port = 6379);
1323             CODE:
1324             {
1325             redisOptions opts;
1326              
1327 0 0         if (NULL != self->ac) {
1328 0           croak("already connected");
1329             }
1330              
1331 0           self->intentional_disconnect = 0;
1332 0           self->reconnect_attempts = 0;
1333 0           clear_connection_params(self);
1334 0           self->host = savepv(hostname);
1335 0           self->port = port;
1336              
1337 0           memset(&opts, 0, sizeof(opts));
1338 0           pre_connect_common(self, &opts);
1339 0           REDIS_OPTIONS_SET_TCP(&opts, hostname, port);
1340 0           self->ac = redisAsyncConnectWithOptions(&opts);
1341 0 0         if (NULL == self->ac) {
1342 0           croak("cannot allocate memory");
1343             }
1344              
1345 0           (void)post_connect_setup(self, "connect error");
1346             }
1347              
1348             void
1349             connect_unix(EV::Redis self, const char* path);
1350             CODE:
1351             {
1352             redisOptions opts;
1353              
1354 0 0         if (NULL != self->ac) {
1355 0           croak("already connected");
1356             }
1357              
1358 0           self->intentional_disconnect = 0;
1359 0           self->reconnect_attempts = 0;
1360 0           clear_connection_params(self);
1361 0           self->path = savepv(path);
1362              
1363 0           memset(&opts, 0, sizeof(opts));
1364 0           pre_connect_common(self, &opts);
1365 0           REDIS_OPTIONS_SET_UNIX(&opts, path);
1366 0           self->ac = redisAsyncConnectWithOptions(&opts);
1367 0 0         if (NULL == self->ac) {
1368 0           croak("cannot allocate memory");
1369             }
1370              
1371 0           (void)post_connect_setup(self, "connect error");
1372             }
1373              
1374             void
1375             disconnect(EV::Redis self);
1376             CODE:
1377             {
1378             /* Stop any pending reconnect timer on explicit disconnect */
1379 0           self->intentional_disconnect = 1;
1380 0           stop_reconnect_timer(self);
1381 0           self->reconnect_attempts = 0;
1382              
1383 0 0         if (NULL == self->ac) {
1384             /* Already disconnected — still stop waiting timer and clear
1385             * wait queue (e.g., resume_waiting_on_reconnect kept them alive
1386             * after a connection drop, but user now explicitly disconnects). */
1387 0           stop_waiting_timer(self);
1388 0 0         if (!ngx_queue_empty(&self->wait_queue)) {
1389 0           self->callback_depth++;
1390 0           clear_wait_queue_sv(self, err_disconnected);
1391 0           self->callback_depth--;
1392 0           check_destroyed(self);
1393             }
1394 0           return;
1395             }
1396             /* Save ac pointer for deferred disconnect: when inside a hiredis
1397             * callback, redisAsyncDisconnect only sets REDIS_DISCONNECTING and
1398             * returns. DESTROY needs ac_saved to NULL ac->data if the Perl object
1399             * is freed before the deferred disconnect completes.
1400             * Only set when REDIS_IN_CALLBACK: in the synchronous path,
1401             * disconnect_cb fires during redisAsyncDisconnect and clears ac_saved;
1402             * but if DESTROY fires nested during that processing (SvREFCNT_dec
1403             * dropping last ref), it would NULL ac->data, causing disconnect_cb to
1404             * skip cleanup, leaving ac_saved dangling after ac is freed. */
1405 0 0         if (self->ac->c.flags & REDIS_IN_CALLBACK) {
1406 0           self->ac_saved = self->ac;
1407             }
1408             /* Protect against Safefree(self) if disconnect_cb fires synchronously
1409             * and user's on_disconnect handler drops the last Perl reference. */
1410 0           self->callback_depth++;
1411 0           redisAsyncDisconnect(self->ac);
1412 0           self->ac = NULL;
1413 0           self->callback_depth--;
1414 0 0         if (check_destroyed(self)) return;
1415             }
1416              
1417             int
1418             is_connected(EV::Redis self);
1419             CODE:
1420             {
1421 0 0         RETVAL = (NULL != self->ac) ? 1 : 0;
1422             }
1423             OUTPUT:
1424             RETVAL
1425              
1426             SV*
1427             connect_timeout(EV::Redis self, SV* timeout_ms = NULL);
1428             CODE:
1429             {
1430 0           RETVAL = timeout_accessor(&self->connect_timeout, timeout_ms, "connect_timeout");
1431             }
1432             OUTPUT:
1433             RETVAL
1434              
1435             SV*
1436             command_timeout(EV::Redis self, SV* timeout_ms = NULL);
1437             CODE:
1438             {
1439 0           RETVAL = timeout_accessor(&self->command_timeout, timeout_ms, "command_timeout");
1440             /* Apply to active connection immediately */
1441 0 0         if (NULL != timeout_ms && SvOK(timeout_ms) && NULL != self->ac && NULL != self->command_timeout) {
    0          
    0          
    0          
1442 0           redisAsyncSetTimeout(self->ac, *self->command_timeout);
1443             }
1444             }
1445             OUTPUT:
1446             RETVAL
1447              
1448             SV*
1449             on_error(EV::Redis self, SV* handler = NULL);
1450             CODE:
1451             {
1452 6           RETVAL = handler_accessor(&self->error_handler, handler, items > 1);
1453             }
1454             OUTPUT:
1455             RETVAL
1456              
1457             SV*
1458             on_connect(EV::Redis self, SV* handler = NULL);
1459             CODE:
1460             {
1461 0           RETVAL = handler_accessor(&self->connect_handler, handler, items > 1);
1462             }
1463             OUTPUT:
1464             RETVAL
1465              
1466             SV*
1467             on_disconnect(EV::Redis self, SV* handler = NULL);
1468             CODE:
1469             {
1470 0           RETVAL = handler_accessor(&self->disconnect_handler, handler, items > 1);
1471             }
1472             OUTPUT:
1473             RETVAL
1474              
1475             SV*
1476             on_push(EV::Redis self, SV* handler = NULL);
1477             CODE:
1478             {
1479 0           RETVAL = handler_accessor(&self->push_handler, handler, items > 1);
1480             /* Sync push callback with hiredis if connected */
1481 0 0         if (NULL != self->ac) {
1482 0 0         if (NULL != self->push_handler) {
1483 0           redisAsyncSetPushCallback(self->ac, EV__redis_push_cb);
1484             } else {
1485 0           redisAsyncSetPushCallback(self->ac, NULL);
1486             }
1487             }
1488             }
1489             OUTPUT:
1490             RETVAL
1491              
1492             int
1493             command(EV::Redis self, ...);
1494             PREINIT:
1495             SV* cb;
1496             char** argv;
1497             size_t* argvlen;
1498             STRLEN len;
1499             int argc, i, persist;
1500             ev_redis_cb_t* cbt;
1501             ev_redis_wait_t* wt;
1502             char* p;
1503             CODE:
1504             {
1505 1 50         if (items < 2) {
1506 0           croak("Usage: command(\"command\", ..., [$callback])");
1507             }
1508              
1509 1           cb = ST(items - 1);
1510 1 50         if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
    50          
1511 1           argc = items - 2; /* last arg is callback */
1512             }
1513             else {
1514 0           cb = NULL; /* fire-and-forget: no callback */
1515 0           argc = items - 1;
1516             }
1517              
1518 1 50         if (argc < 1) {
1519 0           croak("Usage: command(\"command\", ..., [$callback])");
1520             }
1521              
1522 1 50         if (NULL == self->ac) {
1523 1 50         if (!self->reconnect_timer_active) {
1524 1           croak("connection required before calling command");
1525             }
1526             /* Reconnect in progress — fall through to queue in wait_queue */
1527             }
1528 0           Newx(argv, argc, char*);
1529 0           SAVEFREEPV(argv);
1530 0           Newx(argvlen, argc, size_t);
1531 0           SAVEFREEPV(argvlen);
1532              
1533 0 0         for (i = 0; i < argc; i++) {
1534 0           argv[i] = SvPV(ST(i + 1), len);
1535 0           argvlen[i] = len;
1536             }
1537              
1538 0           persist = is_persistent_command(argv[0]);
1539              
1540 0 0         if (NULL == self->ac ||
1541 0 0         (self->max_pending > 0 && self->pending_count >= self->max_pending)) {
    0          
1542 0           Newx(wt, 1, ev_redis_wait_t);
1543 0           Newx(wt->argv, argc, char*);
1544 0           Newx(wt->argvlen, argc, size_t);
1545 0 0         for (i = 0; i < argc; i++) {
1546 0           Newx(p, argvlen[i] + 1, char);
1547 0           Copy(argv[i], p, argvlen[i], char);
1548 0           p[argvlen[i]] = '\0';
1549 0           wt->argv[i] = p;
1550 0           wt->argvlen[i] = argvlen[i];
1551             }
1552 0           wt->argc = argc;
1553 0           wt->cb = SvREFCNT_inc(cb);
1554 0           wt->persist = persist;
1555 0           wt->queued_at = ev_now(self->loop);
1556 0           ngx_queue_init(&wt->queue);
1557 0           ngx_queue_insert_tail(&self->wait_queue, &wt->queue);
1558 0           self->waiting_count++;
1559 0           schedule_waiting_timer(self);
1560 0           RETVAL = REDIS_OK;
1561             }
1562             else {
1563 0           Newx(cbt, 1, ev_redis_cb_t);
1564 0           cbt->cb = SvREFCNT_inc(cb);
1565 0           cbt->skipped = 0;
1566 0           cbt->persist = persist;
1567 0 0         cbt->sub_count = persist ? argc - 1 : 0;
1568 0           ngx_queue_init(&cbt->queue);
1569 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
1570 0 0         if (!persist) self->pending_count++;
1571              
1572 0           RETVAL = submit_to_redis(self, cbt,
1573             argc, (const char**)argv, argvlen);
1574             }
1575             }
1576             OUTPUT:
1577             RETVAL
1578              
1579             void
1580             reconnect(EV::Redis self, int enable, int delay_ms = 1000, int max_attempts = 0);
1581             CODE:
1582             {
1583 0           validate_timeout_ms(delay_ms, "reconnect_delay");
1584 0           self->reconnect = enable ? 1 : 0;
1585 0           self->reconnect_delay_ms = delay_ms;
1586 0           self->max_reconnect_attempts = max_attempts >= 0 ? max_attempts : 0;
1587 0           self->reconnect_attempts = 0;
1588              
1589 0 0         if (!enable) {
1590 0           stop_reconnect_timer(self);
1591             }
1592             }
1593              
1594             int
1595             reconnect_enabled(EV::Redis self);
1596             CODE:
1597             {
1598 0 0         RETVAL = self->reconnect;
1599             }
1600             OUTPUT:
1601             RETVAL
1602              
1603             int
1604             pending_count(EV::Redis self);
1605             CODE:
1606             {
1607 0 0         RETVAL = self->pending_count;
1608             }
1609             OUTPUT:
1610             RETVAL
1611              
1612             int
1613             waiting_count(EV::Redis self);
1614             CODE:
1615             {
1616 0 0         RETVAL = self->waiting_count;
1617             }
1618             OUTPUT:
1619             RETVAL
1620              
1621             int
1622             max_pending(EV::Redis self, SV* limit = NULL);
1623             CODE:
1624             {
1625 0 0         if (NULL != limit && SvOK(limit)) {
    0          
1626 0           int val = SvIV(limit);
1627 0 0         if (val < 0) {
1628 0           croak("max_pending must be non-negative");
1629             }
1630 0           self->max_pending = val;
1631              
1632             /* When limit is increased or removed, send waiting commands.
1633             * callback_depth protects against DESTROY if a failed command's
1634             * error callback drops the last Perl reference to self. */
1635 0           self->callback_depth++;
1636 0           send_next_waiting(self);
1637 0           self->callback_depth--;
1638 0 0         if (check_destroyed(self)) XSRETURN_IV(0);
1639             }
1640 0 0         RETVAL = self->max_pending;
1641             }
1642             OUTPUT:
1643             RETVAL
1644              
1645             SV*
1646             waiting_timeout(EV::Redis self, SV* timeout_ms = NULL);
1647             CODE:
1648             {
1649 0 0         if (NULL != timeout_ms && SvOK(timeout_ms)) {
    0          
1650 0           IV ms = SvIV(timeout_ms);
1651 0           validate_timeout_ms(ms, "waiting_timeout");
1652 0           self->waiting_timeout_ms = (int)ms;
1653 0           schedule_waiting_timer(self);
1654             }
1655              
1656 0           RETVAL = newSViv((IV)self->waiting_timeout_ms);
1657             }
1658             OUTPUT:
1659             RETVAL
1660              
1661             int
1662             resume_waiting_on_reconnect(EV::Redis self, SV* value = NULL);
1663             CODE:
1664             {
1665 0 0         if (NULL != value && SvOK(value)) {
    0          
1666 0           self->resume_waiting_on_reconnect = SvTRUE(value) ? 1 : 0;
1667             }
1668 0 0         RETVAL = self->resume_waiting_on_reconnect;
1669             }
1670             OUTPUT:
1671             RETVAL
1672              
1673             int
1674             priority(EV::Redis self, SV* value = NULL);
1675             CODE:
1676             {
1677 0 0         if (NULL != value && SvOK(value)) {
    0          
1678 0           int prio = SvIV(value);
1679             /* Clamp priority to libev valid range: EV_MINPRI (-2) to EV_MAXPRI (+2) */
1680 0 0         if (prio < -2) prio = -2;
1681 0 0         if (prio > 2) prio = 2;
1682 0           self->priority = prio;
1683 0 0         if (NULL != self->ac) {
1684 0           redisLibevSetPriority(self->ac, prio);
1685             }
1686             }
1687 0 0         RETVAL = self->priority;
1688             }
1689             OUTPUT:
1690             RETVAL
1691              
1692             int
1693             keepalive(EV::Redis self, SV* value = NULL);
1694             CODE:
1695             {
1696 0 0         if (NULL != value && SvOK(value)) {
    0          
1697 0           int interval = SvIV(value);
1698 0 0         if (interval < 0) croak("keepalive interval must be non-negative");
1699 0 0         if (interval > MAX_TIMEOUT_MS / 1000) croak("keepalive interval too large");
1700 0           self->keepalive = interval;
1701 0 0         if (NULL != self->ac && interval > 0) {
    0          
1702 0           redisEnableKeepAliveWithInterval(&self->ac->c, interval);
1703             }
1704             }
1705 0 0         RETVAL = self->keepalive;
1706             }
1707             OUTPUT:
1708             RETVAL
1709              
1710             int
1711             prefer_ipv4(EV::Redis self, SV* value = NULL);
1712             CODE:
1713             {
1714 0 0         if (NULL != value && SvOK(value)) {
    0          
1715 0           self->prefer_ipv4 = SvTRUE(value) ? 1 : 0;
1716 0 0         if (self->prefer_ipv4) self->prefer_ipv6 = 0;
1717             }
1718 0 0         RETVAL = self->prefer_ipv4;
1719             }
1720             OUTPUT:
1721             RETVAL
1722              
1723             int
1724             prefer_ipv6(EV::Redis self, SV* value = NULL);
1725             CODE:
1726             {
1727 0 0         if (NULL != value && SvOK(value)) {
    0          
1728 0           self->prefer_ipv6 = SvTRUE(value) ? 1 : 0;
1729 0 0         if (self->prefer_ipv6) self->prefer_ipv4 = 0;
1730             }
1731 0 0         RETVAL = self->prefer_ipv6;
1732             }
1733             OUTPUT:
1734             RETVAL
1735              
1736             SV*
1737             source_addr(EV::Redis self, SV* value = NULL);
1738             CODE:
1739             {
1740 0 0         if (items > 1) {
1741 0 0         if (NULL != self->source_addr) {
1742 0           Safefree(self->source_addr);
1743 0           self->source_addr = NULL;
1744             }
1745 0 0         if (NULL != value && SvOK(value)) {
    0          
1746 0           self->source_addr = savepv(SvPV_nolen(value));
1747             }
1748             }
1749 0 0         if (NULL != self->source_addr) {
1750 0           RETVAL = newSVpv(self->source_addr, 0);
1751             } else {
1752 0           RETVAL = &PL_sv_undef;
1753             }
1754             }
1755             OUTPUT:
1756             RETVAL
1757              
1758             unsigned int
1759             tcp_user_timeout(EV::Redis self, SV* value = NULL);
1760             CODE:
1761             {
1762 0 0         if (NULL != value && SvOK(value)) {
    0          
1763 0           IV ms = SvIV(value);
1764 0           validate_timeout_ms(ms, "tcp_user_timeout");
1765 0           self->tcp_user_timeout = (unsigned int)ms;
1766             }
1767 0 0         RETVAL = self->tcp_user_timeout;
1768             }
1769             OUTPUT:
1770             RETVAL
1771              
1772             int
1773             cloexec(EV::Redis self, SV* value = NULL);
1774             CODE:
1775             {
1776 0 0         if (NULL != value && SvOK(value)) {
    0          
1777 0           self->cloexec = SvTRUE(value) ? 1 : 0;
1778             }
1779 0 0         RETVAL = self->cloexec;
1780             }
1781             OUTPUT:
1782             RETVAL
1783              
1784             int
1785             reuseaddr(EV::Redis self, SV* value = NULL);
1786             CODE:
1787             {
1788 0 0         if (NULL != value && SvOK(value)) {
    0          
1789 0           self->reuseaddr = SvTRUE(value) ? 1 : 0;
1790             }
1791 0 0         RETVAL = self->reuseaddr;
1792             }
1793             OUTPUT:
1794             RETVAL
1795              
1796             void
1797             skip_waiting(EV::Redis self);
1798             CODE:
1799             {
1800             /* Protect self from destruction during queue iteration */
1801 0           self->callback_depth++;
1802              
1803             /* If cleanup is already in progress (e.g., during expire_waiting_commands
1804             * or disconnect callback), don't modify the wait_queue. */
1805 0 0         if (self->in_wait_cleanup) {
1806 0           self->callback_depth--;
1807 0           check_destroyed(self);
1808 0           return;
1809             }
1810              
1811 0           clear_wait_queue_sv(self, err_skipped);
1812 0           stop_waiting_timer(self);
1813              
1814 0           self->callback_depth--;
1815 0           check_destroyed(self);
1816             }
1817              
1818             void
1819             skip_pending(EV::Redis self);
1820             CODE:
1821             {
1822             ngx_queue_t local_queue;
1823             ngx_queue_t* q;
1824             ev_redis_cb_t* cbt;
1825              
1826             /* Protect self from destruction during queue iteration */
1827 0           self->callback_depth++;
1828              
1829             /* Always attempt to clear waiting queue (handles its own re-entrancy) */
1830 0           clear_wait_queue_sv(self, err_skipped);
1831 0           stop_waiting_timer(self);
1832              
1833             /* If cb_queue cleanup is already in progress, stop here. */
1834 0 0         if (self->in_cb_cleanup) {
1835 0           self->callback_depth--;
1836 0           check_destroyed(self);
1837 0           return;
1838             }
1839              
1840             /* Protect cb_queue iteration from re-entrancy. If a user callback
1841             * calls skip_pending() again, the in_cb_cleanup check above will return. */
1842 0           self->in_cb_cleanup = 1;
1843              
1844 0           ngx_queue_init(&local_queue);
1845 0 0         while (!ngx_queue_empty(&self->cb_queue)) {
1846 0           q = ngx_queue_head(&self->cb_queue);
1847 0           cbt = ngx_queue_data(q, ev_redis_cb_t, queue);
1848              
1849 0 0         if (cbt == self->current_cb) {
1850             /* If current_cb is at head — if it's the only item, we're done */
1851 0 0         if (ngx_queue_next(q) == ngx_queue_sentinel(&self->cb_queue)) {
1852 0           break;
1853             }
1854 0           q = ngx_queue_next(q);
1855 0           cbt = ngx_queue_data(q, ev_redis_cb_t, queue);
1856             }
1857              
1858 0           ngx_queue_remove(q);
1859 0           ngx_queue_insert_tail(&local_queue, q);
1860             }
1861              
1862 0 0         while (!ngx_queue_empty(&local_queue)) {
1863 0 0         if (self->magic == EV_REDIS_FREED) {
1864 0           break;
1865             }
1866              
1867 0           q = ngx_queue_head(&local_queue);
1868 0           cbt = ngx_queue_data(q, ev_redis_cb_t, queue);
1869 0           ngx_queue_remove(q);
1870              
1871             /* Mark as skipped FIRST to prevent double callback invocation if
1872             * invoke_callback_error re-enters the event loop. */
1873 0           cbt->skipped = 1;
1874              
1875             /* Re-initialize queue node so any subsequent remove (from reply_cb's
1876             * skipped path on re-entry) is safe. */
1877 0           ngx_queue_init(q);
1878 0 0         if (!cbt->persist) self->pending_count--;
1879              
1880             /* Save and clear callback BEFORE invoking — if the user callback
1881             * re-enters and a Redis reply arrives, reply_cb sees skipped=1
1882             * and frees cbt. Clearing cb first avoids use-after-free. */
1883 0 0         if (NULL != cbt->cb) {
1884 0           SV* cb_to_invoke = cbt->cb;
1885 0           cbt->cb = NULL;
1886              
1887 0           invoke_callback_error(cb_to_invoke, err_skipped);
1888 0           SvREFCNT_dec(cb_to_invoke);
1889             }
1890             }
1891              
1892 0           self->in_cb_cleanup = 0;
1893              
1894 0           self->callback_depth--;
1895 0           check_destroyed(self);
1896             }
1897              
1898             int
1899             has_ssl(char* class);
1900             CODE:
1901             {
1902             PERL_UNUSED_VAR(class);
1903             #ifdef EV_REDIS_SSL
1904 4 50         RETVAL = 1;
1905             #else
1906             RETVAL = 0;
1907             #endif
1908             }
1909             OUTPUT:
1910             RETVAL
1911              
1912             #ifdef EV_REDIS_SSL
1913              
1914             void
1915             _setup_ssl_context(EV::Redis self, SV* cacert, SV* capath, SV* cert, SV* key, SV* server_name, int verify = 1);
1916             CODE:
1917             {
1918 4           redisSSLContextError ssl_error = REDIS_SSL_CTX_NONE;
1919             redisSSLOptions ssl_opts;
1920              
1921 4           memset(&ssl_opts, 0, sizeof(ssl_opts));
1922 4 100         ssl_opts.cacert_filename = (SvOK(cacert)) ? SvPV_nolen(cacert) : NULL;
1923 4 100         ssl_opts.capath = (SvOK(capath)) ? SvPV_nolen(capath) : NULL;
1924 4 100         ssl_opts.cert_filename = (SvOK(cert)) ? SvPV_nolen(cert) : NULL;
1925 4 50         ssl_opts.private_key_filename = (SvOK(key)) ? SvPV_nolen(key) : NULL;
1926 4 50         ssl_opts.server_name = (SvOK(server_name)) ? SvPV_nolen(server_name) : NULL;
1927 4           ssl_opts.verify_mode = verify ? REDIS_SSL_VERIFY_PEER : REDIS_SSL_VERIFY_NONE;
1928              
1929             /* Free existing SSL context if any */
1930 4 50         if (NULL != self->ssl_ctx) {
1931 0           redisFreeSSLContext(self->ssl_ctx);
1932 0           self->ssl_ctx = NULL;
1933             }
1934              
1935 4           self->ssl_ctx = redisCreateSSLContextWithOptions(&ssl_opts, &ssl_error);
1936              
1937 4 100         if (NULL == self->ssl_ctx) {
1938 2           croak("SSL context creation failed: %s", redisSSLContextGetError(ssl_error));
1939             }
1940             }
1941              
1942             #endif