File Coverage

blib/lib/Test/HadoopSingleNode.pm
Criterion Covered Total %
statement 43 45 95.5
branch n/a
condition n/a
subroutine 15 15 100.0
pod n/a
total 58 60 96.6


line stmt bran cond sub pod time code
1             package Test::HadoopSingleNode;
2              
3 4     4   12702 use strict;
  4         9  
  4         148  
4 4     4   20 use warnings;
  4         6  
  4         110  
5              
6 4     4   93 use 5.008;
  4         13  
  4         140  
7 4     4   19 use Carp;
  4         6  
  4         371  
8              
9 4     4   4290 use File::Temp qw(tempdir);
  4         79785  
  4         314  
10 4     4   6417 use Test::TCP qw(wait_port empty_port);
  4         201111  
  4         306  
11 4     4   2768 use File::Which qw(which);
  4         3202  
  4         252  
12 4     4   3508 use Proc::Guard qw(proc_guard);
  4         21926  
  4         252  
13              
14 4     4   3593 use IPC::Open3 qw(open3);
  4         18799  
  4         276  
15 4     4   32 use POSIX ":sys_wait_h";
  4         9  
  4         32  
16 4     4   7199 use IO::Select;
  4         7135  
  4         214  
17 4     4   29 use IO::Handle;
  4         6  
  4         151  
18 4     4   89 use Symbol qw(gensym);
  4         9  
  4         201  
19 4     4   6136 use IPC::Run qw(harness timeout);
  4         149040  
  4         429  
20              
21 4     4   6440 use XML::Simple;
  0            
  0            
