File Coverage

blib/lib/App/RecordStream/Operation/collate/BaseClumperCallback.pm
Criterion Covered Total %
statement 42 43 97.6
branch 7 8 87.5
condition n/a
subroutine 8 8 100.0
pod 0 4 0.0
total 57 63 90.4


line stmt bran cond sub pod time code
1             package App::RecordStream::Operation::collate::BaseClumperCallback;
2              
3 4     4   25 use strict;
  4         9  
  4         93  
4 4     4   17 use warnings;
  4         9  
  4         81  
5              
6 4     4   19 use App::RecordStream::Aggregator;
  4         9  
  4         71  
7 4     4   18 use App::RecordStream::Record;
  4         7  
  4         1137  
8              
9             sub new {
10 17     17 0 50 my $class = shift;
11 17         35 my $aggregators = shift;
12 17         35 my $incremental = shift;
13 17         38 my $bucket = shift;
14 17         34 my $record_cb = shift;
15              
16 17         78 my $this = {
17             'AGGREGATORS' => $aggregators,
18             'INCREMENTAL' => $incremental,
19             'BUCKET' => $bucket,
20             'RECORD_CB' => $record_cb,
21             };
22 17         48 bless $this, $class;
23              
24 17         76 return $this;
25             }
26              
27             sub clumper_callback_begin {
28 70     70 0 128 my $this = shift;
29 70         278 my $bucket = shift;
30              
31 70 100       296 return [$bucket, $this->{'BUCKET'} ? undef : [], App::RecordStream::Aggregator::map_initial($this->{'AGGREGATORS'})];
32             }
33              
34             sub clumper_callback_push_record {
35 147     147 0 257 my $this = shift;
36 147         247 my $cookie = shift;
37 147         241 my $record = shift;
38              
39 147 100       348 push @{$cookie->[1]}, $record if(!$this->{'BUCKET'});
  5         11  
40 147         416 $cookie->[2] = App::RecordStream::Aggregator::map_combine($this->{'AGGREGATORS'}, $cookie->[2], $record);
41              
42 147 50       702 if($this->{'INCREMENTAL'}) {
43 0         0 $this->clumper_callback_end($cookie);
44             }
45             }
46              
47             sub clumper_callback_end {
48 70     70 0 128 my $this = shift;
49 70         125 my $cookie = shift;
50              
51 70 100       217 for my $proto_result ($this->{'BUCKET'} ? ($cookie->[0]) : @{$cookie->[1]}) {
  2         8  
52             my $result = {
53             # first, the bucket or original record
54             %$proto_result,
55              
56             # then, the aggregators
57 73         195 %{App::RecordStream::Aggregator::map_squish($this->{'AGGREGATORS'}, $cookie->[2])},
  73         225  
58             };
59              
60 73         293 my $record = App::RecordStream::Record->new();
61              
62 73         197 for my $key (keys(%$result))
63             {
64 205         391 my $value = $result->{$key};
65              
66 205         327 ${$record->guess_key_from_spec($key)} = $value;
  205         490  
67             }
68              
69 73         245 $this->{'RECORD_CB'}->($record);
70             }
71             }
72              
73             1;