File Coverage

blib/lib/Avro/BinaryDecoder.pm
Criterion Covered Total %
statement 128 184 69.5
branch 13 22 59.0
condition 1 3 33.3
subroutine 23 40 57.5
pod 1 32 3.1
total 166 281 59.0


line stmt bran cond sub pod time code
1             # Licensed to the Apache Software Foundation (ASF) under one
2             # or more contributor license agreements. See the NOTICE file
3             # distributed with this work for additional information
4             # regarding copyright ownership. The ASF licenses this file
5             # to you under the Apache License, Version 2.0 (the
6             # "License"); you may not use this file except in compliance
7             # with the License. You may obtain a copy of the License at
8             #
9             # https://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             package Avro::BinaryDecoder;
19 2     2   1092 use strict;
  2         4  
  2         48  
20 2     2   8 use warnings;
  2         4  
  2         38  
21              
22 2     2   8 use Config;
  2         3  
  2         54  
23 2     2   7 use Encode();
  2         4  
  2         22  
24 2     2   6 use Error::Simple;
  2         3  
  2         13  
25 2     2   88 use Avro::Schema;
  2         3  
  2         3570  
26              
27             our $VERSION = '1.11.3';
28              
29             our $complement = ~0x7F;
30             unless ($Config{use64bitint}) {
31             require Math::BigInt;
32             $complement = Math::BigInt->new("0b" . ("1" x 57) . ("0" x 7));
33             }
34              
35             =head2 decode(%param)
36              
37             Resolve the given writer and reader_schema to decode the data provided by the
38             reader.
39              
40             =over 4
41              
42             =item * writer_schema
43              
44             The schema that was used to encode the data provided by the C<reader>
45              
46             =item * reader_schema
47              
48             The schema we want to use to decode the data.
49              
50             =item * reader
51              
52             An object implementing a straightforward interface. C<read($buf, $nbytes)> and
53             C<seek($nbytes, $whence)> are expected. Typically a IO::String object or a
54             IO::File object. It is expected that this calls will block the decoder, if not
55             enough data is available for read.
56              
57             =back
58              
59             =cut
60             sub decode {
61 1332     1332 1 2975 my $class = shift;
62 1332         2688 my %param = @_;
63              
64             my ($writer_schema, $reader_schema, $reader)
65 1332         2170 = @param{qw/writer_schema reader_schema reader/};
66              
67 1332 100       2219 my $type = Avro::Schema->match(
68             writer => $writer_schema,
69             reader => $reader_schema,
70             ) or throw Avro::Schema::Error::Mismatch;
71              
72 1331         2086 my $meth = "decode_$type";
73 1331         2312 return $class->$meth($writer_schema, $reader_schema, $reader);
74             }
75              
76             sub skip {
77 2     2 0 2 my $class = shift;
78 2         4 my ($schema, $reader) = @_;
79 2 50       5 my $type = ref $schema ? $schema->type : $schema;
80 2         3 my $meth = "skip_$type";
81 2         5 return $class->$meth($schema, $reader);
82             }
83              
84 1     1 0 10 sub decode_null { undef }
85              
86 0     0 0 0 sub skip_boolean { &decode_boolean }
87             sub decode_boolean {
88 0     0 0 0 my $class = shift;
89 0         0 my $reader = pop;
90 0         0 $reader->read(my $bool, 1);
91 0         0 return unpack 'C', $bool;
92             }
93              
94 0     0 0 0 sub skip_int { &decode_int }
95             sub decode_int {
96 2084     2084 0 2172 my $class = shift;
97 2084         2094 my $reader = pop;
98 2084         2754 return zigzag(unsigned_varint($reader));
99             }
100              
101 0     0 0 0 sub skip_long { &decode_long };
102             sub decode_long {
103 2075     2075 0 2305 my $class = shift;
104 2075         2533 return decode_int($class, @_);
105             }
106              
107 0     0 0 0 sub skip_float { &decode_float }
108             sub decode_float {
109 0     0 0 0 my $class = shift;
110 0         0 my $reader = pop;
111 0         0 $reader->read(my $buf, 4);
112 0         0 return unpack "f<", $buf;
113             }
114              
115 0     0 0 0 sub skip_double { &decode_double }
116             sub decode_double {
117 0     0 0 0 my $class = shift;
118 0         0 my $reader = pop;
119 0         0 $reader->read(my $buf, 8);
120 0         0 return unpack "d<", $buf,
121             }
122              
123             sub skip_bytes {
124 2     2 0 2 my $class = shift;
125 2         3 my $reader = pop;
126 2         2 my $size = decode_long($class, undef, undef, $reader);
127 2         7 $reader->seek($size, 0);
128 2         39 return;
129             }
130              
131             sub decode_bytes {
132 1189     1189 0 1514 my $class = shift;
133 1189         1199 my $reader = pop;
134 1189         1787 my $size = decode_long($class, undef, undef, $reader);
135 1189         2382 $reader->read(my $buf, $size);
136 1189         5162 return $buf;
137             }
138              
139 2     2 0 3 sub skip_string { &skip_bytes }
140             sub decode_string {
141 1172     1172 0 1324 my $class = shift;
142 1172         1259 my $reader = pop;
143 1172         1576 my $bytes = decode_bytes($class, undef, undef, $reader);
144 1172         1798 return Encode::decode_utf8($bytes);
145             }
146              
147             sub skip_record {
148 0     0 0 0 my $class = shift;
149 0         0 my ($schema, $reader) = @_;
150 0         0 for my $field (@{ $schema->fields }){
  0         0  
151 0         0 skip($class, $field->{type}, $reader);
152             }
153             }
154              
155             ## 1.3.2 A record is encoded by encoding the values of its fields in the order
156             ## that they are declared. In other words, a record is encoded as just the
157             ## concatenation of the encodings of its fields. Field values are encoded per
158             ## their schema.
159             sub decode_record {
160 9     9 0 12 my $class = shift;
161 9         16 my ($writer_schema, $reader_schema, $reader) = @_;
162 9         10 my $record;
163              
164 9         13 my %extra_fields = %{ $reader_schema->fields_as_hash };
  9         22  
165 9         15 for my $field (@{ $writer_schema->fields }) {
  9         15  
166 24         38 my $name = $field->{name};
167 24         34 my $w_field_schema = $field->{type};
168 24         41 my $r_field_schema = delete $extra_fields{$name};
169              
170             ## 1.3.2 if the writer's record contains a field with a name not
171             ## present in the reader's record, the writer's value for that field
172             ## is ignored.
173 24 100       69 if (! $r_field_schema) {
174 2         5 $class->skip($w_field_schema, $reader);
175 2         3 next;
176             }
177             my $data = $class->decode(
178             writer_schema => $w_field_schema,
179             reader_schema => $r_field_schema->{type},
180 22         49 reader => $reader,
181             );
182 22         68 $record->{ $name } = $data;
183             }
184              
185 9         22 for my $name (keys %extra_fields) {
186             ## 1.3.2. if the reader's record schema has a field with no default
187             ## value, and writer's schema does not have a field with the same
188             ## name, an error is signalled.
189 2 100       4 unless (exists $extra_fields{$name}->{default}) {
190 1         5 throw Avro::Schema::Error::Mismatch(
191             "cannot resolve without default"
192             );
193             }
194             ## 1.3.2 ... else the default value is used
195 1         2 $record->{ $name } = $extra_fields{$name}->{default};
196             }
197 8         38 return $record;
198             }
199              
200 0     0 0 0 sub skip_enum { &skip_int }
201              
202             ## 1.3.2 An enum is encoded by a int, representing the zero-based position of
203             ## the symbol in the schema.
204             sub decode_enum {
205 5     5 0 6 my $class = shift;
206 5         8 my ($writer_schema, $reader_schema, $reader) = @_;
207 5         9 my $index = decode_int($class, @_);
208              
209 5         11 my $w_data = $writer_schema->symbols->[$index];
210             ## 1.3.2 if the writer's symbol is not present in the reader's enum,
211             ## then an error is signalled.
212 5 100       10 throw Avro::Schema::Error::Mismatch("enum unknown")
213             unless $reader_schema->is_data_valid($w_data);
214 3         9 return $w_data;
215             }
216              
217             sub skip_block {
218 0     0 0 0 my $class = shift;
219 0         0 my ($reader, $block_content) = @_;
220 0         0 my $block_count = decode_long($class, undef, undef, $reader);
221 0         0 while ($block_count) {
222 0 0       0 if ($block_count < 0) {
223 0         0 $reader->seek($block_count, 0);
224 0         0 next;
225             }
226             else {
227 0         0 for (1..$block_count) {
228 0         0 $block_content->();
229             }
230             }
231 0         0 $block_count = decode_long($class, undef, undef, $reader);
232             }
233             }
234              
235             sub skip_array {
236 0     0 0 0 my $class = shift;
237 0         0 my ($schema, $reader) = @_;
238 0     0   0 skip_block($reader, sub { $class->skip($schema->items, $reader) });
  0         0  
239             }
240              
241             ## 1.3.2 Arrays are encoded as a series of blocks. Each block consists of a
242             ## long count value, followed by that many array items. A block with count zero
243             ## indicates the end of the array. Each item is encoded per the array's item
244             ## schema.
245             ## If a block's count is negative, its absolute value is used, and the count is
246             ## followed immediately by a long block size
247             sub decode_array {
248 318     318 0 374 my $class = shift;
249 318         458 my ($writer_schema, $reader_schema, $reader) = @_;
250 318         464 my $block_count = decode_long($class, @_);
251 318         354 my @array;
252 318         508 my $writer_items = $writer_schema->items;
253 318         505 my $reader_items = $reader_schema->items;
254 318         532 while ($block_count) {
255 318         341 my $block_size;
256 318 50       476 if ($block_count < 0) {
257 0         0 $block_count = -$block_count;
258 0         0 $block_size = decode_long($class, @_);
259             ## XXX we can skip with $reader_schema?
260             }
261 318         524 for (1..$block_count) {
262 848         7691 push @array, $class->decode(
263             writer_schema => $writer_items,
264             reader_schema => $reader_items,
265             reader => $reader,
266             );
267             }
268 318         4330 $block_count = decode_long($class, @_);
269             }
270 318         1051 return \@array;
271             }
272              
273             sub skip_map {
274 0     0 0 0 my $class = shift;
275 0         0 my ($schema, $reader) = @_;
276             skip_block($reader, sub {
277 0     0   0 skip_string($class, $reader);
278 0         0 $class->skip($schema->values, $reader);
279 0         0 });
280             }
281              
282             ## 1.3.2 Maps are encoded as a series of blocks. Each block consists of a long
283             ## count value, followed by that many key/value pairs. A block with count zero
284             ## indicates the end of the map. Each item is encoded per the map's value
285             ## schema.
286             ##
287             ## If a block's count is negative, its absolute value is used, and the count is
288             ## followed immediately by a long block size indicating the number of bytes in
289             ## the block. This block size permits fast skipping through data, e.g., when
290             ## projecting a record to a subset of its fields.
291             sub decode_map {
292 112     112 0 136 my $class = shift;
293 112         173 my ($writer_schema, $reader_schema, $reader) = @_;
294 112         128 my %hash;
295              
296 112         170 my $block_count = decode_long($class, @_);
297 112         183 my $writer_values = $writer_schema->values;
298 112         168 my $reader_values = $reader_schema->values;
299 112         183 while ($block_count) {
300 112         131 my $block_size;
301 112 50       164 if ($block_count < 0) {
302 0         0 $block_count = -$block_count;
303 0         0 $block_size = decode_long($class, @_);
304             ## XXX we can skip with $reader_schema?
305             }
306 112         193 for (1..$block_count) {
307 333         504 my $key = decode_string($class, @_);
308 333 50 33     4728 unless (defined $key && length $key) {
309 0         0 throw Avro::Schema::Error::Parse("key of map is invalid");
310             }
311 333         614 $hash{$key} = $class->decode(
312             writer_schema => $writer_values,
313             reader_schema => $reader_values,
314             reader => $reader,
315             );
316             }
317 112         166 $block_count = decode_long($class, @_);
318             }
319 112         292 return \%hash;
320             }
321              
322             sub skip_union {
323 0     0 0 0 my $class = shift;
324 0         0 my ($schema, $reader) = @_;
325 0         0 my $idx = decode_long($class, undef, undef, $reader);
326 0 0       0 my $union_schema = $schema->schemas->[$idx]
327             or throw Avro::Schema::Error::Parse("union union member");
328 0         0 $class->skip($union_schema, $reader);
329             }
330              
331             ## 1.3.2 A union is encoded by first writing an int value indicating the
332             ## zero-based position within the union of the schema of its value. The value
333             ## is then encoded per the indicated schema within the union.
334             sub decode_union {
335 3     3 0 5 my $class = shift;
336 3         4 my ($writer_schema, $reader_schema, $reader) = @_;
337 3         6 my $idx = decode_long($class, @_);
338 3         6 my $union_schema = $writer_schema->schemas->[$idx];
339             ## XXX TODO: schema resolution
340             # The first schema in the reader's union that matches the selected writer's
341             # union schema is recursively resolved against it. if none match, an error
342             # is signalled.
343 3         6 return $class->decode(
344             reader_schema => $union_schema,
345             writer_schema => $union_schema,
346             reader => $reader,
347             );
348             }
349              
350             sub skip_fixed {
351 0     0 0 0 my $class = shift;
352 0         0 my ($schema, $reader) = @_;
353 0         0 $reader->seek($schema->size, 0);
354             }
355              
356             ## 1.3.2 Fixed instances are encoded using the number of bytes declared in the
357             ## schema.
358             sub decode_fixed {
359 12     12 0 16 my $class = shift;
360 12         19 my ($writer_schema, $reader_schema, $reader) = @_;
361 12         27 $reader->read(my $buf, $writer_schema->size);
362 12         136 return $buf;
363             }
364              
365             sub zigzag {
366 2084     2084 0 2274 my $int = shift;
367 2084 50       2851 if (1 & $int) {
368             ## odd values are encoded negative ints
369 0         0 return -( 1 + ($int >> 1) );
370             }
371             ## even values are positive natural left shifted one bit
372             else {
373 2084         3569 return $int >> 1;
374             }
375             }
376              
377             sub unsigned_varint {
378 2084     2084 0 2191 my $reader = shift;
379 2084         2359 my $int = 0;
380 2084         2130 my $more;
381 2084         2201 my $shift = 0;
382 2084         2237 do {
383 2087         3890 $reader->read(my $buf, 1);
384 2087         9159 my $byte = ord $buf;
385 2087         2345 my $value = $byte & 0x7F;
386 2087         2473 $int |= $value << $shift;
387 2087         2342 $shift += 7;
388 2087         3287 $more = $byte & 0x80;
389             } until (! $more);
390 2084         2915 return $int;
391             }
392              
393             1;