File Coverage

lib/AnyEvent/Memcached.pm
Criterion Covered Total %
statement 55 364 15.1
branch 0 140 0.0
condition 0 81 0.0
subroutine 19 58 32.7
pod 17 17 100.0
total 91 660 13.7


line stmt bran cond sub pod time code
1             package AnyEvent::Memcached;
2              
3 4     4   291549 use 5.8.8;
  4         19  
  4         357  
4              
5             =head1 NAME
6              
7             AnyEvent::Memcached - AnyEvent memcached client
8              
9             =cut
10              
11             our $VERSION = '0.06';
12              
13             =head1 SYNOPSIS
14              
15             use AnyEvent::Memcached;
16              
17             my $memd = AnyEvent::Memcached->new(
18             servers => [ "10.0.0.15:11211", "10.0.0.15:11212" ], # same as in Cache::Memcached
19             debug => 1,
20             compress_threshold => 10000,
21             namespace => 'my-namespace:',
22            
23             # May use another hashing algo:
24             hasher => 'AnyEvent::Memcached::Hash::WithNext',
25              
26             cv => $cv, # AnyEvent->condvar: group callback
27             );
28            
29             $memd->set_servers([ "10.0.0.15:11211", "10.0.0.15:11212" ]);
30            
31             # Basic methods are like in Cache::Memcached, but with additional cb => sub { ... };
32             # first argument to cb is return value, second is the error(s)
33            
34             $memd->set( key => $value, cb => sub {
35             shift or warn "Set failed: @_"
36             } );
37              
38             # Single get
39             $memd->get( 'key', cb => sub {
40             my ($value,$err) = shift;
41             $err and return warn "Get failed: @_";
42             warn "Value for key is $value";
43             } );
44              
45             # Multi-get
46             $memd->get( [ 'key1', 'key2' ], cb => sub {
47             my ($values,$err) = shift;
48             $err and return warn "Get failed: @_";
49             warn "Value for key1 is $values->{key1} and value for key2 is $values->{key2}"
50             } );
51              
52             # Additionally there is rget (see memcachedb-1.2.1-beta)
53              
54             $memd->rget( 'fromkey', 'tokey', cb => sub {
55             my ($values,$err) = shift;
56             $err and warn "Get failed: @_";
57             while (my ($key,$value) = each %$values) {
58             # ...
59             }
60             } );
61            
62             # Rget with sorted responce values
63             $memd->rget( 'fromkey', 'tokey', rv => 'array' cb => sub {
64             my ($values,$err) = shift;
65             $err and warn "Get failed: @_";
66             for (0 .. $#values/2) {
67             my ($key,$value) = @$values[$_*2,$_*2+1];
68             }
69             } );
70              
71             =head1 DESCRIPTION
72              
73             Asyncronous C client for L framework
74              
75             =head1 NOTICE
76              
77             There is a notices in L related to this module. They all has been fixed
78              
79             =over 4
80              
81             =item Prerequisites
82              
83             We no longer need L and L. At all, the dependency list is like in L + L
84              
85             =item Binary protocol
86              
87             It seems to me, that usage of binary protocol from pure perl gives very little advantage. So for now I don't implement it
88              
89             =item Unimplemented Methods
90              
91             There is a note, that get_multi is not implementeted. In fact, it was implemented by method L, but the documentation was wrong.
92              
93             =back
94              
95             In general, this module follows the spirit of L rather than correspondence to L interface.
96              
97             =cut
98              
99 4     4   3841 use common::sense 2;m{
  4         117  
  4         29  
100             use strict;
101             use warnings;
102             }x;
103              
104 4     4   330 use Carp;
  4         13  
  4         284  
105 4     4   1743 use AnyEvent 5;
  4         6977  
  4         133  
106             #use Devel::Leak::Cb;
107              
108 4     4   1224 use AnyEvent::Socket;
  4         35140  
  4         468  
109 4     4   5253 use AnyEvent::Handle;
  4         34671  
  4         161  
