File Coverage

blib/lib/ODS/Storage/Directory.pm
Criterion Covered Total %
statement 208 243 85.6
branch 46 68 67.6
condition 14 21 66.6
subroutine 33 39 84.6
pod 0 27 0.0
total 301 398 75.6


line stmt bran cond sub pod time code
1             package ODS::Storage::Directory;
2              
3 68     68   368 use YAOO;
  68         81  
  68         426  
4 68     68   26921 use Cwd qw/getcwd/;
  68         162  
  68         3768  
5 68     68   39738 use Parallel::ForkManager;
  68         3732715  
  68         3979  
6              
7             extends 'ODS::Storage::Base';
8              
9 68     68   600 use ODS::Utils qw/load move unique_class_name error write_directory/;
  68         24  
  68         947  
10              
11             auto_build;
12              
13             has file_handle => isa(fh);
14              
15             has directory => isa(string);
16              
17             has cache_directory => isa(string);
18              
19             has remove_regex => isa(string("\'\"\."));
20              
21             sub all {
22 101     101 0 2283 my ($self, %params) = @_;
23              
24 101   50     2065 $params{type} ||= 'all';
25 101   66     830 $params{sort} ||= $self->table->keyfield;
26 101   100     2229 $params{sort_direction} ||= 'asc';
27              
28 101         720 my ($data, $from_cache) = $self->into_rows($self->cache_or_all(%params));
29              
30 38         11309 return $data;
31             }
32              
33             sub create {
34 1717 50   1717 0 21521 my ($self, %params) = (shift, @_ > 1 ? @_ : %{ $_[0] });
  1717         12425  
35              
36 1717   66     11780 my $file = $params{__custom_file_name} || sprintf '%s_%s.%s', time, unique_class_name, $self->serialize_class->file_suffix;
37              
38 1717         109381 $params{__file} = $file;
39              
40 1717         4735 $file .= '.tmp';
41              
42 1717         7588 my $data = $self->into_rows(\%params, 1);
43              
44 1717         20439 $data->validate();
45              
46 1717 100       9414 if ($self->table->rows) {
47 1715         18697 push @{ $self->table->rows }, $data;
  1715         2903  
48             } else {
49 2 50 50     35 $self->table->rows(ref($data || "") eq 'ARRAY' ? $data : [$data]);
50             }
51              
52 1717         18387 $data = $self->into_storage($data);
53              
54 1717         13341 $self->write_file(sprintf("%s/%s", $self->directory, $file), $data);
55              
56 1717         279230 $self->cache_clear();
57              
58 1717         5458 $self->table;
59             }
60              
61             sub search {
62 2 50   2 0 85 my ($self, %params) = (shift, @_ > 1 ? @_ : %{ $_[0] });
  0         0  
63              
64 2         21 my $cache_prefix = $self->cache_prefix('search', %params);
65              
66 2         16 my ($data, $from_cache) = $self->cache_or_filter($cache_prefix, %params);
67              
68 2 100 66     48 if (ref $data eq 'ARRAY' && ref $data->[0] eq 'HASH') {
69 1         4 $data = [ map { $self->into_rows($_) } @{ $self->into_rows($data) } ];
  1         37  
  1         9  
70             }
71              
72 2         23 my $table = $self->table->clone();
73 2         12 $table->rows($data);
74 2         120 return ODS::Iterator->new(table => $table);
75             }
76              
77             sub find {
78 2 50   2 0 52 my ($self, %params) = (shift, @_ > 1 ? @_ : %{ $_[0] });
  0         0  
79              
80 2         15 my $cache_prefix = $self->cache_prefix('find', %params);
81              
82 2         23 my ($data, $from_cache) = $self->cache_or_find($cache_prefix, %params);
83              
84 2 50       12 if (ref $data eq 'HASH') {
85 0         0 $data = $self->into_rows($data);
86             }
87              
88 2         14 return $data;
89             }
90              
91             sub update {
92 0     0 0 0 my ($self, $update, %params) = (shift, pop, @_);
93              
94 0         0 my $find = $self->find(%params);
95              
96 0 0       0 croak sprintf "No row found for search params %s", Dumper \%params
97             unless $find;
98              
99 0         0 $find->validate($update);
100              
101 0         0 $self->update_row($find);
102             }
103              
104             sub update_row {
105 3     3 0 33 my ($self, $row) = @_;
106              
107 3         14 my $data = $self->into_storage($row);
108              
109 3   33     16 my $file = $row->__custom_file_name || $row->__file;
110              
111 3         16 $self->write_file(sprintf("%s/%s", $self->directory, $file), $data);
112              
113 3         318 $self->table;
114             }
115              
116             sub delete {
117 1 50   1 0 22 my ($self, %params) = (shift, @_ > 1 ? @_ : %{ $_[0] });
  1         5  
118              
119 1 50       5 my $data = $self->table->rows ? ODS::Iterator->new(table => $self->table) : $self->all;
120              
121             my $index = $data->find_index(sub {
122 2     2   4 my $row = shift;
123 2         4 my $select = 1;
124 2         6 for my $key ( keys %params ) {
125 2 100       10 if ( $params{$key} ne $row->{$key} ) {
126 1         3 $select = undef;
127 1         2 last;
128             }
129             }
130 2         7 $select;
131 1         229 });
132              
133 1         21 my $delete = $data->splice($index, 1);
134              
135 1         111 my $file = $delete->__file;
136              
137 1         6 $self->unlink_file(sprintf("%s/%s", $self->directory, $file));
138              
139 1         10 $self->cache_clear($delete);
140              
141 1         7 $self->table;
142             }
143              
144             sub delete_row {
145 0     0 0 0 my ($self, $r) = @_;
146              
147 0         0 my $data = ODS::Iterator->new(table => $self->table);
148              
149 0         0 my $keyfield = $data->table->keyfield;
150              
151 0         0 my $index;
152 0 0       0 if ($keyfield) {
153             $index = $data->find_index(sub {
154 0     0   0 $_[0]->{$keyfield} eq $r->$keyfield;
155 0         0 });
156             } else {
157             $index = $data->find_index(sub {
158 0     0   0 my $row = shift;
159 0         0 my $select = 1;
160 0         0 for my $key ( keys %{ $row->columns } ) {
  0         0  
161 0 0       0 if ( $r->$key ne $row->{$key} ) {
162 0         0 $select = undef;
163 0         0 last;
164             }
165             }
166 0         0 $select;
167 0         0 });
168             }
169              
170 0         0 my $delete = $data->splice($index, 1);
171              
172 0         0 my $file = $delete->__file;
173              
174 0         0 $self->unlink_file(sprintf("%s/%s", $self->directory, $file));
175              
176 0         0 $self->cache_clear($delete);
177              
178 0         0 $self->table;
179             }
180              
181             sub parse_data_format {
182 0     0 0 0 my ($self, $data) = @_;
183 0         0 return $self->serialize_class->parse($data);
184             }
185              
186             sub stringify_data_format {
187 1720     1720 0 3385 my ($self, $data) = @_;
188 1720         4457 return $self->serialize_class->stringify($data);
189             }
190              
191             # methods very much specific to files
192              
193             sub directory_files_last_updated {
194 105     105 0 255 my ($self) = @_;
195 105         455 my $files = $self->read_directory($self->directory);
196 105         2849 (my $last_update = $files->[-1]) =~ s/(\d+).*/$1/;
197 105         785 return ($files, $last_update);
198             }
199              
200             sub cache_write {
201 40     40 0 468 my ($self, $type, $data) = @_;
202 40         604 my $file = sprintf "%s/%s__%s.%s.tmp", $self->cache_directory, $type, time, $self->serialize_class->file_suffix;
203 40         2398 $self->write_file($file, $data);
204             }
205              
206             sub cache_clear {
207 1718     1718 0 7752 my ($self, $row) = @_;
208 1718         5152 my $files = $self->read_directory($self->cache_directory);
209 1718         3968 for (@{$files}) {
  1718         5689  
210 5         20 my %file_params = $self->cache_parse_filename($_);
211 5         10 my $clear = 1;
212 5 100       18 if (scalar keys %file_params) {
213             PARAM:
214 3         8 for my $key ( keys %file_params ) {
215 5 100       45 next PARAM if $key =~ m/^__/;
216 3 100 100     30 if (!$row || $row->$key ne $file_params{$key}) {
217 2         49 $clear = 0;
218 2         6 last PARAM;
219             }
220             }
221             }
222 5 100       25 $self->unlink_file(sprintf("%s/%s", $self->cache_directory, $_))
223             if $clear;
224             }
225             }
226              
227             sub cache_parse_filename {
228 5     5 0 15 my ($self, $name) = @_;
229 5         10 my %file;
230 5 100       36 return %file unless $name =~ s/^find__//;
231 3         13 my @parts = split "__", $name;
232 3         23 ($file{__create_time} = pop @parts) =~ s/\.\w+$//;
233 3         10 for (@parts) {
234 3         12 my ($key, $value) = split "_", $_;
235 3         22 $file{$key} = $value;
236             }
237 3         20 return %file;
238             }
239              
240              
241             sub cache_prefix {
242 105     105 0 346 my ($self, $type, %args) = @_;
243              
244 105         534 my $regex = $self->remove_regex;
245 105         1253 for my $key ( keys %args ) {
246 206         2352 (my $value = $args{$key}) =~ s/$regex//g;
247 206         815 $type .= sprintf('__%s_%s', $key, $value);
248             }
249              
250 105         426 return $type;
251             }
252              
253             sub cache_file {
254 105     105 0 320 my ($self, $type) = @_;
255              
256             my @cache_file = grep {
257 40         958 $_ =~ m/^$type/;
258 105         308 } @{ $self->read_directory($self->cache_directory) };
  105         526  
259              
260 105 100       575 return scalar @cache_file ? sprintf( "%s/%s", $self->cache_directory, $cache_file[0]) : undef;
261             }
262              
263             sub cache_or_all {
264 101     101 0 452 my ($self, %args) = @_;
265              
266 101         331 my $type = delete $args{type};
267              
268 101         471 my $file_prefix = $self->cache_prefix($type, %args);
269              
270 101         856 my ($files, $last_update) = $self->directory_files_last_updated();
271              
272 101         566 my $cache_file = $self->cache_file($file_prefix);
273              
274 101 100       463 if ($cache_file) {
275 1         6 return ($self->serialize_class->parse(
276             $self->read_file($cache_file)
277             ), 1);
278             }
279              
280 100         2508 my $fm = Parallel::ForkManager->new(5000);
281              
282 100         495932 my @data;
283             $fm->run_on_finish(sub {
284 907     907   358291213 my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_;
285              
286 907         4590 push @data, $data_structure_reference;
287              
288 100         2293 });
289              
290             READ_FILE:
291 100         1395 for my $file (@{$files}) {
  100         290  
292 1785 100       17212576 my $pid = $fm->start and next READ_FILE;
293              
294 63         1819837 my $d = $self->serialize_class->parse(
295             $self->read_file(sprintf("%s/%s", $self->directory, $file))
296             );
297              
298 63         35431 $d->{__file} = $file;
299 63 100       4132 if ($file !~ m/^\d{9}\d+/) {
300 2         51 $d->{__custom_file_name} = $file;
301             }
302              
303              
304 63         21491 $fm->finish(0, $d);
305             }
306              
307 37         1314772 $fm->wait_all_children;
308              
309 37 100       1041 if ($args{sort_direction} eq 'asc') {
310 36         603 @data = sort { $a->{$args{sort}} cmp $b->{$args{sort}} } @data;
  3243         7622  
311             } else {
312 1         22 @data = sort { $b->{$args{sort}} <=> $a->{$args{sort}} } @data;
  106         559  
313             }
314 37         1655 $self->cache_write($file_prefix, $self->serialize_class->stringify(\@data));
315              
316 37         9850 return \@data;
317             }
318              
319             sub cache_or_filter {
320 2     2 0 10 my ($self, $type, %params) = @_;
321              
322 2         12 my ($files, $last_update) = $self->directory_files_last_updated();
323              
324 2         15 my $cache_file = $self->cache_file($type);
325              
326 2 100       22 if ($cache_file) {
327 1         7 return $self->serialize_class->parse(
328             $self->read_file($cache_file)
329             );
330             }
331              
332 1 50       6 my $data = $self->table->rows ? ODS::Iterator->new(table => $self->table) : $self->all();
333              
334             my $select = $data->filter(sub {
335 3     3   7 my $row = shift;
336 3         6 my $select = 1;
337 3         9 for my $key ( keys %params ) {
338 3 100       14 if ( $params{$key} ne $row->{$key} ) {
339 2         3 $select = undef;
340 2         6 last;
341             }
342             }
343 3         25 $select;
344 1         292 });
345              
346 1         10 $self->cache_write($type, $self->serialize_class->stringify([ map { $_->as_hash } @{$select}]));
  1         5  
  1         10  
347              
348 1         8361 return $select;
349             }
350              
351             sub cache_or_find {
352 2     2 0 10 my ($self, $type, %params) = @_;
353              
354 2         29 my ($files, $last_update) = $self->directory_files_last_updated();
355              
356 2         11 my $cache_file = $self->cache_file($type);
357              
358 2 50       11 if ($cache_file) {
359 0         0 return $self->serialize_class->parse(
360             $self->read_file($cache_file)
361             );
362             }
363              
364 2 50       11 my $data = $self->table->rows ? ODS::Iterator->new(table => $self->table) : $self->all;
365              
366             # this only works for JSON and YAML, CSS and JSONL we can stream/read rows/lines instead of reading/loading
367             # all into memory.
368             my $select = $data->find(sub {
369 3     3   6 my $row = shift;
370 3         6 my $select = 1;
371 3         12 for my $key ( keys %params ) {
372 3 100       14 if ( $params{$key} ne $row->{$key} ) {
373 1         3 $select = undef;
374 1         4 last;
375             }
376             }
377 3         9 $select;
378 2         559 });
379              
380 2 50       27 $self->cache_write($type, $self->serialize_class->stringify($select->as_hash))
381             if ($select);
382              
383 2         436 return $select;
384             }
385              
386             sub open_file {
387 65     65 0 3254 my ($self, $file) = @_;
388 2 50   2   4920 open my $fh, '<:encoding(UTF-8)', $file or die "Cannot open file $file for reading: $!";
  2         44  
  2         44  
  65         417897  
389 65         648282 return $fh;
390             }
391              
392             sub open_write_file {
393 1760     1760 0 2950 my ($self, $file) = @_;
394 1760         6222 write_directory($file, 1);
395 1760 50   6   255556 open my $fh, '>:encoding(UTF-8)', $file or die "Cannot open file $file for writing: $!";
  6         6460  
  6         106  
  6         67  
396 1760         163176 return $fh;
397             }
398              
399             sub seek_file {
400 0     0 0 0 my ($self, @args) = @_;
401 0 0       0 @args = (0, 0) if (!scalar @args);
402 0         0 seek $self->file_handle, shift @args, shift @args;
403             }
404              
405             sub read_file {
406 65     65 0 65075 my ($self, $file) = @_;
407 65         3159 my $fh = $self->open_file($file);
408 65         46477 my $data = do { local $/; <$fh> };
  65         61463  
  65         109529  
409 65         60798 return $data;
410             }
411              
412             sub read_directory {
413 1928     1928 0 17147 my ($self, $directory) = @_;
414 1928         6500 write_directory($directory);
415 1928 50       77106 opendir(my $dh, $directory) || die "Can't opendir $directory: $!";
416 1928         46097 my @files = sort { $a cmp $b } grep { $_ !~ m/^\.+$/ } readdir($dh);
  2528         4176  
  6517         23227  
417 1928         16883 closedir $dh;
418 1928         12571 return \@files;
419             }
420              
421             sub write_file {
422 1760     1760 0 17043 my ($self, $file, $data) = @_;
423 1760         4389 my $fh = $self->open_write_file($file);
424 1760         17260 print $fh $data;
425 1760         5410 $self->close_file($fh);
426 1760         13726 (my $real = $file) =~ s/\.tmp$//;
427 1760         8318 move($file, $real);
428             }
429              
430             sub unlink_file {
431 4     4 0 49 my ($self, $file) = @_;
432 4         649 unlink $file;
433             }
434              
435             sub close_file {
436 1760     1760 0 101952 close $_[1];
437             }
438              
439             1;