File Coverage

blib/lib/App/Sqitch/Engine/pg.pm
Criterion Covered Total %
statement 117 164 71.3
branch 28 42 66.6
condition 13 19 68.4
subroutine 35 53 66.0
pod 17 17 100.0
total 210 295 71.1


line stmt bran cond sub pod time code
1              
2             use 5.010;
3 9     9   105605 use Moo;
  9         30  
4 9     9   46 use utf8;
  9         22  
  9         62  
5 9     9   2760 use Path::Class;
  9         19  
  9         49  
6 9     9   210 use DBI;
  9         16  
  9         483  
7 9     9   2922 use Try::Tiny;
  9         31386  
  9         345  
8 9     9   57 use App::Sqitch::X qw(hurl);
  9         17  
  9         483  
9 9     9   73 use Locale::TextDomain qw(App-Sqitch);
  9         20  
  9         75  
10 9     9   2589 use App::Sqitch::Plan::Change;
  9         28  
  9         59  
11 9     9   1514 use List::Util qw(first);
  9         21  
  9         269  
12 9     9   47 use App::Sqitch::Types qw(DBH ArrayRef);
  9         17  
  9         656  
13 9     9   64 use Type::Utils qw(enum);
  9         16  
  9         96  
14 9     9   7575 use namespace::autoclean;
  9         15  
  9         81  
15 9     9   4649  
  9         18  
  9         75  
16             extends 'App::Sqitch::Engine';
17              
18             our $VERSION = 'v1.3.1'; # VERSION
19              
20             my $self = shift;
21              
22 26     26 1 988 # Just use the target name if it doesn't look like a URI or if the URI
23             # includes the database name.
24             return $self->target->name if $self->target->name !~ /:/
25             || $self->target->uri->dbname;
26 26 50 33     230  
27             # Use the URI sans password, and with the database name added.
28             my $uri = $self->target->uri->clone;
29             $uri->password(undef) if $uri->password;
30 26         1012 $uri->dbname(
31 26 50       284 $ENV{PGDATABASE}
32             || $self->username
33             || $ENV{PGUSER}
34             || $self->sqitch->sysuser
35             );
36 26   66     978 return $uri->as_string;
37             }
38 26         1530  
39             # DBD::pg and psql use fallbacks consistently, thanks to libpq. These include
40             # environment variables, system info (username), the password file, and the
41             # connection service file. Best for us not to second-guess these values,
42             # though we admittedly try when setting the database name in the destination
43             # URI for unnamed targets a few lines up from here.
44              
45             has _psql => (
46       18     is => 'ro',
47       6     isa => ArrayRef,
48             lazy => 1,
49             default => sub {
50             my $self = shift;
51             my $uri = $self->uri;
52             my @ret = ( $self->client );
53              
54             my %query_params = $uri->query_params;
55             my @conninfo;
56             # Use _port instead of port so it's empty if no port is in the URI.
57             # https://github.com/sqitchers/sqitch/issues/675
58             for my $spec (
59             [ user => $self->username ],
60             [ dbname => $uri->dbname ],
61             [ host => $uri->host ],
62             [ port => $uri->_port ],
63             map { [ $_ => $query_params{$_} ] }
64             sort keys %query_params,
65             ) {
66             next unless defined $spec->[1] && length $spec->[1];
67             if ($spec->[1] =~ /[ "'\\]/) {
68             $spec->[1] =~ s/([ "'\\])/\\$1/g;
69             }
70             push @conninfo, "$spec->[0]=$spec->[1]";
71             }
72              
73             push @ret => '--dbname', join ' ', @conninfo if @conninfo;
74              
75             if (my %vars = $self->variables) {
76             push @ret => map {; '--set', "$_=$vars{$_}" } sort keys %vars;
77             }
78              
79             push @ret => $self->_client_opts;
80             return \@ret;
81             },
82             );
83              
84             my $self = shift;
85             return (
86             '--quiet',
87             '--no-psqlrc',
88             '--no-align',
89 12     12   20 '--tuples-only',
90             '--set' => 'ON_ERROR_STOP=1',
91 12         178 '--set' => 'registry=' . $self->registry,
92             );
93             }
94              
95              
96              
97             has _provider => (
98             is => 'rw',
99             isa => enum([qw( postgres yugabyte )]),
100 58     58 1 24810 default => 'postgres',
  58         1050  
101             lazy => 1,
102 4     4 1 12912 );
103 3     3 1 43  
104 0     0 1 0 has dbh => (
105 11     11 1 1329 is => 'rw',
106             isa => DBH,
107             lazy => 1,
108             default => sub {
109             my $self = shift;
110             $self->use_driver;
111              
112             my $uri = $self->uri;
113             local $ENV{PGCLIENTENCODING} = 'UTF8';
114             DBI->connect($uri->dbi_dsn, $self->username, $self->password, {
115             PrintError => 0,
116             RaiseError => 0,
117             AutoCommit => 1,
118             pg_enable_utf8 => 1,
119             pg_server_prepare => 1,
120             HandleError => sub {
121             my ($err, $dbh) = @_;
122             $@ = $err;
123             @_ = ($dbh->state || 'DEV' => $dbh->errstr);
124             goto &hurl;
125             },
126             Callbacks => {
127             connected => sub {
128             my $dbh = shift;
129             $dbh->do('SET client_min_messages = WARNING');
130             try {
131             $dbh->do(
132             'SET search_path = ?',
133             undef, $self->registry
134             );
135             # https://www.nntp.perl.org/group/perl.dbi.dev/2013/11/msg7622.html
136             $dbh->set_err(undef, undef) if $dbh->err;
137             };
138             # Determine the provider. Yugabyte says this is the right way to do it.
139             # https://yugabyte-db.slack.com/archives/CG0KQF0GG/p1653762283847589
140             my $v = $dbh->selectcol_arrayref(
141             q{SELECT split_part(version(), ' ', 2)}
142             )->[0] // '';
143             $self->_provider('yugabyte') if $v =~ /-YB-/;
144             return;
145             },
146             },
147             });
148             }
149             );
150              
151             # Need to wait until dbh is defined.
152             with 'App::Sqitch::Role::DBIEngine';
153              
154             [ map { $_->format_name } $_[1]->tags ];
155             }
156              
157             [ map { $_->as_string } $_[1]->requires ];
158             }
159              
160             [ map { $_->as_string } $_[1]->conflicts ];
161             }
162              
163             q{to_char(%s AT TIME ZONE 'UTC', '"year":YYYY:"month":MM:"day":DD:"hour":HH24:"minute":MI:"second":SS:"time_zone":"UTC"')};
164             }
165 0     0   0  
  0         0  
