File Coverage

blib/lib/Yars/Command/yars_balance.pm
Criterion Covered Total %
statement 32 152 21.0
branch 0 42 0.0
condition 0 4 0.0
subroutine 11 21 52.3
pod 0 1 0.0
total 43 220 19.5


line stmt bran cond sub pod time code
1             package Yars::Command::yars_balance;
2              
3 1     1   897 use strict;
  1         2  
  1         25  
4 1     1   6 use warnings;
  1         1  
  1         23  
5 1     1   19 use 5.010;
  1         3  
6 1     1   296 use Yars;
  1         3  
  1         13  
7 1     1   355 use Yars::Client;
  1         3  
  1         6  
8 1     1   276 use Path::Class qw( dir file );
  1         313  
  1         51  
9 1     1   6 use Getopt::Long qw( GetOptions );
  1         2  
  1         7  
10 1     1   114 use Pod::Usage qw( pod2usage );
  1         2  
  1         44  
11 1     1   5 use Digest::file qw( digest_file_hex );
  1         2  
  1         43  
12 1     1   5 use Mojo::URL;
  1         2  
  1         7  
13              
14             # PODNAME: yars_balance
15             # ABSTRACT: Fix all files
16             our $VERSION = '1.31'; # VERSION
17              
18              
19             sub _recurse
20             {
21 0     0     my($root, $cb) = @_;
22 0           foreach my $child ($root->children)
23             {
24 0 0         if($child->is_dir)
25             {
26 0           _recurse($child,$cb);
27             }
28             else
29             {
30 0           $cb->($child);
31             }
32             }
33            
34 0           my $count = do {
35 1     1   88 use autodie;
  1         1  
  1         7  
36 0           my $dh;
37 0           opendir $dh, $root;
38 0           my $count = scalar grep !/^\.\.?$/, readdir $dh;
39 0           closedir $dh;
40 0           $count;
41             };
42            
43 0 0         if($count == 0)
44             {
45 0           rmdir $root;
46             }
47             }
48              
49             sub _rebalance_dir
50             {
51 0     0     my($yars, $client, $disk, $server, $opt) = @_;
52              
53 0           my $root = dir( $disk->{root} );
54            
55             my $cleanup_file = $opt->{backup}
56             ? sub {
57 0     0     my($filename, $md5) = @_;
58 0           my $dir = $root->subdir('balance-backup', @$md5);
59 0           $dir->mkpath;
60 0           my $to = $dir->file($filename->basename);
61 0 0         rename "$filename", "$to"
62             or warn "unable to rename $filename => $to $!";
63             }
64             : sub {
65 0     0     my($filename) = @_;
66 0 0         unlink "$filename"
67             or warn "error removing $filename";
68 0 0         };
69            
70             my $compute_md5_as_list = sub {
71 0     0     my($filename) = @_;
72             # compute the md5 to ensure that the file isn't corrupt
73 0           my $md5 = digest_file_hex("$filename", "MD5");
74 0           my @md5 = ($md5 =~ /(..)/g);
75            
76             # verify that the file itself is in the right place
77 0           my $expected_file = $root->subdir(@md5, $filename->basename);
78 0 0         if("$expected_file" ne "$filename")
79             {
80 0           warn "file: $filename (md5 $md5) is stored at $filename instead of $expected_file. May be corrupt.";
81 0           return;
82             }
83 0           ($md5, @md5);
84 0           };
85            
86 0           foreach my $dir (sort grep { $_->basename =~ /^[a-f0-9]{1,2}$/ } $root->children)
  0            
87             {
88 0           my $expected_dir = $yars->tools->disk_for($dir->basename);
89            
90             # If disk_for returns a value, then it means the file belongs on the current
91             # server. If it returns undef it should be uploaded to a different server.
92             # so we do either a filesystem level move, or a http remote move for each
93             # file in the stashed directory.
94              
95 0 0         if(defined $expected_dir)
96             {
97 0           $expected_dir = dir( $expected_dir );
98            
99             # if the expected dir is where it is stored, then it is already in the right place.
100 0 0         next if $expected_dir eq $dir->parent;
101              
102             _recurse $dir, sub {
103 0     0     my($from) = @_;
104 0           say 'LCL ', $from->basename;
105            
106 0           my($md5, @md5) = $compute_md5_as_list->($from);
107 0 0         return unless $md5;
108            
109             # temporary filename to copy to first
110 0           my(undef,$tmp) = $expected_dir->subdir('tmp')->tempfile( "balanceXXXXXX", SUFFIX => '.tmp' );
111 0           $tmp = file($tmp);
112 0           $tmp->parent->mkpath(0,0700);
113            
114             # final filename to move file once the transfer to the new
115             # partition is complete.
116 0           my $to = $expected_dir->subdir(@md5, $from->basename);
117            
118 0 0         $from->copy_to($tmp) or do {
119 0           warn "error copying $from => $tmp $!";
120 0           unlink "$tmp";
121 0           return;
122             };
123            
124             # verify that the copied file still has the same MD5 in its
125             # new location.
126 0           my $md5_verify = digest_file_hex("$tmp", "MD5");
127 0 0         if($md5 ne $md5_verify)
128             {
129 0           warn "file: $tmp does not match original md5. May be corrupt.";
130 0           return;
131             }
132            
133 0           $to->parent->mkpath(0,0700);
134 0 0         $tmp->move_to($to) or do {
135 0           warn "error moving $tmp => $to $!";
136 0           return;
137             };
138            
139 0           $cleanup_file->($from, \@md5);
140 0           };
141             }
142             else
143             {
144             _recurse $dir, sub {
145 0     0     my($file) = @_;
146 0           say 'RMT ', $file->basename;
147              
148 0           my($md5,@md5) = $compute_md5_as_list->($file);
149 0 0         return unless $md5;
150              
151 0 0         $client->upload('--nostash' => 1, "$file") or do {
152 0           warn "unable to upload $file @{[ $client->errorstring ]}";
  0            
153 0           return;
154             };
155            
156             # we did a bucket map check above, but doublecheck the header returned
157             # to us for the server doesn't match the old server location. If
158             # there is a server restart between the original check and here it
159             # could otherwise cause problems.
160 0           my $new_location = Mojo::URL->new($client->res->headers->location);
161 0           my $old_location = Mojo::URL->new($yars->config->url);
162 0           $old_location->path($new_location->path);
163 0 0         if("$new_location" eq "$old_location")
164             {
165 0           die "uploaded to the same server, probably configuration mismatch!";
166             }
167            
168 0           $cleanup_file->($file, \@md5);
169 0           };
170             }
171             }
172             }
173              
174             sub main
175             {
176 0     0 0   my $class = shift;
177 0           local @ARGV = @_;
178 0           my $threads = 1;
179 0           my $backup = 0;
180            
181             GetOptions(
182             'threads|t=i' => \$threads,
183             'backup|b' => \$backup,
184 0     0     'help|h' => sub { pod2usage({ -verbose => 2 }) },
185             'version' => sub {
186 0   0 0     say 'Yars version ', ($Yars::Command::yars_fast_balance::VERSION // 'dev');
187 0           exit 1;
188             },
189 0 0         ) || pod2usage(1);
190            
191 0           my $yars = Yars->new;
192 0           my $client = Yars::Client->new;
193 0           my @work_list;
194              
195 0           foreach my $server ($yars->config->servers)
196             {
197              
198             # doublecheck that the local bucket map and the
199             # server bucketmaps match. Otherwise we could
200             # migrate a file to the same server, and then
201             # delete it, thus loosing the file! Not good.
202 0           my $bucket_map_url = Mojo::URL->new($server->{url});
203 0           $bucket_map_url->path('/bucket_map');
204 0           my $tx = $client->ua->get($bucket_map_url);
205 0 0         if(my $res = $tx->success)
206             {
207 0           my %server_bucket_map = %{ $res->json };
  0            
208 0           my %my_bucket_map = %{ $yars->tools->bucket_map };
  0            
209            
210 0           foreach my $key (keys %my_bucket_map)
211             {
212 0   0       my $other = (delete $server_bucket_map{$key})//'';
213 0 0         if($my_bucket_map{$key} ne $other)
214             {
215 0           die "client/server mismatch on bucket $key";
216             }
217             }
218 0           foreach my $key (keys %server_bucket_map)
219             {
220 0           die "client/server mismatch on bucket $key";
221             }
222             }
223             else
224             {
225 0           die "unable to get bucket map from ", $server->{url};
226             }
227            
228             # only rebalance disks that we are responsible for...
229             # even if perhaps those disks are available to us...
230 0 0         next unless $yars->config->url eq $server->{url};
231 0           foreach my $disk (@{ $server->{disks} })
  0            
232             {
233 0           push @work_list, [$yars,$client,$disk,$server, { backup => $backup } ];
234             }
235             }
236              
237 0 0         if($threads > 1)
238             {
239 0           say "running with $threads threads";
240 0 0         if(eval { require Parallel::ForkManager; 1 })
  0            
  0            
241             {
242 0           my $pm = Parallel::ForkManager->new($threads);
243 0           foreach my $work (@work_list)
244             {
245 0           $pm->start;
246 0           _rebalance_dir(@$work);
247 0           $pm->finish;
248             }
249 0           $pm->wait_all_children;
250 0           return;
251             }
252             else
253             {
254 0           warn "Unable to fork without Parallel::ForkManager";
255             }
256             }
257              
258 0           _rebalance_dir(@$_) for @work_list;
259             }
260              
261             1;
262              
263             __END__
264              
265             =pod
266              
267             =head1 NAME
268              
269             Yars::Command::yars_balance - code for yars_balance
270              
271             =head1 DESCRIPTION
272              
273             This module contains the machinery for the command line program L<yars_balance>
274              
275             =head1 SEE ALSO
276              
277             L<yars_disk_scan>
278              
279             =cut