File Coverage

blib/lib/Mojo/Redis/Connection.pm
Criterion Covered Total %
statement 170 178 95.5
branch 55 80 68.7
condition 29 45 64.4
subroutine 26 28 92.8
pod 4 4 100.0
total 284 335 84.7


line stmt bran cond sub pod time code
1             package Mojo::Redis::Connection;
2 18     18   118 use Mojo::Base 'Mojo::EventEmitter';
  18         39  
  18         101  
3              
4 18     18   11225 use File::Spec::Functions 'file_name_is_absolute';
  18         14611  
  18         1123  
5 18     18   8100 use Mojo::IOLoop;
  18         2724600  
  18         102  
6 18     18   899 use Mojo::Promise;
  18         40  
  18         102  
7              
8 18     18   615 use constant DEBUG => $ENV{MOJO_REDIS_DEBUG};
  18         34  
  18         1243  
9 18   50 18   95 use constant CONNECT_TIMEOUT => $ENV{MOJO_REDIS_CONNECT_TIMEOUT} || 10;
  18         40  
  18         968  
10 18   50 18   288 use constant SENTINELS_CONNECT_TIMEOUT => $ENV{MOJO_REDIS_SENTINELS_CONNECT_TIMEOUT} || CONNECT_TIMEOUT;
  18         31  
  18         49349  
