line
stmt
bran
cond
sub
pod
time
code
1
package Database::Async;
2
# ABSTRACT: database interface for use with IO::Async
3
2
2
186796
use strict;
2
26
2
66
4
2
2
12
use warnings;
2
3
2
99
5
6
our $VERSION = '0.015';
7
8
2
2
1059
use parent qw(Database::Async::DB IO::Async::Notifier);
2
686
2
13
9
10
=head1 NAME
11
12
Database::Async - provides a database abstraction layer for L<IO::Async>
13
14
=head1 SYNOPSIS
15
16
# Just looking up one thing?
17
my ($id) = $db->query(
18
q{select id from some_table where name = ?},
19
bind => ['some name']
20
)->single
21
# This is an example, so we want the result immediately - in
22
# real async code, you'd rarely call Future->get, but would
23
# typically use `->then` or `->on_done` instead
24
->get;
25
# or, with Future::AsyncAwait, try:
26
my ($id) = await $db->query(
27
q{select id from some_table where name = ?},
28
bind => ['some name']
29
)->single;
30
31
# Simple query
32
$db->query(q{select id, some_data from some_table})
33
->row_hashrefs
34
->each(sub {
35
printf "ID %d, data %s\n", $_->{id}, $_->{some_data};
36
})
37
# If you want to complete the full query, don't forget to call
38
# ->get or ->retain here!
39
->retain;
40
41
# Transactions
42
$db->transaction(sub {
43
my ($tx) = @_;
44
})->commit
45
# This returns a Future, so if you want to wait for it to complete,
46
# call `->get` (throws an exception if something goes wrong)
47
# or `->await` (just waits for it to succeed or fail, but ignores
48
# the result).
49
->get;
50
51
=head1 DESCRIPTION
52
53
Database support for L<IO::Async>. This is the base API, see L<Database::Async::Engine>
54
and subclasses for specific database functionality.
55
56
B<This is an early preview release>.
57
58
L<DBI> provides a basic API for interacting with a database, but this is
59
very low level and uses a synchronous design. See L<DBIx::Async> if you're
60
familiar with L<DBI> and want an interface that follows it more closely.
61
62
Typically a database only allows a single query to run at a time.
63
Other queries will be queued.
64
65
Set up a pool of connections to provide better parallelism:
66
67
my $dbh = Database::Async->new(
68
uri => 'postgresql://write@maindb/dbname?sslmode=require',
69
pool => {
70
max => 4,
71
},
72
);
73
74
Queries and transactions will then automatically be distributed
75
among these connections. However, note that:
76
77
=over 4
78
79
=item * all queries within a transaction will be made on the same connection
80
81
=item * ordering guarantees are weaker: queries will be started in
82
order on the next available connection
83
84
=back
85
86
With a single connection, you could expect:
87
88
Future->needs_all(
89
$dbh->do(q{insert into x ...}),
90
$dbh->do(q{select from x ...})
91
);
92
93
to insert the rows first, then return them in the C<select> call. B<This is not the case with a connection pool>.
94
95
=head2 Pool configuration
96
97
The following parameters are currently accepted for defining the pool:
98
99
=over 4
100
101
=item * C<min> - minimum number of total connections to maintain, defaults to 0
102
103
=item * C<max> - maximum permitted active connections, default is 1
104
105
=item * C<ordering> - how to iterate through the available URIs, options include
106
C<random> and C<serial> (default, round-robin behaviour).
107
108
=item * C<backoff> - algorithm for managing connection timeouts or failures. The default
109
is an exponential backoff with 10ms initial delay, 30s maximum, resetting on successful
110
connection.
111
112
=back
113
114
See L<Database::Async::Pool> for more details.
115
116
=head2 DBI
117
118
The interface is not the same as L<DBI>, but here are some approximate equivalents for
119
common patterns:
120
121
=head3 selectall_hashref
122
123
In L<DBI>:
124
125
print $_->{id} . "\n" for
126
$dbh->selectall_hashref(
127
q{select * from something where id = ?},
128
undef,
129
$id
130
)->@*;
131
132
In L<Database::Async>:
133
134
print $_->{id} . "\n" for
135
$db->query(
136
q{select * from something where id = ?},
137
bind => [
138
$id
139
])->row_hashrefs
140
->as_arrayref
141
->@*
142
143
In L<DBI>:
144
145
my $sth = $dbh->prepare(q{select * from something where id = ?});
146
for my $id (1, 2, 3) {
147
$sth->bind(0, $id, 'bigint');
148
$sth->execute;
149
while(my $row = $sth->fetchrow_hashref) {
150
print $row->{name} . "\n";
151
}
152
}
153
154
In L<Database::Async>:
155
156
my $sth = $db->prepare(q{select * from something where id = ?});
157
(Future::Utils::fmap_void {
158
my ($id) = @_;
159
$sth->bind(0, $id, 'bigint')
160
->then(sub { $sth->execute })
161
->then(sub {
162
$sth->row_hashrefs
163
->each(sub {
164
print $_->{name} . "\n";
165
})->completed
166
})
167
} foreach => [1, 2, 3 ])->get;
168
169
=cut
170
171
2
2
9535
use mro;
2
6
2
21
172
2
2
1360
no indirect;
2
2973
2
13
173
174
2
2
777
use Future::AsyncAwait;
2
3452
2
18
175
2
2
1346
use Syntax::Keyword::Try;
2
2177
2
11
176
177
2
2
1526
use URI;
2
10771
2
112
178
2
2
1499
use URI::db;
2
26395
2
92
179
2
2
1324
use Module::Load ();
2
2510
2
71
180
2
2
16
use Scalar::Util qw(blessed);
2
6
2
159
181
182
2
2
1006
use Database::Async::Engine;
2
5
2
87
183
2
2
916
use Database::Async::Pool;
2
7
2
89
184
2
2
1208
use Database::Async::Query;
2
9
2
149
185
2
2
21
use Database::Async::StatementHandle;
2
7
2
68
186
2
2
1184
use Database::Async::Transaction;
2
8
2
101
187
188
2
2
13
use Log::Any qw($log);
2
6
2
9
189
190
=head1 METHODS
191
192
=cut
193
194
=head2 transaction
195
196
Resolves to a L<Future> which will yield a L<Database::Async::Transaction>
197
instance once ready.
198
199
=cut
200
201
0
0
1
0
async sub transaction {
    # Any extra arguments are handed to the transaction constructor
    # alongside `database => $self`.
    my ($self, @args) = @_;
    my $txn = Database::Async::Transaction->new(
        database => $self,
        @args,
    );

    # Record the transaction, then weaken only the stored slot: the
    # lexical $txn (and whatever the caller keeps) remains a strong
    # reference, so this object never keeps a transaction alive itself.
    push @{ $self->{transactions} }, $txn;
    Scalar::Util::weaken($self->{transactions}[-1]);

    # Resolve with the transaction only once `begin` has completed.
    await $txn->begin;
    return $txn;
}
213
214
=head2 txn
215
216
Executes code within a transaction. This is meant as a shorter form of
217
the common idiom
218
219
$db->transaction
220
->then(sub {
221
my ($txn) = @_;
222
Future->call($code)
223
->then(sub {
224
$txn->commit
225
})->on_fail(sub {
226
$txn->rollback
227
});
228
})
229
230
The code must return a L<Future>, and the transaction will only be committed
231
if that L<Future> resolves cleanly.
232
233
Returns a L<Future> which resolves once the transaction is committed.
234
235
=cut
236
237
0
0
1
0
async sub txn {
    # $code is invoked with the transaction followed by any extra @args.
    my ($self, $code, @args) = @_;
    my $txn = await $self->transaction;
    try {
        # Run the user code via Future->call and wait for its Future.
        my $pending = Future->call($code, $txn, @args);
        my @result = await $pending;

        # Only commit once the user code has resolved successfully.
        await $txn->commit;
        return @result;
    } catch {
        # Capture the original failure before the nested try can replace $@.
        my $failure = $@;
        try {
            await $txn->rollback;
        } catch {
            # A failed rollback is logged but must not mask the real error.
            $log->warnf("exception %s in rollback", $@);
        }
        die $failure;
    }
}
256
257
=head1 METHODS - Internal
258
259
You're welcome to call these, but they're mostly intended
260
for internal usage, and the API B<may> change in future versions.
261
262
=cut
263
264
=head2 uri
265
266
Returns the configured L<URI> for populating database instances.
267
268
=cut
269
270
3
3
1
109
sub uri {
    # Plain accessor for the value stored under the `uri` key.
    my ($self) = @_;
    return $self->{uri};
}
271
272
=head2 pool
273
274
Returns the L<Database::Async::Pool> instance.
275
276
=cut
277
278
sub pool {
    my ($self) = @_;

    # Reuse the pool if one has already been configured or created.
    return $self->{pool} if defined $self->{pool};

    # Otherwise build one lazily from the standard constructor arguments.
    return $self->{pool} = Database::Async::Pool->new($self->pool_args);
}
284
285
=head2 pool_args
286
287
Returns a list of standard pool constructor arguments.
288
289
=cut
290
291
sub pool_args {
    my ($self) = @_;

    # Hold the database weakly inside the callback so that the pool (which
    # this object owns) does not keep this object alive in a cycle. This
    # used to go through curry::weak, but this file never loads `curry`;
    # the closure below needs only Scalar::Util, which is already loaded.
    Scalar::Util::weaken(my $weak_self = $self);

    # Returns a key/value list: a `request_engine` callback that forwards
    # its arguments to ->request_engine, and the configured `uri`.
    return (
        request_engine => sub {
            # Nothing to do once the database object has been destroyed.
            return unless $weak_self;
            return $weak_self->request_engine(@_);
        },
        uri => $self->uri,
    );
}
298
299
=head2 configure
300
301
Applies configuration, see L<IO::Async::Notifier> for details.
302
303
Supports the following named parameters:
304
305
=over 4
306
307
=item * C<uri> - the endpoint to use when connecting a new engine instance
308
309
=item * C<engine> - the parameters to pass when instantiating a new L<Database::Async::Engine>
310
311
=item * C<pool> - parameters for setting up the pool, or a L<Database::Async::Pool> instance
312
313
=item * C<encoding> - default encoding to apply to parameters, queries and results, defaults to C<binary>
314
315
=back
316
317
=cut
318
319
# Aliases that should be normalised to the canonical 'UTF-8' name.
# Keys are lower-case: lookups are done on the lower-cased input so that
# 'UTF8', 'Utf-8', 'Unicode' etc. are all treated alike.
my %encoding_map = (
    'utf8'    => 'UTF-8',
    'utf-8'   => 'UTF-8',
    'unicode' => 'UTF-8',
);

