File Coverage

blib/lib/Log/Saftpresse/Output/Elasticsearch.pm
Criterion Covered Total %
statement 18 41 43.9
branch 0 8 0.0
condition 0 6 0.0
subroutine 6 10 60.0
pod 0 3 0.0
total 24 68 35.2


line stmt bran cond sub pod time code
1             package Log::Saftpresse::Output::Elasticsearch;
2              
3 1     1   1573 use Moose;
  1         2  
  1         9  
4              
5             # ABSTRACT: plugin to write events to elasticsearch
6             our $VERSION = '1.6'; # VERSION
7              
8             extends 'Log::Saftpresse::Output';
9              
10 1     1   5556 use Log::Saftpresse::Log4perl;
  1         3  
  1         129  
11              
12 1     1   6 use Time::Piece;
  1         1  
  1         11  
13 1     1   823 use Search::Elasticsearch;
  1         30003  
  1         38  
14 1     1   10 use JSON;
  1         1  
  1         10  
15 1     1   812 use File::Slurp;
  1         9733  
  1         703  
16              
17             has 'nodes' => ( is => 'rw', isa => 'Str', default => 'localhost:9200' );
18             has 'cxn_pool' => ( is => 'rw', isa => 'Str', default => 'Static' );
19             has 'type' => ( is => 'rw', isa => 'Str', default => 'log' );
20              
21             has 'indices_template' => (
22             is => 'rw', isa => 'Str', default => 'saftpresse-%Y-%m-%d' );
23              
24             has 'template_name' => ( is => 'ro', isa => 'Str', default => 'saftpresse' );
25             has 'install_template' => ( is => 'ro', isa => 'Bool', default => 1 );
26             has 'template_file' => ( is => 'ro', isa => 'Maybe[Str]' );
27              
28             has '_template_body' => ( is => 'ro', isa => 'HashRef', lazy => 1,
29             default => sub {
30             my $self = shift;
31             my $json_text;
32             if( defined $self->template_file ) {
33             $json_text = read_file( $self->template_file );
34             } else {
35             $json_text = read_file( \*DATA );
36             }
37             return( from_json( $json_text ) );
38             },
39             );
40              
41             sub current_index {
42 0     0 0   my $self = shift;
43 0           return( Time::Piece->new->strftime( $self->indices_template ) );
44             }
45              
46             has 'es' => ( is => 'ro', lazy => 1,
47             default => sub {
48             my $self = shift;
49             $log->debug('connecting to elasticsearch: '.$self->nodes.'...');
50             my $es = Search::Elasticsearch->new(
51             nodes => [ split(/\s*,\s*/, $self->nodes) ],
52             cxn_pool => $self->cxn_pool,
53             );
54             if( $self->install_template ) {
55             $self->_es_install_template( $es );
56             }
57             return $es;
58             },
59             );
60              
61             has 'flush' => ( is => 'rw', isa => 'Bool', default => 1 );
62              
63             has 'autoflush_count' => ( is => 'rw', isa => 'Int', default => 1000 );
64             has 'autoflush_size' => ( is => 'rw', isa => 'Int', default => 1000000 );
65             has 'autoflush_time' => ( is => 'rw', isa => 'Int', default => 10 );
66              
67             has 'bulk' => (
68             is => 'ro', isa => 'Search::Elasticsearch::Bulk', lazy => 1,
69             default => sub {
70             my $self = shift;
71             return $self->es->bulk_helper(
72             max_count => $self->autoflush_count,
73             max_size => $self->autoflush_size,
74             max_time => $self->autoflush_time,
75             );
76             },
77             );
78              
79             sub _es_install_template {
80 0     0     my ( $self, $es ) = @_;
81 0           my $name = $self->template_name;
82 0 0         if( $es->indices->exists_template( name => $name ) ) {
83 0           $log->debug("index template '$name' already in place");
84             } else {
85 0           $log->info("installing index template '$name'...");
86 0           $es->indices->put_template(
87             name => $name,
88             body => $self->_template_body,
89             );
90             }
91 0           return;
92             }
93              
94             sub index_event {
95 0     0 0   my ( $self, $e ) = @_;
96              
97 0 0 0       if( defined $e->{'time'} &&
98             ref($e->{'time'}) eq 'Time::Piece' ) {
99 0           $e->{'@timestamp'} = $e->{'time'}->datetime;
100 0           delete $e->{'time'};
101             }
102             $self->bulk->index( {
103 0           index => $self->current_index,
104             type => $self->type,
105             source => $e,
106             } );
107              
108 0           return;
109             }
110              
111             sub output {
112 0     0 0   my ( $self, @events ) = @_;
113              
114 0           foreach my $event (@events) {
115 0 0 0       if( defined $event->{'type'} && $event->{'type'} ne $self->type ) {
116 0           next;
117             }
118 0           $self->index_event( $event );
119             }
120              
121 0 0         if( $self->flush ) { $self->bulk->flush; }
  0            
122              
123 0           return;
124             }
125              
126              
127             1;
128              
129             =pod
130              
131             =encoding UTF-8
132              
133             =head1 NAME
134              
135             Log::Saftpresse::Output::Elasticsearch - plugin to write events to elasticsearch
136              
137             =head1 VERSION
138              
139             version 1.6
140              
141             =head1 AUTHOR
142              
143             Markus Benning <ich@markusbenning.de>
144              
145             =head1 COPYRIGHT AND LICENSE
146              
147             This software is Copyright (c) 1998 by James S. Seymour, 2015 by Markus Benning.
148              
149             This is free software, licensed under:
150              
151             The GNU General Public License, Version 2, June 1991
152              
153             =cut
154              
155             __DATA__
156             {
157             "template" : "saftpresse-*",
158             "mappings" : {
159             "_default_" : {
160             "_all" : {"enabled" : true, "omit_norms" : true},
161             "dynamic_templates" : [ {
162             "floating-numbers" : {
163             "match_pattern" : "regex",
164             "match" : "delay",
165             "mapping" : {
166             "type" : "float"
167             }
168             }
169             } , {
170             "integer-numbers" : {
171             "match_pattern" : "regex",
172             "match" : "pid|size|port|len|dpt|spt|ttl|tls_keylen|connection_time|code",
173             "mapping" : {
174             "type" : "integer"
175             }
176             }
177             } , {
178             "simple_strings" : {
179             "match_pattern" : "regex",
180             "match" : "facility|priority|queue_id|service|method",
181             "match_mapping_type" : "string",
182             "mapping" : {
183             "type" : "string",
184             "index" : "not_analyzed",
185             "omit_norms" : true
186             }
187             }
188             } , {
189             "string_fields" : {
190             "match" : "*",
191             "match_mapping_type" : "string",
192             "mapping" : {
193             "type" : "string",
194             "index" : "analyzed",
195             "omit_norms" : true,
196             "fields" : {
197             "raw" : {
198             "type": "string",
199             "index" : "not_analyzed",
200             "ignore_above" : 256
201             }
202             }
203             }
204             }
205             } ]
206             }
207             }
208             }