File Coverage

blib/lib/OpenSearch/Client/Role/Cxn.pm
Criterion Covered Total %
statement 147 193 76.1
branch 43 80 53.7
condition 28 44 63.6
subroutine 30 32 93.7
pod 9 12 75.0
total 257 361 71.1


line stmt bran cond sub pod time code
1             # OpenSearch::Client is an unofficial client for OpenSearch.
2             # It is derived from Search::Elasticsearch version 7.714
3             # License details from the original work are contained in the
4             # NOTICE file distributed with this work.
5             #
6             #-----------------------------------------------------------------------
7             # OpenSearch::Client
8             #-----------------------------------------------------------------------
9             # Copyright 2026 Mark Dootson
10             #
11             # Licensed under the Apache License, Version 2.0 (the "License");
12             # you may not use this file except in compliance with the License.
13             # You may obtain a copy of the License at
14             #
15             # http://www.apache.org/licenses/LICENSE-2.0
16             #
17             # Unless required by applicable law or agreed to in writing, software
18             # distributed under the License is distributed on an "AS IS" BASIS,
19             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20             # See the License for the specific language governing permissions and
21             # limitations under the License.
22              
23             package OpenSearch::Client::Role::Cxn;
24             $OpenSearch::Client::Role::Cxn::VERSION = '3.007005';
25 56     56   299684 use Moo::Role;
  56         7913  
  56         351  
26 56     56   25052 use OpenSearch::Client::Util qw(parse_params throw to_list);
  56         120  
  56         500  
27 56     56   22124 use List::Util qw(min);
  56         122  
  56         3687  
28 56     56   246 use Try::Tiny;
  56         111  
  56         2824  
29 56     56   13082 use URI();
  56         121160  
  56         1105  
30 56     56   25687 use IO::Compress::Deflate();
  56         1672314  
  56         1414  
31 56     56   26321 use IO::Uncompress::Inflate();
  56         685809  
  56         1392  
32 56     56   26614 use IO::Compress::Gzip();
  56         290276  
  56         1862  
33 56     56   26620 use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
  56         97819  
  56         6329  
34 56     56   395 use OpenSearch::Client::Util qw(to_list);
  56         226  
  56         555  
35 56     56   16154 use namespace::clean;
  56         100  
  56         413  
36 56     56   50021 use Net::IP;
  56         3628752  
  56         151686  
