File Coverage

blib/lib/OPP.pm
Criterion Covered Total %
statement 27 227 11.8
branch 0 96 0.0
condition 0 5 0.0
subroutine 9 21 42.8
pod 0 11 0.0
total 36 360 10.0


line stmt bran cond sub pod time code
1             #
2             # $Id: OPP.pm,v cfbea05b0bc4 2025/01/28 15:06:19 gomor $
3             #
4             package OPP;
5 1     1   223369 use strict;
  1         2  
  1         33  
6 1     1   4 use warnings;
  1         1  
  1         73  
7              
8             our $VERSION = '1.00';
9              
10             our $debug = 0;
11              
12 1     1   413 use Class::Gomor::Array;
  1         3  
  1         71  
13 1     1   9 use base qw(Class::Gomor::Array);
  1         2  
  1         196  
14              
15             our @AS = qw(
16             nested
17             state
18             output
19             );
20             __PACKAGE__->cgBuildIndices;
21             __PACKAGE__->cgBuildAccessorsScalar(\@AS);
22              
23 1     1   8 use Carp;
  1         2  
  1         77  
24 1     1   8 use Data::Dumper;
  1         2  
  1         63  
25 1     1   675 use Text::ParseWords;
  1         1760  
  1         91  
26 1     1   679 use JSON::XS qw(encode_json decode_json);
  1         5986  
  1         71  
27 1     1   479 use Tie::IxHash;
  1         4335  
  1         3654  
