File Coverage

blib/lib/Mojo/Redis/Connection.pm
Criterion Covered Total %
statement 171 180 95.0
branch 55 80 68.7
condition 29 45 64.4
subroutine 26 28 92.8
pod 4 4 100.0
total 285 337 84.5


line stmt bran cond sub pod time code
1             package Mojo::Redis::Connection;
2 18     18   95 use Mojo::Base 'Mojo::EventEmitter';
  18         27  
  18         112  
3              
4 18     18   10022 use File::Spec::Functions 'file_name_is_absolute';
  18         12274  
  18         1099  
5 18     18   7297 use Mojo::IOLoop;
  18         2858808  
  18         97  
6 18     18   990 use Mojo::Promise;
  18         44  
  18         120  
7              
8 18     18   663 use constant DEBUG => $ENV{MOJO_REDIS_DEBUG};
  18         29  
  18         1197  
9 18   50 18   114 use constant CONNECT_TIMEOUT => $ENV{MOJO_REDIS_CONNECT_TIMEOUT} || 10;
  18         35  
  18         1016  
10 18   50 18   76 use constant SENTINELS_CONNECT_TIMEOUT => $ENV{MOJO_REDIS_SENTINELS_CONNECT_TIMEOUT} || CONNECT_TIMEOUT;
  18         23  
  18         52934  
