File Coverage

blib/lib/DiaColloDB/Corpus/Compiled.pm
Criterion Covered Total %
statement 29 210 13.8
branch 1 116 0.8
condition 0 146 0.0
subroutine 11 31 35.4
pod 18 18 100.0
total 59 521 11.3


line stmt bran cond sub pod time code
1             ## -*- Mode: CPerl -*-
2             ## File: DiaColloDB::Corpus::Compiled.pm
3             ## Author: Bryan Jurish <moocow@cpan.org>
4             ## Description: collocation db, source corpus (pre-compiled)
5              
6             package DiaColloDB::Corpus::Compiled;
7              
8 1     1   8 use DiaColloDB::threads;
  1         3  
  1         10  
9 1     1   509 use DiaColloDB::threads::shared;
  1         2  
  1         5  
10             our ($HAVE_THREADS);
11             BEGIN {
12 1 50   1   146 $HAVE_THREADS = $DiaColloDB::threads::shared::MODULE ? 1 : 0;
13             }
14              
15 1     1   7 use DiaColloDB::Corpus;
  1         2  
  1         19  
16 1     1   564 use DiaColloDB::Corpus::Filters;
  1         2  
  1         22  
17 1     1   137 use DiaColloDB::Logger;
  1         2  
  1         26  
18 1     1   5 use DiaColloDB::Utils qw(:fcntl :jobs);
  1         2  
  1         34  
19 1     1   263 use File::Basename qw(basename dirname);
  1         2  
  1         23  
20 1     1   130 use File::Path qw(make_path remove_tree);
  1         2  
  1         23  
21 1     1   94 use strict;
  1         2  
  1         427  
