File Coverage

lib/AnyEvent/Memcached.pm
Criterion Covered Total %
statement 54 374 14.4
branch 0 154 0.0
condition 0 81 0.0
subroutine 19 58 32.7
pod 17 17 100.0
total 90 684 13.1


line stmt bran cond sub pod time code
1             package AnyEvent::Memcached;
2              
3 4     4   193312 use 5.8.8;
  4         11  
4              
5             =head1 NAME
6              
7             AnyEvent::Memcached - AnyEvent memcached client
8              
9             =cut
10              
11             our $VERSION = '0.08';
12              
13             =head1 SYNOPSIS
14              
15             use AnyEvent::Memcached;
16              
17             my $memd = AnyEvent::Memcached->new(
18             servers => [ "10.0.0.15:11211", "10.0.0.15:11212" ], # same as in Cache::Memcached
19             debug => 1,
20             compress_threshold => 10000,
21             namespace => 'my-namespace:',
22              
23             # May use another hashing algo:
24             hasher => 'AnyEvent::Memcached::Hash::WithNext',
25              
26             cv => $cv, # AnyEvent->condvar: group callback
27             );
28              
29             $memd->set_servers([ "10.0.0.15:11211", "10.0.0.15:11212" ]);
30              
31             # Basic methods are like in Cache::Memcached, but with additional cb => sub { ... };
32             # first argument to cb is return value, second is the error(s)
33              
34             $memd->set( key => $value, cb => sub {
35             shift or warn "Set failed: @_"
36             } );
37              
38             # Single get
39             $memd->get( 'key', cb => sub {
40             my ($value,$err) = shift;
41             $err and return warn "Get failed: @_";
42             warn "Value for key is $value";
43             } );
44              
45             # Multi-get
46             $memd->get( [ 'key1', 'key2' ], cb => sub {
47             my ($values,$err) = shift;
48             $err and return warn "Get failed: @_";
49             warn "Value for key1 is $values->{key1} and value for key2 is $values->{key2}"
50             } );
51              
52             # Additionally there is rget (see memcachedb-1.2.1-beta)
53              
54             $memd->rget( 'fromkey', 'tokey', cb => sub {
55             my ($values,$err) = shift;
56             $err and warn "Get failed: @_";
57             while (my ($key,$value) = each %$values) {
58             # ...
59             }
60             } );
61              
62             # Rget with sorted responce values
63             $memd->rget( 'fromkey', 'tokey', rv => 'array' cb => sub {
64             my ($values,$err) = shift;
65             $err and warn "Get failed: @_";
66             for (0 .. $#values/2) {
67             my ($key,$value) = @$values[$_*2,$_*2+1];
68             }
69             } );
70              
71             =head1 DESCRIPTION
72              
73             Asyncronous C client for L framework
74              
75             =head1 NOTICE
76              
77             There is a notices in L related to this module. They all has been fixed
78              
79             =over 4
80              
81             =item Prerequisites
82              
83             We no longer need L and L. At all, the dependency list is like in L + L
84              
85             =item Binary protocol
86              
87             It seems to me, that usage of binary protocol from pure perl gives very little advantage. So for now I don't implement it
88              
89             =item Unimplemented Methods
90              
91             There is a note, that get_multi is not implementeted. In fact, it was implemented by method L, but the documentation was wrong.
92              
93             =back
94              
95             In general, this module follows the spirit of L rather than correspondence to L interface.
96              
97             =cut
98              
99 4     4   2108 use common::sense 2;m{
  4         117  
  4         20  
100             use strict;
101             use warnings;
102             }x;
103              
104 4     4   282 use Carp;
  4         11  
  4         296  
105 4     4   1127 use AnyEvent 5;
  4         5347  
  4         114  
106             #use Devel::Leak::Cb;
107              
108 4     4   861 use AnyEvent::Socket;
  4         25180  
  4         357  
109 4     4   3353 use AnyEvent::Handle;
  4         26011  
  4         143  
110 4     4   2133 use AnyEvent::Connection;
  4         44941  
  4         123  
