File Coverage

blib/lib/Net/Hadoop/YARN/ApplicationMaster.pm
Criterion Covered Total %
statement 38 101 37.6
branch 0 16 0.0
condition 0 26 0.0
subroutine 13 19 68.4
pod 3 3 100.0
total 54 165 32.7


line stmt bran cond sub pod time code
1             package Net::Hadoop::YARN::ApplicationMaster;
2             $Net::Hadoop::YARN::ApplicationMaster::VERSION = '0.203';
3 1     1   86384 use strict;
  1         10  
  1         25  
4 1     1   4 use warnings;
  1         2  
  1         23  
5 1     1   9 use 5.10.0;
  1         3  
6              
7             use constant {
8 1         68 RE_ARCHIVED_ERROR => qr{
9             Application .+?
10             \Qcould not be found, please try the history server\E
11             }xms,
12 1     1   5 };
  1         2  
13              
14 1     1   367 use Constant::FromGlobal DEBUG => { int => 1, default => 0, env => 1 };
  1         7575  
  1         6  
15              
16 1     1   183 use Carp ();
  1         2  
  1         11  
17 1     1   331 use Clone ();
  1         1915  
  1         22  
18 1     1   396 use HTML::PullParser;
  1         5561  
  1         29  
19 1     1   441 use Moo;
  1         9023  
  1         4  
20 1     1   1575 use Ref::Util ();
  1         1217  
  1         20  
21 1     1   6 use Scalar::Util ();
  1         2  
  1         24  
22              
23 1     1   374 use Net::Hadoop::YARN::HistoryServer;
  1         4  
  1         239  
