File Coverage

lib/Redis/Fast.xs
Criterion Covered Total %
statement 342 686 49.8
branch 205 636 32.2
condition n/a
subroutine n/a
pod n/a
total 547 1322 41.3


line stmt bran cond sub pod time code
1             #define PERL_NO_GET_CONTEXT
2             #include "EXTERN.h"
3             #include "perl.h"
4             #include "XSUB.h"
5              
6             #include "ppport.h"
7             #include "hiredis.h"
8             #include "async.h"
9              
10             #include
11             #include
12             #include
13             #include
14             #include
15             #include
16              
17             #define MAX_ERROR_SIZE 256
18              
19             #define WAIT_FOR_EVENT_OK 0
20             #define WAIT_FOR_EVENT_READ_TIMEOUT 1
21             #define WAIT_FOR_EVENT_WRITE_TIMEOUT 2
22             #define WAIT_FOR_EVENT_EXCEPTION 3
23              
24             #define FLAG_INSIDE_TRANSACTION 0x01
25             #define FLAG_INSIDE_WATCH 0x02
26              
27             #define DEBUG_MSG(fmt, ...) \
28             if (self->debug) { \
29             fprintf(stderr, "[%d][%d][%s:%d:%s]: ", getpid(), getppid(), __FILE__, __LINE__, __func__); \
30             fprintf(stderr, fmt, __VA_ARGS__); \
31             fprintf(stderr, "\n"); \
32             }
33              
34             #define EQUALS_COMMAND(len, cmd, expected) ((len) == sizeof(expected) - 1 && memcmp(cmd, expected, sizeof(expected) - 1) == 0)
35              
36             typedef struct redis_fast_s {
37             redisAsyncContext* ac;
38             char* hostname;
39             int port;
40             char* path;
41             char* error;
42             double reconnect;
43             int every;
44             int debug;
45             double cnx_timeout;
46             double read_timeout;
47             double write_timeout;
48             int current_database;
49             int need_reconnect;
50             int is_connected;
51             SV* on_connect;
52             SV* on_build_sock;
53             SV* data;
54             SV* reconnect_on_error;
55             double next_reconnect_on_error_at;
56             int proccess_sub_count;
57             int is_subscriber;
58             int expected_subs;
59             pid_t pid;
60             int flags;
61             } redis_fast_t, *Redis__Fast;
62              
63             typedef struct redis_fast_reply_s {
64             SV* result;
65             SV* error;
66             } redis_fast_reply_t;
67              
68             typedef redis_fast_reply_t (*CUSTOM_DECODE)(pTHX_ Redis__Fast self, redisReply* reply, int collect_errors);
69              
70             typedef struct redis_fast_sync_cb_s {
71             redis_fast_reply_t ret;
72             int collect_errors;
73             CUSTOM_DECODE custom_decode;
74             int on_flags;
75             int off_flags;
76             } redis_fast_sync_cb_t;
77              
78             typedef struct redis_fast_async_cb_s {
79             SV* cb;
80             int collect_errors;
81             CUSTOM_DECODE custom_decode;
82             int on_flags;
83             int off_flags;
84             const void* command_name;
85             STRLEN command_length;
86             } redis_fast_async_cb_t;
87              
88             typedef struct redis_fast_subscribe_cb_s {
89             Redis__Fast self;
90             SV* cb;
91             } redis_fast_subscribe_cb_t;
92              
93              
94             #define WAIT_FOR_READ 0x01
95             #define WAIT_FOR_WRITE 0x02
96             typedef struct redis_fast_event_s {
97             int flags;
98             Redis__Fast self;
99             } redis_fast_event_t;
100              
101              
102 203           static void AddRead(void *privdata) {
103             redis_fast_event_t *e = (redis_fast_event_t*)privdata;
104 203           Redis__Fast self = e->self;
105 203           e->flags |= WAIT_FOR_READ;
106 203 50         DEBUG_MSG("flags = %x", e->flags);
107 203           }
108              
109 0           static void DelRead(void *privdata) {
110             redis_fast_event_t *e = (redis_fast_event_t*)privdata;
111 0           Redis__Fast self = e->self;
112 0           e->flags &= ~WAIT_FOR_READ;
113 0 0         DEBUG_MSG("flags = %x", e->flags);
114 0           }
115              
116 136           static void AddWrite(void *privdata) {
117             redis_fast_event_t *e = (redis_fast_event_t*)privdata;
118 136           Redis__Fast self = e->self;
119 136           e->flags |= WAIT_FOR_WRITE;
120 136 50         DEBUG_MSG("flags = %x", e->flags);
121 136           }
122              
123 136           static void DelWrite(void *privdata) {
124             redis_fast_event_t *e = (redis_fast_event_t*)privdata;
125 136           Redis__Fast self = e->self;
126 136           e->flags &= ~WAIT_FOR_WRITE;
127 136 50         DEBUG_MSG("flags = %x", e->flags);
128 136           }
129              
130 68           static void Cleanup(void *privdata) {
131 68           free(privdata);
132 68           }
133              
134 68           static int Attach(redisAsyncContext *ac) {
135 68           Redis__Fast self = (Redis__Fast)ac->data;
136             redis_fast_event_t *e;
137              
138             /* Nothing should be attached when something is already attached */
139 68 50         if (ac->ev.data != NULL)
140             return REDIS_ERR;
141              
142             /* Create container for context and r/w events */
143 68           e = (redis_fast_event_t*)malloc(sizeof(*e));
144 68           e->flags = 0;
145 68           e->self = self;
146              
147             /* Register functions to start/stop listening for events */
148 68           ac->ev.addRead = AddRead;
149 68           ac->ev.delRead = DelRead;
150 68           ac->ev.addWrite = AddWrite;
151 68           ac->ev.delWrite = DelWrite;
152 68           ac->ev.cleanup = Cleanup;
153 68           ac->ev.data = e;
154              
155 68           return REDIS_OK;
156             }
157              
158 55239           static int wait_for_event(pTHX_ Redis__Fast self, double read_timeout, double write_timeout) {
159             redisContext *c;
160             int fd;
161             redis_fast_event_t *e;
162             struct pollfd pollfd;
163             int rc;
164             double timeout = -1;
165             int timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
166             int ms;
167              
168 55239 50         if(self==NULL) return WAIT_FOR_EVENT_EXCEPTION;
169 55239 50         if(self->ac==NULL) return WAIT_FOR_EVENT_EXCEPTION;
170              
171             c = &(self->ac->c);
172 55239           fd = c->fd;
173 55239           e = (redis_fast_event_t*)self->ac->ev.data;
174 55239 50         if(e==NULL) return 0;
175              
176 55239 100         if((e->flags & (WAIT_FOR_READ|WAIT_FOR_WRITE)) == (WAIT_FOR_READ|WAIT_FOR_WRITE)) {
177 68 50         DEBUG_MSG("set READ and WRITE, compare read_timeout = %f and write_timeout = %f",
178             read_timeout, write_timeout);
179 68 100         if(read_timeout < 0 && write_timeout < 0) {
    50          
180             timeout = -1;
181             timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
182 2 50         } else if(read_timeout < 0) {
183             timeout = write_timeout;
184             timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
185 2 50         } else if(write_timeout < 0) {
186             timeout = read_timeout;
187             timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
188 55239 0         } else if(read_timeout < write_timeout) {
189             timeout = read_timeout;
190             timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
191             } else {
192             timeout = write_timeout;
193             timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
194             }
195 55171 100         } else if(e->flags & WAIT_FOR_READ) {
196 68 50         DEBUG_MSG("set READ, read_timeout = %f", read_timeout);
197             timeout = read_timeout;
198             timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
199 55103 50         } else if(e->flags & WAIT_FOR_WRITE) {
200 55103 50         DEBUG_MSG("set WRITE, write_timeout = %f", write_timeout);
201             timeout = write_timeout;
202             timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
203             }
204              
205             START_POLL:
206 55239 100         if (timeout < 0) {
207             ms = -1;
208             } else {
209 4           ms = (int)(timeout * 1000 + 0.999);
210             }
211 55239 50         DEBUG_MSG("select start, timeout is %f", timeout);
212 55239           pollfd.fd = fd;
213 55239           pollfd.events = 0;
214 55239           pollfd.revents = 0;
215 55239 100         if(e->flags & WAIT_FOR_READ) { pollfd.events |= POLLIN; }
216 55239 100         if(e->flags & WAIT_FOR_WRITE) { pollfd.events |= POLLOUT; }
217             rc = poll(&pollfd, 1, ms);
218 55239 50         DEBUG_MSG("poll returns %d", rc);
219 55239 100         if(rc == 0) {
220 1 50         DEBUG_MSG("%s", "timeout");
221             return timeout_mode;
222             }
223              
224 55238 50         if(rc < 0) {
225 0 0         DEBUG_MSG("exception: %s", strerror(errno));
226 0 0         if( errno == EINTR ) {
227 0 0         PERL_ASYNC_CHECK();
228 0 0         DEBUG_MSG("%s", "recieved interrupt. retry wait_for_event");
229             goto START_POLL;
230             }
231             return WAIT_FOR_EVENT_EXCEPTION;
232             }
233 55238 50         if(self->ac && (pollfd.revents & POLLIN) != 0) {
    100          
234 67 50         DEBUG_MSG("ready to %s", "read");
235 67           redisAsyncHandleRead(self->ac);
236             }
237 55238 100         if(self->ac && (pollfd.revents & (POLLOUT|POLLHUP)) != 0) {
    100          
238 55171 50         DEBUG_MSG("ready to %s", "write");
239 55171           redisAsyncHandleWrite(self->ac);
240             }
241 55238 50         if((pollfd.revents & (POLLERR|POLLNVAL)) != 0) {
242 0 0         DEBUG_MSG(
    0          
    0          
243             "exception: %s%s",
244             (pollfd.revents & POLLERR) ? "POLLERR " : "",
245             (pollfd.revents & POLLNVAL) ? "POLLNVAL " : "");
246             return WAIT_FOR_EVENT_EXCEPTION;
247             }
248              
249 55238 50         DEBUG_MSG("%s", "finish");
250             return WAIT_FOR_EVENT_OK;
251             }
252              
253              
254 130           static int _wait_all_responses(pTHX_ Redis__Fast self) {
255 130 50         DEBUG_MSG("%s", "start");
256 265 100         while(self->ac && self->ac->replies.tail) {
    100          
257 136           int res = wait_for_event(aTHX_ self, self->read_timeout, self->write_timeout);
258 136 100         if (res != WAIT_FOR_EVENT_OK) {
259 1 50         DEBUG_MSG("error: %d", res);
260             return res;
261             }
262             }
263 129 50         DEBUG_MSG("%s", "finish");
264             return WAIT_FOR_EVENT_OK;
265             }
266              
267              
268 68           static void Redis__Fast_connect_cb(redisAsyncContext* c, int status) {
269 68           Redis__Fast self = (Redis__Fast)c->data;
270 68 50         DEBUG_MSG("connected status = %d", status);
271 68 50         if(status != REDIS_OK) {
272             // Connection Error!!
273             // Redis context will close automatically
274 0           self->ac = NULL;
275             } else {
276 68           self->is_connected = 1;
277             }
278 68           }
279              
280 68           static void Redis__Fast_disconnect_cb(redisAsyncContext* c, int status) {
281 68           Redis__Fast self = (Redis__Fast)c->data;
282             PERL_UNUSED_VAR(status);
283 68 50         DEBUG_MSG("disconnected status = %d", status);
284 68           self->ac = NULL;
285 68           }
286              
287 69           static redisAsyncContext* __build_sock(pTHX_ Redis__Fast self)
288             {
289             redisAsyncContext *ac;
290             double timeout;
291             int res;
292              
293 69 50         DEBUG_MSG("%s", "start");
294              
295 69 50         if(self->on_build_sock) {
296 0           dSP;
297              
298 0           ENTER;
299 0           SAVETMPS;
300              
301 0 0         PUSHMARK(SP);
302 0           call_sv(self->on_build_sock, G_DISCARD | G_NOARGS);
303              
304 0 0         FREETMPS;
305 0           LEAVE;
306             }
307              
308 69 100         if(self->path) {
309 66           ac = redisAsyncConnectUnix(self->path);
310             } else {
311 3           ac = redisAsyncConnect(self->hostname, self->port);
312             }
313              
314 69 50         if(ac == NULL) {
315 0 0         DEBUG_MSG("%s", "allocation error");
316             return NULL;
317             }
318 69 100         if(ac->err) {
319 1 50         DEBUG_MSG("connection error: %s", ac->errstr);
320 1           redisAsyncFree(ac);
321 1           return NULL;
322             }
323 68           ac->data = (void*)self;
324 68           self->ac = ac;
325 68           self->is_connected = 0;
326              
327 68           Attach(ac);
328 68           redisAsyncSetConnectCallback(ac, (redisConnectCallback*)Redis__Fast_connect_cb);
329 68           redisAsyncSetDisconnectCallback(ac, (redisDisconnectCallback*)Redis__Fast_disconnect_cb);
330              
331             // wait to connect...
332             timeout = -1;
333 68 50         if(self->cnx_timeout) {
334             timeout = self->cnx_timeout;
335             }
336 55171 100         while(!self->is_connected) {
337 55103           res = wait_for_event(aTHX_ self, timeout, timeout);
338 55103 50         if(self->ac == NULL) {
339             // set is_connected flag to reconnect.
340             // see https://github.com/shogo82148/Redis-Fast/issues/73
341 0           self->is_connected = 1;
342              
343 0           return NULL;
344             }
345 55103 50         if(res != WAIT_FOR_EVENT_OK) {
346 0 0         DEBUG_MSG("error: %d", res);
347              
348             // free the redis context
349 0           redisAsyncFree(self->ac);
350 0           _wait_all_responses(aTHX_ self);
351 0           self->ac = NULL;
352              
353             // set is_connected flag to reconnect.
354             // see https://github.com/shogo82148/Redis-Fast/issues/73
355 0           self->is_connected = 1;
356              
357 68           return NULL;
358             }
359             }
360 68 50         if(self->on_connect){
361 68           dSP;
362 68 50         PUSHMARK(SP);
363 68           call_sv(self->on_connect, G_DISCARD | G_NOARGS);
364             }
365              
366 68 50         DEBUG_MSG("%s", "finish");
367 68           return self->ac;
368             }
369              
370              
371 69           static void Redis__Fast_connect(pTHX_ Redis__Fast self) {
372             struct timeval start, end;
373              
374 69 50         DEBUG_MSG("%s", "start");
375              
376 69           self->flags = 0;
377              
378             //$self->{queue} = [];
379 69           self->pid = getpid();
380              
381 69 100         if(self->reconnect == 0) {
382 3 100         if(! __build_sock(aTHX_ self)) {
383 1 50         if(self->path) {
384 0           snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s", self->path);
385             } else {
386 1           snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s:%d", self->hostname, self->port);
387             }
388 1           croak("%s", self->error);
389             }
390 68           return ;
391             }
392              
393             // Reconnect...
394 66           gettimeofday(&start, NULL);
395             while (1) {
396             double elapsed_time;
397 66 50         if(__build_sock(aTHX_ self)) {
398             // Connected!
399 66 50         DEBUG_MSG("%s", "finish");
400             return;
401             }
402 0           gettimeofday(&end, NULL);
403 0           elapsed_time = (end.tv_sec-start.tv_sec) + 1E-6 * (end.tv_usec-start.tv_usec);
404 0 0         DEBUG_MSG("elapsed time:%f, reconnect:%lf", elapsed_time, self->reconnect);
405 0 0         if( elapsed_time > self->reconnect) {
406 0 0         if(self->path) {
407 0           snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s", self->path);
408             } else {
409 0           snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s:%d", self->hostname, self->port);
410             }
411 0 0         DEBUG_MSG("%s", "timed out");
412 0           croak("%s", self->error);
413             return;
414             }
415 0 0         DEBUG_MSG("%s", "failed to connect. wait...");
416 0           usleep(self->every);
417 0           }
418             DEBUG_MSG("%s", "finish");
419             }
420              
421             // reconnect if the current connection is closed.
422             // the caller must check self->ac != 0 to continue.
423 71           static void Redis__Fast_reconnect(pTHX_ Redis__Fast self) {
424 71 50         DEBUG_MSG("%s", "start");
425 71 50         if(self->is_connected && !self->ac && self->reconnect > 0) {
    100          
    50          
426 0 0         DEBUG_MSG("%s", "connection not found. reconnect");
427 0           Redis__Fast_connect(aTHX_ self);
428             }
429 71 100         if(!self->ac) {
430 3 50         DEBUG_MSG("%s", "Not connected to any server");
431             }
432 71 50         DEBUG_MSG("%s", "finish");
433 71           }
434              
435 140           static redis_fast_reply_t Redis__Fast_decode_reply(pTHX_ Redis__Fast self, redisReply* reply, int collect_errors) {
436             redis_fast_reply_t res = {NULL, NULL};
437              
438 140           switch (reply->type) {
439             case REDIS_REPLY_ERROR:
440 14           res.error = sv_2mortal(newSVpvn(reply->str, reply->len));
441 14           break;
442             case REDIS_REPLY_STRING:
443             case REDIS_REPLY_STATUS:
444 51           res.result = sv_2mortal(newSVpvn(reply->str, reply->len));
445 51           break;
446              
447             case REDIS_REPLY_INTEGER:
448 37           res.result = sv_2mortal(newSViv(reply->integer));
449 37           break;
450             case REDIS_REPLY_NIL:
451             res.result = &PL_sv_undef;
452 18           break;
453              
454             case REDIS_REPLY_ARRAY: {
455 20           AV* av = newAV();
456             size_t i;
457 20           res.result = sv_2mortal(newRV_noinc((SV*)av));
458              
459 93 100         for (i = 0; i < reply->elements; i++) {
460 73           redis_fast_reply_t elem = Redis__Fast_decode_reply(aTHX_ self, reply->element[i], collect_errors);
461 73 100         if(collect_errors) {
462 3           AV* elem_av = (AV*)sv_2mortal((SV*)newAV());
463 3 100         if(elem.result) {
464 2           av_push(elem_av, SvREFCNT_inc(elem.result));
465             } else {
466 1           av_push(elem_av, newSV(0));
467             }
468 3 100         if(elem.error) {
469 1           av_push(elem_av, SvREFCNT_inc(elem.error));
470             } else {
471 2           av_push(elem_av, newSV(0));
472             }
473 3           av_push(av, newRV_inc((SV*)elem_av));
474             } else {
475 70 100         if(elem.result) {
476 68           av_push(av, SvREFCNT_inc(elem.result));
477             } else {
478 2           av_push(av, newSV(0));
479             }
480 70 100         if(elem.error && !res.error) {
    50          
481             res.error = elem.error;
482             }
483             }
484             }
485             break;
486             }
487             }
488              
489 140           return res;
490             }
491              
492 65           static int Redis__Fast_call_reconnect_on_error(pTHX_ Redis__Fast self, redis_fast_reply_t ret, const void *command_name, STRLEN command_length) {
493             int _need_reconnect = 0;
494             struct timeval current;
495             double current_sec;
496             SV* sv_ret;
497             SV* sv_err;
498             SV* sv_cmd;
499             int count;
500              
501 65 100         if (ret.error == NULL) {
502             return _need_reconnect;
503             }
504 13 50         if (self->reconnect_on_error == NULL) {
505             return _need_reconnect;
506             }
507              
508 0           gettimeofday(¤t, NULL);
509 0           current_sec = current.tv_sec + 1E-6 * current.tv_usec;
510 0 0         if( self->next_reconnect_on_error_at < 0 ||
    0          
511             self->next_reconnect_on_error_at < current_sec) {
512 0           dSP;
513 0           ENTER;
514 0           SAVETMPS;
515              
516 0 0         sv_ret = ret.result ? ret.result : &PL_sv_undef;
517             sv_err = ret.error;
518 0           sv_cmd = sv_2mortal(newSVpvn((const char*)command_name, command_length));
519              
520 0 0         PUSHMARK(SP);
521 0 0         XPUSHs(sv_err);
522 0 0         XPUSHs(sv_ret);
523 0 0         XPUSHs(sv_cmd);
524 0           PUTBACK;
525              
526 0           count = call_sv(self->reconnect_on_error, G_SCALAR);
527              
528 0           SPAGAIN;
529              
530 0 0         if (count != 1) {
531 0           croak("[BUG] retval count should be 1\n");
532             }
533 0 0         _need_reconnect = POPi;
534              
535 0           PUTBACK;
536 0 0         FREETMPS;
537 0           LEAVE;
538             }
539              
540             return _need_reconnect;
541             }
542              
543 67           static void Redis__Fast_sync_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
544             dTHX;
545              
546 67           Redis__Fast self = (Redis__Fast)c->data;
547             redis_fast_sync_cb_t *cbt = (redis_fast_sync_cb_t*)privdata;
548 67 50         DEBUG_MSG("%p", (void*)privdata);
549 67 100         if(reply) {
550 66           self->flags = (self->flags | cbt->on_flags) & cbt->off_flags;
551 66 50         if(cbt->custom_decode) {
552 0           cbt->ret = (cbt->custom_decode)(aTHX_ self, (redisReply*)reply, cbt->collect_errors);
553             } else {
554 66           cbt->ret = Redis__Fast_decode_reply(aTHX_ self, (redisReply*)reply, cbt->collect_errors);
555             }
556 1 50         } else if(c->c.flags & REDIS_FREEING) {
557 1 50         DEBUG_MSG("%s", "redis freeing");
558 1           Safefree(cbt);
559             } else {
560 0 0         DEBUG_MSG("connect error: %s", c->errstr);
561 0           self->need_reconnect = 1;
562 0           cbt->ret.result = NULL;
563 0           cbt->ret.error = sv_2mortal( newSVpvn(c->errstr, strlen(c->errstr)) );
564             }
565 67 50         DEBUG_MSG("%s", "finish");
566 67           }
567              
568 1           static void Redis__Fast_async_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
569             dTHX;
570              
571 1           Redis__Fast self = (Redis__Fast)c->data;
572             redis_fast_async_cb_t *cbt = (redis_fast_async_cb_t*)privdata;
573 1 50         DEBUG_MSG("%p, %p", reply, privdata);
574 1 50         if (reply) {
575 1           self->flags = (self->flags | cbt->on_flags) & cbt->off_flags;
576              
577             {
578             redis_fast_reply_t result;
579              
580 1           dSP;
581              
582 1           ENTER;
583 1           SAVETMPS;
584              
585 1 50         if(cbt->custom_decode) {
586 0           result = (cbt->custom_decode)(aTHX_ self, (redisReply*)reply, cbt->collect_errors);
587             } else {
588 1           result = Redis__Fast_decode_reply(aTHX_ self, (redisReply*)reply, cbt->collect_errors);
589             }
590              
591 1 50         if(result.result == NULL) result.result = &PL_sv_undef;
592 1 50         if(result.error == NULL) result.error = &PL_sv_undef;
593              
594 1 50         PUSHMARK(SP);
595 1 50         XPUSHs(result.result);
596 1 50         XPUSHs(result.error);
597 1           PUTBACK;
598              
599 1           call_sv(cbt->cb, G_DISCARD);
600              
601 1 50         FREETMPS;
602 1           LEAVE;
603             }
604              
605             {
606 1 50         if (0 < self->reconnect && !self->need_reconnect
    50          
607             // Avoid useless cost when reconnect_on_error is not set.
608 1 50         && self->reconnect_on_error != NULL) {
609             redis_fast_reply_t result;
610 0 0         if(cbt->custom_decode) {
611 0           result = (cbt->custom_decode)(
612             aTHX_ self, (redisReply*)reply, cbt->collect_errors
613             );
614             } else {
615 0           result = Redis__Fast_decode_reply(
616             aTHX_ self, (redisReply*)reply, cbt->collect_errors
617             );
618             }
619 0           self->need_reconnect = Redis__Fast_call_reconnect_on_error(
620             aTHX_ self, result, cbt->command_name, cbt->command_length
621             );
622             }
623             }
624             } else {
625 0 0         if (c->c.flags & REDIS_FREEING) {
626 0 0         DEBUG_MSG("%s", "redis freeing");
627             } else {
628 0 0         DEBUG_MSG("connect error: %s", c->errstr);
629             }
630              
631             {
632             redis_fast_reply_t result;
633             const char *msg;
634              
635 0           dSP;
636              
637 0           ENTER;
638 0           SAVETMPS;
639              
640             result.result = &PL_sv_undef;
641 0 0         if (c->c.flags & REDIS_FREEING) {
642             msg = "redis freeing";
643             } else {
644 0           msg = c->errstr;
645             }
646 0 0         DEBUG_MSG("error: %s", msg);
647 0           result.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
648              
649 0 0         PUSHMARK(SP);
650 0 0         XPUSHs(result.result);
651 0 0         XPUSHs(result.error);
652 0           PUTBACK;
653              
654 0           call_sv(cbt->cb, G_DISCARD);
655              
656 0 0         FREETMPS;
657 0           LEAVE;
658             }
659             }
660              
661 1           SvREFCNT_dec(cbt->cb);
662 1           Safefree(cbt);
663 1           }
664              
665 0           static void Redis__Fast_subscribe_cb(redisAsyncContext* c, void* reply, void* privdata) {
666             dTHX;
667              
668             int is_need_free = 0;
669 0           Redis__Fast self = (Redis__Fast)c->data;
670             redis_fast_subscribe_cb_t *cbt = (redis_fast_subscribe_cb_t*)privdata;
671             redisReply* r = (redisReply*)reply;
672              
673 0 0         DEBUG_MSG("%s", "start");
674 0 0         if(!cbt) {
675 0 0         DEBUG_MSG("%s", "cbt is empty finished");
676             return ;
677             }
678              
679 0 0         if (r) {
680 0           char* stype = r->element[0]->str;
681 0           int pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
682             redis_fast_reply_t res;
683              
684 0           dSP;
685 0           ENTER;
686 0           SAVETMPS;
687              
688 0           res = Redis__Fast_decode_reply(aTHX_ self, r, 0);
689              
690 0 0         if (strcasecmp(stype+pvariant,"subscribe") == 0) {
691 0 0         DEBUG_MSG("%s %s %lld", r->element[0]->str, r->element[1]->str, r->element[2]->integer);
692 0           self->is_subscriber = r->element[2]->integer;
693 0           self->expected_subs--;
694 0 0         } else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
695 0 0         DEBUG_MSG("%s %s %lld", r->element[0]->str, r->element[1]->str, r->element[2]->integer);
696 0           self->is_subscriber = r->element[2]->integer;
697             is_need_free = 1;
698 0           self->expected_subs--;
699             } else {
700 0 0         DEBUG_MSG("%s %s", r->element[0]->str, r->element[1]->str);
701 0           self->proccess_sub_count++;
702             }
703              
704 0 0         if(res.result == NULL) res.result = &PL_sv_undef;
705 0 0         if(res.error == NULL) res.error = &PL_sv_undef;
706              
707 0 0         PUSHMARK(SP);
708 0 0         XPUSHs(res.result);
709 0 0         XPUSHs(res.error);
710 0           PUTBACK;
711              
712 0           call_sv(cbt->cb, G_DISCARD);
713              
714 0 0         FREETMPS;
715 0           LEAVE;
716             } else {
717 0 0         DEBUG_MSG("connect error: %s", c->errstr);
718             is_need_free = 1;
719             }
720              
721 0 0         if(is_need_free) {
722             // destroy private data
723 0 0         DEBUG_MSG("destroy %p", cbt);
724 0 0         if(cbt->cb) {
725             SvREFCNT_dec(cbt->cb);
726 0           cbt->cb = NULL;
727             }
728 0           Safefree(cbt);
729             }
730 0 0         DEBUG_MSG("%s", "finish");
731             }
732              
733 0           static void Redis__Fast_quit(pTHX_ Redis__Fast self) {
734             redis_fast_sync_cb_t *cbt;
735              
736 0 0         if(!self->ac) {
737             return;
738             }
739              
740 0           Newx(cbt, sizeof(redis_fast_sync_cb_t), redis_fast_sync_cb_t);
741 0           cbt->ret.result = NULL;
742 0           cbt->ret.error = NULL;
743 0           cbt->custom_decode = NULL;
744              
745             // initialize, or self->flags will be corrupted.
746 0           cbt->on_flags = 0;
747 0           cbt->off_flags = 0;
748              
749 0           redisAsyncCommand(
750             self->ac, Redis__Fast_sync_reply_cb, cbt, "QUIT"
751             );
752 0           redisAsyncDisconnect(self->ac);
753 0 0         if(_wait_all_responses(aTHX_ self) == WAIT_FOR_EVENT_OK) {
754 0 0         DEBUG_MSG("%s", "wait_all_responses ok");
755 0 0         if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
    0          
756             } else {
757 0 0         DEBUG_MSG("%s", "wait_all_responses not ok");
758 0 0         if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
    0          
759             }
760 0 0         DEBUG_MSG("%s", "finish");
761             }
762              
763 68           static redis_fast_reply_t Redis__Fast_run_cmd(pTHX_ Redis__Fast self, int collect_errors, CUSTOM_DECODE custom_decode, SV* cb, int argc, const char** argv, size_t* argvlen) {
764             redis_fast_reply_t ret = {NULL, NULL};
765             int on_flags = 0, off_flags = ~0;
766              
767 68 50         DEBUG_MSG("start %s", argv[0]);
768              
769 68 50         DEBUG_MSG("pid check: previous pid is %d, now %d", self->pid, getpid());
770 68 50         if(self->pid != getpid()) {
771 0 0         DEBUG_MSG("%s", "pid changed. create new connection..");
772 0           Redis__Fast_connect(aTHX_ self);
773             }
774              
775 68 50         if(EQUALS_COMMAND(argvlen[0], argv[0], "MULTI")) {
    0          
776             on_flags = FLAG_INSIDE_TRANSACTION;
777 68 100         } else if(EQUALS_COMMAND(argvlen[0], argv[0], "EXEC") ||
    50          
    50          
778 0 0         EQUALS_COMMAND(argvlen[0], argv[0], "DISCARD")) {
779             off_flags = ~(FLAG_INSIDE_TRANSACTION | FLAG_INSIDE_WATCH);
780 67 50         } else if(EQUALS_COMMAND(argvlen[0], argv[0], "WATCH")) {
    0          
781             on_flags = FLAG_INSIDE_WATCH;
782 67 50         } else if(EQUALS_COMMAND(argvlen[0], argv[0], "UNWATCH")) {
    0          
783             off_flags = ~FLAG_INSIDE_WATCH;
784             }
785              
786 68 100         if(cb) {
787             redis_fast_async_cb_t *cbt;
788 1           Newx(cbt, sizeof(redis_fast_async_cb_t), redis_fast_async_cb_t);
789 1           cbt->cb = SvREFCNT_inc(cb);
790 1           cbt->custom_decode = custom_decode;
791 1           cbt->collect_errors = collect_errors;
792 1           cbt->on_flags = on_flags;
793 1           cbt->off_flags = off_flags;
794 1           cbt->command_name = argv[0];
795 1           cbt->command_length = argvlen[0];
796 1           redisAsyncCommandArgv(
797             self->ac, Redis__Fast_async_reply_cb, cbt,
798             argc, argv, argvlen
799             );
800 1           ret.result = sv_2mortal(newSViv(1));
801             } else {
802             redis_fast_sync_cb_t *cbt;
803 67 100         int i, cnt = (self->reconnect == 0 ? 1 : 2);
804             int res = WAIT_FOR_EVENT_OK;
805 67 50         for(i = 0; i < cnt; i++) {
806 67           Newx(cbt, sizeof(redis_fast_sync_cb_t), redis_fast_sync_cb_t);
807 67           self->need_reconnect = 0;
808 67           cbt->ret.result = NULL;
809 67           cbt->ret.error = NULL;
810 67           cbt->custom_decode = custom_decode;
811 67           cbt->collect_errors = collect_errors;
812 67           cbt->on_flags = on_flags;
813 67           cbt->off_flags = off_flags;
814 67 50         DEBUG_MSG("%s", "send command in sync mode");
815 67           redisAsyncCommandArgv(
816             self->ac, Redis__Fast_sync_reply_cb, cbt,
817             argc, argv, argvlen
818             );
819 67 50         DEBUG_MSG("%s", "waiting response");
820 67           res = _wait_all_responses(aTHX_ self);
821 67 100         if(res == WAIT_FOR_EVENT_OK && self->need_reconnect == 0) {
    50          
822             int _need_reconnect = 0;
823 66 100         if (1 < cnt - i) {
824 65           _need_reconnect = Redis__Fast_call_reconnect_on_error(
825             aTHX_ self, cbt->ret, argv[0], argvlen[0]
826             );
827             // Should be quit before reconnect
828 65 50         if (_need_reconnect) {
829 0           Redis__Fast_quit(aTHX_ self);
830             }
831             }
832 66 50         if (!_need_reconnect) {
833 66           ret = cbt->ret;
834 66 100         if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
    50          
835 66 50         DEBUG_MSG("finish %s", argv[0]);
836 66           return ret;
837             }
838             }
839              
840 1 50         if( res == WAIT_FOR_EVENT_READ_TIMEOUT ) break;
841              
842 0 0         if(self->flags & (FLAG_INSIDE_TRANSACTION | FLAG_INSIDE_WATCH)) {
843             char *msg = "reconnect disabled inside transaction or watch";
844 0 0         DEBUG_MSG("error: %s", msg);
845 0           ret.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
846 0           return ret;
847             }
848              
849 0           Redis__Fast_reconnect(aTHX_ self);
850 0 0         if(!self->ac) {
851             char *msg = "Not connected to any server";
852 0 0         DEBUG_MSG("error: %s", msg);
853 0           ret.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
854 0           return ret;
855             }
856             }
857              
858 1 50         if( res == WAIT_FOR_EVENT_OK && (cbt->ret.result || cbt->ret.error) ) Safefree(cbt);
    0          
    0          
859             // else destructor will release cbt
860              
861 1 50         if(res == WAIT_FOR_EVENT_READ_TIMEOUT || res == WAIT_FOR_EVENT_WRITE_TIMEOUT) {
862 1           snprintf(self->error, MAX_ERROR_SIZE, "Error while reading from Redis server: %s", strerror(EAGAIN));
863 1           errno = EAGAIN;
864 1 50         DEBUG_MSG("error: %s", self->error);
865 1           ret.error = sv_2mortal(newSVpvn(self->error, strlen(self->error)));
866 1           return ret;
867             }
868 0 0         if(!self->ac) {
869             char *msg = "Not connected to any server";
870 0 0         DEBUG_MSG("error: %s", msg);
871 0           ret.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
872 0           return ret;
873             }
874             }
875 1 50         DEBUG_MSG("Finish %s", argv[0]);
876 1           return ret;
877             }
878              
879 0           static redis_fast_reply_t Redis__Fast_keys_custom_decode(pTHX_ Redis__Fast self, redisReply* reply, int collect_errors) {
880             // TODO: Support redis <= 1.2.6
881 0           return Redis__Fast_decode_reply(aTHX_ self, reply, collect_errors);
882             }
883              
884 0           static redis_fast_reply_t Redis__Fast_info_custom_decode(pTHX_ Redis__Fast self, redisReply* reply, int collect_errors) {
885             redis_fast_reply_t res = {NULL, NULL};
886              
887 0 0         if(reply->type == REDIS_REPLY_STRING ||
888             reply->type == REDIS_REPLY_STATUS) {
889              
890 0           HV* hv = newHV();
891 0           char* str = reply->str;
892 0           size_t len = reply->len;
893 0           res.result = sv_2mortal(newRV_noinc((SV*)hv));
894              
895 0 0         while(len != 0) {
896 0           const char* line = (char*)memchr(str, '\r', len);
897             const char* sep;
898             size_t linelen;
899 0 0         if(line == NULL) {
900             linelen = len;
901             } else {
902 0           linelen = line - str;
903             }
904 0           sep = (char*)memchr(str, ':', linelen);
905 0 0         if(str[0] != '#' && sep != NULL) {
    0          
906             SV* val;
907             SV** ret;
908             size_t keylen;
909 0           keylen = sep - str;
910 0           val = sv_2mortal(newSVpvn(sep + 1, linelen - keylen - 1));
911 0           ret = hv_store(hv, str, keylen, SvREFCNT_inc(val), 0);
912 0 0         if (ret == NULL) {
913             SvREFCNT_dec(val);
914 0           croak("failed to hv_store");
915             }
916             }
917 0 0         if(line == NULL) {
918             break;
919             } else {
920 0           len -= linelen + 2;
921 0           str += linelen + 2;
922             }
923             }
924             } else {
925 0           res = Redis__Fast_decode_reply(aTHX_ self, reply, collect_errors);
926             }
927              
928 0           return res;
929             }
930              
931             MODULE = Redis::Fast PACKAGE = Redis::Fast
932              
933             SV*
934             _new(char* cls);
935             PREINIT:
936             redis_fast_t* self;
937             CODE:
938             {
939 70           Newxz(self, sizeof(redis_fast_t), redis_fast_t);
940 70 50         DEBUG_MSG("%s", "start");
941 70           self->error = (char*)malloc(MAX_ERROR_SIZE);
942 70           self->reconnect_on_error = NULL;
943 70           self->next_reconnect_on_error_at = -1;
944 70           self->is_connected = 1;
945 70           ST(0) = sv_newmortal();
946 70           sv_setref_pv(ST(0), cls, (void*)self);
947 70 50         DEBUG_MSG("return %p", ST(0));
948 70           XSRETURN(1);
949             }
950             OUTPUT:
951             RETVAL
952              
953             double
954             __set_reconnect(Redis::Fast self, double val)
955             CODE:
956             {
957 206           RETVAL = self->reconnect = val;
958             }
959             OUTPUT:
960             RETVAL
961              
962              
963             double
964             __get_reconnect(Redis::Fast self)
965             CODE:
966             {
967 68           RETVAL = self->reconnect;
968             }
969             OUTPUT:
970             RETVAL
971              
972              
973             int
974             __set_every(Redis::Fast self, int val)
975             CODE:
976             {
977 70           RETVAL = self->every = val;
978             }
979             OUTPUT:
980             RETVAL
981              
982              
983             int
984             __get_every(Redis::Fast self)
985             CODE:
986             {
987 0           RETVAL = self->every;
988             }
989             OUTPUT:
990             RETVAL
991              
992             int
993             __set_debug(Redis::Fast self, int val)
994             CODE:
995             {
996 70           RETVAL = self->debug = val;
997             }
998             OUTPUT:
999             RETVAL
1000              
1001             double
1002             __set_cnx_timeout(Redis::Fast self, double val)
1003             CODE:
1004             {
1005 70           RETVAL = self->cnx_timeout = val;
1006             }
1007             OUTPUT:
1008             RETVAL
1009              
1010             double
1011             __get_cnx_timeout(Redis::Fast self)
1012             CODE:
1013             {
1014 0           RETVAL = self->cnx_timeout;
1015             }
1016             OUTPUT:
1017             RETVAL
1018              
1019              
1020             double
1021             __set_read_timeout(Redis::Fast self, double val)
1022             CODE:
1023             {
1024 70           RETVAL = self->read_timeout = val;
1025             }
1026             OUTPUT:
1027             RETVAL
1028              
1029             double
1030             __get_read_timeout(Redis::Fast self)
1031             CODE:
1032             {
1033 0           RETVAL = self->read_timeout;
1034             }
1035             OUTPUT:
1036             RETVAL
1037              
1038              
1039             double
1040             __set_write_timeout(Redis::Fast self, double val)
1041             CODE:
1042             {
1043 70           RETVAL = self->write_timeout = val;
1044             }
1045             OUTPUT:
1046             RETVAL
1047              
1048             double
1049             __get_write_timeout(Redis::Fast self)
1050             CODE:
1051             {
1052 0           RETVAL = self->write_timeout;
1053             }
1054             OUTPUT:
1055             RETVAL
1056              
1057              
1058             int
1059             __set_current_database(Redis::Fast self, int val)
1060             CODE:
1061             {
1062 0           RETVAL = self->current_database = val;
1063             }
1064             OUTPUT:
1065             RETVAL
1066              
1067              
1068             int
1069             __get_current_database(Redis::Fast self)
1070             CODE:
1071             {
1072 0           RETVAL = self->current_database;
1073             }
1074             OUTPUT:
1075             RETVAL
1076              
1077              
1078             int
1079             __sock(Redis::Fast self)
1080             CODE:
1081             {
1082 0 0         RETVAL = self->ac ? self->ac->c.fd : 0;
1083             }
1084             OUTPUT:
1085             RETVAL
1086              
1087              
1088             int
1089             __get_port(Redis::Fast self)
1090             CODE:
1091             {
1092             struct sockaddr_in addr;
1093             socklen_t len;
1094 0           len = sizeof( addr );
1095 0           getsockname( self->ac->c.fd, ( struct sockaddr *)&addr, &len );
1096 0           RETVAL = addr.sin_port;
1097             }
1098             OUTPUT:
1099             RETVAL
1100              
1101              
1102             void
1103             __set_on_connect(Redis::Fast self, SV* func)
1104             CODE:
1105             {
1106 70           self->on_connect = SvREFCNT_inc(func);
1107             }
1108              
1109             void
1110             __set_on_build_sock(Redis::Fast self, SV* func)
1111             CODE:
1112             {
1113 0           self->on_build_sock = SvREFCNT_inc(func);
1114             }
1115              
1116             void
1117             __set_data(Redis::Fast self, SV* data)
1118             CODE:
1119             {
1120 70           self->data = SvREFCNT_inc(data);
1121             }
1122              
1123             void
1124             __get_data(Redis::Fast self)
1125             CODE:
1126             {
1127 272           ST(0) = self->data;
1128 272           XSRETURN(1);
1129             }
1130              
1131             void
1132             __set_reconnect_on_error(Redis::Fast self, SV* func)
1133             CODE:
1134             {
1135 0           self->reconnect_on_error = SvREFCNT_inc(func);
1136             }
1137              
1138             double
1139             __set_next_reconnect_on_error_at(Redis::Fast self, double val)
1140             CODE:
1141             {
1142             struct timeval current;
1143             double current_sec;
1144              
1145 0 0         if ( -1 < val ) {
1146 0           gettimeofday(¤t, NULL);
1147 0           current_sec = current.tv_sec + 1E-6 * current.tv_usec;
1148 0           val += current_sec;
1149             }
1150              
1151 0           RETVAL = self->next_reconnect_on_error_at = val;
1152             }
1153             OUTPUT:
1154             RETVAL
1155              
1156             void
1157             is_subscriber(Redis::Fast self)
1158             CODE:
1159             {
1160 139           ST(0) = sv_2mortal(newSViv(self->is_subscriber));
1161 139           XSRETURN(1);
1162             }
1163              
1164              
1165             void
1166             DESTROY(Redis::Fast self);
1167             CODE:
1168             {
1169 70 50         DEBUG_MSG("%s", "start");
1170 70 100         if (self->ac) {
1171 62 50         DEBUG_MSG("%s", "free ac");
1172 62           redisAsyncFree(self->ac);
1173 62           _wait_all_responses(aTHX_ self);
1174 62           self->ac = NULL;
1175             }
1176              
1177 70 100         if(self->hostname) {
1178 4 50         DEBUG_MSG("%s", "free hostname");
1179 4           free(self->hostname);
1180 4           self->hostname = NULL;
1181             }
1182              
1183 70 100         if(self->path) {
1184 66 50         DEBUG_MSG("%s", "free path");
1185 66           free(self->path);
1186 66           self->path = NULL;
1187             }
1188              
1189 70 50         if(self->error) {
1190 70 50         DEBUG_MSG("%s", "free error");
1191 70           free(self->error);
1192 70           self->error = NULL;
1193             }
1194              
1195 70 50         if(self->on_connect) {
1196 70 50         DEBUG_MSG("%s", "free on_connect");
1197 70           SvREFCNT_dec(self->on_connect);
1198 70           self->on_connect = NULL;
1199             }
1200              
1201 70 50         if(self->on_build_sock) {
1202 0 0         DEBUG_MSG("%s", "free on_build_sock");
1203 0           SvREFCNT_dec(self->on_build_sock);
1204 0           self->on_build_sock = NULL;
1205             }
1206              
1207 70 50         if(self->data) {
1208 70 50         DEBUG_MSG("%s", "free data");
1209 70           SvREFCNT_dec(self->data);
1210 70           self->data = NULL;
1211             }
1212              
1213 70 50         if(self->reconnect_on_error) {
1214 0 0         DEBUG_MSG("%s", "free reconnect_on_error");
1215 0           SvREFCNT_dec(self->reconnect_on_error);
1216 0           self->reconnect_on_error = NULL;
1217             }
1218              
1219 70 50         DEBUG_MSG("%s", "finish");
1220 70           Safefree(self);
1221             }
1222              
1223              
1224             void
1225             __connection_info(Redis::Fast self, char* hostname, int port = 6379)
1226             CODE:
1227             {
1228 4 50         if(self->hostname) {
1229 0           free(self->hostname);
1230 0           self->hostname = NULL;
1231             }
1232              
1233 4 50         if(self->path) {
1234 0           free(self->path);
1235 0           self->path = NULL;
1236             }
1237              
1238 4 50         if(hostname) {
1239 4           self->hostname = (char*)malloc(strlen(hostname) + 1);
1240             strcpy(self->hostname, hostname);
1241             }
1242              
1243 4           self->port = port;
1244             }
1245              
1246             void
1247             __connection_info_unix(Redis::Fast self, char* path)
1248             CODE:
1249             {
1250 66 50         if(self->hostname) {
1251 0           free(self->hostname);
1252 0           self->hostname = NULL;
1253             }
1254              
1255 66 50         if(self->path) {
1256 0           free(self->path);
1257 0           self->path = NULL;
1258             }
1259              
1260 66 50         if(path) {
1261 66           self->path = (char*)malloc(strlen(path) + 1);
1262             strcpy(self->path, path);
1263             }
1264             }
1265              
1266              
1267             void
1268             connect(Redis::Fast self)
1269             CODE:
1270             {
1271 69           Redis__Fast_connect(aTHX_ self);
1272             }
1273              
1274             void
1275             wait_all_responses(Redis::Fast self)
1276             CODE:
1277             {
1278 1           int res = _wait_all_responses(aTHX_ self);
1279 1 50         if(res != WAIT_FOR_EVENT_OK) {
1280 0           croak("Error while reading from Redis server");
1281             }
1282              
1283 1 50         if (0 < self->reconnect && self->need_reconnect) {
    50          
1284             // Should be quit before reconnect
1285 0           Redis__Fast_quit(aTHX_ self);
1286 0           Redis__Fast_reconnect(aTHX_ self);
1287 0           self->need_reconnect = 0;
1288             }
1289             }
1290              
1291             void
1292             wait_one_response(Redis::Fast self)
1293             CODE:
1294             {
1295 0           int res = _wait_all_responses(aTHX_ self);
1296 0 0         if(res != WAIT_FOR_EVENT_OK) {
1297 0           croak("Error while reading from Redis server");
1298             }
1299              
1300 0 0         if (0 < self->reconnect && self->need_reconnect) {
    0          
1301             // Should be quit before reconnect
1302 0           Redis__Fast_quit(aTHX_ self);
1303 0           Redis__Fast_reconnect(aTHX_ self);
1304 0           self->need_reconnect = 0;
1305             }
1306             }
1307              
1308             void
1309             __std_cmd(Redis::Fast self, ...)
1310             PREINIT:
1311             redis_fast_reply_t ret;
1312             SV* cb;
1313             char** argv;
1314             size_t* argvlen;
1315             STRLEN len;
1316             int argc, i, collect_errors;
1317             CODE:
1318             {
1319 68           Redis__Fast_reconnect(aTHX_ self);
1320 68 50         if(!self->ac) {
1321 0           croak("Not connected to any server");
1322             }
1323              
1324 68           cb = ST(items - 1);
1325 68 100         if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
    50          
1326 1           argc = items - 2;
1327             } else {
1328             cb = NULL;
1329 67           argc = items - 1;
1330             }
1331 68 50         Newx(argv, sizeof(char*) * argc, char*);
1332 68 50         Newx(argvlen, sizeof(size_t) * argc, size_t);
1333              
1334 204 100         for (i = 0; i < argc; i++) {
1335 136 50         if(!sv_utf8_downgrade(ST(i + 1), 1)) {
1336 0           croak("command sent is not an octet sequence in the native encoding (Latin-1). Consider using debug mode to see the command itself.");
1337             }
1338 136 50         argv[i] = SvPV(ST(i + 1), len);
1339 136           argvlen[i] = len;
1340             }
1341              
1342             collect_errors = 0;
1343 68 100         if(cb && EQUALS_COMMAND(argvlen[0], argv[0], "EXEC"))
    50          
    50          
