File Coverage

blib/lib/Avro/DataFileReader.pm
Criterion Covered Total %
statement 147 170 86.4
branch 42 64 65.6
condition 11 18 61.1
subroutine 29 31 93.5
pod 0 17 0.0
total 229 300 76.3


line stmt bran cond sub pod time code
1             # Licensed to the Apache Software Foundation (ASF) under one
2             # or more contributor license agreements. See the NOTICE file
3             # distributed with this work for additional information
4             # regarding copyright ownership. The ASF licenses this file
5             # to you under the Apache License, Version 2.0 (the
6             # "License"); you may not use this file except in compliance
7             # with the License. You may obtain a copy of the License at
8             #
9             # https://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             use strict;
19 1     1   650 use warnings;
  1         4  
  1         22  
20 1     1   4  
  1         2  
  1         28  
21             use Object::Tiny qw{
22 1         5 fh
23             reader_schema
24             sync_marker
25             block_max_size
26             };
27 1     1   415  
  1         270  
28             use constant MARKER_SIZE => 16;
29 1     1   209  
  1         2  
  1         56  
30             # TODO: refuse to read a block more than block_max_size, instead
31             # do partial reads
32              
33             use Avro::DataFile;
34 1     1   5 use Avro::BinaryDecoder;
  1         1  
  1         36  
35 1     1   5 use Avro::Schema;
  1         2  
  1         17  
36 1     1   4 use Carp;
  1         2  
  1         15  
37 1     1   4 use Compress::Zstd;
  1         2  
  1         44  
38 1     1   416 use IO::Uncompress::Bunzip2 qw(bunzip2);
  1         676  
  1         57  
39 1     1   467 use IO::Uncompress::RawInflate ;
  1         21372  
  1         50  
40 1     1   469 use Fcntl();
  1         7315  
  1         40  
41 1     1   6  
  1         2  
  1         1201  
42             our $VERSION = '1.11.1';
43              
44             my $class = shift;
45             my $datafile = $class->SUPER::new(@_);
46 6     6 0 834  
47 6         22 my $schema = $datafile->{reader_schema};
48             croak "schema is invalid"
49 6         42 if $schema && ! eval { $schema->isa("Avro::Schema") };
50              
51 6 50 66     15 return $datafile;
  5         55  
52             }
53 6         39  
54             my $datafile = shift;
55             return $datafile->metadata->{'avro.codec'};
56             }
57 6     6 0 7  
58 6         10 my $datafile = shift;
59             unless (exists $datafile->{_writer_schema}) {
60             my $json_schema = $datafile->metadata->{'avro.schema'};
61             $datafile->{_writer_schema} = Avro::Schema->parse($json_schema);
62 7     7 0 9 }
63 7 100       17 return $datafile->{_writer_schema};
64 5         9 }
65 5         21  
66             my $datafile = shift;
67 7         15 unless (exists $datafile->{_metadata}) {
68             my $header = $datafile->header;
69             $datafile->{_metadata} = $header->{meta} || {};
70             }
71 19     19 0 150 return $datafile->{_metadata};
72 19 100       35 }
73 6         13  
74 5   50     37 my $datafile = shift;
75             unless (exists $datafile->{_header}) {
76 18         69 $datafile->{_header} = $datafile->read_file_header;
77             }
78              
79             return $datafile->{_header};
80 6     6 0 6 }
81 6 50       12  
82 6         9 my $datafile = shift;
83              
84             my $data = Avro::BinaryDecoder->decode(
85 5         8 reader_schema => $Avro::DataFile::HEADER_SCHEMA,
86             writer_schema => $Avro::DataFile::HEADER_SCHEMA,
87             reader => $datafile->{fh},
88             );
89 6     6 0 6 croak "Magic '$data->{magic}' doesn't match"
90             unless $data->{magic} eq Avro::DataFile->AVRO_MAGIC;
91              
92             $datafile->{sync_marker} = $data->{sync}
93             or croak "sync marker appears invalid";
94              
95 6         32 my $codec = $data->{meta}{'avro.codec'} || "";
96              
97 6 50       21 throw Avro::DataFile::Error::UnsupportedCodec($codec)
98             unless Avro::DataFile->is_codec_valid($codec);
99              
100 6 50       15 return $data;
101             }
102 6   50     14  
103             my $datafile = shift;
104 6 100       14  
105             my @objs;
106             my @block_objs;
107 5         17 do {
108             if ($datafile->eof) {
109             @block_objs = ();
110             }
111 5     5 0 39 else {
112             $datafile->read_block_header if $datafile->eob;
113 5         8 @block_objs = $datafile->read_to_block_end;
114             push @objs, @block_objs;
115 5         6 }
116 9 100       15  
117 4         54 } until !@block_objs;
118              
119             return @objs
120 5 50       8 }
121 4         8  
122 4         30 my $datafile = shift;
123             my $count = shift;
124              
125             my @objs;
126              
127             return () if $datafile->eof;
128 4         14 $datafile->read_block_header if $datafile->eob;
129              
130             my $block_count = $datafile->{object_count};
131 4     4 0 17  
132 4         7 if ($block_count < $count) {
133             push @objs, $datafile->read_to_block_end;
134 4         6 croak "Didn't read as many objects than expected"
135             unless scalar @objs == $block_count;
136 4 100       11  
137 3 100       46 push @objs, $datafile->next($count - $block_count);
138             }
139 3         6 else {
140             push @objs, $datafile->read_within_block($count);
141 3 50       10 }
142 0         0 return @objs;
143 0 0       0 }
144              
145             my $datafile = shift;
146 0         0 my $count = shift;
147              
148             my $reader = $datafile->reader;
149 3         8 my $writer_schema = $datafile->writer_schema;
150             my $reader_schema = $datafile->reader_schema || $writer_schema;
151 3         157 my @objs;
152             while ($count-- > 0 && $datafile->{object_count} > 0) {
153             push @objs, Avro::BinaryDecoder->decode(
154             writer_schema => $writer_schema,
155 7     7 0 10 reader_schema => $reader_schema,
156 7         10 reader => $reader,
157             );
158 7         14 $datafile->{object_count}--;
159 7         35 }
160 7   66     187 return @objs;
161 7         43 }
162 7   66     36  
163 104         231 my $datafile = shift;
164             my $count = shift;
165              
166             my $block_count = $datafile->{object_count};
167             if ($block_count <= $count) {
168 104         402 $datafile->skip_to_block_end
169             or croak "Cannot skip to end of block!";
170 7         58 $datafile->skip($count - $block_count);
171             }
172             else {
173             my $writer_schema = $datafile->writer_schema;
174 0     0 0 0 ## could probably be optimized
175 0         0 while ($count--) {
176             Avro::BinaryDecoder->skip($writer_schema, $datafile->reader);
177 0         0 $datafile->{object_count}--;
178 0 0       0 }
179 0 0       0 }
180             }
181 0         0  
182             my $datafile = shift;
183             my $fh = $datafile->{fh};
184 0         0 my $codec = $datafile->codec;
185              
186 0         0 $datafile->header unless $datafile->{_header};
187 0         0  
188 0         0 $datafile->{object_count} = Avro::BinaryDecoder->decode_long(
189             undef, undef, $fh,
190             );
191             $datafile->{block_size} = Avro::BinaryDecoder->decode_long(
192             undef, undef, $fh,
193             );
194 6     6 0 8 $datafile->{block_start} = tell $fh;
195 6         8  
196 6         11 return if $codec eq 'null';
197              
198 5 50       12 ## we need to read the entire block into memory, to inflate it
199             my $nread = read $fh, my $block, $datafile->{block_size} + MARKER_SIZE
200 5         12 or croak "Error reading from file: $!";
201              
202             ## remove the marker
203 5         11 my $marker = substr $block, -(MARKER_SIZE), MARKER_SIZE, '';
204             $datafile->{block_marker} = $marker;
205              
206 5         31 ## this is our new reader
207             $datafile->{reader} = do {
208 5 100       14 if ($codec eq 'deflate') {
209             IO::Uncompress::RawInflate->new(\$block);
210             }
211 4 50       16 elsif ($codec eq 'bzip2') {
212             my $uncompressed;
213             bunzip2 \$block => \$uncompressed;
214             do { open $fh, '<', \$uncompressed; $fh };
215 4         12 }
216 4         6 elsif ($codec eq 'zstandard') {
217             do { open $fh, '<', \(decompress(\$block)); $fh };
218             }
219 4         5 };
220 4 100       18  
    100          
    50          
