File Coverage

blib/lib/MogileFS/Connection/Poolable.pm
Criterion Covered Total %
statement 102 117 87.1
branch 20 34 58.8
condition 8 16 50.0
subroutine 25 30 83.3
pod 6 20 30.0
total 161 217 74.1


line stmt bran cond sub pod time code
1             # private base class for poolable HTTP/Mogstored sidechannel connections
2             # This is currently only used by HTTP, but is intended for Mogstored
3             # connections, too.
4             package MogileFS::Connection::Poolable;
5 21     21   107 use strict;
  21         32  
  21         451  
6 21     21   81 use warnings;
  21         34  
  21         366  
7 21     21   77 use Danga::Socket;
  21         33  
  21         407  
8 21     21   86 use base qw(Danga::Socket);
  21         41  
  21         2704  
9             use fields (
10 21         117 'mfs_pool', # owner of the connection (MogileFS::ConnectionPool)
11             'mfs_hostport', # [ ip, port ]
12             'mfs_expire', # Danga::Socket::Timer object
13             'mfs_expire_cb', # Danga::Socket::Timer callback
14             'mfs_requests', # number of requests made on this object
15             'mfs_err', # used to propagate an error to start()
16             'mfs_writeq', # arrayref if connecting, undef otherwise
17 21     21   128 );
  21         42  
18 21     21   1712 use Socket qw(SO_KEEPALIVE);
  21         36  
  21         821  
19 21     21   98 use Time::HiRes;
  21         34  
  21         126  
20              
21             # subclasses (MogileFS::Connection::{HTTP,Mogstored}) must call this sub
22             sub new {
23 10     10 1 70 my ($self, $sock, $ip, $port) = @_;
24 10         66 $self->SUPER::new($sock); # Danga::Socket->new
25              
26             # connection may not be established, yet
27             # so Danga::Socket->peer_addr_string can't be used here
28 10         680 $self->{mfs_hostport} = [ $ip, $port ];
29 10         21 $self->{mfs_requests} = 0;
30              
31             # newly-created socket, we buffer writes until event_write is triggered
32 10         20 $self->{mfs_writeq} = [];
33              
34 10         25 return $self;
35             }
36              
37             # used by ConnectionPool for tracking per-hostport connection counts
38 75     75 0 111 sub key { join(':', @{$_[0]->{mfs_hostport}}); }
  75         405  
39              
40             # backwards compatibility
41 0     0 0 0 sub host_port { $_[0]->key; }
42              
43 0     0 0 0 sub ip_port { @{$_[0]->{mfs_hostport}}; }
  0         0  
