File Coverage

blib/lib/OpenSearch/Client/Role/CxnPool.pm
Criterion Covered Total %
statement 65 65 100.0
branch 13 14 92.8
condition n/a
subroutine 17 17 100.0
pod 9 9 100.0
total 104 105 99.0


line stmt bran cond sub pod time code
1             # OpenSearch::Client is an unofficial client for OpenSearch.
2             # It is derived from Search::Elasticsearch version 7.714
3             # License details from the original work are contained in the
4             # NOTICE file distributed with this work.
5             #
6             #-----------------------------------------------------------------------
7             # OpenSearch::Client
8             #-----------------------------------------------------------------------
9             # Copyright 2026 Mark Dootson
10             #
11             # Licensed under the Apache License, Version 2.0 (the "License");
12             # you may not use this file except in compliance with the License.
13             # You may obtain a copy of the License at
14             #
15             # http://www.apache.org/licenses/LICENSE-2.0
16             #
17             # Unless required by applicable law or agreed to in writing, software
18             # distributed under the License is distributed on an "AS IS" BASIS,
19             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20             # See the License for the specific language governing permissions and
21             # limitations under the License.
22              
23             package OpenSearch::Client::Role::CxnPool;
24             $OpenSearch::Client::Role::CxnPool::VERSION = '3.007000';
25 55     55   25489 use Moo::Role;
  55         98  
  55         318  
26 55     55   22135 use OpenSearch::Client::Util qw(parse_params);
  55         317  
  55         533  
27 55     55   12357 use List::Util qw(shuffle);
  55         124  
  55         3509  
28 55     55   21415 use IO::Select();
  55         72889  
  55         1457  
29 55     55   313 use Time::HiRes qw(time sleep);
  55         79  
  55         512  
30 55     55   3620 use OpenSearch::Client::Util qw(to_list);
  55         103  
  55         334  
31 55     55   11418 use namespace::clean;
  55         91  
  55         288  
32              
33             requires qw(next_cxn schedule_check);
34              
35             has 'cxn_factory' => ( is => 'ro', required => 1 );
36             has 'logger' => ( is => 'ro', required => 1 );
37             has 'serializer' => ( is => 'ro', required => 1 );
38             has 'current_cxn_num' => ( is => 'rwp', default => 0 );
39             has 'cxns' => ( is => 'rwp', default => sub { [] } );
40             has 'seed_nodes' => ( is => 'ro', required => 1 );
41             has 'retries' => ( is => 'rw', default => 0 );
42             has 'randomize_cxns' => ( is => 'ro', default => 1 );
43              
44             #===================================
45             around BUILDARGS => sub {
46             #===================================
47             my $orig = shift;
48             my $params = $orig->(@_);
49             my @seed = grep {$_} to_list( delete $params->{nodes} || ('') );
50              
51             @seed = $params->{cxn_factory}->default_host
52             unless @seed;
53             $params->{seed_nodes} = \@seed;
54             return $params;
55             };
56              
57             #===================================
58             sub next_cxn_num {
59             #===================================
60 200     200 1 274 my $self = shift;
61 200         295 my $cxns = $self->cxns;
62 200 50       387 return unless @$cxns;
63 200         332 my $current = $self->current_cxn_num;
64 200         504 $self->_set_current_cxn_num( ( $current + 1 ) % @$cxns );
65 200         377 return $current;
66             }
67              
68             #===================================
69             sub set_cxns {
70             #===================================
71 112     112 1 188 my $self = shift;
72 112         221 my $factory = $self->cxn_factory;
73 112         386 my @cxns = map { $factory->new_cxn($_) } @_;
  155         32094  
74 112 100       12836 @cxns = shuffle @cxns if $self->randomize_cxns;
75 112         401 $self->_set_cxns( \@cxns );
76 112         288 $self->_set_current_cxn_num(0);
77              
78             $self->logger->infof( "Current cxns: %s",
79 112         276 [ map { $_->stringify } @cxns ] );
  155         587  
80              
81 112         48682 return;
82             }
83              
84             #===================================
85             sub request_ok {
86             #===================================
87 114     114 1 246 my ( $self, $cxn ) = @_;
88 114         413 $cxn->mark_live;
89 114         844 $self->reset_retries;
90             }
91              
92             #===================================
93             sub request_failed {
94             #===================================
95 46     46 1 90 my ( $self, $cxn, $error ) = @_;
96              
97 46 100       169 if ( $error->is( 'Cxn', 'Timeout' ) ) {
98 33 100       93 $cxn->mark_dead if $self->should_mark_dead($error);
99 33         2118 $self->schedule_check;
100              
101 33 100       116 if ( $self->should_retry($error) ) {
102 25         84 my $retries = $self->retries( $self->retries + 1 );
103 25 100       96 return 1 if $retries < $self->_max_retries;
104             }
105             }
106             else {
107 13 100       28 $cxn->mark_live if $cxn;
108             }
109 27         115 $self->reset_retries;
110 27         69 return 0;
111             }
112              
113             #===================================
114             sub should_retry {
115             #===================================
116 33     33 1 62 my ( $self, $error ) = @_;
117 33         87 return $error->is('Cxn');
118             }
119              
120             #===================================
121             sub should_mark_dead {
122             #===================================
123 21     21 1 35 my ( $self, $error ) = @_;
124 21         40 return $error->is('Cxn');
125             }
126              
127             #===================================
128             sub cxns_str {
129             #===================================
130 7     7 1 10 my $self = shift;
131 7         11 join ", ", map { $_->stringify } @{ $self->cxns };
  12         40  
  7         35  
132             }
133              
134             #===================================
135             sub cxns_seeds_str {
136             #===================================
137 5     5 1 6 my $self = shift;
138 4         13 join ", ", ( map { $_->stringify } @{ $self->cxns } ),
  5         12  
139 5         8 @{ $self->seed_nodes };
  5         33  
140             }
141              
142             #===================================
143 141     141 1 1019 sub reset_retries { shift->retries(0) }
144 14     14   54 sub _max_retries {2}
145             #===================================
146              
147             1;
148              
149             __END__