File Coverage

blib/lib/App/Memcached/CLI/DataSource.pm
Criterion Covered Total %
statement 26 208 12.5
branch 0 48 0.0
condition 0 13 0.0
subroutine 9 41 21.9
pod 0 18 0.0
total 35 328 10.6


line stmt bran cond sub pod time code
1             package App::Memcached::CLI::DataSource;
2              
3 2     2   12 use strict;
  2         10  
  2         46  
4 2     2   8 use warnings;
  2         3  
  2         39  
5 2     2   26 use 5.008_001;
  2         5  
6              
7 2     2   10 use Carp;
  2         2  
  2         87  
8 2     2   16 use IO::Socket;
  2         4  
  2         11  
9 2     2   802 use Time::HiRes;
  2         22  
  2         9  
10              
11 2     2   457 use App::Memcached::CLI::Util qw(is_unixsocket debug);
  2         4  
  2         1384  
12              
13             sub new {
14 0     0 0   my $class = shift;
15 0           my %args = @_;
16 0           bless \%args, $class;
17             }
18              
19             sub connect {
20 0     0 0   my $class = shift;
21 0           my $addr = shift;
22 0           my %opts = @_;
23              
24             my $socket = sub {
25 0 0   0     return IO::Socket::UNIX->new(Peer => $addr) if is_unixsocket($addr);
26             return IO::Socket::INET->new(
27             PeerAddr => $addr,
28             Proto => 'tcp',
29 0   0       Timeout => $opts{timeout} || 1,
30             );
31 0           }->();
32 0 0         confess "Can't connect to $addr" unless $socket;
33              
34 0           return $class->new(socket => $socket);
35             }
36              
37             sub ping {
38 0     0 0   my $self = shift;
39 0           my $version = eval {
40 0           return $self->query_one('version');
41             };
42 0 0 0       if (!$version or $@) {
43 0           debug "Ping failed.";
44 0 0         debug "ERROR: " . $@ if $@;
45 0           return;
46             }
47 0           return 1;
48             }
49              
50             sub get {
51 0     0 0   my $self = shift;
52 0           return $self->_retrieve('get', shift);
53             }
54              
55             sub gets {
56 0     0 0   my $self = shift;
57 0           return $self->_retrieve('gets', shift);
58             }
59              
60             sub _retrieve {
61 0     0     my $self = shift;
62 0           my ($cmd, $keys) = @_;
63              
64 0           my $key_str = join(q{ }, @$keys);
65 0           $self->{socket}->write("$cmd $key_str\r\n");
66              
67 0           my @results;
68              
69 0           while (1) {
70 0           my $response = $self->_readline;
71 0 0         next if ($response =~ m/^[\r\n]+$/);
72 0 0         if ($response =~ m/^VALUE (\S+) (\d+) (\d+)(?: (\d+))?/) {
    0          
73 0           my %data = (
74             key => $1,
75             flags => $2,
76             length => $3,
77             cas => $4,
78             );
79 0     0     local $SIG{ALRM} = sub { die 'Timed out to Read Socket.' };
  0            
80 0           alarm 3;
81 0           $self->{socket}->read($response, $data{length});
82 0           alarm 0;
83 0           $data{value} = $response;
84 0           push @results, \%data;
85             } elsif ($response =~ m/^END/) {
86 0           last;
87             } else {
88 0           warn "Unknown response '$response'";
89             }
90             }
91              
92 0           return \@results;
93             }
94              
95 0     0 0   sub set { return &_store(shift, 'set', @_); }
96 0     0 0   sub add { return &_store(shift, 'add', @_); }
97 0     0 0   sub replace { return &_store(shift, 'replace', @_); }
98 0     0 0   sub append { return &_modify(shift, 'append', @_); }
99 0     0 0   sub prepend { return &_modify(shift, 'prepend', @_); }
100              
101             sub _modify {
102 0     0     my $self = shift;
103 0           my ($cmd, $key, $value) = @_;
104 0           return $self->_store($cmd, $key, $value);
105             }
106              
107             sub _store {
108 0     0     my $self = shift;
109 0           my $cmd = shift;
110 0           my $key = shift;
111 0           my $value = shift;
112 0           my %option = @_;
113              
114 0   0       my $flags = $option{flags} || 0;
115 0   0       my $expire = $option{expire} || 0;
116             my $bytes = sub {
117 2     2   972 use bytes;
  2         23  
  2         9  
118 0     0     return length $value;
119 0           }->();
120              
121 0           $self->{socket}->write("$cmd $key $flags $expire $bytes\r\n");
122 0           $self->{socket}->write("$value\r\n");
123 0           my $response = eval {
124 0           return $self->_readline;
125             };
126 0 0         if ($@) {
127 0           confess qq{Failed to store data by "$cmd"! ($key, $value) ERROR: } . $@;
128             }
129 0 0         if ($response !~ m/^STORED/) {
130 0           debug qq{Failed to $cmd data as ($key, $value)};
131 0           return;
132             }
133 0           return 1;
134             }
135              
136             sub cas {
137 0     0 0   my $self = shift;
138 0           my $key = shift;
139 0           my $value = shift;
140 0           my $cas = shift;
141 0           my %option = @_;
142              
143 0   0       my $flags = $option{flags} || 0;
144 0   0       my $expire = $option{expire} || 0;
145             my $bytes = sub {
146 2     2   387 use bytes;
  2         3  
  2         6  
147 0     0     return length $value;
148 0           }->();
149              
150 0           $self->{socket}->write("cas $key $flags $expire $bytes $cas\r\n");
151 0           $self->{socket}->write("$value\r\n");
152 0           my $response = eval {
153 0           return $self->_readline;
154             };
155 0 0         if ($@) {
156 0           confess qq{Failed to store data by "cas"! ($key, $value) ERROR: } . $@;
157             }
158 0 0         if ($response !~ m/^STORED/) {
159 0           debug qq{Failed to set data as ($key, $value) with cas $cas};
160 0           return;
161             }
162 0           return 1;
163             }
164              
165             sub delete {
166 0     0 0   my $self = shift;
167 0           my $key = shift;
168              
169 0           my $response = $self->query_one("delete $key");
170 0 0         if ($response !~ m/^DELETED/) {
171 0           warn "Failed to delete '$key'";
172 0           return;
173             }
174 0           return 1;
175             }
176              
177             sub touch {
178 0     0 0   my $self = shift;
179 0           my $key = shift;
180 0           my $expire = shift;
181              
182 0           my $response = $self->query_one("touch $key $expire");
183 0 0         if ($response =~ m/^NOT_FOUND/) {
    0          
184 0           debug "No such data KEY '$key'";
185 0           return;
186             } elsif ($response !~ m/^TOUCHED/) {
187 0           warn "Failed to touch '$key' with EXPIRE '$expire'. RES: $response";
188 0           return;
189             }
190 0           return 1;
191             }
192              
193 0     0 0   sub incr { return &_incr_decr(shift, 'incr', @_); }
194 0     0 0   sub decr { return &_incr_decr(shift, 'decr', @_); }
195              
196             sub _incr_decr {
197 0     0     my $self = shift;
198 0           my $cmd = shift;
199 0           my $key = shift;
200 0           my $number = shift;
201              
202 0           my $response = $self->query_one("$cmd $key $number");
203 0 0         if ($response =~ m/^NOT_FOUND/) {
    0          
204 0           warn "No such data KEY '$key'";
205 0           return;
206             } elsif ($response !~ m/^(\d+)/) {
207 0           warn "Failed to $cmd '$key' by number '$number'. RES: $response";
208 0           return;
209             }
210 0           my $new_value = $1;
211 0           return $new_value;
212             }
213              
214             sub query_one {
215 0     0 0   my $self = shift;
216 0           my $query = shift;
217              
218 0           $self->{socket}->write("$query\r\n");
219 0           my $response = eval {
220 0           return $self->_readline;
221             };
222 0 0         if ($@) {
223 0           confess "Failed to query! query: $query ERROR: " . $@;
224             }
225 0 0         chomp $response if $response;
226 0           return $response;
227             }
228              
229             sub query_any {
230 0     0 0   my $self = shift;
231 0           my $query = shift;
232              
233 0           $self->{socket}->write("$query\r\n");
234              
235             # Save blocking mode
236 0           my $blocking_mode = $self->{socket}->blocking;
237              
238 0           my $response = eval {
239 0     0     local $SIG{ALRM} = sub { die 'Timed out to Read Socket.' };
  0            
240 0           alarm 5;
241 0           my $resp = q{};
242 0           $self->{socket}->blocking(0);
243             my $getline_from_sock = sub {
244 0     0     for my $i (1..3) {
245 0           my $line = $self->{socket}->getline;
246 0 0         return $line if defined $line;
247             #debug "failed to getline - $i. query: $query";
248 0           Time::HiRes::sleep(0.01);
249             }
250 0           return;
251 0           };
252 0           while (my $line = $getline_from_sock->()) {
253 0           $resp .= $line;
254             }
255 0           alarm 0;
256 0           return $resp;
257             };
258 0           my $err = $@;
259              
260             # Restore blocking mode
261 0           $self->{socket}->blocking($blocking_mode);
262              
263 0 0         if ($err) {
264 0           confess "Failed to query! query: $query ERROR: " . $err;
265             }
266              
267 0           return $response;
268             }
269              
270             sub query {
271 0     0 0   my $self = shift;
272 0           my $query = shift;
273 0           my $response = eval {
274 0           return $self->_query($query);
275             };
276 0 0         if ($@) {
277 0           confess "Failed to query! query: $query ERROR: " . $@;
278             }
279 0           return $response;
280             }
281              
282             sub _query {
283 0     0     my $self = shift;
284 0           my $query = shift;
285              
286 0           $self->{socket}->write("$query\r\n");
287              
288 0           my @response;
289 0           while (1) {
290 0           my $line = $self->_readline;
291 0           $line =~ s/[\r\n]+$//;
292 0 0         last if ($line =~ m/^(OK|END)/);
293 0 0         die $line if ($line =~ m/^(CLIENT|SERVER_)?ERROR/);
294 0           push @response, $line;
295             }
296              
297 0           return \@response;
298             }
299              
300             sub _readline {
301 0     0     my $self = shift;
302 0     0     local $SIG{ALRM} = sub { die 'Timed out to Read Socket.' };
  0            
303 0           alarm 3;
304 0           my $line = $self->{socket}->getline;
305 0           alarm 0;
306 0           return $line;
307             }
308              
309             sub DESTROY {
310 0     0     my $self = shift;
311 0 0         if ($self->{socket}) { $self->{socket}->close; }
  0            
312             }
313              
314             1;
315             __END__