File Coverage

blib/lib/Search/Elasticsearch/Role/Cxn.pm
Criterion Covered Total %
statement 158 210 75.2
branch 51 94 54.2
condition 35 61 57.3
subroutine 31 33 93.9
pod 9 13 69.2
total 284 411 69.1


line stmt bran cond sub pod time code
1             # Licensed to Elasticsearch B.V. under one or more contributor
2             # license agreements. See the NOTICE file distributed with
3             # this work for additional information regarding copyright
4             # ownership. Elasticsearch B.V. licenses this file to you under
5             # the Apache License, Version 2.0 (the "License"); you may
6             # not use this file except in compliance with the License.
7             # You may obtain a copy of the License at
8             #
9             # http://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             package Search::Elasticsearch::Role::Cxn;
19             $Search::Elasticsearch::Role::Cxn::VERSION = '8.12';
20             our $PRODUCT_CHECK_HEADER = 'x-elastic-product';
21             our $PRODUCT_CHECK_VALUE = 'Elasticsearch';
22              
23 55     55   100654 use Moo::Role;
  55         275799  
  55         441  
24 55     55   35435 use Search::Elasticsearch::Util qw(parse_params throw to_list);
  55         127  
  55         640  
25 55     55   32200 use List::Util qw(min);
  55         133  
  55         4764  
26 55     55   451 use Try::Tiny;
  55         113  
  55         3857  
27 55     55   19605 use URI();
  55         176819  
  55         1483  
28 55     55   32757 use IO::Compress::Deflate();
  55         2323015  
  55         1936  
29 55     55   33851 use IO::Uncompress::Inflate();
  55         963807  
  55         2141  
30 55     55   34603 use IO::Compress::Gzip();
  55         407455  
  55         2480  
31 55     55   33399 use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
  55         139772  
  55         8419  
32 55     55   763 use Search::Elasticsearch::Util qw(to_list);
  55         254  
  55         830  
33 55     55   24871 use namespace::clean;
  55         146  
  55         577  
34 55     55   69541 use Net::IP;
  55         5127105  
  55         231017  
