File Coverage

blib/lib/Redis/ClusterRider.pm
Criterion Covered Total %
statement 278 292 95.2
branch 87 110 79.0
condition 12 18 66.6
subroutine 32 33 96.9
pod 6 6 100.0
total 415 459 90.4


line stmt bran cond sub pod time code
1             package Redis::ClusterRider;
2              
3 5     5   338323 use 5.008000;
  5         49  
4 5     5   27 use strict;
  5         9  
  5         196  
5 5     5   26 use warnings;
  5         9  
  5         152  
6 5     5   24 use base qw( Exporter );
  5         8  
  5         888  
7              
8             our $VERSION = '0.24';
9              
10 5     5   3039 use Redis;
  5         576309  
  5         224  
11 5     5   3227 use List::MoreUtils qw( bsearch );
  5         63282  
  5         36  
12 5     5   4699 use Scalar::Util qw( looks_like_number weaken );
  5         12  
  5         319  
13 5     5   27 use Time::HiRes;
  5         10  
  5         51  
14 5     5   464 use Carp qw( carp croak );
  5         11  
  5         293  
15              
16             BEGIN {
17 5     5   171 our @EXPORT_OK = qw( crc16 hash_slot );
18             }
19              
20             use constant {
21 5         12017 D_REFRESH_INTERVAL => 15,
22             MAX_SLOTS => 16384,
23             EOL => "\r\n",
24 5     5   26 };
  5         12  
