File Coverage

blib/lib/Thrift/API/HiveClient2.pm
Criterion Covered Total %
statement 35 186 18.8
branch 0 68 0.0
condition 0 18 0.0
subroutine 12 31 38.7
pod 6 8 75.0
total 53 311 17.0


line stmt bran cond sub pod time code
1             package Thrift::API::HiveClient2;
2             $Thrift::API::HiveClient2::VERSION = '0.026';
3             {
4             $Thrift::API::HiveClient2::DIST = 'Thrift-API-HiveClient2';
5             }
6              
7             # ABSTRACT: Perl to HiveServer2 Thrift API wrapper
8              
9 3     3   602527 use 5.010;
  3         11  
10 3     3   19 use strict;
  3         5  
  3         101  
11 3     3   14 use warnings;
  3         6  
  3         176  
12 3     3   1810 use Moo;
  3         21456  
  3         17  
13 3     3   4515 use Carp;
  3         6  
  3         168  
14 3     3   12 use Scalar::Util qw( reftype blessed );
  3         5  
  3         122  
15 3     3   1589 use List::MoreUtils 'zip';
  3         57023  
  3         25  
16              
17 3     3   5394 use Thrift;
  3         7946  
  3         118  
18 3     3   1496 use Thrift::Socket;
  3         99758  
  3         171  
19 3     3   1791 use Thrift::BufferedTransport;
  3         2917  
  3         122  
20              
21             # Protocol loading is done dynamically later.
22              
23 3     3   3096 use Thrift::API::HiveClient2::TCLIService;
  3         17  
  3         8631  
