File Coverage

blib/lib/AnyEvent/ConnPool.pm
Criterion Covered Total %
statement 146 178 82.0
branch 37 54 68.5
condition 6 19 31.5
subroutine 27 32 84.3
pod 6 10 60.0
total 222 293 75.7


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::ConnPool
4              
5             =head1 DESCRIPTION
6              
7             Simple connections pool designed for asynchronous connections
8              
9             =head1 METHODS
10              
11             =over
12              
13             =cut
14              
15             package AnyEvent::ConnPool;
16 6     6   39741 use strict;
  6         13  
  6         209  
17 6     6   30 use warnings;
  6         6  
  6         159  
18              
19 6     6   5762 use AnyEvent;
  6         25763  
  6         172  
20 6     6   34 use Carp;
  6         8  
  6         554  
21              
22             our $VERSION = 0.12;
23              
24             my $PID;
25              
26             BEGIN {
27 6     6   6422 $PID = $$;
28             }
29              
30              
31             =item B
32              
33             Returns new pool object
34              
35             AnyEvent::ConnPool->new(
36             constructor => sub {
37             return generate_connection();
38             },
39             check => {
40             cb => sub {
41             my $connection = shift;
42             ...
43             if ($connection->conn()->ping()) {
44             return 1;
45             }
46             return 0;
47             },
48             interval => 10,
49             },
50             size => 5,
51             init => 1,
52             );
53              
54              
55             constructor => subroutine, which generates connection for pool.
56              
57             check => pingers, allows to specify methods and interval for connection state validation.
58             check->{cb} => callback, used for ping connection. You should implement this logic by yourself.
59             If you need reconnect, you can just call
60              
61             $connection->reconnect();
62              
63             check->{interval} => interval for the callback.
64              
65             size => how many connections should be created on pool initialization.
66              
67             init => initialize connections on pool construction.
68              
69             dispatcher => returns dispatcher object instead of pool object. You can use dispatcher as wrapper around your connection. In that case pool will use
70             all it's features behind the scene. You can get dispatcher not only from constructor, you can get it from pool with dispatcher method. For example:
71            
72             # with your connection
73             $connection->selectall_arrayref(...);
74              
75             # pooled
76             my $dispatcher = $connpool->dispatcher();
77             # same thing as selectall_arrayref above, but with connpool behind the scene.
78             $dispatcher->selectall_arrayref(...);
79             # you can always get connpool from dispatcher:
80             my $pool = AnyEvent::ConnPool->pool_from_dispatcher($dispatcher);
81              
82              
83             =cut
84              
85             sub new {
86 5     5 1 3893 my ($class, %opts) = @_;
87              
88 0     0   0 my $self = {
89             constructor => sub {1;},
90 5         46 _payload => [],
91             index => 0,
92             init => 0,
93             count => 0,
94             };
95              
96 5 50       22 if (!$opts{constructor}) {
97 0         0 croak "Missing mandatory param constructor.";
98             }
99              
100 5 50       21 if (ref $opts{constructor} ne 'CODE') {
101 0         0 croak "Constructor should be a code reference.";
102             }
103              
104 5 50 33     28 if ($opts{dispatcher} && !$opts{init}) {
105 0         0 croak "Can't get dispatcher on uninitialized pool";
106             }
107              
108 5         14 $self->{constructor} = $opts{constructor};
109              
110 5 100       24 if ($opts{check}) {
111 1 50       4 if (ref $opts{check} ne 'HASH') {
112 0         0 croak "Check param should be a hash reference.";
113             }
114 1 50       4 if (!$opts{check}->{cb}) {
115 0         0 croak 'Missing cb param.';
116             }
117            
118 1 50       4 if (ref $opts{check}->{cb} ne 'CODE') {
119 0         0 croak 'Cb param should be a code reference.';
120             }
121 1 50       3 if (!$opts{check}->{interval}) {
122 0         0 croak 'Missing interval param.';
123             }
124             # TODO: Add interval parameter validation
125            
126 1         2 $self->{check} = $opts{check};
127             }
128              
129 5 50       16 if ($opts{size}) {
130             #TODO: add validation for size
131              
132 5         12 $self->{size} = $opts{size};
133             }
134              
135 5         10 bless $self, $class;
136              
137 5 100       15 if ($opts{init}) {
138 4         16 $self->init();
139             }
140            
141 5 50       15 if ($opts{dispatcher}) {
142 0         0 return $self->dispatcher();
143             }
144 5         13 return $self;
145             }
146              
147              
148             =item B
149              
150             Initializes pool.
151              
152             =cut
153              
154             sub init {
155 5     5 1 13 my ($self, $conn_count) = @_;
156            
157 5 50       31 if ($self->{init}) {
158 0         0 croak "Can't initialize already initilized pool.";
159             }
160            
161 5   33     36 $conn_count ||= delete $self->{size};
162              
163 5 50       15 unless ($conn_count) {
164 0         0 croak "Can't initilize empty pool";
165             }
166            
167 5         16 for (1 .. $conn_count) {
168 26         62 $self->add();
169             }
170            
171 5 100       18 if ($self->{check}) {
172 1         2 my $guard; $guard = AnyEvent->timer (
173             after => $self->{check}->{interval},
174             interval => $self->{check}->{interval},
175             cb => sub {
176 1     1   998155 my $temp_guard = $guard;
177 1         10 for (my $i = 0; $i < $self->{count}; $i++) {
178 3         5 my $conn = $self->{_payload}->[$i];
179             eval {
180 3         17 $self->{check}->{cb}->($conn);
181 3         69 1;
182 3 50       5 } or do {
183 0         0 carp "Error occured: $@";
184             };
185             }
186             },
187 1         12 );
188             }
189              
190 5         23 $self->{init} = 1;
191 5         8 return 1;
192             }
193              
194              
195             =item B
196              
197             Returns dispatcher object instead of pool object. It allows you to call connection's method directly, if you don't care about pool mechanism.
198             And it's simple.
199              
200             my $dispatcher = $connpool->dispatcher();
201             $dispatcher->selectall_arrayref(...);
202             # equivalent to:
203             $connpool->get()->conn()->selectall_arrayref(...);
204              
205             =cut
206              
207             sub dispatcher {
208 1     1 1 4 my ($self) = @_;
209              
210 1         1 my $dispatcher = {
211             _pool => $self,
212             };
213              
214 1         2 bless $dispatcher, 'AnyEvent::ConnPool::Dispatcher';
215 1         2 return $dispatcher;
216             }
217              
218              
219             =item B
220              
221             Returns pool object from dispatcher object. You can call it by 3 ways:
222              
223             my $pool = AnyEvent::ConnPool::pool_from_dispatcher($dispatcher);
224             my $pool = AnyEvent::ConnPool->pool_from_dispatcher($dispatcher);
225             my $pool = $connpool->pool_from_dispatcher($dispatcher);
226              
227             =cut
228              
229             sub pool_from_dispatcher {
230 3     3 1 987 my ($p1, $p2) = @_;
231 3         5 my $dispatcher = undef;
232              
233 3 50       9 if ($p1) {
234             # $p1 is just string, called as AnyEvent::ConnPool->dispatcher_from_pool($dispatcher)
235             # so, dispatcher is in $p2
236 3 100       12 if (!ref $p1) {
    100          
237 1         3 $dispatcher = $p2;
238             }
239             elsif (ref $p1 eq __PACKAGE__) {
240 1         3 $dispatcher = $p2;
241             }
242             else {
243 1         3 $dispatcher = $p1;
244             }
245             }
246              
247 3   33     9 $dispatcher ||= $p2;
248 3 50 33     19 if (!$dispatcher || ref $dispatcher ne 'AnyEvent::ConnPool::Dispatcher') {
249 0         0 croak "No dispatcher";
250             }
251            
252 3         9 return $dispatcher->{_pool};
253             }
254              
255              
256             =item B
257              
258             Adds connection to the pool.
259              
260             =cut
261              
262             sub add {
263 26     26 1 23 my ($self, $count) = @_;
264            
265             # TODO: add count support
266 26         47 my $conn = $self->{constructor}->();
267 26         201 my $unit = AnyEvent::ConnPool::Unit->new($conn,
268             index => $self->{count},
269             constructor => $self->{constructor},
270             );
271              
272 26         42 $self->_add_object_raw($unit);
273             }
274              
275              
276             =item B
277              
278             Returns AnyEvent::ConnPool::Unit object from the pool.
279              
280             my $unit = $pool->get();
281             my $connection = $unit->conn();
282              
283             =cut
284              
285             sub get {
286 66     66 1 199 my ($self, $index) = @_;
287            
288 66 100       127 if (defined $index) {
289 3         6 return $self->{_payload}->[$index];
290             }
291              
292 63 100       135 if ($self->{index} + 1 > $self->{count}) {
293 2         3 $self->{index} = 0;
294             }
295              
296 63         81 my $retval = $self->{_payload}->[$self->{index}];
297            
298 63 100       95 if ($retval->locked()) {
299 15         21 $self->{locked}->{$self->{index}} = 1;
300 15         21 $retval = $self->get_free_connection($self->{index});
301             }
302             else {
303 48         76 delete $self->{locked}->{$self->{index}};
304             }
305              
306 63 50       102 if (wantarray) {
307 0         0 $self->{index}++;
308 0         0 return ($index, $retval);
309             }
310             else {
311 63         68 $self->{index}++;
312 63         103 return $retval;
313             }
314             }
315              
316              
317             sub grow {
318 0     0 0 0 my ($self, $count) = @_;
319              
320 0   0     0 $count ||= 1;
321 0         0 for (1 .. $count) {
322 0         0 $self->add();
323             }
324 0         0 return 1;
325             }
326              
327              
328             sub shrink {
329 0     0 0 0 my ($self, $count) = @_;
330              
331 0   0     0 $count ||= 1;
332 0         0 for (1 .. $count) {
333 0         0 pop @{$self->{_payload}};
  0         0  
334             }
335 0         0 return 1;
336             }
337              
338              
339             # utility functions
340              
341             sub get_free_connection {
342 15     15 0 15 my ($self, $desired_index) = @_;
343            
344 15         14 my $retval = undef;
345 15         19 my @balanced_array = $self->balance_array($desired_index);
346              
347 15         19 for my $i (@balanced_array) {
348 44         45 my $conn = $self->{_payload}->[$i];
349 44 100       50 unless ($conn->locked) {
350 15         16 $retval = $conn;
351 15         13 $self->{index} = $i;
352 15         16 last;
353             }
354             }
355 15         21 return $retval;
356            
357             }
358              
359              
360             sub balance_array {
361 15     15 0 10 my ($self, $index) = @_;
362              
363 15         16 my $count = $self->{count};
364 15         13 $index++;
365 15         9 $count--;
366              
367 15 100 66     52 if ($index == 0 || $index >= $count) {
368 14         32 return (0 .. $count);
369             }
370              
371 1         5 return (($index .. $count), 0 .. $index - 1);
372             }
373              
374              
375             sub _add_object_raw {
376 26     26   29 my ($self, $object, $position) = @_;
377            
378 26 50       40 if (defined $position) {
379 0         0 $self->{_payload}->[$self->{index}] = $object;
380             }
381             else {
382 26         23 push @{$self->{_payload}}, $object;
  26         41  
383             }
384              
385 26         19 $self->{count} = scalar @{$self->{_payload}};
  26         35  
386 26         43 return 1;
387             }
388              
389              
390             =back
391             =cut
392              
393             1;
394              
395             package AnyEvent::ConnPool::Dispatcher;
396 6     6   32 use strict;
  6         11  
  6         207  