28              
29             #
30             # Check given field is of nested kind:
31             #
32             # $self->is_nested("domain"); # 0
33             # $self->is_nested("app.http.component"); # ( 'app.http.component', undef )
34             # $self->is_nested("app.http.component.product"); # ( 'app.http.component', 'product' )
35             #
36             sub is_nested {
37 0     0 0   my $self = shift;
38 0           my ($field) = @_;
39              
40 0 0         croak("is_nested: need field arg") unless defined($field);
41              
42 0           my $fields = $self->nested;
43 0 0         return 0 unless defined($fields);
44              
45 0           my $nested = { map { $_ => 1 } @{$self->nested} };
  0            
  0            
46              
47 0           my ($head, $leaf) = $field =~ m{^(.+)\.(\S+)$};
48              
49 0           my $is_nested = 0;
50             # Handle first case: app.http.component.product given as input:
51             # Will have head set to app.http.component and leaf to product:
52 0 0 0       if (defined($head) && $nested->{$head}) {
    0          
53 0           $is_nested = 1;
54             }
55             # Handle second case: app.http.component given as input:
56             elsif ($nested->{$field}) {
57 0           $head = $field;
58 0           $leaf = undef;
59 0           $is_nested = 1;
60             }
61              
62 0 0         return $is_nested ? [ $head, $leaf ] : 0;
63             }
64              
65             #
66             # Flatten given doc so we can work with field names in 'a.b.c' format i/o of {a}{b}{c}:
67             #
68             sub flatten {
69 0     0 0   my $self = shift;
70 0           my ($docs) = @_;
71              
72             #croak("flatten: need doc|docs argument") unless defined($docs);
73 0 0         return $docs unless defined($docs);
74              
75 0 0         $docs = ref($docs) eq 'ARRAY' ? $docs : [ $docs ];
76              
77 0           my @new = ();
78 0           for my $doc (@$docs) {
79 0           my $new = { __opp_flatten => 1 };
80 0           my $sub; $sub = sub {
81 0     0     my ($doc, $field) = @_;
82              
83 0           for my $k (keys %$doc) {
84 0 0         my $this_field = defined($field) ? "$field.$k" : $k;
85 0 0         if (ref($doc->{$k}) eq 'HASH') {
86 0           $sub->($doc->{$k}, $this_field);
87             }
88             else {
89 0           $new->{$this_field} = $doc->{$k};
90             }
91             }
92              
93 0           return $new;
94 0           };
95              
96             #push @new, ($doc->{__opp_flatten} ? $doc : $sub->($doc));
97 0           push @new, $sub->($doc);
98             }
99              
100 0           return \@new;
101             }
102              
103             my $tie = sub {
104             my ($h) = @_;
105             my $t = tie(my %res, 'Tie::IxHash');
106             %res = %$h;
107             $t->SortByKey;
108             return \%res;
109             };
110              
111             my $order; $order = sub {
112             my ($h) = @_;
113              
114             my $tie = $tie->($h);
115              
116             for my $k (keys %$h) {
117             next unless defined $k;
118             next unless defined $h->{$k};
119             if (ref($h->{$k}) eq 'HASH') {
120             my $this_tie = $order->($h->{$k});
121             $tie->{$k} = $this_tie;
122             }
123             elsif (ref($h->{$k} eq 'ARRAY')) {
124             my @a = ();
125             for (@{$h->{$k}}) {
126             next unless ref($_) eq 'HASH';
127             my $this_tie = $order->($_);
128             push @a, $this_tie;
129             }
130             $h->{$k} = \@a if @a;
131             }
132             }
133              
134             return $tie;
135             };
136              
137             sub order {
138 0     0 0   my $self = shift;
139 0           my ($docs) = @_;
140              
141 0 0         $docs = ref($docs) eq 'ARRAY' ? $docs : [ $docs ];
142              
143 0           my @ordered = ();
144 0           for (@$docs) {
145 0 0         my $this = $order->($_) or next;
146 0           push @ordered, $this;
147             }
148              
149 0           return \@ordered;
150             }
151              
152             sub unflatten {
153 0     0 0   my $self = shift;
154 0           my ($flats) = @_;
155              
156 0 0         croak("unflatten: need flat|flats argument") unless defined($flats);
157              
158 0 0         $flats = ref($flats) eq 'ARRAY' ? $flats : [ $flats ];
159              
160 0           my @new = ();
161 0           for my $flat (@$flats) {
162 0 0         if ($flat->{_opp_nounflatten}) {
163 0           delete $flat->{_opp_nounflatten};
164 0           push @new, $flat;
165 0           next;
166             }
167              
168 0           my %new;
169 0           for my $k (keys %$flat) {
170 0           my @toks = split(/\./, $k);
171 0           my $value = $flat->{$k};
172              
173 0           my $current = \%new;
174 0           my $last = $#toks;
175 0           for my $idx (0..$#toks) {
176 0 0         if ($idx == $last) { # Last token
177 0           $current->{$toks[$idx]} = $value;
178 0           last;
179             }
180              
181             # Create HASH key so we can iterate and create all subkeys
182             # Merge with existing or create empty HASH:
183 0   0       $current->{$toks[$idx]} = $current->{$toks[$idx]} || {};
184 0           $current = $current->{$toks[$idx]};
185             }
186             }
187              
188 0           delete $new{__opp_flatten};
189 0           push @new, \%new;
190             }
191              
192 0           return \@new;
193             }
194              
195             sub pipeone {
196 0     0 0   my $self = shift;
197 0           my ($input, $opp) = @_;
198              
199 0 0         $input = ref($input) eq 'ARRAY' ? $input : [ $input ];
200              
201 0 0         return $input unless defined($opp);
202              
203 0           $opp =~ s{(?:^\s*|\s*$)}{}g;
204              
205 0           my @cmd = split(/\s*(?
206 0 0         croak("pipeone: no query, aborting") if @cmd == 0;
207              
208 0 0         print STDERR "pipeone: cmdlist[@cmd] count[".scalar(@cmd)."]\n" if $debug;
209              
210 0           my $idx = 0;
211 0           $self->output->add($self->flatten($input));
212 0           for my $this (@cmd) {
213 0 0         print STDERR "pipeone: cmd[$this]\n" if $debug;
214 0           my @proc = $this =~ m{^(\S+)(?:\s+(.+))?$};
215 0 0         if (! defined($proc[0])) {
216 0 0         print STDERR "pipeone: parse failed for [$this]\n" if $debug;
217 0           return;
218             }
219              
220             # Load proc
221 0           my $module = 'OPP::Proc::'.ucfirst(lc($proc[0]));
222 0           eval("use $module;");
223 0 0         if ($@) {
224 0           chomp($@);
225 0           print STDERR "pipeone: use proc failed [$proc[0]]: $@\n";
226 0           return;
227             }
228 0           my $proc = $module->new;
229 0 0         if (!defined($proc)) {
230 0           print STDERR "pipeone: load proc failed [$proc[0]]\n";
231 0           return;
232             }
233 0           $proc->idx($idx);
234 0           $proc->nested($self->nested);
235 0           $proc->state($self->state);
236 0           $proc->output($proc->clone($self->output)->init);
237              
238 0           my $argument = $proc[1];
239 0           my $options = $proc->parse($argument);
240 0           $proc->options($options);
241              
242 0 0         print STDERR "pipeone: proc[$proc]\n" if $debug;
243              
244 0           for my $input (@{$self->output->docs}) {
  0            
245 0           $proc->process($input);
246             }
247 0           $self->output->docs($proc->output->docs);
248 0           $idx++;
249             }
250              
251 0 0         if (defined($self->output->docs)) {
252 0           my $docs = $self->unflatten($self->output->docs);
253 0           $self->output->flush;
254 0           return $docs;
255             }
256              
257 0           return;
258             }
259              
260             sub pipeline {
261 0     0 0   my $self = shift;
262 0           my ($input, $opp) = @_;
263              
264 0 0         $input = ref($input) eq 'ARRAY' ? $input : [ $input ];
265              
266 0 0         return $input unless defined($opp);
267              
268 0           $opp =~ s{(?:^\s*|\s*$)}{}g;
269              
270 0           my @cmd = split(/\s*(?
271 0 0         croak("pipeline: no query, aborting") if @cmd == 0;
272              
273 0 0         print STDERR "pipeline: cmdlist[@cmd] count[".scalar(@cmd)."]\n" if $debug;
274              
275 0           my $idx = 0;
276 0           $self->output->add($self->flatten($input));
277 0           for my $this (@cmd) {
278 0 0         print STDERR "pipeline: cmd[$this]\n" if $debug;
279 0           my @proc = $this =~ m{^(\S+)(?:\s+(.+))?$};
280 0 0         if (! defined($proc[0])) {
281 0 0         print STDERR "pipeline: parse failed for [$this]\n" if $debug;
282 0           return;
283             }
284              
285             # Load proc
286 0           my $module = 'OPP::Proc::'.ucfirst(lc($proc[0]));
287 0           eval("use $module;");
288 0 0         if ($@) {
289 0           chomp($@);
290 0           print STDERR "pipeline: use proc failed [$proc[0]]: $@\n";
291 0           return;
292             }
293 0           my $proc = $module->new;
294 0 0         if (!defined($proc)) {
295 0           print STDERR "pipeline: load proc failed [$proc[0]]\n";
296 0           return;
297             }
298 0           $proc->idx($idx);
299 0           $proc->nested($self->nested);
300 0           $proc->state($self->state);
301 0           $proc->output($proc->clone($self->output)->init);
302              
303 0           my $argument = $proc[1];
304 0           my $options = $proc->parse($argument);
305 0           $proc->options($options);
306              
307 0 0         print STDERR "pipeline: proc[$proc]\n" if $debug;
308              
309 0           for my $input (@{$self->output->docs}) {
  0            
310 0           $proc->process($input);
311             }
312 0           $self->output->docs($proc->output->docs);
313 0           $idx++;
314             }
315              
316 0 0         if (defined($self->output->docs)) {
317 0           for my $doc (@{$self->unflatten($self->output->docs)}) {
  0            
318 0           print "$_\n" for @{$self->to_json($doc)};
  0            
319             }
320 0           $self->output->flush; # Flush output when processed
321             }
322              
323 0           return 1;
324             }
325              
326             sub to_json {
327 0     0 0   my $self = shift;
328 0           my ($doc) = @_;
329              
330 0 0         $doc = ref($doc) eq 'ARRAY' ? $doc : [ $doc ];
331              
332 0           my @json = ();
333 0           for (@$doc) {
334 0 0         my $docs = $self->order($_) or next;
335 0           for my $doc (@$docs) {
336 0           my $json;
337 0           eval {
338 0           $json = encode_json($doc);
339             };
340 0 0         if ($@) { # Silently discard in case of error
341 0           next;
342             }
343 0 0         next unless defined $json;
344 0           push @json, $json;
345             }
346             }
347              
348 0           return \@json;
349             }
350              
351             sub from_json {
352 0     0 0   my $self = shift;
353 0           my ($docs) = @_;
354              
355 0 0         $docs = ref($docs) eq 'ARRAY' ? $docs : [ $docs ];
356              
357 0           my @json = ();
358 0           for my $doc (@$docs) {
359 0           my $json;
360 0           eval {
361 0           $json = decode_json($doc);
362             };
363 0 0         if ($@) { # Silently discard in case of error
364 0           next;
365             }
366 0 0         next unless defined $json;
367 0           push @json, $json;
368             }
369              
370 0           return $self->order(\@json);
371             }
372              
373             sub add_output {
374 0     0 0   my $self = shift;
375 0           my ($doc) = @_;
376              
377 0           return push @{$self->output}, $doc;
  0            
378             }
379              
380             sub process_as_json {
381 0     0 0   my $self = shift;
382 0           my ($input, $opp) = @_;
383              
384 0 0         croak("process: need input argument") unless defined($input);
385 0 0         croak("process: need opp argument") unless defined($opp);
386              
387 0           $input = $self->from_json($input);
388              
389 0           return $self->pipeline($input, $opp);
390             }
391              
392             sub process_as_perl {
393 0     0 0   my $self = shift;
394 0           my ($input, $opp) = @_;
395              
396 0 0         croak("process: need input argument") unless defined($input);
397 0 0         croak("process: need opp argument") unless defined($opp);
398              
399 0           return $self->pipeline($input, $opp);
400             }
401              
402             1;
403              
404             __END__