File Coverage

blib/lib/ETL/Yertl/Adapter/graphite.pm
Criterion Covered Total %
statement 67 77 87.0
branch 13 20 65.0
condition 9 19 47.3
subroutine 13 14 92.8
pod 3 5 60.0
total 105 135 77.7


line stmt bran cond sub pod time code
1             package ETL::Yertl::Adapter::graphite;
2             our $VERSION = '0.035';
3             # ABSTRACT: Adapter to read/write from Graphite time series database
4              
5             #pod =head1 SYNOPSIS
6             #pod
7             #pod my $db = ETL::Yertl::Adapter::graphite->new( 'graphite://localhost:8086' );
8             #pod my @points = $db->read_ts( { metric => 'db.cpu_load.1m' } );
9             #pod $db->write_ts( { metric => 'db.cpu_load.1m', value => 1.23 } );
10             #pod
11             #pod =head1 DESCRIPTION
12             #pod
13             #pod This class allows Yertl to read and write time series from L<the
14             #pod Graphite time series system|https://graphiteapp.org>. Yertl can write to
15             #pod Carbon using the "plaintext" protocol, and read from Graphite using its
16             #pod HTTP API.
17             #pod
18             #pod This adapter is used by the L<yts> command.
19             #pod
20             #pod =head2 Metric Name Format
21             #pod
22             #pod Graphite metrics are paths separated by C<.>, which is the native format
23             #pod that Yertl supports.
24             #pod
25             #pod yts graphite://localhost foo.bar.baz
26             #pod yts graphite://localhost foo.bar
27             #pod
28             #pod =head1 SEE ALSO
29             #pod
30             #pod L, L,
31             #pod L
32             #pod L
33             #pod
34             #pod =cut
35              
36 1     1   160723 use ETL::Yertl;
  1         3  
  1         10  
37 1     1   343 use Net::Async::HTTP;
  1         40645  
  1         65  
38 1     1   11 use URI;
  1         2  
  1         26  
39 1     1   385 use JSON::MaybeXS qw( decode_json );
  1         4926  
  1         68  
40 1     1   8 use List::Util qw( first );
  1         3  
  1         55  
41 1     1   6 use IO::Async::Loop;
  1         2  
  1         20  
42 1     1   418 use Time::Piece ();
  1         7243  
  1         948  
