File Coverage

blib/lib/Avro/DataFileWriter.pm
Criterion Covered Total %
statement 135 140 96.4
branch 19 28 67.8
condition 6 9 66.6
subroutine 26 27 96.3
pod 0 7 0.0
total 186 211 88.1


line stmt bran cond sub pod time code
1             # Licensed to the Apache Software Foundation (ASF) under one
2             # or more contributor license agreements. See the NOTICE file
3             # distributed with this work for additional information
4             # regarding copyright ownership. The ASF licenses this file
5             # to you under the Apache License, Version 2.0 (the
6             # "License"); you may not use this file except in compliance
7             # with the License. You may obtain a copy of the License at
8             #
9             # https://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             package Avro::DataFileWriter;
19 1     1   1227 use strict;
  1         3  
  1         25  
20 1     1   5 use warnings;
  1         3  
  1         26  
21              
22 1     1   4 use constant DEFAULT_BLOCK_MAX_SIZE => 1024 * 64;
  1         3  
  1         64  
23              
24 1         14 use Object::Tiny qw{
25             fh
26             writer_schema
27             codec
28             metadata
29             block_max_size
30             sync_marker
31 1     1   6 };
  1         2  
32              
33 1     1   304 use Avro::BinaryEncoder;
  1         3  
  1         20  
34 1     1   8 use Avro::BinaryDecoder;
  1         2  
  1         16  
35 1     1   5 use Avro::DataFile;
  1         1  
  1         29  
36 1     1   5 use Avro::Schema;
  1         4  
  1         27  
37 1     1   5 use Carp;
  1         4  
  1         48  
38 1     1   6 use Compress::Zstd;
  1         2  
  1         73  
39 1     1   6 use Error::Simple;
  1         2  
  1         9  
40 1     1   1015 use IO::Compress::Bzip2 qw(bzip2 $Bzip2Error);
  1         7802  
  1         101  
41 1     1   438 use IO::Compress::RawDeflate qw(rawdeflate $RawDeflateError);
  1         3109  
  1         1082  
