File Coverage

blib/lib/AnyEvent/ConnPool.pm
Criterion Covered Total %
statement 147 179 82.1
branch 37 54 68.5
condition 6 19 31.5
subroutine 27 32 84.3
pod 6 10 60.0
total 223 294 75.8


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::ConnPool
4              
5             =head1 DESCRIPTION
6              
7             Simple connections pool designed for asynchronous connections
8              
9             =head1 METHODS
10              
11             =over
12              
13             =cut
14              
15             package AnyEvent::ConnPool;
16 6     6   43080 use strict;
  6         12  
  6         232  
17 6     6   27 use warnings;
  6         8  
  6         148  
18              
19 6     6   5353 use AnyEvent;
  6         25698  
  6         197  
20 6     6   41 use Carp;
  6         13  
  6         676  
21              
22             our $VERSION = 0.13;
23              
24             my $PID;
25              
26             BEGIN {
27 6     6   6630 $PID = $$;
28             }
29              
30              
31             =item B
32              
33             Returns new pool object
34              
35             AnyEvent::ConnPool->new(
36             constructor => sub {
37             return generate_connection();
38             },
39             check => {
40             cb => sub {
41             my $connection = shift;
42             ...
43             if ($connection->conn()->ping()) {
44             return 1;
45             }
46             return 0;
47             },
48             interval => 10,
49             },
50             size => 5,
51             init => 1,
52             );
53              
54              
55             constructor => subroutine, which generates connection for pool.
56              
57             check => pingers, allows to specify methods and interval for connection state validation.
58             check->{cb} => callback, used for ping connection. You should implement this logic by yourself.
59             If you need reconnect, you can just call
60              
61             $connection->reconnect();
62              
63             check->{interval} => interval for the callback.
64              
65             size => how many connections should be created on pool initialization.
66              
67             init => initialize connections on pool construction.
68              
69             dispatcher => returns dispatcher object instead of pool object. You can use dispatcher as wrapper around your connection. In that case pool will use
70             all it's features behind the scene. You can get dispatcher not only from constructor, you can get it from pool with dispatcher method. For example:
71            
72             # with your connection
73             $connection->selectall_arrayref(...);
74              
75             # pooled
76             my $dispatcher = $connpool->dispatcher();
77             # same thing as selectall_arrayref above, but with connpool behind the scene.
78             $dispatcher->selectall_arrayref(...);
79             # you can always get connpool from dispatcher:
80             my $pool = AnyEvent::ConnPool->pool_from_dispatcher($dispatcher);
81              
82              
83             =cut
84              
85             sub new {
86 5     5 1 3623 my ($class, %opts) = @_;
87              
88 0     0   0 my $self = {
89             constructor => sub {1;},
90 5         43 _payload => [],
91             index => 0,
92             init => 0,
93             count => 0,
94             };
95              
96 5 50       19 if (!$opts{constructor}) {
97 0         0 croak "Missing mandatory param constructor.";
98             }
99              
100 5 50       22 if (ref $opts{constructor} ne 'CODE') {
101 0         0 croak "Constructor should be a code reference.";
102             }
103              
104 5 50 33     24 if ($opts{dispatcher} && !$opts{init}) {
105 0         0 croak "Can't get dispatcher on uninitialized pool";
106             }
107              
108 5         13 $self->{constructor} = $opts{constructor};
109              
110 5 100       25 if ($opts{check}) {
111 1 50       4 if (ref $opts{check} ne 'HASH') {
112 0         0 croak "Check param should be a hash reference.";
113             }
114 1 50       3 if (!$opts{check}->{cb}) {
115 0         0 croak 'Missing cb param.';
116             }
117            
118 1 50       4 if (ref $opts{check}->{cb} ne 'CODE') {
119 0         0 croak 'Cb param should be a code reference.';
120             }
121 1 50       4 if (!$opts{check}->{interval}) {
122 0         0 croak 'Missing interval param.';
123             }
124             # TODO: Add interval parameter validation
125            
126 1         2 $self->{check} = $opts{check};
127             }
128              
129 5 50       16 if ($opts{size}) {
130             #TODO: add validation for size
131              
132 5         11 $self->{size} = $opts{size};
133             }
134              
135 5         10 bless $self, $class;
136              
137 5 100       15 if ($opts{init}) {
138 4         16 $self->init();
139             }
140            
141 5 50       12 if ($opts{dispatcher}) {
142 0         0 return $self->dispatcher();
143             }
144 5         14 return $self;
145             }
146              
147              
148             =item B
149              
150             Initializes pool.
151              
152             =cut
153              
154             sub init {
155 5     5 1 13 my ($self, $conn_count) = @_;
156            
157 5 50       35 if ($self->{init}) {
158 0         0 croak "Can't initialize already initilized pool.";
159             }
160            
161 5   33     35 $conn_count ||= delete $self->{size};
162              
163 5 50       14 unless ($conn_count) {
164 0         0 croak "Can't initilize empty pool";
165             }
166            
167 5         18 for (1 .. $conn_count) {
168 26         56 $self->add();
169             }
170            
171 5 100       20 if ($self->{check}) {
172 1         1 my $guard; $guard = AnyEvent->timer (
173             after => $self->{check}->{interval},
174             interval => $self->{check}->{interval},
175             cb => sub {
176 1     1   999177 my $temp_guard = $guard;
177 1         15 for (my $i = 0; $i < $self->{count}; $i++) {
178 3         7 my $conn = $self->{_payload}->[$i];
179             eval {
180 3         12 $self->{check}->{cb}->($conn);
181 3         110 1;
182 3 50       3 } or do {
183 0         0 carp "Error occured: $@";
184             };
185             }
186             },
187 1         11 );
188             }
189              
190 5         22 $self->{init} = 1;
191 5         9 return 1;
192             }
193              
194              
195             =item B
196              
197             Returns dispatcher object instead of pool object. It allows you to call connection's method directly, if you don't care about pool mechanism.
198             And it's simple.
199              
200             my $dispatcher = $connpool->dispatcher();
201             $dispatcher->selectall_arrayref(...);
202             # equivalent to:
203             $connpool->get()->conn()->selectall_arrayref(...);
204              
205             =cut
206              
207             sub dispatcher {
208 1     1 1 5 my ($self) = @_;
209              
210 1         2 my $dispatcher = {
211             _pool => $self,
212             };
213              
214 1         2 bless $dispatcher, 'AnyEvent::ConnPool::Dispatcher';
215 1         2 return $dispatcher;
216             }
217              
218              
219             =item B
220              
221             Returns pool object from dispatcher object. You can call it by 3 ways:
222              
223             my $pool = AnyEvent::ConnPool::pool_from_dispatcher($dispatcher);
224             my $pool = AnyEvent::ConnPool->pool_from_dispatcher($dispatcher);
225             my $pool = $connpool->pool_from_dispatcher($dispatcher);
226              
227             =cut
228              
229             sub pool_from_dispatcher {
230 3     3 1 742 my ($p1, $p2) = @_;
231 3         3 my $dispatcher = undef;
232              
233 3 50       10 if ($p1) {
234             # $p1 is just string, called as AnyEvent::ConnPool->dispatcher_from_pool($dispatcher)
235             # so, dispatcher is in $p2
236 3 100       8 if (!ref $p1) {
    100          
237 1         3 $dispatcher = $p2;
238             }
239             elsif (ref $p1 eq __PACKAGE__) {
240 1         2 $dispatcher = $p2;
241             }
242             else {
243 1         2 $dispatcher = $p1;
244             }
245             }
246              
247 3   33     6 $dispatcher ||= $p2;
248 3 50 33     15 if (!$dispatcher || ref $dispatcher ne 'AnyEvent::ConnPool::Dispatcher') {
249 0         0 croak "No dispatcher";
250             }
251            
252 3         5 return $dispatcher->{_pool};
253             }
254              
255              
256             =item B
257              
258             Adds connection to the pool.
259              
260             =cut
261              
262             sub add {
263 26     26 1 27 my ($self, $count) = @_;
264            
265             # TODO: add count support
266 26         47 my $conn = $self->{constructor}->();
267 26         190 my $unit = AnyEvent::ConnPool::Unit->new($conn,
268             index => $self->{count},
269             constructor => $self->{constructor},
270             );
271              
272 26         42 $self->_add_object_raw($unit);
273             }
274              
275              
276             =item B
277              
278             Returns AnyEvent::ConnPool::Unit object from the pool.
279              
280             my $unit = $pool->get();
281             my $connection = $unit->conn();
282              
283             =cut
284              
285             sub get {
286 67     67 1 233 my ($self, $index) = @_;
287            
288 67 100       121 if (defined $index) {
289 3         8 return $self->{_payload}->[$index];
290             }
291              
292 64 100       136 if ($self->{index} + 1 > $self->{count}) {
293 2         3 $self->{index} = 0;
294             }
295              
296 64         86 my $retval = $self->{_payload}->[$self->{index}];
297            
298 64 100       106 if ($retval->locked()) {
299 15         25 $self->{locked}->{$self->{index}} = 1;
300 15         26 $retval = $self->get_free_connection($self->{index});
301             }
302             else {
303 49         76 delete $self->{locked}->{$self->{index}};
304             }
305              
306 64 50       89 if (wantarray) {
307 0         0 $self->{index}++;
308 0         0 return ($index, $retval);
309             }
310             else {
311 64         76 $self->{index}++;
312 64         110 return $retval;
313             }
314             }
315              
316              
317             sub grow {
318 0     0 0 0 my ($self, $count) = @_;
319              
320 0   0     0 $count ||= 1;
321 0         0 for (1 .. $count) {
322 0         0 $self->add();
323             }
324 0         0 return 1;
325             }
326              
327              
328             sub shrink {
329 0     0 0 0 my ($self, $count) = @_;
330              
331 0   0     0 $count ||= 1;
332 0         0 for (1 .. $count) {
333 0         0 pop @{$self->{_payload}};
  0         0  
334             }
335 0         0 return 1;
336             }
337              
338              
339             # utility functions
340              
341             sub get_free_connection {
342 15     15 0 20 my ($self, $desired_index) = @_;
343            
344 15         14 my $retval = undef;
345 15         20 my @balanced_array = $self->balance_array($desired_index);
346              
347 15         27 for my $i (@balanced_array) {
348 44         47 my $conn = $self->{_payload}->[$i];
349 44 100       55 unless ($conn->locked) {
350 15         12 $retval = $conn;
351 15         17 $self->{index} = $i;
352 15         19 last;
353             }
354             }
355 15         54 return $retval;
356            
357             }
358              
359              
360             sub balance_array {
361 15     15 0 17 my ($self, $index) = @_;
362              
363 15         18 my $count = $self->{count};
364 15         11 $index++;
365 15         14 $count--;
366              
367 15 100 66     61 if ($index == 0 || $index >= $count) {
368 14         36 return (0 .. $count);
369             }
370              
371 1         8 return (($index .. $count), 0 .. $index - 1);
372             }
373              
374              
375             sub _add_object_raw {
376 26     26   24 my ($self, $object, $position) = @_;
377            
378 26 50       39 if (defined $position) {
379 0         0 $self->{_payload}->[$self->{index}] = $object;
380             }
381             else {
382 26         21 push @{$self->{_payload}}, $object;
  26         40  
383             }
384              
385 26         21 $self->{count} = scalar @{$self->{_payload}};
  26         36  
386 26         43 return 1;
387             }
388              
389              
390             =back
391             =cut
392              
393             1;
394              
395             package AnyEvent::ConnPool::Dispatcher;
396 6     6   37 use strict;
  6         9  
  6         300  
