| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | package App::RecordStream::Operation::chain; | 
| 2 |  |  |  |  |  |  |  | 
| 3 |  |  |  |  |  |  | our $VERSION = "4.0.23"; | 
| 4 |  |  |  |  |  |  |  | 
| 5 | 2 |  |  | 2 |  | 978 | use strict; | 
|  | 2 |  |  |  |  | 6 |  | 
|  | 2 |  |  |  |  | 57 |  | 
| 6 | 2 |  |  | 2 |  | 10 | use warnings; | 
|  | 2 |  |  |  |  | 4 |  | 
|  | 2 |  |  |  |  | 42 |  | 
| 7 |  |  |  |  |  |  |  | 
| 8 | 2 |  |  | 2 |  | 10 | use App::RecordStream::Operation; | 
|  | 2 |  |  |  |  | 12 |  | 
|  | 2 |  |  |  |  | 38 |  | 
| 9 | 2 |  |  | 2 |  | 8 | use App::RecordStream::Stream::Printer; | 
|  | 2 |  |  |  |  | 5 |  | 
|  | 2 |  |  |  |  | 36 |  | 
| 10 |  |  |  |  |  |  |  | 
| 11 | 2 |  |  | 2 |  | 7 | use base qw(App::RecordStream::Operation); | 
|  | 2 |  |  |  |  | 4 |  | 
|  | 2 |  |  |  |  | 2054 |  | 
| 12 |  |  |  |  |  |  |  | 
| 13 |  |  |  |  |  |  | sub init { | 
| 14 | 1 |  |  | 1 | 0 | 2 | my $this = shift; | 
| 15 | 1 |  |  |  |  | 2 | my $args = shift; | 
| 16 |  |  |  |  |  |  |  | 
| 17 | 1 |  |  |  |  | 2 | my ($show_chain, $dry_run); | 
| 18 |  |  |  |  |  |  | my $spec = { | 
| 19 |  |  |  |  |  |  | 'show-chain' => \$show_chain, | 
| 20 | 1 |  |  | 1 |  | 560 | 'n'          => sub { $show_chain = 1; $dry_run = 1; }, | 
|  | 1 |  |  |  |  | 3 |  | 
| 21 | 1 |  |  |  |  | 7 | }; | 
| 22 |  |  |  |  |  |  |  | 
| 23 | 1 |  |  |  |  | 7 | $this->parse_options($args, $spec, ['require_order']); | 
| 24 |  |  |  |  |  |  |  | 
| 25 | 1 | 50 |  |  |  | 3 | return unless (@$args); | 
| 26 |  |  |  |  |  |  |  | 
| 27 | 1 | 50 |  |  |  | 5 | unless ( App::RecordStream::Operation::is_recs_operation($args->[0]) ) { | 
| 28 | 0 |  |  |  |  | 0 | die "First chained command must be standard recs command not shell command!\n"; | 
| 29 |  |  |  |  |  |  | } | 
| 30 |  |  |  |  |  |  |  | 
| 31 | 1 | 50 |  |  |  | 6 | $this->{'SAVED_ARGS'} = [@$args] if ( $show_chain ); | 
| 32 |  |  |  |  |  |  |  | 
| 33 | 1 | 50 |  |  |  | 4 | if ( $dry_run ) { | 
| 34 | 1 |  |  |  |  | 2 | $this->{'DRY_RUN'} = $dry_run; | 
| 35 | 1 |  |  |  |  | 9 | return; | 
| 36 |  |  |  |  |  |  | } | 
| 37 |  |  |  |  |  |  |  | 
| 38 | 0 |  |  |  |  | 0 | my $operations = $this->create_operations($args); | 
| 39 |  |  |  |  |  |  |  | 
| 40 | 0 |  |  |  |  | 0 | my ($first_operation, $last_operation, $continuation_pid) = $this->setup_operations($operations); | 
| 41 |  |  |  |  |  |  |  | 
| 42 | 0 |  |  |  |  | 0 | $this->{'CHAIN_HEAD'}        = $first_operation; | 
| 43 | 0 |  |  |  |  | 0 | $this->{'CHAIN_TAIL'}        = $last_operation; | 
| 44 | 0 |  |  |  |  | 0 | $this->{'CONTINUATION_PID'}  = $continuation_pid; | 
| 45 |  |  |  |  |  |  | } | 
| 46 |  |  |  |  |  |  |  | 
| 47 |  |  |  |  |  |  | sub print_chain { | 
| 48 | 1 |  |  | 1 | 0 | 2 | my $this = shift; | 
| 49 | 1 |  |  |  |  | 2 | my $args = shift; | 
| 50 |  |  |  |  |  |  |  | 
| 51 | 1 |  |  |  |  | 2 | my @current_command; | 
| 52 | 1 |  |  |  |  | 2 | my $was_shell = 0; | 
| 53 |  |  |  |  |  |  |  | 
| 54 | 1 |  |  |  |  | 6 | $this->push_line("Chain Starts with:"); | 
| 55 |  |  |  |  |  |  |  | 
| 56 | 1 |  |  |  |  | 2 | my $indent = 1; | 
| 57 | 1 |  |  |  |  | 1 | my $last; | 
| 58 | 1 |  |  |  |  | 3 | foreach my $arg ( @$args ) { | 
| 59 | 13 | 100 |  |  |  | 32 | if ( $arg eq '|' ) { | 
| 60 | 4 |  |  |  |  | 21 | $was_shell = $this->print_command(\@current_command, $last, \$indent); | 
| 61 |  |  |  |  |  |  |  | 
| 62 | 4 |  |  |  |  | 10 | $last = [@current_command]; | 
| 63 | 4 |  |  |  |  | 39 | @current_command = (); | 
| 64 | 4 |  |  |  |  | 8 | next; | 
| 65 |  |  |  |  |  |  | } | 
| 66 | 9 |  |  |  |  | 17 | push @current_command, $arg; | 
| 67 |  |  |  |  |  |  | } | 
| 68 |  |  |  |  |  |  |  | 
| 69 | 1 |  |  |  |  | 3 | $this->print_command(\@current_command, $last, \$indent); | 
| 70 |  |  |  |  |  |  | } | 
| 71 |  |  |  |  |  |  |  | 
| 72 |  |  |  |  |  |  | sub print_command { | 
| 73 | 5 |  |  | 5 | 0 | 7 | my $this            = shift; | 
| 74 | 5 |  |  |  |  | 10 | my $current_command = shift; | 
| 75 | 5 |  |  |  |  | 6 | my $last            = shift; | 
| 76 | 5 |  |  |  |  | 10 | my $indent          = shift; | 
| 77 |  |  |  |  |  |  |  | 
| 78 | 5 |  |  |  |  | 6 | my $message = ''; | 
| 79 | 5 | 100 |  |  |  | 11 | if ( defined $last ) { | 
| 80 | 4 | 100 | 66 |  |  | 13 | if ( App::RecordStream::Operation::is_recs_operation($last->[0]) && App::RecordStream::Operation::is_recs_operation($current_command->[0]) ) { | 
| 81 | 3 |  |  |  |  | 10 | $message .= "Passed in memory to "; | 
| 82 |  |  |  |  |  |  | } | 
| 83 |  |  |  |  |  |  | else { | 
| 84 | 1 |  |  |  |  | 2 | $message .= "Passed through a pipe to "; | 
| 85 | 1 |  |  |  |  | 3 | $$indent++; | 
| 86 |  |  |  |  |  |  | } | 
| 87 |  |  |  |  |  |  | } | 
| 88 |  |  |  |  |  |  |  | 
| 89 | 5 |  |  |  |  | 15 | my $prefix = '  ' x $$indent . $message; | 
| 90 |  |  |  |  |  |  |  | 
| 91 | 5 | 100 |  |  |  | 15 | if ( App::RecordStream::Operation::is_recs_operation($current_command->[0]) ) { | 
| 92 | 4 |  |  |  |  | 26 | $this->push_line($prefix . "Recs command: " . join(' ', @$current_command)); | 
| 93 | 4 |  |  |  |  | 9 | return 0; | 
| 94 |  |  |  |  |  |  | } | 
| 95 |  |  |  |  |  |  | else { | 
| 96 | 1 |  |  |  |  | 7 | $this->push_line($prefix . "Shell command: " . join(' ', @$current_command)); | 
| 97 | 1 |  |  |  |  | 3 | return 1; | 
| 98 |  |  |  |  |  |  | } | 
| 99 |  |  |  |  |  |  | } | 
| 100 |  |  |  |  |  |  |  | 
| 101 |  |  |  |  |  |  | sub setup_operations { | 
| 102 | 0 |  |  | 0 | 0 | 0 | my $this       = shift; | 
| 103 | 0 |  |  |  |  | 0 | my $operations = shift; | 
| 104 |  |  |  |  |  |  |  | 
| 105 |  |  |  |  |  |  | # others need this | 
| 106 | 0 |  |  |  |  | 0 | $operations = [@$operations]; | 
| 107 |  |  |  |  |  |  |  | 
| 108 | 0 |  |  |  |  | 0 | my ($first_operation, $last_operation, $continuation_pid); | 
| 109 | 0 |  |  |  |  | 0 | while ( my $operation = shift @$operations ) { | 
| 110 | 0 | 0 |  |  |  | 0 | if ( $operation->[0] eq 'SHELL' ) { | 
|  |  | 0 |  |  |  |  |  | 
| 111 | 0 |  |  |  |  | 0 | my $in_continuation; | 
| 112 | 0 |  |  |  |  | 0 | ($in_continuation, $continuation_pid) = $this->setup_fork($operation->[1]); | 
| 113 |  |  |  |  |  |  |  | 
| 114 | 0 | 0 |  |  |  | 0 | if ( $in_continuation ) { | 
| 115 | 0 |  |  |  |  | 0 | $first_operation  = undef; | 
| 116 | 0 |  |  |  |  | 0 | $last_operation   = undef; | 
| 117 | 0 |  |  |  |  | 0 | $continuation_pid = undef; | 
| 118 | 0 |  |  |  |  | 0 | next; | 
| 119 |  |  |  |  |  |  | } | 
| 120 |  |  |  |  |  |  | else { | 
| 121 | 0 |  |  |  |  | 0 | last; | 
| 122 |  |  |  |  |  |  | } | 
| 123 |  |  |  |  |  |  | } | 
| 124 |  |  |  |  |  |  | elsif ( $operation->[0] eq 'RECS' ) { | 
| 125 |  |  |  |  |  |  | # fall through | 
| 126 |  |  |  |  |  |  | } | 
| 127 |  |  |  |  |  |  | else { | 
| 128 | 0 |  |  |  |  | 0 | die; | 
| 129 |  |  |  |  |  |  | } | 
| 130 |  |  |  |  |  |  |  | 
| 131 | 0 |  | 0 |  |  | 0 | $first_operation ||= $operation; | 
| 132 | 0 |  |  |  |  | 0 | $last_operation = $operation; | 
| 133 |  |  |  |  |  |  | } | 
| 134 |  |  |  |  |  |  |  | 
| 135 |  |  |  |  |  |  | # we return $continuation_pid so we can wait on it, we can wait on our shell | 
| 136 |  |  |  |  |  |  | # child via close(STDOUT) | 
| 137 | 0 |  |  |  |  | 0 | return ($first_operation, $last_operation, $continuation_pid); | 
| 138 |  |  |  |  |  |  | } | 
| 139 |  |  |  |  |  |  |  | 
| 140 |  |  |  |  |  |  | sub create_operations { | 
| 141 | 0 |  |  | 0 | 0 | 0 | my $this = shift; | 
| 142 | 0 |  |  |  |  | 0 | my $args = shift; | 
| 143 |  |  |  |  |  |  |  | 
| 144 | 0 |  |  |  |  | 0 | my @single_command; | 
| 145 |  |  |  |  |  |  | my @operations; | 
| 146 | 0 |  |  |  |  | 0 | foreach my $arg ( @$args ) { | 
| 147 | 0 | 0 |  |  |  | 0 | if ( $arg eq '|' ) { | 
| 148 | 0 |  |  |  |  | 0 | $this->add_operation(\@single_command, \@operations); | 
| 149 | 0 |  |  |  |  | 0 | @single_command = (); | 
| 150 | 0 |  |  |  |  | 0 | next; | 
| 151 |  |  |  |  |  |  | } | 
| 152 |  |  |  |  |  |  |  | 
| 153 | 0 |  |  |  |  | 0 | push @single_command, $arg; | 
| 154 |  |  |  |  |  |  | } | 
| 155 |  |  |  |  |  |  |  | 
| 156 | 0 |  |  |  |  | 0 | $this->add_operation(\@single_command, \@operations); | 
| 157 |  |  |  |  |  |  |  | 
| 158 | 0 |  |  |  |  | 0 | return \@operations; | 
| 159 |  |  |  |  |  |  | } | 
| 160 |  |  |  |  |  |  |  | 
| 161 |  |  |  |  |  |  | sub add_operation { | 
| 162 | 0 |  |  | 0 | 0 | 0 | my $this           = shift; | 
| 163 | 0 |  |  |  |  | 0 | my $single_command = shift; | 
| 164 | 0 |  |  |  |  | 0 | my $operations     = shift; | 
| 165 |  |  |  |  |  |  |  | 
| 166 | 0 |  |  |  |  | 0 | my $idx = @$operations; | 
| 167 | 0 |  |  |  |  | 0 | my $push_shim = App::RecordStream::Operation::chain::PushShim->new($operations, $idx); | 
| 168 |  |  |  |  |  |  |  | 
| 169 | 0 | 0 |  |  |  | 0 | if ( App::RecordStream::Operation::is_recs_operation($single_command->[0]) ) { | 
| 170 | 0 |  |  |  |  | 0 | my ($sc1, @args) = @$single_command; | 
| 171 | 0 |  |  |  |  | 0 | my $operation = App::RecordStream::Operation::create_operation($sc1, \@args, $push_shim); | 
| 172 | 0 |  |  |  |  | 0 | push @$operations, ['RECS', $operation, \@args]; | 
| 173 |  |  |  |  |  |  | } | 
| 174 |  |  |  |  |  |  | else { | 
| 175 | 0 |  |  |  |  | 0 | push @$operations, ['SHELL', [@$single_command]]; | 
| 176 |  |  |  |  |  |  | } | 
| 177 |  |  |  |  |  |  | } | 
| 178 |  |  |  |  |  |  |  | 
| 179 |  |  |  |  |  |  | sub setup_fork { | 
| 180 | 0 |  |  | 0 | 0 | 0 | my $this              = shift; | 
| 181 | 0 |  |  |  |  | 0 | my $command_arguments = shift; | 
| 182 |  |  |  |  |  |  |  | 
| 183 | 0 |  |  |  |  | 0 | my $continuation_pid = open(STDOUT, "|-"); | 
| 184 | 0 | 0 |  |  |  | 0 | die "cannot fork: $!" unless defined $continuation_pid; | 
| 185 |  |  |  |  |  |  |  | 
| 186 | 0 | 0 |  |  |  | 0 | if ( ! $continuation_pid ) { | 
| 187 |  |  |  |  |  |  | # in continuation | 
| 188 | 0 |  |  |  |  | 0 | return 1; | 
| 189 |  |  |  |  |  |  | } | 
| 190 |  |  |  |  |  |  |  | 
| 191 |  |  |  |  |  |  | # in parent, now split off the shell command as well | 
| 192 | 0 |  |  |  |  | 0 | my $shell_pid = open(STDOUT, "|-"); | 
| 193 | 0 | 0 |  |  |  | 0 | die "cannot fork: $!" unless defined $shell_pid; | 
| 194 |  |  |  |  |  |  |  | 
| 195 | 0 | 0 |  |  |  | 0 | if ( ! $shell_pid ) { | 
| 196 |  |  |  |  |  |  | # the child runs the shell command | 
| 197 | 0 |  |  |  |  | 0 | exec (@$command_arguments); | 
| 198 |  |  |  |  |  |  | } | 
| 199 |  |  |  |  |  |  |  | 
| 200 |  |  |  |  |  |  | # still in parent, we're responsible for the children so we keep the PID | 
| 201 |  |  |  |  |  |  | # around (shell pid can be waited for via close(STDOUT)). | 
| 202 | 0 |  |  |  |  | 0 | return (0, $continuation_pid); | 
| 203 |  |  |  |  |  |  | } | 
| 204 |  |  |  |  |  |  |  | 
| 205 |  |  |  |  |  |  | sub wants_input { | 
| 206 | 0 |  |  | 0 | 0 | 0 | return 0; | 
| 207 |  |  |  |  |  |  | } | 
| 208 |  |  |  |  |  |  |  | 
| 209 |  |  |  |  |  |  | sub stream_done { | 
| 210 | 1 |  |  | 1 | 0 | 3 | my $this = shift; | 
| 211 |  |  |  |  |  |  |  | 
| 212 | 1 | 50 |  |  |  | 4 | if ( my $args = $this->{'SAVED_ARGS'} ) { | 
| 213 | 1 |  |  |  |  | 4 | $this->print_chain($args); | 
| 214 |  |  |  |  |  |  | } | 
| 215 |  |  |  |  |  |  |  | 
| 216 | 1 | 50 |  |  |  | 5 | if ( $this->{'DRY_RUN'} ) { | 
| 217 | 1 |  |  |  |  | 2 | return; | 
| 218 |  |  |  |  |  |  | } | 
| 219 |  |  |  |  |  |  |  | 
| 220 | 0 |  |  |  |  | 0 | my $head = $this->{'CHAIN_HEAD'}; | 
| 221 |  |  |  |  |  |  |  | 
| 222 | 0 | 0 |  |  |  | 0 | if ( $head ) { | 
| 223 | 0 |  |  |  |  | 0 | my $head_operation = $head->[1]; | 
| 224 | 0 |  |  |  |  | 0 | my $head_args = $head->[2]; | 
| 225 | 0 | 0 |  |  |  | 0 | if ( $head_operation->wants_input() ) { | 
| 226 | 0 |  |  |  |  | 0 | local @ARGV = @$head_args; | 
| 227 | 0 |  |  |  |  | 0 | while(my $line = <>) { | 
| 228 | 0 |  |  |  |  | 0 | chomp $line; | 
| 229 | 0 |  |  |  |  | 0 | App::RecordStream::Operation::set_current_filename($ARGV); | 
| 230 | 0 | 0 |  |  |  | 0 | if ( ! $head_operation->accept_line($line) ) { | 
| 231 | 0 |  |  |  |  | 0 | last; | 
| 232 |  |  |  |  |  |  | } | 
| 233 |  |  |  |  |  |  | } | 
| 234 |  |  |  |  |  |  | } | 
| 235 | 0 |  |  |  |  | 0 | $head_operation->finish(); | 
| 236 |  |  |  |  |  |  | } | 
| 237 |  |  |  |  |  |  | else { | 
| 238 | 0 |  |  |  |  | 0 | while(<>) { | 
| 239 | 0 |  |  |  |  | 0 | chomp; | 
| 240 | 0 |  |  |  |  | 0 | $this->push_line($_); | 
| 241 |  |  |  |  |  |  | } | 
| 242 |  |  |  |  |  |  | } | 
| 243 |  |  |  |  |  |  |  | 
| 244 |  |  |  |  |  |  | # wait for shell child (if we even have one) | 
| 245 | 0 |  |  |  |  | 0 | close(STDOUT); | 
| 246 |  |  |  |  |  |  |  | 
| 247 |  |  |  |  |  |  | # wait for possible other child (next sequence of recs operations) | 
| 248 | 0 |  |  |  |  | 0 | my $continuation_pid = $this->{'CONTINUATION_PID'}; | 
| 249 | 0 | 0 |  |  |  | 0 | if ( $continuation_pid ) { | 
| 250 |  |  |  |  |  |  | # We have recs operation processes wait for the next recs operation | 
| 251 |  |  |  |  |  |  | # process to the "right" in the chain.  so that the left-most is last to | 
| 252 |  |  |  |  |  |  | # exit.  The shell is waiting on this left-most process to exit so this | 
| 253 |  |  |  |  |  |  | # exit order ensures everyone finishes up before the shell notice the | 
| 254 |  |  |  |  |  |  | # left-most PID is done and resumes control. | 
| 255 | 0 |  |  |  |  | 0 | waitpid $continuation_pid, 0; | 
| 256 |  |  |  |  |  |  | } | 
| 257 |  |  |  |  |  |  | else { | 
| 258 |  |  |  |  |  |  | # no next sequence, we must be the last sequence of recs operations | 
| 259 |  |  |  |  |  |  | } | 
| 260 |  |  |  |  |  |  |  | 
| 261 |  |  |  |  |  |  | } | 
| 262 |  |  |  |  |  |  |  | 
| 263 |  |  |  |  |  |  | sub get_exit_value { | 
| 264 | 0 |  |  | 0 | 0 | 0 | my $this = shift; | 
| 265 |  |  |  |  |  |  |  | 
| 266 | 0 | 0 |  |  |  | 0 | if ( my $tail = $this->{'CHAIN_TAIL'} ) { | 
| 267 | 0 |  |  |  |  | 0 | return $tail->[1]->get_exit_value(); | 
| 268 |  |  |  |  |  |  | } | 
| 269 |  |  |  |  |  |  |  | 
| 270 | 0 |  |  |  |  | 0 | return 0; | 
| 271 |  |  |  |  |  |  | } | 
| 272 |  |  |  |  |  |  |  | 
| 273 |  |  |  |  |  |  | sub add_help_types { | 
| 274 | 1 |  |  | 1 | 0 | 3 | my $this = shift; | 
| 275 | 1 |  |  |  |  | 5 | $this->use_help_type('keyspecs'); | 
| 276 |  |  |  |  |  |  | } | 
| 277 |  |  |  |  |  |  |  | 
| 278 |  |  |  |  |  |  | sub usage { | 
| 279 | 0 |  |  | 0 | 0 |  | my $this = shift; | 
| 280 |  |  |  |  |  |  |  | 
| 281 | 0 |  |  |  |  |  | my $options = [ | 
| 282 |  |  |  |  |  |  | ['show-chain', 'Before running the commands, print out what will happen in the chain'], | 
| 283 |  |  |  |  |  |  | ['n', 'Do not run commands, implies --show-chain'], | 
| 284 |  |  |  |  |  |  | ]; | 
| 285 |  |  |  |  |  |  |  | 
| 286 | 0 |  |  |  |  |  | my $args_string = $this->options_string($options); | 
| 287 |  |  |  |  |  |  |  | 
| 288 | 0 |  |  |  |  |  | return < | 
| 289 |  |  |  |  |  |  | Usage: recs-chain  |  | ... | 
| 290 |  |  |  |  |  |  | __FORMAT_TEXT__ | 
| 291 |  |  |  |  |  |  | Creates an in-memory chain of recs operations.  This avoid serialization and | 
| 292 |  |  |  |  |  |  | deserialization of records at each step in a complex recs pipeline.  For | 
| 293 |  |  |  |  |  |  | ease of use the chain of recs commands main contain non-recs command, | 
| 294 |  |  |  |  |  |  | anything that does not start with a recs- is interpreted as a shell command. | 
| 295 |  |  |  |  |  |  | That command is forked off to the shell.  In this case, serialization and | 
| 296 |  |  |  |  |  |  | deserialization costs apply, but only to and from the shell command, | 
| 297 |  |  |  |  |  |  | everything else is done in memory.  If you have many shell commands in a | 
| 298 |  |  |  |  |  |  | row, there is extra over head, you should instead consider splitting those | 
| 299 |  |  |  |  |  |  | into separate pipes.  See the examples for more information on this. | 
| 300 |  |  |  |  |  |  |  | 
| 301 |  |  |  |  |  |  | Arugments are specified in on the command line separated by pipes.  For most | 
| 302 |  |  |  |  |  |  | shells, you will need to escape the pipe character to avoid having the shell | 
| 303 |  |  |  |  |  |  | interpret the pipe as a shell pipe. | 
| 304 |  |  |  |  |  |  | __FORMAT_TEXT__ | 
| 305 |  |  |  |  |  |  |  | 
| 306 |  |  |  |  |  |  | $args_string | 
| 307 |  |  |  |  |  |  |  | 
| 308 |  |  |  |  |  |  | Examples: | 
| 309 |  |  |  |  |  |  | Parse some fields, sort and collate, all in memory | 
| 310 |  |  |  |  |  |  | recs-chain recs-frommultire 'data,time=(\\S+) (\\S+)' \\| recs-sort --key time=n \\| recs-collate --a perc,90,data | 
| 311 |  |  |  |  |  |  | Use shell commands in your recs stream | 
| 312 |  |  |  |  |  |  | recs-chain recs-frommultire 'data,time=(\\S+) (\\S+)' \\| recs-sort --key time=n \\| grep foo \\| recs-collate --a perc,90,data | 
| 313 |  |  |  |  |  |  | Many shell commands should be split into real pipes | 
| 314 |  |  |  |  |  |  | recs-chain recs-frommultire 'data,time=(\\S+) (\\S+)' \\| recs-xform '\$r->{now} = time();' \ | 
| 315 |  |  |  |  |  |  | | grep foo | sort | uniq | recs-chain recs-collate --a perc,90,data \\| recs-totable | 
| 316 |  |  |  |  |  |  | USAGE | 
| 317 |  |  |  |  |  |  | } | 
| 318 |  |  |  |  |  |  |  | 
| 319 |  |  |  |  |  |  | package App::RecordStream::Operation::chain::PushShim; | 
| 320 |  |  |  |  |  |  |  | 
| 321 | 2 |  |  | 2 |  | 16 | use App::RecordStream::Stream::Base; | 
|  | 2 |  |  |  |  | 4 |  | 
|  | 2 |  |  |  |  | 32 |  | 
| 322 |  |  |  |  |  |  |  | 
| 323 | 2 |  |  | 2 |  | 8 | use base 'App::RecordStream::Stream::Base'; | 
|  | 2 |  |  |  |  | 3 |  | 
|  | 2 |  |  |  |  | 509 |  | 
| 324 |  |  |  |  |  |  |  | 
| 325 |  |  |  |  |  |  | sub new { | 
| 326 | 0 |  |  | 0 |  |  | my $class = shift; | 
| 327 | 0 |  |  |  |  |  | my $operations = shift; | 
| 328 | 0 |  |  |  |  |  | my $idx = shift; | 
| 329 |  |  |  |  |  |  |  | 
| 330 | 0 |  |  |  |  |  | my $this = { | 
| 331 |  |  |  |  |  |  | OPERATIONS => $operations, | 
| 332 |  |  |  |  |  |  | IDX => $idx, | 
| 333 |  |  |  |  |  |  | }; | 
| 334 |  |  |  |  |  |  |  | 
| 335 | 0 |  |  |  |  |  | bless $this, $class; | 
| 336 |  |  |  |  |  |  |  | 
| 337 | 0 |  |  |  |  |  | return $this; | 
| 338 |  |  |  |  |  |  | } | 
| 339 |  |  |  |  |  |  |  | 
| 340 |  |  |  |  |  |  | sub accept_record { | 
| 341 | 0 |  |  | 0 |  |  | my $this = shift; | 
| 342 | 0 |  |  |  |  |  | my $record = shift; | 
| 343 |  |  |  |  |  |  |  | 
| 344 | 0 |  |  |  |  |  | return $this->_get_delegate()->accept_record($record); | 
| 345 |  |  |  |  |  |  | } | 
| 346 |  |  |  |  |  |  |  | 
| 347 |  |  |  |  |  |  | sub accept_line { | 
| 348 | 0 |  |  | 0 |  |  | my $this = shift; | 
| 349 | 0 |  |  |  |  |  | my $line = shift; | 
| 350 |  |  |  |  |  |  |  | 
| 351 | 0 |  |  |  |  |  | return $this->_get_delegate()->accept_line($line); | 
| 352 |  |  |  |  |  |  | } | 
| 353 |  |  |  |  |  |  |  | 
| 354 |  |  |  |  |  |  | sub finish { | 
| 355 | 0 |  |  | 0 |  |  | my $this = shift; | 
| 356 |  |  |  |  |  |  |  | 
| 357 | 0 |  |  |  |  |  | $this->_get_delegate()->finish(); | 
| 358 |  |  |  |  |  |  | } | 
| 359 |  |  |  |  |  |  |  | 
| 360 |  |  |  |  |  |  | sub _get_delegate { | 
| 361 | 0 |  |  | 0 |  |  | my $this = shift; | 
| 362 |  |  |  |  |  |  |  | 
| 363 | 0 |  |  |  |  |  | my $delegate = $this->{'DELEGATE'}; | 
| 364 |  |  |  |  |  |  |  | 
| 365 | 0 | 0 |  |  |  |  | if ( ! $delegate ) { | 
| 366 | 0 |  |  |  |  |  | my $operations = $this->{'OPERATIONS'}; | 
| 367 | 0 |  |  |  |  |  | my $idx = $this->{'IDX'}; | 
| 368 | 0 | 0 |  |  |  |  | if ( $idx + 1 < @$operations ) { | 
| 369 | 0 |  |  |  |  |  | my $next_operation = $operations->[$idx + 1]; | 
| 370 | 0 | 0 |  |  |  |  | if ( $next_operation->[0] eq 'RECS' ) { | 
|  |  | 0 |  |  |  |  |  | 
| 371 |  |  |  |  |  |  | # next operation is actually a recs operation, it must be | 
| 372 |  |  |  |  |  |  | # in process with us, give it our output | 
| 373 | 0 |  |  |  |  |  | $delegate = $next_operation->[1]; | 
| 374 |  |  |  |  |  |  | } | 
| 375 |  |  |  |  |  |  | elsif ($next_operation->[0] eq 'SHELL' ) { | 
| 376 |  |  |  |  |  |  | # next is shell, we're set up to print to it | 
| 377 | 0 |  |  |  |  |  | $delegate = App::RecordStream::Stream::Printer->new(); | 
| 378 |  |  |  |  |  |  | } | 
| 379 |  |  |  |  |  |  | else { | 
| 380 | 0 |  |  |  |  |  | die; | 
| 381 |  |  |  |  |  |  | } | 
| 382 |  |  |  |  |  |  | } | 
| 383 |  |  |  |  |  |  | else { | 
| 384 |  |  |  |  |  |  | # at the end, we print out | 
| 385 | 0 |  |  |  |  |  | $delegate = App::RecordStream::Stream::Printer->new(); | 
| 386 |  |  |  |  |  |  | } | 
| 387 |  |  |  |  |  |  |  | 
| 388 | 0 |  |  |  |  |  | $this->{'DELEGATE'} = $delegate; | 
| 389 |  |  |  |  |  |  | } | 
| 390 |  |  |  |  |  |  |  | 
| 391 | 0 |  |  |  |  |  | return $delegate; | 
| 392 |  |  |  |  |  |  | } | 
| 393 |  |  |  |  |  |  |  | 
| 394 |  |  |  |  |  |  | 1; |