File Coverage

blib/lib/App/Sqitch/Engine/vertica.pm
Criterion Covered Total %
statement 94 179 52.5
branch 22 44 50.0
condition 15 39 38.4
subroutine 38 61 62.3
pod 19 19 100.0
total 188 342 54.9


line stmt bran cond sub pod time code
1             package App::Sqitch::Engine::vertica;
2              
3 3     3   29276 use 5.010;
  3         13  
4 3     3   21 use Moo;
  3         8  
  3         27  
5 3     3   1540 use utf8;
  3         7  
  3         27  
6 3     3   87 use Path::Class;
  3         13  
  3         255  
7 3     3   1445 use DBI;
  3         16262  
  3         198  
8 3     3   25 use Try::Tiny;
  3         8  
  3         255  
9 3     3   21 use App::Sqitch::X qw(hurl);
  3         6  
  3         39  
10 3     3   1202 use Locale::TextDomain qw(App-Sqitch);
  3         7  
  3         58  
11 3     3   771 use App::Sqitch::Types qw(DBH ArrayRef);
  3         7  
  3         40  
12              
13             extends 'App::Sqitch::Engine';
14              
15             our $VERSION = 'v1.6.1'; # VERSION
16              
17 5     5 1 6193 sub key { 'vertica' }
18 3     3 1 21 sub name { 'Vertica' }
19 1     1 1 4 sub driver { 'DBD::ODBC 1.59' }
20 1     1 1 181 sub default_client { 'vsql' }
21              
22             sub destination {
23 13     13 1 1008 my $self = shift;
24              
25             # Just use the target name if it doesn't look like a URI or if the URI
26             # includes the database name.
27 13 50 33     184 return $self->target->name if $self->target->name !~ /:/
28             || $self->target->uri->dbname;
29              
30             # Use the URI sans password, and with the database name added.
31 13         758 my $uri = $self->target->uri->clone;
32 13 50       200 $uri->password(undef) if $uri->password;
33 13   66     883 $uri->dbname( $ENV{VSQL_DATABASE} || $self->username );
34 13         1130 return $uri->as_string;
35             }
36              
37              
38 8 100   8   835 sub _def_user { $ENV{VSQL_USER} || shift->sqitch->sysuser }
39 4     4   502 sub _def_pass { $ENV{VSQL_PASSWORD} }
40              
41             has _vsql => (
42             is => 'ro',
43             isa => ArrayRef,
44             lazy => 1,
45             default => sub {
46             my $self = shift;
47             my $uri = $self->uri;
48             my @ret = ( $self->client );
49             # Use _port instead of port so it's empty if no port is in the URI.
50             # https://github.com/sqitchers/sqitch/issues/675
51             for my $spec (
52             [ username => $self->username ],
53             [ dbname => $uri->dbname ],
54             [ host => $uri->host ],
55             [ port => $uri->_port ],
56             ) {
57             push @ret, "--$spec->[0]" => $spec->[1] if $spec->[1];
58             }
59              
60             if (my %vars = $self->variables) {
61             push @ret => map {; '--set', "$_=$vars{$_}" } sort keys %vars;
62             }
63              
64             push @ret => $self->_client_opts;
65             return \@ret;
66             },
67             );
68              
69 27     27 1 14511 sub vsql { @{ shift->_vsql } }
  27         863  
