File Coverage

lib/Redis/Fast.xs
Criterion Covered Total %
statement 359 731 49.1
branch 221 672 32.8
condition n/a
subroutine n/a
pod n/a
total 580 1403 41.3


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2             #include "EXTERN.h"
3             #include "perl.h"
4             #include "XSUB.h"
5              
6             #include "ppport.h"
7             #include "hiredis.h"
8             #include "hiredis_ssl.h"
9             #include "async.h"
10              
11             #include
12             #include
13             #include
14             #include
15             #include
16             #include
17              
18             #define MAX_ERROR_SIZE 256
19              
20             #define WAIT_FOR_EVENT_OK 0
21             #define WAIT_FOR_EVENT_READ_TIMEOUT 1
22             #define WAIT_FOR_EVENT_WRITE_TIMEOUT 2
23             #define WAIT_FOR_EVENT_EXCEPTION 3
24              
25             #define FLAG_INSIDE_TRANSACTION 0x01
26             #define FLAG_INSIDE_WATCH 0x02
27              
28             #define DEBUG_MSG(fmt, ...) \
29             if (self->debug) { \
30             fprintf(stderr, "[%d][%d][%s:%d:%s]: ", getpid(), getppid(), __FILE__, __LINE__, __func__); \
31             fprintf(stderr, fmt, __VA_ARGS__); \
32             fprintf(stderr, "\n"); \
33             }
34              
35             #define EQUALS_COMMAND(len, cmd, expected) ((len) == sizeof(expected) - 1 && memcmp(cmd, expected, sizeof(expected) - 1) == 0)
36              
37             typedef struct redis_fast_s {
38             redisAsyncContext* ac;
39             char* hostname;
40             int port;
41             char* path;
42             char* error;
43             double reconnect;
44             int every;
45             int debug;
46             double cnx_timeout;
47             double read_timeout;
48             double write_timeout;
49             int current_database;
50             int need_reconnect;
51             int is_connected;
52             SV* on_connect;
53             SV* on_build_sock;
54             SV* data;
55             SV* reconnect_on_error;
56             double next_reconnect_on_error_at;
57             int proccess_sub_count;
58             int is_subscriber;
59             int expected_subs;
60             pid_t pid;
61             int ssl;
62             int ssl_verify_mode;
63             int flags;
64             } redis_fast_t, *Redis__Fast;
65              
66             typedef struct redis_fast_reply_s {
67             SV* result;
68             SV* error;
69             } redis_fast_reply_t;
70              
71             typedef redis_fast_reply_t (*CUSTOM_DECODE)(pTHX_ Redis__Fast self, redisReply* reply, int collect_errors);
72              
73             typedef struct redis_fast_sync_cb_s {
74             redis_fast_reply_t ret;
75             int collect_errors;
76             CUSTOM_DECODE custom_decode;
77             int on_flags;
78             int off_flags;
79             } redis_fast_sync_cb_t;
80              
81             typedef struct redis_fast_async_cb_s {
82             SV* cb;
83             int collect_errors;
84             CUSTOM_DECODE custom_decode;
85             int on_flags;
86             int off_flags;
87             const void* command_name;
88             STRLEN command_length;
89             } redis_fast_async_cb_t;
90              
91             typedef struct redis_fast_subscribe_cb_s {
92             Redis__Fast self;
93             SV* cb;
94             } redis_fast_subscribe_cb_t;
95              
96              
97             #define WAIT_FOR_READ 0x01
98             #define WAIT_FOR_WRITE 0x02
99             typedef struct redis_fast_event_s {
100             int flags;
101             Redis__Fast self;
102             } redis_fast_event_t;
103              
104              
105 207           static void AddRead(void *privdata) {
106             redis_fast_event_t *e = (redis_fast_event_t*)privdata;
107 207           Redis__Fast self = e->self;
108 207           e->flags |= WAIT_FOR_READ;
109 207 50         DEBUG_MSG("flags = %x", e->flags);
110 207           }
111              
112 0           static void DelRead(void *privdata) {
113             redis_fast_event_t *e = (redis_fast_event_t*)privdata;
114 0           Redis__Fast self = e->self;
115 0           e->flags &= ~WAIT_FOR_READ;
116 0 0         DEBUG_MSG("flags = %x", e->flags);
117 0           }
118              
119 136           static void AddWrite(void *privdata) {
120             redis_fast_event_t *e = (redis_fast_event_t*)privdata;
121 136           Redis__Fast self = e->self;
122 136           e->flags |= WAIT_FOR_WRITE;
123 136 50         DEBUG_MSG("flags = %x", e->flags);
124 136           }
125              
126 140           static void DelWrite(void *privdata) {
127             redis_fast_event_t *e = (redis_fast_event_t*)privdata;
128 140           Redis__Fast self = e->self;
129 140           e->flags &= ~WAIT_FOR_WRITE;
130 140 50         DEBUG_MSG("flags = %x", e->flags);
131 140           }
132              
133 68           static void Cleanup(void *privdata) {
134 68           free(privdata);
135 68           }
136              
137 68           static int Attach(redisAsyncContext *ac) {
138 68           Redis__Fast self = (Redis__Fast)ac->data;
139             redis_fast_event_t *e;
140              
141             /* Nothing should be attached when something is already attached */
142 68 50         if (ac->ev.data != NULL)
143             return REDIS_ERR;
144              
145             /* Create container for context and r/w events */
146 68           e = (redis_fast_event_t*)malloc(sizeof(*e));
147 68           e->flags = 0;
148 68           e->self = self;
149              
150             /* Register functions to start/stop listening for events */
151 68           ac->ev.addRead = AddRead;
152 68           ac->ev.delRead = DelRead;
153 68           ac->ev.addWrite = AddWrite;
154 68           ac->ev.delWrite = DelWrite;
155 68           ac->ev.cleanup = Cleanup;
156 68           ac->ev.data = e;
157              
158 68           return REDIS_OK;
159             }
160              
161 62791           static int wait_for_event(pTHX_ Redis__Fast self, double read_timeout, double write_timeout) {
162             redisContext *c;
163             int fd;
164             redis_fast_event_t *e;
165             struct pollfd pollfd;
166             int rc;
167             double timeout = -1;
168             int timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
169             int ms;
170              
171 62791 50         if(self==NULL) return WAIT_FOR_EVENT_EXCEPTION;
172 62791 50         if(self->ac==NULL) return WAIT_FOR_EVENT_EXCEPTION;
173              
174             c = &(self->ac->c);
175 62791           fd = c->fd;
176 62791           e = (redis_fast_event_t*)self->ac->ev.data;
177 62791 50         if(e==NULL) return 0;
178              
179 62791 100         if((e->flags & (WAIT_FOR_READ|WAIT_FOR_WRITE)) == (WAIT_FOR_READ|WAIT_FOR_WRITE)) {
180 68 50         DEBUG_MSG("set READ and WRITE, compare read_timeout = %f and write_timeout = %f",
181             read_timeout, write_timeout);
182 68 100         if(read_timeout < 0 && write_timeout < 0) {
    50          
183             timeout = -1;
184             timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
185             } else if(read_timeout < 0) {
186             timeout = write_timeout;
187             timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
188 2 50         } else if(write_timeout < 0) {
189             timeout = read_timeout;
190             timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
191 0 0         } else if(read_timeout < write_timeout) {
192             timeout = read_timeout;
193             timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
194             } else {
195             timeout = write_timeout;
196             timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
197             }
198 62723 100         } else if(e->flags & WAIT_FOR_READ) {
199 68 50         DEBUG_MSG("set READ, read_timeout = %f", read_timeout);
200             timeout = read_timeout;
201             timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
202 62655 50         } else if(e->flags & WAIT_FOR_WRITE) {
203 62655 50         DEBUG_MSG("set WRITE, write_timeout = %f", write_timeout);
204             timeout = write_timeout;
205             timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
206             }
207              
208 62791           START_POLL:
209 62791 100         if (timeout < 0) {
210             ms = -1;
211             } else {
212 4           ms = (int)(timeout * 1000 + 0.999);
213             }
214 62791 50         DEBUG_MSG("select start, timeout is %f", timeout);
215 62791           pollfd.fd = fd;
216 62791           pollfd.events = 0;
217 62791           pollfd.revents = 0;
218 62791 100         if(e->flags & WAIT_FOR_READ) { pollfd.events |= POLLIN; }
219 62791 100         if(e->flags & WAIT_FOR_WRITE) { pollfd.events |= POLLOUT; }
220             rc = poll(&pollfd, 1, ms);
221 62791 50         DEBUG_MSG("poll returns %d", rc);
222 62791 100         if(rc == 0) {
223 1 50         DEBUG_MSG("%s", "timeout");
224 1           return timeout_mode;
225             }
226              
227 62790 50         if(rc < 0) {
228 0 0         DEBUG_MSG("exception: %s", strerror(errno));
229 0 0         if( errno == EINTR ) {
230 0 0         PERL_ASYNC_CHECK();
231 0 0         DEBUG_MSG("%s", "recieved interrupt. retry wait_for_event");
232 0           goto START_POLL;
233             }
234             return WAIT_FOR_EVENT_EXCEPTION;
235             }
236 62790 50         if(self->ac && (pollfd.revents & POLLIN) != 0) {
    100          
237 67 50         DEBUG_MSG("ready to %s", "read");
238 67           redisAsyncHandleRead(self->ac);
239             }
240 62790 100         if(self->ac && (pollfd.revents & (POLLOUT|POLLHUP)) != 0) {
    100          
241 62727 50         DEBUG_MSG("ready to %s", "write");
242 62727           redisAsyncHandleWrite(self->ac);
243             }
244 62790 50         if((pollfd.revents & (POLLERR|POLLNVAL)) != 0) {
245 0 0         DEBUG_MSG(
    0          
    0          
246             "exception: %s%s",
247             (pollfd.revents & POLLERR) ? "POLLERR " : "",
248             (pollfd.revents & POLLNVAL) ? "POLLNVAL " : "");
249 0           return WAIT_FOR_EVENT_EXCEPTION;
250             }
251              
252 62790 50         DEBUG_MSG("%s", "finish");
253             return WAIT_FOR_EVENT_OK;
254             }
255              
256              
257 130           static int _wait_all_responses(pTHX_ Redis__Fast self) {
258 130 50         DEBUG_MSG("%s", "start");
259 265 100         while(self->ac && self->ac->replies.tail) {
    100          
260 136           int res = wait_for_event(aTHX_ self, self->read_timeout, self->write_timeout);
261 136 100         if (res != WAIT_FOR_EVENT_OK) {
262 1 50         DEBUG_MSG("error: %d", res);
263 1           return res;
264             }
265             }
266 129 50         DEBUG_MSG("%s", "finish");
267             return WAIT_FOR_EVENT_OK;
268             }
269              
270              
271 68           static void Redis__Fast_connect_cb(redisAsyncContext* c, int status) {
272 68           Redis__Fast self = (Redis__Fast)c->data;
273 68 50         DEBUG_MSG("connected status = %d", status);
274 68 50         if(status != REDIS_OK) {
275             // Connection Error!!
276             // Redis context will close automatically
277 0           self->ac = NULL;
278             } else {
279 68           self->is_connected = 1;
280             }
281 68           }
282              
283 68           static void Redis__Fast_disconnect_cb(redisAsyncContext* c, int status) {
284 68           Redis__Fast self = (Redis__Fast)c->data;
285             PERL_UNUSED_VAR(status);
286 68 50         DEBUG_MSG("disconnected status = %d", status);
287 68           self->ac = NULL;
288 68           }
289              
290 69           static redisAsyncContext* __build_sock(pTHX_ Redis__Fast self)
291             {
292             redisAsyncContext *ac;
293             double timeout;
294             int res;
295              
296 69 50         DEBUG_MSG("%s", "start");
297              
298 69 50         if(self->on_build_sock) {
299 0           dSP;
300              
301 0           ENTER;
302 0           SAVETMPS;
303              
304 0 0         PUSHMARK(SP);
305 0           call_sv(self->on_build_sock, G_DISCARD | G_NOARGS);
306              
307 0 0         FREETMPS;
308 0           LEAVE;
309             }
310              
311 69 100         if(self->path) {
312 66           ac = redisAsyncConnectUnix(self->path);
313             } else {
314 3           ac = redisAsyncConnect(self->hostname, self->port);
315             }
316              
317 69 50         if(ac == NULL) {
318 0 0         DEBUG_MSG("%s", "allocation error");
319 0           return NULL;
320             }
321 69 50         if(ac->err) {
322 0 0         DEBUG_MSG("connection error: %s", ac->errstr);
323 0           redisAsyncFree(ac);
324 0           return NULL;
325             }
326              
327 69 100         if(self->ssl){
328             redisSSLContext* ssl_context;
329 1           redisSSLContextError ssl_error = REDIS_SSL_CTX_NONE;
330 1           redisSSLOptions options = {
331             .cacert_filename = NULL,
332             .capath = NULL,
333             .cert_filename = NULL,
334             .private_key_filename = NULL,
335 1           .server_name = self->hostname,
336 1           .verify_mode = self->ssl_verify_mode,
337             };
338 1           ssl_context = redisCreateSSLContextWithOptions(&options, &ssl_error);
339              
340 1 50         if(ssl_context == NULL || ssl_error != REDIS_SSL_CTX_NONE) {
    50          
341 0 0         DEBUG_MSG("ssl context error: %s", redisSSLContextGetError(ssl_error));
342 0           redisAsyncFree(ac);
343 0           redisFreeSSLContext(ssl_context);
344 1           return NULL;
345             }
346              
347 1 50         if (redisInitiateSSLWithContext(&ac->c, ssl_context) != REDIS_OK) {
348 1 50         DEBUG_MSG("ssl connection error: %s", ac->c.errstr);
349 1           redisAsyncFree(ac);
350 1           redisFreeSSLContext(ssl_context);
351 1           return NULL;
352             }
353 0 0         DEBUG_MSG("%s", "ssl connection setup");
354             }
355              
356 68           ac->data = (void*)self;
357 68           self->ac = ac;
358 68           self->is_connected = 0;
359              
360 68           Attach(ac);
361 68           redisAsyncSetConnectCallback(ac, (redisConnectCallback*)Redis__Fast_connect_cb);
362 68           redisAsyncSetDisconnectCallback(ac, (redisDisconnectCallback*)Redis__Fast_disconnect_cb);
363              
364             // wait to connect...
365             timeout = -1;
366 68 50         if(self->cnx_timeout) {
367             timeout = self->cnx_timeout;
368             }
369 62723 100         while(!self->is_connected) {
370 62655           res = wait_for_event(aTHX_ self, timeout, timeout);
371 62655 50         if(self->ac == NULL) {
372             // set is_connected flag to reconnect.
373             // see https://github.com/shogo82148/Redis-Fast/issues/73
374 0           self->is_connected = 1;
375              
376 0           return NULL;
377             }
378 62655 50         if(res != WAIT_FOR_EVENT_OK) {
379 0 0         DEBUG_MSG("error: %d", res);
380              
381             // free the redis context
382 0           redisAsyncFree(self->ac);
383 0           _wait_all_responses(aTHX_ self);
384 0           self->ac = NULL;
385              
386             // set is_connected flag to reconnect.
387             // see https://github.com/shogo82148/Redis-Fast/issues/73
388 0           self->is_connected = 1;
389              
390 0           return NULL;
391             }
392             }
393 68 50         if(self->on_connect){
394 68           dSP;
395 68 50         PUSHMARK(SP);
396 68           call_sv(self->on_connect, G_DISCARD | G_NOARGS);
397             }
398              
399 68 50         DEBUG_MSG("%s", "finish");
400 68           return self->ac;
401             }
402              
403              
404 69           static void Redis__Fast_connect(pTHX_ Redis__Fast self) {
405             struct timeval start, end;
406              
407 69 50         DEBUG_MSG("%s", "start");
408              
409 69           self->flags = 0;
410              
411             //$self->{queue} = [];
412 69           self->pid = getpid();
413              
414 69 100         if(self->reconnect == 0) {
415 3 100         if(! __build_sock(aTHX_ self)) {
416 1 50         if(self->path) {
417 0           snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s", self->path);
418             } else {
419 1           snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s:%d", self->hostname, self->port);
420             }
421 1           croak("%s", self->error);
422             }
423 68           return ;
424             }
425              
426             // Reconnect...
427 66           gettimeofday(&start, NULL);
428 0           while (1) {
429             double elapsed_time;
430 66 50         if(__build_sock(aTHX_ self)) {
431             // Connected!
432 66 50         DEBUG_MSG("%s", "finish");
433 66           return;
434             }
435 0           gettimeofday(&end, NULL);
436 0           elapsed_time = (end.tv_sec-start.tv_sec) + 1E-6 * (end.tv_usec-start.tv_usec);
437 0 0         DEBUG_MSG("elapsed time:%f, reconnect:%lf", elapsed_time, self->reconnect);
438 0 0         if( elapsed_time > self->reconnect) {
439 0 0         if(self->path) {
440 0           snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s", self->path);
441             } else {
442 0           snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s:%d", self->hostname, self->port);
443             }
444 0 0         DEBUG_MSG("%s", "timed out");
445 0           croak("%s", self->error);
446             return;
447             }
448 0 0         DEBUG_MSG("%s", "failed to connect. wait...");
449 0           usleep(self->every);
450             }
451             DEBUG_MSG("%s", "finish");
452             }
453              
454             // reconnect if the current connection is closed.
455             // the caller must check self->ac != 0 to continue.
456 71           static void Redis__Fast_reconnect(pTHX_ Redis__Fast self) {
457 71 50         DEBUG_MSG("%s", "start");
458 71 50         if(self->is_connected && !self->ac && self->reconnect > 0) {
    100          
    50          
459 0 0         DEBUG_MSG("%s", "connection not found. reconnect");
460 0           Redis__Fast_connect(aTHX_ self);
461             }
462 71 100         if(!self->ac) {
463 3 50         DEBUG_MSG("%s", "Not connected to any server");
464             }
465 71 50         DEBUG_MSG("%s", "finish");
466 71           }
467              
468 140           static redis_fast_reply_t Redis__Fast_decode_reply(pTHX_ Redis__Fast self, redisReply* reply, int collect_errors) {
469             redis_fast_reply_t res = {NULL, NULL};
470              
471 140           switch (reply->type) {
472 14           case REDIS_REPLY_ERROR:
473 14           res.error = sv_2mortal(newSVpvn(reply->str, reply->len));
474 14           break;
475 51           case REDIS_REPLY_STRING:
476             case REDIS_REPLY_STATUS:
477 51           res.result = sv_2mortal(newSVpvn(reply->str, reply->len));
478 51           break;
479              
480 37           case REDIS_REPLY_INTEGER:
481 37           res.result = sv_2mortal(newSViv(reply->integer));
482 37           break;
483 18           case REDIS_REPLY_NIL:
484             res.result = &PL_sv_undef;
485 18           break;
486              
487 20           case REDIS_REPLY_ARRAY: {
488 20           AV* av = newAV();
489             size_t i;
490 20           res.result = sv_2mortal(newRV_noinc((SV*)av));
491              
492 93 100         for (i = 0; i < reply->elements; i++) {
493 73           redis_fast_reply_t elem = Redis__Fast_decode_reply(aTHX_ self, reply->element[i], collect_errors);
494 73 100         if(collect_errors) {
495 3           AV* elem_av = (AV*)sv_2mortal((SV*)newAV());
496 3 100         if(elem.result) {
497 2           av_push(elem_av, SvREFCNT_inc(elem.result));
498             } else {
499 1           av_push(elem_av, newSV(0));
500             }
501 3 100         if(elem.error) {
502 1           av_push(elem_av, SvREFCNT_inc(elem.error));
503             } else {
504 2           av_push(elem_av, newSV(0));
505             }
506 3           av_push(av, newRV_inc((SV*)elem_av));
507             } else {
508 70 100         if(elem.result) {
509 68           av_push(av, SvREFCNT_inc(elem.result));
510             } else {
511 2           av_push(av, newSV(0));
512             }
513 70 100         if(elem.error && !res.error) {
    50          
514             res.error = elem.error;
515             }
516             }
517             }
518             break;
519             }
520             }
521              
522 140           return res;
523             }
524              
525 65           static int Redis__Fast_call_reconnect_on_error(pTHX_ Redis__Fast self, redis_fast_reply_t ret, const void *command_name, STRLEN command_length) {
526             int _need_reconnect = 0;
527             struct timeval current;
528             double current_sec;
529             SV* sv_ret;
530             SV* sv_err;
531             SV* sv_cmd;
532             int count;
533              
534 65 100         if (ret.error == NULL) {
535             return _need_reconnect;
536             }
537 13 50         if (self->reconnect_on_error == NULL) {
538             return _need_reconnect;
539             }
540              
541 0           gettimeofday(¤t, NULL);
542 0           current_sec = current.tv_sec + 1E-6 * current.tv_usec;
543 0 0         if( self->next_reconnect_on_error_at < 0 ||
    0          
544             self->next_reconnect_on_error_at < current_sec) {
545 0           dSP;
546 0           ENTER;
547 0           SAVETMPS;
548              
549 0 0         sv_ret = ret.result ? ret.result : &PL_sv_undef;
550             sv_err = ret.error;
551 0           sv_cmd = sv_2mortal(newSVpvn((const char*)command_name, command_length));
552              
553 0 0         PUSHMARK(SP);
554 0 0         XPUSHs(sv_err);
555 0 0         XPUSHs(sv_ret);
556 0 0         XPUSHs(sv_cmd);
557 0           PUTBACK;
558              
559 0           count = call_sv(self->reconnect_on_error, G_SCALAR);
560              
561 0           SPAGAIN;
562              
563 0 0         if (count != 1) {
564 0           croak("[BUG] retval count should be 1\n");
565             }
566 0           _need_reconnect = POPi;
567              
568 0           PUTBACK;
569 0 0         FREETMPS;
570 0           LEAVE;
571             }
572              
573             return _need_reconnect;
574             }
575              
576 67           static void Redis__Fast_sync_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
577             dTHX;
578              
579 67           Redis__Fast self = (Redis__Fast)c->data;
580             redis_fast_sync_cb_t *cbt = (redis_fast_sync_cb_t*)privdata;
581 67 50         DEBUG_MSG("%p", (void*)privdata);
582 67 100         if(reply) {
583 66           self->flags = (self->flags | cbt->on_flags) & cbt->off_flags;
584 66 50         if(cbt->custom_decode) {
585 0           cbt->ret = (cbt->custom_decode)(aTHX_ self, (redisReply*)reply, cbt->collect_errors);
586             } else {
587 66           cbt->ret = Redis__Fast_decode_reply(aTHX_ self, (redisReply*)reply, cbt->collect_errors);
588             }
589 1 50         } else if(c->c.flags & REDIS_FREEING) {
590 1 50         DEBUG_MSG("%s", "redis freeing");
591 1           Safefree(cbt);
592             } else {
593 0 0         DEBUG_MSG("connect error: %s", c->errstr);
594 0           self->need_reconnect = 1;
595 0           cbt->ret.result = NULL;
596 0           cbt->ret.error = sv_2mortal( newSVpvn(c->errstr, strlen(c->errstr)) );
597             }
598 67 50         DEBUG_MSG("%s", "finish");
599 67           }
600              
601 1           static void Redis__Fast_async_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
602             dTHX;
603              
604 1           Redis__Fast self = (Redis__Fast)c->data;
605             redis_fast_async_cb_t *cbt = (redis_fast_async_cb_t*)privdata;
606 1 50         DEBUG_MSG("%p, %p", reply, privdata);
607 1 50         if (reply) {
608 1           self->flags = (self->flags | cbt->on_flags) & cbt->off_flags;
609              
610             {
611             redis_fast_reply_t result;
612              
613 1           dSP;
614              
615 1           ENTER;
616 1           SAVETMPS;
617              
618 1 50         if(cbt->custom_decode) {
619 0           result = (cbt->custom_decode)(aTHX_ self, (redisReply*)reply, cbt->collect_errors);
620             } else {
621 1           result = Redis__Fast_decode_reply(aTHX_ self, (redisReply*)reply, cbt->collect_errors);
622             }
623              
624 1 50         if(result.result == NULL) result.result = &PL_sv_undef;
625 1 50         if(result.error == NULL) result.error = &PL_sv_undef;
626              
627 1 50         PUSHMARK(SP);
628 1 50         XPUSHs(result.result);
629 1 50         XPUSHs(result.error);
630 1           PUTBACK;
631              
632 1           call_sv(cbt->cb, G_DISCARD);
633              
634 1 50         FREETMPS;
635 1           LEAVE;
636             }
637              
638             {
639 1 50         if (0 < self->reconnect && !self->need_reconnect
    50          
640             // Avoid useless cost when reconnect_on_error is not set.
641 1 50         && self->reconnect_on_error != NULL) {
642             redis_fast_reply_t result;
643 0 0         if(cbt->custom_decode) {
644 0           result = (cbt->custom_decode)(
645             aTHX_ self, (redisReply*)reply, cbt->collect_errors
646             );
647             } else {
648 0           result = Redis__Fast_decode_reply(
649             aTHX_ self, (redisReply*)reply, cbt->collect_errors
650             );
651             }
652 0           self->need_reconnect = Redis__Fast_call_reconnect_on_error(
653             aTHX_ self, result, cbt->command_name, cbt->command_length
654             );
655             }
656             }
657             } else {
658 0 0         if (c->c.flags & REDIS_FREEING) {
659 0 0         DEBUG_MSG("%s", "redis freeing");
660             } else {
661 0 0         DEBUG_MSG("connect error: %s", c->errstr);
662             }
663              
664             {
665             redis_fast_reply_t result;
666             const char *msg;
667              
668 0           dSP;
669              
670 0           ENTER;
671 0           SAVETMPS;
672              
673             result.result = &PL_sv_undef;
674 0 0         if (c->c.flags & REDIS_FREEING) {
675             msg = "redis freeing";
676             } else {
677 0           msg = c->errstr;
678             }
679 0 0         DEBUG_MSG("error: %s", msg);
680 0           result.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
681              
682 0 0         PUSHMARK(SP);
683 0 0         XPUSHs(result.result);
684 0 0         XPUSHs(result.error);
685 0           PUTBACK;
686              
687 0           call_sv(cbt->cb, G_DISCARD);
688              
689 0 0         FREETMPS;
690 0           LEAVE;
691             }
692             }
693              
694 1           SvREFCNT_dec(cbt->cb);
695 1           Safefree(cbt);
696 1           }
697              
698 0           static void Redis__Fast_subscribe_cb(redisAsyncContext* c, void* reply, void* privdata) {
699             dTHX;
700              
701             int is_need_free = 0;
702 0           Redis__Fast self = (Redis__Fast)c->data;
703             redis_fast_subscribe_cb_t *cbt = (redis_fast_subscribe_cb_t*)privdata;
704             redisReply* r = (redisReply*)reply;
705              
706 0 0         DEBUG_MSG("%s", "start");
707 0 0         if(!cbt) {
708 0 0         DEBUG_MSG("%s", "cbt is empty finished");
709 0           return ;
710             }
711              
712 0 0         if (r) {
713 0           char* stype = r->element[0]->str;
714 0           int pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
715             redis_fast_reply_t res;
716              
717 0           dSP;
718 0           ENTER;
719 0           SAVETMPS;
720              
721 0           res = Redis__Fast_decode_reply(aTHX_ self, r, 0);
722              
723 0 0         if (strcasecmp(stype+pvariant,"subscribe") == 0) {
724 0 0         DEBUG_MSG("%s %s %lld", r->element[0]->str, r->element[1]->str, r->element[2]->integer);
725 0           self->is_subscriber = r->element[2]->integer;
726 0           self->expected_subs--;
727 0 0         } else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
728 0 0         DEBUG_MSG("%s %s %lld", r->element[0]->str, r->element[1]->str, r->element[2]->integer);
729 0           self->is_subscriber = r->element[2]->integer;
730             is_need_free = 1;
731 0           self->expected_subs--;
732             } else {
733 0 0         DEBUG_MSG("%s %s", r->element[0]->str, r->element[1]->str);
734 0           self->proccess_sub_count++;
735             }
736              
737 0 0         if(res.result == NULL) res.result = &PL_sv_undef;
738 0 0         if(res.error == NULL) res.error = &PL_sv_undef;
739              
740 0 0         PUSHMARK(SP);
741 0 0         XPUSHs(res.result);
742 0 0         XPUSHs(res.error);
743 0           PUTBACK;
744              
745 0           call_sv(cbt->cb, G_DISCARD);
746              
747 0 0         FREETMPS;
748 0           LEAVE;
749             } else {
750 0 0         DEBUG_MSG("connect error: %s", c->errstr);
751             is_need_free = 1;
752             }
753              
754 0 0         if(is_need_free) {
755             // destroy private data
756 0 0         DEBUG_MSG("destroy %p", cbt);
757 0 0         if(cbt->cb) {
758 0           SvREFCNT_dec(cbt->cb);
759 0           cbt->cb = NULL;
760             }
761 0           Safefree(cbt);
762             }
763 0 0         DEBUG_MSG("%s", "finish");
764             }
765              
766 0           static void Redis__Fast_quit(pTHX_ Redis__Fast self) {
767             redis_fast_sync_cb_t *cbt;
768              
769 0 0         if(!self->ac) {
770             return;
771             }
772              
773 0           Newx(cbt, sizeof(redis_fast_sync_cb_t), redis_fast_sync_cb_t);
774 0           cbt->ret.result = NULL;
775 0           cbt->ret.error = NULL;
776 0           cbt->custom_decode = NULL;
777              
778             // initialize, or self->flags will be corrupted.
779 0           cbt->on_flags = 0;
780 0           cbt->off_flags = 0;
781              
782 0           redisAsyncCommand(
783             self->ac, Redis__Fast_sync_reply_cb, cbt, "QUIT"
784             );
785 0           redisAsyncDisconnect(self->ac);
786 0 0         if(_wait_all_responses(aTHX_ self) == WAIT_FOR_EVENT_OK) {
787 0 0         DEBUG_MSG("%s", "wait_all_responses ok");
788 0 0         if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
    0          
789             } else {
790 0 0         DEBUG_MSG("%s", "wait_all_responses not ok");
791 0 0         if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
    0          
792             }
793 0 0         DEBUG_MSG("%s", "finish");
794             }
795              
796 68           static redis_fast_reply_t Redis__Fast_run_cmd(pTHX_ Redis__Fast self, int collect_errors, CUSTOM_DECODE custom_decode, SV* cb, int argc, const char** argv, size_t* argvlen) {
797             redis_fast_reply_t ret = {NULL, NULL};
798             int on_flags = 0, off_flags = ~0;
799              
800 68 50         DEBUG_MSG("start %s", argv[0]);
801              
802 68 50         DEBUG_MSG("pid check: previous pid is %d, now %d", self->pid, getpid());
803 68 50         if(self->pid != getpid()) {
804 0 0         DEBUG_MSG("%s", "pid changed. create new connection..");
805 0           Redis__Fast_connect(aTHX_ self);
806             }
807              
808 68 50         if(EQUALS_COMMAND(argvlen[0], argv[0], "MULTI")) {
    0          
809             on_flags = FLAG_INSIDE_TRANSACTION;
810 68 100         } else if(EQUALS_COMMAND(argvlen[0], argv[0], "EXEC") ||
    50          
    50          
811 0 0         EQUALS_COMMAND(argvlen[0], argv[0], "DISCARD")) {
812             off_flags = ~(FLAG_INSIDE_TRANSACTION | FLAG_INSIDE_WATCH);
813 67 50         } else if(EQUALS_COMMAND(argvlen[0], argv[0], "WATCH")) {
    0          
814             on_flags = FLAG_INSIDE_WATCH;
815 67 50         } else if(EQUALS_COMMAND(argvlen[0], argv[0], "UNWATCH")) {
    0          
816             off_flags = ~FLAG_INSIDE_WATCH;
817             }
818              
819 68 100         if(cb) {
820             redis_fast_async_cb_t *cbt;
821 1           Newx(cbt, sizeof(redis_fast_async_cb_t), redis_fast_async_cb_t);
822 1           cbt->cb = SvREFCNT_inc(cb);
823 1           cbt->custom_decode = custom_decode;
824 1           cbt->collect_errors = collect_errors;
825 1           cbt->on_flags = on_flags;
826 1           cbt->off_flags = off_flags;
827 1           cbt->command_name = argv[0];
828 1           cbt->command_length = argvlen[0];
829 1           redisAsyncCommandArgv(
830             self->ac, Redis__Fast_async_reply_cb, cbt,
831             argc, argv, argvlen
832             );
833 1           ret.result = sv_2mortal(newSViv(1));
834             } else {
835             redis_fast_sync_cb_t *cbt;
836 67 100         int i, cnt = (self->reconnect == 0 ? 1 : 2);
837             int res = WAIT_FOR_EVENT_OK;
838 67 50         for(i = 0; i < cnt; i++) {
839 67           Newx(cbt, sizeof(redis_fast_sync_cb_t), redis_fast_sync_cb_t);
840 67           self->need_reconnect = 0;
841 67           cbt->ret.result = NULL;
842 67           cbt->ret.error = NULL;
843 67           cbt->custom_decode = custom_decode;
844 67           cbt->collect_errors = collect_errors;
845 67           cbt->on_flags = on_flags;
846 67           cbt->off_flags = off_flags;
847 67 50         DEBUG_MSG("%s", "send command in sync mode");
848 67           redisAsyncCommandArgv(
849             self->ac, Redis__Fast_sync_reply_cb, cbt,
850             argc, argv, argvlen
851             );
852 67 50         DEBUG_MSG("%s", "waiting response");
853 67           res = _wait_all_responses(aTHX_ self);
854 67 100         if(res == WAIT_FOR_EVENT_OK && self->need_reconnect == 0) {
    50          
855             int _need_reconnect = 0;
856 66 100         if (1 < cnt - i) {
857 65           _need_reconnect = Redis__Fast_call_reconnect_on_error(
858             aTHX_ self, cbt->ret, argv[0], argvlen[0]
859             );
860             // Should be quit before reconnect
861 65 50         if (_need_reconnect) {
862 0           Redis__Fast_quit(aTHX_ self);
863             }
864             }
865             if (!_need_reconnect) {
866 66           ret = cbt->ret;
867 66 100         if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
    50          
868 66 50         DEBUG_MSG("finish %s", argv[0]);
869 66           return ret;
870             }
871             }
872              
873 1 50         if( res == WAIT_FOR_EVENT_READ_TIMEOUT ) break;
874              
875 0 0         if(self->flags & (FLAG_INSIDE_TRANSACTION | FLAG_INSIDE_WATCH)) {
876             char *msg = "reconnect disabled inside transaction or watch";
877 0 0         DEBUG_MSG("error: %s", msg);
878 0           ret.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
879 0           return ret;
880             }
881              
882 0           Redis__Fast_reconnect(aTHX_ self);
883 0 0         if(!self->ac) {
884             char *msg = "Not connected to any server";
885 0 0         DEBUG_MSG("error: %s", msg);
886 0           ret.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
887 0           return ret;
888             }
889             }
890              
891 1 50         if( res == WAIT_FOR_EVENT_OK && (cbt->ret.result || cbt->ret.error) ) Safefree(cbt);
    0          
    0          
892             // else destructor will release cbt
893              
894 1 50         if(res == WAIT_FOR_EVENT_READ_TIMEOUT || res == WAIT_FOR_EVENT_WRITE_TIMEOUT) {
895 1 50         snprintf(self->error, MAX_ERROR_SIZE, "Error while reading from Redis server: %s", strerror(EAGAIN));
896 1           errno = EAGAIN;
897 1 50         DEBUG_MSG("error: %s", self->error);
898 1           ret.error = sv_2mortal(newSVpvn(self->error, strlen(self->error)));
899 1           return ret;
900             }
901 0 0         if(!self->ac) {
902             char *msg = "Not connected to any server";
903 0 0         DEBUG_MSG("error: %s", msg);
904 0           ret.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
905 0           return ret;
906             }
907             }
908 1 50         DEBUG_MSG("Finish %s", argv[0]);
909 1           return ret;
910             }
911              
912 0           static redis_fast_reply_t Redis__Fast_keys_custom_decode(pTHX_ Redis__Fast self, redisReply* reply, int collect_errors) {
913             // TODO: Support redis <= 1.2.6
914 0           return Redis__Fast_decode_reply(aTHX_ self, reply, collect_errors);
915             }
916              
917 0           static redis_fast_reply_t Redis__Fast_info_custom_decode(pTHX_ Redis__Fast self, redisReply* reply, int collect_errors) {
918             redis_fast_reply_t res = {NULL, NULL};
919              
920 0 0         if(reply->type == REDIS_REPLY_STRING ||
921             reply->type == REDIS_REPLY_STATUS) {
922              
923 0           HV* hv = newHV();
924 0           char* str = reply->str;
925 0           size_t len = reply->len;
926 0           res.result = sv_2mortal(newRV_noinc((SV*)hv));
927              
928 0 0         while(len != 0) {
929 0           const char* line = (char*)memchr(str, '\r', len);
930             const char* sep;
931             size_t linelen;
932 0 0         if(line == NULL) {
933             linelen = len;
934             } else {
935 0           linelen = line - str;
936             }
937 0           sep = (char*)memchr(str, ':', linelen);
938 0 0         if(str[0] != '#' && sep != NULL) {
    0          
939             SV* val;
940             SV** ret;
941             size_t keylen;
942 0           keylen = sep - str;
943 0           val = sv_2mortal(newSVpvn(sep + 1, linelen - keylen - 1));
944 0           ret = hv_store(hv, str, keylen, SvREFCNT_inc(val), 0);
945 0 0         if (ret == NULL) {
946 0           SvREFCNT_dec(val);
947 0           croak("failed to hv_store");
948             }
949             }
950 0 0         if(line == NULL) {
951             break;
952             } else {
953 0           len -= linelen + 2;
954 0           str += linelen + 2;
955             }
956             }
957             } else {
958 0           res = Redis__Fast_decode_reply(aTHX_ self, reply, collect_errors);
959             }
960              
961 0           return res;
962             }
963              
964             MODULE = Redis::Fast PACKAGE = Redis::Fast
965              
966             SV*
967             _new(char* cls);
968             PREINIT:
969             redis_fast_t* self;
970             CODE:
971             {
972 70           redisInitOpenSSL();
973 70           Newxz(self, sizeof(redis_fast_t), redis_fast_t);
974 70 50         DEBUG_MSG("%s", "start");
975 70           self->error = (char*)malloc(MAX_ERROR_SIZE);
976 70           self->reconnect_on_error = NULL;
977 70           self->next_reconnect_on_error_at = -1;
978 70           self->is_connected = 1;
979 70           ST(0) = sv_newmortal();
980 70           sv_setref_pv(ST(0), cls, (void*)self);
981 70 50         DEBUG_MSG("return %p", ST(0));
982 70           XSRETURN(1);
983             }
984             OUTPUT:
985             RETVAL
986              
987             double
988             __set_reconnect(Redis::Fast self, double val)
989             CODE:
990             {
991 206 100         RETVAL = self->reconnect = val;
992             }
993             OUTPUT:
994             RETVAL
995              
996              
997             double
998             __get_reconnect(Redis::Fast self)
999             CODE:
1000             {
1001 68 50         RETVAL = self->reconnect;
1002             }
1003             OUTPUT:
1004             RETVAL
1005              
1006              
1007             int
1008             __set_every(Redis::Fast self, int val)
1009             CODE:
1010             {
1011 70 100         RETVAL = self->every = val;
1012             }
1013             OUTPUT:
1014             RETVAL
1015              
1016              
1017             int
1018             __get_every(Redis::Fast self)
1019             CODE:
1020             {
1021 0 0         RETVAL = self->every;
1022             }
1023             OUTPUT:
1024             RETVAL
1025              
1026             int
1027             __set_debug(Redis::Fast self, int val)
1028             CODE:
1029             {
1030 70 100         RETVAL = self->debug = val;
1031             }
1032             OUTPUT:
1033             RETVAL
1034              
1035             double
1036             __set_cnx_timeout(Redis::Fast self, double val)
1037             CODE:
1038             {
1039 70 100         RETVAL = self->cnx_timeout = val;
1040             }
1041             OUTPUT:
1042             RETVAL
1043              
1044             double
1045             __get_cnx_timeout(Redis::Fast self)
1046             CODE:
1047             {
1048 0 0         RETVAL = self->cnx_timeout;
1049             }
1050             OUTPUT:
1051             RETVAL
1052              
1053              
1054             double
1055             __set_read_timeout(Redis::Fast self, double val)
1056             CODE:
1057             {
1058 70 100         RETVAL = self->read_timeout = val;
1059             }
1060             OUTPUT:
1061             RETVAL
1062              
1063             double
1064             __get_read_timeout(Redis::Fast self)
1065             CODE:
1066             {
1067 0 0         RETVAL = self->read_timeout;
1068             }
1069             OUTPUT:
1070             RETVAL
1071              
1072              
1073             double
1074             __set_write_timeout(Redis::Fast self, double val)
1075             CODE:
1076             {
1077 70 100         RETVAL = self->write_timeout = val;
1078             }
1079             OUTPUT:
1080             RETVAL
1081              
1082             double
1083             __get_write_timeout(Redis::Fast self)
1084             CODE:
1085             {
1086 0 0         RETVAL = self->write_timeout;
1087             }
1088             OUTPUT:
1089             RETVAL
1090              
1091              
1092             int
1093             __set_current_database(Redis::Fast self, int val)
1094             CODE:
1095             {
1096 0 0         RETVAL = self->current_database = val;
1097             }
1098             OUTPUT:
1099             RETVAL
1100              
1101              
1102             int
1103             __get_current_database(Redis::Fast self)
1104             CODE:
1105             {
1106 0 0         RETVAL = self->current_database;
1107             }
1108             OUTPUT:
1109             RETVAL
1110              
1111              
1112             int
1113             __sock(Redis::Fast self)
1114             CODE:
1115             {
1116 0 0         RETVAL = self->ac ? self->ac->c.fd : 0;
1117             }
1118             OUTPUT:
1119             RETVAL
1120              
1121              
1122             int
1123             __get_port(Redis::Fast self)
1124             CODE:
1125             {
1126             struct sockaddr_in addr;
1127             socklen_t len;
1128 0           len = sizeof( addr );
1129 0           getsockname( self->ac->c.fd, ( struct sockaddr *)&addr, &len );
1130 0 0         RETVAL = addr.sin_port;
1131             }
1132             OUTPUT:
1133             RETVAL
1134              
1135              
1136             void
1137             __set_on_connect(Redis::Fast self, SV* func)
1138             CODE:
1139             {
1140 70           self->on_connect = SvREFCNT_inc(func);
1141             }
1142              
1143             void
1144             __set_on_build_sock(Redis::Fast self, SV* func)
1145             CODE:
1146             {
1147 0           self->on_build_sock = SvREFCNT_inc(func);
1148             }
1149              
1150             void
1151             __set_data(Redis::Fast self, SV* data)
1152             CODE:
1153             {
1154 70           self->data = SvREFCNT_inc(data);
1155             }
1156              
1157             void
1158             __get_data(Redis::Fast self)
1159             CODE:
1160             {
1161 272           ST(0) = self->data;
1162 272           XSRETURN(1);
1163             }
1164              
1165             void
1166             __set_reconnect_on_error(Redis::Fast self, SV* func)
1167             CODE:
1168             {
1169 0           self->reconnect_on_error = SvREFCNT_inc(func);
1170             }
1171              
1172             double
1173             __set_next_reconnect_on_error_at(Redis::Fast self, double val)
1174             CODE:
1175             {
1176             struct timeval current;
1177             double current_sec;
1178              
1179 0 0         if ( -1 < val ) {
1180 0           gettimeofday(¤t, NULL);
1181 0           current_sec = current.tv_sec + 1E-6 * current.tv_usec;
1182 0           val += current_sec;
1183             }
1184              
1185 0 0         RETVAL = self->next_reconnect_on_error_at = val;
1186             }
1187             OUTPUT:
1188             RETVAL
1189              
1190             int
1191             __set_ssl(Redis::Fast self, int ssl)
1192             CODE:
1193             {
1194 70 100         RETVAL = self->ssl = ssl;
1195             }
1196             OUTPUT:
1197             RETVAL
1198              
1199             int
1200             __set_ssl_verify_mode(Redis::Fast self, char* verify_mode)
1201             CODE:
1202             {
1203 0 0         if (verify_mode != NULL) {
1204 0 0         DEBUG_MSG("SSL verify mode: %s", verify_mode);
1205 0 0         if (strcmp(verify_mode, "SSL_VERIFY_NONE") == 0 ) {
1206 0           RETVAL = self->ssl_verify_mode = REDIS_SSL_VERIFY_NONE;
1207 0 0         } else if (strcmp(verify_mode, "SSL_VERIFY_PEER") == 0 ) {
1208 0           RETVAL = self->ssl_verify_mode = REDIS_SSL_VERIFY_PEER;
1209 0 0         } else if (strcmp(verify_mode, "SSL_VERIFY_FAIL_IF_NO_PEER_CERT") == 0 ) {
1210 0           RETVAL = self->ssl_verify_mode = REDIS_SSL_VERIFY_FAIL_IF_NO_PEER_CERT;
1211 0 0         } else if (strcmp(verify_mode, "SSL_VERIFY_CLIENT_ONCE") == 0 ) {
1212 0           RETVAL = self->ssl_verify_mode = REDIS_SSL_VERIFY_CLIENT_ONCE;
1213             } else {
1214 0 0         DEBUG_MSG("Invalid SSL verify mode (%s), setting SSL_VERIFY_PEER", verify_mode);
1215 0           RETVAL = self->ssl_verify_mode = REDIS_SSL_VERIFY_PEER;
1216             }
1217             } else {
1218 0           RETVAL = self->ssl_verify_mode = REDIS_SSL_VERIFY_PEER;
1219             }
1220             }
1221             OUTPUT:
1222             RETVAL
1223              
1224             void
1225             is_subscriber(Redis::Fast self)
1226             CODE:
1227             {
1228 139           ST(0) = sv_2mortal(newSViv(self->is_subscriber));
1229 139           XSRETURN(1);
1230             }
1231              
1232              
1233             void
1234             DESTROY(Redis::Fast self);
1235             CODE:
1236             {
1237 70 50         DEBUG_MSG("%s", "start");
1238 70 100         if (self->ac) {
1239 62 50         DEBUG_MSG("%s", "free ac");
1240 62           redisAsyncFree(self->ac);
1241 62           _wait_all_responses(aTHX_ self);
1242 62           self->ac = NULL;
1243             }
1244              
1245 70 100         if(self->hostname) {
1246 4 50         DEBUG_MSG("%s", "free hostname");
1247 4           free(self->hostname);
1248 4           self->hostname = NULL;
1249             }
1250              
1251 70 100         if(self->path) {
1252 66 50         DEBUG_MSG("%s", "free path");
1253 66           free(self->path);
1254 66           self->path = NULL;
1255             }
1256              
1257 70 50         if(self->error) {
1258 70 50         DEBUG_MSG("%s", "free error");
1259 70           free(self->error);
1260 70           self->error = NULL;
1261             }
1262              
1263 70 50         if(self->on_connect) {
1264 70 50         DEBUG_MSG("%s", "free on_connect");
1265 70           SvREFCNT_dec(self->on_connect);
1266 70           self->on_connect = NULL;
1267             }
1268              
1269 70 50         if(self->on_build_sock) {
1270 0 0         DEBUG_MSG("%s", "free on_build_sock");
1271 0           SvREFCNT_dec(self->on_build_sock);
1272 0           self->on_build_sock = NULL;
1273             }
1274              
1275 70 50         if(self->data) {
1276 70 50         DEBUG_MSG("%s", "free data");
1277 70           SvREFCNT_dec(self->data);
1278 70           self->data = NULL;
1279             }
1280              
1281 70 50         if(self->reconnect_on_error) {
1282 0 0         DEBUG_MSG("%s", "free reconnect_on_error");
1283 0           SvREFCNT_dec(self->reconnect_on_error);
1284 0           self->reconnect_on_error = NULL;
1285             }
1286              
1287 70 50         DEBUG_MSG("%s", "finish");
1288 70           Safefree(self);
1289             }
1290              
1291              
1292             void
1293             __connection_info(Redis::Fast self, char* hostname, int port = 6379)
1294             CODE:
1295             {
1296 4 50         if(self->hostname) {
1297 0           free(self->hostname);
1298 0           self->hostname = NULL;
1299             }
1300              
1301 4 50         if(self->path) {
1302 0           free(self->path);
1303 0           self->path = NULL;
1304             }
1305              
1306 4 50         if(hostname) {
1307 4           self->hostname = (char*)malloc(strlen(hostname) + 1);
1308             strcpy(self->hostname, hostname);
1309             }
1310              
1311 4           self->port = port;
1312             }
1313              
1314             void
1315             __connection_info_unix(Redis::Fast self, char* path)
1316             CODE:
1317             {
1318 66 50         if(self->hostname) {
1319 0           free(self->hostname);
1320 0           self->hostname = NULL;
1321             }
1322              
1323 66 50         if(self->path) {
1324 0           free(self->path);
1325 0           self->path = NULL;
1326             }
1327              
1328 66 50         if(path) {
1329 66           self->path = (char*)malloc(strlen(path) + 1);
1330             strcpy(self->path, path);
1331             }
1332             }
1333              
1334              
1335             void
1336             connect(Redis::Fast self)
1337             CODE:
1338             {
1339 69           Redis__Fast_connect(aTHX_ self);
1340             }
1341              
1342             void
1343             wait_all_responses(Redis::Fast self)
1344             CODE:
1345             {
1346 1           int res = _wait_all_responses(aTHX_ self);
1347 1 50         if(res != WAIT_FOR_EVENT_OK) {
1348 0           croak("Error while reading from Redis server");
1349             }
1350              
1351 1 50         if (0 < self->reconnect && self->need_reconnect) {
    50          
1352             // Should be quit before reconnect
1353 0           Redis__Fast_quit(aTHX_ self);
1354 0           Redis__Fast_reconnect(aTHX_ self);
1355 0           self->need_reconnect = 0;
1356             }
1357             }
1358              
1359             void
1360             wait_one_response(Redis::Fast self)
1361             CODE:
1362             {
1363 0           int res = _wait_all_responses(aTHX_ self);
1364 0 0         if(res != WAIT_FOR_EVENT_OK) {
1365 0           croak("Error while reading from Redis server");
1366             }
1367              
1368 0 0         if (0 < self->reconnect && self->need_reconnect) {
    0          
1369             // Should be quit before reconnect
1370 0           Redis__Fast_quit(aTHX_ self);
1371 0           Redis__Fast_reconnect(aTHX_ self);
1372 0           self->need_reconnect = 0;
1373             }
1374             }
1375              
1376             void
1377             __std_cmd(Redis::Fast self, ...)
1378             PREINIT:
1379             redis_fast_reply_t ret;
1380             SV* cb;
1381             char** argv;
1382             size_t* argvlen;
1383             STRLEN len;
1384             int argc, i, collect_errors;
1385             CODE:
1386             {
1387 68           Redis__Fast_reconnect(aTHX_ self);
1388 68 50         if(!self->ac) {
1389 0           croak("Not connected to any server");
1390             }
1391              
1392 68           cb = ST(items - 1);
1393 68 100         if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
    50          
1394 1           argc = items - 2;
1395             } else {
1396             cb = NULL;
1397 67           argc = items - 1;
1398             }
1399 68 50         Newx(argv, sizeof(char*) * argc, char*);
1400 68           Newx(argvlen, sizeof(size_t) * argc, size_t);
1401              
1402 204 100         for (i = 0; i < argc; i++) {
1403 136 50         if(!sv_utf8_downgrade(ST(i + 1), 1)) {
1404 0           croak("command sent is not an octet sequence in the native encoding (Latin-1). Consider using debug mode to see the command itself.");
1405             }
1406 136           argv[i] = SvPV(ST(i + 1), len);
1407 136           argvlen[i] = len;
1408             }
1409              
1410             collect_errors = 0;
1411 68 100         if(cb && EQUALS_COMMAND(argvlen[0], argv[0], "EXEC"))
    50          
    50          