166              
167              
168             my $dbh = shift->dbh;
169 0     0   0 # Since 9.3, we can use array_remove().
  0         0  
170             return q{array_remove(array_agg(%1$s ORDER BY %1$s), NULL)}
171             if $dbh->{pg_server_version} >= 90300;
172              
173 0     0   0 # Since 8.4 we can use ORDER BY.
  0         0  
174             return q{ARRAY(SELECT * FROM UNNEST( array_agg(%1$s ORDER BY %1$s) ) a WHERE a IS NOT NULL)}
175             if $dbh->{pg_server_version} >= 80400;
176              
177 2     2   2356 return q{ARRAY(SELECT * FROM UNNEST( array_agg(%s) ) a WHERE a IS NOT NULL)};
178             }
179              
180 0     0   0  
181              
182 0     0   0 my $self = shift;
183             return $self->dbh->selectcol_arrayref(q{
184             SELECT EXISTS(
185 10     10   2010 SELECT TRUE FROM pg_catalog.pg_tables
186             WHERE schemaname = ? AND tablename = ?
187             )
188 10 100       80 }, undef, $self->registry, 'changes')->[0];
189             }
190              
191             my $self = shift;
192 6 100       50 hurl engine => __x(
193             'Sqitch schema "{schema}" already exists',
194 2         276 schema => $self->registry
195             ) if $self->initialized;
196             $self->_run_registry_file( file(__FILE__)->dir->file($self->key . '.sql') );
197 0     0   0 $self->_register_release;
198             }
199 0     0   0  
200             my $self = shift;
201             my $psql_version = $self->sqitch->probe($self->client, '--version');
202 0     0 1 0 my @parts = split /\s+/, $psql_version;
203 0         0 my ($maj) = $parts[-1] =~ /^(\d+)/;
204             return $maj || 0;
205             }
206              
207             my ($self, $file) = @_;
208             my $schema = $self->registry;
209              
210             # Fetch the client version. 8.4 == 80400
211             my $version = $self->_probe('-c', 'SHOW server_version_num');
212 0     0 1 0 my $psql_maj = $self->_psql_major_version;
213 0 0       0  
214             # Is this XC?
215             my $opts = $self->_probe('-c', q{
216             SELECT count(*)
217 0         0 FROM pg_catalog.pg_proc p
218 0         0 JOIN pg_catalog.pg_namespace n ON p.pronamespace = n.oid
219             WHERE nspname = 'pg_catalog'
220             AND proname = 'pgxc_version';
221             }) ? ' DISTRIBUTE BY REPLICATION' : '';
222 12     12   19722  
223 12         278 if ($version < 90300 || $psql_maj < 9) {
224 12         564 # Need to transform the SQL and write it to a temp file.
225 12         40 my $sql = scalar $file->slurp;
226 12   50     72  
227             # No CREATE SCHEMA IF NOT EXISTS syntax prior to 9.3.
228             $sql =~ s/SCHEMA IF NOT EXISTS/SCHEMA/ if $version < 90300;
229             if ($psql_maj < 9) {
230 6     6   22988 # Also no :"registry" variable syntax prior to psql 9.0.s
231 6         126 ($schema) = $self->dbh->selectrow_array(
232             'SELECT quote_ident(?)', undef, $schema
233             );
234 6         250 $sql =~ s{:"registry"}{$schema}g;
235 6         38 }
236             require File::Temp;
237             my $fh = File::Temp->new;
238 6 100       22 print $fh $sql;
239             close $fh;
240             $self->_run(
241             '--file' => $fh->filename,
242             '--set' => "tableopts=$opts",
243             );
244             } else {
245             # We can take advantage of the :"registry" variable syntax.
246 6 100 100     98 $self->_run(
247             '--file' => $file,
248 4         28 '--set' => "registry=$schema",
249             '--set' => "tableopts=$opts",
250             );
251 4 100       1240 }
252 4 100       174  
253             $self->dbh->do('SET search_path = ?', undef, $schema);
254 2         14 }
255              
256             # Override to lock the changes table. This ensures that only one instance of
257 2         102 # Sqitch runs at one time.
258             my $self = shift;
259 4         32 my $dbh = $self->dbh;
260 4         24  
261 4         2280 # Start transaction and lock changes to allow only one change at a time.
262 4         100 $dbh->begin_work;
263 4         30 $dbh->do('LOCK TABLE changes IN EXCLUSIVE MODE')
264             if $self->_provider eq 'postgres';
265             # Yugabyte does not yet support EXCLUSIVE MODE.
266             # https://docs.yugabyte.com/preview/api/ysql/the-sql-language/statements/txn_lock/#lockmode-1
267             return $self;
268             }
269 2         16  
270             # Override to try to acquire a lock on a constant number without waiting.
271             shift->dbh->selectcol_arrayref(
272             'SELECT pg_try_advisory_lock(75474063)'
273             )->[0]
274             }
275              
276 6         86 # Override to try to acquire a lock on a constant number, waiting for the lock
277             # until timeout.
278             my $self = shift;
279              
280             # Yugabyte does not support advisory locks.
281             # https://github.com/yugabyte/yugabyte-db/issues/3642
282 0     0 1 0 # Use pessimistic locking when it becomes available.
283 0         0 # https://github.com/yugabyte/yugabyte-db/issues/5680
284             return 1 if $self->_provider ne 'postgres';
285              
286 0         0 # Asynchronously request a lock with an indefinite wait.
287 0 0       0 my $dbh = $self->dbh;
288             $dbh->do(
289             'SELECT pg_advisory_lock(75474063)',
290             { pg_async => DBD::Pg::PG_ASYNC() },
291 0         0 );
292              
293             # Use _timeout to periodically check for the result.
294             return 1 if $self->_timeout(sub { $dbh->pg_ready && $dbh->pg_result });
295              
296             # Timed out, cancel the query and return false.
297 0     0 1 0 $dbh->pg_cancel;
298             return 0;
299             }
300              
301             my ($self, $file) = @_;
302             $self->_run('--file' => $file);
303             }
304 0     0 1 0  
305             my $self = shift;
306             # Suppress STDOUT unless we want extra verbosity.
307             my $meth = $self->can($self->sqitch->verbosity > 1 ? '_run' : '_capture');
308             return $self->$meth('--file' => @_);
309             }
310 0 0       0  
311             my ($self, $fh) = @_;
312             $self->_spool($fh);
313 0         0 }
314 0         0  
315             shift->_run_registry_file(@_);
316             }
317              
318             # Override to avoid cast errors, and to use VALUES instead of a UNION query.
319             my ( $self, $change ) = @_;
320 0 0   0   0 my @tags = $change->tags or return $self;
  0 0       0  
