File Coverage

blib/lib/Message/Passing/Input/AMQP.pm
Criterion Covered Total %
statement 15 15 100.0
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 20 20 100.0


line stmt bran cond sub pod time code
1             package Message::Passing::Input::AMQP;
2 1     1   978 use Moo;
  1         2  
  1         6  
3 1     1   322 use AnyEvent;
  1         2  
  1         25  
4 1     1   6 use Scalar::Util qw/ weaken refaddr /;
  1         2  
  1         52  
5 1     1   6 use Try::Tiny;
  1         1  
  1         63  
6 1     1   7 use namespace::autoclean;
  1         2  
  1         6  
7              
8             with qw/
9             Message::Passing::AMQP::Role::BindsAQueue
10             Message::Passing::Role::Input
11             /;
12              
13              
14             after '_set_queue' => sub {
15             my $self = shift;
16             weaken($self);
17             $self->_channel->consume(
18             on_consume => sub {
19             my $message = shift;
20             try {
21             $self->output_to->consume($message->{body}->payload);
22             }
23             catch {
24             warn("Error in consume_message callback: $_");
25             };
26             },
27             consumer_tag => refaddr($self),
28             on_success => sub {
29             },
30             on_failure => sub {
31             Carp::cluck("Failed to start message consumer in $self response " . Dumper(@_));
32             },
33             );
34             };
35              
36             1;
37              
38             =head1 NAME
39              
40             Message::Passing::Input::AMQP - input logstash messages from AMQP.
41              
42             =head1 SYNOPSIS
43              
44             message-pass --output STDOUT --input AMQP --input_options '{"queue_name":"test","exchange_name":"test","hostname":"127.0.0.1","username":"guest","password":"guest"}'
45              
46             =head1 DESCRIPTION
47              
48             =head1 SEE ALSO
49              
50             =over
51              
52             =item L<Message::Passing::AMQP>
53              
54             =item L<Message::Passing::Output::AMQP>
55              
56             =item L<Message::Passing>
57              
58             =item L<AMQP>
59              
60             =item L<http://www.zeromq.org/>
61              
62             =back
63              
64             =head1 AUTHOR, COPYRIGHT AND LICENSE
65              
66             See L<Message::Passing::AMQP>.
67              
68             =cut
69