File Coverage

blib/lib/App/Sqitch/Engine/pg.pm
Criterion Covered Total %
statement 117 165 70.9
branch 28 42 66.6
condition 13 22 59.0
subroutine 35 54 64.8
pod 15 15 100.0
total 208 298 69.8


line stmt bran cond sub pod time code
1             package App::Sqitch::Engine::pg;
2              
3 9     9   130740 use 5.010;
  9         36  
4 9     9   56 use Moo;
  9         29  
  9         88  
5 9     9   3586 use utf8;
  9         22  
  9         70  
6 9     9   246 use Path::Class;
  9         21  
  9         565  
7 9     9   3762 use DBI;
  9         39419  
  9         412  
8 9     9   78 use Try::Tiny;
  9         28  
  9         619  
9 9     9   71 use App::Sqitch::X qw(hurl);
  9         27  
  9         112  
10 9     9   3046 use Locale::TextDomain qw(App-Sqitch);
  9         37  
  9         67  
11 9     9   2011 use App::Sqitch::Plan::Change;
  9         25  
  9         363  
12 9     9   56 use List::Util qw(first);
  9         26  
  9         740  
13 9     9   85 use App::Sqitch::Types qw(DBH ArrayRef);
  9         26  
  9         131  
14 9     9   9090 use Type::Utils qw(enum);
  9         28  
  9         97  
15 9     9   5681 use namespace::autoclean;
  9         26  
  9         78  
