File Coverage

blib/lib/AnyEvent/Redis.pm
Criterion Covered Total %
statement 30 204 14.7
branch 0 104 0.0
condition 0 64 0.0
subroutine 10 19 52.6
pod 0 5 0.0
total 40 396 10.1


line stmt bran cond sub pod time code
1             package AnyEvent::Redis;
2              
3 8     8   955948 use strict;
  8         21  
  8         284  
4 8     8   217 use 5.008_001;
  8         26  
  8         502  
5             our $VERSION = '0.24';
6              
7 8     8   43 use constant DEBUG => $ENV{ANYEVENT_REDIS_DEBUG};
  8         172  
  8         549  
8 8     8   13843 use AnyEvent;
  8         65758  
  8         371  
9 8     8   12070 use AnyEvent::Handle;
  8         150931  
  8         342  
10 8     8   10122 use AnyEvent::Socket;
  8         160501  
  8         1447  
11 8     8   6116 use AnyEvent::Redis::Protocol;
  8         24  
  8         361  
12 8     8   55 use Carp qw( croak confess );
  8         14  
  8         498  
13 8     8   8695 use Encode ();
  8         110899  
  8         283  
14 8     8   80 use Scalar::Util qw(weaken);
  8         19  
  8         22639  
15              
16             our $AUTOLOAD;
17              
18             sub new {
19 0     0 0   my ($class, %args) = @_;
20              
21 0   0       my $host = delete $args{host} || '127.0.0.1';
22 0   0       my $port = delete $args{port} || 6379;
23              
24 0 0         if (my $encoding = $args{encoding}) {
25 0           $args{encoding} = Encode::find_encoding($encoding);
26 0 0         croak qq{Encoding "$encoding" not found} unless ref $args{encoding};
27             }
28              
29             bless {
30 0           host => $host,
31             port => $port,
32             pending_cvs => [],
33             %args,
34             }, $class;
35             }
36              
37             sub run_cmd {
38 0     0 0   my $self = shift;
39 0           my $cmd = shift;
40              
41 0 0         $self->{cmd_cb} or return $self->connect($cmd, @_);
42 0           $self->{cmd_cb}->($cmd, @_);
43             }
44              
45 0     0     sub DESTROY { }
46              
47             sub AUTOLOAD {
48 0     0     my $self = shift;
49 0           (my $method = $AUTOLOAD) =~ s/.*:://;
50 0           $self->run_cmd($method, @_);
51             }
52              
53             sub all_cv {
54 0     0 0   my $self = shift;
55 0 0         $self->{all_cv} = shift if @_;
56 0   0       $self->{all_cv} ||= AE::cv;
57             }
58              
59             sub cleanup {
60 0     0 0   my $self = shift;
61 0           delete $self->{cmd_cb};
62 0           delete $self->{sock};
63 0 0         $self->{on_error}->(@_) if $self->{on_error};
64 0 0         $self->{on_cleanup}->(@_) if $self->{on_cleanup};
65 0 0         for (splice(@{$self->{pending_cvs}}),
  0            
  0            
66             splice(@{$self->{multi_cvs} || []}))
67             {
68 0           eval { $_->croak(@_) };
  0            
69 0 0         warn "Exception in cleanup callback (ignored): $@" if $@;
70             }
71 0           return;
72             }
73              
74             sub connect {
75 0     0 0   my $self = shift;
76              
77 0           my $cv;
78 0 0         if (@_) {
79 0 0         $cv = pop if UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
80 0   0       $cv ||= AE::cv;
81 0           push @{$self->{connect_queue}}, [ $cv, @_ ];
  0            
82             }
83              
84 0 0         return $cv if $self->{sock};
85 0           weaken $self;
86              
87             $self->{sock} = tcp_connect $self->{host}, $self->{port}, sub {
88             my $fh = shift
89 0 0   0     or do {
90 0           my $err = "Can't connect Redis server: $!";
91 0           $self->cleanup($err);
92 0           eval { $cv->croak($err) };
  0            
93 0 0         warn "Exception in connect failure callback (ignored): $@" if $@;
94             return
95 0           };
96              
97 0           binmode $fh; # ensure bytes until we decode
98              
99             my $hd = AnyEvent::Handle->new(
100             fh => $fh,
101 0           on_error => sub { $_[0]->destroy;
102 0 0         $self->cleanup($_[2]) if $_[1];
103             },
104 0           on_eof => sub { $_[0]->destroy;
105 0           $self->cleanup('connection closed');
106             },
107 0           encoding => $self->{encoding},
108             );
109              
110             $self->{cmd_cb} = sub {
111 0           my $command = lc shift;
112 0           my $is_pubsub = $command =~ /^p?(?:un)?subscribe\z/;
113 0           my $is_subscribe = $command =~ /^p?subscribe\z/;
114              
115              
116             # Are we already subscribed to anything?
117 0 0 0       if ($self->{sub} && %{$self->{sub}}) {
  0            
118 0 0         croak "Use of non-pubsub command during pubsub session may result in unexpected behaviour"
119             unless $is_pubsub;
120             }
121             # Are we already in a transaction?
122 0 0         if ($self->{multi_write}) {
123 0 0 0       croak "Use of pubsub or multi command in transaction is not supported"
124             if $is_pubsub || $command eq 'multi';
125             } else {
126 0 0         croak "Can't 'exec' a transaction because none is pending"
127             if $command eq 'exec';
128             }
129              
130 0           my ($cv, $cb);
131 0 0         if (@_) {
132 0 0 0       $cv = pop if ref $_[-1] && UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
133 0 0         $cb = pop if ref $_[-1] eq 'CODE';
134             }
135 0   0       $cv ||= AE::cv;
136 0 0 0       croak "Must provide a CODE reference for subscriptions" if $is_subscribe && !$cb;
137              
138 0           my $send = join("\r\n",
139             "*" . (1 + @_),
140 0 0 0       map { ('$' . length $_ => $_) }
141 0           (uc($command), map { $self->{encoding} && length($_)
142             ? $self->{encoding}->encode($_)
143             : $_ } @_))
144             . "\r\n";
145              
146 0           warn $send if DEBUG;
147              
148             # $self is weakened to avoid leaks, hold on to a strong copy
149             # controlled via a CV.
150 0           my $cmd_cv = AE::cv;
151             $cmd_cv->cb(sub {
152 0           my $strong_self = $self;
153 0           });
154              
155             # pubsub is very different - get it out of the way first
156              
157 0 0         if ($is_pubsub) {
158              
159 0           $hd->push_write($send);
160              
161 0   0       my $already = $self->{sub} && %{$self->{sub}};
162              
163 0 0         if ($is_subscribe) {
164 0   0       $self->{sub}->{$_} ||= [$cv, $cb] for @_;
165             }
166              
167 0 0 0       if (!$already && @_) {
168 0           my $res_cb; $res_cb = sub {
169             $hd->push_read("AnyEvent::Redis::Protocol" => sub {
170 0           my ($res, $err) = @_;
171              
172 0 0         if (ref $res) {
173 0           my $action = lc $res->[0];
174 0           warn "$action $res->[1]" if DEBUG;
175              
176 0 0 0       if ($action eq 'message') {
    0 0        
    0          
    0          
177 0           $self->{sub}->{$res->[1]}[1]->($res->[2], $res->[1]);
178              
179             } elsif ($action eq 'pmessage') {
180 0           $self->{sub}->{$res->[1]}[1]->($res->[3], $res->[2], $res->[1]);
181              
182             } elsif ($action eq 'subscribe' || $action eq 'psubscribe') {
183 0           $self->{sub_count} = $res->[2];
184              
185             } elsif ($action eq 'unsubscribe' || $action eq 'punsubscribe') {
186 0           $self->{sub_count} = $res->[2];
187 0           eval { $self->{sub}->{$res->[1]}[0]->send };
  0            
188 0 0         warn "Exception in callback (ignored): $@" if $@;
189 0           delete $self->{sub}->{$res->[1]};
190 0           $self->all_cv->end;
191 0           $cmd_cv->send;
192              
193             } else {
194 0           warn "Unknown pubsub action: $action";
195             }
196             }
197              
198 0 0 0       if ($self->{sub_count} || %{$self->{sub}}) {
  0            
199             # Carry on reading while we are subscribed
200 0           $res_cb->();
201             }
202 0           });
203 0           };
204              
205 0           $res_cb->();
206             }
207              
208 0           return $cv;
209             }
210              
211             # non-pubsub from here on out
212              
213             $cv->cb(sub {
214 0           my $cv = shift;
215 0           local $@;
216 0           eval {
217 0           my $res = $cv->recv;
218 0           $cb->($res);
219             };
220 0 0         if ($@) {
221 0   0       ($self->{on_error} || sub { die @_ })->(my $err = $@);
222             }
223 0 0         }) if $cb;
224              
225 0           $self->all_cv->begin;
226 0           push @{$self->{pending_cvs}}, $cv;
  0            
227              
228 0           $hd->push_write($send);
229              
230 0 0         if ($command eq 'exec') {
    0          
231              
232             # at end of transaction, expect bulk reply possibly including errors
233             $hd->push_read("AnyEvent::Redis::Protocol" => sub {
234 0           my ($res, $err) = @_;
235              
236 0           $self->_expect($cv);
237              
238 0           my @mcvs = splice @{$self->{multi_cvs}};
  0            
239              
240 0 0 0       if ($err || ref($res) ne 'ARRAY') {
241 0           for ($cv, @mcvs) {
242 0           eval { $_->croak($res, 1) };
  0            
243 0 0         warn "Exception in callback (ignored): $@" if $@;
244             }
245             } else {
246 0           for my $i (0 .. $#mcvs) {
247 0           my $r = $res->[$i];
248 0           eval {
249 0 0 0       ref($r) && UNIVERSAL::isa($r, 'AnyEvent::Redis::Error')
250             ? $mcvs[$i]->croak($$r)
251             : $mcvs[$i]->send($r);
252             };
253 0 0         warn "Exception in callback (ignored): $@" if $@;
254             }
255 0           eval { $cv->send($res) };
  0            
256 0 0         warn "Exception in callback (ignored): $@" if $@;
257             }
258              
259 0           $self->all_cv->end;
260 0           $cmd_cv->send;
261 0           });
262              
263 0           delete $self->{multi_write};
264              
265             } elsif ($self->{multi_write}) {
266              
267             # in transaction, expect only "QUEUED"
268             $hd->push_read("AnyEvent::Redis::Protocol" => sub {
269 0           my ($res, $err) = @_;
270              
271 0           $self->_expect($cv);
272              
273 0 0 0       if (!$err && $res eq 'QUEUED') {
274 0           push @{$self->{multi_cvs}}, $cv;
  0            
275             }
276             else {
277 0           eval { $cv->croak($res) };
  0            
278 0 0         warn "Exception in callback (ignored): $@" if $@;
279             }
280              
281 0           $self->all_cv->end;
282 0           $cmd_cv->send;
283 0           });
284              
285             } else {
286              
287             $hd->push_read("AnyEvent::Redis::Protocol" => sub {
288 0           my ($res, $err) = @_;
289              
290 0           $self->_expect($cv);
291              
292 0 0 0       if ($command eq 'info') {
    0          
293 0           $res = { map { split /:/, $_, 2 } grep !/^#/, split /\r\n/, $res };
  0            
294             } elsif ($command eq 'keys' && !ref $res) {
295             # Older versions of Redis (1.2) need this
296 0           $res = [split / /, $res];
297             }
298              
299 0 0         eval { $err ? $cv->croak($res) : $cv->send($res) };
  0            
300 0 0         warn "Exception in callback (ignored): $@" if $@;
301              
302 0           $self->all_cv->end;
303 0           $cmd_cv->send;
304 0           });
305              
306 0 0         $self->{multi_write} = 1 if $command eq 'multi';
307              
308             }
309              
310 0           return $cv;
311 0           };
312              
313 0   0       my $queue = delete $self->{connect_queue} || [];
314 0           for my $command (@$queue) {
315 0           my($cv, @args) = @$command;
316 0           $self->{cmd_cb}->(@args, $cv);
317             }
318              
319 0           };
320              
321 0           return $cv;
322             }
323              
324             sub _expect {
325 0     0     my ($self, $cv) = @_;
326 0 0         my $p = shift @{$self->{pending_cvs} || []};
  0            
327 0 0 0       $p && $p == $cv or confess "BUG: mismatched CVs";
328             }
329              
330             1;
331             __END__