File Coverage

blib/lib/RDF/Query/Plan/Service.pm
Criterion Covered Total %
statement 31 222 13.9
branch 0 62 0.0
condition 0 10 0.0
subroutine 11 30 36.6
pod 17 17 100.0
total 59 341 17.3


line stmt bran cond sub pod time code
1             # RDF::Query::Plan::Service
2             # -----------------------------------------------------------------------------
3              
4             =head1 NAME
5              
6             RDF::Query::Plan::Service - Executable query plan for remote SPARQL queries.
7              
8             =head1 VERSION
9              
10             This document describes RDF::Query::Plan::Service version 2.915_01.
11              
12             =head1 METHODS
13              
14             Beyond the methods documented below, this class inherits methods from the
15             L<RDF::Query::Plan> class.
16              
17             =over 4
18              
19             =cut
20              
21             package RDF::Query::Plan::Service;
22              
23 35     35   183 use strict;
  35         70  
  35         1000  
24 35     35   178 use warnings;
  35         73  
  35         999  
25 35     35   177 use base qw(RDF::Query::Plan);
  35         66  
  35         2592  
26              
27 35     35   190 use Data::Dumper;
  35         73  
  35         1629  
28 35     35   184 use Scalar::Util qw(blessed refaddr);
  35         66  
  35         1946  
29 35     35   192 use Storable qw(store_fd fd_retrieve);
  35         73  
  35         1846  
30 35     35   196 use URI::Escape;
  35         75  
  35         2203  
31              
32 35     35   197 use RDF::Query::Error qw(:try);
  35         70  
  35         270  
33 35     35   4874 use RDF::Query::ExecutionContext;
  35         76  
  35         879  
34 35     35   178 use RDF::Query::VariableBindings;
  35         78  
  35         1501  
35              
36             ######################################################################
37              
38             our ($VERSION);
39             BEGIN {
40 35     35   82344 $VERSION = '2.915_01';
41             }
42              
43             ######################################################################
44              
=item C<< new ( $endpoint, $plan, $silent, $sparql, [ $lhs ] ) >>

Returns a new SERVICE (remote endpoint call) query plan object. C<<$endpoint>>
is the endpoint as a node object. C<<$plan>> is the query plan representing the
query to be sent to the remote endpoint; its referenced variables are recorded
on the new object. C<<$silent>> is the flag later returned by C<< silent >>.
C<<$sparql>> is the serialized SPARQL query to be sent to the remote endpoint;
a MethodInvocationError is thrown if it is false.

When C<<$endpoint>> is an RDF::Query::Node::Variable, a fifth argument
C<<$lhs>> is required and is passed on to the parent constructor (see
C<< lhs >>). For any other endpoint, extra arguments are ignored.

The logging keys of a new plan always start out as an empty HASH.

=cut

sub new {
	my ($class, $endpoint, $plan, $silent, $sparql, @rest) = @_;
	RDF::Query::Error::MethodInvocationError->throw(
		-text => "SERVICE plan constructor requires a serialized SPARQL query argument"
	) unless ($sparql);

	# The parent constructor gets the four common arguments, plus the
	# left-hand-side plan when the endpoint is only known at execution time.
	my @plan_args	= ($endpoint, $plan, $silent, $sparql);
	if ($endpoint->isa('RDF::Query::Node::Variable')) {
		my $lhs	= shift(@rest);
		Carp::confess("no lhs to binary SERVICE plan") unless ($lhs);
		push(@plan_args, $lhs);
	}

	my $self	= $class->SUPER::new( @plan_args );
	$self->[0]{referenced_variables}	= [ $plan->referenced_variables ];
	$self->[0]{logging_keys}			= {};
	return $self;
}
85              
=item C<< new_from_plan ( $endpoint, $plan, $context ) >>

Returns a new SERVICE query plan object. C<<$endpoint>> is the endpoint; it is
handed unchanged to C<< new >>, which calls methods on it, so it must be a node
object rather than a plain string. C<<$plan>> is the query plan representing
the query to be sent to the remote endpoint. The SPARQL serialization is
obtained from the algebra object stored under the C<'algebra'> label of
C<<$plan>> (wrapped in a GroupGraphPattern if it is not one already), prefixed
with one PREFIX declaration per namespace known to the
RDF::Query::ExecutionContext object C<<$context>>. Prefix declarations are
emitted in sorted order so that the generated query text is stable.