110 4     4   4133 use AnyEvent::Connection;
  4         74738  
  4         175  
111 4     4   51 use AnyEvent::Connection::Util;
  4         10  
  4         21  
112 4     4   2567 use AnyEvent::Memcached::Conn;
  4         9  
  4         115  
113 4     4   4468 use Storable ();
  4         15566  
  4         114  
114              
115 4     4   2521 use AnyEvent::Memcached::Peer;
  4         10  
  4         117  
116 4     4   1948 use AnyEvent::Memcached::Hash;
  4         12  
  4         155  
117 4     4   2673 use AnyEvent::Memcached::Buckets;
  4         10  
  4         144  
118              
119             # flag definitions
120 4     4   30 use constant F_STORABLE => 1;
  4         7  
  4         345  
121 4     4   23 use constant F_COMPRESS => 2;
  4         8  
  4         291  
122              
123             # size savings required before saving compressed value
124 4     4   45 use constant COMPRESS_SAVINGS => 0.20; # percent
  4         9  
  4         367  
125              
126             our $HAVE_ZLIB;
127             BEGIN {
128 4     4   377 $HAVE_ZLIB = eval "use Compress::Zlib (); 1;";
  4     4   10093  
  4         412490  
  4         64  
129             }
130              
131             =head1 METHODS
132              
133             =head2 new %args
134              
135             Currently supported options:
136              
137             =over 4
138              
139             =item servers
140             =item namespace
141             =item debug
142             =item cv
143             =item compress_threshold
144             =item compress_enable
145             =item timeout
146             =item hasher
147              
148             If set, will use instance of this class for hashing instead of default.
149             For implementing your own hashing, see sources of L and L
150              
151             =item noreply
152              
153             If true, additional connection will established for noreply commands.
154              
155             =item cas
156              
157             If true, will enable cas/gets commands (since they are not suppotred in memcachedb)
158              
159             =back
160              
161             =cut
162              
163             sub new {
164 0     0 1   my $self = bless {}, shift;
165 0           my %args = @_;
166 0 0         $self->{namespace} = exists $args{namespace} ? delete $args{namespace} : '';
167 0           for (qw( debug cv compress_threshold compress_enable timeout noreply cas)) {
168 0 0         $self->{$_} = exists $args{$_} ? delete $args{$_} : 0;
169             }
170 0   0       $self->{timeout} ||= 3;
171 0   0       $self->{_bucker} = $args{bucker} || 'AnyEvent::Memcached::Buckets';
172 0   0       $self->{_hasher} = $args{hasher} || 'AnyEvent::Memcached::Hash';
173              
174 0           $self->set_servers(delete $args{servers});
175 0 0 0       $self->{compress_enable} and !$HAVE_ZLIB and Carp::carp("Have no Compress::Zlib installed, but have compress_enable option");
176 0 0         require Carp; Carp::carp "@{[ keys %args ]} options are not supported yet" if %args;
  0            
  0            
177 0           $self;
178             }
179              
180             =head2 set_servers
181              
182             Setup server list
183              
184             =cut
185              
186             sub set_servers {
187 0     0 1   my $self = shift;
188 0           my $list = shift;
189 0           my $buckets = $self->{_bucker}->new(servers => $list);
190             #warn R::Dump($list, $buckets);
191 0           $self->{hash} = $self->{_hasher}->new(buckets => $buckets);
192 0           $self->{peers} =
193             my $peers = $buckets->peers;
194 0           for my $peer ( values %{ $peers } ) {
  0            
195 0           $peer->{con} = AnyEvent::Memcached::Peer->new(
196             port => $peer->{port},
197             host => $peer->{host},
198             timeout => $self->{timeout},
199             debug => $self->{debug},
200             );
201             # Noreply connection
202 0 0         if ($self->{noreply}) {
203 0           $peer->{nrc} = AnyEvent::Memcached::Peer->new(
204             port => $peer->{port},
205             host => $peer->{host},
206             timeout => $self->{timeout},
207             debug => $self->{debug},# || 1,
208             );
209             }
210             }
211 0           return $self;
212             }
213              
214             =head2 connect
215              
216             Establish connection to all servers and invoke event C, when ready
217              
218             =cut
219              
220             sub connect {
221 0     0 1   my $self = shift;
222 0           $_->{con}->connect
223 0           for values %{ $self->{peers} };
224             }
225              
226             sub _handle_errors {
227 0     0     my $self = shift;
228 0           my $peer = shift;
229 0           local $_ = shift;
230 0 0         if ($_ eq 'ERROR') {
    0          
231 0           warn "Error";
232             }
233             elsif (/(CLIENT|SERVER)_ERROR (.*)/) {
234 0           warn ucfirst(lc $1)." error: $2";
235             }
236             else {
237 0           warn "Bad response from $peer->{host}:$peer->{port}: $_";
238             }
239             }
240              
241             sub _do {
242 0     0     my $self = shift;
243 0 0 0       my $key = shift; utf8::decode($key) xor utf8::encode($key) if utf8::is_utf8($key);
  0            
244 0 0 0       my $command = shift; utf8::decode($command) xor utf8::encode($command) if utf8::is_utf8($command);
  0            
245 0           my $worker = shift; # CODE
246 0           my %args = @_;
247 0           my $servers = $self->{hash}->servers($key);
248 0           my %res;
249             my %err;
250 0           my $res;
251 0 0 0       if ($args{noreply} and !$self->{noreply}) {
252 0 0         if (!$args{cb}) {
253 0           carp "Noreply option not set, but noreply command requested. command ignored";
254 0           return 0;
255             } else {
256 0           carp "Noreply option not set, but noreply command requested. fallback to common command";
257             }
258 0           delete $args{noreply};
259             }
260 0 0         if ($args{noreply}) {
261 0           for my $srv ( keys %$servers ) {
262 0           for my $real (@{ $servers->{$srv} }) {
  0            
263 0           my $cmd = $command.' noreply';
264 0           substr($cmd, index($cmd,'%s'),2) = $real;
265 0           $self->{peers}{$srv}{nrc}->request($cmd);
266 0           $self->{peers}{$srv}{lastnr} = $cmd;
267 0 0         unless ($self->{peers}{$srv}{nrc}->handles('command')) {
268             $self->{peers}{$srv}{nrc}->reg_cb(command => sub { # cb {
269 0     0     shift;
270 0           warn "Got data from $srv noreply connection (while shouldn't): @_\nLast noreply command was $self->{peers}{$srv}{lastnr}\n";
271 0           });
272 0           $self->{peers}{$srv}{nrc}->want_command();
273             }
274             }
275             }
276 0 0         $args{cb}(1) if $args{cb};
277 0           return 1;
278             }
279 0   0       $_ and $_->begin for $self->{cv}, $args{cv};
280             my $cv = AE::cv {
281             #use Data::Dumper;
282             #warn Dumper $res,\%res,\%err;
283 0 0   0     if ($res != -1) {
    0          
284 0           $args{cb}($res);
285             }
286             elsif (!%err) {
287 0           warn "-1 while not err";
288 0           $args{cb}($res{$key});
289             }
290             else {
291 0           $args{cb}(undef, dumper($err{$key}));
292             }
293             #warn "cv end";
294            
295 0   0       $_ and $_->end for $args{cv}, $self->{cv};
296 0           };
297 0           for my $srv ( keys %$servers ) {
298 0           for my $real (@{ $servers->{$srv} }) {
  0            
299 0           $cv->begin;
300 0           my $cmd = $command;
301 0           substr($cmd, index($cmd,'%s'),2) = $real;
302             $self->{peers}{$srv}{con}->command(
303             $cmd,
304             cb => sub { # cb {
305 0 0   0     if (defined( local $_ = shift )) {
306 0           my ($ok,$fail) = $worker->($_);
307 0 0         if (defined $ok) {
308 0           $res{$real}{$srv} = $ok;
309 0 0 0       $res = (!defined $res ) || $res == $ok ? $ok : -1;
310             } else {
311 0           $err{$real}{$srv} = $fail;
312 0           $res = -1;
313             }
314             } else {
315 0           warn "do failed: @_/$!";
316 0           $err{$real}{$srv} = $_;
317 0           $res = -1;
318             }
319 0           $cv->end;
320             }
321 0           );
322             }
323             }
324 0           return;
325             }
326              
327             sub _set {
328 0     0     my $self = shift;
329 0           my $cmd = shift;
330 0           my $key = shift;
331 0           my $cas;
332 0 0         if ($cmd eq 'cas') {
333 0           $cas = shift;
334             }
335 0           my $val = shift;
336 0           my %args = @_;
337 0 0         return $args{cb}(undef, "Readonly") if $self->{readonly};
338             #warn "cv begin";
339              
340 4     4   46 use bytes; # return bytes from length()
  4         9  
  4         31  
341              
342 0 0         warn "value for memkey:$key is not defined" unless defined $val;
343 0           my $flags = 0;
344 0 0         if (ref $val) {
345 0           local $Carp::CarpLevel = 2;
346 0           $val = Storable::nfreeze($val);
347 0           $flags |= F_STORABLE;
348             }
349 0           my $len = length($val);
350              
351 0 0 0       if ( $self->{compress_threshold} and $HAVE_ZLIB
      0        
      0        
352             and $self->{compress_enable} and $len >= $self->{compress_threshold}) {
353              
354 0           my $c_val = Compress::Zlib::memGzip($val);
355 0           my $c_len = length($c_val);
356              
357             # do we want to keep it?
358 0 0         if ($c_len < $len*(1 - COMPRESS_SAVINGS)) {
359 0           $val = $c_val;
360 0           $len = $c_len;
361 0           $flags |= F_COMPRESS;
362             }
363             }
364              
365 0   0       my $expire = int($args{expire} || 0);
366             return $self->_do(
367             $key,
368             "$cmd $self->{namespace}%s $flags $expire $len".(defined $cas ? ' '.$cas : '')."\015\012$val",
369             sub { # cb {
370 0     0     local $_ = shift;
371 0 0         if ($_ eq 'STORED') { return 1 }
  0 0          
    0          
372 0           elsif ($_ eq 'NOT_STORED') { return 0 }
373 0           elsif ($_ eq 'EXISTS') { return 0 }
374 0           else { return undef, $_ }
375             },
376 0 0         cb => $args{cb},
377             );
378 0   0       $_ and $_->begin for $self->{cv}, $args{cv};
379 0           my $servers = $self->{hash}->servers($key);
380 0           my %res;
381             my %err;
382 0           my $res;
383             my $cv = AE::cv {
384 0 0   0     if ($res != -1) {
    0          
385 0           $args{cb}($res);
386             }
387             elsif (!%err) {
388 0           warn "-1 while not err";
389 0           $args{cb}($res{$key});
390             }
391             else {
392 0           $args{cb}(undef, dumper($err{$key}));
393             }
394 0           warn "cv end";
395            
396 0   0       $_ and $_->end for $args{cv}, $self->{cv};
397 0           };
398 0           for my $srv ( keys %$servers ) {
399             # ??? Can hasher return more than one key for single key passed?
400             # If no, need to remove this inner loop
401             #warn "server for $key = $srv, $self->{peers}{$srv}";
402 0           for my $real (@{ $servers->{$srv} }) {
  0            
403 0           $cv->begin;
404             $self->{peers}{$srv}{con}->command(
405             "$cmd $self->{namespace}$real $flags $expire $len\015\012$val",
406             cb => sub { # cb {
407 0 0   0     if (defined( local $_ = shift )) {
408 0 0         if ($_ eq 'STORED') {
    0          
    0          
409 0           $res{$real}{$srv} = 1;
410 0 0 0       $res = (!defined $res)||$res == 1 ? 1 : -1;
411             }
412             elsif ($_ eq 'NOT_STORED') {
413 0           $res{$real}{$srv} = 0;
414 0 0 0       $res = (!defined $res)&&$res == 0 ? 0 : -1;
415             }
416             elsif ($_ eq 'EXISTS') {
417 0           $res{$real}{$srv} = 0;
418 0 0 0       $res = (!defined $res)&&$res == 0 ? 0 : -1;
419             }
420             else {
421 0           $err{$real}{$srv} = $_;
422 0           $res = -1;
423             }
424             } else {
425 0           warn "set failed: @_/$!";
426             #$args{cb}(undef, @_);
427 0           $err{$real}{$srv} = $_;
428 0           $res = -1;
429             }
430 0           $cv->end;
431             }
432 0           );
433             }
434             }
435 0           return;
436             }
437              
438             =head2 set( $key, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
439              
440             Unconditionally sets a key to a given value in the memcache.
441              
442             C<$rc> is
443              
444             =over 4
445              
446             =item '1'
447              
448             Successfully stored
449              
450             =item '0'
451              
452             Item was not stored
453              
454             =item undef
455              
456             Error happens, see C<$err>
457              
458             =back
459              
460             =head2 cas( $key, $cas, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
461              
462             $memd->gets($key, cb => sub {
463             my $value = shift;
464             unless (@_) { # No errors
465             my ($cas,$val) = @$value;
466             # Change your value in $val
467             $memd->cas( $key, $cas, $value, cb => sub {
468             my $rc = shift;
469             if ($rc) {
470             # stored
471             } else {
472             # ...
473             }
474             });
475             }
476             })
477              
478             C<$rc> is the same, as for L
479              
480             Store the C<$value> on the server under the C<$key>, but only if CAS value associated with this key is equal to C<$cas>. See also L
481              
482             =head2 add( $key, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
483              
484             Like C, but only stores in memcache if the key doesn't already exist.
485              
486             =head2 replace( $key, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
487              
488             Like C, but only stores in memcache if the key already exists. The opposite of add.
489              
490             =head2 append( $key, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
491              
492             Append the $value to the current value on the server under the $key.
493              
494             B command first appeared in memcached 1.2.4.
495              
496             =head2 prepend( $key, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
497              
498             Prepend the $value to the current value on the server under the $key.
499              
500             B command first appeared in memcached 1.2.4.
501              
502             =cut
503              
504 0     0 1   sub set { shift->_set( set => @_) }
505             sub cas {
506 0     0 1   my $self = shift;
507 0 0         unless ($self->{cas}) { shift;shift;my %args = @_;return $args{cb}(undef, "CAS not enabled") }
  0            
  0            
  0            
  0            
508 0           $self->_set( cas => @_)
509             }
510 0     0 1   sub add { shift->_set( add => @_) }
511 0     0 1   sub replace { shift->_set( replace => @_) }
512 0     0 1   sub append { shift->_set( append => @_) }
513 0     0 1   sub prepend { shift->_set( prepend => @_) }
514              
515             =head2 get( $key, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
516              
517             Retrieve the value for a $key. $key should be a scalar
518              
519             =head2 get( $keys : ARRAYREF, [cv => $cv], [ expire => $expire ], cb => $cb->( $values_hash, $err ) )
520              
521             Retrieve the values for a $keys. Return a hash with keys/values
522              
523             =head2 gets( $key, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
524              
525             Retrieve the value and its CAS for a $key. $key should be a scalar.
526              
527             C<$rc> is a reference to an array [$cas, $value], or nothing for non-existent key
528              
529             =head2 gets( $keys : ARRAYREF, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
530              
531             Retrieve the values and their CAS for a $keys.
532              
533             C<$rc> is a hash reference with $rc->{$key} is a reference to an array [$cas, $value]
534              
535             =cut
536              
537             sub _deflate {
538 0     0     my $self = shift;
539 0           my $result = shift;
540 0 0         for (
    0          
541 0           ref $result eq 'ARRAY' ?
542             @$result ? @$result[ map { $_*2+1 } 0..int( $#$result / 2 ) ] : ()
543             : values %$result
544             ) {
545 0 0 0       if ($HAVE_ZLIB and $_->{flags} & F_COMPRESS) {
546 0           $_->{data} = Compress::Zlib::memGunzip($_->{data});
547             }
548 0 0         if ($_->{flags} & F_STORABLE) {
549 0 0         eval{ $_->{data} = Storable::thaw($_->{data}); 1 } or delete $_->{data};
  0            
  0            
550             }
551 0 0         if (exists $_->{cas}) {
552 0           $_ = [$_->{cas},$_->{data}];
553             } else {
554 0           $_ = $_->{data};
555             }
556             }
557 0           return;
558             }
559              
560             sub _get {
561 0     0     my $self = shift;
562 0           my $cmd = shift;
563 0           my $keys = shift;
564 0           my %args = @_;
565 0           my $array;
566 0 0 0       if (ref $keys and ref $keys eq 'ARRAY') {
567 0           $array = 1;
568             }
569            
570 0   0       $_ and $_->begin for $self->{cv}, $args{cv};
571 0           my $servers = $self->{hash}->servers($keys, for => 'get');
572 0           my %res;
573             my $cv = AE::cv {
574 0     0     $self->_deflate(\%res);
575 0 0         $args{cb}( $array ? \%res : $res{ $keys } );
576 0   0       $_ and $_->end for $args{cv}, $self->{cv};
577 0           };
578 0           for my $srv ( keys %$servers ) {
579             #warn "server for $key = $srv, $self->{peers}{$srv}";
580 0           $cv->begin;
581 0           my $keys = join(' ',map "$self->{namespace}$_", @{ $servers->{$srv} });
  0            
582 0           $self->{peers}{$srv}{con}->request( "$cmd $keys" );
583             $self->{peers}{$srv}{con}->reader( id => $srv.'+'.$keys, res => \%res, namespace => $self->{namespace}, cb => sub { # cb {
584 0     0     $cv->end;
585 0           });
586             }
587 0           return;
588             }
589 0     0 1   sub get { shift->_get(get => @_) }
590             sub gets {
591 0     0 1   my $self = shift;
592 0 0         unless ($self->{cas}) { shift;my %args = @_;return $args{cb}(undef, "CAS not enabled") }
  0            
  0            
  0            
593 0           $self->_get(gets => @_)
594             }
595              
596             =head2 delete( $key, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) )
597              
598             Delete $key and its value from the cache.
599              
600             If C is true, cb doesn't required
601              
602             =head2 del
603              
604             Alias for "delete"
605              
606             =head2 remove
607              
608             Alias for "delete"
609              
610             =cut
611              
612             sub delete {
613 0     0 1   my $self = shift;
614 0           my ($cmd) = (caller(0))[3] =~ /([^:]+)$/;
615 0           my $key = shift;
616 0           my %args = @_;
617 0 0         return $args{cb}(undef, "Readonly") if $self->{readonly};
618 0 0         my $time = $args{delay} ? " $args{delay}" : '';
619             return $self->_do(
620             $key,
621             "delete $self->{namespace}%s$time",
622             sub { # cb {
623 0     0     local $_ = shift;
624 0 0         if ($_ eq 'DELETED') { return 1 }
  0 0          
625 0           elsif ($_ eq 'NOT_FOUND') { return 0 }
626 0           else { return undef, $_ }
627             },
628 0           cb => $args{cb},
629             noreply => $args{noreply},
630             );
631             }
632             *del = \&delete;
633             *remove = \&delete;
634              
635             =head2 incr( $key, $increment, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) )
636              
637             Increment the value for the $key by $delta. Starting with memcached 1.3.3 $key should be set to a number or the command will fail.
638             Note that the server doesn't check for overflow.
639              
640             If C is true, cb doesn't required, and if passed, simply called with rc = 1
641              
642             Similar to DBI, zero is returned as "0E0", and evaluates to true in a boolean context.
643              
644             =head2 decr( $key, $decrement, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) )
645              
646             Opposite to C
647              
648             =cut
649              
650             sub _delta {
651 0     0     my $self = shift;
652 0           my ($cmd) = (caller(1))[3] =~ /([^:]+)$/;
653 0           my $key = shift;
654 0           my $val = shift;
655 0           my %args = @_;
656 0 0         return $args{cb}(undef, "Readonly") if $self->{readonly};
657             return $self->_do(
658             $key,
659             "$cmd $self->{namespace}%s $val",
660             sub { # cb {
661 0     0     local $_ = shift;
662 0 0         if ($_ eq 'NOT_FOUND') { return 0 }
  0 0          
663 0 0         elsif (/^(\d+)$/) { return $1 eq '0' ? '0E0' : $_ }
664 0           else { return undef, $_ }
665             },
666 0           cb => $args{cb},
667             noreply => $args{noreply},
668             );
669             }
670 0     0 1   sub incr { shift->_delta(@_) }
671 0     0 1   sub decr { shift->_delta(@_) }
672              
673             #rget \r\n
674             #
675             #- where the query starts.
676             #- where the query ends.
677             #- indicates the openness of left side, 0 means the result includes , while 1 means not.
678             #- indicates the openness of right side, 0 means the result includes , while 1 means not.
679             #- how many items at most return, max is 100.
680              
681             # rget ($from,$till, '+left' => 1, '+right' => 0, max => 10, cb => sub { ... } );
682              
683             =head2 rget( $from, $till, [ max => 100 ], [ '+left' => 1 ], [ '+right' => 1 ], [cv => $cv], [ rv => 'array' ], cb => $cb->( $rc, $err ) )
684              
685             Memcachedb 1.2.1-beta implements rget method, that allows to look through the whole storage
686              
687             =over 4
688              
689             =item $from
690              
691             the starting key
692              
693             =item $till
694              
695             finishing key
696              
697             =item +left
698              
699             If true, then starting key will be included in results. true by default
700              
701             =item +right
702              
703             If true, then finishing key will be included in results. true by default
704              
705             =item max
706              
707             Maximum number of results to fetch. 100 is the maximum and is the default
708              
709             =item rv
710              
711             If passed rv => 'array', then the return value will be arrayref with values in order, returned by memcachedb.
712              
713             =back
714              
715             =cut
716              
717             sub rget {
718 0     0 1   my $self = shift;
719             #my ($cmd) = (caller(0))[3] =~ /([^:]+)$/;
720 0           my $cmd = 'rget';
721 0           my $from = shift;
722 0           my $till = shift;
723 0           my %args = @_;
724 0           my ($lkey,$rkey);
725             #$lkey = ( exists $args{'+left'} && !$args{'+left'} ) ? 1 : 0;
726 0 0         $lkey = exists $args{'+left'} ? $args{'+left'} ? 0 : 1 : 0;
    0          
727 0 0         $rkey = exists $args{'+right'} ? $args{'+right'} ? 0 : 1 : 0;
    0          
728 0   0       $args{max} ||= 100;
729              
730 0           my $result;
731 0 0         if (lc $args{rv} eq 'array') {
732 0           $result = [];
733             } else {
734 0           $result = {};
735             }
736 0           my $err;
737 0           my $cv = AnyEvent->condvar;
738 0   0       $_ and $_->begin for $self->{cv}, $args{cv};
739             $cv->begin(sub {
740 0     0     undef $cv;
741 0           $self->_deflate($result);
742 0 0         $args{cb}( $err ? (undef,$err) : $result );
743 0           undef $result;
744 0   0       $_ and $_->end for $args{cv}, $self->{cv};
745 0           });
746              
747 0           for my $peer (keys %{$self->{peers}}) {
  0            
748 0           $cv->begin;
749 0           my $do;$do = sub {
750 0     0     undef $do;
751 0           $self->{peers}{$peer}{con}->request( "$cmd $self->{namespace}$from $self->{namespace}$till $lkey $rkey $args{max}" );
752             $self->{peers}{$peer}{con}->reader( id => $peer, res => $result, namespace => $self->{namespace}, cb => sub {
753             #warn "rget from: $peer";
754 0           $cv->end;
755 0           });
756 0           };
757 0 0         if (exists $self->{peers}{$peer}{rget_ok}) {
758 0 0         if ($self->{peers}{$peer}{rget_ok}) {
759 0           $do->();
760             } else {
761             #warn
762 0           $err = "rget not supported on peer $peer";
763 0           $cv->end;
764             }
765             } else {
766             $self->{peers}{$peer}{con}->command( "$cmd 1 0 0 0 1", cb => sub {
767 0     0     local $_ = shift;
768 0 0         if (defined $_) {
769 0 0         if ($_ eq 'END') {
770 0           $self->{peers}{$peer}{rget_ok} = 1;
771 0           $do->();
772             }
773             else {
774             #warn
775 0           $err = "rget not supported on peer $peer: @_";
776 0           $self->{peers}{$peer}{rget_ok} = 0;
777 0           undef $do;
778 0           $cv->end;
779             }
780             } else {
781 0           $err = "@_";
782 0           undef $do;
783 0           $cv->end;
784             }
785 0           } );
786            
787             }
788             }
789 0           $cv->end;
790 0           return;
791             }
792              
793             =head2 incadd ( $key, $increment, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) )
794              
795             Increment key, and if it not exists, add it with initial value. If add fails, try again to incr or fail
796              
797             =cut
798              
799             sub incadd {
800 0     0 1   my $self = shift;
801 0           my $key = shift;
802 0           my $val = shift;
803 0           my %args = @_;
804             $self->incr($key => $val, cb => sub {
805 0 0 0 0     if (my $rc = shift or @_) {
806             #if (@_) {
807             # warn("incr failed: @_");
808             #} else {
809             # warn "incr ok";
810             #}
811 0           $args{cb}($rc, @_);
812             }
813             else {
814             $self->add( $key, $val, %args, cb => sub {
815 0 0 0       if ( my $rc = shift or @_ ) {
816             #if (@_) {
817             # warn("add failed: @_");
818             #} else {
819             # warn "add ok";
820             #}
821 0           $args{cb}($val, @_);
822             }
823             else {
824             #warn "add failed, try again";
825 0           $self->incadd($key,$val,%args);
826             }
827 0           });
828             }
829 0           });
830             }
831              
832             =head2 destroy
833              
834             Shutdown object as much, as possible, incl cleaning of incapsulated objects
835              
836             =cut
837              
838 0     0     sub AnyEvent::Memcached::destroyed::AUTOLOAD {}
839              
840             sub destroy {
841 0     0 1   my $self = shift;
842 0           $self->DESTROY;
843 0           bless $self, "AnyEvent::Memcached::destroyed";
844             }
845              
846             sub DESTROY {
847 0     0     my $self = shift;
848 0 0         warn "(".int($self).") Destroying AE:MC" if $self->{debug};
849 0           for (values %{$self->{peers}}) {
  0            
850 0 0         $_->{con} and $_->{con}->destroy;
851             }
852 0           %$self = ();
853             }
854              
855             =head1 BUGS
856              
857             Feature requests are welcome
858              
859             Bug reports are welcome
860              
861             =head1 AUTHOR
862              
863             Mons Anderson, C<< >>
864              
865             =head1 COPYRIGHT & LICENSE
866              
867             Copyright 2009 Mons Anderson, all rights reserved.
868              
869             This program is free software; you can redistribute it and/or modify it
870             under the same terms as Perl itself.
871              
872             =cut
873              
874             1; # End of AnyEvent::Memcached