File Coverage

blib/lib/App/RecordStream/Operation/collate.pm
Criterion Covered Total %
statement 88 109 80.7
branch 3 8 37.5
condition 0 3 0.0
subroutine 18 25 72.0
pod 0 8 0.0
total 109 153 71.2


line stmt bran cond sub pod time code
1             package App::RecordStream::Operation::collate;
2              
3             our $VERSION = "4.0.24";
4              
5 4     4   1641 use strict;
  4         10  
  4         124  
6 4     4   25 use warnings;
  4         10  
  4         131  
7              
8 4     4   27 use base qw(App::RecordStream::Operation);
  4         9  
  4         305  
9              
10 4     4   1007 use App::RecordStream::Aggregator;
  4         11  
  4         108  
11 4     4   1063 use App::RecordStream::Clumper::Options;
  4         14  
  4         116  
12 4     4   28 use App::RecordStream::DomainLanguage::Library;
  4         7  
  4         78  
13 4     4   19 use App::RecordStream::DomainLanguage::Snippet;
  4         11  
  4         79  
14 4     4   22 use App::RecordStream::DomainLanguage;
  4         11  
  4         90  
15 4     4   1385 use App::RecordStream::Operation::collate::BaseClumperCallback;
  4         12  
  4         127  
16 4     4   30 use App::RecordStream::Operation;
  4         9  
  4         3958  
17              
# Parse recs-collate's command line and wire up the clumping/aggregation
# pipeline.
#
# $args is handed to parse_options() together with an option spec that
# combines this operation's aggregation/help flags with the clumper's
# main_options() and help_options(). The help-only flags (--list-aggregators
# / --list and --show-aggregator) make init die with a code ref; presumably
# the base class routes that to print_usage(), which runs a code-ref message
# and exits.
#
# Aggregators come from four sources and are merged into one
# name => aggregator hash in this order (a later source wins on a name clash):
#   --aggregator/-a    built by Aggregator->make_aggregators
#   --dlaggregator/-A  built by build_dlaggregator
#   --mr-agg           name + map/reduce/squish snippets
#   --ii-agg           name + initial/combine/squish snippets
# The merged hash, the --incremental/--bucket flags and a callback that
# forwards each output record to push_record() are given to the clumper
# options via check_options().
sub init {
  my $this = shift;
  my $args = shift;

  App::RecordStream::Aggregator->load_implementations();

  # clumping
  my $clumper_options = $this->{'CLUMPER_OPTIONS'} = App::RecordStream::Clumper::Options->new();

  # aggregation
  my @aggregators;
  my %dlaggregators;
  my @mr_aggregators;
  my @ii_aggregators;
  my $incremental = 0;
  my $bucket = 1;

  # help
  my $list_aggregators = 0;
  my $aggregator = 0;

  my $spec = {
    $clumper_options->main_options(),

    # aggregation
    "aggregator|a=s"   => sub { push @aggregators, $_[1]; },
    "dlaggregator|A=s" => sub { build_dlaggregator(\%dlaggregators, $_[1]); },
    # Each occurrence takes exactly 4 values, so these arrays always hold
    # whole (name, code, code, code) groups.
    "mr-agg=s{4}"      => \@mr_aggregators,
    "ii-agg=s{4}"      => \@ii_aggregators,
    "incremental|i"    => \$incremental,
    "bucket!"          => \$bucket,

    # help
    "list-aggregators"  => \$list_aggregators,
    "show-aggregator=s" => \$aggregator,
    $clumper_options->help_options(),
    "list"              => \$list_aggregators,
  };

  $this->parse_options($args, $spec);

  # Help requests bail out before any aggregators are built.
  if ( $list_aggregators ) {
    die sub { print App::RecordStream::Aggregator->list_implementations(); };
  }

  if ( $aggregator ) {
    die sub { App::RecordStream::Aggregator->show_implementation($aggregator) };
  }

  my $aggregator_objects = App::RecordStream::Aggregator->make_aggregators(@aggregators);

  # Domain-language aggregators override plain ones with the same name.
  $aggregator_objects = {%$aggregator_objects, %dlaggregators};

  _add_snippet_aggregators(
    $aggregator_objects,
    \@mr_aggregators,
    \&App::RecordStream::DomainLanguage::Library::map_reduce_aggregator,
  );
  _add_snippet_aggregators(
    $aggregator_objects,
    \@ii_aggregators,
    \&App::RecordStream::DomainLanguage::Library::inject_into_aggregator,
  );

  $clumper_options->check_options(App::RecordStream::Operation::collate::BaseClumperCallback->new($aggregator_objects, $incremental, $bucket, sub { $this->push_record($_[0]); }));
}