The plan is created as non-silent. Any additional arguments are passed to
C<< new >> after the serialized query.

=cut

sub new_from_plan {
	my $class	= shift;
	my $url		= shift;
	my $plan	= shift;
	my $context	= shift;
	my $pattern	= $plan->label( 'algebra' );
	unless ($pattern->isa('RDF::Query::Algebra::GroupGraphPattern')) {
		$pattern	= RDF::Query::Algebra::GroupGraphPattern->new( $pattern );
	}
	my $ns		= $context->ns;
	# The '__DEFAULT__' key stands for the empty prefix.
	my @prefixes	= map {
		sprintf("PREFIX %s: <%s>", ($_ eq '__DEFAULT__' ? '' : $_), $ns->{$_})
	} sort keys %$ns;
	my $select	= sprintf("SELECT * WHERE %s", $pattern->as_sparql({namespaces => $ns}, ''));
	my $sparql	= join("\n", @prefixes, $select);
	# new() takes ( $endpoint, $plan, $silent, $sparql, ... ); the silent flag
	# must be supplied explicitly or the query string ends up in its place.
	my $service	= $class->new( $url, $plan, 0, $sparql, @_ );
	return $service;
}
113              
=item C<< execute ( $execution_context ) >>

Begins execution of the plan using C<<$execution_context>> and returns the plan
object. An ExecutionError is thrown if the plan is already open.

If the plan has a left-hand-side plan (see C<< lhs >>), that plan is executed
and the remote requests are deferred to C<< next >>. Otherwise the remote
request is made immediately; if no result iterator can be obtained a warning is
emitted and the plan is left un-opened.

=cut

sub execute ($) {
	my ($self, $context)	= @_;
	$self->[0]{delegate}	= $context->delegate;
	RDF::Query::Error::ExecutionError->throw(
		-text => "SERVICE plan can't be executed while already open"
	) if ($self->state == $self->OPEN);

	my $log			= Log::Log4perl->get_logger("rdf.query.plan.service");
	my $endpoint	= $self->endpoint;
	my $sparql		= $self->sparql( $context->bound || {} );
	my $query		= $context->query;	# fetched as before; the value is not used here

	if ($self->lhs) {
		# Endpoint is a variable: run the left-hand side now, and let next()
		# issue one remote call per left-hand-side row.
		$self->lhs->execute( $context );
		$self->[0]{context}	= $context;
		@{ $self->[0] }{ 'open', 'count' }	= (1, 0);
		$self->[0]{logger}	= $context->logger;
		$self->state( $self->OPEN );
		return $self;
	}

	$log->debug('SERVICE execute');
	$log->debug("SERVICE <$endpoint> pattern: $sparql");

	my $iter	= $self->_get_iterator( $context, $endpoint->uri_value, $sparql );
	unless ($iter) {
		warn "no iterator in execute()";
		return $self;
	}

	$self->[0]{iter}	= $iter;
	@{ $self->[0] }{ 'open', 'count' }	= (1, 0);
	my $logger	= $self->[0]{logger}	= $context->logger;
	if ($logger) {
		$logger->push_value( service_endpoints => $endpoint );
	}
	$self->state( $self->OPEN );
	return $self;
}
164              
=item C<< next >>

Returns the next result row, or undef when the results are exhausted. An
ExecutionError is thrown if the plan is not open.

Without a left-hand-side plan, each result of the remote iterator is wrapped in
an RDF::Query::VariableBindings object labelled with the endpoint as its
origin, counted, and reported to the delegate (if any).

With a left-hand-side plan, each left-hand-side row supplies the endpoint (an
ExecutionError is thrown if the endpoint variable is not bound to an IRI); a
service plan is executed against that endpoint and every remote row that joins
with the left-hand-side row is returned.

=cut

