File Coverage

blib/lib/AnyEvent/RipeRedis.pm
Criterion Covered Total %
statement 233 476 48.9
branch 49 174 28.1
condition 19 60 31.6
subroutine 43 61 70.4
pod 4 4 100.0
total 348 775 44.9


line stmt bran cond sub pod time code
1             package AnyEvent::RipeRedis;
2              
3 22     22   306511 use 5.008000;
  22         56  
4 22     22   71 use strict;
  22         22  
  22         440  
5 22     22   67 use warnings;
  22         20  
  22         526  
6 22     22   64 use base qw( Exporter );
  22         24  
  22         2036  
7              
8             our $VERSION = '0.42';
9              
10 22     22   6907 use AnyEvent::RipeRedis::Error;
  22         40  
  22         483  
11              
12 22     22   17169 use AnyEvent;
  22         84807  
  22         595  
13 22     22   12630 use AnyEvent::Handle;
  22         320100  
  22         758  
14 22     22   127 use Scalar::Util qw( looks_like_number weaken );
  22         21  
  22         1528  
15 22     22   9946 use Digest::SHA qw( sha1_hex );
  22         52194  
  22         1389  
16 22     22   112 use Carp qw( croak );
  22         26  
  22         1549  
17              
18             my %ERROR_CODES;
19              
20             BEGIN {
21 22     22   366 %ERROR_CODES = %AnyEvent::RipeRedis::Error::ERROR_CODES;
22 22         137 our @EXPORT_OK = keys %ERROR_CODES;
23 22         844 our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
24             }
25              
26             use constant {
27             # Default values
28 22         14617 D_HOST => 'localhost',
29             D_PORT => 6379,
30             D_DB_INDEX => 0,
31              
32             %ERROR_CODES,
33              
34             # Operation status
35             S_NEED_DO => 1,
36             S_IN_PROGRESS => 2,
37             S_DONE => 3,
38              
39             # String terminator
40             EOL => "\r\n",
41             EOL_LENGTH => 2,
42 22     22   87 };
  22         29  
