File Coverage

blib/lib/Net/CascadeCopy.pm
Criterion Covered Total %
statement 243 257 94.5
branch 71 92 77.1
condition 7 11 63.6
subroutine 28 29 96.5
pod 4 4 100.0
total 353 393 89.8


line stmt bran cond sub pod time code
1             package Net::CascadeCopy;
2 34     34   929403 use strict;
  34         78  
  34         1154  
3 34     34   198 use warnings;
  34         45  
  34         1540  
4              
5             our $VERSION = '0.2.6'; # VERSION
6              
7 34     34   32726 use Mouse;
  34         1404456  
  34         238  
8              
9 34     34   82867 use Benchmark;
  34         332813  
  34         290  
10 34     34   54485 use Log::Log4perl qw(:easy);
  34         2335723  
  34         253  
11 34     34   53643 use POSIX ":sys_wait_h"; # imports WNOHANG
  34         329619  
  34         269  
12 34     34   101649 use Proc::Queue size => 32, debug => 0, trace => 0, delay => 1;
  34         563965  
  34         4911  
13              
14             my $logger = get_logger( 'default' );
15              
16             has data => ( is => 'ro', isa => 'HashRef', default => sub { return {} } );
17              
18             has total_time => ( is => 'rw', isa => 'Num', default => 0 );
19              
20             has ssh => ( is => 'ro', isa => 'Str', default => "ssh" );
21             has ssh_args => ( is => 'ro', isa => 'Str', default => "-x -A" );
22              
23             has command => ( is => 'ro', isa => 'Str', required => 1 );
24             has command_args => ( is => 'ro', isa => 'Str', default => "" );
25              
26             has source_path => ( is => 'ro', isa => 'Str', required => 1 );
27             has target_path => ( is => 'ro', isa => 'Str', required => 1 );
28              
29             has output => ( is => 'ro', isa => 'Str', default => "" );
30              
31             # maximum number of failures per server
32             has max_failures => ( is => 'ro', isa => 'Num', default => 3 );
33              
34             # maximum processes per remote server
35             has max_forks => ( is => 'ro', isa => 'Num', default => 2 );
36              
37             # keep track of child processes
38             has children => ( is => 'ro', isa => 'HashRef', default => sub { return {} } );
39              
40             # for testing purposes
41             has transfer_map => ( is => 'ro', isa => 'HashRef', default => sub { return {} } );
42              
43             # sort order
44             has sort_order => ( is => 'ro', isa => 'HashRef', default => sub { return {} } );
45              
46             sub add_group {
47 47     47 1 517 my ( $self, $group, $servers_a ) = @_;
48              
49 47         474 $logger->info( "Adding group: $group: ",
50             join( ", ", @$servers_a ),
51             );
52              
53             # initialize data structures
54 47         706 for my $server ( @{ $servers_a } ) {
  47         133  
55 253         1012 $self->data->{remaining}->{ $group }->{$server} = 1;
56              
57 253 100       870 unless ( defined $self->sort_order->{ $group }->{ $server } ) {
58 252         272 $self->sort_order->{ $group }->{ $server } = scalar keys %{ $self->sort_order->{ $group } };
  252         1417  
59             }
60             }
61              
62             # first server to transfer from is the current server
63 47         745 $self->data->{available}->{ $group }->{localhost} = 1;
64              
65             # initialize data structures
66 47         1234 $self->data->{completed}->{ $group } = [];
67              
68             }
69              
70             sub get_groups {
71 6     6 1 20 my ( $self ) = @_;
72              
73 6         17 my @groups;
74              
75 6         43 for my $group ( keys %{ $self->sort_order } ) {
  6         63  
76 7         43 push @groups, $group;
77             }
78              
79 6         29 return @groups;
80             }
81              
82             sub transfer {
83 11     11 1 33 my ( $self ) = @_;
84              
85 11         99 my $transfer_start = new Benchmark;
86              
87             LOOP:
88 11         352 while ( 1 ) {
89 109 100       1234 last LOOP unless $self->_transfer_loop( $transfer_start );
90 98         98162482 sleep 1;
91             }
92             }
93              
94             sub _transfer_loop {
95 153     153   16422 my ( $self, $transfer_start ) = @_;
96              
97 153         1122 $self->_check_for_completed_processes();
98              
99             # keep track if there are any remaining servers in any groups
100 153         317 my ( $remaining_flag, $available_flag );
101              
102             # handle completed processes
103 153 100 66     375 if ( ! scalar keys %{ $self->data->{remaining} } && ! $self->data->{running} ) {
  153         1190  
104 6         96 my $transfer_end = new Benchmark;
105 6         112 my $transfer_diff = timediff( $transfer_end, $transfer_start );
106 6         228 my $transfer_time = $self->_human_friendly_time( $transfer_diff->[0] );
107 6         90 $logger->warn( "Job completed in $transfer_time" );
108              
109 6         80 my $total_time = $self->_human_friendly_time( $self->total_time );
110 6         45 $logger->info ( "Cumulative tansfer time of all jobs: $total_time" );
111              
112 6         64 my $savings = $self->total_time - $transfer_diff->[0];
113 6 100       50 if ( $savings ) {
114 2         6 $savings = $self->_human_friendly_time( $savings );
115 2         12 $logger->info( "Approximate Time Saved: $savings" );
116             }
117              
118 6         27 my $failure_count;
119 6         39 for my $group ( $self->get_groups() ) {
120 7         33 my ( $errors, $failures ) = $self->_get_failed_count( $group );
121 7         27 $failure_count += $failures;
122             }
123              
124 6 100       41 if ( $failure_count ) {
125 1         6 $logger->fatal( "$failure_count Fatal Errors" );
126             }
127             else {
128 5         23 $logger->warn( "Completed successfully" );
129             }
130 6         102 return;
131             }
132              
133             # start new transfers to remaining servers
134 147         496 for ( 0, 1 ) {
135             # code_smell: start 2 transfers each round to keep the
136             # test cases from breaking. need to refactor test cases!
137 276         3159 for my $group ( $self->_get_remaining_groups() ) {
138             # group contains servers that still need to be tranferred
139              
140 300 100       2902 if ( $self->_get_available_servers( $group ) ) {
141             # reserve a server to start a new transfer from
142 144         1047 my $source = $self->_reserve_available_server( $group );
143 144         1417 my $target = $self->_reserve_remaining_server( $group );
144 144         951 $self->_start_process( $group, $source, $target );
145             }
146             }
147             }
148              
149 121         3803 return 1;
150             }
151              
152             sub _human_friendly_time {
153 95     95   259 my ( $self, $seconds ) = @_;
154              
155 95 50       457 return "0 secs" unless $seconds;
156              
157 95         188 my @time_string;
158              
159 95 50       506 if ( $seconds > 3600 ) {
160 0         0 my $hours = int( $seconds / 3600 );
161 0         0 $seconds = $seconds % 3600;
162 0         0 push @time_string, "$hours hrs";
163             }
164 95 50       507 if ( $seconds > 60 ) {
165 0         0 my $minutes = int( $seconds / 60 );
166 0         0 $seconds = $seconds % 60;
167 0         0 push @time_string, "$minutes mins";
168             }
169 95 50       392 if ( $seconds ) {
170 95         328 push @time_string, "$seconds secs";
171             }
172              
173 95         953 return join " ", @time_string;
174             }
175              
176             sub _print_status {
177 82     82   239 my ( $self, $group ) = @_;
178              
179             # completed procs
180 82         199 my $completed = 0;
181 82 50       461 if ( $self->data->{completed}->{ $group } ) {
182 82         139 $completed = scalar @{ $self->data->{completed}->{ $group } };
  82         764  
183             }
184              
185             # running procs
186 82         176 my $running = 0;
187 82 50       646 if ( $self->data->{running} ) {
188 82         174 for my $pid ( keys %{ $self->data->{running} } ) {
  82         463  
189 90 100       831 if ( $self->data->{running}->{ $pid }->{group} eq $group ) {
190 57         253 $running++;
191             }
192             }
193             }
194              
195             # unstarted
196 82         230 my $unstarted = 0;
197 82 100       2166 if ( $self->data->{remaining}->{ $group } ) {
198 68         140 $unstarted = scalar keys %{ $self->data->{remaining}->{ $group } };
  68         300  
199             }
200              
201             # failed
202 82         356 my ( $errors, $failures ) = $self->_get_failed_count( $group );
203              
204 82         1452 $logger->info( "\U$group: ",
205             "completed:$completed ",
206             "running:$running ",
207             "left:$unstarted ",
208             "errors:$errors ",
209             "failures:$failures ",
210             );
211             }
212              
213             sub _start_process {
214 144     144   389 my ( $self, $group, $source, $target ) = @_;
215              
216 144         819 $self->transfer_map->{$source}->{$target} += 1;
217              
218 144         712 my $f=fork;
219 144 100 66     64285438 if (defined ($f) and $f==0) {
220              
221 26         10334 my $target_path = $self->target_path;
222              
223 26         442 my $command;
224 26 100       1188 if ( $source eq "localhost" ) {
225 7         373 $command = join( " ",
226             $self->command,
227             $self->command_args,
228             $self->source_path,
229             "$target:$target_path"
230             );
231             } else {
232 19         3560 $command = join( " ",
233             $self->ssh,
234             $self->ssh_args,
235             $source,
236             $self->command,
237             $self->command_args,
238             $self->target_path,
239             "$target:$target_path"
240             );
241             }
242              
243 26         10230 print "COMMAND: $command\n";
244              
245 26   50     1473 my $output = $self->output || "";
246 26 50       487 if ( $output eq "stdout" ) {
    50          
247             # don't modify command
248             } elsif ( $output eq "log" ) {
249             # redirect all child output to log
250 0         0 $command = "$command >> ccp.$source.$target.log 2>&1"
251             } else {
252             # default is to redirectout stdout to /dev/null
253 26         255 $command = "$command >/dev/null"
254             }
255              
256 26         2256 $logger->info( "Starting: ($group) $source => $target" );
257 26         1654 $logger->debug( "Starting new child: $command" );
258              
259 26         48281265 system( $command );
260              
261 26 50       24584 if ($? == -1) {
    50          
262 0         0 $logger->logconfess( "failed to execute: $!" );
263             } elsif ($? & 127) {
264 0 0       0 $logger->logconfess( sprintf "child died with signal %d, %s coredump",
265             ($? & 127), ($? & 128) ? 'with' : 'without'
266             );
267             } else {
268 26         379 my $exit_status = $? >> 8;
269 26 100       456 if ( $exit_status ) {
270 1         250 $logger->error( "child exit status: $exit_status" );
271             }
272 26         13015 exit $exit_status;
273             }
274             } else {
275 118         19273 my $start = new Benchmark;
276 118         50018 $self->data->{running}->{ $f } = { group => $group,
277             source => $source,
278             target => $target,
279             start => $start,
280             };
281             }
282             }
283              
284             sub _check_for_completed_processes {
285 176     176   492 my ( $self ) = @_;
286              
287 176 100       3098 return unless $self->data->{running};
288              
289             # find any processes that ended and reschedule the source and
290             # target servers in the available pool
291 121         392 for my $pid ( keys %{ $self->data->{running} } ) {
  121         1771  
292 237 100       9041 if ( waitpid( $pid, WNOHANG) ) {
293              
294             # check the exit status of the command.
295 82 100       8894 if ( $? ) {
296 1         10 $self->_failed_process( $pid );
297             } else {
298 81         676 $self->_succeeded_process( $pid );
299             }
300             }
301             }
302              
303 121 100       4927 unless ( keys %{ $self->data->{running} } ) {
  121         827  
304 34         275 delete $self->data->{running};
305             }
306             }
307              
308             sub _succeeded_process {
309 81     81   433 my ( $self, $pid ) = @_;
310              
311 81         602 my $group = $self->data->{running}->{ $pid }->{group};
312 81         441 my $source = $self->data->{running}->{ $pid }->{source};
313 81         596 my $target = $self->data->{running}->{ $pid }->{target};
314 81         455 my $start = $self->data->{running}->{ $pid }->{start};
315              
316             # calculate time for this transfer
317 81         988 my $end = new Benchmark;
318 81         2469 my $diff = timediff( $end, $start );
319              
320             # keep track of transfer time totals
321 81         5085 $self->total_time( $self->total_time + $diff->[0] );
322              
323 81         722 my $time = $self->_human_friendly_time( $diff->[0] );
324 81         6048 $logger->warn( "Succeeded: ($group) $source => $target ($time)" );
325              
326 81         1815 $self->_mark_available( $group, $source );
327 81         527 $self->_mark_completed( $group, $target );
328 81         241 $self->_mark_available( $group, $target );
329              
330 81         460 delete $self->data->{running}->{ $pid };
331              
332 81         347 $self->_print_status( $group );
333             }
334              
335             sub _failed_process {
336 1     1   7 my ( $self, $pid ) = @_;
337              
338 1         8 my $group = $self->data->{running}->{ $pid }->{group};
339 1         20 my $source = $self->data->{running}->{ $pid }->{source};
340 1         10 my $target = $self->data->{running}->{ $pid }->{target};
341 1         8 my $start = $self->data->{running}->{ $pid }->{start};
342              
343             # calculate time for this transfer
344 1         9 my $end = new Benchmark;
345 1         34 my $diff = timediff( $end, $start );
346             # keep track of transfer time totals
347 1         61 $self->total_time( $self->total_time + $diff->[0] );
348              
349 1         24 $logger->warn( "Failed: ($group) $source => $target ($diff->[0] seconds)" );
350              
351             # there was an error during the transfer, reschedule
352             # it at the end of the list
353 1         23 $self->_mark_available( $group, $source );
354 1         8 my $fail_count = $self->_mark_failed( $group, $target );
355 1 50       6 if ( ! $self->_get_available_servers( $group ) ) {
    0          
356 1         10 $logger->fatal( "Error: no available servers in '$group' to handle $target" );
357 1         15 $self->_mark_failed( $group, $target, 1 );
358             }
359             elsif ( $fail_count >= $self->max_failures ) {
360 0         0 $logger->fatal( "Error: giving up on ($group) $target" );
361             } else {
362 0         0 $self->_mark_remaining( $group, $target );
363             }
364              
365 1         6 delete $self->data->{running}->{ $pid };
366              
367 1         7 $self->_print_status( $group );
368             }
369              
370             sub _get_available_servers {
371 513     513   1495 my ( $self, $group ) = @_;
372 513 50       4106 return unless $self->data->{available};
373 513 50       2374 return unless $self->data->{available}->{ $group };
374              
375 513         880 my @available;
376 513         831 for my $host ( $self->_sort_servers( $group, keys %{ $self->data->{available}->{ $group } } ) ) {
  513         5602  
377 619 100       2524 if ( $self->children->{$host} ) {
378 258 100       2866 next if $self->children->{$host} >= $self->max_forks;
379             }
380 495         1985 push @available, $host;
381             }
382              
383 513         3385 return @available;
384             }
385              
386             sub _reserve_available_server {
387 144     144   733 my ( $self, $group ) = @_;
388              
389 144         436 my ( $server ) = $self->_get_available_servers( $group );
390 144         4318 $logger->debug( "Reserving ($group) $server" );
391              
392             # only one transfer from localhost to each dc
393 144 100       3336 if ( $server eq "localhost" ) {
394 42         216 delete $self->data->{available}->{$group}->{localhost};
395             }
396              
397 144         770 $self->children->{ $server }++;
398 144         461 return $server;
399             }
400              
401             sub _get_remaining_servers {
402 216     216   844 my ( $self, $group ) = @_;
403              
404 216 50       4610 return unless $self->data->{remaining};
405              
406 216 100       1141 return unless $self->data->{remaining}->{ $group };
407              
408 210         312 my @hosts = $self->_sort_servers( $group, keys %{ $self->data->{remaining}->{ $group } } );
  210         4748  
409 210         2561 return @hosts;
410             }
411              
412             sub _sort_servers {
413 867     867   2946 my ( $self, $group, @servers ) = @_;
414              
415             # sort servers based on original insertion order
416 867         6385 @servers = sort { $self->sort_order->{$group}->{$a} <=> $self->sort_order->{$group}->{$b} } @servers;
  2751         11957  
417              
418 867         6621 return @servers;
419             }
420              
421             sub _reserve_remaining_server {
422 144     144   1101 my ( $self, $group ) = @_;
423              
424 144 50       448 if ( $self->_get_remaining_servers( $group ) ) {
425 144         346 my $server = ( $self->_sort_servers( $group, keys %{ $self->data->{remaining}->{ $group } } ) )[0];
  144         891  
426 144         1213 delete $self->data->{remaining}->{ $group }->{$server};
427 144         1044 $logger->debug( "Reserving ($group) $server" );
428              
429             # delete remaining data structure as groups are completed
430 144 100       3594 unless ( scalar keys %{ $self->data->{remaining}->{ $group } } ) {
  144         1591  
431 15         499 $logger->debug( "Group empty: $group" );
432 15         258 delete $self->data->{remaining}->{ $group };
433 15 100       315 unless ( scalar ( keys %{ $self->data->{remaining} } ) ) {
  15         163  
434 12         172 $logger->debug( "No servers remaining" );
435 12         220 delete $self->data->{remaining};
436             }
437             }
438 144         514 return $server;
439             }
440             }
441              
442             sub _get_remaining_groups {
443 276     276   843 my ( $self ) = @_;
444 276 100       1516 return unless $self->data->{remaining};
445 271         420 my @keys = sort keys %{ $self->data->{remaining} };
  271         2613  
446 271 100       997 return unless scalar @keys;
447 261         1932 return @keys;
448             }
449              
450             sub _mark_available {
451 163     163   364 my ( $self, $group, $server ) = @_;
452              
453             # only the initial transfer to each dc comes from localhost. iff
454             # first transfer (from localhost) succeeded, then don't reschedule
455             # for future syncs
456 163 100       728 if ( $server eq "localhost" ) {
457              
458 34 50       769 if ( $self->data->{completed}->{ $group } ) {
459              
460 34         203 delete $self->data->{available}->{ $group }->{$server};
461 34         100 return;
462             }
463             }
464              
465 129         1230 $logger->debug( "Server available: ($group) $server" );
466 129         1907 $self->data->{available}->{ $group }->{$server} = 1;
467              
468             # reduce count of number of active processes on this server
469 129 100       881 if ( $self->children->{$server} ) {
470 48         759 $self->children->{$server}--;
471             }
472             }
473              
474             sub _mark_remaining {
475 0     0   0 my ( $self, $group, $server ) = @_;
476              
477 0         0 $logger->debug( "Server remaining: ($group) $server" );
478 0         0 $self->data->{remaining}->{ $group }->{$server} = 1;
479             }
480              
481             sub _mark_completed {
482 81     81   237 my ( $self, $group, $server ) = @_;
483              
484 81         1340 $logger->debug( "Server completed: ($group) $server" );
485 81         694 push @{ $self->data->{completed}->{ $group } }, $server;
  81         544  
486             }
487              
488             sub _mark_failed {
489 2     2   5 my ( $self, $group, $server, $fatal ) = @_;
490              
491 2         12 $logger->debug( "Server failed: ($group) $server" );
492              
493 2 100       20 if ( $fatal ) {
494 1         23 $self->data->{failed}->{ $group }->{ $server } = $self->max_failures;
495             }
496             else {
497 1         10 $self->data->{failed}->{ $group }->{ $server }++;
498             }
499              
500 2         7 my $failures = $self->data->{failed}->{ $group }->{ $server };
501 2         10 $logger->debug( "$failures failures for ($group) $server" );
502 2         23 return $failures;
503             }
504              
505             sub _get_failed_count {
506 89     89   191 my ( $self, $group ) = @_;
507              
508 89         663 my $errors = 0;
509 89         181 my $failures = 0;
510              
511 89 100 66     746 if ( $self->data->{failed} && $self->data->{failed}->{ $group } ) {
512 2         5 for my $server ( keys %{ $self->data->{failed}->{ $group }} ) {
  2         17  
513 2         7 $errors += $self->data->{failed}->{ $group }->{ $server };
514 2 50       12 if ( $self->data->{failed}->{ $group }->{ $server } >= $self->max_failures ) {
515 2         5 $failures++;
516             }
517             }
518             }
519              
520 89         280 return ( $errors, $failures );
521             }
522              
523             # tracing all the attempted transfers
524             sub get_transfer_map {
525 16     16 1 518 my ( $self ) = @_;
526              
527 16         1381 return $self->transfer_map;
528             }
529              
530              
531             1;
532              
533             __END__