File Coverage

blib/lib/App/RecordStream/Operation/chain.pm
Criterion Covered Total %
statement 72 184 39.1
branch 14 56 25.0
condition 2 6 33.3
subroutine 13 25 52.0
pod 0 12 0.0
total 101 283 35.6


line stmt bran cond sub pod time code
1             package App::RecordStream::Operation::chain;
2              
3             our $VERSION = "4.0.25";
4              
5 2     2   969 use strict;
  2         3  
  2         43  
6 2     2   7 use warnings;
  2         3  
  2         36  
7              
8 2     2   7 use App::RecordStream::Operation;
  2         3  
  2         32  
9 2     2   7 use App::RecordStream::Stream::Printer;
  2         3  
  2         31  
10              
11 2     2   6 use base qw(App::RecordStream::Operation);
  2         4  
  2         2225  
12              
13             sub init {
14 1     1 0 2 my $this = shift;
15 1         2 my $args = shift;
16              
17 1         2 my ($show_chain, $dry_run);
18             my $spec = {
19             'show-chain' => \$show_chain,
20 1     1   505 'n' => sub { $show_chain = 1; $dry_run = 1; },
  1         3  
21 1         7 };
22              
23 1         6 $this->parse_options($args, $spec, ['require_order']);
24              
25 1 50       4 return unless (@$args);
26              
27 1 50       4 unless ( App::RecordStream::Operation::is_recs_operation($args->[0]) ) {
28 0         0 die "First chained command must be standard recs command not shell command!\n";
29             }
30              
31 1 50       6 $this->{'SAVED_ARGS'} = [@$args] if ( $show_chain );
32              
33 1 50       3 if ( $dry_run ) {
34 1         3 $this->{'DRY_RUN'} = $dry_run;
35 1         8 return;
36             }
37              
38 0         0 my $operations = $this->create_operations($args);
39              
40 0         0 my ($first_operation, $last_operation, $continuation_pid) = $this->setup_operations($operations);
41              
42 0         0 $this->{'CHAIN_HEAD'} = $first_operation;
43 0         0 $this->{'CHAIN_TAIL'} = $last_operation;
44 0         0 $this->{'CONTINUATION_PID'} = $continuation_pid;
45             }
46              
47             sub print_chain {
48 1     1 0 2 my $this = shift;
49 1         2 my $args = shift;
50              
51 1         2 my @current_command;
52 1         1 my $was_shell = 0;
53              
54 1         5 $this->push_line("Chain Starts with:");
55              
56 1         2 my $indent = 1;
57 1         2 my $last;
58 1         3 foreach my $arg ( @$args ) {
59 13 100       23 if ( $arg eq '|' ) {
60 4         9 $was_shell = $this->print_command(\@current_command, $last, \$indent);
61              
62 4         11 $last = [@current_command];
63 4         8 @current_command = ();
64 4         7 next;
65             }
66 9         14 push @current_command, $arg;
67             }
68              
69 1         4 $this->print_command(\@current_command, $last, \$indent);
70             }
71              
72             sub print_command {
73 5     5 0 8 my $this = shift;
74 5         6 my $current_command = shift;
75 5         6 my $last = shift;
76 5         6 my $indent = shift;
77              
78 5         7 my $message = '';
79 5 100       8 if ( defined $last ) {
80 4 100 66     10 if ( App::RecordStream::Operation::is_recs_operation($last->[0]) && App::RecordStream::Operation::is_recs_operation($current_command->[0]) ) {
81 3         8 $message .= "Passed in memory to ";
82             }
83             else {
84 1         2 $message .= "Passed through a pipe to ";
85 1         2 $$indent++;
86             }
87             }
88              
89 5         12 my $prefix = ' ' x $$indent . $message;
90              
91 5 100       15 if ( App::RecordStream::Operation::is_recs_operation($current_command->[0]) ) {
92 4         57 $this->push_line($prefix . "Recs command: " . join(' ', @$current_command));
93 4         10 return 0;
94             }
95             else {
96 1         6 $this->push_line($prefix . "Shell command: " . join(' ', @$current_command));
97 1         4 return 1;
98             }
99             }
100              
101             sub setup_operations {
102 0     0 0 0 my $this = shift;
103 0         0 my $operations = shift;
104              
105             # others need this
106 0         0 $operations = [@$operations];
107              
108 0         0 my ($first_operation, $last_operation, $continuation_pid);
109 0         0 while ( my $operation = shift @$operations ) {
110 0 0       0 if ( $operation->[0] eq 'SHELL' ) {
    0          
111 0         0 my $in_continuation;
112 0         0 ($in_continuation, $continuation_pid) = $this->setup_fork($operation->[1]);
113              
114 0 0       0 if ( $in_continuation ) {
115 0         0 $first_operation = undef;
116 0         0 $last_operation = undef;
117 0         0 $continuation_pid = undef;
118 0         0 next;
119             }
120             else {
121 0         0 last;
122             }
123             }
124             elsif ( $operation->[0] eq 'RECS' ) {
125             # fall through
126             }
127             else {
128 0         0 die;
129             }
130              
131 0   0     0 $first_operation ||= $operation;
132 0         0 $last_operation = $operation;
133             }
134              
135             # we return $continuation_pid so we can wait on it, we can wait on our shell
136             # child via close(STDOUT)
137 0         0 return ($first_operation, $last_operation, $continuation_pid);
138             }
139              
140             sub create_operations {
141 0     0 0 0 my $this = shift;
142 0         0 my $args = shift;
143              
144 0         0 my @single_command;
145             my @operations;
146 0         0 foreach my $arg ( @$args ) {
147 0 0       0 if ( $arg eq '|' ) {
148 0         0 $this->add_operation(\@single_command, \@operations);
149 0         0 @single_command = ();
150 0         0 next;
151             }
152              
153 0         0 push @single_command, $arg;
154             }
155              
156 0         0 $this->add_operation(\@single_command, \@operations);
157              
158 0         0 return \@operations;
159             }
160              
161             sub add_operation {
162 0     0 0 0 my $this = shift;
163 0         0 my $single_command = shift;
164 0         0 my $operations = shift;
165              
166 0         0 my $idx = @$operations;
167 0         0 my $push_shim = App::RecordStream::Operation::chain::PushShim->new($operations, $idx);
168              
169 0 0       0 if ( App::RecordStream::Operation::is_recs_operation($single_command->[0]) ) {
170 0         0 my ($sc1, @args) = @$single_command;
171 0         0 my $operation = App::RecordStream::Operation::create_operation($sc1, \@args, $push_shim);
172 0         0 push @$operations, ['RECS', $operation, \@args];
173             }
174             else {
175 0         0 push @$operations, ['SHELL', [@$single_command]];
176             }
177             }
178              
179             sub setup_fork {
180 0     0 0 0 my $this = shift;
181 0         0 my $command_arguments = shift;
182              
183 0         0 my $continuation_pid = open(STDOUT, "|-");
184 0 0       0 die "cannot fork: $!" unless defined $continuation_pid;
185              
186 0 0       0 if ( ! $continuation_pid ) {
187             # in continuation
188 0         0 return 1;
189             }
190              
191             # in parent, now split off the shell command as well
192 0         0 my $shell_pid = open(STDOUT, "|-");
193 0 0       0 die "cannot fork: $!" unless defined $shell_pid;
194              
195 0 0       0 if ( ! $shell_pid ) {
196             # the child runs the shell command
197 0         0 exec (@$command_arguments);
198             }
199              
200             # still in parent, we're responsible for the children so we keep the PID
201             # around (shell pid can be waited for via close(STDOUT)).
202 0         0 return (0, $continuation_pid);
203             }
204              
205             sub wants_input {
206 0     0 0 0 return 0;
207             }
208              
209             sub stream_done {
210 1     1 0 2 my $this = shift;
211              
212 1 50       4 if ( my $args = $this->{'SAVED_ARGS'} ) {
213 1         4 $this->print_chain($args);
214             }
215              
216 1 50       4 if ( $this->{'DRY_RUN'} ) {
217 1         3 return;
218             }
219              
220 0         0 my $head = $this->{'CHAIN_HEAD'};
221              
222 0 0       0 if ( $head ) {
223 0         0 my $head_operation = $head->[1];
224 0         0 my $head_args = $head->[2];
225 0 0       0 if ( $head_operation->wants_input() ) {
226 0         0 local @ARGV = @$head_args;
227 0         0 while(my $line = <>) {
228 0         0 chomp $line;
229 0         0 App::RecordStream::Operation::set_current_filename($ARGV);
230 0 0       0 if ( ! $head_operation->accept_line($line) ) {
231 0         0 last;
232             }
233             }
234             }
235 0         0 $head_operation->finish();
236             }
237             else {
238 0         0 while(<>) {
239 0         0 chomp;
240 0         0 $this->push_line($_);
241             }
242             }
243              
244             # wait for shell child (if we even have one)
245 0         0 close(STDOUT);
246              
247             # wait for possible other child (next sequence of recs operations)
248 0         0 my $continuation_pid = $this->{'CONTINUATION_PID'};
249 0 0       0 if ( $continuation_pid ) {
250             # We have recs operation processes wait for the next recs operation
251             # process to the "right" in the chain, so that the left-most is last to
252             # exit. The shell is waiting on this left-most process to exit, so this
253             # exit order ensures everyone finishes up before the shell notices the
254             # left-most PID is done and resumes control.
255 0         0 waitpid $continuation_pid, 0;
256             }
257             else {
258             # no next sequence, we must be the last sequence of recs operations
259             }
260              
261             }
262              
263             sub get_exit_value {
264 0     0 0 0 my $this = shift;
265              
266 0 0       0 if ( my $tail = $this->{'CHAIN_TAIL'} ) {
267 0         0 return $tail->[1]->get_exit_value();
268             }
269              
270 0         0 return 0;
271             }
272              
273             sub add_help_types {
274 1     1 0 3 my $this = shift;
275 1         6 $this->use_help_type('keyspecs');
276             }
277              
278             sub usage {
279 0     0 0   my $this = shift;
280              
281 0           my $options = [
282             ['show-chain', 'Before running the commands, print out what will happen in the chain'],
283             ['n', 'Do not run commands, implies --show-chain'],
284             ];
285              
286 0           my $args_string = $this->options_string($options);
287              
288 0           return <<USAGE;
289             Usage: recs-chain <command> | <command> | ...
290             __FORMAT_TEXT__
291             Creates an in-memory chain of recs operations. This avoids serialization and
292             deserialization of records at each step in a complex recs pipeline. For
293             ease of use the chain of recs commands may contain non-recs commands;
294             anything that does not start with recs- is interpreted as a shell command.
295             That command is forked off to the shell. In this case, serialization and
296             deserialization costs apply, but only to and from the shell command,
297             everything else is done in memory. If you have many shell commands in a
298             row, there is extra overhead; you should instead consider splitting those
299             into separate pipes. See the examples for more information on this.
300              
301             Arguments are specified on the command line separated by pipes. For most
302             shells, you will need to escape the pipe character to avoid having the shell
303             interpret the pipe as a shell pipe.
304             __FORMAT_TEXT__
305              
306             $args_string
307              
308             Examples:
309             Parse some fields, sort and collate, all in memory
310             recs-chain recs-frommultire 'data,time=(\\S+) (\\S+)' \\| recs-sort --key time=n \\| recs-collate --a perc,90,data
311             Use shell commands in your recs stream
312             recs-chain recs-frommultire 'data,time=(\\S+) (\\S+)' \\| recs-sort --key time=n \\| grep foo \\| recs-collate --a perc,90,data
313             Many shell commands should be split into real pipes
314             recs-chain recs-frommultire 'data,time=(\\S+) (\\S+)' \\| recs-xform '\$r->{now} = time();' \
315             | grep foo | sort | uniq | recs-chain recs-collate --a perc,90,data \\| recs-totable
316             USAGE
317             }
318              
319             package App::RecordStream::Operation::chain::PushShim;
320              
321 2     2   14 use App::RecordStream::Stream::Base;
  2         4  
  2         29  
