File Coverage

blib/lib/AtteanX/Store/DBI.pm
Criterion Covered Total %
statement 397 513 77.3
branch 86 154 55.8
condition 15 36 41.6
subroutine 45 56 80.3
pod 20 23 86.9
total 563 782 71.9


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AtteanX::Store::DBI - Database quad-store
4              
5             =head1 VERSION
6              
7             This document describes AtteanX::Store::DBI version 0.002
8              
9             =head1 SYNOPSIS
10              
11             use Attean;
12             my $store = Attean->get_store('DBI')->new( dbh => $dbh );
13              
14             =head1 DESCRIPTION
15              
16             AtteanX::Store::DBI provides a quad-store backed by a relational database.
17              
18             =head1 ATTRIBUTES
19              
20             =over 4
21              
22             =item C<< dbh >>
23              
24             =back
25              
26             =cut
27              
28 3     3   7760537 use utf8;
  3         25  
  3         33  
29 3     3   124 use v5.14;
  3         12  
30 3     3   16 use warnings;
  3         10  
  3         180  
31              
32             package AtteanX::Store::DBI {
33             our $VERSION = '0.002';
34 3     3   19 use Moo;
  3         7  
  3         22  
35 3     3   9554 use DBI;
  3         58101  
  3         202  
36 3     3   26 use Attean 0.013;
  3         66  
  3         25  
37 3     3   2340 use DBI::Const::GetInfoType;
  3         21340  
  3         392  
38 3     3   27 use Type::Tiny::Role;
  3         7  
  3         126  
39 3     3   16 use Types::Standard qw(Int Str ArrayRef HashRef ConsumerOf InstanceOf);
  3         7  
  3         33  
40 3     3   3719 use Encode;
  3         7  
  3         246  
41 3     3   1531 use Cache::LRU;
  3         2074  
  3         107  
42 3     3   22 use Set::Scalar;
  3         7  
  3         129  
43 3     3   1230 use DBIx::MultiStatementDo;
  3         2253620  
  3         161  
44 3     3   30 use List::MoreUtils qw(zip);
  3         5  
  3         35  
45 3     3   2587 use List::Util qw(any all first);
  3         8  
  3         271  
46 3     3   25 use File::ShareDir qw(dist_dir dist_file);
  3         35  
  3         164  
47 3     3   2343 use File::Slurp;
  3         31194  
  3         212  
48 3     3   23 use Scalar::Util qw(refaddr reftype blessed);
  3         7  
  3         157  
49 3     3   25 use namespace::clean;
  3         6  
  3         45  
50              
51             with 'Attean::API::MutableQuadStore';
52             with 'Attean::API::BulkUpdatableStore';
53             with 'Attean::API::QuadStore';
54             with 'Attean::API::CostPlanner';
55              
56             my @pos_names = Attean::API::Quad->variables;
57              
58             =head1 ROLES
59              
60             This class consumes L<Attean::API::QuadStore>,
61             L<Attean::API::MutableQuadStore>, L<Attean::API::BulkUpdatableStore>, and L<Attean::API::CostPlanner>.
62              
63             =head1 METHODS
64              
65             =over 4
66              
67             =item C<< new ( dbh => $dbh ) >>
68              
69             Returns a new quad-store object backed by the database referenced by the
70             supplied database handle.
71              
72             =cut
73              
has dbh => (
    is       => 'ro',
    isa      => InstanceOf['DBI::db'],
    required => 1,
);

# LRU caches sitting in front of the term table: _i2t maps term ID -> term
# object (see _get_term), _t2i maps a term's as_string -> term ID.
has _i2t_cache => (
    is      => 'ro',
    default => sub { return Cache::LRU->new( size => 256 ) },
);
has _t2i_cache => (
    is      => 'ro',
    default => sub { return Cache::LRU->new( size => 256 ) },
);
77              
78             =item C<< init() >>
79              
80             Create the tables and indexes required for using the database as a quadstore.
81              
82             =cut
sub init {
    # Executes the schema DDL for the current database type (as located by
    # create_schema_file) against the store's handle. Dies when no schema
    # file exists for this database type.
    my $self = shift;
    my $schema_file = $self->create_schema_file;
    die 'No schema files available for store initialization' unless $schema_file;
    my $ddl    = read_file($schema_file);
    my $runner = DBIx::MultiStatementDo->new( dbh => $self->dbh );
    return $runner->do($ddl);
}
91            
92             =item C<< temporary_store() >>
93              
94             Returns a temporary (in-memory, SQLite) store.
95              
96             =cut
sub temporary_store {
    # Class method: builds a store over a fresh in-memory SQLite database and
    # runs init() to create the schema. Returns the initialised store.
    my $class = shift;
    # Use the same SQLite connection attributes as dbi_connect_args:
    # RaiseError surfaces failures immediately, and sqlite_unicode makes term
    # values round-trip as Perl character strings rather than UTF-8 bytes.
    my $dbh = DBI->connect('dbi:SQLite:dbname=:memory:', '', '', { RaiseError => 1, sqlite_unicode => 1 })
        or die "Failed to create in-memory SQLite database: $DBI::errstr";
    my $store = $class->new(dbh => $dbh);
    $store->init();
    return $store;
}
104            
sub _last_insert_id {
    my ($self, $table) = @_;
    # Only the table name is supplied; catalog, schema and field stay undef.
    return $self->dbh->last_insert_id(undef, undef, $table, undef);
}
111              
sub _get_term {
    # Returns the term object stored under term ID $id, consulting and
    # filling the ID->term LRU cache. Confesses if the ID has no row or the
    # row's type is not one of iri/blank/literal.
    my $self = shift;
    my $id   = shift;
    if (my $cached = $self->_i2t_cache->get($id)) {
        return $cached;
    }

    my $sth = $self->dbh->prepare('SELECT term.type, term.value, dtterm.value AS datatype, term.language FROM term LEFT JOIN term dtterm ON (term.datatype_id = dtterm.term_id) WHERE term.term_id = ?');
    $sth->execute($id);
    my $row = $sth->fetchrow_hashref;

    my $term;
    # A missing row (unknown ID) falls through to the error below instead of
    # being dereferenced.
    if ($row) {
        my $type  = $row->{type} // '';
        my $value = $row->{value};
        if ($type eq 'iri') {
            $term = Attean::IRI->new( value => $value );
        } elsif ($type eq 'blank') {
            $term = Attean::Blank->new( value => $value );
        } elsif ($type eq 'literal') {
            my %args = (value => $value, datatype => Attean::IRI->new(value => $row->{datatype}));
            if (my $lang = $row->{language}) {
                $args{language} = $lang;
            }
            $term = Attean::Literal->new( %args );
        }
    }

    if ($term) {
        $self->_i2t_cache->set($id => $term);
        return $term;
    }
    # Data::Dumper is not loaded by this module; stringify the ID directly.
    my $shown = defined($id) ? $id : 'undef';
    Carp::confess("Failed to load term values for bad ID $shown");
}
143            
sub _get_term_id {
    # Read-only lookup of the stored ID for $term. Returns the ID (and caches
    # it) when found; returns empty otherwise. Never inserts rows.
    my $self = shift;
    my $term = shift;
    my $key  = $term->as_string;
    if (my $id = $self->_t2i_cache->get($key)) {
        return $id;
    }

    my $sql;
    my @bind;
    if ($term->does('Attean::API::IRI')) {
        $sql  = 'SELECT term_id FROM term WHERE type = ? AND value = ?';
        @bind = ('iri', $term->value);
    } elsif ($term->does('Attean::API::Blank')) {
        $sql  = 'SELECT term_id FROM term WHERE type = ? AND value = ?';
        @bind = ('blank', $term->value);
    } elsif ($term->does('Attean::API::Literal')) {
        # Look the datatype up without creating it: if the datatype IRI is
        # not stored, no literal using it can be stored either.
        my $dtid = $self->_get_term_id($term->datatype);
        return unless defined($dtid);
        $sql  = 'SELECT term_id FROM term WHERE type = ? AND value = ? AND datatype_id = ?';
        @bind = ('literal', $term->value, $dtid);
        # Mirror _get_or_create_term_id: only constrain on language when the
        # literal has one. Binding undef to "language = ?" never matches a
        # NULL column in SQL, so plain literals would never be found.
        if (my $lang = $term->language) {
            $sql .= ' AND language = ?';
            push(@bind, $lang);
        }
    }
    return unless defined($sql);

    my $sth = $self->dbh->prepare($sql);
    $sth->execute(@bind);
    my ($tid) = $sth->fetchrow_array;
    return unless defined($tid);
    $self->_t2i_cache->set($key => $tid);
    return $tid;
}
176            
sub _get_or_create_term_id {
    # Returns the ID for $term, inserting a new term row when none exists.
    # Literals first resolve (or create) their datatype IRI. Dies for term
    # kinds other than IRI, blank, or literal.
    my $self = shift;
    my $term = shift;
    my $key  = $term->as_string;
    if (my $id = $self->_t2i_cache->get($key)) {
        return $id;
    }

    my $dbh = $self->dbh;
    my $sql = 'SELECT term_id FROM term WHERE type = ? AND value = ?';
    my ($type, $value, $dtid, $lang);
    if ($term->does('Attean::API::IRI')) {
        ($type, $value) = ('iri', $term->value);
    } elsif ($term->does('Attean::API::Blank')) {
        ($type, $value) = ('blank', $term->value);
    } elsif ($term->does('Attean::API::Literal')) {
        # The datatype IRI is itself a term and needs an ID first.
        $dtid  = $self->_get_or_create_term_id($term->datatype);
        $type  = 'literal';
        $value = $term->value;
        $lang  = $term->language;
        $sql  .= ' AND datatype_id = ?';
    } else {
        die "Failed to get ID for term: " . $term->as_string;
    }

    my @bind = ($type, $value);
    push(@bind, $dtid) if $type eq 'literal';
    # Only constrain on language when the literal has one.
    if ($lang) {
        $sql .= ' AND language = ?';
        push(@bind, $lang);
    }

    my $sth = $dbh->prepare($sql);
    $sth->execute(@bind);
    my ($tid) = $sth->fetchrow_array;
    unless (defined($tid)) {
        # Prepare the INSERT only when the term is actually new.
        my $insert_term_sth = $dbh->prepare('INSERT INTO term (type, value, datatype_id, language) VALUES (?, ?, ?, ?)');
        $insert_term_sth->execute($type, $value, $dtid, $lang);
        $tid = $self->_last_insert_id('term');
    }
    unless (defined($tid)) {
        die "Failed to obtain an ID for inserted term: $key";
    }

    $self->_t2i_cache->set($key => $tid);
    return $tid;
}
231              
232             =item C<< get_quads ( $subject, $predicate, $object, $graph ) >>
233              
234             Returns a stream object of all statements matching the specified subject,
235             predicate and objects. Any of the arguments may be undef to match any value,
236             or an ARRAY reference of terms that are allowable in the respective quad
237             position.
238              
239             =cut
240              
sub get_quads {
    # Returns an Attean::CodeIterator of Attean::Quad objects matching the
    # (subject, predicate, object, graph) pattern. Each position may be
    # undef, a single term, or an ARRAY ref of alternative terms; undef or a
    # variable acts as a wildcard.
    my $self = shift;
    my @patterns = map { ref($_) eq 'ARRAY' ? $_ : [$_] } @_;
    my (@conditions, @values);

    for my $pos (0 .. 3) {
        my $allowed = $patterns[$pos];
        next unless defined($allowed);
        # A lone undef is a wildcard.
        next if scalar(@$allowed) == 1 && !defined($allowed->[0]);
        # Any variable in the list also leaves this position unconstrained.
        next if any { $_->does('Attean::API::Variable') } @$allowed;

        my @ids = map { $self->_get_term_id($_) } @$allowed;
        # None of the requested terms is stored, so nothing can match.
        return Attean::ListIterator->new( values => [], item_type => 'Attean::API::Quad' ) unless @ids;

        my $column = $pos_names[$pos];
        push @conditions, "$column IN (" . join(', ', ('?') x scalar(@ids)) . ")";
        push @values, @ids;
    }

    my $sql = 'SELECT subject, predicate, object, graph FROM quad';
    $sql .= ' WHERE ' . join(' AND ', @conditions) if @conditions;
    my $sth = $self->dbh->prepare($sql);
    $sth->execute(@values);

    my $exhausted = 0;
    my $generator = sub {
        return if $exhausted;
        my $row = $sth->fetchrow_arrayref;
        unless ($row) {
            $exhausted = 1;
            return;
        }
        my @terms = map { $self->_get_term($_) } @$row;
        return Attean::Quad->new(zip @pos_names, @terms);
    };
    return Attean::CodeIterator->new( generator => $generator, item_type => 'Attean::API::Quad' );
}
282              
283             =item C<< count_quads ( $subject, $predicate, $object, $graph ) >>
284              
285             Returns the count of all statements matching the specified subject,
286             predicate and objects. Any of the arguments may be undef to match any value,
287             or an ARRAY reference of terms that are allowable in the respective quad
288             position.
289              
290             =cut
291              
sub count_quads {
    # Returns the number of stored quads matching the pattern; positions are
    # interpreted exactly as in get_quads. Returns 0 early when a
    # constrained position names only terms that are not stored.
    my $self = shift;
    my @patterns = map { ref($_) eq 'ARRAY' ? $_ : [$_] } @_;
    my (@conditions, @values);

    for my $pos (0 .. 3) {
        my $allowed = $patterns[$pos];
        next unless defined($allowed);
        next if scalar(@$allowed) == 1 && !defined($allowed->[0]);
        next if any { $_->does('Attean::API::Variable') } @$allowed;

        my @ids = map { $self->_get_term_id($_) } @$allowed;
        return 0 unless @ids;

        my $column = $pos_names[$pos];
        push @conditions, "$column IN (" . join(', ', ('?') x scalar(@ids)) . ")";
        push @values, @ids;
    }

    my $sql = 'SELECT COUNT(*) FROM quad';
    $sql .= ' WHERE ' . join(' AND ', @conditions) if @conditions;
    my $sth = $self->dbh->prepare($sql);
    $sth->execute(@values);
    my ($total) = $sth->fetchrow_array;
    return $total;
}
320              
321             =item C<< get_graphs >>
322              
323             Returns an iterator over the Attean::API::Term objects comprising
324             the set of graphs of the stored quads.
325              
326             =cut
327              
sub get_graphs {
    # Returns an Attean::CodeIterator over the distinct graph terms used by
    # stored quads.
    my $self = shift;
    # Select distinct graph term IDs and materialise them through _get_term.
    # Each graph then comes back with its stored term type (and uses the
    # ID->term cache), instead of every value being forced into an
    # Attean::IRI. Terms that share a value string but differ in type are no
    # longer merged.
    my $sth = $self->dbh->prepare('SELECT DISTINCT graph FROM quad');
    $sth->execute;
    my $sub = sub {
        my $row = $sth->fetchrow_arrayref;
        return unless ref($row);
        my ($gid) = @$row;
        return $self->_get_term($gid);
    };
    return Attean::CodeIterator->new( generator => $sub, item_type => 'Attean::API::Term' );
}
340              
341             # -----------------------------------------------------------------------------
342              
343             =item C<< add_quad ( $quad ) >>
344              
345             Adds the specified C<$quad> to the underlying model.
346              
347             =cut
348            
sub add_quad {
    # Stores $st, creating term rows as needed. Uses a database-specific
    # "insert unless present" form where one is known; other databases get a
    # plain INSERT.
    my $self = shift;
    my $st   = shift;
    my @ids = map { $self->_get_or_create_term_id($_) } $st->values;
    return if any { not defined($_) } @ids;

    my %insert_sql_for = (
        sqlite     => 'INSERT OR IGNORE INTO quad (subject, predicate, object, graph) VALUES (?, ?, ?, ?)',
        mysql      => 'INSERT IGNORE INTO quad (subject, predicate, object, graph) VALUES (?, ?, ?, ?)',
        postgresql => 'INSERT INTO quad (subject, predicate, object, graph) SELECT ?, ?, ?, ? WHERE NOT EXISTS (SELECT 1 FROM quad WHERE subject = ? AND predicate = ? AND object = ? AND graph = ?)',
    );
    my $db  = $self->database_type;
    my $sql = $insert_sql_for{$db} // 'INSERT INTO quad (subject, predicate, object, graph) VALUES (?, ?, ?, ?)';
    # The PostgreSQL form binds the four IDs twice: once for the SELECT list
    # and once for the NOT EXISTS sub-select.
    my @bind = ($db eq 'postgresql') ? (@ids, @ids) : @ids;

    my $sth = $self->dbh->prepare($sql);
    $sth->execute(@bind);
    return;
}
372              
373             =item C<< remove_quad ( $statement ) >>
374              
375             Removes the specified C<$statement> from the underlying model.
376              
377             =cut
378              
sub remove_quad {
    # Deletes the quad $st. Does nothing when any of its four terms is not
    # stored, since such a quad cannot be present.
    my ($self, $st) = @_;
    my @ids = map { $self->_get_term_id($_) } $st->values;
    return if @ids != 4;
    return unless all { defined($_) } @ids;
    $self->dbh
        ->prepare('DELETE FROM quad WHERE subject = ? AND predicate = ? AND object = ? AND graph = ?')
        ->execute(@ids);
    return;
}
393              
394             =item C<< create_graph( $graph ) >>
395              
396             This is a no-op function for the memory quad-store.
397              
398             =cut
399              
sub create_graph {
    # Nothing to do: graphs are only visible through the quads that use them
    # (see get_graphs), so there is no separate graph record to create.
    return;
}
403              
404             =item C<< drop_graph( $graph ) >>
405              
406             Removes all quads with the given C<< $graph >>.
407              
408             =cut
409              
sub drop_graph {
    # Dropping a graph is the same operation as clearing it.
    my ($self, @graph_args) = @_;
    return $self->clear_graph(@graph_args);
}
414              
415             =item C<< clear_graph( $graph ) >>
416              
417             Removes all quads with the given C<< $graph >>.
418              
419             =cut
420              
sub clear_graph {
    # Deletes every quad whose graph is $graph. An unstored graph term cannot
    # have quads, so nothing is deleted in that case.
    my ($self, $graph) = @_;
    my $graph_id = $self->_get_term_id($graph);
    return if !defined($graph_id);
    $self->dbh->prepare('DELETE FROM quad WHERE graph = ?')->execute($graph_id);
    return;
}
430            
431             =item C<< begin_transaction >>
432              
433             Begin a database transaction.
434              
435             =cut
436              
sub begin_transaction {
    # Starts a transaction on the underlying handle; returns begin_work's result.
    my ($self) = @_;
    return $self->dbh->begin_work;
}
442            
443             =item C<< abort_transaction >>
444              
445             Rollback the current database transaction.
446              
447             =cut
448              
sub abort_transaction {
    # Rolls back the current transaction; returns rollback's result.
    my ($self) = @_;
    return $self->dbh->rollback;
}
454            
455             =item C<< end_transaction >>
456              
457             Commit the current database transaction.
458              
459             =cut
460              
sub end_transaction {
    # Commits the current transaction; returns commit's result.
    my ($self) = @_;
    return $self->dbh->commit;
}
466            
467             =item C<< begin_bulk_updates >>
468              
469             Begin a database transaction.
470              
471             =cut
472              
sub begin_bulk_updates {
    # Bulk updates are wrapped in a single database transaction.
    my ($self) = @_;
    return $self->dbh->begin_work;
}
477            
478             =item C<< end_bulk_updates >>
479              
480             Commit the current database transaction.
481              
482             =cut
483              
sub end_bulk_updates {
    # Commits the transaction opened by begin_bulk_updates.
    my ($self) = @_;
    return $self->dbh->commit;
}
488            
489             =item C<< database_type >>
490              
491             Returns the database type name as a string (e.g. 'mysql', 'sqlite', or 'postgresql').
492              
493             =cut
494              
sub database_type {
    # Returns the DBMS name reported by the driver, lower-cased. Other
    # methods in this class compare it against 'sqlite', 'mysql' and
    # 'postgresql'.
    my ($self) = @_;
    my $dbms_name = $self->dbh->get_info($GetInfoType{SQL_DBMS_NAME});
    return lc($dbms_name);
}
502            
503             =item C<< initialize_version >>
504              
505             Insert data into the attean_version table.
506              
507             =cut
508              
sub initialize_version {
    # Replaces the contents of attean_version with a single row recording
    # the Attean and store versions; returns the INSERT's do() result.
    my ($self) = @_;
    my $dbh = $self->dbh;
    $dbh->do('DELETE FROM attean_version');
    return $dbh->do(
        'INSERT INTO attean_version (attean_version, store_version) VALUES (?, ?);',
        undef,
        $Attean::VERSION,
        $AtteanX::Store::DBI::VERSION,
    );
}
516              
517             =item C<< create_schema_file >>
518              
519             Returns the path to the file containing the database DDL for quadstore creation
520             for the current database type if available, undef otherwise.
521              
522             =cut
523              
sub create_schema_file {
    # Path of "<type>-create.sql" under <share>/database-schema, or empty when
    # that file is not readable. The share dir comes from ATTEAN_SHAREDIR,
    # then the installed dist dir, then a local 'share' directory.
    my ($self) = @_;
    my $db_type   = $self->database_type;
    my $share_dir = $ENV{ATTEAN_SHAREDIR} || eval { dist_dir('AtteanX-Store-DBI') } || 'share';
    my $path = File::Spec->catfile($share_dir, 'database-schema', sprintf('%s-create.sql', $db_type));
    return unless -r $path;
    return $path;
}
534              
535             =item C<< drop_schema_file >>
536              
537             Returns the path to the file containing the database DDL for quadstore deletion
538             for the current database type if available, undef otherwise.
539              
540             =cut
541              
sub drop_schema_file {
    # Path of "<type>-drop.sql" under <share>/database-schema, or empty when
    # that file is not readable. Uses the same share-dir lookup as
    # create_schema_file.
    my ($self) = @_;
    my $db_type   = $self->database_type;
    my $share_dir = $ENV{ATTEAN_SHAREDIR} || eval { dist_dir('AtteanX-Store-DBI') } || 'share';
    my $path = File::Spec->catfile($share_dir, 'database-schema', sprintf('%s-drop.sql', $db_type));
    return unless -r $path;
    return $path;
}
552            
553             =item C<< available_database_types >>
554              
555             Returns the names of the database types for which the system has schemas
556             available to create and drop quadstore tables.
557              
558             =cut
559              
sub available_database_types {
    # Returns the (sorted) database type names for which a
    # "<type>-create.sql" schema exists under <share>/database-schema.
    # Returns an empty list if that directory cannot be read.
    my $self = shift;
    my $dir = $ENV{ATTEAN_SHAREDIR} || eval { dist_dir('AtteanX-Store-DBI') } || 'share';
    my $schema_dir = File::Spec->catdir($dir, 'database-schema');

    # Read the directory directly rather than via glob(). glob() splits its
    # pattern on whitespace, so a share path containing a space would
    # silently report no types.
    opendir(my $dh, $schema_dir) or return;
    my @entries = readdir($dh);
    closedir($dh);

    # Match the whole filename, with the dot escaped.
    my @types  = map { /\A(\w+)-create\.sql\z/ ? $1 : () } @entries;
    my @sorted = sort { $a cmp $b } @types;
    return @sorted;
}
568            
569             =item C<< dbi_connect_args ( $type, %args ) >>
570              
571             =item C<< dbi_connect_args ( %args ) >>
572              
573             Returns a quad C<< $dsn, $user, $password, \%connect_args >> suitable for
574             passing to C<< DBI->connect >> to obtain a database handle to be used in
575             constructing a C<< AtteanX::Store::DBI >> quadstore.
576              
577             C<< %args >> must contain a value for the C<< database >> key. It may also
578             contain values for the optional keys: C<< user >>, C<< password >>,
579             C<< host >>, and C<< port >>.
580              
581             If invoked as a class method, the C<< $type >> parameter is required, and must
582             be one of the database types returned by C<< available_database_types >>.
583              
584             If invoked as an object method, the C<< $type >> parameter must not be
585             included; this information will be obtained directly from the
586             C<< AtteanX::Store::DBI >> object.
587              
588             =cut
589              
sub dbi_connect_args {
    # Builds ($dsn, $user, $password, \%attrs) for DBI->connect. As an
    # object method the type comes from the store's own handle; as a class
    # method it is the first argument. Unknown types yield an undef DSN.
    my $self = shift;
    my $type = blessed($self) ? $self->database_type : shift;
    my %args = @_;
    my ($database, $user, $password, $host, $port) = @args{qw(database user password host port)};

    my %connect_args = (RaiseError => 1);
    my $dsn;
    if ($type eq 'mysql' or $type eq 'postgresql') {
        $dsn = ($type eq 'mysql') ? "DBI:mysql:database=${database}" : "DBI:Pg:dbname=${database}";
        $dsn .= ";host=$host" if defined($host);
        $dsn .= ";port=$port" if defined($port);
        $connect_args{mysql_enable_utf8} = 1 if $type eq 'mysql';
    } elsif ($type eq 'sqlite') {
        $dsn = "DBI:SQLite:dbname=${database}";
        $connect_args{sqlite_unicode} = 1;
    }

    return ($dsn, $user, $password, \%connect_args);
}
627              
628             =item C<< plans_for_algebra( $algebra, $model, $active_graphs, $default_graphs ) >>
629              
630             For BGP algebras, returns a DBI-specific L<Attean::API::Plan> object, otherwise
631             returns undef.
632              
633             =cut
634              
635             sub plans_for_algebra {
636 36     36 1 359975 my $self = shift;
637 36         61 my $algebra = shift;
638 36         56 my $model = shift;
639 36         65 my $active_graphs = shift;
640 36         54 my $default_graphs = shift;
641 36 50       100 return unless ($algebra);
642            
643 36         68 my %args = @_;
644 36         155 my $counter = $args{dbi_filter_counter}++;
645            
646 36 100 66     356 if ($algebra->isa('Attean::Algebra::Filter')) {
    100          
647 9         43 my $e = $algebra->expression;
648 9 50       52 if ($e->isa('Attean::FunctionExpression')) {
649 9 100 33     96 if ($e->operator =~ m/IS(IRI|LITERAL|BLANK)/i) {
    50          
650 4         21 my $type = lc($1);
651 4         11 my ($operand) = @{ $e->children };
  4         18  
652 4 50 33     39 if ($operand->isa('Attean::ValueExpression') and $operand->value->does('Attean::API::Variable')) {
653 4         79 my $var = $operand->value;
654 4 50       20 if (my ($plan) = $self->plans_for_algebra($algebra->child, $model, $active_graphs, $default_graphs, %args)) {
655 4 50       1893 if ($plan->isa('AtteanX::Store::DBI::Plan')) {
656 4 50       28 if (exists $plan->variables->{ $var->value }) {
657 4         9 my ($table, $col) = @{ $plan->variables->{ $var->value } };
  4         20  
658 4         17 my $ref = join('.', map { $self->dbh->quote_identifier($_) } ($table, $col));
  8         256  
659 4         113 my $typecol = $self->dbh->quote_identifier('type');
660 4         96 push(@{ $plan->where }, "$ref IN (SELECT term_id FROM term WHERE ${typecol} = ?)");
  4         30  
661 4         9 push(@{ $plan->bindings }, $type);
  4         14  
662 4         28 return $plan;
663             }
664             }
665             }
666             }
667             } elsif ($e->operator eq 'STRSTARTS' or $e->operator eq 'CONTAINS') {
668 5         13 my ($varexpr, $pat) = @{ $e->children };
  5         22  
669 5 50 66     45 if ($varexpr->isa('Attean::ValueExpression') and $varexpr->value->does('Attean::API::Variable') and $pat->isa('Attean::ValueExpression') and $pat->value->does('Attean::API::Literal')) {
      66        
      33        
670 4 50       189 if (my ($plan) = $self->plans_for_algebra($algebra->child, $model, $active_graphs, $default_graphs, %args)) {
671 4 50       10706 if ($plan->isa('AtteanX::Store::DBI::Plan')) {
672 4         18 my $var = $varexpr->value;
673 4         14 my $varname = $var->value;
674 4 50       25 if (exists $plan->variables->{ $var->value }) {
675 4         7 my ($table, $col) = @{ $plan->variables->{ $var->value } };
  4         18  
676 4         12 my $literal = $pat->value;
677 4         10 my $ref = join('.', map { $self->dbh->quote_identifier($_) } ($table, $col));
  8         316  
678 4         112 my $typecol = $self->dbh->quote_identifier('type');
679 4         95 my $termtable = "tf$counter";
680 4         7 push(@{ $plan->tables }, ['term', $termtable]);
  4         22  
681              
682 4         21 my $db = $self->database_type;
683 4 50 33     41 return unless ($db eq 'mysql' or $db eq 'postgresql' or $db eq 'sqlite');
      33        
684            
685 4         10 push(@{ $plan->where }, "$ref = $termtable.term_id");
  4         21  
686            
687 4         10 push(@{ $plan->where }, "$termtable.$typecol = ?"); # TODO: Remove this (and the bindings below) if $varexpr is STR(?var) instead of just ?var.
  4         15  
688            
689 4 50       19 my $op = ($e->operator eq 'STRSTARTS') ? '=' : '>=';
690 4 50       23 if ($db eq 'mysql') {
    50          
    50          
691 0         0 push(@{ $plan->where }, "LOCATE(?, $termtable.value) ${op} ?");
  0         0  
692 0         0 push(@{ $plan->bindings }, 'literal');
  0         0  
693 0         0 push(@{ $plan->bindings }, $literal->value);
  0         0  
694 0         0 push(@{ $plan->bindings }, 1);
  0         0  
695             } elsif ($db eq 'postgresql') {
696 0         0 push(@{ $plan->where }, "STRPOS($termtable.value, ?) ${op} ?");
  0         0  
697 0         0 push(@{ $plan->bindings }, 'literal');
  0         0  
698 0         0 push(@{ $plan->bindings }, $literal->value);
  0         0  
699 0         0 push(@{ $plan->bindings }, 1);
  0         0  
700             } elsif ($db eq 'sqlite') {
701 4         8 push(@{ $plan->where }, "INSTR($termtable.value, ?) ${op} 1");
  4         18  
702 4         8 push(@{ $plan->bindings }, 'literal');
  4         15  
703 4         6 push(@{ $plan->bindings }, $literal->value);
  4         21  
704             }
705            
706 4 100       26 if (my $lang = $literal->language) {
707 2         4 push(@{ $plan->where }, "$termtable.language = ?");
  2         10  
708 2         5 push(@{ $plan->bindings }, $lang);
  2         7  
709             } else {
710 2         51 my $xs = Attean::IRI->new( value => 'http://www.w3.org/2001/XMLSchema#string' );
711 2         675 my $la = Attean::IRI->new( value => 'http://www.w3.org/1999/02/22-rdf-syntax-ns#langString' );
712 2         792 my $xid = $self->_get_term_id($xs);
713 2         8 my $lid = $self->_get_term_id($la);
714 2         6 push(@{ $plan->where }, "$termtable.datatype_id IN (?, ?)");
  2         12  
715 2         6 push(@{ $plan->bindings }, $xid, $lid);
  2         14  
716             }
717 4         28 return $plan;
718             }
719             }
720             }
721             }
722             }
723             }
724 9         74 } elsif ($algebra->isa('Attean::Algebra::BGP') and scalar(@{ $algebra->triples }) > 0) {
725 9         38 my @vars = $algebra->in_scope_variables;
726            
727 9         2520 my @triples = @{ $algebra->triples };
  9         39  
728 9         55 my @select;
729             my @where_joins;
730 9         0 my %seen_vars;
731 9         0 my %source_table_for_var;
732 9         0 my %blanks;
733 9         24 my $tcounter = 0;
734 9         16 my $bcounter = 0;
735 9         17 my @tables;
736            
737             my %rename_mapping;
738             my $rename_proj = sub {
739 36     36   67 my $name = shift;
740 36 100       148 if ($name =~ /[-._]|\W/) {
741 9         35 my $old = $name;
742 9         69 $name =~ s/_/__/g;
743 9         53 $name =~ s/([-.]|\W)/_d/g;
744 9         38 $name =~ s/([-.]|\W)/'_x' . sprintf('%02x', ord($1))/e;
  0         0  
745 9         33 $rename_mapping{$old} = $name;
746             }
747            
748 36         81 return $name;
749 9         66 };
750            
751 9 50       52 Carp::confess Dumper($active_graphs) unless (ref($active_graphs) eq 'ARRAY');
752            
753 9         20 my @graph_ids = map { $self->_get_term_id($_) } @{ $active_graphs };
  9         49  
  9         24  
754 9 50   9   91 if (any { not defined($_) } @graph_ids) {
  9         46  
755 0         0 return;
756             }
757            
758 9         35 my @bind;
759 9         205 my $graph = Attean::Variable->new(value => '___g');
760 9         968 $seen_vars{ $graph->value }++;
761 9         100 my $graph_values = sprintf('(%s)', join(', ', ('?') x scalar(@graph_ids)));
762 9         32 push(@bind, @graph_ids);
763 9         38 my @where = ("t0.graph IN $graph_values");
764            
765 9         30 foreach my $t (@triples) {
766 9         28 my $table = 't' . $tcounter++;
767 9         40 push(@tables, ['quad', $table]);
768              
769 9         25 my @vars;
770 9         61 my $q = $t->as_quadpattern($graph);
771 9         3557 my @nodes = $q->values;
772 9         448 foreach my $i (0 .. $#nodes) {
773 36         72 my $node = $nodes[$i];
774 36         88 my $name = $pos_names[$i];
775 36 50       109 if ($node->does('Attean::API::Variable')) {
    0          
776 36         518 my $var = $node;
777 36         103 push(@vars, [$var, $name]);
778             } elsif ($node->does('Attean::API::Blank')) {
779 0         0 my $id = $node->value;
780 0 0       0 unless (exists $blanks{$id}) {
781 0         0 my $bname = sprintf('.%s_%d', 'blank', $bcounter++);
782 0         0 $blanks{$id} = Attean::Variable->new(value => $bname);
783             }
784 0         0 my $var = $blanks{$id};
785 0         0 push(@vars, [$var, $name]);
786 0         0 $seen_vars{ $var->value }++;
787             } else {
788 0         0 my $id = $self->_get_term_id($node);
789 0 0       0 return unless defined($id);
790 0         0 push(@where, sprintf('%s.%s = ?', $table, $name));
791 0         0 push(@bind, $id);
792             }
793             }
794            
795 9         35 foreach my $vdata (@vars) {
796 36         110 my ($var, $name) = @$vdata;
797 36         81 my $var_name = $rename_proj->( $var->value );
798 36 100       184 push(@select, [$table, $name, $var_name]) unless ($seen_vars{$var->value}++);
799 36 50       103 if (my $tt = $source_table_for_var{ $var->value }) {
800 0         0 push(@where_joins, ['=', $tt, [$table, $name]]);
801             } else {
802 36         145 $source_table_for_var{ $var->value } = [$table, $name];
803             }
804             }
805             }
806            
807 9         33 foreach my $w (@where_joins) {
808 0         0 my ($op, $a, $b) = @$w;
809 0         0 my ($as, $bs) = map { sprintf('%s.%s', @$_) } ($a, $b);
  0         0  
810 0         0 push(@where, join(' ', $as, $op, $bs));
811             }
812            
813 9         288 return AtteanX::Store::DBI::Plan->new(
814             store => $self,
815             select => \@select,
816             where => \@where,
817             tables => \@tables,
818             in_scope_variables => [@vars],
819             rename_mapping => \%rename_mapping,
820             bindings => \@bind,
821             variables => \%source_table_for_var,
822             );
823             }
824              
825 19         67 return;
826             }
827              
828             =item C<< cost_for_plan( $plan ) >>
829              
830             Returns the estimated cost for a DBI-specific query plan, undef otherwise.
831              
832             =cut
833              
# cost_for_plan( $plan )
# Cost model hook: plans built by this store get a flat cost of 1; any
# other plan yields nothing (undef / empty list) so another cost model
# can be consulted instead.
sub cost_for_plan {
	my ($self, $plan) = @_;
	return unless $plan->isa('AtteanX::Store::DBI::Plan');
	return 1; # TODO: actually estimate cost here
}
842              
843             }
844              
package AtteanX::Store::DBI::Plan 0.012 {
	use Moo;
	use Type::Tiny::Role;
	use Types::Standard qw(HashRef ArrayRef InstanceOf Str);
	use namespace::clean;

	# A query plan evaluated as a single SQL SELECT against the store's
	# tables, optionally narrowed further by pre-bound variable values
	# (see substitute_impl).

	# The store supplying the database handle and the term <-> ID mapping.
	has store => (is => 'ro', isa => InstanceOf['AtteanX::Store::DBI'], required => 1);

	# Variable name => column alias used for it in the SELECT list. Names
	# without an entry are used verbatim as their column alias.
	has rename_mapping => (is => 'ro', isa => HashRef[Str], default => sub { +{} });

	# Variable name => [table-alias, column] locating the variable's term ID.
	has variables => (is => 'ro', isa => HashRef, required => 1);

	# Values for the '?' placeholders appearing in `where`, in order.
	has bindings => (is => 'ro', isa => ArrayRef, required => 1);

	# [table-alias, column, output-alias] entries forming the SELECT list.
	has select => (is => 'ro', isa => ArrayRef, required => 1);

	# SQL boolean expressions, each parenthesized and ANDed into WHERE.
	has where => (is => 'ro', isa => ArrayRef, required => 1);

	# [table-name, alias] pairs forming the FROM clause.
	has tables => (is => 'ro', isa => ArrayRef[ArrayRef[Str]], required => 1);

	with 'Attean::API::BindingSubstitutionPlan', 'Attean::API::NullaryQueryTree';

	# Human-readable rendering of the plan: its SQL and bound values.
	sub plan_as_string {
		my $self = shift;
		my ($sql, @bind) = $self->sql();
		return sprintf('DBI BGP { %s ← (%s) }', $sql, join(', ', @bind));
	}

	# sql( [ $bind ] )
	#
	# Returns ($sql, @bind_values) for this plan. If $bind (an object with
	# `variables` and `value($var)` methods) is given, every bound variable
	# that this plan references adds an equality constraint on its term ID.
	# Bound variables the plan does not reference are ignored.
	#
	# Returns an empty list when a referenced bound term has no ID in the
	# store: no row can match it, so there is no query to run.
	sub sql {
		my $self = shift;
		my $bind = shift;

		my $store = $self->store;
		my $dbh = $store->dbh;
		my @bind = @{ $self->bindings };
		my @where = @{ $self->where };
		if ($bind) {
			foreach my $var ($bind->variables) {
				# Check relevance first: an unrelated variable must not be
				# looked up, nor cause the whole plan to be abandoned.
				my $cdata = $self->variables->{ $var } or next;
				my $id = $store->_get_term_id($bind->value($var));
				return unless defined($id);
				my ($table, $col) = @$cdata;
				push(@where, sprintf("%s.%s = ?", $table, $col));
				push(@bind, $id);
			}
		}

		my @select = map { sprintf("%s.%s AS %s", map { $dbh->quote_identifier( $_ ) } @$_) } @{ $self->select };
		unless (scalar(@select)) {
			# SELECT needs at least one expression even with nothing to project.
			push(@select, '1');
		}

		my @sql;
		push(@sql, 'SELECT');
		push(@sql, join(', ', @select));

		push(@sql, 'FROM');
		push(@sql, join(', ', map { join(' ', @$_) } @{ $self->tables }));

		if (scalar(@where)) {
			push(@sql, 'WHERE');
			push(@sql, join(' AND ', map { "($_)" } @where));
		}

		my $sql = join(" ", @sql);
		return ($sql, @bind);
	}

	# substitute_impl( $model, [ $bind ] )
	#
	# Returns a code reference that, each time it is called, executes the
	# plan's SQL and returns an Attean::CodeIterator of Attean::Result
	# objects, one per row, binding each in-scope variable to its term.
	# Dies if statement execution reports failure.
	sub substitute_impl {
		my $self = shift;
		my $model = shift;
		my ($sql, @bind) = $self->sql(@_);
		my $store = $self->store;
		my $dbh = $store->dbh;
		my $vars = $self->in_scope_variables;

		unless (defined($sql)) {
			# sql() found a bound term that the store has never seen, so the
			# result is empty. Don't prepare or execute an undefined statement.
			return sub {
				return Attean::CodeIterator->new( generator => sub { return }, item_type => 'Attean::API::Result', variables => $vars );
			};
		}

		# Resolve each variable's column alias once rather than once per row.
		my $rename = $self->rename_mapping;
		my %column_for = map { $_ => ($rename->{$_} // $_) } @$vars;

		my $sth = $dbh->prepare($sql);
		return sub {
			my $rv = $sth->execute(@bind);
			unless ($rv) {
				die '*** SQL error: ' . ($sth->errstr // 'unknown error');
			}
			my $sub = sub {
				my $row = $sth->fetchrow_hashref;
				return unless ($row);	# end of result set

				my %bindings;
				foreach my $k (@$vars) {
					my $term = $store->_get_term($row->{ $column_for{$k} });
					$bindings{$k} = $term;
				}
				return Attean::Result->new( bindings => \%bindings );
			};
			return Attean::CodeIterator->new( generator => $sub, item_type => 'Attean::API::Result', variables => $vars );
		};
	}

}
956              
957             1;
958              
959             __END__
960              
961             =back
962              
963             =head1 BUGS
964              
965             Please report any bugs or feature requests through the GitHub web interface
966             at L<https://github.com/kasei/perlrdf2/issues>.
967              
968             =head1 AUTHOR
969              
970             Gregory Todd Williams C<< <gwilliams@cpan.org> >>
971              
972             =head1 COPYRIGHT
973              
974             Copyright (c) 2014--2020 Gregory Todd Williams. This
975             program is free software; you can redistribute it and/or modify it under
976             the same terms as Perl itself.
977              
978             =cut