File Coverage

blib/lib/AnyEvent/InfluxDB.pm
Criterion Covered Total %
statement 30 658 4.5
branch 0 318 0.0
condition 0 69 0.0
subroutine 10 91 10.9
pod 37 37 100.0
total 77 1173 6.5


line stmt bran cond sub pod time code
1             #ABSTRACT: An asynchronous library for InfluxDB time-series database
2 1     1   567 use strict;
  1         1  
  1         22  
3 1     1   3 use warnings;
  1         1  
  1         41  
4             package AnyEvent::InfluxDB;
5             our $AUTHORITY = 'cpan:AJGB';
6             $AnyEvent::InfluxDB::VERSION = '0.13';
7 1     1   809 use AnyEvent;
  1         3550  
  1         22  
8 1     1   524 use AnyEvent::HTTP;
  1         18281  
  1         58  
9 1     1   473 use URI;
  1         2890  
  1         35  
10 1     1   375 use URI::QueryParam;
  1         488  
  1         21  
11 1     1   4 use JSON qw(decode_json);
  1         1  
  1         5  
12 1     1   527 use List::MoreUtils qw(zip);
  1         6436  
  1         4  
13 1     1   789 use URI::Encode::XS qw( uri_encode );
  1         330  
  1         42  
14 1     1   415 use Moo;
  1         8715  
  1         3  
