File Coverage

blib/lib/Search/Elasticsearch/CxnPool/Async/Sniff.pm
Criterion Covered Total %
statement 77 78 98.7
branch 20 22 90.9
condition 11 11 100.0
subroutine 10 10 100.0
pod 2 2 100.0
total 120 123 97.5


line stmt bran cond sub pod time code
1             # Licensed to Elasticsearch B.V. under one or more contributor
2             # license agreements. See the NOTICE file distributed with
3             # this work for additional information regarding copyright
4             # ownership. Elasticsearch B.V. licenses this file to you under
5             # the Apache License, Version 2.0 (the "License"); you may
6             # not use this file except in compliance with the License.
7             # You may obtain a copy of the License at
8             #
9             # http://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             package Search::Elasticsearch::CxnPool::Async::Sniff;
19             $Search::Elasticsearch::CxnPool::Async::Sniff::VERSION = '7.717';
20 11     11   481283 use Moo;
  11         23  
  11         55  
21             with 'Search::Elasticsearch::Role::CxnPool::Sniff',
22             'Search::Elasticsearch::Role::Is_Async';
23              
24 11     11   3177 use Scalar::Util qw(weaken);
  11         21  
  11         649  
25 11     11   76 use Promises qw(deferred);
  11         21  
  11         81  
26 11     11   2706 use Search::Elasticsearch::Util qw(new_error);
  11         20  
  11         77  
27              
28 11     11   2114 use namespace::clean;
  11         20  
  11         63  
29             has 'concurrent_sniff' => ( is => 'rw', default => 4 );
30             has '_current_sniff' => ( is => 'rw', clearer => '_clear_sniff' );
31              
32             #===================================
33             sub next_cxn {
34             #===================================
35 79     79 1 395 my ( $self, $no_sniff ) = @_;
36              
37 29     29   1440 return $self->sniff->then( sub { $self->next_cxn('no_sniff') } )
38 79 100 100     447 if $self->next_sniff <= time() && !$no_sniff;
39              
40 50         102 my $cxns = $self->cxns;
41 50         83 my $total = @$cxns;
42 50         61 my $cxn;
43              
44 50         123 while ( 0 < $total-- ) {
45 49         145 $cxn = $cxns->[ $self->next_cxn_num ];
46 49 100       541 last if $cxn->is_live;
47 4         19 undef $cxn;
48             }
49              
50 50         292 my $deferred = deferred;
51              
52 50 100       492 if ($cxn) {
53 45         99 $deferred->resolve($cxn);
54             }
55             else {
56 5         21 $deferred->reject(
57             new_error(
58             "NoNodes",
59             "No nodes are available: [" . $self->cxns_seeds_str . ']'
60             )
61             );
62             }
63 50         1523 return $deferred->promise;
64             }
65              
66             #===================================
67             sub sniff {
68             #===================================
69 29     29 1 50 my $self = shift;
70              
71 29         48 my $promise;
72 29 50       141 if ( $promise = $self->_current_sniff ) {
73 0         0 return $promise;
74             }
75              
76 29         66 my $deferred = deferred;
77 29         329 my $cxns = $self->cxns;
78 29         47 my $total = @$cxns;
79 29         44 my $done = 0;
80 29         44 my $current = 0;
81 29         41 my $done_seeds = 0;
82 29         77 $promise = $self->_current_sniff( $deferred->promise );
83              
84 29         386 my ( @all, @skipped );
85              
86 29         89 while ( 0 < $total-- ) {
87 27         68 my $cxn = $cxns->[ $self->next_cxn_num ];
88 27 100       281 if ( $cxn->is_dead ) {
89 10         45 push @skipped, $cxn;
90             }
91             else {
92 17         99 push @all, $cxn;
93             }
94             }
95              
96 29         70 push @all, @skipped;
97 29 100       85 unless (@all) {
98 13         43 @all = $self->_seeds_as_cxns;
99 13         25791 $done_seeds++;
100             }
101              
102 29         52 my ( $weak_check_sniff, $cxn );
103             my $check_sniff = sub {
104              
105 54 100   54   3530 return if $done;
106 38         66 my ( $cxn, $nodes ) = @_;
107 38 100 100     146 if ( $nodes && $self->parse_sniff($nodes) ) {
108 24         25656 $done++;
109 24         334 $self->_clear_sniff;
110 24         153 return $deferred->resolve();
111             }
112              
113 14 100 100     73 unless ( @all || $done_seeds++ ) {
114 2         36 $self->logger->info("No live nodes available. Trying seed nodes.");
115 2         65 @all = $self->_seeds_as_cxns;
116             }
117              
118 14 100       892 if ( my $cxn = shift @all ) {
119 9         26 return $cxn->sniff->done($weak_check_sniff);
120             }
121 5 50       20 if ( --$current == 0 ) {
122 5         88 $self->_clear_sniff;
123 5         31 $deferred->resolve();
124             }
125 29         173 };
126 29         112 weaken( $weak_check_sniff = $check_sniff );
127              
128 29         109 for ( 1 .. $self->concurrent_sniff ) {
129 74   100     1539 my $cxn = shift(@all) || last;
130 45         66 $current++;
131 45         159 $cxn->sniff->done($check_sniff);
132             }
133              
134 29         345 return $promise;
135             }
136              
137             #===================================
138             sub _seeds_as_cxns {
139             #===================================
140 15     15   29 my $self = shift;
141 15         49 my $factory = $self->cxn_factory;
142 15         45 return map { $factory->new_cxn($_) } @{ $self->seed_nodes };
  27         89379  
  15         52  
143             }
144              
145             1;
146              
147             # ABSTRACT: An async CxnPool for connecting to a local cluster with a dynamic node list
148              
149             __END__