File Coverage

blib/lib/App/TailRabbit.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package App::TailRabbit;
2 1     1   1594 use Moose;
  0            
  0            
3             use MooseX::Getopt;
4             use Net::RabbitFoot;
5             use Data::Dumper;
6             use MooseX::Types::Moose qw/ ArrayRef Object Bool /;
7             use MooseX::Types::Common::String qw/ NonEmptySimpleStr /;
8             use AnyEvent;
9             use YAML qw/LoadFile/;
10             use File::HomeDir;
11             use Path::Class qw/ file /;
12             use MooseX::Types::LoadableClass qw/ LoadableClass /;
13             use namespace::autoclean;
14              
15             our $VERSION = '0.003';
16              
17             with qw/
18             MooseX::Getopt
19             MooseX::ConfigFromFile
20             /;
21              
22             has exchange_name => (
23             is => 'ro',
24             isa => NonEmptySimpleStr,
25             required => 1,
26             );
27              
28             has routing_key => (
29             is => 'ro',
30             isa => ArrayRef[NonEmptySimpleStr],
31             default => sub { [] },
32             );
33              
34             has rabbitmq_host => (
35             isa => NonEmptySimpleStr,
36             is => 'ro',
37             default => 'localhost',
38             );
39              
40             has [qw/ rabbitmq_user rabbitmq_pass /] => (
41             isa => NonEmptySimpleStr,
42             is => 'ro',
43             default => 'guest',
44             );
45              
46             has convertor => (
47             isa => LoadableClass,
48             is => 'ro',
49             coerce => 1,
50             default => 'App::TailRabbit::Convertor::Null',
51             );
52              
53             has _convertor => (
54             is => 'ro',
55             isa => Object,
56             lazy => 1,
57             default => sub {
58             shift->convertor->new
59             },
60             handles => [qw/ convert /],
61             );
62              
63             has exchange_type => (
64             is => 'ro',
65             isa => NonEmptySimpleStr,
66             default => 'topic',
67             );
68              
69             has durable => (
70             is => 'ro',
71             isa => Bool,
72             default => 0,
73             );
74              
75             sub get_config_from_file {
76             my ($class, $file) = @_;
77             return LoadFile($file) if (-r $file);
78             return {};
79             }
80              
81             has 'configfile' => (
82             default => file(File::HomeDir->my_home(), ".tailrabbit.yml")->stringify,
83             is => 'bare',
84             );
85              
86             my $rf = Net::RabbitFoot->new(
87             # verbose => 1,
88             )->load_xml_spec();
89              
90             sub _get_mq {
91             my $self = shift;
92             $rf->connect(
93             host => $self->rabbitmq_host,
94             port => 5672,
95             user => $self->rabbitmq_user,
96             pass => $self->rabbitmq_pass,
97             vhost => '/',
98             on_close => sub {
99             die("MQ connection closed");
100             },
101             on_read_failure => sub {
102             die("READ FAILED");
103             },
104             on_failure => sub {
105             die("Failed to connect to mq");
106             },
107             );
108             return $rf;
109             }
110              
111             sub _bind_anon_queue {
112             my ($self, $ch) = @_;
113             my $queue_frame = $ch->declare_queue(
114             auto_delete => 1,
115             exclusive => 1,
116             )->method_frame;
117             my @keys = @{ $self->routing_key };
118             push(@keys, "#") unless scalar @keys;
119             foreach my $key (@keys) {
120             my $bind_frame = $ch->bind_queue(
121             queue => $queue_frame->queue,
122             exchange => $self->exchange_name,
123             routing_key => $key,
124             )->method_frame;
125             die Dumper($bind_frame) unless blessed $bind_frame and $bind_frame->isa('Net::AMQP::Protocol::Queue::BindOk');
126             }
127             }
128              
129             sub _get_channel {
130             my ($self, $rf) = @_;
131             my $ch = $rf->open_channel(
132             on_close => sub { warn("Channel closed - wrong exchange options!\n"); exit; },
133             );
134             my $reply = $ch->declare_exchange(
135             type => $self->exchange_type,
136             durable => $self->durable,
137             exchange => $self->exchange_name,
138             );
139             my $exch_frame = $reply->method_frame;
140             die Dumper($exch_frame) unless blessed $exch_frame and $exch_frame->isa('Net::AMQP::Protocol::Exchange::DeclareOk');
141             return $ch;
142             }
143              
144             sub run {
145             my $self = shift;
146             my $ch = $self->_get_channel($self->_get_mq);
147             $self->_bind_anon_queue($ch);
148             my $done = AnyEvent->condvar;
149             $ch->consume(
150             on_consume => sub {
151             my $message = shift;
152             my $payload = $self->convert($message->{body}->payload);
153             my $routing_key = $message->{deliver}->method_frame->routing_key;
154             $self->notify($payload, $routing_key, $message);
155             },
156             );
157             $done->recv; # Go into the event loop forever.
158             }
159              
160             sub notify {
161             my ($self, $payload, $routing_key, $message) = @_;
162             print $routing_key,
163             ': ', $payload, "\n";
164             }
165              
166             __PACKAGE__->meta->make_immutable;
167             1;
168              
169             =head1 NAME
170              
171             App::TailRabbit - Listen to a RabbitMQ exchange and emit the messages to console.
172              
173             =head1 SYNOPSIS
174              
175             tail_reabbit --exchange_name firehose --routing_key # --rabbitmq_user guest --rabbitmq_user guest --rabbitmq_host localhost
176              
177             =head1 DESCRIPTION
178              
179             Simple module to consume messages from a RabitMQ message queue.
180              
181             =head1 BUGS
182              
183             =over
184              
185             =item Virtually no docs
186              
187             =item Creates all exchanges as durable.
188              
189             =item Always creates exchange if it doesn't exist
190              
191             =item Probably several more
192              
193             =back
194              
195             =head1 SEE ALSO
196              
197             L<Net::RabbitFoot>
198              
199             =head1 AUTHOR
200              
201             Tomas (t0m) Doran C<< <bobtfish@bobtfish.net> >>.
202              
203             =head1 COPYRIGHT & LICENSE
204              
205             Copyright the above author(s).
206              
207             Licensed under the same terms as perl itself.
208              
209             =cut
210