| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | package AtteanX::Query::Cache::Analyzer; | 
| 2 |  |  |  |  |  |  |  | 
| 3 | 5 |  |  | 5 |  | 5531718 | use 5.010001; | 
|  | 5 |  |  |  |  | 12 |  | 
| 4 | 5 |  |  | 5 |  | 17 | use strict; | 
|  | 5 |  |  |  |  | 7 |  | 
|  | 5 |  |  |  |  | 90 |  | 
| 5 | 5 |  |  | 5 |  | 16 | use warnings; | 
|  | 5 |  |  |  |  | 7 |  | 
|  | 5 |  |  |  |  | 202 |  | 
| 6 |  |  |  |  |  |  |  | 
| 7 |  |  |  |  |  |  | our $AUTHORITY = 'cpan:KJETILK'; | 
| 8 |  |  |  |  |  |  | our $VERSION   = '0.002'; | 
| 9 |  |  |  |  |  |  |  | 
| 10 | 5 |  |  | 5 |  | 18 | use Moo; | 
|  | 5 |  |  |  |  | 5 |  | 
|  | 5 |  |  |  |  | 33 |  | 
| 11 | 5 |  |  | 5 |  | 1146 | use Attean::RDF qw(triplepattern variable iri); | 
|  | 5 |  |  |  |  | 5 |  | 
|  | 5 |  |  |  |  | 267 |  | 
| 12 | 5 |  |  | 5 |  | 20 | use Types::Standard qw(Str Int InstanceOf ConsumerOf); | 
|  | 5 |  |  |  |  | 7 |  | 
|  | 5 |  |  |  |  | 38 |  | 
| 13 | 5 |  |  | 5 |  | 2934 | use Types::URI -all; | 
|  | 5 |  |  |  |  | 9 |  | 
|  | 5 |  |  |  |  | 71 |  | 
| 14 | 5 |  |  | 5 |  | 10902 | use AtteanX::Parser::SPARQL; | 
|  | 5 |  |  |  |  | 218003 |  | 
|  | 5 |  |  |  |  | 253 |  | 
| 15 | 5 |  |  | 5 |  | 2755 | use AtteanX::Query::Cache::Analyzer::Model; | 
|  | 5 |  |  |  |  | 12 |  | 
|  | 5 |  |  |  |  | 165 |  | 
| 16 | 5 |  |  | 5 |  | 2155 | use AtteanX::QueryPlanner::Cache; | 
|  | 5 |  |  |  |  | 12 |  | 
|  | 5 |  |  |  |  | 158 |  | 
| 17 | 5 |  |  | 5 |  | 1767 | use AtteanX::Query::Cache::Analyzer::QueryPlanner; | 
|  | 5 |  |  |  |  | 12 |  | 
|  | 5 |  |  |  |  | 139 |  | 
| 18 | 5 |  |  | 5 |  | 1976 | use AtteanX::Query::Cache::Retriever; | 
|  | 5 |  |  |  |  | 8 |  | 
|  | 5 |  |  |  |  | 133 |  | 
| 19 |  |  |  |  |  |  |  | 
| 20 | 5 |  |  | 5 |  | 26 | use Carp; | 
|  | 5 |  |  |  |  | 8 |  | 
|  | 5 |  |  |  |  | 2175 |  | 
| 21 |  |  |  |  |  |  |  | 
# The SPARQL query text to analyze (mandatory).
has 'query' => (
    is       => 'ro',
    isa      => Str,
    required => 1,
);

# Algebra for the query; built lazily by _parse_query on first access.
has 'algebra' => (
    is      => 'ro',
    isa     => ConsumerOf['Attean::API::Algebra'],
    lazy    => 1,
    builder => '_parse_query',
);

# Base URI handed to the SPARQL parser together with the query.
has 'base_uri' => (
    is      => 'ro',
    default => 'http://default.invalid/',
);

# The analyzer model (mandatory); consulted for cache state and estimates.
has 'model' => (
    is       => 'ro',
    isa      => InstanceOf['AtteanX::Query::Cache::Analyzer::Model'],
    required => 1,
);

# Graph IRI passed to the query planners.
has 'graph' => (
    is      => 'ro',
    isa     => InstanceOf['Attean::IRI'],
    default => sub { return iri('http://example.invalid') },
);

# Percentage by which an alternative plan must undercut the current cost.
has 'improvement_threshold' => (
    is      => 'ro',
    isa     => Int,
    default => '10',
);

