File Coverage

blib/lib/App/Sqitch/Engine/vertica.pm
Criterion Covered Total %
statement 88 173 50.8
branch 18 38 47.3
condition 11 30 36.6
subroutine 37 60 61.6
pod 19 19 100.0
total 173 320 54.0


line stmt bran cond sub pod time code
1             package App::Sqitch::Engine::vertica;
2              
3 3     3   23192 use 5.010;
  3         13  
4 3     3   23 use Moo;
  3         8  
  3         219  
5 3     3   1638 use utf8;
  3         8  
  3         22  
6 3     3   90 use Path::Class;
  3         9  
  3         180  
7 3     3   1628 use DBI;
  3         18443  
  3         132  
8 3     3   21 use Try::Tiny;
  3         11  
  3         188  
9 3     3   20 use App::Sqitch::X qw(hurl);
  3         11  
  3         33  
10 3     3   1489 use Locale::TextDomain qw(App-Sqitch);
  3         9  
  3         24  
11 3     3   672 use App::Sqitch::Types qw(DBH ArrayRef);
  3         9  
  3         34  
12              
13             extends 'App::Sqitch::Engine';
14              
15             our $VERSION = 'v1.4.0'; # VERSION
16              
17 4     4 1 12618 sub key { 'vertica' }
18 3     3 1 44 sub name { 'Vertica' }
19 1     1 1 4 sub driver { 'DBD::ODBC 1.59' }
20 1     1 1 203 sub default_client { 'vsql' }
21              
22             sub destination {
23 13     13 1 594 my $self = shift;
24              
25             # Just use the target name if it doesn't look like a URI or if the URI
26             # includes the database name.
27 13 50 33     150 return $self->target->name if $self->target->name !~ /:/
28             || $self->target->uri->dbname;
29              
30             # Use the URI sans password, and with the database name added.
31 13         615 my $uri = $self->target->uri->clone;
32 13 50       167 $uri->password(undef) if $uri->password;
33 13   66     586 $uri->dbname( $ENV{VSQL_DATABASE} || $self->username );
34 13         901 return $uri->as_string;
35             }
36              
37              
38 8 100   8   447 sub _def_user { $ENV{VSQL_USER} || shift->sqitch->sysuser }
39 4     4   250 sub _def_pass { $ENV{VSQL_PASSWORD} }
40              
41             has _vsql => (
42             is => 'ro',
43             isa => ArrayRef,
44             lazy => 1,
45             default => sub {
46             my $self = shift;
47             my $uri = $self->uri;
48             my @ret = ( $self->client );
49             # Use _port instead of port so it's empty if no port is in the URI.
50             # https://github.com/sqitchers/sqitch/issues/675
51             for my $spec (
52             [ username => $self->username ],
53             [ dbname => $uri->dbname ],
54             [ host => $uri->host ],
55             [ port => $uri->_port ],
56             ) {
57             push @ret, "--$spec->[0]" => $spec->[1] if $spec->[1];
58             }
59              
60             if (my %vars = $self->variables) {
61             push @ret => map {; '--set', "$_=$vars{$_}" } sort keys %vars;
62             }
63              
64             push @ret => $self->_client_opts;
65             return \@ret;
66             },
67             );
68              
69 27     27 1 11417 sub vsql { @{ shift->_vsql } }
  27         560  
