File Coverage

blib/lib/PGObject/Util/BulkLoad.pm
Criterion Covered Total %
statement 89 149 59.7
branch 8 28 28.5
condition n/a
subroutine 18 30 60.0
pod 7 7 100.0
total 122 214 57.0


line stmt bran cond sub pod time code
1             package PGObject::Util::BulkLoad;
2              
3 3     3   30297 use 5.006;
  3         14  
4 3     3   18 use strict;
  3         7  
  3         82  
5 3     3   19 use warnings FATAL => 'all';
  3         11  
  3         144  
6              
7 3     3   19 use Carp;
  3         5  
  3         225  
8 3     3   1736 use Memoize;
  3         7046  
  3         179  
9 3     3   1986 use Text::CSV;
  3         51533  
  3         159  
10 3     3   1609 use Try::Tiny;
  3         7332  
  3         2896  
11              
12             =head1 NAME
13              
14             PGObject::Util::BulkLoad - Bulk load records into PostgreSQL
15              
16             =head1 VERSION
17              
18             Version 0.06
19              
20             =cut
21              
22             our $VERSION = '0.06';
23              
24              
25             =head1 SYNOPSIS
26              
27             To insert all rows into a table using COPY:
28              
29             PGObject::Util::BulkLoad->copy(
30             {table => 'mytable', insert_cols => ['col1', 'col2'], dbh => $dbh},
31             @objects
32             );
33              
34             To copy to a temp table and then upsert:
35              
36             PGObject::Util::BulkLoad->upsert(
37             {table => 'mytable',
38             insert_cols => ['col1', 'col2'],
39             update_cols => ['col1'],
40             key_cols => ['col2'],
41             dbh => $dbh},
42             @objects
43             );
44              
45             Or if you prefer to run the statements yourself:
46              
47             PGObject::Util::BulkLoad->statement(
48             table => 'mytable', type => 'temp', tempname => 'foo_123'
49             );
50             PGObject::Util::BulkLoad->statement(
51             table => 'mytable', type => 'copy', insert_cols => ['col1', 'col2']
52             );
53             PGObject::Util::BulkLoad->statement(
54             type => 'upsert',
55             tempname => 'foo_123',
56             table => 'mytable',
57             insert_cols => ['col1', 'col2'],
58             update_cols => ['col1'],
59             key_cols => ['col2']
60             );
61              
62             If you are running repetitive calls, you may be able to trade time for memory
63             using Memoize by unning the following:
64              
65             PGObject::Util::BulkLoad->memoize_statements;
66              
67             To unmemoize:
68              
69             PGObject::Util::BulkLoad->unmemoize;
70              
71             To flush cache
72              
73             PGObject::Util::BulkLoad->flush_memoization;
74              
75             As of 0.05, an object oriented interface is included. Note that the memoize
76             calls are global but all other calls are subject to a fairly consistent
77             interface. where the first series of arguments become the arguments (for
78             exceptions see below) for the rest of the actions. This makes behavior
79             management a little easier. Note that in this interface, the object is
80             effectively immutable (or should be), so if you want to change it, create a
81             new object:
82              
83             my $bulkloader = PGObject::Util::BulkLoad->new(
84             table => 'foo',
85             tempname => 'bar', # defaults to "pgobject_bulkload_$table"
86             insert_cols => [qw(foo bar baz baz2)],
87             update_cols => [qw(foo bar)],
88             key_cols => [qw(baz baz2)],
89             dbh => $dbh,
90             );
91             $bulkloader->copy(@objects);
92             $bulkloader->copy_temp(@objects);
93             $bulkloader->upsert(@objects);
94             $bulkloader->reset_temp;
95             $bulkloader->destroy_temp;
96            
97              
98             =head1 DESCRIPTION
99              
100             =head1 SUBROUTINES/METHODS
101              
102             =head2 memoize_statements
103              
104             This function exists to memoize statement calls, i.e. generate the exact same
105             statements on the same argument calls. This isn't too likely to be useful in
106             most cases but it may be if you have repeated bulk loader calls in a persistent
107             script (for example real-time importing of csv data from a frequent source).
108              
109             =cut
110              
111             sub memoize_statements {
112 0     0 1 0 memoize 'statement';
113             }
114              
115             =head2 unmemoize
116              
117             Unmemoizes the statement calls.
118              
119             =cut
120              
121             sub unmemoize {
122 0     0 1 0 Memoize::unmemoize 'statement';
123             }
124              
125             =head2 flush_memoization
126              
127             Flushes the cache for statement memoization. Does *not* flush the cache for
128             escaping memoization since that is a bigger win and a pure function accepting
129             simple strings.
130              
131             =cut
132              
133             sub flush_memoization {
134 0     0 1 0 Memoization::flush_cache('statement');
135             }
136            
137             =head2 statement
138              
139             This takes the following arguments and returns a suitable SQL statement
140              
141             =over
142              
143             =item type
144              
145             Type of statement. Options are:
146              
147             =over
148              
149             =item temp
150              
151             Create a temporary table
152              
153             =item copy
154              
155             sql COPY statement
156              
157             =item upsert
158              
159             Update/Insert CTE pulling temp table
160              
161             =item stats
162              
163             Get stats on pending upsert, grouped by an arbitrary column.
164              
165             =back
166              
167             =item table
168              
169             Name of table
170              
171             =item tempname
172              
173             Name of temp table
174              
175             =item insert_cols
176              
177             Column names for insert
178              
179             =item update_cols
180              
181             Column names for update
182              
183             =item key_cols
184              
185             Names of columns in primary key.
186              
187             =item group_stats_by
188              
189             Names of columns to group stats by
190              
191             =back
192              
193             =cut
194              
195             sub _sanitize_ident {
196 98     98   276 my($string) = @_;
197 98         274 $string =~ s/"/""/g;
198 98         525 qq("$string");
199             }
200              
201             sub _statement_stats {
202 3     3   11 my ($args) = @_;
203 3 50       23 croak 'Key columns must array ref' unless (ref $args->{key_cols}) =~ /ARRAY/;
204 3 50       7 croak 'Must supply key columns' unless @{$args->{key_cols}};
  3         13  
205 3 50       12 croak 'Must supply table name' unless $args->{table};
206 3 50       10 croak 'Must supply temp table' unless $args->{tempname};
207              
208 3         8 my @groupcols;
209             @groupcols = $args->{group_stats_by}
210 3         13 ? @{$args->{group_stats_by}}
211 3 50       12 : @{$args->{key_cols}};
  0         0  
212 3         15 my $table = _sanitize_ident($args->{table});
213 3         11 my $temp = _sanitize_ident($args->{tempname});
214 4         15 "SELECT " . join(', ', map {"$temp." . _sanitize_ident($_)} @groupcols) . ",
215 4         16 SUM(CASE WHEN ROW(" . join(', ', map {"$table." . _sanitize_ident($_)
216 3         12 } @{$args->{key_cols}}) . ") IS NULL
217             THEN 1
218             ELSE 0
219             END) AS pgobject_bulkload_inserts,
220 4         13 SUM(CASE WHEN ROW(" . join(', ', map {"$table." . _sanitize_ident($_)
221 3         11 } @{$args->{key_cols}}) . ") IS NULL
222             THEN 0
223             ELSE 1
224             END) AS pgobject_bulkload_updates
225             FROM $temp
226 4         13 LEFT JOIN $table USING (" . join(', ', map {_sanitize_ident($_)
227 3         11 } @{$args->{key_cols}}) . ")
228 3         9 GROUP BY " . join(', ', map {"$temp." . _sanitize_ident($_)} @groupcols);
  4         14  
