File Coverage

blib/lib/App/Sqitch/Engine/pg.pm
Criterion Covered Total %
statement 117 168 69.6
branch 28 44 63.6
condition 13 22 59.0
subroutine 35 55 63.6
pod 15 15 100.0
total 208 304 68.4


line stmt bran cond sub pod time code
1             package App::Sqitch::Engine::pg;
2              
3 9     9   213157 use 5.010;
  9         40  
4 9     9   97 use Moo;
  9         21  
  9         84  
5 9     9   4475 use utf8;
  9         24  
  9         66  
6 9     9   428 use Path::Class;
  9         21  
  9         746  
7 9     9   4811 use DBI;
  9         63402  
  9         576  
8 9     9   68 use Try::Tiny;
  9         18  
  9         703  
9 9     9   77 use App::Sqitch::X qw(hurl);
  9         19  
  9         115  
10 9     9   3597 use Locale::TextDomain qw(App-Sqitch);
  9         20  
  9         147  
11 9     9   2338 use App::Sqitch::Plan::Change;
  9         25  
  9         386  
12 9     9   60 use List::Util qw(first);
  9         19  
  9         773  
13 9     9   63 use App::Sqitch::Types qw(DBH ArrayRef);
  9         17  
  9         112  
14 9     9   26894 use Type::Utils qw(enum);
  9         23  
  9         114  
15 9     9   7127 use namespace::autoclean;
  9         18  
  9         119  
