File Coverage

blib/lib/Redis/Fast.pm
Criterion Covered Total %
statement 95 265 35.8
branch 20 112 17.8
condition 16 32 50.0
subroutine 20 41 48.7
pod 0 11 0.0
total 151 461 32.7


line stmt bran cond sub pod time code
1             package Redis::Fast;
2              
3             BEGIN {
4 38     38   2827723 use XSLoader;
  38         440  
  38         1690  
5 38     38   116 our $VERSION = '0.36';
6 38         25102 XSLoader::load __PACKAGE__, $VERSION;
7             }
8              
9 38     38   335 use warnings;
  38         91  
  38         1188  
10 38     38   223 use strict;
  38         163  
  38         1141  
11              
12 38     38   257 use Carp qw/confess/;
  38         82  
  38         2020  
13 38     38   22688 use Encode;
  38         396910  
  38         2527  
14 38     38   3634 use Try::Tiny;
  38         14648  
  38         1816  
15 38     38   236 use Scalar::Util qw(weaken);
  38         69  
  38         1612  
16              
17 38     38   14907 use Redis::Fast::Sentinel;
  38         111  
  38         72548  
18              
19              
20             # small utilities for handling host and port
21             sub _join_host_port {
22 0     0   0 my ($host, $port) = @_;
23 0 0 0     0 return "[$host]:$port" if $host =~ /:/ || $host =~ /%/;
24 0         0 return "$host:$port";
25             }
26             sub _split_host_port {
27 4     4   37 my $hostport = shift;
28 4 50       46 if ($hostport =~ /\A\[([^\]]+)\]:([0-9]+)\z/) {
29 0         0 return $1, $2;
30             }
31 4         31 return split /:/, $hostport;
32             }
33              
34             sub _new_on_connect_cb {
35 70     70   510 my ($self, $on_conn, $password, $name) = @_;
36 70         378 weaken $self;
37             my $handler = sub {
38             # If we are in PubSub mode we shouldn't perform any command besides
39             # (p)(un)subscribe
40 68 50   68   415 if (! $self->is_subscriber) {
41             defined $name
42             and try {
43 0         0 my $n = $name;
44 0 0       0 $n = $n->($self) if ref($n) eq 'CODE';
45 0 0       0 $self->client_setname($n) if defined $n;
46 68 50       200 };
47 68         187 my $data = $self->__get_data;
48             defined $data->{current_database}
49 68 50       618 and $self->select($data->{current_database});
50             }
51              
52 68         282 my $subscribers = $self->__get_data->{subscribers};
53 68         196 $self->__get_data->{subscribers} = {};
54 68         685 $self->__get_data->{cbs} = undef;
55 68         212 foreach my $topic (CORE::keys(%{$subscribers})) {
  68         382  
56 0 0       0 if ($topic =~ /(p?message):(.*)$/ ) {
57 0         0 my ($key, $channel) = ($1, $2);
58 0         0 my $subs = $subscribers->{$topic};
59 0 0       0 if ($key eq 'message') {
60 0         0 $self->__subscription_cmd('', 0, subscribe => $channel, $_) for @$subs;
61             } else {
62 0         0 $self->__subscription_cmd('p', 0, psubscribe => $channel, $_) for @$subs;
63             }
64             }
65             }
66              
67 68 50       552 defined $on_conn
68             and $on_conn->($self);
69 70         1822 };
70              
71             return sub {
72 68     68   686 my $reconnect_stash = $self->__get_reconnect;
73 68 50       305 if(defined $password) {
74 0         0 my $err;
75 0         0 $self->__set_reconnect(0);
76             try {
77 0         0 $self->auth($password);
78             } catch {
79 0         0 $err = $_;
80 0         0 };
81 0 0       0 if(defined $err) {
82 0 0       0 if($err =~ /ERR invalid password|WRONGPASS invalid username-password pair/) {
83             # password setting is incorrect, no need to reconnect
84 0         0 die("Redis server refused password");
85             } else {
86             # it might be network error
87             # invoke reconnect
88 0         0 $self->__set_reconnect($reconnect_stash);
89 0         0 return ;
90             }
91             }
92             }
93              
94             try {
95             # disable reconnection while executing on_connect handler
96 68         14460 $self->__set_reconnect(0);
97 68         191 $handler->();
98             } catch {
99 0         0 $self->quit();
100             } finally {
101 68         2474 $self->__set_reconnect($reconnect_stash);
102 68         3882 };
103 70         2657 };
104             }
105              
106             sub _new_reconnect_on_error_cb {
107 70     70   388 my ($self, $reconnect_on_error) = @_;
108 70         561 weaken $self;
109              
110 70 50       194 if ($reconnect_on_error) {
111             return sub {
112             # The unit should be second and the type should be double.
113             # -1 is a special value, it means that we do not reconnect.
114 0     0   0 my $next_reconnect_interval = $reconnect_on_error->(@_);
115 0 0       0 if ($next_reconnect_interval < -1) {
116 0         0 warn "reconnect_on_error must not return a number less than -1";
117              
118             # Reset a next_reconnect_interval and do not reconnect.
119 0         0 $next_reconnect_interval = -1;
120             }
121              
122             # Wait until next_reconnect_interval seconds elapse.
123 0         0 $self->__set_next_reconnect_on_error_at($next_reconnect_interval);
124              
125 0         0 my $need_reconnect = 0;
126 0 0       0 if (-1 < $next_reconnect_interval) {
127 0         0 $need_reconnect = 1;
128             }
129 0         0 return $need_reconnect;
130 0         0 };
131             } else {
132 70         366 return;
133             }
134             }
135              
136             sub new {
137 70     70 0 510818 my $class = shift;
138 70         1891 my %args = @_;
139 70         7085 my $self = $class->_new;
140              
141             ## Deal with REDIS_SERVER ENV
142 70 0 33     651 if ($ENV{REDIS_SERVER} && !$args{sock} && !$args{server}) {
      33        
143 0 0       0 if ($ENV{REDIS_SERVER} =~ m!^/!) {
    0          
    0          
144 0         0 $args{sock} = $ENV{REDIS_SERVER};
145             }
146             elsif ($ENV{REDIS_SERVER} =~ m!^unix:(.+)!) {
147 0         0 $args{sock} = $1;
148             }
149             elsif ($ENV{REDIS_SERVER} =~ m!^(tcp:)?(.+)!) {
150 0         0 $args{server} = $2;
151             }
152             }
153              
154 70         280 my $on_conn = $args{on_connect};
155 70         431 my $password = $args{password};
156 70         268 my $name = $args{name};
157 70         1015 $self->__set_on_connect($self->_new_on_connect_cb($on_conn, $password, $name));
158             $self->__set_data({
159             subscribers => {},
160             sentinels_cnx_timeout => $args{sentinels_cnx_timeout},
161             sentinels_read_timeout => $args{sentinels_read_timeout},
162             sentinels_write_timeout => $args{sentinels_write_timeout},
163             no_sentinels_list_update => $args{no_sentinels_list_update},
164 70         1591 });
165              
166 70 100       314 if ($args{sock}) {
    50          
167 66         748 $self->__connection_info_unix($args{sock});
168             } elsif ($args{sentinels}) {
169 0         0 my $sentinels = $args{sentinels};
170 0 0       0 ref $sentinels eq 'ARRAY'
171             or croak("'sentinels' param must be an ArrayRef");
172             defined($self->__get_data->{service} = $args{service})
173 0 0       0 or croak("Need 'service' name when using 'sentinels'!");
174 0         0 $self->__get_data->{sentinels} = $sentinels;
175 0         0 $self->__get_data->{sentinels_password} = $args{sentinels_password};
176             my $on_build_sock = sub {
177 0     0   0 my $data = $self->__get_data;
178 0         0 my $sentinels = $data->{sentinels};
179              
180             # try to connect to a sentinel
181 0         0 my $status;
182 0         0 foreach my $sentinel_address (@$sentinels) {
183 0 0       0 my $sentinel = eval {
184             Redis::Fast::Sentinel->new(
185             server => $sentinel_address,
186             password => $data->{sentinels_password},
187             cnx_timeout => ( exists $data->{sentinels_cnx_timeout}
188             ? $data->{sentinels_cnx_timeout} : 0.1),
189             read_timeout => ( exists $data->{sentinels_read_timeout}
190             ? $data->{sentinels_read_timeout} : 1 ),
191             write_timeout => ( exists $data->{sentinels_write_timeout}
192 0 0       0 ? $data->{sentinels_write_timeout} : 1 ),
    0          
    0          
193             )
194             } or next;
195 0         0 my $server_address = $sentinel->get_service_address($data->{service});
196 0 0 0     0 defined $server_address
197             or $status ||= "Sentinels don't know this service",
198             next;
199 0 0       0 $server_address eq 'IDONTKNOW'
200             and $status = "service is configured in one Sentinel, but was never reached",
201             next;
202              
203             # we found the service, set the server
204 0         0 my ($server, $port) = _split_host_port $server_address;
205 0         0 $self->__connection_info($server, $port);
206              
207 0 0       0 if (! $data->{no_sentinels_list_update} ) {
208             # move the elected sentinel at the front of the list and add
209             # additional sentinels
210 0         0 my $idx = 2;
211 0         0 my %h = ( ( map { $_ => $idx++ } @{$data->{sentinels}}),
  0         0  
  0         0  
212             $sentinel_address => 1,
213             );
214              
215             $data->{sentinels} = [
216 0         0 ( sort { $h{$a} <=> $h{$b} } CORE::keys(%h) ), # sorted existing sentinels,
217 0         0 grep { ! $h{$_}; } # list of unknown
218             map {
219 0         0 my $s = +{ @$_ };
220 0         0 _join_host_port($s->{ip}, $s->{port});
221             } # ip:port of
222             $sentinel->sentinel( # sentinels
223             sentinels => $data->{service} # for this service
224             )
225 0         0 ];
226             }
227             }
228 0         0 };
229 0         0 $self->__set_on_build_sock($on_build_sock);
230             } else {
231 4   50     46 my ($server, $port) = _split_host_port($args{server} || '127.0.0.1:6379');
232 4         28 $self->__connection_info($server, $port);
233             }
234              
235             #$self->{is_subscriber} = 0;
236             #$self->{subscribers} = {};
237 70   100     507 $self->__set_reconnect($args{reconnect} || 0);
238 70   100     2144 $self->__set_every($args{every} || 1000);
239 70 50 33     939 $self->__set_debug(($args{debug} || $ENV{REDIS_DEBUG}) ? 1 : 0);
240 70   100     1207 $self->__set_cnx_timeout($args{cnx_timeout} || -1);
241 70   100     705 $self->__set_read_timeout($args{read_timeout} || -1);
242 70   100     377 $self->__set_write_timeout($args{write_timeout} || -1);
243              
244 70 50       838 if (my $cb = $self->_new_reconnect_on_error_cb($args{reconnect_on_error})) {
245 0         0 $self->__set_reconnect_on_error($cb);
246             }
247              
248 70 100       470457 $self->connect unless $args{no_auto_connect_on_new};
249              
250 69         3587 return $self;
251             }
252              
253              
254              
255             ### Deal with common, general case, Redis commands
256             our $AUTOLOAD;
257              
258             sub AUTOLOAD {
259 13     13   3700 my $command = $AUTOLOAD;
260 13         292 $command =~ s/.*://;
261 13         78 my @command = split /_/, uc $command;
262              
263             my $method = sub {
264 68     68   2126 my $self = shift;
265 68         825 $self->__is_valid_command($command);
266 68         1212116 my ($ret, $error) = $self->__std_cmd(@command, @_);
267 68 100       10209 confess "[$command] $error, " if defined $error;
268 54 100 66     2385 return (wantarray && ref $ret eq 'ARRAY') ? @$ret : $ret;
269 13         243 };
270              
271             # Save this method for future calls
272 38     38   433 no strict 'refs';
  38         84  
  38         58273  
273 13         385 *$AUTOLOAD = $method;
274              
275 13         77 goto $method;
276             }
277              
278             sub __with_reconnect {
279 0     0   0 my ($self, $cb) = @_;
280              
281 0         0 confess "not implemented";
282             }
283              
284              
285             ### Commands with extra logic
286              
287             sub keys {
288 0     0 0 0 my $self = shift;
289 0         0 $self->__is_valid_command('keys');
290 0         0 my ($ret, $error) = $self->__keys(@_);
291 0 0       0 confess "[keys] $error, " if defined $error;
292 0 0       0 return $ret unless ref $ret eq 'ARRAY';
293 0         0 return @$ret;
294             }
295              
296             sub ping {
297 0     0 0 0 my $self = shift;
298 0         0 $self->__is_valid_command('ping');
299 0 0       0 return unless $self->__sock;
300             return scalar try {
301 0     0   0 my ($ret, $error) = $self->__std_cmd('ping');
302 0 0       0 return if defined $error;
303 0         0 return $ret;
304             } catch {
305 0     0   0 return ;
306 0         0 };
307             }
308              
309             sub info {
310 3     3 0 132 my $self = shift;
311 3         9 $self->__is_valid_command('info');
312 3         16 my ($ret, $error) = $self->__info(@_);
313 3 50       392 confess "[info] $error, " if defined $error;
314 0 0       0 return $ret unless ref $ret eq 'ARRAY';
315 0         0 return @$ret;
316             }
317              
318             sub quit {
319 0     0 0 0 my $self = shift;
320 0         0 $self->__is_valid_command('quit');
321 0         0 $self->__quit(@_);
322             }
323              
324             sub shutdown {
325 0     0 0 0 my $self = shift;
326 0         0 $self->__is_valid_command('shutdown');
327 0         0 $self->__shutdown(@_);
328             }
329              
330             sub select {
331 0     0 0 0 my $self = shift;
332 0         0 my $database = shift;
333 0         0 $self->__is_valid_command('select');
334 0         0 my ($ret, $error) = $self->__std_cmd('SELECT', $database, @_);
335 0 0       0 confess "[SELECT] $error, " if defined $error;
336 0         0 $self->__get_data->{current_database} = $database;
337 0         0 return $ret;
338             }
339              
340             sub __subscription_cmd {
341 0     0   0 my $self = shift;
342 0         0 my $pr = shift;
343 0         0 my $unsub = shift;
344 0         0 my $command = shift;
345 0         0 my $cb = pop;
346 0         0 weaken $self;
347              
348 0 0       0 confess("Missing required callback in call to $command(), ")
349             unless ref($cb) eq 'CODE';
350              
351 0         0 $self->wait_all_responses;
352              
353 0         0 while($self->__get_data->{cbs}) {
354 0         0 $self->__wait_for_event(1);
355             }
356              
357 0         0 my @subs = @_;
358 0 0       0 @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
359             if $unsub;
360              
361 0 0       0 if(@subs) {
362 0         0 $self->__get_data->{cbs} = { map { ("${pr}message:$_" => $cb) } @subs };
  0         0  
363 0         0 for my $sub(@subs) {
364 0         0 $self->__send_subscription_cmd(
365             $command,
366             $sub,
367             $self->__subscription_callbak,
368             );
369             }
370 0         0 while($self->__get_data->{cbs}) {
371 0         0 $self->__wait_for_event(1);
372             }
373             }
374             }
375              
376             sub __subscription_callbak {
377 0     0   0 my $self = shift;
378 0         0 my $cb = $self->__get_data->{callback};
379 0 0       0 return $cb if $cb;
380              
381 0         0 weaken $self;
382             $cb = sub {
383 0     0   0 my $cbs = $self->__get_data->{cbs};
384 0 0       0 if($cbs) {
385 0         0 $self->__process_subscription_changes($cbs, @_);
386 0 0       0 unless(%$cbs) {
387 0         0 $self->__get_data->{cbs} = undef;
388             }
389             } else {
390 0         0 $self->__process_pubsub_msg(@_);
391             }
392 0         0 };
393              
394 0         0 $self->__get_data->{callback} = $cb;
395 0         0 return $cb;
396             }
397              
398 0     0 0 0 sub subscribe { shift->__subscription_cmd('', 0, subscribe => @_) }
399 0     0 0 0 sub psubscribe { shift->__subscription_cmd('p', 0, psubscribe => @_) }
400 0     0 0 0 sub unsubscribe { shift->__subscription_cmd('', 1, unsubscribe => @_) }
401 0     0 0 0 sub punsubscribe { shift->__subscription_cmd('p', 1, punsubscribe => @_) }
402              
403             sub __process_unsubscribe_requests {
404 0     0   0 my ($self, $cb, $pr, @unsubs) = @_;
405 0         0 my $subs = $self->__get_data->{subscribers};
406              
407 0         0 my @subs_to_unsubscribe;
408 0         0 for my $sub (@unsubs) {
409 0         0 my $key = "${pr}message:$sub";
410 0 0 0     0 next unless $subs->{$key} && @{ $subs->{$key} };
  0         0  
411 0         0 my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{ $subs->{$key} }];
  0         0  
  0         0  
412 0 0       0 next if @$cbs;
413              
414 0         0 delete $subs->{$key};
415 0         0 push @subs_to_unsubscribe, $sub;
416             }
417 0         0 return @subs_to_unsubscribe;
418             }
419              
420             sub __process_subscription_changes {
421 0     0   0 my ($self, $expected, $m, $error) = @_;
422 0         0 my $subs = $self->__get_data->{subscribers};
423              
424             ## Deal with pending PUBLISH'ed messages
425 0 0       0 if ($m->[0] =~ /^p?message$/) {
426 0         0 $self->__process_pubsub_msg($m);
427 0         0 return ;
428             }
429              
430 0         0 my ($key, $unsub) = $m->[0] =~ m/^(p)?(un)?subscribe$/;
431 0         0 $key .= "message:$m->[1]";
432 0         0 my $cb = delete $expected->{$key};
433              
434 0 0       0 push @{ $subs->{$key} }, $cb unless $unsub;
  0         0  
435             }
436              
437             sub __process_pubsub_msg {
438 0     0   0 my ($self, $m) = @_;
439 0         0 my $subs = $self->__get_data->{subscribers};
440              
441 0         0 my $sub = $m->[1];
442 0         0 my $cbid = "$m->[0]:$sub";
443 0         0 my $data = pop @$m;
444 0 0       0 my $topic = defined $m->[2] ? $m->[2] : $sub;
445              
446 0 0       0 if (!exists $subs->{$cbid}) {
447 0         0 warn "Message for topic '$topic' ($cbid) without expected callback, ";
448 0         0 return 0;
449             }
450              
451 0         0 $_->($data, $topic, $sub) for @{ $subs->{$cbid} };
  0         0  
452              
453 0         0 return 1;
454              
455             }
456              
457             sub __is_valid_command {
458 71     71   465 my ($self, $cmd) = @_;
459              
460 71 50       491 confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ")
461             if $self->is_subscriber;
462             }
463              
464             1; # End of Redis.pm
465              
466             __END__