File Coverage

blib/lib/App/RecordStream/Operation/collate/BaseClumperCallback.pm
Criterion Covered Total %
statement 42 43 97.6
branch 7 8 87.5
condition n/a
subroutine 8 8 100.0
pod 0 4 0.0
total 57 63 90.4


line stmt bran cond sub pod time code
1             package App::RecordStream::Operation::collate::BaseClumperCallback;
2              
3 4     4   25 use strict;
  4         11  
  4         109  
4 4     4   22 use warnings;
  4         9  
  4         111  
5              
6 4     4   22 use App::RecordStream::Aggregator;
  4         9  
  4         99  
7 4     4   21 use App::RecordStream::Record;
  4         10  
  4         1545  
8              
9             sub new {
10 17     17 0 50 my $class = shift;
11 17         52 my $aggregators = shift;
12 17         32 my $incremental = shift;
13 17         39 my $bucket = shift;
14 17         32 my $record_cb = shift;
15              
16 17         104 my $this = {
17             'AGGREGATORS' => $aggregators,
18             'INCREMENTAL' => $incremental,
19             'BUCKET' => $bucket,
20             'RECORD_CB' => $record_cb,
21             };
22 17         46 bless $this, $class;
23              
24 17         91 return $this;
25             }
26              
27             sub clumper_callback_begin {
28 70     70 0 192 my $this = shift;
29 70         356 my $bucket = shift;
30              
31 70 100       444 return [$bucket, $this->{'BUCKET'} ? undef : [], App::RecordStream::Aggregator::map_initial($this->{'AGGREGATORS'})];
32             }
33              
34             sub clumper_callback_push_record {
35 147     147 0 433 my $this = shift;
36 147         267 my $cookie = shift;
37 147         344 my $record = shift;
38              
39 147 100       398 push @{$cookie->[1]}, $record if(!$this->{'BUCKET'});
  5         17  
40 147         500 $cookie->[2] = App::RecordStream::Aggregator::map_combine($this->{'AGGREGATORS'}, $cookie->[2], $record);
41              
42 147 50       975 if($this->{'INCREMENTAL'}) {
43 0         0 $this->clumper_callback_end($cookie);
44             }
45             }
46              
47             sub clumper_callback_end {
48 70     70 0 155 my $this = shift;
49 70         133 my $cookie = shift;
50              
51 70 100       239 for my $proto_result ($this->{'BUCKET'} ? ($cookie->[0]) : @{$cookie->[1]}) {
  2         7  
52             my $result = {
53             # first, the bucket or original record
54             %$proto_result,
55              
56             # then, the aggregators
57 73         231 %{App::RecordStream::Aggregator::map_squish($this->{'AGGREGATORS'}, $cookie->[2])},
  73         257  
58             };
59              
60 73         390 my $record = App::RecordStream::Record->new();
61              
62 73         254 for my $key (keys(%$result))
63             {
64 205         496 my $value = $result->{$key};
65              
66 205         372 ${$record->guess_key_from_spec($key)} = $value;
  205         608  
67             }
68              
69 73         290 $this->{'RECORD_CB'}->($record);
70             }
71             }
72              
73             1;