File Coverage

blib/lib/Cache/Memcached/AnyEvent.pm
Criterion Covered Total %
statement 32 246 13.0
branch 1 82 1.2
condition 0 48 0.0
subroutine 10 49 20.4
pod 16 16 100.0
total 59 441 13.3


line stmt bran cond sub pod time code
1             # Cache::Memcached::AnyEvent
2             # is the API
3             # Cache::Memcached::AnyEvent::Selector
4             # is the guts that selects sockets to talk to.
5             # Cache::Memcached::AnyEvent::Protocol
6             # is the guy that actually speaks to memcached
7              
8             package Cache::Memcached::AnyEvent;
9 2     2   109894 use strict;
  2         6  
  2         77  
10 2     2   1740 use AnyEvent;
  2         5831  
  2         49  
11 2     2   2535 use AnyEvent::Handle;
  2         40962  
  2         91  
12 2     2   2528 use AnyEvent::Socket;
  2         39095  
  2         327  
13 2     2   29 use Carp;
  2         5  
  2         113  
14 2     2   2550 use Storable ();
  2         8882  
  2         61  
15 2     2   23 use Scalar::Util ();
  2         4  
  2         90  
16              
17             use constant +{
18 2         6 HAVE_ZLIB => eval { require Compress::Zlib; 1 },
  2         175089  
  2         270200  
19             F_STORABLE => 1,
20             F_COMPRESS => 2,
21             COMPRESS_SAVINGS => 0.20,
22 2     2   12 };
  2         3  
