File Coverage

blib/lib/Collectd/Plugin/Read/Message/Passing.pm
Criterion Covered Total %
statement 22 24 91.6
branch n/a
condition n/a
subroutine 8 8 100.0
pod n/a
total 30 32 93.7


line stmt bran cond sub pod time code
1             package Collectd::Plugin::Read::Message::Passing;
2 1     1   1321 use strict;
  1         2  
  1         34  
3 1     1   5 use warnings;
  1         1  
  1         24  
4 1     1   4 use Collectd ();
  1         2  
  1         13  
5 1     1   5 use JSON;
  1         1  
  1         9  
6 1     1   694 use Module::Runtime qw/ require_module /;
  1         2  
  1         8  
7 1     1   52 use String::RewritePrefix ();
  1         2  
  1         15  
8 1     1   5 use Try::Tiny;
  1         2  
  1         68  
9 1     1   479 use Message::Passing::Output::Callback;
  0            
  0            
10             use AnyEvent;
11             BEGIN {
12             *tid = eval {
13             require threads;
14             } ? sub { threads->tid } : sub { 0 };
15             }
16             use namespace::clean;
17              
18             our $INPUT;
19             our %CONFIG;
20              
21             sub _clean_value {
22             my $val = shift;
23             scalar(@$val) > 1 ? $val : $val->[0];
24             }
25              
26             sub _flatten_item {
27             my $item = shift;
28             my $val;
29             if (scalar(@{$item->{children}})) {
30             $val = [ map { my $i = $_; _flatten_item($i) } @{$item->{children}} ];
31             }
32             else {
33             $val = $item->{values};
34             }
35             return {
36             $item->{key} => _clean_value($val)
37             }
38             }
39              
40             sub config {
41             my @items = @{ $_[0]->{children} };
42             foreach my $item (@items) {
43             %CONFIG = ( %{_flatten_item($item)} , %CONFIG );
44             }
45             }
46              
47             my %_TYPE_LOOKUP = (
48             'COUNTER' => 0,
49             'GAUGE' => 1,
50             );
51              
52             sub _do_message_passing_read {
53             my $message = shift;
54             Collectd::plugin_log(Collectd::LOG_WARNING, "Got message from Message::Passing " . JSON::encode_json($message));
55             my $vl = {
56             values => [ map { $_->{value} } @{$message->{values}} ],
57             plugin => $message->{plugin},
58             type => $_TYPE_LOOKUP{$message->{values}->[0]->{type}},
59             };
60             $vl = {
61             values => [ map { $_->{value} } @{$message->{values}} ],
62             plugin => $message->{plugin},
63             type => $message->{type},
64             $message->{plugin_instance} ? (plugin_instance => $message->{plugin_instance}) : (),
65             };
66             Collectd::plugin_log(Collectd::LOG_WARNING, "Got message for collectd " . JSON::encode_json($vl));
67             Collectd::plugin_dispatch_values($vl);
68             }
69              
70             # ["load",[{"min":0,"max":100,"name":"shortterm","type":1},{"min":0,"max":100,"name":"midterm","type":1},{"min":0,"max":100,"name":"longterm","type":1}],{"plugin":"load","time":1341655869.22588,"type":"load","values":[0.41,0.13,0.08],"interval":10,"host":"ldn-dev-tdoran.youdevise.com"}]
71             # "transport.tx.size",[{"min":0,"max":0,"name":"transport.tx.size","type":0}],{"plugin":"ElasticSearch","time":1341655799.77979,"type":"transport.tx.size","values":[9725948078],"interval":10,"host":"ldn-dev-tdoran.youdevise.com"}
72             sub _input {
73             if (!$INPUT) {
74             try {
75             my $out = Message::Passing::Output::Callback->new(
76             cb => \&_do_message_passing_read,
77             );
78             my $decoder = $CONFIG{decoderclass}->new(
79             %{ $CONFIG{decoderoptions} },
80             output_to => $out,
81             );
82             $INPUT = $CONFIG{inputclass}->new(
83             %{ $CONFIG{inputoptions} },
84             output_to => $decoder,
85             );
86             }
87             catch {
88             Collectd::plugin_log(Collectd::LOG_WARNING, "Got exception building inputs: $_ - DISABLING thread id " . tid());
89             undef $INPUT;
90             };
91             }
92             return $INPUT;
93             }
94              
95             sub init {
96             if (!$CONFIG{inputclass}) {
97             Collectd::plugin_log(Collectd::LOG_WARNING, "No inputclass config for Message::Passing plugin - disabling PID $$ TID " . tid());
98             return 0;
99             }
100             $CONFIG{inputclass} = String::RewritePrefix->rewrite(
101             { '' => 'Message::Passing::Input::', '+' => '' },
102             $CONFIG{inputclass}
103             );
104             if (!eval { require_module($CONFIG{inputclass}) }) {
105             Collectd::plugin_log(Collectd::LOG_WARNING, "Could not load inputclass=" . $CONFIG{InputClass} . " error: $@");
106             return 0;
107             }
108             $CONFIG{decoderclass} ||= '+Message::Passing::Filter::Decoder::JSON';
109             $CONFIG{decoderclass} = String::RewritePrefix->rewrite(
110             { '' => 'Message::Passing::Filter::Decoder::', '+' => '' },
111             $CONFIG{decoderclass}
112             );
113             if (!eval { require_module($CONFIG{decoderclass}) }) {
114             Collectd::plugin_log(Collectd::LOG_WARNING, "Could not load decoderclass=" . $CONFIG{decoderclass} . " error: $@");
115             return 0;
116             }
117             $CONFIG{inputoptions} ||= {};
118             $CONFIG{decoderoptions} ||= {};
119             $CONFIG{readtimeslice} = 0.25;
120             return 1;
121             }
122              
123             sub read {
124             _input();
125             my $cv = AnyEvent->condvar;
126             my $t = AnyEvent->timer(
127             after => $CONFIG{readtimeslice},
128             cb => sub { $cv->send },
129             );
130             $cv->recv;
131             undef $t;
132              
133             return 1;
134             }
135              
136             Collectd::plugin_register(
137             Collectd::TYPE_INIT, 'Read::Message::Passing', 'Collectd::Plugin::Read::Message::Passing::init'
138             );
139             Collectd::plugin_register(
140             Collectd::TYPE_CONFIG, 'Read::Message::Passing', 'Collectd::Plugin::Read::Message::Passing::config'
141             );
142             Collectd::plugin_register(
143             Collectd::TYPE_READ, 'Read::Message::Passing', 'Collectd::Plugin::Read::Message::Passing::read'
144             );
145              
146             1;
147              
148             =head1 NAME
149              
150             Collectd::Plugin::Read::Message::Passing - Read collectd metrics via Message::Passing
151              
152             =head1 SYNOPSIS
153              
154             # Only tested with 1 read thread!
155             ReadThreads 1
156             # You MUST setup types.db for all types you emit!
157             TypesDB "/usr/share/collectd/types.db"
158             TypesDB "/usr/local/share/collectd/types_local.db"
159            
160             Globals true
161            
162            
163             BaseName "Collectd::Plugin"
164             LoadPlugin "Read::Message::Passing"
165            
166             # MANDATORY - You MUST configure an output class
167             inputclass "ZeroMQ"
168            
169             socket_bind "tcp://192.168.0.1:5552"
170            
171             # OPTIONAL - Defaults to JSON
172             #decoderclass "JSON"
173             #
174             #
175            
176            
177              
178             Will consume metrics like this:
179              
180             {
181             "plugin":"ElasticSearch",
182             "time":1341656031.18621,
183             "values":[
184             {
185             "value":0,
186             "min":0,
187             "name":"indices.get.time",
188             "max":0,
189             "type":0
190             }
191             ],
192             "type":"indices.get.time",
193             "interval":10,
194             "host":"t0m.local"
195             }
196              
197             or, for multi-value metrics:
198              
199             {
200             "plugin":"load",
201             "time":1341655869.22588,
202             "type":"load",
203             "values":[
204             {
205             "value":0.41,
206             "min":0,"max":100,"name":"shortterm","type":1
207             },
208             {
209             "value":0.13,
210             "min":0,
211             "max":100,
212             "name": "midterm",
213             "type":1
214             },
215             {
216             "value":0.08
217             "min":0,
218             "max":100,
219             "name":"longterm",
220             "type":1
221             }
222             ],
223             "interval":10,
224             "host":"t0m.local"
225             }
226              
227             =head1 DESCRIPTION
228              
229             A collectd plugin to consume metrics from L into L.
230              
231             B This plugin is pre-alpha, and is only tested with 1 collectd thread and the ZeroMQ Input.
232              
233             B You B have registered any types you ingest in a C for collectd. Failure to do this can result in segfaults!
234              
235             =head1 PACKAGE VARIABLES
236              
237             =head2 %CONFIG
238              
239             A hash containing the following:
240              
241             =head3 inputclass
242              
243             The name of the class which will act as the Message::Passing output. Will be used as-is if prefixed with C<+>,
244             otherwise C will be prepended. Required.
245              
246             =head3 inputoptions
247              
248             The hash of options for the input class. Not required, but almost certainly needed.
249              
250             =head3 decoderclass
251              
252             The name of the class which will act the Message::Passing decoder. Will be used as-is if prefixed with C<+>,
253             otherwise C will be prepended. Optional, defaults to L.
254              
255             =head3 decoderoptions
256              
257             The hash of options for the decoder class.
258              
259             =head3 readtimeslice
260              
261             The amount of time to block in Message::Passing's read loop. Defaults to 0.25 seconds, which could
262             not be enough if you are consuming a lot of metrics..
263              
264             =head1 FUNCTIONS
265              
266             =head2 config
267              
268             Called first with configuration in the config file, munges it into the format expected
269             and places it into the C<%CONFIG> hash.
270              
271             =head2 init
272              
273             Validates the config, and initializes the C<$INPUT>
274              
275             =head2 read
276              
277             Blocks for a metric to the output in C<$INPUT>.
278              
279             =head1 BUGS
280              
281             Blocking collectd for a fixed time to allow the AnyEvent loop to run is a horrible horrible way
282             of reading.
283              
284             =head1 AUTHOR, COPYRIGHT & LICENSE
285              
286             See L.
287              
288             =cut
289