File Coverage

blib/lib/Database/Async/ORM.pm
Criterion Covered Total %
statement 45 243 18.5
branch 0 52 0.0
condition 0 12 0.0
subroutine 15 32 46.8
pod 3 15 20.0
total 63 354 17.8


line stmt bran cond sub pod time code
1             package Database::Async::ORM;
2              
3 1     1   170091 use strict;
  1         2  
  1         33  
4 1     1   3 use warnings;
  1         2  
  1         73  
5              
6             our $VERSION = '0.019'; # VERSION
7              
8             =head1 NAME
9              
10             Database::Async::ORM - provides object-relational features for L<Database::Async>
11              
12             =head1 SYNOPSIS
13              
14             use 5.020;
15             use IO::Async::Loop;
16             use Database::Async::ORM;
17             my $loop = IO::Async::Loop->new;
18             $loop->add(
19             my $orm = Database::Async::ORM->new
20             );
21              
22             # Load schemata directly from the database
23             $orm->load_from($db)
24             ->then(sub {
25             say 'We have the following tables:';
26             $orm->tables
27             ->map('name')
28             ->say
29             ->completed
30             })->get;
31              
32             # Load schemata from a hashref (e.g. pulled
33             # from a YAML/JSON/XML file or API)
34             $orm->load_from({ ... })
35             ->then(sub {
36             $orm->apply_to($db)
37             })->then(sub {
38             say 'We have the following tables:';
39             $orm->tables
40             ->map('name')
41             ->say
42             ->completed
43             })->get;
44              
45             =cut
46              
47 1     1   525 use Future;
  1         10648  
  1         29  
48 1     1   371 use Future::AsyncAwait;
  1         3593  
  1         4  
49 1     1   430 use Syntax::Keyword::Try;
  1         981  
  1         6  
50 1     1   836 use Path::Tiny;
  1         15730  
  1         67  
51 1     1   8 use List::Util qw(sum0);
  1         1  
  1         52  
52 1     1   6 use Scalar::Util qw(blessed);
  1         2  
  1         31  
53              
54 1     1   513 use Database::Async::ORM::Schema;
  1         2  
  1         38  
55 1     1   11 use Database::Async::ORM::Type;
  1         2  
  1         14  
56 1     1   420 use Database::Async::ORM::Table;
  1         2  
  1         35  
57 1     1   428 use Database::Async::ORM::Field;
  1         2  
  1         32  
58 1     1   422 use Database::Async::ORM::Constraint;
  1         3  
  1         35  
59 1     1   410 use Database::Async::ORM::Extension;
  1         2  
  1         56  
60              
61 1     1   401 use Log::Any qw($log);
  1         8360  
  1         3  
