File Coverage

blib/lib/Net/Hadoop/WebHDFS/LWP.pm
Criterion Covered Total %
statement 27 128 21.0
branch 0 60 0.0
condition 0 29 0.0
subroutine 9 13 69.2
pod 1 2 50.0
total 37 232 15.9


line stmt bran cond sub pod time code
1             package Net::Hadoop::WebHDFS::LWP;
2             $Net::Hadoop::WebHDFS::LWP::VERSION = '0.012';
3 2     2   391041 use strict;
  2         3  
  2         74  
4 2     2   8 use warnings;
  2         4  
  2         107  
5 2     2   15 use parent 'Net::Hadoop::WebHDFS';
  2         3  
  2         13  
6              
7             # VERSION
8              
9 2     2   116964 use LWP::UserAgent;
  2         106252  
  2         97  
10 2     2   16 use Carp;
  2         5  
  2         151  
11 2     2   1186 use Ref::Util qw( is_arrayref );
  2         5551  
  2         196  
12 2     2   14 use Scalar::Util qw( openhandle );
  2         4  
  2         90  
13 2     2   1085 use HTTP::Request::StreamingUpload;
  2         1327  
  2         91  
14              
15 2         3626 use constant UA_PASSTHROUGH_OPTIONS => qw(
16             cookie_jar
17             env_proxy
18             no_proxy
19             proxy
20 2     2   12 );
  2         3  
