File Coverage

blib/lib/AtteanX/Query/Cache/Analyzer.pm
Criterion Covered Total %
statement 41 114 35.9
branch 0 24 0.0
condition n/a
subroutine 14 18 77.7
pod 1 3 33.3
total 56 159 35.2


line stmt bran cond sub pod time code
1             package AtteanX::Query::Cache::Analyzer;
2              
3 5     5   5378551 use 5.010001;
  5         12  
4 5     5   23 use strict;
  5         7  
  5         88  
5 5     5   16 use warnings;
  5         7  
  5         220  
6              
7             our $AUTHORITY = 'cpan:KJETILK';
8             our $VERSION = '0.001_04';
9              
10 5     5   19 use Moo;
  5         7  
  5         27  
11 5     5   1134 use Attean::RDF qw(triplepattern variable iri);
  5         8  
  5         277  
12 5     5   24 use Types::Standard qw(Str Int InstanceOf ConsumerOf);
  5         5  
  5         33  
13 5     5   3072 use Types::URI -all;
  5         6  
  5         78  
14 5     5   12182 use AtteanX::Parser::SPARQL;
  5         232028  
  5         213  
15 5     5   2789 use AtteanX::Query::Cache::Analyzer::Model;
  5         13  
  5         171  
16 5     5   2329 use AtteanX::QueryPlanner::Cache;
  5         11  
  5         159  
17 5     5   1997 use AtteanX::Query::Cache::Analyzer::QueryPlanner;
  5         13  
  5         151  
18 5     5   2138 use AtteanX::Query::Cache::Retriever;
  5         8  
  5         142  
19              
20 5     5   27 use Carp;
  5         6  
  5         2503  