229             }
230              
231             sub _statement_temp {
232 3     3   10 my ($args) = @_;
233              
234             "CREATE TEMPORARY TABLE " . _sanitize_ident($args->{tempname}) .
235 3         15 " ( LIKE " . _sanitize_ident($args->{table}) . " )";
236             }
237              
238             sub _statement_copy {
239 3     3   10 my ($args) = @_;
240 3 50       16 croak 'No insert cols' unless $args->{insert_cols};
241              
242             "COPY " . _sanitize_ident($args->{table}) . "(" .
243 3         12 join(', ', map { _sanitize_ident($_) } @{$args->{insert_cols}}) . ') ' .
  9         26  
  3         13  
244             "FROM STDIN WITH CSV";
245             }
246              
247             sub _statement_upsert {
248 3     3   11 my ($args) = @_;
249 3         9 for (qw(insert_cols update_cols key_cols table tempname)){
250 15 50       60 croak "Missing argument $_" unless $args->{$_};
251             }
252 3         12 my $table = _sanitize_ident($args->{table});
253 3         15 my $temp = _sanitize_ident($args->{tempname});
254              
255             "WITH UP AS (
256             UPDATE $table
257             SET " . join(",
258 5         16 ", map { _sanitize_ident($_) . ' = ' .
259 3         25 "$temp." . _sanitize_ident($_)} @{$args->{update_cols}}) . "
260             FROM $temp
261             WHERE " . join("
262 4         14 AND ", map {"$table." . _sanitize_ident($_) . ' = ' .
263 3         12 "$temp." . _sanitize_ident($_)} @{$args->{key_cols}}) . "
264 4         15 RETURNING " . join(", ", map {"$table." . _sanitize_ident($_)} @{$args->{key_cols}}) ."
  3         10  
