File Coverage

blib/lib/MongoDB/Role/_TopologyMonitoring.pm
Criterion Covered Total %
statement 72 85 84.7
branch 37 42 88.1
condition 3 7 42.8
subroutine 14 17 82.3
pod 0 9 0.0
total 126 160 78.7


line stmt bran cond sub pod time code
1             # Copyright 2018 - present MongoDB, Inc.
2             #
3             # Licensed under the Apache License, Version 2.0 (the "License");
4             # you may not use this file except in compliance with the License.
5             # You may obtain a copy of the License at
6             #
7             # http://www.apache.org/licenses/LICENSE-2.0
8             #
9             # Unless required by applicable law or agreed to in writing, software
10             # distributed under the License is distributed on an "AS IS" BASIS,
11             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12             # See the License for the specific language governing permissions and
13             # limitations under the License.
14              
15 59     59   40377 use strict;
  59         168  
  59         2230  
16 59     59   358 use warnings;
  59         160  
  59         2740  
17              
18             package MongoDB::Role::_TopologyMonitoring;
19              
20             # MongoDB role to add topology monitoring support
21              
22 59     59   352 use version;
  59         148  
  59         488  
23             our $VERSION = 'v2.2.0';
24              
25 59     59   5266 use Moo::Role;
  59         173  
  59         505  
26 59     59   22992 use namespace::clean;
  59         172  
  59         426  