70              
71             sub _dsn {
72 3     3   2192 my $self = shift;
73             # Set defaults in the URI.
74 3         110 my $uri = $self->uri;
75             # https://my.vertica.com/docs/5.1.6/HTML/index.htm#2736.htm
76 3 100 66     178 $uri->dbname($ENV{VSQL_DATABASE}) if !$uri->dbname && $ENV{VSQL_DATABASE};
77 3 50 33     253 $uri->host($ENV{VSQL_HOST}) if !$uri->host && $ENV{VSQL_HOST};
78 3 50 33     120 $uri->port($ENV{VSQL_PORT}) if !$uri->_port && $ENV{VSQL_PORT};
79 3         91 return $uri->dbi_dsn;
80             }
81              
82             has dbh => (
83             is => 'rw',
84             isa => DBH,
85             lazy => 1,
86             default => sub {
87             my $self = shift;
88             $self->use_driver;
89              
90             DBI->connect($self->_dsn, $self->username, $self->password, {
91             PrintError => 0,
92             RaiseError => 0,
93             AutoCommit => 1,
94             odbc_utf8_on => 1,
95             HandleError => $self->error_handler,
96             Callbacks => {
97             connected => sub {
98             my $dbh = shift;
99             $dbh->do('SET search_path = ' . $dbh->quote($self->registry))
100             or $self->_handle_no_registry($dbh);
101             return;
102             },
103             },
104             });
105             }
106             );
107              
108 1     1   6852 sub _listagg_format { undef } # Vertica has none!
109              
110             # Need to wait until dbh is defined.
111             with 'App::Sqitch::Role::DBIEngine';
112              
113             sub _client_opts {
114             return (
115 4     4   87 '--quiet',
116             '--no-vsqlrc',
117             '--no-align',
118             '--tuples-only',
119             '--set' => 'ON_ERROR_STOP=1',
120             '--set' => 'registry=' . shift->registry,
121             );
122             }
123              
124             sub _initialized {
125 0     0   0 my $self = shift;
126 0         0 return $self->dbh->selectcol_arrayref(q{
127             SELECT EXISTS(
128             SELECT TRUE FROM v_catalog.schemata WHERE schema_name = ?
129             )
130             }, undef, $self->registry)->[0];
131             }
132              
133             sub _initialize {
134 0     0   0 my $self = shift;
135 0         0 my $schema = $self->registry;
136 0 0       0 hurl engine => __x(
137             'Sqitch schema "{schema}" already exists',
138             schema => $schema
139             ) if $self->initialized;
140              
141 0         0 $self->_run_registry_file( file(__FILE__)->dir->file('vertica.sql') );
142 0         0 $self->dbh->do('SET search_path = ' . $self->dbh->quote($schema));
143 0         0 $self->_register_release;
144             }
145              
146             sub run_upgrade {
147 0     0 1 0 shift->_run_registry_file(@_);
148             }
149              
150             sub _run_registry_file {
151 0     0   0 my ($self, $file) = @_;
152              
153             # Check the database version.
154 0         0 my $vline = $self->dbh->selectcol_arrayref('SELECT version()')->[0];
155 0         0 my ($maj) = $vline =~ /\bv?(\d+)/;
156              
157             # Need to write a temp file; no :"registry" variable syntax.
158 0         0 my ($schema) = $self->dbh->selectrow_array(
159             'SELECT quote_ident(?)', undef, $self->registry
160             );
161 0         0 (my $sql = scalar $file->slurp) =~ s{:"registry"}{$schema}g;
162              
163             # Write out the temporary file.
164 0         0 require File::Temp;
165 0         0 my $fh = File::Temp->new;
166 0         0 print $fh $sql;
167 0         0 close $fh;
168              
169             # Now we can execute the file.
170 0         0 $self->_run_with_verbosity( $fh->filename );
171             }
172              
173             sub _no_table_error {
174 5   100 5   52 return $DBI::state && $DBI::state eq '42V01'; # ERRCODE_UNDEFINED_TABLE
175             }
176              
177             sub _no_column_error {
178 4   100 4   35 return $DBI::state && $DBI::state eq '42703'; # ERRCODE_UNDEFINED_COLUMN
179             }
180              
181             sub _unique_error {
182 0   0 0   0 return $DBI::state && $DBI::state eq '23505'; # ERRCODE_UNIQUE_VIOLATION
183             }
184              
185             sub _dt($) {
186 1     1   791 require App::Sqitch::DateTime;
187 1         16 return App::Sqitch::DateTime->new(split /:/ => shift);
188             }
189              
190             sub _multi_values {
191 0     0   0 my ($self, $count, $expr) = @_;
192 0         0 return join "\nUNION ALL ", ("SELECT $expr") x $count;
193             }
194              
195             sub _dependency_placeholders {
196 0     0   0 return 'CAST(? AS CHAR(40)), CAST(? AS VARCHAR), CAST(? AS VARCHAR), CAST(? AS CHAR(40))';
197             }
198              
199             sub _tag_placeholders {
200 0     0   0 my $self = shift;
201 0         0 return join(', ',
202             'CAST(? AS CHAR(40))',
203             'CAST(? AS VARCHAR)',
204             'CAST(? AS VARCHAR)',
205             'CAST(? AS CHAR(40))',
206             'CAST(? AS VARCHAR)',
207             'CAST(? AS VARCHAR)',
208             'CAST(? AS VARCHAR)',
209             'CAST(? AS TIMESTAMPTZ)',
210             'CAST(? AS VARCHAR)',
211             'CAST(? AS VARCHAR)',
212             $self->_ts_default,
213             );
214             }
215              
216             sub _tag_subselect_columns {
217 0     0   0 my $self = shift;
218 0         0 return join(', ',
219             'CAST(? AS CHAR(40)) AS tid',
220             'CAST(? AS VARCHAR) AS tname',
221             'CAST(? AS VARCHAR) AS proj',
222             'CAST(? AS CHAR(40)) AS cid',
223             'CAST(? AS VARCHAR) AS note',
224             'CAST(? AS VARCHAR) AS cuser',
225             'CAST(? AS VARCHAR) AS cemail',
226             'CAST(? AS TIMESTAMPTZ) AS tts',
227             'CAST(? AS VARCHAR) AS puser',
228             'CAST(? AS VARCHAR) AS pemail',
229             $self->_ts_default,
230             );
231             }
232              
233             sub _select_state {
234 0     0   0 my ( $self, $project, $with_hash ) = @_;
235 0         0 my $cdtcol = sprintf $self->_ts2char_format, 'c.committed_at';
236 0         0 my $pdtcol = sprintf $self->_ts2char_format, 'c.planned_at';
237 0 0       0 my $hshcol = $with_hash ? "c.script_hash\n , " : '';
238 0   0     0 return $self->dbh->selectrow_hashref(qq{
239             SELECT c.change_id
240             , ${hshcol}c.change
241             , c.project
242             , c.note
243             , c.committer_name
244             , c.committer_email
245             , $cdtcol AS committed_at
246             , c.planner_name
247             , c.planner_email
248             , $pdtcol AS planned_at
249             FROM changes c
250             WHERE c.project = ?
251             ORDER BY c.committed_at DESC
252             LIMIT 1
253             }, undef, $project // $self->plan->project );
254             }
255              
256             sub current_state {
257 1     1 1 678 my ( $self, $project ) = @_;
258             my $state = try {
259 1     1   135 $self->_select_state($project, 1)
260             } catch {
261 1 50 33 1   20 return if $self->_no_table_error && !$self->initialized;
262 1 50       4 return $self->_select_state($project, 0) if $self->_no_column_error;
263 1         12 die $_;
264 1 0       14 } or return undef;
265              
266             $state->{tags} = $self->dbh->selectcol_arrayref(
267             'SELECT tag FROM tags WHERE change_id = ? ORDER BY committed_at',
268             undef, $state->{change_id}
269 0         0 );
270 0         0 $state->{committed_at} = _dt $state->{committed_at};
271 0         0 $state->{planned_at} = _dt $state->{planned_at};
272 0         0 return $state;
273             }
274              
275             sub _deployed_changes {
276 0     0   0 my ($self, $sql, @params) = @_;
277 0         0 my $sth = $self->dbh->prepare($sql);
278 0         0 $sth->execute(@params);
279              
280 0         0 my ($last_id, @changes) = ('');
281 0         0 while (my $res = $sth->fetchrow_hashref) {
282 0 0       0 if ($res->{id} eq $last_id) {
283 0         0 push @{ $changes[-1]->{tags} } => $res->{tag};
  0         0  
284             } else {
285 0         0 $last_id = $res->{id};
286 0   0     0 $res->{tags} = [ delete $res->{tag} || () ];
287 0         0 $res->{timestamp} = _dt $res->{timestamp};
288 0         0 push @changes => $res;
289             }
290             }
291 0         0 return @changes;
292             }
293              
294             sub deployed_changes {
295 0     0 1 0 my $self = shift;
296 0         0 my $tscol = sprintf $self->_ts2char_format, 'c.planned_at';
297 0         0 return $self->_deployed_changes(qq{
298             SELECT c.change_id AS id, c.change AS name, c.project, c.note,
299             $tscol AS "timestamp", c.planner_name, c.planner_email,
300             t.tag AS tag, c.script_hash
301             FROM changes c
302             LEFT JOIN tags t ON c.change_id = t.change_id
303             WHERE c.project = ?
304             ORDER BY c.committed_at ASC
305             }, $self->plan->project);
306             }
307              
308             sub deployed_changes_since {
309 0     0 1 0 my ( $self, $change ) = @_;
310 0         0 my $tscol = sprintf $self->_ts2char_format, 'c.planned_at';
311 0         0 $self->_deployed_changes(qq{
312             SELECT c.change_id AS id, c.change AS name, c.project, c.note,
313             $tscol AS "timestamp", c.planner_name, c.planner_email,
314             t.tag AS tag, c.script_hash
315             FROM changes c
316             LEFT JOIN tags t ON c.change_id = t.change_id
317             WHERE c.project = ?
318             AND c.committed_at > (SELECT committed_at FROM changes WHERE change_id = ?)
319             ORDER BY c.committed_at ASC
320             }, $self->plan->project, $change->id);
321             }
322              
323             sub load_change {
324 0     0 1 0 my ( $self, $change_id ) = @_;
325 0         0 my $tscol = sprintf $self->_ts2char_format, 'c.planned_at';
326 0         0 my @res = $self->_deployed_changes(qq{
327             SELECT c.change_id AS id, c.change AS name, c.project, c.note,
328             $tscol AS "timestamp", c.planner_name, c.planner_email,
329             t.tag AS tag, c.script_hash
330             FROM changes c
331             LEFT JOIN tags t ON c.change_id = t.change_id
332             WHERE c.change_id = ?
333             }, $change_id);
334 0         0 return $res[0];
335             }
336              
337             sub _offset_op {
338 0     0   0 my ( $self, $offset ) = @_;
339 0 0       0 my ( $dir, $op ) = $offset > 0 ? ( 'ASC', '>' ) : ( 'DESC' , '<' );
340 0         0 return $dir, $op, 'OFFSET ' . (abs($offset) - 1);
341             }
342              
343             sub change_id_offset_from_id {
344 0     0 1 0 my ( $self, $change_id, $offset ) = @_;
345              
346             # Just return the ID if there is no offset.
347 0 0       0 return $change_id unless $offset;
348              
349             # Are we offset forwards or backwards?
350 0         0 my ($dir, $op, $offset_expr) = $self->_offset_op($offset);
351 0         0 return $self->dbh->selectcol_arrayref(qq{
352             SELECT change_id
353             FROM changes
354             WHERE project = ?
355             AND committed_at $op (
356             SELECT committed_at FROM changes WHERE change_id = ?
357             )
358             ORDER BY committed_at $dir
359             LIMIT 1 $offset_expr
360             }, undef, $self->plan->project, $change_id)->[0];
361             }
362              
363             sub change_offset_from_id {
364 0     0 1 0 my ( $self, $change_id, $offset ) = @_;
365              
366             # Just return the object if there is no offset.
367 0 0       0 return $self->load_change($change_id) unless $offset;
368              
369             # Are we offset forwards or backwards?
370 0         0 my ($dir, $op, $offset_expr) = $self->_offset_op($offset);
371 0         0 my $tscol = sprintf $self->_ts2char_format, 'c.planned_at';
372              
373 0         0 my @res = $self->_deployed_changes(qq{
374             SELECT c.change_id AS id, c.change AS name, c.project, c.note,
375             $tscol AS "timestamp", c.planner_name, c.planner_email,
376             t.tag AS tag, c.script_hash
377             FROM changes c
378             LEFT JOIN tags t ON c.change_id = t.change_id
379             WHERE c.project = ?
380             AND c.committed_at $op (
381             SELECT committed_at FROM changes WHERE change_id = ?
382             )
383             ORDER BY c.committed_at $dir
384             $offset_expr
385             }, $self->plan->project, $change_id);
386 0         0 return $res[0];
387             }
388              
389             sub _ts2char_format {
390 1     1   1596 q{to_char(%s AT TIME ZONE 'UTC', '"year":YYYY:"month":MM:"day":DD:"hour":HH24:"minute":MI:"second":SS:"time_zone":"UTC"')};
391             }
392              
393 0     0   0 sub _ts_default { 'clock_timestamp()' }
394              
395 0     0   0 sub _char2ts { $_[1]->as_string(format => 'iso') }
396              
397 0     0   0 sub _regex_op { '~' }
398              
399             # Override to lock the changes table. This ensures that only one instance of
400             # Sqitch runs at one time.
401             sub begin_work {
402 0     0 1 0 my $self = shift;
403 0         0 my $dbh = $self->dbh;
404              
405             # Start transaction and lock changes to allow only one change at a time.
406 0         0 $dbh->begin_work;
407 0         0 $dbh->do('LOCK TABLE changes IN EXCLUSIVE MODE');
408 0         0 return $self;
409             }
410              
411             sub run_file {
412 1     1 1 855 my ($self, $file) = @_;
413 1         7 $self->_run('--file' => $file);
414             }
415              
416 2     2 1 2099 sub run_verify { shift->_run_with_verbosity(@_) }
417              
418             sub _run_with_verbosity {
419 2     2   5 my $self = shift;
420 2 100       52 my $meth = $self->can($self->sqitch->verbosity > 1 ? '_run' : '_capture');
421 2         244 return $self->$meth('--file' => @_);
422             }
423              
424             sub run_handle {
425 1     1 1 976 my ($self, $fh) = @_;
426 1         5 $self->_spool($fh);
427             }
428              
429             sub _cid {
430 1     1   650 my ( $self, $ord, $offset, $project ) = @_;
431              
432 1 50       5 my $offexpr = $offset ? " OFFSET $offset" : '';
433             return try {
434 1   0 1   114 return $self->dbh->selectcol_arrayref(qq{
435             SELECT change_id
436             FROM changes
437             WHERE project = ?
438             ORDER BY committed_at $ord
439             LIMIT 1$offexpr
440             }, undef, $project || $self->plan->project)->[0];
441             } catch {
442 1 50 33 1   18 return if $self->_no_table_error && !$self->initialized;
443 1         14 die $_;
444 1         11 };
445             }
446              
447             sub changes_requiring_change {
448 0     0 1 0 my ( $self, $change ) = @_;
449             # Why CTE: https://forums.oracle.com/forums/thread.jspa?threadID=1005221
450 0         0 return @{ $self->dbh->selectall_arrayref(q{
  0         0  
451             WITH tag AS (
452             SELECT tag, committed_at, project,
453             ROW_NUMBER() OVER (partition by project ORDER BY committed_at) AS rnk
454             FROM tags
455             )
456             SELECT c.change_id, c.project, c.change, t.tag AS asof_tag
457             FROM dependencies d
458             JOIN changes c ON c.change_id = d.change_id
459             LEFT JOIN tag t ON t.project = c.project AND t.committed_at >= c.committed_at
460             WHERE d.dependency_id = ?
461             AND (t.rnk IS NULL OR t.rnk = 1)
462             }, { Slice => {} }, $change->id) };
463             }
464              
465             sub name_for_change_id {
466 0     0 1 0 my ( $self, $change_id ) = @_;
467             # Why CTE: https://forums.oracle.com/forums/thread.jspa?threadID=1005221
468 0         0 return $self->dbh->selectcol_arrayref(q{
469             WITH tag AS (
470             SELECT tag, committed_at, project,
471             ROW_NUMBER() OVER (partition by project ORDER BY committed_at) AS rnk
472             FROM tags
473             )
474             SELECT change || COALESCE(t.tag, '@HEAD')
475             FROM changes c
476             LEFT JOIN tag t ON c.project = t.project AND t.committed_at >= c.committed_at
477             WHERE change_id = ?
478             AND (t.rnk IS NULL OR t.rnk = 1)
479             }, undef, $change_id)->[0];
480             }
481              
482             sub _run {
483 4     4   3450 my $self = shift;
484 4         17 my $sqitch = $self->sqitch;
485 4 100       179 my $pass = $self->password or return $sqitch->run( $self->vsql, @_ );
486 1         189 local $ENV{VSQL_PASSWORD} = $pass;
487 1         7 return $sqitch->run( $self->vsql, @_ );
488             }
489              
490             sub _capture {
491 3     3   1862 my $self = shift;
492 3         14 my $sqitch = $self->sqitch;
493 3 100       100 my $pass = $self->password or return $sqitch->capture( $self->vsql, @_ );
494 1         22 local $ENV{VSQL_PASSWORD} = $pass;
495 1         5 return $sqitch->capture( $self->vsql, @_ );
496             }
497              
498             sub _probe {
499 2     2   1865 my $self = shift;
500 2         11 my $sqitch = $self->sqitch;
501 2 100       76 my $pass = $self->password or return $sqitch->probe( $self->vsql, @_ );
502 1         19 local $ENV{VSQL_PASSWORD} = $pass;
503 1         5 return $sqitch->probe( $self->vsql, @_ );
504             }
505              
506             sub _spool {
507 3     3   1946 my $self = shift;
508 3         8 my $fh = shift;
509 3         11 my $sqitch = $self->sqitch;
510 3 100       106 my $pass = $self->password or return $sqitch->spool( $fh, $self->vsql, @_ );
511 1         21 local $ENV{VSQL_PASSWORD} = $pass;
512 1         5 return $sqitch->spool( $fh, $self->vsql, @_ );
513             }
514              
515             1;
516              
517             __END__
518              
519             =head1 Name
520              
521             App::Sqitch::Engine::vertica - Sqitch Vertica Engine
522              
523             =head1 Synopsis
524              
525             my $vertica = App::Sqitch::Engine->load( engine => 'vertica' );
526              
527             =head1 Description
528              
529             App::Sqitch::Engine::vertica provides the Vertica storage engine for Sqitch.
530             It supports Vertica 6.
531              
532             =head1 Interface
533              
534             =head2 Instance Methods
535              
536             =head3 C<initialized>
537              
538             $vertica->initialize unless $vertica->initialized;
539              
540             Returns true if the database has been initialized for Sqitch, and false if it
541             has not.
542              
543             =head3 C<initialize>
544              
545             $vertica->initialize;
546              
547             Initializes a database for Sqitch by installing the Sqitch registry schema.
548              
549             =head3 C<vsql>
550              
551             Returns a list containing the C<vsql> client and options to be passed to it.
552             Used internally when executing scripts.
553              
554             =head1 Author
555              
556             David E. Wheeler <david@justatheory.com>
557              
558             =head1 License
559              
560             Copyright (c) 2012-2026 David E. Wheeler, 2012-2021 iovation Inc.
561              
562             Permission is hereby granted, free of charge, to any person obtaining a copy
563             of this software and associated documentation files (the "Software"), to deal
564             in the Software without restriction, including without limitation the rights
565             to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
566             copies of the Software, and to permit persons to whom the Software is
567             furnished to do so, subject to the following conditions:
568              
569             The above copyright notice and this permission notice shall be included in all
570             copies or substantial portions of the Software.
571              
572             THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
573             IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
574             FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
575             AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
576             LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
577             OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
578             SOFTWARE.
579              
580             =cut