File Coverage

blib/lib/App/Sqitch/Engine/pg.pm
Criterion Covered Total %
statement 117 164 71.3
branch 28 42 66.6
condition 13 19 68.4
subroutine 35 53 66.0
pod 17 17 100.0
total 210 295 71.1


line stmt bran cond sub pod time code
1              
2             use 5.010;
3 9     9   35556 use Moo;
  9         34  
4 9     9   55 use utf8;
  9         65  
  9         55  
5 9     9   2883 use Path::Class;
  9         23  
  9         60  
6 9     9   229 use DBI;
  9         33  
  9         587  
7 9     9   8180 use Try::Tiny;
  9         94503  
  9         462  
8 9     9   77 use App::Sqitch::X qw(hurl);
  9         19  
  9         560  
9 9     9   58 use Locale::TextDomain qw(App-Sqitch);
  9         18  
  9         82  
10 9     9   2634 use App::Sqitch::Plan::Change;
  9         17  
  9         79  
11 9     9   1729 use List::Util qw(first);
  9         36  
  9         281  
12 9     9   62 use App::Sqitch::Types qw(DBH ArrayRef);
  9         17  
  9         562  
13 9     9   62 use Type::Utils qw(enum);
  9         23  
  9         90  
14 9     9   7342 use namespace::autoclean;
  9         18  
  9         81  
15 9     9   4708  
  9         20  
  9         73  
16             extends 'App::Sqitch::Engine';
17              
18             our $VERSION = 'v1.3.0'; # VERSION
19              
20             my $self = shift;
21              
22 26     26 1 916 # Just use the target name if it doesn't look like a URI or if the URI
23             # includes the database name.
24             return $self->target->name if $self->target->name !~ /:/
25             || $self->target->uri->dbname;
26 26 50 33     282  
27             # Use the URI sans password, and with the database name added.
28             my $uri = $self->target->uri->clone;
29             $uri->password(undef) if $uri->password;
30 26         1006 $uri->dbname(
31 26 50       292 $ENV{PGDATABASE}
32             || $self->username
33             || $ENV{PGUSER}
34             || $self->sqitch->sysuser
35             );
36 26   66     1026 return $uri->as_string;
37             }
38 26         1522  
39             # DBD::Pg and psql use fallbacks consistently, thanks to libpq. These include
40             # environment variables, system info (username), the password file, and the
41             # connection service file. Best for us not to second-guess these values,
42             # though we admittedly try when setting the database name in the destination
43             # URI for unnamed targets a few lines up from here.
44              
45             has _psql => (
46       18     is => 'ro',
47       6     isa => ArrayRef,
48             lazy => 1,
49             default => sub {
50             my $self = shift;
51             my $uri = $self->uri;
52             my @ret = ( $self->client );
53              
54             my %query_params = $uri->query_params;
55             my @conninfo;
56             for my $spec (
57             [ user => $self->username ],
58             [ dbname => $uri->dbname ],
59             [ host => $uri->host ],
60             [ port => $uri->port ],
61             map { [ $_ => $query_params{$_} ] }
62             sort keys %query_params,
63             ) {
64             next unless defined $spec->[1] && length $spec->[1];
65             if ($spec->[1] =~ /[ "'\\]/) {
66             $spec->[1] =~ s/([ "'\\])/\\$1/g;
67             }
68             push @conninfo, "$spec->[0]=$spec->[1]";
69             }
70              
71             push @ret => '--dbname', join ' ', @conninfo if @conninfo;
72              
73             if (my %vars = $self->variables) {
74             push @ret => map {; '--set', "$_=$vars{$_}" } sort keys %vars;
75             }
76              
77             push @ret => $self->_client_opts;
78             return \@ret;
79             },
80             );
81              
82             my $self = shift;
83             return (
84             '--quiet',
85             '--no-psqlrc',
86             '--no-align',
87 12     12   20 '--tuples-only',
88             '--set' => 'ON_ERROR_STOP=1',
89 12         184 '--set' => 'registry=' . $self->registry,
90             );
91             }
92              
93              
94              
95             has _provider => (
96             is => 'rw',
97             isa => enum([qw( postgres yugabyte )]),
98 58     58 1 19002 default => 'postgres',
  58         982  
99             lazy => 1,
100 4     4 1 9527 );
101 3     3 1 31  
102 0     0 1 0 has dbh => (
103 11     11 1 1299 is => 'rw',
104             isa => DBH,
105             lazy => 1,
106             default => sub {
107             my $self = shift;
108             $self->use_driver;
109              
110             my $uri = $self->uri;
111             local $ENV{PGCLIENTENCODING} = 'UTF8';
112             DBI->connect($uri->dbi_dsn, $self->username, $self->password, {
113             PrintError => 0,
114             RaiseError => 0,
115             AutoCommit => 1,
116             pg_enable_utf8 => 1,
117             pg_server_prepare => 1,
118             HandleError => sub {
119             my ($err, $dbh) = @_;
120             $@ = $err;
121             @_ = ($dbh->state || 'DEV' => $dbh->errstr);
122             goto &hurl;
123             },
124             Callbacks => {
125             connected => sub {
126             my $dbh = shift;
127             $dbh->do('SET client_min_messages = WARNING');
128             try {
129             $dbh->do(
130             'SET search_path = ?',
131             undef, $self->registry
132             );
133             # https://www.nntp.perl.org/group/perl.dbi.dev/2013/11/msg7622.html
134             $dbh->set_err(undef, undef) if $dbh->err;
135             };
136             # Determine the provider. Yugabyte says this is the right way to do it.
137             # https://yugabyte-db.slack.com/archives/CG0KQF0GG/p1653762283847589
138             my $v = $dbh->selectcol_arrayref(
139             q{SELECT split_part(version(), ' ', 2)}
140             )->[0] // '';
141             $self->_provider('yugabyte') if $v =~ /-YB-/;
142             return;
143             },
144             },
145             });
146             }
147             );
148              
149             # Need to wait until dbh is defined.
150             with 'App::Sqitch::Role::DBIEngine';
151              
152             [ map { $_->format_name } $_[1]->tags ];
153             }
154              
155             [ map { $_->as_string } $_[1]->requires ];
156             }
157              
158             [ map { $_->as_string } $_[1]->conflicts ];
159             }
160              
161             q{to_char(%s AT TIME ZONE 'UTC', '"year":YYYY:"month":MM:"day":DD:"hour":HH24:"minute":MI:"second":SS:"time_zone":"UTC"')};
162             }
163 0     0   0  
  0         0  
