File Coverage

blib/lib/Net/Hadoop/YARN/Roles/Common.pm
Criterion Covered Total %
statement 36 38 94.7
branch n/a
condition n/a
subroutine 13 13 100.0
pod n/a
total 49 51 96.0


line stmt bran cond sub pod time code
1             package Net::Hadoop::YARN::Roles::Common;
2             $Net::Hadoop::YARN::Roles::Common::VERSION = '0.201';
3 5     5   23940 use strict;
  5         8  
  5         124  
4 5     5   15 use warnings;
  5         17  
  5         113  
5 5     5   54 use 5.10.0;
  5         12  
6              
7 5     5   18 use Moo::Role;
  5         13  
  5         28  
8              
9 5     5   1241 use Data::Dumper;
  5         8  
  5         566  
10 5     5   1943 use HTTP::Request;
  5         76216  
  5         128  
11 5     5   3072 use JSON::XS;
  5         20131  
  5         285  
12 5     5   3049 use LWP::UserAgent;
  5         79294  
  5         155  
13 5     5   2335 use Regexp::Common qw( net );
  5         9161  
  5         19  
14 5     5   10738 use Scalar::Util qw( blessed );
  5         7  
  5         348  
15 5     5   2492 use Socket;
  5         13214  
  5         1866  
16 5     5   32 use URI;
  5         5  
  5         101  
17 5     5   2117 use XML::LibXML::Simple;
  0            
  0            
