File Coverage

blib/lib/Avro/DataFileWriter.pm
Criterion Covered Total %
statement 135 140 96.4
branch 19 28 67.8
condition 6 9 66.6
subroutine 26 27 96.3
pod 0 7 0.0
total 186 211 88.1


line stmt bran cond sub pod time code
1             # Licensed to the Apache Software Foundation (ASF) under one
2             # or more contributor license agreements. See the NOTICE file
3             # distributed with this work for additional information
4             # regarding copyright ownership. The ASF licenses this file
5             # to you under the Apache License, Version 2.0 (the
6             # "License"); you may not use this file except in compliance
7             # with the License. You may obtain a copy of the License at
8             #
9             # https://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             package Avro::DataFileWriter;
19 1     1   1320 use strict;
  1         5  
  1         39  
20 1     1   6 use warnings;
  1         2  
  1         30  
21              
22 1     1   9 use constant DEFAULT_BLOCK_MAX_SIZE => 1024 * 64;
  1         3  
  1         57  
23              
24 1         7 use Object::Tiny qw{
25             fh
26             writer_schema
27             codec
28             metadata
29             block_max_size
30             sync_marker
31 1     1   10 };
  1         1  
32              
33 1     1   346 use Avro::BinaryEncoder;
  1         3  
  1         25  
34 1     1   5 use Avro::BinaryDecoder;
  1         2  
  1         17  
35 1     1   5 use Avro::DataFile;
  1         1  
  1         31  
36 1     1   6 use Avro::Schema;
  1         2  
  1         41  
37 1     1   6 use Carp;
  1         3  
  1         53  
38 1     1   10 use Compress::Zstd;
  1         3  
  1         92  
39 1     1   7 use Error::Simple;
  1         2  
  1         10  
40 1     1   622 use IO::Compress::Bzip2 qw(bzip2 $Bzip2Error);
  1         8500  
  1         122  
41 1     1   560 use IO::Compress::RawDeflate qw(rawdeflate $RawDeflateError);
  1         3845  
  1         1333  
