|  line  | 
 stmt  | 
 bran  | 
 cond  | 
 sub  | 
 pod  | 
 time  | 
 code  | 
| 
1
 | 
  
 
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # private base class for poolable HTTP/Mogstored sidechannel connections  | 
| 
2
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # This is currently only used by HTTP, but is intended for Mogstored  | 
| 
3
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # connections, too.  | 
| 
4
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 package MogileFS::Connection::Poolable;  | 
| 
5
 | 
21
 | 
 
 | 
 
 | 
  
21
  
 | 
 
 | 
107
 | 
 use strict;  | 
| 
 
 | 
21
 | 
 
 | 
 
 | 
 
 | 
 
 | 
32
 | 
    | 
| 
 
 | 
21
 | 
 
 | 
 
 | 
 
 | 
 
 | 
451
 | 
    | 
| 
6
 | 
21
 | 
 
 | 
 
 | 
  
21
  
 | 
 
 | 
81
 | 
 use warnings;  | 
| 
 
 | 
21
 | 
 
 | 
 
 | 
 
 | 
 
 | 
34
 | 
    | 
| 
 
 | 
21
 | 
 
 | 
 
 | 
 
 | 
 
 | 
366
 | 
    | 
| 
7
 | 
21
 | 
 
 | 
 
 | 
  
21
  
 | 
 
 | 
77
 | 
 use Danga::Socket;  | 
| 
 
 | 
21
 | 
 
 | 
 
 | 
 
 | 
 
 | 
33
 | 
    | 
| 
 
 | 
21
 | 
 
 | 
 
 | 
 
 | 
 
 | 
407
 | 
    | 
| 
8
 | 
21
 | 
 
 | 
 
 | 
  
21
  
 | 
 
 | 
86
 | 
 use base qw(Danga::Socket);  | 
| 
 
 | 
21
 | 
 
 | 
 
 | 
 
 | 
 
 | 
41
 | 
    | 
| 
 
 | 
21
 | 
 
 | 
 
 | 
 
 | 
 
 | 
2704
 | 
    | 
| 
9
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 use fields (  | 
| 
10
 | 
21
 | 
 
 | 
 
 | 
 
 | 
 
 | 
117
 | 
     'mfs_pool',       # owner of the connection (MogileFS::ConnectionPool)  | 
| 
11
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     'mfs_hostport',   # [ ip, port ]  | 
| 
12
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     'mfs_expire',     # Danga::Socket::Timer object  | 
| 
13
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     'mfs_expire_cb',  # Danga::Socket::Timer callback  | 
| 
14
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     'mfs_requests',   # number of requests made on this object  | 
| 
15
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     'mfs_err',        # used to propagate an error to start()  | 
| 
16
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     'mfs_writeq',     # arrayref if connecting, undef otherwise  | 
| 
17
 | 
21
 | 
 
 | 
 
 | 
  
21
  
 | 
 
 | 
128
 | 
 );  | 
| 
 
 | 
21
 | 
 
 | 
 
 | 
 
 | 
 
 | 
42
 | 
    | 
| 
18
 | 
21
 | 
 
 | 
 
 | 
  
21
  
 | 
 
 | 
1712
 | 
 use Socket qw(SO_KEEPALIVE);  | 
| 
 
 | 
21
 | 
 
 | 
 
 | 
 
 | 
 
 | 
36
 | 
    | 
| 
 
 | 
21
 | 
 
 | 
 
 | 
 
 | 
 
 | 
821
 | 
    | 
| 
19
 | 
21
 | 
 
 | 
 
 | 
  
21
  
 | 
 
 | 
98
 | 
 use Time::HiRes;  | 
| 
 
 | 
21
 | 
 
 | 
 
 | 
 
 | 
 
 | 
34
 | 
    | 
| 
 
 | 
21
 | 
 
 | 
 
 | 
 
 | 
 
 | 
126
 | 
    | 
