File Coverage

blib/lib/PGObject/Util/LogRep/TestDecoding.pm
Criterion Covered Total %
statement 57 59 96.6
branch 15 18 83.3
condition 2 5 40.0
subroutine 13 13 100.0
pod 3 3 100.0
total 90 98 91.8


line stmt bran cond sub pod time code
1             package PGObject::Util::LogRep::TestDecoding;
2              
3 3     3   460039 use 5.034;
  3         13  
4 3     3   19 use strict;
  3         13  
  3         113  
5 3     3   15 use warnings FATAL => 'all';
  3         6  
  3         219  
6 3     3   1423 use parent 'Exporter';
  3         2377  
  3         22  
7 3     3   4913 use Parse::RecDescent;
  3         188432  
  3         27  
8 3     3   2286 use Moo;
  3         29590  
  3         16  
9 3     3   3097 use Types::Standard qw(Maybe ArrayRef Str Bool Int);
  3         538924  
  3         45  
10 3     3   13206 use namespace::autoclean;
  3         71402  
  3         16  
11              
12             =head1 NAME
13              
14             PGObject::Util::LogRep::TestDecoding - Parse Pg's test_decoding messages
15              
16             =head1 VERSION
17              
18             Version 0.1.4
19              
20             =cut
21              
22             our $VERSION = '0.1.4';
23              
24              
25             =head1 SYNOPSIS
26              
27             use PGObject::util::LogRep::TestDecoding qw(parse_msg);
28              
29             my $msg = parse_msg($repmsg); # procedural interace
30             # tells you the operation, transaction status etc.
31              
32             # or the OO interface which gives more functionality
33              
34             my $decoder = PGObject::util::LogRep::TestDecoding->new(
35             schema=> ['myschema'], txn_status => 0
36             );
37             handle_message($decoder->parse($repmsg)) if $decoder->matches($repmsg);
38              
39             =head1 DESCRIPTION
40              
41             This module provides parse capabiltiies for the test_decoding plugin for
42             PostgreSQL's logical replication. The test_decoding plugin does not recognize
43             or handle publications and simply replicates everything.
44              
45             Messages follow formats like:
46              
47             table public.data: INSERT: id[integer]:3 data[text]:'5'
48              
49             or for begin or commit messages:
50              
51             BEGIN 529
52             COMMIT 529
53              
54             Transactions are always processed sequentially in the test decoding stack.
55              
56             This module can be combined with C to create programs
57             which process PostgreSQL's logical replication streams.
58              
59             Note that PostgreSQL's logical replication sends out transactions in commit
60             order and this module assumes that it will process most messages if transaction
61             information is important (which it might not be for some applications).
62              
63             =head1 EXPORT
64              
65             =over
66              
67             =item parse_msg # single message / non-OO parser
68              
69             =back
70              
71             =cut
72              
73 3     3   2672 BEGIN { our @EXPORT_OK = ('parse_msg'); }
74              
75             =head1 ATTRIBUTES/ACCESSORS
76              
77             These are for the OO interface only. These are read-ony after the object is
78             created but can be set in the constructor. If you need to change them. create
79             a new object instead.
80              
81             =head2 schema Maybe[ArrayRef[Str]]
82              
83             Undef or an arrayref of schalars. If it is set, then matches returns true if
84             the message matches any table in any schema mentioned.
85              
86             =cut
87              
88             has schema => (is => 'ro', isa => Maybe[ArrayRef[Str]]);
89              
90             =head2 txn_status Bool
91              
92             Whether to report transactoin status.
93              
94             =cut
95              
96             has txn_status => (is => 'ro', isa => Bool);
97              
98             =head2 tables Maybe[ArrayRef[Str]]
99              
100             A list of fully qualified tablenames to match against. Note that this filter
101             operates along with the schema filter and if either matches, the match is
102             met.
103              
104             =cut
105              
106             has tables => (is => 'ro', isa => Maybe[ArrayRef[Str]]);
107              
108             =head2 current_txn (calculated)
109              
110             Logical replication sends messages out for transactions in commit order.
111             Assuming the transaction numbers have been requested, this will produce the
112             transaction number of the most recent BEGIN statement. Note that this
113             information is only available when certain options are passed so it may return
114             C.
115              
116             =cut
117              
118             has current_txn => (is => 'rw', writer => '_set_current_txn');
119              
120             =head1 GRAMMAR
121              
122             Test_decoding lines come in two basic formats: transaction control lines and
123             DML lines. The former have a syntax like C (or COMMIT).
124              
125             The DML records have a longer and more complex. They have a format begins with
126             the word "table" followed by a fully qualified tablename, then an operation,
127             then a column list in name[type]:value format. Identifiers can be SQL escaped.
128             So can literals.
129              
130             Since transactions are handled sequentially in commit order, the DML records do
131             not carry transaction identifiers in them.
132              
133             =cut
134              
135              
136             my $grammar = <<'_ENDGRAMMAR' ;
137             { my $retval = {}; 1; }
138             record : dmlrec | txnrec
139             { $retval; }
140             txnrec : txncmd txnid(?)
141             { $retval->{"txn_cmd"} = $item[1]; $retval; }
142             { $retval->{"type"} = "txn"; $retval; }
143             dmlrec : header operation ":" col(s)
144             { $retval->{"type"} = "dml"; $retval; }
145             { $retval->{"operation"} = $item{operation}; $retval }
146             header : "table " schema "." tablename ": "
147             col : column(s)
148             schema : sqlident
149             { $retval->{"schema"} = $item[1]; $retval; }
150             tablename : sqlident
151             { $retval->{"tablename"} = $item[1]; $retval; }
152             column : /\s?/ colname "[" coltype "]" ":" value
153             { $retval->{row_data}->{$item{"colname"}} = $item{"value"} }
154             colname : sqlident
155             coltype : schemaq(?) sqlident array(?)
156             schemaq : sqlident '.'
157             array : '[]'
158             value : literal
159             sqlident : /[a-zA-Z0-9()_ ]+/ | /"([^"]|"")+"/
160             literal : /\w+/ | /'([^']|'')+'/
161             txnid : /\d+/
162             { $retval->{txnid} = $item[1]; $retval; }
163             txncmd : "BEGIN" | "COMMIT"
164             operation : "INSERT" | "UPDATE" | "DELETE"
165             _ENDGRAMMAR
166              
167              
168              
169             =head1 SUBROUTINES/METHODS
170              
171             =head2 output
172              
173             The Parsing routines return a consistent payload in the form of a hashref with
174             one of two formats depending on the message type. Both forms have a "type"
175             field which is set to "txn" or "dml" depending on the record type.
176              
177             =head3 txn messages
178              
179             The txn message output has three fields:
180              
181             =over
182              
183             =item type = txn
184              
185             =item txnid
186              
187             Integer, may be omitted if data not available
188              
189             =item txncmd
190              
191             Either BEGIN or COMMIT
192              
193             =back
194              
195             Examples:
196              
197             { type => "txn",
198             txncmd => "BEGIN',
199             txnid => 50123 }
200              
201             { type => "txn",
202             txncmd => "COMMIT',
203             txnid => 50123 }
204              
205             Or if transaction numbers are not available"
206              
207             { type => "txn",
208             txncmd => "BEGIN' }
209              
210             =head3 dml messages
211              
212             The dml lessages have a number of fields:
213              
214             =over
215              
216             =item type = "dml"
217              
218             =item schema
219              
220             Namespace of the table
221              
222             =item tablename
223              
224             Name of the table
225              
226             =item row_data
227              
228             A hashref of name => value.
229              
230             =item operation
231              
232             One of INSERT, UPDATE, or DELETE
233              
234             =back
235              
236             Examples:
237              
238             { type => 'dml',
239             tablename => 'sometable',
240             schema => 'public',
241             row_data => { id => 1, key => 'test', value => 123 },
242             operation => 'INSERT' }
243              
244             { type => 'dml',
245             tablename => 'sometable',
246             schema => 'public',
247             row_data => { id => 1, key => 'test', value => 123 },
248             operation => 'DELETE' }
249              
250              
251             =head2 parse (OOP interface)
252              
253             In the OOP interface, the parse function parses the message and returns the
254             output.
255              
256             =cut
257              
258             sub _unescape {
259 49     49   100 my ($val, $escape) = @_;
260 49 50       97 return unless defined $val;
261 49 100       450 return $val unless $val =~ /^$escape/;
262 10         163 $val =~ s/(^$escape|$escape$)//g;
263 10         135 $val =~ s/$escape{2}/$escape/g;
264 10         42 return $val;
265             }
266              
267             sub parse_msg {
268 11     11 1 532688 my ($msg) = @_;
269 11         68 my $parser = new Parse::RecDescent ($grammar);
270 11   50     700440 my $parsed = $parser->record($msg) || return;
271 11 100       73022 return $parsed if $parsed->{type} eq 'txn';
272 7         50 for (qw(schema tablename)){
273 14         59 $parsed->{$_} = _unescape($parsed->{$_}, '"');
274             }
275 7         16 my $rowdata = $parsed->{row_data};
276 7         35 delete $parsed->{row_data};
277 7         17 for (keys %{$rowdata}) {
  7         27  
278 17         37 my $key = _unescape($_, '"');
279             $parsed->{row_data}->{$key} = $rowdata->{$_} eq 'null' ? undef :
280 17 100       60 _unescape($rowdata->{$_}, q('));
281             }
282 7         56 return $parsed;
283             }
284              
285             sub parse {
286 4     4 1 3594 my ($self, $msg) = @_;
287 4   33     13 my $parsed = parse_msg($msg) || warn "Invalid message $msg";
288 4 100       2197 if ($parsed->{type} eq 'txn') {
289 2         29 $self->_set_current_txn($parsed->{txnid});
290             }
291 4         62 return $parsed;
292             }
293              
294             =head2 matches (OOP Interface)
295              
296             Evaluates whether the schema AND tablename match rules are met. If the
297             message
298             is a txn message it will then be processed (and possibly affect the txnid
299             state).
300              
301             Note that only txn messages are parsed here.
302              
303             =cut
304              
305             sub matches {
306 5     5 1 274864 my ($self, $msg) = @_;
307 5 50       25 if ($msg =~ /^table /){
308 5 50       28 if ($self->schema){
309 5         7 for my $ns (@{$self->schema}){
  5         17  
310 5 100       57 return 1 if $msg =~ /^table $ns\./;
311             }
312 4         9 for my $tn (@{$self->tables}){
  4         13  
313 7 100       137 return 1 if $msg =~ /^table $tn:/;
314             }
315             }
316             } else {
317 0           $self->parse($msg);
318 0           return 0;
319             }
320             }
321              
322             =head2 parse_msg
323              
324             parse_msg parses the message provided and returns a hashref as detailed above.
325              
326             =head1 AUTHOR
327              
328             Chris Travers, C<< >>
329              
330             =head1 BUGS
331              
332             Please report any bugs or feature requests to C, or through
333             the web interface at L. I will be notified, and then you'll
334             automatically be notified of progress on your bug as I make changes.
335              
336              
337              
338              
339             =head1 SUPPORT
340              
341             You can find documentation for this module with the perldoc command.
342              
343             perldoc PGObject::util::LogRep::TestDecoding
344              
345              
346             You can also look for information at:
347              
348             =over 4
349              
350             =item * RT: CPAN's request tracker (report bugs here)
351              
352             L
353              
354             When submitting a bug, lease try to include the message that causes it.
355              
356             =item * CPAN Ratings
357              
358             L
359              
360             =item * Search CPAN
361              
362             L
363              
364             =back
365              
366              
367             =head1 ACKNOWLEDGEMENTS
368              
369              
370             =head1 LICENSE AND COPYRIGHT
371              
372             This software is Copyright (c) 2023 by Chris Travers.
373              
374             This program is released under the following license:
375              
376             BSD2
377              
378              
379             =cut
380              
381             1; # End of PGObject::util::LogRep::TestDecoding