File Coverage

blib/lib/Apache/Hadoop/WebHDFS.pm
Criterion Covered Total %
statement 18 364 4.9
branch 0 306 0.0
condition n/a
subroutine 6 26 23.0
pod 19 19 100.0
total 43 715 6.0


line stmt bran cond sub pod time code
1             #!/usr/bin/perl
2             package Apache::Hadoop::WebHDFS;
3             our $VERSION = "0.04";
4 1     1   27665 use warnings;
  1         2  
  1         37  
5 1     1   7 use strict;
  1         2  
  1         36  
6 1     1   1109 use lib '.';
  1         801  
  1         5  
7 1     1   934 use parent 'WWW::Mechanize';
  1         270  
  1         5  
8 1     1   261848 use Carp;
  1         3  
  1         69  
9 1     1   843 use File::Map 'map_file';
  1         7037  
  1         6  
10              
11             # ###################
12             # WWW::Mech calls we care about
13             # $m -> get('http://url.com') Does a get on that url
14             # $m -> put('http://blah.com', content=$content)
15             #
16             # $m -> success() boolean if last request was success
17             # $m -> content() content of request, which can be formatted
18             # $m -> ct() content type returned, ie: 'application/json'
19             # $m -> status() HTTP status code of response
20              
# redirect_ok - approve every redirect.
#
# redirect_ok belongs to LWP::UserAgent, which WWW::Mechanize (and therefore
# this class) subclasses.  PUT requests must be allowed to follow the 307
# redirect, per RFC 2616 section 10.3.8, so the answer is unconditionally
# true.
sub redirect_ok {
    my $follow_redirect = 1;
    return $follow_redirect;
}
27              
# new - construct a WebHDFS client object.
#
# Arguments: an optional hash reference with any of
#   namenode     - namenode host name       (default 'localhost')
#   namenodeport - namenode HTTP port       (default 50070)
#   authmethod   - 'gssapi', 'unsecure' or 'doas' (default 'gssapi')
#   user         - user name                (default undef)
#   doas_user    - user to act on behalf of (default undef)
# A false value for a key leaves the default in place.
#
# Returns the new object, built by the parent constructor.
sub new {
    my ( $class, $args ) = @_;

    # Work on a local reference so a missing or undef argument neither
    # autovivifies the caller's variable nor trips over a read-only undef.
    $args ||= {};

    my %config = (
        namenode     => 'localhost',
        namenodeport => 50070,
        authmethod   => 'gssapi',    # 3 values: gssapi, unsecure, doas
        user         => undef,
        doas_user    => undef,
    );
    for my $key ( keys %config ) {
        $config{$key} = $args->{$key} if $args->{$key};
    }

    # stack_depth set to 0 so we don't blow up RAM by saving content with
    # each request.
    my $self = $class->SUPER::new(
        agent       => "Apache_Hadoop_WebHDFS",
        stack_depth => "0",
    );

    for my $key ( keys %config ) {
        $self->{$key} = $config{$key};
    }
    return $self;
}
54              
# getdelegationtoken - fetch a delegation token and store it in the object.
#
# Only meaningful when authmethod is 'gssapi'; for any other method a warning
# is issued via carp and no request is made.  The request is
# op=GETDELEGATIONTOKEN with the object's 'user' as renewer.
#
# On a successful response the value of the "urlString" member is stored in
# $self->{'webhdfstoken'}; on an unsuccessful one the empty string is stored.
#
# Returns the object itself.
sub getdelegationtoken {
    my ($self) = @_;

    if ( $self->{'authmethod'} ne 'gssapi' ) {
        carp "getdelgation token only valid when using GSSAPI";
        return $self;
    }

    my $renewer = defined $self->{'user'} ? $self->{'user'} : '';
    my $url
        = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
        . '/webhdfs/v1/?op=GETDELEGATIONTOKEN&renewer=' . $renewer;
    $self->get($url);

    my $token = '';
    if ( $self->success() ) {
        my $body = $self->content();

        # Pull the token out by member name rather than by fixed character
        # offsets, so whitespace or a trailing newline cannot corrupt it.
        if ( defined $body && $body =~ m/"urlString"\s*:\s*"([^"]*)"/ ) {
            $token = $1;
        }
    }
    $self->{'webhdfstoken'} = $token;
    return $self;
}
71              
# canceldelegationtoken - ask the namenode to cancel the stored delegation
# token, then remove it from the object.
#
# Only meaningful when authmethod is 'gssapi'; otherwise a warning is issued
# via carp.  Nothing is sent when no token is stored.
#
# Returns the object itself.
sub canceldelegationtoken {
    my ($self) = @_;

    if ( $self->{'authmethod'} ne 'gssapi' ) {
        carp "canceldelgation token only valid when using GSSAPI";
        return $self;
    }
    return $self if !$self->{'webhdfstoken'};

    my $url
        = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
        . '/webhdfs/v1/?op=CANCELDELEGATIONTOKEN&token=' . $self->{'webhdfstoken'};

    # The WebHDFS REST API defines CANCELDELEGATIONTOKEN as an HTTP PUT.
    $self->put($url);
    delete $self->{'webhdfstoken'};
    return $self;
}
85              
# renewdelegationtoken - ask the namenode to renew the stored delegation
# token.
#
# Only meaningful when authmethod is 'gssapi'; otherwise a warning is issued
# via carp.  Nothing is sent when no token is stored.  The token stays in the
# object afterwards, since a renewed token is still the one to use.
#
# Returns the object itself.
sub renewdelegationtoken {
    my ($self) = @_;

    if ( $self->{'authmethod'} ne 'gssapi' ) {
        carp "renewdelegationtoken only valid when using GSSAPI";
        return $self;
    }
    return $self if !$self->{'webhdfstoken'};

    # The WebHDFS operation is named RENEWDELEGATIONTOKEN and is an HTTP PUT.
    my $url
        = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
        . '/webhdfs/v1/?op=RENEWDELEGATIONTOKEN&token=' . $self->{'webhdfstoken'};
    $self->put($url);
    return $self;
}
100              
# Open - read a file from HDFS (op=OPEN, sent with get()).
#
# Arguments: a hash reference with
#   file       - HDFS path to read (required)
#   offset     - optional, appended as &offset=
#   length     - optional, appended as &length=
#   buffersize - optional, appended as &buffersize=
#
# Croaks when 'file' is missing, when the object's authmethod is not one of
# 'gssapi', 'unsecure' or 'doas', or when the user values that method needs
# are not set.  A stored delegation token is appended as &delegation=.
#
# Returns the object itself.
sub Open {
    my ( $self, $args ) = @_;
    $args ||= {};

    my $file = $args->{'file'} or croak("No HDFS file given to open");

    my $method = defined $self->{'authmethod'} ? $self->{'authmethod'} : '';
    my $url
        = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
        . '/webhdfs/v1' . $file . '?op=OPEN';

    if ( $method eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $method eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $method ne 'gssapi' ) {
        # Fail loudly instead of requesting an undefined URL.
        croak("Unknown authmethod '$method' (expected 'gssapi', 'unsecure' or 'doas')");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};
    $url .= '&offset=' . $args->{'offset'}           if $args->{'offset'};
    $url .= '&length=' . $args->{'length'}           if $args->{'length'};
    $url .= '&buffersize=' . $args->{'buffersize'}   if $args->{'buffersize'};

    $self->get($url);
    return $self;
}
141              
# getfilestatus - request the status of an HDFS file (op=GETFILESTATUS, sent
# with get()).
#
# Arguments: a hash reference with
#   file - HDFS path to query (required)
#
# Croaks when 'file' is missing, when the object's authmethod is not one of
# 'gssapi', 'unsecure' or 'doas', or when the user values that method needs
# are not set.  A stored delegation token is appended as &delegation=.
#
# Returns the object itself.
sub getfilestatus {
    my ( $self, $args ) = @_;
    $args ||= {};

    my $file = $args->{'file'}
        or croak("Need HDFS filename before listing status");

    my $method = defined $self->{'authmethod'} ? $self->{'authmethod'} : '';
    my $url
        = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
        . '/webhdfs/v1' . $file . '?op=GETFILESTATUS';

    if ( $method eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $method eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $method ne 'gssapi' ) {
        # Fail loudly instead of requesting an undefined URL.
        croak("Unknown authmethod '$method' (expected 'gssapi', 'unsecure' or 'doas')");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    $self->get($url);
    return $self;
}
165              
# _SUPER_delete - issue an HTTP DELETE request.
#
# Added because LWP::UserAgent and WWW::Mechanize don't have a delete method;
# taken from http://code.google.com/p/www-mechanize/issues/detail?id=219
#
# Arguments are passed to HTTP::Request::Common::DELETE after
# _process_colonic_headers has processed them.  Returns whatever request()
# returns.
sub _SUPER_delete {
    require HTTP::Request::Common;

    my $self       = shift;
    my @parameters = @_;

    my @suff = $self->_process_colonic_headers( \@parameters, 1 );

    return $self->request(
        HTTP::Request::Common::DELETE(@parameters),
        @suff,
    );
}
174              
# Delete - remove a file or directory from HDFS (op=DELETE, sent with
# _SUPER_delete()).
#
# Arguments: a hash reference with
#   path      - HDFS path to delete (required)
#   recursive - optional; a true value sends &recursive=true, otherwise
#               &recursive=false is sent
#
# Croaks when 'path' is missing, when the object's authmethod is not one of
# 'gssapi', 'unsecure' or 'doas', or when the user values that method needs
# are not set.  A stored delegation token is appended as &delegation=.
#
# Returns the object itself.
sub Delete {
    my ( $self, $args ) = @_;
    $args ||= {};

    my $path = $args->{'path'} or croak("No HDFS path given to delete");

    my $method = defined $self->{'authmethod'} ? $self->{'authmethod'} : '';
    my $url
        = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
        . '/webhdfs/v1' . $path . '?op=DELETE';

    if ( $method eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $method eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $method ne 'gssapi' ) {
        # Fail loudly instead of requesting an undefined URL.
        croak("Unknown authmethod '$method' (expected 'gssapi', 'unsecure' or 'doas')");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};
    $url .= '&recursive=' . ( $args->{'recursive'} ? 'true' : 'false' );

    $self->_SUPER_delete($url);
    return $self;
}
206              
207              
# create - copy a local file to HDFS (op=CREATE, sent with put()).
#
# Arguments: a hash reference with
#   srcfile     - local file to upload (required)
#   dstfile     - HDFS destination path (required)
#   permission  - optional, default '000'
#   overwrite   - optional, default 'false'; a true value is sent as given
#   blocksize   - optional, appended as &blocksize=
#   replication - optional, appended as &replication=
#   buffersize  - optional, appended as &buffersize=
#
# Croaks when 'srcfile' or 'dstfile' is missing, when the object's authmethod
# is not one of 'gssapi', 'unsecure' or 'doas', or when the user values that
# method needs are not set.  A stored delegation token is appended as
# &delegation=.  The local file is mapped with map_file() and passed as the
# request content.
#
# Returns the object itself.
sub create {
    my ( $self, $args ) = @_;
    $args ||= {};

    my $perms     = $args->{'permission'} || '000';
    my $overwrite = $args->{'overwrite'}  || 'false';
    my $file_src  = $args->{'srcfile'}
        or croak("Need local source file to copy to HDFS");
    my $file_dest = $args->{'dstfile'}
        or croak("Need HDFS destination before file create can happen");

    my $method = defined $self->{'authmethod'} ? $self->{'authmethod'} : '';
    my $url
        = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
        . '/webhdfs/v1' . $file_dest
        . '?op=CREATE&permission=' . $perms . '&overwrite=' . $overwrite;

    if ( $method eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $method eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $method ne 'gssapi' ) {
        # Fail loudly instead of requesting an undefined URL.
        croak("Unknown authmethod '$method' (expected 'gssapi', 'unsecure' or 'doas')");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'}  if $self->{'webhdfstoken'};
    $url .= '&blocksize=' . $args->{'blocksize'}      if $args->{'blocksize'};
    $url .= '&replication=' . $args->{'replication'}  if $args->{'replication'};
    $url .= '&buffersize=' . $args->{'buffersize'}    if $args->{'buffersize'};

    my $content;
    map_file( $content => $file_src, "<" );
    $self->put( $url, content => $content );
    return $self;
}
243              
244             # TODO need to add 'append' method - for people wanting to corrupt hdfs. :)
245              
# mkdirs - create a directory on HDFS (op=MKDIRS, sent with put()).
#
# Arguments: a hash reference with
#   path       - HDFS directory to create (required)
#   permission - optional, default '000'; 'permissions' is accepted as an
#                alias
#
# Croaks when 'path' is missing, when the object's authmethod is not one of
# 'gssapi', 'unsecure' or 'doas', or when the user values that method needs
# are not set.  A stored delegation token is appended as &delegation=.
#
# Returns the object itself.
sub mkdirs {
    my ( $self, $args ) = @_;
    $args ||= {};

    my $path = $args->{'path'}
        or croak("I need a HDFS location to create directory");

    # The option used to be tested and read under two different misspelled
    # keys, so a caller-supplied value could never reach the URL.
    my $perms = $args->{'permission'} || $args->{'permissions'} || '000';

    my $method = defined $self->{'authmethod'} ? $self->{'authmethod'} : '';
    my $url
        = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
        . '/webhdfs/v1' . $path . '?op=MKDIRS&permission=' . $perms;

    if ( $method eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $method eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $method ne 'gssapi' ) {
        # Fail loudly instead of requesting an undefined URL.
        croak("Unknown authmethod '$method' (expected 'gssapi', 'unsecure' or 'doas')");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    $self->put($url);
    return $self;
}
271              
# getcontentsummary - request the content summary of an HDFS directory
# (op=GETCONTENTSUMMARY, sent with get()).
#
# Arguments: a hash reference with
#   directory - HDFS directory to summarise (required)
#
# Croaks when 'directory' is missing, when the object's authmethod is not one
# of 'gssapi', 'unsecure' or 'doas', or when the user values that method
# needs are not set.  A stored delegation token is appended as &delegation=.
#
# Returns the object itself.
sub getcontentsummary {
    my ( $self, $args ) = @_;
    $args ||= {};

    my $dir = $args->{'directory'}
        or croak("I need a HDFS directory to return content summary");

    my $method = defined $self->{'authmethod'} ? $self->{'authmethod'} : '';
    my $url
        = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
        . '/webhdfs/v1' . $dir . '?op=GETCONTENTSUMMARY';

    if ( $method eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $method eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $method ne 'gssapi' ) {
        # Fail loudly instead of requesting an undefined URL.
        croak("Unknown authmethod '$method' (expected 'gssapi', 'unsecure' or 'doas')");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    $self->get($url);
    return $self;
}
296              
297              
# getfilechecksum - request the checksum of an HDFS file
# (op=GETFILECHECKSUM, sent with get()).
#
# Arguments: a hash reference with
#   file - HDFS path to checksum (required)
#
# Croaks when 'file' is missing, when the object's authmethod is not one of
# 'gssapi', 'unsecure' or 'doas', or when the user values that method needs
# are not set.  A stored delegation token is appended as &delegation=.
#
# Returns the object itself.
sub getfilechecksum {
    my ( $self, $args ) = @_;
    $args ||= {};

    my $file = $args->{'file'}
        or croak("I need a HDFS filename to get checksum");

    my $method = defined $self->{'authmethod'} ? $self->{'authmethod'} : '';
    my $url
        = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
        . '/webhdfs/v1' . $file . '?op=GETFILECHECKSUM';

    if ( $method eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $method eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $method ne 'gssapi' ) {
        # Fail loudly instead of requesting an undefined URL.
        croak("Unknown authmethod '$method' (expected 'gssapi', 'unsecure' or 'doas')");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    $self->get($url);
    return $self;
}
322              
# gethomedirectory - request the user's HDFS home directory
# (op=GETHOMEDIRECTORY, sent with get()).
#
# Takes no arguments beyond the object.
#
# Croaks when the object's authmethod is not one of 'gssapi', 'unsecure' or
# 'doas', or when the user values that method needs are not set.  A stored
# delegation token is appended as &delegation=.
#
# Returns the object itself.
sub gethomedirectory {
    my ($self) = @_;

    my $method = defined $self->{'authmethod'} ? $self->{'authmethod'} : '';
    my $url
        = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
        . '/webhdfs/v1?op=GETHOMEDIRECTORY';

    if ( $method eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $method eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $method ne 'gssapi' ) {
        # Fail loudly instead of requesting an undefined URL.
        croak("Unknown authmethod '$method' (expected 'gssapi', 'unsecure' or 'doas')");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    # The WebHDFS REST API defines GETHOMEDIRECTORY as an HTTP GET, not a PUT.
    $self->get($url);
    return $self;
}
346              
# setpermission - set the permission of an HDFS path (op=SETPERMISSION, sent
# with put()).
#
# Arguments: a hash reference with
#   path       - HDFS path to change (required)
#   permission - optional, appended as &permission=; 'permissions' is
#                accepted as an alias
#
# Croaks when 'path' is missing, when the object's authmethod is not one of
# 'gssapi', 'unsecure' or 'doas', or when the user values that method needs
# are not set.  A stored delegation token is appended as &delegation=.
#
# Returns the object itself.
sub setpermission {
    my ( $self, $args ) = @_;
    $args ||= {};

    my $path = $args->{'path'}
        or croak("I need a HDFS path to set permmissions");

    # The option used to be tested and read under two different misspelled
    # keys, so the requested permission was never sent.
    my $perms = $args->{'permission'} || $args->{'permissions'};

    my $method = defined $self->{'authmethod'} ? $self->{'authmethod'} : '';
    my $url
        = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
        . '/webhdfs/v1' . $path . '?op=SETPERMISSION';

    if ( $method eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $method eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $method ne 'gssapi' ) {
        # Fail loudly instead of requesting an undefined URL.
        croak("Unknown authmethod '$method' (expected 'gssapi', 'unsecure' or 'doas')");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};
    $url .= '&permission=' . $perms                  if $perms;

    $self->put($url);
    return $self;
}
376              
# setowner - change the owner and/or group of an HDFS path (op=SETOWNER,
# sent with put()).
#
# Arguments: a hash reference with
#   path  - HDFS path to change (required)
#   user  - optional new owner, appended as &owner=
#   group - optional new group, appended as &group=
#
# Croaks when 'path' is missing, when the object's authmethod is not one of
# 'gssapi', 'unsecure' or 'doas', or when the user values that method needs
# are not set.  A stored delegation token is appended as &delegation=.
#
# Returns the object itself.
sub setowner {
    my ( $self, $args ) = @_;
    $args ||= {};

    my $path = $args->{'path'}
        or croak("I need a HDFS path before changing ownership");

    my $method = defined $self->{'authmethod'} ? $self->{'authmethod'} : '';
    my $url
        = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
        . '/webhdfs/v1' . $path . '?op=SETOWNER';

    if ( $method eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $method eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $method ne 'gssapi' ) {
        # Fail loudly instead of requesting an undefined URL.
        croak("Unknown authmethod '$method' (expected 'gssapi', 'unsecure' or 'doas')");
    }

    $url .= '&owner=' . $args->{'user'}              if $args->{'user'};
    $url .= '&group=' . $args->{'group'}             if $args->{'group'};
    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    $self->put($url);
    return $self;
}
410              
411              
# setreplication - change the replication factor of an HDFS path
# (op=SETREPLICATION, sent with put()).
#
# Arguments: a hash reference with
#   path        - HDFS path to change (required)
#   replication - optional, appended as &replication=
#
# Croaks when 'path' is missing, when the object's authmethod is not one of
# 'gssapi', 'unsecure' or 'doas', or when the user values that method needs
# are not set.  A stored delegation token is appended as &delegation=.
#
# Returns the object itself.
sub setreplication {
    my ( $self, $args ) = @_;
    $args ||= {};

    my $path = $args->{'path'}
        or croak("I need a HDFS path before changing replication");

    my $method = defined $self->{'authmethod'} ? $self->{'authmethod'} : '';
    my $url
        = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
        . '/webhdfs/v1' . $path . '?op=SETREPLICATION';

    if ( $method eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $method eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $method ne 'gssapi' ) {
        # Fail loudly instead of requesting an undefined URL.
        croak("Unknown authmethod '$method' (expected 'gssapi', 'unsecure' or 'doas')");
    }

    $url .= '&replication=' . $args->{'replication'} if $args->{'replication'};
    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    $self->put($url);
    return $self;
}
441              
442              
# settimes - set the modification and/or access time of an HDFS path
# (op=SETTIMES, sent with put()).
#
# Arguments: a hash reference with
#   path             - HDFS path to change (required)
#   modificationtime - optional, appended as &modificationtime=
#   accesstime       - optional, appended as &accesstime=
#
# Croaks when 'path' is missing, when the object's authmethod is not one of
# 'gssapi', 'unsecure' or 'doas', or when the user values that method needs
# are not set.  A stored delegation token is appended as &delegation=.
#
# Returns the object itself.
sub settimes {
    my ( $self, $args ) = @_;
    $args ||= {};

    my $path = $args->{'path'}
        or croak("I need a HDFS path before changing times");

    my $method = defined $self->{'authmethod'} ? $self->{'authmethod'} : '';
    my $url
        = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
        . '/webhdfs/v1' . $path . '?op=SETTIMES';

    if ( $method eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $method eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $method ne 'gssapi' ) {
        # Fail loudly instead of requesting an undefined URL.
        croak("Unknown authmethod '$method' (expected 'gssapi', 'unsecure' or 'doas')");
    }

    $url .= '&modificationtime=' . $args->{'modificationtime'}
        if $args->{'modificationtime'};
    $url .= '&accesstime=' . $args->{'accesstime'} if $args->{'accesstime'};
    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    $self->put($url);
    return $self;
}
479              
480              
# liststatus - list the contents of an HDFS directory (op=LISTSTATUS, sent
# with get()).
#
# Arguments: a hash reference with
#   path - HDFS path to list (required)
#
# Croaks when 'path' is missing, when the object's authmethod is not one of
# 'gssapi', 'unsecure' or 'doas', or when the user values that method needs
# are not set.  A stored delegation token is appended as &delegation=.
#
# Returns the object itself.
sub liststatus {
    my ( $self, $args ) = @_;
    $args ||= {};

    my $path = $args->{'path'}
        or croak("I need a HDFS pathname to get status");

    my $method = defined $self->{'authmethod'} ? $self->{'authmethod'} : '';
    my $url
        = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
        . '/webhdfs/v1' . $path . '?op=LISTSTATUS';

    if ( $method eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $method eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $method ne 'gssapi' ) {
        # Fail loudly instead of requesting an undefined URL.
        croak("Unknown authmethod '$method' (expected 'gssapi', 'unsecure' or 'doas')");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    $self->get($url);
    return $self;
}
504              
# rename - rename a file or directory on HDFS (op=RENAME, sent with put()).
#
# Arguments: a hash reference with
#   srcfile - existing HDFS path (required)
#   dstfile - new HDFS path, sent as &destination= (required)
#
# Croaks when 'srcfile' or 'dstfile' is missing, when the object's authmethod
# is not one of 'gssapi', 'unsecure' or 'doas', or when the user values that
# method needs are not set.  A stored delegation token is appended as
# &delegation=.
#
# Returns whatever put() returns.  NOTE(review): the other request methods
# visible in this file return the object itself; the return value is left
# as it was so existing callers keep working -- confirm which is intended.
sub rename {
    my ( $self, $args ) = @_;
    $args ||= {};

    my $src = $args->{'srcfile'}
        or croak("Need HDFS source before rename can happen");
    my $dst = $args->{'dstfile'}
        or croak("Need HDFS destination before rename can happen");

    my $method = defined $self->{'authmethod'} ? $self->{'authmethod'} : '';
    my $url
        = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
        . '/webhdfs/v1' . $src . '?op=RENAME&destination=' . $dst;

    if ( $method eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $method eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $method ne 'gssapi' ) {
        # Fail loudly instead of requesting an undefined URL.
        croak("Unknown authmethod '$method' (expected 'gssapi', 'unsecure' or 'doas')");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    return $self->put($url);
}
531              
532              
533             =pod
534              
535             =head1 NAME
536              
537             Apache::Hadoop::WebHDFS - interface to Hadoop's WebHDFS API that supports GSSAPI/SPNEGO (secure) access.
538              
539             =head1 VERSION
540              
541             Version 0.04
542              
543             =head1 SYNOPSIS
544              
545             Hadoop's WebHDFS API is a REST interface to HDFS. This module provides
546             a perl interface to the API, allowing one to both read and write files to
547             HDFS. Because Apache::Hadoop::WebHDFS supports GSSAPI, it can be used to
548             interface with secure Hadoop Clusters. This module also supports WebHDFS connections
549             with unsecure grids.
550              
551             Apache::Hadoop::WebHDFS is a subclass of WWW::Mechanize, so one could
552             reference WWW::Mechanize methods if needed. One will note that
553             WWW::Mechanize is a subclass of LWP, meaning it's possible to also reference
554             LWP methods from Apache::Hadoop::WebHDFS. For example to debug the GSSAPI
555             calls used during the request, enable LWP::Debug by adding 'use LWP::Debug qw(+);' to your script.
556              
557             Content returned from WebHDFS is left in the native JSON format. Including your favorite JSON module like JSON::Any
558             will help with managing the JSON output. To get access to the content stored in your Apache::Hadoop::WebHDFS object,
559             use the methods provided by WWW::Mechanize, such as 'success', 'status', and 'content'. Please see 'EXAMPLE' below
560             for how this is used.
561              
562              
563             =head1 METHODS
564              
565             =over 3
566              
567             =item * new() - creates a new WebHDFS object. Required keys are 'user', 'namenode', 'namenodeport', and 'authmethod'. Default values for 'namenode' and 'namenodeport' are listed below. The default value for authmethod is 'gssapi', which is used on grids where SPNEGO has been enabled. The 'doas_user' key is optional and intended to be used when proxying the WebHDFS request as another user.
568            
569             my $hdfsclient = new({ namenode => "localhost",
570             namenodeport => "50070",
571             authmethod => "gssapi|unsecure|doas",
572             user => 'user1',
573             doas_user => 'user2',
574             });
575            
576              
577             =item * getdelegationtoken() - gets a delegation token from the namenode. This token is stored within the WebHDFS object and automatically appended to each WebHDFS request. Delegation tokens are used on grids with security enabled.
578            
579             $hdfsclient->getdelegationtoken();
580              
581             =item * renewdelegationtoken() - renews a delegation token from the namenode.
582              
583             $hdfsclient->renewdelegationtoken();
584              
585             =item * canceldelegationtoken() - informs the namenode to invalidate the delegation token as it's no longer needed. When calling this method, the delegation token is also removed from the perl WebHDFS object.
586              
587             $hdfsclient->canceldelegationtoken();
588              
589             =item * Open() - opens file on HDFS and returns its content. The only required value for Open() is 'file', all others are optional. The values, 'offset', 'length', and 'buffersize', are meant to be sized in bytes.
590              
591             $hdfsclient->Open({ file=>'/path/to/my/hdfs/file',
592             offset=>'1024',
593             length=>'2048',
594             buffersize=>'1024',
595             });
596              
597             =item * create() - creates and writes to a file on HDFS. Required values for create are 'srcfile' which is local, and 'dstfile' which is the path for the new file on HDFS. 'blocksize' is represented in bytes and 'overwrite' has two valid values of 'true' or 'false'. While not required, if permissions are not provided they will default to '000'.
598              
599             $hdfsclient->create({ srcfile=>'/my/local/file.txt',
600             dstfile=>'/my/hdfs/location/file.txt',
601             blocksize=>'524288',
602             replication=>'3',
603             buffersize=>'1024',
604             overwrite=>'true|false',
605             permission=>'644',
606             });
607              
608             =item * rename() - renames a file on HDFS. Required values for rename are 'srcfile' and 'dstfile', both of which represent HDFS filenames.
609            
610             $hdfsclient->rename({ srcfile=>'/my/old/hdfs/file.txt',
611             dstfile=>'my/new/hdfs/file.txt',
612             });
613              
614             =item * getfilestatus() - returns a json structure containing status of file or directory. Required input is a HDFS path.
615            
616             $hdfsclient->getfilestatus({ file=>'/path/to/my/hdfs/file.txt' });
617              
618             =item * liststatus() - returns a json structure of contents inside a directory. Note the timestamps are java timestamps so divide by 1000 to convert to ctime before attempting to format time value.
619            
620             $hdfsclient->liststatus({ path=>'/path/to/my/hdfs/directory' });
621              
622             =item * mkdirs() - creates a directory on HDFS. The only required input value is path. There is an optional input value named permissions and if not provided will default to '000'.
623              
624             $hdfsclient->mkdirs({ path=>'/path/to/my/hdfs/directory',
625             permissions=>'755',
626             });
627              
628             =item * getfilechecksum() - gets HDFS checksum on file. Note this is the crc32 checksum that HDFS uses to detect file corruption. It's not the checksum of the file itself. The only required input value is 'file'.
629              
630             $hdfsclient->getfilechecksum({ file=>'/path/to/my/hdfs/directory' });
631              
632             =item * Delete() - removes file or directories from HDFS. The only required input value is 'path'. The other optional value is 'recursive' which takes a 'true|false' argument.
633              
634             $hdfsclient->Delete({ path=>'/path/to/my/hdfs/directory',
635             recursive=>'true|false',
636             });
637              
638             =item * getcontentsummary() - list metadata information on a directory. This includes things like file count and quota usage for that directory. The only input value is a path to a HDFS directory.
639              
640             $hdfsclient->getcontentsummary({ directory=>'/path/to/my/hdfs/directory' });
641              
642             =item * getfilestatus() - returns access times, blocksize, and permissions on a HDFS file.
643              
644             $hdfsclient->getfilestatus({ file=>'/path/to/my/hdfs/file' });
645              
646             =item * gethomedirectory() - returns path to the home directory for the user or 'proxy user'. There is no input for this method.
647              
648             $hdfsclient->gethomedirectory();
649              
650             =item * setowner() - changes owner and group ownership on a file or directory on HDFS. The only required input is 'path'.
651              
652             $hdfsclient->setowner({ path=>'/path/to/my/hdfs/directory',
653             user=>'cartman',
654             group=>'fifthgraders',
655             });
656              
657             =item * setpermission() - changes owner and group permissions on a file or directory on HDFS. Path is required and permissions are optional.
658              
659             $hdfsclient->setpermission({ path=>'/path/to/my/hdfs/directory',
660             permisssion=>'640',
661             });
662              
663              
664             =item * setreplication() - changes replication count for a file on HDFS. Path is required, replication is optional.
665              
666             $hdfsclient->setreplication({ path=>'/path/to/my/hdfs/directory',
667             replication=>'10',
668             });
669              
670              
671             =item * settimes() - changes access and modification time for a file or directory on HDFS. Path is required, both access and modification times are optional. Remember these times are in java time, so make sure to convert ctime to java time by multiplying by 1000.
672              
673             $hdfsclient->settimes({ path=>'/path/to/my/hdfs/directory',
674             modificationtime=>$mymodtime,
675             accesstime=>$myatime,
676             });
677              
678             =back
679              
680              
681             =head1 REQUIREMENTS
682              
683             Carp is used for various warnings and errors.
684             WWW::Mechanize is needed as this is a subclass.
685             LWP::Debug is required for debugging GSSAPI connections
686             LWP::Authen::Negotiate is the magic sauce for working with secure hadoop clusters
687             parent included with Perl 5.10.1 and newer or found on CPAN for older versions of perl
688             File::Map required for reading contents of files into mmap'ed memory space instead of perl's symbol table.
689              
690             =head1 EXAMPLE
691              
692             =head2 list a HDFS directory on a secure hadoop cluster
693              
694             #!/usr/bin/perl
695             use strict;
696             use warnings;
697             use Data::Dumper;
698             use Apache::Hadoop::WebHDFS;
699             my $username=getlogin();
700             my $hdfsclient = Apache::Hadoop::WebHDFS->new( {namenode =>"mynamenode.example.com",
701             namenodeport =>"50070",
702             authmethod =>"gssapi",
703             user =>$username,
704             });
705             $hdfsclient->liststatus( {path=>"/user/$username"} );
706             if ($hdfsclient->success()) {
707             print "Request SUCCESS: ", $hdfsclient->status() , "\n\n";
708             print "Dumping content:\n";
709             print Dumper $hdfsclient->content() ;
710             } else {
711             print "Request FAILED: ", $hdfsclient->status() , "\n";
712             }
713              
714            
715             =head1 AUTHOR
716              
717             Adam Faris, C<< >>
718              
719             =head1 BUGS
720              
721             Please use github to report bugs and feature requests
722             https://github.com/opsmekanix/Apache-Hadoop-WebHDFS/issues
723              
724             =head1 SUPPORT
725              
726             You can find documentation for this module with the perldoc command.
727              
728             perldoc Apache::Hadoop::WebHDFS
729              
730              
731             You can also look for information at:
732              
733             =over 4
734              
735             =item * AnnoCPAN: Annotated CPAN documentation
736              
737             L
738              
739             =item * CPAN Ratings
740              
741             L
742              
743             =item * Search CPAN
744              
745             L
746              
747             =back
748              
749              
750             =head1 ACKNOWLEDGEMENTS
751              
752             I would like to acknowledge Andy Lester plus the numerous people who have
753             worked on WWW::Mechanize, Achim Grolms and team for providing
754             LWP::Authen::Negotiate, and the contributors to LWP. Thanks for providing
755             awesome modules.
756              
757             =head1 LICENSE AND COPYRIGHT
758              
759             Copyright 2013 Adam Faris.
760              
761             Licensed under the Apache License, Version 2.0 (the "License");
762             you may not use this file except in compliance with the License.
763             You may obtain a copy of the License at
764              
765             L<http://www.apache.org/licenses/LICENSE-2.0>
766              
767             Unless required by applicable law or agreed to in writing, software
768             distributed under the License is distributed on an "AS IS" BASIS,
769             WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
770             See the License for the specific language governing permissions and
771             limitations under the License.
772              
773              
774             =cut
775              
776              
777             return 1;