File Coverage

blib/lib/AnyEvent/RabbitMQ/PubSub/Consumer.pm
Criterion Covered Total %
statement 4 6 66.6
branch n/a
condition n/a
subroutine 2 2 100.0
pod n/a
total 6 8 75.0


line stmt bran cond sub pod time code
1             package AnyEvent::RabbitMQ::PubSub::Consumer;
2 1     1   1055 use Moose;
  1         2  
  1         6  
3 1     1   4629 use AnyEvent::RabbitMQ::PubSub;
  0            
  0            
4             use Data::Dumper;
5             use Time::HiRes qw(usleep);
6              
7             use AnyEvent;
8             use Promises qw(deferred collect);
9              
10             has channel => (
11             is => 'ro', isa => 'AnyEvent::RabbitMQ::Channel', required => 1
12             );
13             has exchange => (
14             is => 'ro', isa => 'HashRef', required => 1
15             );
16             has queue => (
17             is => 'ro', isa => 'HashRef', required => 1
18             );
19             has routing_key => (
20             is => 'ro', isa => 'Str', default => '#'
21             );
22             has prefetch_count => (
23             is => 'ro', isa => 'Int', default => 5,
24             );
25              
26             sub init {
27             my ($self) = @_;
28              
29             $self->channel->qos(prefetch_count => $self->prefetch_count);
30              
31             my $cv = AnyEvent->condvar;
32              
33             $self->declare_exchange_and_queue()
34             ->then( sub { $self->bind_queue() })
35             ->then( sub { $cv->send() })
36             ->catch(sub { $cv->croak(@_) });
37              
38             $cv->recv();
39             return
40             }
41              
42             sub consume {
43             my ($self, $cv, $on_consume) = @_;
44              
45             my $d = deferred();
46              
47             $self->channel->consume(
48             queue => $self->queue->{queue},
49             no_ack => 0,
50             on_success => sub { $d->resolve() },
51             on_cancel => sub {AnyEvent::RabbitMQ::PubSub::_report_error($cv, @_)},
52             on_failure => sub {AnyEvent::RabbitMQ::PubSub::_report_error($cv, @_)},
53             on_consume => sub { $on_consume->($self, @_) },
54             );
55              
56             return $d->promise
57             }
58              
59             sub reject_and_republish {
60             my ($self, $msg) = @_;
61              
62             usleep 10_000; # wait 10 ms before republish to avoid 100 % CPU
63             my $delivery_tag = $msg->{deliver}{method_frame}{delivery_tag};
64              
65             $self->channel->reject(delivery_tag => $delivery_tag);
66              
67             $msg->{header}{headers}{trials}++;
68             $self->channel->publish(
69             body => $msg->{body}->{payload},
70             header => $msg->{header},
71             exchange => "",
72             routing_key => $self->queue->{queue},
73             );
74             }
75              
76             sub ack {
77             my ($self, $msg) = @_;
78              
79             warn "Message to ack not specified" if !defined $msg;
80              
81             my $delivery_tag = $msg->{deliver}{method_frame}{delivery_tag};
82             $self->channel->ack(delivery_tag => $delivery_tag);
83             }
84              
85             sub declare_exchange_and_queue {
86             my ($self, $cv) = @_;
87              
88             return collect(
89             $self->declare_exchange(),
90             $self->declare_queue(),
91             )->then(sub {
92             return @{ $_[0] }
93             });
94             }
95              
96             sub declare_queue {
97             my ($self) = @_;
98              
99             my $d = deferred;
100             $self->channel->declare_queue(
101             %{ $self->queue },
102             on_success => sub { $d->resolve() },
103             on_failure => sub { $d->reject(@_) },
104             );
105             return $d->promise()
106             }
107              
108             sub declare_exchange {
109             my ($self) = @_;
110              
111             my $d = deferred;
112             $self->channel->declare_exchange(
113             %{ $self->exchange },
114             on_success => sub { $d->resolve() },
115             on_failure => sub { $d->reject(@_) },
116             );
117             return $d->promise()
118             }
119              
120             sub bind_queue {
121             my ($self) = @_;
122              
123             my $d = deferred;
124             $self->channel->bind_queue(
125             queue => $self->queue->{queue},
126             exchange => $self->exchange->{exchange},
127             routing_key => $self->routing_key,
128             on_success => sub { $d->resolve() },
129             on_failure => sub { $d->reject(@_) },
130             );
131             return $d->promise()
132             }
133              
134             1