43              
44             #pod =method new
45             #pod
46             #pod my $db = ETL::Yertl::Adapter::graphite->new( 'graphite://localhost' );
47             #pod my $db = ETL::Yertl::Adapter::graphite->new( 'graphite://localhost:8080' );
48             #pod
49             #pod Construct a new Graphite adapter for the database on the given host and port.
50             #pod Port is optional and defaults to C<2003> (the Carbon plaintext port) for writing
51             #pod and C<8080> (the default Graphite HTTP API port) for reading.
52             #pod
53             #pod =cut
54              
55             sub new {
56 5     5 1 29763 my $class = shift;
57              
58 5         9 my %args;
59 5 100       13 if ( @_ == 1 ) {
60 3 100       21 if ( $_[0] =~ m{://([^:]+)(?::([^/]+))?} ) {
61 2         13 ($args{host}, my $port ) = ( $1, $2 );
62 2 100       5 if ( $port ) {
63 1         5 @args{qw( http_port write_port )} = ( $port ) x 2;
64             }
65             }
66             }
67             else {
68 2         9 %args = @_;
69             }
70              
71 5 100       22 die "Host is required" unless $args{host};
72              
73 4   100     17 $args{write_port} ||= 2003; # "plaintext" port
74 4   100     15 $args{http_port} ||= 8080; # http port
75              
76 4         12 return bless \%args, $class;
77             }
78              
79             sub _loop {
80 1     1   2 my ( $self ) = @_;
81 1   33     12 return $self->{_loop} ||= IO::Async::Loop->new;
82             }
83              
84             sub write_client {
85 1     1 0 3 my ( $self ) = @_;
86             return $self->{write_client} ||= $self->_loop->connect(
87             socktype => 'stream',
88             host => $self->{host},
89             service => $self->{write_port},
90 1   33     4 )->get;
91             }
92              
93             sub http_client {
94 1     1 0 2 my ( $self ) = @_;
95 1   33     5 return $self->{http_client} ||= do {
96 1         12 my $http = Net::Async::HTTP->new;
97 1         85 $self->_loop->add( $http );
98 1         71 $http;
99             };
100             }
101              
102             #pod =method read_ts
103             #pod
104             #pod my @points = $db->read_ts( $query );
105             #pod
106             #pod Read a time series from the database. C<$query> is a hash reference
107             #pod with the following keys:
108             #pod
109             #pod =over
110             #pod
111             #pod =item metric
112             #pod
113             #pod The time series to read. For Graphite, this is a path separated by dots
114             #pod (C<.>).
115             #pod
116             #pod =item start
117             #pod
118             #pod An ISO8601 date/time for the start of the series points to return,
119             #pod inclusive.
120             #pod
121             #pod =item end
122             #pod
123             #pod An ISO8601 date/time for the end of the series points to return,
124             #pod inclusive.
125             #pod
126             #pod =item tags
127             #pod
128             #pod B<NOTE>: Graphite does not support per-value tags. Using this field will
129             #pod cause a fatal error.
130             #pod
131             #pod =back
132             #pod
133             #pod =cut
134              
135             sub read_ts {
136 1     1 1 405 my ( $self, $query ) = @_;
137 1 50 33     7 die "Tags are not supported by Graphite" if $query->{tags} && keys %{ $query->{tags} };
  0         0  
138 1         2 my $metric = $query->{ metric };
139              
140 1         5 my %form = (
141             target => $metric,
142             format => 'json',
143             noNullPoints => 'true',
144             );
145              
146 1 50       3 if ( $query->{ start } ) {
147 0         0 $form{ from } = _format_graphite_dt( $query->{start} );
148             }
149 1 50       3 if ( $query->{ end } ) {
150 0         0 $form{ until } = _format_graphite_dt( $query->{end} );
151             }
152              
153 1         11 my $url = URI->new( sprintf 'http://%s:%s/render', $self->{host}, $self->{http_port} );
154 1         5102 $url->query_form( %form );
155              
156             #; say "Fetching $url";
157 1         139 my $res = $self->http_client->GET( $url )->get;
158              
159             #; say $res->decoded_content;
160 1 50       226 if ( $res->is_error ) {
161 0         0 die sprintf "Error fetching metric '%s': " . $res->decoded_content . "\n", $metric;
162             }
163              
164 1         15 my $result = decode_json( $res->decoded_content );
165 1         1546 my @points;
166 1         2 for my $series ( @{ $result } ) {
  1         3  
167 1         2 for my $point ( @{ $series->{datapoints} } ) {
  1         3  
168             push @points, {
169             metric => $series->{target},
170 3         236 timestamp => Time::Piece->gmtime( $point->[1] )->datetime,
171             value => $point->[0],
172             };
173             }
174             }
175              
176 1         54 return @points;
177             }
178              
179             #pod =method write_ts
180             #pod
181             #pod $db->write_ts( @points );
182             #pod
183             #pod Write time series points to the database. C<@points> is an array
184             #pod of hashrefs with the following keys:
185             #pod
186             #pod =over
187             #pod
188             #pod =item metric
189             #pod
190             #pod The metric to write. For Graphite, this is a path separated by dots
191             #pod (C<.>).
192             #pod
193             #pod =item timestamp
194             #pod
195             #pod An ISO8601 timestamp. Optional. Defaults to the current time on the
196             #pod Graphite server.
197             #pod
198             #pod =item value
199             #pod
200             #pod The metric value.
201             #pod
202             #pod =item tags
203             #pod
204             #pod B<NOTE>: Graphite does not support per-value tags. Using this field will
205             #pod cause a fatal error.
206             #pod
207             #pod =back
208             #pod
209             #pod =cut
210              
211             sub write_ts {
212 1     1 1 10 my ( $self, @points ) = @_;
213 1         4 my $sock = $self->write_client;
214 1         3 for my $point ( @points ) {
215 2 50 33     320 die "Tags are not supported by Graphite" if $point->{tags} && keys %{ $point->{tags} };
  0         0  
216 2         5 $point->{timestamp} =~ s/[.]\d+Z?$//; # We do not support nanoseconds
217             $sock->write(
218             join( " ", $point->{metric}, $point->{value},
219 2         11 Time::Piece->strptime( $point->{timestamp}, '%Y-%m-%dT%H:%M:%S' )->epoch, )
220             . "\n",
221             );
222             }
223 1         97 return;
224             }
225              
226             #=sub _format_graphite_dt
227             #
228             # my $graphite_dt = _format_graphite_dt( $iso_dt );
229             #
230             # Graphite supports two date/time formats: YYYYMMDD and, bizarrely,
231             # HH:MM_YYYYMMDD
232             sub _format_graphite_dt {
233 0     0     my ( $iso ) = @_;
234 0 0         if ( $iso =~ /^(\d{4})-?(\d{2})-?(\d{2})$/ ) {
235 0           return join "", $1, $2, $3;
236             }
237 0           $iso =~ /^(\d{4})-?(\d{2})-?(\d{2})[T ]?(\d{2}):?(\d{2})/;
238 0           return "$4:$5_$1$2$3";
239             }
240              
241             1;
242              
243             __END__