164              
165              
166             my $dbh = shift->dbh;
167 0     0   0 # Since 9.3, we can use array_remove().
  0         0  
168             return q{array_remove(array_agg(%1$s ORDER BY %1$s), NULL)}
169             if $dbh->{pg_server_version} >= 90300;
170              
171 0     0   0 # Since 8.4 we can use ORDER BY.
  0         0  
172             return q{ARRAY(SELECT * FROM UNNEST( array_agg(%1$s ORDER BY %1$s) ) a WHERE a IS NOT NULL)}
173             if $dbh->{pg_server_version} >= 80400;
174              
175 2     2   1900 return q{ARRAY(SELECT * FROM UNNEST( array_agg(%s) ) a WHERE a IS NOT NULL)};
176             }
177              
178 0     0   0  
179              
180 0     0   0 my $self = shift;
181             return $self->dbh->selectcol_arrayref(q{
182             SELECT EXISTS(
183 10     10   1652 SELECT TRUE FROM pg_catalog.pg_tables
184             WHERE schemaname = ? AND tablename = ?
185             )
186 10 100       80 }, undef, $self->registry, 'changes')->[0];
187             }
188              
189             my $self = shift;
190 6 100       38 hurl engine => __x(
191             'Sqitch schema "{schema}" already exists',
192 2         12 schema => $self->registry
193             ) if $self->initialized;
194             $self->_run_registry_file( file(__FILE__)->dir->file($self->key . '.sql') );
195 0     0   0 $self->_register_release;
196             }
197 0     0   0  
198             my $self = shift;
199             my $psql_version = $self->sqitch->probe($self->client, '--version');
200 0     0 1 0 my @parts = split /\s+/, $psql_version;
201 0         0 my ($maj) = $parts[-1] =~ /^(\d+)/;
202             return $maj || 0;
203             }
204              
205             my ($self, $file) = @_;
206             my $schema = $self->registry;
207              
208             # Fetch the server version. 8.4 == 80400
209             my $version = $self->_probe('-c', 'SHOW server_version_num');
210 0     0 1 0 my $psql_maj = $self->_psql_major_version;
211 0 0       0  
212             # Is this Postgres-XC?
213             my $opts = $self->_probe('-c', q{
214             SELECT count(*)
215 0         0 FROM pg_catalog.pg_proc p
216 0         0 JOIN pg_catalog.pg_namespace n ON p.pronamespace = n.oid
217             WHERE nspname = 'pg_catalog'
218             AND proname = 'pgxc_version';
219             }) ? ' DISTRIBUTE BY REPLICATION' : '';
220 12     12   16322  
221 12         266 if ($version < 90300 || $psql_maj < 9) {
222 12         532 # Need to transform the SQL and write it to a temp file.
223 12         36 my $sql = scalar $file->slurp;
224 12   50     80  
225             # No CREATE SCHEMA IF NOT EXISTS syntax prior to 9.3.
226             $sql =~ s/SCHEMA IF NOT EXISTS/SCHEMA/ if $version < 90300;
227             if ($psql_maj < 9) {
228 6     6   18730 # Also no :"registry" variable syntax prior to psql 9.0.
229 6         138 ($schema) = $self->dbh->selectrow_array(
230             'SELECT quote_ident(?)', undef, $schema
231             );
232 6         244 $sql =~ s{:"registry"}{$schema}g;
233 6         38 }
234             require File::Temp;
235             my $fh = File::Temp->new;
236 6 100       20 print $fh $sql;
237             close $fh;
238             $self->_run(
239             '--file' => $fh->filename,
240             '--set' => "tableopts=$opts",
241             );
242             } else {
243             # We can take advantage of the :"registry" variable syntax.
244 6 100 100     60 $self->_run(
245             '--file' => $file,
246 4         16 '--set' => "registry=$schema",
247             '--set' => "tableopts=$opts",
248             );
249 4 100       1006 }
250 4 100       14  
251             $self->dbh->do('SET search_path = ?', undef, $schema);
252 2         10 }
253              
254             # Override to lock the changes table. This ensures that only one instance of
255 2         96 # Sqitch runs at one time.
256             my $self = shift;
257 4         164 my $dbh = $self->dbh;
258 4         32  
259 4         1892 # Start transaction and lock changes to allow only one change at a time.
260 4         86 $dbh->begin_work;
261 4         26 $dbh->do('LOCK TABLE changes IN EXCLUSIVE MODE')
262             if $self->_provider eq 'postgres';
263             # Yugabyte does not yet support EXCLUSIVE MODE.
264             # https://docs.yugabyte.com/preview/api/ysql/the-sql-language/statements/txn_lock/#lockmode-1
265             return $self;
266             }
267 2         14  
268             # Override to try to acquire a lock on a constant number without waiting.
269             shift->dbh->selectcol_arrayref(
270             'SELECT pg_try_advisory_lock(75474063)'
271             )->[0]
272             }
273              
274 6         78 # Override to try to acquire a lock on a constant number, waiting for the lock
275             # until timeout.
276             my $self = shift;
277              
278             # Yugabyte does not support advisory locks.
279             # https://github.com/yugabyte/yugabyte-db/issues/3642
280 0     0 1 0 # Use pessimistic locking when it becomes available.
281 0         0 # https://github.com/yugabyte/yugabyte-db/issues/5680
282             return 1 if $self->_provider ne 'postgres';
283              
284 0         0 # Asynchronously request a lock with an indefinite wait.
285 0 0       0 my $dbh = $self->dbh;
286             $dbh->do(
287             'SELECT pg_advisory_lock(75474063)',
288             { pg_async => DBD::Pg::PG_ASYNC() },
289 0         0 );
290              
291             # Use _timeout to periodically check for the result.
292             return 1 if $self->_timeout(sub { $dbh->pg_ready && $dbh->pg_result });
293              
294             # Timed out, cancel the query and return false.
295 0     0 1 0 $dbh->pg_cancel;
296             return 0;
297             }
298              
299             my ($self, $file) = @_;
300             $self->_run('--file' => $file);
301             }
302 0     0 1 0  
303             my $self = shift;
304             # Suppress STDOUT unless we want extra verbosity.
305             my $meth = $self->can($self->sqitch->verbosity > 1 ? '_run' : '_capture');
306             return $self->$meth('--file' => @_);
307             }
308 0 0       0  
309             my ($self, $fh) = @_;
310             $self->_spool($fh);
311 0         0 }
312 0         0  
313             shift->_run_registry_file(@_);
314             }
315              
316             # Override to avoid cast errors, and to use VALUES instead of a UNION query.
317             my ( $self, $change ) = @_;
318 0 0   0   0 my @tags = $change->tags or return $self;
  0 0       0  
