File Coverage

blib/lib/OpenSearch/Client/Role/Cxn.pm
Criterion Covered Total %
statement 147 193 76.1
branch 43 80 53.7
condition 28 44 63.6
subroutine 30 32 93.7
pod 9 12 75.0
total 257 361 71.1


line stmt bran cond sub pod time code
1             # OpenSearch::Client is an unofficial client for OpenSearch.
2             # It is derived from Search::Elasticsearch version 7.714
3             # License details from the original work are contained in the
4             # NOTICE file distributed with this work.
5             #
6             #-----------------------------------------------------------------------
7             # OpenSearch::Client
8             #-----------------------------------------------------------------------
9             # Copyright 2026 Mark Dootson
10             #
11             # Licensed under the Apache License, Version 2.0 (the "License");
12             # you may not use this file except in compliance with the License.
13             # You may obtain a copy of the License at
14             #
15             # http://www.apache.org/licenses/LICENSE-2.0
16             #
17             # Unless required by applicable law or agreed to in writing, software
18             # distributed under the License is distributed on an "AS IS" BASIS,
19             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20             # See the License for the specific language governing permissions and
21             # limitations under the License.
22              
23             package OpenSearch::Client::Role::Cxn;
24             $OpenSearch::Client::Role::Cxn::VERSION = '3.007002';
25 55     55   292891 use Moo::Role;
  55         7799  
  55         365  
26 55     55   23465 use OpenSearch::Client::Util qw(parse_params throw to_list);
  55         84  
  55         385  
27 55     55   21281 use List::Util qw(min);
  55         93  
  55         3237  
28 55     55   233 use Try::Tiny;
  55         111  
  55         2732  
29 55     55   13036 use URI();
  55         123482  
  55         937  
30 55     55   24694 use IO::Compress::Deflate();
  55         1603161  
  55         1322  
31 55     55   25330 use IO::Uncompress::Inflate();
  55         665236  
  55         1416  
32 55     55   26362 use IO::Compress::Gzip();
  55         278761  
  55         1666  
33 55     55   25336 use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
  55         95103  
  55         6021  
34 55     55   462 use OpenSearch::Client::Util qw(to_list);
  55         183  
  55         618  
35 55     55   15684 use namespace::clean;
  55         132  
  55         448  
36 55     55   49041 use Net::IP;
  55         3541935  
  55         146634  
