File Coverage

blib/lib/AnyEvent/RipeRedis.pm
Criterion Covered Total %
statement 233 480 48.5
branch 49 176 27.8
condition 20 63 31.7
subroutine 43 61 70.4
pod 4 4 100.0
total 349 784 44.5


line stmt bran cond sub pod time code
1             package AnyEvent::RipeRedis;
2              
3 21     21   1561559 use 5.008000;
  21         275  
4 21     21   143 use strict;
  21         41  
  21         630  
5 21     21   119 use warnings;
  21         76  
  21         756  
6 21     21   157 use base qw( Exporter );
  21         77  
  21         3942  
7              
8             our $VERSION = '0.48';
9              
10 21     21   9590 use AnyEvent::RipeRedis::Error;
  21         63  
  21         664  
11              
12 21     21   22354 use AnyEvent;
  21         123057  
  21         945  
13 21     21   18539 use AnyEvent::Handle;
  21         442365  
  21         1097  
14 21     21   231 use Scalar::Util qw( looks_like_number weaken );
  21         59  
  21         1447  
15 21     21   14137 use Digest::SHA qw( sha1_hex );
  21         69967  
  21         2053  
16 21     21   197 use Carp qw( croak );
  21         50  
  21         2052  
17              
18             my %ERROR_CODES;
19              
20             BEGIN {
21 21     21   395 %ERROR_CODES = %AnyEvent::RipeRedis::Error::ERROR_CODES;
22 21         169 our @EXPORT_OK = keys %ERROR_CODES;
23 21         1535 our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
24             }
25              
26             use constant {
27             # Default values
28 21         21880 D_HOST => 'localhost',
29             D_PORT => 6379,
30             D_DB_INDEX => 0,
31              
32             %ERROR_CODES,
33              
34             # Operation status
35             S_NEED_DO => 1,
36             S_IN_PROGRESS => 2,
37             S_DONE => 3,
38              
39             # String terminator
40             EOL => "\r\n",
41             EOL_LENGTH => 2,
42 21     21   170 };
  21         43  
