File Coverage

blib/lib/ClickHouse.pm
Criterion Covered Total %
statement 32 182 17.5
branch 0 52 0.0
condition 0 14 0.0
subroutine 11 36 30.5
pod 3 6 50.0
total 46 290 15.8


line stmt bran cond sub pod time code
1             package ClickHouse;
2              
3 1     1   46887 use 5.010;
  1         5  
4 1     1   9 use strict;
  1         3  
  1         32  
5 1     1   7 use warnings FATAL => 'all';
  1         2  
  1         72  
6              
7             our $VERSION = '0.03';
8              
9 1     1   433 use Net::HTTP;
  1         41939  
  1         6  
10 1     1   378 use URI;
  1         2  
  1         14  
11 1     1   4 use URI::Escape;
  1         1  
  1         45  
12 1     1   237 use URI::QueryParam;
  1         548  
  1         25  
13 1     1   5 use Carp;
  1         1  
  1         57  
14 1     1   6 use Scalar::Util qw/looks_like_number/;
  1         2  
  1         36  
15 1     1   277 use Try::Tiny;
  1         1385  
  1         470  
16              
17             our $AUTOLOAD;
18              
19             sub new {
20 0     0 1   my ($class, %opts) = @_;
21 0           my $self = bless {}, $class;
22 0           $self->_init(%opts);
23 0           return $self;
24             }
25              
26             {
27             my %_attrs = (
28             '_host' => 'localhost',
29             '_port' => 8123,
30             '_database' => 'default',
31             '_user' => '',
32             '_password' => '',
33             '_keep_alive' => 1,
34             '_format' => 'TabSeparated',
35             '_socket' => undef,
36             '_uri' => undef,
37             );
38              
39             #
40             # CLASS METHODS
41             #
42             # Returns a copy of the instance.
43             sub _clone {
44 0     0     my ($self) = @_;
45 0           my ($clone) = {%$self};
46 0           bless( $clone, ref $self );
47 0           return ($clone);
48             }
49              
50             # Verify that an attribute is valid (called by the AUTOLOAD sub)
51             sub _accessible {
52 0     0     my ( $self, $name ) = @_;
53 0 0         if ( exists $_attrs{$name} ) {
54              
55             #$self->verbose("attribute $name is valid");
56 0           return 1;
57             }
58 0           else { return 0; }
59             }
60              
61             # Initialize the object (only called by the constructor)
62             sub _init {
63 0     0     my ( $self, %args ) = @_;
64              
65 0           foreach my $key ( keys %_attrs ) {
66 0           $key =~ s/^_+//;
67 0 0 0       if ( defined ($args{$key}) && $self->_accessible( "_$key" ) ) {
68 0           $self->{"_$key"} = $args{$key};
69             }
70             else {
71 0           $self->{"_$key"} = $_attrs{"_$key"};
72             }
73             }
74 0           $self->{'_builder'} = \&_builder;
75              
76 0           $self->_connect();
77              
78 0           return 1;
79             }
80              
81             sub _builder {
82 0     0     my ($self) = @_;
83 0           delete $self->{'_socket'};
84 0           delete $self->{'_uri'};
85              
86             # create Net::HTTP object
87             my $socket = Net::HTTP->new(
88             'Host' => $self->{'_host'},
89             'PeerPort' => $self->{'_port'},
90             'HTTPVersion' => '1.1',
91 0 0         'KeepAlive' => $self->{'_keep_alive'},
92              
93             ) or die "Can't connect: $@";
94              
95             # create URI object
96 0           my $uri = URI->new(sprintf ("http://%s:%d/?database=%s", $self->{'_host'}, $self->{'_port'}, $self->{'_database'}));
97 0 0         $uri->query_param('user' => $self->{'_user'}) if $self->{'_user'};
98 0 0         $uri->query_param('password' => $self->{'_password'}) if $self->{'_password'};
99              
100 0           $self->{'_socket'} = $socket;
101 0           $self->{'_uri'} = $uri;
102              
103 0           return 1;
104              
105             }
106              
107             sub _connect {
108 0     0     my ($self) = @_;
109 0           $self->_builder($self);
110 0           return 1;
111             }
112              
113             sub _query {
114 0     0     my ($self, $cb) = @_;
115             return &try (
116             $cb,
117             catch {
118 0     0     $self->_connect();
119 0           $cb->();
120             }
121 0           );
122             }
123             }
124              
125             sub ClickHouse::AUTOLOAD {
126 1     1   7 no strict 'refs';
  1         1  
  1         980  
127 0     0     my ( $self, $value ) = @_;
128 0 0 0       if ( ( $AUTOLOAD =~ /.*::_get(_\w+)/ ) && ( $self->_accessible($1) ) ) {
129 0           my $attr_name = $1;
130 0     0     *{$AUTOLOAD} = sub { return $_[0]->{$attr_name} };
  0            
  0            
131 0           return ( $self->{$attr_name} );
132             }
133 0 0 0       if ( $AUTOLOAD =~ /.*::_set(_\w+)/ && $self->_accessible($1) ) {
134 0           my $attr_name = $1;
135 0     0     *{$AUTOLOAD} = sub { $_[0]->{$attr_name} = $_[1]; return; };
  0            
  0            
  0            
136 0           $self->{$1} = $value;
137 0           return;
138             }
139 0           croak "No such method: $AUTOLOAD";
140             }
141              
142       0     sub DESTROY {}
143              
144             sub disconnect {
145 0     0 0   my ($self) = @_;
146 0           my $socket = $self->_get_socket();
147 0           $socket->keep_alive(0);
148 0           $self->ping();
149              
150 0           return 1;
151             }
152              
153              
154              
155             sub select {
156 0     0 1   my ($self, $query) = @_;
157             return $self->_query(sub {
158 0     0     my $method;
159             my $query_url;
160 0           my @post_data = ();
161 0 0         if (length ($query) <= 7000) {
162 0           $query_url = $self->_construct_query_uri( $query );
163 0           $method = 'GET';
164             }
165             else {
166 0           $query_url = $self->_get_uri()->clone();
167 0           $method = 'POST';
168 0           push @post_data, $query;
169             }
170              
171              
172 0           $self->_get_socket()->write_request( $method => $query_url, @post_data );
173 0           return $self->_parse_response($query);
174 0           });
175             }
176              
177             sub select_value {
178 0     0 0   my ($self, $query) = @_;
179              
180 0           my $arrayref = $self->select($query);
181 0           return $arrayref->[0]->[0];
182             }
183              
184             sub do {
185 0     0 1   my ($self, $query, @rows) = @_;
186             return $self->_query(sub {
187 0     0     my @prepared_rows = $self->_prepare_query(@rows);
188 0           my $query_url = $self->_construct_query_uri($query);
189 0 0         my $post_data = scalar @prepared_rows ? join (",", map { "(" . join (",", @{ $_ }) . ")" } @prepared_rows) : "\n" ;
  0            
  0            
190              
191 0           $self->_get_socket()->write_request('POST' => $query_url, $post_data);
192 0           return $self->_parse_response($query);
193 0           });
194              
195             }
196              
197             sub ping {
198 0     0 0   my ($self) = @_;
199              
200 0           my ($code) = eval {
201 0           $self->_get_socket()->write_request('GET' => '/');
202 0           $self->_get_socket()->read_response_headers();
203             };
204              
205 0 0         if ($@) {
206 0           return 0;
207             }
208 0 0         unless ($code == 200) {
209 0           return 0;
210             }
211 0           my $result = $self->_read_body();
212 0 0         unless ($result->[0] eq 'Ok.' ) {
213 0           return 0;
214             }
215 0           return 1;
216             }
217              
218             sub _parse_response {
219 0     0     my ($self, $query) = @_;
220 0           my ($code, $mess) = $self->_get_socket()->read_response_headers();
221 0 0         if ($code == 200 ) {
222 0           return _formaty_query_result( $self->_read_body() );
223             }
224             else {
225 0           my $add_mess = _formaty_query_result( $self->_read_body() );
226 0 0 0       if (defined $add_mess) { $add_mess = $add_mess->[0]->[0] // '' };
  0            
227 0           die "ClickHouse error: $mess ($add_mess)\n\t$query";
228             }
229             }
230              
231             sub _read_body {
232 0     0     my ($self) = @_;
233              
234 0           my @response;
235 0           my $chunk = '';
236 0           while (1) {
237 0           my $buf;
238 0           my $n = $self->_get_socket()->read_entity_body($buf, 1024);
239 0 0         die "can't read response: $!" unless defined $n;
240 0 0         last unless $n;
241 0           $buf = $chunk . $buf;
242 0           push @response, split (/\n/, $buf);
243 0 0         $chunk = substr ($buf,-1) eq "\n" ? '' : pop @response;
244             }
245 0 0         push @response, $chunk if $chunk;
246 0           return \@response;
247             }
248              
249             sub _formaty_query_result {
250 0     0     my ($query_result) = @_;
251 0           return [ map { [ split (/\t/) ] } @{ $query_result } ];
  0            
  0            
252             }
253              
254              
255             sub _construct_query_uri {
256 0     0     my ($self, $query) = @_;
257              
258 0           my $query_uri = $self->_get_uri()->clone();
259 0           $query_uri->query_param('query' => $query);
260              
261 0           return $query_uri->as_string();
262             }
263              
264             sub _prepare_query {
265 0     0     my ($class, @rows) = @_;
266 0           my @clone_rows = map { [@$_] } @rows;
  0            
267 0           foreach my $row (@clone_rows) {
268 0           foreach my $value (@$row) {
269 0           my $type = 'NUMBER';
270 0 0         if (ref $value eq 'HASH') {
271 0           $type = $value->{'TYPE'};
272 0           $value = $value->{'VALUE'};
273             }
274 0 0         unless (defined ($value)) {
275 0           $type = 'NULL';
276             }
277 0 0         if (ref $value eq 'ARRAY') {
278 0           $type = 'ARRAY';
279             }
280 0 0 0       if ( defined ($value) && !looks_like_number ($value)) {
281 0           $type = 'STRING';
282             }
283 0           $value = _escape_value($value, $type);
284             }
285             }
286 0           return @clone_rows;
287             }
288              
289             sub _escape_value {
290 0     0     my ($value, $type) = @_;
291 0 0         if ($type eq 'NULL') {
    0          
    0          
292 0           $value = qq{''};
293             }
294             elsif ($type eq 'STRING') {
295 0 0         utf8::encode($value) if utf8::is_utf8($value);
296 0           $value =~ s{\\}{\\\\}g;
297 0           $value =~ s/'/\\'/g;
298 0           $value = qq{'$value'};
299             }
300             elsif ($type eq 'ARRAY') {
301 0           $value = q{'} . join ("','", @$value) . q{'};
302             }
303 0           return $value;
304             }
305              
306             1;
307              
308             __END__