18              
19             has _json => (
20             is => 'rw',
21             lazy => 1,
22             default => sub {
23             return JSON::XS->new->pretty(1)->canonical(1);
24             },
25             isa => sub {
26             my $json = shift;
27             if ( ! blessed $json
28             || ! $json->isa('JSON::XS')
29             || ! $json->can('decode')
30             ) {
31             die "Not a JSON object"
32             }
33             },
34             );
35              
36             has debug => (
37             is => 'rw',
38             default => sub { $ENV{NET_HADOOP_YARN_DEBUG} || 0 },
39             isa => sub { die 'debug should be an integer' if $_[0] !~ /^[0-9]$/ },
40             lazy => 1,
41             );
42              
43             has ua => (
44             is => 'rw',
45             default => sub {
46             return LWP::UserAgent->new(
47             env_proxy => 0,
48             timeout => $_[0]->timeout,
49             );
50             },
51             isa => sub {
52             my $ua = shift;
53             if ( ! blessed( $ua ) || ! $ua->isa("LWP::UserAgent") ) {
54             die "'ua' isn't a LWP::UserAgent";
55             }
56             },
57             lazy => 1,
58             );
59              
60             has timeout => (
61             is => 'rw',
62             default => sub {30},
63             lazy => 1,
64             isa => sub {
65             if ( $_[0] !~ /^[0-9]+$/ || $_[0] <= 0 ) {
66             die "timeout must be an integer"
67             }
68             },
69             );
70              
71             has servers => (
72             is => 'rw',
73             isa => sub {
74             die "Incorrect server list" if ! _check_servers(@_);
75             },
76             lazy => 1,
77             );
78              
79             has add_host_key => (
80             is => 'rw',
81             default => sub { 0 },
82             lazy => 1,
83             );
84              
85             has host_key => (
86             is => 'rw',
87             default => sub { '__RESTHost' },
88             lazy => 1,
89             );
90              
91             sub _check_host {
92             my $host = shift;
93             return !!( eval { inet_aton($host) }
94             || $host =~ $RE{net}{IPv4}
95             || $host =~ $RE{net}{IPv6} );
96             }
97              
98             sub _check_servers {
99             for my $server (@{+shift}) {
100             my ($host, $port) = split /:/, $server, 2;
101             if ( ! _check_host($host)
102             || $port !~ /^[0-9]+$/
103             || $port < 1
104             || $port > 19888
105             ) {
106             die "server $server bad host (port=$port)";
107             }
108             }
109             return 1;
110             }
111              
112             sub _mk_uri {
113             my $self = shift;
114             my ( $server, $path, $params ) = @_;
115             my $uri = $server . "/ws/v1/" . $path;
116             $uri =~ s#//+#/#g;
117             $uri = URI->new("http://" . $uri);
118             if ( $params ) {
119             $uri->query_form($params);
120             }
121             return $uri;
122             }
123              
124             # http://hadoop.apache.org/docs/r2.2.0/hadoop-yarn/hadoop-yarn-site/WebServicesIntro.html
125              
126             sub _get {
127             shift->_request( 'GET', @_ );
128             }
129              
130             sub _put {
131             shift->_request( 'PUT', @_ );
132             }
133              
134             sub _post {
135             shift->_request( 'POST', @_ );
136             }
137              
138             sub _request {
139             my $self = shift;
140             my ( $method, $path, $extra, $server ) = @_;
141              
142             my $host_key = $self->host_key;
143             my @servers = $server ? ( $server ) : @{ $self->servers };
144             my $maxtries = @servers;
145              
146             my ($eval_error, $ret, $n);
147              
148             # get a copy, don't mess with the global setting
149             #
150             my @banned_servers;
151             my $selected_server;
152              
153             TRY: for ( 1 .. $maxtries ) {
154             my $redo;
155              
156             $n++;
157              
158             if ( ! @servers ) {
159             $eval_error = sprintf "No servers left in the queue. Banned servers: '%s'",
160             @banned_servers
161             ? join( q{', '}, @banned_servers)
162             : '[none]',
163             ;
164             last TRY;
165             }
166              
167             $selected_server = $servers[0];
168             eval {
169             $eval_error = undef;
170              
171             my $uri = $self->_mk_uri(
172             $selected_server,
173             $path,
174             $method eq 'GET' ? $extra->{params} : (),
175             );
176              
177             print STDERR "====> $uri\n" if $self->debug;
178              
179             my $req = HTTP::Request->new( uc($method), $uri );
180             $req->header( "Accept-Encoding", "gzip" );
181             #$req->header( "Accept", "application/json" );
182             $req->header( "Accept", "application/xml" );
183              
184             my $response = $self->ua->request($req);
185              
186             if ( $response->code == 500 ) {
187             die "Bad request: $uri";
188             }
189              
190             # found out the json support is buggy at least in the scheduler
191             # info (overwrites child queues instead of making a list), revert
192             # to XML (see YARN-2336)
193              
194             my $res;
195             eval {
196             my $content = $response->decoded_content
197             || die 'No response from the server!';
198              
199             if ( $content !~ m{ \A ( \s+ )? <[?]xml }xms ) {
200             if ( $content =~ m{
201             \QThis is standby RM. Redirecting to the current active RM\E
202             }xms ) {
203             push @banned_servers, shift @servers;
204             $redo++;
205             die "Hit the standby with $selected_server";
206             }
207             die "Response doesn't look like XML: $content";
208             }
209              
210             $res = XMLin(
211             $content,
212             KeepRoot => 0,
213             KeyAttr => [],
214             ForceArray => [qw(
215             app
216             appAttempt
217             container
218             counterGroup
219             job
220             jobAttempt
221             task
222             taskAttempt
223             )],
224             ) || die "Failed to parse XML!";
225             1;
226             } or do {
227             # $self->_json->decode($content)
228             my $decode_error = $@ || 'Zombie error';
229              
230             # when redirected to the history server, a bug present in hadoop 2.5.1
231             # sends to an HTML page, ignoring the Accept-Type header
232             my $msg = $response->redirects
233             ? q{server response wasn't valid (possibly buggy redirect to HTML instead of JSON or XML)}
234             : q{server response wasn't valid JSON or XML}
235             ;
236              
237             die "$msg - $uri ($n/$maxtries): $decode_error";
238             };
239              
240             print STDERR Dumper $res if $self->debug;
241              
242             if ( $response->is_success ) {
243             $ret = $res;
244             return 1;
245             }
246              
247             my $e = $res->{RemoteException};
248              
249             die sprintf "%s (%s in %s) for URI: %s",
250             $e->{message} || $res->{message} || '[unknown message]',
251             $e->{exception} || $res->{exception} || '[unknown exception]',
252             $e->{javaClassName} || $res->{javaClassName} || '[unknown javaClassName]',
253             $uri,
254             ;
255              
256             1;
257             } or do {
258             # store the error for later; will be displayed if this is the last
259             # iteration. also use the next server in the list in case of retry,
260             # or reset the list for the next call (we went a full circle)
261             $eval_error = $@ || 'Zombie error';
262             redo TRY if $redo;
263             push @servers, shift @servers if @servers > 1;
264             };
265              
266             if ( $ret ) {
267             if ( $self->add_host_key ) {
268             # mark where we've been
269             $ret->{ $host_key } = $selected_server;
270             }
271             last TRY;
272             }
273              
274             } # retry as many times as there are servers
275              
276             if ( $eval_error ) {
277             die "Final error ($n/$maxtries): $eval_error";
278             }
279              
280             return $ret;
281             }
282              
283             1;
284              
285             __END__