111 4     4   26 use AnyEvent::Connection::Util;
  4         6  
  4         15  
112 4     4   1741 use AnyEvent::Memcached::Conn;
  4         7  
  4         120  
113 4     4   2707 use Storable ();
  4         11258  
  4         122  
114              
115 4     4   1743 use AnyEvent::Memcached::Peer;
  4         16  
  4         115  
116 4     4   1570 use AnyEvent::Memcached::Hash;
  4         8  
  4         123  
117 4     4   1478 use AnyEvent::Memcached::Buckets;
  4         7  
  4         135  
118              
119             # flag definitions
120 4     4   21 use constant F_STORABLE => 1;
  4         4  
  4         336  
121 4     4   17 use constant F_COMPRESS => 2;
  4         5  
  4         188  
122              
123             # size savings required before saving compressed value
124 4     4   26 use constant COMPRESS_SAVINGS => 0.20; # percent
  4         5  
  4         305  
125              
126             our $HAVE_ZLIB;
127             BEGIN {
128 4     4   292 $HAVE_ZLIB = eval "use Compress::Zlib (); 1;";
  4     4   2791  
  4         223218  
  4         56  
129             }
130              
131             =head1 METHODS
132              
133             =head2 new %args
134              
135             Currently supported options:
136              
137             =over 4
138              
139             =item servers
140             =item namespace
141             =item debug
142             =item cv
143             =item compress_threshold
144             =item compress_enable
145             =item timeout
146             =item hasher
147              
148             If set, will use instance of this class for hashing instead of default.
149             For implementing your own hashing, see sources of L and L
150              
151             =item noreply
152              
153             If true, additional connection will established for noreply commands.
154              
155             =item cas
156              
157             If true, will enable cas/gets commands (since they are not suppotred in memcachedb)
158              
159             =back
160              
161             =cut
162              
163             sub new {
164 0     0 1   my $self = bless {}, shift;
165 0           my %args = @_;
166 0 0         $self->{namespace} = exists $args{namespace} ? delete $args{namespace} : '';
167 0           for (qw( debug cv compress_threshold compress_enable timeout noreply cas)) {
168 0 0         $self->{$_} = exists $args{$_} ? delete $args{$_} : 0;
169             }
170 0   0       $self->{timeout} ||= 3;
171 0   0       $self->{_bucker} = $args{bucker} || 'AnyEvent::Memcached::Buckets';
172 0   0       $self->{_hasher} = $args{hasher} || 'AnyEvent::Memcached::Hash';
173              
174 0           $self->set_servers(delete $args{servers});
175 0 0 0       $self->{compress_enable} and !$HAVE_ZLIB and carp("Have no Compress::Zlib installed, but have compress_enable option");
176 0 0         carp "@{[ keys %args ]} options are not supported yet" if %args;
  0            
177 0 0         Carp::confess "Invalid characters in 'namespace' option: '$self->{namespace}'" if $self->{namespace} =~ /[\x00-\x20\x7F]/;
178 0           $self;
179             }
180              
181             =head2 set_servers
182              
183             Setup server list
184              
185             =cut
186              
187             sub set_servers {
188 0     0 1   my $self = shift;
189 0           my $list = shift;
190 0           my $buckets = $self->{_bucker}->new(servers => $list);
191             #warn R::Dump($list, $buckets);
192 0           $self->{hash} = $self->{_hasher}->new(buckets => $buckets);
193             $self->{peers} =
194 0           my $peers = $buckets->peers;
195 0           for my $peer ( values %{ $peers } ) {
  0            
196             $peer->{con} = AnyEvent::Memcached::Peer->new(
197             port => $peer->{port},
198             host => $peer->{host},
199             timeout => $self->{timeout},
200             debug => $self->{debug},
201 0           );
202             # Noreply connection
203 0 0         if ($self->{noreply}) {
204             $peer->{nrc} = AnyEvent::Memcached::Peer->new(
205             port => $peer->{port},
206             host => $peer->{host},
207             timeout => $self->{timeout},
208             debug => $self->{debug},# || 1,
209 0           );
210             }
211             }
212 0           return $self;
213             }
214              
215             =head2 connect
216              
217             Establish connection to all servers and invoke event C, when ready
218              
219             =cut
220              
221             sub connect {
222 0     0 1   my $self = shift;
223             $_->{con}->connect
224 0           for values %{ $self->{peers} };
  0            
225             }
226              
227             sub _handle_errors {
228 0     0     my $self = shift;
229 0           my $peer = shift;
230 0           local $_ = shift;
231 0 0         if ($_ eq 'ERROR') {
    0          
232 0           warn "Error";
233             }
234             elsif (/(CLIENT|SERVER)_ERROR (.*)/) {
235 0           warn ucfirst(lc $1)." error: $2";
236             }
237             else {
238 0           warn "Bad response from $peer->{host}:$peer->{port}: $_";
239             }
240             }
241              
242             sub _do {
243 0     0     my $self = shift;
244 0 0 0       my $key = shift; utf8::decode($key) xor utf8::encode($key) if utf8::is_utf8($key);
  0            
245 0 0 0       my $command = shift; utf8::decode($command) xor utf8::encode($command) if utf8::is_utf8($command);
  0            
246 0           my $worker = shift; # CODE
247 0           my %args = @_;
248 0           my $servers = $self->{hash}->servers($key);
249 0           my %res;
250             my %err;
251 0           my $res;
252              
253 0 0         if ($key =~ /[\x00-\x20\x7F]/) {
254 0           carp "Invalid characters in key '$key'";
255 0 0         return $args{cb} ? $args{cb}(undef, "Invalid key") : 0;
256             }
257 0 0 0       if ($args{noreply} and !$self->{noreply}) {
258 0 0         if (!$args{cb}) {
259 0           carp "Noreply option not set, but noreply command requested. command ignored";
260 0           return 0;
261             } else {
262 0           carp "Noreply option not set, but noreply command requested. fallback to common command";
263             }
264 0           delete $args{noreply};
265             }
266 0 0         if ($args{noreply}) {
267 0           for my $srv ( keys %$servers ) {
268 0           for my $real (@{ $servers->{$srv} }) {
  0            
269 0           my $cmd = $command.' noreply';
270 0           substr($cmd, index($cmd,'%s'),2) = $real;
271 0           $self->{peers}{$srv}{nrc}->request($cmd);
272 0           $self->{peers}{$srv}{lastnr} = $cmd;
273 0 0         unless ($self->{peers}{$srv}{nrc}->handles('command')) {
274             $self->{peers}{$srv}{nrc}->reg_cb(command => sub { # cb {
275 0     0     shift;
276 0           warn "Got data from $srv noreply connection (while shouldn't): @_\nLast noreply command was $self->{peers}{$srv}{lastnr}\n";
277 0           });
278 0           $self->{peers}{$srv}{nrc}->want_command();
279             }
280             }
281             }
282 0 0         $args{cb}(1) if $args{cb};
283 0           return 1;
284             }
285 0   0       $_ and $_->begin for $self->{cv}, $args{cv};
286             my $cv = AE::cv {
287             #use Data::Dumper;
288             #warn Dumper $res,\%res,\%err;
289 0 0   0     if ($res != -1) {
    0          
290 0           $args{cb}($res);
291             }
292             elsif (!%err) {
293 0           warn "-1 while not err";
294 0           $args{cb}($res{$key});
295             }
296             else {
297 0           $args{cb}(undef, dumper($err{$key}));
298             }
299             #warn "cv end";
300            
301 0   0       $_ and $_->end for $args{cv}, $self->{cv};
302 0           };
303 0           $cv->begin;
304 0           for my $srv ( keys %$servers ) {
305 0           for my $real (@{ $servers->{$srv} }) {
  0            
306 0           $cv->begin;
307 0           my $cmd = $command;
308 0           substr($cmd, index($cmd,'%s'),2) = $real;
309             $self->{peers}{$srv}{con}->command(
310             $cmd,
311             cb => sub { # cb {
312 0 0   0     if (defined( local $_ = shift )) {
313 0           my ($ok,$fail) = $worker->($_);
314 0 0         if (defined $ok) {
315 0           $res{$real}{$srv} = $ok;
316 0 0 0       $res = (!defined $res ) || $res == $ok ? $ok : -1;
317             } else {
318 0           $err{$real}{$srv} = $fail;
319 0           $res = -1;
320             }
321             } else {
322 0           warn "do failed: @_/$!";
323 0           $err{$real}{$srv} = $_;
324 0           $res = -1;
325             }
326 0           $cv->end;
327             }
328 0           );
329             }
330             }
331 0           $cv->end;
332 0           return;
333             }
334              
335             sub _set {
336 0     0     my $self = shift;
337 0           my $cmd = shift;
338 0           my $key = shift;
339 0           my $cas;
340 0 0         if ($cmd eq 'cas') {
341 0           $cas = shift;
342             }
343 0           my $val = shift;
344 0           my %args = @_;
345 0 0         return $args{cb}(undef, "Readonly") if $self->{readonly};
346 0 0         if ($cas =~ /\D/) {
347 0           carp "Invalid characters in cas '$cas'";
348 0           return $args{cb}(undef, "Invalid cas");
349             }
350              
351             #warn "cv begin";
352              
353 4     4   35 use bytes; # return bytes from length()
  4         5  
  4         18  
354              
355 0 0         warn "value for memkey:$key is not defined" unless defined $val;
356 0           my $flags = 0;
357 0 0         if (ref $val) {
358 0           local $Carp::CarpLevel = 2;
359 0           $val = Storable::nfreeze($val);
360 0           $flags |= F_STORABLE;
361             }
362 0           my $len = length($val);
363              
364 0 0 0       if ( $self->{compress_threshold} and $HAVE_ZLIB
      0        
      0        
365             and $self->{compress_enable} and $len >= $self->{compress_threshold}) {
366              
367 0           my $c_val = Compress::Zlib::memGzip($val);
368 0           my $c_len = length($c_val);
369              
370             # do we want to keep it?
371 0 0         if ($c_len < $len*(1 - COMPRESS_SAVINGS)) {
372 0           $val = $c_val;
373 0           $len = $c_len;
374 0           $flags |= F_COMPRESS;
375             }
376             }
377              
378 0   0       my $expire = int($args{expire} || 0);
379             return $self->_do(
380             $key,
381             "$cmd $self->{namespace}%s $flags $expire $len".(defined $cas ? ' '.$cas : '')."\015\012$val",
382             sub { # cb {
383 0     0     local $_ = shift;
384 0 0         if ($_ eq 'STORED') { return 1 }
  0 0          
    0          
385 0           elsif ($_ eq 'NOT_STORED') { return 0 }
386 0           elsif ($_ eq 'EXISTS') { return 0 }
387 0           else { return undef, $_ }
388             },
389             cb => $args{cb},
390 0 0         );
391 0   0       $_ and $_->begin for $self->{cv}, $args{cv};
392 0           my $servers = $self->{hash}->servers($key);
393 0           my %res;
394             my %err;
395 0           my $res;
396             my $cv = AE::cv {
397 0 0   0     if ($res != -1) {
    0          
398 0           $args{cb}($res);
399             }
400             elsif (!%err) {
401 0           warn "-1 while not err";
402 0           $args{cb}($res{$key});
403             }
404             else {
405 0           $args{cb}(undef, dumper($err{$key}));
406             }
407 0           warn "cv end";
408            
409 0   0       $_ and $_->end for $args{cv}, $self->{cv};
410 0           };
411 0           for my $srv ( keys %$servers ) {
412             # ??? Can hasher return more than one key for single key passed?
413             # If no, need to remove this inner loop
414             #warn "server for $key = $srv, $self->{peers}{$srv}";
415 0           for my $real (@{ $servers->{$srv} }) {
  0            
416 0           $cv->begin;
417             $self->{peers}{$srv}{con}->command(
418             "$cmd $self->{namespace}$real $flags $expire $len\015\012$val",
419             cb => sub { # cb {
420 0 0   0     if (defined( local $_ = shift )) {
421 0 0         if ($_ eq 'STORED') {
    0          
    0          
422 0           $res{$real}{$srv} = 1;
423 0 0 0       $res = (!defined $res)||$res == 1 ? 1 : -1;
424             }
425             elsif ($_ eq 'NOT_STORED') {
426 0           $res{$real}{$srv} = 0;
427 0 0 0       $res = (!defined $res)&&$res == 0 ? 0 : -1;
428             }
429             elsif ($_ eq 'EXISTS') {
430 0           $res{$real}{$srv} = 0;
431 0 0 0       $res = (!defined $res)&&$res == 0 ? 0 : -1;
432             }
433             else {
434 0           $err{$real}{$srv} = $_;
435 0           $res = -1;
436             }
437             } else {
438 0           warn "set failed: @_/$!";
439             #$args{cb}(undef, @_);
440 0           $err{$real}{$srv} = $_;
441 0           $res = -1;
442             }
443 0           $cv->end;
444             }
445 0           );
446             }
447             }
448 0           return;
449             }
450              
451             =head2 set( $key, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
452              
453             Unconditionally sets a key to a given value in the memcache.
454              
455             C<$rc> is
456              
457             =over 4
458              
459             =item '1'
460              
461             Successfully stored
462              
463             =item '0'
464              
465             Item was not stored
466              
467             =item undef
468              
469             Error happens, see C<$err>
470              
471             =back
472              
473             =head2 cas( $key, $cas, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
474              
475             $memd->gets($key, cb => sub {
476             my $value = shift;
477             unless (@_) { # No errors
478             my ($cas,$val) = @$value;
479             # Change your value in $val
480             $memd->cas( $key, $cas, $value, cb => sub {
481             my $rc = shift;
482             if ($rc) {
483             # stored
484             } else {
485             # ...
486             }
487             });
488             }
489             })
490              
491             C<$rc> is the same, as for L
492              
493             Store the C<$value> on the server under the C<$key>, but only if CAS value associated with this key is equal to C<$cas>. See also L
494              
495             =head2 add( $key, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
496              
497             Like C, but only stores in memcache if the key doesn't already exist.
498              
499             =head2 replace( $key, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
500              
501             Like C, but only stores in memcache if the key already exists. The opposite of add.
502              
503             =head2 append( $key, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
504              
505             Append the $value to the current value on the server under the $key.
506              
507             B command first appeared in memcached 1.2.4.
508              
509             =head2 prepend( $key, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
510              
511             Prepend the $value to the current value on the server under the $key.
512              
513             B command first appeared in memcached 1.2.4.
514              
515             =cut
516              
517 0     0 1   sub set { shift->_set( set => @_) }
518             sub cas {
519 0     0 1   my $self = shift;
520 0 0         unless ($self->{cas}) { shift;shift;my %args = @_;return $args{cb}(undef, "CAS not enabled") }
  0            
  0            
  0            
  0            
521 0           $self->_set( cas => @_)
522             }
523 0     0 1   sub add { shift->_set( add => @_) }
524 0     0 1   sub replace { shift->_set( replace => @_) }
525 0     0 1   sub append { shift->_set( append => @_) }
526 0     0 1   sub prepend { shift->_set( prepend => @_) }
527              
528             =head2 get( $key, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
529              
530             Retrieve the value for a $key. $key should be a scalar
531              
532             =head2 get( $keys : ARRAYREF, [cv => $cv], [ expire => $expire ], cb => $cb->( $values_hash, $err ) )
533              
534             Retrieve the values for a $keys. Return a hash with keys/values
535              
536             =head2 gets( $key, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
537              
538             Retrieve the value and its CAS for a $key. $key should be a scalar.
539              
540             C<$rc> is a reference to an array [$cas, $value], or nothing for non-existent key
541              
542             =head2 gets( $keys : ARRAYREF, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
543              
544             Retrieve the values and their CAS for a $keys.
545              
546             C<$rc> is a hash reference with $rc->{$key} is a reference to an array [$cas, $value]
547              
548             =cut
549              
550             sub _deflate {
551 0     0     my $self = shift;
552 0           my $result = shift;
553 0 0         for (
    0          
554             ref $result eq 'ARRAY' ?
555 0           @$result ? @$result[ map { $_*2+1 } 0..int( $#$result / 2 ) ] : ()
556             : values %$result
557             ) {
558 0 0 0       if ($HAVE_ZLIB and $_->{flags} & F_COMPRESS) {
559 0           $_->{data} = Compress::Zlib::memGunzip($_->{data});
560             }
561 0 0         if ($_->{flags} & F_STORABLE) {
562 0 0         eval{ $_->{data} = Storable::thaw($_->{data}); 1 } or delete $_->{data};
  0            
  0            
563             }
564 0 0         if (exists $_->{cas}) {
565 0           $_ = [$_->{cas},$_->{data}];
566             } else {
567 0           $_ = $_->{data};
568             }
569             }
570 0           return;
571             }
572              
573             sub _get {
574 0     0     my $self = shift;
575 0           my $cmd = shift;
576 0           my $keys = shift;
577 0           my %args = @_;
578 0           my $array;
579 0 0 0       if (ref $keys and ref $keys eq 'ARRAY') {
580 0           $array = 1;
581             }
582 0 0         if (my ($key) = grep { /[\x00-\x20\x7F]/ } $array ? @$keys : $keys) {
  0 0          
583 0           carp "Invalid characters in key '$key'";
584 0 0         return $args{cb} ? $args{cb}(undef, "Invalid key") : 0;
585             }
586              
587 0   0       $_ and $_->begin for $self->{cv}, $args{cv};
588 0           my $servers = $self->{hash}->servers($keys, for => 'get');
589 0           my %res;
590             my $cv = AE::cv {
591 0     0     $self->_deflate(\%res);
592 0 0         $args{cb}( $array ? \%res : $res{ $keys } );
593 0   0       $_ and $_->end for $args{cv}, $self->{cv};
594 0           };
595 0           for my $srv ( keys %$servers ) {
596             #warn "server for $key = $srv, $self->{peers}{$srv}";
597 0           $cv->begin;
598 0           my $keys = join(' ',map "$self->{namespace}$_", @{ $servers->{$srv} });
  0            
599 0           $self->{peers}{$srv}{con}->request( "$cmd $keys" );
600             $self->{peers}{$srv}{con}->reader( id => $srv.'+'.$keys, res => \%res, namespace => $self->{namespace}, cb => sub { # cb {
601 0     0     $cv->end;
602 0           });
603             }
604 0           return;
605             }
606 0     0 1   sub get { shift->_get(get => @_) }
607             sub gets {
608 0     0 1   my $self = shift;
609 0 0         unless ($self->{cas}) { shift;my %args = @_;return $args{cb}(undef, "CAS not enabled") }
  0            
  0            
  0            
610 0           $self->_get(gets => @_)
611             }
612              
613             =head2 delete( $key, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) )
614              
615             Delete $key and its value from the cache.
616              
617             If C is true, cb doesn't required
618              
619             =head2 del
620              
621             Alias for "delete"
622              
623             =head2 remove
624              
625             Alias for "delete"
626              
627             =cut
628              
629             sub delete {
630 0     0 1   my $self = shift;
631 0           my ($cmd) = (caller(0))[3] =~ /([^:]+)$/;
632 0           my $key = shift;
633 0           my %args = @_;
634 0 0         return $args{cb}(undef, "Readonly") if $self->{readonly};
635 0 0         my $time = $args{delay} ? " $args{delay}" : '';
636             return $self->_do(
637             $key,
638             "delete $self->{namespace}%s$time",
639             sub { # cb {
640 0     0     local $_ = shift;
641 0 0         if ($_ eq 'DELETED') { return 1 }
  0 0          
642 0           elsif ($_ eq 'NOT_FOUND') { return 0 }
643 0           else { return undef, $_ }
644             },
645             cb => $args{cb},
646             noreply => $args{noreply},
647 0           );
648             }
649             *del = \&delete;
650             *remove = \&delete;
651              
652             =head2 incr( $key, $increment, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) )
653              
654             Increment the value for the $key by $delta. Starting with memcached 1.3.3 $key should be set to a number or the command will fail.
655             Note that the server doesn't check for overflow.
656              
657             If C is true, cb doesn't required, and if passed, simply called with rc = 1
658              
659             Similar to DBI, zero is returned as "0E0", and evaluates to true in a boolean context.
660              
661             =head2 decr( $key, $decrement, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) )
662              
663             Opposite to C
664              
665             =cut
666              
667             sub _delta {
668 0     0     my $self = shift;
669 0           my ($cmd) = (caller(1))[3] =~ /([^:]+)$/;
670 0           my $key = shift;
671 0           my $val = shift;
672 0           my %args = @_;
673 0 0         return $args{cb}(undef, "Readonly") if $self->{readonly};
674             return $self->_do(
675             $key,
676             "$cmd $self->{namespace}%s $val",
677             sub { # cb {
678 0     0     local $_ = shift;
679 0 0         if ($_ eq 'NOT_FOUND') { return 0 }
  0 0          
680 0 0         elsif (/^(\d+)$/) { return $1 eq '0' ? '0E0' : $_ }
681 0           else { return undef, $_ }
682             },
683             cb => $args{cb},
684             noreply => $args{noreply},
685 0           );
686             }
687 0     0 1   sub incr { shift->_delta(@_) }
688 0     0 1   sub decr { shift->_delta(@_) }
689              
690             #rget \r\n
691             #
692             #- where the query starts.
693             #- where the query ends.
694             #- indicates the openness of left side, 0 means the result includes , while 1 means not.
695             #- indicates the openness of right side, 0 means the result includes , while 1 means not.
696             #- how many items at most return, max is 100.
697              
698             # rget ($from,$till, '+left' => 1, '+right' => 0, max => 10, cb => sub { ... } );
699              
700             =head2 rget( $from, $till, [ max => 100 ], [ '+left' => 1 ], [ '+right' => 1 ], [cv => $cv], [ rv => 'array' ], cb => $cb->( $rc, $err ) )
701              
702             Memcachedb 1.2.1-beta implements rget method, that allows to look through the whole storage
703              
704             =over 4
705              
706             =item $from
707              
708             the starting key
709              
710             =item $till
711              
712             finishing key
713              
714             =item +left
715              
716             If true, then starting key will be included in results. true by default
717              
718             =item +right
719              
720             If true, then finishing key will be included in results. true by default
721              
722             =item max
723              
724             Maximum number of results to fetch. 100 is the maximum and is the default
725              
726             =item rv
727              
728             If passed rv => 'array', then the return value will be arrayref with values in order, returned by memcachedb.
729              
730             =back
731              
732             =cut
733              
734             sub rget {
735 0     0 1   my $self = shift;
736             #my ($cmd) = (caller(0))[3] =~ /([^:]+)$/;
737 0           my $cmd = 'rget';
738 0           my $from = shift;
739 0           my $till = shift;
740 0           my %args = @_;
741 0           my ($lkey,$rkey);
742             #$lkey = ( exists $args{'+left'} && !$args{'+left'} ) ? 1 : 0;
743 0 0         $lkey = exists $args{'+left'} ? $args{'+left'} ? 0 : 1 : 0;
    0          
744 0 0         $rkey = exists $args{'+right'} ? $args{'+right'} ? 0 : 1 : 0;
    0          
745 0   0       $args{max} ||= 100;
746              
747 0           my $result;
748 0 0         if (lc $args{rv} eq 'array') {
749 0           $result = [];
750             } else {
751 0           $result = {};
752             }
753 0           my $err;
754 0           my $cv = AnyEvent->condvar;
755 0   0       $_ and $_->begin for $self->{cv}, $args{cv};
756             $cv->begin(sub {
757 0     0     undef $cv;
758 0           $self->_deflate($result);
759 0 0         $args{cb}( $err ? (undef,$err) : $result );
760 0           undef $result;
761 0   0       $_ and $_->end for $args{cv}, $self->{cv};
762 0           });
763              
764 0           for my $peer (keys %{$self->{peers}}) {
  0            
765 0           $cv->begin;
766 0           my $do;$do = sub {
767 0     0     undef $do;
768 0           $self->{peers}{$peer}{con}->request( "$cmd $self->{namespace}$from $self->{namespace}$till $lkey $rkey $args{max}" );
769             $self->{peers}{$peer}{con}->reader( id => $peer, res => $result, namespace => $self->{namespace}, cb => sub {
770             #warn "rget from: $peer";
771 0           $cv->end;
772 0           });
773 0           };
774 0 0         if (exists $self->{peers}{$peer}{rget_ok}) {
775 0 0         if ($self->{peers}{$peer}{rget_ok}) {
776 0           $do->();
777             } else {
778             #warn
779 0           $err = "rget not supported on peer $peer";
780 0           $cv->end;
781             }
782             } else {
783             $self->{peers}{$peer}{con}->command( "$cmd 1 0 0 0 1", cb => sub {
784 0     0     local $_ = shift;
785 0 0         if (defined $_) {
786 0 0         if ($_ eq 'END') {
787 0           $self->{peers}{$peer}{rget_ok} = 1;
788 0           $do->();
789             }
790             else {
791             #warn
792 0           $err = "rget not supported on peer $peer: @_";
793 0           $self->{peers}{$peer}{rget_ok} = 0;
794 0           undef $do;
795 0           $cv->end;
796             }
797             } else {
798 0           $err = "@_";
799 0           undef $do;
800 0           $cv->end;
801             }
802 0           } );
803            
804             }
805             }
806 0           $cv->end;
807 0           return;
808             }
809              
810             =head2 incadd ( $key, $increment, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) )
811              
812             Increment key, and if it not exists, add it with initial value. If add fails, try again to incr or fail
813              
814             =cut
815              
816             sub incadd {
817 0     0 1   my $self = shift;
818 0           my $key = shift;
819 0           my $val = shift;
820 0           my %args = @_;
821             $self->incr($key => $val, cb => sub {
822 0 0 0 0     if (my $rc = shift or @_) {
823             #if (@_) {
824             # warn("incr failed: @_");
825             #} else {
826             # warn "incr ok";
827             #}
828 0           $args{cb}($rc, @_);
829             }
830             else {
831             $self->add( $key, $val, %args, cb => sub {
832 0 0 0       if ( my $rc = shift or @_ ) {
833             #if (@_) {
834             # warn("add failed: @_");
835             #} else {
836             # warn "add ok";
837             #}
838 0           $args{cb}($val, @_);
839             }
840             else {
841             #warn "add failed, try again";
842 0           $self->incadd($key,$val,%args);
843             }
844 0           });
845             }
846 0           });
847             }
848              
849             =head2 destroy
850              
851             Shutdown object as much, as possible, incl cleaning of incapsulated objects
852              
853             =cut
854              
855       0     sub AnyEvent::Memcached::destroyed::AUTOLOAD {}
856              
857             sub destroy {
858 0     0 1   my $self = shift;
859 0           $self->DESTROY;
860 0           bless $self, "AnyEvent::Memcached::destroyed";
861             }
862              
863             sub DESTROY {
864 0     0     my $self = shift;
865 0 0         warn "(".int($self).") Destroying AE:MC" if $self->{debug};
866 0           for (values %{$self->{peers}}) {
  0            
867 0 0         $_->{con} and $_->{con}->destroy;
868             }
869 0           %$self = ();
870             }
871              
872             =head1 BUGS
873              
874             Feature requests are welcome
875              
876             Bug reports are welcome
877              
878             =head1 AUTHOR
879              
880             Mons Anderson, C<< >>
881              
882             =head1 COPYRIGHT & LICENSE
883              
884             Copyright 2009 Mons Anderson, all rights reserved.
885              
886             This program is free software; you can redistribute it and/or modify it
887             under the same terms as Perl itself.
888              
889             =cut
890              
891             1; # End of AnyEvent::Memcached