37              
38             requires qw(perform_request error_from_text handle);
39              
40             has 'host' => ( is => 'ro', required => 1 );
41             has 'port' => ( is => 'ro', required => 1 );
42             has 'uri' => ( is => 'ro', required => 1 );
43             has 'request_timeout' => ( is => 'ro', default => 30 );
44             has 'ping_timeout' => ( is => 'ro', default => 2 );
45             has 'sniff_timeout' => ( is => 'ro', default => 1 );
46             has 'sniff_request_timeout' => ( is => 'ro', default => 2 );
47             has 'next_ping' => ( is => 'rw', default => 0 );
48             has 'ping_failures' => ( is => 'rw', default => 0 );
49             has 'dead_timeout' => ( is => 'ro', default => 60 );
50             has 'max_dead_timeout' => ( is => 'ro', default => 3600 );
51             has 'serializer' => ( is => 'ro', required => 1 );
52             has 'logger' => ( is => 'ro', required => 1 );
53             has 'handle_args' => ( is => 'ro', default => sub { {} } );
54             has 'default_qs_params' => ( is => 'ro', default => sub { {} } );
55             has 'scheme' => ( is => 'ro' );
56             has 'is_https' => ( is => 'ro' );
57             has 'userinfo' => ( is => 'ro' );
58             has 'max_content_length' => ( is => 'ro' );
59             has 'default_headers' => ( is => 'ro' );
60             has 'deflate' => ( is => 'ro' );
61             has 'gzip' => ( is => 'ro' );
62             has 'ssl_options' => ( is => 'ro', predicate => 'has_ssl_options' );
63             has 'handle' => ( is => 'lazy', clearer => 1 );
64             has '_pid' => ( is => 'rw', default => $$ );
65             has 'http_proxy' => ( is => 'ro', predicate => 'has_http_proxy' );
66             has 'https_proxy' => ( is => 'ro', predicate => 'has_https_proxy' );
67             has 'no_proxy' => ( is => 'ro', predicate => 'has_no_proxy' );
68              
69             my %Code_To_Error = (
70             400 => 'Request',
71             401 => 'Unauthorized',
72             403 => 'Forbidden',
73             404 => 'Missing',
74             408 => 'RequestTimeout',
75             409 => 'Conflict',
76             413 => 'ContentLength',
77             502 => 'BadGateway',
78             503 => 'Unavailable',
79             504 => 'GatewayTimeout'
80             );
81              
82             #===================================
83 611     611 0 3929 sub stringify { shift->uri . '' }
84             #===================================
85              
86             #===================================
87             sub get_user_agent {
88             #===================================
89 176     176 0 191502 return sprintf("opensearch-client-perl/%s (%s; perl %s)", $OpenSearch::Client::VERSION, $^O, $]);
90             }
91              
92             #===================================
93             sub BUILDARGS {
94             #===================================
95 175     175 0 167539 my ( $class, $params ) = parse_params(@_);
96              
97             my $node = $params->{node}
98 175   50     665 || { host => 'localhost', port => '9200' };
99              
100 175 100       522 unless ( ref $node eq 'HASH' ) {
101 170 100       692 $node = "[$node]" if Net::IP::ip_is_ipv6($node);
102 170 100       2521 unless ( $node =~ m{^http(s)?://} ) {
103 124 100       421 $node = ( $params->{use_https} ? 'https://' : 'http://' ) . $node;
104             }
105 170 100 100     564 if ( $params->{port} && $node !~ m{//[^/\[]+:\d+} ) {
106 3         27 $node =~ s{(//[^/]+)}{$1:$params->{port}};
107             }
108 170         874 my $uri = URI->new($node);
109 170         456360 $node = {
110             scheme => $uri->scheme,
111             host => $uri->host,
112             port => $uri->port,
113             path => $uri->path,
114             userinfo => $uri->userinfo
115             };
116             }
117              
118 175   100     21094 my $host = $node->{host} || 'localhost';
119 175   100     1086 my $userinfo = $node->{userinfo} || $params->{userinfo} || '';
120             my $scheme
121 175   66     474 = $node->{scheme} || ( $params->{use_https} ? 'https' : 'http' );
122             my $port
123             = $node->{port}
124             || $params->{port}
125 175   33     1919 || ( $scheme eq 'http' ? 80 : 443 );
126 175   100     1000 my $path = $node->{path} || $params->{path_prefix} || '';
127 175         824 $path =~ s{^/?}{/}g;
128 175         565 $path =~ s{/+$}{};
129              
130 175 50       274 my %default_headers = %{ $params->{default_headers} || {} };
  175         793  
131              
132 175 100       559 if ($userinfo) {
133 3         28 require MIME::Base64;
134 3         18 my $auth = MIME::Base64::encode_base64( $userinfo, "" );
135 3         9 chomp $auth;
136 3         10 $default_headers{Authorization} = "Basic $auth";
137             }
138              
139 175 50       733 if ( $params->{gzip} ) {
    100          
140 0         0 $default_headers{'Accept-Encoding'} = "gzip";
141             }
142              
143             elsif ( $params->{deflate} ) {
144 1         5 $default_headers{'Accept-Encoding'} = "deflate";
145             }
146              
147 175         714 $default_headers{'User-Agent'} = $class->get_user_agent();
148              
149 175         432 $params->{scheme} = $scheme;
150 175         438 $params->{is_https} = $scheme eq 'https';
151 175         449 $params->{host} = $host;
152 175         340 $params->{port} = $port;
153 175         327 $params->{path} = $path;
154 175         315 $params->{userinfo} = $userinfo;
155 175 100       505 $host = "[$host]" if Net::IP::ip_is_ipv6($host);
156 175         1802 $params->{uri} = URI->new("$scheme://$host:$port$path");
157 175         11073 $params->{default_headers} = \%default_headers;
158              
159 175         3811 return $params;
160             }
161              
162             #===================================
163             before 'handle' => sub {
164             #===================================
165             my $self = shift;
166             if ( $$ != $self->_pid ) {
167             $self->clear_handle;
168             $self->_pid($$);
169             }
170             };
171              
172             #===================================
173 182     182 1 879 sub is_live { !shift->next_ping }
174 128     128 1 562 sub is_dead { !!shift->next_ping }
175             #===================================
176              
177             #===================================
178             sub mark_live {
179             #===================================
180 149     149 1 748 my $self = shift;
181 149         300 $self->ping_failures(0);
182 149         271 $self->next_ping(0);
183             }
184              
185             #===================================
186             sub mark_dead {
187             #===================================
188 143     143 1 3709 my $self = shift;
189 143         236 my $fails = $self->ping_failures;
190 143         326 $self->ping_failures( $fails + 1 );
191              
192 143         442 my $timeout
193             = min( $self->dead_timeout * 2**$fails, $self->max_dead_timeout );
194 143         246 my $next = $self->next_ping( time() + $timeout );
195              
196 143         313 $self->logger->infof( 'Marking [%s] as dead. Next ping at: %s',
197             $self->stringify, scalar localtime($next) );
198              
199             }
200              
201             #===================================
202             sub force_ping {
203             #===================================
204 121     121 1 217 my $self = shift;
205 121         379 $self->ping_failures(0);
206 121         984 $self->next_ping(-1);
207             }
208              
209             #===================================
210             sub pings_ok {
211             #===================================
212 47     47 1 59 my $self = shift;
213 47         134 $self->logger->infof( 'Pinging [%s]', $self->stringify );
214             return try {
215 47     47   3098 $self->perform_request(
216             { method => 'HEAD',
217             path => '/',
218             timeout => $self->ping_timeout,
219             }
220             );
221 33         147 $self->logger->infof( 'Marking [%s] as live', $self->stringify );
222 33         1341 $self->mark_live;
223 33         65 1;
224             }
225             catch {
226 14     14   578 $self->logger->debug("$_");
227 14         364 $self->mark_dead;
228 14         1159 0;
229 47         2329 };
230             }
231              
232             #===================================
233             sub sniff {
234             #===================================
235 38     38 1 54 my $self = shift;
236 38         102 $self->logger->infof( 'Sniffing [%s]', $self->stringify );
237             return try {
238             $self->perform_request(
239             { method => 'GET',
240             path => '/_nodes/http',
241             qs => { timeout => $self->sniff_timeout . 's' },
242             timeout => $self->sniff_request_timeout,
243             }
244 38     38   2581 )->{nodes};
245             }
246             catch {
247 13     13   375 $self->logger->debug($_);
248 13         455 return;
249 38         1631 };
250             }
251              
252             #===================================
253             sub build_uri {
254             #===================================
255 5     5 1 12 my ( $self, $params ) = @_;
256 5         30 my $uri = $self->uri->clone;
257 5         39 $uri->path( $uri->path . $params->{path} );
258 5 100       203 my %qs = ( %{ $self->default_qs_params }, %{ $params->{qs} || {} } );
  5         14  
  5         20  
259 5         25 $uri->query_form( \%qs );
260 5         355 return $uri;
261             }
262              
263             #===================================
264             before 'perform_request' => sub {
265             #===================================
266             my ( $self, $params ) = @_;
267             return unless defined $params->{data};
268              
269             $self->_compress_body($params);
270              
271             my $max = $self->max_content_length
272             or return;
273              
274             return if length( $params->{data} ) < $max;
275              
276             $self->logger->throw_error( 'ContentLength',
277             "Body is longer than max_content_length ($max)",
278             );
279             };
280              
281             #===================================
282             sub _compress_body {
283             #===================================
284 0     0   0 my ( $self, $params ) = @_;
285 0         0 my $output;
286 0 0       0 if ( $self->gzip ) {
    0          
287 0 0       0 IO::Compress::Gzip::gzip( \( $params->{data} ), \$output )
288             or throw( 'Request',
289             "Couldn't gzip request: $IO::Compress::Gzip::GzipError" );
290 0         0 $params->{data} = $output;
291 0         0 $params->{encoding} = 'gzip';
292             }
293             elsif ( $self->deflate ) {
294 0 0       0 IO::Compress::Deflate::deflate( \( $params->{data} ), \$output )
295             or throw( 'Request',
296             "Couldn't deflate request: $IO::Compress::Deflate::DeflateError" );
297 0         0 $params->{data} = $output;
298 0         0 $params->{encoding} = 'deflate';
299             }
300             }
301              
302             #===================================
303             sub _decompress_body {
304             #===================================
305 238     238   526 my ( $self, $body_ref, $headers ) = @_;
306 238 50       1052 if ( my $encoding = $headers->{'content-encoding'} ) {
307 0         0 my $output;
308 0 0       0 if ( $encoding eq 'gzip' ) {
    0          
309 0 0       0 IO::Uncompress::Gunzip::gunzip( $body_ref, \$output )
310             or throw(
311             'Request',
312             "Couldn't gunzip response: $IO::Uncompress::Gunzip::GunzipError"
313             );
314             }
315             elsif ( $encoding eq 'deflate' ) {
316 0 0       0 IO::Uncompress::Inflate::inflate( $body_ref, \$output,
317             Transparent => 0 )
318             or throw(
319             'Request',
320             "Couldn't inflate response: $IO::Uncompress::Inflate::InflateError"
321             );
322             }
323             else {
324 0         0 throw( 'Request', "Unknown content-encoding: $encoding" );
325             }
326 0         0 ${$body_ref} = $output;
  0         0  
327             }
328             }
329              
330             #===================================
331             sub process_response {
332             #===================================
333 238     238 1 27094 my ( $self, $params, $code, $msg, $body, $headers ) = @_;
334 238         592 $self->_decompress_body( \$body, $headers );
335              
336 238   100     816 my ($mime_type) = split /\s*;\s*/, ( $headers->{'content-type'} || '' );
337              
338 238   100     793 my $is_encoded = $mime_type && $mime_type ne 'text/plain';
339              
340             # Deprecation warnings
341 238 50       419 if ( my $warnings = $headers->{warning} ) {
342 0         0 my $warning_string = _parse_warnings($warnings);
343 0         0 my %temp = (%$params);
344 0         0 delete $temp{data};
345 0         0 $self->logger->deprecation( $warning_string, \%temp );
346             }
347              
348             # Request is successful
349              
350 238 100 66     757 if ( $code >= 200 and $code <= 209 ) {
351 176 100 100     559 if ( defined $body and length $body ) {
352 141 100       626 $body = $self->serializer->decode($body)
353             if $is_encoded;
354 141         1672 return $code, $body;
355             }
356 35 100       107 return ( $code, 1 ) if $params->{method} eq 'HEAD';
357 1         4 return ( $code, '' );
358             }
359              
360             # Check if the error should be ignored
361 62         261 my @ignore = to_list( $params->{ignore} );
362 62 100       161 push @ignore, 404 if $params->{method} eq 'HEAD';
363 62 100       158 return ($code) if grep { $_ eq $code } @ignore;
  12         38  
364              
365             # Determine error type
366 60         193 my $error_type = $Code_To_Error{$code};
367 60 100       125 unless ($error_type) {
368 49 50 33     150 if ( defined $body and length $body ) {
369 0         0 $msg = $body;
370 0         0 $body = undef;
371             }
372 49         242 $error_type = $self->error_from_text( $code, $msg );
373             }
374              
375 60 50       205 delete $params->{data} if $params->{body};
376 60         209 my %error_args = ( status_code => $code, request => $params );
377              
378             # Extract error message from the body, if present
379              
380 60 100       242 if ( $body = $self->serializer->decode($body) ) {
381 2         117 $error_args{body} = $body;
382 2   33     8 $msg = $self->_munge_opensearch_exception($body) || $msg;
383              
384 2 50 33     6 $error_args{current_version} = $1
385             if $error_type eq 'Conflict'
386             and $msg =~ /: version conflict, current (?:version )?\[(\d+)\]/;
387             }
388 60   33     130 $msg ||= $error_type;
389              
390 60         126 chomp $msg;
391 60         124 throw( $error_type, "[" . $self->stringify . "]-[$code] $msg",
392             \%error_args );
393             }
394              
395             #===================================
396             sub _parse_warnings {
397             #===================================
398 0 0   0   0 my @warnings = ref $_[0] eq 'ARRAY' ? @{ shift() } : shift();
  0         0  
399 0         0 my @str;
400 0         0 for (@warnings) {
401 0 0       0 if ( $_ =~ /^\d+\s+\S+\s+"((?:\\"|[^"])+)"/ ) {
402 0         0 my $msg = $1;
403 0         0 $msg =~ s/\\"/"/g, push @str, $msg;
404             }
405             else {
406 0         0 push @str, $_;
407             }
408             }
409 0         0 return join "; ", @str;
410             }
411              
412             #===================================
413             sub _munge_opensearch_exception {
414             #===================================
415 2     2   4 my ( $self, $body ) = @_;
416 2 50       6 return $body unless ref $body eq 'HASH';
417 2   50     5 my $error = $body->{error} || return;
418 2 50       6 return $error unless ref $error eq 'HASH';
419              
420 0   0       my $root_causes = $error->{root_cause} || [];
421 0 0         unless (@$root_causes) {
422 0 0         my $msg = "[" . $error->{type} . "] " if $error->{type};
423 0 0         $msg .= $error->{reason} if $error->{reason};
424 0           return $msg;
425             }
426              
427 0           my $json = $self->serializer;
428 0           my @msgs;
429 0           for (@$root_causes) {
430 0           my %cause = (%$_);
431             my $msg
432 0           = "[" . ( delete $cause{type} ) . "] " . ( delete $cause{reason} );
433 0 0         if ( keys %cause ) {
434 0           $msg .= ", with: " . $json->encode( \%cause );
435             }
436 0           push @msgs, $msg;
437             }
438 0           return ( join ", ", @msgs );
439             }
440              
441             1;
442              
443             __END__