File Coverage

blib/lib/Data/FastPack/JPacker.pm
Criterion Covered Total %
statement 136 150 90.6
branch 25 40 62.5
condition 19 37 51.3
subroutine 15 16 93.7
pod 1 6 16.6
total 196 249 78.7


line stmt bran cond sub pod time code
=head1 NAME

Data::FastPack::JPacker - backend class for packing FastPack data files into web-loadable JPack
4              
=head1 SYNOPSIS

  use Data::FastPack::JPacker;

  my @pairs=("input/path", "output/location");

  my %options=();

  my $jpacker=Data::FastPack::JPacker->new(%options);
  $jpacker->pack_files(@pairs);
15            
16              
=head1 DESCRIPTION

Backend to the fastpack-split program. Splits input files or standard input
(assumed to contain FastPack messages/frames) into output files encoded in
JPack. A sequence of files may be created representing a single input file.
The resulting JPack files are loadable via JPack in the browser.
24             =head1 API
25             =cut
26              
27             package Data::FastPack::JPacker;
28             our $VERSION="v0.1.0";
29              
30             # Module for encoding Fastpack time series as JPACK files
31             # Also splits Fastpack files into smaller ones either on message count or byte limit
32             #
33 1     1   145039 use strict;
  1         3  
  1         34  
34 1     1   4 use warnings;
  1         7  
  1         79  
35 1     1   7 use feature "say";
  1         1  
  1         138  
36              
37              
38 1     1   6 use Data::FastPack; # For parsing data files
  1         2  
  1         36  
39 1     1   1197 use Data::JPack; # For packing/unpacking JPack
  1         163075  
  1         45  
40              
41 1     1   430 use File::Path qw;
  1         3  
  1         74  
42 1     1   656 use File::Spec::Functions qw;
  1         1174  
  1         107  
43 1     1   9 use File::Basename qw;
  1         5  
  1         69  
44 1     1   8 use feature ":all";
  1         2  
  1         311  
45              
46              
47              
48              
49              
50             #use constant KEY_OFFSET=>Data::JPack::Packer::KEY_OFFSET+Data::JPack::Packer::KEY_COUNT;
51 1         12 use constant::more ("byte_limit_="."0", qw<
52             byte_size_
53             message_limit_
54             message_count_
55             messages_
56             html_root_
57             html_container_
58             jpack_
59             jpack_options_
60             write_threshold_
61             read_size_
62             ifh_
63             out_fh_
64             in_buffer_
65             out_buffer_
66             input_done_flag_
67             file_count_
68             first_
69 1     1   9 >);
  1         3  
70              
71             #use constant KEY_COUNT=first_-byte_limit_+1;
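=head2 new

  my $jpacker = Data::FastPack::JPacker->new( OPTIONS )

Create a new packer. Any key/value OPTIONS are passed straight to C<init>;
see there for the accepted keys.

=cut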
77              
sub new {
    # Invocant is the class name; fall back to this package when called
    # with no invocant at all.
    my $class = shift // __PACKAGE__;

    # Objects are array based; slots are indexed by the constants declared
    # near the top of this module.
    my $self = bless [], $class;
    $self->[messages_] = [];

    # init() fills in the remaining slots from the options and returns the
    # object itself.
    return $self->init(@_);
}
85              
=head2 init

  $jpacker->init( OPTIONS )

Initializes (or re-initializes) a packer with the options (key/value pairs)
given.

The options to this are

=over

=item html_container

The path to the root dir or 'index.html' file in the root of the html
directory. Data will be stored relative to the container. Defaults to
'index.html' in the current directory.

=item jpack_options

Options specific to the JPack encoding. Please refer to L<Data::JPack> for
more details.

=item message_limit

Maximum number of FastPack messages to store in an output file.

=item byte_limit

Byte limit for the FastPack messages stored in an output file. A new output
file is started before a message would take the file to this limit.

=item read_size

Size of read buffer in bytes. Default is 4096*8.

=item write_size

Size of buffer to accumulate output before writing out to disk. Default is 4096*8.

=back

=cut
# Like new, but reuses (resets) an existing object. Returns the object.
sub init {
    my $self    = shift;
    my %options = @_;

    $self->[file_count_] = 0;

    # Resolve the html container. If a directory was given, the container is
    # the index.html inside it; the html root is the absolute path of the
    # directory holding the container.
    $self->[html_container_] = $options{html_container} // "index.html";
    $self->[html_container_] .= "/index.html" if -d $self->[html_container_];
    $self->[html_root_] = rel2abs(dirname($self->[html_container_]));

    $self->[jpack_options_]   = $options{jpack_options} // {};
    $self->[message_limit_]   = $options{message_limit};
    $self->[byte_limit_]      = $options{byte_limit};
    $self->[first_]           = 1;
    $self->[in_buffer_]       = "";
    $self->[out_buffer_]      = "";
    $self->[read_size_]       = $options{read_size}  // 4096*8;
    $self->[write_threshold_] = $options{write_size} // 4096*8;

    # Clear per-run state so a reused object starts from scratch, exactly as
    # a freshly constructed one would.
    $self->[messages_]        = [];
    $self->[input_done_flag_] = undef;

    # NOTE(review): all options (not only jpack_options) are forwarded to
    # Data::JPack->new, as the original code did.
    $self->[jpack_] = Data::JPack->new(%options);

    # Create the html root and any missing parents. A bare mkdir cannot
    # create nested paths and failed silently; make_path croaks on failure.
    make_path($self->[html_root_]) unless -e $self->[html_root_];

    return $self;
}
150              
151              
152             # INTERNAL API
153             sub close_output_file {
154 5     5 0 9 my $self=shift;
155 5 100       26 if($self->[out_fh_]){
156 2 50       13 $self->[out_buffer_].=$self->[jpack_]->encode_footer if $self->[jpack_];
157 2         157 syswrite $self->[out_fh_], $self->[out_buffer_];
158 2         31 close $self->[out_fh_];
159 2         12 $self->[out_fh_]=undef;
160 2         6 $self->[out_buffer_]="";
161 2         6 $self->[file_count_]++;
162             }
163             }
164              
# INTERNAL API
# Open the next output file in the sequence and return its name.
#
# The file name, including its directory, comes from Data::JPack's
# next_file_name. Any missing directories are created. Dies if the file
# cannot be opened, instead of leaving a dead handle for later writes.
#
# NOTE(review): callers pass an output directory as the second argument, but
# it is ignored. The directory is always derived from the generated name.
sub open_output_file {
    my ($self) = @_;    # a directory argument may follow; it is unused
    my $name = $self->[jpack_]->next_file_name;

    make_path(dirname($name));
    say STDERR "Opening output file: $name";
    open $self->[out_fh_], ">", $name
        or die "Could not open output file $name: $!";
    return $name;
}
189              
# INTERNAL API
# Print the current progress counters of a pack run to STDERR.
sub stats {
    my $self = shift;
    my $done = $self->[input_done_flag_] ? "YES" : "NO";

    printf STDERR "Message Count: %0d\n", $self->[message_count_];
    printf STDERR "Byte Count: %0d\n",    $self->[byte_size_];
    printf STDERR "Buffer size: %0d\n",   length $self->[out_buffer_];
    printf STDERR "Input Done flag %s\n", $done;
    printf STDERR "Message buffer %d\n",  scalar @{ $self->[messages_] };
}
199              
200              
# Each call to this packs all the files passed.
# All files are added to the current chunk group (output sequence numbering
# applies across the group). Reset the object (e.g. via init) before calling
# this if the data is unrelated.
# Returns a list of the packed files in the group. The paths are relative to
# the html root, so they can be added directly to a container.
#
# pack_files
# Arguments are pairs of source and destination. STDIN is a file with the path of "-".
#
210             #
=head2 pack_files

  my @outputs = $jpacker->pack_files($input1, $output1, $input2, $output2, ...);

Takes a list of (input, output) pairs and converts them according to the
options initially set up for the object. An input of "-" reads standard input.

The output file sequence is calculated for each file, so multiple files can be
added to the output location without knowledge of the current count.

Returns the list of output files written, relative to the html root.

=cut
221              
sub pack_files {
    my $self  = shift;
    my @pairs = @_;
    die "Must have pairs of files (input, output)" if @pairs % 2;

    # Split the flat (src, dst, src, dst, ...) list into parallel lists.
    my @src = @pairs[ map { 2 * $_ }     0 .. @pairs / 2 - 1 ];  # input FastPack data files
    my @dst = @pairs[ map { 2 * $_ + 1 } 0 .. @pairs / 2 - 1 ];  # output locations
    say STDERR "SRCs: ", join(" ", @src);
    say STDERR "DSTs: ", join(" ", @dst);

    my $current_src;
    my $current_dst;
    my $previous_dst;
    my @outputs;
    my $message;

    my $start = time;
    my $now;

    # A previous call leaves input_done_flag_ set. Clear it, otherwise this
    # call would stop after its first pass without reading any input.
    $self->[input_done_flag_] = undef;

    # Main state machine
    while (1) {
        $now = time;
        if (($now - $start) > 1) {
            $start = $now;
            $self->stats;
        }

        if ($self->[ifh_]) {
            # Read more of the open input and decode any complete messages.
            my $read = sysread $self->[ifh_], $self->[in_buffer_],
                $self->[read_size_], length $self->[in_buffer_];
            die "Error reading input $current_src: $!" unless defined $read;

            if ($read) {
                decode_message $self->[in_buffer_], $self->[messages_];
            }
            else {
                # End of this input file.
                $self->[ifh_] = undef;
            }
        }
        elsif ($self->[out_buffer_] or $self->[messages_]->@*) {
            # Input exhausted but data still pending. The message processing
            # and writing below flush it before the next input is opened.
        }
        else {
            # Nothing is buffered, so it is safe to switch input/output files.
            if (@src) {
                $previous_dst = $current_dst;
                $current_src  = shift @src;
                $current_dst  = shift @dst;
                $self->[jpack_]->set_prefix($current_dst);

                if ($current_src eq "-") {
                    say STDERR "Using standard input";
                    $self->[ifh_] = \*STDIN;
                }
                else {
                    say STDERR "Using $current_src input";
                    open $self->[ifh_], "<", $current_src
                        or die "Could not open input file $current_src: $!";
                }

                # Prefix has changed, so close the current output file.
                if (defined($previous_dst) and ($current_dst ne $previous_dst)) {
                    $self->close_output_file;
                    $self->[file_count_] = 0;
                }

                $self->[first_] = 1;    # first message of this input starts a new output file
            }
            else {
                # No more files or data to read.
                $self->[input_done_flag_] = 1;
            }
        }

        # Process decoded messages.
        while ($self->[messages_]->@*) {
            $message = shift $self->[messages_]->@*;

            # Append to the current output file unless this is the first
            # message of an input, or a byte/message limit would be exceeded.
            # The message limit uses '<' so a file holds at most message_limit
            # messages (the count already includes the messages written).
            if (
                !$self->[first_]
                and (!$self->[byte_limit_]
                    || ($self->[byte_size_] + $message->[FP_MSG_TOTAL_LEN]) < $self->[byte_limit_])
                and (!$self->[message_limit_]
                    || $self->[message_count_] < $self->[message_limit_])
            ) {
                if ($self->[jpack_]) {
                    # Serialise the message, then JPack-encode it.
                    my $buf = "";
                    encode_message $buf, [$message];
                    $self->[out_buffer_] .= $self->[jpack_]->encode_data($buf);
                }
                else {
                    # Plain FastPack serialisation.
                    encode_message $self->[out_buffer_], [$message];
                }
                $self->[message_count_]++;
                $self->[byte_size_] += $message->[FP_MSG_TOTAL_LEN];
            }
            else {
                # Roll over to a new output file.
                $self->close_output_file;

                my $f = $self->open_output_file($self->[html_root_] . "/" . $current_dst);

                # Report the file relative to the html root.
                push @outputs, abs2rel(rel2abs($f), $self->[html_root_]);

                if ($self->[jpack_]) {
                    # JPack header first, then the encoded message.
                    $self->[out_buffer_] = $self->[jpack_]->encode_header;
                    my $buf = "";
                    encode_message $buf, [$message];
                    $self->[out_buffer_] .= $self->[jpack_]->encode_data($buf);
                }
                else {
                    # Directly append the encoded FastPack message.
                    encode_message $self->[out_buffer_], [$message];
                }

                $self->[message_count_] = 1;
                $self->[byte_size_]     = $message->[FP_MSG_TOTAL_LEN];
                $self->[first_]         = undef;
            }

            # Go write output when the buffer is full or the input is done.
            last if !$self->[ifh_]
                or length($self->[out_buffer_]) >= $self->[write_threshold_];
        }

        # Write out everything encoded so far. Handle short writes, and die on
        # I/O errors instead of looping forever on an undef return.
        while (length $self->[out_buffer_]) {
            my $written = syswrite $self->[out_fh_], $self->[out_buffer_];
            die "Error writing output file: $!" unless defined $written;
            substr($self->[out_buffer_], 0, $written, "");
        }

        # Input finished and all its messages flushed: finalise the output file.
        if (!$self->[ifh_] and !$self->[messages_]->@*) {
            $self->close_output_file;
        }

        last if $self->[input_done_flag_] and !$self->[messages_]->@*;
    }

    $self->close_output_file;
    return @outputs;
}
432             1;
433              
434              
=head1 AUTHOR

Ruben Westerberg, E<lt>drclaw@mac.comE<gt>

=head1 REPOSITORY and BUGS

Please report any bugs via the project's GitHub issue tracker.
442              
443              
444             =head1 COPYRIGHT AND LICENSE
445              
446             Copyright (C) 2023 by Ruben Westerberg
447              
448             This library is free software; you can redistribute it and/or modify it under
449             the same terms as Perl or the MIT license.
450              
451             =head1 DISCLAIMER OF WARRANTIES
452              
453             THIS PACKAGE IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED WARRANTIES,
454             INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
455             FITNESS FOR A PARTICULAR PURPOSE.
456              
457             =cut
458