11              
12             has encoding => sub { Carp::confess('encoding is required in constructor') };
13             has ioloop => sub { Carp::confess('ioloop is required in constructor') };
14             has protocol => sub { Carp::confess('protocol is required in constructor') };
15             has url => sub { Carp::confess('url is required in constructor') };
16              
17             sub DESTROY {
18 16     16   3615 my $self = shift;
19 16 100 66     434 $self->disconnect if defined $self->{pid} and $self->{pid} == $$;
20             }
21              
22             sub disconnect {
23 5     5 1 10 my $self = shift;
24 5         16 $self->_reject_queue;
25 5 100       27 $self->{stream}->close if $self->{stream};
26 5         246 $self->{gone_away} = 1;
27 5         225 return $self;
28             }
29              
30 25 100 100 25 1 147 sub is_connected { $_[0]->{stream} && !$_[0]->{gone_away} ? 1 : 0 }
31              
32             sub write {
33 1     1 1 3 my $self = shift;
34 1         2 push @{$self->{write}}, [$self->_encode(@_)];
  1         5  
35 1 50       47 $self->is_connected ? $self->_write : $self->_connect;
36 1         3 return $self;
37             }
38              
39             sub write_p {
40 8     8 1 13 my $self = shift;
41 8         32 my $p = Mojo::Promise->new->ioloop($self->ioloop);
42 8         306 push @{$self->{write}}, [$self->_encode(@_), $p];
  8         37  
43 8 100       315 $self->is_connected ? $self->_write : $self->_connect;
44 8         31 return $p;
45             }
46              
47             sub _connect {
48 11     11   21 my $self = shift;
49 11 50       30 return $self if $self->{id}; # Connecting
50              
51             # Cannot reuse a connection because of transaction state and other state
52 11 100       34 return $self->_reject_queue('Redis server has gone away') if $self->{gone_away};
53              
54 9   66     48 my $url = $self->{master_url} || $self->url;
55 9 100 100     141 return $self->_discover_master if !$self->{master_url} and $url->query->param('sentinel');
56              
57 8         462 Scalar::Util::weaken($self);
58 8         13 delete $self->{master_url}; # Make sure we forget master_url so we can reconnect
59 8         28 $self->protocol->on_message($self->_parse_message_cb);
60             $self->{id} = $self->ioloop->client(
61             $self->_connect_args($url, {port => 6379, timeout => CONNECT_TIMEOUT}),
62             sub {
63 6 50   6   10949 return unless $self;
64 6         18 my ($loop, $err, $stream) = @_;
65 6         19 my $close_cb = $self->_on_close_cb;
66 6 100       21 return $self->$close_cb($err) if $err;
67              
68 5         17 $stream->timeout(0);
69 5         114 $stream->on(close => $close_cb);
70 5         35 $stream->on(error => $close_cb);
71 5         31 $stream->on(read => $self->_on_read_cb);
72              
73 5 100       38 unshift @{$self->{write}}, [$self->_encode(SELECT => $url->path->[0])] if length $url->path->[0];
  2         239  
74 5 100       158 unshift @{$self->{write}}, [$self->_encode(AUTH => $url->password)] if length $url->password;
  2         61  
75 5         102 $self->{pid} = $$;
76 5         15 $self->{stream} = $stream;
77 5         95 $self->emit('connect');
78 5         130 $self->_write;
79             }
80 8         52 );
81              
82 8         1618 warn "[@{[$self->_id]}] CONNECTING $url (blocking=@{[$self->_is_blocking]})\n" if DEBUG;
83 8         23 return $self;
84             }
85              
86             sub _connect_args {
87 10     10   89 my ($self, $url, $defaults) = @_;
88 10   50     30 my %args = (address => $url->host || 'localhost');
89              
90 10 100       91 if (file_name_is_absolute $args{address}) {
91 1         17 $args{path} = delete $args{address};
92             }
93             else {
94 9   66     92 $args{port} = $url->port || $defaults->{port};
95             }
96              
97 10   50     78 $args{timeout} = $defaults->{timeout} || CONNECT_TIMEOUT;
98 10         76 return \%args;
99             }
100              
101             sub _discover_master {
102 2     2   156 my $self = shift;
103 2         6 my $url = $self->url->clone;
104 2         240 my $sentinels = $url->query->every_param('sentinel');
105 2   50     58 my $timeout = $url->query->param('sentinel_connect_timeout') || SENTINELS_CONNECT_TIMEOUT;
106              
107 2         61 $url->host_port(shift @$sentinels);
108 2         89 $self->url->query->param(sentinel => [@$sentinels, $url->host_port]); # Round-robin sentinel list
109 2         170 $self->protocol->on_message($self->_parse_message_cb);
110             $self->{id} = $self->ioloop->client(
111             $self->_connect_args($url, {port => 16379, timeout => $timeout}),
112             sub {
113 2     2   3721 my ($loop, $err, $stream) = @_;
114 2 50       7 return unless $self;
115 2 50       5 return $self->_discover_master if $err;
116              
117 2         6 $stream->timeout(0);
118 2 50       49 $stream->on(close => sub { $self->_discover_master unless $self->{master_url} });
  1         83  
119 2         15 $stream->on(error => sub { $self->_discover_master });
  0         0  
120 2         14 $stream->on(read => $self->_on_read_cb);
121              
122 2         13 $self->{stream} = $stream;
123 2         13 my $p = Mojo::Promise->new;
124 2         50 unshift @{$self->{write}}, undef; # prevent _write() from writing commands
  2         5  
125 2         4 unshift @{$self->{write}}, [$self->_encode(SENTINEL => 'get-master-addr-by-name', $self->url->host), $p];
  2         7  
126 2 50       101 unshift @{$self->{write}}, [$self->_encode(AUTH => $url->password)] if length $url->password;
  2         44  
127              
128 2         77 $self->{write_lock} = 1;
129             $p->then(
130             sub {
131 2         385 my $host_port = shift;
132 2         13 delete $self->{id};
133 2         5 delete $self->{write_lock};
134 2 100 66     11 return $self->_discover_master unless ref $host_port and @$host_port == 2;
135 1         5 $self->{master_url} = $self->url->clone->host($host_port->[0])->port($host_port->[1]);
136 1         118 $self->{stream}->close;
137 1         58 $self->_connect;
138             },
139 0         0 sub { $self->_discover_master },
140 2         17 );
141              
142 2         129 $self->_write;
143             }
144 2         17 );
145              
146 2         435 warn "[@{[$self->_id]}] SENTINEL DISCOVERY $url (blocking=@{[$self->_is_blocking]})\n" if DEBUG;
147 2         7 return $self;
148             }
149              
150             sub _encode {
151 17     17   105 my $self = shift;
152 17         70 my $encoding = $self->encoding;
153             return $self->protocol->encode({
154 17 50       89 type => '*', data => [map { +{type => '$', data => $encoding ? Mojo::Util::encode($encoding, $_) : $_} } @_]
  34         281  
155             });
156             }
157              
158 0 0   0   0 sub _id { $_[0]->{id} || '0' }
159              
160 0 0   0   0 sub _is_blocking { shift->ioloop eq Mojo::IOLoop->singleton ? 0 : 1 }
161              
162             sub _on_close_cb {
163 6     6   12 my $self = shift;
164              
165 6         19 Scalar::Util::weaken($self);
166             return sub {
167 5 50   5   1779 return unless $self;
168 5         11 my ($stream, $err) = @_;
169 5         19 delete $self->{$_} for qw(id stream);
170 5         11 $self->{gone_away} = 1;
171 5         13 $self->_reject_queue($err);
172 5 100       26 $self->emit('close') if @_ == 1;
173 5 0 50     69 warn qq([@{[$self->_id]}] @{[$err ? "ERROR $err" : "CLOSED"]}\n) if $self and DEBUG;
  0 50       0  
  0         0  
174 6         30 };
175             }
176              
177             sub _on_read_cb {
178 7     7   15 my $self = shift;
179              
180 7         34 Scalar::Util::weaken($self);
181             return sub {
182 4 50   4   4318 return unless $self;
183 4         10 my ($stream, $chunk) = @_;
184 4         8 do { local $_ = $chunk; s!\r\n!\\r\\n!g; warn "[@{[$self->_id]}] >>> ($_)\n" } if DEBUG;
185 4         14 $self->protocol->parse($chunk);
186 7         62 };
187             }
188              
189             sub _parse_message_cb {
190 10     10   60 my $self = shift;
191              
192 10         35 Scalar::Util::weaken($self);
193             return sub {
194 8     8   452 my ($protocol, @messages) = @_;
195 8         21 my $encoding = $self->encoding;
196 8 100       66 $self->_write unless $self->{write_lock};
197              
198             my $unpack = sub {
199 10         13 my @res;
200              
201 10         29 while (my $m = shift @_) {
202 12 50 66     139 if ($m->{type} eq '-') {
    50 66        
    100 66        
    100          
203 0         0 return $m->{data}, undef;
204             }
205             elsif ($m->{type} eq ':') {
206 0         0 push @res, 0 + $m->{data};
207             }
208             elsif ($m->{type} eq '*' and ref $m->{data} eq 'ARRAY') {
209 2         4 my ($err, $res) = __SUB__->(@{$m->{data}});
  2         8  
210 2 50       6 return $err if defined $err;
211 2         6 push @res, $res;
212             }
213              
214             # Only bulk string replies can contain binary-safe encoded data
215             elsif ($m->{type} eq '$' and $encoding and defined $m->{data}) {
216 9         27 push @res, Mojo::Util::decode($encoding, $m->{data});
217             }
218             else {
219 1         4 push @res, $m->{data};
220             }
221             }
222              
223 10         172 return undef, \@res;
224 8         49 };
225              
226 8         20 my ($err, $res) = $unpack->(@messages);
227 8 50       12 my $p = shift @{$self->{waiting} || []};
  8         21  
228 8 0       16 return $p ? $p->reject($err) : $self->emit(error => $err) unless $res;
    50          
229 8 100       72 return $p ? $p->resolve($res->[0]) : $self->emit(response => $res->[0]);
230 10         71 };
231             }
232              
233             sub _reject_queue {
234 12     12   21 my ($self, $err) = @_;
235 12         19 state $default = 'Premature connection close';
236 12 100 33     16 for my $p (@{delete $self->{waiting} || []}) { $p and $p->reject($err || $default) }
  12 100       46  
  4         17  
237 12 100 66     181 for my $i (@{delete $self->{write} || []}) { $i->[1] and $i->[1]->reject($err || $default) }
  12 50       53  
  4         23  
238 12         277 return $self;
239             }
240              
241             sub _write {
242 12     12   23 my $self = shift;
243              
244 12         16 while (my $op = shift @{$self->{write}}) {
  24         343  
245 12         31 my $loop = $self->ioloop;
246 12         44 do { local $_ = $op->[0]; s!\r\n!\\r\\n!g; warn "[@{[$self->_id]}] <<< ($_)\n" } if DEBUG;
247 12         23 push @{$self->{waiting}}, $op->[1];
  12         28  
248 12         36 $self->{stream}->write($op->[0]);
249             }
250             }
251              
252             1;
253              
254             =encoding utf8
255              
256             =head1 NAME
257              
258             Mojo::Redis::Connection - Low level connection class for talking to Redis
259              
260             =head1 SYNOPSIS
261              
262             use Mojo::Redis::Connection;
263              
264             my $conn = Mojo::Redis::Connection->new(
265             ioloop => Mojo::IOLoop->singleton,
266             protocol => Protocol::Redis::Faster->new(api => 1),
267             url => Mojo::URL->new("redis://localhost"),
268             );
269              
270             $conn->write_p("GET some_key")->then(sub { print "some_key=$_[0]" })->wait;
271              
272             =head1 DESCRIPTION
273              
274             L<Mojo::Redis::Connection> is a low level driver for writing and reading data
275             from a Redis server.
276              
277             You probably want to use L<Mojo::Redis> instead of this class.
278              
279             =head1 EVENTS
280              
281             =head2 close
282              
283             $cb = $conn->on(close => sub { my ($conn) = @_; });
284              
285             Emitted when the connection to the redis server gets closed.
286              
287             =head2 connect
288              
289             $cb = $conn->on(connect => sub { my ($conn) = @_; });
290              
291             Emitted right after a connection is established to the Redis server, but
292             after the AUTH and SELECT commands are queued.
293              
294             =head2 error
295              
296             $cb = $conn->on(error => sub { my ($conn, $error) = @_; });
297              
298             Emitted if there's a connection error or the Redis server emits an error, and
299             there's not a promise to handle the message.
300              
301             =head2 response
302              
303             $cb = $conn->on(response => sub { my ($conn, $res) = @_; });
304              
305             Emitted when receiving a message from the Redis server.
306              
307             =head1 ATTRIBUTES
308              
309             =head2 encoding
310              
311             $str = $conn->encoding;
312             $conn = $conn->encoding("UTF-8");
313              
314             Holds the character encoding to use for data from/to Redis. Set to C<undef>
315             to disable encoding/decoding data. Without an encoding set, Redis expects and
316             returns bytes. See also L<Mojo::Redis/encoding>.
317              
318             =head2 ioloop
319              
320             $loop = $conn->ioloop;
321             $conn = $conn->ioloop(Mojo::IOLoop->new);
322              
323             Holds an instance of L<Mojo::IOLoop>.
324              
325             =head2 protocol
326              
327             $protocol = $conn->protocol;
328             $conn = $conn->protocol(Protocol::Redis::XS->new(api => 1));
329              
330             Holds a protocol object, such as L<Protocol::Redis::Faster>, that is used to
331             generate and parse Redis messages.
332              
333             =head2 url
334              
335             $url = $conn->url;
336             $conn = $conn->url(Mojo::URL->new->host("/tmp/redis.sock")->path("/5"));
337             $conn = $conn->url("redis://localhost:6379/1");
338              
339             =head1 METHODS
340              
341             =head2 disconnect
342              
343             $conn = $conn->disconnect;
344              
345             Used to disconnect from the Redis server.
346              
347             =head2 is_connected
348              
349             $bool = $conn->is_connected;
350              
351             True if a connection to the Redis server is established.
352              
353             =head2 write
354              
355             $conn = $conn->write(@command_and_args);
356              
357             Used to write a message to the Redis server. Calling this method should result
358             in either an L</error> or L</response> event.
359              
360             This is useful when the reply should be handled by an event listener instead of a promise.
361              
362             =head2 write_p
363              
364             $promise = $conn->write_p(@command_and_args);
365              
366             Writes a command to the Redis server, establishing a connection if not
367             already connected, and returns a L<Mojo::Promise>.
368              
369             =head1 SEE ALSO
370              
371             L<Mojo::Redis>
372              
373             =cut