File Coverage

lib/Data/Record/Serialize/Encode/dbi.pm
Criterion Covered Total %
statement 133 182 73.0
branch 39 78 50.0
condition 10 23 43.4
subroutine 26 32 81.2
pod 5 6 83.3
total 213 321 66.3


line stmt bran cond sub pod time code
1             package Data::Record::Serialize::Encode::dbi;
2              
3             # ABSTRACT: store a record in a database
4              
5 2     2   1182597 use v5.12;
  2         19  
6 2     2   637 use Moo::Role;
  2         19906  
  2         17  
7              
8             use Data::Record::Serialize::Error {
9 2         51 errors => [
10             qw( param
11             connect
12             schema
13             create
14             insert
15             sqlite_backend
16             ),
17             ],
18             },
19 2     2   2237 -all;
  2         17572  
20              
21             our $VERSION = '1.08';
22              
23 2     2   7610 use Data::Record::Serialize::Types -types;
  2         254930  
  2         22  
24              
25 2     2   11594 use SQL::Translator;
  2         689261  
  2         92  
26 2     2   18 use SQL::Translator::Schema;
  2         5  
  2         67  
27 2     2   10 use Types::Standard -types;
  2         4  
  2         25  
28 2     2   17945 use Types::Common::String qw( NonEmptySimpleStr );
  2         93867  
  2         34  
29              
30 2     2   3083 use List::Util 1.33 qw[ pairmap any ];
  2         55  
  2         280  
31              
32 2     2   18 use DBI;
  2         7  
  2         111  
33              
34 2     2   16 use namespace::clean;
  2         5  
  2         25  
