File Coverage

blib/lib/ETL/Yertl/Adapter/graphite.pm
Criterion Covered Total %
statement 73 86 84.8
branch 15 24 62.5
condition 11 22 50.0
subroutine 14 15 93.3
pod 3 5 60.0
total 116 152 76.3


line stmt bran cond sub pod time code
1             package ETL::Yertl::Adapter::graphite;
2             our $VERSION = '0.037';
3             # ABSTRACT: Adapter to read/write from Graphite time series database
4              
5             #pod =head1 SYNOPSIS
6             #pod
7             #pod my $db = ETL::Yertl::Adapter::graphite->new( 'graphite://localhost:8086' );
8             #pod my @points = $db->read_ts( { metric => 'db.cpu_load.1m' } );
9             #pod $db->write_ts( { metric => 'db.cpu_load.1m', value => 1.23 } );
10             #pod
11             #pod =head1 DESCRIPTION
12             #pod
13             #pod This class allows Yertl to read and write time series from L<the
14             #pod Graphite time series system|https://graphiteapp.org>. Yertl can write to
15             #pod Carbon using the "plaintext" protocol, and read from Graphite using its
16             #pod HTTP API.
17             #pod
18             #pod This adapter is used by the L<yts> command.
19             #pod
20             #pod =head2 Metric Name Format
21             #pod
22             #pod Graphite metrics are paths separated by C<.>, which is the native format
23             #pod that Yertl supports.
24             #pod
25             #pod yts graphite://localhost foo.bar.baz
26             #pod yts graphite://localhost foo.bar
27             #pod
28             #pod =head1 SEE ALSO
29             #pod
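30             #pod L<ETL::Yertl>, L<yts>,
31             #pod L<Reading from Graphite (Render API)|https://graphite.readthedocs.io/en/latest/render_api.html>,
32             #pod L<Writing to Graphite (plaintext protocol)|https://graphite.readthedocs.io/en/latest/feeding-carbon.html>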
33             #pod
34             #pod =cut
35              
36 1     1   157258 use ETL::Yertl;
  1         2  
  1         10  
37 1     1   350 use Net::Async::HTTP;
  1         37938  
  1         33  
38 1     1   8 use URI;
  1         2  
  1         19  
39 1     1   250 use JSON::MaybeXS qw( decode_json );
  1         3799  
  1         52  
40 1     1   7 use List::Util qw( first );
  1         2  
  1         42  
41 1     1   5 use IO::Async::Loop;
  1         2  
  1         15  
42 1     1   287 use Time::Piece ();
  1         6122  
  1         25  
43 1     1   7 use Scalar::Util qw( looks_like_number );
  1         2  
  1         903  
44              
45             #pod =method new
46             #pod
47             #pod my $db = ETL::Yertl::Adapter::graphite->new( 'graphite://localhost' );
48             #pod my $db = ETL::Yertl::Adapter::graphite->new( 'graphite://localhost:8080' );
49             #pod
50             #pod Construct a new Graphite adapter for the database on the given host and port.
51             #pod Port is optional and defaults to C<2003> (the Carbon plaintext port) for writing
52             #pod and C<8080> (the default Graphite HTTP API port) for reading.
53             #pod
54             #pod =cut
55              
56             sub new {
57 5     5 1 32112 my $class = shift;
58              
59 5         9 my %args;
60 5 100       16 if ( @_ == 1 ) {
61 3 100       22 if ( $_[0] =~ m{://([^:]+)(?::([^/]+))?} ) {
62 2         10 ($args{host}, my $port ) = ( $1, $2 );
63 2 100       5 if ( $port ) {
64 1         5 @args{qw( http_port write_port )} = ( $port ) x 2;
65             }
66             }
67             }
68             else {
69 2         9 %args = @_;
70             }
71              
72 5 100       24 die "Host is required" unless $args{host};
73              
74 4   100     21 $args{write_port} ||= 2003; # "plaintext" port
75 4   100     14 $args{http_port} ||= 8080; # http port
76              
77 4         12 return bless \%args, $class;
78             }
79              
80             sub _loop {
81 1     1   3 my ( $self ) = @_;
82 1   33     16 return $self->{_loop} ||= IO::Async::Loop->new;
83             }
84              
85             sub write_client {
86 1     1 0 3 my ( $self ) = @_;
87             return $self->{write_client} ||= $self->_loop->connect(
88             socktype => 'stream',
89             host => $self->{host},
90             service => $self->{write_port},
91 1   33     5 )->get;
92             }
93              
94             sub http_client {
95 1     1 0 3 my ( $self ) = @_;
96 1   33     5 return $self->{http_client} ||= do {
97 1         15 my $http = Net::Async::HTTP->new;
98 1         158 $self->_loop->add( $http );
99 1         79 $http;
100             };
101             }
102              
103             #pod =method read_ts
104             #pod
105             #pod my @points = $db->read_ts( $query );
106             #pod
107             #pod Read a time series from the database. C<$query> is a hash reference
108             #pod with the following keys:
109             #pod
110             #pod =over
111             #pod
112             #pod =item metric
113             #pod
114             #pod The time series to read. For Graphite, this is a path separated by dots
115             #pod (C<.>).
116             #pod
117             #pod =item start
118             #pod
119             #pod An ISO8601 date/time for the start of the series points to return,
120             #pod inclusive.
121             #pod
122             #pod =item end
123             #pod
124             #pod An ISO8601 date/time for the end of the series points to return,
125             #pod inclusive.
126             #pod
127             #pod =item tags
128             #pod
129             #pod B<NOTE>: Graphite does not support per-value tags. Using this field will
130             #pod cause a fatal error.
131             #pod
132             #pod =back
133             #pod
134             #pod =cut
135              
136             sub read_ts {
137 1     1 1 443 my ( $self, $query ) = @_;
138 1 50 33     7 die "Tags are not supported by Graphite" if $query->{tags} && keys %{ $query->{tags} };
  0         0  
139 1         2 my $metric = $query->{ metric };
140              
141 1         6 my %form = (
142             target => $metric,
143             format => 'json',
144             noNullPoints => 'true',
145             );
146              
147 1 50       3 if ( $query->{ start } ) {
148 0         0 $form{ from } = _format_graphite_dt( $query->{start} );
149             }
150 1 50       4 if ( $query->{ end } ) {
151 0         0 $form{ until } = _format_graphite_dt( $query->{end} );
152             }
153              
154 1         16 my $url = URI->new( sprintf 'http://%s:%s/render', $self->{host}, $self->{http_port} );
155 1         5792 $url->query_form( %form );
156              
157             #; say "Fetching $url";
158 1         150 my $res = $self->http_client->GET( $url )->get;
159              
160             #; say $res->decoded_content;
161 1 50       235 if ( $res->is_error ) {
162 0         0 die sprintf "Error fetching metric '%s': " . $res->decoded_content . "\n", $metric;
163             }
164              
165 1         17 my $result = decode_json( $res->decoded_content );
166 1         1803 my @points;
167 1         3 for my $series ( @{ $result } ) {
  1         3  
168 1         2 for my $point ( @{ $series->{datapoints} } ) {
  1         3  
169             push @points, {
170             metric => $series->{target},
171 3         226 timestamp => Time::Piece->gmtime( $point->[1] )->datetime,
172             value => $point->[0],
173             };
174             }
175             }
176              
177 1         57 return @points;
178             }
179              
180             #pod =method write_ts
181             #pod
182             #pod $db->write_ts( @points );
183             #pod
184             #pod Write time series points to the database. C<@points> is an array
185             #pod of hashrefs with the following keys:
186             #pod
187             #pod =over
188             #pod
189             #pod =item metric
190             #pod
191             #pod The metric to write. For Graphite, this is a path separated by dots
192             #pod (C<.>).
193             #pod
194             #pod =item timestamp
195             #pod
196             #pod An ISO8601 timestamp or UNIX epoch time. Optional. Defaults to the
197             #pod current time.
198             #pod
199             #pod =item value
200             #pod
201             #pod The metric value.
202             #pod
203             #pod =item tags
204             #pod
205             #pod B<NOTE>: Graphite does not support per-value tags. Using this field will
206             #pod cause a fatal error.
207             #pod
208             #pod =back
209             #pod
210             #pod =cut
211              
212             sub write_ts {
213 1     1 1 19 my ( $self, @points ) = @_;
214 1         4 my $sock = $self->write_client;
215 1         4 for my $point ( @points ) {
216 3 50 33     282 die "Tags are not supported by Graphite" if $point->{tags} && keys %{ $point->{tags} };
  0         0  
217 3   66     12 $point->{timestamp} ||= time;
218 3         11 $point->{timestamp} =~ s/[.]\d+Z?$//; # We do not support nanoseconds
219 3 100       16 if ( !looks_like_number( $point->{timestamp} ) ) {
220 1         10 $point->{timestamp} = Time::Piece->strptime( $point->{timestamp}, '%Y-%m-%dT%H:%M:%S' )->epoch;
221             }
222             $sock->write(
223             join( " ", $point->{metric}, $point->{value}, $point->{timestamp}, )
224 3         129 . "\n",
225             );
226             }
227 1         41 return;
228             }
229              
230             #=sub _format_graphite_dt
231             #
232             # my $graphite_dt = _format_graphite_dt( $iso_dt );
233             #
234             # Graphite supports two date/time formats: YYYYMMDD and, bizarrely,
235             # HH:MM_YYYYMMDD
236             sub _format_graphite_dt {
237 0     0     my ( $iso ) = @_;
238 0 0         if ( looks_like_number( $iso ) ) {
239 0           my $t = Time::Piece->new( $iso );
240 0           return sprintf "%s:%s_%s%s%s", $t->hour, $t->min, $t->year, $t->mon, $t->mday;
241             }
242 0 0         if ( $iso =~ /^(\d{4})-?(\d{2})-?(\d{2})$/ ) {
243 0           return join "", $1, $2, $3;
244             }
245 0           $iso =~ /^(\d{4})-?(\d{2})-?(\d{2})[T ]?(\d{2}):?(\d{2})/;
246 0           return "$4:$5_$1$2$3";
247             }
248              
249             1;
250              
251             __END__