File Coverage

blib/lib/Net/Hadoop/WebHDFS/LWP.pm
Criterion Covered Total %
statement 27 125 21.6
branch 0 58 0.0
condition 0 27 0.0
subroutine 9 13 69.2
pod 1 2 50.0
total 37 225 16.4


line stmt bran cond sub pod time code
1             package Net::Hadoop::WebHDFS::LWP;
2             $Net::Hadoop::WebHDFS::LWP::VERSION = '0.011';
3 1     1   110734 use strict;
  1         14  
  1         34  
4 1     1   6 use warnings;
  1         2  
  1         33  
5 1     1   5 use parent 'Net::Hadoop::WebHDFS';
  1         2  
  1         9  
6              
7             # VERSION
8              
9 1     1   47082 use LWP::UserAgent;
  1         45647  
  1         39  
10 1     1   10 use Carp;
  1         2  
  1         71  
11 1     1   553 use Ref::Util qw( is_arrayref );
  1         1648  
  1         87  
12 1     1   8 use Scalar::Util qw( openhandle );
  1         2  
  1         44  
13 1     1   474 use HTTP::Request::StreamingUpload;
  1         570  
  1         38  
14              
15 1         1909 use constant UA_PASSTHROUGH_OPTIONS => qw(
16             cookie_jar
17             env_proxy
18             no_proxy
19             proxy
20 1     1   10 );
  1         3  
