File Coverage

blib/lib/AnyEvent/RabbitMQ/PubSub/Consumer.pm
Criterion Covered Total %
statement 4 6 66.6
branch n/a
condition n/a
subroutine 2 2 100.0
pod n/a
total 6 8 75.0


line stmt bran cond sub pod time code
1             package AnyEvent::RabbitMQ::PubSub::Consumer;
2 1     1   1735 use Moose;
  1         4  
  1         9  
3 1     1   10348 use AnyEvent::RabbitMQ::PubSub;
  0            
  0            
4             use Data::Dumper;
5             use Time::HiRes qw(usleep);
6              
7             use AnyEvent;
8             use Promises qw(deferred collect);
9              
10             =head1 NAME
11              
12             AnyEvent::RabbitMQ::PubSub::Consumer - rabbitmq consumer
13              
14             =cut
15              
16             has channel => (
17             is => 'ro', isa => 'AnyEvent::RabbitMQ::Channel', required => 1
18             );
19             has exchange => (
20             is => 'ro', isa => 'HashRef', required => 1
21             );
22             has queue => (
23             is => 'ro', isa => 'HashRef', required => 1
24             );
25             has routing_key => (
26             is => 'ro', isa => 'Str', default => '#'
27             );
28             has prefetch_count => (
29             is => 'ro', isa => 'Int', default => 5,
30             );
31              
32             =head1 METHODS
33              
34             =head2 init()
35              
36             set prefetch_count
37              
38             declare exchange and queue
39              
40             =cut
41              
42             sub init {
43             my ($self) = @_;
44              
45             $self->channel->qos(prefetch_count => $self->prefetch_count);
46              
47             my $cv = AnyEvent->condvar;
48              
49             $self->declare_exchange_and_queue()
50             ->then( sub { $self->bind_queue() })
51             ->then( sub { $cv->send() })
52             ->catch(sub { $cv->croak(@_) });
53              
54             $cv->recv();
55             return
56             }
57              
58             =head2 consume($cv, $on_consume)
59              
60             run consume C<$on_consume> code on channel
61              
62             return L<Promise>
63              
64             my $cv = AnyEvent->condvar();
65             $self->consume(
66             $cv,
67             sub {
68             my ($consumer, $msg) = @_;
69              
70             ...
71             }
72             )->then(sub {
73             say 'Consumer was started...';
74             });
75              
76              
77             =cut
78              
79             sub consume {
80             my ($self, $cv, $on_consume) = @_;
81              
82             my $d = deferred();
83              
84             $self->channel->consume(
85             queue => $self->queue->{queue},
86             no_ack => 0,
87             on_success => sub { $d->resolve() },
88             on_cancel => sub {AnyEvent::RabbitMQ::PubSub::_report_error($cv, @_)},
89             on_failure => sub {AnyEvent::RabbitMQ::PubSub::_report_error($cv, @_)},
90             on_consume => sub { $on_consume->($self, @_) },
91             );
92              
93             return $d->promise
94             }
95              
96             =head2 reject_and_republish($msg)
97              
98             reject (drop) message
99              
100             and after 10ms (to avoid 100% CPU)
101              
102             republish message back (to end of queue)
103              
104             =cut
105              
106             sub reject_and_republish {
107             my ($self, $msg) = @_;
108              
109             usleep 10_000; # wait 10 ms before republish to avoid 100 % CPU
110             $self->reject($msg);
111              
112             $msg->{header}{headers}{trials}++;
113             $self->channel->publish(
114             body => $msg->{body}->{payload},
115             header => $msg->{header},
116             exchange => "",
117             routing_key => $self->queue->{queue},
118             );
119             }
120              
121             =head2 reject($msg)
122              
123             reject (drop) message
124              
125             =cut
126              
127             sub reject {
128             my ($self, $msg) = @_;
129              
130             warn "Message to reject not specified" if !defined $msg;
131              
132             my $delivery_tag = $msg->{deliver}{method_frame}{delivery_tag};
133             $self->channel->reject(delivery_tag => $delivery_tag);
134             }
135              
136             =head2 ack($msg)
137              
138             ack C<$msg> same as
139              
140             $consumer->channel->ack(delivery_tag => $msg->{deliver}{method_frame}{delivery_tag});
141              
142             =cut
143              
144             sub ack {
145             my ($self, $msg) = @_;
146              
147             warn "Message to ack not specified" if !defined $msg;
148              
149             my $delivery_tag = $msg->{deliver}{method_frame}{delivery_tag};
150             $self->channel->ack(delivery_tag => $delivery_tag);
151             }
152              
153             sub declare_exchange_and_queue {
154             my ($self, $cv) = @_;
155              
156             return collect(
157             $self->declare_exchange(),
158             $self->declare_queue(),
159             )->then(sub {
160             return @{ $_[0] }
161             });
162             }
163              
164             sub declare_queue {
165             my ($self) = @_;
166              
167             my $d = deferred;
168             $self->channel->declare_queue(
169             %{ $self->queue },
170             on_success => sub { $d->resolve() },
171             on_failure => sub { $d->reject(@_) },
172             );
173             return $d->promise()
174             }
175              
176             sub declare_exchange {
177             my ($self) = @_;
178              
179             my $d = deferred;
180             $self->channel->declare_exchange(
181             %{ $self->exchange },
182             on_success => sub { $d->resolve() },
183             on_failure => sub { $d->reject(@_) },
184             );
185             return $d->promise()
186             }
187              
188             sub bind_queue {
189             my ($self) = @_;
190              
191             my $d = deferred;
192             $self->channel->bind_queue(
193             queue => $self->queue->{queue},
194             exchange => $self->exchange->{exchange},
195             routing_key => $self->routing_key,
196             on_success => sub { $d->resolve() },
197             on_failure => sub { $d->reject(@_) },
198             );
199             return $d->promise()
200             }
201              
202             1