File Coverage

lib/AnyEvent/Memcached/Peer.pm
Criterion Covered Total %
statement 18 90 20.0
branch 0 36 0.0
condition 0 3 0.0
subroutine 6 18 33.3
pod 4 7 57.1
total 28 154 18.1


line stmt bran cond sub pod time code
1             package #hide
2             AnyEvent::Memcached::Peer;
3              
4 4     4   22 use common::sense 2;m{
  4         100  
  4         31  
5             use strict;
6             use warnings;
7             }x;
8 4     4   289 use base 'AnyEvent::Connection';
  4         8  
  4         391  
9 4     4   20 use Carp;
  4         8  
  4         293  
10 4     4   29 use AnyEvent::Connection::Util;
  4         9  
  4         30  
11 4     4   164 use Scalar::Util qw(weaken);
  4         7  
  4         351  
12             #use Devel::Leak::Cb;
13             sub DEBUG () { 0 }
14              
15 4     4   21 use AnyEvent::Memcached::Conn;
  4         8  
  4         5693  
16              
17             sub new {
18 0     0 1   my $self = shift->SUPER::new(
19             rawcon => 'AnyEvent::Memcached::Conn',
20             reconnect => 1,
21             @_,
22             );
23 0           $self->{waitingcb} = {};
24 0           $self;
25             }
26              
27             sub connect {
28 0     0 1   my $self = shift;
29 0 0         $self->{connecting} and return;
30 0     0     $self->{grd}{con} = $self->reg_cb( connected => sub { $self->{failed} = 0; } );
  0            
31 0     0     $self->{grd}{cfl} = $self->reg_cb( connfail => sub { $self->{failed} = 1; } );
  0            
32             $self->{grd}{dis} = $self->reg_cb( disconnect => sub {
33 0     0     shift;shift;
  0            
34 0 0         %$self or return;
35 0 0         warn "Peer $self->{host}:$self->{port} disconnected".(@_ ? ": @_" : '')."\n" if $self->{debug};
    0          
36 0 0         my $e = @_ ? "@_" : "disconnected";
37 0           for ( keys %{$self->{waitingcb}} ) {
  0            
38 0 0         if ($self->{waitingcb}{$_}) {
39             #warn "Cleanup: ",::sub_fullname( $self->{waitingcb}{$_} );
40 0           $self->{waitingcb}{$_}(undef,$e);
41             }
42 0           delete $self->{waitingcb}{$_};
43             }
44 0           } );
45 0           $self->SUPER::connect(@_);
46 0           return;
47             }
48              
49             sub conntrack {
50 0     0 0   my $self = shift;
51 0           my ($method,$args,$cb) = @_;
52 0 0 0       if($self->{connecting} and $self->{failed}) {
    0          
53 0           warn "Is connecting, have fails => not connected" if DEBUG;
54 0 0         $cb and $cb->(undef, "Not connected");
55 0           return;
56             }
57             elsif (!$self->{connected}) {
58 0           my @args = @$args; # copy to avoid rewriting
59 0           warn time()." Not connected, do connect for ".\@args.", ".dumper($args[0]) if DEBUG;
60 0           my ($c,$t);
61 0 0         weaken( $self->{waitingcb}{int $cb} = $cb ) if $cb;
62 0           weaken( $self );
63             # This rely on correct event invocation order of Object::Event.
64             # If this could change, I'll add own queue
65             $c = $self->reg_cb(
66             connected => sub {
67 0     0     shift->unreg_me;
68             #$c or return;
69 0           warn "connected cb for ".\@args.", ".dumper($args[0]) if DEBUG;
70 0           undef $c;undef $t;
  0            
71 0 0         $self or return;
72 0 0         delete $self->{waitingcb}{int $cb} if $cb;
73 0           return $self->{con}->$method(@args);
74             },
75 0           );
76             $t = AnyEvent->timer(
77             after => $self->{timeout},# + 0.05, # Since there are timers inside connect, we need to delay a bit longer
78             cb => sub {
79             #$t or return;
80 0     0     warn time()." timeout $self->{timeout} cb for $args->[0]" if DEBUG;
81 0           undef $c;undef $t;
  0            
82 0 0         $self or return;
83 0 0         if ($cb){
84 0           $self->{waitingcb}{int $cb};
85 0           $cb->(undef, "Connect timeout");
86             }
87             },
88 0           );
89 0           $self->connect();
90             }
91             else {
92 0           Carp::cluck "How do I get here?";
93 0           return $self->{con}->$method(@$args);
94             }
95             }
96              
97             sub command {
98 0     0 1   my $self = shift;
99 0 0         if ($self->{connected}) {
100 0           return $self->{con}->command( @_ );
101             }
102             else {
103 0           my ($cmd,%args) = @_;
104 0           $self->conntrack( command => \@_, $args{cb} );
105             }
106             }
107              
108             sub request {
109 0     0 1   my $self = shift;
110 0 0         if ($self->{connected}) {
111 0           return $self->{con}->say(@_);
112             }
113             else {
114             # no cb
115 0           $self->conntrack( say => \@_ );
116             }
117             }
118              
119             sub reader {
120 0     0 0   my $self = shift;
121 0 0         if ($self->{connected}) {
122 0           return $self->{con}->reader(@_);
123             }
124             else {
125 0           my %args = @_;
126 0           $self->conntrack( reader => \@_, $args{cb} );
127             }
128            
129             }
130              
131             sub want_command {
132 0     0 0   my $self = shift;
133 0           warn "wanting command";
134 0 0         if ($self->{connected}) {
135 0           return $self->{con}->want_command(@_);
136             }
137             else {
138 0           my %args = @_;
139 0           $self->conntrack( want_command => \@_ );
140             }
141             }
142              
143             1;