File Coverage

blib/lib/AnyEvent/Redis.pm
Criterion Covered Total %
statement 24 173 13.8
branch 0 88 0.0
condition 0 31 0.0
subroutine 8 18 44.4
pod 0 6 0.0
total 32 316 10.1


line stmt bran cond sub pod time code
1             package AnyEvent::Redis;
2              
3 4     4   313067 use strict;
  4         8  
  4         150  
4 4     4   92 use 5.008_001;
  4         11  
  4         234  
5             our $VERSION = '0.19_01';
6              
7 4     4   21 use constant DEBUG => $ENV{ANYEVENT_REDIS_DEBUG};
  4         11  
  4         304  
8 4     4   5393 use AnyEvent;
  4         21506  
  4         153  
9 4     4   4036 use AnyEvent::Handle;
  4         64798  
  4         170  
10 4     4   3059 use AnyEvent::Socket;
  4         64649  
  4         626  
11 4     4   2788 use Try::Tiny;
  4         6615  
  4         281  
12 4     4   28 use Carp qw(croak);
  4         7  
  4         9733  
13              
14             our $AUTOLOAD;
15              
16             sub new {
17 0     0 0   my($class, %args) = @_;
18              
19 0   0       my $host = delete $args{host} || '127.0.0.1';
20 0   0       my $port = delete $args{port} || 6379;
21              
22 0           bless {
23             host => $host,
24             port => $port,
25             %args,
26             }, $class;
27             }
28              
29             sub run_cmd {
30 0     0 0   my $self = shift;
31 0           my $cmd = shift;
32              
33 0 0         $self->{cmd_cb} or return $self->connect($cmd, @_);
34 0           $self->{cmd_cb}->($cmd, @_);
35             }
36              
37 0     0     sub DESTROY { }
38              
39             sub AUTOLOAD {
40 0     0     my $self = shift;
41 0           (my $method = $AUTOLOAD) =~ s/.*:://;
42 0           $self->run_cmd($method, @_);
43             }
44              
45             sub all_cv {
46 0     0 0   my $self = shift;
47 0 0         $self->{all_cv} = shift if @_;
48 0 0         unless ($self->{all_cv}) {
49 0           $self->{all_cv} = AE::cv;
50             }
51 0           $self->{all_cv};
52             }
53              
54             sub cleanup {
55 0     0 0   my $self = shift;
56 0           delete $self->{cmd_cb};
57 0           delete $self->{sock};
58 0           $self->{on_error}->(@_);
59             }
60              
61             sub connect {
62 0     0 0   my $self = shift;
63              
64 0           my $cv;
65 0 0         if (@_) {
66 0           $cv = AE::cv;
67 0           push @{$self->{connect_queue}}, [ $cv, @_ ];
  0            
68             }
69              
70 0 0         return $cv if $self->{sock};
71              
72             $self->{sock} = tcp_connect $self->{host}, $self->{port}, sub {
73             my $fh = shift
74 0 0   0     or do {
75 0           $cv->croak("Can't connect Redis server: $!");
76             return
77 0           };
78              
79             my $hd = AnyEvent::Handle->new(
80             fh => $fh,
81 0           on_error => sub { $_[0]->destroy;
82 0 0         if ($_[1]) {
83 0           $self->cleanup($_[2]);
84             }
85             },
86 0           on_eof => sub { $_[0]->destroy;
87 0           $self->cleanup('connection closed');
88             },
89 0           );
90              
91             $self->{cmd_cb} = sub {
92 0           $self->all_cv->begin;
93 0           my $command = shift;
94              
95 0           my($cv, $cb);
96 0 0         if (@_) {
97 0 0         $cv = pop if UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
98 0 0         $cb = pop if ref $_[-1] eq 'CODE';
99             }
100              
101 0           my $send = join("\r\n",
102             "*" . (1 + @_),
103             map(('$' . length $_ => $_), uc($command), @_))
104             . "\r\n";
105              
106 0           warn $send if DEBUG;
107              
108 0   0       $cv ||= AE::cv;
109              
110 0           $hd->push_write($send);
111              
112             # Are we already subscribed to anything?
113 0 0 0       if($self->{sub} && %{$self->{sub}}) {
  0 0          
114              
115 0 0         croak "Use of non-pubsub command during pubsub session may result in unexpected behaviour"
116             unless $command =~ /^p?(?:un)?subscribe$/i;
117              
118             # Remember subscriptions
119 0   0       $self->{sub}->{$_} ||= [$cv, $cb] for @_;
120              
121             } elsif ($command !~ /^p?subscribe$/i) {
122              
123             $cv->cb(sub {
124 0           my $cv = shift;
125             try {
126 0           my $res = $cv->recv;
127 0           $cb->($res);
128             } catch {
129 0   0       ($self->{on_error} || sub { die @_ })->($_);
130             }
131 0 0         }) if $cb;
  0            
132              
133             $hd->push_read(ref $self => sub {
134 0           my($res, $err) = @_;
135              
136 0 0 0       if($command eq 'info') {
    0          
137 0           $res = { map { split /:/, $_, 2 } split /\r\n/, $res };
  0            
138             } elsif($command eq 'keys' && !ref $res) {
139             # Older versions of Redis (1.2) need this
140 0           $res = [split / /, $res];
141             }
142              
143 0           $self->all_cv->end;
144 0 0         $err ? $cv->croak($res) : $cv->send($res);
145 0           });
146              
147             } else {
148 0 0         croak "Must provide a CODE reference for subscriptions" unless $cb;
149              
150             # Remember subscriptions
151 0   0       $self->{sub}->{$_} ||= [$cv, $cb] for @_;
152              
153 0           my $res_cb; $res_cb = sub {
154              
155             $hd->push_read(ref $self => sub {
156 0           my($res, $err) = @_;
157              
158 0 0         if(ref $res) {
159 0           my $action = lc $res->[0];
160 0           warn "$action $res->[1]" if DEBUG;
161              
162 0 0 0       if($action eq 'message') {
    0 0        
    0          
    0          
163 0           $self->{sub}->{$res->[1]}[1]->($res->[2], $res->[1]);
164              
165             } elsif($action eq 'pmessage') {
166 0           $self->{sub}->{$res->[1]}[1]->($res->[3], $res->[2], $res->[1]);
167              
168             } elsif($action eq 'subscribe' || $action eq 'psubscribe') {
169 0           $self->{sub_count} = $res->[2];
170              
171             } elsif($action eq 'unsubscribe' || $action eq 'punsubscribe') {
172 0           $self->{sub_count} = $res->[2];
173 0           $self->{sub}->{$res->[1]}[0]->send;
174 0           delete $self->{sub}->{$res->[1]};
175 0           $self->all_cv->end;
176              
177             } else {
178 0           warn "Unknown pubsub action: $action";
179             }
180             }
181              
182 0 0 0       if($self->{sub_count} || %{$self->{sub}}) {
  0            
183             # Carry on reading while we are subscribed
184 0           $res_cb->();
185             }
186 0           });
187 0           };
188              
189 0           $res_cb->();
190             }
191              
192 0           return $cv;
193 0           };
194              
195 0 0         for my $queue (@{$self->{connect_queue} || []}) {
  0            
196 0           my($cv, @args) = @$queue;
197 0           $self->{cmd_cb}->(@args, $cv);
198             }
199              
200 0           };
201              
202 0           return $cv;
203             }
204              
205             sub anyevent_read_type {
206 0     0 0   my(undef, $cb) = @_;
207              
208             sub {
209 0     0     my($hd) = @_;
210              
211 0 0         return unless defined $hd->{rbuf};
212              
213 0 0         if($hd->{rbuf} =~ /^[-+:]/) {
    0          
    0          
    0          
214 0 0         $hd->{rbuf} =~ s/^([-+:])([^\015\012]*)\015?\012// or return;
215              
216 0           $cb->($2, $1 eq '-');
217              
218 0           return 1;
219              
220             } elsif($hd->{rbuf} =~ /^\$/) {
221 0 0         $hd->{rbuf} =~ s/^\$([-0-9]+)\015?\012// or return;
222 0           my $len = $1;
223              
224 0 0         if($len < 0) {
    0          
225 0           $cb->(undef);
226             } elsif($len + 2 <= length $hd->{rbuf}) {
227 0           $cb->(substr $hd->{rbuf}, 0, $len);
228             # Remove ending newline
229 0           substr $hd->{rbuf}, 0, $len + 2, "";
230             } else {
231             $hd->unshift_read (chunk => $len + 2, sub {
232 0           $cb->(substr $_[1], 0, $len);
233 0           });
234             }
235              
236 0           return 1;
237              
238             } elsif($hd->{rbuf} =~ /^\*/) {
239              
240 0 0         $hd->{rbuf} =~ s/^\*([-0-9]+)\015?\012// or return;
241 0           my $size = $1;
242 0           my @lines;
243              
244             my $reader; $reader = sub {
245 0           my($hd) = @_;
246              
247 0           while(@lines < $size) {
248 0 0         if($hd->{rbuf} =~ /^([\$\-+:])([^\012\015]+)\015?\012/) {
    0          
249 0           my $type = $1;
250 0           my $line = $2;
251              
252 0 0         if($type =~ /[-+:]/) {
    0          
    0          
253 0           $hd->{rbuf} =~ s/^[^\012\015]+\015?\012//;
254 0           push @lines, $line;
255             } elsif($line < 0) {
256 0           $hd->{rbuf} =~ s/^[^\012\015]+\015?\012//;
257 0           push @lines, undef;
258              
259             } elsif(2 + $line <= length $hd->{rbuf}) {
260 0           $hd->{rbuf} =~ s/^[^\012\015]+\015?\012//;
261 0           push @lines, substr $hd->{rbuf}, 0, $line, "";
262 0           $hd->{rbuf} =~ s/^\015?\012//;
263              
264             } else {
265             # Data not buffered, so we need to do this async
266 0           $hd->unshift_read($reader);
267 0           return 1;
268             }
269             } elsif($hd->{rbuf} =~ /^\*/) { # Nested
270              
271             $hd->unshift_read(__PACKAGE__, sub {
272 0           push @lines, $_[0];
273              
274 0 0         if(@lines == $size) {
275 0           $cb->(\@lines);
276             } else {
277 0           $hd->unshift_read($reader);
278             }
279 0           return 1;
280 0           });
281 0           return 1;
282             } else {
283 0           $hd->unshift_read($reader);
284             }
285             }
286              
287 0 0 0       if($size < 0 || @lines == $size) {
288 0 0         $cb->($size < 0 ? undef : \@lines);
289 0           return 1;
290             }
291              
292 0           return;
293 0           };
294              
295 0           return $reader->($hd);
296              
297             } elsif(length $hd->{rbuf}) {
298             # remove extra lines
299 0           $hd->{rbuf} =~ s/^\015?\012//g;
300              
301 0 0         if(length $hd->{rbuf}) {
302             # Unknown
303 0           $cb->("Unknown type", 1);
304 0           return 1;
305             }
306             }
307              
308 0           return;
309             }
310 0           }
311              
312             1;
313             __END__