File Coverage

blib/lib/Memcached/Client/Connection.pm
Criterion Covered Total %
statement 16 80 20.0
branch 0 24 0.0
condition 0 5 0.0
subroutine 6 17 35.2
pod 8 8 100.0
total 30 134 22.3


line stmt bran cond sub pod time code
1             package Memcached::Client::Connection;
2             BEGIN {
3 2     2   60 $Memcached::Client::Connection::VERSION = '2.01';
4             }
5             # ABSTRACT: Class to manage Memcached::Client server connections
6              
7 2     2   14 use strict;
  2         6  
  2         78  
8 2     2   14 use warnings;
  2         6  
  2         76  
9 2     2   14 use AnyEvent qw{};
  2         6  
  2         36  
10 2     2   12 use AnyEvent::Handle qw{};
  2         4  
  2         58  
11 2     2   1884 use Memcached::Client::Log qw{DEBUG LOG};
  2         8  
  2         2386  
12              
13              
14             sub new {
15 0     0 1   my ($class, $server, $protocol) = @_;
16 0 0         die "You must give me a server to connect to.\n" unless ($server);
17 0 0         die "You must give me a protocol to use.\n" unless ($protocol);
18 0 0         $server .= ":11211" unless 0 < index $server, ':';
19 0           my $self = {attempts => 0, protocol => $protocol, queue => [], server => $server};
20 0           bless $self, $class;
21             }
22              
23              
24             sub log {
25 0     0 1   my ($self, $format, @args) = @_;
26 0           LOG ("Connection/%s> " . $format, $self->{server}, @args);
27             }
28              
29              
30             sub connect {
31 0     0 1   my ($self) = @_;
32 0 0         unless ($self->{handle}) {
33 0           $self->log ("Initiating connection", $self->{server}) if DEBUG;
34             $self->{handle} = AnyEvent::Handle->new (connect => [split /:/, $self->{server}],
35             keepalive => 1,
36             on_connect => sub {
37 0     0     $self->log ("Connected") if DEBUG;
38 0           $self->{attempts} = 0;
39 0           $self->dequeue;
40             },
41             on_error => sub {
42 0     0     my ($handle, $fatal, $message) = @_;
43 0           $self->log ("Removing handle") if DEBUG;
44 0           delete $self->{handle};
45 0 0 0       if ($message eq "Broken pipe") {
    0          
46 0           $self->log ("Broken pipe, reconnecting") if DEBUG;
47 0           $self->connect;
48             } elsif ($message eq "Connection timed out" and ++$self->{attempts} < 5) {
49 0           $self->log ("Connection timed out, retrying") if DEBUG;
50 0           $self->connect;
51             } else {
52 0           $self->log ("Error %s, failing", $message) if DEBUG;
53 0           $self->fail;
54             }
55             },
56             on_prepare => sub {
57 0     0     my ($handle) = @_;
58 0           $self->log ("Preparing handle") if DEBUG;
59 0 0         $self->{protocol}->prepare_handle ($handle) if ($self->{protocol}->can ("prepare_handle"));
60 0   0       return $self->{connect_timeout} || 0.5;
61 0           });
62             }
63             }
64              
65              
66             sub disconnect {
67 0     0 1   my ($self) = @_;
68              
69 0           $self->log ("Disconnecting") if DEBUG;
70 0 0         if (my $handle = delete $self->{handle}) {
71 0           $self->log ("Shutting down handle") if DEBUG;
72 0           $handle->destroy();
73             }
74              
75 0           $self->log ("Failing all requests") if DEBUG;
76 0           $self->fail;
77             }
78              
79              
80             sub enqueue {
81 0     0 1   my ($self, $request) = @_;
82 0           $self->log ("Request is %s", $request) if DEBUG;
83 0           push @{$self->{queue}}, $request;
  0            
84 0           $self->dequeue;
85             }
86              
87              
88             sub dequeue {
89 0     0 1   my ($self) = @_;
90 0 0         if ($self->{handle}) {
91 0 0         return if ($self->{executing});
92 0 0         if ($self->{executing} = shift @{$self->{queue}}) {
  0            
93 0           $self->log ("Initiating request") if DEBUG;
94 0           my $command = $self->{executing}->{type};
95 0           $self->{protocol}->$command ($self, $self->{executing});
96             }
97             } else {
98 0           $self->connect;
99             }
100             }
101              
102              
103             sub complete {
104 0     0 1   my ($self) = @_;
105 0           $self->log ("Done with request") if DEBUG;
106 0           delete $self->{executing};
107 0           goto &dequeue;
108             }
109              
110              
111             sub fail {
112 0     0 1   my ($self) = @_;
113 0           $self->log ("Checking for executing request") if DEBUG;
114 0 0         if (my $executing = delete $self->{executing}) {
115 0           $self->log ("Failing executing request %s", $executing) if DEBUG;
116 0           $executing->result;
117             }
118 0           $self->log ("Failing requests in queue: %s", $self->{queue}) if DEBUG;
119 0           while (my $request = shift @{$self->{queue}}) {
  0            
120 0           $self->log ("Failing request %s", $request) if DEBUG;
121 0           $request->result;
122             }
123             }
124              
125             1;
126              
127             __END__