File Coverage

blib/lib/Database/Async/Pool.pm
Criterion Covered Total %
statement 93 105 88.5
branch 9 16 56.2
condition 5 9 55.5
subroutine 25 28 89.2
pod 4 14 28.5
total 136 172 79.0


line stmt bran cond sub pod time code
1             package Database::Async::Pool;
2              
3 5     5   231007 use strict;
  5         9  
  5         208  
4 5     5   24 use warnings;
  5         7  
  5         332  
5              
6 5     5   26 use parent qw(IO::Async::Notifier);
  5         9  
  5         32  
7              
8             our $VERSION = '0.019'; # VERSION
9              
10             =head1 NAME
11              
12             Database::Async::Pool - connection manager for L
13              
14             =head1 DESCRIPTION
15              
16             =cut
17              
18 5     5   19096 use Database::Async::Backoff::Exponential;
  5         17  
  5         266  
19 5     5   2799 use Database::Async::Backoff::None;
  5         17  
  5         273  
20              
21 5     5   35 use Future;
  5         9  
  5         332  
22 5     5   34 use Future::AsyncAwait qw(:experimental);
  5         8  
  5         37  
23 5     5   1136 use Syntax::Keyword::Try;
  5         1877  
  5         41  
24 5     5   516 use Scalar::Util qw(blessed refaddr);
  5         10  
  5         411  
25 5     5   3005 use List::UtilsBy qw(extract_by);
  5         23875  
  5         678  
26 5     5   669 use Log::Any qw($log);
  5         15264  
  5         93  
27              
28             sub new {
29 6     6 1 3330 my ($class, %args) = @_;
30 6         17 my $backoff = delete $args{backoff};
31 6 50       25 unless(blessed $backoff) {
32 6         13 my $type = 'exponential';
33 6 100 66     28 $type = $backoff if $backoff and not ref $backoff;
34 6 50 66     65 $backoff = Database::Async::Backoff->instantiate(
35             type => $type,
36             initial_delay => 0.010,
37             max_delay => 30,
38             ($backoff && ref($backoff) ? %$backoff : ())
39             )
40             }
41             bless {
42 6         151 pending_count => 0,
43             count => 0,
44             min => 0,
45             max => 1,
46             attempts => undef,
47             ordering => 'serial',
48             backoff => $backoff,
49             waiting => [],
50             ready => [],
51             %args
52             }, $class
53             }
54              
55 4     4 0 25 sub min { shift->{min} }
56 6     6 0 99 sub max { shift->{max} }
57 7     7 0 47 sub count { shift->{count} }
58 2     2 0 93 sub pending_count { shift->{pending_count} }
59 5     5 0 42 sub backoff { shift->{backoff} }
60              
61             sub register_engine {
62 2     2 0 35 my ($self, $engine) = @_;
63 2         8 --$self->{pending_count};
64 2         6 ++$self->{count};
65 2         7 $self
66             }
67              
68             sub unregister_engine {
69 0     0 0 0 my ($self, $engine) = @_;
70             try {
71             $log->tracef('Engine is removed from the pool, with %d in the queue', 0 + @{$self->{waiting}});
72             my $addr = refaddr($engine);
73             # This engine may have been actively processing a request, and not in the pool:
74             # that's fine, we only remove if we had it.
75 0     0   0 my $count = () = extract_by { refaddr($_) == $addr } @{$self->{ready}};
76             $log->tracef('Removed %d engine instances from the ready pool', $count);
77             # Any engine that wasn't in the ready queue (`count`) was out on assignment
78             # and thus included in `pending_count`
79             --$self->{$count ? 'count' : 'pending_count'};
80             $log->infof('After cleanup we have %d count, %d pending, %d waiting', $self->{count}, $self->{pending_count}, 0 + @{$self->{waiting}});
81             $self->process_pending->retain if @{$self->{waiting}};
82 0         0 } catch ($e) {
83             $log->errorf('Failed %s', $e);
84             }
85 0         0 $self
86             }
87              
88             =head2 queue_ready_engine
89              
90             Called when there's a spare engine we can put back in the pool.
91              
92             =cut
93              
94             sub queue_ready_engine {
95 1     1 1 4 my ($self, $engine) = @_;
96 1         3 $log->tracef('Engine is now ready, with %d in the queue', 0 + @{$self->{waiting}});
  1         10  
97 1 50       5 return $self->notify_engine($engine) if @{$self->{waiting}};
  1         6  
98 1         2 push @{$self->{ready}}, $engine;
  1         756  
99 1         8 $self
100             }
101              
102             =head2 notify_engine
103              
104             We call this internally to hand an engine over to the next
105             waiting request.
106              
107             =cut
108              
109             sub notify_engine {
110 0     0 1 0 my ($self, $engine) = @_;
111             die 'unable to notify, we have no pending requests'
112 0 0       0 unless my $f = shift @{$self->{waiting}};
  0         0  
113 0         0 $f->done($engine);
114 0         0 return $self;
115             }
116              
117             =head2 next_engine
118              
119             Resolves to an engine. May need to wait if there are none available.
120              
121             =cut
122              
123 2     2 1 3894 async sub next_engine {
124 2         6 my ($self) = @_;
125 2         6 $log->tracef('Have %d ready engines to use', 0 + @{$self->{ready}});
  2         46  
126 2 100       10 if(my $engine = shift @{$self->{ready}}) {
  2         24  
127 1         31 return $engine;
128             }
129 1         4 push @{$self->{waiting}}, my $f = $self->new_future;
  1         6  
130 1         41 await $self->process_pending;
131 1         42 return await $f;
132             }
133              
134 1     1 0 2 async sub process_pending {
135 1         3 my ($self) = @_;
136 1         5 my $total = $self->count + $self->pending_count;
137 1         9 $log->tracef('Might request, current count is %d/%d (%d pending, %d active)', $total, $self->max, $self->pending_count, $self->count);
138 1 50       8 await $self->request_engine unless $total >= $self->max;
139 1         81 return;
140             }
141              
142             sub new_future {
143 1     1 0 3 my ($self, $label) = @_;
144             (
145             $self->{new_future} //= sub {
146 1     1   9 Future->new->set_label($_[1])
147             }
148 1   33     14 )->($label)
149             }
150              
151 1     1 0 5 async sub request_engine {
152 1         5 my ($self) = @_;
153 1         5 $log->tracef('Pool requesting new engine');
154 1         5 ++$self->{pending_count};
155 1         5 my $delay = $self->backoff->next;
156 1 50       4 if($delay) {
157 0         0 my $f = $self->loop->delay_future(
158             after => $delay
159             );
160             CANCEL { $f->cancel }
161 0         0 await $f;
  0         0  
162             }
163 1         7 my $req = $self->{request_engine}->();
164             CANCEL { $req->cancel }
165 1         46 await $req;
  1         30  
166 1         39 $self->backoff->reset;
167             }
168              
169             sub _remove_from_loop {
170 2     2   163 my ($self, $loop) = @_;
171 2         5 $_->cancel for splice @{$self->{waiting}};
  2         10  
172 2         5 $self->unregister_engine($_) for splice @{$self->{ready}};
  2         7  
173 2         49 return $self->next::method($loop);
174             }
175              
176             1;
177              
178             =head1 AUTHOR
179              
180             Tom Molesworth C<< >>
181              
182             =head1 LICENSE
183              
184             Copyright Tom Molesworth 2011-2023. Licensed under the same terms as Perl itself.
185