File Coverage

blib/lib/App/RecordStream/Operation/collate/BaseClumperCallback.pm
Criterion Covered Total %
statement 42 43 97.6
branch 7 8 87.5
condition n/a
subroutine 8 8 100.0
pod 0 4 0.0
total 57 63 90.4


line stmt bran cond sub pod time code
1             package App::RecordStream::Operation::collate::BaseClumperCallback;
2              
3 4     4   22 use strict;
  4         7  
  4         100  
4 4     4   16 use warnings;
  4         6  
  4         80  
5              
6 4     4   16 use App::RecordStream::Aggregator;
  4         7  
  4         57  
7 4     4   14 use App::RecordStream::Record;
  4         7  
  4         1273  
8              
9             sub new {
10 17     17 0 68 my $class = shift;
11 17         42 my $aggregators = shift;
12 17         42 my $incremental = shift;
13 17         42 my $bucket = shift;
14 17         36 my $record_cb = shift;
15              
16 17         108 my $this = {
17             'AGGREGATORS' => $aggregators,
18             'INCREMENTAL' => $incremental,
19             'BUCKET' => $bucket,
20             'RECORD_CB' => $record_cb,
21             };
22 17         48 bless $this, $class;
23              
24 17         88 return $this;
25             }
26              
27             sub clumper_callback_begin {
28 70     70 0 126 my $this = shift;
29 70         113 my $bucket = shift;
30              
31 70 100       263 return [$bucket, $this->{'BUCKET'} ? undef : [], App::RecordStream::Aggregator::map_initial($this->{'AGGREGATORS'})];
32             }
33              
34             sub clumper_callback_push_record {
35 147     147 0 215 my $this = shift;
36 147         207 my $cookie = shift;
37 147         202 my $record = shift;
38              
39 147 100       337 push @{$cookie->[1]}, $record if(!$this->{'BUCKET'});
  5         10  
40 147         362 $cookie->[2] = App::RecordStream::Aggregator::map_combine($this->{'AGGREGATORS'}, $cookie->[2], $record);
41              
42 147 50       581 if($this->{'INCREMENTAL'}) {
43 0         0 $this->clumper_callback_end($cookie);
44             }
45             }
46              
47             sub clumper_callback_end {
48 70     70 0 118 my $this = shift;
49 70         118 my $cookie = shift;
50              
51 70 100       186 for my $proto_result ($this->{'BUCKET'} ? ($cookie->[0]) : @{$cookie->[1]}) {
  2         6  
52             my $result = {
53             # first, the bucket or original record
54             %$proto_result,
55              
56             # then, the aggregators
57 73         183 %{App::RecordStream::Aggregator::map_squish($this->{'AGGREGATORS'}, $cookie->[2])},
  73         192  
58             };
59              
60 73         260 my $record = App::RecordStream::Record->new();
61              
62 73         184 for my $key (keys(%$result))
63             {
64 205         328 my $value = $result->{$key};
65              
66 205         284 ${$record->guess_key_from_spec($key)} = $value;
  205         378  
67             }
68              
69 73         244 $this->{'RECORD_CB'}->($record);
70             }
71             }
72              
73             1;