File Coverage

blib/lib/AnyEvent/InfluxDB.pm
Criterion Covered Total %
statement 30 660 4.5
branch 0 316 0.0
condition 0 15 0.0
subroutine 10 91 10.9
pod 37 37 100.0
total 77 1119 6.8


line stmt bran cond sub pod time code
1             #ABSTRACT: An asynchronous library for InfluxDB time-series database
2 1     1   630 use strict;
  1         1  
  1         23  
3 1     1   3 use warnings;
  1         1  
  1         39  
4             package AnyEvent::InfluxDB;
5             our $AUTHORITY = 'cpan:AJGB';
6             $AnyEvent::InfluxDB::VERSION = '1.0.2.0';
7 1     1   873 use AnyEvent;
  1         3788  
  1         23  
8 1     1   573 use AnyEvent::HTTP;
  1         19122  
  1         62  
9 1     1   509 use URI;
  1         3186  
  1         23  
10 1     1   377 use URI::QueryParam;
  1         485  
  1         68  
11 1     1   5 use JSON qw(decode_json);
  1         1  
  1         4  
12 1     1   539 use List::MoreUtils qw(zip);
  1         6988  
  1         5  
13 1     1   1108 use URI::Encode::XS qw( uri_encode );
  1         394  
  1         45  
14 1     1   425 use Moo;
  1         9125  
  1         5  