# Add snippet-defined aggregators (from --mr-agg / --ii-agg) to
# $aggregator_objects.
#
# $specs is an array ref of flattened (name, code, code, code) groups. Each
# code string is wrapped in a DomainLanguage::Snippet, in order, and the three
# snippets are passed to $builder. The builder's result is stored under the
# group's name, replacing any existing entry. $specs itself is left unchanged.
sub _add_snippet_aggregators {
  my ($aggregator_objects, $specs, $builder) = @_;

  my @pending = @$specs;
  # A list assignment in scalar context yields the number of elements
  # spliced off, so the loop stops once the queue is empty.
  while ( my ($name, @code_strings) = splice(@pending, 0, 4) ) {
    my @snippets = map { App::RecordStream::DomainLanguage::Snippet->new($_) } @code_strings;
    $aggregator_objects->{$name} = $builder->(@snippets);
  }

  return;
}
101              
# Parse one --dlaggregator/-A value of the form NAME=CODE and store the
# resulting aggregator in the given hash under NAME.
#
# The value is split at the first '=': NAME is everything before it (it may
# be empty but cannot contain '='), and CODE is everything after it. CODE is
# evaluated as a domain-language snippet of type AGGREGATOR. Dies if the value
# contains no '=' at all, in which case the hash is left untouched.
sub build_dlaggregator {
  my ($dlaggregators_ref, $string) = @_;

  my ($name, $code) = $string =~ /\A([^=]*)=(.*)\z/s
    or die "Bad domain language aggregator option (missing '=' to separate name and code): " . $string;

  my $snippet = App::RecordStream::DomainLanguage::Snippet->new($code);
  $dlaggregators_ref->{$name} = $snippet->evaluate_as('AGGREGATOR');
}
116              
# Pass one input record to the clumper options object and return whatever its
# accept_record() returns.
sub accept_record {
  my ($this, $record) = @_;

  my $clumper = $this->{'CLUMPER_OPTIONS'};
  return $clumper->accept_record($record);
}
123              
# Signal end of input to the clumper options object and return whatever its
# stream_done() returns.
sub stream_done {
  my ($this) = @_;

  my $clumper = $this->{'CLUMPER_OPTIONS'};
  return $clumper->stream_done();
}
129              
# Print usage for this operation.
#
# If $message is a code ref (presumably one of the code refs init() dies with
# for --list-aggregators / --show-aggregator), run it and exit with status 1.
# Any other message is passed to the base class's print_usage().
sub print_usage {
  my ($this, $message) = @_;

  my $is_help_callback = $message && UNIVERSAL::isa($message, 'CODE');
  if ( !$is_help_callback ) {
    return $this->SUPER::print_usage($message);
  }

  $message->();
  exit 1;
}
141              
# Register the help topics recs-collate supports.
#
# Enables the existing keyspecs, keygroups, keys, domainlanguage and clumping
# topics via use_help_type(). It then adds two collate-specific topics:
# 'aggregators', which prints the list of aggregator implementations, and
# 'more', which calls more_help().
sub add_help_types {
  my ($this) = @_;

  for my $topic (qw(keyspecs keygroups keys domainlanguage clumping)) {
    $this->use_help_type($topic);
  }

  my $list_aggregators = sub { print App::RecordStream::Aggregator->list_implementations(); };
  $this->add_help_type('aggregators', $list_aggregators, 'List the aggregators');

  return $this->add_help_type('more', sub { $this->more_help() }, 'Larger help documentation');
}
160              
# usage: build and return the recs-collate usage text.
# The option table lists this operation's own flags, plus the entries returned
# by the clumper options' main_usage() and help_usage(). It is rendered with
# options_string() and interpolated ($args_string) into a heredoc terminated
# by USAGE.
# NOTE(review): in this coverage rendering the heredoc opener ("return <") and
# some angle-bracketed placeholders appear to have been stripped. Examples:
# "Usage: recs-collate []" and the option names 'aggregator|-a ', 'mr-agg ',
# 'ii-agg ' and 'show-aggregator '. Confirm against the real collate.pm source.
# NOTE(review): the user-facing text has typos to fix in the source:
# "togther" -> "together", "--list-agrregators" -> "--list-aggregators".
161             sub usage {
162 0     0 0   my $this = shift;
163              
164             my $options = [
165             [ 'dlaggregator|-A ...', 'Specify a domain language aggregate. See "Domain Language Integration" below.'],
166             [ 'aggregator|-a ', 'Colon separated list of aggregate field specifiers. See "Aggregates" section below.'],
167             [ 'mr-agg ', 'Specify a map reduce aggregator via 3 snippets, similar to mr_agg() from the domain language.'],
168             [ 'ii-agg ', 'Specify an inject into aggregator via 3 snippets, similar to ii_agg() from the domain language.'],
169             [ 'incremental', 'Output a record every time an input record is added to a clump (instead of every time a clump is flushed).'],
170             [ '[no]-bucket', 'With --bucket outputs one record per clump, with --no-bucket outputs one record for each record that went into the clump.'],
171             $this->{'CLUMPER_OPTIONS'}->main_usage(),
172              
173             [ 'list-aggregators|--list', 'Bail and output a list of aggregators' ],
174             [ 'show-aggregator ', 'Bail and output this aggregator\'s detailed usage.'],
175 0           $this->{'CLUMPER_OPTIONS'}->help_usage(),
176             ];
177              
178 0           my $args_string = $this->options_string($options);
179              
180             return <
181             Usage: recs-collate []
182             __FORMAT_TEXT__
183             Take records, grouped togther by --keys, and compute statistics (like
184             average, count, sum, concat, etc) within those groups.
185              
186             For starting with collate, try doing single --key collates with some number
187             of aggregators (list available in --list-agrregators)
188             __FORMAT_TEXT__
189              
190             Arguments:
191             $args_string
192              
193             Examples:
194             Count clumps of adjacent lines with matching x fields.
195             recs-collate --adjacent --key x --aggregator count
196             Count number of each x field value in the entire file.
197             recs-collate --key x --aggregator count
198             Finds the maximum latency for each date, hour pair
199             recs-collate --key date,hour --aggregator worst_latency=max,latency
200             Find the median value of x+y in records
201             recs-collate --dlaggregator "m=perc(50,snip(<<{{x}}+{{y}}>>))"
202             USAGE
203 0           }
204              
# more_help: print the extended recs-collate help.
# Starts from usage() and appends the Aggregates, Cubing and "Domain Language
# Integration" sections. It then appends DomainLanguage::short_usage(),
# followed by a further explanation and an extended Examples list. The whole
# text is printed through format_usage().
# NOTE(review): in this coverage rendering the heredoc openers ("<") and some
# angle-bracketed text appear to have been stripped. Examples: the aggregate
# syntax "[=][,]", the dl option format "'='", and the "<>" snippets in the
# for_field / --dlkey examples. Confirm against the real collate.pm source.
# NOTE(review): the user-facing text has typos to fix in the source:
# "Fieldname maybe a key spec" -> "may be", "maybe be key specs" -> "may be",
# "Instead of added" -> "Instead of adding", "Lanuage" -> "Language",
# "an options of the format" -> "an option of the format",
# "cummulative" -> "cumulative".
205             sub more_help {
206 0     0 0   my $this = shift;
207 0           my $usage = $this->usage() . <
208              
209             Aggregates:
210             __FORMAT_TEXT__
211             Aggregates are specified as [=][,]. The
212             default field name is aggregator and arguments joined by underscores. See
213             --list-aggregators for a list of available aggregators.
214              
215             Fieldname maybe a key spec. (i.e. foo/bar=sum,field). Additionally, all key
216             name arguments to aggregators maybe be key specs (i.e.
217             foo=max,latency/url), but not key groups
218             __FORMAT_TEXT__
219              
220             Cubing:
221             __FORMAT_TEXT__
222             Instead of added one entry for each input record, we add 2 ** (number of key
223             fields), with every possible combination of fields replaced with the default
224             of "ALL". This is not meant to be used with --adjacent or --size. If our
225             key fields were x and y then we'd get output records for {x = 1, y = 2}, {x
226             = 1, y = ALL}, {x = ALL, y = 2} and {x = ALL, y = ALL}.
227             __FORMAT_TEXT__
228              
229             Domain Lanuage Integration:
230             USAGE
231 0           $usage .= App::RecordStream::DomainLanguage::short_usage() . <
232              
233             __FORMAT_TEXT__
234             Either aggregates or keys may be specified using the recs domain language.
235             Both --dlkey and --dlaggregator require an options of the format
236             '='. --dlkey requires the code evaluate as a
237             valuation, --dlaggregator requires the code evaluate as an aggregator.
238              
239             See --help-domainlanguage for a more complete description of its workings
240             and a list of available functions.
241              
242             See the examples below for a more gentle introduction.
243             __FORMAT_TEXT__
244              
245             Examples:
246             Count clumps of adjacent lines with matching x fields.
247             recs-collate --adjacent --key x --aggregator count
248             Count number of each x field in the entire file.
249             recs-collate --key x --aggregator count
250             Count number of each x field in the entire file, including an "ALL" line.
251             recs-collate --key x --aggregator count --cube
252             Produce a cummulative sum of field profit up to each date
253             recs-collate --key date --adjacent --incremental --aggregator profit_to_date=sum,profit
254             Produce record count for each date, hour pair
255             recs-collate --key date,hour --aggregator count
256             Finds the maximum latency for each date, hour pair
257             recs-collate --key date,hour --aggregator worst_latency=max,latency
258             Produce a list of hosts in each datacenter.
259             recs-collate --key dc --dlaggregator "hosts=uconcat(', ', 'host')"
260             Sum all time fields
261             recs-collate --key ... --dlaggregator 'times=for_field(qr/^t/, <>)'
262             Find the median value of x+y in records
263             recs-collate --dlaggregator "m=perc(50,snip(<<{{x}}+{{y}}>>))"
264             Count people by first three letters of their name
265             recs-collate --dlkey "tla=<>" -a ct
266             USAGE
267 0           print $this->format_usage($usage);
268             }
269              
270             1;