File Coverage

blib/lib/AnyEvent/InfluxDB.pm
Criterion Covered Total %
statement 30 658 4.5
branch 0 318 0.0
condition 0 69 0.0
subroutine 10 91 10.9
pod 37 37 100.0
total 77 1173 6.5


line stmt bran cond sub pod time code
1             #ABSTRACT: An asynchronous library for InfluxDB time-series database
2 1     1   646 use strict;
  1         2  
  1         22  
3 1     1   3 use warnings;
  1         1  
  1         44  
4             package AnyEvent::InfluxDB;
5             our $AUTHORITY = 'cpan:AJGB';
6             $AnyEvent::InfluxDB::VERSION = '1.0.0'; # TRIAL
7 1     1   859 use AnyEvent;
  1         3871  
  1         25  
8 1     1   536 use AnyEvent::HTTP;
  1         20060  
  1         68  
9 1     1   537 use URI;
  1         3214  
  1         25  
10 1     1   361 use URI::QueryParam;
  1         511  
  1         25  
11 1     1   6 use JSON qw(decode_json);
  1         0  
  1         6  
12 1     1   572 use List::MoreUtils qw(zip);
  1         6975  
  1         4  
13 1     1   818 use URI::Encode::XS qw( uri_encode );
  1         340  
  1         49  
14 1     1   464 use Moo;
  1         9146  
  1         5  
