File Coverage

blib/lib/Net/Hadoop/WebHDFS.pm
Criterion Covered Total %
statement 38 248 15.3
branch 8 114 7.0
condition 12 70 17.1
subroutine 12 49 24.4
pod 19 37 51.3
total 89 518 17.1


line stmt bran cond sub pod time code
1             package Net::Hadoop::WebHDFS;
2              
3 1     1   21699 use strict;
  1         1  
  1         22  
4 1     1   3 use warnings;
  1         1  
  1         17  
5 1     1   3 use Carp;
  1         3  
  1         45  
6              
7 1     1   582 use JSON::XS qw//;
  1         5577  
  1         22  
8              
9 1     1   433 use Furl;
  1         18074  
  1         22  
10 1     1   5 use File::Spec;
  1         1  
  1         13  
11 1     1   471 use URI;
  1         3029  
  1         21  
12 1     1   412 use Try::Tiny;
  1         991  
  1         2290  
13              
14             our $VERSION = "0.7";
15              
16             our %OPT_TABLE = ();
17              
18             sub new {
19 5     5 1 13389 my ($this, %opts) = @_;
20             my $self = +{
21             host => $opts{host} || 'localhost',
22             port => $opts{port} || 50070,
23             standby_host => $opts{standby_host},
24             standby_port => ($opts{standby_port} || $opts{port} || 50070),
25             httpfs_mode => $opts{httpfs_mode} || 0,
26             username => $opts{username},
27             doas => $opts{doas},
28             useragent => $opts{useragent} || 'Furl Net::Hadoop::WebHDFS (perl)',
29             timeout => $opts{timeout} || 10,
30 5   50     141 suppress_errors => $opts{suppress_errors} || 0,
      50        
      50        
      50        
      50        
      50        
      50        
31             last_error => undef,
32             under_failover => 0,
33             };
34 5         35 $self->{furl} = Furl::HTTP->new(agent => $self->{useragent}, timeout => $self->{timeout}, max_redirects => 0);
35 5         154 return bless $self, $this;
36             }
37              
38             # curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE
39             # [&overwrite=<true|false>][&blocksize=<LONG>][&replication=<SHORT>]
40             # [&permission=<OCTAL>][&buffersize=<INT>]"
41             sub create {
42 0     0 1 0 my ($self, $path, $body, %options) = @_;
43 0 0       0 if ($self->{httpfs_mode}) {
44 0         0 %options = (%options, data => 'true');
45             }
46 0         0 my $err = $self->check_options('CREATE', %options);
47 0 0       0 croak $err if $err;
48              
49 0         0 my $res = $self->operate_requests('PUT', $path, 'CREATE', \%options, $body);
50 0         0 $res->{code} == 201;
51             }
52             $OPT_TABLE{CREATE} = ['overwrite', 'blocksize', 'replication', 'permission', 'buffersize', 'data'];
53              
54             # curl -i -X POST "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND[&buffersize=<INT>]"
55             sub append {
56 0     0 1 0 my ($self, $path, $body, %options) = @_;
57 0 0       0 if ($self->{httpfs_mode}) {
58 0         0 %options = (%options, data => 'true');
59             }
60 0         0 my $err = $self->check_options('APPEND', %options);
61 0 0       0 croak $err if $err;
62              
63 0         0 my $res = $self->operate_requests('POST', $path, 'APPEND', \%options, $body);
64 0         0 $res->{code} == 200;
65             }
66             $OPT_TABLE{APPEND} = ['buffersize', 'data'];
67              
68             # curl -i -L "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN
69             # [&offset=<LONG>][&length=<LONG>][&buffersize=<INT>]"
70             sub read {
71 0     0 1 0 my ($self, $path, %options) = @_;
72 0         0 my $err = $self->check_options('OPEN', %options);
73 0 0       0 croak $err if $err;
74              
75 0         0 my $res = $self->operate_requests('GET', $path, 'OPEN', \%options);
76 0         0 $res->{body};
77             }
78             $OPT_TABLE{OPEN} = ['offset', 'length', 'buffersize'];
79 0     0 0 0 sub open { (shift)->read(@_); }
80              
81             # curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=MKDIRS[&permission=<OCTAL>]"
82             sub mkdir {
83 0     0 1 0 my ($self, $path, %options) = @_;
84 0         0 my $err = $self->check_options('MKDIRS', %options);
85 0 0       0 croak $err if $err;
86              
87 0         0 my $res = $self->operate_requests('PUT', $path, 'MKDIRS', \%options);
88 0         0 $self->check_success_json($res, 'boolean');
89             }
90             $OPT_TABLE{MKDIRS} = ['permission'];
91 0     0 0 0 sub mkdirs { (shift)->mkdir(@_); }
92              
93             # curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>"
94             sub rename {
95 0     0 1 0 my ($self, $path, $dest, %options) = @_;
96 0         0 my $err = $self->check_options('RENAME', %options);
97 0 0       0 croak $err if $err;
98              
99 0 0       0 unless ($dest =~ m!^/!) {
100 0         0 $dest = '/' . $dest;
101             }
102 0         0 my $res = $self->operate_requests('PUT', $path, 'RENAME', {%options, destination => $dest});
103 0         0 $self->check_success_json($res, 'boolean');
104             }
105              
106             # curl -i -X DELETE "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=DELETE
107             # [&recursive=<true|false>]"
108             sub delete {
109 0     0 1 0 my ($self, $path, %options) = @_;
110 0         0 my $err = $self->check_options('DELETE', %options);
111 0 0       0 croak $err if $err;
112              
113 0         0 my $res = $self->operate_requests('DELETE', $path, 'DELETE', \%options);
114 0         0 $self->check_success_json($res, 'boolean');
115             }
116             $OPT_TABLE{DELETE} = ['recursive'];
117              
118             # curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS"
119             sub stat {
120 0     0 1 0 my ($self, $path, %options) = @_;
121 0         0 my $err = $self->check_options('GETFILESTATUS', %options);
122 0 0       0 croak $err if $err;
123              
124 0         0 my $res = $self->operate_requests('GET', $path, 'GETFILESTATUS', \%options);
125 0         0 $self->check_success_json($res, 'FileStatus');
126             }
127 0     0 0 0 sub getfilestatus { (shift)->stat(@_); }
128              
129             # curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS"
130             sub list {
131 0     0 1 0 my ($self, $path, %options) = @_;
132 0         0 my $err = $self->check_options('LISTSTATUS', %options);
133 0 0       0 croak $err if $err;
134              
135 0         0 my $res = $self->operate_requests('GET', $path, 'LISTSTATUS', \%options);
136 0         0 $self->check_success_json($res, 'FileStatuses')->{FileStatus};
137             }
138 0     0 0 0 sub liststatus { (shift)->list(@_); }
139              
140             # curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY"
141             sub content_summary {
142 0     0 1 0 my ($self, $path, %options) = @_;
143 0         0 my $err = $self->check_options('GETCONTENTSUMMARY', %options);
144 0 0       0 croak $err if $err;
145              
146 0         0 my $res = $self->operate_requests('GET', $path, 'GETCONTENTSUMMARY', \%options);
147 0         0 $self->check_success_json($res, 'ContentSummary');
148             }
149 0     0 0 0 sub getcontentsummary { (shift)->content_summary(@_); }
150              
151             # curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM"
152             sub checksum {
153 0     0 1 0 my ($self, $path, %options) = @_;
154 0         0 my $err = $self->check_options('GETFILECHECKSUM', %options);
155 0 0       0 croak $err if $err;
156              
157 0         0 my $res = $self->operate_requests('GET', $path, 'GETFILECHECKSUM', \%options);
158 0         0 $self->check_success_json($res, 'FileChecksum');
159             }
160 0     0 0 0 sub getfilechecksum { (shift)->checksum(@_); }
161              
162             # curl -i "http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY"
163             sub homedir {
164 0     0 1 0 my ($self, %options) = @_;
165 0         0 my $err = $self->check_options('GETHOMEDIRECTORY', %options);
166 0 0       0 croak $err if $err;
167              
168 0         0 my $res = $self->operate_requests('GET', '/', 'GETHOMEDIRECTORY', \%options);
169 0         0 $self->check_success_json($res, 'Path');
170             }
171 0     0 0 0 sub gethomedirectory { (shift)->homedir(@_); }
172              
173             # curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION
174             # [&permission=<OCTAL>]"
175             sub chmod {
176 0     0 1 0 my ($self, $path, $mode, %options) = @_;
177 0         0 my $err = $self->check_options('SETPERMISSION', %options);
178 0 0       0 croak $err if $err;
179              
180 0         0 my $res = $self->operate_requests('PUT', $path, 'SETPERMISSION', {%options, permission => $mode});
181 0         0 $res->{code} == 200;
182             }
183 0     0 0 0 sub setpermission { (shift)->chmod(@_); }
184              
185             # curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER
186             # [&owner=<USER>][&group=<GROUP>]"
187             sub chown {
188 0     0 1 0 my ($self, $path, %options) = @_;
189 0         0 my $err = $self->check_options('SETOWNER', %options);
190 0 0       0 croak $err if $err;
191              
192 0 0 0     0 unless (defined($options{owner}) or defined($options{group})) {
193 0         0 croak "'chown' needs at least one of owner or group";
194             }
195              
196 0         0 my $res = $self->operate_requests('PUT', $path, 'SETOWNER', \%options);
197 0         0 $res->{code} == 200;
198             }
199             $OPT_TABLE{SETOWNER} = ['owner', 'group'];
200 0     0 0 0 sub setowner { (shift)->chown(@_); }
201              
202             # curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION
203             # [&replication=<SHORT>]"
204             sub replication {
205 0     0 1 0 my ($self, $path, $replnum, %options) = @_;
206 0         0 my $err = $self->check_options('SETREPLICATION', %options);
207 0 0       0 croak $err if $err;
208              
209 0         0 my $res = $self->operate_requests('PUT', $path, 'SETREPLICATION', {%options, replication => $replnum});
210 0         0 $self->check_success_json($res, 'boolean');
211             }
212 0     0 0 0 sub setreplication { (shift)->replication(@_); }
213              
214             # curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETTIMES
215             # [&modificationtime=<TIME>][&accesstime=<TIME>]"
216             # modificationtime: radix-10 long integer
217             # accesstime: radix-10 long integer
218             $OPT_TABLE{SETTIMES} = [ qw( modificationtime accesstime ) ];
219             sub touch {
220 0     0 1 0 my ($self, $path, %options) = @_;
221 0         0 my $err = $self->check_options('SETTIMES', %options);
222 0 0       0 croak $err if $err;
223              
224 0 0 0     0 unless (defined($options{modificationtime}) or defined($options{accesstime})) {
225 0         0 croak "'touch' needs at least one of modificationtime or accesstime";
226             }
227              
228 0         0 my $res = $self->operate_requests('PUT', $path, 'SETTIMES', \%options);
229 0         0 $res->{code} == 200;
230             }
231              
232             sub touchz {
233 0     0 1 0 my ($self, $path) = @_;
234 0         0 return $self->create( $path, '', overwrite => 'true' );
235             }
236              
237 0     0 0 0 sub settimes { (shift)->touch(@_); }
238              
239             # sub delegation_token {}
240             # sub renew_delegation_token {}
241             # sub cancel_delegation_token {}
242              
243             sub check_options {
244 0     0 0 0 my ($self, $op, %opts) = @_;
245 0         0 my @ex = ();
246 0   0     0 my $opts = $OPT_TABLE{$op} || [];
247 0         0 foreach my $k (keys %opts) {
248 0 0       0 push @ex, $k if scalar(grep {$k eq $_} @$opts) < 1;
  0         0  
249             }
250 0 0       0 return undef unless @ex;
251 0         0 'no such option: ' . join(' ', @ex);
252             }
253              
254             sub check_success_json {
255 8     8 0 15 my ($self, $res, $attr) = @_;
256             $res->{code} == 200 and $res->{content_type} =~ m!^application/json! and
257 8 100 66     88 (not defined($attr) or JSON::XS::decode_json($res->{body})->{$attr});
      100        
258             }
259              
260             sub api_path {
261 10     10 0 17 my ($self, $path) = @_;
262 10 100       49 return '/webhdfs/v1' . $path if $path =~ m!^/!;
263 1         6 '/webhdfs/v1/' . $path;
264             }
265              
266             sub build_path {
267 7     7 0 1809 my ($self, $path, $op, %params) = @_;
268             my %opts = (('op' => $op),
269             ($self->{username} ? ('user.name' => $self->{username}) : ()),
270 7 100       36 ($self->{doas} ? ('doas' => $self->{doas}) : ()),
    100          
271             %params);
272 7         24 my $u = URI->new('', 'http');
273 7         6029 $u->query_form(%opts);
274 7         407 $self->api_path($path) . $u->path_query; # path_query() #=> '?foo=1&bar=2'
275             }
276              
277             sub connect_to {
278 0     0 0   my $self = shift;
279 0 0         if ($self->{under_failover}) {
280 0           return ($self->{standby_host}, $self->{standby_port});
281             }
282 0           return ($self->{host}, $self->{port});
283             }
284              
285             our %REDIRECTED_OPERATIONS = (APPEND => 1, CREATE => 1, OPEN => 1, GETFILECHECKSUM => 1);
286             sub operate_requests {
287 0     0 0   my ($self, $method, $path, $op, $params, $payload) = @_;
288              
289 0           my ($host, $port) = $self->connect_to();
290              
291 0           my $headers = []; # or undef ?
292 0 0 0       if ($self->{httpfs_mode} or not $REDIRECTED_OPERATIONS{$op}) {
293             # empty files are ok
294 0 0 0       if ($self->{httpfs_mode} and defined($payload)) {
295 0           $headers = ['Content-Type' => 'application/octet-stream'];
296             }
297 0           return $self->request($host, $port, $method, $path, $op, $params, $payload, $headers);
298             }
299              
300             # pattern for not httpfs and redirected by namenode
301 0           my $res = $self->request($host, $port, $method, $path, $op, $params, undef);
302 0 0 0       unless ($res->{code} >= 300 and $res->{code} <= 399 and $res->{location}) {
      0        
303 0           my $code = $res->{code};
304 0           my $body = $res->{body};
305 0           croak "NameNode returns non-redirection (or without location header), code:$code, body:$body.";
306             }
307 0           my $uri = URI->new($res->{location});
308 0           $headers = ['Content-Type' => 'application/octet-stream'];
309 0           return $self->request($uri->host, $uri->port, $method, $uri->path_query, undef, {}, $payload, $headers);
310             }
311              
312             sub request {
313 0     0 0   my $self = shift;
314 0 0         return $self->_request(@_) unless $self->{suppress_errors};
315              
316             try {
317 0     0     $self->_request(@_);
318             } catch {
319 0     0     $self->{last_error} = $_;
320 0           0;
321 0           };
322             }
323              
324             # IllegalArgumentException 400 Bad Request
325             # UnsupportedOperationException 400 Bad Request
326             # SecurityException 401 Unauthorized
327             # IOException 403 Forbidden
328             # FileNotFoundException 404 Not Found
329             # RuntimeException 500 Internal Server Error
330             sub _request {
331 0     0     my ($self, $host, $port, $method, $path, $op, $params, $payload, $header) = @_;
332              
333 0 0         my $request_path = $op ? $self->build_path($path, $op, %$params) : $path;
334             my ($ver, $code, $msg, $headers, $body) = $self->{furl}->request(
335 0 0         method => $method,
336             host => $host,
337             port => $port,
338             path_query => $request_path,
339             headers => $header,
340             ($payload ? (content => $payload) : ()),
341             );
342              
343 0           my $res = { code => $code, body => $body };
344              
345 0           for (my $i = 0; $i < scalar(@$headers); $i += 2) {
346 0           my $header = $headers->[$i];
347 0           my $value = $headers->[$i + 1];
348              
349 0 0         if ($header =~ m!^location$!i) { $res->{location} = $value; }
  0 0          
350 0           elsif ($header =~ m!^content-type$!i) { $res->{content_type} = $value; }
351             }
352              
353 0 0 0       return $res if $code >= 200 and $code <= 299;
354 0 0 0       return $res if $code >= 300 and $code <= 399;
355              
356 0   0       my $errmsg = $res->{body} || 'Response body is empty...';
357 0           $errmsg =~ s/\n//g;
358              
359 0 0         if ($code == 400) { croak "ClientError: $errmsg"; }
  0 0          
    0          
    0          
    0          
360 0           elsif ($code == 401) { croak "SecurityError: $errmsg"; }
361             elsif ($code == 403) {
362 0 0         if ($errmsg =~ /org\.apache\.hadoop\.ipc\.StandbyException/) {
363 0 0 0       if ($self->{httpfs_mode} || not defined($self->{standby_host})) {
    0          
364             # failover is disabled
365             } elsif ($self->{retrying}) {
366             # more failover is prohibited
367 0           $self->{retrying} = 0;
368             } else {
369 0           $self->{under_failover} = not $self->{under_failover};
370 0           $self->{retrying} = 1;
371 0           my ($next_host, $next_port) = $self->connect_to();
372 0           my $val = $self->request($next_host, $next_port, $method, $path, $op, $params, $payload, $header);
373 0           $self->{retrying} = 0;
374 0           return $val;
375             }
376             }
377 0           croak "IOError: $errmsg";
378             }
379 0           elsif ($code == 404) { croak "FileNotFoundError: $errmsg"; }
380 0           elsif ($code == 500) { croak "ServerError: $errmsg"; }
381              
382 0           croak "RequestFailedError, code:$code, message:$errmsg";
383             }
384              
385             sub exists {
386 0     0 1   my $self = shift;
387 0   0       my $path = shift || croak "No HDFS path was specified";
388 0           my $stat;
389             eval {
390 0           $stat = $self->stat( $path );
391 0           1;
392 0 0         } or do {
393 0   0       my $eval_error = $@ || 'Zombie error';
394 0 0         return if $eval_error =~ m<
395             \QFileNotFoundError: {"RemoteException":{"message":"File does not exist:\E
396             >xms;
397             # just re-throw
398 0           croak $eval_error;
399             };
400 0           return $stat;
401             }
402              
403             sub find {
404 0     0 1   my $self = shift;
405 0   0       my $file_path = shift || croak "No file path specified";
406 0           my $cb = shift;
407 0 0 0       my $opt = @_ && ref $_[-1] eq 'HASH' ? pop @_ : {};
408              
409 0 0         if ( ref $cb ne 'CODE' ) {
410 0           die "Call back needs to be a CODE ref";
411             }
412              
413 0           my $suppress = $self->{suppress_errors};
414             # can be used to quickly skip the java junk like, file names starting with
415             # underscores, etc.
416 0 0         my $re_ignore = $opt->{re_ignore} ? qr/$opt->{re_ignore}/ : undef;
417              
418             #
419             # No such thing like symlinks (yet) in HDFS, in case you're wondering:
420             # https://issues.apache.org/jira/browse/HADOOP-10019
421             # although check that link yourself
422             #
423 0           my $looper;
424             $looper = sub {
425 0     0     my $thing = shift;
426 0 0         if ( ! $self->exists( $thing ) ) {
427             # should happen at the start, so this will short-circuit the recursion
428 0           warn "The HDFS directory specified ($thing) does not exist! Please guard your HDFS paths with exists()";
429 0           return;
430             }
431 0           my $list = $self->list( $thing );
432 0           foreach my $e ( @{ $list } ) {
  0            
433 0           my $path = $e->{pathSuffix};
434 0           my $type = $e->{type};
435              
436 0 0 0       next if $re_ignore && $path && $path =~ $re_ignore;
      0        
437              
438 0 0         if ( $type eq 'DIRECTORY' ) {
    0          
439 0           $cb->( $thing, $e );
440             eval {
441 0           $looper->( File::Spec->catdir( $thing, $path ) );
442 0           1;
443 0 0         } or do {
444 0   0       my $eval_error = $@ || 'Zombie error';
445 0 0         if ( $suppress ) {
446 0           warn "[ERROR DOWNGRADED] Failed to check $thing/$path: $eval_error";
447 0           next;
448             }
449 0           croak $eval_error;
450             }
451             }
452             elsif ( $type eq 'FILE' ) {
453 0           $cb->( $thing, $e );
454             }
455             else {
456 0           my $msg = "I don't know what to do with type=$type!";
457 0 0         if ( $suppress ) {
458 0           warn "[ERROR DOWNGRADED] $msg";
459 0           next;
460             }
461 0           croak $msg;
462             }
463             }
464 0           return;
465 0           };
466              
467 0           $looper->( $file_path );
468              
469 0           return;
470             }
471              
472             1;
473              
474             __END__