File Coverage

blib/lib/Message/Passing/Input/AMQP.pm
Criterion Covered Total %
statement 15 15 100.0
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 20 20 100.0


line stmt bran cond sub pod time code
1             package Message::Passing::Input::AMQP;
2 1     1   919 use Moose;
  1         2  
  1         8  
3 1     1   5519 use AnyEvent;
  1         2  
  1         30  
4 1     1   5 use Scalar::Util qw/ weaken refaddr /;
  1         2  
  1         65  
5 1     1   4 use Try::Tiny;
  1         2  
  1         50  
6 1     1   5 use namespace::autoclean;
  1         2  
  1         8  
7              
8             with qw/
9             Message::Passing::AMQP::Role::BindsAQueue
10             Message::Passing::Role::Input
11             /;
12              
13              
14             after '_set_queue' => sub {
15             my $self = shift;
16             weaken($self);
17             $self->_channel->consume(
18             on_consume => sub {
19             my $message = shift;
20             try {
21             $self->output_to->consume($message->{body}->payload);
22             }
23             catch {
24             warn("Error in consume_message callback: $_");
25             };
26             },
27             consumer_tag => refaddr($self),
28             on_success => sub {
29             },
30             on_failure => sub {
31             Carp::cluck("Failed to start message consumer in $self response " . Dumper(@_));
32             },
33             );
34             };
35              
36             __PACKAGE__->meta->make_immutable;
37             1;
38              
39             =head1 NAME
40              
41             Message::Passing::Input::AMQP - input logstash messages from AMQP.
42              
43             =head1 SYNOPSIS
44              
45             message-pass --output STDOUT --input AMQP --input_options '{"queue_name":"test","exchange_name":"test","hostname":"127.0.0.1","username":"guest","password":"guest"}'
46              
47             =head1 DESCRIPTION
48              
49             =head1 SEE ALSO
50              
51             =over
52              
53             =item L<Message::Passing::AMQP>
54              
55             =item L<Message::Passing::Output::AMQP>
56              
57             =item L<Message::Passing>
58              
59             =item L<AMQP>
60              
61             =item L<http://www.zeromq.org/>
62              
63             =back
64              
65             =head1 AUTHOR, COPYRIGHT AND LICENSE
66              
67             See L<Message::Passing::AMQP>.
68              
69             =cut
70