sub configure {
    my ($self, %args) = @_;

    if(my $encoding = delete $args{encoding}) {
        # Known aliases collapse to 'UTF-8' regardless of case; any other
        # name is stored exactly as given.
        $self->{encoding} = $encoding_map{lc $encoding} // $encoding;
    }

    if(my $uri = delete $args{uri}) {
        # This could be any type of object. We make
        # the assumption here that it safely serialises
        # to a standard URI. Some of the database
        # engines provide such a standard (e.g. PostgreSQL).
        # Others may not...
        $self->{uri} = URI->new("$uri");
    }

    # `engine` and `type` are stored even when explicitly passed as undef,
    # hence exists() rather than a truth test.
    if(exists $args{engine}) {
        $self->{engine_parameters} = delete $args{engine};
    }
    if(exists $args{type}) {
        $self->{type} = delete $args{type};
    }

    if(my $pool = delete $args{pool}) {
        if(blessed($pool)) {
            # Already an object: use it as-is.
            $self->{pool} = $pool;
        } else {
            # Otherwise treat it as a hashref of pool parameters, applied
            # on top of the standard constructor arguments.
            $self->{pool} = Database::Async::Pool->new(
                $self->pool_args,
                %$pool,
            );
        }
    }

    # Anything we did not consume is passed up the inheritance chain.
    $self->next::method(%args);
}
359
360
0
0
1
sub encoding {
    # Plain accessor for the value stored under the `encoding` key.
    my ($self) = @_;
    return $self->{encoding};
}
361
362
=head2 ryu
363
364
A L<Ryu::Async> instance, used for requesting sources, sinks and timers.
365
366
=cut
367
368
sub ryu {
    my ($self) = @_;

    # Created on first use, cached under the `ryu` key afterwards.
    return $self->{ryu} //= do {
        # This file has no `use Ryu::Async`, so load it on demand rather
        # than relying on some other module having pulled it in. Same
        # load-unless-can('new') pattern as engine_instance.
        Module::Load::load('Ryu::Async') unless Ryu::Async->can('new');

        my $ryu = Ryu::Async->new;
        # Register the new instance as a child of this object.
        $self->add_child($ryu);
        $ryu
    };
}
377
378
=head2 new_source
379
380
Instantiates a new L<Ryu::Source>.
381
382
=cut
383
384
0
0
1
sub new_source {
    # Delegates to the `source` method of the ->ryu instance.
    my ($self) = @_;
    return $self->ryu->source;
}
385
386
=head2 new_sink
387
388
Instantiates a new L<Ryu::Sink>.
389
390
=cut
391
392
0
0
1
sub new_sink {
    # Delegates to the `sink` method of the ->ryu instance.
    my ($self) = @_;
    return $self->ryu->sink;
}
393
394
=head2 new_future
395
396
Instantiates a new L<Future>.
397
398
=cut
399
400
0
0
1
sub new_future {
    # Delegates to the `new_future` method of whatever ->loop returns.
    my ($self) = @_;
    return $self->loop->new_future;
}
401
402
=head1 METHODS - Internal, engine-related
403
404
=cut
405
406
=head2 request_engine
407
408
Attempts to instantiate and connect to a new L<Database::Async::Engine>
409
subclass. Returns a L<Future> which should resolve to a new
410
L<Database::Async::Engine> instance when ready to use.
411
412
=cut
413
414
0
0
1
async sub request_engine {
    my ($self) = @_;
    $log->tracef('Requesting new engine');

    # Build the engine object and register it as a child of this object.
    my $engine = $self->engine_instance;

    $log->tracef('Connecting');
    # Start the connection and resolve with whatever it yields.
    my $connected = $engine->connect->retain;
    return await $connected;
}
421
422
=head2 engine_instance
423
424
Loads the appropriate engine class and attaches to the loop.
425
426
=cut
427
428
sub engine_instance {
    my ($self) = @_;
    my $uri = $self->uri;

    # An explicitly configured type wins; otherwise use the URI scheme.
    # The URI is optional (see the defined() check further down), so never
    # call ->scheme on an undefined value.
    my $type = $self->{type} // (defined($uri) ? $uri->scheme : undef);
    die 'unknown database type - no type configured and no URI scheme available'
        unless defined $type;

    # Map the type name to the engine class registered for it.
    my $engine_class = $Database::Async::Engine::ENGINE_MAP{$type}
        or die 'unknown database type ' . $type;

    # Load the engine class lazily if it is not already available.
    Module::Load::load($engine_class) unless $engine_class->can('new');
    $log->tracef('Instantiating new %s', $engine_class);

    # Later keys win: `uri` and `db` override anything supplied via the
    # configured engine parameters.
    my %param = (
        %{$self->{engine_parameters} || {}},
        (defined($uri) ? (uri => $uri) : ()),
        db => $self,
    );

    # Only recent engine versions support this parameter
    if(my $encoding = $self->encoding) {
        # If we're given this parameter, let's not ignore it silently
        die 'Database engine ' . $engine_class . ' does not support encoding parameter, try upgrading that module from CPAN or remove the encoding configuration in Database::Async'
            unless $engine_class->can('encoding');
        $param{encoding} = $encoding;
    }

    # Create the engine, register it as a child of this object and return it.
    my $engine = $engine_class->new(%param);
    $self->add_child($engine);
    return $engine;
}
457
458
=head2 engine_ready
459
460
Called by L<Database::Async::Engine> instances when the engine is
461
ready for queries.
462
463
=cut
464
465
sub engine_ready {
    my ($self, $engine) = @_;

    # Hand the engine to the pool's queue_ready_engine method.
    my $pool = $self->pool;
    return $pool->queue_ready_engine($engine);
}
469
470
0
0
0
sub db {
    # Returns the invocant itself.
    my ($self) = @_;
    return $self;
}
471
472
=head2 queue_query
473
474
Assign the given query to the next available engine instance.
475
476
=cut
477
478
0
0
1
async sub queue_query {
    my ($self, $query) = @_;
    $log->tracef('Queuing query %s', $query);

    # Wait for the pool to hand over an engine.
    my $pool = $self->pool;
    my $engine = await $pool->next_engine;

    $log->tracef('Query %s about to run on %s', $query, $engine);
    # Resolve with whatever the engine's handle_query yields.
    my $outcome = $engine->handle_query($query);
    return await $outcome;
}
485
486
sub diagnostics {
    # Placeholder: unpacks the invocant and does nothing else. The
    # statement is kept as the last expression so the implicit return
    # value stays as it was.
    my ($self) = @_;
}
489
490
sub notification {
    # $engine is accepted but not used here.
    my ($self, $engine, $channel, $data) = @_;
    $log->tracef('Database notifies us via %s of %s', $channel, $data);

    # Emit the payload on the source associated with this channel.
    my $source = $self->notification_source($channel);
    return $source->emit($data);
}
495
496
sub notification_source {
    my ($self, $name) = @_;

    # One source per name, created on first request and cached.
    my $sources = ($self->{notification_source} //= {});
    return $sources->{$name} //= $self->new_source;
}
500
501
1;
502
503
__END__