File Coverage

blib/lib/Data/Queue/Batch.pm
Criterion Covered Total %
statement 50 52 96.1
branch 12 12 100.0
condition 6 8 75.0
subroutine 14 16 87.5
pod 11 11 100.0
total 93 99 93.9


line stmt bran cond sub pod time code
1             package Data::Queue::Batch;
2 4     4   65178 use 5.008001;
  4         9  
3 4     4   13 use strict;
  4         4  
  4         68  
4 4     4   17 use warnings;
  4         5  
  4         1949  
5              
6             our $VERSION = "0.04";
7              
8             sub new {
9 20     20 1 10589 my ($class, %args) = @_;
10 20   50     58 my $batch_size = delete($args{batch_size}) || 100;
11 20         22 my $callback = delete($args{callback});
12 20         318 return bless {
13             callback => $callback,
14             batch_size => $batch_size,,
15             %args,
16             available => 0,
17             _queue => [],
18             }, $class;
19             }
20              
21 95     95 1 1819 sub size { scalar(@{ shift->{_queue} }) }
  95         200  
22 0     0 1 0 sub available { shift->{available} }
23 0     0 1 0 sub batch_size { shift->{batch_size} }
24              
25 51     51 1 2477 sub push :method { shift->enqueue(@_) }
26             sub enqueue {
27 56     56 1 73 my ($self, @values) = @_;
28 56         40 push(@{ $self->{_queue} }, @values);
  56         97  
29              
30 56         69 my $unmarked = $self->size - $self->{available};
31 56         61 my $marking = $unmarked - ($unmarked % $self->{batch_size});
32 56         51 $self->{available} += $marking;
33            
34 56 100 66     128 if ($self->{callback} && $self->{available}) {
35 19         27 $self->{callback}->($self->_take($self->{available}));
36             }
37 56         581 return;
38             }
39              
40 23     23 1 45 sub shift :method { shift->dequeue(@_) }
41             sub dequeue {
42 28     28 1 32 my ($self) = @_;
43 28 100       156 return unless $self->{available};
44 15         21 my ($dequeued) = $self->_take(1);
45 15         68 return $dequeued;
46             }
47              
48             sub peek {
49 4     4 1 7 my ($self, $count) = @_;
50 4 100       10 $count = $self->{available} if $count > $self->{available};
51 4         8 my @peeked = @{$self->{_queue}}[0 .. $count - 1];
  4         6  
52 4         17 return @peeked;
53             }
54              
55             sub flush {
56 20     20 1 316 my ($self) = @_;
57 20         37 my @taken = $self->_take($self->size);
58 20 100 100     74 if ($self->{callback} && @taken) {
59 17         220 $self->{callback}->(@taken);
60             }
61 20         265 return @taken;
62             }
63              
64             sub clear {
65 1     1 1 3 my ($self) = @_;
66 1         2 $self->_take($self->size);
67 1         14 return;
68             }
69              
70             sub _take {
71 55     55   42 my ($self, $count) = @_;
72 55         31 my @taken = splice(@{ $self->{_queue} }, 0, $count);
  55         90  
73 55         54 $self->{available} -= $count;
74 55 100       87 $self->{available} = 0 if $self->{available} < 0;
75 55         314 return @taken;
76             }
77              
78             sub DESTROY {
79 20     20   1381 my ($self) = @_;
80 20 100       101 $self->flush if $self->{callback};
81             }
82              
83             1;
84             __END__