File Coverage

blib/lib/AnyEvent/Redis/RipeRedis.pm
Criterion Covered Total %
statement 239 480 49.7
branch 46 166 27.7
condition 18 54 33.3
subroutine 43 68 63.2
pod 8 8 100.0
total 354 776 45.6


line stmt bran cond sub pod time code
1 22     22   298391 use 5.008000;
  22         51  
2 22     22   84 use strict;
  22         28  
  22         391  
3 22     22   66 use warnings;
  22         23  
  22         795  
4              
5             package AnyEvent::Redis::RipeRedis;
6              
7 22     22   75 use base qw( Exporter );
  22         24  
  22         2164  
8              
9             our $VERSION = '1.62';
10              
11 22     22   19370 use AnyEvent;
  22         83661  
  22         593  
12 22     22   14012 use AnyEvent::Handle;
  22         318347  
  22         712  
13 22     22   10559 use Encode qw( find_encoding is_utf8 );
  22         153567  
  22         1369  
14 22     22   111 use Scalar::Util qw( looks_like_number weaken );
  22         23  
  22         1435  
15 22     22   9509 use Digest::SHA qw( sha1_hex );
  22         51543  
  22         1419  
16 22     22   261 use Carp qw( croak );
  22         25  
  22         1525  
17              
18             my %ERROR_CODES;
19              
20             BEGIN {
21 22     22   1176 %ERROR_CODES = (
22             E_CANT_CONN => 1,
23             E_LOADING_DATASET => 2,
24             E_IO => 3,
25             E_CONN_CLOSED_BY_REMOTE_HOST => 4,
26             E_CONN_CLOSED_BY_CLIENT => 5,
27             E_NO_CONN => 6,
28             E_OPRN_ERROR => 9,
29             E_UNEXPECTED_DATA => 10,
30             E_NO_SCRIPT => 11,
31             E_READ_TIMEDOUT => 12,
32             E_BUSY => 13,
33             E_MASTER_DOWN => 14,
34             E_MISCONF => 15,
35             E_READONLY => 16,
36             E_OOM => 17,
37             E_EXEC_ABORT => 18,
38             E_NO_AUTH => 19,
39             E_WRONG_TYPE => 20,
40             E_NO_REPLICAS => 21,
41             E_BUSY_KEY => 22,
42             E_CROSS_SLOT => 23,
43             E_TRY_AGAIN => 24,
44             E_ASK => 25,
45             E_MOVED => 26,
46             E_CLUSTER_DOWN => 27,
47             );
48             }
49              
50             BEGIN {
51 22     22   140 our @EXPORT_OK = keys %ERROR_CODES;
52 22         852 our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK, );
53             }
54              
55             use constant {
56             # Default values
57 22         18488 D_HOST => 'localhost',
58             D_PORT => 6379,
59             D_DB_INDEX => 0,
60              
61             %ERROR_CODES,
62              
63             # Operation status
64             S_NEED_PERFORM => 1,
65             S_IN_PROGRESS => 2,
66             S_IS_DONE => 3,
67              
68             # String terminator
69             EOL => "\r\n",
70             EOL_LEN => 2,
71 22     22   84 };
  22         21  
