File Coverage

blib/lib/Mojar/Mysql/Replication.pm
Criterion Covered Total %
statement 12 186 6.4
branch 0 116 0.0
condition 0 83 0.0
subroutine 4 22 18.1
pod 0 17 0.0
total 16 424 3.7


line stmt bran cond sub pod time code
1             package Mojar::Mysql::Replication;
2 1     1   1539 use Mojo::Base -base;
  1         2  
  1         9  
3              
4             our $VERSION = 0.034;
5              
6 1     1   191 use Carp 'croak';
  1         2  
  1         54  
7 1     1   7 use Mojar::Util qw(as_bool dumper lc_keys);
  1         2  
  1         49  
8 1     1   372 use Mojar::Log;
  1         8949  
  1         7  
9              
10             # Attributes
11              
12             # Time
13             has stop_io_run_time => 2;
14             has stop_sql_max_wait => 55;
15              
16             # Behaviour
17             has safety => 'medium_safety';
18              
19             # Accessors
20             has 'connector';
21             has log => sub { Mojar::Log->new(pattern => '%y%m%d %H%M%S') };
22              
23             # Public methods
24              
25             sub stop_io {
26 0     0 0   my $self = shift;
27             eval {
28 0           $self->connector->connection->do(q{STOP SLAVE IO_THREAD});
29 0           1;
30             }
31 0 0         or do {
32 0   0       my $e = $@ // 'unknown failure';
33 0 0         if ($e =~ / failed: This operation requires a running slave/) {
34 0 0         $self->log->warn('stop_io: Was not a running slave') if $self->log;
35 0           return undef;
36             }
37             else {
38 0 0         $self->log->error("stop_io: $e") if $self->log;
39 0           croak "Failed to stop io_thread: $e";
40             }
41             };
42 0           return $self;
43             }
44              
45             sub start_io {
46 0     0 0   my $self = shift;
47             eval {
48 0           $self->connector->connection->do(q{START SLAVE IO_THREAD});
49 0           1;
50             }
51 0 0         or do {
52 0   0       my $e = $@ // 'unknown failure';
53 0 0         if ($e =~
54             / failed: This operation cannot be performed with a running slave/) {
55 0 0         $self->log->warn('start_io: Was already a running slave') if $self->log;
56 0           return undef;
57             }
58             else {
59 0 0         $self->log->error("start_io: $e") if $self->log;
60 0           croak "Failed to start io_thread: $e";
61             }
62             };
63 0           return $self;
64             }
65              
66             sub stop_sql {
67 0     0 0   my $self = shift;
68             eval {
69 0           $self->connector->connection->do(q{STOP SLAVE SQL_THREAD});
70 0           1;
71             }
72 0 0         or do {
73 0   0       my $e = $@ // '';
74 0 0         if ($e =~ / failed: This operation requires a running slave/) {
75 0 0         $self->log->warn(q{stop_sql: Was not a running slave}) if $self->log;
76 0           return undef;
77             }
78             else {
79 0 0         $self->log->error("stop_sql: $e") if $self->log;
80 0           croak "Failed to stop sql_thread: $e";
81             }
82             };
83 0           return $self;
84             }
85              
86             sub start_sql {
87 0     0 0   my $self = shift;
88             eval {
89 0           $self->connector->connection->do(q{START SLAVE SQL_THREAD});
90 0           1;
91             }
92 0 0         or do {
93 0   0       my $e = $@ // '';
94 0 0         if ($e =~
95             / failed: This operation cannot be performed with a running slave/) {
96 0 0         $self->log->warn(q{start_sql: Was already a running slave}) if $self->log;
97 0           return undef;
98             }
99             else {
100 0 0         $self->log->error("start_sql: $e") if $self->log;
101 0           croak "Failed to start sql_thread: $e";
102             }
103             };
104 0           return $self;
105             }
106              
107             sub sql_thread_waiting {
108 0     0 0   my $self = shift;
109 0           my ($is_waiting, $repl_found, $threads);
110             eval {
111 0           $threads = $self->connector->connection->threads;
112             }
113 0 0         or do {
114 0   0       my $e = $@ // 'unknown failure';
115 0 0         $self->log->error("sql_thread_waiting: $e") if $self->log;
116 0           croak "Problem found trying to check sql_thread\n$e";
117             };
118 0           for my $t (grep {$_->{user} eq 'system user'} @$threads) {
  0            
119 0           ++$repl_found;
120             $is_waiting = 1 if $t->{state}
121 0 0 0       and $t->{state} =~ /^(?:Slave has|Has) read all relay log/;
122             }
123 0 0         croak 'Replication does not appear to be running' unless $repl_found;
124 0           return $is_waiting;
125             }
126              
127             sub sql_thread_aligned {
128 0     0 0   my $self = shift;
129              
130 0           my $status = $self->status;
131 0 0         croak q{Not configured as a slave} unless $status->{master_host};
132              
133             return ($status->{master_log_file} eq $status->{relay_master_log_file}
134 0   0       and $status->{read_master_log_pos} == $status->{exec_master_log_pos});
135             }
136              
137             sub status {
138 0     0 0   my $self = shift;
139 0           my $status;
140             eval {
141 0   0       $status = lc_keys($self->connector->connection->selectrow_hashref(
142             q{SHOW SLAVE STATUS}) || {});
143             }
144 0 0         or do {
145 0   0       my $e = $@ // 'unknown failure';
146 0 0         $self->log->error("status: $e") if $self->log;
147 0           croak 'Failed to get replication status';
148             };
149 0           return $status;
150             }
151              
152             sub stop {
153 0     0 0   my ($self, $safety) = @_;
154 0   0       $safety //= $self->safety;
155             #TODO: Check whether has privs to see system_user threads
156              
157             # max_safety == outside_transaction
158             # medium_safety == trust_transaction
159             # low_safety == time_limited
160             # no_safety == emergency
161 0 0 0       if ($safety eq 'no_safety' or $safety eq 'emergency') {
    0 0        
    0 0        
    0 0        
162 0           $self->stop_io;
163 0           $self->stop_sql;
164             }
165             elsif ($safety eq 'max_safety' or $safety eq 'outside_transaction') {
166 0           $self->stop_io;
167 0 0         if ($self->status->{slave_sql_running} =~ /^No/i) {
168 0           $self->start_sql;
169 0           sleep 1;
170             }
171 0           sleep 1 while not $self->sql_thread_waiting;
172              
173 0           while (not $self->sql_thread_aligned) {
174 0           $self->start_io;
175 0           sleep $self->stop_io_run_time;
176 0           $self->stop_io;
177 0           sleep 1 while not $self->sql_thread_waiting;
178             }
179 0           $self->stop_sql;
180             }
181             elsif ($safety eq 'medium_safety' or $safety eq 'trust_transaction') {
182 0           $self->stop_io;
183 0 0         return $self unless $self->status->{slave_sql_running} =~ /^Yes/i;
184 0           sleep 1 while not $self->sql_thread_waiting;
185 0           $self->stop_sql;
186             }
187             elsif ($safety eq 'low_safety' or $safety eq 'time_limited') {
188 0           $self->stop_io;
189 0 0         return $self unless $self->status->{slave_sql_running} =~ /^Yes/i;
190 0           my $waited = 0;
191 0   0       sleep 1 while not $self->sql_thread_waiting
192             and $waited++ < $self->stop_sql_max_wait;
193 0           $self->stop_sql;
194             }
195             else {
196 0           croak "Unrecognised safety level ($safety)";
197             }
198 0           return $self;
199             }
200              
201             sub io_run_time {
202 0     0 0   my $self = shift;
203 0           my ($thread, undef) = $self->system_threads;
204 0 0         unless (defined $thread) {
205 0           sleep 5;
206 0           ($thread, undef) = $self->system_threads;
207             }
208 0 0         return undef unless defined $thread;
209 0 0         return $thread->{time} < 4_000_000_000 ? $thread->{time} : 0;
210             }
211              
212             sub sql_lag {
213 0     0 0   my $self = shift;
214 0           my (undef, $thread) = $self->system_threads;
215 0 0         unless (defined $thread) {
216 0           sleep 5;
217 0           (undef, $thread) = $self->system_threads;
218             }
219 0 0         return undef unless defined $thread;
220 0 0         return $thread->{time} < 4_000_000_000 ? $thread->{time} : 0;
221             }
222              
223             sub system_threads {
224 0     0 0   my ($self, $threads) = @_;
225 0           my ($io_thread, $sql_thread, @candidates);
226             $_->{user} eq 'system user' and push @candidates, $_
227 0   0       for @{$threads // $self->connector->connection->threads};
  0   0        
228 0 0         return (undef, undef) unless @candidates;
229              
230             # The following assumes there are at most two system threads
231             # Examine first candidate
232 0           my $hint = $self->_system_thread_spotter($candidates[0]);
233 0 0         if ($hint == 1) {
    0          
    0          
234 0           $io_thread = $candidates[0];
235 0 0         $sql_thread = $candidates[1] if @candidates > 1;
236 0           return ($io_thread, $sql_thread);
237             }
238             elsif ($hint == 2) {
239 0           $sql_thread = $candidates[0];
240 0 0         $io_thread = $candidates[1] if @candidates > 1;
241 0           return ($io_thread, $sql_thread);
242             }
243             elsif (@candidates == 1) {
244 0 0         $self->log->error("Failed to identify thread\n". dumper $candidates[0])
245             if $self->log;
246 0           return undef;
247             }
248              
249             # Examine second candidate
250 0           $hint = $self->_system_thread_spotter($candidates[1]);
251 0 0         if ($hint == 1) {
    0          
252 0           return ($io_thread, $sql_thread) = ($candidates[1], $candidates[0]);
253             }
254             elsif ($hint == 2) {
255 0           return ($io_thread, $sql_thread) = @candidates;
256             }
257 0 0         $self->log->error("Failed to identify thread\n". dumper $candidates[1])
258             if $self->log;
259 0           return undef;
260             }
261              
262             sub active_threads {
263 0     0 0   my ($self, $threads) = @_;
264 0   0       $threads //= $self->connector->connection->threads;
265 0           my @candidates;
266             $_->{user} and $_->{user} ne 'system user'
267             and $_->{state} and $_->{state} ne 'Sleep'
268 0   0       and push @candidates, $_ for @$threads;
      0        
      0        
      0        
269 0           return \@candidates;
270             }
271              
272             sub purge_binary_logs {
273 0     0 0   my ($self, $master, $slaves, $keep) = @_;
274 0           for my $c ($master, @$slaves) {
275 0 0 0       croak 'Requires database connectors'
      0        
276             unless defined $c and ref $c and $c->can('connection');
277             }
278              
279             # Check the master
280 0           my $master_dbh = $master->connection;
281 0           my $logs = $master_dbh->selectall_arrayref_hashrefs(
282             q{SHOW MASTER LOGS}
283             );
284 0 0         return 0 unless @$logs;
285              
286             # Check each of the slaves
287 0           my $required = $logs->[-1]{log_name};
288 0           for my $s (@$slaves) {
289 0           my $r = $self->new(connector => $s);
290             my $referenced = $r->status->{master_log_file}
291 0 0         or croak 'Failed to interpret slave status';
292 0 0         $required = $referenced if $referenced lt $required;
293             }
294             # $required is the oldest bin log required
295              
296             # Check it is still available
297 0           my $required_index = -1;
298 0           for (my $i = 0; $i < @$logs; ++$i) {
299 0 0 0       $required_index = $i and last if $logs->[$i]{log_name} eq $required;
300             }
301 0 0         die "Required binary log not available ($required)"
302             unless $required_index >= 0;
303              
304 0           $required_index = $#{$logs} - $keep
305 0 0 0       if $keep and $#{$logs} - $keep < $required_index;
  0            
306              
307 0           die sprintf 'Purging to %s', $logs->[$required_index]{log_name};
308             }
309              
310             sub incr_skip_counter {
311 0     0 0   my ($self, $count) = @_;
312 0   0       $count //= 1;
313 0           return $self->connector->connection->global_var(
314             sql_slave_skip_counter => $count);
315             }
316              
317             sub incr_heartbeat {
318 0   0 0 0   my ($self, $schema, $table, $column, $where) = @_; $where //= '1';
  0            
319 0           $self->connector->connection->do(sprintf
320             q{UPDATE `%s`.`%s`
321             SET `%s` = `%s` + 1
322             WHERE %s},
323             $schema, $table,
324             $column, $column,
325             $where
326             );
327             }
328              
329             sub repair {
330 0     0 0   my ($self, $error, $repair_map) = @_;
331 0           my $dbh = $self->connector->connection;
332             # ...
333 0           return !! $self->start_sql;
334             }
335              
336             sub errored {
337 0     0 0   my ($self, $status) = @_;
338 0   0       $status //= $self->status;
339             return ($status->{last_error} // sprintf 'Error: %u', $status->{last_errno})
340 0 0 0       if not as_bool($status->{slave_sql_running}) and $status->{last_errno};
      0        
341 0           return undef;
342             }
343              
344             sub _system_thread_spotter {
345 0     0     my ($self, $thread) = @_;
346             # 1: $thread is the io_thread
347             # 2: $thread is the sql_thread
348             # 0: failed to determine
349 0 0         return 0 unless ref $thread eq 'HASH';
350 0 0         return 2 if defined $thread->{db};
351 0 0         return 1 if $thread->{state} =~ /^Waiting for master to send event/;
352 0 0         return 2 if $thread->{state} =~ m{
353             ^Waiting\ for\ the\ next\ event
354             | ^Reading\ event\ from\ the\ relay
355             | ^Making\ temp\ file
356             | ^Has\ read\ all\ relay\ log;\ waiting\ for\ the
357             | ^Slave\ has\ read\ all\ relay\ log;\ waiting\ for\ the
358             | freeing\ items
359             }x;
360 0           return 0; # Don't know
361             }
362              
363             1;
364             __END__