File Coverage

blib/lib/App/RabbitTail.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package App::RabbitTail;
2 1     1   1741 use Moose;
  0            
  0            
3             use Net::RabbitFoot 1.03;
4             use App::RabbitTail::FileTailer;
5             use AnyEvent;
6             use Data::Dumper;
7             use Moose::Autobox;
8             use MooseX::Types::Moose qw/ArrayRef Str Int/;
9             use Try::Tiny qw/ try catch /;
10             use namespace::autoclean;
11              
12             our $VERSION = '0.002';
13             $VERSION = eval $VERSION;
14              
15             with 'MooseX::Getopt';
16              
17             has filename => (
18             isa => ArrayRef[Str],
19             is => 'ro',
20             cmd_aliases => ['fn'],
21             required => 1,
22             traits => ['Getopt'],
23             );
24              
25             has routing_key => (
26             isa => ArrayRef[Str],
27             is => 'ro',
28             cmd_aliases => ['rk'],
29             default => sub { [ '#' ] },
30             traits => ['Getopt'],
31             );
32              
33             has max_sleep => (
34             isa => Int,
35             is => 'ro',
36             default => 10,
37             documentation => 'The max sleep time between trying to read a line from an input file',
38             );
39              
40             has _cv => (
41             is => 'ro',
42             lazy => 1,
43             default => sub { AnyEvent->condvar },
44             clearer => '_clear_cv',
45             );
46              
47             my $rf = Net::RabbitFoot->new(
48             varbose => 1,
49             )->load_xml_spec();
50              
51             has _rf => (
52             isa => 'Net::RabbitFoot',
53             is => 'ro',
54             lazy => 1,
55             builder => '_build_rf',
56             clearer => '_clear_rf',
57             );
58              
59             sub _build_rf {
60             my ($self) = @_;
61             my $rf_conn;
62             while (!$rf_conn) {
63             try {
64             $rf_conn = $rf->connect(
65             on_close => sub {
66             warn(sprintf("RabbitMQ connection to %s:%s closed!\n", $self->host, $self->port));
67             $self->_clear_ch;
68             $self->_clear_rf;
69             $self->_cv->send("ARGH");
70             },
71             map { $_ => $self->$_ }
72             qw/ host port user pass vhost /
73             );
74             }
75             catch {
76             warn($_);
77             sleep 2;
78             };
79             }
80             return $rf_conn;
81             }
82              
83             my %defaults = (
84             host => 'localhost',
85             port => 5672,
86             user => 'guest',
87             pass => 'guest',
88             vhost => '/',
89             exchange_type => 'direct',
90             exchange_name => 'logs',
91             exchange_durable => 0,
92             );
93              
94             foreach my $k (keys %defaults) {
95             has $k => ( is => 'ro', isa => Str, default => $defaults{$k} );
96             }
97              
98             has _ch => (
99             is => 'ro',
100             lazy => 1,
101             builder => '_build_ch',
102             clearer => '_clear_ch',
103             predicate => '_has_ch',
104             );
105              
106             sub _build_ch {
107             my ($self) = @_;
108             my $ch = $self->_rf->open_channel;
109             my $exch_frame = $ch->declare_exchange(
110             type => $self->exchange_type,
111             durable => $self->exchange_durable,
112             exchange => $self->exchange_name,
113             )->method_frame;
114             die Dumper($exch_frame) unless blessed $exch_frame
115             and $exch_frame->isa('Net::AMQP::Protocol::Exchange::DeclareOk');
116             return $ch;
117             }
118              
119             sub run {
120             my $self = shift;
121             my $tail_started = 0;
122             while (1) {
123             $self->_clear_cv;
124             $self->_ch; # Build channel before going into the event loop
125             $self->tail # Setup all the timers
126             unless $tail_started++;
127             $self->_cv->recv; # Enter event loop. We will leave here if channel dies..
128             }
129             }
130              
131             sub tail {
132             my $self = shift;
133             my $rkeys = $self->routing_key;
134             foreach my $fn ($self->filename->flatten) {
135             my $rk = $rkeys->shift;
136             $rkeys->unshift($rk) unless $rkeys->length;
137             # warn("Setup tail for $fn on $rk");
138             my $ft = $self->setup_tail($fn, $rk);
139             $ft->tail;
140             }
141             }
142              
143             sub setup_tail {
144             my ($self, $file, $routing_key) = @_;
145             App::RabbitTail::FileTailer->new(
146             max_sleep => $self->max_sleep,
147             cb => sub {
148             my $message = shift;
149             chomp($message);
150             # warn("SENT $message to " . $self->exchange_name . " with " . $routing_key);
151             if (!$self->_has_ch) {
152             warn("DROPPED $message to " . $self->exchange_name . " with " . $routing_key . "\n");
153             return;
154             }
155             $self->_ch->publish(
156             body => $message,
157             exchange => $self->exchange_name,
158             routing_key => $routing_key,
159             );
160             },
161             fn => $file,
162             );
163             }
164              
165             __PACKAGE__->meta->make_immutable;
166             __END__
167              
168             =head1 NAME
169              
170             App::RabbitTail - Log tailer which broadcasts log lines into RabbitMQ exchanges.
171              
172             =head1 SYNOPSIS
173              
174             See the rabbit_tail script shipped with the distribution for simple CLI useage.
175              
176             use App::RabbitTail;
177             use AnyEvent; # Not strictly needed, but you probably want to
178             # use it yourself if you're doing this manually.
179              
180             my $tailer = App::RabbitTail->new(
181             # At least 1 filename must be supplied
182             filename => [qw/ file1 file2 /],
183             # Optional args, defaults below
184             routing_key => [qw/ # /],
185             host => 'localhost',
186             port => 5672,
187             user => 'guest',
188             pass => 'guest',
189             vhost => '/',
190             exchange_type => 'direct',
191             exchange_name => 'logs',
192             exchange_durable => 0,
193             max_sleep => 10,
194             );
195             # You can setup other AnyEvent io watchers etc here.
196             $tailer->run; # enters the event loop
197             # Or:
198             $tailer->tail;
199              
200             =head1 DECRIPTION
201              
202             App::RabbitTail is a trivial file tail implementation using L<AnyEvent> IO watchers,
203             which emits lines from the tailed files into L<http://www.rabbitmq.com/>
204             via the L<Net::RabbitFoot> client.
205              
206             Note that this software should be considered experimental.
207              
208             =head1 BUGS
209              
210             Plenty. Along with error conditions not being handled gracefully etc.
211              
212             They will be fixed in due course as I start using this more seriously,
213             however in the meantime, patches are welcome :)
214              
215             =head1 AUTHOR
216              
217             Tomas Doran (t0m) C<< <bobtfish@bobtfish.net> >>
218              
219             =head1 COPYRIGHT AND LICENSE
220              
221             Copyright (c) 2010 Tomas Doran
222              
223             Licensed under the same terms as perl itself.
224              
225             =cut
226