File Coverage

blib/lib/AnyEvent/ConnPool.pm
Criterion Covered Total %
statement 139 170 81.7
branch 31 46 67.3
condition 4 13 30.7
subroutine 27 32 84.3
pod 5 9 55.5
total 206 270 76.3


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::ConnPool
4              
5             =head1 DESCRIPTION
6              
7             Simple connections pool designed for asynchronous connections
8              
9             =head1 METHODS
10              
11             =over
12              
13             =cut
14              
15             package AnyEvent::ConnPool;
16 6     6   40835 use strict;
  6         10  
  6         218  
17 6     6   26 use warnings;
  6         9  
  6         151  
18              
19 6     6   4746 use AnyEvent;
  6         20942  
  6         160  
20 6     6   36 use Carp;
  6         8  
  6         563  
21              
22             our $VERSION = 0.11;
23              
24             my $PID;
25              
26             BEGIN {
27 6     6   5882 $PID = $$;
28             }
29              
30              
31             =item B
32              
33             Returns new pool object
34              
35             AnyEvent::ConnPool->new(
36             constructor => sub {
37             return generate_connection();
38             },
39             check => {
40             cb => sub {
41             my $connection = shift;
42             ...
43             if ($connection->conn()->ping()) {
44             return 1;
45             }
46             return 0;
47             },
48             interval => 10,
49             },
50             size => 5,
51             init => 1,
52             );
53              
54              
55             constructor => subroutine, which generates connection for pool.
56              
57             check => pingers, allows to specify methods and interval for connection state validation.
58             check->{cb} => callback, used for ping connection. You should implement this logic by yourself.
59             If you need reconnect, you can just call
60              
61             $connection->reconnect();
62              
63             check->{interval} => interval for the callback.
64              
65             size => how many connections should be created on pool initialization.
66              
67             init => initialize connections on pool construction.
68              
69             dispatcher => returns dispatcher object instead of pool object. You can use dispatcher as wrapper around your connection. In that case pool will use
70             all it's features behind the scene. You can get dispatcher not only from constructor, you can get it from pool with dispatcher method. For example:
71            
72             # with your connection
73             $connection->selectall_arrayref(...);
74              
75             # pooled
76             my $dispatcher = $connpool->dispatcher();
77             # same thing as selectall_arrayref above, but with connpool behind the scene.
78             $dispatcher->selectall_arrayref(...);
79             =cut
80              
81             sub new {
82 5     5 1 3749 my ($class, %opts) = @_;
83              
84 0     0   0 my $self = {
85             constructor => sub {1;},
86 5         43 _payload => [],
87             index => 0,
88             init => 0,
89             count => 0,
90             };
91              
92 5 50       20 if (!$opts{constructor}) {
93 0         0 croak "Missing mandatory param constructor.";
94             }
95              
96 5 50       22 if (ref $opts{constructor} ne 'CODE') {
97 0         0 croak "Constructor should be a code reference.";
98             }
99              
100 5 50 33     25 if ($opts{dispatcher} && !$opts{init}) {
101 0         0 croak "Can't get dispatcher on uninitialized pool";
102             }
103              
104 5         15 $self->{constructor} = $opts{constructor};
105              
106 5 100       22 if ($opts{check}) {
107 1 50       3 if (ref $opts{check} ne 'HASH') {
108 0         0 croak "Check param should be a hash reference.";
109             }
110 1 50       4 if (!$opts{check}->{cb}) {
111 0         0 croak 'Missing cb param.';
112             }
113            
114 1 50       3 if (ref $opts{check}->{cb} ne 'CODE') {
115 0         0 croak 'Cb param should be a code reference.';
116             }
117 1 50       4 if (!$opts{check}->{interval}) {
118 0         0 croak 'Missing interval param.';
119             }
120             # TODO: Add interval parameter validation
121            
122 1         3 $self->{check} = $opts{check};
123             }
124              
125 5 50       15 if ($opts{size}) {
126             #TODO: add validation for size
127              
128 5         10 $self->{size} = $opts{size};
129             }
130              
131 5         10 bless $self, $class;
132              
133 5 100       16 if ($opts{init}) {
134 4         15 $self->init();
135             }
136            
137 5 50       12 if ($opts{dispatcher}) {
138 0         0 return $self->dispatcher();
139             }
140 5         11 return $self;
141             }
142              
143              
144             =item B
145              
146             Initializes pool.
147              
148             =cut
149              
150             sub init {
151 5     5 1 14 my ($self, $conn_count) = @_;
152            
153 5 50       29 if ($self->{init}) {
154 0         0 croak "Can't initialize already initilized pool.";
155             }
156            
157 5   33     54 $conn_count ||= delete $self->{size};
158              
159 5 50       15 unless ($conn_count) {
160 0         0 croak "Can't initilize empty pool";
161             }
162            
163 5         15 for (1 .. $conn_count) {
164 26         42 $self->add();
165             }
166            
167 5 100       19 if ($self->{check}) {
168 1         2 my $guard; $guard = AnyEvent->timer (
169             after => $self->{check}->{interval},
170             interval => $self->{check}->{interval},
171             cb => sub {
172 1     1   999137 my $temp_guard = $guard;
173 1         15 for (my $i = 0; $i < $self->{count}; $i++) {
174 3         5 my $conn = $self->{_payload}->[$i];
175             eval {
176 3         11 $self->{check}->{cb}->($conn);
177 3         78 1;
178 3 50       2 } or do {
179 0         0 carp "Error occured: $@";
180             };
181             }
182             },
183 1         12 );
184             }
185              
186 5         26 $self->{init} = 1;
187 5         9 return 1;
188             }
189              
190              
191             =item B
192              
193             Returns dispatcher object instead of pool object. It allows you to call connection's method directly, if you don't care about pool mechanism.
194             And it's simple.
195              
196             my $dispatcher = $connpool->dispatcher();
197             $dispatcher->selectall_arrayref(...);
198             # equivalent to:
199             $connpool->get()->conn()->selectall_arrayref(...);
200              
201             =cut
202              
203             sub dispatcher {
204 1     1 1 4 my ($self) = @_;
205              
206 1         3 my $dispatcher = {
207             _pool => $self,
208             };
209              
210 1         3 bless $dispatcher, 'AnyEvent::ConnPool::Dispatcher';
211 1         1 return $dispatcher;
212             }
213              
214              
215             =item B
216              
217             Adds connection to the pool.
218              
219             =cut
220              
221             sub add {
222 26     26 1 37 my ($self, $count) = @_;
223            
224             # TODO: add count support
225 26         81 my $conn = $self->{constructor}->();
226 26         192 my $unit = AnyEvent::ConnPool::Unit->new($conn,
227             index => $self->{count},
228             constructor => $self->{constructor},
229             );
230              
231 26         42 $self->_add_object_raw($unit);
232             }
233              
234              
235             =item B
236              
237             Returns AnyEvent::ConnPool::Unit object from the pool.
238              
239             my $unit = $pool->get();
240             my $connection = $unit->conn();
241              
242             =cut
243              
244             sub get {
245 65     65 1 191 my ($self, $index) = @_;
246            
247 65 100       140 if (defined $index) {
248 3         7 return $self->{_payload}->[$index];
249             }
250              
251 62 100       122 if ($self->{index} + 1 > $self->{count}) {
252 2         3 $self->{index} = 0;
253             }
254              
255 62         75 my $retval = $self->{_payload}->[$self->{index}];
256            
257 62 100       81 if ($retval->locked()) {
258 15         20 $self->{locked}->{$self->{index}} = 1;
259 15         20 $retval = $self->get_free_connection($self->{index});
260             }
261             else {
262 47         64 delete $self->{locked}->{$self->{index}};
263             }
264              
265 62 50       92 if (wantarray) {
266 0         0 $self->{index}++;
267 0         0 return ($index, $retval);
268             }
269             else {
270 62         60 $self->{index}++;
271 62         93 return $retval;
272             }
273             }
274              
275              
276             sub grow {
277 0     0 0 0 my ($self, $count) = @_;
278              
279 0   0     0 $count ||= 1;
280 0         0 for (1 .. $count) {
281 0         0 $self->add();
282             }
283 0         0 return 1;
284             }
285              
286              
287             sub shrink {
288 0     0 0 0 my ($self, $count) = @_;
289              
290 0   0     0 $count ||= 1;
291 0         0 for (1 .. $count) {
292 0         0 pop @{$self->{_payload}};
  0         0  
293             }
294 0         0 return 1;
295             }
296              
297              
298             # utility functions
299              
300             sub get_free_connection {
301 15     15 0 16 my ($self, $desired_index) = @_;
302            
303 15         9 my $retval = undef;
304 15         20 my @balanced_array = $self->balance_array($desired_index);
305              
306 15         18 for my $i (@balanced_array) {
307 44         39 my $conn = $self->{_payload}->[$i];
308 44 100       48 unless ($conn->locked) {
309 15         9 $retval = $conn;
310 15         13 $self->{index} = $i;
311 15         13 last;
312             }
313             }
314 15         20 return $retval;
315            
316             }
317              
318              
319             sub balance_array {
320 15     15 0 12 my ($self, $index) = @_;
321              
322 15         14 my $count = $self->{count};
323 15         13 $index++;
324 15         9 $count--;
325              
326 15 100 66     48 if ($index == 0 || $index >= $count) {
327 14         29 return (0 .. $count);
328             }
329              
330 1         4 return (($index .. $count), 0 .. $index - 1);
331             }
332              
333              
334             sub _add_object_raw {
335 26     26   27 my ($self, $object, $position) = @_;
336            
337 26 50       42 if (defined $position) {
338 0         0 $self->{_payload}->[$self->{index}] = $object;
339             }
340             else {
341 26         17 push @{$self->{_payload}}, $object;
  26         36  
342             }
343              
344 26         23 $self->{count} = scalar @{$self->{_payload}};
  26         44  
345 26         40 return 1;
346             }
347              
348              
349             =back
350             =cut
351              
352             1;
353              
354             package AnyEvent::ConnPool::Dispatcher;
355 6     6   35 use strict;
  6         7  
  6         211  