24              
25             # See https://msdn.microsoft.com/en-us/library/ms711683(v=vs.85).aspx
26             my @odbc_coldesc_fields = qw(
27             TABLE_CAT
28             TABLE_SCHEM
29             TABLE_NAME
30             COLUMN_NAME
31             DATA_TYPE
32             TYPE_NAME
33             COLUMN_SIZE
34             BUFFER_LENGTH
35             DECIMAL_DIGITS
36             NUM_PREC_RADIX
37             NULLABLE
38             REMARKS
39             COLUMN_DEF
40             SQL_DATA_TYPE
41             SQL_DATETIME_SUB
42             CHAR_OCTET_LENGTH
43             ORDINAL_POSITION
44             IS_NULLABLE
45             );
46              
47             my @tabledesc_fields = qw(
48             TABLE_CAT
49             TABLE_SCHEM
50             TABLE_NAME
51             TABLE_TYPE
52             REMARKS
53             );
54              
55             # Don't use XS for now, fails initializing properly with BufferedTransport. See
56             # Thrift::XS documentation.
57             has use_xs => (
58             is => 'rwp',
59             default => sub {0},
60             lazy => 1,
61             );
62              
63             has host => (
64             is => 'ro',
65             default => sub {'localhost'},
66             );
67             has port => (
68             is => 'ro',
69             default => sub {10_000},
70             );
71             has sasl => (
72             is => 'ro',
73             default => 0,
74             );
75              
76             has use_ssl => (
77             is => 'ro',
78             default => 0,
79             );
80              
81             # Kerberos principal
82             # Usually in the format 'hive/{hostname}@REALM.COM';
83             has principal => (
84             is => 'rw',
85             );
86              
87             # 1 hour default recv socket timeout. Increase for longer-running queries
88             # called "timeout" for simplicity's sake, as this is how a user will experience
89             # it: a time after which the Thrift stack will throw an exception if not
90             # getting an answer from the server
91              
92             has timeout => (
93             is => 'rw',
94             default => sub {3_600},
95             );
96              
97             # These exist to make testing with various other Thrift Implementation classes
98             # easier, eventually.
99              
100             has _socket => ( is => 'rwp', lazy => 1 );
101             has _transport => ( is => 'rwp', lazy => 1 );
102             has _protocol => ( is => 'rwp', lazy => 1 );
103             has _client => ( is => 'rwp', lazy => 1 );
104             has _sasl => ( is => 'rwp', lazy => 1 );
105              
106             # setters implied by the 'rwp' mode on the attrs above.
107              
108 0     0     sub _set_socket { $_[0]->{_socket} = $_[1] }
109 0     0     sub _set_transport { $_[0]->{_transport} = $_[1] }
110 0     0     sub _set_protocol { $_[0]->{_protocol} = $_[1] }
111 0     0     sub _set_client { $_[0]->{_client} = $_[1] }
112              
113             sub _set_sasl {
114 0     0     my ( $self, $sasl ) = @_;
115 0 0         return if !$sasl;
116              
117             # This normally selects XS first (hopefully)
118 0           require Authen::SASL;
119 0           Authen::SASL->import;
120              
121 0           require Thrift::SASL::Transport;
122 0           Thrift::SASL::Transport->import;
123              
124 0 0         if ( $sasl == 1 ) {
    0          
125 0           return $self->{_sasl} = Authen::SASL->new( mechanism => 'GSSAPI' );
126             }
127             elsif ( reftype $sasl eq "HASH" ) {
128 0           return $self->{_sasl} = Authen::SASL->new(%$sasl); #, debug => 8 );
129             }
130 0           die "Incorrect parameter passed to _set_sasl";
131             }
132              
133             # after constructon is complete, initialize any attributes that
134             # weren't set in the constructor.
135             sub BUILD {
136 0     0 0   my $self = shift;
137              
138 0           my $thrift_socket;
139            
140 0           my $transport_class = 'Thrift::Socket';
141 0 0         if( $self->use_ssl ) {
142 0           require Thrift::SSLSocket;
143 0           $transport_class = 'Thrift::SSLSocket';
144             }
145              
146 0           $thrift_socket = $transport_class->new( $self->host, $self->port );
147              
148              
149 0 0         $self->_set_socket( $thrift_socket )
150             unless $self->_socket;
151 0           $self->_socket->setRecvTimeout( $self->timeout * 1000 );
152              
153 0 0 0       $self->_set_sasl( $self->sasl ) if ( $self->sasl && !$self->_sasl );
154              
155 0 0         if ( !$self->_transport ) {
156 0           my $transport = Thrift::BufferedTransport->new( $self->_socket );
157 0 0         if ( $self->_sasl ) {
158 0           my $debug = 0;
159 0           $self->_set_transport( Thrift::SASL::Transport->new( $transport, $self->_sasl, $debug, $self->principal ) );
160             }
161             else {
162 0           $self->_set_transport($transport);
163             }
164             }
165              
166 0 0         $self->_set_protocol( $self->_init_protocol( $self->_transport ) )
167             unless $self->_protocol;
168              
169 0 0         $self->_set_client( Thrift::API::HiveClient2::TCLIServiceClient->new( $self->_protocol ) )
170             unless $self->_client;
171             }
172              
173             sub _init_protocol {
174 0     0     my $self = shift;
175 0           my $err;
176             my $protocol = eval {
177 0 0         $self->use_xs
178             && require Thrift::XS::BinaryProtocol;
179 0           Thrift::XS::BinaryProtocol->new( $self->_transport );
180 0 0         } or do { $err = $@; 0 };
  0            
  0            
181             $protocol
182 0   0       ||= do { require Thrift::BinaryProtocol; Thrift::BinaryProtocol->new( $self->_transport ) };
  0            
  0            
183 0 0         $self->_set_use_xs(0) if ref($protocol) !~ /XS/;
184              
185             # TODO Add warning when XS was asked but failed to load
186 0           return $protocol;
187             }
188              
189             sub connect {
190 0     0 1   my ($self) = @_;
191 0           $self->_transport->open;
192             }
193              
194             has _session => (
195             is => 'rwp',
196             isa => sub {
197             my($val) = @_;
198             if ( !blessed( $val )
199             || !$val->isa('Thrift::API::HiveClient2::TOpenSessionResp')
200             ) {
201             die sprintf "Session `%s` isn't a Thrift::API::HiveClient2::TOpenSessionResp",
202             $val // '[undefined]'
203             ;
204             }
205             },
206             lazy => 1,
207             builder => '_build_session',
208             );
209              
210             has username => (
211             is => 'rwp',
212             lazy => 1,
213             default => sub { $ENV{USER} },
214             );
215              
216             has password => (
217             is => 'rwp',
218             lazy => 1,
219             default => sub {''},
220             );
221              
222             sub _build_session {
223 0     0     my $self = shift;
224 0 0         $self->_transport->open if !$self->_transport->isOpen;
225 0           return $self->_client->OpenSession(
226             Thrift::API::HiveClient2::TOpenSessionReq->new(
227             { username => $self->username,
228             password => $self->password,
229             }
230             )
231             );
232             }
233              
234             has _session_handle => (
235             is => 'rwp',
236             isa => sub {
237             my($val) = @_;
238             if ( !blessed( $val )
239             || !$val->isa('Thrift::API::HiveClient2::TSessionHandle')
240             ) {
241             die sprintf "Session handle `%s` isn't a Thrift::API::HiveClient2::TSessionHandle",
242             $val // '[undefined]'
243             ;
244             }
245             },
246             lazy => 1,
247             builder => '_build_session_handle',
248             predicate => '_has_session_handle',
249             );
250              
251             sub _build_session_handle {
252 0     0     my $self = shift;
253 0           return $self->_session->{sessionHandle};
254             }
255              
256             has _operation => (
257             is => "rwp",
258             isa => sub {
259             my($val) = @_;
260             if ( defined $val
261             && (
262             !blessed( $val )
263             || ( !$val->isa('Thrift::API::HiveClient2::TExecuteStatementResp')
264             && !$val->isa('Thrift::API::HiveClient2::TGetColumnsResp')
265             && !$val->isa('Thrift::API::HiveClient2::TGetTablesResp')
266             )
267             )
268             ) {
269             die "Operation `%s` isn't a Thrift::API::HiveClient2::T*Resp",
270             $val // '[undefined]'
271             ;
272             }
273             },
274             lazy => 1,
275             );
276              
277             has _operation_handle => (
278             is => 'rwp',
279             isa => sub {
280             my($val) = @_;
281             if (
282             defined $val
283             && ( !blessed( $val )
284             || !$val->isa('Thrift::API::HiveClient2::TOperationHandle')
285             )
286             ) {
287             die sprintf "Operation handle isn't a Thrift::API::HiveClient2::TOperationHandle",
288             $val // '[undefined]'
289             ;
290             }
291             },
292             lazy => 1,
293             );
294              
295             sub _cleanup_previous_operation {
296 0     0     my $self = shift;
297              
298             # We seeem to have some memory leaks in the Hive server, let's try freeing the
299             # operation handle explicitely
300 0 0         if ( $self->_operation_handle ) {
301 0           $self->_client->CloseOperation(
302             Thrift::API::HiveClient2::TCloseOperationReq->new(
303             { operationHandle => $self->_operation_handle, }
304             )
305             );
306 0           $self->_set__operation(undef);
307 0           $self->_set__operation_handle(undef);
308             }
309             }
310              
311             sub execute {
312 0     0 1   my $self = shift;
313 0           my ($query) = @_; # make this a bit more flexible
314              
315 0           $self->_cleanup_previous_operation;
316              
317 0           my $rh = $self->_client->ExecuteStatement(
318             Thrift::API::HiveClient2::TExecuteStatementReq->new(
319             { sessionHandle => $self->_session_handle, statement => $query, confOverlay => {} }
320             )
321             );
322 0 0         if ( $rh->{status}{errorCode} ) {
323 0           die __PACKAGE__ . "::execute: $rh->{status}{errorMessage}; HQL was: \"$query\"";
324             }
325 0           $self->_set__operation($rh);
326 0           $self->_set__operation_handle( $rh->{operationHandle} );
327 0           return $rh;
328             }
329              
330             {
331             # cache the column names we need to extract from the bloated data structure
332             # (keyed on query)
333             my ( $column_keys, $column_names );
334              
335             sub fetch_hashref {
336 0     0 1   my $self = shift;
337 0           my ( $rh, $rows_at_a_time ) = @_;
338 0           return $self->fetch( $rh, $rows_at_a_time, 1 );
339             }
340              
341             sub fetch {
342 0     0 1   my $self = shift;
343 0           my ( $rh, $rows_at_a_time, $use_hashref ) = @_;
344              
345             # if $rh looks like a number, use it instead of $rows_at_a_time
346             # it means we're using the new form for this call, which takes only the
347             # number of wanted rows, or even nothing (and relies on the defaults,
348             # and a cached copy of the query $rh)
349 0 0 0       $rows_at_a_time = $rh if ( $rh && !$rows_at_a_time && $rh =~ /^[1-9][0-9]*$/ );
      0        
350 0   0       $rows_at_a_time ||= 10_000;
351              
352 0           my $result = [];
353 0           my $has_more_rows;
354              
355             # NOTE we don't use the provided $rh any more, maybe we should leave
356             # that possibility open for parallel queries, but that would need a lot
357             # more testing. Patches welcome.
358 0           my $cached_rh = $self->_operation_handle;
359 0           $rh = $self->_client->FetchResults(
360             Thrift::API::HiveClient2::TFetchResultsReq->new(
361             { operationHandle => $cached_rh,
362             maxRows => $rows_at_a_time,
363             }
364             )
365             );
366 0 0         if ( ref $rh eq 'Thrift::API::HiveClient2::TFetchResultsResp' ) {
367              
368             # NOTE that currently (july 2013) the hasMoreRows method is broken,
369             # see the explanation in the POD
370 0           $has_more_rows = $rh->hasMoreRows();
371              
372 0 0         for my $row ( @{ $rh->{results}{rows} || [] } ) {
  0            
373              
374             # Find which fields to extract from each row, only on the first iteration
375 0 0         if ( !@{ $column_keys->{$cached_rh} || [] } ) {
  0 0          
376              
377             # metadata for the query
378 0 0         if ($use_hashref) {
379 0           my $rh_meta = $self->_client->GetResultSetMetadata(
380             Thrift::API::HiveClient2::TGetResultSetMetadataReq->new(
381             { operationHandle => $cached_rh }
382             )
383             );
384 0           $column_names = [ map { $_->{columnName} }
385 0 0         @{ $rh_meta->{schema}{columns} || [] } ];
  0            
386             }
387              
388             # TODO redo all this using the TGetResultSetMetadataResp object we retrieved
389             # above
390 0 0         for my $column ( @{ $row->{colVals} || [] } ) {
  0            
391              
392 0           my $first_col = {%$column};
393              
394             # Only 1 element of each TColumnValue is populated
395             # (although 7 keys are present, 1 for each possible data
396             # type) with a T*Value, and the rest is undef. Find out
397             # which is defined, and put the key (i.e. the data type) in
398             # cache, to reuse it to fetch the next rows faster.
399             # NOTE this data structure smells of Java and friends from
400             # miles away. Dynamically typed languages don't really need
401             # the bloat.
402 0           push @{ $column_keys->{$cached_rh} },
403 0           grep { ref $first_col->{$_} } keys %$first_col;
  0            
404             }
405             }
406              
407             # TODO find something faster? (see comment above)
408              
409 0           my $idx = 0;
410             my $retval = [
411 0           map { $_->value }
412 0           grep { defined $_ }
413 0           map { $row->{colVals}[ $idx++ ]{$_} } @{ $column_keys->{$cached_rh} }
  0            
  0            
414             ];
415 0 0         if ($use_hashref) {
416 0           push @$result, { zip @$column_names, @$retval };
417             }
418             else {
419 0           push @$result, $retval;
420             }
421             }
422             }
423 0 0         return wantarray ? ( $result, $has_more_rows ) : ( @$result ? $result : undef );
    0          
424             }
425             }
426              
427             sub get_columns {
428 0     0 1   my $self = shift;
429 0           my ( $table, $schema ) = @_;
430              
431             # note that not specifying a table name would return all columns for all
432             # tables we probably don't want that, but feel free to change this
433             # behaviour. Same goes for the schema name: we probably want a default
434             # value for the schema, which is what we use here.
435 0 0         die "Unspecified table name" if !$table;
436 0   0       $schema //= "default";
437              
438 0           $self->_cleanup_previous_operation;
439              
440 0           my $rh = $self->_client->GetColumns(
441             Thrift::API::HiveClient2::TGetColumnsReq->new(
442             { sessionHandle => $self->_session_handle,
443             catalogName => undef,
444             schemaName => $schema,
445             tableName => $table,
446             columnName => undef,
447             confOverlay => {}
448             }
449             )
450             );
451 0 0         if ( $rh->{status}{errorCode} ) {
452 0           die __PACKAGE__ . "::execute: $rh->{status}{errorMessage}";
453             }
454 0           $self->_set__operation($rh);
455 0           $self->_set__operation_handle( $rh->{operationHandle} );
456 0           my $columns;
457 0           while ( my $res = $self->fetch($rh) ) {
458 0           for my $line (@$res) {
459 0           my $idx = 0;
460 0           push @$columns, { map { $_ => $line->[ $idx++ ] } @odbc_coldesc_fields };
  0            
461             }
462             }
463 0           return $columns;
464             }
465              
466             sub get_tables {
467 0     0 1   my $self = shift;
468 0           my ( $schema, $table_pattern ) = @_;
469              
470             # note that not specifying a table name would return all columns for all
471             # tables we probably don't want that, but feel free to change this
472             # behaviour. Same goes for the schema name: we probably want a default
473             # value for the schema, which is what we use here.
474 0   0       $schema //= "default";
475              
476 0           $self->_cleanup_previous_operation;
477              
478 0           my $rh = $self->_client->GetTables(
479             Thrift::API::HiveClient2::TGetTablesReq->new(
480             { sessionHandle => $self->_session_handle,
481             catalogName => undef,
482             schemaName => $schema,
483             tableName => $table_pattern,
484             confOverlay => {},
485             }
486             )
487             );
488 0 0         if ( $rh->{status}{errorCode} ) {
489 0           die __PACKAGE__ . "::execute: $rh->{status}{errorMessage}";
490             }
491 0           $self->_set__operation($rh);
492 0           $self->_set__operation_handle( $rh->{operationHandle} );
493 0           my $tables;
494 0           while ( my $res = $self->fetch($rh) ) {
495 0           for my $line (@$res) {
496 0           my $idx = 0;
497 0           push @$tables, { map { $_ => $line->[ $idx++ ] } @tabledesc_fields };
  0            
498             }
499             }
500 0           return $tables;
501             }
502              
503             sub DEMOLISH {
504 0     0 0   my $self = shift;
505              
506 0           $self->_cleanup_previous_operation;
507              
508 0 0         if ( $self->_has_session_handle ) {
509 0           $self->_client->CloseSession(
510             Thrift::API::HiveClient2::TCloseSessionReq->new(
511             { sessionHandle => $self->_session_handle, }
512             )
513             );
514             }
515            
516 0 0         if ( $self->_transport ) {
517 0           $self->_transport->close;
518             }
519             }
520              
521             # when the user calls a method on an object of this class, see if that method
522             # exists on the TCLIService object. If so, create a sub that calls that method
523             # on the client object. If not, die horribly.
524             sub AUTOLOAD {
525 0     0     my ($self) = @_;
526 0           ( my $meth = our $AUTOLOAD ) =~ s/.*:://;
527 0 0         return if $meth eq 'DESTROY';
528 0           print STDERR "$meth\n";
529 3     3   27 no strict 'refs';
  3         11  
  3         607  
530 0 0         if ( $self->_client->can($meth) ) {
531 0     0     *$AUTOLOAD = sub { shift->_client->$meth(@_) };
  0            
532 0           goto &$AUTOLOAD;
533             }
534 0           croak "No such method exists: $AUTOLOAD";
535             }
536              
537             1;
538              
539             __END__