44              
45 110     110 0 317 sub fd { fileno($_[0]->sock); }
46              
47             # marks a connection as idle, call this before putting it in a connection
48             # pool for eventual reuse.
49             sub mark_idle {
50 11     11 0 27 my ($self) = @_;
51              
52 11         44 $self->watch_read(0);
53              
54             # set the keepalive flag the first time we're idle
55 11 100       496 $self->sock->sockopt(SO_KEEPALIVE, 1) if $self->{mfs_requests} == 0;
56              
57 11         350 $self->{mfs_requests}++;
58             }
59              
60             sub write {
61 50     50 1 123 my ($self, $arg) = @_;
62 50         83 my $writeq = $self->{mfs_writeq};
63              
64 50 100       105 if (ref($writeq) eq "ARRAY") {
65             # if we're still connecting, we must buffer explicitly for *BSD
66             # and not attempt a real write() until event_write is triggered
67 20         37 push @$writeq, $arg;
68 20         74 $self->watch_write(1); # enable event_write triggering
69 20         655 0; # match Danga::Socket::write return value
70             } else {
71 30         239 $self->SUPER::write($arg);
72             }
73             }
74              
75             # Danga::Socket will trigger this when a socket is writable
76             sub event_write {
77 10     10 1 2469 my ($self) = @_;
78              
79             # we may have buffered writes in mfs_writeq during non-blocking connect(),
80             # this is needed on *BSD but unnecessary (but harmless) on Linux.
81 10         80 my $writeq = delete $self->{mfs_writeq};
82 10 50       51 if ($writeq) {
83 10         158 $self->watch_write(0); # ->write will re-enable if needed
84 10         532 foreach my $queued (@$writeq) {
85 20         880 $self->write($queued);
86             }
87             } else {
88 0         0 $self->SUPER::event_write();
89             }
90             }
91              
92             # the request running on this connection is retryable if this socket
93             # has ever been marked idle. The connection pool can never be 100%
94             # reliable for detecting dead sockets, and all HTTP requests made by
95             # MogileFS are idempotent.
96             sub retryable {
97 3     3 0 12 my ($self, $reason) = @_;
98 3   66     80 return ($reason !~ /timeout/ && $self->{mfs_requests} > 0);
99             }
100              
101             # Sets (or updates) the timeout of the connection
102             # timeout_key is "node_timeout" or "conn_timeout"
103             # clears the current timeout if timeout_key is undef
104             sub set_timeout {
105 42     42 0 193 my ($self, $timeout_key) = @_;
106 42         94 my $mfs_pool = $self->{mfs_pool};
107              
108 42         224 $self->SetPostLoopCallback(undef);
109 42 100       598 if ($timeout_key) {
110 31         47 my $timeout;
111              
112 31 50       370 if ($timeout_key =~ /\A[a-z_]+\z/) {
113 31   50     343 $timeout = MogileFS->config($timeout_key) || 2;
114             } else {
115 0         0 $timeout = $timeout_key;
116 0         0 $timeout_key = "timeout";
117             }
118              
119 31         109 my $t0 = Time::HiRes::time();
120 31         89 $self->{mfs_expire} = $t0 + $timeout;
121             $self->{mfs_expire_cb} = sub {
122 2     2   14 my ($now) = @_;
123 2         11 my $elapsed = $now - $t0;
124              
125             # for HTTP, this will fake an HTTP error response like LWP does
126 2         61 $self->err("$timeout_key: $timeout (elapsed: $elapsed)");
127 31         386 };
128 31 50       313 $mfs_pool->register_timeout($self, $timeout) if $mfs_pool;
129             } else {
130 11         95 $self->{mfs_expire} = $self->{mfs_expire_cb} = undef;
131 11 50       59 $mfs_pool->register_timeout($self, undef) if $mfs_pool;
132             }
133             }
134              
135             # returns the expiry time of the connection
136 4     4 0 32 sub expiry { $_[0]->{mfs_expire} }
137              
138             # runs expiry callback and returns true if time is up,
139             # returns false if there is time remaining
140             sub expired {
141 6     6 0 28 my ($self, $now) = @_;
142 6 50       40 my $expire = $self->{mfs_expire} or return 0;
143 6   33     41 $now ||= Time::HiRes::time();
144              
145 6 100       147 if ($now >= $expire) {
146 2         30 my $expire_cb = delete $self->{mfs_expire_cb};
147 2 50 33     63 if ($expire_cb && $self->sock) {
148 2     2   68 $self->SetPostLoopCallback(sub { $expire_cb->($now); 1 });
  2         291214  
  2         58  
149             }
150 2         58 return 1;
151             }
152 4         46 return 0;
153             }
154              
155             # may be overridden in subclass, called only on errors
156             # The HTTP version of this will fake an HTTP response for LWP compatibility
157             sub err {
158 0     0 0 0 my ($self, $close_reason) = @_;
159              
160 0         0 $self->inflight_expire; # ensure we don't call new_err on eventual close()
161              
162 0 0       0 if ($close_reason =~ /\A:event_(?:hup|err)\z/) {
163             # there's a chance this can be invoked while inflight,
164             # conn_drop will handle this case appropriately
165 0 0       0 $self->{mfs_pool}->conn_drop($self, $close_reason) if $self->{mfs_pool};
166             } else {
167 0         0 $self->close($close_reason);
168             }
169             }
170              
171             # sets the pool this connection belongs to, only call from ConnectionPool
172             sub set_pool {
173 10     10 0 27 my ($self, $pool) = @_;
174              
175 10         25 $self->{mfs_pool} = $pool;
176             }
177              
178             # closes a connection, and may reschedule the inflight callback if
179             # close_reason is ":retry"
180             sub close {
181 9     9 1 30 my ($self, $close_reason) = @_;
182              
183 9         43 delete $self->{mfs_expire_cb}; # avoid circular ref
184              
185 9         29 my $mfs_pool = delete $self->{mfs_pool}; # avoid circular ref
186 9         16 my $inflight_cb;
187              
188 9 50       28 if ($mfs_pool) {
189 9         46 $mfs_pool->schedule_queued;
190 9         195 $inflight_cb = $mfs_pool->conn_close_prepare($self, $close_reason);
191             }
192 9         90 $self->SUPER::close($close_reason); # Danga::Socket->close
193              
194 9 100 66     1199 if ($inflight_cb && $close_reason) {
195 1 50       12 if ($close_reason eq ":retry") {
196 0         0 my ($ip, $port) = $self->ip_port;
197              
198 0         0 $mfs_pool->enqueue($ip, $port, $inflight_cb);
199             } else {
200             # Danga::Socket-scheduled write()s which fail with ECONNREFUSED,
201             # EPIPE, or "write_error" after an initial (non-blocking)
202             # connect()
203             $mfs_pool->on_next_tick(sub {
204 1   50 1   24 ref($self)->new_err($close_reason || "error", $inflight_cb);
205 1         19 });
206             }
207             }
208             }
209              
210             # Marks a connection as no-longer inflight. Calling this prevents retries.
211             sub inflight_expire {
212 14     14 0 43 my ($self) = @_;
213 14         38 my $mfs_pool = $self->{mfs_pool};
214 14 50       69 die "BUG: expiring without MogileFS::ConnectionPool\n" unless $mfs_pool;
215 14         165 $mfs_pool->inflight_cb_expire($self);
216             }
217              
218             # Danga::Socket callbacks
219 0     0 1 0 sub event_hup { $_[0]->err(':event_hup'); }
220 0     0 1 0 sub event_err { $_[0]->err(':event_err'); }
221              
222             # called when we couldn't create a socket, but need to create an object
223             # anyways for errors (creating fake, LWP-style error responses)
224             sub new_err {
225 1     1 0 7 my ($class, $err, $start_cb) = @_;
226 1         7 my $self = fields::new($class);
227 1         322 $self->{mfs_err} = $err;
228             # on socket errors
229 1         9 $start_cb->($self);
230             }
231              
232             # returns this connection back to its associated pool.
233             # Returns false if not successful (pool is full)
234             sub persist {
235 11     11 0 46 my ($self) = @_;
236 11         39 my $mfs_pool = $self->{mfs_pool};
237              
238 11 50       62 return $mfs_pool ? $mfs_pool->conn_persist($self) : 0;
239             }
240              
241             1;