File Coverage

blib/lib/HBase/JSONRest/Scanner.pm
Criterion Covered Total %
statement 30 131 22.9
branch 1 50 2.0
condition 8 72 11.1
subroutine 6 11 54.5
pod 2 2 100.0
total 47 266 17.6


line stmt bran cond sub pod time code
1             package HBase::JSONRest::Scanner;
2              
3 1     1   458 use strict;
  1         2  
  1         24  
4 1     1   3 use warnings;
  1         2  
  1         20  
5              
6 1     1   3 use URI::Escape;
  1         2  
  1         49  
7 1     1   4 use Time::HiRes qw(time);
  1         2  
  1         6  
8 1     1   108 use Data::Dumper;
  1         2  
  1         865  
9              
# new
#
# Constructor. Takes a hashref of parameters:
#
#   hbase    - (required) a reference used as the HBase handle
#   table    - table name to scan
#   startrow - first row of the scan
#   endrow   - row at which the scan stops
#   prefix   - row-key prefix to scan
#   atatime  - number of rows to request per batch (defaults to 1)
#
# Dies with "HBase handle required!" when no handle reference is given.
# Returns the new scanner object, positioned before the first batch.
sub new {
    my ($class, $params) = @_;

    my $hbase = $params->{hbase};

    die "HBase handle required!"
        unless ($hbase and ref $hbase);

    my %state = (
        hbase    => $hbase,
        table    => $params->{table},
        startrow => $params->{startrow},
        endrow   => $params->{endrow},
        prefix   => $params->{prefix},

        # batch size; a missing or false "atatime" falls back to one row
        limit    => $params->{atatime} || 1,

        # iteration state
        last_key_from_previous_batch => undef,
        batch_no                     => 0,
        EOF                          => 0,
    );

    return bless \%state, $class;
}
44              
# get_next_batch
#
# Fetches the next batch of rows (at most $self->{limit} rows).
#
# Three ways of scanning are supported:
#
#   I.   Provide a prefix and scan all rows with that prefix.
#   II.  Provide startrow and endrow. Scan is inclusive for startrow and
#        exclusive for endrow.
#   III. Provide just startrow - scan until the end of the table, batch
#        by batch.
#
# All of these are converted to a startrow plus an end condition
# ($self->{end_condition_type}) under the hood. The difference is only in
# the user API.
#
# Returns:
#   - an arrayref of rows while there is data,
#   - an empty arrayref for the batch on which the end is detected with
#     nothing left to return,
#   - undef once the scan is finished (or when a prefix matches no row).
#
# Dies on an invalid combination of prefix/startrow/endrow and when the
# HBase handle reports an error ($hbase->{last_error}) after a scan.
#
# Side effects: updates batch_no, EOF, last_key_from_previous_batch,
# last_batch_time and (on the first batch) startrow / end_condition_type.
sub get_next_batch {
    my $self = shift;

    $self->{_last_batch_time_start} = time;

    my $table  = $self->{table};
    my $prefix = $self->{prefix};
    my $limit  = $self->{limit};
    my $hbase  = $self->{hbase};

    my $is_first_batch = ($self->{batch_no} == 0);
    my $rows;

    if ($is_first_batch) {
        my $has_prefix   = defined $prefix;
        my $has_startrow = $self->{startrow} ? 1 : 0;
        my $has_endrow   = $self->{endrow}   ? 1 : 0;

        # Forbidden: prefix together with startrow/endrow
        if ($has_prefix && ($has_startrow || $has_endrow)) {
            die "Can not use prefix and startrow/endrow at the same time!";
        }
        # Case I: prefix only
        elsif ($has_prefix) {
            my $first_row = $self->_get_first_row_of_prefix();

            # No rows for the specified prefix. Both an absent row and a
            # row without a key mean "nothing to scan"; testing them with
            # "and" let a keyless row through and started a scan with an
            # undefined start row.
            # (return undef is kept deliberately: it is the established
            # end-of-scan signal of this method.)
            return undef
                if (!$first_row || !defined $first_row->{row});

            $self->{startrow}           = $first_row->{row};
            $self->{end_condition_type} = 'PREFIX';
        }
        # Forbidden: neither prefix nor startrow
        elsif (!$has_startrow) {
            die "Must specify either prefix or startrow!";
        }
        # Case II: startrow and endrow, startrow is already in place
        elsif ($has_endrow) {
            $self->{end_condition_type} = 'ENDROW';
        }
        # Case III: only startrow, scan until the end of the table
        else {
            $self->{end_condition_type} = 'NONE';
        }

        # Scan for the first batch; startrow is inclusive here.
        $rows = $self->_scan_raw({
            table    => $table,
            startrow => $self->{startrow},
            limit    => $limit,
        });
    }
    else {
        # No more records: the last batch was empty or it was the last one.
        return undef
            if (!$self->{last_key_from_previous_batch} || $self->{EOF});

        my $resume_after = $self->{last_key_from_previous_batch};
        $self->{last_key_from_previous_batch} = undef;

        # Use the last row from the previous batch as start row for the
        # next scan, but exclude that row itself from the result.
        $rows = $self->_scan_raw({
            table                        => $table,
            startrow                     => $resume_after,
            exclude_startrow_from_result => 1,
            limit                        => $limit,
        });
    }

    $self->{last_batch_time} = time - $self->{_last_batch_time_start};
    $self->{batch_no}++;

    if ($hbase->{last_error}) {
        my $what = $is_first_batch
            ? "Scanner error while trying to get first batch!"
            : "Scanner error while trying to get next batch!";
        die $what . Dumper($hbase->{last_error});
    }

    if ($rows && @$rows) {
        # Drops rows past the end condition (in place) and may set EOF.
        $self->_filter_rows_beyond_last_key($rows);

        # Return what is left, if something is left after the filter.
        if (@$rows) {
            $self->{last_key_from_previous_batch} = $rows->[-1]->{row};
            return $rows;
        }
    }

    # Nothing (left) to return: this scan is over.
    $self->{last_key_from_previous_batch} = undef;
    $self->{EOF}                          = 1;
    return [];
}
196              
# _get_first_row_of_prefix
#
# Scans with the prefix as start row and a limit of 1, i.e. asks for the
# first row whose key is at or after the prefix.
#
# Returns the row (as delivered by _scan_raw) or undef when the scan
# yields no row.
#
# Dies when the HBase handle reports an error for the lookup, or when
# more than one row comes back despite the limit of 1.
sub _get_first_row_of_prefix {
    my $self = shift;

    my $hbase = $self->{hbase};

    my $rows = $self->_scan_raw({
        table    => $self->{table},
        startrow => $self->{prefix},
        limit    => 1,
    });

    # A failed lookup must not be mistaken for "prefix has no rows".
    die "Error while trying to get the first key of a prefix!"
        . Dumper($hbase->{last_error})
        if $hbase->{last_error};

    # Nothing came back (undef or an empty list): no row for this prefix.
    return undef
        unless ($rows && @$rows);

    die "Should be only one first row!"
        if (scalar @$rows > 1);

    return undef unless $rows->[0];

    return $rows->[0];
}
221              
# _scan_raw (uses passed parameters instead of instance parameters)
#
# Clears the handle's last_error, builds the scan URI from $params and
# returns whatever the handle's _get_tiny gives back for that URI.
sub _scan_raw {
    my ($self, $params) = @_;

    my $handle = $self->{hbase};

    # start from a clean slate so callers can check last_error afterwards
    $handle->{last_error} = undef;

    my $result = $handle->_get_tiny( _build_scan_uri($params) );

    return $result;
}
236              
# _build_scan_uri
#
# Builds the request path for a scan from a hashref of parameters:
#
#   table                        - table name (URI-escaped)
#   startrow                     - start row of the scan; optional,
#                                  defaults to the empty string
#   limit                        - number of rows to return; a missing or
#                                  false value becomes 1
#   exclude_startrow_from_result - when true, a NUL byte is appended to the
#                                  start row so the scan begins just after
#                                  it. Useful when scanning for the next
#                                  batch based on the last key of the
#                                  previous batch. False by default.
#
# Only startrow and limit are sent. endrow, columns, starttime, endtime
# and maxversions are not supported yet and are ignored if passed.
#
# Returns the URI string "/<table>/*?startrow=<startrow>&limit=<limit>".
sub _build_scan_uri {
    my $params = shift;

    my $table = $params->{table};
    my $limit = $params->{limit} || 1;

    # Test for definedness, not truth: "0" is a legitimate row key and must
    # not be turned into "scan from the beginning of the table".
    my $startrow = defined $params->{startrow} ? $params->{startrow} : "";

    my $escaped_startrow = uri_escape($startrow);
    $escaped_startrow .= uri_escape(chr(0))
        if $params->{exclude_startrow_from_result};

    return "/"
        . uri_escape($table)
        . "/"
        . '*?'
        . "startrow=" . $escaped_startrow
        . "&limit="   . $limit;
}
285              
# _filter_rows_beyond_last_key
#
# Removes, in place, the rows of a batch that lie past the scan's end
# condition ($self->{end_condition_type}):
#
#   PREFIX - keep only rows whose key starts with $self->{prefix}
#   ENDROW - keep only rows whose key sorts before $self->{endrow}
#   NONE   - keep everything
#
# When rows had to be dropped the scan is over: EOF is set, and if rows
# remain, last_key_from_previous_batch is set to the last remaining key.
#
# Only the last row decides whether filtering is needed, so this relies on
# the batch being in ascending key order.
#
# Dies with 'Unknown end_condition_type!' for any other condition type.
# Returns nothing.
sub _filter_rows_beyond_last_key {
    my $self = shift;
    my $rows = shift;

    my $type = $self->{end_condition_type};
    die 'Unknown end_condition_type!' unless defined $type;

    my $is_within_scan;
    if ($type eq 'PREFIX') {
        # A real prefix test: comparing against prefix . chr(255) wrongly
        # rejected keys such as prefix . "\xff" . "more".
        my $prefix = $self->{prefix};
        $is_within_scan = sub { index($_[0], $prefix) == 0 };
    }
    elsif ($type eq 'ENDROW') {
        # endrow itself is exclusive
        my $endrow = $self->{endrow};
        $is_within_scan = sub { $_[0] lt $endrow };
    }
    elsif ($type eq 'NONE') {
        return;
    }
    else {
        die 'Unknown end_condition_type!';
    }

    # Nothing to filter; also avoids indexing [-1] on an empty batch.
    return unless ($rows && @$rows);

    # Last row still inside the scan: the whole batch is.
    return if $is_within_scan->($rows->[-1]->{row});

    # Drop the surplus rows and mark the end of the scan.
    @$rows = grep { $is_within_scan->($_->{row}) } @$rows;
    $self->{EOF} = 1;

    if (@$rows) {
        $self->{last_key_from_previous_batch} = $rows->[-1]->{row};
    }

    return;
}
326              
327             1;
328              
329             __END__