File Coverage

blib/lib/AtteanX/Store/DBI.pm
Criterion Covered Total %
statement 397 513 77.3
branch 86 154 55.8
condition 15 36 41.6
subroutine 45 56 80.3
pod 20 23 86.9
total 563 782 71.9


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AtteanX::Store::DBI - Database quad-store
4              
5             =head1 VERSION
6              
7             This document describes AtteanX::Store::DBI version 0.001_01
8              
9             =head1 SYNOPSIS
10              
11             use Attean;
12             my $store = Attean->get_store('DBI')->new( dbh => $dbh );
13              
14             =head1 DESCRIPTION
15              
16             AtteanX::Store::DBI provides a quad-store backed by a relational database.
17              
18             =head1 ATTRIBUTES
19              
20             =over 4
21              
22             =item C<< dbh >>
23              
24             =back
25              
26             =cut
27              
28 3     3   7918844 use utf8;
  3         25  
  3         31  
29 3     3   121 use v5.14;
  3         12  
30 3     3   16 use warnings;
  3         6  
  3         185  
31              
32             package AtteanX::Store::DBI {
33             our $VERSION = '0.001_01';
34 3     3   24 use Moo;
  3         4  
  3         22  
35 3     3   9595 use DBI;
  3         58087  
  3         231  
36 3     3   32 use Attean 0.013;
  3         67  
  3         28  
37 3     3   2462 use DBI::Const::GetInfoType;
  3         20104  
  3         457  
38 3     3   26 use Type::Tiny::Role;
  3         8  
  3         120  
39 3     3   15 use Types::Standard qw(Int Str ArrayRef HashRef ConsumerOf InstanceOf);
  3         8  
  3         36  
40 3     3   3815 use Encode;
  3         8  
  3         239  
41 3     3   1520 use Cache::LRU;
  3         2148  
  3         98  
42 3     3   22 use Set::Scalar;
  3         7  
  3         115  
43 3     3   1107 use DBIx::MultiStatementDo;
  3         2261118  
  3         159  
44 3     3   29 use List::MoreUtils qw(zip);
  3         19  
  3         34  
45 3     3   2985 use List::Util qw(any all first);
  3         6  
  3         271  
46 3     3   21 use File::ShareDir qw(dist_dir dist_file);
  3         36  
  3         157  
47 3     3   2696 use File::Slurp;
  3         32015  
  3         203  
48 3     3   25 use Scalar::Util qw(refaddr reftype blessed);
  3         7  
  3         143  
49 3     3   20 use namespace::clean;
  3         7  
  3         34  
50              
51             with 'Attean::API::MutableQuadStore';
52             with 'Attean::API::BulkUpdatableStore';
53             with 'Attean::API::QuadStore';
54             with 'Attean::API::CostPlanner';
55              
56             my @pos_names = Attean::API::Quad->variables;
57              
58             =head1 ROLES
59              
60             This class consumes L<Attean::API::QuadStore>,
61             L<Attean::API::MutableQuadStore>, and L<Attean::API::BulkUpdatableStore>.
62              
63             =head1 METHODS
64              
65             =over 4
66              
67             =item C<< new ( dbh => $dbh ) >>
68              
69             Returns a new quad-store object backed by the database referenced by the
70             supplied database handle.
71              
72             =cut
73              
74             has dbh => (is => 'ro', isa => InstanceOf['DBI::db'], required => 1);
75             has _i2t_cache => (is => 'ro', default => sub { Cache::LRU->new( size => 256 ) });
76             has _t2i_cache => (is => 'ro', default => sub { Cache::LRU->new( size => 256 ) });
77              
78             =item C<< init() >>
79              
80             Create the tables and indexes required for using the database as a quadstore.
81              
82             =cut
83             sub init {
84 0     0 1 0 my $self = shift;
85 0         0 my $dbh = $self->dbh;
86 0         0 my $batch = DBIx::MultiStatementDo->new( dbh => $dbh );
87 0 0       0 my $file = $self->create_schema_file or die 'No schema files available for store initialization';
88 0         0 my $sql = read_file($file);
89 0         0 $batch->do($sql);
90             }
91            
92             =item C<< temporary_store() >>
93              
94             Returns a temporary (in-memory, SQLite) store.
95              
96             =cut
97             sub temporary_store {
98 0     0 1 0 my $class = shift;
99 0         0 my $dbh = DBI->connect('dbi:SQLite:dbname=:memory:', '', '');
100 0         0 my $store = $class->new(dbh => $dbh);
101 0         0 $store->init();
102 0         0 return $store;
103             }
104            
105             sub _last_insert_id {
106 246     246   806 my $self = shift;
107 246         658 my $table = shift;
108 246         1194 my $dbh = $self->dbh;
109 246         7571 return $dbh->last_insert_id(undef, undef, $table, undef);
110             }
111              
112             sub _get_term {
113 41     41   75 my $self = shift;
114 41         70 my $id = shift;
115 41 100       137 if (my $term = $self->_i2t_cache->get($id)) {
116 28         511 return $term;
117             }
118 13         210 my $sth = $self->dbh->prepare('SELECT term.type, term.value, dtterm.value AS datatype, term.language FROM term LEFT JOIN term dtterm ON (term.datatype_id = dtterm.term_id) WHERE term.term_id = ?');
119 13         1977 $sth->execute($id);
120 13         450 my $row = $sth->fetchrow_hashref;
121 13         71 my $type = $row->{type};
122 13         24 my $term;
123 13         33 my $value = $row->{value};
124 13         25 my $datatype = $row->{datatype};
125 13         28 my $lang = $row->{language};
126 13 100       57 if ($type eq 'iri') {
    50          
    50          
127 9         372 $term = Attean::IRI->new( value => $value );
128             } elsif ($type eq 'blank') {
129 0         0 $term = Attean::Blank->new( value => $value );
130             } elsif ($type eq 'literal') {
131 4         113 my %args = (value => $value, datatype => Attean::IRI->new(value => $datatype));
132 4 100       1529 if ($lang) {
133 2         7 $args{language} = $lang;
134             }
135 4         84 $term = Attean::Literal->new( %args );
136             }
137 13 50       5439 if ($term) {
138 13         82 $self->_i2t_cache->set($id => $term);
139 13         754 return $term;
140             }
141 0         0 Carp::confess "Failed to load term values for bad ID " . Dumper($id);
142             }
143            
144             sub _get_term_id {
145 41     41   80 my $self = shift;
146 41         69 my $term = shift;
147 41 100       193 if (my $id = $self->_t2i_cache->get($term->as_string)) {
148 37         9296 return $id;
149             }
150 4         1238 my $dbh = $self->dbh;
151 4         10 my $tid;
152 4 50       15 if ($term->does('Attean::API::IRI')) {
    0          
    0          
153 4         98 my $sth = $dbh->prepare('SELECT term_id FROM term WHERE type = ? AND value = ?');
154 4         339 my $value = $term->value;
155 4         268 $sth->execute('iri', $value);
156 4         78 ($tid) = $sth->fetchrow_array;
157             } elsif ($term->does('Attean::API::Blank')) {
158 0         0 my $sth = $dbh->prepare('SELECT term_id FROM term WHERE type = ? AND value = ?');
159 0         0 my $value = $term->value;
160 0         0 $sth->execute('blank', $value);
161 0         0 ($tid) = $sth->fetchrow_array;
162             } elsif ($term->does('Attean::API::Literal')) {
163 0         0 my $dtid = $self->_get_or_create_term_id($term->datatype);
164 0         0 my $sth = $dbh->prepare('SELECT term_id FROM term WHERE type = ? AND value = ? AND datatype_id = ? AND language = ?');
165 0         0 my $value = $term->value;
166 0         0 my $lang = $term->language;
167 0         0 $sth->execute('literal', $value, $dtid, $lang);
168 0         0 ($tid) = $sth->fetchrow_array;
169             }
170 4 50       22 if (defined($tid)) {
171 0         0 $self->_t2i_cache->set($term->as_string => $tid);
172 0         0 return $tid;
173             }
174 4         74 return;
175             }
176            
177             sub _get_or_create_term_id {
178 630     630   11585 my $self = shift;
179 630         1296 my $term = shift;
180 630 100       3601 if (my $id = $self->_t2i_cache->get($term->as_string)) {
181 384         131613 return $id;
182             }
183 246         78036 my $dbh = $self->dbh;
184 246         494 my $tid;
185 246         1911 my $insert_term_sth = $dbh->prepare('INSERT INTO term (type, value, datatype_id, language) VALUES (?, ?, ?, ?)');
186 246 100       24615 if ($term->does('Attean::API::IRI')) {
    100          
    50          
187 134         4304 my $sth = $dbh->prepare('SELECT term_id FROM term WHERE type = ? AND value = ?');
188 134         13173 my $value = $term->value;
189 134         12878 $sth->execute('iri', $value);
190 134         1729 ($tid) = $sth->fetchrow_array;
191 134 50       552 unless (defined($tid)) {
192 134         1494942 $insert_term_sth->execute('iri', $value, undef, undef);
193 134         1937 $tid = $self->_last_insert_id('term');
194             }
195             } elsif ($term->does('Attean::API::Blank')) {
196 42         3160 my $sth = $dbh->prepare('SELECT term_id FROM term WHERE type = ? AND value = ?');
197 42         4400 my $value = $term->value;
198 42         4184 $sth->execute('blank', $value);
199 42         479 ($tid) = $sth->fetchrow_array;
200 42 50       271 unless (defined($tid)) {
201 42         474098 $insert_term_sth->execute('blank', $value, undef, undef);
202 42         655 $tid = $self->_last_insert_id('term');
203             }
204             } elsif ($term->does('Attean::API::Literal')) {
205 70         9696 my $dtid = $self->_get_or_create_term_id($term->datatype);
206 70         318 my $sql = 'SELECT term_id FROM term WHERE type = ? AND value = ? AND datatype_id = ?';
207 70         317 my $value = $term->value;
208 70         323 my @bind = ('literal', $value, $dtid);
209 70         322 my $lang = $term->language;
210 70 100       326 if ($lang) {
211 14         70 $sql .= ' AND language = ?';
212 14         49 push(@bind, $lang);
213             }
214 70         512 my $sth = $dbh->prepare($sql);
215 70         15087 $sth->execute(@bind);
216 70         847 ($tid) = $sth->fetchrow_array;
217 70 50       418 unless (defined($tid)) {
218 70         798523 $insert_term_sth->execute('literal', $value, $dtid, $lang);
219 70         1080 $tid = $self->_last_insert_id('term');
220             }
221             } else {
222 0         0 die "Failed to get ID for term: " . $term->as_string;
223             }
224            
225 246 50       1661 if (defined($tid)) {
226 246         2232 $self->_t2i_cache->set($term->as_string => $tid);
227 246         38444 return $tid;
228             }
229 0         0 die;
230             }
231              
232             =item C<< get_quads ( $subject, $predicate, $object, $graph ) >>
233              
234             Returns a stream object of all statements matching the specified subject,
235             predicate and objects. Any of the arguments may be undef to match any value,
236             or an ARRAY reference of terms that are allowable in the respective quad
237             position.
238              
239             =cut
240              
241             sub get_quads {
242             my $self = shift;
243             my @nodes = map { ref($_) eq 'ARRAY' ? $_ : [$_] } @_;
244             my @where;
245             my @bind;
246             foreach my $i (0 .. 3) {
247             my $name = $pos_names[$i];
248             my $terms = $nodes[$i];
249             if (defined($terms)) {
250             unless (scalar(@$terms) == 1 and not defined($terms->[0])) {
251             unless (any { $_->does('Attean::API::Variable') } @$terms) {
252             my @ids = map { $self->_get_term_id($_) } @$terms;
253             unless (scalar(@ids)) {
254             return Attean::ListIterator->new( values => [], item_type => 'Attean::API::Quad' );
255             }
256             push(@where, "$name IN (" . join(', ', ('?') x scalar(@ids)) . ")");
257             push(@bind, @ids);
258             }
259             }
260             }
261             }
262             my $sql = 'SELECT subject, predicate, object, graph FROM quad';
263             if (scalar(@where)) {
264             $sql .= ' WHERE ' . join(' AND ', @where);
265             }
266             my $sth = $self->dbh->prepare($sql);
267             $sth->execute(@bind);
268             my $ok = 1;
269             my $sub = sub {
270             return unless ($ok);
271             if (my $row = $sth->fetchrow_arrayref) {
272             my @terms = map { $self->_get_term($_) } @$row;
273             my $quad = Attean::Quad->new(zip @pos_names, @terms);
274             return $quad;
275             }
276             $ok = 0;
277             return;
278             };
279             my $iter = Attean::CodeIterator->new( generator => $sub, item_type => 'Attean::API::Quad' );
280             return $iter;
281             }
282              
283             =item C<< count_quads ( $subject, $predicate, $object, $graph ) >>
284              
285             Returns the count of all statements matching the specified subject,
286             predicate and objects. Any of the arguments may be undef to match any value,
287             or an ARRAY reference of terms that are allowable in the respective quad
288             position.
289              
290             =cut
291              
292             sub count_quads {
293 17     17 1 2173 my $self = shift;
294 17 50       73 my @nodes = map { ref($_) eq 'ARRAY' ? $_ : [$_] } @_;
  7         29  
295 17         138 my @where;
296             my @bind;
297 17         73 foreach my $i (0 .. 3) {
298 62         162 my $name = $pos_names[$i];
299 62         99 my $terms = $nodes[$i];
300 62 100       170 if (defined($terms)) {
301 7 100 66     36 unless (scalar(@$terms) == 1 and not defined($terms->[0])) {
302 5 100   5   29 unless (any { $_->does('Attean::API::Variable') } @$terms) {
  5         21  
303 4         184 my @ids = map { $self->_get_term_id($_) } @$terms;
  4         16  
304 4 100       31 return 0 unless scalar(@ids);
305 2         11 push(@where, "$name IN (" . join(', ', ('?') x scalar(@ids)) . ")");
306 2         11 push(@bind, @ids);
307             }
308             }
309             }
310             }
311 15         47 my $sql = 'SELECT COUNT(*) FROM quad';
312 15 100       61 if (scalar(@where)) {
313 2         7 $sql .= ' WHERE ' . join(' AND ', @where);
314             }
315 15         145 my $sth = $self->dbh->prepare($sql);
316 15         2523 $sth->execute(@bind);
317 15         338 my ($count) = $sth->fetchrow_array;
318 15         375 return $count;
319             }
320              
321             =item C<< get_graphs >>
322              
323             Returns an iterator over the Attean::API::Term objects comprising
324             the set of graphs of the stored quads.
325              
326             =cut
327              
328             sub get_graphs {
329 10     10 1 1905 my $self = shift;
330 10         93 my $sth = $self->dbh->prepare('SELECT DISTINCT value FROM quad JOIN term ON (quad.graph = term.term_id)');
331 10         2113 $sth->execute;
332             my $sub = sub {
333 15     15   7722 my $row = $sth->fetchrow_arrayref;
334 15 100       83 return unless ref($row);
335 6         27 my ($value) = @$row;
336 6         193 return Attean::IRI->new(value => $value);
337 10         130 };
338 10         379 return Attean::CodeIterator->new( generator => $sub, item_type => 'Attean::API::Term' );
339             }
340              
341             # -----------------------------------------------------------------------------
342              
343             =item C<< add_quad ( $quad ) >>
344              
345             Adds the specified C<$quad> to the underlying model.
346              
347             =cut
348            
349             sub add_quad {
350             my $self = shift;
351             my $st = shift;
352             my @ids = map { $self->_get_or_create_term_id($_) } $st->values;
353             if (any { not defined($_) } @ids) {
354             return;
355             }
356            
357             my $type = $self->database_type;
358             my @bind = @ids;
359             my $sql = 'INSERT INTO quad (subject, predicate, object, graph) VALUES (?, ?, ?, ?)';
360             if ($type eq 'sqlite') {
361             $sql = 'INSERT OR IGNORE INTO quad (subject, predicate, object, graph) VALUES (?, ?, ?, ?)';
362             } elsif ($type eq 'mysql') {
363             $sql = 'INSERT IGNORE INTO quad (subject, predicate, object, graph) VALUES (?, ?, ?, ?)';
364             } elsif ($type eq 'postgresql') {
365             $sql = 'INSERT INTO quad (subject, predicate, object, graph) SELECT ?, ?, ?, ? WHERE NOT EXISTS (SELECT 1 FROM quad WHERE subject = ? AND predicate = ? AND object = ? AND graph = ?)';
366             push(@bind, @ids);
367             }
368             my $sth = $self->dbh->prepare($sql);
369             $sth->execute(@bind);
370             return;
371             }
372              
373             =item C<< remove_quad ( $statement ) >>
374              
375             Removes the specified C<$statement> from the underlying model.
376              
377             =cut
378              
379             sub remove_quad {
380 4     4 1 10 my $self = shift;
381 4         8 my $st = shift;
382 4         41 my @ids = map { $self->_get_term_id($_) } $st->values;
  16         236  
383 4 50       29 unless (scalar(@ids) == 4) {
384 0         0 return;
385             }
386 4 50   16   29 unless (all { defined($_) } @ids) {
  16         30  
387 0         0 return;
388             }
389 4         38 my $sth = $self->dbh->prepare('DELETE FROM quad WHERE subject = ? AND predicate = ? AND object = ? AND graph = ?');
390 4         31237 $sth->execute(@ids);
391 4         113 return;
392             }
393              
394             =item C<< create_graph( $graph ) >>
395              
396             This is a no-op function for the memory quad-store.
397              
398             =cut
399              
400       4 1   sub create_graph {
401             # no-op on a quad-store
402             }
403              
404             =item C<< drop_graph( $graph ) >>
405              
406             Removes all quads with the given C<< $graph >>.
407              
408             =cut
409              
410             sub drop_graph {
411 2     2 1 1045 my $self = shift;
412 2         11 return $self->clear_graph(@_);
413             }
414              
415             =item C<< clear_graph( $graph ) >>
416              
417             Removes all quads with the given C<< $graph >>.
418              
419             =cut
420              
421             sub clear_graph {
422 4     4 1 1120 my $self = shift;
423 4         9 my $graph = shift;
424 4         25 my $gid = $self->_get_term_id($graph);
425 4 50       18 return unless defined($gid);
426 4         39 my $sth = $self->dbh->prepare('DELETE FROM quad WHERE graph = ?');
427 4         41703 $sth->execute($gid);
428 4         131 return;
429             }
430            
431             =item C<< begin_transaction >>
432              
433             Begin a database transaction.
434              
435             =cut
436              
437             sub begin_transaction {
438             # warn 'begin transaction';
439 0     0 1 0 my $self = shift;
440 0         0 $self->dbh->begin_work;
441             }
442            
443             =item C<< abort_transaction >>
444              
445             Rollback the current database transaction.
446              
447             =cut
448              
449             sub abort_transaction {
450             # warn 'abort transaction';
451 0     0 1 0 my $self = shift;
452 0         0 $self->dbh->rollback;
453             }
454            
455             =item C<< end_transaction >>
456              
457             Commit the current database transaction.
458              
459             =cut
460              
461             sub end_transaction {
462             # warn 'end transaction';
463 0     0 1 0 my $self = shift;
464 0         0 $self->dbh->commit;
465             }
466            
467             =item C<< begin_bulk_updates >>
468              
469             Begin a database transaction.
470              
471             =cut
472              
473             sub begin_bulk_updates {
474 0     0 1 0 my $self = shift;
475 0         0 $self->dbh->begin_work;
476             }
477            
478             =item C<< end_bulk_updates >>
479              
480             Commit the current database transaction.
481              
482             =cut
483              
484             sub end_bulk_updates {
485 0     0 1 0 my $self = shift;
486 0         0 $self->dbh->commit;
487             }
488            
489             =item C<< database_type >>
490              
491             Returns the database type name as a string (e.g. 'mysql', 'sqlite', or 'postgresql').
492              
493             =cut
494              
495             sub database_type {
496 164     164 1 408 my $self = shift;
497 164         595 my $dbh = $self->dbh;
498             # warn $dbh->get_info($GetInfoType{SQL_DRIVER_NAME});
499 164         2047 my $type = lc($dbh->get_info($GetInfoType{SQL_DBMS_NAME}));
500 164         5425 return $type;
501             }
502            
503             =item C<< initialize_version >>
504              
505             Insert data into the attean_version table.
506              
507             =cut
508              
509             sub initialize_version {
510 0     0 1 0 my $self = shift;
511 0         0 my $dbh = $self->dbh;
512 0         0 $dbh->do('DELETE FROM attean_version');
513 0         0 my $sql = 'INSERT INTO attean_version (attean_version, store_version) VALUES (?, ?);';
514 0         0 $dbh->do($sql, undef, $Attean::VERSION, $AtteanX::Store::DBI::VERSION);
515             }
516              
517             =item C<< create_schema_file >>
518              
519             Returns the path to the file containing the database DDL for quadstore creation
520             for the current database type if available, undef otherwise.
521              
522             =cut
523              
524             sub create_schema_file {
525 20     20 1 1392 my $self = shift;
526 20         99 my $type = $self->database_type;
527 20   50     133 my $dir = $ENV{ATTEAN_SHAREDIR} || eval { dist_dir('AtteanX-Store-DBI') } || 'share';
528 20         3436 my $file = File::Spec->catfile($dir, 'database-schema', sprintf('%s-create.sql', $type));
529 20 50       497 if (-r $file) {
530 20         130 return $file;
531             }
532 0         0 return;
533             }
534              
535             =item C<< drop_schema_file >>
536              
537             Returns the path to the file containing the database DDL for quadstore deletion
538             for the current database type if available, undef otherwise.
539              
540             =cut
541              
542             sub drop_schema_file {
543 0     0 1 0 my $self = shift;
544 0         0 my $type = $self->database_type;
545 0   0     0 my $dir = $ENV{ATTEAN_SHAREDIR} || eval { dist_dir('AtteanX-Store-DBI') } || 'share';
546 0         0 my $file = File::Spec->catfile($dir, 'database-schema', sprintf('%s-drop.sql', $type));
547 0 0       0 if (-r $file) {
548 0         0 return $file;
549             }
550 0         0 return;
551             }
552            
553             =item C<< available_database_types >>
554              
555             Returns the names of the database types for which the system has schemas
556             available to create and drop quadstore tables.
557              
558             =cut
559              
560             sub available_database_types {
561 0     0 1 0 my $self = shift;
562 0   0     0 my $dir = $ENV{ATTEAN_SHAREDIR} || eval { dist_dir('AtteanX-Store-DBI') } || 'share';
563 0         0 my $pat = File::Spec->catfile($dir, 'database-schema', '*-create.sql');
564 0         0 my @files = glob($pat);
565 0         0 my @types = map { /(\w+)-create.sql/ } @files;
  0         0  
566 0         0 return @types;
567             }
568            
569             =item C<< dbi_connect_args ( $type, %args ) >>
570              
571             =item C<< dbi_connect_args ( %args ) >>
572              
573             Returns a quad C<< $dsn, $user, $password, \%connect_args >> suitable for
574             passing to C<< DBI->connect >> to obtain a database handle to be used in
575             constructing a C<< AtteanX::Store::DBI >> quadstore.
576              
577             C<< %args >> must contain a value for the C<< database >> key. It may also
578             contain values for the optional keys: C<< user >>, C<< password >>,
579             C<< host >>, and C<< port >>.
580              
581             If invoked as a class method, the C<< $type >> parameter is required, and must
582             be one of the database types returned by C<< available_database_types >>.
583              
584             If invoked as an object method, the C<< $type >> parameter must not be
585             included; this information will be obtained directly from the
586             C<< AtteanX::Store::DBI >> object.
587              
588             =cut
589              
590             sub dbi_connect_args {
591 20     20 1 112438 my $self = shift;
592 20 50       134 my $type = blessed($self) ? $self->database_type : shift;
593 20         101 my %args = @_;
594 20         77 my $database = $args{database};
595 20         56 my $user = $args{user};
596 20         59 my $password = $args{password};
597 20         69 my $host = $args{host};
598 20         61 my $port = $args{port};
599 20         55 my $dsn;
600             my %connect_args;
601 20         68 $connect_args{RaiseError} = 1;
602              
603 20 50       190 if ($type eq 'mysql') {
    50          
    50          
604 0         0 $dsn = "DBI:mysql:database=${database}";
605 0 0       0 if (defined($host)) {
606 0         0 $dsn .= ";host=$host";
607             }
608 0 0       0 if (defined($port)) {
609 0         0 $dsn .= ";port=$port";
610             }
611 0         0 $connect_args{mysql_enable_utf8} = 1;
612             } elsif ($type eq 'postgresql') {
613 0         0 $dsn = "DBI:Pg:dbname=${database}";
614 0 0       0 if (defined($host)) {
615 0         0 $dsn .= ";host=$host";
616             }
617 0 0       0 if (defined($port)) {
618 0         0 $dsn .= ";port=$port";
619             }
620             } elsif ($type eq 'sqlite') {
621 20         89 $dsn = "DBI:SQLite:dbname=${database}";
622 20         67 $connect_args{sqlite_unicode} = 1;
623             }
624            
625 20         157 return ($dsn, $user, $password, \%connect_args);
626             }
627              
628             =item C<< plans_for_algebra( $algebra, $model, $active_graphs, $default_graphs ) >>
629              
630             For BGP algebras, returns a DBI-specific L<Attean::API::Plan> object, otherwise
631             returns undef.
632              
633             =cut
634              
635             sub plans_for_algebra {
636 36     36 1 370840 my $self = shift;
637 36         61 my $algebra = shift;
638 36         65 my $model = shift;
639 36         53 my $active_graphs = shift;
640 36         62 my $default_graphs = shift;
641 36 50       114 return unless ($algebra);
642            
643 36         98 my %args = @_;
644 36         109 my $counter = $args{dbi_filter_counter}++;
645            
646 36 100 66     389 if ($algebra->isa('Attean::Algebra::Filter')) {
    100          
647 9         38 my $e = $algebra->expression;
648 9 50       45 if ($e->isa('Attean::FunctionExpression')) {
649 9 100 33     101 if ($e->operator =~ m/IS(IRI|LITERAL|BLANK)/i) {
    50          
650 4         23 my $type = lc($1);
651 4         10 my ($operand) = @{ $e->children };
  4         19  
652 4 50 33     44 if ($operand->isa('Attean::ValueExpression') and $operand->value->does('Attean::API::Variable')) {
653 4         84 my $var = $operand->value;
654 4 50       22 if (my ($plan) = $self->plans_for_algebra($algebra->child, $model, $active_graphs, $default_graphs, %args)) {
655 4 50       2213 if ($plan->isa('AtteanX::Store::DBI::Plan')) {
656 4 50       30 if (exists $plan->variables->{ $var->value }) {
657 4         9 my ($table, $col) = @{ $plan->variables->{ $var->value } };
  4         20  
658 4         12 my $ref = join('.', map { $self->dbh->quote_identifier($_) } ($table, $col));
  8         231  
659 4         108 my $typecol = $self->dbh->quote_identifier('type');
660 4         88 push(@{ $plan->where }, "$ref IN (SELECT term_id FROM term WHERE ${typecol} = ?)");
  4         29  
661 4         10 push(@{ $plan->bindings }, $type);
  4         17  
662 4         23 return $plan;
663             }
664             }
665             }
666             }
667             } elsif ($e->operator eq 'STRSTARTS' or $e->operator eq 'CONTAINS') {
668 5         13 my ($varexpr, $pat) = @{ $e->children };
  5         26  
669 5 50 66     51 if ($varexpr->isa('Attean::ValueExpression') and $varexpr->value->does('Attean::API::Variable') and $pat->isa('Attean::ValueExpression') and $pat->value->does('Attean::API::Literal')) {
      66        
      33        
670 4 50       176 if (my ($plan) = $self->plans_for_algebra($algebra->child, $model, $active_graphs, $default_graphs, %args)) {
671 4 50       11080 if ($plan->isa('AtteanX::Store::DBI::Plan')) {
672 4         21 my $var = $varexpr->value;
673 4         13 my $varname = $var->value;
674 4 50       25 if (exists $plan->variables->{ $var->value }) {
675 4         8 my ($table, $col) = @{ $plan->variables->{ $var->value } };
  4         20  
676 4         14 my $literal = $pat->value;
677 4         10 my $ref = join('.', map { $self->dbh->quote_identifier($_) } ($table, $col));
  8         341  
678 4         109 my $typecol = $self->dbh->quote_identifier('type');
679 4         97 my $termtable = "tf$counter";
680 4         9 push(@{ $plan->tables }, ['term', $termtable]);
  4         20  
681              
682 4         21 my $db = $self->database_type;
683 4 50 33     45 return unless ($db eq 'mysql' or $db eq 'postgresql' or $db eq 'sqlite');
      33        
684            
685 4         9 push(@{ $plan->where }, "$ref = $termtable.term_id");
  4         22  
686            
687 4         8 push(@{ $plan->where }, "$termtable.$typecol = ?"); # TODO: Remove this (and the bindings below) if $varexpr is STR(?var) instead of just ?var.
  4         15  
688            
689 4 50       20 my $op = ($e->operator eq 'STRSTARTS') ? '=' : '>=';
690 4 50       31 if ($db eq 'mysql') {
    50          
    50          
691 0         0 push(@{ $plan->where }, "LOCATE(?, $termtable.value) ${op} ?");
  0         0  
692 0         0 push(@{ $plan->bindings }, 'literal');
  0         0  
693 0         0 push(@{ $plan->bindings }, $literal->value);
  0         0  
694 0         0 push(@{ $plan->bindings }, 1);
  0         0  
695             } elsif ($db eq 'postgresql') {
696 0         0 push(@{ $plan->where }, "STRPOS($termtable.value, ?) ${op} ?");
  0         0  
697 0         0 push(@{ $plan->bindings }, 'literal');
  0         0  
698 0         0 push(@{ $plan->bindings }, $literal->value);
  0         0  
699 0         0 push(@{ $plan->bindings }, 1);
  0         0  
700             } elsif ($db eq 'sqlite') {
701 4         10 push(@{ $plan->where }, "INSTR($termtable.value, ?) ${op} 1");
  4         17  
702 4         9 push(@{ $plan->bindings }, 'literal');
  4         15  
703 4         9 push(@{ $plan->bindings }, $literal->value);
  4         20  
704             }
705            
706 4 100       26 if (my $lang = $literal->language) {
707 2         5 push(@{ $plan->where }, "$termtable.language = ?");
  2         11  
708 2         4 push(@{ $plan->bindings }, $lang);
  2         8  
709             } else {
710 2         52 my $xs = Attean::IRI->new( value => 'http://www.w3.org/2001/XMLSchema#string' );
711 2         686 my $la = Attean::IRI->new( value => 'http://www.w3.org/1999/02/22-rdf-syntax-ns#langString' );
712 2         717 my $xid = $self->_get_term_id($xs);
713 2         8 my $lid = $self->_get_term_id($la);
714 2         5 push(@{ $plan->where }, "$termtable.datatype_id IN (?, ?)");
  2         12  
715 2         5 push(@{ $plan->bindings }, $xid, $lid);
  2         14  
716             }
717 4         45 return $plan;
718             }
719             }
720             }
721             }
722             }
723             }
724 9         61 } elsif ($algebra->isa('Attean::Algebra::BGP') and scalar(@{ $algebra->triples }) > 0) {
725 9         37 my @vars = $algebra->in_scope_variables;
726            
727 9         2562 my @triples = @{ $algebra->triples };
  9         45  
728 9         67 my @select;
729             my @where_joins;
730 9         0 my %seen_vars;
731 9         0 my %source_table_for_var;
732 9         0 my %blanks;
733 9         26 my $tcounter = 0;
734 9         21 my $bcounter = 0;
735 9         20 my @tables;
736            
737             my %rename_mapping;
738             my $rename_proj = sub {
739 36     36   62 my $name = shift;
740 36 100       132 if ($name =~ /[-._]|\W/) {
741 9         30 my $old = $name;
742 9         52 $name =~ s/_/__/g;
743 9         40 $name =~ s/([-.]|\W)/_d/g;
744 9         37 $name =~ s/([-.]|\W)/'_x' . sprintf('%02x', ord($1))/e;
  0         0  
745 9         29 $rename_mapping{$old} = $name;
746             }
747            
748 36         78 return $name;
749 9         76 };
750            
751 9 50       51 Carp::confess Dumper($active_graphs) unless (ref($active_graphs) eq 'ARRAY');
752            
753 9         25 my @graph_ids = map { $self->_get_term_id($_) } @{ $active_graphs };
  9         57  
  9         25  
754 9 50   9   101 if (any { not defined($_) } @graph_ids) {
  9         55  
755 0         0 return;
756             }
757            
758 9         38 my @bind;
759 9         244 my $graph = Attean::Variable->new(value => '___g');
760 9         998 $seen_vars{ $graph->value }++;
761 9         84 my $graph_values = sprintf('(%s)', join(', ', ('?') x scalar(@graph_ids)));
762 9         28 push(@bind, @graph_ids);
763 9         37 my @where = ("t0.graph IN $graph_values");
764            
765 9         28 foreach my $t (@triples) {
766 9         30 my $table = 't' . $tcounter++;
767 9         33 push(@tables, ['quad', $table]);
768              
769 9         27 my @vars;
770 9         63 my $q = $t->as_quadpattern($graph);
771 9         3508 my @nodes = $q->values;
772 9         477 foreach my $i (0 .. $#nodes) {
773 36         64 my $node = $nodes[$i];
774 36         74 my $name = $pos_names[$i];
775 36 50       82 if ($node->does('Attean::API::Variable')) {
    0          
776 36         504 my $var = $node;
777 36         89 push(@vars, [$var, $name]);
778             } elsif ($node->does('Attean::API::Blank')) {
779 0         0 my $id = $node->value;
780 0 0       0 unless (exists $blanks{$id}) {
781 0         0 my $bname = sprintf('.%s_%d', 'blank', $bcounter++);
782 0         0 $blanks{$id} = Attean::Variable->new(value => $bname);
783             }
784 0         0 my $var = $blanks{$id};
785 0         0 push(@vars, [$var, $name]);
786 0         0 $seen_vars{ $var->value }++;
787             } else {
788 0         0 my $id = $self->_get_term_id($node);
789 0 0       0 return unless defined($id);
790 0         0 push(@where, sprintf('%s.%s = ?', $table, $name));
791 0         0 push(@bind, $id);
792             }
793             }
794            
795 9         26 foreach my $vdata (@vars) {
796 36         88 my ($var, $name) = @$vdata;
797 36         79 my $var_name = $rename_proj->( $var->value );
798 36 100       152 push(@select, [$table, $name, $var_name]) unless ($seen_vars{$var->value}++);
799 36 50       93 if (my $tt = $source_table_for_var{ $var->value }) {
800 0         0 push(@where_joins, ['=', $tt, [$table, $name]]);
801             } else {
802 36         141 $source_table_for_var{ $var->value } = [$table, $name];
803             }
804             }
805             }
806            
807 9         35 foreach my $w (@where_joins) {
808 0         0 my ($op, $a, $b) = @$w;
809 0         0 my ($as, $bs) = map { sprintf('%s.%s', @$_) } ($a, $b);
  0         0  
810 0         0 push(@where, join(' ', $as, $op, $bs));
811             }
812            
813 9         277 return AtteanX::Store::DBI::Plan->new(
814             store => $self,
815             select => \@select,
816             where => \@where,
817             tables => \@tables,
818             in_scope_variables => [@vars],
819             rename_mapping => \%rename_mapping,
820             bindings => \@bind,
821             variables => \%source_table_for_var,
822             );
823             }
824              
825 19         73 return;
826             }
827              
828             =item C<< cost_for_plan( $plan ) >>
829              
830             Returns the estimated cost for a DBI-specific query plan, undef otherwise.
831              
832             =cut
833              
834             sub cost_for_plan {
835             my $self = shift;
836             my $plan = shift;
837             if ($plan->isa('AtteanX::Store::DBI::Plan')) {
838             return 1; # TODO: actually estimate cost here
839             }
840             return;
841             }
842              
843             }
844              
845             package AtteanX::Store::DBI::Plan 0.012 {
846 3     3   23124 use Moo;
  3         7  
  3         29  
847 3     3   8728 use Type::Tiny::Role;
  3         10  
  3         101  
848 3     3   18 use Types::Standard qw(HashRef ArrayRef InstanceOf Str);
  3         8  
  3         54  
849 3     3   3074 use namespace::clean;
  3         8  
  3         20  
850            
851             has store => (is => 'ro', isa => InstanceOf['AtteanX::Store::DBI'], required => 1);
852             has rename_mapping => (is => 'ro', isa => HashRef[Str], default => sub { +{} });
853             has variables => (is => 'ro', isa => HashRef, required => 1);
854             has bindings => (is => 'ro', isa => ArrayRef, required => 1);
855             has select => (is => 'ro', isa => ArrayRef, required => 1);
856             has where => (is => 'ro', isa => ArrayRef, required => 1);
857             has tables => (is => 'ro', isa => ArrayRef[ArrayRef[Str]], required => 1);
858            
859             with 'Attean::API::BindingSubstitutionPlan', 'Attean::API::NullaryQueryTree';
860            
861             sub plan_as_string {
862 0     0 0 0 my $self = shift;
863 0         0 my ($sql, @bind) = $self->sql();
864 0         0 return sprintf('DBI BGP { %s ← (%s) }', $sql, join(', ', @bind));
865             }
866            
867             sub sql {
868 9     9 0 1760 my $self = shift;
869 9         22 my $bind = shift;
870              
871 9         36 my $store = $self->store;
872 9         31 my $dbh = $store->dbh;
873 9         15 my @bind = @{ $self->bindings };
  9         42  
874 9         21 my @where = @{ $self->where };
  9         36  
875 9 100       38 if ($bind) {
876 3         17 foreach my $var ($bind->variables) {
877 0         0 my $id = $store->_get_term_id($bind->value($var));
878 0 0       0 return unless defined($id);
879 0 0       0 if (my $cdata = $self->variables->{ $var }) {
880 0         0 my ($table, $col) = @$cdata;
881 0         0 push(@where, sprintf("%s.%s = ?", $table, $col));
882 0         0 push(@bind, $id);
883             }
884             }
885             }
886            
887 9         41 my @select = map { sprintf("%s.%s AS %s", map { $dbh->quote_identifier( $_ ) } @$_) } @{ $self->select };
  27         486  
  81         1413  
  9         43  
888 9 50       340 unless (scalar(@select)) {
889 0         0 push(@select, '1');
890             }
891            
892            
893 9         23 my @sql;
894 9         20 push(@sql, 'SELECT');
895 9         31 push(@sql, join(', ', @select));
896              
897 9         25 push(@sql, 'FROM');
898 9         31 push(@sql, join(', ', map { join(' ', @$_) } @{ $self->tables }));
  13         50  
  9         37  
899              
900 9 50       31 if (scalar(@where)) {
901 9         24 push(@sql, 'WHERE');
902 9         24 push(@sql, join(' AND ', map { "($_)" } @where));
  29         81  
903             }
904            
905 9         37 my $sql = join(" ", @sql);
906 9         54 return ($sql, @bind);
907             }
908            
909             sub substitute_impl {
910 3     3 0 3135 my $self = shift;
911 3         9 my $model = shift;
912 3         19 my ($sql, @bind) = $self->sql(@_);
913 3         12 my $store = $self->store;
914 3         8 my $dbh = $store->dbh;
915              
916             # warn "TODO: generate SQL for BGP: $sql\n";
917             # warn "======================================================================\n";
918             # warn "$sql\n";
919             # warn "======================================================================\n";
920              
921 3         11 my $vars = $self->in_scope_variables;
922 3         21 my $sth = $dbh->prepare($sql);
923             return sub {
924             # warn "Generating impl by executing SQL";
925 3     3   500 my $rv = $sth->execute(@bind);
926 3 50       22 unless ($rv) {
927 0         0 warn '*** SQL error: ' . $sth->errstr;
928 0         0 die;
929             }
930             my $sub = sub {
931             # warn '=== Iterator invoked';
932             # use Data::Dumper;
933             # warn Dumper($sql, \@bind);
934 10 100       24447 if (my $row = $sth->fetchrow_hashref) {
935             # warn '@@@ Iterator got row';
936             # warn Dumper($row);
937            
938 7         34 my %bindings;
939 7         22 foreach my $k (@$vars) {
940 21   33     107 my $key = $self->rename_mapping->{$k} // $k;
941 21         62 my $term = $store->_get_term($row->{$key});
942 21         63 $bindings{$k} = $term;
943             }
944 7         176 my $r = Attean::Result->new( bindings => \%bindings );
945 7         2125 return $r;
946             } else {
947             # warn '^^^ Reached end-of-iterator'
948             }
949 3         16 return;
950 3         25 };
951 3         116 return Attean::CodeIterator->new( generator => $sub, item_type => 'Attean::API::Result', variables => $vars );
952 3         773 };
953             }
954            
955             }
956              
957             1;
958              
959             __END__
960              
961             =back
962              
963             =head1 BUGS
964              
965             Please report any bugs or feature requests through the GitHub web interface
966             at L<https://github.com/kasei/perlrdf2/issues>.
967              
968             =head1 AUTHOR
969              
970             Gregory Todd Williams C<< <gwilliams@cpan.org> >>
971              
972             =head1 COPYRIGHT
973              
974             Copyright (c) 2014--2020 Gregory Todd Williams. This
975             program is free software; you can redistribute it and/or modify it under
976             the same terms as Perl itself.
977              
978             =cut