File Coverage

blib/lib/Database/Async.pm
Criterion Covered Total %
statement 66 125 52.8
branch 8 24 33.3
condition 2 15 13.3
subroutine 21 37 56.7
pod 15 20 75.0
total 112 221 50.6


line stmt bran cond sub pod time code
1             package Database::Async;
2             # ABSTRACT: database interface for use with IO::Async
3 2     2   171267 use strict;
  2         23  
  2         61  
4 2     2   10 use warnings;
  2         4  
  2         84  
5              
6             our $VERSION = '0.016';
7              
8 2     2   923 use parent qw(Database::Async::DB IO::Async::Notifier);
  2         628  
  2         24  
9              
10             =head1 NAME
11              
12             Database::Async - provides a database abstraction layer for L<IO::Async>
13              
14             =head1 SYNOPSIS
15              
16             # Just looking up one thing?
17             my ($id) = $db->query(
18             q{select id from some_table where name = ?},
19             bind => ['some name']
20             )->single
21             # This is an example, so we want the result immediately - in
22             # real async code, you'd rarely call Future->get, but would
23             # typically use `->then` or `->on_done` instead
24             ->get;
25             # or, with Future::AsyncAwait, try:
26             my ($id) = await $db->query(
27             q{select id from some_table where name = ?},
28             bind => ['some name']
29             )->single;
30              
31             # Simple query
32             $db->query(q{select id, some_data from some_table})
33             ->row_hashrefs
34             ->each(sub {
35             printf "ID %d, data %s\n", $_->{id}, $_->{some_data};
36             })
37             # If you want to complete the full query, don't forget to call
38             # ->get or ->retain here!
39             ->retain;
40              
41             # Transactions
42             $db->transaction(sub {
43             my ($tx) = @_;
44             })->commit
45             # This returns a Future, so if you want to wait for it to complete,
46             # call `->get` (throws an exception if something goes wrong)
47             # or `->await` (just waits for it to succeed or fail, but ignores
48             # the result).
49             ->get;
50              
51             =head1 DESCRIPTION
52              
53             Database support for L<IO::Async>. This is the base API, see L<Database::Async::Engine>
54             and subclasses for specific database functionality.
55              
56             B<This is an early preview release>.
57              
58             L<DBI> provides a basic API for interacting with a database, but this is
59             very low level and uses a synchronous design. See L<DBIx::Async> if you're
60             familiar with L<DBI> and want an interface that follows it more closely.
61              
62             Typically a database only allows a single query to run at a time.
63             Other queries will be queued.
64              
65             Set up a pool of connections to provide better parallelism:
66              
67             my $dbh = Database::Async->new(
68             uri => 'postgresql://write@maindb/dbname?sslmode=require',
69             pool => {
70             max => 4,
71             },
72             );
73              
74             Queries and transactions will then automatically be distributed
75             among these connections. However, note that:
76              
77             =over 4
78              
79             =item * all queries within a transaction will be made on the same connection
80              
81             =item * ordering guarantees are weaker: queries will be started in
82             order on the next available connection
83              
84             =back
85              
86             With a single connection, you could expect:
87              
88             Future->needs_all(
89             $dbh->do(q{insert into x ...}),
90             $dbh->do(q{select from x ...})
91             );
92              
93             to insert the rows first, then return them in the C<select> call. B<With a pool of connections, that's not guaranteed>.
94              
95             =head2 Pool configuration
96              
97             The following parameters are currently accepted for defining the pool:
98              
99             =over 4
100              
101             =item * C<min> - minimum number of total connections to maintain, defaults to 0
102              
103             =item * C<max> - maximum permitted active connections, default is 1
104              
105             =item * C<ordering> - how to iterate through the available URIs, options include
106             C<random> and C<serial> (default, round-robin behaviour).
107              
108             =item * C<backoff> - algorithm for managing connection timeouts or failures. The default
109             is an exponential backoff with 10ms initial delay, 30s maximum, resetting on successful
110             connection.
111              
112             =back
113              
114             See L<Database::Async::Pool> for more details.
115              
116             =head2 DBI
117              
118             The interface is not the same as L<DBI>, but here are some approximate equivalents for
119             common patterns:
120              
121             =head3 selectall_hashref
122              
123             In L<DBI>:
124              
125             print $_->{id} . "\n" for
126             $dbh->selectall_hashref(
127             q{select * from something where id = ?},
128             undef,
129             $id
130             )->@*;
131              
132             In L<Database::Async>:
133              
134             print $_->{id} . "\n" for
135             $db->query(
136             q{select * from something where id = ?},
137             bind => [
138             $id
139             ])->row_hashrefs
140             ->as_arrayref
141             ->@*
142              
143             In L<DBI>:
144              
145             my $sth = $dbh->prepare(q{select * from something where id = ?});
146             for my $id (1, 2, 3) {
147             $sth->bind(0, $id, 'bigint');
148             $sth->execute;
149             while(my $row = $sth->fetchrow_hashref) {
150             print $row->{name} . "\n";
151             }
152             }
153              
154             In L<Database::Async>:
155              
156             my $sth = $db->prepare(q{select * from something where id = ?});
157             (Future::Utils::fmap_void {
158             my ($id) = @_;
159             $sth->bind(0, $id, 'bigint')
160             ->then(sub { $sth->execute })
161             ->then(sub {
162             $sth->row_hashrefs
163             ->each(sub {
164             print $_->{name} . "\n";
165             })->completed
166             })
167             } foreach => [1, 2, 3 ])->get;
168              
169             =cut
170              
171 2     2   9074 use mro;
  2         4  
  2         19  