397 6     6   29 no strict qw/refs/;
  6         8  
  6         146  
398 6     6   22 use warnings;
  6         8  
  6         170  
399 6     6   24 use Carp;
  6         7  
  6         1109  
400              
401             our $AUTOLOAD;
402             sub AUTOLOAD {
403 2     2   2078 my ($d, @params) = @_;
404              
405 2         6 my $program = $AUTOLOAD;
406 2         20 $program =~ s/.*:://;
407            
408 2         16 my $conn = $d->{_pool}->get()->conn();
409 2         14 my $reference = sprintf("%s::%s", ref ($conn), $program);
410            
411 2         6 local $_[0] = $conn;
412              
413 2         3 goto &{$reference};
  2         32  
414             }
415              
416             1;
417              
418             package AnyEvent::ConnPool::Unit;
419             =head1 NAME
420              
421             AnyEvent::ConnPool::Unit
422              
423             =head1 DESCRIPTION
424              
425             Connection unit. Just wrapper around user-specified connection.
426             Required for transactions support.
427              
428             =head1 METHODS
429              
430             =over
431              
432             =cut
433              
434 6     6   28 use strict;
  6         6  
  6         213  
435 6     6   25 use warnings;
  6         7  
  6         1516  
436              
437             sub new {
438 26     26   66 my ($class, $object, %opts) = @_;
439              
440 26         72 my ($index, $constructor) = ($opts{index}, $opts{constructor});
441              
442 26         63 my $unit = {
443             _conn => $object,
444             _locked => 0,
445             _index => $index,
446             _constructor => $constructor,
447             };
448              
449 26         39 bless $unit, $class;
450 26         41 return $unit;
451             }
452              
453              
454             =item B
455              
456             Returns connection from unit object.
457              
458             =cut
459              
460             sub conn {
461 123     123   11066 my $self = shift;
462 123         248 return $self->{_conn};
463             }
464              
465              
466             =item B
467              
468             Locks current connection. After that connection shouldn't be used in balancing mechanism and never will be
469             returned from pool. To unlock connection you should use unlock method.
470              
471             $connection->lock();
472              
473             =cut
474              
475             sub lock {
476 3     3   7 my ($self) = @_;
477              
478 3         7 $self->{_locked} = 1;
479 3         3 return 1;
480             }
481              
482              
483             =item B
484              
485             Unlocks connection and returns it to the balancing scheme.
486              
487             $connection->unlock();
488              
489             =cut
490              
491             sub unlock {
492 0     0   0 my ($self) = @_;
493              
494 0         0 delete $self->{_locked};
495 0         0 return 1;
496             }
497              
498              
499             =item B
500              
501             Returns true if connection is locked.
502              
503             if ($connection->locked()) {
504             ...
505             }
506              
507             =cut
508              
509             sub locked {
510 107     107   94 my ($self) = @_;
511            
512 107         243 return $self->{_locked};
513             }
514              
515              
516             sub index {
517 0     0   0 my ($self) = @_;
518 0         0 return $self->{_index};
519             }
520              
521              
522             sub reconnect {
523 1     1   4 my ($self) = @_;
524              
525 1 50       4 if ($self->{_constructor}) {
526 1         3 $self->{_conn} = $self->{_constructor}->();
527             }
528 1         4 return 1;
529             }
530             =back
531             =cut
532              
533             1;
534