File Coverage

blib/lib/AnyEvent/ZabbixSender.pm
Criterion Covered Total %
statement 18 107 16.8
branch 0 26 0.0
condition 0 24 0.0
subroutine 6 19 31.5
pod 3 3 100.0
total 27 179 15.0


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             AnyEvent::ZabbixSender - simple and efficient zabbix data submission
4              
5             =head1 SYNOPSIS
6              
7             use AnyEvent::ZabbixSender;
8              
9             =head1 DESCRIPTION
10              
11             This module is an L user, you need to make sure that you use and
12             run a supported event loop.
13              
14             I't implements the zabbix version 2.0-3.4 protocol for item data
15             submission.
16              
17             =head2 METHODS
18              
19             =over 4
20              
21             =cut
22              
23             package AnyEvent::ZabbixSender;
24              
25 1     1   1076 use common::sense;
  1         13  
  1         5  
26              
27 1     1   523 use Errno ();
  1         1418  
  1         30  
28 1     1   7 use Scalar::Util ();
  1         2  
  1         16  
29              
30 1     1   1046 use AnyEvent ();
  1         5578  
  1         27  
31 1     1   635 use AnyEvent::Socket ();
  1         28021  
  1         35  
32 1     1   835 use AnyEvent::Handle ();
  1         8064  
  1         1873  
