File Coverage

blib/lib/Avro/DataFileReader.pm
Criterion Covered Total %
statement 147 170 86.4
branch 42 64 65.6
condition 11 18 61.1
subroutine 29 31 93.5
pod 0 17 0.0
total 229 300 76.3


line stmt bran cond sub pod time code
1             # Licensed to the Apache Software Foundation (ASF) under one
2             # or more contributor license agreements. See the NOTICE file
3             # distributed with this work for additional information
4             # regarding copyright ownership. The ASF licenses this file
5             # to you under the Apache License, Version 2.0 (the
6             # "License"); you may not use this file except in compliance
7             # with the License. You may obtain a copy of the License at
8             #
9             # https://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             package Avro::DataFileReader;
19 1     1   722 use strict;
  1         2  
  1         37  
20 1     1   5 use warnings;
  1         2  
  1         26  
21              
22 1         6 use Object::Tiny qw{
23             fh
24             reader_schema
25             sync_marker
26             block_max_size
27 1     1   421 };
  1         284  
28              
29 1     1   204 use constant MARKER_SIZE => 16;
  1         2  
  1         57  
30              
31             # TODO: refuse to read a block more than block_max_size, instead
32             # do partial reads
33              
34 1     1   6 use Avro::DataFile;
  1         2  
  1         19  
35 1     1   5 use Avro::BinaryDecoder;
  1         2  
  1         15  
36 1     1   4 use Avro::Schema;
  1         1  
  1         16  
37 1     1   4 use Carp;
  1         2  
  1         47  
38 1     1   419 use Compress::Zstd;
  1         699  
  1         59  
39 1     1   1706 use IO::Uncompress::Bunzip2 qw(bunzip2);
  1         24445  
  1         60  
40 1     1   494 use IO::Uncompress::RawInflate ;
  1         7743  
  1         51  
41 1     1   8 use Fcntl();
  1         3  
  1         1274  