24              
25             with 'Net::Hadoop::YARN::Roles::AppMasterHistoryServer';
26             with 'Net::Hadoop::YARN::Roles::Common';
27              
28             has '+servers' => (
29             default => sub {["localhost:8088"]},
30             );
31              
32             has history_object => (
33             is => 'rw',
34             isa => sub {
35             my $o = shift || return; # this is optional
36             if ( ! Scalar::Util::blessed $o
37             || ! $o->isa('Net::Hadoop::YARN::HistoryServer')
38             ) {
39             Carp::confess "$o is not a Net::Hadoop::YARN::HistoryServer";
40             }
41             },
42             lazy => 1,
43             default => sub { },
44             );
45              
46             my $PREFIX = '_' x 4;
47              
48             #<<<
49             my $methods_urls = {
50             jobs => ['/proxy/{appid}/ws/v1/mapreduce/jobs', 'job' ],
51             job => ['/proxy/{appid}/ws/v1/mapreduce/jobs/{jobid}', '' ],
52             jobconf => ['/proxy/{appid}/ws/v1/mapreduce/jobs/{jobid}/conf', '' ],
53             jobcounters => ['/proxy/{appid}/ws/v1/mapreduce/jobs/{jobid}/counters', 'counterGroup' ],
54             jobattempts => ['/proxy/{appid}/ws/v1/mapreduce/jobs/{jobid}/jobattempts', 'jobAttempt' ],
55             _get_tasks => ['/proxy/{appid}/ws/v1/mapreduce/jobs/{jobid}/tasks', 'task' ],
56             task => ['/proxy/{appid}/ws/v1/mapreduce/jobs/{jobid}/tasks/{taskid}', '' ],
57             taskcounters => ['/proxy/{appid}/ws/v1/mapreduce/jobs/{jobid}/tasks/{taskid}/counters', 'taskCounterGroup' ],
58             taskattempts => ['/proxy/{appid}/ws/v1/mapreduce/jobs/{jobid}/tasks/{taskid}/attempts', 'taskAttempt' ],
59             taskattempt => ['/proxy/{appid}/ws/v1/mapreduce/jobs/{jobid}/tasks/{taskid}/attempts/{attemptid}', '' ],
60             taskattemptcounters => ['/proxy/{appid}/ws/v1/mapreduce/jobs/{jobid}/tasks/{taskid}/attempts/{attemptid}/counters', 'taskAttemptCounterGroup' ],
61             };
62             #>>>
63              
64             # For each of the keys, at startup:
65             # - make a method, adding the path
66             # - pass the path and variables to a validation and substitution engine
67             # - execute the request
68             # - return the proper fragment of the JSON tree
69              
70             _mk_subs($methods_urls, { prefix => $PREFIX } );
71              
72             my %app_to_hist = (
73             jobs => [ job => qr{ \A job_[0-9]+ }xms ],
74             );
75              
76             foreach my $name ( keys %{ $methods_urls } ) {
77             my $base = $PREFIX . $name;
78 1     1   9 no strict qw( refs );
  1         2  
  1         711  
79             *{ $name } = sub {
80 0     0     my $self = shift;
81 0           my $args = Clone::clone( \@_ );
82 0           my @rv;
83              
84             eval {
85 0           @rv = $self->$base( @_ );
86 0           1;
87 0 0         } or do {
88 0   0       my $eval_error = $@ || 'Zombie error';
89 0 0 0       if ( $eval_error =~ RE_ARCHIVED_ERROR && $self->history_object ) {
90 0           @rv = $self->_collect_from_history(
91             $args,
92             $name,
93             $eval_error,
94             );
95             }
96             else {
97 0           Carp::confess $eval_error;
98             }
99             };
100              
101 0 0         return wantarray ? @rv : $rv[0];
102             };
103             }
104              
105             sub _collect_from_history {
106 0     0     my $self = shift;
107 0           my $args = shift;
108 0           my $name = shift;
109 0   0       my $error = shift || Carp::confess "No error message specified!";
110              
111 0   0       my $hist_method = $app_to_hist{ $name } || [ $name ];
112 0           my($hmethod, $hregex) = @{ $hist_method };
  0            
113              
114 0           if ( DEBUG ) {
115             print STDERR "Received HTML from the API. ",
116             "I will now attempt to collect the information from the history server\n";
117             printf STDERR "The error was: %s\n", $error
118             if DEBUG > 1;
119             }
120              
121 0           my @hist_param;
122 0 0 0       if ( $error =~ RE_ARCHIVED_ERROR && ( $name eq 'jobs' || $name eq 'job' ) ) {
      0        
123 0           print STDERR "Job was archived\n" if DEBUG;
124             @hist_param = (
125             map {
126 0           (my $c = $_) =~ s{ \bapplication_ }{job_}xms;
127 0           $c;
128 0           } @{ $args }
  0            
129             );
130             }
131             else {
132 0           print STDERR "Job was not available from he RM\n" if DEBUG;
133             @hist_param = (
134             $hregex
135 0 0         ? grep { $_ =~ $hregex }
  0            
136             $self->_extract_ids_from_error_html( $error )
137             : ()
138             );
139             }
140              
141 0           my @rv;
142             eval {
143 0           @rv = $self->history_object->$hmethod( @hist_param );
144 0           1;
145 0 0         } or do {
146 0   0       my $eval_error_hist = $@ || 'Zombie error';
147 0           Carp::confess "Received HTML from the API and attempting to map that to a historical job failed: $error\n$eval_error_hist\n";
148             };
149              
150 0           foreach my $thing ( @rv ) {
151 0 0         next if ! Ref::Util::is_hashref $thing;
152 0           $thing->{__from_history} = 1;
153             }
154              
155 0           return @rv;
156             }
157              
158             sub _extract_ids_from_error_html {
159 0     0     my $self = shift;
160 0   0       my $error = shift || Carp::confess "No error message specified!";
161 0           my(undef, $html) = split m{\Q
162 0           $html = '
163 0   0       my $parser = HTML::PullParser->new(
164             doc => \$html,
165             start => 'event, tagname, @attr',
166             report_tags => [qw( a )],
167             ) || Carp::confess "Can't parse HTML received from the API: $!";
168 0           my %link;
169 0           while ( my $token = $parser->get_token ) {
170 0 0         next if $token->[0] ne 'start';
171 0           my($type, $tag, %attr) = @{ $token };
  0            
172 0   0       my $link = $attr{href} || next;
173 0           $link{ $link }++;
174             }
175 0           my %id;
176 0           for my $link ( keys %link ) {
177 0           $id{ $_ }++ for $self->_extract_valid_params( $link );
178             }
179 0           return keys %id;
180             }
181              
182             sub info {
183 0     0 1   my $self = shift;
184 0           $self->mapreduce(@_);
185             }
186              
187             sub mapreduce {
188 0     0 1   my $self = shift;
189 0           my $app_id = shift;
190 0           my $res = $self->_get("{appid}/ws/v1/mapreduce/info");
191 0           return $res->{info};
192             }
193              
194             sub tasks {
195 0     0 1   my $self = shift;
196 0           $self->_get_tasks(@_);
197             }
198              
199             1;
200              
201             __END__