File Coverage

blib/lib/Redis/Fast.pm
Criterion Covered Total %
statement 100 275 36.3
branch 21 124 16.9
condition 22 42 52.3
subroutine 21 42 50.0
pod 0 11 0.0
total 164 494 33.2


line stmt bran cond sub pod time code
1             package Redis::Fast 0.37;
2              
3             BEGIN {
4 38     38   4670375 use XSLoader;
  38         89  
  38         1645  
5 38     38   123414 XSLoader::load __PACKAGE__, $VERSION;
6             }
7              
8 38     38   334 use warnings;
  38         107  
  38         2586  
9 38     38   242 use strict;
  38         96  
  38         1371  
10              
11 38     38   266 use Carp qw/confess/;
  38         80  
  38         2675  
12 38     38   29565 use Encode;
  38         800847  
  38         6375  
13 38     38   8037 use Try::Tiny;
  38         16623  
  38         2982  
14 38     38   268 use Scalar::Util qw(weaken);
  38         68  
  38         3037  
15 38     38   267 use constant SSL_AVAILABLE => eval { require IO::Socket::SSL };
  38         91  
  38         241  
  38         49837  
16              
17 38     38   5319725 use Redis::Fast::Sentinel;
  38         150  
  38         103611  
18              
19             # small utilities for handling host and port
20             sub _join_host_port {
21 0     0   0 my ($host, $port) = @_;
22 0 0 0     0 return "[$host]:$port" if $host =~ /:/ || $host =~ /%/;
23 0         0 return "$host:$port";
24             }
25             sub _split_host_port {
26 4     4   67 my $hostport = shift;
27 4 50       62 if ($hostport =~ /\A\[([^\]]+)\]:([0-9]+)\z/) {
28 0         0 return $1, $2;
29             }
30 4         36 return split /:/, $hostport;
31             }
32              
33             sub _new_on_connect_cb {
34 70     70   1092 my ($self, $on_conn, $password, $name) = @_;
35 70         341 weaken $self;
36             my $handler = sub {
37             # If we are in PubSub mode we shouldn't perform any command besides
38             # (p)(un)subscribe
39 68 50   68   460 if (! $self->is_subscriber) {
40             defined $name
41             and try {
42 0         0 my $n = $name;
43 0 0       0 $n = $n->($self) if ref($n) eq 'CODE';
44 0 0       0 $self->client_setname($n) if defined $n;
45 68 50       381 };
46 68         295 my $data = $self->__get_data;
47             defined $data->{current_database}
48 68 50       363 and $self->select($data->{current_database});
49             }
50              
51 68         235 my $subscribers = $self->__get_data->{subscribers};
52 68         536 $self->__get_data->{subscribers} = {};
53 68         1118 $self->__get_data->{cbs} = undef;
54 68         155 foreach my $topic (CORE::keys(%{$subscribers})) {
  68         327  
55 0 0       0 if ($topic =~ /(p?message):(.*)$/ ) {
56 0         0 my ($key, $channel) = ($1, $2);
57 0         0 my $subs = $subscribers->{$topic};
58 0 0       0 if ($key eq 'message') {
59 0         0 $self->__subscription_cmd('', 0, subscribe => $channel, $_) for @$subs;
60             } else {
61 0         0 $self->__subscription_cmd('p', 0, psubscribe => $channel, $_) for @$subs;
62             }
63             }
64             }
65              
66 68 50       431 defined $on_conn
67             and $on_conn->($self);
68 70         3128 };
69              
70             return sub {
71 68     68   2236 my $reconnect_stash = $self->__get_reconnect;
72 68 50       1379 if(defined $password) {
73 0         0 my $err;
74 0         0 $self->__set_reconnect(0);
75             try {
76 0         0 $self->auth($password);
77             } catch {
78 0         0 $err = $_;
79 0         0 };
80 0 0       0 if(defined $err) {
81 0 0       0 if($err =~ /ERR invalid password|WRONGPASS invalid username-password pair/) {
82             # password setting is incorrect, no need to reconnect
83 0         0 die("Redis server refused password");
84             } else {
85             # it might be network error
86             # invoke reconnect
87 0         0 $self->__set_reconnect($reconnect_stash);
88 0         0 return ;
89             }
90             }
91             }
92              
93             try {
94             # disable reconnection while executing on_connect handler
95 68         54130 $self->__set_reconnect(0);
96 68         421 $handler->();
97             } catch {
98 0         0 $self->quit();
99             } finally {
100 68         2534 $self->__set_reconnect($reconnect_stash);
101 68         5452 };
102 70         4310 };
103             }
104              
105             sub _new_reconnect_on_error_cb {
106 70     70   462 my ($self, $reconnect_on_error) = @_;
107 70         183 weaken $self;
108              
109 70 50       303 if ($reconnect_on_error) {
110             return sub {
111             # The unit should be second and the type should be double.
112             # -1 is a special value, it means that we do not reconnect.
113 0     0   0 my $next_reconnect_interval = $reconnect_on_error->(@_);
114 0 0       0 if ($next_reconnect_interval < -1) {
115 0         0 warn "reconnect_on_error must not return a number less than -1";
116              
117             # Reset a next_reconnect_interval and do not reconnect.
118 0         0 $next_reconnect_interval = -1;
119             }
120              
121             # Wait until next_reconnect_interval seconds elapse.
122 0         0 $self->__set_next_reconnect_on_error_at($next_reconnect_interval);
123              
124 0         0 my $need_reconnect = 0;
125 0 0       0 if (-1 < $next_reconnect_interval) {
126 0         0 $need_reconnect = 1;
127             }
128 0         0 return $need_reconnect;
129 0         0 };
130             } else {
131 70         309 return;
132             }
133             }
134              
135             sub new {
136 70     70 0 5994947 my $class = shift;
137 70         3302 my %args = @_;
138 70         37178 my $self = $class->_new;
139              
140             ## Deal with REDIS_SERVER ENV
141 70 0 33     999 if ($ENV{REDIS_SERVER} && !$args{sock} && !$args{server}) {
      33        
142 0 0       0 if ($ENV{REDIS_SERVER} =~ m!^/!) {
    0          
    0          
143 0         0 $args{sock} = $ENV{REDIS_SERVER};
144             }
145             elsif ($ENV{REDIS_SERVER} =~ m!^unix:(.+)!) {
146 0         0 $args{sock} = $1;
147             }
148             elsif ($ENV{REDIS_SERVER} =~ m!^(tcp:)?(.+)!) {
149 0         0 $args{server} = $2;
150             }
151             }
152              
153 70         1868 my $on_conn = $args{on_connect};
154 70         1345 my $password = $args{password};
155 70         274 my $name = $args{name};
156 70         2607 $self->__set_on_connect($self->_new_on_connect_cb($on_conn, $password, $name));
157             $self->__set_data({
158             subscribers => {},
159             sentinels_cnx_timeout => $args{sentinels_cnx_timeout},
160             sentinels_read_timeout => $args{sentinels_read_timeout},
161             sentinels_write_timeout => $args{sentinels_write_timeout},
162             no_sentinels_list_update => $args{no_sentinels_list_update},
163 70         3852 });
164              
165 70 100       712 if ($args{sock}) {
    50          
166 66         1385 $self->__connection_info_unix($args{sock});
167             } elsif ($args{sentinels}) {
168 0         0 my $sentinels = $args{sentinels};
169 0 0       0 ref $sentinels eq 'ARRAY'
170             or croak("'sentinels' param must be an ArrayRef");
171             defined($self->__get_data->{service} = $args{service})
172 0 0       0 or croak("Need 'service' name when using 'sentinels'!");
173 0 0 0     0 croak("Does not support SSL and sentinels") if exists $args{ssl} and $args{ssl};
174 0         0 $self->__get_data->{sentinels} = $sentinels;
175 0         0 $self->__get_data->{sentinels_password} = $args{sentinels_password};
176             my $on_build_sock = sub {
177 0     0   0 my $data = $self->__get_data;
178 0         0 my $sentinels = $data->{sentinels};
179              
180             # try to connect to a sentinel
181 0         0 my $status;
182 0         0 foreach my $sentinel_address (@$sentinels) {
183 0 0       0 my $sentinel = eval {
184             Redis::Fast::Sentinel->new(
185             server => $sentinel_address,
186             password => $data->{sentinels_password},
187             cnx_timeout => ( exists $data->{sentinels_cnx_timeout}
188             ? $data->{sentinels_cnx_timeout} : 0.1),
189             read_timeout => ( exists $data->{sentinels_read_timeout}
190             ? $data->{sentinels_read_timeout} : 1 ),
191             write_timeout => ( exists $data->{sentinels_write_timeout}
192 0 0       0 ? $data->{sentinels_write_timeout} : 1 ),
    0          
    0          
193             )
194             } or next;
195 0         0 my $server_address = $sentinel->get_service_address($data->{service});
196 0 0 0     0 defined $server_address
197             or $status ||= "Sentinels don't know this service",
198             next;
199 0 0       0 $server_address eq 'IDONTKNOW'
200             and $status = "service is configured in one Sentinel, but was never reached",
201             next;
202              
203             # we found the service, set the server
204 0         0 my ($server, $port) = _split_host_port $server_address;
205 0         0 $self->__connection_info($server, $port);
206              
207 0 0       0 if (! $data->{no_sentinels_list_update} ) {
208             # move the elected sentinel at the front of the list and add
209             # additional sentinels
210 0         0 my $idx = 2;
211 0         0 my %h = ( ( map { $_ => $idx++ } @{$data->{sentinels}}),
  0         0  
  0         0  
212             $sentinel_address => 1,
213             );
214              
215             $data->{sentinels} = [
216 0         0 ( sort { $h{$a} <=> $h{$b} } CORE::keys(%h) ), # sorted existing sentinels,
217 0         0 grep { ! $h{$_}; } # list of unknown
218             map {
219 0         0 my $s = +{ @$_ };
220 0         0 _join_host_port($s->{ip}, $s->{port});
221             } # ip:port of
222             $sentinel->sentinel( # sentinels
223             sentinels => $data->{service} # for this service
224             )
225 0         0 ];
226             }
227             }
228 0         0 };
229 0         0 $self->__set_on_build_sock($on_build_sock);
230             } else {
231 4   50     93 my ($server, $port) = _split_host_port($args{server} || '127.0.0.1:6379');
232 4         64 $self->__connection_info($server, $port);
233             }
234              
235             #$self->{is_subscriber} = 0;
236             #$self->{subscribers} = {};
237 70   100     1533 $self->__set_reconnect($args{reconnect} || 0);
238 70   100     1411 $self->__set_every($args{every} || 1000);
239 70 50 33     1119 $self->__set_debug(($args{debug} || $ENV{REDIS_DEBUG}) ? 1 : 0);
240 70   100     1290 $self->__set_cnx_timeout($args{cnx_timeout} || -1);
241 70   100     1045 $self->__set_read_timeout($args{read_timeout} || -1);
242 70   100     592 $self->__set_write_timeout($args{write_timeout} || -1);
243 70   100     539 $self->__set_ssl($args{ssl} || 0);
244 70 50 100     1067 if ($args{ssl} && SSL_AVAILABLE && $args{SSL_verify_mode}) {
      66        
245             # To pass the SSL verify mode to the underlying bindings, we'll use a string
246 0 0       0 $self->__set_ssl_verify_mode("SSL_VERIFY_NONE") if ($args{SSL_verify_mode} == IO::Socket::SSL::SSL_VERIFY_NONE);
247 0 0       0 $self->__set_ssl_verify_mode("SSL_VERIFY_PEER") if ($args{SSL_verify_mode} == IO::Socket::SSL::SSL_VERIFY_PEER);
248 0 0       0 $self->__set_ssl_verify_mode("SSL_VERIFY_FAIL_IF_NO_PEER_CERT") if ($args{SSL_verify_mode} == IO::Socket::SSL::SSL_VERIFY_FAIL_IF_NO_PEER_CERT);
249 0 0       0 $self->__set_ssl_verify_mode("SSL_VERIFY_CLIENT_ONCE") if ($args{SSL_verify_mode} == IO::Socket::SSL::SSL_VERIFY_CLIENT_ONCE);
250             }
251              
252 70 50       958 if (my $cb = $self->_new_reconnect_on_error_cb($args{reconnect_on_error})) {
253 0         0 $self->__set_reconnect_on_error($cb);
254             }
255              
256 70 100       1100723 $self->connect unless $args{no_auto_connect_on_new};
257              
258 69         4225 return $self;
259             }
260              
261              
262              
263             ### Deal with common, general case, Redis commands
264             our $AUTOLOAD;
265              
266             sub AUTOLOAD {
267 13     13   6207 my $command = $AUTOLOAD;
268 13         324 $command =~ s/.*://;
269 13         56 my @command = split /_/, uc $command;
270              
271             my $method = sub {
272 68     68   4169 my $self = shift;
273 68         2152 $self->__is_valid_command($command);
274 68         2349937 my ($ret, $error) = $self->__std_cmd(@command, @_);
275 68 100       10175 confess "[$command] $error, " if defined $error;
276 54 100 66     2924 return (wantarray && ref $ret eq 'ARRAY') ? @$ret : $ret;
277 13         492 };
278              
279             # Save this method for future calls
280 38     38   635 no strict 'refs';
  38         115  
  38         74434  
281 13         575 *$AUTOLOAD = $method;
282              
283 13         58 goto $method;
284             }
285              
286             sub __with_reconnect {
287 0     0   0 my ($self, $cb) = @_;
288              
289 0         0 confess "not implemented";
290             }
291              
292              
293             ### Commands with extra logic
294              
295             sub keys {
296 0     0 0 0 my $self = shift;
297 0         0 $self->__is_valid_command('keys');
298 0         0 my ($ret, $error) = $self->__keys(@_);
299 0 0       0 confess "[keys] $error, " if defined $error;
300 0 0       0 return $ret unless ref $ret eq 'ARRAY';
301 0         0 return @$ret;
302             }
303              
304             sub ping {
305 0     0 0 0 my $self = shift;
306 0         0 $self->__is_valid_command('ping');
307 0 0       0 return unless $self->__sock;
308             return scalar try {
309 0     0   0 my ($ret, $error) = $self->__std_cmd('ping');
310 0 0       0 return if defined $error;
311 0         0 return $ret;
312             } catch {
313 0     0   0 return ;
314 0         0 };
315             }
316              
317             sub info {
318 3     3 0 20 my $self = shift;
319 3         10 $self->__is_valid_command('info');
320 3         20 my ($ret, $error) = $self->__info(@_);
321 3 50       404 confess "[info] $error, " if defined $error;
322 0 0       0 return $ret unless ref $ret eq 'ARRAY';
323 0         0 return @$ret;
324             }
325              
326             sub quit {
327 0     0 0 0 my $self = shift;
328 0         0 $self->__is_valid_command('quit');
329 0         0 $self->__quit(@_);
330             }
331              
332             sub shutdown {
333 0     0 0 0 my $self = shift;
334 0         0 $self->__is_valid_command('shutdown');
335 0         0 $self->__shutdown(@_);
336             }
337              
338             sub select {
339 0     0 0 0 my $self = shift;
340 0         0 my $database = shift;
341 0         0 $self->__is_valid_command('select');
342 0         0 my ($ret, $error) = $self->__std_cmd('SELECT', $database, @_);
343 0 0       0 confess "[SELECT] $error, " if defined $error;
344 0         0 $self->__get_data->{current_database} = $database;
345 0         0 return $ret;
346             }
347              
348             sub __subscription_cmd {
349 0     0   0 my $self = shift;
350 0         0 my $pr = shift;
351 0         0 my $unsub = shift;
352 0         0 my $command = shift;
353 0         0 my $cb = pop;
354 0         0 weaken $self;
355              
356 0 0       0 confess("Missing required callback in call to $command(), ")
357             unless ref($cb) eq 'CODE';
358              
359 0         0 $self->wait_all_responses;
360              
361 0         0 while($self->__get_data->{cbs}) {
362 0         0 $self->__wait_for_event(1);
363             }
364              
365 0         0 my @subs = @_;
366 0 0       0 @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
367             if $unsub;
368              
369 0 0       0 if(@subs) {
370 0         0 $self->__get_data->{cbs} = { map { ("${pr}message:$_" => $cb) } @subs };
  0         0  
371 0         0 for my $sub(@subs) {
372 0         0 $self->__send_subscription_cmd(
373             $command,
374             $sub,
375             $self->__subscription_callbak,
376             );
377             }
378 0         0 while($self->__get_data->{cbs}) {
379 0         0 $self->__wait_for_event(1);
380             }
381             }
382             }
383              
384             sub __subscription_callbak {
385 0     0   0 my $self = shift;
386 0         0 my $cb = $self->__get_data->{callback};
387 0 0       0 return $cb if $cb;
388              
389 0         0 weaken $self;
390             $cb = sub {
391 0     0   0 my $cbs = $self->__get_data->{cbs};
392 0 0       0 if($cbs) {
393 0         0 $self->__process_subscription_changes($cbs, @_);
394 0 0       0 unless(%$cbs) {
395 0         0 $self->__get_data->{cbs} = undef;
396             }
397             } else {
398 0         0 $self->__process_pubsub_msg(@_);
399             }
400 0         0 };
401              
402 0         0 $self->__get_data->{callback} = $cb;
403 0         0 return $cb;
404             }
405              
406 0     0 0 0 sub subscribe { shift->__subscription_cmd('', 0, subscribe => @_) }
407 0     0 0 0 sub psubscribe { shift->__subscription_cmd('p', 0, psubscribe => @_) }
408 0     0 0 0 sub unsubscribe { shift->__subscription_cmd('', 1, unsubscribe => @_) }
409 0     0 0 0 sub punsubscribe { shift->__subscription_cmd('p', 1, punsubscribe => @_) }
410              
411             sub __process_unsubscribe_requests {
412 0     0   0 my ($self, $cb, $pr, @unsubs) = @_;
413 0         0 my $subs = $self->__get_data->{subscribers};
414              
415 0         0 my @subs_to_unsubscribe;
416 0         0 for my $sub (@unsubs) {
417 0         0 my $key = "${pr}message:$sub";
418 0 0 0     0 next unless $subs->{$key} && @{ $subs->{$key} };
  0         0  
419 0         0 my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{ $subs->{$key} }];
  0         0  
  0         0  
420 0 0       0 next if @$cbs;
421              
422 0         0 delete $subs->{$key};
423 0         0 push @subs_to_unsubscribe, $sub;
424             }
425 0         0 return @subs_to_unsubscribe;
426             }
427              
428             sub __process_subscription_changes {
429 0     0   0 my ($self, $expected, $m, $error) = @_;
430 0         0 my $subs = $self->__get_data->{subscribers};
431              
432             ## Deal with pending PUBLISH'ed messages
433 0 0       0 if ($m->[0] =~ /^p?message$/) {
434 0         0 $self->__process_pubsub_msg($m);
435 0         0 return ;
436             }
437              
438 0         0 my ($key, $unsub) = $m->[0] =~ m/^(p)?(un)?subscribe$/;
439 0         0 $key .= "message:$m->[1]";
440 0         0 my $cb = delete $expected->{$key};
441              
442 0 0       0 push @{ $subs->{$key} }, $cb unless $unsub;
  0         0  
443             }
444              
445             sub __process_pubsub_msg {
446 0     0   0 my ($self, $m) = @_;
447 0         0 my $subs = $self->__get_data->{subscribers};
448              
449 0         0 my $sub = $m->[1];
450 0         0 my $cbid = "$m->[0]:$sub";
451 0         0 my $data = pop @$m;
452 0 0       0 my $topic = defined $m->[2] ? $m->[2] : $sub;
453              
454 0 0       0 if (!exists $subs->{$cbid}) {
455 0         0 warn "Message for topic '$topic' ($cbid) without expected callback, ";
456 0         0 return 0;
457             }
458              
459 0         0 $_->($data, $topic, $sub) for @{ $subs->{$cbid} };
  0         0  
460              
461 0         0 return 1;
462              
463             }
464              
465             sub __is_valid_command {
466 71     71   267 my ($self, $cmd) = @_;
467              
468 71 50       662 confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ")
469             if $self->is_subscriber;
470             }
471              
472             1; # End of Redis.pm
473              
474             __END__