# Limit on the number of patterns reported by best_cost_improvement.
has 'improvement_top' => (
    is      => 'ro',
    isa     => Int,
    default => '3',
);

# Predicate counter value at which count_patterns reports a pattern.
has 'count_threshold' => (
    is      => 'ro',
    isa     => Int,
    default => '3',
);

# Patterns estimated to match more triples than this are never prefetched.
# Taken from the LDF_MAX_TRIPLES environment variable when that is set to
# a true value, otherwise 100000.
has 'max_triples' => (
    is      => 'ro',
    isa     => Int,
    default => sub { return $ENV{'LDF_MAX_TRIPLES'} || 100000 },
);

with 'MooX::Log::Any';
| 38 |  |  |  |  |  |  |  | 
| 39 |  |  |  |  |  |  | =pod | 
| 40 |  |  |  |  |  |  |  | 
| 41 |  |  |  |  |  |  | =over | 
| 42 |  |  |  |  |  |  |  | 
| 43 |  |  |  |  |  |  | =item C<< store >> | 
| 44 |  |  |  |  |  |  |  | 
| 45 |  |  |  |  |  |  | A L<Redis> object. This has two purposes: First, to store any | 
| 46 |  |  |  |  |  |  | data the analyzer needs to persist to decide when to prefetch. Second, | 
| 47 |  |  |  |  |  |  | it uses Redis' publish-subscribe system to publish the URLs containing | 
| 48 |  |  |  |  |  |  | queries that the prefetcher should fetch. | 
| 49 |  |  |  |  |  |  |  | 
| 50 |  |  |  |  |  |  | =cut | 
| 51 |  |  |  |  |  |  |  | 
# Mandatory Redis handle; count_patterns keeps its predicate counters here.
has 'store' => (
    is       => 'ro',
    required => 1,
    isa      => InstanceOf['Redis'],
);
| 56 |  |  |  |  |  |  |  | 
# Builder for the lazy 'algebra' attribute.
#
# Hands the query string and the base URI to a fresh SPARQL parser and
# returns the first item of the list the parser produces (undef if the
# list is empty).
sub _parse_query {
    my ($self) = @_;
    # TODO: this is a bit of cargocult
    my @parsed = AtteanX::Parser::SPARQL->new()->parse_list_from_bytes(
        $self->query,
        $self->base_uri,
    );
    return $parsed[0];
}
| 63 |  |  |  |  |  |  |  | 
# Finds the triple patterns whose prefetching gives the largest estimated
# reduction in query plan cost.
#
# The cost of the plan under the current cache is computed first. Then
# every triple pattern of every BGP in the query is considered, skipping
# patterns that are already cached and patterns whose estimated triple
# count exceeds max_triples. For each remaining pattern the model is told
# to try it, the query is re-planned, and the pattern is kept if the new
# cost is below the current cost reduced by improvement_threshold percent.
#
# Returns the kept triple patterns ordered by ascending alternative cost,
# limited to at most improvement_top patterns (an empty list if none).
sub best_cost_improvement {
    my $self = shift;
    # First, we find the cost of the plan with the current cache:
    my $algebra = $self->algebra;
    my $curplanner = AtteanX::QueryPlanner::Cache::LDF->new;
    my $curplan = $curplanner->plan_for_algebra($algebra, $self->model, [$self->graph]);
    my $curcost = $curplanner->cost_for_plan($curplan, $self->model);
    $self->log->trace("Cost of incumbent plan: $curcost");
    my %costs;
    my %triples;
    # Fraction of the current cost an alternative plan has to stay below.
    my $percentage = 1-($self->improvement_threshold/100);
    my $planner = AtteanX::Query::Cache::Analyzer::QueryPlanner->new;
    foreach my $bgp ($algebra->subpatterns_of_type('Attean::Algebra::BGP')) { # TODO: Parallelize
        foreach my $triple (@{ $bgp->triples }) { # TODO: May need quads
            my $key = $triple->canonicalize->tuples_string;
            next if ($self->model->is_cached($key));
            next if ($self->model->ldf_store->count_triples_estimate($triple->values) > $self->max_triples);
            $self->model->try($key);
            # Enumerating all candidate plans is only done for trace output.
            if ($self->log->is_trace) {
                foreach my $plan ($planner->plans_for_algebra($algebra, $self->model, [$self->graph])) {
                    my $cost = $planner->cost_for_plan($plan, $self->model);
                    $self->log->trace("Cost $cost for:\n" . $plan->as_string);
                }
            }
            my $plan = $planner->plan_for_algebra($algebra, $self->model, [$self->graph]);
            $self->log->debug("Alternative plan after fetching $key:\n" . $plan->as_string);
            $costs{$key} = $planner->cost_for_plan($plan, $self->model);
            $self->log->info("Triple $key has cost $costs{$key}, current $curcost");
            if ($costs{$key} < $curcost * $percentage) {
                $triples{$key} = $triple;
            }
        }
    }
    no sort 'stable';
    my @worthy = map { $triples{$_} } sort {$costs{$a} <=> $costs{$b}} keys(%triples);

    # splice's third argument is a LENGTH, not a last index, so the limit is
    # passed as-is. A negative LENGTH would mean "all but the last N
    # elements", so a negative limit is clamped to zero.
    my $top = $self->improvement_top;
    $top = 0 if $top < 0;
    return splice(@worthy, 0, $top);
}
| 101 |  |  |  |  |  |  |  | 
| 102 |  |  |  |  |  |  |  | 
=item C<< count_patterns >>

