File Coverage

blib/lib/Avro/DataFileWriter.pm
Criterion Covered Total %
statement 135 140 96.4
branch 19 28 67.8
condition 6 9 66.6
subroutine 26 27 96.3
pod 0 7 0.0
total 186 211 88.1


line stmt bran cond sub pod time code
1             # Licensed to the Apache Software Foundation (ASF) under one
2             # or more contributor license agreements. See the NOTICE file
3             # distributed with this work for additional information
4             # regarding copyright ownership. The ASF licenses this file
5             # to you under the Apache License, Version 2.0 (the
6             # "License"); you may not use this file except in compliance
7             # with the License. You may obtain a copy of the License at
8             #
9             # https://www.apache.org/licenses/LICENSE-2.0
10             #
11             # Unless required by applicable law or agreed to in writing,
12             # software distributed under the License is distributed on an
13             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14             # KIND, either express or implied. See the License for the
15             # specific language governing permissions and limitations
16             # under the License.
17              
18             use strict;
19 1     1   1034 use warnings;
  1         3  
  1         24  
20 1     1   4  
  1         2  
  1         24  
21             use constant DEFAULT_BLOCK_MAX_SIZE => 1024 * 64;
22 1     1   4  
  1         2  
  1         48  
23             use Object::Tiny qw{
24 1         5 fh
25             writer_schema
26             codec
27             metadata
28             block_max_size
29             sync_marker
30             };
31 1     1   8  
  1         2  
32             use Avro::BinaryEncoder;
33 1     1   282 use Avro::BinaryDecoder;
  1         1  
  1         18  
34 1     1   5 use Avro::DataFile;
  1         1  
  1         14  
35 1     1   4 use Avro::Schema;
  1         2  
  1         22  
36 1     1   6 use Carp;
  1         1  
  1         21  
37 1     1   4 use Compress::Zstd;
  1         1  
  1         41  
38 1     1   8 use Error::Simple;
  1         12  
  1         60  
39 1     1   6 use IO::Compress::Bzip2 qw(bzip2 $Bzip2Error);
  1         2  
  1         6  
40 1     1   511 use IO::Compress::RawDeflate qw(rawdeflate $RawDeflateError);
  1         6776  
  1         94  
41 1     1   431  
  1         3033  
  1         1043  
42             our $VERSION = '1.11.1';
43              
44             my $class = shift;
45             my $datafile = $class->SUPER::new(@_);
46 6     6 0 5882  
47 6         21 ## default values
48             $datafile->{block_max_size} ||= DEFAULT_BLOCK_MAX_SIZE;
49             $datafile->{sync_marker} ||= $class->random_sync_marker;
50 6   50     52 $datafile->{metadata} ||= {};
51 6   33     23 $datafile->{codec} ||= 'null';
52 6   100     16  
53 6   100     13 $datafile->{_current_size} = 0;
54             $datafile->{_serialized_objects} = [];
55 6         9 $datafile->{_compressed_block} = '';
56 6         15  
57 6         6 croak "Please specify a writer schema" unless $datafile->{writer_schema};
58             croak "writer_schema is invalid"
59 6 50       13 unless eval { $datafile->{writer_schema}->isa("Avro::Schema") };
60              
61 6 50       7 throw Avro::DataFile::Error::InvalidCodec($datafile->{codec})
  6         27  
62             unless Avro::DataFile->is_codec_valid($datafile->{codec});
63              
64 6 100       23 return $datafile;
65             }
66 5         12  
67             ## it's not really good random, but it should be good enough
68             my $class = shift;
69             my @r;
70             for (1..16) {
71 6     6 0 8 push @r, int rand(1<<8);
72 6         10 }
73 6         13 my $marker = pack "C16", @r;
74 96         113 return $marker;
75             }
76 6         22  
77 6         19 my $datafile = shift;
78             my $data = shift;
79             my $writer_schema = $datafile->{writer_schema};
80              
81 104     104 0 462 my $enc_ref = '';
82 104         217 Avro::BinaryEncoder->encode(
83 104         210 schema => $writer_schema,
84             data => $data,
85 104         194 emit_cb => sub {
86             $enc_ref .= ${ $_[0] };
87             },
88             );
89             $datafile->buffer_or_print(\$enc_ref);
90 3120     3120   3270 }
  3120         5754  
91              
92 104         947 my $datafile = shift;
93 104         595 my $string_ref = shift;
94             my $codec = $datafile->codec;
95              
96             my $ser_objects = $datafile->{_serialized_objects};
97 104     104 0 220 push @$ser_objects, $string_ref;
98 104         141  
99 104         3009 if ($codec eq 'deflate') {
100             my $uncompressed = join('', map { $$_ } @$ser_objects);
101 104         531 rawdeflate \$uncompressed => \$datafile->{_compressed_block}
102 104         208 or croak "rawdeflate failed: $RawDeflateError";
103             $datafile->{_current_size} =
104 104 100       350 bytes::length($datafile->{_compressed_block});
    100          
    100          
105 1         2 }
  1         5  
