File Coverage

blib/lib/App/ElasticSearch/Utilities/Connection.pm
Criterion Covered Total %
statement 38 108 35.1
branch 0 42 0.0
condition 0 34 0.0
subroutine 13 19 68.4
pod 4 4 100.0
total 55 207 26.5


line stmt bran cond sub pod time code
1             package App::ElasticSearch::Utilities::Connection;
2             # ABSTRACT: Abstract the connection element
3              
4              
5 4     4   50 use v5.16;
  4         14  
6 4     4   25 use warnings;
  4         10  
  4         190  
7              
8             our $VERSION = '8.7'; # VERSION
9              
10 4     4   27 use App::ElasticSearch::Utilities::HTTPRequest;
  4         8  
  4         96  
11 4     4   22 use CLI::Helpers qw(:output);
  4         8  
  4         36  
12 4     4   705 use JSON::MaybeXS;
  4         9  
  4         275  
13 4     4   27 use LWP::UserAgent;
  4         8  
  4         85  
14 4     4   28 use Module::Load;
  4         12  
  4         37  
15 4     4   241 use Ref::Util qw(is_ref is_arrayref is_hashref);
  4         20  
  4         274  
16 4     4   2452 use Types::Standard qw( Enum HashRef InstanceOf Int Str );
  4         297431  
  4         66  
17 4     4   5519 use URI;
  4         11  
  4         103  
18 4     4   26 use URI::QueryParam;
  4         17  
  4         80  
19              
20 4     4   2326 use Moo;
  4         50466  
  4         23  
21 4     4   8204 use namespace::autoclean;
  4         48701  
  4         35  
