File Coverage

blib/lib/AnyEvent/RabbitMQ/LocalQueue.pm
Criterion Covered Total %
statement 28 35 80.0
branch 2 2 100.0
condition n/a
subroutine 6 7 85.7
pod 0 3 0.0
total 36 47 76.6


line stmt bran cond sub pod time code
1             package AnyEvent::RabbitMQ::LocalQueue;
2              
3 2     2   106049 use strict;
  2         17  
  2         60  
4 2     2   10 use warnings;
  2         6  
  2         807  
5              
6             our $VERSION = '1.21_01'; # TRIAL VERSION
7             $VERSION = eval $VERSION;
8              
9             sub new {
10 1     1 0 151 my $class = shift;
11 1         7 return bless {
12             _message_queue => [],
13             _drain_code_queue => [],
14             }, $class;
15             }
16              
17             sub push {
18 6     6 0 28 my $self = shift;
19              
20 6         7 CORE::push @{$self->{_message_queue}}, @_;
  6         37  
21 6         13 return $self->_drain_queue();
22             }
23              
24             sub get {
25 6     6 0 58 my $self = shift;
26              
27 6         11 CORE::push @{$self->{_drain_code_queue}}, @_;
  6         14  
28 6         15 return $self->_drain_queue();
29             }
30              
31             sub _drain_queue {
32 12     12   19 my $self = shift;
33              
34 12         17 my $message_count = scalar @{$self->{_message_queue}};
  12         20  
35 12         19 my $drain_code_count = scalar @{$self->{_drain_code_queue}};
  12         16  
36              
37 12 100       28 my $count = $message_count < $drain_code_count
38             ? $message_count : $drain_code_count;
39              
40 12         30 for (1 .. $count) {
41 10         12 &{shift @{$self->{_drain_code_queue}}}(
  10         35  
42 10         2224 shift @{$self->{_message_queue}}
  10         23  
43             );
44             }
45              
46 12         4178 return $self;
47             }
48              
49             sub _flush {
50 0     0     my ($self, $frame) = @_;
51              
52 0           $self->_drain_queue;
53              
54 0           while (my $cb = shift @{$self->{_drain_code_queue}}) {
  0            
55 0           local $@; # Flush frames immediately, throwing away errors for on-close
56 0           eval { $cb->($frame) };
  0            
57             }
58             }
59              
60             1;
61