File Coverage

blib/lib/Redis/ClusterRider.pm
Criterion Covered Total %
statement 278 297 93.6
branch 82 108 75.9
condition 13 21 61.9
subroutine 32 34 94.1
pod 6 6 100.0
total 411 466 88.2


line stmt bran cond sub pod time code
1             package Redis::ClusterRider;
2              
3 5     5   385339 use 5.008000;
  5         39  
4 5     5   26 use strict;
  5         11  
  5         121  
5 5     5   25 use warnings;
  5         9  
  5         147  
6 5     5   26 use base qw( Exporter );
  5         10  
  5         874  
7              
8             our $VERSION = '0.21';
9              
10 5     5   1459 use Redis;
  5         102317  
  5         209  
11 5     5   2907 use List::MoreUtils qw( bsearch );
  5         71275  
  5         34  
12 5     5   5426 use Scalar::Util qw( looks_like_number weaken );
  5         15  
  5         318  
13 5     5   1661 use Time::HiRes;
  5         4274  
  5         31  
14 5     5   669 use Carp qw( croak );
  5         12  
  5         337  
15              
16             BEGIN {
17 5     5   207 our @EXPORT_OK = qw( crc16 hash_slot );
18             }
19              
20             use constant {
21 5         15298 D_REFRESH_INTERVAL => 15,
22             MAX_SLOTS => 16384,
23             EOL => "\r\n",
24 5     5   31 };
  5         11  