22              
23             has 'host' => (
24             is => 'ro',
25             isa => Str,
26             default => sub { 'localhost' },
27             );
28              
29             has 'port' => (
30             is => 'ro',
31             isa => Int,
32             default => sub { 9200 },
33             );
34              
35              
36             has 'proto' => (
37             is => 'rw',
38             isa => Enum[qw(http https)],
39             default => sub { 'http' },
40             );
41              
42              
43             has 'timeout' => (
44             is => 'ro',
45             isa => Int,
46             default => sub { 10 },
47             );
48              
49              
50             has 'username' => (
51             is => 'ro',
52             isa => Str,
53             default => sub { $ENV{USER} },
54             );
55              
56              
57             has 'password' => (
58             is => 'ro',
59             );
60              
61              
62             has 'ssl_opts' => (
63             is => 'ro',
64             isa => HashRef,
65             default => sub { {} },
66             );
67              
68              
69             has 'ua' => (
70             is => 'lazy',
71             isa => InstanceOf["LWP::UserAgent"],
72             );
73              
74             sub _build_ua {
75 0     0     my ($self) = @_;
76              
77             # Construct the UA Object
78             ## no critic
79 0   0       my $local_version = eval '$VERSION' || '999.9';
80             ## use critic
81 0           my $ua = LWP::UserAgent->new(
82             keep_alive => 3,
83             agent => sprintf("%s/%s (Perl %s)", __PACKAGE__, $local_version, $^V),
84             protocols_allowed => [qw(http https)],
85             timeout => $self->timeout,
86             ssl_opts => $self->ssl_opts,
87             );
88 0 0         debug({color=>'cyan'}, sprintf "Initialized a UA: %s%s", $ua->agent, $self->password ? ' (password provided)' : '');
89              
90             # Decode the JSON Automatically
91             $ua->add_handler( response_done => sub {
92 0     0     my ($response,$lwp_ua,$headers) = @_;
93 0           debug( {color=>'magenta'}, "respone_done handler, got:");
94              
95 0           debug_var($response);
96 0   0       my $ctype = $response->content_type() || 'invalid';
97             # JSON Transform
98 0 0 0       if( $ctype =~ m{^application/json\b} ) {
    0          
99 0           debug({color=>'yellow',indent=>1},"JSON Decoding Response Content");
100 0           eval {
101 0           my $decoded = decode_json( $response->content );
102 0           $response->content($decoded);
103             };
104             }
105             elsif ( $response->is_success && $ctype =~ m{^text/plain} ) {
106             # Plain text transform for the _cat API
107 0           debug({color=>'yellow',indent=>1},"Plain Text Transform Response Content");
108             my $decoded = [
109 0 0 0       grep { defined && length && !/^\s+$/ }
  0            
110             split /\r?\n/, $response->content
111             ];
112 0           debug_var($decoded);
113 0           $response->content($decoded);
114             }
115 0 0         if( my $content = $response->content ) {
116 0           debug({color=>'yellow'}, "After translation:");
117 0 0         if( is_ref($content) ) {
118 0           debug_var( $content );
119             }
120             else{
121 0           debug( $content );
122             }
123             }
124 0           $_[0] = $response;
125 0           });
126              
127             # Warn About Basic Auth without TLS
128 0 0 0       warn "HTTP Basic Authorization configured and not using TLS, this is not supported"
129             if length $self->password && $self->proto ne 'https';
130              
131 0           return $ua;
132             }
133              
134              
135             sub request {
136 0     0 1   my ($self,$url,$options,$body) = @_;
137              
138             # Build the Path
139 0   0       $options->{command} ||= $url;
140 0 0         my @path = grep { defined and length } @{ $options }{qw(index command)};
  0            
  0            
141              
142 0           my $path = join('/', @path);
143              
144 0           debug(sprintf "calling %s->request(%s)", ref $self, $path);
145              
146             # Build a URI
147 0           my $uri = URI->new( sprintf "%s://%s:%d",
148             $self->proto,
149             $self->host,
150             $self->port,
151             );
152 0           $uri->path($path);
153              
154             # Query String
155 0 0 0       if( exists $options->{uri_param} and is_hashref($options->{uri_param}) ) {
156 0           foreach my $k ( keys %{ $options->{uri_param} } ) {
  0            
157 0           $uri->query_param( $k => $options->{uri_param}{$k} );
158             }
159             }
160             # Body Translations
161 0 0 0       if(!defined $body && exists $options->{body}) {
162 0   0       $body ||= delete $options->{body};
163             }
164              
165             # Determine request method
166 0 0         my $method = exists $options->{method} ? uc $options->{method} : 'GET';
167              
168             # Special Case for Index Creation
169 0 0 0       if( $method eq 'PUT' && $options->{index} && $options->{command} eq '/' ) {
      0        
170 0           $uri->path($options->{index});
171             }
172              
173 0           debug({color=>'magenta'}, sprintf "Issuing %s with URI of '%s'", $method, $uri->as_string);
174 0 0         if( defined $body ) {
175 0 0         if( is_ref($body) ) {
176 0           debug_var({indent=>1}, $body);
177             }
178             else {
179 0           debug({indent=>1}, split /\r?\n/, $body);
180             }
181             }
182              
183             # Make the request
184 0           my $req = App::ElasticSearch::Utilities::HTTPRequest->new( $method => $uri->as_string );
185              
186             # Authentication
187 0 0 0       $req->authorization_basic( $self->username, $self->password )
188             if length $self->password and $self->proto eq 'https';
189              
190 0 0         $req->content($body) if defined $body;
191              
192 0           return $self->ua->request( $req );
193             }
194              
195              
196              
197             sub exists {
198 0     0 1   my ($self,%options) = @_;
199              
200 0 0         return unless exists $options{index};
201             my %params = (
202             method => 'HEAD',
203             index => $options{index},
204 0           );
205              
206 0           return $self->request('', \%params,)->is_success;
207             }
208              
209              
210             sub put {
211 0     0 1   my ($self,%options) = @_;
212              
213 0 0         return unless exists $options{body};
214 0           my %params = ( method => 'PUT' );
215 0 0         $params{index} = $options{index} if exists $options{index};
216              
217 0           my $resp = $self->request('', \%params, $options{body});
218 0           return ( $resp->code, $resp->content );
219             }
220              
221              
222             sub bulk {
223 0     0 1   my ($self,%options) = @_;
224              
225 0 0         return unless exists $options{body};
226 0           my %params = ( method => 'POST' );
227 0 0         $params{index} = $options{index} if exists $options{index};
228              
229 0           my $resp = $self->request( '_bulk', \%params, $options{body} );
230 0           return ( $resp->code, $resp->content );
231             }
232              
233             __PACKAGE__->meta->make_immutable;
234              
235             __END__
236              
237             =pod
238              
239             =head1 NAME
240              
241             App::ElasticSearch::Utilities::Connection - Abstract the connection element
242              
243             =head1 VERSION
244              
245             version 8.7
246              
247             =head1 SYNOPSIS
248              
249             For most users, this code will never be called directly since this module
250             doesn't handle parameter parsing on the CLI. To get an object, instead call:
251              
252             use App::ElasticSearch::Utilities qw(es_connect);
253              
254             my $es = es_connect();
255              
256             my $http_response_obj = $es->request('_search',
257             {
258             index=>'logstash',
259             uri_param => {
260             size => 10,
261             }
262             },
263             {
264             query => {
265             query_string => "program:sshd",
266             }
267             }
268             );
269              
270             Though even this is overkill. The B<es_request> method maintains compatability with older versions and emulates
271             the API you'd expect from B<Elastijk>.
272              
273             =head1 ATTRIBUTES
274              
275             =head2 host
276              
277             Hostname or ip to connect to, default 'B<localhost>'
278              
279             =head2 port
280              
281             Port to connect the HTTP transport for the ElasticSearch cluster, default is B<9200>
282              
283             =head2 proto
284              
285             Protocol to use, defaults to 'B<http>'.
286              
287             This module converts from the performance concerned backend of B<Hijk> and B<Elastijk>, to the feature
288             rich B<LWP::UserAgent>. This means we can now support TLS communication to the ES back-end and things like
289             basic authentication.
290              
291             =head2 timeout
292              
293             Connection and Read Timeout for the HTTP connection, defaults to B<10> seconds.
294              
295             =head2 username
296              
297             HTTP Basic Authorization username, defaults to C<$ENV{USER}>.
298              
299             =head2 password
300              
301             HTTP Basic Authorization password, if set, we'll try authentication.
302              
303             =head2 ssl_opts
304              
305             SSL Options for L<LWP::UserAgent/ssl_opts>.
306              
307             =head2 ua
308              
309             Lazy built B<LWP::UserAgent> to access LWP::UserAgent directly.
310              
311             =head1 METHODS
312              
313             =head2 request( $command, { index => ... uri_param => { size => 1 } }, $body )
314              
315             This method provides a wrapper between the Hijk/Elastijk request syntax and the
316             LWP::UserAgent flow. It's return value is the B<HTTP::Response> object from
317             B<LWP::UserAgent> instead of the more simplistic return values of B<Hijk> and
318             B<Elastijk>. Use B<App::ElasticSearch::Utilities::es_request> for a simpler
319             interface.
320              
321             =head2 exists( index => 'name' )
322              
323             Takes the name of an index, returns true if the index exists, false otherwise.
324              
325             =head2 put( body => ... , index => ... )
326              
327             Parameter B<body> is required. Puts something to an index. This is often used to
328             put settings and/or mappings to an index.
329              
330             Returns a list containing the HTTP Status Code, and the Response Content.
331              
332             =head2 bulk( body => ..., index => ... )
333              
334             Parameter B<body> is required. The body should be an array containing the command and documents to send to the
335             ElasticSearch bulk API, see: L<Bulk API|https://www.elastic.co/guide/en/elasticsearch/reference/2.3/docs-bulk.html>
336              
337             Returns a list containing the HTTP Status Code, and the Response Content.
338              
339             =head1 AUTHOR
340              
341             Brad Lhotsky <brad@divisionbyzero.net>
342              
343             =head1 COPYRIGHT AND LICENSE
344              
345             This software is Copyright (c) 2023 by Brad Lhotsky.
346              
347             This is free software, licensed under:
348              
349             The (three-clause) BSD License
350              
351             =cut