File Coverage

blib/lib/App/ElasticSearch/Utilities/Metrics.pm
Criterion Covered Total %
statement 26 134 19.4
branch 0 46 0.0
condition 0 10 0.0
subroutine 9 19 47.3
pod 4 4 100.0
total 39 213 18.3


line stmt bran cond sub pod time code
1             package App::ElasticSearch::Utilities::Metrics;
2             # ABSTRACT: Fetches performance metrics about the node
3              
4              
5 1     1   379068 use v5.16;
  1         4  
6 1     1   7 use warnings;
  1         3  
  1         112  
7              
8             our $VERSION = '8.8'; # VERSION
9              
10 1     1   706 use App::ElasticSearch::Utilities qw(es_connect);
  1         6  
  1         11  
11 1     1   390 use CLI::Helpers qw(:output);
  1         2  
  1         8  
12 1     1   262 use JSON::MaybeXS;
  1         2  
  1         93  
13 1     1   7 use Ref::Util qw(is_ref is_arrayref is_hashref);
  1         3  
  1         75  
14 1     1   7 use Types::Standard qw( ArrayRef Bool HashRef InstanceOf Int Str );
  1         3  
  1         20  
15              
16 1     1   3154 use Moo;
  1         2  
  1         10  
17 1     1   684 use namespace::autoclean;
  1         2  
  1         27  