35              
36              
37              
38              
39              
40              
41              
42             has dsn => (
43             is => 'ro',
44             required => 1,
45             coerce => sub {
46             my $arg = 'ARRAY' eq ref $_[0] ? $_[0] : [ $_[0] ];
47             my @dsn;
48             my @args;
49             for my $el ( @{$arg} ) {
50              
51             my $ref = ref $el;
52             unless ( $ref eq 'ARRAY' || $ref eq 'HASH' ) {
53             push( @dsn, $el );
54             next;
55             }
56              
57             my @arr = $ref eq 'ARRAY' ? @{$el} : %{$el};
58              
59             push @args, pairmap { join( q{=}, $a, $b ) } @arr;
60             }
61             my $args = join( q{;}, @args );
62             push @dsn, $args if length $args;
63             unshift @dsn, 'dbi' unless $dsn[0] =~ /^dbi/;
64             return join( q{:}, @dsn );
65             },
66             );
67              
68             has _cached => (
69             is => 'ro',
70             default => 0,
71             init_arg => 'cached',
72             );
73              
74              
75              
76              
77              
78              
79              
80              
81             has table => (
82             is => 'ro',
83             isa => Str,
84             required => 1,
85             );
86              
87              
88              
89              
90              
91              
92              
93              
94              
95              
96              
97             has schema => (
98             is => 'ro',
99             isa => Maybe [NonEmptySimpleStr],
100             );
101              
102              
103              
104              
105              
106              
107              
108             has drop_table => (
109             is => 'ro',
110             isa => Bool,
111             default => 0,
112             );
113              
114              
115              
116              
117              
118              
119              
120             has create_output_dir => (
121             is => 'ro',
122             isa => Bool,
123             default => 0,
124             );
125              
126              
127              
128              
129              
130              
131              
132              
133              
134             has create_table => (
135             is => 'ro',
136             isa => Bool,
137             default => 1,
138             );
139              
140              
141              
142              
143              
144              
145              
146             has primary => (
147             is => 'ro',
148             isa => ArrayOfStr,
149             coerce => 1,
150             default => sub { [] },
151             );
152              
153              
154              
155              
156              
157              
158              
159             has db_user => (
160             is => 'ro',
161             isa => Str,
162             default => q{},
163             );
164              
165              
166              
167              
168              
169              
170              
171             has db_pass => (
172             is => 'ro',
173             isa => Str,
174             default => q{},
175             );
176              
177             has _sth => (
178             is => 'rwp',
179             init_arg => undef,
180             );
181              
182             has _dbh => (
183             is => 'rwp',
184             init_arg => undef,
185             clearer => 1,
186             predicate => 1,
187             );
188              
189              
190              
191              
192              
193              
194              
195             has column_defs => (
196             is => 'rwp',
197             lazy => 1,
198             clearer => 1,
199             init_arg => undef,
200             builder => sub {
201 0     0   0 my $self = shift;
202              
203 0         0 my @column_defs;
204 0         0 for my $field ( @{ $self->output_fields } ) {
  0         0  
205             push @column_defs,
206             join( q{ },
207             $field,
208 0         0 $self->output_types->{$field},
209             ( 'primary key' )x!!( $self->primary eq $field ) );
210             }
211              
212 0         0 return join ', ', @column_defs;
213             },
214             );
215              
216              
217              
218              
219              
220              
221              
222             has batch => (
223             is => 'ro',
224             isa => Int,
225             default => 100,
226             coerce => sub { $_[0] > 1 ? $_[0] : 0 },
227             );
228              
229              
230              
231              
232              
233              
234              
235             has dbitrace => ( is => 'ro', );
236              
237              
238              
239              
240              
241              
242              
243              
244              
245              
246              
247              
248              
249             has queue => (
250             is => 'ro',
251             init_arg => undef,
252             default => sub { [] },
253             );
254              
255              
256              
257              
258              
259              
260              
261             has quote_identifiers => (
262             is => 'ro',
263             isa => Bool,
264             default => !!0,
265             );
266              
267             around '_build__nullified' => sub {
268             my $orig = shift;
269             my $self = $_[0];
270              
271             my $nullified = $self->$orig( @_ );
272              
273             # defer to the caller
274             return $nullified if $self->has_nullify;
275              
276             # add all of the numeric fields
277             [ @{ $self->numeric_fields } ];
278              
279             };
280              
281             my %MapTypes = (
282             Pg => { S => 'text', N => 'real', I => 'integer', B => 'boolean' },
283             SQLite => { S => 'text', N => 'real', I => 'integer', B => 'integer' },
284             Default => { S => 'text', N => 'real', I => 'integer', B => 'integer' },
285             );
286              
287             sub _map_types {
288 15   33 15   7214 $MapTypes{ $_[0]->_dbi_driver } // $MapTypes{Default};
289             }
290              
291              
292              
293              
294              
295              
296              
297              
298              
299 0 0   0 1 0 sub to_bool { $_[0] ? 1 : 0 }
300              
301             sub _table_exists {
302 5     5   12 my $self = shift;
303              
304             # DBD::Sybase doesn't filter, so need to search
305 5         56 my $matches
306             = $self->_dbh->table_info( q{%}, $self->schema, $self->table, 'TABLE' )->fetchall_arrayref;
307              
308             # if $self->table needs to be quoted, table_info returns a quoted
309             # string (at least for DBD::Pg), so need an extra check...
310             return
311 1 50   1   11 any { $_->[2] eq $self->table or $_->[2] eq $self->_dbh->quote_identifier( $self->table ) }
312 5         4383 @{$matches};
  5         47  
313             }
314              
315             sub _fq_table_name {
316 5     5   10 my $self = shift;
317 5 50       62 defined $self->schema ? $self->schema . q{.} . $self->table : $self->table;
318             }
319              
320             has _dsn_components => (
321             is => 'lazy',
322             init_arg => undef,
323             builder => sub {
324 5     5   57 my %dsn;
325 5 50       64 @dsn{ 'scheme', 'driver', 'attr_string', 'attr_hash', 'driver_dsn' } = DBI->parse_dsn( $_[0]->dsn )
326             or error( 'param', 'unable to parse DSN: ', $_[0]->dsn );
327              
328 5         208 my @driver_dsn = split( /;/, $dsn{driver_dsn} );
329 5         29 my %driver_dsn = map { split( /=/, $_, 2 ) } @driver_dsn;
  5         34  
330 5         20 $dsn{driver_dsn} = \%driver_dsn;
331 5         48 \%dsn;
332             },
333             );
334              
335             sub _dbi_driver {
336 30     30   808 $_[0]->_dsn_components->{driver};
337             }
338              
339             my %producer = (
340             DB2 => 'DB2',
341             MySQL => 'mysql',
342             Oracle => 'Oracle',
343             Pg => 'PostgreSQL',
344             SQLServer => 'SQLServer',
345             SQLite => 'SQLite',
346             Sybase => 'Sybase',
347             );
348              
349             has _producer => (
350             is => 'lazy',
351             init_arg => undef,
352             builder => sub {
353 5     5   64 my $dbi_driver = $_[0]->_dbi_driver;
354 5 50       295 $producer{$dbi_driver} || $dbi_driver;
355             },
356             );
357              
358             has _is_file_based => (
359             is => 'lazy',
360             init_arg => undef,
361             builder => sub {
362 5     5   64 my $dbi_driver = $_[0]->_dbi_driver;
363 5         77 return $dbi_driver eq 'SQLite';
364             },
365             );
366              
367              
368              
369              
370              
371              
372              
373             sub setup { ## no critic (Subroutines::ProhibitExcessComplexity)
374 5     5 0 306 my $self = shift;
375              
376 5 50       30 return if $self->_has_dbh;
377              
378 5         54 my %attr = (
379             AutoCommit => !$self->batch,
380             RaiseError => 1,
381             PrintError => 0,
382             ( 'private_' . __PACKAGE__ ) => __FILE__ . __LINE__,
383             );
384              
385              
386 5         21 my $dbd = $self->_dbi_driver;
387              
388 5 50       90 if ( $dbd eq 'Sybase' ) {
389 0         0 $attr{syb_quoted_identifier} = 1;
390             }
391              
392 5 50       36 my $connect = $self->_cached ? 'connect_cached' : 'connect';
393              
394 5 50 33     215 if ( $self->create_output_dir && $self->_is_file_based ) {
395             my $dbname = $self->_dsn_components->{driver_dsn}{dbname}
396 5   33     129 // error( 'param', "unable to parse dbname from DSN (@{[ $self->dsn ]}) to create output dir" );
  0         0  
397              
398 5         85 require Path::Tiny;
399 5         35 my $dir = Path::Tiny::path( $dbname )->parent;
400 5 50       798 eval { $dir->mkdir; } or error( '::create', "unable to create output directory '$dir': $@" );
  5         30  
401             }
402              
403 5 50       2302 $self->_set__dbh( DBI->$connect( $self->dsn, $self->db_user, $self->db_pass, \%attr ) )
404             or error( 'connect', 'error connecting to ', $self->dsn, "\n" );
405              
406 5 50       7625 $self->_dbh->trace( $self->dbitrace )
407             if $self->dbitrace;
408              
409 5 50 33     46 if ( $self->drop_table || ( $self->create_table && !( my $table_exists = $self->_table_exists ) ) )
      66        
410             {
411             # in case the first half of the conditional in the above if
412             # statement is true, the call to $self->_table_exists isn't
413             # made, so $table_exists is not defined.
414 5   66     22 $table_exists //= $self->_table_exists;
415              
416             ## no critic (Variables::ProhibitUnusedVarsStricter)
417 5 50       21 my @guards = $dbd eq 'Sybase' ? _patch_sqlt_producer_sybase() : ();
418              
419             my $tr = SQL::Translator->new(
420             from => sub {
421 5     5   3426 my $schema = $_[0]->schema;
422 5 50       4468 my $table = $schema->add_table( name => $self->_fq_table_name )
423             or error( 'schema', $schema->error );
424              
425 5         6095 for my $field_name ( @{ $self->output_fields } ) {
  5         108  
426             $table->add_field(
427             name => $field_name,
428 15 50       16162 data_type => $self->output_types->{$field_name} ) or error( 'schema', $table->error );
429             }
430              
431 5 50       9423 if ( @{ $self->primary } ) {
  5         36  
432 0 0       0 $table->primary_key( @{ $self->primary } )
  0         0  
433             or error( 'schema', $table->error );
434             }
435              
436 5         24 1;
437             },
438 5   66     198 to => $self->_producer,
439             producer_args => { no_transaction => 1 },
440             add_drop_table => $self->drop_table && $table_exists,
441             quote_identifiers => $self->quote_identifiers,
442             no_comments => 1,
443             );
444              
445 5         25763 my @sql = $tr->translate;
446 5 50       32379 defined $sql[0] or error( 'schema', $tr->error );
447              
448 5         36 _do_sql( $self->_dbh, 'create', @sql );
449 5 100       75888 $self->_dbh->commit if $self->batch;
450             }
451              
452             my $sql = sprintf(
453             'insert into %s (%s) values (%s)',
454             $self->_dbh->quote_identifier( undef, $self->schema, $self->table ),
455 5         2959 join( q{,}, @{ $self->output_fields } ),
456 5         198 join( q{,}, ( q{?} ) x @{ $self->output_fields } ),
  5         168  
457             );
458              
459 5         125 $self->_set__sth( $self->_dbh->prepare( $sql ) );
460              
461 5         892 return;
462             }
463              
464             # SQL::Translator::Producer::Sybase has behaviors and bugs.
465             sub _patch_sqlt_producer_sybase {
466              
467 0     0   0 require SQL::Translator::Producer::Sybase;
468 0         0 require Monkey::Patch;
469 0         0 my %scope;
470              
471             my @guards;
472              
473             # Behavior: It tries to be 'helpful' and renames a table if it
474             # collides with a table it has already seen. There's no way to
475             # turn that off. In this case, we may dropping/creating a table
476             # multiple times, and having the table name change is not useful.
477             # We turn its concept of "global name space" into a local one for
478             # tables/indices/constraints. args[2] is the namespace hash, and
479             # mk_name is currently passed a non-empty entry *only* for fields,
480             # so we make sure to only give it a hash when its not specified
481             # (and thus not a field), as we don't want to mix field names and
482             # the table/indices/constraints level names.
483              
484             push @guards, Monkey::Patch::patch_package(
485             'SQL::Translator::Producer::Sybase',
486             'mk_name',
487             sub {
488 0     0   0 my ( $orig, @args ) = @_;
489 0   0     0 $args[2] ||= \%scope;
490 0         0 $orig->( @args );
491 0         0 } );
492              
493             # Bug: add_drop_table attribute doesn't add the drop table
494             # command. future proof by chcking if the Sybase producer
495             # implements it correctly and only monkey-patching it if it
496             # doesn't.
497 0         0 my $need_to_add_drop_table = do {
498             my $tr = SQL::Translator->new(
499             from => sub {
500 0     0   0 require SQL::Translator::Schema::Table;
501 0         0 my $table = SQL::Translator::Schema::Table->new( name => 'foo' );
502 0         0 $table->add_field( name => 'foo_id', date_type => 'integer' );
503 0         0 $_[0]->schema->add_table( $table );
504 0         0 1;
505             },
506 0         0 to => 'Sybase',
507             add_drop_table => 1,
508             no_comments => 1,
509             );
510 0         0 $tr->translate !~ /DROP TABLE/;
511             };
512              
513 0 0       0 if ( $need_to_add_drop_table ) {
514             push @guards, Monkey::Patch::patch_package(
515             'SQL::Translator::Producer::Sybase',
516             'produce',
517             sub {
518 0     0   0 my ( $orig, @args ) = @_;
519              
520 0         0 my $translator = $args[0];
521              
522 0 0       0 return $orig->( @args ) unless $translator->add_drop_table;
523              
524 0 0       0 if ( wantarray ) { ## no critic (Community::Wantarray)
525 0         0 my @orig = $orig->( @args );
526 0 0       0 return map { /CREATE TABLE (\S+) [(]/ ? ( qq[DROP TABLE $1\n\n], $_ ) : $_ } @orig;
  0         0  
527             }
528             else {
529 0         0 my $output = $orig->( @args );
530 0         0 $output =~ s/CREATE TABLE\s+(\S+)\s+[(]/DROP TABLE $1\n$&/g;
531 0         0 return $output;
532             }
533              
534 0         0 } );
535              
536             }
537              
538 0         0 return @guards;
539             }
540              
541             sub _do_sql {
542 5     5   16 my ( $dbh, $subsys, @sql ) = @_;
543              
544 5         125 my $in_txn = !$dbh->{AutoCommit};
545              
546 5         18 my $statement;
547             eval {
548 5 100       28 $dbh->begin_work unless $in_txn;
549 5         33 while ( $statement = shift @sql ) {
550 6 50       604 die if !defined $dbh->do( $statement );
551             }
552 5 100       16068 $dbh->commit unless $in_txn;
553 5         52 1;
554 5 50       10 } or do {
555 0         0 my $e = $@;
556 0 0       0 $dbh->rollback unless $in_txn;
557 0         0 $statement =~ s/\n/ /g;
558 0         0 error( $subsys, { msg => $e, payload => $statement } );
559             };
560              
561             }
562              
563              
564              
565              
566              
567              
568              
569              
570              
571              
572              
573              
574              
575              
576              
577              
578              
579              
580              
581              
582              
583              
584              
585              
586              
587              
588              
589              
590              
591              
592              
593              
594              
595              
596              
597              
598              
599              
600              
601              
602              
603              
604              
605              
606              
607              
608              
609              
610              
611              
612              
613              
614             sub flush {
615 7     7 1 19 my $self = shift;
616              
617 7 50       30 return 1 unless $self->_has_dbh;
618              
619 7         20 my $queue = $self->queue;
620              
621 7 100       12 if ( @{$queue} ) {
  7         26  
622 6         11 my $last;
623              
624 6         10 my $ret = eval { $self->_sth->execute( @$last ) while $last = shift @{$queue}; 1; };
  6         12  
  30         2455  
  6         22  
625 6         15 my $error = $@;
626              
627 6         126820 $self->_dbh->commit;
628              
629 6 50       69 if ( !defined $ret ) {
630 0         0 unshift @{$queue}, $last;
  0         0  
631              
632 0         0 my %query;
633 0         0 @query{ @{ $self->output_fields } } = @$last;
  0         0  
634 0         0 error( 'insert', { msg => "Transaction aborted: $error", payload => \%query } );
635             }
636             }
637              
638 7         69 1;
639             }
640              
641              
642              
643              
644              
645              
646              
647              
648              
649              
650              
651              
652              
653              
654              
655              
656              
657              
658             sub send {
659 30     30 1 12085 my $self = shift;
660              
661 30 100       119 if ( $self->batch ) {
662 24         35 push @{ $self->queue }, [ @{ $_[0] }{ @{ $self->output_fields } } ];
  24         59  
  24         280  
  24         461  
663             $self->flush
664 24 100       75 if @{ $self->queue } == $self->batch;
  24         204  
665              
666             }
667             else {
668 6 50       14 eval { $self->_sth->execute( @{ $_[0] }{ @{ $self->output_fields } } ); 1; }
  6         28  
  6         162191  
  6         149  
  6         172  
669             or error( 'insert', { msg => "record insertion failed: $@", payload => $_[0] } );
670             }
671             }
672              
673              
674             after '_trigger_output_fields' => sub {
675             $_[0]->clear_column_defs;
676             };
677              
678             after '_trigger_output_types' => sub {
679             $_[0]->clear_column_defs;
680             };
681              
682              
683              
684              
685              
686              
687              
688              
689              
690              
691              
692              
693              
694              
695              
696              
697              
698              
699              
700              
701              
702             sub close {
703 5     5 1 1105 my $self = shift;
704              
705 5 50       35 return 1 unless $self->_has_dbh;
706              
707 5 100       36 $self->flush if $self->batch;
708 5         901 $self->_dbh->disconnect;
709 5         254 $self->_clear_dbh;
710              
711 5         96 1;
712             }
713              
714              
715              
716              
717              
718              
719              
720              
721              
722              
723              
724              
725             sub DEMOLISH {
726 5     5 1 67947 my $self = shift;
727              
728             warnings::warnif( 'Data::Record::Serialize::Encode::dbi::queue',
729             __PACKAGE__ . ': record queue is not empty in object destruction' )
730 5 50       15 if @{ $self->queue };
  5         39  
731              
732 5 50       118 $self->_dbh->disconnect
733             if $self->_has_dbh;
734              
735             }
736              
737              
738             # these are required by the Sink/Encode interfaces but should never be
739             # called in the ordinary run of things.
740              
741              
742              
743              
744              
745              
746              
747              
748             with 'Data::Record::Serialize::Role::EncodeAndSink';
749              
750             1;
751              
752             #
753             # This file is part of Data-Record-Serialize-Encode-dbi
754             #
755             # This software is Copyright (c) 2017 by Smithsonian Astrophysical Observatory.
756             #
757             # This is free software, licensed under:
758             #
759             # The GNU General Public License, Version 3, June 2007
760             #
761              
762             __END__