File Coverage

blib/lib/App/BigQuery/Importer/MySQL.pm
Criterion Covered Total %
statement 32 92 34.7
branch 2 32 6.2
condition n/a
subroutine 9 10 90.0
pod 0 2 0.0
total 43 136 31.6


line stmt bran cond sub pod time code
1             package App::BigQuery::Importer::MySQL;
2 2     2   18860 use 5.008001;
  2         7  
  2         65  
3 2     2   9 use strict;
  2         1  
  2         48  
4 2     2   15 use warnings;
  2         2  
  2         55  
5 2     2   1339 use Getopt::Long qw(:config posix_default no_ignore_case gnu_compat);
  2         20319  
  2         12  
6 2     2   1934 use File::Temp qw(tempfile tempdir);
  2         31572  
  2         126  
7 2     2   14 use File::Basename;
  2         3  
  2         125  
8 2     2   2791 use DBI;
  2         27443  
  2         129  
9 2     2   15 use Carp qw(croak);
  2         3  
  2         1501  
10              
11             our $VERSION = "0.021";
12              
13             sub new {
14 8     8 0 10275 my ($class, $args) = @_;
15              
16 8         32 my @required_list = qw/ src dst mysqlhost mysqluser mysqlpassword project_id progs /;
17 8         12 my $obj_data = +{};
18 8         13 for my $required (@required_list) {
19 35 100       69 if( ! defined $args->{$required} ) { croak "$required is required"};
  7         910  
20 28         46 $obj_data->{$required} = $args->{$required};
21             }
22 1         8 $obj_data->{dryrun} = $args->{dryrun},
23              
24             bless $obj_data, $class;
25             }
26              
27             sub run {
28 0     0 0   my $self = shift;
29              
30 0           my $db_host = $self->{'mysqlhost'};
31 0           my ($src_schema, $src_table) = split /\./, $self->{'src'};
32 0           my ($dst_schema, $dst_table) = split /\./, $self->{'dst'};
33              
34             # check the table does not have BLOB or TEXT
35 0           my $dbh = DBI->connect("dbi:mysql:${src_schema}:${db_host}", $self->{'mysqluser'}, $self->{'mysqlpassword'});
36 0           my $blob_text_check_sql = "SELECT SUM(IF((DATA_TYPE LIKE '%blob%' OR DATA_TYPE LIKE '%text%'),1, 0)) AS cnt
37             FROM INFORMATION_SCHEMA.columns
38             WHERE TABLE_SCHEMA = '${src_schema}' AND TABLE_NAME = '${src_table}'";
39 0           my $cnt = $dbh->selectrow_hashref($blob_text_check_sql);
40 0 0         if ($cnt->{cnt} > 0) {
41 0           die "${src_schema}.${src_table} has BLOB or TEXT table";
42             }
43              
44             # create BigQuery schema json structure
45 0           my $schema_type_check_sql = "SELECT
46             CONCAT('{\"name\": \"', COLUMN_NAME, '\",\"type\":\"', IF(DATA_TYPE LIKE \"%int%\", \"INTEGER\",IF(DATA_TYPE = \"decimal\",\"FLOAT\",\"STRING\")) , '\"}') AS json
47             FROM INFORMATION_SCHEMA.columns where TABLE_SCHEMA = '${src_schema}' AND TABLE_NAME = '${src_table}'";
48 0           my $rows = $dbh->selectall_arrayref($schema_type_check_sql);
49 0           my @schemas;
50 0           for my $row (@$rows) {
51 0           push @schemas, @$row[0];
52             }
53 0           my $bq_schema_json = '[' . join(',', @schemas) . ']';
54 0           my($bq_schema_json_fh, $bq_schema_json_filename) = tempfile;
55 0 0         unless ($self->{'dryrun'}) {
56 0           print {$bq_schema_json_fh} $bq_schema_json;
  0            
57             }
58              
59             # create temporary bucket
60 0           my $bucket_name = $src_table . '_' . time;
61 0 0         unless ($self->{'dryrun'}) {
62 0           my $mb_command = "$self->{'progs'}->{'gsutil'} mb -p $self->{'project_id'} gs://$bucket_name";
63 0           my $result_create_bucket = system($mb_command);
64 0 0         if ($result_create_bucket != 0) {
65 0           die "${mb_command} : failed";
66             }
67             }
68              
69             # dump table data
70 0           my $dump_command = "$self->{'progs'}->{'mysql'} -u$self->{'mysqluser'} -p'$self->{'mysqlpassword'}' -h$self->{'mysqlhost'} ${src_schema} -Bse'SELECT * FROM ${src_table}'";
71 0           my $dump_result = `$dump_command`;
72 0 0         if ($? != 0) {
73 0           die "${dump_command} : failed";
74             }
75 0           $dump_result =~ s/\"//g;
76 0           $dump_result =~ s/NULL//g;
77 0           my($src_dump_fh, $src_dump_filename) = tempfile;
78 0 0         unless ($self->{'dryrun'}) {
79 0           print {$src_dump_fh} $dump_result;
  0            
80             }
81              
82             # upload dump data
83 0           my $dump_upload_command = "$self->{'progs'}->{'gsutil'} cp $src_dump_filename gs://$bucket_name";
84 0 0         unless ($self->{'dryrun'}) {
85 0           my $result_upload_schema = system($dump_upload_command);
86 0 0         if ($result_upload_schema != 0) {
87 0           die "${dump_upload_command} : failed";
88             }
89             }
90              
91             # copy to BigQuery
92 0           my $remove_table_command = "$self->{'progs'}->{'bq'} rm -f $self->{'dst'}";
93 0           my $src_dump_file_basename = basename($src_dump_filename);
94 0 0         unless ($self->{'dryrun'}) {
95 0           my $result_remove_table = system($remove_table_command);
96 0 0         if ($result_remove_table != 0) {
97 0           die "${remove_table_command} : failed";
98             }
99 0           my $load_dump_command = "$self->{'progs'}->{'bq'} load -F '\\t' --max_bad_record=300 $self->{'dst'} gs://${bucket_name}/${src_dump_file_basename} ${bq_schema_json_filename}";
100 0           my $result_load_dump = system($load_dump_command);
101 0 0         if ($result_load_dump != 0) {
102 0           die "${load_dump_command} : failed";
103             }
104             }
105              
106             # remove dump data
107 0           my $dump_rm_command = "$self->{'progs'}->{'gsutil'} rm gs://${bucket_name}/${src_dump_file_basename}";
108 0 0         unless ($self->{'dryrun'}) {
109 0           my $result_dump_rm = system($dump_rm_command);
110 0 0         if ($result_dump_rm != 0) {
111 0           die "${dump_rm_command} : failed";
112             }
113             }
114              
115             # remove bucket
116 0           my $bucket_rm_command = "$self->{'progs'}->{'gsutil'} rb -f gs://$bucket_name";
117 0 0         unless ($self->{'dryrun'}) {
118 0           my $result_bucket_rm = system($bucket_rm_command);
119 0 0         if ($result_bucket_rm != 0) {
120 0           die "${dump_rm_command} : failed";
121             }
122             }
123             }
124              
125             1;
126             __END__