106             elsif ($codec eq 'bzip2') {
107 1 50       4 my $uncompressed = join('', map { $$_ } @$ser_objects);
108             my $compressed;
109 1         1677 bzip2 \$uncompressed => \$compressed
110             or croak "bzip2 failed: $Bzip2Error";
111             $datafile->{_compressed_block} = $compressed;
112 101         231 $datafile->{_current_size} = bytes::length($datafile->{_compressed_block});
  5051         6587  
113 101         265 }
114 101 50       384 elsif ($codec eq 'zstandard') {
115             my $uncompressed = join('', map { $$_ } @$ser_objects);
116 101         109696 $datafile->{_compressed_block} = compress(\$uncompressed);
117 101         352 $datafile->{_current_size} = bytes::length($datafile->{_compressed_block});
118             }
119             else {
120 1         2 $datafile->{_current_size} += bytes::length($$string_ref);
  1         4  
121 1         61 }
122 1         5 if ($datafile->{_current_size} > $datafile->{block_max_size}) {
123             ## ok, time to flush!
124             $datafile->_print_block;
125 1         3 }
126             return;
127 104 50       614 }
128              
129 0         0 my $datafile = shift;
130              
131 104         395 my $metadata = $datafile->metadata;
132             my $schema = $datafile->writer_schema;
133             my $codec = $datafile->codec;
134              
135 5     5 0 18 for (keys %$metadata) {
136             warn "metadata '$_' is reserved" if /^avro\./;
137 5         106 }
138 5         76  
139 5         81 my $encoded_header = '';
140             Avro::BinaryEncoder->encode(
141 5         27 schema => $Avro::DataFile::HEADER_SCHEMA,
142 4 50       9 data => {
143             magic => Avro::DataFile->AVRO_MAGIC,
144             meta => {
145 5         8 %$metadata,
146             'avro.schema' => $schema->to_string,
147             'avro.codec' => $codec,
148             },
149             sync => $datafile->{sync_marker},
150             },
151             emit_cb => sub { $encoded_header .= ${ $_[0] } },
152             );
153             return $encoded_header;
154             }
155              
156             my $datafile = shift;
157 76     76   80 $datafile->{_header_printed} = 1;
  76         155  
158 5         37 my $fh = $datafile->{fh};
159 5         85 print $fh $datafile->header;
160              
161             return 1;
162             }
163 5     5   6  
164 5         7 my $datafile = shift;
165 5         5 unless ($datafile->{_header_printed}) {
166 5         11 $datafile->_print_header;
167             }
168 5         13 my $ser_objects = $datafile->{_serialized_objects};
169             my $object_count = scalar @$ser_objects;
170             my $length = $datafile->{_current_size};
171             my $prefix = '';
172 5     5   7  
173 5 50       8 for ($object_count, $length) {
174 5         13 Avro::BinaryEncoder->encode_long(
175             undef, $_, sub { $prefix .= ${ $_[0] } },
176 5         9 );
177 5         8 }
178 5         6  
179 5         9 my $sync_marker = $datafile->{sync_marker};
180             my $fh = $datafile->{fh};
181 5         8  
182             ## alternatively here, we could do n calls to print
183 10     10   14 ## but we'll say that this all write block thing is here to overcome
  10         55  
184 10         41 ## any memory issues we could have with deferencing the ser_objects
185             if ($datafile->codec ne 'null') {
186             print $fh $prefix, $datafile->{_compressed_block}, $sync_marker;
187 5         17 }
188 5         7 else {
189             print $fh $prefix, (map { $$_ } @$ser_objects), $sync_marker;
190             }
191              
192             ## now reset our internal buffer
193 5 100       95 $datafile->{_serialized_objects} = [];
194 4         36 $datafile->{_current_size} = 0;
195             $datafile->{_compressed_block} = '';
196             return 1;
197 1         7 }
  1         4  
198              
199             my $datafile = shift;
200             $datafile->_print_block if $datafile->{_current_size};
201 5         8 }
202 5         9  
203 5         6 my $datafile = shift;
204 5         21 $datafile->flush;
205             my $fh = $datafile->{fh} or return;
206             close $fh;
207             }
208 11     11 0 25  
209 11 100       32 my $datafile = shift;
210             $datafile->flush;
211             return 1;
212             }
213 0     0 0 0  
214 0         0 use parent 'Error::Simple';
215 0 0       0  
216 0         0 1;