| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
# |
|
2
|
|
|
|
|
|
|
# TODO: what this can't handle right now is things like: |
|
3
|
|
|
|
|
|
|
# |
|
4
|
|
|
|
|
|
|
# * how many different URLs were there on a per query basis? |
|
5
|
|
|
|
|
|
|
# |
|
6
|
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
package Stream::Aggregate; |
|
8
|
|
|
|
|
|
|
|
|
9
|
7
|
|
|
7
|
|
105377
|
use strict; |
|
|
7
|
|
|
|
|
15
|
|
|
|
7
|
|
|
|
|
335
|
|
|
10
|
7
|
|
|
7
|
|
36
|
use warnings; |
|
|
7
|
|
|
|
|
13
|
|
|
|
7
|
|
|
|
|
512
|
|
|
11
|
6
|
|
|
6
|
|
4919
|
use Hash::Util qw(lock_keys); |
|
|
6
|
|
|
|
|
23251
|
|
|
|
6
|
|
|
|
|
72
|
|
|
12
|
6
|
|
|
6
|
|
448
|
use B::Deparse; |
|
|
6
|
|
|
|
|
12
|
|
|
|
6
|
|
|
|
|
282
|
|
|
13
|
5
|
|
|
5
|
|
28
|
use List::Util qw(min max minstr maxstr); |
|
|
5
|
|
|
|
|
9
|
|
|
|
5
|
|
|
|
|
567
|
|
|
14
|
5
|
|
|
5
|
|
4322
|
use Config::Checker; |
|
|
5
|
|
|
|
|
239306
|
|
|
|
5
|
|
|
|
|
419
|
|
|
15
|
5
|
|
|
5
|
|
3532
|
use Stream::Aggregate::Stats; |
|
|
5
|
|
|
|
|
13
|
|
|
|
5
|
|
|
|
|
453
|
|
|
16
|
5
|
|
|
5
|
|
2534
|
use Stream::Aggregate::Random; |
|
|
5
|
|
|
|
|
11
|
|
|
|
5
|
|
|
|
|
176
|
|
|
17
|
5
|
|
|
5
|
|
4849
|
use List::EvenMoreUtils qw(list_difference_position); |
|
|
5
|
|
|
|
|
6864
|
|
|
|
5
|
|
|
|
|
323
|
|
|
18
|
5
|
|
|
5
|
|
4515
|
use Tie::Function::Examples qw(%line_numbers); |
|
|
5
|
|
|
|
|
8628
|
|
|
|
5
|
|
|
|
|
787
|
|
|
19
|
5
|
|
|
5
|
|
36
|
use Eval::LineNumbers qw(eval_line_numbers); |
|
|
5
|
|
|
|
|
18
|
|
|
|
5
|
|
|
|
|
223
|
|
|
20
|
5
|
|
|
5
|
|
30
|
use Config::YAMLMacros::YAML; |
|
|
5
|
|
|
|
|
10
|
|
|
|
5
|
|
|
|
|
380
|
|
|
21
|
5
|
|
|
5
|
|
30
|
use Carp qw(confess); |
|
|
5
|
|
|
|
|
10
|
|
|
|
5
|
|
|
|
|
207
|
|
|
22
|
5
|
|
|
5
|
|
4664
|
use List::MoreUtils qw(uniq); |
|
|
5
|
|
|
|
|
6090
|
|
|
|
5
|
|
|
|
|
467
|
|
|
23
|
5
|
|
|
5
|
|
3575
|
use Clone qw(clone); |
|
|
5
|
|
|
|
|
16126
|
|
|
|
5
|
|
|
|
|
46873
|
|
|
24
|
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
require Exporter; |
|
26
|
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
our @ISA = qw(Exporter); |
|
28
|
|
|
|
|
|
|
our @EXPORT = qw(generate_aggregation_func); |
|
29
|
|
|
|
|
|
|
our $VERSION = 0.406; |
|
30
|
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
our $suppress_line_numbers = 0; |
|
32
|
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
# Prototype describing every configuration key accepted by this module,
# written as a YAML mapping of key -> quoted description string.
#
# NOTE(review): the leading '?' / '%' markers, the '<...>' segments and the
# trailing '[CODE]' / '[INTEGER]' / '[BOOLEAN]' tags look like Config::Checker
# prototype syntax (optional scalar / hash, default value, type) -- confirm
# against the Config::Checker documentation before relying on that reading.
#
# Each user-code hook that takes a configuration hash has a matching
# "<hook>_config" key.  The entry following "reduce" was previously a second
# "merge_config" key (a duplicate mapping key whose description already
# referred to the "reduce" code); it is now "reduce_config".  "merge_config"
# itself is still declared once, so existing configurations stay valid.
#
# The text inside the heredoc is data, not Perl: do not add comments in it.
my $prototype_config = <<'END_PROTOTYPE';
max_stats_to_keep: '?<4000>Maximum number of stats to keep for mean/stddev etc[INTEGER]'
context: '?From $log, return an array describing the current context[CODE]'
context_config: '%optional configuration hash for "context" code'
context2columns: '?From @current_context, return a hash of columns[CODE]'
context2columns_config: '%optional configuration hash for "context2columns" code'
stringify_context: '?Turn @currnet_context into an array of strings[CODE]'
stringify_context_config: '%optional configuration hash for "stringify_context" code'
finalize_result: '?Final opportunity to adjust the return values[CODE]'
finalize_result_config: '%optional configuration has for "finalize_result" code'
filter: '?Should this result be saved for statistics and counted for counts?[CODE]'
filter_config: '%optional configuration hash for "filter" code'
filter_early: '?<0>Check the filter early (before figuring out contexts)?[BOOLEAN]'
passthrough: '?Any additional items for the output?[CODE]'
passthrough_config: '%optional configuration has for "passthrough" code'
ephemeral: '%ephemeral columns (column -> code)'
ephemeral0: '%ephemeral columns (column -> code, evaluated before "ephemeral")'
ephemeral2: '%ephemeral columns (column -> code, evaluated after "ephemeral")'
ephemeral3: '%ephemeral columns (column -> code, evaluated after crossproduct has set context (after "ephemeral2"))'
output: '%generated output columns (column -> code)'
counter: '%counter columns (column -> code)'
percentage: '%like a counter, but divided by the number of items'
sum: '%summation columns (column -> code)'
dominant: '%most frequent (mode) value (column -> code)'
mean: '%mean value columns (column -> code)'
standard_deviation: '%standard deviaton value columns (column -> code)'
median: '%median value columns (column -> code)'
min: '%min value columns (column -> code)'
max: '%max value columns (column -> code)'
minstr: '%minstr value columns (column -> code)'
maxstr: '%maxstr value columns (column -> code)'
keep: '%list of values to keep'
stat: '%statistical columns (using keep, column -> code)'
debug: '?<0>Print out the code for debugging'
strict: '?<0>enforce strict and warnings for user code'
preprocess: '?Code to pre-process the input data[CODE]'
item_name: '?<$log>Name of the item variable'
new_context: '?Code that is run when there is a new context[CODE]'
new_context_config: '%optional configuration hash for "new_context" code'
merge: '?Code that is run when merging a subcontext into a parent context[CODE]'
merge_config: '%optional configuration hash for "merge" code'
reduce: '?Code that is run when reducing the saved data to save memory[CODE]'
reduce_config: '%optional configuration hash for "reduce" code'
crossproduct: '%crossproduct context, keys are existing columns, values are size limits'
simplify: '%code to choose new simpler values for over-full columns (column -> code)'
combinations: '%code to decide if new crossproduct context ($row) is worth keeping[CODE]'
END_PROTOTYPE
|
80
|
|
|
|
|
|
|
|
|
81
|
|
|
|
|
|
|
# nonblank($value)
#
# Pass a value through only when it carries content: the argument is
# returned unchanged if it is defined and not the empty string, otherwise
# undef is returned.  Note that "0" counts as content.
#
# An explicit undef is returned (rather than a bare return) so that a call
# in list context still produces exactly one element.
sub nonblank
{
	my ($candidate) = @_;
	if (defined($candidate) && $candidate ne '') {
		return $candidate;
	}
	return undef;
}
|
88
|
|
|
|
|
|
|
|
|
89
|
|
|
|
|
|
|
# resume_line_numbering()
#
# Build a Perl "#line" directive, terminated by a newline, that names the
# place this function was called from.  The directive's line number is the
# caller's line; its file label embeds the caller's file name and line.
# Takes no arguments and returns the directive as a string.
sub resume_line_numbering
{
	# Frame 0 describes the call site of this sub; the package is not needed.
	my (undef, $caller_file, $caller_line) = caller(0);
	my $directive = qq{#line %d "generated-code-interpoloated-after-%s-%d"\n};
	return sprintf($directive, $caller_line, $caller_file, $caller_line);
}
|
94
|
|
|
|
|
|
|
|
|
95
|
|
|
|
|
|
|
sub generate_aggregation_func |
|
96
|
|
|
|
|
|
|
{ |
|
97
|
5
|
|
|
5
|
0
|
158570
|
my ($agg_config, $extra, $user_extra) = @_; |
|
98
|
|
|
|
|
|
|
|
|
99
|
5
|
|
|
|
|
33
|
validate_aggregation_config($agg_config); |
|
100
|
|
|
|
|
|
|
|
|
101
|
5
|
|
|
|
|
8036
|
my $renumber = ! $agg_config->{debug}; |
|
102
|
|
|
|
|
|
|
|
|
103
|
|
|
|
|
|
|
# input data |
|
104
|
5
|
|
|
|
|
16
|
my $itemref; |
|
105
|
|
|
|
|
|
|
my $last_item; |
|
106
|
|
|
|
|
|
|
|
|
107
|
|
|
|
|
|
|
# |
|
108
|
|
|
|
|
|
|
# if counting URLs, then the @current_context might be something like: |
|
109
|
|
|
|
|
|
|
# 'com', 'apple', '/movies', '/action' |
|
110
|
|
|
|
|
|
|
# If counting queries it might be something like: |
|
111
|
|
|
|
|
|
|
# 'homicide', 'movies' |
|
112
|
|
|
|
|
|
|
# |
|
113
|
|
|
|
|
|
|
# @contexts is an array of state variables ($ps) that correspond to the |
|
114
|
|
|
|
|
|
|
# elements of @current_context. @context_strings is a string-ified |
|
115
|
|
|
|
|
|
|
# copy of @current_context to handle contexts which are references. |
|
116
|
|
|
|
|
|
|
# |
|
117
|
|
|
|
|
|
|
# $count_this is the return value from &$filter_func; |
|
118
|
|
|
|
|
|
|
# |
|
119
|
0
|
|
|
|
|
0
|
my @contexts; |
|
120
|
0
|
|
|
|
|
0
|
my @context_strings; |
|
121
|
0
|
|
|
|
|
0
|
my @current_context; |
|
122
|
0
|
|
|
|
|
0
|
my $ps; |
|
123
|
0
|
|
|
|
|
0
|
my $oldps; |
|
124
|
5
|
|
|
|
|
11
|
my $count_this = 1; |
|
125
|
5
|
|
|
|
|
48
|
my @items_seen = ( 0 ); |
|
126
|
5
|
|
|
|
|
11
|
my %cross_context; |
|
127
|
5
|
|
|
|
|
10
|
my $cross_data = {}; |
|
128
|
5
|
|
|
|
|
14
|
my @cross_keys; |
|
129
|
5
|
|
|
|
|
11
|
my $cross_limit = 1; |
|
130
|
5
|
|
|
|
|
8
|
my $cross_count = 0; |
|
131
|
5
|
|
|
|
|
14
|
my %cross_key_values; |
|
132
|
|
|
|
|
|
|
my %persist; |
|
133
|
0
|
|
|
|
|
0
|
my @combinations; |
|
134
|
|
|
|
|
|
|
|
|
135
|
|
|
|
|
|
|
# output |
|
136
|
0
|
|
|
|
|
0
|
my $row; |
|
137
|
0
|
|
|
|
|
0
|
my $suppress_result; |
|
138
|
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
# reduce data to limit memory use |
|
140
|
0
|
|
|
|
|
0
|
my @keepers; |
|
141
|
0
|
|
|
|
|
0
|
my @tossers; |
|
142
|
5
|
|
|
|
|
14
|
my $max_stats2keep = $agg_config->{max_stats_to_keep}; |
|
143
|
5
|
|
|
|
|
15
|
my $do_reduce; |
|
144
|
|
|
|
|
|
|
|
|
145
|
|
|
|
|
|
|
# closures |
|
146
|
|
|
|
|
|
|
my $get_context_func; |
|
147
|
0
|
|
|
|
|
0
|
my $count_func; |
|
148
|
0
|
|
|
|
|
0
|
my $initialize_func; |
|
149
|
0
|
|
|
|
|
0
|
my $final_values_func; |
|
150
|
0
|
|
|
|
|
0
|
my $merge_func; |
|
151
|
0
|
|
|
|
|
0
|
my $context_columns_func; |
|
152
|
0
|
|
|
|
|
0
|
my $preprocess_func; |
|
153
|
0
|
|
|
|
|
0
|
my $filter_func; |
|
154
|
0
|
|
|
|
|
0
|
my $stringify_func; |
|
155
|
0
|
|
|
|
|
0
|
my $finalize_result_func; |
|
156
|
0
|
|
|
|
|
0
|
my $passthrough_func; |
|
157
|
0
|
|
|
|
|
0
|
my $user_merge_func; |
|
158
|
0
|
|
|
|
|
0
|
my $user_new_context_func; |
|
159
|
0
|
|
|
|
|
0
|
my $user_reduce_func; |
|
160
|
0
|
|
|
|
|
0
|
my $cross_reduce_func; |
|
161
|
0
|
|
|
|
|
0
|
my $new_ps_func; |
|
162
|
0
|
|
|
|
|
0
|
my $process_func; |
|
163
|
0
|
|
|
|
|
0
|
my $finish_context_func; |
|
164
|
0
|
|
|
|
|
0
|
my $finish_cross_func; |
|
165
|
0
|
|
|
|
|
0
|
my $add_context_component_func; |
|
166
|
0
|
|
|
|
|
0
|
my $cross_key_reduce_func; |
|
167
|
5
|
|
|
|
|
12
|
my $declarations = ''; |
|
168
|
5
|
|
|
|
|
10
|
my %combination_funcs; |
|
169
|
|
|
|
|
|
|
my $do_combinations; |
|
170
|
|
|
|
|
|
|
|
|
171
|
5
|
50
|
|
|
|
27
|
my $strict = $agg_config->{strict} |
|
172
|
|
|
|
|
|
|
? "use strict; use warnings;" |
|
173
|
|
|
|
|
|
|
: "no strict; no warnings;"; |
|
174
|
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
my $eval_line_numbers = $agg_config->{debug} |
|
176
|
0
|
|
|
0
|
|
0
|
? sub { $_[0] } |
|
177
|
5
|
50
|
|
|
|
25
|
: \&eval_line_numbers; |
|
178
|
|
|
|
|
|
|
|
|
179
|
5
|
100
|
66
|
|
|
34
|
if ($agg_config->{crossproduct} && keys %{$agg_config->{crossproduct}}) { |
|
|
3
|
|
|
|
|
31
|
|
|
180
|
3
|
|
|
|
|
8
|
@cross_keys = sort keys %{$agg_config->{crossproduct}}; |
|
|
3
|
|
|
|
|
22
|
|
|
181
|
3
|
|
|
|
|
14
|
for my $k (@cross_keys) { |
|
182
|
9
|
|
|
|
|
26
|
$cross_limit *= $agg_config->{crossproduct}{$k}; |
|
183
|
|
|
|
|
|
|
} |
|
184
|
|
|
|
|
|
|
} |
|
185
|
|
|
|
|
|
|
|
|
186
|
|
|
|
|
|
|
my $compile_config = sub { |
|
187
|
5
|
|
|
5
|
|
13
|
my %varname; |
|
188
|
|
|
|
|
|
|
my $reduce_func; |
|
189
|
0
|
|
|
|
|
0
|
my %s; |
|
190
|
0
|
|
|
|
|
0
|
my %var_types; |
|
191
|
0
|
|
|
|
|
0
|
my %var_value; |
|
192
|
|
|
|
|
|
|
|
|
193
|
5
|
|
|
|
|
435
|
my $deparse = B::Deparse->new("-p", "-sC"); |
|
194
|
|
|
|
|
|
|
|
|
195
|
|
|
|
|
|
|
# |
|
196
|
|
|
|
|
|
|
# A more sophisticated approach would figure out the dependencies of one value on |
|
197
|
|
|
|
|
|
|
# another and order them appropriately. What's going on here is kinda hit & miss. |
|
198
|
|
|
|
|
|
|
# |
|
199
|
|
|
|
|
|
|
my $alias_varname = sub { |
|
200
|
54
|
|
|
|
|
87
|
my ($cc, $value) = @_; |
|
201
|
54
|
|
|
|
|
320
|
$varname{"\$column_$cc"} = $value; |
|
202
|
5
|
|
|
|
|
56
|
}; |
|
203
|
|
|
|
|
|
|
my $usercode_inner = sub { |
|
204
|
|
|
|
|
|
|
# |
|
205
|
|
|
|
|
|
|
# The {precount} undef statements may not be required. |
|
206
|
|
|
|
|
|
|
# They are there to be safe, just in case someone is referencing |
|
207
|
|
|
|
|
|
|
# a column that hasn't had its value assigned yet. If so, |
|
208
|
|
|
|
|
|
|
# they'll always get undef rather than a left-over value from |
|
209
|
|
|
|
|
|
|
# a previous input record. |
|
210
|
|
|
|
|
|
|
# |
|
211
|
62
|
|
|
|
|
96
|
my ($cctype, $cc, $cc_code) = @_; |
|
212
|
62
|
100
|
|
|
|
143
|
if (! defined($cc_code)) { |
|
213
|
8
|
|
|
|
|
18
|
$declarations .= "my \$column_$cc;\n"; |
|
214
|
8
|
|
|
|
|
18
|
$s{precount} .= "\tundef \$column_$cc;\n"; |
|
215
|
8
|
|
|
|
|
17
|
return; |
|
216
|
|
|
|
|
|
|
} |
|
217
|
54
|
100
|
|
|
|
190
|
return $alias_varname->($cc, $varname{$cc_code}) if $varname{$cc_code}; |
|
218
|
35
|
|
|
|
|
59
|
my $original = $cc_code; |
|
219
|
35
|
50
|
|
|
|
107
|
return $alias_varname->($cc, $varname{$cc_code}) if $varname{$cc_code}; |
|
220
|
35
|
100
|
|
|
|
118
|
$cc_code =~ s/(\$column_\w+)/defined($varname{$1}) ? $varname{$1} : $1/ge; |
|
|
18
|
|
|
|
|
122
|
|
|
221
|
35
|
100
|
|
|
|
242
|
if ($cc_code =~ /\breturn\b/) { |
|
|
|
100
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
222
|
5
|
|
|
|
|
43
|
$cc_code =~ s/^/\t\t/mg; |
|
223
|
5
|
50
|
|
|
|
40
|
$s{user} .= qq{#line 3001 "FAKE-$extra->{name}-$cctype-$cc"\n} if $renumber; |
|
224
|
5
|
|
|
|
|
19
|
$s{user} .= "my \$${cctype}_${cc}_func = sub {\n"; |
|
225
|
5
|
|
|
|
|
11
|
$s{user} .= $cc_code; |
|
226
|
5
|
|
|
|
|
10
|
$s{user} .= "};\n\n"; |
|
227
|
5
|
|
|
|
|
16
|
$s{precount} .= "\tundef \$column_$cc;\n"; |
|
228
|
5
|
|
|
|
|
13
|
$s{$cctype} .= "\t\$column_$cc = "; |
|
229
|
5
|
|
|
|
|
15
|
$s{$cctype} .= qq{\$${cctype}_${cc}_func->();\n}; |
|
230
|
|
|
|
|
|
|
} elsif ($cc_code =~ /[;\n]/) { |
|
231
|
7
|
|
|
|
|
44
|
$cc_code =~ s/^/\t\t/mg; |
|
232
|
7
|
|
|
|
|
26
|
$s{precount} .= "\tundef \$column_$cc;\n"; |
|
233
|
7
|
|
|
|
|
21
|
$s{$cctype} .= "\t\$column_$cc = "; |
|
234
|
7
|
|
|
|
|
12
|
$s{$cctype} .= "do {\n"; |
|
235
|
7
|
50
|
|
|
|
51
|
$s{$cctype} .= qq{#line 4001 "FAKE-$extra->{name}-$cctype-$cc"\n} if $renumber; |
|
236
|
7
|
|
|
|
|
26
|
$s{$cctype} .= $cc_code; |
|
237
|
7
|
|
|
|
|
18
|
$s{$cctype} .= "\n\t};\n"; |
|
238
|
|
|
|
|
|
|
} elsif ($cc_code =~ /\A\$(column_\w+)\Z/) { |
|
239
|
0
|
|
|
|
|
0
|
die "value of $cc_code isn't available yet, please compute it in an earlier step like 'ephemeral0'"; |
|
240
|
|
|
|
|
|
|
} else { |
|
241
|
23
|
50
|
|
|
|
159
|
$s{$cctype} .= qq{#line 5001 "FAKE-$extra->{name}-$cctype-$cc"\n} if $renumber; |
|
242
|
23
|
|
|
|
|
123
|
$s{precount} .= "\tundef \$column_$cc;\n"; |
|
243
|
23
|
|
|
|
|
74
|
$s{$cctype} .= "\t\$column_$cc = $cc_code;\n"; |
|
244
|
|
|
|
|
|
|
} |
|
245
|
35
|
|
|
|
|
87
|
$declarations .= "my \$column_$cc;\n"; |
|
246
|
|
|
|
|
|
|
|
|
247
|
5
|
|
|
5
|
|
42
|
my $te = eval "no strict; no warnings; sub { $cc_code }"; |
|
|
5
|
|
|
5
|
|
9
|
|
|
|
5
|
|
|
5
|
|
216
|
|
|
|
5
|
|
|
5
|
|
27
|
|
|
|
5
|
|
|
4
|
|
8
|
|
|
|
5
|
|
|
4
|
|
549
|
|
|
|
5
|
|
|
4
|
|
41
|
|
|
|
5
|
|
|
4
|
|
10
|
|
|
|
5
|
|
|
4
|
|
197
|
|
|
|
5
|
|
|
4
|
|
27
|
|
|
|
5
|
|
|
4
|
|
10
|
|
|
|
5
|
|
|
3
|
|
441
|
|
|
|
4
|
|
|
3
|
|
30
|
|
|
|
4
|
|
|
3
|
|
63
|
|
|
|
4
|
|
|
2
|
|
1106
|
|
|
|
4
|
|
|
2
|
|
24
|
|
|
|
4
|
|
|
2
|
|
96
|
|
|
|
4
|
|
|
2
|
|
295
|
|
|
|
4
|
|
|
2
|
|
25
|
|
|
|
4
|
|
|
1
|
|
8
|
|
|
|
4
|
|
|
1
|
|
370
|
|
|
|
4
|
|
|
1
|
|
27
|
|
|
|
4
|
|
|
1
|
|
6
|
|
|
|
4
|
|
|
1
|
|
229
|
|
|
|
4
|
|
|
|
|
25
|
|
|
|
4
|
|
|
|
|
8
|
|
|
|
4
|
|
|
|
|
355
|
|
|
|
4
|
|
|
|
|
27
|
|
|
|
4
|
|
|
|
|
8
|
|
|
|
4
|
|
|
|
|
197
|
|
|
|
4
|
|
|
|
|
26
|
|
|
|
4
|
|
|
|
|
7
|
|
|
|
4
|
|
|
|
|
232
|
|
|
|
3
|
|
|
|
|
15
|
|
|
|
3
|
|
|
|
|
8
|
|
|
|
3
|
|
|
|
|
193
|
|
|
|
3
|
|
|
|
|
20
|
|
|
|
3
|
|
|
|
|
5
|
|
|
|
3
|
|
|
|
|
215
|
|
|
|
3
|
|
|
|
|
19
|
|
|
|
3
|
|
|
|
|
5
|
|
|
|
3
|
|
|
|
|
161
|
|
|
|
2
|
|
|
|
|
15
|
|
|
|
2
|
|
|
|
|
5
|
|
|
|
2
|
|
|
|
|
188
|
|
|
|
2
|
|
|
|
|
13
|
|
|
|
2
|
|
|
|
|
3
|
|
|
|
2
|
|
|
|
|
120
|
|
|
|
2
|
|
|
|
|
11
|
|
|
|
2
|
|
|
|
|
5
|
|
|
|
2
|
|
|
|
|
115
|
|
|
|
2
|
|
|
|
|
13
|
|
|
|
2
|
|
|
|
|
5
|
|
|
|
2
|
|
|
|
|
108
|
|
|
|
2
|
|
|
|
|
32
|
|
|
|
2
|
|
|
|
|
4
|
|
|
|
2
|
|
|
|
|
103
|
|
|
|
1
|
|
|
|
|
4
|
|
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
89
|
|
|
|
1
|
|
|
|
|
5
|
|
|
|
1
|
|
|
|
|
1
|
|
|
|
1
|
|
|
|
|
30
|
|
|
|
1
|
|
|
|
|
4
|
|
|
|
1
|
|
|
|
|
5
|
|
|
|
1
|
|
|
|
|
64
|
|
|
|
1
|
|
|
|
|
7
|
|
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
37
|
|
|
|
1
|
|
|
|
|
5
|
|
|
|
1
|
|
|
|
|
3
|
|
|
|
1
|
|
|
|
|
76
|
|
|
|
35
|
|
|
|
|
3717
|
|
|
248
|
35
|
50
|
|
|
|
120
|
die "eval $cctype/$cc: $original ($cc_code): $@" if $@; |
|
249
|
35
|
|
|
|
|
175956
|
my $body = $deparse->coderef2text($te); |
|
250
|
35
|
50
|
|
|
|
197
|
return $varname{$body} if $varname{$body}; |
|
251
|
35
|
|
|
|
|
217
|
$varname{$body} = $varname{$cc_code} = $varname{$original} = "\$column_$cc"; |
|
252
|
35
|
|
|
|
|
140
|
$alias_varname->($cc, $varname{$cc_code}); |
|
253
|
5
|
|
|
|
|
93
|
}; |
|
254
|
|
|
|
|
|
|
my $usercode = sub { |
|
255
|
62
|
|
|
|
|
116
|
my ($cctype, $cc, $cc_code) = @_; |
|
256
|
62
|
|
|
|
|
116
|
my $value = $usercode_inner->(@_); |
|
257
|
62
|
|
|
|
|
141
|
$var_value{$cc} = $value; |
|
258
|
62
|
|
|
|
|
111
|
$var_types{$cc} = $cctype; |
|
259
|
62
|
|
|
|
|
201
|
return $value; |
|
260
|
5
|
|
|
|
|
30
|
}; |
|
261
|
|
|
|
|
|
|
|
|
262
|
5
|
|
|
|
|
16
|
my %seen; |
|
263
|
|
|
|
|
|
|
my $cc; |
|
264
|
|
|
|
|
|
|
|
|
265
|
5
|
|
|
|
|
43
|
my @all_data = qw(ephemeral0 ephemeral ephemeral2 ephemeral3 keep output counter percentage sum mean standard_deviation median dominant min minstr max maxstr stat); |
|
266
|
5
|
|
|
|
|
28
|
my @lock_data = qw( keep output counter percentage sum mean standard_deviation median dominant min minstr max maxstr stat); |
|
267
|
5
|
|
|
|
|
17
|
my @output_cols = qw( output counter percentage sum mean standard_deviation median dominant min minstr max maxstr stat); |
|
268
|
5
|
|
|
|
|
26
|
my @kept_cols = qw( keep standard_deviation median dominant ); |
|
269
|
5
|
|
|
|
|
12
|
my @stats_cols = qw( standard_deviation median dominant ); |
|
270
|
5
|
|
|
|
|
12
|
my @cross_cols = qw(ephemeral0 ephemeral ephemeral2 ); |
|
271
|
5
|
|
|
|
|
15
|
my %cross_cols; |
|
272
|
5
|
|
|
|
|
20
|
@cross_cols{@cross_cols} = @cross_cols; |
|
273
|
|
|
|
|
|
|
|
|
274
|
|
|
|
|
|
|
# |
|
275
|
|
|
|
|
|
|
# Compile all the user code for the various columns |
|
276
|
|
|
|
|
|
|
# |
|
277
|
5
|
|
|
|
|
16
|
for my $ucc (@all_data) { |
|
278
|
90
|
100
|
|
|
|
270
|
next unless $agg_config->{$ucc}; |
|
279
|
34
|
|
|
|
|
51
|
for $cc (sort keys %{$agg_config->{$ucc}}) { |
|
|
34
|
|
|
|
|
169
|
|
|
280
|
62
|
50
|
|
|
|
214
|
die "column $cc is duplicated" if $seen{$cc}++; |
|
281
|
62
|
|
|
|
|
197
|
$usercode->($ucc, $cc, $agg_config->{$ucc}{$cc}); |
|
282
|
|
|
|
|
|
|
} |
|
283
|
|
|
|
|
|
|
} |
|
284
|
|
|
|
|
|
|
|
|
285
|
|
|
|
|
|
|
# |
|
286
|
|
|
|
|
|
|
# 'keep' has to be first because 'stat' can't rewrite names |
|
287
|
|
|
|
|
|
|
# |
|
288
|
5
|
|
|
|
|
12
|
my %donekeep; |
|
289
|
5
|
|
|
|
|
12
|
my $has_keepers = 0; |
|
290
|
5
|
|
|
|
|
16
|
for my $keepers (@kept_cols) { |
|
291
|
20
|
|
|
|
|
31
|
for $cc (sort keys %{$agg_config->{$keepers}}) { |
|
|
20
|
|
|
|
|
91
|
|
|
292
|
13
|
100
|
|
|
|
51
|
next if $donekeep{$varname{$agg_config->{$keepers}{$cc}}}; |
|
293
|
10
|
|
|
|
|
31
|
$donekeep{$varname{$agg_config->{$keepers}{$cc}}} = $cc; |
|
294
|
10
|
|
|
|
|
29
|
$s{initialize} .= "\t\$ps->{keep}{$cc} = [];\n"; |
|
295
|
10
|
|
|
|
|
45
|
$s{keeper2} .= "\t\tpush(\@{\$ps->{keep}{$cc}}, $varname{$agg_config->{$keepers}{$cc}}) if \$count_this;\n"; |
|
296
|
10
|
|
|
|
|
32
|
$s{merge} .= "\tpush(\@{\$ps->{keep}{$cc}}, \@{\$oldps->{keep}{$cc}});\n"; |
|
297
|
10
|
|
|
|
|
28
|
$s{reduce2} .= "\t\@{\$ps->{keep}{$cc}} = \@{\$ps->{keep}{$cc}}[\@keepers];\n"; |
|
298
|
10
|
|
|
|
|
24
|
$has_keepers++; |
|
299
|
|
|
|
|
|
|
} |
|
300
|
|
|
|
|
|
|
} |
|
301
|
5
|
100
|
|
|
|
20
|
if ($has_keepers) { |
|
302
|
4
|
|
|
|
|
12
|
$s{initialize} .= "\t# has keepers\n"; |
|
303
|
4
|
|
|
|
|
10
|
$s{initialize} .= "\t\$ps->{numeric} = {};\n"; |
|
304
|
|
|
|
|
|
|
|
|
305
|
4
|
|
|
|
|
33
|
$s{fv_setup} .= "\t# has keepers\n"; |
|
306
|
4
|
|
|
|
|
9
|
$s{fv_setup} .= "\tlocal(\$Stream::Aggregate::Stats::ps) = \$ps;\n"; |
|
307
|
|
|
|
|
|
|
|
|
308
|
4
|
50
|
|
|
|
31
|
$s{keeper1} .= resume_line_numbering if $renumber; |
|
309
|
4
|
|
|
|
|
17
|
$s{keeper1} .= "\t# has keepers\n"; |
|
310
|
4
|
|
|
|
|
10
|
$s{keeper1} .= "\tmy \$random = rand(1);\n"; |
|
311
|
4
|
|
|
|
|
13
|
$s{keeper1} .= "\tif (\@{\$ps->{random}} < $max_stats2keep || \$random < \$ps->{random}[0]) {\n"; |
|
312
|
4
|
|
|
|
|
10
|
$s{keeper1} .= "\t\tpush(\@{\$ps->{random}}, \$random);\n"; |
|
313
|
|
|
|
|
|
|
|
|
314
|
4
|
50
|
|
|
|
21
|
$s{keeper3} .= resume_line_numbering if $renumber; |
|
315
|
4
|
|
|
|
|
10
|
$s{keeper3} .= "\t\t# has keepers\n"; |
|
316
|
4
|
|
|
|
|
15
|
$s{keeper3} .= "\t\t&\$reduce_func if \@{\$ps->{random}} > $max_stats2keep * 1.5;\n"; |
|
317
|
4
|
|
|
|
|
9
|
$s{keeper3} .= "\t}\n"; |
|
318
|
|
|
|
|
|
|
|
|
319
|
4
|
50
|
|
|
|
20
|
$s{merge} .= resume_line_numbering if $renumber; |
|
320
|
4
|
|
|
|
|
10
|
$s{merge} .= "\t# has keepers\n"; |
|
321
|
4
|
|
|
|
|
9
|
$s{merge} .= "\tpush(\@{\$ps->{random}}, \@{\$oldps->{random}});\n"; |
|
322
|
|
|
|
|
|
|
|
|
323
|
4
|
50
|
|
|
|
43
|
$s{merge2} .= resume_line_numbering if $renumber; |
|
324
|
4
|
|
|
|
|
9
|
$s{merge2} .= "\t# has keepers\n"; |
|
325
|
4
|
|
|
|
|
22
|
$s{merge2} .= "\t&\$reduce_func if \@{\$ps->{random}} > $max_stats2keep * 1.5;\n"; |
|
326
|
|
|
|
|
|
|
|
|
327
|
4
|
|
|
|
|
30
|
$s{reduce} .= $eval_line_numbers->(<<'END_REDUCE'); |
|
328
|
|
|
|
|
|
|
# has keepers |
|
329
|
|
|
|
|
|
|
my $random = $ps->{random}; |
|
330
|
|
|
|
|
|
|
@keepers = sort { $random->[$a] cmp $random->[$b] } 0..$#$random; |
|
331
|
|
|
|
|
|
|
@tossers = splice(@keepers, $max_stats2keep); |
|
332
|
|
|
|
|
|
|
@$random = @$random[@keepers]; |
|
333
|
|
|
|
|
|
|
END_REDUCE |
|
334
|
4
|
50
|
|
|
|
89
|
$s{reduce} .= resume_line_numbering if $renumber; |
|
335
|
|
|
|
|
|
|
} |
|
336
|
|
|
|
|
|
|
|
|
337
|
5
|
|
|
|
|
11
|
for $cc (sort keys %{$agg_config->{output}}) { |
|
|
5
|
|
|
|
|
26
|
|
|
338
|
5
|
|
|
|
|
24
|
$s{initialize} .= "\t\$ps->{output}{$cc} = 0;\n"; |
|
339
|
|
|
|
|
|
|
} |
|
340
|
|
|
|
|
|
|
|
|
341
|
5
|
|
|
|
|
14
|
for $cc (sort keys %{$agg_config->{counter}}) { |
|
|
5
|
|
|
|
|
23
|
|
|
342
|
1
|
|
|
|
|
3
|
$s{initialize} .= "\t\$ps->{counter}{$cc} = 0;\n"; |
|
343
|
1
|
|
|
|
|
6
|
$s{count2} .= "\t\$ps->{counter}{$cc}++ if $varname{$agg_config->{counter}{$cc}};\n"; |
|
344
|
1
|
|
|
|
|
12
|
$s{merge} .= "\t\$ps->{counter}{$cc} += \$oldps->{counter}{$cc};\n"; |
|
345
|
|
|
|
|
|
|
} |
|
346
|
|
|
|
|
|
|
|
|
347
|
5
|
|
|
|
|
12
|
for $cc (sort keys %{$agg_config->{percentage}}) { |
|
|
5
|
|
|
|
|
23
|
|
|
348
|
1
|
|
|
|
|
5
|
$s{initialize} .= "\t\$ps->{percentage}{$cc} = undef;\n"; |
|
349
|
1
|
|
|
|
|
5
|
$s{stat} .= "\t\$ps->{percentage}{$cc} = \$ps->{percentage_counter}{$cc} * 100 / (\$ps->{percentage_total}{$cc} || .001);\n"; |
|
350
|
1
|
|
|
|
|
3
|
$s{initialize} .= "\t\$ps->{percentage_counter}{$cc} = 0;\n"; |
|
351
|
1
|
|
|
|
|
4
|
$s{initialize} .= "\t\$ps->{percentage_total}{$cc} = 0;\n"; |
|
352
|
1
|
|
|
|
|
5
|
$s{count2} .= "\t\$ps->{percentage_counter}{$cc}++ if $varname{$agg_config->{percentage}{$cc}};\n"; |
|
353
|
1
|
|
|
|
|
5
|
$s{count2} .= "\t\$ps->{percentage_total}{$cc}++ if defined $varname{$agg_config->{percentage}{$cc}};\n"; |
|
354
|
1
|
|
|
|
|
4
|
$s{merge} .= "\t\$ps->{percentage_counter}{$cc} += \$oldps->{percentage_counter}{$cc};\n"; |
|
355
|
1
|
|
|
|
|
4
|
$s{merge} .= "\t\$ps->{percentage_total}{$cc} += \$oldps->{percentage_total}{$cc};\n"; |
|
356
|
|
|
|
|
|
|
} |
|
357
|
|
|
|
|
|
|
|
|
358
|
5
|
|
|
|
|
14
|
for $cc (sort keys %{$agg_config->{sum}}) { |
|
|
5
|
|
|
|
|
25
|
|
|
359
|
3
|
|
|
|
|
14
|
$s{initialize} .= "\t\$ps->{sum}{$cc} = 0;\n"; |
|
360
|
3
|
|
|
|
|
13
|
$s{count2} .= "\t\$ps->{sum}{$cc} += $varname{$agg_config->{sum}{$cc}};\n"; |
|
361
|
3
|
|
|
|
|
18
|
$s{merge} .= "\t\$ps->{sum}{$cc} += \$oldps->{sum}{$cc};\n"; |
|
362
|
|
|
|
|
|
|
} |
|
363
|
|
|
|
|
|
|
|
|
364
|
5
|
|
|
|
|
88
|
for $cc (sort keys %{$agg_config->{mean}}) { |
|
|
5
|
|
|
|
|
21
|
|
|
365
|
7
|
|
|
|
|
26
|
$s{initialize} .= "\t\$ps->{mean}{$cc} = undef;\n"; |
|
366
|
7
|
|
|
|
|
36
|
$s{stat} .= "\t\$ps->{mean}{$cc} = \$ps->{mean_sum}{$cc} / (\$ps->{mean_count}{$cc} || 100);\n"; |
|
367
|
7
|
|
|
|
|
24
|
$s{initialize} .= "\t\$ps->{mean_sum}{$cc} = 0;\n"; |
|
368
|
7
|
|
|
|
|
24
|
$s{initialize} .= "\t\$ps->{mean_count}{$cc} = 0;\n"; |
|
369
|
7
|
|
|
|
|
32
|
$s{count2} .= "\tif (defined($varname{$agg_config->{mean}{$cc}})) {\n"; |
|
370
|
7
|
|
|
|
|
28
|
$s{count2} .= "\t \$ps->{mean_sum}{$cc} += $varname{$agg_config->{mean}{$cc}};\n"; |
|
371
|
7
|
|
|
|
|
22
|
$s{count2} .= "\t \$ps->{mean_count}{$cc}++;\n"; |
|
372
|
7
|
|
|
|
|
13
|
$s{count2} .= "\t}\n"; |
|
373
|
7
|
|
|
|
|
26
|
$s{merge} .= "\t\$ps->{mean_sum}{$cc} += \$oldps->{mean_sum}{$cc};\n"; |
|
374
|
7
|
|
|
|
|
28
|
$s{merge} .= "\t\$ps->{mean_count}{$cc} += \$oldps->{mean_count}{$cc};\n"; |
|
375
|
|
|
|
|
|
|
} |
|
376
|
|
|
|
|
|
|
|
|
377
|
5
|
|
|
|
|
15
|
for $cc (sort keys %{$agg_config->{min}}) { |
|
|
5
|
|
|
|
|
21
|
|
|
378
|
2
|
|
|
|
|
7
|
$s{initialize} .= "\t\$ps->{min}{$cc} = undef;\n"; |
|
379
|
2
|
|
|
|
|
11
|
$s{count2} .= "\t\$ps->{min}{$cc} = min grep { defined } \$ps->{min}{$cc}, $varname{$agg_config->{min}{$cc}};\n"; |
|
380
|
2
|
|
|
|
|
9
|
$s{merge} .= "\t\$ps->{min}{$cc} = min grep { defined } \$ps->{min}{$cc}, \$oldps->{min}{$cc};\n"; |
|
381
|
|
|
|
|
|
|
} |
|
382
|
|
|
|
|
|
|
|
|
383
|
5
|
|
|
|
|
22
|
for $cc (sort keys %{$agg_config->{minstr}}) { |
|
|
5
|
|
|
|
|
25
|
|
|
384
|
0
|
|
|
|
|
0
|
$s{initialize} .= "\t\$ps->{minstr}{$cc} = undef;\n"; |
|
385
|
0
|
|
|
|
|
0
|
$s{count2} .= "\t\$ps->{minstr}{$cc} = minstr grep { defined } \$ps->{minstr}{$cc}, $varname{$agg_config->{minstr}{$cc}};\n"; |
|
386
|
0
|
|
|
|
|
0
|
$s{merge} .= "\t\$ps->{minstr}{$cc} = minstr grep { defined } \$ps->{minstr}{$cc}, \$oldps->{minstr}{$cc};\n"; |
|
387
|
|
|
|
|
|
|
} |
|
388
|
|
|
|
|
|
|
|
|
389
|
5
|
|
|
|
|
13
|
for $cc (sort keys %{$agg_config->{max}}) { |
|
|
5
|
|
|
|
|
21
|
|
|
390
|
3
|
|
|
|
|
11
|
$s{initialize} .= "\t\$ps->{max}{$cc} = undef;\n"; |
|
391
|
3
|
|
|
|
|
14
|
$s{count2} .= "\t\$ps->{max}{$cc} = max grep { defined } \$ps->{max}{$cc}, $varname{$agg_config->{max}{$cc}};\n"; |
|
392
|
3
|
|
|
|
|
14
|
$s{merge} .= "\t\$ps->{max}{$cc} = max grep { defined } \$ps->{max}{$cc}, \$oldps->{max}{$cc};\n"; |
|
393
|
|
|
|
|
|
|
} |
|
394
|
|
|
|
|
|
|
|
|
395
|
5
|
|
|
|
|
15
|
for $cc (sort keys %{$agg_config->{maxstr}}) { |
|
|
5
|
|
|
|
|
21
|
|
|
396
|
0
|
|
|
|
|
0
|
$s{initialize} .= "\t\$ps->{maxstr}{$cc} = undef;\n"; |
|
397
|
0
|
|
|
|
|
0
|
$s{count2} .= "\t\$ps->{maxstr}{$cc} = maxstr grep { defined } \$ps->{maxstr}{$cc}, $varname{$agg_config->{maxstr}{$cc}};\n"; |
|
398
|
0
|
|
|
|
|
0
|
$s{merge} .= "\t\$ps->{maxstr}{$cc} = maxstr grep { defined } \$ps->{maxstr}{$cc}, \$oldps->{maxstr}{$cc};\n"; |
|
399
|
|
|
|
|
|
|
} |
|
400
|
|
|
|
|
|
|
|
|
401
|
5
|
|
|
|
|
15
|
for my $statc (@stats_cols) { |
|
402
|
15
|
|
|
|
|
22
|
for $cc (sort keys %{$agg_config->{$statc}}) { |
|
|
15
|
|
|
|
|
60
|
|
|
403
|
6
|
|
50
|
|
|
33
|
my $keepcc = $donekeep{$varname{$agg_config->{$statc}{$cc}}} || die; |
|
404
|
6
|
|
|
|
|
33
|
$s{initialize} .= "\t\$ps->{$statc}{$cc} = undef;\n"; |
|
405
|
6
|
|
|
|
|
32
|
$s{stat} .= "\t\$ps->{$statc}{$cc} = $statc('$keepcc');\n"; |
|
406
|
|
|
|
|
|
|
} |
|
407
|
|
|
|
|
|
|
} |
|
408
|
|
|
|
|
|
|
|
|
409
|
5
|
|
|
|
|
12
|
for $cc (sort keys %{$agg_config->{stat}}) { |
|
|
5
|
|
|
|
|
20
|
|
|
410
|
3
|
|
|
|
|
11
|
$s{stat} .= "\t\$ps->{stat}{$cc} = $varname{$agg_config->{stat}{$cc}};\n"; |
|
411
|
3
|
|
|
|
|
7
|
$s{initialize} .= "\t\$ps->{stat}{$cc} = undef;\n"; |
|
412
|
|
|
|
|
|
|
} |
|
413
|
|
|
|
|
|
|
|
|
414
|
5
|
|
|
|
|
13
|
for my $cc (sort keys %{$agg_config->{output}}) { |
|
|
5
|
|
|
|
|
18
|
|
|
415
|
5
|
|
|
|
|
22
|
$s{initialize} .= "\t\$ps->{output}{$cc} = undef;\n"; |
|
416
|
5
|
|
|
|
|
24
|
$s{stat} .= "\t\$ps->{output}{$cc} = $varname{$agg_config->{output}{$cc}};\n"; |
|
417
|
|
|
|
|
|
|
} |
|
418
|
|
|
|
|
|
|
|
|
419
|
5
|
|
|
|
|
16
|
for my $icol (@lock_data) { |
|
420
|
70
|
|
|
|
|
206
|
$s{initialize} .= "\tlock_keys(%{\$ps->{$icol}});\n" |
|
421
|
70
|
100
|
|
|
|
69
|
if keys %{$agg_config->{$icol}}; |
|
422
|
|
|
|
|
|
|
} |
|
423
|
|
|
|
|
|
|
|
|
424
|
5
|
|
|
|
|
14
|
for my $ctype (@output_cols) { |
|
425
|
65
|
|
|
|
|
65
|
for $cc (sort keys %{$agg_config->{$ctype}}) { |
|
|
65
|
|
|
|
|
166
|
|
|
426
|
31
|
|
|
|
|
103
|
$s{final_values} .= "\t\$row->{$cc} = \$ps->{$ctype}{$cc};\n"; |
|
427
|
|
|
|
|
|
|
} |
|
428
|
|
|
|
|
|
|
} |
|
429
|
5
|
100
|
|
|
|
32
|
$s{final_values} .= "\t&\$finalize_result_func;\n" if $agg_config->{finalize_result}; |
|
430
|
|
|
|
|
|
|
|
|
431
|
5
|
|
|
|
|
12
|
my $code = $strict; |
|
432
|
5
|
50
|
|
|
|
32
|
$code .= qq{\n#line 1 "FAKE-all-code-for-$extra->{name}"\n} if $renumber; |
|
433
|
5
|
|
|
|
|
22
|
$code .= qq{\nmy $agg_config->{item_name};\n}; |
|
434
|
5
|
|
|
|
|
20
|
$code .= $declarations; |
|
435
|
5
|
|
|
|
|
8
|
$code .= "{\n"; |
|
436
|
|
|
|
|
|
|
|
|
437
|
5
|
|
|
|
|
28
|
$s{reduce} .= "\t&\$user_reduce_func;\n"; |
|
438
|
|
|
|
|
|
|
|
|
439
|
|
|
|
|
|
|
my $assemble_code = sub { |
|
440
|
50
|
|
|
|
|
203
|
my ($func, @keys) = @_; |
|
441
|
50
|
|
|
|
|
54
|
my $something; |
|
442
|
50
|
|
|
|
|
61
|
my $c = "# ---------------------------------------------------------------\n"; |
|
443
|
50
|
100
|
|
|
|
188
|
$c .= "\$${func}_func = sub {\n" |
|
444
|
|
|
|
|
|
|
if $func; |
|
445
|
50
|
|
|
|
|
72
|
for my $s (@keys) { |
|
446
|
200
|
100
|
|
|
|
419
|
next unless exists $s{$s}; |
|
447
|
114
|
50
|
|
|
|
468
|
$c .= qq{\n#line 1001 "FAKEFUNC-$extra->{name}-$func-$s"\n} if $renumber; |
|
448
|
114
|
|
|
|
|
209
|
$c .= $s{$s}; |
|
449
|
114
|
|
|
|
|
169
|
delete $s{$s}; |
|
450
|
114
|
|
|
|
|
189
|
$something = 1; |
|
451
|
|
|
|
|
|
|
} |
|
452
|
50
|
100
|
100
|
|
|
218
|
$c .= "\t0\n" |
|
453
|
|
|
|
|
|
|
if $func && ! $something; |
|
454
|
50
|
100
|
|
|
|
114
|
$c .= "};\n" |
|
455
|
|
|
|
|
|
|
if $func; |
|
456
|
50
|
|
|
|
|
373
|
return $c; |
|
457
|
5
|
|
|
|
|
37
|
}; |
|
458
|
|
|
|
|
|
|
|
|
459
|
|
|
|
|
|
|
# |
|
460
|
|
|
|
|
|
|
# Cross product aggregation & counts |
|
461
|
|
|
|
|
|
|
# |
|
462
|
5
|
100
|
|
|
|
34
|
if (@cross_keys) { |
|
|
|
50
|
|
|
|
|
|
|
463
|
3
|
|
|
|
|
6
|
my $esub = ''; |
|
464
|
3
|
|
|
|
|
7
|
my $newsub = ''; |
|
465
|
3
|
|
|
|
|
5
|
my $oldsub = ''; |
|
466
|
3
|
|
|
|
|
7
|
my $loop_in = ''; |
|
467
|
3
|
|
|
|
|
5
|
my $loop_in2 = ''; |
|
468
|
3
|
|
|
|
|
6
|
my $loop_in3 = ''; |
|
469
|
3
|
|
|
|
|
4
|
my $loop_in3a = ''; |
|
470
|
3
|
|
|
|
|
7
|
my $loop_out = ''; |
|
471
|
3
|
|
|
|
|
5
|
my $loop_out2 = ''; |
|
472
|
3
|
|
|
|
|
5
|
my $loop_indent = ""; |
|
473
|
3
|
|
|
|
|
6
|
my $loop_head = ''; |
|
474
|
3
|
|
|
|
|
4
|
my $loop_mid = ''; |
|
475
|
3
|
|
|
|
|
6
|
my $loop_mid3 = ''; |
|
476
|
3
|
|
|
|
|
6
|
my $loop_dbug_old = ''; |
|
477
|
3
|
|
|
|
|
6
|
my $loop_dbug_new = ''; |
|
478
|
3
|
|
|
|
|
7
|
for my $cc (@cross_keys) { |
|
479
|
9
|
50
|
|
|
|
27
|
die "Crossproduct column '$cc' doesn't exist" unless $var_types{$cc}; |
|
480
|
9
|
50
|
|
|
|
25
|
die "Crossproduct column '$cc' ($var_types{$cc}) isn't a valid type (@cross_cols)" unless $cross_cols{$var_types{$cc}}; |
|
481
|
|
|
|
|
|
|
|
|
482
|
9
|
|
100
|
|
|
46
|
my $cc_code = $agg_config->{simplify}{$cc} || 'return "*";'; |
|
483
|
9
|
|
|
|
|
54
|
$s{user} .= "my \$simplify_$cc = sub {\n"; |
|
484
|
9
|
50
|
|
|
|
42
|
$s{user} .= qq{#line 3001 "FAKE-$extra->{name}-simplify-$cc"\n} if $renumber; |
|
485
|
9
|
|
|
|
|
19
|
$s{user} .= "\t".$cc_code; |
|
486
|
9
|
|
|
|
|
12
|
$s{user} .= "\n};\n"; |
|
487
|
|
|
|
|
|
|
|
|
488
|
9
|
|
|
|
|
28
|
$loop_head .= "\tmy %key_count_$cc;\n"; |
|
489
|
|
|
|
|
|
|
|
|
490
|
9
|
|
|
|
|
27
|
$loop_mid .= "\tmy \$key_map_$cc = \$cross_key_reduce_func->('$cc', \\%key_count_$cc, \$simplify_$cc);\n"; |
|
491
|
9
|
|
|
|
|
25
|
$loop_mid3 .= ", $cc => \$key_$cc"; |
|
492
|
|
|
|
|
|
|
|
|
493
|
9
|
|
|
|
|
18
|
$loop_dbug_old .= " $cc:\$key_$cc"; |
|
494
|
9
|
|
|
|
|
16
|
$loop_dbug_new .= " $cc:\$new_$cc"; |
|
495
|
|
|
|
|
|
|
|
|
496
|
9
|
|
|
|
|
23
|
$loop_in2 .= "$loop_indent for my \$key_$cc (keys %{\$cross_data$oldsub}) {\n"; |
|
497
|
|
|
|
|
|
|
|
|
498
|
9
|
|
|
|
|
34
|
$loop_in .= "$loop_indent for my \$key_$cc (keys %{\$cross_data$oldsub}) {\n"; |
|
499
|
9
|
|
|
|
|
24
|
$loop_in .= "$loop_indent my \$new_$cc = \$key_$cc;\n"; |
|
500
|
9
|
|
|
|
|
18
|
$loop_in .= "$loop_indent my \$must_inc = 0;\n"; |
|
501
|
9
|
|
|
|
|
19
|
$loop_in .= "$loop_indent if (exists \$key_map_${cc}->{\$key_$cc}) {\n"; |
|
502
|
9
|
|
|
|
|
21
|
$loop_in .= "$loop_indent \$new_$cc = \$key_map_${cc}->{\$key_$cc};\n"; |
|
503
|
9
|
|
|
|
|
22
|
$loop_in .= "$loop_indent \$must_inc = 1;\n"; |
|
504
|
9
|
|
|
|
|
14
|
$loop_in .= "$loop_indent \$must_do++;\n"; |
|
505
|
9
|
|
|
|
|
13
|
$loop_in .= "$loop_indent } else {\n"; |
|
506
|
9
|
|
|
|
|
19
|
$loop_in .= "$loop_indent \$new_$cc = \$key_$cc;\n"; |
|
507
|
9
|
|
|
|
|
16
|
$loop_in .= "$loop_indent }\n"; |
|
508
|
|
|
|
|
|
|
|
|
509
|
9
|
|
|
|
|
20
|
$loop_in3a .= "\$key_count_${cc}{\$key_$cc}++;\n"; |
|
510
|
|
|
|
|
|
|
|
|
511
|
9
|
|
|
|
|
15
|
$loop_out .= "$loop_indent }\n"; |
|
512
|
9
|
|
|
|
|
16
|
$loop_out .= "$loop_indent \$must_do -= \$must_inc;\n"; |
|
513
|
9
|
|
|
|
|
14
|
$loop_out2 .= "$loop_indent }\n"; |
|
514
|
|
|
|
|
|
|
|
|
515
|
9
|
|
|
|
|
17
|
$loop_indent .= "\t"; |
|
516
|
|
|
|
|
|
|
|
|
517
|
9
|
|
|
|
|
23
|
$esub .= "->{$var_value{$cc}}"; |
|
518
|
9
|
|
|
|
|
18
|
$newsub .= "->{\$new_$cc}"; |
|
519
|
9
|
|
|
|
|
26
|
$oldsub .= "->{\$key_$cc}"; |
|
520
|
|
|
|
|
|
|
}; |
|
521
|
3
|
|
|
|
|
16
|
for my $in3a (split(/\n/, $loop_in3a)) { |
|
522
|
9
|
|
|
|
|
43
|
$loop_in3 .= "$loop_indent $in3a\n"; |
|
523
|
|
|
|
|
|
|
} |
|
524
|
|
|
|
|
|
|
|
|
525
|
3
|
|
|
|
|
21
|
$loop_out = join("\n", reverse split(/\n/, $loop_out)) . "\n"; |
|
526
|
3
|
|
|
|
|
554
|
$loop_out2 = join("\n", reverse split(/\n/, $loop_out2)) . "\n"; |
|
527
|
|
|
|
|
|
|
|
|
528
|
|
|
|
|
|
|
# |
|
529
|
|
|
|
|
|
|
# Reduce the number of contexts |
|
530
|
|
|
|
|
|
|
# |
|
531
|
|
|
|
|
|
|
|
|
532
|
|
|
|
|
|
|
$cross_key_reduce_func = sub { |
|
533
|
9
|
|
|
|
|
727
|
my ($keyname, $valcounts, $simplify_func) = @_; |
|
534
|
9
|
|
|
|
|
11
|
my %ret; |
|
535
|
9
|
100
|
|
|
|
52
|
if (keys %$valcounts > $agg_config->{crossproduct}{$keyname}) { |
|
536
|
2
|
|
|
|
|
5
|
$do_reduce = 1; |
|
537
|
2
|
|
|
|
|
6
|
my $limit = $agg_config->{crossproduct}{$keyname}; |
|
538
|
2
|
|
|
|
|
7
|
my $current = keys %$valcounts; |
|
539
|
2
|
|
|
|
|
2
|
my %seen; |
|
540
|
|
|
|
|
|
|
my %new; |
|
541
|
2
|
|
|
|
|
12
|
for my $val (sort { $valcounts->{$a} <=> $valcounts->{$b} } keys %$valcounts) { |
|
|
33
|
|
|
|
|
68
|
|
|
542
|
15
|
100
|
|
|
|
31
|
if ($current > $limit) { |
|
543
|
12
|
|
|
|
|
28
|
my $new = $simplify_func->($val, $keyname); |
|
544
|
12
|
50
|
|
|
|
39
|
next if $new eq $val; |
|
545
|
12
|
100
|
|
|
|
36
|
if ($seen{$new}++) { |
|
546
|
10
|
|
|
|
|
12
|
$current--; |
|
547
|
|
|
|
|
|
|
} |
|
548
|
12
|
|
|
|
|
14
|
$new{$new}++; |
|
549
|
12
|
50
|
|
|
|
24
|
if ($new{$val}) { |
|
550
|
|
|
|
|
|
|
# we can't throw this one away since we have new |
|
551
|
|
|
|
|
|
|
# users... we may not be able to meet our contract. |
|
552
|
0
|
0
|
|
|
|
0
|
$current-- unless --$seen{$new}; |
|
553
|
0
|
|
|
|
|
0
|
$new{$new}--; |
|
554
|
0
|
|
|
|
|
0
|
next; |
|
555
|
|
|
|
|
|
|
} else { |
|
556
|
12
|
|
|
|
|
25
|
$ret{$val} = $new; |
|
557
|
|
|
|
|
|
|
} |
|
558
|
|
|
|
|
|
|
} |
|
559
|
|
|
|
|
|
|
} |
|
560
|
|
|
|
|
|
|
} |
|
561
|
9
|
50
|
|
|
|
27
|
print STDERR YAML::Dump("reduce $keyname", \%ret) if $agg_config->{debug} > 2; |
|
562
|
9
|
|
|
|
|
30
|
return \%ret; |
|
563
|
3
|
|
|
|
|
26
|
}; |
|
564
|
|
|
|
|
|
|
|
|
565
|
3
|
|
|
|
|
8
|
my $db1 = ''; |
|
566
|
3
|
|
|
|
|
5
|
my $db2 = ''; |
|
567
|
3
|
50
|
|
|
|
18
|
$db1 = qq{print STDERR "Merging\t$loop_dbug_old (\$cross_data${oldsub}->{item_counter})\tinto\t$loop_dbug_new\t\$cross_count\\n";} if $agg_config->{debug}; |
|
568
|
3
|
50
|
|
|
|
13
|
$db2 = qq{print STDERR "Moving\t$loop_dbug_old (\$cross_data${oldsub}->{item_counter})\tto\t$loop_dbug_new\\n";} if $agg_config->{debug}; |
|
569
|
3
|
50
|
|
|
|
36
|
$s{cross_reduce} .= resume_line_numbering if $renumber; |
|
570
|
|
|
|
|
|
|
|
|
571
|
3
|
|
|
|
|
11
|
$s{cross_reduce} .= "\t\$do_reduce = 0;\n"; |
|
572
|
3
|
|
|
|
|
7
|
$s{cross_reduce} .= "\tmy \$must_do = 0;\n"; |
|
573
|
3
|
|
|
|
|
14
|
$s{cross_reduce} .= $loop_head; |
|
574
|
3
|
|
|
|
|
9
|
$s{cross_reduce} .= $loop_in2; |
|
575
|
3
|
|
|
|
|
8
|
$s{cross_reduce} .= $loop_in3; |
|
576
|
3
|
|
|
|
|
14
|
$s{cross_reduce} .= $loop_out2; |
|
577
|
3
|
|
|
|
|
8
|
$s{cross_reduce} .= $loop_mid; |
|
578
|
3
|
|
|
|
|
8
|
$s{cross_reduce} .= $loop_in; |
|
579
|
3
|
|
|
|
|
42
|
$s{cross_reduce} .= $eval_line_numbers->(<
|
|
580
|
|
|
|
|
|
|
# --------------- reduce ------------- |
|
581
|
|
|
|
|
|
|
if (\$must_do) { |
|
582
|
|
|
|
|
|
|
if (\$cross_data$newsub) { |
|
583
|
|
|
|
|
|
|
\$cross_count--; |
|
584
|
|
|
|
|
|
|
$db1 |
|
585
|
|
|
|
|
|
|
\$ps = \$cross_data$newsub; |
|
586
|
|
|
|
|
|
|
\$oldps = delete \$cross_data$oldsub; |
|
587
|
|
|
|
|
|
|
# |
|
588
|
|
|
|
|
|
|
# print STDERR "ABOUT TO MERGE: \$key_color \$key_size \$key_style \$oldps\\n"; |
|
589
|
|
|
|
|
|
|
# print STDERR YAML::Dump("Pre-mege cross-data", \$cross_data); |
|
590
|
|
|
|
|
|
|
# |
|
591
|
|
|
|
|
|
|
&\$merge_func; |
|
592
|
|
|
|
|
|
|
\$ps = \$contexts[-1]; |
|
593
|
|
|
|
|
|
|
} else { |
|
594
|
|
|
|
|
|
|
$db2 |
|
595
|
|
|
|
|
|
|
\$cross_data$newsub = delete \$cross_data$oldsub; |
|
596
|
|
|
|
|
|
|
} |
|
597
|
|
|
|
|
|
|
} |
|
598
|
|
|
|
|
|
|
# --------------- reduce ------------- |
|
599
|
|
|
|
|
|
|
END_CR |
|
600
|
3
|
50
|
|
|
|
216
|
$s{cross_reduce} .= resume_line_numbering if $renumber; |
|
601
|
3
|
|
|
|
|
9
|
$s{cross_reduce} .= $loop_out; |
|
602
|
|
|
|
|
|
|
|
|
603
|
|
|
|
|
|
|
# |
|
604
|
|
|
|
|
|
|
# Add data to the right context |
|
605
|
|
|
|
|
|
|
# |
|
606
|
3
|
|
|
|
|
12
|
my $db3 = ''; |
|
607
|
3
|
50
|
|
|
|
19
|
$db3 = qq{print STDERR "Cross-count: \$cross_count\\n";} if $agg_config->{debug} > 3; |
|
608
|
3
|
|
|
|
|
27
|
$s{crossproduct} .= $eval_line_numbers->(<
|
|
609
|
|
|
|
|
|
|
if (\$cross_count > \$cross_limit * 2) { |
|
610
|
|
|
|
|
|
|
&\$cross_reduce_func; |
|
611
|
|
|
|
|
|
|
} |
|
612
|
|
|
|
|
|
|
if (\$cross_data$esub) { |
|
613
|
|
|
|
|
|
|
\$ps = \$cross_data$esub; |
|
614
|
|
|
|
|
|
|
} else { |
|
615
|
|
|
|
|
|
|
&\$new_ps_func; |
|
616
|
|
|
|
|
|
|
\$cross_data$esub = \$ps; |
|
617
|
|
|
|
|
|
|
\$cross_count++; |
|
618
|
|
|
|
|
|
|
$db3 |
|
619
|
|
|
|
|
|
|
} |
|
620
|
|
|
|
|
|
|
END_CP |
|
621
|
3
|
50
|
|
|
|
54
|
$s{crossproduct} .= resume_line_numbering if $renumber; |
|
622
|
|
|
|
|
|
|
|
|
623
|
|
|
|
|
|
|
# |
|
624
|
|
|
|
|
|
|
# handle combinations |
|
625
|
|
|
|
|
|
|
# |
|
626
|
3
|
|
|
|
|
11
|
$s{inner_combine} = ''; |
|
627
|
3
|
|
|
|
|
6
|
$s{outer_combine} = ''; |
|
628
|
3
|
100
|
|
|
|
15
|
if ($agg_config->{combinations}) { |
|
629
|
2
|
|
|
|
|
3
|
my $generate_combinations; |
|
630
|
2
|
|
|
|
|
4
|
my $combination_number = 0; |
|
631
|
2
|
|
|
|
|
5
|
my %mapping; |
|
632
|
|
|
|
|
|
|
$generate_combinations = sub { |
|
633
|
12
|
|
|
|
|
29
|
my ($output_field, $input_ps, $indent, $keys, $done, $loop_over) = @_; |
|
634
|
12
|
|
|
|
|
18
|
my $out = \$s{$output_field}; |
|
635
|
12
|
|
|
|
|
16
|
my $loopout = ''; |
|
636
|
12
|
|
|
|
|
17
|
my $x = ''; |
|
637
|
12
|
|
|
|
|
14
|
my $y = ''; |
|
638
|
12
|
100
|
|
|
|
25
|
my @loop_keys = grep { $agg_config->{combinations}{$_} && ! $done->{$_} } sort @$keys; |
|
|
17
|
|
|
|
|
107
|
|
|
639
|
12
|
|
|
|
|
30
|
my @delayed_call; |
|
640
|
12
|
100
|
|
|
|
35
|
if ($loop_over) { |
|
641
|
10
|
|
|
|
|
13
|
for my $k (@loop_keys) { |
|
642
|
11
|
|
|
|
|
29
|
$$out .= "$indent$x for my \$ck_$k (keys %{$input_ps}) {\n"; |
|
643
|
11
|
|
|
|
|
22
|
$loopout = "$indent$x }\n$loopout"; |
|
644
|
11
|
|
|
|
|
14
|
$x .= "\t"; |
|
645
|
11
|
|
|
|
|
24
|
$input_ps .= "{\$ck_$k}"; |
|
646
|
|
|
|
|
|
|
} |
|
647
|
10
|
|
|
|
|
14
|
$y = $x; |
|
648
|
10
|
100
|
|
|
|
20
|
if (! @loop_keys) { |
|
649
|
2
|
|
|
|
|
5
|
$y .= "\t"; |
|
650
|
2
|
|
|
|
|
7
|
$$out .= "$indent$x if ($input_ps) {\n"; |
|
651
|
|
|
|
|
|
|
} |
|
652
|
10
|
|
|
|
|
30
|
$$out .= "$indent$y \$row = ${input_ps}->{row};\n"; |
|
653
|
|
|
|
|
|
|
} |
|
654
|
12
|
|
|
|
|
18
|
for my $cc (@loop_keys) { |
|
655
|
16
|
|
|
|
|
26
|
my @keeping = grep { $_ ne $cc } @loop_keys; |
|
|
30
|
|
|
|
|
63
|
|
|
656
|
16
|
100
|
|
|
|
71
|
if ($mapping{"@keeping"}++) { |
|
657
|
6
|
|
|
|
|
15
|
$$out .= "$indent$x # we've already handled keeping '@keeping'\n"; |
|
658
|
6
|
|
|
|
|
104
|
next; |
|
659
|
|
|
|
|
|
|
} |
|
660
|
|
|
|
|
|
|
|
|
661
|
11
|
|
|
|
|
38
|
my $accessor = @keeping |
|
662
|
10
|
100
|
|
|
|
26
|
? "{" . join("}{", map { "\$row->{'$_'}" } @keeping) . "}" |
|
663
|
|
|
|
|
|
|
: ''; |
|
664
|
|
|
|
|
|
|
|
|
665
|
10
|
100
|
100
|
|
|
50
|
if ($loop_over && @keeping) { |
|
666
|
3
|
|
|
|
|
4
|
$accessor = "{" . join("}{", map { "\$ck_$_" } @keeping) . "}" |
|
|
3
|
|
|
|
|
13
|
|
|
667
|
|
|
|
|
|
|
} |
|
668
|
|
|
|
|
|
|
|
|
669
|
|
|
|
|
|
|
# yes, we're using auto-vivification. It's ugly, but simplifies |
|
670
|
|
|
|
|
|
|
# the code. |
|
671
|
|
|
|
|
|
|
|
|
672
|
10
|
|
|
|
|
15
|
$$out .= "\n"; |
|
673
|
10
|
|
|
|
|
18
|
$$out .= "$indent$x # combine, dropping $cc"; |
|
674
|
10
|
100
|
|
|
|
32
|
$$out .= ", keeping: @keeping" if @keeping; |
|
675
|
10
|
|
|
|
|
13
|
$$out .= "\n"; |
|
676
|
|
|
|
|
|
|
|
|
677
|
10
|
|
|
|
|
20
|
$$out .= "$indent$x if (\$combination_funcs{'$cc'}->()) {\n"; |
|
678
|
10
|
|
|
|
|
23
|
$$out .= "$indent$x if (\$combinations[$combination_number]$accessor) {\n"; |
|
679
|
10
|
|
|
|
|
19
|
$$out .= "$indent$x local(\$Stream::Aggregate::Stats::ps)\n"; |
|
680
|
10
|
|
|
|
|
13
|
$$out .= "$indent$x = \$ps\n"; |
|
681
|
10
|
|
|
|
|
19
|
$$out .= "$indent$x = \$combinations[$combination_number]$accessor;\n"; |
|
682
|
10
|
100
|
|
|
|
29
|
$$out .= "$indent$x \$oldps = $input_ps;\n" if $input_ps ne '$oldps'; |
|
683
|
10
|
|
|
|
|
34
|
$$out .= "$indent$x &\$merge_func;\n"; |
|
684
|
10
|
|
|
|
|
16
|
$$out .= "$indent$x } else {\n"; |
|
685
|
10
|
|
|
|
|
27
|
$$out .= "$indent$x \$ps = \$combinations[$combination_number]$accessor = clone($input_ps);\n"; |
|
686
|
10
|
|
|
|
|
17
|
$$out .= "$indent$x \$ps->{row} = { %\$row };\n"; |
|
687
|
10
|
|
|
|
|
17
|
$$out .= "$indent$x delete \$ps->{row}{'$cc'};\n"; |
|
688
|
10
|
|
|
|
|
14
|
$$out .= "$indent$x }\n"; |
|
689
|
10
|
|
|
|
|
89
|
$$out .= "$indent$x }\n"; |
|
690
|
|
|
|
|
|
|
|
|
691
|
10
|
|
|
|
|
16
|
my $pnum = $combination_number++; |
|
692
|
|
|
|
|
|
|
|
|
693
|
|
|
|
|
|
|
push(@delayed_call, sub { |
|
694
|
10
|
|
|
|
|
101
|
$generate_combinations->( |
|
695
|
|
|
|
|
|
|
outer_combine => "\$combinations[$pnum]", |
|
696
|
|
|
|
|
|
|
"", |
|
697
|
|
|
|
|
|
|
\@keeping, |
|
698
|
|
|
|
|
|
|
{ %$done, $cc => 1 }, |
|
699
|
|
|
|
|
|
|
$cc); |
|
700
|
10
|
|
|
|
|
50
|
}); |
|
701
|
|
|
|
|
|
|
} |
|
702
|
12
|
100
|
|
|
|
26
|
if ($loop_over) { |
|
703
|
10
|
|
|
|
|
15
|
$$out .= "\n"; |
|
704
|
10
|
|
|
|
|
27
|
$$out .= "$indent$y # final values with '@loop_keys' keys\n"; |
|
705
|
10
|
|
|
|
|
44
|
$$out .= "$indent$y local(\$Stream::Aggregate::Stats::ps) = \$ps = ${input_ps};\n"; |
|
706
|
10
|
|
|
|
|
19
|
$$out .= "$indent$y \$suppress_result = 0;\n"; |
|
707
|
10
|
|
|
|
|
14
|
$$out .= "$indent$y \$final_values_func->();\n"; |
|
708
|
10
|
|
|
|
|
26
|
$$out .= "$indent$y push(\@\$retref, \$row) unless \$suppress_result;\n"; |
|
709
|
10
|
|
|
|
|
13
|
$$out .= "\n"; |
|
710
|
10
|
100
|
|
|
|
22
|
if (! @loop_keys) { |
|
711
|
2
|
|
|
|
|
5
|
$$out .= "$indent$x }\n"; |
|
712
|
|
|
|
|
|
|
} |
|
713
|
|
|
|
|
|
|
|
|
714
|
10
|
|
|
|
|
14
|
$$out .= $loopout; |
|
715
|
|
|
|
|
|
|
} |
|
716
|
12
|
|
|
|
|
94
|
while (my $dc = shift(@delayed_call)) { |
|
717
|
10
|
|
|
|
|
21
|
$dc->(); |
|
718
|
|
|
|
|
|
|
} |
|
719
|
|
|
|
|
|
|
|
|
720
|
2
|
|
|
|
|
33
|
}; |
|
721
|
2
|
|
|
|
|
11
|
$generate_combinations->( |
|
722
|
|
|
|
|
|
|
inner_combine => '$oldps', |
|
723
|
|
|
|
|
|
|
"\t\t\t", |
|
724
|
|
|
|
|
|
|
\@cross_keys, |
|
725
|
|
|
|
|
|
|
{}, |
|
726
|
|
|
|
|
|
|
undef); |
|
727
|
|
|
|
|
|
|
} |
|
728
|
|
|
|
|
|
|
|
|
729
|
|
|
|
|
|
|
# |
|
730
|
|
|
|
|
|
|
# Return the cross product results |
|
731
|
|
|
|
|
|
|
# |
|
732
|
3
|
50
|
|
|
|
15
|
$s{finish_cross} .= qq{print STDERR "Finish cross called\n";} if $agg_config->{debug} > 7; |
|
733
|
3
|
50
|
|
|
|
34
|
$s{finish_cross} .= qq{print STDERR YAML::Dump('cross_data-before',\$cross_data);\n} if $agg_config->{debug} > 8; |
|
734
|
3
|
|
|
|
|
8
|
$s{finish_cross} .= "\tmy (\$retref) = shift;\n"; |
|
735
|
3
|
|
|
|
|
6
|
$s{finish_cross} .= "\tmy \$rowtmp;\n"; |
|
736
|
3
|
|
|
|
|
7
|
$s{finish_cross} .= "\t&\$cross_reduce_func;\n"; |
|
737
|
3
|
50
|
|
|
|
12
|
$s{finish_cross} .= qq{print STDERR YAML::Dump('cross_data-after',\$cross_data);\n} if $agg_config->{debug} > 8; |
|
738
|
3
|
|
|
|
|
6
|
$s{finish_cross} .= $loop_in2; |
|
739
|
3
|
|
|
|
|
26
|
$s{finish_cross} .= $eval_line_numbers->(<
|
|
740
|
|
|
|
|
|
|
# --------------- finish cross ------------- |
|
741
|
|
|
|
|
|
|
local(\$Stream::Aggregate::Stats::ps) |
|
742
|
|
|
|
|
|
|
= \$ps |
|
743
|
|
|
|
|
|
|
= \$cross_data$oldsub; |
|
744
|
|
|
|
|
|
|
confess unless \$ps; |
|
745
|
|
|
|
|
|
|
\$suppress_result = 0; |
|
746
|
|
|
|
|
|
|
\$rowtmp = \$row = { &\$context_columns_func $loop_mid3 }; |
|
747
|
|
|
|
|
|
|
&\$final_values_func; |
|
748
|
|
|
|
|
|
|
push(@\$retref, \$row) unless \$suppress_result; |
|
749
|
|
|
|
|
|
|
\$oldps = delete \$cross_data$oldsub; |
|
750
|
|
|
|
|
|
|
\$oldps->{row} = \$row; |
|
751
|
|
|
|
|
|
|
\$ps = \$contexts[-1]; |
|
752
|
|
|
|
|
|
|
&\$merge_func if \$ps; |
|
753
|
|
|
|
|
|
|
\$cross_count--; |
|
754
|
|
|
|
|
|
|
$db3 |
|
755
|
|
|
|
|
|
|
END_FC |
|
756
|
3
|
|
|
|
|
60
|
$s{finish_cross} .= delete $s{inner_combine}; |
|
757
|
3
|
|
|
|
|
7
|
$s{finish_cross} .= "\t\t\t\t# --------------- finish cross -------------\n"; |
|
758
|
3
|
50
|
|
|
|
16
|
$s{finish_cross} .= resume_line_numbering if $renumber; |
|
759
|
3
|
|
|
|
|
7
|
$s{finish_cross} .= $loop_out2; |
|
760
|
3
|
|
|
|
|
17
|
$s{finish_cross} .= delete $s{outer_combine}; |
|
761
|
|
|
|
|
|
|
} elsif ($agg_config->{combinations}) { |
|
762
|
0
|
|
|
|
|
0
|
die "combinations requires crossproduct which isn't defined"; |
|
763
|
|
|
|
|
|
|
} |
|
764
|
|
|
|
|
|
|
|
|
765
|
5
|
|
|
|
|
22
|
$code .= $eval_line_numbers->(<<'END_FIELDS'); |
|
766
|
|
|
|
|
|
|
|
|
767
|
|
|
|
|
|
|
my $compile_user_code = sub { |
|
768
|
|
|
|
|
|
|
my ($c, $field, $config_key, $default) = @_; |
|
769
|
|
|
|
|
|
|
return $default unless defined $c->{$field}; |
|
770
|
|
|
|
|
|
|
my $config = $c->{$config_key} || {}; # maybe used by eval |
|
771
|
|
|
|
|
|
|
my $coderef; |
|
772
|
|
|
|
|
|
|
my $code = $strict; |
|
773
|
|
|
|
|
|
|
$code .= qq{\n#line 2001 "FAKE-$extra->{name}-$field"\n} if $renumber; |
|
774
|
|
|
|
|
|
|
$code .= qq{sub { $c->{$field} }; }; |
|
775
|
4
|
|
|
4
|
|
28
|
my $sub = eval $code; |
|
|
4
|
|
|
4
|
|
8
|
|
|
|
4
|
|
|
4
|
|
161
|
|
|
|
4
|
|
|
4
|
|
40
|
|
|
|
4
|
|
|
4
|
|
7
|
|
|
|
4
|
|
|
4
|
|
929
|
|
|
|
4
|
|
|
|
|
25
|
|
|
|
4
|
|
|
|
|
11
|
|
|
|
4
|
|
|
|
|
232
|
|
|
|
4
|
|
|
|
|
22
|
|
|
|
4
|
|
|
|
|
8
|
|
|
|
4
|
|
|
|
|
760
|
|
|
|
4
|
|
|
|
|
22
|
|
|
|
4
|
|
|
|
|
8
|
|
|
|
4
|
|
|
|
|
145
|
|
|
|
4
|
|
|
|
|
104
|
|
|
|
4
|
|
|
|
|
8
|
|
|
|
4
|
|
|
|
|
715
|
|
|
776
|
|
|
|
|
|
|
die "Cannot compile user code for $extra->{name}/$field: $@\n$code" if $@; |
|
777
|
|
|
|
|
|
|
return $coderef if $coderef; |
|
778
|
|
|
|
|
|
|
return $sub; |
|
779
|
|
|
|
|
|
|
}; |
|
780
|
|
|
|
|
|
|
|
|
781
|
|
|
|
|
|
|
$get_context_func = $compile_user_code->($agg_config, 'context', 'context_config', sub { return () }); |
|
782
|
|
|
|
|
|
|
$context_columns_func = $compile_user_code->($agg_config, 'context2columns', 'context2columns_config', sub { return () }); |
|
783
|
|
|
|
|
|
|
$filter_func = $compile_user_code->($agg_config, 'filter', 'filter_config', sub { 1 }); |
|
784
|
|
|
|
|
|
|
$preprocess_func = $compile_user_code->($agg_config, 'preprocess', 'preprocess_config', sub {}); |
|
785
|
|
|
|
|
|
|
$stringify_func = $compile_user_code->($agg_config, 'stringify_context', 'stringify_context_config', sub { map { ref($_) ? Dump($_) : $_ } @_ }); |
|
786
|
|
|
|
|
|
|
$finalize_result_func = $compile_user_code->($agg_config, 'finalize_result', 'finalize_result_config', sub {}); |
|
787
|
|
|
|
|
|
|
$passthrough_func = $compile_user_code->($agg_config, 'passthrough', 'passthrough_config', sub { return () }); |
|
788
|
|
|
|
|
|
|
$user_new_context_func = $compile_user_code->($agg_config, 'new_context', 'new_context_config', sub { return () }); |
|
789
|
|
|
|
|
|
|
$user_merge_func = $compile_user_code->($agg_config, 'merge', 'merge_config', sub { return () }); |
|
790
|
|
|
|
|
|
|
$user_reduce_func = $compile_user_code->($agg_config, 'reduce', 'reduce_config', sub { return () }); |
|
791
|
|
|
|
|
|
|
|
|
792
|
|
|
|
|
|
|
if ($agg_config->{crossproduct} && $agg_config->{combinations}) { |
|
793
|
|
|
|
|
|
|
for my $crosskey (uniq(keys(%{$agg_config->{crossproduct}}), keys(%{$agg_config->{combinations}}))) { |
|
794
|
|
|
|
|
|
|
$combination_funcs{$crosskey} = $compile_user_code->($agg_config->{combinations}, $crosskey, "combine on $crosskey", sub { 0 }); |
|
795
|
|
|
|
|
|
|
} |
|
796
|
|
|
|
|
|
|
} |
|
797
|
|
|
|
|
|
|
|
|
798
|
|
|
|
|
|
|
END_FIELDS |
|
799
|
5
|
|
|
|
|
122
|
$code .= "\t\$itemref = \\$agg_config->{item_name};\n"; |
|
800
|
5
|
|
|
|
|
18
|
$code .= "}\n"; |
|
801
|
|
|
|
|
|
|
|
|
802
|
|
|
|
|
|
|
# |
|
803
|
|
|
|
|
|
|
# New context ($ps) allocator |
|
804
|
|
|
|
|
|
|
# |
|
805
|
|
|
|
|
|
|
|
|
806
|
5
|
|
|
|
|
14
|
$s{new_ps} .= "\t\$ps = {};\n"; |
|
807
|
5
|
|
|
|
|
12
|
$s{new_ps} .= "\t\$ps->{item_counter} = 0;\n"; |
|
808
|
5
|
100
|
|
|
|
41
|
$s{new_ps} .= "\t\$ps->{heap} = {};\n" |
|
809
|
|
|
|
|
|
|
if Dump($agg_config) =~ /\{heap\}/; |
|
810
|
5
|
100
|
|
|
|
1060
|
if ($has_keepers) { |
|
811
|
4
|
|
|
|
|
13
|
$s{new_ps} .= "\t\$ps->{random} = [];\n"; |
|
812
|
4
|
|
|
|
|
12
|
$s{new_ps} .= "\t\$ps->{sidestats} = {};\n"; # for Stream::Aggregate::Stats |
|
813
|
|
|
|
|
|
|
} |
|
814
|
5
|
100
|
|
|
|
21
|
$s{new_ps} .= "\t\$ps->{unfiltered_counter} = 0;\n" if $agg_config->{filter}; |
|
815
|
5
|
50
|
|
|
|
24
|
$s{new_ps} .= "\t&\$initialize_func;\n" if $s{initialize}; |
|
816
|
5
|
50
|
|
|
|
18
|
$s{new_ps} .= "\t&\$user_new_context_func;\n" if $agg_config->{new_context}; |
|
817
|
5
|
|
|
|
|
14
|
$s{new_ps} .= "\t\$ps->{row} = undef;\n"; |
|
818
|
5
|
|
|
|
|
8
|
$s{new_ps} .= "\tlock_keys(%\$ps);\n"; |
|
819
|
|
|
|
|
|
|
|
|
820
|
|
|
|
|
|
|
# |
|
821
|
|
|
|
|
|
|
# main processing loop, generated for execution efficiency |
|
822
|
|
|
|
|
|
|
# |
|
823
|
|
|
|
|
|
|
|
|
824
|
5
|
50
|
|
|
|
17
|
$s{process} .= "\t\$last_item = \$\$itemref;\n" |
|
825
|
|
|
|
|
|
|
if Dump($agg_config) =~ /\$last_item\b/; |
|
826
|
5
|
|
|
|
|
701
|
$s{process} .= $eval_line_numbers->(<<'END_P0'); |
|
827
|
|
|
|
|
|
|
$last_item = $$itemref; |
|
828
|
|
|
|
|
|
|
($$itemref) = @_; |
|
829
|
|
|
|
|
|
|
my @ret; |
|
830
|
|
|
|
|
|
|
unless ($$itemref) { |
|
831
|
|
|
|
|
|
|
$finish_cross_func->(\@ret) if keys %$cross_data; |
|
832
|
|
|
|
|
|
|
$finish_context_func->(\@ret) |
|
833
|
|
|
|
|
|
|
while @contexts; |
|
834
|
|
|
|
|
|
|
return @ret; |
|
835
|
|
|
|
|
|
|
} |
|
836
|
|
|
|
|
|
|
END_P0 |
|
837
|
5
|
100
|
|
|
|
93
|
$s{process} .= $eval_line_numbers->(<<'END_P1') if $agg_config->{preprocess}; |
|
838
|
|
|
|
|
|
|
|
|
839
|
|
|
|
|
|
|
&$preprocess_func; |
|
840
|
|
|
|
|
|
|
|
|
841
|
|
|
|
|
|
|
END_P1 |
|
842
|
5
|
100
|
66
|
|
|
48
|
$s{process} .= $eval_line_numbers->(<<'END_P2') if $agg_config->{filter} && $agg_config->{filter_early}; |
|
843
|
|
|
|
|
|
|
|
|
844
|
|
|
|
|
|
|
$count_this = &$filter_func; |
|
845
|
|
|
|
|
|
|
END_P2 |
|
846
|
5
|
50
|
|
|
|
44
|
$s{process} .= $eval_line_numbers->(<<'END_P3') if $agg_config->{passthrough}; |
|
847
|
|
|
|
|
|
|
|
|
848
|
|
|
|
|
|
|
push(@ret, &$passthrough_func); |
|
849
|
|
|
|
|
|
|
|
|
850
|
|
|
|
|
|
|
END_P3 |
|
851
|
|
|
|
|
|
|
|
|
852
|
5
|
100
|
|
|
|
20
|
if ($agg_config->{context}) { |
|
853
|
2
|
50
|
33
|
|
|
11
|
$s{process} .= $eval_line_numbers->(<<'END_P4') if $agg_config->{filter} && $agg_config->{filter_early}; |
|
854
|
|
|
|
|
|
|
|
|
855
|
|
|
|
|
|
|
if ($count_this) { |
|
856
|
|
|
|
|
|
|
|
|
857
|
|
|
|
|
|
|
END_P4 |
|
858
|
2
|
|
|
|
|
7
|
$s{process} .= $eval_line_numbers->(<<'END_P5'); |
|
859
|
|
|
|
|
|
|
|
|
860
|
|
|
|
|
|
|
my @new_context = &$get_context_func; |
|
861
|
|
|
|
|
|
|
my @new_strings = $stringify_func->(@new_context); |
|
862
|
|
|
|
|
|
|
|
|
863
|
|
|
|
|
|
|
my $diffpos = list_difference_position(@new_strings, @context_strings); |
|
864
|
|
|
|
|
|
|
|
|
865
|
|
|
|
|
|
|
if (defined $diffpos) { |
|
866
|
|
|
|
|
|
|
$finish_context_func->(\@ret) |
|
867
|
|
|
|
|
|
|
while @current_context >= $diffpos; |
|
868
|
|
|
|
|
|
|
} |
|
869
|
|
|
|
|
|
|
|
|
870
|
|
|
|
|
|
|
while (@new_context > @current_context) { |
|
871
|
|
|
|
|
|
|
$add_context_component_func->($new_context[@current_context], $new_strings[@current_context]); |
|
872
|
|
|
|
|
|
|
} |
|
873
|
|
|
|
|
|
|
END_P5 |
|
874
|
|
|
|
|
|
|
|
|
875
|
2
|
50
|
33
|
|
|
37
|
$s{process} .= $eval_line_numbers->(<<'END_P7') if $agg_config->{filter} && $agg_config->{filter_early}; |
|
876
|
|
|
|
|
|
|
} |
|
877
|
|
|
|
|
|
|
|
|
878
|
|
|
|
|
|
|
END_P7 |
|
879
|
|
|
|
|
|
|
} |
|
880
|
|
|
|
|
|
|
|
|
881
|
5
|
50
|
66
|
|
|
32
|
$s{process} .= $eval_line_numbers->(<<'END_P7A') if $agg_config->{filter} && ! $agg_config->{filter_early}; |
|
882
|
|
|
|
|
|
|
|
|
883
|
|
|
|
|
|
|
$count_this = &$filter_func; |
|
884
|
|
|
|
|
|
|
END_P7A |
|
885
|
|
|
|
|
|
|
|
|
886
|
5
|
100
|
66
|
|
|
40
|
$s{process} .= $eval_line_numbers->(<<'END_P7B') if $agg_config->{filter} && ! $agg_config->{context}; |
|
887
|
|
|
|
|
|
|
if ($count_this) { |
|
888
|
|
|
|
|
|
|
END_P7B |
|
889
|
|
|
|
|
|
|
|
|
890
|
5
|
|
|
|
|
42
|
$s{process} .= $eval_line_numbers->(<<'END_P8'); |
|
891
|
|
|
|
|
|
|
&$count_func; |
|
892
|
|
|
|
|
|
|
$ps->{item_counter}++; |
|
893
|
|
|
|
|
|
|
END_P8 |
|
894
|
|
|
|
|
|
|
|
|
895
|
|
|
|
|
|
|
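# this closes the if ($count_this) opened in P7B (the one opened in P4 is already closed by P7)
# NOTE(review): P7B is only emitted when there is no context, but P9 is emitted whenever
# filter is set -- with both filter and context configured this looks unbalanced; confirm.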
|
896
|
5
|
100
|
|
|
|
82
|
$s{process} .= $eval_line_numbers->(<<'END_P9') if $agg_config->{filter}; |
|
897
|
|
|
|
|
|
|
} |
|
898
|
|
|
|
|
|
|
$ps->{unfiltered_counter}++; |
|
899
|
|
|
|
|
|
|
END_P9 |
|
900
|
|
|
|
|
|
|
|
|
901
|
5
|
|
|
|
|
38
|
$s{process} .= $eval_line_numbers->(<<'END_P10'); |
|
902
|
|
|
|
|
|
|
return @ret; |
|
903
|
|
|
|
|
|
|
END_P10 |
|
904
|
5
|
50
|
|
|
|
81
|
$s{process} .= resume_line_numbering if $renumber; |
|
905
|
|
|
|
|
|
|
|
|
906
|
|
|
|
|
|
|
# |
|
907
|
|
|
|
|
|
|
# Merge contexts func |
|
908
|
|
|
|
|
|
|
# |
|
909
|
|
|
|
|
|
|
|
|
910
|
5
|
50
|
|
|
|
26
|
$s{merge0} .= "print STDERR YAML::Dump('MERGE', \$ps, \$oldps);\n" if $agg_config->{debug} > 11; |
|
911
|
5
|
50
|
|
|
|
28
|
$s{merge0} .= resume_line_numbering if $renumber; |
|
912
|
5
|
|
|
|
|
15
|
$s{merge0} .= "\t\$ps->{item_counter} += \$oldps->{item_counter};\n"; |
|
913
|
5
|
100
|
|
|
|
24
|
$s{merge0} .= "\t\$ps->{unfiltered_counter} += \$oldps->{unfiltered_counter};\n" if $agg_config->{filter}; |
|
914
|
5
|
50
|
|
|
|
23
|
$s{merge3} .= resume_line_numbering if $renumber; |
|
915
|
5
|
|
|
|
|
11
|
$s{merge3} .= "\t&\$user_merge_func;\n"; |
|
916
|
|
|
|
|
|
|
|
|
917
|
5
|
50
|
|
|
|
21
|
$s{fv_setup} .= "print STDERR YAML::Dump('final_values', \$ps);\n" if $agg_config->{debug} > 12; |
|
918
|
|
|
|
|
|
|
|
|
919
|
5
|
|
|
|
|
32
|
$code .= $assemble_code->('', qw(user)); |
|
920
|
5
|
|
|
|
|
34
|
$code .= $assemble_code->('merge', qw(merge0 merge merge2 merge3)); |
|
921
|
5
|
|
|
|
|
20
|
$code .= $assemble_code->('cross_reduce', qw(cross_reduce)); |
|
922
|
5
|
|
|
|
|
46
|
$code .= $assemble_code->('finish_cross', qw(finish_cross)); |
|
923
|
5
|
|
|
|
|
16
|
$code .= $assemble_code->('new_ps', qw(new_ps)); |
|
924
|
5
|
|
|
|
|
18
|
$code .= $assemble_code->('process', qw(process)); |
|
925
|
5
|
|
|
|
|
18
|
$code .= $assemble_code->('initialize', qw(initialize)); |
|
926
|
5
|
|
|
|
|
19
|
$code .= $assemble_code->('final_values', qw(fv_setup output stat final_values)); |
|
927
|
5
|
|
|
|
|
92
|
$code .= $assemble_code->('count', qw(precount count ephemeral0 ephemeral ephemeral2 crossproduct ephemeral3 keep standard_deviation median dominant counter percentage sum mean median min minstr max maxstr count2 keeper1 keeper2 keeper3 )); |
|
928
|
5
|
|
|
|
|
19
|
$code .= $assemble_code->('reduce', qw(reduce reduce2)); |
|
929
|
5
|
50
|
|
|
|
27
|
die "INTERNAL ERROR: ".join(' ', keys %s) if keys %s; |
|
930
|
|
|
|
|
|
|
|
|
931
|
5
|
50
|
|
|
|
24
|
if ($suppress_line_numbers) { |
|
932
|
0
|
|
|
|
|
0
|
$code =~ s/^#line \d+ ".*"\s*?\n//mg; |
|
933
|
|
|
|
|
|
|
} |
|
934
|
|
|
|
|
|
|
|
|
935
|
5
|
50
|
|
|
|
20
|
print STDERR $line_numbers{$code}."\n" if $agg_config->{debug}; |
|
936
|
|
|
|
|
|
|
|
|
937
|
5
|
|
|
5
|
|
36
|
eval $code; |
|
|
5
|
|
|
5
|
|
15
|
|
|
|
5
|
|
|
|
|
183
|
|
|
|
5
|
|
|
|
|
25
|
|
|
|
5
|
|
|
|
|
8
|
|
|
|
5
|
|
|
|
|
13586
|
|
|
|
5
|
|
|
|
|
614
|
|
|
938
|
5
|
50
|
|
|
|
6951
|
die "$@\n$line_numbers{$code}" if $@; |
|
939
|
|
|
|
|
|
|
|
|
940
|
5
|
|
|
|
|
342
|
}; |
|
941
|
|
|
|
|
|
|
|
|
942
|
5
|
|
|
|
|
19
|
&$compile_config; |
|
943
|
|
|
|
|
|
|
|
|
944
|
|
|
|
|
|
|
$add_context_component_func = sub { |
|
945
|
9
|
|
|
9
|
|
15
|
my ($component, $component_string) = @_; |
|
946
|
|
|
|
|
|
|
|
|
947
|
9
|
|
|
|
|
23
|
&$new_ps_func; |
|
948
|
|
|
|
|
|
|
|
|
949
|
|
|
|
|
|
|
# keep @contexts and @current_context together |
|
950
|
9
|
|
|
|
|
784
|
push(@current_context, $component); |
|
951
|
9
|
|
|
|
|
25
|
push(@context_strings, $component_string); |
|
952
|
9
|
|
|
|
|
14
|
push(@contexts, $ps); |
|
953
|
|
|
|
|
|
|
|
|
954
|
9
|
|
|
|
|
15
|
$items_seen[$#contexts] += 1; |
|
955
|
9
|
|
|
|
|
20
|
$#items_seen = $#contexts; |
|
956
|
9
|
|
|
|
|
30
|
push(@items_seen, 0); |
|
957
|
5
|
|
|
|
|
33
|
}; |
|
958
|
|
|
|
|
|
|
|
|
959
|
|
|
|
|
|
|
$finish_context_func = sub { |
|
960
|
9
|
|
|
9
|
|
15
|
my ($retref) = @_; |
|
961
|
|
|
|
|
|
|
|
|
962
|
9
|
50
|
|
|
|
21
|
die unless @contexts; |
|
963
|
|
|
|
|
|
|
|
|
964
|
9
|
50
|
|
|
|
29
|
print STDERR "about to call finish cross\n" if $agg_config->{debug} > 5; |
|
965
|
9
|
|
|
|
|
23
|
$finish_cross_func->($retref); |
|
966
|
|
|
|
|
|
|
|
|
967
|
9
|
50
|
|
|
|
30
|
die unless @contexts; |
|
968
|
|
|
|
|
|
|
|
|
969
|
9
|
50
|
|
|
|
27
|
confess unless ref $ps; |
|
970
|
|
|
|
|
|
|
|
|
971
|
9
|
|
|
|
|
12
|
$suppress_result = 0; |
|
972
|
9
|
|
|
|
|
21
|
$row = { |
|
973
|
|
|
|
|
|
|
&$context_columns_func, |
|
974
|
|
|
|
|
|
|
}; |
|
975
|
9
|
|
|
|
|
362
|
&$final_values_func; |
|
976
|
|
|
|
|
|
|
|
|
977
|
|
|
|
|
|
|
# keep @contexts and @current_context together |
|
978
|
9
|
|
|
|
|
176
|
$oldps = pop(@contexts); |
|
979
|
9
|
|
|
|
|
57
|
pop(@current_context); |
|
980
|
9
|
|
|
|
|
14
|
pop(@context_strings); |
|
981
|
|
|
|
|
|
|
|
|
982
|
9
|
|
|
|
|
19
|
$ps = $contexts[-1]; |
|
983
|
|
|
|
|
|
|
|
|
984
|
9
|
100
|
|
|
|
156
|
&$merge_func if $ps; |
|
985
|
|
|
|
|
|
|
|
|
986
|
9
|
50
|
|
|
|
43
|
push (@$retref, $row) unless $suppress_result; |
|
987
|
5
|
|
|
|
|
29
|
}; |
|
988
|
|
|
|
|
|
|
|
|
989
|
5
|
|
|
|
|
55
|
return $process_func; |
|
990
|
|
|
|
|
|
|
|
|
991
|
|
|
|
|
|
|
} |
|
992
|
|
|
|
|
|
|
|
|
993
|
|
|
|
|
|
|
sub validate_aggregation_config
{
	# Check an aggregation config against $prototype_config.
	#
	# Takes the aggregation config as its only argument.  The checker is
	# obtained by string-eval'ing whatever config_checker_source returns, so
	# it is compiled inside this sub; if that eval fails, the error in $@ is
	# rethrown unchanged.  Whatever the checker returns is handed back to
	# the caller.
	my $agg_config = $_[0];

	my $validate = eval config_checker_source;
	if ($@) {
		die $@;
	}

	return $validate->($agg_config, $prototype_config, '- Stream::Aggregate config');
}
|
1000
|
|
|
|
|
|
|
|
|
1001
|
|
|
|
|
|
|
1; |
|
1002
|
|
|
|
|
|
|
|