15              
16             has [qw( ssl_options username password on_request )] => (
17             is => 'ro',
18             predicate => 1,
19             );
20              
21             has 'server' => (
22             is => 'rw',
23             default => 'http://localhost:8086',
24             );
25              
26             has '_is_ssl' => (
27             is => 'lazy',
28             );
29              
30             has '_tls_ctx' => (
31             is => 'lazy',
32             );
33              
34             has '_server_uri' => (
35             is => 'lazy',
36             );
37              
38              
39             sub _build__tls_ctx {
40 0     0     my ($self) = @_;
41              
42             # no ca/hostname checks
43 0 0         return 'low' unless $self->has_ssl_options;
44              
45             # create ctx
46 0           require AnyEvent::TLS;
47 0           return AnyEvent::TLS->new( %{ $self->ssl_options } );
  0            
48             }
49              
50             sub _build__is_ssl {
51 0     0     my ($self) = @_;
52              
53 0           return $self->server =~ /^https/;
54             }
55              
56             sub _build__server_uri {
57 0     0     my ($self) = @_;
58              
59 0           my $url = URI->new( $self->server, 'http' );
60              
61 0 0 0       if ( $self->has_username && $self->has_password ) {
62 0           $url->query_param( 'u' => $self->username );
63 0           $url->query_param( 'p' => $self->password );
64             }
65              
66 0           return $url;
67             }
68              
69             sub _make_url {
70 0     0     my ($self, $path, $params) = @_;
71              
72 0           my $url = $self->_server_uri->clone;
73 0           $url->path($path);
74              
75 0           while ( my ($k, $v) = each %$params ) {
76 0           $url->query_param( $k => $v );
77             }
78              
79 0           return $url;
80             }
81              
82             sub _http_request {
83 0     0     my $cb = pop;
84 0           my ($self, $method, $url, $post_data) = @_;
85              
86 0 0         if ($self->has_on_request) {
87 0           $self->on_request->($method, $url, $post_data);
88             }
89              
90 0           my %args = (
91             headers => {
92             referer => undef,
93             'user-agent' => "AnyEvent-InfluxDB/0.13",
94             }
95             );
96 0 0         if ( $method eq 'POST' ) {
97 0 0         if ( defined $post_data ) {
98 0           $args{'body'} = $post_data;
99             } else {
100 0 0         if ( my $q = $url->query_param_delete('q') ) {
101 0           $args{headers}{'content-type'} = 'application/x-www-form-urlencoded';
102 0           $args{body} = 'q='. uri_encode($q);
103             }
104             }
105             }
106 0 0         if ( $self->_is_ssl ) {
107 0           $args{tls_ctx} = $self->_tls_ctx;
108             }
109              
110 0           my $guard;
111             $guard = http_request
112             $method => $url->as_string,
113             %args,
114             sub {
115 0     0     $cb->(@_);
116 0           undef $guard;
117 0           };
118             };
119              
120              
121             sub ping {
122 0     0 1   my ($self, %args) = @_;
123              
124             my $url = $self->_make_url('/ping', {
125             (
126             exists $args{wait_for_leader} ?
127             ( wait_for_leader => $args{wait_for_leader} )
128 0 0         :
129             ()
130             )
131             });
132              
133             $self->_http_request( GET => $url,
134             sub {
135 0     0     my ($body, $headers) = @_;
136 0 0         if ( $headers->{Status} eq '204' ) {
137 0           $args{on_success}->( $headers->{'x-influxdb-version'} );
138             } else {
139 0   0       $args{on_error}->( $headers->{Reason} || $body );
140             }
141             }
142 0           );
143             }
144              
145              
146              
147             sub create_database {
148 0     0 1   my ($self, %args) = @_;
149              
150 0           my $q;
151 0 0         if ( exists $args{q} ) {
152 0           $q = $args{q};
153             } else {
154             $q = 'CREATE DATABASE '.
155             (
156             $args{if_not_exists} ? 'IF NOT EXISTS ' : ''
157             )
158 0 0         .$args{database};
159             }
160              
161 0           my $url = $self->_make_url('/query', {
162             q => $q,
163             });
164              
165             $self->_http_request( POST => $url,
166             sub {
167 0     0     my ($body, $headers) = @_;
168              
169 0 0         if ( $headers->{Status} eq '200' ) {
170 0           $args{on_success}->( $body );
171             } else {
172 0           $args{on_error}->( $body );
173             }
174             }
175 0           );
176             }
177              
178              
179             sub drop_database {
180 0     0 1   my ($self, %args) = @_;
181              
182 0           my $q;
183 0 0         if ( exists $args{q} ) {
184 0           $q = $args{q};
185             } else {
186             $q = 'DROP DATABASE '
187             .
188             (
189             $args{if_exists} ? 'IF EXISTS ' : ''
190 0 0         ) . $args{database};
191             }
192              
193 0           my $url = $self->_make_url('/query', {
194             q => $q,
195             });
196              
197             $self->_http_request( POST => $url,
198             sub {
199 0     0     my ($body, $headers) = @_;
200              
201 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
202 0           $args{on_success}->();
203             } else {
204 0           $args{on_error}->( $body );
205             }
206             }
207 0           );
208             }
209              
210              
211             sub drop_series {
212 0     0 1   my ($self, %args) = @_;
213              
214 0           my $q;
215 0 0         if ( exists $args{q} ) {
216 0           $q = $args{q};
217             } else {
218 0           $q = 'DROP SERIES';
219              
220 0 0         if ( my $measurement = $args{measurement} ) {
    0          
221 0           $q .= ' FROM '. $measurement;
222             }
223             elsif ( my $measurements = $args{measurements} ) {
224 0 0         $q .= ' FROM '. join(',', @{ $measurements || [] });
  0            
225             }
226              
227 0 0         if ( my $cond = $args{where} ) {
228 0           $q .= ' WHERE '. $cond;
229             }
230             }
231              
232             my $url = $self->_make_url('/query', {
233             db => $args{database},
234 0           q => $q
235             });
236              
237             $self->_http_request( POST => $url,
238             sub {
239 0     0     my ($body, $headers) = @_;
240              
241 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
242 0           $args{on_success}->();
243             } else {
244 0           $args{on_error}->( $body );
245             }
246             }
247 0           );
248             }
249              
250              
251             sub delete_series {
252 0     0 1   my ($self, %args) = @_;
253              
254 0           my $q;
255 0 0         if ( exists $args{q} ) {
256 0           $q = $args{q};
257             } else {
258 0           $q = 'DELETE';
259              
260 0 0         if ( my $measurement = $args{measurement} ) {
261 0           $q .= ' FROM '. $measurement;
262             }
263              
264 0 0         if ( my $cond = $args{where} ) {
265 0           $q .= ' WHERE '. $cond;
266             }
267             }
268              
269             my $url = $self->_make_url('/query', {
270             db => $args{database},
271 0           q => $q
272             });
273              
274             $self->_http_request( POST => $url,
275             sub {
276 0     0     my ($body, $headers) = @_;
277              
278 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
279 0           $args{on_success}->();
280             } else {
281 0           $args{on_error}->( $body );
282             }
283             }
284 0           );
285             }
286              
287              
288             sub drop_measurement {
289 0     0 1   my ($self, %args) = @_;
290              
291 0           my $q;
292 0 0         if ( exists $args{q} ) {
293 0           $q = $args{q};
294             } else {
295 0           $q = 'DROP MEASUREMENT '. $args{measurement};
296             }
297              
298             my $url = $self->_make_url('/query', {
299             db => $args{database},
300 0           q => $q
301             });
302              
303             $self->_http_request( POST => $url,
304             sub {
305 0     0     my ($body, $headers) = @_;
306              
307 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
308 0           $args{on_success}->();
309             } else {
310 0           $args{on_error}->( $body );
311             }
312             }
313 0           );
314             }
315              
316              
317             sub show_shards {
318 0     0 1   my ($self, %args) = @_;
319              
320 0           my $url = $self->_make_url('/query', {
321             q => 'SHOW SHARDS'
322             });
323              
324             $self->_http_request( GET => $url,
325             sub {
326 0     0     my ($body, $headers) = @_;
327              
328 0 0         if ( $headers->{Status} eq '200' ) {
329 0           my $data = decode_json($body);
330 0           my $shards = {};
331 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
332 0           my $cols = $res->{columns};
333 0           my $values = $res->{values};
334             $shards->{ $res->{name } } = [
335             map {
336             +{
337 0           zip(@$cols, @$_)
338             }
339 0 0         } @{ $values || [] }
  0            
340             ];
341             }
342 0           $args{on_success}->($shards);
343             } else {
344 0           $args{on_error}->( $body );
345             }
346             }
347 0           );
348             }
349              
350              
351             sub show_shard_groups {
352 0     0 1   my ($self, %args) = @_;
353              
354 0           my $url = $self->_make_url('/query', {
355             q => 'SHOW SHARD GROUPS'
356             });
357              
358             $self->_http_request( GET => $url,
359             sub {
360 0     0     my ($body, $headers) = @_;
361              
362 0 0         if ( $headers->{Status} eq '200' ) {
363 0           my $data = decode_json($body);
364 0           my $res = $data->{results}->[0]->{series}->[0];
365 0           my $cols = $res->{columns};
366 0           my $values = $res->{values};
367             my @shard_groups = (
368             map {
369             +{
370 0           zip(@$cols, @$_)
371             }
372 0 0         } @{ $values || [] }
  0            
373             );
374 0           $args{on_success}->(@shard_groups);
375             } else {
376 0           $args{on_error}->( $body );
377             }
378             }
379 0           );
380             }
381              
382              
383             sub drop_shard {
384 0     0 1   my ($self, %args) = @_;
385              
386 0           my $q;
387 0 0         if ( exists $args{q} ) {
388 0           $q = $args{q};
389             } else {
390 0           $q = 'DROP SHARD '. $args{id};
391             }
392              
393             my $url = $self->_make_url('/query', {
394             db => $args{database},
395 0           q => $q
396             });
397              
398             $self->_http_request( POST => $url,
399             sub {
400 0     0     my ($body, $headers) = @_;
401              
402 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
403 0           $args{on_success}->();
404             } else {
405 0           $args{on_error}->( $body );
406             }
407             }
408 0           );
409             }
410              
411              
412             sub show_queries {
413 0     0 1   my ($self, %args) = @_;
414              
415 0           my $url = $self->_make_url('/query', {
416             q => 'SHOW QUERIES'
417             });
418              
419             $self->_http_request( GET => $url,
420             sub {
421 0     0     my ($body, $headers) = @_;
422              
423 0 0         if ( $headers->{Status} eq '200' ) {
424 0           my $data = decode_json($body);
425 0           my $res = $data->{results}->[0]->{series}->[0];
426 0           my $cols = $res->{columns};
427 0           my $values = $res->{values};
428             my @queries = (
429             map {
430             +{
431 0           zip(@$cols, @$_)
432             }
433 0 0         } @{ $values || [] }
  0            
434             );
435 0           $args{on_success}->(@queries);
436             } else {
437 0           $args{on_error}->( $body );
438             }
439             }
440 0           );
441             }
442              
443              
444             sub kill_query {
445 0     0 1   my ($self, %args) = @_;
446              
447 0           my $q;
448 0 0         if ( exists $args{q} ) {
449 0           $q = $args{q};
450             } else {
451 0           $q = 'KILL QUERY '. $args{id};
452             }
453              
454             my $url = $self->_make_url('/query', {
455             db => $args{database},
456 0           q => $q
457             });
458              
459             $self->_http_request( POST => $url,
460             sub {
461 0     0     my ($body, $headers) = @_;
462              
463 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
464 0           $args{on_success}->();
465             } else {
466 0           $args{on_error}->( $body );
467             }
468             }
469 0           );
470             }
471              
472              
473              
474             sub create_retention_policy {
475 0     0 1   my ($self, %args) = @_;
476              
477 0           my $q;
478 0 0         if ( exists $args{q} ) {
479 0           $q = $args{q};
480             } else {
481             $q = 'CREATE RETENTION POLICY '. $args{name}
482             .' ON '. $args{database}
483             .' DURATION '. $args{duration}
484             .' REPLICATION '. $args{replication}
485 0           .' SHARD DURATION '. $args{shard_duration};
486              
487 0 0         $q .= ' DEFAULT' if $args{default};
488             }
489              
490 0           my $url = $self->_make_url('/query', {
491             q => $q,
492             });
493              
494             $self->_http_request( POST => $url,
495             sub {
496 0     0     my ($body, $headers) = @_;
497              
498 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
499 0           $args{on_success}->();
500             } else {
501 0           $args{on_error}->( $body );
502             }
503             }
504 0           );
505             }
506              
507              
508             sub alter_retention_policy {
509 0     0 1   my ($self, %args) = @_;
510              
511 0           my $q;
512 0 0         if ( exists $args{q} ) {
513 0           $q = $args{q};
514             } else {
515             $q = 'ALTER RETENTION POLICY '. $args{name}
516 0           .' ON '. $args{database};
517              
518 0 0         $q .= ' DURATION '. $args{duration} if exists $args{duration};
519 0 0         $q .= ' SHARD DURATION '. $args{shard_duration} if exists $args{shard_duration};
520 0 0         $q .= ' REPLICATION '. $args{replication} if exists $args{replication};;
521 0 0         $q .= ' DEFAULT' if $args{default};
522             }
523              
524 0           my $url = $self->_make_url('/query', {
525             q => $q
526             });
527              
528             $self->_http_request( POST => $url,
529             sub {
530 0     0     my ($body, $headers) = @_;
531              
532 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
533 0           $args{on_success}->();
534             } else {
535 0           $args{on_error}->( $body );
536             }
537             }
538 0           );
539             }
540              
541              
542             sub drop_retention_policy {
543 0     0 1   my ($self, %args) = @_;
544              
545 0           my $q;
546 0 0         if ( exists $args{q} ) {
547 0           $q = $args{q};
548             } else {
549 0           $q = 'DROP RETENTION POLICY '. $args{name} .' ON '. $args{database};
550             }
551              
552 0           my $url = $self->_make_url('/query', {
553             q => $q,
554             });
555              
556             $self->_http_request( POST => $url,
557             sub {
558 0     0     my ($body, $headers) = @_;
559              
560 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
561 0           $args{on_success}->();
562             } else {
563 0           $args{on_error}->( $body );
564             }
565             }
566 0           );
567             }
568              
569              
570             sub show_databases {
571 0     0 1   my ($self, %args) = @_;
572              
573 0           my $url = $self->_make_url('/query', {
574             q => 'SHOW DATABASES'
575             });
576              
577             $self->_http_request( GET => $url,
578             sub {
579 0     0     my ($body, $headers) = @_;
580              
581 0 0         if ( $headers->{Status} eq '200' ) {
582 0           my $data = decode_json($body);
583 0           my @names;
584 0           eval {
585 0 0         @names = map { $_->[0] } @{ $data->{results}->[0]->{series}->[0]->{values} || [] };
  0            
  0            
586             };
587 0           $args{on_success}->(@names);
588             } else {
589 0           $args{on_error}->( $body );
590             }
591             }
592 0           );
593             }
594              
595              
596             sub show_retention_policies {
597 0     0 1   my ($self, %args) = @_;
598              
599 0           my $q;
600 0 0         if ( exists $args{q} ) {
601 0           $q = $args{q};
602             } else {
603 0           $q = 'SHOW RETENTION POLICIES ON '. $args{database};
604             }
605              
606 0           my $url = $self->_make_url('/query', {
607             q => $q,
608             });
609              
610             $self->_http_request( GET => $url,
611             sub {
612 0     0     my ($body, $headers) = @_;
613              
614 0 0         if ( $headers->{Status} eq '200' ) {
615 0           my $data = decode_json($body);
616 0           my $res = $data->{results}->[0]->{series}->[0];
617 0           my $cols = $res->{columns};
618 0           my $values = $res->{values};
619             my @policies = (
620             map {
621             +{
622 0           zip(@$cols, @$_)
623             }
624 0 0         } @{ $values || [] }
  0            
625             );
626 0           $args{on_success}->(@policies);
627             } else {
628 0           $args{on_error}->( $body );
629             }
630             }
631 0           );
632             }
633              
634              
635             sub show_series {
636 0     0 1   my ($self, %args) = @_;
637              
638 0           my $q;
639 0 0         if ( exists $args{q} ) {
640 0           $q = $args{q};
641             } else {
642 0           $q = 'SHOW SERIES';
643              
644 0 0         if ( my $measurement = $args{measurement} ) {
645 0           $q .= ' FROM '. $measurement;
646             }
647              
648 0 0         if ( my $cond = $args{where} ) {
649 0           $q .= ' WHERE '. $cond;
650             }
651              
652 0 0         if ( my $order_by = $args{order_by} ) {
653 0           $q .= ' ORDER BY '. $order_by;
654             }
655              
656 0 0         if ( my $limit = $args{limit} ) {
657 0           $q .= ' LIMIT '. $limit;
658              
659 0 0         if ( my $offset = $args{offset} ) {
660 0           $q .= ' OFFSET '. $offset;
661             }
662             }
663             }
664              
665             my $url = $self->_make_url('/query', {
666             db => $args{database},
667 0           q => $q
668             });
669              
670             $self->_http_request( GET => $url,
671             sub {
672 0     0     my ($body, $headers) = @_;
673              
674 0 0         if ( $headers->{Status} eq '200' ) {
675 0           my $data = decode_json($body);
676 0           my $res = $data->{results}->[0]->{series}->[0];
677 0           my $values = $res->{values};
678             my @series = (
679 0 0         map { @$_ } @{ $values || [] }
  0            
  0            
680             );
681 0           $args{on_success}->(@series);
682             } else {
683 0           $args{on_error}->( $body );
684             }
685             }
686 0           );
687             }
688              
689              
690             sub show_measurements {
691 0     0 1   my ($self, %args) = @_;
692              
693 0           my $q;
694 0 0         if ( exists $args{q} ) {
695 0           $q = $args{q};
696             } else {
697 0           $q = 'SHOW MEASUREMENTS';
698              
699 0 0         if ( my $measurement = $args{measurement} ) {
700 0 0         $q .= ' WITH MEASUREMENT '
701             . ( $measurement =~ /^\/.*\/$/ ? '=~' : '=' )
702             . $measurement;
703             }
704              
705 0 0         if ( my $cond = $args{where} ) {
706 0           $q .= ' WHERE '. $cond;
707             }
708              
709 0 0         if ( my $order_by = $args{order_by} ) {
710 0           $q .= ' ORDER BY '. $order_by;
711             }
712              
713 0 0         if ( my $limit = $args{limit} ) {
714 0           $q .= ' LIMIT '. $limit;
715              
716 0 0         if ( my $offset = $args{offset} ) {
717 0           $q .= ' OFFSET '. $offset;
718             }
719             }
720             }
721              
722             my $url = $self->_make_url('/query', {
723             db => $args{database},
724 0           q => $q
725             });
726              
727             $self->_http_request( GET => $url,
728             sub {
729 0     0     my ($body, $headers) = @_;
730              
731 0 0         if ( $headers->{Status} eq '200' ) {
732 0           my $data = decode_json($body);
733 0           my $res = $data->{results}->[0]->{series}->[0];
734 0           my $values = $res->{values};
735             my @measurements = (
736 0 0         map { @$_ } @{ $values || [] }
  0            
  0            
737             );
738 0           $args{on_success}->(@measurements);
739             } else {
740 0           $args{on_error}->( $body );
741             }
742             }
743 0           );
744             }
745              
746              
747             sub show_tag_keys {
748 0     0 1   my ($self, %args) = @_;
749              
750 0           my $q;
751 0 0         if ( exists $args{q} ) {
752 0           $q = $args{q};
753             } else {
754 0           $q = 'SHOW TAG KEYS';
755              
756 0 0         if ( my $measurement = $args{measurement} ) {
757 0           $q .= ' FROM '. $measurement;
758             }
759              
760 0 0         if ( my $cond = $args{where} ) {
761 0           $q .= ' WHERE '. $cond;
762             }
763              
764 0 0         if ( my $limit = $args{limit} ) {
765 0           $q .= ' LIMIT '. $limit;
766             }
767              
768 0 0         if ( my $offset = $args{offset} ) {
769 0           $q .= ' OFFSET '. $offset;
770             }
771             }
772              
773             my $url = $self->_make_url('/query', {
774             db => $args{database},
775 0           q => $q
776             });
777              
778             $self->_http_request( GET => $url,
779             sub {
780 0     0     my ($body, $headers) = @_;
781              
782 0 0         if ( $headers->{Status} eq '200' ) {
783 0           my $data = decode_json($body);
784 0           my $tag_keys = {};
785 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
786 0           my $values = $res->{values};
787             $tag_keys->{ $res->{name } } = [
788             map {
789 0           @$_
790 0 0         } @{ $values || [] }
  0            
791             ];
792             }
793 0           $args{on_success}->($tag_keys);
794             } else {
795 0           $args{on_error}->( $body );
796             }
797             }
798 0           );
799             }
800              
801              
802             sub show_tag_values {
803 0     0 1   my ($self, %args) = @_;
804              
805 0           my $q;
806 0 0         if ( exists $args{q} ) {
807 0           $q = $args{q};
808             } else {
809 0           $q = 'SHOW TAG VALUES';
810              
811 0 0         if ( my $measurement = $args{measurement} ) {
812 0           $q .= ' FROM '. $measurement;
813             }
814              
815 0 0         if ( my $keys = $args{keys} ) {
    0          
816 0           $q .= ' WITH KEY IN ('. join(", ", @$keys) .')';
817             }
818             elsif ( my $key = $args{key} ) {
819 0           $q .= ' WITH KEY = '. $key;
820             }
821              
822 0 0         if ( my $cond = $args{where} ) {
823 0           $q .= ' WHERE '. $cond;
824             }
825              
826 0 0         if ( my $limit = $args{limit} ) {
827 0           $q .= ' LIMIT '. $limit;
828             }
829              
830 0 0         if ( my $offset = $args{offset} ) {
831 0           $q .= ' OFFSET '. $offset;
832             }
833             }
834              
835             my $url = $self->_make_url('/query', {
836             db => $args{database},
837 0           q => $q
838             });
839              
840             $self->_http_request( GET => $url,
841             sub {
842 0     0     my ($body, $headers) = @_;
843              
844 0 0         if ( $headers->{Status} eq '200' ) {
845 0           my $data = decode_json($body);
846 0           my $tag_values = {};
847 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
848 0           my $cols = $res->{columns};
849 0           my %col_idx = ( key => 0, value => 1 );
850 0 0         for ( my $i = 0; $i < @{ $cols || [] }; $i++ ) {
  0            
851 0           $col_idx{ $cols->[$i] } = $i;
852             }
853 0           my $values = $res->{values};
854 0 0         for my $v ( @{ $values || [] } ) {
  0            
855 0           push @{ $tag_values->{ $res->{name } }->{ $v->[ $col_idx{key} ] } },
856 0           $v->[ $col_idx{value} ];
857             }
858             }
859 0           $args{on_success}->($tag_values);
860             } else {
861 0           $args{on_error}->( $body );
862             }
863             }
864 0           );
865             }
866              
867              
868             sub show_field_keys {
869 0     0 1   my ($self, %args) = @_;
870              
871 0           my $q;
872 0 0         if ( exists $args{q} ) {
873 0           $q = $args{q};
874             } else {
875 0           $q = 'SHOW FIELD KEYS';
876              
877 0 0         if ( my $measurement = $args{measurement} ) {
878 0           $q .= ' FROM '. $measurement;
879             }
880             }
881              
882             my $url = $self->_make_url('/query', {
883             db => $args{database},
884 0           q => $q
885             });
886              
887             $self->_http_request( GET => $url,
888             sub {
889 0     0     my ($body, $headers) = @_;
890              
891 0 0         if ( $headers->{Status} eq '200' ) {
892 0           my $data = decode_json($body);
893 0           my $field_keys = {};
894 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
895 0           my $values = $res->{values};
896             $field_keys->{ $res->{name } } = [
897             map {
898 0           @$_
899 0 0         } @{ $values || [] }
  0            
900             ];
901             }
902 0           $args{on_success}->($field_keys);
903             } else {
904 0           $args{on_error}->( $body );
905             }
906             }
907 0           );
908             }
909              
910              
911             sub create_user {
912 0     0 1   my ($self, %args) = @_;
913              
914 0           my $q;
915 0 0         if ( exists $args{q} ) {
916 0           $q = $args{q};
917             } else {
918             $q = 'CREATE USER '. $args{username}
919 0           .' WITH PASSWORD \''. $args{password} .'\'';
920              
921 0 0         $q .= ' WITH ALL PRIVILEGES' if $args{all_privileges};
922             }
923              
924 0           my $url = $self->_make_url('/query', {
925             q => $q
926             });
927              
928             $self->_http_request( POST => $url,
929             sub {
930 0     0     my ($body, $headers) = @_;
931              
932 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
933 0           $args{on_success}->();
934             } else {
935 0           $args{on_error}->( $body );
936             }
937             }
938 0           );
939             }
940              
941              
942             sub set_user_password {
943 0     0 1   my ($self, %args) = @_;
944              
945 0           my $q;
946 0 0         if ( exists $args{q} ) {
947 0           $q = $args{q};
948             } else {
949             $q = 'SET PASSWORD FOR '. $args{username}
950 0           .' = \''. $args{password} .'\'';
951             }
952              
953 0           my $url = $self->_make_url('/query', {
954             q => $q
955             });
956              
957             $self->_http_request( POST => $url,
958             sub {
959 0     0     my ($body, $headers) = @_;
960              
961 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
962 0           $args{on_success}->();
963             } else {
964 0           $args{on_error}->( $body );
965             }
966             }
967 0           );
968             }
969              
970              
971             sub show_users {
972 0     0 1   my ($self, %args) = @_;
973              
974 0           my $url = $self->_make_url('/query', {
975             q => 'SHOW USERS'
976             });
977              
978             $self->_http_request( GET => $url,
979             sub {
980 0     0     my ($body, $headers) = @_;
981              
982 0 0         if ( $headers->{Status} eq '200' ) {
983 0           my $data = decode_json($body);
984 0           my $res = $data->{results}->[0]->{series}->[0];
985 0           my $cols = $res->{columns};
986 0           my $values = $res->{values};
987             my @users = (
988             map {
989             +{
990 0           zip(@$cols, @$_)
991             }
992 0 0         } @{ $values || [] }
  0            
993             );
994 0           $args{on_success}->(@users);
995             } else {
996 0           $args{on_error}->( $body );
997             }
998             }
999 0           );
1000             }
1001              
1002              
1003             sub grant_privileges {
1004 0     0 1   my ($self, %args) = @_;
1005              
1006 0           my $q;
1007 0 0         if ( exists $args{q} ) {
1008 0           $q = $args{q};
1009             } else {
1010 0           $q = 'GRANT ';
1011              
1012 0 0         if ( $args{all_privileges} ) {
1013 0           $q .= 'ALL PRIVILEGES';
1014             } else {
1015 0           $q .= $args{access} .' ON '. $args{database};
1016             }
1017 0           $q .= ' TO '. $args{username};
1018             }
1019              
1020 0           my $url = $self->_make_url('/query', {
1021             q => $q
1022             });
1023              
1024             $self->_http_request( POST => $url,
1025             sub {
1026 0     0     my ($body, $headers) = @_;
1027              
1028 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
1029 0           $args{on_success}->();
1030             } else {
1031 0           $args{on_error}->( $body );
1032             }
1033             }
1034 0           );
1035             }
1036              
1037              
1038             sub show_grants {
1039 0     0 1   my ($self, %args) = @_;
1040              
1041 0           my $q;
1042 0 0         if ( exists $args{q} ) {
1043 0           $q = $args{q};
1044             } else {
1045 0           $q = 'SHOW GRANTS FOR '. $args{username};
1046             }
1047              
1048 0           my $url = $self->_make_url('/query', {
1049             q => $q
1050             });
1051              
1052             $self->_http_request( GET => $url,
1053             sub {
1054 0     0     my ($body, $headers) = @_;
1055              
1056 0 0         if ( $headers->{Status} eq '200' ) {
1057 0           my $data = decode_json($body);
1058 0           my $res = $data->{results}->[0]->{series}->[0];
1059 0           my $cols = $res->{columns};
1060 0           my $values = $res->{values};
1061             my @grants = (
1062             map {
1063             +{
1064 0           zip(@$cols, @$_)
1065             }
1066 0 0         } @{ $values || [] }
  0            
1067             );
1068 0           $args{on_success}->(@grants);
1069             } else {
1070 0           $args{on_error}->( $body );
1071             }
1072             }
1073 0           );
1074             }
1075              
1076              
1077              
1078             sub revoke_privileges {
1079 0     0 1   my ($self, %args) = @_;
1080              
1081 0           my $q;
1082 0 0         if ( exists $args{q} ) {
1083 0           $q = $args{q};
1084             } else {
1085 0           $q = 'REVOKE ';
1086              
1087 0 0         if ( $args{all_privileges} ) {
1088 0           $q .= 'ALL PRIVILEGES';
1089             } else {
1090 0           $q .= $args{access} .' ON '. $args{database};
1091             }
1092 0           $q .= ' FROM '. $args{username};
1093             }
1094              
1095 0           my $url = $self->_make_url('/query', {
1096             q => $q
1097             });
1098              
1099             $self->_http_request( POST => $url,
1100             sub {
1101 0     0     my ($body, $headers) = @_;
1102              
1103 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
1104 0           $args{on_success}->();
1105             } else {
1106 0           $args{on_error}->( $body );
1107             }
1108             }
1109 0           );
1110             }
1111              
1112              
1113              
1114             sub drop_user {
1115 0     0 1   my ($self, %args) = @_;
1116              
1117 0           my $q;
1118 0 0         if ( exists $args{q} ) {
1119 0           $q = $args{q};
1120             } else {
1121 0           $q = 'DROP USER '. $args{username};
1122             }
1123              
1124 0           my $url = $self->_make_url('/query', {
1125             q => $q
1126             });
1127              
1128             $self->_http_request( POST => $url,
1129             sub {
1130 0     0     my ($body, $headers) = @_;
1131              
1132 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
1133 0           $args{on_success}->();
1134             } else {
1135 0           $args{on_error}->( $body );
1136             }
1137             }
1138 0           );
1139             }
1140              
1141              
1142             sub create_continuous_query {
1143 0     0 1   my ($self, %args) = @_;
1144              
1145 0           my $q;
1146 0 0         if ( exists $args{q} ) {
1147 0           $q = $args{q};
1148             } else {
1149 0           my $resample = '';
1150 0 0 0       if ( $args{every} || $args{for} ) {
1151 0           $resample = ' RESAMPLE';
1152 0 0         if ( $args{every} ) {
1153 0           $resample .= ' EVERY '. $args{every};
1154             }
1155 0 0         if ( $args{for} ) {
1156 0           $resample .= ' FOR '. $args{for};
1157             }
1158             }
1159             $q = 'CREATE CONTINUOUS QUERY '. $args{name}
1160             .' ON '. $args{database}
1161             . $resample
1162             .' BEGIN '. $args{query}
1163 0           .' END';
1164             }
1165              
1166 0           my $url = $self->_make_url('/query', {
1167             q => $q
1168             });
1169              
1170             $self->_http_request( POST => $url,
1171             sub {
1172 0     0     my ($body, $headers) = @_;
1173              
1174 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
1175 0           $args{on_success}->();
1176             } else {
1177 0           $args{on_error}->( $body );
1178             }
1179             }
1180 0           );
1181             }
1182              
1183              
1184             sub drop_continuous_query {
1185 0     0 1   my ($self, %args) = @_;
1186              
1187 0           my $q;
1188 0 0         if ( exists $args{q} ) {
1189 0           $q = $args{q};
1190             } else {
1191 0           $q = 'DROP CONTINUOUS QUERY '. $args{name} . ' ON '. $args{database};
1192             }
1193              
1194 0           my $url = $self->_make_url('/query', {
1195             q => $q
1196             });
1197              
1198             $self->_http_request( POST => $url,
1199             sub {
1200 0     0     my ($body, $headers) = @_;
1201              
1202 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
1203 0           $args{on_success}->();
1204             } else {
1205 0           $args{on_error}->( $body );
1206             }
1207             }
1208 0           );
1209             }
1210              
1211              
1212             sub show_continuous_queries {
1213 0     0 1   my ($self, %args) = @_;
1214              
1215             my $url = $self->_make_url('/query', {
1216             db => $args{database},
1217 0           q => 'SHOW CONTINUOUS QUERIES'
1218             });
1219              
1220             $self->_http_request( GET => $url,
1221             sub {
1222 0     0     my ($body, $headers) = @_;
1223              
1224 0 0         if ( $headers->{Status} eq '200' ) {
1225 0           my $data = decode_json($body);
1226 0           my $cqs = {};
1227 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
1228 0           my $cols = $res->{columns};
1229 0           my $values = $res->{values};
1230             $cqs->{ $res->{name } } = [
1231             map {
1232             +{
1233 0           zip(@$cols, @$_)
1234             }
1235 0 0         } @{ $values || [] }
  0            
1236             ];
1237             }
1238 0           $args{on_success}->($cqs);
1239             } else {
1240 0           $args{on_error}->( $body );
1241             }
1242             }
1243 0           );
1244             }
1245              
1246              
1247              
1248             sub _to_line {
1249 0     0     my $data = shift;
1250              
1251 0   0       my $t = $data->{tags} || {};
1252 0   0       my $f = $data->{fields} || {};
1253              
1254             return $data->{measurement}
1255             .(
1256             $t ?
1257             ','.
1258             join(',',
1259             map {
1260 0           join('=', $_, $t->{$_})
1261 0           } sort { $a cmp $b } keys %$t
1262             )
1263             :
1264             ''
1265             )
1266             . ' '
1267             .(
1268             join(',',
1269             map {
1270 0           join('=', $_, $f->{$_})
1271             } keys %$f
1272             )
1273             )
1274             .(
1275             $data->{time} ?
1276             ' '. $data->{time}
1277             :
1278 0 0         ''
    0          
1279             );
1280             }
1281              
1282             sub write {
1283 0     0 1   my ($self, %args) = @_;
1284              
1285             my $data = ref $args{data} eq 'ARRAY' ?
1286 0 0         join("\n", map { ref $_ eq 'HASH' ? _to_line($_) : $_ } @{ $args{data} })
  0            
1287             :
1288 0 0         ref $args{data} eq 'HASH' ? _to_line($args{data}) : $args{data};
    0          
1289              
1290             my $url = $self->_make_url('/write', {
1291             db => $args{database},
1292             (
1293             $args{consistency} ?
1294             ( consistency => $args{consistency} )
1295             :
1296             ()
1297             ),
1298             (
1299             $args{rp} ?
1300             ( rp => $args{rp} )
1301             :
1302             ()
1303             ),
1304             (
1305             $args{precision} ?
1306             ( precision => $args{precision} )
1307             :
1308             ()
1309             ),
1310             (
1311             $args{one} ?
1312             ( one => $args{one} )
1313 0 0         :
    0          
    0          
    0          
1314             ()
1315             )
1316             });
1317              
1318             $self->_http_request( POST => $url, $data,
1319             sub {
1320 0     0     my ($body, $headers) = @_;
1321              
1322 0 0         if ( $headers->{Status} eq '204' ) {
1323 0           $args{on_success}->();
1324             } else {
1325 0           $args{on_error}->( $body );
1326             }
1327             }
1328 0           );
1329             }
1330              
1331              
1332             sub select {
1333 0     0 1   my ($self, %args) = @_;
1334              
1335 0           my $method = 'GET';
1336 0           my $q;
1337 0 0         if ( exists $args{q} ) {
1338 0           $q = $args{q};
1339 0 0         if ( $q =~ /\s+INTO\s+/i ) {
1340 0           $method = 'POST';
1341             }
1342             } else {
1343 0           $q = 'SELECT '. $args{fields};
1344              
1345 0 0         if ( my $into = $args{into} ) {
1346 0           $q .= ' INTO '. $into;
1347 0           $method = 'POST';
1348             }
1349              
1350 0           $q .= ' FROM '. $args{measurement};
1351              
1352 0 0         if ( my $cond = $args{where} ) {
1353 0           $q .= ' WHERE '. $cond;
1354             }
1355              
1356 0 0         if ( my $group = $args{group_by} ) {
1357 0           $q .= ' GROUP BY '. $group;
1358              
1359 0 0         if ( my $fill = $args{fill} ) {
1360 0           $q .= ' fill('. $fill .')';
1361             }
1362             }
1363              
1364 0 0         if ( my $order_by = $args{order_by} ) {
1365 0           $q .= ' ORDER BY '. $order_by;
1366             }
1367              
1368 0 0         if ( my $limit = $args{limit} ) {
1369 0           $q .= ' LIMIT '. $limit;
1370              
1371 0 0         if ( my $offset = $args{offset} ) {
1372 0           $q .= ' OFFSET '. $offset;
1373             }
1374             }
1375              
1376 0 0         if ( my $slimit = $args{slimit} ) {
1377 0           $q .= ' SLIMIT '. $slimit;
1378              
1379 0 0         if ( my $soffset = $args{soffset} ) {
1380 0           $q .= ' SOFFSET '. $soffset;
1381             }
1382             }
1383             }
1384              
1385             my $url = $self->_make_url('/query', {
1386             db => $args{database},
1387             q => $q,
1388             (
1389             $args{rp} ?
1390             ( rp => $args{rp} )
1391             :
1392             ()
1393             ),
1394             (
1395             $args{epoch} ?
1396             ( epoch => $args{epoch} )
1397             :
1398             ()
1399             ),
1400             (
1401             $args{chunk_size} ?
1402             ( chunk_size => $args{chunk_size} )
1403 0 0         :
    0          
    0          
1404             ()
1405             ),
1406             });
1407              
1408             $self->_http_request( $method => $url,
1409             sub {
1410 0     0     my ($body, $headers) = @_;
1411              
1412 0 0         if ( $headers->{Status} eq '200' ) {
1413 0           my $data = decode_json($body);
1414             my $series = [
1415             map {
1416 0           my $res = $_;
1417              
1418 0           my $cols = $res->{columns};
1419 0           my $values = $res->{values};
1420              
1421             +{
1422             name => $res->{name},
1423             values => [
1424             map {
1425             +{
1426 0           zip(@$cols, @$_)
1427             }
1428 0 0         } @{ $values || [] }
  0            
1429             ]
1430             }
1431 0 0         } @{ $data->{results}->[0]->{series} || [] }
  0            
1432             ];
1433 0           $args{on_success}->($series);
1434             } else {
1435 0           $args{on_error}->( $body );
1436             }
1437             }
1438 0           );
1439             }
1440              
1441              
1442             sub query {
1443 0     0 1   my ($self, %args) = @_;
1444              
1445 0           my $url = $self->_server_uri->clone;
1446 0           $url->path('/query');
1447 0           $url->query_form_hash( $args{query} );
1448              
1449 0   0       my $method = $args{method} || 'GET';
1450              
1451             $self->_http_request( $method => $url,
1452             sub {
1453 0     0     $args{on_response}->(@_);
1454             }
1455 0           );
1456             }
1457              
1458              
1459             sub create_subscription {
1460 0     0 1   my ($self, %args) = @_;
1461              
1462 0           my $q;
1463 0 0         if ( exists $args{q} ) {
1464 0           $q = $args{q};
1465             } else {
1466             $q = 'CREATE SUBSCRIPTION '. $args{name} .' ON '
1467             . $args{database} .'.'. $args{rp}
1468             . ' DESTINATIONS '. $args{mode} .' '
1469             . (
1470             ref $args{destinations} eq 'ARRAY' ?
1471 0 0         join(", ", @{ $args{destinations} || [] } )
1472             :
1473             $args{destinations}
1474 0 0         );
1475             }
1476              
1477 0           my $url = $self->_make_url('/query', {
1478             q => $q,
1479             });
1480              
1481             $self->_http_request( POST => $url,
1482             sub {
1483 0     0     my ($body, $headers) = @_;
1484              
1485 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
1486 0           $args{on_success}->();
1487             } else {
1488 0           $args{on_error}->( $body );
1489             }
1490             }
1491 0           );
1492             }
1493              
1494              
1495             sub show_subscriptions {
1496 0     0 1   my ($self, %args) = @_;
1497              
1498 0           my $url = $self->_make_url('/query', {
1499             q => 'SHOW SUBSCRIPTIONS'
1500             });
1501              
1502             $self->_http_request( GET => $url,
1503             sub {
1504 0     0     my ($body, $headers) = @_;
1505              
1506 0 0         if ( $headers->{Status} eq '200' ) {
1507 0           my $data = decode_json($body);
1508 0           my $subscriptions = {};
1509 0 0         for my $res ( @{ $data->{results}->[0]->{series} || [] } ) {
  0            
1510 0           my $cols = $res->{columns};
1511 0           my $values = $res->{values};
1512             $subscriptions->{ $res->{name } } = [
1513             map {
1514             +{
1515 0           zip(@$cols, @$_)
1516             }
1517 0 0         } @{ $values || [] }
  0            
1518             ];
1519             }
1520 0           $args{on_success}->($subscriptions);
1521             } else {
1522 0           $args{on_error}->( $body );
1523             }
1524             }
1525 0           );
1526             }
1527              
1528              
1529             sub drop_subscription {
1530 0     0 1   my ($self, %args) = @_;
1531              
1532 0           my $q;
1533 0 0         if ( exists $args{q} ) {
1534 0           $q = $args{q};
1535             } else {
1536             $q = 'DROP SUBSCRIPTION '. $args{name} .' ON '
1537 0           . $args{database} .'.'. $args{rp};
1538             }
1539              
1540 0           my $url = $self->_make_url('/query', {
1541             q => $q,
1542             });
1543              
1544             $self->_http_request( POST => $url,
1545             sub {
1546 0     0     my ($body, $headers) = @_;
1547              
1548 0 0 0       if ( $headers->{Status} eq '200' && $body eq '{"results":[{}]}' ) {
1549 0           $args{on_success}->();
1550             } else {
1551 0           $args{on_error}->( $body );
1552             }
1553             }
1554 0           );
1555             }
1556              
1557              
1558              
1559             1;
1560              
1561             __END__