35              
36             requires qw(perform_request error_from_text handle);
37              
38             has 'client_version' => ( is => 'ro', required => 1, default => $Search::Elasticsearch::VERSION );
39             has 'host' => ( is => 'ro', required => 1 );
40             has 'port' => ( is => 'ro', required => 1 );
41             has 'uri' => ( is => 'ro', required => 1 );
42             has 'request_timeout' => ( is => 'ro', default => 30 );
43             has 'ping_timeout' => ( is => 'ro', default => 2 );
44             has 'sniff_timeout' => ( is => 'ro', default => 1 );
45             has 'sniff_request_timeout' => ( is => 'ro', default => 2 );
46             has 'next_ping' => ( is => 'rw', default => 0 );
47             has 'ping_failures' => ( is => 'rw', default => 0 );
48             has 'dead_timeout' => ( is => 'ro', default => 60 );
49             has 'max_dead_timeout' => ( is => 'ro', default => 3600 );
50             has 'serializer' => ( is => 'ro', required => 1 );
51             has 'logger' => ( is => 'ro', required => 1 );
52             has 'handle_args' => ( is => 'ro', default => sub { {} } );
53             has 'default_qs_params' => ( is => 'ro', default => sub { {} } );
54             has 'scheme' => ( is => 'ro' );
55             has 'is_https' => ( is => 'ro' );
56             has 'userinfo' => ( is => 'ro' );
57             has 'max_content_length' => ( is => 'ro' );
58             has 'default_headers' => ( is => 'ro' );
59             has 'deflate' => ( is => 'ro' );
60             has 'gzip' => ( is => 'ro' );
61             has 'ssl_options' => ( is => 'ro', predicate => 'has_ssl_options' );
62             has 'handle' => ( is => 'lazy', clearer => 1 );
63             has '_pid' => ( is => 'rw', default => $$ );
64              
65             my %Code_To_Error = (
66             400 => 'Request',
67             401 => 'Unauthorized',
68             403 => 'Forbidden',
69             404 => 'Missing',
70             408 => 'RequestTimeout',
71             409 => 'Conflict',
72             413 => 'ContentLength',
73             502 => 'BadGateway',
74             503 => 'Unavailable',
75             504 => 'GatewayTimeout'
76             );
77              
78             #===================================
79 611     611 0 3063 sub stringify { shift->uri . '' }
80             #===================================
81              
82             #===================================
83             sub get_user_agent {
84             #===================================
85 176     176 0 317259 return sprintf("elasticsearch-perl/%s (%s; perl %s)", $Search::Elasticsearch::VERSION, $^O, $]);
86             }
87              
88             #===================================
89             sub get_meta_header {
90             #===================================
91 176     176 0 1651 return sprintf("es=%s,pl=%s", $Search::Elasticsearch::VERSION, $^V);
92             }
93              
94              
95             #===================================
96             sub BUILDARGS {
97             #===================================
98 175     175 0 222109 my ( $class, $params ) = parse_params(@_);
99              
100             my $node = $params->{node}
101 175   50     843 || { host => 'localhost', port => '9200' };
102              
103 175 100       606 unless ( ref $node eq 'HASH' ) {
104 170 100       874 $node = "[$node]" if Net::IP::ip_is_ipv6($node);
105 170 100       3379 unless ( $node =~ m{^http(s)?://} ) {
106 124 100       534 $node = ( $params->{use_https} ? 'https://' : 'http://' ) . $node;
107             }
108 170 100 100     793 if ( $params->{port} && $node !~ m{//[^/\[]+:\d+} ) {
109 3         36 $node =~ s{(//[^/]+)}{$1:$params->{port}};
110             }
111 170         1220 my $uri = URI->new($node);
112 170         581659 $node = {
113             scheme => $uri->scheme,
114             host => $uri->host,
115             port => $uri->port,
116             path => $uri->path,
117             userinfo => $uri->userinfo
118             };
119             }
120              
121 175   100     28373 my $host = $node->{host} || 'localhost';
122 175   100     1467 my $userinfo = $node->{userinfo} || $params->{userinfo} || '';
123             my $scheme
124 175   66     581 = $node->{scheme} || ( $params->{use_https} ? 'https' : 'http' );
125             my $port
126             = $node->{port}
127             || $params->{port}
128 175   33     603 || ( $scheme eq 'http' ? 80 : 443 );
129 175   100     1269 my $path = $node->{path} || $params->{path_prefix} || '';
130 175         1065 $path =~ s{^/?}{/}g;
131 175         777 $path =~ s{/+$}{};
132              
133 175 50       315 my %default_headers = %{ $params->{default_headers} || {} };
  175         1169  
134              
135 175 100       723 if ($userinfo) {
136 3         30 require MIME::Base64;
137 3         22 my $auth = MIME::Base64::encode_base64( $userinfo, "" );
138 3         9 chomp $auth;
139 3         14 $default_headers{Authorization} = "Basic $auth";
140             }
141              
142 175 50       817 if ( $params->{gzip} ) {
    100          
143 0         0 $default_headers{'Accept-Encoding'} = "gzip";
144             }
145              
146             elsif ( $params->{deflate} ) {
147 1         5 $default_headers{'Accept-Encoding'} = "deflate";
148             }
149              
150 175         1048 $default_headers{'User-Agent'} = $class->get_user_agent();
151              
152             # Add Elastic meta header
153 175         616 $default_headers{'x-elastic-client-meta'} = $class->get_meta_header();
154              
155             # Compatibility header
156 175 0 0     672 if (defined $ENV{ELASTIC_CLIENT_APIVERSIONING} &&
      33        
157             (lc($ENV{ELASTIC_CLIENT_APIVERSIONING}) eq 'true' || $ENV{ELASTIC_CLIENT_APIVERSIONING} eq '1')) {
158 0         0 $default_headers{'Accept'} = 'application/vnd.elasticsearch+json;compatible-with=7';
159 0         0 $default_headers{'Content-Type'} = 'application/vnd.elasticsearch+json; compatible-with=7';
160             }
161              
162 175 50 33     754 if (defined $params->{elastic_cloud_api_key} && defined $params->{token_api}) {
163 0         0 throw( 'Request',
164             "You cannot set elastic_cloud_api_key and token_api together" );
165             }
166              
167             # Elastic cloud API key
168 175 50       550 if (defined $params->{elastic_cloud_api_key}) {
169 0         0 $default_headers{'Authorization'} = sprintf("ApiKey %s", $params->{elastic_cloud_api_key});
170             }
171              
172             # Elasticsearch token API (https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-get-token.html)
173 175 50       465 if (defined $params->{token_api}) {
174 0         0 $default_headers{'Authorization'} = sprintf("Bearer %s", $params->{token_api});
175             }
176              
177             # Elasticsearch
178 175         536 $params->{scheme} = $scheme;
179 175         566 $params->{is_https} = $scheme eq 'https';
180 175         1976 $params->{host} = $host;
181 175         2610 $params->{port} = $port;
182 175         481 $params->{path} = $path;
183 175         409 $params->{userinfo} = $userinfo;
184 175 100       775 $host = "[$host]" if Net::IP::ip_is_ipv6($host);
185 175         2516 $params->{uri} = URI->new("$scheme://$host:$port$path");
186 175         16023 $params->{default_headers} = \%default_headers;
187             # Add the client version
188 175 50       2674 if (defined $params->{client}) {
189 0         0 $params->{client_version} = substr($params->{client}, 0, index($params->{client}, '_'));
190             }
191              
192 175         5276 return $params;
193             }
194              
195             #===================================
196             before 'handle' => sub {
197             #===================================
198             my $self = shift;
199             if ( $$ != $self->_pid ) {
200             $self->clear_handle;
201             $self->_pid($$);
202             }
203             };
204              
205             #===================================
206 182     182 1 1190 sub is_live { !shift->next_ping }
207 128     128 1 863 sub is_dead { !!shift->next_ping }
208             #===================================
209              
210             #===================================
211             sub mark_live {
212             #===================================
213 149     149 1 968 my $self = shift;
214 149         469 $self->ping_failures(0);
215 149         403 $self->next_ping(0);
216             }
217              
218             #===================================
219             sub mark_dead {
220             #===================================
221 143     143 1 4037 my $self = shift;
222 143         285 my $fails = $self->ping_failures;
223 143         329 $self->ping_failures( $fails + 1 );
224              
225 143         639 my $timeout
226             = min( $self->dead_timeout * 2**$fails, $self->max_dead_timeout );
227 143         340 my $next = $self->next_ping( time() + $timeout );
228              
229 143         347 $self->logger->infof( 'Marking [%s] as dead. Next ping at: %s',
230             $self->stringify, scalar localtime($next) );
231              
232             }
233              
234             #===================================
235             sub force_ping {
236             #===================================
237 121     121 1 291 my $self = shift;
238 121         480 $self->ping_failures(0);
239 121         1359 $self->next_ping(-1);
240             }
241              
242             #===================================
243             sub pings_ok {
244             #===================================
245 47     47 1 78 my $self = shift;
246 47         169 $self->logger->infof( 'Pinging [%s]', $self->stringify );
247             return try {
248 47     47   4359 $self->perform_request(
249             { method => 'HEAD',
250             path => '/',
251             timeout => $self->ping_timeout,
252             }
253             );
254 33         256 $self->logger->infof( 'Marking [%s] as live', $self->stringify );
255 33         1885 $self->mark_live;
256 33         89 1;
257             }
258             catch {
259 14     14   914 $self->logger->debug("$_");
260 14         411 $self->mark_dead;
261 14         1347 0;
262 47         3280 };
263             }
264              
265             #===================================
266             sub sniff {
267             #===================================
268 38     38 1 64 my $self = shift;
269 38         157 $self->logger->infof( 'Sniffing [%s]', $self->stringify );
270             return try {
271             $self->perform_request(
272             { method => 'GET',
273             path => '/_nodes/http',
274             qs => { timeout => $self->sniff_timeout . 's' },
275             timeout => $self->sniff_request_timeout,
276             }
277 38     38   3795 )->{nodes};
278             }
279             catch {
280 13     13   380 $self->logger->debug($_);
281 13         469 return;
282 38         2444 };
283             }
284              
285             #===================================
286             sub build_uri {
287             #===================================
288 5     5 1 7 my ( $self, $params ) = @_;
289 5         24 my $uri = $self->uri->clone;
290 5         37 $uri->path( $uri->path . $params->{path} );
291 5 100       262 my %qs = ( %{ $self->default_qs_params }, %{ $params->{qs} || {} } );
  5         18  
  5         25  
292 5         21 $uri->query_form( \%qs );
293 5         346 return $uri;
294             }
295              
296             #===================================
297             before 'perform_request' => sub {
298             #===================================
299             my ( $self, $params ) = @_;
300             return unless defined $params->{data};
301              
302             $self->_compress_body($params);
303              
304             my $max = $self->max_content_length
305             or return;
306              
307             return if length( $params->{data} ) < $max;
308              
309             $self->logger->throw_error( 'ContentLength',
310             "Body is longer than max_content_length ($max)",
311             );
312             };
313              
314             #===================================
315             sub _compress_body {
316             #===================================
317 0     0   0 my ( $self, $params ) = @_;
318 0         0 my $output;
319 0 0       0 if ( $self->gzip ) {
    0          
320 0 0       0 IO::Compress::Gzip::gzip( \( $params->{data} ), \$output )
321             or throw( 'Request',
322             "Couldn't gzip request: $IO::Compress::Gzip::GzipError" );
323 0         0 $params->{data} = $output;
324 0         0 $params->{encoding} = 'gzip';
325             }
326             elsif ( $self->deflate ) {
327 0 0       0 IO::Compress::Deflate::deflate( \( $params->{data} ), \$output )
328             or throw( 'Request',
329             "Couldn't deflate request: $IO::Compress::Deflate::DeflateError" );
330 0         0 $params->{data} = $output;
331 0         0 $params->{encoding} = 'deflate';
332             }
333             }
334              
335             #===================================
336             sub _decompress_body {
337             #===================================
338 239     239   523 my ( $self, $body_ref, $headers ) = @_;
339 239 50       721 if ( my $encoding = $headers->{'content-encoding'} ) {
340 0         0 my $output;
341 0 0       0 if ( $encoding eq 'gzip' ) {
    0          
342 0 0       0 IO::Uncompress::Gunzip::gunzip( $body_ref, \$output )
343             or throw(
344             'Request',
345             "Couldn't gunzip response: $IO::Uncompress::Gunzip::GunzipError"
346             );
347             }
348             elsif ( $encoding eq 'deflate' ) {
349 0 0       0 IO::Uncompress::Inflate::inflate( $body_ref, \$output,
350             Transparent => 0 )
351             or throw(
352             'Request',
353             "Couldn't inflate response: $IO::Uncompress::Inflate::InflateError"
354             );
355             }
356             else {
357 0         0 throw( 'Request', "Unknown content-encoding: $encoding" );
358             }
359 0         0 ${$body_ref} = $output;
  0         0  
360             }
361             }
362              
363             #===================================
364             sub process_response {
365             #===================================
366 240     240 1 46269 my ( $self, $params, $code, $msg, $body, $headers ) = @_;
367              
368             # Product check only for 8+ client API version
369 240 100 33     2360 if ( $self->client_version >= 8 and $code >= 200 and $code < 300 ) {
      66        
370 178   100     712 my $product = $headers->{$PRODUCT_CHECK_HEADER} // '';
371 178 100       529 if ($product ne $PRODUCT_CHECK_VALUE) {
372 1         6 throw( "ProductCheck", "The client noticed that the server is not Elasticsearch and we do not support this unknown product" );
373             }
374             }
375              
376 239         844 $self->_decompress_body( \$body, $headers );
377              
378 239   100     1084 my ($mime_type) = split /\s*;\s*/, ( $headers->{'content-type'} || '' );
379              
380 239   100     913 my $is_encoded = $mime_type && $mime_type ne 'text/plain';
381              
382             # Deprecation warnings
383 239 50       582 if ( my $warnings = $headers->{warning} ) {
384 0         0 my $warning_string = _parse_warnings($warnings);
385 0         0 my %temp = (%$params);
386 0         0 delete $temp{data};
387 0         0 $self->logger->deprecation( $warning_string, \%temp );
388             }
389              
390             # Request is successful
391 239 100 66     918 if ( $code >= 200 and $code <= 209 ) {
392 177 100 100     754 if ( defined $body and length $body ) {
393 141 100       886 $body = $self->serializer->decode($body)
394             if $is_encoded;
395 141         2555 return $code, $body;
396             }
397 36 100       159 return ( $code, 1 ) if $params->{method} eq 'HEAD';
398 2         10 return ( $code, '' );
399             }
400              
401             # Check if the error should be ignored
402 62         384 my @ignore = to_list( $params->{ignore} );
403 62 100       302 push @ignore, 404 if $params->{method} eq 'HEAD';
404 62 100       345 return ($code) if grep { $_ eq $code } @ignore;
  12         76  
405              
406             # Determine error type
407 60         183 my $error_type = $Code_To_Error{$code};
408 60 100       156 unless ($error_type) {
409 49 50 33     151 if ( defined $body and length $body ) {
410 0         0 $msg = $body;
411 0         0 $body = undef;
412             }
413 49         172 $error_type = $self->error_from_text( $code, $msg );
414             }
415              
416 60 50       289 delete $params->{data} if $params->{body};
417 60         229 my %error_args = ( status_code => $code, request => $params );
418              
419             # Extract error message from the body, if present
420              
421 60 100       318 if ( $body = $self->serializer->decode($body) ) {
422 2         37 $error_args{body} = $body;
423 2   33     9 $msg = $self->_munge_elasticsearch_exception($body) || $msg;
424              
425 2 50 33     9 $error_args{current_version} = $1
426             if $error_type eq 'Conflict'
427             and $msg =~ /: version conflict, current (?:version )?\[(\d+)\]/;
428             }
429 60   33     159 $msg ||= $error_type;
430              
431 60         124 chomp $msg;
432 60         201 throw( $error_type, "[" . $self->stringify . "]-[$code] $msg",
433             \%error_args );
434             }
435              
436             #===================================
437             sub _parse_warnings {
438             #===================================
439 0 0   0   0 my @warnings = ref $_[0] eq 'ARRAY' ? @{ shift() } : shift();
  0         0  
440 0         0 my @str;
441 0         0 for (@warnings) {
442 0 0       0 if ( $_ =~ /^\d+\s+\S+\s+"((?:\\"|[^"])+)"/ ) {
443 0         0 my $msg = $1;
444 0         0 $msg =~ s/\\"/"/g, push @str, $msg;
445             }
446             else {
447 0         0 push @str, $_;
448             }
449             }
450 0         0 return join "; ", @str;
451             }
452              
453             #===================================
454             sub _munge_elasticsearch_exception {
455             #===================================
456 2     2   7 my ( $self, $body ) = @_;
457 2 50       8 return $body unless ref $body eq 'HASH';
458 2   50     8 my $error = $body->{error} || return;
459 2 50       11 return $error unless ref $error eq 'HASH';
460              
461 0   0       my $root_causes = $error->{root_cause} || [];
462 0 0         unless (@$root_causes) {
463 0 0         my $msg = "[" . $error->{type} . "] " if $error->{type};
464 0 0         $msg .= $error->{reason} if $error->{reason};
465 0           return $msg;
466             }
467              
468 0           my $json = $self->serializer;
469 0           my @msgs;
470 0           for (@$root_causes) {
471 0           my %cause = (%$_);
472             my $msg
473 0           = "[" . ( delete $cause{type} ) . "] " . ( delete $cause{reason} );
474 0 0         if ( keys %cause ) {
475 0           $msg .= ", with: " . $json->encode( \%cause );
476             }
477 0           push @msgs, $msg;
478             }
479 0           return ( join ", ", @msgs );
480             }
481              
482             1;
483              
484             # ABSTRACT: Provides common functionality to HTTP Cxn implementations
485              
486             __END__