321             my $sqitch = $self->sqitch;
322              
323 0         0 my ($id, $name, $proj, $user, $email) = (
324 0         0 $change->id,
325             $change->format_name,
326             $change->project,
327             $sqitch->user_name,
328 2     2 1 1278 $sqitch->user_email
329 2         8 );
330              
331             $self->dbh->do(
332             q{
333 4     4 1 2810 INSERT INTO tags (
334             tag_id
335 4 100       62 , tag
336 4         302 , project
337             , change_id
338             , note
339             , committer_name
340 2     2 1 1316 , committer_email
341 2         10 , planned_at
342             , planner_name
343             , planner_email
344             )
345 0     0 1 0 SELECT tid, tg, proj, chid, n, name, email, at, pname, pemail FROM ( VALUES
346             } . join( ",\n ", ( q{(?::text, ?::text, ?::text, ?::text, ?::text, ?::text, ?::text, ?::timestamptz, ?::text, ?::text)} ) x @tags )
347             . q{
348             ) i(tid, tg, proj, chid, n, name, email, at, pname, pemail)
349             LEFT JOIN tags ON i.tid = tags.tag_id
350 0     0 1 0 WHERE tags.tag_id IS NULL
351 0 0       0 },
352 0         0 undef,
353             map { (
354 0         0 $_->id,
355             $_->format_name,
356             $proj,
357             $id,
358             $_->note,
359             $user,
360             $email,
361             $_->timestamp->as_string(format => 'iso'),
362             $_->planner_name,
363             $_->planner_email,
364             ) } @tags
365             );
366              
367             return $self;
368             }
369              
370             # Override to take advantage of the RETURNING expression, and to save tags as
371             # an array rather than a space-delimited string.
372             my ($self, $change) = @_;
373             my $dbh = $self->dbh;
374              
375             # Delete tags.
376             my $del_tags = $dbh->selectcol_arrayref(
377             'DELETE FROM tags WHERE change_id = ? RETURNING tag',
378             undef, $change->id
379             ) || [];
380              
381             # Retrieve dependencies.
382             my ($req, $conf) = $dbh->selectrow_array(q{
383             SELECT ARRAY(
384 0         0 SELECT dependency
385 0         0 FROM dependencies
386             WHERE change_id = $1
387             AND type = 'require'
388             ), ARRAY(
389             SELECT dependency
390             FROM dependencies
391             WHERE change_id = $1
392             AND type = 'conflict'
393             )
394             }, undef, $change->id);
395              
396             # Delete the change record.
397             $dbh->do(
398 0         0 'DELETE FROM changes where change_id = ?',
399             undef, $change->id,
400             );
401              
402             # Log it.
403             return $self->_log_event( revert => $change, $del_tags, $req, $conf );
404 0     0 1 0 }
405 0         0  
406             require App::Sqitch::DateTime;
407             return App::Sqitch::DateTime->new(split /:/ => shift);
408 0   0     0 }
409              
410             return 0 unless $DBI::state && $DBI::state eq '42P01'; # undefined_table
411             my $dbh = shift->dbh;
412             return 1 unless $dbh->{pg_server_version} >= 90000;
413              
414 0         0 # Try to avoid confusion for people monitoring the Postgres error log by
415             # sending a warning to the log immediately after the missing relation error
416             # to tell log watchers that Sqitch is aware of the issue and will next
417             # initialize the database. Hopefully this will reduce confusion and
418             # unnecessary time trouble shooting an error that Sqitch handles.
419             my @msg = map { $dbh->quote($_) } (
420             __ 'Sqitch registry not initialized',
421             __ 'Because the "changes" table does not exist, Sqitch will now initialize the database to create its registry tables.',
422             );
423             $dbh->do(sprintf q{DO $$
424             BEGIN
425             SET LOCAL client_min_messages = 'ERROR';
426             RAISE WARNING USING ERRCODE = 'undefined_table', MESSAGE = %s, DETAIL = %s;
427             END;
428             $$}, @msg);
429 0         0 return 1;
430             }
431              
432             return $DBI::state && $DBI::state eq '42703'; # undefined_column
433             }
434              
435 0         0 my ($self, $vals) = @_;
436             return '= ANY(?)', $vals;
437             }
438              
439 4     4   2630 my $self = shift;
440 4         74 my $sqitch = $self->sqitch;
441             my $pass = $self->password or return $sqitch->run( $self->psql, @_ );
442             local $ENV{PGPASSWORD} = $pass;
443             return $sqitch->run( $self->psql, @_ );
444 8 100 100 8   6282 }
445 4         18  
446 4 100       40 my $self = shift;
447             my $sqitch = $self->sqitch;
448             my $pass = $self->password or return $sqitch->capture( $self->psql, @_ );
449             local $ENV{PGPASSWORD} = $pass;
450             return $sqitch->capture( $self->psql, @_ );
451             }
452              
453 2         20 my $self = shift;
  4         998  