21              
22             sub new {
23 0     0 1   my $class = shift;
24 0           my %options = @_;
25 0   0       my $debug = delete $options{debug} || 0;
26              
27 0 0         require Data::Dumper if $debug;
28              
29 0           my $self = $class->SUPER::new(@_);
30              
31             # we don't need Furl
32 0           delete $self->{furl};
33              
34 0           $self->{debug} = $debug;
35              
36             # default timeout is a bit short, raise it
37 0   0       $self->{timeout} = $options{timeout} || 30;
38              
39             # For filehandle upload support
40 0   0       $self->{chunksize} = $options{chunksize} || 4096;
41              
42             $self->{ua_opts} = {
43             map {
44 0           exists $options{$_} ? (
45 0 0         $_ => $options{ $_ }
46             ) : ()
47             } UA_PASSTHROUGH_OPTIONS
48             };
49              
50 0           $self->_create_ua;
51              
52 0           return $self;
53             }
54              
55             # Code below copied and modified for LWP from Net::Hadoop::WebHDFS
56             #
57             sub request {
58 0     0 0   my ( $self, $host, $port, $method, $path, $op, $params, $payload, $header ) = @_;
59              
60 0 0         my $request_path = $op ? $self->build_path( $path, $op, %$params ) : $path;
61              
62             # Note: ugly things done with URI, which is already used in the parent
63             # module. So we re-parse the path produced there. yuk.
64 0           my $uri = URI->new( $request_path, 'http' );
65              
66 0           $uri->host($host);
67 0           $uri->port($port);
68 0           $uri->scheme('http'); # no ssl for webhdfs? check the docs
69              
70 0 0         printf STDERR "URI : %s\n", $uri if $self->{debug};
71              
72 0           my $req;
73              
74 0 0 0       if ( $payload && openhandle($payload) ) {
    0          
75             $req = HTTP::Request::StreamingUpload->new(
76             $method => $uri,
77             fh => $payload,
78             headers => HTTP::Headers->new( 'Content-Length' => -s $payload, ),
79             chunk_size => $self->{chunksize},
80 0           );
81             }
82             elsif ( ref $payload ) {
83 0           croak __PACKAGE__ . " does not accept refs as content, only scalars and FH";
84             }
85             else {
86 0           $req = HTTP::Request->new( $method => $uri );
87 0           $req->content($payload);
88             }
89              
90 0 0         if ( is_arrayref( $header ) ) {
91 0           while ( my ( $h_field, $h_value ) = splice( @{ $header }, 0, 2 ) ) {
  0            
92 0           $req->header( $h_field => $h_value );
93             }
94             }
95              
96 0           my $real_res = $self->{ua}->request($req);
97              
98 0           my $res = { code => $real_res->code, body => $real_res->decoded_content };
99 0           my $code = $real_res->code;
100              
101 0 0         printf STDERR "HTTP code : %s\n", $code if $self->{debug};
102              
103 0           my $headers = $real_res->headers;
104              
105 0 0         printf STDERR "Headers: %s", Data::Dumper::Dumper $headers if $self->{debug};
106              
107 0 0         for my $h_key ( keys %{ $headers || {} } ) {
  0            
108 0           my $h_value = $headers->{$h_key};
109              
110 0 0         if ( $h_key =~ m!^location$!i ) { $res->{location} = $h_value; }
  0 0          
111 0           elsif ( $h_key =~ m!^content-type$!i ) { $res->{content_type} = $h_value; }
112             }
113              
114 0 0 0       return $res if $res->{code} >= 200 and $res->{code} <= 299;
115 0 0 0       return $res if $res->{code} >= 300 and $res->{code} <= 399;
116              
117 0   0       my $errmsg = $res->{body} || 'Response body is empty...';
118 0           $errmsg =~ s/\n//g;
119              
120             # Attempt to strigfy the HTML message
121 0 0         if ( $errmsg =~ m{ \A }xmsi ) {
122 0 0         if ( my @errors = $self->_parse_error_from_html( $errmsg ) ) {
123             # @error can also be assigned to a hash as it is mapped
124             # to kay=>value pairs, however strigifying the message
125             # is enough for now
126 0           my @flat;
127 0           while ( my ( $key, $val ) = splice( @errors, 0, 2 ) ) {
128 0           push @flat, "$key: $val"
129             }
130             # reset to something meaningful now that we've removed the html cruft
131 0           $errmsg = join '. ', @flat;
132             }
133             }
134              
135 0 0         if ( $code == 400 ) {
    0          
    0          
    0          
    0          
136 0           croak "ClientError: $errmsg";
137             }
138             elsif ( $code == 401 ) {
139             # this error happens for secure clusters when using Net::Hadoop::WebHDFS,
140             # but LWP::Authen::Negotiate takes care of it transparently in this module.
141             # we still may get this error on a secure cluster, when the credentials
142             # cache hasn't been initialized
143             my $extramsg = ( $headers->{'www-authenticate'} || '' ) eq 'Negotiate'
144 0 0 0       ? eval { require LWP::Authen::Negotiate; 1; }
  0 0          
  0            
145             ? q{ (Did you forget to run kinit?)}
146             : q{ (LWP::Authen::Negotiate doesn't seem available)}
147             : '';
148 0           croak "SecurityError$extramsg: $errmsg";
149             }
150             elsif ( $code == 403 ) {
151 0 0         if ( $errmsg =~ m{ \Qorg.apache.hadoop.ipc.StandbyException\E }xms ) {
152 0 0 0       if ( $self->{httpfs_mode} || not defined( $self->{standby_host} ) ) {
    0          
153              
154             # failover is disabled
155             }
156             elsif ( $self->{retrying} ) {
157              
158             # more failover is prohibited
159 0           $self->{retrying} = 0;
160             }
161             else {
162 0           $self->{under_failover} = not $self->{under_failover};
163 0           $self->{retrying} = 1;
164 0           my ( $next_host, $next_port ) = $self->connect_to;
165 0           my $val = $self->request(
166             $next_host,
167             $next_port,
168             $method,
169             $path,
170             $op,
171             $params,
172             $payload,
173             $header,
174             );
175 0           $self->{retrying} = 0;
176 0           return $val;
177             }
178             }
179 0           croak "IOError: $errmsg";
180             }
181             elsif ( $code == 404 ) {
182 0           croak "FileNotFoundError: $errmsg";
183             }
184             elsif ( $code == 500 ) {
185 0           croak "ServerError: $errmsg";
186             }
187             else {
188             # do nothing
189             }
190              
191             # catch-all exception
192 0           croak "RequestFailedError, code:$code, message:$errmsg";
193             }
194              
195             sub _create_ua {
196 0     0     my $self = shift;
197 0           my $class = ref $self;
198              
199             $self->{ua} = LWP::UserAgent->new(
200             requests_redirectable => [qw(
201             GET
202             HEAD
203             POST
204             PUT
205             )],
206 0           %{ $self->{ua_opts} },
  0            
207             );
208              
209             $self->{ua}->agent(
210 0   0       sprintf "%s %s",
211             $class,
212             $class->VERSION || 'beta',
213             );
214              
215 0           $self->{useragent} = $self->{ua}->agent;
216 0           $self->{ua}->timeout( $self->{timeout} );
217              
218 0           return $self;
219             }
220              
221             sub _parse_error_from_html {
222             # This is a brittle function as it assumes certain things to be present
223             # in the HTML output and will most likely break with future updates.
224             # However the interface returns HTML in certain cases (like secure clusters)
225             # and currently that's a failure on the backend where we can;t fix things.
226             #
227             # In any case, the program should default to the original message fetched,
228             # if this fails for any reason.
229             #
230 0     0     my $self = shift;
231 0           my $errmsg = shift;
232              
233 0 0         if ( ! eval { require HTML::Parser;} ) {
  0            
234 0 0         if ( $self->{debug} ) {
235 0           printf STDERR "Tried to parse the HTML error message but HTML::Parser is not available!\n";
236             }
237 0           return;
238             }
239              
240 0           my @errors;
241 0           my $p = HTML::Parser->new(
242             api_version => 3,
243             handlers => {
244             text => [
245             \@errors,
246             'event,text',
247             ],
248             }
249             );
250 0           $p->parse( $errmsg );
251              
252             my @flat = map {;
253 0           s{ \A \s+ }{}xmsg;
254 0           s{ \s+ \z }{}xmsg;
255 0           $_;
256             }
257             grep {
258 0   0       $_ !~ m{ \Q