33              
34             our $VERSION = '1.1';
35              
36             =item $zbx = new AnyEvent::ZabbixSender [key => value...]
37              
38             Creates a (virtual) connection to a zabbix server. Since each submission
39             requires a new TCP connection, creating the connection object does not
40             actually contact the server.
41              
42             The connection object will linger in the destructor until all data has
43             been submitted or thrown away.
44              
45             You can specify various configuration parameters. The term C<@items>
46             refers to an array with C<[key, value, clock]> array-refs.
47              
48             =over 4
49              
50             =item server => "$hostname:$port" (default: C)
51              
52             The zabbix server to connect to.
53              
54             =item host => $name (default: local nodename)
55              
56             The submission host, the "technical" name from tghe zabbix configuration.
57              
58             =item delay => $seconds (default: C<0>)
59              
60             If non-zero, then the module will gather data submissions for up to this
61             number of seconds before actually submitting them as a single batch.
62              
63             Submissions can get batched even if C<0>, as events submitted while the
64             connection is being established or retried will be batched together in any
65             case.
66              
67             =item queue_time => $seconds (default: C<3600>)
68              
69             The amount of time a data item will be queued until it is thrown away when
70             the server cannot be reached.
71              
72             =item linger_time => $seconds (default: same as C)
73              
74             The amount of time the module will linger in its destructor until all
75             items have been submitted.
76              
77             =item retry_min => $seconds (default: C<30>)
78              
79             =item retry_max => $seconds (default: C<300>)
80              
81             The minimum and maximum retry times when the server cannot be reached.
82              
83             =item on_error => $cb->($zbx, \@items, $msg) (default: log and continue)
84              
85             Called on any protocol errors - these generally indicate that something
86             other than a zabbix server is running on a port. The given key-value pairs
87             are the lost items.
88              
89             =item on_loss => $cb->($zbx, \@items) (default: log and continue)
90              
91             Will be called when some data items are thrown away (this happens if the
92             server isn't reachable for at least C seconds),
93              
94             =item on_response => $cb->($zbx, \@items, \%response) (default: not called)
95              
96             Will be called with the (generally rather useless) response form the
97             zabbix server.
98              
99             =back
100              
101             =cut
102              
103             our $NOP = sub { };
104              
105             my $json = eval { require JSON::XS; JSON::XS->new } || do { require JSON::PP; JSON::PP->new };
106              
107             $json->utf8;
108              
109             sub new {
110 0     0 1   my $class = shift;
111             my $self = bless {
112             server => "localhost:10051",
113             delay => 0,
114             retry_min => 30,
115             retry_max => 300,
116             queue_time => 3600,
117             on_response => $NOP,
118             on_error => sub {
119 0     0     AE::log 4 => "$_[0]{zhost}:$_[0]{zport}: $_[2]"; # error
120             },
121             on_loss => sub {
122 0     0     my $nitems = @{ $_[1] };
  0            
123 0           AE::log 5 => "$_[0]{zhost}:$_[0]{zport}: $nitems items lost"; # warn
124             },
125              
126 0           @_,
127              
128             on_clear => $NOP,
129             }, $class;
130              
131 0           ($self->{zhost}, $self->{zport}) = AnyEvent::Socket::parse_hostport $self->{server}, 10051;
132              
133 0   0       $self->{host} //= do {
134 0           require POSIX;
135 0           (POSIX::uname())[1]
136             };
137              
138 0   0       $self->{linger_time} //= $self->{queue_time};
139              
140 0           $self
141             }
142              
143             sub DESTROY {
144 0     0     my ($self) = @_;
145              
146 0           $self->_wait;
147              
148 0           %$self = ();
149             }
150              
151             sub _wait {
152 0     0     my ($self) = @_;
153              
154 0   0       while (@{ $self->{queue} } || $self->{sending}) {
  0            
155 0           my $cv = AE::cv;
156              
157 0           my $to = AE::timer $self->{linger_time}, 0, $cv;
158 0           local $self->{on_clear} = $cv;
159              
160 0           $cv->recv;
161             }
162             }
163              
164             =item $zbx->submit ($k, $v[, $clock[, $host]])
165              
166             Submits a new key-value pair to the zabbix server. If C<$clock> is missing
167             or C, then C is used for the event timestamp. If C<$host>
168             is missing, then the default set during object creation is used.
169              
170             =item $zbx->submit_multiple ([ [$k, $v, $clock, $host]... ])
171              
172             Like C, but submits many key-value pairs at once.
173              
174             =cut
175              
176             sub submit_multiple {
177 0     0 1   my ($self, $kvcs) = @_;
178              
179 0           push @{ $self->{queue} }, [AE::now, $kvcs];
  0            
180              
181             $self->_send
182 0 0         unless $self->{sending};
183             }
184              
185             sub submit {
186 0     0 1   my ($self, $k, $v, $clock, $host) = @_;
187              
188 0           push @{ $self->{queue} }, [AE::now, [[$k, $v, $clock, $host]]];
  0            
189              
190 0           $self->_send;
191             }
192              
193             # start sending
194             sub _send {
195 0     0     my ($self) = @_;
196              
197 0 0         if ($self->{delay}) {
198 0           Scalar::Util::weaken $self;
199             $self->{delay_w} ||= AE::timer $self->{delay}, 0, sub {
200 0     0     delete $self->{delay_w};
201 0           $self->{send_immediate} = 1;
202 0 0         $self->_send2 unless $self->{sending}++;
203 0   0       };
204             } else {
205 0           $self->{send_immediate} = 1;
206 0 0         $self->_send2 unless $self->{sending}++;
207             }
208             }
209              
210             # actually do send
211             sub _send2 {
212 0     0     my ($self) = @_;
213              
214 0           Scalar::Util::weaken $self;
215             $self->{connect_w} = AnyEvent::Socket::tcp_connect $self->{zhost}, $self->{zport}, sub {
216 0     0     my ($fh) = @_;
217              
218 0 0         $fh
219             or return $self->_retry;
220            
221 0           delete $self->{retry};
222              
223 0           delete $self->{send_immediate};
224 0           my $data = delete $self->{queue};
225 0           my $items = [map @{ $_->[1] }, @$data];
  0            
226              
227             my $fail = sub {
228 0           $self->{on_error}($self, $items, $_[0]);
229 0           $self->_retry;
230 0           };
231              
232             $self->{hdl} = new AnyEvent::Handle
233             fh => $fh,
234             on_error => sub {
235 0           $fail->($_[2]);
236             },
237             on_read => sub {
238 0 0         if (13 <= length $_[0]{rbuf}) {
239 0           my ($zbxd, $version, $length) = unpack "a4 C Q<", $_[0]{rbuf};
240              
241 0 0         $zbxd eq "ZBXD"
242             or return $fail->("protocol mismatch");
243 0 0         $version == 1
244             or return $fail->("protocol version mismatch");
245              
246 0 0         if (13 + $length <= length $_[0]{rbuf}) {
247 0           delete $self->{hdl};
248              
249 0 0         my $res = eval { $json->decode (substr $_[0]{rbuf}, 13) }
  0            
250             or return $fail->("protocol error");
251              
252 0           $self->{on_response}($self, $items, $res);
253              
254 0           delete $self->{sending};
255              
256 0 0 0       $self->_send2 if delete $self->{send_immediate} && $self->{queue};
257              
258 0           $self->{on_clear}();
259             }
260             }
261             },
262 0           ;
263              
264             my $json = $json->encode ({
265             request => "sender data",
266             clock => int AE::now,
267             data => [
268             map {
269 0           my $slot = $_;
  0            
270              
271             map {
272             key => $_->[0],
273             value => $_->[1],
274             clock => int ($_->[2] // $slot->[0]),
275             host => $_->[3] // $self->{host},
276 0   0       }, @{ $slot->[1] }
  0   0        
277             } @$data
278             ],
279             });
280              
281 0           $self->{hdl}->push_write (pack "a4 C Q
282 0           };
283             }
284              
285             sub _retry {
286 0     0     my ($self) = @_;
287              
288 0           Scalar::Util::weaken $self;
289              
290 0           delete $self->{hdl};
291              
292 0           my $expire = AE::now - $self->{queue_time};
293 0   0       while (@{ $self->{queue} } && $self->{queue}[0][0] < $expire) {
  0            
294 0           $self->{on_loss}($self, [shift @{ $self->{queue} }]);
  0            
295             }
296              
297 0 0         unless (@{ $self->{queue} }) {
  0            
298 0           delete $self->{sending};
299 0           $self->{on_clear}();
300 0           return;
301             }
302              
303 0           my $retry = $self->{retry_min} * 2 ** $self->{retry}++;
304 0 0         $retry = $self->{retry_max} if $retry > $self->{retry_max};
305             $self->{retry_w} = AE::timer $retry, 0, sub {
306 0     0     delete $self->{retry_w};
307 0           $self->_send2;
308 0           };
309             }
310              
311             =back
312              
313             =head1 SEE ALSO
314              
315             L.
316              
317             =head1 AUTHOR
318              
319             Marc Lehmann
320             http://home.schmorp.de/
321              
322             =cut
323              
324             1
325