File Coverage

blib/lib/Message/Passing/Output/ElasticSearch.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Message::Passing::Output::ElasticSearch;
2 1     1   4198 use Moose;
  0            
  0            
3             use ElasticSearch;
4             use AnyEvent;
5             use Scalar::Util qw/ weaken /;
6             use MooseX::Types::Moose qw/ ArrayRef Str Bool /;
7             use Scalar::Util qw/ weaken /;
8             use Try::Tiny qw/ try catch /;
9             use aliased 'DateTime' => 'DT';
10             use MooseX::Types::ISO8601 qw/ ISO8601DateTimeStr /;
11             use MooseX::Types::DateTime qw/ DateTime /;
12             use JSON qw/ encode_json /;
13             use Data::Dumper;
14             use namespace::autoclean;
15              
16             our $VERSION = '0.004';
17             $VERSION = eval $VERSION;
18              
19             with 'Message::Passing::Role::Output';
20              
21             has elasticsearch_servers => (
22             isa => ArrayRef[Str],
23             is => 'ro',
24             required => 1,
25             );
26              
27             has _es => (
28             is => 'ro',
29             isa => 'ElasticSearch',
30             lazy => 1,
31             default => sub {
32             my $self = shift;
33             return ElasticSearch->new(
34             transport => "aehttp",
35             servers => $self->elasticsearch_servers,
36             timeout => 30,
37             no_refresh => 1,
38             # trace_calls => 1,
39             );
40             }
41             );
42              
43             has queue => (
44             is => 'ro',
45             isa => ArrayRef,
46             default => sub { [] },
47             init_arg => undef,
48             lazy => 1,
49             clearer => '_clear_queue',
50             );
51              
52             has _indexes => (
53             isa => 'HashRef',
54             lazy => 1,
55             is => 'ro',
56             default => sub { {} },
57             clearer => '_clear_indexes',
58             );
59              
60             has verbose => (
61             isa => 'Bool',
62             is => 'ro',
63             default => sub {
64             -t STDIN
65             },
66             );
67              
68             sub consume {
69             my ($self, $data) = @_;
70             return unless $data;
71             my $date;
72             if (my $epochtime = delete($data->{epochtime})) {
73             $date = DT->from_epoch(epoch => $epochtime);
74             delete($data->{date});
75             }
76             elsif (my $try_date = delete($data->{date})) {
77             if (is_ISO8601DateTimeStr($try_date)) {
78             $date = to_DateTime($try_date);
79             }
80             }
81             $date ||= DT->from_epoch(epoch => time());
82             my $type = $data->{__CLASS__} || 'unknown';
83             my $index_name = $self->_index_name_by_dt($date);
84             $self->_indexes->{$index_name} = 1;
85             my $to_queue = {
86             type => $type,
87             index => $index_name,
88             data => {
89             '@timestamp' => to_ISO8601DateTimeStr($date),
90             '@tags' => [],
91             '@type' => $type,
92             '@source_host' => delete($data->{hostname}) || 'none',
93             '@message' => exists($data->{message}) ? delete($data->{message}) : encode_json($data),
94             '@fields' => $data,
95             },
96             exists($data->{uuid}) ? ( id => delete($data->{uuid}) ) : (),
97             };
98             push(@{$self->queue}, $to_queue);
99             if (scalar(@{$self->queue}) > 1000) {
100             $self->_flush;
101             }
102             }
103              
104             sub _index_name_by_dt {
105             my ($self, $dt) = @_;
106             return 'logstash-' . $dt->year . '.' . sprintf("%02d", $dt->month) . '.' . sprintf("%02d", $dt->day);
107             }
108              
109             has _am_flushing => (
110             isa => Bool,
111             is => 'rw',
112             default => 0,
113             );
114              
115             has _flush_timer => (
116             is => 'ro',
117             default => sub {
118             my $self = shift;
119             weaken($self);
120             AnyEvent->timer(
121             after => 1,
122             interval => 1,
123             cb => sub { $self->_flush },
124             );
125             },
126             );
127              
128             has _needs_optimize => (
129             isa => Bool,
130             is => 'rw',
131             default => 0,
132             );
133              
134             has _optimize_timer => (
135             is => 'ro',
136             default => sub {
137             my $self = shift;
138             weaken($self);
139             # FIXME!!! This is over-aggressive, you only need to do indexes
140             # when you've finished writing them.
141             my $time = 60 * 60; # Every hour
142             AnyEvent->timer(
143             after => $time,
144             interval => $time,
145             cb => sub { $self->_needs_optimize(1) },
146             );
147             },
148             );
149              
150             sub _do_optimize {
151             my $self = shift;
152             weaken($self);
153             $self->_am_flushing(1);
154             my @indexes = sort keys( %{ $self->_indexes } );
155             $self->_clear_indexes;
156             $self->_es->optimize_index(
157             index => $indexes[0],
158             wait_for_merge => 1,
159             max_num_segments => 2,
160             )->cb(sub {
161             warn("Did optimize of " . $indexes[0] . "\n") if $self->verbose;
162             $self->_am_flushing(0); $self->_needs_optimize(0) });
163             }
164              
165             sub _flush {
166             my $self = shift;
167             weaken($self);
168             return if $self->_am_flushing;
169             if ($self->_needs_optimize) {
170             return $self->_do_optimize;
171             }
172             my $queue = $self->queue;
173             return unless scalar @$queue;
174             $self->_clear_queue;
175             $self->_am_flushing(1);
176             my $res = $self->_es->bulk_index(
177             docs => $queue,
178             consistency => 'quorum',
179             );
180             $res->cb(sub {
181             my $res = shift;
182             my @indexes = sort keys( %{ $self->_indexes } );
183             warn("Indexed " . scalar(@$queue) . " " . join(", ", @indexes) . "\n") if $self->verbose;
184             $self->_am_flushing(0);
185             foreach my $result (@{ $res->{results} }) {
186             if (!$result->{index}->{ok} && !$result->{create}->{ok}) {
187             warn "Indexing failure: " . Dumper($result) . "\n";
188             last;
189             }
190             }
191             });
192             }
193              
194             has _archive_timer => (
195             is => 'ro',
196             default => sub {
197             my $self = shift;
198             weaken($self);
199             my $time = 60 * 60 * 24; # Every day
200             AnyEvent->timer(
201             after => 60, # delay 1 hour to start first loop
202             interval => $time,
203             cb => sub { $self->_archive_index() },
204             );
205             },
206             );
207              
208             # _archive_index run 1 time per day to close index older than 7 days and delete
209             # index older than 30 days
210             #
211             sub _archive_index {
212             my ($self) = @_;
213              
214             my $dt = DT->from_epoch(epoch => time());
215              
216             my $dt_to_close = $dt->clone->subtract(days => 7);
217             my $index_to_close = $self->_index_name_by_dt($dt_to_close);
218             $self->_es->close_index(index => $index_to_close)
219             ->cb( sub { warn "Close index: $index_to_close \n" if $self->verbose; });
220              
221             my $dt_to_delete = $dt->clone->subtract(days => 30);
222             my $index_to_delete = $self->_index_name_by_dt($dt_to_delete);
223             $self->_es->delete_index(
224             index => $index_to_delete,
225             ignore_missing => 1,
226             )->cb( sub { warn "Delete index: $index_to_delete \n" if $self->verbose;});
227             }
228              
229              
230             1;
231              
232             =head1 NAME
233              
234             Message::Passing::Output::ElasticSearch - output logstash messages into ElasticSearch.
235              
236             =head1 DESCRIPTION
237              
238             =head1 METHODS
239              
240             =head2 consume ($msg)
241              
242             Consumes a message, queuing it for consumption by ElasticSearch
243              
244             =head1 SEE ALSO
245              
246             =over
247              
248             =item L<Message::Passing>
249              
250             =item L<http://logstash.net>
251              
252             =back
253              
254             =head1 AUTHOR
255              
256             Tomas (t0m) Doran <bobtfish@bobtfish.net>
257              
258             =head1 COPYRIGHT
259              
260             =head1 LICENSE
261              
262             XXX - TODO
263