22              
23             ##==============================================================================
24             ## Globals & Constants
25              
26             our @ISA = qw(DiaColloDB::Persistent DiaColloDB::Corpus);
27              
28             ##==============================================================================
29             ## Constructors etc.
30              
31             ## $corpus = CLASS_OR_OBJECT->new(%args)
32             ## + %args, object structure:
33             ## (
34             ## ##-- NEW in DiaColloDB::Corpus::Compiled
35             ## dbdir => $dbdir, ##-- data directory for compiled corpus
36             ## flags => $flags, ##-- open mode flags (fcntl flags or perl-style; default='r')
37             ## filters => \%filters, ##-- corpus filters ( DiaColloDB::Corpus::Filters object or HASH-ref )
38             ## njobs => $njobs, ##-- number of parallel worker jobs for create(); default=-1 (= nCores)
39             ## temp => $bool, ##-- implicitly unlink() on exit?
40             ## logThreads => $level ##-- log-level for thread stuff (default='off')
41             ## ##
42             ## ##-- INHERITED from DiaColloDB::Corpus
43             ## #files => \@files, ##-- source files (OVERRIDE: unused)
44             ## #dclass => $dclass, ##-- DiaColloDB::Document subclass for loading (OVERRIDE force 'DiaColloDB::Document::JSON')
45             ## dopts => \%opts, ##-- options for $dclass->fromFile() (override default={})
46             ## cur => $i, ##-- index of current file
47             ## logOpen => $level, ##-- log-level for open(); default='info'
48             ## )
49             sub new {
50 0     0 1   my $that = shift;
51 0           my $corpus = $that->SUPER::new(
52             ##-- new
53             dbdir => undef,
54             flags => 'r',
55             #filters => DiaColloDB::Corpus::Filters->new(),
56             #temp => 0,
57             #opened => 0,
58             njobs => -1,
59             logThreads => 'off',
60              
61             @_, ##-- user arguments
62              
63             ##-- strong overrides
64             dclass => 'DiaColloDB::Document::JSON',
65             );
66 0 0         $corpus->{filters} = DiaColloDB::Corpus::Filters->new() if (!exists($corpus->{filters}));
67 0 0         return $corpus->open() if (defined($corpus->{dbdir}));
68 0           return $corpus;
69             }
70              
71             sub DESTROY {
72 0     0     my $obj = $_[0];
73 0 0         $obj->unlink() if ($obj->{temp});
74 0 0         $obj->close() if ($obj->opened);
75             }
76              
77             ##==============================================================================
78             ## Persistent API
79              
80             ## @keys = $obj->headerKeys()
81             ## + keys to save as header; default implementation returns all keys of all non-references
82             sub headerKeys {
83 0     0 1   return (grep {$_ !~ m{^log|^(?:cur|dbdir|njobs|opened|flags|files|list|glob|compiled|append|temp)$}} keys %{$_[0]});
  0            
  0            
84             }
85              
86             ## @files = $obj->diskFiles()
87             ## + returns disk storage files, used by du() and timestamp()
88             ## + default implementation returns $obj->{file} or glob("$obj->{base}*")
89             sub diskFiles {
90 0     0 1   my $obj = shift;
91 0 0         return ($obj->{dbdir}) if ($obj->{dbdir});
92 0           return qw();
93             }
94              
95             ## $bool = $obj->unlink(%opts)
96             ## + override %opts:
97             ## close => $bool, ##-- implicitly call $obj->close() ? (default=1)
98             ## + unlinks disk files
99             ## + implcitly calls $obj->close() if available
100             sub unlink {
101 0     0 1   my ($obj,%opts) = @_;
102 0           my $dbdir = $obj->datadir;
103             #$obj->vlog($obj->{logOpen}, "unlink(", $obj->dbdir, ")") if ($obj->opened);
104 0 0 0       $obj->close() if (!exists($opts{close}) || $opts{close});
105 0 0         return (-e $dbdir ? File::Path::remove_tree($dbdir) : 1);
106             }
107              
108             ##----------------------------------------------------------------------
109             ## Compiled API: disk files etc.
110              
111             ## $dirname = $corpus->datadir()
112             ## $dirname = $corpus->datadir($dir)
113 1     1   2580 BEGIN { *dbdir = \&datadir; }
114             sub datadir {
115 0   0 0 1   my $dir = $_[1] // $_[0]{dbdir};
116 0 0         $dir =~ s{/$}{} if ($dir);
117 0           return $dir;
118             }
119              
120             ## $bool = $corpus->truncate()
121             ## + removes all disk data (including header) and resets $corpus->{size}=0
122             sub truncate {
123 0     0 1   my $corpus = shift;
124 0 0         return undef if (!$corpus->unlink(close=>0));
125 0           $corpus->{size} = 0;
126 0           return $corpus;
127             }
128              
129             ## $filters = $ccorpus->filters()
130             ## + return corpus filters as a DiaColloDB::Corpus::Filters object
131             sub filters {
132 0 0   0 1   return $_[0]{filters} if (UNIVERSAL::isa($_[0]{filters},'DiaColloDB::Corpus::Filters'));
133 0 0         return DiaColloDB::Corpus::Filters->null() if (!defined($_[0]{filters}));
134 0           return DiaColloDB::Corpus::Filters->new( %{$_[0]{filters}} );
  0            
135             }
136              
137             ##==============================================================================
138             ## Corpus API: open/close
139              
140             ## $bool = $corpus->open([$dbdir], %opts); ##-- compat
141             ## $bool = $corpus->open($dbdir, %opts); ##-- new
142             ## + opens corpus "$base.*"
143             ## + \@ARGV should be a single-element $dbdir, or (dbdir=>$dbdir) must exist or be specified in %opts
144             ## + DiaColloDB::Corpus %opts:
145             ## compiled => $bool, ##-- implicit
146             ## glob => $bool, ##-- (ignored) whether to glob arguments
147             ## list => $bool, ##-- (ignored) whether arguments are file-lists
148             sub open {
149 0     0 1   my ($corpus,$argv,%opts) = @_;
150 0           delete @opts{qw(compiled glob list)};
151 0 0         $corpus = $corpus->new() if (!ref($corpus));
152 0 0         $corpus->close() if ($corpus->opened);
153 0           @$corpus{keys %opts} = values(%opts);
154              
155             ##-- sanity check(s): dbdir
156 0           my $dbdir = $corpus->dbdir;
157 0 0         if (UNIVERSAL::isa($argv,'ARRAY')) {
    0          
158 0 0         if (@$argv==1) {
159 0           $dbdir = $argv->[0]; ##-- single-element list
160             } else {
161 0           $corpus->logconfess("open(): can't handle multi-element array");
162             }
163             } elsif (defined($argv)) {
164 0           $dbdir = $argv; ##-- simple scalar
165             }
166 0 0         $corpus->{dbdir} = $corpus->dbdir($dbdir)
167             or $corpus->logconfess("open(): no {dbdir} specified!");
168              
169 0 0         my $flags = $corpus->{flags} = (fcflags($corpus->{flags}) | ($corpus->{append} ? fcflags('>>') : 0));
170 0           $corpus->vlog($corpus->{logOpen}, "open(", fcperl($flags), "$dbdir)");
171              
172             ##-- flag-dependent dispatch
173 0 0 0       if (fcwrite($flags) && fctrunc($flags)) {
174             ##-- truncate: remove any stale corpus
175 0 0         $corpus->truncate()
176             or $corpus->logconfess("open(): failed to truncate stale corpus $corpus->{dbdir}/: $!");
177             }
178 0 0 0       if (fcwrite($flags) && fccreat($flags)) {
179             ##-- create: data-directory
180 0           my $datadir = $corpus->datadir;
181 0 0 0       -d $datadir
182             or make_path($datadir)
183             or $corpus->logconfess("open(): could not create data directory '$datadir': $!");
184             }
185 0 0 0       if (fcread($flags) && !fctrunc($flags)) {
186             ##-- read-only, no create
187 0 0         $corpus->loadHeaderFile
188             or $corpus->logconfess("open(): failed to load header-file ", $corpus->headerFile);
189             }
190              
191             ##-- force options: dclass, files, opened
192 0           $corpus->{opened} = 1;
193 0           $corpus->{dclass} = 'DiaColloDB::Document::JSON';
194 0           delete $corpus->{files};
195              
196 0           return $corpus;
197             }
198              
199             ## $bool = $corpus->close()
200             sub close {
201 0     0 1   my $corpus = shift;
202 0 0         $corpus->vlog($corpus->{logOpen}, "close(", $corpus->dbdir, ")") if ($corpus->opened);
203 0 0 0       my $rc = ($corpus->opened && fcwrite($corpus->{flags}) ? $corpus->flush : 1);
204 0   0       $rc &&= $corpus->SUPER::close();
205 0 0         if ($rc) {
206 0           $corpus->{opened} = 0;
207 0           $corpus->{size} = 0;
208             }
209 0           return $rc;
210             }
211              
212             ##----------------------------------------------------------------------
213             ## Compiled API: open/close
214              
215             ## $bool = $corpus->opened()
216             ## + Returns true iff $corpus is currently opened.
217             sub opened {
218 0     0 1   my $corpus = shift;
219 0   0       return $corpus->{dbdir} && $corpus->{opened};
220             }
221              
222             ## $bool = $corpus->flush()
223             ## + flush pending data (header) to disk
224             sub flush {
225 0     0 1   my $corpus = shift;
226 0 0 0       return undef if (!$corpus->opened || !fcwrite($corpus->{flags}));
227 0 0         $corpus->saveHeader()
228             or $corpus->logconfess("flush(): failed to store header file ", $corpus->headerFile, ": $!");
229             }
230              
231             ## $corpus = $corpus->reopen(%opts)
232             ## + close and re-open corpus (e.g. with different flags)
233             sub reopen {
234 0     0 1   my $corpus = shift;
235 0           my $dbdir = $corpus->{dbdir};
236 0 0         return $corpus if (!$corpus->opened);
237 0   0       return $corpus->close() && $corpus->open([$dbdir], @_);
238             }
239              
240             ##==============================================================================
241             ## Corpus API: iteration
242             ## + mostly inherited from DiaColloDB::Corpus
243              
244             ## $nfiles = $corpus->size()
245             sub size {
246 0   0 0 1   return $_[0]{size} // 0;
247             }
248              
249             ## $bool = $corpus->iok()
250             ## + true if iterator is valid
251             sub iok {
252 0   0 0 1   return $_[0]{cur} < ($_[0]{size}//0);
253             }
254              
255             ## $label = $corpus->ifile()
256             ## $label = $corpus->ifile($pos)
257             ## + current iterator label
258             sub ifile {
259 0   0 0 1   my $pos = $_[1] // $_[0]{cur};
260 0 0         return undef if ($pos >= $_[0]{size});
261 0           return "$_[0]{dbdir}/$pos.json";
262             }
263              
264             ## $doc_or_undef = $corpus->idocument()
265             ## $doc_or_undef = $corpus->idocument($pos)
266             ## + gets current document
267             sub idocument {
268 0     0 1   my ($corpus,$pos) = @_;
269 0   0       $pos //= $corpus->{cur};
270 0 0         return undef if ($pos >= $corpus->size);
271 0   0       return $corpus->{dclass}->fromFile($corpus->ifile($pos), %{$corpus->{dopts}//{}});
  0            
272             }
273              
274              
275             ##==============================================================================
276             ## Corpus::Compiled API: corpus compilation
277              
278             ## $ccorpus = CLASS_OR_OBJECT->create($src_corpus, %opts)
279             ## + compile or append a single $src_corpus to $opts{dbdir}, returns $ccorpus
280             ## + honors $opts{flags} for append and truncate
281             sub create {
282 0     0 1   my ($that,$icorpus,%opts) = @_;
283 0 0         my $ocorpus = ref($that) ? $that : $that->new();
284 0           my $logas = 'create()';
285 0           $ocorpus->vlog('info',$logas);
286              
287             ##-- save options
288             my $odir = $ocorpus->dbdir($opts{dbdir})
289 0 0         or $ocorpus->logconfess("$logas: no output corpus {dbdir} specified");
290              
291 0   0       my $flags = (fcflags($ocorpus->{flags}) | fcflags($opts{flags})) || fcflags('w');
292 0           delete $opts{dbdir};
293              
294             ##-- (re-)open output corpus
295 0 0 0       if (!$ocorpus->opened || ($ocorpus->{dbdir} ne $odir)) {
296 0 0         $ocorpus->open([$odir], %opts, flags=>$flags)
297             or $ocorpus->logconfess("$logas: failed to (re-)open output corpus '$odir' in mode '", fcperl($flags));
298             }
299 0           @$ocorpus{keys %opts} = values %opts;
300              
301             ##-- check whether we're doing any filtering at all
302 0           my $filters = $ocorpus->filters();
303 0           my $dofilter = !$filters->isnull();
304 0 0         if ($dofilter) {
305 0           $ocorpus->vlog('info', "$logas: corpus content filters enabled");
306 0           foreach (grep {defined($filters->{$_})} sort keys %$filters) {
  0            
307 0           $ocorpus->vlog('info', " + filter $_ => $filters->{$_}");
308             }
309             } else {
310 0           $ocorpus->vlog('info', "$logas: corpus content filters disabled");
311             }
312              
313             ##-- common data
314 0           my $nfiles = $icorpus->size();
315 0   0       my $logFileN = $ocorpus->{logFileN} || int($nfiles / 20) || 1;
316 0           my @outkeys = keys %{DiaColloDB::Document->new};
  0            
317              
318 0           my $osize = $ocorpus->size();
319 0           my $outdir = $ocorpus->datadir();
320              
321 0           my $filei_shared = 0;
322 0           share( $filei_shared );
323              
324             ##--------------------------------------------
325             my $cb_worker = sub {
326 0   0 0     my $thrid = shift || DiaColloDB::threads->tid();
327 0           $logas .= "#$thrid";
328 0           (*STDERR)->autoflush(1);
329 0           $ocorpus->vlog($ocorpus->{logThreads}, "$logas: starting worker thread #$thrid");
330              
331             ##-- initialize: disable auto-deletion
332 0           $ocorpus->{temp} = 0;
333              
334             ##-- initialize filters (formerly in DiaColloDB.pm)
335 0 0         my $cfilters = $dofilter ? $filters->compile() : {}
    0          
336             or $ocorpus->logconfess("$logas: failed to compile corpus content filters: $!");
337             ##
338             ##-- initialize: filters: variables
339 0           my ($pgood, $pbad, $wgood, $wbad, $lgood, $lbad ) = @$cfilters{map {("${_}good","${_}bad")} qw(p w l)};
  0            
340 0           my ($pgoodh,$pbadh,$wgoodh,$wbadh,$lgoodh,$lbadh) = @$cfilters{map {("${_}goodfile","${_}badfile")} qw(p w l)};
  0            
341 0           my ($tok,$w,$p,$l);
342              
343 0           my ($filei);
344 0           while (1) {
345             {
346 0           lock($filei_shared);
  0            
347 0           $filei = $filei_shared;
348 0           ++$filei_shared;
349             }
350 0 0         last if ($filei >= $nfiles);
351              
352 0           my $idoc = $icorpus->idocument($filei);
353 0           my $infile = $icorpus->ifile($filei);
354 0           my $outfile = "$outdir/".($filei+$osize).".json";
355              
356             #$ocorpus->vlog('info', sprintf("processing files [%3.0f%%]: %s -> %s", 100*($filei-1)/$nfiles, $infile, $outfile))
357 0 0 0       $ocorpus->vlog('info', sprintf("%s: processing files [%3.0f%%]: %s", $logas, 100*($filei-1)/$nfiles, $infile))
358             if ($logFileN && ($filei % $logFileN)==0);
359              
360             ##-- apply filters
361 0 0         if ($dofilter) {
362 0           my $ftokens = [];
363 0           foreach $tok (@{$idoc->{tokens}}) {
  0            
364 0 0         if (ref($tok)) {
365             ##-- normal token: apply filters
366 0           ($w,$p,$l) = @$tok{qw(w p l)};
367             next if ((defined($pgood) && $p !~ $pgood) || ($pgoodh && !exists($pgoodh->{$p}))
368             || (defined($pbad) && $p =~ $pbad) || ($pbadh && exists($pbadh->{$p}))
369             || (defined($wgood) && $w !~ $wgood) || ($wgoodh && !exists($wgoodh->{$w}))
370             || (defined($wbad) && $w =~ $wbad) || ($wbadh && exists($wbadh->{$w}))
371             || (defined($lgood) && $l !~ $lgood) || ($lgoodh && !exists($lgoodh->{$l}))
372 0 0 0       || (defined($lbad) && $l =~ $lbad) || ($lbadh && exists($lbadh->{$l}))
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
373             );
374             }
375 0 0 0       push(@$ftokens,$tok) if (defined($tok) || (@$ftokens && defined($ftokens->[$#$ftokens])));
      0        
376             }
377 0           $idoc->{tokens} = $ftokens;
378             }
379              
380             ##-- create output document
381 0           my $odoc = {};
382 0           @$odoc{@outkeys} = @$idoc{@outkeys};
383              
384             ##-- dump output document (json)
385 0 0         DiaColloDB::Utils::saveJsonFile($odoc,$outfile, pretty=>0,canonical=>0)
386             or $ocorpus->logconfess("$logas: failed to save JSON data for '$infile' to '$outfile': $!");
387             }
388              
389 0           $ocorpus->vlog($ocorpus->{logThreads}, "$logas: worker thread #$thrid exiting normally");
390 0           $ocorpus->{logOpen} = 'off'; ##-- suppress 'close()' messages from worker threads
391 0           };
392             ##--/cb_worker
393              
394             ##-- spawn workers
395 0           my $njobs = nJobs($ocorpus->{njobs});
396 0 0 0       if ($njobs==0 || !$HAVE_THREADS) {
397 0           $ocorpus->info("$logas: running in serial mode");
398 0           $cb_worker->(0);
399             } else {
400 0           $ocorpus->info("$logas: running in parallel mode with $njobs job(s)");
401 0           my @workers = (map {threads->new($cb_worker,$_)} (1..$njobs));
  0            
402 0           foreach my $thrid (1..$njobs) {
403 0           my $worker = $workers[$thrid-1];
404 0           $worker->join();
405 0 0         if (defined(my $err=$worker->error)) {
406 0           $ocorpus->logconfess("$logas: error for worker thread #$thrid: $err");
407             }
408             }
409             }
410              
411             ##-- adopt list of compiled files
412 0           $ocorpus->{size} += $nfiles;
413              
414             ##-- save header (happens implicitly on DESTROY() via close())
415             #$ocorpus->saveHeader()
416             # or $ocorpus->logconfess("$logas: failed to save header file ", $ocorpus->headerFile, ": $!");
417              
418 0           return $ocorpus;
419             }
420              
421              
422             ##==============================================================================
423             ## Corpus::Compiled API: union
424              
425             ## $ccorpus = $ccorpus->union(\@sources, %opts)
426             ## + merge source corpora \@sources to $opts{dbdir}, destructive
427             ## + each $src in \@sources is either a Corpus::Compiled object or a simple scalar (dbdir of a Corpus::Compiled object)
428             ## + honors $opts{flags} for append and truncate
429             ## + no filters are applied
430             sub union {
431 0     0 1   my ($that,$sources,%opts) = @_;
432 0 0         my $ocorpus = ref($that) ? $that : $that->new();
433 0           my $logas = 'union()';
434 0           $ocorpus->vlog('info',$logas);
435              
436             ##-- save options before open()
437             my $odir = $ocorpus->dbdir($opts{dbdir})
438 0 0         or $ocorpus->logconfess("$logas: no output corpus {dbdir} specified");
439 0   0       my $flags = (fcflags($ocorpus->{flags}) | fcflags($opts{flags})) || fcflags('w');
440 0           delete $opts{dbdir};
441              
442             ##-- (re-)open output corpus
443 0 0 0       if (!$ocorpus->opened || ($ocorpus->{dbdir} ne $odir)) {
444 0 0         $ocorpus->open([$odir], %opts, flags=>$flags)
445             or $ocorpus->logconfess("$logas: failed to (re-)open output corpus '$odir' in mode '", fcperl($flags));
446             }
447 0           @$ocorpus{keys %opts} = values %opts;
448              
449             ##-- union: guts
450 0 0         foreach my $src (UNIVERSAL::isa($sources,'ARRAY') ? @$sources : $sources) {
451 0 0         my $idir = ref($src) ? $src->{dbdir} : $src;
452 0           $ocorpus->vlog('info',"$logas: processing $idir");
453              
454 0 0         my $icorpus = ref($src) ? $src : DiaColloDB::Corpus::Compiled->new(dbdir=>$src,logOpen=>undef)
    0          
455             or $ocorpus->logconfess("union(): failed to open input corpus '$src': $!");
456              
457 0           my $nifiles = $icorpus->{size};
458 0           my $osize = $ocorpus->size;
459              
460 0           my ($filei,$infile,$outfile);
461 0           for ($filei=0; $filei < $nifiles; ++$filei) {
462 0           $infile = $icorpus->ifile($filei);
463 0           $outfile = "$odir/".($filei+$osize).".json";
464              
465             ##-- link
466 0 0 0       link($infile,$outfile)
467             or symlink($infile,$outfile)
468             or $ocorpus->logconfess("union(): failed to create output link $outfile -> $infile: $!");
469             }
470 0           $ocorpus->{size} += $nifiles;
471             }
472              
473             ##-- all done
474             #$ocorpus->vlog('info', "merged ", scalar(@$sources), " input corpora to $odir");
475 0           return $ocorpus;
476             }
477              
478              
479             ##==============================================================================
480             ## Footer
481             1;
482              
483             __END__
484              
485              
486              
487