15              
16             has [qw( ssl_options username password on_request )] => (
17             is => 'ro',
18             predicate => 1,
19             );
20              
21             has 'server' => (
22             is => 'rw',
23             default => 'http://localhost:8086',
24             );
25              
26             has '_is_ssl' => (
27             is => 'lazy',
28             );
29              
30             has '_tls_ctx' => (
31             is => 'lazy',
32             );
33              
34             has '_server_uri' => (
35             is => 'lazy',
36             );
37              
38              
39             sub _build__tls_ctx {
40 0     0     my ($self) = @_;
41              
42             # no ca/hostname checks
43 0 0         return 'low' unless $self->has_ssl_options;
44              
45             # create ctx
46 0           require AnyEvent::TLS;
47 0           return AnyEvent::TLS->new( %{ $self->ssl_options } );
  0            
48             }
49              
50             sub _build__is_ssl {
51 0     0     my ($self) = @_;
52              
53 0           return $self->server =~ /^https/;
54             }
55              
56             sub _build__server_uri {
57 0     0     my ($self) = @_;
58              
59 0           my $url = URI->new( $self->server, 'http' );
60              
61 0 0 0       if ( $self->has_username && $self->has_password ) {
62 0           $url->query_param( 'u' => $self->username );
63 0           $url->query_param( 'p' => $self->password );
64             }
65              
66 0           return $url;
67             }
68              
69             sub _make_url {
70 0     0     my ($self, $path, $params) = @_;
71              
72 0           my $url = $self->_server_uri->clone;
73 0           $url->path($path);
74              
75 0           while ( my ($k, $v) = each %$params ) {
76 0           $url->query_param( $k => $v );
77             }
78              
79 0           return $url;
80             }
81              
82             sub _http_request {
83 0     0     my $cb = pop;
84 0           my ($self, $method, $url, $post_data) = @_;
85              
86 0 0         if ($self->has_on_request) {
87 0           $self->on_request->($method, $url, $post_data);
88             }
89              
90 0           my %args = (
91             headers => {
92             referer => undef,
93             'user-agent' => "AnyEvent-InfluxDB/0.13",
94             }
95             );
96 0 0         if ( $method eq 'POST' ) {
97 0 0         if ( defined $post_data ) {
98 0           $args{'body'} = $post_data;
99             } else {
100 0 0         if ( my $q = $url->query_param_delete('q') ) {
101 0           $args{headers}{'content-type'} = 'application/x-www-form-urlencoded';
102 0           $args{body} = 'q='. uri_encode($q);
103             }
104             }
105             }
106 0 0         if ( $self->_is_ssl ) {
107 0           $args{tls_ctx} = $self->_tls_ctx;
108             }
109              
110 0           my $guard;
111             $guard = http_request
112             $method => $url->as_string,
113             %args,
114             sub {
115 0     0     $cb->(@_);
116 0           undef $guard;
117 0           };
118             };
119              
120              
121             sub ping {
122 0     0 1   my ($self, %args) = @_;
123              
124             my $url = $self->_make_url('/ping', {
125             (
126             exists $args{wait_for_leader} ?
127             ( wait_for_leader => $args{wait_for_leader} )
128 0 0         :
129             ()
130             )
131             });
132              
133             $self->_http_request( GET => $url,
134             sub {
135 0     0     my ($body, $headers) = @_;
136 0 0         if ( $headers->{Status} eq '204' ) {
137 0           $args{on_success}->( $headers->{'x-influxdb-version'} );
138             } else {
139 0   0       $args{on_error}->( $headers->{Reason} || $body );
140             }
141             }
142 0           );
143             }
144              
145              
146             sub _to_line {
147 0     0     my $data = shift;
148              
149 0   0       my $t = $data->{tags} || {};
150 0   0       my $f = $data->{fields} || {};
151              
152             return $data->{measurement}
153             .(
154             $t ?
155             ','.
156             join(',',
157             map {
158 0           join('=', $_, $t->{$_})
159 0           } sort { $a cmp $b } keys %$t
160             )
161             :
162             ''
163             )
164             . ' '
165             .(
166             join(',',
167             map {
168 0           join('=', $_, $f->{$_})
169             } keys %$f
170             )
171             )
172             .(
173             $data->{time} ?
174             ' '. $data->{time}
175             :
176 0 0         ''
    0          
177             );
178             }
179              
180             sub write {
181 0     0 1   my ($self, %args) = @_;
182              
183             my $data = ref $args{data} eq 'ARRAY' ?
184 0 0         join("\n", map { ref $_ eq 'HASH' ? _to_line($_) : $_ } @{ $args{data} })
  0            
185             :
186 0 0         ref $args{data} eq 'HASH' ? _to_line($args{data}) : $args{data};
    0          
187              
188             my $url = $self->_make_url('/write', {
189             db => $args{database},
190             (
191             $args{consistency} ?
192             ( consistency => $args{consistency} )
193             :
194             ()
195             ),
196             (
197             $args{rp} ?
198             ( rp => $args{rp} )
199             :
200             ()
201             ),
202             (
203             $args{precision} ?
204             ( precision => $args{precision} )
205             :
206             ()
207             ),
208             (
209             $args{one} ?
210             ( one => $args{one} )
211 0 0         :
    0          
    0          
    0          
212             ()
213             )
214             });
215              
216             $self->_http_request( POST => $url, $data,
217             sub {
218 0     0     my ($body, $headers) = @_;
219              
220 0 0         if ( $headers->{Status} eq '204' ) {
221 0           $args{on_success}->();
222             } else {
223 0           $args{on_error}->( $body );
224             }
225             }
226 0           );
227             }
228              
229              
230             sub select {
231 0     0 1   my ($self, %args) = @_;
232              
233 0           my $method = 'GET';
234 0           my $q;
235 0 0         if ( exists $args{q} ) {
236 0           $q = $args{q};
237 0 0         if ( $q =~ /\s+INTO\s+/i ) {
238 0           $method = 'POST';
239             }
240             } else {
241 0           $q = 'SELECT '. $args{fields};
242              
243 0 0         if ( my $into = $args{into} ) {
244 0           $q .= ' INTO '. $into;
245 0           $method = 'POST';
246             }
247              
248 0           $q .= ' FROM '. $args{measurement};
249              
250 0 0         if ( my $cond = $args{where} ) {
251 0           $q .= ' WHERE '. $cond;
252             }
253              
254 0 0         if ( my $group = $args{group_by} ) {
255 0           $q .= ' GROUP BY '. $group;
256              
257 0 0         if ( my $fill = $args{fill} ) {
258 0           $q .= ' fill('. $fill .')';
259             }
260             }
261              
262 0 0         if ( my $order_by = $args{order_by} ) {
263 0           $q .= ' ORDER BY '. $order_by;
264             }
265              
266 0 0         if ( my $limit = $args{limit} ) {
267 0           $q .= ' LIMIT '. $limit;
268              
269 0 0         if ( my $offset = $args{offset} ) {
270 0           $q .= ' OFFSET '. $offset;
271             }
272             }
273              
274 0 0         if ( my $slimit = $args{slimit} ) {
275 0           $q .= ' SLIMIT '. $slimit;
276              
277 0 0         if ( my $soffset = $args{soffset} ) {
278 0           $q .= ' SOFFSET '. $soffset;
279             }
280             }
281             }
282              
283             my $url = $self->_make_url('/query', {
284             db => $args{database},
285             q => $q,
286             (
287             $args{rp} ?
288             ( rp => $args{rp} )
289             :
290             ()
291             ),
292             (
293             $args{epoch} ?
294             ( epoch => $args{epoch} )
295             :
296             ()
297             ),
298             (
299             $args{chunk_size} ?
300             ( chunk_size => $args{chunk_size} )
301 0 0         :
    0          
    0          
302             ()
303             ),
304             });
305              
306             $self->_http_request( $method => $url,
307             sub {
308 0     0     my ($body, $headers) = @_;
309              
310 0 0         if ( $headers->{Status} eq '200' ) {
311 0           my $data = decode_json($body);
312             my $series = [
313             map {
314 0           my $res = $_;
315              
316 0           my $cols = $res->{columns};
317 0           my $values = $res->{values};
318              
319             +{
320             name => $res->{name},
321             values => [
322             map {
323             +{
324 0           zip(@$cols, @$_)
325             }
326 0 0         } @{ $values || [] }
  0            
327             ]
328             }
329 0 0         } @{ $data->{results}->[0]->{series} || [] }
  0            
330             ];
331 0           $args{on_success}->($series);
332             } else {
333 0           $args{on_error}->( $body );
334             }
335             }
336 0           );
337             }
338              
339              
340             sub create_database {
341 0     0 1   my ($self, %args) = @_;
342              
343 0           my $q;
344 0 0         if ( exists $args{q} ) {
345 0           $q = $args{q};
346             } else {
347             $q = 'CREATE DATABASE '.
348             (
349             $args{if_not_exists} ? 'IF NOT EXISTS ' : ''
350             )
351 0 0         .$args{database};
352             }
353              
354 0           my $url = $self->_make_url('/query', {
355             q => $q,
356             });
357              
358             $self->_http_request( POST => $url,
359             sub {
360 0     0     my ($body, $headers) = @_;
361              
362 0 0         if ( $headers->{Status} eq '200' ) {
363 0           $args{on_success}->( $body );
364             } else {
365 0           $args{on_error}->( $body );
366             }
367             }
368 0           );
369             }
370              
371              
372             sub drop_database {
373 0     0 1   my ($self, %args) = @_;
374              
375 0           my $q;
376 0 0         if ( exists $args{q} ) {
377 0           $q = $args{q};
378             } else {
379             $q = 'DROP DATABASE '
380             .
381             (
382             $args{if_exists} ? 'IF EXISTS ' : ''
383 0 0         ) . $args{database};
384             }
385              
386 0           my $url = $self->_make_url('/query', {
387             q => $q,
388             });
389              
390             $self->_http_request( POST => $url,
391             sub {
392 0     0     my ($body, $headers) = @_;
393              
394 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
395 0           $args{on_success}->();
396             } else {
397 0           $args{on_error}->( $body );
398             }
399             }
400 0           );
401             }
402              
403              
404             sub drop_series {
405 0     0 1   my ($self, %args) = @_;
406              
407 0           my $q;
408 0 0         if ( exists $args{q} ) {
409 0           $q = $args{q};
410             } else {
411 0           $q = 'DROP SERIES';
412              
413 0 0         if ( my $measurement = $args{measurement} ) {
    0          
414 0           $q .= ' FROM '. $measurement;
415             }
416             elsif ( my $measurements = $args{measurements} ) {
417 0 0         $q .= ' FROM '. join(',', @{ $measurements || [] });
  0            
418             }
419              
420 0 0         if ( my $cond = $args{where} ) {
421 0           $q .= ' WHERE '. $cond;
422             }
423             }
424              
425             my $url = $self->_make_url('/query', {
426             db => $args{database},
427 0           q => $q
428             });
429              
430             $self->_http_request( POST => $url,
431             sub {
432 0     0     my ($body, $headers) = @_;
433              
434 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
435 0           $args{on_success}->();
436             } else {
437 0           $args{on_error}->( $body );
438             }
439             }
440 0           );
441             }
442              
443              
444             sub delete_series {
445 0     0 1   my ($self, %args) = @_;
446              
447 0           my $q;
448 0 0         if ( exists $args{q} ) {
449 0           $q = $args{q};
450             } else {
451 0           $q = 'DELETE';
452              
453 0 0         if ( my $measurement = $args{measurement} ) {
454 0           $q .= ' FROM '. $measurement;
455             }
456              
457 0 0         if ( my $cond = $args{where} ) {
458 0           $q .= ' WHERE '. $cond;
459             }
460             }
461              
462             my $url = $self->_make_url('/query', {
463             db => $args{database},
464 0           q => $q
465             });
466              
467             $self->_http_request( POST => $url,
468             sub {
469 0     0     my ($body, $headers) = @_;
470              
471 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
472 0           $args{on_success}->();
473             } else {
474 0           $args{on_error}->( $body );
475             }
476             }
477 0           );
478             }
479              
480              
481             sub drop_measurement {
482 0     0 1   my ($self, %args) = @_;
483              
484 0           my $q;
485 0 0         if ( exists $args{q} ) {
486 0           $q = $args{q};
487             } else {
488 0           $q = 'DROP MEASUREMENT '. $args{measurement};
489             }
490              
491             my $url = $self->_make_url('/query', {
492             db => $args{database},
493 0           q => $q
494             });
495              
496             $self->_http_request( POST => $url,
497             sub {
498 0     0     my ($body, $headers) = @_;
499              
500 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
501 0           $args{on_success}->();
502             } else {
503 0           $args{on_error}->( $body );
504             }
505             }
506 0           );
507             }
508              
509              
510             sub show_shards {
511 0     0 1   my ($self, %args) = @_;
512              
513 0           my $url = $self->_make_url('/query', {
514             q => 'SHOW SHARDS'
515             });
516              
517             $self->_http_request( GET => $url,
518             sub {
519 0     0     my ($body, $headers) = @_;
520              
521 0 0         if ( $headers->{Status} eq '200' ) {
522 0           my $data = decode_json($body);
523 0           my $shards = {};
524 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
525 0           my $cols = $res->{columns};
526 0           my $values = $res->{values};
527             $shards->{ $res->{name } } = [
528             map {
529             +{
530 0           zip(@$cols, @$_)
531             }
532 0 0         } @{ $values || [] }
  0            
533             ];
534             }
535 0           $args{on_success}->($shards);
536             } else {
537 0           $args{on_error}->( $body );
538             }
539             }
540 0           );
541             }
542              
543              
544             sub show_shard_groups {
545 0     0 1   my ($self, %args) = @_;
546              
547 0           my $url = $self->_make_url('/query', {
548             q => 'SHOW SHARD GROUPS'
549             });
550              
551             $self->_http_request( GET => $url,
552             sub {
553 0     0     my ($body, $headers) = @_;
554              
555 0 0         if ( $headers->{Status} eq '200' ) {
556 0           my $data = decode_json($body);
557 0           my $res = $data->{results}->[0]->{series}->[0];
558 0           my $cols = $res->{columns};
559 0           my $values = $res->{values};
560             my @shard_groups = (
561             map {
562             +{
563 0           zip(@$cols, @$_)
564             }
565 0 0         } @{ $values || [] }
  0            
566             );
567 0           $args{on_success}->(@shard_groups);
568             } else {
569 0           $args{on_error}->( $body );
570             }
571             }
572 0           );
573             }
574              
575              
576             sub drop_shard {
577 0     0 1   my ($self, %args) = @_;
578              
579 0           my $q;
580 0 0         if ( exists $args{q} ) {
581 0           $q = $args{q};
582             } else {
583 0           $q = 'DROP SHARD '. $args{id};
584             }
585              
586             my $url = $self->_make_url('/query', {
587             db => $args{database},
588 0           q => $q
589             });
590              
591             $self->_http_request( POST => $url,
592             sub {
593 0     0     my ($body, $headers) = @_;
594              
595 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
596 0           $args{on_success}->();
597             } else {
598 0           $args{on_error}->( $body );
599             }
600             }
601 0           );
602             }
603              
604              
605             sub show_queries {
606 0     0 1   my ($self, %args) = @_;
607              
608 0           my $url = $self->_make_url('/query', {
609             q => 'SHOW QUERIES'
610             });
611              
612             $self->_http_request( GET => $url,
613             sub {
614 0     0     my ($body, $headers) = @_;
615              
616 0 0         if ( $headers->{Status} eq '200' ) {
617 0           my $data = decode_json($body);
618 0           my $res = $data->{results}->[0]->{series}->[0];
619 0           my $cols = $res->{columns};
620 0           my $values = $res->{values};
621             my @queries = (
622             map {
623             +{
624 0           zip(@$cols, @$_)
625             }
626 0 0         } @{ $values || [] }
  0            
627             );
628 0           $args{on_success}->(@queries);
629             } else {
630 0           $args{on_error}->( $body );
631             }
632             }
633 0           );
634             }
635              
636              
637             sub kill_query {
638 0     0 1   my ($self, %args) = @_;
639              
640 0           my $q;
641 0 0         if ( exists $args{q} ) {
642 0           $q = $args{q};
643             } else {
644 0           $q = 'KILL QUERY '. $args{id};
645             }
646              
647             my $url = $self->_make_url('/query', {
648             db => $args{database},
649 0           q => $q
650             });
651              
652             $self->_http_request( POST => $url,
653             sub {
654 0     0     my ($body, $headers) = @_;
655              
656 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
657 0           $args{on_success}->();
658             } else {
659 0           $args{on_error}->( $body );
660             }
661             }
662 0           );
663             }
664              
665              
666              
667             sub create_retention_policy {
668 0     0 1   my ($self, %args) = @_;
669              
670 0           my $q;
671 0 0         if ( exists $args{q} ) {
672 0           $q = $args{q};
673             } else {
674             $q = 'CREATE RETENTION POLICY '. $args{name}
675             .' ON '. $args{database}
676             .' DURATION '. $args{duration}
677             .' REPLICATION '. $args{replication}
678 0           .' SHARD DURATION '. $args{shard_duration};
679              
680 0 0         $q .= ' DEFAULT' if $args{default};
681             }
682              
683 0           my $url = $self->_make_url('/query', {
684             q => $q,
685             });
686              
687             $self->_http_request( POST => $url,
688             sub {
689 0     0     my ($body, $headers) = @_;
690              
691 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
692 0           $args{on_success}->();
693             } else {
694 0           $args{on_error}->( $body );
695             }
696             }
697 0           );
698             }
699              
700              
701             sub alter_retention_policy {
702 0     0 1   my ($self, %args) = @_;
703              
704 0           my $q;
705 0 0         if ( exists $args{q} ) {
706 0           $q = $args{q};
707             } else {
708             $q = 'ALTER RETENTION POLICY '. $args{name}
709 0           .' ON '. $args{database};
710              
711 0 0         $q .= ' DURATION '. $args{duration} if exists $args{duration};
712 0 0         $q .= ' SHARD DURATION '. $args{shard_duration} if exists $args{shard_duration};
713 0 0         $q .= ' REPLICATION '. $args{replication} if exists $args{replication};;
714 0 0         $q .= ' DEFAULT' if $args{default};
715             }
716              
717 0           my $url = $self->_make_url('/query', {
718             q => $q
719             });
720              
721             $self->_http_request( POST => $url,
722             sub {
723 0     0     my ($body, $headers) = @_;
724              
725 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
726 0           $args{on_success}->();
727             } else {
728 0           $args{on_error}->( $body );
729             }
730             }
731 0           );
732             }
733              
734              
735             sub drop_retention_policy {
736 0     0 1   my ($self, %args) = @_;
737              
738 0           my $q;
739 0 0         if ( exists $args{q} ) {
740 0           $q = $args{q};
741             } else {
742 0           $q = 'DROP RETENTION POLICY '. $args{name} .' ON '. $args{database};
743             }
744              
745 0           my $url = $self->_make_url('/query', {
746             q => $q,
747             });
748              
749             $self->_http_request( POST => $url,
750             sub {
751 0     0     my ($body, $headers) = @_;
752              
753 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
754 0           $args{on_success}->();
755             } else {
756 0           $args{on_error}->( $body );
757             }
758             }
759 0           );
760             }
761              
762              
763             sub show_databases {
764 0     0 1   my ($self, %args) = @_;
765              
766 0           my $url = $self->_make_url('/query', {
767             q => 'SHOW DATABASES'
768             });
769              
770             $self->_http_request( GET => $url,
771             sub {
772 0     0     my ($body, $headers) = @_;
773              
774 0 0         if ( $headers->{Status} eq '200' ) {
775 0           my $data = decode_json($body);
776 0           my @names;
777 0           eval {
778 0 0         @names = map { $_->[0] } @{ $data->{results}->[0]->{series}->[0]->{values} || [] };
  0            
  0            
779             };
780 0           $args{on_success}->(@names);
781             } else {
782 0           $args{on_error}->( $body );
783             }
784             }
785 0           );
786             }
787              
788              
789             sub show_retention_policies {
790 0     0 1   my ($self, %args) = @_;
791              
792 0           my $q;
793 0 0         if ( exists $args{q} ) {
794 0           $q = $args{q};
795             } else {
796 0           $q = 'SHOW RETENTION POLICIES ON '. $args{database};
797             }
798              
799 0           my $url = $self->_make_url('/query', {
800             q => $q,
801             });
802              
803             $self->_http_request( GET => $url,
804             sub {
805 0     0     my ($body, $headers) = @_;
806              
807 0 0         if ( $headers->{Status} eq '200' ) {
808 0           my $data = decode_json($body);
809 0           my $res = $data->{results}->[0]->{series}->[0];
810 0           my $cols = $res->{columns};
811 0           my $values = $res->{values};
812             my @policies = (
813             map {
814             +{
815 0           zip(@$cols, @$_)
816             }
817 0 0         } @{ $values || [] }
  0            
818             );
819 0           $args{on_success}->(@policies);
820             } else {
821 0           $args{on_error}->( $body );
822             }
823             }
824 0           );
825             }
826              
827              
828             sub show_series {
829 0     0 1   my ($self, %args) = @_;
830              
831 0           my $q;
832 0 0         if ( exists $args{q} ) {
833 0           $q = $args{q};
834             } else {
835 0           $q = 'SHOW SERIES';
836              
837 0 0         if ( my $measurement = $args{measurement} ) {
838 0           $q .= ' FROM '. $measurement;
839             }
840              
841 0 0         if ( my $cond = $args{where} ) {
842 0           $q .= ' WHERE '. $cond;
843             }
844              
845 0 0         if ( my $order_by = $args{order_by} ) {
846 0           $q .= ' ORDER BY '. $order_by;
847             }
848              
849 0 0         if ( my $limit = $args{limit} ) {
850 0           $q .= ' LIMIT '. $limit;
851              
852 0 0         if ( my $offset = $args{offset} ) {
853 0           $q .= ' OFFSET '. $offset;
854             }
855             }
856             }
857              
858             my $url = $self->_make_url('/query', {
859             db => $args{database},
860 0           q => $q
861             });
862              
863             $self->_http_request( GET => $url,
864             sub {
865 0     0     my ($body, $headers) = @_;
866              
867 0 0         if ( $headers->{Status} eq '200' ) {
868 0           my $data = decode_json($body);
869 0           my $res = $data->{results}->[0]->{series}->[0];
870 0           my $values = $res->{values};
871             my @series = (
872 0 0         map { @$_ } @{ $values || [] }
  0            
  0            
873             );
874 0           $args{on_success}->(@series);
875             } else {
876 0           $args{on_error}->( $body );
877             }
878             }
879 0           );
880             }
881              
882              
883             sub show_measurements {
884 0     0 1   my ($self, %args) = @_;
885              
886 0           my $q;
887 0 0         if ( exists $args{q} ) {
888 0           $q = $args{q};
889             } else {
890 0           $q = 'SHOW MEASUREMENTS';
891              
892 0 0         if ( my $measurement = $args{measurement} ) {
893 0 0         $q .= ' WITH MEASUREMENT '
894             . ( $measurement =~ /^\/.*\/$/ ? '=~' : '=' )
895             . $measurement;
896             }
897              
898 0 0         if ( my $cond = $args{where} ) {
899 0           $q .= ' WHERE '. $cond;
900             }
901              
902 0 0         if ( my $order_by = $args{order_by} ) {
903 0           $q .= ' ORDER BY '. $order_by;
904             }
905              
906 0 0         if ( my $limit = $args{limit} ) {
907 0           $q .= ' LIMIT '. $limit;
908              
909 0 0         if ( my $offset = $args{offset} ) {
910 0           $q .= ' OFFSET '. $offset;
911             }
912             }
913             }
914              
915             my $url = $self->_make_url('/query', {
916             db => $args{database},
917 0           q => $q
918             });
919              
920             $self->_http_request( GET => $url,
921             sub {
922 0     0     my ($body, $headers) = @_;
923              
924 0 0         if ( $headers->{Status} eq '200' ) {
925 0           my $data = decode_json($body);
926 0           my $res = $data->{results}->[0]->{series}->[0];
927 0           my $values = $res->{values};
928             my @measurements = (
929 0 0         map { @$_ } @{ $values || [] }
  0            
  0            
930             );
931 0           $args{on_success}->(@measurements);
932             } else {
933 0           $args{on_error}->( $body );
934             }
935             }
936 0           );
937             }
938              
939              
940             sub show_tag_keys {
941 0     0 1   my ($self, %args) = @_;
942              
943 0           my $q;
944 0 0         if ( exists $args{q} ) {
945 0           $q = $args{q};
946             } else {
947 0           $q = 'SHOW TAG KEYS';
948              
949 0 0         if ( my $measurement = $args{measurement} ) {
950 0           $q .= ' FROM '. $measurement;
951             }
952              
953 0 0         if ( my $cond = $args{where} ) {
954 0           $q .= ' WHERE '. $cond;
955             }
956              
957 0 0         if ( my $limit = $args{limit} ) {
958 0           $q .= ' LIMIT '. $limit;
959             }
960              
961 0 0         if ( my $offset = $args{offset} ) {
962 0           $q .= ' OFFSET '. $offset;
963             }
964             }
965              
966             my $url = $self->_make_url('/query', {
967             db => $args{database},
968 0           q => $q
969             });
970              
971             $self->_http_request( GET => $url,
972             sub {
973 0     0     my ($body, $headers) = @_;
974              
975 0 0         if ( $headers->{Status} eq '200' ) {
976 0           my $data = decode_json($body);
977 0           my $tag_keys = {};
978 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
979 0           my $values = $res->{values};
980             $tag_keys->{ $res->{name } } = [
981             map {
982 0           @$_
983 0 0         } @{ $values || [] }
  0            
984             ];
985             }
986 0           $args{on_success}->($tag_keys);
987             } else {
988 0           $args{on_error}->( $body );
989             }
990             }
991 0           );
992             }
993              
994              
995             sub show_tag_values {
996 0     0 1   my ($self, %args) = @_;
997              
998 0           my $q;
999 0 0         if ( exists $args{q} ) {
1000 0           $q = $args{q};
1001             } else {
1002 0           $q = 'SHOW TAG VALUES';
1003              
1004 0 0         if ( my $measurement = $args{measurement} ) {
1005 0           $q .= ' FROM '. $measurement;
1006             }
1007              
1008 0 0         if ( my $keys = $args{keys} ) {
    0          
1009 0           $q .= ' WITH KEY IN ('. join(", ", @$keys) .')';
1010             }
1011             elsif ( my $key = $args{key} ) {
1012 0           $q .= ' WITH KEY = '. $key;
1013             }
1014              
1015 0 0         if ( my $cond = $args{where} ) {
1016 0           $q .= ' WHERE '. $cond;
1017             }
1018              
1019 0 0         if ( my $limit = $args{limit} ) {
1020 0           $q .= ' LIMIT '. $limit;
1021             }
1022              
1023 0 0         if ( my $offset = $args{offset} ) {
1024 0           $q .= ' OFFSET '. $offset;
1025             }
1026             }
1027              
1028             my $url = $self->_make_url('/query', {
1029             db => $args{database},
1030 0           q => $q
1031             });
1032              
1033             $self->_http_request( GET => $url,
1034             sub {
1035 0     0     my ($body, $headers) = @_;
1036              
1037 0 0         if ( $headers->{Status} eq '200' ) {
1038 0           my $data = decode_json($body);
1039 0           my $tag_values = {};
1040 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
1041 0           my $cols = $res->{columns};
1042 0           my %col_idx = ( key => 0, value => 1 );
1043 0 0         for ( my $i = 0; $i < @{ $cols || [] }; $i++ ) {
  0            
1044 0           $col_idx{ $cols->[$i] } = $i;
1045             }
1046 0           my $values = $res->{values};
1047 0 0         for my $v ( @{ $values || [] } ) {
  0            
1048 0           push @{ $tag_values->{ $res->{name } }->{ $v->[ $col_idx{key} ] } },
1049 0           $v->[ $col_idx{value} ];
1050             }
1051             }
1052 0           $args{on_success}->($tag_values);
1053             } else {
1054 0           $args{on_error}->( $body );
1055             }
1056             }
1057 0           );
1058             }
1059              
1060              
1061             sub show_field_keys {
1062 0     0 1   my ($self, %args) = @_;
1063              
1064 0           my $q;
1065 0 0         if ( exists $args{q} ) {
1066 0           $q = $args{q};
1067             } else {
1068 0           $q = 'SHOW FIELD KEYS';
1069              
1070 0 0         if ( my $measurement = $args{measurement} ) {
1071 0           $q .= ' FROM '. $measurement;
1072             }
1073             }
1074              
1075             my $url = $self->_make_url('/query', {
1076             db => $args{database},
1077 0           q => $q
1078             });
1079              
1080             $self->_http_request( GET => $url,
1081             sub {
1082 0     0     my ($body, $headers) = @_;
1083              
1084 0 0         if ( $headers->{Status} eq '200' ) {
1085 0           my $data = decode_json($body);
1086 0           my $field_keys = {};
1087 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
1088 0           my $values = $res->{values};
1089             $field_keys->{ $res->{name } } = [
1090             map {
1091             +{
1092 0           name => $_->[0],
1093             type => $_->[1],
1094             }
1095 0 0         } @{ $values || [] }
  0            
1096             ];
1097             }
1098 0           $args{on_success}->($field_keys);
1099             } else {
1100 0           $args{on_error}->( $body );
1101             }
1102             }
1103 0           );
1104             }
1105              
1106              
1107             sub create_user {
1108 0     0 1   my ($self, %args) = @_;
1109              
1110 0           my $q;
1111 0 0         if ( exists $args{q} ) {
1112 0           $q = $args{q};
1113             } else {
1114             $q = 'CREATE USER '. $args{username}
1115 0           .' WITH PASSWORD \''. $args{password} .'\'';
1116              
1117 0 0         $q .= ' WITH ALL PRIVILEGES' if $args{all_privileges};
1118             }
1119              
1120 0           my $url = $self->_make_url('/query', {
1121             q => $q
1122             });
1123              
1124             $self->_http_request( POST => $url,
1125             sub {
1126 0     0     my ($body, $headers) = @_;
1127              
1128 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
1129 0           $args{on_success}->();
1130             } else {
1131 0           $args{on_error}->( $body );
1132             }
1133             }
1134 0           );
1135             }
1136              
1137              
1138             sub set_user_password {
1139 0     0 1   my ($self, %args) = @_;
1140              
1141 0           my $q;
1142 0 0         if ( exists $args{q} ) {
1143 0           $q = $args{q};
1144             } else {
1145             $q = 'SET PASSWORD FOR '. $args{username}
1146 0           .' = \''. $args{password} .'\'';
1147             }
1148              
1149 0           my $url = $self->_make_url('/query', {
1150             q => $q
1151             });
1152              
1153             $self->_http_request( POST => $url,
1154             sub {
1155 0     0     my ($body, $headers) = @_;
1156              
1157 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
1158 0           $args{on_success}->();
1159             } else {
1160 0           $args{on_error}->( $body );
1161             }
1162             }
1163 0           );
1164             }
1165              
1166              
1167             sub show_users {
1168 0     0 1   my ($self, %args) = @_;
1169              
1170 0           my $url = $self->_make_url('/query', {
1171             q => 'SHOW USERS'
1172             });
1173              
1174             $self->_http_request( GET => $url,
1175             sub {
1176 0     0     my ($body, $headers) = @_;
1177              
1178 0 0         if ( $headers->{Status} eq '200' ) {
1179 0           my $data = decode_json($body);
1180 0           my $res = $data->{results}->[0]->{series}->[0];
1181 0           my $cols = $res->{columns};
1182 0           my $values = $res->{values};
1183             my @users = (
1184             map {
1185             +{
1186 0           zip(@$cols, @$_)
1187             }
1188 0 0         } @{ $values || [] }
  0            
1189             );
1190 0           $args{on_success}->(@users);
1191             } else {
1192 0           $args{on_error}->( $body );
1193             }
1194             }
1195 0           );
1196             }
1197              
1198              
1199             sub grant_privileges {
1200 0     0 1   my ($self, %args) = @_;
1201              
1202 0           my $q;
1203 0 0         if ( exists $args{q} ) {
1204 0           $q = $args{q};
1205             } else {
1206 0           $q = 'GRANT ';
1207              
1208 0 0         if ( $args{all_privileges} ) {
1209 0           $q .= 'ALL PRIVILEGES';
1210             } else {
1211 0           $q .= $args{access} .' ON '. $args{database};
1212             }
1213 0           $q .= ' TO '. $args{username};
1214             }
1215              
1216 0           my $url = $self->_make_url('/query', {
1217             q => $q
1218             });
1219              
1220             $self->_http_request( POST => $url,
1221             sub {
1222 0     0     my ($body, $headers) = @_;
1223              
1224 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
1225 0           $args{on_success}->();
1226             } else {
1227 0           $args{on_error}->( $body );
1228             }
1229             }
1230 0           );
1231             }
1232              
1233              
1234             sub show_grants {
1235 0     0 1   my ($self, %args) = @_;
1236              
1237 0           my $q;
1238 0 0         if ( exists $args{q} ) {
1239 0           $q = $args{q};
1240             } else {
1241 0           $q = 'SHOW GRANTS FOR '. $args{username};
1242             }
1243              
1244 0           my $url = $self->_make_url('/query', {
1245             q => $q
1246             });
1247              
1248             $self->_http_request( GET => $url,
1249             sub {
1250 0     0     my ($body, $headers) = @_;
1251              
1252 0 0         if ( $headers->{Status} eq '200' ) {
1253 0           my $data = decode_json($body);
1254 0           my $res = $data->{results}->[0]->{series}->[0];
1255 0           my $cols = $res->{columns};
1256 0           my $values = $res->{values};
1257             my @grants = (
1258             map {
1259             +{
1260 0           zip(@$cols, @$_)
1261             }
1262 0 0         } @{ $values || [] }
  0            
1263             );
1264 0           $args{on_success}->(@grants);
1265             } else {
1266 0           $args{on_error}->( $body );
1267             }
1268             }
1269 0           );
1270             }
1271              
1272              
1273              
1274             sub revoke_privileges {
1275 0     0 1   my ($self, %args) = @_;
1276              
1277 0           my $q;
1278 0 0         if ( exists $args{q} ) {
1279 0           $q = $args{q};
1280             } else {
1281 0           $q = 'REVOKE ';
1282              
1283 0 0         if ( $args{all_privileges} ) {
1284 0           $q .= 'ALL PRIVILEGES';
1285             } else {
1286 0           $q .= $args{access} .' ON '. $args{database};
1287             }
1288 0           $q .= ' FROM '. $args{username};
1289             }
1290              
1291 0           my $url = $self->_make_url('/query', {
1292             q => $q
1293             });
1294              
1295             $self->_http_request( POST => $url,
1296             sub {
1297 0     0     my ($body, $headers) = @_;
1298              
1299 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
1300 0           $args{on_success}->();
1301             } else {
1302 0           $args{on_error}->( $body );
1303             }
1304             }
1305 0           );
1306             }
1307              
1308              
1309              
1310             sub drop_user {
1311 0     0 1   my ($self, %args) = @_;
1312              
1313 0           my $q;
1314 0 0         if ( exists $args{q} ) {
1315 0           $q = $args{q};
1316             } else {
1317 0           $q = 'DROP USER '. $args{username};
1318             }
1319              
1320 0           my $url = $self->_make_url('/query', {
1321             q => $q
1322             });
1323              
1324             $self->_http_request( POST => $url,
1325             sub {
1326 0     0     my ($body, $headers) = @_;
1327              
1328 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
1329 0           $args{on_success}->();
1330             } else {
1331 0           $args{on_error}->( $body );
1332             }
1333             }
1334 0           );
1335             }
1336              
1337              
1338             sub create_continuous_query {
1339 0     0 1   my ($self, %args) = @_;
1340              
1341 0           my $q;
1342 0 0         if ( exists $args{q} ) {
1343 0           $q = $args{q};
1344             } else {
1345 0           my $resample = '';
1346 0 0 0       if ( $args{every} || $args{for} ) {
1347 0           $resample = ' RESAMPLE';
1348 0 0         if ( $args{every} ) {
1349 0           $resample .= ' EVERY '. $args{every};
1350             }
1351 0 0         if ( $args{for} ) {
1352 0           $resample .= ' FOR '. $args{for};
1353             }
1354             }
1355             $q = 'CREATE CONTINUOUS QUERY '. $args{name}
1356             .' ON '. $args{database}
1357             . $resample
1358             .' BEGIN '. $args{query}
1359 0           .' END';
1360             }
1361              
1362 0           my $url = $self->_make_url('/query', {
1363             q => $q
1364             });
1365              
1366             $self->_http_request( POST => $url,
1367             sub {
1368 0     0     my ($body, $headers) = @_;
1369              
1370 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
1371 0           $args{on_success}->();
1372             } else {
1373 0           $args{on_error}->( $body );
1374             }
1375             }
1376 0           );
1377             }
1378              
1379              
1380             sub drop_continuous_query {
1381 0     0 1   my ($self, %args) = @_;
1382              
1383 0           my $q;
1384 0 0         if ( exists $args{q} ) {
1385 0           $q = $args{q};
1386             } else {
1387 0           $q = 'DROP CONTINUOUS QUERY '. $args{name} . ' ON '. $args{database};
1388             }
1389              
1390 0           my $url = $self->_make_url('/query', {
1391             q => $q
1392             });
1393              
1394             $self->_http_request( POST => $url,
1395             sub {
1396 0     0     my ($body, $headers) = @_;
1397              
1398 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
1399 0           $args{on_success}->();
1400             } else {
1401 0           $args{on_error}->( $body );
1402             }
1403             }
1404 0           );
1405             }
1406              
1407              
1408             sub show_continuous_queries {
1409 0     0 1   my ($self, %args) = @_;
1410              
1411             my $url = $self->_make_url('/query', {
1412             db => $args{database},
1413 0           q => 'SHOW CONTINUOUS QUERIES'
1414             });
1415              
1416             $self->_http_request( GET => $url,
1417             sub {
1418 0     0     my ($body, $headers) = @_;
1419              
1420 0 0         if ( $headers->{Status} eq '200' ) {
1421 0           my $data = decode_json($body);
1422 0           my $cqs = {};
1423 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
1424 0           my $cols = $res->{columns};
1425 0           my $values = $res->{values};
1426             $cqs->{ $res->{name } } = [
1427             map {
1428             +{
1429 0           zip(@$cols, @$_)
1430             }
1431 0 0         } @{ $values || [] }
  0            
1432             ];
1433             }
1434 0           $args{on_success}->($cqs);
1435             } else {
1436 0           $args{on_error}->( $body );
1437             }
1438             }
1439 0           );
1440             }
1441              
1442              
1443             sub create_subscription {
1444 0     0 1   my ($self, %args) = @_;
1445              
1446 0           my $q;
1447 0 0         if ( exists $args{q} ) {
1448 0           $q = $args{q};
1449             } else {
1450             $q = 'CREATE SUBSCRIPTION '. $args{name} .' ON '
1451             . $args{database} .'.'. $args{rp}
1452             . ' DESTINATIONS '. $args{mode} .' '
1453             . (
1454             ref $args{destinations} eq 'ARRAY' ?
1455 0 0         join(", ", @{ $args{destinations} || [] } )
1456             :
1457             $args{destinations}
1458 0 0         );
1459             }
1460              
1461 0           my $url = $self->_make_url('/query', {
1462             q => $q,
1463             });
1464              
1465             $self->_http_request( POST => $url,
1466             sub {
1467 0     0     my ($body, $headers) = @_;
1468              
1469 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
1470 0           $args{on_success}->();
1471             } else {
1472 0           $args{on_error}->( $body );
1473             }
1474             }
1475 0           );
1476             }
1477              
1478              
1479             sub show_subscriptions {
1480 0     0 1   my ($self, %args) = @_;
1481              
1482 0           my $url = $self->_make_url('/query', {
1483             q => 'SHOW SUBSCRIPTIONS'
1484             });
1485              
1486             $self->_http_request( GET => $url,
1487             sub {
1488 0     0     my ($body, $headers) = @_;
1489              
1490 0 0         if ( $headers->{Status} eq '200' ) {
1491 0           my $data = decode_json($body);
1492 0           my $subscriptions = {};
1493 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
1494 0           my $cols = $res->{columns};
1495 0           my $values = $res->{values};
1496             $subscriptions->{ $res->{name } } = [
1497             map {
1498             +{
1499 0           zip(@$cols, @$_)
1500             }
1501 0 0         } @{ $values || [] }
  0            
1502             ];
1503             }
1504 0           $args{on_success}->($subscriptions);
1505             } else {
1506 0           $args{on_error}->( $body );
1507             }
1508             }
1509 0           );
1510             }
1511              
1512              
1513             sub drop_subscription {
1514 0     0 1   my ($self, %args) = @_;
1515              
1516 0           my $q;
1517 0 0         if ( exists $args{q} ) {
1518 0           $q = $args{q};
1519             } else {
1520             $q = 'DROP SUBSCRIPTION '. $args{name} .' ON '
1521 0           . $args{database} .'.'. $args{rp};
1522             }
1523              
1524 0           my $url = $self->_make_url('/query', {
1525             q => $q,
1526             });
1527              
1528             $self->_http_request( POST => $url,
1529             sub {
1530 0     0     my ($body, $headers) = @_;
1531              
1532 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
1533 0           $args{on_success}->();
1534             } else {
1535 0           $args{on_error}->( $body );
1536             }
1537             }
1538 0           );
1539             }
1540              
1541              
1542             sub query {
1543 0     0 1   my ($self, %args) = @_;
1544              
1545 0           my $url = $self->_server_uri->clone;
1546 0           $url->path('/query');
1547 0           $url->query_form_hash( $args{query} );
1548              
1549 0   0       my $method = $args{method} || 'GET';
1550              
1551             $self->_http_request( $method => $url,
1552             sub {
1553 0     0     $args{on_response}->(@_);
1554             }
1555 0           );
1556             }
1557              
1558              
1559             1;
1560              
1561             __END__