42              
43             our $VERSION = '1.11.3';
44              
45             sub new {
46 6     6 0 6107 my $class = shift;
47 6         24 my $datafile = $class->SUPER::new(@_);
48              
49             ## default values
50 6   50     68 $datafile->{block_max_size} ||= DEFAULT_BLOCK_MAX_SIZE;
51 6   33     24 $datafile->{sync_marker} ||= $class->random_sync_marker;
52 6   100     17 $datafile->{metadata} ||= {};
53 6   100     14 $datafile->{codec} ||= 'null';
54              
55 6         9 $datafile->{_current_size} = 0;
56 6         13 $datafile->{_serialized_objects} = [];
57 6         12 $datafile->{_compressed_block} = '';
58              
59 6 50       10 croak "Please specify a writer schema" unless $datafile->{writer_schema};
60             croak "writer_schema is invalid"
61 6 50       9 unless eval { $datafile->{writer_schema}->isa("Avro::Schema") };
  6         26  
62              
63             throw Avro::DataFile::Error::InvalidCodec($datafile->{codec})
64 6 100       26 unless Avro::DataFile->is_codec_valid($datafile->{codec});
65              
66 5         12 return $datafile;
67             }
68              
69             ## it's not really good randomness, but it should be good enough
70             sub random_sync_marker {
71 6     6 0 8 my $class = shift;
72 6         8 my @r;
73 6         13 for (1..16) {
74 96         119 push @r, int rand(1<<8);
75             }
76 6         25 my $marker = pack "C16", @r;
77 6         17 return $marker;
78             }
79              
80             sub print {
81 104     104 0 396 my $datafile = shift;
82 104         128 my $data = shift;
83 104         141 my $writer_schema = $datafile->{writer_schema};
84              
85 104         146 my $enc_ref = '';
86             Avro::BinaryEncoder->encode(
87             schema => $writer_schema,
88             data => $data,
89             emit_cb => sub {
90 3120     3120   3411 $enc_ref .= ${ $_[0] };
  3120         5872  
91             },
92 104         758 );
93 104         393 $datafile->buffer_or_print(\$enc_ref);
94             }
95              
96             sub buffer_or_print {
97 104     104 0 144 my $datafile = shift;
98 104         111 my $string_ref = shift;
99 104         2081 my $codec = $datafile->codec;
100              
101 104         455 my $ser_objects = $datafile->{_serialized_objects};
102 104         186 push @$ser_objects, $string_ref;
103              
104 104 100       239 if ($codec eq 'deflate') {
    100          
    100          
105 1         2 my $uncompressed = join('', map { $$_ } @$ser_objects);
  1         3  
106             rawdeflate \$uncompressed => \$datafile->{_compressed_block}
107 1 50       5 or croak "rawdeflate failed: $RawDeflateError";
108             $datafile->{_current_size} =
109 1         1867 bytes::length($datafile->{_compressed_block});
110             }
111             elsif ($codec eq 'bzip2') {
112 101         171 my $uncompressed = join('', map { $$_ } @$ser_objects);
  5051         6216  
113 101         291 my $compressed;
114 101 50       267 bzip2 \$uncompressed => \$compressed
115             or croak "bzip2 failed: $Bzip2Error";
116 101         89063 $datafile->{_compressed_block} = $compressed;
117 101         267 $datafile->{_current_size} = bytes::length($datafile->{_compressed_block});
118             }
119             elsif ($codec eq 'zstandard') {
120 1         3 my $uncompressed = join('', map { $$_ } @$ser_objects);
  1         4  
121 1         76 $datafile->{_compressed_block} = compress(\$uncompressed);
122 1         5 $datafile->{_current_size} = bytes::length($datafile->{_compressed_block});
123             }
124             else {
125 1         4 $datafile->{_current_size} += bytes::length($$string_ref);
126             }
127 104 50       429 if ($datafile->{_current_size} > $datafile->{block_max_size}) {
128             ## ok, time to flush!
129 0         0 $datafile->_print_block;
130             }
131 104         298 return;
132             }
133              
134             sub header {
135 5     5 0 29 my $datafile = shift;
136              
137 5         124 my $metadata = $datafile->metadata;
138 5         81 my $schema = $datafile->writer_schema;
139 5         69 my $codec = $datafile->codec;
140              
141 5         29 for (keys %$metadata) {
142 4 50       9 warn "metadata '$_' is reserved" if /^avro\./;
143             }
144              
145 5         8 my $encoded_header = '';
146             Avro::BinaryEncoder->encode(
147             schema => $Avro::DataFile::HEADER_SCHEMA,
148             data => {
149             magic => Avro::DataFile->AVRO_MAGIC,
150             meta => {
151             %$metadata,
152             'avro.schema' => $schema->to_string,
153             'avro.codec' => $codec,
154             },
155             sync => $datafile->{sync_marker},
156             },
157 76     76   84 emit_cb => sub { $encoded_header .= ${ $_[0] } },
  76         158  
158 5         38 );
159 5         84 return $encoded_header;
160             }
161              
162             sub _print_header {
163 5     5   7 my $datafile = shift;
164 5         10 $datafile->{_header_printed} = 1;
165 5         8 my $fh = $datafile->{fh};
166 5         12 print $fh $datafile->header;
167              
168 5         15 return 1;
169             }
170              
171             sub _print_block {
172 5     5   7 my $datafile = shift;
173 5 50       12 unless ($datafile->{_header_printed}) {
174 5         13 $datafile->_print_header;
175             }
176 5         11 my $ser_objects = $datafile->{_serialized_objects};
177 5         8 my $object_count = scalar @$ser_objects;
178 5         9 my $length = $datafile->{_current_size};
179 5         7 my $prefix = '';
180              
181 5         10 for ($object_count, $length) {
182             Avro::BinaryEncoder->encode_long(
183 10     10   13 undef, $_, sub { $prefix .= ${ $_[0] } },
  10         38  
184 10         44 );
185             }
186              
187 5         6 my $sync_marker = $datafile->{sync_marker};
188 5         8 my $fh = $datafile->{fh};
189              
190             ## alternatively, we could do n calls to print here,
191             ## but we'll say that this whole write-block mechanism is here to overcome
192             ## any memory issues we could have with dereferencing the ser_objects
193 5 100       95 if ($datafile->codec ne 'null') {
194 4         28 print $fh $prefix, $datafile->{_compressed_block}, $sync_marker;
195             }
196             else {
197 1         6 print $fh $prefix, (map { $$_ } @$ser_objects), $sync_marker;
  1         4  
198             }
199              
200             ## now reset our internal buffer
201 5         11 $datafile->{_serialized_objects} = [];
202 5         10 $datafile->{_current_size} = 0;
203 5         8 $datafile->{_compressed_block} = '';
204 5         19 return 1;
205             }
206              
207             sub flush {
208 11     11 0 64 my $datafile = shift;
209 11 100       31 $datafile->_print_block if $datafile->{_current_size};
210             }
211              
212             sub close {
213 0     0 0 0 my $datafile = shift;
214 0         0 $datafile->flush;
215 0 0       0 my $fh = $datafile->{fh} or return;
216 0         0 close $fh;
217             }
218              
219             sub DESTROY {
220 6     6   2913 my $datafile = shift;
221 6         17 $datafile->flush;
222 6         33 return 1;
223             }
224              
225             package Avro::DataFile::Error::InvalidCodec;
226 1     1   8 use parent 'Error::Simple';
  1         3  
  1         14  
227              
228             1;