File Coverage

blib/lib/App/RecordStream/InputStream.pm
Criterion Covered Total %
statement 84 93 90.3
branch 26 38 68.4
condition 3 8 37.5
subroutine 18 18 100.0
pod 4 12 33.3
total 135 169 79.8


line stmt bran cond sub pod time code
1             package App::RecordStream::InputStream;
2              
3             =head1 NAME
4              
5             App::RecordStream::InputStream
6              
7             =head1 AUTHOR
8              
9             Benjamin Bernard
10             Keith Amling
11              
12             =head1 DESCRIPTION
13              
14             This module will generate an stream of App::RecordStream::Record objects for given inputs.
15              
16             =head1 SYNOPSIS
17              
18             use App::RecordStream::InputStream;
19             my $stream = App::RecordStream::InputStream(STRING => $recs_string);
20              
21             while ( my $record = $stream->get_record() ) {
22             ... do stuff ...
23             }
24              
25             =head1 CONSTRUCTOR
26              
27             =over 4
28              
29             =item my $in = App::RecordStream::InputStream->new(OPTIONS);
30              
31             The input stream takes named parameters, it will take one of: FILE, STRING, or FH
32             (a file handle).
33              
34             FILE - Name of a file, must be readable
35             STRING - String of new line separated records
36             FH - File handle to a stream of data
37              
38             Optionally, it wil take a NEXT argument. The NEXT argument should be another
39             InputStream object. Once the returned object reaches the end of its string, it
40             will get records from the NEXT App::RecordStream::InputStream. In this manner, InputStream
41             objects can be chained
42              
43             returns an instance of InputStream
44              
45             =item my $in = App::RecordStream::InputStream->new_magic()
46              
47             Provides GNU-style input semantics for scripts. If there are arguments left in
48             @ARGV, it will assume those are file names and make a set of chained streams
49             for those files, returning the first stream. If no files are specified, will
50             open an InputStream on STDIN
51              
52             =item my $in = App::RecordStream::InpustStream->new_from_files(FILES)
53              
54             Takes an array of FILES and constructs a set of chained streams for those
55             files. Returns the first stream
56              
57             =back
58              
59              
60             =head1 PUBLIC METHODS
61              
62             =over 4
63              
64             =item my $record = $this->get_record();
65              
66             Retrieve the next L from the stream. Will return a false value
67             if no records are available. If this stream has a NEXT stream specified in the
68             constructor, this will continue to return Record objects until all chained
69             streams are exhausted
70              
71             =back
72              
73             =cut
74              
75             our $VERSION = "4.0.24";
76              
77 46     46   104661 use strict;
  46         103  
  46         1316  
78 46     46   259 use warnings;
  46         103  
  46         1278  
79              
80 46     46   13882 use IO::String;
  46         149054  
  46         1616  
81 46     46   14820 use JSON::MaybeXS;
  46         282720  
  46         2925  
82              
83 46     46   15698 use App::RecordStream::Record;
  46         154  
  46         40035  