18              
19              
20             has 'connection' => (
21             is => 'ro',
22             isa => InstanceOf['App::ElasticSearch::Utilities::Connection'],
23             default => sub { es_connect() },
24             handles => [qw(host port request)],
25             );
26              
27             my @_IGNORES = qw(
28             _all _shards
29             attributes id timestamp uptime_in_millis
30             );
31              
32              
33             has 'ignore' => (
34             is => 'lazy',
35             isa => ArrayRef[Str],
36             default => sub {
37             my ($self) = @_;
38              
39             my %roles = map { $_ => 1 } @{ $self->node_details->{roles} };
40             my @ignore = qw(adaptive_selection discovery);
41              
42             # Easy roles and sections are the same
43             foreach my $section ( qw(ingest ml transform) ) {
44             push @ignore, $section
45             unless $roles{$section};
46             }
47              
48             if( ! $roles{ml} ) {
49             push @ignore, qw(ml_datafeed ml_job_comms ml_utility);
50             }
51              
52             # Skip some sections if we're not a data node
53             if( ! grep { /^data/ } keys %roles ) {
54             push @ignore, qw(force_merge indexing indices merges pressure recovery segments translog);
55             }
56              
57             return \@ignore;
58             },
59             );
60              
61              
62             has 'node_details' => (
63             is => 'lazy',
64             isa => HashRef,
65             init_arg => undef,
66             );
67              
68             sub _build_node_details {
69 0     0     my ($self) = @_;
70              
71 0 0         if( my $res = $self->request('_nodes/_local')->content ) {
72 0 0         if( my $nodes = $res->{nodes} ) {
73 0           my ($id) = keys %{ $nodes };
  0            
74             return {
75 0           %{ $nodes->{$id} },
  0            
76             id => $id,
77             }
78             }
79             }
80              
81             # Fail our type check
82 0           return;
83             }
84              
85              
86             has 'node_id' => (
87             is => 'lazy',
88             isa => Str,
89             );
90              
91             sub _build_node_id {
92 0     0     my ($self) = @_;
93              
94 0 0         if( my $details = $self->node_details ) {
95 0           return $details->{id};
96             }
97              
98 0           warn sprintf "unable to determine node_id for %s:%d",
99             $self->host, $self->port;
100              
101             # Fail our type check
102 0           return;
103             }
104              
105              
106             has 'with_cluster_metrics' => (
107             is => 'lazy',
108             isa => Bool,
109             builder => sub {
110 0     0     my ($self) = @_;
111 0 0         if( my $info = $self->node_details ) {
112 0           return !!grep { $_ eq 'master' } @{ $info->{roles} };
  0            
  0            
113             }
114 0           return 0;
115             },
116             );
117              
118              
119              
120             has 'with_index_metrics' => (
121             is => 'lazy',
122             isa => Bool,
123             builder => sub {
124 0     0     my ($self) = @_;
125 0 0         if( my $info = $self->node_details ) {
126 0           return !!grep { /^data/ } @{ $info->{roles} };
  0            
  0            
127             }
128 0           return 0;
129             },
130             );
131              
132              
133             sub get_metrics {
134 0     0 1   my ($self) = @_;
135              
136             # Fetch Node Local Stats
137 0           my @collected = $self->collect_node_metrics();
138              
139 0 0         push @collected, $self->collect_cluster_metrics()
140             if $self->with_cluster_metrics;
141              
142 0 0         push @collected, $self->collect_index_metrics()
143             if $self->with_index_metrics;
144              
145             # Flatten Collected and Return the Stats
146 0           return \@collected;
147             }
148              
149              
150             sub collect_node_metrics {
151 0     0 1   my ($self) = @_;
152              
153 0 0         if( my $res = $self->request('_nodes/_local/stats')->content ) {
154 0           return $self->_stat_collector( $res->{nodes}{$self->node_id} );
155             }
156              
157             # Explicit return of empty list
158 0           return;
159             }
160              
161              
162             sub collect_cluster_metrics {
163 0     0 1   my ($self) = @_;
164              
165 0           my @stats = ();
166              
167 0 0         if( my $res = $self->request('_cluster/health')->content ) {
168             push @stats,
169             { key => "cluster.nodes.total", value => $res->{number_of_nodes}, },
170             { key => "cluster.nodes.data", value => $res->{number_of_data_nodes}, },
171             { key => "cluster.shards.primary", value => $res->{active_primary_shards}, },
172             { key => "cluster.shards.active", value => $res->{active_shards}, },
173             { key => "cluster.shards.initializing", value => $res->{initializing_shards}, },
174             { key => "cluster.shards.relocating", value => $res->{relocating_shards}, },
175             { key => "cluster.shards.unassigned", value => $res->{unassigned_shards}, },
176 0           ;
177             }
178 0           push @stats, $self->_collect_index_blocks();
179 0           return @stats;
180             }
181              
182             sub _collect_index_blocks {
183 0     0     my ($self) = @_;
184              
185 0           my @req = (
186             '_settings/index.blocks.*',
187             {
188             index => '_all',
189             uri_param => {
190             flat_settings => 'true',
191             },
192             },
193             );
194              
195 0 0         if( my $res = $self->request(@req)->content ) {
196 0           my %collected=();
197 0           foreach my $idx ( keys %{ $res } ) {
  0            
198 0 0         if( my $settings = $res->{$idx}{settings} ) {
199 0           foreach my $block ( keys %{ $settings } ) {
  0            
200 0           my $value = $settings->{$block};
201 0 0         if( lc $value eq 'true') {
202 0   0       $collected{$block} ||= 0;
203 0           $collected{$block}++;
204             }
205             }
206             }
207             }
208 0           return map { { key => "cluster.$_", value => $collected{$_} } } sort keys %collected;
  0            
209             }
210              
211             # Explicit return of empty list
212 0           return;
213             }
214              
215              
216             sub collect_index_metrics {
217 0     0 1   my ($self) = @_;
218              
219 0           my $id = $self->node_id;
220 0           my $shardres = $self->request('_cat/shards',
221             {
222             uri_param => {
223             local => 'true',
224             format => 'json',
225             bytes => 'b',
226             h => join(',', qw( index prirep docs store id state )),
227             }
228             }
229             )->content;
230              
231 0           my %results;
232 0           foreach my $shard ( @{ $shardres } ) {
  0            
233             # Skip unallocated shards
234 0 0         next unless $shard->{id};
235              
236             # Skip unless this shard is allocated to this shard
237 0 0         next unless $shard->{id} eq $id;
238              
239             # Skip "Special" Indexes
240 0 0         next if $shard->{index} =~ /^\./;
241              
242             # Figure out the Index Basename
243 0           my $index = $shard->{index} =~ s/[-_]\d{4}([.-])\d{2}\g{1}\d{2}(?:[-_.]\d+)?$//r;
244 0 0         next unless $index;
245 0           $index =~ s/[^a-zA-Z0-9]+/_/g;
246              
247 0 0         my $type = $shard->{prirep} eq 'p' ? 'primary' : 'replica';
248              
249             # Initialize
250 0   0       $results{$index} ||= { map { $_ => 0 } qw( docs bytes primary replica ) };
  0            
251 0   0       $results{$index}->{state} ||= {};
252 0   0       $results{$index}->{state}{$shard->{state}} ||= 0;
253 0           $results{$index}->{state}{$shard->{state}}++;
254              
255             # Add it up, Add it up
256 0           $results{$index}->{docs} += $shard->{docs};
257 0           $results{$index}->{bytes} += $shard->{store};
258 0           $results{$index}->{$type}++;
259             }
260              
261 0           my @results;
262 0           foreach my $idx (sort keys %results) {
263 0           foreach my $k ( sort keys %{ $results{$idx} } ) {
  0            
264             # Skip the complex
265 0 0         next if ref $results{$idx}->{$k};
266             push @results,
267             {
268             key => sprintf("node.indices.%s.%s", $idx, $k),
269 0           value => $results{$idx}->{$k},
270             };
271             }
272 0   0       my $states = $results{$idx}->{state} || {};
273              
274 0           foreach my $k ( sort keys %{ $states } ) {
  0            
275             push @results,
276             {
277             key => sprintf("node.indices.%s.state.%s", $idx, $k),
278 0           value => $states->{$k},
279             };
280             }
281             }
282 0           return @results;
283             }
284              
285             #------------------------------------------------------------------------#
286             # Parse Statistics Dynamically
287             sub _stat_collector {
288 0     0     my $self = shift;
289 0           my $ref = shift;
290 0           my @path = @_;
291 0           my @stats = ();
292              
293             # Base Case
294 0 0         return unless is_hashref($ref);
295              
296 0           my %ignores = map { $_ => 1 } @{ $self->ignore }, @_IGNORES;
  0            
  0            
297 0           foreach my $key (sort keys %{ $ref }) {
  0            
298             # Skip uninteresting keys
299 0 0         next if $ignores{$key};
300              
301             # Skip peak values, we'll see those in the graphs.
302 0 0         next if $key =~ /^peak/;
303              
304             # Sanitize Key Name
305 0           my $key_name = $key;
306 0           $key_name =~ s/(?:_time)?(?:_in)?_millis/_ms/;
307 0           $key_name =~ s/(?:size_)?in_bytes/bytes/;
308 0           $key_name =~ s/[^a-zA-Z0-9]+/_/g;
309              
310 0 0         if( is_hashref($ref->{$key}) ) {
    0          
311             # Recurse
312 0           push @stats, $self->_stat_collector($ref->{$key},@path,$key_name);
313             }
314             elsif( $ref->{$key} =~ /^\d+(?:\.\d+)?$/ ) {
315             # Numeric
316             push @stats, {
317             key => join('.',@path,$key_name),
318 0           value => $ref->{$key},
319             };
320             }
321             }
322              
323 0           return @stats;
324             }
325              
326             __PACKAGE__->meta->make_immutable;
327              
328             __END__
329              
330             =pod
331              
332             =head1 NAME
333              
334             App::ElasticSearch::Utilities::Metrics - Fetches performance metrics about the node
335              
336             =head1 VERSION
337              
338             version 8.8
339              
340             =head1 SYNOPSIS
341              
342             This provides a simple API to export some core metrics from the local
343             ElasticSearch instance.
344              
345             use App::ElasticSearch::Utilities qw(es_connect);
346             use App::ElasticSearch::Utilities::Metrics;
347              
348             my $metrics_fetcher = App::ElasticSearch::Utilities::Metrics->new(
349             connection => es_connect(),
350             with_cluster_metrics => 1,
351             with_index_metrics => 1,
352             );
353              
354             my $metrics = $metrics_fetcher->get_metrics();
355              
356             =head1 ATTRIBUTES
357              
358             =head2 connection
359              
360             An `App::ElasticSearch::Utilities::Connection` instance, or automatically
361             created via C<es_connect()>.
362              
363             =head2 ignore
364              
365             An array of metric names to ignore, in addition to the static list when parsing
366             the `_node/_local/stats` stats. Defaults to:
367              
368             [qw(adaptive_selection discovery)]
369              
370             Plus ignores sections containing C<ingest>, C<ml>, C<transform> B<UNLESS> those
371             roles appear in the node's roles. Also, unless the node is tagged as a
372             C<data*> node, the following keys are ignored:
373              
374             [qw(force_merge indexing indices merges pressure recovery segments translog)]
375              
376             =head2 node_details
377              
378             The Node details provided by the C<_nodes/_local> API.
379              
380             =head2 node_id
381              
382             The Node ID for the connection, will be automatically discovered
383              
384             =head2 with_cluster_metrics
385              
386             Boolean, set to true to collect cluster metrics in addition to node metrics
387              
388             =head2 with_index_metrics
389              
390             Boolean, set to true to collect index level metrics in addition to node metrics
391              
392             =head1 METHODS
393              
394             =head2 get_metrics()
395              
396             Retrieves the metrics from the local node.
397              
398             =head2 collect_node_metrics()
399              
400             Returns all relevant stats from the C<_nodes/_local> API
401              
402             =head2 collect_cluster_metrics()
403              
404             Return all relevant stats from the C<_cluster/health> API as well as a count of
405             `index.blocks.*` in place.
406              
407             =head2 collect_index_metrics()
408              
409             This method totals the shard, and segment state and size for the current node by index base name.
410              
411             =head1 AUTHOR
412              
413             Brad Lhotsky <brad@divisionbyzero.net>
414              
415             =head1 COPYRIGHT AND LICENSE
416              
417             This software is Copyright (c) 2024 by Brad Lhotsky.
418              
419             This is free software, licensed under:
420              
421             The (three-clause) BSD License
422              
423             =cut