File Coverage

blib/lib/Fsdb/Filter/dbfilepivot.pm
Criterion Covered Total %
statement 24 137 17.5
branch 0 54 0.0
condition 0 9 0.0
subroutine 8 23 34.7
pod 5 5 100.0
total 37 228 16.2


line stmt bran cond sub pod time code
1             #!/usr/bin/perl
2              
3             #
4             # dbfilepivot.pm
5             # Copyright (C) 2011-2016 by John Heidemann
6             #
7             # This program is distributed under terms of the GNU general
8             # public license, version 2. See the file COPYING
9             # in $dblibdir for details.
10             #
11              
12             package Fsdb::Filter::dbfilepivot;
13              
14             =head1 NAME
15              
16             dbfilepivot - pivot a table, converting multiple rows into single wide row
17              
18             =head1 SYNOPSIS
19              
20             dbfilepivot [-e empty] -k KeyField -p PivotField [-v ValueField]
21              
22             =head1 DESCRIPTION
23              
24             Pivot a table, converting multiple rows corresponding to the
25             same key into a single wide row.
26              
27             In a normalized database, one might have data with a schema like
28             (id, attribute, value),
29             but sometimes it's more convenient to see the data with a schema like
30             (id, attribute1, attribute2).
31             (For example, gnuplot's stacked histograms requires denormalized data.)
32             Dbfilepivot converts the normalized format to the denormalized,
33             but sometimes useful, format.
34             Here the "id" is the key, the attribute is the "pivot",
35             and the value is, well, the optional "value".
36              
37             An example is clearer. A gradebook usually looks like:
38              
39             #fsdb name hw_1 hw_2 hw_3
40             John 97 98 99
41             Paul - 80 82
42              
43             but a properly normalized format would represent it as:
44              
45             #fsdb name hw score
46             John 1 97
47             John 2 98
48             John 3 99
49             Paul 2 80
50             Paul 3 82
51              
52             This tool converts the second form into the first, when used as
53              
54             dbfilepivot -k name -p hw -v score
55              
56             or
57              
58             dbfilepivot --possible-pivots='1 2 3' -k name -p hw -v score
59              
60             Here name is the I column that indicates which rows belong
61             to the same entity,
62             hw is the I column that will be indicate which column
63             in the output is relevant,
64             and score is the I that indicates what goes in the
65             output.
66              
67             The pivot creates a new column C, C, etc.
68             for each tag, the contents of the pivot field in the input.
69             It then populates those new columns with the contents of the value field
70             in the input.
71              
72             If no value column is specified, then values are either empty or 1.
73              
74             Dbfilepivot assumes all lines with the same key are adjacent
75             in the input source, like L with the F<-S> option.
76             To enforce this invariant, by default, it I input be sorted by key.
77              
78             There is no requirement that the pivot field be sorted (provided the key field is already sorted).
79              
80             By default, dbfilepivot makes two passes over its data
81             and so requires temporary disk space equal to the input size.
82             With the B<--possible-pivots> option, the user can specify pivots
83             and skip the second pass and avoid temporary data storage.
84              
85             Memory usage is proportional to the number of unique pivot values.
86              
87             The inverse of this commend is L.
88              
89              
90             =head1 OPTIONS
91              
92             =over 4
93              
94             =item B<-k> or B<--key> KeyField
95              
96             specify which column is the key for grouping.
97             Required (no default).
98              
99             =item B<-p> or B<--pivot> PivotField
100              
101             specify which column is the key to indicate which column in the output
102             is relevant.
103             Required (no default).
104              
105             =item B<-v> or B<--value> ValueField
106              
107             Specify which column is the value in the output.
108             If none is given, 1 is used for the value.
109              
110             =item B<--possible-pivots PP>
111              
112             Specify all possible pivot values as PP, a whitespace-separated list.
113             With this option, data is processed only once (not twice).
114              
115             =item B<-C S> or B<--element-separator S>
116              
117             Specify the separator I used to join the input's key column
118             with its contents.
119             (Defaults to a single underscore.)
120              
121             =item B<-e E> or B<--empty E>
122              
123             give value E as the value for empty (null) records
124              
125             =item B<-S> or B<--pre-sorted>
126              
127             Assume data is already grouped by key.
128             Provided twice, it removes the validation of this assertion.
129             By default, we sort by key.
130              
131             =item B<-T TmpDir>
132              
133             where to put tmp files.
134             Also uses environment variable TMPDIR, if -T is
135             not specified.
136             Default is /tmp.
137              
138             =back
139              
140             =for comment
141             begin_standard_fsdb_options
142              
143             This module also supports the standard fsdb options:
144              
145             =over 4
146              
147             =item B<-d>
148              
149             Enable debugging output.
150              
151             =item B<-i> or B<--input> InputSource
152              
153             Read from InputSource, typically a file name, or C<-> for standard input,
154             or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects.
155              
156             =item B<-o> or B<--output> OutputDestination
157              
158             Write to OutputDestination, typically a file name, or C<-> for standard output,
159             or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects.
160              
161             =item B<--autorun> or B<--noautorun>
162              
163             By default, programs process automatically,
164             but Fsdb::Filter objects in Perl do not run until you invoke
165             the run() method.
166             The C<--(no)autorun> option controls that behavior within Perl.
167              
168             =item B<--help>
169              
170             Show help.
171              
172             =item B<--man>
173              
174             Show full manual.
175              
176             =back
177              
178             =for comment
179             end_standard_fsdb_options
180              
181              
182             =head1 SAMPLE USAGE
183              
184             =head2 Input:
185              
186             #fsdb name hw score
187             John 1 97
188             John 2 98
189             John 3 99
190             Paul 2 80
191             Paul 3 82
192              
193             =head2 Command:
194              
195             cat data.fsdb | dbfilepivot -k name -p hw -v score
196              
197             =head2 Output:
198              
199             #fsdb name hw_1 hw_2 hw_3
200             John 97 98 99
201             Paul - 80 82
202             # | dbfilepivot -k name -p hw -v score
203              
204             =head1 SEE ALSO
205              
206             L.
207             L.
208             L.
209             L.
210              
211              
212             =head1 CLASS FUNCTIONS
213              
214             =cut
215              
216             @ISA = qw(Fsdb::Filter);
217             $VERSION = 2.0;
218              
219 1     1   5667 use strict;
  1         2  
  1         23  
220 1     1   4 use Pod::Usage;
  1         1  
  1         63  
221 1     1   4 use Carp;
  1         2  
  1         37  
222              
223 1     1   4 use Fsdb::Filter;
  1         1  
  1         14  
224 1     1   3 use Fsdb::IO::Reader;
  1         1  
  1         20  
225 1     1   3 use Fsdb::IO::Writer;
  1         1  
  1         13  
226 1     1   4 use Fsdb::IO::Replayable;
  1         1  
  1         22  
227 1     1   4 use Fsdb::Filter::dbpipeline qw(dbpipeline_filter dbsort);
  1         1  
  1         1267  
228              
229              
230             =head2 new
231              
232             $filter = new Fsdb::Filter::dbfilepivot(@arguments);
233              
234             Create a new dbfilepivot object, taking command-line arguments.
235              
236             =cut
237              
238             sub new ($@) {
239 0     0 1   my $class = shift @_;
240 0           my $self = $class->SUPER::new(@_);
241 0           bless $self, $class;
242 0           $self->set_defaults;
243 0           $self->parse_options(@_);
244 0           $self->SUPER::post_new();
245 0           return $self;
246             }
247              
248              
249             =head2 set_defaults
250              
251             $filter->set_defaults();
252              
253             Internal: set up defaults.
254              
255             =cut
256              
257             sub set_defaults ($) {
258 0     0 1   my($self) = @_;
259 0           $self->SUPER::set_defaults();
260 0           $self->{_elem_separator} = '_';
261 0 0         $self->{_tmpdir} = defined($ENV{'TMPDIR'}) ? $ENV{'TMPDIR'} : "/tmp";
262 0           $self->{_key_column} = undef;
263 0           $self->{_pivot_column} = undef;
264 0           $self->{_value_column} = undef;
265 0           $self->{_pre_sorted} = 0;
266 0           $self->{_sort_order} = undef;
267 0           $self->{_sort_as_numeric} = undef;
268 0           $self->{_possible_pivots} = undef;
269             }
270              
271             =head2 parse_options
272              
273             $filter->parse_options(@ARGV);
274              
275             Internal: parse command-line arguments.
276              
277             =cut
278              
279             sub parse_options ($@) {
280 0     0 1   my $self = shift @_;
281              
282 0           my(@argv) = @_;
283             $self->get_options(
284             \@argv,
285 0     0     'help|?' => sub { pod2usage(1); },
286 0     0     'man' => sub { pod2usage(-verbose => 2); },
287             'autorun!' => \$self->{_autorun},
288             'close!' => \$self->{_close},
289             'C|element-separator=s' => \$self->{_elem_separator},
290             'd|debug+' => \$self->{_debug},
291             'e|empty=s' => \$self->{_empty},
292 0     0     'i|input=s' => sub { $self->parse_io_option('input', @_); },
293             'k|key=s' => \$self->{_key_column},
294             'log!' => \$self->{_logprog},
295 0     0     'o|output=s' => sub { $self->parse_io_option('output', @_); },
296             'p|pivot=s' => \$self->{_pivot_column},
297             'possible-pivots=s' => \$self->{_possible_pivots},
298             'S|pre-sorted+' => \$self->{_pre_sorted},
299             'T|tmpdir|tempdir=s' => \$self->{_tmpdir},
300             'v|value=s' => \$self->{_value_column},
301             # sort key options:
302 0     0     'n|numeric' => sub { $self->{_sort_as_numeric} = 1; },
303 0     0     'N|lexical' => sub { $self->{_sort_as_numeric} = undef; },
304 0     0     'r|descending' => sub { $self->{_sort_order} = -1; },
305 0     0     'R|ascending' => sub { $self->{_sort_order} = 1; },
306 0 0         ) or pod2usage(2);
307 0 0         pod2usage(2) if ($#argv != -1);
308             }
309              
310             =head2 _find_possible_pivots
311              
312             $filter->_find_possible_pivots();
313              
314             Internal: scan input data to find all possible pivot values.
315              
316             Returns npivots, pivots_aref.
317              
318             =cut
319              
320             sub _find_possible_pivots($) {
321 0     0     my($self) = @_;
322              
323             #
324             # Read the data to find all possible pivots,
325             # saving a copy as we go.
326             #
327 0           $self->{_replayable} = new Fsdb::IO::Replayable(-writer_args => [ -clone => $self->{_in} ], -reader_args => [ -comment_handler => $self->create_pass_comments_sub ]);
328 0           my $save_out = $self->{_replayable_writer} = $self->{_replayable}->writer;
329 0           my $read_fastpath_sub = $self->{_in}->fastpath_sub();
330 0           my $save_write_fastpath_sub = $save_out->fastpath_sub;
331 0           my $fref;
332             my %pivots;
333 0           my $npivots = 0;
334             my $loop = q(
335             # first pass: reading data to find all possible pivots
336             while ($fref = &$read_fastpath_sub) {
337             my $value = $fref->[) . $self->{_pivot_coli} . q@];
338 0           if ($value ne '@ . $self->{_empty} . q@') {
339             $npivots++ if (!defined($pivots{$value}));
340             $pivots{$value} = 1;
341             };
342             &$save_write_fastpath_sub($fref);
343             };
344             @;
345 0 0         print $loop if ($self->{_debug});
346 0           eval $loop;
347 0 0         $@ && croak $self->{_prog} . ": internal eval error: $@.\n";
348              
349 0 0         if (defined($self->{_sorter_fred})) {
350 0           $self->{_sorter_fred}->join();
351 0           $self->{_sorter_fred} = undef;
352             };
353              
354 0           $self->{_replayable}->close;
355              
356 0           my(@pivots) = keys %pivots;
357              
358 0           return ($npivots, \@pivots);
359             }
360              
361              
362             =head2 _given_possible_pivots
363              
364             $filter->_given_possible_pivots();
365              
366             Internal: parse option of possible pivots.
367              
368             Returns npivots, pivots_href.
369              
370             =cut
371              
372             sub _given_possible_pivots($) {
373 0     0     my($self) = @_;
374              
375             #
376             # All possible pivots are given by the user.
377             #
378 0           my @pivots = split(/\s+/, $self->{_possible_pivots});
379 0           return ($#pivots + 1, \@pivots);
380             }
381              
382              
383             =head2 setup
384              
385             $filter->setup();
386              
387             Internal: setup, parse headers.
388              
389             =cut
390              
391             sub setup ($) {
392 0     0 1   my($self) = @_;
393              
394             croak $self->{_prog} . ": invalid empty value (single quote).\n"
395 0 0         if ($self->{_empty} eq "'");
396              
397             #
398             # guarantee data is sorted
399             # (swap reader if necessary)
400 0 0         if ($self->{_pre_sorted}) {
401             # pre-sorted, so just read it
402 0           $self->finish_io_option('input', -comment_handler => $self->create_delay_comments_sub);
403 0           $self->{_sorter_fred} = undef;
404             } else {
405             # not sorted, so sort it and read that
406 0           my @sort_args = ('--nolog', $self->{_key_column});
407 0 0         unshift(@sort_args, '--descending') if ($self->{_sort_order} == -1);
408 0 0         unshift(@sort_args, ($self->{_sort_as_numeric} ? '--numeric' : '--lexical'));
409 0           my($new_reader, $new_fred) = dbpipeline_filter($self->{_input}, [-comment_handler => $self->create_delay_comments_sub], dbsort(@sort_args));
410 0           $self->{_pre_sorted_input} = $self->{_input};
411 0           $self->{_in} = $new_reader;
412 0           $self->{_sorter_fred} = $new_fred;
413             };
414              
415 0 0         pod2usage(2) if (!defined($self->{_key_column}));
416 0           $self->{_key_coli} = $self->{_in}->col_to_i($self->{_key_column});
417             croak $self->{_prog} . ": key column " . $self->{_key_column} . " is not in input stream.\n"
418 0 0         if (!defined($self->{_key_coli}));
419              
420 0 0         pod2usage(2) if (!defined($self->{_pivot_column}));
421 0           $self->{_pivot_coli} = $self->{_in}->col_to_i($self->{_pivot_column});
422             croak $self->{_prog} . ": pivot column " . $self->{_pivot_column} . " is not in input stream.\n"
423 0 0         if (!defined($self->{_pivot_coli}));
424              
425 0 0         if (defined($self->{_value_column})) {
426 0           $self->{_value_coli} = $self->{_in}->col_to_i($self->{_value_column});
427             croak $self->{_prog} . ": value column " . $self->{_value_column} . " is not in input stream.\n"
428 0 0         if (!defined($self->{_value_coli}));
429             };
430              
431 0 0         my($npivots, $pivots_aref) = (defined($self->{_possible_pivots}) ? $self->_given_possible_pivots() : $self->_find_possible_pivots());
432 0 0         croak $self->{_prog} . ": no input data or pivots\n"
433             if ($npivots == 0);
434              
435             #
436             # Now that we know the pivots, make the new columns.
437             #
438             # kill the old pivot column, and value if given.
439             my @new_cols = grep(!($_ eq $self->{_pivot_column} ||
440             (defined($self->{_value_column}) && $_ eq $self->{_value_column})),
441 0   0       @{$self->{_in}->cols});
  0            
442 0           $self->finish_io_option('output', -clone => $self->{_in}, -cols => \@new_cols, -outputheader => 'delay');
443 0           my %tag_colis;
444             my %new_columns;
445 0           foreach (sort @$pivots_aref) {
446             # xxx: could try to sort numerically if all pivots are numbers
447 0           my $new_column = $self->{_pivot_column} . $self->{_elem_separator} . $_;
448 0           $new_columns{$new_column} = 1;
449             $self->{_out}->col_create($new_column)
450 0 0         or croak $self->{_prog} . ": cannot create column $new_column (maybe it already existed?)\n";
451 0           $tag_colis{$_} = $self->{_out}->col_to_i($new_column);
452             };
453 0           $self->{_tag_colis_href} = \%tag_colis;
454             # write the mapping code.
455 0           my $old_mapping_code = '';
456             # first the old bits
457 0           foreach (@{$self->{_in}->cols}) {
  0            
458             next if ($_ eq $self->{_pivot_column} ||
459 0 0 0       (defined($self->{_value_column}) && $_ eq $self->{_value_column}));
      0        
460             $old_mapping_code .= '$nf[' . $self->{_out}->col_to_i($_) . '] = ' .
461 0           '$fref->[' . $self->{_in}->col_to_i($_) . '];' . "\n";
462             };
463 0           $self->{_old_mapping_code} = $old_mapping_code;
464             # and initialize the new
465 0           my $new_initialization_code = '';
466 0           foreach (sort keys %new_columns) {
467 0           $new_initialization_code .= '$nf[' . $self->{_out}->col_to_i($_) . '] = ' . "\n";
468             };
469 0           $new_initialization_code .= "\t'" . $self->{_empty} . "';\n";
470 0           $self->{_new_initialization_code} = $new_initialization_code;
471             }
472              
473             =head2 run
474              
475             $filter->run();
476              
477             Internal: run over each rows.
478              
479             =cut
480             sub run ($) {
481 0     0 1   my($self) = @_;
482              
483 0 0         my $in_reader = (defined($self->{_replayable}) ? $self->{_replayable}->reader : $self->{_in});
484 0           my $read_fastpath_sub = $in_reader->fastpath_sub();
485 0           my $write_fastpath_sub = $self->{_out}->fastpath_sub();
486              
487             #
488             # Basic idea: mapreduce on the input
489             # with a multikey aware reducer.
490             #
491             # We don't actually run mapreduce
492             # because (sadly) it's easier to do it in-line
493             # given we assume sorted input.
494             #
495 0           my $emit_nf_code = '&$write_fastpath_sub(\@nf);';
496             my $check_ordering_code = '
497 0           die "' . $self->{_prog} . q': keys $old_key and $new_key are out-of-order\n" if ($old_key gt $new_key);
498             ';
499 0 0         $check_ordering_code = '' if ($self->{_pre_sorted} > 1);
500 0 0         my $value_value = (defined($self->{_value_column})) ? '$fref->[' . $self->{_value_coli} . ']' : '1';
501 0           my $tag_colis_href = $self->{_tag_colis_href};
502             my($loop) = q'
503             {
504             my $old_key = undef;
505             my $fref;
506             my @nf;
507             while ($fref = &$read_fastpath_sub()) {
508             my $new_key = $fref->[' . $self->{_key_coli} . '];
509             if (!defined($old_key) || $old_key ne $new_key) {
510             if (defined($old_key)) {
511             ' . $emit_nf_code .
512             $check_ordering_code . '
513             };
514             ' . $self->{_new_initialization_code} . '
515             ' . $self->{_old_mapping_code} . '
516             $old_key = $new_key;
517             };
518 0           my $pivot_value = $fref->[' . $self->{_pivot_coli} . '];
519             my $target_coli = $tag_colis_href->{$pivot_value};
520             die $self->{_prog} . ": unanticipated pivot value $pivot_value (forgot it in --possible-pivots?).\n"
521             if (!defined($target_coli));
522             $nf[$target_coli] = ' . $value_value . ';
523             };
524             if (defined($old_key)) {
525             ' . $emit_nf_code . "
526             };
527             }\n";
528 0 0         print $loop if ($self->{_debug});
529 0           eval $loop;
530 0 0         if ($@) {
531             # propagate sort failure cleanly
532 0 0         if ($@ =~ /^$self->{_prog}/) {
533 0           croak "$@";
534             } else {
535 0           croak $self->{_prog} . ": internal eval error: $@.\n";
536             };
537             };
538              
539             # If single pass, we may need to collect this thread here.
540 0 0         if (defined($self->{_sorter_fred})) {
541 0           $self->{_sorter_fred}->join();
542 0           $self->{_sorter_fred} = undef;
543             };
544              
545             }
546              
547              
548             =head1 AUTHOR and COPYRIGHT
549              
550             Copyright (C) 2011-2016 by John Heidemann
551              
552             This program is distributed under terms of the GNU general
553             public license, version 2. See the file COPYING
554             with the distribution for details.
555              
556             =cut
557              
558             1;