File Coverage

lib/AnyEvent/Memcached/Peer.pm
Criterion Covered Total %
statement 21 87 24.1
branch 0 34 0.0
condition 0 3 0.0
subroutine 7 18 38.8
pod 4 6 66.6
total 32 148 21.6


line stmt bran cond sub pod time code
1             package AnyEvent::Memcached::Peer;
2              
3 5     5   24 use common::sense;
  5         6  
  5         25  
4 5     5   205 use base 'AnyEvent::Connection';
  5         8  
  5         2099  
5 5     5   132650 use Carp;
  5         8  
  5         246  
6 5     5   23 use AnyEvent::Connection::Util;
  5         5  
  5         16  
7 5     5   145 use Scalar::Util qw(weaken);
  5         6  
  5         187  
8 5     5   26 use Devel::Leak::Cb;
  5         6  
  5         33  
9             sub DEBUG () { 0 }
10              
11 5     5   1996 use AnyEvent::Memcached::Conn;
  5         7  
  5         4057  
12              
13             sub new {
14 0     0 1   my $self = shift->SUPER::new(
15             rawcon => 'AnyEvent::Memcached::Conn',
16             reconnect => 1,
17             @_,
18             );
19 0           $self->{waitingcb} = {};
20 0           $self;
21             }
22              
23             sub connect {
24 0     0 1   my $self = shift;
25 0 0         $self->{connecting} and return;
26 0     0     $self->{grd}{con} = $self->reg_cb( connected => sub { $self->{failed} = 0; } );
  0            
27 0     0     $self->{grd}{cfl} = $self->reg_cb( connfail => sub { $self->{failed} = 1; } );
  0            
28             $self->{grd}{dis} = $self->reg_cb( disconnect => sub {
29 0     0     shift;shift;
  0            
30 0 0         %$self or return;
31 0 0         warn "Peer $self->{host}:$self->{port} disconnected".(@_ ? ": @_" : '')."\n" if $self->{debug};
    0          
32 0 0         my $e = @_ ? "@_" : "disconnected";
33 0           for ( keys %{$self->{waitingcb}} ) {
  0            
34 0 0         if ($self->{waitingcb}{$_}) {
35             #warn "Cleanup: ",::sub_fullname( $self->{waitingcb}{$_} );
36 0           $self->{waitingcb}{$_}(undef,$e);
37             }
38 0           delete $self->{waitingcb}{$_};
39             }
40 0           } );
41 0           $self->SUPER::connect(@_);
42 0           return;
43             }
44              
45             sub conntrack {
46 0     0 0   my $self = shift;
47 0           my ($method,$args,$cb) = @_;
48 0 0 0       if($self->{connecting} and $self->{failed}) {
    0          
49 0           warn "Is connecting, have fails => not connected" if DEBUG;
50 0 0         $cb and $cb->(undef, "Not connected");
51 0           return;
52             }
53             elsif (!$self->{connected}) {
54 0           my @args = @$args; # copy to avoid rewriting
55 0           warn "Not connected, do connect for ".\@args.", ".dumper($args[0]) if DEBUG;
56 0           my ($c,$t);
57 0 0         weaken( $self->{waitingcb}{int $cb} = $cb ) if $cb;
58 0           weaken( $self );
59             # This rely on correct event invocation order of Object::Event.
60             # If this could change, I'll add own queue
61             $c = $self->reg_cb(
62             connected => sub {
63 0     0     shift->unreg_me;
64             #$c or return;
65 0           warn "connected cb for ".\@args.", ".dumper($args[0]) if DEBUG;
66 0           undef $c;undef $t;
  0            
67 0 0         $self or return;
68 0 0         delete $self->{waitingcb}{int $cb} if $cb;
69 0           return $self->{con}->$method(@args);
70             },
71 0           );
72             $t = AnyEvent->timer(
73             after => $self->{timeout},
74             cb => sub {
75             #$t or return;
76 0     0     warn "timeout cb for $args->[0]" if DEBUG;
77 0           undef $c;undef $t;
  0            
78 0 0         $self or return;
79 0 0         if ($cb){
80 0           $self->{waitingcb}{int $cb};
81 0           $cb->(undef, "Connect timeout");
82             }
83             },
84 0           );
85 0           $self->connect();
86             }
87             else {
88 0           Carp::cluck "How do I get here?";
89 0           return $self->{con}->$method(@$args);
90             }
91             }
92              
93             sub command {
94 0     0 1   my $self = shift;
95 0 0         if ($self->{connected}) {
96 0           return $self->{con}->command( @_ );
97             }
98             else {
99 0           my ($cmd,%args) = @_;
100 0           $self->conntrack( command => \@_, $args{cb} );
101             }
102             }
103              
104             sub request {
105 0     0 1   my $self = shift;
106 0 0         if ($self->{connected}) {
107 0           return $self->{con}->say(@_);
108             }
109             else {
110             # no cb
111 0           $self->conntrack( say => \@_ );
112             }
113             }
114              
115             sub reader {
116 0     0 0   my $self = shift;
117 0 0         if ($self->{connected}) {
118 0           return $self->{con}->reader(@_);
119             }
120             else {
121 0           my %args = @_;
122 0           $self->conntrack( reader => \@_, $args{cb} );
123             }
124            
125             }
126              
127             1;