File Coverage

blib/lib/App/RecordStream/Operation/multiplex/BaseClumperCallback.pm
Criterion Covered Total %
statement 48 50 96.0
branch 4 6 66.6
condition n/a
subroutine 10 10 100.0
pod 0 4 0.0
total 62 70 88.5


line stmt bran cond sub pod time code
1             package App::RecordStream::Operation::multiplex::BaseClumperCallback;
2              
3 2     2   10 use strict;
  2         4  
  2         46  
4 2     2   9 use warnings;
  2         3  
  2         40  
5              
6 2     2   10 use App::RecordStream::Operation;
  2         4  
  2         39  
7 2     2   8 use App::RecordStream::Record;
  2         4  
  2         34  
8 2     2   458 use App::RecordStream::Stream::Sub;
  2         4  
  2         514  
9              
10             sub new {
11 4     4 0 11 my $class = shift;
12 4         11 my $script = shift;
13 4         6 my $args = shift;
14 4         8 my $line_key = shift;
15 4         8 my $record_cb = shift;
16 4         8 my $line_cb = shift;
17              
18 4         21 my $this = {
19             'SCRIPT' => $script,
20             'ARGS' => $args,
21             'LINE_KEY' => $line_key,
22             'RECORD_CB' => $record_cb,
23             'LINE_CB' => $line_cb,
24             };
25 4         11 bless $this, $class;
26              
27 4         17 return $this;
28             }
29              
30             sub clumper_callback_begin {
31 8     8 0 16 my $this = shift;
32 8         12 my $bucket = shift;
33              
34 8         15 my $record_cb = $this->{'RECORD_CB'};
35             my $record_cb2 = sub {
36 14     14   23 my $r = shift;
37              
38 14         55 return $record_cb->(App::RecordStream::Record->new(%$r, %$bucket));
39 8         41 };
40 8         63 my $next = App::RecordStream::Stream::Sub->new($record_cb2, $this->{'LINE_CB'});
41 8         14 my @args = @{$this->{'ARGS'}};
  8         19  
42 8         31 my $op = App::RecordStream::Operation::create_operation($this->{'SCRIPT'}, \@args, $next);
43              
44 8 50       22 if(@args) {
45 0         0 die "Extra options to multiplex operation.";
46             }
47 8 50       33 if(!$op->wants_input()) {
48 0         0 die "Multiplex operation must want input.";
49             }
50              
51 8         42 return $op;
52             }
53              
54             sub clumper_callback_push_record {
55 32     32 0 51 my $this = shift;
56 32         41 my $op = shift;
57 32         51 my $record = shift;
58              
59 32         50 my $line_key = $this->{'LINE_KEY'};
60 32 100       71 if(defined($line_key)) {
61 8         12 $op->accept_line(${$record->guess_key_from_spec($line_key)});
  8         19  
62             }
63             else {
64 24         60 $op->accept_record($record);
65             }
66             }
67              
68             sub clumper_callback_end {
69 8     8 0 12 my $this = shift;
70 8         15 my $op = shift;
71 8         32 $op->finish();
72             }
73              
74             1;