37              
38             requires qw(perform_request error_from_text handle);
39              
40             has 'host' => ( is => 'ro', required => 1 );
41             has 'port' => ( is => 'ro', required => 1 );
42             has 'uri' => ( is => 'ro', required => 1 );
43             has 'request_timeout' => ( is => 'ro', default => 30 );
44             has 'ping_timeout' => ( is => 'ro', default => 2 );
45             has 'sniff_timeout' => ( is => 'ro', default => 1 );
46             has 'sniff_request_timeout' => ( is => 'ro', default => 2 );
47             has 'next_ping' => ( is => 'rw', default => 0 );
48             has 'ping_failures' => ( is => 'rw', default => 0 );
49             has 'dead_timeout' => ( is => 'ro', default => 60 );
50             has 'max_dead_timeout' => ( is => 'ro', default => 3600 );
51             has 'serializer' => ( is => 'ro', required => 1 );
52             has 'logger' => ( is => 'ro', required => 1 );
53             has 'handle_args' => ( is => 'ro', default => sub { {} } );
54             has 'default_qs_params' => ( is => 'ro', default => sub { {} } );
55             has 'scheme' => ( is => 'ro' );
56             has 'is_https' => ( is => 'ro' );
57             has 'userinfo' => ( is => 'ro' );
58             has 'max_content_length' => ( is => 'ro' );
59             has 'default_headers' => ( is => 'ro' );
60             has 'deflate' => ( is => 'ro' );
61             has 'gzip' => ( is => 'ro' );
62             has 'ssl_options' => ( is => 'ro', predicate => 'has_ssl_options' );
63             has 'handle' => ( is => 'lazy', clearer => 1 );
64             has '_pid' => ( is => 'rw', default => $$ );
65              
66             my %Code_To_Error = (
67             400 => 'Request',
68             401 => 'Unauthorized',
69             403 => 'Forbidden',
70             404 => 'Missing',
71             408 => 'RequestTimeout',
72             409 => 'Conflict',
73             413 => 'ContentLength',
74             502 => 'BadGateway',
75             503 => 'Unavailable',
76             504 => 'GatewayTimeout'
77             );
78              
79             #===================================
80 611     611 0 2279 sub stringify { shift->uri . '' }
81             #===================================
82              
83             #===================================
84             sub get_user_agent {
85             #===================================
86 176     176 0 180037 return sprintf("opensearch-client-perl/%s (%s; perl %s)", $OpenSearch::Client::VERSION, $^O, $]);
87             }
88              
89             #===================================
90             sub BUILDARGS {
91             #===================================
92 175     175 0 150161 my ( $class, $params ) = parse_params(@_);
93              
94             my $node = $params->{node}
95 175   50     686 || { host => 'localhost', port => '9200' };
96              
97 175 100       453 unless ( ref $node eq 'HASH' ) {
98 170 100       638 $node = "[$node]" if Net::IP::ip_is_ipv6($node);
99 170 100       2248 unless ( $node =~ m{^http(s)?://} ) {
100 124 100       391 $node = ( $params->{use_https} ? 'https://' : 'http://' ) . $node;
101             }
102 170 100 100     487 if ( $params->{port} && $node !~ m{//[^/\[]+:\d+} ) {
103 3         24 $node =~ s{(//[^/]+)}{$1:$params->{port}};
104             }
105 170         880 my $uri = URI->new($node);
106 170         421295 $node = {
107             scheme => $uri->scheme,
108             host => $uri->host,
109             port => $uri->port,
110             path => $uri->path,
111             userinfo => $uri->userinfo
112             };
113             }
114              
115 175   100     20129 my $host = $node->{host} || 'localhost';
116 175   100     7245 my $userinfo = $node->{userinfo} || $params->{userinfo} || '';
117             my $scheme
118 175   66     481 = $node->{scheme} || ( $params->{use_https} ? 'https' : 'http' );
119             my $port
120             = $node->{port}
121             || $params->{port}
122 175   33     457 || ( $scheme eq 'http' ? 80 : 443 );
123 175   100     871 my $path = $node->{path} || $params->{path_prefix} || '';
124 175         807 $path =~ s{^/?}{/}g;
125 175         2104 $path =~ s{/+$}{};
126              
127 175 50       274 my %default_headers = %{ $params->{default_headers} || {} };
  175         746  
128              
129 175 100       475 if ($userinfo) {
130 3         17 require MIME::Base64;
131 3         12 my $auth = MIME::Base64::encode_base64( $userinfo, "" );
132 3         7 chomp $auth;
133 3         7 $default_headers{Authorization} = "Basic $auth";
134             }
135              
136 175 50       623 if ( $params->{gzip} ) {
    100          
137 0         0 $default_headers{'Accept-Encoding'} = "gzip";
138             }
139              
140             elsif ( $params->{deflate} ) {
141 1         2 $default_headers{'Accept-Encoding'} = "deflate";
142             }
143              
144 175         795 $default_headers{'User-Agent'} = $class->get_user_agent();
145              
146 175         339 $params->{scheme} = $scheme;
147 175         382 $params->{is_https} = $scheme eq 'https';
148 175         409 $params->{host} = $host;
149 175         346 $params->{port} = $port;
150 175         323 $params->{path} = $path;
151 175         339 $params->{userinfo} = $userinfo;
152 175 100       544 $host = "[$host]" if Net::IP::ip_is_ipv6($host);
153 175         1786 $params->{uri} = URI->new("$scheme://$host:$port$path");
154 175         10405 $params->{default_headers} = \%default_headers;
155              
156 175         3539 return $params;
157             }
158              
159             #===================================
160             before 'handle' => sub {
161             #===================================
162             my $self = shift;
163             if ( $$ != $self->_pid ) {
164             $self->clear_handle;
165             $self->_pid($$);
166             }
167             };
168              
169             #===================================
170 182     182 1 854 sub is_live { !shift->next_ping }
171 128     128 1 575 sub is_dead { !!shift->next_ping }
172             #===================================
173              
174             #===================================
175             sub mark_live {
176             #===================================
177 149     149 1 795 my $self = shift;
178 149         305 $self->ping_failures(0);
179 149         283 $self->next_ping(0);
180             }
181              
182             #===================================
183             sub mark_dead {
184             #===================================
185 143     143 1 4416 my $self = shift;
186 143         246 my $fails = $self->ping_failures;
187 143         246 $self->ping_failures( $fails + 1 );
188              
189 143         447 my $timeout
190             = min( $self->dead_timeout * 2**$fails, $self->max_dead_timeout );
191 143         270 my $next = $self->next_ping( time() + $timeout );
192              
193 143         359 $self->logger->infof( 'Marking [%s] as dead. Next ping at: %s',
194             $self->stringify, scalar localtime($next) );
195              
196             }
197              
198             #===================================
199             sub force_ping {
200             #===================================
201 121     121 1 197 my $self = shift;
202 121         316 $self->ping_failures(0);
203 121         884 $self->next_ping(-1);
204             }
205              
206             #===================================
207             sub pings_ok {
208             #===================================
209 47     47 1 56 my $self = shift;
210 47         128 $self->logger->infof( 'Pinging [%s]', $self->stringify );
211             return try {
212 47     47   2938 $self->perform_request(
213             { method => 'HEAD',
214             path => '/',
215             timeout => $self->ping_timeout,
216             }
217             );
218 33         171 $self->logger->infof( 'Marking [%s] as live', $self->stringify );
219 33         1114 $self->mark_live;
220 33         68 1;
221             }
222             catch {
223 14     14   613 $self->logger->debug("$_");
224 14         308 $self->mark_dead;
225 14         1133 0;
226 47         2186 };
227             }
228              
229             #===================================
230             sub sniff {
231             #===================================
232 38     38 1 52 my $self = shift;
233 38         118 $self->logger->infof( 'Sniffing [%s]', $self->stringify );
234             return try {
235             $self->perform_request(
236             { method => 'GET',
237             path => '/_nodes/http',
238             qs => { timeout => $self->sniff_timeout . 's' },
239             timeout => $self->sniff_request_timeout,
240             }
241 38     38   2767 )->{nodes};
242             }
243             catch {
244 13     13   370 $self->logger->debug($_);
245 13         414 return;
246 38         1751 };
247             }
248              
249             #===================================
250             sub build_uri {
251             #===================================
252 5     5 1 9 my ( $self, $params ) = @_;
253 5         24 my $uri = $self->uri->clone;
254 5         34 $uri->path( $uri->path . $params->{path} );
255 5 100       195 my %qs = ( %{ $self->default_qs_params }, %{ $params->{qs} || {} } );
  5         12  
  5         21  
256 5         22 $uri->query_form( \%qs );
257 5         337 return $uri;
258             }
259              
260             #===================================
261             before 'perform_request' => sub {
262             #===================================
263             my ( $self, $params ) = @_;
264             return unless defined $params->{data};
265              
266             $self->_compress_body($params);
267              
268             my $max = $self->max_content_length
269             or return;
270              
271             return if length( $params->{data} ) < $max;
272              
273             $self->logger->throw_error( 'ContentLength',
274             "Body is longer than max_content_length ($max)",
275             );
276             };
277              
278             #===================================
279             sub _compress_body {
280             #===================================
281 0     0   0 my ( $self, $params ) = @_;
282 0         0 my $output;
283 0 0       0 if ( $self->gzip ) {
    0          
284 0 0       0 IO::Compress::Gzip::gzip( \( $params->{data} ), \$output )
285             or throw( 'Request',
286             "Couldn't gzip request: $IO::Compress::Gzip::GzipError" );
287 0         0 $params->{data} = $output;
288 0         0 $params->{encoding} = 'gzip';
289             }
290             elsif ( $self->deflate ) {
291 0 0       0 IO::Compress::Deflate::deflate( \( $params->{data} ), \$output )
292             or throw( 'Request',
293             "Couldn't deflate request: $IO::Compress::Deflate::DeflateError" );
294 0         0 $params->{data} = $output;
295 0         0 $params->{encoding} = 'deflate';
296             }
297             }
298              
299             #===================================
300             sub _decompress_body {
301             #===================================
302 238     238   384 my ( $self, $body_ref, $headers ) = @_;
303 238 50       678 if ( my $encoding = $headers->{'content-encoding'} ) {
304 0         0 my $output;
305 0 0       0 if ( $encoding eq 'gzip' ) {
    0          
306 0 0       0 IO::Uncompress::Gunzip::gunzip( $body_ref, \$output )
307             or throw(
308             'Request',
309             "Couldn't gunzip response: $IO::Uncompress::Gunzip::GunzipError"
310             );
311             }
312             elsif ( $encoding eq 'deflate' ) {
313 0 0       0 IO::Uncompress::Inflate::inflate( $body_ref, \$output,
314             Transparent => 0 )
315             or throw(
316             'Request',
317             "Couldn't inflate response: $IO::Uncompress::Inflate::InflateError"
318             );
319             }
320             else {
321 0         0 throw( 'Request', "Unknown content-encoding: $encoding" );
322             }
323 0         0 ${$body_ref} = $output;
  0         0  
324             }
325             }
326              
327             #===================================
328             sub process_response {
329             #===================================
330 238     238 1 28945 my ( $self, $params, $code, $msg, $body, $headers ) = @_;
331 238         635 $self->_decompress_body( \$body, $headers );
332              
333 238   100     830 my ($mime_type) = split /\s*;\s*/, ( $headers->{'content-type'} || '' );
334              
335 238   100     1279 my $is_encoded = $mime_type && $mime_type ne 'text/plain';
336              
337             # Deprecation warnings
338 238 50       576 if ( my $warnings = $headers->{warning} ) {
339 0         0 my $warning_string = _parse_warnings($warnings);
340 0         0 my %temp = (%$params);
341 0         0 delete $temp{data};
342 0         0 $self->logger->deprecation( $warning_string, \%temp );
343             }
344              
345             # Request is successful
346              
347 238 100 66     709 if ( $code >= 200 and $code <= 209 ) {
348 176 100 100     645 if ( defined $body and length $body ) {
349 141 100       704 $body = $self->serializer->decode($body)
350             if $is_encoded;
351 141         2026 return $code, $body;
352             }
353 35 100       127 return ( $code, 1 ) if $params->{method} eq 'HEAD';
354 1         5 return ( $code, '' );
355             }
356              
357             # Check if the error should be ignored
358 62         243 my @ignore = to_list( $params->{ignore} );
359 62 100       193 push @ignore, 404 if $params->{method} eq 'HEAD';
360 62 100       130 return ($code) if grep { $_ eq $code } @ignore;
  12         38  
361              
362             # Determine error type
363 60         124 my $error_type = $Code_To_Error{$code};
364 60 100       116 unless ($error_type) {
365 49 50 33     208 if ( defined $body and length $body ) {
366 0         0 $msg = $body;
367 0         0 $body = undef;
368             }
369 49         132 $error_type = $self->error_from_text( $code, $msg );
370             }
371              
372 60 50       194 delete $params->{data} if $params->{body};
373 60         185 my %error_args = ( status_code => $code, request => $params );
374              
375             # Extract error message from the body, if present
376              
377 60 100       317 if ( $body = $self->serializer->decode($body) ) {
378 2         21 $error_args{body} = $body;
379 2   33     8 $msg = $self->_munge_opensearch_exception($body) || $msg;
380              
381 2 50 33     7 $error_args{current_version} = $1
382             if $error_type eq 'Conflict'
383             and $msg =~ /: version conflict, current (?:version )?\[(\d+)\]/;
384             }
385 60   33     236 $msg ||= $error_type;
386              
387 60         111 chomp $msg;
388 60         153 throw( $error_type, "[" . $self->stringify . "]-[$code] $msg",
389             \%error_args );
390             }
391              
392             #===================================
393             sub _parse_warnings {
394             #===================================
395 0 0   0   0 my @warnings = ref $_[0] eq 'ARRAY' ? @{ shift() } : shift();
  0         0  
396 0         0 my @str;
397 0         0 for (@warnings) {
398 0 0       0 if ( $_ =~ /^\d+\s+\S+\s+"((?:\\"|[^"])+)"/ ) {
399 0         0 my $msg = $1;
400 0         0 $msg =~ s/\\"/"/g, push @str, $msg;
401             }
402             else {
403 0         0 push @str, $_;
404             }
405             }
406 0         0 return join "; ", @str;
407             }
408              
409             #===================================
410             sub _munge_opensearch_exception {
411             #===================================
412 2     2   3 my ( $self, $body ) = @_;
413 2 50       5 return $body unless ref $body eq 'HASH';
414 2   50     6 my $error = $body->{error} || return;
415 2 50       6 return $error unless ref $error eq 'HASH';
416              
417 0   0       my $root_causes = $error->{root_cause} || [];
418 0 0         unless (@$root_causes) {
419 0 0         my $msg = "[" . $error->{type} . "] " if $error->{type};
420 0 0         $msg .= $error->{reason} if $error->{reason};
421 0           return $msg;
422             }
423              
424 0           my $json = $self->serializer;
425 0           my @msgs;
426 0           for (@$root_causes) {
427 0           my %cause = (%$_);
428             my $msg
429 0           = "[" . ( delete $cause{type} ) . "] " . ( delete $cause{reason} );
430 0 0         if ( keys %cause ) {
431 0           $msg .= ", with: " . $json->encode( \%cause );
432             }
433 0           push @msgs, $msg;
434             }
435 0           return ( join ", ", @msgs );
436             }
437              
438             1;
439              
440             __END__