line
stmt
bran
cond
sub
pod
time
code
1
#!/usr/bin/perl
2
package Apache::Hadoop::WebHDFS;
3
our $VERSION = "0.04";
4
1
1
27665
use warnings;
1
2
1
37
5
1
1
7
use strict;
1
2
1
36
6
1
1
1109
use lib '.';
1
801
1
5
7
1
1
934
use parent 'WWW::Mechanize';
1
270
1
5
8
1
1
261848
use Carp;
1
3
1
69
9
1
1
843
use File::Map 'map_file';
1
7037
1
6
10
11
# ###################
12
# WWW::Mech calls we care about
13
# $m -> get('http://url.com') Does a get on that url
14
# $m -> put('http://blah.com', content => $content)
15
#
16
# $m -> success() boolean if last request was success
17
# $m -> content() content of the response, which can be formatted
18
# $m -> ct() content type returned, ie: 'application/json'
19
# $m -> status() HTTP status code of response
20
21
sub redirect_ok {
    # LWP::UserAgent asks redirect_ok() before following any redirect, and
    # WWW::Mechanize / this class inherit that hook.  WebHDFS answers some
    # requests (e.g. PUT) with a 307 redirect, and RFC 2616 section 10.3.8
    # allows the same method to be re-issued against the new location, so
    # every redirect is accepted regardless of the request or response.
    my $follow = 1;
    return $follow;
}
27
28
sub new {
    # Construct a WebHDFS client (a WWW::Mechanize subclass).
    #
    # Takes an optional hashref of settings:
    #   namenode     - namenode host name (default 'localhost')
    #   namenodeport - namenode HTTP port (default 50070)
    #   authmethod   - 'gssapi' (default), 'unsecure', or 'doas'
    #   user         - sent as user.name for 'unsecure'/'doas', and used as
    #                  the renewer when fetching a delegation token
    #   doas_user    - identity to proxy as when authmethod is 'doas';
    #                  'doasuser' (the spelling used in the POD) is an alias
    # Empty/false values fall back to the defaults, as before.
    #
    # Croaks on an unrecognised authmethod: the request methods only know the
    # three values above and would otherwise build an undefined URL.
    my $class = shift;
    my $args  = shift || {};

    my $namenode     = $args->{'namenode'}     || 'localhost';
    my $namenodeport = $args->{'namenodeport'} || 50070;
    my $authmethod   = $args->{'authmethod'}   || 'gssapi';
    my $user         = $args->{'user'}         || undef;
    my $doas_user    = $args->{'doas_user'}    || $args->{'doasuser'} || undef;

    my %known_auth = map { $_ => 1 } qw(gssapi unsecure doas);
    croak("Unknown authmethod '$authmethod'; expected 'gssapi', 'unsecure', or 'doas'")
        if !$known_auth{$authmethod};

    # stack_depth => 0 stops WWW::Mechanize from keeping every response in
    # its history, which would otherwise hold file contents in memory.
    my $self = $class->SUPER::new(
        agent       => "Apache_Hadoop_WebHDFS",
        stack_depth => "0",
    );

    $self->{'namenode'}     = $namenode;
    $self->{'namenodeport'} = $namenodeport;
    $self->{'authmethod'}   = $authmethod;
    $self->{'user'}         = $user;
    $self->{'doas_user'}    = $doas_user;

    return $self;
}
54
55
sub getdelegationtoken {
    # Fetch a delegation token from the namenode and store it in the object
    # under 'webhdfstoken'; the request methods then append it to every URL
    # as '&delegation=<token>'.
    #
    # Only meaningful when authmethod is 'gssapi'; otherwise it just carps.
    # If the request fails, or the reply carries no urlString, the stored token
    # is the empty string, which the request methods treat as "no token".
    # Returns $self.
    my $self = shift;

    if ( $self->{'authmethod'} ne 'gssapi' ) {
        carp "getdelgation token only valid when using GSSAPI";
        return $self;
    }

    # Avoid an uninitialized-value warning when no user was configured.
    my $renewer = defined $self->{'user'} ? $self->{'user'} : '';
    my $url = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
            . '/webhdfs/v1/?op=GETDELEGATIONTOKEN&renewer=' . $renewer;

    my $token = '';
    $self->get($url);
    if ( $self->success() ) {
        # The reply is {"Token":{"urlString":"..."}}.  Parse the field instead
        # of slicing fixed offsets, which broke on any trailing whitespace.
        my $body = $self->content();
        if ( defined $body && $body =~ m{ "urlString" \s* : \s* "([^"]*)" }xms ) {
            $token = $1;
        }
    }
    $self->{'webhdfstoken'} = $token;

    return $self;
}
71
72
sub canceldelegationtoken {
    # Ask the namenode to cancel the stored delegation token, then remove it
    # from the object so later requests stop sending it.
    #
    # WebHDFS defines CANCELDELEGATIONTOKEN as an HTTP PUT, so the request is
    # sent with put().  This does nothing when no token is stored, and carps
    # when authmethod is not 'gssapi'.  Returns $self.
    my $self = shift;

    if ( $self->{'authmethod'} ne 'gssapi' ) {
        carp "canceldelgation token only valid when using GSSAPI";
        return $self;
    }

    if ( $self->{'webhdfstoken'} ) {
        my $url = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
                . '/webhdfs/v1/?op=CANCELDELEGATIONTOKEN&token=' . $self->{'webhdfstoken'};
        $self->put($url);
        delete $self->{'webhdfstoken'};
    }

    return $self;
}
85
86
sub renewdelegationtoken {
    # Ask the namenode to extend the lifetime of the stored delegation token:
    # PUT .../webhdfs/v1/?op=RENEWDELEGATIONTOKEN&token=<token>
    #
    # The token is kept in the object.  Renewal keeps it valid, so it must
    # still be attached to later requests (the old code deleted it, and also
    # used a non-existent op name and GET).  The namenode's reply is left in
    # the object for content()/success().  Does nothing when no token is
    # stored, and carps when authmethod is not 'gssapi'.  Returns $self.
    my $self = shift;

    if ( $self->{'authmethod'} ne 'gssapi' ) {
        carp "renewdelegation token only valid when using GSSAPI";
        return $self;
    }

    if ( $self->{'webhdfstoken'} ) {
        my $url = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
                . '/webhdfs/v1/?op=RENEWDELEGATIONTOKEN&token=' . $self->{'webhdfstoken'};
        $self->put($url);
    }

    return $self;
}
100
101
sub Open {
    # Read a file from HDFS:
    # GET .../webhdfs/v1<file>?op=OPEN[&offset=][&length=][&buffersize=]
    # The file data is left in the object; read it with content() and check
    # success()/status().  Returns $self.
    #
    # Arguments (hashref):
    #   file       - absolute HDFS path, e.g. '/user/me/data.txt' (required)
    #   offset     - byte offset to start reading from (optional)
    #   length     - number of bytes to read (optional)
    #   buffersize - buffer size in bytes (optional)
    #
    # Croaks if 'file' is missing, if the configured authmethod lacks the
    # 'user'/'doas_user' values it needs, or if authmethod is unrecognised.
    my $self = shift;
    my $args = shift || {};

    my $file = $args->{'file'} or croak("No HDFS file given to open");
    my $offset     = $args->{'offset'};
    my $length     = $args->{'length'};
    my $buffersize = $args->{'buffersize'};

    my $url = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
            . '/webhdfs/v1' . $file . '?op=OPEN';

    my $auth = $self->{'authmethod'} || '';
    if ( $auth eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $auth eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $auth ne 'gssapi' ) {
        # Without this, $url would stay undef and an undefined URL would be requested.
        croak("Unknown authmethod '$auth'; expected 'gssapi', 'unsecure', or 'doas'");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};
    $url .= '&offset='     . $offset                 if $offset;
    $url .= '&length='     . $length                 if $length;
    $url .= '&buffersize=' . $buffersize             if $buffersize;

    $self->get($url);
    return $self;
}
141
142
sub getfilestatus {
    # Fetch the status JSON for a file or directory:
    # GET .../webhdfs/v1<file>?op=GETFILESTATUS
    # The JSON reply is left in the object (see content()).  Returns $self.
    #
    # Arguments (hashref): file - absolute HDFS path (required).
    # Croaks if 'file' is missing, if the authmethod lacks the user values it
    # needs, or if authmethod is unrecognised.
    my $self = shift;
    my $args = shift || {};

    my $file = $args->{'file'} or croak("Need HDFS filename before listing status");

    my $url = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
            . '/webhdfs/v1' . $file . '?op=GETFILESTATUS';

    my $auth = $self->{'authmethod'} || '';
    if ( $auth eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $auth eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $auth ne 'gssapi' ) {
        # Without this, $url would stay undef and an undefined URL would be requested.
        croak("Unknown authmethod '$auth'; expected 'gssapi', 'unsecure', or 'doas'");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    $self->get($url);
    return $self;
}
165
166
# Added because LWP::UserAgent and WWW::Mechanize (at the time of writing) don't provide a delete() method
167
# stolen from http://code.google.com/p/www-mechanize/issues/detail?id=219
168
sub _SUPER_delete {
    # Send an HTTP DELETE through the user agent's normal request() path.
    # @parameters are a URL followed by optional header pairs.
    # _process_colonic_headers (an LWP::UserAgent helper) splits off any extra
    # request() arguments; presumably LWP's ':'-prefixed options such as
    # ':content_file' - confirm against the LWP version in use.
    # Returns whatever request() returns.
    my ( $self, @parameters ) = @_;
    require HTTP::Request::Common;

    my @suff    = $self->_process_colonic_headers( \@parameters, 1 );
    my $request = HTTP::Request::Common::DELETE(@parameters);

    return $self->request( $request, @suff );
}
174
175
sub Delete {
    # Remove a file or directory from HDFS:
    # DELETE .../webhdfs/v1<path>?op=DELETE&recursive=true|false
    # Returns $self; inspect success()/status()/content() for the outcome.
    #
    # Arguments (hashref):
    #   path      - absolute HDFS path (required)
    #   recursive - 'true'/'false' as documented, or any Perl boolean.  The
    #               string 'false' (any case) means NON-recursive; before this
    #               fix any non-empty string, including 'false', deleted
    #               recursively.
    #
    # Croaks if 'path' is missing, if the authmethod lacks the user values it
    # needs, or if authmethod is unrecognised.
    my $self = shift;
    my $args = shift || {};

    my $path = $args->{'path'} or croak("No HDFS path given to delete");
    my $recursive = $args->{'recursive'};
    my $recursive_flag = ( $recursive && lc($recursive) ne 'false' ) ? 'true' : 'false';

    my $url = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
            . '/webhdfs/v1' . $path . '?op=DELETE';

    my $auth = $self->{'authmethod'} || '';
    if ( $auth eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $auth eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $auth ne 'gssapi' ) {
        # Without this, $url would stay undef and an undefined URL would be requested.
        croak("Unknown authmethod '$auth'; expected 'gssapi', 'unsecure', or 'doas'");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};
    $url .= '&recursive=' . $recursive_flag;

    $self->_SUPER_delete($url);
    return $self;
}
206
207
208
sub create {
    # Upload a local file to HDFS:
    # PUT .../webhdfs/v1<dstfile>?op=CREATE&permission=..&overwrite=..
    #     [&blocksize=][&replication=][&buffersize=]
    # redirect_ok() lets the PUT follow WebHDFS's 307 redirect.  Returns $self.
    #
    # Arguments (hashref):
    #   srcfile     - local file to upload (required)
    #   dstfile     - absolute HDFS destination path (required)
    #   permission  - octal permission string (default '000')
    #   overwrite   - 'true' or 'false' (default 'false')
    #   blocksize   - block size in bytes (optional)
    #   replication - replication factor (optional)
    #   buffersize  - buffer size in bytes (optional)
    #
    # Croaks if srcfile/dstfile is missing, if the authmethod lacks the user
    # values it needs, or if authmethod is unrecognised.
    my $self = shift;
    my $args = shift || {};

    my $perms     = $args->{'permission'} || '000';
    my $overwrite = $args->{'overwrite'}  || 'false';
    my $file_src  = $args->{'srcfile'} or croak("Need local source file to copy to HDFS");
    my $file_dest = $args->{'dstfile'} or croak("Need HDFS destination before file create can happen");
    my $blocksize   = $args->{'blocksize'};
    my $replication = $args->{'replication'};
    my $buffersize  = $args->{'buffersize'};

    my $url = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
            . '/webhdfs/v1' . $file_dest
            . '?op=CREATE&permission=' . $perms . '&overwrite=' . $overwrite;

    my $auth = $self->{'authmethod'} || '';
    if ( $auth eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $auth eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $auth ne 'gssapi' ) {
        # Without this, $url would stay undef and an undefined URL would be requested.
        croak("Unknown authmethod '$auth'; expected 'gssapi', 'unsecure', or 'doas'");
    }

    $url .= '&delegation='  . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};
    $url .= '&blocksize='   . $blocksize              if $blocksize;
    $url .= '&replication=' . $replication            if $replication;
    $url .= '&buffersize='  . $buffersize             if $buffersize;

    # File::Map's map_file aliases $content to the file's memory-mapped pages
    # instead of reading the whole file into a Perl string.
    my $content;
    map_file( $content, $file_src, '<' );
    $self->put( $url, content => $content );

    return $self;
}
243
244
# TODO need to add 'append' method - for people wanting to corrupt hdfs. :)
245
246
sub mkdirs {
    # Create a directory (and any missing parents) on HDFS:
    # PUT .../webhdfs/v1<path>?op=MKDIRS&permission=<perms>
    # Returns $self.
    #
    # Arguments (hashref):
    #   path        - absolute HDFS path (required)
    #   permissions - octal permission string (default '000').
    #                 'permission' (as used by create()) is also accepted.
    #
    # Croaks if 'path' is missing, if the authmethod lacks the user values it
    # needs, or if authmethod is unrecognised.
    my $self = shift;
    my $args = shift || {};

    my $path = $args->{'path'} or croak("I need a HDFS location to create directory");

    # The old code tested 'permissons' but read 'permisssions', so the
    # documented 'permissions' key never took effect.  Both misspellings are
    # still honoured for existing callers.
    my $perms = $args->{'permissions'}
             || $args->{'permission'}
             || $args->{'permisssions'}
             || $args->{'permissons'}
             || '000';

    my $url = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
            . '/webhdfs/v1' . $path . '?op=MKDIRS&permission=' . $perms;

    my $auth = $self->{'authmethod'} || '';
    if ( $auth eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $auth eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $auth ne 'gssapi' ) {
        # Without this, $url would stay undef and an undefined URL would be requested.
        croak("Unknown authmethod '$auth'; expected 'gssapi', 'unsecure', or 'doas'");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    $self->put($url);
    return $self;
}
271
272
sub getcontentsummary {
    # Fetch the content summary (counts, space and quota usage) of a directory:
    # GET .../webhdfs/v1<directory>?op=GETCONTENTSUMMARY
    # The JSON reply is left in the object (see content()).  Returns $self.
    #
    # Arguments (hashref): directory - absolute HDFS directory path (required).
    # Croaks if 'directory' is missing, if the authmethod lacks the user values
    # it needs, or if authmethod is unrecognised.
    my $self = shift;
    my $args = shift || {};

    my $dir = $args->{'directory'} or croak("I need a HDFS directory to return content summary");

    my $url = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
            . '/webhdfs/v1' . $dir . '?op=GETCONTENTSUMMARY';

    my $auth = $self->{'authmethod'} || '';
    if ( $auth eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $auth eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $auth ne 'gssapi' ) {
        # Without this, $url would stay undef and an undefined URL would be requested.
        croak("Unknown authmethod '$auth'; expected 'gssapi', 'unsecure', or 'doas'");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    $self->get($url);
    return $self;
}
296
297
298
sub getfilechecksum {
    # Fetch the HDFS checksum of a file:
    # GET .../webhdfs/v1<file>?op=GETFILECHECKSUM
    # The JSON reply is left in the object (see content()).  Returns $self.
    #
    # Arguments (hashref): file - absolute HDFS file path (required).
    # Croaks if 'file' is missing, if the authmethod lacks the user values it
    # needs, or if authmethod is unrecognised.
    my $self = shift;
    my $args = shift || {};

    my $file = $args->{'file'} or croak("I need a HDFS filename to get checksum");

    my $url = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
            . '/webhdfs/v1' . $file . '?op=GETFILECHECKSUM';

    my $auth = $self->{'authmethod'} || '';
    if ( $auth eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $auth eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $auth ne 'gssapi' ) {
        # Without this, $url would stay undef and an undefined URL would be requested.
        croak("Unknown authmethod '$auth'; expected 'gssapi', 'unsecure', or 'doas'");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    $self->get($url);
    return $self;
}
322
323
sub gethomedirectory {
    # Ask the namenode for the home directory of the effective user:
    # GET .../webhdfs/v1/?op=GETHOMEDIRECTORY
    # GETHOMEDIRECTORY is a read-only HTTP GET operation; the old code sent a
    # PUT and omitted the '/' after v1.  The JSON reply is left in the object
    # (see content()).  Returns $self.
    #
    # Takes no arguments.  Croaks if the authmethod lacks the user values it
    # needs, or if authmethod is unrecognised.
    my $self = shift;

    my $url = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
            . '/webhdfs/v1/?op=GETHOMEDIRECTORY';

    my $auth = $self->{'authmethod'} || '';
    if ( $auth eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $auth eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $auth ne 'gssapi' ) {
        # Without this, $url would stay undef and an undefined URL would be requested.
        croak("Unknown authmethod '$auth'; expected 'gssapi', 'unsecure', or 'doas'");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    $self->get($url);
    return $self;
}
346
347
sub setpermission {
    # Change the permission bits of an HDFS path:
    # PUT .../webhdfs/v1<path>?op=SETPERMISSION[&permission=]
    # Returns $self.
    #
    # Arguments (hashref):
    #   path       - absolute HDFS path (required)
    #   permission - octal permission string, e.g. '640' (optional)
    #
    # Croaks if 'path' is missing, if the authmethod lacks the user values it
    # needs, or if authmethod is unrecognised.
    my $self = shift;
    my $args = shift || {};

    my $path = $args->{'path'} or croak("I need a HDFS path to set permmissions");

    # 'permission' is the WebHDFS parameter name.  The old code tested
    # 'permissison' but read 'permisssion', so no single key worked.  Both
    # misspellings are still accepted.
    my $perms = $args->{'permission'}
             || $args->{'permisssion'}
             || $args->{'permissison'};

    my $url = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
            . '/webhdfs/v1' . $path . '?op=SETPERMISSION';

    my $auth = $self->{'authmethod'} || '';
    if ( $auth eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $auth eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $auth ne 'gssapi' ) {
        # Without this, $url would stay undef and an undefined URL would be requested.
        croak("Unknown authmethod '$auth'; expected 'gssapi', 'unsecure', or 'doas'");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};
    $url .= '&permission=' . $perms                  if $perms;

    $self->put($url);
    return $self;
}
376
377
sub setowner {
    # Change the owner and/or group of an HDFS path:
    # PUT .../webhdfs/v1<path>?op=SETOWNER[&owner=][&group=]
    # Returns $self.
    #
    # Arguments (hashref):
    #   path  - absolute HDFS path (required)
    #   user  - new owner, sent as the 'owner' parameter (optional)
    #   group - new group (optional)
    #
    # Croaks if 'path' is missing, if the authmethod lacks the user values it
    # needs, or if authmethod is unrecognised.
    my $self = shift;
    my $args = shift || {};

    my $path  = $args->{'path'} or croak("I need a HDFS path before changing ownership");
    my $owner = $args->{'user'};
    my $group = $args->{'group'};

    my $url = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
            . '/webhdfs/v1' . $path . '?op=SETOWNER';

    my $auth = $self->{'authmethod'} || '';
    if ( $auth eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $auth eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $auth ne 'gssapi' ) {
        # Without this, $url would stay undef and an undefined URL would be requested.
        croak("Unknown authmethod '$auth'; expected 'gssapi', 'unsecure', or 'doas'");
    }

    $url .= '&owner='      . $owner                  if $owner;
    $url .= '&group='      . $group                  if $group;
    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    $self->put($url);
    return $self;
}
410
411
412
sub setreplication {
    # Change the replication factor of an HDFS file:
    # PUT .../webhdfs/v1<path>?op=SETREPLICATION[&replication=]
    # Returns $self.
    #
    # Arguments (hashref):
    #   path        - absolute HDFS path (required)
    #   replication - new replication count (optional)
    #
    # Croaks if 'path' is missing, if the authmethod lacks the user values it
    # needs, or if authmethod is unrecognised.
    my $self = shift;
    my $args = shift || {};

    # The message previously said "changing ownership" (copy/paste from setowner).
    my $path = $args->{'path'} or croak("I need a HDFS path before changing replication");
    my $rep  = $args->{'replication'};

    my $url = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
            . '/webhdfs/v1' . $path . '?op=SETREPLICATION';

    my $auth = $self->{'authmethod'} || '';
    if ( $auth eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $auth eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $auth ne 'gssapi' ) {
        # Without this, $url would stay undef and an undefined URL would be requested.
        croak("Unknown authmethod '$auth'; expected 'gssapi', 'unsecure', or 'doas'");
    }

    $url .= '&replication=' . $rep                    if $rep;
    $url .= '&delegation='  . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    $self->put($url);
    return $self;
}
441
442
443
sub settimes {
    # Change the modification and/or access time of an HDFS path:
    # PUT .../webhdfs/v1<path>?op=SETTIMES[&modificationtime=][&accesstime=]
    # Times are Java timestamps (milliseconds since the epoch).  Returns $self.
    #
    # Arguments (hashref):
    #   path             - absolute HDFS path (required)
    #   modificationtime - new modification time (optional)
    #   accesstime       - new access time (optional)
    #
    # Croaks if 'path' is missing, if the authmethod lacks the user values it
    # needs, or if authmethod is unrecognised.
    my $self = shift;
    my $args = shift || {};

    # The message previously said "changing ownership" (copy/paste from setowner).
    my $path       = $args->{'path'} or croak("I need a HDFS path before changing times");
    my $modtime    = $args->{'modificationtime'};
    my $accesstime = $args->{'accesstime'};

    my $url = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
            . '/webhdfs/v1' . $path . '?op=SETTIMES';

    my $auth = $self->{'authmethod'} || '';
    if ( $auth eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $auth eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $auth ne 'gssapi' ) {
        # Without this, $url would stay undef and an undefined URL would be requested.
        croak("Unknown authmethod '$auth'; expected 'gssapi', 'unsecure', or 'doas'");
    }

    $url .= '&modificationtime=' . $modtime                  if $modtime;
    $url .= '&accesstime='       . $accesstime               if $accesstime;
    $url .= '&delegation='       . $self->{'webhdfstoken'}   if $self->{'webhdfstoken'};

    $self->put($url);
    return $self;
}
479
480
481
sub liststatus {
    # List the contents of an HDFS directory:
    # GET .../webhdfs/v1<path>?op=LISTSTATUS
    # The JSON reply is left in the object (see content()).  Returns $self.
    #
    # Arguments (hashref): path - absolute HDFS path (required).
    # Croaks if 'path' is missing, if the authmethod lacks the user values it
    # needs, or if authmethod is unrecognised.
    my $self = shift;
    my $args = shift || {};

    my $path = $args->{'path'} or croak("I need a HDFS pathname to get status");

    my $url = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
            . '/webhdfs/v1' . $path . '?op=LISTSTATUS';

    my $auth = $self->{'authmethod'} || '';
    if ( $auth eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $auth eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $auth ne 'gssapi' ) {
        # Without this, $url would stay undef and an undefined URL would be requested.
        croak("Unknown authmethod '$auth'; expected 'gssapi', 'unsecure', or 'doas'");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    $self->get($url);
    return $self;
}
504
505
sub rename {
    # Rename/move an HDFS path:
    # PUT .../webhdfs/v1<srcfile>?op=RENAME&destination=<dstfile>
    #
    # Arguments (hashref):
    #   srcfile - existing absolute HDFS path (required)
    #   dstfile - new absolute HDFS path (required)
    #
    # Unlike the other request methods, rename() has always returned the
    # result of put() rather than $self; that is kept for compatibility.
    # Croaks if either path is missing, if the authmethod lacks the user values
    # it needs, or if authmethod is unrecognised.
    my $self = shift;
    my $args = shift || {};

    my $src = $args->{'srcfile'} or croak("Need HDFS source before rename can happen");
    my $dst = $args->{'dstfile'} or croak("Need HDFS destination before rename can happen");

    my $url = 'http://' . $self->{'namenode'} . ':' . $self->{'namenodeport'}
            . '/webhdfs/v1' . $src . '?op=RENAME&destination=' . $dst;

    my $auth = $self->{'authmethod'} || '';
    if ( $auth eq 'unsecure' ) {
        croak("I need a 'user' value if authmethod is 'unsecure'") if !$self->{'user'};
        $url .= '&user.name=' . $self->{'user'};
    }
    elsif ( $auth eq 'doas' ) {
        croak("I need a 'user' value if authmethod is 'doas'")      if !$self->{'user'};
        croak("I need a 'doas_user' value if authmethod is 'doas'") if !$self->{'doas_user'};
        $url .= '&user.name=' . $self->{'user'} . '&doas=' . $self->{'doas_user'};
    }
    elsif ( $auth ne 'gssapi' ) {
        # Without this, $url would stay undef and an undefined URL would be requested.
        croak("Unknown authmethod '$auth'; expected 'gssapi', 'unsecure', or 'doas'");
    }

    $url .= '&delegation=' . $self->{'webhdfstoken'} if $self->{'webhdfstoken'};

    return $self->put($url);
}
531
532
533
=pod
534
535
=head1 NAME
536
537
Apache::Hadoop::WebHDFS - interface to Hadoop's WebHDFS API that supports GSSAPI/SPNEGO (secure) access.
538
539
=head1 VERSION
540
541
Version 0.04
542
543
=head1 SYNOPSIS
544
545
Hadoop's WebHDFS API is a REST interface to HDFS. This module provides
546
a perl interface to the API, allowing one to both read and write files to
547
HDFS. Because Apache::Hadoop::WebHDFS supports GSSAPI, it can be used to
548
interface with secure Hadoop Clusters. This module also supports WebHDFS connections
549
with unsecure grids.
550
551
Apache::Hadoop::WebHDFS is a subclass of WWW::Mechanize, so one could
552
reference WWW::Mechanize methods if needed. One will note that
553
WWW::Mechanize is a subclass of LWP::UserAgent, meaning it's possible to also reference
554
LWP methods from Apache::Hadoop::WebHDFS. For example to debug the GSSAPI
555
calls used during the request, enable LWP::Debug by adding 'use LWP::Debug qw(+);' to your script.
556
557
Content returned from WebHDFS is left in the native JSON format. Including your favorite JSON module like JSON::Any
558
will help with managing the JSON output. To get access to the content stored in your Apache::Hadoop::WebHDFS object,
559
use the methods provided by WWW::Mechanize, such as 'success', 'status', and 'content'. Please see 'EXAMPLE' below
560
for how this is used.
561
562
563
=head1 METHODS
564
565
=over 3
566
567
=item * new() - creates a new WebHDFS object. Recognized keys are 'user', 'namenode', 'namenodeport', 'authmethod', and 'doas_user'. 'namenode' defaults to 'localhost' and 'namenodeport' defaults to 50070. The default value for authmethod is 'gssapi', which is used on grids where SPNEGO has been enabled; the other valid values are 'unsecure' and 'doas', both of which require 'user'. The 'doas_user' key is optional and intended to be used when proxying the WebHDFS request as another user (authmethod 'doas').
568
569
my $hdfsclient = Apache::Hadoop::WebHDFS->new({ namenode => "localhost",
570
namenodeport => "50070",
571
authmethod => "gssapi|unsecure|doas",
572
user => 'user1',
573
doas_user => 'user2',
574
});
575
576
577
=item * getdelegationtoken() - gets a delegation token from the namenode. This token is stored within the WebHDFS object and automatically appended to each WebHDFS request. Delegation tokens are used on grids with security enabled.
578
579
$hdfsclient->getdelegationtoken();
580
581
=item * renewdelegationtoken() - renews a delegation token from the namenode.
582
583
$hdfsclient->renewdelegationtoken();
584
585
=item * canceldelegationtoken() - informs the namenode to invalidate the delegation token as it's no longer needed. When calling this method, the delegation token is also removed from the perl WebHDFS object.
586
587
$hdfsclient->canceldelegationtoken();
588
589
=item * Open() - opens a file on HDFS and fetches its content, which is then available through content(). The only required value for Open() is 'file'; all others are optional. The values 'offset', 'length', and 'buffersize' are sized in bytes.
590
591
$hdfsclient->Open({ file=>'/path/to/my/hdfs/file',
592
offset=>'1024',
593
length=>'2048',
594
buffersize=>'1024',
595
});
596
597
=item * create() - creates and writes to a file on HDFS. Required values for create are 'srcfile', which is local, and 'dstfile', which is the path for the new file on HDFS. 'blocksize' is represented in bytes and 'overwrite' has two valid values of 'true' or 'false'. While not required, if permissions are not provided they will default to '000'.
598
599
$hdfsclient->create({ srcfile=>'/my/local/file.txt',
600
dstfile=>'/my/hdfs/location/file.txt',
601
blocksize=>'524288',
602
replication=>'3',
603
buffersize=>'1024',
604
overwrite=>'true|false',
605
permission=>'644',
606
});
607
608
=item * rename() - renames a file on HDFS. Required values for rename are 'srcfile' and 'dstfile', both of which represent HDFS filenames.
609
610
$hdfsclient->rename({ srcfile=>'/my/old/hdfs/file.txt',
611
dstfile=>'/my/new/hdfs/file.txt',
612
});
613
614
=item * getfilestatus() - returns a json structure containing status of file or directory. Required input is a HDFS path.
615
616
$hdfsclient->getfilestatus({ file=>'/path/to/my/hdfs/file.txt' });
617
618
=item * liststatus() - returns a json structure of contents inside a directory. Note the timestamps are java timestamps so divide by 1000 to convert to ctime before attempting to format time value.
619
620
$hdfsclient->liststatus({ path=>'/path/to/my/hdfs/directory' });
621
622
=item * mkdirs() - creates a directory on HDFS. The only required input value is 'path'. There is an optional input value named 'permissions'; if it is not provided, it defaults to '000'.
623
624
$hdfsclient->mkdirs({ path=>'/path/to/my/hdfs/directory',
625
permissions=>'755',
626
});
627
628
=item * getfilechecksum() - gets HDFS checksum on file. Note this is the crc32 checksum that HDFS uses to detect file corruption. It's not the checksum of the file itself. The only required input value is 'file'.
629
630
$hdfsclient->getfilechecksum({ file=>'/path/to/my/hdfs/file' });
631
632
=item * Delete() - removes files or directories from HDFS. The only required input value is 'path'. The other optional value is 'recursive', which takes a 'true|false' argument.
633
634
$hdfsclient->Delete({ path=>'/path/to/my/hdfs/directory',
635
recursive=>'true|false',
636
});
637
638
=item * getcontentsummary() - list metadata information on a directory. This includes things like file count and quota usage for that directory. The only input value is a path to a HDFS directory.
639
640
$hdfsclient->getcontentsummary({ directory=>'/path/to/my/hdfs/directory' });
641
642
=item * getfilestatus() - returns access times, blocksize, and permissions on a HDFS file.
643
644
$hdfsclient->getfilestatus({ file=>'/path/to/my/hdfs/file' });
645
646
=item * gethomedirectory() - returns path to the home directory for the user or 'proxy user'. There is no input for this method.
647
648
$hdfsclient->gethomedirectory();
649
650
=item * setowner() - changes owner and group ownership on a file or directory on HDFS. The only required input is 'path'.
651
652
$hdfsclient->setowner({ path=>'/path/to/my/hdfs/directory',
653
user=>'cartman',
654
group=>'fifthgraders',
655
});
656
657
=item * setpermission() - changes the permission bits on a file or directory on HDFS. Path is required and permission is optional.
658
659
$hdfsclient->setpermission({ path=>'/path/to/my/hdfs/directory',
660
permission=>'640',
661
});
662
663
664
=item * setreplication() - changes replication count for a file on HDFS. Path is required, replication is optional.
665
666
$hdfsclient->setreplication({ path=>'/path/to/my/hdfs/directory',
667
replication=>'10',
668
});
669
670
671
=item * settimes() - changes access and modification time for a file or directory on HDFS. Path is required; both access and modification times are optional. Remember these times are in Java time (milliseconds), so convert ctime (seconds) to Java time by multiplying by 1000.
672
673
$hdfsclient->settimes({ path=>'/path/to/my/hdfs/directory',
674
modificationtime=>$mymodtime,
675
accesstime=>$myatime,
676
});
677
678
=back
679
680
681
=head1 REQUIREMENTS
682
683
Carp is used for various warnings and errors.
684
WWW::Mechanize is needed as this is a subclass.
685
LWP::Debug is only needed when debugging GSSAPI connections
686
LWP::Authen::Negotiate is the magic sauce for working with secure hadoop clusters
687
parent included with Perl 5.10.1 and newer or found on CPAN for older versions of perl
688
File::Map is required for memory-mapping the local files passed to create(), instead of reading their whole contents into a Perl string.
689
690
=head1 EXAMPLE
691
692
=head2 list a HDFS directory on a secure hadoop cluster
693
694
#!/usr/bin/perl
695
use strict;
696
use warnings;
697
use Data::Dumper;
698
use Apache::Hadoop::WebHDFS;
699
my $username=getlogin();
700
my $hdfsclient = Apache::Hadoop::WebHDFS->new( {namenode =>"mynamenode.example.com",
701
namenodeport =>"50070",
702
authmethod =>"gssapi",
703
user =>$username,
704
});
705
$hdfsclient->liststatus( {path=>"/user/$username"} );
706
if ($hdfsclient->success()) {
707
print "Request SUCCESS: ", $hdfsclient->status() , "\n\n";
708
print "Dumping content:\n";
709
print Dumper $hdfsclient->content() ;
710
} else {
711
print "Request FAILED: ", $hdfsclient->status() , "\n";
712
}
713
714
715
=head1 AUTHOR
716
717
Adam Faris, C<< >>
718
719
=head1 BUGS
720
721
Please use GitHub to report bugs and feature requests:
722
https://github.com/opsmekanix/Apache-Hadoop-WebHDFS/issues
723
724
=head1 SUPPORT
725
726
You can find documentation for this module with the perldoc command.
727
728
perldoc Apache::Hadoop::WebHDFS
729
730
731
You can also look for information at:
732
733
=over 4
734
735
=item * AnnoCPAN: Annotated CPAN documentation
736
737
L<http://annocpan.org/dist/Apache-Hadoop-WebHDFS>
738
739
=item * CPAN Ratings
740
741
L<http://cpanratings.perl.org/d/Apache-Hadoop-WebHDFS>
742
743
=item * Search CPAN
744
745
L<http://search.cpan.org/dist/Apache-Hadoop-WebHDFS/>
746
747
=back
748
749
750
=head1 ACKNOWLEDGEMENTS
751
752
I would like to acknowledge Andy Lester plus the numerous people who have
753
worked on WWW::Mechanize, Achim Grolms and team for providing
754
LWP::Authen::Negotiate, and the contributors to LWP. Thanks for providing
755
awesome modules.
756
757
=head1 LICENSE AND COPYRIGHT
758
759
Copyright 2013 Adam Faris.
760
761
Licensed under the Apache License, Version 2.0 (the "License");
762
you may not use this file except in compliance with the License.
763
You may obtain a copy of the License at
764
765
L<http://www.apache.org/licenses/LICENSE-2.0>
766
767
Unless required by applicable law or agreed to in writing, software
768
distributed under the License is distributed on an "AS IS" BASIS,
769
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
770
See the License for the specific language governing permissions and
771
limitations under the License.
772
773
774
=cut
775
776
777
return 1;