File Coverage

blib/lib/ClickHouse.pm
Criterion Covered Total %
statement 32 182 17.5
branch 0 54 0.0
condition 0 14 0.0
subroutine 11 36 30.5
pod 3 6 50.0
total 46 292 15.7


line stmt bran cond sub pod time code
1             package ClickHouse;
2              
3 1     1   67936 use 5.010;
  1         4  
4 1     1   5 use strict;
  1         2  
  1         26  
5 1     1   5 use warnings FATAL => 'all';
  1         2  
  1         72  
6              
7             our $VERSION = '0.04';
8              
9 1     1   529 use Net::HTTP;
  1         51778  
  1         12  
10 1     1   564 use URI;
  1         3  
  1         31  
11 1     1   8 use URI::Escape;
  1         1  
  1         74  
12 1     1   502 use URI::QueryParam;
  1         825  
  1         32  
13 1     1   6 use Carp;
  1         2  
  1         63  
14 1     1   6 use Scalar::Util qw/looks_like_number/;
  1         2  
  1         43  
15 1     1   559 use Try::Tiny;
  1         2091  
  1         677  
16              
17             our $AUTOLOAD;
18              
19             sub new {
20 0     0 1   my ($class, %opts) = @_;
21 0           my $self = bless {}, $class;
22 0           $self->_init(%opts);
23 0           return $self;
24             }
25              
26             {
27             my %_attrs = (
28             '_host' => 'localhost',
29             '_port' => 8123,
30             '_database' => 'default',
31             '_user' => '',
32             '_password' => '',
33             '_keep_alive' => 1,
34             '_format' => 'TabSeparated',
35             '_socket' => undef,
36             '_uri' => undef,
37             '_timeout' => 30,
38             );
39              
40             #
41             # CLASS METHODS
42             #
43             # Returns a copy of the instance.
44             sub _clone {
45 0     0     my ($self) = @_;
46 0           my ($clone) = {%$self};
47 0           bless( $clone, ref $self );
48 0           return ($clone);
49             }
50              
51             # Verify that an attribute is valid (called by the AUTOLOAD sub)
52             sub _accessible {
53 0     0     my ( $self, $name ) = @_;
54 0 0         if ( exists $_attrs{$name} ) {
55              
56             #$self->verbose("attribute $name is valid");
57 0           return 1;
58             }
59 0           else { return 0; }
60             }
61              
62             # Initialize the object (only called by the constructor)
63             sub _init {
64 0     0     my ( $self, %args ) = @_;
65              
66 0           foreach my $key ( keys %_attrs ) {
67 0           $key =~ s/^_+//;
68 0 0 0       if ( defined ($args{$key}) && $self->_accessible( "_$key" ) ) {
69 0           $self->{"_$key"} = $args{$key};
70             }
71             else {
72 0           $self->{"_$key"} = $_attrs{"_$key"};
73             }
74             }
75 0           $self->{'_builder'} = \&_builder;
76              
77 0           $self->_connect();
78              
79 0           return 1;
80             }
81              
82             sub _builder {
83 0     0     my ($self) = @_;
84 0           delete $self->{'_socket'};
85 0           delete $self->{'_uri'};
86              
87             # create Net::HTTP object
88             my $socket = Net::HTTP->new(
89             'Host' => $self->{'_host'},
90             'PeerPort' => $self->{'_port'},
91             'HTTPVersion' => '1.1',
92             'KeepAlive' => $self->{'_keep_alive'},
93 0 0         'Timeout' => $self->{'_timeout'},
94              
95             ) or die "Can't connect: $@";
96              
97             # create URI object
98 0           my $uri = URI->new(sprintf ("http://%s:%d/?database=%s", $self->{'_host'}, $self->{'_port'}, $self->{'_database'}));
99 0 0         $uri->query_param('user' => $self->{'_user'}) if $self->{'_user'};
100 0 0         $uri->query_param('password' => $self->{'_password'}) if $self->{'_password'};
101              
102 0           $self->{'_socket'} = $socket;
103 0           $self->{'_uri'} = $uri;
104              
105 0           return 1;
106              
107             }
108              
109             sub _connect {
110 0     0     my ($self) = @_;
111 0           $self->_builder($self);
112 0           return 1;
113             }
114              
115             sub _query {
116 0     0     my ($self, $cb) = @_;
117             return &try (
118             $cb,
119             catch {
120 0     0     $self->_connect();
121 0           $cb->();
122             }
123 0           );
124             }
125             }
126              
127             sub ClickHouse::AUTOLOAD {
128 1     1   9 no strict 'refs';
  1         2  
  1         1629  
129 0     0     my ( $self, $value ) = @_;
130 0 0 0       if ( ( $AUTOLOAD =~ /.*::_get(_\w+)/ ) && ( $self->_accessible($1) ) ) {
131 0           my $attr_name = $1;
132 0     0     *{$AUTOLOAD} = sub { return $_[0]->{$attr_name} };
  0            
  0            
133 0           return ( $self->{$attr_name} );
134             }
135 0 0 0       if ( $AUTOLOAD =~ /.*::_set(_\w+)/ && $self->_accessible($1) ) {
136 0           my $attr_name = $1;
137 0     0     *{$AUTOLOAD} = sub { $_[0]->{$attr_name} = $_[1]; return; };
  0            
  0            
  0            
138 0           $self->{$1} = $value;
139 0           return;
140             }
141 0           croak "No such method: $AUTOLOAD";
142             }
143              
144       0     sub DESTROY {}
145              
146             sub disconnect {
147 0     0 0   my ($self) = @_;
148 0 0         if (my $socket = $self->_get_socket()) {
149 0           $socket->keep_alive(0);
150 0           $self->ping();
151             }
152 0           return 1;
153             }
154              
155              
156              
157             sub select {
158 0     0 1   my ($self, $query) = @_;
159             return $self->_query(sub {
160 0     0     my $method;
161             my $query_url;
162 0           my @post_data = ();
163 0 0         if (length ($query) <= 7000) {
164 0           $query_url = $self->_construct_query_uri( $query );
165 0           $method = 'GET';
166             }
167             else {
168 0           $query_url = $self->_get_uri()->clone();
169 0           $method = 'POST';
170 0           push @post_data, $query;
171             }
172              
173              
174 0           $self->_get_socket()->write_request( $method => $query_url, @post_data );
175 0           return $self->_parse_response($query);
176 0           });
177             }
178              
179             sub select_value {
180 0     0 0   my ($self, $query) = @_;
181              
182 0           my $arrayref = $self->select($query);
183 0           return $arrayref->[0]->[0];
184             }
185              
186             sub do {
187 0     0 1   my ($self, $query, @rows) = @_;
188             return $self->_query(sub {
189 0     0     my @prepared_rows = $self->_prepare_query(@rows);
190 0           my $query_url = $self->_construct_query_uri($query);
191 0 0         my $post_data = scalar @prepared_rows ? join (",", map { "(" . join (",", @{ $_ }) . ")" } @prepared_rows) : "\n" ;
  0            
  0            
192              
193 0           $self->_get_socket()->write_request('POST' => $query_url, $post_data);
194 0           return $self->_parse_response($query);
195 0           });
196              
197             }
198              
199             sub ping {
200 0     0 0   my ($self) = @_;
201              
202 0           my ($code) = eval {
203 0           $self->_get_socket()->write_request('GET' => '/');
204 0           $self->_get_socket()->read_response_headers();
205             };
206              
207 0 0         if ($@) {
208 0           return 0;
209             }
210 0 0         unless ($code == 200) {
211 0           return 0;
212             }
213 0           my $result = $self->_read_body();
214 0 0         unless ($result->[0] eq 'Ok.' ) {
215 0           return 0;
216             }
217 0           return 1;
218             }
219              
220             sub _parse_response {
221 0     0     my ($self, $query) = @_;
222 0           my ($code, $mess) = $self->_get_socket()->read_response_headers();
223 0 0         if ($code == 200 ) {
224 0           return _formaty_query_result( $self->_read_body() );
225             }
226             else {
227 0           my $add_mess = _formaty_query_result( $self->_read_body() );
228 0 0 0       if (defined $add_mess) { $add_mess = $add_mess->[0]->[0] // '' };
  0            
229 0           die "ClickHouse error: $mess ($add_mess)\n\t$query";
230             }
231             }
232              
233             sub _read_body {
234 0     0     my ($self) = @_;
235              
236 0           my @response;
237 0           my $chunk = '';
238 0           while (1) {
239 0           my $buf;
240 0           my $n = $self->_get_socket()->read_entity_body($buf, 1024);
241 0 0         die "can't read response: $!" unless defined $n;
242 0 0         last unless $n;
243 0           $buf = $chunk . $buf;
244 0           push @response, split (/\n/, $buf);
245 0 0         $chunk = substr ($buf,-1) eq "\n" ? '' : pop @response;
246             }
247 0 0         push @response, $chunk if $chunk;
248 0           return \@response;
249             }
250              
251             sub _formaty_query_result {
252 0     0     my ($query_result) = @_;
253 0           return [ map { [ split (/\t/) ] } @{ $query_result } ];
  0            
  0            
254             }
255              
256              
257             sub _construct_query_uri {
258 0     0     my ($self, $query) = @_;
259              
260 0           my $query_uri = $self->_get_uri()->clone();
261 0           $query_uri->query_param('query' => $query);
262              
263 0           return $query_uri->as_string();
264             }
265              
266             sub _prepare_query {
267 0     0     my ($class, @rows) = @_;
268 0           my @clone_rows = map { [@$_] } @rows;
  0            
269 0           foreach my $row (@clone_rows) {
270 0           foreach my $value (@$row) {
271 0           my $type = 'NUMBER';
272 0 0         if (ref $value eq 'HASH') {
273 0           $type = $value->{'TYPE'};
274 0           $value = $value->{'VALUE'};
275             }
276 0 0         unless (defined ($value)) {
277 0           $type = 'NULL';
278             }
279 0 0         if (ref $value eq 'ARRAY') {
280 0           $type = 'ARRAY';
281             }
282 0 0 0       if ( defined ($value) && !looks_like_number ($value)) {
283 0           $type = 'STRING';
284             }
285 0           $value = _escape_value($value, $type);
286             }
287             }
288 0           return @clone_rows;
289             }
290              
291             sub _escape_value {
292 0     0     my ($value, $type) = @_;
293 0 0         if ($type eq 'NULL') {
    0          
    0          
294 0           $value = qq{''};
295             }
296             elsif ($type eq 'STRING') {
297 0 0         utf8::encode($value) if utf8::is_utf8($value);
298 0           $value =~ s{\\}{\\\\}g;
299 0           $value =~ s/'/\\'/g;
300 0           $value = qq{'$value'};
301             }
302             elsif ($type eq 'ARRAY') {
303 0           $value = q{'} . join ("','", @$value) . q{'};
304             }
305 0           return $value;
306             }
307              
308             1;
309              
310             __END__