25              
26             my @CRC16_TAB = (
27             0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
28             0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
29             0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
30             0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
31             0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
32             0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
33             0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
34             0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
35             0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
36             0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
37             0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
38             0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
39             0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
40             0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
41             0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
42             0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
43             0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
44             0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
45             0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
46             0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
47             0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
48             0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
49             0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
50             0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
51             0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
52             0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
53             0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
54             0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
55             0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
56             0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
57             0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
58             0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
59             );
60              
61             my %PREDEFINED_CMDS = (
62             sort => { readonly => 0, key_pos => 1 },
63             zunionstore => { readonly => 0, key_pos => 1 },
64             zinterstore => { readonly => 0, key_pos => 1 },
65             eval => { readonly => 0, movablekeys => 1, key_pos => 0 },
66             evalsha => { readonly => 0, movablekeys => 1, key_pos => 0 },
67             );
68              
69             $Carp::Internal{ (__PACKAGE__) }++;
70              
71              
72             sub new {
73 12     12 1 7097 my $class = shift;
74 12         34 my %params = @_;
75              
76 12         28 my $self = bless {}, $class;
77              
78 12 100       38 unless ( defined $params{startup_nodes} ) {
79 1         214 croak 'Startup nodes not specified';
80             }
81 11 100       37 unless ( ref( $params{startup_nodes} ) eq 'ARRAY' ) {
82 1         83 croak 'Startup nodes must be specified as array reference';
83             }
84 10 100       15 unless ( @{ $params{startup_nodes} } ) {
  10         26  
85 1         89 croak 'Specified empty list of startup nodes';
86             }
87              
88 9 50       30 if ( $params{fallback} ) {
89 0 0       0 if ( $params{lazy} ) {
90 0         0 carp 'Fallback mode revokes lazy for ' . $params{startup_nodes}->[0];
91             }
92              
93 0         0 my $node = Redis->new(%params, server => $params{startup_nodes}->[0]);
94 0 0       0 eval { $node->cluster_info(); 1 } or return $node;
  0         0  
  0         0  
95             }
96              
97 9         41 $self->{startup_nodes} = $params{startup_nodes};
98 9         19 $self->{allow_slaves} = $params{allow_slaves};
99 9         15 $self->{lazy} = $params{lazy};
100 9         36 $self->refresh_interval( $params{refresh_interval} );
101              
102 7         13 $self->{on_node_connect} = $params{on_node_connect};
103 7         13 $self->{on_node_error} = $params{on_node_error};
104              
105 7         11 my %node_params;
106 7         23 foreach my $name ( qw( conservative_reconnect cnx_timeout read_timeout
107             write_timeout password name debug ) )
108             {
109 49 100       118 next unless defined $params{$name};
110 6         12 $node_params{$name} = $params{$name};
111             }
112 7         16 $self->{_node_params} = \%node_params;
113              
114 7         18 $self->{_nodes_pool} = undef;
115 7         15 $self->{_nodes} = undef;
116 7         13 $self->{_master_nodes} = undef;
117 7         11 $self->{_slots} = undef;
118 7         13 $self->{_commands} = undef;
119 7         13 $self->{_refresh_timestamp} = undef;
120              
121 7 100       17 unless ( $self->{lazy} ) {
122 5         19 $self->_init;
123             }
124              
125 7         43 return $self;
126             }
127              
128             sub run_command {
129 1     1 1 482 my $self = shift;
130 1         2 my $cmd_name = shift;
131              
132 1         3 return $self->_route( $cmd_name, [ @_ ] );
133             }
134              
135             sub nodes {
136 4     4 1 1680 my $self = shift;
137 4         6 my $key = shift;
138 4         4 my $allow_slaves = shift;
139              
140 4 50       11 unless ( defined $self->{_slots} ) {
141 0         0 $self->_init;
142             }
143              
144 4         5 my $slot;
145 4 100       8 if ( defined $key ) {
146 2         8 $slot = hash_slot($key);
147             }
148              
149 4         8 my $nodes = $self->_nodes( $slot, $allow_slaves );
150              
151             return wantarray
152 4         13 ? @{ $self->{_nodes_pool} }{ @{$nodes} }
  4         6  
153 4 50       9 : $self->{_nodes_pool}{ $nodes->[0] };
154             }
155              
156             sub refresh_interval {
157 16     16 1 1754 my $self = shift;
158              
159 16 100       42 if (@_) {
160 13         22 my $seconds = shift;
161              
162 13 100       36 if ( defined $seconds ) {
163 10 100 100     73 if ( !looks_like_number($seconds) || $seconds < 0 ) {
164 4         445 croak qq{"refresh_interval" must be a positive number};
165             }
166 6         14 $self->{refresh_interval} = $seconds;
167             }
168             else {
169 3         7 $self->{refresh_interval} = D_REFRESH_INTERVAL;
170             }
171             }
172              
173 12         29 return $self->{refresh_interval};
174             }
175              
176             sub crc16 {
177 11     11 1 92 my $data = shift;
178              
179 11 50       28 unless ( utf8::downgrade( $data, 1 ) ) {
180 0         0 utf8::encode($data);
181             }
182              
183 11         15 my $crc = 0;
184 11         30 foreach my $char ( split //, $data ) {
185 39         75 $crc = ( $crc << 8 & 0xff00 )
186             ^ $CRC16_TAB[ ( ( $crc >> 8 ) ^ ord($char) ) & 0x00ff ];
187             }
188              
189 11         33 return $crc;
190             }
191              
192             sub hash_slot {
193 10     10 1 674 my $key = shift;
194              
195 10         13 my $hashtag = $key;
196              
197 10 100       34 if ( $key =~ m/\{([^}]*?)\}/ ) {
198 1 50       4 if ( length $1 > 0 ) {
199 1         2 $hashtag = $1;
200             }
201             }
202              
203 10         18 return crc16($hashtag) % MAX_SLOTS;
204             }
205              
206             sub _init {
207 7     7   15 my $self = shift;
208              
209 7         26 $self->_discover_cluster;
210              
211 7 100       21 if ( $self->{refresh_interval} > 0 ) {
212 6         33 $self->{_refresh_timestamp} = [Time::HiRes::gettimeofday];
213             }
214              
215 7         15 return;
216             }
217              
218             sub _discover_cluster {
219 7     7   12 my $self = shift;
220              
221 7         9 my $nodes;
222              
223 7 100       28 if ( defined $self->{_slots} ) {
224 1         3 $nodes = $self->_nodes( undef, $self->{allow_slaves} );
225             }
226             else {
227 6         8 my %nodes_pool;
228              
229 6         11 foreach my $hostport ( @{ $self->{startup_nodes} } ) {
  6         15  
230 18 50       98 unless ( defined $nodes_pool{$hostport} ) {
231 18         40 $nodes_pool{$hostport} = $self->_new_node($hostport);
232             }
233             }
234              
235 6         40 $self->{_nodes_pool} = \%nodes_pool;
236 6         21 $nodes = [ keys %nodes_pool ];
237             }
238              
239 7         32 $self->_run_command( 'cluster_state', [], $nodes );
240 7         20 my $slots = $self->_run_command( 'cluster_slots', [], $nodes );
241              
242 7 50       12 unless ( @{$slots} ) {
  7         22  
243 0         0 croak 'ERR Returned empty list of slots';
244             }
245              
246 7         22 $self->_prepare_nodes($slots);
247              
248 7 100       22 unless ( defined $self->{_commands} ) {
249 6         44 $self->_load_commands;
250             }
251              
252 7         35 return;
253             }
254              
255             sub _prepare_nodes {
256 7     7   13 my $self = shift;
257 7         11 my $slots_raw = shift;
258              
259 7         19 my %nodes_pool;
260             my @slots;
261 7         0 my @masters_nodes;
262              
263 7         14 my $nodes_pool_old = $self->{_nodes_pool};
264              
265 7         13 foreach my $range ( @{$slots_raw} ) {
  7         17  
266 28         33 my $range_start = shift @{$range};
  28         41  
267 28         30 my $range_end = shift @{$range};
  28         40  
268              
269 28         39 my @nodes;
270 28         34 my $is_master = 1;
271              
272 28         31 foreach my $node_info ( @{$range} ) {
  28         36  
273 70         133 my $hostport = "$node_info->[0]:$node_info->[1]";
274              
275 70 100       131 unless ( defined $nodes_pool{$hostport} ) {
276 49 100       100 if ( defined $nodes_pool_old->{$hostport} ) {
277 25         51 $nodes_pool{$hostport} = delete $nodes_pool_old->{$hostport};
278             }
279             else {
280 24         50 $nodes_pool{$hostport} = $self->_new_node($hostport);
281             }
282              
283 49 100       173 if ($is_master) {
284 21         31 push( @masters_nodes, $hostport );
285 21         46 $is_master = 0;
286             }
287             }
288              
289 70         112 push( @nodes, $hostport );
290             }
291              
292 28         67 push( @slots, [ $range_start, $range_end, \@nodes ] );
293             }
294              
295 7         46 @slots = sort { $a->[0] <=> $b->[0] } @slots;
  28         89  
296              
297 7         18 $self->{_nodes_pool} = \%nodes_pool;
298 7         32 $self->{_nodes} = [ keys %nodes_pool ];
299 7         38 $self->{_master_nodes} = \@masters_nodes;
300 7         18 $self->{_slots} = \@slots;
301              
302 7         39 return;
303             }
304              
305             sub _load_commands {
306 6     6   14 my $self = shift;
307              
308 6         21 my $nodes = $self->_nodes( undef, $self->{allow_slaves} );
309 6         20 my $commands_raw = $self->_run_command( 'command', [], $nodes );
310              
311 6         37 my %commands = %PREDEFINED_CMDS;
312              
313 6         11 foreach my $cmd_raw ( @{$commands_raw} ) {
  6         15  
314 30         54 my $kwd = lc( $cmd_raw->[0] );
315              
316 30 50       59 next if exists $commands{$kwd};
317              
318 30         36 my $readonly = 0;
319 30         35 foreach my $flag ( @{ $cmd_raw->[2] } ) {
  30         44  
320 48 100       78 if ( $flag eq 'readonly' ) {
321 12         17 $readonly = 1;
322 12         23 last;
323             }
324             }
325              
326 30         108 $commands{$kwd} = {
327             readonly => $readonly,
328             key_pos => $cmd_raw->[3],
329             };
330             }
331              
332 6         12 $self->{_commands} = \%commands;
333              
334 6         23 return;
335             }
336              
337             sub _new_node {
338 43     43   56 my $self = shift;
339 43         56 my $hostport = shift;
340              
341             return Redis->new(
342 43         54 %{ $self->{_node_params} },
  43         121  
343             server => $hostport,
344             reconnect => 0.001, # reconnect only once
345             every => 1000,
346             no_auto_connect_on_new => 1,
347              
348             on_connect => $self->_create_on_node_connect($hostport),
349             );
350             }
351              
352             sub _create_on_node_connect {
353 43     43   55 my $self = shift;
354 43         51 my $hostport = shift;
355              
356 43         110 weaken($self);
357              
358             return sub {
359 43     43   13411 my $redis = shift;
360              
361 43 100       96 if ( $self->{allow_slaves} ) {
362 14         50 $redis->readonly;
363             }
364              
365 43 50       2405 if ( defined $self->{on_node_connect} ) {
366 0         0 $self->{on_node_connect}->($hostport);
367             }
368 43         268 };
369             }
370              
371             sub _route {
372 10     10   14 my $self = shift;
373 10         12 my $cmd_name = shift;
374 10         11 my $args = shift;
375              
376 10 100 33     78 if ( !defined $self->{_slots} || (
      66        
377             $self->{refresh_interval} > 0
378             && Time::HiRes::tv_interval( $self->{_refresh_timestamp} )
379             > $self->{refresh_interval} ) )
380             {
381 1         10 $self->_init;
382             }
383              
384 10         133 my $key;
385 10         27 my @kwds = split( m/_/, lc($cmd_name) );
386 10         18 my $cmd_info = $self->{_commands}{ $kwds[0] };
387              
388 10 100       27 if ( defined $cmd_info ) {
389 8 100 33     19 if ( $cmd_info->{key_pos} > 0 ) {
    50          
390 7         17 $key = $args->[ $cmd_info->{key_pos} - scalar @kwds ];
391             }
392             # Exception for EVAL and EVALSHA commands
393             elsif ( $cmd_info->{movablekeys}
394             && $args->[1] > 0 )
395             {
396 0         0 $key = $args->[2];
397             }
398             }
399              
400 10         10 my $slot;
401 10         13 my $allow_slaves = $self->{allow_slaves};
402              
403 10 100       33 if ( defined $key ) {
404 7         29 $slot = hash_slot($key);
405 7   100     20 $allow_slaves &&= $cmd_info->{readonly};
406             }
407              
408 10         18 my $nodes = $self->_nodes( $slot, $allow_slaves );
409              
410 10 50       16 unless ( defined $nodes ) {
411 0         0 croak 'ERR Target node not found. Maybe not all slots are served';
412             }
413              
414 10         23 return $self->_run_command( $cmd_name, $args, $nodes );
415             }
416              
417             sub _run_command {
418 31     31   47 my $self = shift;
419 31         45 my $cmd_name = shift;
420 31         36 my $args = shift;
421 31         36 my $nodes = shift;
422              
423 31         50 my $nodes_pool = $self->{_nodes_pool};
424              
425 31         39 my $nodes_num = scalar @{$nodes};
  31         45  
426 31         78 my $node_index = int( rand($nodes_num) );
427 31         43 my $fails_cnt = 0;
428 31         42 my $wantarray = wantarray;
429              
430 31 100       68 my $cmd_method
431             = $cmd_name eq 'cluster_state'
432             ? 'cluster_info'
433             : $cmd_name;
434              
435 31         36 while (1) {
436 32         47 my $hostport = $nodes->[$node_index];
437 32         49 my $node = $nodes_pool->{$hostport};
438              
439 32         72 my $reply;
440             my @arr_reply;
441 32         0 my $err_msg;
442              
443             {
444 32         38 local $@;
  32         38  
445              
446 32         60 eval {
447 32 100       60 if ( $cmd_name eq 'cluster_state' ) {
    100          
448 7         12 undef $wantarray;
449 7         11 my $reply_raw = $node->$cmd_method( @{$args} );
  7         39  
450 7         2461 $reply = _parse_info($reply_raw);
451              
452 7 50       22 if ( $reply->{cluster_state} eq 'ok' ) {
453 7         16 $reply = 1;
454             }
455             else {
456 0         0 croak 'CLUSTERDOWN The cluster is down';
457             }
458             }
459             elsif ( $wantarray ) {
460 1         2 @arr_reply = $node->$cmd_method( @{$args} );
  1         7  
461             }
462             else {
463 24         35 $reply = $node->$cmd_method( @{$args} );
  24         139  
464             }
465             };
466              
467 32 100       3690 if ($@) {
468 3         6 $err_msg = $@;
469             }
470             }
471              
472 32 100       61 if ($err_msg) {
473 3         5 my $err_code = 'ERR';
474 3 50       15 if ( $err_msg =~ m/^(?:\[\w+\]\s+)?([A-Z]{3,})/ ) {
475 3         9 $err_code = $1;
476             }
477              
478 3 100 66     12 if ( $err_code eq 'MOVED' || $err_code eq 'ASK' ) {
479 1 50       4 if ( $err_code eq 'MOVED' ) {
480 1         4 $self->_init;
481             }
482              
483 1         7 my ($fwd_hostport) = ( split( m/\s+/, $err_msg ) )[3];
484 1         3 $fwd_hostport =~ s/,$//;
485              
486 1 50       3 unless ( defined $nodes_pool->{$fwd_hostport} ) {
487 1         3 $nodes_pool->{$fwd_hostport} = $self->_new_node( $fwd_hostport );
488             }
489              
490 1         8 return $self->_run_command( $cmd_name, $args, [ $fwd_hostport ] );
491             }
492              
493 2 50       6 if ( defined $self->{on_node_error} ) {
494 0         0 $self->{on_node_error}->( $err_msg, $hostport );
495             }
496              
497 2 100       4 if ( ++$fails_cnt < $nodes_num ) {
498 1 50       4 if ( ++$node_index == $nodes_num ) {
499 1         1 $node_index = 0;
500             }
501              
502 1         2 next;
503             }
504              
505 1         5 die $err_msg;
506             }
507              
508 29 100       101 return $wantarray ? @arr_reply : $reply;
509             }
510             }
511              
512             sub _nodes {
513 21     21   29 my $self = shift;
514 21         26 my $slot = shift;
515 21         27 my $allow_slaves = shift;
516              
517 21 100       39 if ( defined $slot ) {
518             my ($range) = bsearch {
519 18 50   18   42 $slot > $_->[1] ? -1 : $slot < $_->[0] ? 1 : 0;
    100          
520             }
521 9         47 @{ $self->{_slots} };
  9         33  
522              
523 9 50       33 return unless defined $range;
524              
525 9 100       22 return $allow_slaves
526             ? $range->[2]
527             : [ $range->[2][0] ];
528             }
529              
530             return $allow_slaves
531             ? $self->{_nodes}
532 12 100       32 : $self->{_master_nodes};
533             }
534              
535             sub _parse_info {
536 7         33 return { map { split( m/:/, $_, 2 ) }
537 7     7   31 grep { m/^[^#]/ } split( EOL, $_[0] ) };
  7         43  
538             }
539              
540             sub AUTOLOAD {
541 5     5   2534 our $AUTOLOAD;
542 5         8 my $cmd_name = $AUTOLOAD;
543 5         28 $cmd_name =~ s/^.+:://;
544              
545             my $sub = sub {
546 9     9   617 my $self = shift;
547 9         26 return $self->_route( $cmd_name, [@_] );
548 5         17 };
549              
550 5         9 do {
551 5     5   63 no strict 'refs';
  5         10  
  5         500  
552 5         6 *{$cmd_name} = $sub;
  5         17  
553             };
554              
555 5         7 goto &{$sub};
  5         13  
556             }
557              
558       0     sub DESTROY { }
559              
560             1;
561             __END__