File Coverage

lib/AnyEvent/Memcached.pm
Criterion Covered Total %
statement 49 285 17.1
branch 0 106 0.0
condition 0 56 0.0
subroutine 17 47 36.1
pod 13 13 100.0
total 79 507 15.5


line stmt bran cond sub pod time code
1             package AnyEvent::Memcached;
2              
3 5     5   176342 use common::sense;
  5         27  
  5         25  
4 5     5   2195 use Devel::Leak::Cb;
  5         29798  
  5         28  
5 5     5   1970 use AnyEvent::Memcached::Peer;
  5         10  
  5         271  
6              
7             =head1 NAME
8              
9             AnyEvent::Memcached - AnyEvent memcached client
10              
11             =head1 VERSION
12              
13             Version 0.01_6
14              
15             =head1 NOTICE
16              
17             This is a B. Interface is subject to change.
18              
19             If you want to rely on some features, please, notify me about them
20              
21             =cut
22              
23             our $VERSION = '0.01_6';
24              
25             =head1 SYNOPSIS
26              
27             use AnyEvent::Memcached;
28              
29             my $memd = AnyEvent::Memcached->new(
30             servers => [ "10.0.0.15:11211", "10.0.0.15:11212" ], # same as in Cache::Memcached
31             debug => 1,
32             compress_threshold => 10000,
33             namespace => 'my-namespace:',
34              
35             cv => $cv, # AnyEvent->condvar: group callback
36             );
37            
38             $memd->set_servers([ "10.0.0.15:11211", "10.0.0.15:11212" ]);
39            
40             # Basic methods are like in Cache::Memcached, but with additional cb => sub { ... };
41             # first argument to cb is return value, second is the error(s)
42            
43             $memd->set( key => $value, cb => sub {
44             shift or warn "Set failed: @_"
45             } );
46              
47             $memd->get( 'key', cb => sub {
48             my ($value,$err) = shift;
49             $err and return warn "Get failed: @_";
50             warn "Value for key is $value";
51             } );
52              
53             $memd->mget( [ 'key1', 'key2' ], cb => sub {
54             my ($values,$err) = shift;
55             $err and return warn "Get failed: @_";
56             warn "Value for key1 is $values->{key1} and value for key2 is $values->{key2}"
57             } );
58              
59             # Additionally there is rget (see memcachedb-1.2.1-beta)
60              
61             $memd->rget( 'fromkey', 'tokey', cb => sub {
62             my ($value,$err) = shift;
63             $err and warn "Get failed: @_";
64             } );
65              
66             =cut
67              
68 5     5   28 use AnyEvent;
  5         6  
  5         94  
69 5     5   16 use AnyEvent::Socket;
  5         6  
  5         393  
70 5     5   21 use AnyEvent::Handle;
  5         6  
  5         75  
71 5     5   16 use AnyEvent::Connection;
  5         5  
  5         112  
72 5     5   17 use AnyEvent::Memcached::Conn;
  5         10  
  5         79  
73 5     5   15 use base 'Object::Event';
  5         4  
  5         253  
74 5     5   2117 use String::CRC32;
  5         1547  
  5         222  
75 5     5   2694 use Storable ();
  5         12276  
  5         127  
76              
77             # flag definitions
78 5     5   24 use constant F_STORABLE => 1;
  5         6  
  5         303  
79 5     5   19 use constant F_COMPRESS => 2;
  5         4  
  5         188  
80              
81             # size savings required before saving compressed value
82 5     5   24 use constant COMPRESS_SAVINGS => 0.20; # percent
  5         6  
  5         365  
