File Coverage

blib/lib/OpenSearch/Client/Role/CxnPool.pm
Criterion Covered Total %
statement 65 65 100.0
branch 13 14 92.8
condition n/a
subroutine 17 17 100.0
pod 9 9 100.0
total 104 105 99.0


line stmt bran cond sub pod time code
1             # OpenSearch::Client is an unofficial client for OpenSearch.
2             # It is derived from Search::Elasticsearch version 7.714
3             # License details from the original work are contained in the
4             # NOTICE file distributed with this work.
5             #
6             #-----------------------------------------------------------------------
7             # OpenSearch::Client
8             #-----------------------------------------------------------------------
9             # Copyright 2026 Mark Dootson
10             #
11             # Licensed under the Apache License, Version 2.0 (the "License");
12             # you may not use this file except in compliance with the License.
13             # You may obtain a copy of the License at
14             #
15             # http://www.apache.org/licenses/LICENSE-2.0
16             #
17             # Unless required by applicable law or agreed to in writing, software
18             # distributed under the License is distributed on an "AS IS" BASIS,
19             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20             # See the License for the specific language governing permissions and
21             # limitations under the License.
22              
23             package OpenSearch::Client::Role::CxnPool;
24             $OpenSearch::Client::Role::CxnPool::VERSION = '3.007002';
25 55     55   26077 use Moo::Role;
  55         115  
  55         319  
26 55     55   22334 use OpenSearch::Client::Util qw(parse_params);
  55         594  
  55         580  
27 55     55   12637 use List::Util qw(shuffle);
  55         146  
  55         3589  
28 55     55   21248 use IO::Select();
  55         74317  
  55         1604  
29 55     55   314 use Time::HiRes qw(time sleep);
  55         81  
  55         522  
30 55     55   3775 use OpenSearch::Client::Util qw(to_list);
  55         84  
  55         362  
31 55     55   11802 use namespace::clean;
  55         112  
  55         279  
32              
33             requires qw(next_cxn schedule_check);
34              
35             has 'cxn_factory' => ( is => 'ro', required => 1 );
36             has 'logger' => ( is => 'ro', required => 1 );
37             has 'serializer' => ( is => 'ro', required => 1 );
38             has 'current_cxn_num' => ( is => 'rwp', default => 0 );
39             has 'cxns' => ( is => 'rwp', default => sub { [] } );
40             has 'seed_nodes' => ( is => 'ro', required => 1 );
41             has 'retries' => ( is => 'rw', default => 0 );
42             has 'randomize_cxns' => ( is => 'ro', default => 1 );
43              
44             #===================================
45             around BUILDARGS => sub {
46             #===================================
47             my $orig = shift;
48             my $params = $orig->(@_);
49             my @seed = grep {$_} to_list( delete $params->{nodes} || ('') );
50              
51             @seed = $params->{cxn_factory}->default_host
52             unless @seed;
53             $params->{seed_nodes} = \@seed;
54             return $params;
55             };
56              
57             #===================================
58             sub next_cxn_num {
59             #===================================
60 200     200 1 258 my $self = shift;
61 200         350 my $cxns = $self->cxns;
62 200 50       349 return unless @$cxns;
63 200         342 my $current = $self->current_cxn_num;
64 200         451 $self->_set_current_cxn_num( ( $current + 1 ) % @$cxns );
65 200         377 return $current;
66             }
67              
68             #===================================
69             sub set_cxns {
70             #===================================
71 112     112 1 189 my $self = shift;
72 112         236 my $factory = $self->cxn_factory;
73 112         273 my @cxns = map { $factory->new_cxn($_) } @_;
  155         33904  
74 112 100       12569 @cxns = shuffle @cxns if $self->randomize_cxns;
75 112         369 $self->_set_cxns( \@cxns );
76 112         318 $self->_set_current_cxn_num(0);
77              
78             $self->logger->infof( "Current cxns: %s",
79 112         273 [ map { $_->stringify } @cxns ] );
  155         581  
80              
81 112         48675 return;
82             }
83              
84             #===================================
85             sub request_ok {
86             #===================================
87 114     114 1 180 my ( $self, $cxn ) = @_;
88 114         301 $cxn->mark_live;
89 114         921 $self->reset_retries;
90             }
91              
92             #===================================
93             sub request_failed {
94             #===================================
95 46     46 1 90 my ( $self, $cxn, $error ) = @_;
96              
97 46 100       162 if ( $error->is( 'Cxn', 'Timeout' ) ) {
98 33 100       152 $cxn->mark_dead if $self->should_mark_dead($error);
99 33         2184 $self->schedule_check;
100              
101 33 100       175 if ( $self->should_retry($error) ) {
102 25         95 my $retries = $self->retries( $self->retries + 1 );
103 25 100       101 return 1 if $retries < $self->_max_retries;
104             }
105             }
106             else {
107 13 100       29 $cxn->mark_live if $cxn;
108             }
109 27         144 $self->reset_retries;
110 27         72 return 0;
111             }
112              
113             #===================================
114             sub should_retry {
115             #===================================
116 33     33 1 82 my ( $self, $error ) = @_;
117 33         91 return $error->is('Cxn');
118             }
119              
120             #===================================
121             sub should_mark_dead {
122             #===================================
123 21     21 1 38 my ( $self, $error ) = @_;
124 21         41 return $error->is('Cxn');
125             }
126              
127             #===================================
128             sub cxns_str {
129             #===================================
130 7     7 1 10 my $self = shift;
131 7         10 join ", ", map { $_->stringify } @{ $self->cxns };
  12         39  
  7         18  
132             }
133              
134             #===================================
135             sub cxns_seeds_str {
136             #===================================
137 5     5 1 8 my $self = shift;
138 4         13 join ", ", ( map { $_->stringify } @{ $self->cxns } ),
  5         27  
139 5         7 @{ $self->seed_nodes };
  5         32  
140             }
141              
142             #===================================
143 141     141 1 1062 sub reset_retries { shift->retries(0) }
144 14     14   84 sub _max_retries {2}
145             #===================================
146              
147             1;
148              
149             __END__