File Coverage

blib/lib/MogileFS/Rebalance.pm
Criterion Covered Total %
statement 15 266 5.6
branch 0 154 0.0
condition 0 35 0.0
subroutine 5 23 21.7
pod 0 11 0.0
total 20 489 4.0


line stmt bran cond sub pod time code
1             package MogileFS::Rebalance;
2 21     21   133 use strict;
  21         49  
  21         973  
3 21     21   127 use warnings;
  21         42  
  21         1024  
4 21     21   136 use Carp qw(croak);
  21         45  
  21         1483  
5 21     21   130 use List::Util ();
  21         314  
  21         614  
6 21     21   158 use MogileFS::Server ();
  21         41  
  21         83507  
7              
8             # Note: The filters aren't written for maximum speed, as they're not likely
9             # in the slow path. They're supposed to be readable/extensible. Please don't
10             # cram them down unless you have to.
11             # TODO: allow filters to note by dev why they were filtered in/out, and return
12             # that info for DEBUG display.
13             # TODO: Add "debug trace" lines to most functions. "choosing sdev to work on",
14             # etc.
15             # TODO: tally into the state how many fids/size/etc it's done so far.
16             # TODO: should track old device state and return to it. Overall this is
17             # probably better fit by switching "device state" to a set of "device flags",
18             # so we can disable specifically "stop getting new files" while we work :(
19              
20             # Default policy structure are all of these fields.
21             # A minimum set of fields should be defined for a policy to be valid..
22             my %default_policy = (
23             # source
24             from_all_devs => 1,
25             from_hosts => [], # host ids.
26             from_devices => [], # dev ids.
27             from_percent_used => undef, # 0.nn * 100
28             from_percent_free => undef,
29             from_space_used => undef,
30             from_space_free => undef,
31             fid_age => 'old', # old|new
32             limit_type => 'device', # global|device
33             limit_by => 'none', # size|count|percent|none
34             limit => undef, # 100g|10%|5000
35             # target
36             to_all_devs => 1,
37             to_hosts => [],
38             to_devices => [],
39             to_percent_used => undef,
40             to_percent_free => undef,
41             to_space_used => undef,
42             to_space_free => undef,
43             not_to_hosts => [],
44             not_to_devices => [],
45             use_dest_devs => 'all', # all|N (list up to N devices to rep pol)
46             leave_in_drain_mode => 0,
47             );
48              
49             # State policy example
50             my %default_state = (
51             completed_devs => [],
52             source_devs => [],
53             sdev_current => 0,
54             sdev_lastfid => 0,
55             sdev_limit => 0,
56             limit => 0,
57             fids_queued => 0,
58             bytes_queued => 0,
59             time_started => 0,
60             time_finished => 0,
61             time_stopped => 0,
62             time_latest_run => 0,
63             time_latest_empty_run => 0,
64             empty_runs => 0,
65             );
66              
67             sub new {
68 0     0 0   my $class = shift;
69 0   0       my $policy = shift || "";
70 0   0       my $state = shift || '';
71              
72             # Validate policy here?
73 0           my $self = bless {
74             policy => '',
75             state => '',
76             }, $class;
77              
78 0 0         $self->policy($policy) if $policy;
79 0 0         $self->load_state($state) if $state;
80              
81 0           return $self;
82             }
83              
84             sub init {
85 0     0 0   my $self = shift;
86 0           my $devs = shift;
87              
88 0 0         croak "policy object already initialized" if $self->{state};
89 0 0 0       croak "please pass in devices to filter" unless $devs && ref($devs);
90 0           my %state = %default_state;
91              
92             # If we don't have an initial source device list, discover them.
93             # Used to filter destination devices later.
94 0           $state{source_devs} = $self->filter_source_devices($devs);
95 0           $state{time_started} = time();
96 0           $self->{state} = \%state;
97             }
98              
99             sub stop {
100 0     0 0   my $self = shift;
101 0           my $p = $self->{policy};
102 0           my $s = $self->{state};
103 0           my $sdev = $self->{sdev_current};
104 0 0         unless ($p->{leave_in_drain_mode}) {
105 0 0         Mgd::get_store()->set_device_state($sdev, 'alive') if $sdev;
106             }
107 0           $s->{time_stopped} = time();
108             }
109              
110             sub finish {
111 0     0 0   my $self = shift;
112 0           my $s = $self->{state};
113 0           $s->{time_finished} = time();
114             }
115              
116             # Resume from saved as_string state.
117             sub load_state {
118 0     0 0   my $self = shift;
119 0           my $state = shift;
120 0           my $state_parsed = $self->_parse_settings($state, \%default_state);
121             # TODO: validate state?
122 0           $self->{state} = $state_parsed;
123             }
124              
125             # Call as_string()? merge into load_state as "state"?
126             sub save_state {
127 0     0 0   my $self = shift;
128 0           return $self->_save_settings($self->{state});
129             }
130              
131             sub source_devices {
132 0     0 0   my $self = shift;
133 0           return $self->{source_devs};
134             }
135              
136             sub policy {
137 0     0 0   my $self = shift;
138 0 0         unless (@_) {
139             # TODO: serialize it or pass a structure?
140 0           return $self->{policy};
141             }
142 0           my $policy = shift;
143 0           $self->{policy} = $self->_parse_settings($policy, \%default_policy);
144 0           return $self->{policy};
145             }
146              
147             sub _save_settings {
148 0     0     my $self = shift;
149 0           my $settings = shift;
150 0           my @tosave = ();
151 0           while (my ($key, $val) = each %{$settings}) {
  0            
152             # Only ref we support is ARRAY at the mo'...
153 0 0         if (ref($val) eq 'ARRAY') {
154 0           push(@tosave, $key . '=' . join(',', @$val));
155             } else {
156 0           push(@tosave, $key . '=' . $val);
157             }
158             }
159 0           return join(' ', @tosave);
160             }
161              
162             # foo=bar foo2=bar2 foo3=baaz,quux
163             sub _parse_settings {
164 0     0     my $self = shift;
165 0           my $settings = shift;
166 0   0       my $constraint = shift || '';
167 0           my %parsed = ();
168             # the constraint also serves as a set of defaults.
169 0 0         %parsed = %{$constraint} if ($constraint);
  0            
170              
171 0 0         return unless $settings;
172             # parse out from a string: key=value key=value
173 0           for my $tuple (split /\s/, $settings) {
174 0           my ($key, $value) = split /=/, $tuple;
175 0 0         if (index($value, ',') > -1) {
176             # ',' is reserved for multivalue types.
177 0           $value = [split /,/, $value];
178             }
179             # In the future we could do stronger type checking at load
180             # time, but for now this will happen at use time :/
181 0 0         if ($constraint) {
182 0 0         if (exists $constraint->{$key}) {
183 0           my $c = $constraint->{$key};
184             # default says we should be an array.
185 0 0 0       if (ref($c) && ref($c) eq 'ARRAY' && !ref($value)) {
      0        
186 0           $parsed{$key} = [$value];
187             } else {
188 0           $parsed{$key} = $value;
189             }
190             } else {
191 0           croak "Invalid setting $key";
192             }
193             } else {
194 0           $parsed{$key} = $value;
195             }
196             }
197 0           return \%parsed;
198             }
199              
200             # step through the filters and find the next set of fids to rebalance.
201             # should $sto be passed in here or should we fetch it ourselves?
202             # also, should device info be passed in? I think so.
203             # returning 'undef' means there's nothing left
204             # returning an empty array means "try again"
205             sub next_fids_to_rebalance {
206 0     0 0   my $self = shift;
207 0           my $devs = shift;
208 0           my $sto = shift;
209 0   0       my $limit = shift || 100; # random low default.
210             # Balk unless we have a policy or a state?
211 0           my $policy = $self->{policy};
212 0 0         croak "No policy loaded" unless $policy;
213 0 0         croak "Must pass in device list" unless $devs;
214 0 0         croak "Must pass in storage object" unless $sto;
215 0           my $state = $self->{state};
216              
217             # If we're not working against a source device, discover one
218 0           my $sdev = $self->_find_source_device($state->{source_devs});
219 0 0         return undef unless $sdev;
220 0           $sdev = Mgd::device_factory()->get_by_id($sdev);
221 0           my $filtered_destdevs = $self->filter_dest_devices($devs);
222              
223 0 0         croak("rebalance cannot find suitable destination devices")
224             unless (@$filtered_destdevs);
225              
226 0           my @fids = $sdev->fid_chunks(age => $policy->{fid_age},
227             fidid => $state->{sdev_lastfid},
228             limit => $limit);
229             # We'll wait until the next cycle to find a new sdev.
230 0 0 0       if (! @fids || ! $self->_check_limits) {
231 0           $self->_finish_source_device;
232 0           return [];
233             }
234              
235             # In both old or new cases, the "last" fid in the list is correct.
236 0           $state->{sdev_lastfid} = $fids[-1]->id;
237              
238             # TODO: create a filterset for $fid settings. filesize, class, domain, etc.
239 0           my @devfids = ();
240 0           for my $fid (@fids) {
241             # count the fid or size against device limit.
242 0 0         next unless $fid->exists;
243 0 0         $self->_check_limits($fid) or next;
244 0           my $destdevs = $self->_choose_dest_devs($fid, $filtered_destdevs);
245             # Update internal stats.
246 0           $state->{fids_queued}++;
247 0           $state->{bytes_queued} += $fid->length;
248 0           push(@devfids, [$fid->id, $sdev->id, $destdevs]);
249             }
250              
251 0           $state->{time_latest_run} = time;
252 0 0         unless (@devfids) {
253 0           $state->{empty_runs}++;
254 0           $state->{time_latest_empty_run} = time;
255             }
256              
257             # return block of fiddev combos.
258 0           return \@devfids;
259             }
260              
261             # ensure this fid wouldn't overrun a limit.
262             sub _check_limits {
263 0     0     my $self = shift;
264 0           my $fid = shift;
265 0           my $p = $self->{policy};
266 0           my $s = $self->{state};
267 0 0         return 1 if ($p->{limit_by} eq 'none');
268              
269 0           my $limit;
270 0 0         if ($p->{limit_type} eq 'global') {
271 0           $limit = \$s->{limit};
272             } else {
273 0           $limit = \$s->{sdev_limit};
274             }
275              
276 0 0         if ($p->{limit_by} eq 'count') {
    0          
277 0 0         return $fid ? $$limit-- : $$limit;
278             } elsif ($p->{limit_by} eq 'size') {
279 0 0         if ($fid) {
280 0 0         if ($fid->length() <= $$limit) {
281 0           $$limit -= $fid->length();
282 0           return 1;
283             } else {
284 0           return 0;
285             }
286             } else {
287 0 0         if ($$limit < 1024) {
288             # Arbitrary "give up if there's less than 1kb in the limit"
289             # FIXME: Make this configurable
290 0           return 0;
291             } else {
292 0           return 1;
293             }
294             }
295             } else {
296 0           croak("uknown limit_by type");
297             }
298             }
299              
300             # shuffle the list and return by limit.
301             # TODO: use the fid->length to ensure we don't send the file to devices
302             # that can't handle it.
303             sub _choose_dest_devs {
304 0     0     my $self = shift;
305 0           my $fid = shift;
306 0           my $filtered_devs = shift;
307 0           my $p = $self->{policy};
308              
309 0           my @shuffled_devs = List::Util::shuffle(@$filtered_devs);
310 0 0         return \@shuffled_devs if ($p->{use_dest_devs} eq 'all');
311              
312 0           return [splice @shuffled_devs, 0, $p->{use_dest_devs}];
313             }
314              
315             # Iterate through all possible constraints until we have a final list.
316             # unlike the source list we try this
317             sub filter_source_devices {
318 0     0 0   my $self = shift;
319 0           my $devs = shift;
320 0           my $policy = $self->{policy};
321            
322 0           my @sdevs = ();
323 0           for my $dev (@$devs) {
324 0 0         next unless $dev->can_delete_from;
325 0           my $id = $dev->id;
326 0 0         if (@{$policy->{from_devices}}) {
  0            
327 0 0         next unless grep { $_ == $id } @{$policy->{from_devices}};
  0            
  0            
328             }
329 0 0         if (@{$policy->{from_hosts}}) {
  0            
330 0           my $hostid = $dev->hostid;
331 0 0         next unless grep { $_ == $hostid } @{$policy->{from_hosts}};
  0            
  0            
332             }
333             # "at least this much used"
334 0 0         if ($policy->{from_percent_used}) {
335             # returns undef if it doesn't have stats on the device.
336 0           my $full = $dev->percent_full * 100;
337 0 0         next unless defined $full;
338 0 0         next unless $full > $policy->{from_percent_used};
339             }
340             # "at least this much free"
341 0 0         if ($policy->{from_percent_free}) {
342             # returns *0* if lacking stats. Must fix :(
343 0           my $free = $dev->percent_free * 100;
344 0 0         next unless $free; # hope this never lands at exact zero.
345 0 0         next unless $free > $policy->{from_percent_free};
346             }
347             # "at least this much used"
348 0 0         if ($policy->{from_space_used}) {
349 0           my $used = $dev->mb_used;
350 0 0 0       next unless $used && $used > $policy->{from_space_used};
351             }
352             # "at least this much free"
353 0 0         if ($policy->{from_space_free}) {
354 0           my $free = $dev->mb_free;
355 0 0 0       next unless $free && $free > $policy->{from_space_free};
356             }
357 0           push @sdevs, $id;
358             }
359              
360 0           return \@sdevs;
361             }
362              
363             sub _finish_source_device {
364 0     0     my $self = shift;
365 0           my $state = $self->{state};
366 0           my $policy = $self->{policy};
367 0 0         croak "Not presently working on a source device"
368             unless $state->{sdev_current};
369              
370 0           delete $state->{sdev_lastfid};
371 0           delete $state->{sdev_limit};
372 0           my $sdev = delete $state->{sdev_current};
373             # Unless the user wants a device to never get new files again (sticking in
374             # drain mode), return to alive.
375 0 0         unless ($policy->{leave_in_drain_mode}) {
376 0           Mgd::get_store()->set_device_state($sdev, 'alive');
377             }
378 0           push @{$state->{completed_devs}}, $sdev;
  0            
379             }
380              
381             # TODO: Be aware of down/unavail devices. temp skip them?
382             sub _find_source_device {
383 0     0     my $self = shift;
384 0           my $sdevs = shift;
385              
386 0           my $state = $self->{state};
387 0           my $p = $self->{policy};
388 0 0         unless ($state->{sdev_current}) {
389 0           my $sdev = shift @$sdevs;
390 0 0         return undef, undef unless $sdev;
391 0           $state->{sdev_current} = $sdev;
392 0           $state->{sdev_lastfid} = 0;
393 0           my $limit;
394 0 0         if ($p->{limit_type} eq 'device') {
395 0 0         if ($p->{limit_by} eq 'size') {
    0          
    0          
    0          
396             # Parse the size (default in megs?) out into bytes.
397 0           $limit = $self->_human_to_bytes($p->{limit});
398             } elsif ($p->{limit_by} eq 'count') {
399 0           $limit = $p->{limit};
400             } elsif ($p->{limit_by} eq 'percent') {
401 0           croak("policy size limits by percent are unimplemented");
402             } elsif ($p->{limit_by} eq 'none') {
403 0           $limit = 'none';
404             }
405             }
406             # Must mark device in "drain" mode while we work on it.
407 0           Mgd::get_store()->set_device_state($sdev, 'drain');
408 0           $state->{sdev_limit} = $limit;
409             }
410              
411 0           return $state->{sdev_current};
412             }
413              
414             # FIXME: Move to MogileFS::Util
415             # take a numeric string with a char suffix and turn it into bytes.
416             # no suffix means it's already bytes.
417             sub _human_to_bytes {
418 0     0     my $self = shift;
419 0           my $num = shift;
420              
421 0           my ($digits, $type);
422 0 0         if ($num =~ m/^(\d+)([bkmgtp])?$/i) {
423 0           $digits = $1;
424 0           $type = lc($2);
425             } else {
426 0           croak("Don't know what this number is: " . $num);
427             }
428              
429 0 0 0       return $digits unless $type || $type eq 'b';
430             # Sorry, being cute here :P
431 0           return $digits * (1024 ** index('bkmgtpezy', $type));
432             }
433              
434             # Apply policy to destination devices.
435             sub filter_dest_devices {
436 0     0 0   my $self = shift;
437 0           my $devs = shift;
438 0           my $policy = $self->{policy};
439 0           my $state = $self->{state};
440              
441             # skip anything we would source from.
442             # FIXME: ends up not skipping stuff out of completed_devs? :/
443 0           my %sdevs = map { $_ => 1 } @{$state->{source_devs}},
  0            
  0            
444 0           @{$state->{completed_devs}}, $state->{sdev_current};
445 0           my @devs = grep { ! $sdevs{$_->id} } @$devs;
  0            
446              
447 0           my @ddevs = ();
448 0           for my $dev (@devs) {
449 0 0         next unless $dev->should_get_new_files;
450 0           my $id = $dev->id;
451 0           my $hostid = $dev->hostid;
452              
453 0 0         if (@{$policy->{to_devices}}) {
  0            
454 0 0         next unless grep { $_ == $id } @{$policy->{to_devices}};
  0            
  0            
455             }
456 0 0         if (@{$policy->{to_hosts}}) {
  0            
457 0 0         next unless grep { $_ == $hostid } @{$policy->{to_hosts}};
  0            
  0            
458             }
459 0 0         if (@{$policy->{not_to_devices}}) {
  0            
460 0 0         next if grep { $_ == $id } @{$policy->{not_to_devices}};
  0            
  0            
461             }
462 0 0         if (@{$policy->{not_to_hosts}}) {
  0            
463 0 0         next if grep { $_ == $hostid } @{$policy->{not_to_hosts}};
  0            
  0            
464             }
465 0 0         if ($policy->{to_percent_used}) {
466 0           my $full = $dev->percent_full * 100;
467 0 0         next unless defined $full;
468 0 0         next unless $full > $policy->{to_percent_used};
469             }
470 0 0         if ($policy->{to_percent_free}) {
471 0           my $free = $dev->percent_free * 100;
472 0 0         next unless $free; # hope this never lands at exact zero.
473 0 0         next unless $free > $policy->{to_percent_free};
474             }
475 0 0         if ($policy->{to_space_used}) {
476 0           my $used = $dev->mb_used;
477 0 0 0       next unless $used && $used > $policy->{to_space_used};
478             }
479 0 0         if ($policy->{to_space_free}) {
480 0           my $free = $dev->mb_free;
481 0 0 0       next unless $free && $free > $policy->{to_space_free};
482             }
483 0           push @ddevs, $id;
484             }
485              
486 0           return \@ddevs;
487             }