454             my $sqitch = $self->sqitch;
455             my $pass = $self->password or return $sqitch->probe( $self->psql, @_ );
456             local $ENV{PGPASSWORD} = $pass;
457 2         22 return $sqitch->probe( $self->psql, @_ );
458             }
459              
460             my $self = shift;
461             my $fh = shift;
462             my $sqitch = $self->sqitch;
463 2         22 my $pass = $self->password or return $sqitch->spool( $fh, $self->psql, @_ );
464             local $ENV{PGPASSWORD} = $pass;
465             return $sqitch->spool( $fh, $self->psql, @_ );
466             }
467 14   100 14   106  
468             1;
469              
470              
471 0     0   0 =head1 Name
472 0         0  
473             App::Sqitch::Engine::pg - Sqitch PostgreSQL Engine
474              
475             =head1 Synopsis
476 8     8   4202  
477 8         20 my $pg = App::Sqitch::Engine->load( engine => 'pg' );
478 8 100       174  
479 2         288 =head1 Description
480 2         8  
481             App::Sqitch::Engine::pg provides the PostgreSQL storage engine for Sqitch. It
482             supports PostgreSQL 8.4.0 and higher, Postgres-XC 1.2 and higher, and YugabyteDB.
483              
484 10     10   4454 =head1 Interface
485 10         42  
486 10 100       218 =head2 Instance Methods
487 2         42  
488 2         30 =head3 C<initialized>
489              
490             $pg->initialize unless $pg->initialized;
491              
492 4     4   2684 Returns true if the database has been initialized for Sqitch, and false if it
493 4         18 has not.
494 4 100       84  
495 2         38 =head3 C<initialize>
496 2         8  
497             $pg->initialize;
498              
499             Initializes a database for Sqitch by installing the Sqitch registry schema.
500 6     6   2682  
501 6         12 =head3 C<psql>
502 6         18  
503 6 100       130 Returns a list containing the C<psql> client and options to be passed to it.
504 2         76 Used internally when executing scripts.
505 2         14  
506             =head1 Author
507              
508             David E. Wheeler <david@justatheory.com>
509              
510             =head1 License
511              
512             Copyright (c) 2012-2022 iovation Inc., David E. Wheeler
513              
514             Permission is hereby granted, free of charge, to any person obtaining a copy
515             of this software and associated documentation files (the "Software"), to deal
516             in the Software without restriction, including without limitation the rights
517             to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
518             copies of the Software, and to permit persons to whom the Software is
519             furnished to do so, subject to the following conditions:
520              
521             The above copyright notice and this permission notice shall be included in all
522             copies or substantial portions of the Software.
523              
524             THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
525             IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
526             FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
527             AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
528             LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
529             OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
530             SOFTWARE.
531              
532             =cut