265             )
266             INSERT INTO $table (" . join(", ",
267 9         26 map {_sanitize_ident($_)} @{$args->{insert_cols}}) . ")
  3         10  
268 9         27 SELECT " . join(", ", map {_sanitize_ident($_)} @{$args->{insert_cols}}) . "
  3         10  
269             FROM $temp
270 4         15 WHERE ROW(". join(", ", map { "$temp." . _sanitize_ident($_)} @{$args->{key_cols}}) .")
  3         10  
271 3         10 NOT IN (SELECT ".join(", ", map { "UP." . _sanitize_ident($_)} @{$args->{key_cols}}) ." FROM UP)";
  4         10  
  3         12  
272              
273             }
274              
275             sub statement {
276 12     12 1 8290 my %args = @_;
277 12 50       62 croak "Missing argument 'type'" unless $args{type};
278 3     3   29 no strict 'refs';
  3         8  
  3         1569  
279 12         40 &{"_statement_$args{type}"}(\%args);
  12         67  
280             }
281              
282             =head2 upsert
283              
284             Creates a temporary table named "pg_object.bulkload" and copies the data there
285              
286             If the first argument is an object, then if there is a function by the name
287             of the object, it will provide the value.
288              
289             =over
290              
291             =item table
292              
293             Table to upsert into
294              
295             =item insert_cols
296              
297             Columns to insert (by name)
298              
299             =item update_cols
300              
301             Columns to update (by name)
302              
303             =item key_cols
304              
305             Key columns (by name)
306              
307             =item group_stats_by
308              
309             This is an array of column names for optional stats retrieval and grouping.
310             If it is set then we will grab the stats and return them. Note this has a
311             performance penalty because it means an extra scan of the temp table and an
312             extra join against the parent table. See get_stats for the return value
313             information if this is set.
314              
315             =back
316              
317             =cut
318              
319             sub _build_args {
320 0     0     my ($init_args, $obj) = @_;
321 0           my @arglist = qw(table insert_cols update_cols key_cols dbh
322             tempname group_stats_by);
323             return {
324 0           map { my $val;
  0            
325 0     0     for my $v ($init_args->{$_}, try { $obj->$_ } ){
  0            
326 0 0         $val = $v if defined $v;
327             }
328 0           $_ => $val;
329             } @arglist
330             }
331             }
332              
333             sub upsert {
334 0     0 1   my ($args) = shift;
335 0 0         $args = shift if $args eq __PACKAGE__;
336             try {
337 0     0     $args->can('foo');
338 0           unshift @_, $args; # args is an object
339 0           };
340 0           $args = _build_args($args, $_[0]);
341 0           my $dbh = $args->{dbh};
342              
343             # pg_temp is the schema of temporary tables. If someone wants to create
344             # a permanent table there, they are inviting disaster. At any rate this is
345             # safe but a plain drop without schema qualification risks losing user data.
346              
347 0           my $return_value;
348              
349 0           $dbh->do("DROP TABLE IF EXISTS pg_temp.pgobject_bulkloader");
350 0           $dbh->do(statement( %$args, (type => 'temp',
351             tempname => 'pgobject_bulkloader')
352             ));
353 0           copy({(%$args, (table => 'pgobject_bulkloader'))}, @_);
354              
355 0 0         if ($args->{group_stats_by}){
356 0           $return_value = get_stats(
357             {(%$args, (tempname => 'pgobject_bulkloader'))}
358             );
359             }
360              
361 0           $dbh->do(statement( %$args, (type => 'upsert',
362             tempname => 'pgobject_bulkloader')));
363 0           my $dropstatus = $dbh->do("DROP TABLE pg_temp.pgobject_bulkloader");
364 0 0         return $return_value if $args->{group_stats_by};
365 0           return $dropstatus;
366             }
367              
368             =head2 copy
369              
370             Copies data into the specified table. The following arguments are used:
371              
372             =over
373              
374             =item table
375              
376             Table to upsert into
377              
378             =item insert_cols
379              
380             Columns to insert (by name)
381              
382             =back
383              
384             =cut
385              
386             sub _to_csv {
387 0     0     my ($args) = shift;
388              
389 0           my $csv = Text::CSV->new();
390             join("\n", map {
391 0           my $obj = $_;
  0            
392 0           $csv->combine(map { $obj->{$_} } @{$args->{cols}});
  0            
  0            
393 0           $csv->string();
394             } @_);
395             }
396              
397             sub copy {
398 0     0 1   my ($args) = shift;
399 0 0         $args = shift if $args eq __PACKAGE__;
400             try {
401 3     3   27 no warnings;
  3         7  
  3         142  
402 3     3   39 no strict;
  3         10  
  3         454  
403 0     0     $args->can('foo');
404 0           unshift @_, $args; # args is an object
405 0           };
406 0           $args = _build_args($args, $_[0]);
407 0           my $dbh = $args->{dbh};
408 0           $dbh->do(statement(%$args, (type => 'copy')));
409 0           $dbh->pg_putcopydata(_to_csv({cols => $args->{insert_cols}}, @_));
410 0           $dbh->pg_putcopyend();
411             }
412              
413             =head2 get_stats
414              
415             Takes the same arguments as upsert plus group_stats_by
416              
417             Returns an array of hashrefs representing the number of inserts and updates
418             that an upsert will perform. It must be performed before the upsert statement
419             actually runs. Typically this is run via the upsert command (which
420             automatically runs this if group_stats_by is set in the argumements hash).
421              
422             There is a performance penalty here since an unindexed left join is required
423             between the temp and the normal table.
424              
425             This function requires tempname, table, and group_stats_by to be set in the
426             argument hashref. The return value is a list of hashrefs with the following
427             keys:
428              
429             =over
430              
431             =item stats
432              
433             Hashref with keys inserts and updates including numbers of rows.
434              
435             =item keys
436              
437             Hashref for key columns and their values, by name
438              
439             =back
440              
441             =cut
442              
443             sub get_stats {
444 0     0 1   my ($args) = shift;
445 0 0         $args = shift if $args eq __PACKAGE__;
446             try {
447 3     3   22 no warnings;
  3         7  
  3         100  
448 3     3   18 no strict;
  3         9  
  3         518  
449 0     0     $args->can('foo');
450 0           unshift @_, $args; # args is an object
451 0           };
452 0           $args = _build_args($args, $_[0]);
453 0           my $dbh = $args->{dbh};
454              
455             my $returnval = [
456             map {
457 0           my @row = @$_;
458             { stats => {
459             updates => pop @row,
460             inserts => pop @row,
461             },
462             keys => {
463 0           map { $_ => shift @row } @{$args->{group_stats_by}}
  0            
  0            
464             },
465             }
466 0           } @{ $dbh->selectall_arrayref(statement(%$args, (type => 'stats'))) }
  0            
467             ];
468             }
469              
470             =head1 AUTHOR
471              
472             Chris Travers, C<< >>
473              
474             =head1 CO-MAINTAINERS
475              
476             =over
477              
478             =item Binary.com, C<< >>
479              
480             =back
481              
482             =head1 BUGS
483              
484             Please report any bugs or feature requests to C, or through
485             the web interface at L. I will be notified, and then you'll
486             automatically be notified of progress on your bug as I make changes.
487              
488              
489              
490              
491             =head1 SUPPORT
492              
493             You can find documentation for this module with the perldoc command.
494              
495             perldoc PGObject::Util::BulkLoad
496              
497              
498             You can also look for information at:
499              
500             =over 4
501              
502             =item * RT: CPAN's request tracker (report bugs here)
503              
504             L
505              
506             =item * AnnoCPAN: Annotated CPAN documentation
507              
508             L
509              
510             =item * CPAN Ratings
511              
512             L
513              
514             =item * Search CPAN
515              
516             L
517              
518             =back
519              
520              
521             =head1 ACKNOWLEDGEMENTS
522              
523              
524             =head1 LICENSE AND COPYRIGHT
525              
526             Copyright 2014 Chris Travers.
527              
528             This program is distributed under the (Revised) BSD License:
529             L
530              
531             Redistribution and use in source and binary forms, with or without
532             modification, are permitted provided that the following conditions
533             are met:
534              
535             * Redistributions of source code must retain the above copyright
536             notice, this list of conditions and the following disclaimer.
537              
538             * Redistributions in binary form must reproduce the above copyright
539             notice, this list of conditions and the following disclaimer in the
540             documentation and/or other materials provided with the distribution.
541              
542             * Neither the name of Chris Travers's Organization
543             nor the names of its contributors may be used to endorse or promote
544             products derived from this software without specific prior written
545             permission.
546              
547             THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
548             "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
549             LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
550             A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
551             OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
552             SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
553             LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
554             DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
555             THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
556             (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
557             OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
558              
559              
560             =cut
561              
562             1; # End of PGObject::Util::BulkUpload