23              
24             our $VERSION = '0.00021';
25              
26             sub new {
27 0     0 1   my $class = shift;
28 0 0         my %args = @_ == 1 ? %{$_[0]} : @_;
  0            
29              
30 0   0       my $protocol_class = delete $args{protocol_class} || 'Text';
31 0   0       my $selector_class = delete $args{selector_class} || 'Traditional';
32 0           my $self = bless {
33             auto_reconnect => 5,
34             compress_threshold => 10_000,
35             protocol => undef,
36             reconnect_delay => 5,
37             servers => undef,
38             namespace => undef,
39             %args,
40             _active_servers => [],
41             _active_server_count => 0,
42             _is_connected => undef,
43             _is_connecting => undef,
44             _queue => [],
45             _server_handles => undef,
46             }, $class;
47              
48 0   0       $self->{selector} ||= $self->_build_selector( $selector_class );
49 0   0       $self->{protocol} ||= $self->_build_protocol( $protocol_class );
50              
51 0           $self->{selector}->set_servers( $self->{servers} );
52              
53 0           return $self;
54             }
55              
56             sub _build_selector {
57 0     0     $_[0]->_build_helper( 'Cache::Memcached::AnyEvent::Selector', $_[1] );
58             }
59             sub _build_protocol {
60 0     0     $_[0]->_build_helper( 'Cache::Memcached::AnyEvent::Protocol', $_[1] );
61             }
62             sub _build_helper {
63 0     0     my ($self, $prefix, $klass) = @_;
64 0 0         if ($klass !~ s/^\+//) {
65 0           $klass = "${prefix}::$klass";
66             }
67              
68 0           $klass =~ s/[^\w:_]//g; # cleanse
69              
70 0           eval "require $klass";
71 0 0         Carp::confess $@ if $@;
72 0           return $klass->new(memcached => $self);
73             }
74              
75             BEGIN {
76 2     2   8 foreach my $attr ( qw(auto_reconnect compress_threshold reconnect_delay servers namespace) ) {
77 10 0   0 1 779 eval <
  0 0   0 1    
  0 0   0 1    
  0 0   0 1    
  0 0   0 1    
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
  0            
78             sub $attr {
79             my \$self = shift;
80             my \$ret = \$self->{$attr};
81             if (\@_) {
82             \$self->{$attr} = shift;
83             }
84             return \$ret;
85             }
86             EOSUB
87 10 50       2311 Carp::confess if $@;
88             }
89             }
90              
91             sub protocol {
92 0     0 1   my $self = shift;
93 0           my $ret = $self->{protocol};
94 0 0         if (@_) {
95 0           my $obj = shift;
96 0           my $class = ref $obj;
97 0           $self->{protocol} = $obj;
98             }
99 0           return $ret;
100             }
101              
102             sub selector {
103 0     0 1   my $self = shift;
104 0           my $ret = $self->{selector};
105 0 0         if (@_) {
106 0           my $obj = shift;
107 0           my $class = ref $obj;
108 0           $self->{selector} = $obj;
109             }
110 0           return $ret;
111             }
112              
113             sub _connect_one {
114 0     0     my ($self, $server, $cv) = @_;
115              
116 0 0         return if $self->{_is_connecting}->{$server};
117              
118 0 0         $cv->begin if $cv;
119 0           my ($host, $port) = split( /:/, $server );
120 0   0       $port ||= 11211;
121              
122             $self->{_is_connecting}->{$server} = tcp_connect $host, $port, sub {
123 0     0     $self->_on_tcp_connect($server, @_);
124 0 0         $cv->end if $cv;
125 0           };
126             }
127              
128             sub _on_tcp_connect {
129 0     0     my ($self, $server, $fh, $host, $port) = @_;
130              
131 0           delete $self->{_is_connecting}->{$server}; # thanks, buddy
132 0 0         if (! $fh) {
133             # connect failed
134 0           warn "failed to connect to $server";
135              
136 0 0         if ($self->{auto_reconnect} > $self->{_connect_attempts}->{ $server }++) {
137             # XXX this watcher holds a reference to $self, which means
138             # it will make your program wait for it to fire until
139             # auto_reconnect attempts have been made.
140             # if you need to close immediately, you need to call disconnect
141             $self->{_reconnect}->{$server} = AE::timer $self->{reconnect_delay}, 0, sub {
142 0     0     delete $self->{_reconnect}->{$server};
143 0           $self->_connect_one($server);
144 0           };
145             }
146             } else {
147 0           my $h; $h = AnyEvent::Handle->new(
148             fh => $fh,
149             on_drain => sub {
150 0     0     my $h = shift;
151 0 0 0       if (defined $h->{wbuf} && $h->{wbuf} eq "") {
152 0           delete $h->{wbuf}; $h->{wbuf} = "";
  0            
153             }
154 0 0 0       if (defined $h->{rbuf} && $h->{rbuf} eq "") {
155 0           delete $h->{rbuf}; $h->{rbuf} = "";
  0            
156             }
157             },
158             on_eof => sub {
159 0     0     my $h = delete $self->{_server_handles}->{$server};
160 0           $h->destroy();
161 0           undef $h;
162             },
163             on_error => sub {
164 0     0     my $h = delete $self->{_server_handles}->{$server};
165 0           $h->destroy();
166 0 0         $self->_connect_one($server) if $self->{auto_reconnect};
167 0           undef $h;
168             },
169 0           );
170              
171 0           $self->_add_active_server( $server, $h );
172 0           delete $self->{_connect_attempts}->{ $server };
173 0           $self->protocol->prepare_handle( $fh );
174             }
175             }
176              
177             sub _add_active_server {
178 0     0     my ($self, $server, $h) = @_;
179 0           push @{$self->{_active_servers}}, $server;
  0            
180 0           $self->{_active_server_count}++;
181 0           $self->{_server_handles}->{ $server } = $h;
182             }
183              
184             sub add_server {
185 0     0 1   my ($self, @servers) = @_;
186 0           push @{$self->{servers}}, @servers;
  0            
187 0           $self->selector->set_servers( $self->{servers} );
188             }
189              
190             sub connect {
191 0     0 1   my $self = shift;
192              
193 0 0 0       return if $self->{_is_connecting} || $self->{_is_connected};
194 0           $self->disconnect();
195              
196 0           $self->{_is_connecting} = {};
197 0           $self->{_active_servers} = [];
198 0           $self->{_active_server_count} = 0;
199             my $connect_cv = AE::cv {
200 0     0     delete $self->{_is_connecting};
201 0 0         if (! $self->{_active_server_count}) {
202 0           die "Failed to connect to any memcached servers";
203             }
204              
205 0           $self->{_is_connected} = 1;
206              
207 0 0         if (my $cb = $self->{ on_connect }) {
208 0           $cb->($self);
209             }
210 0           $self->_drain_queue;
211 0           };
212              
213 0           foreach my $server ( @{ $self->{ servers } }) {
  0            
214 0           $self->_connect_one($server, $connect_cv);
215             }
216             }
217              
218             sub delete {
219 0     0 1   my ($self, @args) = @_;
220 0 0 0       my $cb = pop @args if (ref $args[-1] eq 'CODE' or ref $args[-1] eq 'AnyEvent::CondVar');
221 0           my $noreply = !defined $cb;
222 0           $self->_push_queue( $self->protocol->delete($self, @args, $noreply, $cb ) );
223             }
224              
225 0     0 1   sub get_handle { shift->{_server_handles}->{ $_[0] } }
226              
227             {
228             my $installer = sub {
229             my ($name, $code) = @_;
230             {
231 2     2   26 no strict 'refs';
  2         2  
  2         3019  
232             *{$name} = $code;
233             }
234             };
235              
236             foreach my $method ( qw( get get_multi ) ) {
237             $installer->( $method, sub {
238 0     0     my ($self, $keys, $cb) = @_;
239 0           Scalar::Util::weaken($self);
240 0           $self->_push_queue( $self->protocol->$method($self, $keys, $cb) );
241             } );
242             }
243              
244             foreach my $method ( qw( decr incr ) ) {
245             $installer->($method, sub {
246 0     0     my ($self, @args) = @_;
247 0 0 0       my $cb = pop @args if (ref $args[-1] eq 'CODE' or ref $args[-1] eq 'AnyEvent::CondVar');
248 0           my ($key, $value, $initial) = @args;
249 0           Scalar::Util::weaken($self);
250 0           $self->_push_queue( $self->protocol->$method( $self, $key, $value, $initial, $cb ) );
251             });
252             }
253              
254             foreach my $method ( qw(add append prepend replace set) ) {
255             $installer->($method, sub {
256 0     0     my ($self, @args) = @_;
257 0 0 0       my $cb = pop @args if (ref $args[-1] eq 'CODE' or ref $args[-1] eq 'AnyEvent::CondVar');
258 0           my ($key, $value, $exptime, $noreply) = @args;
259 0           Scalar::Util::weaken($self);
260 0           $self->_push_queue( $self->protocol->$method( $self, $key, $value, $exptime, $noreply, $cb ) );
261             });
262             }
263             }
264              
265             sub stats {
266 0     0 1   my ($self, @args) = @_;
267 0 0 0       my $cb = pop @args if (ref $args[-1] eq 'CODE' or ref $args[-1] eq 'AnyEvent::CondVar');
268 0           my ($name) = @args;
269 0           $self->_push_queue( $self->protocol->stats($self, $name, $cb) );
270             }
271              
272             sub version {
273 0     0 1   my ($self, $cb) = @_;
274 0           $self->_push_queue( $self->protocol->version($self, $cb) );
275             }
276              
277             sub flush_all {
278 0     0 1   my ($self, @args) = @_;
279 0 0 0       my $cb = pop @args if (ref $args[-1] eq 'CODE' or ref $args[-1] eq 'AnyEvent::CondVar');
280 0           my $noreply = !!$cb;
281 0   0       my $delay = shift @args || 0;
282 0           $self->_push_queue( $self->protocol->flush_all($self, $delay, $noreply, $cb) );
283             }
284              
285             sub _push_queue {
286 0     0     my ($self, $cb) = @_;
287 0           push @{$self->{queue}}, $cb;
  0            
288 0 0         $self->_drain_queue unless $self->{_is_draining};
289             }
290              
291             sub _drain_queue {
292 0     0     my $self = shift;
293 0 0         if (! $self->{_is_connected}) {
294 0 0         if ($self->{_is_connecting}) {
295 0           return;
296             }
297 0           $self->connect;
298 0           return;
299             }
300              
301 0 0         if ($self->{_is_draining}) {
302 0           return;
303             }
304 0           my $cb = shift @{$self->{queue}};
  0            
305 0 0         return unless $cb;
306              
307             my $guard = AnyEvent::Util::guard {
308 0     0     my $t; $t = AE::timer 0, 0, sub {
309 0           $self->{_is_draining}--;
310 0           undef $t;
311 0           $self->_drain_queue;
312 0           };
313 0           };
314 0           $self->{_is_draining}++;
315 0           $cb->($guard);
316             }
317              
318             sub disconnect {
319 0     0 1   my $self = shift;
320              
321 0           my $handles = delete $self->{_server_handles};
322 0           foreach my $handle ( values %$handles ) {
323 0 0         if ($handle) {
324 0           eval {
325 0           $handle->stop_read;
326 0           $handle->push_shutdown();
327 0           $handle->destroy();
328             };
329             }
330             }
331              
332 0           delete $self->{_is_connecting};
333 0           delete $self->{_is_connected};
334 0           delete $self->{_is_draining};
335             }
336              
337             sub DESTROY {
338 0     0     my $self = shift;
339 0           $self->disconnect;
340             }
341            
342             sub _get_handle_for {
343 0     0     $_[0]->{selector}->get_handle($_[1]);
344             }
345              
346             sub _prepare_key {
347 0     0     my ($self, $key) = @_;
348 0 0         if (my $ns = $self->{namespace}) {
349 0           $key = $ns . $key;
350             }
351 0           return $key;
352             }
353              
354             sub _decode_key_value {
355 0     0     my ($self, $key_ref, $flags_ref, $data_ref) = @_;
356              
357 0 0         if (my $ns = $self->{namespace}) {
358 0           $$key_ref =~ s/^$ns//;
359             }
360              
361 0 0 0       if (defined $$flags_ref && defined $$data_ref) {
362 0 0 0       if ($$flags_ref & F_COMPRESS() && HAVE_ZLIB()) {
363 0           $$data_ref = Compress::Zlib::memGunzip($$data_ref);
364             }
365 0 0         if ($$flags_ref & F_STORABLE()) {
366 0           $$data_ref = Storable::thaw($$data_ref);
367             }
368             }
369 0           return ();
370             }
371              
372             sub _prepare_value {
373 0     0     my ($self, $cmd, $value_ref, $len_ref, $exptime_ref, $flags_ref) = @_;
374              
375 0           $$flags_ref = 0;
376 0 0         if (ref $$value_ref) {
377 0           $$value_ref = Storable::nfreeze($$value_ref);
378 0           $$flags_ref |= F_STORABLE();
379             }
380              
381 0           $$len_ref = bytes::length($$value_ref);
382 0           my $threshold = $self->compress_threshold;
383 0   0       my $compressable =
384             ($cmd ne 'append' && $cmd ne 'prepend') &&
385             $threshold &&
386             HAVE_ZLIB() &&
387             $$len_ref >= $threshold
388             ;
389              
390 0 0         if ($compressable) {
391 0           my $c_val = Compress::Zlib::memGzip($$value_ref);
392 0           my $c_len = bytes::length($c_val);
393              
394 0 0         if ($c_len < $$len_ref * ( 1 - COMPRESS_SAVINGS() ) ) {
395 0           $$value_ref = $c_val;
396 0           $$len_ref = $c_len;
397 0           $$flags_ref |= F_COMPRESS();
398             }
399             }
400 0   0       $$exptime_ref = int($$exptime_ref || 0);
401             }
402              
403             1;
404              
405             __END__