27              
28             # These are used to cache data
29             has old_topology_desc => ( is => 'rw' );
30             has old_server_desc => ( is => 'rw' );
31              
32             sub publish_topology_opening {
33 72     72 0 139 my $self = shift;
34              
35 72         271 my $event = {
36             topologyId => "$self",
37             type => "topology_opening_event"
38             };
39              
40 72         123 eval { $self->monitoring_callback->($event) };
  72         216  
41             }
42              
43             sub publish_topology_closing {
44 72     72 0 130 my $self = shift;
45              
46 72         295 my $event = {
47             topologyId => "$self",
48             type => "topology_closed_event"
49             };
50              
51 72         147 eval { $self->monitoring_callback->($event) };
  72         269  
52             }
53              
54             sub publish_server_opening {
55 153     153 0 268 my ( $self, $address ) = @_;
56              
57 153         555 my $event = {
58             topologyId => "$self",
59             address => $address,
60             type => "server_opening_event"
61             };
62              
63 153         567 eval { $self->monitoring_callback->($event) };
  153         392  
64             }
65              
66             sub publish_server_closing {
67 39     39 0 110 my ( $self, $address ) = @_;
68              
69 39         159 my $event = {
70             topologyId => "$self",
71             address => $address,
72             type => "server_closed_event"
73             };
74              
75 39         69 eval { $self->monitoring_callback->($event) };
  39         163  
76             }
77              
78             sub publish_server_heartbeat_started {
79 0     0 0 0 my ($self, $link) = @_;
80              
81 0         0 my $event = {
82             connectionId => $link->address,
83             type => "server_heartbeat_started_event"
84             };
85              
86 0         0 eval { $self->monitoring_callback->($event) };
  0         0  
87             }
88              
89             sub publish_server_heartbeat_succeeded {
90 0     0 0 0 my ($self, $link, $rtt_sec_fail, $is_master) = @_;
91              
92 0         0 my $event = {
93             duration => $rtt_sec_fail,
94             reply => $is_master,
95             connectionId => $link->address,
96             type => "server_heartbeat_succeeded_event"
97             };
98              
99 0         0 eval { $self->monitoring_callback->($event) };
  0         0  
100             }
101              
102             sub publish_server_heartbeat_failed {
103 0     0 0 0 my ($self, $link, $rtt_sec_fail, $e) = @_;
104              
105 0         0 my $event = {
106             duration => $rtt_sec_fail,
107             failure => $e,
108             connectionId => $link->address,
109             type => "server_heartbeat_failed_event"
110             };
111              
112 0         0 eval { $self->monitoring_callback->($event) };
  0         0  
113             }
114              
115             sub __create_server_description {
116 940     940   1401 my ($self, $server) = @_;
117              
118 940         1222 my $server_desc;
119              
120 940 50       1909 if (defined $server->is_master) {
121             $server_desc = {
122             address => $server->address,
123             error => $server->error,
124             roundTripTime => $server->rtt_sec,
125             lastWriteDate => $server->is_master->{lastWrite}->{lastWriteDate},
126             opTime => $server->is_master->{opTime},
127             type => $server->type || "Unknown",
128             minWireVersion => $server->is_master->{min_wire_version},
129             maxWireVersion => $server->is_master->{max_wire_version},
130             me => $server->me,
131             arbiters => $server->arbiters,
132             hosts => $server->hosts,
133             passives => $server->passives,
134             (defined $server->is_master->{tags} ?
135             (tags => $server->is_master->{tags}) : ()),
136             ($server->primary ne "" ? (primary => $server->primary) : ()),
137             (defined $server->is_master->{setName} ?
138             (setName => $server->is_master->{setName}) : ()),
139             (defined $server->is_master->{setVersion} ?
140             (setVersion => $server->is_master->{setVersion}) : ()),
141             (defined $server->is_master->{electionId} ?
142             (electionId => $server->is_master->{electionId}) : ()),
143             (defined $server->is_master->{logicalSessionTimeoutMinutes} ?
144             (logicalSessionTimeoutMinutes =>
145 940 50 50     14741 $server->is_master->{logicalSessionTimeoutMinutes}) : ()),
    100          
    100          
    100          
    100          
    100          
146             };
147             } else {
148 0 0 0     0 $server_desc = {
149             address => $server->address,
150             error => $server->error,
151             roundTripTime => $server->rtt_sec,
152             type => $server->type || "Unknown",
153             me => $server->me,
154             arbiters => $server->arbiters,
155             hosts => $server->hosts,
156             passives => $server->passives,
157             #TODO figure out what tags should be
158             tags => undef,
159             ($server->primary ne "" ? (primary => $server->primary) : ()),
160             };
161             }
162              
163 940         65110 return $server_desc;
164             }
165              
166             sub __has_changed_servers {
167 146     146   292 my ($self, $new_server ) = @_;
168              
169             # Fields considered Server Description equality
170 146         221 my $equal_servers = 1;
171 146         744 my %equality_fields = (
172             address => 1,
173             type => 1,
174             minWireVersion => 1,
175             minWireVersion => 1,
176             me => 1,
177             arbiters => 1,
178             hosts => 1,
179             passives => 1,
180             tags => 1,
181             primary => 1,
182             setName => 1,
183             setVersion => 1,
184             electionId => 1,
185             logicalSessionTimeoutMinutes => 1,
186             );
187 146         269 my $new_server_desc = $self->__create_server_description($new_server);
188              
189 146         200 my %oldhash = %{$self->old_server_desc};
  146         1031  
190 146         293 my %newhash = %{$new_server_desc};
  146         735  
191              
192 146         506 foreach my $key (keys %newhash) {
193 474 100       829 if (exists($equality_fields{$key})) {
194 291 100 66     1007 if (!exists($oldhash{$key})) {
    100          
    100          
195 30         48 $equal_servers = 0;
196 30         45 last;
197             } elsif (!defined($newhash{$key}) &&
198             !defined($oldhash{$key})) {
199 38         81 next;
200             } elsif ($newhash{$key} ne $oldhash{$key}) {
201 106         182 $equal_servers = 0;
202 106         195 last;
203             }
204             }
205             }
206              
207 146 100       400 unless ( $equal_servers ) {
208 136         617 my $event_server = {
209             topologyId => "$self",
210             address => $new_server->address,
211             previousDescription => $self->old_server_desc,
212             newDescription => $new_server_desc,
213             type => "server_description_changed_event"
214             };
215              
216 136         212 eval { $self->monitoring_callback->($event_server) };
  136         404  
217             }
218             }
219              
220             sub publish_old_topology_desc {
221 218     218 0 432 my ( $self, $address, $new_server ) = @_;
222              
223 218 100       459 if ( $address ) {
224 146         281 my $server = $self->servers->{$address};
225 146         311 my $old_server = $self->__create_server_description($server);
226 146         529 $self->old_server_desc($old_server);
227             }
228              
229 218 100       474 if ( $new_server ) {
230 146         319 $self->__has_changed_servers($new_server);
231             }
232              
233 218         8620 $self->old_topology_desc( $self->__create_topology_description );
234             }
235              
236             sub publish_new_topology_desc {
237 218     218 0 306 my $self = shift;
238              
239 218         655 my $event_topology = {
240             topologyId => "$self",
241             previousDescription => $self->old_topology_desc,
242             newDescription => $self->__create_topology_description,
243             type => "topology_description_changed_event"
244             };
245              
246 218         3383 eval { $self->monitoring_callback->($event_topology) };
  218         600  
247             }
248              
249             sub __create_topology_description {
250 436     436   820 my ( $self ) = @_;
251              
252 436         1032 my @servers = map { $self->__create_server_description($_) } $self->all_servers;
  648         1298  
253              
254             return {
255 436 100       7141 topologyType => $self->type,
    100          
    100          
    100          
    50          
    100          
256             ( $self->replica_set_name ne ""
257             ? ( setName => $self->replica_set_name )
258             : ()
259             ),
260             ( defined $self->max_set_version
261             ? ( maxSetVersion => $self->max_set_version )
262             : ()
263             ),
264             ( defined $self->max_election_id
265             ? ( maxElectionId => $self->max_election_id )
266             : ()
267             ),
268             servers => \@servers,
269             stale => $self->stale,
270             ( defined $self->is_compatible
271             ? ( compatible => $self->is_compatible )
272             : ()
273             ),
274             ( defined $self->compatibility_error
275             ? ( compatibilityError => $self->compatibility_error )
276             : ()
277             ),
278             ( defined $self->logical_session_timeout_minutes
279             ? ( logicalSessionTimeoutMinutes => $self->logical_session_timeout_minutes )
280             : ()
281             ),
282             };
283             }
284              
285             1;