File Coverage

lib/Neo4j/Driver/Net/HTTP.pm
Criterion Covered Total %
statement 123 125 98.4
branch 35 44 79.5
condition 26 31 83.8
subroutine 21 21 100.0
pod 0 1 100.0
total 205 222 92.7


line stmt bran cond sub pod time code
1 20     20   261 use v5.12;
  20         67  
2 20     20   148 use warnings;
  20         35  
  20         1406  
3              
4             package Neo4j::Driver::Net::HTTP 1.02;
5             # ABSTRACT: Network controller for Neo4j HTTP
6              
7              
8             # This package is not part of the public Neo4j::Driver API.
9              
10              
11 20     20   106 use Carp qw(croak);
  20         28  
  20         1714  
12             our @CARP_NOT = qw(Neo4j::Driver::Transaction Neo4j::Driver::Transaction::HTTP);
13              
14 20     20   10996 use Time::Piece 1.20 qw();
  20         250267  
  20         884  
15 20     20   155 use URI 1.31;
  20         1123  
  20         547  
16              
17 20     20   8214 use Neo4j::Driver::Net::HTTP::Tiny;
  20         84  
  20         827  
18 20     20   10709 use Neo4j::Driver::Result::Jolt;
  20         72  
  20         928  
19 20     20   11200 use Neo4j::Driver::Result::JSON;
  20         67  
  20         886  
20 20     20   9213 use Neo4j::Driver::Result::Text;
  20         58  
  20         860  
21 20     20   150 use Neo4j::Driver::ServerInfo;
  20         36  
  20         43363  
