line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
#$Id$ |
2
|
3
|
|
|
3
|
|
2164
|
use v5.10.1; |
|
3
|
|
|
|
|
12
|
|
3
|
|
|
|
|
|
|
package REST::Neo4p::Batch; |
4
|
3
|
|
|
3
|
|
19
|
use REST::Neo4p::Exceptions; |
|
3
|
|
|
|
|
5
|
|
|
3
|
|
|
|
|
65
|
|
5
|
3
|
|
|
3
|
|
25
|
use JSON::XS; |
|
3
|
|
|
|
|
8
|
|
|
3
|
|
|
|
|
141
|
|
6
|
3
|
|
|
3
|
|
25
|
use REST::Neo4p::ParseStream; |
|
3
|
|
|
|
|
7
|
|
|
3
|
|
|
|
|
173
|
|
7
|
3
|
|
|
3
|
|
19
|
use HOP::Stream qw/drop head/; |
|
3
|
|
|
|
|
5
|
|
|
3
|
|
|
|
|
179
|
|
8
|
|
|
|
|
|
|
require REST::Neo4p; |
9
|
|
|
|
|
|
|
|
10
|
3
|
|
|
3
|
|
20
|
use base qw(Exporter); |
|
3
|
|
|
|
|
6
|
|
|
3
|
|
|
|
|
242
|
|
11
|
3
|
|
|
3
|
|
25
|
use strict; |
|
3
|
|
|
|
|
6
|
|
|
3
|
|
|
|
|
80
|
|
12
|
3
|
|
|
3
|
|
15
|
use warnings; |
|
3
|
|
|
|
|
5
|
|
|
3
|
|
|
|
|
101
|
|
13
|
3
|
|
|
3
|
|
18
|
no warnings qw(once); |
|
3
|
|
|
|
|
4
|
|
|
3
|
|
|
|
|
128
|
|
14
|
|
|
|
|
|
|
|
15
|
|
|
|
|
|
|
BEGIN { |
16
|
3
|
|
|
3
|
|
849
|
$REST::Neo4p::Batch::VERSION = '0.4000'; |
17
|
|
|
|
|
|
|
} |
18
|
|
|
|
|
|
|
|
19
|
|
|
|
|
|
|
# Symbols exported by default via Exporter (the base class): just batch().
our @EXPORT = qw(batch); |
20
|
|
|
|
|
|
|
# Names of the two actions batch() accepts.
# NOTE(review): batch() validates against its own literal list and does not
# read this array in the code visible here -- confirm whether it is used elsewhere.
our @BATCH_ACTIONS = qw(keep_objs discard_objs); |
21
|
|
|
|
|
|
|
# Size argument passed to each read() call on the batch response filehandle
# in batch().
our $BUFSIZE = 50000; |
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
# batch BLOCK ACTION
#
# Execute BLOCK with the REST::Neo4p agent in batch mode, submit the
# accumulated jobs with execute_batch_chunk(), then walk the streamed
# response.
#
# Arguments:
#   BLOCK  - code reference (block syntax allowed by the (&@) prototype)
#   ACTION - 'keep_objs'    : successful job results are handed to
#                             _register_object()
#            'discard_objs' : successful job results are ignored
#
# Returns the list of REST::Neo4p::Neo4jException objects built for jobs
# that came back with a non-2xx status or with no status at all (an empty
# list when every job succeeded).
#
# Throws:
#   REST::Neo4p::NotSuppException - agent reports a v4 server
#   REST::Neo4p::CommException    - not connected
#   REST::Neo4p::LocalException   - ACTION missing or not one of the two
#                                   allowed values
#   plain die                     - response is not a BATCH stream, or the
#                                   stream yields an unexpected token
#
# The agent's batch mode is switched off again even when BLOCK or the
# response processing dies; the original exception is then rethrown.
sub batch (&@) {
  my ($coderef, $action) = @_;
  my $agent = REST::Neo4p->agent;
  if ($agent->is_version_4) {
    REST::Neo4p::NotSuppException->throw("Batch mode not supported on Neo4j server v4.0+");
  }
  my @errors;
  REST::Neo4p::CommException->throw("Not connected\n") unless REST::Neo4p->connected;
  warn 'Agent already in batch_mode on batch() call' if ($agent->batch_mode);
  # Exact string comparison: interpolating $action into a regex would let
  # values such as '.*' pass validation.
  REST::Neo4p::LocalException->throw("batch requires argument 'keep_objs' or 'discard_objs'\n")
    unless (defined $action && grep { $_ eq $action } qw/keep_objs discard_objs/);

  $agent->batch_mode(1);
  my $ok = eval {
    $coderef->();
    my $tmpfh = $agent->execute_batch_chunk;
    my $jsonr = JSON::XS->new->utf8;
    my $buf;
    $tmpfh->read($buf, $BUFSIZE);
    $jsonr->incr_parse($buf);
    my $res = j_parse($jsonr);
    die "j_parse: expecting BATCH stream" unless ($res->[0] eq 'BATCH');
    my $str = $res->[1]->();
    while (my $item = drop($str)) {
      my $obj = $item->[1];
      if (ref $obj) {
        # A job result. Test for a missing status first: an undefined
        # status also fails the 2xx match, so checking the pattern first
        # would make the "no status" case unreachable.
        if (!$obj->{status}) {
          $obj->{status} = 599;
          warn "Error at id ".$obj->{id}." from ".$obj->{from}.": status ".$obj->{status} if $REST::Neo4p::VERBOSE;
          push @errors, REST::Neo4p::Neo4jException->new(
            code => $obj->{status},
            message => 'Server returned no status at job id '.$obj->{id}.' from '.$obj->{from}, neo4j_message=>$obj->{message}
          );
        }
        elsif ($obj->{status} !~ m/^2../) {
          warn "Error at id ".$obj->{id}." from ".$obj->{from}.": status ".$obj->{status} if $REST::Neo4p::VERBOSE;
          push @errors, REST::Neo4p::Neo4jException->new(
            code => $obj->{status},
            message => 'Server returned '.$obj->{status}.' at job id '.$obj->{id}.' from '.$obj->{from}, neo4j_message=>$obj->{message}
          );
        }
        else {
          _register_object($obj) if $action eq 'keep_objs';
        }
      }
      elsif (!defined $obj) {
        # end of the batch stream
        last;
      }
      elsif ($obj eq 'PENDING') {
        # parser needs more input: feed it the next chunk of the response
        $tmpfh->read($buf, $BUFSIZE);
        $jsonr->incr_parse($buf);
      }
      else {
        die "j_parse: batch response ended prematurely";
      }
    }
    1;
  };
  my $err = $@;
  # Always leave batch mode, otherwise later calls on the agent would keep
  # being queued instead of sent.
  $agent->batch_mode(undef);
  die($err || "batch: unknown error\n") unless $ok;
  return @errors;
}
83
|
|
|
|
|
|
|
|
84
|
|
|
|
|
|
|
# create new nodes, relationships as they are encountered |
85
|
|
|
|
|
|
|
# |
86
|
|
|
|
|
|
|
# TODO: handling indexes, queries? Prevent queries in batch mode? |
87
|
|
|
|
|
|
|
# TODO: use JSON streaming from file |
88
|
|
|
|
|
|
|
|
89
|
|
|
|
|
|
|
# _register_object($decoded_batch_resp)
#
# Given one decoded job result from a batch response (a hash ref with at
# least the keys id, status and body), build the matching REST::Neo4p
# object from the body and copy it into the placeholder stored under
# "{<id>}" in $REST::Neo4p::Entity::ENTITY_TABLE->{batch_objs}.
#
# Body recognition, in order:
#   - has a 'template' key                      -> REST::Neo4p::Index
#   - has a 'from' key matching /properties/    -> ignored
#   - 'self' ends in node/<digits>              -> REST::Neo4p::Node
#   - 'self' ends in relationship/<digits>      -> REST::Neo4p::Relationship
#   - anything else: warn when $REST::Neo4p::VERBOSE is set
#
# Results with no body or with a non-2xx status are skipped silently.
# Bodies that are not hash refs (e.g. a JSON array or bare string) cannot
# be inspected by key; they are treated as unrecognized instead of dying
# on the hash dereference.
#
# Always returns nothing (empty list / undef).
sub _register_object {
  my $decoded_batch_resp = shift;
  my ($id, $body) = @{$decoded_batch_resp}{qw(id body)};
  return unless $body;
  return if ($decoded_batch_resp->{status} !~ m/^2../); # ignore an error here
  unless (ref $body eq 'HASH') {
    warn "Don't understand object in batch response: id ".$id if $REST::Neo4p::VERBOSE;
    return;
  }
  my $obj;
  if ($body->{template}) {
    $obj = REST::Neo4p::Index->new_from_json_response($body);
  }
  elsif ($body->{from} and $body->{from} =~ /properties/) {
    1; # ignore
  }
  elsif ($body->{self} and $body->{self} =~ m|node/[0-9]+$|) {
    $obj = REST::Neo4p::Node->new_from_json_response($body);
  }
  elsif ($body->{self} and $body->{self} =~ m|relationship/[0-9]+$|) {
    $obj = REST::Neo4p::Relationship->new_from_json_response($body);
  }
  else {
    warn "Don't understand object in batch response: id ".$id if $REST::Neo4p::VERBOSE;
  }
  if ($obj) {
    my $batch_objs = $REST::Neo4p::Entity::ENTITY_TABLE->{batch_objs};
    # Overwrite the placeholder's referent so references handed out inside
    # the batch block now see the real object.
    if ( my $batch_obj = delete $batch_objs->{ "{$id}" } ) {
      $$batch_obj = $$obj;
    }
  }
  return;
}
118
|
|
|
|
|
|
|
|
119
|
|
|
|
|
|
|
=head1 NAME |
120
|
|
|
|
|
|
|
|
121
|
|
|
|
|
|
|
REST::Neo4p::Batch - Mixin for batch processing |
122
|
|
|
|
|
|
|
|
123
|
|
|
|
|
|
|
=head1 SYNOPSIS |
124
|
|
|
|
|
|
|
|
125
|
|
|
|
|
|
|
use REST::Neo4p; |
126
|
|
|
|
|
|
|
use REST::Neo4p::Batch; |
127
|
|
|
|
|
|
|
use List::MoreUtils qw(pairwise); |
128
|
|
|
|
|
|
|
|
129
|
|
|
|
|
|
|
my @bunch = map { "new_node_$_" } (1..100); |
130
|
|
|
|
|
|
|
my @nodes; |
131
|
|
|
|
|
|
|
batch { |
132
|
|
|
|
|
|
|
my $idx = REST::Neo4p::Index->new('node','bunch'); |
133
|
|
|
|
|
|
|
@nodes = map { REST::Neo4p::Node->new({name => $_}) } @bunch; |
134
|
|
|
|
|
|
|
pairwise { $idx->add_entry($a, name => $b) } @nodes, @bunch; |
135
|
|
|
|
|
|
|
$nodes[$_]->relate_to($nodes[$_+1],'next_node') for (0..$#nodes-1); |
136
|
|
|
|
|
|
|
} 'keep_objs'; |
137
|
|
|
|
|
|
|
|
138
|
|
|
|
|
|
|
$idx = REST::Neo4p->get_index_by_name('node','bunch'); |
139
|
|
|
|
|
|
|
($the_99th_node) = $nodes[98]; |
140
|
|
|
|
|
|
|
($points_to_100th_node) = $the_99th_node->get_outgoing_relationships; |
141
|
|
|
|
|
|
|
($the_100th_node) = $idx->find_entries( name => 'new_node_100'); |
142
|
|
|
|
|
|
|
|
143
|
|
|
|
|
|
|
|
144
|
|
|
|
|
|
|
=head1 DESCRIPTION |
145
|
|
|
|
|
|
|
|
146
|
|
|
|
|
|
|
REST::Neo4p::Batch adds some syntactic sugar allowing ordinary |
147
|
|
|
|
|
|
|
REST::Neo4p code to be processed through the Neo4j REST batch API. |
148
|
|
|
|
|
|
|
|
149
|
|
|
|
|
|
|
Batch mode is not supported in Neo4j version 4.0+. The methods in this |
150
|
|
|
|
|
|
|
module will throw a C<REST::Neo4p::NotSuppException>. |
151
|
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
=head1 batch {} ($action) |
153
|
|
|
|
|
|
|
|
154
|
|
|
|
|
|
|
To execute server calls generated by REST::Neo4p code, |
155
|
|
|
|
|
|
|
wrap the code in a batch block: |
156
|
|
|
|
|
|
|
|
157
|
|
|
|
|
|
|
batch { |
158
|
|
|
|
|
|
|
# create and manipulate REST::Neo4p objects |
159
|
|
|
|
|
|
|
} $action; |
160
|
|
|
|
|
|
|
|
161
|
|
|
|
|
|
|
The C<$action> parameter B<must be> (there is no default) one of |
162
|
|
|
|
|
|
|
|
163
|
|
|
|
|
|
|
=over |
164
|
|
|
|
|
|
|
|
165
|
|
|
|
|
|
|
=item * 'keep_objs' |
166
|
|
|
|
|
|
|
|
167
|
|
|
|
|
|
|
If C<keep_objs> is specified, any nodes, relationships or indexes |
168
|
|
|
|
|
|
|
returned in the server response will be created in memory as |
169
|
|
|
|
|
|
|
REST::Neo4p objects. |
170
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
=item * 'discard_objs' |
172
|
|
|
|
|
|
|
|
173
|
|
|
|
|
|
|
If C<discard_objs> is specified, Neo4j entities in the server response |
174
|
|
|
|
|
|
|
will not be automatically registered as REST::Neo4p objects. Of |
175
|
|
|
|
|
|
|
course, these objects can be retrieved from the server through object |
176
|
|
|
|
|
|
|
creation and other methods, outside of the batch block. |
177
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
#!perl |
179
|
|
|
|
|
|
|
# loader... |
180
|
|
|
|
|
|
|
use REST::Neo4p; |
181
|
|
|
|
|
|
|
use REST::Neo4p::Batch; |
182
|
|
|
|
|
|
|
|
183
|
|
|
|
|
|
|
open $f, shift() or die $!; |
184
|
|
|
|
|
|
|
batch { |
185
|
|
|
|
|
|
|
while (<$f>) { |
186
|
|
|
|
|
|
|
chomp; |
187
|
|
|
|
|
|
|
($name, $value) = split /\t/; |
188
|
|
|
|
|
|
|
REST::Neo4p::Node->new({name => $name, value => $value}); |
189
|
|
|
|
|
|
|
} } 'discard_objs'; |
190
|
|
|
|
|
|
|
exit(0); |
191
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
=back |
193
|
|
|
|
|
|
|
|
194
|
|
|
|
|
|
|
=head2 Errors in batch jobs |
195
|
|
|
|
|
|
|
|
196
|
|
|
|
|
|
|
C<batch()> returns an array of |
197
|
|
|
|
|
|
|
L<REST::Neo4p::Neo4jException> error objects for each job that returns |
198
|
|
|
|
|
|
|
a server-generated error. If no errors were encountered, it returns |
199
|
|
|
|
|
|
|
an empty list (0 in scalar context). |
200
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
foreach ( batch { _do_stuff() } 'discard_objs' ) { |
202
|
|
|
|
|
|
|
print STDERR $_->message, "(", $_->code, ")\n"; |
203
|
|
|
|
|
|
|
} |
204
|
|
|
|
|
|
|
|
205
|
|
|
|
|
|
|
C<batch()> will C<warn()> for each error immediately if |
206
|
|
|
|
|
|
|
C<$REST::Neo4p::VERBOSE> is set. |
207
|
|
|
|
|
|
|
|
208
|
|
|
|
|
|
|
=head1 CAVEATS |
209
|
|
|
|
|
|
|
|
210
|
|
|
|
|
|
|
=over |
211
|
|
|
|
|
|
|
|
212
|
|
|
|
|
|
|
=item * |
213
|
|
|
|
|
|
|
|
214
|
|
|
|
|
|
|
No call to the server is made until after the block is executed. There |
215
|
|
|
|
|
|
|
is some magic provided, but not all object functionality is available |
216
|
|
|
|
|
|
|
to REST::Neo4p entities obtained within the C<batch> block. |
217
|
|
|
|
|
|
|
|
218
|
|
|
|
|
|
|
For example, this works: |
219
|
|
|
|
|
|
|
|
220
|
|
|
|
|
|
|
my $idx = REST::Neo4p::Index->new('node' => 'pals_of_bob'); |
221
|
|
|
|
|
|
|
my $name = 'fred'; |
222
|
|
|
|
|
|
|
batch { |
223
|
|
|
|
|
|
|
my $node = REST::Neo4p::Node->new({name => $name}); |
224
|
|
|
|
|
|
|
$idx->add_entry($node, name => $name); |
225
|
|
|
|
|
|
|
} 'keep_objs'; |
226
|
|
|
|
|
|
|
|
227
|
|
|
|
|
|
|
but this does not: |
228
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
my $idx = REST::Neo4p::Index->new('node' => 'pals_of_bob'); |
230
|
|
|
|
|
|
|
my $name = 'fred'; |
231
|
|
|
|
|
|
|
batch { |
232
|
|
|
|
|
|
|
my $node = REST::Neo4p::Node->new({name => $name}); |
233
|
|
|
|
|
|
|
$idx->add_entry($node, name => $node->get_property('name')); |
234
|
|
|
|
|
|
|
} 'keep_objs'; |
235
|
|
|
|
|
|
|
|
236
|
|
|
|
|
|
|
because $node has not been created on the server at the time that |
237
|
|
|
|
|
|
|
add_entry() is executed, so C<get_property('name')> fails. |
238
|
|
|
|
|
|
|
|
239
|
|
|
|
|
|
|
=back |
240
|
|
|
|
|
|
|
|
241
|
|
|
|
|
|
|
=head1 SEE ALSO |
242
|
|
|
|
|
|
|
|
243
|
|
|
|
|
|
|
L<REST::Neo4p>, L<REST::Neo4p::Agent> |
244
|
|
|
|
|
|
|
|
245
|
|
|
|
|
|
|
=head1 AUTHOR |
246
|
|
|
|
|
|
|
|
247
|
|
|
|
|
|
|
Mark A. Jensen |
248
|
|
|
|
|
|
|
CPAN ID: MAJENSEN |
249
|
|
|
|
|
|
|
majensen -at- cpan -dot- org |
250
|
|
|
|
|
|
|
|
251
|
|
|
|
|
|
|
=head1 LICENSE |
252
|
|
|
|
|
|
|
|
253
|
|
|
|
|
|
|
Copyright (c) 2012-2020 Mark A. Jensen. This program is free software; you |
254
|
|
|
|
|
|
|
can redistribute it and/or modify it under the same terms as Perl |
255
|
|
|
|
|
|
|
itself. |
256
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
=cut |
258
|
|
|
|
|
|
|
|
259
|
|
|
|
|
|
|
1; |