15              
16             has [qw( ssl_options username password jwt on_request )] => (
17             is => 'ro',
18             predicate => 1,
19             );
20              
21             has 'server' => (
22             is => 'rw',
23             default => 'http://localhost:8086',
24             );
25              
26             has '_is_ssl' => (
27             is => 'lazy',
28             );
29              
30             has '_tls_ctx' => (
31             is => 'lazy',
32             );
33              
34             has '_server_uri' => (
35             is => 'lazy',
36             );
37              
38              
39             sub _build__tls_ctx {
40 0     0     my ($self) = @_;
41              
42             # no ca/hostname checks
43 0 0         return 'low' unless $self->has_ssl_options;
44              
45             # create ctx
46 0           require AnyEvent::TLS;
47 0           return AnyEvent::TLS->new( %{ $self->ssl_options } );
  0            
48             }
49              
50             sub _build__is_ssl {
51 0     0     my ($self) = @_;
52              
53 0           return $self->server =~ /^https/;
54             }
55              
56             sub _build__server_uri {
57 0     0     my ($self) = @_;
58              
59 0           my $url = URI->new( $self->server, 'http' );
60              
61 0 0 0       if ( $self->has_username && $self->has_password ) {
62 0           $url->query_param( 'u' => $self->username );
63 0           $url->query_param( 'p' => $self->password );
64             }
65              
66 0           return $url;
67             }
68              
69             sub _make_url {
70 0     0     my ($self, $path, $params) = @_;
71              
72 0           my $url = $self->_server_uri->clone;
73 0           $url->path($path);
74              
75 0           while ( my ($k, $v) = each %$params ) {
76 0           $url->query_param( $k => $v );
77             }
78              
79 0           return $url;
80             }
81              
82             sub _http_request {
83 0     0     my $cb = pop;
84 0           my ($self, $method, $url, $post_data) = @_;
85              
86 0 0         if ($self->has_on_request) {
87 0           $self->on_request->($method, $url, $post_data);
88             }
89              
90 0           my %args = (
91             headers => {
92             referer => undef,
93             'user-agent' => "AnyEvent-InfluxDB/0.13",
94             }
95             );
96              
97 0 0         if ($self->has_jwt) {
98 0           $args{headers}->{Authorization} = 'Bearer '. $self->jwt;
99             }
100              
101 0 0         if ( $method eq 'POST' ) {
102 0 0         if ( defined $post_data ) {
103 0           $args{'body'} = $post_data;
104             } else {
105 0 0         if ( my $q = $url->query_param_delete('q') ) {
106 0           $args{headers}{'content-type'} = 'application/x-www-form-urlencoded';
107 0           $args{body} = 'q='. uri_encode($q);
108             }
109             }
110             }
111 0 0         if ( $self->_is_ssl ) {
112 0           $args{tls_ctx} = $self->_tls_ctx;
113             }
114              
115 0           my $guard;
116             $guard = http_request
117             $method => $url->as_string,
118             %args,
119             sub {
120 0     0     $cb->(@_);
121 0           undef $guard;
122 0           };
123             };
124              
125              
126             sub ping {
127 0     0 1   my ($self, %args) = @_;
128              
129             my $url = $self->_make_url('/ping', {
130             (
131             exists $args{wait_for_leader} ?
132             ( wait_for_leader => $args{wait_for_leader} )
133 0 0         :
134             ()
135             )
136             });
137              
138             $self->_http_request( GET => $url,
139             sub {
140 0     0     my ($body, $headers) = @_;
141 0 0         if ( $headers->{Status} eq '204' ) {
142 0           $args{on_success}->( $headers->{'x-influxdb-version'} );
143             } else {
144 0   0       $args{on_error}->( $headers->{Reason} || $body );
145             }
146             }
147 0           );
148             }
149              
150              
151             sub _to_line {
152 0     0     my $data = shift;
153              
154 0   0       my $t = $data->{tags} || {};
155 0   0       my $f = $data->{fields} || {};
156              
157             return $data->{measurement}
158             .(
159             $t ?
160             ','.
161             join(',',
162             map {
163 0           join('=', $_, $t->{$_})
164 0           } sort { $a cmp $b } keys %$t
165             )
166             :
167             ''
168             )
169             . ' '
170             .(
171             join(',',
172             map {
173 0           join('=', $_, $f->{$_})
174             } keys %$f
175             )
176             )
177             .(
178             $data->{time} ?
179             ' '. $data->{time}
180             :
181 0 0         ''
    0          
182             );
183             }
184              
185             sub write {
186 0     0 1   my ($self, %args) = @_;
187              
188             my $data = ref $args{data} eq 'ARRAY' ?
189 0 0         join("\n", map { ref $_ eq 'HASH' ? _to_line($_) : $_ } @{ $args{data} })
  0            
190             :
191 0 0         ref $args{data} eq 'HASH' ? _to_line($args{data}) : $args{data};
    0          
192              
193             my $url = $self->_make_url('/write', {
194             db => $args{database},
195             (
196             $args{consistency} ?
197             ( consistency => $args{consistency} )
198             :
199             ()
200             ),
201             (
202             $args{rp} ?
203             ( rp => $args{rp} )
204             :
205             ()
206             ),
207             (
208             $args{precision} ?
209             ( precision => $args{precision} )
210             :
211             ()
212             ),
213             (
214             $args{one} ?
215             ( one => $args{one} )
216 0 0         :
    0          
    0          
    0          
217             ()
218             )
219             });
220              
221             $self->_http_request( POST => $url, $data,
222             sub {
223 0     0     my ($body, $headers) = @_;
224              
225 0 0         if ( $headers->{Status} eq '204' ) {
226 0           $args{on_success}->();
227             } else {
228 0           $args{on_error}->( $body );
229             }
230             }
231 0           );
232             }
233              
234              
235             sub select {
236 0     0 1   my ($self, %args) = @_;
237              
238 0           my $method = 'GET';
239 0           my $q;
240 0 0         if ( exists $args{q} ) {
241 0           $q = $args{q};
242 0 0         if ( $q =~ /\s+INTO\s+/i ) {
243 0           $method = 'POST';
244             }
245             } else {
246 0           $q = 'SELECT '. $args{fields};
247              
248 0 0         if ( my $into = $args{into} ) {
249 0           $q .= ' INTO '. $into;
250 0           $method = 'POST';
251             }
252              
253 0           $q .= ' FROM '. $args{measurement};
254              
255 0 0         if ( my $cond = $args{where} ) {
256 0           $q .= ' WHERE '. $cond;
257             }
258              
259 0 0         if ( my $group = $args{group_by} ) {
260 0           $q .= ' GROUP BY '. $group;
261              
262 0 0         if ( my $fill = $args{fill} ) {
263 0           $q .= ' fill('. $fill .')';
264             }
265             }
266              
267 0 0         if ( my $order_by = $args{order_by} ) {
268 0           $q .= ' ORDER BY '. $order_by;
269             }
270              
271 0 0         if ( my $limit = $args{limit} ) {
272 0           $q .= ' LIMIT '. $limit;
273              
274 0 0         if ( my $offset = $args{offset} ) {
275 0           $q .= ' OFFSET '. $offset;
276             }
277             }
278              
279 0 0         if ( my $slimit = $args{slimit} ) {
280 0           $q .= ' SLIMIT '. $slimit;
281              
282 0 0         if ( my $soffset = $args{soffset} ) {
283 0           $q .= ' SOFFSET '. $soffset;
284             }
285             }
286             }
287              
288             my $url = $self->_make_url('/query', {
289             db => $args{database},
290             q => $q,
291             (
292             $args{rp} ?
293             ( rp => $args{rp} )
294             :
295             ()
296             ),
297             (
298             $args{epoch} ?
299             ( epoch => $args{epoch} )
300             :
301             ()
302             ),
303             (
304             $args{chunk_size} ?
305             ( chunk_size => $args{chunk_size} )
306 0 0         :
    0          
    0          
307             ()
308             ),
309             });
310              
311             $self->_http_request( $method => $url,
312             sub {
313 0     0     my ($body, $headers) = @_;
314              
315 0 0         if ( $headers->{Status} eq '200' ) {
316 0           my $data = decode_json($body);
317             my $series = [
318             map {
319 0           my $res = $_;
320              
321 0           my $cols = $res->{columns};
322 0           my $values = $res->{values};
323              
324             +{
325             name => $res->{name},
326             values => [
327             map {
328             +{
329 0           zip(@$cols, @$_)
330             }
331 0 0         } @{ $values || [] }
  0            
332             ]
333             }
334 0 0         } @{ $data->{results}->[0]->{series} || [] }
  0            
335             ];
336 0           $args{on_success}->($series);
337             } else {
338 0           $args{on_error}->( $body );
339             }
340             }
341 0           );
342             }
343              
344              
345             sub create_database {
346 0     0 1   my ($self, %args) = @_;
347              
348 0           my $q;
349 0 0         if ( exists $args{q} ) {
350 0           $q = $args{q};
351             } else {
352 0           $q = 'CREATE DATABASE '. $args{database};
353             }
354              
355 0           my $url = $self->_make_url('/query', {
356             q => $q,
357             });
358              
359             $self->_http_request( POST => $url,
360             sub {
361 0     0     my ($body, $headers) = @_;
362              
363 0 0         if ( $headers->{Status} eq '200' ) {
364 0           $args{on_success}->( $body );
365             } else {
366 0           $args{on_error}->( $body );
367             }
368             }
369 0           );
370             }
371              
372              
373             sub drop_database {
374 0     0 1   my ($self, %args) = @_;
375              
376 0           my $q;
377 0 0         if ( exists $args{q} ) {
378 0           $q = $args{q};
379             } else {
380 0           $q = 'DROP DATABASE '. $args{database};
381             }
382              
383 0           my $url = $self->_make_url('/query', {
384             q => $q,
385             });
386              
387             $self->_http_request( POST => $url,
388             sub {
389 0     0     my ($body, $headers) = @_;
390              
391 0 0         if ( $headers->{Status} eq '200' ) {
392 0           $args{on_success}->();
393             } else {
394 0           $args{on_error}->( $body );
395             }
396             }
397 0           );
398             }
399              
400              
401             sub drop_series {
402 0     0 1   my ($self, %args) = @_;
403              
404 0           my $q;
405 0 0         if ( exists $args{q} ) {
406 0           $q = $args{q};
407             } else {
408 0           $q = 'DROP SERIES';
409              
410 0 0         if ( my $measurement = $args{measurement} ) {
    0          
411 0           $q .= ' FROM '. $measurement;
412             }
413             elsif ( my $measurements = $args{measurements} ) {
414 0 0         $q .= ' FROM '. join(',', @{ $measurements || [] });
  0            
415             }
416              
417 0 0         if ( my $cond = $args{where} ) {
418 0           $q .= ' WHERE '. $cond;
419             }
420             }
421              
422             my $url = $self->_make_url('/query', {
423             db => $args{database},
424 0           q => $q
425             });
426              
427             $self->_http_request( POST => $url,
428             sub {
429 0     0     my ($body, $headers) = @_;
430              
431 0 0         if ( $headers->{Status} eq '200' ) {
432 0           $args{on_success}->();
433             } else {
434 0           $args{on_error}->( $body );
435             }
436             }
437 0           );
438             }
439              
440              
441             sub delete_series {
442 0     0 1   my ($self, %args) = @_;
443              
444 0           my $q;
445 0 0         if ( exists $args{q} ) {
446 0           $q = $args{q};
447             } else {
448 0           $q = 'DELETE';
449              
450 0 0         if ( my $measurement = $args{measurement} ) {
451 0           $q .= ' FROM '. $measurement;
452             }
453              
454 0 0         if ( my $cond = $args{where} ) {
455 0           $q .= ' WHERE '. $cond;
456             }
457             }
458              
459             my $url = $self->_make_url('/query', {
460             db => $args{database},
461 0           q => $q
462             });
463              
464             $self->_http_request( POST => $url,
465             sub {
466 0     0     my ($body, $headers) = @_;
467              
468 0 0         if ( $headers->{Status} eq '200' ) {
469 0           $args{on_success}->();
470             } else {
471 0           $args{on_error}->( $body );
472             }
473             }
474 0           );
475             }
476              
477              
478             sub drop_measurement {
479 0     0 1   my ($self, %args) = @_;
480              
481 0           my $q;
482 0 0         if ( exists $args{q} ) {
483 0           $q = $args{q};
484             } else {
485 0           $q = 'DROP MEASUREMENT '. $args{measurement};
486             }
487              
488             my $url = $self->_make_url('/query', {
489             db => $args{database},
490 0           q => $q
491             });
492              
493             $self->_http_request( POST => $url,
494             sub {
495 0     0     my ($body, $headers) = @_;
496              
497 0 0         if ( $headers->{Status} eq '200' ) {
498 0           $args{on_success}->();
499             } else {
500 0           $args{on_error}->( $body );
501             }
502             }
503 0           );
504             }
505              
506              
507             sub show_shards {
508 0     0 1   my ($self, %args) = @_;
509              
510 0           my $url = $self->_make_url('/query', {
511             q => 'SHOW SHARDS'
512             });
513              
514             $self->_http_request( GET => $url,
515             sub {
516 0     0     my ($body, $headers) = @_;
517              
518 0 0         if ( $headers->{Status} eq '200' ) {
519 0           my $data = decode_json($body);
520 0           my $shards = {};
521 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
522 0           my $cols = $res->{columns};
523 0           my $values = $res->{values};
524             $shards->{ $res->{name } } = [
525             map {
526             +{
527 0           zip(@$cols, @$_)
528             }
529 0 0         } @{ $values || [] }
  0            
530             ];
531             }
532 0           $args{on_success}->($shards);
533             } else {
534 0           $args{on_error}->( $body );
535             }
536             }
537 0           );
538             }
539              
540              
541             sub show_shard_groups {
542 0     0 1   my ($self, %args) = @_;
543              
544 0           my $url = $self->_make_url('/query', {
545             q => 'SHOW SHARD GROUPS'
546             });
547              
548             $self->_http_request( GET => $url,
549             sub {
550 0     0     my ($body, $headers) = @_;
551              
552 0 0         if ( $headers->{Status} eq '200' ) {
553 0           my $data = decode_json($body);
554 0           my $res = $data->{results}->[0]->{series}->[0];
555 0           my $cols = $res->{columns};
556 0           my $values = $res->{values};
557             my @shard_groups = (
558             map {
559             +{
560 0           zip(@$cols, @$_)
561             }
562 0 0         } @{ $values || [] }
  0            
563             );
564 0           $args{on_success}->(@shard_groups);
565             } else {
566 0           $args{on_error}->( $body );
567             }
568             }
569 0           );
570             }
571              
572              
573             sub drop_shard {
574 0     0 1   my ($self, %args) = @_;
575              
576 0           my $q;
577 0 0         if ( exists $args{q} ) {
578 0           $q = $args{q};
579             } else {
580 0           $q = 'DROP SHARD '. $args{id};
581             }
582              
583             my $url = $self->_make_url('/query', {
584             db => $args{database},
585 0           q => $q
586             });
587              
588             $self->_http_request( POST => $url,
589             sub {
590 0     0     my ($body, $headers) = @_;
591              
592 0 0         if ( $headers->{Status} eq '200' ) {
593 0           $args{on_success}->();
594             } else {
595 0           $args{on_error}->( $body );
596             }
597             }
598 0           );
599             }
600              
601              
602             sub show_queries {
603 0     0 1   my ($self, %args) = @_;
604              
605 0           my $url = $self->_make_url('/query', {
606             q => 'SHOW QUERIES'
607             });
608              
609             $self->_http_request( GET => $url,
610             sub {
611 0     0     my ($body, $headers) = @_;
612              
613 0 0         if ( $headers->{Status} eq '200' ) {
614 0           my $data = decode_json($body);
615 0           my $res = $data->{results}->[0]->{series}->[0];
616 0           my $cols = $res->{columns};
617 0           my $values = $res->{values};
618             my @queries = (
619             map {
620             +{
621 0           zip(@$cols, @$_)
622             }
623 0 0         } @{ $values || [] }
  0            
624             );
625 0           $args{on_success}->(@queries);
626             } else {
627 0           $args{on_error}->( $body );
628             }
629             }
630 0           );
631             }
632              
633              
634             sub kill_query {
635 0     0 1   my ($self, %args) = @_;
636              
637 0           my $q;
638 0 0         if ( exists $args{q} ) {
639 0           $q = $args{q};
640             } else {
641 0           $q = 'KILL QUERY '. $args{id};
642             }
643              
644             my $url = $self->_make_url('/query', {
645             db => $args{database},
646 0           q => $q
647             });
648              
649             $self->_http_request( POST => $url,
650             sub {
651 0     0     my ($body, $headers) = @_;
652              
653 0 0         if ( $headers->{Status} eq '200' ) {
654 0           $args{on_success}->();
655             } else {
656 0           $args{on_error}->( $body );
657             }
658             }
659 0           );
660             }
661              
662              
663              
664             sub create_retention_policy {
665 0     0 1   my ($self, %args) = @_;
666              
667 0           my $q;
668 0 0         if ( exists $args{q} ) {
669 0           $q = $args{q};
670             } else {
671             $q = 'CREATE RETENTION POLICY '. $args{name}
672             .' ON '. $args{database}
673             .' DURATION '. $args{duration}
674             .' REPLICATION '. $args{replication}
675 0           .' SHARD DURATION '. $args{shard_duration};
676              
677 0 0         $q .= ' DEFAULT' if $args{default};
678             }
679              
680 0           my $url = $self->_make_url('/query', {
681             q => $q,
682             });
683              
684             $self->_http_request( POST => $url,
685             sub {
686 0     0     my ($body, $headers) = @_;
687              
688 0 0         if ( $headers->{Status} eq '200' ) {
689 0           $args{on_success}->();
690             } else {
691 0           $args{on_error}->( $body );
692             }
693             }
694 0           );
695             }
696              
697              
698             sub alter_retention_policy {
699 0     0 1   my ($self, %args) = @_;
700              
701 0           my $q;
702 0 0         if ( exists $args{q} ) {
703 0           $q = $args{q};
704             } else {
705             $q = 'ALTER RETENTION POLICY '. $args{name}
706 0           .' ON '. $args{database};
707              
708 0 0         $q .= ' DURATION '. $args{duration} if exists $args{duration};
709 0 0         $q .= ' SHARD DURATION '. $args{shard_duration} if exists $args{shard_duration};
710 0 0         $q .= ' REPLICATION '. $args{replication} if exists $args{replication};;
711 0 0         $q .= ' DEFAULT' if $args{default};
712             }
713              
714 0           my $url = $self->_make_url('/query', {
715             q => $q
716             });
717              
718             $self->_http_request( POST => $url,
719             sub {
720 0     0     my ($body, $headers) = @_;
721              
722 0 0         if ( $headers->{Status} eq '200' ) {
723 0           $args{on_success}->();
724             } else {
725 0           $args{on_error}->( $body );
726             }
727             }
728 0           );
729             }
730              
731              
732             sub drop_retention_policy {
733 0     0 1   my ($self, %args) = @_;
734              
735 0           my $q;
736 0 0         if ( exists $args{q} ) {
737 0           $q = $args{q};
738             } else {
739 0           $q = 'DROP RETENTION POLICY '. $args{name} .' ON '. $args{database};
740             }
741              
742 0           my $url = $self->_make_url('/query', {
743             q => $q,
744             });
745              
746             $self->_http_request( POST => $url,
747             sub {
748 0     0     my ($body, $headers) = @_;
749              
750 0 0         if ( $headers->{Status} eq '200' ) {
751 0           $args{on_success}->();
752             } else {
753 0           $args{on_error}->( $body );
754             }
755             }
756 0           );
757             }
758              
759              
760             sub show_databases {
761 0     0 1   my ($self, %args) = @_;
762              
763 0           my $url = $self->_make_url('/query', {
764             q => 'SHOW DATABASES'
765             });
766              
767             $self->_http_request( GET => $url,
768             sub {
769 0     0     my ($body, $headers) = @_;
770              
771 0 0         if ( $headers->{Status} eq '200' ) {
772 0           my $data = decode_json($body);
773 0           my @names;
774 0           eval {
775 0 0         @names = map { $_->[0] } @{ $data->{results}->[0]->{series}->[0]->{values} || [] };
  0            
  0            
776             };
777 0           $args{on_success}->(@names);
778             } else {
779 0           $args{on_error}->( $body );
780             }
781             }
782 0           );
783             }
784              
785              
786             sub show_retention_policies {
787 0     0 1   my ($self, %args) = @_;
788              
789 0           my $q;
790 0 0         if ( exists $args{q} ) {
791 0           $q = $args{q};
792             } else {
793 0           $q = 'SHOW RETENTION POLICIES ON '. $args{database};
794             }
795              
796 0           my $url = $self->_make_url('/query', {
797             q => $q,
798             });
799              
800             $self->_http_request( GET => $url,
801             sub {
802 0     0     my ($body, $headers) = @_;
803              
804 0 0         if ( $headers->{Status} eq '200' ) {
805 0           my $data = decode_json($body);
806 0           my $res = $data->{results}->[0]->{series}->[0];
807 0           my $cols = $res->{columns};
808 0           my $values = $res->{values};
809             my @policies = (
810             map {
811             +{
812 0           zip(@$cols, @$_)
813             }
814 0 0         } @{ $values || [] }
  0            
815             );
816 0           $args{on_success}->(@policies);
817             } else {
818 0           $args{on_error}->( $body );
819             }
820             }
821 0           );
822             }
823              
824              
825             sub show_series {
826 0     0 1   my ($self, %args) = @_;
827              
828 0           my $q;
829 0 0         if ( exists $args{q} ) {
830 0           $q = $args{q};
831             } else {
832 0           $q = 'SHOW SERIES';
833              
834 0 0         if ( my $measurement = $args{measurement} ) {
835 0           $q .= ' FROM '. $measurement;
836             }
837              
838 0 0         if ( my $cond = $args{where} ) {
839 0           $q .= ' WHERE '. $cond;
840             }
841              
842 0 0         if ( my $order_by = $args{order_by} ) {
843 0           $q .= ' ORDER BY '. $order_by;
844             }
845              
846 0 0         if ( my $limit = $args{limit} ) {
847 0           $q .= ' LIMIT '. $limit;
848              
849 0 0         if ( my $offset = $args{offset} ) {
850 0           $q .= ' OFFSET '. $offset;
851             }
852             }
853             }
854              
855             my $url = $self->_make_url('/query', {
856             db => $args{database},
857 0           q => $q
858             });
859              
860             $self->_http_request( GET => $url,
861             sub {
862 0     0     my ($body, $headers) = @_;
863              
864 0 0         if ( $headers->{Status} eq '200' ) {
865 0           my $data = decode_json($body);
866 0           my $res = $data->{results}->[0]->{series}->[0];
867 0           my $values = $res->{values};
868             my @series = (
869 0 0         map { @$_ } @{ $values || [] }
  0            
  0            
870             );
871 0           $args{on_success}->(@series);
872             } else {
873 0           $args{on_error}->( $body );
874             }
875             }
876 0           );
877             }
878              
879              
880             sub show_measurements {
881 0     0 1   my ($self, %args) = @_;
882              
883 0           my $q;
884 0 0         if ( exists $args{q} ) {
885 0           $q = $args{q};
886             } else {
887 0           $q = 'SHOW MEASUREMENTS';
888              
889 0 0         if ( my $measurement = $args{measurement} ) {
890 0 0         $q .= ' WITH MEASUREMENT '
891             . ( $measurement =~ /^\/.*\/$/ ? '=~' : '=' )
892             . $measurement;
893             }
894              
895 0 0         if ( my $cond = $args{where} ) {
896 0           $q .= ' WHERE '. $cond;
897             }
898              
899 0 0         if ( my $order_by = $args{order_by} ) {
900 0           $q .= ' ORDER BY '. $order_by;
901             }
902              
903 0 0         if ( my $limit = $args{limit} ) {
904 0           $q .= ' LIMIT '. $limit;
905              
906 0 0         if ( my $offset = $args{offset} ) {
907 0           $q .= ' OFFSET '. $offset;
908             }
909             }
910             }
911              
912             my $url = $self->_make_url('/query', {
913             db => $args{database},
914 0           q => $q
915             });
916              
917             $self->_http_request( GET => $url,
918             sub {
919 0     0     my ($body, $headers) = @_;
920              
921 0 0         if ( $headers->{Status} eq '200' ) {
922 0           my $data = decode_json($body);
923 0           my $res = $data->{results}->[0]->{series}->[0];
924 0           my $values = $res->{values};
925             my @measurements = (
926 0 0         map { @$_ } @{ $values || [] }
  0            
  0            
927             );
928 0           $args{on_success}->(@measurements);
929             } else {
930 0           $args{on_error}->( $body );
931             }
932             }
933 0           );
934             }
935              
936              
937             sub show_tag_keys {
938 0     0 1   my ($self, %args) = @_;
939              
940 0           my $q;
941 0 0         if ( exists $args{q} ) {
942 0           $q = $args{q};
943             } else {
944 0           $q = 'SHOW TAG KEYS';
945              
946 0 0         if ( my $measurement = $args{measurement} ) {
947 0           $q .= ' FROM '. $measurement;
948             }
949              
950 0 0         if ( my $cond = $args{where} ) {
951 0           $q .= ' WHERE '. $cond;
952             }
953              
954 0 0         if ( my $limit = $args{limit} ) {
955 0           $q .= ' LIMIT '. $limit;
956             }
957              
958 0 0         if ( my $offset = $args{offset} ) {
959 0           $q .= ' OFFSET '. $offset;
960             }
961             }
962              
963             my $url = $self->_make_url('/query', {
964             db => $args{database},
965 0           q => $q
966             });
967              
968             $self->_http_request( GET => $url,
969             sub {
970 0     0     my ($body, $headers) = @_;
971              
972 0 0         if ( $headers->{Status} eq '200' ) {
973 0           my $data = decode_json($body);
974 0           my $tag_keys = {};
975 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
976 0           my $values = $res->{values};
977             $tag_keys->{ $res->{name } } = [
978             map {
979 0           @$_
980 0 0         } @{ $values || [] }
  0            
981             ];
982             }
983 0           $args{on_success}->($tag_keys);
984             } else {
985 0           $args{on_error}->( $body );
986             }
987             }
988 0           );
989             }
990              
991              
992             sub show_tag_values {
993 0     0 1   my ($self, %args) = @_;
994              
995 0           my $q;
996 0 0         if ( exists $args{q} ) {
997 0           $q = $args{q};
998             } else {
999 0           $q = 'SHOW TAG VALUES';
1000              
1001 0 0         if ( my $measurement = $args{measurement} ) {
1002 0           $q .= ' FROM '. $measurement;
1003             }
1004              
1005 0 0         if ( my $keys = $args{keys} ) {
    0          
1006 0           $q .= ' WITH KEY IN ('. join(", ", @$keys) .')';
1007             }
1008             elsif ( my $key = $args{key} ) {
1009 0           $q .= ' WITH KEY = '. $key;
1010             }
1011              
1012 0 0         if ( my $cond = $args{where} ) {
1013 0           $q .= ' WHERE '. $cond;
1014             }
1015              
1016 0 0         if ( my $limit = $args{limit} ) {
1017 0           $q .= ' LIMIT '. $limit;
1018             }
1019              
1020 0 0         if ( my $offset = $args{offset} ) {
1021 0           $q .= ' OFFSET '. $offset;
1022             }
1023             }
1024              
1025             my $url = $self->_make_url('/query', {
1026             db => $args{database},
1027 0           q => $q
1028             });
1029              
1030             $self->_http_request( GET => $url,
1031             sub {
1032 0     0     my ($body, $headers) = @_;
1033              
1034 0 0         if ( $headers->{Status} eq '200' ) {
1035 0           my $data = decode_json($body);
1036 0           my $tag_values = {};
1037 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
1038 0           my $cols = $res->{columns};
1039 0           my %col_idx = ( key => 0, value => 1 );
1040 0 0         for ( my $i = 0; $i < @{ $cols || [] }; $i++ ) {
  0            
1041 0           $col_idx{ $cols->[$i] } = $i;
1042             }
1043 0           my $values = $res->{values};
1044 0 0         for my $v ( @{ $values || [] } ) {
  0            
1045 0           push @{ $tag_values->{ $res->{name } }->{ $v->[ $col_idx{key} ] } },
1046 0           $v->[ $col_idx{value} ];
1047             }
1048             }
1049 0           $args{on_success}->($tag_values);
1050             } else {
1051 0           $args{on_error}->( $body );
1052             }
1053             }
1054 0           );
1055             }
1056              
1057              
1058             sub show_field_keys {
1059 0     0 1   my ($self, %args) = @_;
1060              
1061 0           my $q;
1062 0 0         if ( exists $args{q} ) {
1063 0           $q = $args{q};
1064             } else {
1065 0           $q = 'SHOW FIELD KEYS';
1066              
1067 0 0         if ( my $measurement = $args{measurement} ) {
1068 0           $q .= ' FROM '. $measurement;
1069             }
1070             }
1071              
1072             my $url = $self->_make_url('/query', {
1073             db => $args{database},
1074 0           q => $q
1075             });
1076              
1077             $self->_http_request( GET => $url,
1078             sub {
1079 0     0     my ($body, $headers) = @_;
1080              
1081 0 0         if ( $headers->{Status} eq '200' ) {
1082 0           my $data = decode_json($body);
1083 0           my $field_keys = {};
1084 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
1085 0           my $values = $res->{values};
1086             $field_keys->{ $res->{name } } = [
1087             map {
1088             +{
1089 0           name => $_->[0],
1090             type => $_->[1],
1091             }
1092 0 0         } @{ $values || [] }
  0            
1093             ];
1094             }
1095 0           $args{on_success}->($field_keys);
1096             } else {
1097 0           $args{on_error}->( $body );
1098             }
1099             }
1100 0           );
1101             }
1102              
1103              
1104             sub create_user {
1105 0     0 1   my ($self, %args) = @_;
1106              
1107 0           my $q;
1108 0 0         if ( exists $args{q} ) {
1109 0           $q = $args{q};
1110             } else {
1111             $q = 'CREATE USER '. $args{username}
1112 0           .' WITH PASSWORD \''. $args{password} .'\'';
1113              
1114 0 0         $q .= ' WITH ALL PRIVILEGES' if $args{all_privileges};
1115             }
1116              
1117 0           my $url = $self->_make_url('/query', {
1118             q => $q
1119             });
1120              
1121             $self->_http_request( POST => $url,
1122             sub {
1123 0     0     my ($body, $headers) = @_;
1124              
1125 0 0         if ( $headers->{Status} eq '200' ) {
1126 0           $args{on_success}->();
1127             } else {
1128 0           $args{on_error}->( $body );
1129             }
1130             }
1131 0           );
1132             }
1133              
1134              
1135             sub set_user_password {
1136 0     0 1   my ($self, %args) = @_;
1137              
1138 0           my $q;
1139 0 0         if ( exists $args{q} ) {
1140 0           $q = $args{q};
1141             } else {
1142             $q = 'SET PASSWORD FOR '. $args{username}
1143 0           .' = \''. $args{password} .'\'';
1144             }
1145              
1146 0           my $url = $self->_make_url('/query', {
1147             q => $q
1148             });
1149              
1150             $self->_http_request( POST => $url,
1151             sub {
1152 0     0     my ($body, $headers) = @_;
1153              
1154 0 0         if ( $headers->{Status} eq '200' ) {
1155 0           $args{on_success}->();
1156             } else {
1157 0           $args{on_error}->( $body );
1158             }
1159             }
1160 0           );
1161             }
1162              
1163              
1164             sub show_users {
1165 0     0 1   my ($self, %args) = @_;
1166              
1167 0           my $url = $self->_make_url('/query', {
1168             q => 'SHOW USERS'
1169             });
1170              
1171             $self->_http_request( GET => $url,
1172             sub {
1173 0     0     my ($body, $headers) = @_;
1174              
1175 0 0         if ( $headers->{Status} eq '200' ) {
1176 0           my $data = decode_json($body);
1177 0           my $res = $data->{results}->[0]->{series}->[0];
1178 0           my $cols = $res->{columns};
1179 0           my $values = $res->{values};
1180             my @users = (
1181             map {
1182             +{
1183 0           zip(@$cols, @$_)
1184             }
1185 0 0         } @{ $values || [] }
  0            
1186             );
1187 0           $args{on_success}->(@users);
1188             } else {
1189 0           $args{on_error}->( $body );
1190             }
1191             }
1192 0           );
1193             }
1194              
1195              
1196             sub grant_privileges {
1197 0     0 1   my ($self, %args) = @_;
1198              
1199 0           my $q;
1200 0 0         if ( exists $args{q} ) {
1201 0           $q = $args{q};
1202             } else {
1203 0           $q = 'GRANT ';
1204              
1205 0 0         if ( $args{all_privileges} ) {
1206 0           $q .= 'ALL PRIVILEGES';
1207             } else {
1208 0           $q .= $args{access} .' ON '. $args{database};
1209             }
1210 0           $q .= ' TO '. $args{username};
1211             }
1212              
1213 0           my $url = $self->_make_url('/query', {
1214             q => $q
1215             });
1216              
1217             $self->_http_request( POST => $url,
1218             sub {
1219 0     0     my ($body, $headers) = @_;
1220              
1221 0 0         if ( $headers->{Status} eq '200' ) {
1222 0           $args{on_success}->();
1223             } else {
1224 0           $args{on_error}->( $body );
1225             }
1226             }
1227 0           );
1228             }
1229              
1230              
1231             sub show_grants {
1232 0     0 1   my ($self, %args) = @_;
1233              
1234 0           my $q;
1235 0 0         if ( exists $args{q} ) {
1236 0           $q = $args{q};
1237             } else {
1238 0           $q = 'SHOW GRANTS FOR '. $args{username};
1239             }
1240              
1241 0           my $url = $self->_make_url('/query', {
1242             q => $q
1243             });
1244              
1245             $self->_http_request( GET => $url,
1246             sub {
1247 0     0     my ($body, $headers) = @_;
1248              
1249 0 0         if ( $headers->{Status} eq '200' ) {
1250 0           my $data = decode_json($body);
1251 0           my $res = $data->{results}->[0]->{series}->[0];
1252 0           my $cols = $res->{columns};
1253 0           my $values = $res->{values};
1254             my @grants = (
1255             map {
1256             +{
1257 0           zip(@$cols, @$_)
1258             }
1259 0 0         } @{ $values || [] }
  0            
1260             );
1261 0           $args{on_success}->(@grants);
1262             } else {
1263 0           $args{on_error}->( $body );
1264             }
1265             }
1266 0           );
1267             }
1268              
1269              
1270              
1271             sub revoke_privileges {
1272 0     0 1   my ($self, %args) = @_;
1273              
1274 0           my $q;
1275 0 0         if ( exists $args{q} ) {
1276 0           $q = $args{q};
1277             } else {
1278 0           $q = 'REVOKE ';
1279              
1280 0 0         if ( $args{all_privileges} ) {
1281 0           $q .= 'ALL PRIVILEGES';
1282             } else {
1283 0           $q .= $args{access} .' ON '. $args{database};
1284             }
1285 0           $q .= ' FROM '. $args{username};
1286             }
1287              
1288 0           my $url = $self->_make_url('/query', {
1289             q => $q
1290             });
1291              
1292             $self->_http_request( POST => $url,
1293             sub {
1294 0     0     my ($body, $headers) = @_;
1295              
1296 0 0         if ( $headers->{Status} eq '200' ) {
1297 0           $args{on_success}->();
1298             } else {
1299 0           $args{on_error}->( $body );
1300             }
1301             }
1302 0           );
1303             }
1304              
1305              
1306              
1307             sub drop_user {
1308 0     0 1   my ($self, %args) = @_;
1309              
1310 0           my $q;
1311 0 0         if ( exists $args{q} ) {
1312 0           $q = $args{q};
1313             } else {
1314 0           $q = 'DROP USER '. $args{username};
1315             }
1316              
1317 0           my $url = $self->_make_url('/query', {
1318             q => $q
1319             });
1320              
1321             $self->_http_request( POST => $url,
1322             sub {
1323 0     0     my ($body, $headers) = @_;
1324              
1325 0 0         if ( $headers->{Status} eq '200' ) {
1326 0           $args{on_success}->();
1327             } else {
1328 0           $args{on_error}->( $body );
1329             }
1330             }
1331 0           );
1332             }
1333              
1334              
1335             sub create_continuous_query {
1336 0     0 1   my ($self, %args) = @_;
1337              
1338 0           my $q;
1339 0 0         if ( exists $args{q} ) {
1340 0           $q = $args{q};
1341             } else {
1342 0           my $resample = '';
1343 0 0 0       if ( $args{every} || $args{for} ) {
1344 0           $resample = ' RESAMPLE';
1345 0 0         if ( $args{every} ) {
1346 0           $resample .= ' EVERY '. $args{every};
1347             }
1348 0 0         if ( $args{for} ) {
1349 0           $resample .= ' FOR '. $args{for};
1350             }
1351             }
1352             $q = 'CREATE CONTINUOUS QUERY '. $args{name}
1353             .' ON '. $args{database}
1354             . $resample
1355             .' BEGIN '. $args{query}
1356 0           .' END';
1357             }
1358              
1359 0           my $url = $self->_make_url('/query', {
1360             q => $q
1361             });
1362              
1363             $self->_http_request( POST => $url,
1364             sub {
1365 0     0     my ($body, $headers) = @_;
1366              
1367 0 0         if ( $headers->{Status} eq '200' ) {
1368 0           $args{on_success}->();
1369             } else {
1370 0           $args{on_error}->( $body );
1371             }
1372             }
1373 0           );
1374             }
1375              
1376              
1377             sub drop_continuous_query {
1378 0     0 1   my ($self, %args) = @_;
1379              
1380 0           my $q;
1381 0 0         if ( exists $args{q} ) {
1382 0           $q = $args{q};
1383             } else {
1384 0           $q = 'DROP CONTINUOUS QUERY '. $args{name} . ' ON '. $args{database};
1385             }
1386              
1387 0           my $url = $self->_make_url('/query', {
1388             q => $q
1389             });
1390              
1391             $self->_http_request( POST => $url,
1392             sub {
1393 0     0     my ($body, $headers) = @_;
1394              
1395 0 0         if ( $headers->{Status} eq '200' ) {
1396 0           $args{on_success}->();
1397             } else {
1398 0           $args{on_error}->( $body );
1399             }
1400             }
1401 0           );
1402             }
1403              
1404              
1405             sub show_continuous_queries {
1406 0     0 1   my ($self, %args) = @_;
1407              
1408             my $url = $self->_make_url('/query', {
1409             db => $args{database},
1410 0           q => 'SHOW CONTINUOUS QUERIES'
1411             });
1412              
1413             $self->_http_request( GET => $url,
1414             sub {
1415 0     0     my ($body, $headers) = @_;
1416              
1417 0 0         if ( $headers->{Status} eq '200' ) {
1418 0           my $data = decode_json($body);
1419 0           my $cqs = {};
1420 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
1421 0           my $cols = $res->{columns};
1422 0           my $values = $res->{values};
1423             $cqs->{ $res->{name } } = [
1424             map {
1425             +{
1426 0           zip(@$cols, @$_)
1427             }
1428 0 0         } @{ $values || [] }
  0            
1429             ];
1430             }
1431 0           $args{on_success}->($cqs);
1432             } else {
1433 0           $args{on_error}->( $body );
1434             }
1435             }
1436 0           );
1437             }
1438              
1439              
1440             sub create_subscription {
1441 0     0 1   my ($self, %args) = @_;
1442              
1443 0           my $q;
1444 0 0         if ( exists $args{q} ) {
1445 0           $q = $args{q};
1446             } else {
1447             $q = 'CREATE SUBSCRIPTION '. $args{name} .' ON '
1448             . $args{database} .'.'. $args{rp}
1449             . ' DESTINATIONS '. $args{mode} .' '
1450             . (
1451             ref $args{destinations} eq 'ARRAY' ?
1452 0 0         join(", ", @{ $args{destinations} || [] } )
1453             :
1454             $args{destinations}
1455 0 0         );
1456             }
1457              
1458 0           my $url = $self->_make_url('/query', {
1459             q => $q,
1460             });
1461              
1462             $self->_http_request( POST => $url,
1463             sub {
1464 0     0     my ($body, $headers) = @_;
1465              
1466 0 0         if ( $headers->{Status} eq '200' ) {
1467 0           $args{on_success}->();
1468             } else {
1469 0           $args{on_error}->( $body );
1470             }
1471             }
1472 0           );
1473             }
1474              
1475              
1476             sub show_subscriptions {
1477 0     0 1   my ($self, %args) = @_;
1478              
1479 0           my $url = $self->_make_url('/query', {
1480             q => 'SHOW SUBSCRIPTIONS'
1481             });
1482              
1483             $self->_http_request( GET => $url,
1484             sub {
1485 0     0     my ($body, $headers) = @_;
1486              
1487 0 0         if ( $headers->{Status} eq '200' ) {
1488 0           my $data = decode_json($body);
1489 0           my $subscriptions = {};
1490 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
1491 0           my $cols = $res->{columns};
1492 0           my $values = $res->{values};
1493             $subscriptions->{ $res->{name } } = [
1494             map {
1495             +{
1496 0           zip(@$cols, @$_)
1497             }
1498 0 0         } @{ $values || [] }
  0            
1499             ];
1500             }
1501 0           $args{on_success}->($subscriptions);
1502             } else {
1503 0           $args{on_error}->( $body );
1504             }
1505             }
1506 0           );
1507             }
1508              
1509              
1510             sub drop_subscription {
1511 0     0 1   my ($self, %args) = @_;
1512              
1513 0           my $q;
1514 0 0         if ( exists $args{q} ) {
1515 0           $q = $args{q};
1516             } else {
1517             $q = 'DROP SUBSCRIPTION '. $args{name} .' ON '
1518 0           . $args{database} .'.'. $args{rp};
1519             }
1520              
1521 0           my $url = $self->_make_url('/query', {
1522             q => $q,
1523             });
1524              
1525             $self->_http_request( POST => $url,
1526             sub {
1527 0     0     my ($body, $headers) = @_;
1528              
1529 0 0         if ( $headers->{Status} eq '200' ) {
1530 0           $args{on_success}->();
1531             } else {
1532 0           $args{on_error}->( $body );
1533             }
1534             }
1535 0           );
1536             }
1537              
1538              
1539             sub query {
1540 0     0 1   my ($self, %args) = @_;
1541              
1542 0           my $url = $self->_server_uri->clone;
1543 0           $url->path('/query');
1544 0           $url->query_form_hash( $args{query} );
1545              
1546 0   0       my $method = $args{method} || 'GET';
1547              
1548             $self->_http_request( $method => $url,
1549             sub {
1550 0     0     $args{on_response}->(@_);
1551             }
1552 0           );
1553             }
1554              
1555              
1556             1;
1557              
1558             __END__