22              
23              
24             my $DISCOVERY_ENDPOINT = '/';
25             my $COMMIT_ENDPOINT = 'commit';
26              
27             my @RESULT_MODULES = qw( Neo4j::Driver::Result::Jolt Neo4j::Driver::Result::JSON );
28             my $RESULT_FALLBACK = 'Neo4j::Driver::Result::Text';
29              
30             my $RFC5322_DATE = '%a, %d %b %Y %H:%M:%S %z'; # strftime(3)
31              
32              
33             sub new {
34             # uncoverable pod
35 173     173 0 408 my ($class, $driver) = @_;
36            
37             $driver->{events}->{default_handlers}->{http_adapter_factory} //= sub {
38 7   50 7   47 my $net_module = $driver->{config}->{net_module} || 'Neo4j::Driver::Net::HTTP::Tiny';
39 7         59 return $net_module->new($driver);
40 173   66     1320 };
41 173         762 my $http_adapter = $driver->{events}->trigger('http_adapter_factory', $driver);
42            
43             my $self = bless {
44             events => $driver->{events},
45             server_info => $driver->{server_info},
46             http_agent => $http_adapter,
47             want_jolt => $driver->{config}->{jolt},
48 170   100     2585 want_concurrent => $driver->config('concurrent_tx') // 0,
49             active_tx => {},
50             }, $class;
51            
52 170         1016 return $self;
53             }
54              
55              
56             # Use Neo4j Discovery API to obtain both ServerInfo and the
57             # transaction endpoint templates.
58             sub _server {
59 78     78   262 my ($self) = @_;
60            
61 78         144 my ($neo4j_version, $tx_endpoint);
62 78         443 my @discovery_queue = ($DISCOVERY_ENDPOINT);
63 78         280 while (@discovery_queue) {
64 78         387 my $events = $self->{events};
65             my $tx = {
66 2     2   7 error_handler => sub { $events->trigger(error => shift) },
67 78         513 transaction_endpoint => shift @discovery_queue,
68             };
69 78         317 my $service = $self->_request($tx, 'GET')->_json;
70            
71 76         349 $neo4j_version = $service->{neo4j_version};
72 76         134 $tx_endpoint = $service->{transaction};
73 76 50 33     705 last if $neo4j_version && $tx_endpoint;
74            
75             # a different discovery endpoint existed in Neo4j < 4.0
76 0 0       0 if ($service->{data}) {
77 0         0 push @discovery_queue, URI->new( $service->{data} )->path;
78             }
79             }
80            
81 76 50       204 croak "Neo4j server not found (ServerInfo discovery failed)" unless $neo4j_version;
82            
83 76         334 my $date = $self->{http_agent}->date_header;
84 76         1324 $date =~ s/ GMT$/ +0000/;
85 76 50       679 $date = $date ? Time::Piece->strptime($date, $RFC5322_DATE) : Time::Piece->new;
86            
87             $self->{server_info} = Neo4j::Driver::ServerInfo->new({
88             uri => $self->{http_agent}->uri,
89 76         8014 version => "Neo4j/$neo4j_version",
90             time_diff => Time::Piece->new - $date,
91             tx_endpoint => $tx_endpoint,
92             });
93            
94 76         894 return $self->{server_info};
95             }
96              
97              
98             # Update requested database name based on transaction endpoint templates.
99             sub _set_database {
100 163     163   405 my ($self, $database) = @_;
101            
102 163         350 my $tx_endpoint = $self->{server_info}->{tx_endpoint};
103             $self->{endpoints} = {
104 163 50       1099 new_transaction => "$tx_endpoint",
105             new_commit => "$tx_endpoint/$COMMIT_ENDPOINT",
106             } if $tx_endpoint;
107            
108 163 100       447 return unless defined $database;
109 160         605 $database = URI::Escape::uri_escape_utf8 $database;
110 160         5603 $self->{endpoints}->{new_transaction} =~ s/\{databaseName}/$database/;
111 160         586 $self->{endpoints}->{new_commit} =~ s/\{databaseName}/$database/;
112             }
113              
114              
115             # Send queries to the Neo4j server and return a list of all results.
116             sub _run {
117 269     269   724 my ($self, $tx, @queries) = @_;
118            
119 269 100       763 if ( ! $self->{want_concurrent} ) {
120             # A new HTTP tx has no commit endpoint until after the first result is received.
121 256   100     390 my $is_concurrent = keys %{$self->{active_tx}} && ! defined $tx->{commit_endpoint};
122 256 100       776 $is_concurrent and croak "Concurrent transactions for HTTP are disabled; use multiple sessions or enable the concurrent_tx config option";
123             }
124            
125 265         745 my $json = { statements => \@queries };
126 265         1001 return $self->_request($tx, 'POST', $json)->_results;
127             }
128              
129              
130             # Determine the Accept HTTP header that is appropriate for the specified
131             # request method. Accept headers are cached in $self->{accept_for}.
132             sub _accept_for {
133 214     214   438 my ($self, $method) = @_;
134            
135             $self->{want_jolt} = 'v1' if ! defined $self->{want_jolt}
136 214 100 100     1488 && $self->{server_info} && $self->{server_info}->{version} =~ m{^Neo4j/4\.[234]\.};
      100        
137            
138             # GET requests may fail if Neo4j sees clients that support Jolt, see neo4j #12644
139 214         631 my @modules = @RESULT_MODULES;
140 214 100       1665 unshift @modules, $self->{http_agent}->result_handlers if $self->{http_agent}->can('result_handlers');
141 214         593 my @accept = map { $_->_accept_header( $self->{want_jolt}, $method ) } @modules;
  428         2418  
142 214         2138 return $self->{accept_for}->{$method} = join ', ', @accept;
143             }
144              
145              
146             # Determine a result handler module that is appropriate for the specified
147             # media type. Result handlers are cached in $self->{result_module_for}.
148             sub _result_module_for {
149 204     204   554 my ($self, $content_type) = @_;
150            
151 204         534 my @modules = @RESULT_MODULES;
152 204 100       1019 unshift @modules, $self->{http_agent}->result_handlers if $self->{http_agent}->can('result_handlers');
153 204         558 foreach my $module (@modules) {
154 328 100       2881 if ($module->_acceptable($content_type)) {
155 193         1265 return $self->{result_module_for}->{$content_type} = $module;
156             }
157             }
158 11         48 return $RESULT_FALLBACK;
159             }
160              
161              
162             # Send a HTTP request to the Neo4j server and return a representation
163             # of the response.
164             sub _request {
165 352     352   927 my ($self, $tx, $method, $json) = @_;
166            
167 352 100       972 if (! defined $tx->{transaction_endpoint}) {
168 36         242 $tx->{transaction_endpoint} = URI->new( $self->{endpoints}->{new_transaction} )->path;
169             }
170 352         5282 my $tx_endpoint = "$tx->{transaction_endpoint}";
171 352   100     1615 my $accept = $self->{accept_for}->{$method}
172             // $self->_accept_for($method);
173            
174 352         2221 $self->{http_agent}->request($method, $tx_endpoint, $json, $accept, $tx->{mode});
175            
176 352         80231 my $header = $self->{http_agent}->http_header;
177             my $result_module = $self->{result_module_for}->{ $header->{content_type} }
178 352   66     856530 // $self->_result_module_for( $header->{content_type} );
179            
180             my $result = $result_module->new({
181             http_agent => $self->{http_agent},
182             http_method => $method,
183             http_path => $tx_endpoint,
184             http_header => $header,
185             server_info => $self->{server_info},
186 352 100       5378 queries => $json ? $json->{statements} : [],
187             });
188            
189 350         1937 my $info = $result->_info;
190 350         1401 $self->_parse_tx_status($tx, $header, $info);
191 350 100       1132 $tx->{error_handler}->($info->{_error}) if $info->{_error};
192 324         2109 return $result;
193             }
194              
195              
196             # Update list of active transactions and update transaction endpoints.
197             sub _parse_tx_status {
198 350     350   894 my ($self, $tx, $header, $info) = @_;
199            
200             # In case of errors, HTTP transaction status info is only reliable for
201             # server errors that aren't reported as network errors. (neo4j #12651)
202 350 100       1147 if (my $error = $info->{_error}) {
203 27 100       104 return if $error->source ne 'Server';
204 14 50       66 do { return if $error->source eq 'Network' } while $error = $error->related;
  14         36  
205             }
206            
207 337         843 $tx->{unused} = 0;
208 337   100     1306 $tx->{closed} = ! $info->{commit} || ! $info->{transaction};
209            
210 337 100       858 if ( $tx->{closed} ) {
211 294         634 my $old_endpoint = $tx->{transaction_endpoint};
212 294         2792 $old_endpoint =~ s|/$COMMIT_ENDPOINT$||; # both endpoints may be set to /commit (for autocommit), so we need to remove that here
213 294         760 delete $self->{active_tx}->{ $old_endpoint };
214 294         637 return;
215             }
216 43 100 100     231 if ( $header->{location} && $header->{status} eq '201' ) { # Created
217 34         279 my $new_commit = URI->new( $info->{commit} )->path_query;
218 34         4452 my $new_endpoint = URI->new( $header->{location} )->path_query;
219 34         3732 $tx->{commit_endpoint} = $new_commit;
220 34         87 $tx->{transaction_endpoint} = $new_endpoint;
221             }
222 43 50       167 if ( my $expires = $info->{transaction}->{expires} ) {
223 43         148 $expires =~ s/ GMT$/ +0000/;
224 43         336 $expires = Time::Piece->strptime($expires, $RFC5322_DATE) + $self->{server_info}->{time_diff};
225 43         10015 $self->{active_tx}->{ $tx->{transaction_endpoint} } = $expires;
226             }
227             }
228              
229              
230             # Query list of active transactions, removing expired ones.
231             sub _is_active_tx {
232 36     36   84 my ($self, $tx) = @_;
233            
234 36         162 my $now = Time::Piece->new;
235 36         2919 foreach my $tx_key ( keys %{$self->{active_tx}} ) {
  36         156  
236 39         254 my $expires = $self->{active_tx}->{$tx_key};
237 39 50       192 delete $self->{active_tx}->{$tx_key} if $now > $expires;
238             }
239            
240 36         1772 my $tx_endpoint = $tx->{transaction_endpoint};
241 36         505 $tx_endpoint =~ s|/$COMMIT_ENDPOINT$||; # for tx in the (auto)commit state, both endpoints are set to commit
242 36         234 return exists $self->{active_tx}->{ $tx_endpoint };
243             }
244              
245              
246             1;