25              
26             my @CRC16_TAB = (
27             0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
28             0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
29             0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
30             0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
31             0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
32             0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
33             0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
34             0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
35             0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
36             0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
37             0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
38             0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
39             0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
40             0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
41             0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
42             0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
43             0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
44             0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
45             0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
46             0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
47             0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
48             0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
49             0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
50             0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
51             0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
52             0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
53             0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
54             0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
55             0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
56             0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
57             0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
58             0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
59             );
60              
61             my %PREDEFINED_CMDS = (
62             sort => { readonly => 0, key_pos => 1 },
63             zunionstore => { readonly => 0, key_pos => 1 },
64             zinterstore => { readonly => 0, key_pos => 1 },
65             eval => { readonly => 0, movablekeys => 1, key_pos => 0 },
66             evalsha => { readonly => 0, movablekeys => 1, key_pos => 0 },
67             );
68              
69             $Carp::Internal{ (__PACKAGE__) }++;
70              
71              
72             sub new {
73 10     10 1 5216 my $class = shift;
74 10         35 my %params = @_;
75              
76 10         27 my $self = bless {}, $class;
77              
78 10 100       35 unless ( defined $params{startup_nodes} ) {
79 1         194 croak 'Startup nodes not specified';
80             }
81 9 100       34 unless ( ref( $params{startup_nodes} ) eq 'ARRAY' ) {
82 1         94 croak 'Startup nodes must be specified as array reference';
83             }
84 8 100       15 unless ( @{ $params{startup_nodes} } ) {
  8         26  
85 1         89 croak 'Specified empty list of startup nodes';
86             }
87              
88 7         48 $self->{startup_nodes} = $params{startup_nodes};
89 7         18 $self->{allow_slaves} = $params{allow_slaves};
90 7         13 $self->{lazy} = $params{lazy};
91 7         31 $self->refresh_interval( $params{refresh_interval} );
92              
93 5         13 $self->{on_node_connect} = $params{on_node_connect};
94 5         12 $self->{on_node_error} = $params{on_node_error};
95              
96 5         8 my %node_params;
97 5         14 foreach my $name ( qw( conservative_reconnect cnx_timeout read_timeout
98             write_timeout password name debug ) )
99             {
100 35 100       83 next unless defined $params{$name};
101 2         5 $node_params{$name} = $params{$name};
102             }
103 5         14 $self->{_node_params} = \%node_params;
104              
105 5         14 $self->{_nodes_pool} = undef;
106 5         11 $self->{_nodes} = undef;
107 5         10 $self->{_master_nodes} = undef;
108 5         10 $self->{_slots} = undef;
109 5         10 $self->{_commands} = undef;
110 5         10 $self->{_refresh_timestamp} = undef;
111              
112 5 100       14 unless ( $self->{lazy} ) {
113 3         10 $self->_init;
114             }
115              
116 5         26 return $self;
117             }
118              
119             sub run_command {
120 1     1 1 583 my $self = shift;
121 1         3 my $cmd_name = shift;
122              
123 1         4 return $self->_route( $cmd_name, [ @_ ] );
124             }
125              
126             sub nodes {
127 4     4 1 2109 my $self = shift;
128 4         5 my $key = shift;
129 4         8 my $allow_slaves = shift;
130              
131 4 50       13 unless ( defined $self->{_slots} ) {
132 0         0 $self->_init;
133             }
134              
135 4         5 my $slot;
136 4 100       10 if ( defined $key ) {
137 2         7 $slot = hash_slot($key);
138             }
139              
140 4         12 my $nodes = $self->_nodes( $slot, $allow_slaves );
141              
142             return wantarray
143 4         15 ? @{ $self->{_nodes_pool} }{ @{$nodes} }
  4         8  
144 4 50       9 : $self->{_nodes_pool}{ $nodes->[0] };
145             }
146              
147             sub refresh_interval {
148 14     14 1 1327 my $self = shift;
149              
150 14 100       42 if (@_) {
151 11         18 my $seconds = shift;
152              
153 11 100       36 if ( defined $seconds ) {
154 8 100 100     61 if ( !looks_like_number($seconds) || $seconds < 0 ) {
155 4         402 croak qq{"refresh_interval" must be a positive number};
156             }
157 4         10 $self->{refresh_interval} = $seconds;
158             }
159             else {
160 3         9 $self->{refresh_interval} = D_REFRESH_INTERVAL;
161             }
162             }
163              
164 10         24 return $self->{refresh_interval};
165             }
166              
167             sub crc16 {
168 9     9 1 95 my $data = shift;
169              
170 9 50       32 unless ( utf8::downgrade( $data, 1 ) ) {
171 0         0 utf8::encode($data);
172             }
173              
174 9         14 my $crc = 0;
175 9         31 foreach my $char ( split //, $data ) {
176 33         78 $crc = ( $crc << 8 & 0xff00 )
177             ^ $CRC16_TAB[ ( ( $crc >> 8 ) ^ ord($char) ) & 0x00ff ];
178             }
179              
180 9         29 return $crc;
181             }
182              
183             sub hash_slot {
184 8     8 1 825 my $key = shift;
185              
186 8         10 my $hashtag = $key;
187              
188 8 100       35 if ( $key =~ m/\{([^}]*?)\}/ ) {
189 1 50       5 if ( length $1 > 0 ) {
190 1         3 $hashtag = $1;
191             }
192             }
193              
194 8         20 return crc16($hashtag) % MAX_SLOTS;
195             }
196              
197             sub _init {
198 4     4   10 my $self = shift;
199              
200 4         14 $self->_discover_cluster;
201              
202 4 100       18 if ( $self->{refresh_interval} > 0 ) {
203 3         21 $self->{_refresh_timestamp} = [Time::HiRes::gettimeofday];
204             }
205              
206 4         8 return;
207             }
208              
209             sub _discover_cluster {
210 4     4   8 my $self = shift;
211              
212 4         7 my $nodes;
213              
214 4 50       15 if ( defined $self->{_slots} ) {
215 0         0 $nodes = $self->_nodes( undef, $self->{allow_slaves} );
216             }
217             else {
218 4         7 my %nodes_pool;
219              
220 4         5 foreach my $hostport ( @{ $self->{startup_nodes} } ) {
  4         12  
221 12 50       2450 unless ( defined $nodes_pool{$hostport} ) {
222 12         34 $nodes_pool{$hostport} = $self->_new_node($hostport);
223             }
224             }
225              
226 4         1155 $self->{_nodes_pool} = \%nodes_pool;
227 4         17 $nodes = [ keys %nodes_pool ];
228             }
229              
230 4         36 $self->_run_command( 'cluster_state', [], $nodes );
231 4         15 my $slots = $self->_run_command( 'cluster_slots', [], $nodes );
232              
233 4 50       10 unless ( @{$slots} ) {
  4         16  
234 0         0 croak 'ERR Returned empty list of slots';
235             }
236              
237 4         16 $self->_prepare_nodes($slots);
238              
239 4 50       14 unless ( defined $self->{_commands} ) {
240 4         13 $self->_load_commands;
241             }
242              
243 4         17 return;
244             }
245              
246             sub _prepare_nodes {
247 4     4   9 my $self = shift;
248 4         9 my $slots_raw = shift;
249              
250 4         17 my %nodes_pool;
251             my @slots;
252 4         0 my @masters_nodes;
253 4         0 my @slave_nodes;
254              
255 4         10 my $nodes_pool_old = $self->{_nodes_pool};
256              
257 4         8 foreach my $range ( @{$slots_raw} ) {
  4         10  
258 16         25 my $range_start = shift @{$range};
  16         29  
259 16         27 my $range_end = shift @{$range};
  16         25  
260              
261 16         25 my @nodes;
262 16         23 my $is_master = 1;
263              
264 16         19 foreach my $node_info ( @{$range} ) {
  16         29  
265 40         89 my $hostport = "$node_info->[0]:$node_info->[1]";
266              
267 40 100       87 unless ( defined $nodes_pool{$hostport} ) {
268 28 100       61 if ( defined $nodes_pool_old->{$hostport} ) {
269 12         29 $nodes_pool{$hostport} = delete $nodes_pool_old->{$hostport};
270             }
271             else {
272 16         34 $nodes_pool{$hostport} = $self->_new_node($hostport);
273              
274 16 50       4427 unless ($is_master) {
275 16         35 push( @slave_nodes, $hostport );
276             }
277             }
278              
279 28 100       57 if ($is_master) {
280 12         24 push( @masters_nodes, $hostport );
281 12         21 $is_master = 0;
282             }
283             }
284              
285 40         68 push( @nodes, $hostport );
286             }
287              
288 16         57 push( @slots, [ $range_start, $range_end, \@nodes ] );
289             }
290              
291 4         45 @slots = sort { $a->[0] <=> $b->[0] } @slots;
  16         44  
292              
293 4         14 $self->{_nodes_pool} = \%nodes_pool;
294 4         40 $self->{_nodes} = [ keys %nodes_pool ];
295 4         40 $self->{_master_nodes} = \@masters_nodes;
296 4         11 $self->{_slots} = \@slots;
297              
298 4 100 66     34 if ( $self->{allow_slaves} && @slave_nodes ) {
299 1         4 $self->_prepare_slaves( \@slave_nodes );
300             }
301              
302 4         17 return;
303             }
304              
305             sub _prepare_slaves {
306 1     1   2 my $self = shift;
307 1         28 my $slave_nodes = shift;
308              
309 1         4 foreach my $hostport ( @{$slave_nodes} ) {
  1         3  
310 4         8 local $@;
311              
312 4         6 eval { $self->_run_command( 'readonly', [], [ $hostport ] ) };
  4         12  
313              
314 4 50       11 if ($@) {
315 0         0 warn $@;
316             }
317             }
318              
319 1         3 return;
320             }
321              
322             sub _load_commands {
323 4     4   23 my $self = shift;
324              
325 4         17 my $nodes = $self->_nodes( undef, $self->{allow_slaves} );
326 4         30 my $commands_raw = $self->_run_command( 'command', [], $nodes );
327              
328 4         35 my %commands = %PREDEFINED_CMDS;
329              
330 4         12 foreach my $cmd_raw ( @{$commands_raw} ) {
  4         11  
331 20         45 my $kwd = lc( $cmd_raw->[0] );
332              
333 20 50       53 next if exists $commands{$kwd};
334              
335 20         29 my $readonly = 0;
336 20         26 foreach my $flag ( @{ $cmd_raw->[2] } ) {
  20         36  
337 32 100       75 if ( $flag eq 'readonly' ) {
338 8         13 $readonly = 1;
339 8         14 last;
340             }
341             }
342              
343 20         62 $commands{$kwd} = {
344             readonly => $readonly,
345             key_pos => $cmd_raw->[3],
346             };
347             }
348              
349 4         11 $self->{_commands} = \%commands;
350              
351 4         19 return;
352             }
353              
354             sub _new_node {
355 28     28   43 my $self = shift;
356 28         50 my $hostport = shift;
357              
358             return Redis->new(
359 28         43 %{ $self->{_node_params} },
  28         88  
360             server => $hostport,
361             reconnect => 0.001, # reconnect only once
362             every => 1000,
363             no_auto_connect_on_new => 1,
364              
365             on_connect => $self->_create_on_node_connect($hostport),
366             );
367             }
368              
369             sub _create_on_node_connect {
370 28     28   43 my $self = shift;
371 28         38 my $hostport = shift;
372              
373 28         74 weaken($self);
374              
375             return sub {
376 0 0   0   0 if ( defined $self->{on_node_connect} ) {
377 0         0 $self->{on_node_connect}->($hostport);
378             }
379 28         135 };
380             }
381              
382             sub _route {
383 8     8   14 my $self = shift;
384 8         10 my $cmd_name = shift;
385 8         16 my $args = shift;
386              
387 8 100 33     58 if ( !defined $self->{_slots} || (
      66        
388             $self->{refresh_interval} > 0
389             && Time::HiRes::tv_interval( $self->{_refresh_timestamp} )
390             > $self->{refresh_interval} ) )
391             {
392 1         3 $self->_init;
393             }
394              
395 8         137 my $key;
396 8         28 my @kwds = split( m/_/, lc($cmd_name) );
397 8         19 my $cmd_info = $self->{_commands}{ $kwds[0] };
398              
399 8 100       18 if ( defined $cmd_info ) {
400 6 100 33     18 if ( $cmd_info->{key_pos} > 0 ) {
    50          
401 5         22 $key = $args->[ $cmd_info->{key_pos} - scalar @kwds ];
402             }
403             # Exception for EVAL and EVALSHA commands
404             elsif ( $cmd_info->{movablekeys}
405             && $args->[1] > 0 )
406             {
407 0         0 $key = $args->[2];
408             }
409             }
410              
411 8         14 my $slot;
412 8         12 my $allow_slaves = $self->{allow_slaves};
413              
414 8 100       17 if ( defined $key ) {
415 5         12 $slot = hash_slot($key);
416 5   100     27 $allow_slaves &&= $cmd_info->{readonly};
417             }
418              
419 8         26 my $nodes = $self->_nodes( $slot, $allow_slaves );
420              
421 8 50       18 unless ( defined $nodes ) {
422 0         0 croak 'ERR Target node not found. Maybe not all slots are served';
423             }
424              
425 8         16 return $self->_run_command( $cmd_name, $args, $nodes );
426             }
427              
428             sub _run_command {
429 24     24   39 my $self = shift;
430 24         41 my $cmd_name = shift;
431 24         32 my $args = shift;
432 24         33 my $nodes = shift;
433              
434 24         39 my $nodes_pool = $self->{_nodes_pool};
435              
436 24         33 my $nodes_num = scalar @{$nodes};
  24         39  
437 24         166 my $node_index = int( rand($nodes_num) );
438 24         58 my $fails_cnt = 0;
439 24         41 my $wantarray = wantarray;
440              
441 24 100       54 my $cmd_method
442             = $cmd_name eq 'cluster_state'
443             ? 'cluster_info'
444             : $cmd_name;
445              
446 24         35 while (1) {
447 25         41 my $hostport = $nodes->[$node_index];
448 25         44 my $node = $nodes_pool->{$hostport};
449              
450 25         57 my $reply;
451             my @arr_reply;
452 25         0 my $err_msg;
453              
454             {
455 25         36 local $@;
  25         37  
456              
457 25         46 eval {
458 25 100       64 if ( $cmd_name eq 'cluster_state' ) {
    100          
459 4         6 undef $wantarray;
460 4         8 my $reply_raw = $node->$cmd_method( @{$args} );
  4         42  
461 4         388 $reply = _parse_info($reply_raw);
462              
463 4 50       18 if ( $reply->{cluster_state} eq 'ok' ) {
464 4         10 $reply = 1;
465             }
466             else {
467 0         0 croak 'CLUSTERDOWN The cluster is down';
468             }
469             }
470             elsif ( $wantarray ) {
471 1         3 @arr_reply = $node->$cmd_method( @{$args} );
  1         7  
472             }
473             else {
474 20         31 $reply = $node->$cmd_method( @{$args} );
  20         143  
475             }
476             };
477              
478 25 100       2079 if ($@) {
479 2         5 $err_msg = $@;
480             }
481             }
482              
483 25 100       53 if ($err_msg) {
484 2         5 my $err_code = 'ERR';
485 2 50       11 if ( $err_msg =~ m/^(?:\[\w+\]\s+)?([A-Z]{3,})/ ) {
486 2         7 $err_code = $1;
487             }
488              
489 2 50 33     10 if ( $err_code eq 'MOVED' || $err_code eq 'ASK' ) {
490 0 0       0 if ( $err_code eq 'MOVED' ) {
491 0         0 $self->_init;
492             }
493              
494 0         0 my ($fwd_hostport) = ( split( m/\s+/, $err_msg ) )[3];
495 0         0 $fwd_hostport =~ s/,$//;
496              
497 0 0       0 unless ( defined $nodes_pool->{$fwd_hostport} ) {
498 0         0 $nodes_pool->{$fwd_hostport} = $self->_new_node( $fwd_hostport );
499             }
500              
501 0         0 return $self->_run_command( $cmd_name, $args, [ $fwd_hostport ] );
502             }
503              
504 2 50       6 if ( defined $self->{on_node_error} ) {
505 0         0 $self->{on_node_error}->( $err_msg, $hostport );
506             }
507              
508 2 100       5 if ( ++$fails_cnt < $nodes_num ) {
509 1 50       3 if ( ++$node_index == $nodes_num ) {
510 0         0 $node_index = 0;
511             }
512              
513 1         3 next;
514             }
515              
516 1         7 die $err_msg;
517             }
518              
519 23 100       92 return $wantarray ? @arr_reply : $reply;
520             }
521             }
522              
523             sub _nodes {
524 16     16   41 my $self = shift;
525 16         52 my $slot = shift;
526 16         24 my $allow_slaves = shift;
527              
528 16 100       35 if ( defined $slot ) {
529             my ($range) = bsearch {
530 14 50   14   42 $slot > $_->[1] ? -1 : $slot < $_->[0] ? 1 : 0;
    100          
531             }
532 7         30 @{ $self->{_slots} };
  7         27  
533              
534 7 50       38 return unless defined $range;
535              
536 7 100       23 return $allow_slaves
537             ? $range->[2]
538             : [ $range->[2][0] ];
539             }
540              
541             return $allow_slaves
542             ? $self->{_nodes}
543 9 100       28 : $self->{_master_nodes};
544             }
545              
546             sub _parse_info {
547 4         34 return { map { split( m/:/, $_, 2 ) }
548 4     4   29 grep { m/^[^#]/ } split( EOL, $_[0] ) };
  4         26  
549             }
550              
551             sub AUTOLOAD {
552 5     5   3118 our $AUTOLOAD;
553 5         8 my $cmd_name = $AUTOLOAD;
554 5         35 $cmd_name =~ s/^.+:://;
555              
556             my $sub = sub {
557 7     7   568 my $self = shift;
558 7         23 return $self->_route( $cmd_name, [@_] );
559 5         24 };
560              
561 5         10 do {
562 5     5   47 no strict 'refs';
  5         12  
  5         475  
563 5         6 *{$cmd_name} = $sub;
  5         19  
564             };
565              
566 5         9 goto &{$sub};
  5         15  
567             }
568              
569       0     sub DESTROY { }
570              
571             1;
572             __END__