File Coverage

blib/lib/DBIx/Poggy.pm
Criterion Covered Total %
statement 12 14 85.7
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 17 19 89.4


line stmt bran cond sub pod time code
1 1     1   379 use strict;
  1         1  
  1         22  
2 1     1   2 use warnings;
  1         1  
  1         20  
3 1     1   7 use v5.14;
  1         8  
4              
5             package DBIx::Poggy;
6             our $VERSION = '0.06';
7              
8 1     1   3 use Scalar::Util qw(weaken);
  1         1  
  1         66  
9              
10             =head1 NAME
11              
12             DBIx::Poggy - async Pg with AnyEvent and Promises
13              
14             =head1 SYNOPSIS
15              
16             use strict;
17             use warnings;
18              
19             use DBIx::Poggy;
20             my $pool = DBIx::Poggy->new( pool_size => 5 );
21             $pool->connect('dbi:Pg:db=test', 'root', 'password');
22              
23             use AnyEvent;
24             my $cv = AnyEvent->condvar;
25              
26             my $res;
27             $pool->take->selectrow_arrayref(
28             'SELECT * FROM users WHERE name = ?', {}, 'ruz'
29             )
30             ->then(sub {
31             my $user = $res->{user} = shift;
32              
33             return $pool->take->selectall_arrayref(
34             'SELECT * FROM friends WHERE user_id = ?', undef, $user->{id}
35             );
36             })
37             ->then(sub {
38             my $friends = $res->{friends} = shift;
39             ...
40             })
41             ->catch(sub {
42             my $error = shift;
43             die $error;
44             })
45             ->finally(sub {
46             $cv->send( $res );
47             });
48              
49             $cv->recv;
50              
51             =head1 DESCRIPTION
52              
53             "Async" postgres as much as L allows with L instead of callbacks.
54              
55             You get DBI interface you used to that returns promises, connections pool, queries
56             queuing and support of transactions.
57              
58             =head2 Why pool?
59              
60             DBD::Pg is not async, it's non blocking. Every connection can execute only one query
61             at a moment, so to execute several queries in parallel you need several connections.
62             What you get is you can do something in Perl side while postgres crunches data for
63             you.
64              
65             =head2 Queue
66              
67             Usually if you attempt to run two queries on the same connection then DBI throws an
68             error about active query. Poggy takes care of that by queuing up queries you run on
69             one connection. Handy for transactions and pool doesn't grow too much.
70              
71             =head2 What is async here then?
72              
73             Only a queries on multiple connections, so if you need to execute many parallel
74             queries then you need many connections. pg_bouncer and haproxy are your friends.
75              
76             =cut
77              
78 1     1   321 use DBIx::Poggy::DBI;
  0            
  0            
79             use DBIx::Poggy::Error;
80              
81             =head1 METHODS
82              
83             =head2 new
84              
85             Named arguments:
86              
87             =over 4
88              
89             =item pool_size
90              
91             number of connections to create, creates one more in case all are busy
92              
93             =back
94              
95             Returns a new pool object.
96              
97             =cut
98              
99             sub new {
100             my $proto = shift;
101             my $self = bless { @_ }, ref($proto) || $proto;
102             return $self->init;
103             }
104              
105             sub init {
106             my $self = shift;
107             $self->{pool_size} ||= 10;
108             return $self;
109             }
110              
111             =head2 connect
112              
113             Takes the same arguments as L, opens "pool_size" connections.
114              
115             =cut
116              
117             sub connect {
118             my $self = shift;
119             my ($dsn, $user, $password, $opts) = @_;
120              
121             $opts ||= {};
122              
123             $self->{free} ||= [];
124              
125             $self->{connection_settings} = [ $dsn, $user, $password, $opts ];
126              
127             $self->_connect for 1 .. $self->{pool_size};
128             return $self;
129             }
130              
131             sub _connect {
132             my $self = shift;
133              
134             my $dbh = DBIx::Poggy::DBI->connect(
135             @{ $self->{connection_settings} }
136             ) or die DBIx::Poggy::Error->new( 'DBIx::Poggy::DBI' );
137             push @{$self->{free}}, $dbh;
138              
139             return;
140             }
141              
142             =head2 take
143              
144             Gives one connection from the pool. Takes arguments:
145              
146             =over 4
147              
148             =item auto
149              
150             Connection will be released to the pool after transaction or
151             as soon as query queue becomes empty. True by default.
152              
153             =back
154              
155             Returns L handle. When "auto" is turned off
156             then in list context returns also guard object that will L
157             handle to the pool on destruction.
158              
159             =cut
160              
161             sub take {
162             my $self = shift;
163             my (%args) = (auto => 1, @_);
164             unless ( $self->{free} ) {
165             die DBIx::Poggy::Error->new(
166             err => 666,
167             errstr => 'Attempt to take a connection from not initialized pool',
168             );
169             }
170             unless ( @{ $self->{free} } ) {
171             warn "DB pool exhausted, creating a new connection";
172             $self->_connect;
173             }
174              
175             my $dbh = shift @{ $self->{free} };
176             if ( $args{auto} ) {
177             $dbh->{private_poggy_state}{release_to} = $self;
178             weaken $dbh->{private_poggy_state}{release_to};
179             return $dbh;
180             }
181             return $dbh unless wantarray;
182             return ( $dbh, guard { $self->release($dbh) } );
183             }
184              
185             =head2 release
186              
187             Takes a handle as argument and puts it back into the pool. At the moment,
188             no protection against double putting or active queries on the handle.
189              
190             =cut
191              
192             sub release {
193             my $self = shift;
194             push @{ $self->{free} }, shift;
195             return $self;
196             }
197              
198             =head2 AUTHOR
199              
200             Ruslan U. Zakirov ERuslan.Zakirov@gmail.comE
201              
202             =head2 LICENSE
203              
204             Under the same terms as perl itself.
205              
206             =cut
207              
208             1;