11              
12             has encoding => sub { Carp::confess('encoding is required in constructor') };
13             has ioloop => sub { Carp::confess('ioloop is required in constructor') };
14             has protocol => sub { Carp::confess('protocol is required in constructor') };
15             has url => sub { Carp::confess('url is required in constructor') };
16             has [qw/ tls tls_ca tls_cert tls_key tls_options /];
17              
18             sub DESTROY {
19 16     16   3369 my $self = shift;
20 16 100 66     463 $self->disconnect if defined $self->{pid} and $self->{pid} == $$;
21             }
22              
23             sub disconnect {
24 5     5 1 6 my $self = shift;
25 5         15 $self->_reject_queue;
26 5 100       24 $self->{stream}->close if $self->{stream};
27 5         169 $self->{gone_away} = 1;
28 5         152 return $self;
29             }
30              
31 25 100 100 25 1 162 sub is_connected { $_[0]->{stream} && !$_[0]->{gone_away} ? 1 : 0 }
32              
33             sub write {
34 1     1 1 2 my $self = shift;
35 1         1 push @{$self->{write}}, [$self->_encode(@_)];
  1         13  
36 1 50       45 $self->is_connected ? $self->_write : $self->_connect;
37 1         4 return $self;
38             }
39              
40             sub write_p {
41 8     8 1 11 my $self = shift;
42 8         34 my $p = Mojo::Promise->new->ioloop($self->ioloop);
43 8         279 push @{$self->{write}}, [$self->_encode(@_), $p];
  8         27  
44 8 100       295 $self->is_connected ? $self->_write : $self->_connect;
45 8         29 return $p;
46             }
47              
48             sub _connect {
49 11     11   17 my $self = shift;
50 11 50       35 return $self if $self->{id}; # Connecting
51              
52             # Cannot reuse a connection because of transaction state and other state
53 11 100       26 return $self->_reject_queue('Redis server has gone away') if $self->{gone_away};
54              
55 9   66     43 my $url = $self->{master_url} || $self->url;
56 9 100 100     86 return $self->_discover_master if !$self->{master_url} and $url->query->param('sentinel');
57              
58 8         189 Scalar::Util::weaken($self);
59 8         13 delete $self->{master_url}; # Make sure we forget master_url so we can reconnect
60 8         19 $self->protocol->on_message($self->_parse_message_cb);
61             $self->{id} = $self->ioloop->client(
62             $self->_connect_args(
63             $url,
64             {
65             port => 6379,
66             timeout => CONNECT_TIMEOUT,
67 0         0 map { ($_ => $self->$_) } grep defined $self->{$_}, qw/ tls tls_ca tls_cert tls_key tls_options /
68             }
69             ),
70             sub {
71 6 50   6   7567 return unless $self;
72 6         13 my ($loop, $err, $stream) = @_;
73 6         17 my $close_cb = $self->_on_close_cb;
74 6 100       30 return $self->$close_cb($err) if $err;
75              
76 5         53 $stream->timeout(0);
77 5         95 $stream->on(close => $close_cb);
78 5         23 $stream->on(error => $close_cb);
79 5         24 $stream->on(read => $self->_on_read_cb);
80              
81 5 100       27 unshift @{$self->{write}}, [$self->_encode(SELECT => $url->path->[0])] if length $url->path->[0];
  2         145  
82 5 100       164 unshift @{$self->{write}}, [$self->_encode(AUTH => $url->password)] if length $url->password;
  2         29  
83 5         109 $self->{pid} = $$;
84 5         13 $self->{stream} = $stream;
85 5         71 $self->emit('connect');
86 5         94 $self->_write;
87             }
88 8         111 );
89              
90 8         1355 warn "[@{[$self->_id]}] CONNECTING $url (blocking=@{[$self->_is_blocking]})\n" if DEBUG;
91 8         27 return $self;
92             }
93              
94             sub _connect_args {
95 10     10   84 my ($self, $url, $defaults) = @_;
96 10   50     28 my %args = (address => $url->host || 'localhost');
97              
98 10 100       98 if (file_name_is_absolute $args{address}) {
99 1         18 $args{path} = delete $args{address};
100             }
101             else {
102 9   66     71 $args{port} = $url->port || $defaults->{port};
103             }
104              
105 10   50     65 $args{timeout} = $defaults->{timeout} || CONNECT_TIMEOUT;
106              
107 10         62 $args{$_} = $defaults->{$_} foreach grep /^tls/, keys %$defaults;
108 10         73 return \%args;
109             }
110              
111             sub _discover_master {
112 2     2   34 my $self = shift;
113 2         4 my $url = $self->url->clone;
114 2         110 my $sentinels = $url->query->every_param('sentinel');
115 2   50     36 my $timeout = $url->query->param('sentinel_connect_timeout') || SENTINELS_CONNECT_TIMEOUT;
116              
117 2         50 $url->host_port(shift @$sentinels);
118 2         56 $self->url->query->param(sentinel => [@$sentinels, $url->host_port]); # Round-robin sentinel list
119 2         123 $self->protocol->on_message($self->_parse_message_cb);
120             $self->{id} = $self->ioloop->client(
121             $self->_connect_args($url, {port => 16379, timeout => $timeout}),
122             sub {
123 2     2   2937 my ($loop, $err, $stream) = @_;
124 2 50       6 return unless $self;
125 2 50       5 return $self->_discover_master if $err;
126              
127 2         4 $stream->timeout(0);
128 2 50       39 $stream->on(close => sub { $self->_discover_master unless $self->{master_url} });
  1         55  
129 2         12 $stream->on(error => sub { $self->_discover_master });
  0         0  
130 2         11 $stream->on(read => $self->_on_read_cb);
131              
132 2         7 $self->{stream} = $stream;
133 2         9 my $p = Mojo::Promise->new->ioloop($self->ioloop);
134 2         63 unshift @{$self->{write}}, undef; # prevent _write() from writing commands
  2         4  
135 2         3 unshift @{$self->{write}}, [$self->_encode(SENTINEL => 'get-master-addr-by-name', $self->url->host), $p];
  2         5  
136 2 50       90 unshift @{$self->{write}}, [$self->_encode(AUTH => $url->password)] if length $url->password;
  2         30  
137              
138 2         61 $self->{write_lock} = 1;
139             $p->then(
140             sub {
141 2         293 my $host_port = shift;
142 2         4 delete $self->{id};
143 2         4 delete $self->{write_lock};
144 2 100 66     10 return $self->_discover_master unless ref $host_port and @$host_port == 2;
145 1         2 $self->{master_url} = $self->url->clone->host($host_port->[0])->port($host_port->[1]);
146 1         69 $self->{stream}->close;
147 1         56 $self->_connect;
148             },
149 0         0 sub { $self->_discover_master },
150 2         11 );
151              
152 2         79 $self->_write;
153             }
154 2         13 );
155              
156 2         337 warn "[@{[$self->_id]}] SENTINEL DISCOVERY $url (blocking=@{[$self->_is_blocking]})\n" if DEBUG;
157 2         6 return $self;
158             }
159              
160             sub _encode {
161 17     17   75 my $self = shift;
162 17         34 my $encoding = $self->encoding;
163             return $self->protocol->encode({
164 17 50       72 type => '*', data => [map { +{type => '$', data => $encoding ? Mojo::Util::encode($encoding, $_) : $_} } @_]
  34         219  
165             });
166             }
167              
168 0 0   0   0 sub _id { $_[0]->{id} || '0' }
169              
170 0 0   0   0 sub _is_blocking { shift->ioloop eq Mojo::IOLoop->singleton ? 0 : 1 }
171              
172             sub _on_close_cb {
173 6     6   8 my $self = shift;
174              
175 6         8 Scalar::Util::weaken($self);
176             return sub {
177 5 50   5   1485 return unless $self;
178 5         20 my ($stream, $err) = @_;
179 5         44 delete $self->{$_} for qw(id stream);
180 5         11 $self->{gone_away} = 1;
181 5         11 $self->_reject_queue($err);
182 5 100       37 $self->emit('close') if @_ == 1;
183 5 0 50     45 warn qq([@{[$self->_id]}] @{[$err ? "ERROR $err" : "CLOSED"]}\n) if $self and DEBUG;
  0 50       0  
  0         0  
184 6         21 };
185             }
186              
187             sub _on_read_cb {
188 7     7   12 my $self = shift;
189              
190 7         8 Scalar::Util::weaken($self);
191             return sub {
192 4 50   4   4381 return unless $self;
193 4         10 my ($stream, $chunk) = @_;
194 4         4 do { local $_ = $chunk; s!\r\n!\\r\\n!g; warn "[@{[$self->_id]}] >>> ($_)\n" } if DEBUG;
195 4         22 $self->protocol->parse($chunk);
196 7         26 };
197             }
198              
199             sub _parse_message_cb {
200 10     10   44 my $self = shift;
201              
202 10         30 Scalar::Util::weaken($self);
203             return sub {
204 8     8   367 my ($protocol, @messages) = @_;
205 8         13 my $encoding = $self->encoding;
206 8 100       35 $self->_write unless $self->{write_lock};
207              
208             my $unpack = sub {
209 10         11 my @res;
210              
211 10         19 while (my $m = shift @_) {
212 12 50 66     84 if ($m->{type} eq '-') {
    50 66        
    100 66        
    100          
213 0         0 return $m->{data}, undef;
214             }
215             elsif ($m->{type} eq ':') {
216 0         0 push @res, 0 + $m->{data};
217             }
218             elsif ($m->{type} eq '*' and ref $m->{data} eq 'ARRAY') {
219 2         3 my ($err, $res) = __SUB__->(@{$m->{data}});
  2         19  
220 2 50       5 return $err if defined $err;
221 2         4 push @res, $res;
222             }
223              
224             # Only bulk string replies can contain binary-safe encoded data
225             elsif ($m->{type} eq '$' and $encoding and defined $m->{data}) {
226 9         17 push @res, Mojo::Util::decode($encoding, $m->{data});
227             }
228             else {
229 1         3 push @res, $m->{data};
230             }
231             }
232              
233 10         104 return undef, \@res;
234 8         40 };
235              
236 8         13 my ($err, $res) = $unpack->(@messages);
237 8 50       9 my $p = shift @{$self->{waiting} || []};
  8         18  
238 8 0       13 return $p ? $p->reject($err) : $self->emit(error => $err) unless $res;
    50          
239 8 100       45 return $p ? $p->resolve($res->[0]) : $self->emit(response => $res->[0]);
240 10         89 };
241             }
242              
243             sub _reject_queue {
244 12     12   16 my ($self, $err) = @_;
245 12         19 state $default = 'Premature connection close';
246 12 100 33     15 for my $p (@{delete $self->{waiting} || []}) { $p and $p->reject($err || $default) }
  12 100       41  
  4         20  
247 12 100 66     160 for my $i (@{delete $self->{write} || []}) { $i->[1] and $i->[1]->reject($err || $default) }
  12 50       36  
  4         23  
248 12         247 return $self;
249             }
250              
251             sub _write {
252 12     12   17 my $self = shift;
253              
254 12         14 while (my $op = shift @{$self->{write}}) {
  24         285  
255 12         21 my $loop = $self->ioloop;
256 12         30 do { local $_ = $op->[0]; s!\r\n!\\r\\n!g; warn "[@{[$self->_id]}] <<< ($_)\n" } if DEBUG;
257 12         11 push @{$self->{waiting}}, $op->[1];
  12         25  
258 12         30 $self->{stream}->write($op->[0]);
259             }
260             }
261              
262             1;
263              
264             =encoding utf8
265              
266             =head1 NAME
267              
268             Mojo::Redis::Connection - Low level connection class for talking to Redis
269              
270             =head1 SYNOPSIS
271              
272             use Mojo::Redis::Connection;
273              
274             my $conn = Mojo::Redis::Connection->new(
275             ioloop => Mojo::IOLoop->singleton,
276             protocol => Protocol::Redis::Faster->new(api => 1),
277             url => Mojo::URL->new("redis://localhost"),
278             );
279              
280             $conn->write_p("GET some_key")->then(sub { print "some_key=$_[0]" })->wait;
281              
282             =head1 DESCRIPTION
283              
284             L is a low level driver for writing and reading data
285             from a Redis server.
286              
287             You probably want to use L instead of this class.
288              
289             =head1 EVENTS
290              
291             =head2 close
292              
293             $cb = $conn->on(close => sub { my ($conn) = @_; });
294              
295             Emitted when the connection to the redis server gets closed.
296              
297             =head2 connect
298              
299             $cb = $conn->on(connect => sub { my ($conn) = @_; });
300              
301             Emitted right after a connection is established to the Redis server, but
302             after the AUTH and SELECT commands are queued.
303              
304             =head2 error
305              
306             $cb = $conn->on(error => sub { my ($conn, $error) = @_; });
307              
308             Emitted if there's a connection error or the Redis server emits an error, and
309             there's not a promise to handle the message.
310              
311             =head2 response
312              
313             $cb = $conn->on(response => sub { my ($conn, $res) = @_; });
314              
315             Emitted when receiving a message from the Redis server.
316              
317             =head1 ATTRIBUTES
318              
319             =head2 encoding
320              
321             $str = $conn->encoding;
322             $conn = $conn->encoding("UTF-8");
323              
324             Holds the character encoding to use for data from/to Redis. Set to C
325             to disable encoding/decoding data. Without an encoding set, Redis expects and
326             returns bytes. See also L.
327              
328             =head2 ioloop
329              
330             $loop = $conn->ioloop;
331             $conn = $conn->ioloop(Mojo::IOLoop->new);
332              
333             Holds an instance of L.
334              
335             =head2 protocol
336              
337             $protocol = $conn->protocol;
338             $conn = $conn->protocol(Protocol::Redis::XS->new(api => 1));
339              
340             Holds a protocol object, such as L that is used to
341             generate and parse Redis messages.
342              
343             =head2 tls
344              
345             my $tls = $conn->tls;
346             $conn = $conn->tls(1);
347              
348             See L
349              
350             =head2 tls_ca
351              
352             my $tls_ca = $conn->tls_ca;
353             $conn = $conn->tls_ca('/etc/tls/ca.crt');
354              
355             See L
356              
357             =head2 tls_cert
358              
359             my $tls_cert = $conn->tls_cert;
360             $conn = $conn->tls_cert('/etc/tls/client.crt');
361              
362             See L
363              
364             =head2 tls_key
365              
366             my $tls_key = $conn->tls_key;
367             $conn = $conn->tls_key('/etc/tls/client.key');
368              
369             See L
370              
371             =head2 tls_options
372              
373             my $tls_options = $conn->tls_options;
374             $conn = $conn->tls_options({SSL_alpn_protocols => ['foo', 'bar'], SSL_verify_mode => 0x00});
375              
376             See L
377              
378             =head2 url
379              
380             $url = $conn->url;
381             $conn = $conn->url(Mojo::URL->new->host("/tmp/redis.sock")->path("/5"));
382             $conn = $conn->url("redis://localhost:6379/1");
383              
384             =head1 METHODS
385              
386             =head2 disconnect
387              
388             $conn = $conn->disconnect;
389              
390             Used to disconnect from the Redis server.
391              
392             =head2 is_connected
393              
394             $bool = $conn->is_connected;
395              
396             True if a connection to the Redis server is established.
397              
398             =head2 write
399              
400             $conn = $conn->write(@command_and_args);
401              
402             Used to write a message to the redis server. Calling this method should result
403             in either a L or L event.
404              
405             This is useful in the a
406              
407             =head2 write_p
408              
409             $promise = $conn->write_p(@command_and_args);
410              
411             Will write a command to the Redis server and establish a connection if not
412             already connected and returns a L.
413              
414             =head1 SEE ALSO
415              
416             L
417              
418             =cut