21              
22             has 'query' => (is => 'ro', required => 1, isa => Str);
23             has 'algebra' => (is => 'ro', isa => ConsumerOf['Attean::API::Algebra'], builder => '_parse_query', lazy => 1);
24             has 'base_uri' => (is => 'ro', default => 'http://default.invalid/');
25              
26             has 'model' => (is => 'ro', isa => InstanceOf['AtteanX::Query::Cache::Analyzer::Model'], required => 1);
27              
28             has 'graph' => (is => 'ro', isa => InstanceOf['Attean::IRI'], default => sub { return iri('http://example.invalid')});
29              
30             has 'improvement_threshold' => (is => 'ro', isa => Int, default => '10');
31             has 'improvement_top' => (is => 'ro', isa => Int, default => '3');
32              
33             has 'count_threshold' => (is => 'ro', isa => Int, default => '3');
34              
35             has 'max_triples' => (is => 'ro', isa => Int, default => sub { return $ENV{'LDF_MAX_TRIPLES'} || 100000 });
36              
37             with 'MooX::Log::Any';
38              
39             =pod
40              
41             =over
42              
43             =item C<< store >>
44              
45             A L<Redis> object. This has two purposes: First, to store any
46             data the analyzer needs to persist to decide when to prefetch. Second,
47             it uses Redis' publish-subscribe system to publish the URLs containing
48             queries that the prefetcher should fetch.
49              
50             =cut
51              
52             has store => (is => 'ro',
53             isa => InstanceOf['Redis'],
54             required => 1
55             );
56              
57             sub _parse_query {
58 0     0     my $self = shift;
59 0           my $parser = AtteanX::Parser::SPARQL->new();
60 0           my ($algebra) = $parser->parse_list_from_bytes($self->query, $self->base_uri); # TODO: this is a bit of cargocult
61 0           return $algebra;
62             }
63              
64             sub best_cost_improvement {
65 0     0 0   my $self = shift;
66             # First, we find the cost of the plan with the current cache:
67 0           my $algebra = $self->algebra;
68 0           my $curplanner = AtteanX::QueryPlanner::Cache::LDF->new;
69 0           my $curplan = $curplanner->plan_for_algebra($algebra, $self->model, [$self->graph]);
70 0           my $curcost = $curplanner->cost_for_plan($curplan, $self->model);
71 0           $self->log->trace("Cost of incumbent plan: $curcost");
72 0           my %costs;
73             my %triples;
74 0           my $percentage = 1-($self->improvement_threshold/100);
75 0           my $planner = AtteanX::Query::Cache::Analyzer::QueryPlanner->new;
76 0           foreach my $bgp ($algebra->subpatterns_of_type('Attean::Algebra::BGP')) { # TODO: Parallelize
77 0           foreach my $triple (@{ $bgp->triples }) { # TODO: May need quads
  0            
78 0           my $key = $triple->canonicalize->tuples_string;
79 0 0         next if ($self->model->is_cached($key));
80 0 0         next if ($self->model->ldf_store->count_triples_estimate($triple->values) > $self->max_triples);
81 0           $self->model->try($key);
82 0 0         if ($self->log->is_trace) {
83 0           foreach my $plan ($planner->plans_for_algebra($algebra, $self->model, [$self->graph])) {
84 0           my $cost = $planner->cost_for_plan($plan, $self->model);
85 0           $self->log->trace("Cost $cost for:\n" . $plan->as_string);
86             }
87             }
88 0           my $plan = $planner->plan_for_algebra($algebra, $self->model, [$self->graph]);
89 0           $self->log->debug("Alternative plan after fetching $key:\n" . $plan->as_string);
90 0           $costs{$key} = $planner->cost_for_plan($plan, $self->model);
91 0           $self->log->info("Triple $key has cost $costs{$key}, current $curcost");
92 0 0         if ($costs{$key} < $curcost * $percentage) {
93 0           $triples{$key} = $triple;
94             }
95             }
96             }
97 5     5   28 no sort 'stable';
  5         5  
  5         41  
98 0           my @worthy = map { $triples{$_} } sort {$costs{$a} <=> $costs{$b}} keys(%triples);
  0            
  0            
99 0           return splice(@worthy,0, $self->improvement_top-1);
100             }
101              
102              
103             =item C<< count_patterns >>
104              
105             Loops the triple patterns, checks if any of them have a cached result
106             (TODO) and increments the number of times a certain predicate has been
107             seen in the store. When that number exceeds the C<count_threshold>, a
108             single-element array of L<Attean::TriplePattern>s will be returned.
109              
110              
111             =back
112              
113             =cut
114              
115             sub count_patterns {
116 0     0 1   my $self = shift;
117 0           my $algebra = $self->algebra;
118 0           my @worthy = ();
119             # TODO: Return undef if we can't process the query
120 0           foreach my $bgp ($algebra->subpatterns_of_type('Attean::Algebra::BGP')) {
121 0           foreach my $triple (@{ $bgp->triples }) { # TODO: May need quads
  0            
122 0           my $patternkey = $triple->canonicalize->tuples_string; # This is the key for the triple we process
123 0 0         next if ($self->model->is_cached($patternkey));
124 0 0         next if ($self->model->ldf_store->count_triples_estimate($triple->values) > $self->max_triples);
125 0           my $key = $triple->predicate->as_string; # This is the key for the predicate we count
126             # Update the storage and return the triple pattern
127 0           $self->store->incr($key);
128 0           my $count = $self->store->get($key);
129 0           $self->log->debug("Count for key '$key' in database is $count");
130 0 0         if ($count >= $self->count_threshold) { # TODO: A way to expire counts
131 0           $self->log->info("Triple '$patternkey' has predicate with $count counts");
132 0           push(@worthy, $triple);
133             }
134             }
135             }
136 0           return @worthy;
137             }
138              
139             sub analyze_and_cache {
140 0     0 0   my ($self, @analyzers) = @_;
141 0 0         croak 'No analyzers given to analyze and cache' unless @analyzers;
142 0 0         if ($analyzers[0] eq 'all') {
143 0           @analyzers = ('count_patterns', 'best_cost_improvement');
144             }
145 0           foreach my $analyzer (@analyzers) {
146 0 0         croak "Could not find analyzer method $analyzer" unless $self->can($analyzer);
147             }
148 0           $self->log->info('Running analyzers named ' . join(', ', @analyzers));
149 0           my $retriever = AtteanX::Query::Cache::Retriever->new(model => $self->model); # TODO: Only OK if we don't do query planning
150 0           my $i = 0;
151 0           my %done;
152 0           foreach my $analyzer (@analyzers) {
153 0           foreach my $triple ($self->$analyzer) {
154 0           my $key = $triple->canonicalize->tuples_string;
155 0 0         next if $done{$key}; # Skip if some other analyzer already did fetch
156 0           $self->log->debug('Fetching triple pattern ' . $triple->as_string);
157 0           my $data = $retriever->fetch($triple);
158 0 0         if (defined($data)) {
159 0           $done{$key} = 1;
160 0           $i++;
161             }
162 0           $self->model->cache->set($key, $data);
163             }
164             }
165 0           $self->log->info("Got results from prefetching $i triple patterns");
166 0           return $i;
167             }
168              
169             1;