22             use Data::Rmap qw(rmap);
23             use Path::Class qw(dir file);
24             use File::Copy qw(copy);
25             use Data::Dumper;
26              
27             our $VERSION = '0.02_04';
28              
29             # Tested at hadoop-0.20.2-cdh3u1
30             our $conf_files = +{
31             'core-site.xml' => +{
32             property => [
33             +{
34             name => "hadoop.tmp.dir",
35             value => "__TMP_DIR__/tmp",
36             },
37             +{
38             name => "fs.default.name",
39             value => "hdfs://localhost:__FS_PORT__",
40             },
41             ],
42             },
43             'mapred-site.xml' => +{
44             property => [
45             +{
46             name => "mapred.job.tramrer",
47             value => "hdfs://localhost:__MR_PORT__",
48             },
49             ],
50             },
51             'hdfs-site.xml' => +{
52             property => [
53             +{
54             name => "dfs.replication",
55             value => 1,
56             },
57             ],
58             },
59             };
60              
61             our @HADOOP_DAEMON_PROCESS_NAMES = qw/
62             org.apache.hadoop.hdfs.server.namenode.NameNode
63             org.apache.hadoop.hdfs.server.datanode.DataNode
64             /;
65              
66             my $DEBUG_MSG = 1;
67              
68             use Class::Accessor::Lite (
69             rw => [ qw(hadoop_conf_dir fs_port mr_port) ],
70             );
71              
72             #-------------------------------------------------------------------------------
73             sub new {
74             my ($class, %args) = @_;
75             my $self = bless +{
76             fs_port => empty_port,
77             mr_port => empty_port,
78             running_hadoop_conf_dir => $args{running_hadoop_conf_dir} || $ENV{TEST_HADOOP_SINGLENODE_CONF},
79             default_hadoop_conf_dir => $args{default_hadoop_conf_dir},
80             _RUNNING_TEST_HADOOP => 0,
81             _fixture_files => [],
82             }, $class;
83             $self->{hadoop_bin} = scalar(which('hadoop'));
84             unless ($self->{hadoop_bin}) {
85             return;
86             }
87             $self->{hadoop_conf_dir} = $self->_set_hadoop_conf_dir() || die "conf dir specified";
88             $self;
89             }
90              
91             sub DESTROY {
92             my $self = shift;
93             $self->stop_all;
94             }
95              
96             sub start_all {
97             my ($self) = @_;
98             # Use running hadoop
99             if ($self->{running_hadoop_conf_dir}) {
100             die "hadoop isnt running" unless (__check_alived_hadoop());
101             warn "hadoop is already running" if ($DEBUG_MSG);
102             $self->{_RUNNING_TEST_HADOOP} = 1;
103             return;
104             }
105             # Check running hadoop process
106             if (my @pids = __check_alived_hadoop()) {
107             die 'hadoop is running. (' . join(',', @pids) . ')';
108             }
109             # Format namenode
110             $self->_cmd_harness_y_only( scalar(which('hadoop')), $self->_add_hadoop_opt, 'namenode', '-format' );
111             # Start hdfs
112             my $dfs_proc = proc_guard( scalar(which('start-dfs.sh')), $self->_add_hadoop_opt );
113             # Start mapred
114             my $mr_proc = proc_guard( scalar(which('start-mapred.sh')), $self->_add_hadoop_opt );
115             while ( __check_alived_hadoop(@HADOOP_DAEMON_PROCESS_NAMES) < scalar @HADOOP_DAEMON_PROCESS_NAMES ) {
116             sleep 3;
117             print '.';
118             }
119             $self->{_RUNNING_TEST_HADOOP} = 1;
120             }
121              
122             sub construct_fixture {
123             my ($self, %args) = @_;
124             for my $dist (keys %args) {
125             my $local = $args{$dist};
126             die "$local not found" unless (-e $local);
127             my ($ret, $out, $err)
128             = $self->cmd( scalar(which('hadoop')), $self->_add_hadoop_opt, 'fs', '-put', $local, $dist );
129             carp "hdfs put : $local => $dist" if ($DEBUG_MSG);
130             carp "failed to put on hdfs($ret) : $err" if ($ret != 0);
131             push @{$self->{_fixture_files}}, $dist;
132             }
133             }
134              
135             sub stop_all {
136             my ($self) = @_;
137             return if ($self->{running_hadoop_conf_dir});
138             return if (scalar __check_alived_hadoop() == 0);
139             return if ($self->{_RUNNING_TEST_HADOOP} == 0);
140             $self->cmd(scalar(which('stop-all.sh')), $self->_add_hadoop_opt );
141             if (__check_alived_hadoop()) {
142             carp join ' ', "Perhaps fail to stop hadoop daemon."
143             , "Please stop daemons manually."
144             , "ex: \$HADOOP_HOME/bin/stop-all.sh --config \$HADOOP_CONF_DIR"
145             , "or ", scalar(which('stop-all.sh')), $self->_add_hadoop_opt;
146             }
147             else {
148             $self->{_RUNNING_TEST_HADOOP} = 0;
149             }
150             }
151              
152             sub cleanup {
153             my ($self) = @_;
154             for (@{$self->{_fixture_files}}) {
155             $self->cmd( scalar(which('hadoop')), $self->_add_hadoop_opt, 'fs', '-rmr', $_ );
156             }
157             }
158              
159             #-------------------------------------------------------------------------------
160              
161             sub _set_hadoop_conf_dir {
162             my ($self) = @_;
163              
164             return $self->{running_hadoop_conf_dir} if ($self->{running_hadoop_conf_dir});
165              
166             my $hadoop_conf_dir = tempdir();
167             my $log_dir = "$hadoop_conf_dir/log";
168             mkdir($log_dir, 0700) unless (-d $log_dir);
169             # Replace confs
170             __replace_data($conf_files,
171             +{
172             __TMP_DIR__ => $hadoop_conf_dir,
173             __FS_PORT__ => $self->fs_port,
174             __MR_PORT__ => $self->mr_port,
175             __LOG_DIR__ => $log_dir,
176             }
177             );
178             # Copy default configs
179             if (defined $self->{default_hadoop_conf_dir}
180             && -d $self->{default_hadoop_conf_dir}) {
181             my $dir = dir( $self->{default_hadoop_conf_dir});
182             while (my $file = $dir->next ) {
183             copy($file, $hadoop_conf_dir);
184             }
185             }
186             # Overwrite configs
187             for my $filename (keys %{$conf_files}) {
188             if ($filename =~ /.xml$/) {
189             XML::Simple->new()->XMLout(
190             $conf_files->{$filename},
191             XMLDecl => '',
192             NoAttr => 1,
193             RootName => "configuration",
194             outputfile => "$hadoop_conf_dir/$filename",
195             );
196             }
197             else {
198             open my $fh, '>', "$hadoop_conf_dir/$filename" or die $!;
199             for my $content (@{$conf_files->{$filename}}) {
200             print $fh $content, "\n";
201             }
202             close $fh;
203             }
204             }
205             $hadoop_conf_dir;
206             }
207              
208             sub cmd {
209             my ($self, @cmd) = @_;
210             carp join(' ', 'exec command:', @cmd) if ($DEBUG_MSG);
211             my ($output, $error) = ('', '');
212             my ($rdr, $err) = (gensym, gensym);
213             my $pid = IPC::Open3::open3(undef, $rdr, $err, @cmd);
214             my $reader = new IO::Select($rdr, $err);
215              
216             while (1) {
217             while (my @ready = $reader->can_read()) {
218             foreach my $fh (@ready) {
219             my $data;
220             my $length = sysread $fh, $data, 4096;
221             if( ! defined $length || $length == 0 ) {
222             $err .= "Error from child: $!\n" unless defined $length;
223             $reader->remove($fh);
224             } else {
225             if ($fh == $rdr) {
226             $output .= $data;
227             } elsif ($fh == $err) {
228             $error .= $data;
229             } else {
230             die "BUG: got an unexpected filehandle:" . Dumper($data);
231             }
232             }
233             }
234             }
235             if (waitpid( $pid, WNOHANG ) > 0) {
236             last;
237             }
238             }
239             my $retcode = WIFEXITED($?) ? WEXITSTATUS($?) : WTERMSIG($?);
240             return wantarray ? ($retcode, $output, $error) : $retcode;
241             }
242              
243             sub _cmd_harness_y_only {
244             my ($self, @cmd) = @_;
245             carp join(' ', @cmd) if ($DEBUG_MSG);
246             local $ENV{LANG} = "C";
247             eval {
248             my ($in, $out, $err);
249             my $h = IPC::Run::harness(\@cmd, \$in, \$out, \$err, timeout( 600 ));
250             $h->start;
251             $in .= "Y\n";
252             $h->finish;
253             print $out if ($DEBUG_MSG);
254             warn $err if ($DEBUG_MSG);
255             };
256             if ($@) {
257             die join(' ', @cmd, 'error', $@);
258             }
259             }
260              
261             sub _add_hadoop_opt {
262             my ($self, @args) = @_;
263             my @params;
264             push @params, ('--config', $self->{hadoop_conf_dir})
265             if (exists $self->{hadoop_conf_dir} && $self->{hadoop_conf_dir});
266             @params;
267             }
268              
269             #-------------------------------------------------------------------------------
270             # Utility methods
271              
272             sub __check_alived_hadoop {
273             my @process_names = @_;
274             my @pids =();
275             if (my $ps = which('ps')) {
276             my $cmd =
277             ($^O =~ /linux/i) ? qq|$ps -n -o "%p %a"| # linux
278             : ($^O =~ /darwin/i) ? qq|$ps -Ao "pid command"| # osx
279             : '';
280             return unless $cmd;
281             my $regex = (@process_names)
282             ? '(' . join('|', map { $_ =~ s/\./\\./go; $_; } @process_names) . ')'
283             : '';
284             for my $ps_cmd (split "\n", `$cmd`) {
285             next if ($regex && $ps_cmd !~ /$regex/i);
286             if ($ps_cmd =~ /\s*(\d+).*org\.apache\.hadoop/i) {
287             push @pids, $1;
288             }
289             }
290             }
291             return @pids;
292             }
293              
294             sub __replace_data {
295             my ($stash, $replace) = @_;
296             for my $key (keys %{$replace}) {
297             rmap { s/$key/$replace->{$key}/ unless ref $_ } $stash;
298             }
299             }
300              
301             sub __dd {
302             my (@args) = @_;
303             return unless $DEBUG_MSG;
304             warn map { Dumper $_ } @args;
305             }
306              
307             1;
308             __END__