sub next {
	my ($self)	= @_;
	RDF::Query::Error::ExecutionError->throw(
		-text => "next() cannot be called on an un-open SERVICE"
	) unless ($self->state == $self->OPEN);
	return undef unless ($self->[0]{'open'});

	unless ($self->lhs) {
		# Fixed endpoint: just drain the iterator obtained in execute().
		my $result	= $self->[0]{iter}->next;
		return undef unless $result;
		$self->[0]{'count'}++;
		my $row	= RDF::Query::VariableBindings->new( $result );
		$row->label( origin => [ $self->endpoint ] );
		my $delegate	= $self->delegate;
		$delegate->log_result( $self, $row ) if ($delegate);
		return $row;
	}

	while (1) {
		# First use up whatever is left of the remote results belonging to the
		# current left-hand-side row.
		if ($self->[0]{iter_stash}) {
			while (my $rhs_row = $self->[0]{iter_stash}->next) {
				my $joined	= $rhs_row->join( $self->[0]{lhs_row} );
				return $joined if ($joined);
			}
			delete $self->[0]{iter_stash};
		}

		# Then advance the left-hand side and start a new remote call.
		my $lhs_row	= $self->lhs->next;
		unless ($lhs_row) {
			delete $self->[0]{lhs_row};
			return undef;
		}
		$self->[0]{lhs_row}	= $lhs_row;

		my $endpoint	= $lhs_row->{ $self->endpoint->name };
		my $is_iri		= (blessed($endpoint) and $endpoint->isa('RDF::Query::Node::Resource'));
		RDF::Query::Error::ExecutionError->throw(
			-text => "Endpoint variable bound to non-IRI"
		) unless ($is_iri);

		my $context	= $self->[0]{context};
		my $sparql	= $self->sparql( $context->bound || {} );
		my $plan	= $RDF::Query::Plan::PLAN_CLASSES{'service'}->new(
			$endpoint, $self->pattern, $self->silent, $sparql
		);
		my $iter	= $plan->execute( $context );
		return undef unless ($iter);
		$self->[0]{iter_stash}	= $iter;
	}
}
222              
=item C<< close >>

Closes the plan. An ExecutionError is thrown if the plan is not open. If a
logger was recorded by C<< execute >>, the number of rows counted is logged
against the endpoint and query (and against the C<'bf'> logging key, when one
is set). Per-execution state is then discarded and the parent class C<close>
is called; its return value is returned.

=cut

