File Coverage

blib/lib/App/ElasticSearch/Utilities/Query.pm
Criterion Covered Total %
statement 82 186 44.0
branch 18 56 32.1
condition 5 22 22.7
subroutine 19 29 65.5
pod 13 13 100.0
total 137 306 44.7


line stmt bran cond sub pod time code
1             package App::ElasticSearch::Utilities::Query;
2             # ABSTRACT: Object representing ES Queries
3              
4 2     2   557 use strict;
  2         10  
  2         62  
5 2     2   11 use warnings;
  2         4  
  2         128  
6              
7             our $VERSION = '8.6'; # VERSION
8              
9 2     2   596 use App::ElasticSearch::Utilities qw(es_request es_indices);
  2         6  
  2         17  
10 2     2   1764 use App::ElasticSearch::Utilities::Aggregations;
  2         5  
  2         16  
11 2     2   1129 use CLI::Helpers qw(:output);
  2         5  
  2         17  
12 2     2   301 use Clone qw(clone);
  2         5  
  2         153  
13 2     2   14 use Const::Fast;
  2         5  
  2         17  
14 2     2   111 use Moo;
  2         6  
  2         15  
15 2     2   743 use Ref::Util qw(is_arrayref is_hashref);
  2         4  
  2         110  
16 2     2   12 use Types::Standard qw(ArrayRef Enum HashRef Int Maybe Str);
  2         6  
  2         32  
17 2     2   3436 use Types::ElasticSearch qw(TimeConstant is_TimeConstant);
  2         428  
  2         18  
18 2     2   944 use namespace::autoclean;
  2         5  
  2         18  