221 1         5 return;
222             }
223              
224 2         3 my $datafile = shift;
225 2         13  
226 1     1   9 my $marker = $datafile->{block_marker};
  1         2  
  1         7  
  2         2254  
  2         66  
  2         714  
227             unless (defined $marker) {
228             ## we are in the fh case
229 1         2 read $datafile->{fh}, $marker, MARKER_SIZE;
  1         52  
  1         6  
230             }
231              
232             unless (($marker || "") eq $datafile->sync_marker) {
233 4         1117 croak "Oops synchronization issue (marker mismatch)";
234             }
235             return;
236             }
237 4     4 0 6  
238             my $datafile = shift;
239 4         4  
240 4 100       8 if (my $reader = $datafile->{reader}) {
241             seek $reader, 0, Fcntl->SEEK_END;
242 1         3 return;
243             }
244              
245 4 50 50     82 my $remaining_size = $datafile->{block_size}
246 0         0 + $datafile->{block_start}
247             - tell $datafile->{fh};
248 4         26  
249             seek $datafile->{fh}, $remaining_size, 0;
250             $datafile->verify_marker; ## will do a read
251             return 1;
252 0     0 0 0 }
253              
254 0 0       0 my $datafile = shift;
255 0         0  
256 0         0 my $reader = $datafile->reader;
257             my @objs = $datafile->read_within_block( $datafile->{object_count} );
258             $datafile->verify_marker;
259             return @objs;
260             }
261 0         0  
262             my $datafile = shift;
263 0         0 return $datafile->{reader} || $datafile->{fh};
264 0         0 }
265 0         0  
266             ## end of block
267             my $datafile = shift;
268              
269 4     4 0 5 return 1 if $datafile->eof;
270              
271 4         8 if ($datafile->{reader}) {
272 4         20 return 1 if $datafile->{reader}->eof;
273 4         10 }
274 4         6 else {
275             my $pos = tell $datafile->{fh};
276             return 1 unless $datafile->{block_start};
277             return 1 if $pos >= $datafile->{block_start} + $datafile->{block_size};
278 11     11 0 13 }
279 11   66     31 return 0;
280             }
281              
282             my $datafile = shift;
283             if ($datafile->{reader}) {
284 11     11 0 4704 return 0 unless $datafile->{reader}->eof;
285             }
286 11 100       21 return 1 if $datafile->{fh}->eof;
287             return 0;
288 10 100       46 }
289 4 50       21  
290             use parent 'Error::Simple';
291              
292 6         9 1;