sub close {
	my ($self)	= @_;
	RDF::Query::Error::ExecutionError->throw(
		-text => "close() cannot be called on an un-open SERVICE"
	) unless ($self->state == $self->OPEN);

	delete $self->[0]{args};
	my $logger	= delete $self->[0]{logger};
	if ($logger) {
		my $endpoint	= $self->endpoint;
		my $sparql		= $self->sparql;
		my $count		= $self->[0]{count};
		$logger->push_key_value( "cardinality-service-${endpoint}", $sparql, $count );
		my $bf	= $self->logging_keys->{ 'bf' };
		if ($bf) {
			$logger->push_key_value( "cardinality-bf-service-${endpoint}", $bf, $count );
		}
	}
	delete @{ $self->[0] }{ 'count', 'fh' };
	return $self->SUPER::close();
}
249              
# _get_iterator ( $context, $endpoint, $sparql )
#
# Sends $sparql to the endpoint URL $endpoint (a string) as an HTTP GET request
# and returns an iterator over the results.
#
# The user agent is taken from the query object of $context when there is one;
# otherwise a fresh LWP::UserAgent is created. On a successful response the
# body is parsed as SPARQL XML results and the handler's iterator is returned.
# On failure, a silent plan yields an iterator over a single empty binding,
# and a non-silent plan throws an ExecutionError.
sub _get_iterator {
	my $self		= shift;
	my $context		= shift;
	my $endpoint	= shift;
	my $sparql		= shift;

	# The endpoint URL may already carry a query component; in that case the
	# 'query' parameter has to be appended with '&' rather than a second '?'.
	my $sep			= ($endpoint !~ /[?]/)     ? '?'
					: ($endpoint =~ /[?&]\z/)  ? ''
					:                            '&';
	my $url			= $endpoint . $sep . 'query=' . uri_escape($sparql);
	my $l			= Log::Log4perl->get_logger("rdf.query.plan.service");
	$l->trace( 'SERVICE URL: ' . $url );
	my $query		= $context->query;

	my $handler		= RDF::Trine::Iterator::SAXHandler->new();
	my $p			= XML::SAX::ParserFactory->parser(Handler => $handler);

	my $ua			= ($query)
					? $query->useragent
					: do {
						my $u = LWP::UserAgent->new( agent => "RDF::Query/${RDF::Query::VERSION}" );
						$u->default_headers->push_header( 'Accept' => "application/sparql-results+xml;q=0.9,application/rdf+xml;q=0.5,text/turtle;q=0.7,text/xml" );
						$u;
					};

	my $req			= HTTP::Request->new('GET', $url );
	my $response	= $self->_request( $ua, $req );
	my $got_response	= blessed($response);
	if ($got_response and $response->is_success) {
		$p->parse_string( $response->content );
		return $handler->iterator;
	}

	if ($self->silent) {
		# SILENT: a failed call contributes one empty solution instead of an error.
		my $v		= RDF::Query::VariableBindings->new( {} );
		my $iter	= RDF::Trine::Iterator::Bindings->new( [ $v ] );
		return $iter;
	}

	# _request() may hand back something that is not a response object, so
	# only ask for a status line when there is an object to ask.
	my $status		= $got_response ? $response->status_line : 'no response';
	my $query_text	= $self->sparql;
	throw RDF::Query::Error::ExecutionError -text => "*** error making remote SPARQL call to endpoint $endpoint ($status) while making service call for query: $query_text";
}
290              
# _request ( $ua, $req )
#
# Performs the HTTP request $req with the user agent $ua and returns the
# response. When the RDFQUERY_THROW_ON_SERVICE environment variable is true no
# request is made: a silent plan returns nothing, and a non-silent plan throws
# a RequestedInterruptError.
sub _request {
	my ($self, $ua, $req)	= @_;
	# The logger is looked up as before, although nothing is logged here.
	my $logger	= Log::Log4perl->get_logger("rdf.query.plan.service");

	unless ($ENV{RDFQUERY_THROW_ON_SERVICE}) {
		my $response	= $ua->request( $req );
		return $response;
	}

	return if ($self->silent);
	RDF::Query::Error::RequestedInterruptError->throw(
		-text => "Won't execute SERVICE block. Unset RDFQUERY_THROW_ON_SERVICE to continue."
	);
}
308              
309             # sub _get_and_parse_url {
310             # my $self = shift;
311             # my $context = shift;
312             # my $url = shift;
313             # my $fh = shift;
314             # my $pid = shift;
315             # my $query = $context->query;
316             #
317             # eval "
318             # require XML::SAX::Expat;
319             # require XML::SAX::Expat::Incremental;
320             # ";
321             # if ($@) {
322             # die $@;
323             # }
324             # local($XML::SAX::ParserPackage) = 'XML::SAX::Expat::Incremental';
325             # my $handler = RDF::Trine::Iterator::SAXHandler->new();
326             # my $p = XML::SAX::Expat::Incremental->new( Handler => $handler );
327             # $p->parse_start;
328             #
329             # my $has_head = 0;
330             # my $callback = sub {
331             # my $content = shift;
332             # my $resp = shift;
333             # my $proto = shift;
334             # unless ($resp->is_success) {
335             # throw RDF::Query::Error -text => "SERVICE query couldn't get remote content: " . $resp->status_line;
336             # }
337             # $p->parse_more( $content );
338             #
339             # if (not($has_head) and $handler->has_head) {
340             # my @args = $handler->iterator_args;
341             # if (exists( $args[2]{Handler} )) {
342             # delete $args[2]{Handler};
343             # }
344             # $has_head = 1;
345             # store_fd \@args, $fh or die "PID $pid can't store!\n";
346             # }
347             #
348             # while (my $data = $handler->pull_result) {
349             # store_fd $data, $fh or die "PID $pid can't store!\n";
350             # }
351             # };
352             # my $ua = ($query)
353             # ? $query->useragent
354             # : do {
355             # my $u = LWP::UserAgent->new( agent => "RDF::Query/${RDF::Query::VERSION}" );
356             # $u->default_headers->push_header( 'Accept' => "application/sparql-results+xml;q=0.9,application/rdf+xml;q=0.5,text/turtle;q=0.7,text/xml" );
357             # $u;
358             # };
359             #
360             # $ua->get( $url, ':content_cb' => $callback );
361             # store_fd \undef, $fh or die "can't store end-of-stream";
362             # }
363              
=item C<< endpoint >>

Returns the endpoint of this plan. If an argument is given, the endpoint is
first replaced by that value.

=cut

sub endpoint {
	my ($self, @replacement)	= @_;
	$self->[1]	= $replacement[0] if (@replacement);
	return $self->[1];
}
376              
=item C<< sparql >>

Returns the SPARQL query (as a string) that will be sent to the remote endpoint.
If the stored query is a reference, it is invoked as a code reference with the
arguments passed to this method, and the value it returns is returned instead.

=cut

