File Coverage

blib/lib/Avro/BinaryEncoder.pm
Criterion Covered Total %
statement 112 125 89.6
branch 19 26 73.0
condition n/a
subroutine 23 25 92.0
pod 1 17 5.8
total 155 193 80.3


line stmt bran cond sub pod time code
1             # Licensed to the Apache Software Foundation (ASF) under one
2             # or more contributor license agreements. See the NOTICE file
3             # distributed with this work for additional information
4             # regarding copyright ownership. The ASF licenses this file
5             # to you under the Apache License, Version 2.0 (the
6             # "License"); you may not use this file except in compliance
7             # with the License. You may obtain a copy of the License at
8             #
9             # https://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             package Avro::BinaryEncoder;
19 3     3   1563 use strict;
  3         9  
  3         87  
20 3     3   12 use warnings;
  3         5  
  3         59  
21              
22 3     3   12 use Config;
  3         4  
  3         106  
23 3     3   1466 use Encode();
  3         25602  
  3         60  
24 3     3   16 use Error::Simple;
  3         5  
  3         33  
25 3     3   127 use Regexp::Common qw(number);
  3         4  
  3         17  
26              
27             our $VERSION = '1.11.3';
28              
29             our $max64;
30             our $complement = ~0x7F;
31             if ($Config{use64bitint}) {
32             $max64 = 9223372036854775807;
33             }
34             else {
35             require Math::BigInt;
36             $complement = Math::BigInt->new("0b" . ("1" x 57) . ("0" x 7));
37             $max64 = Math::BigInt->new("0b0" . ("1" x 63));
38             }
39              
40              
41             =head2 encode(%param)
42              
43             Encodes the given C<data> according to the given C<schema>, and pass it
44             to the C<emit_cb>
45              
46             Params are:
47              
48             =over 4
49              
50             =item * data
51              
52             The data to encode (can be any perl data structure, but it should match
53             schema)
54              
55             =item * schema
56              
57             The schema to use to encode C<data>
58              
59             =item * emit_cb($byte_ref)
60              
61             The callback that will be invoked with the a reference to the encoded data
62             in parameters.
63              
64             =back
65              
66             =cut
67              
68             sub encode {
69 1326     1326 1 4124 my $class = shift;
70 1326         2974 my %param = @_;
71 1326         2368 my ($schema, $data, $cb) = @param{qw/schema data emit_cb/};
72              
73             ## a schema can also be just a string
74 1326 50       3197 my $type = ref $schema ? $schema->type : $schema;
75              
76             ## might want to profile and optimize this
77 1326         2161 my $meth = "encode_$type";
78 1326         3107 $class->$meth($schema, $data, $cb);
79 1326         3108 return;
80             }
81              
82             sub encode_null {
83 3     3 0 1059 $_[3]->(\'');
84             }
85              
86             sub encode_boolean {
87 2     2 0 891 my $class = shift;
88 2         4 my ($schema, $data, $cb) = @_;
89 2 100       5 $cb->( $data ? \"\x1" : \"\x0" );
90             }
91              
92             sub encode_int {
93 22     22 0 6157 my $class = shift;
94 22         35 my ($schema, $data, $cb) = @_;
95 22 100       83 if ($data !~ /^$RE{num}{int}$/) {
96 2         198 throw Avro::BinaryEncoder::Error("cannot convert '$data' to integer");
97             }
98 20 100       2316 if (abs($data) > 0x7fffffff) {
99 2         145 throw Avro::BinaryEncoder::Error("int ($data) should be <= 32bits");
100             }
101              
102 18         240 my $enc = unsigned_varint(zigzag($data));
103 18         93 $cb->(\$enc);
104             }
105              
106             sub encode_long {
107 2072     2072 0 8799 my $class = shift;
108 2072         2895 my ($schema, $data, $cb) = @_;
109 2072 100       6404 if ($data !~ /^$RE{num}{int}$/) {
110 2         201 throw Avro::BinaryEncoder::Error("cannot convert '$data' to long integer");
111             }
112 2070 50       187047 if (abs($data) > $max64) {
113 0         0 throw Avro::BinaryEncoder::Error("int ($data) should be <= 64bits");
114             }
115 2070         3902 my $enc = unsigned_varint(zigzag($data));
116 2070         4419 $cb->(\$enc);
117             }
118              
119             sub encode_float {
120 0     0 0 0 my $class = shift;
121 0         0 my ($schema, $data, $cb) = @_;
122 0         0 my $enc = pack "f<", $data;
123 0         0 $cb->(\$enc);
124             }
125              
126             sub encode_double {
127 0     0 0 0 my $class = shift;
128 0         0 my ($schema, $data, $cb) = @_;
129 0         0 my $enc = pack "d<", $data;
130 0         0 $cb->(\$enc);
131             }
132              
133             sub encode_bytes {
134 14     14 0 25 my $class = shift;
135 14         23 my ($schema, $data, $cb) = @_;
136 14         28 encode_long($class, undef, bytes::length($data), $cb);
137 14         24 $cb->(\$data);
138             }
139              
140             sub encode_string {
141 1170     1170 0 1430 my $class = shift;
142 1170         1749 my ($schema, $data, $cb) = @_;
143 1170         2046 my $bytes = Encode::encode_utf8($data);
144 1170         7598 encode_long($class, undef, bytes::length($bytes), $cb);
145 1170         2048 $cb->(\$bytes);
146             }
147              
148             ## 1.3.2 A record is encoded by encoding the values of its fields in the order
149             ## that they are declared. In other words, a record is encoded as just the
150             ## concatenation of the encodings of its fields. Field values are encoded per
151             ## their schema.
152             sub encode_record {
153 8     8 0 13 my $class = shift;
154 8         16 my ($schema, $data, $cb) = @_;
155 8         13 for my $field (@{ $schema->fields }) {
  8         40  
156             $class->encode(
157             schema => $field->{type},
158             data => $data->{ $field->{name} },
159 20         72 emit_cb => $cb,
160             );
161             }
162             }
163              
164             ## 1.3.2 An enum is encoded by a int, representing the zero-based position of
165             ## the symbol in the schema.
166             sub encode_enum {
167 5     5 0 7 my $class = shift;
168 5         7 my ($schema, $data, $cb) = @_;
169 5         12 my $symbols = $schema->symbols_as_hash;
170 5         7 my $pos = $symbols->{ $data };
171 5 50       11 throw Avro::BinaryEncoder::Error("Cannot find enum $data")
172             unless defined $pos;
173 5         20 $class->encode_int(undef, $pos, $cb);
174             }
175              
176             ## 1.3.2 Arrays are encoded as a series of blocks. Each block consists of a
177             ## long count value, followed by that many array items. A block with count zero
178             ## indicates the end of the array. Each item is encoded per the array's item
179             ## schema.
180             ## If a block's count is negative, its absolute value is used, and the count is
181             ## followed immediately by a long block size
182              
183             ## maybe here it would be worth configuring what a typical block size should be
184             sub encode_array {
185 318     318 0 388 my $class = shift;
186 318         431 my ($schema, $data, $cb) = @_;
187              
188             ## FIXME: multiple blocks
189 318 50       539 if (@$data) {
190 318         658 $class->encode_long(undef, scalar @$data, $cb);
191 318         583 for (@$data) {
192 848         1744 $class->encode(
193             schema => $schema->items,
194             data => $_,
195             emit_cb => $cb,
196             );
197             }
198             }
199             ## end of the only block
200 318         603 $class->encode_long(undef, 0, $cb);
201             }
202              
203              
204             ## 1.3.2 Maps are encoded as a series of blocks. Each block consists of a long
205             ## count value, followed by that many key/value pairs. A block with count zero
206             ## indicates the end of the map. Each item is encoded per the map's value
207             ## schema.
208             ##
209             ## (TODO)
210             ## If a block's count is negative, its absolute value is used, and the count is
211             ## followed immediately by a long block size indicating the number of bytes in
212             ## the block. This block size permits fast skipping through data, e.g., when
213             ## projecting a record to a subset of its fields.
214             sub encode_map {
215 111     111 0 159 my $class = shift;
216 111         166 my ($schema, $data, $cb) = @_;
217              
218 111         266 my @keys = keys %$data;
219 111 50       221 if (@keys) {
220 111         248 $class->encode_long(undef, scalar @keys, $cb);
221 111         210 for (@keys) {
222             ## the key
223 330         695 $class->encode_string(undef, $_, $cb);
224              
225             ## the value
226             $class->encode(
227             schema => $schema->values,
228 330         811 data => $data->{$_},
229             emit_cb => $cb,
230             );
231             }
232             }
233             ## end of the only block
234 111         281 $class->encode_long(undef, 0, $cb);
235             }
236              
237             ## 1.3.2 A union is encoded by first writing an int value indicating the
238             ## zero-based position within the union of the schema of its value. The value
239             ## is then encoded per the indicated schema within the union.
240             sub encode_union {
241 4     4 0 6 my $class = shift;
242 4         8 my ($schema, $data, $cb) = @_;
243 4         6 my $idx = 0;
244 4         5 my $elected_schema;
245 4         6 for my $inner_schema (@{$schema->schemas}) {
  4         9  
246 8 100       19 if ($inner_schema->is_data_valid($data)) {
247 4         6 $elected_schema = $inner_schema;
248 4         6 last;
249             }
250 4         8 $idx++;
251             }
252 4 50       15 unless ($elected_schema) {
253 0         0 throw Avro::BinaryEncoder::Error("union cannot validate the data");
254             }
255 4         21 $class->encode_long(undef, $idx, $cb);
256 4         31 $class->encode(
257             schema => $elected_schema,
258             data => $data,
259             emit_cb => $cb,
260             );
261             }
262              
263             ## 1.3.2 Fixed instances are encoded using the number of bytes declared in the
264             ## schema.
265             sub encode_fixed {
266 10     10 0 19 my $class = shift;
267 10         53 my ($schema, $data, $cb) = @_;
268 10 50       27 if (bytes::length $data != $schema->size) {
269 0         0 my $s1 = bytes::length $data;
270 0         0 my $s2 = $schema->size;
271 0         0 throw Avro::BinaryEncoder::Error("Fixed size doesn't match $s1!=$s2");
272             }
273 10         23 $cb->(\$data);
274             }
275              
276             sub zigzag {
277 3     3   3879 use warnings FATAL => 'numeric';
  3         7  
  3         519  
278 2088 100   2088 0 3412 if ( $_[0] >= 0 ) {
279 2083         3863 return $_[0] << 1;
280             }
281 5         181 return (($_[0] << 1) ^ -1) | 0x1;
282             }
283              
284             sub unsigned_varint {
285 2088     2088 0 5990 my @bytes;
286 2088         3554 while ($_[0] & $complement) { # mask with continuation bit
287 8         328 push @bytes, ($_[0] & 0x7F) | 0x80; # out and set continuation bit
288 8         386 $_[0] >>= 7; # next please
289             }
290 2088         3638 push @bytes, $_[0]; # last byte
291 2088         5300 return pack "C*", @bytes;
292             }
293              
294             package Avro::BinaryEncoder::Error;
295 3     3   17 use parent 'Error::Simple';
  3         6  
  3         14  
296              
297             1;