1             =head1 NAME
3             AtteanX::Store::DBI - Database quad-store
5             =head1 VERSION
7             This document describes AtteanX::Store::DBI version 0.002
9             =head1 SYNOPSIS
11             use Attean;
12             my $store = Attean->get_store('DBI')->new( dbh => $dbh );
14             =head1 DESCRIPTION
16             AtteanX::Store::DBI provides a quad-store backed by a relational database.
18             =head1 ATTRIBUTES
20             =over 4
22             =item C<< dbh >>
24             =back
26             =cut
28 3     3   7760537 use utf8;
  3         25  
  3         33  
29 3     3   124 use v5.14;
  3         12  
30 3     3   16 use warnings;
  3         10  
  3         180  
32             package AtteanX::Store::DBI {
33             our $VERSION = '0.002';
34 3     3   19 use Moo;
  3         7  
  3         22  
35 3     3   9554 use DBI;
  3         58101  
  3         202  
36 3     3   26 use Attean 0.013;
  3         66  
  3         25  
37 3     3   2340 use DBI::Const::GetInfoType;
  3         21340  
  3         392  
38 3     3   27 use Type::Tiny::Role;
  3         7  
  3         126  
39 3     3   16 use Types::Standard qw(Int Str ArrayRef HashRef ConsumerOf InstanceOf);
  3         7  
  3         33  
40 3     3   3719 use Encode;
  3         7  
  3         246  
41 3     3   1531 use Cache::LRU;
  3         2074  
  3         107  
42 3     3   22 use Set::Scalar;
  3         7  
  3         129  
43 3     3   1230 use DBIx::MultiStatementDo;
  3         2253620  
  3         161  
44 3     3   30 use List::MoreUtils qw(zip);
  3         5  
  3         35  
45 3     3   2587 use List::Util qw(any all first);
  3         8  
  3         271  
46 3     3   25 use File::ShareDir qw(dist_dir dist_file);
  3         35  
  3         164  
47 3     3   2343 use File::Slurp;
  3         31194  
  3         212  
48 3     3   23 use Scalar::Util qw(refaddr reftype blessed);
  3         7  
  3         157  
49 3     3   25 use namespace::clean;
  3         6  
  3         45  
51             with 'Attean::API::MutableQuadStore';
52             with 'Attean::API::BulkUpdatableStore';
53             with 'Attean::API::QuadStore';
54             with 'Attean::API::CostPlanner';
56             my @pos_names = Attean::API::Quad->variables;
58             =head1 ROLES
60             This class consumes L<Attean::API::QuadStore>,
61             L<Attean::API::MutableQuadStore>, L<Attean::API::BulkUpdatableStore>, and L<Attean::API::CostPlanner>.
63             =head1 METHODS
65             =over 4
67             =item C<< new ( dbh => $dbh ) >>
69             Returns a new quad-store object backed by the database referenced by the
70             supplied database handle.
72             =cut
# Database handle backing the store; required, and must be a DBI::db instance.
has dbh => (
    is       => 'ro',
    isa      => InstanceOf['DBI::db'],
    required => 1,
);

# Per-object LRU caches (256 entries each): term ID => term object, and
# term string => term ID.
has _i2t_cache => (
    is      => 'ro',
    default => sub { Cache::LRU->new( size => 256 ) },
);
has _t2i_cache => (
    is      => 'ro',
    default => sub { Cache::LRU->new( size => 256 ) },
);
78             =item C<< init() >>
80             Create the tables and indexes required for using the database as a quadstore.
82             =cut
# Run the schema-creation SQL for the current database type against the
# store's database handle. Dies if create_schema_file finds no schema file.
sub init {
    my ($self) = @_;
    my $runner = DBIx::MultiStatementDo->new( dbh => $self->dbh );
    my $schema_path = $self->create_schema_file
        or die 'No schema files available for store initialization';
    # Scalar assignment so read_file returns the whole file as one string.
    my $ddl = read_file($schema_path);
    $runner->do($ddl);
}
92             =item C<< temporary_store() >>
94             Returns a temporary (in-memory, SQLite) store.
96             =cut
# Class method: build an initialized store on a fresh in-memory SQLite
# database.
#
# The handle is opened with the same attributes that dbi_connect_args uses
# for SQLite (RaiseError and sqlite_unicode), so a temporary store behaves
# like a store opened through the documented connection path.
# Dies if the connection cannot be established.
sub temporary_store {
    my $class = shift;
    my %connect_args = (
        RaiseError     => 1,
        sqlite_unicode => 1,
    );
    my $dbh = DBI->connect('dbi:SQLite:dbname=:memory:', '', '', \%connect_args)
        or die 'Failed to connect to in-memory SQLite database: ' . ($DBI::errstr // 'unknown error');
    my $store = $class->new(dbh => $dbh);
    $store->init();
    return $store;
}
# Return the ID generated by the most recent INSERT into $table, as reported
# by the database handle's last_insert_id (catalog, schema and field are
# passed as undef).
sub _last_insert_id {
    my ($self, $table) = @_;
    return $self->dbh->last_insert_id(undef, undef, $table, undef);
}
# Load the term object stored under the numeric term ID $id.
#
# Checks the ID-to-term cache first; otherwise reads the term row (joined to
# its datatype term) and builds an Attean::IRI, Attean::Blank or
# Attean::Literal according to the row's type column. Successfully loaded
# terms are cached.
#
# Dies with a stack trace if no row exists for $id or the row has an
# unrecognized type.
sub _get_term {
    my ($self, $id) = @_;
    if (my $cached = $self->_i2t_cache->get($id)) {
        return $cached;
    }

    require Carp;
    my $id_desc = defined($id) ? $id : 'undef';

    my $sth = $self->dbh->prepare('SELECT term.type, term.value, dtterm.value AS datatype, term.language FROM term LEFT JOIN term dtterm ON (term.datatype_id = dtterm.term_id) WHERE term.term_id = ?');
    $sth->execute($id);
    my $row = $sth->fetchrow_hashref;
    unless (ref($row)) {
        # No such term: report it directly instead of dereferencing a
        # missing row.
        Carp::confess("Failed to load term values for bad ID $id_desc");
    }

    my $type  = $row->{type} // '';
    my $value = $row->{value};
    my $term;
    if ($type eq 'iri') {
        $term = Attean::IRI->new( value => $value );
    } elsif ($type eq 'blank') {
        $term = Attean::Blank->new( value => $value );
    } elsif ($type eq 'literal') {
        my %args = (
            value    => $value,
            datatype => Attean::IRI->new( value => $row->{datatype} ),
        );
        if (my $lang = $row->{language}) {
            $args{language} = $lang;
        }
        $term = Attean::Literal->new( %args );
    }

    if ($term) {
        $self->_i2t_cache->set($id => $term);
        return $term;
    }
    Carp::confess("Failed to load term values for bad ID $id_desc");
}
# Look up the numeric term ID for $term without modifying the database.
#
# Returns the ID (and caches it) when the term is already stored; returns
# nothing (empty list / undef) when it is not, or when $term is not an IRI,
# blank node or literal.
#
# For literals the datatype is resolved with this same read-only lookup: if
# the datatype IRI is not stored, no literal using it can be stored either.
# The language is only added to the query when the literal has one, because
# "language = NULL" never matches a row; this mirrors the lookup performed by
# _get_or_create_term_id.
sub _get_term_id {
    my ($self, $term) = @_;
    my $cache_key = $term->as_string;
    if (my $id = $self->_t2i_cache->get($cache_key)) {
        return $id;
    }

    my $sql = 'SELECT term_id FROM term WHERE type = ? AND value = ?';
    my @bind;
    if ($term->does('Attean::API::IRI')) {
        @bind = ('iri', $term->value);
    } elsif ($term->does('Attean::API::Blank')) {
        @bind = ('blank', $term->value);
    } elsif ($term->does('Attean::API::Literal')) {
        my $dtid = $self->_get_term_id($term->datatype);
        return unless defined($dtid);
        $sql .= ' AND datatype_id = ?';
        @bind = ('literal', $term->value, $dtid);
        if (my $lang = $term->language) {
            $sql .= ' AND language = ?';
            push(@bind, $lang);
        }
    } else {
        return;
    }

    my $sth = $self->dbh->prepare($sql);
    $sth->execute(@bind);
    my ($tid) = $sth->fetchrow_array;
    return unless defined($tid);

    $self->_t2i_cache->set($cache_key => $tid);
    return $tid;
}
# Return the numeric term ID for $term, inserting a new row into the term
# table when the term is not stored yet. The resulting ID is cached under
# the term's string form.
#
# IRIs and blank nodes are matched on (type, value). Literals are matched on
# (type, value, datatype_id), plus language when the literal has one; the
# datatype's own ID is obtained (or created) recursively.
# Dies if $term is not an IRI, blank node or literal.
sub _get_or_create_term_id {
    my ($self, $term) = @_;
    my $cached = $self->_t2i_cache->get($term->as_string);
    return $cached if $cached;

    my $dbh = $self->dbh;
    my $insert_sth = $dbh->prepare('INSERT INTO term (type, value, datatype_id, language) VALUES (?, ?, ?, ?)');

    # Classify the term; only literals carry a datatype ID and a language.
    my ($kind, $dtid, $lang);
    if ($term->does('Attean::API::IRI')) {
        $kind = 'iri';
    } elsif ($term->does('Attean::API::Blank')) {
        $kind = 'blank';
    } elsif ($term->does('Attean::API::Literal')) {
        $kind = 'literal';
        $dtid = $self->_get_or_create_term_id($term->datatype);
    } else {
        die "Failed to get ID for term: " . $term->as_string;
    }

    my $value      = $term->value;
    my $lookup_sql = 'SELECT term_id FROM term WHERE type = ? AND value = ?';
    my @lookup     = ($kind, $value);
    if ($kind eq 'literal') {
        $lookup_sql .= ' AND datatype_id = ?';
        push(@lookup, $dtid);
        $lang = $term->language;
        if ($lang) {
            $lookup_sql .= ' AND language = ?';
            push(@lookup, $lang);
        }
    }

    my $lookup_sth = $dbh->prepare($lookup_sql);
    $lookup_sth->execute(@lookup);
    my ($tid) = $lookup_sth->fetchrow_array;
    unless (defined($tid)) {
        # Not stored yet: insert it and pick up the generated ID.
        $insert_sth->execute($kind, $value, $dtid, $lang);
        $tid = $self->_last_insert_id('term');
    }

    if (defined($tid)) {
        $self->_t2i_cache->set($term->as_string => $tid);
        return $tid;
    }
    die;
}
232             =item C<< get_quads ( $subject, $predicate, $object, $graph ) >>
234             Returns a stream object of all statements matching the specified subject,
235             predicate and objects. Any of the arguments may be undef to match any value,
236             or an ARRAY reference of terms that are allowable in the respective quad
237             position.
239             =cut
# Return an iterator of Attean::Quad objects matching the given
# (subject, predicate, object, graph) pattern.
#
# Each position may be undef (match anything), a single term, or an ARRAY
# reference of allowed terms. A position containing a variable is left
# unconstrained. If none of the terms given for a position are stored, no
# quad can match and an empty iterator is returned.
sub get_quads {
    my ($self, @pattern) = @_;
    my @nodes = map { ref($_) eq 'ARRAY' ? $_ : [$_] } @pattern;

    my (@where, @bind);
    for my $pos (0 .. 3) {
        my $candidates = $nodes[$pos];
        next unless defined($candidates);
        next if (scalar(@$candidates) == 1 and not defined($candidates->[0]));
        next if (any { $_->does('Attean::API::Variable') } @$candidates);

        # _get_term_id returns an empty list for terms that are not stored.
        my @ids = map { $self->_get_term_id($_) } @$candidates;
        unless (scalar(@ids)) {
            return Attean::ListIterator->new( values => [], item_type => 'Attean::API::Quad' );
        }
        my $column       = $pos_names[$pos];
        my $placeholders = join(', ', ('?') x scalar(@ids));
        push(@where, "$column IN (" . $placeholders . ")");
        push(@bind, @ids);
    }

    my $sql = 'SELECT subject, predicate, object, graph FROM quad';
    $sql .= ' WHERE ' . join(' AND ', @where) if (scalar(@where));
    my $sth = $self->dbh->prepare($sql);
    $sth->execute(@bind);

    # Once the statement is exhausted, never fetch from it again.
    my $exhausted = 0;
    my $generator = sub {
        return if ($exhausted);
        my $row = $sth->fetchrow_arrayref;
        unless ($row) {
            $exhausted = 1;
            return;
        }
        my @terms = map { $self->_get_term($_) } @$row;
        return Attean::Quad->new(zip @pos_names, @terms);
    };
    return Attean::CodeIterator->new( generator => $generator, item_type => 'Attean::API::Quad' );
}
283             =item C<< count_quads ( $subject, $predicate, $object, $graph ) >>
285             Returns the count of all statements matching the specified subject,
286             predicate and objects. Any of the arguments may be undef to match any value,
287             or an ARRAY reference of terms that are allowable in the respective quad
288             position.
290             =cut
# Return the number of stored quads matching the given
# (subject, predicate, object, graph) pattern.
#
# Each position may be undef (match anything), a single term, or an ARRAY
# reference of allowed terms. A position containing a variable is left
# unconstrained. Returns 0 without querying the quad table when none of the
# terms given for a position are stored.
sub count_quads {
    my ($self, @pattern) = @_;
    my @nodes = map { ref($_) eq 'ARRAY' ? $_ : [$_] } @pattern;

    my (@where, @bind);
    for my $pos (0 .. 3) {
        my $candidates = $nodes[$pos];
        next unless defined($candidates);
        next if (scalar(@$candidates) == 1 and not defined($candidates->[0]));
        next if (any { $_->does('Attean::API::Variable') } @$candidates);

        # _get_term_id returns an empty list for terms that are not stored.
        my @ids = map { $self->_get_term_id($_) } @$candidates;
        return 0 unless scalar(@ids);

        my $column       = $pos_names[$pos];
        my $placeholders = join(', ', ('?') x scalar(@ids));
        push(@where, "$column IN (" . $placeholders . ")");
        push(@bind, @ids);
    }

    my $sql = 'SELECT COUNT(*) FROM quad';
    $sql .= ' WHERE ' . join(' AND ', @where) if (scalar(@where));
    my $sth = $self->dbh->prepare($sql);
    $sth->execute(@bind);
    my ($count) = $sth->fetchrow_array;
    return $count;
}
321             =item C<< get_graphs >>
323             Returns an iterator over the Attean::API::Term objects comprising
324             the set of graphs of the stored quads.
326             =cut
# Return an iterator over the distinct graph terms used by stored quads.
#
# The term type is selected together with the value so that graphs named by
# blank nodes are returned as Attean::Blank objects rather than being
# turned into IRIs; every other graph term is returned as an Attean::IRI.
sub get_graphs {
    my $self = shift;
    my $sth = $self->dbh->prepare('SELECT DISTINCT term.type, term.value FROM quad JOIN term ON (quad.graph = term.term_id)');
    $sth->execute;
    my $sub = sub {
        my $row = $sth->fetchrow_arrayref;
        return unless ref($row);
        my ($type, $value) = @$row;
        if (defined($type) and $type eq 'blank') {
            return Attean::Blank->new(value => $value);
        }
        return Attean::IRI->new(value => $value);
    };
    return Attean::CodeIterator->new( generator => $sub, item_type => 'Attean::API::Term' );
}
341             # -----------------------------------------------------------------------------
343             =item C<< add_quad ( $quad ) >>
345             Adds the specified C<$quad> to the underlying model.
347             =cut
# Insert $st (a quad) into the quad table, creating term rows as needed.
#
# The INSERT statement is chosen per database type so that adding a quad
# that is already stored is ignored: SQLite and MySQL use their IGNORE
# forms, PostgreSQL uses INSERT ... WHERE NOT EXISTS (which binds the four
# IDs twice). Any other database type gets a plain INSERT.
# Always returns nothing.
sub add_quad {
    my ($self, $st) = @_;
    my @ids = map { $self->_get_or_create_term_id($_) } $st->values;
    return if (any { not defined($_) } @ids);

    my $type = $self->database_type;
    my %insert_sql_for = (
        sqlite     => 'INSERT OR IGNORE INTO quad (subject, predicate, object, graph) VALUES (?, ?, ?, ?)',
        mysql      => 'INSERT IGNORE INTO quad (subject, predicate, object, graph) VALUES (?, ?, ?, ?)',
        postgresql => 'INSERT INTO quad (subject, predicate, object, graph) SELECT ?, ?, ?, ? WHERE NOT EXISTS (SELECT 1 FROM quad WHERE subject = ? AND predicate = ? AND object = ? AND graph = ?)',
    );
    my $sql  = $insert_sql_for{$type} // 'INSERT INTO quad (subject, predicate, object, graph) VALUES (?, ?, ?, ?)';
    my @bind = ($type eq 'postgresql') ? (@ids, @ids) : @ids;

    my $sth = $self->dbh->prepare($sql);
    $sth->execute(@bind);
    return;
}
373             =item C<< remove_quad ( $statement ) >>
375             Removes the specified C<$statement> from the underlying model.
377             =cut
# Delete the quad $st from the quad table.
#
# The delete is skipped when any of the quad's four terms is not stored
# (_get_term_id returns nothing for such a term), since the quad cannot be
# stored either. Always returns nothing.
sub remove_quad {
    my ($self, $st) = @_;
    my @ids = map { $self->_get_term_id($_) } $st->values;
    return unless (scalar(@ids) == 4);
    return unless (all { defined($_) } @ids);

    my $delete = $self->dbh->prepare('DELETE FROM quad WHERE subject = ? AND predicate = ? AND object = ? AND graph = ?');
    $delete->execute(@ids);
    return;
}
394             =item C<< create_graph( $graph ) >>
396             This is a no-op function for the DBI quad-store.
398             =cut
# Intentionally does nothing and returns nothing: there is no separate
# graph record to create in a quad-store.
sub create_graph {
    return;
}
404             =item C<< drop_graph( $graph ) >>
406             Removes all quads with the given C<< $graph >>.
408             =cut
# Dropping a graph is the same as clearing it: forward all arguments to
# clear_graph and return its result.
sub drop_graph {
    my ($self, @args) = @_;
    return $self->clear_graph(@args);
}
415             =item C<< clear_graph( $graph ) >>
417             Removes all quads with the given C<< $graph >>.
419             =cut
# Delete every quad whose graph is $graph.
#
# Does nothing when the graph term is not stored (no quad can reference
# it). Always returns nothing.
sub clear_graph {
    my ($self, $graph) = @_;
    my $graph_id = $self->_get_term_id($graph);
    if (defined($graph_id)) {
        my $delete = $self->dbh->prepare('DELETE FROM quad WHERE graph = ?');
        $delete->execute($graph_id);
    }
    return;
}
431             =item C<< begin_transaction >>
433             Begin a database transaction.
435             =cut
# Start a transaction on the underlying database handle (begin_work) and
# return whatever the handle returns.
sub begin_transaction {
    my ($self) = @_;
    return $self->dbh->begin_work;
}
443             =item C<< abort_transaction >>
445             Rollback the current database transaction.
447             =cut
# Roll back the current transaction on the underlying database handle and
# return whatever the handle returns.
sub abort_transaction {
    my ($self) = @_;
    return $self->dbh->rollback;
}
455             =item C<< end_transaction >>
457             Commit the current database transaction.
459             =cut
# Commit the current transaction on the underlying database handle and
# return whatever the handle returns.
sub end_transaction {
    my ($self) = @_;
    return $self->dbh->commit;
}
467             =item C<< begin_bulk_updates >>
469             Begin a database transaction.
471             =cut
# Bulk updates are wrapped in a single database transaction: start it here
# (begin_work) and return whatever the handle returns.
sub begin_bulk_updates {
    my ($self) = @_;
    return $self->dbh->begin_work;
}
478             =item C<< end_bulk_updates >>
480             Commit the current database transaction.
482             =cut
# Finish a bulk update by committing the transaction on the underlying
# database handle; returns whatever the handle returns.
sub end_bulk_updates {
    my ($self) = @_;
    return $self->dbh->commit;
}
489             =item C<< database_type >>
491             Returns the database type name as a string (e.g. 'mysql', 'sqlite', or 'postgresql').
493             =cut
# Return the lower-cased DBMS name reported by the database handle
# (its SQL_DBMS_NAME info value), e.g. 'sqlite'.
sub database_type {
    my ($self) = @_;
    my $dbms_name = $self->dbh->get_info($GetInfoType{SQL_DBMS_NAME});
    return lc($dbms_name);
}
503             =item C<< initialize_version >>
505             Insert data into the attean_version table.
507             =cut
# Replace the contents of the attean_version table with a single row
# holding the current Attean version and this store's version.
sub initialize_version {
    my ($self) = @_;
    my $dbh = $self->dbh;
    $dbh->do('DELETE FROM attean_version');
    my @versions = ($Attean::VERSION, $AtteanX::Store::DBI::VERSION);
    $dbh->do('INSERT INTO attean_version (attean_version, store_version) VALUES (?, ?);', undef, @versions);
}
517             =item C<< create_schema_file >>
519             Returns the path to the file containing the database DDL for quadstore creation
520             for the current database type if available, undef otherwise.
522             =cut
# Return the path of the schema-creation SQL file for the current database
# type, or nothing if no readable file exists.
#
# The share directory is $ENV{ATTEAN_SHAREDIR} if set, otherwise the
# installed dist share directory, otherwise the relative directory 'share'.
# The file is looked up as database-schema/<type>-create.sql inside it.
sub create_schema_file {
    my ($self) = @_;
    my $type      = $self->database_type;
    my $share_dir = $ENV{ATTEAN_SHAREDIR} || eval { dist_dir('AtteanX-Store-DBI') } || 'share';
    my $filename  = sprintf('%s-create.sql', $type);
    my $path      = File::Spec->catfile($share_dir, 'database-schema', $filename);
    return $path if (-r $path);
    return;
}
535             =item C<< drop_schema_file >>
537             Returns the path to the file containing the database DDL for quadstore deletion
538             for the current database type if available, undef otherwise.
540             =cut
# Return the path of the schema-deletion SQL file for the current database
# type, or nothing if no readable file exists.
#
# The share directory is $ENV{ATTEAN_SHAREDIR} if set, otherwise the
# installed dist share directory, otherwise the relative directory 'share'.
# The file is looked up as database-schema/<type>-drop.sql inside it.
sub drop_schema_file {
    my ($self) = @_;
    my $type      = $self->database_type;
    my $share_dir = $ENV{ATTEAN_SHAREDIR} || eval { dist_dir('AtteanX-Store-DBI') } || 'share';
    my $filename  = sprintf('%s-drop.sql', $type);
    my $path      = File::Spec->catfile($share_dir, 'database-schema', $filename);
    return $path if (-r $path);
    return;
}
553             =item C<< available_database_types >>
555             Returns the names of the database types for which the system has schemas
556             available to create and drop quadstore tables.
558             =cut
# Return the database type names for which a "<type>-create.sql" schema
# file exists in the share directory's database-schema folder.
#
# The share directory is $ENV{ATTEAN_SHAREDIR} if set, otherwise the
# installed dist share directory, otherwise the relative directory 'share'.
#
# The directory is read with opendir/readdir rather than glob(), because
# glob() splits its pattern on whitespace and would fail for a share
# directory whose path contains spaces. Returns an empty list when the
# schema directory cannot be read.
sub available_database_types {
    my $self = shift;
    my $dir = $ENV{ATTEAN_SHAREDIR} || eval { dist_dir('AtteanX-Store-DBI') } || 'share';
    my $schema_dir = File::Spec->catdir($dir, 'database-schema');

    opendir(my $dh, $schema_dir) or return;
    # Skip dot-files (as the glob pattern did) and keep a stable order.
    my @files = sort grep { !/\A\./ } readdir($dh);
    closedir($dh);

    # Match only names that really end in "-create.sql".
    my @types = map { /(\w+)-create\.sql\z/ ? $1 : () } @files;
    return @types;
}
569             =item C<< dbi_connect_args ( $type, %args ) >>
571             =item C<< dbi_connect_args ( %args ) >>
573             Returns a quad C<< $dsn, $user, $password, \%connect_args >> suitable for
574             passing to C<< DBI->connect >> to obtain a database handle to be used in
575             constructing a C<< AtteanX::Store::DBI >> quadstore.
577             C<< %args >> must contain a value for the C<< database >> key. It may also
578             contain values for the optional keys: C<< user >>, C<< password >>,
579             C<< host >>, and C<< port >>.
581             If invoked as a class method, the C<< $type >> parameter is required, and must
582             be one of the database types returned by C<< available_database_types >>.
584             If invoked as an object method, the C<< $type >> parameter must not be
585             included; this information will be obtained directly from the
586             C<< AtteanX::Store::DBI >> object.
588             =cut
# Build the argument list ($dsn, $user, $password, \%connect_args) for
# DBI->connect.
#
# When called on an object the database type comes from the object;
# when called on the class it is taken from the first argument. The
# remaining arguments are key/value pairs: database, user, password, host
# and port. RaiseError is always enabled; MySQL and SQLite additionally get
# their driver-specific UTF-8 flags. For a type other than 'mysql',
# 'postgresql' or 'sqlite' the returned $dsn is undef.
sub dbi_connect_args {
    my $self = shift;
    my $type = blessed($self) ? $self->database_type : shift;
    my %args = @_;
    my ($database, $host, $port) = @args{qw(database host port)};

    my %connect_args = (RaiseError => 1);
    my $dsn;
    if ($type eq 'sqlite') {
        $dsn = "DBI:SQLite:dbname=${database}";
        $connect_args{sqlite_unicode} = 1;
    } elsif ($type eq 'mysql' or $type eq 'postgresql') {
        my $is_mysql = ($type eq 'mysql');
        $dsn = $is_mysql ? "DBI:mysql:database=${database}" : "DBI:Pg:dbname=${database}";
        # Host and port are optional and appended in that order.
        $dsn .= ";host=$host" if defined($host);
        $dsn .= ";port=$port" if defined($port);
        $connect_args{mysql_enable_utf8} = 1 if ($is_mysql);
    }

    return ($dsn, $args{user}, $args{password}, \%connect_args);
}
628             =item C<< plans_for_algebra( $algebra, $model, $active_graphs, $default_graphs ) >>
630             For BGP algebras, returns a DBI-specific L<Attean::API::Plan> object, otherwise
631             returns undef.
633             =cut
635             sub plans_for_algebra {
636 36     36 1 359975 my $self = shift;
637 36         61 my $algebra = shift;
638 36         56 my $model = shift;
639 36         65 my $active_graphs = shift;
640 36         54 my $default_graphs = shift;
641 36 50       100 return unless ($algebra);
643 36         68 my %args = @_;
644 36         155 my $counter = $args{dbi_filter_counter}++;
646 36 100 66     356 if ($algebra->isa('Attean::Algebra::Filter')) {
647 9         43 my $e = $algebra->expression;
648 9 50       52 if ($e->isa('Attean::FunctionExpression')) {
649 9 100 33     96 if ($e->operator =~ m/IS(IRI|LITERAL|BLANK)/i) {
650 4         21 my $type = lc($1);
651 4         11 my ($operand) = @{ $e->children };
  4         18  
652 4 50 33     39 if ($operand->isa('Attean::ValueExpression') and $operand->value->does('Attean::API::Variable')) {
653 4         79 my $var = $operand->value;
654 4 50       20 if (my ($plan) = $self->plans_for_algebra($algebra->child, $model, $active_graphs, $default_graphs, %args)) {
655 4 50       1893 if ($plan->isa('AtteanX::Store::DBI::Plan')) {
656 4 50       28 if (exists $plan->variables->{ $var->value }) {
657 4         9 my ($table, $col) = @{ $plan->variables->{ $var->value } };
  4         20  
658 4         17 my $ref = join('.', map { $self->dbh->quote_identifier($_) } ($table, $col));
  8         256  
659 4         113 my $typecol = $self->dbh->quote_identifier('type');
660 4         96 push(@{ $plan->where }, "$ref IN (SELECT term_id FROM term WHERE ${typecol} = ?)");
  4         30  
661 4         9 push(@{ $plan->bindings }, $type);
  4         14  
662 4         28 return $plan;
663             }
664             }
665             }
666             }
667             } elsif ($e->operator eq 'STRSTARTS' or $e->operator eq 'CONTAINS') {
668 5         13 my ($varexpr, $pat) = @{ $e->children };
  5         22  
669 5 50 66     45 if ($varexpr->isa('Attean::ValueExpression') and $varexpr->value->does('Attean::API::Variable') and $pat->isa('Attean::ValueExpression') and $pat->value->does('Attean::API::Literal')) {
670 4 50       189 if (my ($plan) = $self->plans_for_algebra($algebra->child, $model, $active_graphs, $default_graphs, %args)) {
671 4 50       10706 if ($plan->isa('AtteanX::Store::DBI::Plan')) {
672 4         18 my $var = $varexpr->value;
673 4         14 my $varname = $var->value;
674 4 50       25 if (exists $plan->variables->{ $var->value }) {
675 4         7 my ($table, $col) = @{ $plan->variables->{ $var->value } };
  4         18  
676 4         12 my $literal = $pat->value;
677 4         10 my $ref = join('.', map { $self->dbh->quote_identifier($_) } ($table, $col));
  8         316  
678 4         112 my $typecol = $self->dbh->quote_identifier('type');
679 4         95 my $termtable = "tf$counter";
680 4         7 push(@{ $plan->tables }, ['term', $termtable]);
  4         22  
682 4         21 my $db = $self->database_type;
683 4 50 33     41 return unless ($db eq 'mysql' or $db eq 'postgresql' or $db eq 'sqlite');
685 4         10 push(@{ $plan->where }, "$ref = $termtable.term_id");
  4         21  
687 4         10 push(@{ $plan->where }, "$termtable.$typecol = ?"); # TODO: Remove this (and the bindings below) if $varexpr is STR(?var) instead of just ?var.
  4         15  
689 4 50       19 my $op = ($e->operator eq 'STRSTARTS') ? '=' : '>=';
690 4 50       23 if ($db eq 'mysql') {
691 0         0 push(@{ $plan->where }, "LOCATE(?, $termtable.value) ${op} ?");
  0         0  
692 0         0 push(@{ $plan->bindings }, 'literal');
  0         0  
693 0         0 push(@{ $plan->bindings }, $literal->value);
  0         0  
694 0         0 push(@{ $plan->bindings }, 1);
  0         0  
695             } elsif ($db eq 'postgresql') {
696 0         0 push(@{ $plan->where }, "STRPOS($termtable.value, ?) ${op} ?");
  0         0  
697 0         0 push(@{ $plan->bindings }, 'literal');
  0         0  
698 0         0 push(@{ $plan->bindings }, $literal->value);
  0         0  
699 0         0 push(@{ $plan->bindings }, 1);
  0         0  
700             } elsif ($db eq 'sqlite') {
701 4         8 push(@{ $plan->where }, "INSTR($termtable.value, ?) ${op} 1");
  4         18  
702 4         8 push(@{ $plan->bindings }, 'literal');
  4         15  
703 4         6 push(@{ $plan->bindings }, $literal->value);
  4         21  
704             }
706 4 100       26 if (my $lang = $literal->language) {
707 2         4 push(@{ $plan->where }, "$termtable.language = ?");
  2         10  
708 2         5 push(@{ $plan->bindings }, $lang);
  2         7  
709             } else {
710 2         51 my $xs = Attean::IRI->new( value => '' );
711 2         675 my $la = Attean::IRI->new( value => '' );
712 2         792 my $xid = $self->_get_term_id($xs);
713 2         8 my $lid = $self->_get_term_id($la);
714 2         6 push(@{ $plan->where }, "$termtable.datatype_id IN (?, ?)");
  2         12  
715 2         6 push(@{ $plan->bindings }, $xid, $lid);
  2         14  
716             }
717 4         28 return $plan;
718             }
719             }
720             }
721             }
722             }
723             }
724 9         74 } elsif ($algebra->isa('Attean::Algebra::BGP') and scalar(@{ $algebra->triples }) > 0) {
725 9         38 my @vars = $algebra->in_scope_variables;
727 9         2520 my @triples = @{ $algebra->triples };
  9         39  
728 9         55 my @select;
729             my @where_joins;
730 9         0 my %seen_vars;
731 9         0 my %source_table_for_var;
732 9         0 my %blanks;
733 9         24 my $tcounter = 0;
734 9         16 my $bcounter = 0;
735 9         17 my @tables;
737             my %rename_mapping;
738             my $rename_proj = sub {
739 36     36   67 my $name = shift;
740 36 100       148 if ($name =~ /[-._]|\W/) {
741 9         35 my $old = $name;
742 9         69 $name =~ s/_/__/g;
743 9         53 $name =~ s/([-.]|\W)/_d/g;
744 9         38 $name =~ s/([-.]|\W)/'_x' . sprintf('%02x', ord($1))/e;
  0         0  
745 9         33 $rename_mapping{$old} = $name;
746             }
748 36         81 return $name;
749 9         66 };
751 9 50       52 Carp::confess Dumper($active_graphs) unless (ref($active_graphs) eq 'ARRAY');
753 9         20 my @graph_ids = map { $self->_get_term_id($_) } @{ $active_graphs };
  9         49  
  9         24  
754 9 50   9   91 if (any { not defined($_) } @graph_ids) {
  9         46  
755 0         0 return;
756             }
758 9         35 my @bind;
759 9         205 my $graph = Attean::Variable->new(value => '___g');
760 9         968 $seen_vars{ $graph->value }++;
761 9         100 my $graph_values = sprintf('(%s)', join(', ', ('?') x scalar(@graph_ids)));
762 9         32 push(@bind, @graph_ids);
763 9         38 my @where = ("t0.graph IN $graph_values");
765 9         30 foreach my $t (@triples) {
766 9         28 my $table = 't' . $tcounter++;
767 9         40 push(@tables, ['quad', $table]);
769 9         25 my @vars;
770 9         61 my $q = $t->as_quadpattern($graph);
771 9         3557 my @nodes = $q->values;
772 9         448 foreach my $i (0 .. $#nodes) {
773 36         72 my $node = $nodes[$i];
774 36         88 my $name = $pos_names[$i];
775 36 50       109 if ($node->does('Attean::API::Variable')) {
776 36         518 my $var = $node;
777 36         103 push(@vars, [$var, $name]);
778             } elsif ($node->does('Attean::API::Blank')) {
779 0         0 my $id = $node->value;
780 0 0       0 unless (exists $blanks{$id}) {
781 0         0 my $bname = sprintf('.%s_%d', 'blank', $bcounter++);
782 0         0 $blanks{$id} = Attean::Variable->new(value => $bname);
783             }
784 0         0 my $var = $blanks{$id};
785 0         0 push(@vars, [$var, $name]);
786 0         0 $seen_vars{ $var->value }++;
787             } else {
788 0         0 my $id = $self->_get_term_id($node);
789 0 0       0 return unless defined($id);
790 0         0 push(@where, sprintf('%s.%s = ?', $table, $name));
791 0         0 push(@bind, $id);
792             }
793             }
795 9         35 foreach my $vdata (@vars) {
796 36         110 my ($var, $name) = @$vdata;
797 36         81 my $var_name = $rename_proj->( $var->value );
798 36 100       184 push(@select, [$table, $name, $var_name]) unless ($seen_vars{$var->value}++);
799 36 50       103 if (my $tt = $source_table_for_var{ $var->value }) {
800 0         0 push(@where_joins, ['=', $tt, [$table, $name]]);
801             } else {
802 36         145 $source_table_for_var{ $var->value } = [$table, $name];
803             }
804             }
805             }
807 9         33 foreach my $w (@where_joins) {
808 0         0 my ($op, $a, $b) = @$w;
809 0         0 my ($as, $bs) = map { sprintf('%s.%s', @$_) } ($a, $b);
  0         0  
810 0         0 push(@where, join(' ', $as, $op, $bs));
811             }
813 9         288 return AtteanX::Store::DBI::Plan->new(
814             store => $self,
815             select => \@select,
816             where => \@where,
817             tables => \@tables,
818             in_scope_variables => [@vars],
819             rename_mapping => \%rename_mapping,
820             bindings => \@bind,
821             variables => \%source_table_for_var,
822             );
823             }
825 19         67 return;
826             }
828             =item C<< cost_for_plan( $plan ) >>
830             Returns the estimated cost for a DBI-specific query plan, undef otherwise.
832             =cut
834             sub cost_for_plan {
835             my $self = shift;
836             my $plan = shift;
837             if ($plan->isa('AtteanX::Store::DBI::Plan')) {
838             return 1; # TODO: actually estimate cost here
839             }
840             return;
841             }
843             }
845             package AtteanX::Store::DBI::Plan 0.012 {
846 3     3   22945 use Moo;
  3         9  
  3         30  
847 3     3   8480 use Type::Tiny::Role;
  3         7  
  3         100  
848 3     3   18 use Types::Standard qw(HashRef ArrayRef InstanceOf Str);
  3         7  
  3         62  
849 3     3   2865 use namespace::clean;
  3         11  
  3         24  
851             has store => (is => 'ro', isa => InstanceOf['AtteanX::Store::DBI'], required => 1);
852             has rename_mapping => (is => 'ro', isa => HashRef[Str], default => sub { +{} });
853             has variables => (is => 'ro', isa => HashRef, required => 1);
854             has bindings => (is => 'ro', isa => ArrayRef, required => 1);
855             has select => (is => 'ro', isa => ArrayRef, required => 1);
856             has where => (is => 'ro', isa => ArrayRef, required => 1);
857             has tables => (is => 'ro', isa => ArrayRef[ArrayRef[Str]], required => 1);
859             with 'Attean::API::BindingSubstitutionPlan', 'Attean::API::NullaryQueryTree';
861             sub plan_as_string {
862 0     0 0 0 my $self = shift;
863 0         0 my ($sql, @bind) = $self->sql();
864 0         0 return sprintf('DBI BGP { %s ← (%s) }', $sql, join(', ', @bind));
865             }
867             sub sql {
868 9     9 0 1772 my $self = shift;
869 9         24 my $bind = shift;
871 9         33 my $store = $self->store;
872 9         31 my $dbh = $store->dbh;
873 9         17 my @bind = @{ $self->bindings };
  9         38  
874 9         21 my @where = @{ $self->where };
  9         40  
875 9 100       36 if ($bind) {
876 3         15 foreach my $var ($bind->variables) {
877 0         0 my $id = $store->_get_term_id($bind->value($var));
878 0 0       0 return unless defined($id);
879 0 0       0 if (my $cdata = $self->variables->{ $var }) {
880 0         0 my ($table, $col) = @$cdata;
881 0         0 push(@where, sprintf("%s.%s = ?", $table, $col));
882 0         0 push(@bind, $id);
883             }
884             }
885             }
887 9         47 my @select = map { sprintf("%s.%s AS %s", map { $dbh->quote_identifier( $_ ) } @$_) } @{ $self->select };
  27         466  
  81         1386  
  9         34  
888 9 50       233 unless (scalar(@select)) {
889 0         0 push(@select, '1');
890             }
893 9         17 my @sql;
894 9         25 push(@sql, 'SELECT');
895 9         32 push(@sql, join(', ', @select));
897 9         34 push(@sql, 'FROM');
898 9         26 push(@sql, join(', ', map { join(' ', @$_) } @{ $self->tables }));
  13         50  
  9         29  
900 9 50       31 if (scalar(@where)) {
901 9         22 push(@sql, 'WHERE');
902 9         25 push(@sql, join(' AND ', map { "($_)" } @where));
  29         85  
903             }
905 9         38 my $sql = join(" ", @sql);
906 9         51 return ($sql, @bind);
907             }
909             sub substitute_impl {
910 3     3 0 3210 my $self = shift;
911 3         10 my $model = shift;
912 3         17 my ($sql, @bind) = $self->sql(@_);
913 3         11 my $store = $self->store;
914 3         8 my $dbh = $store->dbh;
916             # warn "TODO: generate SQL for BGP: $sql\n";
917             # warn "======================================================================\n";
918             # warn "$sql\n";
919             # warn "======================================================================\n";
921 3         11 my $vars = $self->in_scope_variables;
922 3         23 my $sth = $dbh->prepare($sql);
923             return sub {
924             # warn "Generating impl by executing SQL";
925 3     3   449 my $rv = $sth->execute(@bind);
926 3 50       21 unless ($rv) {
927 0         0 warn '*** SQL error: ' . $sth->errstr;
928 0         0 die;
929             }
930             my $sub = sub {
931             # warn '=== Iterator invoked';
932             # use Data::Dumper;
933             # warn Dumper($sql, \@bind);
934 10 100       24761 if (my $row = $sth->fetchrow_hashref) {
935             # warn '@@@ Iterator got row';
936             # warn Dumper($row);
938 7         34 my %bindings;
939 7         24 foreach my $k (@$vars) {
940 21   33     101 my $key = $self->rename_mapping->{$k} // $k;
941 21         59 my $term = $store->_get_term($row->{$key});
942 21         62 $bindings{$k} = $term;
943             }
944 7         245 my $r = Attean::Result->new( bindings => \%bindings );
945 7         1665 return $r;
946             } else {
947             # warn '^^^ Reached end-of-iterator'
948             }
949 3         15 return;
950 3         27 };
951 3         109 return Attean::CodeIterator->new( generator => $sub, item_type => 'Attean::API::Result', variables => $vars );
952 3         619 };
953             }
955             }
957             1;
959             __END__
961             =back
963             =head1 BUGS
965             Please report any bugs or feature requests through the GitHub web interface
966             at L<>.
968             =head1 AUTHOR
970             Gregory Todd Williams C<< <> >>
972             =head1 COPYRIGHT
974             Copyright (c) 2014--2020 Gregory Todd Williams. This
975             program is free software; you can redistribute it and/or modify it under
976             the same terms as Perl itself.
978             =cut