397 6     6   31 no strict qw/refs/;
  6         7  
  6         195  
398 6     6   29 use warnings;
  6         7  
  6         189  
399 6     6   22 use Carp;
  6         7  
  6         1207  
400              
401             our $AUTOLOAD;
402             sub AUTOLOAD {
403 3     3   700 my ($d, @params) = @_;
404              
405 3         4 my $program = $AUTOLOAD;
406 3         13 $program =~ s/.*:://;
407            
408 3         10 my $conn = $d->{_pool}->get()->conn();
409 3         11 my $reference = sprintf("%s::%s", ref ($conn), $program);
410            
411 3         4 shift;
412 3         4 unshift @_, $conn;
413              
414 3         3 goto &{$reference};
  3         23  
415             }
416              
417             1;
418              
419             package AnyEvent::ConnPool::Unit;
420             =head1 NAME
421              
422             AnyEvent::ConnPool::Unit
423              
424             =head1 DESCRIPTION
425              
426             Connection unit. Just wrapper around user-specified connection.
427             Required for transactions support.
428              
429             =head1 METHODS
430              
431             =over
432              
433             =cut
434              
435 6     6   32 use strict;
  6         7  
  6         269  
436 6     6   27 use warnings;
  6         7  
  6         1504  
437              
438             sub new {
439 26     26   54 my ($class, $object, %opts) = @_;
440              
441 26         96 my ($index, $constructor) = ($opts{index}, $opts{constructor});
442              
443 26         57 my $unit = {
444             _conn => $object,
445             _locked => 0,
446             _index => $index,
447             _constructor => $constructor,
448             };
449              
450 26         34 bless $unit, $class;
451 26         40 return $unit;
452             }
453              
454              
455             =item B
456              
457             Returns connection from unit object.
458              
459             =cut
460              
461             sub conn {
462 124     124   6370 my $self = shift;
463 124         245 return $self->{_conn};
464             }
465              
466              
467             =item B
468              
469             Locks current connection. After that connection shouldn't be used in balancing mechanism and never will be
470             returned from pool. To unlock connection you should use unlock method.
471              
472             $connection->lock();
473              
474             =cut
475              
476             sub lock {
477 3     3   11 my ($self) = @_;
478              
479 3         10 $self->{_locked} = 1;
480 3         5 return 1;
481             }
482              
483              
484             =item B
485              
486             Unlocks connection and returns it to the balancing scheme.
487              
488             $connection->unlock();
489              
490             =cut
491              
492             sub unlock {
493 0     0   0 my ($self) = @_;
494              
495 0         0 delete $self->{_locked};
496 0         0 return 1;
497             }
498              
499              
500             =item B
501              
502             Returns true if connection is locked.
503              
504             if ($connection->locked()) {
505             ...
506             }
507              
508             =cut
509              
510             sub locked {
511 108     108   95 my ($self) = @_;
512            
513 108         260 return $self->{_locked};
514             }
515              
516              
517             sub index {
518 0     0   0 my ($self) = @_;
519 0         0 return $self->{_index};
520             }
521              
522              
523             sub reconnect {
524 1     1   4 my ($self) = @_;
525              
526 1 50       3 if ($self->{_constructor}) {
527 1         3 $self->{_conn} = $self->{_constructor}->();
528             }
529 1         3 return 1;
530             }
531             =back
532             =cut
533              
534             1;
535