72              
73             my %SUB_CMDS = (
74             subscribe => 1,
75             psubscribe => 1,
76             );
77             my %SUBUNSUB_CMDS = (
78             %SUB_CMDS,
79             unsubscribe => 1,
80             punsubscribe => 1,
81             );
82              
83             my %NEED_POSTPROCESS = (
84             %SUBUNSUB_CMDS,
85             info => 1,
86             select => 1,
87             quit => 1,
88             );
89              
90             my %MSG_TYPES = (
91             message => 1,
92             pmessage => 1,
93             );
94              
95             my %ERR_PREFS_MAP = (
96             LOADING => E_LOADING_DATASET,
97             NOSCRIPT => E_NO_SCRIPT,
98             BUSY => E_BUSY,
99             MASTERDOWN => E_MASTER_DOWN,
100             MISCONF => E_MISCONF,
101             READONLY => E_READONLY,
102             OOM => E_OOM,
103             EXECABORT => E_EXEC_ABORT,
104             NOAUTH => E_NO_AUTH,
105             WRONGTYPE => E_WRONG_TYPE,
106             NOREPLICAS => E_NO_REPLICAS,
107             BUSYKEY => E_BUSY_KEY,
108             CROSSSLOT => E_CROSS_SLOT,
109             TRYAGAIN => E_TRY_AGAIN,
110             ASK => E_ASK,
111             MOVED => E_MOVED,
112             CLUSTERDOWN => E_CLUSTER_DOWN,
113             );
114              
115             my %EVAL_CACHE;
116              
117              
118             # Constructor
119             sub new {
120 30     30 1 1605119 my $proto = shift;
121 30         87 my %params = @_;
122              
123 30 50       93 my $self = ref( $proto ) ? $proto : bless {}, $proto;
124              
125 30   50     193 $self->{host} = $params{host} || D_HOST;
126 30   100     120 $self->{port} = $params{port} || D_PORT;
127 30         34 $self->{password} = $params{password};
128             $self->{database}
129 30 50       66 = defined $params{database} ? $params{database} : D_DB_INDEX;
130 30 100       71 $self->{reconnect} = exists $params{reconnect} ? $params{reconnect} : 1;
131 30         45 $self->{on_connect} = $params{on_connect};
132 30         41 $self->{on_disconnect} = $params{on_disconnect};
133 30         69 $self->{on_connect_error} = $params{on_connect_error};
134              
135 30         101 $self->encoding( $params{encoding} );
136 29         92 $self->connection_timeout( $params{connection_timeout} );
137 27         68 $self->read_timeout( $params{read_timeout} );
138 25         64 $self->min_reconnect_interval( $params{min_reconnect_interval} );
139 23         56 $self->on_error( $params{on_error} );
140              
141 23   50     82 my $hdl_params = $params{handle_params} || {};
142 23         40 foreach my $name ( qw( linger autocork ) ) {
143 46 50 33     173 if ( !exists $hdl_params->{$name} && defined $params{$name} ) {
144 0         0 $hdl_params->{$name} = $params{$name};
145             }
146             }
147 23         30 $self->{handle_params} = $hdl_params;
148              
149 23         42 $self->{_handle} = undef;
150 23         51 $self->{_connected} = 0;
151 23         37 $self->{_lazy_conn_st} = $params{lazy};
152 23         27 $self->{_auth_st} = S_NEED_PERFORM;
153 23         30 $self->{_select_db_st} = S_NEED_PERFORM;
154 23         25 $self->{_ready_to_write} = 0;
155 23         30 $self->{_input_queue} = [];
156 23         23 $self->{_temp_queue} = [];
157 23         33 $self->{_processing_queue} = [];
158 23         29 $self->{_txn_lock} = 0;
159 23         34 $self->{_channels} = {};
160 23         23 $self->{_channel_cnt} = 0;
161 23         22 $self->{_reconnect_timer} = undef;
162              
163 23 50       49 unless ( $self->{_lazy_conn_st} ) {
164 23         55 $self->_connect();
165             }
166              
167 23         72 return $self;
168             }
169              
170             sub multi {
171 0     0 1 0 my $self = shift;
172 0         0 my $cmd = $self->_prepare_cmd( 'multi', [ @_ ] );
173              
174 0         0 $self->{_txn_lock} = 1;
175 0         0 $self->_execute_cmd( $cmd );
176              
177 0         0 return;
178             }
179              
180             sub exec {
181 0     0 1 0 my $self = shift;
182 0         0 my $cmd = $self->_prepare_cmd( 'exec', [ @_ ] );
183              
184 0         0 $self->{_txn_lock} = 0;
185 0         0 $self->_execute_cmd( $cmd );
186              
187 0         0 return;
188             }
189              
190             sub eval_cached {
191 0     0 1 0 my $self = shift;
192 0         0 my $cmd = $self->_prepare_cmd( 'evalsha', [ @_ ] );
193              
194 0         0 $cmd->{script} = $cmd->{args}[0];
195 0 0       0 unless ( exists $EVAL_CACHE{ $cmd->{script} } ) {
196 0         0 $EVAL_CACHE{ $cmd->{script} } = sha1_hex( $cmd->{script} );
197             }
198 0         0 $cmd->{args}[0] = $EVAL_CACHE{ $cmd->{script} };
199              
200 0         0 $self->_execute_cmd( $cmd );
201              
202 0         0 return;
203             }
204              
205             sub disconnect {
206 2     2 1 10 my $self = shift;
207              
208 2         14 $self->_disconnect();
209              
210 2         2 return;
211             }
212              
213             sub encoding {
214 36     36 1 357 my $self = shift;
215              
216 36 100       86 if ( @_ ) {
217 33         55 my $enc = shift;
218              
219 33 100       49 if ( defined $enc ) {
220 4         16 $self->{encoding} = find_encoding( $enc );
221              
222 4 100       3243 unless ( defined $self->{encoding} ) {
223 2         168 croak "Encoding \"$enc\" not found";
224             }
225             }
226             else {
227 29         49 undef $self->{encoding};
228             }
229             }
230              
231 34         41 return $self->{encoding};
232             }
233              
234             sub on_error {
235 28     28 1 286 my $self = shift;
236              
237 28 100       60 if ( @_ ) {
238 25         30 my $on_error = shift;
239              
240 25 100       35 if ( defined $on_error ) {
241 8         16 $self->{on_error} = $on_error;
242             }
243             else {
244             $self->{on_error} = sub {
245 1     1   2 my $err_msg = shift;
246              
247 1         13 warn "$err_msg\n";
248 17         59 };
249             }
250             }
251              
252 28         37 return $self->{on_error};
253             }
254              
255             sub selected_database {
256 1     1 1 1 my $self = shift;
257              
258 1         2 return $self->{database};
259             }
260              
261             # Generate additional methods and accessors
262             {
263 22     22   101 no strict 'refs';
  22         25  
  22         54270  
264              
265             foreach my $kwd ( keys %SUBUNSUB_CMDS ) {
266             *{$kwd} = sub {
267 1     1   30 my $self = shift;
268 1         4 my $cmd = $self->_prepare_cmd( $kwd, [ @_ ] );
269              
270 1 50 33     8 if ( exists $SUB_CMDS{ $cmd->{kwd} } && !defined $cmd->{on_message} ) {
271 1         80 croak "\"on_message\" callback must be specified";
272             }
273              
274 0 0       0 if ( $self->{_txn_lock} ) {
275             AE::postpone(
276             sub {
277 0     0   0 $self->_process_cmd_error( $cmd,
278             "Command \"$cmd->{kwd}\" not allowed after \"multi\" command."
279             . ' First, the transaction must be finalized.',
280             E_OPRN_ERROR );
281             }
282 0         0 );
283              
284 0         0 return;
285             }
286              
287 0         0 $cmd->{replies_cnt} = scalar @{ $cmd->{args} };
  0         0  
288              
289 0 0       0 if ( defined $cmd->{on_done} ) {
290 0         0 my $on_done = $cmd->{on_done};
291              
292             $cmd->{on_done} = sub {
293 0     0   0 $on_done->( @{ $_[0] } );
  0         0  
294             }
295 0         0 }
296              
297 0         0 $self->_execute_cmd( $cmd );
298              
299 0         0 return;
300             },
301             }
302              
303             foreach my $name ( qw( connection_timeout read_timeout min_reconnect_interval ) ) {
304             *{$name} = sub {
305 102     102   4695 my $self = shift;
306              
307 102 100       163 if ( @_ ) {
308 93         76 my $seconds = shift;
309              
310 93 100 100     283 if ( defined $seconds
      66        
311             && ( !looks_like_number($seconds) || $seconds < 0 ) )
312             {
313 12         1225 croak "\"$name\" must be a positive number";
314             }
315 81         109 $self->{$name} = $seconds;
316             }
317              
318 90         97 return $self->{$name};
319             }
320             }
321              
322             foreach my $name ( qw( reconnect on_connect on_disconnect on_connect_error ) ) {
323             *{$name} = sub {
324 20     20   1085 my $self = shift;
325              
326 20 100       40 if ( @_ ) {
327 8         13 $self->{$name} = shift;
328             }
329              
330 20         43 return $self->{$name};
331             }
332             }
333             }
334              
335             sub _connect {
336 23     23   29 my $self = shift;
337              
338             $self->{_handle} = AnyEvent::Handle->new(
339 23         112 %{ $self->{handle_params} },
340 23         24 connect => [ $self->{host}, $self->{port} ],
341             on_prepare => $self->_get_on_prepare(),
342             on_connect => $self->_get_on_connect(),
343             on_connect_error => $self->_get_on_connect_error(),
344             on_rtimeout => $self->_get_on_rtimeout(),
345             on_eof => $self->_get_on_eof(),
346             on_error => $self->_get_handle_on_error(),
347             on_read => $self->_get_on_read(),
348             );
349              
350 23         2349 return;
351             }
352              
353             sub _get_on_prepare {
354 23     23   29 my $self = shift;
355              
356 23         56 weaken( $self );
357              
358             return sub {
359 35 100   35   88773 if ( defined $self->{connection_timeout} ) {
360 25         65 return $self->{connection_timeout};
361             }
362              
363 10         30 return;
364 23         89 };
365             }
366              
367             sub _get_on_connect {
368 23     23   28 my $self = shift;
369              
370 23         34 weaken( $self );
371              
372             return sub {
373 0     0   0 $self->{_connected} = 1;
374              
375 0 0       0 unless ( defined $self->{password} ) {
376 0         0 $self->{_auth_st} = S_IS_DONE;
377             }
378 0 0       0 if ( $self->{database} == 0 ) {
379 0         0 $self->{_select_db_st} = S_IS_DONE;
380             }
381              
382 0 0       0 if ( $self->{_auth_st} == S_NEED_PERFORM ) {
    0          
383 0         0 $self->_auth();
384             }
385             elsif ( $self->{_select_db_st} == S_NEED_PERFORM ) {
386 0         0 $self->_select_db();
387             }
388             else {
389 0         0 $self->{_ready_to_write} = 1;
390 0         0 $self->_flush_input_queue();
391             }
392              
393 0 0       0 if ( defined $self->{on_connect} ) {
394 0         0 $self->{on_connect}->();
395             }
396 23         64 };
397             }
398              
399             sub _get_on_connect_error {
400 23     23   24 my $self = shift;
401              
402 23         35 weaken( $self );
403              
404             return sub {
405 12     12   1428 my $err_msg = pop;
406              
407 12         60 $self->_disconnect(
408             "Can't connect to $self->{host}:$self->{port}: $err_msg",
409             E_CANT_CONN
410             );
411 23         78 };
412             }
413              
414             sub _get_on_rtimeout {
415 23     23   22 my $self = shift;
416              
417 23         25 weaken( $self );
418              
419             return sub {
420 0 0   0   0 if ( @{ $self->{_processing_queue} } ) {
  0         0  
421 0         0 $self->_disconnect( 'Read timed out.', E_READ_TIMEDOUT );
422             }
423             else {
424 0         0 $self->{_handle}->rtimeout( undef );
425             }
426 23         72 };
427             }
428              
429             sub _get_on_eof {
430 23     23   24 my $self = shift;
431              
432 23         27 weaken( $self );
433              
434             return sub {
435 0     0   0 $self->_disconnect( 'Connection closed by remote host.',
436             E_CONN_CLOSED_BY_REMOTE_HOST );
437 23         71 };
438             }
439              
440             sub _get_handle_on_error {
441 23     23   27 my $self = shift;
442              
443 23         32 weaken( $self );
444              
445             return sub {
446 0     0   0 my $err_msg = pop;
447              
448 0         0 $self->_disconnect( $err_msg, E_IO );
449 23         70 };
450             }
451              
452             sub _get_on_read {
453 23     23   19 my $self = shift;
454              
455 23         34 weaken( $self );
456              
457 23         20 my $str_len;
458             my @bufs;
459 23         22 my $bufs_num = 0;
460              
461             return sub {
462 0     0   0 my $handle = shift;
463              
464 0         0 MAIN: while ( 1 ) {
465 0 0       0 if ( $handle->destroyed() ) {
466 0         0 return;
467             }
468              
469 0         0 my $reply;
470             my $err_code;
471              
472 0 0       0 if ( defined $str_len ) {
473 0 0       0 if ( length( $handle->{rbuf} ) < $str_len + EOL_LEN ) {
474 0         0 return;
475             }
476              
477 0         0 $reply = substr( $handle->{rbuf}, 0, $str_len, '' );
478 0         0 substr( $handle->{rbuf}, 0, EOL_LEN, '' );
479 0 0       0 if ( defined $self->{encoding} ) {
480 0         0 $reply = $self->{encoding}->decode( $reply );
481             }
482 0         0 undef $str_len;
483             }
484             else {
485 0         0 my $eol_pos = index( $handle->{rbuf}, EOL );
486              
487 0 0       0 if ( $eol_pos < 0 ) {
488 0         0 return;
489             }
490              
491 0         0 $reply = substr( $handle->{rbuf}, 0, $eol_pos, '' );
492 0         0 my $type = substr( $reply, 0, 1, '' );
493 0         0 substr( $handle->{rbuf}, 0, EOL_LEN, '' );
494              
495 0 0 0     0 if ( $type ne '+' && $type ne ':' ) {
496 0 0       0 if ( $type eq '$' ) {
    0          
    0          
497 0 0       0 if ( $reply >= 0 ) {
498 0         0 $str_len = $reply;
499              
500 0         0 next;
501             }
502              
503 0         0 undef $reply;
504             }
505             elsif ( $type eq '*' ) {
506 0 0       0 if ( $reply > 0 ) {
    0          
507 0         0 push( @bufs,
508             { data => [],
509             err_code => undef,
510             chunks_cnt => $reply,
511             }
512             );
513 0         0 $bufs_num++;
514              
515 0         0 next;
516             }
517             elsif ( $reply == 0 ) {
518 0         0 $reply = [];
519             }
520             else {
521 0         0 undef $reply;
522             }
523             }
524             elsif ( $type eq '-' ) {
525 0         0 $err_code = E_OPRN_ERROR;
526 0 0       0 if ( $reply =~ m/^([A-Z]{3,}) / ) {
527 0 0       0 if ( exists $ERR_PREFS_MAP{$1} ) {
528 0         0 $err_code = $ERR_PREFS_MAP{$1};
529             }
530             }
531             }
532             else {
533 0         0 $self->_disconnect( 'Unexpected reply received.',
534             E_UNEXPECTED_DATA );
535              
536 0         0 return;
537             }
538             }
539             }
540              
541 0         0 while ( $bufs_num > 0 ) {
542 0         0 my $curr_buf = $bufs[-1];
543 0 0       0 if ( defined $err_code ) {
544 0 0       0 unless ( ref($reply) ) {
545 0         0 $reply
546             = AnyEvent::Redis::RipeRedis::Error->new( $reply, $err_code );
547             }
548 0         0 $curr_buf->{err_code} = E_OPRN_ERROR;
549             }
550 0         0 push( @{ $curr_buf->{data} }, $reply );
  0         0  
551 0 0       0 if ( --$curr_buf->{chunks_cnt} > 0 ) {
552 0         0 next MAIN;
553             }
554              
555 0         0 $reply = $curr_buf->{data};
556 0         0 $err_code = $curr_buf->{err_code};
557 0         0 pop @bufs;
558 0         0 $bufs_num--;
559             }
560              
561 0         0 $self->_process_reply( $reply, $err_code );
562             }
563              
564 0         0 return;
565 23         177 };
566             }
567              
568             sub _prepare_cmd {
569 21     21   22 my $self = shift;
570 21         29 my $kwd = shift;
571 21         17 my $args = shift;
572              
573 21         17 my $cmd;
574 21 100       62 if ( ref( $args->[-1] ) eq 'HASH' ) {
575 20         16 $cmd = pop @{$args};
  20         32  
576             }
577             else {
578 1         1 $cmd = {};
579 1 50       4 if ( ref( $args->[-1] ) eq 'CODE' ) {
580 0 0       0 if ( exists $SUB_CMDS{$kwd} ) {
581 0         0 $cmd->{on_message} = pop @{$args};
  0         0  
582             }
583             else {
584 0         0 $cmd->{on_reply} = pop @{$args};
  0         0  
585             }
586             }
587             }
588 21         33 $cmd->{kwd} = $kwd;
589 21         31 $cmd->{args} = $args;
590              
591 21         30 return $cmd;
592             }
593              
594             sub _execute_cmd {
595 20     20   20 my $self = shift;
596 20         20 my $cmd = shift;
597              
598 20 50       52 unless ( $self->{_ready_to_write} ) {
599 20 100       56 if ( defined $self->{_handle} ) {
    50          
    50          
600 16 50       36 if ( $self->{_connected} ) {
601 0 0       0 if ( $self->{_auth_st} == S_IS_DONE ) {
    0          
602 0 0       0 if ( $self->{_select_db_st} == S_NEED_PERFORM ) {
603 0         0 $self->_select_db();
604             }
605             }
606             elsif ( $self->{_auth_st} == S_NEED_PERFORM ) {
607 0         0 $self->_auth();
608             }
609             }
610             }
611             elsif ( $self->{_lazy_conn_st} ) {
612 0         0 $self->{_lazy_conn_st} = 0;
613 0         0 $self->_connect();
614             }
615             elsif ( $self->{reconnect} ) {
616 0 0 0     0 if ( defined $self->{min_reconnect_interval}
617             && $self->{min_reconnect_interval} > 0 )
618             {
619 0 0       0 unless ( defined $self->{_reconnect_timer} ) {
620             $self->{_reconnect_timer} = AE::timer( $self->{min_reconnect_interval}, 0,
621             sub {
622 0     0   0 undef $self->{_reconnect_timer};
623 0         0 $self->_connect();
624             }
625 0         0 );
626             }
627             }
628             else {
629 0         0 $self->_connect();
630             }
631             }
632             else {
633             AE::postpone(
634             sub {
635 4     4   4460 $self->_process_cmd_error( $cmd, "Operation \"$cmd->{kwd}\" aborted:"
636             . ' No connection to the server.', E_NO_CONN );
637             }
638 4         20 );
639              
640 4         24 return;
641             }
642              
643 16         10 push( @{ $self->{_input_queue} }, $cmd );
  16         28  
644              
645 16         20 return;
646             }
647              
648 0         0 $self->_push_write( $cmd );
649              
650 0         0 return;
651             }
652              
653             sub _push_write {
654 0     0   0 my $self = shift;
655 0         0 my $cmd = shift;
656              
657 0         0 my $cmd_str = '';
658 0         0 foreach my $token ( $cmd->{kwd}, @{ $cmd->{args} } ) {
  0         0  
659 0 0 0     0 unless ( defined $token ) {
    0          
660 0         0 $token = '';
661             }
662             elsif ( defined $self->{encoding} && is_utf8( $token ) ) {
663 0         0 $token = $self->{encoding}->encode( $token );
664             }
665 0         0 $cmd_str .= '$' . length( $token ) . EOL . $token . EOL;
666             }
667 0         0 $cmd_str = '*' . ( scalar( @{ $cmd->{args} } ) + 1 ) . EOL . $cmd_str;
  0         0  
668              
669 0         0 my $handle = $self->{_handle};
670 0 0 0     0 if ( defined $self->{read_timeout} && !@{ $self->{_processing_queue} } ) {
  0         0  
671 0         0 $handle->rtimeout_reset();
672 0         0 $handle->rtimeout( $self->{read_timeout} );
673             }
674 0         0 push( @{ $self->{_processing_queue} }, $cmd );
  0         0  
675              
676 0         0 $handle->push_write( $cmd_str );
677              
678 0         0 return;
679             }
680              
681             sub _auth {
682 0     0   0 my $self = shift;
683              
684 0         0 weaken( $self );
685              
686 0         0 $self->{_auth_st} = S_IN_PROGRESS;
687              
688             $self->_push_write(
689             { kwd => 'auth',
690             args => [ $self->{password} ],
691              
692             on_done => sub {
693 0     0   0 $self->{_auth_st} = S_IS_DONE;
694              
695 0 0       0 if ( $self->{_select_db_st} == S_NEED_PERFORM ) {
696 0         0 $self->_select_db();
697             }
698             else {
699 0         0 $self->{_ready_to_write} = 1;
700 0         0 $self->_flush_input_queue();
701             }
702             },
703              
704             on_error => sub {
705 0     0   0 $self->{_auth_st} = S_NEED_PERFORM;
706 0         0 $self->_abort_all( @_ );
707             },
708             }
709 0         0 );
710              
711 0         0 return;
712             }
713              
714             sub _select_db {
715 0     0   0 my $self = shift;
716              
717 0         0 weaken( $self );
718              
719 0         0 $self->{_select_db_st} = S_IN_PROGRESS;
720              
721             $self->_push_write(
722             { kwd => 'select',
723             args => [ $self->{database} ],
724              
725             on_done => sub {
726 0     0   0 $self->{_select_db_st} = S_IS_DONE;
727 0         0 $self->{_ready_to_write} = 1;
728 0         0 $self->_flush_input_queue();
729             },
730              
731             on_error => sub {
732 0     0   0 $self->{_select_db_st} = S_NEED_PERFORM;
733 0         0 $self->_abort_all( @_ );
734             },
735             }
736 0         0 );
737              
738 0         0 return;
739             }
740              
741             sub _flush_input_queue {
742 0     0   0 my $self = shift;
743              
744 0         0 $self->{_temp_queue} = $self->{_input_queue};
745 0         0 $self->{_input_queue} = [];
746              
747 0         0 while ( my $cmd = shift @{ $self->{_temp_queue} } ) {
  0         0  
748 0         0 $self->_push_write( $cmd );
749             }
750              
751 0         0 return;
752             }
753              
754             sub _process_reply {
755 0     0   0 my $self = shift;
756 0         0 my $reply = shift;
757 0         0 my $err_code = shift;
758              
759 0 0 0     0 if ( defined $err_code ) {
    0 0        
760 0         0 my $cmd = shift @{ $self->{_processing_queue} };
  0         0  
761              
762 0 0       0 unless ( defined $cmd ) {
763 0         0 $self->_disconnect(
764             "Don't know how process error message. Processing queue is empty.",
765             E_UNEXPECTED_DATA,
766             );
767              
768 0         0 return;
769             }
770              
771 0 0       0 $self->_process_cmd_error( $cmd, ref($reply)
772             ? ( "Operation \"$cmd->{kwd}\" completed with errors.",
773             $err_code, $reply )
774             : $reply, $err_code );
775             }
776             elsif ( $self->{_channel_cnt} > 0
777             && ref( $reply ) && exists $MSG_TYPES{ $reply->[0] } )
778             {
779 0         0 my $cmd = $self->{_channels}{ $reply->[1] };
780              
781 0 0       0 unless ( defined $cmd ) {
782 0         0 $self->_disconnect(
783             "Don't know how process published message."
784             . " Unknown channel or pattern \"$reply->[1]\".",
785             E_UNEXPECTED_DATA
786             );
787              
788 0         0 return;
789             }
790              
791             $cmd->{on_message}->( $reply->[0] eq 'pmessage'
792 0 0       0 ? @{$reply}[ 2, 3, 1 ] : @{$reply}[ 1, 2 ] );
  0         0  
  0         0  
793             }
794             else {
795 0         0 my $cmd = $self->{_processing_queue}[0];
796              
797 0 0       0 unless ( defined $cmd ) {
798 0         0 $self->_disconnect(
799             "Don't know how process reply. Processing queue is empty.",
800             E_UNEXPECTED_DATA
801             );
802              
803 0         0 return;
804             }
805              
806 0 0 0     0 if ( !defined $cmd->{replies_cnt} || --$cmd->{replies_cnt} <= 0 ) {
807 0         0 shift @{ $self->{_processing_queue} };
  0         0  
808             }
809 0         0 $self->_process_cmd_success( $cmd, $reply );
810             }
811              
812 0         0 return;
813             }
814              
815             sub _process_cmd_error {
816 18     18   24 my $self = shift;
817 18         12 my $cmd = shift;
818              
819 18 50 33     44 if ( $_[1] == E_NO_SCRIPT && defined $cmd->{script} ) {
820 0         0 $cmd->{kwd} = 'eval';
821 0         0 $cmd->{args}[0] = delete $cmd->{script};
822              
823 0         0 $self->_push_write( $cmd );
824              
825 0         0 return;
826             }
827              
828 18 50       32 if ( defined $cmd->{on_error} ) {
    0          
829 18         42 $cmd->{on_error}->( @_ );
830             }
831             elsif ( defined $cmd->{on_reply} ) {
832 0         0 $cmd->{on_reply}->( @_[ 2, 0, 1 ] );
833             }
834             else {
835 0         0 $self->{on_error}->( @_ );
836             }
837              
838 18         166 return;
839             }
840              
841             sub _process_cmd_success {
842 0     0   0 my $self = shift;
843 0         0 my $cmd = shift;
844 0         0 my $reply = shift;
845              
846 0 0       0 if ( exists $NEED_POSTPROCESS{ $cmd->{kwd} } ) {
847 0         0 my $kwd = $cmd->{kwd};
848              
849 0 0       0 if ( exists $SUBUNSUB_CMDS{$kwd} ) {
    0          
    0          
850 0         0 shift @{$reply};
  0         0  
851              
852 0 0       0 if ( exists $SUB_CMDS{$kwd} ) {
853 0         0 $self->{_channels}{ $reply->[0] } = $cmd;
854             }
855             else { # unsubscribe or punsubscribe
856 0         0 delete $self->{_channels}{ $reply->[0] };
857             }
858              
859 0         0 $self->{_channel_cnt} = $reply->[1];
860             }
861             elsif ( $kwd eq 'info' ) {
862 0         0 $reply = $self->_parse_info( $reply );
863             }
864             elsif ( $kwd eq 'select' ) {
865 0         0 $self->{database} = $cmd->{args}[0];
866             }
867             else { # quit
868 0         0 $self->_disconnect();
869             }
870             }
871              
872 0 0       0 if ( defined $cmd->{on_done} ) {
    0          
873 0         0 $cmd->{on_done}->( $reply );
874             }
875             elsif ( defined $cmd->{on_reply} ) {
876 0         0 $cmd->{on_reply}->( $reply );
877             }
878              
879 0         0 return;
880             }
881              
882             sub _parse_info {
883             return {
884 0         0 map { split( m/:/, $_, 2 ) }
885 0     0   0 grep { m/^[^#]/ } split( EOL, $_[1] )
  0         0  
886             };
887             }
888              
889             sub _disconnect {
890 14     14   18 my $self = shift;
891 14         14 my $err_msg = shift;
892 14         8 my $err_code = shift;
893              
894 14         18 my $was_connected = $self->{_connected};
895              
896 14 50       32 if ( defined $self->{_handle} ) {
897 14         50 $self->{_handle}->destroy();
898 14         620 undef $self->{_handle};
899             }
900 14         8 $self->{_connected} = 0;
901 14         24 $self->{_auth_st} = S_NEED_PERFORM;
902 14         106 $self->{_select_db_st} = S_NEED_PERFORM;
903 14         14 $self->{_ready_to_write} = 0;
904 14         10 $self->{_txn_lock} = 0;
905              
906 14         32 $self->_abort_all( $err_msg, $err_code );
907              
908 14 50 33     32 if ( $was_connected && defined $self->{on_disconnect} ) {
909 0         0 $self->{on_disconnect}->();
910             }
911              
912 14         42 return;
913             }
914              
915             sub _abort_all {
916 14     14   6 my $self = shift;
917 14         14 my $err_msg = shift;
918 14         18 my $err_code = shift;
919              
920             my @unfin_cmds = (
921 14         20 @{ $self->{_processing_queue} },
922 14         14 @{ $self->{_temp_queue} },
923 14         14 @{ $self->{_input_queue} },
  14         22  
924             );
925              
926 14         14 my %channels = %{ $self->{_channels} };
  14         22  
927              
928 14         18 $self->{_input_queue} = [];
929 14         20 $self->{_temp_queue} = [];
930 14         14 $self->{_processing_queue} = [];
931 14         20 $self->{_channels} = {};
932 14         14 $self->{_channel_cnt} = 0;
933              
934 14 100 66     48 if ( !defined $err_msg && @unfin_cmds ) {
935 2         6 $err_msg = 'Connection closed by client prematurely.';
936 2         4 $err_code = E_CONN_CLOSED_BY_CLIENT;
937             }
938              
939 14 50       38 if ( defined $err_msg ) {
940 14 100 66     48 if ( defined $self->{on_connect_error} && $err_code == E_CANT_CONN ) {
941 8         24 $self->{on_connect_error}->( $err_msg );
942             }
943             else {
944 6         18 $self->{on_error}->( $err_msg, $err_code );
945             }
946              
947 14 50 33     58 if ( %channels && $err_code != E_CONN_CLOSED_BY_CLIENT ) {
948 0         0 foreach my $name ( keys %channels ) {
949 0         0 my $cmd = $channels{$name};
950 0         0 $self->_process_cmd_error( $cmd, "Subscription \"$name\" lost: "
951             . $err_msg, $err_code );
952             }
953             }
954 14         26 foreach my $cmd ( @unfin_cmds ) {
955 14         56 $self->_process_cmd_error( $cmd, "Operation \"$cmd->{kwd}\" aborted: "
956             . $err_msg, $err_code );
957             }
958             }
959              
960 14         44 return;
961             }
962              
963             sub AUTOLOAD {
964 4     4   56 our $AUTOLOAD;
965 4         4 my $method = $AUTOLOAD;
966 4         24 $method =~ s/^.+:://;
967 4         16 my ( $kwd, @args ) = split( m/_/, lc( $method ) );
968              
969             my $sub = sub {
970 20     20   4818 my $self = shift;
971 20         70 my $cmd = $self->_prepare_cmd( $kwd, [ @args, @_ ] );
972              
973 20         36 $self->_execute_cmd( $cmd );
974              
975 20         40 return;
976 4         16 };
977              
978 4         4 do {
979 22     22   130 no strict 'refs';
  22         29  
  22         4984  
980 4         8 *{$method} = $sub;
  4         12  
981             };
982              
983 4         4 goto &{$sub};
  4         20  
984             }
985              
986             sub DESTROY {
987 18     18   6009 my $self = shift;
988              
989 18 100       48 if ( defined $self->{_handle} ) {
990             my @unfin_cmds = (
991 9         18 @{ $self->{_processing_queue} },
992 9         16 @{ $self->{_temp_queue} },
993 9         12 @{ $self->{_input_queue} },
  9         18  
994             );
995              
996 9         23 foreach my $cmd ( @unfin_cmds ) {
997 2         22 warn "Operation \"$cmd->{kwd}\" aborted:"
998             . " Client object destroyed prematurely.\n";
999             }
1000             }
1001              
1002 18         147 return;
1003             }
1004              
1005              
1006             package AnyEvent::Redis::RipeRedis::Error;
1007              
1008             # Constructor
1009             sub new {
1010 0     0     my $class = shift;
1011 0           my $err_msg = shift;
1012 0           my $err_code = shift;
1013              
1014 0           my $self = bless {}, $class;
1015              
1016 0           $self->{message} = $err_msg;
1017 0           $self->{code} = $err_code;
1018              
1019 0           return $self;
1020             }
1021              
1022             sub message {
1023 0     0     my $self = shift;
1024              
1025 0           return $self->{message};
1026             }
1027              
1028             sub code {
1029 0     0     my $self = shift;
1030              
1031 0           return $self->{code};
1032             }
1033              
1034             1;
1035             __END__