21              
22             sub new {
23 0     0 1   my $class = shift;
24 0           my %options = @_;
25 0   0       my $debug = delete $options{debug} || 0;
26 0   0       my $use_ssl = delete $options{use_ssl} || 0;
27              
28 0 0         require Data::Dumper if $debug;
29              
30 0           my $self = $class->SUPER::new(@_);
31              
32             # we don't need Furl
33 0           delete $self->{furl};
34              
35 0           $self->{debug} = $debug;
36              
37             # default timeout is a bit short, raise it
38 0   0       $self->{timeout} = $options{timeout} || 30;
39              
40             # For filehandle upload support
41 0   0       $self->{chunksize} = $options{chunksize} || 4096;
42              
43             $self->{ua_opts} = {
44             map {
45 0           exists $options{$_} ? (
46 0 0         $_ => $options{ $_ }
47             ) : ()
48             } UA_PASSTHROUGH_OPTIONS
49             };
50              
51 0           $self->_create_ua;
52              
53 0           $self->{use_ssl} = $use_ssl;
54              
55 0           return $self;
56             }
57              
58             # Code below copied and modified for LWP from Net::Hadoop::WebHDFS
59             #
60             sub request {
61 0     0 0   my ( $self, $host, $port, $method, $path, $op, $params, $payload, $header ) = @_;
62              
63 0 0         my $request_path = $op ? $self->build_path( $path, $op, %$params ) : $path;
64              
65 0 0         my $protocol = $self->{use_ssl} ? 'https' : 'http';
66              
67             # Note: ugly things done with URI, which is already used in the parent
68             # module. So we re-parse the path produced there. yuk.
69 0           my $uri = URI->new( $request_path, $protocol );
70              
71 0           $uri->host($host);
72 0           $uri->port($port);
73              
74 0           $uri->scheme( $protocol );
75              
76 0 0         printf STDERR "URI : %s\n", $uri if $self->{debug};
77              
78 0           my $req;
79              
80 0 0 0       if ( $payload && openhandle($payload) ) {
    0          
81             $req = HTTP::Request::StreamingUpload->new(
82             $method => $uri,
83             fh => $payload,
84             headers => HTTP::Headers->new( 'Content-Length' => -s $payload, ),
85             chunk_size => $self->{chunksize},
86 0           );
87             }
88             elsif ( ref $payload ) {
89 0           croak __PACKAGE__ . " does not accept refs as content, only scalars and FH";
90             }
91             else {
92 0           $req = HTTP::Request->new( $method => $uri );
93 0           $req->content($payload);
94             }
95              
96 0 0         if ( is_arrayref( $header ) ) {
97 0           while ( my ( $h_field, $h_value ) = splice( @{ $header }, 0, 2 ) ) {
  0            
98 0           $req->header( $h_field => $h_value );
99             }
100             }
101              
102 0           my $real_res = $self->{ua}->request($req);
103              
104 0           my $res = { code => $real_res->code, body => $real_res->decoded_content };
105 0           my $code = $real_res->code;
106              
107 0 0         printf STDERR "HTTP code : %s\n", $code if $self->{debug};
108              
109 0           my $headers = $real_res->headers;
110              
111 0 0         printf STDERR "Headers: %s", Data::Dumper::Dumper $headers if $self->{debug};
112              
113 0 0         for my $h_key ( keys %{ $headers || {} } ) {
  0            
114 0           my $h_value = $headers->{$h_key};
115              
116 0 0         if ( $h_key =~ m!^location$!i ) { $res->{location} = $h_value; }
  0 0          
117 0           elsif ( $h_key =~ m!^content-type$!i ) { $res->{content_type} = $h_value; }
118             }
119              
120 0 0 0       return $res if $res->{code} >= 200 and $res->{code} <= 299;
121 0 0 0       return $res if $res->{code} >= 300 and $res->{code} <= 399;
122              
123 0   0       my $errmsg = $res->{body} || 'Response body is empty...';
124 0           $errmsg =~ s/\n//g;
125              
126             # Attempt to strigfy the HTML message
127 0 0         if ( $errmsg =~ m{ \A }xmsi ) {
128 0 0         if ( my @errors = $self->_parse_error_from_html( $errmsg ) ) {
129             # @error can also be assigned to a hash as it is mapped
130             # to kay=>value pairs, however strigifying the message
131             # is enough for now
132 0           my @flat;
133 0           while ( my ( $key, $val ) = splice( @errors, 0, 2 ) ) {
134 0           push @flat, "$key: $val"
135             }
136             # reset to something meaningful now that we've removed the html cruft
137 0           $errmsg = join '. ', @flat;
138             }
139             }
140              
141 0 0         if ( $code == 400 ) {
    0          
    0          
    0          
    0          
142 0           croak "ClientError: $errmsg";
143             }
144             elsif ( $code == 401 ) {
145             # this error happens for secure clusters when using Net::Hadoop::WebHDFS,
146             # but LWP::Authen::Negotiate takes care of it transparently in this module.
147             # we still may get this error on a secure cluster, when the credentials
148             # cache hasn't been initialized
149             my $extramsg = ( $headers->{'www-authenticate'} || '' ) eq 'Negotiate'
150 0 0 0       ? eval { require LWP::Authen::Negotiate; 1; }
  0 0          
  0            
151             ? q{ (Did you forget to run kinit?)}
152             : q{ (LWP::Authen::Negotiate doesn't seem available)}
153             : '';
154 0           croak "SecurityError$extramsg: $errmsg";
155             }
156             elsif ( $code == 403 ) {
157 0 0         if ( $errmsg =~ m{ \Qorg.apache.hadoop.ipc.StandbyException\E }xms ) {
158 0 0 0       if ( $self->{httpfs_mode} || not defined( $self->{standby_host} ) ) {
    0          
159              
160             # failover is disabled
161             }
162             elsif ( $self->{retrying} ) {
163              
164             # more failover is prohibited
165 0           $self->{retrying} = 0;
166             }
167             else {
168 0           $self->{under_failover} = not $self->{under_failover};
169 0           $self->{retrying} = 1;
170 0           my ( $next_host, $next_port ) = $self->connect_to;
171 0           my $val = $self->request(
172             $next_host,
173             $next_port,
174             $method,
175             $path,
176             $op,
177             $params,
178             $payload,
179             $header,
180             );
181 0           $self->{retrying} = 0;
182 0           return $val;
183             }
184             }
185 0           croak "IOError: $errmsg";
186             }
187             elsif ( $code == 404 ) {
188 0           croak "FileNotFoundError: $errmsg";
189             }
190             elsif ( $code == 500 ) {
191 0           croak "ServerError: $errmsg";
192             }
193             else {
194             # do nothing
195             }
196              
197             # catch-all exception
198 0           croak "RequestFailedError, code:$code, message:$errmsg";
199             }
200              
201             sub _create_ua {
202 0     0     my $self = shift;
203 0           my $class = ref $self;
204              
205             $self->{ua} = LWP::UserAgent->new(
206             requests_redirectable => [qw(
207             GET
208             HEAD
209             POST
210             PUT
211             )],
212 0           %{ $self->{ua_opts} },
  0            
213             );
214              
215             $self->{ua}->agent(
216 0   0       sprintf "%s %s",
217             $class,
218             $class->VERSION || 'beta',
219             );
220              
221 0           $self->{useragent} = $self->{ua}->agent;
222 0           $self->{ua}->timeout( $self->{timeout} );
223              
224 0           return $self;
225             }
226              
227             sub _parse_error_from_html {
228             # This is a brittle function as it assumes certain things to be present
229             # in the HTML output and will most likely break with future updates.
230             # However the interface returns HTML in certain cases (like secure clusters)
231             # and currently that's a failure on the backend where we can;t fix things.
232             #
233             # In any case, the program should default to the original message fetched,
234             # if this fails for any reason.
235             #
236 0     0     my $self = shift;
237 0           my $errmsg = shift;
238              
239 0 0         if ( ! eval { require HTML::Parser;} ) {
  0            
240 0 0         if ( $self->{debug} ) {
241 0           printf STDERR "Tried to parse the HTML error message but HTML::Parser is not available!\n";
242             }
243 0           return;
244             }
245              
246 0           my @errors;
247 0           my $p = HTML::Parser->new(
248             api_version => 3,
249             handlers => {
250             text => [
251             \@errors,
252             'event,text',
253             ],
254             }
255             );
256 0           $p->parse( $errmsg );
257              
258             my @flat = map {;
259 0           s{ \A \s+ }{}xmsg;
260 0           s{ \s+ \z }{}xmsg;
261 0           $_;
262             }
263             grep {
264 0   0       $_ !~ m{ \Q