line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package AnyEvent::Memcached::Peer; |
2
|
|
|
|
|
|
|
|
3
|
5
|
|
|
5
|
|
24
|
use common::sense; |
|
5
|
|
|
|
|
6
|
|
|
5
|
|
|
|
|
25
|
|
4
|
5
|
|
|
5
|
|
205
|
use base 'AnyEvent::Connection'; |
|
5
|
|
|
|
|
8
|
|
|
5
|
|
|
|
|
2099
|
|
5
|
5
|
|
|
5
|
|
132650
|
use Carp; |
|
5
|
|
|
|
|
8
|
|
|
5
|
|
|
|
|
246
|
|
6
|
5
|
|
|
5
|
|
23
|
use AnyEvent::Connection::Util; |
|
5
|
|
|
|
|
5
|
|
|
5
|
|
|
|
|
16
|
|
7
|
5
|
|
|
5
|
|
145
|
use Scalar::Util qw(weaken); |
|
5
|
|
|
|
|
6
|
|
|
5
|
|
|
|
|
187
|
|
8
|
5
|
|
|
5
|
|
26
|
use Devel::Leak::Cb; |
|
5
|
|
|
|
|
6
|
|
|
5
|
|
|
|
|
33
|
|
9
|
|
|
|
|
|
|
# Compile-time debug switch: the `warn ... if DEBUG` diagnostics in this
# module are constant-folded away while this is 0.
use constant DEBUG => 0;
10
|
|
|
|
|
|
|
|
11
|
5
|
|
|
5
|
|
1996
|
use AnyEvent::Memcached::Conn; |
|
5
|
|
|
|
|
7
|
|
|
5
|
|
|
|
|
4057
|
|
12
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
# Constructor.
#
# Delegates to the parent class constructor with two defaults placed ahead
# of the caller's arguments (so caller-supplied pairs come later in the
# list), then attaches an empty table of callbacks that are waiting for a
# connection.
#
# Returns the new object.
sub new {
    my ($class, @caller_args) = @_;

    my @defaults = (
        rawcon    => 'AnyEvent::Memcached::Conn',
        reconnect => 1,
    );

    my $peer = $class->SUPER::new( @defaults, @caller_args );

    # Filled by conntrack() with callbacks of requests issued while offline.
    $peer->{waitingcb} = {};

    return $peer;
}
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
# Start connecting to the peer; does nothing while $self->{connecting} is set.
#
# Registers three event handlers and stores whatever reg_cb() returns for
# them in $self->{grd}:
#   connected  - clears $self->{failed}
#   connfail   - sets   $self->{failed}
#   disconnect - invokes every callback still parked in $self->{waitingcb}
#                as $cb->(undef, $reason) and empties the table
#
# Any arguments are forwarded to the parent class connect().
# Returns nothing.
sub connect {
    my $self = shift;
    return if $self->{connecting};

    $self->{grd}{con} = $self->reg_cb( connected => sub { $self->{failed} = 0; } );
    $self->{grd}{cfl} = $self->reg_cb( connfail  => sub { $self->{failed} = 1; } );
    $self->{grd}{dis} = $self->reg_cb( disconnect => sub {
        # Drop the first two event arguments; what is left is the reason.
        shift; shift;

        # Nothing to clean up when the object hash is empty.
        # NOTE(review): presumably this means the object was already
        # destroyed by the parent class - confirm.
        %$self or return;

        warn "Peer $self->{host}:$self->{port} disconnected".(@_ ? ": @_" : '')."\n" if $self->{debug};

        my $e = @_ ? "@_" : "disconnected";

        # Use a lexical key (not the global $_) and remove each entry
        # *before* calling it: the callback is user code, so it may clobber
        # $_ or immediately re-queue itself under the same key, and neither
        # must corrupt the table. Entries are weak references and may have
        # become undef; those are simply dropped.
        for my $id ( keys %{ $self->{waitingcb} } ) {
            my $waiting = delete $self->{waitingcb}{$id} or next;
            #warn "Cleanup: ",::sub_fullname( $waiting );
            $waiting->(undef, $e);
        }
    } );

    $self->SUPER::connect(@_);
    return;
}
44
|
|
|
|
|
|
|
|
45
|
|
|
|
|
|
|
# Run a connection method, connecting first when the peer is offline.
#
# Arguments:
#   $method - name of the method to call on $self->{con}
#   $args   - array reference with the arguments for that method
#   $cb     - optional callback; on failure it is called as $cb->(undef, $message)
#
# Behaviour:
#   * connecting and the last attempt failed: report "Not connected" to $cb
#     and return nothing;
#   * not connected: park $cb in $self->{waitingcb}, register a one-shot
#     `connected` handler that performs the call, arm a timer for
#     $self->{timeout} seconds that reports "Connect timeout", then call
#     connect();
#   * otherwise: complain via Carp::cluck (command/request/reader only come
#     here when offline) and perform the call right away.
sub conntrack {
    my $self = shift;
    my ($method, $args, $cb) = @_;

    if ( $self->{connecting} and $self->{failed} ) {
        warn "Is connecting, have fails => not connected" if DEBUG;
        $cb and $cb->(undef, "Not connected");
        return;
    }
    elsif ( !$self->{connected} ) {
        my @args = @$args; # copy to avoid rewriting
        warn "Not connected, do connect for ".\@args.", ".dumper($args[0]) if DEBUG;

        # $c: handle returned by reg_cb, $t: timer watcher. Both closures
        # below clear both, so whichever fires first disarms the other.
        my ($c, $t);

        # Park the callback (as a weak reference) so the disconnect handler
        # installed by connect() can fail it.
        weaken( $self->{waitingcb}{int $cb} = $cb ) if $cb;

        # The closures below capture $self; keep that capture weak.
        weaken( $self );

        # This rely on correct event invocation order of Object::Event.
        # If this could change, I'll add own queue
        $c = $self->reg_cb(
            connected => sub {
                shift->unreg_me;
                #$c or return;
                warn "connected cb for ".\@args.", ".dumper($args[0]) if DEBUG;
                undef $c; undef $t;
                $self or return;
                delete $self->{waitingcb}{int $cb} if $cb;
                return $self->{con}->$method(@args);
            },
        );
        $t = AnyEvent->timer(
            after => $self->{timeout},
            cb    => sub {
                #$t or return;
                warn "timeout cb for $args->[0]" if DEBUG;
                undef $c; undef $t;
                $self or return;
                if ($cb) {
                    # Unpark the callback before reporting the timeout, so a
                    # later disconnect cannot invoke it a second time.
                    delete $self->{waitingcb}{int $cb};
                    $cb->(undef, "Connect timeout");
                }
            },
        );
        return $self->connect();
    }
    else {
        Carp::cluck "How do I get here?";
        return $self->{con}->$method(@$args);
    }
}
92
|
|
|
|
|
|
|
|
93
|
|
|
|
|
|
|
# Send a command to the peer.
#
# Arguments: the command followed by key/value options; the `cb` option is
# the completion callback.
#
# When connected, the call goes straight to $self->{con}->command. Otherwise
# it is handed to conntrack() together with the `cb` option so it can run
# once a connection exists (or be failed through the callback).
sub command {
    my $self = shift;

    unless ( $self->{connected} ) {
        # Everything after the leading command is a list of key/value options.
        my %opt = @_[ 1 .. $#_ ];
        return $self->conntrack( command => \@_, $opt{cb} );
    }

    return $self->{con}->command(@_);
}
103
|
|
|
|
|
|
|
|
104
|
|
|
|
|
|
|
# Send a raw request to the peer.
#
# When connected, the arguments go straight to $self->{con}->say. Otherwise
# the call is handed to conntrack() without a callback, so a failure to
# connect is not reported to the caller.
sub request {
    my $self = shift;

    return $self->{connected}
        ? $self->{con}->say(@_)
        : $self->conntrack( say => \@_ );    # no cb
}
114
|
|
|
|
|
|
|
|
115
|
|
|
|
|
|
|
# Install a reader on the peer connection.
#
# Arguments: key/value options; the `cb` option is the callback.
#
# When connected, the options go straight to $self->{con}->reader. Otherwise
# the call is handed to conntrack() together with the `cb` option.
sub reader {
    my $self = shift;

    return $self->{con}->reader(@_) if $self->{connected};

    my %opt = @_;
    return $self->conntrack( reader => \@_, $opt{cb} );
}
126
|
|
|
|
|
|
|
|
127
|
|
|
|
|
|
|
1; |