line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
# |
2
|
|
|
|
|
|
|
# TODO: what this can't handle right now is things like: |
3
|
|
|
|
|
|
|
# |
4
|
|
|
|
|
|
|
# * how many different URLs were there on a per query basis? |
5
|
|
|
|
|
|
|
# |
6
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
package Stream::Aggregate; |
8
|
|
|
|
|
|
|
|
9
|
7
|
|
|
7
|
|
105377
|
use strict; |
|
7
|
|
|
|
|
15
|
|
|
7
|
|
|
|
|
335
|
|
10
|
7
|
|
|
7
|
|
36
|
use warnings; |
|
7
|
|
|
|
|
13
|
|
|
7
|
|
|
|
|
512
|
|
11
|
6
|
|
|
6
|
|
4919
|
use Hash::Util qw(lock_keys); |
|
6
|
|
|
|
|
23251
|
|
|
6
|
|
|
|
|
72
|
|
12
|
6
|
|
|
6
|
|
448
|
use B::Deparse; |
|
6
|
|
|
|
|
12
|
|
|
6
|
|
|
|
|
282
|
|
13
|
5
|
|
|
5
|
|
28
|
use List::Util qw(min max minstr maxstr); |
|
5
|
|
|
|
|
9
|
|
|
5
|
|
|
|
|
567
|
|
14
|
5
|
|
|
5
|
|
4322
|
use Config::Checker; |
|
5
|
|
|
|
|
239306
|
|
|
5
|
|
|
|
|
419
|
|
15
|
5
|
|
|
5
|
|
3532
|
use Stream::Aggregate::Stats; |
|
5
|
|
|
|
|
13
|
|
|
5
|
|
|
|
|
453
|
|
16
|
5
|
|
|
5
|
|
2534
|
use Stream::Aggregate::Random; |
|
5
|
|
|
|
|
11
|
|
|
5
|
|
|
|
|
176
|
|
17
|
5
|
|
|
5
|
|
4849
|
use List::EvenMoreUtils qw(list_difference_position); |
|
5
|
|
|
|
|
6864
|
|
|
5
|
|
|
|
|
323
|
|
18
|
5
|
|
|
5
|
|
4515
|
use Tie::Function::Examples qw(%line_numbers); |
|
5
|
|
|
|
|
8628
|
|
|
5
|
|
|
|
|
787
|
|
19
|
5
|
|
|
5
|
|
36
|
use Eval::LineNumbers qw(eval_line_numbers); |
|
5
|
|
|
|
|
18
|
|
|
5
|
|
|
|
|
223
|
|
20
|
5
|
|
|
5
|
|
30
|
use Config::YAMLMacros::YAML; |
|
5
|
|
|
|
|
10
|
|
|
5
|
|
|
|
|
380
|
|
21
|
5
|
|
|
5
|
|
30
|
use Carp qw(confess); |
|
5
|
|
|
|
|
10
|
|
|
5
|
|
|
|
|
207
|
|
22
|
5
|
|
|
5
|
|
4664
|
use List::MoreUtils qw(uniq); |
|
5
|
|
|
|
|
6090
|
|
|
5
|
|
|
|
|
467
|
|
23
|
5
|
|
|
5
|
|
3575
|
use Clone qw(clone); |
|
5
|
|
|
|
|
16126
|
|
|
5
|
|
|
|
|
46873
|
|
24
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
require Exporter; |
26
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
our @ISA = qw(Exporter); |
28
|
|
|
|
|
|
|
our @EXPORT = qw(generate_aggregation_func); |
29
|
|
|
|
|
|
|
our $VERSION = 0.406; |
30
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
our $suppress_line_numbers = 0; |
32
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
#
# Prototype describing every configuration key that an aggregation
# config may contain.  Each entry maps a key name to a description
# string.  The leading marker characters ('?', '%', '<default>') and the
# trailing [TYPE] tags presumably follow Config::Checker's prototype
# conventions -- confirm against that module's documentation.
#
# Keys that hold user code have a companion "<key>_config" entry for an
# optional configuration hash.  Mapping keys must be unique: the entry
# that accompanies "reduce" is "reduce_config" (it was previously a
# second "merge_config", which left "reduce" without its companion).
#
my $prototype_config = <<'END_PROTOTYPE';
max_stats_to_keep: '?<4000>Maximum number of stats to keep for mean/stddev etc[INTEGER]'
context: '?From $log, return an array describing the current context[CODE]'
context_config: '%optional configuration hash for "context" code'
context2columns: '?From @current_context, return a hash of columns[CODE]'
context2columns_config: '%optional configuration hash for "context2columns" code'
stringify_context: '?Turn @currnet_context into an array of strings[CODE]'
stringify_context_config: '%optional configuration hash for "stringify_context" code'
finalize_result: '?Final opportunity to adjust the return values[CODE]'
finalize_result_config: '%optional configuration has for "finalize_result" code'
filter: '?Should this result be saved for statistics and counted for counts?[CODE]'
filter_config: '%optional configuration hash for "filter" code'
filter_early: '?<0>Check the filter early (before figuring out contexts)?[BOOLEAN]'
passthrough: '?Any additional items for the output?[CODE]'
passthrough_config: '%optional configuration has for "passthrough" code'
ephemeral: '%ephemeral columns (column -> code)'
ephemeral0: '%ephemeral columns (column -> code, evaluated before "ephemeral")'
ephemeral2: '%ephemeral columns (column -> code, evaluated after "ephemeral")'
ephemeral3: '%ephemeral columns (column -> code, evaluated after crossproduct has set context (after "ephemeral2"))'
output: '%generated output columns (column -> code)'
counter: '%counter columns (column -> code)'
percentage: '%like a counter, but divided by the number of items'
sum: '%summation columns (column -> code)'
dominant: '%most frequent (mode) value (column -> code)'
mean: '%mean value columns (column -> code)'
standard_deviation: '%standard deviaton value columns (column -> code)'
median: '%median value columns (column -> code)'
min: '%min value columns (column -> code)'
max: '%max value columns (column -> code)'
minstr: '%minstr value columns (column -> code)'
maxstr: '%maxstr value columns (column -> code)'
keep: '%list of values to keep'
stat: '%statistical columns (using keep, column -> code)'
debug: '?<0>Print out the code for debugging'
strict: '?<0>enforce strict and warnings for user code'
preprocess: '?Code to pre-process the input data[CODE]'
item_name: '?<$log>Name of the item variable'
new_context: '?Code that is run when there is a new context[CODE]'
new_context_config: '%optional configuration hash for "new_context" code'
merge: '?Code that is run when merging a subcontext into a parent context[CODE]'
merge_config: '%optional configuration hash for "merge" code'
reduce: '?Code that is run when reducing the saved data to save memory[CODE]'
reduce_config: '%optional configuration hash for "reduce" code'
crossproduct: '%crossproduct context, keys are existing columns, values are size limits'
simplify: '%code to choose new simpler values for over-full columns (column -> code)'
combinations: '%code to decide if new crossproduct context ($row) is worth keeping[CODE]'
END_PROTOTYPE
80
|
|
|
|
|
|
|
|
81
|
|
|
|
|
|
|
#
# nonblank($value)
#
# Returns $value unchanged when it is defined and is not the empty
# string; otherwise returns undef.  The undef is returned explicitly,
# so a list-context caller always receives exactly one element.
#
sub nonblank
{
	my ($candidate) = @_;
	# The defined() test comes first so that 'eq' never sees undef.
	my $is_blank = !defined($candidate) || $candidate eq '';
	return $is_blank ? undef : $candidate;
}
88
|
|
|
|
|
|
|
|
89
|
|
|
|
|
|
|
#
# resume_line_numbering()
#
# Builds a "#line" directive string for splicing into generated code.
# The directive carries the line number of the statement that called
# this function, and a pseudo file name that embeds the caller's file
# name and that same line number.  The returned string ends with a
# newline.
#
sub resume_line_numbering
{
	# Elements 1 and 2 of caller(0) are the file and line of the call site.
	my ($call_file, $call_line) = (caller(0))[1, 2];
	my $directive = qq{#line %d "generated-code-interpoloated-after-%s-%d"\n};
	return sprintf($directive, $call_line, $call_file, $call_line);
}
94
|
|
|
|
|
|
|
|
95
|
|
|
|
|
|
|
sub generate_aggregation_func |
96
|
|
|
|
|
|
|
{ |
97
|
5
|
|
|
5
|
0
|
158570
|
my ($agg_config, $extra, $user_extra) = @_; |
98
|
|
|
|
|
|
|
|
99
|
5
|
|
|
|
|
33
|
validate_aggregation_config($agg_config); |
100
|
|
|
|
|
|
|
|
101
|
5
|
|
|
|
|
8036
|
my $renumber = ! $agg_config->{debug}; |
102
|
|
|
|
|
|
|
|
103
|
|
|
|
|
|
|
# input data |
104
|
5
|
|
|
|
|
16
|
my $itemref; |
105
|
|
|
|
|
|
|
my $last_item; |
106
|
|
|
|
|
|
|
|
107
|
|
|
|
|
|
|
# |
108
|
|
|
|
|
|
|
# if counting URLs, then the @current_context might be something like: |
109
|
|
|
|
|
|
|
# 'com', 'apple', '/movies', '/action' |
110
|
|
|
|
|
|
|
# If counting queries it might be something like: |
111
|
|
|
|
|
|
|
# 'homocide', 'movies' |
112
|
|
|
|
|
|
|
# |
113
|
|
|
|
|
|
|
# @contexts is an array of state variables ($ps) that correspond to the |
114
|
|
|
|
|
|
|
# elements of @current_context. @context_strings is a string-ified |
115
|
|
|
|
|
|
|
# copy of @current_context to handle contexts which are references. |
116
|
|
|
|
|
|
|
# |
117
|
|
|
|
|
|
|
# $count_this is the return value from &$filter_func; |
118
|
|
|
|
|
|
|
# |
119
|
0
|
|
|
|
|
0
|
my @contexts; |
120
|
0
|
|
|
|
|
0
|
my @context_strings; |
121
|
0
|
|
|
|
|
0
|
my @current_context; |
122
|
0
|
|
|
|
|
0
|
my $ps; |
123
|
0
|
|
|
|
|
0
|
my $oldps; |
124
|
5
|
|
|
|
|
11
|
my $count_this = 1; |
125
|
5
|
|
|
|
|
48
|
my @items_seen = ( 0 ); |
126
|
5
|
|
|
|
|
11
|
my %cross_context; |
127
|
5
|
|
|
|
|
10
|
my $cross_data = {}; |
128
|
5
|
|
|
|
|
14
|
my @cross_keys; |
129
|
5
|
|
|
|
|
11
|
my $cross_limit = 1; |
130
|
5
|
|
|
|
|
8
|
my $cross_count = 0; |
131
|
5
|
|
|
|
|
14
|
my %cross_key_values; |
132
|
|
|
|
|
|
|
my %persist; |
133
|
0
|
|
|
|
|
0
|
my @combinations; |
134
|
|
|
|
|
|
|
|
135
|
|
|
|
|
|
|
# output |
136
|
0
|
|
|
|
|
0
|
my $row; |
137
|
0
|
|
|
|
|
0
|
my $suppress_result; |
138
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
# reduce data to limit memory use |
140
|
0
|
|
|
|
|
0
|
my @keepers; |
141
|
0
|
|
|
|
|
0
|
my @tossers; |
142
|
5
|
|
|
|
|
14
|
my $max_stats2keep = $agg_config->{max_stats_to_keep}; |
143
|
5
|
|
|
|
|
15
|
my $do_reduce; |
144
|
|
|
|
|
|
|
|
145
|
|
|
|
|
|
|
# closures |
146
|
|
|
|
|
|
|
my $get_context_func; |
147
|
0
|
|
|
|
|
0
|
my $count_func; |
148
|
0
|
|
|
|
|
0
|
my $initialize_func; |
149
|
0
|
|
|
|
|
0
|
my $final_values_func; |
150
|
0
|
|
|
|
|
0
|
my $merge_func; |
151
|
0
|
|
|
|
|
0
|
my $context_columns_func; |
152
|
0
|
|
|
|
|
0
|
my $preprocess_func; |
153
|
0
|
|
|
|
|
0
|
my $filter_func; |
154
|
0
|
|
|
|
|
0
|
my $stringify_func; |
155
|
0
|
|
|
|
|
0
|
my $finalize_result_func; |
156
|
0
|
|
|
|
|
0
|
my $passthrough_func; |
157
|
0
|
|
|
|
|
0
|
my $user_merge_func; |
158
|
0
|
|
|
|
|
0
|
my $user_new_context_func; |
159
|
0
|
|
|
|
|
0
|
my $user_reduce_func; |
160
|
0
|
|
|
|
|
0
|
my $cross_reduce_func; |
161
|
0
|
|
|
|
|
0
|
my $new_ps_func; |
162
|
0
|
|
|
|
|
0
|
my $process_func; |
163
|
0
|
|
|
|
|
0
|
my $finish_context_func; |
164
|
0
|
|
|
|
|
0
|
my $finish_cross_func; |
165
|
0
|
|
|
|
|
0
|
my $add_context_component_func; |
166
|
0
|
|
|
|
|
0
|
my $cross_key_reduce_func; |
167
|
5
|
|
|
|
|
12
|
my $declarations = ''; |
168
|
5
|
|
|
|
|
10
|
my %combination_funcs; |
169
|
|
|
|
|
|
|
my $do_combinations; |
170
|
|
|
|
|
|
|
|
171
|
5
|
50
|
|
|
|
27
|
my $strict = $agg_config->{strict} |
172
|
|
|
|
|
|
|
? "use strict; use warnings;" |
173
|
|
|
|
|
|
|
: "no strict; no warnings;"; |
174
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
my $eval_line_numbers = $agg_config->{debug} |
176
|
0
|
|
|
0
|
|
0
|
? sub { $_[0] } |
177
|
5
|
50
|
|
|
|
25
|
: \&eval_line_numbers; |
178
|
|
|
|
|
|
|
|
179
|
5
|
100
|
66
|
|
|
34
|
if ($agg_config->{crossproduct} && keys %{$agg_config->{crossproduct}}) { |
|
3
|
|
|
|
|
31
|
|
180
|
3
|
|
|
|
|
8
|
@cross_keys = sort keys %{$agg_config->{crossproduct}}; |
|
3
|
|
|
|
|
22
|
|
181
|
3
|
|
|
|
|
14
|
for my $k (@cross_keys) { |
182
|
9
|
|
|
|
|
26
|
$cross_limit *= $agg_config->{crossproduct}{$k}; |
183
|
|
|
|
|
|
|
} |
184
|
|
|
|
|
|
|
} |
185
|
|
|
|
|
|
|
|
186
|
|
|
|
|
|
|
my $compile_config = sub { |
187
|
5
|
|
|
5
|
|
13
|
my %varname; |
188
|
|
|
|
|
|
|
my $reduce_func; |
189
|
0
|
|
|
|
|
0
|
my %s; |
190
|
0
|
|
|
|
|
0
|
my %var_types; |
191
|
0
|
|
|
|
|
0
|
my %var_value; |
192
|
|
|
|
|
|
|
|
193
|
5
|
|
|
|
|
435
|
my $deparse = B::Deparse->new("-p", "-sC"); |
194
|
|
|
|
|
|
|
|
195
|
|
|
|
|
|
|
# |
196
|
|
|
|
|
|
|
# A more sophisticated approach would figure out the dependencies of one value on |
197
|
|
|
|
|
|
|
# another and order them appropriately. What's going on here is kinda hit & miss. |
198
|
|
|
|
|
|
|
# |
199
|
|
|
|
|
|
|
my $alias_varname = sub { |
200
|
54
|
|
|
|
|
87
|
my ($cc, $value) = @_; |
201
|
54
|
|
|
|
|
320
|
$varname{"\$column_$cc"} = $value; |
202
|
5
|
|
|
|
|
56
|
}; |
203
|
|
|
|
|
|
|
my $usercode_inner = sub { |
204
|
|
|
|
|
|
|
# |
205
|
|
|
|
|
|
|
# The {precount} undef statements may not be required. |
206
|
|
|
|
|
|
|
# They are there to be safe, just in case someone is referencing |
207
|
|
|
|
|
|
|
# a column that hasn't had its value assigned yet. If so, |
208
|
|
|
|
|
|
|
# they'll always get undef rather than a left-over value from |
209
|
|
|
|
|
|
|
# a previous input record. |
210
|
|
|
|
|
|
|
# |
211
|
62
|
|
|
|
|
96
|
my ($cctype, $cc, $cc_code) = @_; |
212
|
62
|
100
|
|
|
|
143
|
if (! defined($cc_code)) { |
213
|
8
|
|
|
|
|
18
|
$declarations .= "my \$column_$cc;\n"; |
214
|
8
|
|
|
|
|
18
|
$s{precount} .= "\tundef \$column_$cc;\n"; |
215
|
8
|
|
|
|
|
17
|
return; |
216
|
|
|
|
|
|
|
} |
217
|
54
|
100
|
|
|
|
190
|
return $alias_varname->($cc, $varname{$cc_code}) if $varname{$cc_code}; |
218
|
35
|
|
|
|
|
59
|
my $original = $cc_code; |
219
|
35
|
50
|
|
|
|
107
|
return $alias_varname->($cc, $varname{$cc_code}) if $varname{$cc_code}; |
220
|
35
|
100
|
|
|
|
118
|
$cc_code =~ s/(\$column_\w+)/defined($varname{$1}) ? $varname{$1} : $1/ge; |
|
18
|
|
|
|
|
122
|
|
221
|
35
|
100
|
|
|
|
242
|
if ($cc_code =~ /\breturn\b/) { |
|
|
100
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
222
|
5
|
|
|
|
|
43
|
$cc_code =~ s/^/\t\t/mg; |
223
|
5
|
50
|
|
|
|
40
|
$s{user} .= qq{#line 3001 "FAKE-$extra->{name}-$cctype-$cc"\n} if $renumber; |
224
|
5
|
|
|
|
|
19
|
$s{user} .= "my \$${cctype}_${cc}_func = sub {\n"; |
225
|
5
|
|
|
|
|
11
|
$s{user} .= $cc_code; |
226
|
5
|
|
|
|
|
10
|
$s{user} .= "};\n\n"; |
227
|
5
|
|
|
|
|
16
|
$s{precount} .= "\tundef \$column_$cc;\n"; |
228
|
5
|
|
|
|
|
13
|
$s{$cctype} .= "\t\$column_$cc = "; |
229
|
5
|
|
|
|
|
15
|
$s{$cctype} .= qq{\$${cctype}_${cc}_func->();\n}; |
230
|
|
|
|
|
|
|
} elsif ($cc_code =~ /[;\n]/) { |
231
|
7
|
|
|
|
|
44
|
$cc_code =~ s/^/\t\t/mg; |
232
|
7
|
|
|
|
|
26
|
$s{precount} .= "\tundef \$column_$cc;\n"; |
233
|
7
|
|
|
|
|
21
|
$s{$cctype} .= "\t\$column_$cc = "; |
234
|
7
|
|
|
|
|
12
|
$s{$cctype} .= "do {\n"; |
235
|
7
|
50
|
|
|
|
51
|
$s{$cctype} .= qq{#line 4001 "FAKE-$extra->{name}-$cctype-$cc"\n} if $renumber; |
236
|
7
|
|
|
|
|
26
|
$s{$cctype} .= $cc_code; |
237
|
7
|
|
|
|
|
18
|
$s{$cctype} .= "\n\t};\n"; |
238
|
|
|
|
|
|
|
} elsif ($cc_code =~ /\A\$(column_\w+)\Z/) { |
239
|
0
|
|
|
|
|
0
|
die "value of $cc_code isn't available yet, please compute it in an earlier step like 'ephemeral0'"; |
240
|
|
|
|
|
|
|
} else { |
241
|
23
|
50
|
|
|
|
159
|
$s{$cctype} .= qq{#line 5001 "FAKE-$extra->{name}-$cctype-$cc"\n} if $renumber; |
242
|
23
|
|
|
|
|
123
|
$s{precount} .= "\tundef \$column_$cc;\n"; |
243
|
23
|
|
|
|
|
74
|
$s{$cctype} .= "\t\$column_$cc = $cc_code;\n"; |
244
|
|
|
|
|
|
|
} |
245
|
35
|
|
|
|
|
87
|
$declarations .= "my \$column_$cc;\n"; |
246
|
|
|
|
|
|
|
|
247
|
5
|
|
|
5
|
|
42
|
my $te = eval "no strict; no warnings; sub { $cc_code }"; |
|
5
|
|
|
5
|
|
9
|
|
|
5
|
|
|
5
|
|
216
|
|
|
5
|
|
|
5
|
|
27
|
|
|
5
|
|
|
4
|
|
8
|
|
|
5
|
|
|
4
|
|
549
|
|
|
5
|
|
|
4
|
|
41
|
|
|
5
|
|
|
4
|
|
10
|
|
|
5
|
|
|
4
|
|
197
|
|
|
5
|
|
|
4
|
|
27
|
|
|
5
|
|
|
4
|
|
10
|
|
|
5
|
|
|
3
|
|
441
|
|
|
4
|
|
|
3
|
|
30
|
|
|
4
|
|
|
3
|
|
63
|
|
|
4
|
|
|
2
|
|
1106
|
|
|
4
|
|
|
2
|
|
24
|
|
|
4
|
|
|
2
|
|
96
|
|
|
4
|
|
|
2
|
|
295
|
|
|
4
|
|
|
2
|
|
25
|
|
|
4
|
|
|
1
|
|
8
|
|
|
4
|
|
|
1
|
|
370
|
|
|
4
|
|
|
1
|
|
27
|
|
|
4
|
|
|
1
|
|
6
|
|
|
4
|
|
|
1
|
|
229
|
|
|
4
|
|
|
|
|
25
|
|
|
4
|
|
|
|
|
8
|
|
|
4
|
|
|
|
|
355
|
|
|
4
|
|
|
|
|
27
|
|
|
4
|
|
|
|
|
8
|
|
|
4
|
|
|
|
|
197
|
|
|
4
|
|
|
|
|
26
|
|
|
4
|
|
|
|
|
7
|
|
|
4
|
|
|
|
|
232
|
|
|
3
|
|
|
|
|
15
|
|
|
3
|
|
|
|
|
8
|
|
|
3
|
|
|
|
|
193
|
|
|
3
|
|
|
|
|
20
|
|
|
3
|
|
|
|
|
5
|
|
|
3
|
|
|
|
|
215
|
|
|
3
|
|
|
|
|
19
|
|
|
3
|
|
|
|
|
5
|
|
|
3
|
|
|
|
|
161
|
|
|
2
|
|
|
|
|
15
|
|
|
2
|
|
|
|
|
5
|
|
|
2
|
|
|
|
|
188
|
|
|
2
|
|
|
|
|
13
|
|
|
2
|
|
|
|
|
3
|
|
|
2
|
|
|
|
|
120
|
|
|
2
|
|
|
|
|
11
|
|
|
2
|
|
|
|
|
5
|
|
|
2
|
|
|
|
|
115
|
|
|
2
|
|
|
|
|
13
|
|
|
2
|
|
|
|
|
5
|
|
|
2
|
|
|
|
|
108
|
|
|
2
|
|
|
|
|
32
|
|
|
2
|
|
|
|
|
4
|
|
|
2
|
|
|
|
|
103
|
|
|
1
|
|
|
|
|
4
|
|
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
89
|
|
|
1
|
|
|
|
|
5
|
|
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
30
|
|
|
1
|
|
|
|
|
4
|
|
|
1
|
|
|
|
|
5
|
|
|
1
|
|
|
|
|
64
|
|
|
1
|
|
|
|
|
7
|
|
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
37
|
|
|
1
|
|
|
|
|
5
|
|
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
76
|
|
|
35
|
|
|
|
|
3717
|
|
248
|
35
|
50
|
|
|
|
120
|
die "eval $cctype/$cc: $original ($cc_code): $@" if $@; |
249
|
35
|
|
|
|
|
175956
|
my $body = $deparse->coderef2text($te); |
250
|
35
|
50
|
|
|
|
197
|
return $varname{$body} if $varname{$body}; |
251
|
35
|
|
|
|
|
217
|
$varname{$body} = $varname{$cc_code} = $varname{$original} = "\$column_$cc"; |
252
|
35
|
|
|
|
|
140
|
$alias_varname->($cc, $varname{$cc_code}); |
253
|
5
|
|
|
|
|
93
|
}; |
254
|
|
|
|
|
|
|
my $usercode = sub { |
255
|
62
|
|
|
|
|
116
|
my ($cctype, $cc, $cc_code) = @_; |
256
|
62
|
|
|
|
|
116
|
my $value = $usercode_inner->(@_); |
257
|
62
|
|
|
|
|
141
|
$var_value{$cc} = $value; |
258
|
62
|
|
|
|
|
111
|
$var_types{$cc} = $cctype; |
259
|
62
|
|
|
|
|
201
|
return $value; |
260
|
5
|
|
|
|
|
30
|
}; |
261
|
|
|
|
|
|
|
|
262
|
5
|
|
|
|
|
16
|
my %seen; |
263
|
|
|
|
|
|
|
my $cc; |
264
|
|
|
|
|
|
|
|
265
|
5
|
|
|
|
|
43
|
my @all_data = qw(ephemeral0 ephemeral ephemeral2 ephemeral3 keep output counter percentage sum mean standard_deviation median dominant min minstr max maxstr stat); |
266
|
5
|
|
|
|
|
28
|
my @lock_data = qw( keep output counter percentage sum mean standard_deviation median dominant min minstr max maxstr stat); |
267
|
5
|
|
|
|
|
17
|
my @output_cols = qw( output counter percentage sum mean standard_deviation median dominant min minstr max maxstr stat); |
268
|
5
|
|
|
|
|
26
|
my @kept_cols = qw( keep standard_deviation median dominant ); |
269
|
5
|
|
|
|
|
12
|
my @stats_cols = qw( standard_deviation median dominant ); |
270
|
5
|
|
|
|
|
12
|
my @cross_cols = qw(ephemeral0 ephemeral ephemeral2 ); |
271
|
5
|
|
|
|
|
15
|
my %cross_cols; |
272
|
5
|
|
|
|
|
20
|
@cross_cols{@cross_cols} = @cross_cols; |
273
|
|
|
|
|
|
|
|
274
|
|
|
|
|
|
|
# |
275
|
|
|
|
|
|
|
# Compile all the user code for the various columns |
276
|
|
|
|
|
|
|
# |
277
|
5
|
|
|
|
|
16
|
for my $ucc (@all_data) { |
278
|
90
|
100
|
|
|
|
270
|
next unless $agg_config->{$ucc}; |
279
|
34
|
|
|
|
|
51
|
for $cc (sort keys %{$agg_config->{$ucc}}) { |
|
34
|
|
|
|
|
169
|
|
280
|
62
|
50
|
|
|
|
214
|
die "column $cc is duplicated" if $seen{$cc}++; |
281
|
62
|
|
|
|
|
197
|
$usercode->($ucc, $cc, $agg_config->{$ucc}{$cc}); |
282
|
|
|
|
|
|
|
} |
283
|
|
|
|
|
|
|
} |
284
|
|
|
|
|
|
|
|
285
|
|
|
|
|
|
|
# |
286
|
|
|
|
|
|
|
# 'keep' has to be first because 'stat' can't rewrite names |
287
|
|
|
|
|
|
|
# |
288
|
5
|
|
|
|
|
12
|
my %donekeep; |
289
|
5
|
|
|
|
|
12
|
my $has_keepers = 0; |
290
|
5
|
|
|
|
|
16
|
for my $keepers (@kept_cols) { |
291
|
20
|
|
|
|
|
31
|
for $cc (sort keys %{$agg_config->{$keepers}}) { |
|
20
|
|
|
|
|
91
|
|
292
|
13
|
100
|
|
|
|
51
|
next if $donekeep{$varname{$agg_config->{$keepers}{$cc}}}; |
293
|
10
|
|
|
|
|
31
|
$donekeep{$varname{$agg_config->{$keepers}{$cc}}} = $cc; |
294
|
10
|
|
|
|
|
29
|
$s{initialize} .= "\t\$ps->{keep}{$cc} = [];\n"; |
295
|
10
|
|
|
|
|
45
|
$s{keeper2} .= "\t\tpush(\@{\$ps->{keep}{$cc}}, $varname{$agg_config->{$keepers}{$cc}}) if \$count_this;\n"; |
296
|
10
|
|
|
|
|
32
|
$s{merge} .= "\tpush(\@{\$ps->{keep}{$cc}}, \@{\$oldps->{keep}{$cc}});\n"; |
297
|
10
|
|
|
|
|
28
|
$s{reduce2} .= "\t\@{\$ps->{keep}{$cc}} = \@{\$ps->{keep}{$cc}}[\@keepers];\n"; |
298
|
10
|
|
|
|
|
24
|
$has_keepers++; |
299
|
|
|
|
|
|
|
} |
300
|
|
|
|
|
|
|
} |
301
|
5
|
100
|
|
|
|
20
|
if ($has_keepers) { |
302
|
4
|
|
|
|
|
12
|
$s{initialize} .= "\t# has keepers\n"; |
303
|
4
|
|
|
|
|
10
|
$s{initialize} .= "\t\$ps->{numeric} = {};\n"; |
304
|
|
|
|
|
|
|
|
305
|
4
|
|
|
|
|
33
|
$s{fv_setup} .= "\t# has keepers\n"; |
306
|
4
|
|
|
|
|
9
|
$s{fv_setup} .= "\tlocal(\$Stream::Aggregate::Stats::ps) = \$ps;\n"; |
307
|
|
|
|
|
|
|
|
308
|
4
|
50
|
|
|
|
31
|
$s{keeper1} .= resume_line_numbering if $renumber; |
309
|
4
|
|
|
|
|
17
|
$s{keeper1} .= "\t# has keepers\n"; |
310
|
4
|
|
|
|
|
10
|
$s{keeper1} .= "\tmy \$random = rand(1);\n"; |
311
|
4
|
|
|
|
|
13
|
$s{keeper1} .= "\tif (\@{\$ps->{random}} < $max_stats2keep || \$random < \$ps->{random}[0]) {\n"; |
312
|
4
|
|
|
|
|
10
|
$s{keeper1} .= "\t\tpush(\@{\$ps->{random}}, \$random);\n"; |
313
|
|
|
|
|
|
|
|
314
|
4
|
50
|
|
|
|
21
|
$s{keeper3} .= resume_line_numbering if $renumber; |
315
|
4
|
|
|
|
|
10
|
$s{keeper3} .= "\t\t# has keepers\n"; |
316
|
4
|
|
|
|
|
15
|
$s{keeper3} .= "\t\t&\$reduce_func if \@{\$ps->{random}} > $max_stats2keep * 1.5;\n"; |
317
|
4
|
|
|
|
|
9
|
$s{keeper3} .= "\t}\n"; |
318
|
|
|
|
|
|
|
|
319
|
4
|
50
|
|
|
|
20
|
$s{merge} .= resume_line_numbering if $renumber; |
320
|
4
|
|
|
|
|
10
|
$s{merge} .= "\t# has keepers\n"; |
321
|
4
|
|
|
|
|
9
|
$s{merge} .= "\tpush(\@{\$ps->{random}}, \@{\$oldps->{random}});\n"; |
322
|
|
|
|
|
|
|
|
323
|
4
|
50
|
|
|
|
43
|
$s{merge2} .= resume_line_numbering if $renumber; |
324
|
4
|
|
|
|
|
9
|
$s{merge2} .= "\t# has keepers\n"; |
325
|
4
|
|
|
|
|
22
|
$s{merge2} .= "\t&\$reduce_func if \@{\$ps->{random}} > $max_stats2keep * 1.5;\n"; |
326
|
|
|
|
|
|
|
|
327
|
4
|
|
|
|
|
30
|
$s{reduce} .= $eval_line_numbers->(<<'END_REDUCE'); |
328
|
|
|
|
|
|
|
# has keepers |
329
|
|
|
|
|
|
|
my $random = $ps->{random}; |
330
|
|
|
|
|
|
|
@keepers = sort { $random->[$a] cmp $random->[$b] } 0..$#$random; |
331
|
|
|
|
|
|
|
@tossers = splice(@keepers, $max_stats2keep); |
332
|
|
|
|
|
|
|
@$random = @$random[@keepers]; |
333
|
|
|
|
|
|
|
END_REDUCE |
334
|
4
|
50
|
|
|
|
89
|
$s{reduce} .= resume_line_numbering if $renumber; |
335
|
|
|
|
|
|
|
} |
336
|
|
|
|
|
|
|
|
337
|
5
|
|
|
|
|
11
|
for $cc (sort keys %{$agg_config->{output}}) { |
|
5
|
|
|
|
|
26
|
|
338
|
5
|
|
|
|
|
24
|
$s{initialize} .= "\t\$ps->{output}{$cc} = 0;\n"; |
339
|
|
|
|
|
|
|
} |
340
|
|
|
|
|
|
|
|
341
|
5
|
|
|
|
|
14
|
for $cc (sort keys %{$agg_config->{counter}}) { |
|
5
|
|
|
|
|
23
|
|
342
|
1
|
|
|
|
|
3
|
$s{initialize} .= "\t\$ps->{counter}{$cc} = 0;\n"; |
343
|
1
|
|
|
|
|
6
|
$s{count2} .= "\t\$ps->{counter}{$cc}++ if $varname{$agg_config->{counter}{$cc}};\n"; |
344
|
1
|
|
|
|
|
12
|
$s{merge} .= "\t\$ps->{counter}{$cc} += \$oldps->{counter}{$cc};\n"; |
345
|
|
|
|
|
|
|
} |
346
|
|
|
|
|
|
|
|
347
|
5
|
|
|
|
|
12
|
for $cc (sort keys %{$agg_config->{percentage}}) { |
|
5
|
|
|
|
|
23
|
|
348
|
1
|
|
|
|
|
5
|
$s{initialize} .= "\t\$ps->{percentage}{$cc} = undef;\n"; |
349
|
1
|
|
|
|
|
5
|
$s{stat} .= "\t\$ps->{percentage}{$cc} = \$ps->{percentage_counter}{$cc} * 100 / (\$ps->{percentage_total}{$cc} || .001);\n"; |
350
|
1
|
|
|
|
|
3
|
$s{initialize} .= "\t\$ps->{percentage_counter}{$cc} = 0;\n"; |
351
|
1
|
|
|
|
|
4
|
$s{initialize} .= "\t\$ps->{percentage_total}{$cc} = 0;\n"; |
352
|
1
|
|
|
|
|
5
|
$s{count2} .= "\t\$ps->{percentage_counter}{$cc}++ if $varname{$agg_config->{percentage}{$cc}};\n"; |
353
|
1
|
|
|
|
|
5
|
$s{count2} .= "\t\$ps->{percentage_total}{$cc}++ if defined $varname{$agg_config->{percentage}{$cc}};\n"; |
354
|
1
|
|
|
|
|
4
|
$s{merge} .= "\t\$ps->{percentage_counter}{$cc} += \$oldps->{percentage_counter}{$cc};\n"; |
355
|
1
|
|
|
|
|
4
|
$s{merge} .= "\t\$ps->{percentage_total}{$cc} += \$oldps->{percentage_total}{$cc};\n"; |
356
|
|
|
|
|
|
|
} |
357
|
|
|
|
|
|
|
|
358
|
5
|
|
|
|
|
14
|
for $cc (sort keys %{$agg_config->{sum}}) { |
|
5
|
|
|
|
|
25
|
|
359
|
3
|
|
|
|
|
14
|
$s{initialize} .= "\t\$ps->{sum}{$cc} = 0;\n"; |
360
|
3
|
|
|
|
|
13
|
$s{count2} .= "\t\$ps->{sum}{$cc} += $varname{$agg_config->{sum}{$cc}};\n"; |
361
|
3
|
|
|
|
|
18
|
$s{merge} .= "\t\$ps->{sum}{$cc} += \$oldps->{sum}{$cc};\n"; |
362
|
|
|
|
|
|
|
} |
363
|
|
|
|
|
|
|
|
364
|
5
|
|
|
|
|
88
|
for $cc (sort keys %{$agg_config->{mean}}) { |
|
5
|
|
|
|
|
21
|
|
365
|
7
|
|
|
|
|
26
|
$s{initialize} .= "\t\$ps->{mean}{$cc} = undef;\n"; |
366
|
7
|
|
|
|
|
36
|
$s{stat} .= "\t\$ps->{mean}{$cc} = \$ps->{mean_sum}{$cc} / (\$ps->{mean_count}{$cc} || 100);\n"; |
367
|
7
|
|
|
|
|
24
|
$s{initialize} .= "\t\$ps->{mean_sum}{$cc} = 0;\n"; |
368
|
7
|
|
|
|
|
24
|
$s{initialize} .= "\t\$ps->{mean_count}{$cc} = 0;\n"; |
369
|
7
|
|
|
|
|
32
|
$s{count2} .= "\tif (defined($varname{$agg_config->{mean}{$cc}})) {\n"; |
370
|
7
|
|
|
|
|
28
|
$s{count2} .= "\t \$ps->{mean_sum}{$cc} += $varname{$agg_config->{mean}{$cc}};\n"; |
371
|
7
|
|
|
|
|
22
|
$s{count2} .= "\t \$ps->{mean_count}{$cc}++;\n"; |
372
|
7
|
|
|
|
|
13
|
$s{count2} .= "\t}\n"; |
373
|
7
|
|
|
|
|
26
|
$s{merge} .= "\t\$ps->{mean_sum}{$cc} += \$oldps->{mean_sum}{$cc};\n"; |
374
|
7
|
|
|
|
|
28
|
$s{merge} .= "\t\$ps->{mean_count}{$cc} += \$oldps->{mean_count}{$cc};\n"; |
375
|
|
|
|
|
|
|
} |
376
|
|
|
|
|
|
|
|
377
|
5
|
|
|
|
|
15
|
for $cc (sort keys %{$agg_config->{min}}) { |
|
5
|
|
|
|
|
21
|
|
378
|
2
|
|
|
|
|
7
|
$s{initialize} .= "\t\$ps->{min}{$cc} = undef;\n"; |
379
|
2
|
|
|
|
|
11
|
$s{count2} .= "\t\$ps->{min}{$cc} = min grep { defined } \$ps->{min}{$cc}, $varname{$agg_config->{min}{$cc}};\n"; |
380
|
2
|
|
|
|
|
9
|
$s{merge} .= "\t\$ps->{min}{$cc} = min grep { defined } \$ps->{min}{$cc}, \$oldps->{min}{$cc};\n"; |
381
|
|
|
|
|
|
|
} |
382
|
|
|
|
|
|
|
|
383
|
5
|
|
|
|
|
22
|
for $cc (sort keys %{$agg_config->{minstr}}) { |
|
5
|
|
|
|
|
25
|
|
384
|
0
|
|
|
|
|
0
|
$s{initialize} .= "\t\$ps->{minstr}{$cc} = undef;\n"; |
385
|
0
|
|
|
|
|
0
|
$s{count2} .= "\t\$ps->{minstr}{$cc} = minstr grep { defined } \$ps->{minstr}{$cc}, $varname{$agg_config->{minstr}{$cc}};\n"; |
386
|
0
|
|
|
|
|
0
|
$s{merge} .= "\t\$ps->{minstr}{$cc} = minstr grep { defined } \$ps->{minstr}{$cc}, \$oldps->{minstr}{$cc};\n"; |
387
|
|
|
|
|
|
|
} |
388
|
|
|
|
|
|
|
|
389
|
5
|
|
|
|
|
13
|
for $cc (sort keys %{$agg_config->{max}}) { |
|
5
|
|
|
|
|
21
|
|
390
|
3
|
|
|
|
|
11
|
$s{initialize} .= "\t\$ps->{max}{$cc} = undef;\n"; |
391
|
3
|
|
|
|
|
14
|
$s{count2} .= "\t\$ps->{max}{$cc} = max grep { defined } \$ps->{max}{$cc}, $varname{$agg_config->{max}{$cc}};\n"; |
392
|
3
|
|
|
|
|
14
|
$s{merge} .= "\t\$ps->{max}{$cc} = max grep { defined } \$ps->{max}{$cc}, \$oldps->{max}{$cc};\n"; |
393
|
|
|
|
|
|
|
} |
394
|
|
|
|
|
|
|
|
395
|
5
|
|
|
|
|
15
|
for $cc (sort keys %{$agg_config->{maxstr}}) { |
|
5
|
|
|
|
|
21
|
|
396
|
0
|
|
|
|
|
0
|
$s{initialize} .= "\t\$ps->{maxstr}{$cc} = undef;\n"; |
397
|
0
|
|
|
|
|
0
|
$s{count2} .= "\t\$ps->{maxstr}{$cc} = maxstr grep { defined } \$ps->{maxstr}{$cc}, $varname{$agg_config->{maxstr}{$cc}};\n"; |
398
|
0
|
|
|
|
|
0
|
$s{merge} .= "\t\$ps->{maxstr}{$cc} = maxstr grep { defined } \$ps->{maxstr}{$cc}, \$oldps->{maxstr}{$cc};\n"; |
399
|
|
|
|
|
|
|
} |
400
|
|
|
|
|
|
|
|
401
|
5
|
|
|
|
|
15
|
for my $statc (@stats_cols) { |
402
|
15
|
|
|
|
|
22
|
for $cc (sort keys %{$agg_config->{$statc}}) { |
|
15
|
|
|
|
|
60
|
|
403
|
6
|
|
50
|
|
|
33
|
my $keepcc = $donekeep{$varname{$agg_config->{$statc}{$cc}}} || die; |
404
|
6
|
|
|
|
|
33
|
$s{initialize} .= "\t\$ps->{$statc}{$cc} = undef;\n"; |
405
|
6
|
|
|
|
|
32
|
$s{stat} .= "\t\$ps->{$statc}{$cc} = $statc('$keepcc');\n"; |
406
|
|
|
|
|
|
|
} |
407
|
|
|
|
|
|
|
} |
408
|
|
|
|
|
|
|
|
409
|
5
|
|
|
|
|
12
|
for $cc (sort keys %{$agg_config->{stat}}) { |
|
5
|
|
|
|
|
20
|
|
410
|
3
|
|
|
|
|
11
|
$s{stat} .= "\t\$ps->{stat}{$cc} = $varname{$agg_config->{stat}{$cc}};\n"; |
411
|
3
|
|
|
|
|
7
|
$s{initialize} .= "\t\$ps->{stat}{$cc} = undef;\n"; |
412
|
|
|
|
|
|
|
} |
413
|
|
|
|
|
|
|
|
414
|
5
|
|
|
|
|
13
|
for my $cc (sort keys %{$agg_config->{output}}) { |
|
5
|
|
|
|
|
18
|
|
415
|
5
|
|
|
|
|
22
|
$s{initialize} .= "\t\$ps->{output}{$cc} = undef;\n"; |
416
|
5
|
|
|
|
|
24
|
$s{stat} .= "\t\$ps->{output}{$cc} = $varname{$agg_config->{output}{$cc}};\n"; |
417
|
|
|
|
|
|
|
} |
418
|
|
|
|
|
|
|
|
419
|
5
|
|
|
|
|
16
|
for my $icol (@lock_data) { |
420
|
70
|
|
|
|
|
206
|
$s{initialize} .= "\tlock_keys(%{\$ps->{$icol}});\n" |
421
|
70
|
100
|
|
|
|
69
|
if keys %{$agg_config->{$icol}}; |
422
|
|
|
|
|
|
|
} |
423
|
|
|
|
|
|
|
|
424
|
5
|
|
|
|
|
14
|
for my $ctype (@output_cols) { |
425
|
65
|
|
|
|
|
65
|
for $cc (sort keys %{$agg_config->{$ctype}}) { |
|
65
|
|
|
|
|
166
|
|
426
|
31
|
|
|
|
|
103
|
$s{final_values} .= "\t\$row->{$cc} = \$ps->{$ctype}{$cc};\n"; |
427
|
|
|
|
|
|
|
} |
428
|
|
|
|
|
|
|
} |
429
|
5
|
100
|
|
|
|
32
|
$s{final_values} .= "\t&\$finalize_result_func;\n" if $agg_config->{finalize_result}; |
430
|
|
|
|
|
|
|
|
431
|
5
|
|
|
|
|
12
|
my $code = $strict; |
432
|
5
|
50
|
|
|
|
32
|
$code .= qq{\n#line 1 "FAKE-all-code-for-$extra->{name}"\n} if $renumber; |
433
|
5
|
|
|
|
|
22
|
$code .= qq{\nmy $agg_config->{item_name};\n}; |
434
|
5
|
|
|
|
|
20
|
$code .= $declarations; |
435
|
5
|
|
|
|
|
8
|
$code .= "{\n"; |
436
|
|
|
|
|
|
|
|
437
|
5
|
|
|
|
|
28
|
$s{reduce} .= "\t&\$user_reduce_func;\n"; |
438
|
|
|
|
|
|
|
|
439
|
|
|
|
|
|
|
my $assemble_code = sub { |
440
|
50
|
|
|
|
|
203
|
my ($func, @keys) = @_; |
441
|
50
|
|
|
|
|
54
|
my $something; |
442
|
50
|
|
|
|
|
61
|
my $c = "# ---------------------------------------------------------------\n"; |
443
|
50
|
100
|
|
|
|
188
|
$c .= "\$${func}_func = sub {\n" |
444
|
|
|
|
|
|
|
if $func; |
445
|
50
|
|
|
|
|
72
|
for my $s (@keys) { |
446
|
200
|
100
|
|
|
|
419
|
next unless exists $s{$s}; |
447
|
114
|
50
|
|
|
|
468
|
$c .= qq{\n#line 1001 "FAKEFUNC-$extra->{name}-$func-$s"\n} if $renumber; |
448
|
114
|
|
|
|
|
209
|
$c .= $s{$s}; |
449
|
114
|
|
|
|
|
169
|
delete $s{$s}; |
450
|
114
|
|
|
|
|
189
|
$something = 1; |
451
|
|
|
|
|
|
|
} |
452
|
50
|
100
|
100
|
|
|
218
|
$c .= "\t0\n" |
453
|
|
|
|
|
|
|
if $func && ! $something; |
454
|
50
|
100
|
|
|
|
114
|
$c .= "};\n" |
455
|
|
|
|
|
|
|
if $func; |
456
|
50
|
|
|
|
|
373
|
return $c; |
457
|
5
|
|
|
|
|
37
|
}; |
458
|
|
|
|
|
|
|
|
459
|
|
|
|
|
|
|
# |
460
|
|
|
|
|
|
|
# Cross product aggregation & counts |
461
|
|
|
|
|
|
|
# |
462
|
5
|
100
|
|
|
|
34
|
if (@cross_keys) { |
|
|
50
|
|
|
|
|
|
463
|
3
|
|
|
|
|
6
|
my $esub = ''; |
464
|
3
|
|
|
|
|
7
|
my $newsub = ''; |
465
|
3
|
|
|
|
|
5
|
my $oldsub = ''; |
466
|
3
|
|
|
|
|
7
|
my $loop_in = ''; |
467
|
3
|
|
|
|
|
5
|
my $loop_in2 = ''; |
468
|
3
|
|
|
|
|
6
|
my $loop_in3 = ''; |
469
|
3
|
|
|
|
|
4
|
my $loop_in3a = ''; |
470
|
3
|
|
|
|
|
7
|
my $loop_out = ''; |
471
|
3
|
|
|
|
|
5
|
my $loop_out2 = ''; |
472
|
3
|
|
|
|
|
5
|
my $loop_indent = ""; |
473
|
3
|
|
|
|
|
6
|
my $loop_head = ''; |
474
|
3
|
|
|
|
|
4
|
my $loop_mid = ''; |
475
|
3
|
|
|
|
|
6
|
my $loop_mid3 = ''; |
476
|
3
|
|
|
|
|
6
|
my $loop_dbug_old = ''; |
477
|
3
|
|
|
|
|
6
|
my $loop_dbug_new = ''; |
478
|
3
|
|
|
|
|
7
|
for my $cc (@cross_keys) { |
479
|
9
|
50
|
|
|
|
27
|
die "Crossproduct column '$cc' doesn't exist" unless $var_types{$cc}; |
480
|
9
|
50
|
|
|
|
25
|
die "Crossproduct column '$cc' ($var_types{$cc}) isn't a valid type (@cross_cols)" unless $cross_cols{$var_types{$cc}}; |
481
|
|
|
|
|
|
|
|
482
|
9
|
|
100
|
|
|
46
|
my $cc_code = $agg_config->{simplify}{$cc} || 'return "*";'; |
483
|
9
|
|
|
|
|
54
|
$s{user} .= "my \$simplify_$cc = sub {\n"; |
484
|
9
|
50
|
|
|
|
42
|
$s{user} .= qq{#line 3001 "FAKE-$extra->{name}-simplify-$cc"\n} if $renumber; |
485
|
9
|
|
|
|
|
19
|
$s{user} .= "\t".$cc_code; |
486
|
9
|
|
|
|
|
12
|
$s{user} .= "\n};\n"; |
487
|
|
|
|
|
|
|
|
488
|
9
|
|
|
|
|
28
|
$loop_head .= "\tmy %key_count_$cc;\n"; |
489
|
|
|
|
|
|
|
|
490
|
9
|
|
|
|
|
27
|
$loop_mid .= "\tmy \$key_map_$cc = \$cross_key_reduce_func->('$cc', \\%key_count_$cc, \$simplify_$cc);\n"; |
491
|
9
|
|
|
|
|
25
|
$loop_mid3 .= ", $cc => \$key_$cc"; |
492
|
|
|
|
|
|
|
|
493
|
9
|
|
|
|
|
18
|
$loop_dbug_old .= " $cc:\$key_$cc"; |
494
|
9
|
|
|
|
|
16
|
$loop_dbug_new .= " $cc:\$new_$cc"; |
495
|
|
|
|
|
|
|
|
496
|
9
|
|
|
|
|
23
|
$loop_in2 .= "$loop_indent for my \$key_$cc (keys %{\$cross_data$oldsub}) {\n"; |
497
|
|
|
|
|
|
|
|
498
|
9
|
|
|
|
|
34
|
$loop_in .= "$loop_indent for my \$key_$cc (keys %{\$cross_data$oldsub}) {\n"; |
499
|
9
|
|
|
|
|
24
|
$loop_in .= "$loop_indent my \$new_$cc = \$key_$cc;\n"; |
500
|
9
|
|
|
|
|
18
|
$loop_in .= "$loop_indent my \$must_inc = 0;\n"; |
501
|
9
|
|
|
|
|
19
|
$loop_in .= "$loop_indent if (exists \$key_map_${cc}->{\$key_$cc}) {\n"; |
502
|
9
|
|
|
|
|
21
|
$loop_in .= "$loop_indent \$new_$cc = \$key_map_${cc}->{\$key_$cc};\n"; |
503
|
9
|
|
|
|
|
22
|
$loop_in .= "$loop_indent \$must_inc = 1;\n"; |
504
|
9
|
|
|
|
|
14
|
$loop_in .= "$loop_indent \$must_do++;\n"; |
505
|
9
|
|
|
|
|
13
|
$loop_in .= "$loop_indent } else {\n"; |
506
|
9
|
|
|
|
|
19
|
$loop_in .= "$loop_indent \$new_$cc = \$key_$cc;\n"; |
507
|
9
|
|
|
|
|
16
|
$loop_in .= "$loop_indent }\n"; |
508
|
|
|
|
|
|
|
|
509
|
9
|
|
|
|
|
20
|
$loop_in3a .= "\$key_count_${cc}{\$key_$cc}++;\n"; |
510
|
|
|
|
|
|
|
|
511
|
9
|
|
|
|
|
15
|
$loop_out .= "$loop_indent }\n"; |
512
|
9
|
|
|
|
|
16
|
$loop_out .= "$loop_indent \$must_do -= \$must_inc;\n"; |
513
|
9
|
|
|
|
|
14
|
$loop_out2 .= "$loop_indent }\n"; |
514
|
|
|
|
|
|
|
|
515
|
9
|
|
|
|
|
17
|
$loop_indent .= "\t"; |
516
|
|
|
|
|
|
|
|
517
|
9
|
|
|
|
|
23
|
$esub .= "->{$var_value{$cc}}"; |
518
|
9
|
|
|
|
|
18
|
$newsub .= "->{\$new_$cc}"; |
519
|
9
|
|
|
|
|
26
|
$oldsub .= "->{\$key_$cc}"; |
520
|
|
|
|
|
|
|
}; |
521
|
3
|
|
|
|
|
16
|
for my $in3a (split(/\n/, $loop_in3a)) { |
522
|
9
|
|
|
|
|
43
|
$loop_in3 .= "$loop_indent $in3a\n"; |
523
|
|
|
|
|
|
|
} |
524
|
|
|
|
|
|
|
|
525
|
3
|
|
|
|
|
21
|
$loop_out = join("\n", reverse split(/\n/, $loop_out)) . "\n"; |
526
|
3
|
|
|
|
|
554
|
$loop_out2 = join("\n", reverse split(/\n/, $loop_out2)) . "\n"; |
527
|
|
|
|
|
|
|
|
528
|
|
|
|
|
|
|
# |
529
|
|
|
|
|
|
|
# Reduce the number of contexts |
530
|
|
|
|
|
|
|
# |
531
|
|
|
|
|
|
|
|
532
|
|
|
|
|
|
|
# Build the value-remapping table for one crossproduct key. |
# Arguments: |
#   $keyname       - name of the crossproduct column |
#   $valcounts     - hash ref of value => count for that column |
#   $simplify_func - code ref called as $simplify_func->($val, $keyname) |
#                    to obtain the replacement for a value |
# Returns a hash ref mapping each value to be collapsed to its replacement. |
# The hash is empty when the number of distinct values does not exceed |
# $agg_config->{crossproduct}{$keyname}. |
# Side effect: sets the enclosing $do_reduce to 1 when over the limit. |
$cross_key_reduce_func = sub { |
533
|
9
|
|
|
|
|
727
|
my ($keyname, $valcounts, $simplify_func) = @_; |
534
|
9
|
|
|
|
|
11
|
my %ret; |
535
|
9
|
100
|
|
|
|
52
|
if (keys %$valcounts > $agg_config->{crossproduct}{$keyname}) { |
536
|
2
|
|
|
|
|
5
|
$do_reduce = 1; |
537
|
2
|
|
|
|
|
6
|
my $limit = $agg_config->{crossproduct}{$keyname}; |
538
|
2
|
|
|
|
|
7
|
# running estimate of how many distinct values remain after remapping |
my $current = keys %$valcounts; |
539
|
2
|
|
|
|
|
2
|
# %seen counts how often each replacement value has been chosen |
my %seen; |
540
|
|
|
|
|
|
|
# %new marks values that are themselves the target of a replacement |
my %new; |
541
|
2
|
|
|
|
|
12
|
# visit values in ascending count order, so the rarest are collapsed first |
for my $val (sort { $valcounts->{$a} <=> $valcounts->{$b} } keys %$valcounts) { |

33
|
|
|
|
|
68
|

542
|
15
|
100
|
|
|
|
31
|
# only keep collapsing while the estimate is still above the limit |
if ($current > $limit) { |
543
|
12
|
|
|
|
|
28
|
my $new = $simplify_func->($val, $keyname); |
544
|
12
|
50
|
|
|
|
39
|
# the simplifier left this value unchanged: nothing to remap |
next if $new eq $val; |
545
|
12
|
100
|
|
|
|
36
|
# a replacement that is already in use merges two values into one |
if ($seen{$new}++) { |
546
|
10
|
|
|
|
|
12
|
$current--; |
547
|
|
|
|
|
|
|
} |
548
|
12
|
|
|
|
|
14
|
$new{$new}++; |
549
|
12
|
50
|
|
|
|
24
|
if ($new{$val}) { |
550
|
|
|
|
|
|
|
# we can't throw this one away since we have new |
551
|
|
|
|
|
|
|
# users... we may not be able to meet our contract. |
552
|
0
|
0
|
|
|
|
0
|
# NOTE(review): this undo branch was never executed in this coverage |
# run (all counts are 0); confirm that decrementing $current here, |
# rather than restoring it, is what is intended. |
$current-- unless --$seen{$new}; |
553
|
0
|
|
|
|
|
0
|
$new{$new}--; |
554
|
0
|
|
|
|
|
0
|
next; |
555
|
|
|
|
|
|
|
} else { |
556
|
12
|
|
|
|
|
25
|
$ret{$val} = $new; |
557
|
|
|
|
|
|
|
} |
558
|
|
|
|
|
|
|
} |
559
|
|
|
|
|
|
|
} |
560
|
|
|
|
|
|
|
} |
561
|
9
|
50
|
|
|
|
27
|
# debug dump of the computed mapping |
print STDERR YAML::Dump("reduce $keyname", \%ret) if $agg_config->{debug} > 2; |
562
|
9
|
|
|
|
|
30
|
return \%ret; |
563
|
3
|
|
|
|
|
26
|
}; |
564
|
|
|
|
|
|
|
|
565
|
3
|
|
|
|
|
8
|
my $db1 = ''; |
566
|
3
|
|
|
|
|
5
|
my $db2 = ''; |
567
|
3
|
50
|
|
|
|
18
|
$db1 = qq{print STDERR "Merging\t$loop_dbug_old (\$cross_data${oldsub}->{item_counter})\tinto\t$loop_dbug_new\t\$cross_count\\n";} if $agg_config->{debug}; |
568
|
3
|
50
|
|
|
|
13
|
$db2 = qq{print STDERR "Moving\t$loop_dbug_old (\$cross_data${oldsub}->{item_counter})\tto\t$loop_dbug_new\\n";} if $agg_config->{debug}; |
569
|
3
|
50
|
|
|
|
36
|
$s{cross_reduce} .= resume_line_numbering if $renumber; |
570
|
|
|
|
|
|
|
|
571
|
3
|
|
|
|
|
11
|
$s{cross_reduce} .= "\t\$do_reduce = 0;\n"; |
572
|
3
|
|
|
|
|
7
|
$s{cross_reduce} .= "\tmy \$must_do = 0;\n"; |
573
|
3
|
|
|
|
|
14
|
$s{cross_reduce} .= $loop_head; |
574
|
3
|
|
|
|
|
9
|
$s{cross_reduce} .= $loop_in2; |
575
|
3
|
|
|
|
|
8
|
$s{cross_reduce} .= $loop_in3; |
576
|
3
|
|
|
|
|
14
|
$s{cross_reduce} .= $loop_out2; |
577
|
3
|
|
|
|
|
8
|
$s{cross_reduce} .= $loop_mid; |
578
|
3
|
|
|
|
|
8
|
$s{cross_reduce} .= $loop_in; |
579
|
3
|
|
|
|
|
42
|
$s{cross_reduce} .= $eval_line_numbers->(<
|
580
|
|
|
|
|
|
|
# --------------- reduce ------------- |
581
|
|
|
|
|
|
|
if (\$must_do) { |
582
|
|
|
|
|
|
|
if (\$cross_data$newsub) { |
583
|
|
|
|
|
|
|
\$cross_count--; |
584
|
|
|
|
|
|
|
$db1 |
585
|
|
|
|
|
|
|
\$ps = \$cross_data$newsub; |
586
|
|
|
|
|
|
|
\$oldps = delete \$cross_data$oldsub; |
587
|
|
|
|
|
|
|
# |
588
|
|
|
|
|
|
|
# print STDERR "ABOUT TO MERGE: \$key_color \$key_size \$key_style \$oldps\\n"; |
589
|
|
|
|
|
|
|
# print STDERR YAML::Dump("Pre-mege cross-data", \$cross_data); |
590
|
|
|
|
|
|
|
# |
591
|
|
|
|
|
|
|
&\$merge_func; |
592
|
|
|
|
|
|
|
\$ps = \$contexts[-1]; |
593
|
|
|
|
|
|
|
} else { |
594
|
|
|
|
|
|
|
$db2 |
595
|
|
|
|
|
|
|
\$cross_data$newsub = delete \$cross_data$oldsub; |
596
|
|
|
|
|
|
|
} |
597
|
|
|
|
|
|
|
} |
598
|
|
|
|
|
|
|
# --------------- reduce ------------- |
599
|
|
|
|
|
|
|
END_CR |
600
|
3
|
50
|
|
|
|
216
|
$s{cross_reduce} .= resume_line_numbering if $renumber; |
601
|
3
|
|
|
|
|
9
|
$s{cross_reduce} .= $loop_out; |
602
|
|
|
|
|
|
|
|
603
|
|
|
|
|
|
|
# |
604
|
|
|
|
|
|
|
# Add data to the right context |
605
|
|
|
|
|
|
|
# |
606
|
3
|
|
|
|
|
12
|
my $db3 = ''; |
607
|
3
|
50
|
|
|
|
19
|
$db3 = qq{print STDERR "Cross-count: \$cross_count\\n";} if $agg_config->{debug} > 3; |
608
|
3
|
|
|
|
|
27
|
$s{crossproduct} .= $eval_line_numbers->(<
|
609
|
|
|
|
|
|
|
if (\$cross_count > \$cross_limit * 2) { |
610
|
|
|
|
|
|
|
&\$cross_reduce_func; |
611
|
|
|
|
|
|
|
} |
612
|
|
|
|
|
|
|
if (\$cross_data$esub) { |
613
|
|
|
|
|
|
|
\$ps = \$cross_data$esub; |
614
|
|
|
|
|
|
|
} else { |
615
|
|
|
|
|
|
|
&\$new_ps_func; |
616
|
|
|
|
|
|
|
\$cross_data$esub = \$ps; |
617
|
|
|
|
|
|
|
\$cross_count++; |
618
|
|
|
|
|
|
|
$db3 |
619
|
|
|
|
|
|
|
} |
620
|
|
|
|
|
|
|
END_CP |
621
|
3
|
50
|
|
|
|
54
|
$s{crossproduct} .= resume_line_numbering if $renumber; |
622
|
|
|
|
|
|
|
|
623
|
|
|
|
|
|
|
# |
624
|
|
|
|
|
|
|
# handle combinations |
625
|
|
|
|
|
|
|
# |
626
|
3
|
|
|
|
|
11
|
$s{inner_combine} = ''; |
627
|
3
|
|
|
|
|
6
|
$s{outer_combine} = ''; |
628
|
3
|
100
|
|
|
|
15
|
if ($agg_config->{combinations}) { |
629
|
2
|
|
|
|
|
3
|
my $generate_combinations; |
630
|
2
|
|
|
|
|
4
|
my $combination_number = 0; |
631
|
2
|
|
|
|
|
5
|
my %mapping; |
632
|
|
|
|
|
|
|
$generate_combinations = sub { |
633
|
12
|
|
|
|
|
29
|
my ($output_field, $input_ps, $indent, $keys, $done, $loop_over) = @_; |
634
|
12
|
|
|
|
|
18
|
my $out = \$s{$output_field}; |
635
|
12
|
|
|
|
|
16
|
my $loopout = ''; |
636
|
12
|
|
|
|
|
17
|
my $x = ''; |
637
|
12
|
|
|
|
|
14
|
my $y = ''; |
638
|
12
|
100
|
|
|
|
25
|
my @loop_keys = grep { $agg_config->{combinations}{$_} && ! $done->{$_} } sort @$keys; |
|
17
|
|
|
|
|
107
|
|
639
|
12
|
|
|
|
|
30
|
my @delayed_call; |
640
|
12
|
100
|
|
|
|
35
|
if ($loop_over) { |
641
|
10
|
|
|
|
|
13
|
for my $k (@loop_keys) { |
642
|
11
|
|
|
|
|
29
|
$$out .= "$indent$x for my \$ck_$k (keys %{$input_ps}) {\n"; |
643
|
11
|
|
|
|
|
22
|
$loopout = "$indent$x }\n$loopout"; |
644
|
11
|
|
|
|
|
14
|
$x .= "\t"; |
645
|
11
|
|
|
|
|
24
|
$input_ps .= "{\$ck_$k}"; |
646
|
|
|
|
|
|
|
} |
647
|
10
|
|
|
|
|
14
|
$y = $x; |
648
|
10
|
100
|
|
|
|
20
|
if (! @loop_keys) { |
649
|
2
|
|
|
|
|
5
|
$y .= "\t"; |
650
|
2
|
|
|
|
|
7
|
$$out .= "$indent$x if ($input_ps) {\n"; |
651
|
|
|
|
|
|
|
} |
652
|
10
|
|
|
|
|
30
|
$$out .= "$indent$y \$row = ${input_ps}->{row};\n"; |
653
|
|
|
|
|
|
|
} |
654
|
12
|
|
|
|
|
18
|
for my $cc (@loop_keys) { |
655
|
16
|
|
|
|
|
26
|
my @keeping = grep { $_ ne $cc } @loop_keys; |
|
30
|
|
|
|
|
63
|
|
656
|
16
|
100
|
|
|
|
71
|
if ($mapping{"@keeping"}++) { |
657
|
6
|
|
|
|
|
15
|
$$out .= "$indent$x # we've already handled keeping '@keeping'\n"; |
658
|
6
|
|
|
|
|
104
|
next; |
659
|
|
|
|
|
|
|
} |
660
|
|
|
|
|
|
|
|
661
|
11
|
|
|
|
|
38
|
my $accessor = @keeping |
662
|
10
|
100
|
|
|
|
26
|
? "{" . join("}{", map { "\$row->{'$_'}" } @keeping) . "}" |
663
|
|
|
|
|
|
|
: ''; |
664
|
|
|
|
|
|
|
|
665
|
10
|
100
|
100
|
|
|
50
|
if ($loop_over && @keeping) { |
666
|
3
|
|
|
|
|
4
|
$accessor = "{" . join("}{", map { "\$ck_$_" } @keeping) . "}" |
|
3
|
|
|
|
|
13
|
|
667
|
|
|
|
|
|
|
} |
668
|
|
|
|
|
|
|
|
669
|
|
|
|
|
|
|
# yes, we're using auto-vivification. It's ugly, but simplifies |
670
|
|
|
|
|
|
|
# the code. |
671
|
|
|
|
|
|
|
|
672
|
10
|
|
|
|
|
15
|
$$out .= "\n"; |
673
|
10
|
|
|
|
|
18
|
$$out .= "$indent$x # combine, dropping $cc"; |
674
|
10
|
100
|
|
|
|
32
|
$$out .= ", keeping: @keeping" if @keeping; |
675
|
10
|
|
|
|
|
13
|
$$out .= "\n"; |
676
|
|
|
|
|
|
|
|
677
|
10
|
|
|
|
|
20
|
$$out .= "$indent$x if (\$combination_funcs{'$cc'}->()) {\n"; |
678
|
10
|
|
|
|
|
23
|
$$out .= "$indent$x if (\$combinations[$combination_number]$accessor) {\n"; |
679
|
10
|
|
|
|
|
19
|
$$out .= "$indent$x local(\$Stream::Aggregate::Stats::ps)\n"; |
680
|
10
|
|
|
|
|
13
|
$$out .= "$indent$x = \$ps\n"; |
681
|
10
|
|
|
|
|
19
|
$$out .= "$indent$x = \$combinations[$combination_number]$accessor;\n"; |
682
|
10
|
100
|
|
|
|
29
|
$$out .= "$indent$x \$oldps = $input_ps;\n" if $input_ps ne '$oldps'; |
683
|
10
|
|
|
|
|
34
|
$$out .= "$indent$x &\$merge_func;\n"; |
684
|
10
|
|
|
|
|
16
|
$$out .= "$indent$x } else {\n"; |
685
|
10
|
|
|
|
|
27
|
$$out .= "$indent$x \$ps = \$combinations[$combination_number]$accessor = clone($input_ps);\n"; |
686
|
10
|
|
|
|
|
17
|
$$out .= "$indent$x \$ps->{row} = { %\$row };\n"; |
687
|
10
|
|
|
|
|
17
|
$$out .= "$indent$x delete \$ps->{row}{'$cc'};\n"; |
688
|
10
|
|
|
|
|
14
|
$$out .= "$indent$x }\n"; |
689
|
10
|
|
|
|
|
89
|
$$out .= "$indent$x }\n"; |
690
|
|
|
|
|
|
|
|
691
|
10
|
|
|
|
|
16
|
my $pnum = $combination_number++; |
692
|
|
|
|
|
|
|
|
693
|
|
|
|
|
|
|
push(@delayed_call, sub { |
694
|
10
|
|
|
|
|
101
|
$generate_combinations->( |
695
|
|
|
|
|
|
|
outer_combine => "\$combinations[$pnum]", |
696
|
|
|
|
|
|
|
"", |
697
|
|
|
|
|
|
|
\@keeping, |
698
|
|
|
|
|
|
|
{ %$done, $cc => 1 }, |
699
|
|
|
|
|
|
|
$cc); |
700
|
10
|
|
|
|
|
50
|
}); |
701
|
|
|
|
|
|
|
} |
702
|
12
|
100
|
|
|
|
26
|
if ($loop_over) { |
703
|
10
|
|
|
|
|
15
|
$$out .= "\n"; |
704
|
10
|
|
|
|
|
27
|
$$out .= "$indent$y # final values with '@loop_keys' keys\n"; |
705
|
10
|
|
|
|
|
44
|
$$out .= "$indent$y local(\$Stream::Aggregate::Stats::ps) = \$ps = ${input_ps};\n"; |
706
|
10
|
|
|
|
|
19
|
$$out .= "$indent$y \$suppress_result = 0;\n"; |
707
|
10
|
|
|
|
|
14
|
$$out .= "$indent$y \$final_values_func->();\n"; |
708
|
10
|
|
|
|
|
26
|
$$out .= "$indent$y push(\@\$retref, \$row) unless \$suppress_result;\n"; |
709
|
10
|
|
|
|
|
13
|
$$out .= "\n"; |
710
|
10
|
100
|
|
|
|
22
|
if (! @loop_keys) { |
711
|
2
|
|
|
|
|
5
|
$$out .= "$indent$x }\n"; |
712
|
|
|
|
|
|
|
} |
713
|
|
|
|
|
|
|
|
714
|
10
|
|
|
|
|
14
|
$$out .= $loopout; |
715
|
|
|
|
|
|
|
} |
716
|
12
|
|
|
|
|
94
|
while (my $dc = shift(@delayed_call)) { |
717
|
10
|
|
|
|
|
21
|
$dc->(); |
718
|
|
|
|
|
|
|
} |
719
|
|
|
|
|
|
|
|
720
|
2
|
|
|
|
|
33
|
}; |
721
|
2
|
|
|
|
|
11
|
$generate_combinations->( |
722
|
|
|
|
|
|
|
inner_combine => '$oldps', |
723
|
|
|
|
|
|
|
"\t\t\t", |
724
|
|
|
|
|
|
|
\@cross_keys, |
725
|
|
|
|
|
|
|
{}, |
726
|
|
|
|
|
|
|
undef); |
727
|
|
|
|
|
|
|
} |
728
|
|
|
|
|
|
|
|
729
|
|
|
|
|
|
|
# |
730
|
|
|
|
|
|
|
# Return the cross product results |
731
|
|
|
|
|
|
|
# |
732
|
3
|
50
|
|
|
|
15
|
$s{finish_cross} .= qq{print STDERR "Finish cross called\n";} if $agg_config->{debug} > 7; |
733
|
3
|
50
|
|
|
|
34
|
$s{finish_cross} .= qq{print STDERR YAML::Dump('cross_data-before',\$cross_data);\n} if $agg_config->{debug} > 8; |
734
|
3
|
|
|
|
|
8
|
$s{finish_cross} .= "\tmy (\$retref) = shift;\n"; |
735
|
3
|
|
|
|
|
6
|
$s{finish_cross} .= "\tmy \$rowtmp;\n"; |
736
|
3
|
|
|
|
|
7
|
$s{finish_cross} .= "\t&\$cross_reduce_func;\n"; |
737
|
3
|
50
|
|
|
|
12
|
$s{finish_cross} .= qq{print STDERR YAML::Dump('cross_data-after',\$cross_data);\n} if $agg_config->{debug} > 8; |
738
|
3
|
|
|
|
|
6
|
$s{finish_cross} .= $loop_in2; |
739
|
3
|
|
|
|
|
26
|
$s{finish_cross} .= $eval_line_numbers->(<
|
740
|
|
|
|
|
|
|
# --------------- finish cross ------------- |
741
|
|
|
|
|
|
|
local(\$Stream::Aggregate::Stats::ps) |
742
|
|
|
|
|
|
|
= \$ps |
743
|
|
|
|
|
|
|
= \$cross_data$oldsub; |
744
|
|
|
|
|
|
|
confess unless \$ps; |
745
|
|
|
|
|
|
|
\$suppress_result = 0; |
746
|
|
|
|
|
|
|
\$rowtmp = \$row = { &\$context_columns_func $loop_mid3 }; |
747
|
|
|
|
|
|
|
&\$final_values_func; |
748
|
|
|
|
|
|
|
push(@\$retref, \$row) unless \$suppress_result; |
749
|
|
|
|
|
|
|
\$oldps = delete \$cross_data$oldsub; |
750
|
|
|
|
|
|
|
\$oldps->{row} = \$row; |
751
|
|
|
|
|
|
|
\$ps = \$contexts[-1]; |
752
|
|
|
|
|
|
|
&\$merge_func if \$ps; |
753
|
|
|
|
|
|
|
\$cross_count--; |
754
|
|
|
|
|
|
|
$db3 |
755
|
|
|
|
|
|
|
END_FC |
756
|
3
|
|
|
|
|
60
|
$s{finish_cross} .= delete $s{inner_combine}; |
757
|
3
|
|
|
|
|
7
|
$s{finish_cross} .= "\t\t\t\t# --------------- finish cross -------------\n"; |
758
|
3
|
50
|
|
|
|
16
|
$s{finish_cross} .= resume_line_numbering if $renumber; |
759
|
3
|
|
|
|
|
7
|
$s{finish_cross} .= $loop_out2; |
760
|
3
|
|
|
|
|
17
|
$s{finish_cross} .= delete $s{outer_combine}; |
761
|
|
|
|
|
|
|
} elsif ($agg_config->{combinations}) { |
762
|
0
|
|
|
|
|
0
|
die "combinations requires crossproduct which isn't defined"; |
763
|
|
|
|
|
|
|
} |
764
|
|
|
|
|
|
|
|
765
|
5
|
|
|
|
|
22
|
$code .= $eval_line_numbers->(<<'END_FIELDS'); |
766
|
|
|
|
|
|
|
|
767
|
|
|
|
|
|
|
my $compile_user_code = sub { |
768
|
|
|
|
|
|
|
my ($c, $field, $config_key, $default) = @_; |
769
|
|
|
|
|
|
|
return $default unless defined $c->{$field}; |
770
|
|
|
|
|
|
|
my $config = $c->{$config_key} || {}; # maybe used by eval |
771
|
|
|
|
|
|
|
my $coderef; |
772
|
|
|
|
|
|
|
my $code = $strict; |
773
|
|
|
|
|
|
|
$code .= qq{\n#line 2001 "FAKE-$extra->{name}-$field"\n} if $renumber; |
774
|
|
|
|
|
|
|
$code .= qq{sub { $c->{$field} }; }; |
775
|
4
|
|
|
4
|
|
28
|
my $sub = eval $code; |
|
4
|
|
|
4
|
|
8
|
|
|
4
|
|
|
4
|
|
161
|
|
|
4
|
|
|
4
|
|
40
|
|
|
4
|
|
|
4
|
|
7
|
|
|
4
|
|
|
4
|
|
929
|
|
|
4
|
|
|
|
|
25
|
|
|
4
|
|
|
|
|
11
|
|
|
4
|
|
|
|
|
232
|
|
|
4
|
|
|
|
|
22
|
|
|
4
|
|
|
|
|
8
|
|
|
4
|
|
|
|
|
760
|
|
|
4
|
|
|
|
|
22
|
|
|
4
|
|
|
|
|
8
|
|
|
4
|
|
|
|
|
145
|
|
|
4
|
|
|
|
|
104
|
|
|
4
|
|
|
|
|
8
|
|
|
4
|
|
|
|
|
715
|
|
776
|
|
|
|
|
|
|
die "Cannot compile user code for $extra->{name}/$field: $@\n$code" if $@; |
777
|
|
|
|
|
|
|
return $coderef if $coderef; |
778
|
|
|
|
|
|
|
return $sub; |
779
|
|
|
|
|
|
|
}; |
780
|
|
|
|
|
|
|
|
781
|
|
|
|
|
|
|
$get_context_func = $compile_user_code->($agg_config, 'context', 'context_config', sub { return () }); |
782
|
|
|
|
|
|
|
$context_columns_func = $compile_user_code->($agg_config, 'context2columns', 'context2columns_config', sub { return () }); |
783
|
|
|
|
|
|
|
$filter_func = $compile_user_code->($agg_config, 'filter', 'filter_config', sub { 1 }); |
784
|
|
|
|
|
|
|
$preprocess_func = $compile_user_code->($agg_config, 'preprocess', 'preprocess_config', sub {}); |
785
|
|
|
|
|
|
|
$stringify_func = $compile_user_code->($agg_config, 'stringify_context', 'stringify_context_config', sub { map { ref($_) ? Dump($_) : $_ } @_ }); |
786
|
|
|
|
|
|
|
$finalize_result_func = $compile_user_code->($agg_config, 'finalize_result', 'finalize_result_config', sub {}); |
787
|
|
|
|
|
|
|
$passthrough_func = $compile_user_code->($agg_config, 'passthrough', 'passthrough_config', sub { return () }); |
788
|
|
|
|
|
|
|
$user_new_context_func = $compile_user_code->($agg_config, 'new_context', 'new_context_config', sub { return () }); |
789
|
|
|
|
|
|
|
$user_merge_func = $compile_user_code->($agg_config, 'merge', 'merge_config', sub { return () }); |
790
|
|
|
|
|
|
|
$user_reduce_func = $compile_user_code->($agg_config, 'reduce', 'reduce_config', sub { return () }); |
791
|
|
|
|
|
|
|
|
792
|
|
|
|
|
|
|
if ($agg_config->{crossproduct} && $agg_config->{combinations}) { |
793
|
|
|
|
|
|
|
for my $crosskey (uniq(keys(%{$agg_config->{crossproduct}}), keys(%{$agg_config->{combinations}}))) { |
794
|
|
|
|
|
|
|
$combination_funcs{$crosskey} = $compile_user_code->($agg_config->{combinations}, $crosskey, "combine on $crosskey", sub { 0 }); |
795
|
|
|
|
|
|
|
} |
796
|
|
|
|
|
|
|
} |
797
|
|
|
|
|
|
|
|
798
|
|
|
|
|
|
|
END_FIELDS |
799
|
5
|
|
|
|
|
122
|
$code .= "\t\$itemref = \\$agg_config->{item_name};\n"; |
800
|
5
|
|
|
|
|
18
|
$code .= "}\n"; |
801
|
|
|
|
|
|
|
|
802
|
|
|
|
|
|
|
# |
803
|
|
|
|
|
|
|
# New context ($ps) allocator |
804
|
|
|
|
|
|
|
# |
805
|
|
|
|
|
|
|
|
806
|
5
|
|
|
|
|
14
|
$s{new_ps} .= "\t\$ps = {};\n"; |
807
|
5
|
|
|
|
|
12
|
$s{new_ps} .= "\t\$ps->{item_counter} = 0;\n"; |
808
|
5
|
100
|
|
|
|
41
|
$s{new_ps} .= "\t\$ps->{heap} = {};\n" |
809
|
|
|
|
|
|
|
if Dump($agg_config) =~ /\{heap\}/; |
810
|
5
|
100
|
|
|
|
1060
|
if ($has_keepers) { |
811
|
4
|
|
|
|
|
13
|
$s{new_ps} .= "\t\$ps->{random} = [];\n"; |
812
|
4
|
|
|
|
|
12
|
$s{new_ps} .= "\t\$ps->{sidestats} = {};\n"; # for Stream::Aggregate::Stats |
813
|
|
|
|
|
|
|
} |
814
|
5
|
100
|
|
|
|
21
|
$s{new_ps} .= "\t\$ps->{unfiltered_counter} = 0;\n" if $agg_config->{filter}; |
815
|
5
|
50
|
|
|
|
24
|
$s{new_ps} .= "\t&\$initialize_func;\n" if $s{initialize}; |
816
|
5
|
50
|
|
|
|
18
|
$s{new_ps} .= "\t&\$user_new_context_func;\n" if $agg_config->{new_context}; |
817
|
5
|
|
|
|
|
14
|
$s{new_ps} .= "\t\$ps->{row} = undef;\n"; |
818
|
5
|
|
|
|
|
8
|
$s{new_ps} .= "\tlock_keys(%\$ps);\n"; |
819
|
|
|
|
|
|
|
|
820
|
|
|
|
|
|
|
# |
821
|
|
|
|
|
|
|
# main processing loop, generated for execution efficiency |
822
|
|
|
|
|
|
|
# |
823
|
|
|
|
|
|
|
|
824
|
5
|
50
|
|
|
|
17
|
$s{process} .= "\t\$last_item = \$\$itemref;\n" |
825
|
|
|
|
|
|
|
if Dump($agg_config) =~ /\$last_item\b/; |
826
|
5
|
|
|
|
|
701
|
$s{process} .= $eval_line_numbers->(<<'END_P0'); |
827
|
|
|
|
|
|
|
$last_item = $$itemref; |
828
|
|
|
|
|
|
|
($$itemref) = @_; |
829
|
|
|
|
|
|
|
my @ret; |
830
|
|
|
|
|
|
|
unless ($$itemref) { |
831
|
|
|
|
|
|
|
$finish_cross_func->(\@ret) if keys %$cross_data; |
832
|
|
|
|
|
|
|
$finish_context_func->(\@ret) |
833
|
|
|
|
|
|
|
while @contexts; |
834
|
|
|
|
|
|
|
return @ret; |
835
|
|
|
|
|
|
|
} |
836
|
|
|
|
|
|
|
END_P0 |
837
|
5
|
100
|
|
|
|
93
|
$s{process} .= $eval_line_numbers->(<<'END_P1') if $agg_config->{preprocess}; |
838
|
|
|
|
|
|
|
|
839
|
|
|
|
|
|
|
&$preprocess_func; |
840
|
|
|
|
|
|
|
|
841
|
|
|
|
|
|
|
END_P1 |
842
|
5
|
100
|
66
|
|
|
48
|
$s{process} .= $eval_line_numbers->(<<'END_P2') if $agg_config->{filter} && $agg_config->{filter_early}; |
843
|
|
|
|
|
|
|
|
844
|
|
|
|
|
|
|
$count_this = &$filter_func; |
845
|
|
|
|
|
|
|
END_P2 |
846
|
5
|
50
|
|
|
|
44
|
$s{process} .= $eval_line_numbers->(<<'END_P3') if $agg_config->{passthrough}; |
847
|
|
|
|
|
|
|
|
848
|
|
|
|
|
|
|
push(@ret, &$passthrough_func); |
849
|
|
|
|
|
|
|
|
850
|
|
|
|
|
|
|
END_P3 |
851
|
|
|
|
|
|
|
|
852
|
5
|
100
|
|
|
|
20
|
if ($agg_config->{context}) { |
853
|
2
|
50
|
33
|
|
|
11
|
$s{process} .= $eval_line_numbers->(<<'END_P4') if $agg_config->{filter} && $agg_config->{filter_early}; |
854
|
|
|
|
|
|
|
|
855
|
|
|
|
|
|
|
if ($count_this) { |
856
|
|
|
|
|
|
|
|
857
|
|
|
|
|
|
|
END_P4 |
858
|
2
|
|
|
|
|
7
|
$s{process} .= $eval_line_numbers->(<<'END_P5'); |
859
|
|
|
|
|
|
|
|
860
|
|
|
|
|
|
|
my @new_context = &$get_context_func; |
861
|
|
|
|
|
|
|
my @new_strings = $stringify_func->(@new_context); |
862
|
|
|
|
|
|
|
|
863
|
|
|
|
|
|
|
my $diffpos = list_difference_position(@new_strings, @context_strings); |
864
|
|
|
|
|
|
|
|
865
|
|
|
|
|
|
|
if (defined $diffpos) { |
866
|
|
|
|
|
|
|
$finish_context_func->(\@ret) |
867
|
|
|
|
|
|
|
while @current_context >= $diffpos; |
868
|
|
|
|
|
|
|
} |
869
|
|
|
|
|
|
|
|
870
|
|
|
|
|
|
|
while (@new_context > @current_context) { |
871
|
|
|
|
|
|
|
$add_context_component_func->($new_context[@current_context], $new_strings[@current_context]); |
872
|
|
|
|
|
|
|
} |
873
|
|
|
|
|
|
|
END_P5 |
874
|
|
|
|
|
|
|
|
875
|
2
|
50
|
33
|
|
|
37
|
$s{process} .= $eval_line_numbers->(<<'END_P7') if $agg_config->{filter} && $agg_config->{filter_early}; |
876
|
|
|
|
|
|
|
} |
877
|
|
|
|
|
|
|
|
878
|
|
|
|
|
|
|
END_P7 |
879
|
|
|
|
|
|
|
} |
880
|
|
|
|
|
|
|
|
881
|
5
|
50
|
66
|
|
|
32
|
$s{process} .= $eval_line_numbers->(<<'END_P7A') if $agg_config->{filter} && ! $agg_config->{filter_early}; |
882
|
|
|
|
|
|
|
|
883
|
|
|
|
|
|
|
$count_this = &$filter_func; |
884
|
|
|
|
|
|
|
END_P7A |
885
|
|
|
|
|
|
|
|
886
|
5
|
100
|
66
|
|
|
40
|
# Guard the counting code that follows with if ($count_this) whenever a |
# filter is configured.  END_P9 emits the matching closing brace for every |
# filter config, so this opening brace must also be emitted when a context |
# is configured; the context-only guard opened by END_P4 is already closed |
# by END_P7, and skipping this one left the generated code unbalanced. |
$s{process} .= $eval_line_numbers->(<<'END_P7B') if $agg_config->{filter}; |
887
|
|
|
|
|
|
|
if ($count_this) { |
888
|
|
|
|
|
|
|
END_P7B |
889
|
|
|
|
|
|
|
|
890
|
5
|
|
|
|
|
42
|
$s{process} .= $eval_line_numbers->(<<'END_P8'); |
891
|
|
|
|
|
|
|
&$count_func; |
892
|
|
|
|
|
|
|
$ps->{item_counter}++; |
893
|
|
|
|
|
|
|
END_P8 |
894
|
|
|
|
|
|
|
|
895
|
|
|
|
|
|
|
# this closes the if ($count_this) opened in P7B (the one opened in P4 is closed by P7) |
896
|
5
|
100
|
|
|
|
82
|
$s{process} .= $eval_line_numbers->(<<'END_P9') if $agg_config->{filter}; |
897
|
|
|
|
|
|
|
} |
898
|
|
|
|
|
|
|
$ps->{unfiltered_counter}++; |
899
|
|
|
|
|
|
|
END_P9 |
900
|
|
|
|
|
|
|
|
901
|
5
|
|
|
|
|
38
|
$s{process} .= $eval_line_numbers->(<<'END_P10'); |
902
|
|
|
|
|
|
|
return @ret; |
903
|
|
|
|
|
|
|
END_P10 |
904
|
5
|
50
|
|
|
|
81
|
$s{process} .= resume_line_numbering if $renumber; |
905
|
|
|
|
|
|
|
|
906
|
|
|
|
|
|
|
# |
907
|
|
|
|
|
|
|
# Merge contexts func |
908
|
|
|
|
|
|
|
# |
909
|
|
|
|
|
|
|
|
910
|
5
|
50
|
|
|
|
26
|
$s{merge0} .= "print STDERR YAML::Dump('MERGE', \$ps, \$oldps);\n" if $agg_config->{debug} > 11; |
911
|
5
|
50
|
|
|
|
28
|
$s{merge0} .= resume_line_numbering if $renumber; |
912
|
5
|
|
|
|
|
15
|
$s{merge0} .= "\t\$ps->{item_counter} += \$oldps->{item_counter};\n"; |
913
|
5
|
100
|
|
|
|
24
|
$s{merge0} .= "\t\$ps->{unfiltered_counter} += \$oldps->{unfiltered_counter};\n" if $agg_config->{filter}; |
914
|
5
|
50
|
|
|
|
23
|
$s{merge3} .= resume_line_numbering if $renumber; |
915
|
5
|
|
|
|
|
11
|
$s{merge3} .= "\t&\$user_merge_func;\n"; |
916
|
|
|
|
|
|
|
|
917
|
5
|
50
|
|
|
|
21
|
$s{fv_setup} .= "print STDERR YAML::Dump('final_values', \$ps);\n" if $agg_config->{debug} > 12; |
918
|
|
|
|
|
|
|
|
919
|
5
|
|
|
|
|
32
|
$code .= $assemble_code->('', qw(user)); |
920
|
5
|
|
|
|
|
34
|
$code .= $assemble_code->('merge', qw(merge0 merge merge2 merge3)); |
921
|
5
|
|
|
|
|
20
|
$code .= $assemble_code->('cross_reduce', qw(cross_reduce)); |
922
|
5
|
|
|
|
|
46
|
$code .= $assemble_code->('finish_cross', qw(finish_cross)); |
923
|
5
|
|
|
|
|
16
|
$code .= $assemble_code->('new_ps', qw(new_ps)); |
924
|
5
|
|
|
|
|
18
|
$code .= $assemble_code->('process', qw(process)); |
925
|
5
|
|
|
|
|
18
|
$code .= $assemble_code->('initialize', qw(initialize)); |
926
|
5
|
|
|
|
|
19
|
$code .= $assemble_code->('final_values', qw(fv_setup output stat final_values)); |
927
|
5
|
|
|
|
|
92
|
$code .= $assemble_code->('count', qw(precount count ephemeral0 ephemeral ephemeral2 crossproduct ephemeral3 keep standard_deviation median dominant counter percentage sum mean median min minstr max maxstr count2 keeper1 keeper2 keeper3 )); |
928
|
5
|
|
|
|
|
19
|
$code .= $assemble_code->('reduce', qw(reduce reduce2)); |
929
|
5
|
50
|
|
|
|
27
|
die "INTERNAL ERROR: ".join(' ', keys %s) if keys %s; |
930
|
|
|
|
|
|
|
|
931
|
5
|
50
|
|
|
|
24
|
if ($suppress_line_numbers) { |
932
|
0
|
|
|
|
|
0
|
$code =~ s/^#line \d+ ".*"\s*?\n//mg; |
933
|
|
|
|
|
|
|
} |
934
|
|
|
|
|
|
|
|
935
|
5
|
50
|
|
|
|
20
|
print STDERR $line_numbers{$code}."\n" if $agg_config->{debug}; |
936
|
|
|
|
|
|
|
|
937
|
5
|
|
|
5
|
|
36
|
eval $code; |
|
5
|
|
|
5
|
|
15
|
|
|
5
|
|
|
|
|
183
|
|
|
5
|
|
|
|
|
25
|
|
|
5
|
|
|
|
|
8
|
|
|
5
|
|
|
|
|
13586
|
|
|
5
|
|
|
|
|
614
|
|
938
|
5
|
50
|
|
|
|
6951
|
die "$@\n$line_numbers{$code}" if $@; |
939
|
|
|
|
|
|
|
|
940
|
5
|
|
|
|
|
342
|
}; |
941
|
|
|
|
|
|
|
|
942
|
5
|
|
|
|
|
19
|
&$compile_config; |
943
|
|
|
|
|
|
|
|
944
|
|
|
|
|
|
|
# Push one new level onto the context stack. |
# Arguments: $component (the context value for the new level) and |
# $component_string (its stringified form). |
# Records the component, its string and the current $ps in the parallel |
# arrays @current_context, @context_strings and @contexts, then adjusts |
# the per-depth @items_seen counters. |
$add_context_component_func = sub { |
945
|
9
|
|
|
9
|
|
15
|
my ($component, $component_string) = @_; |
946
|
|
|
|
|
|
|
|
947
|
9
|
|
|
|
|
23
|
# called with & and no parentheses, so the current @_ is passed through; |
# presumably this is the generated new_ps code that assigns a fresh $ps |
&$new_ps_func; |
948
|
|
|
|
|
|
|
|
949
|
|
|
|
|
|
|
# keep @contexts and @current_context together |
950
|
9
|
|
|
|
|
784
|
push(@current_context, $component); |
951
|
9
|
|
|
|
|
25
|
push(@context_strings, $component_string); |
952
|
9
|
|
|
|
|
14
|
push(@contexts, $ps); |
953
|
|
|
|
|
|
|
|
954
|
9
|
|
|
|
|
15
|
# bump the counter for the new depth, drop counters for deeper levels, |
# then append a zeroed counter for the next depth |
$items_seen[$#contexts] += 1; |
955
|
9
|
|
|
|
|
20
|
$#items_seen = $#contexts; |
956
|
9
|
|
|
|
|
30
|
push(@items_seen, 0); |
957
|
5
|
|
|
|
|
33
|
}; |
958
|
|
|
|
|
|
|
|
959
|
|
|
|
|
|
|
# Close the innermost context. |
# Argument: $retref, an array ref that collects result rows. |
# Runs $finish_cross_func first, builds the result row for the current |
# context, pops the context off the parallel stacks, merges it into its |
# parent when there is one, and appends the row to @$retref unless |
# $suppress_result is set.  Dies if the context stack is empty. |
$finish_context_func = sub { |
960
|
9
|
|
|
9
|
|
15
|
my ($retref) = @_; |
961
|
|
|
|
|
|
|
|
962
|
9
|
50
|
|
|
|
21
|
die unless @contexts; |
963
|
|
|
|
|
|
|
|
964
|
9
|
50
|
|
|
|
29
|
print STDERR "about to call finish cross\n" if $agg_config->{debug} > 5; |
965
|
9
|
|
|
|
|
23
|
$finish_cross_func->($retref); |
966
|
|
|
|
|
|
|
|
967
|
9
|
50
|
|
|
|
30
|
# re-check: the stack must still be non-empty after finish_cross ran |
die unless @contexts; |
968
|
|
|
|
|
|
|
|
969
|
9
|
50
|
|
|
|
27
|
confess unless ref $ps; |
970
|
|
|
|
|
|
|
|
971
|
9
|
|
|
|
|
12
|
$suppress_result = 0; |
972
|
9
|
|
|
|
|
21
|
# start the output row from the context2columns code; the & call |
# without parentheses passes the current @_ through |
$row = { |
973
|
|
|
|
|
|
|
&$context_columns_func, |
974
|
|
|
|
|
|
|
}; |
975
|
9
|
|
|
|
|
362
|
# presumably fills the aggregate columns of $row (generated code) |
&$final_values_func; |
976
|
|
|
|
|
|
|
|
977
|
|
|
|
|
|
|
# keep @contexts and @current_context together |
978
|
9
|
|
|
|
|
176
|
$oldps = pop(@contexts); |
979
|
9
|
|
|
|
|
57
|
pop(@current_context); |
980
|
9
|
|
|
|
|
14
|
pop(@context_strings); |
981
|
|
|
|
|
|
|
|
982
|
9
|
|
|
|
|
19
|
# the parent context, if any, becomes the current one again |
$ps = $contexts[-1]; |
983
|
|
|
|
|
|
|
|
984
|
9
|
100
|
|
|
|
156
|
# fold the finished context ($oldps) into its parent ($ps) |
&$merge_func if $ps; |
985
|
|
|
|
|
|
|
|
986
|
9
|
50
|
|
|
|
43
|
push (@$retref, $row) unless $suppress_result; |
987
|
5
|
|
|
|
|
29
|
}; |
988
|
|
|
|
|
|
|
|
989
|
5
|
|
|
|
|
55
|
return $process_func; |
990
|
|
|
|
|
|
|
|
991
|
|
|
|
|
|
|
} |
992
|
|
|
|
|
|
|
|
993
|
|
|
|
|
|
|
# Validate an aggregation config against $prototype_config. |
# Argument: $agg_config, the configuration structure to check. |
# Builds a checker by evaluating the source returned by |
# config_checker_source (presumably exported by Config::Checker), dies |
# with the eval error if that source does not compile, and otherwise |
# returns whatever the checker call returns (it is the last statement). |
sub validate_aggregation_config |
994
|
|
|
|
|
|
|
{ |
995
|
5
|
|
|
5
|
0
|
13
|
my ($agg_config) = @_; |
996
|
5
|
|
|
|
|
38
|
# string eval: the checker source is compiled in this package's scope |
my $checker = eval config_checker_source; |
997
|
5
|
50
|
|
|
|
13666
|
die $@ if $@; |
998
|
5
|
|
|
|
|
28
|
# the third argument is a label string handed to the checker |
$checker->($agg_config, $prototype_config, '- Stream::Aggregate config'); |
999
|
|
|
|
|
|
|
} |
1000
|
|
|
|
|
|
|
|
1001
|
|
|
|
|
|
|
1; |
1002
|
|
|
|
|
|
|
|