File Coverage

blib/lib/App/Prove/Plugin/Distributed.pm
Criterion Covered Total %
statement 60 256 23.4
branch 5 108 4.6
condition 3 22 13.6
subroutine 17 25 68.0
pod 5 5 100.0
total 90 416 21.6


line stmt bran cond sub pod time code
1             package App::Prove::Plugin::Distributed;
2              
3 1     1   24773 use strict;
  1         3  
  1         44  
4 1     1   1401 use Getopt::Long;
  1         16847  
  1         8  
5 1     1   169 use Carp;
  1         7  
  1         103  
6 1     1   6 use Test::More;
  1         1  
  1         10  
7 1     1   1533 use IO::Socket::INET;
  1         29043  
  1         8  
8 1     1   726 use Cwd;
  1         3  
  1         75  
9              
10 1     1   1326 use Sys::Hostname;
  1         1431  
  1         59  
11 1     1   142 use constant LOCK_EX => 2;
  1         3  
  1         69  
12 1     1   6 use constant LOCK_NB => 4;
  1         2  
  1         39  
13 1     1   5 use File::Spec;
  1         2  
  1         27  
14              
15 1     1   5 use vars (qw($VERSION @ISA));
  1         2  
  1         545  
16              
17             my $error = '';
18              
19             =head1 NAME
20              
21             App::Prove::Plugin::Distributed - an L plugin to distribute test jobs using client and server model.
22              
23             =head1 VERSION
24              
25             Version 0.08
26              
27             =cut
28              
29             $VERSION = '0.08';
30              
31             =head1 SYNOPSIS
32              
33             # All of the examples below is loading tests into the worker perl processes
34             # If you want to run the tests in a separate perl process, you can specify
35             # the '--detach' option to accomplish that.
36            
37             # Default workers with L as worker processes.
38             prove -PDistributed -j2 t/
39              
40             # Distributed jobs with LSF workers.
41             prove -PDistributed --distributed-type=LSF -j2 t/
42              
43             # Distributed jobs with SSH workers.
44             prove -PDistributed --distributed-type=SSH -j2 --hosts=host1,host2 t/
45              
46             # If you are using home network that does not have name server setup,
47             # you can specify the option --use-local-public-ip
48             prove -PDistributed --distributed-type=SSH --use-local-public-ip -j2 --hosts=host1,host2 t/
49            
50             # Distributed jobs with PBS workers using L. Note: This is not tested yet.
51             prove -PDistributed --distributed-type=PBS -j2 t/
52              
53             # Distributed jobs with PBS workers using L. Note: This is not tested yet.
54             # With PBS option
55             prove -PDistributed --distributed-type=PBS --mem=200 -j2 t/
56              
57             =head1 DESCRIPTION
58              
59             A plugin for App::Prove to distribute job. The core implementation of the plugin is to
60             provide a easy interface and functionality to extend the use of any distribution method.
61              
62             The initiate release of this module was using the idea from L that load perl
63             code file using "do" perl function to the worker perl process to execute tests.
64              
65             Currently, the distribution comes with a few implementation of distribution methods to
66             initiate external "worker" processes.
67             Shown below is the list.
68            
69             L
70             LSF
71             SSH
72             L * Note: PBS implemetation is not tested yet.
73              
74             =head1 FUNCTIONS
75              
76             Basic functions.
77              
78             =head3 C
79              
80             Load the plugin configuration.
81             It will setup all of the tests to be distributed through the
82             L source handler class.
83              
84             =cut
85              
86             sub load {
87 0     0 1   my ( $class, $p ) = @_;
88 0           my @args = @{ $p->{args} };
  0            
89 0           my $app = $p->{app_prove};
90             {
91 0           local @ARGV = @args;
  0            
92              
93 0           push @ARGV, grep { /^--/ } @{ $app->{argv} };
  0            
  0            
94 0           $app->{argv} = [ grep { !/^--/ } @{ $app->{argv} } ];
  0            
  0            
95 0           Getopt::Long::Configure(qw(no_ignore_case bundling pass_through));
96              
97             # Don't add coderefs to GetOptions
98 0 0         GetOptions(
99             'manager=s' => \$app->{manager},
100             'distributed-type=s' => \$app->{distributed_type},
101             'start-up=s' => \$app->{start_up},
102             'tear-down=s' => \$app->{tear_down},
103             'error-log=s' => \$app->{error_log},
104             'detach' => \$app->{detach},
105             'sync-type=s' => \$app->{sync_type},
106             'source-dir=s' => \$app->{source_dir},
107             'destination-dir=s' => \$app->{destination_dir},
108             ) or croak('Unable to continue');
109              
110             #LSF: We pass the option to the source handler if the source handler want the options.
111 0 0         unless ( $app->{manager} ) {
112 0 0         my $source_handler_class =
113             'TAP::Parser::SourceHandler::'
114             . 'Worker'
115             . (
116             $app->{distributed_type}
117             ? '::' . $app->{distributed_type}
118             : ''
119             );
120 0           eval "use $source_handler_class";
121 0 0         unless ($@) {
122 0 0         unless ( $source_handler_class->load_options( $app, \@ARGV ) ) {
123 0           croak('Unable to continue without needed worker options.');
124             }
125             }
126             }
127              
128             }
129 0           my $type = $app->{distributed_type};
130 0 0         my $option_name = '--worker' . ( $type ? '-' . lc($type) : '' ) . '-option';
131 0 0 0       if ( $app->{argv}->[0]
132             && $app->{argv}->[0] =~ /$option_name=number_of_workers=(\d+)/ )
133             {
134 0 0         if ( $app->{jobs} ) {
135 0           die
136             "-j and $option_name=number_of_workers are mutually exclusive.\n";
137             }
138             else {
139 0           $app->{jobs} = $1;
140             }
141             }
142             else {
143 0   0       $app->{jobs} ||= 1;
144 0           unshift @{ $app->{argv} },
  0            
145             "$option_name=number_of_workers=" . $app->{jobs};
146             }
147              
148 0           for (
149             qw(start_up tear_down error_log detach sync_type source_dir destination_dir)
150             )
151             {
152 0 0         if ( $app->{$_} ) {
153 0           unshift @{ $app->{argv} }, "$option_name=$_=" . $app->{$_};
  0            
154             }
155             }
156              
157 0 0         if ( $app->{sync_type} ) {
158              
159 1     1   6 no warnings 'redefine';
  1         2  
  1         680  
160             #LSF: If we do rsync, that means we want to keep the switches unmodified.
161             *App::Prove::_get_lib = sub {
162 0     0     my $self = shift;
163 0           my @libs;
164 0 0         if ( $self->lib ) {
165 0           push @libs, 'lib';
166             }
167 0 0         if ( $self->blib ) {
168 0           push @libs, 'blib/lib', 'blib/arch';
169             }
170 0 0         if ( @{ $self->includes } ) {
  0            
171 0           push @libs, @{ $self->includes };
  0            
172             }
173              
174             #LSF: Override the original not to do the abs path.
175             #@libs = map { File::Spec->rel2abs($_) } @libs;
176              
177             # Huh?
178 0 0         return @libs ? \@libs : ();
179              
180 0           };
181             }
182              
183 0 0         unless ( $app->{manager} ) {
184              
185             #LSF: Set the iterator.
186 0 0         $app->sources(
187             [
188             'Worker'
189             . (
190             $app->{distributed_type}
191             ? '::' . $app->{distributed_type}
192             : ''
193             )
194             ]
195             );
196 0           return 1;
197             }
198              
199 0   0       my $original_perl_5_lib = $ENV{PERL5LIB} || '';
200 0           my @original_include = $class->extra_used_libs();
201 0 0         if ( $app->{includes} ) {
202 0           my @includes = split /:/, $original_perl_5_lib;
203 0           unshift @includes, @original_include;
204 0           unshift @includes, @{ $app->{includes} };
  0            
205 0           my %found;
206             my @wanted;
207 0           for my $include (@includes) {
208 0 0         unless ( $found{$include}++ ) {
209 0           push @wanted, $include;
210             }
211             }
212 0           $ENV{PERL5LIB} = join ':', @wanted;
213              
214             #LSF: Put the extra libraries to @INC.
215 0           %found = map { $_ => 1 } @INC;
  0            
216 0           for (@wanted) {
217 0 0         unshift @INC, $_ unless ( $found{$_} );
218             }
219             }
220              
221             #LSF: Sync test environment here.
222 0 0         if ( $app->{sync_type} ) {
223 0           my $method = $app->{sync_type} . '_test_env';
224 0 0         unless ( $class->can($method) ) {
225 0           die "not able to sync on the remote with type "
226             . $app->{sync_type}
227             . ".\nCurrently, only the rsync type is supported.\n";
228             }
229              
230 0 0         unless ( $class->$method($app) ) {
231 0           die $error;
232             }
233             }
234              
235             #LSF: Start up.
236 0 0         if ( $app->{start_up} ) {
237 0 0         unless ( $class->_do( $app->{start_up} ) ) {
238 0           die "start server error with error [$error].\n";
239             }
240             }
241              
242 0           while (1) {
243              
244             #LSF: The is the server to serve the test.
245 0 0         $class->start_server(
    0          
246             app => $app,
247             spec => $app->{manager},
248             ( $app->{error_log} ? ( error_log => $app->{error_log} ) : () ),
249             ( $app->{detach} ? ( detach => $app->{detach} ) : () ),
250              
251             );
252             }
253              
254             #LSF: Anything below here might not be called.
255             #LSF: Tear down
256 0 0         if ( $app->{tear_down} ) {
257 0 0         unless ( $class->_do( $app->{tear_down} ) ) {
258 0           die "tear down error with error [$error].\n";
259             }
260             }
261 0           $ENV{PER5LIB} = $original_perl_5_lib;
262 0           @INC = @original_include;
263 0           return 1;
264             }
265              
266             =head3 C
267              
268             Return a list of paths in @INC that are not part of the compiled-in lsit of paths
269              
270             =cut
271              
272             my @initial_compiled_inc;
273              
274             BEGIN {
275 1     1   6 use Config;
  1         2  
  1         269  
276              
277 1     1   21 my @var_list = (
278             'updatesarch', 'updateslib', 'archlib', 'privlib',
279             'sitearch', 'sitelib', 'sitelib_stem', 'vendorarch',
280             'vendorlib', 'vendorlib_stem', 'extrasarch', 'extraslib',
281             );
282              
283 1         3 for my $var_name (@var_list) {
284 12 100 100     4027 if ( $var_name =~ /_stem$/ && $Config{$var_name} ) {
285 1         50 my @stem_list = ( split( ' ', $Config{'inc_version_list'} ), '' );
286 1         10 push @initial_compiled_inc,
287 1         4 map { $Config{$var_name} . "/$_" } @stem_list;
288             }
289             else {
290 11 100       968 push @initial_compiled_inc, $Config{$var_name}
291             if $Config{$var_name};
292             }
293             }
294              
295             # . is part of the initial @INC unless in taint mode
296 1 50       53 push @initial_compiled_inc, '.' if ( ${^TAINT} == 0 );
297              
298 1         2 map { s/\/+/\//g } @initial_compiled_inc;
  6         36  
299 1         2 map { s/\/+$// } @initial_compiled_inc;
  6         767  
300             }
301              
302             sub extra_used_libs {
303 0     0 1   my $class = shift;
304              
305 0           my @extra;
306 0           my @compiled_inc = @initial_compiled_inc;
307 0           my @perl5lib = split( ':', $ENV{PERL5LIB} );
308 0           map { $_ =~ s/\/+$// }
  0            
309             ( @compiled_inc, @perl5lib ); # remove trailing slashes
310 0   0       map { $_ = Cwd::abs_path($_) || $_ } ( @compiled_inc, @perl5lib );
  0            
311 0           for my $inc (@INC) {
312 0           $inc =~ s/\/+$//;
313 0   0       my $abs_inc = Cwd::abs_path($inc)
314             || $inc; # should already be expanded by UR.pm
315 0 0         next if ( grep { $_ =~ /^$abs_inc$/ } @compiled_inc );
  0            
316 0 0         next if ( grep { $_ =~ /^$abs_inc$/ } @perl5lib );
  0            
317 0           push @extra, $inc;
318             }
319              
320             #unshift @extra, ($ENV{PERL_USED_ABOVE} ? split(":", $ENV{PERL_USED_ABOVE}) : ());
321              
322 0           map { $_ =~ s/\/+$// } @extra; # remove trailing slashes again
  0            
323             #@extra = _unique_elements(@extra);
324              
325 0           return @extra;
326             }
327              
328             =head3 C
329              
330             Start a server to serve the test.
331              
332             Parameter is the contoller peer address.
333              
334             =cut
335              
336             sub start_server {
337 0     0 1   my $class = shift;
338 0           my %args = @_;
339 0           my ( $app, $spec, $error_log, $detach ) =
340             @args{ 'app', 'spec', 'error_log', 'detach' };
341              
342 0           my $socket = IO::Socket::INET->new(
343             PeerAddr => $spec,
344             Proto => 'tcp'
345             );
346 0 0         unless ($socket) {
347 0           die "failed to connect to controller with address : $spec.\n";
348             }
349              
350             #LSF: Waiting for job from controller.
351 0           my $job_info = <$socket>;
352 0           chomp($job_info);
353              
354             #LSF: Run job.
355 0           my $pid = fork();
356 0 0         if ($pid) {
    0          
357 0           waitpid( $pid, 0 );
358             }
359             elsif ( $pid == 0 ) {
360              
361             #LSF: Intercept all output from Test::More. Output all of them at one.
362 0           my $builder = Test::More->builder;
363 0           $builder->output($socket);
364 0           $builder->failure_output($socket);
365 0           $builder->todo_output($socket);
366 0 0         if ($detach) {
367 0           my @command =
368             ( $job_info,
369 0 0         ( $app->{test_args} ? @{ $app->{test_args} } : () ) );
370             {
371 0           require TAP::Parser::Source;
  0            
372 0           require TAP::Parser::SourceHandler::Worker;
373 0           require TAP::Parser::SourceHandler::Perl;
374 0           my $source = TAP::Parser::Source->new();
375 0           $source->raw( \$job_info )->assemble_meta;
376 0           my $vote =
377             TAP::Parser::SourceHandler::Worker->can_handle($source);
378 0 0         if ( $vote > 0.25 ) {
379 0           unshift @command,
380             TAP::Parser::SourceHandler::Perl->get_perl();
381             }
382 0           open STDOUT, ">&", $socket;
383 0           open STDERR, ">&", $socket;
384 0 0         exec(@command)
385             or print $socket "Error running command: $!\nCommand was: ",
386             join( ' ', @command ), "\n";
387             }
388 0           exit;
389             }
390 0           *STDERR = $socket;
391 0           *STDOUT = $socket;
392 0 0         unless ( $class->_do( $job_info, $app->{test_args} ) ) {
393 0 0         my $server_spec = (
394             $socket->sockhost eq '0.0.0.0'
395             ? hostname
396             : $socket->sockhost
397             )
398             . ':'
399             . $socket->sockport;
400 0           print $socket (
401             '# ',
402             ( join "\n# ", ( "Worker: <$spec>", ( split /\n/, $error ) ) ),
403             "\n"
404             );
405 0 0         if ($error_log) {
406 1     1   1133 use IO::File;
  1         3519  
  1         385  
407 0           my $fh = IO::File->new( "$error_log", 'a+' );
408 0 0         unless ( flock( $fh, LOCK_EX | LOCK_NB ) ) {
409 0           warn "can't immediately write-lock ",
410             "the file ($!), blocking ...";
411 0 0         unless ( flock( $fh, LOCK_EX ) ) {
412 0           die "can't get write-lock on numfile: $!";
413             }
414             }
415 0           print $fh (
416             join "\n",
417             (
418             "<< START $job_info >>",
419             "SERVER: $server_spec",
420             "PID: $$",
421             "ERROR: $error",
422             "<< END $job_info >>"
423             )
424             );
425 0           close $fh;
426             }
427             }
428              
429             #LSF: How to exit with END block trigger.
430 0           &trigger_end_blocks_before_child_process_exit();
431              
432             #LSF: Might not need this.
433 0           $socket->flush;
434 0           exit(0);
435             }
436             else {
437 0           die "should not get here.\n";
438             }
439 0           $socket->close();
440 0           return 1;
441             }
442              
443             sub _do {
444 0     0     my $proto = shift;
445 0           my $job_info = shift;
446 0           my $args = shift;
447 0           my $cwd = File::Spec->rel2abs('.');
448              
449             #LSF: The code from here to exit is from L module.
450 0     0     local *CORE::GLOBAL::exit = sub { die 'notr3a11yeXit' };
  0            
451 0           local $0 = $job_info; #fixes FindBin (in English $0 means $PROGRAM_NAME)
452 1     1   6 no strict; # default for Perl5
  1         1  
  1         193  
453             {
454              
455 0           package main;
456 0 0         local @ARGV = $args ? @$args : ();
457 0           do $0; # do $0; could be enough for strict scripts
458 0           chdir($cwd);
459              
460 0 0         if ($EVAL_ERROR) {
    0          
461 0           $EVAL_ERROR =~ s{\n+\z}{};
462 0 0         unless ( $EVAL_ERROR =~ m{^notr3a11yeXit} ) {
463 0           $error = $EVAL_ERROR;
464 0           return;
465             }
466             }
467             elsif ($@) {
468 0           $error = $@;
469 0           return;
470             }
471             }
472 0           return 1;
473             }
474              
475             =head3 C
476              
477             Trigger END blocks before the child process exit.
478             The main reason is to have the Test::Builder to have
479             change to finish up.
480              
481             =cut
482              
483             sub trigger_end_blocks_before_child_process_exit {
484 0     0 1   my $original_pid;
485 0 0 0       if ( $Test::Builder::Test && $Test::Builder::Test->{Original_Pid} != $$ ) {
486 0           $original_pid = $Test::Builder::Test->{Original_Pid};
487 0           $Test::Builder::Test->{Original_Pid} = $$;
488             }
489 1     1   4 use B;
  1         2  
  1         297  
490 0           my @ENDS = B::end_av->ARRAY;
491 0           for my $END (@ENDS) {
492 0           $END->object_2svref->();
493             }
494 0 0 0       if ( $Test::Builder::Test && $original_pid ) {
495 0           $Test::Builder::Test->{Original_Pid} = $original_pid;
496             }
497             }
498              
499             =head3 C
500              
501             Rsync test enviroment to the worker host.
502              
503             Parameters $app object
504             Returns boolean
505              
506             =cut
507              
508             sub rsync_test_env {
509 0     0 1   my $proto = shift;
510 0           my $app = shift;
511 0           my $manager = $app->{manager};
512              
513 0           my ( $host, $port ) = split /:/, $manager, 2;
514 0           my $dest = $app->{destination_dir};
515 0 0         unless ($dest) {
516 0           require File::Temp;
517 0           $dest = File::Temp::tempdir();
518             }
519              
520             #LSF: Some system the rsync will not automatically using the current user.
521             # Let get the user for rsync.
522 0           my $user = $ENV{USER};
523              
524 0           my $source = $app->{source_dir};
525 0           require File::Rsync;
526 0           my $rsync = File::Rsync->new( { archive => 1, compress => 1 } );
527             $rsync->exec(
528             {
529             src => ( $user ? "$user\@" : '' ) . "$host:$source",
530             dest => "$dest",
531             }
532 0 0         ) or do { $error = "rsync failed\n$!"; return; };
  0 0          
  0            
533              
534             #LSF: Let change directory to destination.
535 0           chdir "$dest";
536              
537 0           return 1;
538             }
539              
540             1;
541              
542             __END__