sub sparql {
	my ($self, @args)	= @_;
	my $query	= $self->[4];
	return $query unless (ref($query));
	my $generated	= $query->( @args );
	return $generated;
}
391              
=item C<< lhs >>

Returns the left-hand-side plan of this SERVICE plan, if any. The constructor
only supplies one when the endpoint is a variable.

=cut

sub lhs {
	my ($self)	= @_;
	return $self->[5];
}
401              
=item C<< silent >>

Returns a boolean value indicating whether the service plan will ignore errors
encountered while calling the remote endpoint.

=cut

sub silent {
	my ($self)	= @_;
	my $flag	= $self->[3];
	return $flag;
}
412              
=item C<< pattern >>

Returns the query plan that will be used in the remote service call.

=cut

sub pattern {
	my ($self)	= @_;
	my $plan	= $self->[2];
	return $plan;
}
423              
=item C<< distinct >>

Returns true if the pattern is guaranteed to return distinct results. A SERVICE
plan never makes that guarantee, so this is always false.

=cut

sub distinct {
	# XXX this could be decided at construction time, if we want to trust the
	# XXX remote endpoint to return DISTINCT results (when appropriate).
	my $guaranteed	= 0;
	return $guaranteed;
}
436              
=item C<< ordered >>

Returns true if the pattern is guaranteed to return ordered results. A SERVICE
plan never makes that guarantee, so this is always false.

=cut

sub ordered {
	# XXX this could be decided at construction time, if we want to trust the
	# XXX remote endpoint to return ORDERED results (when appropriate).
	my $guaranteed	= 0;
	return $guaranteed;
}
449              
=item C<< plan_node_name >>

Returns the string name of this plan node (C<'service'>), suitable for use in
serialization.

=cut

sub plan_node_name {
	my $name	= 'service';
	return $name;
}
459              
=item C<< plan_prototype >>

Returns a list of scalar identifiers for the type of the content (children)
nodes of this plan node, in the same order as the values returned by
C<< plan_node_data >>. See L<RDF::Query::Plan> for a list of the allowable
identifiers.

=cut

sub plan_prototype {
	my ($self)	= @_;
	return ('u', 's');
}
472              
=item C<< plan_node_data >>

Returns the data for this plan node that corresponds to the values described by
the signature returned by C<< plan_prototype >>: the endpoint followed by the
SPARQL query.

=cut

sub plan_node_data {
	my ($self)	= @_;
	return ($self->endpoint, $self->sparql);
}
485              
=item C<< graph ( $g ) >>

Adds this plan to the graph object C<<$g>>: one node for the plan itself
(labelled with the endpoint value and the plan's graph labels), one node
holding the SPARQL query, and an edge from the former to the latter. Returns
the identifier of the plan's node.

=cut

sub graph {
	my ($self, $g)	= @_;
	my $node_id		= "$self";
	my $sparql_id	= "${self}-sparql";
	my $label		= "Service (" . $self->endpoint->value . ")" . $self->graph_labels;
	$g->add_node( $node_id, label => $label );
	$g->add_node( $sparql_id, label => $self->sparql );
	$g->add_edge( $node_id, $sparql_id );
	return $node_id;
}
498              
=item C<< explain ( [ $indent_string, $depth ] ) >>

Returns a string serialization of the query plan appropriate for display
on the command line. The first line holds the plan node name and address,
indented by C<<$depth>> copies of C<<$indent_string>>; the endpoint and the
SPARQL query follow, each indented one level deeper. Without arguments a single
space is used for indenting and the depth is zero.

=cut

sub explain {
	my $self	= shift;
	my ($s, $count)	= (@_) ? @_[0, 1] : (' ', 0);
	my $indent	= $s x $count;
	my $inner	= $indent . $s;
	my $type	= $self->plan_node_name;
	my $sparql	= $self->sparql;
	# Continuation lines of the query are pushed one level past the query's
	# own first line.
	$sparql		=~ s/\n/\n${inner}${s}/g;
	my $header	= sprintf("%s%s (0x%x)\n", $indent, $type, refaddr($self));
	my $endpoint_line	= $inner . $self->endpoint->as_string . "\n";
	my $sparql_line		= "${inner}${sparql}\n";
	return $header . $endpoint_line . $sparql_line;
}
522              
523             1;
524              
525             __END__
526              
527             =back
528              
529             =head1 AUTHOR
530              
531             Gregory Todd Williams <gwilliams@cpan.org>
532              
533             =cut