322              
323 2     2   7 use base 'App::RecordStream::Stream::Base';
  2         2  
  2         592  
324              
325             sub new {
326 0     0     my $class = shift;
327 0           my $operations = shift;
328 0           my $idx = shift;
329              
330 0           my $this = {
331             OPERATIONS => $operations,
332             IDX => $idx,
333             };
334              
335 0           bless $this, $class;
336              
337 0           return $this;
338             }
339              
340             sub accept_record {
341 0     0     my $this = shift;
342 0           my $record = shift;
343              
344 0           return $this->_get_delegate()->accept_record($record);
345             }
346              
347             sub accept_line {
348 0     0     my $this = shift;
349 0           my $line = shift;
350              
351 0           return $this->_get_delegate()->accept_line($line);
352             }
353              
354             sub finish {
355 0     0     my $this = shift;
356              
357 0           $this->_get_delegate()->finish();
358             }
359              
360             sub _get_delegate {
361 0     0     my $this = shift;
362              
363 0           my $delegate = $this->{'DELEGATE'};
364              
365 0 0         if ( ! $delegate ) {
366 0           my $operations = $this->{'OPERATIONS'};
367 0           my $idx = $this->{'IDX'};
368 0 0         if ( $idx + 1 < @$operations ) {
369 0           my $next_operation = $operations->[$idx + 1];
370 0 0         if ( $next_operation->[0] eq 'RECS' ) {
    0          
371             # next operation is actually a recs operation, it must be
372             # in process with us, give it our output
373 0           $delegate = $next_operation->[1];
374             }
375             elsif ($next_operation->[0] eq 'SHELL' ) {
376             # next is shell, we're set up to print to it
377 0           $delegate = App::RecordStream::Stream::Printer->new();
378             }
379             else {
380 0           die;
381             }
382             }
383             else {
384             # at the end, we print out
385 0           $delegate = App::RecordStream::Stream::Printer->new();
386             }
387              
388 0           $this->{'DELEGATE'} = $delegate;
389             }
390              
391 0           return $delegate;
392             }
393              
394             1;