319             my $sqitch = $self->sqitch;
320              
321 0         0 my ($id, $name, $proj, $user, $email) = (
322 0         0 $change->id,
323             $change->format_name,
324             $change->project,
325             $sqitch->user_name,
326 2     2 1 1076 $sqitch->user_email
327 2         8 );
328              
329             $self->dbh->do(
330             q{
331 4     4 1 2376 INSERT INTO tags (
332             tag_id
333 4 100       56 , tag
334 4         246 , project
335             , change_id
336             , note
337             , committer_name
338 2     2 1 1078 , committer_email
339 2         8 , planned_at
340             , planner_name
341             , planner_email
342             )
343 0     0 1 0 SELECT tid, tg, proj, chid, n, name, email, at, pname, pemail FROM ( VALUES
344             } . join( ",\n ", ( q{(?::text, ?::text, ?::text, ?::text, ?::text, ?::text, ?::text, ?::timestamptz, ?::text, ?::text)} ) x @tags )
345             . q{
346             ) i(tid, tg, proj, chid, n, name, email, at, pname, pemail)
347             LEFT JOIN tags ON i.tid = tags.tag_id
348 0     0 1 0 WHERE tags.tag_id IS NULL
349 0 0       0 },
350 0         0 undef,
351             map { (
352 0         0 $_->id,
353             $_->format_name,
354             $proj,
355             $id,
356             $_->note,
357             $user,
358             $email,
359             $_->timestamp->as_string(format => 'iso'),
360             $_->planner_name,
361             $_->planner_email,
362             ) } @tags
363             );
364              
365             return $self;
366             }
367              
368             # Override to take advantage of the RETURNING expression, and to save tags as
369             # an array rather than a space-delimited string.
370             my ($self, $change) = @_;
371             my $dbh = $self->dbh;
372              
373             # Delete tags.
374             my $del_tags = $dbh->selectcol_arrayref(
375             'DELETE FROM tags WHERE change_id = ? RETURNING tag',
376             undef, $change->id
377             ) || [];
378              
379             # Retrieve dependencies.
380             my ($req, $conf) = $dbh->selectrow_array(q{
381             SELECT ARRAY(
382 0         0 SELECT dependency
383 0         0 FROM dependencies
384             WHERE change_id = $1
385             AND type = 'require'
386             ), ARRAY(
387             SELECT dependency
388             FROM dependencies
389             WHERE change_id = $1
390             AND type = 'conflict'
391             )
392             }, undef, $change->id);
393              
394             # Delete the change record.
395             $dbh->do(
396 0         0 'DELETE FROM changes where change_id = ?',
397             undef, $change->id,
398             );
399              
400             # Log it.
401             return $self->_log_event( revert => $change, $del_tags, $req, $conf );
402 0     0 1 0 }
403 0         0  
404             require App::Sqitch::DateTime;
405             return App::Sqitch::DateTime->new(split /:/ => shift);
406 0   0     0 }
407              
408             return 0 unless $DBI::state && $DBI::state eq '42P01'; # undefined_table
409             my $dbh = shift->dbh;
410             return 1 unless $dbh->{pg_server_version} >= 90000;
411              
412 0         0 # Try to avoid confusion for people monitoring the Postgres error log by
413             # sending a warning to the log immediately after the missing relation error
414             # to tell log watchers that Sqitch is aware of the issue and will next
415             # initialize the database. Hopefully this will reduce confusion and
416             # unnecessary time troubleshooting an error that Sqitch handles.
417             my @msg = map { $dbh->quote($_) } (
418             __ 'Sqitch registry not initialized',
419             __ 'Because the "changes" table does not exist, Sqitch will now initialize the database to create its registry tables.',
420             );
421             $dbh->do(sprintf q{DO $$
422             BEGIN
423             SET LOCAL client_min_messages = 'ERROR';
424             RAISE WARNING USING ERRCODE = 'undefined_table', MESSAGE = %s, DETAIL = %s;
425             END;
426             $$}, @msg);
427 0         0 return 1;
428             }
429              
430             return $DBI::state && $DBI::state eq '42703'; # undefined_column
431             }
432              
433 0         0 my ($self, $vals) = @_;
434             return '= ANY(?)', $vals;
435             }
436              
437 4     4   2370 my $self = shift;
438 4         56 my $sqitch = $self->sqitch;
439             my $pass = $self->password or return $sqitch->run( $self->psql, @_ );
440             local $ENV{PGPASSWORD} = $pass;
441             return $sqitch->run( $self->psql, @_ );
442 8 100 100 8   41788 }
443 4         14  
444 4 100       40 my $self = shift;
445             my $sqitch = $self->sqitch;
446             my $pass = $self->password or return $sqitch->capture( $self->psql, @_ );
447             local $ENV{PGPASSWORD} = $pass;
448             return $sqitch->capture( $self->psql, @_ );
449             }
450              
451 2         12 my $self = shift;
  4         794  