172 2     2   1036 no indirect;
  2         2245  
  2         10  
173              
174 2     2   613 use Future::AsyncAwait;
  2         3235  
  2         12  
175 2     2   1141 use Syntax::Keyword::Try;
  2         2121  
  2         12  
176              
177 2     2   1296 use URI;
  2         9998  
  2         80  
178 2     2   996 use URI::db;
  2         24307  
  2         79  
179 2     2   1040 use Module::Load ();
  2         2445  
  2         67  
180 2     2   16 use Scalar::Util qw(blessed);
  2         6  
  2         117  
181              
182 2     2   872 use Database::Async::Engine;
  2         6  
  2         72  
183 2     2   871 use Database::Async::Pool;
  2         8  
  2         83  
184 2     2   1007 use Database::Async::Query;
  2         39  
  2         96  
185 2     2   16 use Database::Async::StatementHandle;
  2         5  
  2         57  
186 2     2   1056 use Database::Async::Transaction;
  2         6  
  2         81  
187              
188 2     2   13 use Log::Any qw($log);
  2         4  
  2         8  
189              
190             =head1 METHODS
191              
192             =cut
193              
194             =head2 transaction
195              
196             Resolves to a L<Future> which will yield a L<Database::Async::Transaction>
197             instance once ready.
198              
199             =cut
200              
201 0     0 1 0 async sub transaction {
202 0         0 my ($self, @args) = @_;
203             Scalar::Util::weaken(
204 0         0 $self->{transactions}[@{$self->{transactions}}] =
  0         0  
205             my $txn = Database::Async::Transaction->new(
206             database => $self,
207             @args
208             )
209             );
210 0         0 await $txn->begin;
211 0         0 return $txn;
212             }
213              
214             =head2 txn
215              
216             Executes code within a transaction. This is meant as a shorter form of
217             the common idiom
218              
219             $db->transaction
220             ->then(sub {
221             my ($txn) = @_;
222             Future->call($code)
223             ->then(sub {
224             $txn->commit
225             })->on_fail(sub {
226             $txn->rollback
227             });
228             })
229              
230             The code must return a L<Future>, and the transaction will only be committed
231             if that L<Future> resolves cleanly.
232              
233             Returns a L<Future> which resolves once the transaction is committed.
234              
235             =cut
236              
237 0     0 1 0 async sub txn {
238 0         0 my ($self, $code, @args) = @_;
239 0         0 my $txn = await $self->transaction;
240             try {
241             my @data = await Future->call(
242             $code => ($txn, @args)
243             );
244             await $txn->commit;
245             return @data;
246 0         0 } catch {
247             my $exception = $@;
248             try {
249             await $txn->rollback;
250             } catch {
251             $log->warnf("exception %s in rollback", $@);
252             }
253             die $exception;
254             }
255             }
256              
257             =head1 METHODS - Internal
258              
259             You're welcome to call these, but they're mostly intended
260             for internal usage, and the API B<may> change in future versions.
261              
262             =cut
263              
264             =head2 uri
265              
266             Returns the configured L<URI> for populating database instances.
267              
268             =cut
269              
270 3     3 1 96 sub uri { shift->{uri} }
271              
272             =head2 pool
273              
274             Returns the L<Database::Async::Pool> instance.
275              
276             =cut
277              
278             sub pool {
279 16     16 1 2085 my ($self) = @_;
280 16   66     114 $self->{pool} //= Database::Async::Pool->new(
281             $self->pool_args
282             )
283             }
284              
285             =head2 pool_args
286              
287             Returns a list of standard pool constructor arguments.
288              
289             =cut
290              
291             sub pool_args {
292 3     3 1 10 my ($self) = @_;
293             return (
294 3         35 request_engine => $self->curry::weak::request_engine,
295             uri => $self->uri,
296             );
297             }
298              
299             =head2 configure
300              
301             Applies configuration, see L<IO::Async::Notifier/configure> for details.
302              
303             Supports the following named parameters:
304              
305             =over 4
306              
307             =item * C<uri> - the endpoint to use when connecting a new engine instance
308              
309             =item * C<engine> - the parameters to pass when instantiating a new L<Database::Async::Engine>
310              
311             =item * C<pool> - parameters for setting up the pool, or a L<Database::Async::Pool> instance
312              
313             =item * C<encoding> - default encoding to apply to parameters, queries and results, defaults to C<binary>
314              
315             =back
316              
317             =cut
318              
319             my %encoding_map = (
320             'utf8' => 'UTF-8',
321             'utf-8' => 'UTF-8',
322             'UTF8' => 'UTF-8',
323             'unicode' => 'UTF-8',
324             );
325              
326             sub configure {
327 5     5 1 6627 my ($self, %args) = @_;
328              
329 5 50       21 if(my $encoding = delete $args{encoding}) {
330 0   0     0 $self->{encoding} = $encoding_map{$encoding} // $encoding;
331             }
332              
333 5 50       14 if(my $uri = delete $args{uri}) {
334             # This could be any type of object. We make
335             # the assumption here that it safely serialises
336             # to a standard URI. Some of the database
337             # engines provide such a standard (e.g. PostgreSQL).
338             # Others may not...
339 0         0 $self->{uri} = URI->new("$uri");
340             }
341 5 50       15 if(exists $args{engine}) {
342 0         0 $self->{engine_parameters} = delete $args{engine};
343             }
344 5 50       12 if(exists $args{type}) {
345 0         0 $self->{type} = delete $args{type};
346             }
347 5 100       16 if(my $pool = delete $args{pool}) {
348 3 100       15 if(blessed $pool) {
349 1         3 $self->{pool} = $pool;
350             } else {
351 2         8 $self->{pool} = Database::Async::Pool->new(
352             $self->pool_args,
353             %$pool,
354             );
355             }
356             }
357 5         24 $self->next::method(%args);
358             }
359              
360 0     0 1   sub encoding { shift->{encoding} }
361              
362             =head2 ryu
363              
364             A L<Ryu::Async> instance, used for requesting sources, sinks and timers.
365              
366             =cut
367              
368             sub ryu {
369 0     0 1   my ($self) = @_;
370 0   0       $self->{ryu} //= do {
371 0           $self->add_child(
372             my $ryu = Ryu::Async->new
373             );
374 0           $ryu
375             }
376             }
377              
378             =head2 new_source
379              
380             Instantiates a new L<Ryu::Source>.
381              
382             =cut
383              
384 0     0 1   sub new_source { shift->ryu->source }
385              
386             =head2 new_sink
387              
388             Instantiates a new L<Ryu::Sink>.
389              
390             =cut
391              
392 0     0 1   sub new_sink { shift->ryu->sink }
393              
394             =head2 new_future
395              
396             Instantiates a new L<Future>.
397              
398             =cut
399              
400 0     0 1   sub new_future { shift->loop->new_future }
401              
402             =head1 METHODS - Internal, engine-related
403              
404             =cut
405              
406             =head2 request_engine
407              
408             Attempts to instantiate and connect to a new L<Database::Async::Engine>
409             subclass. Returns a L<Future> which should resolve to a new
410             L<Database::Async::Engine> instance when ready to use.
411              
412             =cut
413              
414 0     0 1   async sub request_engine {
415 0           my ($self) = @_;
416 0           $log->tracef('Requesting new engine');
417 0           my $engine = $self->engine_instance;
418 0           $log->tracef('Connecting');
419 0           return await $engine->connect;
420             }
421              
422             =head2 engine_instance
423              
424             Loads the appropriate engine class and attaches to the loop.
425              
426             =cut
427              
428             sub engine_instance {
429 0     0 1   my ($self) = @_;
430 0           my $uri = $self->uri;
431 0   0       my $type = $self->{type} // $uri->scheme;
432             die 'unknown database type ' . $type
433 0 0         unless my $engine_class = $Database::Async::Engine::ENGINE_MAP{$type};
434 0 0         Module::Load::load($engine_class) unless $engine_class->can('new');
435 0           $log->tracef('Instantiating new %s', $engine_class);
436             my %param = (
437 0 0         %{$self->{engine_parameters} || {}},
  0 0          
438             (defined($uri) ? (uri => $uri) : ()),
439             db => $self,
440             );
441              
442             # Only recent engine versions support this parameter
443 0 0         if(my $encoding = $self->encoding) {
444 0 0         if($engine_class->can('encoding')) {
445 0           $param{encoding} = $self->encoding;
446             } else {
447             # If we're given this parameter, let's not ignore it silently
448 0           die 'Database engine ' . $engine_class . ' does not support encoding parameter, try upgrading that module from CPAN or remove the encoding configuration in Database::Async';
449             }
450             }
451              
452             $self->add_child(
453 0           my $engine = $engine_class->new(%param)
454             );
455 0           $engine;
456             }
457              
458             =head2 engine_ready
459              
460             Called by L<Database::Async::Engine> instances when the engine is
461             ready for queries.
462              
463             =cut
464              
465             sub engine_ready {
466 0     0 1   my ($self, $engine) = @_;
467 0           $self->pool->queue_ready_engine($engine);
468             }
469              
470             sub engine_disconnected {
471 0     0 0   my ($self, $engine) = @_;
472 0           $self->pool->unregister_engine($engine);
473             }
474              
475 0     0 0   sub db { shift }
476              
477             =head2 queue_query
478              
479             Assign the given query to the next available engine instance.
480              
481             =cut
482              
483 0     0 1   async sub queue_query {
484 0           my ($self, $query) = @_;
485 0           $log->tracef('Queuing query %s', $query);
486 0           my $engine = await $self->pool->next_engine;
487 0           $log->tracef('Query %s about to run on %s', $query, $engine);
488 0           return await $engine->handle_query($query);
489             }
490              
491             sub diagnostics {
492 0     0 0   my ($self) = @_;
493             }
494              
495             sub notification {
496 0     0 0   my ($self, $engine, $channel, $data) = @_;
497 0           $log->tracef('Database notifies us via %s of %s', $channel, $data);
498 0           $self->notification_source($channel)->emit($data);
499             }
500              
501             sub notification_source {
502 0     0 0   my ($self, $name) = @_;
503 0   0       $self->{notification_source}{$name} //= $self->new_source;
504             }
505              
506             1;
507              
508             __END__