File Coverage

blib/lib/AnyEvent/RabbitMQ/LocalQueue.pm
Criterion Covered Total %
statement 28 35 80.0
branch 2 2 100.0
condition n/a
subroutine 6 7 85.7
pod 0 3 0.0
total 36 47 76.6


line stmt bran cond sub pod time code
1             package AnyEvent::RabbitMQ::LocalQueue;
2              
3 2     2   107552 use strict;
  2         16  
  2         58  
4 2     2   11 use warnings;
  2         4  
  2         765  
5              
6             our $VERSION = '1.22'; # VERSION
7              
8             sub new {
9 1     1 0 147 my $class = shift;
10 1         8 return bless {
11             _message_queue => [],
12             _drain_code_queue => [],
13             }, $class;
14             }
15              
16             sub push {
17 6     6 0 29 my $self = shift;
18              
19 6         13 CORE::push @{$self->{_message_queue}}, @_;
  6         16  
20 6         13 return $self->_drain_queue();
21             }
22              
23             sub get {
24 6     6 0 68 my $self = shift;
25              
26 6         8 CORE::push @{$self->{_drain_code_queue}}, @_;
  6         16  
27 6         13 return $self->_drain_queue();
28             }
29              
30             sub _drain_queue {
31 12     12   16 my $self = shift;
32              
33 12         22 my $message_count = scalar @{$self->{_message_queue}};
  12         19  
34 12         46 my $drain_code_count = scalar @{$self->{_drain_code_queue}};
  12         23  
35              
36 12 100       26 my $count = $message_count < $drain_code_count
37             ? $message_count : $drain_code_count;
38              
39 12         27 for (1 .. $count) {
40 10         15 &{shift @{$self->{_drain_code_queue}}}(
  10         29  
41 10         2312 shift @{$self->{_message_queue}}
  10         20  
42             );
43             }
44              
45 12         3869 return $self;
46             }
47              
48             sub _flush {
49 0     0     my ($self, $frame) = @_;
50              
51 0           $self->_drain_queue;
52              
53 0           while (my $cb = shift @{$self->{_drain_code_queue}}) {
  0            
54 0           local $@; # Flush frames immediately, throwing away errors for on-close
55 0           eval { $cb->($frame) };
  0            
56             }
57             }
58              
59             1;
60