| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
package MogileFS::Worker::Replicate; |
|
2
|
|
|
|
|
|
|
# replicates files around |
|
3
|
|
|
|
|
|
|
|
|
4
|
21
|
|
|
21
|
|
93
|
use strict; |
|
|
21
|
|
|
|
|
32
|
|
|
|
21
|
|
|
|
|
752
|
|
|
5
|
21
|
|
|
21
|
|
89
|
use base 'MogileFS::Worker'; |
|
|
21
|
|
|
|
|
29
|
|
|
|
21
|
|
|
|
|
2435
|
|
|
6
|
|
|
|
|
|
|
use fields ( |
|
7
|
21
|
|
|
|
|
163
|
'fidtodo', # hashref { fid => 1 } |
|
8
|
21
|
|
|
21
|
|
121
|
); |
|
|
21
|
|
|
|
|
36
|
|
|
9
|
|
|
|
|
|
|
|
|
10
|
21
|
|
|
21
|
|
1145
|
use List::Util (); |
|
|
21
|
|
|
|
|
32
|
|
|
|
21
|
|
|
|
|
258
|
|
|
11
|
21
|
|
|
21
|
|
78
|
use MogileFS::Server; |
|
|
21
|
|
|
|
|
31
|
|
|
|
21
|
|
|
|
|
396
|
|
|
12
|
21
|
|
|
21
|
|
77
|
use MogileFS::Util qw(error every debug); |
|
|
21
|
|
|
|
|
31
|
|
|
|
21
|
|
|
|
|
1053
|
|
|
13
|
21
|
|
|
21
|
|
101
|
use MogileFS::Config; |
|
|
21
|
|
|
|
|
29
|
|
|
|
21
|
|
|
|
|
2621
|
|
|
14
|
21
|
|
|
21
|
|
6802
|
use MogileFS::ReplicationRequest qw(rr_upgrade); |
|
|
21
|
|
|
|
|
43
|
|
|
|
21
|
|
|
|
|
1473
|
|
|
15
|
21
|
|
|
21
|
|
108
|
use Digest; |
|
|
21
|
|
|
|
|
34
|
|
|
|
21
|
|
|
|
|
448
|
|
|
16
|
21
|
|
|
21
|
|
10774
|
use MIME::Base64 qw(encode_base64); |
|
|
21
|
|
|
|
|
13290
|
|
|
|
21
|
|
|
|
|
79674
|
|
|
17
|
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
# Constructor.
#
# Arguments: the class name and $psock, which is handed unchanged to the
# parent class constructor (MogileFS::Worker, per 'use base' above).
# Returns the new object, whose 'fidtodo' field starts as an empty hashref.
sub new {
    my $class = shift;
    my $psock = shift;

    # fields::new gives us the restricted hash declared by 'use fields'.
    my $self = fields::new($class);
    $self->SUPER::new($psock);

    $self->{fidtodo} = {};

    return $self;
}
|
25
|
|
|
|
|
|
|
|
|
26
|
|
|
|
|
|
|
# Watchdog timeout requested by the replicator: 90 (presumably seconds --
# the unit is defined by whoever calls this; confirm against the caller).
sub watchdog_timeout {
    return 90;
}
|
28
|
|
|
|
|
|
|
|
|
29
|
|
|
|
|
|
|
# Main loop of the replication worker.
#
# Hands every() a callback (with an interval argument of 1.0) which:
#   1. tells the parent process we are bored and want work from the
#      'replicate' and 'rebalance' queues,
#   2. processes every queued 'replicate' item through
#      replicate_using_torepl_table(),
#   3. processes every queued 'rebalance' item through rebalance_devfid()
#      and then removes it from the rebalance queue regardless of outcome.
sub work {
    my $self = shift;

    every(1.0, sub {
        $self->send_to_parent("worker_bored 100 replicate rebalance");

        my $repl_queue  = $self->queue_todo('replicate');
        my $rebal_queue = $self->queue_todo('rebalance');

        # Nothing queued for either job type: let every() sleep as usual.
        return unless @$repl_queue || @$rebal_queue;

        return unless $self->validate_dbh;
        my $sto = Mgd::get_store();

        while (my $todo = shift @$repl_queue) {
            $self->replicate_using_torepl_table($todo);
        }

        while (my $todo = shift @$rebal_queue) {
            $self->still_alive;

            # 'arg' arrives serialized as a comma-separated list of devids;
            # turn it back into an arrayref.
            $todo->{arg} = [ split /,/, $todo->{arg} ];

            my $devfid = MogileFS::DevFID->new($todo->{devid}, $todo->{fid});
            $self->rebalance_devfid($devfid, { target_devids => $todo->{arg} });

            # If files error out, we want to send the error up to syslog
            # and make a real effort to chew through the queue.  Users may
            # manually re-run rebalance to retry, so the queue entry is
            # removed whether or not the rebalance succeeded.
            $sto->delete_fid_from_file_to_queue($todo->{fid}, REBAL_QUEUE);
        }

        # We did some work, so call the code ref every() passed us with 0:
        # don't sleep before the next pass.
        $_[0]->(0);
    });
}
|
63
|
|
|
|
|
|
|
|
|
64
|
|
|
|
|
|
|
# Replicate one fid described by a queue entry.
#
# $todo is a hashref; this sub reads its 'fid', 'fromdevid' (optional
# source device to prefer) and 'failcount' keys.
#
# Always returns 1: by the time we return, the fid has either been dealt
# with, skipped because another process holds the lock, or rescheduled
# for a later attempt.
sub replicate_using_torepl_table {
    my ($self, $todo) = @_;

    my $sto = Mgd::get_store();

    my $fid = $todo->{fid};
    $self->still_alive;

    my $errcode;

    my %opts = (
        errref    => \$errcode,
        no_unlock => 1,          # to make it return an $unlock subref
    );
    $opts{source_devid} = $todo->{fromdevid} if $todo->{fromdevid};

    my ($status, $unlock) = replicate($fid, %opts);

    if ($status) {
        # Per replicate()'s documented return values, a true $status is one
        # of 1 (success), "lost_race" (we did no work, policy was already
        # met) or "nofid" (the fid no longer exists).  0 is failure and is
        # handled below.
        #
        # When $status eq "lost_race", this delete is normally unnecessary
        # (somebody else presumably already deleted the row if they also
        # replicated it), but in the case of running with old replicators
        # from previous versions, -or- simply if the other guy's delete
        # failed, this cleans it up.
        $sto->delete_fid_from_file_to_replicate($fid);
        $unlock->() if $unlock;
        return 1;
    }

    debug("Replication of fid=$fid failed with errcode=$errcode") if $Mgd::DEBUG >= 2;

    # ERROR CASES:
    #
    # README: please keep this up to date if you update the replicate()
    # function so we ensure that this code always does the right thing.
    #
    # -- HARMLESS --
    # failed_getting_lock        => harmless. skip. somebody else probably doing.
    #
    # -- ACTIONABLE --
    # too_happy                  => too many copies, attempt to rebalance.
    #
    # -- TEMPORARY; DO EXPONENTIAL BACKOFF --
    # source_down                => only source available is observed down.
    # policy_error_doing_failed  => policy plugin misbehaved: it's looping.
    # policy_error_already_there => policy plugin misbehaved: suggested a
    #                               device the fid is already on.
    # policy_no_suggestions      => no copy was attempted. policy is just not happy.
    # copy_error                 => policy said to do 1+ things, we failed,
    #                               it ran out of suggestions.
    #
    # -- FATAL; DON'T TRY AGAIN --
    # no_source                  => it simply exists nowhere. not that
    #                               something's down, but file_on is empty.

    # Bail if we failed getting the lock; that means someone else probably
    # already did it, so we should just move on.
    # NOTE(review): $unlock is still invoked here even though the lock was
    # not obtained; this preserves the original behaviour -- confirm that
    # note_done_replicating is harmless in that case.
    if ($errcode eq 'failed_getting_lock') {
        $unlock->() if $unlock;
        return 1;
    }

    # Logic for setting the next try time appropriately.
    #   ('end_of_time', <ignored>) => never retry
    #   ('offset', $seconds)       => retry relative to now
    #   (anything else, $when)     => retry at the absolute time $when
    my $update_nexttry = sub {
        my ($type, $delay) = @_;
        if ($type eq 'end_of_time') {
            # special; update to a time that won't happen again,
            # as we've encountered a scenario in which case we're
            # really hosed
            $sto->reschedule_file_to_replicate_absolute($fid, $sto->end_of_time);
        }
        elsif ($type eq "offset") {
            $sto->reschedule_file_to_replicate_relative($fid, $delay + 0);
        }
        else {
            $sto->reschedule_file_to_replicate_absolute($fid, $delay + 0);
        }
    };

    # Now let's handle any error we want to consider a total failure; do
    # not retry at any point.  Push this file off to the end so someone has
    # to come along and figure out what went wrong.
    if ($errcode eq 'no_source') {
        $update_nexttry->( end_of_time => 1 );
        $unlock->() if $unlock;
        return 1;
    }

    # Try to shake off extra copies.  Fall through to the backoff logic
    # so we don't flood if it's impossible to properly weaken the fid.
    # There's a race where the fid could be checked again, but the
    # exclusive locking prevents replication clobbering.
    if ($errcode eq 'too_happy') {
        # Release our lock first: rebalance_devfid() calls replicate()
        # itself and needs to take the lock.
        $unlock->() if $unlock;
        $unlock = undef;

        my $f    = MogileFS::FID->new($fid);
        my @devs = List::Util::shuffle($f->devids);
        my $devfid;

        # First one we can delete from, we try to rebalance away from.
        for my $devid (@devs) {
            # Skip devids the factory does not know about, as replicate()
            # does, instead of dying on an undef device object.
            my $dev = Mgd::device_factory()->get_by_id($devid)
                or next;

            # Not positive 'should_read_from' needs to be here.
            # We must be able to delete off of this dev so the fid can
            # move.
            if ($dev->can_delete_from && $dev->should_read_from) {
                $devfid = MogileFS::DevFID->new($dev, $f);
                last;
            }
        }
        $self->rebalance_devfid($devfid) if $devfid;
    }

    # At this point, the rest of the errors require exponential backoff.
    # Define what this means as far as failcount -> delay to next try:
    # 15s, 1m, 5m, 30m, 1h, 2h, 4h, 8h, 24h, 24h, 24h, 24h, ...
    # The delay is then multiplied by a random factor in [0.8, 1.2).
    my @backoff = qw( 15 60 300 1800 3600 7200 14400 28800 );
    $update_nexttry->( offset => int(($backoff[$todo->{failcount}] || 86400) * (rand(0.4) + 0.8)) );
    $unlock->() if $unlock;
    return 1;
}
|
186
|
|
|
|
|
|
|
|
|
187
|
|
|
|
|
|
|
# Move a fid off one device: replicate it elsewhere with the given device
# masked out, then delete the copy on that device if it is safe to do so.
#
# Arguments:
#   $devfid - the MogileFS::DevFID (device + fid pair) to move away from.
#   $opts   - optional hashref; recognised keys are:
#               target_devids - passed through to replicate()
#               avoid_devids  - passed through to replicate(), which uses
#                               it as a hashref keyed by devid
#
# Return 1 on success, 0 on failure (the failure is reported via error()).
sub rebalance_devfid {
    my ($self, $devfid, $opts) = @_;
    $opts ||= {};
    MogileFS::Util::okay_args($opts, qw(avoid_devids target_devids));

    my $fid = $devfid->fid;

    # Bail out early if this FID is no longer in the namespace (weird
    # case where file is in file_on because not yet deleted, but
    # has been replaced/deleted in 'file' table...).  Not too harmful
    # (just noisy) if this line didn't exist, but it keeps things cleaner
    # on intentionally-corrupted-for-fsck-testing setups.
    return 1 unless $fid->exists;

    my $errcode;
    my ($ret, $unlock) = replicate(
        $fid,
        mask_devids   => { $devfid->devid => 1 },
        no_unlock     => 1,
        target_devids => $opts->{target_devids},
        avoid_devids  => $opts->{avoid_devids},
        errref        => \$errcode,
    );

    # Release the replication lock, log the failure and yield 0.
    my $fail = sub {
        my $error = shift;
        $unlock->();
        error("Rebalance for $devfid (" . $devfid->url . ") failed: $error");
        return 0;
    };

    # $errcode stays undef when replicate() reports no error code.
    my $too_happy = defined $errcode && $errcode eq "too_happy";

    unless ($ret || $too_happy) {
        return $fail->("Replication failed");
    }

    my $should_delete = 0;
    my $del_reason;

    if ($too_happy || $ret eq "lost_race") {
        # For some reason, we did no work.  That could be because
        # either 1) we lost the race, as the error code implies,
        # and some other process rebalanced this first, or 2)
        # the file is over-replicated, and everybody just thinks they
        # lost the race because the replication policy said there's
        # nothing to do, even with this devfid masked away.
        # So let's figure it out... if this devfid still exists,
        # we're over-replicated, else we just lost the race (and there is
        # nothing to delete).
        if ($devfid->exists) {
            # Over-replicated.
            #
            # See if some copy, besides this one we want to delete, is
            # currently alive & of right size... just as an extra paranoid
            # check before we delete it.
            foreach my $test_df ($fid->devfids) {
                next if $test_df->devid == $devfid->devid;
                if ($test_df->size_matches) {
                    $should_delete = 1;
                    $del_reason    = "over_replicated";
                    last;
                }
            }
        }
    }
    elsif ($ret eq "would_worsen") {
        # Replication has indicated we would be ruining this fid's day
        # if we delete an existing copy, so let's not do that.
        # This indicates a condition where there are no suitable devices
        # to copy new data onto, so let's be loud about it.
        return $fail->("no suitable destination devices available");
    }
    else {
        $should_delete = 1;
        $del_reason    = "did_rebalance;ret=$ret";
    }

    my %destroy_opts;
    $destroy_opts{ignore_missing} = 1
        if MogileFS::Config->config("rebalance_ignore_missing");

    if ($should_delete) {
        # Append a true value so success is detected by eval's return
        # value, and grab $@ immediately before anything can clobber it.
        my $destroyed = eval { $devfid->destroy(%destroy_opts); 1 };
        unless ($destroyed) {
            my $err = $@;
            return $fail->("HTTP delete (due to '$del_reason') failed: $err");
        }
    }

    $unlock->();
    return 1;
}
|
278
|
|
|
|
|
|
|
|
|
279
|
|
|
|
|
|
|
# replicates $fid to make sure it meets its class' replicate policy. |
|
280
|
|
|
|
|
|
|
# |
|
281
|
|
|
|
|
|
|
# README: if you update this sub to return a new error code, please update the |
|
282
|
|
|
|
|
|
|
# appropriate callers to know how to deal with the errors returned. |
|
283
|
|
|
|
|
|
|
# |
|
284
|
|
|
|
|
|
|
# returns either: |
|
285
|
|
|
|
|
|
|
# $rv |
|
286
|
|
|
|
|
|
|
# ($rv, $unlock_sub) -- when 'no_unlock' %opt is used. subref to release lock. |
|
287
|
|
|
|
|
|
|
# $rv is one of: |
|
288
|
|
|
|
|
|
|
# 0 = failure (failure written to ${$opts{errref}}) |
|
289
|
|
|
|
|
|
|
# 1 = success |
|
290
|
|
|
|
|
|
|
# "lost_race" = skipping, we did no work and policy was already met. |
|
291
|
|
|
|
|
|
|
# "nofid" => fid no longer exists. skip replication. |
|
292
|
|
|
|
|
|
|
sub replicate { |
|
293
|
0
|
|
|
0
|
0
|
|
my ($fid, %opts) = @_; |
|
294
|
0
|
0
|
|
|
|
|
$fid = MogileFS::FID->new($fid) unless ref $fid; |
|
295
|
0
|
|
|
|
|
|
my $fidid = $fid->id; |
|
296
|
|
|
|
|
|
|
|
|
297
|
0
|
0
|
|
|
|
|
debug("Replication for $fidid called, opts=".join(',',keys(%opts))) if $Mgd::DEBUG >= 2; |
|
298
|
|
|
|
|
|
|
|
|
299
|
0
|
|
|
|
|
|
my $errref = delete $opts{'errref'}; |
|
300
|
0
|
|
|
|
|
|
my $no_unlock = delete $opts{'no_unlock'}; |
|
301
|
0
|
|
|
|
|
|
my $fixed_source = delete $opts{'source_devid'}; |
|
302
|
0
|
|
0
|
|
|
|
my $mask_devids = delete $opts{'mask_devids'} || {}; |
|
303
|
0
|
|
0
|
|
|
|
my $avoid_devids = delete $opts{'avoid_devids'} || {}; |
|
304
|
0
|
|
0
|
|
|
|
my $target_devids = delete $opts{'target_devids'} || []; # inverse of avoid_devids. |
|
305
|
0
|
0
|
|
|
|
|
die "unknown_opts" if %opts; |
|
306
|
0
|
0
|
|
|
|
|
die unless ref $mask_devids eq "HASH"; |
|
307
|
|
|
|
|
|
|
|
|
308
|
0
|
|
|
|
|
|
my $sdevid; |
|
309
|
|
|
|
|
|
|
|
|
310
|
0
|
|
|
|
|
|
my $sto = Mgd::get_store(); |
|
311
|
|
|
|
|
|
|
my $unlock = sub { |
|
312
|
0
|
|
|
0
|
|
|
$sto->note_done_replicating($fidid); |
|
313
|
0
|
|
|
|
|
|
}; |
|
314
|
|
|
|
|
|
|
|
|
315
|
|
|
|
|
|
|
my $retunlock = sub { |
|
316
|
0
|
|
|
0
|
|
|
my $rv = shift; |
|
317
|
0
|
|
|
|
|
|
my ($errmsg, $errcode); |
|
318
|
0
|
0
|
|
|
|
|
if (@_ == 2) { |
|
319
|
0
|
|
|
|
|
|
($errcode, $errmsg) = @_; |
|
320
|
0
|
|
|
|
|
|
$errmsg = "$errcode: $errmsg"; # include code with message |
|
321
|
|
|
|
|
|
|
} else { |
|
322
|
0
|
|
|
|
|
|
($errmsg) = @_; |
|
323
|
|
|
|
|
|
|
} |
|
324
|
0
|
0
|
|
|
|
|
$$errref = $errcode if $errref; |
|
325
|
|
|
|
|
|
|
|
|
326
|
0
|
|
|
|
|
|
my $ret; |
|
327
|
0
|
0
|
0
|
|
|
|
if ($errcode && $errcode eq "failed_getting_lock") { |
|
328
|
|
|
|
|
|
|
# don't emit a warning with error() on lock failure. not |
|
329
|
|
|
|
|
|
|
# a big deal, don't scare people. |
|
330
|
0
|
|
|
|
|
|
$ret = 0; |
|
331
|
|
|
|
|
|
|
} else { |
|
332
|
0
|
0
|
|
|
|
|
$ret = $rv ? $rv : error($errmsg); |
|
333
|
|
|
|
|
|
|
} |
|
334
|
0
|
0
|
|
|
|
|
if ($no_unlock) { |
|
335
|
0
|
0
|
|
|
|
|
die "ERROR: must be called in list context w/ no_unlock" unless wantarray; |
|
336
|
0
|
|
|
|
|
|
return ($ret, $unlock); |
|
337
|
|
|
|
|
|
|
} else { |
|
338
|
0
|
0
|
|
|
|
|
die "ERROR: must not be called in list context w/o no_unlock" if wantarray; |
|
339
|
0
|
|
|
|
|
|
$unlock->(); |
|
340
|
0
|
|
|
|
|
|
return $ret; |
|
341
|
|
|
|
|
|
|
} |
|
342
|
0
|
|
|
|
|
|
}; |
|
343
|
|
|
|
|
|
|
|
|
344
|
|
|
|
|
|
|
# hashref of devid -> MogileFS::Device |
|
345
|
0
|
0
|
|
|
|
|
my $devs = Mgd::device_factory()->map_by_id |
|
346
|
|
|
|
|
|
|
or die "No device map"; |
|
347
|
|
|
|
|
|
|
|
|
348
|
0
|
0
|
|
|
|
|
return $retunlock->(0, "failed_getting_lock", "Unable to obtain lock for fid $fidid") |
|
349
|
|
|
|
|
|
|
unless $sto->should_begin_replicating_fidid($fidid); |
|
350
|
|
|
|
|
|
|
|
|
351
|
|
|
|
|
|
|
# if the fid doesn't even exist, consider our job done! no point |
|
352
|
|
|
|
|
|
|
# replicating file contents of a file no longer in the namespace. |
|
353
|
0
|
0
|
|
|
|
|
return $retunlock->("nofid") unless $fid->exists; |
|
354
|
|
|
|
|
|
|
|
|
355
|
0
|
|
|
|
|
|
my $cls = $fid->class; |
|
356
|
0
|
|
|
|
|
|
my $polobj = $cls->repl_policy_obj; |
|
357
|
|
|
|
|
|
|
|
|
358
|
|
|
|
|
|
|
# learn what this devices file is already on |
|
359
|
0
|
|
|
|
|
|
my @on_devs; # all devices fid is on, reachable or not. |
|
360
|
|
|
|
|
|
|
my @on_devs_tellpol; # subset of @on_devs, to tell the policy class about |
|
361
|
0
|
|
|
|
|
|
my @on_up_devid; # subset of @on_devs: just devs that are readable |
|
362
|
|
|
|
|
|
|
|
|
363
|
0
|
|
|
|
|
|
foreach my $devid ($fid->devids) { |
|
364
|
0
|
0
|
|
|
|
|
my $d = Mgd::device_factory()->get_by_id($devid) |
|
365
|
|
|
|
|
|
|
or next; |
|
366
|
0
|
|
|
|
|
|
push @on_devs, $d; |
|
367
|
0
|
0
|
0
|
|
|
|
if ($d->dstate->should_have_files && ! $mask_devids->{$devid}) { |
|
368
|
0
|
|
|
|
|
|
push @on_devs_tellpol, $d; |
|
369
|
|
|
|
|
|
|
} |
|
370
|
0
|
0
|
|
|
|
|
if ($d->should_read_from) { |
|
371
|
0
|
|
|
|
|
|
push @on_up_devid, $devid; |
|
372
|
|
|
|
|
|
|
} |
|
373
|
|
|
|
|
|
|
} |
|
374
|
|
|
|
|
|
|
|
|
375
|
0
|
0
|
|
|
|
|
return $retunlock->(0, "no_source", "Source is no longer available replicating $fidid") if @on_devs == 0; |
|
376
|
0
|
0
|
|
|
|
|
return $retunlock->(0, "source_down", "No alive devices available replicating $fidid") if @on_up_devid == 0; |
|
377
|
|
|
|
|
|
|
|
|
378
|
0
|
0
|
0
|
|
|
|
if ($fixed_source && ! grep { $_ == $fixed_source } @on_up_devid) { |
|
|
0
|
|
|
|
|
|
|
|
379
|
0
|
|
|
|
|
|
error("Fixed source dev$fixed_source requested for $fidid but not available. Trying other devices"); |
|
380
|
|
|
|
|
|
|
} |
|
381
|
|
|
|
|
|
|
|
|
382
|
0
|
|
|
|
|
|
my %dest_failed; # devid -> 1 for each devid we were asked to copy to, but failed. |
|
383
|
|
|
|
|
|
|
my %source_failed; # devid -> 1 for each devid we had problems reading from. |
|
384
|
0
|
|
|
|
|
|
my $got_copy_request = 0; # true once replication policy asks us to move something somewhere |
|
385
|
0
|
|
|
|
|
|
my $copy_err; |
|
386
|
|
|
|
|
|
|
|
|
387
|
0
|
|
|
|
|
|
my $dest_devs = $devs; |
|
388
|
0
|
0
|
|
|
|
|
if (@$target_devids) { |
|
389
|
0
|
|
|
|
|
|
$dest_devs = {map { $_ => $devs->{$_} } @$target_devids}; |
|
|
0
|
|
|
|
|
|
|
|
390
|
|
|
|
|
|
|
} |
|
391
|
|
|
|
|
|
|
|
|
392
|
0
|
|
|
|
|
|
my $rr; # MogileFS::ReplicationRequest |
|
393
|
0
|
|
|
|
|
|
while (1) { |
|
394
|
0
|
|
|
|
|
|
$rr = rr_upgrade($polobj->replicate_to( |
|
395
|
|
|
|
|
|
|
fid => $fidid, |
|
396
|
|
|
|
|
|
|
on_devs => \@on_devs_tellpol, # all device objects fid is on, dead or otherwise |
|
397
|
|
|
|
|
|
|
all_devs => $dest_devs, |
|
398
|
|
|
|
|
|
|
failed => \%dest_failed, |
|
399
|
|
|
|
|
|
|
min => $cls->mindevcount, |
|
400
|
|
|
|
|
|
|
)); |
|
401
|
|
|
|
|
|
|
|
|
402
|
0
|
0
|
|
|
|
|
last if $rr->is_happy; |
|
403
|
|
|
|
|
|
|
|
|
404
|
0
|
|
|
|
|
|
my @ddevs; # dest devs, in order of preference |
|
405
|
|
|
|
|
|
|
my $ddevid; # dest devid we've chosen to copy to |
|
406
|
0
|
0
|
|
|
|
|
if (@ddevs = $rr->copy_to_one_of_ideally) { |
|
|
|
0
|
|
|
|
|
|
|
407
|
0
|
0
|
0
|
|
|
|
if (my @not_masked_ids = (grep { ! $mask_devids->{$_} && |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
408
|
|
|
|
|
|
|
! $avoid_devids->{$_} |
|
409
|
|
|
|
|
|
|
} |
|
410
|
|
|
|
|
|
|
map { $_->id } @ddevs)) { |
|
411
|
0
|
|
|
|
|
|
$ddevid = $not_masked_ids[0]; |
|
412
|
|
|
|
|
|
|
} else { |
|
413
|
|
|
|
|
|
|
# once we masked devids away, there were no |
|
414
|
|
|
|
|
|
|
# ideal suggestions. this is the case of rebalancing, |
|
415
|
|
|
|
|
|
|
# which without this check could 'worsen' the state |
|
416
|
|
|
|
|
|
|
# of the world. consider the case: |
|
417
|
|
|
|
|
|
|
# h1[ d1 d2 ] h2[ d3 ] |
|
418
|
|
|
|
|
|
|
# and files are on d1 & d3, an ideal layout. |
|
419
|
|
|
|
|
|
|
# if d3 is being rebalanced, and masked away, the |
|
420
|
|
|
|
|
|
|
# replication policy could presumably say to put |
|
421
|
|
|
|
|
|
|
# the file on d2, even though d3 isn't dead. |
|
422
|
|
|
|
|
|
|
# so instead, when masking is in effect, we don't |
|
423
|
|
|
|
|
|
|
# use non-ideal placement, just bailing out. |
|
424
|
|
|
|
|
|
|
|
|
425
|
|
|
|
|
|
|
# this used to return "lost_race" as a lie, but rebalance was |
|
426
|
|
|
|
|
|
|
# happily deleting the masked fid if at least one other fid |
|
427
|
|
|
|
|
|
|
# existed... because it assumed it was over replicated. |
|
428
|
|
|
|
|
|
|
# now we tell rebalance that touching this fid would be |
|
429
|
|
|
|
|
|
|
# stupid. |
|
430
|
0
|
|
|
|
|
|
return $retunlock->("would_worsen"); |
|
431
|
|
|
|
|
|
|
} |
|
432
|
|
|
|
|
|
|
} elsif (@ddevs = $rr->copy_to_one_of_desperate) { |
|
433
|
|
|
|
|
|
|
# TODO: reschedule a replication for 'n' minutes in future, or |
|
434
|
|
|
|
|
|
|
# when new hosts/devices become available or change state |
|
435
|
0
|
|
|
|
|
|
$ddevid = $ddevs[0]->id; |
|
436
|
|
|
|
|
|
|
} else { |
|
437
|
0
|
|
|
|
|
|
last; |
|
438
|
|
|
|
|
|
|
} |
|
439
|
|
|
|
|
|
|
|
|
440
|
0
|
|
|
|
|
|
$got_copy_request = 1; |
|
441
|
|
|
|
|
|
|
|
|
442
|
|
|
|
|
|
|
# replication policy shouldn't tell us to put a file on a device |
|
443
|
|
|
|
|
|
|
# we've already told it that we've failed at. so if we get that response, |
|
444
|
|
|
|
|
|
|
# the policy plugin is broken and we should terminate now. |
|
445
|
0
|
0
|
|
|
|
|
if ($dest_failed{$ddevid}) { |
|
446
|
0
|
|
|
|
|
|
return $retunlock->(0, "policy_error_doing_failed", |
|
447
|
|
|
|
|
|
|
"replication policy told us to do something we already told it we failed at while replicating fid $fidid"); |
|
448
|
|
|
|
|
|
|
} |
|
449
|
|
|
|
|
|
|
|
|
450
|
|
|
|
|
|
|
# replication policy shouldn't tell us to put a file on a |
|
451
|
|
|
|
|
|
|
# device that it's already on. that's just stupid. |
|
452
|
0
|
0
|
|
|
|
|
if (grep { $_->id == $ddevid } @on_devs) { |
|
|
0
|
|
|
|
|
|
|
|
453
|
0
|
|
|
|
|
|
return $retunlock->(0, "policy_error_already_there", |
|
454
|
|
|
|
|
|
|
"replication policy told us to put fid $fidid on dev $ddevid, but it's already there!"); |
|
455
|
|
|
|
|
|
|
} |
|
456
|
|
|
|
|
|
|
|
|
457
|
|
|
|
|
|
|
# find where we're replicating from |
|
458
|
|
|
|
|
|
|
{ |
|
459
|
|
|
|
|
|
|
# TODO: use an observed good device+host as source to start. |
|
460
|
0
|
|
|
|
|
|
my @choices = grep { ! $source_failed{$_} } @on_up_devid; |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
461
|
0
|
0
|
|
|
|
|
return $retunlock->(0, "source_down", "No devices available replicating $fidid") unless @choices; |
|
462
|
0
|
0
|
0
|
|
|
|
if ($fixed_source && grep { $_ == $fixed_source } @choices) { |
|
|
0
|
|
|
|
|
|
|
|
463
|
0
|
|
|
|
|
|
$sdevid = $fixed_source; |
|
464
|
|
|
|
|
|
|
} else { |
|
465
|
0
|
|
|
|
|
|
@choices = List::Util::shuffle(@choices); |
|
466
|
0
|
|
|
|
|
|
MogileFS::run_global_hook('replicate_order_final_choices', $devs, \@choices); |
|
467
|
0
|
|
|
|
|
|
$sdevid = shift @choices; |
|
468
|
|
|
|
|
|
|
} |
|
469
|
|
|
|
|
|
|
} |
|
470
|
|
|
|
|
|
|
|
|
471
|
0
|
0
|
|
|
|
|
my $worker = MogileFS::ProcManager->is_child or die; |
|
472
|
0
|
|
|
|
|
|
my $digest; |
|
473
|
0
|
|
|
|
|
|
my $fid_checksum = $fid->checksum; |
|
474
|
0
|
0
|
|
|
|
|
$digest = Digest->new($fid_checksum->hashname) if $fid_checksum; |
|
475
|
0
|
0
|
0
|
|
|
|
$digest ||= Digest->new($cls->hashname) if $cls->hashtype; |
|
476
|
|
|
|
|
|
|
|
|
477
|
|
|
|
|
|
|
my $rv = http_copy( |
|
478
|
|
|
|
|
|
|
sdevid => $sdevid, |
|
479
|
|
|
|
|
|
|
ddevid => $ddevid, |
|
480
|
|
|
|
|
|
|
fid => $fid, |
|
481
|
|
|
|
|
|
|
errref => \$copy_err, |
|
482
|
0
|
|
|
0
|
|
|
callback => sub { $worker->still_alive; }, |
|
483
|
0
|
|
|
|
|
|
digest => $digest, |
|
484
|
|
|
|
|
|
|
); |
|
485
|
0
|
0
|
0
|
|
|
|
die "Bogus error code: $copy_err" if !$rv && $copy_err !~ /^(?:src|dest)_error$/; |
|
486
|
|
|
|
|
|
|
|
|
487
|
0
|
0
|
|
|
|
|
unless ($rv) { |
|
488
|
0
|
|
|
|
|
|
error("Failed copying fid $fidid from devid $sdevid to devid $ddevid (error type: $copy_err)"); |
|
489
|
0
|
0
|
|
|
|
|
if ($copy_err eq "src_error") { |
|
490
|
0
|
|
|
|
|
|
$source_failed{$sdevid} = 1; |
|
491
|
|
|
|
|
|
|
|
|
492
|
0
|
0
|
0
|
|
|
|
if ($fixed_source && $fixed_source == $sdevid) { |
|
493
|
0
|
|
|
|
|
|
error("Fixed source dev$fixed_source was requested for $fidid but failed: will try other sources"); |
|
494
|
|
|
|
|
|
|
} |
|
495
|
|
|
|
|
|
|
|
|
496
|
|
|
|
|
|
|
} else { |
|
497
|
0
|
|
|
|
|
|
$dest_failed{$ddevid} = 1; |
|
498
|
|
|
|
|
|
|
} |
|
499
|
0
|
|
|
|
|
|
next; |
|
500
|
|
|
|
|
|
|
} |
|
501
|
|
|
|
|
|
|
|
|
502
|
0
|
|
|
|
|
|
my $dfid = MogileFS::DevFID->new($ddevid, $fid); |
|
503
|
0
|
|
|
|
|
|
$dfid->add_to_db; |
|
504
|
0
|
0
|
0
|
|
|
|
if ($digest && !$fid->checksum) { |
|
505
|
0
|
|
|
|
|
|
$sto->set_checksum($fidid, $cls->hashtype, $digest->digest); |
|
506
|
|
|
|
|
|
|
} |
|
507
|
|
|
|
|
|
|
|
|
508
|
0
|
|
|
|
|
|
push @on_devs, $devs->{$ddevid}; |
|
509
|
0
|
|
|
|
|
|
push @on_devs_tellpol, $devs->{$ddevid}; |
|
510
|
0
|
|
|
|
|
|
push @on_up_devid, $ddevid; |
|
511
|
|
|
|
|
|
|
} |
|
512
|
|
|
|
|
|
|
|
|
513
|
|
|
|
|
|
|
# We are over replicated. Let caller decide if it should rebalance. |
|
514
|
0
|
0
|
|
|
|
|
if ($rr->too_happy) { |
|
515
|
0
|
|
|
|
|
|
return $retunlock->(0, "too_happy", "fid $fidid is on too many devices"); |
|
516
|
|
|
|
|
|
|
} |
|
517
|
|
|
|
|
|
|
|
|
518
|
0
|
0
|
|
|
|
|
if ($rr->is_happy) { |
|
519
|
0
|
0
|
|
|
|
|
return $retunlock->(1) if $got_copy_request; |
|
520
|
0
|
|
|
|
|
|
return $retunlock->("lost_race"); # some other process got to it first. policy was happy immediately. |
|
521
|
|
|
|
|
|
|
} |
|
522
|
|
|
|
|
|
|
|
|
523
|
0
|
|
|
|
|
|
return $retunlock->(0, "policy_no_suggestions", |
|
524
|
|
|
|
|
|
|
"replication policy ran out of suggestions for us replicating fid $fidid"); |
|
525
|
|
|
|
|
|
|
} |
|
526
|
|
|
|
|
|
|
|
|
527
|
|
|
|
|
|
|
# Parses an HTTP response head (status line plus header lines) from $sock.
#
# On success returns a hashref:
# {
#   code => HTTP status code taken from the status line,
#   keep => boolean, whether to keep the connection after reading;
#           starts as "HTTP version >= 1.1" and is overridden by an
#           explicit "Connection: keep-alive" / "Connection: close" header
#   len  => value of the Content-Length header (key absent if no such header)
# }
#
# Returns nothing (undef in scalar context) when the stream ends before the
# blank line that terminates the headers, or when the first line is not an
# HTTP status line.
sub read_headers {
    my ($sock) = @_;

    # FIXME: this can block. needs to timeout.
    my $status_line = <$sock>;
    return unless defined $status_line;

    # list assignment in scalar context yields 0 when the match fails
    my ($http_version, $status_code) = $status_line =~ m!\AHTTP/(\d+\.\d+)\s+(\d+)!
        or return;

    my %res;
    $res{keep} = $http_version >= 1.1;
    $res{code} = $status_code;

    for (;;) {
        my $hdr = <$sock>;

        # EOF before the end-of-headers marker: treat as a failed read
        return unless defined $hdr;

        # a bare CRLF (or LF) line ends the header section
        return \%res if $hdr =~ /\A\r?\n\z/;

        if ($hdr =~ /\AConnection:\s*keep-alive\s*\z/is) {
            $res{keep} = 1;
            next;
        }
        if ($hdr =~ /\AConnection:\s*close\s*\z/is) {
            $res{keep} = 0;
            next;
        }
        if ($hdr =~ /\AContent-Length:\s*(\d+)\s*\z/is) {
            $res{len} = $1;
        }
        # every other header is ignored
    }
}
|
557
|
|
|
|
|
|
|
|
|
558
|
|
|
|
|
|
|
# copies a file from one Perlbal to another utilizing HTTP
#
# Streams the file with a GET from the source device's host and a PUT to the
# destination device's host, optionally checksumming the data on the way.
#
# Named arguments (any unknown key is fatal):
#   sdevid   => id of the device to read from
#   ddevid   => id of the device to write to
#   fid      => FID object; a non-reference value is handed to
#               MogileFS::FID->new first
#   callback => optional coderef, called after every successful syswrite to
#               the destination and passed on to the post-PUT digest check
#   errref   => optional scalar ref; set to "src_error" or "dest_error" when
#               a failure can be attributed to one side
#   digest   => optional Digest object; every chunk read from the source is
#               added to it.  Only a clone is finalized here, so the caller
#               can still call ->digest on its own object afterwards.
#
# Returns 1 on success.  On an attributed failure returns the result of
# error($msg) after setting $$errref and closing both connections.  Two early
# exits do NOT set $$errref: unknown source/destination device (returns the
# result of error()) and incomplete host/port/path information (returns 0).
sub http_copy {
    my %opts = @_;
    my ($sdevid, $ddevid, $fid, $intercopy_cb, $errref, $digest) =
        map { delete $opts{$_} } qw(sdevid
                                    ddevid
                                    fid
                                    callback
                                    errref
                                    digest
                                    );
    die if %opts;

    $fid = MogileFS::FID->new($fid) unless ref($fid);
    my $fidid = $fid->id;
    my $expected_clen = $fid->length;
    my $clen;
    my $content_md5 = '';
    my ($sconn, $dconn);
    my $fid_checksum = $fid->checksum;
    if ($fid_checksum && $fid_checksum->hashname eq "MD5") {
        # some HTTP servers may be able to verify Content-MD5 on PUT
        # and reject corrupted requests. no HTTP server should reject
        # a request for an unrecognized header
        my $b64digest = encode_base64($fid_checksum->{checksum}, "");
        $content_md5 = "\r\nContent-MD5: $b64digest";
    }

    $intercopy_cb ||= sub {};

    # shared failure path: record which side failed, drop whatever
    # connections are still open, and log the message
    my $err_common = sub {
        my ($err, $msg) = @_;
        $$errref = $err if $errref;
        $sconn->close($err) if $sconn;
        $dconn->close($err) if $dconn;
        return error($msg);
    };

    # handles setting unreachable magic; $error->(reachability, "message")
    my $error_unreachable = sub {
        return $err_common->("src_error", "Fid $fidid unreachable while replicating: $_[0]");
    };

    my $dest_error = sub {
        return $err_common->("dest_error", $_[0]);
    };

    my $src_error = sub {
        return $err_common->("src_error", $_[0]);
    };

    # get some information we'll need
    my $sdev = Mgd::device_factory()->get_by_id($sdevid);
    my $ddev = Mgd::device_factory()->get_by_id($ddevid);

    # NOTE: $$errref is deliberately left untouched here (as in the paths
    # below that "return 0"); the visible caller dies on such a failure.
    return error("Error: unable to get device information: source=$sdevid, destination=$ddevid, fid=$fidid")
        unless $sdev && $ddev;

    my $s_dfid = MogileFS::DevFID->new($sdev, $fid);
    my $d_dfid = MogileFS::DevFID->new($ddev, $fid);

    my ($spath, $dpath) = (map { $_->uri_path } ($s_dfid, $d_dfid));
    my ($shost, $dhost) = (map { $_->host } ($sdev, $ddev));

    my ($shostip, $sport) = ($shost->ip, $shost->http_port);
    if (MogileFS::Config->config("repl_use_get_port")) {
        $sport = $shost->http_get_port;
    }
    my ($dhostip, $dport) = ($dhost->ip, $dhost->http_port);
    unless (defined $spath && defined $dpath && defined $shostip && defined $dhostip && $sport && $dport) {
        # show detailed information to find out what's not configured right
        error("Error: unable to replicate file fid=$fidid from device id $sdevid to device id $ddevid");
        error(" http://$shostip:$sport$spath -> http://$dhostip:$dport$dpath");
        return 0;
    }

    my $put = "PUT $dpath HTTP/1.0\r\nConnection: keep-alive\r\n" .
              "Content-length: $expected_clen$content_md5\r\n\r\n";

    # need by webdav servers, like lighttpd...
    $ddev->vivify_directories($d_dfid->url);

    # call a hook for odd casing completely different source data
    # for specific files.
    my $shttphost;
    MogileFS::run_global_hook('replicate_alternate_source',
                              $fid, \$shostip, \$sport, \$spath, \$shttphost);

    my $durl = "http://$dhostip:$dport$dpath";
    my $surl = "http://$shostip:$sport$spath";
    # okay, now get the file
    my %sopts = ( ip => $shostip, port => $sport );

    my $get = "GET $spath HTTP/1.0\r\nConnection: keep-alive\r\n";
    # plugin set a custom host.
    $get .= "Host: $shttphost\r\n" if $shttphost;

    my $data = '';
    my ($sock, $dsock);
    my ($wcount, $bytes_to_read, $written, $remain);
    my ($stries, $dtries) = (0, 0);

    # Everything from here to the end of the streaming loop may be restarted
    # once per side when a pooled connection turns out to be dead.
retry:
    # On a retry pass ($stries already counted) the source is re-read from
    # the first byte, so anything already fed to the digest must be thrown
    # away or the final checksum would cover the first chunk twice.
    $digest->reset if $digest && $stries;

    $sconn->close("retrying") if $sconn;
    $dconn->close("retrying") if $dconn;
    $dconn = undef;
    $stries++;
    $sconn = $shost->http_conn_get(\%sopts)
        or return $src_error->("Unable to create source socket to $shostip:$sport for $spath");
    $sock = $sconn->sock;
    unless ($sock->write("$get\r\n")) {
        goto retry if $sconn->retryable && $stries == 1;
        return $src_error->("Pipe closed retrieving $spath from $shostip:$sport");
    }

    # we just want a content length
    my $sres = read_headers($sock);
    unless ($sres) {
        goto retry if $sconn->retryable && $stries == 1;
        return $error_unreachable->("Error: Resource $surl failed to return an HTTP response");
    }
    unless ($sres->{code} >= 200 && $sres->{code} <= 299) {
        return $error_unreachable->("Error: Resource $surl failed: HTTP $sres->{code}");
    }
    $clen = $sres->{len};

    return $error_unreachable->("File $spath has unexpected content-length of $clen, not $expected_clen")
        if $clen != $expected_clen;

    # open target for put
    $dtries++;
    $dconn = $dhost->http_conn_get
        or return $dest_error->("Unable to create dest socket to $dhostip:$dport for $dpath");
    $dsock = $dconn->sock;

    unless ($dsock->write($put)) {
        goto retry if $dconn->retryable && $dtries == 1;
        return $dest_error->("Pipe closed during write to $dpath on $dhostip:$dport");
    }

    # now read data and print while we're reading.
    ($written, $remain) = (0, $clen);
    $bytes_to_read = 1024*1024; # read 1MB at a time until there's less than that remaining
    $bytes_to_read = $remain if $remain < $bytes_to_read;
    $wcount = 0;

    while ($bytes_to_read) {
        my $bytes = $sock->read($data, $bytes_to_read);
        unless (defined $bytes) {
            return $src_error->("error reading midway through source: $!");
        }
        if ($bytes == 0) {
            return $src_error->("EOF reading midway through source: $!");
        }

        # now we've read in $bytes bytes
        $remain -= $bytes;
        $bytes_to_read = $remain if $remain < $bytes_to_read;
        $digest->add($data) if $digest;

        # syswrite may accept only part of the chunk; keep going from the
        # offset it stopped at until the whole chunk is out
        my $data_len = $bytes;
        my $data_off = 0;
        while (1) {
            my $wbytes = syswrite($dsock, $data, $data_len, $data_off);
            unless (defined $wbytes) {
                # it can take two writes to determine if a socket is dead
                # (TCP_NODELAY and TCP_CORK are (and must be) zero here)
                goto retry if (!$wcount && $dconn->retryable && $dtries == 1);
                return $dest_error->("Error: syswrite failed after $written bytes with: $!; failed putting to $dpath");
            }
            $wcount++;
            $written += $wbytes;
            $intercopy_cb->();
            last if ($data_len == $wbytes);

            $data_len -= $wbytes;
            $data_off += $wbytes;
        }

        die if $bytes_to_read < 0;
    }

    # source connection drained, return to pool
    if ($sres->{keep}) {
        $shost->http_conn_put($sconn);
    } else {
        $sconn->close("http_close");
    }
    # either way we are done with it; forget it so that a later failure
    # does not make $err_common close it a second time
    $sconn = undef;

    # callee will want this digest, too, so clone as "digest" is destructive
    # (from here on $digest holds the raw digest bytes, not the object)
    $digest = $digest->clone->digest if $digest;

    if ($fid_checksum) {
        if ($digest ne $fid_checksum->{checksum}) {
            my $expect = $fid_checksum->hexdigest;
            $digest = unpack("H*", $digest);
            return $src_error->("checksum mismatch on GET: expected: $expect actual: $digest");
        }
    }

    # now read in the response line (should be first line)
    my $dres = read_headers($dsock);
    unless ($dres) {
        goto retry if (!$wcount && $dconn->retryable && $dtries == 1);
        return $dest_error->("Error: HTTP response line not recognized writing to $durl");
    }

    # drain the response body if there is one
    # there may be no dres->{len}/Content-Length if there is no body
    if ($dres->{len}) {
        my $r = $dsock->read($data, $dres->{len}); # dres->{len} should be tiny
        if (defined $r) {
            if ($r != $dres->{len}) {
                Mgd::error("Failed to read $r of Content-Length:$dres->{len} bytes for PUT response on $durl");
                $dres->{keep} = 0;
            }
        } else {
            Mgd::error("Failed to read Content-Length:$dres->{len} bytes for PUT response on $durl ($!)");
            $dres->{keep} = 0;
        }
    }

    # return the connection back to the connection pool
    if ($dres->{keep}) {
        $dhost->http_conn_put($dconn);
    } else {
        $dconn->close("http_close");
    }
    # as with the source: never let an error path close it again
    $dconn = undef;

    if ($dres->{code} >= 200 && $dres->{code} <= 299) {
        if ($digest) {
            my $alg = ($fid_checksum && $fid_checksum->hashname) || $fid->class->hashname;

            if ($ddev->{reject_bad_md5} && ($alg eq "MD5")) {
                # dest device would've rejected us with a error,
                # no need to reread the file
                return 1;
            }
            my $httpfile = MogileFS::HTTPFile->at($durl);
            my $actual = $httpfile->digest($alg, $intercopy_cb);
            if ($actual ne $digest) {
                my $expect = unpack("H*", $digest);
                $actual = unpack("H*", $actual);
                # report the hex of what was read back, not the raw bytes
                # of the digest we expected
                return $dest_error->("checksum mismatch on PUT, expected: $expect actual: $actual");
            }
        }
        return 1;
    }
    return $dest_error->("Got HTTP status code $dres->{code} PUTing to $durl");
}
|
810
|
|
|
|
|
|
|
|
|
811
|
|
|
|
|
|
|
1; |
|
812
|
|
|
|
|
|
|
|
|
813
|
|
|
|
|
|
|
# Local Variables: |
|
814
|
|
|
|
|
|
|
# mode: perl |
|
815
|
|
|
|
|
|
|
# c-basic-indent: 4 |
|
816
|
|
|
|
|
|
|
# indent-tabs-mode: nil |
|
817
|
|
|
|
|
|
|
# End: |
|
818
|
|
|
|
|
|
|
|
|
819
|
|
|
|
|
|
|
__END__ |