43              
44             my %SUB_CMDS = (
45             subscribe => 1,
46             psubscribe => 1,
47             );
48              
49             my %SUBUNSUB_CMDS = (
50             %SUB_CMDS,
51             unsubscribe => 1,
52             punsubscribe => 1,
53             );
54              
55             my %MESSAGE_TYPES = (
56             message => 1,
57             pmessage => 1,
58             );
59              
60             my %NEED_PREPROCESS = (
61             multi => 1,
62             exec => 1,
63             discard => 1,
64             eval_cached => 1,
65             %SUBUNSUB_CMDS,
66             );
67              
68             my %NEED_POSTPROCESS = (
69             info => 1,
70             cluster_info => 1,
71             select => 1,
72             quit => 1,
73             );
74              
75             my %ERR_PREFS_MAP = (
76             LOADING => E_LOADING_DATASET,
77             NOSCRIPT => E_NO_SCRIPT,
78             BUSY => E_BUSY,
79             MASTERDOWN => E_MASTER_DOWN,
80             MISCONF => E_MISCONF,
81             READONLY => E_READONLY,
82             OOM => E_OOM,
83             EXECABORT => E_EXEC_ABORT,
84             NOAUTH => E_NO_AUTH,
85             WRONGTYPE => E_WRONG_TYPE,
86             NOREPLICAS => E_NO_REPLICAS,
87             BUSYKEY => E_BUSY_KEY,
88             CROSSSLOT => E_CROSS_SLOT,
89             TRYAGAIN => E_TRY_AGAIN,
90             ASK => E_ASK,
91             MOVED => E_MOVED,
92             CLUSTERDOWN => E_CLUSTER_DOWN,
93             NOTBUSY => E_NOT_BUSY,
94             );
95              
96             my %EVAL_CACHE;
97              
98              
99             sub new {
100 25     25 1 76212 my $class = shift;
101 25         89 my %params = @_;
102              
103 25         50 my $self = bless {}, $class;
104              
105 25   50     180 $self->{host} = $params{host} || D_HOST;
106 25   100     92 $self->{port} = $params{port} || D_PORT;
107 25         38 $self->{password} = $params{password};
108             $self->{database}
109 25 50       60 = defined $params{database} ? $params{database} : D_DB_INDEX;
110 25 50       59 $self->{utf8} = exists $params{utf8} ? $params{utf8} : 1;
111 25         32 $self->{lazy} = $params{lazy};
112 25 100       54 $self->{reconnect} = exists $params{reconnect} ? $params{reconnect} : 1;
113 25   50     124 $self->{handle_params} = $params{handle_params} || {};
114 25         130 $self->{on_connect} = $params{on_connect};
115 25         41 $self->{on_disconnect} = $params{on_disconnect};
116              
117 25         90 $self->connection_timeout( $params{connection_timeout} );
118 23         71 $self->read_timeout( $params{read_timeout} );
119 21         68 $self->reconnect_interval( $params{reconnect_interval} );
120 19         66 $self->on_error( $params{on_error} );
121              
122 19         48 $self->_reset_internals;
123 19         33 $self->{_input_queue} = [];
124 19         33 $self->{_temp_queue} = [];
125 19         20 $self->{_processing_queue} = [];
126 19         33 $self->{_channels} = {};
127 19         23 $self->{_channel_cnt} = 0;
128 19         30 $self->{_pchannel_cnt} = 0;
129              
130 19 50       38 unless ( $self->{lazy} ) {
131 19         47 $self->_connect;
132             }
133              
134 19         75 return $self;
135             }
136              
137             sub execute {
138 0     0 1 0 my $self = shift;
139 0         0 my $cmd_name = shift;
140              
141 0         0 my $cmd = $self->_prepare( $cmd_name, [@_] );
142 0         0 $self->_execute($cmd);
143              
144 0         0 return;
145             }
146              
147             sub disconnect {
148 3     3 1 421 my $self = shift;
149              
150 3         13 $self->_disconnect;
151              
152 3         6 return;
153             }
154              
155             sub on_error {
156 24     24 1 288 my $self = shift;
157              
158 24 100       55 if (@_) {
159 21         27 my $on_error = shift;
160              
161 21 100       37 if ( defined $on_error ) {
162 13         24 $self->{on_error} = $on_error;
163             }
164             else {
165             $self->{on_error} = sub {
166 1     1   2 my $err = shift;
167 1         3 warn $err->message . "\n";
168 8         29 };
169             }
170             }
171              
172 24         39 return $self->{on_error};
173             }
174              
175             # Generate accessors
176             {
177 22     22   100 no strict qw( refs );
  22         24  
  22         62488  
178              
179             foreach my $name ( qw( host port database ) ) {
180             *{$name} = sub {
181 3     3   2472 my $self = shift;
182 3         12 return $self->{$name};
183             }
184             }
185              
186             foreach my $name ( qw( connection_timeout read_timeout
187             reconnect_interval ) )
188             {
189             *{$name} = sub {
190 90     90   1813 my $self = shift;
191              
192 90 100       153 if (@_) {
193 81         88 my $seconds = shift;
194              
195 81 100 100     277 if ( defined $seconds
      66        
196             && ( !looks_like_number($seconds) || $seconds < 0 ) )
197             {
198 12         1103 croak qq{"$name" must be a positive number};
199             }
200 69         96 $self->{$name} = $seconds;
201             }
202              
203 78         100 return $self->{$name};
204             };
205             }
206              
207             foreach my $name ( qw( utf8 reconnect on_connect on_disconnect ) ) {
208             *{$name} = sub {
209 20     20   491 my $self = shift;
210              
211 20 100       34 if (@_) {
212 8         10 $self->{$name} = shift;
213             }
214              
215 20         44 return $self->{$name};
216             };
217             }
218             }
219              
220             sub _connect {
221 19     19   18 my $self = shift;
222              
223             $self->{_handle} = AnyEvent::Handle->new(
224 19         92 %{ $self->{handle_params} },
225 19         23 connect => [ $self->{host}, $self->{port} ],
226             on_prepare => $self->_create_on_prepare,
227             on_connect => $self->_create_on_connect,
228             on_connect_error => $self->_create_on_connect_error,
229             on_rtimeout => $self->_create_on_rtimeout,
230             on_eof => $self->_create_on_eof,
231             on_error => $self->_create_on_handle_error,
232             on_read => $self->_create_on_read,
233             );
234              
235 19         2202 return;
236             }
237              
238             sub _create_on_prepare {
239 19     19   28 my $self = shift;
240              
241 19         57 weaken($self);
242              
243             return sub {
244 27 100   27   92418 if ( defined $self->{connection_timeout} ) {
245 17         52 return $self->{connection_timeout};
246             }
247              
248 10         30 return;
249 19         82 };
250             }
251              
252             sub _create_on_connect {
253 19     19   25 my $self = shift;
254              
255 19         37 weaken($self);
256              
257             return sub {
258 0     0   0 $self->{_connected} = 1;
259              
260 0 0       0 unless ( defined $self->{password} ) {
261 0         0 $self->{_auth_state} = S_DONE;
262             }
263 0 0       0 if ( $self->{database} == 0 ) {
264 0         0 $self->{_db_selection_state} = S_DONE;
265             }
266              
267 0 0       0 if ( $self->{_auth_state} == S_NEED_DO ) {
    0          
268 0         0 $self->_auth;
269             }
270             elsif ( $self->{_db_selection_state} == S_NEED_DO ) {
271 0         0 $self->_select_database;
272             }
273             else {
274 0         0 $self->{_ready} = 1;
275 0         0 $self->_process_input_queue;
276             }
277              
278 0 0       0 if ( defined $self->{on_connect} ) {
279 0         0 $self->{on_connect}->();
280             }
281 19         71 };
282             }
283              
284             sub _create_on_connect_error {
285 19     19   35 my $self = shift;
286              
287 19         29 weaken($self);
288              
289             return sub {
290 8     8   1040 my $err_msg = pop;
291              
292 8         44 my $err = _new_error(
293             "Can't connect to $self->{host}:$self->{port}: $err_msg",
294             E_CANT_CONN
295             );
296 8         20 $self->_disconnect($err);
297 19         76 };
298             }
299              
300             sub _create_on_rtimeout {
301 19     19   25 my $self = shift;
302              
303 19         24 weaken($self);
304              
305             return sub {
306 0 0   0   0 if ( @{ $self->{_processing_queue} } ) {
  0         0  
307 0         0 my $err = _new_error( 'Read timed out.', E_READ_TIMEDOUT );
308 0         0 $self->_disconnect($err);
309             }
310             else {
311 0         0 $self->{_handle}->rtimeout(undef);
312             }
313 19         77 };
314             }
315              
316             sub _create_on_eof {
317 19     19   18 my $self = shift;
318              
319 19         31 weaken($self);
320              
321             return sub {
322 0     0   0 my $err = _new_error( 'Connection closed by remote host.',
323             E_CONN_CLOSED_BY_REMOTE_HOST );
324 0         0 $self->_disconnect($err);
325 19         63 };
326             }
327              
328             sub _create_on_handle_error {
329 19     19   24 my $self = shift;
330              
331 19         26 weaken($self);
332              
333             return sub {
334 0     0   0 my $err_msg = pop;
335              
336 0         0 my $err = _new_error( $err_msg, E_IO );
337 0         0 $self->_disconnect($err);
338 19         63 };
339             }
340              
341             sub _create_on_read {
342 19     19   18 my $self = shift;
343              
344 19         29 weaken($self);
345              
346 19         18 my $str_len;
347             my @bufs;
348 19         21 my $bufs_num = 0;
349              
350             return sub {
351 0     0   0 my $handle = shift;
352              
353 0         0 MAIN: while (1) {
354 0 0       0 return if $handle->destroyed;
355              
356 0         0 my $reply;
357             my $err_code;
358              
359 0 0       0 if ( defined $str_len ) {
360 0 0       0 if ( length( $handle->{rbuf} ) < $str_len + EOL_LENGTH ) {
361 0         0 return;
362             }
363              
364 0         0 $reply = substr( $handle->{rbuf}, 0, $str_len, '' );
365 0         0 substr( $handle->{rbuf}, 0, EOL_LENGTH, '' );
366 0 0       0 if ( $self->{utf8} ) {
367 0         0 utf8::decode($reply);
368             }
369              
370 0         0 undef $str_len;
371             }
372             else {
373 0         0 my $eol_pos = index( $handle->{rbuf}, EOL );
374              
375 0 0       0 if ( $eol_pos < 0 ) {
376 0         0 return;
377             }
378              
379 0         0 $reply = substr( $handle->{rbuf}, 0, $eol_pos, '' );
380 0         0 my $type = substr( $reply, 0, 1, '' );
381 0         0 substr( $handle->{rbuf}, 0, EOL_LENGTH, '' );
382              
383 0 0 0     0 if ( $type ne '+' && $type ne ':' ) {
384 0 0       0 if ( $type eq '$' ) {
    0          
    0          
385 0 0       0 if ( $reply >= 0 ) {
386 0         0 $str_len = $reply;
387 0         0 next;
388             }
389              
390 0         0 undef $reply;
391             }
392             elsif ( $type eq '*' ) {
393 0 0       0 if ( $reply > 0 ) {
    0          
394 0         0 push( @bufs,
395             { reply => [],
396             err_code => undef,
397             chunks_cnt => $reply,
398             }
399             );
400 0         0 $bufs_num++;
401              
402 0         0 next;
403             }
404             elsif ( $reply == 0 ) {
405 0         0 $reply = [];
406             }
407             else {
408 0         0 undef $reply;
409             }
410             }
411             elsif ( $type eq '-' ) {
412 0         0 $err_code = E_OPRN_ERROR;
413 0 0       0 if ( $reply =~ m/^([A-Z]{3,}) / ) {
414 0 0       0 if ( exists $ERR_PREFS_MAP{$1} ) {
415 0         0 $err_code = $ERR_PREFS_MAP{$1};
416             }
417             }
418             }
419             else {
420 0         0 my $err = _new_error( 'Unexpected reply received.',
421             E_UNEXPECTED_DATA );
422 0         0 $self->_disconnect($err);
423              
424 0         0 return;
425             }
426             }
427             }
428              
429 0         0 while ( $bufs_num > 0 ) {
430 0         0 my $curr_buf = $bufs[-1];
431 0 0       0 if ( defined $err_code ) {
432 0 0       0 unless ( ref($reply) ) {
433 0         0 $reply = _new_error( $reply, $err_code );
434             }
435 0         0 $curr_buf->{err_code} = E_OPRN_ERROR;
436             }
437 0         0 push( @{ $curr_buf->{reply} }, $reply );
  0         0  
438 0 0       0 if ( --$curr_buf->{chunks_cnt} > 0 ) {
439 0         0 next MAIN;
440             }
441              
442 0         0 $reply = $curr_buf->{reply};
443 0         0 $err_code = $curr_buf->{err_code};
444 0         0 pop @bufs;
445 0         0 $bufs_num--;
446             }
447              
448 0         0 $self->_process_reply( $reply, $err_code );
449             }
450              
451 0         0 return;
452 19         176 };
453             }
454              
455             sub _prepare {
456 25     25   29 my $self = shift;
457 25         33 my $cmd_name = shift;
458 25         25 my $args = shift;
459              
460 25         18 my $cbs;
461 25 50       62 if ( ref( $args->[-1] ) eq 'HASH' ) {
462 0         0 $cbs = pop @{$args};
  0         0  
463             }
464             else {
465 25         31 $cbs = {};
466 25 100       61 if ( ref( $args->[-1] ) eq 'CODE' ) {
467 16 50       40 if ( exists $SUB_CMDS{$cmd_name} ) {
468 0         0 $cbs->{on_message} = pop @{$args};
  0         0  
469             }
470             else {
471 16         6 $cbs->{on_reply} = pop @{$args};
  16         40  
472             }
473             }
474             }
475              
476             my @kwds
477 25 50       105 = $cmd_name eq 'eval_cached'
478             ? ('evalsha')
479             : split( m/_/, lc($cmd_name) );
480              
481             my $cmd = {
482             name => $cmd_name,
483             kwds => \@kwds,
484             args => $args,
485 25         39 %{$cbs},
  25         96  
486             };
487              
488 25 100       65 unless ( defined $cmd->{on_reply} ) {
489 9         38 weaken($self);
490              
491             $cmd->{on_reply} = sub {
492 1     1   1 my $err = $_[1];
493              
494 1 50       3 if ( defined $err ) {
495 1         2 $self->{on_error}->($err);
496 1         7 return;
497             }
498 9         20 };
499             }
500              
501 25         49 return $cmd;
502             }
503              
504             sub _execute {
505 25     25   27 my $self = shift;
506 25         26 my $cmd = shift;
507              
508 25 50 66     103 if ( $self->{_multi_mode}
      66        
509             && ( exists $SUBUNSUB_CMDS{ $cmd->{name} }
510             || exists $NEED_POSTPROCESS{ $cmd->{name} } ) )
511             {
512 7         519 croak qq{Command "$cmd->{name}" not allowed after "multi" command.}
513             . ' First, the transaction must be finalized.';
514             }
515              
516 18 100       49 if ( exists $NEED_PREPROCESS{ $cmd->{name} } ) {
517 2 100 33     10 if ( $cmd->{name} eq 'multi' ) {
    50          
    50          
518 1         2 $self->{_multi_mode} = 1;
519             }
520             elsif ( $cmd->{name} eq 'exec'
521             || $cmd->{name} eq 'discard' )
522             {
523 0         0 $self->{_multi_mode} = 0;
524             }
525             elsif ( $cmd->{name} eq 'eval_cached' ) {
526 0         0 my $script = $cmd->{args}[0];
527 0 0       0 unless ( exists $EVAL_CACHE{$script} ) {
528 0         0 $EVAL_CACHE{$script} = sha1_hex($script);
529             }
530 0         0 $cmd->{args}[0] = $EVAL_CACHE{$script};
531 0         0 $cmd->{script} = $script;
532             }
533             else { # subscribe, unsubscribe, psubscribe, punsubscribe
534 1 50 33     7 if ( exists $SUB_CMDS{ $cmd->{name} }
535             && !defined $cmd->{on_message} )
536             {
537 1         94 croak '"on_message" callback must be specified';
538             }
539              
540 0 0       0 if ( @{ $cmd->{args} } ) {
  0         0  
541 0         0 $cmd->{reply_cnt} = scalar @{ $cmd->{args} };
  0         0  
542             }
543             }
544             }
545              
546 17 50       39 unless ( $self->{_ready} ) {
547 17 100       46 if ( defined $self->{_handle} ) {
    50          
    50          
548 13 50       31 if ( $self->{_connected} ) {
549 0 0       0 if ( $self->{_auth_state} == S_DONE ) {
    0          
550 0 0       0 if ( $self->{_db_selection_state} == S_NEED_DO ) {
551 0         0 $self->_select_database;
552             }
553             }
554             elsif ( $self->{_auth_state} == S_NEED_DO ) {
555 0         0 $self->_auth;
556             }
557             }
558             }
559             elsif ( $self->{lazy} ) {
560 0         0 undef $self->{lazy};
561 0         0 $self->_connect;
562             }
563             elsif ( $self->{reconnect} ) {
564 0 0 0     0 if ( defined $self->{reconnect_interval}
565             && $self->{reconnect_interval} > 0 )
566             {
567 0 0       0 unless ( defined $self->{_reconnect_timer} ) {
568 0         0 weaken($self);
569              
570             $self->{_reconnect_timer} = AE::timer(
571             $self->{reconnect_interval}, 0,
572             sub {
573 0     0   0 undef $self->{_reconnect_timer};
574 0         0 $self->_connect;
575             }
576 0         0 );
577             }
578             }
579             else {
580 0         0 $self->_connect;
581             }
582             }
583             else {
584             AE::postpone {
585 4     4   4564 my $err = _new_error( qq{Operation "$cmd->{name}" aborted:}
586             . ' No connection to the server.', E_NO_CONN );
587 4         16 $cmd->{on_reply}->( undef, $err );
588 4         24 };
589              
590 4         24 return;
591             }
592              
593 13         15 push( @{ $self->{_input_queue} }, $cmd );
  13         22  
594              
595 13         140 return;
596             }
597              
598 0         0 $self->_push_write($cmd);
599              
600 0         0 return;
601             }
602              
603             sub _push_write {
604 0     0   0 my $self = shift;
605 0         0 my $cmd = shift;
606              
607 0         0 my $cmd_str = '';
608 0         0 my @tokens = ( @{ $cmd->{kwds} }, @{ $cmd->{args} } );
  0         0  
  0         0  
609 0         0 foreach my $token (@tokens) {
610 0 0 0     0 unless ( defined $token ) {
611 0         0 $token = '';
612             }
613             elsif ( $self->{utf8} ) {
614             utf8::encode($token);
615             }
616 0         0 $cmd_str .= '$' . length($token) . EOL . $token . EOL;
617             }
618 0         0 $cmd_str = '*' . scalar(@tokens) . EOL . $cmd_str;
619              
620 0         0 my $handle = $self->{_handle};
621              
622 0 0 0     0 if ( defined $self->{read_timeout}
623 0         0 && !@{ $self->{_processing_queue} } )
624             {
625 0         0 $handle->rtimeout_reset;
626 0         0 $handle->rtimeout( $self->{read_timeout} );
627             }
628              
629 0         0 push( @{ $self->{_processing_queue} }, $cmd );
  0         0  
630 0         0 $handle->push_write($cmd_str);
631              
632 0         0 return;
633             }
634              
635             sub _auth {
636 0     0   0 my $self = shift;
637              
638 0         0 weaken($self);
639 0         0 $self->{_auth_state} = S_IN_PROGRESS;
640              
641             $self->_push_write(
642             { name => 'auth',
643             kwds => ['auth'],
644             args => [ $self->{password} ],
645              
646             on_reply => sub {
647 0     0   0 my $err = $_[1];
648              
649 0 0       0 if ( defined $err ) {
650 0         0 $self->{_auth_state} = S_NEED_DO;
651 0         0 $self->_abort($err);
652              
653 0         0 return;
654             }
655              
656 0         0 $self->{_auth_state} = S_DONE;
657              
658 0 0       0 if ( $self->{_db_selection_state} == S_NEED_DO ) {
659 0         0 $self->_select_database;
660             }
661             else {
662 0         0 $self->{_ready} = 1;
663 0         0 $self->_process_input_queue;
664             }
665             },
666             }
667 0         0 );
668              
669 0         0 return;
670             }
671              
672             sub _select_database {
673 0     0   0 my $self = shift;
674              
675 0         0 weaken($self);
676 0         0 $self->{_db_selection_state} = S_IN_PROGRESS;
677              
678             $self->_push_write(
679             { name => 'select',
680             kwds => ['select'],
681             args => [ $self->{database} ],
682              
683             on_reply => sub {
684 0     0   0 my $err = $_[1];
685              
686 0 0       0 if ( defined $err ) {
687 0         0 $self->{_db_selection_state} = S_NEED_DO;
688 0         0 $self->_abort($err);
689              
690 0         0 return;
691             }
692              
693 0         0 $self->{_db_selection_state} = S_DONE;
694              
695 0         0 $self->{_ready} = 1;
696 0         0 $self->_process_input_queue;
697             },
698             }
699 0         0 );
700              
701 0         0 return;
702             }
703              
704             sub _process_input_queue {
705 0     0   0 my $self = shift;
706              
707 0         0 $self->{_temp_queue} = $self->{_input_queue};
708 0         0 $self->{_input_queue} = [];
709              
710 0         0 while ( my $cmd = shift @{ $self->{_temp_queue} } ) {
  0         0  
711 0         0 $self->_push_write($cmd);
712             }
713              
714 0         0 return;
715             }
716              
717             sub _process_reply {
718 0     0   0 my $self = shift;
719 0         0 my $reply = shift;
720 0         0 my $err_code = shift;
721              
722 0 0 0     0 if ( defined $err_code ) {
    0 0        
723 0         0 $self->_process_error( $reply, $err_code );
724             }
725             elsif ( $self->{_channel_cnt} + $self->{_pchannel_cnt} > 0
726             && ref($reply) && exists $MESSAGE_TYPES{ $reply->[0] } )
727             {
728 0         0 $self->_process_message($reply);
729             }
730             else {
731 0         0 $self->_process_success($reply);
732             }
733              
734 0         0 return;
735             }
736              
737             sub _process_error {
738 0     0   0 my $self = shift;
739 0         0 my $reply = shift;
740 0         0 my $err_code = shift;
741              
742 0         0 my $cmd = shift @{ $self->{_processing_queue} };
  0         0  
743              
744 0 0       0 unless ( defined $cmd ) {
745 0         0 my $err = _new_error(
746             q{Don't know how process error message. Processing queue is empty.},
747             E_UNEXPECTED_DATA
748             );
749 0         0 $self->_disconnect($err);
750              
751 0         0 return;
752             }
753              
754 0 0 0     0 if ( $cmd->{name} eq 'eval_cached'
755             && $err_code == E_NO_SCRIPT )
756             {
757 0         0 $cmd->{kwds}[0] = 'eval';
758 0         0 $cmd->{args}[0] = $cmd->{script};
759              
760 0         0 $self->_push_write($cmd);
761              
762 0         0 return;
763             }
764              
765 0 0       0 if ( ref($reply) ) {
766 0         0 my $err = _new_error(
767             qq{Operation "$cmd->{name}" completed with errors.}, $err_code );
768 0         0 $cmd->{on_reply}->( $reply, $err );
769             }
770             else {
771 0         0 my $err = _new_error( $reply, $err_code );
772 0         0 $cmd->{on_reply}->( undef, $err );
773             }
774              
775 0         0 return;
776             }
777              
778             sub _process_message {
779 0     0   0 my $self = shift;
780 0         0 my $msg = shift;
781              
782 0         0 my $cmd = $self->{_channels}{ $msg->[1] };
783              
784 0 0       0 unless ( defined $cmd ) {
785 0         0 my $err = _new_error(
786             q{Don't know how process published message.}
787             . qq{ Unknown channel or pattern "$msg->[1]".},
788             E_UNEXPECTED_DATA
789             );
790 0         0 $self->_disconnect($err);
791              
792 0         0 return;
793             }
794              
795             $cmd->{on_message}->(
796             $msg->[0] eq 'pmessage'
797 0         0 ? @{$msg}[ 3, 1, 2 ]
798 0 0       0 : @{$msg}[ 2, 1 ]
  0         0  
799             );
800              
801 0         0 return;
802             }
803              
804             sub _process_success {
805 0     0   0 my $self = shift;
806 0         0 my $reply = shift;
807              
808 0         0 my $cmd = $self->{_processing_queue}[0];
809              
810 0 0       0 unless ( defined $cmd ) {
811 0         0 my $err = _new_error(
812             q{Don't know how process reply. Processing queue is empty.},
813             E_UNEXPECTED_DATA
814             );
815 0         0 $self->_disconnect($err);
816              
817 0         0 return;
818             }
819              
820 0 0       0 if ( exists $SUBUNSUB_CMDS{ $cmd->{name} } ) {
821 0 0       0 if ( $cmd->{name} eq 'subscribe' ) {
    0          
    0          
822 0         0 $self->{_channels}{ $reply->[1] } = $cmd;
823 0         0 $self->{_channel_cnt}++;
824             }
825             elsif ( $cmd->{name} eq 'psubscribe' ) {
826 0         0 $self->{_channels}{ $reply->[1] } = $cmd;
827 0         0 $self->{_pchannel_cnt}++;
828             }
829             elsif ( $cmd->{name} eq 'unsubscribe' ) {
830 0 0       0 unless ( defined $cmd->{reply_cnt} ) {
831 0         0 $cmd->{reply_cnt} = $self->{_channel_cnt};
832             }
833              
834 0         0 delete $self->{_channels}{ $reply->[1] };
835 0         0 $self->{_channel_cnt}--;
836             }
837             else { # punsubscribe
838 0 0       0 unless ( defined $cmd->{reply_cnt} ) {
839 0         0 $cmd->{reply_cnt} = $self->{_pchannel_cnt};
840             }
841              
842 0         0 delete $self->{_channels}{ $reply->[1] };
843 0         0 $self->{_pchannel_cnt}--;
844             }
845              
846 0         0 $reply = $reply->[2];
847             }
848              
849 0 0 0     0 if ( !defined $cmd->{reply_cnt}
850             || --$cmd->{reply_cnt} == 0 )
851             {
852 0         0 shift @{ $self->{_processing_queue} };
  0         0  
853              
854 0 0       0 if ( exists $NEED_POSTPROCESS{ $cmd->{name} } ) {
855 0 0 0     0 if ( $cmd->{name} eq 'info'
    0          
856             || $cmd->{name} eq 'cluster_info' )
857             {
858 0         0 $reply = _parse_info($reply);
859             }
860             elsif ( $cmd->{name} eq 'select' ) {
861 0         0 $self->{database} = $cmd->{args}[0];
862             }
863             else { # quit
864 0         0 $self->_disconnect;
865             }
866             }
867              
868 0         0 $cmd->{on_reply}->($reply);
869             }
870              
871 0         0 return;
872             }
873              
874             sub _parse_info {
875 0         0 return { map { split( m/:/, $_, 2 ) }
876 0     0   0 grep { m/^[^#]/ } split( EOL, $_[0] ) };
  0         0  
877             }
878              
879             sub _disconnect {
880 11     11   15 my $self = shift;
881 11         12 my $err = shift;
882              
883 11         18 my $was_connected = $self->{_connected};
884              
885 11 50       29 if ( defined $self->{_handle} ) {
886 11         48 $self->{_handle}->destroy;
887             }
888 11         659 $self->_reset_internals;
889 11         33 $self->_abort($err);
890              
891 11 50 33     27 if ( $was_connected && defined $self->{on_disconnect} ) {
892 0         0 $self->{on_disconnect}->();
893             }
894              
895 11         31 return;
896             }
897              
898             sub _reset_internals {
899 30     30   36 my $self = shift;
900              
901 30         49 $self->{_handle} = undef;
902 30         122 $self->{_connected} = 0;
903 30         40 $self->{_auth_state} = S_NEED_DO;
904 30         35 $self->{_db_selection_state} = S_NEED_DO;
905 30         37 $self->{_ready} = 0;
906 30         33 $self->{_multi_mode} = 0;
907 30         39 $self->{_reconnect_timer} = undef;
908              
909 30         22 return;
910             }
911              
912             sub _abort {
913 11     11   17 my $self = shift;
914 11         16 my $err = shift;
915              
916 11         26 my @queued_commands = $self->_queued_commands;
917 11         11 my %channels = %{ $self->{_channels} };
  11         26  
918              
919 11         18 $self->{_input_queue} = [];
920 11         18 $self->{_temp_queue} = [];
921 11         17 $self->{_processing_queue} = [];
922 11         12 $self->{_channels} = {};
923 11         18 $self->{_channel_cnt} = 0;
924 11         11 $self->{_pchannel_cnt} = 0;
925              
926 11 100 66     55 if ( !defined $err && @queued_commands ) {
927 3         11 $err = _new_error( 'Connection closed by client prematurely.',
928             E_CONN_CLOSED_BY_CLIENT );
929             }
930              
931 11 50       27 if ( defined $err ) {
932 11         36 my $err_msg = $err->message;
933 11         27 my $err_code = $err->code;
934              
935 11         33 $self->{on_error}->($err);
936              
937 11 50 33     40 if ( %channels && $err_code != E_CONN_CLOSED_BY_CLIENT ) {
938 0         0 foreach my $name ( keys %channels ) {
939 0         0 my $err = _new_error(
940             qq{Subscription to channel "$name" lost: $err_msg},
941             $err_code
942             );
943              
944 0         0 my $cmd = $channels{$name};
945 0         0 $cmd->{on_reply}->( undef, $err );
946             }
947             }
948              
949 11         23 foreach my $cmd (@queued_commands) {
950 11         40 my $err = _new_error( qq{Operation "$cmd->{name}" aborted: $err_msg},
951             $err_code );
952 11         31 $cmd->{on_reply}->( undef, $err );
953             }
954             }
955              
956 11         108 return;
957             }
958              
959             sub _queued_commands {
960 22     22   25 my $self = shift;
961              
962             return (
963 22         34 @{ $self->{_processing_queue} },
964 22         22 @{ $self->{_temp_queue} },
965 22         21 @{ $self->{_input_queue} },
  22         52  
966             );
967             }
968              
969             sub _new_error {
970 26     26   119 return AnyEvent::RipeRedis::Error->new(@_);
971             }
972              
973             sub AUTOLOAD {
974 12     12   2735 our $AUTOLOAD;
975 12         16 my $cmd_name = $AUTOLOAD;
976 12         57 $cmd_name =~ s/^.+:://;
977              
978             my $sub = sub {
979 25     25   226 my $self = shift;
980              
981 25         164 my $cmd = $self->_prepare( $cmd_name, [@_] );
982 25         60 $self->_execute($cmd);
983              
984 17         34 return;
985 12         38 };
986              
987 12         20 do {
988 22     22   115 no strict 'refs';
  22         22  
  22         2841  
989 12         8 *{$cmd_name} = $sub;
  12         34  
990             };
991              
992 12         15 goto &{$sub};
  12         32  
993             }
994              
995             sub DESTROY {
996 17     17   2563 my $self = shift;
997              
998 17 100       48 if ( defined $self->{_handle} ) {
999 8         36 $self->{_handle}->destroy;
1000             }
1001              
1002 17 100       700 if ( defined $self->{_processing_queue} ) {
1003 11         29 my @queued_commands = $self->_queued_commands;
1004              
1005 11         27 foreach my $cmd (@queued_commands) {
1006 2         24 warn qq{Operation "$cmd->{name}" aborted:}
1007             . " Client object destroyed prematurely.\n";
1008             }
1009             }
1010              
1011 17         249 return;
1012             }
1013              
1014             1;
1015             __END__