356 6     6   22 no strict qw/refs/;
  6         7  
  6         145  
357 6     6   23 use warnings;
  6         9  
  6         170  
358 6     6   4064 use Data::Dumper;
  6         46274  
  6         419  
359 6     6   40 use Carp;
  6         6  
  6         1082  
360              
361             our $AUTOLOAD;
362             sub AUTOLOAD {
363 1     1   12 my ($d, @params) = @_;
364              
365 1         1 my $program = $AUTOLOAD;
366 1         7 $program =~ s/.*:://;
367            
368 1         4 my $conn = $d->{_pool}->get()->conn();
369 1         6 my $reference = sprintf("%s::%s", ref ($conn), $program);
370            
371 1         2 $_[0] = $conn;
372              
373 1         1 goto &{$reference};
  1         9  
374             }
375              
376             1;
377              
378             package AnyEvent::ConnPool::Unit;
379             =head1 NAME
380              
381             AnyEvent::ConnPool::Unit
382              
383             =head1 DESCRIPTION
384              
385             Connection unit. Just wrapper around user-specified connection.
386             Required for transactions support.
387              
388             =head1 METHODS
389              
390             =over
391              
392             =cut
393              
394 6     6   32 use strict;
  6         10  
  6         188  
395 6     6   59 use warnings;
  6         8  
  6         1626  
