| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
# ABSTRACT: a Perl port of Etsy's statsd *server* |
|
2
|
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
package Net::Statsd::Server; |
|
4
|
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
# Use statements {{{ |
|
6
|
|
|
|
|
|
|
|
|
7
|
3
|
|
|
3
|
|
47379
|
use strict; |
|
|
3
|
|
|
|
|
4
|
|
|
|
3
|
|
|
|
|
66
|
|
|
8
|
|
|
|
|
|
|
#se warnings; |
|
9
|
|
|
|
|
|
|
|
|
10
|
3
|
|
|
3
|
|
1684
|
use JSON::XS (); |
|
|
3
|
|
|
|
|
14163
|
|
|
|
3
|
|
|
|
|
69
|
|
|
11
|
3
|
|
|
3
|
|
1390
|
use Socket qw(SOL_SOCKET SO_RCVBUF); |
|
|
3
|
|
|
|
|
9083
|
|
|
|
3
|
|
|
|
|
388
|
|
|
12
|
3
|
|
|
3
|
|
1291
|
use Time::HiRes (); |
|
|
3
|
|
|
|
|
2619
|
|
|
|
3
|
|
|
|
|
62
|
|
|
13
|
|
|
|
|
|
|
|
|
14
|
3
|
|
|
3
|
|
2576
|
use AnyEvent; |
|
|
3
|
|
|
|
|
11022
|
|
|
|
3
|
|
|
|
|
79
|
|
|
15
|
3
|
|
|
3
|
|
1819
|
use AnyEvent::Handle; |
|
|
3
|
|
|
|
|
31736
|
|
|
|
3
|
|
|
|
|
86
|
|
|
16
|
3
|
|
|
3
|
|
1313
|
use AnyEvent::Handle::UDP; |
|
|
3
|
|
|
|
|
151149
|
|
|
|
3
|
|
|
|
|
99
|
|
|
17
|
3
|
|
|
3
|
|
1600
|
use AnyEvent::Log; |
|
|
3
|
|
|
|
|
22519
|
|
|
|
3
|
|
|
|
|
97
|
|
|
18
|
3
|
|
|
3
|
|
15
|
use AnyEvent::Socket; |
|
|
3
|
|
|
|
|
3
|
|
|
|
3
|
|
|
|
|
244
|
|
|
19
|
|
|
|
|
|
|
|
|
20
|
3
|
|
|
3
|
|
1096
|
use Net::Statsd::Server::Backend; |
|
|
3
|
|
|
|
|
10
|
|
|
|
3
|
|
|
|
|
67
|
|
|
21
|
3
|
|
|
3
|
|
1002
|
use Net::Statsd::Server::Metrics; |
|
|
3
|
|
|
|
|
17
|
|
|
|
3
|
|
|
|
|
104
|
|
|
22
|
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
# }}} |
|
24
|
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
# Constants and global variables {{{ |
|
26
|
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
use constant { |
|
28
|
3
|
|
|
|
|
9082
|
DEBUG => 0, |
|
29
|
|
|
|
|
|
|
DEFAULT_CONFIG_FILE => 'localConfig.js', |
|
30
|
|
|
|
|
|
|
DEFAULT_FLUSH_INTERVAL => 10_000, |
|
31
|
|
|
|
|
|
|
DEFAULT_LOG_LEVEL => 'info', |
|
32
|
|
|
|
|
|
|
RECEIVE_BUFFER_MB => 8, # 0 = setsockopt disabled |
|
33
|
3
|
|
|
3
|
|
14
|
}; |
|
|
3
|
|
|
|
|
3
|
|
|
34
|
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
our $VERSION = '0.17'; |
|
36
|
|
|
|
|
|
|
our $logger; |
|
37
|
|
|
|
|
|
|
|
|
38
|
|
|
|
|
|
|
# }}} |
|
39
|
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
sub new { |
|
41
|
1
|
|
|
1
|
0
|
380
|
my ($class, $opt) = @_; |
|
42
|
1
|
|
50
|
|
|
3
|
$opt ||= {}; |
|
43
|
1
|
|
33
|
|
|
5
|
$class = ref $class || $class; |
|
44
|
|
|
|
|
|
|
|
|
45
|
1
|
|
|
|
|
5
|
my $startup_time = time(); |
|
46
|
|
|
|
|
|
|
|
|
47
|
|
|
|
|
|
|
# Initialize data structures with defaults for statsd stats |
|
48
|
|
|
|
|
|
|
my $self = { |
|
49
|
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
startup_time => $startup_time, |
|
51
|
|
|
|
|
|
|
start_time_hi => [Time::HiRes::gettimeofday], |
|
52
|
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
server => undef, |
|
54
|
|
|
|
|
|
|
mgmtServer => undef, |
|
55
|
|
|
|
|
|
|
|
|
56
|
|
|
|
|
|
|
config_file => $opt->{config_file}, |
|
57
|
1
|
|
|
|
|
16
|
config => undef, |
|
58
|
|
|
|
|
|
|
stats => { |
|
59
|
|
|
|
|
|
|
messages => { |
|
60
|
|
|
|
|
|
|
"last_msg_seen" => $startup_time, |
|
61
|
|
|
|
|
|
|
"bad_lines_seen" => 0, |
|
62
|
|
|
|
|
|
|
} |
|
63
|
|
|
|
|
|
|
}, |
|
64
|
|
|
|
|
|
|
metrics => undef, |
|
65
|
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
|
debugInt => undef, |
|
67
|
|
|
|
|
|
|
flushInterval => undef, |
|
68
|
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
backends => [], |
|
70
|
|
|
|
|
|
|
logger => $logger, |
|
71
|
|
|
|
|
|
|
}; |
|
72
|
|
|
|
|
|
|
|
|
73
|
|
|
|
|
|
|
$self->{$_} = $opt->{$_} |
|
74
|
1
|
|
|
|
|
2
|
for keys %{ $opt }; |
|
|
1
|
|
|
|
|
5
|
|
|
75
|
|
|
|
|
|
|
|
|
76
|
1
|
|
|
|
|
3
|
bless $self, $class; |
|
77
|
|
|
|
|
|
|
} |
|
78
|
|
|
|
|
|
|
|
|
79
|
|
|
|
|
|
|
# Flatten JSON booleans to avoid calls to JSON::XS::bool |
|
80
|
|
|
|
|
|
|
# in the performance-critical code paths |
|
81
|
|
|
|
|
|
|
sub _flatten_bools { |
|
82
|
1
|
|
|
1
|
|
1
|
my ($self, $conf_hash) = @_; |
|
83
|
1
|
|
|
|
|
3
|
for (qw(dumpMessages debug)) { |
|
84
|
2
|
|
|
|
|
34
|
$conf_hash->{$_} = !! $conf_hash->{$_}; |
|
85
|
|
|
|
|
|
|
} |
|
86
|
1
|
|
|
|
|
5
|
return $conf_hash; |
|
87
|
|
|
|
|
|
|
} |
|
88
|
|
|
|
|
|
|
|
|
89
|
|
|
|
|
|
|
sub _json_emitter { |
|
90
|
0
|
|
|
0
|
|
0
|
my ($self) = @_; |
|
91
|
0
|
|
|
|
|
0
|
my $js = JSON::XS->new() |
|
92
|
|
|
|
|
|
|
->utf8(1) |
|
93
|
|
|
|
|
|
|
->shrink(1) |
|
94
|
|
|
|
|
|
|
->space_before(0) |
|
95
|
|
|
|
|
|
|
->space_after(1) |
|
96
|
|
|
|
|
|
|
->indent(0); |
|
97
|
0
|
|
|
|
|
0
|
return $js; |
|
98
|
|
|
|
|
|
|
} |
|
99
|
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
sub _start_time_hi { |
|
101
|
0
|
|
|
0
|
|
0
|
return $_[0]->{start_time_hi}; |
|
102
|
|
|
|
|
|
|
} |
|
103
|
|
|
|
|
|
|
|
|
104
|
|
|
|
|
|
|
sub config_defaults { |
|
105
|
|
|
|
|
|
|
return { |
|
106
|
1
|
|
|
1
|
0
|
10
|
"debug" => 0, |
|
107
|
|
|
|
|
|
|
"debugInterval" => 10000, # ms |
|
108
|
|
|
|
|
|
|
"graphitePort" => 2003, |
|
109
|
|
|
|
|
|
|
"port" => 8125, |
|
110
|
|
|
|
|
|
|
"address" => "0.0.0.0", |
|
111
|
|
|
|
|
|
|
"mgmt_port" => 8126, |
|
112
|
|
|
|
|
|
|
"mgmt_address" => "0.0.0.0", |
|
113
|
|
|
|
|
|
|
"flushInterval" => DEFAULT_FLUSH_INTERVAL, # ms |
|
114
|
|
|
|
|
|
|
#"keyFlush" => { |
|
115
|
|
|
|
|
|
|
# "interval" => 10, # s |
|
116
|
|
|
|
|
|
|
# "percent" => 100, |
|
117
|
|
|
|
|
|
|
# "log" => "", |
|
118
|
|
|
|
|
|
|
#}, |
|
119
|
|
|
|
|
|
|
"log" => { |
|
120
|
|
|
|
|
|
|
"backend" => "stdout", |
|
121
|
|
|
|
|
|
|
"level" => "LOG_INFO", |
|
122
|
|
|
|
|
|
|
}, |
|
123
|
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
"prefixStats" => "statsd", |
|
125
|
|
|
|
|
|
|
"dumpMessages" => 0, |
|
126
|
|
|
|
|
|
|
|
|
127
|
|
|
|
|
|
|
"deleteIdleStats" => 0, |
|
128
|
|
|
|
|
|
|
#"deleteCounters" => 0, |
|
129
|
|
|
|
|
|
|
#"deleteGauges" => 0, |
|
130
|
|
|
|
|
|
|
#"deleteSets" => 0, |
|
131
|
|
|
|
|
|
|
#"deleteTimers" => 0, |
|
132
|
|
|
|
|
|
|
|
|
133
|
|
|
|
|
|
|
"percentThreshold" => [ 90 ], |
|
134
|
|
|
|
|
|
|
|
|
135
|
|
|
|
|
|
|
"backends" => [ |
|
136
|
|
|
|
|
|
|
"Console", |
|
137
|
|
|
|
|
|
|
], |
|
138
|
|
|
|
|
|
|
}; |
|
139
|
|
|
|
|
|
|
} |
|
140
|
|
|
|
|
|
|
|
|
141
|
|
|
|
|
|
|
sub config { |
|
142
|
1
|
|
|
1
|
0
|
5
|
my ($self, $config_file) = @_; |
|
143
|
|
|
|
|
|
|
|
|
144
|
1
|
50
|
33
|
|
|
9
|
if (exists $self->{config} && defined $self->{config}) { |
|
145
|
0
|
|
|
|
|
0
|
return $self->{config}; |
|
146
|
|
|
|
|
|
|
} |
|
147
|
|
|
|
|
|
|
|
|
148
|
1
|
|
33
|
|
|
5
|
$config_file ||= $self->config_file(); |
|
149
|
|
|
|
|
|
|
|
|
150
|
1
|
50
|
|
|
|
15
|
if (! -e $config_file) { |
|
151
|
0
|
|
|
|
|
0
|
return; |
|
152
|
|
|
|
|
|
|
} |
|
153
|
|
|
|
|
|
|
|
|
154
|
1
|
|
|
|
|
2
|
my $defaults = $self->config_defaults(); |
|
155
|
|
|
|
|
|
|
|
|
156
|
1
|
50
|
|
|
|
22
|
open my $conf_fh, '<', $config_file |
|
157
|
|
|
|
|
|
|
or return $defaults; |
|
158
|
|
|
|
|
|
|
|
|
159
|
1
|
|
|
|
|
16
|
my $conf_json = join("", <$conf_fh>); |
|
160
|
1
|
|
|
|
|
6
|
close $conf_fh; |
|
161
|
|
|
|
|
|
|
|
|
162
|
1
|
|
|
|
|
10
|
my $json = JSON::XS->new->relaxed->utf8; |
|
163
|
1
|
|
|
|
|
10
|
my $conf_hash = $json->decode($conf_json); |
|
164
|
|
|
|
|
|
|
|
|
165
|
1
|
|
|
|
|
3
|
$conf_hash = $self->_flatten_bools($conf_hash); |
|
166
|
|
|
|
|
|
|
|
|
167
|
|
|
|
|
|
|
# Poor man's Hash::Merge |
|
168
|
1
|
|
|
|
|
1
|
for (keys %{ $defaults }) { |
|
|
1
|
|
|
|
|
4
|
|
|
169
|
14
|
100
|
|
|
|
19
|
if (! exists $conf_hash->{$_}) { |
|
170
|
4
|
|
|
|
|
5
|
$conf_hash->{$_} = $defaults->{$_}; |
|
171
|
|
|
|
|
|
|
} |
|
172
|
|
|
|
|
|
|
} |
|
173
|
|
|
|
|
|
|
|
|
174
|
1
|
|
|
|
|
9
|
return $self->{config} = $conf_hash; |
|
175
|
|
|
|
|
|
|
} |
|
176
|
|
|
|
|
|
|
|
|
177
|
|
|
|
|
|
|
sub clear_metrics { |
|
178
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
|
179
|
|
|
|
|
|
|
|
|
180
|
0
|
|
|
|
|
0
|
my $conf = $self->config; |
|
181
|
|
|
|
|
|
|
|
|
182
|
0
|
|
|
|
|
0
|
my $del_counters = $conf->{deleteCounters}; |
|
183
|
0
|
|
|
|
|
0
|
my $del_gauges = $conf->{deleteGauges}; |
|
184
|
0
|
|
|
|
|
0
|
my $del_sets = $conf->{deleteSets}; |
|
185
|
0
|
|
|
|
|
0
|
my $del_timers = $conf->{deleteTimers}; |
|
186
|
|
|
|
|
|
|
|
|
187
|
|
|
|
|
|
|
# Metrics that are not seen in the interval won't |
|
188
|
|
|
|
|
|
|
# be sent anymore. Enable this with 'deleteIdleStats' |
|
189
|
0
|
|
|
|
|
0
|
my $del_idle = _defined_or($conf->{deleteIdleStats}, 0); |
|
190
|
|
|
|
|
|
|
|
|
191
|
0
|
0
|
|
|
|
0
|
if ($del_idle) { |
|
192
|
0
|
|
|
|
|
0
|
$del_counters = _defined_or($del_counters, 1); |
|
193
|
0
|
|
|
|
|
0
|
$del_gauges = _defined_or($del_gauges, 1); |
|
194
|
0
|
|
|
|
|
0
|
$del_timers = _defined_or($del_timers, 1); |
|
195
|
0
|
|
|
|
|
0
|
$del_sets = _defined_or($del_sets, 1); |
|
196
|
|
|
|
|
|
|
} |
|
197
|
|
|
|
|
|
|
|
|
198
|
|
|
|
|
|
|
# Whether to just reset them to zero or to wipe them |
|
199
|
0
|
|
|
|
|
0
|
my $metrics = $self->{metrics}; |
|
200
|
0
|
0
|
|
|
|
0
|
if ($del_counters) { |
|
201
|
0
|
|
|
|
|
0
|
$metrics->{counters} = {}; |
|
202
|
0
|
|
|
|
|
0
|
$metrics->{counter_rates} = {}; |
|
203
|
|
|
|
|
|
|
} |
|
204
|
|
|
|
|
|
|
else { |
|
205
|
0
|
|
|
|
|
0
|
my $counters = $metrics->{counters}; |
|
206
|
0
|
|
|
|
|
0
|
my $counter_rates = $metrics->{counter_rates}; |
|
207
|
0
|
|
|
|
|
0
|
$_ = 0 for |
|
208
|
0
|
|
|
|
|
0
|
values %{ $counters }, |
|
209
|
0
|
|
|
|
|
0
|
values %{ $counter_rates }; |
|
210
|
|
|
|
|
|
|
} |
|
211
|
|
|
|
|
|
|
|
|
212
|
0
|
0
|
|
|
|
0
|
if ($del_timers) { |
|
213
|
0
|
|
|
|
|
0
|
$metrics->{timers} = {}; |
|
214
|
0
|
|
|
|
|
0
|
$metrics->{timer_data} = {}; |
|
215
|
|
|
|
|
|
|
} |
|
216
|
|
|
|
|
|
|
else { |
|
217
|
0
|
|
|
|
|
0
|
my $timers = $metrics->{timers}; |
|
218
|
0
|
|
|
|
|
0
|
my $timer_data = $metrics->{timer_data}; |
|
219
|
0
|
|
|
|
|
0
|
$_ = [] for |
|
220
|
0
|
|
|
|
|
0
|
values %{ $timers }, |
|
221
|
0
|
|
|
|
|
0
|
values %{ $timer_data }; |
|
222
|
|
|
|
|
|
|
} |
|
223
|
|
|
|
|
|
|
|
|
224
|
0
|
0
|
|
|
|
0
|
if ($del_gauges) { |
|
225
|
0
|
|
|
|
|
0
|
$metrics->{gauges} = {}; |
|
226
|
|
|
|
|
|
|
} |
|
227
|
|
|
|
|
|
|
|
|
228
|
0
|
0
|
|
|
|
0
|
if ($del_sets) { |
|
229
|
0
|
|
|
|
|
0
|
$metrics->{sets} = {}; |
|
230
|
|
|
|
|
|
|
} |
|
231
|
|
|
|
|
|
|
else { |
|
232
|
0
|
|
|
|
|
0
|
my $sets = $metrics->{sets}; |
|
233
|
0
|
|
|
|
|
0
|
$_ = {} for values %{ $sets }; |
|
|
0
|
|
|
|
|
0
|
|
|
234
|
|
|
|
|
|
|
} |
|
235
|
|
|
|
|
|
|
|
|
236
|
0
|
|
|
|
|
0
|
return; |
|
237
|
|
|
|
|
|
|
} |
|
238
|
|
|
|
|
|
|
|
|
239
|
|
|
|
|
|
|
sub config_file { |
|
240
|
1
|
|
|
1
|
0
|
4
|
_defined_or($_[0]->{config_file}, DEFAULT_CONFIG_FILE); |
|
241
|
|
|
|
|
|
|
} |
|
242
|
|
|
|
|
|
|
|
|
243
|
|
|
|
|
|
|
sub flush_metrics { |
|
244
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
|
245
|
0
|
|
|
|
|
0
|
my $flush_start_time = time; |
|
246
|
0
|
|
|
|
|
0
|
$logger->(notice => "flushing metrics"); |
|
247
|
0
|
|
|
|
|
0
|
my $flush_interval = $self->config->{flushInterval}; |
|
248
|
0
|
|
|
|
|
0
|
my $metrics = $self->metrics->process($flush_interval); |
|
249
|
|
|
|
|
|
|
$self->foreach_backend(sub { |
|
250
|
0
|
|
|
0
|
|
0
|
$_[0]->flush($flush_start_time, $metrics); |
|
251
|
0
|
|
|
|
|
0
|
}); |
|
252
|
0
|
|
|
|
|
0
|
$self->clear_metrics(); |
|
253
|
0
|
|
|
|
|
0
|
return; |
|
254
|
|
|
|
|
|
|
} |
|
255
|
|
|
|
|
|
|
|
|
256
|
|
|
|
|
|
|
# This is the performance-critical section of Net::Statsd::Server. |
|
257
|
|
|
|
|
|
|
# Everything below has been optimised for performance rather than |
|
258
|
|
|
|
|
|
|
# legibility or transparency. Be careful. |
|
259
|
|
|
|
|
|
|
|
|
260
|
|
|
|
|
|
|
sub handle_client_packet { |
|
261
|
0
|
|
|
0
|
0
|
0
|
my ($self, $request) = @_; |
|
262
|
|
|
|
|
|
|
|
|
263
|
0
|
|
|
|
|
0
|
my $config = $self->{config}; |
|
264
|
0
|
|
|
|
|
0
|
my $metrics = $self->{metrics}; |
|
265
|
0
|
|
|
|
|
0
|
my $counters = $metrics->{counters}; |
|
266
|
0
|
|
|
|
|
0
|
my $stats = $self->{stats}; |
|
267
|
0
|
|
|
|
|
0
|
my $g_pref = $config->{prefixStats}; |
|
268
|
|
|
|
|
|
|
|
|
269
|
0
|
|
|
|
|
0
|
$counters->{"${g_pref}.packets_received"}++; |
|
270
|
|
|
|
|
|
|
|
|
271
|
|
|
|
|
|
|
# TODO backendEvents.emit('packet', msg, rinfo); |
|
272
|
|
|
|
|
|
|
|
|
273
|
0
|
|
|
|
|
0
|
my @metrics = split("\n", $request); |
|
274
|
|
|
|
|
|
|
|
|
275
|
0
|
|
|
|
|
0
|
my $dump_messages = $config->{dumpMessages}; |
|
276
|
|
|
|
|
|
|
my $must_count_keys = exists $config->{keyFlush} |
|
277
|
0
|
|
0
|
|
|
0
|
&& $config->{keyFlush}->{interval}; |
|
278
|
|
|
|
|
|
|
|
|
279
|
0
|
|
|
|
|
0
|
for my $m (@metrics) { |
|
280
|
|
|
|
|
|
|
|
|
281
|
0
|
0
|
|
|
|
0
|
$logger->(debug => $m) if $dump_messages; |
|
282
|
|
|
|
|
|
|
|
|
283
|
0
|
|
|
|
|
0
|
my @bits = split(":", $m); |
|
284
|
0
|
|
|
|
|
0
|
my $key = shift @bits; |
|
285
|
|
|
|
|
|
|
|
|
286
|
|
|
|
|
|
|
# Keep [,=] to allow sending of tags. Handy for Influxdb integration. |
|
287
|
0
|
|
|
|
|
0
|
$key =~ y{/ }{_-}s; |
|
288
|
0
|
|
|
|
|
0
|
$key =~ y{a-zA-Z0-9_\-\.,=}{}cd; |
|
289
|
|
|
|
|
|
|
|
|
290
|
|
|
|
|
|
|
# Not very clear here. Etsy's code was doing this differently |
|
291
|
0
|
0
|
|
|
|
0
|
if ($must_count_keys) { |
|
292
|
0
|
|
|
|
|
0
|
my $key_counter = $metrics->{keyCounter}; |
|
293
|
0
|
|
|
|
|
0
|
$key_counter->{$key}++; |
|
294
|
|
|
|
|
|
|
} |
|
295
|
|
|
|
|
|
|
|
|
296
|
0
|
0
|
|
|
|
0
|
push @bits, "1" if 0 == @bits; |
|
297
|
|
|
|
|
|
|
|
|
298
|
0
|
|
|
|
|
0
|
for my $i (0..$#bits) { |
|
299
|
|
|
|
|
|
|
|
|
300
|
0
|
|
|
|
|
0
|
my $sample_rate = 1; |
|
301
|
0
|
|
|
|
|
0
|
my @fields = split(/\|/, $bits[$i]); |
|
302
|
|
|
|
|
|
|
|
|
303
|
0
|
0
|
0
|
|
|
0
|
if (! defined $fields[1] || $fields[1] eq "") { |
|
304
|
0
|
|
|
|
|
0
|
$logger->(warn => "Bad line: $bits[$i] in msg \"$m\""); |
|
305
|
0
|
|
|
|
|
0
|
$counters->{"${g_pref}.bad_lines_seen"}++; |
|
306
|
0
|
|
|
|
|
0
|
$stats->{"messages"}->{"bad_lines_seen"}++; |
|
307
|
0
|
|
|
|
|
0
|
next; |
|
308
|
|
|
|
|
|
|
} |
|
309
|
|
|
|
|
|
|
|
|
310
|
0
|
|
0
|
|
|
0
|
my $value = $fields[0] || 0; |
|
311
|
0
|
|
|
|
|
0
|
my $unit = $fields[1]; |
|
312
|
0
|
|
|
|
|
0
|
for ($unit) { |
|
313
|
0
|
|
|
|
|
0
|
s{^\s*}{}; |
|
314
|
0
|
|
|
|
|
0
|
s{\s*$}{}; |
|
315
|
|
|
|
|
|
|
} |
|
316
|
|
|
|
|
|
|
|
|
317
|
|
|
|
|
|
|
# Timers |
|
318
|
0
|
0
|
|
|
|
0
|
if ($unit eq "ms") { |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
319
|
0
|
|
|
|
|
0
|
my $timers = $metrics->{timers}; |
|
320
|
0
|
|
0
|
|
|
0
|
$timers->{$key} ||= []; |
|
321
|
0
|
|
|
|
|
0
|
push @{ $timers->{$key} }, $value; |
|
|
0
|
|
|
|
|
0
|
|
|
322
|
|
|
|
|
|
|
} |
|
323
|
|
|
|
|
|
|
|
|
324
|
|
|
|
|
|
|
# Gauges |
|
325
|
|
|
|
|
|
|
elsif ($unit eq "g") { |
|
326
|
0
|
|
|
|
|
0
|
my $gauges = $metrics->{gauges}; |
|
327
|
0
|
|
|
|
|
0
|
$gauges->{$key} = $value; |
|
328
|
|
|
|
|
|
|
} |
|
329
|
|
|
|
|
|
|
|
|
330
|
|
|
|
|
|
|
# Sets |
|
331
|
|
|
|
|
|
|
elsif ($unit eq "s") { |
|
332
|
|
|
|
|
|
|
# Treat set as a normal hash with undef keys |
|
333
|
|
|
|
|
|
|
# to minimize memory consumption *and* insertion speed |
|
334
|
0
|
|
|
|
|
0
|
my $sets = $metrics->{sets}; |
|
335
|
0
|
|
0
|
|
|
0
|
$sets->{$key} ||= {}; |
|
336
|
0
|
|
|
|
|
0
|
$sets->{$key}->{$value} = undef; |
|
337
|
|
|
|
|
|
|
} |
|
338
|
|
|
|
|
|
|
|
|
339
|
|
|
|
|
|
|
# Counters |
|
340
|
|
|
|
|
|
|
else { |
|
341
|
0
|
0
|
|
|
|
0
|
if (defined $fields[2]) { |
|
342
|
0
|
0
|
|
|
|
0
|
if ($fields[2] =~ m{^\@([\d\.]+)}) { |
|
343
|
0
|
|
|
|
|
0
|
$sample_rate = $1 + 0; |
|
344
|
|
|
|
|
|
|
} |
|
345
|
|
|
|
|
|
|
else { |
|
346
|
0
|
|
|
|
|
0
|
$logger->(warn => "Bad line: $bits[$i] in msg \"$m\"; has invalid sample rate"); |
|
347
|
0
|
|
|
|
|
0
|
$counters->{"${g_pref}.bad_lines_seen"}++; |
|
348
|
0
|
|
|
|
|
0
|
$stats->{"messages"}->{"bad_lines_seen"}++; |
|
349
|
0
|
|
|
|
|
0
|
next; |
|
350
|
|
|
|
|
|
|
} |
|
351
|
|
|
|
|
|
|
} |
|
352
|
0
|
|
0
|
|
|
0
|
$counters->{$key} ||= 0; |
|
353
|
0
|
|
0
|
|
|
0
|
$value ||= 1; |
|
354
|
0
|
|
|
|
|
0
|
$value /= $sample_rate; |
|
355
|
0
|
|
|
|
|
0
|
$counters->{$key} += $value; |
|
356
|
|
|
|
|
|
|
} |
|
357
|
|
|
|
|
|
|
} |
|
358
|
|
|
|
|
|
|
} |
|
359
|
|
|
|
|
|
|
|
|
360
|
0
|
|
|
|
|
0
|
$stats->{"messages"}->{"last_msg_seen"} = time(); |
|
361
|
|
|
|
|
|
|
} |
|
362
|
|
|
|
|
|
|
|
|
363
|
|
|
|
|
|
|
sub handle_manager_command { |
|
364
|
0
|
|
|
0
|
0
|
0
|
my ($self, $handle, $request) = @_; |
|
365
|
0
|
|
|
|
|
0
|
my @cmdline = split(" ", trim($request)); |
|
366
|
0
|
|
|
|
|
0
|
my $cmd = shift @cmdline; |
|
367
|
0
|
|
|
|
|
0
|
my $reply; |
|
368
|
|
|
|
|
|
|
|
|
369
|
0
|
|
|
|
|
0
|
$logger->(notice => "Received manager command '$cmd' (req=$request)"); |
|
370
|
|
|
|
|
|
|
|
|
371
|
0
|
0
|
|
|
|
0
|
if ($cmd eq "help") { |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
372
|
0
|
|
|
|
|
0
|
$reply = ( |
|
373
|
|
|
|
|
|
|
"Commands: stats, counters, timers, gauges, delcounters, deltimers, delgauges, quit\015\012\015\012" |
|
374
|
|
|
|
|
|
|
); |
|
375
|
|
|
|
|
|
|
} |
|
376
|
|
|
|
|
|
|
elsif ($cmd eq "stats") { |
|
377
|
0
|
|
|
|
|
0
|
my $now = time; |
|
378
|
0
|
|
|
|
|
0
|
my $uptime = $now - $self->{startup_time}; |
|
379
|
0
|
|
|
|
|
0
|
$reply = "uptime: $uptime\n"; |
|
380
|
|
|
|
|
|
|
|
|
381
|
|
|
|
|
|
|
# Loop through the base stats |
|
382
|
0
|
|
|
|
|
0
|
my $stats = $self->stats; |
|
383
|
|
|
|
|
|
|
|
|
384
|
0
|
|
|
|
|
0
|
for my $group (keys %{$stats}) { |
|
|
0
|
|
|
|
|
0
|
|
|
385
|
0
|
|
|
|
|
0
|
for my $metric (keys %{$stats->{$group}}) { |
|
|
0
|
|
|
|
|
0
|
|
|
386
|
0
|
|
|
|
|
0
|
my $val = $stats->{$group}->{$metric}; |
|
387
|
0
|
0
|
|
|
|
0
|
my $delta = $metric =~ m{^last_} |
|
388
|
|
|
|
|
|
|
? $now - $val |
|
389
|
|
|
|
|
|
|
: $val; |
|
390
|
0
|
|
|
|
|
0
|
$reply .= "${group}.${metric}: ${delta}\n"; |
|
391
|
|
|
|
|
|
|
} |
|
392
|
|
|
|
|
|
|
} |
|
393
|
|
|
|
|
|
|
|
|
394
|
|
|
|
|
|
|
$self->foreach_backend(sub { |
|
395
|
0
|
|
|
0
|
|
0
|
my $backend_status = $_[0]->status; |
|
396
|
0
|
0
|
0
|
|
|
0
|
if ($backend_status && ref $backend_status eq "HASH") { |
|
397
|
0
|
|
|
|
|
0
|
for (keys %{ $backend_status }) { |
|
|
0
|
|
|
|
|
0
|
|
|
398
|
|
|
|
|
|
|
$reply .= sprintf("%s.%s: %s\n", |
|
399
|
|
|
|
|
|
|
lc($_[0]->name), |
|
400
|
0
|
|
|
|
|
0
|
$_ => $backend_status->{$_} |
|
401
|
|
|
|
|
|
|
); |
|
402
|
|
|
|
|
|
|
} |
|
403
|
|
|
|
|
|
|
} |
|
404
|
0
|
|
|
|
|
0
|
}); |
|
405
|
|
|
|
|
|
|
|
|
406
|
0
|
|
|
|
|
0
|
$reply .= "END\n\n"; |
|
407
|
|
|
|
|
|
|
} |
|
408
|
|
|
|
|
|
|
elsif ($cmd eq "counters") { |
|
409
|
0
|
|
|
|
|
0
|
my $counters = $self->{metrics}->{counters}; |
|
410
|
0
|
|
|
|
|
0
|
$reply = $self->_json_emitter()->encode($counters); |
|
411
|
0
|
|
|
|
|
0
|
$reply .= "\nEND\n\n"; |
|
412
|
|
|
|
|
|
|
} |
|
413
|
|
|
|
|
|
|
elsif ($cmd eq "timers") { |
|
414
|
0
|
|
|
|
|
0
|
my $timers = $self->{metrics}->{timers}; |
|
415
|
0
|
|
|
|
|
0
|
$reply = $self->_json_emitter()->encode($timers); |
|
416
|
0
|
|
|
|
|
0
|
$reply .= "\nEND\n\n"; |
|
417
|
|
|
|
|
|
|
} |
|
418
|
|
|
|
|
|
|
elsif ($cmd eq "gauges") { |
|
419
|
0
|
|
|
|
|
0
|
my $gauges = $self->{metrics}->{gauges}; |
|
420
|
0
|
|
|
|
|
0
|
$reply = $self->_json_emitter()->encode($gauges); |
|
421
|
0
|
|
|
|
|
0
|
$reply .= "\nEND\n\n"; |
|
422
|
|
|
|
|
|
|
} |
|
423
|
|
|
|
|
|
|
elsif ($cmd eq "sets") { |
|
424
|
0
|
|
|
|
|
0
|
my $sets = $self->{metrics}->{sets}; |
|
425
|
0
|
|
|
|
|
0
|
my $sets_as_lists = {}; |
|
426
|
|
|
|
|
|
|
# FIXME Not really happy about this... |
|
427
|
|
|
|
|
|
|
# if you have huge sets, it's going to suck. |
|
428
|
|
|
|
|
|
|
# If you have huge sets, probably statsd is not for you anyway. |
|
429
|
0
|
|
|
|
|
0
|
for my $set (keys %{$sets}) { |
|
|
0
|
|
|
|
|
0
|
|
|
430
|
0
|
|
|
|
|
0
|
$sets_as_lists->{$set} = [ keys %{ $sets->{$set} } ]; |
|
|
0
|
|
|
|
|
0
|
|
|
431
|
|
|
|
|
|
|
} |
|
432
|
0
|
|
|
|
|
0
|
$reply = $self->_json_emitter()->encode($sets_as_lists); |
|
433
|
0
|
|
|
|
|
0
|
$reply .= "\nEND\n\n"; |
|
434
|
|
|
|
|
|
|
} |
|
435
|
|
|
|
|
|
|
elsif ($cmd eq "delcounters") { |
|
436
|
0
|
|
|
|
|
0
|
my $counters = $self->{metrics}->{counters}; |
|
437
|
0
|
|
|
|
|
0
|
for my $name (@cmdline) { |
|
438
|
0
|
|
|
|
|
0
|
delete $counters->{$name}; |
|
439
|
0
|
|
|
|
|
0
|
$reply .= "deleted: $name\n"; |
|
440
|
|
|
|
|
|
|
} |
|
441
|
0
|
|
|
|
|
0
|
$reply .= "\nEND\n\n"; |
|
442
|
|
|
|
|
|
|
} |
|
443
|
|
|
|
|
|
|
elsif ($cmd eq "deltimers") { |
|
444
|
0
|
|
|
|
|
0
|
my $timers = $self->{metrics}->{timers}; |
|
445
|
0
|
|
|
|
|
0
|
for my $name (@cmdline) { |
|
446
|
0
|
|
|
|
|
0
|
delete $timers->{$name}; |
|
447
|
0
|
|
|
|
|
0
|
$reply .= "deleted: $name\n"; |
|
448
|
|
|
|
|
|
|
} |
|
449
|
0
|
|
|
|
|
0
|
$reply .= "\nEND\n\n"; |
|
450
|
|
|
|
|
|
|
} |
|
451
|
|
|
|
|
|
|
elsif ($cmd eq "delgauges") { |
|
452
|
0
|
|
|
|
|
0
|
my $gauges = $self->{metrics}->{gauges}; |
|
453
|
0
|
|
|
|
|
0
|
for my $name (@cmdline) { |
|
454
|
0
|
|
|
|
|
0
|
delete $gauges->{$name}; |
|
455
|
0
|
|
|
|
|
0
|
$reply .= "deleted: $name\n"; |
|
456
|
|
|
|
|
|
|
} |
|
457
|
0
|
|
|
|
|
0
|
$reply .= "\nEND\n\n"; |
|
458
|
|
|
|
|
|
|
} |
|
459
|
|
|
|
|
|
|
elsif ($cmd eq "quit") { |
|
460
|
0
|
|
|
|
|
0
|
undef $reply; |
|
461
|
0
|
|
|
|
|
0
|
$handle->destroy(); |
|
462
|
|
|
|
|
|
|
} |
|
463
|
|
|
|
|
|
|
else { |
|
464
|
0
|
|
|
|
|
0
|
$reply = "ERROR\n"; |
|
465
|
|
|
|
|
|
|
} |
|
466
|
0
|
|
|
|
|
0
|
return $reply; |
|
467
|
|
|
|
|
|
|
} |
|
468
|
|
|
|
|
|
|
|
|
469
|
|
|
|
|
|
|
sub handle_manager_connection { |
|
470
|
0
|
|
|
0
|
0
|
0
|
my ($self, $handle, $line) = @_; |
|
471
|
|
|
|
|
|
|
#$logger->(notice => "Received mgmt command [$line]"); |
|
472
|
0
|
0
|
|
|
|
0
|
if (my $reply = $self->handle_manager_command($handle, $line)) { |
|
473
|
0
|
|
|
|
|
0
|
$logger->(notice => "Sending mgmt reply [$reply]"); |
|
474
|
0
|
|
|
|
|
0
|
$handle->push_write($reply); |
|
475
|
|
|
|
|
|
|
# Accept a new command on the same connection |
|
476
|
|
|
|
|
|
|
$handle->push_read(line => sub { |
|
477
|
0
|
|
|
0
|
|
0
|
handle_manager_connection($self, @_) |
|
478
|
0
|
|
|
|
|
0
|
}); |
|
479
|
|
|
|
|
|
|
} |
|
480
|
|
|
|
|
|
|
else { |
|
481
|
0
|
|
|
|
|
0
|
$logger->(notice => "Shutting down socket"); |
|
482
|
0
|
|
|
|
|
0
|
$handle->push_write("\n"); |
|
483
|
0
|
|
|
|
|
0
|
$handle->destroy; |
|
484
|
|
|
|
|
|
|
} |
|
485
|
|
|
|
|
|
|
} |
|
486
|
|
|
|
|
|
|
|
|
487
|
|
|
|
|
|
|
sub init_backends { |
|
488
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
|
489
|
0
|
|
|
|
|
0
|
my $backends = $self->config->{backends}; |
|
490
|
0
|
0
|
0
|
|
|
0
|
if (! $backends or ref $backends ne 'ARRAY') { |
|
491
|
0
|
|
|
|
|
0
|
die "At least one backend is needed in your configuration"; |
|
492
|
|
|
|
|
|
|
} |
|
493
|
0
|
|
|
|
|
0
|
for my $backend (@{ $backends }) { |
|
|
0
|
|
|
|
|
0
|
|
|
494
|
|
|
|
|
|
|
|
|
495
|
|
|
|
|
|
|
# Nodejs statsd expects a relative path |
|
496
|
0
|
0
|
|
|
|
0
|
if ($backend =~ m{^ \./backends/ (.+) $}x) { |
|
497
|
0
|
|
|
|
|
0
|
$backend = $1; |
|
498
|
|
|
|
|
|
|
} |
|
499
|
|
|
|
|
|
|
|
|
500
|
0
|
|
|
|
|
0
|
my $pkg = $backend; |
|
501
|
0
|
0
|
|
|
|
0
|
if ($backend =~ m{^ (\w+) $}x) { |
|
502
|
0
|
|
|
|
|
0
|
$pkg = ucfirst lc $pkg; |
|
503
|
0
|
|
|
|
|
0
|
$pkg = "Net::Statsd::Server::Backend::${pkg}"; |
|
504
|
|
|
|
|
|
|
} |
|
505
|
0
|
|
|
|
|
0
|
my $mod = $pkg; |
|
506
|
0
|
|
|
|
|
0
|
$mod =~ s{::}{/}g; |
|
507
|
0
|
|
|
|
|
0
|
$mod .= ".pm"; |
|
508
|
|
|
|
|
|
|
eval { |
|
509
|
0
|
|
|
|
|
0
|
require $mod ; 1 |
|
|
0
|
|
|
|
|
0
|
|
|
510
|
0
|
0
|
|
|
|
0
|
} or do { |
|
511
|
0
|
|
|
|
|
0
|
$logger->(error=>"Backend ${backend} failed to load: $@"); |
|
512
|
0
|
|
|
|
|
0
|
next; |
|
513
|
|
|
|
|
|
|
}; |
|
514
|
0
|
|
|
|
|
0
|
$self->register_backend($pkg); |
|
515
|
|
|
|
|
|
|
} |
|
516
|
|
|
|
|
|
|
} |
|
517
|
|
|
|
|
|
|
|
|
518
|
|
|
|
|
|
|
sub init_logger { |
|
519
|
0
|
|
|
0
|
0
|
0
|
my ($self, $config) = @_; |
|
520
|
|
|
|
|
|
|
|
|
521
|
0
|
|
0
|
|
|
0
|
$config ||= {}; |
|
522
|
|
|
|
|
|
|
|
|
523
|
0
|
|
0
|
|
|
0
|
my $backend = $config->{backend} || 'stdout'; |
|
524
|
0
|
|
0
|
|
|
0
|
my $level = lc($config->{level} || 'LOG_INFO'); |
|
525
|
0
|
|
|
|
|
0
|
$level =~ s{^log_}{}; |
|
526
|
|
|
|
|
|
|
|
|
527
|
0
|
0
|
|
|
|
0
|
if ($backend eq 'stdout') { |
|
|
|
0
|
|
|
|
|
|
|
528
|
0
|
|
|
|
|
0
|
$AnyEvent::Log::FILTER->level($level); |
|
529
|
|
|
|
|
|
|
} |
|
530
|
|
|
|
|
|
|
elsif ($backend eq 'syslog') { |
|
531
|
|
|
|
|
|
|
# Syslog logging works commenting out the FILTER->level line |
|
532
|
0
|
|
|
|
|
0
|
$AnyEvent::Log::COLLECT->attach( |
|
533
|
|
|
|
|
|
|
AnyEvent::Log::Ctx->new( |
|
534
|
|
|
|
|
|
|
level => $level, |
|
535
|
|
|
|
|
|
|
log_to_syslog => "user", |
|
536
|
|
|
|
|
|
|
) |
|
537
|
|
|
|
|
|
|
); |
|
538
|
|
|
|
|
|
|
} |
|
539
|
0
|
|
0
|
0
|
|
0
|
$logger ||= sub { AE::log(shift(@_), shift(@_)) }; |
|
|
0
|
|
|
|
|
0
|
|
|
540
|
|
|
|
|
|
|
} |
|
541
|
|
|
|
|
|
|
|
|
542
|
|
|
|
|
|
|
sub logger { |
|
543
|
0
|
|
|
0
|
0
|
0
|
return $logger; |
|
544
|
|
|
|
|
|
|
} |
|
545
|
|
|
|
|
|
|
|
|
546
|
|
|
|
|
|
|
sub metrics { |
|
547
|
0
|
|
|
0
|
0
|
0
|
$_[0]->{metrics}; |
|
548
|
|
|
|
|
|
|
} |
|
549
|
|
|
|
|
|
|
|
|
550
|
|
|
|
|
|
|
sub register_backend { |
|
551
|
0
|
|
|
0
|
0
|
0
|
my ($self, $backend) = @_; |
|
552
|
0
|
|
0
|
|
|
0
|
$self->{backends} ||= []; |
|
553
|
0
|
|
|
|
|
0
|
my $backend_instance = $backend->new( |
|
554
|
|
|
|
|
|
|
$self->_start_time_hi, $self->config, |
|
555
|
|
|
|
|
|
|
); |
|
556
|
0
|
|
|
|
|
0
|
$logger->(notice => "Initializing $backend backend"); |
|
557
|
0
|
|
|
|
|
0
|
push @{ $self->{backends} }, $backend_instance; |
|
|
0
|
|
|
|
|
0
|
|
|
558
|
|
|
|
|
|
|
} |
|
559
|
|
|
|
|
|
|
|
|
560
|
|
|
|
|
|
|
sub foreach_backend { |
|
561
|
0
|
|
|
0
|
0
|
0
|
my ($self, $callback) = @_; |
|
562
|
0
|
|
0
|
|
|
0
|
my $backends = $self->{backends} || []; |
|
563
|
0
|
|
|
|
|
0
|
for my $obj (@{ $backends }) { |
|
|
0
|
|
|
|
|
0
|
|
|
564
|
|
|
|
|
|
|
eval { |
|
565
|
0
|
|
|
|
|
0
|
$callback->($obj); 1; |
|
|
0
|
|
|
|
|
0
|
|
|
566
|
0
|
0
|
|
|
|
0
|
} or do { |
|
567
|
0
|
|
|
|
|
0
|
$logger->(error => "Failed callback on $obj backend: $@"); |
|
568
|
|
|
|
|
|
|
}; |
|
569
|
|
|
|
|
|
|
} |
|
570
|
|
|
|
|
|
|
} |
|
571
|
|
|
|
|
|
|
|
|
572
|
|
|
|
|
|
|
sub reload_config { |
|
573
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
|
574
|
0
|
|
|
|
|
0
|
delete $self->{config}; |
|
575
|
0
|
|
|
|
|
0
|
$logger->(warn => "Received SIGHUP: reloading configuration"); |
|
576
|
0
|
|
|
|
|
0
|
return $self->{config} = $self->config(); |
|
577
|
|
|
|
|
|
|
} |
|
578
|
|
|
|
|
|
|
|
|
579
|
|
|
|
|
|
|
sub setup_flush_timer { |
|
580
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
|
581
|
|
|
|
|
|
|
|
|
582
|
|
|
|
|
|
|
my $flush_interval = $self->config->{flushInterval} |
|
583
|
0
|
|
0
|
|
|
0
|
|| DEFAULT_FLUSH_INTERVAL; |
|
584
|
|
|
|
|
|
|
|
|
585
|
0
|
|
|
|
|
0
|
$flush_interval = $flush_interval / 1000; |
|
586
|
0
|
|
|
|
|
0
|
$logger->(notice => "metrics flush will happen every ${flush_interval}s"); |
|
587
|
|
|
|
|
|
|
|
|
588
|
|
|
|
|
|
|
my $flush_t = AE::timer $flush_interval, $flush_interval, sub { |
|
589
|
0
|
|
|
0
|
|
0
|
$self->flush_metrics |
|
590
|
0
|
|
|
|
|
0
|
}; |
|
591
|
|
|
|
|
|
|
|
|
592
|
0
|
|
|
|
|
0
|
return $flush_t; |
|
593
|
|
|
|
|
|
|
} |
|
594
|
|
|
|
|
|
|
|
|
595
|
1
|
50
|
|
1
|
|
4
|
sub _defined_or { defined $_[0] ? $_[0] : $_[1] } |
|
596
|
|
|
|
|
|
|
|
|
597
|
|
|
|
|
|
|
sub setup_keyflush_timer { |
|
598
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
|
599
|
|
|
|
|
|
|
|
|
600
|
0
|
|
|
|
|
0
|
my $conf_kf = $self->config->{keyFlush}; |
|
601
|
0
|
|
|
|
|
0
|
my $kf_interval = _defined_or($conf_kf->{interval}, 0); |
|
602
|
0
|
0
|
|
|
|
0
|
return if $kf_interval <= 0; |
|
603
|
|
|
|
|
|
|
|
|
604
|
|
|
|
|
|
|
# Always milliseconds in the config! |
|
605
|
0
|
|
|
|
|
0
|
$kf_interval /= 1000; |
|
606
|
|
|
|
|
|
|
|
|
607
|
0
|
|
|
|
|
0
|
my $kf_pct = _defined_or($conf_kf->{percent}, 100); |
|
608
|
0
|
|
|
|
|
0
|
my $kf_log = $conf_kf->{log}; |
|
609
|
|
|
|
|
|
|
|
|
610
|
0
|
|
0
|
|
|
0
|
$logger->(notice => "flushing top ${kf_pct}% keys to " |
|
611
|
|
|
|
|
|
|
. ($kf_log || "stdout") |
|
612
|
|
|
|
|
|
|
. " every ${kf_interval}s" |
|
613
|
|
|
|
|
|
|
); |
|
614
|
|
|
|
|
|
|
|
|
615
|
|
|
|
|
|
|
my $kf_timer = AE::timer $kf_interval, $kf_interval, sub { |
|
616
|
0
|
|
|
0
|
|
0
|
$self->flush_top_keys() |
|
617
|
0
|
|
|
|
|
0
|
}; |
|
618
|
|
|
|
|
|
|
|
|
619
|
0
|
|
|
|
|
0
|
return $kf_timer; |
|
620
|
|
|
|
|
|
|
} |
|
621
|
|
|
|
|
|
|
|
|
622
|
|
|
|
|
|
|
sub flush_top_keys { |
|
623
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
|
624
|
|
|
|
|
|
|
|
|
625
|
0
|
|
|
|
|
0
|
my $conf_kf = _defined_or($self->config->{keyFlush}, {}); |
|
626
|
0
|
|
|
|
|
0
|
my $kf_interval = _defined_or($conf_kf->{interval}, 0); |
|
627
|
0
|
|
|
|
|
0
|
$kf_interval /= 1000; |
|
628
|
|
|
|
|
|
|
|
|
629
|
0
|
|
0
|
|
|
0
|
my $kf_pct = $conf_kf->{percent} || 100; |
|
630
|
0
|
|
|
|
|
0
|
my $kf_log = $conf_kf->{log}; |
|
631
|
|
|
|
|
|
|
|
|
632
|
0
|
|
|
|
|
0
|
my @sorted_keys; |
|
633
|
0
|
|
|
|
|
0
|
my $key_counter = $self->metrics->{keyCounter}; |
|
634
|
0
|
|
|
|
|
0
|
while (my ($k, $v) = each %{ $key_counter }) { |
|
|
0
|
|
|
|
|
0
|
|
|
635
|
0
|
|
|
|
|
0
|
push @sorted_keys, [ $k, $v ]; |
|
636
|
|
|
|
|
|
|
} |
|
637
|
|
|
|
|
|
|
|
|
638
|
0
|
|
|
|
|
0
|
@sorted_keys = sort { $b->[1] <=> $a->[1] } @sorted_keys; |
|
|
0
|
|
|
|
|
0
|
|
|
639
|
|
|
|
|
|
|
|
|
640
|
0
|
|
|
|
|
0
|
my @time = localtime; |
|
641
|
0
|
|
|
|
|
0
|
my $time_str = sprintf "%04d-%02d-%02d %02d:%02d:%02d", |
|
642
|
|
|
|
|
|
|
$time[5] + 1900, $time[4] + 1, $time[3], |
|
643
|
|
|
|
|
|
|
$time[2], $time[1], $time[0]; |
|
644
|
|
|
|
|
|
|
|
|
645
|
0
|
|
|
|
|
0
|
my $log_message = ""; |
|
646
|
|
|
|
|
|
|
|
|
647
|
|
|
|
|
|
|
# Only show the top keyFlush.percent keys |
|
648
|
0
|
|
|
|
|
0
|
my $top_pct_limit = int(scalar(@sorted_keys) * $kf_pct / 100); |
|
649
|
0
|
|
|
|
|
0
|
for my $i (0 .. $top_pct_limit - 1) { |
|
650
|
0
|
|
|
|
|
0
|
$log_message .= sprintf "$time_str count=%d key=%s\n", |
|
651
|
|
|
|
|
|
|
$sorted_keys[$i][1], $sorted_keys[$i][0]; |
|
652
|
|
|
|
|
|
|
} |
|
653
|
|
|
|
|
|
|
|
|
654
|
0
|
0
|
|
|
|
0
|
if ($kf_log) { |
|
655
|
0
|
0
|
|
|
|
0
|
if (open my $log_fh, '>>', $kf_log) { |
|
656
|
0
|
|
|
|
|
0
|
$log_fh->printflush($log_message); |
|
657
|
0
|
|
|
|
|
0
|
$log_fh->close(); |
|
658
|
|
|
|
|
|
|
} |
|
659
|
|
|
|
|
|
|
} else { |
|
660
|
0
|
|
|
|
|
0
|
print $log_message; |
|
661
|
|
|
|
|
|
|
} |
|
662
|
|
|
|
|
|
|
|
|
663
|
|
|
|
|
|
|
# Clear the counters |
|
664
|
0
|
|
|
|
|
0
|
$self->metrics->{keyCounter} = {}; |
|
665
|
|
|
|
|
|
|
|
|
666
|
|
|
|
|
|
|
} |
|
667
|
|
|
|
|
|
|
|
|
668
|
|
|
|
|
|
|
sub init_metrics { |
|
669
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
|
670
|
0
|
|
|
|
|
0
|
my $config = $self->config; |
|
671
|
0
|
|
|
|
|
0
|
$self->{metrics} = Net::Statsd::Server::Metrics->new($config); |
|
672
|
0
|
|
|
|
|
0
|
return $self->{metrics}; |
|
673
|
|
|
|
|
|
|
} |
|
674
|
|
|
|
|
|
|
|
|
675
|
|
|
|
|
|
|
sub start_server { |
|
676
|
0
|
|
|
0
|
0
|
0
|
my ($self, $config) = @_; |
|
677
|
|
|
|
|
|
|
|
|
678
|
0
|
0
|
|
|
|
0
|
if (! defined $config) { |
|
679
|
0
|
|
|
|
|
0
|
$config = $self->config(); |
|
680
|
|
|
|
|
|
|
} |
|
681
|
|
|
|
|
|
|
|
|
682
|
0
|
|
|
|
|
0
|
$self->init_logger($config->{log}); |
|
683
|
|
|
|
|
|
|
|
|
684
|
0
|
|
0
|
|
|
0
|
my $host = $config->{address} || '0.0.0.0'; |
|
685
|
0
|
|
0
|
|
|
0
|
my $port = $config->{port} || 8125; |
|
686
|
|
|
|
|
|
|
|
|
687
|
0
|
|
0
|
|
|
0
|
my $mgmt_host = $config->{mgmt_address} || '0.0.0.0'; |
|
688
|
0
|
|
0
|
|
|
0
|
my $mgmt_port = $config->{mgmt_port} || 8126; |
|
689
|
|
|
|
|
|
|
|
|
690
|
0
|
|
|
|
|
0
|
$self->init_backends(); |
|
691
|
0
|
|
|
|
|
0
|
$self->init_metrics(); |
|
692
|
|
|
|
|
|
|
|
|
693
|
|
|
|
|
|
|
# Statsd clients interface (UDP) |
|
694
|
|
|
|
|
|
|
$self->{server} = AnyEvent::Handle::UDP->new( |
|
695
|
|
|
|
|
|
|
bind => [$host, $port], |
|
696
|
|
|
|
|
|
|
on_recv => sub { |
|
697
|
0
|
|
|
0
|
|
0
|
my ($data, $ae_handle, $client_addr) = @_; |
|
698
|
|
|
|
|
|
|
#$logger->(debug => "Got data=$data self=$self"); |
|
699
|
0
|
|
|
|
|
0
|
$self->handle_client_packet($data); |
|
700
|
|
|
|
|
|
|
}, |
|
701
|
0
|
|
|
|
|
0
|
); |
|
702
|
|
|
|
|
|
|
|
|
703
|
|
|
|
|
|
|
# Bump up SO_RCVBUF on UDP socket, to buffer up incoming |
|
704
|
|
|
|
|
|
|
# UDP packets, to avoid significant packet loss under load. |
|
705
|
|
|
|
|
|
|
# Read more: http://bit.ly/10eeFoE |
|
706
|
0
|
|
|
|
|
0
|
if (RECEIVE_BUFFER_MB > 0) { |
|
707
|
|
|
|
|
|
|
# On some systems this could fail (cpantesters reports) |
|
708
|
|
|
|
|
|
|
# Have it emit a warning instead of throwing an exception |
|
709
|
0
|
0
|
|
|
|
0
|
setsockopt($self->{server}->fh, SOL_SOCKET, |
|
710
|
|
|
|
|
|
|
SO_RCVBUF, RECEIVE_BUFFER_MB * 1048576) |
|
711
|
|
|
|
|
|
|
or warn "Couldn't set SO_RCVBUF: $!"; |
|
712
|
|
|
|
|
|
|
} |
|
713
|
|
|
|
|
|
|
|
|
714
|
|
|
|
|
|
|
# Management interface (TCP, for 'stats' command, etc...) |
|
715
|
|
|
|
|
|
|
$self->{mgmtServer} = AnyEvent::Socket::tcp_server $mgmt_host, $mgmt_port, sub { |
|
716
|
0
|
0
|
|
0
|
|
0
|
my ($fh, $host, $port) = @_ |
|
717
|
|
|
|
|
|
|
or die "Unable to connect: $!"; |
|
718
|
|
|
|
|
|
|
|
|
719
|
0
|
|
|
|
|
0
|
my $handle; $handle = AnyEvent::Handle->new( |
|
720
|
|
|
|
|
|
|
fh => $fh, |
|
721
|
|
|
|
|
|
|
on_error => sub { |
|
722
|
0
|
|
|
|
|
0
|
AE::log error => $_[2], |
|
723
|
|
|
|
|
|
|
$_[0]->destroy; |
|
724
|
|
|
|
|
|
|
}, |
|
725
|
|
|
|
|
|
|
on_eof => sub { |
|
726
|
0
|
|
|
|
|
0
|
$handle->destroy; |
|
727
|
0
|
|
|
|
|
0
|
AE::log info => "Done.", |
|
728
|
|
|
|
|
|
|
}, |
|
729
|
0
|
|
|
|
|
0
|
); |
|
730
|
|
|
|
|
|
|
|
|
731
|
|
|
|
|
|
|
$handle->push_read(line => sub { |
|
732
|
0
|
|
|
|
|
0
|
handle_manager_connection($self, @_) |
|
733
|
0
|
|
|
|
|
0
|
}); |
|
734
|
0
|
|
|
|
|
0
|
}; |
|
735
|
|
|
|
|
|
|
|
|
736
|
0
|
|
|
|
|
0
|
$logger->(notice => "statsd server started on ${host}:${port} (v${VERSION})"); |
|
737
|
0
|
|
|
|
|
0
|
$logger->(notice => "manager interface started on ${mgmt_host}:${mgmt_port}"); |
|
738
|
|
|
|
|
|
|
|
|
739
|
0
|
|
|
|
|
0
|
my $f_ti = $self->setup_flush_timer; |
|
740
|
0
|
|
|
|
|
0
|
my $kf_ti = $self->setup_keyflush_timer; |
|
741
|
|
|
|
|
|
|
|
|
742
|
|
|
|
|
|
|
# This will block waiting for |
|
743
|
|
|
|
|
|
|
# incoming connections (TCP) or packets (UDP) |
|
744
|
0
|
|
|
|
|
0
|
my $cv = AE::cv; |
|
745
|
0
|
|
|
|
|
0
|
$cv->recv(); |
|
746
|
|
|
|
|
|
|
} |
|
747
|
|
|
|
|
|
|
|
|
748
|
|
|
|
|
|
|
sub stats { |
|
749
|
0
|
|
|
0
|
0
|
0
|
$_[0]->{stats}; |
|
750
|
|
|
|
|
|
|
} |
|
751
|
|
|
|
|
|
|
|
|
752
|
|
|
|
|
|
|
sub trim { |
|
753
|
8
|
|
|
8
|
0
|
5248
|
my $s = shift; |
|
754
|
8
|
100
|
|
|
|
19
|
return unless defined $s; |
|
755
|
7
|
|
|
|
|
32
|
$s =~ s{^\s+}{}; |
|
756
|
7
|
|
|
|
|
12
|
$s =~ s{\s+$}{}; |
|
757
|
7
|
|
|
|
|
11
|
return $s; |
|
758
|
|
|
|
|
|
|
} |
|
759
|
|
|
|
|
|
|
|
|
760
|
|
|
|
|
|
|
1; |
|
761
|
|
|
|
|
|
|
|
|
762
|
|
|
|
|
|
|
__END__ |