File Coverage

blib/lib/CSV/HistoryPlayer.pm
Criterion Covered Total %
statement 83 84 98.8
branch 17 18 94.4
condition 4 6 66.6
subroutine 14 14 100.0
pod 2 2 100.0
total 120 124 96.7


line stmt bran cond sub pod time code
1             package CSV::HistoryPlayer;
2              
3 1     1   25784 use List::MoreUtils qw(uniq);
  1         7268  
  1         4  
4 1     1   914 use Moo;
  1         9244  
  1         5  
5 1     1   1017 use Path::Tiny;
  1         1  
  1         36  
6 1     1   561 use Text::CSV;
  1         8626  
  1         71  
7              
8 1     1   6 use strict;
  1         1  
  1         21  
9 1     1   2 use warnings;
  1         1  
  1         25  
10 1     1   466 use namespace::clean;
  1         7554  
  1         4  
11              
12             our $VERSION = '0.03';
13              
14             =head1 NAME
15              
16             =encoding utf8
17              
18             CSV::HistoryPlayer - Plays scattered CSV files with historic data
19              
20             =head1 STATUS
21              
22             =begin HTML
23              
24            

25            
26            

27              
28             =end HTML
29              
30              
31             =head1 SYNOPSYS
32              
33             use CSV::HistoryPlayer;
34              
35             my $player = CSV::HistoryPlayer->(root_dir => 'path/to/directory');
36             while (my $data = $player->poll) {
37             my ($file, $row) = @$data;
38             print "event occured at ", $row->[0], "\n";
39             }
40              
41             =head1 DESCRIPTION
42              
43             Let's assume you have many of CSV-files, each one has some events
44             written in it (in the B column in form of unix timestamp) and
45             filenames also have encoded date of the events within, i.e.
46              
47             ├── income
48             │   ├── 2015-02-10.csv
49             │   └── 2015-02-12.csv
50             └── outcome
51             ├── 2015-02-11.csv
52             └── 2015-02-12.csv
53              
54             Let's assume, that the files have content like:
55              
56             income/2015-02-10.csv: 1455106611, 10, "got pocket money from Mom"
57             income/2015-02-12.csv: 1455301001, 15, "got pocket money from Dad"
58             outcome/2015-02-11.csv: 1455203801, 10, "bought Immortal CD (black metal)"
59             outcome/2015-02-12.csv: 1455307400, 10, "bought Obsidian Gate CD (black metal)"
60              
61             Now, you would to replay all transactions. That's easy
62              
63             use CSV::HistoryPlayer;
64              
65             my $player = CSV::HistoryPlayer->(root_dir => 'path/to/directory');
66             while (my $data = $player->poll) {
67             my ($file, $row) = @$data;
68             my ($when, $how_much, $description) = @$row;
69             my $sign = $file =~ /income/ ? '+' : '-';
70             print $sign, " ", $how_much, '$: ', $description, "\n";
71             }
72              
73             # +10$: got pocket money from Mom
74             # -10$: bought Immortal CD (black metal)
75             # +15$: got pocket money from Dad
76             # -10$: bought Obsidian Gate CD (black metal)
77              
78             I.e. the L virtually unites scattered CSV files,
79             and allows to read evens from them in historically correct order.
80              
81              
82             =head1 ATTRIBUTES
83              
84             =over 2
85              
86             =item * C
87              
88             The root directory, where the csv files should be searched from.
89             This attribute is B.
90              
91             =item * C
92              
93             The closure, which allows to filter out unneeded directories,
94             in the file scan phase to do not include csv-files within
95              
96             my $player = CSV::HistoryPlayer->(
97             ...,
98             # if returns true, than dir will be scanned for csv-files
99             dir_filter => sub { $_[0] =~ /income/ },
100             );
101              
102             By default, all found directories are allowed to be scanned
103             for CSV-files.
104              
105             =item * C
106              
107             The closure, which allows to do custom sort and filtering of found
108             CSV-files in historical order.
109              
110             By default CSV-files are lexically sorted and not filtered.
111              
112             For example, if there are files C<3-Jan-16.csv>, C<4-Jan-16.csv>,
113             ..., they can be sorted with L
114              
115             files_maper => sub {
116             my $orig_files = shift;
117             my @files =
118             map { $_->{file} }
119             sort { $a->{epoch} <=> $b->{epoch} }
120             map {
121             my $date = /(.*\/)(.+)/ ? $2 : die("wrong filename in $_");
122             {
123             file => $_,
124             epoch => Date::Utility->new($date)->epoch,
125             }
126             } @$orig_files;
127             return \@files;
128             }
129              
130              
131             =item * C
132              
133             Returns historically sorted list of found CSV-files; each item in
134             the list is L instance.
135              
136             =back
137              
138             =head1 METHODS
139              
140             =over 2
141              
142             =item * C
143              
144             Returns the reference to the current pointer in the i CSV-file
145             and the actual file.
146              
147             Initially it points to the earliest row of the historically first file.
148             If there are many concurrent files, than the earliest row of them is returned.
149              
150             If end of i CSV-file is reached, then C is returned
151              
152             my $data = $player->peak;
153             if ($data) {
154             my ($file, $row) = @$data;
155             }
156              
157             =item * C
158              
159             The same as C method, but after return of the current row in
160             the i CSV-file, it moves the pointer to the next row.
161             Designed to serve as iterator,
162              
163             while (my $data = $player->poll) {
164             my ($file, $row) = @$data;
165             }
166              
167             =back
168              
169             =head1 ASSUMPTIONS
170              
171             =over 2
172              
173             =item * Same filenames for the same timeframe
174              
175             CSV-files aggregate events on some time-frame (i.e. one day, one hour,
176             one week etc.). The L does not sort content of
177             files due to performance reasons. Than means, if you have files, organized
178             like:
179              
180             event-a/date_1.csv
181             event-b/date_2.csv
182              
183             and C and C intersects, then they should have exactly
184             the same name, e.g.:
185              
186             event-a/3-Jan-16.csv
187             event-b/3-Jan-16.csv
188              
189             to be replayed correctly.
190              
191              
192             =item * unix timestamp is the first column in CSV-files
193              
194             =item * CSV-files are opened with the defaults of L
195              
196             =back
197              
198             =head1 SEE ALSO
199              
200             L, L
201              
202             =head1 SOURCE CODE
203              
204             L
205              
206              
207             =cut
208              
209             has 'root_dir' => (
210             is => 'ro',
211             required => 1
212             );
213              
214             has 'dir_filter' => (
215             is => 'ro',
216             default => sub {
217             return sub { 1 }
218             });
219              
220             has 'files_mapper' => (
221             is => 'ro',
222             default => sub {
223             return sub {
224             my $files = shift;
225             return [sort { $a cmp $b } @$files];
226             }
227             });
228              
229             has 'files' => (is => 'lazy');
230              
231             has _current_data => (is => 'rw');
232              
233             has '_reader' => (is => 'lazy');
234              
235             sub _build_files {
236 6     6   357 my $self = shift;
237              
238 6         6 my @files;
239 6         22 my @dirs_queue = (path($self->root_dir));
240 6         152 my $dir_filter = $self->dir_filter;
241 6         14 while (@dirs_queue) {
242 38         216 my $dir = shift(@dirs_queue);
243 38 100       48 if ($dir_filter->($dir)) {
244 37         226 for my $c ($dir->children) {
245 50 100       2344 push @dirs_queue, $c if (-d $c);
246 50 100 66     562 push @files, $c
247             if ($c =~ /\.csv$/i && -s -r -f _);
248             }
249             }
250             }
251 6         66 my $sorted_files = $self->files_mapper->(\@files);
252 6         49 return $sorted_files;
253             }
254              
255             sub _build__reader {
256 3     3   377 my $self = shift;
257 3         50 my $files = $self->files;
258 3         6 my $clusters = [uniq map { $_->basename } @$files];
  9         98  
259 3         57 my $cluster_idx = -1;
260 3         4 my @cluster_fds;
261             my @cluster_csvs;
262 0         0 my @cluser_files;
263              
264             my $open_cluster = sub {
265 6     6   7 my $cluster_id = $clusters->[$cluster_idx];
266 6         7 @cluser_files = grep { $_->basename eq $cluster_id } @$files;
  21         93  
267 6         52 @cluster_fds = ();
268 6         24 @cluster_csvs = ();
269 6         7 for my $cf (@cluser_files) {
270 9 50       45 my $csv = Text::CSV->new({binary => 1})
271             or die "Cannot use CSV: " . Text::CSV->error_diag();
272 9         629 my $fh = $cf->filehandle("<");
273 9         651 push @cluster_fds, $fh;
274 9         14 push @cluster_csvs, $csv;
275             }
276 3         12 };
277              
278 3         3 my @lines;
279             my $read_line_from_cluster = sub {
280             REDO: {
281              
282             # make sure that we read last line from all cluster files
283 30     30   164 for my $idx (0 .. @cluster_fds - 1) {
  30         65  
284 44 100 66     379 if (!defined $lines[$idx] && !$cluster_csvs[$idx]->eof) {
285 1     1   7 $lines[$idx] =
  1         1  
  1         30  
  30         678  
286             $cluster_csvs[$idx]->getline($cluster_fds[$idx]);
287             }
288             }
289              
290             # we assume that timestamp is the 1st column
291             my @ordered_idx =
292 11         34 sort { $lines[$a]->[0] <=> $lines[$b]->[0] }
293 30         503 grep { defined $lines[$_] } (0 .. @lines - 1);
  47         101  
294 30 100       46 if (@ordered_idx) {
295 21         20 my $idx = shift @ordered_idx;
296 21         19 my $line = $lines[$idx];
297 21         15 my $file = $cluser_files[$idx];
298 21         43 $self->_current_data([$file, $line]);
299 21         26 $lines[$idx] = undef;
300             } else {
301 9 100       21 if ($cluster_idx < @$clusters - 1) {
302 6         12 $open_cluster->(++$cluster_idx);
303 6         48 goto REDO;
304             }
305             }
306             }
307 3         25 };
308              
309 3         11 return $read_line_from_cluster;
310             }
311              
312             sub peek {
313 3     3 1 10 my $self = shift;
314 3 100       23 return $self->_current_data if $self->_current_data;
315 2         27 $self->_reader->();
316 2         9 return $self->_current_data;
317              
318             }
319              
320             sub poll {
321 24     24 1 11968 my $self = shift;
322 24         42 my $result = $self->_current_data;
323 24 100       45 if (!$result) {
324 22         439 $self->_reader->();
325 22         32 $result = $self->_current_data;
326             }
327 24         25 $self->_current_data(undef);
328 24         53 return $result;
329             }
330              
331             =head1 AUTHOR
332              
333             binary.com, C<< >>
334              
335             =head1 BUGS
336              
337             Please report any bugs or feature requests to
338             L.
339              
340             =head1 LICENSE AND COPYRIGHT
341              
342             Copyright (C) 2016 binary.com
343              
344             This program is free software; you can redistribute it and/or modify it
345             under the terms of the the Artistic License (2.0). You may obtain a
346             copy of the full license at:
347              
348             L
349              
350             Any use, modification, and distribution of the Standard or Modified
351             Versions is governed by this Artistic License. By using, modifying or
352             distributing the Package, you accept this license. Do not use, modify,
353             or distribute the Package, if you do not accept this license.
354              
355             If your Modified Version has been derived from a Modified Version made
356             by someone other than you, you are nevertheless required to ensure that
357             your Modified Version complies with the requirements of this license.
358              
359             This license does not grant you the right to use any trademark, service
360             mark, tradename, or logo of the Copyright Holder.
361              
362             This license includes the non-exclusive, worldwide, free-of-charge
363             patent license to make, have made, use, offer to sell, sell, import and
364             otherwise transfer the Package with respect to any patent claims
365             licensable by the Copyright Holder that are necessarily infringed by the
366             Package. If you institute patent litigation (including a cross-claim or
367             counterclaim) against any party alleging that the Package constitutes
368             direct or contributory patent infringement, then this Artistic License
369             to you shall terminate on the date that such litigation is filed.
370              
371             Disclaimer of Warranty: THE PACKAGE IS PROVIDED BY THE COPYRIGHT HOLDER
372             AND CONTRIBUTORS "AS IS' AND WITHOUT ANY EXPRESS OR IMPLIED WARRANTIES.
373             THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
374             PURPOSE, OR NON-INFRINGEMENT ARE DISCLAIMED TO THE EXTENT PERMITTED BY
375             YOUR LOCAL LAW. UNLESS REQUIRED BY LAW, NO COPYRIGHT HOLDER OR
376             CONTRIBUTOR WILL BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, OR
377             CONSEQUENTIAL DAMAGES ARISING IN ANY WAY OUT OF THE USE OF THE PACKAGE,
378             EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
379              
380             =cut
381              
382             1;