line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
#!/usr/bin/perl -w |
2
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
# |
4
|
|
|
|
|
|
|
# dbsort.pm |
5
|
|
|
|
|
|
|
# Copyright (C) 1991-2016 by John Heidemann |
6
|
|
|
|
|
|
|
# |
7
|
|
|
|
|
|
|
# This program is distributed under terms of the GNU general |
8
|
|
|
|
|
|
|
# public license, version 2. See the file COPYING |
9
|
|
|
|
|
|
|
# in $dblibdir for details. |
10
|
|
|
|
|
|
|
# |
11
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
package Fsdb::Filter::dbsort; |
13
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
=head1 NAME |
15
|
|
|
|
|
|
|
|
16
|
|
|
|
|
|
|
dbsort - sort rows based on the specified columns |
17
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
=head1 SYNOPSIS |
19
|
|
|
|
|
|
|
|
20
|
|
|
|
|
|
|
dbsort [-M MemLimit] [-T TemporaryDirectory] [-nNrR] column [column...] |
21
|
|
|
|
|
|
|
|
22
|
|
|
|
|
|
|
=head1 DESCRIPTION |
23
|
|
|
|
|
|
|
|
24
|
|
|
|
|
|
|
Sort all input rows as specified by the numeric or lexical columns. |
25
|
|
|
|
|
|
|
|
26
|
|
|
|
|
|
|
Dbsort consumes a fixed amount of memory regardless of input size. |
27
|
|
|
|
|
|
|
(It reverts to temporary files on disk if necessary, based on the -M |
28
|
|
|
|
|
|
|
and -T options.) |
29
|
|
|
|
|
|
|
|
30
|
|
|
|
|
|
|
The sort should be stable, but this has not yet been verified. |
31
|
|
|
|
|
|
|
|
32
|
|
|
|
|
|
|
For large inputs (those that spill to disk), |
33
|
|
|
|
|
|
|
dbsort will do some of the merging in parallel, if possible. |
34
|
|
|
|
|
|
|
The B<--parallelism> option can control the degree of parallelism, |
35
|
|
|
|
|
|
|
if desired. |
36
|
|
|
|
|
|
|
|
37
|
|
|
|
|
|
|
=head1 OPTIONS |
38
|
|
|
|
|
|
|
|
39
|
|
|
|
|
|
|
General option: |
40
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
=over 4 |
42
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
=item B<-M MaxMemBytes> |
44
|
|
|
|
|
|
|
|
45
|
|
|
|
|
|
|
Specify an approximate limit on memory usage (in bytes). |
46
|
|
|
|
|
|
|
Larger values allow faster sorting because more operations |
47
|
|
|
|
|
|
|
happen in-memory, provided you have enough memory. |
48
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
=item B<-T TmpDir> |
50
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
where to put tmp files. |
52
|
|
|
|
|
|
|
Also uses environment variable TMPDIR, if -T is |
53
|
|
|
|
|
|
|
not specified. |
54
|
|
|
|
|
|
|
Default is /tmp. |
55
|
|
|
|
|
|
|
|
56
|
|
|
|
|
|
|
=item B<--parallelism N> or B<-j N> |
57
|
|
|
|
|
|
|
|
58
|
|
|
|
|
|
|
Allow up to N merges to happen in parallel. |
59
|
|
|
|
|
|
|
Default is the number of CPUs in the machine. |
60
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
=back |
62
|
|
|
|
|
|
|
|
63
|
|
|
|
|
|
|
Sort specification options (can be interspersed with column names): |
64
|
|
|
|
|
|
|
|
65
|
|
|
|
|
|
|
=over 4 |
66
|
|
|
|
|
|
|
|
67
|
|
|
|
|
|
|
=item B<-r> or B<--descending> |
68
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
sort in reverse order (high to low) |
70
|
|
|
|
|
|
|
|
71
|
|
|
|
|
|
|
=item B<-R> or B<--ascending> |
72
|
|
|
|
|
|
|
|
73
|
|
|
|
|
|
|
sort in normal order (low to high) |
74
|
|
|
|
|
|
|
|
75
|
|
|
|
|
|
|
=item B<-n> or B<--numeric> |
76
|
|
|
|
|
|
|
|
77
|
|
|
|
|
|
|
sort numerically |
78
|
|
|
|
|
|
|
|
79
|
|
|
|
|
|
|
=item B<-N> or B<--lexical> |
80
|
|
|
|
|
|
|
|
81
|
|
|
|
|
|
|
sort lexicographically |
82
|
|
|
|
|
|
|
|
83
|
|
|
|
|
|
|
=back |
84
|
|
|
|
|
|
|
|
85
|
|
|
|
|
|
|
=for comment |
86
|
|
|
|
|
|
|
begin_standard_fsdb_options |
87
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
This module also supports the standard fsdb options: |
89
|
|
|
|
|
|
|
|
90
|
|
|
|
|
|
|
=over 4 |
91
|
|
|
|
|
|
|
|
92
|
|
|
|
|
|
|
=item B<-d> |
93
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
Enable debugging output. |
95
|
|
|
|
|
|
|
|
96
|
|
|
|
|
|
|
=item B<-i> or B<--input> InputSource |
97
|
|
|
|
|
|
|
|
98
|
|
|
|
|
|
|
Read from InputSource, typically a file name, or C<-> for standard input, |
99
|
|
|
|
|
|
|
or (if in Perl) an IO::Handle, Fsdb::IO or Fsdb::BoundedQueue object. |
100
|
|
|
|
|
|
|
|
101
|
|
|
|
|
|
|
=item B<-o> or B<--output> OutputDestination |
102
|
|
|
|
|
|
|
|
103
|
|
|
|
|
|
|
Write to OutputDestination, typically a file name, or C<-> for standard output, |
104
|
|
|
|
|
|
|
or (if in Perl) an IO::Handle, Fsdb::IO or Fsdb::BoundedQueue object. |
105
|
|
|
|
|
|
|
|
106
|
|
|
|
|
|
|
=item B<--autorun> or B<--noautorun> |
107
|
|
|
|
|
|
|
|
108
|
|
|
|
|
|
|
By default, programs process automatically, |
109
|
|
|
|
|
|
|
but Fsdb::Filter objects in Perl do not run until you invoke |
110
|
|
|
|
|
|
|
the run() method. |
111
|
|
|
|
|
|
|
The C<--(no)autorun> option controls that behavior within Perl. |
112
|
|
|
|
|
|
|
|
113
|
|
|
|
|
|
|
=item B<--header H> |
114
|
|
|
|
|
|
|
|
115
|
|
|
|
|
|
|
Use H as the full Fsdb header, rather than reading a header from |
116
|
|
|
|
|
|
|
the input. |
117
|
|
|
|
|
|
|
|
118
|
|
|
|
|
|
|
=item B<--help> |
119
|
|
|
|
|
|
|
|
120
|
|
|
|
|
|
|
Show help. |
121
|
|
|
|
|
|
|
|
122
|
|
|
|
|
|
|
=item B<--man> |
123
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
Show full manual. |
125
|
|
|
|
|
|
|
|
126
|
|
|
|
|
|
|
=back |
127
|
|
|
|
|
|
|
|
128
|
|
|
|
|
|
|
=for comment |
129
|
|
|
|
|
|
|
end_standard_fsdb_options |
130
|
|
|
|
|
|
|
|
131
|
|
|
|
|
|
|
|
132
|
|
|
|
|
|
|
=head1 SAMPLE USAGE |
133
|
|
|
|
|
|
|
|
134
|
|
|
|
|
|
|
=head2 Input: |
135
|
|
|
|
|
|
|
|
136
|
|
|
|
|
|
|
#fsdb cid cname |
137
|
|
|
|
|
|
|
10 pascal |
138
|
|
|
|
|
|
|
11 numanal |
139
|
|
|
|
|
|
|
12 os |
140
|
|
|
|
|
|
|
|
141
|
|
|
|
|
|
|
=head2 Command: |
142
|
|
|
|
|
|
|
|
143
|
|
|
|
|
|
|
cat data.fsdb | dbsort cname |
144
|
|
|
|
|
|
|
|
145
|
|
|
|
|
|
|
=head2 Output: |
146
|
|
|
|
|
|
|
|
147
|
|
|
|
|
|
|
#fsdb cid cname |
148
|
|
|
|
|
|
|
11 numanal |
149
|
|
|
|
|
|
|
12 os |
150
|
|
|
|
|
|
|
10 pascal |
151
|
|
|
|
|
|
|
# | dbsort cname |
152
|
|
|
|
|
|
|
|
153
|
|
|
|
|
|
|
=head1 SEE ALSO |
154
|
|
|
|
|
|
|
|
155
|
|
|
|
|
|
|
L, |
156
|
|
|
|
|
|
|
L, |
157
|
|
|
|
|
|
|
L |
158
|
|
|
|
|
|
|
|
159
|
|
|
|
|
|
|
=head1 CLASS FUNCTIONS |
160
|
|
|
|
|
|
|
|
161
|
|
|
|
|
|
|
=cut |
162
|
|
|
|
|
|
|
|
163
|
|
|
|
|
|
|
|
164
|
|
|
|
|
|
|
# This package inherits from Fsdb::Filter.
# Both package globals are assigned before the "use strict" that follows.
@ISA = qw(Fsdb::Filter); |
165
|
|
|
|
|
|
|
($VERSION) = 2.0; |
166
|
|
|
|
|
|
|
|
167
|
1
|
|
|
1
|
|
9490
|
use strict; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
38
|
|
168
|
1
|
|
|
1
|
|
7
|
use Pod::Usage; |
|
1
|
|
|
|
|
5
|
|
|
1
|
|
|
|
|
129
|
|
169
|
1
|
|
|
1
|
|
8
|
use Carp; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
66
|
|
170
|
|
|
|
|
|
|
|
171
|
1
|
|
|
1
|
|
8
|
use Fsdb::IO::Reader; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
27
|
|
172
|
1
|
|
|
1
|
|
6
|
use Fsdb::IO::Writer; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
25
|
|
173
|
1
|
|
|
1
|
|
7
|
use Fsdb::Filter; |
|
1
|
|
|
|
|
93
|
|
|
1
|
|
|
|
|
30
|
|
174
|
1
|
|
|
1
|
|
7
|
use Fsdb::Support::NamedTmpfile; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
34
|
|
175
|
1
|
|
|
1
|
|
499
|
use Fsdb::Filter::dbmerge; |
|
1
|
|
|
|
|
4
|
|
|
1
|
|
|
|
|
46
|
|
176
|
1
|
|
|
1
|
|
515
|
use Fsdb::Filter::dbpipeline qw(dbpipeline_sink dbmerge); |
|
1
|
|
|
|
|
6
|
|
|
1
|
|
|
|
|
3144
|
|
177
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
my($PERL_MEM_OVERHEAD) = 50; # approx. bytes of overhead for each record in memory |
179
|
|
|
|
|
|
|
my($PERL_MEM_SCALING) = 2; # divide the user-requested memory by this factor to account for perl memory usage (huge approximation) |
180
|
|
|
|
|
|
|
|
181
|
|
|
|
|
|
|
|
182
|
|
|
|
|
|
|
=head2 new |
183
|
|
|
|
|
|
|
|
184
|
|
|
|
|
|
|
$filter = new Fsdb::Filter::dbsort(@arguments); |
185
|
|
|
|
|
|
|
|
186
|
|
|
|
|
|
|
Create a new object, taking command-line arguments. |
187
|
|
|
|
|
|
|
|
188
|
|
|
|
|
|
|
=cut |
189
|
|
|
|
|
|
|
|
190
|
|
|
|
|
|
|
# Constructor.  The parent class builds the object; this class then installs
# its own defaults, parses the caller's arguments, and finally runs the
# parent's post-construction hook.
sub new ($@) {
    my ($class, @arguments) = @_;

    my $self = $class->SUPER::new(@arguments);
    bless $self, $class;

    $self->set_defaults;
    $self->parse_options(@arguments);
    $self->SUPER::post_new();

    return $self;
}
199
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
=head2 set_defaults |
202
|
|
|
|
|
|
|
|
203
|
|
|
|
|
|
|
$filter->set_defaults(); |
204
|
|
|
|
|
|
|
|
205
|
|
|
|
|
|
|
Internal: set up defaults. |
206
|
|
|
|
|
|
|
|
207
|
|
|
|
|
|
|
=cut |
208
|
|
|
|
|
|
|
|
209
|
|
|
|
|
|
|
# Install default option values: the parent's defaults first, then the
# sort-specific ones (256 MiB memory limit, empty sort-key list, default
# temporary directory, no parallelism limit, no header override).
sub set_defaults ($) {
    my ($self) = @_;

    $self->SUPER::set_defaults();

    # Assigned as one hash slice; same keys and values as individual stores.
    @{$self}{qw(_max_memory _mem_debug _sort_argv)} =
        (1024 * 1024 * 256, undef, []);

    $self->set_default_tmpdir;

    $self->{_max_parallelism} = undef;
    $self->{_header} = undef;
}
219
|
|
|
|
|
|
|
|
220
|
|
|
|
|
|
|
=head2 parse_options |
221
|
|
|
|
|
|
|
|
222
|
|
|
|
|
|
|
$filter->parse_options(@ARGV); |
223
|
|
|
|
|
|
|
|
224
|
|
|
|
|
|
|
Internal: parse command-line arguments. |
225
|
|
|
|
|
|
|
|
226
|
|
|
|
|
|
|
=cut |
227
|
|
|
|
|
|
|
|
228
|
|
|
|
|
|
|
# Parse command-line style arguments into the object's option fields.
# Sort-key flags (-n/-N/-r/-R) and bare column names are all forwarded to
# parse_sort_option.  Calls pod2usage(2) if option parsing fails, and croaks
# if any argument is left unconsumed.
sub parse_options ($@) {
    my ($self, @argv) = @_;

    # One shared handler for the four sort-key flags; the option name and
    # value are passed straight through.
    my $sort_flag = sub { $self->parse_sort_option(@_); };

    my %spec = (
        'help|?'             => sub { pod2usage(1); },
        'man'                => sub { pod2usage(-verbose => 2); },
        'autorun!'           => \$self->{_autorun},
        'close!'             => \$self->{_close},
        'd|debug+'           => \$self->{_debug},
        'header=s'           => \$self->{_header},
        'i|input=s'          => sub { $self->parse_io_option('input', @_); },
        'j|parallelism=i'    => \$self->{_max_parallelism},
        'log!'               => \$self->{_logprog},
        'M|maxmemory=i'      => \$self->{_max_memory},
        'o|output=s'         => sub { $self->parse_io_option('output', @_); },
        'T|tmpdir|tempdir=s' => \$self->{_tmpdir},
        # sort key options:
        'n|numeric'          => $sort_flag,
        'N|lexical'          => $sort_flag,
        'r|descending'       => $sort_flag,
        'R|ascending'        => $sort_flag,
        # bare words (column names) are sort options too
        '<>'                 => sub { $self->parse_sort_option('<>', @_); },
    );

    $self->get_options(\@argv, %spec) or pod2usage(2);

    croak $self->{_prog} . ": internal error, extra arguments.\n"
        if (scalar(@argv) > 0);
}
256
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
|
258
|
|
|
|
|
|
|
=head2 setup |
259
|
|
|
|
|
|
|
|
260
|
|
|
|
|
|
|
$filter->setup(); |
261
|
|
|
|
|
|
|
|
262
|
|
|
|
|
|
|
Internal: setup, parse headers. |
263
|
|
|
|
|
|
|
|
264
|
|
|
|
|
|
|
=cut |
265
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
# Prepare the filter for running: verify that a sort key was given, open the
# input, and compile the row-comparison subroutine.
#
# Croaks when no sort arguments were supplied, when create_compare_code
# yields no code, or when the generated comparison code fails to compile.
sub setup($) {
    my ($self) = @_;

    # _sort_argv holds an array reference, so emptiness must be tested on
    # the array's last index.  Comparing the reference itself with -1 can
    # never be true and left this check dead.
    croak $self->{_prog} . ": no sorting key specified.\n"
        if ($#{$self->{_sort_argv}} == -1);

    #
    # setup final IO
    #
    my (@finish_args) = (-comment_handler => $self->create_delay_comments_sub);
    push(@finish_args, -header => $self->{_header})
        if (defined($self->{_header}));
    $self->finish_io_option('input', @finish_args);

    $self->{_compare_code} = $self->create_compare_code($self->{_in}, $self->{_in});
    croak $self->{_prog} . ": no sort field specified.\n"
        if (!defined($self->{_compare_code}));
    print "COMPARE CODE:\n\t" . $self->{_compare_code} . "\n" if ($self->{_debug});

    # The comparison routine arrives as Perl source text; compile it here,
    # in this scope, and keep the resulting code reference on the object.
    my $compare_sub;
    eval '$self->{_compare_sub} = $compare_sub = ' . $self->{_compare_code} . ';';
    $@ && croak $self->{_prog} . ": internal eval error in compare code: $@.\n";
}
288
|
|
|
|
|
|
|
|
289
|
|
|
|
|
|
|
=head2 segment_start |
290
|
|
|
|
|
|
|
|
291
|
|
|
|
|
|
|
$self->segment_start(\@rows); |
292
|
|
|
|
|
|
|
|
293
|
|
|
|
|
|
|
Sorting happens internally, |
294
|
|
|
|
|
|
|
to handle large things in pieces if necessary. |
295
|
|
|
|
|
|
|
|
296
|
|
|
|
|
|
|
call C<< $self->segment_start >> to init things and to restart after an overflow |
297
|
|
|
|
|
|
|
C<< $self->segment_overflow >> to close one segment and start the next, |
298
|
|
|
|
|
|
|
and C<< $self->segment_merge_finish >> to put them back together again. |
299
|
|
|
|
|
|
|
|
300
|
|
|
|
|
|
|
Note that we don't invoke the merge code unless the data |
301
|
|
|
|
|
|
|
exceeds some threshold size, so small sorts happen completely |
302
|
|
|
|
|
|
|
in memory. |
303
|
|
|
|
|
|
|
|
304
|
|
|
|
|
|
|
Once we give up on memory, all the merging happens by making |
305
|
|
|
|
|
|
|
passes over the disk files. |
306
|
|
|
|
|
|
|
|
307
|
|
|
|
|
|
|
=cut |
308
|
|
|
|
|
|
|
|
309
|
|
|
|
|
|
|
# Begin a fresh in-memory segment by emptying the caller's row buffer in
# place (the buffer is passed by reference).
sub segment_start ($\@) {
    my ($self, $buffer) = @_;

    # Setting the last index to -1 truncates the array to zero elements.
    $#$buffer = -1;
}
314
|
|
|
|
|
|
|
|
315
|
|
|
|
|
|
|
=head2 segment_next_output |
316
|
|
|
|
|
|
|
|
317
|
|
|
|
|
|
|
($out, $final_output) = $self->segment_next_output($input_finished) |
318
|
|
|
|
|
|
|
|
319
|
|
|
|
|
|
|
Internal: return a Fsdb::IO::Writer as $OUT |
320
|
|
|
|
|
|
|
that either points to our output or a temporary file, |
321
|
|
|
|
|
|
|
depending on how things are going. |
322
|
|
|
|
|
|
|
|
323
|
|
|
|
|
|
|
=cut |
324
|
|
|
|
|
|
|
|
325
|
|
|
|
|
|
|
# Decide where the current sorted segment goes.
#
# Returns the list ($out, $final_output).  When nothing has been queued for
# merging and the input is finished, $out is the filter's real output
# ($self->{_out}) and $final_output is true.  Otherwise $out is the name of
# a newly allocated temporary file, which is also appended to
# $self->{_files_to_merge}, and $final_output is false.
sub segment_next_output($$) {
    my ($self, $input_finished) = @_;
    my $final_output = ($#{$self->{_files_to_merge}} == -1 && $input_finished);

    if (!$final_output) {
        # dump to a file for merging; only the file name is handed back
        my $tmpfile = Fsdb::Support::NamedTmpfile::alloc($self->{_tmpdir});
        push(@{$self->{_files_to_merge}}, $tmpfile);
        print "# dbsort segment_next_output: intermediate file: $tmpfile\n" if ($self->{_debug});
        return ($tmpfile, $final_output);
    }

    # setup output
    # (if merging, then we did this when we forked the merge thread)
    $self->finish_io_option('output', -clone => $self->{_in})
        unless (defined($self->{_merge_fred}));
    print "# dbsort segment_next_output: final output\n" if ($self->{_debug});
    return ($self->{_out}, $final_output);
}
346
|
|
|
|
|
|
|
|
347
|
|
|
|
|
|
|
=head2 segment_overflow |
348
|
|
|
|
|
|
|
|
349
|
|
|
|
|
|
|
$self->segment_overflow(\@rows, $input_finished) |
350
|
|
|
|
|
|
|
|
351
|
|
|
|
|
|
|
Called to sort @ROWS, writing them to the appropriate place. |
352
|
|
|
|
|
|
|
$INPUT_FINISHED is set if all input has been read. |
353
|
|
|
|
|
|
|
|
354
|
|
|
|
|
|
|
=cut |
355
|
|
|
|
|
|
|
|
356
|
|
|
|
|
|
|
#sub so1 { |
357
|
|
|
|
|
|
|
# my ($self, $rows_ref) = @_; |
358
|
|
|
|
|
|
|
# my(@sorted_rows) = sort { $a->[0] <=> $b->[0] } @{$rows_ref}; |
359
|
|
|
|
|
|
|
# return @sorted_rows; |
360
|
|
|
|
|
|
|
#} |
361
|
|
|
|
|
|
|
|
362
|
|
|
|
|
|
|
# Sort the buffered rows and write them to wherever segment_next_output
# directs: the final output, or a temporary file that is then handed to the
# merge machinery.  In the temporary-file case the row buffer is reset so
# the caller can keep filling it.
#
# $rows_ref       reference to the array of buffered rows
# $input_finished true once all input has been read
sub segment_overflow($\@$) {
    my ($self, $rows_ref, $input_finished) = @_;

    my $by_key = $self->{_compare_sub};
    my @ordered = sort $by_key @{$rows_ref};

    my ($dest, $final_output) = $self->segment_next_output($input_finished, 'Fsdb:IO');

    # $dest is either an already-open writer object (final output) or a
    # plain file name (intermediate segment); anything else is a logic error.
    my $out;
    if (ref($dest) =~ /^Fsdb::IO::Writer/) {
        die "dbsort segment_overflow: suprise writer and NOT final_output\n"
            if (!$final_output);
        $out = $dest;
    } else {
        die "dbsort segment_overflow: suprise filename and final_output\n"
            if ($final_output);
        $out = Fsdb::IO::Writer->new(-file => $dest, -clone => $self->{_in});
    }

    my $emit = $out->fastpath_sub;
    $emit->($_) foreach (@ordered);

    if (!$final_output) {
        $out->close;
        $self->segment_merge_start($dest);
        $self->segment_start($rows_ref);
    };
}
392
|
|
|
|
|
|
|
|
393
|
|
|
|
|
|
|
=head2 segment_merge_start |
394
|
|
|
|
|
|
|
|
395
|
|
|
|
|
|
|
$self->segment_merge_start($fn); |
396
|
|
|
|
|
|
|
|
397
|
|
|
|
|
|
|
Start merging on file $FN. |
398
|
|
|
|
|
|
|
Fork off a merge thread, if necessary. |
399
|
|
|
|
|
|
|
|
400
|
|
|
|
|
|
|
=cut |
401
|
|
|
|
|
|
|
# Queue segment file $fn for merging.  On the first call this also opens the
# filter's output and starts a dbmerge pipeline that writes to it.  The
# pipeline's writer has a single "filename" column, and every call writes
# $fn to that writer as one row.
sub segment_merge_start($$) {
    my ($self, $fn) = @_;

    unless (defined($self->{_merge_fred})) {
        # create our output so we can give it to merge-thread
        $self->finish_io_option('output', -clone => $self->{_in});

        print "# forking merge thread\n" if ($self->{_debug});

        my @writer_args = (-cols => ['filename'], -autoflush => 1);

        # Fixed dbmerge flags first, then the optional ones, then the same
        # sort specification this filter was given.
        my @merge_args = ('--nolog', '--noclose', '--removeinputs', '--xargs');
        if (defined($self->{_max_parallelism})) {
            push(@merge_args, '--parallelism', $self->{_max_parallelism});
        }
        if (defined($self->{_tmpdir})) {
            push(@merge_args, '-T', $self->{_tmpdir});
        }
        push(@merge_args, @{$self->{_sort_argv}});

        my ($writer, $merge_fred) = dbpipeline_sink(
            \@writer_args,
            '--output' => $self->{_out},
            dbmerge(@merge_args));
        croak "dbsort: internal error in invoking dbmerge\n"
            unless (defined($writer) && defined($merge_fred));

        $self->{_merge_writer} = $writer;
        $self->{_merge_fred} = $merge_fred;
    }

    print "# dbsort segment_merge_start: sending merge thread: $fn\n" if ($self->{_debug});
    $self->{_merge_writer}->write_row($fn);
}
428
|
|
|
|
|
|
|
|
429
|
|
|
|
|
|
|
|
430
|
|
|
|
|
|
|
=head2 segment_merge_finish |
431
|
|
|
|
|
|
|
|
432
|
|
|
|
|
|
|
$self->segment_merge_finish(); |
433
|
|
|
|
|
|
|
|
434
|
|
|
|
|
|
|
Merge queued files, if any. |
435
|
|
|
|
|
|
|
Just call L<dbmerge(1)> to do all the real work. |
436
|
|
|
|
|
|
|
|
437
|
|
|
|
|
|
|
=cut |
438
|
|
|
|
|
|
|
|
439
|
|
|
|
|
|
|
# Finish any merge in progress.  Does nothing unless a merge was started and
# at least one file was queued.  Otherwise it closes the file-name writer,
# joins the merge handle, and clears that handle.
sub segment_merge_finish($) {
    my ($self) = @_;

    return unless (defined($self->{_merge_fred}));
    return unless ($#{$self->{_files_to_merge}} != -1);

    print "# final output\n" if ($self->{_debug});

    # Closing the writer tells the merge side that no more names are coming;
    # joining then waits for it to do its work.
    my ($names, $merger) = @{$self}{qw(_merge_writer _merge_fred)};
    $names->close();
    $merger->join();

    $self->{_merge_fred} = undef;
}
451
|
|
|
|
|
|
|
|
452
|
|
|
|
|
|
|
|
453
|
|
|
|
|
|
|
=head2 run |
454
|
|
|
|
|
|
|
|
455
|
|
|
|
|
|
|
$filter->run(); |
456
|
|
|
|
|
|
|
|
457
|
|
|
|
|
|
|
Internal: run over each row. |
458
|
|
|
|
|
|
|
|
459
|
|
|
|
|
|
|
=cut |
460
|
|
|
|
|
|
|
# Read every input row, spilling a sorted segment whenever the estimated
# memory use exceeds the scaled limit, then write or merge the result.
sub run ($) {
    my ($self) = @_;

    #
    # read in and set up the data
    #
    $self->{_files_to_merge} = [];
    my $next_row = $self->{_in}->fastpath_sub();

    my @rows;    # one row reference per buffered input row
    my $memory_used = 0;

    # The user's limit is scaled down, and each row is charged a fixed
    # overhead per column (plus two) on top of the lengths of its fields.
    my $memory_budget = int($self->{_max_memory} / (1.0 * $PERL_MEM_SCALING));
    my $per_row_overhead = $PERL_MEM_OVERHEAD * ($#{$self->{_in}->cols} + 2);

    $self->segment_start(\@rows);

    # The reader is invoked in the bare "&$code" form, as before, so it is
    # called with this sub's current @_.
    while (my $fref = &$next_row) {
        push(@rows, $fref);

        $memory_used += $per_row_overhead;
        $memory_used += length($_) foreach (@$fref);

        next unless ($memory_used > $memory_budget);

        # over budget: sort and spill what we have, then start counting again
        $self->segment_overflow(\@rows);
        $memory_used = 0;
    }

    # handle end case (spill any records still queued)
    $self->segment_overflow(\@rows, 1) if ($#rows > -1);

    # merge, if necessary
    $self->segment_merge_finish();

    # handle the null case: no output
    if ($#rows == -1 && $#{$self->{_files_to_merge}} == -1) {
        # open _out, just so we can log ourselves in finish()
        $self->finish_io_option('output', -clone => $self->{_in});
    }
}
498
|
|
|
|
|
|
|
|
499
|
|
|
|
|
|
|
|
500
|
|
|
|
|
|
|
|
501
|
|
|
|
|
|
|
=head1 AUTHOR and COPYRIGHT |
502
|
|
|
|
|
|
|
|
503
|
|
|
|
|
|
|
Copyright (C) 1991-2015 by John Heidemann |
504
|
|
|
|
|
|
|
|
505
|
|
|
|
|
|
|
This program is distributed under terms of the GNU general |
506
|
|
|
|
|
|
|
public license, version 2. See the file COPYING |
507
|
|
|
|
|
|
|
with the distribution for details. |
508
|
|
|
|
|
|
|
|
509
|
|
|
|
|
|
|
=cut |
510
|
|
|
|
|
|
|
|
511
|
|
|
|
|
|
|
1; |