File Coverage

blib/lib/Async/ResourcePool.pm
Criterion Covered Total %
statement 9 29 31.0
branch 0 4 0.0
condition n/a
subroutine 3 11 27.2
pod n/a
total 12 44 27.2


line stmt bran cond sub pod time code
1             package Async::ResourcePool v0.1.3;
2              
3             =head1 NAME
4              
5             Async::ResourcePool - Resource pooling for asynchronous programs.
6              
7             =head1 DESCRIPTION
8              
9             This module implements the simple functionality of creating a source pool for
10             event-based/asynchronous programs. It provides consumers with the ability to
11             have some code execute whenever a resource happens to be ready. Further, it
12             allows resources to be categorized (by label) and limited as such.
13              
14             =cut
15              
16 3     3   85939 use strict;
  3         8  
  3         182  
17 3     3   18 use warnings FATAL => "all";
  3         10  
  3         432  
18 3     3   20 use Carp qw( croak );
  3         15  
  3         5761  
19              
20             =head1 CONSTRUCTOR
21              
22             =over 4
23              
24             =item new [ ATTRIBUTES ]
25              
26             =cut
27              
28             sub new {
29 0     0     my ($class, %params) = @_;
30              
31 0           my $self = bless {
32             %params,
33              
34             _resources => {},
35             _allocated => 0,
36             _wait_queue => [],
37             _available_queue => [],
38             }, $class;
39              
40 0           return $self;
41             }
42              
43             =back
44              
45             =head1 ATTRIBUTES
46              
47             =over 4
48              
49             =item factory -> CodeRef(POOL, CodeRef(RESOURCE, MESSAGE))
50              
51             The factory for generating the resource. The factory is a subroutine reference
52             which accepts an instance of this object and a callback as a reference. The
53             callback, to be invoked when the resource has been allocated.
54              
55             If no resource could be allocated due to error, then undef should be supplied
56             with the second argument being a string describing the failure.
57              
58             =cut
59              
60             sub factory {
61 0     0     my ($self, $value) = @_;
62              
63 0 0         if (@_ == 2) {
64 0           $self->{factory} = $value;
65             }
66              
67 0           $self->{factory};
68             }
69              
70             =item limit -> Int
71              
72             The number of resources to create per label.
73              
74             Optional.
75              
76             =cut
77              
78             sub limit {
79 0     0     my ($self, $value) = @_;
80              
81 0 0         if (@_ == 2) {
82 0           $self->{limit} = $value;
83             }
84              
85 0           $self->{limit};
86             }
87              
88             =item has_waiters -> Bool
89              
90             A flag indicating whether or not this pool currently has a wait queue.
91              
92             Read-only.
93              
94             =cut
95              
96             sub has_waiters {
97 0     0     return scalar @{ shift->{_wait_queue} };
  0            
98             }
99              
100             =item has_available_queue -> Bool
101              
102             A flag indicating whether or not this pool has any idle resources available.
103              
104             Read-only.
105              
106             =cut
107              
108             sub has_available {
109 0     0     return scalar @{ shift->{_available_queue} };
  0            
110             }
111              
112             =item size -> Int
113              
114             The current size of the pool.
115              
116             Read-only.
117              
118             =cut
119              
120             sub size {
121 0     0     return shift->{_allocated};
122             }
123              
124             =back
125              
126             =head1 METHODS
127              
128             =cut
129              
130             sub _track_resource {
131 0     0     my ($self, $resource) = @_;
132              
133 0           $self->{_resources}->{$resource} = $resource;
134             }
135              
136             sub _is_tracked {
137 0     0     my ($self, $resource) = @_;
138              
139 0           return exists $self->{_resources}{$resource};
140             }
141              
142             sub _prevent_halt {
143             my ($self) = @_;
144              
145             if ($self->has_waiters) {
146             $self->lease(shift $self->{_wait_queue});
147             }
148             }
149              
150             =over 4
151              
152             =item lease CALLBACK(RESOURCE, MESSAGE)
153              
154             Request a lease, with a callback invoked when the resource becomes available.
155             The first argument of the callback will be the resource, if it was able to be
156             granted, the second argument of the callback will be the error message, which
157             will only be defined if first argument is not.
158              
159             =cut
160              
161             sub lease {
162             my ($self, $callback) = @_;
163              
164             if ($self->has_available) {
165             my $resource = shift $self->{_available_queue};
166              
167             delete $self->{_available}{$resource};
168              
169             $callback->($resource);
170             }
171             else {
172             my $allocated = $self->size;
173              
174             unless ($allocated == $self->limit) {
175             $self->{_allocated}++;
176              
177             $self->factory->(
178             $self,
179             sub {
180             my ($resource, $message) = @_;
181              
182             if (defined $resource) {
183             $self->_track_resource($resource);
184             }
185             else {
186             # Decrement the semaphore so that we don't
187             # degrade the pool on an error state.
188             $self->{_allocated}--;
189              
190             # Prevent halting by reentering the allocation
191             # routine if we have waiters, since we just
192             # lost a resource from the semaphore.
193             $self->_prevent_halt;
194             }
195              
196             $callback->($resource, $message);
197             }
198             );
199             }
200             else {
201             push $self->{_wait_queue}, $callback;
202             }
203             }
204             }
205              
206             =item release RESOURCE
207              
208             Return a resource to the pool. This will signal any waiters which haven't yet
209             received a callback.
210              
211             =cut
212              
213             sub release {
214             my ($self, $resource) = @_;
215              
216             # Ignore resources which are not tracked.
217             # This may mean they've been invalidated.
218             if ($self->{_resources}{$resource}) {
219             unless ($self->{_available}{$resource}) {
220             if ($self->has_waiters) {
221             my $callback = shift $self->{_wait_queue};
222              
223             $callback->($resource);
224             }
225             else {
226             $self->{_available}{$resource} = $resource;
227              
228             push $self->{_available_queue}, $resource;
229             }
230             }
231             else {
232             croak "Attempted to release resource twice: $resource";
233             }
234             }
235             else {
236             croak "Attempted to release untracked resource, $resource";
237             }
238             }
239              
240             =item invalidate RESOURCE
241              
242             Invalidate a resource, signaling that it is no longer valid and no longer can
243             be distributed by this pool. This will allocate another resource if there are
244             any waiters.
245              
246             =cut
247              
248             sub invalidate {
249             my ($self, $resource) = @_;
250              
251             my $resources = $self->{_resources};
252             my $available = $self->{_available_queue};
253              
254             $self->{_allocated}--;
255              
256             my $resource_name = "$resource";
257              
258             if (delete $resources->{$resource_name}) {
259             # Strip the resource from the available queue so we don't accidently
260             # dispatch it.
261             @$available = grep $_ != $resource, @$available;
262              
263             delete $resources->{_available}{$resource_name};
264              
265             $self->_prevent_halt;
266             }
267             }
268              
269             =back
270              
271             =cut
272              
273             return __PACKAGE__;