42              
43             our $VERSION = '1.11.3';
44              
45             sub new {
46 6     6 0 794 my $class = shift;
47 6         43 my $datafile = $class->SUPER::new(@_);
48              
49 6         44 my $schema = $datafile->{reader_schema};
50             croak "schema is invalid"
51 6 50 66     18 if $schema && ! eval { $schema->isa("Avro::Schema") };
  5         34  
52              
53 6         39 return $datafile;
54             }
55              
56             sub codec {
57 6     6 0 9 my $datafile = shift;
58 6         13 return $datafile->metadata->{'avro.codec'};
59             }
60              
61             sub writer_schema {
62 7     7 0 9 my $datafile = shift;
63 7 100       15 unless (exists $datafile->{_writer_schema}) {
64 5         8 my $json_schema = $datafile->metadata->{'avro.schema'};
65 5         21 $datafile->{_writer_schema} = Avro::Schema->parse($json_schema);
66             }
67 7         12 return $datafile->{_writer_schema};
68             }
69              
70             sub metadata {
71 19     19 0 168 my $datafile = shift;
72 19 100       42 unless (exists $datafile->{_metadata}) {
73 6         12 my $header = $datafile->header;
74 5   50     13 $datafile->{_metadata} = $header->{meta} || {};
75             }
76 18         57 return $datafile->{_metadata};
77             }
78              
79             sub header {
80 6     6 0 10 my $datafile = shift;
81 6 50       11 unless (exists $datafile->{_header}) {
82 6         14 $datafile->{_header} = $datafile->read_file_header;
83             }
84              
85 5         9 return $datafile->{_header};
86             }
87              
88             sub read_file_header {
89 6     6 0 7 my $datafile = shift;
90              
91             my $data = Avro::BinaryDecoder->decode(
92             reader_schema => $Avro::DataFile::HEADER_SCHEMA,
93             writer_schema => $Avro::DataFile::HEADER_SCHEMA,
94             reader => $datafile->{fh},
95 6         33 );
96             croak "Magic '$data->{magic}' doesn't match"
97 6 50       20 unless $data->{magic} eq Avro::DataFile->AVRO_MAGIC;
98              
99             $datafile->{sync_marker} = $data->{sync}
100 6 50       20 or croak "sync marker appears invalid";
101              
102 6   50     14 my $codec = $data->{meta}{'avro.codec'} || "";
103              
104 6 100       14 throw Avro::DataFile::Error::UnsupportedCodec($codec)
105             unless Avro::DataFile->is_codec_valid($codec);
106              
107 5         12 return $data;
108             }
109              
110             sub all {
111 5     5 0 41 my $datafile = shift;
112              
113 5         7 my @objs;
114             my @block_objs;
115 5         7 do {
116 9 100       21 if ($datafile->eof) {
117 4         57 @block_objs = ();
118             }
119             else {
120 5 50       11 $datafile->read_block_header if $datafile->eob;
121 4         13 @block_objs = $datafile->read_to_block_end;
122 4         19 push @objs, @block_objs;
123             }
124              
125             } until !@block_objs;
126              
127             return @objs
128 4         18 }
129              
130             sub next {
131 4     4 0 14 my $datafile = shift;
132 4         6 my $count = shift;
133              
134 4         7 my @objs;
135              
136 4 100       11 return () if $datafile->eof;
137 3 100       47 $datafile->read_block_header if $datafile->eob;
138              
139 3         8 my $block_count = $datafile->{object_count};
140              
141 3 50       7 if ($block_count < $count) {
142 0         0 push @objs, $datafile->read_to_block_end;
143 0 0       0 croak "Didn't read as many objects than expected"
144             unless scalar @objs == $block_count;
145              
146 0         0 push @objs, $datafile->next($count - $block_count);
147             }
148             else {
149 3         12 push @objs, $datafile->read_within_block($count);
150             }
151 3         132 return @objs;
152             }
153              
154             sub read_within_block {
155 7     7 0 12 my $datafile = shift;
156 7         11 my $count = shift;
157              
158 7         13 my $reader = $datafile->reader;
159 7         40 my $writer_schema = $datafile->writer_schema;
160 7   66     151 my $reader_schema = $datafile->reader_schema || $writer_schema;
161 7         39 my @objs;
162 7   66     33 while ($count-- > 0 && $datafile->{object_count} > 0) {
163 104         197 push @objs, Avro::BinaryDecoder->decode(
164             writer_schema => $writer_schema,
165             reader_schema => $reader_schema,
166             reader => $reader,
167             );
168 104         361 $datafile->{object_count}--;
169             }
170 7         44 return @objs;
171             }
172              
173             sub skip {
174 0     0 0 0 my $datafile = shift;
175 0         0 my $count = shift;
176              
177 0         0 my $block_count = $datafile->{object_count};
178 0 0       0 if ($block_count <= $count) {
179 0 0       0 $datafile->skip_to_block_end
180             or croak "Cannot skip to end of block!";
181 0         0 $datafile->skip($count - $block_count);
182             }
183             else {
184 0         0 my $writer_schema = $datafile->writer_schema;
185             ## could probably be optimized
186 0         0 while ($count--) {
187 0         0 Avro::BinaryDecoder->skip($writer_schema, $datafile->reader);
188 0         0 $datafile->{object_count}--;
189             }
190             }
191             }
192              
193             sub read_block_header {
194 6     6 0 9 my $datafile = shift;
195 6         7 my $fh = $datafile->{fh};
196 6         15 my $codec = $datafile->codec;
197              
198 5 50       12 $datafile->header unless $datafile->{_header};
199              
200 5         15 $datafile->{object_count} = Avro::BinaryDecoder->decode_long(
201             undef, undef, $fh,
202             );
203 5         14 $datafile->{block_size} = Avro::BinaryDecoder->decode_long(
204             undef, undef, $fh,
205             );
206 5         28 $datafile->{block_start} = tell $fh;
207              
208 5 100       11 return if $codec eq 'null';
209              
210             ## we need to read the entire block into memory, to inflate it
211 4 50       17 my $nread = read $fh, my $block, $datafile->{block_size} + MARKER_SIZE
212             or croak "Error reading from file: $!";
213              
214             ## remove the marker
215 4         12 my $marker = substr $block, -(MARKER_SIZE), MARKER_SIZE, '';
216 4         8 $datafile->{block_marker} = $marker;
217              
218             ## this is our new reader
219 4         6 $datafile->{reader} = do {
220 4 100       14 if ($codec eq 'deflate') {
    100          
    50          
221 1         7 IO::Uncompress::RawInflate->new(\$block);
222             }
223             elsif ($codec eq 'bzip2') {
224 2         2 my $uncompressed;
225 2         13 bunzip2 \$block => \$uncompressed;
226 1     1   7 do { open $fh, '<', \$uncompressed; $fh };
  1         2  
  1         6  
  2         2265  
  2         80  
  2         830  
227             }
228             elsif ($codec eq 'zstandard') {
229 1         3 do { open $fh, '<', \(decompress(\$block)); $fh };
  1         41  
  1         6  
230             }
231             };
232              
233 4         1593 return;
234             }
235              
236             sub verify_marker {
237 4     4 0 6 my $datafile = shift;
238              
239 4         7 my $marker = $datafile->{block_marker};
240 4 100       8 unless (defined $marker) {
241             ## we are in the fh case
242 1         4 read $datafile->{fh}, $marker, MARKER_SIZE;
243             }
244              
245 4 50 50     82 unless (($marker || "") eq $datafile->sync_marker) {
246 0         0 croak "Oops synchronization issue (marker mismatch)";
247             }
248 4         36 return;
249             }
250              
251             sub skip_to_block_end {
252 0     0 0 0 my $datafile = shift;
253              
254 0 0       0 if (my $reader = $datafile->{reader}) {
255 0         0 seek $reader, 0, Fcntl->SEEK_END;
256 0         0 return;
257             }
258              
259             my $remaining_size = $datafile->{block_size}
260             + $datafile->{block_start}
261 0         0 - tell $datafile->{fh};
262              
263 0         0 seek $datafile->{fh}, $remaining_size, 0;
264 0         0 $datafile->verify_marker; ## will do a read
265 0         0 return 1;
266             }
267              
268             sub read_to_block_end {
269 4     4 0 6 my $datafile = shift;
270              
271 4         10 my $reader = $datafile->reader;
272 4         21 my @objs = $datafile->read_within_block( $datafile->{object_count} );
273 4         10 $datafile->verify_marker;
274 4         8 return @objs;
275             }
276              
277             sub reader {
278 11     11 0 14 my $datafile = shift;
279 11   66     33 return $datafile->{reader} || $datafile->{fh};
280             }
281              
282             ## end of block
283             sub eob {
284 11     11 0 5244 my $datafile = shift;
285              
286 11 100       20 return 1 if $datafile->eof;
287              
288 10 100       46 if ($datafile->{reader}) {
289 4 50       20 return 1 if $datafile->{reader}->eof;
290             }
291             else {
292 6         12 my $pos = tell $datafile->{fh};
293 6 50       32 return 1 unless $datafile->{block_start};
294 0 0       0 return 1 if $pos >= $datafile->{block_start} + $datafile->{block_size};
295             }
296 4         27 return 0;
297             }
298              
299             sub eof {
300 27     27 0 38 my $datafile = shift;
301 27 100       67 if ($datafile->{reader}) {
302 14 100       97 return 0 unless $datafile->{reader}->eof;
303             }
304 19 100       92 return 1 if $datafile->{fh}->eof;
305 12         101 return 0;
306             }
307              
308             package Avro::DataFile::Error::UnsupportedCodec;
309 1     1   8 use parent 'Error::Simple';
  1         9  
  1         8  
310              
311             1;