396              
397             sub new {
398 26     26   42 my ($class, $object, %opts) = @_;
399              
400 26         34 my ($index, $constructor) = ($opts{index}, $opts{constructor});
401              
402 26         54 my $unit = {
403             _conn => $object,
404             _locked => 0,
405             _index => $index,
406             _constructor => $constructor,
407             };
408              
409 26         39 bless $unit, $class;
410 26         40 return $unit;
411             }
412              
413              
414             =item B
415              
416             Returns connection from unit object.
417              
418             =cut
419              
420             sub conn {
421 122     122   10035 my $self = shift;
422 122         230 return $self->{_conn};
423             }
424              
425              
426             =item B
427              
428             Locks current connection. After that connection shouldn't be used in balancing mechanism and never will be
429             returned from pool. To unlock connection you should use unlock method.
430              
431             $connection->lock();
432              
433             =cut
434              
435             sub lock {
436 3     3   7 my ($self) = @_;
437              
438 3         6 $self->{_locked} = 1;
439 3         4 return 1;
440             }
441              
442              
443             =item B
444              
445             Unlocks connection and returns it to the balancing scheme.
446              
447             $connection->unlock();
448              
449             =cut
450              
451             sub unlock {
452 0     0   0 my ($self) = @_;
453              
454 0         0 delete $self->{_locked};
455 0         0 return 1;
456             }
457              
458              
459             =item B
460              
461             Returns true if connection is locked.
462              
463             if ($connection->locked()) {
464             ...
465             }
466              
467             =cut
468              
469             sub locked {
470 106     106   79 my ($self) = @_;
471            
472 106         203 return $self->{_locked};
473             }
474              
475              
476             sub index {
477 0     0   0 my ($self) = @_;
478 0         0 return $self->{_index};
479             }
480              
481              
482             sub reconnect {
483 1     1   3 my ($self) = @_;
484              
485 1 50       4 if ($self->{_constructor}) {
486 1         2 $self->{_conn} = $self->{_constructor}->();
487             }
488 1         4 return 1;
489             }
490             =back
491             =cut
492              
493             1;
494