452             my $sqitch = $self->sqitch;
453             my $pass = $self->password or return $sqitch->probe( $self->psql, @_ );
454             local $ENV{PGPASSWORD} = $pass;
455 2         22 return $sqitch->probe( $self->psql, @_ );
456             }
457              
458             my $self = shift;
459             my $fh = shift;
460             my $sqitch = $self->sqitch;
461 2         16 my $pass = $self->password or return $sqitch->spool( $fh, $self->psql, @_ );
462             local $ENV{PGPASSWORD} = $pass;
463             return $sqitch->spool( $fh, $self->psql, @_ );
464             }
465 14   100 14   100  
466             1;
467              
468              
469 0     0   0 =head1 Name
470 0         0  
471             App::Sqitch::Engine::pg - Sqitch PostgreSQL Engine
472              
473             =head1 Synopsis
474 8     8   3620  
475 8         20 my $pg = App::Sqitch::Engine->load( engine => 'pg' );
476 8 100       164  
477 2         238 =head1 Description
478 2         8  
479             App::Sqitch::Engine::pg provides the PostgreSQL storage engine for Sqitch. It
480             supports PostgreSQL 8.4.0 and higher, Postgres-XC 1.2 and higher, and YugabyteDB.
481              
482 10     10   3802 =head1 Interface
483 10         36  
484 10 100       206 =head2 Instance Methods
485 2         36  
486 2         8 =head3 C<initialized>
487              
488             $pg->initialize unless $pg->initialized;
489              
490 4     4   2190 Returns true if the database has been initialized for Sqitch, and false if it
491 4         12 has not.
492 4 100       80  
493 2         24 =head3 C<initialize>
494 2         6  
495             $pg->initialize;
496              
497             Initializes a database for Sqitch by installing the Sqitch registry schema.
498 6     6   2232  
499 6         8 =head3 C<psql>
500 6         18  
501 6 100       118 Returns a list containing the C<psql> client and options to be passed to it.
502 2         28 Used internally when executing scripts.
503 2         6  
504             =head1 Author
505              
506             David E. Wheeler <david@justatheory.com>
507              
508             =head1 License
509              
510             Copyright (c) 2012-2022 iovation Inc., David E. Wheeler
511              
512             Permission is hereby granted, free of charge, to any person obtaining a copy
513             of this software and associated documentation files (the "Software"), to deal
514             in the Software without restriction, including without limitation the rights
515             to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
516             copies of the Software, and to permit persons to whom the Software is
517             furnished to do so, subject to the following conditions:
518              
519             The above copyright notice and this permission notice shall be included in all
520             copies or substantial portions of the Software.
521              
522             THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
523             IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
524             FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
525             AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
526             LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
527             OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
528             SOFTWARE.
529              
530             =cut