16              
17             extends 'App::Sqitch::Engine';
18              
19             our $VERSION = 'v1.6.1'; # VERSION
20              
21             sub destination {
22 26     26 1 2026 my $self = shift;
23              
24             # Just use the target name if it doesn't look like a URI or if the URI
25             # includes the database name.
26 26 50 33     420 return $self->target->name if $self->target->name !~ /:/
27             || $self->target->uri->dbname;
28              
29             # Use the URI sans password, and with the database name added.
30 26         1648 my $uri = $self->target->uri->clone;
31 26 50       458 $uri->password(undef) if $uri->password;
32             $uri->dbname(
33             $ENV{PGDATABASE}
34             || $self->username
35             || $ENV{PGUSER}
36 26   66     1686 || $self->sqitch->sysuser
37             );
38 26         2728 return $uri->as_string;
39             }
40              
41             # DBD::Pg and psql use fallbacks consistently, thanks to libpq. These include
42             # environment variables, system info (username), the password file, and the
43             # connection service file. Best for us not to second-guess these values,
44             # though we admittedly try when setting the database name in the destination
45             # URI for unnamed targets a few lines up from here.
46       18     sub _def_user { }
47       6     sub _def_pass { }
48              
49             has _psql => (
50             is => 'ro',
51             isa => ArrayRef,
52             lazy => 1,
53             default => sub {
54             my $self = shift;
55             my $uri = $self->uri;
56             my @ret = ( $self->client );
57              
58             my %query_params = $uri->query_params;
59             my @conninfo;
60             # Use _port instead of port so it's empty if no port is in the URI.
61             # https://github.com/sqitchers/sqitch/issues/675
62             for my $spec (
63             [ user => $self->username ],
64             [ dbname => $uri->dbname ],
65             [ host => $uri->host ],
66             [ port => $uri->_port ],
67             map { [ $_ => $query_params{$_} ] }
68             sort keys %query_params,
69             ) {
70             next unless defined $spec->[1] && length $spec->[1];
71             if ($spec->[1] =~ /[ "'\\]/) {
72             $spec->[1] =~ s/([ "'\\])/\\$1/g;
73             }
74             push @conninfo, "$spec->[0]=$spec->[1]";
75             }
76              
77             push @ret => '--dbname', join ' ', @conninfo if @conninfo;
78              
79             if (my %vars = $self->variables) {
80             push @ret => map {; '--set', "$_=$vars{$_}" } sort keys %vars;
81             }
82              
83             push @ret => $self->_client_opts;
84             return \@ret;
85             },
86             );
87              
88             sub _client_opts {
89 12     12   28 my $self = shift;
90             return (
91 12         340 '--quiet',
92             '--no-psqlrc',
93             '--no-align',
94             '--tuples-only',
95             '--set' => 'ON_ERROR_STOP=1',
96             '--set' => 'registry=' . $self->registry,
97             );
98             }
99              
100 58     58 1 30586 sub psql { @{ shift->_psql } }
  58         1736  
101              
102 6     6 1 3993402 sub key { 'pg' }
103 3     3 1 40 sub name { 'PostgreSQL' }
104 0     0 1 0 sub driver { 'DBD::Pg 2.0' }
105 11     11 1 1526 sub default_client { 'psql' }
106              
107             has dbh => (
108             is => 'rw',
109             isa => DBH,
110             lazy => 1,
111             default => sub {
112             my $self = shift;
113             $self->use_driver;
114              
115             local $ENV{PGCLIENTENCODING} = 'UTF8';
116             DBI->connect($self->_dsn, $self->username, $self->password, {
117             PrintError => 0,
118             RaiseError => 0,
119             AutoCommit => 1,
120             pg_enable_utf8 => 1,
121             pg_server_prepare => 1,
122             HandleError => $self->error_handler,
123             Callbacks => {
124             connected => sub {
125             my $dbh = shift;
126             $dbh->do('SET client_min_messages = WARNING');
127             # Setting search_path currently never fails, but call
128             # _handle_no_registry in case that changes in the future.
129             $dbh->do(
130             'SET search_path = ?',
131             undef, $self->registry
132             ) or $self->_handle_no_registry($dbh);
133              
134             # Determine the provider. Yugabyte says this is the right way to do it.
135             # https://yugabyte-db.slack.com/archives/CG0KQF0GG/p1653762283847589
136             my $v = $dbh->selectcol_arrayref(
137             q{SELECT split_part(version(), ' ', 2)}
138             )->[0] // '';
139             $dbh->{private_sqitch_info} = {
140             provider => $v =~ /-YB-/ ? 'yugabyte' : 'postgres',
141             };
142             return;
143             },
144             },
145             });
146             }
147             );
148              
149             # Need to wait until dbh is defined.
150             with 'App::Sqitch::Role::DBIEngine';
151              
152             sub _log_tags_param {
153 0     0   0 [ map { $_->format_name } $_[1]->tags ];
  0         0  
154             }
155              
156             sub _log_requires_param {
157 0     0   0 [ map { $_->as_string } $_[1]->requires ];
  0         0  
158             }
159              
160             sub _log_conflicts_param {
161 0     0   0 [ map { $_->as_string } $_[1]->conflicts ];
  0         0  
162             }
163              
164             sub _ts2char_format {
165 2     2   3124 q{to_char(%s AT TIME ZONE 'UTC', '"year":YYYY:"month":MM:"day":DD:"hour":HH24:"minute":MI:"second":SS:"time_zone":"UTC"')};
166             }
167              
168 0     0   0 sub _ts_default { 'clock_timestamp()' }
169              
170 0     0   0 sub _char2ts { $_[1]->as_string(format => 'iso') }
171              
172             sub _listagg_format {
173 10     10   2668 my $dbh = shift->dbh;
174             # Since 9.3, we can use array_remove().
175             return q{array_remove(array_agg(%1$s ORDER BY %1$s), NULL)}
176 10 100       162 if $dbh->{pg_server_version} >= 90300;
177              
178             # Since 9.0 we can use ORDER BY.
179             return q{ARRAY(SELECT * FROM UNNEST( array_agg(%1$s ORDER BY %1$s) ) a WHERE a IS NOT NULL)}
180 6 100       62 if $dbh->{pg_server_version} >= 90000;
181              
182 2         16 return q{ARRAY(SELECT * FROM UNNEST( array_agg(%s) ) a WHERE a IS NOT NULL)};
183             }
184              
185 0     0   0 sub _regex_op { '~' }
186              
187 0     0   0 sub _version_query { 'SELECT MAX(version)::TEXT FROM releases' }
188              
189             sub _initialized {
190 0     0   0 my $self = shift;
191 0         0 return $self->dbh->selectcol_arrayref(q{
192             SELECT EXISTS(
193             SELECT TRUE FROM pg_catalog.pg_tables
194             WHERE schemaname = ? AND tablename = ?
195             )
196             }, undef, $self->registry, 'changes')->[0];
197             }
198              
199             sub _initialize {
200 0     0   0 my $self = shift;
201 0 0       0 hurl engine => __x(
202             'Sqitch schema "{schema}" already exists',
203             schema => $self->registry
204             ) if $self->initialized;
205 0         0 $self->_run_registry_file( file(__FILE__)->dir->file($self->key . '.sql') );
206 0         0 $self->_register_release;
207             }
208              
209             sub _psql_major_version {
210 12     12   28240 my $self = shift;
211 12         472 my $psql_version = $self->sqitch->probe($self->client, '--version');
212 12         966 my @parts = split /\s+/, $psql_version;
213 12         58 my ($maj) = $parts[-1] =~ /^(\d+)/;
214 12   50     154 return $maj || 0;
215             }
216              
217             sub _run_registry_file {
218 6     6   30846 my ($self, $file) = @_;
219 6         218 my $schema = $self->registry;
220              
221             # Fetch the server version. 8.4 == 80400
222 6         514 my $version = $self->_probe('-c', 'SHOW server_version_num');
223 6         100 my $psql_maj = $self->_psql_major_version;
224              
225             # Is this Postgres-XC?
226 6 100       32 my $opts = $self->_probe('-c', q{
227             SELECT count(*)
228             FROM pg_catalog.pg_proc p
229             JOIN pg_catalog.pg_namespace n ON p.pronamespace = n.oid
230             WHERE nspname = 'pg_catalog'
231             AND proname = 'pgxc_version';
232             }) ? ' DISTRIBUTE BY REPLICATION' : '';
233              
234 6 100 100     116 if ($version < 90300 || $psql_maj < 9) {
235             # Need to transform the SQL and write it to a temp file.
236 4         28 my $sql = scalar $file->slurp;
237              
238             # No CREATE SCHEMA IF NOT EXISTS syntax prior to 9.3.
239 4 100       3836 $sql =~ s/SCHEMA IF NOT EXISTS/SCHEMA/ if $version < 90300;
240 4 100       20 if ($psql_maj < 9) {
241             # Also no :"registry" variable syntax prior to psql 9.0.
242 2         12 ($schema) = $self->dbh->selectrow_array(
243             'SELECT quote_ident(?)', undef, $schema
244             );
245 2         192 $sql =~ s{:"registry"}{$schema}g;
246             }
247 4         44 require File::Temp;
248 4         40 my $fh = File::Temp->new;
249 4         3912 print $fh $sql;
250 4         164 close $fh;
251 4         34 $self->_run(
252             '--file' => $fh->filename,
253             '--set' => "tableopts=$opts",
254             );
255             } else {
256             # We can take advantage of the :"registry" variable syntax.
257 2         20 $self->_run(
258             '--file' => $file,
259             '--set' => "registry=$schema",
260             '--set' => "tableopts=$opts",
261             );
262             }
263              
264 6         118 $self->dbh->do('SET search_path = ?', undef, $schema);
265             }
266              
267             # Returns the name of the provider.
268             sub _provider {
269             shift->dbh->{private_sqitch_info}{provider}
270 0     0   0 }
271              
272             # Override to lock the changes table. This ensures that only one instance of
273             # Sqitch runs at one time.
274             sub begin_work {
275 0     0 1 0 my $self = shift;
276 0         0 my $dbh = $self->dbh;
277              
278             # Start transaction and lock changes to allow only one change at a time.
279 0         0 $dbh->begin_work;
280 0 0       0 $dbh->do('LOCK TABLE changes IN EXCLUSIVE MODE')
281             if $self->_provider eq 'postgres';
282             # Yugabyte does not yet support EXCLUSIVE MODE.
283             # https://docs.yugabyte.com/preview/api/ysql/the-sql-language/statements/txn_lock/#lockmode-1
284 0         0 return $self;
285             }
286              
287             # Override to try to acquire a lock on a constant number without waiting.
288             sub try_lock {
289 0     0 1 0 my $self = shift;
290 0 0       0 return 1 if $self->_provider ne 'postgres';
291 0         0 $self->dbh->selectcol_arrayref(
292             'SELECT pg_try_advisory_lock(75474063)'
293             )->[0]
294             }
295              
296             # Override to try to acquire a lock on a constant number, waiting for the lock
297             # until timeout.
298             sub wait_lock {
299 0     0 1 0 my $self = shift;
300              
301             # Yugabyte does not support advisory locks.
302             # https://github.com/yugabyte/yugabyte-db/issues/3642
303             # Use pessimistic locking when it becomes available.
304             # https://github.com/yugabyte/yugabyte-db/issues/5680
305 0 0       0 return 1 if $self->_provider ne 'postgres';
306              
307             # Asynchronously request a lock with an indefinite wait.
308 0         0 my $dbh = $self->dbh;
309 0         0 $dbh->do(
310             'SELECT pg_advisory_lock(75474063)',
311             { pg_async => DBD::Pg::PG_ASYNC() },
312             );
313              
314             # Use _timeout to periodically check for the result.
315 0 0   0   0 return 1 if $self->_timeout(sub { $dbh->pg_ready && $dbh->pg_result });
  0 0       0  
316              
317             # Timed out, cancel the query and return false.
318 0         0 $dbh->pg_cancel;
319 0         0 return 0;
320             }
321              
322             sub run_file {
323 2     2 1 1750 my ($self, $file) = @_;
324 2         12 $self->_run('--file' => $file);
325             }
326              
327             sub run_verify {
328 4     4 1 3814 my $self = shift;
329             # Suppress STDOUT unless we want extra verbosity.
330 4 100       112 my $meth = $self->can($self->sqitch->verbosity > 1 ? '_run' : '_capture');
331 4         510 return $self->$meth('--file' => @_);
332             }
333              
334             sub run_handle {
335 2     2 1 1674 my ($self, $fh) = @_;
336 2         12 $self->_spool($fh);
337             }
338              
339             sub run_upgrade {
340 0     0 1 0 shift->_run_registry_file(@_);
341             }
342              
343             # Override to avoid cast errors, and to use VALUES instead of a UNION query.
344             sub log_new_tags {
345 0     0 1 0 my ( $self, $change ) = @_;
346 0 0       0 my @tags = $change->tags or return $self;
347 0         0 my $sqitch = $self->sqitch;
348              
349 0         0 my ($id, $name, $proj, $user, $email) = (
350             $change->id,
351             $change->format_name,
352             $change->project,
353             $sqitch->user_name,
354             $sqitch->user_email
355             );
356              
357             $self->dbh->do(
358             q{
359             INSERT INTO tags (
360             tag_id
361             , tag
362             , project
363             , change_id
364             , note
365             , committer_name
366             , committer_email
367             , planned_at
368             , planner_name
369             , planner_email
370             )
371             SELECT tid, tg, proj, chid, n, name, email, at, pname, pemail FROM ( VALUES
372             } . join( ",\n ", ( q{(?::text, ?::text, ?::text, ?::text, ?::text, ?::text, ?::text, ?::timestamptz, ?::text, ?::text)} ) x @tags )
373             . q{
374             ) i(tid, tg, proj, chid, n, name, email, at, pname, pemail)
375             LEFT JOIN tags ON i.tid = tags.tag_id
376             WHERE tags.tag_id IS NULL
377             },
378             undef,
379 0         0 map { (
380 0         0 $_->id,
381             $_->format_name,
382             $proj,
383             $id,
384             $_->note,
385             $user,
386             $email,
387             $_->timestamp->as_string(format => 'iso'),
388             $_->planner_name,
389             $_->planner_email,
390             ) } @tags
391             );
392              
393 0         0 return $self;
394             }
395              
396             # Override to take advantage of the RETURNING expression, and to save tags as
397             # an array rather than a space-delimited string.
398             sub log_revert_change {
399 0     0 1 0 my ($self, $change) = @_;
400 0         0 my $dbh = $self->dbh;
401              
402             # Delete tags.
403 0   0     0 my $del_tags = $dbh->selectcol_arrayref(
404             'DELETE FROM tags WHERE change_id = ? RETURNING tag',
405             undef, $change->id
406             ) || [];
407              
408             # Retrieve dependencies.
409 0         0 my ($req, $conf) = $dbh->selectrow_array(q{
410             SELECT ARRAY(
411             SELECT dependency
412             FROM dependencies
413             WHERE change_id = $1
414             AND type = 'require'
415             ), ARRAY(
416             SELECT dependency
417             FROM dependencies
418             WHERE change_id = $1
419             AND type = 'conflict'
420             )
421             }, undef, $change->id);
422              
423             # Delete the change record.
424 0         0 $dbh->do(
425             'DELETE FROM changes where change_id = ?',
426             undef, $change->id,
427             );
428              
429             # Log it.
430 0         0 return $self->_log_event( revert => $change, $del_tags, $req, $conf );
431             }
432              
433             sub _dt($) {
434 4     4   4016 require App::Sqitch::DateTime;
435 4         72 return App::Sqitch::DateTime->new(split /:/ => shift);
436             }
437              
438             sub _no_table_error {
439 8 100 100 8   8722 return 0 unless $DBI::state && $DBI::state eq '42P01'; # undefined_table
440 4         20 my $dbh = shift->dbh;
441 4 100       110 return 1 unless $dbh->{pg_server_version} >= 90000;
442              
443             # Try to avoid confusion for people monitoring the Postgres error log by
444             # sending a warning to the log immediately after the missing relation error
445             # to tell log watchers that Sqitch is aware of the issue and will next
446             # initialize the database. Hopefully this will reduce confusion and
447             # unnecessary time spent troubleshooting an error that Sqitch handles.
448 2         22 my @msg = map { $dbh->quote($_) } (
  4         1166  
449             __ 'Sqitch registry not initialized',
450             __ 'Because the "changes" table does not exist, Sqitch will now initialize the database to create its registry tables.',
451             );
452 2         28 $dbh->do(sprintf q{DO $$
453             BEGIN
454             SET LOCAL client_min_messages = 'ERROR';
455             RAISE WARNING USING ERRCODE = 'undefined_table', MESSAGE = %s, DETAIL = %s;
456             END;
457             $$}, @msg);
458 2         22 return 1;
459             }
460              
461             # https://www.postgresql.org/docs/current/errcodes-appendix.html
462             sub _no_column_error {
463 14   100 14   118 return $DBI::state && $DBI::state eq '42703'; # undefined_column
464             }
465              
466             sub _unique_error {
467 0   0 0   0 return $DBI::state && $DBI::state eq '23505'; # unique_violation
468             }
469              
470             sub _in_expr {
471 0     0   0 my ($self, $vals) = @_;
472 0         0 return '= ANY(?)', $vals;
473             }
474              
475             sub _run {
476 8     8   6280 my $self = shift;
477 8         38 my $sqitch = $self->sqitch;
478 8 100       336 my $pass = $self->password or return $sqitch->run( $self->psql, @_ );
479 2         420 local $ENV{PGPASSWORD} = $pass;
480 2         12 return $sqitch->run( $self->psql, @_ );
481             }
482              
483             sub _capture {
484 10     10   5524 my $self = shift;
485 10         48 my $sqitch = $self->sqitch;
486 10 100       372 my $pass = $self->password or return $sqitch->capture( $self->psql, @_ );
487 2         46 local $ENV{PGPASSWORD} = $pass;
488 2         8 return $sqitch->capture( $self->psql, @_ );
489             }
490              
491             sub _probe {
492 4     4   3088 my $self = shift;
493 4         24 my $sqitch = $self->sqitch;
494 4 100       154 my $pass = $self->password or return $sqitch->probe( $self->psql, @_ );
495 2         62 local $ENV{PGPASSWORD} = $pass;
496 2         12 return $sqitch->probe( $self->psql, @_ );
497             }
498              
499             sub _spool {
500 6     6   3700 my $self = shift;
501 6         16 my $fh = shift;
502 6         30 my $sqitch = $self->sqitch;
503 6 100       230 my $pass = $self->password or return $sqitch->spool( $fh, $self->psql, @_ );
504 2         42 local $ENV{PGPASSWORD} = $pass;
505 2         8 return $sqitch->spool( $fh, $self->psql, @_ );
506             }
507              
508             1;
509              
510             __END__
511              
512             =head1 Name
513              
514             App::Sqitch::Engine::pg - Sqitch PostgreSQL Engine
515              
516             =head1 Synopsis
517              
518             my $pg = App::Sqitch::Engine->load( engine => 'pg' );
519              
520             =head1 Description
521              
522             App::Sqitch::Engine::pg provides the PostgreSQL storage engine for Sqitch. It
523             supports PostgreSQL 8.4.0 and higher, Postgres-XC 1.2 and higher, and YugabyteDB.
524              
525             =head1 Interface
526              
527             =head2 Instance Methods
528              
529             =head3 C<initialized>
530              
531             $pg->initialize unless $pg->initialized;
532              
533             Returns true if the database has been initialized for Sqitch, and false if it
534             has not.
535              
536             =head3 C<initialize>
537              
538             $pg->initialize;
539              
540             Initializes a database for Sqitch by installing the Sqitch registry schema.
541              
542             =head3 C<psql>
543              
544             Returns a list containing the C<psql> client and options to be passed to it.
545             Used internally when executing scripts.
546              
547             =head1 Author
548              
549             David E. Wheeler <david@justatheory.com>
550              
551             =head1 License
552              
553             Copyright (c) 2012-2026 David E. Wheeler, 2012-2021 iovation Inc.
554              
555             Permission is hereby granted, free of charge, to any person obtaining a copy
556             of this software and associated documentation files (the "Software"), to deal
557             in the Software without restriction, including without limitation the rights
558             to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
559             copies of the Software, and to permit persons to whom the Software is
560             furnished to do so, subject to the following conditions:
561              
562             The above copyright notice and this permission notice shall be included in all
563             copies or substantial portions of the Software.
564              
565             THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
566             IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
567             FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
568             AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
569             LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
570             OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
571             SOFTWARE.
572              
573             =cut