File Coverage

blib/lib/Data/Queue/Batch.pm
Criterion Covered Total %
statement 50 52 96.1
branch 12 12 100.0
condition 6 8 75.0
subroutine 14 16 87.5
pod 11 11 100.0
total 93 99 93.9


line stmt bran cond sub pod time code
1             package Data::Queue::Batch;
2 4     4   64932 use 5.008001;
  4         9  
3 4     4   12 use strict;
  4         4  
  4         64  
4 4     4   17 use warnings;
  4         3  
  4         1866  
5              
6             our $VERSION = "0.03";
7              
8             sub new {
9 20     20 1 9296 my ($class, %args) = @_;
10 20   50     50 my $batch_size = delete($args{batch_size}) || 100;
11 20         22 my $callback = delete($args{callback});
12 20         325 return bless {
13             callback => $callback,
14             batch_size => $batch_size,,
15             %args,
16             available => 0,
17             _queue => [],
18             }, $class;
19             }
20              
21 95     95 1 1764 sub size { scalar(@{ shift->{_queue} }) }
  95         197  
22 0     0 1 0 sub available { shift->{available} }
23 0     0 1 0 sub batch_size { shift->{batch_size} }
24              
25 51     51 1 2452 sub push { shift->enqueue(@_) }
26             sub enqueue {
27 56     56 1 70 my ($self, @values) = @_;
28 56         37 CORE::push(@{ $self->{_queue} }, @values);
  56         99  
29              
30 56         72 my $unmarked = $self->size - $self->{available};
31 56         61 my $marking = $unmarked - ($unmarked % $self->{batch_size});
32 56         46 $self->{available} += $marking;
33            
34 56 100 66     129 if ($self->{callback} && $self->{available}) {
35 19         28 $self->{callback}->($self->_take($self->{available}));
36             }
37 56         610 return;
38             }
39              
40 23     23 1 48 sub shift { shift->dequeue(@_) }
41             sub dequeue {
42 28     28 1 33 my ($self) = @_;
43 28 100       221 return unless $self->{available};
44 15         18 my ($dequeued) = $self->_take(1);
45 15         67 return $dequeued;
46             }
47              
48             sub peek {
49 4     4 1 5 my ($self, $count) = @_;
50 4 100       9 $count = $self->{available} if $count > $self->{available};
51 4         8 my @peeked = @{$self->{_queue}}[0 .. $count - 1];
  4         9  
52 4         16 return @peeked;
53             }
54              
55             sub flush {
56 20     20 1 317 my ($self) = @_;
57 20         44 my @taken = $self->_take($self->size);
58 20 100 100     71 if ($self->{callback} && @taken) {
59 17         242 $self->{callback}->(@taken);
60             }
61 20         268 return @taken;
62             }
63              
64             sub clear {
65 1     1 1 3 my ($self) = @_;
66 1         2 $self->_take($self->size);
67 1         13 return;
68             }
69              
70             sub _take {
71 55     55   46 my ($self, $count) = @_;
72 55         29 my @taken = splice(@{ $self->{_queue} }, 0, $count);
  55         90  
73 55         53 $self->{available} -= $count;
74 55 100       631 $self->{available} = 0 if $self->{available} < 0;
75 55         332 return @taken;
76             }
77              
78             sub DESTROY {
79 20     20   1393 my ($self) = @_;
80 20 100       104 $self->flush if $self->{callback};
81             }
82              
83             1;
84             __END__