84             require App::RecordStream::Operation;
85              
86             my $json = JSON->new;
87              
88             my $ONE_OF = [qw(FH STRING FILE)];
89              
90             my $ARGUMENTS = {
91             FH => 0,
92             STRING => 0,
93             FILE => 0,
94             NEXT => 0,
95             };
96              
97             sub new_magic {
98 1     1 1 3 my $class = shift;
99 1   50     6 my $files = shift || \@ARGV;
100              
101 1 50       4 if ( scalar @$files > 0 ) {
102 1         4 return $class->new_from_files($files);
103             }
104              
105 0         0 return $class->new(FH => \*STDIN);
106             }
107              
108             sub new_from_files {
109 2     2 1 5 my $class = shift;
110 2         5 my $files = shift;
111              
112 2         2 my $last_stream;
113              
114 2         6 foreach my $file ( reverse @$files ) {
115 4 50 33     78 unless ( -e $file && -r $file ) {
116 0         0 die "File does not exist or is not readable: $file\n";
117             }
118              
119 4         19 my $new_stream = $class->new(FILE => $file, NEXT => $last_stream);
120 4         10 $last_stream = $new_stream;
121             }
122              
123 2         11 return $last_stream;
124             }
125              
126             sub new {
127 255     255 1 775 my $class = shift;
128 255         895 my %args = @_;
129              
130 255         842 my $this = {};
131              
132 255         1155 foreach my $key (keys %$ARGUMENTS) {
133 1020         1749 my $value = $args{$key};
134 1020         2090 $this->{$key} = $value;
135              
136 1020 50       2445 if ( $ARGUMENTS->{$key} ) {
137 0 0       0 die "Did not supply required argument: $key" unless ( $value );
138             }
139             }
140              
141 255         710 bless $this, $class;
142              
143 255         965 $this->_init();
144 255         1485 return $this;
145             }
146              
147             sub _init {
148 255     255   487 my $this = shift;
149              
150 255         528 my $found = {};
151              
152 255         632 foreach my $arg (@$ONE_OF) {
153 765 100       2114 if ( $this->{$arg} ) {
154 255         740 $found->{$arg} = $this->{$arg};
155             }
156             }
157              
158 255 50       945 if ( scalar keys %$found > 1 ) {
159 0         0 die "Must specify only one of " . join(' ', keys %$found);
160             }
161              
162 255 50       813 unless ( scalar keys %$found == 1 ) {
163 0         0 die "Must specify one of " . join(' ', @$ONE_OF);
164             }
165              
166 255 100       778 if ( $this->get_string() ) {
167 238         624 $this->{'FH'} = IO::String->new($this->get_string());
168             }
169              
170 255         13371 my $file = $this->get_file();
171 255 100       1040 if ( $file ) {
172 10 50       318 open(my $fh, '<', $file) or die "Cannot open $file: $!";
173 10         46 $this->{'FH'} = $fh;
174             }
175             }
176              
177             sub get_file {
178 377     377 0 760 my $this = shift;
179 377         1061 return $this->{'FILE'};
180             }
181              
182             sub get_string {
183 611     611 0 999 my $this = shift;
184 611         3053 return $this->{'STRING'};
185             }
186              
187             # Performance! :(
188             sub get_fh {
189 1498     1498 0 2738 return $_[0]->{'FH'};
190             }
191              
192             sub get_record {
193 1502     1502 1 4901 my $this = shift;
194              
195 1502 100       2743 if ( $this->is_done() ) {
196 4         9 return $this->call_next_record();
197             }
198              
199 1498         3082 my $fh = $this->get_fh();
200              
201 1498         6390 my $line = <$fh>;
202              
203 1498 100       34337 if ( ! $line ) {
204 245         1111 close $fh;
205 245         3182 $this->set_done();
206              
207             # This is ugly, reaching into the other class
208 245         635 App::RecordStream::Operation::set_current_filename($this->get_filename());
209              
210 245         663 return $this->call_next_record();
211             }
212              
213             # Direct bless done in the name of performance
214 1253         6948 my $record = $json->decode($line);
215 1253         2815 bless $record, 'App::RecordStream::Record';
216              
217 1253         3989 return $record;
218             }
219              
220             sub call_next_record {
221 249     249 0 450 my $this = shift;
222              
223 249         529 my $next = $this->get_next();
224              
225 249 100       672 unless ( $next ) {
226 243         987 return undef;
227             }
228              
229             # Prevent a deep recursion with many passed files
230 6 50 33     18 if ( $next && $next->is_done() ) {
231 0         0 $next = $next->get_next();
232 0         0 $this->{'NEXT'} = $next;
233             }
234              
235 6         16 return $next->get_record();
236             }
237              
238             sub get_filename {
239 365     365 0 702 my $this = shift;
240              
241 365 100       918 if ( ! $this->is_done() ) {
    100          
242 120 100       395 return $this->get_file() if ( $this->get_file() );
243 118 50       348 return 'STRING_INPUT' if ( $this->get_string() );
244 0 0       0 return 'STREAM_INPUT' if ( $this->get_fh() );
245 0         0 return 'UNKNOWN';
246             }
247             elsif ( $this->get_next() ) {
248 2         6 return $this->get_next()->get_filename();
249             }
250              
251             }
252              
253             sub get_next {
254 496     496 0 835 my $this = shift;
255 496         1848 return $this->{'NEXT'};
256             }
257              
258             sub is_done {
259 1874     1874 0 4872 return $_[0]->{'DONE'};
260             }
261              
262             sub set_done {
263 245     245 0 481 my $this = shift;
264 245         602 $this->{'DONE'} = 1;
265             }
266              
267             1;