File Coverage

blib/lib/MogileFS/ConnectionPool.pm
Criterion Covered Total %
statement 144 203 70.9
branch 34 74 45.9
condition 17 28 60.7
subroutine 26 32 81.2
pod 0 13 0.0
total 221 350 63.1


line stmt bran cond sub pod time code
1             # a connection pool class with queueing.
2             # (something doesn't sound quite right with that...)
3             # This requires Danga::Socket to drive, but may also function without it
4             # via conn_get/conn_put.
5             package MogileFS::ConnectionPool;
6 21     21   97 use strict;
  21         38  
  21         816  
7 21     21   99 use warnings;
  21         66  
  21         690  
8 21     21   94 use Carp qw(croak confess);
  21         29  
  21         1372  
9 21     21   119 use Time::HiRes;
  21         36  
  21         208  
10              
11 21     21   1974 use constant NEVER => (0xffffffff << 32) | 0xffffffff; # portable version :P
  21         40  
  21         41548  
12              
13             sub new {
14 1     1 0 2 my ($class, $conn_class, $opts) = @_;
15              
16 1   50     3 $opts ||= {};
17 1         13 my $self = bless {
18             fdmap => {}, # { fd -> conn }
19             idle => {}, # ip:port -> [ MogileFS::Connection::Poolable, ... ]
20             queue => [], # [ [ ip, port, callback ], ... ]
21             timer => undef, # Danga::Socket::Timer object
22             timeouts => {}, # { fd -> conn }
23             inflight => {}, # ip:port -> { fd -> callback }
24             total_inflight => 0, # number of inflight connections
25             dest_capacity => $opts->{dest_capacity},
26             total_capacity => $opts->{total_capacity},
27             class => $conn_class,
28             scheduled => 0, # set if we'll start tasks on next tick
29             on_next_tick => [],
30             next_expiry => NEVER,
31             }, $class;
32              
33             # total_capacity=20 matches what we used with LWP
34 1   50     6 $self->{total_capacity} ||= 20;
35              
36             # allow users to specify per-destination capacity limits
37 1   33     6 $self->{dest_capacity} ||= $self->{total_capacity};
38              
39 1         2 return $self;
40             }
41              
42             # retrieves an idle connection for the [IP, port] pair
43             sub _conn_idle_get {
44 15     15   38 my ($self, $ip, $port) = @_;
45              
46 15         61 my $key = "$ip:$port";
47 15 100       117 my $idle = $self->{idle}->{$key} or return;
48              
49             # the Danga::Socket event loop may detect hangups and close sockets,
50             # However not all MFS workers run this event loop, so we need to
51             # validate the connection when retrieving a connection from the pool
52 13         89 while (my $conn = pop @$idle) {
53             # make sure the socket is valid:
54              
55             # due to response callback ordering, we actually place connections
56             # in the pool before invoking the user-supplied response callback
57             # (to allow connections to get reused ASAP)
58 10 50       100 my $sock = $conn->sock or next;
59              
60             # hope this returns EAGAIN, not using OO->sysread here since
61             # Net::HTTP::NB overrides that and we _want_ to hit EAGAIN here
62 10         243 my $r = sysread($sock, my $byte, 1);
63              
64             # good, connection is possibly still alive if we got EAGAIN
65 10 100 66     183 return $conn if (!defined $r && $!{EAGAIN});
66              
67 5         41 my $err = $!;
68 5 50       19 if (defined $r) {
69 5 50       18 if ($r == 0) {
70             # a common case and to be expected
71 5         15 $err = "server dropped idle connection";
72             } else {
73             # this is a bug either on our side or the HTTP server
74 0         0 Mgd::error("Bug: unexpected got $r bytes from idle conn to ". $conn->host_port. ") (byte=$byte)");
75             }
76             }
77              
78             # connection is bad, close the socket and move onto the
79             # next idle connection if there is one.
80 5         56 $conn->close($err);
81             }
82              
83 8         83 return;
84             }
85              
86             # creates a new connection if under capacity
87             # returns undef if we're at capacity (or on EMFILE/ENFILE)
88             sub _conn_new_maybe {
89 10     10   36 my ($self, $ip, $port) = @_;
90 10         30 my $key = "$ip:$port";
91              
92             # we only call this sub if we don't have idle connections, so
93             # we don't check {idle} here
94              
95             # make sure we're not already at capacity for this destination
96 10   100     14 my $nr_inflight = scalar keys %{$self->{inflight}->{$key} ||= {}};
  10         68  
97 10 50       121 return if ($nr_inflight >= $self->{dest_capacity});
98              
99             # see how we're doing with regard to total capacity:
100 10 50       56 if ($self->_total_connections >= $self->{total_capacity}) {
101             # see if we have idle connections for other pools to kill
102 0 0       0 if ($self->{total_inflight} < $self->{total_capacity}) {
103             # we have idle connections to other destinations, drop one of those
104 0         0 $self->_conn_drop_idle;
105             # fall through to creating a new connection
106             } else {
107             # we're at total capacity for the entire pool
108 0         0 return;
109             }
110             }
111              
112             # we're hopefully under capacity if we got here, create a new connection
113 10         39 $self->_conn_new($ip, $port);
114             }
115              
116             # creates new connection and registers it in our fdmap
117             # returns error string if resources (FDs, buffers) aren't available
118             sub _conn_new {
119 10     10   36 my ($self, $ip, $port) = @_;
120              
121             # calls MogileFS::Connection::{HTTP,Mogstored}->new:
122 10         166 my $conn = $self->{class}->new($ip, $port);
123 10 50       34 if ($conn) {
124             # register the connection
125 10         66 $self->{fdmap}->{$conn->fd} = $conn;
126 10         118 $conn->set_pool($self);
127              
128 10         79 return $conn;
129             } else {
130             # EMFILE/ENFILE should never happen as the capacity for this
131             # pool is far under the system defaults. Just give up on
132             # EMFILE/ENFILE like any other error.
133 0         0 return "failed to create socket to $ip:$port ($!)";
134             }
135             }
136              
137             # retrieves a connection, may return undef if at capacity
138             sub _conn_get {
139 15     15   37 my ($self, $ip, $port) = @_;
140              
141             # if we have idle connections, always use them first
142 15 100       88 $self->_conn_idle_get($ip, $port) || $self->_conn_new_maybe($ip, $port);
143             }
144              
145             # Pulls a connection out of the pool for synchronous use.
146             # This may create a new connection (independent of pool limits).
147             # The connection returned by this is _blocking_. This is currently
148             # only used by replicate.
149             sub conn_get {
150 0     0 0 0 my ($self, $ip, $port) = @_;
151 0         0 my $conn = $self->_conn_idle_get($ip, $port);
152              
153 0 0       0 if ($conn) {
154             # in case the connection never comes back, let refcounting close() it:
155 0         0 delete $self->{fdmap}->{$conn->fd};
156             } else {
157 0         0 $conn = $self->_conn_new($ip, $port);
158 0 0       0 unless (ref $conn) {
159 0         0 $! = $conn; # $conn is an error message :<
160 0         0 return;
161             }
162 0         0 delete $self->{fdmap}->{$conn->fd};
163 0         0 my $timeout = MogileFS->config("node_timeout");
164 0 0       0 MogileFS::Util::wait_for_writeability($conn->fd, $timeout) or return;
165             }
166              
167 0         0 return $conn;
168             }
169              
170             # retrieves a connection from the connection pool and executes
171             # inflight_cb on it. If the pool is at capacity, this will queue the task.
172             # This relies on Danga::Socket->EventLoop
173             sub start {
174 15     15 0 55 my ($self, $ip, $port, $inflight_cb) = @_;
175              
176 15         94 my $conn = $self->_conn_get($ip, $port);
177 15 50       206 if ($conn) {
178 15         92 $self->_conn_run($conn, $inflight_cb);
179             } else { # we're too busy right now, queue up
180 0         0 $self->enqueue($ip, $port, $inflight_cb);
181             }
182             }
183              
184             # returns the total number of connections we have
185             sub _total_connections {
186 10     10   21 my ($self) = @_;
187 10         12 return scalar keys %{$self->{fdmap}};
  10         59  
188             }
189              
190             # marks a connection as no longer inflight, returns the inflight
191             # callback if the connection was active, undef if not
192             sub inflight_cb_expire {
193 23     23 0 53 my ($self, $conn) = @_;
194 23         188 my $inflight_cb = delete $self->{inflight}->{$conn->key}->{$conn->fd};
195 23 100       280 $self->{total_inflight}-- if $inflight_cb;
196              
197 23         392 return $inflight_cb;
198             }
199              
200             # schedules the event loop to dequeue and run a task on the next
201             # tick of the Danga::Socket event loop. Call this
202             # 1) whenever a task is enqueued
203             # 2) whenever a task is complete
204             sub schedule_queued {
205 20     20 0 47 my ($self) = @_;
206              
207             # AddTimer(0) to avoid potential stack overflow
208             $self->{scheduled} ||= Danga::Socket->AddTimer(0, sub {
209 15     15   14870 $self->{scheduled} = undef;
210 15         103 my $queue = $self->{queue};
211              
212 15         117 my $total_capacity = $self->{total_capacity};
213 15         772 my $i = 0;
214              
215 15   33     750 while ($self->{total_inflight} < $total_capacity
216             && $i <= (scalar(@$queue) - 1)) {
217 0         0 my ($ip, $port, $cb) = @{$queue->[$i]};
  0         0  
218              
219 0         0 my $conn = $self->_conn_get($ip, $port);
220 0 0       0 if ($conn) {
221 0         0 splice(@$queue, $i, 1); # remove from queue
222 0         0 $self->_conn_run($conn, $cb);
223             } else {
224             # this queue object cannot be dequeued, skip it for now
225 0         0 $i++;
226             }
227             }
228 20   66     386 });
229             }
230              
231             # Call this when done using an (inflight) connection
232             # This possibly places a connection in the connection pool.
233             # This will close the connection if the pool is already at capacity.
234             # This will also start the next queued callback, or retry if needed
235             sub conn_persist {
236 11     11 0 20 my ($self, $conn) = @_;
237              
238             # schedule the next request if we're done with any connection
239 11         49 $self->schedule_queued;
240 11         535 $self->conn_put($conn);
241             }
242              
243             # The opposite of conn_get, this returns a connection retrieved with conn_get
244             # back to the connection pool, making it available for future use. Dead
245             # connections are not stored.
246             # This is currently only used by replicate.
247             sub conn_put {
248 11     11 0 27 my ($self, $conn) = @_;
249              
250 11         54 my $key = $conn->key;
251             # we do not store dead connections
252 11         117 my $peer_addr = $conn->peer_addr_string;
253              
254 11 50       794 if ($peer_addr) {
255             # connection is still alive, respect capacity limits
256 11   100     94 my $idle = $self->{idle}->{$key} ||= [];
257              
258             # register it in the fdmap just in case:
259 11         58 $self->{fdmap}->{$conn->fd} = $conn;
260              
261 11 50       155 if ($self->_dest_total($conn) < $self->{dest_capacity}) {
262 11         70 $conn->mark_idle;
263 11         28 push @$idle, $conn; # yay, connection is reusable
264 11         73 $conn->set_timeout(undef); # clear timeout
265 11         102 return 1; # success
266             }
267             }
268              
269             # we have too many connections or the socket is dead, caller
270             # should close after returning from this function.
271 0         0 return 0;
272             }
273              
274             # enqueues a request (inflight_cb) and schedules it to run ASAP
275             # This must be used with Danga::Socket->EventLoop
276             sub enqueue {
277 0     0 0 0 my ($self, $ip, $port, $inflight_cb) = @_;
278              
279 0         0 push @{$self->{queue}}, [ $ip, $port, $inflight_cb ];
  0         0  
280              
281             # we have something in the queue, make sure it's run soon
282 0         0 $self->schedule_queued;
283             }
284              
285             # returns the total connections to the host of a given connection
286             sub _dest_total {
287 11     11   27 my ($self, $conn) = @_;
288 11         37 my $key = $conn->key;
289 11         30 my $inflight = scalar keys %{$self->{inflight}->{$key}};
  11         48  
290 11         20 my $idle = scalar @{$self->{idle}->{$key}};
  11         35  
291 11         68 return $idle + $inflight;
292             }
293              
294             # only call this from the event_hup/event_err callbacks used by Danga::Socket
295             sub conn_drop {
296 0     0 0 0 my ($self, $conn, $close_reason) = @_;
297              
298 0         0 my $fd = $conn->fd;
299 0         0 my $key = $conn->key;
300              
301             # event_read must handle errors anyways, so hand off
302             # error handling to the event_read callback if inflight.
303 0 0       0 return $conn->event_read if $self->{inflight}->{$key}->{$fd};
304              
305             # we get here if and only if the socket is idle, we can drop it ourselves
306             # splice out the socket we're closing from the idle pool
307 0         0 my $idle = $self->{idle}->{$key};
308 0         0 foreach my $i (0 .. (scalar(@$idle) - 1)) {
309 0         0 my $old = $idle->[$i];
310 0 0       0 if ($old->sock) {
311 0 0       0 if ($old->fd == $fd) {
312 0         0 splice(@$idle, $i, 1);
313 0         0 $conn->close($close_reason);
314 0         0 return;
315             }
316             } else {
317             # some connections may have expired but not been spliced out, yet
318             # splice it out here since we're iterating anyways
319 0         0 splice(@$idle, $i, 1);
320             }
321             }
322             }
323              
324             # unregisters and prepares connection to be closed
325             # Returns the inflight callback if there was one
326             sub conn_close_prepare {
327 9     9 0 24 my ($self, $conn, $close_reason) = @_;
328              
329 9 50       35 if ($conn->sock) {
330 9         149 my $fd = $conn->fd;
331              
332 9         607 my $valid = delete $self->{fdmap}->{$fd};
333 9         35 delete $self->{timeouts}->{$fd};
334              
335 9         51 my $inflight_cb = $self->inflight_cb_expire($conn);
336              
337             # $valid may be undef in replicate worker which removes connections
338             # from fdmap. However, valid==undef connections should never have
339             # an inflight_cb
340 9 50 66     42 if ($inflight_cb && !$valid) {
341 0         0 croak("BUG: dropping unregistered conn with callback: $conn");
342             }
343 9         35 return $inflight_cb;
344             }
345             }
346              
347             # schedules cb to run on the next tick of the event loop,
348             # (immediately after this tick runs)
349             sub on_next_tick {
350 1     1 0 4 my ($self, $cb) = @_;
351 1         5 my $on_next_tick = $self->{on_next_tick};
352 1         5 push @$on_next_tick, $cb;
353              
354 1 50       5 if (scalar(@$on_next_tick) == 1) {
355             Danga::Socket->AddTimer(0, sub {
356             # prevent scheduled callbacks from being called on _this_ tick
357 1     1   9 $on_next_tick = $self->{on_next_tick};
358 1         3 $self->{on_next_tick} = [];
359              
360 1         5 while (my $sub = shift @$on_next_tick) {
361 1         6 $sub->()
362             }
363 1         11 });
364             }
365             }
366              
367             # marks a connection inflight and invokes cb on it
368             # $conn may be a error string, in which case we'll invoke the user-supplied
369             # callback with a mock error (this mimics how LWP fakes an HTTP response
370             # even if the socket could not be created/connected)
371             sub _conn_run {
372 15     15   37 my ($self, $conn, $cb) = @_;
373              
374 15 50       59 if (ref $conn) {
375 15   50     98 my $inflight = $self->{inflight}->{$conn->key} ||= {};
376 15         76 $inflight->{$conn->fd} = $cb; # stash callback for retrying
377 15         122 $self->{total_inflight}++;
378 15         61 $cb->($conn);
379             } else {
380             # fake an error message on the response callback
381             $self->on_next_tick(sub {
382             # fatal error creating the socket, do not queue
383 0     0   0 my $mfs_err = $conn;
384 0         0 $self->{class}->new_err($mfs_err, $cb);
385              
386             # onto the next request
387 0         0 $self->schedule_queued;
388 0         0 });
389             }
390             }
391              
392             # drops an idle connection from the idle connection pool (so we can open
393             # another socket without incurring out-of-FD errors)
394             # Only call when you're certain there's a connection to drop
395             # XXX This is O(destinations), unfortunately
396             sub _conn_drop_idle {
397 0     0   0 my ($self) = @_;
398 0         0 my $idle = $self->{idle};
399              
400             # using "each" on the hash since it preserves the internal iterator
401             # of the hash across invocations of this sub. This should preserve
402             # the balance of idle connections in a big pool with many hosts.
403             # Thus we loop twice to ensure we scan the entire idle connection
404             # pool if needed
405 0         0 foreach (1..2) {
406 0         0 while (my (undef, $val) = each %$idle) {
407 0 0       0 my $conn = shift @$val or next;
408              
409 0 0       0 $conn->close("idle_expire") if $conn->sock;
410 0         0 return;
411             }
412             }
413              
414 0         0 confess("BUG: unable to drop an idle connection");
415             }
416              
417             # checks for expired connections, this can be expensive if there
418             # are many concurrent connections waiting on timeouts, but still
419             # better than having AddTimer create a Danga::Socket::Timer object
420             # every time a timeout is reset.
421             sub check_timeouts {
422 6     6 0 31 my ($self) = @_;
423 6         60 my $timeouts = $self->{timeouts};
424 6         63 my @fds = keys %$timeouts;
425 6         41 my $next_expiry = NEVER;
426 6         41 my $now = Time::HiRes::time();
427              
428             # this is O(n) where n is concurrent connections
429 6         128 foreach my $fd (@fds) {
430 6         16 my $conn = $timeouts->{$fd};
431 6 100       123 if ($conn->expired($now)) {
432 2         27 delete $timeouts->{$fd};
433             } else {
434             # look for the next timeout
435 4         63 my $expiry = $conn->expiry;
436 4 50       21 if ($expiry) {
437 4 50       31 $next_expiry = $expiry if $expiry < $next_expiry;
438             } else {
439             # just in case, this may not happen...
440 0         0 delete $timeouts->{$fd};
441             }
442             }
443             }
444              
445             # schedule the wakeup for the next timeout
446 6 100       39 if ($next_expiry == NEVER) {
447 2         14 $self->{timer} = undef;
448             } else {
449 4         16 my $timeout = $next_expiry - $now;
450 4 50       26 $timeout = 0 if $timeout <= 0;
451             $self->{timer} = Danga::Socket->AddTimer($timeout, sub {
452 4     4   1109414 $self->check_timeouts;
453 4         136 });
454             }
455 6         267 $self->{next_expiry} = $next_expiry;
456             }
457              
458             # registers a timeout for a given connection, each connection may only
459             # have one pending timeout. Timeout may be undef to cancel the current
460             # timeout.
461             sub register_timeout {
462 42     42 0 167 my ($self, $conn, $timeout) = @_;
463              
464 42 50       205 if ($conn->sock) {
    0          
465 42         369 my $fd = $conn->fd;
466 42 100       260 if ($timeout) {
467 31         117 $self->{timeouts}->{$fd} = $conn;
468 31         120 my $next_expiry = $self->{next_expiry};
469 31         84 my $old_timer = $self->{timer};
470 31         96 my $expiry = $timeout + Time::HiRes::time();
471              
472 31 100 66     399 if (!$old_timer || $expiry < $next_expiry) {
473 3         4 $self->{next_expiry} = $expiry;
474             $self->{timer} = Danga::Socket->AddTimer($timeout, sub {
475 2     2   1565737 $self->check_timeouts;
476 3         29 });
477 3 50       69 $old_timer->cancel if $old_timer;
478             }
479             } else {
480 11         99 delete $self->{timeouts}->{$fd};
481             }
482             } elsif ($timeout) { # this may never happen...
483             # no FD, so we must allocate a new Danga::Socket::Timer object
484             # add 1msec to avoid FP rounding problems leading to missed
485             # expiration when calling conn->expired
486 0     0     Danga::Socket->AddTimer($timeout + 0.001, sub { $conn->expired });
  0            
487             }
488             }
489              
490             1;