19              
20             const my $AGG_KEY => 'aggregations';
21             my %TO = (
22             array_ref => sub { defined $_[0] && is_arrayref($_[0]) ? $_[0] : defined $_[0] ? [ $_[0] ] : $_[0] },
23             );
24              
25              
26             has fields_meta => (
27             is => 'rw',
28             isa => HashRef,
29             default => sub { {} },
30             );
31              
32              
33             has query_stash => (
34             is => 'rw',
35             isa => HashRef,
36             lazy => 1,
37             init_arg => undef,
38             default => sub {{}},
39             );
40              
41              
42             has scroll_id => (
43             is => 'rw',
44             isa => Str,
45             init_arg => undef,
46             writer => 'set_scroll_id',
47             clearer => 1,
48             );
49              
50              
51              
52             has 'minimum_should_match' => (
53             is => 'rw',
54             isa => Str,
55             );
56              
57             my %QUERY = (
58             must => { isa => ArrayRef, coerce => $TO{array_ref} },
59             must_not => { isa => ArrayRef, coerce => $TO{array_ref} },
60             should => { isa => ArrayRef, coerce => $TO{array_ref} },
61             filter => { isa => ArrayRef, coerce => $TO{array_ref} },
62             nested => { isa => HashRef },
63             nested_path => { isa => Str },
64             );
65              
66              
67              
68             my %REQUEST_BODY = (
69             from => { isa => Int },
70             size => { default => sub {50}, isa => Int },
71             fields => { isa => ArrayRef, coerce => $TO{array_ref} },
72             sort => { isa => ArrayRef, coerce => $TO{array_ref} },
73             aggregations => { isa => HashRef },
74             );
75              
76              
77             my %PARAMS = (
78             scroll => { isa => Maybe[TimeConstant] },
79             timeout => { isa => TimeConstant },
80             terminate_after => { isa => Int },
81             track_total_hits => { isa => Enum[qw( true false )], default => sub { 'true' } },
82             track_scores => { isa => Enum[qw( true false )] },
83             search_type => { isa => Enum[qw( dfs_query_then_fetch query_then_fetch )] },
84             rest_total_hits_as_int => { isa => Enum[qw( true false )], default => sub { 'true' } },
85             );
86              
87             # Dynamically build our attributes
88             foreach my $attr (keys %QUERY) {
89             has $attr => (
90             is => 'rw',
91             lazy => 1,
92             writer => "set_$attr",
93             %{ $QUERY{$attr} },
94             );
95             }
96             foreach my $attr (keys %REQUEST_BODY) {
97             has $attr => (
98             is => 'rw',
99             lazy => 1,
100             writer => "set_$attr",
101             init_arg => undef,
102             %{ $REQUEST_BODY{$attr} },
103             );
104             }
105             foreach my $attr (keys %PARAMS) {
106             has $attr => (
107             is => 'rw',
108             lazy => 1,
109             writer => "set_$attr",
110             %{ $PARAMS{$attr} },
111             );
112             }
113              
114              
115             sub as_search {
116 0     0 1 0 my ($self,$indexes) = @_;
117             return (
118 0         0 _search => {
119             index => $indexes,
120             uri_param => $self->uri_params,
121             method => 'POST',
122             },
123             $self->request_body,
124             );
125             }
126              
127              
128             sub execute {
129 0     0 1 0 my($self,$indexes) = @_;
130              
131             # Default to context based indexes
132 0   0     0 $indexes ||= es_indices();
133              
134 0         0 my $result = es_request( $self->as_search($indexes) );
135              
136 0 0       0 if( $result->{_scroll_id} ) {
137             $self->set_scroll_id($result->{_scroll_id})
138 0         0 }
139              
140 0         0 return $result;
141             }
142              
143              
144             sub scroll_results {
145 0     0 1 0 my($self) = @_;
146 0         0 my $result;
147 0 0       0 if( $self->scroll_id ) {
148 0         0 $result = es_request( '_search/scroll',
149             { method => 'POST' },
150             {
151             scroll => $self->scroll,
152             scroll_id => $self->scroll_id,
153             }
154             );
155 0 0 0     0 if( $result && $result->{_scroll_id} ) {
156             $self->set_scroll_id($result->{_scroll_id})
157 0         0 }
158             else {
159 0         0 $self->clear_scroll_id();
160             }
161             }
162 0 0       0 return $result ? $result : ();
163             }
164              
165              
166             sub uri_params {
167 0     0 1 0 my $self = shift;
168              
169 0         0 my %params=();
170 0         0 foreach my $field (keys %PARAMS) {
171 0         0 my $v;
172 0         0 eval {
173             ## no critic
174 2     2   2008 no strict 'refs';
  2         5  
  2         368  
175 0         0 $v = $self->$field();
176             ## user critic
177             };
178 0 0       0 debug({color=>'magenta'}, sprintf "uri_params() - retrieving param '%s' got '%s'",
179             $field, ( defined $v ? $v : '' ),
180             );
181              
182 0 0       0 next unless defined $v;
183 0         0 $params{$field} = $v;
184             }
185 0         0 return \%params;
186             }
187              
188              
189             sub request_body {
190 0     0 1 0 my $self = shift;
191              
192 0         0 my %body = ();
193 0         0 my %map = qw( fields _source );
194 0         0 foreach my $section (keys %REQUEST_BODY) {
195 0         0 my $val;
196             eval {
197             ## no critic
198 2     2   16 no strict 'refs';
  2         6  
  2         585  
199 0         0 $val = $self->$section;
200             ## use critic
201 0         0 1;
202 0 0       0 } or do {
203 0         0 debug({color=>'red'}, "request_body() - Failed to retrieve '$section'");
204             };
205 0 0       0 next unless defined $val;
206 0         0 my $data = { $section => $val };
207 0   0     0 my $param = $map{$section} || $section;
208 0         0 $body{$param} = $val;
209             }
210 0         0 $body{query} = $self->query;
211 0         0 return \%body;
212             }
213              
214              
215             sub query {
216 11     11 1 66 my $self = shift;
217              
218 11         23 my $qref;
219 11 50       204 if( $self->nested ) {
220 0         0 $qref = {
221             nested => {
222             path => $self->nested_path,
223             query => $self->nested,
224             }
225             };
226             }
227             else {
228 11 100       245 my %bool = (
229             $self->minimum_should_match ? ( minimum_should_match => $self->minimum_should_match ) : (),
230             );
231 11         124 foreach my $k (keys %QUERY) {
232 66 100       192 next if $k =~ /^nested/;
233 44         95 $bool{$k} = [];
234 44         73 my $v;
235             eval {
236 44         210 debug({color=>'yellow'}, "query() - retrieving section '$k'");
237             ## no critic
238 2     2   24 no strict 'refs';
  2         10  
  2         2029  
239 44         1135 $v = $self->$k();
240             ## user critic
241 44 100 66     388 debug_var({color=>'cyan'},$v) if defined $v and ref $v;
242 44         22582 1;
243 44 50       70 } or do {
244 0         0 debug({color=>'red'}, "query() - Failed to retrieve '$k'");
245             };
246 44 100       326 $bool{$k} = clone $v if defined $v;
247 44 50       112 if(my $stash = $self->stash($k)) {
248 0 0       0 push @{ $bool{$k} }, is_arrayref($stash) ? @{ $stash } : $stash;
  0         0  
  0         0  
249             }
250 44 100 33     192 delete $bool{$k} if exists $bool{$k} and is_arrayref($bool{$k}) and not @{ $bool{$k} };
  44   66     176  
251             }
252 11         46 $qref = { bool => \%bool };
253             }
254 11         91 return $qref;
255             }
256              
257              
258             sub add_aggregations {
259 0     0 1 0 my $self = shift;
260 0         0 my %aggs = @_;
261              
262 0         0 my $aggs = $self->aggregations();
263 0   0     0 $aggs ||= {};
264 0         0 foreach my $agg (keys %aggs) {
265 0         0 debug("aggregation[$agg] added to query");
266 0         0 $aggs->{$agg} = $aggs{$agg};
267             }
268 0         0 $self->set_aggregations($aggs);
269 0         0 $self->set_size(0);
270 0         0 $self->set_scroll(undef);
271             }
272              
273              
274             sub wrap_aggregations {
275 0     0 1 0 my $self = shift;
276 0         0 my %wrapper = @_;
277 0         0 my $aggs = $self->aggregations;
278              
279 0 0       0 if( keys %{ $aggs } ) {
  0         0  
280 0         0 foreach my $a (keys %wrapper) {
281 0         0 $wrapper{$a}->{$AGG_KEY} = clone $aggs;
282             }
283             }
284              
285 0         0 $self->set_aggregations(\%wrapper);
286             }
287              
288              
289             sub aggregations_by {
290 0     0 1 0 my ($self,$dir,$aggs) = @_;
291              
292 0         0 my @sort = ();
293 0         0 my %aggs = ();
294 0         0 foreach my $def (@{ $aggs }) {
  0         0  
295 0         0 my ($name,$agg) = %{ expand_aggregate_string($def) };
  0         0  
296 0 0       0 next unless is_single_stat(keys %{ $agg });
  0         0  
297 0         0 $aggs{$name} = $agg;
298 0         0 push @sort, { $name => $dir };
299             }
300 0 0       0 if( @sort ) {
301 0         0 push @sort, { '_count' => 'desc' };
302              
303 0         0 my $ref_aggs = $self->aggregations;
304 0         0 foreach my $name ( keys %{ $ref_aggs } ) {
  0         0  
305 0         0 foreach my $k ( keys %{ $ref_aggs->{$name} } ) {
  0         0  
306 0 0       0 next if $k eq $AGG_KEY;
307 0         0 $ref_aggs->{$name}{$k}{order} = \@sort;
308 0         0 foreach my $agg (keys %aggs) {
309 0         0 $ref_aggs->{$name}{$AGG_KEY}{$agg} = $aggs{$agg};
310             }
311             }
312             }
313 0         0 $self->set_aggregations( $ref_aggs );
314             }
315             }
316              
317             # Support Short-hand like ES
318             *aggs = \&aggregations;
319             *set_aggs = \&set_aggregations;
320             *add_aggs = \&add_aggregations;
321             *wrap_aggs = \&wrap_aggregations;
322             *aggs_by = \&aggregations_by;
323              
324              
325              
326              
327             sub set_scan_scroll {
328 0     0 1 0 my ($self,$ctxt_life) = @_;
329              
330             # Validate Context Lifetime
331 0 0       0 if( !is_TimeConstant( $ctxt_life) ) {
332 0         0 undef($ctxt_life);
333             }
334 0   0     0 $ctxt_life ||= '1m';
335              
336 0         0 $self->set_sort( [qw(_doc)] );
337 0         0 $self->set_scroll( $ctxt_life );
338 0         0 $self;
339             }
340              
341             sub set_match_all {
342 0     0 1 0 my ($self) = @_;
343             # Reset the relevant pieces of the query
344 0         0 $self->set_must_not([]);
345 0         0 $self->set_filter([]);
346 0         0 $self->set_should([]);
347             # Set the match_all bits
348 0         0 $self->set_must({match_all=>{}});
349 0         0 $self;
350             }
351              
352              
353             sub add_bool {
354 12     12 1 63 my $self = shift;
355 12         37 my %bools = @_;
356 12         46 foreach my $section ( sort keys %bools ) {
357 12 50       44 next unless exists $QUERY{$section};
358             ## no critic
359 2     2   18 no strict 'refs';
  2         5  
  2         562  
360 12         267 my $set = $self->$section;
361 12 50       92 push @{ $set }, is_arrayref($bools{$section}) ? @{ $bools{$section} } : $bools{$section};
  12         44  
  0         0  
362 12         32 my $setter = "set_$section";
363 12         208 $self->$setter($set);
364             ## use critic
365             }
366 12         325 $self;
367             }
368              
369              
370             sub stash {
371 44     44 1 99 my ($self,$section,$condition) = @_;
372              
373 44         798 my $stash = $self->query_stash;
374 44 50       453 if( exists $QUERY{$section} ) {
375 44 50       99 if( defined $condition ) {
376 0 0       0 debug({color=>exists $stash->{$section} ? 'green' : 'red' }, "setting $section in stash");
377 0         0 $stash->{$section} = $condition;
378             # Reset Scroll ID
379 0         0 $self->clear_scroll_id();
380             }
381             }
382 44 50       136 return exists $stash->{$section} ? $stash->{$section} : undef;
383             }
384              
385             # Return True
386             1;
387              
388             __END__
389              
390             =pod
391              
392             =head1 NAME
393              
394             App::ElasticSearch::Utilities::Query - Object representing ES Queries
395              
396             =head1 VERSION
397              
398             version 8.6
399              
400             =head1 ATTRIBUTES
401              
402             =head2 fields_meta
403              
404             A hash reference with the field data from L<App::ElasticSearch::Utilities::es_index_fields>.
405              
406             =head2 query_stash
407              
408             Hash reference containing replaceable query elements. See L<stash>.
409              
410             =head2 scroll_id
411              
412             The scroll id for the last executed query. You shouldn't mess with this
413             directly. It's best to use the L<execute()> and L<scroll_results()> methods.
414              
415             =head2 must
416              
417             The must section of a bool query as an array reference. See: L<add_bool>
418             Can be set using set_must and is a valid init_arg.
419              
420             =head2 must_not
421              
422             The must_not section of a bool query as an array reference. See: L<add_bool>
423             Can be set using set_must_not and is a valid init_arg.
424              
425             =head2 should
426              
427             The should section of a bool query as an array reference. See: L<add_bool>
428             Can be set using set_should and is a valid init_arg.
429              
430             =head2 filter
431              
432             The filter section of a bool query as an array reference. See: L<add_bool>
433             Can be set using set_filter and is a valid init_arg.
434              
435             =head2 nested
436              
437             The nested query, this shortcircuits the rest of the query due to restrictions
438             on the nested queries.
439              
440             =head2 nested_path
441              
442             The path by being nested, only used in nested queries.
443              
444             =head2 minimum_should_match
445              
446             A string defining the minimum number of should conditions to qualify a match.
447             See L<https://www.elastic.co/guide/en/elasticsearch/reference/7.3/query-dsl-minimum-should-match.html>
448              
449             =head2 from
450              
451             Integer representing the offset the query should start returning documents from. The default is undefined, which
452             falls back on the Elasticsearch default of 0, or from the beginning.
453             Can be set with B<set_from>. Cannot be an init_arg.
454              
455             =head2 size
456              
457             The number of documents to return in the query. The default size is B<50>.
458             Can be set with B<set_size>. Cannot be an init_arg.
459              
460             =head2 fields
461              
462             An array reference containing the names of the fields to retrieve with the query. The default is undefined, which
463             falls back on the Elasticsearch default of empty, or no fields retrieved. The B<_source> is still retrieved.
464             Can be set with B<set_fields>. Cannot be an init_arg.
465              
466             =head2 sort
467              
468             An array reference of sorting keys/directions. The default is undefined, which falls back on the Elasticsearch
469             default of B<score:desc>.
470             Can be set with B<set_sort>. Cannot be an init_arg.
471              
472             =head2 aggregations
473              
474             A hash reference of aggergations to perform. The default is undefined, which means do not perform any aggregations.
475             Can be set with B<set_aggregations>, which is aliased as B<set_aggs>. Cannot be an init_arg.
476             Aliased as B<aggs>.
477              
478             =head2 scroll
479              
480             An L<ElasticSearch time constant|https://www.elastic.co/guide/en/elasticsearch/reference/master/common-options.html#time-units>.
481             The default is undefined, which means scroll will not be set on a query.
482             Can be set with B<set_scroll>. Cannot be an init_arg.
483             See also: L<set_scan_scroll>.
484              
485             =head2 timeout
486              
487             An L<ElasticSearch time constant|https://www.elastic.co/guide/en/elasticsearch/reference/master/common-options.html#time-units>.
488             The default is undefined, which means it will default to the connection timeout.
489             Can be set with B<set_timeout>. Cannot be an init_arg.
490              
491             =head2 terminate_after
492              
493             The number of documents to cancel the search after. This generally shouldn't be used except for
494             large queries where you are protecting against OOM Errors. The B<size> attribute is more accurate as it's
495             truncation occurs after the reduce operation, where B<terminate_after> occurs during the map phase of the query.
496             Can be set with B<set_terminateafter>. Cannot be an init_arg.
497              
498             =head2 track_total_hits
499              
500             Should the query attempt to calculate the number of hits the query would match.
501             Defaults to C<true>.
502              
503             =head2 track_scores
504              
505             Set to true to score every hit in the search results, set to false to not
506             report scores. Defaults to unset, i.e., use the ElasticSearch default.
507              
508             =head2 rest_total_hits_as_int
509              
510             In ElasticSearch 7.0, the total hits element became a hash reference with more
511             details. Since most of the tooling relies on the old behavior, this defaults
512             to C<true>.
513              
514             =head2 search_type
515              
516             Choose an execution path for the query. This is null by default, but you can
517             set it to a valid `search_type` setting, see:
518             L<https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html#request-body-search-search-type>
519              
520             =head1 METHODS
521              
522             =head2 as_search( [ 'index1', 'index2' ] )
523              
524             Returns a list of parameters to pass directly to C<es_request()>.
525              
526             =head2 execute( [ $index1, $index2 ] )
527              
528             Uses `es_request()` to return the result, stores any relevant scroll data.
529              
530             =head2 scroll_results()
531              
532             If a scroll has been set, this will construct and run the requisite scroll
533             search, otherwise it returns undef.
534              
535             =head2 uri_params()
536              
537             Retrieves the URI parameters for the query as a hash reference. Undefined parameters
538             will not be represented in the hash.
539              
540             =head2 request_body()
541              
542             Builds and returns a hash reference representing the request body for the
543             Elasticsearch query. Undefined elements will not be represented in the hash.
544              
545             =head2 query()
546              
547             Builds and returns a hash reference represnting the bool query section of the
548             request body. This function is called by the L<request_body> function but is
549             useful and distinct enough to expose as it's own method. Undefined elements of
550             the query will not be represented in the hash it returns.
551              
552             =head2 add_aggregations( name => { ... } )
553              
554             Takes one or more key-value pairs. The key is the name of the aggregation.
555             The value being the hash reference representation of the aggregation itself.
556             It will silently replace a previously named aggregation with the most recent
557             call.
558              
559             Calling this function overrides the L<size> element to B<0> and disables L<scroll>.
560              
561             Aliased as B<add_aggs>.
562              
563             =head2 wrap_aggregations( name => { ... } )
564              
565             Use this to wrap an aggregation in another aggregation. For example:
566              
567             $q->add_aggregations(ip => { terms => { field => src_ip } });
568              
569             Creates:
570              
571             {
572             "aggs": {
573             "ip": {
574             "terms": {
575             "field": "src_ip"
576             }
577             }
578             }
579             }
580              
581             Would give you the top IP for the whole query set. To wrap that aggregation to get top IPs per hour, you could:
582              
583             $q->wrap_aggregations( hourly => { date_histogram => { field => 'timestamp', interval => '1h' } } );
584              
585             Which translates the query into:
586              
587             {
588             "aggs": {
589             "hourly": {
590             "date_histogram": {
591             "field": "timestamp",
592             "interval": "1h"
593             }
594             "aggs": {
595             "ip": {
596             "terms": {
597             "field": "src_ip"
598             }
599             }
600             }
601             }
602             }
603             }
604              
605             =head2 aggregations_by( [asc | desc] => aggregation_string )
606              
607             Applies a sort to all aggregations at the current level based on the
608             aggregation string.
609              
610             Aggregation strings are parsed with the
611             L<App::ElasticSearch::Utilities::Aggregations> C<expand_aggregate_string()>
612             functions.
613              
614             Examples:
615              
616             $q->aggregations_by( desc => [ qw( sum:bytes ) ] );
617             $q->aggregations_by( desc => [ qw( sum:bytes cardinality:user_agent ) ] );
618              
619             =head2 set_scan_scroll($ctxt_life)
620              
621             This function emulates the old scan scroll feature in early version of Elasticsearch. It takes
622             an optional L<ElasticSearch time constant|https://www.elastic.co/guide/en/elasticsearch/reference/master/common-options.html#time-units>,
623             but defaults to '1m'. It is the same as calling:
624              
625             $self->set_sort( [qw(_doc)] );
626             $self->set_scroll( $ctxt_life );
627              
628             =head2 set_match_all()
629              
630             This method clears all filters and query elements to and sets the must to match_all.
631             It will not reset other parameters like size, sort, and aggregations.
632              
633             =head2 add_bool( section => conditions .. )
634              
635             Appends a search condition to a section in the query body. Valid query body
636             points are: must, must_not, should, and filter.
637              
638             $q->add_bool( must => { term => { http_status => 200 } } );
639              
640             # or
641              
642             $q->add_bool(
643             must => [
644             { term => { http_method => 'GET' } }
645             { term => { client_ip => '10.10.10.1' } }
646             ]
647             must_not => { term => { http_status => 400 } },
648             );
649              
650             =head2 stash( section => condition )
651              
652             Allows a replaceable query element to exist in the query body sections: must, must_not,
653             should, and/or filter. This is useful for moving through a data-set preserving everthing in a query
654             except one piece that shifts. Imagine:
655              
656             my $query = App::ElasticSearch::Utilities::Query->new();
657             $query->add_bool(must => { terms => {src_ip => [qw(1.2.3.4)]} });
658             $query->add_bool(must => { range => { attack_score => { gt => 10 }} });
659              
660             while( 1 ) {
661             $query->stash( must => { range => { timestamp => { gt => time() } } } );
662             my @results = make_es_request( $query->request_body, $query->uri_params );
663              
664             # Long processing
665             }
666              
667             This allows re-use of the query object inside of loops like this.
668              
669             =for Pod::Coverage aggs
670             =for Pod::Coverage set_aggs
671             =for Pod::Coverage add_aggs
672             =for Pod::Coverage wrap_aggs
673             =for Pod::Coverage aggs_by
674              
675             =head1 AUTHOR
676              
677             Brad Lhotsky <brad@divisionbyzero.net>
678              
679             =head1 COPYRIGHT AND LICENSE
680              
681             This software is Copyright (c) 2023 by Brad Lhotsky.
682              
683             This is free software, licensed under:
684              
685             The (three-clause) BSD License
686              
687             =cut