File Coverage

blib/lib/REST/Neo4p/ParseStream.pm
Criterion Covered Total %
statement 135 149 90.6
branch 42 62 67.7
condition 12 15 80.0
subroutine 14 14 100.0
pod 0 3 0.0
total 203 243 83.5


line stmt bran cond sub pod time code
1 38     38   71847 use v5.10.1;
  38         141  
2             package REST::Neo4p::ParseStream;
3 38     38   240 use base Exporter;
  38         72  
  38         3000  
4 38     38   21302 use HOP::Stream qw/node promise/;
  38         64164  
  38         2909  
5              
6 38     38   290 use strict;
  38         79  
  38         776  
7 38     38   178 use warnings;
  38         86  
  38         1282  
8              
9             BEGIN {
10 38     38   5656 $REST::Neo4p::ParseStream::VERSION = '0.4003';
11             }
12              
13             our @EXPORT = qw/j_parse/;# j_parse_object j_parse_array /;
14              
15             # lazy linked lists
16             # qry response in txn - "data":[{ "row" : [...]},...]
17             # qry response for qry - "data":[ [{...},...],... ]
18             # so - for txn response, j_parse_query_response returns hashes,
19             # for qry response, j_parse_query_response returns arrays
20              
21             sub j_parse {
22 15     15 0 8032 my $j = shift;
23 15         30 state $state; # undef == first call?
24 15 100       485 if ($j->incr_text =~ s/^\s*\[\s*//) {
    50          
    0          
25             # batch (simple array of objects)
26 1         6 return ['BATCH', j_parse_array($j)];
27             }
28             elsif ($j->incr_text =~ s/^\s*{\s*//) {
29             # object
30 14         31 my $type;
31 38     38   1462 use experimental 'smartmatch';
  38         7927  
  38         278  
32 14         39 given ($j->incr_text) {
33 14         62 when (/^\s*"commit"/i) {
34 8         27 $type = 'TXN';
35             }
36 6         20 when (/^\s*"columns"/i) {
37 5         14 $type = 'QUERY';
38             }
39 1         3 default {
40 1         4 $type = 'OBJECT';
41             }
42             }
43 14         45 return [$type, j_parse_object($j)]
44             }
45             elsif ($j->incr_text =~ m/^\s*"[a-zA-Z_]+"/) {
46             # after a stream, next key of object
47 0         0 return ['OBJECT', j_parse_object($j)];
48             }
49             else {
50             # problem
51 0         0 return;
52             }
53             }
54              
55             # generic parse array stream
56             # return decoded json entities at the top level
57             # opening [ must be removed
58             # handle empty array too
59             sub j_parse_array {
60 17     17 0 34 my $j = shift;
61 17         28 my $po;
62             $po = sub {
63 474     474   6361 my $elt;
64 474         679 my $int_po = $po;
65 474         641 state $last_text;
66 474         668 my $done = eval {$j->incr_text =~ s/^\s*\]\s*//};
  474         15483  
67 474 100       1228 return node(undef,undef) if $done;
68 470         654 eval {
69 470         6471 $elt = $j->incr_parse;
70             };
71 470 100       1123 if (defined $elt) {
    100          
72 412         1345 $last_text = $j->incr_text;
73 412 100       1587 if ($j->incr_text =~ m/^\]}\],/) { # JSON::XS <=3.04 transaction kludge
    100          
74 5         44 $j->incr_text =~ s/(\])(}\],)//;
75 5         14 $done=1;
76             }
77             elsif ($elt eq 'transaction') { # JSON::XS 4.02 transaction kludge
78 4         20 $j->incr_text = '"transaction"'.$j->incr_text;
79 4         15 return node(undef,undef);
80             }
81             else {
82 403         10017 $j->incr_text =~ s/^(\s*,\s*)|(\s*\]\s*)//;
83 403         1269 $done = !!$2;
84             }
85              
86             }
87             elsif ($@) {
88 1 50 33     15 if ($@ =~ /already started parsing/) {
    50          
89 0         0 $elt = 'PENDING';
90             }
91             elsif ($@ =~ /must be an object or array/ &&
92             $last_text =~ /("[^"]+")/) {
93             # txn kludge
94 0         0 $j->incr_skip;
95 0         0 $j->incr_text = $1.$j->incr_text;
96 0         0 return node(undef,undef);
97             }
98             else {
99 1         7 die "j_parse: $@";
100             }
101             }
102             else {
103 57         92 $elt = 'PENDING';
104             }
105 465 100       2613 node(['ARELT'=>$elt], $done ? undef : promise { $po->() });
  429         215182  