42              
43             our $VERSION = '1.11.2';
44              
45             sub new {
46 6     6 0 7496 my $class = shift;
47 6         25 my $datafile = $class->SUPER::new(@_);
48              
49             ## default values
50 6   50     71 $datafile->{block_max_size} ||= DEFAULT_BLOCK_MAX_SIZE;
51 6   33     32 $datafile->{sync_marker} ||= $class->random_sync_marker;
52 6   100     26 $datafile->{metadata} ||= {};
53 6   100     15 $datafile->{codec} ||= 'null';
54              
55 6         11 $datafile->{_current_size} = 0;
56 6         18 $datafile->{_serialized_objects} = [];
57 6         14 $datafile->{_compressed_block} = '';
58              
59 6 50       16 croak "Please specify a writer schema" unless $datafile->{writer_schema};
60             croak "writer_schema is invalid"
61 6 50       9 unless eval { $datafile->{writer_schema}->isa("Avro::Schema") };
  6         35  
62              
63             throw Avro::DataFile::Error::InvalidCodec($datafile->{codec})
64 6 100       35 unless Avro::DataFile->is_codec_valid($datafile->{codec});
65              
66 5         16 return $datafile;
67             }
68              
69             ## it's not really good random, but it should be good enough
70             sub random_sync_marker {
71 6     6 0 11 my $class = shift;
72 6         9 my @r;
73 6         17 for (1..16) {
74 96         139 push @r, int rand(1<<8);
75             }
76 6         58 my $marker = pack "C16", @r;
77 6         26 return $marker;
78             }
79              
80             sub print {
81 104     104 0 425 my $datafile = shift;
82 104         129 my $data = shift;
83 104         164 my $writer_schema = $datafile->{writer_schema};
84              
85 104         179 my $enc_ref = '';
86             Avro::BinaryEncoder->encode(
87             schema => $writer_schema,
88             data => $data,
89             emit_cb => sub {
90 3120     3120   3942 $enc_ref .= ${ $_[0] };
  3120         7256  
91             },
92 104         1039 );
93 104         478 $datafile->buffer_or_print(\$enc_ref);
94             }
95              
96             sub buffer_or_print {
97 104     104 0 181 my $datafile = shift;
98 104         146 my $string_ref = shift;
99 104         2948 my $codec = $datafile->codec;
100              
101 104         545 my $ser_objects = $datafile->{_serialized_objects};
102 104         221 push @$ser_objects, $string_ref;
103              
104 104 100       305 if ($codec eq 'deflate') {
    100          
    100          
105 1         3 my $uncompressed = join('', map { $$_ } @$ser_objects);
  1         5  
106             rawdeflate \$uncompressed => \$datafile->{_compressed_block}
107 1 50       6 or croak "rawdeflate failed: $RawDeflateError";
108             $datafile->{_current_size} =
109 1         2139 bytes::length($datafile->{_compressed_block});
110             }
111             elsif ($codec eq 'bzip2') {
112 101         239 my $uncompressed = join('', map { $$_ } @$ser_objects);
  5051         7179  
113 101         298 my $compressed;
114 101 50       312 bzip2 \$uncompressed => \$compressed
115             or croak "bzip2 failed: $Bzip2Error";
116 101         107591 $datafile->{_compressed_block} = $compressed;
117 101         315 $datafile->{_current_size} = bytes::length($datafile->{_compressed_block});
118             }
119             elsif ($codec eq 'zstandard') {
120 1         3 my $uncompressed = join('', map { $$_ } @$ser_objects);
  1         5  
121 1         68 $datafile->{_compressed_block} = compress(\$uncompressed);
122 1         7 $datafile->{_current_size} = bytes::length($datafile->{_compressed_block});
123             }
124             else {
125 1         4 $datafile->{_current_size} += bytes::length($$string_ref);
126             }
127 104 50       507 if ($datafile->{_current_size} > $datafile->{block_max_size}) {
128             ## ok, time to flush!
129 0         0 $datafile->_print_block;
130             }
131 104         324 return;
132             }
133              
134             sub header {
135 5     5 0 26 my $datafile = shift;
136              
137 5         135 my $metadata = $datafile->metadata;
138 5         101 my $schema = $datafile->writer_schema;
139 5         88 my $codec = $datafile->codec;
140              
141 5         35 for (keys %$metadata) {
142 4 50       13 warn "metadata '$_' is reserved" if /^avro\./;
143             }
144              
145 5         9 my $encoded_header = '';
146             Avro::BinaryEncoder->encode(
147             schema => $Avro::DataFile::HEADER_SCHEMA,
148             data => {
149             magic => Avro::DataFile->AVRO_MAGIC,
150             meta => {
151             %$metadata,
152             'avro.schema' => $schema->to_string,
153             'avro.codec' => $codec,
154             },
155             sync => $datafile->{sync_marker},
156             },
157 76     76   109 emit_cb => sub { $encoded_header .= ${ $_[0] } },
  76         203  
158 5         51 );
159 5         125 return $encoded_header;
160             }
161              
162             sub _print_header {
163 5     5   7 my $datafile = shift;
164 5         11 $datafile->{_header_printed} = 1;
165 5         6 my $fh = $datafile->{fh};
166 5         17 print $fh $datafile->header;
167              
168 5         27 return 1;
169             }
170              
171             sub _print_block {
172 5     5   8 my $datafile = shift;
173 5 50       14 unless ($datafile->{_header_printed}) {
174 5         12 $datafile->_print_header;
175             }
176 5         14 my $ser_objects = $datafile->{_serialized_objects};
177 5         9 my $object_count = scalar @$ser_objects;
178 5         15 my $length = $datafile->{_current_size};
179 5         9 my $prefix = '';
180              
181 5         13 for ($object_count, $length) {
182             Avro::BinaryEncoder->encode_long(
183 10     10   17 undef, $_, sub { $prefix .= ${ $_[0] } },
  10         47  
184 10         57 );
185             }
186              
187 5         12 my $sync_marker = $datafile->{sync_marker};
188 5         11 my $fh = $datafile->{fh};
189              
190             ## alternatively here, we could do n calls to print
191             ## but we'll say that this all write block thing is here to overcome
192             ## any memory issues we could have with deferencing the ser_objects
193 5 100       131 if ($datafile->codec ne 'null') {
194 4         35 print $fh $prefix, $datafile->{_compressed_block}, $sync_marker;
195             }
196             else {
197 1         8 print $fh $prefix, (map { $$_ } @$ser_objects), $sync_marker;
  1         4  
198             }
199              
200             ## now reset our internal buffer
201 5         13 $datafile->{_serialized_objects} = [];
202 5         10 $datafile->{_current_size} = 0;
203 5         8 $datafile->{_compressed_block} = '';
204 5         25 return 1;
205             }
206              
207             sub flush {
208 11     11 0 32 my $datafile = shift;
209 11 100       40 $datafile->_print_block if $datafile->{_current_size};
210             }
211              
212             sub close {
213 0     0 0 0 my $datafile = shift;
214 0         0 $datafile->flush;
215 0 0       0 my $fh = $datafile->{fh} or return;
216 0         0 close $fh;
217             }
218              
219             sub DESTROY {
220 6     6   3274 my $datafile = shift;
221 6         21 $datafile->flush;
222 6         42 return 1;
223             }
224              
225             package Avro::DataFile::Error::InvalidCodec;
226 1     1   9 use parent 'Error::Simple';
  1         2  
  1         13  
227              
228             1;