File Coverage

blib/lib/AnyEvent/RabbitMQ/LocalQueue.pm
Criterion Covered Total %
statement 28 35 80.0
branch 2 2 100.0
condition n/a
subroutine 6 7 85.7
pod 0 3 0.0
total 36 47 76.6


line stmt bran cond sub pod time code
1             package AnyEvent::RabbitMQ::LocalQueue;
2              
3 2     2   106054 use strict;
  2         13  
  2         58  
4 2     2   10 use warnings;
  2         4  
  2         692  
5              
6             sub new {
7 1     1 0 202 my $class = shift;
8 1         8 return bless {
9             _message_queue => [],
10             _drain_code_queue => [],
11             }, $class;
12             }
13              
14             sub push {
15 6     6 0 31 my $self = shift;
16              
17 6         10 CORE::push @{$self->{_message_queue}}, @_;
  6         28  
18 6         15 return $self->_drain_queue();
19             }
20              
21             sub get {
22 6     6 0 52 my $self = shift;
23              
24 6         12 CORE::push @{$self->{_drain_code_queue}}, @_;
  6         14  
25 6         13 return $self->_drain_queue();
26             }
27              
28             sub _drain_queue {
29 12     12   18 my $self = shift;
30              
31 12         16 my $message_count = scalar @{$self->{_message_queue}};
  12         20  
32 12         19 my $drain_code_count = scalar @{$self->{_drain_code_queue}};
  12         19  
33              
34 12 100       27 my $count = $message_count < $drain_code_count
35             ? $message_count : $drain_code_count;
36              
37 12         30 for (1 .. $count) {
38 10         14 &{shift @{$self->{_drain_code_queue}}}(
  10         27  
39 10         2233 shift @{$self->{_message_queue}}
  10         22  
40             );
41             }
42              
43 12         3824 return $self;
44             }
45              
46             sub _flush {
47 0     0     my ($self, $frame) = @_;
48              
49 0           $self->_drain_queue;
50              
51 0           while (my $cb = shift @{$self->{_drain_code_queue}}) {
  0            
52 0           local $@; # Flush frames immediately, throwing away errors for on-close
53 0           eval { $cb->($frame) };
  0            
54             }
55             }
56              
57             1;
58