File Coverage

blib/lib/MojoX/Redis.pm
Criterion Covered Total %
statement 21 130 16.1
branch 0 32 0.0
condition 0 20 0.0
subroutine 7 22 31.8
pod 3 5 60.0
total 31 209 14.8


line stmt bran cond sub pod time code
1             package MojoX::Redis;
2              
3 2     2   279086 use strict;
  2         5  
  2         60  
4 2     2   10 use warnings;
  2         4  
  2         92  
5              
6             our $VERSION = 0.86;
7 2     2   9 use base 'Mojo::Base';
  2         7  
  2         198  
8              
9 2     2   12 use Mojo::IOLoop;
  2         3  
  2         16  
10 2     2   52 use List::Util ();
  2         4  
  2         30  
11 2     2   10 use Scalar::Util ();
  2         2  
  2         40  
12 2     2   11 use Encode ();
  2         3  
  2         5022  
13             require Carp;
14              
15             __PACKAGE__->attr(server => '127.0.0.1:6379');
16             __PACKAGE__->attr(ioloop => sub { Mojo::IOLoop->singleton });
17             __PACKAGE__->attr(error => undef);
18             __PACKAGE__->attr(timeout => 300);
19             __PACKAGE__->attr(encoding => 'UTF-8');
20             __PACKAGE__->attr(
21             on_error => sub {
22             sub {
23             my $redis = shift;
24             warn "Redis error: ", $redis->error, "\n";
25             }
26             }
27             );
28              
29             __PACKAGE__->attr(
30             protocol_redis => sub {
31             require Protocol::Redis;
32             "Protocol::Redis";
33             }
34             );
35              
36             our @COMMANDS = qw/
37             append auth bgrewriteaof bgsave blpop brpop brpoplpush config_get config_set
38             config_resetstat dbsize debug_object debug_segfault decr decrby del discard
39             echo exec exists expire expireat flushall flushdb get getbit getrange getset
40             hdel hexists hget hgetall hincrby hkeys hlen hmget hmset hset hsetnx hvals
41             incr incrby info keys lastsave lindex linsert llen lpop lpush lpushx lrange
42             lrem lset ltrim mget monitor move mset msetnx multi persist ping psubscribe
43             publish punsubscribe quit randomkey rename renamenx rpop rpoplpush rpush
44             rpushx sadd save scard sdiff sdiffstore select set setbit setex setnx
45             setrange shutdown sinter sinterstore sismember slaveof smembers smove sort
46             spop srandmember srem strlen subscribe sunion sunionstore sync ttl type
47             unsubscribe unwatch watch zadd zcard zcount zincrby zinterstore zrange
48             zrangebyscore zrank zrem zremrangebyrank zremrangebyscore zrevrange
49             zrevrangebyscore zrevrank zscore zunionstore
50             /;
51              
52             sub AUTOLOAD {
53 0     0     my ($package, $cmd) = our $AUTOLOAD =~ /^([\w\:]+)\:\:(\w+)$/;
54              
55             Carp::croak(qq|Can't locate object method "$cmd" via "$package"|)
56 0 0   0     unless List::Util::first { $_ eq $cmd } @COMMANDS;
  0            
57              
58 0           my $self = shift;
59              
60 0           my $args = [@_];
61 0           my $cb = $args->[-1];
62 0 0         if (ref $cb ne 'CODE') {
63 0           $cb = undef;
64             }
65             else {
66 0           pop @$args;
67             }
68              
69 0           $self->execute($cmd, $args, $cb);
70             }
71              
72             sub DESTROY {
73 0     0     my $self = shift;
74              
75             # Loop
76 0 0         return unless my $loop = $self->ioloop;
77              
78             # Cleanup connection
79 0 0         $loop->remove($self->{_connection})
80             if $self->{_connection};
81             }
82              
83             sub connect {
84 0     0 1   my $self = shift;
85              
86             # drop old connection
87 0 0         if ($self->connected) {
88 0           $self->ioloop->remove($self->{_connection});
89             }
90              
91 0           $self->server =~ m{^([^:]+)(:(\d+))?};
92 0           my $address = $1;
93 0   0       my $port = $3 || 6379;
94              
95 0           Scalar::Util::weaken $self;
96              
97 0           $self->{_protocol} = $self->_create_protocol;
98              
99             # connect
100 0           $self->{_connecting} = 1;
101             $self->{_connection} = $self->ioloop->client(
102             { address => $address,
103             port => $port
104             },
105             sub {
106 0     0     my ($loop, $err, $stream) = @_;
107              
108              
109 0           delete $self->{_connecting};
110 0           $stream->timeout($self->timeout);
111 0           $self->_send_next_message;
112              
113             $stream->on(
114             read => sub {
115 0           my ($stream, $chunk) = @_;
116 0           $self->{_protocol}->parse($chunk);
117             }
118 0           );
119             $stream->on(
120             close => sub {
121 0           my $str = shift;
122 0   0       $self->{error} ||= 'disconnected';
123 0           $self->_inform_queue;
124              
125 0           delete $self->{_message_queue};
126              
127 0           delete $self->{_connecting};
128 0           delete $self->{_connection};
129             }
130 0           );
131             $stream->on(
132             error => sub {
133 0           my ($str, $error) = @_;
134 0           $self->error($error);
135 0           $self->_inform_queue;
136              
137 0           $self->on_error->($self);
138 0           $self->ioloop->remove($self->{_connection});
139             }
140 0           );
141              
142             }
143 0           );
144              
145 0           return $self;
146             }
147              
148             sub connected {
149 0     0 0   my $self = shift;
150              
151 0           return $self->{_connection};
152             }
153              
154             sub execute {
155 0     0 1   my ($self, $command, $args, $cb) = @_;
156              
157 0 0 0       if (!$cb && ref $args eq 'CODE') {
    0          
158 0           $cb = $args;
159 0           $args = [];
160             }
161             elsif (!ref $args) {
162 0           $args = [$args];
163             }
164              
165 0           unshift @$args, uc $command;
166              
167 0   0       my $mqueue = $self->{_message_queue} ||= [];
168 0   0       my $cqueue = $self->{_cb_queue} ||= [];
169              
170              
171 0           push @$mqueue, $args;
172 0           push @$cqueue, $cb;
173              
174 0 0         $self->connect unless $self->{_connection};
175 0           $self->_send_next_message;
176              
177 0           return $self;
178             }
179              
180             sub start {
181 0     0 1   my ($self) = @_;
182              
183 0           $self->ioloop->start;
184 0           return $self;
185             }
186              
187             sub stop {
188 0     0 0   my ($self) = @_;
189              
190 0           $self->ioloop->stop;
191 0           return $self;
192             }
193              
194             sub _create_protocol {
195 0     0     my $self = shift;
196              
197 0           my $protocol = $self->protocol_redis->new(api => 1);
198             $protocol->on_message(
199             sub {
200 0     0     my ($parser, $command) = @_;
201 0           $self->_return_command_data($command);
202             }
203 0           );
204              
205 0 0         Carp::croak(q/Protocol::Redis implementation doesn't support APIv1/)
206             unless $protocol;
207              
208 0           $protocol;
209             }
210              
211             sub _send_next_message {
212 0     0     my ($self) = @_;
213              
214 0 0 0       if ((my $id = $self->{_connection}) && !$self->{_connecting}) {
215 0           while (my $args = shift @{$self->{_message_queue}}) {
  0            
216 0           my $cmd_arg = [];
217 0           my $cmd = {type => '*', data => $cmd_arg};
218 0           foreach my $token (@$args) {
219 0 0         $token = Encode::encode($self->encoding, $token)
220             if $self->encoding;
221 0           push @$cmd_arg, {type => '$', data => $token};
222             }
223 0           my $message = $self->{_protocol}->encode($cmd);
224              
225 0           $self->ioloop->stream($id)->write($message);
226             }
227             }
228             }
229              
230              
231             sub _reencode_message {
232 0     0     my ($self, $message) = @_;
233              
234 0           my ($type, $data) = @{$message}{'type', 'data'};
  0            
235              
236             # Decode data
237 0 0 0       if ($type ne '*' && $self->encoding && $data) {
      0        
238 0           $data = Encode::decode($self->encoding, $data);
239             }
240              
241 0 0         if ($type eq '-') {
    0          
242 0           $self->error($data);
243 0           $self->on_error->($self);
244 0           return;
245             }
246             elsif ($type ne '*') {
247 0           return [$data];
248             }
249             else {
250 0           my $reencoded_data = [];
251 0           foreach my $item (@$data) {
252 0           my $message = $self->_reencode_message($item);
253 0           push @$reencoded_data, $message;
254             }
255 0           return $reencoded_data;
256             }
257             }
258              
259             sub _return_command_data {
260 0     0     my ($self, $message) = @_;
261              
262 0           my $data = $self->_reencode_message($message);
263              
264 0           my $cb = shift @{$self->{_cb_queue}};
  0            
265 0 0         $cb->($self, $data) if $cb;
266              
267             # Reset error after callback dispatching
268 0           $self->error(undef);
269             }
270              
271              
272             sub _inform_queue {
273 0     0     my ($self) = @_;
274              
275 0           for my $cb (@{$self->{_cb_queue}}) {
  0            
276 0 0         $cb->($self) if $cb;
277             }
278 0           $self->{_queue} = [];
279             }
280              
281             1;
282             __END__