| 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
21
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # subclasses (MogileFS::Connection::{HTTP,Mogstored}) must call this sub  | 
| 
22
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub new {  | 
| 
23
 | 
10
 | 
 
 | 
 
 | 
  
10
  
 | 
  
1
  
 | 
70
 | 
     my ($self, $sock, $ip, $port) = @_;  | 
| 
24
 | 
10
 | 
 
 | 
 
 | 
 
 | 
 
 | 
66
 | 
     $self->SUPER::new($sock); # Danga::Socket->new  | 
| 
25
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
26
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     # connection may not be established, yet  | 
| 
27
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     # so Danga::Socket->peer_addr_string can't be used here  | 
| 
28
 | 
10
 | 
 
 | 
 
 | 
 
 | 
 
 | 
680
 | 
     $self->{mfs_hostport} = [ $ip, $port ];  | 
| 
29
 | 
10
 | 
 
 | 
 
 | 
 
 | 
 
 | 
21
 | 
     $self->{mfs_requests} = 0;  | 
| 
30
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
31
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     # newly-created socket, we buffer writes until event_write is triggered  | 
| 
32
 | 
10
 | 
 
 | 
 
 | 
 
 | 
 
 | 
20
 | 
     $self->{mfs_writeq} = [];  | 
| 
33
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
34
 | 
10
 | 
 
 | 
 
 | 
 
 | 
 
 | 
25
 | 
     return $self;  | 
| 
35
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
36
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
37
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # used by ConnectionPool for tracking per-hostport connection counts  | 
| 
38
 | 
75
 | 
 
 | 
 
 | 
  
75
  
 | 
  
0
  
 | 
111
 | 
 sub key { join(':', @{$_[0]->{mfs_hostport}}); }  | 
| 
 
 | 
75
 | 
 
 | 
 
 | 
 
 | 
 
 | 
405
 | 
    | 
| 
39
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
40
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # backwards compatibility  | 
| 
41
 | 
  
0
  
 | 
 
 | 
 
 | 
  
0
  
 | 
  
0
  
 | 
0
 | 
 sub host_port { $_[0]->key; }  | 
| 
42
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
43
 | 
  
0
  
 | 
 
 | 
 
 | 
  
0
  
 | 
  
0
  
 | 
0
 | 
 sub ip_port { @{$_[0]->{mfs_hostport}}; }  | 
| 
 
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
    | 
| 
44
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
45
 | 
110
 | 
 
 | 
 
 | 
  
110
  
 | 
  
0
  
 | 
317
 | 
 sub fd { fileno($_[0]->sock); }  | 
| 
46
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
47
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # marks a connection as idle, call this before putting it in a connection  | 
| 
48
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # pool for eventual reuse.  | 
| 
49
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub mark_idle {  | 
| 
50
 | 
11
 | 
 
 | 
 
 | 
  
11
  
 | 
  
0
  
 | 
27
 | 
     my ($self) = @_;  | 
| 
51
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
52
 | 
11
 | 
 
 | 
 
 | 
 
 | 
 
 | 
44
 | 
     $self->watch_read(0);  | 
| 
53
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
54
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     # set the keepalive flag the first time we're idle  | 
| 
55
 | 
11
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
496
 | 
     $self->sock->sockopt(SO_KEEPALIVE, 1) if $self->{mfs_requests} == 0;  | 
| 
56
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
57
 | 
11
 | 
 
 | 
 
 | 
 
 | 
 
 | 
350
 | 
     $self->{mfs_requests}++;  | 
| 
58
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
59
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
60
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub write {  | 
| 
61
 | 
50
 | 
 
 | 
 
 | 
  
50
  
 | 
  
1
  
 | 
123
 | 
     my ($self, $arg) = @_;  | 
| 
62
 | 
50
 | 
 
 | 
 
 | 
 
 | 
 
 | 
83
 | 
     my $writeq = $self->{mfs_writeq};  | 
| 
63
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
64
 | 
50
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
105
 | 
     if (ref($writeq) eq "ARRAY") {  | 
| 
65
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         # if we're still connecting, we must buffer explicitly for *BSD  | 
| 
66
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         # and not attempt a real write() until event_write is triggered  | 
| 
67
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
37
 | 
         push @$writeq, $arg;  | 
| 
68
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
74
 | 
         $self->watch_write(1); # enable event_write triggering  | 
| 
69
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
655
 | 
         0; # match Danga::Socket::write return value  | 
| 
70
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     } else {  | 
| 
71
 | 
30
 | 
 
 | 
 
 | 
 
 | 
 
 | 
239
 | 
         $self->SUPER::write($arg);  | 
| 
72
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     }  | 
| 
73
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
74
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
75
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # Danga::Socket will trigger this when a socket is writable  | 
| 
76
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub event_write {  | 
| 
77
 | 
10
 | 
 
 | 
 
 | 
  
10
  
 | 
  
1
  
 | 
2469
 | 
     my ($self) = @_;  | 
| 
78
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
79
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     # we may have buffered writes in mfs_writeq during non-blocking connect(),  | 
| 
80
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     # this is needed on *BSD but unnecessary (but harmless) on Linux.  | 
| 
81
 | 
10
 | 
 
 | 
 
 | 
 
 | 
 
 | 
80
 | 
     my $writeq = delete $self->{mfs_writeq};  | 
| 
82
 | 
10
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
51
 | 
     if ($writeq) {  | 
| 
83
 | 
10
 | 
 
 | 
 
 | 
 
 | 
 
 | 
158
 | 
         $self->watch_write(0); # ->write will re-enable if needed  | 
| 
84
 | 
10
 | 
 
 | 
 
 | 
 
 | 
 
 | 
532
 | 
         foreach my $queued (@$writeq) {  | 
| 
85
 | 
20
 | 
 
 | 
 
 | 
 
 | 
 
 | 
880
 | 
             $self->write($queued);  | 
| 
86
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         }  | 
| 
87
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     } else {  | 
| 
88
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
         $self->SUPER::event_write();  | 
| 
89
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     }  | 
| 
90
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
91
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
92
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # the request running on this connection is retryable if this socket  | 
| 
93
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # has ever been marked idle.  The connection pool can never be 100%  | 
| 
94
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # reliable for detecting dead sockets, and all HTTP requests made by  | 
| 
95
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # MogileFS are idempotent.  | 
| 
96
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub retryable {  | 
| 
97
 | 
3
 | 
 
 | 
 
 | 
  
3
  
 | 
  
0
  
 | 
12
 | 
     my ($self, $reason) = @_;  | 
| 
98
 | 
3
 | 
 
 | 
  
 66
  
 | 
 
 | 
 
 | 
80
 | 
     return ($reason !~ /timeout/ && $self->{mfs_requests} > 0);  | 
| 
99
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
100
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
101
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # Sets (or updates) the timeout of the connection  | 
| 
102
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # timeout_key is "node_timeout" or "conn_timeout"  | 
| 
103
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # clears the current timeout if timeout_key is undef  | 
| 
104
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub set_timeout {  | 
| 
105
 | 
42
 | 
 
 | 
 
 | 
  
42
  
 | 
  
0
  
 | 
193
 | 
     my ($self, $timeout_key) = @_;  | 
| 
106
 | 
42
 | 
 
 | 
 
 | 
 
 | 
 
 | 
94
 | 
     my $mfs_pool = $self->{mfs_pool};  | 
| 
107
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
108
 | 
42
 | 
 
 | 
 
 | 
 
 | 
 
 | 
224
 | 
     $self->SetPostLoopCallback(undef);  | 
| 
109
 | 
42
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
598
 | 
     if ($timeout_key) {  | 
| 
110
 | 
31
 | 
 
 | 
 
 | 
 
 | 
 
 | 
47
 | 
         my $timeout;  | 
| 
111
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
112
 | 
31
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
370
 | 
         if ($timeout_key =~ /\A[a-z_]+\z/) {  | 
| 
113
 | 
31
 | 
 
 | 
  
 50
  
 | 
 
 | 
 
 | 
343
 | 
             $timeout = MogileFS->config($timeout_key) || 2;  | 
| 
114
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         } else {  | 
| 
115
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
             $timeout = $timeout_key;  | 
| 
116
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
             $timeout_key = "timeout";  | 
| 
117
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         }  | 
| 
118
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
119
 | 
31
 | 
 
 | 
 
 | 
 
 | 
 
 | 
109
 | 
         my $t0 = Time::HiRes::time();  | 
| 
120
 | 
31
 | 
 
 | 
 
 | 
 
 | 
 
 | 
89
 | 
         $self->{mfs_expire} = $t0 + $timeout;  | 
| 
121
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         $self->{mfs_expire_cb} = sub {  | 
| 
122
 | 
2
 | 
 
 | 
 
 | 
  
2
  
 | 
 
 | 
14
 | 
             my ($now) = @_;  | 
| 
123
 | 
2
 | 
 
 | 
 
 | 
 
 | 
 
 | 
11
 | 
             my $elapsed = $now - $t0;  | 
| 
124
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
125
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
             # for HTTP, this will fake an HTTP error response like LWP does  | 
| 
126
 | 
2
 | 
 
 | 
 
 | 
 
 | 
 
 | 
61
 | 
             $self->err("$timeout_key: $timeout (elapsed: $elapsed)");  | 
| 
127
 | 
31
 | 
 
 | 
 
 | 
 
 | 
 
 | 
386
 | 
         };  | 
| 
128
 | 
31
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
313
 | 
         $mfs_pool->register_timeout($self, $timeout) if $mfs_pool;  | 
| 
129
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     } else {  | 
| 
130
 | 
11
 | 
 
 | 
 
 | 
 
 | 
 
 | 
95
 | 
         $self->{mfs_expire} = $self->{mfs_expire_cb} = undef;  | 
| 
131
 | 
11
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
59
 | 
         $mfs_pool->register_timeout($self, undef) if $mfs_pool;  | 
| 
132
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     }  | 
| 
133
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
134
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
135
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # returns the expiry time of the connection  | 
| 
136
 | 
4
 | 
 
 | 
 
 | 
  
4
  
 | 
  
0
  
 | 
32
 | 
 sub expiry { $_[0]->{mfs_expire} }  | 
| 
137
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
138
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # runs expiry callback and returns true if time is up,  | 
| 
139
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # returns false if there is time remaining  | 
| 
140
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub expired {  | 
| 
141
 | 
6
 | 
 
 | 
 
 | 
  
6
  
 | 
  
0
  
 | 
28
 | 
     my ($self, $now) = @_;  | 
| 
142
 | 
6
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
40
 | 
     my $expire = $self->{mfs_expire} or return 0;  | 
| 
143
 | 
6
 | 
 
 | 
  
 33
  
 | 
 
 | 
 
 | 
41
 | 
     $now ||= Time::HiRes::time();  | 
| 
144
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
145
 | 
6
 | 
  
100
  
 | 
 
 | 
 
 | 
 
 | 
147
 | 
     if ($now >= $expire) {  | 
| 
146
 | 
2
 | 
 
 | 
 
 | 
 
 | 
 
 | 
30
 | 
         my $expire_cb = delete $self->{mfs_expire_cb};  | 
| 
147
 | 
2
 | 
  
 50
  
 | 
  
 33
  
 | 
 
 | 
 
 | 
63
 | 
         if ($expire_cb && $self->sock) {  | 
| 
148
 | 
2
 | 
 
 | 
 
 | 
  
2
  
 | 
 
 | 
68
 | 
             $self->SetPostLoopCallback(sub { $expire_cb->($now); 1 });  | 
| 
 
 | 
2
 | 
 
 | 
 
 | 
 
 | 
 
 | 
291214
 | 
    | 
| 
 
 | 
2
 | 
 
 | 
 
 | 
 
 | 
 
 | 
58
 | 
    | 
| 
149
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         }  | 
| 
150
 | 
2
 | 
 
 | 
 
 | 
 
 | 
 
 | 
58
 | 
         return 1;  | 
| 
151
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     }  | 
| 
152
 | 
4
 | 
 
 | 
 
 | 
 
 | 
 
 | 
46
 | 
     return 0;  | 
| 
153
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
154
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
155
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # may be overriden in subclass, called only on errors  | 
| 
156
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # The HTTP version of this will fake an HTTP response for LWP compatibility  | 
| 
157
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub err {  | 
| 
158
 | 
  
0
  
 | 
 
 | 
 
 | 
  
0
  
 | 
  
0
  
 | 
0
 | 
     my ($self, $close_reason) = @_;  | 
| 
159
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
160
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
     $self->inflight_expire; # ensure we don't call new_err on eventual close()  | 
| 
161
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
162
 | 
  
0
  
 | 
  
  0
  
 | 
 
 | 
 
 | 
 
 | 
0
 | 
     if ($close_reason =~ /\A:event_(?:hup|err)\z/) {  | 
| 
163
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         # there's a chance this can be invoked while inflight,  | 
| 
164
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         # conn_drop will handle this case appropriately  | 
| 
165
 | 
  
0
  
 | 
  
  0
  
 | 
 
 | 
 
 | 
 
 | 
0
 | 
         $self->{mfs_pool}->conn_drop($self, $close_reason) if $self->{mfs_pool};  | 
| 
166
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     } else {  | 
| 
167
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
         $self->close($close_reason);  | 
| 
168
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     }  | 
| 
169
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
170
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
171
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # sets the pool this connection belongs to, only call from ConnectionPool  | 
| 
172
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub set_pool {  | 
| 
173
 | 
10
 | 
 
 | 
 
 | 
  
10
  
 | 
  
0
  
 | 
27
 | 
     my ($self, $pool) = @_;  | 
| 
174
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
175
 | 
10
 | 
 
 | 
 
 | 
 
 | 
 
 | 
25
 | 
     $self->{mfs_pool} = $pool;  | 
| 
176
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
177
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
178
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # closes a connection, and may reschedule the inflight callback if  | 
| 
179
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # close_reason is ":retry"  | 
| 
180
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub close {  | 
| 
181
 | 
9
 | 
 
 | 
 
 | 
  
9
  
 | 
  
1
  
 | 
30
 | 
     my ($self, $close_reason) = @_;  | 
| 
182
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
183
 | 
9
 | 
 
 | 
 
 | 
 
 | 
 
 | 
43
 | 
     delete $self->{mfs_expire_cb}; # avoid circular ref  | 
| 
184
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
185
 | 
9
 | 
 
 | 
 
 | 
 
 | 
 
 | 
29
 | 
     my $mfs_pool = delete $self->{mfs_pool}; # avoid circular ref  | 
| 
186
 | 
9
 | 
 
 | 
 
 | 
 
 | 
 
 | 
16
 | 
     my $inflight_cb;  | 
| 
187
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
188
 | 
9
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
28
 | 
     if ($mfs_pool) {  | 
| 
189
 | 
9
 | 
 
 | 
 
 | 
 
 | 
 
 | 
46
 | 
         $mfs_pool->schedule_queued;  | 
| 
190
 | 
9
 | 
 
 | 
 
 | 
 
 | 
 
 | 
195
 | 
         $inflight_cb = $mfs_pool->conn_close_prepare($self, $close_reason);  | 
| 
191
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     }  | 
| 
192
 | 
9
 | 
 
 | 
 
 | 
 
 | 
 
 | 
90
 | 
     $self->SUPER::close($close_reason); # Danga::Socket->close  | 
| 
193
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
194
 | 
9
 | 
  
100
  
 | 
  
 66
  
 | 
 
 | 
 
 | 
1199
 | 
     if ($inflight_cb && $close_reason) {  | 
| 
195
 | 
1
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
12
 | 
         if ($close_reason eq ":retry") {  | 
| 
196
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
             my ($ip, $port) = $self->ip_port;  | 
| 
197
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
198
 | 
  
0
  
 | 
 
 | 
 
 | 
 
 | 
 
 | 
0
 | 
             $mfs_pool->enqueue($ip, $port, $inflight_cb);  | 
| 
199
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         } else {  | 
| 
200
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
             # Danga::Socket-scheduled write()s which fail with ECONNREFUSED,  | 
| 
201
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
             # EPIPE, or "write_error" after an initial (non-blocking)  | 
| 
202
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
             # connect()  | 
| 
203
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
             $mfs_pool->on_next_tick(sub {  | 
| 
204
 | 
1
 | 
 
 | 
  
 50
  
 | 
  
1
  
 | 
 
 | 
24
 | 
                 ref($self)->new_err($close_reason || "error", $inflight_cb);  | 
| 
205
 | 
1
 | 
 
 | 
 
 | 
 
 | 
 
 | 
19
 | 
             });  | 
| 
206
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
         }  | 
| 
207
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     }  | 
| 
208
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
209
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
210
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # Marks a connection as no-longer inflight.  Calling this prevents retries.  | 
| 
211
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub inflight_expire {  | 
| 
212
 | 
14
 | 
 
 | 
 
 | 
  
14
  
 | 
  
0
  
 | 
43
 | 
     my ($self) = @_;  | 
| 
213
 | 
14
 | 
 
 | 
 
 | 
 
 | 
 
 | 
38
 | 
     my $mfs_pool = $self->{mfs_pool};  | 
| 
214
 | 
14
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
69
 | 
     die "BUG: expiring without MogileFS::ConnectionPool\n" unless $mfs_pool;  | 
| 
215
 | 
14
 | 
 
 | 
 
 | 
 
 | 
 
 | 
165
 | 
     $mfs_pool->inflight_cb_expire($self);  | 
| 
216
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
217
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
218
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # Danga::Socket callbacks  | 
| 
219
 | 
  
0
  
 | 
 
 | 
 
 | 
  
0
  
 | 
  
1
  
 | 
0
 | 
 sub event_hup { $_[0]->err(':event_hup'); }  | 
| 
220
 | 
0
 | 
 
 | 
 
 | 
  
0
  
 | 
  
1
  
 | 
0
 | 
 sub event_err { $_[0]->err(':event_err'); }  | 
| 
221
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
222
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # called when we couldn't create a socket, but need to create an object  | 
| 
223
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # anyways for errors (creating fake, LWP-style error responses)  | 
| 
224
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub new_err {  | 
| 
225
 | 
1
 | 
 
 | 
 
 | 
  
1
  
 | 
  
0
  
 | 
7
 | 
     my ($class, $err, $start_cb) = @_;  | 
| 
226
 | 
1
 | 
 
 | 
 
 | 
 
 | 
 
 | 
7
 | 
     my $self = fields::new($class);  | 
| 
227
 | 
1
 | 
 
 | 
 
 | 
 
 | 
 
 | 
322
 | 
     $self->{mfs_err} = $err;  | 
| 
228
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
     # on socket errors  | 
| 
229
 | 
1
 | 
 
 | 
 
 | 
 
 | 
 
 | 
9
 | 
     $start_cb->($self);  | 
| 
230
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
231
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
232
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # returns this connection back to its associated pool.  | 
| 
233
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 # Returns false if not successful (pool is full)  | 
| 
234
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 sub persist {  | 
| 
235
 | 
11
 | 
 
 | 
 
 | 
  
11
  
 | 
  
0
  
 | 
46
 | 
     my ($self) = @_;  | 
| 
236
 | 
11
 | 
 
 | 
 
 | 
 
 | 
 
 | 
39
 | 
     my $mfs_pool = $self->{mfs_pool};  | 
| 
237
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
238
 | 
11
 | 
  
 50
  
 | 
 
 | 
 
 | 
 
 | 
62
 | 
     return $mfs_pool ? $mfs_pool->conn_persist($self) : 0;  | 
| 
239
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 }  | 
| 
240
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
    | 
| 
241
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 
 | 
 1;  |