1344             collect_errors = 1;
1345              
1346 68           ret = Redis__Fast_run_cmd(aTHX_ self, collect_errors, NULL, cb, argc, (const char**)argv, argvlen);
1347              
1348 68           Safefree(argv);
1349 68           Safefree(argvlen);
1350              
1351 68 100         ST(0) = ret.result ? ret.result : &PL_sv_undef;
1352 68 100         ST(1) = ret.error ? ret.error : &PL_sv_undef;
1353 68           XSRETURN(2);
1354             }
1355              
1356              
1357             void
1358             __quit(Redis::Fast self)
1359             CODE:
1360             {
1361 0 0         DEBUG_MSG("%s", "start QUIT");
1362 0 0         if(self->ac) {
1363 0           Redis__Fast_quit(aTHX_ self);
1364 0           ST(0) = sv_2mortal(newSViv(1));
1365 0           XSRETURN(1);
1366             } else {
1367 0 0         DEBUG_MSG("%s", "finish. there is no connection.");
1368 0           XSRETURN(0);
1369             }
1370             }
1371              
1372              
1373             void
1374             __shutdown(Redis::Fast self)
1375             CODE:
1376             {
1377 0 0         DEBUG_MSG("%s", "start SHUTDOWN");
1378 0 0         if(self->ac) {
1379 0           redisAsyncCommand(
1380             self->ac, NULL, NULL, "SHUTDOWN"
1381             );
1382 0           redisAsyncDisconnect(self->ac);
1383 0           _wait_all_responses(aTHX_ self);
1384 0           self->is_connected = 0;
1385 0           ST(0) = sv_2mortal(newSViv(1));
1386 0           XSRETURN(1);
1387             } else {
1388 0 0         DEBUG_MSG("%s", "redis server has alread shutdown");
1389 0           XSRETURN(0);
1390             }
1391             }
1392              
1393              
1394             void
1395             __keys(Redis::Fast self, ...)
1396             PREINIT:
1397             redis_fast_reply_t ret;
1398             SV* cb;
1399             char** argv;
1400             size_t* argvlen;
1401             STRLEN len;
1402             int argc, i;
1403             CODE:
1404             {
1405 0           Redis__Fast_reconnect(aTHX_ self);
1406 0 0         if(!self->ac) {
1407 0           croak("Not connected to any server");
1408             }
1409              
1410 0           cb = ST(items - 1);
1411 0 0         if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
    0          
1412 0           argc = items - 1;
1413             } else {
1414             cb = NULL;
1415             argc = items;
1416             }
1417 0 0         Newx(argv, sizeof(char*) * argc, char*);
1418 0 0         Newx(argvlen, sizeof(size_t) * argc, size_t);
1419              
1420 0           argv[0] = "KEYS";
1421 0           argvlen[0] = 4;
1422 0 0         for (i = 1; i < argc; i++) {
1423 0 0         argv[i] = SvPV(ST(i), len);
1424 0           argvlen[i] = len;
1425             }
1426              
1427 0           ret = Redis__Fast_run_cmd(aTHX_ self, 0, Redis__Fast_keys_custom_decode, cb, argc, (const char**)argv, argvlen);
1428 0           Safefree(argv);
1429 0           Safefree(argvlen);
1430              
1431 0 0         ST(0) = ret.result ? ret.result : &PL_sv_undef;
1432 0 0         ST(1) = ret.error ? ret.error : &PL_sv_undef;
1433 0           XSRETURN(2);
1434             }
1435              
1436              
1437             void
1438             __info(Redis::Fast self, ...)
1439             PREINIT:
1440             redis_fast_reply_t ret;
1441             SV* cb;
1442             char** argv;
1443             size_t* argvlen;
1444             STRLEN len;
1445             int argc, i;
1446             CODE:
1447             {
1448 3           Redis__Fast_reconnect(aTHX_ self);
1449 3 50         if(!self->ac) {
1450             char *msg = "Not connected to any server";
1451 3           ST(0) = &PL_sv_undef;
1452 3           ST(1) = sv_2mortal(newSVpvn(msg, strlen(msg)));
1453 3           XSRETURN(2);
1454             }
1455              
1456 0           cb = ST(items - 1);
1457 0 0         if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
    0          
1458 0           argc = items - 1;
1459             } else {
1460             cb = NULL;
1461             argc = items;
1462             }
1463 0 0         Newx(argv, sizeof(char*) * argc, char*);
1464 0 0         Newx(argvlen, sizeof(size_t) * argc, size_t);
1465              
1466 0           argv[0] = "INFO";
1467 0           argvlen[0] = 4;
1468 0 0         for (i = 1; i < argc; i++) {
1469 0 0         argv[i] = SvPV(ST(i), len);
1470 0           argvlen[i] = len;
1471             }
1472              
1473 0           ret = Redis__Fast_run_cmd(aTHX_ self, 0, Redis__Fast_info_custom_decode, cb, argc, (const char**)argv, argvlen);
1474 0           Safefree(argv);
1475 0           Safefree(argvlen);
1476              
1477 0 0         ST(0) = ret.result ? ret.result : &PL_sv_undef;
1478 0 0         ST(1) = ret.error ? ret.error : &PL_sv_undef;
1479 0           XSRETURN(2);
1480             }
1481              
1482              
1483             void
1484             __send_subscription_cmd(Redis::Fast self, ...)
1485             PREINIT:
1486             SV* cb;
1487             char** argv;
1488             size_t* argvlen;
1489             STRLEN len;
1490             int argc, i;
1491             redis_fast_subscribe_cb_t* cbt;
1492             int pvariant;
1493             CODE:
1494             {
1495 0 0         int cnt = (self->reconnect == 0 ? 1 : 2);
1496              
1497 0 0         DEBUG_MSG("%s", "start");
1498              
1499 0           Redis__Fast_reconnect(aTHX_ self);
1500 0 0         if(!self->ac) {
1501 0           croak("Not connected to any server");
1502             }
1503              
1504 0 0         if(!self->is_subscriber) {
1505 0           _wait_all_responses(aTHX_ self);
1506             }
1507 0           cb = ST(items - 1);
1508 0 0         if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
    0          
1509 0           argc = items - 2;
1510             } else {
1511             cb = NULL;
1512 0           argc = items - 1;
1513             }
1514 0 0         Newx(argv, sizeof(char*) * argc, char*);
1515 0 0         Newx(argvlen, sizeof(size_t) * argc, size_t);
1516              
1517 0 0         for (i = 0; i < argc; i++) {
1518 0 0         argv[i] = SvPV(ST(i+1), len);
1519 0           argvlen[i] = len;
1520 0 0         DEBUG_MSG("argv[%d] = %s", i, argv[i]);
1521             }
1522              
1523 0 0         for(i = 0; i < cnt; i++) {
1524 0           pvariant = tolower(argv[0][0]) == 'p';
1525 0 0         if (strcasecmp(argv[0]+pvariant,"unsubscribe") != 0) {
1526 0 0         DEBUG_MSG("%s", "command is not unsubscribe");
1527 0           Newx(cbt, sizeof(redis_fast_subscribe_cb_t), redis_fast_subscribe_cb_t);
1528 0           cbt->self = self;
1529 0           cbt->cb = SvREFCNT_inc(cb);
1530             } else {
1531 0 0         DEBUG_MSG("%s", "command is unsubscribe");
1532             cbt = NULL;
1533             }
1534 0 0         redisAsyncCommandArgv(
1535             self->ac, cbt ? Redis__Fast_subscribe_cb : NULL, cbt,
1536             argc, (const char**)argv, argvlen
1537             );
1538 0           self->expected_subs = argc - 1;
1539 0 0         while(self->expected_subs > 0 && wait_for_event(aTHX_ self, self->read_timeout, self->write_timeout) == WAIT_FOR_EVENT_OK) ;
    0          
1540 0 0         if(self->expected_subs == 0) break;
1541 0           Redis__Fast_reconnect(aTHX_ self);
1542 0 0         if(!self->ac) {
1543 0           Safefree(argv);
1544 0           Safefree(argvlen);
1545 0           croak("Not connected to any server");
1546             }
1547             }
1548              
1549 0           Safefree(argv);
1550 0           Safefree(argvlen);
1551 0 0         DEBUG_MSG("%s", "finish");
1552 0           XSRETURN(0);
1553             }
1554              
1555             void
1556             wait_for_messages(Redis::Fast self, double timeout = -1)
1557             CODE:
1558             {
1559 0 0         int i, cnt = (self->reconnect == 0 ? 1 : 2);
1560             int res = WAIT_FOR_EVENT_OK;
1561 0 0         DEBUG_MSG("%s", "start");
1562 0           self->proccess_sub_count = 0;
1563 0 0         for(i = 0; i < cnt; i++) {
1564 0 0         while((res = wait_for_event(aTHX_ self, timeout, timeout)) == WAIT_FOR_EVENT_OK) ;
1565 0 0         if(res == WAIT_FOR_EVENT_READ_TIMEOUT || res == WAIT_FOR_EVENT_WRITE_TIMEOUT) break;
1566 0           Redis__Fast_reconnect(aTHX_ self);
1567 0 0         if(!self->ac) {
1568 0           croak("Not connected to any server");
1569             }
1570             }
1571 0 0         if(res == WAIT_FOR_EVENT_EXCEPTION) {
1572 0 0         if(!self->ac) {
1573 0 0         DEBUG_MSG("%s", "Connection not found");
1574 0           croak("EOF from server");
1575 0 0         } else if(self->ac->c.err == REDIS_ERR_EOF) {
1576 0 0         DEBUG_MSG("hiredis returns error: %s", self->ac->c.errstr);
1577 0           croak("EOF from server");
1578             } else {
1579 0 0         DEBUG_MSG("hiredis returns error: %s", self->ac->c.errstr);
1580 0           snprintf(self->error, MAX_ERROR_SIZE, "[WAIT_FOR_MESSAGES] %s", self->ac->c.errstr);
1581 0           croak("%s", self->error);
1582             }
1583             }
1584 0           ST(0) = sv_2mortal(newSViv(self->proccess_sub_count));
1585 0 0         DEBUG_MSG("finish with %d", res);
1586 0           XSRETURN(1);
1587             }
1588              
1589             void
1590             __wait_for_event(Redis::Fast self, double timeout = -1)
1591             CODE:
1592             {
1593 0 0         DEBUG_MSG("%s", "start");
1594 0           wait_for_event(aTHX_ self, timeout, timeout);
1595 0 0         DEBUG_MSG("%s", "finish");
1596 0           XSRETURN(0);
1597             }