1412             collect_errors = 1;
1413              
1414 68           ret = Redis__Fast_run_cmd(aTHX_ self, collect_errors, NULL, cb, argc, (const char**)argv, argvlen);
1415              
1416 68           Safefree(argv);
1417 68           Safefree(argvlen);
1418              
1419 68 100         ST(0) = ret.result ? ret.result : &PL_sv_undef;
1420 68 100         ST(1) = ret.error ? ret.error : &PL_sv_undef;
1421 68           XSRETURN(2);
1422             }
1423              
1424              
1425             void
1426             __quit(Redis::Fast self)
1427             CODE:
1428             {
1429 0 0         DEBUG_MSG("%s", "start QUIT");
1430 0 0         if(self->ac) {
1431 0           Redis__Fast_quit(aTHX_ self);
1432 0           ST(0) = sv_2mortal(newSViv(1));
1433 0           XSRETURN(1);
1434             } else {
1435 0 0         DEBUG_MSG("%s", "finish. there is no connection.");
1436 0           XSRETURN(0);
1437             }
1438             }
1439              
1440              
1441             void
1442             __shutdown(Redis::Fast self)
1443             CODE:
1444             {
1445 0 0         DEBUG_MSG("%s", "start SHUTDOWN");
1446 0 0         if(self->ac) {
1447 0           redisAsyncCommand(
1448             self->ac, NULL, NULL, "SHUTDOWN"
1449             );
1450 0           redisAsyncDisconnect(self->ac);
1451 0           _wait_all_responses(aTHX_ self);
1452 0           self->is_connected = 0;
1453 0           ST(0) = sv_2mortal(newSViv(1));
1454 0           XSRETURN(1);
1455             } else {
1456 0 0         DEBUG_MSG("%s", "redis server has alread shutdown");
1457 0           XSRETURN(0);
1458             }
1459             }
1460              
1461              
1462             void
1463             __keys(Redis::Fast self, ...)
1464             PREINIT:
1465             redis_fast_reply_t ret;
1466             SV* cb;
1467             char** argv;
1468             size_t* argvlen;
1469             STRLEN len;
1470             int argc, i;
1471             CODE:
1472             {
1473 0           Redis__Fast_reconnect(aTHX_ self);
1474 0 0         if(!self->ac) {
1475 0           croak("Not connected to any server");
1476             }
1477              
1478 0           cb = ST(items - 1);
1479 0 0         if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
    0          
1480 0           argc = items - 1;
1481             } else {
1482             cb = NULL;
1483             argc = items;
1484             }
1485 0           Newx(argv, sizeof(char*) * argc, char*);
1486 0           Newx(argvlen, sizeof(size_t) * argc, size_t);
1487              
1488 0           argv[0] = "KEYS";
1489 0           argvlen[0] = 4;
1490 0 0         for (i = 1; i < argc; i++) {
1491 0           argv[i] = SvPV(ST(i), len);
1492 0           argvlen[i] = len;
1493             }
1494              
1495 0           ret = Redis__Fast_run_cmd(aTHX_ self, 0, Redis__Fast_keys_custom_decode, cb, argc, (const char**)argv, argvlen);
1496 0           Safefree(argv);
1497 0           Safefree(argvlen);
1498              
1499 0 0         ST(0) = ret.result ? ret.result : &PL_sv_undef;
1500 0 0         ST(1) = ret.error ? ret.error : &PL_sv_undef;
1501 0           XSRETURN(2);
1502             }
1503              
1504              
1505             void
1506             __info(Redis::Fast self, ...)
1507             PREINIT:
1508             redis_fast_reply_t ret;
1509             SV* cb;
1510             char** argv;
1511             size_t* argvlen;
1512             STRLEN len;
1513             int argc, i;
1514             CODE:
1515             {
1516 3           Redis__Fast_reconnect(aTHX_ self);
1517 3 50         if(!self->ac) {
1518             char *msg = "Not connected to any server";
1519 3           ST(0) = &PL_sv_undef;
1520 3           ST(1) = sv_2mortal(newSVpvn(msg, strlen(msg)));
1521 3           XSRETURN(2);
1522             }
1523              
1524 0           cb = ST(items - 1);
1525 0 0         if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
    0          
1526 0           argc = items - 1;
1527             } else {
1528             cb = NULL;
1529             argc = items;
1530             }
1531 0           Newx(argv, sizeof(char*) * argc, char*);
1532 0           Newx(argvlen, sizeof(size_t) * argc, size_t);
1533              
1534 0           argv[0] = "INFO";
1535 0           argvlen[0] = 4;
1536 0 0         for (i = 1; i < argc; i++) {
1537 0           argv[i] = SvPV(ST(i), len);
1538 0           argvlen[i] = len;
1539             }
1540              
1541 0           ret = Redis__Fast_run_cmd(aTHX_ self, 0, Redis__Fast_info_custom_decode, cb, argc, (const char**)argv, argvlen);
1542 0           Safefree(argv);
1543 0           Safefree(argvlen);
1544              
1545 0 0         ST(0) = ret.result ? ret.result : &PL_sv_undef;
1546 0 0         ST(1) = ret.error ? ret.error : &PL_sv_undef;
1547 0           XSRETURN(2);
1548             }
1549              
1550              
1551             void
1552             __send_subscription_cmd(Redis::Fast self, ...)
1553             PREINIT:
1554             SV* cb;
1555             char** argv;
1556             size_t* argvlen;
1557             STRLEN len;
1558             int argc, i;
1559             redis_fast_subscribe_cb_t* cbt;
1560             int pvariant;
1561             CODE:
1562             {
1563 0 0         int cnt = (self->reconnect == 0 ? 1 : 2);
1564              
1565 0 0         DEBUG_MSG("%s", "start");
1566              
1567 0           Redis__Fast_reconnect(aTHX_ self);
1568 0 0         if(!self->ac) {
1569 0           croak("Not connected to any server");
1570             }
1571              
1572 0 0         if(!self->is_subscriber) {
1573 0           _wait_all_responses(aTHX_ self);
1574             }
1575 0           cb = ST(items - 1);
1576 0 0         if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
    0          
1577 0           argc = items - 2;
1578             } else {
1579             cb = NULL;
1580 0           argc = items - 1;
1581             }
1582 0 0         Newx(argv, sizeof(char*) * argc, char*);
1583 0           Newx(argvlen, sizeof(size_t) * argc, size_t);
1584              
1585 0 0         for (i = 0; i < argc; i++) {
1586 0           argv[i] = SvPV(ST(i+1), len);
1587 0           argvlen[i] = len;
1588 0 0         DEBUG_MSG("argv[%d] = %s", i, argv[i]);
1589             }
1590              
1591 0 0         for(i = 0; i < cnt; i++) {
1592 0           pvariant = tolower(argv[0][0]) == 'p';
1593 0 0         if (strcasecmp(argv[0]+pvariant,"unsubscribe") != 0) {
1594 0 0         DEBUG_MSG("%s", "command is not unsubscribe");
1595 0           Newx(cbt, sizeof(redis_fast_subscribe_cb_t), redis_fast_subscribe_cb_t);
1596 0 0         cbt->self = self;
1597 0           cbt->cb = SvREFCNT_inc(cb);
1598             } else {
1599 0 0         DEBUG_MSG("%s", "command is unsubscribe");
1600             cbt = NULL;
1601             }
1602 0           redisAsyncCommandArgv(
1603             self->ac, cbt ? Redis__Fast_subscribe_cb : NULL, cbt,
1604             argc, (const char**)argv, argvlen
1605             );
1606 0           self->expected_subs = argc - 1;
1607 0 0         while(self->expected_subs > 0 && wait_for_event(aTHX_ self, self->read_timeout, self->write_timeout) == WAIT_FOR_EVENT_OK) ;
    0          
1608 0 0         if(self->expected_subs == 0) break;
1609 0           Redis__Fast_reconnect(aTHX_ self);
1610 0 0         if(!self->ac) {
1611 0           Safefree(argv);
1612 0           Safefree(argvlen);
1613 0           croak("Not connected to any server");
1614             }
1615             }
1616              
1617 0           Safefree(argv);
1618 0           Safefree(argvlen);
1619 0 0         DEBUG_MSG("%s", "finish");
1620 0           XSRETURN(0);
1621             }
1622              
1623             void
1624             wait_for_messages(Redis::Fast self, double timeout = -1)
1625             CODE:
1626             {
1627 0 0         int i, cnt = (self->reconnect == 0 ? 1 : 2);
1628             int res = WAIT_FOR_EVENT_OK;
1629 0 0         DEBUG_MSG("%s", "start");
1630 0           self->proccess_sub_count = 0;
1631 0 0         for(i = 0; i < cnt; i++) {
1632 0 0         while((res = wait_for_event(aTHX_ self, timeout, timeout)) == WAIT_FOR_EVENT_OK) ;
1633 0 0         if(res == WAIT_FOR_EVENT_READ_TIMEOUT || res == WAIT_FOR_EVENT_WRITE_TIMEOUT) break;
1634 0           Redis__Fast_reconnect(aTHX_ self);
1635 0 0         if(!self->ac) {
1636 0           croak("Not connected to any server");
1637             }
1638             }
1639 0 0         if(res == WAIT_FOR_EVENT_EXCEPTION) {
1640 0 0         if(!self->ac) {
1641 0 0         DEBUG_MSG("%s", "Connection not found");
1642 0           croak("EOF from server");
1643 0 0         } else if(self->ac->c.err == REDIS_ERR_EOF) {
1644 0 0         DEBUG_MSG("hiredis returns error: %s", self->ac->c.errstr);
1645 0           croak("EOF from server");
1646             } else {
1647 0 0         DEBUG_MSG("hiredis returns error: %s", self->ac->c.errstr);
1648 0           snprintf(self->error, MAX_ERROR_SIZE, "[WAIT_FOR_MESSAGES] %s", self->ac->c.errstr);
1649 0           croak("%s", self->error);
1650             }
1651             }
1652 0           ST(0) = sv_2mortal(newSViv(self->proccess_sub_count));
1653 0 0         DEBUG_MSG("finish with %d", res);
1654 0           XSRETURN(1);
1655             }
1656              
1657             void
1658             __wait_for_event(Redis::Fast self, double timeout = -1)
1659             CODE:
1660             {
1661 0 0         DEBUG_MSG("%s", "start");
1662 0           wait_for_event(aTHX_ self, timeout, timeout);
1663 0 0         DEBUG_MSG("%s", "finish");
1664 0           XSRETURN(0);
1665             }