106 17         114 };
107 17         63 return $po;
108             }
109              
110             # generic parse object to one level
111             # opening { must be removed
112             # when another stream is returned as the value,
113             # another call to j_parse_object has to wait until that stream is
114             # exhausted...
115             sub j_parse_object {
116 23     23 0 43 my $j = shift;
117 23         34 my $po;
118             $po = sub {
119 84     84   1268 state $key;
120 84         132 state $current = '';
121 84         196 my ($text, $head,$obj);
122 84         0 my $done;
123 84 50       204 unless ($current eq 'PENDING') {
124 84         119 my $m;
125 38     38   29303 use experimental 'smartmatch';
  38         95  
  38         240  
126 84         131 eval {
127 84         599 $j->incr_text =~ m/^(?:(\s*"([^"]+)"\s*:\s*)|(\s*}\s*))/; # look ahead
128 84   100     341 $m = $2||$3;
129 84 100 100     392 if ($m && ($m eq 'columns') && ($current eq 'RESULTS_STREAM')) {
      100        
130             # if this is a 'results' item, don't eat the 'columns',
131             # let the results stream do it
132 8         27 $m = undef;
133             }
134             else {
135 76         886 $j->incr_text =~ s/^(?:(\s*"([^"]+)"\s*:\s*)|(\s*}\s*))//; # consume
136             }
137              
138             };
139              
140 84 100 66     358 if ($@ =~ /already started parsing/ || !$m) {
    50          
141             # either another function instance is in the middle of parsing,
142             # or no key was found (which is the same thing)
143             # so report where this instance is:
144 25         154 return node([$key => $current], promise { $po->() });
  10         7426  
145             }
146             elsif ($@) {
147 0         0 die "j_parse: incr parser error: $@";
148             }
149 59         152 $key = $m;
150             }
151 38     38   11394 use experimental 'smartmatch';
  38         98  
  38         212  
152 59         97 given ($key) {
153 59         183 when ('columns') {
154 14         26 eval {
155 14         81 $obj = $j->incr_parse;
156             };
157 14 50       51 die "j_parse: incr parser error: $@" if $@;
158 14         36 $current = 'COMPLETE';
159             }
160 45         96 when (/commit/) {
161 8         99 $j->incr_text =~ s/^"([^"]+)"\s*,?\s*//;
162 8         23 $obj = $1;
163 8         23 $current = 'COMPLETE';
164             }
165 37         68 when (/transaction/) {
166 6         14 eval {
167 6         39 $obj = $j->incr_parse; # get txn info obj
168             };
169 6 50       22 die "j_parse: incr parser error: $@" if $@;
170 6         18 $current = 'COMPLETE';
171             }
172 31         111 when ('data') {
173 12 100       195 if ($j->incr_text =~ s/^\[\s*//) {
174 10         35 $obj = j_parse_array($j);
175 10         32 $current = 'DATA_STREAM';
176             }
177             else {
178 2         21 die "j_parse: expecting an array value for 'data' key";
179             }
180             }
181 19         36 when ('results') {
182 8 50       95 if ($j->incr_text =~ s/^\[\s*//) {
    0          
183 8         96 $j->incr_text =~ s/^\s*{\s*//;
184 8         21 eval {
185 8         20 $obj = j_parse_object($j);
186             };
187 8 50       25 die "j_parse: incr parser error: $@" if $@;
188 8         43 $current = 'RESULTS_STREAM';
189             }
190             elsif ($j->incr_text eq '') {
191 0         0 $current = 'DONE';
192 0         0 $done=1;
193             }
194             }
195 11         26 when ('errors') {
196 6 50       37 if ($j->incr_text=~ s/^\[\s*//) {
    0          
197 6         17 $obj = j_parse_array($j);
198 6         20 $current = 'ERRORS_STREAM';
199             }
200             elsif ($j->incr_text =~ s/^\s*\]\s*,?\s*//) {
201 0         0 $current = 'DONE';
202 0         0 $done=1;
203             }
204             }
205 5         17 when (/}/) {
206 3 50       10 if ($current eq 'DATA_STREAM') {
207 3 100       68 if ($j->incr_text =~ s/^\s*,\s*{//) {
    100          
    50          
208             # prepared for next results object
209 1         6 $obj = j_parse_object($j);
210 1         3 $key = 'results';
211 1         4 $current = 'RESULTS_STREAM';
212             }
213             elsif ($j->incr_text =~ s/^\s*\]\s*,?\s*//) {
214 1         3 $current = 'DONE';
215 1         4 $done=1;
216             }
217             elsif ($j->incr_text eq '') {
218 1         3 $current = 'DONE';
219 1         5 $done=1;
220             }
221             }
222             else {
223 0         0 $current = 'DONE';
224 0         0 $done=1;
225             }
226             }
227 2         7 when (undef) {
228 0         0 die "j_parse: No key found";
229             }
230 2         5 default {
231             # why am I here?
232 2         23 die "j_parse: Unexpected key '$key' in stream";
233             }
234             }
235 55 100       117 if (defined $obj) {
236 53         111 $head = [$key => $obj];
237 53         2041 $j->incr_text =~ s/^(?:(\s*,\s*)|(\s*}\s*))//;
238 53         138 $done = !!$2;
239             }
240             else {
241 2 50       9 $head = $done ? undef : [$key => $current = 'PENDING'];
242             }
243 55 50       326 return node($head, $done ? undef : promise { $po->() }) if $head;
  52 100       13389  
244 2         11 return node(undef, undef);
245 23         145 };
246 23         129 return $po;
247             }
248              
249             =head1 NAME
250              
251             REST::Neo4p::ParseStream - Parse Neo4j REST responses on the fly
252              
253             =head1 SYNOPSIS
254              
255             Not for human consumption.
256             This module is ignored by the Neo4j::Driver-based agent.
257              
258             =head1 DESCRIPTION
259              
260             This module helps L exploit the L
261             server's chunked transfer encoding of its JSON REST responses. It is
262             based on the fast L incremental parser and
263             L's L
264             Perl|http://hop.perl.plover.com> ideas as implemented in
265             L.
266              
267             The goal is to be able to pull in objects from the server stream as
268             soon as they are available. In practice, this means specifically
269             finding and incrementally processing the potentially large arrays of
270             objects that are returned from cypher queries, transaction queries,
271             and batch requests.
272              
273             Because of inconsistencies among the Neo4j response formats for each
274             of these functions, this module does a significant amount of
275             "hand-parsing". Currently the code will not be very robust to changes
276             in those response formats. If you find your query handling is breaking
277             with a new server version, L
278             ticket|https://rt.cpan.org/Public/Bug/Report.html?Queue=REST-Neo4p>. In
279             the meantime, you should be able to keep things going (albeit more
280             slowly) by turning off streaming at the agent:
281              
282             use REST::Neo4p;
283             REST::Neo4p->agent->no_stream;
284             ...
285              
286             =head1 SEE ALSO
287              
288             L, L, L,
289             L, L.
290              
291             =head1 AUTHOR
292              
293             Mark A. Jensen
294             CPAN ID: MAJENSEN
295             majensen -at- cpan -dot- org
296              
297             =head1 LICENSE
298              
299             Copyright (c) 2012-2022 Mark A. Jensen. This program is free software; you
300             can redistribute it and/or modify it under the same terms as Perl
301             itself.
302              
303             =cut
304              
305             1;