File Coverage

blib/lib/App/BigQuery/Importer/MySQL.pm
Criterion Covered Total %
statement 31 98 31.6
branch 2 36 5.5
condition n/a
subroutine 9 11 81.8
pod 0 2 0.0
total 42 147 28.5


line stmt bran cond sub pod time code
1             package App::BigQuery::Importer::MySQL;
2 3     3   77332 use 5.008001;
  3         8  
3 3     3   13 use strict;
  3         3  
  3         56  
4 3     3   17 use warnings;
  3         4  
  3         103  
5 3     3   2024 use Getopt::Long qw(:config posix_default no_ignore_case gnu_compat);
  3         23885  
  3         11  
6 3     3   1869 use File::Temp qw(tempfile);
  3         27649  
  3         148  
7 3     3   14 use File::Basename;
  3         3  
  3         165  
8 3     3   2669 use DBI;
  3         23757  
  3         145  
9 3     3   17 use Carp qw(croak);
  3         3  
  3         2226  
10              
11             our $VERSION = "0.024";
12              
13             sub new {
14 8     8 0 6921 my ($class, $args) = @_;
15              
16 8         18 my @required_list = qw/ src dst mysqlhost mysqluser mysqlpassword project_id progs /;
17 8         5 my $obj_data = +{};
18 8         12 for my $required (@required_list) {
19 35 100       92 if( ! defined $args->{$required} ) { croak "$required is required"};
  7         644  
20 28         29 $obj_data->{$required} = $args->{$required};
21             }
22             $obj_data->{dryrun} = $args->{dryrun},
23             $obj_data->{allow_text_type} = $args->{allow_text_type},
24              
25 1         6 bless $obj_data, $class;
26             }
27              
28             sub run {
29 0     0 0   my $self = shift;
30              
31 0           my $db_host = $self->{'mysqlhost'};
32 0           my ($src_schema, $src_table) = split /\./, $self->{'src'};
33 0           my ($dst_schema, $dst_table) = split /\./, $self->{'dst'};
34              
35             # check the table does not have BLOB or TEXT
36 0           my $dbh = DBI->connect("dbi:mysql:${src_schema}:${db_host}", $self->{'mysqluser'}, $self->{'mysqlpassword'});
37              
38             $self->_check_columns(
39             +{
40             dbh => $dbh,
41             src_schema => $src_schema,
42             src_table => $src_table,
43 0           allow_text_type => $self->{'allow_text_type'},
44             }
45             );
46              
47             # create BigQuery schema json structure
48 0           my $schema_type_check_sql = "SELECT
49             CONCAT('{\"name\": \"', COLUMN_NAME, '\",\"type\":\"', IF(DATA_TYPE LIKE \"%int%\", \"INTEGER\",IF(DATA_TYPE = \"decimal\",\"FLOAT\",\"STRING\")) , '\"}') AS json
50             FROM INFORMATION_SCHEMA.columns where TABLE_SCHEMA = '${src_schema}' AND TABLE_NAME = '${src_table}'";
51 0           my $rows = $dbh->selectall_arrayref($schema_type_check_sql);
52 0           my @schemas;
53 0           for my $row (@$rows) {
54 0           push @schemas, @$row[0];
55             }
56 0           my $bq_schema_json = '[' . join(',', @schemas) . ']';
57 0           my($bq_schema_json_fh, $bq_schema_json_filename) = tempfile(UNLINK => 1);
58 0 0         unless ($self->{'dryrun'}) {
59 0           print {$bq_schema_json_fh} $bq_schema_json;
  0            
60             }
61              
62             # create temporary bucket
63 0           my $bucket_name = $src_table . '_' . time;
64 0 0         unless ($self->{'dryrun'}) {
65 0           my $mb_command = "$self->{'progs'}->{'gsutil'} mb -p $self->{'project_id'} gs://$bucket_name";
66 0           my $result_create_bucket = system($mb_command);
67 0 0         if ($result_create_bucket != 0) {
68 0           die "${mb_command} : failed";
69             }
70             }
71              
72             # dump table data
73 0           my $dump_command = "$self->{'progs'}->{'mysql'} -u$self->{'mysqluser'} -p'$self->{'mysqlpassword'}' -h$self->{'mysqlhost'} ${src_schema} -Bse'SELECT * FROM ${src_table}'";
74 0           my $dump_result = `$dump_command`;
75 0 0         if ($? != 0) {
76 0           die "${dump_command} : failed";
77             }
78 0           $dump_result =~ s/\"//g;
79 0           $dump_result =~ s/NULL//g;
80 0           my($src_dump_fh, $src_dump_filename) = tempfile(UNLINK => 1);
81 0 0         unless ($self->{'dryrun'}) {
82 0           print {$src_dump_fh} $dump_result;
  0            
83             }
84              
85             # upload dump data
86 0           my $dump_upload_command = "$self->{'progs'}->{'gsutil'} cp $src_dump_filename gs://$bucket_name";
87 0 0         unless ($self->{'dryrun'}) {
88 0           my $result_upload_schema = system($dump_upload_command);
89 0 0         if ($result_upload_schema != 0) {
90 0           die "${dump_upload_command} : failed";
91             }
92             }
93              
94             # copy to BigQuery
95 0           my $remove_table_command = "$self->{'progs'}->{'bq'} rm -f $self->{'dst'}";
96 0           my $src_dump_file_basename = basename($src_dump_filename);
97 0 0         unless ($self->{'dryrun'}) {
98 0           my $result_remove_table = system($remove_table_command);
99 0 0         if ($result_remove_table != 0) {
100 0           die "${remove_table_command} : failed";
101             }
102 0           my $load_dump_command = "$self->{'progs'}->{'bq'} load -F '\\t' --max_bad_record=300 $self->{'dst'} gs://${bucket_name}/${src_dump_file_basename} ${bq_schema_json_filename}";
103 0           my $result_load_dump = system($load_dump_command);
104 0 0         if ($result_load_dump != 0) {
105 0           die "${load_dump_command} : failed";
106             }
107             }
108              
109             # remove dump data
110 0           my $dump_rm_command = "$self->{'progs'}->{'gsutil'} rm gs://${bucket_name}/${src_dump_file_basename}";
111 0 0         unless ($self->{'dryrun'}) {
112 0           my $result_dump_rm = system($dump_rm_command);
113 0 0         if ($result_dump_rm != 0) {
114 0           die "${dump_rm_command} : failed";
115             }
116             }
117              
118             # remove bucket
119 0           my $bucket_rm_command = "$self->{'progs'}->{'gsutil'} rb -f gs://$bucket_name";
120 0 0         unless ($self->{'dryrun'}) {
121 0           my $result_bucket_rm = system($bucket_rm_command);
122 0 0         if ($result_bucket_rm != 0) {
123 0           die "${dump_rm_command} : failed";
124             }
125             }
126             }
127              
128             sub _check_columns {
129 0     0     my ($self, $args) = @_;
130              
131 0           my $blob_check_sql = "SELECT SUM(IF((DATA_TYPE LIKE '%blob%'),1, 0)) AS cnt
132             FROM INFORMATION_SCHEMA.columns
133             WHERE TABLE_SCHEMA = '$args->{src_schema}' AND TABLE_NAME = '$args->{src_table}'";
134 0           my $cnt = $args->{dbh}->selectrow_hashref($blob_check_sql);
135 0 0         if ($cnt->{cnt} > 0) {
136 0           die "$args->{src_schema}.$args->{src_table} has BLOB table";
137             }
138              
139 0 0         return if $args->{allow_text_type};
140              
141 0           my $text_check_sql = "SELECT SUM(IF((DATA_TYPE LIKE '%text%'),1, 0)) AS cnt
142             FROM INFORMATION_SCHEMA.columns
143             WHERE TABLE_SCHEMA = '$args->{src_schema}' AND TABLE_NAME = '$args->{src_table}'";
144 0           $cnt = $args->{dbh}->selectrow_hashref($text_check_sql);
145 0 0         if ($cnt->{cnt} > 0) {
146 0           die "$args->{src_schema}.$args->{src_table} has TEXT table";
147             }
148             }
149              
150             1;
151             __END__