File Coverage

blib/lib/Async/Redis/Pool.pm
Criterion Covered Total %
statement 29 226 12.8
branch 0 58 0.0
condition 0 24 0.0
subroutine 10 37 27.0
pod 8 8 100.0
total 47 353 13.3


line stmt bran cond sub pod time code
1             package Async::Redis::Pool;
2              
3 1     1   158052 use strict;
  1         1  
  1         49  
4 1     1   5 use warnings;
  1         7  
  1         77  
5 1     1   14 use 5.018;
  1         2  
6              
7 1     1   3 use Future;
  1         2  
  1         19  
8 1     1   3 use Future::AsyncAwait;
  1         1  
  1         5  
9 1     1   427 use Future::IO;
  1         32573  
  1         52  
10 1     1   7 use Scalar::Util qw(refaddr);
  1         5  
  1         39  
11 1     1   733 use Async::Redis;
  1         4  
  1         54  
12 1     1   10 use Async::Redis::Error::Disconnected;
  1         1  
  1         20  
13 1     1   3 use Async::Redis::Error::Timeout;
  1         1  
  1         2914  
14              
15             sub new {
16 0     0 1   my ($class, %args) = @_;
17              
18             # Separate pool-specific args from connection args.
19             # Everything not pool-specific is passed through to Async::Redis->new().
20 0           my %pool_args;
21 0           for my $key (qw(min max acquire_timeout idle_timeout cleanup_timeout on_dirty)) {
22 0 0         $pool_args{$key} = delete $args{$key} if exists $args{$key};
23             }
24              
25             my $self = bless {
26             # Connection params (passed through to Async::Redis->new)
27             _conn_args => \%args,
28              
29             # Pool sizing
30             min => $pool_args{min} // 1,
31             max => $pool_args{max} // 10,
32              
33             # Timeouts
34             acquire_timeout => $pool_args{acquire_timeout} // 5,
35             idle_timeout => $pool_args{idle_timeout} // 60,
36             cleanup_timeout => $pool_args{cleanup_timeout} // 5,
37              
38             # Dirty handling
39 0   0       on_dirty => $pool_args{on_dirty} // 'destroy',
      0        
      0        
      0        
      0        
      0        
40              
41             # Pool state
42             _idle => [], # Available connections
43             _active => {}, # Connections in use (refaddr => conn)
44             _waiters => [], # Futures waiting for connection
45             _shutdown => 0, # Set by shutdown(); blocks new acquires
46             _pending => [], # Background futures (creation, cleanup)
47             _creating => 0, # Connections currently being created
48             _total_created => 0,
49             _total_destroyed => 0,
50              
51             # Fork safety
52             _pid => $$,
53             }, $class;
54              
55 0           return $self;
56             }
57              
58             # Accessors
59 0     0 1   sub min { shift->{min} }
60 0     0 1   sub max { shift->{max} }
61              
62             # Statistics
63             sub stats {
64 0     0 1   my ($self) = @_;
65              
66             return {
67 0           active => scalar keys %{$self->{_active}},
68 0           idle => scalar @{$self->{_idle}},
69 0           waiting => scalar @{$self->{_waiters}},
70 0           total => (scalar keys %{$self->{_active}}) + (scalar @{$self->{_idle}}),
  0            
71             destroyed => $self->{_total_destroyed},
72 0           };
73             }
74              
75             # Check if fork occurred and clear pool
76             sub _check_fork {
77 0     0     my ($self) = @_;
78              
79 0 0 0       if ($self->{_pid} && $self->{_pid} != $$) {
80             # Fork detected - invalidate all connections
81 0           $self->_clear_all_connections;
82 0           $self->{_pid} = $$;
83 0           return 1;
84             }
85              
86 0           return 0;
87             }
88              
89             # Clear all connections (called after fork)
90             sub _clear_all_connections {
91 0     0     my ($self) = @_;
92              
93             # Clear idle connections without closing (parent owns the sockets)
94 0           $self->{_idle} = [];
95              
96             # Clear active connection tracking (caller still has reference)
97 0           $self->{_active} = {};
98              
99             # Cancel waiters
100 0           for my $waiter (@{$self->{_waiters}}) {
  0            
101 0 0         $waiter->fail("Pool invalidated after fork") unless $waiter->is_ready;
102             }
103 0           $self->{_waiters} = [];
104              
105             # Clear pending futures and creation counter
106 0           $self->{_pending} = [];
107 0           $self->{_creating} = 0;
108             }
109              
110             # Acquire a connection from the pool
111 0     0 1   async sub acquire {
112 0           my ($self) = @_;
113              
114 0 0         if ($self->{_shutdown}) {
115 0           die Async::Redis::Error::Disconnected->new(
116             message => "Pool is shut down",
117             );
118             }
119              
120             # Check for fork - clear pool if PID changed
121 0           $self->_check_fork;
122              
123             # Try to get an idle connection
124 0           while (@{$self->{_idle}}) {
  0            
125 0           my $conn = shift @{$self->{_idle}};
  0            
126              
127             # Health check
128 0           my $healthy = await $self->_health_check($conn);
129 0 0         if ($healthy) {
130 0           $self->{_active}{refaddr($conn)} = $conn;
131 0           return $conn;
132             }
133              
134             # Unhealthy - destroy and try next
135 0           $self->_destroy_connection($conn);
136             }
137              
138             # No idle connections - can we create a new one?
139             # Include _creating count to prevent concurrent acquires from exceeding max
140 0           my $current_total = (scalar keys %{$self->{_active}})
141 0           + (scalar @{$self->{_idle}})
142 0           + $self->{_creating};
143              
144 0 0         if ($current_total < $self->{max}) {
145 0           $self->{_creating}++;
146 0           my $conn;
147 0           eval {
148 0           $conn = await $self->_create_connection;
149             };
150 0           my $error = $@;
151 0           $self->{_creating}--;
152              
153 0 0         if ($error) {
154 0           die $error;
155             }
156              
157 0           $self->{_active}{refaddr($conn)} = $conn;
158 0           return $conn;
159             }
160              
161             # At max capacity - wait for release
162 0           my $waiter = Future->new;
163 0           push @{$self->{_waiters}}, $waiter;
  0            
164              
165             my $timeout_future = Future::IO->sleep($self->{acquire_timeout})->then(sub {
166             Future->fail(Async::Redis::Error::Timeout->new(
167             message => "Acquire timed out after $self->{acquire_timeout}s",
168             timeout => $self->{acquire_timeout},
169 0     0     ));
170 0           });
171              
172 0           my $wait_f = Future->wait_any($waiter, $timeout_future);
173              
174 0           my $result;
175 0           eval {
176 0           $result = await $wait_f;
177             };
178 0           my $error = $@;
179              
180             # If waiter was cancelled by timeout, remove from queue
181 0 0         if (!$waiter->is_done) {
182 0           @{$self->{_waiters}} = grep { $_ != $waiter } @{$self->{_waiters}};
  0            
  0            
  0            
183             }
184              
185 0 0         die $error if $error;
186 0           return $result;
187             }
188              
189             # Release a connection back to the pool
190             sub release {
191 0     0 1   my ($self, $conn) = @_;
192              
193 0 0         return unless defined $conn;
194              
195             # Check for fork - if forked, don't return to pool
196 0 0         if ($self->_check_fork) {
197             # Pool was cleared, just drop this connection
198 0           return;
199             }
200              
201 0           my $id = refaddr($conn);
202 0 0         unless (exists $self->{_active}{$id}) {
203 0           warn "Pool: release called on unknown or already-released connection";
204 0           return;
205             }
206 0           delete $self->{_active}{$id};
207              
208             # After shutdown, destroy instead of pooling
209 0 0         if ($self->{_shutdown}) {
210 0           $self->_destroy_connection($conn);
211 0           return;
212             }
213              
214             # Check if connection is dirty
215 0 0         if ($conn->is_dirty) {
216 0 0 0       if ($self->{on_dirty} eq 'cleanup' && $self->_can_attempt_cleanup($conn)) {
217             # Attempt cleanup asynchronously
218             $self->_track_pending(
219             $self->_cleanup_connection($conn)->on_done(sub {
220 0     0     $self->_return_to_pool($conn);
221             })->on_fail(sub {
222 0     0     $self->_destroy_connection($conn);
223 0           $self->_maybe_create_replacement;
224             })
225 0           );
226             }
227             else {
228             # Default: destroy and potentially replace
229 0           $self->_destroy_connection($conn);
230 0           $self->_maybe_create_replacement;
231             }
232 0           return;
233             }
234              
235             # Clean connection - return to pool or give to waiter
236 0           $self->_return_to_pool($conn);
237             }
238              
239             sub _return_to_pool {
240 0     0     my ($self, $conn) = @_;
241              
242             # Give to waiting acquirer if any
243 0 0         if (@{$self->{_waiters}}) {
  0            
244 0           my $waiter = shift @{$self->{_waiters}};
  0            
245 0           $self->{_active}{refaddr($conn)} = $conn;
246 0           $waiter->done($conn);
247 0           return;
248             }
249              
250             # Return to idle pool
251 0           push @{$self->{_idle}}, $conn;
  0            
252             }
253              
254             # Create a new connection
255 0     0     async sub _create_connection {
256 0           my ($self) = @_;
257              
258 0           my $conn = Async::Redis->new(%{$self->{_conn_args}});
  0            
259 0           await $conn->connect;
260              
261 0           $self->{_total_created}++;
262              
263 0           return $conn;
264             }
265              
266             # Destroy a connection
267             sub _destroy_connection {
268 0     0     my ($self, $conn) = @_;
269              
270 0           eval { $conn->disconnect };
  0            
271 0           $self->{_total_destroyed}++;
272             }
273              
274             # Maybe create a replacement connection to maintain min
275             sub _maybe_create_replacement {
276 0     0     my ($self) = @_;
277              
278 0           my $current_total = (scalar keys %{$self->{_active}})
279 0           + (scalar @{$self->{_idle}})
280 0           + $self->{_creating};
281              
282 0 0         if ($current_total < $self->{min}) {
283             # Create replacement asynchronously
284 0           $self->{_creating}++;
285             $self->_track_pending(
286             $self->_create_connection->on_done(sub {
287 0     0     my ($conn) = @_;
288 0           $self->{_creating}--;
289 0           $self->_return_to_pool($conn);
290             })->on_fail(sub {
291 0     0     $self->{_creating}--;
292             # Failed to create replacement - log and continue
293 0           warn "Failed to create replacement connection: @_";
294             })
295 0           );
296             }
297             }
298              
299             # Track a pending background future
300             sub _track_pending {
301 0     0     my ($self, $f) = @_;
302              
303 0           push @{$self->{_pending}}, $f;
  0            
304              
305             # Clean up completed futures
306             $f->on_ready(sub {
307 0     0     @{$self->{_pending}} = grep { !$_->is_ready } @{$self->{_pending}};
  0            
  0            
  0            
308 0           });
309              
310 0           return $f;
311             }
312              
313             # Health check
314 0     0     async sub _health_check {
315 0           my ($self, $conn) = @_;
316              
317             # Can't PING a pubsub connection
318 0 0         if ($conn->in_pubsub) {
319 0           return 0;
320             }
321              
322             # Quick PING with 1 second timeout
323 0           my $ok = 0;
324 0           eval {
325 0           my $ping_f = $conn->ping;
326 0     0     my $timeout_f = Future::IO->sleep(1)->then(sub { Future->fail('health_timeout') });
  0            
327 0           my $result = await Future->wait_any($ping_f, $timeout_f);
328 0 0 0       $ok = 1 if defined $result && $result eq 'PONG';
329             };
330              
331 0           return $ok;
332             }
333              
334             # Check if cleanup can be attempted
335             sub _can_attempt_cleanup {
336 0     0     my ($self, $conn) = @_;
337              
338             # NEVER attempt cleanup for these states:
339              
340             # PubSub mode - UNSUBSCRIBE returns confirmation frames that
341             # must be correctly drained in modal pubsub mode. Too risky.
342 0 0         return 0 if $conn->in_pubsub;
343              
344             # Inflight requests - after timeout/reset we've already
345             # declared the stream desynced.
346 0 0         return 0 if $conn->inflight_count > 0;
347              
348             # Cleanup MAY be attempted for these (still risky, but bounded):
349             # - in_multi: DISCARD is safe if we're actually in MULTI
350             # - watching: UNWATCH is always safe
351 0 0 0       return 1 if $conn->in_multi || $conn->watching;
352              
353             # Unknown dirty state - don't risk it
354 0           return 0;
355             }
356              
357             # Attempt to cleanup a dirty connection
358 0     0     async sub _cleanup_connection {
359 0           my ($self, $conn) = @_;
360              
361             # Note: Only called for in_multi or watching states
362             # PubSub and inflight connections are always destroyed
363              
364 0           eval {
365             # Reset transaction state
366 0 0         if ($conn->in_multi) {
367 0           my $discard_f = $conn->command('DISCARD');
368             my $timeout_f = Future::IO->sleep($self->{cleanup_timeout})->then(sub {
369 0     0     Future->fail('cleanup_timeout');
370 0           });
371 0           await Future->wait_any($discard_f, $timeout_f);
372 0           $conn->{in_multi} = 0;
373             }
374              
375 0 0         if ($conn->watching) {
376 0           my $unwatch_f = $conn->command('UNWATCH');
377             my $timeout_f = Future::IO->sleep($self->{cleanup_timeout})->then(sub {
378 0     0     Future->fail('cleanup_timeout');
379 0           });
380 0           await Future->wait_any($unwatch_f, $timeout_f);
381 0           $conn->{watching} = 0;
382             }
383             };
384              
385 0 0         if ($@) {
386 0           die "Cleanup failed: $@";
387             }
388              
389             # Verify connection is now clean
390 0 0         if ($conn->is_dirty) {
391 0           die "Connection still dirty after cleanup";
392             }
393              
394 0           return $conn;
395             }
396              
397             # The recommended pattern
398 0     0 1   async sub with {
399 0           my ($self, $code) = @_;
400              
401 0           my $conn = await $self->acquire;
402 0           my $result;
403             my $error;
404              
405 0           eval {
406 0           $result = await $code->($conn);
407             };
408 0           $error = $@;
409              
410             # Always release, even on exception
411             # release() handles dirty detection
412 0           $self->release($conn);
413              
414 0 0         die $error if $error;
415 0           return $result;
416             }
417              
418             # Shutdown the pool — synchronous. Blocks new acquires, fails waiters,
419             # closes idle connections. Active connections are destroyed when released.
420             sub shutdown {
421 0     0 1   my ($self) = @_;
422 0 0         return if $self->{_shutdown};
423 0           $self->{_shutdown} = 1;
424              
425             # Close idle connections
426 0           for my $conn (@{$self->{_idle}}) {
  0            
427 0           $self->_destroy_connection($conn);
428             }
429 0           $self->{_idle} = [];
430              
431             # Fail all pending acquire waiters
432 0           for my $waiter (@{$self->{_waiters}}) {
  0            
433 0 0         $waiter->fail(Async::Redis::Error::Disconnected->new(
434             message => "Pool is shutting down",
435             )) unless $waiter->is_ready;
436             }
437 0           $self->{_waiters} = [];
438             }
439              
440             1;
441              
442             __END__