line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
#!/usr/bin/perl -w |
2
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
# |
4
|
|
|
|
|
|
|
# dbmapreduce.pm |
5
|
|
|
|
|
|
|
# Copyright (C) 1991-2016 by John Heidemann |
6
|
|
|
|
|
|
|
# |
7
|
|
|
|
|
|
|
# This program is distributed under terms of the GNU general |
8
|
|
|
|
|
|
|
# public license, version 2. See the file COPYING |
9
|
|
|
|
|
|
|
# in $dblibdir for details. |
10
|
|
|
|
|
|
|
# |
11
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
package Fsdb::Filter::dbmapreduce; |
14
|
|
|
|
|
|
|
|
15
|
|
|
|
|
|
|
=head1 NAME |
16
|
|
|
|
|
|
|
|
17
|
|
|
|
|
|
|
dbmapreduce - reduce all input rows with the same key |
18
|
|
|
|
|
|
|
|
19
|
|
|
|
|
|
|
=head1 SYNOPSIS |
20
|
|
|
|
|
|
|
|
21
|
|
|
|
|
|
|
dbmapreduce [-dMS] [-k KeyField] [-f CodeFile] [-C Filtercode] [--] [ReduceCommand [ReduceArguments...]] |
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
=head1 DESCRIPTION |
24
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
Group input data by KeyField, |
26
|
|
|
|
|
|
|
then apply a function (the "reducer") to each group. |
27
|
|
|
|
|
|
|
The reduce function can be an external program |
28
|
|
|
|
|
|
|
given by ReduceCommand and ReduceArguments, |
29
|
|
|
|
|
|
|
or an Perl subroutine given in CodeFile or FilterCode. |
30
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
If a "--" appears before reduce command, |
32
|
|
|
|
|
|
|
arguments after the -- passed the the command. |
33
|
|
|
|
|
|
|
|
34
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
=head2 Grouping (The Mapper) |
36
|
|
|
|
|
|
|
|
37
|
|
|
|
|
|
|
By default the KeyField is the first field in the row. |
38
|
|
|
|
|
|
|
Unlike Hadoop streaming, the -k KeyField option can explicitly |
39
|
|
|
|
|
|
|
name where the key is in any column of each input row. |
40
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
By default, we sort the data to make sure data is grouped by key. |
42
|
|
|
|
|
|
|
If the input is already grouped, the C<-S> option avoids this cost. |
43
|
|
|
|
|
|
|
|
44
|
|
|
|
|
|
|
|
45
|
|
|
|
|
|
|
=head2 The Reducer |
46
|
|
|
|
|
|
|
|
47
|
|
|
|
|
|
|
Reduce functions default to be shell commands. |
48
|
|
|
|
|
|
|
However, with C<-C>, one can use arbitrary Perl code |
49
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
(see the C<-C> option below for details). |
51
|
|
|
|
|
|
|
the C<-f> option is useful to specify complex Perl code |
52
|
|
|
|
|
|
|
somewhere other than the command line. |
53
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
Finally, as a special case, if there are no rows of input, |
55
|
|
|
|
|
|
|
the reducer will be invoked once with the empty value (if it's an external |
56
|
|
|
|
|
|
|
reducer) or with undef (if it's a subroutine). |
57
|
|
|
|
|
|
|
It is expected to generate the output header, |
58
|
|
|
|
|
|
|
and it may generate no data rows itself, or a null data row |
59
|
|
|
|
|
|
|
of its choosing. |
60
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
=head2 Output |
62
|
|
|
|
|
|
|
|
63
|
|
|
|
|
|
|
For non-multi-key-aware reducers, |
64
|
|
|
|
|
|
|
we add the KeyField use for each Reduce |
65
|
|
|
|
|
|
|
is in the output stream. |
66
|
|
|
|
|
|
|
(If the reducer passes the key we trust that it gives a correct value.) |
67
|
|
|
|
|
|
|
We also insure that the output field separator is the |
68
|
|
|
|
|
|
|
same as the input field separator. |
69
|
|
|
|
|
|
|
|
70
|
|
|
|
|
|
|
Adding the key and adjusting the output field separator |
71
|
|
|
|
|
|
|
is not possible for |
72
|
|
|
|
|
|
|
non-multi-key-aware reducers. |
73
|
|
|
|
|
|
|
|
74
|
|
|
|
|
|
|
|
75
|
|
|
|
|
|
|
=head2 Comparison to Related Work |
76
|
|
|
|
|
|
|
|
77
|
|
|
|
|
|
|
This program thus implements Google-style map/reduce, |
78
|
|
|
|
|
|
|
but executed sequentially. |
79
|
|
|
|
|
|
|
|
80
|
|
|
|
|
|
|
For input, these systems include a map function and apply it to input data |
81
|
|
|
|
|
|
|
to generate the key. |
82
|
|
|
|
|
|
|
We assume this key generation (the map function) |
83
|
|
|
|
|
|
|
has occurred head of time. |
84
|
|
|
|
|
|
|
|
85
|
|
|
|
|
|
|
We also allow the grouping key to be in any column. |
86
|
|
|
|
|
|
|
Hadoop Streaming requires it to be in the first column. |
87
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
By default, the reducer gets exactly (and only) one key. |
89
|
|
|
|
|
|
|
This invariant is stronger than Google and Hadoop. |
90
|
|
|
|
|
|
|
They both pass multiple keys to the |
91
|
|
|
|
|
|
|
reducer, insuring that each key is grouped together. |
92
|
|
|
|
|
|
|
With the C<-M> option, we also pass multiple multiple groups to the reducer. |
93
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
Unlike those systems, with the C<-S> option |
95
|
|
|
|
|
|
|
we do not require the groups arrive in any particular |
96
|
|
|
|
|
|
|
order, just that they be grouped together. |
97
|
|
|
|
|
|
|
(They guarantees they arrive in lexically sorted order). |
98
|
|
|
|
|
|
|
However, with C<-S> we create lexical ordering. |
99
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
With C<--prepend-key> we insure that the KeyField is in the output stream; |
101
|
|
|
|
|
|
|
other systems do not enforce this. |
102
|
|
|
|
|
|
|
|
103
|
|
|
|
|
|
|
|
104
|
|
|
|
|
|
|
=head2 Assumptions and requirements |
105
|
|
|
|
|
|
|
|
106
|
|
|
|
|
|
|
By default, data can be provided in arbitrary order |
107
|
|
|
|
|
|
|
and the program consumes O(number of unique tags) memory, |
108
|
|
|
|
|
|
|
and O(size of data) disk space. |
109
|
|
|
|
|
|
|
|
110
|
|
|
|
|
|
|
With the C<-S> option, data must arrive group by tags (not necessarily sorted), |
111
|
|
|
|
|
|
|
and the program consumes O(number of tags) memory and no disk space. |
112
|
|
|
|
|
|
|
The program will check and abort if this precondition is not met. |
113
|
|
|
|
|
|
|
|
114
|
|
|
|
|
|
|
With two C<-S>'s, program consumes O(1) memory, but doesn't verify |
115
|
|
|
|
|
|
|
that the data-arrival precondition is met. |
116
|
|
|
|
|
|
|
|
117
|
|
|
|
|
|
|
The field separators of the input and the output |
118
|
|
|
|
|
|
|
can now be different |
119
|
|
|
|
|
|
|
(early versions of this tool prohibited such variation.) |
120
|
|
|
|
|
|
|
With C<--copy-fs> we copy the input field separator to the output, |
121
|
|
|
|
|
|
|
but only for non-multi-key-aware reducers. |
122
|
|
|
|
|
|
|
(this used to be done automatically). |
123
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
|
125
|
|
|
|
|
|
|
=head2 Known bugs |
126
|
|
|
|
|
|
|
|
127
|
|
|
|
|
|
|
As of 2013-09-21, we don't verify key order with options C<-M -S>. |
128
|
|
|
|
|
|
|
|
129
|
|
|
|
|
|
|
|
130
|
|
|
|
|
|
|
=head1 OPTIONS |
131
|
|
|
|
|
|
|
|
132
|
|
|
|
|
|
|
=over 4 |
133
|
|
|
|
|
|
|
|
134
|
|
|
|
|
|
|
=item B<-k> or B<--key> KeyField |
135
|
|
|
|
|
|
|
|
136
|
|
|
|
|
|
|
specify which column is the key for grouping (default: the first column) |
137
|
|
|
|
|
|
|
|
138
|
|
|
|
|
|
|
=item B<-S> or B<--pre-sorted> |
139
|
|
|
|
|
|
|
|
140
|
|
|
|
|
|
|
Assume data is already grouped by tag. |
141
|
|
|
|
|
|
|
Provided twice, it removes the validation of this assertion. |
142
|
|
|
|
|
|
|
|
143
|
|
|
|
|
|
|
=item B<-M> or B<--multiple-ok> |
144
|
|
|
|
|
|
|
|
145
|
|
|
|
|
|
|
Assume the ReduceCommand can handle multiple grouped keys, |
146
|
|
|
|
|
|
|
and the ReduceCommand is responsible for outputting the |
147
|
|
|
|
|
|
|
with each output row. |
148
|
|
|
|
|
|
|
(By default, a separate ReduceCommand is run for each key, |
149
|
|
|
|
|
|
|
and dbmapreduce adds the key to each output row.) |
150
|
|
|
|
|
|
|
|
151
|
|
|
|
|
|
|
=item B<-K> or B<--pass-current-key> |
152
|
|
|
|
|
|
|
|
153
|
|
|
|
|
|
|
Pass the current key as an argument to the external, |
154
|
|
|
|
|
|
|
non-map-aware ReduceCommand. |
155
|
|
|
|
|
|
|
This is only done optionally since some external commands |
156
|
|
|
|
|
|
|
do not expect an extra argument. |
157
|
|
|
|
|
|
|
(Internal, non-map-aware Perl reducers are always given |
158
|
|
|
|
|
|
|
the current key as an argument.) |
159
|
|
|
|
|
|
|
|
160
|
|
|
|
|
|
|
=item B<--prepend-key> |
161
|
|
|
|
|
|
|
|
162
|
|
|
|
|
|
|
Add the current key into the reducer output |
163
|
|
|
|
|
|
|
for non-multi-key-aware reducers only. |
164
|
|
|
|
|
|
|
Not done by default. |
165
|
|
|
|
|
|
|
|
166
|
|
|
|
|
|
|
=item B<--copy-fs> or B<--copy-fieldseparator> |
167
|
|
|
|
|
|
|
|
168
|
|
|
|
|
|
|
Change the field separator of a |
169
|
|
|
|
|
|
|
non-multi-key-aware reducers to match the input's field separator. |
170
|
|
|
|
|
|
|
Not done by default. |
171
|
|
|
|
|
|
|
|
172
|
|
|
|
|
|
|
=item B<--parallelism=N> or B<-j N> |
173
|
|
|
|
|
|
|
|
174
|
|
|
|
|
|
|
Allow up to N reducers to run in parallel. |
175
|
|
|
|
|
|
|
Default is the number of CPUs in the machine. |
176
|
|
|
|
|
|
|
|
177
|
|
|
|
|
|
|
=item B<-C FILTER-CODE> or B<--filter-code=FILTER-CODE> |
178
|
|
|
|
|
|
|
|
179
|
|
|
|
|
|
|
Provide FILTER-CODE, Perl code that generates and returns |
180
|
|
|
|
|
|
|
a Fsdb::Filter object that implements the reduce function. |
181
|
|
|
|
|
|
|
The provided code should be an anonymous sub |
182
|
|
|
|
|
|
|
that creates a Fsdb Filter that implements the reduce object. |
183
|
|
|
|
|
|
|
|
184
|
|
|
|
|
|
|
The reduce object will then be called with --input and --output |
185
|
|
|
|
|
|
|
parameters that hook it into a the reduce with queues. |
186
|
|
|
|
|
|
|
|
187
|
|
|
|
|
|
|
One sample fragment that works is just: |
188
|
|
|
|
|
|
|
|
189
|
|
|
|
|
|
|
dbcolstats(qw(--nolog duration)) |
190
|
|
|
|
|
|
|
|
191
|
|
|
|
|
|
|
So this command: |
192
|
|
|
|
|
|
|
|
193
|
|
|
|
|
|
|
cat DATA/stats.fsdb | \ |
194
|
|
|
|
|
|
|
dbmapreduce -k experiment -C 'dbcolstats(qw(--nolog duration))' |
195
|
|
|
|
|
|
|
|
196
|
|
|
|
|
|
|
is the same as the example |
197
|
|
|
|
|
|
|
|
198
|
|
|
|
|
|
|
cat DATA/stats.fsdb | \ |
199
|
|
|
|
|
|
|
dbmapreduce -k experiment -- dbcolstats duration |
200
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
except that with C<-C> there is no forking and so things run faster. |
202
|
|
|
|
|
|
|
|
203
|
|
|
|
|
|
|
If C is invoked from within Perl, then one can use |
204
|
|
|
|
|
|
|
a code SUB as well: |
205
|
|
|
|
|
|
|
dbmapreduce(-k => 'experiment', |
206
|
|
|
|
|
|
|
-C => sub { dbcolstats(qw(--nolong duration)) }); |
207
|
|
|
|
|
|
|
|
208
|
|
|
|
|
|
|
The reduce object must consume I input as a Fsdb stream, |
209
|
|
|
|
|
|
|
and close the output Fsdb stream. (If this assumption is not |
210
|
|
|
|
|
|
|
met the map/reduce will be aborted.) |
211
|
|
|
|
|
|
|
|
212
|
|
|
|
|
|
|
For non-map-reduce-aware filters, |
213
|
|
|
|
|
|
|
when the filter-generator code runs, C<$_[0]> will be the current key. |
214
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
=item B<-f CODE-FILE> or B<--code-file=CODE-FILE> |
216
|
|
|
|
|
|
|
|
217
|
|
|
|
|
|
|
Includes F in the program. |
218
|
|
|
|
|
|
|
This option is useful for more complicated perl reducer functions. |
219
|
|
|
|
|
|
|
|
220
|
|
|
|
|
|
|
Thus, if reducer.pl has the code. |
221
|
|
|
|
|
|
|
|
222
|
|
|
|
|
|
|
sub make_reducer { |
223
|
|
|
|
|
|
|
my($current_key) = @_; |
224
|
|
|
|
|
|
|
dbcolstats(qw(--nolog duration)); |
225
|
|
|
|
|
|
|
} |
226
|
|
|
|
|
|
|
|
227
|
|
|
|
|
|
|
Then the command |
228
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
cat DATA/stats.fsdb | \ |
230
|
|
|
|
|
|
|
dbmapreduce -k experiment -f reducer.pl -C make_reducer |
231
|
|
|
|
|
|
|
|
232
|
|
|
|
|
|
|
does the same thing as the example. |
233
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
|
235
|
|
|
|
|
|
|
=item B<-w> or B<--warnings> |
236
|
|
|
|
|
|
|
|
237
|
|
|
|
|
|
|
Enable warnings in user supplied code. |
238
|
|
|
|
|
|
|
Warnings are issued if an external reducer fails to consume all input. |
239
|
|
|
|
|
|
|
(Default to include warnings.) |
240
|
|
|
|
|
|
|
|
241
|
|
|
|
|
|
|
=item B<-T TmpDir> |
242
|
|
|
|
|
|
|
|
243
|
|
|
|
|
|
|
where to put tmp files. |
244
|
|
|
|
|
|
|
Also uses environment variable TMPDIR, if -T is |
245
|
|
|
|
|
|
|
not specified. |
246
|
|
|
|
|
|
|
Default is /tmp. |
247
|
|
|
|
|
|
|
|
248
|
|
|
|
|
|
|
=back |
249
|
|
|
|
|
|
|
|
250
|
|
|
|
|
|
|
=for comment |
251
|
|
|
|
|
|
|
begin_standard_fsdb_options |
252
|
|
|
|
|
|
|
|
253
|
|
|
|
|
|
|
This module also supports the standard fsdb options: |
254
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
=over 4 |
256
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
=item B<-d> |
258
|
|
|
|
|
|
|
|
259
|
|
|
|
|
|
|
Enable debugging output. |
260
|
|
|
|
|
|
|
|
261
|
|
|
|
|
|
|
=item B<-i> or B<--input> InputSource |
262
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
Read from InputSource, typically a file name, or C<-> for standard input, |
264
|
|
|
|
|
|
|
or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects. |
265
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
=item B<-o> or B<--output> OutputDestination |
267
|
|
|
|
|
|
|
|
268
|
|
|
|
|
|
|
Write to OutputDestination, typically a file name, or C<-> for standard output, |
269
|
|
|
|
|
|
|
or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects. |
270
|
|
|
|
|
|
|
|
271
|
|
|
|
|
|
|
=item B<--autorun> or B<--noautorun> |
272
|
|
|
|
|
|
|
|
273
|
|
|
|
|
|
|
By default, programs process automatically, |
274
|
|
|
|
|
|
|
but Fsdb::Filter objects in Perl do not run until you invoke |
275
|
|
|
|
|
|
|
the run() method. |
276
|
|
|
|
|
|
|
The C<--(no)autorun> option controls that behavior within Perl. |
277
|
|
|
|
|
|
|
|
278
|
|
|
|
|
|
|
=item B<--header> H |
279
|
|
|
|
|
|
|
|
280
|
|
|
|
|
|
|
Use H as the full Fsdb header, rather than reading a header from |
281
|
|
|
|
|
|
|
then input. |
282
|
|
|
|
|
|
|
|
283
|
|
|
|
|
|
|
=item B<--help> |
284
|
|
|
|
|
|
|
|
285
|
|
|
|
|
|
|
Show help. |
286
|
|
|
|
|
|
|
|
287
|
|
|
|
|
|
|
=item B<--man> |
288
|
|
|
|
|
|
|
|
289
|
|
|
|
|
|
|
Show full manual. |
290
|
|
|
|
|
|
|
|
291
|
|
|
|
|
|
|
=back |
292
|
|
|
|
|
|
|
|
293
|
|
|
|
|
|
|
=for comment |
294
|
|
|
|
|
|
|
end_standard_fsdb_options |
295
|
|
|
|
|
|
|
|
296
|
|
|
|
|
|
|
|
297
|
|
|
|
|
|
|
=head1 SAMPLE USAGE |
298
|
|
|
|
|
|
|
|
299
|
|
|
|
|
|
|
=head2 Input: |
300
|
|
|
|
|
|
|
|
301
|
|
|
|
|
|
|
#fsdb experiment duration |
302
|
|
|
|
|
|
|
ufs_mab_sys 37.2 |
303
|
|
|
|
|
|
|
ufs_mab_sys 37.3 |
304
|
|
|
|
|
|
|
ufs_rcp_real 264.5 |
305
|
|
|
|
|
|
|
ufs_rcp_real 277.9 |
306
|
|
|
|
|
|
|
|
307
|
|
|
|
|
|
|
=head2 Command: |
308
|
|
|
|
|
|
|
|
309
|
|
|
|
|
|
|
cat DATA/stats.fsdb | \ |
310
|
|
|
|
|
|
|
dbmapreduce --prepend-key -k experiment -- dbcolstats duration |
311
|
|
|
|
|
|
|
|
312
|
|
|
|
|
|
|
=head2 Output: |
313
|
|
|
|
|
|
|
|
314
|
|
|
|
|
|
|
#fsdb experiment mean stddev pct_rsd conf_range conf_low conf_high conf_pct sum sum_squared min max n |
315
|
|
|
|
|
|
|
ufs_mab_sys 37.25 0.070711 0.18983 0.6353 36.615 37.885 0.95 74.5 2775.1 37.2 37.3 2 |
316
|
|
|
|
|
|
|
ufs_rcp_real 271.2 9.4752 3.4938 85.13 186.07 356.33 0.95 542.4 1.4719e+05 264.5 277.9 2 |
317
|
|
|
|
|
|
|
# | dbmapreduce -k experiment dbstats duration |
318
|
|
|
|
|
|
|
|
319
|
|
|
|
|
|
|
|
320
|
|
|
|
|
|
|
=head1 SEE ALSO |
321
|
|
|
|
|
|
|
|
322
|
|
|
|
|
|
|
L. |
323
|
|
|
|
|
|
|
L |
324
|
|
|
|
|
|
|
L |
325
|
|
|
|
|
|
|
|
326
|
|
|
|
|
|
|
|
327
|
|
|
|
|
|
|
=head1 CLASS FUNCTIONS |
328
|
|
|
|
|
|
|
|
329
|
|
|
|
|
|
|
OLD TEXT: |
330
|
|
|
|
|
|
|
A few notes about the internal structure: |
331
|
|
|
|
|
|
|
L uses two to four threads (actually Freds) to run. |
332
|
|
|
|
|
|
|
An optional thread C<$self->{_in_fred}> sorts the input. |
333
|
|
|
|
|
|
|
The main process reads input and groups input by key. |
334
|
|
|
|
|
|
|
Each group is passed to a |
335
|
|
|
|
|
|
|
secondary fred C<$self->{_reducer_thread}> |
336
|
|
|
|
|
|
|
that invokes the reducer on each group |
337
|
|
|
|
|
|
|
and does any output. |
338
|
|
|
|
|
|
|
If the reducer is I map-aware, then |
339
|
|
|
|
|
|
|
we create a final postprocessor thread that |
340
|
|
|
|
|
|
|
adds the key back to the output. |
341
|
|
|
|
|
|
|
Either the reducer or the postprocessor thread do output. |
342
|
|
|
|
|
|
|
|
343
|
|
|
|
|
|
|
NEW VERSION with Freds: |
344
|
|
|
|
|
|
|
|
345
|
|
|
|
|
|
|
A few notes about parallelism, since we have fairly different structure |
346
|
|
|
|
|
|
|
depending on what we're doing: |
347
|
|
|
|
|
|
|
|
348
|
|
|
|
|
|
|
1. for multi-key aware reducers, there is no output post-processing. |
349
|
|
|
|
|
|
|
|
350
|
|
|
|
|
|
|
1a. if input is sorted and there is no input checking (-S -S), |
351
|
|
|
|
|
|
|
we run the reducer in our own process. |
352
|
|
|
|
|
|
|
(F) |
353
|
|
|
|
|
|
|
|
354
|
|
|
|
|
|
|
1b. with grouped input and input checking (-S), |
355
|
|
|
|
|
|
|
we fork off an input process that checks grouping, |
356
|
|
|
|
|
|
|
then run the reducer in our process. |
357
|
|
|
|
|
|
|
(F) |
358
|
|
|
|
|
|
|
xxx: case 1b not yet done |
359
|
|
|
|
|
|
|
|
360
|
|
|
|
|
|
|
1c. with ungrouped input, |
361
|
|
|
|
|
|
|
we invoke an input process to do sorting, |
362
|
|
|
|
|
|
|
then run the reducer in our process. |
363
|
|
|
|
|
|
|
(F) |
364
|
|
|
|
|
|
|
|
365
|
|
|
|
|
|
|
2. for non-multi-key aware. |
366
|
|
|
|
|
|
|
A sorter thread groups content, if necessary. |
367
|
|
|
|
|
|
|
We breaks stuff into groups |
368
|
|
|
|
|
|
|
and feeds them to a reducer Fred, one per group. |
369
|
|
|
|
|
|
|
A dedicated additional Fred merges output and addes the missing key, |
370
|
|
|
|
|
|
|
if necessary. |
371
|
|
|
|
|
|
|
Either way, output ends up in a file. |
372
|
|
|
|
|
|
|
A finally postprocessor thread merges all the output files. |
373
|
|
|
|
|
|
|
|
374
|
|
|
|
|
|
|
=cut |
375
|
|
|
|
|
|
|
|
376
|
|
|
|
|
|
|
@ISA = qw(Fsdb::Filter); |
377
|
|
|
|
|
|
|
$VERSION = 2.0; |
378
|
|
|
|
|
|
|
|
379
|
1
|
|
|
1
|
|
20
|
use 5.010; |
|
1
|
|
|
|
|
3
|
|
380
|
1
|
|
|
1
|
|
4
|
use strict; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
17
|
|
381
|
1
|
|
|
1
|
|
3
|
use Pod::Usage; |
|
1
|
|
|
|
|
6
|
|
|
1
|
|
|
|
|
61
|
|
382
|
1
|
|
|
1
|
|
6
|
use Carp; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
46
|
|
383
|
|
|
|
|
|
|
|
384
|
1
|
|
|
1
|
|
6
|
use Fsdb::Filter; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
19
|
|
385
|
1
|
|
|
1
|
|
4
|
use Fsdb::IO::Reader; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
15
|
|
386
|
1
|
|
|
1
|
|
3
|
use Fsdb::IO::Writer; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
17
|
|
387
|
1
|
|
|
1
|
|
304
|
use Fsdb::Filter::dbsubprocess; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
24
|
|
388
|
1
|
|
|
1
|
|
7
|
use Fsdb::Support::NamedTmpfile; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
17
|
|
389
|
1
|
|
|
1
|
|
4
|
use Fsdb::Support::OS; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
36
|
|
390
|
1
|
|
|
1
|
|
6
|
use Fsdb::Filter::dbpipeline qw(dbpipeline_filter dbpipeline_sink dbsort dbcolcreate dbfilecat dbfilealter dbsubprocess); |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
2224
|
|
391
|
|
|
|
|
|
|
|
392
|
|
|
|
|
|
|
my $REDUCER_GROUP_SYNCHRONIZATION_FLAG = 'reducer group synchronization flag'; |
393
|
|
|
|
|
|
|
|
394
|
|
|
|
|
|
|
=head2 new |
395
|
|
|
|
|
|
|
|
396
|
|
|
|
|
|
|
$filter = new Fsdb::Filter::dbmapreduce(@arguments); |
397
|
|
|
|
|
|
|
|
398
|
|
|
|
|
|
|
Create a new dbmapreduce object, taking command-line arguments. |
399
|
|
|
|
|
|
|
|
400
|
|
|
|
|
|
|
=cut |
401
|
|
|
|
|
|
|
|
402
|
|
|
|
|
|
|
sub new ($@) { |
403
|
0
|
|
|
0
|
1
|
|
my $class = shift @_; |
404
|
0
|
|
|
|
|
|
my $self = $class->SUPER::new(@_); |
405
|
0
|
|
|
|
|
|
bless $self, $class; |
406
|
0
|
|
|
|
|
|
$self->set_defaults; |
407
|
0
|
|
|
|
|
|
$self->parse_options(@_); |
408
|
0
|
|
|
|
|
|
$self->SUPER::post_new(); |
409
|
0
|
|
|
|
|
|
return $self; |
410
|
|
|
|
|
|
|
} |
411
|
|
|
|
|
|
|
|
412
|
|
|
|
|
|
|
|
413
|
|
|
|
|
|
|
=head2 set_defaults |
414
|
|
|
|
|
|
|
|
415
|
|
|
|
|
|
|
$filter->set_defaults(); |
416
|
|
|
|
|
|
|
|
417
|
|
|
|
|
|
|
Internal: set up defaults. |
418
|
|
|
|
|
|
|
|
419
|
|
|
|
|
|
|
=cut |
420
|
|
|
|
|
|
|
|
421
|
|
|
|
|
|
|
sub set_defaults ($) { |
422
|
0
|
|
|
0
|
1
|
|
my($self) = @_; |
423
|
0
|
|
|
|
|
|
$self->SUPER::set_defaults(); |
424
|
0
|
|
|
|
|
|
$self->{_key_column} = undef; |
425
|
0
|
|
|
|
|
|
$self->{_pre_sorted} = 0; |
426
|
0
|
|
|
|
|
|
$self->{_filter_generator_code} = undef; |
427
|
0
|
|
|
|
|
|
$self->{_reduce_generator} = undef; |
428
|
0
|
|
|
|
|
|
$self->{_reducer_is_multikey_aware} = undef; |
429
|
0
|
|
|
|
|
|
$self->{_external_command_argv} = []; |
430
|
0
|
|
|
|
|
|
$self->{_pass_current_key} = undef; |
431
|
0
|
|
|
|
|
|
$self->{_prepend_key} = undef; |
432
|
0
|
|
|
|
|
|
$self->{_copy_fscode} = undef; |
433
|
0
|
|
|
|
|
|
$self->{_filter_generator_code} = undef; |
434
|
0
|
|
|
|
|
|
$self->{_code_files} = []; |
435
|
0
|
|
|
|
|
|
$self->{_warnings} = 1; |
436
|
0
|
|
|
|
|
|
$self->{_max_parallelism} = undef; |
437
|
0
|
|
|
|
|
|
$self->{_parallelism_available} = undef; |
438
|
0
|
|
|
|
|
|
$self->{_test_parallelism} = undef; |
439
|
0
|
|
|
|
|
|
$self->{_header} = undef; |
440
|
0
|
|
|
|
|
|
$self->set_default_tmpdir; |
441
|
|
|
|
|
|
|
} |
442
|
|
|
|
|
|
|
|
443
|
|
|
|
|
|
|
=head2 parse_options |
444
|
|
|
|
|
|
|
|
445
|
|
|
|
|
|
|
$filter->parse_options(@ARGV); |
446
|
|
|
|
|
|
|
|
447
|
|
|
|
|
|
|
Internal: parse command-line arguments. |
448
|
|
|
|
|
|
|
|
449
|
|
|
|
|
|
|
=cut |
450
|
|
|
|
|
|
|
|
451
|
|
|
|
|
|
|
sub parse_options ($@) { |
452
|
0
|
|
|
0
|
1
|
|
my $self = shift @_; |
453
|
|
|
|
|
|
|
|
454
|
0
|
|
|
|
|
|
my(@argv) = @_; |
455
|
|
|
|
|
|
|
$self->get_options( |
456
|
|
|
|
|
|
|
\@argv, |
457
|
0
|
|
|
0
|
|
|
'help|?' => sub { pod2usage(1); }, |
458
|
0
|
|
|
0
|
|
|
'man' => sub { pod2usage(-verbose => 2); }, |
459
|
|
|
|
|
|
|
'autorun!' => \$self->{_autorun}, |
460
|
|
|
|
|
|
|
'C|filter-code|code=s' => \$self->{_filter_generator_code}, |
461
|
|
|
|
|
|
|
'close!' => \$self->{_close}, |
462
|
|
|
|
|
|
|
'copy-fs|copy-fieldseparator!' => \$self->{_copy_fscode}, |
463
|
|
|
|
|
|
|
'd|debug+' => \$self->{_debug}, |
464
|
|
|
|
|
|
|
'f|code-files=s@' => $self->{_code_files}, |
465
|
|
|
|
|
|
|
'header=s' => \$self->{_header}, |
466
|
0
|
|
|
0
|
|
|
'i|input=s' => sub { $self->parse_io_option('input', @_); }, |
467
|
|
|
|
|
|
|
'j|parallelism=i' => \$self->{_max_parallelism}, |
468
|
|
|
|
|
|
|
'k|key=s' => \$self->{_key_column}, |
469
|
|
|
|
|
|
|
'K|pass-current-key!' => \$self->{_pass_current_key}, |
470
|
0
|
|
|
0
|
|
|
'prepend-key' => sub { $self->{_prepend_key} = 1; }, |
471
|
0
|
|
|
0
|
|
|
'no-prepend-key' => sub { $self->{_prepend_key} = 0; }, # set but false |
472
|
|
|
|
|
|
|
'log!' => \$self->{_logprog}, |
473
|
|
|
|
|
|
|
'M|multiple-ok!' => \$self->{_reducer_is_multikey_aware}, |
474
|
0
|
|
|
0
|
|
|
'o|output=s' => sub { $self->parse_io_option('output', @_); }, |
475
|
|
|
|
|
|
|
'S|pre-sorted+' => \$self->{_pre_sorted}, |
476
|
|
|
|
|
|
|
'test-parallelism!' => \$self->{_test_parallelism}, # for test suite only |
477
|
|
|
|
|
|
|
'T|tmpdir|tempdir=s' => \$self->{_tmpdir}, |
478
|
|
|
|
|
|
|
'saveoutput=s' => \$self->{_save_output}, |
479
|
|
|
|
|
|
|
'w|warnings!' => \$self->{_warnings}, |
480
|
0
|
0
|
|
|
|
|
) or pod2usage(2); |
481
|
0
|
|
|
|
|
|
push (@{$self->{_external_command_argv}}, @argv); |
|
0
|
|
|
|
|
|
|
482
|
|
|
|
|
|
|
} |
483
|
|
|
|
|
|
|
|
484
|
|
|
|
|
|
|
|
485
|
|
|
|
|
|
|
=head2 setup |
486
|
|
|
|
|
|
|
|
487
|
|
|
|
|
|
|
$filter->setup(); |
488
|
|
|
|
|
|
|
|
489
|
|
|
|
|
|
|
Internal: setup, parse headers. |
490
|
|
|
|
|
|
|
|
491
|
|
|
|
|
|
|
=cut |
492
|
|
|
|
|
|
|
|
493
|
|
|
|
|
|
|
sub setup($) { |
494
|
0
|
|
|
0
|
1
|
|
my($self) = @_; |
495
|
|
|
|
|
|
|
|
496
|
|
|
|
|
|
|
$self->{_prepend_key} = !$self->{_reducer_is_multikey_aware} |
497
|
0
|
0
|
|
|
|
|
if (!defined($self->{_prepend_key})); |
498
|
|
|
|
|
|
|
croak $self->{_prog} . ": cannot prepend keys for multikey-aware reducers.\n" |
499
|
0
|
0
|
0
|
|
|
|
if ($self->{_prepend_key} && $self->{_reducer_is_multikey_aware}); |
500
|
|
|
|
|
|
|
|
501
|
0
|
|
|
|
|
|
my $included_code = ''; |
502
|
|
|
|
|
|
|
# |
503
|
|
|
|
|
|
|
# get any extra code |
504
|
|
|
|
|
|
|
# |
505
|
0
|
|
|
|
|
|
foreach my $code_file (@{$self->{_code_files}}) { |
|
0
|
|
|
|
|
|
|
506
|
0
|
0
|
|
|
|
|
open(IN, "< $code_file") or croak $self->{_prog} . ": cannot read code from $code_file\n"; |
507
|
0
|
|
|
|
|
|
$included_code .= join('', ); |
508
|
0
|
|
|
|
|
|
close IN; |
509
|
|
|
|
|
|
|
}; |
510
|
|
|
|
|
|
|
|
511
|
|
|
|
|
|
|
# |
512
|
|
|
|
|
|
|
# control parallelism |
513
|
|
|
|
|
|
|
# |
514
|
0
|
|
0
|
|
|
|
$self->{_max_parallelism} //= Fsdb::Support::OS::max_parallelism(); |
515
|
0
|
0
|
|
|
|
|
$self->{_max_parallelism} = 1 if ($self->{_max_parallelism} < 1); # always allow some |
516
|
0
|
|
0
|
|
|
|
$self->{_parallelism_available} //= $self->{_max_parallelism}; |
517
|
|
|
|
|
|
|
|
518
|
|
|
|
|
|
|
# |
519
|
|
|
|
|
|
|
# what are we running? |
520
|
|
|
|
|
|
|
# |
521
|
|
|
|
|
|
|
# Figure it out, and generate a |
522
|
|
|
|
|
|
|
# $self->{_reducer_generator_sub} that creates a |
523
|
|
|
|
|
|
|
# filter object that will be passed to dbpipeline_open2 |
524
|
|
|
|
|
|
|
# to reduce one (or many, if map_aware_reducer) keys. |
525
|
|
|
|
|
|
|
# |
526
|
0
|
0
|
|
|
|
|
if ($#{$self->{_external_command_argv}} >= 0) { |
|
0
|
0
|
|
|
|
|
|
527
|
|
|
|
|
|
|
# external command |
528
|
0
|
|
|
|
|
|
my @argv = @{$self->{_external_command_argv}}; |
|
0
|
|
|
|
|
|
|
529
|
0
|
0
|
|
|
|
|
shift @argv if ($argv[0] eq '--'); |
530
|
0
|
|
|
|
|
|
my $empty = $self->{_empty}; |
531
|
0
|
|
|
|
|
|
my @pre_argv; |
532
|
0
|
0
|
|
|
|
|
push @pre_argv, ($self->{_warnings} ? '--warnings' : '--nowarnings'), |
533
|
|
|
|
|
|
|
'--nolog', '--'; |
534
|
0
|
|
|
|
|
|
my $reducer_generator_sub; |
535
|
0
|
0
|
|
|
|
|
if ($self->{_pass_current_key}) { |
536
|
|
|
|
|
|
|
$reducer_generator_sub = sub { |
537
|
0
|
|
0
|
0
|
|
|
return dbsubprocess(@pre_argv, @argv, $_[0] // $empty); |
538
|
0
|
|
|
|
|
|
}; |
539
|
|
|
|
|
|
|
} else { |
540
|
|
|
|
|
|
|
$reducer_generator_sub = sub { |
541
|
0
|
|
|
0
|
|
|
return dbsubprocess(@pre_argv, @argv); |
542
|
0
|
|
|
|
|
|
}; |
543
|
|
|
|
|
|
|
}; |
544
|
0
|
|
|
|
|
|
$self->{_reducer_generator_sub} = $reducer_generator_sub; |
545
|
0
|
0
|
|
|
|
|
print STDERR "# dbmapreduce/setup: external command is " . join(" ", @pre_argv, @argv) . "\n" if ($self->{_debug} > 2); |
546
|
|
|
|
|
|
|
} elsif (defined($self->{_filter_generator_code})) { |
547
|
0
|
|
|
|
|
|
my $reducer_generator_sub; |
548
|
0
|
0
|
|
|
|
|
if (ref($self->{_filter_generator_code}) eq 'CODE') { |
549
|
0
|
0
|
|
|
|
|
print STDERR "# dbmapreduce/setup: direct code assignment for reducer sub\n" if ($self->{_debug}); |
550
|
0
|
|
|
|
|
|
$reducer_generator_sub = $self->{_filter_generator_code}; |
551
|
|
|
|
|
|
|
} else { |
552
|
0
|
|
|
|
|
|
my $sub_code; |
553
|
|
|
|
|
|
|
$sub_code = |
554
|
|
|
|
|
|
|
"use Fsdb::Filter::dbpipeline qw(:all);\n" . |
555
|
|
|
|
|
|
|
$included_code . |
556
|
|
|
|
|
|
|
'$reducer_generator_sub = sub {' . "\n" . |
557
|
|
|
|
|
|
|
$self->{_filter_generator_code} . |
558
|
0
|
|
|
|
|
|
"\n\t;\n};\n"; |
559
|
0
|
0
|
|
|
|
|
print STDERR "# dbmapreduce/setup: sub_code: $sub_code" if ($self->{_debug}); |
560
|
0
|
|
|
|
|
|
eval $sub_code; |
561
|
0
|
0
|
|
|
|
|
$@ && croak $self->{_prog} . ": error evaluating user-provided reducer sub:\n$sub_code\nerror is: $@.\n"; |
562
|
|
|
|
|
|
|
}; |
563
|
0
|
|
|
|
|
|
$self->{_reducer_generator_sub} = $reducer_generator_sub; |
564
|
|
|
|
|
|
|
} else { |
565
|
0
|
|
|
|
|
|
croak $self->{_prog} . ": reducer not specified.\n"; |
566
|
|
|
|
|
|
|
}; |
567
|
|
|
|
|
|
|
|
568
|
|
|
|
|
|
|
# |
569
|
|
|
|
|
|
|
# do we need to group the keys for the user? |
570
|
|
|
|
|
|
|
# |
571
|
0
|
|
|
|
|
|
my($input_reader_aref) = (); |
572
|
0
|
|
0
|
|
|
|
my $raw_to_raw = ($#{$self->{_external_command_argv}} >= 0 && $self->{_reducer_is_multikey_aware}); |
573
|
0
|
0
|
|
|
|
|
if ($raw_to_raw) { |
574
|
|
|
|
|
|
|
# external and we're good? just hook it together |
575
|
|
|
|
|
|
|
# (test case: dbmapreduce_cat.cmd) |
576
|
0
|
|
|
|
|
|
$input_reader_aref = [-raw_fh => 1]; |
577
|
|
|
|
|
|
|
} else { |
578
|
0
|
|
|
|
|
|
push (@$input_reader_aref, -comment_handler => $self->create_tolerant_pass_comments_sub('_cat_writer')); |
579
|
|
|
|
|
|
|
}; |
580
|
0
|
0
|
|
|
|
|
push(@$input_reader_aref, -header => $self->{_header}) if (defined($self->{_header})); |
581
|
0
|
0
|
|
|
|
|
if ($self->{_pre_sorted}) { |
582
|
0
|
|
|
|
|
|
$self->finish_io_option('input', @$input_reader_aref); |
583
|
|
|
|
|
|
|
} else { |
584
|
|
|
|
|
|
|
# not pre-sorted, so do lexical sort |
585
|
0
|
0
|
|
|
|
|
my $sort_column = defined($self->{_key_column}) ? $self->{_key_column} : '0'; |
586
|
0
|
|
|
|
|
|
my(@sort_args) = ('--nolog'); |
587
|
0
|
0
|
|
|
|
|
push(@sort_args, '--header' => $self->{_header}) if (defined($self->{_header})); |
588
|
0
|
|
|
|
|
|
push(@sort_args, $sort_column); |
589
|
|
|
|
|
|
|
my($new_reader, $new_fred) = dbpipeline_filter( |
590
|
|
|
|
|
|
|
$self->{_input}, |
591
|
0
|
|
|
|
|
|
$input_reader_aref, |
592
|
|
|
|
|
|
|
dbsort(@sort_args)); |
593
|
0
|
|
|
|
|
|
$self->{_pre_sorted_input} = $self->{_input}; |
594
|
0
|
|
|
|
|
|
$self->{_in} = $new_reader; |
595
|
0
|
|
|
|
|
|
$self->{_sorter_fred} = $new_fred; |
596
|
|
|
|
|
|
|
# |
597
|
|
|
|
|
|
|
# We will join the sorter in finish(). |
598
|
|
|
|
|
|
|
# |
599
|
|
|
|
|
|
|
}; |
600
|
|
|
|
|
|
|
|
601
|
|
|
|
|
|
|
# |
602
|
|
|
|
|
|
|
# figure out key column's name, now that we've done setup |
603
|
|
|
|
|
|
|
# |
604
|
0
|
0
|
|
|
|
|
if ($raw_to_raw) { |
|
|
0
|
|
|
|
|
|
605
|
|
|
|
|
|
|
# raw, so no parsing input at all |
606
|
0
|
|
|
|
|
|
$self->{_key_coli} = undef; |
607
|
|
|
|
|
|
|
} elsif (defined($self->{_key_column})) { |
608
|
0
|
|
|
|
|
|
$self->{_key_coli} = $self->{_in}->col_to_i($self->{_key_column}); |
609
|
|
|
|
|
|
|
croak $self->{_prog} . ": key column " . $self->{_key_column} . " is not in input stream.\n" |
610
|
0
|
0
|
|
|
|
|
if (!defined($self->{_key_coli})); |
611
|
|
|
|
|
|
|
} else { |
612
|
|
|
|
|
|
|
# default to first column |
613
|
0
|
|
|
|
|
|
$self->{_key_coli} = 0; |
614
|
0
|
|
|
|
|
|
$self->{_key_column} = $self->{_in}->i_to_col(0); |
615
|
|
|
|
|
|
|
}; |
616
|
|
|
|
|
|
|
|
617
|
|
|
|
|
|
|
# |
618
|
|
|
|
|
|
|
# setup the postprocessing thread |
619
|
|
|
|
|
|
|
# |
620
|
0
|
|
|
|
|
|
$self->_setup_reducer(); |
621
|
|
|
|
|
|
|
|
622
|
0
|
|
|
|
|
|
$self->{_reducer_invocation_count} = 0; |
623
|
|
|
|
|
|
|
# $SIG{'PIPE'} = 'IGNORE'; |
624
|
|
|
|
|
|
|
} |
625
|
|
|
|
|
|
|
|
626
|
|
|
|
|
|
|
|
627
|
|
|
|
|
|
|
|
628
|
|
|
|
|
|
|
=head2 _setup_reducer |
629
|
|
|
|
|
|
|
|
630
|
|
|
|
|
|
|
_setup_reducer |
631
|
|
|
|
|
|
|
|
632
|
|
|
|
|
|
|
(internal) |
633
|
|
|
|
|
|
|
One Fred that runs the reducer and produces output. |
634
|
|
|
|
|
|
|
C<_reducer_queue> is sends the new key, |
635
|
|
|
|
|
|
|
then a Fsdb stream, then EOF (undef) |
636
|
|
|
|
|
|
|
for each group. |
637
|
|
|
|
|
|
|
We setup the output, suppress all but the first header, |
638
|
|
|
|
|
|
|
and add in the keys if necessary. |
639
|
|
|
|
|
|
|
|
640
|
|
|
|
|
|
|
=cut |
641
|
|
|
|
|
|
|
|
642
|
|
|
|
|
|
|
sub _setup_reducer() { |
643
|
0
|
|
|
0
|
|
|
my($self) = @_; |
644
|
|
|
|
|
|
|
|
645
|
0
|
0
|
|
|
|
|
if ($self->{_reducer_is_multikey_aware}) { |
646
|
|
|
|
|
|
|
# croak "case not yet handled--need to verify correct sort order\n" if ($self->{_pre_sorted} == 1); |
647
|
|
|
|
|
|
|
# No need to do input checking, |
648
|
|
|
|
|
|
|
# and reducer promises to handle whatever we give it, |
649
|
|
|
|
|
|
|
# and we assume it outputs the key, so |
650
|
|
|
|
|
|
|
# just start the reducer on our own input and run it here. |
651
|
0
|
|
|
|
|
|
my $reducer = &{$self->{_reducer_generator_sub}}(); |
|
0
|
|
|
|
|
|
|
652
|
|
|
|
|
|
|
$reducer->parse_options('--input' => $self->{_in}, |
653
|
|
|
|
|
|
|
'--output' => $self->{_output}, |
654
|
|
|
|
|
|
|
'--saveoutput' => \$self->{_out}, |
655
|
0
|
|
|
|
|
|
'--noclose'); |
656
|
0
|
|
|
|
|
|
$reducer->setup(); |
657
|
0
|
|
|
|
|
|
$self->{_multikey_aware_reducer} = $reducer; |
658
|
0
|
|
|
|
|
|
return; |
659
|
|
|
|
|
|
|
} else { |
660
|
|
|
|
|
|
|
# do nothing; we do our work below |
661
|
|
|
|
|
|
|
}; |
662
|
|
|
|
|
|
|
} |
663
|
|
|
|
|
|
|
|
664
|
|
|
|
|
|
|
=head2 _key_to_string |
665
|
|
|
|
|
|
|
|
666
|
|
|
|
|
|
|
$self->_key_to_string($key) |
667
|
|
|
|
|
|
|
|
668
|
|
|
|
|
|
|
Convert a key (maybe undef) to a string for status messages. |
669
|
|
|
|
|
|
|
|
670
|
|
|
|
|
|
|
=cut |
671
|
|
|
|
|
|
|
|
672
|
|
|
|
|
|
|
sub _key_to_string($$) { |
673
|
0
|
|
|
0
|
|
|
my($self, $key) = @_; |
674
|
0
|
0
|
|
|
|
|
return defined($key) ? $key : '(undef)'; |
675
|
|
|
|
|
|
|
} |
676
|
|
|
|
|
|
|
|
677
|
|
|
|
|
|
|
|
678
|
|
|
|
|
|
|
=head2 _open_new_key |
679
|
|
|
|
|
|
|
|
680
|
|
|
|
|
|
|
_open_new_key |
681
|
|
|
|
|
|
|
|
682
|
|
|
|
|
|
|
(internal) |
683
|
|
|
|
|
|
|
|
684
|
|
|
|
|
|
|
Note that new_key can be undef if there was no input. |
685
|
|
|
|
|
|
|
|
686
|
|
|
|
|
|
|
=cut |
687
|
|
|
|
|
|
|
|
688
|
|
|
|
|
|
|
sub _open_new_key { |
689
|
0
|
|
|
0
|
|
|
my($self, $new_key) = @_; |
690
|
|
|
|
|
|
|
|
691
|
0
|
0
|
|
|
|
|
print STDERR "# dbmapreduce: _open_new_key on " . $self->_key_to_string($new_key) . "\n" if ($self->{_debug} >= 2); |
692
|
|
|
|
|
|
|
|
693
|
0
|
|
|
|
|
|
$self->{_current_key} = $new_key; |
694
|
|
|
|
|
|
|
|
695
|
|
|
|
|
|
|
# If already running and can handle multiple tags, just keep going. |
696
|
0
|
0
|
|
|
|
|
die "internal error: no more multikey here\n" if ($self->{_reducer_is_multikey_aware}); |
697
|
|
|
|
|
|
|
|
698
|
|
|
|
|
|
|
# |
699
|
|
|
|
|
|
|
# make the reducer |
700
|
|
|
|
|
|
|
# |
701
|
0
|
|
|
|
|
|
my $output_file = Fsdb::Support::NamedTmpfile::alloc($self->{_tmpdir}); |
702
|
0
|
|
|
|
|
|
my @reducer_modules; |
703
|
0
|
|
|
|
|
|
push(@reducer_modules, &{$self->{_reducer_generator_sub}}($new_key)); |
|
0
|
|
|
|
|
|
|
704
|
0
|
0
|
|
|
|
|
if ($self->{_copy_fscode}) { |
705
|
0
|
|
|
|
|
|
push(@reducer_modules, dbfilealter('--nolog', '-F', $self->{_in}->fscode())); |
706
|
|
|
|
|
|
|
}; |
707
|
0
|
0
|
|
|
|
|
if ($self->{_prepend_key}) { |
708
|
|
|
|
|
|
|
# croak $self->{_prog} . ": no key_column\n" if (!defined($self->{_key_column})); |
709
|
0
|
|
0
|
|
|
|
push(@reducer_modules, dbcolcreate('--no-recreate-fatal', '--nolog', '--first', '-e', $new_key // $self->{_empty}, $self->{_key_column})); |
710
|
|
|
|
|
|
|
}; |
711
|
0
|
0
|
|
|
|
|
print STDERR "# reducer output to $output_file (in process $$)\n" if ($self->{_debug}); |
712
|
|
|
|
|
|
|
# $reducer_modules[$#reducer_modules]->parse_options('--output' => $output_file); |
713
|
0
|
|
|
|
|
|
unshift(@reducer_modules, '--output' => $output_file); |
714
|
0
|
|
|
|
|
|
my %work_queue_entry; |
715
|
0
|
|
|
|
|
|
$work_queue_entry{'status'} = 'running'; |
716
|
0
|
|
|
|
|
|
$work_queue_entry{'output'} = $output_file; |
717
|
0
|
|
|
|
|
|
my $debug = $self->{_debug}; |
718
|
|
|
|
|
|
|
my($to_reducer_writer, $reducer_fred) = dbpipeline_sink([-clone => $self->{_in}], |
719
|
|
|
|
|
|
|
'--fred_description' => 'dbmapreduce:dbpipeline_sink(to_reducer)', |
720
|
|
|
|
|
|
|
'--fred_exit_sub' => sub { |
721
|
0
|
|
|
0
|
|
|
$work_queue_entry{'status'} = 'done'; |
722
|
0
|
0
|
|
|
|
|
print STDERR "# dbmapreduce:reducer: output $output_file\n" if ($debug); |
723
|
0
|
0
|
|
|
|
|
print STDERR "# dbmapreduce:reducer: zero size $output_file\n" if (-z $output_file); |
724
|
0
|
|
|
|
|
|
}, @reducer_modules); |
725
|
0
|
|
|
|
|
|
$work_queue_entry{'fred'} = $reducer_fred; |
726
|
|
|
|
|
|
|
|
727
|
0
|
|
|
|
|
|
$self->{_to_reducer_writer} = $to_reducer_writer; |
728
|
0
|
|
|
|
|
|
$self->{_current_reducer_fastpath_sub} = $to_reducer_writer->fastpath_sub(); |
729
|
0
|
|
|
|
|
|
push (@{$self->{_work_queue}}, \%work_queue_entry); |
|
0
|
|
|
|
|
|
|
730
|
|
|
|
|
|
|
} |
731
|
|
|
|
|
|
|
|
732
|
|
|
|
|
|
|
=head2 _close_old_key |
733
|
|
|
|
|
|
|
|
734
|
|
|
|
|
|
|
_close_old_key |
735
|
|
|
|
|
|
|
|
736
|
|
|
|
|
|
|
Internal, finish a key. |
737
|
|
|
|
|
|
|
|
738
|
|
|
|
|
|
|
=cut |
739
|
|
|
|
|
|
|
|
740
|
|
|
|
|
|
|
sub _close_old_key { |
741
|
0
|
|
|
0
|
|
|
my($self, $key, $final) = @_; |
742
|
|
|
|
|
|
|
|
743
|
0
|
0
|
|
|
|
|
print STDERR "# dbmapreduce: _close_old_key on " . $self->_key_to_string($key) . "\n" if ($self->{_debug} >= 2); |
744
|
|
|
|
|
|
|
|
745
|
0
|
0
|
|
|
|
|
if (!defined($key)) { |
746
|
0
|
0
|
|
|
|
|
croak $self->{_prog} . ": internal error: _close_old_key called on non-final null-key.\n" |
747
|
|
|
|
|
|
|
if (!$final); |
748
|
|
|
|
|
|
|
}; |
749
|
0
|
0
|
|
|
|
|
die "internal error: no more multikey here\n" if ($self->{_reducer_is_multikey_aware}); |
750
|
|
|
|
|
|
|
|
751
|
|
|
|
|
|
|
croak $self->{_prog} . ": internal error: current key doesn't equal prior key " . $self->_key_to_string($self->{_current_key}) . " != key " . $self->_key_to_string($key) . "\n" |
752
|
0
|
0
|
0
|
|
|
|
if (defined($key) && $self->{_current_key} ne $key); |
753
|
|
|
|
|
|
|
# finish the reducer |
754
|
0
|
0
|
0
|
|
|
|
print STDERR "# dbmapreduce: _close_old_key closing reducer (" . ($key // "null key") . ")\n" if ($self->{_debug} >= 2); |
755
|
0
|
|
|
|
|
|
$self->{_to_reducer_writer}->close; |
756
|
|
|
|
|
|
|
} |
757
|
|
|
|
|
|
|
|
758
|
|
|
|
|
|
|
=head2 _check_finished_reducers |
759
|
|
|
|
|
|
|
|
760
|
|
|
|
|
|
|
$self->_check_finished_reducers($force); |
761
|
|
|
|
|
|
|
|
762
|
|
|
|
|
|
|
Internal: see if any reducer freds finished, optionally $FORCE-ing |
763
|
|
|
|
|
|
|
all to finish. |
764
|
|
|
|
|
|
|
|
765
|
|
|
|
|
|
|
This routine also enforces a maximum amount of parallelism, blocking us when we have too |
766
|
|
|
|
|
|
|
many reducers running. |
767
|
|
|
|
|
|
|
|
768
|
|
|
|
|
|
|
=cut |
769
|
|
|
|
|
|
|
|
770
|
|
|
|
|
|
|
sub _check_finished_reducers($$) { |
771
|
0
|
|
|
0
|
|
|
my($self, $force) = @_; |
772
|
|
|
|
|
|
|
|
773
|
0
|
0
|
|
|
|
|
my $force_status = ($force ? "forced" : "optional"); |
774
|
0
|
|
|
|
|
|
my $backlog = $#{$self->{_work_queue}} + 1; |
|
0
|
|
|
|
|
|
|
775
|
0
|
0
|
|
|
|
|
$self->{_cat_writer}->write_rowobj("# dbmapreduce: test_parallelism backlog $backlog, max $self->{_max_parallelism}\n") if ($self->{_test_parallelism}); |
776
|
0
|
0
|
|
|
|
|
if ($backlog >= $self->{_max_parallelism}) { |
777
|
0
|
|
|
|
|
|
$force = 2; |
778
|
0
|
|
|
|
|
|
$force_status = "backlog-forced"; |
779
|
|
|
|
|
|
|
} |
780
|
|
|
|
|
|
|
|
781
|
0
|
0
|
|
|
|
|
print STDERR "# dbmerge:_check_finished_reducers: $force_status\n" if ($self->{_debug}); |
782
|
0
|
|
|
|
|
|
for(;;) { |
783
|
0
|
|
|
|
|
|
my $fred_or_code = Fsdb::Support::Freds::join_any(); |
784
|
0
|
0
|
|
|
|
|
last if (ref($fred_or_code) eq ''); |
785
|
0
|
0
|
|
|
|
|
croak "dbmapreduce: reducer failed\n" |
786
|
|
|
|
|
|
|
if ($fred_or_code->exit_code() != 0); |
787
|
0
|
0
|
|
|
|
|
print STDERR "# dbmerge:_check_finished_reducers: merged fred " . $fred_or_code->info() . "\n" if ($self->{_debug}); |
788
|
|
|
|
|
|
|
}; |
789
|
|
|
|
|
|
|
# |
790
|
|
|
|
|
|
|
# Reducers finish-sub has adjusted the work queue. |
791
|
|
|
|
|
|
|
# Try to push out output. |
792
|
|
|
|
|
|
|
# Be forceful (and block) if required. |
793
|
|
|
|
|
|
|
# |
794
|
0
|
|
|
|
|
|
while ($#{$self->{_work_queue}} >= 0) { |
|
0
|
|
|
|
|
|
|
795
|
0
|
|
|
|
|
|
my $work_queue_href = $self->{_work_queue}->[0]; |
796
|
0
|
0
|
|
|
|
|
if ($force) { |
797
|
0
|
|
|
|
|
|
my $fred = $work_queue_href->{fred}; |
798
|
0
|
0
|
|
|
|
|
print STDERR "# dbmerge:_check_finished_reducers: blocking on pending fred " . $fred->info() . "\n" if ($self->{_debug}); |
799
|
0
|
|
|
|
|
|
my $exit_code = $fred->join(); |
800
|
0
|
0
|
|
|
|
|
croak "dbmapreduce: reducer " . $fred->info() . " failed, exit $exit_code\n" if ($exit_code != 0); |
801
|
|
|
|
|
|
|
croak "dbmapreduce: reducer didn't leave status done\n" |
802
|
0
|
0
|
|
|
|
|
if ($work_queue_href->{status} ne 'done'); |
803
|
|
|
|
|
|
|
}; |
804
|
0
|
0
|
|
|
|
|
if ($work_queue_href->{status} ne 'done') { |
805
|
0
|
0
|
|
|
|
|
croak $self->{_prog} . ": internal error, reducer refused to complete\n" if ($force); |
806
|
0
|
|
|
|
|
|
last; |
807
|
|
|
|
|
|
|
}; |
808
|
|
|
|
|
|
|
# this one is done, send it to output |
809
|
0
|
|
|
|
|
|
my $output = $work_queue_href->{output}; |
810
|
0
|
0
|
|
|
|
|
print STDERR "# dbmerge->_check_finished_reducers: done with output $output\n" if ($self->{_debug}); |
811
|
0
|
|
|
|
|
|
$self->{_cat_writer}->write_rowobj([$output]); |
812
|
0
|
|
|
|
|
|
shift(@{$self->{_work_queue}}); |
|
0
|
|
|
|
|
|
|
813
|
|
|
|
|
|
|
}; |
814
|
|
|
|
|
|
|
} |
815
|
|
|
|
|
|
|
|
816
|
|
|
|
|
|
|
|
817
|
|
|
|
|
|
|
|
818
|
|
|
|
|
|
|
=head2 _mapper_run |
819
|
|
|
|
|
|
|
|
820
|
|
|
|
|
|
|
$filter->_mapper_run(); |
821
|
|
|
|
|
|
|
|
822
|
|
|
|
|
|
|
Internal: run over each rows, grouping them. |
823
|
|
|
|
|
|
|
Fork off reducer as necessary. |
824
|
|
|
|
|
|
|
|
825
|
|
|
|
|
|
|
=cut |
826
|
|
|
|
|
|
|
sub _mapper_run($) { |
827
|
0
|
|
|
0
|
|
|
my($self) = @_; |
828
|
|
|
|
|
|
|
|
829
|
0
|
|
|
|
|
|
$self->{_work_queue} = []; # track running reducers |
830
|
0
|
|
|
|
|
|
my $read_fastpath_sub = $self->{_in}->fastpath_sub(); |
831
|
0
|
|
|
|
|
|
my $reducer_fastpath_sub = undef; |
832
|
|
|
|
|
|
|
|
833
|
|
|
|
|
|
|
# |
834
|
|
|
|
|
|
|
# output merger |
835
|
|
|
|
|
|
|
# |
836
|
0
|
0
|
|
|
|
|
print STDERR "# opening dbfilecat\n" if ($self->{_debug}); |
837
|
0
|
|
|
|
|
|
my(@writer_args) = (-cols => [qw(filename)], -outputheader => 'never', -autoflush => 1); |
838
|
|
|
|
|
|
|
my($cat_writer, $cat_fred) = dbpipeline_sink(\@writer_args, |
839
|
|
|
|
|
|
|
'--fred_description' => 'dbmapreduce:dbpipeline_sink(cat_writer)', |
840
|
|
|
|
|
|
|
'--output' => $self->{_output}, |
841
|
0
|
|
|
|
|
|
dbfilecat(qw(--nolog --xargs --removeinputs))); |
842
|
0
|
0
|
|
|
|
|
croak $self->{_prog} . ": cannot invoke dbfilecat.\n" |
843
|
|
|
|
|
|
|
if ($cat_writer->error); |
844
|
0
|
|
|
|
|
|
$self->{_cat_writer} = $cat_writer; |
845
|
0
|
|
|
|
|
|
$self->{_cat_fred} = $cat_fred; |
846
|
|
|
|
|
|
|
|
847
|
|
|
|
|
|
|
# read data |
848
|
0
|
|
|
|
|
|
my($last_key) = undef; |
849
|
0
|
|
|
|
|
|
my $fref; |
850
|
0
|
|
|
|
|
|
my $key_coli = $self->{_key_coli}; |
851
|
0
|
|
|
|
|
|
$self->{_key_counts} = {}; |
852
|
0
|
|
|
|
|
|
my $nrows = 0; |
853
|
0
|
|
|
|
|
|
my $debug = $self->{_debug}; |
854
|
0
|
|
|
|
|
|
while ($fref = &$read_fastpath_sub()) { |
855
|
|
|
|
|
|
|
# print STDERR "data line: " . join(" ", @$fref) . "\n"; |
856
|
0
|
|
|
|
|
|
my($key) = $fref->[$key_coli]; |
857
|
|
|
|
|
|
|
|
858
|
0
|
0
|
0
|
|
|
|
if (!defined($last_key) || $key ne $last_key) { |
859
|
|
|
|
|
|
|
# start a new one |
860
|
|
|
|
|
|
|
# check for out-of-order duplicates |
861
|
0
|
0
|
|
|
|
|
if ($self->{_pre_sorted} == 1) { |
862
|
|
|
|
|
|
|
croak $self->{_prog} . ": single key ``$key'' split into multiple groups, selection of -S was invalid\n" |
863
|
0
|
0
|
|
|
|
|
if (defined($self->{_key_counts}{$key})); |
864
|
0
|
|
|
|
|
|
$self->{_key_counts}{$key} = 1; |
865
|
|
|
|
|
|
|
}; |
866
|
|
|
|
|
|
|
# finish off old one? |
867
|
0
|
0
|
|
|
|
|
if (defined($last_key)) { |
868
|
0
|
|
|
|
|
|
$self->_close_old_key($last_key); |
869
|
0
|
|
|
|
|
|
$self->_check_finished_reducers(0); |
870
|
|
|
|
|
|
|
}; |
871
|
0
|
|
|
|
|
|
$self->_open_new_key($key); |
872
|
0
|
|
|
|
|
|
$last_key = $key; |
873
|
0
|
|
|
|
|
|
$reducer_fastpath_sub = $self->{_current_reducer_fastpath_sub}; |
874
|
0
|
0
|
|
|
|
|
die "no reducer\n" if (!defined($reducer_fastpath_sub)); |
875
|
|
|
|
|
|
|
}; |
876
|
|
|
|
|
|
|
# pass the data to be reduced |
877
|
0
|
|
|
|
|
|
&{$reducer_fastpath_sub}($fref); |
|
0
|
|
|
|
|
|
|
878
|
|
|
|
|
|
|
}; |
879
|
|
|
|
|
|
|
|
880
|
0
|
0
|
|
|
|
|
if (!defined($last_key)) { |
881
|
|
|
|
|
|
|
# no input data, so write a single null key |
882
|
0
|
|
|
|
|
|
$self->_open_new_key(undef); |
883
|
|
|
|
|
|
|
}; |
884
|
|
|
|
|
|
|
|
885
|
|
|
|
|
|
|
# print STDERR "done with input, last_key=$last_key\n"; |
886
|
|
|
|
|
|
|
# close out any pending processing? (use the force option) |
887
|
0
|
|
|
|
|
|
$self->_close_old_key($last_key, 1); |
888
|
0
|
|
|
|
|
|
$self->_check_finished_reducers(1); |
889
|
|
|
|
|
|
|
# will clean up cat_writer in finish |
890
|
|
|
|
|
|
|
} |
891
|
|
|
|
|
|
|
|
892
|
|
|
|
|
|
|
|
893
|
|
|
|
|
|
|
=head2 run |
894
|
|
|
|
|
|
|
|
895
|
|
|
|
|
|
|
$filter->run(); |
896
|
|
|
|
|
|
|
|
897
|
|
|
|
|
|
|
Internal: run over each rows. |
898
|
|
|
|
|
|
|
|
899
|
|
|
|
|
|
|
=cut |
900
|
|
|
|
|
|
|
sub run($) { |
901
|
0
|
|
|
0
|
1
|
|
my($self) = @_; |
902
|
|
|
|
|
|
|
|
903
|
0
|
0
|
|
|
|
|
if ($self->{_reducer_is_multikey_aware}) { |
904
|
0
|
|
|
|
|
|
$self->{_multikey_aware_reducer}->run(); |
905
|
|
|
|
|
|
|
} else { |
906
|
0
|
|
|
|
|
|
$self->_mapper_run(); |
907
|
|
|
|
|
|
|
}; |
908
|
|
|
|
|
|
|
} |
909
|
|
|
|
|
|
|
|
910
|
|
|
|
|
|
|
=head2 finish |
911
|
|
|
|
|
|
|
|
912
|
|
|
|
|
|
|
$filter->finish(); |
913
|
|
|
|
|
|
|
|
914
|
|
|
|
|
|
|
Internal: write trailer. |
915
|
|
|
|
|
|
|
|
916
|
|
|
|
|
|
|
=cut |
917
|
|
|
|
|
|
|
|
918
|
|
|
|
|
|
|
sub finish($) { |
919
|
0
|
|
|
0
|
1
|
|
my($self) = @_; |
920
|
|
|
|
|
|
|
|
921
|
|
|
|
|
|
|
# |
922
|
|
|
|
|
|
|
# Join any pending Freds. |
923
|
|
|
|
|
|
|
# |
924
|
0
|
0
|
|
|
|
|
if ($self->{_sorter_fred}) { |
925
|
|
|
|
|
|
|
print STDERR "# mapreduce main: join sorter\n" |
926
|
0
|
0
|
|
|
|
|
if ($self->{_debug}); |
927
|
0
|
|
|
|
|
|
$self->{_sorter_fred}->join(); |
928
|
|
|
|
|
|
|
croak $self->{_prog} . ": input sorter failed: " . $self->{_sorter_fred}->error() |
929
|
0
|
0
|
|
|
|
|
if ($self->{_sorter_fred}->error()); |
930
|
|
|
|
|
|
|
}; |
931
|
|
|
|
|
|
|
|
932
|
0
|
0
|
|
|
|
|
if ($self->{_reducer_is_multikey_aware}) { |
933
|
0
|
|
|
|
|
|
$self->{_multikey_aware_reducer}->finish(); |
934
|
|
|
|
|
|
|
# output our log message, in-line |
935
|
0
|
|
|
|
|
|
$self->SUPER::finish(); |
936
|
|
|
|
|
|
|
} else { |
937
|
|
|
|
|
|
|
# output log message by sending it all to cat_writer (a hack) |
938
|
0
|
|
|
|
|
|
$self->{_out} = $self->{_cat_writer}; |
939
|
0
|
|
|
|
|
|
$self->SUPER::finish(); # will close it |
940
|
0
|
|
|
|
|
|
$self->{_cat_writer}->close; |
941
|
|
|
|
|
|
|
print STDERR "# mapreduce main: join dbfilecat\n" |
942
|
0
|
0
|
|
|
|
|
if ($self->{_debug}); |
943
|
0
|
|
|
|
|
|
$self->{_cat_fred}->join(); |
944
|
0
|
0
|
|
|
|
|
if (my $error = $self->{_cat_fred}->error()) { |
945
|
0
|
|
|
|
|
|
croak $self->{_prog} . ": dbfilecat erred: $error"; |
946
|
|
|
|
|
|
|
}; |
947
|
|
|
|
|
|
|
}; |
948
|
|
|
|
|
|
|
} |
949
|
|
|
|
|
|
|
|
950
|
|
|
|
|
|
|
|
951
|
|
|
|
|
|
|
=head1 AUTHOR and COPYRIGHT |
952
|
|
|
|
|
|
|
|
953
|
|
|
|
|
|
|
Copyright (C) 1991-2016 by John Heidemann |
954
|
|
|
|
|
|
|
|
955
|
|
|
|
|
|
|
This program is distributed under terms of the GNU general |
956
|
|
|
|
|
|
|
public license, version 2. See the file COPYING |
957
|
|
|
|
|
|
|
with the distribution for details. |
958
|
|
|
|
|
|
|
|
959
|
|
|
|
|
|
|
=cut |
960
|
|
|
|
|
|
|
|
961
|
|
|
|
|
|
|
1; |