File Coverage

blib/lib/App/RecordStream/Operation/join.pm
Criterion Covered Total %
statement 93 105 88.5
branch 20 30 66.6
condition 4 6 66.6
subroutine 14 16 87.5
pod 0 9 0.0
total 131 166 78.9


line stmt bran cond sub pod time code
1             package App::RecordStream::Operation::join;
2              
3             our $VERSION = "4.0.24";
4              
5 2     2   1008 use strict;
  2         5  
  2         66  
6              
7 2     2   11 use base qw(App::RecordStream::Operation);
  2         4  
  2         152  
8              
9 2     2   14 use App::RecordStream::Executor;
  2         11  
  2         43  
10 2     2   10 use App::RecordStream::InputStream;
  2         5  
  2         44  
11 2     2   11 use App::RecordStream::Record;
  2         5  
  2         1111  
12              
13             sub init {
14 5     5 0 8 my $this = shift;
15 5         9 my $args = shift;
16              
17 5         7 my $left = 0;
18 5         8 my $right = 0;
19 5         8 my $inner = 0;
20 5         9 my $outer = 0;
21 5         8 my $operation = "";
22 5         6 my $accumulate_right = 0;
23              
24 5         21 my $spec = {
25             "left" => \$left,
26             "right" => \$right,
27             "inner" => \$inner,
28             "outer" => \$outer,
29             "operation=s" => \$operation,
30             "accumulate-right" => \$accumulate_right,
31             };
32              
33 5         21 $this->parse_options($args, $spec);
34              
35 5 50       16 my $inputkey = shift @$args
36             or die "You must provide inputkey\n";
37              
38 5 50       12 my $dbkey = shift @$args
39             or die "You must provide dbkey\n";
40              
41 5 50       14 my $dbfile = shift @$args
42             or die "You must provide dbfile\n";
43              
44 5         13 $this->{'ACCUMULATE_RIGHT'} = $accumulate_right;
45 5         11 $this->{'DB_KEY'} = $dbkey;
46 5         9 $this->{'INPUT_KEY'} = $inputkey;
47 5   66     19 $this->{'KEEP_LEFT'} = $left || $outer;
48 5   66     22 $this->{'KEEP_RIGHT'} = $right || $outer;
49              
50              
51 5 100       12 if ( $operation ) {
52 1         8 $this->{'OPERATION'} = App::RecordStream::Executor->transform_code($operation);
53             }
54              
55 5         18 $this->create_db($dbfile, $dbkey);
56              
57 5         59 $this->{'KEYS_PRINTED'} = {};
58             }
59              
60             sub create_db {
61 5     5 0 7 my $this = shift;
62 5         9 my $file = shift;
63 5         9 my $key = shift;
64              
65 5         31 my $db_stream = App::RecordStream::InputStream->new('FILE' => $file);
66 5         11 my %db;
67             my $record;
68              
69 5         17 while($record = $db_stream->get_record()) {
70 21         47 my $value = $this->value_for_key($record, $key);
71              
72 21 50       84 $db{$value} = [] unless ( $db{$value} );
73 21         34 push @{$db{$value}}, $record;
  21         88  
74             }
75              
76 5         35 $this->{'DB'} = \%db;
77             }
78              
79             sub value_for_key {
80 50     50 0 80 my $this = shift;
81 50         65 my $record = shift;
82 50         69 my $key = shift;
83              
84             return join "\x1E", # ASCII record separator (RS)
85 50         125 map { ${$record->guess_key_from_spec($_, 0)} }
  60         76  
  60         138  
86             split /,/, $key;
87             }
88              
89             sub accept_record {
90 25     25 0 36 my $this = shift;
91 25         29 my $record = shift;
92              
93 25         47 my $value = $this->value_for_key($record, $this->{'INPUT_KEY'});
94              
95 25         44 my $db = $this->{'DB'};
96              
97 25 100       61 if(my $db_records = $db->{$value}) {
    100          
98 14         25 foreach my $db_record (@$db_records) {
99 14 50       28 if ($this->{'ACCUMULATE_RIGHT'}) {
100 0 0       0 if ($this->{'OPERATION'}) {
101 0         0 $this->run_expression($db_record, $record);
102             }
103             else {
104 0         0 foreach my $this_key (keys %$record) {
105 0 0       0 if (!exists($db_record->{$this_key})) {
106 0         0 $db_record->{$this_key} = $record->{$this_key};
107             }
108             }
109             }
110             }
111             else {
112 14 100       27 if ($this->{'OPERATION'}) {
113 3         14 my $output_record = App::RecordStream::Record->new(%$db_record);
114 3         10 $this->run_expression($output_record, $record);
115 3         10 $this->push_record($output_record);
116             }
117             else {
118 11         52 $this->push_record(App::RecordStream::Record->new(%$record, %$db_record));
119             }
120              
121 14 100       34 if ($this->{'KEEP_LEFT'}) {
122 3         7 $this->{'KEYS_PRINTED'}->{$value} = 1;
123             }
124             }
125             }
126             }
127             elsif ($this->{'KEEP_RIGHT'}) {
128 2         6 $this->push_record($record);
129             }
130              
131 25         114 return 1;
132             }
133              
134             # TODO: shove down into executor
135             sub run_expression {
136 3     3 0 4 my $__MY__this = shift;
137 3         5 my $d = shift;
138 3         4 my $i = shift;
139              
140 2     2   16 no strict;
  2         4  
  2         75  
141 2     2   13 no warnings;
  2         5  
  2         674  
142 3         143 eval $__MY__this->{'OPERATION'};
143              
144 3 50       16 if ( $@ ) {
145 0         0 warn "Code died with $@\n";
146             }
147             }
148              
149             sub stream_done {
150 5     5 0 9 my $this = shift;
151 5 100       14 if ($this->{'KEEP_LEFT'}) {
152 1         2 foreach my $db_records (values %{$this->{'DB'}}) {
  1         4  
153 4         8 foreach my $db_record (@$db_records) {
154 4         9 my $value = $this->value_for_key($db_record, $this->{'DB_KEY'});
155 4 100       12 if (!exists($this->{'KEYS_PRINTED'}->{$value})) {
156 1         3 $this->push_record($db_record);
157             }
158             }
159             }
160             }
161             }
162              
163             sub add_help_types {
164 5     5 0 10 my $this = shift;
165 5         18 $this->use_help_type('keyspecs');
166 5         13 $this->use_help_type('snippet');
167 5         19 $this->add_help_type(
168             'full',
169             \&full_help,
170             'Help on join types and accumulate-right'
171             );
172             }
173              
174             sub full_help {
175 0     0 0   print <<HELP_FULL;
176             Join Types
177             For instance, if you did:
178             recs-join type typeName dbfile fromfile
179              
180             with a db file like:
181             { 'typeName': 'foo', 'hasSetting': 1 }
182             { 'typeName': 'bar', 'hasSetting': 0 }
183              
184             and joined that with
185             { 'name': 'something', 'type': 'foo'}
186             { 'name': 'blarg', 'type': 'hip'}
187              
188             for an inner (default) join, you would get
189             { 'name': 'something', 'type': 'foo', 'typeName': 'foo', 'hasSetting': 1}
190              
191             for an outer join, you would get
192             { 'name': 'something', 'type': 'foo', 'typeName': 'foo', 'hasSetting': 1}
193             { 'name': 'blarg', 'type': 'hip'}
194             { 'typeName': 'bar', 'hasSetting': 0 }
195              
196             for a left join, you would get
197             { 'name': 'something', 'type': 'foo', 'typeName': 'foo', 'hasSetting': 1}
198             { 'typeName': 'bar', 'hasSetting': 0 }
199              
200             for a right join, you would get
201             { 'name': 'something', 'type': 'foo', 'typeName': 'foo', 'hasSetting': 1}
202             { 'name': 'blarg', 'type': 'hip'}
203              
204             Accumulate Right:
205             Accumulate all input records with the same key onto each db record matching
206             that key. This means that a db record can have multiple input records merged
207             into it. If no operation is provided, any fields in second or later records
208             will be lost due to them being discarded. This option is most useful with a
209             user defined operation to handle collisions. For example, one could provide
210             an operation to add fields together:
211              
212             recs-join --left --operation '
213             foreach \$k (keys \%\$i) {
214             if (exists(\$d->{\$k})) {
215             if (\$k =~ /^value/) {\$d->{\$k} = \$d->{\$k} + \$i->{\$k};}
216             } else {
217             \$d->{\$k} = \$i->{\$k};
218             }
219             }' --accumulate-right name name dbfile inputfile
220             HELP_FULL
221             }
222              
223             sub usage {
224 0     0 0   my $this = shift;
225 0           my $message = shift;
226              
227 0           my $options = [
228             ['left', 'Do a left join'],
229             ['right', 'Do a right join'],
230             ['inner', 'Do an inner join (This is the default)'],
231             ['outer', 'Do an outer join'],
232             ['operation', 'An perl expression to evaluate for merging two records together, in place of the default behavior of db fields overwriting input fields. See "Operation" below.'],
233             ['accumulate-right', 'Accumulate all input records with the same key onto each db record matching that key. See "Accumulate Right" below.'],
234             ];
235              
236 0           my $args_string = $this->options_string($options);
237              
238 0           return <<USAGE;
239             $message
240             Usage: recs-join <args> <inputkey> <dbkey> <dbfile> [<files>]
241             __FORMAT_TEXT__
242             Records of input (or records from <files>) are joined against records in
243             <dbfile>, using field <inputkey> from input and field <dbkey> from <dbfile>.
244             Each record from input may match 0, 1, or more records from <dbfile>. Each
245             pair of matches will be combined to form a larger record, with fields from
246             the dbfile overwriting fields from the input stream. If the join is a left
247             join or inner join, any inputs that do not match a dbfile record are
248             discarded. If the join is a right join or inner join, any db records that do
249             not match an input record are discarded.
250              
251             dbkey and inputkey may be key specs, see '--help-keyspecs' for more
252             information. Separate multiple keyspecs with commas to use a composite key.
253             __FORMAT_TEXT__
254              
255             Arguments:
256             $args_string
257              
258             Operation:
259             __FORMAT_TEXT__
260             The expression provided is evaluated for every pair of db record and input
261             record that have matching keys, in place of the default operation to
262             overwrite input fields with db fields. The variable \$d is set to a
263             App::RecordStream::Record object for the db record, and \$i is set to a
264             App::RecordStream::Record object for the input record. The \$d record is
265             used for the result. Thus, if you provide an empty operation, the result
266             will contain only fields from the db record. Note that an empty operation is
267             different from no --operation at all.
268             __FORMAT_TEXT__
269              
270             Examples:
271             Join type from STDIN and typeName from dbfile
272             cat recs | recs-join type typeName dbfile
273              
274             Join host name from a mapping file to machines to get IPs
275             recs-join host host hostIpMapping machines
276             USAGE
277             }
278              
279             1;