70              
71             has dbh => (
72             is => 'rw',
73             isa => DBH,
74             lazy => 1,
75             default => sub {
76             my $self = shift;
77             $self->use_driver;
78              
79             # Set defaults in the URI.
80             my $target = $self->target;
81             my $uri = $self->uri;
82             # https://my.vertica.com/docs/5.1.6/HTML/index.htm#2736.htm
83             $uri->dbname($ENV{VSQL_DATABASE}) if !$uri->dbname && $ENV{VSQL_DATABASE};
84             $uri->host($ENV{VSQL_HOST}) if !$uri->host && $ENV{VSQL_HOST};
85             $uri->port($ENV{VSQL_PORT}) if !$uri->_port && $ENV{VSQL_PORT};
86              
87             DBI->connect($uri->dbi_dsn, $self->username, $self->password, {
88             PrintError => 0,
89             RaiseError => 0,
90             AutoCommit => 1,
91             odbc_utf8_on => 1,
92             HandleError => sub {
93             my ($err, $dbh) = @_;
94             $@ = $err;
95             @_ = ($dbh->state || 'DEV' => $dbh->errstr);
96             goto &hurl;
97             },
98             Callbacks => {
99             connected => sub {
100             my $dbh = shift;
101             $dbh->do('SET search_path = ' . $dbh->quote($self->registry))
102             or $self->_handle_no_registry($dbh);
103             return;
104             },
105             },
106             });
107             }
108             );
109              
110 1     1   6315 sub _listagg_format { undef } # Vertica has none!
111              
112             # Need to wait until dbh is defined.
113             with 'App::Sqitch::Role::DBIEngine';
114              
115             sub _client_opts {
116             return (
117 4     4   71 '--quiet',
118             '--no-vsqlrc',
119             '--no-align',
120             '--tuples-only',
121             '--set' => 'ON_ERROR_STOP=1',
122             '--set' => 'registry=' . shift->registry,
123             );
124             }
125              
126             sub _initialized {
127 0     0   0 my $self = shift;
128 0         0 return $self->dbh->selectcol_arrayref(q{
129             SELECT EXISTS(
130             SELECT TRUE FROM v_catalog.schemata WHERE schema_name = ?
131             )
132             }, undef, $self->registry)->[0];
133             }
134              
135             sub _initialize {
136 0     0   0 my $self = shift;
137 0         0 my $schema = $self->registry;
138 0 0       0 hurl engine => __x(
139             'Sqitch schema "{schema}" already exists',
140             schema => $schema
141             ) if $self->initialized;
142              
143 0         0 $self->_run_registry_file( file(__FILE__)->dir->file('vertica.sql') );
144 0         0 $self->dbh->do('SET search_path = ' . $self->dbh->quote($schema));
145 0         0 $self->_register_release;
146             }
147              
148             sub run_upgrade {
149 0     0 1 0 shift->_run_registry_file(@_);
150             }
151              
152             sub _run_registry_file {
153 0     0   0 my ($self, $file) = @_;
154              
155             # Check the database version.
156 0         0 my $vline = $self->dbh->selectcol_arrayref('SELECT version()')->[0];
157 0         0 my ($maj) = $vline =~ /\bv?(\d+)/;
158              
159             # Need to write a temp file; no :"registry" variable syntax.
160 0         0 my ($schema) = $self->dbh->selectrow_array(
161             'SELECT quote_ident(?)', undef, $self->registry
162             );
163 0         0 (my $sql = scalar $file->slurp) =~ s{:"registry"}{$schema}g;
164              
165             # Write out the temporary file.
166 0         0 require File::Temp;
167 0         0 my $fh = File::Temp->new;
168 0         0 print $fh $sql;
169 0         0 close $fh;
170              
171             # Now we can execute the file.
172 0         0 $self->_run_with_verbosity( $fh->filename );
173             }
174              
175             sub _no_table_error {
176 5   100 5   60 return $DBI::state && $DBI::state eq '42V01'; # ERRCODE_UNDEFINED_TABLE
177             }
178              
179             sub _no_column_error {
180 4   100 4   40 return $DBI::state && $DBI::state eq '42703'; # ERRCODE_UNDEFINED_COLUMN
181             }
182              
183             sub _unique_error {
184 0   0 0   0 return $DBI::state && $DBI::state eq '23505'; # ERRCODE_UNIQUE_VIOLATION
185             }
186              
187             sub _dt($) {
188 1     1   808 require App::Sqitch::DateTime;
189 1         23 return App::Sqitch::DateTime->new(split /:/ => shift);
190             }
191              
192             sub _multi_values {
193 0     0   0 my ($self, $count, $expr) = @_;
194 0         0 return join "\nUNION ALL ", ("SELECT $expr") x $count;
195             }
196              
197             sub _dependency_placeholders {
198 0     0   0 return 'CAST(? AS CHAR(40)), CAST(? AS VARCHAR), CAST(? AS VARCHAR), CAST(? AS CHAR(40))';
199             }
200              
201             sub _tag_placeholders {
202 0     0   0 my $self = shift;
203 0         0 return join(', ',
204             'CAST(? AS CHAR(40))',
205             'CAST(? AS VARCHAR)',
206             'CAST(? AS VARCHAR)',
207             'CAST(? AS CHAR(40))',
208             'CAST(? AS VARCHAR)',
209             'CAST(? AS VARCHAR)',
210             'CAST(? AS VARCHAR)',
211             'CAST(? AS TIMESTAMPTZ)',
212             'CAST(? AS VARCHAR)',
213             'CAST(? AS VARCHAR)',
214             $self->_ts_default,
215             );
216             }
217              
218             sub _tag_subselect_columns {
219 0     0   0 my $self = shift;
220 0         0 return join(', ',
221             'CAST(? AS CHAR(40)) AS tid',
222             'CAST(? AS VARCHAR) AS tname',
223             'CAST(? AS VARCHAR) AS proj',
224             'CAST(? AS CHAR(40)) AS cid',
225             'CAST(? AS VARCHAR) AS note',
226             'CAST(? AS VARCHAR) AS cuser',
227             'CAST(? AS VARCHAR) AS cemail',
228             'CAST(? AS TIMESTAMPTZ) AS tts',
229             'CAST(? AS VARCHAR) AS puser',
230             'CAST(? AS VARCHAR) AS pemail',
231             $self->_ts_default,
232             );
233             }
234              
235             sub _select_state {
236 0     0   0 my ( $self, $project, $with_hash ) = @_;
237 0         0 my $cdtcol = sprintf $self->_ts2char_format, 'c.committed_at';
238 0         0 my $pdtcol = sprintf $self->_ts2char_format, 'c.planned_at';
239 0 0       0 my $hshcol = $with_hash ? "c.script_hash\n , " : '';
240 0   0     0 return $self->dbh->selectrow_hashref(qq{
241             SELECT c.change_id
242             , ${hshcol}c.change
243             , c.project
244             , c.note
245             , c.committer_name
246             , c.committer_email
247             , $cdtcol AS committed_at
248             , c.planner_name
249             , c.planner_email
250             , $pdtcol AS planned_at
251             FROM changes c
252             WHERE c.project = ?
253             ORDER BY c.committed_at DESC
254             LIMIT 1
255             }, undef, $project // $self->plan->project );
256             }
257              
258             sub current_state {
259 1     1 1 615 my ( $self, $project ) = @_;
260             my $state = try {
261 1     1   152 $self->_select_state($project, 1)
262             } catch {
263 1 50 33 1   38 return if $self->_no_table_error && !$self->initialized;
264 1 50       9 return $self->_select_state($project, 0) if $self->_no_column_error;
265 1         11 die $_;
266 1 0       14 } or return undef;
267              
268             $state->{tags} = $self->dbh->selectcol_arrayref(
269             'SELECT tag FROM tags WHERE change_id = ? ORDER BY committed_at',
270             undef, $state->{change_id}
271 0         0 );
272 0         0 $state->{committed_at} = _dt $state->{committed_at};
273 0         0 $state->{planned_at} = _dt $state->{planned_at};
274 0         0 return $state;
275             }
276              
277             sub _deployed_changes {
278 0     0   0 my ($self, $sql, @params) = @_;
279 0         0 my $sth = $self->dbh->prepare($sql);
280 0         0 $sth->execute(@params);
281              
282 0         0 my ($last_id, @changes) = ('');
283 0         0 while (my $res = $sth->fetchrow_hashref) {
284 0 0       0 if ($res->{id} eq $last_id) {
285 0         0 push @{ $changes[-1]->{tags} } => $res->{tag};
  0         0  
286             } else {
287 0         0 $last_id = $res->{id};
288 0   0     0 $res->{tags} = [ delete $res->{tag} || () ];
289 0         0 $res->{timestamp} = _dt $res->{timestamp};
290 0         0 push @changes => $res;
291             }
292             }
293 0         0 return @changes;
294             }
295              
296             sub deployed_changes {
297 0     0 1 0 my $self = shift;
298 0         0 my $tscol = sprintf $self->_ts2char_format, 'c.planned_at';
299 0         0 return $self->_deployed_changes(qq{
300             SELECT c.change_id AS id, c.change AS name, c.project, c.note,
301             $tscol AS "timestamp", c.planner_name, c.planner_email,
302             t.tag AS tag, c.script_hash
303             FROM changes c
304             LEFT JOIN tags t ON c.change_id = t.change_id
305             WHERE c.project = ?
306             ORDER BY c.committed_at ASC
307             }, $self->plan->project);
308             }
309              
310             sub deployed_changes_since {
311 0     0 1 0 my ( $self, $change ) = @_;
312 0         0 my $tscol = sprintf $self->_ts2char_format, 'c.planned_at';
313 0         0 $self->_deployed_changes(qq{
314             SELECT c.change_id AS id, c.change AS name, c.project, c.note,
315             $tscol AS "timestamp", c.planner_name, c.planner_email,
316             t.tag AS tag, c.script_hash
317             FROM changes c
318             LEFT JOIN tags t ON c.change_id = t.change_id
319             WHERE c.project = ?
320             AND c.committed_at > (SELECT committed_at FROM changes WHERE change_id = ?)
321             ORDER BY c.committed_at ASC
322             }, $self->plan->project, $change->id);
323             }
324              
325             sub load_change {
326 0     0 1 0 my ( $self, $change_id ) = @_;
327 0         0 my $tscol = sprintf $self->_ts2char_format, 'c.planned_at';
328 0         0 my @res = $self->_deployed_changes(qq{
329             SELECT c.change_id AS id, c.change AS name, c.project, c.note,
330             $tscol AS "timestamp", c.planner_name, c.planner_email,
331             t.tag AS tag, c.script_hash
332             FROM changes c
333             LEFT JOIN tags t ON c.change_id = t.change_id
334             WHERE c.change_id = ?
335             }, $change_id);
336 0         0 return $res[0];
337             }
338              
339             sub _offset_op {
340 0     0   0 my ( $self, $offset ) = @_;
341 0 0       0 my ( $dir, $op ) = $offset > 0 ? ( 'ASC', '>' ) : ( 'DESC' , '<' );
342 0         0 return $dir, $op, 'OFFSET ' . (abs($offset) - 1);
343             }
344              
345             sub change_id_offset_from_id {
346 0     0 1 0 my ( $self, $change_id, $offset ) = @_;
347              
348             # Just return the ID if there is no offset.
349 0 0       0 return $change_id unless $offset;
350              
351             # Are we offset forwards or backwards?
352 0         0 my ($dir, $op, $offset_expr) = $self->_offset_op($offset);
353 0         0 return $self->dbh->selectcol_arrayref(qq{
354             SELECT change_id
355             FROM changes
356             WHERE project = ?
357             AND committed_at $op (
358             SELECT committed_at FROM changes WHERE change_id = ?
359             )
360             ORDER BY committed_at $dir
361             LIMIT 1 $offset_expr
362             }, undef, $self->plan->project, $change_id)->[0];
363             }
364              
365             sub change_offset_from_id {
366 0     0 1 0 my ( $self, $change_id, $offset ) = @_;
367              
368             # Just return the object if there is no offset.
369 0 0       0 return $self->load_change($change_id) unless $offset;
370              
371             # Are we offset forwards or backwards?
372 0         0 my ($dir, $op, $offset_expr) = $self->_offset_op($offset);
373 0         0 my $tscol = sprintf $self->_ts2char_format, 'c.planned_at';
374              
375 0         0 my @res = $self->_deployed_changes(qq{
376             SELECT c.change_id AS id, c.change AS name, c.project, c.note,
377             $tscol AS "timestamp", c.planner_name, c.planner_email,
378             t.tag AS tag, c.script_hash
379             FROM changes c
380             LEFT JOIN tags t ON c.change_id = t.change_id
381             WHERE c.project = ?
382             AND c.committed_at $op (
383             SELECT committed_at FROM changes WHERE change_id = ?
384             )
385             ORDER BY c.committed_at $dir
386             $offset_expr
387             }, $self->plan->project, $change_id);
388 0         0 return $res[0];
389             }
390              
391             sub _ts2char_format {
392 1     1   1249 q{to_char(%s AT TIME ZONE 'UTC', '"year":YYYY:"month":MM:"day":DD:"hour":HH24:"minute":MI:"second":SS:"time_zone":"UTC"')};
393             }
394              
395 0     0   0 sub _ts_default { 'clock_timestamp()' }
396              
397 0     0   0 sub _char2ts { $_[1]->as_string(format => 'iso') }
398              
399 0     0   0 sub _regex_op { '~' }
400              
401             # Override to lock the changes table. This ensures that only one instance of
402             # Sqitch runs at one time.
403             sub begin_work {
404 0     0 1 0 my $self = shift;
405 0         0 my $dbh = $self->dbh;
406              
407             # Start transaction and lock changes to allow only one change at a time.
408 0         0 $dbh->begin_work;
409 0         0 $dbh->do('LOCK TABLE changes IN EXCLUSIVE MODE');
410 0         0 return $self;
411             }
412              
413             sub run_file {
414 1     1 1 730 my ($self, $file) = @_;
415 1         6 $self->_run('--file' => $file);
416             }
417              
418 2     2 1 1554 sub run_verify { shift->_run_with_verbosity(@_) }
419              
420             sub _run_with_verbosity {
421 2     2   9 my $self = shift;
422 2 100       35 my $meth = $self->can($self->sqitch->verbosity > 1 ? '_run' : '_capture');
423 2         157 return $self->$meth('--file' => @_);
424             }
425              
426             sub run_handle {
427 1     1 1 747 my ($self, $fh) = @_;
428 1         5 $self->_spool($fh);
429             }
430              
431             sub _cid {
432 1     1   632 my ( $self, $ord, $offset, $project ) = @_;
433              
434 1 50       5 my $offexpr = $offset ? " OFFSET $offset" : '';
435             return try {
436 1   0 1   105 return $self->dbh->selectcol_arrayref(qq{
437             SELECT change_id
438             FROM changes
439             WHERE project = ?
440             ORDER BY committed_at $ord
441             LIMIT 1$offexpr
442             }, undef, $project || $self->plan->project)->[0];
443             } catch {
444 1 50 33 1   21 return if $self->_no_table_error && !$self->initialized;
445 1         21 die $_;
446 1         11 };
447             }
448              
449             sub changes_requiring_change {
450 0     0 1 0 my ( $self, $change ) = @_;
451             # Why CTE: https://forums.oracle.com/forums/thread.jspa?threadID=1005221
452 0         0 return @{ $self->dbh->selectall_arrayref(q{
  0         0  
453             WITH tag AS (
454             SELECT tag, committed_at, project,
455             ROW_NUMBER() OVER (partition by project ORDER BY committed_at) AS rnk
456             FROM tags
457             )
458             SELECT c.change_id, c.project, c.change, t.tag AS asof_tag
459             FROM dependencies d
460             JOIN changes c ON c.change_id = d.change_id
461             LEFT JOIN tag t ON t.project = c.project AND t.committed_at >= c.committed_at
462             WHERE d.dependency_id = ?
463             AND (t.rnk IS NULL OR t.rnk = 1)
464             }, { Slice => {} }, $change->id) };
465             }
466              
467             sub name_for_change_id {
468 0     0 1 0 my ( $self, $change_id ) = @_;
469             # Why CTE: https://forums.oracle.com/forums/thread.jspa?threadID=1005221
470 0         0 return $self->dbh->selectcol_arrayref(q{
471             WITH tag AS (
472             SELECT tag, committed_at, project,
473             ROW_NUMBER() OVER (partition by project ORDER BY committed_at) AS rnk
474             FROM tags
475             )
476             SELECT change || COALESCE(t.tag, '@HEAD')
477             FROM changes c
478             LEFT JOIN tag t ON c.project = t.project AND t.committed_at >= c.committed_at
479             WHERE change_id = ?
480             AND (t.rnk IS NULL OR t.rnk = 1)
481             }, undef, $change_id)->[0];
482             }
483              
484             sub _run {
485 4     4   2218 my $self = shift;
486 4         13 my $sqitch = $self->sqitch;
487 4 100       94 my $pass = $self->password or return $sqitch->run( $self->vsql, @_ );
488 1         149 local $ENV{VSQL_PASSWORD} = $pass;
489 1         7 return $sqitch->run( $self->vsql, @_ );
490             }
491              
492             sub _capture {
493 3     3   1437 my $self = shift;
494 3         12 my $sqitch = $self->sqitch;
495 3 100       66 my $pass = $self->password or return $sqitch->capture( $self->vsql, @_ );
496 1         21 local $ENV{VSQL_PASSWORD} = $pass;
497 1         6 return $sqitch->capture( $self->vsql, @_ );
498             }
499              
500             sub _probe {
501 2     2   1462 my $self = shift;
502 2         10 my $sqitch = $self->sqitch;
503 2 100       49 my $pass = $self->password or return $sqitch->probe( $self->vsql, @_ );
504 1         18 local $ENV{VSQL_PASSWORD} = $pass;
505 1         7 return $sqitch->probe( $self->vsql, @_ );
506             }
507              
508             sub _spool {
509 3     3   1490 my $self = shift;
510 3         7 my $fh = shift;
511 3         11 my $sqitch = $self->sqitch;
512 3 100       75 my $pass = $self->password or return $sqitch->spool( $fh, $self->vsql, @_ );
513 1         26 local $ENV{VSQL_PASSWORD} = $pass;
514 1         5 return $sqitch->spool( $fh, $self->vsql, @_ );
515             }
516              
517             1;
518              
519             __END__
520              
521             =head1 Name
522              
523             App::Sqitch::Engine::vertica - Sqitch Vertica Engine
524              
525             =head1 Synopsis
526              
527             my $vertica = App::Sqitch::Engine->load( engine => 'vertica' );
528              
529             =head1 Description
530              
531             App::Sqitch::Engine::vertica provides the Vertica storage engine for Sqitch.
532             It supports Vertica 6.
533              
534             =head1 Interface
535              
536             =head2 Instance Methods
537              
538             =head3 C<initialized>
539              
540             $vertica->initialize unless $vertica->initialized;
541              
542             Returns true if the database has been initialized for Sqitch, and false if it
543             has not.
544              
545             =head3 C<initialize>
546              
547             $vertica->initialize;
548              
549             Initializes a database for Sqitch by installing the Sqitch registry schema.
550              
551             =head3 C<vsql>
552              
553             Returns a list containing the C<vsql> client and options to be passed to it.
554             Used internally when executing scripts.
555              
556             =head1 Author
557              
558             David E. Wheeler <david@justatheory.com>
559              
560             =head1 License
561              
562             Copyright (c) 2012-2023 iovation Inc., David E. Wheeler
563              
564             Permission is hereby granted, free of charge, to any person obtaining a copy
565             of this software and associated documentation files (the "Software"), to deal
566             in the Software without restriction, including without limitation the rights
567             to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
568             copies of the Software, and to permit persons to whom the Software is
569             furnished to do so, subject to the following conditions:
570              
571             The above copyright notice and this permission notice shall be included in all
572             copies or substantial portions of the Software.
573              
574             THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
575             IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
576             FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
577             AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
578             LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
579             OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
580             SOFTWARE.
581              
582             =cut