File Coverage

blib/lib/Fsdb/Filter/dbpipeline.pm
Criterion Covered Total %
statement 27 195 13.8
branch 0 68 0.0
condition 0 15 0.0
subroutine 9 30 30.0
pod 10 10 100.0
total 46 318 14.4


line stmt bran cond sub pod time code
1             #!/usr/bin/perl -w
2              
3             #
4             # dbpipeline.pm
5             # Copyright (C) 2007-2016 by John Heidemann
6             #
7             # This program is distributed under terms of the GNU general
8             # public license, version 2. See the file COPYING
9             # in $dblibdir for details.
10             #
11              
12             package Fsdb::Filter::dbpipeline;
13              
14             =head1 NAME
15              
16             dbpipeline - allow db commands to be assembled as pipelines in Perl
17              
18             =head1 SYNOPSIS
19              
20             use Fsdb::Filter::dbpipeline qw(:all);
21             dbpipeline(
22             dbrow(qw(name test1)),
23             dbroweval('_test1 += 5;')
24             );
25              
26             Or for more customized versions, see
27             L,
28             L,
29             L,
30             and
31             L.
32              
33              
34             =head1 DESCRIPTION
35              
36             This module makes it easy to create pipelines in Perl
37             using separate processes.
38             (In the past we used to use perl threads.)
39              
40             By default (as with all Fsdb modules), input is from STDIN and output
41             to STDOUT. Two helper functions, fromfile and tofile can grab
42             data from files.
43              
44             Dbpipeline differs in several ways from all other Fsdb::Filter modules:
45             it does not have a corresponding Unix command (it is used only from
46             within Perl).
47             It does not log its presence to the output stream (this is arguably a bug,
48             but it doesn't actually do anything).
49              
50              
51             =head1 OPTIONS
52              
53             Unlike most Fsdb modules, dbpipeline defaults to C<--autorun>.
54              
55             =for comment
56             begin_standard_fsdb_options
57              
58             This module also supports the standard fsdb options:
59              
60             =over 4
61              
62             =item B<-d>
63              
64             Enable debugging output.
65              
66             =item B<-i> or B<--input> InputSource
67              
68             Read from InputSource, typically a file name, or C<-> for standard input,
69             or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects.
70              
71             =item B<-o> or B<--output> OutputDestination
72              
73             Write to OutputDestination, typically a file name, or C<-> for standard output,
74             or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects.
75              
76             =item B<--autorun> or B<--noautorun>
77              
78             By default, programs process automatically,
79             but Fsdb::Filter objects in Perl do not run until you invoke
80             the run() method.
81             The C<--(no)autorun> option controls that behavior within Perl.
82              
83             =item B<--help>
84              
85             Show help.
86              
87             =item B<--man>
88              
89             Show full manual.
90              
91             =back
92              
93             =for comment
94             end_standard_fsdb_options
95              
96              
97             =head1 SEE ALSO
98              
99             L
100              
101              
102             =head1 CLASS FUNCTIONS
103              
104             =cut
105              
106             @ISA = qw(Fsdb::Filter);
107             ($VERSION) = 2.0;
108              
109 1     1   5 use Exporter 'import';
  1         2  
  1         201  
110             @EXPORT = ();
111             @EXPORT_OK = qw(
112             dbpipeline_filter
113             dbpipeline_sink
114             dbpipeline_open2
115             dbpipeline_close2_hash
116             );
117             # update them here, in toplevel Makefile.PL, and below in the documentation.
118             # and add dbpipeline to this list
119             # BEGIN AUTOGENERATED VARIABLE SECTION
120             # This next section is managed by update_modules. DO NOT EDIT DIRECTLY.
121             our @modularized_db_programs = qw(
122             dbcol
123             dbcolcopylast
124             dbcolcreate
125             dbcoldefine
126             dbcolhisto
127             dbcolmerge
128             dbcolmovingstats
129             dbcolneaten
130             dbcolpercentile
131             dbcolrename
132             dbcolscorrelate
133             dbcolsplittocols
134             dbcolsplittorows
135             dbcolsregression
136             dbcolstats
137             dbcolstatscores
138             dbfilealter
139             dbfilecat
140             dbfilediff
141             dbfilepivot
142             dbfilestripcomments
143             dbfilevalidate
144             dbformmail
145             dbjoin
146             dbmapreduce
147             dbmerge
148             dbmerge2
149             dbmultistats
150             dbrow
151             dbrowaccumulate
152             dbrowcount
153             dbrowdiff
154             dbroweval
155             dbrowuniq
156             dbrvstatdiff
157             dbsort
158             );
159             our @modularized_db_converters = qw(
160             cgi_to_db
161             combined_log_format_to_db
162             csv_to_db
163             db_to_csv
164             db_to_html_table
165             html_table_to_db
166             kitrace_to_db
167             mysql_to_db
168             tabdelim_to_db
169             tcpdump_to_db
170             xml_to_db
171             );
172             our @modularized_db_non_programs = qw(
173             dbpipeline
174             dbsubprocess
175             );
176             # END AUTOGENERATED VARIABLE SECTION
177             %EXPORT_TAGS = (all => [@EXPORT_OK, @modularized_db_programs, @modularized_db_converters, @modularized_db_non_programs]);
178             Exporter::export_ok_tags('all');
179             my %autoloadable = map { $_ => 1 } @modularized_db_programs, @modularized_db_converters, @modularized_db_non_programs;
180              
181 1     1   7 use strict;
  1         2  
  1         20  
182 1     1   5 use Carp;
  1         2  
  1         145  
183 1     1   8 use Pod::Usage;
  1         1  
  1         94  
184 1     1   5 use IO::Pipe;
  1         9  
  1         23  
185              
186 1     1   83 use Fsdb::Filter;
  1         2  
  1         20  
187 1     1   5 use Fsdb::IO::Reader;
  1         1  
  1         17  
188 1     1   3 use Fsdb::IO::Writer;
  1         1  
  1         20  
189 1     1   4 use Fsdb::Support::Freds;
  1         1  
  1         1769  
190              
191             #
192             # First off, create all the bindings we promise in EXPORT_TAGS.
193             # Automated via AUTOLOAD for extra coolness.
194             #
195             our $AUTOLOAD;
196             sub AUTOLOAD {
197 0     0     my $sub = $AUTOLOAD;
198 0           (my $localsub = $sub) =~ s/.*:://;
199             die "dbpipeline: AUTOLOAD on non-autoloadable sub $sub\n"
200 0 0         if (!defined($autoloadable{$localsub}));
201 0           eval "sub $localsub { use Fsdb::Filter::$localsub; return new Fsdb::Filter::$localsub(" . '@_' . "); };\n";
202 0 0         $@ and die "dbpipeline: error creating stubs: $@\n";
203 0           goto &$sub;
204             }
205              
206       0     sub DESTROY {
207             # just suppress autoloading warnings
208             }
209              
210             =head2 dbpipeline
211              
212             dbpipeline(@modules);
213              
214             This shorthand-routine creates a dbpipeline object
215             and then I.
216              
217             Thus perl code becomes nearly as terse as shell code:
218              
219             dbpipeline(
220             dbcol(qw(name test1)),
221             dbroweval('_test1 += 5;'),
222             );
223              
224             The following commands currently have shorthand aliases:
225              
226             =for comment
227             BEGIN AUTOGENERATED DOCUMENTATION SECTION
228              
229             =over
230              
231             =item L
232              
233             =item L
234              
235             =item L
236              
237             =item L
238              
239             =item L
240              
241             =item L
242              
243             =item L
244              
245             =item L
246              
247             =item L
248              
249             =item L
250              
251             =item L
252              
253             =item L
254              
255             =item L
256              
257             =item L
258              
259             =item L
260              
261             =item L
262              
263             =item L
264              
265             =item L
266              
267             =item L
268              
269             =item L
270              
271             =item L
272              
273             =item L
274              
275             =item L
276              
277             =item L
278              
279             =item L
280              
281             =item L
282              
283             =item L
284              
285             =item L
286              
287             =item L
288              
289             =item L
290              
291             =item L
292              
293             =item L
294              
295             =item L
296              
297             =item L
298              
299             =item L
300              
301             =item L
302              
303             =item L
304              
305             =item L
306              
307             =item L
308              
309             =item L
310              
311             =item L
312              
313             =item L
314              
315             =item L
316              
317             =item L
318              
319             =item L
320              
321             =item L
322              
323             =item L
324              
325             =back
326              
327             =for comment
328             END AUTOGENERATED DOCUMENTATION SECTION
329              
330             and
331              
332             =over 4
333              
334             =item L
335              
336             =back
337              
338              
339             =cut
340              
341             =head2 dbpipeline_filter
342              
343             my($result_reader, $fred) = dbpipeline_filter($source, $result_reader_aref, @modules);
344              
345             Set up a pipeline of @MODULES that filters data pushed through it,
346             where the data comes from $SOURCE
347             (any L object,
348             like a Fsdb::IO::Reader object, queue, or filename).
349              
350             Returns a $RESULT_READER Fsdb::IO::Reader object,
351             created with $RESULT_READER_AREF as options.
352             This reader will produce the filtered data,
353             and a $FRED that must be joined to guarantee output
354             has completed.
355              
356             Or if $RESULT_READER_AREF is C<[-raw_fh, 1]>, it just returns the IO::Handle
357             to the pipe.
358              
359             As an example, this code uses C to insure the input
360             (from C<$in> which is a filename or L) is sorted
361             numerically by column C:
362              
363             use Fsdb::Filter::dbpipeline qw(dbpipeline_filter dbsort);
364             my($new_in, $new_fred) = dbpipeline_filter($in,
365             [-comment_handler => $self->create_delay_comments_sub],
366             dbsort(qw(--nolog -n x)));
367             while (my $fref = $new_in->read_rowwobj()) {
368             # do something
369             };
370             $new_in->close;
371             $new_fred->join();
372              
373             =cut
374              
375             sub dbpipeline_filter($@) {
376 0     0 1   my($pre_filter_source) = shift @_;
377 0           my($post_filter_reader_aref) = shift @_;
378 0           my(@args) = @_;
379              
380 0           my $pipe = new IO::Pipe;
381              
382             my $pipeline_fred = new Fsdb::Support::Freds('dbpipeline_filter',
383             sub {
384 0     0     $pipe->writer();
385 0           my $pipeline = new Fsdb::Filter::dbpipeline(
386             '--noautorun',
387             '--input' => $pre_filter_source,
388             '--output' => $pipe,
389             @args);
390 0           $pipeline->setup_run_finish;
391 0           exit 0;
392 0           });
393 0 0         if ($pipeline_fred->error()) {
394 0           $pipe->close;
395 0           return (undef, $pipeline_fred);
396             };
397 0           $pipe->reader();
398             # Next line will block until pipeline produces the header!
399 0 0 0       if ($#{$post_filter_reader_aref} >= 1
  0   0        
400             && $post_filter_reader_aref->[0] eq '-raw_fh'
401             && $post_filter_reader_aref->[1]) {
402 0           return ($pipe, $pipeline_fred);
403             };
404 0           my $post_filter = new Fsdb::IO::Reader(-fh => $pipe, @$post_filter_reader_aref);
405 0           return ($post_filter, $pipeline_fred);
406             }
407              
408              
409             =head2 dbpipeline_sink
410              
411             my($fsdb_writer, $fred) = dbpipeline_sink($writer_arguments_aref, @modules);
412              
413             Set up a pipeline of @MODULES that is a data "sink", where the output
414             is given by a C<--output> argument, or goes to standard output (by default).
415             The caller generates input into the pipeline
416             by writing to a newly created $FSDB_WRITER,
417             whose configuration is specified by the mandatory first
418             argument $WRITER_ARGUMENTS_AREF.
419             (These arguments should include the schema.)
420             Returns this writer, and a $FRED that must be joined to guarantee output
421             has completed.
422              
423             If the first argument to modules is "--fred_exit_sub",
424             then the second is taken as a CODE block that runs at fred exit
425             (and the two are not passed to modules).
426              
427             If the first argument to modules is "--fred_description",
428             then the second is taken as a text description of the Fred.
429              
430             =cut
431              
432             sub dbpipeline_sink($@) {
433 0     0 1   my($writer_aref) = shift @_;
434 0           my(@args) = @_;
435              
436 0           my $fred_exit_sub = undef;
437 0           my $fred_desc = 'dbpipeline_sink';
438 0           for (;;) {
439 0 0 0       last if ($#args == -1 || ref($args[0]) ne '');
440 0 0         if ($args[0] eq '--fred_exit_sub') {
    0          
441 0           shift @args;
442 0           $fred_exit_sub = shift @args;
443             } elsif ($args[0] eq '--fred_description') {
444 0           shift @args;
445 0           $fred_desc = shift @args;
446             } else {
447 0           last;
448             };
449             };
450            
451 0           my $pipe = new IO::Pipe;
452              
453             my $pipeline_fred = new Fsdb::Support::Freds($fred_desc,
454             sub {
455 0     0     $pipe->reader();
456 0           my $pipeline = new Fsdb::Filter::dbpipeline(
457             '--noautorun',
458             '--input' => $pipe,
459             @args);
460 0           $pipeline->setup_run_finish;
461 0           exit 0;
462 0           }, $fred_exit_sub);
463 0 0         if ($pipeline_fred->error()) {
464 0           $pipe->close;
465 0           return (undef, $pipeline_fred);
466             };
467 0           $pipe->writer();
468 0           my $writer = new Fsdb::IO::Writer(-fh => $pipe, @$writer_aref);
469 0           return ($writer, $pipeline_fred);
470             }
471              
472             =head2 dbpipeline_open2
473              
474             my($fsdb_reader_fh, $fsdb_writer, $fred) =
475             dbpipeline_open2($writer_arguments_aref, @modules);
476              
477             Set up a pipeline of @MODULES that is a data sink and source (both!).
478             The caller generates input into the pipeline
479             by writing to a newly created $FSDB_WRITER,
480             whose configuration is specified by the mandatory
481             argument $WRITER_ARGUMENTS_AREF.
482             These arguments should include the schema.)
483             The output of the pipeline comes out to the newly
484             created $FSDB_READER_FH.
485             Returns this read queue and writer,
486             and a $PID that must be joined to guarantee output
487             has completed.
488              
489             (Unfortunately the interface is asymmetric with a read I
490             but a write C object, because C blocks on
491             input of the header.)
492              
493             Like L, with all of its pros and cons like potential deadlock.
494              
495             =cut
496              
497             sub dbpipeline_open2 ($@) {
498 0     0 1   my($writer_aref) = shift @_;
499 0           my(@args) = @_;
500            
501 0           my $into_pipeline_pipe = new IO::Pipe;
502 0           my $from_pipeline_pipe = new IO::Pipe;
503             my $pipeline_fred = new Fsdb::Support::Freds('dbpipeline_open2',
504             sub {
505 0     0     $into_pipeline_pipe->reader();
506 0           $from_pipeline_pipe->writer();
507 0           my $pipeline = new Fsdb::Filter::dbpipeline(
508             '--noautorun',
509             '--input' => $into_pipeline_pipe,
510             '--output' => $from_pipeline_pipe,
511             @args);
512 0           $pipeline->setup_run_finish;
513 0           exit 0;
514 0           });
515 0 0         if ($pipeline_fred->error()) {
516 0           $into_pipeline_pipe->close;
517 0           $from_pipeline_pipe->close;
518 0           return (undef, undef, $pipeline_fred);
519             };
520             # can't make a reader here since it will block on the header
521             # my $reader = new Fsdb::IO::Reader(-queue => $from_pipeline_queue, @$reader_aref);
522 0           $into_pipeline_pipe->writer();
523 0           $from_pipeline_pipe->reader();
524 0           my $writer = new Fsdb::IO::Writer(-fh => $into_pipeline_pipe, @$writer_aref);
525 0           return ($from_pipeline_pipe, $writer, $pipeline_fred);
526             }
527              
528             =head2 dbpipeline_close2_hash
529              
530             my($href) = dbpipeline_close2_hash($fsdb_read_fh, $fsdb_writer, $pid);
531              
532             Reads and returns one line of output from $FSDB_READER,
533             after closing $FSDB_WRITER and joining the $PID.
534              
535             Useful, for example, to get L output cleanly.
536              
537             =cut
538              
539             sub dbpipeline_close2_hash ($$$) {
540 0     0 1   my($read_fh, $writer, $fred) = @_;
541 0 0         $writer->close if (defined($writer));
542 0 0         if (defined($fred)) {
543 0           $fred->join();
544             };
545 0           my %out_hash;
546 0           my $reader = new Fsdb::IO::Reader(-fh => $read_fh);
547 0 0         $reader->error and croak "dbpipeline_close2_hash: couldn't setup reader.\n";
548 0 0         $reader->read_row_to_href(\%out_hash) or croak "dbpipeline_close2_hash: no output from pipeline.\n";
549             # check for eof
550 0           my $o;
551 0           while ($o = $reader->read_rowobj) {
552 0 0         next if (!ref($o)); # comment
553             # data is bad
554 0 0         $o and croak "dbpipeline_close2_hash: multiple lines of output.\n";
555             };
556 0           return \%out_hash;
557             }
558              
559              
560             =head2 new
561              
562             $filter = new Fsdb::Filter::dbpipeline(@arguments);
563              
564             =cut
565              
566             sub new {
567 0     0 1   my $class = shift @_;
568 0           my $self = $class->SUPER::new(@_);
569 0           bless $self, $class;
570 0           $self->set_defaults;
571 0           $self->parse_options(@_);
572 0           $self->SUPER::post_new();
573 0           return $self;
574             }
575              
576              
577             =head2 set_defaults
578              
579             $filter->set_defaults();
580              
581             Internal: set up defaults.
582              
583             =cut
584              
585             sub set_defaults ($) {
586 0     0 1   my($self) = @_;
587 0           $self->{_modules} = [];
588 0           $self->SUPER::set_defaults();
589 0           $self->{_autorun} = 1; # override superclass default
590 0           $self->{_header} = undef;
591             }
592              
593             =head2 parse_options
594              
595             $filter->parse_options(@ARGV);
596              
597             Internal: parse options
598              
599             =cut
600              
601             sub parse_options ($@) {
602 0     0 1   my $self = shift @_;
603              
604 0           my @argv = @_;
605             $self->get_options(
606             \@argv,
607 0     0     'help|?' => sub { pod2usage(1); },
608 0     0     'man' => sub { pod2usage(-verbose => 2); },
609             'autorun!' => \$self->{_autorun},
610             'close!' => \$self->{_close},
611             'd|debug+' => \$self->{_debug},
612             'header=s' => \$self->{_header},
613 0     0     'i|input=s' => sub { $self->parse_io_option('input', @_); },
614             'log!' => \$self->{_logprog},
615 0     0     'o|output=s' => sub { $self->parse_io_option('output', @_); },
616 0 0         ) or pod2usage(2);
617 0           push(@{$self->{_modules}}, @argv);
  0            
618             }
619              
620             =head2 setup
621              
622             $filter->_reap();
623              
624             Internal: reap any forked threads.
625              
626             =cut
627              
628             sub _reap($) {
629 0     0     my($self) = @_;
630             # wait for termination of everyone, in order (it's a pipeline)
631 0           while ($#{$self->{_freds}} >= 0) {
  0            
632 0           my $fred = shift @{$self->{_freds}};
  0            
633 0           my $res = $fred->join();
634 0 0         if ($res == -1) {
635             print STDERR "dbpipeline: join on fred returns error.\n"
636 0 0         if ($self->{_debug});
637             };
638 0 0         if ($fred->exit_code() != 0) {
639             print STDERR "dbpipeline: child returns exit code " . $fred->exit_code() . "\n"
640 0 0         if ($self->{_debug});
641             };
642             };
643             }
644              
645              
646             =head2 setup
647              
648             $filter->setup();
649              
650             Internal: setup, parse headers.
651              
652             =cut
653              
654             sub setup ($) {
655 0     0 1   my($self) = @_;
656              
657 0           my $prev_module_i = $#{$self->{_modules}};
  0            
658 0 0         die $self->{_prog} . ": no modules in pipeline.\n"
659             if ($prev_module_i < 0);
660              
661             #
662             # Make sure module inputs are sensible.
663             #
664 0           my $prev_mod = undef;
665 0           my $i = 0;
666 0           my $mod;
667 0           foreach $mod (@{$self->{_modules}}) {
  0            
668 0 0         die $self->{_prog} . ": module $i isn't type Fsdb::Filter.\n"
669             if (ref($mod) !~ /^Fsdb::Filter/);
670 0 0         if (defined($prev_mod)) {
671 0 0         die $self->{_prog} . ": incompatible module input and output between modules $i and " . $i+1 . ".\n"
672             if ($prev_mod->info('output_type') ne $mod->info('input_type'));
673             # xxx: above is a bit too strict, since fsdbtext should match fsdb*
674             };
675              
676 0           $prev_mod = $mod;
677 0           $i++;
678             };
679              
680             #
681             # Everything benign has now happend.
682             #
683             # Now fork off processes for each child.
684             # Ideally we would do that in run,
685             # except that perl has problems when a pipe (like stdin) is
686             # opened in one thread and used in another---it trys to lseek
687             # and gives up when lseek fails on the pipe.
688             # Sigh. These details are all 5.8.8... maybe 5.10 fixes them?
689             #
690             # Setup children.
691             # Built a queue to tell us when to reap them.
692             #
693             # Fork of each, in order, then we run the final one.
694             #
695 0           $self->{_freds} = [];
696 0           $i = 0;
697 0           my $final_mod_i = $#{$self->{_modules}};
  0            
698 0           my $prev_pipe = undef;
699 0           my $err = undef;
700 0           foreach $mod (@{$self->{_modules}}) {
  0            
701 0 0         last if ($i == $final_mod_i); # do last module in-line
702             # my $pipe = ($i == $final_mod_i) ? undef : new IO::Pipe;
703 0           my $pipe = new IO::Pipe;
704             my $fred = new Fsdb::Support::Freds('dbpipeline',
705             sub {
706 0 0   0     $prev_pipe->reader() if ($prev_pipe);
707 0   0       my $in = $prev_pipe // $self->{_input};
708 0 0 0       $mod->parse_options("--header" => $self->{_header}) if (defined($self->{_header}) && !$prev_pipe);
709 0 0         $mod->parse_options("--input" => $in) if (defined($in));
710 0 0         $pipe->writer() if ($pipe);
711             # $mod->parse_options("--output" => ($pipe ? $pipe : $self->{_output}));
712 0           $mod->parse_options("--output" => $pipe);
713 0           $mod->setup_run_finish();
714 0           exit 0;
715 0           });
716 0 0         if ($fred->error()) {
717 0           $pipe->close;
718 0           $err = $fred->error();
719 0           last;
720             };
721             # parent
722 0           push (@{$self->{_freds}}, $fred);
  0            
723 0           $i++;
724 0           $prev_pipe = $pipe;
725 0           $prev_mod = $mod;
726             };
727 0 0         if ($err) {
728 0           $self->_reap();
729 0           return;
730             };
731             # start last one in this process
732 0 0         $prev_pipe->reader() if ($prev_pipe);
733 0           my $final_mod = $self->{_modules}[$#{$self->{_modules}}];
  0            
734 0 0         $final_mod->parse_options("--input" => ($prev_pipe ? $prev_pipe : $self->{_input}));
735             $final_mod->parse_options("--output" => $self->{_output})
736 0 0         if (defined($self->{_output}));
737 0           $final_mod->setup();
738             }
739              
740             =head2 run
741              
742             $filter->run();
743              
744             Internal: run over all IO
745              
746             =cut
747             sub run ($) {
748 0     0 1   my($self) = @_;
749 0           $self->{_modules}[$#{$self->{_modules}}]->run();
  0            
750 0           $self->_reap();
751             }
752              
753             =head2 finish
754              
755             $filter->finish();
756              
757             Internal: we would write a trailer, but we don't because
758             we depend on the last command in the pipeline to do that.
759             We don't actually have a valid output stream.
760              
761             =cut
762             sub finish ($) {
763 0     0 1   my($self) = @_;
764 0           $self->{_modules}[$#{$self->{_modules}}]->finish();
  0            
765             # $self->SUPER::finish();
766             }
767              
768             =head1 AUTHOR and COPYRIGHT
769              
770             Copyright (C) 1991-2016 by John Heidemann
771              
772             This program is distributed under terms of the GNU general
773             public license, version 2. See the file COPYING
774             with the distribution for details.
775              
776             =cut
777              
778             1;
779