File Coverage

blib/lib/AtteanX/Store/DBI.pm
Criterion Covered Total %
statement 397 513 77.3
branch 86 154 55.8
condition 15 36 41.6
subroutine 45 56 80.3
pod 20 23 86.9
total 563 782 71.9


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AtteanX::Store::DBI - Database quad-store
4              
5             =head1 VERSION
6              
7             This document describes AtteanX::Store::DBI version 0.001
8              
9             =head1 SYNOPSIS
10              
11             use Attean;
12             my $store = Attean->get_store('DBI')->new( dbh => $dbh );
13              
14             =head1 DESCRIPTION
15              
16             AtteanX::Store::DBI provides a quad-store backed by a relational database.
17              
18             =head1 ATTRIBUTES
19              
20             =over 4
21              
22             =item C<< dbh >>
23              
24             =back
25              
26             =cut
27              
28 3     3   7680859 use utf8;
  3         24  
  3         30  
29 3     3   115 use v5.14;
  3         10  
30 3     3   15 use warnings;
  3         8  
  3         190  
31              
32             package AtteanX::Store::DBI {
33             our $VERSION = '0.001';
34 3     3   19 use Moo;
  3         6  
  3         23  
35 3     3   10119 use DBI;
  3         58499  
  3         209  
36 3     3   34 use Attean 0.013;
  3         62  
  3         27  
37 3     3   2400 use DBI::Const::GetInfoType;
  3         19864  
  3         374  
38 3     3   27 use Type::Tiny::Role;
  3         7  
  3         120  
39 3     3   17 use Types::Standard qw(Int Str ArrayRef HashRef ConsumerOf InstanceOf);
  3         6  
  3         31  
40 3     3   3544 use Encode;
  3         8  
  3         298  
41 3     3   1445 use Cache::LRU;
  3         1979  
  3         102  
42 3     3   22 use Set::Scalar;
  3         6  
  3         113  
43 3     3   1106 use DBIx::MultiStatementDo;
  3         2194436  
  3         152  
44 3     3   33 use List::MoreUtils qw(zip);
  3         8  
  3         33  
45 3     3   2603 use List::Util qw(any all first);
  3         7  
  3         270  
46 3     3   27 use File::ShareDir qw(dist_dir dist_file);
  3         46  
  3         164  
47 3     3   2582 use File::Slurp;
  3         30515  
  3         225  
48 3     3   22 use Scalar::Util qw(refaddr reftype blessed);
  3         6  
  3         142  
49 3     3   25 use namespace::clean;
  3         9  
  3         32  
50              
51             with 'Attean::API::MutableQuadStore';
52             with 'Attean::API::BulkUpdatableStore';
53             with 'Attean::API::QuadStore';
54             with 'Attean::API::CostPlanner';
55              
56             my @pos_names = Attean::API::Quad->variables;
57              
58             =head1 ROLES
59              
60             This class consumes L<Attean::API::QuadStore>, L<Attean::API::MutableQuadStore>,
61             L<Attean::API::BulkUpdatableStore>, and L<Attean::API::CostPlanner>.
62              
63             =head1 METHODS
64              
65             =over 4
66              
67             =item C<< new ( dbh => $dbh ) >>
68              
69             Returns a new quad-store object backed by the database referenced by the
70             supplied database handle.
71              
72             =cut
73              
74             has dbh => (is => 'ro', isa => InstanceOf['DBI::db'], required => 1);
75             has _i2t_cache => (is => 'ro', default => sub { Cache::LRU->new( size => 256 ) });
76             has _t2i_cache => (is => 'ro', default => sub { Cache::LRU->new( size => 256 ) });
77              
78             =item C<< init() >>
79              
80             Create the tables and indexes required for using the database as a quadstore.
81              
82             =cut
83             sub init {
84 0     0 1 0 my $self = shift;
85 0         0 my $dbh = $self->dbh;
86 0         0 my $batch = DBIx::MultiStatementDo->new( dbh => $dbh );
87 0 0       0 my $file = $self->create_schema_file or die 'No schema files available for store initialization';
88 0         0 my $sql = read_file($file);
89 0         0 $batch->do($sql);
90             }
91            
92             =item C<< temporary_store() >>
93              
94             Returns a temporary (in-memory, SQLite) store.
95              
96             =cut
97             sub temporary_store {
98 0     0 1 0 my $class = shift;
99 0         0 my $dbh = DBI->connect('dbi:SQLite:dbname=:memory:', '', '');
100 0         0 my $store = $class->new(dbh => $dbh);
101 0         0 $store->init();
102 0         0 return $store;
103             }
104            
105             sub _last_insert_id {
106 246     246   804 my $self = shift;
107 246         713 my $table = shift;
108 246         1143 my $dbh = $self->dbh;
109 246         7644 return $dbh->last_insert_id(undef, undef, $table, undef);
110             }
111              
112             sub _get_term {
113 41     41   66 my $self = shift;
114 41         57 my $id = shift;
115 41 100       165 if (my $term = $self->_i2t_cache->get($id)) {
116 28         500 return $term;
117             }
118 13         165 my $sth = $self->dbh->prepare('SELECT term.type, term.value, dtterm.value AS datatype, term.language FROM term LEFT JOIN term dtterm ON (term.datatype_id = dtterm.term_id) WHERE term.term_id = ?');
119 13         1437 $sth->execute($id);
120 13         386 my $row = $sth->fetchrow_hashref;
121 13         61 my $type = $row->{type};
122 13         26 my $term;
123 13         24 my $value = $row->{value};
124 13         25 my $datatype = $row->{datatype};
125 13         25 my $lang = $row->{language};
126 13 100       51 if ($type eq 'iri') {
    50          
    50          
127 9         254 $term = Attean::IRI->new( value => $value );
128             } elsif ($type eq 'blank') {
129 0         0 $term = Attean::Blank->new( value => $value );
130             } elsif ($type eq 'literal') {
131 4         114 my %args = (value => $value, datatype => Attean::IRI->new(value => $datatype));
132 4 100       1397 if ($lang) {
133 2         7 $args{language} = $lang;
134             }
135 4         85 $term = Attean::Literal->new( %args );
136             }
137 13 50       4636 if ($term) {
138 13         111 $self->_i2t_cache->set($id => $term);
139 13         520 return $term;
140             }
141 0         0 Carp::confess "Failed to load term values for bad ID " . Dumper($id);
142             }
143            
144             sub _get_term_id {
145 41     41   87 my $self = shift;
146 41         61 my $term = shift;
147 41 100       193 if (my $id = $self->_t2i_cache->get($term->as_string)) {
148 37         9238 return $id;
149             }
150 4         1233 my $dbh = $self->dbh;
151 4         10 my $tid;
152 4 50       16 if ($term->does('Attean::API::IRI')) {
    0          
    0          
153 4         104 my $sth = $dbh->prepare('SELECT term_id FROM term WHERE type = ? AND value = ?');
154 4         355 my $value = $term->value;
155 4         316 $sth->execute('iri', $value);
156 4         81 ($tid) = $sth->fetchrow_array;
157             } elsif ($term->does('Attean::API::Blank')) {
158 0         0 my $sth = $dbh->prepare('SELECT term_id FROM term WHERE type = ? AND value = ?');
159 0         0 my $value = $term->value;
160 0         0 $sth->execute('blank', $value);
161 0         0 ($tid) = $sth->fetchrow_array;
162             } elsif ($term->does('Attean::API::Literal')) {
163 0         0 my $dtid = $self->_get_or_create_term_id($term->datatype);
164 0         0 my $sth = $dbh->prepare('SELECT term_id FROM term WHERE type = ? AND value = ? AND datatype_id = ? AND language = ?');
165 0         0 my $value = $term->value;
166 0         0 my $lang = $term->language;
167 0         0 $sth->execute('literal', $value, $dtid, $lang);
168 0         0 ($tid) = $sth->fetchrow_array;
169             }
170 4 50       19 if (defined($tid)) {
171 0         0 $self->_t2i_cache->set($term->as_string => $tid);
172 0         0 return $tid;
173             }
174 4         82 return;
175             }
176            
177             sub _get_or_create_term_id {
178 630     630   10643 my $self = shift;
179 630         1456 my $term = shift;
180 630 100       3264 if (my $id = $self->_t2i_cache->get($term->as_string)) {
181 384         131829 return $id;
182             }
183 246         76738 my $dbh = $self->dbh;
184 246         500 my $tid;
185 246         2040 my $insert_term_sth = $dbh->prepare('INSERT INTO term (type, value, datatype_id, language) VALUES (?, ?, ?, ?)');
186 246 100       24309 if ($term->does('Attean::API::IRI')) {
    100          
    50          
187 134         3884 my $sth = $dbh->prepare('SELECT term_id FROM term WHERE type = ? AND value = ?');
188 134         12436 my $value = $term->value;
189 134         12837 $sth->execute('iri', $value);
190 134         1477 ($tid) = $sth->fetchrow_array;
191 134 50       625 unless (defined($tid)) {
192 134         1377484 $insert_term_sth->execute('iri', $value, undef, undef);
193 134         1734 $tid = $self->_last_insert_id('term');
194             }
195             } elsif ($term->does('Attean::API::Blank')) {
196 42         3275 my $sth = $dbh->prepare('SELECT term_id FROM term WHERE type = ? AND value = ?');
197 42         4289 my $value = $term->value;
198 42         4383 $sth->execute('blank', $value);
199 42         522 ($tid) = $sth->fetchrow_array;
200 42 50       212 unless (defined($tid)) {
201 42         464458 $insert_term_sth->execute('blank', $value, undef, undef);
202 42         613 $tid = $self->_last_insert_id('term');
203             }
204             } elsif ($term->does('Attean::API::Literal')) {
205 70         9377 my $dtid = $self->_get_or_create_term_id($term->datatype);
206 70         254 my $sql = 'SELECT term_id FROM term WHERE type = ? AND value = ? AND datatype_id = ?';
207 70         262 my $value = $term->value;
208 70         345 my @bind = ('literal', $value, $dtid);
209 70         254 my $lang = $term->language;
210 70 100       295 if ($lang) {
211 14         38 $sql .= ' AND language = ?';
212 14         37 push(@bind, $lang);
213             }
214 70         509 my $sth = $dbh->prepare($sql);
215 70         14969 $sth->execute(@bind);
216 70         754 ($tid) = $sth->fetchrow_array;
217 70 50       430 unless (defined($tid)) {
218 70         735708 $insert_term_sth->execute('literal', $value, $dtid, $lang);
219 70         1140 $tid = $self->_last_insert_id('term');
220             }
221             } else {
222 0         0 die "Failed to get ID for term: " . $term->as_string;
223             }
224            
225 246 50       1521 if (defined($tid)) {
226 246         2187 $self->_t2i_cache->set($term->as_string => $tid);
227 246         38096 return $tid;
228             }
229 0         0 die;
230             }
231              
232             =item C<< get_quads ( $subject, $predicate, $object, $graph ) >>
233              
234             Returns an iterator of all quads matching the specified subject, predicate,
235             object, and graph. Any of the arguments may be undef to match any value,
236             or an ARRAY reference of terms that are allowable in the respective quad
237             position.
238              
239             =cut
240              
241             sub get_quads {
242             my $self = shift;
243             my @nodes = map { ref($_) eq 'ARRAY' ? $_ : [$_] } @_;
244             my @where;
245             my @bind;
246             foreach my $i (0 .. 3) {
247             my $name = $pos_names[$i];
248             my $terms = $nodes[$i];
249             if (defined($terms)) {
250             unless (scalar(@$terms) == 1 and not defined($terms->[0])) {
251             unless (any { $_->does('Attean::API::Variable') } @$terms) {
252             my @ids = map { $self->_get_term_id($_) } @$terms;
253             unless (scalar(@ids)) {
254             return Attean::ListIterator->new( values => [], item_type => 'Attean::API::Quad' );
255             }
256             push(@where, "$name IN (" . join(', ', ('?') x scalar(@ids)) . ")");
257             push(@bind, @ids);
258             }
259             }
260             }
261             }
262             my $sql = 'SELECT subject, predicate, object, graph FROM quad';
263             if (scalar(@where)) {
264             $sql .= ' WHERE ' . join(' AND ', @where);
265             }
266             my $sth = $self->dbh->prepare($sql);
267             $sth->execute(@bind);
268             my $ok = 1;
269             my $sub = sub {
270             return unless ($ok);
271             if (my $row = $sth->fetchrow_arrayref) {
272             my @terms = map { $self->_get_term($_) } @$row;
273             my $quad = Attean::Quad->new(zip @pos_names, @terms);
274             return $quad;
275             }
276             $ok = 0;
277             return;
278             };
279             my $iter = Attean::CodeIterator->new( generator => $sub, item_type => 'Attean::API::Quad' );
280             return $iter;
281             }
282              
283             =item C<< count_quads ( $subject, $predicate, $object, $graph ) >>
284              
285             Returns the count of all quads matching the specified subject, predicate,
286             object, and graph. Any of the arguments may be undef to match any value,
287             or an ARRAY reference of terms that are allowable in the respective quad
288             position.
289              
290             =cut
291              
292             sub count_quads {
293 17     17 1 2150 my $self = shift;
294 17 50       66 my @nodes = map { ref($_) eq 'ARRAY' ? $_ : [$_] } @_;
  7         27  
295 17         134 my @where;
296             my @bind;
297 17         114 foreach my $i (0 .. 3) {
298 62         165 my $name = $pos_names[$i];
299 62         97 my $terms = $nodes[$i];
300 62 100       164 if (defined($terms)) {
301 7 100 66     58 unless (scalar(@$terms) == 1 and not defined($terms->[0])) {
302 5 100   5   32 unless (any { $_->does('Attean::API::Variable') } @$terms) {
  5         21  
303 4         174 my @ids = map { $self->_get_term_id($_) } @$terms;
  4         14  
304 4 100       31 return 0 unless scalar(@ids);
305 2         13 push(@where, "$name IN (" . join(', ', ('?') x scalar(@ids)) . ")");
306 2         11 push(@bind, @ids);
307             }
308             }
309             }
310             }
311 15         44 my $sql = 'SELECT COUNT(*) FROM quad';
312 15 100       59 if (scalar(@where)) {
313 2         9 $sql .= ' WHERE ' . join(' AND ', @where);
314             }
315 15         161 my $sth = $self->dbh->prepare($sql);
316 15         2369 $sth->execute(@bind);
317 15         355 my ($count) = $sth->fetchrow_array;
318 15         409 return $count;
319             }
320              
321             =item C<< get_graphs >>
322              
323             Returns an iterator over the Attean::API::Term objects comprising
324             the set of graphs of the stored quads.
325              
326             =cut
327              
328             sub get_graphs {
329 10     10 1 1634 my $self = shift;
330 10         90 my $sth = $self->dbh->prepare('SELECT DISTINCT value FROM quad JOIN term ON (quad.graph = term.term_id)');
331 10         2126 $sth->execute;
332             my $sub = sub {
333 15     15   8088 my $row = $sth->fetchrow_arrayref;
334 15 100       88 return unless ref($row);
335 6         22 my ($value) = @$row;
336 6         196 return Attean::IRI->new(value => $value);
337 10         162 };
338 10         383 return Attean::CodeIterator->new( generator => $sub, item_type => 'Attean::API::Term' );
339             }
340              
341             # -----------------------------------------------------------------------------
342              
343             =item C<< add_quad ( $quad ) >>
344              
345             Adds the specified C<$quad> to the underlying model.
346              
347             =cut
348            
349             sub add_quad {
350             my $self = shift;
351             my $st = shift;
352             my @ids = map { $self->_get_or_create_term_id($_) } $st->values;
353             if (any { not defined($_) } @ids) {
354             return;
355             }
356            
357             my $type = $self->database_type;
358             my @bind = @ids;
359             my $sql = 'INSERT INTO quad (subject, predicate, object, graph) VALUES (?, ?, ?, ?)';
360             if ($type eq 'sqlite') {
361             $sql = 'INSERT OR IGNORE INTO quad (subject, predicate, object, graph) VALUES (?, ?, ?, ?)';
362             } elsif ($type eq 'mysql') {
363             $sql = 'INSERT IGNORE INTO quad (subject, predicate, object, graph) VALUES (?, ?, ?, ?)';
364             } elsif ($type eq 'postgresql') {
365             $sql = 'INSERT INTO quad (subject, predicate, object, graph) SELECT ?, ?, ?, ? WHERE NOT EXISTS (SELECT 1 FROM quad WHERE subject = ? AND predicate = ? AND object = ? AND graph = ?)';
366             push(@bind, @ids);
367             }
368             my $sth = $self->dbh->prepare($sql);
369             $sth->execute(@bind);
370             return;
371             }
372              
373             =item C<< remove_quad ( $statement ) >>
374              
375             Removes the specified C<$statement> from the underlying model.
376              
377             =cut
378              
379             sub remove_quad {
380 4     4 1 13 my $self = shift;
381 4         8 my $st = shift;
382 4         51 my @ids = map { $self->_get_term_id($_) } $st->values;
  16         226  
383 4 50       29 unless (scalar(@ids) == 4) {
384 0         0 return;
385             }
386 4 50   16   29 unless (all { defined($_) } @ids) {
  16         26  
387 0         0 return;
388             }
389 4         40 my $sth = $self->dbh->prepare('DELETE FROM quad WHERE subject = ? AND predicate = ? AND object = ? AND graph = ?');
390 4         29947 $sth->execute(@ids);
391 4         100 return;
392             }
393              
394             =item C<< create_graph( $graph ) >>
395              
396             This is a no-op for the DBI quad-store.
397              
398             =cut
399              
400       4 1   sub create_graph {
401             # no-op on a quad-store
402             }
403              
404             =item C<< drop_graph( $graph ) >>
405              
406             Removes all quads with the given C<< $graph >>.
407              
408             =cut
409              
410             sub drop_graph {
411 2     2 1 1026 my $self = shift;
412 2         12 return $self->clear_graph(@_);
413             }
414              
415             =item C<< clear_graph( $graph ) >>
416              
417             Removes all quads with the given C<< $graph >>.
418              
419             =cut
420              
421             sub clear_graph {
422 4     4 1 1177 my $self = shift;
423 4         9 my $graph = shift;
424 4         23 my $gid = $self->_get_term_id($graph);
425 4 50       21 return unless defined($gid);
426 4         46 my $sth = $self->dbh->prepare('DELETE FROM quad WHERE graph = ?');
427 4         36444 $sth->execute($gid);
428 4         105 return;
429             }
430            
431             =item C<< begin_transaction >>
432              
433             Begin a database transaction.
434              
435             =cut
436              
437             sub begin_transaction {
438             # warn 'begin transaction';
439 0     0 1 0 my $self = shift;
440 0         0 $self->dbh->begin_work;
441             }
442            
443             =item C<< abort_transaction >>
444              
445             Rollback the current database transaction.
446              
447             =cut
448              
449             sub abort_transaction {
450             # warn 'abort transaction';
451 0     0 1 0 my $self = shift;
452 0         0 $self->dbh->rollback;
453             }
454            
455             =item C<< end_transaction >>
456              
457             Commit the current database transaction.
458              
459             =cut
460              
461             sub end_transaction {
462             # warn 'end transaction';
463 0     0 1 0 my $self = shift;
464 0         0 $self->dbh->commit;
465             }
466            
467             =item C<< begin_bulk_updates >>
468              
469             Begin a database transaction.
470              
471             =cut
472              
473             sub begin_bulk_updates {
474 0     0 1 0 my $self = shift;
475 0         0 $self->dbh->begin_work;
476             }
477            
478             =item C<< end_bulk_updates >>
479              
480             Commit the current database transaction.
481              
482             =cut
483              
484             sub end_bulk_updates {
485 0     0 1 0 my $self = shift;
486 0         0 $self->dbh->commit;
487             }
488            
489             =item C<< database_type >>
490              
491             Returns the database type name as a string (e.g. 'mysql', 'sqlite', or 'postgresql').
492              
493             =cut
494              
495             sub database_type {
496 164     164 1 522 my $self = shift;
497 164         522 my $dbh = $self->dbh;
498             # warn $dbh->get_info($GetInfoType{SQL_DRIVER_NAME});
499 164         2143 my $type = lc($dbh->get_info($GetInfoType{SQL_DBMS_NAME}));
500 164         5389 return $type;
501             }
502            
503             =item C<< initialize_version >>
504              
505             Insert data into the attean_version table.
506              
507             =cut
508              
509             sub initialize_version {
510 0     0 1 0 my $self = shift;
511 0         0 my $dbh = $self->dbh;
512 0         0 $dbh->do('DELETE FROM attean_version');
513 0         0 my $sql = 'INSERT INTO attean_version (attean_version, store_version) VALUES (?, ?);';
514 0         0 $dbh->do($sql, undef, $Attean::VERSION, $AtteanX::Store::DBI::VERSION);
515             }
516              
517             =item C<< create_schema_file >>
518              
519             Returns the path to the file containing the database DDL for quadstore creation
520             for the current database type if available, undef otherwise.
521              
522             =cut
523              
524             sub create_schema_file {
525 20     20 1 1351 my $self = shift;
526 20         91 my $type = $self->database_type;
527 20   50     116 my $dir = $ENV{ATTEAN_SHAREDIR} || eval { dist_dir('AtteanX-Store-DBI') } || 'share';
528 20         3305 my $file = File::Spec->catfile($dir, 'database-schema', sprintf('%s-create.sql', $type));
529 20 50       517 if (-r $file) {
530 20         143 return $file;
531             }
532 0         0 return;
533             }
534              
535             =item C<< drop_schema_file >>
536              
537             Returns the path to the file containing the database DDL for quadstore deletion
538             for the current database type if available, undef otherwise.
539              
540             =cut
541              
542             sub drop_schema_file {
543 0     0 1 0 my $self = shift;
544 0         0 my $type = $self->database_type;
545 0   0     0 my $dir = $ENV{ATTEAN_SHAREDIR} || eval { dist_dir('AtteanX-Store-DBI') } || 'share';
546 0         0 my $file = File::Spec->catfile($dir, 'database-schema', sprintf('%s-drop.sql', $type));
547 0 0       0 if (-r $file) {
548 0         0 return $file;
549             }
550 0         0 return;
551             }
552            
553             =item C<< available_database_types >>
554              
555             Returns the names of the database types for which the system has schemas
556             available to create and drop quadstore tables.
557              
558             =cut
559              
560             sub available_database_types {
561 0     0 1 0 my $self = shift;
562 0   0     0 my $dir = $ENV{ATTEAN_SHAREDIR} || eval { dist_dir('AtteanX-Store-DBI') } || 'share';
563 0         0 my $pat = File::Spec->catfile($dir, 'database-schema', '*-create.sql');
564 0         0 my @files = glob($pat);
565 0         0 my @types = map { /(\w+)-create.sql/ } @files;
  0         0  
566 0         0 return @types;
567             }
568            
569             =item C<< dbi_connect_args ( $type, %args ) >>
570              
571             =item C<< dbi_connect_args ( %args ) >>
572              
573             Returns a four-element list C<< $dsn, $user, $password, \%connect_args >> suitable for
574             passing to C<< DBI->connect >> to obtain a database handle to be used in
575             constructing a C<< AtteanX::Store::DBI >> quadstore.
576              
577             C<< %args >> must contain a value for the C<< database >> key. It may also
578             contain values for the optional keys: C<< user >>, C<< password >>,
579             C<< host >>, and C<< port >>.
580              
581             If invoked as a class method, the C<< $type >> parameter is required, and must
582             be one of the database types returned by C<< available_database_types >>.
583              
584             If invoked as an object method, the C<< $type >> parameter must not be
585             included; this information will be obtained directly from the
586             C<< AtteanX::Store::DBI >> object.
587              
588             =cut
589              
590             sub dbi_connect_args {
591 20     20 1 112767 my $self = shift;
592 20 50       159 my $type = blessed($self) ? $self->database_type : shift;
593 20         101 my %args = @_;
594 20         94 my $database = $args{database};
595 20         62 my $user = $args{user};
596 20         57 my $password = $args{password};
597 20         63 my $host = $args{host};
598 20         57 my $port = $args{port};
599 20         51 my $dsn;
600             my %connect_args;
601 20         89 $connect_args{RaiseError} = 1;
602              
603 20 50       182 if ($type eq 'mysql') {
    50          
    50          
604 0         0 $dsn = "DBI:mysql:database=${database}";
605 0 0       0 if (defined($host)) {
606 0         0 $dsn .= ";host=$host";
607             }
608 0 0       0 if (defined($port)) {
609 0         0 $dsn .= ";port=$port";
610             }
611 0         0 $connect_args{mysql_enable_utf8} = 1;
612             } elsif ($type eq 'postgresql') {
613 0         0 $dsn = "DBI:Pg:dbname=${database}";
614 0 0       0 if (defined($host)) {
615 0         0 $dsn .= ";host=$host";
616             }
617 0 0       0 if (defined($port)) {
618 0         0 $dsn .= ";port=$port";
619             }
620             } elsif ($type eq 'sqlite') {
621 20         72 $dsn = "DBI:SQLite:dbname=${database}";
622 20         68 $connect_args{sqlite_unicode} = 1;
623             }
624            
625 20         164 return ($dsn, $user, $password, \%connect_args);
626             }
627              
628             =item C<< plans_for_algebra( $algebra, $model, $active_graphs, $default_graphs ) >>
629              
630             For BGP algebras, and for ISIRI/ISLITERAL/ISBLANK, STRSTARTS and CONTAINS filters over
631             them, returns a DBI-specific L<Attean::API::Plan> object, otherwise returns undef.
632              
633             =cut
634              
635             sub plans_for_algebra {
636 36     36 1 364033 my $self = shift;
637 36         65 my $algebra = shift;
638 36         105 my $model = shift;
639 36         63 my $active_graphs = shift;
640 36         65 my $default_graphs = shift;
641 36 50       120 return unless ($algebra);
642            
643 36         92 my %args = @_;
644 36         131 my $counter = $args{dbi_filter_counter}++;
645            
646 36 100 66     338 if ($algebra->isa('Attean::Algebra::Filter')) {
    100          
647 9         40 my $e = $algebra->expression;
648 9 50       50 if ($e->isa('Attean::FunctionExpression')) {
649 9 100 33     106 if ($e->operator =~ m/IS(IRI|LITERAL|BLANK)/i) {
    50          
650 4         21 my $type = lc($1);
651 4         9 my ($operand) = @{ $e->children };
  4         20  
652 4 50 33     53 if ($operand->isa('Attean::ValueExpression') and $operand->value->does('Attean::API::Variable')) {
653 4         85 my $var = $operand->value;
654 4 50       22 if (my ($plan) = $self->plans_for_algebra($algebra->child, $model, $active_graphs, $default_graphs, %args)) {
655 4 50       1818 if ($plan->isa('AtteanX::Store::DBI::Plan')) {
656 4 50       29 if (exists $plan->variables->{ $var->value }) {
657 4         10 my ($table, $col) = @{ $plan->variables->{ $var->value } };
  4         21  
658 4         16 my $ref = join('.', map { $self->dbh->quote_identifier($_) } ($table, $col));
  8         214  
659 4         104 my $typecol = $self->dbh->quote_identifier('type');
660 4         88 push(@{ $plan->where }, "$ref IN (SELECT term_id FROM term WHERE ${typecol} = ?)");
  4         26  
661 4         10 push(@{ $plan->bindings }, $type);
  4         15  
662 4         27 return $plan;
663             }
664             }
665             }
666             }
667             } elsif ($e->operator eq 'STRSTARTS' or $e->operator eq 'CONTAINS') {
668 5         13 my ($varexpr, $pat) = @{ $e->children };
  5         22  
669 5 50 66     53 if ($varexpr->isa('Attean::ValueExpression') and $varexpr->value->does('Attean::API::Variable') and $pat->isa('Attean::ValueExpression') and $pat->value->does('Attean::API::Literal')) {
      66        
      33        
670 4 50       189 if (my ($plan) = $self->plans_for_algebra($algebra->child, $model, $active_graphs, $default_graphs, %args)) {
671 4 50       10464 if ($plan->isa('AtteanX::Store::DBI::Plan')) {
672 4         18 my $var = $varexpr->value;
673 4         15 my $varname = $var->value;
674 4 50       65 if (exists $plan->variables->{ $var->value }) {
675 4         12 my ($table, $col) = @{ $plan->variables->{ $var->value } };
  4         20  
676 4         15 my $literal = $pat->value;
677 4         13 my $ref = join('.', map { $self->dbh->quote_identifier($_) } ($table, $col));
  8         349  
678 4         106 my $typecol = $self->dbh->quote_identifier('type');
679 4         96 my $termtable = "tf$counter";
680 4         9 push(@{ $plan->tables }, ['term', $termtable]);
  4         20  
681              
682 4         21 my $db = $self->database_type;
683 4 50 33     44 return unless ($db eq 'mysql' or $db eq 'postgresql' or $db eq 'sqlite');
      33        
684            
685 4         10 push(@{ $plan->where }, "$ref = $termtable.term_id");
  4         20  
686            
687 4         8 push(@{ $plan->where }, "$termtable.$typecol = ?"); # TODO: Remove this (and the bindings below) if $varexpr is STR(?var) instead of just ?var.
  4         18  
688            
689 4 50       22 my $op = ($e->operator eq 'STRSTARTS') ? '=' : '>=';
690 4 50       30 if ($db eq 'mysql') {
    50          
    50          
691 0         0 push(@{ $plan->where }, "LOCATE(?, $termtable.value) ${op} ?");
  0         0  
692 0         0 push(@{ $plan->bindings }, 'literal');
  0         0  
693 0         0 push(@{ $plan->bindings }, $literal->value);
  0         0  
694 0         0 push(@{ $plan->bindings }, 1);
  0         0  
695             } elsif ($db eq 'postgresql') {
696 0         0 push(@{ $plan->where }, "STRPOS($termtable.value, ?) ${op} ?");
  0         0  
697 0         0 push(@{ $plan->bindings }, 'literal');
  0         0  
698 0         0 push(@{ $plan->bindings }, $literal->value);
  0         0  
699 0         0 push(@{ $plan->bindings }, 1);
  0         0  
700             } elsif ($db eq 'sqlite') {
701 4         10 push(@{ $plan->where }, "INSTR($termtable.value, ?) ${op} 1");
  4         18  
702 4         9 push(@{ $plan->bindings }, 'literal');
  4         17  
703 4         7 push(@{ $plan->bindings }, $literal->value);
  4         20  
704             }
705            
706 4 100       20 if (my $lang = $literal->language) {
707 2         5 push(@{ $plan->where }, "$termtable.language = ?");
  2         13  
708 2         4 push(@{ $plan->bindings }, $lang);
  2         8  
709             } else {
710 2         55 my $xs = Attean::IRI->new( value => 'http://www.w3.org/2001/XMLSchema#string' );
711 2         754 my $la = Attean::IRI->new( value => 'http://www.w3.org/1999/02/22-rdf-syntax-ns#langString' );
712 2         726 my $xid = $self->_get_term_id($xs);
713 2         11 my $lid = $self->_get_term_id($la);
714 2         5 push(@{ $plan->where }, "$termtable.datatype_id IN (?, ?)");
  2         11  
715 2         6 push(@{ $plan->bindings }, $xid, $lid);
  2         16  
716             }
717 4         32 return $plan;
718             }
719             }
720             }
721             }
722             }
723             }
724 9         54 } elsif ($algebra->isa('Attean::Algebra::BGP') and scalar(@{ $algebra->triples }) > 0) {
725 9         47 my @vars = $algebra->in_scope_variables;
726            
727 9         2603 my @triples = @{ $algebra->triples };
  9         44  
728 9         61 my @select;
729             my @where_joins;
730 9         0 my %seen_vars;
731 9         0 my %source_table_for_var;
732 9         0 my %blanks;
733 9         22 my $tcounter = 0;
734 9         20 my $bcounter = 0;
735 9         24 my @tables;
736            
737             my %rename_mapping;
738             my $rename_proj = sub {
739 36     36   65 my $name = shift;
740 36 100       129 if ($name =~ /[-._]|\W/) {
741 9         19 my $old = $name;
742 9         48 $name =~ s/_/__/g;
743 9         40 $name =~ s/([-.]|\W)/_d/g;
744 9         30 $name =~ s/([-.]|\W)/'_x' . sprintf('%02x', ord($1))/e;
  0         0  
745 9         63 $rename_mapping{$old} = $name;
746             }
747            
748 36         84 return $name;
749 9         71 };
750            
751 9 50       43 Carp::confess Dumper($active_graphs) unless (ref($active_graphs) eq 'ARRAY');
752            
753 9         21 my @graph_ids = map { $self->_get_term_id($_) } @{ $active_graphs };
  9         51  
  9         19  
754 9 50   9   83 if (any { not defined($_) } @graph_ids) {
  9         39  
755 0         0 return;
756             }
757            
758 9         36 my @bind;
759 9         208 my $graph = Attean::Variable->new(value => '___g');
760 9         972 $seen_vars{ $graph->value }++;
761 9         76 my $graph_values = sprintf('(%s)', join(', ', ('?') x scalar(@graph_ids)));
762 9         34 push(@bind, @graph_ids);
763 9         33 my @where = ("t0.graph IN $graph_values");
764            
765 9         28 foreach my $t (@triples) {
766 9         31 my $table = 't' . $tcounter++;
767 9         46 push(@tables, ['quad', $table]);
768              
769 9         22 my @vars;
770 9         63 my $q = $t->as_quadpattern($graph);
771 9         3382 my @nodes = $q->values;
772 9         432 foreach my $i (0 .. $#nodes) {
773 36         72 my $node = $nodes[$i];
774 36         86 my $name = $pos_names[$i];
775 36 50       83 if ($node->does('Attean::API::Variable')) {
    0          
776 36         513 my $var = $node;
777 36         100 push(@vars, [$var, $name]);
778             } elsif ($node->does('Attean::API::Blank')) {
779 0         0 my $id = $node->value;
780 0 0       0 unless (exists $blanks{$id}) {
781 0         0 my $bname = sprintf('.%s_%d', 'blank', $bcounter++);
782 0         0 $blanks{$id} = Attean::Variable->new(value => $bname);
783             }
784 0         0 my $var = $blanks{$id};
785 0         0 push(@vars, [$var, $name]);
786 0         0 $seen_vars{ $var->value }++;
787             } else {
788 0         0 my $id = $self->_get_term_id($node);
789 0 0       0 return unless defined($id);
790 0         0 push(@where, sprintf('%s.%s = ?', $table, $name));
791 0         0 push(@bind, $id);
792             }
793             }
794            
795 9         33 foreach my $vdata (@vars) {
796 36         81 my ($var, $name) = @$vdata;
797 36         85 my $var_name = $rename_proj->( $var->value );
798 36 100       155 push(@select, [$table, $name, $var_name]) unless ($seen_vars{$var->value}++);
799 36 50       87 if (my $tt = $source_table_for_var{ $var->value }) {
800 0         0 push(@where_joins, ['=', $tt, [$table, $name]]);
801             } else {
802 36         144 $source_table_for_var{ $var->value } = [$table, $name];
803             }
804             }
805             }
806            
807 9         43 foreach my $w (@where_joins) {
808 0         0 my ($op, $a, $b) = @$w;
809 0         0 my ($as, $bs) = map { sprintf('%s.%s', @$_) } ($a, $b);
  0         0  
810 0         0 push(@where, join(' ', $as, $op, $bs));
811             }
812            
813 9         292 return AtteanX::Store::DBI::Plan->new(
814             store => $self,
815             select => \@select,
816             where => \@where,
817             tables => \@tables,
818             in_scope_variables => [@vars],
819             rename_mapping => \%rename_mapping,
820             bindings => \@bind,
821             variables => \%source_table_for_var,
822             );
823             }
824              
825 19         72 return;
826             }
827              
828             =item C<< cost_for_plan( $plan ) >>
829              
830             Returns the estimated cost for a DBI-specific query plan, undef otherwise.
831              
832             =cut
833              
834             sub cost_for_plan {
835             my $self = shift;
836             my $plan = shift;
837             if ($plan->isa('AtteanX::Store::DBI::Plan')) {
838             return 1; # TODO: actually estimate cost here
839             }
840             return;
841             }
842              
843             }
844              
845             package AtteanX::Store::DBI::Plan 0.012 {
846 3     3   22265 use Moo;
  3         6  
  3         35  
847 3     3   8353 use Type::Tiny::Role;
  3         8  
  3         94  
848 3     3   16 use Types::Standard qw(HashRef ArrayRef InstanceOf Str);
  3         9  
  3         51  
849 3     3   2948 use namespace::clean;
  3         7  
  3         19  
850            
851             has store => (is => 'ro', isa => InstanceOf['AtteanX::Store::DBI'], required => 1);
852             has rename_mapping => (is => 'ro', isa => HashRef[Str], default => sub { +{} });
853             has variables => (is => 'ro', isa => HashRef, required => 1);
854             has bindings => (is => 'ro', isa => ArrayRef, required => 1);
855             has select => (is => 'ro', isa => ArrayRef, required => 1);
856             has where => (is => 'ro', isa => ArrayRef, required => 1);
857             has tables => (is => 'ro', isa => ArrayRef[ArrayRef[Str]], required => 1);
858            
859             with 'Attean::API::BindingSubstitutionPlan', 'Attean::API::NullaryQueryTree';
860            
861             sub plan_as_string {
862 0     0 0 0 my $self = shift;
863 0         0 my ($sql, @bind) = $self->sql();
864 0         0 return sprintf('DBI BGP { %s ← (%s) }', $sql, join(', ', @bind));
865             }
866            
867             sub sql {
868 9     9 0 1789 my $self = shift;
869 9         20 my $bind = shift;
870              
871 9         39 my $store = $self->store;
872 9         30 my $dbh = $store->dbh;
873 9         19 my @bind = @{ $self->bindings };
  9         41  
874 9         22 my @where = @{ $self->where };
  9         34  
875 9 100       35 if ($bind) {
876 3         17 foreach my $var ($bind->variables) {
877 0         0 my $id = $store->_get_term_id($bind->value($var));
878 0 0       0 return unless defined($id);
879 0 0       0 if (my $cdata = $self->variables->{ $var }) {
880 0         0 my ($table, $col) = @$cdata;
881 0         0 push(@where, sprintf("%s.%s = ?", $table, $col));
882 0         0 push(@bind, $id);
883             }
884             }
885             }
886            
887 9         42 my @select = map { sprintf("%s.%s AS %s", map { $dbh->quote_identifier( $_ ) } @$_) } @{ $self->select };
  27         458  
  81         1452  
  9         34  
888 9 50       231 unless (scalar(@select)) {
889 0         0 push(@select, '1');
890             }
891            
892            
893 9         17 my @sql;
894 9         24 push(@sql, 'SELECT');
895 9         30 push(@sql, join(', ', @select));
896              
897 9         20 push(@sql, 'FROM');
898 9         20 push(@sql, join(', ', map { join(' ', @$_) } @{ $self->tables }));
  13         43  
  9         30  
899              
900 9 50       31 if (scalar(@where)) {
901 9         27 push(@sql, 'WHERE');
902 9         24 push(@sql, join(' AND ', map { "($_)" } @where));
  29         77  
903             }
904            
905 9         32 my $sql = join(" ", @sql);
906 9         50 return ($sql, @bind);
907             }
908            
909             sub substitute_impl {
910 3     3 0 3226 my $self = shift;
911 3         9 my $model = shift;
912 3         19 my ($sql, @bind) = $self->sql(@_);
913 3         13 my $store = $self->store;
914 3         10 my $dbh = $store->dbh;
915              
916             # warn "TODO: generate SQL for BGP: $sql\n";
917             # warn "======================================================================\n";
918             # warn "$sql\n";
919             # warn "======================================================================\n";
920              
921 3         10 my $vars = $self->in_scope_variables;
922 3         24 my $sth = $dbh->prepare($sql);
923             return sub {
924             # warn "Generating impl by executing SQL";
925 3     3   466 my $rv = $sth->execute(@bind);
926 3 50       22 unless ($rv) {
927 0         0 warn '*** SQL error: ' . $sth->errstr;
928 0         0 die;
929             }
930             my $sub = sub {
931             # warn '=== Iterator invoked';
932             # use Data::Dumper;
933             # warn Dumper($sql, \@bind);
934 10 100       24017 if (my $row = $sth->fetchrow_hashref) {
935             # warn '@@@ Iterator got row';
936             # warn Dumper($row);
937            
938 7         22 my %bindings;
939 7         21 foreach my $k (@$vars) {
940 21   33     100 my $key = $self->rename_mapping->{$k} // $k;
941 21         59 my $term = $store->_get_term($row->{$key});
942 21         56 $bindings{$k} = $term;
943             }
944 7         170 my $r = Attean::Result->new( bindings => \%bindings );
945 7         1609 return $r;
946             } else {
947             # warn '^^^ Reached end-of-iterator'
948             }
949 3         15 return;
950 3         25 };
951 3         113 return Attean::CodeIterator->new( generator => $sub, item_type => 'Attean::API::Result', variables => $vars );
952 3         628 };
953             }
954            
955             }
956              
957             1;
958              
959             __END__
960              
961             =back
962              
963             =head1 BUGS
964              
965             Please report any bugs or feature requests through the GitHub web interface
966             at L<https://github.com/kasei/perlrdf2/issues>.
967              
968             =head1 AUTHOR
969              
970             Gregory Todd Williams C<< <gwilliams@cpan.org> >>
971              
972             =head1 COPYRIGHT
973              
974             Copyright (c) 2014--2016 Gregory Todd Williams. This
975             program is free software; you can redistribute it and/or modify it under
976             the same terms as Perl itself.
977              
978             =cut