16              
17             extends 'App::Sqitch::Engine';
18              
19             our $VERSION = 'v1.4.0'; # VERSION
20              
21             sub destination {
22 26     26 1 1250 my $self = shift;
23              
24             # Just use the target name if it doesn't look like a URI or if the URI
25             # includes the database name.
26 26 50 33     268 return $self->target->name if $self->target->name !~ /:/
27             || $self->target->uri->dbname;
28              
29             # Use the URI sans password, and with the database name added.
30 26         1282 my $uri = $self->target->uri->clone;
31 26 50       396 $uri->password(undef) if $uri->password;
32             $uri->dbname(
33             $ENV{PGDATABASE}
34             || $self->username
35             || $ENV{PGUSER}
36 26   66     1314 || $self->sqitch->sysuser
37             );
38 26         1934 return $uri->as_string;
39             }
40              
41             # DBD::pg and psql use fallbacks consistently, thanks to libpq. These include
42             # environment variables, system info (username), the password file, and the
43             # connection service file. Best for us not to second-guess these values,
44             # though we admittedly try when setting the database name in the destination
45             # URI for unnamed targets a few lines up from here.
46       18     sub _def_user { }
47       6     sub _def_pass { }
48              
49             has _psql => (
50             is => 'ro',
51             isa => ArrayRef,
52             lazy => 1,
53             default => sub {
54             my $self = shift;
55             my $uri = $self->uri;
56             my @ret = ( $self->client );
57              
58             my %query_params = $uri->query_params;
59             my @conninfo;
60             # Use _port instead of port so it's empty if no port is in the URI.
61             # https://github.com/sqitchers/sqitch/issues/675
62             for my $spec (
63             [ user => $self->username ],
64             [ dbname => $uri->dbname ],
65             [ host => $uri->host ],
66             [ port => $uri->_port ],
67             map { [ $_ => $query_params{$_} ] }
68             sort keys %query_params,
69             ) {
70             next unless defined $spec->[1] && length $spec->[1];
71             if ($spec->[1] =~ /[ "'\\]/) {
72             $spec->[1] =~ s/([ "'\\])/\\$1/g;
73             }
74             push @conninfo, "$spec->[0]=$spec->[1]";
75             }
76              
77             push @ret => '--dbname', join ' ', @conninfo if @conninfo;
78              
79             if (my %vars = $self->variables) {
80             push @ret => map {; '--set', "$_=$vars{$_}" } sort keys %vars;
81             }
82              
83             push @ret => $self->_client_opts;
84             return \@ret;
85             },
86             );
87              
88             sub _client_opts {
89 12     12   34 my $self = shift;
90             return (
91 12         216 '--quiet',
92             '--no-psqlrc',
93             '--no-align',
94             '--tuples-only',
95             '--set' => 'ON_ERROR_STOP=1',
96             '--set' => 'registry=' . $self->registry,
97             );
98             }
99              
100 58     58 1 23150 sub psql { @{ shift->_psql } }
  58         1368  
101              
102 4     4 1 8558 sub key { 'pg' }
103 3     3 1 45 sub name { 'PostgreSQL' }
104 0     0 1 0 sub driver { 'DBD::Pg 2.0' }
105 11     11 1 1392 sub default_client { 'psql' }
106              
107             has _provider => (
108             is => 'rw',
109             isa => enum([qw( postgres yugabyte )]),
110             default => 'postgres',
111             lazy => 1,
112             );
113              
114             has dbh => (
115             is => 'rw',
116             isa => DBH,
117             lazy => 1,
118             default => sub {
119             my $self = shift;
120             $self->use_driver;
121              
122             my $uri = $self->uri;
123             local $ENV{PGCLIENTENCODING} = 'UTF8';
124             DBI->connect($uri->dbi_dsn, $self->username, $self->password, {
125             PrintError => 0,
126             RaiseError => 0,
127             AutoCommit => 1,
128             pg_enable_utf8 => 1,
129             pg_server_prepare => 1,
130             HandleError => sub {
131             my ($err, $dbh) = @_;
132             $@ = $err;
133             @_ = ($dbh->state || 'DEV' => $dbh->errstr);
134             goto &hurl;
135             },
136             Callbacks => {
137             connected => sub {
138             my $dbh = shift;
139             $dbh->do('SET client_min_messages = WARNING');
140             # Setting search currently never fails, but call
141             # _handle_no_registry in case that changes in the future.
142             $dbh->do(
143             'SET search_path = ?',
144             undef, $self->registry
145             ) or $self->_handle_no_registry($dbh);
146              
147             # Determine the provider. Yugabyte says this is the right way to do it.
148             # https://yugabyte-db.slack.com/archives/CG0KQF0GG/p1653762283847589
149             my $v = $dbh->selectcol_arrayref(
150             q{SELECT split_part(version(), ' ', 2)}
151             )->[0] // '';
152             $self->_provider('yugabyte') if $v =~ /-YB-/;
153             return;
154             },
155             },
156             });
157             }
158             );
159              
160             # Need to wait until dbh is defined.
161             with 'App::Sqitch::Role::DBIEngine';
162              
163             sub _log_tags_param {
164 0     0   0 [ map { $_->format_name } $_[1]->tags ];
  0         0  
165             }
166              
167             sub _log_requires_param {
168 0     0   0 [ map { $_->as_string } $_[1]->requires ];
  0         0  
169             }
170              
171             sub _log_conflicts_param {
172 0     0   0 [ map { $_->as_string } $_[1]->conflicts ];
  0         0  
173             }
174              
175             sub _ts2char_format {
176 2     2   2386 q{to_char(%s AT TIME ZONE 'UTC', '"year":YYYY:"month":MM:"day":DD:"hour":HH24:"minute":MI:"second":SS:"time_zone":"UTC"')};
177             }
178              
179 0     0   0 sub _ts_default { 'clock_timestamp()' }
180              
181 0     0   0 sub _char2ts { $_[1]->as_string(format => 'iso') }
182              
183             sub _listagg_format {
184 10     10   2122 my $dbh = shift->dbh;
185             # Since 9.3, we can use array_remove().
186             return q{array_remove(array_agg(%1$s ORDER BY %1$s), NULL)}
187 10 100       108 if $dbh->{pg_server_version} >= 90300;
188              
189             # Since 9.0 we can use ORDER BY.
190             return q{ARRAY(SELECT * FROM UNNEST( array_agg(%1$s ORDER BY %1$s) ) a WHERE a IS NOT NULL)}
191 6 100       76 if $dbh->{pg_server_version} >= 90000;
192              
193 2         28 return q{ARRAY(SELECT * FROM UNNEST( array_agg(%s) ) a WHERE a IS NOT NULL)};
194             }
195              
196 0     0   0 sub _regex_op { '~' }
197              
198 0     0   0 sub _version_query { 'SELECT MAX(version)::TEXT FROM releases' }
199              
200             sub _initialized {
201 0     0   0 my $self = shift;
202 0         0 return $self->dbh->selectcol_arrayref(q{
203             SELECT EXISTS(
204             SELECT TRUE FROM pg_catalog.pg_tables
205             WHERE schemaname = ? AND tablename = ?
206             )
207             }, undef, $self->registry, 'changes')->[0];
208             }
209              
210             sub _initialize {
211 0     0   0 my $self = shift;
212 0 0       0 hurl engine => __x(
213             'Sqitch schema "{schema}" already exists',
214             schema => $self->registry
215             ) if $self->initialized;
216 0         0 $self->_run_registry_file( file(__FILE__)->dir->file($self->key . '.sql') );
217 0         0 $self->_register_release;
218             }
219              
220             sub _psql_major_version {
221 12     12   20358 my $self = shift;
222 12         324 my $psql_version = $self->sqitch->probe($self->client, '--version');
223 12         664 my @parts = split /\s+/, $psql_version;
224 12         52 my ($maj) = $parts[-1] =~ /^(\d+)/;
225 12   50     92 return $maj || 0;
226             }
227              
228             sub _run_registry_file {
229 6     6   23552 my ($self, $file) = @_;
230 6         154 my $schema = $self->registry;
231              
232             # Fetch the client version. 8.4 == 80400
233 6         308 my $version = $self->_probe('-c', 'SHOW server_version_num');
234 6         46 my $psql_maj = $self->_psql_major_version;
235              
236             # Is this XC?
237 6 100       24 my $opts = $self->_probe('-c', q{
238             SELECT count(*)
239             FROM pg_catalog.pg_proc p
240             JOIN pg_catalog.pg_namespace n ON p.pronamespace = n.oid
241             WHERE nspname = 'pg_catalog'
242             AND proname = 'pgxc_version';
243             }) ? ' DISTRIBUTE BY REPLICATION' : '';
244              
245 6 100 100     100 if ($version < 90300 || $psql_maj < 9) {
246             # Need to transform the SQL and write it to a temp file.
247 4         30 my $sql = scalar $file->slurp;
248              
249             # No CREATE SCHEMA IF NOT EXISTS syntax prior to 9.3.
250 4 100       1248 $sql =~ s/SCHEMA IF NOT EXISTS/SCHEMA/ if $version < 90300;
251 4 100       24 if ($psql_maj < 9) {
252             # Also no :"registry" variable syntax prior to psql 9.0.s
253 2         10 ($schema) = $self->dbh->selectrow_array(
254             'SELECT quote_ident(?)', undef, $schema
255             );
256 2         118 $sql =~ s{:"registry"}{$schema}g;
257             }
258 4         34 require File::Temp;
259 4         34 my $fh = File::Temp->new;
260 4         7306 print $fh $sql;
261 4         154 close $fh;
262 4         32 $self->_run(
263             '--file' => $fh->filename,
264             '--set' => "tableopts=$opts",
265             );
266             } else {
267             # We can take advantage of the :"registry" variable syntax.
268 2         190 $self->_run(
269             '--file' => $file,
270             '--set' => "registry=$schema",
271             '--set' => "tableopts=$opts",
272             );
273             }
274              
275 6         112 $self->dbh->do('SET search_path = ?', undef, $schema);
276             }
277              
278             # Override to lock the changes table. This ensures that only one instance of
279             # Sqitch runs at one time.
280             sub begin_work {
281 0     0 1 0 my $self = shift;
282 0         0 my $dbh = $self->dbh;
283              
284             # Start transaction and lock changes to allow only one change at a time.
285 0         0 $dbh->begin_work;
286 0 0       0 $dbh->do('LOCK TABLE changes IN EXCLUSIVE MODE')
287             if $self->_provider eq 'postgres';
288             # Yugabyte does not yet support EXCLUSIVE MODE.
289             # https://docs.yugabyte.com/preview/api/ysql/the-sql-language/statements/txn_lock/#lockmode-1
290 0         0 return $self;
291             }
292              
293             # Override to try to acquire a lock on a constant number without waiting.
294             sub try_lock {
295             shift->dbh->selectcol_arrayref(
296 0     0 1 0 'SELECT pg_try_advisory_lock(75474063)'
297             )->[0]
298             }
299              
300             # Override to try to acquire a lock on a constant number, waiting for the lock
301             # until timeout.
302             sub wait_lock {
303 0     0 1 0 my $self = shift;
304              
305             # Yugabyte does not support advisory locks.
306             # https://github.com/yugabyte/yugabyte-db/issues/3642
307             # Use pessimistic locking when it becomes available.
308             # https://github.com/yugabyte/yugabyte-db/issues/5680
309 0 0       0 return 1 if $self->_provider ne 'postgres';
310              
311             # Asynchronously request a lock with an indefinite wait.
312 0         0 my $dbh = $self->dbh;
313 0         0 $dbh->do(
314             'SELECT pg_advisory_lock(75474063)',
315             { pg_async => DBD::Pg::PG_ASYNC() },
316             );
317              
318             # Use _timeout to periodically check for the result.
319 0 0   0   0 return 1 if $self->_timeout(sub { $dbh->pg_ready && $dbh->pg_result });
  0 0       0  
320              
321             # Timed out, cancel the query and return false.
322 0         0 $dbh->pg_cancel;
323 0         0 return 0;
324             }
325              
326             sub run_file {
327 2     2 1 1370 my ($self, $file) = @_;
328 2         10 $self->_run('--file' => $file);
329             }
330              
331             sub run_verify {
332 4     4 1 2946 my $self = shift;
333             # Suppress STDOUT unless we want extra verbosity.
334 4 100       72 my $meth = $self->can($self->sqitch->verbosity > 1 ? '_run' : '_capture');
335 4         314 return $self->$meth('--file' => @_);
336             }
337              
338             sub run_handle {
339 2     2 1 1334 my ($self, $fh) = @_;
340 2         12 $self->_spool($fh);
341             }
342              
343             sub run_upgrade {
344 0     0 1 0 shift->_run_registry_file(@_);
345             }
346              
347             # Override to avoid cast errors, and to use VALUES instead of a UNION query.
348             sub log_new_tags {
349 0     0 1 0 my ( $self, $change ) = @_;
350 0 0       0 my @tags = $change->tags or return $self;
351 0         0 my $sqitch = $self->sqitch;
352              
353 0         0 my ($id, $name, $proj, $user, $email) = (
354             $change->id,
355             $change->format_name,
356             $change->project,
357             $sqitch->user_name,
358             $sqitch->user_email
359             );
360              
361             $self->dbh->do(
362             q{
363             INSERT INTO tags (
364             tag_id
365             , tag
366             , project
367             , change_id
368             , note
369             , committer_name
370             , committer_email
371             , planned_at
372             , planner_name
373             , planner_email
374             )
375             SELECT tid, tg, proj, chid, n, name, email, at, pname, pemail FROM ( VALUES
376             } . join( ",\n ", ( q{(?::text, ?::text, ?::text, ?::text, ?::text, ?::text, ?::text, ?::timestamptz, ?::text, ?::text)} ) x @tags )
377             . q{
378             ) i(tid, tg, proj, chid, n, name, email, at, pname, pemail)
379             LEFT JOIN tags ON i.tid = tags.tag_id
380             WHERE tags.tag_id IS NULL
381             },
382             undef,
383 0         0 map { (
384 0         0 $_->id,
385             $_->format_name,
386             $proj,
387             $id,
388             $_->note,
389             $user,
390             $email,
391             $_->timestamp->as_string(format => 'iso'),
392             $_->planner_name,
393             $_->planner_email,
394             ) } @tags
395             );
396              
397 0         0 return $self;
398             }
399              
400             # Override to take advantage of the RETURNING expression, and to save tags as
401             # an array rather than a space-delimited string.
402             sub log_revert_change {
403 0     0 1 0 my ($self, $change) = @_;
404 0         0 my $dbh = $self->dbh;
405              
406             # Delete tags.
407 0   0     0 my $del_tags = $dbh->selectcol_arrayref(
408             'DELETE FROM tags WHERE change_id = ? RETURNING tag',
409             undef, $change->id
410             ) || [];
411              
412             # Retrieve dependencies.
413 0         0 my ($req, $conf) = $dbh->selectrow_array(q{
414             SELECT ARRAY(
415             SELECT dependency
416             FROM dependencies
417             WHERE change_id = $1
418             AND type = 'require'
419             ), ARRAY(
420             SELECT dependency
421             FROM dependencies
422             WHERE change_id = $1
423             AND type = 'conflict'
424             )
425             }, undef, $change->id);
426              
427             # Delete the change record.
428 0         0 $dbh->do(
429             'DELETE FROM changes where change_id = ?',
430             undef, $change->id,
431             );
432              
433             # Log it.
434 0         0 return $self->_log_event( revert => $change, $del_tags, $req, $conf );
435             }
436              
437             sub _dt($) {
438 4     4   2836 require App::Sqitch::DateTime;
439 4         74 return App::Sqitch::DateTime->new(split /:/ => shift);
440             }
441              
442             sub _no_table_error {
443 8 100 100 8   7152 return 0 unless $DBI::state && $DBI::state eq '42P01'; # undefined_table
444 4         16 my $dbh = shift->dbh;
445 4 100       76 return 1 unless $dbh->{pg_server_version} >= 90000;
446              
447             # Try to avoid confusion for people monitoring the Postgres error log by
448             # sending a warning to the log immediately after the missing relation error
449             # to tell log watchers that Sqitch is aware of the issue and will next
450             # initialize the database. Hopefully this will reduce confusion and
451             # unnecessary time trouble shooting an error that Sqitch handles.
452 2         18 my @msg = map { $dbh->quote($_) } (
  4         1222  
453             __ 'Sqitch registry not initialized',
454             __ 'Because the "changes" table does not exist, Sqitch will now initialize the database to create its registry tables.',
455             );
456 2         42 $dbh->do(sprintf q{DO $$
457             BEGIN
458             SET LOCAL client_min_messages = 'ERROR';
459             RAISE WARNING USING ERRCODE = 'undefined_table', MESSAGE = %s, DETAIL = %s;
460             END;
461             $$}, @msg);
462 2         26 return 1;
463             }
464              
465             # https://www.postgresql.org/docs/current/errcodes-appendix.html
466             sub _no_column_error {
467 14   100 14   162 return $DBI::state && $DBI::state eq '42703'; # undefined_column
468             }
469              
470             sub _unique_error {
471 0   0 0   0 return $DBI::state && $DBI::state eq '23505'; # unique_violation
472             }
473              
474             sub _in_expr {
475 0     0   0 my ($self, $vals) = @_;
476 0         0 return '= ANY(?)', $vals;
477             }
478              
479             sub _run {
480 8     8   4208 my $self = shift;
481 8         26 my $sqitch = $self->sqitch;
482 8 100       194 my $pass = $self->password or return $sqitch->run( $self->psql, @_ );
483 2         320 local $ENV{PGPASSWORD} = $pass;
484 2         8 return $sqitch->run( $self->psql, @_ );
485             }
486              
487             sub _capture {
488 10     10   4672 my $self = shift;
489 10         50 my $sqitch = $self->sqitch;
490 10 100       276 my $pass = $self->password or return $sqitch->capture( $self->psql, @_ );
491 2         32 local $ENV{PGPASSWORD} = $pass;
492 2         8 return $sqitch->capture( $self->psql, @_ );
493             }
494              
495             sub _probe {
496 4     4   2738 my $self = shift;
497 4         14 my $sqitch = $self->sqitch;
498 4 100       100 my $pass = $self->password or return $sqitch->probe( $self->psql, @_ );
499 2         34 local $ENV{PGPASSWORD} = $pass;
500 2         10 return $sqitch->probe( $self->psql, @_ );
501             }
502              
503             sub _spool {
504 6     6   2770 my $self = shift;
505 6         14 my $fh = shift;
506 6         18 my $sqitch = $self->sqitch;
507 6 100       148 my $pass = $self->password or return $sqitch->spool( $fh, $self->psql, @_ );
508 2         36 local $ENV{PGPASSWORD} = $pass;
509 2         12 return $sqitch->spool( $fh, $self->psql, @_ );
510             }
511              
512             1;
513              
514             __END__
515              
516             =head1 Name
517              
518             App::Sqitch::Engine::pg - Sqitch PostgreSQL Engine
519              
520             =head1 Synopsis
521              
522             my $pg = App::Sqitch::Engine->load( engine => 'pg' );
523              
524             =head1 Description
525              
526             App::Sqitch::Engine::pg provides the PostgreSQL storage engine for Sqitch. It
527             supports PostgreSQL 8.4.0 and higher, Postgres-XC 1.2 and higher, and YugabyteDB.
528              
529             =head1 Interface
530              
531             =head2 Instance Methods
532              
533             =head3 C<initialized>
534              
535             $pg->initialize unless $pg->initialized;
536              
537             Returns true if the database has been initialized for Sqitch, and false if it
538             has not.
539              
540             =head3 C<initialize>
541              
542             $pg->initialize;
543              
544             Initializes a database for Sqitch by installing the Sqitch registry schema.
545              
546             =head3 C<psql>
547              
548             Returns a list containing the C<psql> client and options to be passed to it.
549             Used internally when executing scripts.
550              
551             =head1 Author
552              
553             David E. Wheeler <david@justatheory.com>
554              
555             =head1 License
556              
557             Copyright (c) 2012-2023 iovation Inc., David E. Wheeler
558              
559             Permission is hereby granted, free of charge, to any person obtaining a copy
560             of this software and associated documentation files (the "Software"), to deal
561             in the Software without restriction, including without limitation the rights
562             to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
563             copies of the Software, and to permit persons to whom the Software is
564             furnished to do so, subject to the following conditions:
565              
566             The above copyright notice and this permission notice shall be included in all
567             copies or substantial portions of the Software.
568              
569             THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
570             IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
571             FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
572             AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
573             LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
574             OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
575             SOFTWARE.
576              
577             =cut