43              
44             my %SUB_CMDS = (
45             subscribe => 1,
46             psubscribe => 1,
47             );
48              
49             my %SUBUNSUB_CMDS = (
50             %SUB_CMDS,
51             unsubscribe => 1,
52             punsubscribe => 1,
53             );
54              
55             my %MESSAGE_TYPES = (
56             message => 1,
57             pmessage => 1,
58             );
59              
60             my %NEED_PREPROCESS = (
61             multi => 1,
62             exec => 1,
63             discard => 1,
64             eval_cached => 1,
65             %SUBUNSUB_CMDS,
66             );
67              
68             my %NEED_POSTPROCESS = (
69             info => 1,
70             cluster_info => 1,
71             select => 1,
72             quit => 1,
73             );
74              
75             my %ERR_PREFS_MAP = (
76             LOADING => E_LOADING_DATASET,
77             NOSCRIPT => E_NO_SCRIPT,
78             BUSY => E_BUSY,
79             MASTERDOWN => E_MASTER_DOWN,
80             MISCONF => E_MISCONF,
81             READONLY => E_READONLY,
82             OOM => E_OOM,
83             EXECABORT => E_EXEC_ABORT,
84             NOAUTH => E_NO_AUTH,
85             WRONGTYPE => E_WRONG_TYPE,
86             NOREPLICAS => E_NO_REPLICAS,
87             BUSYKEY => E_BUSY_KEY,
88             CROSSSLOT => E_CROSS_SLOT,
89             TRYAGAIN => E_TRY_AGAIN,
90             ASK => E_ASK,
91             MOVED => E_MOVED,
92             CLUSTERDOWN => E_CLUSTER_DOWN,
93             NOTBUSY => E_NOT_BUSY,
94             );
95              
96             my %EVAL_CACHE;
97              
98              
99             sub new {
100 25     25 1 113111 my $class = shift;
101 25         125 my %params = @_;
102              
103 25         75 my $self = bless {}, $class;
104              
105 25   50     248 $self->{host} = $params{host} || D_HOST;
106 25   100     142 $self->{port} = $params{port} || D_PORT;
107 25         77 $self->{password} = $params{password};
108             $self->{database}
109 25 50       81 = defined $params{database} ? $params{database} : D_DB_INDEX;
110 25 50       87 $self->{utf8} = exists $params{utf8} ? $params{utf8} : 1;
111 25         55 $self->{lazy} = $params{lazy};
112 25 100       78 $self->{reconnect} = exists $params{reconnect} ? $params{reconnect} : 1;
113 25   50     206 $self->{handle_params} = $params{handle_params} || {};
114 25         79 $self->{on_connect} = $params{on_connect};
115 25         65 $self->{on_disconnect} = $params{on_disconnect};
116              
117 25         134 $self->connection_timeout( $params{connection_timeout} );
118 23         113 $self->read_timeout( $params{read_timeout} );
119 21         90 $self->reconnect_interval( $params{reconnect_interval} );
120 19         88 $self->on_error( $params{on_error} );
121              
122 19         107 $self->_reset_internals;
123 19         55 $self->{_input_queue} = [];
124 19         42 $self->{_temp_queue} = [];
125 19         42 $self->{_processing_queue} = [];
126 19         41 $self->{_channels} = {};
127 19         52 $self->{_channel_cnt} = 0;
128 19         35 $self->{_pchannel_cnt} = 0;
129              
130 19 50       54 unless ( $self->{lazy} ) {
131 19         60 $self->_connect;
132             }
133              
134 19         90 return $self;
135             }
136              
137             sub execute {
138 0     0 1 0 my $self = shift;
139 0         0 my $cmd_name = shift;
140              
141 0         0 my $cmd = $self->_prepare( $cmd_name, [@_] );
142 0         0 $self->_execute($cmd);
143              
144 0         0 return;
145             }
146              
147             sub disconnect {
148 3     3 1 597 my $self = shift;
149              
150 3         31 $self->_disconnect;
151              
152 3         6 return;
153             }
154              
155             sub on_error {
156 24     24 1 592 my $self = shift;
157              
158 24 100       65 if (@_) {
159 21         36 my $on_error = shift;
160              
161 21 100       52 if ( defined $on_error ) {
162 13         35 $self->{on_error} = $on_error;
163             }
164             else {
165             $self->{on_error} = sub {
166 1     1   2 my $err = shift;
167 1         5 warn $err->message . "\n";
168 8         61 };
169             }
170             }
171              
172 24         88 return $self->{on_error};
173             }
174              
175             # Generate accessors
176             {
177 21     21   180 no strict qw( refs );
  21         57  
  21         92900  
178              
179             foreach my $name ( qw( host port database ) ) {
180             *{$name} = sub {
181 3     3   4106 my $self = shift;
182 3         22 return $self->{$name};
183             }
184             }
185              
186             foreach my $name ( qw( connection_timeout read_timeout
187             reconnect_interval ) )
188             {
189             *{$name} = sub {
190 90     90   2408 my $self = shift;
191              
192 90 100       222 if (@_) {
193 81         163 my $seconds = shift;
194              
195 81 100 100     316 if ( defined $seconds
      100        
196             && ( !looks_like_number($seconds) || $seconds < 0 ) )
197             {
198 12         1264 croak qq{"$name" must be a positive number};
199             }
200 69         184 $self->{$name} = $seconds;
201             }
202              
203 78         165 return $self->{$name};
204             };
205             }
206              
207             foreach my $name ( qw( utf8 reconnect on_connect on_disconnect ) ) {
208             *{$name} = sub {
209 20     20   1125 my $self = shift;
210              
211 20 100       53 if (@_) {
212 8         22 $self->{$name} = shift;
213             }
214              
215 20         68 return $self->{$name};
216             };
217             }
218             }
219              
220             sub _connect {
221 19     19   36 my $self = shift;
222              
223             $self->{_handle} = AnyEvent::Handle->new(
224 19         123 %{ $self->{handle_params} },
225 19         49 connect => [ $self->{host}, $self->{port} ],
226             on_prepare => $self->_create_on_prepare,
227             on_connect => $self->_create_on_connect,
228             on_connect_error => $self->_create_on_connect_error,
229             on_rtimeout => $self->_create_on_rtimeout,
230             on_eof => $self->_create_on_eof,
231             on_error => $self->_create_on_handle_error,
232             on_read => $self->_create_on_read,
233             );
234              
235 19         3058 return;
236             }
237              
238             sub _create_on_prepare {
239 19     19   43 my $self = shift;
240              
241 19         97 weaken($self);
242              
243             return sub {
244 27 100   27   144191 if ( defined $self->{connection_timeout} ) {
245 17         80 return $self->{connection_timeout};
246             }
247              
248 10         76 return;
249 19         157 };
250             }
251              
252             sub _create_on_connect {
253 19     19   51 my $self = shift;
254              
255 19         70 weaken($self);
256              
257             return sub {
258 0     0   0 $self->{_connected} = 1;
259              
260 0 0       0 unless ( defined $self->{password} ) {
261 0         0 $self->{_auth_state} = S_DONE;
262             }
263 0 0       0 if ( $self->{database} == 0 ) {
264 0         0 $self->{_db_selection_state} = S_DONE;
265             }
266              
267 0 0       0 if ( $self->{_auth_state} == S_NEED_DO ) {
    0          
268 0         0 $self->_auth;
269             }
270             elsif ( $self->{_db_selection_state} == S_NEED_DO ) {
271 0         0 $self->_select_database;
272             }
273             else {
274 0         0 $self->{_ready} = 1;
275 0         0 $self->_process_input_queue;
276             }
277              
278 0 0       0 if ( defined $self->{on_connect} ) {
279 0         0 $self->{on_connect}->();
280             }
281 19         122 };
282             }
283              
284             sub _create_on_connect_error {
285 19     19   45 my $self = shift;
286              
287 19         50 weaken($self);
288              
289             return sub {
290 8     8   1132 my $err_msg = pop;
291              
292 8         60 my $err = _new_error(
293             "Can't connect to $self->{host}:$self->{port}: $err_msg",
294             E_CANT_CONN
295             );
296 8         32 $self->_disconnect($err);
297 19         118 };
298             }
299              
300             sub _create_on_rtimeout {
301 19     19   44 my $self = shift;
302              
303 19         47 weaken($self);
304              
305             return sub {
306 0 0   0   0 if ( @{ $self->{_processing_queue} } ) {
  0         0  
307 0         0 my $err = _new_error( 'Read timed out.', E_READ_TIMEDOUT );
308 0         0 $self->_disconnect($err);
309             }
310             else {
311 0         0 $self->{_handle}->rtimeout(undef);
312             }
313 19         122 };
314             }
315              
316             sub _create_on_eof {
317 19     19   39 my $self = shift;
318              
319 19         50 weaken($self);
320              
321             return sub {
322 0     0   0 my $err = _new_error( 'Connection closed by remote host.',
323             E_CONN_CLOSED_BY_REMOTE_HOST );
324 0         0 $self->_disconnect($err);
325 19         134 };
326             }
327              
328             sub _create_on_handle_error {
329 19     19   36 my $self = shift;
330              
331 19         50 weaken($self);
332              
333             return sub {
334 0     0   0 my $err_msg = pop;
335              
336 0         0 my $err = _new_error( $err_msg, E_IO );
337 0         0 $self->_disconnect($err);
338 19         110 };
339             }
340              
341             sub _create_on_read {
342 19     19   41 my $self = shift;
343              
344 19         55 weaken($self);
345              
346 19         38 my $str_len;
347             my @bufs;
348 19         36 my $bufs_num = 0;
349              
350             return sub {
351 0     0   0 my $handle = shift;
352              
353 0         0 MAIN: while (1) {
354 0 0       0 return if $handle->destroyed;
355              
356 0         0 my $reply;
357             my $err_code;
358              
359 0 0       0 if ( defined $str_len ) {
360 0 0       0 if ( length( $handle->{rbuf} ) < $str_len + EOL_LENGTH ) {
361 0         0 return;
362             }
363              
364 0         0 $reply = substr( $handle->{rbuf}, 0, $str_len, '' );
365 0         0 substr( $handle->{rbuf}, 0, EOL_LENGTH, '' );
366 0 0       0 if ( $self->{utf8} ) {
367 0         0 utf8::decode($reply);
368             }
369              
370 0         0 undef $str_len;
371             }
372             else {
373 0         0 my $eol_pos = index( $handle->{rbuf}, EOL );
374              
375 0 0       0 if ( $eol_pos < 0 ) {
376 0         0 return;
377             }
378              
379 0         0 $reply = substr( $handle->{rbuf}, 0, $eol_pos, '' );
380 0         0 my $type = substr( $reply, 0, 1, '' );
381 0         0 substr( $handle->{rbuf}, 0, EOL_LENGTH, '' );
382              
383 0 0 0     0 if ( $type ne '+' && $type ne ':' ) {
384 0 0       0 if ( $type eq '$' ) {
    0          
    0          
385 0 0       0 if ( $reply >= 0 ) {
386 0         0 $str_len = $reply;
387 0         0 next;
388             }
389              
390 0         0 undef $reply;
391             }
392             elsif ( $type eq '*' ) {
393 0 0       0 if ( $reply > 0 ) {
    0          
394 0         0 push( @bufs,
395             { reply => [],
396             err_code => undef,
397             chunks_cnt => $reply,
398             }
399             );
400 0         0 $bufs_num++;
401              
402 0         0 next;
403             }
404             elsif ( $reply == 0 ) {
405 0         0 $reply = [];
406             }
407             else {
408 0         0 undef $reply;
409             }
410             }
411             elsif ( $type eq '-' ) {
412 0         0 $err_code = E_OPRN_ERROR;
413 0 0       0 if ( $reply =~ m/^([A-Z]{3,}) / ) {
414 0 0       0 if ( exists $ERR_PREFS_MAP{$1} ) {
415 0         0 $err_code = $ERR_PREFS_MAP{$1};
416             }
417             }
418             }
419             else {
420 0         0 my $err = _new_error( 'Unexpected reply received.',
421             E_UNEXPECTED_DATA );
422 0         0 $self->_disconnect($err);
423              
424 0         0 return;
425             }
426             }
427             }
428              
429 0         0 while ( $bufs_num > 0 ) {
430 0         0 my $curr_buf = $bufs[-1];
431 0 0       0 if ( defined $err_code ) {
432 0 0       0 unless ( ref($reply) ) {
433 0         0 $reply = _new_error( $reply, $err_code );
434             }
435 0         0 $curr_buf->{err_code} = E_OPRN_ERROR;
436             }
437 0         0 push( @{ $curr_buf->{reply} }, $reply );
  0         0  
438 0 0       0 if ( --$curr_buf->{chunks_cnt} > 0 ) {
439 0         0 next MAIN;
440             }
441              
442 0         0 $reply = $curr_buf->{reply};
443 0         0 $err_code = $curr_buf->{err_code};
444 0         0 pop @bufs;
445 0         0 $bufs_num--;
446             }
447              
448 0         0 $self->_process_reply( $reply, $err_code );
449             }
450              
451 0         0 return;
452 19         327 };
453             }
454              
455             sub _prepare {
456 25     25   46 my $self = shift;
457 25         42 my $cmd_name = shift;
458 25         133 my $args = shift;
459              
460 25         69 my $cbs;
461 25 50       89 if ( ref( $args->[-1] ) eq 'HASH' ) {
462 0         0 $cbs = pop @{$args};
  0         0  
463             }
464             else {
465 25         51 $cbs = {};
466 25 100       95 if ( ref( $args->[-1] ) eq 'CODE' ) {
467 16 50       46 if ( exists $SUB_CMDS{$cmd_name} ) {
468 0         0 $cbs->{on_message} = pop @{$args};
  0         0  
469             }
470             else {
471 16         22 $cbs->{on_reply} = pop @{$args};
  16         54  
472             }
473             }
474             }
475              
476             my @kwds
477 25 50       137 = $cmd_name eq 'eval_cached'
478             ? ('evalsha')
479             : split( m/_/, lc($cmd_name) );
480              
481             my $cmd = {
482             name => $cmd_name,
483             kwds => \@kwds,
484             args => $args,
485 25         59 %{$cbs},
  25         139  
486             };
487              
488 25 100       92 unless ( defined $cmd->{on_reply} ) {
489 9         25 weaken($self);
490              
491             $cmd->{on_reply} = sub {
492 1     1   2 my $err = $_[1];
493              
494 1 50       4 if ( defined $err ) {
495 1         3 $self->{on_error}->($err);
496 1         9 return;
497             }
498 9         29 };
499             }
500              
501 25         89 return $cmd;
502             }
503              
504             sub _execute {
505 25     25   45 my $self = shift;
506 25         44 my $cmd = shift;
507              
508 25 50 66     127 if ( $self->{_multi_mode}
      66        
509             && ( exists $SUBUNSUB_CMDS{ $cmd->{name} }
510             || exists $NEED_POSTPROCESS{ $cmd->{name} } ) )
511             {
512 7         719 croak qq{Command "$cmd->{name}" not allowed after "multi" command.}
513             . ' First, the transaction must be finalized.';
514             }
515              
516 18 100       62 if ( exists $NEED_PREPROCESS{ $cmd->{name} } ) {
517 2 100 33     14 if ( $cmd->{name} eq 'multi' ) {
    50          
    50          
518 1         3 $self->{_multi_mode} = 1;
519             }
520             elsif ( $cmd->{name} eq 'exec'
521             || $cmd->{name} eq 'discard' )
522             {
523 0         0 $self->{_multi_mode} = 0;
524             }
525             elsif ( $cmd->{name} eq 'eval_cached' ) {
526 0         0 my $script = $cmd->{args}[0];
527 0 0       0 unless ( exists $EVAL_CACHE{$script} ) {
528 0         0 $EVAL_CACHE{$script} = sha1_hex($script);
529             }
530 0         0 $cmd->{args}[0] = $EVAL_CACHE{$script};
531 0         0 $cmd->{script} = $script;
532             }
533             else { # subscribe, unsubscribe, psubscribe, punsubscribe
534 1 50 33     7 if ( exists $SUB_CMDS{ $cmd->{name} }
535             && !defined $cmd->{on_message} )
536             {
537 1         153 croak '"on_message" callback must be specified';
538             }
539              
540 0 0       0 if ( @{ $cmd->{args} } ) {
  0         0  
541 0         0 $cmd->{reply_cnt} = scalar @{ $cmd->{args} };
  0         0  
542             }
543             }
544             }
545              
546 17 50       59 unless ( $self->{_ready} ) {
547 17 100       61 if ( defined $self->{_handle} ) {
    50          
    50          
548 13 50       59 if ( $self->{_connected} ) {
549 0 0       0 if ( $self->{_auth_state} == S_DONE ) {
    0          
550 0 0       0 if ( $self->{_db_selection_state} == S_NEED_DO ) {
551 0         0 $self->_select_database;
552             }
553             }
554             elsif ( $self->{_auth_state} == S_NEED_DO ) {
555 0         0 $self->_auth;
556             }
557             }
558             }
559             elsif ( $self->{lazy} ) {
560 0         0 undef $self->{lazy};
561 0         0 $self->_connect;
562             }
563             elsif ( $self->{reconnect} ) {
564 0 0 0     0 if ( defined $self->{reconnect_interval}
565             && $self->{reconnect_interval} > 0 )
566             {
567 0 0       0 unless ( defined $self->{_reconnect_timer} ) {
568 0         0 weaken($self);
569              
570             $self->{_reconnect_timer} = AE::timer(
571             $self->{reconnect_interval}, 0,
572             sub {
573 0     0   0 undef $self->{_reconnect_timer};
574 0         0 $self->_connect;
575             }
576 0         0 );
577             }
578             }
579             else {
580 0         0 $self->_connect;
581             }
582             }
583             else {
584             AE::postpone {
585 4     4   164 my $err = _new_error( qq{Operation "$cmd->{name}" aborted:}
586             . ' No connection to the server.', E_NO_CONN );
587 4         20 $cmd->{on_reply}->( undef, $err );
588 4         28 };
589              
590 4         40 return;
591             }
592              
593 13         28 push( @{ $self->{_input_queue} }, $cmd );
  13         33  
594              
595 13         31 return;
596             }
597              
598 0         0 $self->_push_write($cmd);
599              
600 0         0 return;
601             }
602              
603             sub _push_write {
604 0     0   0 my $self = shift;
605 0         0 my $cmd = shift;
606              
607 0         0 my $cmd_str = '';
608 0         0 my @tokens = ( @{ $cmd->{kwds} }, @{ $cmd->{args} } );
  0         0  
  0         0  
609 0         0 foreach my $token (@tokens) {
610 0 0 0     0 unless ( defined $token ) {
611 0         0 $token = '';
612             }
613             elsif ( $self->{utf8} ) {
614             utf8::encode($token);
615             }
616 0         0 $cmd_str .= '$' . length($token) . EOL . $token . EOL;
617             }
618 0         0 $cmd_str = '*' . scalar(@tokens) . EOL . $cmd_str;
619              
620 0         0 my $handle = $self->{_handle};
621              
622 0 0 0     0 if ( defined $self->{read_timeout}
623 0         0 && !@{ $self->{_processing_queue} } )
624             {
625 0         0 $handle->rtimeout_reset;
626 0         0 $handle->rtimeout( $self->{read_timeout} );
627             }
628              
629 0         0 push( @{ $self->{_processing_queue} }, $cmd );
  0         0  
630 0         0 $handle->push_write($cmd_str);
631              
632 0         0 return;
633             }
634              
635             sub _auth {
636 0     0   0 my $self = shift;
637              
638 0         0 weaken($self);
639 0         0 $self->{_auth_state} = S_IN_PROGRESS;
640              
641             $self->_push_write(
642             { name => 'auth',
643             kwds => ['auth'],
644             args => [ $self->{password} ],
645              
646             on_reply => sub {
647 0     0   0 my $err = $_[1];
648              
649 0 0 0     0 if ( defined $err
650             && $err->message ne 'ERR Client sent AUTH, but no password is set' )
651             {
652 0         0 $self->{_auth_state} = S_NEED_DO;
653 0         0 $self->_abort($err);
654              
655 0         0 return;
656             }
657              
658 0         0 $self->{_auth_state} = S_DONE;
659              
660 0 0       0 if ( $self->{_db_selection_state} == S_NEED_DO ) {
661 0         0 $self->_select_database;
662             }
663             else {
664 0         0 $self->{_ready} = 1;
665 0         0 $self->_process_input_queue;
666             }
667             },
668             }
669 0         0 );
670              
671 0         0 return;
672             }
673              
674             sub _select_database {
675 0     0   0 my $self = shift;
676              
677 0         0 weaken($self);
678 0         0 $self->{_db_selection_state} = S_IN_PROGRESS;
679              
680             $self->_push_write(
681             { name => 'select',
682             kwds => ['select'],
683             args => [ $self->{database} ],
684              
685             on_reply => sub {
686 0     0   0 my $err = $_[1];
687              
688 0 0       0 if ( defined $err ) {
689 0         0 $self->{_db_selection_state} = S_NEED_DO;
690 0         0 $self->_abort($err);
691              
692 0         0 return;
693             }
694              
695 0         0 $self->{_db_selection_state} = S_DONE;
696              
697 0         0 $self->{_ready} = 1;
698 0         0 $self->_process_input_queue;
699             },
700             }
701 0         0 );
702              
703 0         0 return;
704             }
705              
706             sub _process_input_queue {
707 0     0   0 my $self = shift;
708              
709 0         0 $self->{_temp_queue} = $self->{_input_queue};
710 0         0 $self->{_input_queue} = [];
711              
712 0         0 while ( my $cmd = shift @{ $self->{_temp_queue} } ) {
  0         0  
713 0         0 $self->_push_write($cmd);
714             }
715              
716 0         0 return;
717             }
718              
719             sub _process_reply {
720 0     0   0 my $self = shift;
721 0         0 my $reply = shift;
722 0         0 my $err_code = shift;
723              
724 0 0 0     0 if ( defined $err_code ) {
    0 0        
725 0         0 $self->_process_error( $reply, $err_code );
726             }
727             elsif ( $self->{_channel_cnt} + $self->{_pchannel_cnt} > 0
728             && ref($reply) && exists $MESSAGE_TYPES{ $reply->[0] } )
729             {
730 0         0 $self->_process_message($reply);
731             }
732             else {
733 0         0 $self->_process_success($reply);
734             }
735              
736 0         0 return;
737             }
738              
739             sub _process_error {
740 0     0   0 my $self = shift;
741 0         0 my $reply = shift;
742 0         0 my $err_code = shift;
743              
744 0         0 my $cmd = shift @{ $self->{_processing_queue} };
  0         0  
745              
746 0 0       0 unless ( defined $cmd ) {
747 0         0 my $err = _new_error(
748             q{Don't know how process error message. Processing queue is empty.},
749             E_UNEXPECTED_DATA
750             );
751 0         0 $self->_disconnect($err);
752              
753 0         0 return;
754             }
755              
756 0 0       0 if ( $err_code == E_NO_AUTH ) {
757 0         0 my $err = _new_error( $reply, $err_code );
758 0         0 $self->_disconnect($err);
759              
760 0         0 return;
761             }
762              
763 0 0 0     0 if ( $cmd->{name} eq 'eval_cached'
764             && $err_code == E_NO_SCRIPT )
765             {
766 0         0 $cmd->{kwds}[0] = 'eval';
767 0         0 $cmd->{args}[0] = $cmd->{script};
768              
769 0         0 $self->_push_write($cmd);
770              
771 0         0 return;
772             }
773              
774 0 0       0 if ( ref($reply) ) {
775 0         0 my $err = _new_error(
776             qq{Operation "$cmd->{name}" completed with errors.}, $err_code );
777 0         0 $cmd->{on_reply}->( $reply, $err );
778             }
779             else {
780 0         0 my $err = _new_error( $reply, $err_code );
781 0         0 $cmd->{on_reply}->( undef, $err );
782             }
783              
784 0         0 return;
785             }
786              
787             sub _process_message {
788 0     0   0 my $self = shift;
789 0         0 my $msg = shift;
790              
791 0         0 my $cmd = $self->{_channels}{ $msg->[1] };
792              
793 0 0       0 unless ( defined $cmd ) {
794 0         0 my $err = _new_error(
795             q{Don't know how process published message.}
796             . qq{ Unknown channel or pattern "$msg->[1]".},
797             E_UNEXPECTED_DATA
798             );
799 0         0 $self->_disconnect($err);
800              
801 0         0 return;
802             }
803              
804             $cmd->{on_message}->(
805             $msg->[0] eq 'pmessage'
806 0         0 ? @{$msg}[ 3, 1, 2 ]
807 0 0       0 : @{$msg}[ 2, 1 ]
  0         0  
808             );
809              
810 0         0 return;
811             }
812              
813             sub _process_success {
814 0     0   0 my $self = shift;
815 0         0 my $reply = shift;
816              
817 0         0 my $cmd = $self->{_processing_queue}[0];
818              
819 0 0       0 unless ( defined $cmd ) {
820 0         0 my $err = _new_error(
821             q{Don't know how process reply. Processing queue is empty.},
822             E_UNEXPECTED_DATA
823             );
824 0         0 $self->_disconnect($err);
825              
826 0         0 return;
827             }
828              
829 0 0       0 if ( exists $SUBUNSUB_CMDS{ $cmd->{name} } ) {
830 0 0       0 if ( $cmd->{name} eq 'subscribe' ) {
    0          
    0          
831 0         0 $self->{_channels}{ $reply->[1] } = $cmd;
832 0         0 $self->{_channel_cnt}++;
833             }
834             elsif ( $cmd->{name} eq 'psubscribe' ) {
835 0         0 $self->{_channels}{ $reply->[1] } = $cmd;
836 0         0 $self->{_pchannel_cnt}++;
837             }
838             elsif ( $cmd->{name} eq 'unsubscribe' ) {
839 0 0       0 unless ( defined $cmd->{reply_cnt} ) {
840 0         0 $cmd->{reply_cnt} = $self->{_channel_cnt};
841             }
842              
843 0         0 delete $self->{_channels}{ $reply->[1] };
844 0         0 $self->{_channel_cnt}--;
845             }
846             else { # punsubscribe
847 0 0       0 unless ( defined $cmd->{reply_cnt} ) {
848 0         0 $cmd->{reply_cnt} = $self->{_pchannel_cnt};
849             }
850              
851 0         0 delete $self->{_channels}{ $reply->[1] };
852 0         0 $self->{_pchannel_cnt}--;
853             }
854              
855 0         0 $reply = $reply->[2];
856             }
857              
858 0 0 0     0 if ( !defined $cmd->{reply_cnt}
859             || --$cmd->{reply_cnt} == 0 )
860             {
861 0         0 shift @{ $self->{_processing_queue} };
  0         0  
862              
863 0 0       0 if ( exists $NEED_POSTPROCESS{ $cmd->{name} } ) {
864 0 0 0     0 if ( $cmd->{name} eq 'info'
    0          
865             || $cmd->{name} eq 'cluster_info' )
866             {
867 0         0 $reply = _parse_info($reply);
868             }
869             elsif ( $cmd->{name} eq 'select' ) {
870 0         0 $self->{database} = $cmd->{args}[0];
871             }
872             else { # quit
873 0         0 $self->_disconnect;
874             }
875             }
876              
877 0         0 $cmd->{on_reply}->($reply);
878             }
879              
880 0         0 return;
881             }
882              
883             sub _parse_info {
884 0         0 return { map { split( m/:/, $_, 2 ) }
885 0     0   0 grep { m/^[^#]/ } split( EOL, $_[0] ) };
  0         0  
886             }
887              
888             sub _disconnect {
889 11     11   21 my $self = shift;
890 11         27 my $err = shift;
891              
892 11         18 my $was_connected = $self->{_connected};
893              
894 11 50       35 if ( defined $self->{_handle} ) {
895 11         76 $self->{_handle}->destroy;
896             }
897 11         902 $self->_reset_internals;
898 11         54 $self->_abort($err);
899              
900 11 50 33     48 if ( $was_connected && defined $self->{on_disconnect} ) {
901 0         0 $self->{on_disconnect}->();
902             }
903              
904 11         42 return;
905             }
906              
907             sub _reset_internals {
908 30     30   66 my $self = shift;
909              
910 30         148 $self->{_handle} = undef;
911 30         97 $self->{_connected} = 0;
912 30         62 $self->{_auth_state} = S_NEED_DO;
913 30         64 $self->{_db_selection_state} = S_NEED_DO;
914 30         47 $self->{_ready} = 0;
915 30         67 $self->{_multi_mode} = 0;
916 30         57 $self->{_reconnect_timer} = undef;
917              
918 30         55 return;
919             }
920              
921             sub _abort {
922 11     11   24 my $self = shift;
923 11         19 my $err = shift;
924              
925 11         47 my @queued_commands = $self->_queued_commands;
926 11         24 my %channels = %{ $self->{_channels} };
  11         32  
927              
928 11         32 $self->{_input_queue} = [];
929 11         27 $self->{_temp_queue} = [];
930 11         23 $self->{_processing_queue} = [];
931 11         27 $self->{_channels} = {};
932 11         22 $self->{_channel_cnt} = 0;
933 11         18 $self->{_pchannel_cnt} = 0;
934              
935 11 100 66     71 if ( !defined $err && @queued_commands ) {
936 3         34 $err = _new_error( 'Connection closed by client prematurely.',
937             E_CONN_CLOSED_BY_CLIENT );
938             }
939              
940 11 50       29 if ( defined $err ) {
941 11         47 my $err_msg = $err->message;
942 11         35 my $err_code = $err->code;
943              
944 11         47 $self->{on_error}->($err);
945              
946 11 50 33     47 if ( %channels && $err_code != E_CONN_CLOSED_BY_CLIENT ) {
947 0         0 foreach my $name ( keys %channels ) {
948 0         0 my $err = _new_error(
949             qq{Subscription to channel "$name" lost: $err_msg},
950             $err_code
951             );
952              
953 0         0 my $cmd = $channels{$name};
954 0         0 $cmd->{on_reply}->( undef, $err );
955             }
956             }
957              
958 11         43 foreach my $cmd (@queued_commands) {
959 11         57 my $err = _new_error( qq{Operation "$cmd->{name}" aborted: $err_msg},
960             $err_code );
961 11         39 $cmd->{on_reply}->( undef, $err );
962             }
963             }
964              
965 11         169 return;
966             }
967              
968             sub _queued_commands {
969 22     22   45 my $self = shift;
970              
971             return (
972 22         52 @{ $self->{_processing_queue} },
973 22         53 @{ $self->{_temp_queue} },
974 22         79 @{ $self->{_input_queue} },
  22         74  
975             );
976             }
977              
978             sub _new_error {
979 26     26   192 return AnyEvent::RipeRedis::Error->new(@_);
980             }
981              
982             sub AUTOLOAD {
983 12     12   4133 our $AUTOLOAD;
984 12         24 my $cmd_name = $AUTOLOAD;
985 12         83 $cmd_name =~ s/^.+:://;
986              
987             my $sub = sub {
988 25     25   378 my $self = shift;
989              
990 25         99 my $cmd = $self->_prepare( $cmd_name, [@_] );
991 25         95 $self->_execute($cmd);
992              
993 17         49 return;
994 12         48 };
995              
996 12         25 do {
997 21     21   292 no strict 'refs';
  21         61  
  21         4499  
998 12         23 *{$cmd_name} = $sub;
  12         37  
999             };
1000              
1001 12         26 goto &{$sub};
  12         36  
1002             }
1003              
1004             sub DESTROY {
1005 17     17   4011 my $self = shift;
1006              
1007 17 100       89 if ( defined $self->{_handle} ) {
1008 8         43 $self->{_handle}->destroy;
1009             }
1010              
1011 17 100       1683 if ( defined $self->{_processing_queue} ) {
1012 11         45 my @queued_commands = $self->_queued_commands;
1013              
1014 11         55 foreach my $cmd (@queued_commands) {
1015 2         38 warn qq{Operation "$cmd->{name}" aborted:}
1016             . " Client object destroyed prematurely.\n";
1017             }
1018             }
1019              
1020 17         661 return;
1021             }
1022              
1023             1;
1024             __END__