File Coverage

blib/lib/App/Memcached/CLI/DataSource.pm
Criterion Covered Total %
statement 23 171 13.4
branch 0 32 0.0
condition 0 10 0.0
subroutine 8 35 22.8
pod 0 16 0.0
total 31 264 11.7


line stmt bran cond sub pod time code
1             package App::Memcached::CLI::DataSource;
2              
3 2     2   6 use strict;
  2         2  
  2         44  
4 2     2   6 use warnings;
  2         2  
  2         38  
5 2     2   28 use 5.008_001;
  2         5  
6              
7 2     2   6 use Carp;
  2         2  
  2         101  
8 2     2   6 use IO::Socket;
  2         2  
  2         18  
9              
10 2     2   1455 use App::Memcached::CLI::Util qw(is_unixsocket debug);
  2         3  
  2         1041  
11              
12             sub new {
13 0     0 0   my $class = shift;
14 0           my %args = @_;
15 0           bless \%args, $class;
16             }
17              
18             sub connect {
19 0     0 0   my $class = shift;
20 0           my $addr = shift;
21 0           my %opts = @_;
22              
23             my $socket = sub {
24 0 0   0     return IO::Socket::UNIX->new(Peer => $addr) if is_unixsocket($addr);
25             return IO::Socket::INET->new(
26             PeerAddr => $addr,
27             Proto => 'tcp',
28 0   0       Timeout => $opts{timeout} || 1,
29             );
30 0           }->();
31 0 0         confess "Can't connect to $addr" unless $socket;
32              
33 0           return $class->new(socket => $socket);
34             }
35              
36             sub get {
37 0     0 0   my $self = shift;
38 0           return $self->_retrieve('get', shift);
39             }
40              
41             sub gets {
42 0     0 0   my $self = shift;
43 0           return $self->_retrieve('gets', shift);
44             }
45              
46             sub _retrieve {
47 0     0     my $self = shift;
48 0           my ($cmd, $keys) = @_;
49              
50 0           my $socket = $self->{socket};
51 0           my $key_str = join(q{ }, @$keys);
52 0           print $socket "$cmd $key_str\r\n";
53              
54 0           my @results;
55              
56 0           while (1) {
57 0           my $response = <$socket>;
58 0 0         next if ($response =~ m/^[\r\n]+$/);
59 0 0         if ($response =~ m/^VALUE (\S+) (\d+) (\d+)(?: (\d+))?/) {
    0          
60 0           my %data = (
61             key => $1,
62             flags => $2,
63             length => $3,
64             cas => $4,
65             );
66 0           read $socket, $response, $data{length};
67 0           $data{value} = $response;
68 0           push @results, \%data;
69             } elsif ($response =~ m/^END/) {
70 0           last;
71             } else {
72 0           warn "Unknown response '$response'";
73             }
74             }
75              
76 0           return \@results;
77             }
78              
79 0     0 0   sub set { return &_store(shift, 'set', @_); }
80 0     0 0   sub add { return &_store(shift, 'add', @_); }
81 0     0 0   sub replace { return &_store(shift, 'replace', @_); }
82 0     0 0   sub append { return &_modify(shift, 'append', @_); }
83 0     0 0   sub prepend { return &_modify(shift, 'prepend', @_); }
84              
85             sub _modify {
86 0     0     my $self = shift;
87 0           my ($cmd, $key, $value) = @_;
88 0           return $self->_store($cmd, $key, $value);
89             }
90              
91             sub _store {
92 0     0     my $self = shift;
93 0           my $cmd = shift;
94 0           my $key = shift;
95 0           my $value = shift;
96 0           my %option = @_;
97              
98 0   0       my $flags = $option{flags} || 0;
99 0   0       my $expire = $option{expire} || 0;
100             my $bytes = sub {
101 2     2   506 use bytes;
  2         8  
  2         10  
102 0     0     return length $value;
103 0           }->();
104              
105 0           my $socket = $self->{socket};
106 0           print $socket "$cmd $key $flags $expire $bytes\r\n";
107 0           print $socket "$value\r\n";
108 0           my $response = $self->_readline;
109 0 0         if ($response !~ m/^STORED/) {
110 0           debug qq{Failed to $cmd data as ($key, $value)};
111 0           return;
112             }
113 0           return 1;
114             }
115              
116             sub cas {
117 0     0 0   my $self = shift;
118 0           my $key = shift;
119 0           my $value = shift;
120 0           my $cas = shift;
121 0           my %option = @_;
122              
123 0   0       my $flags = $option{flags} || 0;
124 0   0       my $expire = $option{expire} || 0;
125             my $bytes = sub {
126 2     2   313 use bytes;
  2         2  
  2         4  
127 0     0     return length $value;
128 0           }->();
129              
130 0           my $socket = $self->{socket};
131 0           print $socket "cas $key $flags $expire $bytes $cas\r\n";
132 0           print $socket "$value\r\n";
133 0           my $response = $self->_readline;
134 0 0         if ($response !~ m/^STORED/) {
135 0           debug qq{Failed to set data as ($key, $value) with cas $cas};
136 0           return;
137             }
138 0           return 1;
139             }
140              
141             sub delete {
142 0     0 0   my $self = shift;
143 0           my $key = shift;
144              
145 0           my $socket = $self->{socket};
146 0           print $socket "delete $key\r\n";
147 0           my $response = $self->_readline;
148 0 0         if ($response !~ m/^DELETED/) {
149 0           warn "Failed to delete '$key'";
150 0           return;
151             }
152 0           return 1;
153             }
154              
155             sub touch {
156 0     0 0   my $self = shift;
157 0           my $key = shift;
158 0           my $expire = shift;
159              
160 0           my $socket = $self->{socket};
161 0           print $socket "touch $key $expire\r\n";
162 0           my $response = $self->_readline;
163 0 0         if ($response =~ m/^NOT_FOUND/) {
    0          
164 0           debug "No such data KEY '$key'";
165 0           return;
166             } elsif ($response !~ m/^TOUCHED/) {
167 0           warn "Failed to touch '$key' with EXPIRE '$expire'. RES: $response";
168 0           return;
169             }
170 0           return 1;
171             }
172              
173 0     0 0   sub incr { return &_incr_decr(shift, 'incr', @_); }
174 0     0 0   sub decr { return &_incr_decr(shift, 'decr', @_); }
175              
176             sub _incr_decr {
177 0     0     my $self = shift;
178 0           my $cmd = shift;
179 0           my $key = shift;
180 0           my $number = shift;
181              
182 0           my $socket = $self->{socket};
183 0           print $socket "$cmd $key $number\r\n";
184 0           my $response = $self->_readline;
185 0 0         if ($response =~ m/^NOT_FOUND/) {
    0          
186 0           warn "No such data KEY '$key'";
187 0           return;
188             } elsif ($response !~ m/^(\d+)/) {
189 0           warn "Failed to $cmd '$key' by number '$number'. RES: $response";
190 0           return;
191             }
192 0           my $new_value = $1;
193 0           return $new_value;
194             }
195              
196             sub version {
197 0     0 0   my $self = shift;
198 0           my $query = shift;
199              
200 0           my $socket = $self->{socket};
201 0           print $socket "version\r\n";
202 0           my $response = $self->_readline;
203 0           chomp $response;
204 0           return $response;
205             }
206              
207             sub query {
208 0     0 0   my $self = shift;
209 0           my $query = shift;
210 0           my $response = eval {
211 0           return $self->_query($query);
212             };
213 0 0         if ($@) {
214 0           confess "Failed to query! query: $query ERROR: " . $@;
215             }
216 0           return $response;
217             }
218              
219             sub _query {
220 0     0     my $self = shift;
221 0           my $query = shift;
222              
223 0           my $socket = $self->{socket};
224 0           print $socket "$query\r\n";
225              
226 0           my @response;
227 0           while (1) {
228 0           my $line = $self->_readline;
229 0           $line =~ s/[\r\n]+$//;
230 0 0         last if ($line =~ m/^(OK|END)/);
231 0 0         die $line if ($line =~ m/^(CLIENT|SERVER_)?ERROR/);
232 0           push @response, $line;
233             }
234              
235 0           return \@response;
236             }
237              
238             sub _readline {
239 0     0     my $self = shift;
240 0           my $socket = $self->{socket};
241 0     0     local $SIG{ALRM} = sub { die 'Timed out to Read Socket.' };
  0            
242 0           alarm 3;
243 0           my $line = <$socket>;
244 0           alarm 0;
245 0           return $line;
246             }
247              
248             sub DESTROY {
249 0     0     my $self = shift;
250 0 0         if ($self->{socket}) { $self->{socket}->close; }
  0            
251             }
252              
253             1;
254             __END__