62              
63             sub new {
64 0     0 0   my $class = shift;
65 0           bless {
66             schema => [],
67             extension => [],
68             @_
69             }, $class
70             }
71              
72             sub add_schema {
73 0     0 0   my ($self, $schema) = @_;
74 0           push $self->{schema}->@*, $schema;
75             }
76              
77             sub add_extension {
78 0     0 0   my ($self, $extension) = @_;
79 0           push $self->{extension}->@*, $extension;
80             }
81              
82             sub schemata {
83             shift->{schema}->@*
84 0     0 0   }
85              
86             sub schema_list {
87             shift->{schema}->@*
88 0     0 0   }
89              
90             sub extension_list {
91             shift->{extension}->@*
92 0     0 0   }
93              
94             sub schema_by_name {
95 0     0 0   my ($self, $name) = @_;
96 0 0         my ($schema) = grep { $_->name eq $name } $self->schemata or die 'cannot find schema ' . $name . ', have these instead: ' . join(',', map $_->name, $self->schemata);
  0            
97 0           return $schema;
98             }
99              
100 0   0 0 0   sub schema_definitions { shift->{schema_definitions} //= {} }
101              
102             # Currently hardcoded to PostgreSQL, eventually we should be able to query
103             # the engine for this information.
104             sub ddl_for {
105 0     0 0   require Database::Async::Engine::PostgreSQL::DDL;
106 0           return Database::Async::Engine::PostgreSQL::DDL->new;
107             }
108              
109 0     0 0   async sub apply_database_changes {
110 0           my ($self, $db, @actions) = @_;
111              
112 0           my $ddl = $self->ddl_for($db);
113              
114             # Optional extensions first, and we don't care if any fail
115 0           for my $ext ($self->extension_list) {
116 0           my ($name) = $ext->name;
117             try {
118             die 'invalid name for extension: ' . $name unless $name =~ /^[a-zA-Z0-9_-]+$/;
119             await $db->query(qq{create extension if not exists "$name" cascade})->void if $ext->is_optional;
120 0           } catch {
121             $log->warnf('Failed to install optional extension %s, ignoring: %s', $name, $@);
122             }
123             }
124              
125             # All remaining steps are in a single transaction
126 0           await $db->query(q{begin})->void;
127              
128 0           for my $ext (grep { not $_->is_optional } $self->extension_list) {
  0            
129 0           my ($name) = $ext->name;
130 0           await $db->query(qq{create extension if not exists "$name" cascade})->void;
131             }
132              
133 0           my @out;
134 0           for my $action (@actions) {
135 0 0         if($action->isa('Database::Async::ORM::Table')) {
    0          
    0          
136 0           $log->tracef('Table definition for %s', $action->name);
137 0           my ($sql, @bind) = $ddl->table_info($action);
138 0           my %map = (
139             schema => $action->schema->name,
140             table => $action->name
141             );
142 0           my @data = map { $map{$_} } @bind;
  0            
143 0           my (@fields) = await $db->query(
144             $sql => @data
145             )->row_hashrefs
146             ->as_list;
147 0 0         push @out, $ddl->create_table($action) unless @fields;
148             } elsif($action->isa('Database::Async::ORM::Schema')) {
149 0           $log->tracef('Create schema %s', $action->name);
150 0           my ($sql, @bind) = $ddl->schema_info($action);
151 0           my %map = (
152             schema => $action->name,
153             );
154 0           my @data = map { $map{$_} } @bind;
  0            
155 0           my (@schema) = await $db->query(
156             $sql => @data
157             )->row_hashrefs
158             ->as_list;
159 0 0         push @out, $ddl->create_schema($action) unless @schema;
160             } elsif($action->isa('Database::Async::ORM::Type')) {
161 0           $log->tracef('Create type %s', $action->name);
162 0           my ($sql, @bind) = $ddl->type_info($action);
163 0           my %map = (
164             schema => $action->schema->name,
165             type => $action->name
166             );
167 0           my @data = map { $map{$_} } @bind;
  0            
168 0           my ($existing_type) = await $db->query(
169             $sql => @data
170             )->row_hashrefs
171             ->as_list;
172 0 0         push @out, $ddl->create_type($action) unless $existing_type;
173             } else {
174 0           die 'unknown thing ' . $action;
175             }
176             }
177              
178             # Make sure that we have no empty queries in the list... should not be necessary,
179             # perhaps this should just bail out instead.
180 0           @out = grep { length } @out;
  0            
181              
182 0           $log->debugf('Applying %d pending database migrations', 0 + @out);
183 0           for my $query (@out) {
184 0           $log->tracef('Apply SQL: %s', $query);
185 0           await $db->query($query)->void;
186             }
187              
188 0           await $db->query(q{commit})->void;
189 0           $log->debugf('Applied %d database migrations', 0 + @out);
190 0           return;
191             }
192              
193             sub database_changes_as_sql {
194 0     0 0   my ($self, $db, @actions) = @_;
195              
196 0           my $ddl = $self->ddl_for($db);
197              
198 0           my @out;
199             # Optional extensions first, and we don't care if any fail
200 0           for my $ext ($self->extension_list) {
201 0           my ($name) = $ext->name;
202             try {
203             die 'invalid name for extension: ' . $name unless $name =~ /^[a-zA-Z0-9_-]+$/;
204             push @out, qq{create extension if not exists "$name" cascade} if $ext->is_optional;
205 0           } catch {
206             $log->warnf('Failed to install optional extension %s, ignoring: %s', $name, $@);
207             }
208             }
209              
210             # All remaining steps are in a single transaction
211 0           push @out, q{begin};
212              
213 0           for my $ext (grep { not $_->is_optional } $self->extension_list) {
  0            
214 0           my ($name) = $ext->name;
215 0 0         die 'invalid name for extension: ' . $name unless $name =~ /^[a-zA-Z0-9_-]+$/;
216 0           push @out, qq{create extension if not exists "$name" cascade};
217             }
218              
219 0           for my $action (@actions) {
220 0 0         if($action->isa('Database::Async::ORM::Table')) {
    0          
    0          
221 0           $log->tracef('Create table %s', $action->name);
222 0           push @out, $ddl->create_table($action);
223             } elsif($action->isa('Database::Async::ORM::Schema')) {
224 0           $log->tracef('Create schema %s', $action->name);
225 0           push @out, $ddl->create_schema($action);
226             } elsif($action->isa('Database::Async::ORM::Type')) {
227 0           $log->tracef('Create type %s', $action->name);
228 0           push @out, $ddl->create_type($action);
229             } else {
230 0           die 'unknown thing ' . $action;
231             }
232             }
233              
234 0           push @out, q{commit};
235              
236             # Make sure that we have no empty queries in the list... should not be necessary,
237             # perhaps this should just bail out instead.
238 0           return grep { length } @out;
  0            
239             }
240              
241             =head2 load_from
242              
243             Loads schema, tables, types and any other available objects from
244             a source - currently supports the following:
245              
246             =over 4
247              
248             =item * hashref
249              
250             =item * YAML file
251              
252             =item * directory of YAML files
253              
254             =back
255              
256             You can call this multiple times to accumulate objects from various
257             different sources.
258              
259             Returns the current L<Database::Async::ORM> instance.
260              
261             =cut
262              
263             sub load_from {
264 0     0 1   my ($self, $source, $loader) = @_;
265 0 0         die 'needs a source to load from' unless defined $source;
266              
267 0 0         my $cfg = ref($source) ? $source : $self->read_from($source, $loader);
268 0           $self->{schema_definitions} = $cfg;
269 0           $log->tracef('Loaded config %s', $cfg);
270              
271 0           my @pending;
272              
273 0           for my $extension_name ($cfg->{extensions}{required}->@*) {
274             my $extension = Database::Async::ORM::Extension->new(
275             defined_in => $cfg->{extensions}{defined_in},
276 0           name => $extension_name,
277             optional => 0,
278             );
279 0           $self->add_extension($extension);
280             }
281 0           for my $extension_name ($cfg->{extensions}{optional}->@*) {
282             my $extension = Database::Async::ORM::Extension->new(
283             defined_in => $cfg->{extensions}{defined_in},
284 0           name => $extension_name,
285             optional => 1,
286             );
287 0           $self->add_extension($extension);
288             }
289              
290 0           my %pending = (type => []);
291 0           for my $schema_name (sort keys $cfg->{schema}->%*) {
292 0           $log->debugf('%s', $schema_name);
293 0           my $schema_details = $cfg->{schema}{$schema_name};
294              
295             my $schema = Database::Async::ORM::Schema->new(
296             defined_in => $schema_details->{defined_in},
297 0           name => $schema_name
298             );
299 0           $self->add_schema($schema);
300 0           push @pending, $schema;
301              
302 0           for my $type_name (sort keys $schema_details->{types}->%*) {
303 0           my $type_details = $schema_details->{types}{$type_name};
304             push $pending{type}->@*, {
305 0           schema => $schema,
306             name => $type_name,
307             details => $type_details,
308             }
309             }
310              
311 0           for my $table_name (sort keys $schema_details->{tables}->%*) {
312 0           my $table_details = $schema_details->{tables}{$table_name};
313 0           for($table_details->{fields}->@*) {
314             $_->{nullable} = 1 unless exists $_->{nullable}
315 0 0         }
316             push $pending{table}->@*, {
317 0           schema => $schema,
318             name => $table_name,
319             details => $table_details,
320             }
321             }
322             }
323              
324 0           my $found = 0;
325 0           my @missing;
326 0           while(sum0 map { 0 + @$_ } values %pending) {
  0            
327 0           @missing = ();
328 0           $log->tracef('Have %d pending types to check', 0 + $pending{type}->@*);
329 0           for my $item (splice $pending{type}->@*) {
330 0           my $type_name = $item->{name};
331 0           my $type_details = $item->{details};
332 0           my $schema = $item->{schema};
333             try {
334             $log->debugf('Add type %s as %s', $type_name, $type_details);
335             my @fields;
336             for my $field_details ($type_details->{fields}->@*) {
337             my $type = $field_details->{type};
338             if(ref $type) {
339             $type = $self->schema_by_name(
340             $type->{schema}
341             )->type_by_name(
342             $type->{name}
343             )
344             } else {
345             $type = $schema->type_by_name($type);
346             }
347             push @fields, Database::Async::ORM::Field->new(
348             type => $type,
349             name => $field_details->{name},
350             attributes => $field_details->{attributes},
351             nullable => 1,
352             default => $field_details->{default},
353             )
354             }
355             my $type = Database::Async::ORM::Type->new(
356             defined_in => $type_details->{defined_in},
357             name => $type_name,
358             schema => $schema,
359             type => $type_details->{type} // 'enum',
360             description => $type_details->{description},
361             values => $type_details->{data},
362             (exists $type_details->{is} ? (basis => $type_details->{is}) : ()),
363             fields => \@fields,
364             );
365             $schema->add_type($type);
366             push @pending, $type;
367             ++$found;
368 0           } catch {
369             $log->tracef('Failed to apply %s.%s - %s, moved to pending',
370             $schema->name,
371             $type_name,
372             $@
373             );
374             push @missing, {
375             schema => $schema->name,
376             name => $type_name,
377             type => 'type',
378             error => $@
379             };
380             push $pending{type}->@*, $item;
381             }
382             }
383              
384 0           $log->tracef('Have %d pending tables to check', 0 + $pending{table}->@*);
385              
386 0           for my $item (splice $pending{table}->@*) {
387 0           my $table_name = $item->{name};
388 0           my $table_details = $item->{details};
389 0           my $schema = $item->{schema};
390             try {
391             my @parents;
392             if(my $parents = $item->{details}{parents}) {
393             for my $parent (@$parents) {
394             # For convenience, we allow strings for tables in the current schema
395             my $details = ref $parent ? $parent : { name => $parent };
396             $log->tracef('Parent table is %s', $details);
397             my $target_schema = $self->schema_by_name($details->{schema} // $schema->name);
398             push @parents, (
399             $target_schema->table_by_name($details->{name})
400             or die 'parent table ' . $details->{name} . ' not found in schema ' . $target_schema->name
401             );
402             }
403             }
404             if(my $constraints = $item->{details}{constraints}) {
405             for my $fk (@$constraints) {
406             next unless $fk->{type} eq 'foreign_key';
407             die 'FK table ' . $fk->{references}{table} . ' not found' unless $schema->table_by_name($fk->{references}{table});
408             }
409             }
410             my $table = $self->populate_table(
411             schema => $schema,
412             details => $table_details,
413             name => $table_name,
414             parents => \@parents,
415             );
416             push @pending, $table;
417             ++$found;
418             } catch {
419             $log->tracef('Failed to apply %s.%s - %s, moved to pending',
420             $schema->name,
421             $table_name,
422             $@
423             );
424             push @missing, {
425             schema => $schema->name,
426             name => $table_name,
427             type => 'table',
428             error => $@
429             };
430             push $pending{table}->@*, $item;
431             }
432 0           }
433             } continue {
434 0 0 0       if(@missing and not $found) {
435 0           $log->error('Currently pending items:');
436 0           s/\v+$// for map $_->{error}, @missing;
437 0           $log->errorf('- %s.%s (%s) - %s', $_->{schema}, $_->{name}, $_->{type}, $_->{error}) for @missing;
438 0           die 'Unable to resolve dependencies, bailing out';
439             }
440 0           $found = 0;
441             }
442 0           return Future->done(@pending);
443             }
444              
445             =head2 METHODS - Internal
446              
447             These are used by L</load_from> and the precise API details
448             may change in future.
449              
450             =cut
451              
452             =head2 populate_table
453              
454             Populates a L<Database::Async::ORM::Table> instance.
455              
456             =cut
457              
458             sub populate_table {
459 0     0 1   my ($self, %args) = @_;
460 0           my $table_name = $args{name};
461 0           my $table_details = $args{details};
462 0           my $schema = $args{schema};
463 0           $log->tracef('Add table %s as %s', $table_name, $table_details);
464             my $table = Database::Async::ORM::Table->new(
465             defined_in => $table_details->{defined_in},
466             name => $table_name,
467             schema => $schema,
468             table => $table_details->{table} // 'enum',
469             description => $table_details->{description},
470             values => $table_details->{data},
471             parents => $args{parents},
472             primary_keys => $table_details->{primary_keys},
473 0   0       );
474 0           for my $field_details ($table_details->{fields}->@*) {
475 0           my $type = $field_details->{type};
476 0 0         if(ref $type) {
477             $type = $self->schema_by_name(
478             $type->{schema}
479             )->type_by_name(
480             $type->{name}
481             )
482 0           } else {
483 0           $type = $schema->type_by_name($type);
484             }
485             my $field = Database::Async::ORM::Field->new(
486             defined_in => $table_details->{defined_in},
487             table => $table,
488             type => $type,
489 0           %{$field_details}{grep { exists $field_details->{$_} } qw(name description nullable default attributes)}
  0            
  0            
490             );
491 0           $log->tracef('Add field %s as %s with type %s', $field->name, $field_details, $field->type);
492 0           push $table->{fields}->@*, $field;
493             }
494 0           for my $constraint_details ($table_details->{constraints}->@*) {
495             my $constraint = Database::Async::ORM::Constraint->new(
496             defined_in => $table_details->{defined_in},
497             table => $table,
498 0           %{$constraint_details}{grep { exists $constraint_details->{$_} } qw(name type deferrable initially_deferred fields references)}
  0            
  0            
499             );
500 0           $log->tracef('Add constraint %s as %s with type %s', $constraint->name, $constraint_details, $constraint->type);
501 0           push $table->{constraints}->@*, $constraint;
502             }
503 0           $schema->add_table($table);
504 0           return $table;
505             }
506              
507             =head2 read_from
508              
509             Reads data from a file or recursively from a base path.
510              
511             =cut
512              
513             sub read_from {
514 0     0 1   my ($self, $source, $loader) = @_;
515 0 0         die 'needs a source to load from' unless defined $source;
516              
517 0           my $base = path($source);
518 0 0         die "$source does not exist" unless $base->exists;
519              
520             $loader //= sub {
521 0     0     my ($path) = @_;
522 0 0         if($path->basename ne $path->basename(qw(.yaml .yml))) {
    0          
523 0           require YAML::XS;
524 0           return YAML::XS::LoadFile("$path")
525             } elsif($path->basename ne $path->basename(qw(.yaml .yml))) {
526 0           require JSON::MaybeXS;
527 0           return JSON::MaybeXS->new->decode($path->slurp_utf8);
528             } else {
529 0           die 'Unknown file type for ' . $path;
530             }
531 0   0       };
532              
533 0 0         return $self->load_from_file(undef, $base, $loader) unless $base->is_dir;
534              
535             # Merge in the data from all files recursively.
536             # For example, schema/personal/tables/address.yml would populate
537             # the {schema}->{personal}->{tables}->{address} element.
538             $base->visit(sub {
539 0     0     my $file = $_;
540 0 0         unless($_->is_file) {
541 0           $log->tracef('Skipping %s since it is not a file', "$_");
542 0           return;
543             }
544              
545 0           $self->load_from_file($base, $file, $loader);
546             }, {
547 0           recurse => 1,
548             follow_symlinks => 1
549             });
550 0           return $self->schema_definitions;
551             }
552              
553             sub load_from_file {
554 0     0 0   my ($self, $base, $file, $loader) = @_;
555 0           my $cfg = $self->schema_definitions;
556              
557             # Strip off the base prefix so that we have something that matches our
558             # desired hash data path
559 0 0         my $relative = $base ? substr $file, 1 + length($base->stringify) : '';
560              
561             # Also drop any file extensions
562 0           $relative =~ s{\.[^.]+$}{};
563              
564             # We now want to recurse into our configuration data stucture to the appropriate level.
565 0           my $target = do {
566 0           my $target = $cfg;
567 0           my (@path) = split qr{/}, $relative;
568 0   0       $target = ($target->{$_} //= {}) for @path;
569 0           $target
570             };
571              
572             # So at this point, $target indicates where we should load data into our structure.
573             # For now, we're blindly overwriting, but ideally we should merge the data structure
574             # recursively with the elements in the file.
575 0           $log->debugf('Pulling in configuration from %s', join '.', split qr{/}, $relative);
576 0           my $file_data = $loader->($file);
577 0 0         if(ref($file_data) eq 'ARRAY') {
    0          
578 0           push @$target, @$file_data;
579             } elsif(ref($file_data) eq 'HASH') {
580 0 0         $target->{defined_in} = $base ? substr $file, 1 + length($base->stringify) : $file;
581 0           @{$target}{keys %$file_data} = values %$file_data;
  0            
582             } else {
583 0           die 'Unknown data type in file ' . $file . ' - ' . ref($file_data) . " (actual value $file_data)";
584             }
585 0           $cfg;
586             }
587              
588             1;
589              
590             __END__