83              
84             our $HAVE_ZLIB;
85             BEGIN {
86 5     5   344 $HAVE_ZLIB = eval "use Compress::Zlib (); 1;";
  5     5   2620  
  5         235267  
  5         63  
87             }
88              
89             sub _hash($) {
90 0     0     return (crc32($_[0]) >> 16) & 0x7fff;
91             }
92              
93             =head1 METHODS
94              
95             =head2 new %args
96              
97             Currently supported options:
98              
99             =over 4
100              
101             =item servers
102             =item namespace
103             =item debug
104             =item cv
105             =item compress_threshold
106             =item compress_enable
107             =item timeout
108              
109             =back
110              
111             =cut
112              
113             sub new {
114 0     0 1   my $self = bless {}, shift;
115 0           my %args = @_;
116 0           $self->set_servers(delete $args{servers});
117 0 0         $self->{namespace} = exists $args{namespace} ? delete $args{namespace} : '';
118 0           for (qw( debug cv compress_threshold compress_enable timeout)) {
119 0 0         $self->{$_} = exists $args{$_} ? delete $args{$_} : 0;
120             }
121 0 0 0       $self->{compress_enable} and !$HAVE_ZLIB and Carp::carp("Have no Compress::Zlib installed, but have compress_enable option");
122 0 0         require Carp; Carp::carp "@{[ keys %args ]} options are not supported yet" if %args;
  0            
  0            
123 0           $self;
124             }
125              
126             =head2 set_servers
127              
128             Setup server list
129              
130             =cut
131              
132             sub set_servers {
133 0     0 1   my $self = shift;
134 0           my $list = shift;
135 0 0         $list = [$list] unless ref $list eq 'ARRAY';
136 0   0       $self->{servers} = $list || [];
137 0           $self->{active} = 0+@{$self->{servers}};
  0            
138 0           $self->{buckets} = undef;
139 0           $self->{bucketcount} = 0;
140 0           $self->_init_buckets;
141 0           @{$self->{buck2sock}} = ();
  0            
142              
143 0           $self->{'_single_sock'} = undef;
144 0 0         if (@{$self->{'servers'}} == 1) {
  0            
145 0           $self->{'_single_sock'} = $self->{'servers'}[0];
146             }
147              
148 0           return $self;
149             }
150              
151             sub _init_buckets {
152 0     0     my $self = shift;
153 0 0         return if $self->{buckets};
154 0           my $bu = $self->{buckets} = [];
155 0           foreach my $v (@{$self->{servers}}) {
  0            
156 0           my $peer;
157 0 0         if (ref $v eq "ARRAY") {
158 0           $peer = $v->[0];
159 0           for (1..$v->[1]) { push @$bu, $v->[0]; }
  0            
160             } else {
161 0           push @$bu, $peer = $v;
162             }
163 0           my ($host,$port) = $peer =~ /^(.+?)(?:|:(\d+))$/;
164 0           $self->{peers}{$peer} = {
165             host => $host,
166             port => $port,
167             };
168             }
169 0           $self->{bucketcount} = scalar @{$self->{buckets}};
  0            
170 0           $self->_create_connections;
171             }
172              
173             sub _create_connections {
174 0     0     my $self = shift;
175 0           for my $peer ( values %{ $self->{peers} }) {
  0            
176             $peer->{con} = AnyEvent::Memcached::Peer->new(
177             port => $peer->{port},
178             host => $peer->{host},
179             timeout => $self->{timeout},
180             debug => $self->{debug},
181 0           );
182             }
183             }
184              
185             =head2 connect
186              
187             Establish connection to all servers and invoke event C, when ready
188              
189             =cut
190              
191             sub connect {
192 0     0 1   my $self = shift;
193             $_->{con}->connect
194 0           for values %{ $self->{peers} };
  0            
195             }
196              
197             sub _handle_errors {
198 0     0     my $self = shift;
199 0           my $peer = shift;
200 0           local $_ = shift;
201 0 0         if ($_ eq 'ERROR') {
    0          
202 0           warn "Error";
203             }
204             elsif (/(CLIENT|SERVER)_ERROR (.*)/) {
205 0           warn ucfirst(lc $1)." error: $2";
206             }
207             else {
208 0           warn "Bad response from $peer->{host}:$peer->{port}: $_";
209             }
210             }
211              
212             sub _p4k {
213 0     0     my $self = shift;
214 0           my $key = shift;
215 0 0         my ($hv, $real_key) = ref $key ?
216             (int($key->[0]), $key->[1]) :
217             (_hash($key), $key);
218 0           my $bucket = $hv % $self->{bucketcount};
219 0 0         return wantarray ? ( $self->{buckets}[$bucket], $real_key ) : $self->{buckets}[$bucket];
220             }
221              
222             sub _set {
223 0     0     my $self = shift;
224 0           my $cmd = shift;
225 0           my $key = shift;
226 0           my $val = shift;
227 0           my %args = @_;
228 0 0         return $args{cb}(undef, "Readonly") if $self->{readonly};
229 0   0       $_ and $_->begin for $self->{cv}, $args{cv};
230 0 0         (my $peer,$key) = $self->_p4k($key) or return $args{cb}(undef, "Peer dead???");
231              
232 5     5   32 use bytes; # return bytes from length()
  5         7  
  5         17  
233              
234 0 0         warn "value for memkey:$key is not defined" unless defined $val;
235 0           my $flags = 0;
236 0 0         if (ref $val) {
237 0           local $Carp::CarpLevel = 2;
238 0           $val = Storable::nfreeze($val);
239 0           $flags |= F_STORABLE;
240             }
241 0           my $len = length($val);
242              
243 0 0 0       if ( $self->{compress_threshold} and $HAVE_ZLIB
      0        
      0        
244             and $self->{compress_enable} and $len >= $self->{compress_threshold}) {
245              
246 0           my $c_val = Compress::Zlib::memGzip($val);
247 0           my $c_len = length($c_val);
248              
249             # do we want to keep it?
250 0 0         if ($c_len < $len*(1 - COMPRESS_SAVINGS)) {
251 0           $val = $c_val;
252 0           $len = $c_len;
253 0           $flags |= F_COMPRESS;
254             }
255             }
256              
257 0   0       my $expire = int($args{expire} || 0);
258              
259             $self->{peers}{$peer}{con}->command(
260             "$cmd $self->{namespace}$key $flags $expire $len\015\012$val",
261 0 0   0     cb => cb {
262 0 0         if (defined( local $_ = shift )) {
    0          
    0          
263 0           if ($_ eq 'STORED') {
264             $args{cb}(1);
265             }
266 0           elsif ($_ eq 'NOT_STORED') {
267             $args{cb}(0);
268             }
269 0           elsif ($_ eq 'EXISTS') {
270             $args{cb}(0);
271             }
272 0           else {
273             $args{cb}(undef,$_);
274             }
275 0           } else {
276 0           warn "set failed: @_/$!";
277             $args{cb}(undef, @_);
278 0   0       }
279             $_ and $_->end for $args{cv}, $self->{cv};
280 0           }
281 0           );
282             return;
283             }
284              
285             =head2 set( $key, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
286              
287             Unconditionally sets a key to a given value in the memcache.
288              
289             C<$rc> is
290              
291             =over 4
292              
293             =item '1'
294              
295             Successfully stored
296              
297             =item '0'
298              
299             Item was not stored
300              
301             =item undef
302              
303             Error happens, see C<$err>
304              
305             =back
306              
307             =head2 add( $key, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
308              
309             Like C, but only stores in memcache if the key doesn't already exist.
310              
311             =head2 replace( $key, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
312              
313             Like C, but only stores in memcache if the key already exists. The opposite of add.
314              
315             =head2 append( $key, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
316              
317             Append the $value to the current value on the server under the $key.
318              
319             B command first appeared in memcached 1.2.4.
320              
321             =head2 prepend( $key, $value, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
322              
323             Prepend the $value to the current value on the server under the $key.
324              
325             B command first appeared in memcached 1.2.4.
326              
327             =cut
328 0     0 1    
329 0     0 1   sub set { shift->_set( set => @_) }
330 0     0 1   sub add { shift->_set( add => @_) }
331 0     0 1   sub replace { shift->_set( replace => @_) }
332 0     0 1   sub append { shift->_set( append => @_) }
333             sub prepend { shift->_set( prepend => @_) }
334              
335             =head2 get( $key, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
336              
337             Retrieve the value for a $key. $key should be a scalar
338              
339             =head2 mget( $keys : ARRAYREF, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
340              
341             B
342              
343             Retrieve the values for a $keys.
344              
345             =head2 get_multi : Alias for mget.
346              
347             B
348              
349             =head2 gets( $keys : ARRAYREF, [cv => $cv], [ expire => $expire ], cb => $cb->( $rc, $err ) )
350              
351             Retrieve the value and its CAS for a $key. $key should be a scalar.
352              
353             B
354              
355             =cut
356              
357 0     0     sub _deflate {
358 0           my $self = shift;
359 0           my $result = shift;
360 0 0 0       for (values %$result) {
361 0           if ($HAVE_ZLIB and $_->{flags} & F_COMPRESS) {
362             $_->{data} = Compress::Zlib::memGunzip($_->{data});
363 0 0         }
364 0 0         if ($_->{flags} & F_STORABLE) {
  0            
  0            
365             eval{ $_->{data} = Storable::thaw($_->{data}); 1 } or delete $_->{data};
366 0           }
367             $_ = $_->{data};
368 0           }
369             return;
370             }
371              
372 0     0 1   sub get {
373 0           my $self = shift;
374 0           my ($cmd) = (caller(0))[3] =~ /([^:]+)$/;
375 0           my $keys = shift;
376             my %args = @_;
377 0           #$self->{connected} or return $self->connect( $cmd => $keys, \%args );
378 0 0 0       my $array;
379 0           if (ref $keys and ref $keys eq 'ARRAY') {
380             $array = 1;
381 0           } else {
382             $keys = [$keys];
383 0           }
384             my $bcount = $self->{bucketcount};
385 0            
386 0           my %peers;
387 0           for my $key (@$keys) {
388             my ($peer,$real_key) = $self->_p4k($key);
389 0   0       #warn "$peer, $real_key | $self->{peers}{$peer}";
  0            
390             push @{ $peers{$peer} ||= [] }, $real_key;
391             }
392 0          
393 0           my %result;
394 0   0       my $cv = AnyEvent->condvar;
395             $_ and $_->begin for $self->{cv}, $args{cv};
396 0     0     $cv->begin(cb {
397 0           undef $cv;
398 0 0         $self->_deflate(\%result);
399 0           $args{cb}( $array ? \%result : $result{ $keys->[0]} );
400 0   0       %result = ();
401 0           $_ and $_->end for $args{cv}, $self->{cv};
402 0           });
403 0           for my $peer (keys %peers) {
404 0           $cv->begin;
  0            
405 0           my $keys = join(' ',map "$self->{namespace}$_", @{ $peers{$peer} });
406             $self->{peers}{$peer}{con}->request( "get $keys" );
407 0     0     $self->{peers}{$peer}{con}->reader( id => $peer.'+'.$keys, res => \%result, namespace => $self->{namespace}, cb => cb {
408 0           $cv->end;
409             });
410 0           }
411 0           $cv->end;
412             return;
413             }
414              
415             =head2 delete( $key, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) )
416              
417             Delete $key and its value from the cache.
418              
419             If C is true, cb doesn't required
420              
421             =head2 del
422              
423             Alias for "delete"
424              
425             =head2 remove
426              
427             Alias for "delete"
428              
429             =cut
430              
431 0     0 1   sub delete {
432 0           my $self = shift;
433 0           my ($cmd) = (caller(0))[3] =~ /([^:]+)$/;
434 0           my $key = shift;
435 0 0         my %args = @_;
436 0 0         return $args{cb}(undef, "Readonly") if $self->{readonly};
437 0 0         (my $peer,$key) = $self->_p4k($key) or return $args{cb}(undef, "Peer dead???");
438 0 0         my $time = $args{delay} ? " $args{delay}" : '';
439 0           if ($args{noreply}) {
440 0 0         $self->{peers}{$peer}{con}->request("delete $self->{namespace}$key noreply");
441             $args{cb}(1) if $args{cb};
442 0   0       } else {
443             $_ and $_->begin for $self->{cv}, $args{cv};
444             $self->{peers}{$peer}{con}->command(
445 0 0   0     "delete $self->{namespace}$key$time",
446 0 0         cb => cb {
    0          
447 0           if (defined( local $_ = shift )) {
448             if ($_ eq 'DELETED') {
449 0           $args{cb}(1);
450             } elsif ($_ eq 'NOT_FOUND') {
451 0           $args{cb}(0);
452             } else {
453             $args{cb}(undef,$_);
454 0           }
455             } else {
456 0   0       $args{cb}(undef, @_);
457             }
458 0           $_ and $_->end for $args{cv}, $self->{cv};
459             }
460 0           );
461             }
462             return;
463             }
464             *del = \&delete;
465             *remove = \&delete;
466              
467             =head2 incr( $key, $increment, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) )
468              
469             Increment the value for the $key by $delta. Starting with memcached 1.3.3 $key should be set to a number or the command will fail.
470             Note that the server doesn't check for overflow.
471              
472             If C is true, cb doesn't required, and if passed, simply called with rc = 1
473              
474             Similar to DBI, zero is returned as "0E0", and evaluates to true in a boolean context.
475              
476             =head2 decr( $key, $decrement, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) )
477              
478             Opposite to C
479              
480             =cut
481 0     0 1    
482 0           sub incr {
483 0           my $self = shift;
484 0           my ($cmd) = (caller(0))[3] =~ /([^:]+)$/;
485 0           my $key = shift;
486 0 0         my $val = shift;
487 0 0         my %args = @_;
488 0 0         return $args{cb}(undef, "Readonly") if $self->{readonly};
489 0           (my $peer,$key) = $self->_p4k($key) or return $args{cb}(undef, "Peer dead???");
490 0 0         if ($args{noreply}) {
491             $self->{peers}{$peer}{con}->request("$cmd $self->{namespace}$key $val noreply");
492 0   0       $args{cb}(1) if $args{cb};
493             } else {
494             $_ and $_->begin for $self->{cv}, $args{cv};
495 0 0   0     $self->{peers}{$peer}{con}->command(
496 0 0         "$cmd $self->{namespace}$key $val",
    0          
497 0           cb => cb {
498             if (defined( local $_ = shift )) {
499             if ($_ eq 'NOT_FOUND') {
500 0 0         $args{cb}(undef);
501             }
502             elsif( /^(\d+)$/ ) {
503 0           $args{cb}($1 eq '0' ? '0E0' : $1);
504             }
505             else {
506 0           $args{cb}(undef,$_);
507             }
508 0   0       } else {
509             $args{cb}(undef, @_);
510 0           }
511             $_ and $_->end for $args{cv}, $self->{cv};
512 0           }
513             );
514             }
515             return;
516             }
517             *decr = \&incr;
518              
519             #rget \r\n
520             #
521             #- where the query starts.
522             #- where the query ends.
523             #- indicates the openness of left side, 0 means the result includes , while 1 means not.
524             #- indicates the openness of right side, 0 means the result includes , while 1 means not.
525             #- how many items at most return, max is 100.
526              
527             # rget ($from,$till, '+left' => 1, '+right' => 0, max => 10, cb => sub { ... } );
528              
529             =head2 rget( $from, $till, [ max => 100 ], [ '+left' => 1 ], [ '+right' => 1 ], [cv => $cv], cb => $cb->( $rc, $err ) )
530              
531             Memcachedb 1.2.1-beta implements rget method, that allows to look through the whole storage
532              
533             =over 4
534              
535             =item $from
536              
537             the starting key
538              
539             =item $till
540              
541             finishing key
542              
543             =item +left
544              
545             If true, then starting key will be included in results. true by default
546              
547             =item +right
548              
549             If true, then finishing key will be included in results. true by default
550              
551             =item max
552              
553             Maximum number of results to fetch. 100 is the maximum and is the default
554              
555             =back
556              
557 0     0 1   =cut
558              
559 0           sub rget {
560 0           my $self = shift;
561 0           #my ($cmd) = (caller(0))[3] =~ /([^:]+)$/;
562 0           my $cmd = 'rget';
563 0           my $from = shift;
564 0 0         my $till = shift;
    0          
565 0 0         my %args = @_;
    0          
566 0   0       my ($lkey,$rkey);
567             $lkey = exists $args{'+left'} ? $args{'+left'} ? 0 : 1 : 0;
568             $rkey = exists $args{'+right'} ? $args{'+right'} ? 0 : 1 : 0;
569 0           $args{max} ||= 100;
570             #return $args{cb}(undef, "Readonly") if $self->{readonly};
571 0            
572 0   0       my %result;
573             my $err;
574 0     0     my $cv = AnyEvent->condvar;
575 0           $_ and $_->begin for $self->{cv}, $args{cv};
576 0 0         $cv->begin(sub {
577 0           undef $cv;
578 0   0       $self->_deflate(\%result);
579 0           $args{cb}( $err ? (undef,$err) : \%result );
580             %result = ();
581             $_ and $_->end for $args{cv}, $self->{cv};
582 0           });
  0            
583 0            
584 0           # TODO: peers ?
585 0     0     for my $peer (keys %{$self->{peers}}) {
586 0           $cv->begin;
587             my $do;$do = sub {
588             undef $do;
589 0           $self->{peers}{$peer}{con}->request( "$cmd $self->{namespace}$from $self->{namespace}$till $lkey $rkey $args{max}" );
590 0           $self->{peers}{$peer}{con}->reader( id => $peer, res => \%result, namespace => $self->{namespace}, cb => sub {
591 0           #warn "rget from: $peer";
592 0 0         $cv->end;
593 0 0         });
594 0           };
595             if (exists $self->{peers}{$peer}{rget_ok}) {
596             if ($self->{peers}{$peer}{rget_ok}) {
597 0           $do->();
598 0           } else {
599             #warn
600             $err = "rget not supported on peer $peer";
601             $cv->end;
602 0     0     }
603 0 0         } else {
604 0           $self->{peers}{$peer}{con}->command( "$cmd 1 0 0 0 1", cb => sub {
605 0           local $_ = shift;
606             if ($_ eq 'END') {
607             $self->{peers}{$peer}{rget_ok} = 1;
608 0           $do->();
609 0           } else {
610 0           #warn
611             $err = "rget not supported on peer $peer";
612 0           $self->{peers}{$peer}{rget_ok} = 0;
613             $cv->end;
614             }
615             } );
616 0          
617 0           }
618             }
619             $cv->end;
620             return;
621             }
622              
623             =head2 destroy
624              
625             Shutdown object as much, as possible, incl cleaning of incapsulated objects
626       0      
627             =cut
628              
629 0     0 1   sub AnyEvent::Memcached::destroyed::AUTOLOAD {}
630 0            
631 0           sub destroy {
632             my $self = shift;
633             $self->DESTROY;
634             bless $self, "AnyEvent::Memcached::destroyed";
635 0     0     }
636 0 0          
637 0           sub DESTROY {
  0            
638 0 0         my $self = shift;
639             warn "(".int($self).") Destroying AE:MC" if $self->{debug};
640             for (values %{$self->{peers}}) {
641 0           $_->{con} and $_->{con}->destroy;
642             }
643             # TODO
644             %$self = ();
645             }
646              
647             =head1 BUGS
648              
649             Since there is developer release, there may be a lot of bugs
650              
651             Feature requests are welcome
652              
653             Bug reports are welcome
654              
655             =head1 AUTHOR
656              
657             Mons Anderson, C<< >>
658              
659             =head1 COPYRIGHT & LICENSE
660              
661             Copyright 2009 Mons Anderson, all rights reserved.
662              
663             This program is free software; you can redistribute it and/or modify it
664             under the same terms as Perl itself.
665              
666              
667             =cut
668              
669             1; # End of AnyEvent::Memcached