Loops over the triple patterns of every BGP in the query. Patterns that
are already cached, and patterns whose estimated number of matching
triples exceeds C<max_triples>, are skipped. For each remaining pattern,
the counter kept in the C<store> under the pattern's predicate is
incremented. Every pattern whose predicate counter has thereby reached
C<count_threshold> is included in the returned list of triple patterns;
the list is empty if no pattern qualified.


=back

=cut

sub count_patterns {
    my $self = shift;
    my $algebra = $self->algebra;
    my @worthy = ();
    # TODO: Return undef if we can't process the query
    foreach my $bgp ($algebra->subpatterns_of_type('Attean::Algebra::BGP')) {
        foreach my $triple (@{ $bgp->triples }) { # TODO: May need quads
            my $patternkey = $triple->canonicalize->tuples_string; # This is the key for the triple we process
            next if ($self->model->is_cached($patternkey));
            next if ($self->model->ldf_store->count_triples_estimate($triple->values) > $self->max_triples);
            my $key = $triple->predicate->as_string; # This is the key for the predicate we count
            # Update the storage. INCR replies with the counter's new value,
            # so it is used directly: a separate GET would cost another round
            # trip and could observe increments made by other clients.
            my $count = $self->store->incr($key);
            $self->log->debug("Count for key '$key' in database is $count");
            if ($count >= $self->count_threshold) { # TODO: A way to expire counts
                $self->log->info("Triple '$patternkey' has predicate with $count counts");
                push(@worthy, $triple);
            }
        }
    }
    return @worthy;
}
| 138 |  |  |  |  |  |  |  | 
# Runs the named analyzer methods and prefetches the triple patterns they
# report into the model's cache.
#
# Arguments: a list of analyzer method names. If the first name is 'all',
# the list is replaced by 'count_patterns' and 'best_cost_improvement'.
#
# Croaks if no analyzer is given, or if any name is not a method of this
# object; all names are checked before any analyzer is run.
#
# Returns the number of triple patterns for which data was fetched.
sub analyze_and_cache {
    my ($self, @analyzers) = @_;
    croak 'No analyzers given to analyze and cache' unless @analyzers;
    if ($analyzers[0] eq 'all') {
        @analyzers = ('count_patterns', 'best_cost_improvement');
    }
    foreach my $analyzer (@analyzers) {
        croak "Could not find analyzer method $analyzer" unless $self->can($analyzer);
    }
    $self->log->info('Running analyzers named ' . join(', ', @analyzers));
    my $retriever = AtteanX::Query::Cache::Retriever->new(model => $self->model); # TODO: Only OK if we don't do query planning
    my $i = 0;
    my %done;
    foreach my $analyzer (@analyzers) {
        foreach my $triple ($self->$analyzer) {
            my $key = $triple->canonicalize->tuples_string;
            next if $done{$key}; # Skip if some other analyzer already did fetch
            $self->log->debug('Fetching triple pattern ' . $triple->as_string);
            my $data = $retriever->fetch($triple);
            # A failed fetch must not write undef to the cache. The key is
            # also left unmarked so that a later occurrence is retried.
            next unless defined($data);
            $self->model->cache->set($key, $data);
            $done{$key} = 1;
            $i++;
        }
    }
    $self->log->info("Got results from prefetching $i triple patterns");
    return $i;
}
| 168 |  |  |  |  |  |  |  | 
| 169 |  |  |  |  |  |  | 1; |