File Coverage

blib/lib/OpenSearch/Client/Role/CxnPool.pm
Criterion Covered Total %
statement 65 65 100.0
branch 13 14 92.8
condition n/a
subroutine 17 17 100.0
pod 9 9 100.0
total 104 105 99.0


line stmt bran cond sub pod time code
1             # OpenSearch::Client is an unofficial client for OpenSearch.
2             # It is derived from Search::Elasticsearch version 7.714
3             # License details from the original work are contained in the
4             # NOTICE file distributed with this work.
5             #
6             #-----------------------------------------------------------------------
7             # OpenSearch::Client
8             #-----------------------------------------------------------------------
9             # Copyright 2026 Mark Dootson
10             #
11             # Licensed under the Apache License, Version 2.0 (the "License");
12             # you may not use this file except in compliance with the License.
13             # You may obtain a copy of the License at
14             #
15             # http://www.apache.org/licenses/LICENSE-2.0
16             #
17             # Unless required by applicable law or agreed to in writing, software
18             # distributed under the License is distributed on an "AS IS" BASIS,
19             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20             # See the License for the specific language governing permissions and
21             # limitations under the License.
22              
23             package OpenSearch::Client::Role::CxnPool;
24             $OpenSearch::Client::Role::CxnPool::VERSION = '3.007004';
25 55     55   26249 use Moo::Role;
  55         155  
  55         318  
26 55     55   22468 use OpenSearch::Client::Util qw(parse_params);
  55         338  
  55         484  
27 55     55   12997 use List::Util qw(shuffle);
  55         132  
  55         3618  
28 55     55   21553 use IO::Select();
  55         78960  
  55         1594  
29 55     55   296 use Time::HiRes qw(time sleep);
  55         92  
  55         487  
30 55     55   3804 use OpenSearch::Client::Util qw(to_list);
  55         79  
  55         377  
31 55     55   11765 use namespace::clean;
  55         116  
  55         317  
32              
33             requires qw(next_cxn schedule_check);
34              
35             has 'cxn_factory' => ( is => 'ro', required => 1 );
36             has 'logger' => ( is => 'ro', required => 1 );
37             has 'serializer' => ( is => 'ro', required => 1 );
38             has 'current_cxn_num' => ( is => 'rwp', default => 0 );
39             has 'cxns' => ( is => 'rwp', default => sub { [] } );
40             has 'seed_nodes' => ( is => 'ro', required => 1 );
41             has 'retries' => ( is => 'rw', default => 0 );
42             has 'randomize_cxns' => ( is => 'ro', default => 1 );
43              
44             #===================================
45             around BUILDARGS => sub {
46             #===================================
47             my $orig = shift;
48             my $params = $orig->(@_);
49             my @seed = grep {$_} to_list( delete $params->{nodes} || ('') );
50              
51             @seed = $params->{cxn_factory}->default_host
52             unless @seed;
53             $params->{seed_nodes} = \@seed;
54             return $params;
55             };
56              
57             #===================================
58             sub next_cxn_num {
59             #===================================
60 200     200 1 262 my $self = shift;
61 200         283 my $cxns = $self->cxns;
62 200 50       364 return unless @$cxns;
63 200         352 my $current = $self->current_cxn_num;
64 200         433 $self->_set_current_cxn_num( ( $current + 1 ) % @$cxns );
65 200         366 return $current;
66             }
67              
68             #===================================
69             sub set_cxns {
70             #===================================
71 112     112 1 154 my $self = shift;
72 112         279 my $factory = $self->cxn_factory;
73 112         283 my @cxns = map { $factory->new_cxn($_) } @_;
  155         33983  
74 112 100       13152 @cxns = shuffle @cxns if $self->randomize_cxns;
75 112         383 $self->_set_cxns( \@cxns );
76 112         309 $self->_set_current_cxn_num(0);
77              
78             $self->logger->infof( "Current cxns: %s",
79 112         305 [ map { $_->stringify } @cxns ] );
  155         591  
80              
81 112         52397 return;
82             }
83              
84             #===================================
85             sub request_ok {
86             #===================================
87 114     114 1 327 my ( $self, $cxn ) = @_;
88 114         313 $cxn->mark_live;
89 114         894 $self->reset_retries;
90             }
91              
92             #===================================
93             sub request_failed {
94             #===================================
95 46     46 1 94 my ( $self, $cxn, $error ) = @_;
96              
97 46 100       194 if ( $error->is( 'Cxn', 'Timeout' ) ) {
98 33 100       104 $cxn->mark_dead if $self->should_mark_dead($error);
99 33         2207 $self->schedule_check;
100              
101 33 100       90 if ( $self->should_retry($error) ) {
102 25         201 my $retries = $self->retries( $self->retries + 1 );
103 25 100       65 return 1 if $retries < $self->_max_retries;
104             }
105             }
106             else {
107 13 100       31 $cxn->mark_live if $cxn;
108             }
109 27         120 $self->reset_retries;
110 27         69 return 0;
111             }
112              
113             #===================================
114             sub should_retry {
115             #===================================
116 33     33 1 67 my ( $self, $error ) = @_;
117 33         140 return $error->is('Cxn');
118             }
119              
120             #===================================
121             sub should_mark_dead {
122             #===================================
123 21     21 1 37 my ( $self, $error ) = @_;
124 21         46 return $error->is('Cxn');
125             }
126              
127             #===================================
128             sub cxns_str {
129             #===================================
130 7     7 1 12 my $self = shift;
131 7         12 join ", ", map { $_->stringify } @{ $self->cxns };
  12         60  
  7         18  
132             }
133              
134             #===================================
135             sub cxns_seeds_str {
136             #===================================
137 5     5 1 8 my $self = shift;
138 4         14 join ", ", ( map { $_->stringify } @{ $self->cxns } ),
  5         11  
139 5         8 @{ $self->seed_nodes };
  5         31  
140             }
141              
142             #===================================
143 141     141 1 1035 sub reset_retries { shift->retries(0) }
144 14     14   56 sub _max_retries {2}
145             #===================================
146              
147             1;
148              
149             __END__